{ "info": { "author": "Dustin Oprea", "author_email": "myselfasunder@gmail.com", "bugtrack_url": null, "classifiers": [], "description": "This project is in active development, and the documentation is evolving as \nindividual pieces.\n\nThis project encapsulates connection management, heartbeat management, and \ndispatching incoming messages (for consumers) to handlers.\n\n\n--------\nFeatures\n--------\n\n- Fully featured:\n\n - Snappy compression\n - DEFLATE compression\n - TLS compression\n - Client (\"mutual\") authentication via TLS\n\n- We rely on the consumer defining a \"classification\" function to determine the \n name of a handler for an incoming message. This allows for event-driven \n consumption. This means a little less boiler-plate for the end-user.\n\n- The complexities of RDY management are handled automatically by the library. \n These parameters can be reconfigured, but *nsqs* emphasizes simplicity and \n intuitiveness so that you don't have to be involved in mechanics if you don't \n want to.\n\n- IDENTIFY parameters can be specified directly, but many are managed \n automatically based on parameters to the producer/consumer.\n\n- Messages are marked as \"finished\" with the server after being processed \n unless we're configured not to.\n\n- For consumers, you can prescribe a list of topic and channel couplets, and \n connections will be made to every server and subscribed according to each.\n If lookup servers are used, servers are discovered and connected for each \n topic in the list (if no lookup servers, then we assume that all servers\n given support all topics).\n\n\n-----------------------\nImplementing a Consumer\n-----------------------\n\nImports and boilerplate::\n\n import logging\n import json\n import gevent\n\n import nsq.consumer\n import nsq.node_collection\n import nsq.message_handler\n\n _logger = logging.getLogger(__name__)\n\nCreate a message-handler::\n\n class _MessageHandler(nsq.message_handler.MessageHandler):\n def 
__init__(self, *args, **kwargs):\n super(_MessageHandler, self).__init__(*args, **kwargs)\n self.__processed = 0\n\n def message_received(self, connection, message):\n super(_MessageHandler, self).message_received(connection, message)\n\n try:\n self.__decoded = json.loads(message.body)\n except:\n _logger.info(\"Couldn't decode message. Finished: [%s]\", \n message.body)\n return\n\n def classify_message(self, message):\n return (self.__decoded['type'], self.__decoded)\n\n def handle_dummy(self, connection, message, context):\n self.__processed += 1\n\n if self.__processed % 1000 == 0:\n _logger.info(\"Processed (%d) messages.\", self.__processed)\n\n def default_message_handler(self, message_class, connection, message, \n classify_context):\n _logger.warning(\"Squashing unhandled message: [%s] [%s]\",\n message_class, message)\n\nDefine the node-collection. We use *nsqlookupd* servers here, but we could just \nas easily use `ServerNodes()` with *nsqd* servers::\n\n lookup_node_prefixes = [\n 'http://127.0.0.1:4161',\n ]\n\n nc = nsq.node_collection.LookupNodes(lookup_node_prefixes)\n\nCreate the consumer object::\n\n _TOPIC = 'test_topic'\n _CHANNEL = 'test_channel'\n _MAX_IN_FLIGHT = 500\n\n c = nsq.consumer.Consumer(\n [(_TOPIC, _CHANNEL)], \n nc, \n _MAX_IN_FLIGHT, \n message_handler_cls=_MessageHandler)\n\nStart the consumer::\n\n c.start()\n\nLoop. As an example, we loop as long as we're connected to at least one \nserver::\n\n while c.is_alive:\n gevent.sleep(1)\n\n\n-----------------------\nImplementing a Producer\n-----------------------\n\nImports and boilerplate::\n\n import logging\n import json\n import random\n\n import nsq.producer\n import nsq.node_collection\n import nsq.message_handler\n\n _logger = logging.getLogger(__name__)\n\nDefine the node-collection. 
This is a producer, so it only works with *nsqd* \nnodes::\n\n server_nodes = [\n ('127.0.0.1', 4150),\n ]\n\n nc = nsq.node_collection.ServerNodes(server_nodes)\n\nCreate the producer object::\n\n _TOPIC = 'test_topic'\n\n p = nsq.producer.Producer(_TOPIC, nc)\n\nStart the producer::\n\n p.start()\n\nEmit the messages::\n\n for i in range(0, 100000, 10):\n if i % 50 == 0:\n _logger.info(\"(%d) messages published.\", i)\n\n data = { 'type': 'dummy', 'data': random.random(), 'index': i }\n message = json.dumps(data)\n p.mpublish((message,) * 10)\n\nStop the producer::\n\n p.stop()\n\n\n---------\nCallbacks\n---------\n\nBoth the consumer and producer can take a callbacks object.\n\nTo instantiate the callbacks for a *producer*::\n\n import nsq.connection_callbacks\n cc = nsq.connection_callbacks.ConnectionCallbacks()\n\nTo instantiate the callbacks for a *consumer*::\n\n import nsq.consumer\n cc = nsq.consumer.ConsumerCallbacks()\n\nThen, pass the object into the producer or consumer object constructors as \n`ccallbacks`.\n\nThe following callback methods can be implemented for either a producer or a \nconsumer (while making sure to call the original implementation):\n\n- `connect(connection)`\n\n The connection has been established.\n\n- `identify(connection)`\n\n The identify response has been processed for this connection.\n\n- `broken(connection)`\n\n The connection has been broken.\n\n- `message_received(connection, message)`\n\n A message has been received.\n\nThe *consumer* has one additional callback:\n\n- `rdy_replenish(connection, current_rdy, original_rdy)`\n\n The RDY needs to be updated. By default, the original RDY will be reemitted. 
\n If this is not desired, override this callback, and don't call the original.\n\n\n---------\nFootnotes\n---------\n\n- Because we rely on *gevent*, and *gevent* isn't \n Python3 compatible, *nsqs* isn't Python3 compatible.", "description_content_type": null, "docs_url": null, "download_url": "UNKNOWN", "downloads": { "last_day": -1, "last_month": -1, "last_week": -1 }, "home_page": "https://github.com/dsoprea/NsqSpinner", "keywords": "", "license": "GPL 2", "maintainer": null, "maintainer_email": null, "name": "nsqs", "package_url": "https://pypi.org/project/nsqs/", "platform": "UNKNOWN", "project_url": "https://pypi.org/project/nsqs/", "project_urls": { "Download": "UNKNOWN", "Homepage": "https://github.com/dsoprea/NsqSpinner" }, "release_url": "https://pypi.org/project/nsqs/0.2.0/", "requires_dist": null, "requires_python": null, "summary": "A complete, gevent-based, non-Tornado NSQ client.", "version": "0.2.0" }, "last_serial": 1229990, "releases": { "0.2.0": [ { "comment_text": "", "digests": { "md5": "c0cced58a43a3bfdbcadb318c80af4ac", "sha256": "3b13d6db7785884a3d194375d2f09f7cc6e9a67b37feae6ba9aa547857734da2" }, "downloads": -1, "filename": "nsqs-0.2.0-py2-none-any.whl", "has_sig": false, "md5_digest": "c0cced58a43a3bfdbcadb318c80af4ac", "packagetype": "bdist_wheel", "python_version": "2.7", "requires_python": null, "size": 33574, "upload_time": "2014-09-19T04:09:03", "url": "https://files.pythonhosted.org/packages/23/ff/af99e44926d4e59db2526afaa37391b7c0b75ecdb72b1c4cb3381f0b4fba/nsqs-0.2.0-py2-none-any.whl" }, { "comment_text": "", "digests": { "md5": "826bd0bd67f6858c750de7bfc182a90d", "sha256": "076c1981925906c95459bff0c94985bfe341575fff4f57ceeddfe82510bc323d" }, "downloads": -1, "filename": "nsqs-0.2.0.tar.gz", "has_sig": false, "md5_digest": "826bd0bd67f6858c750de7bfc182a90d", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 22650, "upload_time": "2014-09-19T04:09:00", "url": 
"https://files.pythonhosted.org/packages/15/b6/ea590386fee7293991c800f4ffe3330f0c99b5452b6c823412dfb43ca7c1/nsqs-0.2.0.tar.gz" } ] }, "urls": [ { "comment_text": "", "digests": { "md5": "c0cced58a43a3bfdbcadb318c80af4ac", "sha256": "3b13d6db7785884a3d194375d2f09f7cc6e9a67b37feae6ba9aa547857734da2" }, "downloads": -1, "filename": "nsqs-0.2.0-py2-none-any.whl", "has_sig": false, "md5_digest": "c0cced58a43a3bfdbcadb318c80af4ac", "packagetype": "bdist_wheel", "python_version": "2.7", "requires_python": null, "size": 33574, "upload_time": "2014-09-19T04:09:03", "url": "https://files.pythonhosted.org/packages/23/ff/af99e44926d4e59db2526afaa37391b7c0b75ecdb72b1c4cb3381f0b4fba/nsqs-0.2.0-py2-none-any.whl" }, { "comment_text": "", "digests": { "md5": "826bd0bd67f6858c750de7bfc182a90d", "sha256": "076c1981925906c95459bff0c94985bfe341575fff4f57ceeddfe82510bc323d" }, "downloads": -1, "filename": "nsqs-0.2.0.tar.gz", "has_sig": false, "md5_digest": "826bd0bd67f6858c750de7bfc182a90d", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 22650, "upload_time": "2014-09-19T04:09:00", "url": "https://files.pythonhosted.org/packages/15/b6/ea590386fee7293991c800f4ffe3330f0c99b5452b6c823412dfb43ca7c1/nsqs-0.2.0.tar.gz" } ] }