{ "info": { "author": "Wargaming Team", "author_email": "", "bugtrack_url": null, "classifiers": [ "Programming Language :: Python", "Programming Language :: Python :: 3.5", "Programming Language :: Python :: 3.6" ], "description": "# Asynchronous AMQP base Consumer and Producer.\n\nAMQP Consumer and Producer base classes. Working on top of `aioamqp`.\n\n## Usage\n\nProducer:\n \n from aioamqp_ext import BaseProducer\n \n producer = BaseProducer(**{\n 'url': 'amqp://localhost:5672/',\n 'exchange': 'my_exchange'\n })\n producer.publish_message(\n payload='foo',\n routing_key='bar'\n )\n producer.close()\n\nConsumer:\n\n from aioamqp_ext import BaseConsumer\n \n class Consumer(BaseConsumer):\n async def process_request(self, data):\n print(data)\n \n if __name__ == \"__main__\":\n consumer = Consumer(**{\n 'url': 'amqp://localhost:5672/',\n 'exchange': 'my_exchange',\n 'queue': 'my_queue',\n 'routing_key': 'foo.bar'\n })\n try:\n loop = asyncio.get_event_loop()\n loop.run_until_complete(consumer.consume())\n loop.run_forever()\n except KeyboardInterrupt:\n loop.run_until_complete(consumer.close())\n finally:\n loop.close()\n \n## Tests\n\nTo run the tests, you'll need to install the Python test dependencies::\n\n pip install -r ./requirements-ci.txt \n\nThen you can run the tests with `make unit-test `.", "description_content_type": null, "docs_url": null, "download_url": "", "downloads": { "last_day": -1, "last_month": -1, "last_week": -1 }, "home_page": "", "keywords": "", "license": "", "maintainer": "", "maintainer_email": "", "name": "aioamqp-ext", "package_url": "https://pypi.org/project/aioamqp-ext/", "platform": "", "project_url": "https://pypi.org/project/aioamqp-ext/", "project_urls": null, "release_url": "https://pypi.org/project/aioamqp-ext/0.1.5/", "requires_dist": null, "requires_python": "", "summary": "", "version": "0.1.5" }, "last_serial": 3335116, "releases": { "0.1.5": [ { "comment_text": "", "digests": { "md5": "f5946e06d49c31984611e5f80418e76f", "sha256": "80c5d9225ad653b3c16ab9528757e2e2c199aa5f5c971fd0fbca6899caa4c4fb" }, "downloads": -1, "filename": "aioamqp_ext-0.1.5.tar.gz", "has_sig": false, "md5_digest": "f5946e06d49c31984611e5f80418e76f", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 3705, "upload_time": "2017-11-15T12:46:23", "url": "https://files.pythonhosted.org/packages/75/ff/d9378acbcc6f944d8082955d8b8a727d39b59135c2c84bdba60ef3c789c2/aioamqp_ext-0.1.5.tar.gz" } ] }, "urls": [ { "comment_text": "", "digests": { "md5": "f5946e06d49c31984611e5f80418e76f", "sha256": "80c5d9225ad653b3c16ab9528757e2e2c199aa5f5c971fd0fbca6899caa4c4fb" }, "downloads": -1, "filename": "aioamqp_ext-0.1.5.tar.gz", "has_sig": false, "md5_digest": "f5946e06d49c31984611e5f80418e76f", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 3705, "upload_time": "2017-11-15T12:46:23", "url": "https://files.pythonhosted.org/packages/75/ff/d9378acbcc6f944d8082955d8b8a727d39b59135c2c84bdba60ef3c789c2/aioamqp_ext-0.1.5.tar.gz" } ] }