{ "info": { "author": "Postmates, Inc", "author_email": "rhettg@gmail.com", "bugtrack_url": null, "classifiers": [], "description": "# Triton Project\n\nPython Utility code for building a Data Pipeline with AWS Kinesis.\n\nSee [Triton](https://github.com/postmates/go-triton)\n\nKinesis (http://aws.amazon.com/kinesis/) lets you define streams of records.\nYou put records in one end, and the other end can consume them. The stream\nmaintains the records for 24 hours. These streams come in multiple shards\n(defined by the administrator).\n\nThe tooling provided here builds on top of the boto library to make real-world\nwork with these streams and records easier. This is preferable to using the\nAmazon-provided KCL (Kinesis Client Library) which is Java-based, or the python\nbindings built on top of KCL, because it isn't very pythonic.\n\nThis tooling also provides built-in support for checkpointing, which allows a client to pick up processing records wherever it stopped last. The raw kinesis libraries require the client to take care of the checkpointing process itself.\n\n### Configuration\n\nNormal AWS credential environment variables or IAM roles for boto apply.\n\nClients will need to define a yaml file containing definitions for the streams\nthey will want to use. That yaml file will look like:\n\n my_stream:\n name: my_stream_v2\n partition_key: value\n region: us-west-1\n\nClients generating records will reference the `my_stream` stream which will\nautomatically know to use the real underlying stream of `my_stream_v2` in the\n`us-west-1` region. 
Records put into this stream are assumed to have a key\nnamed `value` which is used for partitioning.\n\n\n### Demo\n\nTriton comes with a command line script `triton` which can be used to demo some simple functionality.\n\n $ echo 'hi' | triton put -s test_stream\n\nAnd then to consume:\n\n $ triton get -s test_stream\n \n {'msg': 'hi\\n', 'ts': 1433969276.172019}\n\n(Note the order is actually important here: this consumer is set to 'latest',\nso if your producer produces first, you might miss it.)\nYou can set the config by using the environment variable TRITON_CONFIG; the default is /etc/triton.yaml.\n\n### Producers\n\nAdding records to the stream is easy:\n\n import time\n import triton\n\n c = triton.load_config(\"/etc/triton.yaml\")\n\n s = triton.get_stream('my_stream', c)\n s.put(value='hi mom', ts=time.time())\n\n\nFor more advanced uses, you can record the shard and sequence number returned\nby the put operation.\n\n shard, seq_num = s.put(...)\n\nYou could in theory communicate these values to some other process if you want\nto ensure they have received this record.\n\n__CAVEAT UTILITOR__: Triton currently only supports data types directly convertible\ninto [msgpack formatted data](https://github.com/msgpack/msgpack/blob/master/spec.md).\nUnsupported types will raise a `TypeError`.\n\n### Non-Blocking Producers and `tritond`\n\nUsing the producer syntax above, `s.put(value='hi mom', ts=time.time())`, will block until\nthe operation to put the data into Kinesis is complete, which can take on the order of 100 ms.\nThis guarantees that the write has succeeded before continuing the flow of control.\n\nTo allow for writes that do not block, triton comes with `tritond`,\na daemon that will spool Kinesis messages to local memory and write those messages to Kinesis asynchronously.\nWrites via this pathway block for approximately 0.1 ms.\nThe `tritond` spools messages to memory and writes all received messages to Kinesis\nevery 100 ms.\nIt is important to note that using this 
non-blocking pathway eliminates the guarantee\nthat data will be written to Kinesis.\n\nAn instance of `tritond` needs to be running to collect Kinesis writes.\nIt is recommended to run an instance on each host that will be producing Kinesis writes.\nBy default, `tritond` will listen on `127.0.0.1:3515`, or it will\nrespect the environment variables `TRITON_ZMQ_HOST` and `TRITON_ZMQ_PORT`.\nThe `tritond` uses the same `triton.yaml` files to configure triton streams\nand will _log errors and skip_ any data if the stream is not configured\nor the config file is not found.\n\n`tritond` can be run by simply calling it from the command line. For testing\nand/or debugging, it can be run in verbose mode and with its output directed to stdout or a file, e.g.\n\n tritond -v --skip-kinesis # writes verbose logs and writes events to stdout\n\n tritond -cc --skip-kinesis --output_file test_output.txt\n\nOnce `tritond` is running, usage follows the basic write pattern:\n\n import time\n import triton\n\n c = triton.load_config(\"/etc/triton.yaml\")\n\n s = triton.get_nonblocking_stream('my_stream', c)\n s.put(value='hi mom', ts=time.time())\n\nSince the actual Kinesis write happens asynchronously, the shard and sequence number\nare not returned from this operation.\nAlso, as mentioned above, Triton currently only supports data types directly convertible\ninto [msgpack formatted data](https://github.com/msgpack/msgpack/blob/master/spec.md).\nFor data put into a `NonblockingStream` object, unsupported types will log an error and continue.\n\n### Consumers\n\nWriting consumers is more complicated as you must deal with sharding. Even in\nthe lightest of workloads you'll likely want to have multiple shards. Triton makes this simple:\n\n import triton\n\n c = triton.load_config(\"/etc/triton.yaml\")\n\n s = triton.get_stream('my_stream', c)\n i = s.build_iterator_from_latest()\n\n for rec in i:\n print rec.data\n\nThis will consume only new records from the stream. 
Since the stream in theory\nnever ends, your own process management can tell it when to stop:\n\n for rec in i:\n\n do_stuff(rec)\n\n if has_reason():\n i.stop()\n\nThis will cause the iterator to stop fetching new data, but will flush out data\nthat's already been fetched.\n\nKinesis supports other types of iterators. For example, if you want to see all the data in the stream:\n\n i = s.build_iterator_for_all()\n\nor if you know a specific shard and offset:\n\n i = s.build_iterator_from_seqnum(shard_num, seq_num)\n\nFor building distributed consumers, you'll want to divide up the work by shards.\nSo if you have 4 shards, the first worker would do:\n\n i = s.build_iterator_from_latest([0, 1])\n\nand the second worker would do:\n\n i = s.build_iterator_from_latest([2, 3])\n\nNote that these are 'shard numbers', not shard ids. These are indexes into the\nactual shard list.\n\n### Checkpointing\n\nTriton supports checkpointing to a DB so that processing can start where\nprevious processing left off. It requires an available Postgres DB.\nTo specify the DB location, set the ENV variable `TRITON_DB` to the DSN\nof the postgres DB, e.g.\n\n export TRITON_DB=\"dbname=db_name port=5432 host=www.dbhosting.com user=user_name password=password\"\n\nAttempting to checkpoint without this DB being configured will raise a\n`TritonCheckpointError` exception.\n\nThe DB also needs to have a specific table created; calling the following will initialize the table (this call is safe to repeat; it is a no-op if the table already exists):\n\n triton.checkpoint.init_db()\n\nTriton checkpointing also requires a unique client name, since the basic\nassumption is that the checkpoint DB will be shared. The client name is specified\nby the ENV variable `TRITON_CLIENT_NAME`. 
\nAttempting to checkpoint without this ENV variable will also raise a\n`TritonCheckpointError` exception.\n\n\nOnce configured, checkpointing can be used simply by calling the `checkpoint`\nmethod on a stream iterator.\n\nFor example:\n\n s = triton.get_stream('my_stream', c)\n i = s.build_iterator_from_checkpoint()\n\n for ctr in range(1):\n rec = i.next()\n print rec.data\n\n i.checkpoint()\n\nThe next time this code is run, it will pick up from where the last run left off.\n\n\n### Consuming Archives\n\nTriton data is typically archived to S3. Using the triton command, you can view that data:\n\n $ triton cat --bucket=triton-data --stream=my_stream --start-date=20150715 --end-date=20150715\n\nOr using the API, something like:\n\n import triton\n\n c = triton.load_config(\"/etc/triton.yaml\")\n b = triton.open_bucket(\"triton-data\", \"us-west-1\")\n s = triton.stream_from_s3_store(b, c['my_stream'], start_dt, end_dt)\n\n for rec in s:\n ... do something ...\n\n\n## Development\n\nYou should be able to configure your development environment using make:\n\n ~/python-triton $ make dev\n\nYou will likely need to install system libraries as well:\n\n ~/python-triton $ sudo apt-get install libsnappy-dev libzmq-dev\n\nThe tests should all work:\n\n ~/python-triton $ make test\n .\n PASSED. 1 test / 1 case: 1 passed, 0 failed. (Total test time 0.00s)\n\nIf you need to debug your application with ipython:\n\n ~/python-triton $ make shell\n Python 2.7.3 (default, Apr 27 2012, 21:31:10)\n Type \"copyright\", \"credits\" or \"license\" for more information.\n\n IPython 0.12.1 -- An enhanced Interactive Python.\n ? -> Introduction and overview of IPython's features.\n %quickref -> Quick reference.\n help -> Python's own help system.\n object? -> Details about 'object', use 'object??' 
for extra details.\n\n In [1]: from project.models import Project\n\n In [2]:\n\n## TODO\n\n * It would probably be helpful to have some common code for building a worker\n process that just handles records.\n", "description_content_type": null, "docs_url": null, "download_url": "", "downloads": { "last_day": -1, "last_month": -1, "last_week": -1 }, "home_page": "https://github.com/postmates/triton-python", "keywords": "", "license": "", "maintainer": "", "maintainer_email": "", "name": "py-triton", "package_url": "https://pypi.org/project/py-triton/", "platform": "", "project_url": "https://pypi.org/project/py-triton/", "project_urls": { "Homepage": "https://github.com/postmates/triton-python" }, "release_url": "https://pypi.org/project/py-triton/0.0.19/", "requires_dist": null, "requires_python": "", "summary": "Triton - Kinesis Data Pipeline", "version": "0.0.19" }, "last_serial": 3426229, "releases": { "0.0.10": [ { "comment_text": "", "digests": { "md5": "b3aa671dbf0897043df4a821ce3b264a", "sha256": "e63e4f9fe6ebd36c1d8e725f55f0d387a0a14566987edab60737ee17ac5b828d" }, "downloads": -1, "filename": "py-triton-0.0.10.tar.gz", "has_sig": false, "md5_digest": "b3aa671dbf0897043df4a821ce3b264a", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 15104, "upload_time": "2016-03-10T02:52:30", "url": "https://files.pythonhosted.org/packages/58/85/55cc97147ce6f8810d2f4693cb44935b2b7d57a0291707091e318d5f28f8/py-triton-0.0.10.tar.gz" } ], "0.0.11": [ { "comment_text": "", "digests": { "md5": "ec5b2959462d2cf8935bc80d53fdc6b0", "sha256": "c61b49d021e87fe650e66d9f7e1c32821bc6692b6f79b5cffb1d9b65bf7c68b9" }, "downloads": -1, "filename": "py-triton-0.0.11.tar.gz", "has_sig": false, "md5_digest": "ec5b2959462d2cf8935bc80d53fdc6b0", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 15723, "upload_time": "2016-09-15T21:53:42", "url": 
"https://files.pythonhosted.org/packages/f9/7d/1b2747138675783e4f3d3c4e4bdce38ac90b1acf130cc528e0e202704b89/py-triton-0.0.11.tar.gz" } ], "0.0.12": [], "0.0.13": [ { "comment_text": "", "digests": { "md5": "18024fada6d6f5f52d2823bb84e37855", "sha256": "8b5f655be9e7c8c8b400aff0aa2635cd5f3222f79789003f52b2b500cc2b5a73" }, "downloads": -1, "filename": "py-triton-0.0.13.tar.gz", "has_sig": false, "md5_digest": "18024fada6d6f5f52d2823bb84e37855", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 18001, "upload_time": "2016-09-30T00:50:14", "url": "https://files.pythonhosted.org/packages/bd/70/aa54c023a9ec2757f47cd5f1cabf8e5617686f5f898b6246140fca25a3cc/py-triton-0.0.13.tar.gz" } ], "0.0.16": [ { "comment_text": "", "digests": { "md5": "b4110525a3f15008f93df8373c45af7c", "sha256": "18d8078c2ec82fbcbefb264fb7ed6542f348e14637c171e9fa9bb6d6094ab8f5" }, "downloads": -1, "filename": "py-triton-0.0.16.tar.gz", "has_sig": false, "md5_digest": "b4110525a3f15008f93df8373c45af7c", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 19147, "upload_time": "2017-09-12T21:33:14", "url": "https://files.pythonhosted.org/packages/55/ac/1d9bf59e51ed8a72c878a24c3066b1c116526887fbf6383718f466397854/py-triton-0.0.16.tar.gz" } ], "0.0.18": [ { "comment_text": "", "digests": { "md5": "232957ecd4d16b89ef23ba31f41e4fd2", "sha256": "df7162d87ff9726d4e8a95c22a31eb2cf6b22f1b9c660412ae7ed31900c61da9" }, "downloads": -1, "filename": "py-triton-0.0.18.tar.gz", "has_sig": false, "md5_digest": "232957ecd4d16b89ef23ba31f41e4fd2", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 19129, "upload_time": "2017-11-21T19:46:15", "url": "https://files.pythonhosted.org/packages/01/f9/e500e87848f8c3bcb78e560ede2d2c646e8088341497cb5325498b43efe2/py-triton-0.0.18.tar.gz" } ], "0.0.19": [ { "comment_text": "", "digests": { "md5": "8124ae01426a402dec84a04b2f8538f5", "sha256": 
"78be55188f7bfa002ba72b79796c3ee290d299f459f0cd058cbec7d5303451db" }, "downloads": -1, "filename": "py-triton-0.0.19.tar.gz", "has_sig": false, "md5_digest": "8124ae01426a402dec84a04b2f8538f5", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 21871, "upload_time": "2017-12-18T22:37:38", "url": "https://files.pythonhosted.org/packages/77/bb/15b1359a9038eabb1aa30abd78068e4e980301790d263f25325265642657/py-triton-0.0.19.tar.gz" } ], "0.0.6": [ { "comment_text": "", "digests": { "md5": "18dbecd714cf530f4f0b1ac79671d2e2", "sha256": "1110f760b69d7e737f662059e9c8e6d4bfa64fcf2d2618f1f6f66098569fc506" }, "downloads": -1, "filename": "py-triton-0.0.6.tar.gz", "has_sig": false, "md5_digest": "18dbecd714cf530f4f0b1ac79671d2e2", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 13656, "upload_time": "2015-12-08T23:29:55", "url": "https://files.pythonhosted.org/packages/67/68/1d96fdbefc09aac0136ee082c7831a20292e5ba02446a9f0a3347848120b/py-triton-0.0.6.tar.gz" } ], "0.0.7": [ { "comment_text": "", "digests": { "md5": "ba43ac93aad1639c921a14ff989b2746", "sha256": "2e2b3cd18417a96b0c395a651db82693ccba8d87ed11bd0de2383b0b1b69f5bc" }, "downloads": -1, "filename": "py-triton-0.0.7.tar.gz", "has_sig": false, "md5_digest": "ba43ac93aad1639c921a14ff989b2746", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 13763, "upload_time": "2015-12-11T02:13:05", "url": "https://files.pythonhosted.org/packages/af/c7/84e6b121fdebb25e8a5b5c896f4938ca5ad4725b6292a43cc0583541ea51/py-triton-0.0.7.tar.gz" } ], "0.0.8": [ { "comment_text": "", "digests": { "md5": "7bc5d7668b450b2f61267a28a5ffbaa5", "sha256": "804de79aec7082678743ae40a5c9840bdd28ea550781f98487c3cfd4b3b67195" }, "downloads": -1, "filename": "py-triton-0.0.8.tar.gz", "has_sig": false, "md5_digest": "7bc5d7668b450b2f61267a28a5ffbaa5", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 14733, "upload_time": 
"2016-01-30T01:12:17", "url": "https://files.pythonhosted.org/packages/b0/3a/667e6b7c3e1ab0c3d5a94ded521cd9c75025cb07613411e9b09b807f7558/py-triton-0.0.8.tar.gz" } ], "0.0.9": [ { "comment_text": "", "digests": { "md5": "3968563d8daf376ebe10317fe8f2f068", "sha256": "916d1f3064daa4944b6fa115c84642f40c95d693e7f6899798948bcfec08b8a2" }, "downloads": -1, "filename": "py-triton-0.0.9.tar.gz", "has_sig": false, "md5_digest": "3968563d8daf376ebe10317fe8f2f068", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 14860, "upload_time": "2016-02-12T22:46:13", "url": "https://files.pythonhosted.org/packages/34/81/22f51c69b08ebbd2258145a3fb7eed5c7a4b4a5f569250d52a3adb016c26/py-triton-0.0.9.tar.gz" } ] }, "urls": [ { "comment_text": "", "digests": { "md5": "8124ae01426a402dec84a04b2f8538f5", "sha256": "78be55188f7bfa002ba72b79796c3ee290d299f459f0cd058cbec7d5303451db" }, "downloads": -1, "filename": "py-triton-0.0.19.tar.gz", "has_sig": false, "md5_digest": "8124ae01426a402dec84a04b2f8538f5", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 21871, "upload_time": "2017-12-18T22:37:38", "url": "https://files.pythonhosted.org/packages/77/bb/15b1359a9038eabb1aa30abd78068e4e980301790d263f25325265642657/py-triton-0.0.19.tar.gz" } ] }