{ "info": { "author": "Nipun Talukdar", "author_email": "nipunmlist@gmail.com", "bugtrack_url": null, "classifiers": [ "Development Status :: 4 - Beta", "Intended Audience :: Developers", "License :: OSI Approved :: MIT License", "Programming Language :: Python :: 2", "Programming Language :: Python :: 2.6", "Programming Language :: Python :: 2.7", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.2", "Programming Language :: Python :: 3.3", "Programming Language :: Python :: 3.4", "Topic :: Software Development :: Libraries :: Python Modules" ], "description": "geeteventbus\n============\nAn eventbus for concurrent programming\n--------------------------------------\n\ngeeteventbus is a library that allows publish-subscribe-style communication. Components do not need to register with each other. It is inspired by a Java library, Guava eventbus from Google, but it is not exactly the same as the Guava eventbus library.\n\n- geeteventbus simplifies handling events from publishers and subscribers.\n- publishers and subscribers don't need to create threads to concurrently process the events.\n- the eventbus can be synchronous, where the events are delivered from the same thread posting the events\n- events can be delivered to subscribers in the same order they are posted\n- subscribers may be declared as thread-safe, in which case the same subscriber may be invoked concurrently for processing multiple events\n- events for which no subscribers are registered yet are simply discarded by the eventbus.\n- the eventbus is not to be used for inter-process communication. Publishers and subscribers must run in the same process\n\nBasic working\n-------------\n\n1) Create an eventbus:\n\n .. code:: python\n\n from geeteventbus.eventbus import eventbus\n eb = eventbus()\n\n This will create an eventbus with the defaults. 
The default eventbus has the following characteristics:\n\n 1) the maximum queued event limit is set to 10000\n 2) the number of executor threads is 8\n 3) the subscribers will be called asynchronously\n 4) subscribers are treated as thread-safe and hence the same subscriber may be invoked simultaneously on different threads\n \n2) Create a subclass of subscriber and override the process method. Create an object of this class and register it with the eventbus to receive messages with certain topics:\n \n .. code:: python\n\n from geeteventbus.subscriber import subscriber\n from geeteventbus.eventbus import eventbus\n from geeteventbus.event import event\n\n class mysubscriber(subscriber):\n def process(self, eventobj):\n if not isinstance(eventobj, event):\n print('Invalid object type is passed.')\n return\n topic = eventobj.get_topic()\n data = eventobj.get_data()\n print('Processing event with TOPIC: %s, DATA: %s' % (topic, data))\n \n subscr = mysubscriber()\n eb.register_consumer(subscr, 'an_important_topic')\n\n3) Post some events to the eventbus with the topic \"an_important_topic\".\n\n .. code:: python\n\n from geeteventbus.event import event\n\n eobj1 = event('an_important_topic', 'This is some data for the event 1')\n eobj2 = event('an_important_topic', 'This is some data for the event 2')\n eobj3 = event('an_important_topic', 'This is some data for the event 3')\n eobj4 = event('an_important_topic', 'This is some data for the event 4')\n \n eb.post(eobj1)\n eb.post(eobj2)\n eb.post(eobj3)\n eb.post(eobj4)\n\n4) We may gracefully shut down the eventbus before exiting the process\n\n .. code:: python\n \n eb.shutdown()\n\n\nThe complete example is below:\n \n .. 
code:: python\n \n from time import sleep\n from geeteventbus.subscriber import subscriber\n from geeteventbus.eventbus import eventbus\n from geeteventbus.event import event\n\n class mysubscriber(subscriber):\n def process(self, eventobj):\n if not isinstance(eventobj, event):\n print('Invalid object type is passed.')\n return\n topic = eventobj.get_topic()\n data = eventobj.get_data()\n print('Processing event with TOPIC: %s, DATA: %s' % (topic, data))\n \n \n eb = eventbus()\n subscr = mysubscriber()\n eb.register_consumer(subscr, 'an_important_topic')\n \n\n eobj1 = event('an_important_topic', 'This is some data for the event 1')\n eobj2 = event('an_important_topic', 'This is some data for the event 2')\n eobj3 = event('an_important_topic', 'This is some data for the event 3')\n eobj4 = event('an_important_topic', 'This is some data for the event 4')\n \n eb.post(eobj1)\n eb.post(eobj2)\n eb.post(eobj3)\n eb.post(eobj4)\n\n eb.shutdown()\n sleep(2)\n\n\nA more detailed example is given below. A subscriber (counter_aggregator) aggregates the values for \na set of counters. It registers itself to an eventbus for receiving events for the \ncounters(topics). A set of producers update the values for the counters and post events describing\nthe counter to the eventbus:\n \n .. code:: python\n\n from threading import Lock, Thread\n from time import sleep, time\n from geeteventbus.eventbus import eventbus\n from geeteventbus.event import event\n from geeteventbus.subscriber import subscriber\n from random import randint\n\n\n class counter_aggregator(subscriber, Thread):\n '''\n Aggregator for a set of counters. 
Multiple threads update the counts, which\n are aggregated by this class; the aggregated values are output periodically.\n '''\n def __init__(self, counter_names):\n Thread.__init__(self)\n self.counter_names = counter_names\n self.locks = {}\n self.counts = {}\n self.keep_running = True\n self.collect_times = {}\n for counter in counter_names:\n self.locks[counter] = Lock()\n self.counts[counter] = 0\n self.collect_times[counter] = time()\n\n def process(self, eobj):\n '''\n Called with the event object eobj. eobj has the counter name as the topic\n and an int count as the value for the counter.\n '''\n counter_name = eobj.get_topic()\n if counter_name not in self.counter_names:\n return\n count = eobj.get_data()\n with self.locks[counter_name]:\n self.counts[counter_name] += count\n\n def stop(self):\n self.keep_running = False\n\n def __call__(self):\n '''\n Keep outputting the aggregated counts every 2 seconds\n '''\n while self.keep_running:\n sleep(2)\n for counter_name in self.counter_names:\n with self.locks[counter_name]:\n print('Change for counter %s = %d, in last %f secs' % (counter_name,\n self.counts[counter_name], time() - self.collect_times[counter_name]))\n self.counts[counter_name] = 0\n self.collect_times[counter_name] = time()\n print('Aggregator exited')\n\n\n class count_producer:\n '''\n Producer for counters. 
Every 0.02 seconds, posts an \"updated\" value for a\n randomly chosen counter\n '''\n def __init__(self, counters, ebus):\n self.counters = counters\n self.ebus = ebus\n self.keep_running = True\n self.num_counter = len(counters)\n\n def stop(self):\n self.keep_running = False\n\n def __call__(self):\n while self.keep_running:\n ev = event(self.counters[randint(0, self.num_counter - 1)], randint(1, 100))\n self.ebus.post(ev)\n sleep(0.02)\n print('producer exited')\n\n if __name__ == '__main__':\n ebus = eventbus()\n counters = ['c1', 'c2', 'c3', 'c4']\n subcr = counter_aggregator(counters)\n producer = count_producer(counters, ebus)\n for counter in counters:\n ebus.register_consumer(subcr, counter)\n threads = []\n i = 30\n while i > 0:\n threads.append(Thread(target=producer))\n i -= 1\n\n aggregator_thread = Thread(target=subcr)\n aggregator_thread.start()\n for thrd in threads:\n thrd.start()\n sleep(20)\n producer.stop()\n subcr.stop()\n sleep(2)\n ebus.shutdown()", "description_content_type": null, "docs_url": null, "download_url": "UNKNOWN", "downloads": { "last_day": -1, "last_month": -1, "last_week": -1 }, "home_page": "https://github.com/nipuntalukdar/geeteventbus", "keywords": null, "license": "MIT", "maintainer": null, "maintainer_email": null, "name": "geeteventbus", "package_url": "https://pypi.org/project/geeteventbus/", "platform": "all", "project_url": "https://pypi.org/project/geeteventbus/", "project_urls": { "Download": "UNKNOWN", "Homepage": "https://github.com/nipuntalukdar/geeteventbus" }, "release_url": "https://pypi.org/project/geeteventbus/1.0/", "requires_dist": null, "requires_python": null, "summary": "An eventbus for highly concurrent system", "version": "1.0" }, "last_serial": 1564431, "releases": { "1.0": [ { "comment_text": "", "digests": { "md5": "5aabb993afe245038cad29eb0d875427", "sha256": "f290b7cee13adc033fe3dcbf198678aa8413dd81fed4b8a7f767ca8fb156915c" }, "downloads": -1, "filename": "geeteventbus-1.0.tar.gz", "has_sig": false, 
"md5_digest": "5aabb993afe245038cad29eb0d875427", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 6803, "upload_time": "2015-05-27T11:08:33", "url": "https://files.pythonhosted.org/packages/b6/f9/ed589395b7cad6f7113a4e9e3ccb99f28f318901d517ae54aa3da15c537e/geeteventbus-1.0.tar.gz" } ] }, "urls": [ { "comment_text": "", "digests": { "md5": "5aabb993afe245038cad29eb0d875427", "sha256": "f290b7cee13adc033fe3dcbf198678aa8413dd81fed4b8a7f767ca8fb156915c" }, "downloads": -1, "filename": "geeteventbus-1.0.tar.gz", "has_sig": false, "md5_digest": "5aabb993afe245038cad29eb0d875427", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 6803, "upload_time": "2015-05-27T11:08:33", "url": "https://files.pythonhosted.org/packages/b6/f9/ed589395b7cad6f7113a4e9e3ccb99f28f318901d517ae54aa3da15c537e/geeteventbus-1.0.tar.gz" } ] }