{ "info": { "author": "Jorge Alpedrinha Ramos", "author_email": "python@uphold.com", "bugtrack_url": null, "classifiers": [], "description": "aiopype\n=======\n\nPython asynchronous data pipelines\n\n**aiopype** allows running continuous data pipelines reliably with a\nplain simple approach to their development.\n\n**Aiopype** creates a centralized message handler to allow every\nprocessor to work as an independent non-blocking message\nproducer/consumer.\n\n**Aiopype** has 4 main concepts:\n\n- Flow\n- Manager\n- Processor\n- Message Handler\n\nFlow\n----\n\nThe Flow is **aiopype**'s main component. A flow is the entrypoint for\nreliability running pipeline managers.\n\n``Flow`` is responsible for:\n\n- Starting all registered managers\n- Handling manager failures\n- Reporting errors\n- Restarting failed managers\n\nManager\n-------\n\nThe manager is responsible for registering a data pipeline from top to\nbottom. This means it must register a source and connect it with it's\nconsumers, until the pipeline finally outputs.\n\nProcessor\n---------\n\nA processor is a message consumer/producer.\n\nSources\n~~~~~~~\n\nSources are special cases of processors. Their special characteristic is\nthat they can run forever, and are the starting point of any pipeline.\n\nExamples of sources may be:\n\n- A ``REST API`` poller\n- An ``Websocket`` client\n- A ``Cron`` job\n\nMessage handler\n---------------\n\nThe message handler is the central piece that allows **aiopype** to\nscale.\n\nA Flow will start one or more Sources as the starting point for each\nregistered Manager. Once a Source produces an event, a message will be\ntriggered and the handler will identify and fire the corresponding\nhandlers.\n\nThere are two available message handlers:\n\n- SyncProtocol\n- AsyncProtocol\n\nSyncProtocol\n------------\n\nThe synchronous event handler is, as its name suggests, synchronous,\nmeaning that once the source emits a message, it must be handled until\nthe end of the pipeline and the source can proceed with it's normal\nbehavior. This is good for development purposes but fails to meet the\nasynchronous event driven pattern required to allowing component\nisolation.\n\nAsyncProtocol\n-------------\n\nThe main difference between SyncProtocol and AsyncProtocol is that the\nlatter uses a decoupled event loop to assess if there are new messages\nin the queue for processing, whilst the first simply starts processing\nreceived messages instantaneously. This allows total isolation of\nprocessors.\n\nExample\n=======\n\nApple stock processor.\n\nSource\n------\n\nOur source will be ``Yahoo Finance`` for gathering data from ``AAPL``\nticker price. We'll use **aiopype** ``RestSource`` as a base class.\n\n.. code:: py\n\n from aiopype.sources import RestSource\n\n\n class YahooRestSource(RestSource):\n \"\"\"\n Yahoo REST API source.\n \"\"\"\n def __init__(self, name, handler, symbol):\n super().__init__(\n name,\n handler,\n 'http://finance.yahoo.com/webservice/v1/symbols/{}/quote?format=json&view=detail'.format(symbol), {\n 'exception_threshold': 10,\n 'request_interval': 30\n }\n )\n\nProcessor\n---------\n\nOur sample processor will simply extract the price from the returned\njson.\n\n.. code:: py\n\n from aiopype import Processor\n\n\n class HandleRawData(Processor):\n def handle(self, data, time):\n self.emit('price', time, data['list']['resources'][0]['resource']['fields']['price'])\n\nOutput\n------\n\nOur output processor will write price data onto a CSV File.\n\n.. code:: py\n\n import csv\n\n\n class CSVOutput(Processor):\n def __init__(self, name, handler, filename):\n super().__init__(name, handler)\n self.filename = filename\n\n with open(self.filename, 'w', newline = '') as csvfile:\n writer = csv.writer(csvfile, delimiter = ';')\n writer.writerow(['time', 'price'])\n\n def write(self, time, price):\n with open(self.filename, 'w', newline = '') as csvfile:\n writer = csv.writer(csvfile, delimiter = ';')\n writer.writerow([time, price])\n\nManager\n-------\n\nThe manager will instantiate ``Source``, ``Processor`` and ``Output``.\nIt will connect ``Source``'s ``data`` event to ``Processor.handle``\nhandler and ``Processor``'s ``price`` event to ``Output.write`` handler.\nThis will be our data pipeline.\n\n.. code:: py\n\n from aiopype import Manager\n\n\n class YahooManager(Manager):\n name = 'yahoo_apple'\n\n def __init__(self, handler):\n super().__init__(handler)\n self.processor = HandleRawData(self.build_processor_name('processor'), self.handler)\n self.source = YahooRestSource(self.build_processor_name('source'), self.handler, 'AAPL')\n self.writer = CSVOutput(self.build_processor_name('writer'), self.handler, 'yahoo_appl.csv')\n\n self.source.on('data', self.processor.handle)\n self.processor.on('price', self.writer.write)\n\nFlow\n----\n\nOur flow config will have the ``yahoo_apple`` manager only.\n\n.. code:: py\n\n from aiopype import AsyncFlow\n\n\n class FlowConfig(object):\n FLOWS = ['yahoo_apple']\n\n dataflow = AsyncFlow(FlowConfig())\n\nMain method:\n------------\n\nWill simply start the dataflow.\n\n.. code:: py\n\n if __name__ == \"__main__\":\n dataflow.start()\n\nRunning the example\n-------------------\n\nCompile all the above code in a file called ``example.py`` and run:\n\n.. code:: sh\n\n python example.py\n\nClusters\n========\n\nWIP:\n----\n\nThis decentralized mechanism makes distributed pipelines a possibility,\nif we have coordination between nodes.\n\nChangelog\n---------\n\n0.1.4 / 2016-07-14\n~~~~~~~~~~~~~~~~~~\n\n- `#10 `__ Avoid unfinished\n flows (@jAlpedrinha)\n\n0.1.3 / 2016-07-11\n~~~~~~~~~~~~~~~~~~\n\n- `#8 `__ Fix AsyncProtocol\n termination condition (@jAlpedrinha)\n\n0.1.2 / 2016-07-06\n~~~~~~~~~~~~~~~~~~\n\n- `#6 `__ Handle exceptions\n from async protocol listener (@jAlpedrinha)\n\n0.1.1 / 2016-07-05\n~~~~~~~~~~~~~~~~~~\n\n- `#4 `__ Avoid failure on\n pusherclient disconnection (@jAlpedrinha)\n\n0.1.0 / 2016-07-05\n~~~~~~~~~~~~~~~~~~\n\n- `#1 `__ Add flow manager\n and processors (@jAlpedrinha)", "description_content_type": null, "docs_url": null, "download_url": "UNKNOWN", "downloads": { "last_day": -1, "last_month": -1, "last_week": -1 }, "home_page": "https://www.github.com/uphold/aiopype/", "keywords": null, "license": "LICENSE", "maintainer": null, "maintainer_email": null, "name": "aiopype", "package_url": "https://pypi.org/project/aiopype/", "platform": "UNKNOWN", "project_url": "https://pypi.org/project/aiopype/", "project_urls": { "Download": "UNKNOWN", "Homepage": "https://www.github.com/uphold/aiopype/" }, "release_url": "https://pypi.org/project/aiopype/0.1.4/", "requires_dist": null, "requires_python": null, "summary": "AioPype - Flow based programming with asyncio", "version": "0.1.4" }, "last_serial": 2220567, "releases": { "0.0.63": [ { "comment_text": "", "digests": { "md5": "60d92a23f9a3dbc7a99d72f3a4007ecb", "sha256": "0072620e70f790b77626c92d434eefee31273256002fc5087f54e122b6d0bdc0" }, "downloads": -1, "filename": "aiopype-0.0.63.tar.gz", "has_sig": false, "md5_digest": "60d92a23f9a3dbc7a99d72f3a4007ecb", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 22519, "upload_time": "2016-07-04T14:19:58", "url": "https://files.pythonhosted.org/packages/1f/2d/44ae23daad7353c792e7f1bcb60ebdb2b2060d6753e9511539f74daca326/aiopype-0.0.63.tar.gz" } ], "0.1.0": [ { "comment_text": "", "digests": { "md5": "e0b02eb26e47f788ec7029a2c2a310a6", "sha256": "a6b3bf7ed783b4897a91f6be1a4d11bf7fa6ab659361e6f47f0ad8f929241a9d" }, "downloads": -1, "filename": "aiopype-0.1.0.tar.gz", "has_sig": false, "md5_digest": "e0b02eb26e47f788ec7029a2c2a310a6", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 23143, "upload_time": "2016-07-05T14:46:11", "url": "https://files.pythonhosted.org/packages/da/c3/11f34c1633e1772695d071c34da19d1a39057002ce13ae0ca177121ee19e/aiopype-0.1.0.tar.gz" } ], "0.1.1": [ { "comment_text": "", "digests": { "md5": "351693689fcd2da8b38087f3d0c48da1", "sha256": "ac487047fde155aee187e22e8c62e90087e9167c9c7c9bcb305d46f84a784483" }, "downloads": -1, "filename": "aiopype-0.1.1.tar.gz", "has_sig": false, "md5_digest": "351693689fcd2da8b38087f3d0c48da1", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 23179, "upload_time": "2016-07-05T16:58:56", "url": "https://files.pythonhosted.org/packages/38/06/b2661f6343ef9fa94f6d3882966b90fd5f70093dd64556b3753cdd67f97a/aiopype-0.1.1.tar.gz" } ], "0.1.2": [ { "comment_text": "", "digests": { "md5": "d805d0cd77d83b6cc8f439baa95735e7", "sha256": "f85ac6552f8c97246a13ba02d35d2419cc460017e732637d0f975ad7e8dd5dc4" }, "downloads": -1, "filename": "aiopype-0.1.2.tar.gz", "has_sig": false, "md5_digest": "d805d0cd77d83b6cc8f439baa95735e7", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 23412, "upload_time": "2016-07-06T16:15:01", "url": "https://files.pythonhosted.org/packages/92/4c/e8269de3db35f63813cb9698b7ee3fefae2c0024b6e2b18edaa1fba19cde/aiopype-0.1.2.tar.gz" } ], "0.1.3": [ { "comment_text": "", "digests": { "md5": "5da02ffe32367195c9361674a26a38d6", "sha256": "ff564146500d82c952a2bf184fd1d8c70163cd4d47d724399d73d2a6d5861598" }, "downloads": -1, "filename": "aiopype-0.1.3.tar.gz", "has_sig": false, "md5_digest": "5da02ffe32367195c9361674a26a38d6", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 23477, "upload_time": "2016-07-11T09:29:00", "url": "https://files.pythonhosted.org/packages/39/2e/0f8190b926dd4ba3178f6af206adb46fa1894ebca1892dc9b10f981bf7cc/aiopype-0.1.3.tar.gz" } ], "0.1.4": [ { "comment_text": "", "digests": { "md5": "f8cedb94ac81ece658ce25a103d98c36", "sha256": "8f971ff82a3948c9a4557046480d7774762458ac4d79d0ad008e3d6096db694c" }, "downloads": -1, "filename": "aiopype-0.1.4.tar.gz", "has_sig": false, "md5_digest": "f8cedb94ac81ece658ce25a103d98c36", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 24048, "upload_time": "2016-07-14T09:51:11", "url": "https://files.pythonhosted.org/packages/92/b6/418438dad9ba5d79450a7f978ee9782bd27d54ac0cc5d74fd25c5bf69759/aiopype-0.1.4.tar.gz" } ] }, "urls": [ { "comment_text": "", "digests": { "md5": "f8cedb94ac81ece658ce25a103d98c36", "sha256": "8f971ff82a3948c9a4557046480d7774762458ac4d79d0ad008e3d6096db694c" }, "downloads": -1, "filename": "aiopype-0.1.4.tar.gz", "has_sig": false, "md5_digest": "f8cedb94ac81ece658ce25a103d98c36", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 24048, "upload_time": "2016-07-14T09:51:11", "url": "https://files.pythonhosted.org/packages/92/b6/418438dad9ba5d79450a7f978ee9782bd27d54ac0cc5d74fd25c5bf69759/aiopype-0.1.4.tar.gz" } ] }