#== artisan/__init__.py ========================================================

''' A build system for explainable science '''

from ._artifacts import Artifact, ArrayFile, EncodedFile, write_global_meta
from ._configurables import Configurable
from ._global_conf import Conf, push_conf, pop_conf, using_conf, get_conf
from ._http import serve

__all__ = [
    'Configurable', 'Artifact', 'ArrayFile', 'EncodedFile',
    'write_global_meta', 'Conf', 'push_conf', 'pop_conf', 'using_conf',
    'get_conf', 'serve'
]

#-- `__module__` rebinding -----------------------------------------------------

Configurable.__module__ = __name__
Artifact.__module__ = __name__
write_global_meta.__module__ = __name__
Conf.__module__ = __name__
push_conf.__module__ = __name__
pop_conf.__module__ = __name__
using_conf.__module__ = __name__
get_conf.__module__ = __name__
serve.__module__ = __name__

#-- Wonky alias docstring definitions ------------------------------------------

ArrayFile = ArrayFile; 'An alias for `h5py.Dataset`'  # type: ignore
EncodedFile = EncodedFile; 'An alias for `pathlib.Path`'  # type: ignore


#== artisan/_artifacts.py ======================================================

import itertools
import json
from pathlib import Path
import shutil
from time import sleep
from typing import (
    Any, Iterator, List, Mapping, MutableMapping, Optional, Tuple, cast
)
from typing_extensions import Protocol

import h5py as h5
import numpy as np
from ruamel import yaml

from ._configurables import Configurable, schema
from ._global_conf import get_conf
from ._namespaces import namespacify, Namespace

__all__ = ['Artifact', 'ArrayFile', 'EncodedFile', 'write_global_meta']

#-- Static type definitions ----------------------------------------------------

from pathlib import Path as EncodedFile

class ArrayFile(Protocol):
    def __get__(self, obj: object, type_: Optional[type]) -> h5.Dataset: ...
    def __set__(self, obj: object, val: object) -> None: ...

#-- Artifacts ------------------------------------------------------------------

class Artifact(Configurable):
    '''
    An array- and metadata-friendly view into a directory

    Arguments:
        path (Path|str): The path at which the artifact is, or should be,
            stored
        conf (Mapping[str, object]): The subtype-specific configuration,
            optionally including a "type" field indicating what type of
            artifact to construct

    Constructors:

    - Artifact(conf: *Mapping[str, object]*)
    - Artifact(**conf_elem: *object*)
    - Artifact(path: *Path|str*)
    - Artifact(path: *Path|str*, conf: *Mapping[str, object]*)
    - Artifact(path: *Path|str*, **conf_elem: *object*)

    Fields:

    - **path** (*Path*): The path to the root of the file tree backing this \
        artifact
    - **meta** (*Mapping[str, object]*): The metadata stored in \
        `{self.path}/_meta.yaml`

    Type lookup is performed in the current scope, which can be modified via
    the global configuration API. Reading/writing/extending/deleting
    `ArrayFile`, `EncodedFile`, and `Artifact` fields is supported.
    '''
    path: Path

    def __new__(cls, *args: object, **kwargs: object) -> Any:
        path, conf = _parse_artifact_args(args, kwargs)
        if path is not None and conf is None:
            return _artifact_from_path(cls, _resolve_path(path))
        elif path is None and conf is not None:
            return _artifact_from_conf(cls, conf)
        elif path is not None and conf is not None:
            return _artifact_from_path_and_conf(cls, _resolve_path(path), conf)

    @property
    def meta(self) -> Namespace:
        ''' The metadata stored in `{self.path}/_meta.yaml` '''
        return _read_meta(self.path)

    #-- MutableMapping methods ----------------------------

    def __len__(self) -> int:
        '''
        Returns the number of public files in `self.path`

        Non-public files (files whose names start with "_") are not counted.
        '''
        return sum(1 for _ in self.path.glob('[!_]*'))

    def __iter__(self) -> Iterator[str]:
        '''
        Yields field names corresponding to the public files in `self.path`

        Entries Artisan understands (subdirectories and HDF5 files) are
        yielded without extensions. Non-public files (files whose names start
        with "_") are ignored.
        '''
        for p in self.path.glob('[!_]*'):
            yield p.name[:-3] if p.suffix == '.h5' else p.name

    def keys(self) -> Iterator[str]:
        return self.__iter__()

    def __getitem__(self, key: str) -> Any:
        '''
        Returns an `ArrayFile`, `EncodedFile`, or `Artifact` corresponding to
        `self.path/key`

        HDF5 files are returned as `ArrayFile`s, other files are returned as
        `EncodedFile`s, and directories and nonexistent entries are returned
        as (possibly empty) `Artifact`s.

        Attribute access syntax is also supported, and occurrences of "__" in
        `key` are transformed into ".", to support accessing encoded files as
        attributes (i.e. `artifact['name.ext']` is equivalent to
        `artifact.name__ext`).
        '''
        path = self.path / key.replace('__', '.')

        # Return an array.
        if Path(f'{path}.h5').is_file():
            return _read_h5(path.with_suffix('.h5'))

        # Return the path to a file.
        elif path.is_file():
            return path

        # Return a subartifact.
        else:
            return Artifact(path)

    def __setitem__(self, key: str, val: object) -> None:
        '''
        Writes an `ArrayFile`, `EncodedFile`, or `Artifact` to `self.path/key`

        `np.ndarray`-like objects are written as `ArrayFile`s, `Path`-like
        objects are written as `EncodedFile`s, and string-keyed mappings are
        written as subartifacts.

        Attribute access syntax is also supported, and occurrences of "__" in
        `key` are transformed into ".", to support accessing encoded files as
        attributes (i.e. `artifact['name.ext'] = val` is equivalent to
        `artifact.name__ext = val`).
        '''
        path = self.path / key.replace('__', '.')

        # Copy an existing file.
        if isinstance(val, Path):
            assert path.suffix != ''
            _copy_file(path, val)

        # Write a subartifact.
        elif isinstance(val, (Mapping, Artifact)):
            assert path.suffix == ''
            MutableMapping.update(Artifact(path), val)  # type: ignore

        # Write an array.
        else:
            assert path.suffix == ''
            _write_h5(path.with_suffix('.h5'), val)

    def __delitem__(self, key: str) -> None:
        '''
        Deletes the entry at `self.path/key`

        Attribute access syntax is also supported, and occurrences of "__" in
        `key` are transformed into ".", to support accessing encoded files as
        attributes (i.e. `del artifact['name.ext']` is equivalent to
        `del artifact.name__ext`).
        '''
        path = self.path / key.replace('__', '.')

        # Subartifacts, encoded files, and arrays are stored differently on
        # disk, so each is deleted differently.
        if path.is_dir():
            shutil.rmtree(path, ignore_errors=True)
        elif path.is_file():
            path.unlink()
        elif path.with_suffix('.h5').is_file():
            path.with_suffix('.h5').unlink()

    def extend(self, key: str, val: object) -> None:
        '''
        Extends an `ArrayFile`, `EncodedFile`, or `Artifact` at
        `self.path/key`

        Extending `ArrayFile`s performs concatenation along the first axis,
        extending `EncodedFile`s performs byte-level concatenation, and
        extending subartifacts extends their fields.

        Files corresponding to `self[key]` are created if they do not already
        exist.
        '''
        path = self.path / key

        # Append to an existing file.
        if isinstance(val, Path):
            assert path.suffix != ''
            _extend_file(path, val)

        # Append to a subartifact.
        elif isinstance(val, (Mapping, Artifact)):
            assert path.suffix == ''
            for k, v in val.items():
                Artifact(path).extend(k, v)

        # Append to an array.
        else:
            assert path.suffix == ''
            _extend_h5(path.with_suffix('.h5'), val)

    #-- Attribute-style element access --------------------

    def __getattr__(self, key: str) -> Any:
        return self.__getitem__(key)

    def __setattr__(self, key: str, value: object) -> None:
        self.__setitem__(key, value)

    def __delattr__(self, key: str) -> None:
        self.__delitem__(key)

    #-- Attribute preemption, for REPL autocompletion -----

    def __getattribute__(self, key: str) -> Any:
        if key in object.__getattribute__(self, '_cached_keys'):
            try:
                object.__setattr__(self, key, self[key])
            except KeyError:
                object.__delattr__(self, key)
                object.__getattribute__(self, '_cached_keys').remove(key)
        return object.__getattribute__(self, key)

    def __dir__(self) -> List[str]:
        for key in self._cached_keys:
            object.__delattr__(self, key)
        self._cached_keys.clear()
        for key in set(self).difference(object.__dir__(self)):
            object.__setattr__(self, key, self[key])
            self._cached_keys.add(key)
        return cast(list, object.__dir__(self))
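
# The sketch below illustrates the mapping/attribute API documented above. It
# is illustration only (never called), and the path and field names in it
# ("scratch/demo", "weights", "log", "stats") are hypothetical.

def _example_artifact_usage() -> None:
    ''' A minimal sketch of reading and writing `Artifact` fields. '''
    a = Artifact('scratch/demo')            # view into ./scratch/demo
    a.weights = np.zeros((2, 3))            # -> scratch/demo/weights.h5
    a.extend('log', [0.25])                 # creates a resizable log.h5
    a.extend('log', [0.5, 0.75])            # appends along the first axis
    stats = a.stats                         # missing entry -> empty subartifact
    stats.mean = a.weights[:].mean(axis=0)  # arrays read back as h5py datasets
    del a['weights']                        # removes scratch/demo/weights.h5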
#-- Artifact construction ------------------------------------------------------

def _parse_artifact_args(
        args: Tuple[object, ...],
        kwargs: Mapping[str, object]
    ) -> Tuple[Optional[Path], Optional[Mapping[str, object]]]:
    ''' Return `(path, conf)` or raise an error. '''
    # (conf)
    if (len(args) == 1
            and isinstance(args[0], Mapping)
            and len(kwargs) == 0):
        return None, dict(args[0])

    # (**conf)
    elif (len(args) == 0
            and len(kwargs) > 0):
        return None, kwargs

    # (path)
    elif (len(args) == 1
            and isinstance(args[0], (str, Path))
            and len(kwargs) == 0):
        return Path(args[0]), None

    # (path, conf)
    elif (len(args) == 2
            and isinstance(args[0], (str, Path))
            and isinstance(args[1], Mapping)
            and len(kwargs) == 0):
        return Path(args[0]), dict(args[1])

    # (path, **conf)
    elif (len(args) == 1
            and isinstance(args[0], (str, Path))
            and len(kwargs) > 0):
        return Path(args[0]), kwargs

    else:
        raise TypeError(
            'Invalid argument types for the `Artifact` constructor.\n'
            'Valid signatures:\n'
            '\n'
            '    - Artifact(conf: Mapping[str, object])\n'
            '    - Artifact(**conf_elem: object)\n'
            '    - Artifact(path: Path|str)\n'
            '    - Artifact(path: Path|str, conf: Mapping[str, object])\n'
            '    - Artifact(path: Path|str, **conf_elem: object)\n'
        )


def _artifact_from_path(cls: type, path: Path) -> Artifact:
    '''
    Return an artifact corresponding to the file tree at `path`.

    An error is raised if the type recorded in `_meta.yaml`, if any, is not a
    subtype of `cls`.
    '''
    spec = _read_meta(path).spec or {}
    written_type = get_conf().scope.get(spec.get('type', None), None)
    if written_type is not None and not issubclass(written_type, cls):
        raise FileExistsError(
            f'{path} is a {written_type.__module__}.{written_type.__qualname__}'
            f', not a {cls.__module__}.{cls.__qualname__}.'
        )
    artifact = cast(Artifact, Configurable.__new__(cls, spec))
    object.__setattr__(artifact, '_cached_keys', set())
    object.__setattr__(artifact, 'path', path)
    return artifact


def _artifact_from_conf(cls: type, conf: Mapping[str, object]) -> Artifact:
    ''' Find or build an artifact with the given type and configuration. '''
    artifact = cast(Artifact, Configurable.__new__(cls, conf))
    object.__setattr__(artifact, '_cached_keys', set())

    spec = Namespace(type=_identify(type(artifact)), **artifact.conf)
    for path in Path(get_conf().root_dir).glob('*'):
        meta = _read_meta(path)
        if meta.spec == spec:
            while meta.status == 'running':
                sleep(0.01)
                meta = _read_meta(path)
            if meta.status == 'done':
                object.__setattr__(artifact, 'path', path)
                return artifact

    # No finished match was found; build a fresh artifact.
    object.__setattr__(artifact, 'path', _new_artifact_path(type(artifact)))
    _build(artifact)
    return artifact


def _artifact_from_path_and_conf(cls: type, path: Path,
                                 conf: Mapping[str, object]) -> Artifact:
    ''' Find or build an artifact with the given type, path, and
    configuration. '''
    artifact = cast(Artifact, Configurable.__new__(cls, conf))
    object.__setattr__(artifact, '_cached_keys', set())
    object.__setattr__(artifact, 'path', path)

    if path.exists():
        meta = _read_meta(path)
        if meta.spec != {'type': _identify(type(artifact)), **artifact.conf}:
            raise FileExistsError(f'"{artifact.path}" (incompatible spec)')
        while meta.status == 'running':
            sleep(0.01)
            meta = _read_meta(path)
        if artifact.meta.status == 'stopped':
            raise FileExistsError(f'"{artifact.path}" was stopped mid-build.')
    else:
        _build(artifact)

    return artifact


def _build(artifact: Artifact) -> None:
    ''' Create parent directories, invoke `artifact.build`, and store
    metadata. '''
    if Path(get_conf().root_dir) in artifact.path.parents:
        write_global_meta()

    # TODO: Fix YAML generation. (JSON is written for now, which YAML parsers
    # accept.)
    meta_path = artifact.path / '_meta.yaml'
    spec = Namespace(type=_identify(type(artifact)), **artifact.conf)
    write_meta = lambda **kwargs: meta_path.write_text(json.dumps(kwargs))

    artifact.path.mkdir(parents=True)
    write_meta(spec=spec, status='running')
    try:
        if callable(getattr(type(artifact), 'build', None)):
            n_build_args = artifact.build.__code__.co_argcount
            build_args = [artifact.conf] if n_build_args > 1 else []
            artifact.build(*build_args)
        write_meta(spec=spec, status='done')
    except BaseException:
        write_meta(spec=spec, status='stopped')
        raise


def _resolve_path(path: Path) -> Path:
    ''' Dereference ".", "..", "~", and "@". '''
    if str(path).startswith('@/'):
        path = Path(get_conf().root_dir) / str(path)[2:]
    return path.expanduser().resolve()


def _new_artifact_path(type_: type) -> Path:
    ''' Generate an unused path in the artifact root directory. '''
    root = Path(get_conf().root_dir)
    type_name = _identify(type_)
    for i in itertools.count():
        dst = root / f'{type_name}_{i:04x}'
        if not dst.exists():
            return dst
    assert False  # for MyPy
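
# A sketch of the find-or-build flow implemented by `_artifact_from_conf`
# above. `Blur` and its conf are hypothetical, and the function is never
# called; it only illustrates the control flow.

def _example_find_or_build() -> None:
    class Blur(Artifact):
        def build(self) -> None:
            self.output = np.zeros(4)  # stand-in for real work

    b1 = Blur({'radius': 2})  # no matching spec under root_dir -> builds
    b2 = Blur({'radius': 2})  # b1's _meta.yaml now matches -> reused, no build
    assert b1.path == b2.path
    assert b2.meta.status == 'done'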
#-- I/O --------------------------------------------------------------------------

def _read_h5(path: Path) -> ArrayFile:
    f = h5.File(path, 'r', libver='latest', swmr=True)
    return f['data']


def _write_h5(path: Path, val: object) -> None:
    val = np.asarray(val)
    path.parent.mkdir(parents=True, exist_ok=True)
    if path.is_dir():
        path.rmdir()
    elif path.exists():
        path.unlink()
    f = h5.File(path, 'w', libver='latest')
    f.create_dataset('data', data=val)


def _extend_h5(path: Path, val: object) -> None:
    val = np.asarray(val)
    path.parent.mkdir(parents=True, exist_ok=True)
    f = h5.File(path, 'a', libver='latest')
    if 'data' not in f:
        dset = f.require_dataset(
            name='data',
            shape=None,
            maxshape=(None, *val.shape[1:]),
            dtype=val.dtype,
            data=np.empty((0, *val.shape[1:]), val.dtype),
            chunks=(
                # Aim for roughly 2**12 elements per chunk.
                int(np.ceil(2**12 / np.prod(val.shape[1:]))),
                *val.shape[1:]
            )
        )
        f.swmr_mode = True
    else:
        dset = f['data']
    dset.resize(dset.len() + len(val), 0)
    dset[-len(val):] = val
    dset.flush()


def _copy_file(dst: Path, src: Path) -> None:
    shutil.rmtree(dst, ignore_errors=True)
    shutil.copy(src, dst)


def _extend_file(dst: Path, src: Path) -> None:
    # Byte-level concatenation, matching `Artifact.extend`'s documented
    # behavior.
    with open(src, 'rb') as f_src, open(dst, 'ab') as f_dst:
        f_dst.write(f_src.read())


def _read_meta(path: Path) -> Namespace:
    # TODO: Implement caching.
    try:
        meta = namespacify(yaml.safe_load((path / '_meta.yaml').read_text()))
        assert isinstance(meta, Namespace)
        assert isinstance(meta.spec, Namespace)
        assert isinstance(meta.status, str)
        return meta
    except Exception:
        return Namespace(spec=None, status='done')


def write_global_meta() -> None:
    ''' Write global config information to `{root_dir}/_meta.yaml`. '''
    meta = {'spec': None, 'schema': schema()}
    meta_text = yaml.round_trip_dump(meta)
    Path(get_conf().root_dir).mkdir(parents=True, exist_ok=True)
    Path(f'{get_conf().root_dir}/_meta.yaml').write_text(meta_text)

#-- Scope search -----------------------------------------------------------------

def _identify(type_: type) -> str:
    return next(sym for sym, t in get_conf().scope.items() if t == type_)
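
# For reference, the on-disk layout produced by the sketches above would look
# roughly like this (names hypothetical):
#
#     <root_dir>/
#         _meta.yaml             # global schema, from write_global_meta()
#         Blur_0000/
#             _meta.yaml         # {"spec": {"type": "Blur", "radius": 2},
#                                #  "status": "done"}
#             output.h5          # ArrayFile written during build()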
#== artisan/_configurables.py ==================================================

from typing import Dict, Mapping, Tuple
from typing_extensions import Protocol, runtime

from ._global_conf import get_conf, default_scope
from ._namespaces import Namespace, namespacify
from ._schemas import conf_schema_from_type

__all__ = ['Configurable', 'NameConflict', 'schema']

#-- Configurable object metaclass ----------------------------------------------

class ConfigurableMeta(type):
    '''
    A type that generates an inner `Conf` class and adds itself to the
    default Artisan scope upon creation

    `ConfigurableMeta` is the metaclass for `Configurable`.
    '''
    def __init__(self, name: str, bases: Tuple[type, ...],
                 dict_: Dict[str, object]) -> None:
        super().__init__(name, bases, dict_)

        # Generate `Conf` if it does not exist.
        if not hasattr(self, 'Conf'):
            self.Conf = runtime(type('Conf', (Protocol,), {  # type: ignore
                '__qualname__': self.__qualname__ + '.Conf',
                '__module__': self.__module__
            }))

        # Make the configurable class accessible from the configuration class.
        self.Conf.__recipient_type__ = self  # type: ignore

        # Add the configurable class to the default scope.
        default_scope[self.__qualname__] = (
            NameConflict if self.__qualname__ in default_scope else self
        )


class NameConflict:
    def __init__(self, *args: object, **kwargs: object) -> None:
        raise KeyError('[Name conflict in the current Artisan scope]')

#-- Configurable objects --------------------------------------------------------

class Configurable(metaclass=ConfigurableMeta):
    '''
    An object whose behavior is configured via a JSON-object-like
    configuration passed as the first argument to its constructor

    Parameters:
        conf: a mapping/namespace composed of arbitrarily nested `bool`,
            `int`, `float`, `str`, `NoneType`, sequence, and
            mapping/namespace instances (namespace := an object with a
            `__dict__` attribute).

    If `conf` contains a "type" field that is a `type`, `__new__` returns an
    instance of that type. Similarly, if `conf` contains a "type" field that
    is a string, `__new__` dereferences it in the current type scope and
    returns an instance of the resulting type (see
    `push_conf`/`pop_conf`/`get_conf`).
    '''
    class Conf(Protocol):
        '''
        A configuration

        If its definition is inline, it will be translated into a JSON-Schema
        to validate configurations passed into the outer class' constructor.

        `Conf` classes are intended to be interface definitions. They can
        extend `typing_extensions.Protocol` to support static analysis and
        `isinstance` calls (with the `typing_extensions.runtime` decorator).

        An empty `Conf` definition is created for every `Configurable`
        subclass defined without one.
        '''
        pass

    conf: Namespace; ''' The configuration passed into the constructor,
    coerced to a `Namespace` '''

    def __new__(cls, conf: object,
                *args: object, **kwargs: object) -> 'Configurable':
        # Coerce `conf` to a `dict`.
        conf = dict(
            conf if isinstance(conf, Mapping)
            else getattr(conf, '__dict__', {})
        )

        # Perform subclass forwarding.
        cls_override = conf.pop('type', None)
        if isinstance(cls_override, type):
            cls = cls_override
        elif isinstance(cls_override, str):
            try:
                cls = get_conf().scope[cls_override]
            except KeyError:
                raise KeyError(f'"{cls_override}" can\'t be resolved.')

        # Construct and return a `Configurable` instance.
        obj = object.__new__(cls)
        object.__setattr__(obj, 'conf', namespacify(conf))
        return obj

#-- Schema generation -----------------------------------------------------------

def schema() -> dict:
    ''' Return a schema with a definition for each exposed type. '''
    conf_types = {
        sym: type_.Conf  # type: ignore
        for sym, type_ in get_conf().scope.items()
        if hasattr(type_, 'Conf')
    }
    return {
        '$schema': 'http://json-schema.org/draft-07/schema#',
        'definitions': {
            sym: conf_schema_from_type(type_, conf_types)
            for sym, type_ in get_conf().scope.items()
        },
        '$ref': '#/definitions/Configurable'
    }
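
# A sketch (never called) of configuration-driven construction. `Model` and
# `Linear` are hypothetical; passing the subtype as a string (e.g.
# {'type': 'Linear'}) also works when that name resolves in the current scope.

def _example_configurable() -> None:
    class Model(Configurable):
        pass

    class Linear(Model):
        class Conf(Protocol):
            slope: float
            'Multiplicative coefficient'

    m = Model({'type': Linear, 'slope': 2.0})  # subclass forwarding
    assert isinstance(m, Linear)
    assert m.conf.slope == 2.0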
#== artisan/_global_conf.py ====================================================

from copy import copy
from contextlib import contextmanager
from dataclasses import dataclass
import threading
from typing import Dict, Iterator as Iter, Optional as Opt

__all__ = [
    'Conf', 'push_conf', 'pop_conf', 'using_conf', 'get_conf',
    'default_scope'
]

#-- Thread-local configuration --------------------------------------------------

@dataclass
class Conf:
    '''
    A thread-scoped Artisan configuration

    Attributes:
        root_dir: The default directory for artifact creation, and the
            directory that will be searched for matches when an artifact is
            instantiated from a specification
        scope: The mapping used to resolve `type`s in specifications during
            configurable object instantiation
    '''
    root_dir: str
    scope: Dict[str, type]


class ConfStack(threading.local):
    def __init__(self) -> None:
        super().__init__()
        self.contents = [Conf(root_dir='.', scope=default_scope)]


default_scope: Dict[str, type] = {}
conf_stack = ConfStack()


def get_conf() -> Conf:
    'Returns the active configuration'
    return conf_stack.contents[-1]


def push_conf(conf: Opt[Conf] = None, **updates: object) -> None:
    'Pushes a `Conf` onto the stack, making it the active `Conf`'
    conf = copy(get_conf() if conf is None else conf)
    for key, val in updates.items():
        setattr(conf, key, val)
    for val in conf.scope.values():
        assert isinstance(val, type)
    conf_stack.contents.append(conf)


def pop_conf() -> Conf:
    'Pops the top `Conf` off of the stack'
    if len(conf_stack.contents) == 1:
        raise IndexError(
            'The default `Conf` can\'t be removed.\n\n'
            'i.e. You may no longer pop. The fun must stop here.'
        )
    return conf_stack.contents.pop()


@contextmanager
def using_conf(conf: Opt[Conf] = None, **updates: object) -> Iter[None]:
    'Returns a context manager that executes its body with `conf` active'
    push_conf(conf, **updates)
    try:
        yield
    finally:
        # Restore the previous `Conf` even if the body raises.
        pop_conf()
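
# A sketch (never called) of stack-based configuration scoping. The directory
# name is hypothetical, and the surrounding asserts assume no other `Conf` has
# been pushed.

def _example_conf_scoping() -> None:
    assert get_conf().root_dir == '.'          # the default configuration
    with using_conf(root_dir='experiments/run1'):
        assert get_conf().root_dir == 'experiments/run1'
    assert get_conf().root_dir == '.'          # previous `Conf` restored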
#== artisan/_http.py ===========================================================

from multiprocessing import cpu_count
from pathlib import Path
import re
from typing import Dict, Optional as Opt, cast

import cbor2
from falcon import API, HTTPStatus, Request, Response, HTTP_200, HTTP_404
from gunicorn.app.base import BaseApplication as GunicornApp
import h5py as h5
from ruamel import yaml

from ._global_conf import get_conf

__all__ = ['serve']

#------------------------------------------------------------------------------
# Web API

def serve(port: int = 3000, root_dir: Opt[str] = None) -> None:
    ''' Starts a server providing access to the records in a directory '''
    root = Path(root_dir or get_conf().root_dir)

    def write_response(req: Request, res: Response) -> None:
        res.content_type = 'application/cbor'
        res.set_header('Access-Control-Allow-Origin', '*')

        if req.path.endswith('/_entry-names'):
            path = root / req.path[1:-len('/_entry-names')]
            if path.is_file():
                raise HTTPStatus(HTTP_404)
            res.data = cbor2.dumps(dict(
                type='plain-object',
                content=sorted([
                    re.sub(r'\.h5$', '', p.name) + ('/' if p.is_dir() else '')
                    for p in path.glob('[!_]*')
                ])
            ))
        elif req.path.endswith('/_meta'):
            key = req.path[1:-len('/_meta')]
            res.data = cbor2.dumps(_read_meta(root, key))
        else:
            t_last = float(req.get_param('t_last') or 0) / 1000
            entry = _read(root, req.path[1:], t_last)
            if entry['type'] == 'file':
                res.data = (root / cast(str, entry['content'])).read_bytes()
            else:
                res.data = cbor2.dumps(entry)

        res.status = HTTP_200

    app = API(middleware=[_HandleCORS()])
    app.add_sink(write_response)

    class Server(GunicornApp):  # type: ignore
        def load(self) -> API:
            return app

        def load_config(self) -> None:
            self.cfg.set('bind', f'localhost:{port}')
            self.cfg.set('workers', cpu_count())

    Server().run()


class _HandleCORS(object):
    def process_request(self, req: Request, res: Response) -> None:
        res.set_header('Access-Control-Allow-Origin', '*')
        res.set_header('Access-Control-Allow-Methods', '*')
        res.set_header('Access-Control-Allow-Headers', '*')
        res.set_header('Access-Control-Max-Age', '600')
        if req.method == 'OPTIONS':
            raise HTTPStatus(HTTP_200)
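
# The sink above implements a small read-only wire protocol (CBOR-encoded).
# Illustrative requests, with hypothetical paths:
#
#     GET /sim-a/_entry-names      -> {'type': 'plain-object',
#                                      'content': ['loss', 'frames/']}
#     GET /sim-a/_meta             -> contents of sim-a/_meta.yaml
#     GET /sim-a/loss?t_last=0     -> {'type': 'numeric-array', 'content':
#                                      {'shape': ..., 'dtype': ..., 'data': ...}}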
#------------------------------------------------------------------------------
# I/O

_web_dtypes = dict(
    bool='uint8',
    uint8='uint8', uint16='uint16', uint32='uint32', uint64='uint32',
    int8='int8', int16='int16', int32='int32', int64='int32',
    float16='float32', float32='float32', float64='float64',
    float96='float64', float128='float64'
)


def _read(root: Path, key: str, t_last: float) -> Dict[str, object]:
    if Path(f'{root}/{key}.h5').is_file():  # Array
        return _read_array(root, key, t_last)
    elif (root / key).is_file():  # Non-array file
        return dict(type='file', content=key)
    else:  # Artifact
        return _read_artifact(root, key, t_last)


def _read_array(root: Path, key: str, t_last: float) -> Dict[str, object]:
    if Path(f'{root}/{key}.h5').stat().st_mtime <= t_last:
        return dict(type='cached-value', content=None)
    f = h5.File(f'{root}/{key}.h5', 'r', libver='latest', swmr=True)
    a = f['data'][:]
    if a.dtype.kind in ['U', 'S']:
        return dict(
            type='string-array',
            content=a.astype('U').tolist()
        )
    else:
        a = a.astype(_web_dtypes[a.dtype.name])
        return dict(
            type='numeric-array',
            content=dict(
                shape=a.shape,
                dtype=a.dtype.name,
                data=a.data.tobytes()
            )
        )


def _read_artifact(root: Path, key: str, t_last: float) -> Dict[str, object]:
    return dict(
        type='artifact',
        content=dict(
            _meta=_read_meta(root, key),
            **{
                p.name: _read(root, str(p.relative_to(root)), t_last)
                for p in sorted((root / key).glob('[!_]*'))
            }
        )
    )


def _read_meta(root: Path, key: str) -> Dict[str, object]:
    path = root / key / '_meta.yaml'
    if path.parent.is_file():
        raise HTTPStatus(HTTP_404)
    try:
        meta = yaml.safe_load(path.read_text())
    except Exception:
        meta = dict(spec=None, status='done')
    return dict(type='plain-object', content=meta)
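
# A client-side sketch (never called) of decoding a numeric-array response.
# `requests` is an assumed extra dependency here, not one of artisan's, and
# the URL is hypothetical.

def _example_client_decode() -> None:
    import numpy as np
    import requests
    msg = cbor2.loads(requests.get('http://localhost:3000/sim-a/loss').content)
    if msg['type'] == 'numeric-array':
        body = msg['content']
        arr = np.frombuffer(body['data'], dtype=body['dtype'])
        print(arr.reshape(body['shape']))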
#== artisan/_namespaces.py =====================================================

from typing import Any, Dict, List, Mapping, cast

from ._global_conf import get_conf

__all__ = ['Namespace', 'namespacify']

#-- Namespaces ------------------------------------------------------------------

class Namespace(Dict[str, Any]):
    ''' A `dict` that supports accessing items as attributes '''

    def __dir__(self) -> List[str]:
        return list(set([*dict.__dir__(self), *dict.__iter__(self)]))

    def __getattr__(self, key: str) -> Any:
        return dict.__getitem__(self, key)

    def __setattr__(self, key: str, val: object) -> None:
        dict.__setitem__(self, key, val)

    def __delattr__(self, key: str) -> None:
        dict.__delitem__(self, key)

    @property
    def __dict__(self) -> dict:  # type: ignore
        return self

    def __repr__(self) -> str:
        def single_line_repr(elem: object) -> str:
            if isinstance(elem, list):
                return '[' + ', '.join(map(single_line_repr, elem)) + ']'
            elif isinstance(elem, Namespace):
                return 'Namespace(' + ', '.join(
                    f'{k}={single_line_repr(v)}'
                    for k, v in elem.items()
                ) + ')'
            else:
                return repr(elem).replace('\n', ' ')

        def repr_in_context(elem: object, curr_col: int, indent: int) -> str:
            sl_repr = single_line_repr(elem)
            if len(sl_repr) <= 80 - curr_col:
                return sl_repr
            elif isinstance(elem, list):
                return (
                    '[\n' + ' ' * (indent + 2)
                    + (',\n' + ' ' * (indent + 2)).join(
                        repr_in_context(e, indent + 2, indent + 2)
                        for e in elem
                    )
                    + '\n' + ' ' * indent + ']'
                )
            elif isinstance(elem, Namespace):
                return (
                    'Namespace(\n' + ' ' * (indent + 2)
                    + (',\n' + ' ' * (indent + 2)).join(
                        f'{k} = '
                        + repr_in_context(v, indent + 5 + len(k), indent + 2)
                        for k, v in elem.items()
                    )
                    + '\n' + ' ' * indent + ')'
                )
            else:
                return repr(elem)

        return repr_in_context(self, 0, 0)


def namespacify(obj: object) -> object:
    '''
    Recursively convert mappings (item access only) and ad-hoc namespaces
    (attribute access only) to `Namespace`s (both item and attribute access).
    '''
    if isinstance(obj, (type(None), bool, int, float, str)):
        return obj
    elif isinstance(obj, type):
        return next(
            (sym for sym, t in get_conf().scope.items() if t is obj), ''
        )
    elif isinstance(obj, list):
        return [namespacify(v) for v in obj]
    elif isinstance(obj, Mapping):
        return Namespace({k: namespacify(obj[k]) for k in obj})
    else:
        return namespacify(vars(obj))
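
# A sketch (never called) of the coercion performed by `namespacify`; the
# keys are hypothetical.

def _example_namespacify() -> None:
    ns = cast(Namespace, namespacify({'optimizer': {'lr': 1e-3},
                                      'layers': [4, 4]}))
    assert ns.optimizer.lr == 1e-3        # nested attribute access
    assert ns['optimizer']['lr'] == 1e-3  # item access still works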
#== artisan/_schemas.py ========================================================

import ast
from inspect import getsource
from textwrap import dedent
from typing import (
    Any, DefaultDict, Dict, Iterator, List, Optional, Tuple, Union, cast
)

__all__ = ['conf_schema_from_type']

#-- Type aliases ----------------------------------------------------------------

ObjDict = Dict[str, object]
TypeDict = Dict[str, type]

#-- Top-level configuration schema generation ------------------------------------

def conf_schema_from_type(type_: type, scope: TypeDict = {}) -> ObjDict:
    ''' Return a schema for the configuration of a `type_` instance. '''
    is_strict_subclass = lambda t: t is not type_ and issubclass(t, type_)
    if any(map(is_strict_subclass, scope.values())):
        return conf_schema_from_abstract_type(type_, scope)
    else:
        return conf_schema_from_concrete_type(type_, scope)


def conf_schema_from_abstract_type(type_: type, scope: TypeDict) -> ObjDict:
    ''' Return a configuration schema for a type with subtypes. '''
    return {'oneOf': [
        {'allOf': [
            {'required': ['type']},
            {'properties': {'type': {'const': name}}},
            {'$ref': '#/definitions/' + name}
        ]}
        for name, t in scope.items()
        if issubclass(t, type_) and t is not type_
    ]}


def conf_schema_from_concrete_type(type_: type, scope: TypeDict) -> ObjDict:
    ''' Return a configuration schema for a type with no subclasses. '''
    try:
        mod_def = cast(ast.Module, ast.parse(dedent(getsource(type_))))
        cls_def = cast(ast.ClassDef, mod_def.body[0])
    except (OSError, TypeError):
        cls_def = ast.ClassDef('', (), (), [], [])

    Conf = getattr(type_, 'Conf', type('', (), {}))
    conf_def = ast.ClassDef('', (), (), [], [])
    for stmt in cls_def.body:
        if isinstance(stmt, ast.ClassDef) and stmt.name == 'Conf':
            conf_def = stmt

    schema: dict = {
        'type': 'object',
        'description': [],
        'outputDescriptions': {},
        'properties': DefaultDict[str, dict](lambda: {})
    }

    # Collect `description` & `outputDescriptions`.
    for tgt, ann in literal_annotations(cls_def):
        if isinstance(ann, str):
            if tgt is None:
                schema['description'].append(dedent(ann).strip())
            else:
                schema['outputDescriptions'][tgt] = dedent(ann).strip()

    # Collect property type annotations.
    for tgt, ann in getattr(Conf, '__annotations__', {}).items():
        schema['properties'][tgt].update(schema_from_type_ann(ann, scope))

    # Collect property defaults.
    for key, val in vars(Conf).items():
        if not key.startswith('_'):
            schema['properties'][key]['default'] = val

    # Collect property descriptions and raw property schemas.
    for tgt, ann in literal_annotations(conf_def):
        tgt_schema = schema if tgt is None else schema['properties'][tgt]
        tgt_schema.update(schema_from_literal_ann(ann))

    # Define required properties.
    schema['required'] = [
        key for key, val in schema['properties'].items()
        if 'default' not in val
    ]

    schema['description'] = '\n\n'.join(schema['description'])
    schema['properties'] = dict(schema['properties'])
    return schema
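
# A sketch (never called) of the output of `conf_schema_from_concrete_type`
# for a hypothetical class with one required and one defaulted property.

def _example_concrete_schema() -> None:
    class Sim:
        'Runs a toy simulation.'
        class Conf:
            steps: int
            'Number of update steps'
            seed: int = 0

    schema = conf_schema_from_type(Sim)
    assert schema['description'] == 'Runs a toy simulation.'
    assert schema['required'] == ['steps']
    assert schema['properties']['seed'] == {'type': 'integer', 'default': 0}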
#-- Configuration property schema generation --------------------------------------

def schema_from_type_ann(ann: Any, scope: TypeDict) -> ObjDict:
    ''' Generate a property schema from a type annotation. '''
    ann_metatype = getattr(ann, '__origin__', None)
    if ann is object or ann is Any:
        return {}
    elif ann is None or ann is type(None):
        return {'type': 'null'}
    elif ann is bool:
        return {'type': 'boolean'}
    elif ann is int:
        return {'type': 'integer'}
    elif ann is float:
        return {'type': 'number'}
    elif ann is str:
        return {'type': 'string'}
    elif ann is list:
        return {'type': 'array'}
    elif ann is dict:
        return {'type': 'object'}
    elif ann_metatype == Union:
        return {'oneOf': [
            schema_from_type_ann(t, scope) for t in ann.__args__
        ]}
    elif ann_metatype in (list, List):
        item_schema = schema_from_type_ann(ann.__args__[0], scope)
        return {'type': 'array', 'items': item_schema}
    elif ann_metatype in (dict, Dict) and ann.__args__[0] is str:
        item_schema = schema_from_type_ann(ann.__args__[1], scope)
        return {'type': 'object', 'additionalProperties': item_schema}
    elif ann in scope.values():
        name = next(k for k, v in scope.items() if v is ann)
        return {'$ref': '#/definitions/' + name}
    else:
        raise ValueError(f'Type "{ann}" can\'t be mapped to a schema.')


def schema_from_literal_ann(ann: object) -> ObjDict:
    ''' Generate a property schema from a literal annotation. '''
    if isinstance(ann, str):
        return {'description': dedent(ann).strip()}
    elif isinstance(ann, dict):
        return ann
    elif (isinstance(ann, tuple) and len(ann) == 2
            and isinstance(ann[0], str) and isinstance(ann[1], dict)):
        return {'description': dedent(ann[0]).strip(), **ann[1]}
    elif (isinstance(ann, tuple) and len(ann) == 2
            and isinstance(ann[0], dict) and isinstance(ann[1], str)):
        return {**ann[0], 'description': dedent(ann[1]).strip()}
    else:
        return {}
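
# A sketch (never called) of the annotation forms `schema_from_literal_ann`
# accepts: a bare description, a raw schema dict, or a (description, schema)
# pair in either order.

def _example_literal_ann_forms() -> None:
    assert (schema_from_literal_ann('A learning rate')
            == {'description': 'A learning rate'})
    assert (schema_from_literal_ann(('A learning rate', {'minimum': 0.0}))
            == {'description': 'A learning rate', 'minimum': 0.0})
    assert schema_from_literal_ann({'minimum': 0.0}) == {'minimum': 0.0}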
#-- Syntax tree parsing -----------------------------------------------------------

def literal_annotations(cls_def: ast.ClassDef) -> (
        Iterator[Tuple[Optional[str], object]]):
    '''
    Yield (target, annotation) pairs for the literal annotations in a class
    definition.
    '''
    curr_field: Optional[str] = None
    for stmt in cls_def.body:
        # Yield the statement's value if it's a literal.
        if isinstance(stmt, ast.Expr):
            try:
                yield curr_field, ast.literal_eval(stmt.value)
            except ValueError:
                pass

        # Compute the current field.
        if (isinstance(stmt, ast.Assign)
                and len(stmt.targets) == 1
                and isinstance(stmt.targets[0], ast.Name)):
            curr_field = stmt.targets[0].id
        elif (isinstance(stmt, ast.AnnAssign)
                and isinstance(stmt.target, ast.Name)):
            curr_field = stmt.target.id
        else:
            curr_field = None


#== artisan/py.typed ===========================================================

# (An empty marker file indicating that the package ships inline type
# annotations, per PEP 561.)