{ "info": { "author": "Klimov Konstantin", "author_email": "moelius1983@gmail.com", "bugtrack_url": null, "classifiers": [], "description": "====================\nAsync task processor\n====================\n\nUsed to distribute tasks between configurable workers.\n\nFeatures\n--------\n\n- simple definition of a task as a normal function.\n- SimpleProcessor is used for simple tasks.\n- PeriodicProcessor is used for periodic tasks.\n- TarantoolProcessor listens to a Tarantool queue and triggers a task when data arrives.\n- ability to retry on error (max_retries and retry_countdown options).\n- ability to bind the task as the self argument of the worker function (bind option).\n- ability to implement your own task processor.\n- ability to build a control API with processors (to manage your workers).\n\nTODO's\n------\n- [ ] Tests\n- [ ] Console utils\n- [ ] Sphinx docs\n\nInstallation\n------------\n\nAs usual, use pip:\n\n.. code-block:: bash\n\n pip install async-task-processor\n\nUsage examples\n--------------\n\n**Periodic task processor example:**\n\n.. code-block:: python\n\n import time\n\n from async_task_processor import ATP\n from async_task_processor.processors import PeriodicProcessor\n from examples import logger\n\n\n # first test function\n def test_func_one(sleep_time, word):\n \"\"\"\n\n :type sleep_time: int\n :type word: str\n :return:\n \"\"\"\n logger.info('start working')\n time.sleep(sleep_time)\n logger.info('Job is done. Word is: %s' % word)\n\n\n # second test function\n def test_func_second(sleep_time, word):\n \"\"\"\n\n :type sleep_time: int\n :type word: str\n :return:\n \"\"\"\n logger.info('start working')\n time.sleep(sleep_time)\n logger.info('Job is done. 
Word is: %s' % word)\n\n\n # third function with exception\n def test_func_bad(self, sleep_time, word):\n \"\"\"\n\n :type self: async_task_processor.Task\n :type sleep_time: int\n :type word: str\n :return:\n \"\"\"\n logger.info('start working')\n try:\n a = 1 / 0\n except ZeroDivisionError:\n # optionally you can overload max_retries and retry_countdown here\n self.retry()\n time.sleep(sleep_time)\n logger.info('Job is done. Word is: %s' % word)\n\n\n atp = ATP(asyncio_debug=True)\n task_processor = PeriodicProcessor(atp=atp)\n\n # Add function to task processor\n task_processor.add_task(test_func_one, args=[5, 'first hello world'], max_workers=5, timeout=1,\n max_retries=5, retry_countdown=1)\n\n # Add one more function to task processor\n task_processor.add_task(test_func_second, args=[3, 'second hello world'], max_workers=5, timeout=1,\n max_retries=5, retry_countdown=1)\n\n # Add one more bad function with exception. This function will raise exception and will retry it,\n # then when retries exceeded, workers of this func will stop one by one with exception MaxRetriesExceeded\n # bind option make Task as self argument\n task_processor.add_task(test_func_bad, args=[3, 'second hello world'], bind=True, max_workers=2, timeout=1,\n max_retries=3, retry_countdown=3)\n\n # Start async-task-processor\n atp.start()\n\n**Tarantool task processor example:**\n\n.. 
code-block:: python\n\n import asyncio\n import time\n\n import asynctnt\n import asynctnt_queue\n\n from async_task_processor import ATP\n from async_task_processor.processors import TarantoolProcessor\n from examples import logger\n\n TARANTOOL_QUEUE = 'test_queue'\n TARANTOOL_HOST = 'localhost'\n TARANTOOL_PORT = 3301\n TARANTOOL_USER = None\n TARANTOOL_PASS = None\n\n\n def put_messages_to_tarantool(messages_count=1, tube_name='test_queue', host='localhost', port=3301, user=None,\n password=None):\n \"\"\"Put some test messages to tarantool queue\n\n :param messages_count: messages number to put in queue\n :param tube_name: tarantool queue name\n :type tube_name: str\n :param host: tarantool host\n :param port: tarantool port\n :param user: tarantool user\n :param password: tarantool password\n :return:\n \"\"\"\n\n async def put_jobs():\n conn = asynctnt.Connection(host=host, port=port, username=user, password=password)\n await conn.connect()\n queue = asynctnt_queue.Queue(conn)\n tube = queue.tube(tube_name)\n [await tube.put(dict(num=i, first_name='Jon', last_name='Smith')) for i in range(messages_count)]\n await conn.disconnect()\n\n loop = asyncio.get_event_loop()\n loop.run_until_complete(asyncio.ensure_future(put_jobs()))\n loop.close()\n\n\n # Let's put 100 messages to tarantool\n put_messages_to_tarantool(messages_count=100, tube_name=TARANTOOL_QUEUE, host=TARANTOOL_HOST, port=TARANTOOL_PORT,\n user=TARANTOOL_USER, password=TARANTOOL_PASS)\n\n\n # Test function\n def test_func(self, sleep_time, word):\n \"\"\"\n\n :type self: async_task_processor.TarantoolTask\n :type sleep_time: int\n :type word: str\n :return:\n \"\"\"\n logger.info('start working')\n time.sleep(sleep_time)\n logger.info('Job is done. Word is %s. Data is %s. 
' % (word, self.data))\n\n\n atp = ATP(asyncio_debug=True)\n task_processor = TarantoolProcessor(atp=atp, host=TARANTOOL_HOST, port=TARANTOOL_PORT, user=TARANTOOL_USER,\n password=TARANTOOL_PASS, connection_max_retries=3, connection_retry_countdown=3)\n\n # Add function to task processor. Tarantool data from queue will be in `self` argument in function. 20 parallel workers\n # will be started.\n task_processor.add_task(foo=test_func, queue=TARANTOOL_QUEUE, args=[1, 'hello world'], bind=True, max_workers=20,\n max_retries=5, retry_countdown=1)\n # Start async-task-processor\n atp.start()\n\n**Tarantool task processor example with ability to scale workers via tarantool:**\n\n.. code-block:: python\n\n import asyncio\n import importlib\n import socket\n import sys\n import time\n\n import asynctnt\n import asynctnt_queue\n import tarantool\n\n from async_task_processor import ATP\n from async_task_processor.processors import TarantoolProcessor\n from examples import logger\n\n TARANTOOL_QUEUE = 'test_queue'\n TARANTOOL_HOST = 'localhost'\n TARANTOOL_PORT = 3301\n TARANTOOL_USER = None\n TARANTOOL_PASS = None\n\n\n def put_messages_to_tarantool(messages_count=1, tube_name='test_queue', host='localhost', port=3301, user=None,\n password=None):\n \"\"\"Put some test messages to tarantool queue\n\n :param messages_count: messages number to put in queue\n :param tube_name: tarantool queue name\n :type tube_name: str\n :param host: tarantool host\n :param port: tarantool port\n :param user: tarantool user\n :param password: tarantool password\n :return:\n \"\"\"\n\n async def put_jobs():\n conn = asynctnt.Connection(host=host, port=port, username=user, password=password)\n await conn.connect()\n tube = asynctnt_queue.Queue(conn).tube(tube_name)\n [await tube.put(dict(num=i, first_name='Jon', last_name='Smith')) for i in range(messages_count)]\n await conn.disconnect()\n\n loop = asyncio.get_event_loop()\n loop.run_until_complete(asyncio.ensure_future(put_jobs()))\n 
loop.close()\n\n\n # Let's put 100 messages to tarantool\n put_messages_to_tarantool(messages_count=100, tube_name=TARANTOOL_QUEUE, host=TARANTOOL_HOST, port=TARANTOOL_PORT,\n user=TARANTOOL_USER, password=TARANTOOL_PASS)\n\n\n # Create a tube in the queue for managing workers\n def create_tube(tube_name):\n try:\n t = tarantool.connect(host=TARANTOOL_HOST, port=TARANTOOL_PORT, user=TARANTOOL_USER,\n password=TARANTOOL_PASS)\n t.call(\"queue.create_tube\", (tube_name, 'fifo', {'if_not_exists': True}))\n except tarantool.error.DatabaseError as e:\n if e.args[0] == 32:\n pass\n else:\n raise\n\n\n # Test function\n def test_func(self, sleep_time, word):\n \"\"\"\n\n :type self: async_task_processor.TarantoolTask\n :type sleep_time: int\n :type word: str\n :return:\n \"\"\"\n logger.info('Start working')\n time.sleep(sleep_time)\n logger.info('Job is done. Word is %s. Data is %s. ' % (word, self.data))\n\n\n # Import a function by its dotted path (or by name from the current module)\n def func_import(foo_path):\n path_list = foo_path.split('.')\n func_name = path_list.pop()\n m = importlib.import_module('.'.join(path_list)) if path_list else sys.modules[__name__]\n func = getattr(m, func_name)\n return func\n\n\n # Function for managing workers\n def add_task(self, tp):\n \"\"\"\n\n :type self: async_task_processor.primitives.TarantoolTask\n :type tp: TarantoolProcessor\n :return:\n \"\"\"\n if self.data['command'] == 'stop':\n tp.stop(name=self.data['foo'], workers_count=self.data['max_workers'], leave_last=False)\n self.app.logger.info(\"%d workers were deleted from task %s\" % (self.data['max_workers'], self.data['foo']))\n elif self.data['command'] == 'start':\n tp.add_task(foo=func_import(self.data['foo']), queue=TARANTOOL_QUEUE, args=[1, 'message from new worker'],\n bind=True, max_workers=self.data['max_workers'], name=self.data['foo'])\n self.app.logger.info(\"Added %d workers for task %s\" % (self.data['max_workers'], self.data['foo']))\n elif self.data['command'] == 'info':\n [logger.info(task.as_json()) for 
task in self.app.tasks]\n else:\n self.app.logger.info(\"Unknown command %s\" % self.data['command'])\n\n\n # Get the host IP, with dots replaced by underscores so it can be used in a tube name\n ip = [l for l in ([ip for ip in socket.gethostbyname_ex(socket.gethostname())[2] if not ip.startswith(\"127.\")][:1], [\n [(s.connect(('8.8.8.8', 53)), s.getsockname()[0], s.close()) for s in\n [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]]) if l][0][0].replace('.', '_')\n\n # Name of the control tube\n control_tube_name = 'control_queue_%s' % ip\n logger.info(\"control tube is %s\" % control_tube_name)\n\n # Create the tube used to manage workers\n create_tube(control_tube_name)\n\n atp = ATP(asyncio_debug=True, logger=logger)\n\n task_processor = TarantoolProcessor(atp=atp, host=TARANTOOL_HOST, port=TARANTOOL_PORT, user=TARANTOOL_USER,\n password=TARANTOOL_PASS, connection_max_retries=3, connection_retry_countdown=3)\n\n # Add a function to the task processor. Data from the Tarantool queue is available via the `self` argument.\n # 20 parallel workers will be started.\n task_processor.add_task(foo=test_func, queue=TARANTOOL_QUEUE, args=[1, 'hello world'], bind=True, max_workers=20,\n max_retries=5, retry_countdown=1)\n\n # Add a task that listens for commands on the control tube. Because the host IP is part of the control tube name,\n # a separate control queue is created for each host, so you can manage every host the app runs on.\n # You can try to manage workers from the Tarantool console. 
Example command:\n # queue.tube.control_queue_:put({ foo='test_func', command = 'start', max_workers = 2})\n task_processor.add_task(foo=add_task, queue=control_tube_name, args=[task_processor], bind=True)\n\n # Start async-task-processor\n atp.start()", "description_content_type": "", "docs_url": null, "download_url": "", "downloads": { "last_day": -1, "last_month": -1, "last_week": -1 }, "home_page": "https://github.com/moelius/async-task-processor", "keywords": "", "license": "MIT", "maintainer": "", "maintainer_email": "", "name": "async-task-processor", "package_url": "https://pypi.org/project/async-task-processor/", "platform": "", "project_url": "https://pypi.org/project/async-task-processor/", "project_urls": { "Homepage": "https://github.com/moelius/async-task-processor" }, "release_url": "https://pypi.org/project/async-task-processor/0.2.6/", "requires_dist": null, "requires_python": "", "summary": "Simple package to run async tasks", "version": "0.2.6" }, "last_serial": 3839021, "releases": { "0.1.1": [ { "comment_text": "", "digests": { "md5": "188d30c47d6ccec798f7cab881910726", "sha256": "b9b381792915ac302d1e8f7fe10398695cbe81a9355780de45fdc13fbb175d6b" }, "downloads": -1, "filename": "async-task-processor-0.1.1.tar.gz", "has_sig": false, "md5_digest": "188d30c47d6ccec798f7cab881910726", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 4180, "upload_time": "2017-09-05T08:56:43", "url": "https://files.pythonhosted.org/packages/93/1b/7a44dcdec5486a1d96e7bba3dcd4386c711359069a36e6c8b9f39bb5d148/async-task-processor-0.1.1.tar.gz" } ], "0.1.2": [ { "comment_text": "", "digests": { "md5": "5592dcf556b2ff3f2216d02396f83133", "sha256": "9798cafb44d258848219861a139a65562f1ce73f7c00a74d8cb7d60e1d8265d6" }, "downloads": -1, "filename": "async-task-processor-0.1.2.tar.gz", "has_sig": false, "md5_digest": "5592dcf556b2ff3f2216d02396f83133", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 6578, 
"upload_time": "2017-09-08T16:57:14", "url": "https://files.pythonhosted.org/packages/8c/1d/37bacafb95efff965dc252ac1fc56a58a8c0f365382717f9f531aa1e9cc6/async-task-processor-0.1.2.tar.gz" } ], "0.1.4": [ { "comment_text": "", "digests": { "md5": "b29df36ffb77cceb4ae51e61617a8c12", "sha256": "cd91ec516078abb818a35701c600ded8ee57365a7a3d80bf70bb4e2968fadc4a" }, "downloads": -1, "filename": "async-task-processor-0.1.4.tar.gz", "has_sig": false, "md5_digest": "b29df36ffb77cceb4ae51e61617a8c12", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 6605, "upload_time": "2017-09-12T12:25:07", "url": "https://files.pythonhosted.org/packages/95/f5/e301e7de4077443e3fd9978081dc6c9807304be6e56b5e1f775cb4d181e6/async-task-processor-0.1.4.tar.gz" } ], "0.1.5": [ { "comment_text": "", "digests": { "md5": "3b369b1a748a83f4f480ec0578223c5b", "sha256": "f8a211fb7a252b3745e85c6c9f60699043f6ecea8cb2d8f48cb39429a6b5b31f" }, "downloads": -1, "filename": "async-task-processor-0.1.5.tar.gz", "has_sig": false, "md5_digest": "3b369b1a748a83f4f480ec0578223c5b", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 10818, "upload_time": "2017-09-30T09:10:33", "url": "https://files.pythonhosted.org/packages/53/31/e7a5f7b740c5c41e316edcadadf3f8f9c8b1670f8df5319d8cfd4771f196/async-task-processor-0.1.5.tar.gz" } ], "0.1.6": [ { "comment_text": "", "digests": { "md5": "53d30261d3fccc083d5fd4be342ba940", "sha256": "900ae34bbe93ed9244d49ea41fba0f29d3334464f10aebb792b5a2afa25e5847" }, "downloads": -1, "filename": "async-task-processor-0.1.6.tar.gz", "has_sig": false, "md5_digest": "53d30261d3fccc083d5fd4be342ba940", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 10840, "upload_time": "2017-09-30T09:25:24", "url": "https://files.pythonhosted.org/packages/53/2f/d8976f3f5e8474458a18751a525b14f6cc5ac491f04080c0fb5bdb6d44aa/async-task-processor-0.1.6.tar.gz" } ], "0.1.7": [ { "comment_text": "", "digests": 
{ "md5": "eae42dda4d7fe3f0dd5cbe6ab5a41e18", "sha256": "4e3c46a02f492c5d0301b7b345a09702aa5b19dcb200b44a163c30b61285f294" }, "downloads": -1, "filename": "async-task-processor-0.1.7.tar.gz", "has_sig": false, "md5_digest": "eae42dda4d7fe3f0dd5cbe6ab5a41e18", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 10848, "upload_time": "2017-09-30T09:34:58", "url": "https://files.pythonhosted.org/packages/11/ce/3083a69addf836b90e8e7f730e315e4369d29f4fdbae7ab3f401eeb59158/async-task-processor-0.1.7.tar.gz" } ], "0.1.8": [ { "comment_text": "", "digests": { "md5": "b8011b304009ad02c04bea20de5d8e5d", "sha256": "3ec86bf9910c1f5b8e13a4b1204e0224e60161c9d167d8f345eeb15efddc88d8" }, "downloads": -1, "filename": "async-task-processor-0.1.8.tar.gz", "has_sig": false, "md5_digest": "b8011b304009ad02c04bea20de5d8e5d", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 10880, "upload_time": "2017-10-04T11:18:36", "url": "https://files.pythonhosted.org/packages/18/f2/2edae21796c27e9a48f6bca00e8e67891fe5ba414282b3a7015f51fb9cbd/async-task-processor-0.1.8.tar.gz" } ], "0.1.9": [ { "comment_text": "", "digests": { "md5": "6d19a8a5a824082161f3988254dd722d", "sha256": "38d211cf581b6deb3c3679402971174e118baa6001d0f63cc58c2167a571e554" }, "downloads": -1, "filename": "async-task-processor-0.1.9.tar.gz", "has_sig": false, "md5_digest": "6d19a8a5a824082161f3988254dd722d", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 10875, "upload_time": "2017-10-06T19:09:36", "url": "https://files.pythonhosted.org/packages/e9/98/c5aa05ffd8e20c902e9ec5a4419353c7c292acb7c3310f81b18de2bbecdd/async-task-processor-0.1.9.tar.gz" } ], "0.2.0": [ { "comment_text": "", "digests": { "md5": "0e96069450c0ad03ca32d2c30625d389", "sha256": "e71806bce74a40e1d6cc036dd4c29d55b6d915f20cdd8cfa3df2d6c8429e2af9" }, "downloads": -1, "filename": "async-task-processor-0.2.0.tar.gz", "has_sig": false, "md5_digest": 
"0e96069450c0ad03ca32d2c30625d389", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 11010, "upload_time": "2017-10-30T12:50:09", "url": "https://files.pythonhosted.org/packages/4e/44/4daaf583f7bea2e40947010aba93910db1a8a0a885fbe0705d21d1261911/async-task-processor-0.2.0.tar.gz" } ], "0.2.1": [ { "comment_text": "", "digests": { "md5": "b8c66345d6e2ba1d796b8239fe34a498", "sha256": "e1a6c1e5ffa8a4d07f50a6474534443e0bb7b6a05206bc91c149fba7e877cd46" }, "downloads": -1, "filename": "async-task-processor-0.2.1.tar.gz", "has_sig": false, "md5_digest": "b8c66345d6e2ba1d796b8239fe34a498", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 11320, "upload_time": "2017-11-08T22:58:46", "url": "https://files.pythonhosted.org/packages/79/6c/24943f6424041cbbd415a4379be8f29f0e274b8725400f3f80fa4d222e8c/async-task-processor-0.2.1.tar.gz" } ], "0.2.2": [ { "comment_text": "", "digests": { "md5": "df766150a7336865fa2a84f704fccae0", "sha256": "84256e73608704e959faf2870fcf75b5bc84df99dce7921c0a4f3da82228e6fb" }, "downloads": -1, "filename": "async-task-processor-0.2.2.tar.gz", "has_sig": false, "md5_digest": "df766150a7336865fa2a84f704fccae0", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 11304, "upload_time": "2017-11-22T11:14:05", "url": "https://files.pythonhosted.org/packages/43/8a/ce72d3d0eaec2d6fdc168e993eb5f7f09c036cc06a68638a6005e8af1a58/async-task-processor-0.2.2.tar.gz" } ], "0.2.3": [ { "comment_text": "", "digests": { "md5": "2aa51619e212ed603249de1abada685c", "sha256": "27183d514b0f81df9975c3f8404279c60820817f39fb1a359ce09b6bb29472fc" }, "downloads": -1, "filename": "async-task-processor-0.2.3.tar.gz", "has_sig": false, "md5_digest": "2aa51619e212ed603249de1abada685c", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 11144, "upload_time": "2017-12-09T15:26:59", "url": 
"https://files.pythonhosted.org/packages/53/23/4a0f4b8dc16cd34113f905d3d7e8cedf56faeb070b0af34436de9f3f5a07/async-task-processor-0.2.3.tar.gz" } ], "0.2.4": [ { "comment_text": "", "digests": { "md5": "01a41f30591527f79d08b3692279e01b", "sha256": "fb3ec1bf8281f0265816eb9f50559ae3e431f020b51a2350a4dee847d8e3a9ae" }, "downloads": -1, "filename": "async-task-processor-0.2.4.tar.gz", "has_sig": false, "md5_digest": "01a41f30591527f79d08b3692279e01b", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 11158, "upload_time": "2017-12-09T19:26:40", "url": "https://files.pythonhosted.org/packages/c3/c7/4bec0df6de3a127f865b40c761f865da27a4a3e023168c0a8b60ccb39466/async-task-processor-0.2.4.tar.gz" } ], "0.2.5": [ { "comment_text": "", "digests": { "md5": "9b2018d5db8579e7be9b5a852d0728d3", "sha256": "e0203869bcc909f40f5525dbb1e166d736c786bd7e3d5b2cdcfeb23ce2fbe625" }, "downloads": -1, "filename": "async-task-processor-0.2.5.tar.gz", "has_sig": false, "md5_digest": "9b2018d5db8579e7be9b5a852d0728d3", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 11053, "upload_time": "2018-05-06T17:45:22", "url": "https://files.pythonhosted.org/packages/10/55/f92678083336c29fdf70911e50504fd4a39e3e0a41e17de327de32f0d038/async-task-processor-0.2.5.tar.gz" } ], "0.2.6": [ { "comment_text": "", "digests": { "md5": "3e301774dcf1540b1275ecdc30d72e4c", "sha256": "372afe6d55987f3fe9e8471e2f00dc70743b51d6e411b130df06b5dd18dd244c" }, "downloads": -1, "filename": "async-task-processor-0.2.6.tar.gz", "has_sig": false, "md5_digest": "3e301774dcf1540b1275ecdc30d72e4c", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 11055, "upload_time": "2018-05-06T18:04:24", "url": "https://files.pythonhosted.org/packages/d6/18/7e2e902643747a4d642a099b95422b30b2e5f3e33aa201ac78186bd57716/async-task-processor-0.2.6.tar.gz" } ] }, "urls": [ { "comment_text": "", "digests": { "md5": 
"3e301774dcf1540b1275ecdc30d72e4c", "sha256": "372afe6d55987f3fe9e8471e2f00dc70743b51d6e411b130df06b5dd18dd244c" }, "downloads": -1, "filename": "async-task-processor-0.2.6.tar.gz", "has_sig": false, "md5_digest": "3e301774dcf1540b1275ecdc30d72e4c", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 11055, "upload_time": "2018-05-06T18:04:24", "url": "https://files.pythonhosted.org/packages/d6/18/7e2e902643747a4d642a099b95422b30b2e5f3e33aa201ac78186bd57716/async-task-processor-0.2.6.tar.gz" } ] }