{
  "info": {
    "author": "thuhak",
    "author_email": "thuhak.zhou@nio.com",
    "bugtrack_url": null,
    "classifiers": [],
    "description": "# kafka to elasticsearch\n\nReads data from Kafka, processes it, and writes it to Elasticsearch. Python 3 only.\n\n## Usage\n\n### Write your own handler\n\nexample.py\n\n```python\nfrom kfk2es import StreamProcess\n\n\ndef your_own_handler(event):\n    \"\"\"\n    event is a single record read from Kafka.\n    \"\"\"\n    print(event)\n    # return ('esindex-%Y%m%d', '_log', event)\n    # return (None, '_log', event)\n    return event  # the returned value is sent to ES; if nothing is returned, nothing is sent\n\n\nif __name__ == '__main__':\n    pipe = StreamProcess(configfile='config.yml')\n    # from myconf import Conf\n    # conf = Conf('config.yml')\n    # pipe = StreamProcess(configfile=conf)\n    pipe.handler = your_own_handler\n    pipe.run()\n```\n\nThe `configfile` argument of `StreamProcess` can be either the path to a config file or a dict containing the required parameters.\n\nIf no handler is set, the data read from Kafka is sent to Elasticsearch as-is.\n\nTo add your own handler, write a function that returns one of the following:\n\n- A dict: the data in the dict is sent to ES.\n- A 3-element tuple: the first element is the ES index, the second is the ES type, and the third is the data to index into ES.\n  An index and type returned this way take precedence over the ones in the config file. To set only one of them dynamically and use the default for the other, set the one you don't need to `None`.\n- Nothing: if a record should not be sent to ES, simply return nothing. This lets you test Kafka connectivity on its own.\n\n### Edit the config file\n\nThe config file can be in JSON or YAML format, with a `.json`, `.yaml` or `.yml` extension.\n\nMultiple Kafka inputs can be configured, but there is only one Elasticsearch output.\n\n#### YAML example\n\nconfig.yml\n\n```yaml\nkafka:\n  - topic: top\n    bootstrap_servers:\n      - server1:9092\n      - server2:9092\n      - server3:9092\n    group_id: group_id\n    sasl_plain_username: user\n    sasl_plain_password: password\n\nelasticsearch:\n  params:  # params for elasticsearch\n    hosts:\n      - http://user:pass@10.1.0.1:9200\n  index: test-%Y%m%d\n  type: log  # optional (default: \"log\")\n  cache_size: 150  # optional (default: 150)\n  timeout: 1  # optional (default: 1)\n```\n\nNote that the `params` option under `elasticsearch` holds the arguments for the `Elasticsearch` class of the elasticsearch library, while the other options are used by `StreamProcess`.",
    "description_content_type": "text/markdown",
    "docs_url": null,
    "download_url": "",
    "downloads": {
      "last_day": -1,
      "last_month": -1,
      "last_week": -1
    },
    "home_page": "https://github.com/thuhak/kfk2es",
    "keywords": "kafka elasticsearch logstash",
    "license": "",
    "maintainer": "",
    "maintainer_email": "",
    "name": "kfk2es",
    "package_url": "https://pypi.org/project/kfk2es/",
    "platform": "",
    "project_url": "https://pypi.org/project/kfk2es/",
    "project_urls": {
      "Homepage": "https://github.com/thuhak/kfk2es"
    },
    "release_url": "https://pypi.org/project/kfk2es/1.1.0/",
    "requires_dist": null,
    "requires_python": "",
    "summary": "get data from kafka and, after processing, send it to elasticsearch",
    "version": "1.1.0"
  },
  "last_serial": 4892540,
  "releases": {
    "1.1.0": [
      {
        "comment_text": "",
        "digests": {
          "md5": "df3c8598c8261f7c9090d67aaf2c72d3",
          "sha256": "72b634c81c32bd900076e6ee7848434bbbd83ca7dec821f5d130f42fc8912aac"
        },
        "downloads": -1,
        "filename": "kfk2es-1.1.0.tar.gz",
        "has_sig": false,
        "md5_digest": "df3c8598c8261f7c9090d67aaf2c72d3",
        "packagetype": "sdist",
        "python_version": "source",
        "requires_python": null,
        "size": 4481,
        "upload_time": "2019-03-04T03:12:29",
        "url": "https://files.pythonhosted.org/packages/f4/58/d20fcf281d8cda9694b848503243ba88cebcafda0c5e844359798f5ccb54/kfk2es-1.1.0.tar.gz"
      }
    ]
  },
  "urls": [
    {
      "comment_text": "",
      "digests": {
        "md5": "df3c8598c8261f7c9090d67aaf2c72d3",
        "sha256": "72b634c81c32bd900076e6ee7848434bbbd83ca7dec821f5d130f42fc8912aac"
      },
      "downloads": -1,
      "filename": "kfk2es-1.1.0.tar.gz",
      "has_sig": false,
      "md5_digest": "df3c8598c8261f7c9090d67aaf2c72d3",
      "packagetype": "sdist",
      "python_version": "source",
      "requires_python": null,
      "size": 4481,
      "upload_time": "2019-03-04T03:12:29",
      "url": "https://files.pythonhosted.org/packages/f4/58/d20fcf281d8cda9694b848503243ba88cebcafda0c5e844359798f5ccb54/kfk2es-1.1.0.tar.gz"
    }
  ]
}