{ "info": { "author": "Edward Newell", "author_email": "edward.newell@gmail.com", "bugtrack_url": null, "classifiers": [ "Development Status :: 3 - Alpha", "Intended Audience :: Developers", "License :: OSI Approved :: MIT License", "Programming Language :: Python :: 2", "Programming Language :: Python :: 3", "Topic :: Software Development :: Build Tools" ], "description": "# iterable-queue\nPrepare to feel relaxed. Last time was the last time you will muck around \nwith the unnecessarily messy logic of managing pools of producers and \nconsumers in multiprocessing python programs.\n\n## Install ##\n\n```bash\npip install iterable-queue\n```\n\n## Why? ##\n\nSuppose you have a pool of *consumers* consuming tasks from a queue, which is\nbeing populated by a pool of *producers*. Of course, you want the consumers to\nkeep pulling work from the queue as long as that queue isn't empty.\n\nThe tricky part is, if the consumers are fast, they may continually drive\nthe queue to empty even though the producers are still busy adding work. So,\nhow do the consumers know if the work is really done, or if the queue is just\ntemporarily empty?\n\nIf you have one producer and many consumers, or if you have one consumer and\nmany producers, you can manage it by sending special `done` signals over the\nqueue. I find even that to be a bit annoying, but when you have many producers\nand many consumers, things get more complex.\n\n## Meet `IterableQueue` ##\n\n`IterableQueue` handles signalling in the background to keep track of how many\nproducers and consumers are active, so you only have to worry about doing the\nactual producing and consuming. The queue itself knows the difference between\nbeing temporarily empty, and being *done*.\n\n`IterableQueue` is a directed queue, which means that it has \n(arbitrarily many) *producer endpoints* and *consumer endpoints*. \n\nBecause it knows when it's done, it can support the iterable interface. 
That\nmeans that for consumers, the queue looks just like an iterable. The consumers\ndon't even have to know they have a queue. \n\nProducers use the queue pretty much like a `multiprocessing.Queue`, but with \none small variation: when they are done putting work on the queue, they should \ncall `queue.close()`:\n\n```python\ndef producer_func(queue):\n    while some_condition:\n        ...\n        queue.put(some_work)\n        ...\n    queue.close()\n```\n\nThe call to `IterableQueue.close()` is what makes it possible for the queue to \nknow when there's no more work coming, so that it can be treated like an\niterable by consumers:\n\n```python\ndef consumer_func(queue):\n    for work in queue:\n        do_something_with(work)\n```\n\nYou can, if you choose, consume the queue \"manually\" by calling `queue.get()`.\nIt delegates to an underlying `multiprocessing.Queue` and supports all of the\nusual arguments. Calling `get()` on a queue, with `block=True` and\n`timeout=None` (the defaults) will raise `Queue.Empty` if the queue is empty,\nlike usual. However, if the queue is not just empty, but properly *done* (i.e.\nthere are no more active producers) `IterableQueue.Done` will be raised\ninstead.\n\n## Example ##\nAs mentioned, `IterableQueue` is a directed queue, meaning that it has \nproducer and consumer endpoints. Both wrap the same underlying \n`multiprocessing.Queue`, and expose *nearly* all of its methods.\nImportant exceptions are the `put()` and `get()` methods: you can only\n`put()` onto producer endpoints, and you can only `get()` from consumer \nendpoints. 
This distinction is needed for the management of consumer \niteration to work automatically.\n\nLet's start by setting up a function that will be executed by *producers*, i.e.\nworkers that *put onto* the queue:\n\n```python\ndef producer_func(queue, producer_id):\n    for i in range(10):\n        queue.put(producer_id)\n    queue.close()\n```\n\nNotice how the producer calls `queue.close()` when it's done putting\nstuff onto the queue.\n\nNow let's set up a consumer function:\n```python\ndef consumer_func(queue, consumer_id):\n    for item in queue:\n        print('consumer %d saw item %d' % (consumer_id, item))\n```\n\nNotice how the consumer treats the queue as an iterable.\n\nNow, let's get some processes started:\n\n```python\n\nfrom multiprocessing import Process\nfrom iterable_queue import IterableQueue\n\nNUM_PRODUCERS = 3\nNUM_CONSUMERS = 5\n\n# Make an IterableQueue instance\niq = IterableQueue()\n\n# Start a bunch of producers, give each one a producer endpoint\nproducers = []\nfor producer_id in range(NUM_PRODUCERS):\n    queue = iq.get_producer()\n    p = Process(target=producer_func, args=(queue, producer_id))\n    p.start()\n    producers.append(p)\n\n# And start a bunch of consumers\nconsumers = []\nfor consumer_id in range(NUM_CONSUMERS):\n\n    # Give each consumer a \"consumer-queue\"\n    consumer_endpoint = iq.get_consumer()\n    p = Process(target=consumer_func, args=(consumer_endpoint, consumer_id))\n    p.start()\n    consumers.append(p)\n\n# Lastly -- this is important -- close the IterableQueue.\niq.close()  # This indicates no new producer endpoints will be made\n\n# Wait for workers to finish\nfor p in producers + consumers:\n    p.join()\n\n```\n\nNotice the call to `iq.close()`: this lets the `IterableQueue` know that no new \nproducers will be coming onto the scene and adding more work.\n\nAnd we're done. No signalling, no keeping track of process completion, \nand no `try ... except Empty`, just put on one end, and iterate on the other.\n\nThe output you'd see from running the example is below. 
You can try the above example by running [`example.py`](https://github.com/enewe101/iterable_queue/blob/master/iterable_queue/example.py).\n\n```\nconsumer 1 saw item 0\nconsumer 0 saw item 0\nconsumer 1 saw item 0\nconsumer 0 saw item 0\nconsumer 1 saw item 0\nconsumer 1 saw item 0\nconsumer 0 saw item 1\nconsumer 1 saw item 1\nconsumer 2 saw item 0\nconsumer 0 saw item 0\nconsumer 2 saw item 1\nconsumer 0 saw item 0\nconsumer 1 saw item 2\nconsumer 0 saw item 1\nconsumer 2 saw item 2\nconsumer 1 saw item 2\nconsumer 0 saw item 1\nconsumer 1 saw item 2\nconsumer 3 saw item 0\nconsumer 2 saw item 1\nconsumer 1 saw item 2\nconsumer 2 saw item 1\nconsumer 3 saw item 2\nconsumer 3 saw item 1\nconsumer 0 saw item 2\nconsumer 1 saw item 2\nconsumer 2 saw item 1\nconsumer 4 saw item 1\nconsumer 2 saw item 2\nconsumer 4 saw item 2\n```", "description_content_type": null, "docs_url": null, "download_url": "", "downloads": { "last_day": -1, "last_month": -1, "last_week": -1 }, "home_page": "https://github.com/enewe101/iterable_queue", "keywords": "threading multiprocessing scheduling batch processing queue queueing", "license": "MIT", "maintainer": "", "maintainer_email": "", "name": "iterable-queue", "package_url": "https://pypi.org/project/iterable-queue/", "platform": "", "project_url": "https://pypi.org/project/iterable-queue/", "project_urls": { "Homepage": "https://github.com/enewe101/iterable_queue" }, "release_url": "https://pypi.org/project/iterable-queue/1.2.2/", "requires_dist": null, "requires_python": "", "summary": "A queue for python that feels like an iterable and knows when its producers are finished", "version": "1.2.2" }, "last_serial": 3067496, "releases": { "0.1.0": [ { "comment_text": "", "digests": { "md5": "4570ae332ba2e1c73dd04da84802a658", "sha256": "e2499fb5e1bd162c39caba7f013732031efa69f1f04646a3c7a5d07b9203f2c7" }, "downloads": -1, "filename": "iterable-queue-0.1.0.tar.gz", "has_sig": false, "md5_digest": "4570ae332ba2e1c73dd04da84802a658", 
"packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 11491, "upload_time": "2016-05-04T17:09:57", "url": "https://files.pythonhosted.org/packages/a6/f9/66848d7fc66c742f64e450c2ab52dbff32cec389a7b4270aeb3a5f6a251b/iterable-queue-0.1.0.tar.gz" } ], "0.2.0": [ { "comment_text": "", "digests": { "md5": "eaa123036be69979068e87a0aac1cc26", "sha256": "2f961ec34c4a9fd816ca801074d5272e0628b8d2e3213b4d842a1dcf2372698a" }, "downloads": -1, "filename": "iterable-queue-0.2.0.tar.gz", "has_sig": false, "md5_digest": "eaa123036be69979068e87a0aac1cc26", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 11539, "upload_time": "2017-01-02T05:14:44", "url": "https://files.pythonhosted.org/packages/92/3e/5a276edadf9a681cb1c4ee0595a2a02d17d8f0e511bab35abccaf76b1ccb/iterable-queue-0.2.0.tar.gz" } ], "0.2.1": [ { "comment_text": "", "digests": { "md5": "065547eea3ae942b0a276ea626254bfd", "sha256": "de1e61d5a35e9d070c42e89a4c9aff59c3eb78791e3c4776884af141703e35fa" }, "downloads": -1, "filename": "iterable-queue-0.2.1.tar.gz", "has_sig": false, "md5_digest": "065547eea3ae942b0a276ea626254bfd", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 11546, "upload_time": "2017-01-02T05:27:22", "url": "https://files.pythonhosted.org/packages/4a/d1/536b9a069dfce173300ec752e152b146a09732da9f2f3e976e990481b98d/iterable-queue-0.2.1.tar.gz" } ], "1.0.0": [ { "comment_text": "", "digests": { "md5": "3a9dac6f963d42efae241d1c281fa118", "sha256": "ea4f45aedafdb97154249aee1324e62908c947af5bbc25068721bbae11cded05" }, "downloads": -1, "filename": "iterable-queue-1.0.0.tar.gz", "has_sig": false, "md5_digest": "3a9dac6f963d42efae241d1c281fa118", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 11614, "upload_time": "2017-01-17T02:49:49", "url": 
"https://files.pythonhosted.org/packages/94/05/2e1b03d9674b6d8576346c43b678c7269d73f3ca040bf5b4b8060ee4b53e/iterable-queue-1.0.0.tar.gz" } ], "1.1.0": [ { "comment_text": "", "digests": { "md5": "5117437d2098c07c8f5440263de6b7fd", "sha256": "2aa3298ad04efbf8993bd6ff0e454a62290328fa843f99f14ce689b79633732d" }, "downloads": -1, "filename": "iterable-queue-1.1.0.tar.gz", "has_sig": false, "md5_digest": "5117437d2098c07c8f5440263de6b7fd", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 12255, "upload_time": "2017-01-31T07:43:13", "url": "https://files.pythonhosted.org/packages/6e/4c/3f494208c6d267ad7c606354f02af52e55957cd78fefee32682d251a69b8/iterable-queue-1.1.0.tar.gz" } ], "1.1.1": [ { "comment_text": "", "digests": { "md5": "f8eee012e6033a1fe550145514b55091", "sha256": "4f564bb0092ad8c069706351012c39de59ad46bec018d91cc85ecdfd1448d2bd" }, "downloads": -1, "filename": "iterable-queue-1.1.1.tar.gz", "has_sig": false, "md5_digest": "f8eee012e6033a1fe550145514b55091", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 12249, "upload_time": "2017-01-31T07:48:43", "url": "https://files.pythonhosted.org/packages/87/7f/66108b6b3e89a3149ee9c66cc99c4aeb8c2151ff092c1999cd1a104d4a8a/iterable-queue-1.1.1.tar.gz" } ], "1.2.0": [ { "comment_text": "", "digests": { "md5": "256bdf0d474ab8047d8155be95cb786a", "sha256": "14ede27007c2cc7cb3dde5b097aea79a44d2f42f06412d97e68dd6378b867377" }, "downloads": -1, "filename": "iterable-queue-1.2.0.tar.gz", "has_sig": false, "md5_digest": "256bdf0d474ab8047d8155be95cb786a", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 11927, "upload_time": "2017-07-15T18:33:36", "url": "https://files.pythonhosted.org/packages/00/7c/7ff8b9f634782212025f57294d2498f196ef2411f81cb9efe2796f08c6b1/iterable-queue-1.2.0.tar.gz" } ], "1.2.1": [ { "comment_text": "", "digests": { "md5": "6fea549818157a79edc8243f78a017e2", "sha256": 
"97960aeb63f286facc45250d35d272192357918303f1db63fc32c4fb43d766ce" }, "downloads": -1, "filename": "iterable-queue-1.2.1.tar.gz", "has_sig": false, "md5_digest": "6fea549818157a79edc8243f78a017e2", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 12250, "upload_time": "2017-07-17T17:28:11", "url": "https://files.pythonhosted.org/packages/23/02/4dc46cbb558ca1da0456b4a440f4c94815bc46801b96a2c54e85dbc7e6cc/iterable-queue-1.2.1.tar.gz" } ], "1.2.2": [ { "comment_text": "", "digests": { "md5": "d5048a823671410c6ba2e698a7be1f2b", "sha256": "4dc52b4b560ba950ee114b4a242fd0d23c78488e5f95afa6f05b2158b2a4c65a" }, "downloads": -1, "filename": "iterable-queue-1.2.2.tar.gz", "has_sig": false, "md5_digest": "d5048a823671410c6ba2e698a7be1f2b", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 12739, "upload_time": "2017-08-02T16:07:56", "url": "https://files.pythonhosted.org/packages/38/de/2fb3cbaa46d0543c4653f58d0fbb65ff7c4a9f1390cf3c4cfdebf2b62bc9/iterable-queue-1.2.2.tar.gz" } ] }, "urls": [ { "comment_text": "", "digests": { "md5": "d5048a823671410c6ba2e698a7be1f2b", "sha256": "4dc52b4b560ba950ee114b4a242fd0d23c78488e5f95afa6f05b2158b2a4c65a" }, "downloads": -1, "filename": "iterable-queue-1.2.2.tar.gz", "has_sig": false, "md5_digest": "d5048a823671410c6ba2e698a7be1f2b", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 12739, "upload_time": "2017-08-02T16:07:56", "url": "https://files.pythonhosted.org/packages/38/de/2fb3cbaa46d0543c4653f58d0fbb65ff7c4a9f1390cf3c4cfdebf2b62bc9/iterable-queue-1.2.2.tar.gz" } ] }