{ "info": { "author": "Domen Ko\u017ear, Aaron McMillin", "author_email": "aaron@mcmillinclan.org", "bugtrack_url": null, "classifiers": [], "description": ".. highlight:: python\n\n.. currentmodule:: capillary\n\nCapillary\n=========\n\n:Status: Alpha (API feedback is welcome, API could break compatibility)\n:Generated: |today|\n:Version: |release|\n:License: BSD 3 Clause\n:Authors: Domen Ko\u017ear and Aaron McMillin\n\n\n.. topic:: Introduction\n\n :mod:`capillary` is a small integration package for\n the :mod:`celery` Distributed Task Queue with an aim of designing\n `workflows (Canvas) `_\n in `a declarative manner `_\n using Python decorators.\n\n The main reason why :mod:`capillary` exists is to get rid of manual\n tracking how celery tasks depend on each other.\n\n :mod:`capillary` executes in two phases:\n\n 1. Scans for all Celery tasks in defined Python packages\n 2. Executes tasks based on metadata passed to :func:`@pipeline` decorators\n\n :mod:`capillary` uses:\n\n - :mod:`venusian` to discover Celery tasks using deferred decorators\n - :mod:`networkx` to handle the graph operations for tracking\n dependencies between tasks in the workflow\n\n\n\nUser Guide\n==========\n\n.. _simple-example:\n\nSimple Example\n--------------\n\nBelow is the `first steps with Celery `_\ntutorial expanded with :mod:`capillary` integration.\n\nThe ``tasks.py`` module contains the steps of the pipeline. Steps are\nmarked with the ``@pipeline`` decorator, which has optional parameters\nto indicate that this step must be executed after some other step or\nthat the step has certain tags to allow groups of tasks to be executed together.\n\n.. code:: python\n\n from capillary import pipeline\n\n @pipeline()\n def foo(celery_task):\n return 'simple task flow'\n\n @pipeline(after='foo')\n def bar(celery_task, l):\n return l.upper()\n\nThe ``myapp.py`` module then creates a :class:`PipelineConfigurator`\ninstance which will assemble the declared steps:\n\n.. code:: python\n\n from celery import Celery\n from capillary import PipelineConfigurator\n\n import tasks\n\n app = Celery('tasks', broker='redis://', backend='redis://')\n pc = PipelineConfigurator(app)\n pc.scan(tasks)\n\nStart the worker with\n\n.. code:: bash\n\n $ celery worker -A myapp -D\n\nand execute the pipeline in a Python shell:\n\n.. code:: python\n\n >>> from myapp import pc\n >>> asyncresult = pc.run()\n >>> asyncresult.get()\n SIMPLE TASK FLOW\n\n\n*This example will be used throughout the user guide as a base.*\n\n.. note::\n\n This example assumes the Redis broker is running with default settings, but any\n `Celery broker `_\n will do.\n\n ``backend`` is defined only for retrieving the result using `.get()`, it is\n otherwise not required.\n\n\nCore concept: Handling input and output parameters\n--------------------------------------------------\n\nCelery uses a concept called `partials `_\n(sometimes also known as `Currying `_) to\ncreate function `signatures `_.\n\n:mod:`capillary` reuses these concepts to execute tasks. A value returned\nfrom task ``foo`` is passed into task ``bar``.\n\nIt is possible to pass extra parameters to specific tasks as described in\n:ref:`extra-parameters`.\n\n\nCore concept: Tagging pipelines\n-------------------------------\n\nBy default :meth:`PipelineConfigurator.run` will execute all scanned tasks\nwithout tags in topological order.\n\nIf ``tags=['foobar']`` is passed to :func:`@pipeline`, the task will be run\nwhen ```tagged_as=['foobar']`` is passed to :meth:`PipelineConfigurator.run`.\n\nSee :ref:`predefined_defaults` for information on how to reduce boilerplate and\ngroup pipelines per tag.\n\n\nAborting the Pipeline\n---------------------\n\nIf a step needs to stop the current pipeline (meaning no further tasks\nare processed in the pipeline), just raise :exc:`capillary.AbortPipeline`\nanywhere in your pipeline tasks.\n\n.. _extra-parameters:\n\nPassing extra parameters to a specific task\n-------------------------------------------\n\nSome :func:`@pipeline` elements might require extra arguments that are only\nknown when :meth:`PipelineConfigurator.run` is called.\n\n.. code:: python\n\n >>> @pipeline(\n ... required_kwarg_names=['param'],\n ... )\n ... def foobar(celery_task, param=None):\n ... print param\n ... return 'simple task flow'\n\nWhen :meth:`PipelineConfigurator.run` is called, it will need `param` passed inside\n`required_kwargs`; otherwise :exc:`MissingArgument` will be thrown.\n\n\nApplying multiple :func:`@pipeline` decorators\n----------------------------------------------\n\nThe most typical use case where two :func:`@pipeline` decorators are useful is when\nyou'd like to reuse a function for two different pipelines each differently tagged.\n\n.. code:: python\n\n @pipeline(\n after=['first', 'second'],\n tags=['some_pipeline'],\n )\n @pipeline(\n after=['third'],\n tags=['other_pipeline'],\n )\n def foobar(celery_task):\n return 'simple task flow'\n\n\nExecuting ``ConfigurePipeline.run(tagged_as=['some_pipeline'])``\nwould run the `foobar` function as a task after `first` and `second` tasks were done.\n\nHowever executing ``ConfigurePipeline.run(tagged_as=['other_pipeline'])``\nwould run the `foobar` function after `third` task was done.\n\n.. note::\n\n If both tags are used (e.g. ``ConfigurePipeline.run(tagged_as=['some_pipeline', 'other_pipeline'])``)\n then ordering of tags specified matters and the latter will override a former.\n\n if you specify a different `name` parameter for each, they will be both executed.\n\n\n.. _predefined_defaults:\n\nCreate pipelines based on predefined defaults\n---------------------------------------------\n\nOften :func:`@pipeline` definitions will repeat arguments through your\napplication. :func:`make_pipeline_from_defaults` allows you to create customized\npredefined defaults for a pipeline. This example makes a ``foobar_pipeline``\ndecorator that will apply the same tag to each step:\n\n.. code:: python\n\n >>> from capillary import make_pipeline_from_defaults\n >>> foobar_pipeline = make_pipeline_from_defaults(\n >>> tags=[\"foobar\"]\n >>> )\n\n\nThen use ``@foobar_pipeline`` just as one would use :func:`@pipeline` while all your\ndefinitions will have `foobar` as a tag.\n\n.. note::\n\n Passing ``tags`` to ``@foobar_pipeline`` will override ``[\"foobar\"]`` value.\n\n\nPrinting the task tree\n----------------------\n\nTo actually see what kind of canvas will be executed call\n:meth:`ConfigurePipeline.prettyprint` with the same arguments as\n:meth:`ConfigurePipeline.run`\n\n.. code:: python\n\n >>> pc.prettyprint(args=[], kwargs={})\n tasks.foo() | tasks.bar()\n\n\nThe very last task in the pipeline\n----------------------------------\n\nUsing a constant :class:`capillary.ALL` it's possible to declare a task\nas the last one in the pipeline\n\n.. code:: python\n\n >>> from capillary import ALL, pipeline\n >>> @pipeline(\n ... after=ALL,\n ... )\n ... def last(celery_task, obj):\n ... print('ALL DONE!')\n ... return obj\n\n\n.. note::\n\n Multiple tasks with `after=ALL` steps will be run\n in :class:`celery.group` as the last part of the pipeline.\n\n\nInner workings of :meth:`~PipelineConfigurator.run()`\n-----------------------------------------------------\n\nThe following is a quick summary of what happens inside :meth:`~PipelineConfigurator.run()`:\n\n- task tree is generated using dependency information\n- Celery signatures are created\n- task tree is reduced into a `chain `_\n using topological sort\n- tasks is executed using :meth:`celery.app.Task.apply_async`\n\n.. note::\n\n Currently the task tree is reduced into a linear chained list of tasks, but\n in future different \"runners\" could be implemented.\n\n\nUnit Testing\n------------\n\nFunctions marked as :func:`@pipeline` elements are still just simple untouched functions,\nuntil :meth:`PipelineConfigurator.scan()` is called. If function code doesn't\ndepend on the first argument of ``celery_task``, just pass `None` as the value.\n\nTo unit test our two pipeline elements from :ref:`simple-example`:\n\n.. code:: python\n\n class PipelineTestCase(unittest.TestCase):\n\n def test_bar(self):\n self.assertEquals(bar(None, 'test'), 'TEST')\n\n def test_foo(self):\n self.assertEquals(foo(None), 'simple task flow')\n\n\nDevelopment\n===========\n\nTo run tests install `py.test` and run it:\n\n.. code:: bash\n\n $ py.test tests/\n\n\nFeatures to be considered\n-------------------------\n\n- Using a lot of tasks with large objects passed as arguments can be quite\n storage intensive. One alternative would be to generate signatures on-the-fly\n if Celery permits that.\n\n\nAPI Reference\n=============\n\n.. automodule:: capillary\n :members:\n :exclude-members: PipelineConfigurator\n\n.. autoclass:: capillary.PipelineConfigurator\n :members: run, prettyprint, scan\n", "description_content_type": null, "docs_url": null, "download_url": "UNKNOWN", "downloads": { "last_day": -1, "last_month": -1, "last_week": -1 }, "home_page": "https://github.com/celery-capillary/capillary", "keywords": null, "license": "UNKNOWN", "maintainer": null, "maintainer_email": null, "name": "capillary", "package_url": "https://pypi.org/project/capillary/", "platform": "UNKNOWN", "project_url": "https://pypi.org/project/capillary/", "project_urls": { "Download": "UNKNOWN", "Homepage": "https://github.com/celery-capillary/capillary" }, "release_url": "https://pypi.org/project/capillary/0.0.1/", "requires_dist": null, "requires_python": null, "summary": "Declarative workflows for celery.", "version": "0.0.1" }, "last_serial": 2179752, "releases": { "0.0.1": [ { "comment_text": "", "digests": { "md5": "20c9caaf2e8a5443472e7b355d187203", "sha256": "630cb0b31304a4ec0b4468a33426dd0eac7bac6e7fa60503727ed1a4cd40f4f5" }, "downloads": -1, "filename": "capillary-0.0.1.tar.gz", "has_sig": false, "md5_digest": "20c9caaf2e8a5443472e7b355d187203", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 11066, "upload_time": "2016-06-21T22:01:03", "url": "https://files.pythonhosted.org/packages/6f/60/f72c7f9fd1c6d0e61777f378d1b91500c59b843f476fd3020379fbd11eae/capillary-0.0.1.tar.gz" } ] }, "urls": [ { "comment_text": "", "digests": { "md5": "20c9caaf2e8a5443472e7b355d187203", "sha256": "630cb0b31304a4ec0b4468a33426dd0eac7bac6e7fa60503727ed1a4cd40f4f5" }, "downloads": -1, "filename": "capillary-0.0.1.tar.gz", "has_sig": false, "md5_digest": "20c9caaf2e8a5443472e7b355d187203", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 11066, "upload_time": "2016-06-21T22:01:03", "url": "https://files.pythonhosted.org/packages/6f/60/f72c7f9fd1c6d0e61777f378d1b91500c59b843f476fd3020379fbd11eae/capillary-0.0.1.tar.gz" } ] }