PK!,&mona/__init__.pyfrom .app import Mona from .rules import Rule from .runners import run_process, run_shell, run_thread from .sessions import Session __all__ = ['Rule', 'run_process', 'run_shell', 'run_thread', 'Session', 'Mona'] PK!   mona/app.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import json import logging import os from contextlib import contextmanager from pathlib import Path from typing import ( Any, Callable, Dict, Iterable, Iterator, List, MutableMapping, NamedTuple, Tuple, TypeVar, cast, ) import toml from .files import HashedFile, file_from_path from .plugins import Cache, FileManager, Parallel, TmpdirManager from .remotes import Remote from .rules import Rule from .sessions import Session from .tasks import Task from .utils import Pathable, get_timestamp __all__ = () log = logging.getLogger(__name__) _R = TypeVar('_R', bound=Rule[object]) ArgFactory = Callable[[str], object] class Entry(NamedTuple): rule: Rule[object] factories: Tuple[ArgFactory, ...] 
stdout: bool class Mona: MONADIR = '.mona' TMPDIR = 'tmpdir' FILES = 'files' CACHE = 'cache.db' LAST_ENTRY = 'LAST_ENTRY' def __init__(self, monadir: Pathable = None) -> None: monadir = monadir or os.environ.get('MONA_DIR') or Mona.MONADIR self._monadir = Path(monadir).resolve() self._configfile = self._monadir / 'config.toml' self._config: Dict[str, Any] = {} self._entries: Dict[str, Entry] = {} for path in [ Path('~/.config/mona/config.toml').expanduser(), Path('mona.toml'), self._configfile, ]: if path.exists(): with path.open() as f: self._config.update(toml.load(f)) def entry( self, name: str, *factories: ArgFactory, stdout: bool = False ) -> Callable[[_R], _R]: def decorator(rule: _R) -> _R: self._entries[name] = Entry(rule, factories, stdout) return rule return decorator def get_entry(self, name: str) -> Entry: return self._entries[name] def call_entry(self, name: str, *arg_strings: str) -> Task[object]: rule, factories, _ = self._entries[name] args = [factory(arg_str) for factory, arg_str in zip(factories, arg_strings)] return rule(*args) def create_session(self, warn: bool = False, **kwargs: Any) -> Session: sess = Session(warn=warn) self(sess, **kwargs) return sess @property def last_entry(self) -> List[str]: return cast( List[str], json.loads((self._monadir / Mona.LAST_ENTRY).read_text()) ) @last_entry.setter def last_entry(self, entry: List[str]) -> None: (self._monadir / Mona.LAST_ENTRY).write_text(json.dumps(entry)) def call_last_entry(self) -> Task[object]: return self.call_entry(*self.last_entry) def __call__( self, sess: Session, ncores: int = None, write: str = 'eager', full_restore: bool = False, ) -> None: self._plugins = { 'parallel': Parallel(ncores), 'tmpdir': TmpdirManager(self._monadir / Mona.TMPDIR), 'files': FileManager(self._monadir / Mona.FILES), 'cache': Cache.from_path( self._monadir / Mona.CACHE, write=write, full_restore=full_restore ), } for plugin in self._plugins.values(): plugin(sess) def ensure_initialized(self) -> None: if 
self._monadir.is_dir(): log.info(f'Already initialized in {self._monadir}.') return log.info(f'Initializing an empty repository in {self._monadir}.') self._monadir.mkdir() try: cache_home = Path(self._config['cache']) except KeyError: for dirname in [Mona.TMPDIR, Mona.FILES]: (self._monadir / dirname).mkdir() else: ts = get_timestamp() cachedir = cache_home / f'{Path.cwd().name}_{ts}' cachedir.mkdir() for dirname in [Mona.TMPDIR, Mona.FILES]: (cachedir / dirname).mkdir() (self._monadir / dirname).symlink_to(cachedir / dirname) @contextmanager def update_config(self) -> Iterator[MutableMapping[str, Any]]: if self._configfile.exists(): with self._configfile.open() as f: config = toml.load(f) else: config = {} yield config self._config.update(config) if config: with self._configfile.open('w') as f: toml.dump(config, f) def parse_remotes(self, remote_str: str) -> Iterable[Remote]: if remote_str == 'all': remotes = [r for r in self._config['remotes'].values()] else: remotes = [self._config['remotes'][name] for name in remote_str.split(',')] for remote in remotes: yield Remote(remote['host'], remote['path']) def add_source(self, path: Pathable) -> Callable[[_R], _R]: """Create a rule decorator to add a source to the task arguments. The source is passed as :class:`File`. The file argument is appended to the directly passed arguments. """ def decorator(rule: _R) -> _R: rule.add_extra_arg(lambda: HashedFile(file_from_path(path))) return rule return decorator PK!mo"" mona/cli.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. 
import logging import subprocess import sys import tempfile from pathlib import Path from typing import Dict, List, Optional, Sequence, Tuple, cast import click from .app import Mona from .dirtask import DirtaskInput, checkout_files from .files import File from .futures import STATE_COLORS from .table import Table, lenstr from .tasks import Task from .utils import groupby, import_fullname, match_glob __version__ = '0.1.0' __all__ = () log = logging.getLogger(__name__) @click.group() @click.option('--app', 'appname', envvar='MONA_APP', required=True) @click.option('--debug', is_flag=True, envvar='MONA_DEBUG') @click.pass_context def cli(ctx: click.Context, appname: str, debug: int) -> None: package = Path(appname.split(':')[0].split('.')[0]) if package.is_dir() or package.with_suffix('.py').is_file(): sys.path.insert(0, '') ctx.obj = import_fullname(appname) assert isinstance(ctx.obj, Mona) if debug: log_format = '[{asctime}.{msecs:03.0f}] {levelname}:{name}: {message}' log_level = logging.DEBUG else: log_format = '{message}' log_level = logging.INFO logging.basicConfig(style='{', format=log_format, datefmt='%H:%M:%S') logging.getLogger('mona').setLevel(log_level) @cli.command() @click.pass_obj def init(app: Mona) -> None: """Initialize a Git repository.""" app.ensure_initialized() class TaskFilter: def __init__(self, patterns: List[str] = None, no_path: bool = False) -> None: self._patterns = patterns or [] self._no_path = no_path def __call__(self, task: Task[object]) -> bool: if self._no_path and task.label.startswith('/'): return False if self._patterns and not any( match_glob(task.label, patt) for patt in self._patterns ): return False return True class ExceptionBuffer: def __init__(self, maxerror: int = None) -> None: self._maxerror = maxerror self._n_errors = 0 def __call__(self, task: Task[object], exc: Exception) -> bool: if self._maxerror is None: return False if self._n_errors == self._maxerror: log.warn('Maximum number of errors reached') self._n_errors 
+= 1 if self._n_errors <= self._maxerror: return True return False @cli.command() @click.option('-p', '--pattern', multiple=True, help='Tasks to be executed') @click.option('-P', '--path', is_flag=True, help='Execute path-like tasks') @click.option('-j', '--cores', type=int, help='Number of cores') @click.option('-l', '--limit', type=int, help='Limit number of tasks to N') @click.option('--maxerror', type=int, help='Number of errors in row to quit') @click.argument('entry') @click.argument('args', nargs=-1) @click.pass_obj def run( app: Mona, pattern: List[str], cores: int, path: bool, limit: Optional[int], maxerror: int, entry: str, args: List[str], ) -> None: """Run a given rule.""" app.last_entry = entry_args = [entry, *args] task_filter = TaskFilter(pattern, no_path=not path) exception_buffer = ExceptionBuffer(maxerror) with app.create_session(ncores=cores) as sess: result = sess.eval( app.call_entry(*entry_args), exception_handler=exception_buffer, task_filter=task_filter, limit=limit, ) if app.get_entry(entry).stdout: log.info(f'Printing result to standard output.') print(result) @cli.command() @click.option('-p', '--pattern', multiple=True, help='Patterns to be reported') @click.pass_obj def status(app: Mona, pattern: List[str]) -> None: """Print status of tasks.""" ncols = len(STATE_COLORS) + 1 table = Table(align=['<', *(ncols * ['>'])], sep=[' ', *((ncols - 1) * ['/'])]) table.add_row('pattern', *(s.name.lower() for s in STATE_COLORS), 'all') with app.create_session(warn=False, write='never', full_restore=True) as sess: app.call_last_entry() task_groups: Dict[str, List[Task[object]]] = {} all_tasks = list(sess.all_tasks()) for patt in pattern or ['**']: matched_any = False for task in all_tasks: matched = match_glob(task.label, patt) if matched: task_groups.setdefault(matched, []).append(task) matched_any = True if not matched_any: task_groups[patt] = [] for label, tasks in task_groups.items(): grouped = { state: group for state, group in groupby(tasks, 
key=lambda t: t.state).items() } counts: List[Tuple[int, Optional[str]]] = [ (len(grouped.get(state, [])), color) for state, color in STATE_COLORS.items() ] counts.append((len(tasks), None)) col_counts = [ lenstr(click.style(str(count), fg=color), len(str(count))) for count, color in counts ] table.add_row(label, *col_counts) click.echo(str(table)) @cli.command() @click.argument('file', type=Path, required=False) @click.pass_obj def graph(app: Mona, file: Optional[Path]) -> None: """Create or open a pdf with the task graph.""" with app.create_session(warn=False, write='never', full_restore=True) as sess: app.call_last_entry() dot = sess.dot_graph() fmt = file.suffix[1:] if file else 'pdf' tgt = dot.render(tempfile.mkstemp()[1], cleanup=True, format=fmt, view=not file) if file: Path(tgt).rename(file) @cli.command() @click.option('-p', '--pattern', multiple=True, help='Tasks to be checked out') @click.option('--done', is_flag=True, help='Check out only finished tasks') @click.option('-c', '--copy', is_flag=True, help='Copy instead of symlinking') @click.pass_obj def checkout(app: Mona, pattern: List[str], done: bool, copy: bool) -> None: """Checkout path-labeled tasks into a directory tree.""" n_tasks = 0 with app.create_session(warn=False, write='never', full_restore=True) as sess: app.call_last_entry() for task in sess.all_tasks(): if task.label[0] != '/': continue if pattern and not any(match_glob(task.label, patt) for patt in pattern): continue if done and not task.done(): continue exe: Optional[File] paths: Sequence[DirtaskInput] if task.rule == 'dir_task': exe = cast(File, task.args[0].value) paths = cast(List[DirtaskInput], task.args[1].value) if task.done(): paths.extend(cast(Dict[str, File], task.result()).values()) elif task.rule == 'file_collection': exe = None paths = cast(List[File], task.args[0].value) root = Path(task.label[1:]) root.mkdir(parents=True, exist_ok=True) checkout_files(root, exe, paths, mutable=copy) n_tasks += 1 log.info(f'Checked out 
{n_tasks} tasks.') @cli.group() def remote() -> None: """Manage remote repositories.""" pass @remote.command('add') @click.argument('name') @click.argument('url') @click.pass_obj def remote_add(app: Mona, url: str, name: str) -> None: """Add a remote.""" host, path = url.split(':') name = name or host with app.update_config() as config: config.setdefault('remotes', {})[name] = {'host': host, 'path': path} @cli.command() @click.option('--delete', is_flag=True, help='Delete files when syncing') @click.option('--dry', is_flag=True, help='Do a dry run') @click.argument('remotes') @click.pass_obj def update(app: Mona, remotes: str, delete: bool, dry: bool) -> None: """Update remotes.""" for remote in app.parse_remotes(remotes): remote.update(delete=delete, dry=dry) @cli.command() @click.argument('shellcmd') def cmd(shellcmd: str) -> None: """Execute a shell command.""" subprocess.run(shellcmd, shell=True, check=True) @cli.command() @click.argument('remotes') @click.pass_obj def go(app: Mona, remotes: str) -> None: """SSH into the remote repository.""" for remote in app.parse_remotes(remotes): remote.go() @cli.command('r', context_settings={'ignore_unknown_options': True}) @click.argument('remotes') @click.argument('args', nargs=-1) @click.pass_obj def remote_mona_cmd(app: Mona, remotes: str, args: List[str]) -> None: """Execute a Mona command on a remote.""" for remote in app.parse_remotes(remotes): if args[0] in {'init', 'run', 'dispatch'}: remote.update() remote.command(args) PK!% mona/dag.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. 
import asyncio from enum import Enum from typing import ( Any, AsyncGenerator, Awaitable, Callable, Container, Deque, Dict, Iterable, Iterator, MutableSequence, NamedTuple, Optional, Set, Tuple, TypeVar, Union, cast, ) __all__ = ['traverse_async', 'traverse', 'traverse_id'] _T = TypeVar('_T') NodeScheduler = Callable[[_T, Callable[[_T], None]], None] NodeResult = Tuple[_T, Optional[Exception], Iterable[_T]] NodeExecuted = Callable[[NodeResult[_T]], None] NodeExecutor = Callable[[_T, NodeExecuted[_T]], Awaitable[None]] Priority = Tuple['Action', 'Action', 'Action'] def extend_from( src: Iterable[_T], seq: MutableSequence[_T], *, filter: Container[_T] ) -> None: seq.extend(x for x in src if x not in filter) class Action(Enum): RESULTS = 0 EXECUTE = 1 TRAVERSE = 2 def __repr__(self) -> str: return self.name class Step(NamedTuple): action: Action node: Optional[Any] # should be _T progress: Dict[str, int] class NodeException(NamedTuple): node: Any # should be _T exc: Exception default_priority = cast(Priority, tuple(Action)) # only limited override for use in traverse_async() class SetDeque(Deque[_T]): def __init__(self, *args: Any) -> None: super().__init__(*args) self._set: Set[_T] = set() def append(self, x: _T) -> None: if x not in self._set: self._set.add(x) super().append(x) def extend(self, xs: Iterable[_T]) -> None: for x in xs: self.append(x) def pop(self) -> _T: # type: ignore x = super().pop() self._set.remove(x) return x def popleft(self) -> _T: x = super().popleft() self._set.remove(x) return x async def traverse_async( start: Iterable[_T], edges_from: Callable[[_T], Iterable[_T]], schedule: NodeScheduler[_T], execute: NodeExecutor[_T], depth: bool = False, priority: Priority = default_priority, ) -> AsyncGenerator[Union[Step, NodeException], bool]: """Asynchronous generator that traverses a self-extending DAG. Yields potentials steps, and receives whether those should be taken. 
:param start: Starting nodes :param edges_from: Returns nodes with incoming edge from the given node :param schedule: Schedule the given node for execution :param execute: Execute the given node and return new generated nodes with incoming edge from it (run only on scheduled nodes) :param depth: Traverse depth-first if true, breadth-first otherwise :param priority: Priorize steps in order """ visited: Set[_T] = set() to_visit, to_execute = SetDeque[_T](), Deque[_T]() done: 'asyncio.Queue[NodeResult[_T]]' = asyncio.Queue() executing, executed = 0, 0 actionable: Dict[Action, Callable[[], bool]] = { Action.RESULTS: lambda: not done.empty(), Action.EXECUTE: lambda: bool(to_execute), Action.TRAVERSE: lambda: bool(to_visit), } to_visit.extend(start) while True: for action in priority: if actionable[action](): break else: if executing == 0: break action = Action.RESULTS progress = { 'executing': executing - done.qsize(), 'to_execute': len(to_execute), 'to_visit': len(to_visit), 'with_result': done.qsize(), 'done': executed, 'visited': len(visited), } if action is Action.TRAVERSE: node = to_visit.pop() if depth else to_visit.popleft() visited.add(node) if not (yield Step(action, node, progress)): continue schedule(node, to_execute.append) extend_from(edges_from(node), to_visit, filter=visited) elif action is Action.RESULTS: yield Step(action, None, progress) node, exc, nodes = await done.get() if exc: yield NodeException(node, exc) extend_from(nodes, to_visit, filter=visited) executing -= 1 executed += 1 else: assert action is Action.EXECUTE node = to_execute.popleft() if not (yield Step(action, node, progress)): continue executing += 1 try: await execute(node, done.put_nowait) except Exception as exc: yield NodeException(node, exc) def traverse( start: Iterable[_T], edges_from: Callable[[_T], Iterable[_T]], sentinel: Callable[[_T], bool] = None, depth: bool = False, ) -> Iterator[_T]: """Traverse a DAG, yield visited notes.""" visited: Set[_T] = set() queue = Deque[_T]() 
queue.extend(start) while queue: n = queue.pop() if depth else queue.popleft() visited.add(n) yield n if sentinel and sentinel(n): continue queue.extend(m for m in edges_from(n) if m not in visited) def traverse_id( start: Iterable[_T], edges_from: Callable[[_T], Iterable[_T]] ) -> Iterable[_T]: """Traverse a DAG, yield visited notes. Nodes are stored by their ids, not hashes. """ table: Dict[int, _T] = {} def ids_from(ns: Iterable[_T]) -> Iterable[int]: update = {id(n): n for n in ns} table.update(update) return update.keys() for n in traverse(ids_from(start), lambda n: ids_from(edges_from(table[n]))): yield table[n] PK!:Rmona/dirtask.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import logging import subprocess from pathlib import Path from tempfile import TemporaryDirectory from typing import ( Any, Callable, ContextManager, Dict, List, Optional, Sequence, Tuple, Union, cast, ) from typing_extensions import Protocol, runtime from .errors import InvalidInput from .files import File from .rules import Rule from .runners import run_process from .sessions import Session from .utils import Pathable, make_executable __version__ = '0.2.0' __all__ = ['dir_task', 'DirtaskTmpdir'] log = logging.getLogger(__name__) DirtaskInput = Union[File, Tuple[Path, str]] @runtime class TmpdirManager(Protocol): def tempdir(self) -> ContextManager[Pathable]: ... class DirTaskProcessError(subprocess.CalledProcessError): def __init__(self, stdout: bytes, stderr: bytes, *args: Any) -> None: super().__init__(*args) self.stdout = stdout self.stderr = stderr def __str__(self) -> str: return '\n'.join( [ 'STDOUT:', self.stdout.decode(), '', 'STDERR:', self.stderr.decode(), '', super().__str__(), ] ) class DirtaskTmpdir: """Context manager of a temporary directory that collects created files. 
:param output_filter: true for files to be collected """ def __init__(self, output_filter: Callable[[str], bool] = None) -> None: sess = Session.active() dirmngr = cast( Optional[TmpdirManager], sess.storage.get('dir_task:tmpdir_manager') ) assert not dirmngr or isinstance(dirmngr, TmpdirManager) self._dirmngr = dirmngr self._output_filter = output_filter def has_tmpdir_manager(self) -> bool: """Is the temporary directory managed.""" return self._dirmngr is not None def __enter__(self) -> Path: self._ctx = ( TemporaryDirectory() if not self._dirmngr else self._dirmngr.tempdir() ) self._tmpdir = Path(self._ctx.__enter__()) return self._tmpdir def __exit__(self, exc_type: Any, *args: Any) -> None: try: if not exc_type: self._outputs: Dict[str, File] = {} for path in self._tmpdir.glob('**/*'): if not path.is_file(): continue relpath = str(path.relative_to(self._tmpdir)) if self._output_filter and not self._output_filter(relpath): continue file = File.from_path(path, self._tmpdir, keep=False) self._outputs[relpath] = file finally: self._ctx.__exit__(exc_type, *args) def result(self) -> Dict[str, File]: """Return a collection of files created in the temporary directory. This is available only after leaving the context. """ return self._outputs def checkout_files( root: Path, exe: Optional[File], files: Sequence[DirtaskInput], mutable: bool = False, ) -> None: assert root.exists() if exe: files = [exe, *files] for file in files: if isinstance(file, File): path = file.path else: path, target = file (root / path.parent).mkdir(parents=True, exist_ok=True) if isinstance(file, File): file.target_in(root, mutable=mutable) else: (root / path).symlink_to(target) if exe: make_executable(root / exe.path) @Rule async def dir_task(exe: File, inputs: List[DirtaskInput]) -> Dict[str, File]: """Create a rule with an executable and a files as inputs. The result of the task is a dictionary of all new files created by running the executable. 
""" for file in [exe, *inputs]: if not ( isinstance(file, File) or isinstance(file, list) and len(file) == 2 and isinstance(file[0], Path) and isinstance(file[1], str) ): raise InvalidInput(str(file)) input_names = { str(inp if isinstance(inp, File) else inp[0]) for inp in [exe, *inputs] } dirtask_tmpdir = DirtaskTmpdir(lambda p: p not in input_names) with dirtask_tmpdir as tmpdir: checkout_files(tmpdir, exe, inputs) out_path, err_path = tmpdir / 'STDOUT', tmpdir / 'STDERR' try: with out_path.open('w') as stdout, err_path.open('w') as stderr: await run_process( str(tmpdir / exe.path), stdout=stdout, stderr=stderr, cwd=tmpdir ) except subprocess.CalledProcessError as e: if dirtask_tmpdir.has_tmpdir_manager(): raise exc: Optional[subprocess.CalledProcessError] = e else: exc = None if exc: out = out_path.read_bytes() err = err_path.read_bytes() raise DirTaskProcessError(out, err, exc.returncode, exc.cmd) return dirtask_tmpdir.result() PK!yEmona/errors.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. from typing import TYPE_CHECKING if TYPE_CHECKING: from .futures import Future from .tasks import Task from .sessions import Session __version__ = '0.1.0' __all__ = () class MonaError(Exception): pass class FutureError(MonaError): def __init__(self, msg: str, fut: 'Future') -> None: super().__init__(msg) self.future = fut class TaskError(MonaError): def __init__(self, msg: str, task: 'Task[object]') -> None: super().__init__(msg) self.task = task class CompositeError(MonaError): pass class SessionError(MonaError): def __init__(self, msg: str, sess: 'Session') -> None: super().__init__(msg) self.session = sess class InvalidInput(MonaError): pass class FilesError(MonaError): pass class HashingError(MonaError): pass PK!cu mona/files.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. 
If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import json from abc import ABC, abstractmethod from pathlib import Path from typing import Any, Iterable, List, Optional, Type, TypeVar, Union, cast from .hashing import Hash, Hashed, HashedBytes, HashedComposite, HashResolver from .rules import Rule from .sessions import Session from .utils import Pathable, make_nonwritable, shorten_text __version__ = '0.3.0' __all__ = ['file_from_path', 'file_collection', 'File'] _FM = TypeVar('_FM', bound='FileManager') @Rule async def file_collection(files: List['File']) -> None: """Create a void task whose purpose is to label a file collection. :param files: a list of :class:`File` """ pass class FileManager(ABC): @abstractmethod def store_path(self, path: Path, *, keep: bool) -> 'Hash': ... @abstractmethod def store_bytes(self, content: bytes) -> 'Hash': ... @abstractmethod def get_bytes(self, content_hash: Hash) -> bytes: ... @abstractmethod def target_in(self, path: Path, content_hash: Hash, *, mutable: bool) -> None: ... @classmethod def active(cls: Type[_FM]) -> Optional[_FM]: fmngr = cast(Optional[_FM], Session.active().storage.get('file_manager')) assert not fmngr or isinstance(fmngr, cls) return fmngr class File: """Represents a file located at an abstract relative path. Users should create instances by one of the classmethod constructors documented below rather than directly. 
""" def __init__(self, path: Path, content: Union[bytes, Hash]): assert not path.is_absolute() self._path = path self._content = content if not isinstance(content, bytes): fmngr = FileManager.active() assert fmngr self._fmngr = fmngr def __repr__(self) -> str: if isinstance(self._content, bytes): content = repr(shorten_text(self._content, 20)) else: content = self._content[:6] return f'' def __str__(self) -> str: return str(self._path) @property def path(self) -> Path: """Abstract path to the file.""" return self._path @property def stem(self) -> str: """Equivalent to :attr:`path.stem`.""" return self._path.stem @property def name(self) -> str: """Equivalent to :attr:`path.name`.""" return self._path.name @property def content(self) -> Union[bytes, Hash]: """Content as bytes or its hash.""" return self._content def read_bytes(self) -> bytes: """Return content of the file as bytes.""" if isinstance(self._content, bytes): return self._content return self._fmngr.get_bytes(self._content) def read_text(self) -> str: """Return content of the file as string.""" return self.read_bytes().decode() def target_in(self, path: Path, *, mutable: bool = False) -> None: """Create an actual file or a symlink at the given location. :param Path path: where the file should be created :param bool mutable: whether the created file will be mutable """ target = path / self._path if isinstance(self._content, bytes): target.write_bytes(self._content) if not mutable: make_nonwritable(target) else: self._fmngr.target_in(target, self._content, mutable=mutable) @classmethod def from_str(cls, path: Pathable, content: Union[str, bytes]) -> 'File': """Create a file from a string or bytes. 
:param path: the abstract path of the created file instance :param content: the content of the file """ path = Path(path) if isinstance(content, str): content = content.encode() fmngr = FileManager.active() if fmngr: return cls(path, fmngr.store_bytes(content)) return cls(path, content) @classmethod def from_path( cls, path: Pathable, root: Union[str, Path] = None, *, keep: bool = True ) -> 'File': """Create a file from a physical file. :param path: the path of the physical file. Also a basis for the abstract path of the file instance. :param root: If given, the abstract path will be created from the physical path taken relative to the root. If not given, the ``path`` argument must be relative. :param bool keep: whether the physical file should kept or destroyed """ path = Path(path) relpath = path.relative_to(root) if root else path fmngr = FileManager.active() if fmngr: return cls(relpath, fmngr.store_path(path, keep=keep)) file = cls(relpath, path.read_bytes()) if not keep: path.unlink() return file def file_from_path(*args: Any, **kwargs: Any) -> File: """Alias for :meth:`File.from_path`""" return File.from_path(*args, **kwargs) class HashedFile(Hashed[File]): def __init__(self, file: File): self._path = file.path if isinstance(file.content, bytes): self._content: Optional[HashedBytes] = HashedBytes(file.content) self._content_hash = self._content.hashid else: self._content = None self._content_hash = file.content Hashed.__init__(self) @property def spec(self) -> bytes: return json.dumps([str(self._path), self._content_hash]).encode() @classmethod def from_spec(cls, spec: bytes, resolve: HashResolver) -> 'HashedFile': path_str: str content_hash: Hash path_str, content_hash = json.loads(spec) path = Path(path_str) fmngr = FileManager.active() if fmngr: return cls(File(path, content_hash)) return cls(File(path, cast(HashedBytes, resolve(content_hash)).value)) @property def value(self) -> File: return File( self._path, self._content.value if self._content else 
self._content_hash ) @property def label(self) -> str: return f'./{self._path}' @property def components(self) -> Iterable['Hashed[object]']: if self._content: return (self._content,) return () HashedComposite.type_swaps[File] = HashedFile PK!cL L mona/futures.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import logging from enum import IntEnum from typing import Callable, Iterable, List, NoReturn, Set, TypeVar from typing_extensions import Final from .errors import MonaError __all__ = () log = logging.getLogger(__name__) _T = TypeVar('_T') _Fut = TypeVar('_Fut', bound='Future') Callback = Callable[[_T], None] class State(IntEnum): PENDING = 0 READY = 1 RUNNING = 2 ERROR = 3 HAS_RUN = 4 AWAITING = 5 DONE = 6 STATE_COLORS: Final = { State.PENDING: None, State.READY: 'magenta', State.RUNNING: 'yellow', State.ERROR: 'red', State.AWAITING: 'cyan', State.DONE: 'green', } class Future: def __init__(self: _Fut, parents: Iterable[_Fut]) -> None: self._parents = frozenset(parents) self._pending = {fut for fut in self._parents if not fut.done()} self._children: Set['Future'] = set() self._done_callbacks: List[Callback[_Fut]] = [] self._ready_callbacks: List[Callback[_Fut]] = [] self._registered = False self._state: State = State.PENDING if self._pending else State.READY def __getstate__(self) -> NoReturn: raise MonaError('Future objects cannot be pickled') @property def state(self) -> State: return self._state def done(self) -> bool: return self._state is State.DONE def add_child(self, fut: 'Future') -> None: assert not self.done() self._children.add(fut) def register(self: _Fut) -> None: if not self._registered: self._registered = True log.debug(f'registered: {self!r}') for fut in self._pending: fut.register() fut.add_child(self) def add_ready_callback(self: _Fut, callback: Callback[_Fut]) -> None: if self._state >= 
State.READY: callback(self) else: self._ready_callbacks.append(callback) def add_done_callback(self: _Fut, callback: Callback[_Fut]) -> None: assert not self.done() self._done_callbacks.append(callback) def parent_done(self: _Fut, fut: _Fut) -> None: assert self._state is State.PENDING self._pending.remove(fut) if not self._pending: self._state = State.READY log.debug(f'{self}: ready') for callback in self._ready_callbacks: callback(self) def set_done(self: _Fut) -> None: assert State.READY <= self._state < State.DONE self._state = State.DONE log.debug(f'{self}: done') for fut in self._children: fut.parent_done(self) for callback in self._done_callbacks: callback(self) PK!Jffmona/hashing.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import hashlib import json from abc import ABC, abstractmethod from typing import ( Callable, Dict, Generic, Iterable, NewType, Optional, Set, Tuple, TypeVar, Union, cast, ) from .json import ClassJSONDecoder, ClassJSONEncoder, JSONValue, validate_json from .utils import Literal, TypeSwaps, shorten_text, swap_type __version__ = '0.1.0' __all__ = () _T_co = TypeVar('_T_co', covariant=True) Hash = NewType('Hash', str) # symbolic type for a JSON-like container including custom classes Composite = NewType('Composite', object) HashResolver = Callable[[Hash], 'Hashed[object]'] def hash_text(text: Union[str, bytes]) -> Hash: if isinstance(text, str): text = text.encode() return Hash(hashlib.sha1(text).hexdigest()) class Hashed(ABC, Generic[_T_co]): def __init__(self) -> None: assert not hasattr(self, '_hashid') self._hashid = hash_text(self.spec) @property @abstractmethod def spec(self) -> bytes: ... @classmethod @abstractmethod def from_spec(cls, spec: bytes, resolve: HashResolver) -> 'Hashed[_T_co]': ... @property @abstractmethod def label(self) -> str: ... 
class HashedBytes(Hashed[bytes]):
    """Hashed wrapper around a raw byte string.

    The bytes themselves are the spec, so the hashid is simply the hash of
    the content.
    """

    def __init__(self, content: bytes) -> None:
        self._content = content
        Hashed.__init__(self)
        # Short printable preview of the content, used as the label.
        self._label = repr(shorten_text(content, 20))

    @property
    def spec(self) -> bytes:
        return self.value

    @classmethod
    def from_spec(cls, spec: bytes, resolve: HashResolver) -> 'HashedBytes':
        # The spec is the content itself; there are no nested objects.
        return cls(spec)

    @property
    def label(self) -> str:
        return self._label

    @property
    def value(self) -> bytes:
        return self._content


class HashedCompositeLike(Hashed[Composite]):
    """Hashed JSON-like container whose leaves may be other hashed objects.

    The container is kept as a JSON string in which nested :class:`Hashed`
    objects are replaced by ``{'_type': 'Hashed', 'hashid': ...}`` stubs,
    alongside the component objects keyed by hashid.
    """

    # Types transparently swapped for their Hashed wrapper while parsing
    # (bytes -> HashedBytes; extended elsewhere, e.g. File -> HashedFile).
    type_swaps: TypeSwaps = {bytes: HashedBytes}

    def __init__(self, jsonstr: str, components: Iterable[Hashed[object]]) -> None:
        self._jsonstr = jsonstr
        self._components = {comp.hashid: comp for comp in components}
        Hashed.__init__(self)
        # Human-readable rendering with component labels spliced in.
        self._label = repr(self.resolve(lambda hashed: Literal(hashed.label)))

    @property
    @abstractmethod
    def value(self) -> Composite:
        ...

    @property
    def spec(self) -> bytes:
        # Sorting hashids makes the spec (and hence the hashid) deterministic.
        return json.dumps([self._jsonstr, *sorted(self._components)]).encode()

    @classmethod
    def from_spec(cls, spec: bytes, resolve: HashResolver) -> 'HashedCompositeLike':
        jsonstr: str
        hashids: Tuple[Hash, ...]
        jsonstr, *hashids = json.loads(spec)
        return cls(jsonstr, (resolve(h) for h in hashids))

    @property
    def label(self) -> str:
        return self._label

    @property
    def components(self) -> Iterable[Hashed[object]]:
        return self._components.values()

    def resolve(
        self, handler: Callable[['Hashed[object]'], object] = lambda x: x
    ) -> Composite:
        """Decode the stored JSON, mapping each Hashed stub through *handler*."""

        def hook(type_tag: str, dct: Dict[str, JSONValue]) -> object:
            if type_tag == 'Hashed':
                return handler(self._components[cast(Hash, dct['hashid'])])
            return dct

        return cast(
            Composite, json.loads(self._jsonstr, hook=hook, cls=ClassJSONDecoder)
        )

    @classmethod
    def _default(cls, o: object) -> Optional[Tuple[object, str, Dict[str, JSONValue]]]:
        # Encoder fallback: swap plain values for their Hashed wrapper and
        # emit a stub carrying only the hashid.
        o = swap_type(o, cls.type_swaps)
        if isinstance(o, Hashed):
            return (o, 'Hashed', {'hashid': o.hashid})
        return None

    @classmethod
    def parse_object(cls, obj: object) -> Tuple[str, Set[Hashed[object]]]:
        """Encode *obj* to a JSON string plus the set of hashed components."""
        classes = tuple(cls.type_swaps) + (Hashed,)
        validate_json(obj, lambda x: isinstance(x, classes))
        # The encoder records every Hashed component it meets on this tape.
        components: Set[Hashed[object]] = set()
        jsonstr = json.dumps(
            obj,
            sort_keys=True,
            tape=components,
            default=cls._default,
            cls=ClassJSONEncoder,
        )
        return jsonstr, components


class HashedComposite(HashedCompositeLike):
    """Concrete composite whose value is resolved eagerly at construction."""

    def __init__(self, jsonstr: str, components: Iterable[Hashed[object]]) -> None:
        HashedCompositeLike.__init__(self, jsonstr, components)
        self._value = self.resolve(lambda comp: comp.value)

    @property
    def value(self) -> Composite:
        return self._value
None: classes = tuple(registered_classes) def parents(o: object) -> Iterable[object]: if o is None or isinstance(o, (str, int, float, bool)): return () if isinstance(o, classes): return () if hook and hook(o): return () elif isinstance(o, list): return o elif isinstance(o, dict): for key in o: if not isinstance(key, str): raise CompositeError('Dict keys must be strings') return o.values() else: raise CompositeError(f'Unknown object: {o!r}') for _ in traverse_id([obj], parents): pass class ClassJSONEncoder(json.JSONEncoder): def __init__( self, *args: Any, tape: Set[object], default: JSONDefault, **kwargs: Any ) -> None: super().__init__(*args, **kwargs) self._default = default self._tape = tape self._classes = tuple(registered_classes) self._default_encs = { cls: enc for cls, (enc, dec) in registered_classes.items() } def default(self, o: object) -> JSONValue: type_tag: Optional[str] = None if isinstance(o, self._classes): dct = self._default_encs[o.__class__](o) type_tag = o.__class__.__name__ else: encoded = self._default(o) if encoded is not None: o, type_tag, dct = encoded self._tape.add(o) if type_tag is not None: return cast(JSONContainer, {'_type': type_tag, **dct}) return cast(JSONValue, super().default(o)) class ClassJSONDecoder(json.JSONDecoder): def __init__(self, *args: Any, hook: JSONHook[object], **kwargs: Any) -> None: assert 'object_hook' not in kwargs kwargs['object_hook'] = self._my_object_hook super().__init__(*args, **kwargs) self._hook = hook self._default_decs = { cls.__name__: dec for cls, (enc, dec) in registered_classes.items() } def _my_object_hook(self, dct: Dict[str, JSONValue]) -> object: try: type_tag = dct.pop('_type') assert isinstance(type_tag, str) except KeyError: return dct if type_tag in self._default_decs: return self._default_decs[type_tag](dct) return self._hook(type_tag, dct) PK!&=Պ0 0 mona/pluggable.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. 
If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import logging from typing import Any, Dict, Generator, Generic, List, Optional, TypeVar __all__ = () log = logging.getLogger(__name__) _T = TypeVar('_T') _P = TypeVar('_P', bound='Pluggable') class Plugin(Generic[_P]): name: Optional[str] = None def __call__(self, pluggable: _P) -> None: pluggable.register_plugin(self._name, self) @property def _name(self) -> str: return self.name or self.__class__.__name__ class Pluggable: def __init__(self: _P) -> None: self._plugins: Dict[str, Plugin[_P]] = {} def register_plugin(self: _P, name: str, plugin: Plugin[_P]) -> None: self._plugins[name] = plugin def _get_plugins(self: _P, reverse: bool = False) -> List[Plugin[_P]]: plugins = list(self._plugins.values()) if reverse: plugins.reverse() return plugins def _run_plugins( self, func: str, args: List[Any], wrap_first: bool, reverse: bool ) -> Generator[Any, Any, None]: for plugin in self._get_plugins(reverse): try: result = yield getattr(plugin, func)(*args) except Exception: log.error(f'Error in plugin {plugin._name!r}') raise if wrap_first: args[0] = result async def run_plugins_async( self, func: str, *args: Any, wrap_first: bool = False, reverse: bool = False ) -> Any: arg_list = list(args) gen = self._run_plugins(func, arg_list, wrap_first, reverse) result: Any = None try: while True: result = await gen.send(result) except StopIteration: if wrap_first: return arg_list[0] def run_plugins( self, func: str, *args: Any, wrap_first: bool = False, reverse: bool = False ) -> Any: arg_list = list(args) gen = self._run_plugins(func, arg_list, wrap_first, reverse) result: Any = None try: while True: result = gen.send(result) except StopIteration: if wrap_first: return arg_list[0] PK!v%Ymona/plugins/__init__.pyfrom .parallel import Parallel from .cache import Cache from .files import FileManager from .tmpdir import TmpdirManager __all__ = ['Parallel', 'Cache', 
class TaskRow(NamedTuple):
    # One row of the `tasks` table.
    hashid: Hash
    state: str
    side_effects: Optional[str] = None
    result_type: Optional[str] = None
    result: Union[Hash, bytes, None] = None


class ObjectRow(NamedTuple):
    # One row of the `objects` table.
    hashid: Hash
    typetag: str
    spec: bytes


class ResultType(Enum):
    # How a task result is stored: as a reference to a hashed object, or
    # as an opaque pickle blob.
    HASHED = 0
    PICKLED = 1


class SessionRow(NamedTuple):
    # One row of the `sessions` table.
    sessionid: int
    created: str


class TargetRow(NamedTuple):
    # One row of the `targets` table, linking an object to a session.
    objectid: Hash
    sessionid: int
    label: Optional[str]
    metadata: Optional[bytes]


class CachedTask(Task[_T_co]):
    """Lightweight stand-in for a finished task restored from the cache.

    Bypasses Task.__init__: only the hashid matters, there are no args.
    """

    def __init__(self, hashid: Hash) -> None:
        self._hashid = hashid
        self._args = ()
        Future.__init__(self, [])  # type: ignore


class WriteAccess(Enum):
    # When cache writes are committed to the database.
    EAGER = 0
    ON_EXIT = 1
    NEVER = 2


class Cache(SessionPlugin):
    """Plugin that caches tasks and objects in a session to an SQLite database."""

    name = 'db_cache'

    def __init__(
        self, db: sqlite3.Connection, write: str = 'eager', full_restore: bool = False
    ) -> None:
        self._db = db
        # Objects accumulated for a deferred (ON_EXIT) write.
        self._objects: Dict[Hash, Hashed[object]] = {}
        # Weak cache so repeatedly-restored objects are shared, not rebuilt.
        self._object_cache: WeakDict[Hash, Hashed[object]] = WeakDict()
        self._write = WriteAccess[write.upper()]
        self._full_restore = full_restore

    def __repr__(self) -> str:
        # NOTE(review): empty f-string — the repr text appears to have been
        # lost in extraction; confirm against upstream.
        return f''

    @property
    def db(self) -> sqlite3.Connection:
        """Database connection."""
        return self._db

    def _store_objects(self, objs: Sequence[Hashed[object]]) -> None:
        # INSERT OR IGNORE: objects are immutable, re-inserts are no-ops.
        obj_rows = [
            ObjectRow(obj.hashid, get_fullname(obj.__class__), obj.spec)
            for obj in objs
        ]
        self._db.executemany('INSERT OR IGNORE INTO objects VALUES (?,?,?)', obj_rows)

    def _store_targets(self, objs: Sequence[Hashed[object]]) -> None:
        # Record which objects this session touched, with task labels.
        sessionid = cast(int, Session.active().storage['cache:sessionid'])
        target_rows = [
            TargetRow(
                obj.hashid,
                sessionid,
                obj.label if isinstance(obj, Task) else None,
                obj.metadata(),
            )
            for obj in objs
        ]
        self._db.executemany(
            'INSERT OR IGNORE INTO targets VALUES (?,?,?,?)', target_rows
        )

    def _update_state(self, task: Task[object]) -> None:
        self._db.execute(
            'UPDATE tasks SET state = ? WHERE hashid = ?',
            (task.state.name, task.hashid),
        )

    def _store_result(self, task: Task[object]) -> None:
        """Persist a finished (or awaiting) task's full row, result included."""
        result: Union[Hash, bytes]
        hashed: Hashed[object]
        if task.state is State.AWAITING:
            hashed = task.future_result()
            result_type = ResultType.HASHED
        else:
            assert task.state is State.DONE
            hashed_or_obj = task.resolve()
            if isinstance(hashed_or_obj, Hashed):
                result_type = ResultType.HASHED
                hashed = hashed_or_obj
            else:
                # Plain Python value: fall back to pickling.
                result_type = ResultType.PICKLED
                result = pickle.dumps(hashed_or_obj)
        if result_type is ResultType.HASHED:
            result = hashed.hashid
        # Side effects are stored as a comma-joined hashid list.
        side_effects = ','.join(
            t.hashid for t in Session.active().get_side_effects(task)
        )
        self._db.execute(
            'REPLACE INTO tasks VALUES (?,?,?,?,?)',
            TaskRow(
                task.hashid, task.state.name, side_effects, result_type.name, result
            ),
        )

    def _get_task_row(self, hashid: Hash) -> Optional[TaskRow]:
        raw_row = self._db.execute(
            'SELECT * FROM tasks WHERE hashid = ?', (hashid,)
        ).fetchone()
        if not raw_row:
            return None
        return TaskRow(*raw_row)

    def _get_target_row(self, hashid: Hash) -> TargetRow:
        # Most recent session's record wins.
        raw_row = self._db.execute(
            'SELECT * FROM targets WHERE objectid = ? ORDER BY sessionid DESC LIMIT 1',
            (hashid,),
        ).fetchone()
        assert raw_row
        return TargetRow(*raw_row)

    def _get_object_factory(self, hashid: Hash) -> Tuple[bytes, Type[Hashed[object]]]:
        # Resolve the stored typetag back to its Hashed subclass.
        raw_row = self._db.execute(
            'SELECT * FROM objects WHERE hashid = ?', (hashid,)
        ).fetchone()
        assert raw_row
        row = ObjectRow(*raw_row)
        factory = cast(Type[object], import_fullname(row.typetag))
        assert issubclass(factory, Hashed)
        return row.spec, factory

    def _get_object(self, hashid: Hash) -> Hashed[object]:
        """Reconstruct (or fetch from the weak cache) the object for *hashid*."""
        obj: Optional[Hashed[object]] = self._object_cache.get(hashid)
        if obj:
            return obj
        spec, factory = self._get_object_factory(hashid)
        if factory is Task and not self._full_restore:
            # A finished task need not be fully rebuilt: a CachedTask stub
            # carrying only the hashid suffices.
            task_row = self._get_task_row(hashid)
            assert task_row
            if State[task_row.state] > State.HAS_RUN:
                obj = CachedTask(hashid)
        if not obj:
            obj = factory.from_spec(spec, self._get_object)
        assert hashid == obj.hashid
        metadata = self._get_target_row(hashid).metadata
        if metadata is not None:
            obj.set_metadata(metadata)
        if isinstance(obj, Task):
            obj, registered = Session.active().register_task(obj)
            if registered:
                if not self._full_restore:
                    # Queue freshly registered tasks for restoration.
                    self._to_restore.append(obj)
        self._object_cache[hashid] = obj
        return obj

    def _get_result(self, row: TaskRow) -> object:
        if State[row.state] < State.HAS_RUN:
            return None
        assert row.result_type
        result_type = ResultType[row.result_type]
        if result_type is ResultType.PICKLED:
            assert isinstance(row.result, bytes)
            result = cast(object, pickle.loads(row.result))
        else:
            assert result_type is ResultType.HASHED
            assert isinstance(row.result, str)
            result = self._get_object(row.result)
        return result

    def _restore_task(self, task: Task[object]) -> None:
        """Replay a previously run task's lifecycle from its cached row."""
        if getattr(task, '_restored', None):  # TODO clean up
            return
        row = self._get_task_row(task.hashid)
        assert row
        if State[row.state] < State.HAS_RUN:
            return
        log.debug(f'Restoring from cache: {task}')
        assert State[row.state] > State.HAS_RUN
        task.set_running()
        sess = Session.active()
        if self._full_restore and row.side_effects:
            side_effects: List[Task[object]] = []
            for hashid in row.side_effects.split(','):
                child_task = self._get_object(cast(Hash, hashid))
                assert isinstance(child_task, Task)
                sess.add_side_effect_of(task, child_task)
                side_effects.append(child_task)
            # Reversed so the LIFO pop() in post_create restores in order.
            self._to_restore.extend(reversed(side_effects))
        task.set_has_run()
        sess.set_result(task, self._get_result(row))
        task._restored = True  # type: ignore

    def save_hashed(self, objs: Sequence[Hashed[object]]) -> None:  # noqa: D102
        if self._write is WriteAccess.EAGER:
            self._store_objects(objs)
            self._store_targets(objs)
            self._db.commit()
        else:
            # Deferred mode: keep for pre_exit.
            self._objects.update({o.hashid: o for o in objs})

    def _store_session(self, sess: Session) -> None:
        cur = self._db.execute(
            'INSERT INTO sessions VALUES (?,?)', (None, get_timestamp())
        )
        sess.storage['cache:sessionid'] = cur.lastrowid

    def post_enter(self, sess: Session) -> None:  # noqa: D102
        if self._write is WriteAccess.EAGER:
            self._store_session(sess)

    def post_create(self, task: Task[object]) -> None:  # noqa: D102
        row = self._get_task_row(task.hashid)
        if row:
            # Known task: drain the restore queue (tasks may enqueue more
            # tasks while being restored).
            self._to_restore = [task]
            tasks: List[Task[object]] = []
            while self._to_restore:
                t = self._to_restore.pop()
                self._restore_task(t)
                tasks.append(t)
            delattr(self, '_to_restore')
        elif self._write is WriteAccess.EAGER:
            # New task: create its row immediately.
            tasks = [task]
            self._db.execute(
                'INSERT INTO tasks VALUES (?,?,?,?,?)',
                TaskRow(task.hashid, task.state.name),
            )
            self._store_objects(tasks)
        if self._write is WriteAccess.EAGER:
            self._store_targets(tasks)
            self._db.commit()

    def post_task_run(self, task: Task[object]) -> None:  # noqa: D102
        if self._write is not WriteAccess.EAGER:
            return
        self._store_result(task)
        if task.state < State.DONE:
            # Keep the stored state in sync once the task finishes.
            task.add_done_callback(lambda task: self._update_state(task))
        self._db.commit()

    def pre_exit(self, sess: Session) -> None:  # noqa: D102
        if self._write is not WriteAccess.ON_EXIT:
            return
        self._store_session(sess)
        for task in sess.all_tasks():
            if task.state > State.HAS_RUN:
                self._store_result(task)
            else:
                # NOTE(review): in ON_EXIT mode no row was ever inserted for
                # such tasks, so this UPDATE may affect zero rows — confirm.
                self._update_state(task)
        objects = [*self._objects.values(), *sess.all_tasks()]
        self._store_objects(objects)
        self._store_targets(objects)
        self._objects.clear()
        self._db.commit()

    @classmethod
    def from_path(cls, path: Pathable, **kwargs: Any) -> 'Cache':
        """Create a cache with a database at the given path."""
        db = sqlite3.connect(path)
        db.execute(
            """\
            CREATE TABLE IF NOT EXISTS objects (
                hashid TEXT PRIMARY KEY,
                typetag TEXT,
                spec BLOB
            )
            """
        )
        db.execute(
            """\
            CREATE TABLE IF NOT EXISTS tasks (
                hashid TEXT PRIMARY KEY,
                state TEXT,
                side_effects TEXT,
                result_type TEXT,
                result BLOB,
                FOREIGN KEY (hashid) REFERENCES objects(hashid)
            )
            """
        )
        db.execute(
            """\
            CREATE TABLE IF NOT EXISTS sessions (
                sessionid INTEGER PRIMARY KEY,
                created TEXT
            )
            """
        )
        db.execute(
            """\
            CREATE TABLE IF NOT EXISTS targets (
                objectid TEXT,
                sessionid INTEGER,
                label TEXT,
                metadata BLOB,
                PRIMARY KEY (objectid, sessionid),
                FOREIGN KEY (objectid) REFERENCES objects(hashid),
                FOREIGN KEY (sessionid) REFERENCES sessions(sessionid)
            )
            """
        )
        return Cache(db, **kwargs)
class FileManager(_FileManager, SessionPlugin):
    """Plugin that manages storage of abstract task files in a file system."""

    name = 'file_manager'

    def __init__(self, root: Union[str, Pathable], eager: bool = True) -> None:
        # Content-addressed blob store under `root`. With eager=True blobs
        # are written to disk as soon as stored; otherwise they sit in the
        # in-memory cache until store_cache() flushes them.
        self._root = Path(root).resolve()
        self._cache: Dict[Hash, bytes] = {}
        # Memo of already-hashed filesystem paths, to skip re-hashing.
        self._path_cache: Dict[Path, Hash] = {}
        self._eager = eager

    def __repr__(self) -> str:
        # NOTE(review): empty f-string — repr content appears lost in
        # extraction; confirm against upstream.
        return f''

    def _path(self, hashid: Hash, must_exist: bool = False) -> Path:
        # Git-like fan-out: 2-hex-char directory prefix, rest as filename.
        path = self._root / hashid[:2] / hashid[2:]
        if must_exist and not path.exists():
            raise FilesError(f'Missing in manager: {hashid}')
        return path

    def _path_primed(self, hashid: Hash) -> Path:
        # Ensure the fan-out directory exists before writing.
        # NOTE(review): assumes self._root itself already exists (no
        # parents=True) — confirm it is created during initialization.
        path = self._path(hashid)
        path.parent.mkdir(exist_ok=True)
        return path

    def __contains__(self, hashid: Hash) -> bool:
        return hashid in self._cache or self._path(hashid).is_file()

    def post_enter(self, sess: Session) -> None:  # noqa: D102
        sess.storage['file_manager'] = self

    def _store_bytes(self, hashid: Hash, content: bytes) -> None:
        stored_path = self._path_primed(hashid)
        stored_path.write_bytes(content)
        # Stored blobs are immutable: drop the write bit.
        make_nonwritable(stored_path)

    def store_bytes(self, content: bytes) -> 'Hash':  # noqa: D102
        hashid = Hash(hashlib.sha1(content).hexdigest())
        if hashid not in self:
            self._cache[hashid] = content
            if self._eager:
                self._store_bytes(hashid, content)
        return hashid

    def _store_path(self, hashid: Hash, path: Path, keep: bool) -> None:
        stored_path = self._path_primed(hashid)
        if keep:
            shutil.copy(path, stored_path)
        else:
            # Caller relinquishes the file: move instead of copying.
            path.rename(stored_path)
        make_nonwritable(stored_path)

    def store_path(self, path: Path, *, keep: bool) -> 'Hash':  # noqa: D102
        hashid = self._path_cache.get(path)
        if hashid:
            return hashid
        # Stream the file through SHA-1 in 1 MiB chunks.
        sha1 = hashlib.sha1()
        with path.open('rb') as f:
            while True:
                data = f.read(2 ** 20)
                if not data:
                    break
                sha1.update(data)
        hashid = Hash(sha1.hexdigest())
        if hashid not in self:
            # TODO this is not good with large files
            self._cache[hashid] = path.read_bytes()
            if self._eager:
                self._store_path(hashid, path, keep)
        return self._path_cache.setdefault(path, hashid)

    def get_bytes(self, hashid: Hash) -> bytes:  # noqa: D102
        try:
            return self._cache[hashid]
        except KeyError:
            pass
        path = self._path(hashid, must_exist=True)
        return self._cache.setdefault(hashid, path.read_bytes())

    def target_in(
        self, path: Path, hashid: Hash, *, mutable: bool
    ) -> None:  # noqa: D102
        # Materialize the blob at `path`: a real (writable) copy when
        # mutable, a symlink into the store when not.
        content = self._cache.get(hashid)
        if content:
            path.write_bytes(content)
            if not mutable:
                make_nonwritable(path)
            return
        stored_path = self._path(hashid, must_exist=True)
        if mutable:
            shutil.copy(stored_path, path)
            make_writable(path)
        else:
            if path.exists():
                path.unlink()
            path.symlink_to(stored_path)

    def store_cache(self) -> None:  # noqa: D102
        # Flush all in-memory blobs to disk (used in non-eager mode).
        # NOTE(review): re-flushing a blob already on disk would write to a
        # non-writable file — confirm callers avoid that.
        for hashid, content in self._cache.items():
            self._store_bytes(hashid, content)
class Parallel(SessionPlugin):
    """Plugin that enables running tasks in parallel."""

    name = 'parallel'

    def __init__(self, ncores: int = None) -> None:
        self._ncores = ncores or os.cpu_count() or 1
        # Cores not currently held by any task (informational counter).
        self._available = self._ncores
        self._asyncio_tasks: Set[asyncio.Task[Any]] = set()
        # When not None, the scheduler is stopped and released cores
        # accumulate here instead of going back to the semaphore.
        self._pending: Optional[int] = None
        self._registered_exceptions = 0

    def post_enter(self, sess: Session) -> None:  # noqa: D102
        sess.storage['scheduler'] = self._run_coro

    async def pre_run(self) -> None:  # noqa: D102
        # Created here so the primitives bind to the running event loop.
        self._sem = asyncio.BoundedSemaphore(self._ncores)
        self._lock = asyncio.Lock()

    async def post_run(self) -> None:  # noqa: D102
        if not self._asyncio_tasks:
            return
        log.info(f'Cancelling {len(self._asyncio_tasks)} running tasks...')
        for task in self._asyncio_tasks:
            task.cancel()
        await asyncio.gather(*self._asyncio_tasks)
        assert not self._asyncio_tasks
        log.info('All tasks cancelled')

    def _release(self, ncores: int) -> None:
        if self._pending is None:
            for _ in range(ncores):
                self._sem.release()
            self._available += ncores
        else:
            # Scheduler stopped: park the cores until it resumes.
            self._pending += ncores

    def _stop(self) -> None:
        assert self._pending is None
        self._pending = 0
        log.info(f'Stopping scheduler')

    def ignored_exception(self) -> None:  # noqa: D102
        # Called when a task exception is ignored; once every registered
        # exception has been ignored, resume the scheduler and return the
        # parked cores to the semaphore.
        if self._registered_exceptions == 0:
            return
        self._registered_exceptions -= 1
        if self._registered_exceptions > 0:
            return
        assert self._pending is not None
        log.info(f'Resuming scheduler with {self._pending} cores')
        pending = self._pending
        self._pending = None
        self._release(pending)

    def wrap_execute(self, execute: TaskExecute) -> TaskExecute:  # noqa: D102
        # Wrap the session's execute so each task runs in its own asyncio
        # task, tracked for cancellation in post_run().
        async def _execute(task: Task[Any], done: NodeExecuted[Task[Any]]) -> None:
            try:
                await execute(task, done)
            except Exception as e:
                # assumes CancelledError subclasses Exception (true on
                # Python 3.7, not on 3.8+) — TODO confirm target version
                if not isinstance(e, asyncio.CancelledError):
                    done((task, e, ()))
            asyncio_task = asyncio.current_task()
            assert asyncio_task
            self._asyncio_tasks.remove(asyncio_task)

        async def spawn_execute(*args: Any) -> None:
            asyncio_task = asyncio.create_task(_execute(*args))
            self._asyncio_tasks.add(asyncio_task)

        return spawn_execute

    @asynccontextmanager
    async def _acquire(self, ncores: int) -> AsyncGenerator[None, None]:
        # The lock serializes multi-core acquisition so two tasks cannot
        # deadlock by each holding part of their requested cores.
        async with self._lock:
            for _ in range(ncores):
                await self._sem.acquire()
                self._available -= 1
        try:
            yield
        except Exception:
            # First exception stops the scheduler; count them so
            # ignored_exception() knows when to resume.
            if self._registered_exceptions == 0:
                self._stop()
            self._registered_exceptions += 1
            raise
        finally:
            self._release(ncores)

    async def _run_coro(self, corofunc: Corofunc[_T], *args: Any, **kwargs: Any) -> _T:
        task = Session.active().running_task
        n = cast(int, task.storage.get('ncores', 1))
        if n > self._available:
            log.debug(
                f'Waiting for {n-self._available}/{n} '
                f'unavailable cores for "{task}"'
            )
            waited = True
        else:
            waited = False
        async with self._acquire(n):
            if waited:
                log.debug(f'All {n} cores available for "{task}", resuming')
            return await corofunc(*args, **kwargs)
import logging import shutil from contextlib import contextmanager from pathlib import Path from tempfile import mkdtemp from typing import Iterator from ..dirtask import TmpdirManager as _TmpdirManager from ..sessions import Session, SessionPlugin from ..utils import Pathable log = logging.getLogger(__name__) class TmpdirManager(_TmpdirManager, SessionPlugin): """Plugin that manages temporary directories.""" name = 'tmpdir_manager' def __init__(self, root: Pathable) -> None: self._root = Path(root).resolve() def post_enter(self, sess: Session) -> None: # noqa: D102 sess.storage['dir_task:tmpdir_manager'] = self @contextmanager def tempdir(self) -> Iterator[str]: # noqa: D102 task = Session.active().running_task path = mkdtemp(prefix=f'{task.hashid[:6]}_', dir=str(self._root)) log.debug(f'Created tempdir for "{task.label}": {path}') try: yield path except Exception: raise else: shutil.rmtree(path) PK!4voomona/pyhash.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. 
def remove_docstring(node: ast.AST) -> None:
    """Remove a leading docstring from *node*'s body, in place.

    Only nodes that can carry a docstring (functions, classes, modules)
    are touched; any other node is left alone. This makes function hashes
    insensitive to documentation-only edits.

    :param node: any AST node
    """
    classes = ast.AsyncFunctionDef, ast.FunctionDef, ast.ClassDef, ast.Module
    if not isinstance(node, classes):
        return
    if not (node.body and isinstance(node.body[0], ast.Expr)):
        return
    docstr = node.body[0].value
    # `ast.Str` was deprecated in Python 3.8 and removed in 3.12, where the
    # original `isinstance(docstr, ast.Str)` check raises AttributeError.
    # Accept a string `ast.Constant` (3.8+) and fall back to `ast.Str` on
    # older interpreters that still produce it; `getattr(..., ())` makes
    # the fallback a no-op where `ast.Str` no longer exists.
    is_string = (
        isinstance(docstr, ast.Constant) and isinstance(docstr.value, str)
    ) or isinstance(docstr, getattr(ast, 'Str', ()))
    if is_string:
        node.body.pop(0)
except KeyError: unbound_names.add(name) return inspect.ClosureVars(nonlocal_vars, global_vars, builtin_vars, unbound_names) PK!4mona/remotes.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import shlex import subprocess from pathlib import Path from typing import List, Optional, Union, cast from .errors import MonaError __all__ = () class Remote: def __init__(self, host: str, path: str) -> None: self._host = host self._path = path def go(self) -> None: subprocess.run( ['ssh', '-t', self._host, f'cd {self._path} && exec $SHELL'], check=True ) def update(self, *, delete: bool = False, dry: bool = False) -> None: subprocess.run(['ssh', self._host, f'mkdir -p {self._path}'], check=True) excludes: List[str] = ['/.mona/', '/.git/', '/venv/'] excludesfiles = Path('.monaignore'), Path('.gitignore') for file in excludesfiles: if file.exists(): with file.open() as f: excludes.extend(l.strip() for l in f.readlines()) args = ['rsync', '-cirl', *(f'--exclude={excl}' for excl in excludes)] if delete: args.append('--delete') if dry: args.append('--dry-run') args.append('./') args.append(f'{self._host}:{self._path}/') subprocess.run(args, check=True) def command( self, args: List[str], inp: Union[str, bytes] = None, capture_stdout: bool = False, ) -> Optional[bytes]: cmd = ' '.join(['venv/bin/mona', *(shlex.quote(arg) for arg in args)]) if isinstance(inp, str): inp = inp.encode() result = subprocess.run( ['ssh', self._host, f'cd {self._path} && exec {cmd}'], input=inp, stdout=subprocess.PIPE if capture_stdout else None, ) if result.returncode: raise MonaError( f'Command `{cmd}` on {self._host} ended ' f'with exit code {result.returncode}' ) if capture_stdout: return cast(bytes, result.stdout) return None # def command_output(self, args: List[str], inp: str = None) -> str: # output = self.command(args, inp, _get_output=True) # 
assert output # return output # def check(self, hashes: Dict['TPath', 'Hash']) -> None: # info(f'Checking {self.host}...') # remote_hashes: Dict[TPath, Hash] = {} # output = self.command_output(['list', 'tasks', '**', '--no-color']) # for hashid, path, *_ in (l.split() for l in output.strip().split('\n')): # remote_hashes[TPath(path)] = Hash(hashid) # is_ok = True # for path, hashid in hashes.items(): # if path not in remote_hashes: # print(f'{path} does not exist on remote') # is_ok = False # elif remote_hashes[path] != hashid: # print(f'{path} has a different hash on remote') # is_ok = False # for path, hashid in remote_hashes.items(): # if path not in hashes: # print(f'{path} does not exist on local') # is_ok = False # if is_ok: # info('Local tasks are on remote') # else: # error('Local tasks are not on remote') # # def fetch( # self, hashes: List['Hash'], files: bool = True # ) -> Dict['Hash', Dict[str, Any]]: # info(f'Fetching from {self.host}...') # tasks = { # hashid: task # for hashid, task in json.loads( # self.command_output(['printout'], inp='\n'.join(hashes)) # ).items() # if task.get('outputs') # } # if not files: # info(f'Fetched {len(tasks)}/{len(hashes)} task metadata') # return tasks # info(f'Will fetch {len(tasks)}/{len(hashes)} tasks') # if len(tasks) == 0: # return {} # elif input("Continue? 
['y' to confirm]: ") != 'y': # return {} # paths = set( # hashid for task in tasks.values() for hashid in task['outputs'].values() # ) # cmd = [ # 'rsync', # '-r', # '--info=progress2', # '--ignore-existing', # '--files-from=-', # f'{self.host}:{self.path}/.caf/objects', # '.caf/objects', # ] # sp.run(cmd, input='\n'.join(f'{p[0:2]}/{p[2:]}' for p in paths).encode()) # return tasks # def push(self, targets, cache, root, dry=False): # info('Pushing to {}...'.format(self.host)) # roots = [p for p in root.glob('*') # if not targets or p.name in targets] # paths = set() # for task in find_tasks(*roots, stored=True, follow=False): # paths.add(get_stored(task)) # cmd = ['rsync', # '-cirlP', # '--delete', # '--exclude=*.pyc', # '--exclude=.caf/env', # '--exclude=__pycache__', # '--dry-run' if dry else None, # '--files-from=-', # str(cache), # '{0.host}:{0.path}/{1}'.format(self, cache)] # p = sp.Popen(filter_cmd(cmd), stdin=sp.PIPE) # p.communicate('\n'.join(paths).encode()) # class Local(Remote): # def __init__(self) -> None: # self.host = 'local' # # def update(self, top: Path, delete: bool = False, dry: bool = False) -> None: # pass # # def command( # self, args: List[str], inp: str = None, _get_output: bool = False # ) -> Optional[str]: # cmd = ' '.join(arg if ' ' not in arg else repr(arg) for arg in args) # cmd = f"sh -c 'python3 -u caf {cmd}'" # if not _get_output: # info(f'Running `./caf {cmd}` on {self.host}...') # try: # if _get_output: # output = sp.check_output(cmd, shell=True) # else: # sp.check_call(cmd, shell=True) # except sp.CalledProcessError: # error(f'Command `{cmd}` on {self.host} ended with error') # return cast(str, output.strip()) if _get_output else None # # def check(self, hashes: Dict['TPath', 'Hash']) -> None: # pass # # def fetch( # self, hashes: List['Hash'], files: bool = True # ) -> Dict['Hash', Dict[str, Any]]: # pass # # def go(self) -> None: # pass PK!h0o o mona/rules.py# This Source Code Form is subject to the terms of the Mozilla Public 
class Rule(Generic[_T]):
    """Decorator that turns a coroutine function into a rule.

    A rule is a callable that generates a task instead of actually calling the
    coroutine.

    :param corofunc: a coroutine function
    """

    def __init__(self, corofunc: Corofunc[_T]) -> None:
        if not inspect.iscoroutinefunction(corofunc):
            raise MonaError(f'Task function is not a coroutine: {corofunc}')
        self._corofunc = corofunc
        self._extra_arg_factories: List[ArgFactory] = []
        # Copy the coroutine's metadata (__name__, __doc__, ...) onto this
        # instance so the rule is a transparent stand-in for the function.
        wraps(corofunc)(self)

    def _ensure_extra_args(self) -> None:
        # Materialize the extra arguments and the rule hash lazily, exactly
        # once; the '_hash' attribute doubles as the "already done" sentinel.
        if not hasattr(self, '_hash'):
            self._extra_args = [factory() for factory in self._extra_arg_factories]
            # The rule hash combines the hash of the coroutine's code with the
            # hash IDs of all extra arguments.
            hashes = [
                hash_function(self._corofunc),
                *(obj.hashid for obj in self._extra_args),
            ]
            self._hash = ','.join(hashes)

    def _func_hash(self) -> str:
        # Hash identifying this rule (function code + extra arguments).
        self._ensure_extra_args()
        return self._hash

    def __call__(self, *args: Any, **kwargs: Any) -> Task[_T]:
        """Create a task.

        All arguments are passed to :class:`Task`.
        """
        self._ensure_extra_args()
        # 'rule' is reserved: it is set below to record which rule made the task.
        assert 'rule' not in kwargs
        kwargs['rule'] = self._corofunc.__name__
        # Tasks can only be created inside an active Session.
        return Session.active().create_task(
            self._corofunc, *args, *self._extra_args, **kwargs
        )

    def add_extra_arg(self, factory: ArgFactory) -> None:
        """Register an extra argument factory.

        :param factory: callable that returns an extra argument that will be
            appended to the arguments passed when creating a task.
        """
        # Factories must be registered before the first task is created,
        # because _ensure_extra_args() freezes them into '_extra_args'.
        assert not hasattr(self, '_extra_args')
        self._extra_arg_factories.append(factory)

    @property
    def corofunc(self) -> Corofunc[_T]:
        """Coroutine function associated with the rule."""
        return self._corofunc
async def _run_process(
    args: Union[str, Tuple[str, ...]],
    shell: bool = False,
    input: bytes = None,
    **kwargs: Any,
) -> Union[bytes, Tuple[bytes, bytes]]:
    """Spawn a subprocess, feed it *input*, and return its captured output.

    :param args: a shell command string (with ``shell=True``) or a tuple of
        exec arguments
    :param shell: run through the shell instead of a direct exec
    :param input: bytes sent to the child's standard input
    :param kwargs: forwarded to the asyncio subprocess constructor

    Returns stdout alone, or a ``(stdout, stderr)`` tuple when the caller
    also captured stderr.

    :raises subprocess.CalledProcessError: on a nonzero exit code
    """
    # Capture stdin/stdout by default; callers may override or add stderr=PIPE.
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', subprocess.PIPE)
    if shell:
        assert isinstance(args, str)
        proc = await asyncio.create_subprocess_shell(args, **kwargs)
    else:
        assert isinstance(args, tuple)
        proc = await asyncio.create_subprocess_exec(*args, **kwargs)
    try:
        stdout, stderr = await proc.communicate(input)
    except asyncio.CancelledError:
        # On cancellation, terminate the child and reap it before
        # re-raising, so no zombie process is left behind.
        try:
            proc.terminate()
        except ProcessLookupError:
            # Child already exited; nothing to clean up.
            pass
        else:
            await proc.wait()
        raise
    if proc.returncode:
        log.error(f'Got nonzero exit code in {args!r}')
        raise subprocess.CalledProcessError(proc.returncode, args)
    # stderr is None unless the caller requested stderr=subprocess.PIPE.
    if stderr is None:
        return stdout
    return stdout, stderr
""" scheduler = _scheduler() if scheduler: return await scheduler(_run_thread, func, *args) return await _run_thread(func, *args) async def _run_thread(func: Callable[..., _T], *args: Any) -> _T: loop = asyncio.get_running_loop() return await loop.run_in_executor(None, func, *args) PK!mona/sci/__init__.pyPK!2{{mona/sci/aims/__init__.pyfrom .aims import Aims, SpeciesDefaults from .parse import parse_aims __all__ = ['Aims', 'SpeciesDefaults', 'parse_aims'] PK!H&  mona/sci/aims/aims.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import shutil from collections import OrderedDict from copy import deepcopy from pathlib import Path from typing import Any, Callable, Dict, Tuple, cast from ...dirtask import dir_task from ...errors import InvalidInput, MonaError from ...files import File from ...pluggable import Pluggable, Plugin from ...pyhash import hash_function from ...tasks import Task from ..geomlib import Atom, Molecule from .dsl import expand_dicts, parse_aims_input __version__ = '0.1.0' __all__ = ['Aims', 'SpeciesDefaults'] class AimsPlugin(Plugin['Aims']): def process(self, kwargs: Dict[str, Any]) -> None: pass def _func_hash(self) -> str: return hash_function(self.process) class Aims(Pluggable): """A task factory that creates FHI-aims directory tasks.""" def __init__(self) -> None: Pluggable.__init__(self) for factory in default_plugins: factory()(self) def __call__(self, *, label: str = None, **kwargs: Any) -> Task[Dict[str, File]]: """Create an FHI-aims. 
class SpeciesDir(AimsPlugin):
    """Aims plugin that resolves the species-defaults directory.

    The directory is located relative to the aims executable found on PATH
    and cached per (binary name, species-defaults name) pair.
    """

    def __init__(self) -> None:
        # cache: (aims binary, species-defaults name) -> resolved directory
        self._speciesdirs: Dict[Tuple[str, str], Path] = {}

    def process(self, kwargs: Dict[str, Any]) -> None:
        aims = kwargs['aims']
        sp_def = kwargs.pop('species_defaults')
        sp_def_key = aims, sp_def
        speciesdir = self._speciesdirs.get(sp_def_key)
        if not speciesdir:
            # Fall back to the generic 'aims-master' binary when the
            # requested one is not on PATH.
            pathname = shutil.which(aims) or shutil.which('aims-master')
            if not pathname:
                raise MonaError(f'Aims "{aims}" not found')
            speciesdir = (
                Path(pathname).parents[1] / 'aimsfiles/species_defaults' / sp_def
            )
            self._speciesdirs[sp_def_key] = speciesdir  # type: ignore
        kwargs['speciesdir'] = speciesdir
class Control(AimsPlugin):
    """Aims plugin that assembles the text of ``control.in``.

    Flattens the species defaults into (tag, value) pairs, appends them to
    the generic tags, and renders one output line per tag via ``p2f``.
    """

    def process(self, kwargs: Dict[str, Any]) -> None:
        species_tags = []
        for spec in kwargs.pop('species_defs'):
            spec = OrderedDict(spec)
            # Consume tags in order; some tags pull further entries with them.
            while spec:
                tag, value = spec.popitem(last=False)
                if tag == 'angular_grids':
                    species_tags.append((tag, value))
                    # The grid shells must directly follow the
                    # 'angular_grids' tag in the output.
                    for grid in spec.pop('grids'):
                        species_tags.extend(grid.items())
                elif tag == 'basis':
                    for basis in value:
                        species_tags.extend(basis.items())
                else:
                    species_tags.append((tag, value))
        species_tags = [(t, expand_dicts(v)) for t, v in species_tags]
        tags = [*kwargs.pop('tags').items(), *species_tags]
        lines = []
        for tag, value in tags:
            if value is None:
                # None means "omit this tag entirely".
                continue
            # BUGFIX: was `value is ()` — an identity comparison with a tuple
            # literal that relies on CPython interning and raises a
            # SyntaxWarning on Python 3.8+; equality is the correct test.
            if value == ():
                # An empty tuple renders the bare tag with no value.
                lines.append(tag)
            elif isinstance(value, list):
                lines.extend(f'{tag} {p2f(v)}' for v in value)
            else:
                lines.append(f'{tag} {p2f(value)}')
        kwargs['control'] = '\n'.join(lines)
f'.{str(value).lower()}.' if isinstance(value, tuple): return (' ' if not nospace else ':').join(p2f(x) for x in value) if isinstance(value, dict): return ' '.join( f'{p2f(k)}={p2f(v, nospace=True)}' if v is not None else f'{p2f(k)}' for k, v in sorted(value.items()) ) return str(value) PK!{UQmona/sci/aims/aims.txInput: species+=Species ; Species: 'species' species=ID ( ('mass' mass=FLOAT_) ('nucleus' nucleus=FLOAT_) ('l_hartree' l_hartree=INT) ('cut_pot' cut_pot=CutPot) ('basis_dep_cutoff' (basis_dep_cutoff=FLOAT_ | basis_dep_cutoff=BOOL_)) ('radial_base' radial_base=RadialBase) ('radial_multiplier' radial_multiplier=INT) ('cite_reference' cite_reference=/\S+/)? ('basis_acc' basis_acc=FLOAT_)? ('include_min_basis' include_min_basis=BOOL_)? ('pure_gauss' pure_gauss=BOOL_)? ('angular_grids' ( angular_grids='auto' | angular_grids='specified' grids+=InnerGridShell grids=OuterGridShell )) ('valence' valence=Occupation | 'ion_occ' ion_occ=Occupation)* basis+=BasisFunction )# ; CutPot: onset=FLOAT_ width=FLOAT_ scale=FLOAT_ ; RadialBase: number=INT radius=FLOAT_ ; InnerGridShell: 'division' division=Division ; Division: radius=FLOAT_ points=LebedevInt ; OuterGridShell: 'outer_grid' outer_grid=LebedevInt ; Occupation: n=INT l=Angular occupation=FLOAT_ ; BasisFunction: 'ionic' ionic=IonicBasisFunction | 'hydro' hydro=HydroBasisFunction | 'gaussian' gaussian=GaussianBasisFunction ; IonicBasisFunction: n=INT l=Angular (radius=FLOAT_ | radius='auto') ; HydroBasisFunction: n=INT l=Angular z_eff=FLOAT_ ; GaussianBasisFunction: L=INT ('1' alpha=FLOAT_ | /(?!1$)\d+/ (alpha=FLOAT_ coeff=FLOAT_)+) ; BOOL_: /.true.|.false./ ; Angular: /[spdfghi]/ ; LebedevInt: /6|14|26|38|50|86|110|146|170|194|302|350|434|590|770|974|1202|1454|1730|2030|2354|2702|3074|3470|3890|4334|4802|5294|5810/ ; FLOAT_: /-?(\d*\.\d+|\d+\.\d*|\d+)([de]-?\d+)?/ ; Comment: /#.*$/ ; PK!.mona/sci/aims/dsl.pyfrom importlib import resources from typing import Any, Dict, cast from textx.metamodel import TextXClass, 
def expand_dicts(o: Any) -> Any:
    """Recursively turn every dict into a tuple of its (expanded) values.

    Lists are rebuilt element-wise; any other object is returned unchanged.
    """
    if isinstance(o, dict):
        return tuple(expand_dicts(value) for value in o.values())
    if isinstance(o, list):
        return [expand_dicts(item) for item in o]
    return o
""" stdout = outputs['results.xml'].read_text() parsed = parse_xml(io.StringIO(stdout)) energies = {x['name']: x['value'][0] for x in parsed['energy']} return {'energy': energies['Total energy']} def parse_xml(source: IO[str]) -> Any: root = ET.parse(source).getroot() return parse_xmlelem(root) def parse_xmlelem(elem: Any) -> Any: results = {} children = {c.tag for c in elem} for child in children: child_elems = elem.findall(child) child_results = [] for child_elem in child_elems: if 'type' in child_elem.attrib: if 'size' in child_elem.attrib: child_elem_results = parse_xmlarr(child_elem) else: child_elem_results = float(child_elem.text) elif len(list(child_elem)): child_elem_results = parse_xmlelem(child_elem) else: child_elem_results = child_elem.text.strip() child_results.append(child_elem_results) if len(child_results) == 1: results[child] = child_results[0] else: results[child] = child_results return results def parse_xmlarr(xmlarr: Any, axis: int = None, typef: Type[Any] = None) -> Any: if axis is None: axis = len(xmlarr.attrib['size'].split()) - 1 if not typef: typename = xmlarr.attrib['type'] if typename == 'dble' or typename == 'real': typef = float elif typename == 'int': typef = int else: raise Exception('Unknown array type') if axis > 0: lst = [ parse_xmlarr(v, axis - 1, typef)[..., None] for v in xmlarr.findall('vector') ] return np.concatenate(lst, axis) else: return np.array([typef(x) for x in xmlarr.text.split()]) PK!Nxmona/sci/atom-data.csv"number","symbol","name","vdw radius","covalent radius","mass","ionization energy" 1,"H","hydrogen",1.2,0.38,1.0079,13.5984 2,"He","helium",1.4,0.32,4.0026,24.5874 3,"Li","lithium",1.82,1.34,6.941,5.3917 4,"Be","beryllium",1.53,0.9,9.0122,9.3227 5,"B","boron",1.92,0.82,10.811,8.298 6,"C","carbon",1.7,0.77,12.0107,11.2603 7,"N","nitrogen",1.55,0.75,14.0067,14.5341 8,"O","oxygen",1.52,0.73,15.9994,13.6181 9,"F","fluorine",1.47,0.71,18.9984,17.4228 10,"Ne","neon",1.54,0.69,20.1797,21.5645 
11,"Na","sodium",2.27,1.54,22.9897,5.1391 12,"Mg","magnesium",1.73,1.3,24.305,7.6462 13,"Al","aluminium",1.84,1.18,26.9815,5.9858 14,"Si","silicon",2.1,1.11,28.0855,8.1517 15,"P","phosphorus",1.8,1.06,30.9738,10.4867 16,"S","sulfur",1.8,1.02,32.065,10.36 17,"Cl","chlorine",1.75,0.99,35.453,12.9676 18,"Ar","argon",1.88,0.97,39.948,15.7596 19,"K","potassium",2.75,1.96,39.0983,4.3407 20,"Ca","calcium",2.31,1.74,40.078,6.1132 21,"Sc","scandium",2.11,1.44,44.9559,6.5615 22,"Ti","titanium",,1.36,47.867,6.8281 23,"V","vanadium",,1.25,50.9415,6.7462 24,"Cr","chromium",,1.27,51.9961,6.7665 25,"Mn","manganese",,1.39,54.938,7.434 26,"Fe","iron",,1.25,55.845,7.9024 27,"Co","cobalt",,1.26,58.9332,7.881 28,"Ni","nickel",1.63,1.21,58.6934,7.6398 29,"Cu","copper",1.4,1.38,63.546,7.7264 30,"Zn","zinc",1.39,1.31,65.39,9.3942 31,"Ga","gallium",1.87,1.26,69.723,5.9993 32,"Ge","germanium",2.11,1.22,72.64,7.8994 33,"As","arsenic",1.85,1.19,74.9216,9.7886 34,"Se","selenium",1.9,1.16,78.96,9.7524 35,"Br","bromine",1.85,1.14,79.904,11.8138 36,"Kr","krypton",2.02,1.1,83.8,13.9996 37,"Rb","rubidium",3.03,2.11,85.4678,4.1771 38,"Sr","strontium",2.49,1.92,87.62,5.6949 39,"Y","yttrium",,1.62,88.9059,6.2173 40,"Zr","zirconium",,1.48,91.224,6.6339 41,"Nb","niobium",,1.37,92.9064,6.7589 42,"Mo","molybdenum",,1.45,95.94,7.0924 43,"Tc","technetium",,1.56,98,7.28 44,"Ru","ruthenium",,1.26,101.07,7.3605 45,"Rh","rhodium",,1.35,102.9055,7.4589 46,"Pd","palladium",1.63,1.31,106.42,8.3369 47,"Ag","silver",1.72,1.53,107.8682,7.5762 48,"Cd","cadmium",1.58,1.48,112.411,8.9938 49,"In","indium",1.93,1.44,114.818,5.7864 50,"Sn","tin",2.17,1.41,118.71,7.3439 51,"Sb","antimony",2.06,1.38,121.76,8.6084 52,"Te","tellurium",2.06,1.35,127.6,9.0096 53,"I","iodine",1.98,1.33,126.9045,10.4513 54,"Xe","xenon",2.16,1.3,131.293,12.1298 55,"Cs","caesium",3.43,2.25,132.9055,3.8939 56,"Ba","barium",2.68,1.98,137.327,5.2117 57,"La","lanthanum",,1.69,138.9055,5.5769 58,"Ce","cerium",,,140.116,5.5387 
59,"Pr","praseodymium",,,140.9077,5.473 60,"Nd","neodymium",,,144.24,5.525 61,"Pm","promethium",,,145,5.582 62,"Sm","samarium",,,150.36,5.6437 63,"Eu","europium",,,151.964,5.6704 64,"Gd","gadolinium",,,157.25,6.1501 65,"Tb","terbium",,,158.9253,5.8638 66,"Dy","dysprosium",,,162.5,5.9389 67,"Ho","holmium",,,164.9303,6.0215 68,"Er","erbium",,,167.259,6.1077 69,"Tm","thulium",,,168.9342,6.1843 70,"Yb","ytterbium",,,173.04,6.2542 71,"Lu","lutetium",,1.6,174.967,5.4259 72,"Hf","hafnium",,1.5,178.49,6.8251 73,"Ta","tantalum",,1.38,180.9479,7.5496 74,"W","tungsten",,1.46,183.84,7.864 75,"Re","rhenium",,1.59,186.207,7.8335 76,"Os","osmium",,1.28,190.23,8.4382 77,"Ir","iridium",,1.37,192.217,8.967 78,"Pt","platinum",1.75,1.28,195.078,8.9587 79,"Au","gold",1.66,1.44,196.9665,9.2255 80,"Hg","mercury",1.55,1.49,200.59,10.4375 81,"Tl","thallium",1.96,1.48,204.3833,6.1082 82,"Pb","lead",2.02,1.47,207.2,7.4167 83,"Bi","bismuth",2.07,1.46,208.9804,7.2856 84,"Po","polonium",1.97,,209,8.417 85,"At","astatine",2.02,,210,9.3 86,"Rn","radon",2.2,1.45,222,10.7485 87,"Fr","francium",3.48,,223,4.0727 88,"Ra","radium",2.83,,226,5.2784 89,"Ac","actinium",,,227,5.17 90,"Th","thorium",,,232.0381,6.3067 91,"Pa","protactinium",,,231.0359,5.89 92,"U","uranium",1.86,,238.0289,6.1941 PK!?`IQFQFmona/sci/geomlib.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import csv import json import os from collections import OrderedDict from copy import deepcopy from importlib import resources from io import StringIO from itertools import chain, product, repeat from typing import ( IO, TYPE_CHECKING, Any, DefaultDict, Dict, Iterable, Iterator, List, Sized, Tuple, Type, TypeVar, Union, cast, ) from .. 
def no_neg_zeros(r: Any) -> Any:
    """Clamp near-zero components to exactly 0.0.

    Avoids '-0.0' and tiny residues like 1e-17 in formatted output.
    """
    return [x if abs(x) >= 1e-8 else 0.0 for x in r]
:param species: atom types :param coords: coordinates """ return cls([Atom(sp, coord) for sp, coord in zip(species, coords)], **flags) @property def species(self) -> List[str]: """Atom types.""" return [atom.species for atom in self] @property def numbers(self) -> List[int]: """Atom numbers.""" return [atom.number for atom in self] @property def mass(self) -> float: """Molecular mass.""" return sum(atom.mass for atom in self) @property def cms(self) -> Any: """Center of mass.""" masses = np.array([atom.mass for atom in self]) return (masses[:, None] * self.xyz).sum(0) / self.mass @property def inertia(self) -> Any: """Inertia tensor.""" masses = np.array([atom.mass for atom in self]) coords_w = np.sqrt(masses)[:, None] * (self.xyz - self.cms) a = np.array([np.diag(np.full(3, r)) for r in np.sum(coords_w ** 2, 1)]) b = coords_w[:, :, None] * coords_w[:, None, :] return np.sum(a - b, 0) def __getitem__(self, i: int) -> Atom: return self._atoms[i] @property def coords(self) -> List[Vec]: """Coordinates.""" return [atom.coord for atom in self] def __repr__(self) -> str: return "<{} '{}'>".format(self.__class__.__name__, self.formula) @property def xyz(self) -> Any: """Coordinates as a numpy array.""" return np.array(self.coords) @property def formula(self) -> str: """Formula.""" counter = DefaultDict[str, int](int) for species in self.species: counter[species] += 1 return ''.join(f'{sp}{n if n > 1 else ""}' for sp, n in sorted(counter.items())) def bondmatrix(self, scale: float) -> Any: """Return a connectivity matrix.""" xyz = self.xyz rs = np.array([atom.covalent_radius for atom in self]) dmatrix = np.sqrt(np.sum((xyz[None, :] - xyz[:, None]) ** 2, 2)) thrmatrix = scale * (rs[None, :] + rs[:, None]) return dmatrix < thrmatrix def get_fragments(self, scale: float = 1.3) -> List['Molecule']: """Return a list of clusters of connected atoms.""" bond = self.bondmatrix(scale) ifragments = getfragments(bond) fragments = [ Molecule([self._atoms[i].copy() for i in fragment]) 
    def rotated(
        self: _M,
        axis: Union[str, int] = None,
        phi: float = None,
        center: Vec = None,
        rotmat: Any = None,
    ) -> _M:
        """Return a new rotated molecule.

        :param axis: rotation axis, 'x'/'y'/'z' or the corresponding index
        :param phi: rotation angle in degrees
        :param center: center of rotation; defaults to the center of mass
        :param rotmat: explicit 3x3 rotation matrix; when given, *axis* and
            *phi* are not used to build the matrix
        """
        if rotmat is None:
            # NOTE(review): axis=0 (the x index) or phi=0 would fail this
            # truthiness assert — confirm whether that is intended.
            assert axis and phi
            phi = phi * np.pi / 180
            # Base matrix: rotation by phi about the x axis.
            rotmat = np.array(
                [
                    [1, 0, 0],
                    [0, np.cos(phi), -np.sin(phi)],
                    [0, np.sin(phi), np.cos(phi)],
                ]
            )
            if isinstance(axis, str):
                shift = {'x': 0, 'y': 1, 'z': 2}[axis]
            else:
                shift = axis
            # Cyclically permute rows and columns so the x-axis rotation
            # becomes a rotation about the requested axis.
            for i in [0, 1]:
                rotmat = np.roll(rotmat, shift, i)
        # NOTE(review): relies on *center* being None or a non-empty tuple;
        # a falsy center silently falls back to the center of mass.
        center = np.array(center) if center else self.cms
        m = self.copy()
        for atom in m:
            atom.coord = tuple(center + rotmat.dot(atom.coord - center))  # type: ignore
        return m
a file. Supported formats: 'xyz', 'aims', 'mopac'. """ if fmt == '': f.write(repr(self)) elif fmt == 'xyz': f.write('{}\n'.format(len(self))) f.write('Formula: {}\n'.format(self.formula)) for species, coord in self.items(): f.write( '{:>2} {}\n'.format( species, ' '.join('{:15.8}'.format(x) for x in no_neg_zeros(coord)), ) ) elif fmt == 'aims': for i, atom in enumerate(self.centers): species, r = atom.species, atom.coord ghost = atom.flags.get('ghost', False) key = (species, r, ghost, fmt) r = no_neg_zeros(r) try: f.write(_string_cache[key]) except KeyError: kind = 'atom' if not ghost else 'empty' s = f'{kind} {r[0]:15.8f} {r[1]:15.8f} {r[2]:15.8f} {species:>2}\n' f.write(s) _string_cache[key] = s for con in self.flags.get('constrains', {}).get(i, []): f.write(f'constrain_relaxation {con}\n') elif fmt == 'mopac': f.write('* Formula: {}\n'.format(self.formula)) for species, coord in self.items(): f.write( '{:>2} {}\n'.format( species, ' '.join('{:15.8} 1'.format(x) for x in no_neg_zeros(coord)), ) ) else: raise ValueError("Unknown format: '{}'".format(fmt)) def copy(self: _M) -> _M: """Create a cpoy.""" return type(self)([atom.copy() for atom in self._atoms]) def ghost(self: _M) -> _M: """Create a copy with all atoms as ghost atoms.""" m = self.copy() for atom in m: atom.flags['ghost'] = True return m def write(self, filename: str) -> None: """Write to a file.""" ext = os.path.splitext(filename)[1] if ext == '.xyz': fmt = 'xyz' elif ext == '.xyzc': fmt = 'xyzc' elif ext == '.aims' or os.path.basename(filename) == 'geometry.in': fmt = 'aims' elif ext == '.mopac': fmt = 'mopac' with open(filename, 'w') as f: self.dump(f, fmt) class Crystal(Molecule): """Represents a crystal. Inherits from :class:`Molecule`. 
:param atoms: atoms :param lattice: lattice vectors """ def __init__(self, atoms: List[Atom], lattice: List[Vec], **flags: Any) -> None: super().__init__(atoms, **flags) self.lattice = lattice @classmethod def from_coords( # type: ignore cls, species: List[str], coords: List[Vec], lattice: List[Vec], **flags ) -> 'Crystal': """Alternative constructor. :param species: atom types :param coords: coordinates :param lattice: lattice vectors """ return cls( [Atom(sp, coord) for sp, coord in zip(species, coords)], lattice, **flags ) def dump(self, f: IO[str], fmt: str) -> None: """Write a crystal to a file. Supported formats: 'aims', 'vasp'. """ if fmt == '': f.write(repr(self)) elif fmt == 'aims': for label, r in zip('abc', self.lattice): x, y, z = no_neg_zeros(r) f.write(f'lattice_vector {x:15.8f} {y:15.8f} {z:15.8f}\n') for con in self.flags.get('constrains', {}).get(label, []): f.write(f'constrain_relaxation {con}\n') super().dump(f, fmt) elif fmt == 'vasp': f.write(f'Formula: {self.formula}\n') f.write(f'{1:15.8f}\n') for r in self.lattice: x, y, z = no_neg_zeros(r) f.write(f'{x:15.8f} {y:15.8f} {z:15.8f}\n') species: Dict[str, List[Atom]] = OrderedDict( (sp, []) for sp in set(self.species) ) f.write(' '.join(species.keys()) + '\n') for atom in self: species[atom.species].append(atom) f.write(' '.join(str(len(atoms)) for atoms in species.values()) + '\n') f.write('cartesian\n') for atom in chain(*species.values()): r = no_neg_zeros(atom.coord) s = f'{r[0]:15.8f} {r[1]:15.8f} {r[2]:15.8f}\n' f.write(s) else: raise ValueError(f'Unknown format: {fmt!r}') def copy(self) -> 'Crystal': """Create a copy.""" return Crystal([atom.copy() for atom in self._atoms], self.lattice.copy()) def rotated( self, axis: Union[str, int] = None, phi: float = None, center: Vec = None, rotmat: Any = None, ) -> 'Crystal': """Return a new crystal with rotated unit cell and lattice vectors.""" assert center is None g = super().rotated(axis, phi, (0, 0, 0), rotmat) m = Molecule.from_coords(['_'] 
def get_vec(ws: List[str]) -> Vec:
    """Parse the first three tokens of *ws* as a float 3-vector."""
    x, y, z = (float(ws[i]) for i in range(3))
    return x, y, z
def getfragments(conn: Any) -> List[List[int]]:
    """Partition nodes of an adjacency matrix into connected components.

    :param conn: square boolean/integer adjacency matrix
    Returns the components as sorted index lists, ordered by their
    smallest member.
    """
    n = conn.shape[0]
    assigned = [-1] * n  # component index per node, -1 = unvisited
    ncomp = 0
    for start in range(n):
        if assigned[start] >= 0:
            continue
        # Flood-fill the component containing 'start'; marking nodes on
        # enqueue guarantees each node is pushed at most once.
        assigned[start] = ncomp
        frontier = [start]
        while frontier:
            node = frontier.pop()
            for neighbor in np.flatnonzero(conn[node, :]):
                if assigned[neighbor] < 0:
                    assigned[neighbor] = ncomp
                    frontier.append(neighbor)
        ncomp += 1
    return [
        [i for i, comp in enumerate(assigned) if comp == c] for c in range(ncomp)
    ]
def jinja_tex(tex_template: str, ctx: Dict[str, object]) -> str:
    """Render a Jinja TeX template.

    Uses ``<</>>`` for variables, ``<+/+>`` for blocks, and ``<#/#>`` for
    comments.

    :param str tex_template: a Jinja template
    :param dict ctx: a variable context
    """
    # Non-default delimiters keep Jinja syntax from colliding with TeX's
    # heavy use of braces and percent signs.
    jinja_template = Template(
        tex_template,
        variable_start_string=r'<<',
        variable_end_string='>>',
        block_start_string='<+',
        block_end_string='+>',
        comment_start_string='<#',
        comment_end_string='#>',
        trim_blocks=True,
        autoescape=False,
    )
    return jinja_template.render(ctx)
import asyncio import logging import warnings from collections import defaultdict from contextlib import asynccontextmanager, contextmanager from contextvars import ContextVar from functools import wraps from itertools import chain from typing import ( Any, AsyncGenerator, Awaitable, Callable, Dict, FrozenSet, Iterable, Iterator, List, NamedTuple, Optional, Sequence, Tuple, TypeVar, Union, cast, ) from .dag import ( Action, NodeException, NodeExecuted, Priority, default_priority, traverse, traverse_async, ) from .errors import FutureError, MonaError, SessionError, TaskError from .futures import STATE_COLORS from .hashing import Hash, Hashed from .pluggable import Pluggable, Plugin from .tasks import Corofunc, HashedFuture, State, Task, maybe_hashed from .utils import Literal, call_if, split __version__ = '0.1.0' __all__ = ['Session'] log = logging.getLogger(__name__) _T = TypeVar('_T') ATask = Task[object] TaskExecute = Callable[[ATask, NodeExecuted[ATask]], Awaitable[None]] ExceptionHandler = Callable[[ATask, Exception], bool] TaskFilter = Callable[[ATask], bool] _active_session: ContextVar[Optional['Session']] = ContextVar( 'active_session', default=None ) class SessionPlugin(Plugin['Session']): def post_enter(self, sess: 'Session') -> None: pass def pre_exit(self, sess: 'Session') -> None: pass async def pre_run(self) -> None: pass async def post_run(self) -> None: pass def post_task_run(self, task: ATask) -> None: pass def save_hashed(self, objs: Sequence[Hashed[object]]) -> None: pass def ignored_exception(self) -> None: pass def wrap_execute(self, exe: TaskExecute) -> TaskExecute: return exe def post_create(self, task: ATask) -> None: pass class SessionGraph(NamedTuple): deps: Dict[Hash, FrozenSet[Hash]] side_effects: Dict[Hash, List[Hash]] backflow: Dict[Hash, FrozenSet[Hash]] class Session(Pluggable): """A context manager in which tasks can be created. :param plugins: session plugins to load. 
This is equivalent to calling each plugin with the created session as an argument :param bool warn: warn at the end of session if some created tasks were not executed """ def __init__( self, plugins: Iterable[SessionPlugin] = None, warn: bool = True ) -> None: Pluggable.__init__(self) for plugin in plugins or (): plugin(self) self._tasks: Dict[Hash, ATask] = {} self._graph = SessionGraph({}, defaultdict(list), {}) self._running_task: ContextVar[Optional[ATask]] = ContextVar('running_task') self._running_task.set(None) self._storage: Dict[str, Any] = {} self._warn = warn self._skipped = False def _check_active(self) -> None: sess = _active_session.get() if sess is None or sess is not self: raise SessionError(f'Not active: {self!r}', self) @property def storage(self) -> Dict[str, object]: """General-purpose dictionary-based storage.""" self._check_active() return self._storage def get_side_effects(self, task: ATask) -> Iterable[ATask]: """Return tasks created by a given task.""" return tuple(self._tasks[h] for h in self._graph.side_effects[task.hashid]) def all_tasks(self) -> Iterable[ATask]: """Return all tasks created in session.""" yield from self._tasks.values() def __enter__(self) -> 'Session': assert _active_session.get() is None self._active_session_token = _active_session.set(self) self.run_plugins('post_enter', self) return self def _filter_tasks(self, cond: TaskFilter) -> List[ATask]: return list(filter(cond, self._tasks.values())) def __exit__(self, exc_type: Any, *args: Any) -> None: assert _active_session.get() is self self.run_plugins('pre_exit', self) _active_session.reset(self._active_session_token) del self._active_session_token if self._warn and not self._skipped and exc_type is None: tasks_not_run = self._filter_tasks(lambda t: t.state < State.RUNNING) if tasks_not_run: warnings.warn(f'tasks have never run: {tasks_not_run}', RuntimeWarning) self._tasks.clear() self._storage.clear() self._graph.deps.clear() self._graph.side_effects.clear() 
self._graph.backflow.clear() @property def running_task(self) -> ATask: # noqa: D401 """Currently running task.""" task = self._running_task.get() if task: return task raise SessionError(f'No running task: {self!r}', self) @contextmanager def _running_task_ctx(self, task: ATask) -> Iterator[None]: assert not self._running_task.get() self._running_task.set(task) try: yield finally: assert self._running_task.get() is task self._running_task.set(None) def _process_objects(self, objs: Iterable[Hashed[object]]) -> List[ATask]: objs = list( traverse(objs, lambda o: o.components, lambda o: isinstance(o, Task)) ) tasks, objs = cast( Tuple[List[ATask], List[Hashed[object]]], split(objs, lambda o: isinstance(o, Task)), ) for task in tasks: if task.hashid not in self._tasks: raise TaskError(f'Not in session: {task!r}', task) self.run_plugins('save_hashed', objs) return tasks def register_task(self, task: Task[_T]) -> Tuple[Task[_T], bool]: """Register a task in a session.""" try: return cast(Task[_T], self._tasks[task.hashid]), False except KeyError: pass self._tasks[task.hashid] = task task.register() arg_tasks = self._process_objects(task.args) self._graph.deps[task.hashid] = frozenset(t.hashid for t in arg_tasks) return task, True def add_side_effect_of(self, caller: ATask, callee: ATask) -> None: """Register a task created by a task.""" self._graph.side_effects[caller.hashid].append(callee.hashid) def create_task( self, corofunc: Corofunc[_T], *args: Any, **kwargs: Any ) -> Task[_T]: """Create a new task. 
:param corofunc: a coroutine function to be executed :param args: arguments to the coroutine :param kwargs: keyword arguments passed to :class:`~tasks.Task` """ task = Task(corofunc, *args, **kwargs) caller = self._running_task.get() if caller: self.add_side_effect_of(caller, task) task, registered = self.register_task(task) if registered: self.run_plugins('post_create', task) return task @asynccontextmanager async def run_context(self) -> AsyncGenerator[None, None]: """Context in which tasks should be run.""" await self.run_plugins_async('pre_run') try: yield finally: await self.run_plugins_async('post_run') async def _run_task(self, task: Task[_T]) -> Union[_T, Hashed[_T]]: async with self.run_context(): return await self.run_task_async(task) def run_task(self, task: Task[_T]) -> Union[_T, Hashed[_T]]: """Run a task. :param task: task to run Return the result of the task's coroutine function or it's hashed instance if hashable. """ return asyncio.run(self._run_task(task)) def set_result(self, task: Task[_T], result: Union[_T, Hashed[_T]]) -> None: """Attach a result to a task.""" if not isinstance(result, Hashed): task.set_result(result) return if not isinstance(result, HashedFuture) or result.done(): task.set_result(result) else: log.debug(f'{task}: has run, pending: {result}') task.set_future_result(result) result.add_done_callback(lambda fut: task.set_done()) result.register() backflow = self._process_objects([result]) self._graph.backflow[task.hashid] = frozenset(t.hashid for t in backflow) async def run_task_async(self, task: Task[_T]) -> Union[_T, Hashed[_T]]: """Run a task asynchronously.""" if task.state < State.READY: raise TaskError(f'Not ready: {task!r}', task) if task.state > State.READY: raise TaskError(f'Task was already run: {task!r}', task) task.set_running() with self._running_task_ctx(task): raw_result = await task.corofunc(*(arg.value for arg in task.args)) task.set_has_run() side_effects = self.get_side_effects(task) if side_effects: 
log.debug(f'{task}: created tasks: {list(map(Literal, side_effects))}') result = cast(_T, maybe_hashed(raw_result)) or raw_result self.set_result(task, result) self.run_plugins('post_task_run', task) return result async def _traverse_execute(self, task: ATask, done: NodeExecuted[ATask]) -> None: await self.run_task_async(task) backflow = (self._tasks[h] for h in self._graph.backflow.get(task.hashid, ())) done((task, None, backflow)) def eval(self, *args: Any, **kwargs: Any) -> Any: """Blocking version of :meth:`eval_async`.""" return asyncio.run(self.eval_async(*args, **kwargs)) # TODO reduce complexity async def _eval_async( # noqa: C901 self, obj: object, depth: bool = False, priority: Priority = default_priority, exception_handler: ExceptionHandler = None, task_filter: TaskFilter = None, limit: int = None, ) -> Any: """Evaluate an object by running all tasks it references. This includes all newly created tasks that are referenced indirectly. :param obj: any hashable object :param bool depth: traverse DAG depth-first if true, breadth-first otherwise :param tuple priority: prioritize steps in DAG traversal in order :param exception_handler: callable that accepts a task and an exception it raised and returns True if the exception should be ignored :param task_filter: callable that accepts a task and returns True if the task should be executed :param int limit: limit of the number of executed task Return the evaluated object. 
""" fut = maybe_hashed(obj) if not isinstance(fut, HashedFuture): return obj fut.register() exceptions = {} traversal = traverse_async( self._process_objects([fut]), lambda task: ( self._tasks[h] for h in chain( self._graph.deps[task.hashid], self._graph.backflow.get(task.hashid, ()), ) ), lambda task, reg: call_if( task.state < State.RUNNING, task.add_ready_callback, lambda t: reg(t) ), self.run_plugins('wrap_execute', self._traverse_execute, wrap_first=True), depth, priority, ) n_executed = 0 shutdown = False do_step: bool = None # type: ignore while True: try: step_or_exception = await traversal.asend(do_step) except StopAsyncIteration: break if isinstance(step_or_exception, NodeException): task, exc = step_or_exception if isinstance(exc, (MonaError, AssertionError, asyncio.CancelledError)): raise exc else: assert isinstance(exc, Exception) if exception_handler and exception_handler(task, exc): self.run_plugins('ignored_exception') exceptions[task] = exc task.set_error() log.info(f'Handled {exc!r} from {task!r}') do_step = True continue raise exc action, task, progress = step_or_exception progress_line = ' '.join(f'{k}={v}' for k, v in progress.items()) tag = action.name if task: tag += f': {task.label}' log.debug(f'{tag}, progress: {progress_line}') if action is Action.EXECUTE: do_step = not shutdown if do_step: n_executed += 1 if limit: assert n_executed <= limit if n_executed == limit: log.info('Maximum number of executed tasks reached') shutdown = True log.info(f'{task}: will run') elif action is Action.TRAVERSE: if task.done(): do_step = False elif not task_filter: do_step = True else: do_step = task_filter(task) if not do_step: self._skipped = True log.info('Finished') try: return fut.value except FutureError: if exceptions: msg = f'Cannot evaluate future because of errors: {exceptions}' log.warning(msg) elif self._skipped: log.info('Cannot evaluate future because tasks were skipped') else: tasks_not_done = self._filter_tasks(lambda t: not t.done()) if 
tasks_not_done: msg = f'Task dependency cycle: {tasks_not_done}' raise MonaError(msg) else: raise return fut @wraps(_eval_async) async def eval_async(self, *args: Any, **kwargs: Any) -> Any: async with self.run_context(): return await self._eval_async(*args, **kwargs) def dot_graph(self, *args: Any, **kwargs: Any) -> Any: """Generate :class:`~graphviz.Digraph` for the task DAG.""" from graphviz import Digraph # type: ignore tasks: Union[List[Hash], FrozenSet[Hash]] dot = Digraph(*args, **kwargs) for child, parents in self._graph.deps.items(): task_obj = self._tasks[child] dot.node(child, repr(Literal(task_obj)), color=STATE_COLORS[task_obj.state]) for parent in parents: dot.edge(child, parent) for origin, tasks in self._graph.side_effects.items(): for task in tasks: dot.edge(origin, task, style='dotted') for target, tasks in self._graph.backflow.items(): for task in tasks: dot.edge( task, target, style='tapered', penwidth='7', dir='back', arrowtail='none', ) return dot @classmethod def active(cls) -> 'Session': """Return a currently active session.""" session = _active_session.get() if session is None: raise MonaError('No active session') return session PK!x x mona/table.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. 
from itertools import chain, starmap from typing import Any, Callable, List, Tuple, Union __all__ = () class lenstr(str): # noqa: N801 def __new__(cls, s: Any, len: int) -> str: return str.__new__(cls, s) # type: ignore def __init__(self, s: Any, len: int) -> None: self._len = len def __len__(self) -> int: return self._len def align(s: str, align: str, width: int) -> str: l = len(s) if l >= width: return s if align == '<': s = s + (width - l) * ' ' elif align == '>': s = (width - l) * ' ' + s elif align == '|': s = (-(l - width) // 2) * ' ' + s + ((width - l) // 2) * ' ' return s class Table: def __init__(self, **kwargs: Any) -> None: self._rows: List[Tuple[bool, Tuple[str, ...]]] = [] self.set_format(**kwargs) def add_row(self, *row: str, free: bool = False) -> None: self._rows.append((free, row)) def set_format( self, sep: Union[str, List[str]] = ' ', align: Union[str, List[str]] = '>', indent: str = '', ) -> None: self._sep = sep self._align = align self._indent = indent def sort( self, key: Callable[[Tuple[object, ...]], object] = lambda x: x[0], **kwargs: Any, ) -> None: self._rows.sort(key=lambda x: key(x[1]), **kwargs) def __str__(self) -> str: col_nums = [len(row) for free, row in self._rows if not free] if len(set(col_nums)) != 1: raise ValueError(f'Unequal column lengths: {col_nums}') col_num = col_nums[0] cells_widths = [ [len(cell) for cell in row] for free, row in self._rows if not free ] col_widths = [max(cws) for cws in zip(*cells_widths)] if isinstance(self._sep, list): seps = self._sep else: seps = (col_num - 1) * [self._sep] seps += [''] if isinstance(self._align, list): aligns = self._align else: aligns = col_num * [self._align] lines: List[str] = [] for free, row in self._rows: if free: lines += row[0] else: cells = starmap(align, zip(row, aligns, col_widths)) lines.append( self._indent + ''.join(chain.from_iterable(zip(cells, seps))).rstrip() ) return '\n'.join(lines) PK!3U** mona/tasks.py# This Source Code Form is subject to the terms of the 
Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import asyncio import inspect import json import logging import pickle from abc import abstractmethod from typing import ( Awaitable, Callable, Dict, Iterable, Optional, Tuple, TypeVar, Union, cast, ) from .errors import CompositeError, FutureError, TaskError from .futures import Future, State from .hashing import ( Composite, Hash, Hashed, HashedComposite, HashedCompositeLike, HashResolver, ) from .pyhash import hash_function from .utils import Empty, Maybe, get_fullname, import_fullname, swap_type __all__ = ['ensure_hashed', 'maybe_hashed'] log = logging.getLogger(__name__) _T = TypeVar('_T') _T_co = TypeVar('_T_co', covariant=True) _U = TypeVar('_U') Corofunc = Callable[..., Awaitable[_T]] def ensure_hashed(obj: object) -> Hashed[object]: """Turn any object into a Hashed object. Return Hashed objects without change. Wraps composites into a TaskComposite or a HashedComposite. Raises InvalidJSONObject when not possible. """ obj = swap_type(obj, TaskComposite.type_swaps) if isinstance(obj, Hashed): return obj jsonstr, components = TaskComposite.parse_object(obj) if any(isinstance(comp, HashedFuture) for comp in components): return TaskComposite(jsonstr, components) return HashedComposite(jsonstr, components) def maybe_hashed(obj: object) -> Optional[Hashed[object]]: """Turn any object into a Hashed object or None if not hashable.""" try: return ensure_hashed(obj) except CompositeError: return None # Although this class could be hashable in principle, this would require # dispatching all futures via a session in the same way that tasks are. # See test_identical_futures() for an example of what wouldn't work. class HashedFuture(Hashed[_T_co], Future): """Represents a hashed future. Inherits abstract methods spec() and label() from Hashed, implements abstract property value and adds abstract method result(). 
""" @property @abstractmethod def spec(self) -> bytes: ... @property @abstractmethod def label(self) -> str: ... @abstractmethod def result(self) -> _T_co: ... @property def value(self) -> _T_co: if self.done(): return self.result() raise FutureError(f'Not done: {self!r}', self) def default_result(self) -> _T_co: raise FutureError(f'No default: {self!r}', self) @property def value_or_default(self) -> _T_co: if self.done(): return self.result() return self.default_result() def __repr__(self) -> str: return f'<{self.__class__.__name__} {self} state={self.state.name}>' class Task(HashedFuture[_T_co]): def __init__( self, corofunc: Corofunc[_T_co], *args: object, label: str = None, default: Maybe[_T_co] = Empty._, rule: str = None, ) -> None: self._corofunc = corofunc self._args = tuple(map(ensure_hashed, args)) Hashed.__init__(self) Future.__init__( self, (arg for arg in self._args if isinstance(arg, HashedFuture)) ) self._default = default if label: self._label = label else: arg_list = ', '.join(a.label for a in self._args) arg_list = arg_list if len(arg_list) < 50 else '...' self._label = f'{self._corofunc.__qualname__}({arg_list})' self._result: Union[_T_co, Hashed[_T_co], Empty] = Empty._ self._storage: Dict[str, object] = {} self._rule = rule @property def spec(self) -> bytes: return json.dumps( [ get_fullname(self._corofunc), hash_function(self._corofunc), *(fut.hashid for fut in self._args), ] ).encode() @classmethod def from_spec(cls, spec: bytes, resolve: HashResolver) -> 'Task[_T_co]': rule_name: str corohash: Hash arg_hashes: Tuple[Hash, ...] 
rule_name, corohash, *arg_hashes = json.loads(spec) corofunc: Corofunc[_T_co] = getattr(import_fullname(rule_name), 'corofunc') assert inspect.iscoroutinefunction(corofunc) assert hash_function(corofunc) == corohash args = (resolve(h) for h in arg_hashes) return cls(corofunc, *args) @property def label(self) -> str: return self._label def result(self) -> _T_co: return self.resolve(lambda res: res.value) @property def corofunc(self) -> Corofunc[_T_co]: return self._corofunc @property def args(self) -> Tuple[Hashed[object], ...]: return self._args @property def rule(self) -> Optional[str]: return self._rule @property def storage(self) -> Dict[str, object]: return self._storage def __getitem__(self, key: object) -> 'TaskComponent[object]': return self.get(key) def get( self, key: object, default: Maybe[object] = Empty._ ) -> 'TaskComponent[object]': return TaskComponent(self, [key], default) def resolve( self, handler: Callable[[Hashed[_T_co]], _U] = None ) -> Union[_U, _T_co]: if isinstance(self._result, Empty): raise TaskError(f'Has not run: {self!r}', self) if not isinstance(self._result, Hashed): return self._result handler = handler or (lambda x: x) # type: ignore return handler(self._result) # type: ignore def default_result(self) -> _T_co: if not isinstance(self._default, Empty): return self._default if isinstance(self._result, HashedFuture): return cast(HashedFuture[_T_co], self._result).default_result() raise TaskError(f'Has no defualt: {self!r}', self) def metadata(self) -> Optional[bytes]: return pickle.dumps((self._default, self._label, self._rule)) def set_metadata(self, metadata: bytes) -> None: self._default, self._label, self._rule = pickle.loads(metadata) def set_running(self) -> None: assert self._state is State.READY self._state = State.RUNNING def set_error(self) -> None: assert self._state is State.RUNNING self._state = State.ERROR def set_has_run(self) -> None: assert self._state is State.RUNNING self._state = State.HAS_RUN def set_result(self, 
result: Union[_T_co, Hashed[_T_co]]) -> None: assert self._state is State.HAS_RUN assert not isinstance(result, HashedFuture) or result.done() self._result = result self.set_done() def set_future_result(self, result: HashedFuture[_T_co]) -> None: assert self.state is State.HAS_RUN assert not result.done() self._state = State.AWAITING self._result = result def future_result(self) -> HashedFuture[_T_co]: if self._state < State.AWAITING: raise TaskError(f'Do not have future: {self!r}', self) if self._state > State.AWAITING: raise TaskError(f'Already done: {self!r}', self) assert isinstance(self._result, HashedFuture) return self._result def call(self) -> _T_co: return asyncio.run(self.call_async()) async def call_async(self) -> _T_co: args = [ arg.value_or_default if isinstance(arg, HashedFuture) else arg.value for arg in self.args ] return await self._corofunc(*args) class TaskComponent(HashedFuture[_T_co]): def __init__( self, task: Task[object], keys: Iterable[object], default: Maybe[_T_co] = Empty._, ) -> None: self._task = task self._keys = list(keys) Hashed.__init__(self) Future.__init__(self, [cast(HashedFuture[object], task)]) self._default = default self._label = ''.join([self._task.label, *(f'[{k!r}]' for k in self._keys)]) self.add_ready_callback(lambda self: self.set_done()) @property def spec(self) -> bytes: return json.dumps([self._task.hashid, *self._keys]).encode() @classmethod def from_spec(cls, spec: bytes, resolve: HashResolver) -> 'TaskComponent[_T_co]': task_hash: Hash keys: Tuple[object, ...] 
task_hash, *keys = json.loads(spec) task = cast(Task[object], resolve(task_hash)) return cls(task, keys) @property def label(self) -> str: return self._label @property def components(self) -> Iterable[Hashed[object]]: return (self._task,) def result(self) -> _T_co: return self.resolve(lambda task: task.result()) def __getitem__(self, key: object) -> 'TaskComponent[object]': return self.get(key) def get( self, key: object, default: Maybe[object] = Empty._ ) -> 'TaskComponent[object]': return TaskComponent(self._task, self._keys + [key], default) @property def task(self) -> Task[object]: return self._task def resolve(self, handler: Callable[[Task[object]], object]) -> _T_co: obj = handler(self._task) for key in self._keys: obj = obj[key] # type: ignore return cast(_T_co, obj) def default_result(self) -> _T_co: if not isinstance(self._default, Empty): return self._default return self.resolve(lambda task: task.default_result()) def metadata(self) -> Optional[bytes]: return pickle.dumps(self._default) def set_metadata(self, metadata: bytes) -> None: self._default = pickle.loads(metadata) # the semantics may imply that the component is taken immediately after # execution, but it is only taken by the child task, so that if the component # does not exist, the exception is raised only later class TaskComposite(HashedCompositeLike, HashedFuture[Composite]): # type: ignore def __init__(self, jsonstr: str, components: Iterable[Hashed[object]]) -> None: components = list(components) futures = [comp for comp in components if isinstance(comp, HashedFuture)] assert futures Future.__init__(self, futures) HashedCompositeLike.__init__(self, jsonstr, components) self.add_ready_callback(lambda self: self.set_done()) # override abstract property in HashedCompositeLike value = HashedFuture.value # type: ignore def result(self) -> Composite: return self.resolve(lambda comp: comp.value) def default_result(self) -> Composite: return self.resolve( lambda comp: cast(object, 
comp.value_or_default) if isinstance(comp, HashedFuture) else comp.value ) PK! I\aa mona/utils.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import importlib import os import re import stat from datetime import datetime from enum import Enum from typing import ( Any, Callable, Dict, Iterable, List, Optional, Pattern, Tuple, Type, TypeVar, Union, ) __all__ = ['Empty'] _T = TypeVar('_T') _V = TypeVar('_V') Maybe = Union[_T, 'Empty'] Pathable = Union[str, 'os.PathLike[str]'] TypeSwaps = Dict[Type[object], Callable[[Any], object]] # Ideally Empty.EMPTY could be used directly, but mypy doesn't understand that # yet, so isisntance() it is. class Empty(Enum): """Absence of a value.""" _ = 0 def get_fullname(obj: Union[Callable[..., object], Type[object]]) -> str: return f'{obj.__module__}:{obj.__qualname__}' def import_fullname(fullname: str) -> object: module_name, qualname = fullname.split(':') module = importlib.import_module(module_name) return getattr(module, qualname) # type: ignore def shorten_text(s: Union[str, bytes], n: int) -> str: if len(s) > n: s = s[: n - 3] shortened = True else: shortened = False if isinstance(s, bytes): try: text = s.decode() except UnicodeDecodeError: return '' else: text = s return f'{text.rstrip()}...' 
if shortened else text class Literal(str): def __repr__(self) -> str: return str.__repr__(self)[1:-1] def swap_type(o: _T, swaps: TypeSwaps) -> object: if o.__class__ in swaps: return swaps[o.__class__](o) return o # TODO ignore existing permissions def make_executable(path: Pathable) -> None: st = os.stat(path) os.chmod(path, st.st_mode | stat.S_IEXEC) def make_nonwritable(path: Pathable) -> None: os.chmod( path, stat.S_IMODE(os.lstat(path).st_mode) & ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH), ) def make_writable(path: Pathable) -> None: os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) | stat.S_IWUSR) def get_timestamp() -> str: return datetime.now().isoformat(timespec='seconds') def call_if(cond: bool, func: Callable[..., None], *args: Any, **kwargs: Any) -> None: if cond: func(*args, **kwargs) def split( iterable: Iterable[_T], first: Callable[[_T], bool] ) -> Tuple[List[_T], List[_T]]: left: List[_T] = [] right: List[_T] = [] for item in iterable: (left if first(item) else right).append(item) return left, right def groupby(iterable: Iterable[_T], key: Callable[[_T], _V]) -> Dict[_V, List[_T]]: groups: Dict[_V, List[_T]] = {} for x in iterable: groups.setdefault(key(x), []).append(x) return groups _regexes: Dict[str, Pattern[str]] = {} def match_glob(path: str, pattern: str) -> Optional[str]: regex = _regexes.get(pattern) if not regex: regex = re.compile( pattern.replace('(', r'\(') .replace(')', r'\)') .replace('?', '[^/]') .replace('<>', '([^/]*)') .replace('<', '(') .replace('>', ')') .replace('{', '(?:') .replace('}', ')') .replace(',', '|') .replace('**', r'\\') .replace('*', '[^/]*') .replace(r'\\', '.*') + '$' ) _regexes[pattern] = regex m = regex.match(path) if not m: return None for group in m.groups(): pattern = re.sub(r'<.*?>', group, pattern, 1) return pattern PK!Hޯ#%%mona-0.2.4.dist-info/entry_points.txtN+I/N.,()Kz9V@PK!xFVAVAmona-0.2.4.dist-info/LICENSEMozilla Public License Version 2.0 ================================== 1. 
Definitions -------------- 1.1. "Contributor" means each individual or legal entity that creates, contributes to the creation of, or owns Covered Software. 1.2. "Contributor Version" means the combination of the Contributions of others (if any) used by a Contributor and that particular Contributor's Contribution. 1.3. "Contribution" means Covered Software of a particular Contributor. 1.4. "Covered Software" means Source Code Form to which the initial Contributor has attached the notice in Exhibit A, the Executable Form of such Source Code Form, and Modifications of such Source Code Form, in each case including portions thereof. 1.5. "Incompatible With Secondary Licenses" means (a) that the initial Contributor has attached the notice described in Exhibit B to the Covered Software; or (b) that the Covered Software was made available under the terms of version 1.1 or earlier of the License, but not also under the terms of a Secondary License. 1.6. "Executable Form" means any form of the work other than Source Code Form. 1.7. "Larger Work" means a work that combines Covered Software with other material, in a separate file or files, that is not Covered Software. 1.8. "License" means this document. 1.9. "Licensable" means having the right to grant, to the maximum extent possible, whether at the time of the initial grant or subsequently, any and all of the rights conveyed by this License. 1.10. "Modifications" means any of the following: (a) any file in Source Code Form that results from an addition to, deletion from, or modification of the contents of Covered Software; or (b) any new file in Source Code Form that contains any Covered Software. 1.11. 
"Patent Claims" of a Contributor means any patent claim(s), including without limitation, method, process, and apparatus claims, in any patent Licensable by such Contributor that would be infringed, but for the grant of the License, by the making, using, selling, offering for sale, having made, import, or transfer of either its Contributions or its Contributor Version. 1.12. "Secondary License" means either the GNU General Public License, Version 2.0, the GNU Lesser General Public License, Version 2.1, the GNU Affero General Public License, Version 3.0, or any later versions of those licenses. 1.13. "Source Code Form" means the form of the work preferred for making modifications. 1.14. "You" (or "Your") means an individual or a legal entity exercising rights under this License. For legal entities, "You" includes any entity that controls, is controlled by, or is under common control with You. For purposes of this definition, "control" means (a) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (b) ownership of more than fifty percent (50%) of the outstanding shares or beneficial ownership of such entity. 2. License Grants and Conditions -------------------------------- 2.1. Grants Each Contributor hereby grants You a world-wide, royalty-free, non-exclusive license: (a) under intellectual property rights (other than patent or trademark) Licensable by such Contributor to use, reproduce, make available, modify, display, perform, distribute, and otherwise exploit its Contributions, either on an unmodified basis, with Modifications, or as part of a Larger Work; and (b) under Patent Claims of such Contributor to make, use, sell, offer for sale, have made, import, and otherwise transfer either its Contributions or its Contributor Version. 2.2. 
Effective Date The licenses granted in Section 2.1 with respect to any Contribution become effective for each Contribution on the date the Contributor first distributes such Contribution. 2.3. Limitations on Grant Scope The licenses granted in this Section 2 are the only rights granted under this License. No additional rights or licenses will be implied from the distribution or licensing of Covered Software under this License. Notwithstanding Section 2.1(b) above, no patent license is granted by a Contributor: (a) for any code that a Contributor has removed from Covered Software; or (b) for infringements caused by: (i) Your and any other third party's modifications of Covered Software, or (ii) the combination of its Contributions with other software (except as part of its Contributor Version); or (c) under Patent Claims infringed by Covered Software in the absence of its Contributions. This License does not grant any rights in the trademarks, service marks, or logos of any Contributor (except as may be necessary to comply with the notice requirements in Section 3.4). 2.4. Subsequent Licenses No Contributor makes additional grants as a result of Your choice to distribute the Covered Software under a subsequent version of this License (see Section 10.2) or under the terms of a Secondary License (if permitted under the terms of Section 3.3). 2.5. Representation Each Contributor represents that the Contributor believes its Contributions are its original creation(s) or it has sufficient rights to grant the rights to its Contributions conveyed by this License. 2.6. Fair Use This License is not intended to limit any rights You have under applicable copyright doctrines of fair use, fair dealing, or other equivalents. 2.7. Conditions Sections 3.1, 3.2, 3.3, and 3.4 are conditions of the licenses granted in Section 2.1. 3. Responsibilities ------------------- 3.1. 
Distribution of Source Form All distribution of Covered Software in Source Code Form, including any Modifications that You create or to which You contribute, must be under the terms of this License. You must inform recipients that the Source Code Form of the Covered Software is governed by the terms of this License, and how they can obtain a copy of this License. You may not attempt to alter or restrict the recipients' rights in the Source Code Form. 3.2. Distribution of Executable Form If You distribute Covered Software in Executable Form then: (a) such Covered Software must also be made available in Source Code Form, as described in Section 3.1, and You must inform recipients of the Executable Form how they can obtain a copy of such Source Code Form by reasonable means in a timely manner, at a charge no more than the cost of distribution to the recipient; and (b) You may distribute such Executable Form under the terms of this License, or sublicense it under different terms, provided that the license for the Executable Form does not attempt to limit or alter the recipients' rights in the Source Code Form under this License. 3.3. Distribution of a Larger Work You may create and distribute a Larger Work under terms of Your choice, provided that You also comply with the requirements of this License for the Covered Software. If the Larger Work is a combination of Covered Software with a work governed by one or more Secondary Licenses, and the Covered Software is not Incompatible With Secondary Licenses, this License permits You to additionally distribute such Covered Software under the terms of such Secondary License(s), so that the recipient of the Larger Work may, at their option, further distribute the Covered Software under the terms of either this License or such Secondary License(s). 3.4. 
Notices You may not remove or alter the substance of any license notices (including copyright notices, patent notices, disclaimers of warranty, or limitations of liability) contained within the Source Code Form of the Covered Software, except that You may alter any license notices to the extent required to remedy known factual inaccuracies. 3.5. Application of Additional Terms You may choose to offer, and to charge a fee for, warranty, support, indemnity or liability obligations to one or more recipients of Covered Software. However, You may do so only on Your own behalf, and not on behalf of any Contributor. You must make it absolutely clear that any such warranty, support, indemnity, or liability obligation is offered by You alone, and You hereby agree to indemnify every Contributor for any liability incurred by such Contributor as a result of warranty, support, indemnity or liability terms You offer. You may include additional disclaimers of warranty and limitations of liability specific to any jurisdiction. 4. Inability to Comply Due to Statute or Regulation --------------------------------------------------- If it is impossible for You to comply with any of the terms of this License with respect to some or all of the Covered Software due to statute, judicial order, or regulation then You must: (a) comply with the terms of this License to the maximum extent possible; and (b) describe the limitations and the code they affect. Such description must be placed in a text file included with all distributions of the Covered Software under this License. Except to the extent prohibited by statute or regulation, such description must be sufficiently detailed for a recipient of ordinary skill to be able to understand it. 5. Termination -------------- 5.1. The rights granted under this License will terminate automatically if You fail to comply with any of its terms. 
However, if You become compliant, then the rights granted under this License from a particular Contributor are reinstated (a) provisionally, unless and until such Contributor explicitly and finally terminates Your grants, and (b) on an ongoing basis, if such Contributor fails to notify You of the non-compliance by some reasonable means prior to 60 days after You have come back into compliance. Moreover, Your grants from a particular Contributor are reinstated on an ongoing basis if such Contributor notifies You of the non-compliance by some reasonable means, this is the first time You have received notice of non-compliance with this License from such Contributor, and You become compliant prior to 30 days after Your receipt of the notice. 5.2. If You initiate litigation against any entity by asserting a patent infringement claim (excluding declaratory judgment actions, counter-claims, and cross-claims) alleging that a Contributor Version directly or indirectly infringes any patent, then the rights granted to You by any and all Contributors for the Covered Software under Section 2.1 of this License shall terminate. 5.3. In the event of termination under Sections 5.1 or 5.2 above, all end user license agreements (excluding distributors and resellers) which have been validly granted by You or Your distributors under this License prior to termination shall survive termination. ************************************************************************ * * * 6. Disclaimer of Warranty * * ------------------------- * * * * Covered Software is provided under this License on an "as is" * * basis, without warranty of any kind, either expressed, implied, or * * statutory, including, without limitation, warranties that the * * Covered Software is free of defects, merchantable, fit for a * * particular purpose or non-infringing. The entire risk as to the * * quality and performance of the Covered Software is with You. 
* * Should any Covered Software prove defective in any respect, You * * (not any Contributor) assume the cost of any necessary servicing, * * repair, or correction. This disclaimer of warranty constitutes an * * essential part of this License. No use of any Covered Software is * * authorized under this License except under this disclaimer. * * * ************************************************************************ ************************************************************************ * * * 7. Limitation of Liability * * -------------------------- * * * * Under no circumstances and under no legal theory, whether tort * * (including negligence), contract, or otherwise, shall any * * Contributor, or anyone who distributes Covered Software as * * permitted above, be liable to You for any direct, indirect, * * special, incidental, or consequential damages of any character * * including, without limitation, damages for lost profits, loss of * * goodwill, work stoppage, computer failure or malfunction, or any * * and all other commercial damages or losses, even if such party * * shall have been informed of the possibility of such damages. This * * limitation of liability shall not apply to liability for death or * * personal injury resulting from such party's negligence to the * * extent applicable law prohibits such limitation. Some * * jurisdictions do not allow the exclusion or limitation of * * incidental or consequential damages, so this exclusion and * * limitation may not apply to You. * * * ************************************************************************ 8. Litigation ------------- Any litigation relating to this License may be brought only in the courts of a jurisdiction where the defendant maintains its principal place of business and such litigation shall be governed by laws of that jurisdiction, without reference to its conflict-of-law provisions. Nothing in this Section shall prevent a party's ability to bring cross-claims or counter-claims. 9. 
Miscellaneous ---------------- This License represents the complete agreement concerning the subject matter hereof. If any provision of this License is held to be unenforceable, such provision shall be reformed only to the extent necessary to make it enforceable. Any law or regulation which provides that the language of a contract shall be construed against the drafter shall not be used to construe this License against a Contributor. 10. Versions of the License --------------------------- 10.1. New Versions Mozilla Foundation is the license steward. Except as provided in Section 10.3, no one other than the license steward has the right to modify or publish new versions of this License. Each version will be given a distinguishing version number. 10.2. Effect of New Versions You may distribute the Covered Software under the terms of the version of the License under which You originally received the Covered Software, or under the terms of any subsequent version published by the license steward. 10.3. Modified Versions If you create software not governed by this License, and you want to create a new license for such software, you may create and use a modified version of this License if you rename the license and remove any references to the name of the license steward (except to note that such modified license differs from this License). 10.4. Distributing Source Code Form that is Incompatible With Secondary Licenses If You choose to distribute Source Code Form that is Incompatible With Secondary Licenses under the terms of this version of the License, the notice described in Exhibit B of this License must be attached. Exhibit A - Source Code Form License Notice ------------------------------------------- This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. 
If it is not possible or desirable to put the notice in a particular file, then You may include the notice in a location (such as a LICENSE file in a relevant directory) where a recipient would be likely to look for such a notice. You may add additional accurate notices of copyright ownership. Exhibit B - "Incompatible With Secondary Licenses" Notice --------------------------------------------------------- This Source Code Form is "Incompatible With Secondary Licenses", as defined by the Mozilla Public License, v. 2.0. PK!H+dUTmona-0.2.4.dist-info/WHEEL HM K-*ϳR03rOK-J,/R(O-)T0343 /, (-JLR()*M IL*4KM̫PK!H-Fmona-0.2.4.dist-info/METADATAVmS8_һi<۱^ঽ6d:TXĖ\I V If >ϾH{E5ΰ?T*&x0>⒦{0 {M]X.St RX M%{EI @;׺Ri͘IHDXQe١{j䬾mnJ8b7^$u_%bҰE.>\4(RzY'lF=Xh0)0I," =ϼˆbαFU3` u LS޳9hư;[4Z! mmUCʍ{G㒖=Xl]=A}yt-3霮28_@ &ĸjҍZ^ql+X)#V$V)L A%s,udL l_ܦi )*(vl[ E#mlpU=iAC;j`u(IM>Ksprz3e7mS Z1!J5:#[MX9cV6~]P>ޞFCH\̜i(,O`&,ӲKZM@(==60iȚPqoZw~^{ԝ:Aw]0$G zioc'0&N z}rdzz&51e {H6l"I&^n"\&)R@AJZK|ߌZQIH~՚{2I7OMP˚ \h ݺ٦,ņ[=אH }- 0~iS3!]@ٍ-l|{ϕkN5PK!H)ɪ mona-0.2.4.dist-info/RECORDuǒ< b@x+hCވ۪_HN毦kðh9 O`zDNխSԨzH_YKޞ WjIaD}F `$pXl±DMZP9%KTҲ)VT cHh 9 NA31r/0ё]i+,p.LJt0UC?5ylc^$#-97iq8Ofef^oBRP(Jrq鍩cܵx>v̇$CkƏ"C /^ٻ"܅QI-&Wjzy25F0?e^/ξs1N2H̷X pR[9xȘC!?r7dyAU"|)jGz̆WSg] @ Q\gq`'tWtFs@vUmzNQozUqőRNܯdtҮ֤# t)⫗D=~ 4bfôz['ά'?H{4gd)bv+#B{}8v{m¹'O@,pQV Few~h]"yB1ȀGud2$d#DE]>̬j :&!-~gQ/,֛C'n4NflH\=U`浏^4w=@!ĈKbM2+zgyƫ%b0G2TAv˳MQ]iyh(˳-B'2oZ/| ھ?&ٱ9ߠ# uMf KY\iVqpEDn4S6ME׾ĜR"{q &K'GٗRxU1ǽ_#