{ "info": { "author": "Mara contributors", "author_email": "", "bugtrack_url": null, "classifiers": [], "description": "# Mara ETL Tools\n\n[![Build Status](https://travis-ci.org/mara/mara-etl-tools.svg?branch=master)](https://travis-ci.org/mara/mara-etl-tools)\n[![PyPI - License](https://img.shields.io/pypi/l/mara-etl-tools.svg)](https://github.com/mara/mara-etl-tools/blob/master/LICENSE)\n[![PyPI version](https://badge.fury.io/py/mara-etl-tools.svg)](https://badge.fury.io/py/mara-etl-tools)\n[![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](https://communityinviter.com/apps/mara-users/public-invite)\n\nA collection of utilities around [Project A](https://project-a.com/)'s best practices for creating [data integration](https://github.com/mara/data-integration) pipelines with mara. The package is intended as a starting point for new projects. Forks/copies are preferred over PRs.\n\nFor more details on how to use this package, have a look at the [mara example project](https://github.com/mara/mara-example-project).\n\n\nThe package consists of a number of modules that can all be used independently of each other:\n\n## SQL utility functions\n\nThe function `utils_pipeline` in [etl_tools/initialize_utils/__init__.py](etl_tools/initialize_utils/__init__.py) returns a pipeline that creates a `util` schema with a number of PostgreSQL functions for organizing data pipelines. Add it to your root pipeline like this:\n\n```python\nfrom etl_tools import initialize_utils\n\nmy_pipeline.add(initialize_utils.utils_pipeline(with_hll=True, with_cstore_fdw=True))\n```\n\nSee the .sql files in [etl_tools/initialize_utils](etl_tools/initialize_utils) for the available functions.\n\n\n## Schema copying\n\nThe file [etl_tools/schema_copying.py](etl_tools/schema_copying.py) contains the function `add_schema_copying_to_pipeline` that copies a PostgreSQL database schema from one host to another at the end of a pipeline run. 
This is useful for running the ETL and frontend tools on different database servers so that a running ETL does not affect the performance of dashboard queries.\n\n\nAssuming a pipeline `my_pipeline` whose child pipelines have the `Schema` label set to the respective schema to copy, schema copying can be added to those child pipelines like this:\n\n```python\nfrom mara_db import dbs\nfrom data_integration.commands.sql import ExecuteSQL\nfrom data_integration.pipelines import Task\nfrom etl_tools.schema_copying import add_schema_copying_to_pipeline\n\n# when etl and frontend db are different, add schema copying\nif dbs.db('mdwh-etl').database != dbs.db('mdwh-frontend').database \\\n        or dbs.db('mdwh-etl').host != dbs.db('mdwh-frontend').host:\n\n    # run some of the files from etl_tools/initialize_utils in frontend db\n    initialize_frontend_db_commands = [ExecuteSQL(\n        sql_statement=\"DROP SCHEMA IF EXISTS util CASCADE; CREATE SCHEMA util;\", db_alias='mdwh-frontend')]\n\n    for file_name in ['schema_switching.sql', 'data_sets.sql', 'hll.sql', 'cstore_fdw.sql']:\n        initialize_frontend_db_commands.append(\n            ExecuteSQL(sql_file_name=str(\n                my_pipeline.nodes['utils'].nodes['initialize_utils'].base_path() / file_name),\n                db_alias='mdwh-frontend'))\n\n    my_pipeline.nodes['utils'].add(\n        Task(id='initialize_frontend_db',\n             description='Adds some functions to the frontend db so that schema copying works',\n             commands=initialize_frontend_db_commands))\n\n    # Add schema copying for time schema\n    add_schema_copying_to_pipeline(pipeline=my_pipeline.nodes['utils'].nodes['create_time_dimensions'],\n                                   schema_name='time',\n                                   source_db_alias='mdwh-etl', target_db_alias='mdwh-frontend')\n\n    # Add schema copying to all root pipelines\n    for pipeline in my_pipeline.nodes.values():\n        if \"Schema\" in pipeline.labels:\n            schema = pipeline.labels['Schema']\n            add_schema_copying_to_pipeline(pipeline=pipeline, schema_name=schema + '_next',\n                                           source_db_alias='mdwh-etl', 
target_db_alias='mdwh-frontend')\n            pipeline.final_node.commands_after.append(\n                ExecuteSQL(sql_statement=f\"SELECT util.replace_schema('{schema}', '{schema}_next')\",\n                           db_alias='mdwh-frontend')\n            )\n```\n\n\n## Time dimensions\n\nThe file [etl_tools/create_time_dimensions/__init__.py](etl_tools/create_time_dimensions/__init__.py) defines a pipeline that creates and updates a `time` schema with the tables `day` and `duration`:\n\n```\nselect * from time.day order by _date desc limit 10;\n day_id | day_name | year_id | iso_year_id | quarter_id | quarter_name | month_id | month_name | week_id | week_name | day_of_week_id | day_of_week_name | day_of_month_id | _date \n ----------+------------------+---------+-------------+------------+--------------+----------+------------+---------+--------------+----------------+------------------+-----------------+------------\n 20190815 | Thu, Aug 15 2019 | 2019 | 2019 | 20193 | 2019 Q3 | 201908 | 2019 Aug | 201933 | 2019 - CW 33 | 4 | Thursday | 15 | 2019-08-15\n 20190814 | Wed, Aug 14 2019 | 2019 | 2019 | 20193 | 2019 Q3 | 201908 | 2019 Aug | 201933 | 2019 - CW 33 | 3 | Wednesday | 14 | 2019-08-14\n 20190813 | Tue, Aug 13 2019 | 2019 | 2019 | 20193 | 2019 Q3 | 201908 | 2019 Aug | 201933 | 2019 - CW 33 | 2 | Tuesday | 13 | 2019-08-13\n 20190812 | Mon, Aug 12 2019 | 2019 | 2019 | 20193 | 2019 Q3 | 201908 | 2019 Aug | 201933 | 2019 - CW 33 | 1 | Monday | 12 | 2019-08-12\n 20190811 | Sun, Aug 11 2019 | 2019 | 2019 | 20193 | 2019 Q3 | 201908 | 2019 Aug | 201932 | 2019 - CW 32 | 7 | Sunday | 11 | 2019-08-11\n 20190810 | Sat, Aug 10 2019 | 2019 | 2019 | 20193 | 2019 Q3 | 201908 | 2019 Aug | 201932 | 2019 - CW 32 | 6 | Saturday | 10 | 2019-08-10\n 20190809 | Fri, Aug 09 2019 | 2019 | 2019 | 20193 | 2019 Q3 | 201908 | 2019 Aug | 201932 | 2019 - CW 32 | 5 | Friday | 9 | 2019-08-09\n 20190808 | Thu, Aug 08 2019 | 2019 | 2019 | 20193 | 2019 Q3 | 201908 | 2019 Aug | 201932 | 2019 - CW 32 | 4 | Thursday | 8 | 2019-08-08\n 20190807 | Wed, Aug 07 
2019 | 2019 | 2019 | 20193 | 2019 Q3 | 201908 | 2019 Aug | 201932 | 2019 - CW 32 | 3 | Wednesday | 7 | 2019-08-07\n 20190806 | Tue, Aug 06 2019 | 2019 | 2019 | 20193 | 2019 Q3 | 201908 | 2019 Aug | 201932 | 2019 - CW 32 | 2 | Tuesday | 6 | 2019-08-06\n```\n\n```\nselect * from time.duration where duration_id >= 0 order by duration_id limit 10;\n duration_id | days | days_name | weeks | weeks_name | four_weeks | four_weeks_name | months | months_name | sixth_years | sixth_years_name | half_years | half_years_name | years | years_name \n-------------+------+-----------+-------+------------+------------+-----------------+--------+-------------+-------------+------------------+------------+-----------------+-------+------------\n 0 | 0 | 0 days | 0 | 0-6 days | 0 | 0-27 days | 0 | 0-29 days | 0 | 0-59 days | 0 | 0-179 days | 0 | 0-359 days\n 1 | 1 | 1 days | 0 | 0-6 days | 0 | 0-27 days | 0 | 0-29 days | 0 | 0-59 days | 0 | 0-179 days | 0 | 0-359 days\n 2 | 2 | 2 days | 0 | 0-6 days | 0 | 0-27 days | 0 | 0-29 days | 0 | 0-59 days | 0 | 0-179 days | 0 | 0-359 days\n 3 | 3 | 3 days | 0 | 0-6 days | 0 | 0-27 days | 0 | 0-29 days | 0 | 0-59 days | 0 | 0-179 days | 0 | 0-359 days\n 4 | 4 | 4 days | 0 | 0-6 days | 0 | 0-27 days | 0 | 0-29 days | 0 | 0-59 days | 0 | 0-179 days | 0 | 0-359 days\n 5 | 5 | 5 days | 0 | 0-6 days | 0 | 0-27 days | 0 | 0-29 days | 0 | 0-59 days | 0 | 0-179 days | 0 | 0-359 days\n 6 | 6 | 6 days | 0 | 0-6 days | 0 | 0-27 days | 0 | 0-29 days | 0 | 0-59 days | 0 | 0-179 days | 0 | 0-359 days\n 7 | 7 | 7 days | 1 | 7-13 days | 0 | 0-27 days | 0 | 0-29 days | 0 | 0-59 days | 0 | 0-179 days | 0 | 0-359 days\n 8 | 8 | 8 days | 1 | 7-13 days | 0 | 0-27 days | 0 | 0-29 days | 0 | 0-59 days | 0 | 0-179 days | 0 | 0-359 days\n 9 | 9 | 9 days | 1 | 7-13 days | 0 | 0-27 days | 0 | 0-29 days | 0 | 0-59 days | 0 | 0-179 days | 0 | 0-359 days\n\n```\n\nAdd the pipeline to your project with:\n\n```python\nfrom etl_tools import 
create_time_dimensions\n\nmy_pipeline.add(create_time_dimensions.pipeline)\n```\n\nSet the minimum and maximum dates by overriding `first_date_in_time_dimensions` and `last_date_in_time_dimensions` in [etl_tools/config.py](etl_tools/config.py).\n\n\n## Euro currency exchange rates\n\nThe file [etl_tools/load_euro_exchange_rates/__init__.py](etl_tools/load_euro_exchange_rates/__init__.py) contains a pipeline that loads (historic) Euro exchange rates from the European Central Bank.\n\n\nAdd it to your pipeline with:\n\n```python\nfrom etl_tools import load_euro_exchange_rates\n\nmy_pipeline.add(load_euro_exchange_rates.euro_exchange_rates_pipeline('db-alias'))\n```", "description_content_type": "text/markdown", "docs_url": null, "download_url": "", "downloads": { "last_day": -1, "last_month": -1, "last_week": -1 }, "home_page": "https://github.com/mara/mara-etl-tools", "keywords": "", "license": "MIT", "maintainer": "", "maintainer_email": "", "name": "mara-etl-tools", "package_url": "https://pypi.org/project/mara-etl-tools/", "platform": "", "project_url": "https://pypi.org/project/mara-etl-tools/", "project_urls": { "Homepage": "https://github.com/mara/mara-etl-tools" }, "release_url": "https://pypi.org/project/mara-etl-tools/3.0.0/", "requires_dist": null, "requires_python": ">=3.6", "summary": "Utilities for creating ETL pipelines with mara", "version": "3.0.0" }, "last_serial": 5496822, "releases": { "2.1.0": [ { "comment_text": "", "digests": { "md5": "32f7facc866f1d915a8aa0add43edfd0", "sha256": "9b22fcab6dc24c1bf9d9eedaa72aa539a875c63deffe19f049ddd1c31dc83618" }, "downloads": -1, "filename": "mara-etl-tools-2.1.0.tar.gz", "has_sig": false, "md5_digest": "32f7facc866f1d915a8aa0add43edfd0", "packagetype": "sdist", "python_version": "source", "requires_python": ">=3.6", "size": 11199, "upload_time": "2019-07-07T09:06:50", "url": "https://files.pythonhosted.org/packages/89/d5/27909a219fbfb20d74c5b0029112b9721e96ad23de26c3fc1c9b2bb581a1/mara-etl-tools-2.1.0.tar.gz" } ], 
"3.0.0": [ { "comment_text": "", "digests": { "md5": "875f6f4ce88b76aef4354f52a6f1c341", "sha256": "72b5a96ae241f296c7bcfaff0e89af94102d67a0a612f1b1fb253153c3bddd13" }, "downloads": -1, "filename": "mara-etl-tools-3.0.0.tar.gz", "has_sig": false, "md5_digest": "875f6f4ce88b76aef4354f52a6f1c341", "packagetype": "sdist", "python_version": "source", "requires_python": ">=3.6", "size": 11295, "upload_time": "2019-07-07T10:18:44", "url": "https://files.pythonhosted.org/packages/0b/d5/0f1cb74528444f9b1c76cfe84f2a50eea2a761b27da82919eb404b6d9734/mara-etl-tools-3.0.0.tar.gz" } ] }, "urls": [ { "comment_text": "", "digests": { "md5": "875f6f4ce88b76aef4354f52a6f1c341", "sha256": "72b5a96ae241f296c7bcfaff0e89af94102d67a0a612f1b1fb253153c3bddd13" }, "downloads": -1, "filename": "mara-etl-tools-3.0.0.tar.gz", "has_sig": false, "md5_digest": "875f6f4ce88b76aef4354f52a6f1c341", "packagetype": "sdist", "python_version": "source", "requires_python": ">=3.6", "size": 11295, "upload_time": "2019-07-07T10:18:44", "url": "https://files.pythonhosted.org/packages/0b/d5/0f1cb74528444f9b1c76cfe84f2a50eea2a761b27da82919eb404b6d9734/mara-etl-tools-3.0.0.tar.gz" } ] }