{ "info": { "author": "hampsterx", "author_email": "tim.vdh@gmail.com", "bugtrack_url": null, "classifiers": [ "Development Status :: 4 - Beta", "Intended Audience :: Developers", "License :: OSI Approved :: Apache Software License", "Programming Language :: Python", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.5", "Programming Language :: Python :: 3.6", "Programming Language :: Python :: 3.7" ], "description": "# async-kinesis\n\n[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/python/black)\n\n\n## Features\n\n- uses queues for both producer and consumer\n - producer flushes with put_records() if has enough to flush or after \"buffer_time\" reached\n - consumer iterates over msg queue independent of shard readers\n- Configurable to handle Sharding limits but will throttle/retry if required\n - ie multiple independent clients are saturating the Shards\n- Checkpointing with heartbeats\n - deadlock + reallocation of shards if checkpoint fails to heartbeat within \"session_timeout\"\n\n## Consumer Design\n\n(Bears some explanation, kinda complex~)\n\n- fetch() gets called periodically (0.2 sec (ie max 5x per second as is the limit on shard get_records()))\n - iterate over the list of shards (set on startup, does not currently detect resharding)\n - assign shard if not in use and not at \"max_shard_consumers\" limit otherwise ignore/continue\n - ignore/continue if this shard is still fetching\n - process records if shard is done fetching\n - put records on queue\n - add checkpoint record to queue\n - assign NextShardIterator\n - create (get_records()) task again\n\nNote that get_records() is throttled via \"shard_fetch_rate=5\" (ie the same 0.2 sec/ 5x limit)\n\nThis pattern seemed like the easiest way to maintain a pool of consumers without needing to think too hard about starting it's next job or handling new shards etc.\n\n\n## Not Implemented\n\n- resharding\n- client rebalancing (ie share the shards between consumers)\n\nSee also\n\nhttps://aws.amazon.com/blogs/big-data/implementing-efficient-and-reliable-producers-with-the-amazon-kinesis-producer-library/\n\n\n## Producer\n \n async with Producer(stream_name=\"test\") as producer:\n # Put item onto queue to be flushed via put_records()\n await producer.put({'my': 'data'})\n\n\nOptions:\n\n(comments in quotes are Kinesis Limits as per AWS Docs)\n\n* region_name\n > AWS Region\n\n* buffer_time=0.5\n > Buffer time in seconds before auto flushing records\n \n* put_rate_limit_per_shard=1000\n > \"A single shard can ingest up to 1 MiB of data per second (including partition keys) or 1,000 records per second for writes\n\n* batch_size=500\n > \"Each PutRecords request can support up to 500 records\" \n\n* max_queue_size=10000\n > put() method will block when queue is at max \n \n* after_flush_fun\n > async function to call after doing a flush (err put_records()) call\n \n\n## Consumer\n\n async with Consumer(stream_name=\"test\") as consumer:\n while True:\n async for item in consumer:\n print(item)\n # caught up.. take a breather~\n\n\nOptions:\n\n(comments in quotes are Kinesis Limits as per AWS Docs)\n\n* region_name\n > AWS Region\n\n* max_queue_size=1000\n > the fetch() task shard \n\n* max_shard_consumers=None\n > Max number of shards to use. None = all\n \n* record_limit=10000\n > Number of records to fetch with get_records()\n\n* sleep_time_no_records=2\n > No of seconds to sleep when caught up\n \n* iterator_type=\"TRIM_HORIZON\"\n > Default shard iterator type for new/unknown shards (ie start from start of stream)\n > Alternative is \"LATEST\" (ie end of stream)\n\n* shard_fetch_rate=5\n > No of fetches per second (max = 5) \n\n* checkpointer=None\n > Checkpointer to use\n\n\n## Checkpointers\n\n- memory\n- redis\n\n\n## Yet another Python Kinesis Library?\n\nSadly I had issues with every other library I could find :(\n\n* https://github.com/NerdWalletOSS/kinesis-python\n * pro:\n * kinda works\n * con\n * threaded\n * Outstanding PR to fix some issues\n * checkpoints on every record on main thread\n\n* https://github.com/ungikim/kinsumer\n * pro:\n * handles shard changes\n * no producer\n * no redis checkpointer/heartbeat\n * threaded/seems kinda complicated~\n * con\n * consumer only\n \n* https://github.com/bufferapp/kiner\n * pro:\n * Batching\n * con\n * Producer only\n\n* https://github.com/niklio/aiokinesis\n * pro:\n * asyncio\n * no checkpointing\n * con\n * limited to 1 shard / too simplistic\n\n* https://github.com/ticketea/pynesis\n * pro:\n * checkpoints\n * con\n * hasn't been updated for 1 year\n * doesnt use put_records()\n * single threaded / round robin reads shards\n\n* https://github.com/whale2/async-kinesis-client\n * pro:\n * checkpoints\n * asyncio\n * con\n * ?\n\n(Actually I only found this one recently, might be ok alternative?)", "description_content_type": "text/markdown", "docs_url": null, "download_url": "", "downloads": { "last_day": -1, "last_month": -1, "last_week": -1 }, "home_page": "https://github.com/hampsterx/async-kinesis", "keywords": "", "license": "Apache2", "maintainer": "", "maintainer_email": "", "name": "py-kinesis", "package_url": "https://pypi.org/project/py-kinesis/", "platform": "", "project_url": "https://pypi.org/project/py-kinesis/", "project_urls": { "Homepage": "https://github.com/hampsterx/async-kinesis" }, "release_url": "https://pypi.org/project/py-kinesis/0.0.1/", "requires_dist": null, "requires_python": "", "summary": "AsyncIO Kinesis Library", "version": "0.0.1" }, "last_serial": 5294777, "releases": { "0.0.1": [ { "comment_text": "", "digests": { "md5": "3aa4bf2a42be707a60004222ca7c6e29", "sha256": "8abdf988df8f6735c4d00c7aa106bd85f2423df74179cc7ddda2c9a48c8c948e" }, "downloads": -1, "filename": "py_kinesis-0.0.1-py3.7.egg", "has_sig": false, "md5_digest": "3aa4bf2a42be707a60004222ca7c6e29", "packagetype": "bdist_egg", "python_version": "3.7", "requires_python": null, "size": 22931, "upload_time": "2019-05-21T00:55:32", "url": "https://files.pythonhosted.org/packages/9a/b1/972556f9a763de7c48d331d063fd62a899ef1294323b29d070f8c263bea0/py_kinesis-0.0.1-py3.7.egg" } ] }, "urls": [ { "comment_text": "", "digests": { "md5": "3aa4bf2a42be707a60004222ca7c6e29", "sha256": "8abdf988df8f6735c4d00c7aa106bd85f2423df74179cc7ddda2c9a48c8c948e" }, "downloads": -1, "filename": "py_kinesis-0.0.1-py3.7.egg", "has_sig": false, "md5_digest": "3aa4bf2a42be707a60004222ca7c6e29", "packagetype": "bdist_egg", "python_version": "3.7", "requires_python": null, "size": 22931, "upload_time": "2019-05-21T00:55:32", "url": "https://files.pythonhosted.org/packages/9a/b1/972556f9a763de7c48d331d063fd62a899ef1294323b29d070f8c263bea0/py_kinesis-0.0.1-py3.7.egg" } ] }