{ "info": { "author": "saaj", "author_email": "mail@saaj.me", "bugtrack_url": null, "classifiers": [ "Intended Audience :: Developers", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.4", "Programming Language :: Python :: 3.5", "Topic :: Software Development :: Libraries" ], "description": ".. image:: https://bitbucket-badges.atlassian.io/badge/saaj/looppool.svg?ref=default\r\n   :target: https://bitbucket.org/saaj/looppool/addon/pipelines/home\r\n.. image:: https://codecov.io/bitbucket/saaj/looppool/branch/default/graph/badge.svg\r\n   :target: https://codecov.io/bitbucket/saaj/looppool/branch/default \r\n.. image:: https://badge.fury.io/py/Looppool.png\r\n   :target: https://pypi.python.org/pypi/Looppool\r\n\r\n********\r\nLooppool\r\n********\r\nLooppool is a Python 3 package for running a worker process pool of Tornado IO loops. It's useful\r\nfor a heavy asynchronous application which doesn't fit into a single process due to increasing\r\nCPU usage and suffers from IO loop blocking (see ``set_blocking_log_threshold`` [1]_).\r\n\r\nIt was developed as part of the performance optimisation of a Tornado data extraction application.\r\nThe application mixed IO-bound and CPU-bound tasks. Moreover, the CPU-bound tasks were tightly\r\ncoupled with the IO loop. Because of this coupling, the much simpler approach of ``concurrent.futures`` \r\n[3]_ wouldn't have helped.\r\n\r\nDesign\r\n======\r\nA picture is worth a thousand words.\r\n\r\n.. image:: https://bytebucket.org/saaj/looppool/raw/default/manual/overview.png\r\n   :target: https://bbcdn.githack.com/saaj/looppool/raw/default/manual/overview.seq.violet.html\r\n\r\nA few observations and notes:\r\n\r\n1. Messages are off-loaded to IO loops immediately, but task and result queues are\r\n   protected by semaphores so that no more tasks are off-loaded to IO loops than the queue size allows. \r\n2. In fact, there are ``n`` task queues, one per worker. Task messages are distributed\r\n   evenly between workers. \r\n3. 
``add_callback`` [2]_ is the safe (and the only safe) method to pass control from another\r\n   thread to the IO loop's thread.\r\n4. Because queue message handlers (``fn1`` and ``fn2``) are called from the IO loop,\r\n   they can be coroutines.\r\n5. ``Pool`` stops its workers by sending a ``PoisonPill`` task message to each worker.\r\n\r\nWorker\r\n======\r\nThere is a base class, ``Worker``. It represents a process worker that runs its own\r\nTornado IO loop. It processes task messages and puts results in the result queue (or just puts results without \r\nreceiving anything). It requires overriding ``_process_message(self, task)`` in a subclass \r\nand mandates that once the task is done, ``self._task_done()`` is called (directly in \r\n``try-finally`` or with ``@task_done``), like::\r\n\r\n    def _process_message(self, task):\r\n        try:\r\n            result = 'some processing'\r\n            self._put_nowait(result)\r\n        finally:\r\n            self._task_done()\r\n\r\n``WorkerSubclass._process_message`` may be a plain function or a coroutine. More details are \r\navailable in the package's unit test module [5]_.\r\n\r\n.. note::\r\n   For ``Pool.process_message`` there is also a requirement to call the ``result_done`` argument\r\n   once the message is done. You can also wrap it with ``@task_done`` in the simple case when\r\n   the result is considered ingested on return of ``process_message``.\r\n \r\n   Both requirements are bound to semaphores that limit running tasks and pending results. \r\n   They also affect how the pool is stopped, as it waits for running tasks to complete. \r\n\r\nStateful worker\r\n---------------\r\nIf you want to run stateful workers, for instance, to use a periodically calculated lookup table,\r\nbut don't want to calculate it in every worker (e.g. because of the burden of maintaining a database connection),\r\nyou can send ``n`` tasks and each of the ``n`` workers is guaranteed to receive one.\r\n\r\nYou can also send messages to workers individually. ``Pool.put_nowait`` has an optional argument,\r\n``worker_num``.\r\n\r\n.. 
note::\r\n   If your process start method [4]_ is *fork* (default on \\*nix platforms), you can share\r\n   some static data from the parent process.\r\n\r\nInstallation\r\n============\r\n.. sourcecode:: bash\r\n\r\n    pip install Looppool\r\n \r\nUsage\r\n=====\r\n.. note::\r\n   Note that the following example has a 1-to-1 correspondence of input to output messages.\r\n   Because a worker has an IO loop at its disposal, it can, for example, subscribe to something \r\n   on an input message and put results later without a task message.\r\n\r\n.. sourcecode:: python\r\n\r\n    #!/usr/bin/env python3\r\n \r\n \r\n    import looppool\r\n    from tornado import gen, ioloop, httpclient\r\n \r\n \r\n    class FetchWorker(looppool.Worker):\r\n \r\n        _http_client = None\r\n        '''Tornado asynchronous HTTP client'''\r\n \r\n \r\n        def _initialise(self):\r\n            self._http_client = httpclient.AsyncHTTPClient()\r\n \r\n        @looppool.task_done\r\n        @gen.coroutine\r\n        def _process_message(self, url):\r\n            response = yield self._http_client.fetch(url)\r\n            self._put_nowait((url, response.headers.get('server')))\r\n \r\n \r\n    @looppool.task_done\r\n    def process_message(result, result_done):\r\n        print(result)\r\n \r\n \r\n    @gen.coroutine\r\n    def main():\r\n        loop = ioloop.IOLoop.instance()\r\n        pool = looppool.Pool(loop, pool_size = 4, worker_class = FetchWorker)\r\n        pool.process_message = process_message\r\n        pool.start()\r\n \r\n        urls = [\r\n            'https://python.org/',\r\n            'http://tornadoweb.org/',\r\n            'https://google.com/',\r\n            'https://stackoverflow.com/',\r\n        ]\r\n        list(map(pool.put_nowait, urls))\r\n \r\n        pool.stop()\r\n \r\n \r\n    if __name__ == '__main__':\r\n        ioloop.IOLoop.instance().run_sync(main)\r\n\r\nMaintenance\r\n===========\r\nMaintaining a process group instead of a single process is trickier. \r\nInitially, you may want to see if your pool instance has actually spawned any\r\nprocesses. 
Here's what you can do to visualise your process tree, which has the main process,\r\none or two (depending on start method) ``multiprocessing`` helper processes, and the processes\r\nof your ``looppool`` pools (you can use different pools for different purposes):: \r\n\r\n    htop -p $(pgrep -d\",\" -g $(pgrep -f \"main-process-name-or-its-start-args\"))\r\n \r\n``top`` will also work but is limited to 20 PIDs. Forcing the process tree to stop is also\r\ndifferent. If something goes wrong, the process tree should be killed like::\r\n\r\n    kill -9 -- -$(pgrep -f \"main-process-name-or-its-start-args\")\r\n\r\nKilling only the main process will leave helper and worker processes running.\r\n\r\n.. note::\r\n   Pool workers intentionally ignore ``SIGINT`` and ``SIGTERM`` because these signals\r\n   propagate to children from the parent process and break the normal, message-based shutdown.\r\n \r\nYou can improve the names of your worker processes by setting them in the worker's \r\ninitialiser with ``setproctitle`` [6]_ (see example below).\r\n\r\nMonitoring\r\n----------\r\nGenerally it's very important to know how well your application behaves. It is even more \r\nimportant for single-threaded (asynchronous) and multi-process applications.\r\nFor the former, it is critical to know that the process doesn't use 100% CPU except for rare peaks,\r\nwhich would otherwise impair the IO loop's ability to schedule tasks. For the latter, CPU usage \r\nshows how well the current number of workers handles the load. All of this concerns application\r\nmetrics.\r\n\r\n``looppool`` comes with a built-in ``looppool.utility.ResourceReporter``, which periodically \r\n(every 10 seconds by default) sends metrics such as CPU usage, memory usage (RSS) and the length of IO \r\nloop backlogs (``ioloop._handlers``, ``ioloop._callbacks`` and ``ioloop._timeouts``) to \r\na statsd-compatible server [8]_. \r\n\r\n\r\nLogging\r\n-------\r\nMulti-process logging is complicated. 
The most important part of logging is error reporting.\r\nSentry [7]_ is a great solution to the error reporting problem. It integrates seamlessly\r\nwith ``logging`` and is the suggested tool for knowing what errors occur in your workers.\r\n\r\nInstrumented worker\r\n-------------------\r\nFor the following code you will need to run ``pip install raven statsd setproctitle``.\r\n\r\n.. sourcecode:: python\r\n\r\n    #!/usr/bin/env python3\r\n \r\n \r\n    import logging\r\n \r\n    import looppool\r\n    from looppool.utility import ResourceReporter\r\n    from statsd import StatsClient\r\n    from raven import Client\r\n    from raven.exceptions import InvalidDsn\r\n    from raven.handlers.logging import SentryHandler\r\n    from setproctitle import setproctitle\r\n \r\n \r\n    class InstrumentatedWorker(looppool.Worker):\r\n \r\n        _resource_reporter = None\r\n        '''CPU, RSS and IO loop stats reporter'''\r\n \r\n \r\n        def _initialise(self):\r\n            setproctitle('python APP_NAME POOL_NAME pool worker')\r\n \r\n            statsd = StatsClient('localhost', 8125, 'APP_PREFIX')\r\n            self._resource_reporter = ResourceReporter(self._ioloop, statsd,\r\n                'worker.instrumentated.process.{}'.format(self._number))\r\n \r\n            try:\r\n                handler = SentryHandler(Client('SENTRY_DSN'))\r\n                handler.setLevel(logging.WARNING)\r\n            except InvalidDsn:\r\n                logging.exception('Cannot configure Sentry handler')\r\n            else:\r\n                logging.basicConfig(handlers=[handler], level=logging.WARNING)\r\n \r\n        @looppool.task_done\r\n        def _process_message(self, message):\r\n            self._put_nowait((message, self._number))\r\n \r\n        def start(self):\r\n            super().start()\r\n \r\n            self._resource_reporter.start()\r\n \r\n        def join(self):\r\n            self._resource_reporter.stop()\r\n \r\n            super().join()\r\n\r\n\r\n.. [1] http://www.tornadoweb.org/en/stable/ioloop.html#tornado.ioloop.IOLoop.set_blocking_log_threshold\r\n.. [2] http://www.tornadoweb.org/en/stable/ioloop.html#tornado.ioloop.IOLoop.add_callback\r\n.. [3] https://docs.python.org/3/library/concurrent.futures.html\r\n.. 
[4] https://docs.python.org/3/library/multiprocessing.html#multiprocessing.set_start_method\r\n.. [5] https://bitbucket.org/saaj/looppool/src/default/looppool/test.py\r\n.. [6] https://pypi.python.org/pypi/setproctitle\r\n.. [7] https://pypi.python.org/pypi/sentry\r\n.. [8] https://github.com/etsy/statsd/wiki#server-implementations", "description_content_type": null, "docs_url": null, "download_url": "UNKNOWN", "downloads": { "last_day": -1, "last_month": -1, "last_week": -1 }, "home_page": "https://bitbucket.org/saaj/looppool", "keywords": "python tornado multiprocessing process-pool io-loop", "license": "LGPL-2.1+", "maintainer": "", "maintainer_email": "", "name": "Looppool", "package_url": "https://pypi.org/project/Looppool/", "platform": "Any", "project_url": "https://pypi.org/project/Looppool/", "project_urls": { "Download": "UNKNOWN", "Homepage": "https://bitbucket.org/saaj/looppool" }, "release_url": "https://pypi.org/project/Looppool/0.4.0/", "requires_dist": null, "requires_python": null, "summary": "Tornado IO loop process pool with message passing", "version": "0.4.0" }, "last_serial": 2903379, "releases": { "0.1.0": [ { "comment_text": "", "digests": { "md5": "ab9f81fc0d63b7f76cdd9fed8941636a", "sha256": "e038731628f84bc821f6557dd976efc990c52774878dc976ea8fe3fb999eb555" }, "downloads": -1, "filename": "Looppool-0.1.0.tar.gz", "has_sig": false, "md5_digest": "ab9f81fc0d63b7f76cdd9fed8941636a", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 5256, "upload_time": "2016-10-12T21:13:09", "url": "https://files.pythonhosted.org/packages/7a/49/59118f8d7ac3fb7f38a0711318b49df103b73259105e5e34bb4d2ac46c7c/Looppool-0.1.0.tar.gz" } ], "0.2.0": [ { "comment_text": "", "digests": { "md5": "df463007d7166c04261a1e33e8acdcca", "sha256": "0eb761d1f08f8d0aa8c5150f90220ea831daf7d67757fb9fda3cbd488b1279f7" }, "downloads": -1, "filename": "Looppool-0.2.0.tar.gz", "has_sig": false, "md5_digest": "df463007d7166c04261a1e33e8acdcca", 
"packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 12998, "upload_time": "2016-10-16T11:31:55", "url": "https://files.pythonhosted.org/packages/a4/3f/1c7962f9869c0baa3ac9a81457fd8f43fc0b922c34e4ef60b14aaca31ac2/Looppool-0.2.0.tar.gz" } ], "0.3.0": [ { "comment_text": "", "digests": { "md5": "ed05edb9d5ab4d049e1bd6e04735cca1", "sha256": "99d13c3089cd0fb28195e42417aabbdea4b51b7a0cb6d22d3b8d3dcbe6052bcc" }, "downloads": -1, "filename": "Looppool-0.3.0.tar.gz", "has_sig": false, "md5_digest": "ed05edb9d5ab4d049e1bd6e04735cca1", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 13644, "upload_time": "2016-10-26T14:17:09", "url": "https://files.pythonhosted.org/packages/c3/eb/afb4bb0ea6e75259dd526f2848311e87ee6696fe3471704b35f031164423/Looppool-0.3.0.tar.gz" } ], "0.4.0": [ { "comment_text": "", "digests": { "md5": "0b634f87ad2ab20c97c1015fe2a56afc", "sha256": "f71785f1a8c32d347750930f52864e03974933cbaa3e8b06a79d625f745b9cd6" }, "downloads": -1, "filename": "Looppool-0.4.0.tar.gz", "has_sig": false, "md5_digest": "0b634f87ad2ab20c97c1015fe2a56afc", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 14054, "upload_time": "2017-01-13T10:04:51", "url": "https://files.pythonhosted.org/packages/f7/b8/ea84e0dd3d494fd718985ef1f0a0ed7137f75f39a5a3e7c8f0236f6f41ed/Looppool-0.4.0.tar.gz" } ] }, "urls": [ { "comment_text": "", "digests": { "md5": "0b634f87ad2ab20c97c1015fe2a56afc", "sha256": "f71785f1a8c32d347750930f52864e03974933cbaa3e8b06a79d625f745b9cd6" }, "downloads": -1, "filename": "Looppool-0.4.0.tar.gz", "has_sig": false, "md5_digest": "0b634f87ad2ab20c97c1015fe2a56afc", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 14054, "upload_time": "2017-01-13T10:04:51", "url": "https://files.pythonhosted.org/packages/f7/b8/ea84e0dd3d494fd718985ef1f0a0ed7137f75f39a5a3e7c8f0236f6f41ed/Looppool-0.4.0.tar.gz" } ] }