PK!^[datafiles/__init__.py__version__ = '0.0.0' from dataclasses import field from . import converters from .decorators import datafile from .models import Model PK!H**datafiles/converters.pyimport dataclasses from abc import ABCMeta, abstractmethod from collections.abc import Iterable from typing import Any, Dict, Union import log from ruamel.yaml.scalarstring import LiteralScalarString from .utils import Missing, cached class Converter(metaclass=ABCMeta): """Base class for attribute conversion.""" TYPE: Any = None DEFAULT: Any = None @classmethod def as_optional(cls): name = 'Optional' + cls.__name__ return type(name, (cls,), {'DEFAULT': None}) @classmethod @abstractmethod def to_python_value(cls, deserialized_data, *, target): raise NotImplementedError @classmethod @abstractmethod def to_preserialization_data(cls, python_value, *, skip=Missing): raise NotImplementedError class Boolean(Converter): """Converter for `bool` literals.""" TYPE = bool DEFAULT = False _FALSY = {'false', 'f', 'no', 'n', 'disabled', 'off', '0'} # pylint: disable=unused-argument @classmethod def to_python_value(cls, deserialized_data, *, target=None): if isinstance(deserialized_data, str): value = deserialized_data.lower() not in cls._FALSY else: value = cls.TYPE(deserialized_data) return value @classmethod def to_preserialization_data(cls, python_value, *, skip=Missing): if python_value is None: return cls.DEFAULT return cls.TYPE(python_value) class Float(Converter): """Converter for `float` literals.""" TYPE = float DEFAULT = 0.0 # pylint: disable=unused-argument @classmethod def to_python_value(cls, deserialized_data, *, target=None): value = cls.to_preserialization_data(deserialized_data) return value @classmethod def to_preserialization_data(cls, python_value, *, skip=Missing): if python_value is None: return cls.DEFAULT return cls.TYPE(python_value) class Integer(Converter): """Converter for `int` literals.""" TYPE = int DEFAULT = 0 # pylint: disable=unused-argument @classmethod def to_python_value(cls, deserialized_data, *, target=None): value = cls.to_preserialization_data(deserialized_data) return value @classmethod def to_preserialization_data(cls, python_value, *, skip=Missing): if python_value is None: return cls.DEFAULT try: return cls.TYPE(python_value) except ValueError as exc: try: data = cls.TYPE(float(python_value)) except ValueError: raise exc from None else: msg = f'Precision lost in conversion to int: {python_value}' log.warn(msg) return data class Number(Float): """Converter for integers or floats.""" DEFAULT = 0 # pylint: disable=unused-argument @classmethod def to_preserialization_data(cls, python_value, *, skip=Missing): data = super().to_preserialization_data(python_value) if int(data) == data: return int(data) return data class String(Converter): """Converter for `str` literals.""" TYPE = str DEFAULT = "" # pylint: disable=unused-argument @classmethod def to_python_value(cls, deserialized_data, *, target=None): value = cls.to_preserialization_data(deserialized_data) return value @classmethod def to_preserialization_data(cls, python_value, *, skip=Missing): if python_value is None: return cls.DEFAULT return cls.TYPE(python_value) class Text(String): """Converter for multiline strings.""" DEFAULT = "" # pylint: disable=unused-argument @classmethod def to_python_value(cls, deserialized_data, *, target=None): value = cls.to_preserialization_data(deserialized_data).strip() if '\n' in value: value = value + '\n' return value @classmethod def to_preserialization_data(cls, python_value, *, skip=Missing): data = super().to_preserialization_data(python_value).strip() if '\n' in data: return LiteralScalarString(data + '\n') return data class List(Converter): """Base converter for homogeneous lists of another converter.""" CONVERTER = None @classmethod def subclass(cls, converter: type): name = f'{converter.__name__}List' # type: ignore bases = (cls,) attributes = {'CONVERTER': converter} return type(name, bases, attributes) @classmethod def to_python_value(cls, deserialized_data, *, target): if target is None: value = [] # type: ignore else: value = target value.clear() convert = cls.CONVERTER.to_python_value if deserialized_data is None: pass elif isinstance(deserialized_data, Iterable) and all( (item is None for item in deserialized_data) ): pass elif isinstance(deserialized_data, str): for item in deserialized_data.split(','): value.append(convert(item)) else: try: items = iter(deserialized_data) except TypeError: value.append(convert(deserialized_data, target=None)) else: for item in items: value.append(convert(item, target=None)) return value @classmethod def to_preserialization_data(cls, python_value, *, skip=Missing): data = [] convert = cls.CONVERTER.to_preserialization_data if python_value is None: pass elif isinstance(python_value, Iterable): if isinstance(python_value, str): data.append(convert(python_value, skip=Missing)) elif isinstance(python_value, set): data.extend( sorted(convert(item, skip=Missing) for item in python_value) ) else: for item in python_value: data.append(convert(item, skip=Missing)) else: data.append(convert(python_value, skip=Missing)) if data == skip: data.clear() return data or [None] class Dictionary(Converter): """Base converter for raw dictionaries.""" @classmethod def subclass(cls, key: type, value: type): name = f'{key.__name__}{value.__name__}Dict' bases = (cls,) return type(name, bases, {}) @classmethod def to_python_value(cls, deserialized_data, *, target): if isinstance(deserialized_data, dict): data = deserialized_data.copy() else: data = {} if target is None: value = data else: value = target value.clear() value.update(data) return value @classmethod def to_preserialization_data(cls, python_value, *, skip=Missing): data = dict(python_value) if data == skip: data.clear() return data class Object(Converter): """Base converter for dataclasses.""" DATACLASS = None CONVERTERS = None @classmethod def subclass(cls, dataclass, converters: Dict[str, type]): name = f'{dataclass.__name__}Converter' bases = (cls,) attributes = {'DATACLASS': dataclass, 'CONVERTERS': converters} return type(name, bases, attributes) @classmethod def to_python_value(cls, deserialized_data, *, target): if isinstance(deserialized_data, dict): data = deserialized_data.copy() else: data = {} for name, converter in cls.CONVERTERS.items(): if name not in data: data[name] = converter.to_python_value(None) new_value = cls.DATACLASS(**data) # pylint: disable=not-callable if target is None: value = new_value else: value = target value.__dict__ = new_value.__dict__ return value @classmethod def to_preserialization_data(cls, python_value, *, skip=Missing): data = {} for name, converter in cls.CONVERTERS.items(): if isinstance(python_value, dict): try: value = python_value[name] except KeyError as e: log.debug(e) value = None else: try: value = getattr(python_value, name) except AttributeError as e: log.debug(e) value = None if skip is not Missing: if value == getattr(skip, name): log.debug(f"Skipped default value for '{name}' attribute") continue data[name] = converter.to_preserialization_data(value) return data @cached def map_type(cls): """Infer the converter type from a dataclass, type, or annotation.""" log.debug(f'Mapping {cls} to converter') if dataclasses.is_dataclass(cls): converters = {} for field in dataclasses.fields(cls): converters[field.name] = map_type(field.type) converter = Object.subclass(cls, converters) log.debug(f'Mapped {cls} to new converter: {converter}') return converter if hasattr(cls, '__origin__'): converter = None if cls.__origin__ == list: try: converter = map_type(cls.__args__[0]) except TypeError as exc: log.debug(exc) exc = TypeError(f"Type is required with 'List' annotation") raise exc from None else: converter = List.subclass(converter) if cls.__origin__ == dict: log.warn("Schema enforcement not possible with 'Dict' annotation") key = map_type(cls.__args__[0]) value = map_type(cls.__args__[1]) converter = Dictionary.subclass(key, value) elif cls.__origin__ == Union: converter = map_type(cls.__args__[0]) assert len(cls.__args__) == 2 assert cls.__args__[1] == type(None) converter = converter.as_optional() if converter: log.debug(f'Mapped {cls} to new converter: {converter}') return converter raise TypeError(f'Unsupported container type: {cls.__origin__}') else: for converter in Converter.__subclasses__(): if converter.TYPE == cls: log.debug(f'Mapped {cls} to existing converter: {converter}') return converter if issubclass(cls, Converter): log.debug(f'Mapped {cls} to existing converter (itself)') return cls raise TypeError(f'Could not map type: {cls}') PK!=tttdatafiles/decorators.pyimport dataclasses from typing import Dict, Optional from .converters import Converter from .models import create_model def datafile( pattern: str, attrs: Optional[Dict[str, Converter]] = None, manual: bool = False, defaults: bool = False, ): """Synchronize a data class to the specified path.""" def decorator(cls): if dataclasses.is_dataclass(cls): dataclass = cls else: dataclass = dataclasses.dataclass(cls) return create_model( dataclass, attrs=attrs, pattern=pattern, manual=manual, defaults=defaults ) return decorator PK!Y  datafiles/formats.pyimport json from abc import ABCMeta, abstractmethod from pathlib import Path from typing import IO, Any, Dict, List import tomlkit from ruamel import yaml class Formatter(metaclass=ABCMeta): """Base class for object serialization and text deserialization.""" @classmethod @abstractmethod def extensions(cls) -> List[str]: raise NotImplementedError @classmethod @abstractmethod def deserialize(cls, file_object: IO[Any]) -> Dict: raise NotImplementedError @classmethod @abstractmethod def serialize(cls, data: Dict) -> str: raise NotImplementedError class JSON(Formatter): """Formatter for JavaScript Object Notation.""" @classmethod def extensions(cls): return {'.json'} @classmethod def deserialize(cls, file_object): return json.load(file_object) or {} @classmethod def serialize(cls, data): return json.dumps(data, indent=2) class TOML(Formatter): """Formatter for (round-trip) Tom's Obvious Minimal Language.""" @classmethod def extensions(cls): return {'.toml'} @classmethod def deserialize(cls, file_object): return tomlkit.loads(file_object.read()) or {} @classmethod def serialize(cls, data): return tomlkit.dumps(data) class YAML(Formatter): """Formatter for (safe, round-trip) YAML Ain't Markup Language.""" @classmethod def extensions(cls): return {'.yml', '.yaml'} @classmethod def deserialize(cls, file_object): return yaml.YAML(typ='rt').load(file_object) or {} @classmethod def serialize(cls, data): text = yaml.round_trip_dump(data) return "" if text == "{}\n" else text def deserialize(path: Path, extension: str) -> Dict: for formatter in Formatter.__subclasses__(): if extension in formatter.extensions(): with path.open('r') as file_object: return formatter.deserialize(file_object) raise ValueError(f'Unsupported file extension: {extension}') def serialize(data: Dict, extension: str = '.yml') -> str: for formatter in Formatter.__subclasses__(): if extension in formatter.extensions(): return formatter.serialize(data) raise ValueError(f'Unsupported file extension: {extension!r}') PK!,datafiles/hooks.pyimport dataclasses from contextlib import contextmanager, suppress from functools import wraps import log from . import settings LOAD_BEFORE_METHODS = ['__getattribute__', '__getitem__', '__iter__'] SAVE_AFTER_METHODS = [ '__setattr__', '__setitem__', '__delitem__', 'append', 'extend', 'insert', 'remove', 'pop', 'clear', 'sort', 'reverse', 'popitem', 'update', ] FLAG = '_patched' class List(list): """Patchable `list` type.""" class Dict(dict): """Patchable `dict` type.""" def apply(instance, datafile, build_datafile): """Path methods that get or set attributes.""" cls = instance.__class__ log.debug(f'Patching methods on {cls}') for method_name in LOAD_BEFORE_METHODS: with suppress(AttributeError): method = getattr(cls, method_name) modified_method = load_before(cls, method) setattr(cls, method_name, modified_method) for method_name in SAVE_AFTER_METHODS: with suppress(AttributeError): method = getattr(cls, method_name) modified_method = save_after(cls, method) setattr(cls, method_name, modified_method) if dataclasses.is_dataclass(instance): for attr_name in instance.datafile.attrs: attr = getattr(instance, attr_name) if not dataclasses.is_dataclass(attr): # pylint: disable=unidiomatic-typecheck if type(attr) == list: attr = List(attr) setattr(instance, attr_name, attr) elif type(attr) == dict: attr = Dict(attr) setattr(instance, attr_name, attr) else: continue attr.datafile = build_datafile(attr, root=datafile) apply(attr, datafile, build_datafile) def load_before(cls, method): """Decorate methods that should load before call.""" if hasattr(method, FLAG): return method @wraps(method) def wrapped(self, *args, **kwargs): __tracebackhide__ = settings.HIDE_TRACEBACK_IN_HOOKS datafile = get_datafile(self) if enabled(datafile, args): if datafile.exists and datafile.modified: log.debug(f"Loading automatically before '{method.__name__}' call") datafile.load() # TODO: Implement this? # if mapper.auto_save_after_load: # mapper.save() # mapper.modified = False return method(self, *args, **kwargs) log.debug(f'Patched method to load before call: {cls.__name__}.{method.__name__}') setattr(wrapped, FLAG, True) return wrapped def save_after(cls, method): """Decorate methods that should save after call.""" if hasattr(method, FLAG): return method @wraps(method) def wrapped(self, *args, **kwargs): __tracebackhide__ = settings.HIDE_TRACEBACK_IN_HOOKS datafile = get_datafile(self) if enabled(datafile, args): if datafile.exists and datafile.modified: log.debug(f"Loading automatically before '{method.__name__}' call") datafile.load() result = method(self, *args, **kwargs) if enabled(datafile, args): log.debug(f"Saving automatically after '{method.__name__}' call") datafile.save() return result log.debug(f'Patched method to save after call: {cls.__name__}.{method.__name__}') setattr(wrapped, FLAG, True) return wrapped def get_datafile(obj): try: return object.__getattribute__(obj, 'datafile') except AttributeError: return None def enabled(datafile, args) -> bool: """Determine if hooks are enabled for the current method.""" if not settings.HOOKS_ENABLED: return False if datafile is None: return False if datafile.manual: return False # TODO: Investigate performance impact of removing this code if args and isinstance(args[0], str): if args[0] in {'Meta', 'datafile'}: return False if args[0].startswith('_'): return False return True @contextmanager def disabled(): """Globally disable method hooks, temporarily.""" if settings.HOOKS_ENABLED: settings.HOOKS_ENABLED = False yield settings.HOOKS_ENABLED = True else: yield PK!W%W%datafiles/managers.pyfrom __future__ import annotations import dataclasses import inspect import os from pathlib import Path from typing import Any, Dict, Optional import log from cached_property import cached_property from . import formats, hooks from .converters import List from .utils import Missing, prettify Trilean = Optional[bool] class ModelManager: def __init__(self, cls): self.model = cls def all(self): raise NotImplementedError class InstanceManager: def __init__( self, instance: Any, *, attrs: Dict, pattern: Optional[str], manual: bool, defaults: bool, root: Optional[InstanceManager] = None, ) -> None: assert manual is not None assert defaults is not None self._instance = instance self.attrs = attrs self._pattern = pattern self._manual = manual self.defaults = defaults self._last_load = 0.0 self._last_data: Dict = {} self._root = root @property def classname(self) -> str: return self._instance.__class__.__name__ @cached_property def path(self) -> Optional[Path]: if not self._pattern: return None cls = self._instance.__class__ try: root = Path(inspect.getfile(cls)).parent except TypeError: # pragma: no cover level = log.DEBUG if '__main__' in str(cls) else log.WARNING log.log(level, f'Unable to determine module for {cls}') root = Path.cwd() relpath = self._pattern.format(self=self._instance) return (root / relpath).resolve() @property def relpath(self) -> Path: return Path(os.path.relpath(self.path, Path.cwd())) @property def exists(self) -> bool: if self.path: return self.path.exists() return False @property def modified(self) -> bool: if self.path: return self._last_load != self.path.stat().st_mtime return True @modified.setter def modified(self, modified: bool): if modified: self._last_load = 0.0 else: assert self.path, 'Cannot mark a missing file as unmodified' self._last_load = self.path.stat().st_mtime @property def manual(self) -> bool: return self._root.manual if self._root else self._manual @property def data(self) -> Dict: return self._get_data() def _get_data(self, include_default_values: Trilean = None) -> Dict: log.debug(f'Preserializing object to data: {self._instance!r}') if include_default_values is None: include_default_values = self.defaults self._last_data.update(dataclasses.asdict(self._instance)) data = self._last_data for name in list(data.keys()): if name not in self.attrs: log.debug(f'Removed unmapped attribute: {name}') data.pop(name) for name, converter in self.attrs.items(): value = data[name] if getattr(converter, 'DATACLASS', None): log.debug(f"Converting '{name}' dataclass with {converter}") if value is None: value = {} for field in dataclasses.fields(converter.DATACLASS): if field.name not in value: log.debug(f'Added missing nested attribute: {field.name}') value[field.name] = None data[name] = converter.to_preserialization_data( value, skip=Missing if include_default_values else self._get_default_field_value(name), ) elif ( value == self._get_default_field_value(name) and not include_default_values ): log.debug(f"Skipped default value for '{name}' attribute") data.pop(name) else: log.debug(f"Converting '{name}' value with {converter}: {value!r}") data[name] = converter.to_preserialization_data(value) log.debug(f'Preserialized object data: {data}') return data @property def text(self) -> str: return self._get_text() def _get_text(self, **kwargs) -> str: data = self._get_data(**kwargs) if self.path and self.path.suffix: return formats.serialize(data, self.path.suffix) return formats.serialize(data) def load(self, *, first_load=False) -> None: if self._root: self._root.load(first_load=first_load) return if self.path: log.info(f"Loading '{self.classname}' object from '{self.relpath}'") else: raise RuntimeError("'pattern' must be set to load the model") message = f'Reading data from file: {self.path}' log.debug(message) data = formats.deserialize(self.path, self.path.suffix) self._last_data = data log.debug('=' * len(message) + '\n\n' + prettify(data) + '\n') with hooks.disabled(): for name, converter in self.attrs.items(): log.debug(f"Converting '{name}' data with {converter}") if getattr(converter, 'DATACLASS', None): self._set_dataclass_value(data, name, converter) else: self._set_attribute_value(data, name, converter, first_load) self.modified = False def _set_dataclass_value(self, data, name, converter): # TODO: Support nesting unlimited levels # https://github.com/jacebrowning/datafiles/issues/22 nested_data = data.get(name) if nested_data is None: return log.debug(f'Converting nested data to Python: {nested_data}') dataclass = getattr(self._instance, name) if dataclass is None: for field in dataclasses.fields(converter.DATACLASS): if field.name not in nested_data: # type: ignore nested_data[field.name] = None # type: ignore dataclass = converter.to_python_value(nested_data, target=dataclass) # TODO: Find a way to avoid this circular import try: datafile = dataclass.datafile except AttributeError: from .models import build_datafile log.warn(f"{dataclass} has not yet been patched to have 'datafile'") datafile = build_datafile(dataclass) for name2, converter2 in datafile.attrs.items(): _value = nested_data.get( # type: ignore # pylint: disable=protected-access name2, datafile._get_default_field_value(name2), ) value = converter2.to_python_value(_value, target=getattr(dataclass, name2)) log.debug(f"'{name2}' as Python: {value!r}") setattr(dataclass, name2, value) log.debug(f"Setting '{name}' value: {dataclass!r}") setattr(self._instance, name, dataclass) def _set_attribute_value(self, data, name, converter, first_load): file_value = data.get(name, Missing) init_value = getattr(self._instance, name) default_value = self._get_default_field_value(name) if first_load: log.debug( 'First load values: file=%r, init=%r, default=%r', file_value, init_value, default_value, ) if init_value != default_value and not issubclass(converter, List): log.debug(f"Keeping non-default '{name}' init value: {init_value!r}") return if file_value is Missing: if default_value is Missing: value = converter.to_python_value(None, target=init_value) else: value = converter.to_python_value(default_value, target=init_value) else: value = converter.to_python_value(file_value, target=init_value) log.debug(f"Setting '{name}' value: {value!r}") setattr(self._instance, name, value) def _get_default_field_value(self, name): for field in dataclasses.fields(self._instance): if field.name == name: if not isinstance(field.default, Missing): return field.default if not isinstance(field.default_factory, Missing): # type: ignore return field.default_factory() # type: ignore if not field.init and hasattr(self._instance, '__post_init__'): return getattr(self._instance, name) return Missing def save(self, include_default_values: Trilean = None) -> None: if self._root: self._root.save(include_default_values=include_default_values) return if self.path: log.info(f"Saving '{self.classname}' object to '{self.relpath}'") else: raise RuntimeError(f"'pattern' must be set to save the model") with hooks.disabled(): text = self._get_text(include_default_values=include_default_values) message = f'Writing file: {self.path}' log.debug(message) log.debug('=' * len(message) + '\n\n' + (text or '\n')) self.path.parent.mkdir(parents=True, exist_ok=True) self.path.write_text(text) self.modified = False PK!.:B datafiles/models.pyimport dataclasses from dataclasses import dataclass from typing import Dict, Optional import log from classproperties import classproperty from . import hooks from .converters import Converter, map_type from .managers import InstanceManager, ModelManager @dataclass class ModelMeta: datafile_attrs: Optional[Dict[str, Converter]] = None datafile_pattern: Optional[str] = None datafile_manual: bool = False datafile_defaults: bool = False class Model: Meta: ModelMeta = ModelMeta() def __post_init__(self): with hooks.disabled(): log.debug(f'Initializing {self.__class__} object') self.datafile = build_datafile(self) path = self.datafile.path exists = self.datafile.exists if path: log.debug(f'Datafile path: {path}') log.debug(f'Datafile exists: {exists}') if exists: self.datafile.load(first_load=True) elif path: self.datafile.save() hooks.apply(self, self.datafile, build_datafile) log.debug(f'Initialized {self.__class__} object') @classproperty def datafiles(cls) -> ModelManager: # pylint: disable=no-self-argument return ModelManager(cls) def build_datafile(obj, root=None) -> InstanceManager: try: return object.__getattribute__(obj, 'datafile') except AttributeError: log.debug(f"Building 'datafile' for {obj.__class__} object") m = getattr(obj, 'Meta', None) pattern = getattr(m, 'datafile_pattern', None) attrs = getattr(m, 'datafile_attrs', None) manual = getattr(m, 'datafile_manual', False) defaults = getattr(m, 'datafile_defaults', False) if attrs is None and dataclasses.is_dataclass(obj): attrs = {} log.debug(f'Mapping attributes for {obj.__class__} object') for field in dataclasses.fields(obj): self_name = f'self.{field.name}' if pattern is None or self_name not in pattern: attrs[field.name] = map_type(field.type) return InstanceManager( obj, attrs=attrs, pattern=pattern, manual=manual, defaults=defaults, root=root ) def create_model(cls, *, attrs=None, pattern=None, manual=None, defaults=None): """Patch datafile attributes on to an existing dataclass.""" log.debug(f'Converting {cls} to a datafile model') if not dataclasses.is_dataclass(cls): raise ValueError(f'{cls} must be a dataclass') # Patch Meta m = getattr(cls, 'Meta', ModelMeta()) if attrs is not None: m.datafile_attrs = attrs if pattern is not None: m.datafile_pattern = pattern if not hasattr(cls, 'Meta') and manual is not None: m.datafile_manual = manual if not hasattr(cls, 'Meta') and defaults is not None: m.datafile_defaults = defaults cls.Meta = m # Patch __init__ init = cls.__init__ def modified_init(self, *args, **kwargs): with hooks.disabled(): init(self, *args, **kwargs) Model.__post_init__(self) cls.__init__ = modified_init cls.__init__.__doc__ = init.__doc__ return cls PK!55datafiles/settings.pyHOOKS_ENABLED = True HIDE_TRACEBACK_IN_HOOKS = True PK!ޙPb""datafiles/tests/__init__.py"""Unit tests for the package.""" PK!Y:. & &"datafiles/tests/test_converters.py# pylint: disable=unused-variable from dataclasses import dataclass from typing import ByteString, Dict, List, Optional import pytest from ruamel.yaml.scalarstring import LiteralScalarString from datafiles import converters @dataclass class MyDataclass: foobar: int flag: bool = False class MyNonDataclass: pass IntegerList = converters.List.subclass(converters.Integer) StringList = converters.List.subclass(converters.String) MyDict = converters.Dictionary.subclass(converters.String, converters.Integer) MyDataclassConverter = converters.map_type(MyDataclass) MyDataclassConverterList = converters.map_type(List[MyDataclass]) def describe_map_type(): def it_handles_extended_types(expect): converter = converters.map_type(converters.Number) expect(converter.__name__) == 'Number' def it_handles_list_annotations(expect): converter = converters.map_type(List[str]) expect(converter.__name__) == 'StringList' expect(converter.CONVERTER) == converters.String def it_handles_list_annotations_of_dataclasses(expect): converter = converters.map_type(List[MyDataclass]) expect(converter.__name__) == 'MyDataclassConverterList' expect(converter.CONVERTER.__name__) == 'MyDataclassConverter' def it_requires_list_annotations_to_have_a_type(expect): with expect.raises(TypeError): converters.map_type(List) def it_handles_dict_annotations(expect): converter = converters.map_type(Dict[str, int]) expect(converter.__name__) == 'StringIntegerDict' def it_handles_dataclasses(expect): converter = converters.map_type(MyDataclass) expect(converter.__name__) == 'MyDataclassConverter' expect(converter.CONVERTERS) == { 'foobar': converters.Integer, 'flag': converters.Boolean, } def it_handles_optionals(expect): converter = converters.map_type(Optional[str]) expect(converter.__name__) == 'OptionalString' expect(converter.TYPE) == str expect(converter.DEFAULT) == None def it_rejects_unknown_types(expect): with expect.raises(TypeError): converters.map_type(MyNonDataclass) def it_rejects_unhandled_type_annotations(expect): with expect.raises(TypeError): converters.map_type(ByteString) def describe_converter(): def describe_to_python_value(): @pytest.mark.parametrize( 'converter, data, value', [ # Literals (converters.Boolean, '1', True), (converters.Boolean, '0', False), (converters.Boolean, 'enabled', True), (converters.Boolean, 'disabled', False), (converters.Boolean, 'T', True), (converters.Boolean, 'F', False), (converters.Boolean, 'true', True), (converters.Boolean, 'false', False), (converters.Boolean, 'Y', True), (converters.Boolean, 'N', False), (converters.Boolean, 'yes', True), (converters.Boolean, 'no', False), (converters.Boolean, 'on', True), (converters.Boolean, 'off', False), (converters.Boolean, 0, False), (converters.Boolean, 1, True), (converters.Float, 4, 4.0), (converters.Integer, 4.2, 4), (converters.String, 4.2, '4.2'), (converters.String, 42, '42'), (converters.String, True, 'True'), (converters.String, False, 'False'), # Containers (IntegerList, [], []), (IntegerList, '1, 2.3', [1, 2]), (IntegerList, '42', [42]), (IntegerList, 42, [42]), (IntegerList, None, []), (IntegerList, [42], [42]), (IntegerList, [None], []), (IntegerList, [None, None], []), (MyDict, None, {}), (MyDict, {}, {}), (MyDict, {'a': 1}, {'a': 1}), # Dataclasses (MyDataclassConverter, None, MyDataclass(foobar=0)), (MyDataclassConverterList, None, []), (MyDataclassConverterList, 42, [MyDataclass(foobar=0)]), ], ) def when_nominal(expect, converter, data, value): expect(converter.to_python_value(data, target=None)) == value def when_number(expect): convert = converters.Number.to_python_value expect(convert(1.23)).isinstance(float) expect(convert(42)).isinstance(int) def when_text(expect): convert = converters.Text.to_python_value expect(convert("")) == "" expect(convert("Hello, world!")) == "Hello, world!" expect(convert("Line 1\nLine 2\n")) == "Line 1\nLine 2\n" def when_invalid(expect): message = "invalid literal for int() with base 10: 'a'" with expect.raises(ValueError, message): converters.Integer.to_python_value('a') def when_list_of_dataclasses(expect): converter = converters.map_type(List[MyDataclass]) data = [{'foobar': 1}, {'foobar': 2}, {'foobar': 3}] value = [MyDataclass(1), MyDataclass(2), MyDataclass(3)] expect(converter.to_python_value(data, target=None)) == value def with_existing_list(expect): orginal = [1, 2] value = IntegerList.to_python_value("3, 4", target=orginal) expect(value) == [3, 4] expect(id(value)) == id(orginal) def when_existing_dict(expect): orginal = {'a': 1} value = MyDict.to_python_value({'b': 2}, target=orginal) expect(value) == {'b': 2} expect(id(value)) == id(orginal) def with_existing_dataclass(expect): orginal = MyDataclass(foobar=1) value = MyDataclassConverter.to_python_value({'foobar': 2}, target=orginal) expect(value) == MyDataclass(foobar=2) expect(id(value)) == id(orginal) def describe_to_preserialization_data(): @pytest.mark.parametrize( 'converter, value, data', [ # Literals (converters.Boolean, None, False), (converters.Float, None, 0.0), (converters.Integer, None, 0), (converters.String, None, ''), # Containers (StringList, 'ab', ['ab']), (StringList, ('b', 1, 'A'), ['b', '1', 'A']), (StringList, {'b', 1, 'A'}, ['1', 'A', 'b']), (StringList, 42, ['42']), (StringList, [123, True, False], ['123', 'True', 'False']), (StringList, [], [None]), (StringList, None, [None]), # Dataclasses (MyDataclassConverter, None, {'foobar': 0, 'flag': False}), (MyDataclassConverterList, None, [None]), (MyDataclassConverterList, 42, [{'foobar': 0, 'flag': False}]), ], ) def when_nominal(expect, converter, value, data): expect(converter.to_preserialization_data(value)) == data def when_number(expect): convert = converters.Number.to_preserialization_data expect(convert(1.23)).isinstance(float) expect(convert(42)).isinstance(int) def when_text(expect): convert = converters.Text.to_preserialization_data expect(convert("")) == "" expect(convert("Hello, world!")) == "Hello, world!" expect(convert("Line 1\nLine 2")) == "Line 1\nLine 2\n" expect(convert("Line 1\nLine 2")).isinstance(LiteralScalarString) def when_invalid(expect): message = "invalid literal for int() with base 10: 'a'" with expect.raises(ValueError, message): converters.Integer.to_preserialization_data('a') def when_list_of_dataclasses(expect): converter = converters.map_type(List[MyDataclass]) value = [MyDataclass(1), MyDataclass(2), MyDataclass(3)] data = [ {'foobar': 1, 'flag': False}, {'foobar': 2, 'flag': False}, {'foobar': 3, 'flag': False}, ] expect(converter.to_preserialization_data(value)) == data expect(converter.to_preserialization_data(data)) == data def when_list_with_default(expect): data = IntegerList.to_preserialization_data([1], skip=[1]) expect(data) == [None] data = IntegerList.to_preserialization_data([2], skip=[1]) expect(data) == [2] def when_dict_with_default(expect): data = MyDict.to_preserialization_data({'a': 1}, skip={'a': 1}) expect(data) == {} data = MyDict.to_preserialization_data({'b': 2}, skip={'a': 1}) expect(data) == {'b': 2} def when_dataclass_with_default(expect): data = MyDataclassConverter.to_preserialization_data( MyDataclass(1), skip=MyDataclass(1) ) expect(data) == {} data = MyDataclassConverter.to_preserialization_data( MyDataclass(2), skip=MyDataclass(1) ) expect(data) == {'foobar': 2} data = MyDataclassConverter.to_preserialization_data( MyDataclass(1, flag=True), skip=MyDataclass(1) ) expect(data) == {'flag': True} PK!"datafiles/tests/test_decorators.pyPK!h翔datafiles/tests/test_formats.py# pylint: disable=unused-variable from pathlib import Path from datafiles import formats def describe_deserialize(): def it_rejects_unknown_extensions(expect): with expect.raises(ValueError): formats.deserialize(Path(), '.xyz') PK!,vdatafiles/tests/test_hooks.py# pylint: disable=unused-variable from datafiles import hooks, settings class Sample: foobar = 1 def describe_apply(): def it_can_be_called_twice(mocker): instance = Sample() setattr(instance, 'datafile', mocker.Mock()) get_datafile = mocker.Mock() hooks.apply(instance, None, get_datafile) hooks.apply(instance, None, get_datafile) def describe_disabled(): def when_nested(expect): expect(settings.HOOKS_ENABLED) == True with hooks.disabled(): expect(settings.HOOKS_ENABLED) == False with hooks.disabled(): expect(settings.HOOKS_ENABLED) == False expect(settings.HOOKS_ENABLED) == False expect(settings.HOOKS_ENABLED) == True PK!'U*W^ ^ datafiles/tests/test_managers.py# pylint: disable=unused-variable,protected-access from dataclasses import dataclass from pathlib import Path import pytest from datafiles import managers @dataclass class MyModel: foobar: int class MyField: @classmethod def to_preserialization_data(cls, python_value): return python_value def describe_instance_manager(): @pytest.fixture def manager(): return managers.InstanceManager( instance=MyModel(foobar=42), attrs={}, pattern=None, manual=False, defaults=False, ) def describe_path(): def is_none_when_no_pattern(expect, manager): expect(manager.path) == None def is_absolute_based_on_the_file(expect, manager): manager._pattern = '../../tmp/sample.yml' root = Path(__file__).parents[2] expect(manager.path) == root / 'tmp' / 'sample.yml' def describe_relpath(): def when_cwd_is_parent(expect, manager): manager._pattern = '../../tmp/sample.yml' expect(manager.relpath) == Path('tmp', 'sample.yml') def when_cwd_is_sibling(expect, manager): manager._pattern = '../../../tmp/sample.yml' expect(manager.relpath) == Path('..', 'tmp', 'sample.yml') def describe_text(): def is_blank_when_no_attrs(expect, manager): expect(manager.text) == "" def is_yaml_by_default(expect, manager): manager.attrs = {'foobar': MyField} expect(manager.text) == "foobar: 42\n" def with_json_format(expect, manager): manager._pattern = '_.json' manager.attrs = {'foobar': MyField} expect(manager.text) == '{\n "foobar": 42\n}' def with_toml_format(expect, manager): manager._pattern = '_.toml' manager.attrs = {'foobar': MyField} expect(manager.text) == "foobar = 42\n" def with_no_format(expect, manager): manager._pattern = '_' manager.attrs = {'foobar': MyField} expect(manager.text) == "foobar: 42\n" def with_unknown_format(expect, manager): manager._pattern = '_.xyz' manager.attrs = {'foobar': MyField} with expect.raises(ValueError): print(manager.text) def describe_load(): def it_requires_path(expect, manager): with expect.raises(RuntimeError): manager.load() def describe_save(): def it_requires_path(expect, manager): with expect.raises(RuntimeError): manager.save() PK!--datafiles/tests/test_models.py# pylint: disable=unused-variable from datafiles.models import build_datafile, create_model def describe_build_datafile(): def it_reuses_existing_datafile(mocker, expect): obj = mocker.Mock() datafile = mocker.Mock() obj.datafile = datafile new_datafile = build_datafile(obj) expect(new_datafile) == obj.datafile def describe_create_model(): def it_requires_dataclass(expect): class NonDataclass: pass with expect.raises(ValueError): create_model(NonDataclass) PK! v@@datafiles/utils.pyimport dataclasses from contextlib import suppress from functools import lru_cache from pprint import pformat from typing import Any, Dict Missing = dataclasses._MISSING_TYPE # pylint: disable=protected-access cached = lru_cache() def prettify(data: Dict) -> str: return pformat(dictify(data)) def dictify(value: Any) -> Dict: with suppress(AttributeError): return {k: dictify(v) for k, v in value.items()} if isinstance(value, str): return value with suppress(TypeError): return [dictify(x) for x in value] return value PK!MKK"datafiles-0.1.dist-info/LICENSE.md# License **The MIT License (MIT)** Copyright © 2018, Jace Browning Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HnHTUdatafiles-0.1.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!H Q^X datafiles-0.1.dist-info/METADATAWmSF_q~-M!N6@2a:YZWNwFwNeIKFgw]9wZl7މy)C!F* nzn!gWglW$l |Ε6M.E()2,| EٳM+Smr-v^_wUׂ\aTݛӮKqmrcmRg=UB=() WG,t]J1Ãex;:P> H]FeiXr̗dʁ*0tn!^m >;* %snd\*ĝdR#)xI!JjĮk>0JO%KG)M`B%]^\롛r|N`kz"KYƳ1Xl`'ڍ;dr#H!ڎWcJBQKr(BT8k2y!du q y'\e޻Sl)UG͇-6sB/+_Ap ͑pj@sǥ5'`3#Jr;w3!ýK1rTMv2SJ>I"Q4f2"G42rLAb6m}k]z OF{Ik anN2.elJ|ƨ􀐳X CUYj%⥘UYj( 7Rɲ)"ΌTs++LB$s3 vt 8XB+bf9 y%]Y>ihym0-2YӠ-—),c|( V縿X؎Ȝ,q^bM$yNF?J>yP)qYX ,V: {Y/ $L޵0%?uDhHҮ-I1Ccye}g_Vs2Ym^hv.767;!n@GF-U6fq(i.|SmZ:,t^!_w(cP[& ţ(z ƄtGH#?~y*(=cFߙn*Uvp.8,LʆRs~r0FHѐcl;@iw\~ȴu[z@~ ๡~CR(%cT26݆<@+ xx@7c>=h[s& ⺐:?d.9Ҕ,Shɥژ|n:T(jW=hG0߹a-$ͼ2Ghglg{sRh 00 =H8v}6W@0 vN s`݂]x:&ƀMF- eaEo ~AZ̻I9Hm Z>º^<҄3;gNx15*EA'hD߳7[7cE@+n/58S? d@ Úոe8H$+f32jl5:鰢l5_4P ȤM:xM PK!H8kI>datafiles-0.1.dist-info/RECORDuɒH{? T$ۡ& \P~j&;i뒧/r3iQ'O/ㅌy@P/]yҽ( ҭ4p#8`QapLu:?U;4legL*n$q$v_@ .׬Ή(?)+i:6EMlחvC \ƾGmz6]dX̄]ħG I|]Wm.mGNr2̑#* p0F0Y,PMSK= WuDea_*S^J2 Eq[zku~nymWwtaCd'ENn1hf'\Y轌rEñ$+Z0a]E#>Kk_sX#dtBEHM[um(?Z_ӽ9c49,r~!#JGwf7qPJUC駇x l!{cEΥxZNY,2lx(i]SɑS>ΐwiNC\3%RBޠd]74Ī! ͼGZWNfD|LQZA!Zi nJM ,R '؞{3 :vi6` YOZurKvt_7Z|_dՀ3D$L]eGK}k/77S(4=]a:vz||uG4,} p6DPK!^[datafiles/__init__.pyPK!H**datafiles/converters.pyPK!=ttt+datafiles/decorators.pyPK!Y  .datafiles/formats.pyPK!,7datafiles/hooks.pyPK!W%W%Idatafiles/managers.pyPK!.:B odatafiles/models.pyPK!55{datafiles/settings.pyPK!ޙPb"":|datafiles/tests/__init__.pyPK!Y:. & &"|datafiles/tests/test_converters.pyPK!"ߢdatafiles/tests/test_decorators.pyPK!h翔datafiles/tests/test_formats.pyPK!,v\datafiles/tests/test_hooks.pyPK!'U*W^ ^ datafiles/tests/test_managers.pyPK!--2datafiles/tests/test_models.pyPK! v@@datafiles/utils.pyPK!MKK" datafiles-0.1.dist-info/LICENSE.mdPK!HnHTUdatafiles-0.1.dist-info/WHEELPK!H Q^X %datafiles-0.1.dist-info/METADATAPK!H8kI>datafiles-0.1.dist-info/RECORDPK~