{ "info": { "author": "Jacob Ferriero", "author_email": "jferriero@google.com", "bugtrack_url": null, "classifiers": [ "Development Status :: 3 - Alpha", "Intended Audience :: Developers", "License :: OSI Approved :: Apache Software License", "Operating System :: OS Independent", "Programming Language :: Python", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.4", "Programming Language :: Python :: 3.5", "Programming Language :: Python :: 3.6", "Programming Language :: Python :: 3.7", "Topic :: Internet" ], "description": "# BigQuery Pipeline\n\nUtility class for building data pipelines in BigQuery.\n\nProvides methods for query, copy table, delete table and export to GCS.\n\nSupports Jinja2 templated SQL.\n\n# Getting Started\n[Check out the codelab!](https://codinginadtech.com/ox-bqpipeline/codelab/index.html#0)\n\n## Usage\n\nCreate an instance of BQPipeline. By setting `query_project`, `default_project` and `default_dataset`, you can omit project and dataset from table references in your SQL statements.\n\n`default_project` is the project used when a tablespec does not specify a project.\n\n`default_dataset` is the dataset used when a tablespec does not specify project or dataset.\n\nPlace files containing a single BigQuery Standard SQL statement per file.\n\nNote that if you reference a project with a '-' in the name, you must backtick the tablespec in your SQL: `` `my-project.dataset_id.table_id` ``\n\n### Writing scripts to be easily portable between environments\n- Use `{{ project }}` in all your sql queries\n- In your replacements dictionary, set `'project'` to the value of `BQPipeline.infer_project()`\nthis will infer the project from the credentials. 
This means in your local shell it will use your Application Default Credentials, and on the cron box it will use the project of the CronBox's Service Account.\n\n```python\n#!/usr/bin/env python\n# -*- coding: utf-8 -*-\nfrom ox_bqpipeline.bqpipeline import BQPipeline\n\nbq = BQPipeline(job_name='myjob',\n default_dataset='mydataset',\n json_credentials_path='credentials.json')\n\nreplacements = {\n 'project': bq.infer_project(),\n 'dataset': 'mydataset'\n}\n\nbq.copy_table('source_table', 'dest_table')\n\nbq.run_queries(['../sql/q1.sql', ('../sql/q2.sql', 'tmp_table_1')], **replacements)\n\nbq.export_csv_to_gcs('tmp_table_2', 'gs://my-bucket/path/to/tmp_table_2-*.csv')\n\nbq.delete_tables(['tmp_table_1', 'tmp_table_2'])\n```\n\nNote that the `run_queries` method provided by this utility can alternatively take a list of tuples, where the first entry is the SQL path and the second is a destination table. You can see an example of this in [`example_pipeline.py`](/example_pipeline.py).\n\nFor detailed documentation about the methods provided by this utility class, see [docs.md](docs.md).\n\n### Creating Service Account JSON Credentials\n\n1. Visit the [Service Account Console](https://console.cloud.google.com/iam-admin/serviceaccounts)\n2. Select a service account\n3. Select \"Create Key\"\n4. Select \"JSON\"\n5. 
Click \"Create\" to download the file\n\n\n## Installation\n\n### Optional: Install in virtualenv\n\n```\npython3 -m virtualenv venv\nsource venv/bin/activate\n```\n\n### Install with pip\n\n```bash\npipenv install --python 3\n```\n\n### Install with pipenv\n\n```bash\npython3 -m pip install -r requirements.txt\n```\n\nor\n\n```bash\npip3 install -r requirements.txt\n```\n\n### Run test suite\n\n```bash\npython3 -m unittest discover\n```\n\n### Run test suite using pipenv\n\n```bash\npipenv run python -m unittest discover\n```\n\n\n## Requirements\n\nYou'll need to [download Python 3.4 or later](https://www.python.org/downloads/)\n\n[Google Cloud Python Client](https://github.com/googleapis/google-cloud-python)\n\n\n## Disclaimer\n\nThis is not an official Google project.\n\n\n## References\n\n[Python Example Code](https://github.com/GoogleCloudPlatform/python-docs-samples)\n[google-cloud-bigquery](https://pypi.org/project/google-cloud-bigquery/)\n[Jinja2](http://jinja.pocoo.org/docs/2.10/)\n# ox_bqpipeline\n\n# ox_bqpipeline.bqpipeline\n\n## get_logger\n```python\nget_logger(name, fmt='%(asctime)-15s %(levelname)s %(message)s')\n```\n\nCreates a Logger that logs to stdout\n\n:param name: name of the logger\n:param fmt: format string for log messages\n:return: Logger\n\n## read_sql\n```python\nread_sql(path)\n```\n\nReads UTF-8 encoded SQL from a file\n:param path: path to SQL file\n:return: str contents of file\n\n## tableref\n```python\ntableref(project, dataset_id, table_id)\n```\n\nCreates a TableReference from project, dataset and table\n:param project: project id containing the dataset\n:param dataset_id: dataset id\n:param table_id: table_id\n:return: bigquery.table.TableReference\n\n## to_tableref\n```python\nto_tableref(tablespec_str)\n```\n\nCreates a TableReference from TableSpec\n:param tablespec_str: BigQuery TableSpec in format 'project.dataset_id.table_id'\n:return: bigquery.table.TableReference\n\n## 
create_copy_job_config\n```python\ncreate_copy_job_config(overwrite=True)\n```\n\nCreates a CopyJobConfig\n:param overwrite: if set to False, target table must not exist\n:return: bigquery.job.CopyJobConfig\n\n## exception_logger\n```python\nexception_logger(func)\n```\n\nA decorator that wraps the passed-in function and logs\nexceptions should one occur\n\n## gcs_export_job_poller\n```python\ngcs_export_job_poller(func)\n```\n\nA decorator that waits on an export job\n\n## BQPipeline\n```python\nBQPipeline(self, job_name, query_project=None, location='US', default_project=None, default_dataset=None, json_credentials_path=None)\n```\n\nBigQuery Python SDK Client Wrapper\nProvides methods for running queries, copying and deleting tables.\nSupports Jinja2 templated SQL and enables default project and dataset to\nbe set for an entire pipeline.\n\n### get_client\n```python\nBQPipeline.get_client(self)\n```\n\nInitializes bigquery.Client\n:return: bigquery.Client\n\n### infer_project\n```python\nBQPipeline.infer_project(self)\n```\n\nInfers the project based on the client's credentials.\n\n### resolve_table_spec\n```python\nBQPipeline.resolve_table_spec(self, dest)\n```\n\nResolves a full TableSpec from a partial TableSpec by adding default\nproject and dataset.\n:param dest: TableSpec string or partial TableSpec string\n:return: str TableSpec\n\n### resolve_dataset_spec\n```python\nBQPipeline.resolve_dataset_spec(self, dataset)\n```\n\nResolves a full DatasetSpec from a partial DatasetSpec by adding default\nproject.\n:param dataset: DatasetSpec string or partial DatasetSpec string\n:return: str DatasetSpec\n\n### create_dataset\n```python\nBQPipeline.create_dataset(self, dataset, exists_ok=False)\n```\n\nCreates a BigQuery Dataset from a full or partial dataset spec.\n:param dataset: DatasetSpec string or partial DatasetSpec string\n:param exists_ok: if True, do not raise if the dataset already exists\n\n### create_job_config\n```python\nBQPipeline.create_job_config(self, batch=False, dest=None, create=True, overwrite=True, append=False)\n```\n\nCreates a 
QueryJobConfig\n:param batch: use QueryPriority.BATCH if true\n:param dest: tablespec of destination table, or a GCS wildcard to\n write query results to.\n:param create: if False, destination table must already exist\n:param overwrite: if False, destination table must not exist\n:param append: if True, destination table will be appended to\n:return: bigquery.QueryJobConfig\n\n### run_query\n```python\nBQPipeline.run_query(self, path, batch=False, wait=True, create=True, overwrite=True, timeout=None, gcs_export_format='CSV', **kwargs)\n```\n\nExecutes a SQL query from a Jinja2 template file\n:param path: path to sql file or tuple of (path to sql file, destination tablespec)\n:param batch: run query with batch priority\n:param wait: wait for job to complete before returning\n:param create: if False, destination table must already exist\n:param overwrite: if False, destination table must not exist\n:param timeout: time in seconds to wait for job to complete\n:param gcs_export_format: CSV, AVRO, or JSON.\n:param kwargs: replacements for Jinja2 template\n:return: bigquery.job.QueryJob\n\n### run_queries\n```python\nBQPipeline.run_queries(self, query_paths, batch=True, wait=True, create=True, overwrite=True, timeout=1200, **kwargs)\n```\n\nExecutes multiple SQL queries from Jinja2 template files\n:param query_paths: List[Union[str,Tuple[str,str]]] path to sql file or\n tuple of (path, destination tablespec)\n:param batch: run query with batch priority\n:param wait: wait for job to complete before returning\n:param create: if False, destination table must already exist\n:param overwrite: if False, destination table must not exist\n:param timeout: time in seconds to wait for job to complete\n:param kwargs: replacements for Jinja2 template\n\n### copy_table\n```python\nBQPipeline.copy_table(self, src, dest, wait=True, overwrite=True, timeout=None)\n```\n\nCopies a table\n:param src: tablespec 'project.dataset.table'\n:param dest: tablespec 'project.dataset.table'\n:param wait: block until job completes\n:param overwrite: overwrite destination 
table\n:param timeout: time in seconds to wait for operation to complete\n:return: bigquery.job.CopyJob\n\n### delete_table\n```python\nBQPipeline.delete_table(self, table)\n```\n\nDeletes a table\n:param table: table spec `project.dataset.table`\n\n### delete_tables\n```python\nBQPipeline.delete_tables(self, tables)\n```\n\nDeletes multiple tables\n:param tables: List[str] of table spec `project.dataset.table`\n\n### export_csv_to_gcs\n```python\nBQPipeline.export_csv_to_gcs(self, table, gcs_path, delimiter=',', header=True, wait=True, timeout=None)\n```\n\nExports a table to GCS as CSV.\n:param table: str of table spec `project.dataset.table`\n:param gcs_path: str of destination GCS path\n:param delimiter: str field delimiter for output data.\n:param header: boolean indicating whether the output CSV file includes a header row.\n\n### export_json_to_gcs\n```python\nBQPipeline.export_json_to_gcs(self, table, gcs_path, wait=True, timeout=None)\n```\n\nExports a table to GCS as a newline-delimited JSON file.\n:param table: str of table spec `project.dataset.table`\n:param gcs_path: str of destination GCS path\n\n### export_avro_to_gcs\n```python\nBQPipeline.export_avro_to_gcs(self, table, gcs_path, compression='snappy', wait=True, timeout=None)\n```\n\nExports a table to GCS as an Avro file.\n:param table: str of table spec `project.dataset.table`\n:param gcs_path: str of destination GCS path\n:param compression: str compression codec for the Avro output.\n\n## main\n```python\nmain()\n```\n\nHandles CLI invocations of bqpipeline.", "description_content_type": "text/markdown", "docs_url": null, "download_url": "https://github.com/openx/ox-bqpipeline/archive/v0.0.3.zip", "downloads": { "last_day": -1, "last_month": -1, "last_week": -1 }, "home_page": "https://github.com/openx/ox-bqpipeline", "keywords": "", "license": "Apache 2.0", "maintainer": "", "maintainer_email": "", "name": "ox-bqpipeline", "package_url": "https://pypi.org/project/ox-bqpipeline/", "platform": "Posix; MacOS X; Windows", "project_url": 
"https://pypi.org/project/ox-bqpipeline/", "project_urls": { "Download": "https://github.com/openx/ox-bqpipeline/archive/v0.0.3.zip", "Homepage": "https://github.com/openx/ox-bqpipeline" }, "release_url": "https://pypi.org/project/ox-bqpipeline/0.0.3/", "requires_dist": null, "requires_python": ">=3.4", "summary": "Utility class for building data pipelines in BigQuery", "version": "0.0.3" }, "last_serial": 5483512, "releases": { "0.0.1": [ { "comment_text": "", "digests": { "md5": "4a82b2d5c916a6fae6e6f5a68fadd421", "sha256": "09db3aab5f921982f2b1a330ff373a6be353ac5e05f73ed6c056baf0eaa7fe86" }, "downloads": -1, "filename": "ox_bqpipeline-0.0.1.tar.gz", "has_sig": false, "md5_digest": "4a82b2d5c916a6fae6e6f5a68fadd421", "packagetype": "sdist", "python_version": "source", "requires_python": ">=3.4", "size": 10173, "upload_time": "2019-06-14T02:19:00", "url": "https://files.pythonhosted.org/packages/77/eb/6b0ab400e3e27f9bae066ea381156c76b795690c04f4bcee831f56138196/ox_bqpipeline-0.0.1.tar.gz" } ], "0.0.2": [ { "comment_text": "", "digests": { "md5": "2e8487d619a29a96dd1206ae4ce622d6", "sha256": "82afeaa8ee739e9e22cd26d54996068c97858c0fb9fc45054b681cb2822e808c" }, "downloads": -1, "filename": "ox_bqpipeline-0.0.2.tar.gz", "has_sig": false, "md5_digest": "2e8487d619a29a96dd1206ae4ce622d6", "packagetype": "sdist", "python_version": "source", "requires_python": ">=3.4", "size": 10152, "upload_time": "2019-06-14T03:04:41", "url": "https://files.pythonhosted.org/packages/5d/6f/ff05daa338acfd8ef1484b992720a22b167e2693462a39f5dd61001ae672/ox_bqpipeline-0.0.2.tar.gz" } ], "0.0.3": [ { "comment_text": "", "digests": { "md5": "4e274c113c1c2bb8e0b750e56e7582b1", "sha256": "3611413d4ea6b022e27a9068494a880570f442c7efb673bdba8a36bb4bcad2fd" }, "downloads": -1, "filename": "ox_bqpipeline-0.0.3.tar.gz", "has_sig": false, "md5_digest": "4e274c113c1c2bb8e0b750e56e7582b1", "packagetype": "sdist", "python_version": "source", "requires_python": ">=3.4", "size": 10740, "upload_time": 
"2019-07-03T21:20:40", "url": "https://files.pythonhosted.org/packages/3f/ea/6b15019a5ec4cea1d5b952ec62cf1a85b9b2961333ad87b8cde34d8b56fa/ox_bqpipeline-0.0.3.tar.gz" } ] }, "urls": [ { "comment_text": "", "digests": { "md5": "4e274c113c1c2bb8e0b750e56e7582b1", "sha256": "3611413d4ea6b022e27a9068494a880570f442c7efb673bdba8a36bb4bcad2fd" }, "downloads": -1, "filename": "ox_bqpipeline-0.0.3.tar.gz", "has_sig": false, "md5_digest": "4e274c113c1c2bb8e0b750e56e7582b1", "packagetype": "sdist", "python_version": "source", "requires_python": ">=3.4", "size": 10740, "upload_time": "2019-07-03T21:20:40", "url": "https://files.pythonhosted.org/packages/3f/ea/6b15019a5ec4cea1d5b952ec62cf1a85b9b2961333ad87b8cde34d8b56fa/ox_bqpipeline-0.0.3.tar.gz" } ] }