{ "info": { "author": "Giuseppe Tribulato", "author_email": "gtsystem@gmail.com", "bugtrack_url": null, "classifiers": [ "Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", "License :: OSI Approved :: MIT License", "Programming Language :: Python", "Programming Language :: Python :: 2.7", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.3", "Programming Language :: Python :: 3.4", "Programming Language :: Python :: 3.5" ], "description": "parallelpipe\n============\n\n|Build Status|\n\nparallelpipe is a pipeline parallelization library for Python.\n\nA pipeline is composed by one or more stages. Each stage take the output\nof the previous stage as an input and performs some operations on it\nlike map, filter, reduce, etc. This is an extension of the normal\nproducer/consumer pattern where we can have multiple stages. Every stage\nreceives the input data in a queue and push the results to another queue\nthat is connected with the next stage.\n\n \n\nIn this example we define a stage function that takes as an input an\niterator returning urls and return the corresponding content after\ndownloading it:\n\n.. code:: python\n\n from parallelpipe import stage\n import requests\n \n @stage(workers=4)\n def fetch_urls(urls):\n for url in urls:\n result = requests.get(url)\n yield result.content\n\nTo use this stage just run\n\n.. code:: python\n\n urls = ['http://test.com', ...]\n pipe = urls | fetch_urls\n for content in pipe.results():\n print(len(content))\n\nWe built a basic pipe with only one stage. This stage have 4 workers\nthat will start processing in parallel the input urls. The main process\nwill receive the downloaded content as soon as one of them is available\nand print the corresponding length. Notice that a pipeline input can be\nany iterable; this will be automatically wrapped into a stage.\n\nLet's say we are interested into the title string inside the HTML\ncontent. We can add another stage to do that:\n\n.. code:: python\n\n import re\n RE_TITLE = re.compile(\"(.*?)\", re.M)\n\n @stage(workers=2)\n def get_titles(contents):\n for content in contents:\n match = RE_TITLE.search(content)\n if match is not None:\n yield match.group(1)\n\n pipe = urls | fetch_urls | get_titles\n for title in pipe.results():\n print(title)\n\nAgain this second stage will start processing the content as soon as it\nis available and yield his output. Notice that also this task is\nparallelized, since we set workers to 2. As you can see this stage is\nnot an exact map, since the number of titles returned could be less then\nthe number of document (we check for the presence of the title tag).\n\nLet's add now one more stage to return the most common title:\n\n.. code:: python\n\n from collections import Counter\n\n @stage()\n def most_common(titles):\n commons = Counter(titles).most_common(1)\n yield commons[0]\n\n pipe = urls | fetch_urls | get_titles | most_common\n print(pipe.execute())\n\nTo calculate the most common title we need to aggregate all results, so\nwe can use only one worker. We also use ``pipe.execute()`` instead of\n``pipe.results()`` because we know only one result will be returned.\n\nParametrical stage\n------------------\n\n.. code:: python\n\n @stage(workers=4)\n def add_n(input, n):\n for number in input:\n yield number + n\n\n pipe = range(100) | add_n(7)\n for result in pipe.results():\n print(result)\n\nIn this example our stage function not only require the input iterator\nbut also one or more extra parameter to perform his computation. In the\nmoment we build our pipeline we can configure this extra paramerters\nsimply calling the stage with them as input. Remember, all parameters\ncan be passed except for the first one that is the mandatory input\niterator.\n\nMapping stage\n-------------\n\nIf your stage do pure mapping, i.e. it return exactly one result for\nevery input element you can simplify your code using the ``map_stage``\ndecorator:\n\n.. code:: python\n\n from parallelpipe import map_stage\n\n @map_stage(workers=4)\n def add_n(number, n):\n return number + n\n\nQueue Size\n----------\n\nWhen you build a stage you can define how big his output queue is.\nSetting an output queue limit can be useful if the current stage can\nproduce much faster then how the following stage can consume. In this\ncase, once the queue size is reached the stage stop processing his input\nand wait for the consumer to free a slot.\n\n.. code:: python\n\n # only 30 elements can queue in output before blocking this stage\n @stage(workers=4, qsize=30)\n def add_n(input, n):\n for number in input:\n yield number + n\n\nBy default ``qsize=0`` that means the queue have no limit.\n\nSetup a stage\n-------------\n\nSetup the stage queue and workers can also be done after defining the\nstage calling the ``setup()`` method.\n\n.. code:: python\n\n add_n.setup(workers=2, qsize=0)\n\nUse the Stage class directly\n----------------------------\n\nSo far we built stages using decorators on functions, but we can also\nuse the Stage class directly:\n\n.. code:: python\n\n from parallelpipe import Stage\n\n def add_n(input, n):\n for number in input:\n yield number + n\n \n pipe = Stage(range, 10) | Stage(add_n, 5)\n\nAs you can see in the previous example the Stage class take as input an\niterator function and any extra parameters needed by it. The first stage\nis a producer so will not be called with any input iterator. When we use\nthe stage class explictly we can use ``setup()`` to configure how many\nworkers we need and the queue size:\n\n.. code:: python\n\n pipe = Stage(range, 10).setup(qsize=5) | Stage(add_n, 5).setup(workers=2)\n\nThe ``setup()`` method return the stage itself, so we can set it up\nduring the pipeline definition.\n\nException handling\n------------------\n\nDuring the execution of your stage function an exception can occur. When\na stage detects an exception it will automatically consume and ignore\nall the input from the previous stage and then a ``TaskException`` will\nbe throw on the main process.\n\n.. code:: python\n\n @stage(workers=2)\n def add_one(numbers):\n for number in numbers:\n yield number + 1\n\n.. code:: python\n\n >>> pipe = [2, 3, \"ops\", 7] | add_one\n >>> print(sum(pipe.results()))\n Process add_one-0:\n Traceback (most recent call last):\n File \"/Users/gt/miniconda2/lib/python2.7/multiprocessing/process.py\", line 258, in _bootstrap\n self.run()\n File \"/Users/gt/Desktop/code/parallelpipe/parallelpipe.py\", line 67, in run\n for item in res:\n File \"example.py\", line 7, in add_one\n yield number + 1\n TypeError: cannot concatenate 'str' and 'int' objects\n Traceback (most recent call last):\n File \"example.py\", line 10, in \n print(sum(pipe.results()))\n File \"/Users/gt/Desktop/code/parallelpipe/parallelpipe.py\", line 249, in results\n raise TaskException(msg)\n parallelpipe.TaskException: The task \"add_one-0\" raised TypeError(\"cannot concatenate 'str' and 'int' objects\",)\n\nIf you want to avoid that a single bad input blocks your pipeline you\ncan of course catch any exception inside the stage function so that the\npipeline can continue and produce the rest of the results.\n\n.. |Build Status| image:: https://travis-ci.org/gtsystem/parallelpipe.svg?branch=master\n :target: https://travis-ci.org/gtsystem/parallelpipe", "description_content_type": null, "docs_url": null, "download_url": "UNKNOWN", "downloads": { "last_day": -1, "last_month": -1, "last_week": -1 }, "home_page": "https://github.com/gtsystem/parallelpipe", "keywords": null, "license": "MIT", "maintainer": null, "maintainer_email": null, "name": "parallelpipe", "package_url": "https://pypi.org/project/parallelpipe/", "platform": "UNKNOWN", "project_url": "https://pypi.org/project/parallelpipe/", "project_urls": { "Download": "UNKNOWN", "Homepage": "https://github.com/gtsystem/parallelpipe" }, "release_url": "https://pypi.org/project/parallelpipe/0.2.6/", "requires_dist": null, "requires_python": null, "summary": "Pipeline parallelization library", "version": "0.2.6" }, "last_serial": 2175756, "releases": { "0.2.6": [ { "comment_text": "", "digests": { "md5": "da1729109bccd111ec7baef09230a9c6", "sha256": "af3d506cb54cdb11bb64a1f7e433e6fcc546296d755e962490001e0cb0810e97" }, "downloads": -1, "filename": "parallelpipe-0.2.6.tar.gz", "has_sig": false, "md5_digest": "da1729109bccd111ec7baef09230a9c6", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 7244, "upload_time": "2016-06-19T13:25:00", "url": "https://files.pythonhosted.org/packages/71/a7/aa834d3446e53a2c223cb16d20058a79070ae8e1a4cf98677dd63835baee/parallelpipe-0.2.6.tar.gz" } ] }, "urls": [ { "comment_text": "", "digests": { "md5": "da1729109bccd111ec7baef09230a9c6", "sha256": "af3d506cb54cdb11bb64a1f7e433e6fcc546296d755e962490001e0cb0810e97" }, "downloads": -1, "filename": "parallelpipe-0.2.6.tar.gz", "has_sig": false, "md5_digest": "da1729109bccd111ec7baef09230a9c6", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 7244, "upload_time": "2016-06-19T13:25:00", "url": "https://files.pythonhosted.org/packages/71/a7/aa834d3446e53a2c223cb16d20058a79070ae8e1a4cf98677dd63835baee/parallelpipe-0.2.6.tar.gz" } ] }