PK!̧FFtinydb_constraint/__init__.pyfrom .table import ConstraintTable from .constraint import Constraint PK!>tinydb_constraint/constraint.pyfrom typing import NamedTuple, Any, Union class Constraint(NamedTuple): type_: Union[type, list, type(Any)] = Any unique: bool = False not_null: bool = False class ConstraintMapping: def __init__(self): self.type_ = dict() self.preexisting = dict() self.not_null = set() def update(self, schema_dict): for k, c in schema_dict.items(): if isinstance(c, type): self._parse_type(k, c) else: assert isinstance(c, Constraint), repr(c) if c.type_: self._parse_type(k, c.type_) if c.unique: self.preexisting.setdefault(k, set()) if c.not_null: self.not_null.add(k) def _parse_type(self, k, type_): if k in self.type_.keys(): expected_type = self.type_[k] if expected_type is not Any: if type_ is not expected_type: raise TypeError else: self.type_[k] = type_ def _view(self): all_keys = set(self.type_.keys()) | set(self.preexisting.keys()) | self.not_null for k in all_keys: type_ = self.type_.get(k, Any) unique = k in self.preexisting.keys() not_null = k in self.not_null yield k, Constraint(type_, unique, not_null) def view(self): return dict(self._view()) def __repr__(self): return repr(self.view()) PK!)'Utinydb_constraint/exception.pyclass NonUniformTypeException(TypeError): pass class NotUniqueException(ValueError): pass class NotNullException(ValueError): pass PK!CNI I tinydb_constraint/table.pyfrom tinydb.database import Table from tinydb import Query import os import dateutil.parser from datetime import datetime, date from copy import deepcopy from .util import normalize_chars from .constraint import ConstraintMapping from .exception import NonUniformTypeException, NotNullException, NotUniqueException class ConstraintTable(Table): constraint_mapping = ConstraintMapping() def insert(self, element): self._update_uniqueness(element) return super().insert(self._sanitize_one(element)) def insert_multiple(self, elements): for element in elements: self._update_uniqueness(element) return super().insert_multiple(self._sanitize_multiple(elements)) def insert_if_not_exists(self, element, cond): if isinstance(cond, (list, tuple)): con0 = (Query()[cond[0]] == element[cond[0]]) for con in cond[1:]: con0 = (con0 & con) cond = con0 elif isinstance(cond, str): cond = (Query()[cond] == element[cond]) records = self.search(cond) if not records: return self.insert(element) def update(self, fields, cond=None, doc_ids=None, eids=None): if callable(fields): _update = lambda data, eid: self._sanitize_one(fields(data[eid])) else: if isinstance(cond, (list, tuple)): fields = fields.copy() con0 = (Query()[cond[0]] == fields.pop(cond[0])) for con in cond[1:]: con0 = (con0 & con) cond = con0 elif isinstance(cond, str): fields = fields.copy() cond = (Query()[cond] == fields.pop(cond)) _update = lambda data, eid: data[eid].update(self._sanitize_one(fields)) doc_ids = self.process_elements(_update, cond, doc_ids, eids) if doc_ids: for record in self.search(cond): self._update_uniqueness(fields) return doc_ids def _sanitize_multiple(self, records): """Sanitizes records, e.g. from Excel spreadsheet Arguments: records {iterable} -- Iterable of records Returns: list -- List of records """ def _records(): for record in records: record_schema = dict(self._parse_record(record, yield_type=True)) num_to_str = set() for k, v in record_schema.items(): expected_type = self.constraint_mapping.type_.get(k, None) if expected_type and v is not expected_type: if expected_type is str and v in (int, float): # v = str num_to_str.add(k) else: raise NonUniformTypeException('{} not in table schema {}' .format(v, self.get_schema(refresh=False))) self.update_schema(record_schema) record = dict(self._parse_record(record, yield_type=False)) for k, v in record.items(): if k in num_to_str: record[k] = str(v) yield record if bool(int(os.getenv('TINYDB_SANITIZE', '1'))): self.refresh() for c in self.get_schema(refresh=False).values(): assert not isinstance(c.type_, list) # self.refresh() return list(_records()) else: return records def _sanitize_one(self, record): return self._sanitize_multiple([record])[0] @property def schema(self): """Get table's latest schema Returns: dict -- dictionary of constraints """ return self.get_schema(refresh=True) def set_schema(self, schema_dict): """Reset and set a new schema Arguments: schema_dict {dict} -- dictionary of constraints or types """ self.constraint_mapping = ConstraintMapping() self.update_schema(schema_dict) def get_schema(self, refresh=False): """Get table's schema, while providing an option to disable refreshing to allow faster getting of schema Keyword Arguments: refresh {bool} -- disable refreshing to allow faster getting of schema (default: {False}) Returns: dict -- dictionary of constraints """ if refresh: return self.refresh(output=True) else: return self.constraint_mapping.view() def update_schema(self, schema_dict): """Update the schema Arguments: schema_dict {dict} -- dictionary of constraints or types """ self.constraint_mapping.update(schema_dict) def _update_uniqueness(self, record_dict): for k, v in self._parse_record(record_dict, yield_type=False): if k in self.constraint_mapping.preexisting.keys(): if v in self.constraint_mapping.preexisting[k]: raise NotUniqueException('Duplicate {} for {} exists'.format(v, k)) self.constraint_mapping.preexisting[k].add(v) def refresh(self, output=False): """Refresh the schema table Keyword Arguments: output {bool} -- if False, there will be no output, and maybe a little faster (default: {False}) Raises: NonUniformTypeException -- Type constraint failed NotNullException -- NotNull constraint failed NotUniqueException -- Unique constraint failed Returns: dict -- dictionary of constraints """ output_mapping = None if output: output_mapping = deepcopy(self.constraint_mapping) for record in self.all(): for k, v in self._parse_record(record, yield_type=True): expected_type = self.constraint_mapping.type_.get(k, None) if expected_type and v is not expected_type: if expected_type is str and v in (int, float): v = str else: raise NonUniformTypeException('{} type is not {}'.format(v, expected_type)) if output_mapping: type_list = output_mapping.type_.get(k, []) if isinstance(type_list, type): type_list = [type_list] if v not in type_list: type_list.append(v) output_mapping.type_[k] = type_list record = dict(self._parse_record(record, yield_type=False)) is_null = self.constraint_mapping.not_null - set(record.keys()) if len(is_null) > 0: raise NotNullException('{} is null'.format(list(is_null))) if output_mapping: for k, v in output_mapping.type_.items(): if isinstance(v, list) and len(v) == 1: output_mapping.type_[k] = v[0] return output_mapping.view() @staticmethod def _parse_record(record, yield_type): def _yield_switch(x): if yield_type: return type(x) else: if isinstance(x, (datetime, date)): return x.isoformat() else: return x for k, v in record.items(): if bool(int(os.getenv('TINYDB_DATETIME', '1'))): if isinstance(v, str): v = normalize_chars(v.strip()) if v.isdigit(): v = int(v) elif '.' in v and v.replace('.', '', 1).isdigit(): v = float(v) elif v in {'', '-'}: continue else: try: v = dateutil.parser.parse(v) except ValueError: pass if isinstance(v, date): v = datetime.combine(v, datetime.min.time()) yield k, _yield_switch(v) PK!`bYStinydb_constraint/util.pyimport unicodedata # import re from datetime import datetime import os import dateutil.parser # all_chars = (chr(i) for i in range(0x110000)) # control_chars = ''.join(c for c in all_chars if unicodedata.category(c) in {'Cc'}) # # control_char_re = re.compile('[%s]' % re.escape(control_chars)) # # # def remove_control_chars(s): # return control_char_re.sub('', s) def normalize_chars(s): return unicodedata.normalize("NFKD", s) def jsonify(d): for k, v in d.items(): if isinstance(v, datetime): yield k, v.isoformat() else: yield k, v def parse_record(record): for k, v in record.items(): if bool(int(os.getenv('TINYDB_DATETIME', '1'))): if isinstance(v, str): v = remove_control_chars(v.strip()) if v.isdigit(): v = int(v) elif '.' in v and v.replace('.', '', 1).isdigit(): v = float(v) elif v in {'', '-'}: continue else: try: v = dateutil.parser.parse(v) except ValueError: pass yield k, v PK! ::)tinydb_constraint-0.1.8.dist-info/LICENSEMIT License Copyright (c) 2018 Pacharapol Withayasakpunt Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!H WX'tinydb_constraint-0.1.8.dist-info/WHEEL A н#Z."jm)Afb~ڠO68oF04UhoAf f4=4h0k::wXPK!H7\Z *tinydb_constraint-0.1.8.dist-info/METADATAVmo6_qC,*/:u07nc$Z-lZe6=Q,β[D=їܲY|%#8d NA iɽ=?(#e^V/ q2`2DW#5h(J.Tetimi0̄]Vs",1>Jݐ>.G1yC6ȠK#d1gawe^!y[O?fƹ'93F,G7"Zԝ+DLpy`20M?ѳt_'ӗ{ZO_mm\ϕCa,6H+^YC笏:GǺ;N^p:Cn-JΕ\ K~ wZKBr`R -oȢ>,R]"Sʺ[+F}vT̛rO__gY3W~B.9M8"d6 ɷP;''hy$'ZN؋jwB6;Ǥ6:J&?}$K 0 j|_FxAr7YR3({$>Q<<[ 7VUlύ֙L7 #o;~2Jv7"jd;PJFOEzalo+)Tf q¯[|!;H:twPI,7R٩ܟ=Hm?|VN`V\Zh4A,4cF2[!j״JV p f;YUmVŰZ% p,ΦAb2+!>sMa rj&>Cg<*_p ~ceWӳѢ՚aX+&wBn*Ia-8>$=ܲwheh, 7sWbB$LJt˛meXύJvռn*,\ daŐD%~LoQ%m.zgQǂZgޕeI;9 ŰAY^/" $[3v7G C7Zpc302'䍒E\sR(ͤ=n =8qڮ2 ԋvcwq\@g9kKC= rxu&jiش# ͠l8JٽW2,?V/Z?}b!J/SQөvn}ȪNZ@xf2vgƇ;I{`)<%Xyp4/PK!̧FFtinydb_constraint/__init__.pyPK!>tinydb_constraint/constraint.pyPK!)'Utinydb_constraint/exception.pyPK!CNI I utinydb_constraint/table.pyPK!`bYS'tinydb_constraint/util.pyPK! ::),tinydb_constraint-0.1.8.dist-info/LICENSEPK!H WX'n1tinydb_constraint-0.1.8.dist-info/WHEELPK!H7\Z * 2tinydb_constraint-0.1.8.dist-info/METADATAPK!H` ~m(6tinydb_constraint-0.1.8.dist-info/RECORDPK 8