{ "info": { "author": "Marshall Weir", "author_email": "marshall.weir+pigpy@gmail.com", "bugtrack_url": null, "classifiers": [ "Intended Audience :: Developers", "Natural Language :: English", "Operating System :: OS Independent", "Programming Language :: Python", "Topic :: Software Development" ], "description": "pigpy - a python tool to manage Pig reports\n\nPig provides an amazing set of tools to create complex relational processes on top of Hadoop, but it has a few missing pieces:\n 1) Looping constructs for easily creating multiple similar reports\n 2) Caching of intermediate calculations\n 3) Data management and cleanup code\n 4) Easy testing for report correctness\n\npigpy is an attempt to fill in these holes by providing a Python module that knows how to talk to a Hadoop cluster and can create and manage complex report structures.\n\nh1. Getting started\n\nh2. Installation\n\nCurrently there is no real install procedure. Check out the current trunk and play with it from there. The only requirement I am aware of is that the java executable must be on your path and must be version 1.6 or newer. The grunt.py file at the top level should be a reasonable example of where to start using this module, in addition to this tutorial. pigpy has been tested on OS X and Debian Linux.\n\nh2. Submitting reports\nAt its most basic level, pigpy is a tool to submit Pig jobs to Hadoop and push and pull data on a cluster. 
Assume we have a Pig report called report.pig in the current directory.\n\n\nimport os\n\nfrom pigpy.hadoop import Hadoop\n\nclasspaths = [\n os.path.join(os.path.dirname(__file__), \"external\", \"pig-0.2.0\", \"pig.jar\"),\n os.path.join(os.path.dirname(__file__), \"external\", \"pig-0.2.0\", \"lib\", \"*\"),\n]\nlocal_home = os.path.join(os.path.dirname(__file__), \"external\", \"hadoop-0.18.3\")\nname_node = \"file:///\"\n\nhadoop = Hadoop(local_home, name_node, classpaths)\nhadoop.run_pig_job(\"report.pig\")\nhadoop.copyToLocal(\"/reports/output.csv\", \"output.csv\")\n\n\nThis code runs the report on whatever cluster is specified by name_node, and then pulls the output (assuming the output path is /reports/output.csv) to the local filesystem. The name_node argument can be any HDFS URL (e.g. hdfs://hadoop_cluster_ip:54310). Test code can easily be written to point to data on the local filesystem or a test cluster, which makes verifying correct results easier.\n\nh2. Creating complex reports\n\nBeyond just submitting reports, pigpy provides tools to create very complex reports using Python and Pig Latin rather than just Pig Latin. 
This allows you to use higher-level Python constructs to generate Pig code instead of maintaining large reports by hand.\n\nBelow is an example of a very simple report providing statistics on cars by color.\n\n\nfrom pigpy.hadoop import Hadoop\nfrom pigpy.reports import Report, Plan, PlanRunner\n\nhadoop = (hadoop initialization code)\n\ndata = Report(\"car_color_data\",\n \"LOAD 'car_colors.tsv' AS (make, model, price, color);\"\n)\ncolor_reports = []\nfor price in [5000, 10000, 15000, 20000, 150000]:\n color_reports.append(Report(\"%s_cars\" % price,\n parents={\"color_data\": data},\n code=\"\"\"%(this)s = FILTER %(color_data)s BY price < \"\"\" + str(price) + \"\"\";\n %(this)s = FOREACH (GROUP %(this)s BY color) {\n GENERATE group.color, AVG(%(this)s.price); \n }\"\"\"))\n\nplan = Plan(\"/tmp/reports\", color_reports)\nprint plan.pigfile\n\nplan_runner = PlanRunner(plan, hadoop)\nplan_runner.run_reports()\nplan_runner.save_reports(\"~/reports\")\nplan_runner.cleanup()\n\n\nThis will print the following pigfile and then run it and save the results to the local machine:\n\n\nLOAD 'car_colors.tsv' AS (make, model, price, color);\n5000_cars = FILTER car_color_data BY price < 5000;\n 5000_cars = FOREACH (GROUP 5000_cars BY color) {\n GENERATE group.color, AVG(5000_cars.price); \n }\n\nSTORE 5000_cars INTO '/tmp/reports/1244375262.84/5000_cars' USING PigStorage(',');\n10000_cars = FILTER car_color_data BY price < 10000;\n 10000_cars = FOREACH (GROUP 10000_cars BY color) {\n GENERATE group.color, AVG(10000_cars.price); \n }\n\nSTORE 10000_cars INTO '/tmp/reports/1244375262.84/10000_cars' USING PigStorage(',');\n15000_cars = FILTER car_color_data BY price < 15000;\n 15000_cars = FOREACH (GROUP 15000_cars BY color) {\n GENERATE group.color, AVG(15000_cars.price); \n }\n\nSTORE 15000_cars INTO '/tmp/reports/1244375262.84/15000_cars' USING PigStorage(',');\n20000_cars = FILTER car_color_data BY price < 20000;\n 20000_cars = FOREACH (GROUP 20000_cars BY color) {\n 
GENERATE group.color, AVG(20000_cars.price); \n }\n\nSTORE 20000_cars INTO '/tmp/reports/1244375262.84/20000_cars' USING PigStorage(',');\n150000_cars = FILTER car_color_data BY price < 150000;\n 150000_cars = FOREACH (GROUP 150000_cars BY color) {\n GENERATE group.color, AVG(150000_cars.price); \n }\n\nSTORE 150000_cars INTO '/tmp/reports/1244375262.84/150000_cars' USING PigStorage(',');\n\n\nWhile this example is trivial to do in raw Pig Latin (and can be written much better with a clever GROUP ... BY), even here it is much easier to alter the results for each report in the Python wrapper than in the resulting Pig Latin. Additionally, if I had been lazy and not renamed each alias, instead calling each report car_price, the module would helpfully rename the aliases to car_price, car_price_1, car_price_2, etc. Here, it would not make any difference, as we are immediately writing each data bag to the filesystem and not doing further processing.\n\nThis renaming property is why I use %(this)s in the report template in the loop. When the Report object is written to the pig file, the report's own name and the names of its parents are filled in by dictionary string interpolation. %(this)s is a special key used to pull in the actual name of the current report. There is no requirement to use any of this functionality: the report could just be the resulting code, without the loop or parents, but that makes it much harder to alter reports in the future.\n\nThe paths on the Hadoop filesystem are pretty horrible to access directly, which is why the PlanRunner can deal with saving them to the local directory. The folder name is just a timestamp, chosen to keep Pig from complaining that the folder already exists. The save_reports method will pull things to a user-specified directory from whatever horrible path is used in Hadoop, and the cleanup method on the PlanRunner will remove all traces of the Pig job from the HDFS.\n\nh2. 
Caching intermediate results\n\nThe following code generates 3 reports to save, all based on the same initial results. For the purpose of this example, assume the custom UDF takes hours to process the files and each final report takes a few minutes.\n\n\nfrom pigpy.hadoop import Hadoop\nfrom pigpy.reports import Report, Plan, PlanRunner\n\nhadoop = (hadoop initialization code)\n\nslow_report = Report(\"demographic_aggregate\",\n code=\"\"\"%(this)s = LOAD 'us_census.tsv' AS (age, wage, family_size, state);\n%(this)s = (horrible filtering/grouping custom UDF) AS (age, wage, family_size, state, count);\"\"\"\n)\n\nstate_reports = []\nfor state in [\"michigan\", \"ohio\", \"nevada\"]:\n state_reports.append(Report(state, parents={\"demo\": slow_report}, \n code=\"%%(this)s = FILTER %%(demo)s BY state == '%s';\" % state))\n\nplan = Plan(\"/tmp/reports\", state_reports)\nprint plan.pigfile\n\nplan_runner = PlanRunner(plan, hadoop)\nplan_runner.run_reports()\nplan_runner.save_reports(\"~/reports\")\nplan_runner.cleanup()\n\n\nThis code will generate and run the following Pig report:\n\n\ndemographic_aggregate = LOAD 'us_census.tsv' AS (age, wage, family_size, state);\ndemographic_aggregate = (horrible filtering/grouping custom UDF) AS (age, wage, family_size, state, count);\nmichigan = FILTER demographic_aggregate BY state == 'michigan';\n\nSTORE michigan INTO '/tmp/reports/1244588320.16/michigan' USING PigStorage(',');\nohio = FILTER demographic_aggregate BY state == 'ohio';\n\nSTORE ohio INTO '/tmp/reports/1244588320.16/ohio' USING PigStorage(',');\nnevada = FILTER demographic_aggregate BY state == 'nevada';\n\nSTORE nevada INTO '/tmp/reports/1244588320.16/nevada' USING PigStorage(',');\n\n\nWith the current version of Pig (0.2.0 at the time of this tutorial), this Pig job will re-run the demographic_aggregate for each of the 3 subsidiary reports. 
Hopefully, this will be corrected in a future release of Pig, but until then, pigpy can force caching of these results for you. If you set up a logging handler for pigpy.reports, you should see the following warning:\n\nWARNING:pigpy.reports:Report demographic_aggregate should be cached, but does not have cache_columns\n\nThis means pigpy has found a report it thinks should be cached, but cannot find appropriate hints to save and load it. If we modify the demographic_aggregate to this:\n\n\nslow_report = Report(\"demographic_aggregate\",\n code=\"\"\"%(this)s = LOAD 'us_census.tsv' AS (age, wage, family_size, state);\n%(this)s = (horrible filtering/grouping custom UDF) AS (age, wage, family_size, state, count);\"\"\",\n cache_columns=\"age, wage, family_size, state, count\"\n)\n\n\npigpy will add these two lines to the Pig report after the demographic_aggregate has run its horrible calculation:\n\n\nSTORE demographic_aggregate INTO '/tmp/reports/1244589140.23/demographic_aggregate' USING PigStorage(',');\ndemographic_aggregate = LOAD '/tmp/reports/1244589140.23/demographic_aggregate' USING PigStorage(',') AS (age, wage, family_size, state, count);\n\n\nPig will see this code and save the results for demographic_aggregate to file. When future aliases use demographic_aggregate, they will iterate back up through the report and find the load line and load the previous results rather than running the calculations again. If you use caching extensively, it is very important to make sure the cleanup method gets called, or your HDFS will fill up very quickly. Additionally, pigpy is not very smart about the caching, so it will force caching every time you supply cache_columns and there is more than one subsidiary report. 
This may cause additional map-reduce jobs to get run on your cluster.\n\nAt Zattoo, caching intermediate values has improved performance by up to 10x, depending on the number of subsidiary reports.", "description_content_type": null, "docs_url": null, "download_url": "UNKNOWN", "downloads": { "last_day": -1, "last_month": -1, "last_week": -1 }, "home_page": "http://pypi.python.org/pypi/pigpy", "keywords": null, "license": "MIT", "maintainer": null, "maintainer_email": null, "name": "pigpy", "package_url": "https://pypi.org/project/pigpy/", "platform": "UNKNOWN", "project_url": "https://pypi.org/project/pigpy/", "project_urls": { "Download": "UNKNOWN", "Homepage": "http://pypi.python.org/pypi/pigpy" }, "release_url": "https://pypi.org/project/pigpy/0.7/", "requires_dist": null, "requires_python": null, "summary": "a python tool to manage Pig reports", "version": "0.7" }, "last_serial": 796243, "releases": { "0.6": [ { "comment_text": "", "digests": { "md5": "effc1cbcbe04098816a744e40b827296", "sha256": "1be2afc66f184aa3d1957973f1a1c3fc5af92a05dea2e89ed7730ca2449c6d06" }, "downloads": -1, "filename": "pigpy-0.6.tar.gz", "has_sig": false, "md5_digest": "effc1cbcbe04098816a744e40b827296", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 11131, "upload_time": "2009-06-10T20:47:38", "url": "https://files.pythonhosted.org/packages/3e/cd/feba477d659f66d078175103626acff0a95efa7a0780c814d12fc4017fe2/pigpy-0.6.tar.gz" } ], "0.7": [ { "comment_text": "", "digests": { "md5": "7f59b7360d1b1cd5451ba5d65ac7a31f", "sha256": "89f91f07b95a2f84dda28159f8479209d50498d3aef7ff96f653345cbec09c96" }, "downloads": -1, "filename": "pigpy-0.7.tar.gz", "has_sig": false, "md5_digest": "7f59b7360d1b1cd5451ba5d65ac7a31f", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 13621, "upload_time": "2010-12-21T03:38:08", "url": 
"https://files.pythonhosted.org/packages/a9/d3/4ab9de9bea5439b6279538738dab3d19f350f765627770b4ec05fac0ec2a/pigpy-0.7.tar.gz" } ] }, "urls": [ { "comment_text": "", "digests": { "md5": "7f59b7360d1b1cd5451ba5d65ac7a31f", "sha256": "89f91f07b95a2f84dda28159f8479209d50498d3aef7ff96f653345cbec09c96" }, "downloads": -1, "filename": "pigpy-0.7.tar.gz", "has_sig": false, "md5_digest": "7f59b7360d1b1cd5451ba5d65ac7a31f", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 13621, "upload_time": "2010-12-21T03:38:08", "url": "https://files.pythonhosted.org/packages/a9/d3/4ab9de9bea5439b6279538738dab3d19f350f765627770b4ec05fac0ec2a/pigpy-0.7.tar.gz" } ] }