{ "info": { "author": "Depop", "author_email": "dev@depop.com", "bugtrack_url": null, "classifiers": [ "Environment :: Web Environment", "Intended Audience :: Developers", "License :: OSI Approved :: Apache Software License", "Operating System :: OS Independent", "Programming Language :: Python", "Programming Language :: Python :: 2.7", "Programming Language :: Python :: 3.6" ], "description": "celery-message-consumer\n=======================\n\n|PyPI Version| |Build Status|\n\n.. |PyPI Version| image:: http://img.shields.io/pypi/v/celery-message-consumer.svg?style=flat\n :target: https://pypi.python.org/pypi/celery-message-consumer/\n :alt: Latest PyPI version\n\n.. |Build Status| image:: https://circleci.com/gh/depop/celery-message-consumer.svg?style=shield&circle-token=a9ea2909c5cbc4cb32a87f50444ca79b99e3b09c\n :alt: Build Status\n\nTool for using the ``bin/celery`` worker to consume vanilla AMQP\nmessages (i.e. not Celery tasks)\n\nWhile `writing a simple consumer\nscript `__\nusing Kombu can be quite easy, the Celery worker provides many features\naround process pools, queue/routing connections etc as well as being\nknown to run reliably over long term.\n\nIt seems safer to re-use this battle-tested consumer than try to write\nour own and have to learn from scratch all the ways that such a thing\ncan fail.\n\nUsage\n-----\n\n.. code:: bash\n\n pip install celery-message-consumer\n\n\nHandlers\n~~~~~~~~\n\nIn your code, you can define a message handler by decorating a python\nfunction, in much the same way as you would a Celery task:\n\n.. code:: python\n\n from event_consumer import message_handler\n\n @message_handler('my.routing.key')\n def process_message(body):\n # `body` has been deserialized for us by the Celery worker\n print(body)\n\n @message_handler(['my.routing.key1', 'my.routing.key2'])\n def process_messages(body):\n # you can register handler for multiple routing keys\n\n @message_handler('my.routing.*')\n def process_all_messages(body):\n # or wildcard routing keys, if using a 'topic' exchange\n\nLike a Celery task, the module it is defined in must actually get\nimported at some point for the handler to be registered.\n\nA queue (in fact, three queues - see below) will be created to receive\nmessages matching the routing key.\n\nCelery\n~~~~~~\n\nElsewhere in your code you will need to instantiate a Celery app and\napply our custom 'ConsumerStep' which hooks our message handlers into\nthe worker. If you are already using Celery *as Celery* in your project\nthen you probably want separate Celery apps for tasks and for the\nmessage consumer.\n\n.. code:: python\n\n from celery import Celery\n from event_consumer.handlers import AMQPRetryConsumerStep\n\n main_app = Celery()\n\n consumer_app = Celery()\n consumer_app.steps['consumer'].add(AMQPRetryConsumerStep)\n\nYou likely will want separate config for each app. See\n`Celery docs `__.\n\nIn the config for your message consumer app, add the modules containing\nyour decorated message handler functions to ``CELERY_IMPORTS``, exactly\nas you would for Celery tasks - this ensures they get imported and\nregistered when the worker starts up.\n\nThen from the command-line, run the Celery worker just like you usually\nwould, attaching to the consumer app:\n\n.. code:: bash\n\n bin/celery worker -A myproject.mymodule:consumer_app\n\nConfiguration\n~~~~~~~~~~~~~\n\nSettings are intended to be configured primarily via a python file, such\nas your existing Django ``settings.py`` or Celery ``celeryconfig.py``.\nTo bootstrap this, there are a couple of env vars to control how config\nis loaded:\n\n- ``EVENT_CONSUMER_APP_CONFIG``\n should be an import path to a python module, for example:\n ``EVENT_CONSUMER_APP_CONFIG=django.conf.settings``\n- ``EVENT_CONSUMER_CONFIG_NAMESPACE``\n Sets the prefix used for loading further config values from env and\n config file. Defaults to ``EVENT_CONSUMER``.\n\nSee source of ``event_consumer/conf/`` for more details.\n\nSome useful config keys (all of which are prefixed with\n``EVENT_CONSUMER_`` by default):\n\n- ``SERIALIZER`` this is the name of a Celery serializer name, e.g.\n ``'json'``. The consumer will only accept messages serialized in this\n format.\n- ``QUEUE_NAME_PREFIX`` if using default queue name (routing-key) then\n this prefix will be added to the queue name. If you supply a custom\n queue name in the handler decorator the prefix will not be applied.\n- ``MAX_RETRIES`` defaults to ``4`` (i.e. 1 attempt + 4 retries = 5\n strikes)\n- ``BACKOFF_FUNC`` takes a function ``(int) -> float`` which returns\n the retry delay (in seconds) based on current retry counter for the\n message.\n- ``ARCHIVE_EXPIRY`` time in milliseconds to keep messages in the\n \"archive\" queue, after which the exchange will delete them. Defaults\n to 24 days.\n- ``USE_DJANGO`` set to ``True`` if your message handler uses the\n Django db connection, so that the worker is able to cope with the\n dreaded *\"current transaction is aborted\"* error and continue.\n- ``EXCHANGES`` if you need your message handlers to connect their\n queues to specific exchanges then you can provide a dict like:\n\n.. code:: python\n\n EXCHANGES = {\n # a reference name for this config, used when attaching handlers\n 'default': { \n 'name': 'data', # actual name of exchange in RabbitMQ\n 'type': 'topic', # an AMQP exchange type\n },\n 'other': {\n ...\n },\n ...\n }\n\nThe ``'default'`` config will be used... by default. You can attach\nhandler to a specific exchange when decorating:\n\n.. code:: python\n\n @message_handler('my.routing.key', exchange='other')\n def process_message(body):\n pass\n\nQueue layout\n------------\n\nWhile all of the broker, exchange and queue naming is configurable (see\nsource code) this project implements a *very specific queue pattern*.\n\nBriefly: for each routing key it listens to, the consumer sets up\n*three* queues and a 'dead-letter exchange' (DLX).\n\n#. The \"main\" message queue\n#. If any unhandled exceptions occur, and we have retried less than\n ``settings.MAX_RETRIES``, the message will be put on the \"retry\"\n queue with a TTL. After the TTL expires, the DLX will put the message\n back on the main queue.\n#. If all retries are exhausted (or ``PermanentFailure`` is raised) then\n the consumer will put the message on the \"archive\" queue. This gives\n opportunity for someone to manually retry the archived messages,\n perhaps after a code fix has been deployed.\n\n| You will of course note that this is *totally different and separate*\n from Celery's own ``task.retry`` mechanism.\n| **Pros:** matches pattern we were already using for non-Celery,\n non-Python apps, \"archive\" queue provides an extra safety net.\n| **Cons:** Relies on RabbitMQ-specific feature, more queues (more\n complicated).\n\nCompatibility\n-------------\n\nPython 2.7 and 3.6 are both supported.\n\n**Only** RabbitMQ transport is supported.\n\nWe depend on Celery and Kombu. Their versioning seems to be loosely in\nstep so that Celery 3.x goes with Kombu 3.x and Celery 4.x goes with\nKombu 4.x. We test against both v3 and v4.\n\nDjango is not required, but when used we have some extra integration\nwhich is needed if your event handlers use the Django db connection.\nThis must be enabled if required via the ``settings.USE_DJANGO`` flag.\n\nThis project is tested against:\n\n=========== ============ ============= ================== ==================\n x Django 1.4 Django 1.11 Celery/Kombu 3.x Celery/Kombu 4.x\n=========== ============ ============= ================== ==================\nPython 2.7 * * * *\nPython 3.6 * * * \n=========== ============ ============= ================== ==================\n\nRunning the tests\n-----------------\n\nCircleCI\n~~~~~~~~\n\n| The easiest way to test the full version matrix is to install the\n CircleCI command line app:\n| https://circleci.com/docs/2.0/local-jobs/\n| (requires Docker)\n\nThe cli does not support 'workflows' at the moment so you have to run\nthe two Python version jobs separately:\n\n.. code:: bash\n\n circleci build --job python-2.7\n\n.. code:: bash\n\n circleci build --job python-3.6\n\npy.test (single combination of dependency versions)\n~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n\nIt's also possible to run the tests locally, allowing for debugging of\nerrors that occur.\n\nWe rely on some RabbitMQ features for our retry queues so we need a\nrabbit instance to test against. A ``docker-compose.yml`` file is\nprovided.\n\n.. code:: bash\n\n docker-compose up -d\n export BROKER_HOST=$(docker-machine ip default)\n\n(adjust the last line to suit your local Docker installation)\n\nThe ``rabbitmqadmin`` web UI is available to aid in debugging queue issues:\n\n.. code:: bash\n\n http://{BROKER_HOST}:15672/\n\nNow decide which version combination from the matrix you're going to\ntest and set up your virtualenv accordingly:\n\n.. code:: bash\n\n pyenv virtualenv 3.6.2 celery-message-consumer\n\nYou will need to edit ``requirements.txt`` and ``requirements-test.txt``\nfor the specific versions of dependencies you want to test against. Then\nyou can install everything via:\n\n.. code:: bash\n\n pip install -r requirements-test.txt\n\nSet an env to point to the target Django version's settings in the test\napp (for Django-dependent tests) and for general app settings:\n\n.. code:: bash\n\n export DJANGO_SETTINGS_MODULE=test_app.dj111.settings\n export EVENT_CONSUMER_APP_CONFIG=test_app.settings\n\nNow we can run the tests:\n\n.. code:: bash\n\n PYTHONPATH=. py.test -v -s --pdb tests/\n\ntox (all version combinations for current Python)\n~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n\nYou'll notice in the CircleCI config we run tests against the matrix\ndependency versions using ``tox``.\n\nThere are `some warts `__\naround using ``tox`` with ``pyenv-virtualenv`` so if you created a Python 3.6\nvirtualenv using the instructions above the best thing to do is delete it and\nrecreate it like this:\n\n.. code:: bash\n\n pyenv virtualenv -p python3.6 myenv\n pip install tox\n\n(it's actually easier not to use a virtualenv at all - tox creates its\nown virtualenvs anyway, but that does mean you'd have to install tox\nglobally)\n\nYou need the Docker container running:\n\n.. code:: bash\n\n docker-compose up -d\n export BROKER_HOST=$(docker-machine ip default)\n\nYou can now run tests for any versions compatible with your virtualenv\npython version, e.g.\n\n.. code:: bash\n\n tox -e py36-dj111-cel4\n\nTo run the full version matrix you need to have both Python 2.7 and 3.6. The\neasiest way is via ``pyenv``. You will also need to make both Python versions\n'global' (or 'local') via pyenv, and then install and run ``tox`` outside of\nany virtualenv.\n\n.. code:: bash\n\n source deactivate\n pyenv global 2.7.14 3.6.2\n pip install tox\n tox", "description_content_type": "", "docs_url": null, "download_url": "", "downloads": { "last_day": -1, "last_month": -1, "last_week": -1 }, "home_page": "https://github.com/depop/celery-message-consumer", "keywords": "", "license": "Apache 2.0", "maintainer": "", "maintainer_email": "", "name": "celery-message-consumer", "package_url": "https://pypi.org/project/celery-message-consumer/", "platform": "", "project_url": "https://pypi.org/project/celery-message-consumer/", "project_urls": { "Homepage": "https://github.com/depop/celery-message-consumer" }, "release_url": "https://pypi.org/project/celery-message-consumer/1.1.1/", "requires_dist": null, "requires_python": "", "summary": "Tool for using the bin/celery worker to consume vanilla AMQP messages (i.e. not Celery tasks)", "version": "1.1.1" }, "last_serial": 4581865, "releases": { "1.0.10": [ { "comment_text": "", "digests": { "md5": "73c34578c52b75527332842e3e146bde", "sha256": "36e3fa1daa8e6756cab411a9537a6b63e8fd7b56c5bf692884e2c8b56fa0a89b" }, "downloads": -1, "filename": "celery-message-consumer-1.0.10.tar.gz", "has_sig": false, "md5_digest": "73c34578c52b75527332842e3e146bde", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 16823, "upload_time": "2018-05-31T12:39:41", "url": "https://files.pythonhosted.org/packages/b3/4e/63380551683f0cb16389046382682fd108fa485a327434e89db1fed4d211/celery-message-consumer-1.0.10.tar.gz" } ], "1.0.6": [ { "comment_text": "", "digests": { "md5": "38c374d641960a8630451921c556b0a5", "sha256": "8199d46db2318ccb03fc7b3f8561fa6ff00a97d00d5e0886d4598b1b8c8b4151" }, "downloads": -1, "filename": "celery-message-consumer-1.0.6.tar.gz", "has_sig": false, "md5_digest": "38c374d641960a8630451921c556b0a5", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 16388, "upload_time": "2017-11-03T10:35:36", "url": "https://files.pythonhosted.org/packages/ad/7f/494a89a360ca0cdad6c50f138c3b70871d4917ef1440f9e33119474fe588/celery-message-consumer-1.0.6.tar.gz" } ], "1.0.7": [ { "comment_text": "", "digests": { "md5": "bab5064cd32aa472165a5c44148ebd22", "sha256": "b7ffd3ba47588b4e85b4d51ff3222dda9513123ebb923b66bafa4619bbf93604" }, "downloads": -1, "filename": "celery-message-consumer-1.0.7.tar.gz", "has_sig": false, "md5_digest": "bab5064cd32aa472165a5c44148ebd22", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 16643, "upload_time": "2018-02-19T16:16:06", "url": "https://files.pythonhosted.org/packages/b5/d9/c4abcec8339476f821af0335822884e8f383fc886230c1953c36380aefa8/celery-message-consumer-1.0.7.tar.gz" } ], "1.0.8": [ { "comment_text": "", "digests": { "md5": "6d0b1d37c89dccd7681b5036b9df651c", "sha256": "8d445cf3cff5611dbb285ed90d625a1cf471373d3df0415cd947e4aac83dc87d" }, "downloads": -1, "filename": "celery-message-consumer-1.0.8.tar.gz", "has_sig": false, "md5_digest": "6d0b1d37c89dccd7681b5036b9df651c", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 17075, "upload_time": "2018-05-16T12:45:57", "url": "https://files.pythonhosted.org/packages/5f/75/62ebc35759b4cdf3e17b66aea6906dcc860f15e10f9a45258f439af87ce9/celery-message-consumer-1.0.8.tar.gz" } ], "1.0.9": [ { "comment_text": "", "digests": { "md5": "00c3701ccfd4d8a33ed99cc1a78fe041", "sha256": "0f57da849f01409989a5c8125d241da9bf1c5085f9c16ce0327cf0f6b4ee41df" }, "downloads": -1, "filename": "celery-message-consumer-1.0.9.tar.gz", "has_sig": false, "md5_digest": "00c3701ccfd4d8a33ed99cc1a78fe041", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 17222, "upload_time": "2018-05-31T11:14:19", "url": "https://files.pythonhosted.org/packages/02/f4/0e162aa6bc11a722908fe4b38fd5bb137d1832621c671e61482e00f2ec08/celery-message-consumer-1.0.9.tar.gz" } ], "1.1.0": [ { "comment_text": "", "digests": { "md5": "4fb2ae1250a50572219229e517dfb1b3", "sha256": "2d84ce74ac0e4ed4cfb6fea254224de4a877e65ccdbd658507eb37c0e81b1e2b" }, "downloads": -1, "filename": "celery-message-consumer-1.1.0.tar.gz", "has_sig": false, "md5_digest": "4fb2ae1250a50572219229e517dfb1b3", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 17160, "upload_time": "2018-12-04T11:06:19", "url": "https://files.pythonhosted.org/packages/f3/f8/79568f40de98dba02776e082ed8e3dffdaff47c22a076cb69b37c9275269/celery-message-consumer-1.1.0.tar.gz" } ], "1.1.1": [ { "comment_text": "", "digests": { "md5": "4e4ee6232d5cc5a63d00c55d7dfbea2a", "sha256": "ca9d530bc7479fe7940bbf9d802f12d3bccc48d8fc90110bd4cd625d371c5821" }, "downloads": -1, "filename": "celery-message-consumer-1.1.1.tar.gz", "has_sig": false, "md5_digest": "4e4ee6232d5cc5a63d00c55d7dfbea2a", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 17177, "upload_time": "2018-12-10T17:52:29", "url": "https://files.pythonhosted.org/packages/8a/3d/cd3e981709d967df263454b7d9ace40661cdec4cdb1d81d2f0b690dadc08/celery-message-consumer-1.1.1.tar.gz" } ] }, "urls": [ { "comment_text": "", "digests": { "md5": "4e4ee6232d5cc5a63d00c55d7dfbea2a", "sha256": "ca9d530bc7479fe7940bbf9d802f12d3bccc48d8fc90110bd4cd625d371c5821" }, "downloads": -1, "filename": "celery-message-consumer-1.1.1.tar.gz", "has_sig": false, "md5_digest": "4e4ee6232d5cc5a63d00c55d7dfbea2a", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 17177, "upload_time": "2018-12-10T17:52:29", "url": "https://files.pythonhosted.org/packages/8a/3d/cd3e981709d967df263454b7d9ace40661cdec4cdb1d81d2f0b690dadc08/celery-message-consumer-1.1.1.tar.gz" } ] }