{ "info": { "author": "Ricardo Pascal", "author_email": "voorloop@gmail.com", "bugtrack_url": null, "classifiers": [], "description": "# PipeFrame\nWhat is a Pipeline? \n\n> In computing, a pipeline, also known as a data pipeline, is a set of data processing elements connected in series, where the output of one element is the input of the next one. The elements of a pipeline are often executed in parallel or in time-sliced fashion. Some amount of buffer storage is often inserted between elements.\n\n_source: Wikipedia_\n\nPipeFrame is a small **pipe**line **frame**work that helps you process data (stream or batch) by taking advantage of Python's `multiprocessing` library.\n\n## Installation\n\nThe package is available on PyPI. To install it in your environment, run:\n\n > pip install pipeframe\n\n## Getting started\n\n### Create your pipeline\nThe first thing to do is create your pipeline. It should inherit from `pipeframe.core.PipelineEngine`\nand include a `steps` class attribute:\n\n```python3\nfrom pipeframe.core import PipelineEngine\n\n\nclass YourCustomPipeline(PipelineEngine):\n    steps = [func1, func2, ...]\n```\n\nThe pipeline will pass each entry through the functions listed in `steps`. 
You can \ndefine any number of functions to apply to your data; they are executed in the\nsame order in which they appear in the `steps` list.\n\nEach function receives the record to be processed as its parameter and must return\nthe modified record together with a boolean: `False` bypasses the remaining steps\nfor that record, while `True` keeps it going through the pipeline flow.\n\n```python3\ndef func1(record):\n    if record.isupper():\n        # Already upper case: skip the remaining steps for this record\n        return record, False\n    else:\n        # Lower-case the record and continue with the next step\n        return record.lower(), True\n```\n\nYou also have to provide a method named `feed` that feeds your pipeline with data by putting entries into the `bucket` it receives:\n\n```python3\nimport requests\nfrom pipeframe.core import PipelineEngine\n\n\nclass YourCustomPipeline(PipelineEngine):\n    steps = [func1, func2, ...]\n\n    def feed(self, bucket):\n        req = requests.get('https://www.reddit.com/r/all/top.json', headers={'User-agent': 'pipeframe'})\n        if req.status_code == 200:\n            data = req.json()['data']['children']\n            for entry in data:\n                bucket.put(entry['data'])\n```\n\n### Run your pipeline\n\nTo execute your newly created pipeline, run it with the `PipeFrame` executor:\n\n```python3\nfrom pipeframe.core import PipeFrame\n\npipe_frame = PipeFrame(cpu_count=16, stream_buffer_size=50000)\npipe_frame.run(YourCustomPipeline)\n```\n\nBoth `cpu_count` and `buffer_size` are optional arguments:\n\n - `cpu_count`: an integer that defaults to the number of cores on your machine minus 1\n - `buffer_size`: an integer that defaults to 10000\n\n## Stream or Batch?\n\nBy default your pipeline runs in batch mode, which means that **your feed function runs to completion before the step\nfunctions start**. You need to be aware of how many data entries go into the queue and tune _buffer_size_\naccordingly.\n\nIf you set _source='stream'_, **your feed function starts after the step functions, and feeding and processing\nhappen in parallel**. 
In that case, you should set the `timeout` attribute to a value high enough to prevent the\npipeline from terminating due to a temporary absence of data in the queue (for cases where your data ingestion is slower than\nyour capacity to process it).\n\nExample:\n\n```python3\nclass YourCustomPipeline(PipelineEngine):\n    steps = [func1, func2, ...]\n    source = 'stream'\n    timeout = 10  # seconds the workers wait for new data\n\n    def feed(self, bucket):\n        for entry in infinite_stream_of_data():\n            bucket.put(entry)\n```\n\nIn the example above, your workers will wait up to 10 seconds for the `infinite_stream_of_data()` function to produce\nnew data to be processed. If no new data arrives within 10 seconds, the workers terminate because your stream has dried up.\n\n\n## Full example\n\n```python\nimport fcntl  # Unix-only: used for file locking\nimport json\n\nfrom pipeframe.core import PipelineEngine, PipeFrame\n\n\ndef clear_entry(entry):\n    entry['new_number'] = 0\n    return entry, True\n\n\ndef power(entry):\n    entry['new_number'] = entry['number'] ** entry['number']\n    return entry, True\n\n\ndef write_to_disk(entry):\n    \"\"\"\n    Lock the file, write entry, release the file.\n    \"\"\"\n    with open(\"log\", \"a\") as fh:\n        fcntl.flock(fh, fcntl.LOCK_EX)\n        fh.write(json.dumps(entry['number']) + '\\n')\n        fh.flush()  # write buffered data before releasing the lock\n        fcntl.flock(fh, fcntl.LOCK_UN)\n\n    return entry, True\n\n\nclass PowerDataPipeline(PipelineEngine):\n    steps = [clear_entry, power, write_to_disk]\n    source = 'batch'\n\n    @staticmethod\n    def feed(bucket):\n        x = 1000000\n        for _ in range(10):\n            x += 1000\n            entry = {'number': x}\n            bucket.put(entry)\n\n\nif __name__ == '__main__':\n    # Using all CPUs minus one (the default)\n    pipe_frame = PipeFrame()\n    pipe_frame.run(PowerDataPipeline)\n\n    # Using 2 CPUs\n    pipe_frame = PipeFrame(cpu_count=2)\n    pipe_frame.run(PowerDataPipeline)\n```\n\n", "description_content_type": "text/markdown", "docs_url": null, "download_url": "", "downloads": { "last_day": -1, "last_month": -1, "last_week": -1 }, "home_page": "https://github.com/voorloopnul/pipeframe", "keywords": "", "license": "", "maintainer": "", "maintainer_email": "", 
"name": "pipeframe", "package_url": "https://pypi.org/project/pipeframe/", "platform": "", "project_url": "https://pypi.org/project/pipeframe/", "project_urls": { "Homepage": "https://github.com/voorloopnul/pipeframe" }, "release_url": "https://pypi.org/project/pipeframe/0.0.4/", "requires_dist": null, "requires_python": ">=3.4", "summary": "PipeFrame - Simple module to write multiprocessing data pipelines with python.", "version": "0.0.4" }, "last_serial": 5985907, "releases": { "0.0.1": [ { "comment_text": "", "digests": { "md5": "3b96dba8d8f07cb9aa7a907e4aad5ce7", "sha256": "b052e14e219427382037ff5fd49d91ae3af8a7436827b8df31f44d3fc372f1a9" }, "downloads": -1, "filename": "pipeframe-0.0.1-py3-none-any.whl", "has_sig": false, "md5_digest": "3b96dba8d8f07cb9aa7a907e4aad5ce7", "packagetype": "bdist_wheel", "python_version": "py3", "requires_python": ">=3.4", "size": 1937, "upload_time": "2019-10-15T16:16:41", "url": "https://files.pythonhosted.org/packages/12/bf/d71e52ad419f208bd885d9013f1e6ed10a3732914711fb3c2e4f107f96e4/pipeframe-0.0.1-py3-none-any.whl" }, { "comment_text": "", "digests": { "md5": "bb3ca31446fc41ccad50f841eb7aa362", "sha256": "51ca925c59ea6cfa9d3c3ae961034544d53e851ced2e07cfa8c962e4f074c18a" }, "downloads": -1, "filename": "pipeframe-0.0.1.tar.gz", "has_sig": false, "md5_digest": "bb3ca31446fc41ccad50f841eb7aa362", "packagetype": "sdist", "python_version": "source", "requires_python": ">=3.4", "size": 1556, "upload_time": "2019-10-15T16:16:44", "url": "https://files.pythonhosted.org/packages/c3/a8/63dceae897065becf5f334f8c3368098a52c33fc3af1b1b9738e70018c0a/pipeframe-0.0.1.tar.gz" } ], "0.0.2": [ { "comment_text": "", "digests": { "md5": "e16c8ba8abd7454debc09c9bb7348cbc", "sha256": "5479b51353b617a3276cb86c8ee279c6f924bbc968863dc3f2f0bc87d6dfc66c" }, "downloads": -1, "filename": "pipeframe-0.0.2-py3-none-any.whl", "has_sig": false, "md5_digest": "e16c8ba8abd7454debc09c9bb7348cbc", "packagetype": "bdist_wheel", "python_version": "py3", 
"requires_python": ">=3.4", "size": 2675, "upload_time": "2019-10-15T16:21:18", "url": "https://files.pythonhosted.org/packages/0a/16/2f3ecc2f77148f22da7f05a0483c00a3a010997a08f929e24b3039d532f1/pipeframe-0.0.2-py3-none-any.whl" }, { "comment_text": "", "digests": { "md5": "dc34043159dcab86b061c85a114c1e9a", "sha256": "6bffd6259c9bdd36f6059abc028ae1d5defc26a63188b39f494794975bee5890" }, "downloads": -1, "filename": "pipeframe-0.0.2.tar.gz", "has_sig": false, "md5_digest": "dc34043159dcab86b061c85a114c1e9a", "packagetype": "sdist", "python_version": "source", "requires_python": ">=3.4", "size": 1801, "upload_time": "2019-10-15T16:21:19", "url": "https://files.pythonhosted.org/packages/4c/b9/a9f0dc48764541575d7fcd65025df6b30918abe30b712b7cbaed0b5c88c8/pipeframe-0.0.2.tar.gz" } ], "0.0.3": [ { "comment_text": "", "digests": { "md5": "19f4e80ac0b2bf76966f44fe3e618cfb", "sha256": "2a33566c80b07dc6da22a3f1e036e2994cc9364916d1c1c9116384516fbe06fd" }, "downloads": -1, "filename": "pipeframe-0.0.3-py3-none-any.whl", "has_sig": false, "md5_digest": "19f4e80ac0b2bf76966f44fe3e618cfb", "packagetype": "bdist_wheel", "python_version": "py3", "requires_python": ">=3.4", "size": 3819, "upload_time": "2019-10-16T18:49:38", "url": "https://files.pythonhosted.org/packages/5c/28/42c04c38836e736f3a96244e21f32a06447e40ba1cc2e7f06f5d048e186b/pipeframe-0.0.3-py3-none-any.whl" }, { "comment_text": "", "digests": { "md5": "45f0e7a2d61877ec2d62792df9dd5fa3", "sha256": "d584123efecd9780764eac4c7d0b7aa436010b2e82ea804a416520125e9d0138" }, "downloads": -1, "filename": "pipeframe-0.0.3.tar.gz", "has_sig": false, "md5_digest": "45f0e7a2d61877ec2d62792df9dd5fa3", "packagetype": "sdist", "python_version": "source", "requires_python": ">=3.4", "size": 3270, "upload_time": "2019-10-16T18:49:40", "url": "https://files.pythonhosted.org/packages/9c/44/cef1e9792268c7c6b91ef361fc66b5d09e5bdfbb90365b93cf3fee7cfad4/pipeframe-0.0.3.tar.gz" } ], "0.0.4": [ { "comment_text": "", "digests": { "md5": 
"9554592828f967ea49debdfd6c996e56", "sha256": "9493df10f839f2261e31f13a8475bc6ffc97eb831cafdba65b8e807a6f5d013d" }, "downloads": -1, "filename": "pipeframe-0.0.4-py3-none-any.whl", "has_sig": false, "md5_digest": "9554592828f967ea49debdfd6c996e56", "packagetype": "bdist_wheel", "python_version": "py3", "requires_python": ">=3.4", "size": 4866, "upload_time": "2019-10-16T19:01:21", "url": "https://files.pythonhosted.org/packages/c4/73/beb5a2d45d3d4935a36a0465ef78b34bf8e7539d427a8357736d56cc69aa/pipeframe-0.0.4-py3-none-any.whl" }, { "comment_text": "", "digests": { "md5": "4db38900d16ccca76dc643c8819a82d1", "sha256": "2a399f986d8e6e46e96f92f34f09c1ef5ba88bb835ae7576a2ca7c1351a42423" }, "downloads": -1, "filename": "pipeframe-0.0.4.tar.gz", "has_sig": false, "md5_digest": "4db38900d16ccca76dc643c8819a82d1", "packagetype": "sdist", "python_version": "source", "requires_python": ">=3.4", "size": 3954, "upload_time": "2019-10-16T19:01:22", "url": "https://files.pythonhosted.org/packages/3a/65/a9c0a5b7f0f7d2d951ada42af7bcea5b4f7383edf901034e4755e9a9e96d/pipeframe-0.0.4.tar.gz" } ] }, "urls": [ { "comment_text": "", "digests": { "md5": "9554592828f967ea49debdfd6c996e56", "sha256": "9493df10f839f2261e31f13a8475bc6ffc97eb831cafdba65b8e807a6f5d013d" }, "downloads": -1, "filename": "pipeframe-0.0.4-py3-none-any.whl", "has_sig": false, "md5_digest": "9554592828f967ea49debdfd6c996e56", "packagetype": "bdist_wheel", "python_version": "py3", "requires_python": ">=3.4", "size": 4866, "upload_time": "2019-10-16T19:01:21", "url": "https://files.pythonhosted.org/packages/c4/73/beb5a2d45d3d4935a36a0465ef78b34bf8e7539d427a8357736d56cc69aa/pipeframe-0.0.4-py3-none-any.whl" }, { "comment_text": "", "digests": { "md5": "4db38900d16ccca76dc643c8819a82d1", "sha256": "2a399f986d8e6e46e96f92f34f09c1ef5ba88bb835ae7576a2ca7c1351a42423" }, "downloads": -1, "filename": "pipeframe-0.0.4.tar.gz", "has_sig": false, "md5_digest": "4db38900d16ccca76dc643c8819a82d1", "packagetype": "sdist", 
"python_version": "source", "requires_python": ">=3.4", "size": 3954, "upload_time": "2019-10-16T19:01:22", "url": "https://files.pythonhosted.org/packages/3a/65/a9c0a5b7f0f7d2d951ada42af7bcea5b4f7383edf901034e4755e9a9e96d/pipeframe-0.0.4.tar.gz" } ] }