{ "info": { "author": "Dimitris Papaspyros", "author_email": "dimitris@orfium.com", "bugtrack_url": null, "classifiers": [], "description": "sqspipes\n========\n\nA multi-worker pipe mechanism that uses AWS SQS.\n\nInstructions\n------------\n\n1. Install the latest version of the package: ``pip install sqspipes``\n\n2. Create a client\n\n .. code:: python\n\n from sqspipes import TaskClient\n client = TaskClient(\n domain='my-app',\n aws_key='YOUR_AWS_KEY',\n aws_secret='YOUR_AWS_SECRET',\n aws_region='us-west-2'\n )\n\n Make sure that the ``aws_key`` provided has full access to the SQS\n service, since it needs to be able to create & delete queues.\n\n Also ensure that the ``aws_region`` provided is either ``us-west-2``\n or ``us-east-2``, since other regions do not support FIFO queues,\n which are used by this package.\n\n3. Define your tasks:\n\n .. code:: python\n\n import random\n import string\n\n def _generate(max_size):\n return ''.join(random.choice(string.ascii_lowercase) for _ in range(random.randint(1, max_size)))\n\n\n def _reduce(value, keep='vowels'):\n vowels = ['a', 'e', 'i', 'o', 'u']\n\n result = [v for v in value if (v in vowels) == (keep == 'vowels')]\n\n return value, ''.join(result)\n\n\n def _count(data):\n value, vowels = data\n\n return value, len(vowels)\n\n In this example, we have a simple flow that looks like this:\n\n generate word -> reduce word to only its vowels -> count the reduced\n word\n\n This is similar to a map-reduce algorithm; however, using this module\n you might have many layers, each transforming the original data in\n a different way. 
These layers (``tasks``) are then combined like bash\n pipes, where the output of one task is the input to the next.\n\n Note a few things:\n\n 1) The first argument of each ``task`` is fed with the\n output of the previous one, with the obvious exception of the\n first task.\n\n 2) The output of each task should be JSON-serializable.\n\n 3) You may return ``None`` from a task if you do not want it to\n continue further down the processing line. For example, if your\n tasks are picked from a database, you could return ``None``\n when that database is empty. If for any reason you want to\n process ``None`` like a normal task output/input, you can pass\n ``ignore_none=False`` as a parameter to the ``TaskClient``\n constructor. In that case, you can use the following to return an\n empty task output:\n\n .. code:: python\n\n from sqspipes import EmptyTaskOutput\n\n def my_task():\n # your task's logic here\n\n return EmptyTaskOutput() # for some reason, None is a valid task output\n\n # later in your code...\n\n TaskClient(\n domain='my-app',\n aws_key='YOUR_AWS_KEY',\n aws_secret='YOUR_AWS_SECRET',\n aws_region='us-west-2',\n ignore_none=False\n )\n\n4. Register the tasks\n\n Now that you have created the various ``tasks``, you simply have to\n define their order & other runtime parameters, like this:\n\n .. code:: python\n\n client.register_tasks([\n {'method': _generate, 'workers': 32, 'interval': 0.1},\n {'method': _reduce, 'workers': 2},\n {'method': _count, 'workers': 16}\n ])\n\n The following keys are supported for each task:\n\n ::\n\n `method`:\n A callable object. 
This is the function that will actually be executed.\n For all tasks except the first one, the first argument of this method\n will be the result of the previous task's method.\n\n `name`:\n The name of this task.\n If no name is provided, the method's name is used automatically.\n\n `workers`:\n The number of worker threads that will be processing messages in parallel.\n Defaults to 1.\n\n `priorities`:\n The number of different priority levels, where 0 is the lowest possible priority.\n Defaults to 1; the maximum value is 16.\n\n `interval`:\n Only applies to the first task.\n Number of seconds to wait between executions.\n Can either be a number or a callable that returns a number (e.g. `lambda: random.random() * 5`).\n Defaults to 0.\n\n5. Execute the tasks\n\n A script that executes the tasks described above would look like\n this:\n\n .. code:: python\n\n # script.py file\n import sys\n\n # `client` is the TaskClient instance created in step 2\n\n def generate(workers):\n for res in client.run('_generate', args=(10, ), iterate=True, workers=workers):\n print(res)\n\n\n def reduce(workers):\n for res in client.run('_reduce', iterate=True, workers=workers):\n print('%s -> %s' % res)\n\n\n def count(workers):\n for result in client.run('_count', iterate=True, workers=workers):\n print('%s -> %d' % result)\n\n\n try:\n n_workers = int(sys.argv[2])\n except (IndexError, ValueError):\n n_workers = None\n\n try:\n if sys.argv[1] == 'generate':\n generate(n_workers)\n elif sys.argv[1] == 'reduce':\n reduce(n_workers)\n elif sys.argv[1] == 'count':\n count(n_workers)\n else:\n raise ValueError('Invalid argument: must be one of generate, reduce or count')\n except IndexError:\n raise ValueError('Script argument is required')\n\n In this example, we have a script which, based on the provided\n argument, executes one of the three tasks defined in the previous\n step. Notice that you can have the following setup:\n\n 1. 
A machine M1 running the command ``python script.py generate 8``,\n which would create 8 workers that submit new words for\n processing.\n\n 2. A machine M2 running the command ``python script.py reduce 16``,\n which would create 16 workers that reduce words to only their\n vowels.\n\n 3. A machine in this example could be a different node (VM, physical\n computer etc.), but tasks could of course run on the same\n infrastructure as well.\n\n 4. An unhandled exception in one of the tasks will bring down the\n entire task runner. This is intentional: if unhandled exceptions\n were \u201cswallowed\u201d, it would be much harder to debug issues, or\n even to identify and track down those \u201clost\u201d\n messages. It is up to you to handle any exceptions you care about\n in whatever manner you choose.", "description_content_type": "", "docs_url": null, "download_url": "", "downloads": { "last_day": -1, "last_month": -1, "last_week": -1 }, "home_page": "https://github.com/dipapaspyros/sqspipes", "keywords": "", "license": "LICENSE", "maintainer": "", "maintainer_email": "", "name": "sqspipes", "package_url": "https://pypi.org/project/sqspipes/", "platform": "", "project_url": "https://pypi.org/project/sqspipes/", "project_urls": { "Homepage": "https://github.com/dipapaspyros/sqspipes" }, "release_url": "https://pypi.org/project/sqspipes/0.1.3/", "requires_dist": null, "requires_python": "", "summary": "A multi-worker pipe mechanism that uses AWS SQS", "version": "0.1.3" }, "last_serial": 4563207, "releases": { "0.1.2": [ { "comment_text": "", "digests": { "md5": "8fab6e9937550213e46cf805a6044623", "sha256": "56665f3604eb6c63bbd9ece6a810210f7b4a0c7a10ac0d47bc9914c2dcd062e3" }, "downloads": -1, "filename": "sqspipes-0.1.2.tar.gz", "has_sig": false, "md5_digest": "8fab6e9937550213e46cf805a6044623", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 9264, "upload_time": "2018-07-27T15:42:09", "url": 
"https://files.pythonhosted.org/packages/32/2b/bd92853008b272026e418dbe596f00e09d6d03970dd9ed3e1a2849a569c8/sqspipes-0.1.2.tar.gz" } ], "0.1.3": [ { "comment_text": "", "digests": { "md5": "64e491a99fb72b777acd0eeadd4cf374", "sha256": "c1300255c13c92563838cb8bfff1e036a119edea58a9562ed293a37bc9d3ae26" }, "downloads": -1, "filename": "sqspipes-0.1.3.tar.gz", "has_sig": false, "md5_digest": "64e491a99fb72b777acd0eeadd4cf374", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 9379, "upload_time": "2018-12-05T10:12:26", "url": "https://files.pythonhosted.org/packages/9e/3d/47c17441b3056d7b68c17778b3ebd1cf7e2786d2b98fe2d38196df7c6b85/sqspipes-0.1.3.tar.gz" } ] }, "urls": [ { "comment_text": "", "digests": { "md5": "64e491a99fb72b777acd0eeadd4cf374", "sha256": "c1300255c13c92563838cb8bfff1e036a119edea58a9562ed293a37bc9d3ae26" }, "downloads": -1, "filename": "sqspipes-0.1.3.tar.gz", "has_sig": false, "md5_digest": "64e491a99fb72b777acd0eeadd4cf374", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 9379, "upload_time": "2018-12-05T10:12:26", "url": "https://files.pythonhosted.org/packages/9e/3d/47c17441b3056d7b68c17778b3ebd1cf7e2786d2b98fe2d38196df7c6b85/sqspipes-0.1.3.tar.gz" } ] }