{ "info": { "author": "Tikitu de Jager", "author_email": "tikitu@logophile.org", "bugtrack_url": null, "classifiers": [ "Development Status :: 4 - Beta", "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", "Programming Language :: Python :: 2.7", "Topic :: Utilities" ], "description": "A data-processing pipeline that guarantees that tasks exit the pipeline in the same order they came in, but\nuses multiprocessing to parallelise them in the middle. You put callables and their arguments into the queue with\n``put(task, *args, **kwargs)``. Each call to ``get()`` will return the result of executing a task, in the order the\ntasks were added, but the ``task`` calls themselves run concurrently across multiple processes.\n\nWhat would you use that for?\n----------------------------\n\nThe use case that made me write this is logging completion of a large batch operation that contains many parallelisable\ncomponents; in our case, indexing a large number of documents. It's a multi-threaded system that (substantially\nsimplified) looks something like this::\n\n class BatchDone(int): pass # marker for completion of an indexing batch\n\n def producer(queue):\n while True:\n doc_count = 0\n for document in get_document_batch():\n queue.put(indexing_func, document) # callables get executed on the arguments you supply\n doc_count += 1\n queue.put(BatchDone(doc_count)) # non-callables get passed through, but still in FIFO order\n\n def consumer(queue):\n while True:\n result_or_batch_done = queue.get() # either the return value of an indexing_func call, or the marker\n if isinstance(result_or_batch_done, BatchDone):\n log_progress(result_or_batch_done)\n\n queue = ParallelQueue()\n\n Thread(target=producer, args=(queue,)).start()\n Thread(target=consumer, args=(queue,)).start()\n\nThe advantage compared to the naive solution (execute each batch in parallel but wait until all have finished before\nstarting the next batch) is that a single slow document in a batch doesn't prevent the next batch from starting: it\nonly holds up the pipeline as a whole if the internal buffers fill up (by default these can accumulate ten times as\nmany out-of-order tasks as there are available CPUs).\n\nQuick start guide\n-----------------\n\nMake a queue object::\n\n from parallel_queue import ParallelQueue\n queue = ParallelQueue(daemon=True) # there are a bunch of optionals for internal queue and buffer sizes\n\nPut tasks and their data into the queue (not everything is allowed here, because of limitations on what can be passed\nto the worker processes; see below for details)::\n\n queue.put(task_function, \"data\", timeout=5) # NOT THREADSAFE! Only do this from one thread!\n\nAnd get them out (usually on a different thread to the ``put()`` calls)::\n\n queue.get(timeout=1)\n\nClean up the queue when you're done with it (this is also NOT THREADSAFE: ``stop()`` should only be called from the\nsame thread as the ``put()`` calls)::\n\n queue.stop() # send a \"stop\" signal through the pipes, but return immediately\n queue.join(timeout=2) # join all worker and consumer processes (i.e. wait for queue to clear)\n\nIf you forget to do this and you didn't make your queue with ``daemon=True``, if it gets garbage-collected you may\nsee junk like this on stderr::\n\n Exception AssertionError: AssertionError() in ignored\n\nI'm not certain how serious this is, but something is clearly not right so I suggest you avoid it if possible.\n\nFinally, you can confirm that the shutdown is complete::\n\n if queue.is_alive():\n raise Exception(\"join() must have timed out!\")\n\nLimitations on what you can pass through the pipes\n--------------------------------------------------\n\nBehind the scenes the callable task and arguments that you ``put()`` is being sent to a worker process by\nthe ``multiprocessing`` standard library package. Most importantly, this means they have all to be picklable.\nAll the builtin types are picklable, but only classes and functions defined at the top level of a module can be pickled.\nThat means constructions like this are not going to work::\n\n def make_me_a_task_function(first):\n def task_function(second):\n return ' '.join((first, second))\n return task_function\n\n queue.put(make_me_a_task_function(\"curried\"), \"eggs\") # fails\n\nAlso, annoyingly, functions and classes defined by hand in a running interpreter session are *not* picklable, so if\nyou're playing around you'll have to use builtins or imported functions.\n\nThere's another limitation that you're not likely to run into, but which I include for completeness: various\ncross-process synchronisation primitives are not allowed to be shared between processes by pickling.\n\nThe partial solution to both these problems is to use the slightly lower-level ``ParallelSingleTaskQueue`` instead of\n``ParallelQueue``. It bakes in a single task function (so is less flexible) but shares it between processes by\ninheritance rather than pickling (so non-top-level functions and multiprocess synchronisation primitives are fine).\nSee the test using the ``PermutationTask`` class for an example of this technique (and its disadvantages in terms of\nreadability).\n\nAlgorithm\n---------\n\nThe basic algorithm is for the parallelisation is::\n\n ===================================\n Shared multiprocess-safe resources\n - (shared) Queue \"worker_input\" (fed from producer)\n - (shared) Queue \"worker_output\" (from worker to consumer)\n - (shared) Value \"max_packet_id\" (highest packet id consumer is willing to accept from any worker)\n\n Consumer private resources\n - next_packet_id (next packet id consumer wants to send on)\n - PriorityQueue \"consumer_output_buffer\" (buffer for packets that arrive at consumer out of order)\n\n Worker processes loop on:\n 1. packet = worker_input.get()\n 2. result = user_supplied_task(packet)\n 3. while max_packet_id.value < packet.id: take a short nap\n 4. worker_output.put(result-with-packet-id)\n\n Consumer process:\n 1. next_packet_id = 0\n 2. while True:\n 3. max_packet_id.value = next_packet_id + consumer_output_buffer.maxsize - 1\n # e.g. suppose buffer fits 5; waiting for 0 we can safely accept 1,2,3,4\n 4. while consumer_output_buffer.peek().packet_id != next_packet_id:\n 5. consumer_output_buffer.add(worker_output.get())\n 6. output(consumer_output_buffer.pop())\n 7. next_packet_id += 1\n ===================================\n\nThere are some complications (see `the code`_ for what they give rise to):\n\n.. _`the code`: https://bitbucket.org/tikitu/parallel_queue/src/tip/parallel_queue/__init__.py\n\n- ``PriorityQueue`` has no ``peek()``\n- instead of \"take a short nap\", workers wait on an ``Event`` from the consumer (set whenever it progresses)\n- packet ids should not be allowed to increase without limit: they wrap around and comparisons use modular arithmetic\n- we need to be able to gracefully shut everything down when requested\n- exception handling needs to expose stack traces from the worker processes\n- reliable testing needs a tiny bit of information about internal progress of the algorithm\n\nInstalling\n----------\n\nIt's on PyPI::\n\n $ pip install parallel-queue\n\nInstalling for hackery\n----------------------\n\nYMMV, but here's how I do it (you will need virtualenv_ installed, and pip_ to install ``nose`` to run the tests)::\n\n $ hg clone https://bitbucket.org/tikitu/parallel_queue\n $ cd parallel_queue\n $ virtualenv --no-site-packages .\n\n.. _pip: http://www.pip-installer.org/en/latest/installing.html\n.. _virtualenv: https://pypi.python.org/pypi/virtualenv\n\nRunning the tests::\n\n $ bin/pip install nose\n $ bin/nosetests", "description_content_type": null, "docs_url": null, "download_url": "UNKNOWN", "downloads": { "last_day": -1, "last_month": -1, "last_week": -1 }, "home_page": "UNKNOWN", "keywords": "", "license": "UNKNOWN", "maintainer": null, "maintainer_email": null, "name": "parallel-queue", "package_url": "https://pypi.org/project/parallel-queue/", "platform": "UNKNOWN", "project_url": "https://pypi.org/project/parallel-queue/", "project_urls": { "Download": "UNKNOWN", "Homepage": "UNKNOWN" }, "release_url": "https://pypi.org/project/parallel-queue/0.5/", "requires_dist": null, "requires_python": null, "summary": "A multi-process task queue that maintains FIFO order.", "version": "0.5" }, "last_serial": 796013, "releases": { "0.3": [ { "comment_text": "", "digests": { "md5": "a73d1455f4deba67002238f5678d3773", "sha256": "d3834d9f6e03ed71e57d0791d62e1696dc60d46af2505179b090cdfe0675187d" }, "downloads": -1, "filename": "parallel-queue-0.3.tar.gz", "has_sig": false, "md5_digest": "a73d1455f4deba67002238f5678d3773", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 8568, "upload_time": "2013-03-20T08:40:59", "url": "https://files.pythonhosted.org/packages/e5/e3/6f99cef263a1985db827a9f8e26c9cb1655b06d707bd5e5e7d0628064989/parallel-queue-0.3.tar.gz" } ], "0.4": [ { "comment_text": "", "digests": { "md5": "5309d7801c8c7f85f443eb31f6fa3faa", "sha256": "49d10bb30fe25f0f73be18f51a0805011f5ca48cc9f71813cb64a0f0438e7038" }, "downloads": -1, "filename": "parallel-queue-0.4.tar.gz", "has_sig": false, "md5_digest": "5309d7801c8c7f85f443eb31f6fa3faa", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 8707, "upload_time": "2013-04-08T12:28:11", "url": "https://files.pythonhosted.org/packages/81/48/146b8678667d63a9fb77517d491b28d8a7fb8161b09246d03a760dd2037c/parallel-queue-0.4.tar.gz" } ], "0.5": [ { "comment_text": "", "digests": { "md5": "f017e33087a4f950a68683bfb5b8f4e6", "sha256": "b091ac891aa0d858482c9c1bbfc38a09dca787c57ab81991eae67301f15697a6" }, "downloads": -1, "filename": "parallel-queue-0.5.tar.gz", "has_sig": false, "md5_digest": "f017e33087a4f950a68683bfb5b8f4e6", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 8728, "upload_time": "2013-06-17T15:28:20", "url": "https://files.pythonhosted.org/packages/90/93/f1fae17433789406e360a06155e4575fda789a238004425f3ad6d0e42438/parallel-queue-0.5.tar.gz" } ] }, "urls": [ { "comment_text": "", "digests": { "md5": "f017e33087a4f950a68683bfb5b8f4e6", "sha256": "b091ac891aa0d858482c9c1bbfc38a09dca787c57ab81991eae67301f15697a6" }, "downloads": -1, "filename": "parallel-queue-0.5.tar.gz", "has_sig": false, "md5_digest": "f017e33087a4f950a68683bfb5b8f4e6", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 8728, "upload_time": "2013-06-17T15:28:20", "url": "https://files.pythonhosted.org/packages/90/93/f1fae17433789406e360a06155e4575fda789a238004425f3ad6d0e42438/parallel-queue-0.5.tar.gz" } ] }