datafiles/__init__.py

# pylint: disable=unused-import

from dataclasses import field

from . import converters
from .decorators import auto, datafile
from .managers import Missing
from .models import Model


datafiles/builders.py

import dataclasses

import log

from .converters import map_type
from .managers import Datafile
from .meta import ModelMeta


def build_datafile(obj, root=None) -> Datafile:
    try:
        return object.__getattribute__(obj, 'datafile')
    except AttributeError:
        log.debug(f"Building 'datafile' for {obj.__class__} object")

    m = getattr(obj, 'Meta', None)
    pattern = getattr(m, 'datafile_pattern', None)
    attrs = getattr(m, 'datafile_attrs', None)
    manual = getattr(m, 'datafile_manual', ModelMeta.datafile_manual)
    defaults = getattr(m, 'datafile_defaults', ModelMeta.datafile_defaults)
    auto_load = getattr(m, 'datafile_auto_load', ModelMeta.datafile_auto_load)
    auto_save = getattr(m, 'datafile_auto_save', ModelMeta.datafile_auto_save)
    auto_attr = getattr(m, 'datafile_auto_attr', ModelMeta.datafile_auto_attr)

    if attrs is None and dataclasses.is_dataclass(obj):
        attrs = {}
        log.debug(f'Mapping attributes for {obj.__class__} object')
        for field in dataclasses.fields(obj):
            self_name = f'self.{field.name}'
            if pattern is None or self_name not in pattern:
                attrs[field.name] = map_type(field.type, name=field.name)

    return Datafile(
        obj,
        attrs=attrs,
        pattern=pattern,
        manual=manual,
        defaults=defaults,
        auto_load=auto_load,
        auto_save=auto_save,
        auto_attr=auto_attr,
        root=root,
    )


datafiles/converters/__init__.py

import dataclasses
from inspect import isclass
from typing import Any, Dict, Optional, Union

import log

from ..utils import cached
from ._bases import Converter
from .builtins import Boolean, Float, Integer, String
from .containers import Dataclass, Dictionary, List
from .extensions import *  # pylint: disable=unused-wildcard-import


_REGISTRY: Dict[Union[type, str], type] = {}


def register(cls: type, converter: type):
    _REGISTRY[cls] = converter
    _REGISTRY[cls.__name__] = converter


register(Integer.TYPE, Integer)
register(Float.TYPE, Float)
register(Boolean.TYPE, Boolean)
register(String.TYPE, String)


@cached
def map_type(cls, *, name: str = '', item_cls: Optional[type] = None):
    """Infer the converter type from a dataclass, type, or annotation."""
    if name:
        log.debug(f'Mapping {name!r} of {cls!r} to converter')
    else:
        log.debug(f'Mapping {cls!r} to converter')

    if cls in _REGISTRY:
        converter: Any = _REGISTRY[cls]
        log.debug(f'Mapped {cls!r} to existing converter: {converter}')
        return converter

    if dataclasses.is_dataclass(cls):
        converters = {}
        for field in dataclasses.fields(cls):
            converters[field.name] = map_type(field.type, name=field.name)
        converter = Dataclass.subclass(cls, converters)
        log.debug(f'Mapped {cls!r} to new converter: {converter}')
        return converter

    if hasattr(cls, '__origin__'):
        converter = None

        if cls.__origin__ == list:
            try:
                converter = map_type(item_cls or cls.__args__[0])
            except TypeError as e:
                if '~T' in str(e):
                    e = TypeError("Type is required with 'List' annotation")
                raise e from None
            else:
                converter = List.subclass(converter)

        elif cls.__origin__ == dict:
            if item_cls:
                key = map_type(str)
                value = map_type(item_cls)
            else:
                log.warn("Schema enforcement not possible with 'Dict' annotation")
                key = map_type(cls.__args__[0])
                value = map_type(cls.__args__[1])
            converter = Dictionary.subclass(key, value)

        elif cls.__origin__ == Union:
            converter = map_type(cls.__args__[0])
            assert len(cls.__args__) == 2
            assert cls.__args__[1] == type(None)
            converter = converter.as_optional()

        if converter:
            log.debug(f'Mapped {cls!r} to new converter: {converter}')
            return converter

        raise TypeError(f'Unsupported container type: {cls.__origin__}')

    if not isclass(cls):
        raise TypeError(f'Annotation is not a type: {cls!r}')

    if issubclass(cls, Converter):
        log.debug(f'Mapped {cls!r} to existing converter (itself)')
        return cls

    raise TypeError(f'Could not map type: {cls}')


datafiles/converters/_bases.py

# pylint: disable=unused-argument

from typing import Any


class Converter:
    """Base class for immutable attribute conversion."""

    TYPE: type = object
    DEFAULT: Any = None

    @classmethod
    def as_optional(cls):
        name = 'Optional' + cls.__name__
        bases = (cls,)
        attributes = {'DEFAULT': None}
        return type(name, bases, attributes)

    @classmethod
    def to_python_value(cls, deserialized_data, *, target_object=None):
        return cls.to_preserialization_data(deserialized_data)

    @classmethod
    def to_preserialization_data(cls, python_value, *, default_to_skip=None):
        if python_value is None:
            return cls.DEFAULT
        if cls.TYPE is object:
            return python_value
        return cls.TYPE(python_value)


datafiles/converters/builtins.py

# pylint: disable=unused-argument

import log

from ._bases import Converter


class Boolean(Converter):
    """Converter for `bool` literals."""

    TYPE = bool
    DEFAULT = False

    _FALSY = {'false', 'f', 'no', 'n', 'disabled', 'off', '0'}

    @classmethod
    def to_python_value(cls, deserialized_data, *, target_object=None):
        if isinstance(deserialized_data, str):
            value = deserialized_data.lower() not in cls._FALSY
        else:
            value = cls.TYPE(deserialized_data)
        return value


class Float(Converter):
    """Converter for `float` literals."""

    TYPE = float
    DEFAULT = 0.0


class Integer(Converter):
    """Converter for `int` literals."""

    TYPE = int
    DEFAULT = 0

    @classmethod
    def to_preserialization_data(cls, python_value, *, default_to_skip=None):
        if python_value is None:
            return cls.DEFAULT
        try:
            return cls.TYPE(python_value)
        except ValueError as exc:
            try:
                data = cls.TYPE(float(python_value))
            except ValueError:
                raise exc from None
            else:
                log.warn(f'Precision lost in conversion to int: {python_value}')
                return data


class String(Converter):
    """Converter for `str` literals."""

    TYPE = str
    DEFAULT = ""


datafiles/converters/containers.py

from collections.abc import Iterable
from contextlib import suppress
from dataclasses import _MISSING_TYPE as Missing
from typing import Dict

import log

from ._bases import Converter


class List(Converter):
    """Base converter for homogeneous lists of another converter."""

    CONVERTER = None

    @classmethod
    def subclass(cls, converter: type):
        name = f'{converter.__name__}List'  # type: ignore
        bases = (cls,)
        attributes = {'CONVERTER': converter}
        return type(name, bases, attributes)

    @classmethod
    def to_python_value(cls, deserialized_data, *, target_object):
        if target_object is None or target_object is Missing:
            value = []  # type: ignore
        else:
            value = target_object
            value.clear()

        convert = cls.CONVERTER.to_python_value

        if deserialized_data is None:
            pass
        elif isinstance(deserialized_data, Iterable) and all(
            (item is None for item in deserialized_data)
        ):
            pass
        elif isinstance(deserialized_data, str):
            for item in deserialized_data.split(','):
                value.append(convert(item))
        else:
            try:
                items = iter(deserialized_data)
            except TypeError:
                value.append(convert(deserialized_data, target_object=None))
            else:
                for item in items:
                    value.append(convert(item, target_object=None))

        return value

    @classmethod
    def to_preserialization_data(cls, python_value, *, default_to_skip=None):
        data = []

        convert = cls.CONVERTER.to_preserialization_data

        if python_value is None:
            pass
        elif isinstance(python_value, Iterable):
            if isinstance(python_value, str):
                data.append(convert(python_value, default_to_skip=None))
            elif isinstance(python_value, set):
                data.extend(
                    sorted(convert(item, default_to_skip=None) for item in python_value)
                )
            else:
                for item in python_value:
                    data.append(convert(item, default_to_skip=None))
        else:
            data.append(convert(python_value, default_to_skip=None))

        if data == default_to_skip:
            data.clear()

        return data or [None]


class Dictionary(Converter):
    """Base converter for raw dictionaries."""

    @classmethod
    def subclass(cls, key: type, value: type):
        name = f'{key.__name__}{value.__name__}Dict'
        bases = (cls,)
        return type(name, bases, {})

    @classmethod
    def to_python_value(cls, deserialized_data, *, target_object):
        if isinstance(deserialized_data, dict):
            data = deserialized_data.copy()
        else:
            data = {}

        if target_object is None or target_object is Missing:
            value = data
        else:
            value = target_object
            value.clear()
            value.update(data)

        return value

    @classmethod
    def to_preserialization_data(cls, python_value, *, default_to_skip=None):
        data = dict(python_value)
        if data == default_to_skip:
            data.clear()
        return data


class Dataclass(Converter):
    """Base converter for dataclasses."""

    DATACLASS = None
    CONVERTERS = None

    @classmethod
    def subclass(cls, dataclass, converters: Dict[str, type]):
        name = f'{dataclass.__name__}Converter'
        bases = (cls,)
        attributes = {'DATACLASS': dataclass, 'CONVERTERS': converters}
        return type(name, bases, attributes)

    @classmethod
    def to_python_value(cls, deserialized_data, *, target_object):
        if isinstance(deserialized_data, dict):
            data = deserialized_data.copy()
        else:
            data = {}

        for name, value in list(data.items()):
            if name not in cls.CONVERTERS:
                log.debug(f'Removed unmapped nested file attribute: {name}')
                data.pop(name)

        for name, converter in cls.CONVERTERS.items():
            if name not in data:
                data[name] = converter.to_python_value(None)

        new_value = cls.DATACLASS(**data)  # pylint: disable=not-callable

        if target_object is None:
            value = new_value
        else:
            value = target_object
            value.__dict__ = new_value.__dict__

        return value

    @classmethod
    def to_preserialization_data(cls, python_value, *, default_to_skip=None):
        data = {}

        for name, converter in cls.CONVERTERS.items():

            if isinstance(python_value, dict):
                try:
                    value = python_value[name]
                except KeyError as e:
                    log.debug(e)
                    value = None
            else:
                try:
                    value = getattr(python_value, name)
                except AttributeError as e:
                    log.debug(e)
                    value = None

            with suppress(AttributeError):
                if value == getattr(default_to_skip, name):
                    log.debug(f"Skipped default value for '{name}' attribute")
                    continue

            data[name] = converter.to_preserialization_data(value)

        return data
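The `Converter` base class and the `register()` hook above are enough to round-trip custom scalar types through `map_type`. A minimal sketch (not a file in the package; the `DateConverter` class is illustrative, and ISO-8601 strings are one assumed storage format):

from datetime import date

from datafiles import converters


class DateConverter(converters.Converter):
    """Illustrative converter that stores dates as ISO-8601 strings."""

    TYPE = date
    DEFAULT = None

    @classmethod
    def to_python_value(cls, deserialized_data, *, target_object=None):
        if deserialized_data is None:
            return None
        return date.fromisoformat(str(deserialized_data))

    @classmethod
    def to_preserialization_data(cls, python_value, *, default_to_skip=None):
        if python_value is None:
            return None
        return python_value.isoformat()


# After registration, map_type(date) resolves to DateConverter via _REGISTRY:
converters.register(date, DateConverter)

Fields annotated as `date` on a model would then serialize through this converter, the same way the built-in registrations for `int`, `float`, `bool`, and `str` work above.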
datafiles/converters/extensions.py

# pylint: disable=unused-argument

from ruamel.yaml.scalarstring import LiteralScalarString

from .builtins import Float, String


class Number(Float):
    """Converter for integers or floats."""

    DEFAULT = 0

    @classmethod
    def to_preserialization_data(cls, python_value, *, default_to_skip=None):
        data = super().to_preserialization_data(python_value)
        if int(data) == data:
            return int(data)
        return data


class Text(String):
    """Converter for multiline strings."""

    DEFAULT = ""

    @classmethod
    def to_python_value(cls, deserialized_data, *, target_object=None):
        value = cls.to_preserialization_data(deserialized_data).strip()
        if '\n' in value:
            value = value + '\n'
        return value

    @classmethod
    def to_preserialization_data(cls, python_value, *, default_to_skip=None):
        data = super().to_preserialization_data(python_value).strip()
        if '\n' in data:
            return LiteralScalarString(data + '\n')
        return data
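The behavior of these two extension converters is easiest to see with concrete values; a sketch derived from the implementations above (the same cases appear in the test suite later in this package):

from datafiles import converters

# Number collapses whole floats to ints but preserves true floats:
assert converters.Number.to_preserialization_data(42.0) == 42
assert converters.Number.to_preserialization_data(1.23) == 1.23

# Text emits multiline strings as YAML literal blocks: a
# LiteralScalarString with a single trailing newline appended.
data = converters.Text.to_preserialization_data("Line 1\nLine 2")
assert data == "Line 1\nLine 2\n"

# Single-line strings pass through unchanged:
assert converters.Text.to_preserialization_data("Hello") == "Hello"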
datafiles/decorators.py

import dataclasses
from pathlib import Path
from typing import Dict, Optional

from .converters import Converter
from .models import ModelMeta, create_model


def datafile(
    pattern: str,
    attrs: Optional[Dict[str, Converter]] = None,
    manual: bool = ModelMeta.datafile_manual,
    defaults: bool = ModelMeta.datafile_defaults,
    auto_load: bool = ModelMeta.datafile_auto_load,
    auto_save: bool = ModelMeta.datafile_auto_save,
    auto_attr: bool = ModelMeta.datafile_auto_attr,
):
    """Synchronize a data class to the specified path."""

    def decorator(cls):
        if dataclasses.is_dataclass(cls):
            dataclass = cls
        else:
            dataclass = dataclasses.dataclass(cls)
        return create_model(
            dataclass,
            attrs=attrs,
            pattern=pattern,
            manual=manual,
            defaults=defaults,
            auto_load=auto_load,
            auto_save=auto_save,
            auto_attr=auto_attr,
        )

    return decorator


def auto(filename: str, **kwargs):
    kwargs['auto_attr'] = True
    path = Path.cwd() / filename
    cls = type(path.stem.strip('.'), (), {})
    return datafile(str(path), **kwargs)(cls)()


datafiles/formats.py

import json
from abc import ABCMeta, abstractmethod
from pathlib import Path
from typing import IO, Any, Dict, Set


class Formatter(metaclass=ABCMeta):
    """Base class for object serialization and text deserialization."""

    @classmethod
    @abstractmethod
    def extensions(cls) -> Set[str]:
        raise NotImplementedError

    @classmethod
    @abstractmethod
    def deserialize(cls, file_object: IO[Any]) -> Dict:
        raise NotImplementedError

    @classmethod
    @abstractmethod
    def serialize(cls, data: Dict) -> str:
        raise NotImplementedError


class JSON(Formatter):
    """Formatter for JavaScript Object Notation."""

    @classmethod
    def extensions(cls):
        return {'.json'}

    @classmethod
    def deserialize(cls, file_object):
        return json.load(file_object) or {}

    @classmethod
    def serialize(cls, data):
        return json.dumps(data, indent=2)


class TOML(Formatter):
    """Formatter for (round-trip) Tom's Obvious Minimal Language."""

    @classmethod
    def extensions(cls):
        return {'.toml'}

    @classmethod
    def deserialize(cls, file_object):
        return tomlkit.loads(file_object.read()) or {}

    @classmethod
    def serialize(cls, data):
        return tomlkit.dumps(data)


class YAML(Formatter):
    """Formatter for (safe, round-trip) YAML Ain't Markup Language."""

    @classmethod
    def extensions(cls):
        return {'.yml', '.yaml'}

    @classmethod
    def deserialize(cls, file_object):
        return yaml.YAML(typ='rt').load(file_object) or {}

    @classmethod
    def serialize(cls, data):
        text = yaml.round_trip_dump(data)
        return "" if text == "{}\n" else text


def deserialize(path: Path, extension: str) -> Dict:
    for formatter in Formatter.__subclasses__():
        if extension in formatter.extensions():
            with path.open('r') as file_object:
                return formatter.deserialize(file_object)
    raise ValueError(f'Unsupported file extension: {extension}')


def serialize(data: Dict, extension: str = '.yml') -> str:
    for formatter in Formatter.__subclasses__():
        if extension in formatter.extensions():
            return formatter.serialize(data)
    raise ValueError(f'Unsupported file extension: {extension!r}')
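Putting the decorator and formatters together, a minimal usage sketch (the `InventoryItem` class and `inventory/` path are illustrative, not part of the package):

from datafiles import datafile


@datafile('./inventory/{self.name}.yml')  # the suffix selects the YAML formatter
class InventoryItem:
    name: str      # used by the pattern, so excluded from the file's attributes
    count: int = 0


item = InventoryItem('widget')  # loads ./inventory/widget.yml if it exists,
item.count += 1                 # otherwise creates it; mutating an attribute
                                # triggers an automatic save via the hooks below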
datafiles/hooks.py

import dataclasses
from contextlib import contextmanager, suppress
from functools import wraps

import log

from . import settings
from .builders import build_datafile

LOAD_BEFORE_METHODS = ['__getattribute__', '__getitem__', '__iter__']
SAVE_AFTER_METHODS = [
    '__setattr__',
    '__setitem__',
    '__delitem__',
    'append',
    'extend',
    'insert',
    'remove',
    'pop',
    'clear',
    'sort',
    'reverse',
    'popitem',
    'update',
]
FLAG = '_patched'


class List(list):
    """Patchable `list` type."""


class Dict(dict):
    """Patchable `dict` type."""


def apply(instance, datafile):
    """Patch methods that get or set attributes."""
    cls = instance.__class__
    log.debug(f'Patching methods on {cls}')

    for method_name in LOAD_BEFORE_METHODS:
        with suppress(AttributeError):
            method = getattr(cls, method_name)
            modified_method = load_before(cls, method)
            setattr(cls, method_name, modified_method)

    for method_name in SAVE_AFTER_METHODS:
        with suppress(AttributeError):
            method = getattr(cls, method_name)
            modified_method = save_after(cls, method)
            setattr(cls, method_name, modified_method)

    if dataclasses.is_dataclass(instance):
        for attr_name in instance.datafile.attrs:
            attr = getattr(instance, attr_name)
            if not dataclasses.is_dataclass(attr):
                # pylint: disable=unidiomatic-typecheck
                if type(attr) == list:
                    attr = List(attr)
                    setattr(instance, attr_name, attr)
                elif type(attr) == dict:
                    attr = Dict(attr)
                    setattr(instance, attr_name, attr)
                else:
                    continue
            attr.datafile = build_datafile(attr, root=datafile)
            apply(attr, datafile)


def load_before(cls, method):
    """Decorate methods that should load before call."""

    if hasattr(method, FLAG):
        return method

    @wraps(method)
    def wrapped(self, *args, **kwargs):
        __tracebackhide__ = settings.HIDE_TRACEBACK_IN_HOOKS

        datafile = get_datafile(self)

        if enabled(datafile, args):
            if datafile.exists and datafile.modified:
                log.debug(f"Loading automatically before '{method.__name__}' call")
                datafile.load()
                if datafile.auto_save:
                    log.debug("Saving automatically after load")
                    datafile.save(_log=False)

        return method(self, *args, **kwargs)

    log.debug(f'Patched method to load before call: {cls.__name__}.{method.__name__}')
    setattr(wrapped, FLAG, True)

    return wrapped


def save_after(cls, method):
    """Decorate methods that should save after call."""

    if hasattr(method, FLAG):
        return method

    @wraps(method)
    def wrapped(self, *args, **kwargs):
        __tracebackhide__ = settings.HIDE_TRACEBACK_IN_HOOKS

        datafile = get_datafile(self)

        if enabled(datafile, args):
            if datafile.exists and datafile.modified:
                log.debug(f"Loading automatically before '{method.__name__}' call")
                datafile.load()

        result = method(self, *args, **kwargs)

        if enabled(datafile, args):
            log.debug(f"Saving automatically after '{method.__name__}' call")
            datafile.save()
            if datafile.auto_load:
                log.debug("Loading automatically after save")
                datafile.load(_log=False)

        return result

    log.debug(f'Patched method to save after call: {cls.__name__}.{method.__name__}')
    setattr(wrapped, FLAG, True)

    return wrapped


def get_datafile(obj):
    try:
        return object.__getattribute__(obj, 'datafile')
    except AttributeError:
        return None


def enabled(datafile, args) -> bool:
    """Determine if hooks are enabled for the current method."""
    if not settings.HOOKS_ENABLED:
        return False

    if datafile is None:
        return False

    if datafile.manual:
        return False

    if args and isinstance(args[0], str):
        if args[0] in {'Meta', 'datafile'}:
            return False
        if args[0].startswith('_'):
            return False

    return True


@contextmanager
def disabled():
    """Globally disable method hooks, temporarily."""
    if settings.HOOKS_ENABLED:
        settings.HOOKS_ENABLED = False
        yield
        settings.HOOKS_ENABLED = True
    else:
        yield
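`disabled()` is how the package itself suspends these hooks internally; the same context manager can batch several changes into a single explicit save. A sketch, reusing the illustrative `item` model from the earlier example:

from datafiles import hooks

with hooks.disabled():   # no automatic load/save inside this block
    item.count = 1
    item.count = 2

item.datafile.save()     # persist both changes with one write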
datafiles/managers.py

from __future__ import annotations

import dataclasses
import inspect
import os
from pathlib import Path
from typing import Any, Dict, Optional

import log
from cached_property import cached_property

from . import formats, hooks
from .converters import Converter, List, map_type
from .utils import prettify, recursive_update

Trilean = Optional[bool]
Missing = dataclasses._MISSING_TYPE  # pylint: disable=protected-access


class Manager:
    def __init__(self, cls):
        self.model = cls

    def all(self):
        raise NotImplementedError


class Datafile:
    def __init__(
        self,
        instance: Any,
        *,
        attrs: Dict,
        pattern: Optional[str],
        manual: bool,
        defaults: bool,
        auto_load: bool,
        auto_save: bool,
        auto_attr: bool,
        root: Optional[Datafile] = None,
    ) -> None:
        assert manual is not None
        assert defaults is not None
        self._instance = instance
        self.attrs = attrs
        self._pattern = pattern
        self._manual = manual
        self.defaults = defaults
        self._auto_load = auto_load
        self._auto_save = auto_save
        self._auto_attr = auto_attr
        self._last_load = 0.0
        self._last_data: Dict = {}
        self._root = root

    @property
    def classname(self) -> str:
        return self._instance.__class__.__name__

    @cached_property
    def path(self) -> Optional[Path]:
        if not self._pattern:
            return None
        cls = self._instance.__class__
        try:
            root = Path(inspect.getfile(cls)).parent
        except TypeError:  # pragma: no cover
            level = log.DEBUG if '__main__' in str(cls) else log.WARNING
            log.log(level, f'Unable to determine module for {cls}')
            root = Path.cwd()
        relpath = self._pattern.format(self=self._instance)
        return (root / relpath).resolve()

    @property
    def relpath(self) -> Path:
        return Path(os.path.relpath(self.path, Path.cwd()))

    @property
    def exists(self) -> bool:
        if self.path:
            return self.path.exists()
        return False

    @property
    def modified(self) -> bool:
        if self.path:
            return self._last_load != self.path.stat().st_mtime
        return True

    @modified.setter
    def modified(self, modified: bool):
        if modified:
            self._last_load = 0.0
        else:
            assert self.path, 'Cannot mark a missing file as unmodified'
            self._last_load = self.path.stat().st_mtime

    @property
    def manual(self) -> bool:
        return self._root.manual if self._root else self._manual

    @property
    def auto_load(self) -> bool:
        return self._root.auto_load if self._root else self._auto_load

    @property
    def auto_save(self) -> bool:
        return self._root.auto_save if self._root else self._auto_save

    @property
    def auto_attr(self) -> bool:
        return self._root.auto_attr if self._root else self._auto_attr

    @property
    def data(self) -> Dict:
        return self._get_data()

    def _get_data(self, include_default_values: Trilean = None) -> Dict:
        log.debug(f'Preserializing object to data: {self._instance!r}')
        if include_default_values is None:
            include_default_values = self.defaults

        if self.auto_attr:
            data = recursive_update(self._last_data, self._instance.__dict__)
        else:
            data = recursive_update(
                self._last_data, dataclasses.asdict(self._instance)
            )

        for name in list(data.keys()):
            if name not in self.attrs:
                log.debug(f'Removed unmapped attribute: {name}')
                data.pop(name)

        for name, converter in self.attrs.items():
            value = data[name]

            if getattr(converter, 'DATACLASS', None):
                log.debug(f"Converting '{name}' dataclass with {converter}")
                if value is None:
                    value = {}
                for field in dataclasses.fields(converter.DATACLASS):
                    if field.name not in value:
                        log.debug(f'Added missing nested attribute: {field.name}')
                        value[field.name] = None
                data[name] = converter.to_preserialization_data(
                    value,
                    default_to_skip=Missing
                    if include_default_values
                    else self._get_default_field_value(name),
                )
            elif (
                value == self._get_default_field_value(name)
                and not include_default_values
            ):
                log.debug(f"Skipped default value for '{name}' attribute")
                data.pop(name)
            else:
                log.debug(f"Converting '{name}' value with {converter}: {value!r}")
                data[name] = converter.to_preserialization_data(value)

        log.debug(f'Preserialized object data: {data}')
        return data

    @property
    def text(self) -> str:
        return self._get_text()

    def _get_text(self, **kwargs) -> str:
        data = self._get_data(**kwargs)
        if self.path and self.path.suffix:
            return formats.serialize(data, self.path.suffix)
        return formats.serialize(data)

    @text.setter  # type: ignore
    def text(self, value: str):
        self._write(value.strip() + '\n')

    def load(self, *, _log=True, _first=False) -> None:
        if self._root:
            self._root.load(_log=_log, _first=_first)
            return

        if self.path:
            if _log:
                log.info(f"Loading '{self.classname}' object from '{self.relpath}'")
        else:
            raise RuntimeError("'pattern' must be set to load the model")

        data = formats.deserialize(self.path, self.path.suffix)
        self._last_data = data

        message = f'Data from file: {self.path}'
        log.debug(message)
        log.debug('=' * len(message) + '\n\n' + prettify(data) + '\n')

        with hooks.disabled():

            for name, value in data.items():
                if name not in self.attrs and self.auto_attr:
                    cls: Any = type(value)
                    if issubclass(cls, list):
                        cls.__origin__ = list
                        if value:
                            item_cls = type(value[0])
                            for item in value:
                                if not isinstance(item, item_cls):
                                    log.warn(f'{name!r} list type cannot be inferred')
                                    item_cls = Converter
                                    break
                        else:
                            log.warn(f'{name!r} list type cannot be inferred')
                            item_cls = Converter
                        log.debug(f'Inferring {name!r} type: {cls} of {item_cls}')
                        self.attrs[name] = map_type(cls, name=name, item_cls=item_cls)
                    elif issubclass(cls, dict):
                        cls.__origin__ = dict
                        log.debug(f'Inferring {name!r} type: {cls}')
                        self.attrs[name] = map_type(cls, name=name, item_cls=Converter)
                    else:
                        log.debug(f'Inferring {name!r} type: {cls}')
                        self.attrs[name] = map_type(cls, name=name)

            for name, converter in self.attrs.items():
                log.debug(f"Converting '{name}' data with {converter}")
                if getattr(converter, 'DATACLASS', None):
                    self._set_dataclass_value(data, name, converter)
                else:
                    self._set_attribute_value(data, name, converter, _first)

        hooks.apply(self._instance, self)

        self.modified = False

    def _set_dataclass_value(self, data, name, converter):
        # TODO: Support nesting unlimited levels
        # https://github.com/jacebrowning/datafiles/issues/22
        nested_data = data.get(name)
        if nested_data is None:
            return

        log.debug(f'Converting nested data to Python: {nested_data}')

        dataclass = getattr(self._instance, name)
        if dataclass is None:
            for field in dataclasses.fields(converter.DATACLASS):
                if field.name not in nested_data:  # type: ignore
                    nested_data[field.name] = None  # type: ignore
            dataclass = converter.to_python_value(nested_data, target_object=dataclass)

        # TODO: Find a way to avoid this circular import
        try:
            datafile = dataclass.datafile
        except AttributeError:
            from .builders import build_datafile

            log.warn(f"{dataclass} has not yet been patched to have 'datafile'")
            datafile = build_datafile(dataclass)

        for name2, converter2 in datafile.attrs.items():
            _value = nested_data.get(  # type: ignore
                # pylint: disable=protected-access
                name2,
                datafile._get_default_field_value(name2),
            )
            value = converter2.to_python_value(
                _value, target_object=getattr(dataclass, name2)
            )
            log.debug(f"'{name2}' as Python: {value!r}")
            setattr(dataclass, name2, value)

        log.debug(f"Setting '{name}' value: {dataclass!r}")
        setattr(self._instance, name, dataclass)

    def _set_attribute_value(self, data, name, converter, first_load):
        file_value = data.get(name, Missing)
        init_value = getattr(self._instance, name, Missing)
        default_value = self._get_default_field_value(name)

        if first_load:
            log.debug(
                'Initial load values: file=%r, init=%r, default=%r',
                file_value,
                init_value,
                default_value,
            )
            if init_value != default_value and not issubclass(converter, List):
                log.debug(f"Keeping non-default '{name}' init value: {init_value!r}")
                return

        if file_value is Missing:
            if default_value is Missing:
                value = converter.to_python_value(None, target_object=init_value)
            else:
                value = converter.to_python_value(
                    default_value, target_object=init_value
                )
        else:
            value = converter.to_python_value(file_value, target_object=init_value)

        log.debug(f"Setting '{name}' value: {value!r}")
        setattr(self._instance, name, value)

    def _get_default_field_value(self, name):
        for field in dataclasses.fields(self._instance):
            if field.name == name:
                if not isinstance(field.default, Missing):
                    return field.default

                if not isinstance(field.default_factory, Missing):  # type: ignore
                    return field.default_factory()  # type: ignore

                if not field.init and hasattr(self._instance, '__post_init__'):
                    return getattr(self._instance, name)

        return Missing

    def save(self, *, include_default_values: Trilean = None, _log=True) -> None:
        if self._root:
            self._root.save(include_default_values=include_default_values, _log=_log)
            return

        if self.path:
            if _log:
                log.info(f"Saving '{self.classname}' object to '{self.relpath}'")
        else:
            raise RuntimeError("'pattern' must be set to save the model")

        with hooks.disabled():
            text = self._get_text(include_default_values=include_default_values)
            self._write(text)

        self.modified = False

    def _write(self, text: str):
        message = f'Writing file: {self.path}'
        log.debug(message)
        log.debug('=' * len(message) + '\n\n' + (text or '\n'))
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self.path.write_text(text)


datafiles/meta.py

from dataclasses import dataclass
from typing import Dict, Optional

from .converters import Converter


@dataclass
class ModelMeta:
    datafile_attrs: Optional[Dict[str, Converter]] = None
    datafile_pattern: Optional[str] = None

    datafile_manual: bool = False
    datafile_defaults: bool = False
    datafile_auto_load: bool = True
    datafile_auto_save: bool = True
    datafile_auto_attr: bool = False
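Each `ModelMeta` default can be overridden per model with a nested `Meta` class, which `build_datafile` reads attribute by attribute. A sketch (the `Config` class and pattern are illustrative):

from datafiles import datafile


@datafile('./config/{self.key}.toml')
class Config:
    key: str
    verbose: bool = False

    class Meta:
        datafile_manual = True    # skip the automatic load/save hooks
        datafile_defaults = True  # include default values when serializing

With `datafile_manual` set, synchronization only happens through explicit `datafile.load()` and `datafile.save()` calls.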
datafiles/models.py

import dataclasses

import log
from classproperties import classproperty

from . import hooks
from .builders import build_datafile
from .managers import Manager
from .meta import ModelMeta


class Model:

    Meta: ModelMeta = ModelMeta()

    def __post_init__(self):
        with hooks.disabled():
            log.debug(f'Initializing {self.__class__} object')

            self.datafile = build_datafile(self)

            path = self.datafile.path
            exists = self.datafile.exists

            if path:
                log.debug(f'Datafile path: {path}')
                log.debug(f'Datafile exists: {exists}')

            if exists:
                self.datafile.load(_first=True)
            elif path:
                self.datafile.save()

            hooks.apply(self, self.datafile)

            log.debug(f'Initialized {self.__class__} object')

    @classproperty
    def datafiles(cls) -> Manager:  # pylint: disable=no-self-argument
        return Manager(cls)


def create_model(
    cls,
    *,
    attrs=None,
    pattern=None,
    manual=None,
    defaults=None,
    auto_load=None,
    auto_save=None,
    auto_attr=None,
):
    """Patch datafile attributes onto an existing dataclass."""
    log.debug(f'Converting {cls} to a datafile model')

    if not dataclasses.is_dataclass(cls):
        raise ValueError(f'{cls} must be a dataclass')

    # Patch Meta

    m = getattr(cls, 'Meta', ModelMeta())

    if attrs is not None:
        m.datafile_attrs = attrs
    if pattern is not None:
        m.datafile_pattern = pattern

    if not hasattr(cls, 'Meta') and manual is not None:
        m.datafile_manual = manual
    if not hasattr(cls, 'Meta') and defaults is not None:
        m.datafile_defaults = defaults
    if not hasattr(cls, 'Meta') and auto_load is not None:
        m.datafile_auto_load = auto_load
    if not hasattr(cls, 'Meta') and auto_save is not None:
        m.datafile_auto_save = auto_save
    if not hasattr(cls, 'Meta') and auto_attr is not None:
        m.datafile_auto_attr = auto_attr

    cls.Meta = m

    # Patch __init__

    init = cls.__init__

    def modified_init(self, *args, **kwargs):
        with hooks.disabled():
            init(self, *args, **kwargs)
        Model.__post_init__(self)

    cls.__init__ = modified_init
    cls.__init__.__doc__ = init.__doc__

    return cls


datafiles/settings.py

HOOKS_ENABLED = True
HIDE_TRACEBACK_IN_HOOKS = True


datafiles/tests/__init__.py

"""Unit tests for the package."""


datafiles/tests/test_builders.py

# pylint: disable=unused-variable

from datafiles.builders import build_datafile


def describe_build_datafile():
    def it_reuses_existing_datafile(mocker, expect):
        obj = mocker.Mock()
        datafile = mocker.Mock()
        obj.datafile = datafile

        new_datafile = build_datafile(obj)

        expect(new_datafile) == obj.datafile
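For models with no declared schema, `auto()` (from decorators.py above) combines `auto_attr` inference with an existing file. A sketch assuming a `sample.yml` sitting in the working directory:

from datafiles import auto

# Given a sample.yml containing:
#
#   name: widget
#   count: 7
#
sample = auto('sample.yml')  # attribute names and types are inferred on load
print(sample.count)          # -> 7
sample.count += 1            # the change is written back to sample.yml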
datafiles/tests/test_converters.py

# pylint: disable=unused-variable

from dataclasses import dataclass
from typing import ByteString, Dict, List, Optional

import pytest
from ruamel.yaml.scalarstring import LiteralScalarString

from datafiles import converters


@dataclass
class MyDataclass:
    foobar: int
    flag: bool = False


class MyNonDataclass:
    pass


class MyNonDataclass2:
    pass


IntegerList = converters.List.subclass(converters.Integer)
StringList = converters.List.subclass(converters.String)
MyDict = converters.Dictionary.subclass(converters.String, converters.Integer)
MyDataclassConverter = converters.map_type(MyDataclass)
MyDataclassConverterList = converters.map_type(List[MyDataclass])


def describe_map_type():
    def it_handles_extended_types(expect):
        converter = converters.map_type(converters.Number)
        expect(converter.__name__) == 'Number'

    def it_handles_list_annotations(expect):
        converter = converters.map_type(List[str])
        expect(converter.__name__) == 'StringList'
        expect(converter.CONVERTER) == converters.String

    def it_handles_list_annotations_of_dataclasses(expect):
        converter = converters.map_type(List[MyDataclass])
        expect(converter.__name__) == 'MyDataclassConverterList'
        expect(converter.CONVERTER.__name__) == 'MyDataclassConverter'

    def it_requires_list_annotations_to_have_a_type(expect):
        with expect.raises(TypeError, "Type is required with 'List' annotation"):
            converters.map_type(List)

    def it_handles_dict_annotations(expect):
        converter = converters.map_type(Dict[str, int])
        expect(converter.__name__) == 'StringIntegerDict'

    def it_handles_dataclasses(expect):
        converter = converters.map_type(MyDataclass)
        expect(converter.__name__) == 'MyDataclassConverter'
        expect(converter.CONVERTERS) == {
            'foobar': converters.Integer,
            'flag': converters.Boolean,
        }

    def it_handles_optionals(expect):
        converter = converters.map_type(Optional[str])
        expect(converter.__name__) == 'OptionalString'
        expect(converter.TYPE) == str
        expect(converter.DEFAULT) == None

    def it_handles_string_type_annotations(expect):
        converter = converters.map_type('float')
        expect(converter.TYPE) == float

    def it_rejects_unknown_types(expect):
        with expect.raises(
            TypeError,
            "Could not map type: ",
        ):
            converters.map_type(MyNonDataclass)

    def it_rejects_non_types(expect):
        with expect.raises(TypeError, "Annotation is not a type: 'foobar'"):
            converters.map_type("foobar")

    def it_rejects_unhandled_type_annotations(expect):
        with expect.raises(
            TypeError,
            "Unsupported container type: ",
        ):
            converters.map_type(ByteString)


def describe_converter():
    def describe_to_python_value():
        @pytest.mark.parametrize(
            'converter, data, value',
            [
                (converters.Boolean, '1', True),
                (converters.Boolean, '0', False),
                (converters.Boolean, 'enabled', True),
                (converters.Boolean, 'disabled', False),
                (converters.Boolean, 'T', True),
                (converters.Boolean, 'F', False),
                (converters.Boolean, 'true', True),
                (converters.Boolean, 'false', False),
                (converters.Boolean, 'Y', True),
                (converters.Boolean, 'N', False),
                (converters.Boolean, 'yes', True),
                (converters.Boolean, 'no', False),
                (converters.Boolean, 'on', True),
                (converters.Boolean, 'off', False),
                (converters.Boolean, 0, False),
                (converters.Boolean, 1, True),
                (converters.Float, 4, 4.0),
                (converters.Integer, 4.2, 4),
                (converters.String, 4.2, '4.2'),
                (converters.String, 42, '42'),
                (converters.String, True, 'True'),
                (converters.String, False, 'False'),
            ],
        )
        def when_immutable(expect, converter, data, value):
            expect(converter.to_python_value(data)) == value

        @pytest.mark.parametrize(
            'converter, data, value',
            [
                (IntegerList, [], []),
                (IntegerList, '1, 2.3', [1, 2]),
                (IntegerList, '42', [42]),
                (IntegerList, 42, [42]),
                (IntegerList, None, []),
                (IntegerList, [42], [42]),
                (IntegerList, [None], []),
                (IntegerList, [None, None], []),
                (MyDict, None, {}),
                (MyDict, {}, {}),
                (MyDict, {'a': 1}, {'a': 1}),
                (MyDataclassConverter, None, MyDataclass(foobar=0)),
                (MyDataclassConverterList, None, []),
                (MyDataclassConverterList, 42, [MyDataclass(foobar=0)]),
            ],
        )
        def when_mutable(expect, converter, data, value):
            expect(converter.to_python_value(data, target_object=None)) == value

        def when_number(expect):
            convert = converters.Number.to_python_value
            expect(convert(1.23)).isinstance(float)
            expect(convert(42)).isinstance(int)

        def when_text(expect):
            convert = converters.Text.to_python_value
            expect(convert("")) == ""
            expect(convert("Hello, world!")) == "Hello, world!"
            expect(convert("Line 1\nLine 2\n")) == "Line 1\nLine 2\n"

        def when_invalid(expect):
            message = "invalid literal for int() with base 10: 'a'"
            with expect.raises(ValueError, message):
                converters.Integer.to_python_value('a')

        def when_list_of_dataclasses(expect):
            converter = converters.map_type(List[MyDataclass])
            data = [{'foobar': 1}, {'foobar': 2}, {'foobar': 3}]
            value = [MyDataclass(1), MyDataclass(2), MyDataclass(3)]
            expect(converter.to_python_value(data, target_object=None)) == value

        def with_existing_list(expect):
            original = [1, 2]
            value = IntegerList.to_python_value("3, 4", target_object=original)
            expect(value) == [3, 4]
            expect(id(value)) == id(original)

        def when_existing_dict(expect):
            original = {'a': 1}
            value = MyDict.to_python_value({'b': 2}, target_object=original)
            expect(value) == {'b': 2}
            expect(id(value)) == id(original)

        def with_existing_dataclass(expect):
            original = MyDataclass(foobar=1)
            value = MyDataclassConverter.to_python_value(
                {'foobar': 2}, target_object=original
            )
            expect(value) == MyDataclass(foobar=2)
            expect(id(value)) == id(original)

    def describe_to_preserialization_data():
        @pytest.mark.parametrize(
            'converter, value, data',
            [
                # Builtins
                (converters.Boolean, None, False),
                (converters.Float, None, 0.0),
                (converters.Integer, None, 0),
                (converters.String, None, ''),
                # Lists
                (StringList, 'ab', ['ab']),
                (StringList, ('b', 1, 'A'), ['b', '1', 'A']),
                (StringList, {'b', 1, 'A'}, ['1', 'A', 'b']),
                (StringList, 42, ['42']),
                (StringList, [123, True, False], ['123', 'True', 'False']),
                (StringList, [], [None]),
                (StringList, None, [None]),
                # Dataclasses
                (MyDataclassConverter, None, {'foobar': 0, 'flag': False}),
                (MyDataclassConverter, {'foobar': 42}, {'foobar': 42, 'flag': False}),
                (MyDataclassConverterList, None, [None]),
                (MyDataclassConverterList, 42, [{'foobar': 0, 'flag': False}]),
            ],
        )
        def when_nominal(expect, converter, value, data):
            expect(converter.to_preserialization_data(value)) == data

        def when_number(expect):
            convert = converters.Number.to_preserialization_data
            expect(convert(1.23)).isinstance(float)
            expect(convert(42)).isinstance(int)

        def when_text(expect):
            convert = converters.Text.to_preserialization_data
            expect(convert("")) == ""
            expect(convert("Hello, world!")) == "Hello, world!"
            expect(convert("Line 1\nLine 2")) == "Line 1\nLine 2\n"
            expect(convert("Line 1\nLine 2")).isinstance(LiteralScalarString)

        def when_invalid(expect):
            message = "invalid literal for int() with base 10: 'a'"
            with expect.raises(ValueError, message):
                converters.Integer.to_preserialization_data('a')

        def when_list_of_dataclasses(expect):
            converter = converters.map_type(List[MyDataclass])
            value = [MyDataclass(1), MyDataclass(2), MyDataclass(3)]
            data = [
                {'foobar': 1, 'flag': False},
                {'foobar': 2, 'flag': False},
                {'foobar': 3, 'flag': False},
            ]
            expect(converter.to_preserialization_data(value)) == data
            expect(converter.to_preserialization_data(data)) == data

        def when_list_with_default(expect):
            data = IntegerList.to_preserialization_data([1], default_to_skip=[1])
            expect(data) == [None]

            data = IntegerList.to_preserialization_data([2], default_to_skip=[1])
            expect(data) == [2]

        def when_dict_with_default(expect):
            data = MyDict.to_preserialization_data({'a': 1}, default_to_skip={'a': 1})
            expect(data) == {}

            data = MyDict.to_preserialization_data({'b': 2}, default_to_skip={'a': 1})
            expect(data) == {'b': 2}

        def when_dataclass_with_default(expect):
            data = MyDataclassConverter.to_preserialization_data(
                MyDataclass(1), default_to_skip=MyDataclass(1)
            )
            expect(data) == {}

            data = MyDataclassConverter.to_preserialization_data(
                MyDataclass(2), default_to_skip=MyDataclass(1)
            )
            expect(data) == {'foobar': 2}

            data = MyDataclassConverter.to_preserialization_data(
                MyDataclass(1, flag=True), default_to_skip=MyDataclass(1)
            )
            expect(data) == {'flag': True}


def describe_register():
    def with_new_type(expect):
        converters.register(MyNonDataclass2, converters.String)

        converter = converters.map_type(MyNonDataclass2)

        expect(converter) == converters.String


datafiles/tests/test_decorators.py

# pylint: disable=unused-variable

from dataclasses import dataclass, is_dataclass

from datafiles import decorators


def describe_datafile():
    def it_turns_normal_class_into_dataclass(expect):
        class Normal:
            pass

        cls = decorators.datafile("")(Normal)

        expect(is_dataclass(cls)) == True

    def it_can_reuse_existing_dataclass(expect):
        @dataclass
        class Existing:
            pass

        cls = decorators.datafile("")(Existing)

        expect(id(cls)) == id(Existing)


datafiles/tests/test_formats.py

# pylint: disable=unused-variable

import pytest

from datafiles import formats


def describe_deserialize():
    @pytest.fixture
    def path(tmp_path):
        path = tmp_path / "sample"
        path.write_text("")
        return path

    def with_empty_yaml_file(expect, path):
        data = formats.deserialize(path, '.yaml')
        expect(data) == {}

    def with_empty_json_file(expect, path):
        path.write_text("{}")
        data = formats.deserialize(path, '.json')
        expect(data) == {}

    def with_empty_toml_file(expect, path):
        data = formats.deserialize(path, '.toml')
        expect(data) == {}

    def with_unknown_extension(expect, path):
        with expect.raises(ValueError):
            formats.deserialize(path, '.xyz')


datafiles/tests/test_hooks.py

# pylint: disable=unused-variable

from datafiles import hooks, settings


class Sample:
    foobar = 1


def describe_apply():
    def it_can_be_called_twice(mocker):
        instance = Sample()
        setattr(instance, 'datafile', mocker.Mock())

        hooks.apply(instance, None)
        hooks.apply(instance, None)


def describe_disabled():
    def when_nested(expect):
        expect(settings.HOOKS_ENABLED) == True

        with hooks.disabled():
            expect(settings.HOOKS_ENABLED) == False

            with hooks.disabled():
                expect(settings.HOOKS_ENABLED) == False

            expect(settings.HOOKS_ENABLED) == False

        expect(settings.HOOKS_ENABLED) == True
datafiles/tests/test_managers.py

# pylint: disable=unused-variable,protected-access

from dataclasses import dataclass
from pathlib import Path

import pytest

from datafiles import managers
from datafiles.models import ModelMeta


@dataclass
class MyModel:
    foobar: int


class MyField:
    @classmethod
    def to_preserialization_data(cls, python_value):
        return python_value


def describe_instance_manager():
    @pytest.fixture
    def manager():
        return managers.Datafile(
            instance=MyModel(foobar=42),
            attrs={},
            pattern=None,
            manual=ModelMeta.datafile_manual,
            defaults=ModelMeta.datafile_defaults,
            auto_load=ModelMeta.datafile_auto_load,
            auto_save=ModelMeta.datafile_auto_save,
            auto_attr=ModelMeta.datafile_auto_attr,
        )

    def describe_path():
        def is_none_when_no_pattern(expect, manager):
            expect(manager.path) == None

        def is_absolute_based_on_the_file(expect, manager):
            manager._pattern = '../../tmp/sample.yml'
            root = Path(__file__).parents[2]
            expect(manager.path) == root / 'tmp' / 'sample.yml'

    def describe_relpath():
        def when_cwd_is_parent(expect, manager):
            manager._pattern = '../../tmp/sample.yml'
            expect(manager.relpath) == Path('tmp', 'sample.yml')

        def when_cwd_is_sibling(expect, manager):
            manager._pattern = '../../../tmp/sample.yml'
            expect(manager.relpath) == Path('..', 'tmp', 'sample.yml')

    def describe_text():
        def is_blank_when_no_attrs(expect, manager):
            expect(manager.text) == ""

        def is_yaml_by_default(expect, manager):
            manager.attrs = {'foobar': MyField}
            expect(manager.text) == "foobar: 42\n"

        def with_json_format(expect, manager):
            manager._pattern = '_.json'
            manager.attrs = {'foobar': MyField}
            expect(manager.text) == '{\n  "foobar": 42\n}'

        def with_toml_format(expect, manager):
            manager._pattern = '_.toml'
            manager.attrs = {'foobar': MyField}
            expect(manager.text) == "foobar = 42\n"

        def with_no_format(expect, manager):
            manager._pattern = '_'
            manager.attrs = {'foobar': MyField}
            expect(manager.text) == "foobar: 42\n"

        def with_unknown_format(expect, manager):
            manager._pattern = '_.xyz'
            manager.attrs = {'foobar': MyField}
            with expect.raises(ValueError):
                print(manager.text)

    def describe_load():
        def it_requires_path(expect, manager):
            with expect.raises(RuntimeError):
                manager.load()

    def describe_save():
        def it_requires_path(expect, manager):
            with expect.raises(RuntimeError):
                manager.save()


datafiles/tests/test_models.py

# pylint: disable=unused-variable

from datafiles.models import create_model


def describe_create_model():
    def it_requires_dataclass(expect):
        class NonDataclass:
            pass

        with expect.raises(ValueError):
            create_model(NonDataclass)


datafiles/tests/test_utils.py

# pylint: disable=unused-variable

from datafiles.utils import recursive_update


def describe_recursive_update():
    def it_preserves_root_id(expect):
        old = {}  # type: ignore
        new = {'a': 1}
        id_ = id(old)

        old = recursive_update(old, new)

        expect(old) == new
        expect(id(old)) == id_

    def it_preserves_nested_dict_id(expect):
        old = {'a': {'b': 1}}
        new = {'a': {'b': 2}}
        id_ = id(old['a'])

        old = recursive_update(old, new)

        expect(old) == new
        expect(id(old['a'])) == id_

    def it_preserves_nested_list_id(expect):
        old = {'a': [1]}
        new = {'a': [2]}
        id_ = id(old['a'])

        old = recursive_update(old, new)

        expect(old) == new
        expect(id(old['a'])) == id_

    def it_adds_missing_dict(expect):
        old = {}  # type: ignore
        new = {'a': {'b': 2}}

        old = recursive_update(old, new)

        expect(old) == new

    def it_adds_missing_list(expect):
        old = {}  # type: ignore
        new = {'a': [1]}

        old = recursive_update(old, new)

        expect(old) == new
datafiles/utils.py

from contextlib import suppress
from functools import lru_cache
from pprint import pformat
from typing import Any, Dict

cached = lru_cache()


def prettify(data: Dict) -> str:
    return pformat(dictify(data))


def dictify(value: Any) -> Dict:
    with suppress(AttributeError):
        return {k: dictify(v) for k, v in value.items()}
    if isinstance(value, str):
        return value
    with suppress(TypeError):
        return [dictify(x) for x in value]
    return value


def recursive_update(old: Dict, new: Dict):
    """Recursively update a dictionary."""
    for key, value in new.items():
        if isinstance(value, dict):
            if key in old:
                recursive_update(old[key], value)
            else:
                old[key] = value
        elif isinstance(value, list):
            if key in old:
                old[key][:] = value
            else:
                old[key] = value
        else:
            old[key] = value
    return old


datafiles-0.2.dist-info/LICENSE.md

**The MIT License (MIT)**

Copyright © 2018, Jace Browning

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.