{ "info": { "author": "Kyle Wilcox", "author_email": "kyle@axds.co", "bugtrack_url": null, "classifiers": [ "Development Status :: 4 - Beta", "Intended Audience :: Developers", "Intended Audience :: Science/Research", "License :: OSI Approved :: MIT License", "Operating System :: POSIX :: Linux", "Programming Language :: Python", "Topic :: Scientific/Engineering" ], "description": "## dbsink\n\nRead from a kafka topic and sink to a database table, one row per message.\n\nThis is not unlike the Kafka Connect JdbcConnector. This project has a much lower bar of entry and doesn't require diving into the Kafka Connect ecosystem. I wrote the equivilent to this project using a custom JdbcConnector and it was getting out of control and was basically un-testable. So here we are.\n\nYou can choose to unpack the data as `avro`, `msgpack` or the default `json`. `avro` requires an additional `registry` parameter.\n\nDocker images: https://hub.docker.com/r/axiom/dbsink/builds\n\n## WHY?\n\nI needed to read from well-defined kafka topics and store the results in a database table so collaborators could interact with the data in a more familiar way.\n\nIt is also a very convienent and easy to setup PostgREST on top of the resulting tables to get a quick read-only REST API on top of the tabled messages.\n\n## Mapping messages to tables\n\nYou can define custom mappings between messages and tables using a python class. You may register your custom mappings with the `dbsink.maps` entrypoint to have them available to `dbsink` at run-time.\n\n```python\nentry_points = {\n 'dbsink.maps': [\n 'YourCustomMap = you.custom.map.module:CustomMapClass',\n # ...\n ]\n}\n```\n\nCustom mapping classes should inherit from the `BaseMap` class in `dbsink` and override the following functions as needed:\n\n* `upsert_constraint_name` - Name of the constraint to use for upserting data. Set to to `None` to disable upserting. Use this class property when creating the upsert constraint on your table (see example below).\n\n* `unique_index_name` - Unique index name based on the table name. Use this if defining a single unique index on your table.\n\n* `sequence_name` - Unique sequence name based on the table name. Use this if defining a single sequence column on your table.\n\n* `_check_key` - Checks for validity of a message's `key` before trying to sink. Return `True` if valid and raise an error if not.\n\n* `_check_value` - Checks for validity of a message's `value` before trying to sink. Return `True` if valid and raise an error if not.\n\n* `schema` - A list of SQLAlchmy [Column](https://docs.sqlalchemy.org/en/13/core/metadata.html#sqlalchemy.schema.Column), [Index](https://docs.sqlalchemy.org/en/13/core/constraints.html?highlight=index#sqlalchemy.schema.Index), and [Constraint](https://docs.sqlalchemy.org/en/13/core/constraints.html?highlight=constraint#sqlalchemy.schema.Constraint) schema definitions to use in table creation and updating. This fully describes your table's schema.\n\n* `message_to_values` - A function accepting `key` and `value` arguments and returning a tuple `key, dict` where the dict is the `values` to pass to SQLAlchemy's `insert().values` method. The `value` argument to this function will already be unpacked if `avro` or `msgpack` packing was specified.\n\n ```python\n insert(table).values(\n # dict_returned_ends_up_here\n )\n ```\n\n#### Example\n\nA simple example is the `StringMap` mapping included with `dbsink`\n\n```python\nfrom datetime import datetime\n\nimport pytz\nimport sqlalchemy as sql\nimport simplejson as json\n\nfrom dbsink.maps import BaseMap\n\n\nclass StringMap(BaseMap):\n\n @property\n def upsert_constraint_name(self):\n return None # Ignore upserts\n\n def _check_key(self, key):\n return True # All keys are valid\n\n def _check_value(self, value):\n # Make sure values are JSON parsable\n _ = json.loads(json.dumps(value, ignore_nan=True))\n return True\n\n @property\n def schema(self):\n return [\n sql.Column('id', sql.Integer, sql.Sequence(self.sequence_name), primary_key=True),\n sql.Column('sinked', sql.DateTime(timezone=True), index=True),\n sql.Column('key', sql.String, default='', index=True),\n sql.Column('payload', sql.String)\n ]\n\n def message_to_values(self, key, value):\n # Raises if invalid. This calls `._check_key` and `._check_value`\n self.check(key, value)\n\n values = {\n 'sinked': datetime.utcnow().replace(tzinfo=pytz.utc).isoformat(),\n 'key': key,\n 'payload': json.dumps(value),\n }\n\n return key, values\n```\n\n#### Advanced Example\n\nThere are no restrictions on table schemas or how you map the message data into the schema. Take this example below that uses a `PostGIS` column.\n\n\n```python\nfrom datetime import datetime\n\nimport pytz\nimport sqlalchemy as sql\nimport simplejson as json\nfrom shapely.geometry import shape\nfrom geoalchemy2.types import Geography\n\nfrom dbsink.maps import BaseMap\n\n\nclass NamedGenericGeography(BaseMap):\n\n def _check_key(self, key):\n return True # All keys are valid\n\n def _check_value(self, value):\n # Make sure values are JSON parsable\n _ = json.loads(json.dumps(value, ignore_nan=True))\n return True\n\n @property\n def schema(self):\n return [\n sql.Column('id', sql.Integer, sql.Sequence(self.sequence_name), primary_key=True),\n sql.Column('name', sql.String, default='', index=True),\n sql.Column('time', sql.DateTime(timezone=True), index=True),\n sql.Column('geom', Geography(srid=4326)),\n sql.Index(\n self.unique_index_name,\n 'name',\n 'time',\n unique=True,\n ),\n sql.UniqueConstraint(\n 'name',\n 'time',\n name=self.upsert_constraint_name\n )\n ]\n\n def message_to_values(self, key, value):\n \"\"\" Assumes a message format of\n {\n \"time\": 1000000000, # unix epoch\n \"name\": \"my cool thing\",\n \"geojson\": {\n \"geometry\": {\n \"type\": \"Polygon\",\n \"coordinates\": [ [ [ -118.532116484818843, 32.107425500492766 ], [ -118.457544847012443, 32.107425500492702 ], [ -118.457544847012443, 32.054517056541435 ], [ -118.532116484818872, 32.054517056541464 ], [ -118.532116484818843, 32.107425500492766 ] ] ]\n }\n }\n }\n \"\"\"\n # Raises if invalid\n self.check(key, value)\n\n # GeoJSON `geometry` attribute to WKT\n geometry = shape(value['geojson']['geometry']).wkt\n\n values = {\n 'name': value['name']\n 'time': datetime.fromtimestamp(value['time'], pytz.utc).isoformat()\n 'geom': geometry\n }\n\n return key, values\n```\n\n\n\n## Configuration\n\nThis program uses [`Click`](https://click.palletsprojects.com/) for the CLI interface. For all options please use the `help`:\n\n```sh\n$ dbsink --help\n```\n\n#### Environmental Variables\n\nAll configuration options can be specified with environmental variables using the pattern `DBSINK_[argument_name]=[value]`. For more information see [the click documentation](https://click.palletsprojects.com/en/7.x/options/?highlight=auto_envvar_prefix#values-from-environment-variables).\n\n```bash\nDBSINK_TOPIC=\"topic-to-listen-to\" \\\nDBSINK_LOOKUP=\"StringMap\" \\\nDBSINK_TABLE=\"MyCoolTable\" \\\nDBSINK_CONSUMER=\"myconsumer\" \\\nDBSINK_PACKING=\"msgpack\" \\\nDBSINK_OFFSET=\"earlist\" \\\nDBSINK_DROP=\"true\" \\\nDBSINK_VERBOSE=\"1\" \\\n dbsink\n```\n\n## Testing\n\nYou can run the tests using `pytest`. To run the integration tests, start a database with `docker run -p 30300:5432 --name dbsink-int-testing-db -e POSTGRES_USER=sink -e POSTGRES_PASSWORD=sink -e POSTGRES_DB=sink -d mdillon/postgis:11` and run `pytest -m integration`\n\n\n", "description_content_type": "text/markdown", "docs_url": null, "download_url": "", "downloads": { "last_day": -1, "last_month": -1, "last_week": -1 }, "home_page": "https://github.com/axiom-data-science/dbsink", "keywords": "", "license": "MIT", "maintainer": "", "maintainer_email": "", "name": "dbsink", "package_url": "https://pypi.org/project/dbsink/", "platform": "", "project_url": "https://pypi.org/project/dbsink/", "project_urls": { "Homepage": "https://github.com/axiom-data-science/dbsink" }, "release_url": "https://pypi.org/project/dbsink/2.0.0/", "requires_dist": [ "click", "easyavro (>=2.5.0)", "geoalchemy2", "msgpack-python", "psycopg2", "python-dateutil", "pytz", "shapely", "simplejson", "sqlalchemy" ], "requires_python": "", "summary": "Sink kafka messages to a database table", "version": "2.0.0" }, "last_serial": 5822063, "releases": { "2.0.0": [ { "comment_text": "", "digests": { "md5": "6fef984ea47cce5b3ca160b185e5f548", "sha256": "779cd1e17af7e5f673ccfb5478d0d9ead81907c822d4cb6644c935725d03cda9" }, "downloads": -1, "filename": "dbsink-2.0.0-py3-none-any.whl", "has_sig": false, "md5_digest": "6fef984ea47cce5b3ca160b185e5f548", "packagetype": "bdist_wheel", "python_version": "py3", "requires_python": null, "size": 14564, "upload_time": "2019-09-12T19:07:52", "url": "https://files.pythonhosted.org/packages/eb/92/309a08fab1c29376cf1857b5674c304e2cacaeaabf665bd75d42a9ef12cf/dbsink-2.0.0-py3-none-any.whl" }, { "comment_text": "", "digests": { "md5": "b13e6872f6ac180a826975a0c808a2c3", "sha256": "2a81b330b28d011384ac85ed5ec965af533061d20d6a4a764ad583f6078911a3" }, "downloads": -1, "filename": "dbsink-2.0.0.tar.gz", "has_sig": false, "md5_digest": "b13e6872f6ac180a826975a0c808a2c3", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 15516, "upload_time": "2019-09-12T19:07:54", "url": "https://files.pythonhosted.org/packages/f1/00/682ca831049eca21550d929cef76ce52194b5cfd49976ea16e3c4a5b3018/dbsink-2.0.0.tar.gz" } ] }, "urls": [ { "comment_text": "", "digests": { "md5": "6fef984ea47cce5b3ca160b185e5f548", "sha256": "779cd1e17af7e5f673ccfb5478d0d9ead81907c822d4cb6644c935725d03cda9" }, "downloads": -1, "filename": "dbsink-2.0.0-py3-none-any.whl", "has_sig": false, "md5_digest": "6fef984ea47cce5b3ca160b185e5f548", "packagetype": "bdist_wheel", "python_version": "py3", "requires_python": null, "size": 14564, "upload_time": "2019-09-12T19:07:52", "url": "https://files.pythonhosted.org/packages/eb/92/309a08fab1c29376cf1857b5674c304e2cacaeaabf665bd75d42a9ef12cf/dbsink-2.0.0-py3-none-any.whl" }, { "comment_text": "", "digests": { "md5": "b13e6872f6ac180a826975a0c808a2c3", "sha256": "2a81b330b28d011384ac85ed5ec965af533061d20d6a4a764ad583f6078911a3" }, "downloads": -1, "filename": "dbsink-2.0.0.tar.gz", "has_sig": false, "md5_digest": "b13e6872f6ac180a826975a0c808a2c3", "packagetype": "sdist", "python_version": "source", "requires_python": null, "size": 15516, "upload_time": "2019-09-12T19:07:54", "url": "https://files.pythonhosted.org/packages/f1/00/682ca831049eca21550d929cef76ce52194b5cfd49976ea16e3c4a5b3018/dbsink-2.0.0.tar.gz" } ] }