{ "info": { "author": "", "author_email": "", "bugtrack_url": null, "classifiers": [ "Development Status :: 3 - Alpha", "Operating System :: OS Independent", "Programming Language :: Python :: 3.5", "Programming Language :: Python :: 3.6", "Programming Language :: Python :: 3.7", "Programming Language :: Python :: Implementation :: CPython" ], "description": "

Service Streamer

\n\n

\nBoosting your Web Services of Deep Learning Applications. \nChinese README\n

\n\n

\n

\n\n

\n What is Service Streamer ? \u2022\n Highlights \u2022\n Installation \u2022\n Develop BERT Service in 5 Minutes \u2022\n API \u2022\n Benchmark \u2022\n FAQ \u2022\n

\n\n
\n \u2022 Made by ShannonAI \u2022 :globe_with_meridians: http://www.shannonai.com/\n
\n\n\n

What is Service Streamer ?

\n\nDeep learning models usually process data in mini-batches, which lets them exploit the parallel computing capability of GPUs. Requests to a web service, however, usually arrive one at a time. With a conventional loop server or threaded server, the GPU handles a single request at a time and sits mostly idle, and latency grows linearly with the number of concurrent requests. \n\nServiceStreamer is a middleware for web services of machine learning applications. It queues user requests and groups them into mini-batches. By improving GPU utilization, ServiceStreamer can significantly increase the overall throughput of the system. \n\n
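The queueing idea described above can be sketched in plain Python. This is a minimal illustration built on the standard library only, not ServiceStreamer's actual implementation: the helper name `collect_batch` is hypothetical, and `batch_size` and `max_latency` simply mirror the parameter names that appear in the examples of this README.

```python
import queue
import time


def collect_batch(request_queue, batch_size, max_latency):
    # Hypothetical helper: block until one request arrives, then keep
    # collecting until the batch is full or max_latency seconds have passed.
    batch = [request_queue.get()]
    deadline = time.monotonic() + max_latency
    while len(batch) < batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            batch.append(request_queue.get(timeout=remaining))
        except queue.Empty:
            break
    return batch


# Three discrete requests arrive in the queue.
requests = queue.Queue()
for sentence in ['first request', 'second request', 'third request']:
    requests.put(sentence)

# The first batch fills up immediately; the second one is cut off by the
# latency limit because no further request arrives in time.
first = collect_batch(requests, batch_size=2, max_latency=0.1)
second = collect_batch(requests, batch_size=2, max_latency=0.1)
print(first)
print(second)
```

A larger `batch_size` gives the GPU more work per call, while `max_latency` bounds how long an early request waits for the batch to fill.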

Highlights

\n\n- :hatching_chick: **Easy to use**: A few lines of code can speed up model serving by roughly ten times. \n- :zap: **Fast processing speed**: Low latency for online inference of machine learning models. \n- :octopus: **Good scalability**: Easily applied to multi-GPU scenarios to handle large volumes of requests.\n- :crossed_swords: **Applicability**: Works with any web framework and/or deep learning framework. \n\n\n

Installation

\n\nInstall ServiceStreamer with `pip` (requires **Python >= 3.5**):\n```bash\npip install service_streamer \n```\n\n

Develop BERT Service in 5 Minutes

\n\nWe provide a step-by-step tutorial for bringing BERT online in 5 minutes. The resulting service can process up to 1400 sentences per second (see [Benchmark](#benchmark)). \n\n``Text Infilling`` is a task in natural language processing: given a sentence with several words randomly removed, the model predicts the removed words from the surrounding context. \n\n``BERT`` has attracted a lot of attention in recent years and achieves state-of-the-art results across many NLP tasks. BERT uses the \"Masked Language Model (MLM)\" as one of its pre-training objectives. MLM randomly masks some of the tokens in the input, and the objective is to predict the original vocabulary id of each masked token from its context. Because MLM closely resembles text infilling, BERT is a natural fit for the task. \n\n\n1. First, we define a model for the text infilling task in [bert_model.py](./example/bert_model.py). The `predict` function accepts a batch of sentences and returns the predicted word for the `[MASK]` token in each sentence. \n\n    ```python\n    class TextInfillingModel(object):\n        ...\n\n\n    batch = [\"twinkle twinkle [MASK] star.\",\n             \"Happy birthday to [MASK].\",\n             'the answer to life, the [MASK], and everything.']\n    model = TextInfillingModel()\n    outputs = model.predict(batch)\n    print(outputs)\n    # ['little', 'you', 'universe']\n    ```\n    **Note**: Please download the pre-trained BERT model first. \n\n\n2. Second, use [Flask](https://github.com/pallets/flask) to expose the prediction interface as a web service. See [flask_example.py](./example/flask_example.py).\n\n\n    ```python\n    from flask import Flask, jsonify, request\n    from bert_model import TextInfillingModel\n\n    app = Flask(__name__)\n    model = TextInfillingModel()\n\n    @app.route(\"/naive\", methods=[\"GET\", \"POST\"])\n    def naive_predict():\n        # Read the sentences from the query string (GET) or the form body (POST).\n        if request.method == \"GET\":\n            inputs = request.args.getlist(\"s\")\n        else:\n            inputs = request.form.getlist(\"s\")\n        outputs = model.predict(inputs)\n        return jsonify(outputs)\n\n    app.run(port=5005)\n    ```\n\n    Run [flask_example.py](./example/flask_example.py) to start a vanilla web server. 
\n\n    ```bash\n    curl -X POST http://localhost:5005/naive -d 's=Happy birthday to [MASK].' \n    [\"you\"]\n    ```\n\n    At this point, the web server can only serve about 12 requests per second. See [benchmark](#benchmark) for more details.\n\n\n3. Third, wrap the model's `predict` function with `service_streamer`. Three lines of code raise the throughput of the BERT service to 200+ sentences per second (16x faster).\n\n    ```python\n    from service_streamer import ThreadedStreamer\n    streamer = ThreadedStreamer(model.predict, batch_size=64, max_latency=0.1)\n\n    @app.route(\"/stream\", methods=[\"POST\"])\n    def stream_predict():\n        inputs = request.form.getlist(\"s\")\n        # Replace model.predict with streamer.predict\n        outputs = streamer.predict(inputs)\n        return jsonify(outputs)\n\n    app.run(port=5005, debug=False)\n    ```\n\n    Run [flask_example.py](./example/flask_example.py) and test the performance with [wrk](https://github.com/wg/wrk). \n\n    ```bash\n    ./wrk -t 2 -c 128 -d 20s --timeout=10s -s example/benchmark.lua http://127.0.0.1:5005/stream\n    ...\n    Requests/sec: 200.31\n    ```\n\n4. Finally, wrap the model with ``Streamer`` and start service workers on multiple GPUs. ``Streamer`` further accelerates inference to 1000+ sentences per second (80x faster). \n\n\n\n    ```python\n    from service_streamer import ManagedModel, Streamer\n\n    class ManagedBertModel(ManagedModel):\n\n        def init_model(self):\n            self.model = TextInfillingModel()\n\n        def predict(self, batch):\n            return self.model.predict(batch)\n\n    streamer = Streamer(ManagedBertModel, batch_size=64, max_latency=0.1, worker_num=8, cuda_devices=(0, 1, 2, 3))\n    app.run(port=5005, debug=False)\n    ```\n\n    This starts 8 GPU workers, evenly distributed across 4 GPUs.\n\n\n
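One simple way to spread 8 workers evenly over 4 GPUs is round-robin assignment. The sketch below is only an assumption about how such a distribution could work, not the confirmed behaviour of ``Streamer``; `assign_devices` is a hypothetical helper, and its arguments reuse the `worker_num` and `cuda_devices` values from step 4.

```python
from collections import Counter


def assign_devices(worker_num, cuda_devices):
    # Hypothetical round-robin policy: worker i is placed on device
    # number (i modulo the number of devices).
    return [cuda_devices[i % len(cuda_devices)] for i in range(worker_num)]


assignment = assign_devices(worker_num=8, cuda_devices=(0, 1, 2, 3))
workers_per_device = Counter(assignment)
print(assignment)          # [0, 1, 2, 3, 0, 1, 2, 3]
print(workers_per_device)  # every device ends up with 2 workers
```

With this policy, a worker count that is a multiple of the device count always yields the same number of workers on each GPU.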

API

\n\n#### Quick Start\n\nIn general, inference is faster when the model processes inputs in batches, because the GPU can work on them in parallel.\n\n```python\noutputs = model.predict(batch_inputs)\n```\n\n**ServiceStreamer** is a middleware for web services of machine learning applications. It schedules queued user requests into mini-batches and forwards them to GPU workers. ServiceStreamer trades a small delay (at most 0.1s by default) for higher overall throughput through better GPU utilization. \n\n\n```python\nfrom service_streamer import ThreadedStreamer\n\n# Wrap the batch predict function with a streamer\n\nstreamer = ThreadedStreamer(model.predict, batch_size=64, max_latency=0.1)\n\n# Replace model.predict with streamer.predict\n\noutputs = streamer.predict(batch_inputs)\n```\n\nStart the web server with multi-threading (or coroutines). Adding these few lines of code can usually make your server about 10x faster (```batch_size/batch_per_request```).\n\n\n#### Distributed GPU worker\n\nIn practice, a web server can handle far more requests per second (QPS) than a GPU model can. We therefore also support one web server with multiple GPU worker processes.\n\n```python\nfrom service_streamer import Streamer\n\n# Spawn 4 GPU worker processes\nstreamer = Streamer(model.predict, 64, 0.1, worker_num=4)\noutputs = streamer.predict(batch)\n```\n\n\nBy default, ``Streamer`` runs GPU workers in ``spawn`` subprocesses and uses interprocess queues for communication and queueing. It can distribute a large number of requests to multiple workers for processing.\n\nThe prediction results of the model are then returned to the corresponding web server in batches. 
The results are then forwarded to the corresponding HTTP responses.\n\n```\n+-----------------------------------------------------------------------------+\n| NVIDIA-SMI 390.116 Driver Version: 390.116 |\n|-------------------------------+----------------------+----------------------+\n...\n+-----------------------------------------------------------------------------+\n| Processes: GPU Memory |\n| GPU PID Type Process name Usage |\n|=============================================================================|\n| 0 7574 C /home/liuxin/nlp/venv/bin/python 1889MiB |\n| 1 7575 C /home/liuxin/nlp/venv/bin/python 1889MiB |\n| 2 7576 C /home/liuxin/nlp/venv/bin/python 1889MiB |\n| 3 7577 C /home/liuxin/nlp/venv/bin/python 1889MiB |\n+-----------------------------------------------------------------------------+\n\n```\n\nThe method above is simple to set up, but the model is also initialized in the main process, which takes up extra memory, and all workers can only run on the same GPU.\nTherefore, we provide the ```ManagedModel``` class, which supports lazy model initialization and migration across multiple GPUs.\n\n```python\nfrom service_streamer import ManagedModel, Streamer\n\nclass ManagedBertModel(ManagedModel):\n\n    def init_model(self):\n        self.model = Model()\n\n    def predict(self, batch):\n        return self.model.predict(batch)\n\n\n# Spawn 4 GPU worker processes, evenly distributed across GPUs 0, 1, 2 and 3\nstreamer = Streamer(ManagedBertModel, 64, 0.1, worker_num=4, cuda_devices=(0, 1, 2, 3))\noutputs = streamer.predict(batch)\n```\n\n#### Distributed Web Server\n\nSome CPU-intensive computations, such as image and text preprocessing, need to be done in the web server first. The preprocessed data is then forwarded to the GPU workers for prediction. CPU resources often become performance bottlenecks in practice. 
Therefore, we also support multiple web servers paired with one or more GPU workers.\n\n\nUse ```RedisStreamer``` to specify a single Redis address shared by all web servers and GPU workers. \n\n\n```python\nfrom service_streamer import RedisStreamer\n\n# The redis_broker parameter can be omitted, in which case localhost:6379 is used.\nstreamer = RedisStreamer(redis_broker=\"172.22.22.22:6379\")\n```\n\nWe use ``gunicorn`` or ``uwsgi`` for reverse proxying and load balancing.\n\n```bash\ncd example\ngunicorn -c redis_streamer_gunicorn.py flask_example:app\n```\n\nEach request is load balanced to one of the web servers for CPU preprocessing, and then evenly distributed to the GPU workers for model prediction.\n\n\n### Future API\n\nYou might be familiar with `future` if you have used any concurrency library. \nYou can use the Future API directly if you want to use ``service_streamer`` for queueing requests or distributed GPU computing outside of a web service. \n\n\n```python\nfrom service_streamer import ThreadedStreamer\nstreamer = ThreadedStreamer(model.predict, 64, 0.1)\n\nxs = []\nfor i in range(200):\n    future = streamer.submit([[\"How\", \"are\", \"you\", \"?\"], [\"Fine\", \".\"], [\"Thank\", \"you\", \".\"]])\n    xs.append(future)\n\n\n# Wait for each future and collect its asynchronous result. \nfor future in xs:\n    outputs = future.result()\n    print(outputs)\n```\n\n
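For readers who have not met futures before, the same submit-then-collect pattern exists in the Python standard library. The sketch below uses `concurrent.futures.ThreadPoolExecutor` purely as an analogy; it does not use or describe the internals of ``service_streamer``, and `fake_predict` is a hypothetical stand-in for `model.predict`.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_predict(batch):
    # Hypothetical stand-in for model.predict: returns the token count
    # of every sentence in the batch.
    return [len(tokens) for tokens in batch]


batch = [['How', 'are', 'you', '?'], ['Fine', '.'], ['Thank', 'you', '.']]

with ThreadPoolExecutor(max_workers=2) as pool:
    # submit returns immediately with a future for each call.
    futures = [pool.submit(fake_predict, batch) for _ in range(3)]
    # result blocks until the corresponding call has finished.
    results = [future.result() for future in futures]

print(results)
```

Submitting everything first and calling `result()` afterwards lets the work overlap, which is the same reason the example above collects its futures in `xs` before reading them.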

Benchmark

\n\n### Benchmark\n\nWe use [wrk](https://github.com/wg/wrk) to run the benchmark tests.\n\nTest examples and scripts can be found in [example](./example).\n\n\n### Environment\n\n* gpu: Titan Xp\n* cuda: 9.0\n* pytorch: 1.1 \n\n### Single GPU process\n\n```bash\n# start flask threaded server\npython example/flask_example.py\n\n# benchmark naive api without service_streamer\n./wrk -t 4 -c 128 -d 20s --timeout=10s -s scripts/streamer.lua http://127.0.0.1:5005/naive\n# benchmark stream api with service_streamer\n./wrk -t 4 -c 128 -d 20s --timeout=10s -s scripts/streamer.lua http://127.0.0.1:5005/stream\n```\n\n| |Naive|ThreadedStreamer|Streamer|RedisStreamer|\n|-|-|-|-|-|\n| qps | 12.78 | 207.59 | 321.70 | 372.45 |\n| latency | 8440ms | 603.35ms | 392.66ms | 340.74ms |\n\n### Multiple GPU processes\n\nThis test measures the overhead of the communication and load balancing mechanisms of multiple GPU workers against a single web server process.\n\nWe use a gevent server because the multi-threaded Flask server becomes a performance bottleneck. Please refer to [flask_multigpu_example.py](example/flask_multigpu_example.py).\n\n\n```bash\n./wrk -t 8 -c 512 -d 20s --timeout=10s -s scripts/streamer.lua http://127.0.0.1:5005/stream\n```\n\n| gpu_worker_num | Naive | ThreadedStreamer |Streamer|RedisStreamer|\n|-|-|-|-|-|\n|1|11.62|211.02|362.69|365.80|\n|2|N/A|N/A|488.40|609.63|\n|4|N/A|N/A|494.20|1034.57|\n\n\n* ``ThreadedStreamer``: Because of the Python GIL, running multiple workers is pointless, so we only benchmark it with a single GPU worker. \n\n* ``Streamer``: The performance improvement is not linear with more than 2 GPU workers.\nCPU utilization reaches 100%, so the CPU becomes the bottleneck and the performance of Flask is the limiting factor. 
\n\n\n### Utilize Future API to start multiple GPU processes\n\nWe use the [Future API](#future-api) to run the multi-GPU benchmark locally in order to reduce the performance influence of the web server. Please refer to the code example in [future_example.py](example/future_example.py).\n\n\n| gpu_worker_num | Batched | ThreadedStreamer |Streamer|RedisStreamer|\n|-|-|-|-|-|\n|1|422.883|401.01|399.26|384.79|\n|2|N/A|N/A|742.16|714.781|\n|4|N/A|N/A|1400.12|1356.47|\n\nThe throughput of ``service_streamer`` scales almost linearly with the number of GPU workers. The interprocess communication used by ``Streamer`` is more efficient than Redis. \n\n
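To make the near-linear scaling concrete, the figures from the table above can be turned into a scaling efficiency, defined here as the measured throughput divided by the worker count times the single-worker throughput. This is a small worked calculation on the published numbers only; the function name `scaling_efficiency` is ours, not part of ``service_streamer``.

```python
# Throughput per number of GPU workers, copied from the Future API table.
qps = {
    'Streamer': {1: 399.26, 2: 742.16, 4: 1400.12},
    'RedisStreamer': {1: 384.79, 2: 714.781, 4: 1356.47},
}


def scaling_efficiency(results):
    # 1.0 would mean perfectly linear scaling relative to one worker.
    base = results[1]
    return {n: value / (n * base) for n, value in results.items()}


efficiency = {name: scaling_efficiency(r) for name, r in qps.items()}
for name, per_worker in efficiency.items():
    for n, value in sorted(per_worker.items()):
        print(name, n, round(value, 3))
```

Both variants stay above 0.87 at 4 workers, which is what the phrase almost linear amounts to for these measurements.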

FAQ

\n\n**Q:** When I use a model trained with [allennlp](https://github.com/allenai/allennlp) and set ``worker_num=4`` for [Streamer](./service_streamer/service_streamer.py) during inference, why are all 16 CPU cores fully loaded while inference is slower than [Streamer](./service_streamer/service_streamer.py) with ``worker_num=1``?\n\n**A:** In multi-process inference, if the model processes data with multi-threaded numpy, the threads of the different processes may compete for the CPU, so that multi-core computation ends up slower than a single core. This kind of problem may occur with third-party libraries such as allennlp, spacy, etc. It can be solved by setting the environment variables that control numpy threading.\n ```python\n import os\n os.environ[\"MKL_NUM_THREADS\"] = \"1\" # export MKL_NUM_THREADS=1 \n os.environ[\"NUMEXPR_NUM_THREADS\"] = \"1\" # export NUMEXPR_NUM_THREADS=1 \n os.environ[\"OMP_NUM_THREADS\"] = \"1\" # export OMP_NUM_THREADS=1\n import numpy\n ```\n Make sure to set the environment variables before ``import numpy``.\n\n**Q:** When using RedisStreamer with only one Redis broker and more than one model, the input batches may have different structures. How should this situation be handled? \n\n**A:** Specify a prefix when initializing the workers and the streamer, so that each streamer uses a unique channel. 
\n\nExample of initializing workers: \n \n```python\nfrom service_streamer import run_redis_workers_forever\nfrom bert_model import ManagedBertModel\n\nif __name__ == \"__main__\":\n    from multiprocessing import freeze_support\n    freeze_support()\n    run_redis_workers_forever(ManagedBertModel, 64, prefix='channel_1')\n    run_redis_workers_forever(ManagedBertModel, 64, prefix='channel_2')\n```\n\nExample of using the streamers to get results: \n \n```python\nfrom service_streamer import RedisStreamer\n\nstreamer_1 = RedisStreamer(prefix='channel_1')\nstreamer_2 = RedisStreamer(prefix='channel_2')\n\n# predict\noutput_1 = streamer_1.predict(batch)\noutput_2 = streamer_2.predict(batch)\n```", "description_content_type": "text/markdown", "docs_url": null, "download_url": "", "downloads": { "last_day": -1, "last_month": -1, "last_week": -1 }, "home_page": "https://github.com/shannonAI", "keywords": "service_streamer", "license": "", "maintainer": "", "maintainer_email": "", "name": "service-streamer", "package_url": "https://pypi.org/project/service-streamer/", "platform": "", "project_url": "https://pypi.org/project/service-streamer/", "project_urls": { "Homepage": "https://github.com/shannonAI" }, "release_url": "https://pypi.org/project/service-streamer/0.1.2/", "requires_dist": null, "requires_python": ">=3.5", "summary": "Boosting your web service of deep learning applications", "version": "0.1.2" }, "last_serial": 5790739, "releases": { "0.0.2": [ { "comment_text": "", "digests": { "md5": "696dd80458bd72da1758b51137b64f92", "sha256": "cf396742d3718f57d04056f18976dd174af0e93f6df21451ab0376db62b98395" }, "downloads": -1, "filename": "service_streamer-0.0.2.tar.gz", "has_sig": false, "md5_digest": "696dd80458bd72da1758b51137b64f92", "packagetype": "sdist", "python_version": "source", "requires_python": ">=3.5", "size": 10577, "upload_time": "2019-08-03T09:54:23", "url": 
"https://files.pythonhosted.org/packages/99/45/51262de6418dea1765fa6592fb210d6184fbc4282fba70b9840b6d6ff155/service_streamer-0.0.2.tar.gz" } ], "0.1.0": [ { "comment_text": "", "digests": { "md5": "00e35d8e566f630fe597a133dbe4e204", "sha256": "d1fa674017e6f9befe222940f75ce5286e93a0aa1e8f2d48d527d0a8f81913c2" }, "downloads": -1, "filename": "service_streamer-0.1.0.tar.gz", "has_sig": false, "md5_digest": "00e35d8e566f630fe597a133dbe4e204", "packagetype": "sdist", "python_version": "source", "requires_python": ">=3.5", "size": 10436, "upload_time": "2019-08-20T07:33:22", "url": "https://files.pythonhosted.org/packages/bb/40/ab5c9c616b4ced2e76f0692536252360a0fde1e8bf093c339889649e587a/service_streamer-0.1.0.tar.gz" } ], "0.1.1": [ { "comment_text": "", "digests": { "md5": "11761f8b1fa0bd288d20bf4f5101b3e0", "sha256": "f385a54f77673619686383d99cd025ce892ded692ea55e81e3813418cdb0bcfd" }, "downloads": -1, "filename": "service_streamer-0.1.1.tar.gz", "has_sig": false, "md5_digest": "11761f8b1fa0bd288d20bf4f5101b3e0", "packagetype": "sdist", "python_version": "source", "requires_python": ">=3.5", "size": 11337, "upload_time": "2019-08-30T02:06:50", "url": "https://files.pythonhosted.org/packages/06/18/07d330c794abf537cce71defc85e2c46794d6652c9c24479d63f4381a6b5/service_streamer-0.1.1.tar.gz" } ], "0.1.2": [ { "comment_text": "", "digests": { "md5": "e0af6c517b50206ae1b0ec01f16041d3", "sha256": "097ec1a139e8d7478804326644d681c9b2d635fd14c938d9ce084731c74a8cfd" }, "downloads": -1, "filename": "service_streamer-0.1.2.tar.gz", "has_sig": false, "md5_digest": "e0af6c517b50206ae1b0ec01f16041d3", "packagetype": "sdist", "python_version": "source", "requires_python": ">=3.5", "size": 11544, "upload_time": "2019-09-06T08:08:32", "url": "https://files.pythonhosted.org/packages/0d/b8/847f04820858bf8f0abb3c68b0f14216aed18b86d2ce7051fe85c14984f7/service_streamer-0.1.2.tar.gz" } ] }, "urls": [ { "comment_text": "", "digests": { "md5": "e0af6c517b50206ae1b0ec01f16041d3", "sha256": 
"097ec1a139e8d7478804326644d681c9b2d635fd14c938d9ce084731c74a8cfd" }, "downloads": -1, "filename": "service_streamer-0.1.2.tar.gz", "has_sig": false, "md5_digest": "e0af6c517b50206ae1b0ec01f16041d3", "packagetype": "sdist", "python_version": "source", "requires_python": ">=3.5", "size": 11544, "upload_time": "2019-09-06T08:08:32", "url": "https://files.pythonhosted.org/packages/0d/b8/847f04820858bf8f0abb3c68b0f14216aed18b86d2ce7051fe85c14984f7/service_streamer-0.1.2.tar.gz" } ] }