{ "info": { "author": "zero323", "author_email": "", "bugtrack_url": null, "classifiers": [ "Development Status :: 3 - Alpha", "License :: OSI Approved :: Apache Software License", "Programming Language :: Python :: 2", "Programming Language :: Python :: 3" ], "description": "pyspark-asyncactions\n====================\n\n|Build Status| |PyPI version|\n\nA proof of concept asynchronous actions for PySpark using\n`concurent.futures `__.\nOriginally developed as proof-of-concept solution for\n`SPARK-20347 `__.\n\nHow does it work?\n-----------------\n\nThe package patches `RDD `__,\n`Estimator `__,\n`DataFrame `__ and\n`DataFrameWriter `__\nclasses by adding thin wrappers to the commonly used action methods.\n\n\nMethods are patched by retrieving shared\n`ThreadPoolExecutor `__\n(attached to ``SparkContext``) and applying its ``submit`` method:\n\n.. code:: python\n\n def async_action(f):\n def async_action_(self, *args, **kwargs):\n executor = get_context(self)._get_executor()\n return executor.submit(f, self, *args, **kwargs)\n return async_action_\n\nThe naming convention for the patched methods is ``methodNameAsync``,\nfor example:\n\n- ``RDD.count`` \u21d2 ``RDD.countAsync``\n- ``DataFrame.take`` \u21d2 ``RDD.takeAsync``\n- ``DataFrameWriter.save`` \u21d2 ``DataFrameWriter.saveAsync``\n\nNumber of threads is determined as follows:\n\n- ``spark.driver.cores`` if is set.\n- 2 otherwise.\n\nUsage\n-----\n\nTo patch existing classes just import the package:\n\n.. code:: python\n\n >>> import asyncactions\n >>> from pyspark.sql import SparkSession\n >>>\n >>> spark = SparkSession.builder.getOrCreate()\n\nAll ``*Async`` methods return `concurrent.futures.Future `__:\n\n.. code:: python\n\n >>> rdd = spark.sparkContext.range(100)\n >>> f = rdd.countAsync()\n >>> f\n \n >>> type(f)\n concurrent.futures._base.Future\n >>> f.add_done_callback(lambda f: print(f.result()))\n 100\n\n\nand the result can be used whenever ``Future`` is expected.\n\nInstallation\n------------\n\nThe package is available on PYPI:\n\n.. code:: bash\n\n pip install pyspark-asyncactions\n\nInstallation is required only on the driver node.\n\n\nDependencies\n------------\n\nThe package supports Python 3.5 or later with a common codebase and\nrequires no external dependencies.\n\nIt is also possible, but not supported, to use it with Python 2.7, using\n`concurrent.futures backport `__.\n\n\nDo it yourself\n--------------\n\nDefine actions dictionary which maps from the method name to the docstring:\n\n.. code:: python\n\n >>> actions = {\"evaluate\": \"\"\"Asynchronously evaluates the output with optional parameters.\n ... :param dataset: a dataset that contains labels/observations and\n ... predictions\n ... :param params: an optional param map that overrides embedded\n ... params\n ... :return: :py:class:`concurrent.futures.Future` of metric\n ... \"\"\"}\n\nCall asyncactions.utils.patch_all method with class and actions as the arguments\n\n.. code:: Python\n\n >>> import asyncactions.utils\n >>> from pyspark.ml.evaluation import Evaluator, RegressionEvaluator\n >>> asyncactions.utils.patch_all(Evaluator, actions)\n\nEnjoy your new asynchronous method\n\n.. code:: python\n\n >>> import asyncactions\n >>> df = spark.createDataFrame([(1.0, 1.0), (1.0, -1.0), (0.0, 1.0)], (\"label\", \"prediction\"))\n >>> metrics = RegressionEvaluator().evaluateAsync(df)\n >>> metrics.result() # Note that result is blocking\n 1.2909944487358058\n\nFAQ\n---\n\n- **Why would I need that? Processing in Spark is already distributed.**\n\n As explained in the `Job Scheduling documentation`_\n\n (...) within each Spark application, multiple \u201cjobs\u201d (Spark actions) may be running concurrently if they were submitted by different threads.\n\n However all PySpark actions are blocking. This means that, even if there are free resources on the cluster, each jobs will be executed sequentially\n (paraphrasing `XKCD `__, I am not slacking off, just fitting a ``Pipeline``).\n\n It is perfectly possible `to achieve the same result using threads `__ or ``concurrent.futures``\n directly, but the resulting code but resulting can be quite verbose, especially when used in an interactive environment.\n The goal of this package is to make this process as streamlined as possible by hiding all the details (creating and stopping thread pool, job submission).\n\n- **What about** `GIL`_?\n\n The goal of the package is to enable non-blocking submission of jobs (see above) while the actual processing is handled by the Spark cluster.\n Since heavy lifting is performed by JVM or Python workers as standalone processes, interpreter lock is of lesser concern.\n\n Because final merging process is applied on the driver, GIL might affect jobs depending heavily on computationally expensive ``Accumulators`` or reduce-like\n (``reduce``, ``fold``, ``aggregate``) jobs with computationally expensive function.\n The latter problem can be partially addressed using `treeReduce`_.\n\n\n- **Why not merge this into PySpark?**\n\n **TL;DR** There was not enough consensus if the feature is essential enough,\n and if it is, what implementation should be used (piggyback onto JVM `AsyncRDDActions`_ vs. native Python implementation).\n For details see `corresponding PR `_.\n\n Keeping a separate package gives more freedom (we can add a number of methods not present in the Scala API)\n and better integration with plain Python code, at expense of some more advanced features\n (most notably support for canceling running Spark jobs).\n\n- **When not to use this package?**\n\n This package is intended primarily to achieve small scale concurrent job execution\n when working with interactive environments. While theoretically it should be possible\n to use it to submit hundreds of independent jobs, it will is likely to stress driver process\n and Py4j gateway and crash the application.\n\n Therefore I strongly recommend against using it as substitute for a workflow management software.\n\nDisclaimer\n----------\n\nApache Spark, Spark, PySpark, Apache, and the Spark logo are `trademarks `__ of `The\nApache Software Foundation `__. This project is not owned, endorsed, or\nsponsored by The Apache Software Foundation.\n\n.. |Build Status| image:: https://travis-ci.org/zero323/pyspark-asyncactions.svg?branch=master\n :target: https://travis-ci.org/zero323/pyspark-asyncactions\n.. |PyPI version| image:: https://badge.fury.io/py/pyspark-asyncactions.svg\n :target: https://badge.fury.io/py/pyspark-asyncactions\n.. _Job Scheduling documentation: https://spark.apache.org/docs/latest/job-scheduling.html#overview\n.. _GIL: https://wiki.python.org/moin/GlobalInterpreterLock\n.. _AsyncRDDActions: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.AsyncRDDActions\n.. _treeReduce: https://stackoverflow.com/q/32281417/1560062\n\n\n", "description_content_type": "", "docs_url": null, "download_url": "", "downloads": { "last_day": -1, "last_month": -1, "last_week": -1 }, "home_page": "https://github.com/zero323/pyspark-asyncactions", "keywords": "", "license": "Apache 2.0", "maintainer": "", "maintainer_email": "", "name": "pyspark-asyncactions", "package_url": "https://pypi.org/project/pyspark-asyncactions/", "platform": "", "project_url": "https://pypi.org/project/pyspark-asyncactions/", "project_urls": { "Homepage": "https://github.com/zero323/pyspark-asyncactions" }, "release_url": "https://pypi.org/project/pyspark-asyncactions/0.0.2/", "requires_dist": null, "requires_python": "", "summary": "A proof of concept asynchronous actions for PySpark using concurent.futures", "version": "0.0.2" }, "last_serial": 3993112, "releases": { "0.0.1": [ { "comment_text": "", "digests": { "md5": "cbaeaaf60459def766a32372dd53afd7", "sha256": "13d6348fa52524a0ce1ed53e4a761cad9c769471a0286a5961d93feff6c94ed3" }, "downloads": -1, "filename": "pyspark_asyncactions-0.0.1-py3-none-any.whl", "has_sig": false, "md5_digest": "cbaeaaf60459def766a32372dd53afd7", "packagetype": "bdist_wheel", "python_version": "py3", "requires_python": null, "size": 8321, "upload_time": "2018-06-16T21:42:12", "url": "https://files.pythonhosted.org/packages/37/a4/b4acdb3928a2fa42df88af80e3398cfa8f85a0f5bb5e00692cc581494d10/pyspark_asyncactions-0.0.1-py3-none-any.whl" }, { "comment_text": "", "digests": { "md5": "c4e97ea0b690d114826e05c0bebb7b98", "sha256": "2f88be2cda8f549b7deab42b84e51f13058bc3d6950a74f72905561215204a6a" }, "downloads": -1, "filename": "pyspark-asyncactions-0.0.1.tar.gz", "has_sig": false, "md5_digest": "c4e97ea0b690d114826e05c0bebb7b98", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 5405, "upload_time": "2018-06-16T21:42:14", "url": "https://files.pythonhosted.org/packages/58/ff/3b3e09c8f74f0893b33bc0b5dfbdba76199808139cb2e9e2696d8f098a47/pyspark-asyncactions-0.0.1.tar.gz" } ], "0.0.2": [ { "comment_text": "", "digests": { "md5": "69b04d07f329faa989375494e962e880", "sha256": "72b721c87e34f67108818f22f8e1232d9e910ac5d7885ee4419580ff6bdb5fbb" }, "downloads": -1, "filename": "pyspark_asyncactions-0.0.2-py3-none-any.whl", "has_sig": false, "md5_digest": "69b04d07f329faa989375494e962e880", "packagetype": "bdist_wheel", "python_version": "py3", "requires_python": null, "size": 11466, "upload_time": "2018-06-23T15:46:50", "url": "https://files.pythonhosted.org/packages/75/24/7ea18f5170aa3e454deda758246f847190584156978f64c29425eecfa101/pyspark_asyncactions-0.0.2-py3-none-any.whl" }, { "comment_text": "", "digests": { "md5": "c76477c9152ffc456f45b502029bcc0d", "sha256": "2fefcf58bff76fd8a671f6637247429466161ab6798623c80191d72a178b60c8" }, "downloads": -1, "filename": "pyspark-asyncactions-0.0.2.tar.gz", "has_sig": false, "md5_digest": "c76477c9152ffc456f45b502029bcc0d", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 8050, "upload_time": "2018-06-23T15:46:51", "url": "https://files.pythonhosted.org/packages/b4/1c/df9a7fb39c0edf2223865c0db0f399b75f157ac04bed3281b029f70bd8a5/pyspark-asyncactions-0.0.2.tar.gz" } ] }, "urls": [ { "comment_text": "", "digests": { "md5": "69b04d07f329faa989375494e962e880", "sha256": "72b721c87e34f67108818f22f8e1232d9e910ac5d7885ee4419580ff6bdb5fbb" }, "downloads": -1, "filename": "pyspark_asyncactions-0.0.2-py3-none-any.whl", "has_sig": false, "md5_digest": "69b04d07f329faa989375494e962e880", "packagetype": "bdist_wheel", "python_version": "py3", "requires_python": null, "size": 11466, "upload_time": "2018-06-23T15:46:50", "url": "https://files.pythonhosted.org/packages/75/24/7ea18f5170aa3e454deda758246f847190584156978f64c29425eecfa101/pyspark_asyncactions-0.0.2-py3-none-any.whl" }, { "comment_text": "", "digests": { "md5": "c76477c9152ffc456f45b502029bcc0d", "sha256": "2fefcf58bff76fd8a671f6637247429466161ab6798623c80191d72a178b60c8" }, "downloads": -1, "filename": "pyspark-asyncactions-0.0.2.tar.gz", "has_sig": false, "md5_digest": "c76477c9152ffc456f45b502029bcc0d", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 8050, "upload_time": "2018-06-23T15:46:51", "url": "https://files.pythonhosted.org/packages/b4/1c/df9a7fb39c0edf2223865c0db0f399b75f157ac04bed3281b029f70bd8a5/pyspark-asyncactions-0.0.2.tar.gz" } ] }