{ "info": { "author": "Rafael Gonzalez", "author_email": "astrorafael@yahoo.es", "bugtrack_url": null, "classifiers": [ "Development Status :: 4 - Beta", "Framework :: Twisted", "Intended Audience :: Developers", "License :: OSI Approved :: MIT License", "Programming Language :: Python :: 2.7", "Programming Language :: Python :: 3.4", "Topic :: Communications", "Topic :: Internet" ], "description": "twisted-mqtt\n============\n\nMQTT Client protocol for Twisted.\n\nDescription\n-----------\n\n**twisted-mqtt** is a library using the Twisted framework and\nimplementing the MQTT protocol (v3.1 & v3.1.1) in these flavours:\n\n- pure subscriber\n- pure publisher\n- or a mixing of both. This is useful to subscribe and publish through\n the same broker using only one TCP connection.\n\nInstalation\n-----------\n\nJust type:\n\n``sudo pip install twisted-mqtt``\n\nor from GitHub:\n\n::\n\n git clone https://github.com/astrorafael/twisted-mqtt.git\n cd twisted-mqtt\n sudo python setup.py install\n\nCredits\n-------\n\nI started writting this software after finding `Adam Rudd's MQTT.py\ncode `__. A small\npart his code is still there. However, I soon began taking my own\ndirection both in design and scope.\n\nFunction/methods docstrings contain quotes of the OASIS\n`mqtt-v3.1.1 `__\nstandard.\n\nMQTT Version 3.1.1. Edited by Andrew Banks and Rahul Gupta. 29 October\n2014. OASIS Standard.\nhttp://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html.\nLatest version:\nhttp://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html.\n\nUsage\n-----\n\nThe APIs are described in the `library defined\ninterfaces `__\n\nThis library builds ``MQTTProtocol`` objects and is designed to be *used\nrather than inherited*.\n\nExamples\n~~~~~~~~\n\nThese examples show my library intended usage: managed by a service.\nYour Twisted application should probably be designed as a collection of\nservices and one of these would be an MQTT Service. Note that a service\nis simply an object that can be started by ``startService()`` and\nstopped by ``stopService()``.\n\nProbably you also want your service to handle automatic reconnections to\nthe MQTT broker and that's where Twisted's ``ClientService`` class comes\nin. A ``ClientService`` instance detects its transport has been closed\nand re-opens the connection to the MQTT Broker.\n\nHowever, this is not enough for the MQTT protocol since the broker\nexpects a CONNECT packet request shortly after the socket has been\nopened. For this reason, we must subclass ``ClientService`` to override\n``startService()``. Also we will add some MQTT connection/disconnection\nhandling code. This requires us to obtain somehow the protocol instance\nbuilt by the factory.\n\nIn the startup code, we create a ``ClientService`` instance, passing the\nproper MQTT protocol factory and we simply start the service. Inside\n``startService()`` we invoke ClientService's method ``whenConnected()``\nthat returns a ``Deferred``. This ``Deferred`` - when fired - will\ninvoke a user function with the protocol object been created as the\nparameter.\n\nOur custom ``ClientService`` subclass defines a ``connectToBroker()``\nmethod, receiving the protocol object just built. At minimun, we will\nstore a reference to this protocol for further reference. If we wish to\nhandle automatic reconnections, we should set the MQTT protocol\n``onDisconnection`` attribute to a callback that will handle what to do\nin such cases. Our service ``onDisconnection()`` callback will simple\ntell us to rebuild a new protocol instance and call\n``connectToBroker()`` again when done. In this way, we start the whole\nMQTT CONNECT thing all over again.\n\nFinally, our custom ``ClientService`` example subclass may define a\ncustom retry policy by customizing ``backoffPolicy()`` default arguments\n``initialDelay``, ``maxDelay`` and ``factor``. See the\n``twisted.application.internet.backoffPolicy()`` API reference for\nfurther details.\n\nPublisher Example\n~~~~~~~~~~~~~~~~~\n\nA publisher is built by obtaining a factory for the\n``MQTTFactory.PUBLISHER`` profile.\n\nYour MQTT Publisher service should configure a couple of things in the\n``connectToBroker()`` method:\n\n- The MQTT protocol ``onDisconnection`` attribute storing a callback\n that will be invoked when a disconnection occurs.\n- The maximun Window Size - that is - how many asynchronous PUBLISH\n request you will issue in a row to the library, before getting and\n acknowledge from the Broker (Qos=1 and 2 only). By thefault, the\n window size is 1 and this guarantees in-order delivery of published\n messages.\n\nThis example additionally starts a periodic task to publish sample data.\n\n.. code:: python\n\n import sys\n\n from twisted.internet import reactor, task\n from twisted.internet.defer import inlineCallbacks, DeferredList\n from twisted.application.internet import ClientService, backoffPolicy\n from twisted.internet.endpoints import clientFromString\n from twisted.logger import (\n Logger, LogLevel, globalLogBeginner, textFileLogObserver, \n FilteringLogObserver, LogLevelFilterPredicate)\n\n from mqtt.client.factory import MQTTFactory\n\n # ----------------\n # Global variables\n # ----------------\n\n # Global object to control globally namespace logging\n logLevelFilterPredicate = LogLevelFilterPredicate(defaultLogLevel=LogLevel.info)\n\n BROKER = \"tcp:test.mosquitto.org:1883\"\n\n # -----------------\n # Utility Functions\n # -----------------\n\n def startLogging(console=True, filepath=None):\n '''\n Starts the global Twisted logger subsystem with maybe\n stdout and/or a file specified in the config file\n '''\n global logLevelFilterPredicate\n \n observers = []\n if console:\n observers.append( FilteringLogObserver(observer=textFileLogObserver(sys.stdout), \n predicates=[logLevelFilterPredicate] ))\n \n if filepath is not None and filepath != \"\":\n observers.append( FilteringLogObserver(observer=textFileLogObserver(open(filepath,'a')), \n predicates=[logLevelFilterPredicate] ))\n globalLogBeginner.beginLoggingTo(observers)\n\n\n def setLogLevel(namespace=None, levelStr='info'):\n '''\n Set a new log level for a given namespace\n LevelStr is: 'critical', 'error', 'warn', 'info', 'debug'\n '''\n level = LogLevel.levelWithName(levelStr)\n logLevelFilterPredicate.setLogLevelForNamespace(namespace=namespace, level=level)\n\n\n # -----------------------\n # MQTT Publishing Service\n # -----------------------\n\n class MQTTService(ClientService):\n\n def __init(self, endpoint, factory):\n ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy())\n\n\n def startService(self):\n log.info(\"starting MQTT Client Publisher Service\")\n # invoke whenConnected() inherited method\n self.whenConnected().addCallback(self.connectToBroker)\n ClientService.startService(self)\n\n\n @inlineCallbacks\n def connectToBroker(self, protocol):\n '''\n Connect to MQTT broker\n '''\n self.protocol = protocol\n self.protocol.onDisconnection = self.onDisconnection\n # We are issuing 3 publish in a row\n # if order matters, then set window size to 1\n # Publish requests beyond window size are enqueued\n self.protocol.setWindowSize(3) \n self.task = task.LoopingCall(self.publish)\n self.task.start(5.0)\n try:\n yield self.protocol.connect(\"TwistedMQTT-pub\", keepalive=60)\n except Exception as e:\n log.error(\"Connecting to {broker} raised {excp!s}\", \n broker=BROKER, excp=e)\n else:\n log.info(\"Connected to {broker}\", broker=BROKER)\n\n\n def onDisconnection(self, reason):\n '''\n get notfied of disconnections\n and get a deferred for a new protocol object (next retry)\n '''\n log.debug(\" >< Connection was lost ! ><, reason={r}\", r=reason)\n self.whenConnected().addCallback(self.connectToBroker)\n\n\n def publish(self):\n \n\n def _logFailure(failure):\n log.debug(\"reported {message}\", message=failure.getErrorMessage())\n return failure\n\n def _logAll(*args):\n log.debug(\"all publihing complete args={args!r}\",args=args)\n\n log.debug(\" >< Starting one round of publishing >< \")\n d1 = self.protocol.publish(topic=\"foo/bar/baz1\", qos=0, message=\"hello world 0\")\n d1.addErrback(_logFailure)\n d2 = self.protocol.publish(topic=\"foo/bar/baz2\", qos=1, message=\"hello world 1\")\n d2.addErrback(_logFailure)\n d3 = self.protocol.publish(topic=\"foo/bar/baz3\", qos=2, message=\"hello world 2\")\n d3.addErrback(_logFailure)\n dlist = DeferredList([d1,d2,d3], consumeErrors=True)\n dlist.addCallback(_logAll)\n return dlist\n\n\n\n if __name__ == '__main__':\n import sys\n log = Logger()\n startLogging()\n setLogLevel(namespace='mqtt', levelStr='debug')\n setLogLevel(namespace='__main__', levelStr='debug')\n\n factory = MQTTFactory(profile=MQTTFactory.PUBLISHER)\n myEndpoint = clientFromString(reactor, BROKER)\n serv = MQTTService(myEndpoint, factory)\n serv.startService()\n reactor.run()\n\nSubscriber Example\n~~~~~~~~~~~~~~~~~~\n\nA subscriber is built by obtaining a factory for the\n``MQTTFactory.SUBSCRIBER`` profile.\n\nYour MQTT Subscriber service should configure the following things in\nthe ``connectToBroker()`` method:\n\n- The MQTT protocol ``onDisconnection`` attribute storing a callback\n that will be invoked when a disconnection occurs.\n- The maximun Window Size - that is - how many asynchronous SUBSCRIBE\n or UNSUBSCRIBE request you will issue in a row to the library, before\n getting and acknowledge from the Broker.\n- The MQTT protocol ``onPublish`` attribute storing a callback that\n will be fired whenever a new PUBLISH packed is delivered to the\n subscriber.\n\n.. code:: python\n\n import sys\n\n from twisted.internet.defer import inlineCallbacks, DeferredList\n from twisted.internet import reactor\n from twisted.internet.endpoints import clientFromString\n from twisted.application.internet import ClientService, backoffPolicy\n\n from twisted.logger import (\n Logger, LogLevel, globalLogBeginner, textFileLogObserver, \n FilteringLogObserver, LogLevelFilterPredicate)\n\n from mqtt.client.factory import MQTTFactory\n\n # ----------------\n # Global variables\n # ----------------\n\n # Global object to control globally namespace logging\n logLevelFilterPredicate = LogLevelFilterPredicate(defaultLogLevel=LogLevel.info)\n\n BROKER = \"tcp:test.mosquitto.org:1883\"\n\n # -----------------\n # Utility Functions\n # -----------------\n\n def startLogging(console=True, filepath=None):\n '''\n Starts the global Twisted logger subsystem with maybe\n stdout and/or a file specified in the config file\n '''\n global logLevelFilterPredicate\n \n observers = []\n if console:\n observers.append( FilteringLogObserver(observer=textFileLogObserver(sys.stdout), \n predicates=[logLevelFilterPredicate] ))\n \n if filepath is not None and filepath != \"\":\n observers.append( FilteringLogObserver(observer=textFileLogObserver(open(filepath,'a')), \n predicates=[logLevelFilterPredicate] ))\n globalLogBeginner.beginLoggingTo(observers)\n\n\n def setLogLevel(namespace=None, levelStr='info'):\n '''\n Set a new log level for a given namespace\n LevelStr is: 'critical', 'error', 'warn', 'info', 'debug'\n '''\n level = LogLevel.levelWithName(levelStr)\n logLevelFilterPredicate.setLogLevelForNamespace(namespace=namespace, level=level)\n\n # -----------------------\n # MQTT Subscriber Service\n # ------------------------\n\n class MQTTService(ClientService):\n\n\n def __init(self, endpoint, factory):\n ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy())\n\n\n def startService(self):\n log.info(\"starting MQTT Client Subscriber Service\")\n # invoke whenConnected() inherited method\n self.whenConnected().addCallback(self.connectToBroker)\n ClientService.startService(self)\n\n\n @inlineCallbacks\n def connectToBroker(self, protocol):\n '''\n Connect to MQTT broker\n '''\n self.protocol = protocol\n self.protocol.onPublish = self.onPublish\n self.protocol.onDisconnection = self.onDisconnection\n self.protocol.setWindowSize(3) \n try:\n yield self.protocol.connect(\"TwistedMQTT-subs\", keepalive=60)\n yield self.subscribe()\n except Exception as e:\n log.error(\"Connecting to {broker} raised {excp!s}\", \n broker=BROKER, excp=e)\n else:\n log.info(\"Connected and subscribed to {broker}\", broker=BROKER)\n\n\n def subscribe(self):\n\n def _logFailure(failure):\n log.debug(\"reported {message}\", message=failure.getErrorMessage())\n return failure\n\n def _logGrantedQoS(value):\n log.debug(\"response {value!r}\", value=value)\n return True\n\n def _logAll(*args):\n log.debug(\"all subscriptions complete args={args!r}\",args=args)\n\n d1 = self.protocol.subscribe(\"foo/bar/baz1\", 2 )\n d1.addCallbacks(_logGrantedQoS, _logFailure)\n\n d2 = self.protocol.subscribe(\"foo/bar/baz2\", 2 )\n d2.addCallbacks(_logGrantedQoS, _logFailure)\n\n d3 = self.protocol.subscribe(\"foo/bar/baz3\", 2 )\n d3.addCallbacks(_logGrantedQoS, _logFailure)\n\n dlist = DeferredList([d1,d2,d3], consumeErrors=True)\n dlist.addCallback(_logAll)\n return dlist\n\n\n def onPublish(self, topic, payload, qos, dup, retain, msgId):\n '''\n Callback Receiving messages from publisher\n '''\n log.debug(\"msg={payload}\", payload=payload)\n\n\n def onDisconnection(self, reason):\n '''\n get notfied of disconnections\n and get a deferred for a new protocol object (next retry)\n '''\n log.debug(\" >< Connection was lost ! ><, reason={r}\", r=reason)\n self.whenConnected().addCallback(self.connectToBroker)\n\n\n if __name__ == '__main__':\n import sys\n log = Logger()\n startLogging()\n setLogLevel(namespace='mqtt', levelStr='debug')\n setLogLevel(namespace='__main__', levelStr='debug')\n\n factory = MQTTFactory(profile=MQTTFactory.SUBSCRIBER)\n myEndpoint = clientFromString(reactor, BROKER)\n serv = MQTTService(myEndpoint, factory)\n serv.startService()\n reactor.run()\n\nPublisher/Subscriber Example\n~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n\nA Publisher/Subscriber example is no more than a mix of the previous\nexamples, not forgetting to set the MQTT factory profile to\n``MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER``.\n\nDesign Notes\n------------\n\nThere is a separate ``MQTTProtocol`` in each module implementing a\ndifferent profile (subscriber, publiser, publisher/subscriber). The\n``MQTTBaseProtocol`` and the various ``MQTTProtocol`` classes implement\na State Pattern to avoid the \"if spaghetti code\" in the connection\nstates. A basic state machine is built into the ``MQTTBaseProtocol`` and\nthe ``ConnectedState`` is patched according to the profile.\n\nPrevious 0.1.x implementations used two separate (subclases, publisher)\nand with separate logic for both roles. The publisher/subscriber was a\nmixin class implemented by delegation that managed the connection state\nand forwarded all client requests and network events to the proper\ndelegate.\n\nHowever, this approach had some quirks and issues with sharing state. It\nhas been re-written to a single publisher/subscriber class that manages\neverything.\n\nTo maintain the former API, separate subclasses has been derived to\nimplement a pure subscriber or publisher roles. The subclassing simply\npatches the state machine in order to honor only the methods for a given\nrole.\n\nLimitations\n-----------\n\nThe current implementation has the following limitations:\n\n- This library does not claim to be full comformant to the standard.\n\n- There is a limited form of session persistance for the publisher.\n Pending acknowledges for PUBLISH and PUBREL are kept in RAM and\n outlive the connection and the protocol object while Twisted is\n running. However, they are not stored in a persistent medium.\n\nTODO\n----\n\nI wrote this library for my pet projects and learn Twisted. However, it\ngoes a long way from an apparently looking good library to an\nindustrial-strength, polished product. I don't simply have the time,\nenergy and knowledge to do so.\n\nSome areas in which this can be improved:\n\n- Bug fixing\n- Include a thorough test battery.\n- Improve documentation.\n- etc.", "description_content_type": "", "docs_url": null, "download_url": "", "downloads": { "last_day": -1, "last_month": -1, "last_week": -1 }, "home_page": "http://github.com/astrorafael/twisted-mqtt/", "keywords": "Python Twisted", "license": "MIT", "maintainer": "", "maintainer_email": "", "name": "twisted-mqtt", "package_url": "https://pypi.org/project/twisted-mqtt/", "platform": "", "project_url": "https://pypi.org/project/twisted-mqtt/", "project_urls": { "Homepage": "http://github.com/astrorafael/twisted-mqtt/" }, "release_url": "https://pypi.org/project/twisted-mqtt/0.3.9/", "requires_dist": null, "requires_python": "", "summary": "MQTT client protocol package for Twisted", "version": "0.3.9" }, "last_serial": 4589550, "releases": { "0.3.2": [ { "comment_text": "", "digests": { "md5": "ac830bfe10b4af1ecf873b5a6f69e49c", "sha256": "dcff61548a6d7c757129631efc518a861c0ecdd2851020bb2c7646c15db609ec" }, "downloads": -1, "filename": "twisted-mqtt-0.3.2.tar.gz", "has_sig": false, "md5_digest": "ac830bfe10b4af1ecf873b5a6f69e49c", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 44458, "upload_time": "2016-11-04T17:27:40", "url": "https://files.pythonhosted.org/packages/ac/5a/a1f54f6621083a8c06e4c6258305db24c4f53822127836c37c07555f7585/twisted-mqtt-0.3.2.tar.gz" } ], "0.3.3": [ { "comment_text": "", "digests": { "md5": "2741a2bf0f195a4d3b7b2ceff9b3a0a8", "sha256": "ea866d5317e0c3e5c8313044c088f61e7152561a68a72a0e4cb9f30375bab25a" }, "downloads": -1, "filename": "twisted-mqtt-0.3.3.tar.gz", "has_sig": false, "md5_digest": "2741a2bf0f195a4d3b7b2ceff9b3a0a8", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 44601, "upload_time": "2017-01-10T19:51:52", "url": "https://files.pythonhosted.org/packages/fe/24/d44d81770fc9eaee5cdb8ea8954abab9593a6f5e486c46e600c4fe037a20/twisted-mqtt-0.3.3.tar.gz" } ], "0.3.4": [ { "comment_text": "", "digests": { "md5": "3b4f64ea80ba95ef0ce1c24264ad5762", "sha256": "e9307483f09d14977ce52a3fd574c7ef4828bf2945df0be50d1866598eb36d35" }, "downloads": -1, "filename": "twisted-mqtt-0.3.4.tar.gz", "has_sig": false, "md5_digest": "3b4f64ea80ba95ef0ce1c24264ad5762", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 44529, "upload_time": "2017-02-24T21:08:47", "url": "https://files.pythonhosted.org/packages/c1/76/b7fe69372ac4e58e84de7895a25b73943b6b17c0bbe5a067eca22aedfaf8/twisted-mqtt-0.3.4.tar.gz" } ], "0.3.6": [ { "comment_text": "", "digests": { "md5": "6fd2dd84eb13314badd63a69b059ca43", "sha256": "35a5e9b910d6fbaf9726476ad5c6311c8ae054650cd26734e760b6133233dcf0" }, "downloads": -1, "filename": "twisted-mqtt-0.3.6.tar.gz", "has_sig": false, "md5_digest": "6fd2dd84eb13314badd63a69b059ca43", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 44588, "upload_time": "2018-02-22T12:45:33", "url": "https://files.pythonhosted.org/packages/41/da/fb746d79d74c7ae0753cda2613fb815e5760cdc345920fd55803cf50b707/twisted-mqtt-0.3.6.tar.gz" } ], "0.3.9": [ { "comment_text": "", "digests": { "md5": "81395e1888960089db199fdd1033729c", "sha256": "6f1fc12e8d9c5fccfa6bc00563840bb1a344c25ddc4538d9022e70019a4d2984" }, "downloads": -1, "filename": "twisted-mqtt-0.3.9.tar.gz", "has_sig": false, "md5_digest": "81395e1888960089db199fdd1033729c", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 44540, "upload_time": "2018-12-12T09:56:55", "url": "https://files.pythonhosted.org/packages/60/bb/8699acba19c441b07ed8d396803a4647b95b003c7f2fd6c85fbaebcfee31/twisted-mqtt-0.3.9.tar.gz" } ] }, "urls": [ { "comment_text": "", "digests": { "md5": "81395e1888960089db199fdd1033729c", "sha256": "6f1fc12e8d9c5fccfa6bc00563840bb1a344c25ddc4538d9022e70019a4d2984" }, "downloads": -1, "filename": "twisted-mqtt-0.3.9.tar.gz", "has_sig": false, "md5_digest": "81395e1888960089db199fdd1033729c", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 44540, "upload_time": "2018-12-12T09:56:55", "url": "https://files.pythonhosted.org/packages/60/bb/8699acba19c441b07ed8d396803a4647b95b003c7f2fd6c85fbaebcfee31/twisted-mqtt-0.3.9.tar.gz" } ] }