{ "info": { "author": "Roger Ineichen, Projekt01 GmbH", "author_email": "dev@projekt01.ch", "bugtrack_url": null, "classifiers": [ "Development Status :: 4 - Beta", "Environment :: Web Environment", "Framework :: Zope3", "Intended Audience :: Developers", "License :: OSI Approved :: Zope Public License", "Natural Language :: English", "Operating System :: OS Independent", "Programming Language :: Python", "Topic :: Internet :: WWW/HTTP" ], "description": "This package provides a remote processing queue for Zope3 using the mongodb\ninstead of ZODB.\n\n\n======\nREADME\n======\n\nThis package offers a remote processor. This remote processor is implemented as\na simple object using the mongodb as storage. The processor can execute pre\ndefined jobs in another thread. It is also possible to run jobs at specific\ntime using the different scheduler items.\n\nThe RemoteProcessor uses two different processor. One processes jobs and the\nother pickes items from the scheduler and is adding jobs. This separation\nis usefull if you implement a distributed concept. This means one or more\napplication can schedule job items based on the given scheduler items. And\nanother application is processing jobs and doesn't know about how to scheduling\nnext items.\n\nSince we use this remote scheduler for low CPU intensive jobs, we offer multi\nprocessing. This is done by running more then one worker in the main worker\nthread. If you use subprocess for your job processing, you will get a real\nmultiprocessing processor which isn't limited to the current python process.\n\nYou can configure the amount of threads which a job worker can start in the\nremote processor. See jobWorkerArguments/maxThreads. By default this number\nuses the amount of CPU installed on your machine.\n\nThe implementation uses a mongodb as a storage for it's component. 
This means\njobs, job factories and scheduler items get stored in the mongodb using the\nORM concept provided by m01.mongo.\n\nSee p01.remote for a ZODB based remote processor implementation but note that\nthe p01.remote implementation doesn't provide the worker and scheduler\nprocessor separation. At least not yet.\n\n\nSetup\n-----\n\n >>> import transaction\n >>> from pprint import pprint\n >>> import zope.component\n >>> import m01.mongo\n >>> from m01.mongo import UTC\n >>> import m01.remote.job\n >>> from m01.remote import testing\n\nLet's now start by creating a remote processor. We can use our remote queue\nsite implementation:\n\n >>> from zope.security.proxy import removeSecurityProxy\n >>> from m01.remote import interfaces\n\nOur test remote processor should be available as application root:\n\n >>> rp = root\n >>> rp\n \n\nLet's discover the available jobs:\n\n >>> dict(root._jobs)\n {}\n\nThe job container is initially empty, because we have not added any job\nfactory. Let's now define a job factory that simply echoes an input string:\n\n >>> echoJob = testing.EchoJob({})\n\nNow we can set the job input:\n\n >>> echoJob.input = {'foo': u'blah'}\n\nThe only API requirement on the job is to be callable. Now we make sure that\nthe job works. Note we call our job with the remote processor instance which\nis our initialized application root:\n\n >>> echoJob(root)\n {'foo': u'blah'}\n\nLet's add the job to the available job list:\n\n >>> rp.addJobFactory(u'echo', echoJob)\n\nThe echo job is now available in the remote processor:\n\n >>> dict(rp._jobFactories)\n {u'echo': }\n\nSince the remote processor cannot instantaneously complete a job, incoming jobs\nare managed by a queue. 
First we request the echo job to be executed:\n\n >>> jobid1 = rp.addJob(u'echo', {'foo': 'bar'})\n >>> jobid1\n u'...'\n\n >>> sorted([job.status for job in rp._jobs.values()])\n [u'queued']\n\nThe ``addJob()`` function schedules the job called \"echo\" to be executed\nwith the specified arguments. The method returns a job id with which we can\ninquire about the job. The ``addJob()`` function marks a job as queued.\n\n >>> rp.getJobStatus(jobid1)\n u'queued'\n\nSince the job has not been processed, the status is set to \"queued\". Further,\nthere is no result available yet:\n\n >>> rp.getJobResult(jobid1) is None\n True\n\nAs long as the job is not being processed, it can be cancelled:\n\n >>> rp.cancelJob(jobid1)\n >>> rp.getJobStatus(jobid1)\n u'cancelled'\n\n >>> sorted([job.status for job in rp._jobs.values()])\n [u'cancelled']\n\nThe worker processor isn't started by default:\n\n >>> rp.isProcessing\n False\n\nTo get a clean logging environment let's clear the logging stack::\n\n >>> logger.clear()\n\nNow we can start the remote processor by calling ``startProcessor``:\n\n >>> rp.startProcessor()\n\nand voila - the remote processor is processing:\n\n >>> rp.isProcessing\n True\n\nThe log output confirms the started remote processor:\n\n >>> print logger\n m01.remote INFO\n Processor 'root-worker' started\n\nLet's stop the processor again:\n\n >>> rp.stopProcessor()\n >>> rp.isProcessing\n False\n\nNow let's get a result from a processed job but first commit the newly added job:\n\n >>> jobid2 = rp.addJob(u'echo', {'foo': u'bar'})\n >>> transaction.commit()\n\n >>> sorted([job.status for job in rp._jobs.values()])\n [u'cancelled', u'queued']\n\nNow create a worker and process the new jobs by calling our simple worker:\n\n >>> class FakeWorker(object):\n ...\n ... def __init__(self, rp):\n ... self.rp = rp\n ...\n ... def __call__(self):\n ... try:\n ... result = self.rp.processNextJob()\n ... transaction.commit()\n ... 
except Exception, error:\n ... transaction.commit()\n\n >>> worker = FakeWorker(rp)\n >>> worker()\n\n >>> sorted([job.status for job in rp._jobs.values()])\n [u'cancelled', u'completed']\n\nFirst check if the job gets processed:\n\n >>> rp.getJobStatus(jobid2)\n u'completed'\n\n >>> rp.getJobResult(jobid2)\n {u'foo': u'bar'}\n\n\nError handling\n--------------\n\nNow, let's define a new job that causes an error:\n\n >>> errorJob = testing.RemoteExceptionJob()\n >>> rp.addJobFactory(u'error', errorJob)\n\nNow add and execute it:\n\n >>> jobid3 = rp.addJob(u'error')\n >>> transaction.commit()\n >>> worker()\n\n >>> sorted([job.status for job in rp._jobs.values()])\n [u'cancelled', u'completed', u'error']\n\nLet's now see what happened:\n\n >>> rp.getJobStatus(jobid3)\n u'error'\n >>> errors = rp.getJobErrors(jobid3)\n >>> errors\n []\n\nSuch a JobError item provides the following data:\n\n >>> error = tuple(errors)[0]\n >>> data = error.dump()\n >>> data = m01.mongo.dictify(data)\n >>> pprint(data)\n {'_id': ObjectId('...'),\n '_type': u'JobError',\n 'created': datetime.datetime(..., ..., ..., ..., ..., ..., ..., tzinfo=),\n 'tb': u\"Traceback (most recent call last):...\"}\n\nAs you can see the traceback stored as tb is the most important information:\n\n >>> print data['tb']\n Traceback (most recent call last):\n   Module m01.remote.processor, line 297, in _processJob\n     job.output = job(self)\n   Module m01.remote.testing, line 86, in __call__\n     raise exceptions.RemoteException('An error occurred.')\n RemoteException: An error occurred.
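Storing a traceback as a plain string, like the ``tb`` value above, can be sketched with the stdlib ``traceback`` module. The helper below is an illustration, not m01.remote code; its name and the record layout are assumptions:

```python
import traceback

def run_and_capture(job, *args):
    """Run a job callable; on failure return an error record with the
    formatted traceback as a plain string, suitable for storage."""
    try:
        return {'status': 'completed', 'output': job(*args)}
    except Exception:
        # format_exc() renders the full "Traceback (most recent
        # call last): ..." text of the active exception
        return {'status': 'error', 'tb': traceback.format_exc()}

def boom():
    raise RuntimeError('An error occurred.')

record = run_and_capture(boom)
print(record['tb'])
```

The stored string can then be dumped into a document field and printed later, exactly as the doctest above does.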
\n\nLet's try it also with a not so nice error:\n\n >>> fatalJob = testing.FatalExceptionJob()\n >>> rp.addJobFactory(u'fatal', fatalJob)\n\nNow add and execute it:\n\n >>> jobid4 = rp.addJob(u'fatal')\n >>> transaction.commit()\n >>> worker()\n\n >>> sorted([job.status for job in rp._jobs.values()])\n [u'cancelled', u'completed', u'error', u'queued']\n\n >>> job4 = rp._jobs[jobid4]\n >>> job4.retryCounter\n 1\n >>> job4.status == u'queued'\n True\n\n >>> job4.errors\n []\n\nAnd process the job again but first set our retryTime to an outdated value which\nwill simulate that time has passed since our last call:\n\n >>> import datetime\n >>> job4.retryTime = datetime.datetime(2000, 1, 1, tzinfo=UTC)\n >>> transaction.commit()\n >>> worker()\n\n >>> sorted([job.status for job in rp._jobs.values()])\n [u'cancelled', u'completed', u'error', u'queued']\n\n >>> job4 = rp._jobs[jobid4]\n >>> job4.retryCounter\n 2\n\n >>> job4.errors\n [,\n ]\n\nAnd process the job again a third time. Now it does not re-raise the exception\nbut the error message gets appended to the error list.\n\n >>> job4.retryTime = datetime.datetime(2000, 1, 1, tzinfo=UTC)\n >>> transaction.commit()\n >>> worker()\n\n >>> sorted([job.status for job in rp._jobs.values()])\n [u'cancelled', u'completed', u'error', u'error']\n\nLet's now see what happened:\n\n >>> job4 = rp._jobs[jobid4]\n >>> job4.retryCounter\n 3\n\n >>> job4.status\n u'error'\n\n >>> rp.getJobStatus(jobid4)\n u'error'\n\n >>> job4.errors\n [,\n ,\n ]\n\n >>> rp.getJobErrors(jobid4)\n [,\n ,\n ]\n\n\nFor management purposes, the remote processor also allows you to inspect all\njobs:\n\n >>> pprint(dict(rp._jobs))\n {u'...': ,\n u'...': ,\n u'...': ,\n u'...': }\n\n\nTo get rid of jobs not needed anymore we can use the removeJobs method.\n\n >>> jobid8 = rp.addJob(u'echo', {'blah': 'blah'})\n >>> transaction.commit()\n\n >>> sorted([job.status for job in rp._jobs.values()])\n [u'cancelled', u'completed', u'error', u'error', u'queued']\n\n >>> 
rp.removeJobs()\n {u'cancelled': 1, u'completed': 1, u'error': 2}\n\n >>> sorted([job.status for job in rp._jobs.values()])\n [u'queued']\n\nNow process the last pending job and make sure we do not get more jobs:\n\n >>> rp.pullNextJob()\n \n\n\nThreading behavior\n------------------\n\nEach remote processor runs in a separate thread, allowing them to operate\nindependently. Jobs should be designed to avoid conflict errors.\n\nLet's start the remote processor we have defined at this point, and see what\nthreads are running as a result::\n\n >>> rp.startProcessor()\n\n >>> import pprint\n >>> import threading\n\n >>> def show_threads():\n ... threads = [t for t in threading.enumerate()\n ... if t.getName().startswith('root')]\n ... threads.sort(key=lambda t: t.getName())\n ... pprint.pprint(threads)\n\n >>> show_threads()\n []\n\nLet's stop the remote processor, and give the background threads a chance to get\nthe message::\n\n >>> rp.stopProcessor()\n\n >>> import time\n >>> time.sleep(2)\n\nThe threads have exited now::\n\n >>> print [t for t in threading.enumerate()\n ... if t.getName().startswith('root')]\n []\n\n\n===========\nJob Workers\n===========\n\nThe actual processing of the jobs in a queue is handled by a separate\ncomponent, known as a job worker. This component usually runs in its own\nthread and provides its own main loop.\n\n >>> import time\n >>> import transaction\n\nThe ``worker`` module provides a job worker which executes one job at a\ntime. Another worker schedules new job items based on scheduler item\nsettings. Let's create the necessary components to test the job worker:\n\n1. Create the remote processor:\n\n >>> from m01.remote import testing\n >>> rp = root\n >>> rp.isProcessing\n False\n\n >>> rp.isScheduling\n False\n\n2. 
Register a job that simply sleeps and writes a message:\n\n >>> data = {'retryDelay': 1}\n >>> sleepJob = testing.SleepJob(data)\n >>> rp.addJobFactory(u'sleep', sleepJob)\n\n\nSimpleJobWorker\n---------------\n\nThis worker executes one job at a time. It was designed for jobs that would\ntake a long time and use up most of the processing power of a computer.\n\nLet's first register a few jobs:\n\n >>> jobid1 = rp.addJob(u'sleep', (0.04, 1))\n >>> time.sleep(0.2)\n >>> jobid2 = rp.addJob(u'sleep', (0.1, 2))\n >>> time.sleep(0.2)\n >>> jobid3 = rp.addJob(u'sleep', (0, 3))\n >>> time.sleep(0.2)\n >>> jobid4 = rp.addJob(u'sleep', (0.08, 4))\n >>> time.sleep(0.2)\n >>> transaction.commit()\n\nNow let's first check if we can access the jobs:\n\n >>> job = rp._jobs.get(jobid1)\n >>> job\n \n\nAnd let's check if the job is ready for processing:\n\n >>> rp.getJobStatus(jobid1)\n u'queued'\n\n >>> rp.getJobStatus(jobid2)\n u'queued'\n\n >>> rp.getJobStatus(jobid3)\n u'queued'\n\n >>> rp.getJobStatus(jobid4)\n u'queued'\n\nLet's start by executing a job directly. The first argument to the simple\nworker constructor is the remote processor instance. All other arguments are\noptional and can be defined as worker arguments in the RemoteProcessor class,\nsee jobWorkerArguments and schedulerWorkerArguments:\n\n >>> from m01.remote.worker import SimpleJobWorker\n >>> worker = SimpleJobWorker(rp, waitTime=0.0)\n\nLet's now process the first job. We clear the log and we also have to end any\nexisting interactions in order to process the job in this thread:\n\n >>> logger.clear()\n\n >>> from zope.security import management\n >>> management.endInteraction()\n\n >>> worker.doProcessNextJob()\n True\n\n >>> print logger\n m01.remote INFO\n Job: 1\n\nLet's now use the worker from within the remote processor. 
Since the worker\nconstructors also accept additional arguments, they are specified as well:\n\n >>> rp.jobWorkerFactory = SimpleJobWorker\n >>> rp.jobWorkerFactory\n \n\n >>> rp.jobWorkerArguments\n {'waitTime': 0.0}\n\nThe wait time has been set to zero for testing purposes only. It is actually set\nto 1 second by default. Let's now start processing jobs, wait a little bit\nfor all the jobs to complete and then stop processing again:\n\n >>> rp.startProcessor()\n >>> transaction.commit()\n\n >>> time.sleep(0.5)\n\n >>> rp.stopProcessor()\n >>> transaction.commit()\n\n >>> time.sleep(0.5)\n\nThe log shows that all jobs have been processed. But more importantly, they\nwere all completed in the order they were defined. Note the first job got\nprocessed before we started the remote processor. And yes, this means a remote\nprocessor can process jobs even if the queue is not started. Starting a remote\nprocessor only means that jobs get processed without doing it\nmanually.\n\n >>> print logger\n m01.remote INFO\n Job: 1\n m01.remote INFO\n Processor 'root-worker' started\n m01.remote INFO\n Job: 2\n m01.remote INFO\n Job: 3\n m01.remote INFO\n Job: 4\n m01.remote INFO\n Processor 'root-worker' stopped\n\n >>> logger.clear()\n\n\nTransactions in jobs\n--------------------\n\nWith the SimpleJobWorker, jobs _should_ not change the transaction status, since\nboth the administration of the jobs by the RemoteProcessor and the job itself\nrun in the same transaction, so aborting it from inside the job could mess up\nthe administrative part.\n\nThis is a regression test that aborting the transaction inside the job does not\nlead to an infinite loop (because SimpleJobWorker pulls the job inside the\ntransaction, so if it is aborted, the job remains on the queue):\n\n >>> testing.testCounter\n 0\n\n >>> counter = 0\n >>> data = {'counter': counter}\n >>> abortJob = testing.TransactionAbortJob(data)\n >>> rp.addJobFactory(u'abortJob', abortJob)\n >>> jobid = 
rp.addJob(u'abortJob', (1))\n >>> time.sleep(0.5)\n >>> jobid = rp.addJob(u'abortJob', (2))\n >>> transaction.commit()\n\n >>> rp.startProcessor()\n >>> transaction.commit()\n >>> time.sleep(0.5)\n\n >>> rp.stopProcessor()\n >>> transaction.commit()\n >>> time.sleep(0.5)\n\n >>> transaction.abort() # prevent spurious conflict errors\n >>> testing.testCounter\n 2\n\n >>> print logger\n m01.remote INFO\n Processor 'root-worker' started\n m01.remote INFO\n Job: 1\n m01.remote INFO\n Job: 2\n m01.remote INFO\n Processor 'root-worker' stopped\n\nReset test counter\n\n >>> testing.testCounter = 0\n\n\nMultiJobProcessor\n-----------------\n\nThe multi-threaded job worker executes several jobs at once. It was designed\nfor jobs that would take a long time but use very little processing power.\n\nLet's add a few new jobs to execute:\n\n >>> jobid1 = rp.addJob(u'sleep', (0.04, 1))\n >>> time.sleep(0.2)\n >>> jobid2 = rp.addJob(u'sleep', (1.0, 2))\n >>> time.sleep(0.2)\n >>> jobid3 = rp.addJob(u'sleep', (0, 3))\n >>> time.sleep(0.2)\n >>> jobid4 = rp.addJob(u'sleep', (0.2, 4))\n >>> time.sleep(0.2)\n >>> transaction.commit()\n\nBefore testing the worker in the remote processor, let's have a look at every\nmethod by itself. So we instantiate the worker:\n\n >>> from m01.remote.worker import MultiJobWorker\n >>> worker = MultiJobWorker(rp, waitTime=0, maxThreads=2)\n\nThe maximum amount of threads can be set as well:\n\n >>> worker.maxThreads\n 2\n\nAll working threads can be reviewed at any time:\n\n >>> worker.threads\n []\n\n >>> from zope.security import management\n >>> management.endInteraction()\n\nLet's pull a new job:\n\n >>> job = worker.doPullNextJob()\n >>> job\n \n\nWe need to pull a job before executing it, so that the database marks the job\nas processing and no new thread picks up the same job. 
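The pull-before-process step can be sketched as an atomic claim. In a mongodb-backed setup this role is played by an atomic update on the job document; the sketch below simply uses a lock to stand in for that, and all names are illustrative assumptions:

```python
import threading

class ClaimingQueue(object):
    """Sketch: workers must atomically claim (pull) a job before
    executing it, so two threads never process the same job."""

    def __init__(self, jobids):
        self._lock = threading.Lock()
        self._status = {jid: 'queued' for jid in jobids}

    def pull_next_job(self):
        # the lock stands in for an atomic update on the database
        with self._lock:
            for jid, status in self._status.items():
                if status == 'queued':
                    self._status[jid] = 'processing'
                    return jid
            return None

queue = ClaimingQueue(['job-1', 'job-2'])
claims = []

def worker():
    jid = queue.pull_next_job()
    if jid is not None:
        claims.append(jid)

# more workers than jobs: each job must still be claimed exactly once
threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(claims))  # ['job-1', 'job-2']
```

Because the claim flips the status to ``processing`` inside the critical section, the surplus workers find nothing to pull and simply return.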
As you can see the job\ngets marked with the processing status:\n\n >>> job.status\n u'processing'\n\nOnce we pulled a particular job, we can process it:\n\n >>> logger.clear()\n >>> print logger\n\n >>> worker.doProcessJob(job.__name__)\n\n >>> print logger\n m01.remote INFO\n Job: 1\n\nLet's now have a look at using the processor in the task service. This\nprimarily means setting the processor factory:\n\n >>> management.newInteraction()\n\n >>> rp.jobWorkerFactory = MultiJobWorker\n >>> rp.jobWorkerArguments = {'waitTime': 1.0, 'maxThreads': 2}\n >>> transaction.commit()\n\n >>> logger.clear()\n\nLet's now process the remaining jobs:\n\n >>> rp.startProcessor()\n >>> transaction.commit()\n >>> time.sleep(1.5)\n\n >>> rp.stopProcessor()\n >>> transaction.commit()\n >>> time.sleep(0.5)\n\nAs you can see, this time the jobs are not completed in order anymore, because\nthey all need different amounts of time to execute:\n\n >>> print logger\n m01.remote INFO\n Processor 'root-worker' started\n m01.remote INFO\n MultiJobWorker: processing job ...\n m01.remote INFO\n MultiJobWorker: processing job ...\n m01.remote INFO\n Job: 3\n m01.remote INFO\n MultiJobWorker: processing job ...\n m01.remote INFO\n Job: 4\n m01.remote INFO\n Job: 2\n m01.remote INFO\n Processor 'root-worker' stopped\n\nLet's now set the thread limit to four and construct a new set of jobs that\ndemonstrate that all jobs will run at the same time:\n\n >>> rp.jobWorkerArguments = {'waitTime': 0.0, 'maxThreads': 4}\n\n >>> jobid1 = rp.addJob(u'sleep', (0.3, 1))\n >>> time.sleep(0.2)\n >>> jobid2 = rp.addJob(u'sleep', (0.4, 2))\n >>> time.sleep(0.2)\n >>> jobid3 = rp.addJob(u'sleep', (0.1, 3))\n >>> time.sleep(0.2)\n >>> jobid4 = rp.addJob(u'sleep', (0.5, 4))\n >>> time.sleep(0.2)\n >>> transaction.commit()\n\nIf all tasks are processed at once, job 3 should be done first. 
You can also\nsee that job 4 gets processed as soon as possible, even before the worker logs\nprocessing:\n\n >>> logger.clear()\n\n >>> rp.startProcessor()\n >>> transaction.commit()\n\n >>> time.sleep(1.0)\n\n >>> rp.stopProcessor()\n >>> transaction.commit()\n >>> time.sleep(0.5)\n\n >>> print logger\n m01.remote INFO\n Processor 'root-worker' started\n m01.remote INFO\n MultiJobWorker: processing job ...\n m01.remote INFO\n MultiJobWorker: processing job ...\n m01.remote INFO\n MultiJobWorker: processing job ...\n m01.remote INFO\n MultiJobWorker: processing job ...\n m01.remote INFO\n Job: 3\n m01.remote INFO\n Job: 1\n m01.remote INFO\n Job: 2\n m01.remote INFO\n Job: 4\n m01.remote INFO\n Processor 'root-worker' stopped\n\nLet's now set the thread limit to two and construct a new set of jobs that\ndemonstrate that not more than two threads run at the same time:\n\n >>> rp.jobWorkerArguments = {'waitTime': 0.0, 'maxThreads': 2}\n >>> transaction.commit()\n\n >>> jobid1 = rp.addJob(u'sleep', (0.3, 1))\n >>> time.sleep(0.2)\n >>> jobid2 = rp.addJob(u'sleep', (0.4, 2))\n >>> time.sleep(0.2)\n >>> jobid3 = rp.addJob(u'sleep', (0.2, 3))\n >>> time.sleep(0.2)\n >>> jobid4 = rp.addJob(u'sleep', (0.5, 4))\n >>> time.sleep(0.2)\n >>> transaction.commit()\n\nIf all tasks are processed at once, job 3 should be done first, but since the\njob has to wait for an available thread, it will come in third. 
We can now run\nthe jobs and see the result:\n\n >>> logger.clear()\n\n >>> rp.startProcessor()\n >>> transaction.commit()\n\n >>> time.sleep(1.5)\n\n >>> rp.stopProcessor()\n >>> transaction.commit()\n >>> time.sleep(0.5)\n\n >>> print logger\n m01.remote INFO\n Processor 'root-worker' started\n m01.remote INFO\n MultiJobWorker: processing job ...\n m01.remote INFO\n MultiJobWorker: processing job ...\n m01.remote INFO\n Job: 1\n m01.remote INFO\n MultiJobWorker: processing job ...\n m01.remote INFO\n Job: 2\n m01.remote INFO\n MultiJobWorker: processing job ...\n m01.remote INFO\n Job: 3\n m01.remote INFO\n Job: 4\n m01.remote INFO\n Processor 'root-worker' stopped\n\n=========\nScheduler\n=========\n\nThe scheduler concept is implemented as an additional scheduler container which\ncontains scheduler items.\n\n >>> from m01.mongo import UTC\n >>> import m01.remote.scheduler\n >>> from m01.remote import interfaces\n >>> from m01.remote import testing\n\nLet's now start by getting our test remote processor which contains our scheduler\ncontainer:\n\n >>> remoteProcessor = root\n >>> remoteProcessor\n \n\n >>> scheduler = remoteProcessor._scheduler\n\n >>> tuple(scheduler.values())\n ()\n\n\nDelay\n-----\n\nWe can add a scheduler item for delaying job processing. Let's add such an item:\n\n >>> import datetime\n >>> def getNextTime(dt, seconds):\n ... return dt + datetime.timedelta(seconds=seconds)\n\n >>> now = datetime.datetime(2010, 10, 1, 0, 0, 0, tzinfo=UTC)\n >>> now10 = getNextTime(now, 10)\n >>> delay = 10\n >>> data = {'jobName': u'echo 1', 'active': True, 'delay': delay,\n ... 
'retryDelay': 5, 'nextCallTime': now10}\n >>> firstEcho = m01.remote.scheduler.Delay(data)\n >>> interfaces.IDelay.providedBy(firstEcho)\n True\n\nThe delay is set to 10:\n\n >>> firstEcho.delay\n 10\n\nand the retryDelay to 5\n\n >>> firstEcho.retryDelay\n 5\n\nand we set an explicit nextCallTime of now + 10:\n\n >>> firstEcho.nextCallTime == getNextTime(now, 10)\n True\n\nand our retryTime is None:\n\n >>> firstEcho.retryTime is None\n True\n\nNow we can add the delay item to the scheduler:\n\n >>> scheduler.add(firstEcho)\n u'...'\n\nAs you can see the scheduler contains one item:\n\n >>> sorted(scheduler.values())\n []\n\nNext we'll test some scheduler API methods. First check if we can update\nthe retryTime for an item in our adding cache with ``updateRetryTime``::\n\n >>> scheduler.updateRetryTime(firstEcho.dump(), now)\n False\n\nAs you can see we did not get a new retryTime. This happens because we didn't\nuse the correct callTime. Let's try with the correct nextCallTime:\n\n >>> now10 = getNextTime(now, 10)\n >>> now15 = getNextTime(now, 15)\n >>> retryTime = scheduler.updateRetryTime(firstEcho.dump(), now10)\n >>> retryTime == now15\n True\n\nAs you can see the new retryTime is using the retryDelay of 5 seconds. This\nretryTime is used for locking an item. This means an item does not get picked\nuntil this time has passed.\n\n\nNow let's try another internal API method which is able to get the next item\nfrom our adding cache:\n\n >>> scheduler.getNextCachedItem(now)\n\nAs you can see the method didn't return an item, let's try with the next\nscheduled call time:\n\n >>> nextCallTime = firstEcho.nextCallTime\n >>> scheduler.getNextCachedItem(now10)\n \n\nAs you can see the retryTime gets set based on the nextCallTime and the\nretryDelay:\n\n >>> firstEcho.retryTime == getNextTime(nextCallTime, 5)\n True\n\nNow the important part. Let's test our method which is responsible for getting\nthe next item, including items from mongo. 
This method uses the two methods above.\nOf course with the current time we will not get any item:\n\n >>> scheduler.pullNextSchedulerItem(now) is None\n True\n\nBut now we need another nextCallTime because the previous call updated the \nitem's nextCallTime. Let's first check the nextCallTime:\n\n >>> firstEcho.nextCallTime == now10\n True\n\nBut as you can see, the retryTime was already set during our previous test. This\nmeans we will only get an item if we use a time at least as large as the\nretryTime:\n\n >>> firstEcho.retryTime == now15\n True\n\n >>> scheduler.pullNextSchedulerItem(now10)\n\n >>> scheduler.pullNextSchedulerItem(now15)\n \n\nNow, let's check our scheduled item times:\n\n >>> now20 = getNextTime(now15, 5)\n >>> firstEcho.nextCallTime == now10\n True\n\nNote, our retryTime gets calculated from the current call time and retryDelay.\nIt would not make sense to use the callTime as the retryTime calculation\nbase:\n\n >>> firstEcho.retryTime == now20\n True\n\n\nThe method pullNextSchedulerItem returns a pending item or None since we don't\nhave one pending:\n\n >>> scheduler.pullNextSchedulerItem(now) is None\n True\n\nNow let's add a second scheduler item within some scheduler time:\n\n >>> import datetime\n >>> delay = 10\n >>> data = {'jobName': u'echo 2', 'active': True, 'delay': delay,\n ... 'retryDelay': 5}\n >>> secondEcho = m01.remote.scheduler.Delay(data)\n\n >>> scheduler.add(secondEcho)\n u'...'\n\n >>> sorted(scheduler.values(), key=lambda x:(x.__name__, x.__name__))\n [, ]\n\n >>> scheduler.remove(firstEcho)\n >>> scheduler.remove(secondEcho)\n >>> tuple(scheduler.values())\n ()\n\n\n\nadjustCallTime\n--------------\n\nBefore we test our cron item, let's test our method which can reset a\ngiven datetime to the smallest starting point, e.g. 
if hours are given as a\ncalculation base, we need to start counting within the first minute:\n\n >>> from m01.remote.scheduler import adjustCallTime\n\n >>> now = datetime.datetime(2010, 10, 25, 16, 6, 5, 123, tzinfo=UTC)\n >>> now\n datetime.datetime(2010, 10, 25, 16, 6, 5, 123, tzinfo=UTC)\n\n >>> item = m01.remote.scheduler.Cron({'jobName': u'bar', 'minute': [5]})\n >>> adjustCallTime(item, now)\n datetime.datetime(2010, 10, 25, 16, 6, 0, 123, tzinfo=UTC)\n\nCron\n----\n\nA probably more interesting implementation is the cron scheduler item. This\ncron item can schedule jobs at a specific given time. Let's set up such a cron\nitem:\n\n >>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5}\n >>> cronItem = m01.remote.scheduler.Cron(data)\n\nThe cronItem provides the ISchedulerItem and ICron interfaces:\n\n >>> interfaces.ISchedulerItem.providedBy(cronItem)\n True\n\n >>> interfaces.ICron.providedBy(cronItem)\n True\n\nAs you can see the cron item also provides a retryDelay:\n\n >>> cronItem.retryDelay\n 5\n\nLet's first explain how this works. The cron scheduler provides a next call\ntime stamp. If the calculated next call time is smaller than the last call time,\nthe cron scheduler item will calculate the new next call time and store it\nas nextCallTime; at the same time the previous nextCallTime gets returned.\nThis makes sure that we have a minimum of time calculation calls because\neach time a cron scheduler item gets asked about the next call time the stored\nnextCallTime is used. The cron scheduler item only calculates the next call\ntime if the existing next call time is smaller than the given call time.\n\nNow let's test a cron as a scheduler item. Set up a simple cron item which\nruns at minute 5 of each hour.\n\n\n >>> now = datetime.datetime(2010, 10, 1, 0, 0, 0, tzinfo=UTC)\n >>> now\n datetime.datetime(2010, 10, 1, 0, 0, tzinfo=UTC)\n\n >>> data = {'jobName': u'echo cron', 'active': True, 'retryDelay': 5,\n ... 
'minute': [5], 'nextCallTime': now}\n >>> cronEcho = m01.remote.scheduler.Cron(data)\n\nNow add the item to the scheduler:\n\n >>> scheduler.add(cronEcho)\n u'...'\n\nAs you can see, our cron item gets scheduled based on the given nextCallTime:\n\n >>> cronEcho.nextCallTime\n datetime.datetime(2010, 10, 1, 0, 0, tzinfo=UTC)\n\nthe retryTime is empty\n\n >>> cronEcho.retryTime is None\n True\n\n\nand the minute list contains our minute 5:\n\n >>> cronEcho.minute\n [5]\n\n >>> cronEcho.hour\n []\n\n >>> cronEcho.dayOfMonth\n []\n\n >>> cronEcho.month\n []\n\n >>> cronEcho.dayOfWeek\n []\n\nAnd the scheduler contains one cron item:\n\n >>> tuple(scheduler.values())\n (,)\n\nNow we can get the job based on the jobName ``echo`` defined by our cron\nscheduler item if we call pullNextSchedulerItem.\n\n >>> scheduler.pullNextSchedulerItem(now)\n \n\nDuring this call the retryTime gets set based on the retryDelay:\n\n >>> cronEcho.retryTime\n datetime.datetime(2010, 10, 1, 0, 0, 5, tzinfo=UTC)\n\nNow let's test the different cron settings. Note that we provide a list of\nvalues for minutes, hours, month, dayOfWeek and dayOfMonth. This means you can\nschedule a job for every 15 minutes if you set the minutes to\n(0, 15, 30, 45), or if you like to run a job only 15 minutes after each hour\nyou can set minutes to (15,). If you set more than one argument e.g.\nminute, hours or days etc. all arguments must fit the given time.\n\nLet's start with a cron scheduler for every first and second minute per hour.\nNormally the cron scheduler item will set ``int(time.time())`` as the\nnextCallTime value. For testing our cron scheduler items, we use an explicit\nstartTime value of 0 (zero):\n\n >>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,\n ... 'minute': [0, 1]}\n >>> cronItem = m01.remote.scheduler.Cron(data)\n\nThe next call time is set based on the given startTime value. 
This means the\nfirst call will be at 0 (zero) minute:\n\n >>> cronItem.nextCallTime is None\n True\n\nNow let's call getNextCallTime, as you can see we will get None as nextCallTime\nbecause we didn't set a nextCallTime during cron initialization and the\nnextCallTime is set to the next minute:\n\n >>> cronItem.getNextCallTime(now) is None\n True\n\n >>> cronItem.nextCallTime\n datetime.datetime(2010, 10, 1, 0, 1, tzinfo=UTC)\n\nNow let's call getNextCallTime again, as you can see we will get the\nnextCallTime we calculated during object initialization which is the given\ncall time and the nextCallTime is set to the next minute:\n\nIf we use a call time + 5 seconds, we still will get the cached next call\ntime of 1 minute and we will not generate a new next call time since this\ntime is already in the future:\n\n >>> cronItem.getNextCallTime(getNextTime(now, 5))\n datetime.datetime(2010, 10, 1, 0, 1, tzinfo=UTC)\n\n >>> cronItem.nextCallTime\n datetime.datetime(2010, 10, 1, 0, 1, tzinfo=UTC)\n\nIf we call the cron scheduler item with a call time equal to or larger than our\n1 minute delay from the cached next call time, we will get the cached call time\nas value, just as we would with a smaller call time (see sample above).\n\n >>> cronItem.getNextCallTime(getNextTime(now, 65))\n datetime.datetime(2010, 10, 1, 0, 1, tzinfo=UTC)\n\n >>> cronItem.nextCallTime\n datetime.datetime(2010, 10, 1, 1, 0, tzinfo=UTC)\n\nAll future calls with a smaller time than the nextCallTime will return the \ncurrent nextCallTime and not calculate any new time.\n\n >>> cronItem.getNextCallTime(getNextTime(now, 125))\n datetime.datetime(2010, 10, 1, 1, 0, tzinfo=UTC)\n\n >>> cronItem.getNextCallTime(getNextTime(now, 1*60*60))\n datetime.datetime(2010, 10, 1, 1, 0, tzinfo=UTC)\n\n\nRemember, getNextCallTime returns the previously calculated nextCallTime and the\nnewly calculated nextCallTime gets stored as nextCallTime. 
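The caching behavior just described can be sketched with a simplified model which uses a fixed interval instead of the full cron time tables (the class and attribute names are illustrative assumptions):

```python
import datetime

class CachedNextCall(object):
    """Sketch of nextCallTime caching: getNextCallTime always returns
    the stored value and only recomputes when the call time reaches it."""

    def __init__(self, next_call_time, interval):
        self.nextCallTime = next_call_time
        self.interval = interval  # a datetime.timedelta

    def getNextCallTime(self, call_time):
        previous = self.nextCallTime
        # recompute only when the stored time is no longer in the future
        if self.nextCallTime <= call_time:
            self.nextCallTime = call_time + self.interval
        return previous

start = datetime.datetime(2010, 10, 1, 0, 0)
item = CachedNextCall(start, datetime.timedelta(minutes=1))
print(item.getNextCallTime(start))   # returns the stored (previous) time
print(item.nextCallTime)             # recomputed: one minute later
# a call time still before nextCallTime returns the cache unchanged
print(item.getNextCallTime(start + datetime.timedelta(seconds=5)))
```

The real cron item computes the new value from its minute/hour/day tables rather than a fixed delta, but the return-previous-then-recompute shape is the same.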
For simpler test
output we define a helper method which shows the time calculation:


Minutes
~~~~~~~

Let's start testing the time tables.

 >>> def getNextCallTime(cron, dt, seconds=None):
 ...     """Return the stored and the newly calculated nextCallTime"""
 ...     if seconds is None:
 ...         callTime = dt
 ...     else:
 ...         callTime = getNextTime(dt, seconds)
 ...     nextCallTime = cron.getNextCallTime(callTime)
 ...     return '%s --> %s' % (nextCallTime, cron.nextCallTime)

 >>> now = datetime.datetime(1970, 1, 1, 0, 3, 0, tzinfo=UTC)
 >>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
 ...         'minute': [0, 10], 'nextCallTime': now}
 >>> item = m01.remote.scheduler.Cron(data)

 >>> str(now)
 '1970-01-01 00:03:00+00:00'

 >>> getNextCallTime(item, now)
 '1970-01-01 00:03:00+00:00 --> 1970-01-01 00:10:00+00:00'

 >>> getNextCallTime(item, now, 1)
 '1970-01-01 00:10:00+00:00 --> 1970-01-01 00:10:00+00:00'

 >>> getNextCallTime(item, now, 2*60)
 '1970-01-01 00:10:00+00:00 --> 1970-01-01 00:10:00+00:00'

 >>> getNextCallTime(item, now, 51*60)
 '1970-01-01 00:10:00+00:00 --> 1970-01-01 01:00:00+00:00'

 >>> getNextCallTime(item, now, 55*60)
 '1970-01-01 01:00:00+00:00 --> 1970-01-01 01:00:00+00:00'


Hour
~~~~

 >>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
 ...         'hour': [2, 13], 'nextCallTime': now}
 >>> item = m01.remote.scheduler.Cron(data)

 >>> getNextCallTime(item, now)
 '1970-01-01 00:03:00+00:00 --> 1970-01-01 02:00:00+00:00'

 >>> getNextCallTime(item, now, 2*60*60)
 '1970-01-01 02:00:00+00:00 --> 1970-01-01 13:00:00+00:00'

 >>> getNextCallTime(item, now, 4*60*60)
 '1970-01-01 13:00:00+00:00 --> 1970-01-01 13:00:00+00:00'

 >>> getNextCallTime(item, now, 13*60*60)
 '1970-01-01 13:00:00+00:00 --> 1970-01-02 02:00:00+00:00'

 >>> getNextCallTime(item, now, 15*60*60)
 '1970-01-02 02:00:00+00:00 --> 1970-01-02 02:00:00+00:00'


Month
~~~~~

 >>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
 ...         'month': [1, 2, 5, 12], 'nextCallTime': now}
 >>> item = m01.remote.scheduler.Cron(data)

 >>> getNextCallTime(item, now)
 '1970-01-01 00:03:00+00:00 --> 1970-02-01 00:03:00+00:00'

 >>> getNextCallTime(item, now, 90*24*60*60)
 '1970-02-01 00:03:00+00:00 --> 1970-05-01 00:03:00+00:00'

 >>> getNextCallTime(item, now, 120*24*60*60)
 '1970-05-01 00:03:00+00:00 --> 1970-12-01 00:03:00+00:00'

 >>> getNextCallTime(item, now, 130*24*60*60)
 '1970-12-01 00:03:00+00:00 --> 1970-12-01 00:03:00+00:00'

 >>> getNextCallTime(item, now, 360*24*60*60)
 '1970-12-01 00:03:00+00:00 --> 1971-01-01 00:03:00+00:00'


dayOfWeek [0..6]
~~~~~~~~~~~~~~~~

 >>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
 ...         'dayOfWeek': [0, 2, 4, 5], 'nextCallTime': now}
 >>> item = m01.remote.scheduler.Cron(data)

The current weekday of now is:

 >>> now.weekday()
 3

This means our nextCallTime should get set using day 4 as the
nextCallTime if we call it with ``now``:

 >>> getNextCallTime(item, now)
 '1970-01-01 00:03:00+00:00 --> 1970-01-02 00:03:00+00:00'

With one day more, we will get the weekday 4 (skip):

 >>> getNextCallTime(item, now, 24*60*60)
 '1970-01-02 00:03:00+00:00 --> 1970-01-03 00:03:00+00:00'

With another day more, we will get the weekday 5 (incr):

 >>> getNextCallTime(item, now, 2*24*60*60)
 '1970-01-03 00:03:00+00:00 --> 1970-01-05 00:03:00+00:00'

With another day more, we will get the weekday 6 (skip):

 >>> getNextCallTime(item, now, 3*24*60*60)
 '1970-01-05 00:03:00+00:00 --> 1970-01-05 00:03:00+00:00'

With another day more, we will get the weekday 0 (incr):

 >>> getNextCallTime(item, now, 4*24*60*60)
 '1970-01-05 00:03:00+00:00 --> 1970-01-07 00:03:00+00:00'


dayOfMonth [1..31]
~~~~~~~~~~~~~~~~~~

 >>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
 ...         'dayOfMonth': [2, 12, 21, 30], 'nextCallTime': now}
 >>> item = m01.remote.scheduler.Cron(data)

 >>> getNextCallTime(item, now)
 '1970-01-01 00:03:00+00:00 --> 1970-01-02 00:00:00+00:00'

 >>> getNextCallTime(item, now, 12*24*60*60)
 '1970-01-02 00:00:00+00:00 --> 1970-01-21 00:00:00+00:00'

 >>> getNextCallTime(item, now, 31*24*60*60)
 '1970-01-21 00:00:00+00:00 --> 1970-02-02 00:00:00+00:00'


Combined
~~~~~~~~

Combine some attributes:

 >>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
 ...         'minute': [10], 'dayOfMonth': [1, 10, 20, 30],
 ...         'nextCallTime': now}
 >>> item = m01.remote.scheduler.Cron(data)

 >>> getNextCallTime(item, now)
 '1970-01-01 00:03:00+00:00 --> 1970-01-01 00:10:00+00:00'

 >>> getNextCallTime(item, now, 10*60)
 '1970-01-01 00:10:00+00:00 --> 1970-01-01 01:10:00+00:00'

 >>> getNextCallTime(item, now, 10*24*60*60)
 '1970-01-01 01:10:00+00:00 --> 1970-01-20 00:10:00+00:00'

 >>> getNextCallTime(item, now, 20*24*60*60)
 '1970-01-20 00:10:00+00:00 --> 1970-01-30 00:10:00+00:00'

Another sample:

 >>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
 ...         'minute': [10], 'hour': [4], 'dayOfMonth': [1, 12, 21, 30],
 ...         'nextCallTime': now}
 >>> item = m01.remote.scheduler.Cron(data)

 >>> getNextCallTime(item, now)
 '1970-01-01 00:03:00+00:00 --> 1970-01-01 04:10:00+00:00'

 >>> getNextCallTime(item, now, 10*60)
 '1970-01-01 04:10:00+00:00 --> 1970-01-01 04:10:00+00:00'

 >>> getNextCallTime(item, now, 4*60*60)
 '1970-01-01 04:10:00+00:00 --> 1970-01-01 04:10:00+00:00'

 >>> getNextCallTime(item, now, 5*60*60)
 '1970-01-01 04:10:00+00:00 --> 1970-01-12 04:10:00+00:00'


=======
CHANGES
=======

3.0.0 (2015-11-10)
------------------

- support pymongo >= 3.0.0; use 3.0.0 as the package version to reflect
  pymongo >= 3.0.0 compatibility


0.6.0 (2013-06-28)
------------------

- feature: implemented JobError as a Job sub item and renamed the previous
  JobError to RemoteException. This change requires that you delete all
  previous JobError jobs from the job list before updating. Also raise
  RemoteException instead of JobError in your code.
  The new JobError sub item provides a better error traceback message and a
  created date.

- feature: implemented better error handling, saving a formatted traceback
  string


0.5.1 (2012-11-18)
------------------

- added MANIFEST.in files

- removed the p01.i18n package dependency

- allow removing jobs in all states

- split scheduler and container, moving the scheduler part into a mixin class

- switched to bson import

- reflect changes in the getBatchData signature

- fix datetime compare, round milliseconds

- adjusted different schema descriptions, using the same message id as used
  in the title

- removed unused id


0.5.0 (2011-08-19)
------------------

- initial release