{ "info": { "author": "Brett Haydon", "author_email": "brett@haydon.id.au", "bugtrack_url": null, "classifiers": [ "Development Status :: 3 - Alpha", "Framework :: Django", "Intended Audience :: Developers", "License :: OSI Approved :: BSD License", "Operating System :: OS Independent", "Programming Language :: Python :: 2.7", "Programming Language :: Python :: 3.3", "Topic :: System :: Distributed Computing" ], "description": "django-pq\n==========\n\nA task queue with scheduling and a simple workflow engine, based on the elegant RQ_ but with a django postgresql backend, using postgresql's asynchronous notifications to wait for work.\n\nRQ sets a low barrier for entry, and django-pq takes it lower for sites that can\u2019t or don\u2019t want to use Redis in their stack. By using django-pq you are trading off throughput on cheap tasks for the transactional integrity of Postgresql. For tasks that are expected to complete in a few milliseconds or less, such as internal messaging, you can expect RQ to be at least 5x faster than django-pq. For expensive tasks taking half a second or more to complete, the throughput of RQ and django-pq will be about the same. As such, django-pq is suitable for very low volume messaging or slow running task applications (see benchmarks below).\n\nDjango-pq is tested against Django 1.5 with Python 2.7 and 3.3 using psycopg2, and with PyPy 2.0 using psycopg2cffi.\n\nSource repository at https://github.com/bretth/django-pq/.\n\nInstallation\n--------------\n\n.. code-block:: bash\n\n $ pip install django-pq\n\nAdd ``pq`` to your ``INSTALLED_APPS`` in your django settings.\n\nYou must ensure your Postgresql connection options have autocommit set to True. This is enabled by default after Django 1.5, but in 1.5 and earlier you should set it via ``'OPTIONS': {'autocommit': True}`` in your database settings. You may also need to set ``PQ_DEFAULT_WORKER_TTL`` if you use pooling software or your postgresql installation does not support Postgresql messaging. 
See `Troubleshooting Workers`_ for more.\n\nGetting started\n----------------\n\nIf you have used RQ then you\u2019ll already know django-pq, but let\u2019s start with the RQ example.\n\n.. code-block:: python\n\n import requests\n\n def count_words_at_url(url):\n resp = requests.get(url)\n return len(resp.text.split())\n\nCreate the queue.\n\n.. code-block:: python\n\n from pq import Queue\n q = Queue()\n\nEnqueue the function.\n\n.. code-block:: python\n\n q.enqueue(count_words_at_url, 'http://python-rq.org')\n\nConsume your queue with a worker.\n\n.. code-block:: bash\n\n $ python manage.py pqworker --burst\n *** Listening for work on default\n Got count_words_at_url('http://python-rq.org') from default\n Job result = 818\n *** Listening for work on default\n\n\nQueues\n---------\n\nSince django-pq uses django models, we have one piece of syntactic sugar to maintain compatibility with RQ.\n\n.. code-block:: python\n\n from pq import Queue\n # create a default queue called 'default'\n queue = Queue()\n\nThis is syntactic sugar for:\n\n.. code-block:: python\n\n from pq.queue import Queue\n queue = Queue.create()\n\nSome more queue creation examples:\n\n.. code-block:: python\n\n # name it\n q = Queue('farqueue')\n\n # run synchronously when settings.DEBUG == True\n from django.conf import settings\n\n q = Queue(async=not settings.DEBUG)\n\n # Up the timeout for slow jobs to 10 minutes\n q = Queue(timeout=600)\n\n # Connect to a different settings.DATABASES alias named 'happy-db'\n q = Queue(connection='happy-db')\n\nDefine or import a function or class method to enqueue:\n\n.. code-block:: python\n\n def say_hello(name=None):\n \"\"\"A job with a single argument and a return value.\"\"\"\n if name is None:\n name = 'Stranger'\n return 'Hi there, %s!' 
% (name,)\n\n class Calculator(object):\n \"\"\"Test instance methods.\"\"\"\n def __init__(self, denominator):\n self.denominator = denominator\n\n def calculate(self, x, y):\n return x * y / self.denominator\n\nEnqueue your jobs in any of the following ways:\n\n.. code-block:: python\n\n q.enqueue(say_hello, 'You')\n q.enqueue_call(say_hello, args=('You',))\n q.enqueue_call(say_hello, kwargs={'name': 'You'})\n\n # then with a shorter timeout than 10 minutes\n q.enqueue_call(say_hello, args=('again',), timeout=60)\n\n # Instance methods:\n calc = Calculator(2)\n q.enqueue(calc.calculate, 4, 5)\n q.enqueue_call(calc.calculate, args=(4,5))\n\n # with the @job decorator\n from pq.decorators import job\n\n # decorate the function to be processed by the 'default' queue\n @job('default')\n def say_hello(name=None):\n \"\"\"A job with a single argument and a return value.\"\"\"\n if name is None:\n name = 'Stranger'\n return 'Hi there, %s!' % (name,)\n\n # add a job to the queue\n job = say_hello.delay('friend')\n\nFinally, there is a management command to enqueue from the command line:\n\n.. code-block:: bash\n\n $ python manage.py pqenqueue pq.utils.test_job\n $ python manage.py pqenqueue test_pq.fixtures.say_hello Bob --timeout=10\n\n\nSerial Queues\n--------------\n\nA serial queue soft locks the queue for the task being performed: additional tasks can be enqueued, but they are not performed while the current task is running.\n\n.. code-block:: python\n\n from pq import SerialQueue\n\n sq = SerialQueue()\n\nA default serial queue called 'serial' is created. Serial queues are not in RQ.\n\nScheduling\n-----------\n\nTasks can be scheduled at specific times, repeated at intervals, repeated until a given date, and performed in a specific time window and on specific weekdays. Unlike a cron job, a scheduled task is a promise, not a guarantee, to perform a task at a specific datetime. 
Timezone awareness depends on your ``USE_TZ`` django setting, and the task will be performed when a worker is available and idle. Some examples:\n\n.. code-block:: python\n\n from django.utils.timezone import utc, now\n from dateutil.relativedelta import relativedelta\n from datetime import datetime\n\n # you should use timezone aware dates if you have USE_TZ=True\n future = datetime(2014,1,1, tzinfo=utc)\n q = Queue(scheduled=True)\n\n # The simple enqueue-like call\n q.schedule(future, say_hello, 'you')\n\n # A more complicated enqueue_call style version\n q.schedule_call(future, say_hello, args=('Happy New Year',), timeout=60)\n\n # or to repeat 10 times every 60 seconds\n q.schedule_call(now(), say_hello, args=('you & you',), repeat=10, interval=60)\n\n # to repeat indefinitely every day\n q.schedule_call(now(), say_hello, args=('groundhog day',), repeat=-1, interval=60*60*24)\n\n # ensure the schedule falls within a time range\n q.schedule_call(now(), say_hello, args=('groundhog day',),\n repeat=-1, interval=60*60*24, between='2:00/18:30')\n # could also use variants like '2.00-18.30' or '2-18:30'\n\n # repeat on Monday to Friday\n from dateutil.relativedelta import MO, TU, WE, TH, FR\n\n q.schedule_call(now(), say_hello, repeat=-1, weekdays=(MO, TU, WE, TH, FR))\n # as integers, Monday to Wednesday\n q.schedule_call(now(), say_hello, repeat=-1, weekdays=(0, 1, 2,))\n\n ## repeat on timedelta or relativedelta instances\n\n # repeat on the first of the month indefinitely, starting next month\n # (adding relativedelta(months=1) also handles the December to January rollover)\n n = now()\n dt = datetime(n.year, n.month, 1, tzinfo=utc) + relativedelta(months=1)\n monthly = relativedelta(months=1)\n\n q.schedule_call(dt, say_hello, args=('groundhog day',), repeat=-1, interval=monthly)\n\n # or repeat on the last day of the month until 2020\n monthly = relativedelta(months=1, days=-1)\n until = datetime(2020,1,1, tzinfo=utc)\n\n q.schedule_call(dt, say_hello, args=('groundhog day',), repeat=until, interval=monthly)\n\nFrom the command line:\n\n.. 
code-block:: bash\n\n # schedule now, repeat daily on Mondays and Tuesdays\n $ ./manage.py pqschedule now package.module.func \\\n --repeat=-1 --interval=86400 --mo --tu\n\n # schedule a one-off task for the first day of 2020\n $ ./manage.py pqschedule 2020-01-01 package.module.func\n # at a specific time (any ISO8601 date/datetime)\n $ ./manage.py pqschedule 2020-01-01T09:10 package.module.func arg1 arg2\n\n\nScheduling is a proposed feature of RQ, so the API may change.\n\nWorkflows\n----------\n\nA simple workflow engine class, ``Flow``, executes a specific set of tasks in sequence, each task dependent on the prior one completing.\n\n.. code-block:: python\n\n from pq import Queue, Flow\n from datetime import datetime\n\n q = Queue()\n with Flow(q) as f:\n f.enqueue(first_task)\n f.enqueue_call(another_task, args=(1,2,3))\n f.schedule(datetime(2020,1,1), mission_to_mars)\n\n # or name the flow\n with Flow(q, name='myflow') as f:\n ...\n\n # access the job ids\n f.jobs\n\n # A Flow is stored in a django FlowStore instance. To retrieve it:\n fs = f.get(f.id)\n\n # or get a queryset of FlowStore instances by name\n fs_list = fs.get('myflow')\n\n # This is just a shortcut for accessing the FlowStore objects directly through the ORM.\n from pq.flow import FlowStore\n fs = FlowStore.objects.get(pk=f.id)\n fs = FlowStore.objects.filter(name='myflow')\n\nWorkflows are not part of RQ.\n\nResults\n---------\n\nBy default, jobs should execute within 10 minutes; after that, the worker kills the work horse and puts the job onto the failed queue, indicating the job timed out. You can alter the default time in your django ``PQ_DEFAULT_JOB_TIMEOUT`` setting.\n\nIf a job requires more (or less) time to complete, the default timeout period can be loosened (or tightened) by specifying it as a keyword argument to the Queue.enqueue() call, like so:\n\n.. 
code-block:: python\n\n q = Queue()\n q.enqueue(func=mytask, args=(foo,), kwargs={'bar': qux}, timeout=600)\n\n\nCompleted jobs hang around for a minimum TTL (time to live) of 500 seconds. Since Postgres doesn\u2019t have an expiry option like Redis, the worker periodically polls the database for jobs to delete, hence the minimum TTL. The TTL can be altered per job or through the django setting ``PQ_DEFAULT_RESULT_TTL``. If you are using workflows, a FlowStore instance has the same TTL as its final job, so it will be cleaned up too.\n\n.. code-block:: python\n\n q.enqueue(func=mytask, result_ttl=0) # out of my sight immediately\n q.enqueue(func=mytask, result_ttl=86400) # love you long time\n q.enqueue(func=mytask, result_ttl=-1) # together forever baby!\n\nWorkers\n--------\n\nWork is done through pqworker, a django management command. To accept work on all queues, run ``$ python manage.py pqworker``.\n\nTo accept work on the fictional ``high`` and ``low`` queues:\n\n.. code-block:: bash\n\n $ python manage.py pqworker high low\n *** Listening for work on high, low\n Got send_newsletter('me@example.com') from high\n Job ended normally without result\n *** Listening for work on high, low\n\nIf you don\u2019t see any output, you might need to configure your django project\u2019s ``LOGGING`` setting. Here\u2019s an example configuration that will print to the console:\n\n.. code-block:: python\n\n LOGGING = {\n 'version': 1,\n 'disable_existing_loggers': True,\n 'formatters': {\n 'standard': {\n 'format': '[%(levelname)s] %(name)s: %(message)s'\n },\n },\n 'handlers': {\n 'console':{\n 'level':'INFO',\n 'class':\"logging.StreamHandler\",\n 'formatter': 'standard'\n },\n },\n 'loggers': {\n 'pq': {\n 'handlers': ['console'],\n 'level': 'INFO',\n 'propagate': True\n },\n }\n }\n\n\n\nQueue priority follows the order in which the queues are listed, so if the worker never finishes processing the high priority queue, the other queues will never be consumed.\n\nTo exit after all work is consumed:\n\n.. 
code-block:: bash\n\n $ ./manage.py pqworker default --burst\n\nMore examples:\n\n.. code-block:: bash\n\n $ ./manage.py pqworker default --name=doug # change the name from the default hostname\n $ ./manage.py pqworker default --connection=[your-db-alias] # use a different database alias instead of default\n $ ./manage.py pqworker default --sentry-dsn=SENTRY_DSN # or set SENTRY_DSN in your settings\n\n\nTo implement a worker in code:\n\n.. code-block:: python\n\n from pq import Queue, Worker\n q = Queue()\n\n w = Worker(q)\n w.work(burst=True)\n\n\nTroubleshooting Workers\n------------------------\n\nThe django-pq worker depends on postgresql messaging (LISTEN and NOTIFY) to avoid polling the database. This functionality may not be available on all postgresql installations, and connection pooling may also prevent messaging from working correctly. If jobs are not being received instantly, you can set ``PQ_DEFAULT_WORKER_TTL = 60`` to poll the database for jobs every 60 seconds. To test whether your jobs go through instantly, run ``python manage.py pqworker default`` (a worker on the 'default' queue) in one terminal, and enqueue a test job from another terminal with ``python manage.py pqenqueue pq.utils.test_job``.\n\nDepending on your hosting environment, down-scaling, terminating, or deploying environments with dependent worker processes may not give your workers enough time to complete their tasks. You can gracefully terminate all workers with a blocking command, ``./manage.py pqworker --terminate``, or use the admin to stop individual workers. If a worker is terminated before its job is complete, the job will remain in the dequeued admin list with a 'started' status.\n\n\nMonitoring & Admin\n----------------------\n\nJobs are monitored and administered as necessary through the django admin. Admin actions allow jobs to be requeued or deleted.\n\nWorkers can be stopped within one job cycle by setting stop on the Worker in the admin. 
\n\nConnections\n------------\n\nDjango-pq uses the django postgresql backend in place of the RQ Redis connections, so you pass in a connection by referring to its alias in your django DATABASES settings. Surprise, surprise: we use 'default' if no connection is defined.\n\n.. code-block:: python\n\n q = Queue(connection='default')\n w = Worker.create(connection='default')\n\nWorkers and queues can be on different connections, but a worker can only work on multiple queues that share the same connection. Workers not in burst mode recycle their connections every ``PQ_DEFAULT_WORKER_TTL`` seconds, and in between they block and listen for an asynchronous notification from postgresql that a job has been enqueued.\n\nThe admin connection for job lists can be set via ``PQ_ADMIN_CONNECTION``.\n\nExceptions\n-----------\n\nJobs that raise exceptions go to the ``failed`` queue. You can register a custom handler as per RQ:\n\n.. code-block:: python\n\n def my_handler(job, exc_type, exc_value, traceback):\n # do custom things here\n # for example, write the exception info to a DB\n ...\n\n # You might also see the three exception arguments encoded as:\n def my_handler(job, *exc_info):\n # do custom things here\n ...\n\n # register the handler once it is defined\n w = Worker.create([q], exc_handler=my_handler)\n\n\nSettings\n---------\n\nAll settings are optional. Defaults are listed below.\n\n.. code-block:: python\n\n SENTRY_DSN # as per sentry\n PQ_DEFAULT_RESULT_TTL = 500 # minimum TTL for jobs\n PQ_DEFAULT_WORKER_TTL = 420 # worker will refresh the connection (and poll the database)\n PQ_DEFAULT_JOB_TIMEOUT = 600 # jobs that exceed this time are failed\n PQ_ADMIN_CONNECTION = 'default' # the connection to use for the admin\n PQ_QUEUE_CACHE = True # Queue is cached in local memory on creation or retrieval\n\nBenchmarks & other lies\n-------------------------\n\nTo gauge rough performance, a ``pqbenchmark`` management command is included that is designed to test worker throughput while jobs are being enqueued. 
The command will enqueue the function ``do_nothing`` a number of times and simultaneously spawn workers to consume the benchmark queue. After enqueuing completes, the number of jobs remaining is counted and an approximate jobs/s rate is calculated. There are a number of factors you can adjust to simulate your load, and as a bonus it can test RQ. For example:\n\n.. code-block:: bash\n\n # Simulate trivial tasks with default settings.\n # Useful for comparing raw backend overhead.\n # 100,000 jobs and 1 worker.\n $ django-admin.py pqbenchmark\n\n # Simulate a slower running task.\n # Useful for seeing how many workers you can put on a task.\n # Enqueue 50000 jobs with 4 workers and a 250 millisecond job execution time:\n $ django-admin.py pqbenchmark 50000 -w4 --sleep=250\n\n # If rq/redis is installed you can compare.\n $ django-admin.py pqbenchmark 50000 -w4 --sleep=250 --backend=rq\n\nStarting with an unrealistic benchmark on a MacBook Pro (2.6GHz i7, 8GB RAM, 256GB SSD), I get the following throughput in jobs per second with Postgres.app (9.2.2.0) and Redis Server (2.6.11), with 100,000 enqueued jobs on default settings:\n\n+-----------+-----------+-----------+\n| Workers   | PQ-Py2.7  | RQ-Py2.7  |\n+===========+===========+===========+\n| 1         | 28        | 158       |\n+-----------+-----------+-----------+\n| 2         | 42        | 256       |\n+-----------+-----------+-----------+\n| 4         | 46        | 362       |\n+-----------+-----------+-----------+\n| 6         | 45        | 399       |\n+-----------+-----------+-----------+\n\nThese results are unrealistic except to show the theoretical difference between PQ and RQ. A commodity virtual server without the benefit of a local SSD for Postgresql will widen the gap between RQ and PQ dramatically, but as you can see from the numbers, RQ is a far better choice for higher volumes of cheap tasks such as messaging. 
Unfortunately, PQ needs to reset database connections between jobs, which is the main impediment to scaling workers.\n\nThe point of a task queue, however, is to process slower tasks. Simulating a slower task with 250ms of overhead (or more) is more realistic: the task queue overhead becomes less significant and worker scaling more practical. So, adjusting ``--sleep`` to 250:\n\n+-----------+-----------+-----------+\n| Workers   | PQ-Py2.7  | RQ-Py2.7  |\n+===========+===========+===========+\n| 1         | 3.4       | 3.9       |\n+-----------+-----------+-----------+\n| 2         | 6.8       | 7.8       |\n+-----------+-----------+-----------+\n| 4         | 13.6      | 15.3      |\n+-----------+-----------+-----------+\n| 6         | 17.5      | 22.8      |\n+-----------+-----------+-----------+\n| 10        | 33.2      | 37.6      |\n+-----------+-----------+-----------+\n| 20        | 44.5      | 75.9      |\n+-----------+-----------+-----------+\n\nOnce your tasks run beyond 250ms, the differences between PQ and RQ become much more marginal. The important factors here are the tasks themselves and how well your backend's memory usage and IO scale with the number of connections. Again, the quasi-persistent RQ is obviously going to scale better than your average disk-bound postgresql installation. In general, the slower the task, the better PQ will scale connections (since it has to reset connections less often).\n\nDevelopment & Issues\n---------------------\n\nContributions, questions and issues are welcome on GitHub.\n\nUnit testing uses tox, nose2 and my nose2django plugin. To run the tests, clone the repo, then:\n\n.. code-block:: bash\n\n $ pip install tox\n $ tox\n\nI have been judicious about which tests were ported across from RQ, but hooray for tests. 
To make it easier to panel-beat smashed code, django-pq does use setUp as its creator intended.\n\nI intend to stick as closely to the documented RQ API as possible, with minimal divergence.\n\nAcknowledgements\n-----------------\n\nWithout RQ (and by extension Vincent Driessen), django-pq would not exist, since a fair slab of the codebase comes from that project. RQ_ is licensed according to the BSD license here_.\n\n.. _https://github.com/bretth/django-pq/: https://github.com/bretth/django-pq/\n.. _RQ: http://python-rq.org\n.. _here: https://raw.github.com/nvie/rq/master/LICENSE\n\n.. :changelog:\n\n\nChangelog\n---------\n\nv0.3.1 (2013-04-20)\n^^^^^^^^^^^^^^^^^^^\n\n- Fix issue with serial queue names\n- admin fixes\n\nv0.3 (2013-04-10)\n^^^^^^^^^^^^^^^^^\n\n- Added south migrations\n- Added a simple workflow class Flow\n- Added weekdays option for scheduled_call/enqueued_call\n\nv0.2 (2013-04-6)\n^^^^^^^^^^^^^^^^^\n\n- Implement scheduled tasks\n- Implement a serial queue\n\n\nv0.1 (2013-03-29)\n^^^^^^^^^^^^^^^^^\n\n- Initial public release", "description_content_type": null, "docs_url": null, "download_url": "UNKNOWN", "downloads": { "last_day": -1, "last_month": -1, "last_week": -1 }, "home_page": "https://github.com/bretth/django-pq", "keywords": "django\nasynchronous\ntasks\nqueue", "license": "BSD", "maintainer": null, "maintainer_email": null, "name": "django-pq", "package_url": "https://pypi.org/project/django-pq/", "platform": "UNKNOWN", "project_url": "https://pypi.org/project/django-pq/", "project_urls": { "Download": "UNKNOWN", "Homepage": "https://github.com/bretth/django-pq" }, "release_url": "https://pypi.org/project/django-pq/0.3.2/", "requires_dist": null, "requires_python": null, "summary": "A task queue based on the RQ api with a postgresql backend", "version": "0.3.2" }, "last_serial": 939714, "releases": { "0.3.2": [ { "comment_text": "", "digests": { "md5": "2a4dd625d83db977cc6358a3a025ae05", "sha256": 
"62fff46f224eef284910906fe2977ac93f46b08d582d96331500bf7f15d42fb7" }, "downloads": -1, "filename": "django-pq-0.3.2.tar.gz", "has_sig": false, "md5_digest": "2a4dd625d83db977cc6358a3a025ae05", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 39325, "upload_time": "2013-12-09T09:59:27", "url": "https://files.pythonhosted.org/packages/61/6d/4b5c5c36091b15fbbe500835fba86a5098415f2d79c550ffa86a45dbe3f1/django-pq-0.3.2.tar.gz" } ] }, "urls": [ { "comment_text": "", "digests": { "md5": "2a4dd625d83db977cc6358a3a025ae05", "sha256": "62fff46f224eef284910906fe2977ac93f46b08d582d96331500bf7f15d42fb7" }, "downloads": -1, "filename": "django-pq-0.3.2.tar.gz", "has_sig": false, "md5_digest": "2a4dd625d83db977cc6358a3a025ae05", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 39325, "upload_time": "2013-12-09T09:59:27", "url": "https://files.pythonhosted.org/packages/61/6d/4b5c5c36091b15fbbe500835fba86a5098415f2d79c550ffa86a45dbe3f1/django-pq-0.3.2.tar.gz" } ] }