PK!r"ccartisan/__init__.py''' A build system for explainable science ''' __version__ = '0.2.1' from ._artifacts import Artifact, ArrayFile, EncodedFile from ._artisan import Artisan from ._configurables import Configurable from ._namespaces import Namespace __all__ = [ 'ArrayFile', 'Artifact', 'Configurable', 'EncodedFile', 'Namespace' ] #-- `__module__` rebinding ---------------------------------------------------- Configurable.__module__ = __name__ Artifact.__module__ = __name__ ArrayFile.__module__ = __name__ Namespace.__module__ = __name__ Artisan.__module__ = __name__ #-- Wonky alias docstring definitions ----------------------------------------- EncodedFile = EncodedFile; 'An alias for `pathlib.Path`' # type: ignore #-- Backwards compatibility definitions --------------------------------------- from contextlib import contextmanager def push_conf(**kwargs): Artisan.push(**kwargs) def pop_conf(): return Artisan.pop() @contextmanager def using_conf(**kwargs): push_conf(**kwargs); yield pop_conf() def get_conf(): return Artisan.get_current() def serve(): Artisan.get_current().serve() PK!JJartisan/_artifacts.py''' This module exports the `Artifact` class, an array- and metadata-friendly view into a directory. Instances of the base artifact class have methods to simplify reading/writing collections of arrays. `Artifact` can also be subclassed to define configurable, persistent computed asset types, within Python/PEP 484's type system. This module also exports `ArrayFile` and `EncodedFile`, descriptor protocols that intended to be used as attribute type annotations within `Artifact` subclass definition. 
''' import itertools import json from pathlib import Path import shutil import threading from time import sleep from typing import ( Any, Iterator, List, Mapping, MutableMapping, Optional, Tuple, cast ) from typing_extensions import Protocol import h5py as h5 import numpy as np from ruamel import yaml from ._configurables import Configurable, get_scope from ._namespaces import Namespace, namespacify __all__ = ['Artifact', 'ArrayFile', 'EncodedFile'] #-- Root artifact directory management ---------------------------------------- context = threading.local() def set_root_dir(root_dir: Optional[Path]) -> None: ''' Set the directory in which to search for artifacts. ''' context.root_dir = root_dir if root_dir is not None else Path('.') def get_root_dir() -> Path: ''' Return the current artifact search directory. ''' return getattr(context, 'root_dir', Path('.')) #-- Static type definitions --------------------------------------------------- from pathlib import Path as EncodedFile class ArrayFile(Protocol): ''' A property that corresponds to a single-array HDF5 file ''' def __get__(self, obj: object, type_: Optional[type]) -> h5.Dataset: ... def __set__(self, obj: object, val: object) -> None: ... #-- Artifacts ----------------------------------------------------------------- class Artifact(Configurable): ''' An array- and metadata-friendly view into a directory Arguments: path (Path|str): The path at which the artifact is, or should be, stored conf (Mapping[str, object]): The build configuration, optionally including a "type" field indicating the type of artifact to search for/construct Constructors: - Artifact(conf: *Mapping[str, object]*) - Artifact(**conf_elem: *object*) - Artifact(path: *Path|str*) - Artifact(path: *Path|str*, conf: *Mapping[str, object]*) - Artifact(path: *Path|str*, **conf_elem: *object*) If only `path` is provided, the artifact corresponding to `Path` is returned. It will be empty if `path` points to an empty or nonexistent directory. 
If only `conf` is provided, Artisan will search the current `root_dir` for a matching directory, and return an artifact pointing there if it exists. Otherwise, a new artifact will be constructed at the top level of the `root_dir`. If both `path` and `conf` are provided, Artisan will return the artifact at `path`, building it if necessary. If `path` points to an existing directory that is not a sucessfully built artifact matching `conf`, an error is raised. Fields: - **path** (*Path*): The path to the root of the file tree backing this \ artifact - **conf** (*Namespace*): The configuration (inherited from `Configurable`) - **meta** (*Namespace*): The metadata stored in \ `{self.path}/_meta.yaml` After instantiation, artifacts act as string-keyed `MutableMapping`s (with some additional capabilities), containing three types of entries: `ArrayFile`s, `EncodedFile`s, and other `Artifact`s. `ArrayFile`s are single-entry HDF5 files, in SWMR mode. Array-like numeric and byte-string data (valid operands of `numpy.asarray`) written into an artifact via `__setitem__`, `__setattr__`, or `extend` is stored as an array file. `EncodedFile`s are non-array files, presumed to be encoded in a format that Artisan does not understand. They are written and read as normalized `Path`s, which support simple text and byte I/O, and can be passed to more specialized libraries for further processing. `Artifact` entries are returned as properly subtyped artifacts, and can be created, via `__setitem__`, `__setattr__`, or `extend`, from existing artifacts or (possibly nested) string-keyed `Mapping`s (*e.g* a dictionary of arrays). 
''' path: Path def __new__(cls, *args: object, **kwargs: object) -> Any: path, conf = _parse_artifact_args(args, kwargs) if path is not None and conf is None: return _artifact_from_path(cls, _resolve_path(path)) elif path is None and conf is not None: return _artifact_from_conf(cls, conf) elif path is not None and conf is not None: return _artifact_from_path_and_conf(cls, _resolve_path(path), conf) @property def meta(self) -> Namespace: ''' The metadata stored in `{self.path}/_meta.yaml` ''' return _read_meta(self.path) #-- MutableMapping methods ---------------------------- def __len__(self) -> int: ''' Returns the number of public files in `self.path` Non-public files (files whose names start with "_") are not counted. ''' return sum(1 for _ in self.path.glob('[!_]*')) def __iter__(self) -> Iterator[str]: ''' Yields field names corresponding to the public files in `self.path` Entries Artisan understands (subdirectories and HDF5 files) are yielded without extensions. Non-public files (files whose names start with "_") are ignored. ''' for p in self.path.glob('[!_]*'): yield p.name[:-3] if p.suffix == '.h5' else p.name def keys(self) -> Iterator[str]: return self.__iter__() def __getitem__(self, key: str) -> Any: ''' Returns an `ArrayFile`, `EncodedFile`, or `Artifact` corresponding to `self.path/key` HDF5 files are returned as `ArrayFile`s, other files are returned as `EncodedFile`s, and directories and nonexistent entries are returned as (possibly empty) `Artifact`s. Attribute access syntax is also supported, and occurrences of "__" in `key` are transformed into ".", to support accessing encoded files as attributes (i.e. `artifact['name.ext']` is equivalent to `artifact.name__ext`). ''' path = self.path / key # Return an array. if path.with_suffix('.h5').is_file(): return _read_h5(path.with_suffix('.h5')) # Return the path to a file. 
elif path.is_file(): return path # Return a subrecord else: return Artifact(path) def __setitem__(self, key: str, val: object) -> None: ''' Writes an `ArrayFile`, `EncodedFile`, or `Artifact` to `self.path/key` `np.ndarray`-like objects are written as `ArrayFiles`, `Path`-like objects are written as `EncodedFile`s, and string-keyed mappings are written as subartifacts. Attribute access syntax is also supported, and occurrences of "__" in `key` are transformed into ".", to support accessing encoded files as attributes (i.e. `artifact['name.ext'] = val` is equivalent to `artifact.name__ext = val`). ''' path = self.path / key # Copy an existing file. if isinstance(val, Path): assert path.suffix != '' _copy_file(path, val) # Write a subartifact. elif isinstance(val, (Mapping, Artifact)): assert path.suffix == '' MutableMapping.update(Artifact(path), val) # type: ignore # Write an array. else: assert path.suffix == '' _write_h5(path.with_suffix('.h5'), val) def __delitem__(self, key: str) -> None: ''' Deletes the entry at `self.path/key` Attribute access syntax is also supported, and occurrences of "__" in `key` are transformed into ".", to support accessing encoded files as attributes (i.e. `del artifact['name.ext']` is equivalent to `del artifact.name__ext`). ''' path = self.path / key # Delete an array file. if path.with_suffix('.h5').is_file(): path.with_suffix('.h5').unlink() # Delete a non-array file. elif path.is_file(): path.unlink() # Delete an artifact. else: shutil.rmtree(path, ignore_errors=True) def extend(self, key: str, val: object) -> None: ''' Extends an `ArrayFile`, `EncodedFile`, or `Artifact` at `self.path/key` Extending `ArrayFile`s performs concatenation along the first axis, extending `EncodedFile`s performs byte-level concatenation, and extending subartifacts extends their fields. Files corresponding to `self[key]` are created if they do not already exist. ''' path = self.path / key # Append an existing file. 
if isinstance(val, Path): assert path.suffix != '' _extend_file(path, val) # Append a subartifact. elif isinstance(val, (Mapping, Artifact)): assert path.suffix == '' for k in val: Artifact(path).extend(k, val[k]) # Append an array. else: assert path.suffix == '' _extend_h5(path.with_suffix('.h5'), val) #-- Attribute-style element access -------------------- def __getattr__(self, key: str) -> Any: return self.__getitem__(key.replace('__', '.')) def __setattr__(self, key: str, value: object) -> None: self.__setitem__(key.replace('__', '.'), value) def __delattr__(self, key: str) -> None: self.__delitem__(key.replace('__', '.')) #-- Attribute preemption, for REPL autocompletion ----- def __getattribute__(self, key: str) -> Any: if key in object.__getattribute__(self, '_cached_keys'): try: object.__setattr__(self, key, self[key]) except KeyError: object.__delattr__(self, key) object.__getattribute__(self, '_cached_keys').remove(key) return object.__getattribute__(self, key) def __dir__(self) -> List[str]: for key in self._cached_keys: object.__delattr__(self, key) self._cached_keys.clear() for key in set(self).difference(object.__dir__(self)): object.__setattr__(self, key, self[key]) self._cached_keys.add(key) return cast(list, object.__dir__(self)) #-- Artifact construction ----------------------------------------------------- def _parse_artifact_args( args: Tuple[object, ...], kwargs: Mapping[str, object] ) -> Tuple[ Optional[Path], Optional[Mapping[str, object]] ]: ''' Return `(path, conf)` or raise an error. 
''' # (conf) if (len(args) == 1 and isinstance(args[0], Mapping) and len(kwargs) == 0): return None, dict(args[0]) # (**conf) elif (len(args) == 0 and len(kwargs) > 0): return None, kwargs # (path) elif (len(args) == 1 and isinstance(args[0], (str, Path)) and len(kwargs) == 0): return Path(args[0]), None # (path, conf) elif (len(args) == 2 and isinstance(args[0], (str, Path)) and isinstance(args[1], Mapping) and len(kwargs) == 0): return Path(args[0]), dict(args[1]) # (path, **conf) elif (len(args) == 1 and isinstance(args[0], (str, Path)) and len(kwargs) > 0): return Path(args[0]), kwargs # else: raise TypeError( 'Invalid argument types for the `Artifact` constructor.\n' 'Valid signatures:\n' '\n' ' - Artifact(conf: Mapping[str, object])\n' ' - Artifact(**conf_elem: object)\n' ' - Artifact(path: Path|str)\n' ' - Artifact(path: Path|str, conf: Mapping[str, object])\n' ' - Artifact(path: Path|str, **conf_elem: object)\n' ) def _artifact_from_path(cls: type, path: Path) -> Artifact: ''' Return an artifact corresponding to the file tree at `path`. An error is raised if the type recorded in `_meta.yaml`, if any, is not a subtype of `cls`. ''' spec = _read_meta(path).spec or {} written_type = get_scope().get(spec.get('type', None), None) if path.is_file(): raise FileExistsError(f'{path} is a file.') if hasattr(cls, 'build') and not path.is_dir(): raise FileNotFoundError(f'{path} does not exist.') if written_type is not None and not issubclass(written_type, cls): raise FileExistsError( f'{path} is a {written_type.__module__}.{written_type.__qualname__}' f', not a {cls.__module__}.{cls.__qualname__}.' ) artifact = cast(Artifact, Configurable.__new__(cls, spec)) object.__setattr__(artifact, '_cached_keys', set()) object.__setattr__(artifact, 'path', path) return artifact def _artifact_from_conf(cls: type, conf: Mapping[str, object]) -> Artifact: ''' Find or build an artifact with the given type and configuration. 
''' artifact = cast(Artifact, Configurable.__new__(cls, conf)) object.__setattr__(artifact, '_cached_keys', set()) spec = Namespace(type=_identify(type(artifact)), **artifact.conf) for path in Path(get_root_dir()).glob('*'): meta = _read_meta(path) if meta.spec == spec: while meta.status == 'running': sleep(0.01) meta = _read_meta(path) if meta.status == 'done': object.__setattr__(artifact, 'path', path) return artifact else: object.__setattr__(artifact, 'path', _new_artifact_path(type(artifact))) _build(artifact) return artifact def _artifact_from_path_and_conf(cls: type, path: Path, conf: Mapping[str, object]) -> Artifact: ''' Find or build an artifact with the given type, path, and configuration. ''' artifact = cast(Artifact, Configurable.__new__(cls, conf)) object.__setattr__(artifact, '_cached_keys', set()) object.__setattr__(artifact, 'path', path) if path.exists(): meta = _read_meta(path) if meta.spec != {'type': _identify(type(artifact)), **artifact.conf}: raise FileExistsError(f'"{artifact.path}" (incompatible spec)') while meta.status == 'running': sleep(0.01) meta = _read_meta(path) if artifact.meta.status == 'stopped': raise FileExistsError(f'"{artifact.path}" was stopped mid-build.') else: _build(artifact) return artifact def _build(artifact: Artifact) -> None: ''' Create parent directories, invoke `artifact.build`, and store metadata. ''' # TODO: Fix YAML generation. 
meta_path = artifact.path / '_meta.yaml' spec = Namespace(type=_identify(type(artifact)), **artifact.conf) write_meta = lambda **kwargs: meta_path.write_text( json.dumps(_identify_elements(kwargs)) ) artifact.path.mkdir(parents=True) write_meta(spec=spec, status='running') try: if callable(getattr(type(artifact), 'build', None)): n_build_args = artifact.build.__code__.co_argcount build_args = [artifact.conf] if n_build_args > 1 else [] artifact.build(*build_args) write_meta(spec=spec, status='done') except BaseException as e: write_meta(spec=spec, status='stopped') raise e def _resolve_path(path: Path) -> Path: ''' Dereference ".", "..", "~", and "@". ''' if str(path).startswith('@/'): path = Path(get_root_dir()) / str(path)[2:] return path.expanduser().resolve() def _new_artifact_path(type_: type) -> Path: ''' Generate an unused path in the artifact root directory. ''' root = Path(get_root_dir()) type_name = _identify(type_) for i in itertools.count(): dst = root / f'{type_name}_{i:04x}' if not dst.exists(): return dst assert False # for MyPy #-- I/O ----------------------------------------------------------------------- def _read_h5(path: Path) -> ArrayFile: f = h5.File(path, 'r', libver='latest', swmr=True) return f['data'] def _write_h5(path: Path, val: object) -> None: val = np.asarray(val) path.parent.mkdir(parents=True, exist_ok=True) if path.is_dir(): path.rmdir() elif path.exists(): path.unlink() f = h5.File(path, libver='latest') f.create_dataset('data', data=np.asarray(val)) def _extend_h5(path: Path, val: object) -> None: val = np.asarray(val) path.parent.mkdir(parents=True, exist_ok=True) f = h5.File(path, libver='latest') if 'data' not in f: dset = f.require_dataset( name = 'data', shape = None, maxshape = (None, *val.shape[1:]), dtype = val.dtype, data = np.empty((0, *val.shape[1:]), val.dtype), chunks = ( int(np.ceil(2**12 / np.prod(val.shape[1:]))), *val.shape[1:] ) ) f.swmr_mode = True else: dset = f['data'] if len(val) > 0: dset.resize(dset.len() 
+ len(val), 0) dset[-len(val):] = val dset.flush() def _copy_file(dst: Path, src: Path) -> None: shutil.rmtree(dst, ignore_errors=True) dst.parent.mkdir(parents=True, exist_ok=True) shutil.copy(src, dst) def _extend_file(dst: Path, src: Path) -> None: dst.parent.mkdir(parents=True, exist_ok=True) with open(src, 'rb') as f_src: with open(dst, 'ab+') as f_dst: f_dst.write(f_src.read()) def _read_meta(path: Path) -> Namespace: # TODO: Implement caching try: meta = namespacify(yaml.safe_load((path/'_meta.yaml').read_text())) assert isinstance(meta, Namespace) assert isinstance(meta.spec, Namespace) assert isinstance(meta.status, str) return meta except: return Namespace(spec=None, status='done') #-- Scope search -------------------------------------------------------------- def _identify(type_: type) -> str: return next(sym for sym, t in get_scope().items() if t == type_) def _identify_elements(obj: object) -> object: if isinstance(obj, type): return _identify(obj) elif isinstance(obj, list): return [_identify_elements(elem) for elem in obj] elif isinstance(obj, dict): return Namespace({k: _identify_elements(obj[k]) for k in obj}) else: return obj PK!ggartisan/_artisan.py''' This module exports the `Artisan` class. `Artisan` objects represent the thread-local state of an Artisan environment. They can also be used as an HTTP server or WSGI application. ''' from pathlib import Path import threading from typing import Callable, Dict, Iterable, List, Mapping, Optional, Union from wsgiref.simple_server import make_server from ._artifacts import Artifact, set_root_dir from ._configurables import default_scope, get_schema, set_scope from ._http import wsgi_app from ._namespaces import Namespace __all__ = ['Artisan'] #------------------------------------------------------------------------------ class Artisan: ''' The thread-local state of an Artisan environment `Artisan` objects can also be used as an HTTP or WSGI server. 
Parameters: root_dir: The default directory for artifact creation, and the directory that will be searched for matches when an artifact is instantiated from a specification scope: The mapping used to resolve `type`s in specifications during configurable object instantiation build: [Not currently used] ''' root_dir: Path scope: Dict[str, type] build: Optional[Callable[[str, dict], None]] def __init__(self, *, root_dir: Union[str, Path, None] = None, scope: Optional[Mapping[str, type]] = None, build: Optional[Callable[[str, dict], None]] = None) -> None: self.root_dir = Path('.') if root_dir is None else Path(root_dir) self.scope = default_scope if scope is None else Namespace(scope) self.build = Artifact if build is None else build # type: ignore #-- Context manipulation ------------------------------ @staticmethod def get_current() -> 'Artisan': ''' Return the currently active artisan. ''' return artisan_stack.contents[-1] @staticmethod def push(artisan: Optional['Artisan'] = None, *, root_dir: Union[str, Path, None] = None, scope: Optional[Mapping[str, type]] = None, build: Optional[Callable[[str, dict], None]] = None) -> None: ''' Push an artisan onto the thread-local artisan stack, making it active. `root_dir`, `scope`, and `build` override the corresponding attributes of `artisan` if they are defined. ''' top_artisan = Artisan( root_dir = ( root_dir if root_dir is not None else getattr(artisan, 'root_dir', None) ), scope = ( scope if scope is not None else getattr(artisan, 'scope', None) ), build = ( build if build is not None else getattr(artisan, 'build', None) ) ) artisan_stack.contents.append(top_artisan) set_scope(top_artisan.scope) set_root_dir(top_artisan.root_dir) @staticmethod def pop() -> 'Artisan': ''' Remove and return the current artisan from the artisan stack. The previously present artisan becomes active after this method is called. 
''' set_scope(artisan_stack.contents[-2].scope) set_root_dir(artisan_stack.contents[-2].root_dir) return artisan_stack.contents.pop() def __enter__(self) -> None: ''' [Equivalent to `Artisan.push(self)`] ''' Artisan.push(self) def __exit__(self, *args: object) -> None: ''' [Equivalent to `Artisan.pop()`] ''' Artisan.pop() #-- JSON-Schema generation ---------------------------- @property def schema(self) -> dict: ''' The JSON Schema describing specifications accepted by this artisan ''' with self: return get_schema() #-- HTTP API ------------------------------------------ def __call__(self, env: dict, start_response: Callable) -> Iterable[bytes]: ''' Respond to a WSGI server request. This method is defined so WSGI servers (*e.g.* `Gunicorn `_ ) can use an `Artisan` object as a WSGI application. ''' with self: return wsgi_app(env, start_response) def serve(self, port: int = 3000) -> None: ''' Start an HTTP server providing access to artifacts and the current schema. This method uses the reference WSGI server defined in the standard library. Other servers, which can be installed via `pip`, may be more robust and performant. ''' with make_server('', port, self) as server: server.serve_forever() #------------------------------------------------------------------------------ class ArtisanStack(threading.local): def __init__(self) -> None: self.contents: List[Artisan] = [Artisan()] artisan_stack = ArtisanStack() PK!q>Martisan/_configurables.py''' This module defines `Configurable`, a class of objects whose constructors accept a JSON-like configuration as their first argument. Configurable objects provide the following features: - A `conf` field that stores the configuration passed into the constructor - The ability to define a `Conf` class (which is converted to a JSON-Schema) that documents/validates the expected form of the configuration - Subclass forwarding — If the configuration contains a "type" field, it determines the class of `Configurable` that is constructed. 
This is useful e.g. when constructing objects from deserialized configurations. ''' import threading from typing import Dict, Mapping, Optional, Tuple, Type from typing_extensions import Protocol from ._namespaces import Namespace, namespacify from ._schemas import conf_schema_from_type __all__ = [ 'Configurable', 'NameConflict', 'default_scope', 'get_schema', 'get_scope', 'set_scope' ] #-- Scope management ---------------------------------------------------------- default_scope: Dict[str, type] = {} context = threading.local() def set_scope(scope: Optional[Dict[str, type]]) -> None: ''' Set the scope used for "type" field resolution. ''' context.scope = scope if scope is not None else default_scope def get_scope() -> Dict[str, type]: ''' Return the scope used for "type" field resolution. ''' return getattr(context, 'scope', default_scope) #-- Schema generation --------------------------------------------------------- def get_schema() -> dict: ''' Return a schema with a definition for each exposed type. ''' scope = get_scope() return { '$schema': 'http://json-schema.org/draft-07/schema#', 'definitions': { sym: conf_schema_from_type(type_, scope) for sym, type_ in scope.items() }, '$ref': '#/definitions/Configurable' } #-- Configurable object metaclass --------------------------------------------- class ConfigurableMeta(type): ''' A type that generates an inner `Conf` class and adds itself to the default Artisan scope upon creation `ConfigurableMeta` is the metaclass for `Configurable`. ''' # TODO: Eliminate this class. def __init__(self, name: str, bases: Tuple[type, ...], dict_: Dict[str, object]) -> None: super().__init__(name, bases, dict_) # Generate `Conf` if it does not exist. if not hasattr(self, 'Conf'): self.Conf = type('Conf', (Protocol,), { # type: ignore '__qualname__': self.__qualname__+'.Conf', '__module__': self.__module__ }) # Add the configurable class to the default scope. 
default_scope[self.__qualname__] = ( NameConflict if self.__qualname__ in default_scope else self ) class NameConflict: def __init__(self, *args: object, **kwargs: object) -> None: raise KeyError('[Name conflict in the current Artisan scope]') #-- Configurable objects ------------------------------------------------------ class GenericConf(Protocol): ''' [A descriptor type that enables Jedi autocompletion support for conf fields] ''' def __get__(self, obj, type_): return obj.Conf() class Configurable(metaclass=ConfigurableMeta): ''' An object whose behavior is configured via a JSON-object-like configuration passed as the first argument to its constructor Parameters: conf: a mapping/namespace composed of arbitrarily nested `bool`, `int`, `float`, `str`, `NoneType`, sequence, and mapping/namespace instances (namespace := an object with a `__dict__` attribute). If `conf` contains a "type" field that is a `type`, `__new__` returns an instance of that type. Similarly, if `conf` contains a "type" field that is a string, `__new__` dereferences it in the current type scope and returns an instance of the resulting type (the `Artisan` class can be used to manipulate the type scope). ''' class Conf(Protocol): ''' A configuration If its definition is inline (lexically within the containing class' definition), it will be translated into a JSON-Schema to validate configurations passed into the outer class' constructor. `Conf` classes are intended to be interface definitions. They can extend `typing_extensions.Protocol` to support static analysis. An empty `Conf` definition is created for every `Configurable` subclass defined without one. ''' pass conf: GenericConf; ''' The configuration passed into the constructor, coerced to a `Namespace` ''' def __new__(cls, conf: object, *args: object, **kwargs: object) -> 'Configurable': # Coerce `conf` to a `dict`. conf = dict( conf if isinstance(conf, Mapping) else getattr(conf, '__dict__', {}) ) # Perform subclass forwarding. 
cls_override = conf.pop('type', None) if isinstance(cls_override, type): cls = cls_override elif isinstance(cls_override, str): try: cls = get_scope()[cls_override] except: raise KeyError(f'"{cls_override}" can\'t be resolved.') # Construct and return a `Configurable` instance. obj = object.__new__(cls) object.__setattr__(obj, 'conf', namespacify(conf)) return obj PK!Q%artisan/_http.py''' This module exports the `wsgi_app` object, a WSGI application that supports queries of artifact data and metadata. TODO: Switch to the following HTTP endpoints: - `/_schema`: The schema, in JSON format - `/x/y/z[t_last=null]`: A full fetch of an artifact, file, or array - `/x/y/z/_meta[depth=0]`: Artifact/file/array metadata ''' from multiprocessing import cpu_count from pathlib import Path import re from typing import Dict, Iterator, Optional as Opt, cast import cbor2 from falcon import API, HTTPStatus, Request, Response, HTTP_200, HTTP_404 from gunicorn.app.base import BaseApplication as GunicornApp import h5py as h5 from ruamel import yaml from ._artifacts import get_root_dir from ._configurables import get_schema __all__ = ['serve'] #-- Web API ------------------------------------------------------------------- def write_response(req: Request, res: Response) -> None: root = get_root_dir() res.content_type = 'application/cbor' res.set_header('Access-Control-Allow-Origin', '*') if req.path.endswith('/_entry-names'): path = root / req.path[1:-len('/_entry-names')] if path.is_file(): raise HTTPStatus(HTTP_404) res.data = cbor2.dumps(dict( type='plain-object', content=sorted([ re.sub(r'\.h5$', '', p.name) + ('/' if p.is_dir() else '') for p in path.glob('[!_]*') ]) )) elif req.path.endswith('/_entries'): path = root / req.path[1:-len('/_entries')] if path.is_file(): raise HTTPStatus(HTTP_404) res.data = cbor2.dumps(dict( type='plain-object', content=list(_entries(path)) )) elif req.path == '/_meta': res.data = cbor2.dumps(dict( type='plain-object', content=dict(spec=None, 
schema=get_schema()) )) elif req.path.endswith('/_meta'): key = req.path[1:-len('/_meta')] if (root / key).with_suffix('.h5').is_file(): res.data = cbor2.dumps(dict( type='plain-object', content={'IS_ARRAY': True} )) else: res.data = cbor2.dumps(_read_meta(root, key)) else: t_last = float(req.get_param('t_last') or 0) / 1000 entry = _read(root, req.path[1:], t_last) if entry['type'] == 'file': res.data = (root / cast(str, entry['content'])).read_bytes() else: res.data = cbor2.dumps(entry) res.status = HTTP_200 class HandleCORS(object): def process_request(self, req: Request, res: Response) -> None: res.set_header('Access-Control-Allow-Origin', '*') res.set_header('Access-Control-Allow-Methods', '*') res.set_header('Access-Control-Allow-Headers', '*') res.set_header('Access-Control-Max-Age', 600) if req.method == 'OPTIONS': raise HTTPStatus(HTTP_200) wsgi_app = API(middleware=[HandleCORS()]) wsgi_app.add_sink(write_response) #-- I/O ----------------------------------------------------------------------- _web_dtypes = dict( bool='uint8', uint8='uint8', uint16='uint16', uint32='uint32', uint64='uint32', int8='int8', int16='int16', int32='int32', int64='int32', float16='float32', float32='float32', float64='float64', float96='float64', float128='float64' ) def _read(root: Path, key: str, t_last: float) -> Dict[str, object]: if Path(f'{root}/{key}.h5').is_file(): # Array return _read_array(root, key, t_last) elif (root / key).is_file(): # Non-array file return dict(type='file', content=key) else: # Artifact return _read_artifact(root, key, t_last) def _read_array(root: Path, key: str, t_last: float) -> Dict[str, object]: if Path(f'{root}/{key}.h5').stat().st_mtime <= t_last: return dict(type='cached-value', content=None) f = h5.File(f'{root}/{key}.h5', 'r', libver='latest', swmr=True) a = f['data'][()] if a.dtype.kind in ['U', 'S']: return dict( type='string-array', content=a.astype('U').tolist() ) else: a = a.astype(_web_dtypes[a.dtype.name]) return dict( 
type='numeric-array', content=dict( shape=a.shape, dtype=a.dtype.name, data=a.data.tobytes() ) ) def _read_artifact(root: Path, key: str, t_last: float) -> Dict[str, object]: return dict( type='artifact', content=dict( _meta=_read_meta(root, key), **{ p.name: _read(root, str(p.relative_to(root)), t_last) for p in sorted((root / key).glob('[!_]*')) } ) ) def _read_meta(root: Path, key: str) -> Dict[str, object]: path = root / key / '_meta.yaml' if path.parent.is_file(): raise HTTPStatus(HTTP_404) try: meta = yaml.safe_load(path.read_text()) except: meta = dict(spec=None, status='done') return dict(type='plain-object', content=meta) def _entries(path: Path) -> Iterator[dict]: for p in sorted(path.glob('[!_]*')): if p.is_dir(): yield { 'type': 'artifact', 'name': p.name, 'nEntries': len(list(p.iterdir())) } elif p.suffix == '.h5': f = h5.File(p, 'r', libver='latest', swmr=True) a = f['data'][()] if a.dtype.kind in ['U', 'S']: yield { 'type': 'string-array', 'name': p.stem, 'dtype': 'string', 'shape': a.shape } else: yield { 'type': 'numeric-array', 'name': p.stem, 'dtype': _web_dtypes[a.dtype.name], 'shape': a.shape } else: yield { 'type': 'file', 'name': p.name, 'size': p.stat().st_size } PK!.Q artisan/_namespaces.py''' This module exports `Namespace`, a `dict` that supports accessing items at attributes, for convenience, and to better support static analysis. It also exports`namespacify`, a function that recursively converts mappings and namespace-like containers in JSON-like objects to `Namespace`s. 
''' from typing import Any, Dict, List, Mapping __all__ = ['Namespace', 'namespacify'] #-- Namespaces ---------------------------------------------------------------- class Namespace(Dict[str, Any]): ''' A `dict` that supports accessing items as attributes ''' def __dir__(self) -> List[str]: return list(set([*dict.__dir__(self), *dict.__iter__(self)])) def __getattr__(self, key: str) -> Any: return dict.__getitem__(self, key) def __setattr__(self, key: str, val: object) -> None: dict.__setitem__(self, key, val) def __delattr__(self, key: str) -> None: dict.__delitem__(self, key) @property def __dict__(self) -> dict: # type: ignore return self def __repr__(self) -> str: def single_line_repr(elem: object) -> str: if isinstance(elem, list): return '[' + ', '.join(map(single_line_repr, elem)) + ']' elif isinstance(elem, Namespace): return ( 'Namespace(' + ', '.join( f'{k}={single_line_repr(v)}' for k, v in elem.items() ) + ')' ) else: return repr(elem).replace('\n', ' ') def repr_in_context(elem: object, curr_col: int, indent: int) -> str: sl_repr = single_line_repr(elem) if len(sl_repr) <= 80 - curr_col: return sl_repr elif isinstance(elem, list): return ( '[\n' + ' ' * (indent + 2) + (',\n' + ' ' * (indent + 2)).join( repr_in_context(e, indent + 2, indent + 2) for e in elem ) + '\n' + ' ' * indent + ']' ) elif isinstance(elem, Namespace): return ( 'Namespace(\n' + ' ' * (indent + 2) + (',\n' + ' ' * (indent + 2)).join( f'{k} = ' + repr_in_context(v, indent + 5 + len(k), indent + 2) for k, v in elem.items() ) + '\n' + ' ' * indent + ')' ) else: return repr(elem) return repr_in_context(self, 0, 0) def namespacify(obj: object) -> object: ''' Recursively convert mappings (item access only) and ad-hoc namespaces (attribute access only) to `Namespace`s (both item and element access). 
def namespacify(obj: object) -> object:
    '''
    Recursively convert mappings (item access only) and ad-hoc namespaces
    (attribute access only) to `Namespace`s (both item and attribute access).
    '''
    # Atomic values pass through untouched.
    if isinstance(obj, (type(None), bool, int, float, str, type)):
        return obj
    if isinstance(obj, list):
        return [namespacify(item) for item in obj]
    if isinstance(obj, Mapping):
        return Namespace((key, namespacify(obj[key])) for key in obj)
    # Anything else is treated as an ad-hoc namespace: convert its
    # attribute dictionary instead.
    return namespacify(vars(obj))
def conf_schema_from_type(type_: type, scope: TypeDict = {}) -> ObjDict:
    '''
    Return a schema for the configuration of a `type_` instance.

    A concrete type's schema (the schema of a type with no subclasses in
    `scope`) has the following fields:

    - `type (str)`: "object"
    - `description (str)`: `type_`'s docstring, concatenated with any other
      non-attribute-annotating top-level string literals in the body of
      `type_`'s definition
    - `outputDescriptions (Dict[str, str])`: Descriptions of `type_`'s
      attributes, generated from top-level string literals immediately
      following attribute type annotations
    - `properties (Dict[str, dict])`: Per-property schemas, including
      descriptions and/or defaults, generated from attribute
      declarations/definitions in `type_`'s inner `Conf` class, if it exists
    - `required (List[str])`: The list of required properties (those without
      default values)

    An abstract type's schema describes the type-tagged union of the schemas
    of its subclasses.
    '''
    # NOTE(review): the mutable default `scope={}` is only safe because no
    # callee mutates it — confirm before adding writes to `scope`.
    # `type_` counts as abstract when `scope` contains any strict subclass.
    is_strict_subclass = lambda t: t is not type_ and issubclass(t, type_)
    if any(map(is_strict_subclass, scope.values())):
        return conf_schema_from_abstract_type(type_, scope)
    else:
        return conf_schema_from_concrete_type(type_, scope)


def conf_schema_from_abstract_type(type_: type, scope: TypeDict) -> ObjDict:
    ''' Return a configuration schema for a type with subtypes. '''
    # One alternative per strict subclass in `scope`: the "type" tag must be
    # present, must equal the subclass' name, and the object must match the
    # subclass' own schema (published elsewhere under "#/definitions/").
    return {'oneOf': [
        {'allOf': [
            {'required': ['type']},
            {'properties': {'type': {'const': name}}},
            {'$ref': '#/definitions/'+name}
        ]}
        for name, t in scope.items()
        if issubclass(t, type_) and t is not type_
    ]}


def conf_schema_from_concrete_type(type_: type, scope: TypeDict) -> ObjDict:
    ''' Return a configuration schema for a type with no subclasses. '''
    # Parse `type_`'s source to recover literal (docstring-style) annotations;
    # fall back to an empty class body when the source is unavailable
    # (builtins, classes defined in a REPL, ...).
    try:
        mod_def = cast(ast.Module, ast.parse(dedent(getsource(type_))))
        cls_def = cast(ast.ClassDef, mod_def.body[0])
    except (OSError, TypeError):
        cls_def = ast.ClassDef('', (), (), [], [])

    # `Conf` holds the property declarations; both the runtime class and its
    # AST node are consulted (the AST preserves the literal annotations).
    Conf = getattr(type_, 'Conf', type('', (), {}))
    conf_def = ast.ClassDef('', (), (), [], [])
    for stmt in cls_def.body:
        if isinstance(stmt, ast.ClassDef) and stmt.name == 'Conf':
            conf_def = stmt

    # `description` is accumulated as a list and joined at the end;
    # `properties` defaults each entry to an empty schema so the passes
    # below can update entries in any order.
    schema: dict = {
        'type': 'object',
        'description': [],
        'outputDescriptions': {},
        'properties': DefaultDict[str, dict](lambda: {})
    }

    # Collect `description` & `outputDescriptions`.
    for tgt, ann in literal_annotations(cls_def):
        if isinstance(ann, str):
            if tgt is None:
                schema['description'].append(dedent(ann).strip())
            else:
                schema['outputDescriptions'][tgt] = dedent(ann).strip()

    # Collect property type annotations.
    for tgt, ann in getattr(Conf, '__annotations__', {}).items():
        schema['properties'][tgt].update(schema_from_type_ann(ann, scope))

    # Collect property defaults (public class attributes of `Conf`).
    for key, val in vars(Conf).items():
        if not key.startswith('_'):
            schema['properties'][key]['default'] = val

    # Collect property descriptions and raw property schema.  A literal with
    # no target refines the top-level schema itself.
    for tgt, ann in literal_annotations(conf_def):
        tgt_schema = schema if tgt is None else schema['properties'][tgt]
        tgt_schema.update(schema_from_literal_ann(ann))

    # Define required properties: those without a default value.
    schema['required'] = [
        key for key, val in schema['properties'].items()
        if 'default' not in val
    ]

    schema['description'] = '\n\n'.join(schema['description'])
    schema['properties'] = dict(schema['properties'])
    return schema


def schema_from_type_ann(ann: Any, scope: TypeDict) -> ObjDict:
    ''' Generate a property schema from a type annotation. '''
    # `__origin__` distinguishes parameterized generics (List[...], Dict[...],
    # Union[...]) from plain classes.
    ann_metatype = getattr(ann, '__origin__', None)
    if ann is object or ann is Any:
        return {}
    elif ann is None or ann is type(None):
        return {'type': 'null'}
    elif ann is bool:
        return {'type': 'boolean'}
    elif ann is int:
        return {'type': 'integer'}
    elif ann is float:
        return {'type': 'number'}
    elif ann is str:
        return {'type': 'string'}
    elif ann is list:
        return {'type': 'array'}
    elif ann is dict:
        return {'type': 'object'}
    elif ann_metatype == Union:
        return {'oneOf': [
            schema_from_type_ann(t, scope) for t in ann.__args__
        ]}
    elif ann_metatype in (list, List):
        item_schema = schema_from_type_ann(ann.__args__[0], scope)
        return {'type': 'array', 'items': item_schema}
    elif ann_metatype in (dict, Dict) and ann.__args__[0] is str:
        # Only string-keyed dicts map onto JSON objects.
        item_schema = schema_from_type_ann(ann.__args__[1], scope)
        return {'type': 'object', 'additionalProperties': item_schema}
    elif ann in [getattr(v, 'Conf', None) for v in scope.values()]:
        # A `Conf` class of a type in `scope` refers back to that type's
        # published schema.
        name = next(k for k in scope if getattr(scope[k], 'Conf', None) is ann)
        return {'$ref': '#/definitions/'+name}
    else:
        raise ValueError(f'Type "{ann}" can\'t be mapped to a schema.')
def schema_from_literal_ann(ann: object) -> 'ObjDict':
    ''' Generate a property schema from a literal annotation. '''
    # A bare string is a description.
    if isinstance(ann, str):
        return {'description': dedent(ann).strip()}
    # A dict is already a raw property schema.
    if isinstance(ann, dict):
        return ann
    # A 2-tuple may pair a description with a raw schema, in either order;
    # the description is folded into the schema.
    if isinstance(ann, tuple) and len(ann) == 2:
        first, second = ann
        if isinstance(first, str) and isinstance(second, dict):
            return {'description': dedent(first).strip(), **second}
        if isinstance(first, dict) and isinstance(second, str):
            return {**first, 'description': dedent(second).strip()}
    # Any other literal contributes nothing.
    return {}


def literal_annotations(cls_def: ast.ClassDef) -> (
        Iterator[Tuple[Optional[str], object]]):
    '''
    Yield (target, annotation) pairs for the literal annotations in a class
    definition.
    '''
    field: Optional[str] = None
    for stmt in cls_def.body:
        # A bare expression that evaluates as a literal annotates whichever
        # field was declared immediately before it (or, with no preceding
        # field, the class itself).
        if isinstance(stmt, ast.Expr):
            try:
                yield field, ast.literal_eval(stmt.value)
            except ValueError:
                pass
        # Track the most recent single-target (possibly annotated)
        # assignment; any other statement resets the tracking.
        if (isinstance(stmt, ast.Assign) and len(stmt.targets) == 1
                and isinstance(stmt.targets[0], ast.Name)):
            field = stmt.targets[0].id
        elif (isinstance(stmt, ast.AnnAssign)
                and isinstance(stmt.target, ast.Name)):
            field = stmt.target.id
        else:
            field = None
Tartisan/_namespaces.pyPK!(JSS:artisan/_schemas.pyPK!artisan/py.typedPK!HWY%artisan_builder-0.2.1.dist-info/WHEELPK!HEsC(artisan_builder-0.2.1.dist-info/METADATAPK!HH&^artisan_builder-0.2.1.dist-info/RECORDPK