PK("PK66sqlnomad/__init__.py# -*- coding: utf-8 -*- """Monadic SQL wrapper""" __version__ = "0.1.01" __author__ = "Mark Stefanovic" __copyright__ = "Mark Stefanovic" __license__ = "MIT" __all__ = [ "SqlDialect", "SqlField", "SqlFilter", "aggregate", "find", "join", "order_by", "Subquery", "Query", "StorageTable", "StorageField" ] from .custom_types import ( AggregationMethod, FilterOperator, SortOrder, SqlDialect ) from .utils import standardize_string from .sql_field import SqlField from .sql_filter import SqlFilter from .query import Query from .storage_field import StorageField from .storage_table import StorageTable from .subquery import Subquery from .query_transformations import ( aggregate, find, join, order_by ) PK;OK(sqlnomad/custom_types.py# coding=utf-8 from enum import Enum from typing import NewType, Union, Optional, Sequence, Dict import datetime FieldDisplayName = NewType("FieldDisplayName", str) FieldStorageName = NewType("FieldStorageName", str) SQL = NewType("SQL", str) SqlValue = Optional[Union[bool, str, int, float, datetime.date, datetime.datetime]] TableDisplayName = NewType("TableDisplayName", str) TableStorageName = NewType("TableStorageName", str) class AggregationMethod(Enum): """Method for aggregating an AggregateField""" AVG = "Average" COUNT = "Count" MAX = "Maximum" MIN = "Minimum" SUM = "Total" def __str__(self): return self.value def __repr__(self): return f"{self.__class__.__name__}.{self.name}" class FilterOperator(Enum): CONTAINS = "Contains" ENDS_WITH = "Ends With" EQUALS = "Equals" GREATER_THAN = "Greater Than" GREATER_THAN_OR_EQUAL_TO = "Greater Than or Equal To" LESS_THAN = "Less Than" LESS_THAN_OR_EQUAL_TO = "Less Than or Equal To" STARTS_WITH = "Starts With" def __str__(self): return self.value def __repr__(self): return f"{self.__class__.__name__}.{self.name}" class SortOrder(Enum): ASCENDING = "Ascending" DESCENDING = "Descending" def __str__(self): return self.value def __repr__(self): return f"{self.__class__.__name__}.{self.name}" class SqlDialect(Enum): ACCESS = "access" MSSQL = "mssql" MYSQL = "mysql" ORACLE = "oracle" POSTGRES = "postgres" SQLITE = "sqlite" def __str__(self): return self.value def __repr__(self): return f"{self.__class__.__name__}.{self.name}" PKBNKsqlnomad/iqueryable.py# coding=utf-8 from abc import abstractmethod, ABC from typing import Sequence from .custom_types import FieldDisplayName, SQL, SqlDialect from .sql_field import SqlField class IQueryable(ABC): """""" @property @abstractmethod def alias(self) -> FieldDisplayName: """""" @property @abstractmethod def dialect(self) -> SqlDialect: """""" @abstractmethod def field(self, field_alias: FieldDisplayName) -> SqlField: """""" @property @abstractmethod def fields(self) -> Sequence[SqlField]: """""" @property @abstractmethod def root_alias(self) -> str: """""" @property @abstractmethod def schema(self) -> str: """""" @property @abstractmethod def sql(self) -> SQL: """""" @property @abstractmethod def suffix(self) -> int: """""" PKTCNK.FPsqlnomad/query.py# coding=utf-8 from typing import ( Callable, Sequence, List, Tuple, Optional ) from .custom_types import ( FieldDisplayName, SQL, TableDisplayName, SqlDialect ) from .iqueryable import IQueryable from .sql_field import SqlField from .subquery import Subquery class Query(IQueryable): """Monad that composes SQL subqueries with transformation functions""" _lineage: List[Tuple[Subquery, Optional[Subquery]]] = [] def __init__(self, subquery: IQueryable): self._subquery = subquery Query._lineage.append(([self._subquery], None)) @property def alias(self) -> str: return self._subquery.alias @property def dialect(self) -> SqlDialect: return self._subquery.dialect @property def fields(self) -> Sequence[SqlField]: return self._subquery.fields @property def root_alias(self) -> TableDisplayName: return self._subquery.root_alias @property def schema(self) -> str: return self._subquery.schema @property def sql(self) -> SQL: return self._subquery.sql @property def suffix(self) -> int: return self._subquery.suffix def field(self, field_alias: FieldDisplayName) -> SqlField: return self._subquery.field(field_alias) def bind(self, fn: Callable[[IQueryable], Subquery]) -> "Query": new_subquery = fn(self._subquery) Query._lineage.append((new_subquery, self._subquery)) return Query(new_subquery) def __str__(self): return f""" Query subquery: {self._subquery!s} """ def __repr__(self): return f"Query(subquery={self._subquery!r})" PKqNKqw"rr!sqlnomad/query_transformations.py# coding=utf-8 from itertools import chain from typing import ( Callable, Sequence, Dict, Union ) from .custom_types import ( AggregationMethod, FilterOperator, SQL, SortOrder, FieldDisplayName ) from .iqueryable import IQueryable from .sql_field import SqlField from .sql_filter import SqlFilter from .subquery import Subquery def _aggregate_field( field: SqlField, aggregation: AggregationMethod ) -> SqlField: aggregate_functions = { AggregationMethod.AVG: "AVG({})", AggregationMethod.COUNT: "COUNT({})", AggregationMethod.MAX: "MAX({})", AggregationMethod.MIN: "MIN({})", AggregationMethod.SUM: "SUM({})" } p = aggregate_functions[aggregation] return SqlField( definition=p.format(field.definition), alias=field.alias ) def aggregate( group_by_fields: Sequence[FieldDisplayName], aggregations: Dict[FieldDisplayName, AggregationMethod] ) -> Callable[[IQueryable], Subquery]: def inner(subquery: IQueryable) -> Subquery: gb_flds = [ subquery.field(fld_name) for fld_name in group_by_fields ] agg_flds = [ _aggregate_field( field=subquery.field(fld_name), aggregation=fn ) for fld_name, fn in aggregations.items() ] fields = gb_flds + agg_flds select_clause = "SELECT " + ", ".join( fld.full_name_with_alias for fld in sorted(fields, key=lambda f: f.alias) ) from_clause = f"FROM ({subquery.sql}) {subquery.alias}" group_by_clause = "GROUP BY " + ", ".join( fld.definition for fld in sorted(gb_flds, key=lambda f: f.alias) ) return Subquery( sql=f"{select_clause} {from_clause} {group_by_clause}", fields=fields, dialect=subquery.dialect, alias=subquery.root_alias, suffix=subquery.suffix + 1 ) return inner def _filter_condition(field: SqlField, flt: SqlFilter) -> str: filter_patterns = { FilterOperator.CONTAINS: "{} LIKE '*{}*'", FilterOperator.EQUALS: "{} = '{}'", FilterOperator.STARTS_WITH: "{} LIKE '{}%'", FilterOperator.ENDS_WITH: "{} = '%{}'", FilterOperator.GREATER_THAN: "{} > '{}'", FilterOperator.GREATER_THAN_OR_EQUAL_TO: "{} >= '{}'", FilterOperator.LESS_THAN: "{} < '{}'", FilterOperator.LESS_THAN_OR_EQUAL_TO: "{} <= '{}'" } return filter_patterns[flt.operator].format(field.definition, flt.value) def find(filters: Sequence[Union[SqlFilter, list]]) -> Callable[[IQueryable], Subquery]: """ If a sequence of sequences is provided, then each sequence is grouped using AND conditions, and the groups themselves are bound using OR conditions. eg: [[flt1, flt2], [flt3, flt4]] = (flt1 and flt2) or (flt3 and flt4) """ def inner(subquery: IQueryable) -> Subquery: def unwrap(flt) -> SQL: if isinstance(flt, SqlFilter): return flt.sql elif isinstance(flt[0], SqlFilter): return "(" + " AND ".join( _filter_condition(field=subquery.field(f.field_alias), flt=f) for f in flt ) + ")" else: return " OR ".join(unwrap(f) for f in flt) return Subquery( sql=f""" SELECT {', '.join( fld.full_name_with_alias for fld in sorted(subquery.fields, key=lambda f: f.alias) )} FROM ({subquery.sql}) {subquery.alias} WHERE {unwrap(filters)} """, fields=subquery.fields, dialect=subquery.dialect, alias=subquery.root_alias, suffix=subquery.suffix + 1 ) return inner def join(*, right_table: IQueryable, left_field_alias: FieldDisplayName, right_field_alias: FieldDisplayName, join_type: str = "inner", ) -> Callable[[IQueryable], Subquery]: """Subquery transformation that combines two tables :param join_type: Either 'inner', 'left', 'right' or 'full'. The default join type is 'INNER' :param right_table: Subquery to join to the current (left) table :param left_field_alias: Foreign key on the current (left) table to join to the foreign key on the right table :param right_field_alias: Foreign key on the right table to join to the foreign key on the current (left) table :return: Subquery resulting from the combination of the two tables """ def inner(left_table: IQueryable) -> Subquery: jtype = join_type.upper() if jtype not in ["INNER", "LEFT", "RIGHT", "FULL"]: raise ValueError(f"Invalid join type {join_type!r}") left_key = left_table.field(left_field_alias) right_key = right_table.field(right_field_alias) if jtype == "RIGHT": exclude_key = left_key else: exclude_key = right_key fields = [ fld for fld in chain(left_table.fields, right_table.fields) if fld.definition != exclude_key.definition ] qry = f""" SELECT {', '.join( fld.full_name_with_alias for fld in sorted(fields, key=lambda f: f.alias) )} FROM ({left_table.sql}) {left_table.alias} {jtype} JOIN ({right_table.sql}) {right_table.alias} ON {left_key.definition} = {right_key.definition} """ return Subquery( sql=qry, fields=fields, dialect=left_table.dialect, alias=left_table.root_alias, suffix=left_table.suffix + 1 ) return inner def order_by(order_by_fields: Dict[FieldDisplayName, SortOrder]) -> Callable[[IQueryable], Subquery]: def inner(subquery: IQueryable) -> Subquery: order_map = { SortOrder.ASCENDING: "{} ASC", SortOrder.DESCENDING: "{} DESC" } order_by_clause = ", ".join( order_map[sort_order].format(subquery.field(fld_name).definition) for fld_name, sort_order in order_by_fields.items() ) qry = f""" SELECT {', '.join( fld.full_name_with_alias for fld in sorted(subquery.fields, key=lambda f: f.alias) )} FROM ({subquery.sql}) {subquery.alias} ORDER BY {order_by_clause} """ return Subquery( sql=qry, fields=subquery.fields, dialect=subquery.dialect, alias=subquery.root_alias, suffix=subquery.suffix + 1 ) return inner PKcPKi zisssqlnomad/sql_field.py# coding=utf-8 import re from .custom_types import FieldDisplayName from .utils import rebracket _NON_FORMULA_PATTERN = re.compile(r"^[\w|\s]+$") class SqlField: """Value-object representing a field in a relational database""" def __init__(self, alias: FieldDisplayName, definition: str ): self._alias = alias self._definition = definition @property def alias(self) -> str: """Column heading to display to the user""" return rebracket(self._alias) @property def definition(self) -> str: """Name (or formula) of the field within the Subqueries SQL attribute. If the definition is a formula, then don't wrap it in brackets. :return: SQL string representation of a field in a Subquery """ if _NON_FORMULA_PATTERN.match(rebracket(self._definition)): return rebracket(self._definition) return self._definition @property def full_name_with_alias(self) -> str: """[Subquery Alias].[Field Display Name] AS [Field Alias]""" return f"{self.definition} AS {self.alias}" def __hash__(self): return hash(self.alias) def __eq__(self, other): return self.alias == other.alias def __str__(self): return f""" SqlField: alias: {self._alias!s} definition: {self._definition!s} """ def __repr__(self): return f"""= SqlField( definition={self.definition!r}, alias={self.alias!r} )""" PKNK]22sqlnomad/sql_filter.py# coding=utf-8 from typing import NamedTuple from .custom_types import FilterOperator, SqlValue, FieldDisplayName class SqlFilter(NamedTuple): """Value-object representing a SQL where clause statement""" field_alias: FieldDisplayName operator: FilterOperator value: SqlValue def __eq__(self, other): return str(self) == str(other) def __hash__(self): return hash(str(self)) def __lt__(self, other): return self.field_alias < other.field_alias def __repr__(self): return f""" SqlFilter( field={self.field_alias!r}, operator={self.operator!r}, value={self.value!r} )""" def __str__(self): return f"{self.field_alias} {self.operator} {self.value!r}" PKNKMs``sqlnomad/storage_field.py# coding=utf-8 from typing import NamedTuple from .custom_types import FieldStorageName, FieldDisplayName class StorageField(NamedTuple): storage_name: FieldStorageName display_name: FieldDisplayName def __str__(self): return f""" StorageField storage_name: {self.storage_name} display_name: {self.display_name} """ def __repr__(self): return f""" StorageField( storage_name={self.storage_name!r}, display_name={self.display_name!r} ) """ PKNK>; sqlnomad/storage_table.py# coding=utf-8 from typing import Sequence from sqlnomad.subquery import Subquery from .custom_types import ( TableStorageName, TableDisplayName, SqlDialect, FieldDisplayName, SQL ) from .sql_field import SqlField from .iqueryable import IQueryable from .storage_field import StorageField from .utils import rebracket class StorageTable(IQueryable): """Value-object representing a table in a relational database""" def __init__(self, storage_name: TableStorageName, display_name: TableDisplayName, fields: Sequence[StorageField], dialect: SqlDialect, schema: str=None ): self._storage_name = storage_name self._display_name = display_name self._fields = fields self._dialect = dialect self._schema = schema @property def alias(self) -> FieldDisplayName: return rebracket(self._display_name) @property def dialect(self) -> SqlDialect: return self._dialect @property def fields(self) -> Sequence[SqlField]: return [ SqlField( alias=rebracket(fld.display_name), definition=f"{rebracket(self.alias)}.{rebracket(fld.display_name)}", ) for fld in self._fields ] @property def root_alias(self) -> str: return rebracket(self._display_name) @property def schema(self) -> str: return rebracket(self._schema) @property def sql(self) -> SQL: if self._schema: full_table_name = f"{rebracket(self._schema)}.{rebracket(self._storage_name)}" else: full_table_name = rebracket(self._storage_name) qualify_field = lambda fld: f"{rebracket(fld.storage_name)} AS {rebracket(fld.display_name)}" select_fields = ", ".join(qualify_field(fld) for fld in self._fields) return f"SELECT {select_fields} FROM {full_table_name}" @property def suffix(self) -> int: return 1 def field(self, field_alias: FieldDisplayName) -> SqlField: try: return next( fld for fld in self.fields if fld.alias == rebracket(field_alias) ) except StopIteration: raise ValueError(f"No field named {field_alias!r} was found on {self._display_name!r}.") def __repr__(self): return f""" StorageTable( storage_name={self._storage_name!r}, display_name={self._display_name!r}, fields={self._fields!r}, dialect={self._dialect!r}, schema={self._schema!r} ) """ def __str__(self): return f""" StorageTable storage_name: {self._storage_name!s} display_name: {self._display_name!s} fields: {self._fields!s} dialect: {self._dialect!s} schema: {self._schema!s} """ PKNK($sqlnomad/subquery.py# coding=utf-8 from typing import Sequence, Counter from .custom_types import ( TableDisplayName, SqlDialect, SQL, FieldDisplayName ) from .iqueryable import IQueryable from .sql_field import SqlField from .utils import rebracket, strip_chars def _check_for_duplicate_field_aliases(fields: Sequence[SqlField]) -> Sequence[SqlField]: aliases = (fld.alias for fld in fields) duplicates = [ alias for alias, ct in Counter(aliases).items() if ct > 1 ] if duplicates: raise UserWarning(f"Duplicate field aliases found {duplicates}") return fields class Subquery(IQueryable): """Value-object representing a SQL subquery with its accompanying field metadata :param sql: SQL statement of the subquery This parameter can be either a SQL statement or a table storage name. :param fields: Set of SqlFields for the subquery :param dialect: SqlDialect of the underlying table (eg, SqlDialect.MSSQL) :param alias: Name to refer to the current subquery in outer scopes :param schema: The database schema of the underlying table. This parameter is only relevant when the SQL parameter is simply the name of a table. :param suffix: Suffix to append to the alias of the subquery to distinguish it from subqueries in outer scopes with the same alias. """ def __init__(self, sql: TableDisplayName, fields: Sequence[SqlField], dialect: SqlDialect, alias: TableDisplayName, schema: str=None, suffix: int=1 ): self._sql = sql self._fields = _check_for_duplicate_field_aliases(fields) self._dialect = dialect self._alias = alias self._schema = schema self._suffix = suffix @property def alias(self) -> str: return rebracket(f"{self._alias} {self._suffix}") @property def dialect(self) -> SqlDialect: return self._dialect @property def fields(self) -> Sequence[SqlField]: return [ SqlField( definition=f"{self.alias}.{fld.alias}", alias=fld.alias ) for fld in self._fields ] @property def root_alias(self) -> TableDisplayName: return self._alias @property def schema(self) -> str: if self._schema: return rebracket(self._schema) @property def suffix(self) -> int: return self._suffix @property def sql(self) -> SQL: return self._sql def field(self, field_alias: FieldDisplayName) -> SqlField: std_alias = lambda a: strip_chars(a, "[]").lower() try: return next( fld for fld in self.fields if std_alias(fld.alias) == std_alias(rebracket(field_alias)) ) except StopIteration: raise ValueError(f"No field named {field_alias} was found on the {self.alias} table.") def __str__(self): return f"{self._sql} {rebracket(self._alias)}" def __repr__(self): return f""" RawSqlTable( sql={self._sql!r}, fields={self._fields!r}, dialect={self._dialect!r}, alias={self._alias!r}, schema={self._schema!r}, suffix={self._suffix!r} ) """ PK3DNKM?sqlnomad/utils.py# coding=utf-8 from typing import Sequence, Union, Optional def rebracket(s) -> str: """Remove brackets within a string, and then surround the string with new brackets This is used to deal with field aliases that may or may not have brackets, and allows calling code to assume that regardless of its initial state the field will have brackets surrounding it. """ new_str = strip_chars(s, "[]") return f"[{new_str}]" def strip_chars(s: Optional[str], chars: Union[str, Sequence[str]]) -> str: """Strip specified characters from a string Example ------- >>> strip_chars("[test]", "[]") 'test' >>> strip_chars(None, "[]") '' """ if s: clean = s.strip() for c in chars: clean = clean.replace(c, "") return clean # return s.replace("[", "").replace("]", "").strip() return "" def standardize_string(original: str) -> str: """Remove special characters and standardize spaces in a string Example ------- >>> standardize_string(f"SELECT first_name\\n FROM ( SELECT * FROM customers)\\n") 'SELECT first_name FROM (SELECT * FROM customers)' """ new = strip_chars(original, "\n\r\t") new = " ".join(new.split()) new = new.replace("{ ", "{") new = new.replace(" }", "}") new = new.replace("( ", "(") new = new.replace(" )", ")") return new if __name__ == '__main__': import doctest doctest.testmod() PK>NKڽee%sqlnomad-0.1.01.dist-info/LICENSE.txtCopyright 2017 Mark Stefanovic MIT License Copyright (c) [year] [fullname] Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to wh om the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!H١Wdsqlnomad-0.1.01.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,Q0343 /, (-JLR()*M ILR(4KM̫#DPK!H, "sqlnomad-0.1.01.dist-info/METADATAE0DqٕčF|pF-_˙3SP&'r^- sV"ka?6eBPoZrle%8Ѕ`HVnڨ@w]QQb+k>:rS%ˇ/'PotNtĖ=z/g]ZgFj\_^PK!HqF$ sqlnomad-0.1.01.dist-info/RECORD}KH-p\ ((IU˯$=LzW'WonUQguAp)G(dJtu~mߕ܋g,5S~Oaϼ*E}b63mD)CnP6l;.X4]ҎѭLf\G+8e %iuAyAV յH0::o0N˝s7)$mTUD Tko|{ȻJo rIbjɁ0-2iexZ짴ZN>CrwM6dQ$ߵ$sMw[M:D$@ ܣs/X0j,|l ydLfZ^<3,Ix))|+L&1\ qو:yn{A6j 2O=7vnnJRl,],2_rմ?M\̕(̵:{+뜜Ybƕ'2|?eHkMcy^Mo*`R8 5E PK("PK66sqlnomad/__init__.pyPK;OK(hsqlnomad/custom_types.pyPKBNK sqlnomad/iqueryable.pyPKTCNK.FPfsqlnomad/query.pyPKqNKqw"rr!sqlnomad/query_transformations.pyPKcPKi zissC2sqlnomad/sql_field.pyPKNK]228sqlnomad/sql_filter.pyPKNKMs``O<sqlnomad/storage_field.pyPKNK>; >sqlnomad/storage_table.pyPKNK($ Ksqlnomad/subquery.pyPK3DNKM?ZYsqlnomad/utils.pyPK>NKڽee%_sqlnomad-0.1.01.dist-info/LICENSE.txtPK!H١Wd,dsqlnomad-0.1.01.dist-info/WHEELPK!H, "dsqlnomad-0.1.01.dist-info/METADATAPK!HqF$ esqlnomad-0.1.01.dist-info/RECORDPK.h