PKjQO.mdatarade/__init__.py"""This library provides tools to allow users to describe data pipelines in yaml format.""" __version__ = '0.1.2' name = 'datarade' PKjQO,`mm*datarade/abstract_repositories/__init__.py""" This module contains all of the abstract repositories for the application. It should get called by the service layer to do work and the repository/orm layer to implement concrete versions of these repositories to save state at runtime. """ from .datasets import AbstractDatasetRepository from .dataset_containers import AbstractDatasetContainerRepository PKjQOK^4datarade/abstract_repositories/dataset_containers.pyimport abc from typing import TYPE_CHECKING if TYPE_CHECKING: from datarade.domain import models class AbstractDatasetContainerRepository(abc.ABC): """ This repository contains methods to get dataset containers and add dataset containers. """ def __init__(self): self.seen = set() def get(self, dataset_container_id: str) -> 'models.DatasetContainer': obj = self._get(dataset_container_id=dataset_container_id) if obj: self.seen.add(obj) return obj def add(self, dataset_container: 'models.DatasetContainer'): self._add(dataset_container) self.seen.add(dataset_container) @abc.abstractmethod def _get(self, dataset_container_id: str) -> 'models.DatasetContainer': raise NotImplementedError @abc.abstractmethod def _add(self, dataset_container: 'models.DatasetContainer'): raise NotImplementedError PKjQO>*datarade/abstract_repositories/datasets.pyimport abc from typing import Set, TYPE_CHECKING if TYPE_CHECKING: from datarade.domain import models class AbstractDatasetRepository(abc.ABC): """ This repository contains methods to get datasets and add datasets. At runtime, there is no way to add datasets to the concrete repository. It was added here to facilitate testing with the fake repository. """ def __init__(self): self.seen: 'Set[models.Dataset]' = set() def get(self, dataset_name: str, dataset_repository_url: str, dataset_catalog: str, username: str = None, password: str = None) -> 'models.Dataset': d = self._get(dataset_name=dataset_name, dataset_repository_url=dataset_repository_url, dataset_catalog=dataset_catalog, username=username, password=password) if d: self.seen.add(d) return d def add(self, dataset: 'models.Dataset'): self._add(dataset) self.seen.add(dataset) @abc.abstractmethod def _get(self, dataset_name: str, dataset_repository_url: str, dataset_catalog: str, username: str = None, password: str = None) -> 'models.Dataset': raise NotImplementedError @abc.abstractmethod def _add(self, dataset: 'models.Dataset'): raise NotImplementedError PKjQOdatarade/domain/__init__.pyPKjQOjv.!!datarade/domain/commands.py""" This module contains the domain commands that occur throughout processing requests. """ from dataclasses import dataclass, field from typing import List, TYPE_CHECKING if TYPE_CHECKING: from datarade.domain import models class Command: pass @dataclass(frozen=True) class CreateDatasetContainer(Command): """ This command results from the service layer or api layer requesting a new dataset container. """ dataset_container_id: str dataset_repository_url: str dataset_catalog: str driver: str database_name: str host: str port: int = None schema_name: str = None @dataclass(frozen=True) class CreateDataset(Command): """ This command results from the service layer or api layer associating a dataset with a dataset container. """ dataset_container_id: str dataset_name: str username: str = field(default=None, repr=False) password: str = field(default=None, repr=False) @dataclass(frozen=True) class RefreshDataset(Command): """ This command results from the service layer or api layer requesting a data reload for a dataset into a dataset container. """ dataset_container_id: str dataset_name: str username: str = field(default=None, repr=False) password: str = field(default=None, repr=False) @dataclass(frozen=True) class WriteDatasetFromDatabaseToDatabase(Command): """ This command results from the domain layer processing a RefreshDataset request. """ source_database: 'models.Database' target_database: 'models.Database' dataset: 'models.Dataset' table_name: str fields: 'List[models.Field]' username: str = field(default=None, repr=False) password: str = field(default=None, repr=False) PKjQOq*)datarade/domain/events.py""" This module contains the domain events that occur throughout processing requests. """ from dataclasses import dataclass class Event: pass @dataclass class DatasetRequested(Event): """ This event captures the fact that a dataset was requests from a dataset catalog. It can be used for things like logging access and dataset popularity/frequency. """ dataset_name: str dataset_repository_url: str dataset_catalog: str PKjQO_datarade/domain/exceptions.py""" This module contains exceptions for the application. """ class DatasetDoesNotExist(Exception): """ This exception can occur when a dataset is requested to be reloaded, but it isn't already associated with the dataset container. """ pass class DatasetAlreadyExists(Exception): """ This exception can occur when a dataset is added to a dataset container, but the dataset container already has said dataset. """ pass PKjQO~datarade/domain/models.py""" This module contains all domain models for this application. There are currently four models: Field: This can be thought of as a field in a table or view. Database: This can be thought of as a collection of tables. For MS SQL Server, this goes down to the schema level, which makes the name a bit misleading. For other database management systems, it would be very similar to a database. Dataset: This can be thought of as a view. It's a definition of a table of data without the actual data. Dataset Container: This is the combination of a Database object and a Dataset Catalog. It establishes a connection between the two to facilitate the movement of data defined in the Dataset Catalog into the Database. """ from dataclasses import dataclass from typing import List, Set from datarade.domain import commands, exceptions @dataclass(frozen=True) class Field: """ This object represents a column in the dataset Args: name: name of the field type: field type, one of: [Boolean, Date, DateTime, Float, Integer, Numeric, String, Text, Time] description: non-functional, short description of the field, can include notes about what the field is or how it's populated """ name: str type: str description: str = None def __iter__(self): yield 'name', self.name yield 'type', self.type yield 'description', self.description @dataclass(frozen=True) class Database: """ This object represents a database Args: driver: the type of database, currently only mssql is supported database_name: the name of the database host: the name of the server port: the port for the server schema_name: the name of the schema for a MS SQL Server database """ driver: str database_name: str host: str port: int = None schema_name: str = None def __iter__(self): yield 'driver', self.driver yield 'database_name', self.database_name yield 'host', self.host yield 'port', self.port yield 'schema_name', self.schema_name class Dataset: """ This object represents a collection of dataset metadata Args: name: a unique identifier provided by the user definition: the sql defining the dataset fields: a list of field objects in the dataset description: non-functional, short description of the dataset, can include notes about what the dataset is or how it's populated database: a database object that contains the data for the dataset """ def __init__(self, name: str, definition: str, fields: List[Field], description: str = None, database: Database = None): self.name = name self.definition = definition self.fields = fields self.description = description self.database = database self.events = [] def __iter__(self): yield 'name', self.name yield 'definition', self.definition yield 'fields', [dict(field) for field in self.fields] yield 'description', self.description yield 'database', dict(self.database) class DatasetContainer: """ This object represents a collection of dataset metadata Args: dataset_container_id: a unique identifier provided to the user dataset_repository_url: the url to the git repo containing the dataset catalog dataset_catalog: the name of the directory within the git repo containing the particular catalog driver: the type of database, currently only mssql is supported database_name: the name of the database host: the name of the server port: the port for the server schema_name: the name of the schema for a MS SQL Server database """ def __init__(self, dataset_container_id: str, dataset_repository_url: str, dataset_catalog: str, driver: str, database_name: str, host: str, port: int = None, schema_name: str = None): self.dataset_container_id = dataset_container_id self.dataset_repository_url = dataset_repository_url self.dataset_catalog = dataset_catalog self.database = Database(driver=driver, database_name=database_name, host=host, port=port, schema_name=schema_name) self._datasets: Set[Dataset] = set() self.events = [] def add_dataset(self, dataset: Dataset): """ Adds a dataset to the collection of datasets in this dataset container. It will raise an error if the dataset already exists in this container. Args: dataset: the dataset object to be added to the container """ existing_dataset = next((d for d in self._datasets if d.name == dataset.name), None) if existing_dataset: raise exceptions.DatasetAlreadyExists self._datasets.add(dataset) def refresh_dataset(self, dataset_name: str, username: str = None, password: str = None): """ Reloads the supplied dataset using the provided credentials. If no credentials are supplied, Windows AD is used for the account running this script. The dataset needs to already be associated with this container. An error is raised if this is not the case. Args: dataset_name: the name of the dataset to be relaaded username: the username to used for authentication on both the source and target databases password: the password to used for authentication on both the source and target databases """ dataset = next((d for d in self._datasets if d.name == dataset_name), None) if dataset is None: raise exceptions.DatasetDoesNotExist source_database = dataset.database target_database = self.database cmd = commands.WriteDatasetFromDatabaseToDatabase(source_database=source_database, target_database=target_database, dataset=dataset, fields=dataset.fields, table_name=self.get_full_table_name(dataset.name), username=username, password=password) self.events.append(cmd) def get_full_table_name(self, table_name: str) -> str: """ This is a helper method that is needed for MS SQL Server databases that have schemas. Args: table_name: the one part name of the table Returns: the three part name of the table, if the schema is present (MS SQL Server) """ if self.database.schema_name is not None: return f'{self.database.database_name}.{self.database.schema_name}.{table_name}' else: return table_name PKjQO߮Zdatarade/orm/__init__.pyfrom .git import GitSession PKjQO`datarade/orm/git.py""" This is the ORM for obtaining datasets out of a git repo. It combines marshmallow with a home-grown, bare bones git 'session'. """ from io import BytesIO import marshmallow as ma import requests import yaml from requests_ntlm import HttpNtlmAuth from datarade.domain import models class FieldSchema(ma.Schema): """ A marshmallow schema corresponding to a datarade Field object """ name = ma.fields.Str(required=True) description = ma.fields.Str(required=False) type = ma.fields.Str(required=True) @ma.post_load() def post_load(self, data: dict, **kwargs) -> models.Field: return models.Field(**data) class DatabaseSchema(ma.Schema): """ A marshmallow schema corresponding to a datarade Database object """ driver = ma.fields.Str(required=True) database_name = ma.fields.Str(required=True) host = ma.fields.Str(required=True) port = ma.fields.Int(required=False) schema_name = ma.fields.Str(required=False) @ma.post_load() def post_load(self, data: dict, **kwargs) -> models.Database: return models.Database(**data) class DatasetSchema(ma.Schema): """ A marshmallow schema corresponding to a datarade Dataset object """ name = ma.fields.Str(required=True) description = ma.fields.Str(required=False) fit_for_use = ma.fields.Str(required=False) definition = ma.fields.Str(required=True) fields = ma.fields.Nested(FieldSchema, required=True, many=True) database = ma.fields.Nested(DatabaseSchema, required=False) @ma.post_load() def post_load(self, data: dict, **kwargs) -> models.Dataset: return models.Dataset(**data) class GitSession: """ This is a wrapper around a git repo that allows file access. It can be handed a github url or git-tfs url. Args: repository: the url to the git repo catalog: the name of the directory containing the target files (analogous to a table, with datasets being the records) username: the username for the repo password: the password for the repo .. Note: This was intended to be indifferent to the rest of this package, in the sense that it wouldn't 'know' about any datarade concepts. This would be similar to how a sqlalchemy session can be defined prior to associating model classes with sqlalchemy tables. However, this turns out not to be the case, as is evident with the get_dataset() method and catalog parameter. """ def __init__(self, repository: str, catalog: str, username: str = None, password: str = None): self.repository = repository self.catalog = catalog self.username = username self.password = password def get_file(self, file_path: str) -> BytesIO: """ This will return a specific file within the catalog. Args: file_path: the relative path to the file within // Returns: a file as a BytesIO object """ if 'github' in self.repository: return self._get_github_file(file_path) if '/tfs/' in self.repository: return self._get_git_tfs_file(file_path) def get_dataset(self, dataset_name: str) -> models.Dataset: """ This will return a dataset from the dataset catalog. Args: dataset_name: the name of the dataset Returns: a datarade Dataset object """ return get_dataset(self, dataset_name) def _get_github_file(self, file_path: str) -> BytesIO: """ This is the github version of the get_file() function. Args: file_path: the relative path to the file within // Returns: a file as a BytesIO object """ url = f'{self.repository}/{self.catalog}/{file_path}' response = requests.get(url=url) return response.content def _get_git_tfs_file(self, file_path: str) -> BytesIO: """ This is the git-tfs version of the get_file() function. Args: file_path: the relative path to the file within // Returns: a file as a BytesIO object """ url = f'{self.repository}/items/{self.catalog}/{file_path}' if self.username and self.password: auth = HttpNtlmAuth(username=self.username, password=self.password) response = requests.get(url=url, auth=auth) else: response = requests.get(url=url) return response.content def get_dataset(session: 'GitSession', dataset_name: str) -> models.Dataset: """ This method returns a Dataset object given a name. It collects all of the required files from the repository, puts the contents in a configuration dictionary, passes that dictionary up to the abstract repository for validation, and returns the resulting Dataset instance. Args: session: the session for the catalog containing the dataset dataset_name: the name of the dataset, which should be the name of the directory containing the files Returns: a Dataset object """ config_yaml = session.get_file(f'{dataset_name}/config.yaml') config_dict = yaml.safe_load(config_yaml) definition = session.get_file(f'{dataset_name}/definition.sql') config_dict['definition'] = definition dataset_schema = DatasetSchema() return dataset_schema.load(config_dict) PKjQO(V%M!datarade/repositories/__init__.py""" This module contains concrete implementations of abstract repositories. """ from .datasets_git import GitDatasetRepository from .dataset_containers_stateless import StatelessDatasetContainerRepository PKjQOv+5datarade/repositories/dataset_containers_stateless.pyfrom typing import TYPE_CHECKING from datarade import abstract_repositories if TYPE_CHECKING: from datarade.domain import models class StatelessDatasetContainerRepository(abstract_repositories.AbstractDatasetContainerRepository): """ This repository is an in-memory collection of dataset containers. It is used when directly importing this package. Saving dataset containers is not necessary when using this package as a library, so it functions very much like a fake repository. That is why there is no ORM hooked up to it. """ def __init__(self): super().__init__() self._dataset_containers = set() def _get(self, dataset_container_id: str) -> 'models.DatasetContainer': return next(d for d in self._dataset_containers if d.dataset_container_id == dataset_container_id) def _add(self, dataset_container: 'models.DatasetContainer'): self._dataset_containers.add(dataset_container) PKjQO%datarade/repositories/datasets_git.pyfrom typing import TYPE_CHECKING from datarade import abstract_repositories, orm if TYPE_CHECKING: from datarade.domain import models class GitDatasetRepository(abstract_repositories.AbstractDatasetRepository): """ This repository allows the user to store their datasets in a git-compliant source control repository. The structure should look like this: .. code-block:: none repository | |--- catalog | |--- my_dataset | |--- config.yaml |--- definition.sql |--- my_other_dataset | |--- config.yaml |--- definition.sql This repository can be connected to a repo on github or to a git-tfs on-prem repo. """ def __init__(self): super().__init__() def _get(self, dataset_name: str, dataset_repository_url: str, dataset_catalog: str, username: str = None, password: str = None) -> 'models.Dataset': session = orm.GitSession(repository=dataset_repository_url, catalog=dataset_catalog, username=username, password=password) return session.get_dataset(dataset_name=dataset_name) def _add(self, dataset: 'models.Dataset'): pass PKjQOdatarade/services/__init__.pyPKjQODp~-datarade/services/dataset_catalog/__init__.py""" This is the Dataset Catalog service. It provides a wrapper around a dataset catalog that is maintained in a git-compliant source control service (github or git-tfs). This service is read-only. """PKjQO/(datarade/services/dataset_catalog/api.py""" This is the datarade api to be used when this package is used as a python library. It currently has read functionality. """ from typing import TYPE_CHECKING from datarade.services.dataset_catalog import services, message_bus, unit_of_work if TYPE_CHECKING: from datarade.domain import models uow = unit_of_work.GitUnitOfWork() bus = message_bus.MessageBus(uow=uow) uow.set_bus(bus=bus) def get_dataset(dataset_name: str, dataset_repository_url: str, dataset_catalog: str, username: str = None, password: str = None) -> 'models.Dataset': """ This will get the identified dataset from the supplied dataset catalog. If no credentials are supplied, it is assumed that the git repo is public to the machine (no auth is passed in the request). Args: dataset_name: the name of the dataset in the dataset catalog dataset_repository_url: the url for the git repo containing the dataset catalog dataset_catalog: the directory within the git repo containing the dataset catalog username: the username for the git repo password: the password for the git repo Returns: a datarade Dataset object """ return services.get_dataset(dataset_name=dataset_name, dataset_repository_url=dataset_repository_url, dataset_catalog=dataset_catalog, username=username, password=password, uow=bus.uow) PKjQO '0datarade/services/dataset_catalog/message_bus.py""" This is the message bus for the Dataset Catalog service. It is politely borrowed almost whole cloth from the book linked below. There are currently no events or commands hooked up to this bus, so it doesn't do anything at the moment. However, this is expected to change in the near future, so it has been setup ahead of that point in time. Original Source: https://github.com/python-leap/code/blob/master/src/allocation/messagebus.py """ import inspect import traceback from typing import Callable, Union, TYPE_CHECKING from datarade.domain import events if TYPE_CHECKING: from datarade.services.dataset_catalog import unit_of_work Message = Union[events.Event] class MessageBus: def __init__(self, uow: 'unit_of_work.AbstractUnitOfWork'): self.uow = uow self.dependencies = dict(uow=uow) def handle(self, message: Message): if isinstance(message, events.Event): self.handle_event(message) else: raise Exception(f'{message} was not an Event') def handle_event(self, event: events.Event): for handler in EVENT_HANDLERS[type(event)]: try: print('handling event', event, 'with handler', handler, flush=True) self.call_handler_with_dependencies(handler, event) except: print(f'Exception handling event {event}\n:{traceback.format_exc()}') continue def call_handler_with_dependencies(self, handler: Callable, message: Message): params = inspect.signature(handler).parameters deps = {name: dependency for name, dependency in self.dependencies.items() if name in params} handler(message, **deps) EVENT_HANDLERS = {} COMMAND_HANDLERS = {} PKjQOdw..-datarade/services/dataset_catalog/services.pyfrom typing import TYPE_CHECKING from datarade.domain import events if TYPE_CHECKING: from datarade.services.dataset_catalog import unit_of_work from datarade.domain import models def get_dataset(dataset_name: str, dataset_repository_url: str, dataset_catalog: str, username: str, password: str, uow: 'unit_of_work.AbstractUnitOfWork') -> 'models.Dataset': """ This will obtain the requested dataset and return it as an object. If username and password are not provided, it is assumed that the repo is public to the machine (no auth is passed into the request). Args: dataset_name: the name of the dataset in the dataset catalog dataset_repository_url: the url to the git repo containing the dataset catalog dataset_catalog: the directory within the git repo that contains the dataset username: the username to use for authentication password: the password to use for authentication uow: the unit of work to use to process the request Returns: a Dataset object """ with uow: dataset = uow.datasets.get(dataset_name=dataset_name, dataset_repository_url=dataset_repository_url, dataset_catalog=dataset_catalog, username=username, password=password) if dataset is not None: dataset.events.append(events.DatasetRequested(dataset_name=dataset_name, dataset_repository_url=dataset_repository_url, dataset_catalog=dataset_catalog)) return dataset PKjQODD1datarade/services/dataset_catalog/unit_of_work.py""" This is the unit of work for the Dataset Catalog service. It is politely borrowed almost whole cloth from the book linked below. Original Source: https://github.com/python-leap/code/blob/master/src/allocation/unit_of_work.py Since the Dataset Catalog service is read-only, there is currently no reason to implement commit or rollback. This will change in the future as things like logging and monitoring are added. """ import abc from typing import TYPE_CHECKING from datarade import repositories if TYPE_CHECKING: from datarade.services.dataset_catalog import message_bus from datarade import abstract_repositories class AbstractUnitOfWork(abc.ABC): _datasets = None bus = None def __enter__(self): return self def __exit__(self, *args): self.rollback() def commit(self): self._commit() self.publish_events() @abc.abstractmethod def rollback(self): raise NotImplementedError def set_bus(self, bus: 'message_bus.MessageBus'): self.bus = bus def init_repositories(self, datasets: 'abstract_repositories.AbstractDatasetRepository'): self._datasets = datasets @property def datasets(self) -> 'abstract_repositories.AbstractDatasetRepository': return self._datasets def publish_events(self): for dataset in self.datasets.seen: while dataset.events: event = dataset.events.pop(0) self.bus.handle(event) @abc.abstractmethod def _commit(self): raise NotImplementedError class GitUnitOfWork(AbstractUnitOfWork): def __init__(self): self.init_repositories(datasets=repositories.GitDatasetRepository()) def rollback(self): pass def _commit(self): pass PKjQO޴ 2datarade/services/dataset_subscription/__init__.py""" This is the Dataset Subscription service. It uses the Dataset Catalog service to obtain datasets and then allows the user to materialize those datasets into a provided dataset container. """PKjQOQc-datarade/services/dataset_subscription/api.py""" This is the datarade api to be used when this package is used as a python library. """ from datarade.services.dataset_subscription import unit_of_work, message_bus from datarade.domain import commands uow = unit_of_work.StatelessUnitOfWork() bus = message_bus.MessageBus(uow=uow) uow.set_bus(bus=bus) def register_dataset_container(dataset_container_id: str, dataset_repository_url: str, dataset_catalog: str, driver: str, database_name: str, host: str, port: int = None, schema_name: str = None) -> bool: """ Use this to register a dataset container in-memory, to be referenced later. It associates a dataset catalog with a database. Args: dataset_container_id: a user provided unique identifier dataset_repository_url: the url to the git repo containing the dataset catalog dataset_catalog: the directory within the git repo containing the datasets driver: the type of database (currently only MS SQL Server is supported) database_name: the name of the database host: the name of the server port: the port for the server schema_name: the name of the schema for a MS SQL Server database """ cmd = commands.CreateDatasetContainer(dataset_container_id=dataset_container_id, dataset_repository_url=dataset_repository_url, dataset_catalog=dataset_catalog, driver=driver, database_name=database_name, host=host, port=port, schema_name=schema_name) bus.handle(cmd) return True def add_dataset(dataset_container_id: str, dataset_name: str, dataset_username: str = None, dataset_password: str = None) -> bool: """ Use this to associate a particular dataset with a dataset container. It will obtain the dataset metadata from the dataset catalog and store it with the dataset container. Args: dataset_container_id: the unique identifier of the dataset container dataset_name: the name of the dataset in the dataset container's dataset catalog dataset_username: the username for the dataset catalog dataset_password: the password for the dataset catalog """ cmd = commands.CreateDataset(dataset_container_id=dataset_container_id, dataset_name=dataset_name, username=dataset_username, password=dataset_password) bus.handle(cmd) return True def refresh_dataset(dataset_container_id: str, dataset_name: str, dataset_container_username: str = None, dataset_container_password: str = None) -> bool: """ Use this to reload a particular dataset into a dataset container. The dataset must already be associated with the dataset container (via add_dataset()). Args: dataset_container_id: the unique identifier of the dataset container dataset_name: the name of the dataset in the dataset container's dataset catalog dataset_container_username: the username for both the source database and the target database dataset_container_password: the password for both the source database and the target database .. note: The same username and password are used for both the source database and the target database. This is done in an attempt to maintain data access traceability from source to target. In other words, we want to ensure that if an account was able to get data to the target, it's because it also had access to the source. This forces the user to explicitly grant access to the same account on both databases. """ cmd = commands.RefreshDataset(dataset_container_id=dataset_container_id, dataset_name=dataset_name, username=dataset_container_username, password=dataset_container_password) bus.handle(cmd) return True PKjQOØ2datarade/services/dataset_subscription/handlers.py""" This module contains all of the handlers for the Dataset Subscription service. It provides methods to associate datasets with dataset containers and materialize said datasets into dataset containers. """ from typing import TYPE_CHECKING from bcp import DataFile from sqlalchemy import Table from datarade.services.dataset_catalog.api import get_dataset from datarade.domain import models from datarade.services.dataset_subscription import utils if TYPE_CHECKING: from datarade.services.dataset_subscription import unit_of_work from datarade.domain import commands def create_dataset_container(cmd: 'commands.CreateDatasetContainer', uow: 'unit_of_work.AbstractUnitOfWork'): """ This will register a dataset container object in this application Args: cmd: parameters required to register a dataset container object uow: the implementation required to register the dataset container object """ with uow: new_dataset_container = models.DatasetContainer(dataset_container_id=cmd.dataset_container_id, dataset_repository_url=cmd.dataset_repository_url, dataset_catalog=cmd.dataset_catalog, driver=cmd.driver, database_name=cmd.database_name, host=cmd.host, port=cmd.port, schema_name=cmd.schema_name) uow.dataset_containers.add(new_dataset_container) uow.commit() def add_dataset(cmd: 'commands.CreateDataset', uow: 'unit_of_work.AbstractUnitOfWork'): """ This will associate a dataset with a dataset container Args: cmd: parameters required to associate a dataset to a dataset container uow: the implementation required to associate the dataset to the dataset container """ with uow: dataset_container = uow.dataset_containers.get(cmd.dataset_container_id) dataset = get_dataset(dataset_name=cmd.dataset_name, dataset_repository_url=dataset_container.dataset_repository_url, dataset_catalog=dataset_container.dataset_catalog, username=cmd.username, password=cmd.password) dataset_container.add_dataset(dataset) uow.commit() def refresh_dataset(cmd: 'commands.RefreshDataset', uow: 'unit_of_work.AbstractUnitOfWork'): """ This will trigger a refresh of a dataset in a dataset container Args: cmd: parameters required to refresh a dataset uow: the implementation required to refresh a dataset """ with uow: dataset_container = uow.dataset_containers.get(cmd.dataset_container_id) dataset_container.refresh_dataset(dataset_name=cmd.dataset_name, username=cmd.username, password=cmd.password) uow.commit() def write_dataset_from_database_to_database(cmd: 'commands.WriteDatasetFromDatabaseToDatabase', uow: 'unit_of_work.AbstractUnitOfWork'): """ This will execute a refresh of a dataset in a dataset container Args: cmd: parameters required to refresh a dataset uow: the implementation required to refresh a dataset """ with uow: metadata = utils.get_sqlalchemy_metadata(database=cmd.target_database, username=cmd.username, password=cmd.password) field_args = [utils.get_sqlalchemy_column(field) for field in cmd.fields] table = Table(cmd.table_name.split('.')[-1], metadata, extend_existing=True, *field_args) table.drop(checkfirst=True) table.create() data_file = DataFile(delimiter='|~|') dataset = cmd.dataset source_bcp = utils.get_bcp(database=cmd.source_database, username=cmd.username, password=cmd.password) source_bcp.dump(query=dataset.definition, output_file=data_file) target_bcp = utils.get_bcp(database=cmd.target_database, username=cmd.username, password=cmd.password) target_bcp.load(input_file=data_file, table=cmd.table_name) data_file.file.unlink() uow.commit() PKjQO9L} } 5datarade/services/dataset_subscription/message_bus.py""" This is the message bus for the Dataset Subscription service. It is politely borrowed almost whole cloth from the book linked below. Original Source: https://github.com/python-leap/code/blob/master/src/allocation/messagebus.py """ import inspect import traceback from typing import Callable, Union, TYPE_CHECKING from datarade.services.dataset_subscription import handlers from datarade.domain import commands, events if TYPE_CHECKING: from datarade.services.dataset_subscription import unit_of_work Message = Union[commands.Command, events.Event] class MessageBus: def __init__(self, uow: 'unit_of_work.AbstractUnitOfWork'): self.uow = uow self.dependencies = dict(uow=uow) def handle(self, message: Message): if isinstance(message, events.Event): self.handle_event(message) elif isinstance(message, commands.Command): self.handle_command(message) else: raise Exception(f'{m} was not an Event or Command') def handle_event(self, event: events.Event): for handler in EVENT_HANDLERS[type(event)]: try: print('handling event', event, 'with handler', handler, flush=True) self.call_handler_with_dependencies(handler, event) except: print(f'Exception handling event {event}\n:{traceback.format_exc()}') continue def handle_command(self, command: commands.Command): print('handling command', command, flush=True) try: handler = COMMAND_HANDLERS[type(command)] self.call_handler_with_dependencies(handler, command) except Exception as e: print(f'Exception handling command {command}: {e}') raise e def call_handler_with_dependencies(self, handler: Callable, message: Message): params = inspect.signature(handler).parameters deps = {name: dependency for name, dependency in self.dependencies.items() if name in params} handler(message, **deps) EVENT_HANDLERS = {} COMMAND_HANDLERS = { commands.CreateDatasetContainer: handlers.create_dataset_container, commands.CreateDataset: handlers.add_dataset, commands.RefreshDataset: handlers.refresh_dataset, commands.WriteDatasetFromDatabaseToDatabase: handlers.write_dataset_from_database_to_database, } PKjQO 6datarade/services/dataset_subscription/unit_of_work.py""" This is the unit of work for the Dataset Subscription service. It is politely borrowed almost whole cloth from the book linked below. Original Source: https://github.com/python-leap/code/blob/master/src/allocation/unit_of_work.py """ import abc from typing import TYPE_CHECKING from datarade import repositories if TYPE_CHECKING: from datarade.services.dataset_subscription import message_bus from datarade import abstract_repositories class AbstractUnitOfWork(abc.ABC): _dataset_containers = None bus = None def __enter__(self): return self def __exit__(self, *args): pass def commit(self): self._commit() self.publish_events() @abc.abstractmethod def rollback(self): raise NotImplementedError def set_bus(self, bus: 'message_bus.MessageBus'): self.bus = bus def init_repositories(self, dataset_containers: 'abstract_repositories.AbstractDatasetContainerRepository'): self._dataset_containers = dataset_containers @property def dataset_containers(self) -> 'abstract_repositories.AbstractDatasetContainerRepository': return self._dataset_containers def publish_events(self): for dataset_container in self.dataset_containers.seen: while dataset_container.events: event = dataset_container.events.pop(0) self.bus.handle(event) @abc.abstractmethod def _commit(self): raise NotImplementedError class StatelessUnitOfWork(AbstractUnitOfWork): def __init__(self): self.init_repositories(dataset_containers=repositories.StatelessDatasetContainerRepository()) def rollback(self): pass def _commit(self): pass PKjQOT||/datarade/services/dataset_subscription/utils.py""" This module contains utility functions that are used by the handlers, but which clutter up the readability of the handlers. It's possible that some of this could be refactored away at a future date so that it could be put back into the handlers, avoiding the need for a 'utils' file. Also, these could all be methods/properties on the appropriate datarade domain models. However, keeping them at the service layer abstracts away the methods used to work with these objects as those methods are not the concern of the domain model. """ from typing import TYPE_CHECKING from bcp import BCP, Connection from sqlalchemy import MetaData, create_engine, schema, types from sqlalchemy.engine.url import URL if TYPE_CHECKING: from datarade.domain import models def get_sqlalchemy_metadata(database: 'models.Database', username: str = None, password: str = None) -> MetaData: """ This will take a datarade Database object and credentials and return a sqlalchemy MetaData object. Args: database: the datarade Database object containing the attributes needed for a MetaData object username: the username for the database password: the password for the database Returns: a sqlalchemy MetaData object """ driver = database.driver if driver == 'mssql': driver = 'mssql+pymssql' url = URL(drivername=driver, host=database.host, port=database.port, database=database.database_name, username=username, password=password) engine = create_engine(url) if database.schema_name is not None: return MetaData(bind=engine, schema=database.schema_name) else: return MetaData(bind=engine) def get_sqlalchemy_column(field: 'models.Field') -> schema.Column: """ This will convert a datarade Field object into a sqlalchemay Column object. Args: field: a datarade Field object Returns: a sqlalchemy Column object """ if field.type == 'Boolean': return schema.Column(field.name, types.Boolean, comment=field.description) elif field.type == 'Date': return schema.Column(field.name, types.Date, comment=field.description) elif field.type == 'DateTime': return schema.Column(field.name, types.DateTime, comment=field.description) elif field.type == 'Float': return schema.Column(field.name, types.Float, comment=field.description) elif field.type == 'Integer': return schema.Column(field.name, types.Integer, comment=field.description) elif field.type == 'Numeric': return schema.Column(field.name, types.Numeric(18, 2), comment=field.description) elif field.type == 'String': return schema.Column(field.name, types.String, comment=field.description) elif field.type == 'Text': return schema.Column(field.name, types.Text, comment=field.description) elif field.type == 'Time': return schema.Column(field.name, types.Time, comment=field.description) def get_bcp(database: 'models.Database', username: str = None, password: str = None) -> BCP: """ This will take a datarade Database object and credentials and return a BCP object. Args: database: a datarade Database object username: the username for the database password: the password for the database Returns: a BCP object """ conn = Connection(host=f'{database.host},{database.port}', driver=database.driver, username=username, password=password) return BCP(conn) PKjQOʨj-- datarade-0.1.2.dist-info/LICENSE Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION 1. Definitions. "License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document. "Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License. "Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity. "You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License. "Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files. "Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types. "Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below). "Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof. "Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution." "Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work. 2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form. 3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed. 4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions: (a) You must give any other recipients of the Work or Derivative Works a copy of this License; and (b) You must cause any modified files to carry prominent notices stating that You changed the files; and (c) You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and (d) If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License. 5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions. 6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file. 7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License. 8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages. 9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS APPENDIX: How to apply the Apache License to your work. To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives. Copyright 2019 Michael R. Alfare Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. PK!HPOdatarade-0.1.2.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,szd&Y)r$[)T&UrPK!H'V !datarade-0.1.2.dist-info/METADATAWr6}W`ŮHEdT+S7N$NG"!@ gAR[t4Iz /uJ>\r܊T݋xUU9SjnJǽ9rz+m.j.]n*!J^" m c.ddd7 g!y׷TJ&Ωp}&odMY~兯L(ʎh _\秆)OH2ɯ¯o'~iҊP$B+$qajeA2Tb|U%r;JQ ]u(!;SO8* rD[nWл%\V4CCAC߼S[O~'y#xLh2+3O a"O^[ FF!=V:Y"@0$=8Gc5+G'YS,ɳ*u}8! WY<$CG>~Lٵzl=:QoB/ >˖^tɝ3dQEQD.nQr؎'' B Vv L&4hIK)VM 1 8P"zEymA Wp&ZhpXV.eyKm@D;B݆." Sfݬm-ؑhFex)G c.yg &6o$0D6_8H[z@${OȊ ]g=VtYSіUuw*)گ}GX[J4վguŌ /@eX{SoO1=34؅oz:j~wUL)qsnꅡD5KmV.fkIBJL%wp״ |$xZT߭?Ei8h<>* &f{L Va}R-w&{{s(`%iH^GRȞQta餢%wچ_]}q7 _d=VPrYr扱PK!HZ datarade-0.1.2.dist-info/RECORDIHF-`/9 $v]؄#ODh1'DW_Va& (b .!FRLd佅""Qqyz9^O@vL7ߒ7V ccЧE:2̻3)Ώ!vU"]&szۃ_ ?ĠâMu+ yD2M4yi!z60wV 'A;5DƢ Ϗ#"JoEy_Uwޕș!XR¤M'=5<{bN !1hMY ,c8Or~ LȥMϰSbe tNw>mz:+g#.%ɔvSc"%N3ԟ$s`rDDZ}JDo?,w@e#L.]it(K$#xk0EC-W3ALLhLŦMskǭ΍m-{Z~[:^y8N.:03v8#vjY RLS $*!/-RJw}* o8GqفFu27s96T%L"ѷc ;YN-\[et0y=~n\p]|gWQ׻9tmWc.P˺x  %`ũxHquul]>[+Iw{R #\H[h0mu+̉.9rOG= IEƭ]BI+ƕnόN(OLFu"R3ϣ^*|datarade/abstract_repositories/datasets.pyPKjQOI datarade/domain/__init__.pyPKjQOjv.!! datarade/domain/commands.pyPKjQOq*)datarade/domain/events.pyPKjQO_datarade/domain/exceptions.pyPKjQO~ datarade/domain/models.pyPKjQO߮Z5datarade/orm/__init__.pyPKjQO`Z5datarade/orm/git.pyPKjQO(V%M!Kdatarade/repositories/__init__.pyPKjQOv+5Ldatarade/repositories/dataset_containers_stateless.pyPKjQO%Pdatarade/repositories/datasets_git.pyPKjQOVdatarade/services/__init__.pyPKjQODp~-Vdatarade/services/dataset_catalog/__init__.pyPKjQO/(Wdatarade/services/dataset_catalog/api.pyPKjQO '0<^datarade/services/dataset_catalog/message_bus.pyPKjQOdw..-edatarade/services/dataset_catalog/services.pyPKjQODD1mdatarade/services/dataset_catalog/unit_of_work.pyPKjQO޴ 2tdatarade/services/dataset_subscription/__init__.pyPKjQOQc-udatarade/services/dataset_subscription/api.pyPKjQOØ2datarade/services/dataset_subscription/handlers.pyPKjQO9L} } 5datarade/services/dataset_subscription/message_bus.pyPKjQO 6Ҥdatarade/services/dataset_subscription/unit_of_work.pyPKjQOT||/<datarade/services/dataset_subscription/utils.pyPKjQOʨj-- datarade-0.1.2.dist-info/LICENSEPK!HPO_datarade-0.1.2.dist-info/WHEELPK!H'V !datarade-0.1.2.dist-info/METADATAPK!HZ datarade-0.1.2.dist-info/RECORDPK