{ "info": { "author": "Tobias Guenther", "author_email": "orkan@tobias.io", "bugtrack_url": null, "classifiers": [], "description": "=====\nOrkan\n=====\n\nOrkan is a pipeline parallelization library, written in Python.\n\nMaking use of the multicore capabilities of one's machine in\nPython is often not as easy as it should be. Orkan aims to\nprovide a plain API to utilize those underused CPUs of yours\nin case you need some extra horsepower for your computation.\n\nCode Repo: https://github.com/tobigue/Orkan\n\n\nPipelines\n=========\n\nA pipeline is a chain of computations, in which the output of\none computation is the input to the next. Orkan allows pipeline\nprocessing of a finite number of elements, but also the processing\nof an infinite stream of elements. Different modules in the pipeline\ncan be processed in parallel, and each module can be run by multiple\nworkers.\n\nTaking its cue from the terminology of Storm,\nOrkan adopts the concept of spouts and bolts. In Orkan:\n\n**Spouts** are the processes which feed elements into the pipeline.\nThey are defined as functions accepting a callback function, which\nis used to pass an element into the pipeline. Examples of spouts\nare functions listening for input via HTTP requests, crawling the\ninternet, reading large files and sending off chunks for further\nprocessing, or just feeding the elements of an iterable into the pipeline::\n\n    big_numbers = [\n        112272535095293,\n        112582705942171,\n        112272535095293,\n        115280095190773,\n        115797848077099,\n        1099726899285419] * 5\n\n    def put_primes_spout(callback):\n        for n in big_numbers:\n            callback(n)\n\n**Bolts** are the processes inside the pipeline which do the further\nprocessing. 
They are defined as functions which accept an element from\nthe previous processing step and pass on a (possibly modified) element\nto the next module in the pipeline (or the list of results) with a\ncallback function::\n\n    import math\n\n    def is_prime_bolt(n, callback):\n        \"\"\"From http://docs.python.org/dev/library/concurrent.futures.html\"\"\"\n        if n % 2 == 0:\n            callback((n, False))\n            return  # stop here so only one result is passed on per element\n        sqrt_n = int(math.floor(math.sqrt(n)))\n        for i in range(3, sqrt_n + 1, 2):\n            if n % i == 0:\n                callback((n, False))\n                return\n        callback((n, True))\n\nTo make it convenient to use \"normal\" functions, you can also specify bolts\nwhich do not expect a callback function. In this case, the return value\nof the function is passed to the next module in the pipeline::\n\n    import math\n\n    def is_prime_bolt(n):\n        \"\"\"From http://docs.python.org/dev/library/concurrent.futures.html\"\"\"\n        if n % 2 == 0:\n            return n, False\n        sqrt_n = int(math.floor(math.sqrt(n)))\n        for i in range(3, sqrt_n + 1, 2):\n            if n % i == 0:\n                return n, False\n        return n, True\n\nNote that spouts and bolts will be started in separate\nPython processes. That means their in- and output elements have\nto be *picklable* and they should *not interact with non-threadsafe\nelements* in the main process. The passing of elements between the\ndifferent modules of the pipeline is realized using threadsafe Queues.\n\n\nUsage\n-----\n\nThis is how you set up and start a simple pipeline using the spout\nand bolt defined above::\n\n    from orkan import Pipeline\n\n    pipeline = Pipeline([(put_primes_spout, 1)], [(is_prime_bolt, 2)])\n    result = list(pipeline.start())\n\nThe pipeline is defined by passing a list of spouts and a list of\nbolts. Each element in a list is a tuple of the function to be\nexecuted and the number of workers to be spawned for this function.\nNote that if you run more than one\nworker for a function, the order of result elements might not correspond\nto the order of the respective input elements. 
If you need to relate\nthe result elements to the input elements, you should pass the input\nelements along the pipeline (e.g. by returning a tuple in each bolt).\n\nBy default, a pipeline is started with at most n parallel processes,\nwhere n is the number of CPUs in your machine. That means that in the\nexample above, on a dual-core machine, at first one spout and one bolt\nare running in parallel. As soon as the spout finishes, an additional\nbolt worker is spawned. On a quad-core machine, all three workers will\nrun in parallel from the beginning.\n\nYou can change this by passing a value for ``n_jobs`` to ``start()``::\n\n    # this example corresponds to non-parallel processing\n    pipeline = Pipeline([(put_primes_spout, 1)], [(is_prime_bolt, 1)])\n    result = list(pipeline.start(n_jobs=1))\n\nNote that in case of an infinite input stream of data, you will need\nat least one worker for every spout/bolt, as no worker will ever\nfinish and thus won't free a slot for a new worker further down in the\npipeline. 
It is also a good idea not to pass on any information with\nthe last bolt of an infinitely running pipeline, as otherwise you\nwill probably run out of memory at some point.\n\nYou should test your spouts and bolts before using them in the pipeline,\nas error messages are not always propagated back to the main process.\n\n\nExamples\n========\n\nThe examples will use the following simple spout and bolts::\n\n    import random\n\n    def s(callback):\n        \"\"\"Simple spout that puts some random numbers into the pipeline.\"\"\"\n        for _ in range(10):\n            n = int(random.random() * 1000000)\n            callback(n)\n\n    def b1(n):\n        \"\"\"Simple bolt that doubles the passed element (via return).\"\"\"\n        return n * 2\n\n    def b2(n, callback):\n        \"\"\"Simple bolt that halves the passed element (via callback).\"\"\"\n        callback(n / 2)\n\n    def v(n, callback):\n        \"\"\"Simple bolt for an infinite stream of incoming data, that\n        prints the result at the end of the pipeline and does not pass\n        anything on.\"\"\"\n        print(n)\n\n\nFinite input\n------------\n\nNon-parallel processing::\n\n    pipeline = Pipeline([(s, 1)], [(b1, 1), (b2, 1)])\n    results = list(pipeline.start(n_jobs=1))\n\n    \"\"\"\n    s\n    |\n    b1\n    |\n    b2\n    |\n    result\n    \"\"\"\n\nParallel processing of pipeline modules::\n\n    pipeline = Pipeline([(s, 1)], [(b1, 1), (b2, 1)])\n    results = list(pipeline.start(n_jobs=4))\n\n    s----b1----b2\n               |\n             result\n\nParallel workers for the b1 bolt::\n\n    pipeline = Pipeline([(s, 1)], [(b1, 2), (b2, 1)])\n    results = list(pipeline.start(n_jobs=4))\n\n    \"\"\"\n       .-b1-------.\n    s--|          |--b2\n       '-------b1-'  |\n                   result\n    \"\"\"\n\nMore workers than processes (b2 workers will wait for spouts to finish)::\n\n    pipeline = Pipeline([(s, 2)], [(b1, 2), (b2, 2)])\n    results = list(pipeline.start(n_jobs=4))\n\n    \"\"\"\n    s-------. .-b1-------.\n    |--| |-+\n    s-' '-------b1-' |\n    .-b2-------. 
|\n    +-| |--------------+\n    | '-------b2-'\n    |\n    result\n    \"\"\"\n\n\nInfinite input stream\n---------------------\n\nEndless stream of input data done right::\n\n    def s2(callback):\n        \"\"\"Simple spout that produces an infinite stream of random numbers.\"\"\"\n        while True:\n            n = int(random.random() * 1000000)\n            callback(n)\n\n    pipeline = Pipeline([(s2, 1)], [(b1, 1), (v, 1)])\n    results = list(pipeline.start(n_jobs=4))\n\n    \"\"\"\n    s2---b1----v\n    \"\"\"\n\nEndless stream of input data done wrong (v workers will never start)::\n\n    pipeline = Pipeline([(s2, 2)], [(b1, 2), (v, 2)])\n    results = list(pipeline.start(n_jobs=4))\n\n    \"\"\"\n    s2------. .-b1-------.\n    |--| |---#!\n    s2-' '-------b1-'\n    \"\"\"\n\n\nTests\n=====\n\nTesting requires the nose library (``pip install nose``).\nAfter installation, the package can be tested by executing the following\nfrom outside the source directory::\n\n    nosetests --exe -v\n\n\nKnown Issues\n============\n\n* Does not work on Windows", "description_content_type": null, "docs_url": null, "download_url": "UNKNOWN", "downloads": { "last_day": -1, "last_month": -1, "last_week": -1 }, "home_page": "https://github.com/tobigue/Orkan", "keywords": null, "license": "LICENSE.txt", "maintainer": null, "maintainer_email": null, "name": "Orkan", "package_url": "https://pypi.org/project/Orkan/", "platform": "UNKNOWN", "project_url": "https://pypi.org/project/Orkan/", "project_urls": { "Download": "UNKNOWN", "Homepage": "https://github.com/tobigue/Orkan" }, "release_url": "https://pypi.org/project/Orkan/0.2.1/", "requires_dist": null, "requires_python": null, "summary": "Orkan is a pipeline parallelization library, written in Python.", "version": "0.2.1" }, "last_serial": 834263, "releases": { "0.1.0": [], "0.1.1": [], "0.1.2": [], "0.2.1": [ { "comment_text": "", "digests": { "md5": "520723d3c36fa11f691d9e373ac82956", "sha256": "4cce8e79754882e6f1acc83c2a00a5ac7d16c0b6bf6e862f720edbf573a8d33b" }, "downloads": -1, "filename": "Orkan-0.2.1.tar.gz", "has_sig": 
false, "md5_digest": "520723d3c36fa11f691d9e373ac82956", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 6412, "upload_time": "2013-08-07T11:29:35", "url": "https://files.pythonhosted.org/packages/c7/20/f24c45c9f733d05a25dc0b2d3c59d9ad7b833786fe78226a6679ea5cd7be/Orkan-0.2.1.tar.gz" } ] }, "urls": [ { "comment_text": "", "digests": { "md5": "520723d3c36fa11f691d9e373ac82956", "sha256": "4cce8e79754882e6f1acc83c2a00a5ac7d16c0b6bf6e862f720edbf573a8d33b" }, "downloads": -1, "filename": "Orkan-0.2.1.tar.gz", "has_sig": false, "md5_digest": "520723d3c36fa11f691d9e373ac82956", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 6412, "upload_time": "2013-08-07T11:29:35", "url": "https://files.pythonhosted.org/packages/c7/20/f24c45c9f733d05a25dc0b2d3c59d9ad7b833786fe78226a6679ea5cd7be/Orkan-0.2.1.tar.gz" } ] }