PK!Ĩ>nnmona/__init__.pyfrom .rules import Rule from .sessions import Session from .runners import run_process, run_shell, run_thread PK!į**mona/cli/__init__.pyfrom .app import App from .cli import cli PK!r  mona/cli/app.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import os import logging from pathlib import Path from typing import Dict, Any import toml from ..sessions import Session from ..plugins import Parallel, TmpdirManager, FileManager, Cache from ..utils import get_timestamp, Pathable log = logging.getLogger(__name__) class App: MONADIR = '.mona' TMPDIR = 'tmpdir' FILES = 'files' CACHE = 'cache.db' LAST_ENTRY = 'LAST_ENTRY' def __init__(self, monadir: Pathable = None) -> None: monadir = monadir or os.environ.get('MONA_DIR') or App.MONADIR self._monadir = Path(monadir).resolve() self._config: Dict[str, Any] = {} for path in [ Path('~/.config/mona/config.toml').expanduser(), Path('mona.toml'), self._monadir / 'config.toml', ]: if path.exists(): with path.open() as f: self._config.update(toml.load(f)) def session(self, warn: bool = False, **kwargs: Any) -> Session: sess = Session(warn=warn) self(sess, **kwargs) return sess @property def last_entry(self) -> str: return (self._monadir / App.LAST_ENTRY).read_text() @last_entry.setter def last_entry(self, entry: str) -> None: (self._monadir / App.LAST_ENTRY).write_text(entry) def __call__( self, sess: Session, ncores: int = None, write: str = 'eager', full_restore: bool = False, ) -> None: self._plugins = { 'parallel': Parallel(ncores), 'tmpdir': TmpdirManager(self._monadir / App.TMPDIR), 'files': FileManager(self._monadir / App.FILES), 'cache': Cache.from_path( self._monadir / App.CACHE, write=write, full_restore=full_restore ), } for plugin in self._plugins.values(): plugin(sess) def ensure_monadir(self) -> None: if self._monadir.is_dir(): log.info(f'Already 
initialized in {self._monadir}.') return log.info(f'Initializing an empty repository in {self._monadir}.') self._monadir.mkdir() try: cache_home = Path(self._config['cache']) except KeyError: for dirname in [App.TMPDIR, App.FILES]: (self._monadir / dirname).mkdir() else: ts = get_timestamp() cachedir = cache_home / f'{Path.cwd().name}_{ts}' cachedir.mkdir() for dirname in [App.TMPDIR, App.FILES]: (cachedir / dirname).mkdir() (self._monadir / dirname).symlink_to(cachedir / dirname) PK! rmona/cli/cli.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import os import logging import tempfile from pathlib import Path from typing import List, Optional, Any, cast, Dict, Tuple, Sequence import click from ..tasks import Task from ..futures import STATE_COLORS from ..utils import import_fullname, groupby from ..files import File from ..rules.dirtask import checkout_files, DirtaskInput from .glob import match_glob from .app import App from .table import Table, lenstr __version__ = '0.1.0' logging.basicConfig( style='{', format='[{asctime}.{msecs:03.0f}] {levelname}:{name}: {message}', datefmt='%H:%M:%S', ) log = logging.getLogger(__name__) log.setLevel(logging.INFO) logging.getLogger('mona').setLevel(int(os.environ.get('MONA_DEBUG', logging.INFO))) @click.group() @click.pass_context def cli(ctx: click.Context) -> None: ctx.ensure_object(App) @cli.command() @click.pass_obj def init(app: App) -> None: """Initialize a Git repository.""" app.ensure_monadir() class TaskFilter: def __init__(self, patterns: List[str] = None, no_path: bool = False) -> None: self._patterns = patterns or [] self._no_path = no_path def __call__(self, task: Task[Any]) -> bool: if self._no_path and task.label.startswith('/'): return False if self._patterns and not any( match_glob(task.label, patt) for patt in self._patterns ): return False return True class 
ExceptionBuffer: def __init__(self, maxerror: int = None) -> None: self._maxerror = maxerror self._n_errors = 0 def __call__(self, task: Task[Any], exc: Exception) -> bool: if self._maxerror is None: return False if self._n_errors == self._maxerror: log.warn('Maximum number of errors reached') self._n_errors += 1 if self._n_errors <= self._maxerror: return True return False @cli.command() @click.option('-p', '--pattern', multiple=True, help='Tasks to be executed') @click.option('-P', '--path', is_flag=True, help='Execute path-like tasks') @click.option('-j', '--cores', type=int, help='Number of cores') @click.option('-l', '--limit', type=int, help='Limit number of tasks to N') @click.option('--maxerror', type=int, help='Number of errors in row to quit') @click.argument('rulename', metavar='RULE') @click.pass_obj def run( app: App, pattern: List[str], cores: int, path: bool, limit: Optional[int], maxerror: int, rulename: str, ) -> None: """Run a given rule.""" rule = import_fullname(rulename) app.last_entry = rulename task_filter = TaskFilter(pattern, no_path=not path) exception_buffer = ExceptionBuffer(maxerror) with app.session(ncores=cores) as sess: sess.eval( rule(), exception_handler=exception_buffer, task_filter=task_filter, limit=limit, ) @cli.command() @click.option('-p', '--pattern', multiple=True, help='Patterns to be reported') @click.pass_obj def status(app: App, pattern: List[str]) -> None: """Print status of tasks.""" sess = app.session(warn=False, write='never', full_restore=True) rule = import_fullname(app.last_entry) ncols = len(STATE_COLORS) + 1 table = Table(align=['<', *(ncols * ['>'])], sep=[' ', *((ncols - 1) * ['/'])]) table.add_row('pattern', *(s.name.lower() for s in STATE_COLORS), 'all') with sess: rule() task_groups: Dict[str, List[Task[object]]] = {} all_tasks = list(sess.all_tasks()) for patt in pattern or ['**']: matched_any = False for task in all_tasks: matched = match_glob(task.label, patt) if matched: task_groups.setdefault(matched, 
[]).append(task) matched_any = True if not matched_any: task_groups[patt] = [] for label, tasks in task_groups.items(): grouped = { state: group for state, group in groupby(tasks, key=lambda t: t.state).items() } counts: List[Tuple[int, Optional[str]]] = [ (len(grouped.get(state, [])), color) for state, color in STATE_COLORS.items() ] counts.append((len(tasks), None)) col_counts = [ lenstr(click.style(str(count), fg=color), len(str(count))) for count, color in counts ] table.add_row(label, *col_counts) click.echo(str(table)) @cli.command() @click.pass_obj def graph(app: App) -> None: """Open a pdf with the task graph.""" sess = app.session(warn=False, write='never', full_restore=True) rule = import_fullname(app.last_entry) with sess: rule() dot = sess.dot_graph() dot.render(tempfile.mkstemp()[1], view=True, cleanup=True, format='pdf') @cli.command() @click.option('-p', '--pattern', multiple=True, help='Tasks to be checked out') @click.option('--done', is_flag=True, help='Check out only finished tasks') @click.option('-c', '--copy', is_flag=True, help='Copy instead of symlinking') @click.pass_obj def checkout(app: App, pattern: List[str], done: bool, copy: bool) -> None: """Checkout path-labeled tasks into a directory tree.""" n_tasks = 0 sess = app.session(warn=False, write='never', full_restore=True) rule = import_fullname(app.last_entry) with sess: rule() for task in sess.all_tasks(): if task.label[0] != '/': continue if pattern and not any(match_glob(task.label, patt) for patt in pattern): continue if done and not task.done(): continue exe: Optional[File] paths: Sequence[DirtaskInput] if task.rule == 'dir_task': exe = cast(File, task.args[0].value) paths = cast(List[DirtaskInput], task.args[1].value) if task.done(): paths.extend(cast(Dict[str, File], task.result()).values()) elif task.rule == 'file_collection': exe = None paths = cast(List[File], task.args[0].value) root = Path(task.label[1:]) root.mkdir(parents=True, exist_ok=True) checkout_files(root, exe, 
paths, mutable=copy) n_tasks += 1 log.info(f'Checked out {n_tasks} tasks.') PK! &&mona/cli/glob.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import re from typing import Dict, Pattern, Optional _regexes: Dict[str, Pattern[str]] = {} def match_glob(path: str, pattern: str) -> Optional[str]: regex = _regexes.get(pattern) if not regex: regex = re.compile( pattern.replace('(', r'\(') .replace(')', r'\)') .replace('?', '[^/]') .replace('<>', '([^/]*)') .replace('<', '(') .replace('>', ')') .replace('{', '(?:') .replace('}', ')') .replace(',', '|') .replace('**', r'\\') .replace('*', '[^/]*') .replace(r'\\', '.*') + '$' ) _regexes[pattern] = regex m = regex.match(path) if not m: return None for group in m.groups(): pattern = re.sub(r'<.*?>', group, pattern, 1) return pattern PK!}] ] mona/cli/table.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. 
from itertools import chain, starmap from typing import Callable, Any, List, Tuple, Union class lenstr(str): def __new__(cls, s: Any, len: int) -> str: return str.__new__(cls, s) # type: ignore def __init__(self, s: Any, len: int) -> None: self._len = len def __len__(self) -> int: return self._len def align(s: str, align: str, width: int) -> str: l = len(s) if l >= width: return s if align == '<': s = s + (width - l) * ' ' elif align == '>': s = (width - l) * ' ' + s elif align == '|': s = (-(l - width) // 2) * ' ' + s + ((width - l) // 2) * ' ' return s class Table: def __init__(self, **kwargs: Any) -> None: self._rows: List[Tuple[bool, Tuple[str, ...]]] = [] self.set_format(**kwargs) def add_row(self, *row: str, free: bool = False) -> None: self._rows.append((free, row)) def set_format( self, sep: Union[str, List[str]] = ' ', align: Union[str, List[str]] = '>', indent: str = '', ) -> None: self._sep = sep self._align = align self._indent = indent def sort( self, key: Callable[[Tuple[object, ...]], object] = lambda x: x[0], **kwargs: Any, ) -> None: self._rows.sort(key=lambda x: key(x[1]), **kwargs) def __str__(self) -> str: col_nums = [len(row) for free, row in self._rows if not free] if len(set(col_nums)) != 1: raise ValueError(f'Unequal column lengths: {col_nums}') col_num = col_nums[0] cells_widths = [ [len(cell) for cell in row] for free, row in self._rows if not free ] col_widths = [max(cws) for cws in zip(*cells_widths)] if isinstance(self._sep, list): seps = self._sep else: seps = (col_num - 1) * [self._sep] seps += [''] if isinstance(self._align, list): aligns = self._align else: aligns = col_num * [self._align] lines: List[str] = [] for free, row in self._rows: if free: lines += row[0] else: cells = starmap(align, zip(row, aligns, col_widths)) lines.append( self._indent + ''.join(chain.from_iterable(zip(cells, seps))).rstrip() ) return '\n'.join(lines) PK!mona/errors.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 
2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. from typing import Any, TYPE_CHECKING if TYPE_CHECKING: from .futures import Future from .tasks import Task from .sessions import Session __version__ = '0.1.0' class MonaError(Exception): pass class FutureError(MonaError): def __init__(self, msg: str, fut: 'Future') -> None: super().__init__(msg) self.future = fut class TaskError(MonaError): def __init__(self, msg: str, task: 'Task[Any]') -> None: super().__init__(msg) self.task = task class CompositeError(MonaError): pass class SessionError(MonaError): def __init__(self, msg: str, sess: 'Session') -> None: super().__init__(msg) self.session = sess class InvalidInput(MonaError): pass class FilesError(MonaError): pass class HashingError(MonaError): pass PK!9d mona/files.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import json from pathlib import Path from abc import ABC, abstractmethod from typing import Union, Optional, cast, Iterable, List, Callable, TypeVar from .sessions import Session from .rules import Rule from .hashing import Hash, Hashed, HashResolver, HashedBytes, HashedComposite from .utils import make_nonwritable, Pathable, shorten_text __version__ = '0.3.0' _R = TypeVar('_R', bound=Rule) # type: ignore def add_source(path: Pathable) -> Callable[[_R], _R]: def decorator(rule: _R) -> _R: rule.add_extra_arg(lambda: HashedFile(File.from_path(path))) return rule return decorator @Rule async def file_collection(files: List['File']) -> None: pass class FileManager(ABC): @abstractmethod def store_path(self, path: Path, *, keep: bool) -> 'Hash': ... @abstractmethod def store_bytes(self, content: bytes) -> 'Hash': ... @abstractmethod def get_bytes(self, content_hash: Hash) -> bytes: ... 
@abstractmethod def target_in(self, path: Path, content_hash: Hash, *, mutable: bool) -> None: ... @classmethod def active(cls) -> Optional['FileManager']: fmngr = Session.active().storage.get('file_manager') assert not fmngr or isinstance(fmngr, cls) return fmngr class File: def __init__(self, path: Path, content: Union[bytes, Hash]): assert not path.is_absolute() self._path = path self._content = content if not isinstance(content, bytes): fmngr = FileManager.active() assert fmngr self._fmngr = fmngr def __repr__(self) -> str: if isinstance(self._content, bytes): content = repr(shorten_text(self._content, 20)) else: content = self._content[:6] return f'' def __str__(self) -> str: return str(self._path) @property def stem(self) -> str: return self._path.stem @property def path(self) -> Path: return self._path @property def name(self) -> str: return self._path.name @property def content(self) -> Union[bytes, Hash]: return self._content def read_bytes(self) -> bytes: if isinstance(self._content, bytes): return self._content return self._fmngr.get_bytes(self._content) def read_text(self) -> str: return self.read_bytes().decode() def target_in(self, path: Path, *, mutable: bool = False) -> None: target = path / self._path if isinstance(self._content, bytes): target.write_bytes(self._content) if not mutable: make_nonwritable(target) else: self._fmngr.target_in(target, self._content, mutable=mutable) @classmethod def from_str(cls, path: Pathable, content: Union[str, bytes]) -> 'File': path = Path(path) if isinstance(content, str): content = content.encode() fmngr = FileManager.active() if fmngr: return cls(path, fmngr.store_bytes(content)) return cls(path, content) @classmethod def from_path( cls, path: Pathable, root: Union[str, Path] = None, *, keep: bool = True ) -> 'File': path = Path(path) relpath = path.relative_to(root) if root else path fmngr = FileManager.active() if fmngr: return cls(relpath, fmngr.store_path(path, keep=keep)) file = cls(relpath, 
path.read_bytes()) if not keep: path.unlink() return file class HashedFile(Hashed[File]): def __init__(self, file: File): self._path = file.path if isinstance(file.content, bytes): self._content: Optional[HashedBytes] = HashedBytes(file.content) self._content_hash = self._content.hashid else: self._content = None self._content_hash = file.content Hashed.__init__(self) @property def spec(self) -> bytes: return json.dumps([str(self._path), self._content_hash]).encode() @classmethod def from_spec(cls, spec: bytes, resolve: HashResolver) -> 'HashedFile': path, content_hash = json.loads(spec) path = Path(path) fmngr = FileManager.active() if fmngr: return cls(File(path, content_hash)) return cls(File(path, cast(HashedBytes, resolve(content_hash)).value)) @property def value(self) -> File: return File( self._path, self._content.value if self._content else self._content_hash ) @property def label(self) -> str: return f'./{self._path}' @property def components(self) -> Iterable['Hashed[object]']: if self._content: return (self._content,) return () HashedComposite.type_swaps[File] = HashedFile PK!~> > mona/futures.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. 
import logging from enum import IntEnum from typing import Iterable, Set, Callable, List, TypeVar, NoReturn from typing_extensions import Final from .errors import MonaError log = logging.getLogger(__name__) _T = TypeVar('_T') _Fut = TypeVar('_Fut', bound='Future') Callback = Callable[[_T], None] class State(IntEnum): PENDING = 0 READY = 1 RUNNING = 2 ERROR = 3 HAS_RUN = 4 AWAITING = 5 DONE = 6 STATE_COLORS: Final = { State.PENDING: None, State.READY: 'magenta', State.RUNNING: 'yellow', State.ERROR: 'red', State.AWAITING: 'cyan', State.DONE: 'green', } class Future: def __init__(self: _Fut, parents: Iterable[_Fut]) -> None: self._parents = frozenset(parents) self._pending = {fut for fut in self._parents if not fut.done()} self._children: Set['Future'] = set() self._done_callbacks: List[Callback[_Fut]] = [] self._ready_callbacks: List[Callback[_Fut]] = [] self._registered = False self._state: State = State.PENDING if self._pending else State.READY def __getstate__(self) -> NoReturn: raise MonaError('Future objects cannot be pickled') @property def state(self) -> State: return self._state def done(self) -> bool: return self._state is State.DONE def add_child(self, fut: 'Future') -> None: assert not self.done() self._children.add(fut) def register(self: _Fut) -> None: if not self._registered: self._registered = True log.debug(f'registered: {self!r}') for fut in self._pending: fut.register() fut.add_child(self) def add_ready_callback(self: _Fut, callback: Callback[_Fut]) -> None: if self._state >= State.READY: callback(self) else: self._ready_callbacks.append(callback) def add_done_callback(self: _Fut, callback: Callback[_Fut]) -> None: assert not self.done() self._done_callbacks.append(callback) def parent_done(self: _Fut, fut: _Fut) -> None: assert self._state is State.PENDING self._pending.remove(fut) if not self._pending: self._state = State.READY log.debug(f'{self}: ready') for callback in self._ready_callbacks: callback(self) def set_done(self: _Fut) -> None: 
assert State.READY <= self._state < State.DONE self._state = State.DONE log.debug(f'{self}: done') for fut in self._children: fut.parent_done(self) for callback in self._done_callbacks: callback(self) PK!2fH   mona/graph.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import asyncio from enum import Enum from typing import ( TypeVar, Deque, Set, Callable, Iterable, MutableSequence, Dict, Awaitable, Container, Iterator, AsyncGenerator, Tuple, cast, Optional, Any, Union, NamedTuple, ) _T = TypeVar('_T') NodeScheduler = Callable[[_T, Callable[[_T], None]], None] NodeResult = Tuple[_T, Optional[Exception], Iterable[_T]] NodeExecuted = Callable[[NodeResult[_T]], None] NodeExecutor = Callable[[_T, NodeExecuted[_T]], Awaitable[None]] Priority = Tuple['Action', 'Action', 'Action'] def extend_from( src: Iterable[_T], seq: MutableSequence[_T], *, filter: Container[_T] ) -> None: seq.extend(x for x in src if x not in filter) class Action(Enum): RESULTS = 0 EXECUTE = 1 TRAVERSE = 2 def __repr__(self) -> str: return self.name class Step(NamedTuple): action: Action node: Optional[Any] # should be _T progress: Dict[str, int] class NodeException(NamedTuple): node: Any # should be _T exc: Exception default_priority = cast(Priority, tuple(Action)) # only limited override for use in traverse_async() class SetDeque(Deque[_T]): def __init__(self, *args: Any) -> None: super().__init__(*args) self._set: Set[_T] = set() def append(self, x: _T) -> None: if x not in self._set: self._set.add(x) super().append(x) def extend(self, xs: Iterable[_T]) -> None: for x in xs: self.append(x) def pop(self) -> _T: # type: ignore x = super().pop() self._set.remove(x) return x def popleft(self) -> _T: x = super().popleft() self._set.remove(x) return x async def traverse_async( start: Iterable[_T], edges_from: Callable[[_T], Iterable[_T]], schedule: 
NodeScheduler[_T], execute: NodeExecutor[_T], depth: bool = False, priority: Priority = default_priority, ) -> AsyncGenerator[Union[Step, NodeException], bool]: """ Traverse a self-extending DAG, yield steps. :param start: Starting nodes :param edges_from: Returns nodes with incoming edge from the given node :param schedule: Schedule the given node for execution (not run on sentinels) :param execute: Execute the given node and return new generated nodes with incoming edge from it (run only on scheduled nodes) :param sentinel: Should traversal stop at the given node? :param depth: Traverse depth-first if true, breadth-first otherwise :param priority: Priorize steps in order """ visited: Set[_T] = set() to_visit, to_execute = SetDeque[_T](), Deque[_T]() done: 'asyncio.Queue[NodeResult[_T]]' = asyncio.Queue() executing, executed = 0, 0 actionable: Dict[Action, Callable[[], bool]] = { Action.RESULTS: lambda: not done.empty(), Action.EXECUTE: lambda: bool(to_execute), Action.TRAVERSE: lambda: bool(to_visit), } to_visit.extend(start) while True: for action in priority: if actionable[action](): break else: if executing == 0: break action = Action.RESULTS progress = { 'executing': executing - done.qsize(), 'to_execute': len(to_execute), 'to_visit': len(to_visit), 'with_result': done.qsize(), 'done': executed, 'visited': len(visited), } if action is Action.TRAVERSE: node = to_visit.pop() if depth else to_visit.popleft() visited.add(node) if not (yield Step(action, node, progress)): continue schedule(node, to_execute.append) extend_from(edges_from(node), to_visit, filter=visited) elif action is Action.RESULTS: yield Step(action, None, progress) node, exc, nodes = await done.get() if exc: yield NodeException(node, exc) extend_from(nodes, to_visit, filter=visited) executing -= 1 executed += 1 else: assert action is Action.EXECUTE node = to_execute.popleft() if not (yield Step(action, node, progress)): continue executing += 1 try: await execute(node, done.put_nowait) except 
Exception as exc: yield NodeException(node, exc) def traverse( start: Iterable[_T], edges_from: Callable[[_T], Iterable[_T]], sentinel: Callable[[_T], bool] = None, depth: bool = False, ) -> Iterator[_T]: """Traverse a DAG, yield visited notes.""" visited: Set[_T] = set() queue = Deque[_T]() queue.extend(start) while queue: n = queue.pop() if depth else queue.popleft() visited.add(n) yield n if sentinel and sentinel(n): continue queue.extend(m for m in edges_from(n) if m not in visited) def traverse_id( start: Iterable[_T], edges_from: Callable[[_T], Iterable[_T]] ) -> Iterable[_T]: table: Dict[int, _T] = {} def ids_from(ns: Iterable[_T]) -> Iterable[int]: update = {id(n): n for n in ns} table.update(update) return update.keys() for n in traverse(ids_from(start), lambda n: ids_from(edges_from(table[n]))): yield table[n] PK!(%mona/hashing/__init__.pyfrom .hashing import ( Hash, Hashed, Composite, HashedCompositeLike, HashedComposite, HashedBytes, HashResolver, ) from .func import hash_function PK!إbRRmona/hashing/func.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. 
import os import ast import sys import json import inspect from pathlib import Path from textwrap import dedent from itertools import dropwhile, chain from types import ModuleType, CodeType from typing import Callable, TypeVar, Any, Dict, Optional, cast from ..errors import HashingError, CompositeError from ..utils import get_fullname from .hashing import Hash, HashedComposite, hash_text _T = TypeVar('_T') # Travis duplicates some stdlib modules in virtualenv _stdlib_paths = [str(Path(m.__file__).parent) for m in [os, ast]] _cache: Dict[Callable[..., Any], Hash] = {} def is_stdlib(mod: ModuleType) -> bool: return any(mod.__file__.startswith(p) for p in _stdlib_paths) def version_of(mod: ModuleType) -> Optional[str]: parts = mod.__name__.split('.') for n in range(len(parts), 0, -1): mod = sys.modules['.'.join(parts[:n])] try: return cast(str, getattr(mod, '__version__')) except AttributeError: pass return None def hash_function(func: Callable[..., Any]) -> Hash: try: return _cache[func] except KeyError: pass ast_code = ast_code_of(func) hashed_globals = hashed_globals_of(func) spec = json.dumps({'ast_code': ast_code, 'globals': hashed_globals}, sort_keys=True) return _cache.setdefault(func, hash_text(spec)) def ast_code_of(func: Callable[..., Any]) -> str: lines = dedent(inspect.getsource(func)).split('\n') lines = list(dropwhile(lambda l: l[0] == '@', lines)) code = '\n'.join(lines) module = ast.parse(code) assert len(module.body) == 1 assert isinstance(module.body[0], (ast.AsyncFunctionDef, ast.FunctionDef)) for node in ast.walk(module): remove_docstring(node) func_node = module.body[0] func_node.name = '' # clear function's name return ast.dump(func_node, annotate_fields=False) def remove_docstring(node: ast.AST) -> None: classes = ast.AsyncFunctionDef, ast.FunctionDef, ast.ClassDef, ast.Module if not isinstance(node, classes): return if not (node.body and isinstance(node.body[0], ast.Expr)): return docstr = node.body[0].value if isinstance(docstr, ast.Str): 
node.body.pop(0) def hashed_globals_of(func: Callable[..., Any]) -> Dict[str, str]: closure_vars = getclosurevars(func) items = chain(closure_vars.nonlocals.items(), closure_vars.globals.items()) hashed_globals: Dict[str, str] = {} for name, obj in items: if hasattr(obj, '_func_hash'): hashid = ( obj._func_hash() if getattr(obj, 'corofunc', None) is not func else 'self' ) hashed_globals[name] = f'func_hash:{hashid}' continue if inspect.isclass(obj) or inspect.isfunction(obj) or inspect.ismodule(obj): if inspect.ismodule(obj): mod = obj fullname = obj.__name__ else: mod = sys.modules[obj.__module__] fullname = get_fullname(obj) if is_stdlib(mod): hashed_globals[name] = f'{fullname}(stdlib)' continue version = version_of(mod) if version: hashed_globals[name] = f'{fullname}({version})' continue if inspect.isfunction(obj): hashid = hash_function(obj) if obj is not func else 'self' hashed_globals[name] = f'function:{hashid}' continue try: hashid = HashedComposite(*HashedComposite.parse_object(obj)).hashid except CompositeError: pass else: hashed_globals[name] = f'composite:{hashid}' continue raise HashingError(f'In {func} cannot hash global {name} = {obj!r}') return hashed_globals # fixed function from stdlib which parses closures in code consts as well # TODO submit cpython fix def getclosurevars(func: Callable[..., Any]) -> inspect.ClosureVars: code = func.__code__ nonlocal_vars = { name: cell.cell_contents for name, cell in zip(code.co_freevars, func.__closure__ or ()) # type: ignore } global_ns = func.__globals__ # type: ignore builtin_ns = global_ns['__builtins__'] if inspect.ismodule(builtin_ns): builtin_ns = builtin_ns.__dict__ global_vars = {} builtin_vars = {} unbound_names = set() codes = [code] while codes: code = codes.pop() for const in code.co_consts: if isinstance(const, CodeType): codes.append(const) for name in code.co_names: try: global_vars[name] = global_ns[name] except KeyError: try: builtin_vars[name] = builtin_ns[name] except KeyError: 
unbound_names.add(name) return inspect.ClosureVars(nonlocal_vars, global_vars, builtin_vars, unbound_names) PK!))mona/hashing/hashing.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import json import hashlib from abc import ABC, abstractmethod from typing import ( NewType, Union, Generic, TypeVar, Dict, cast, Iterable, Set, Callable, Tuple, Optional, ) from ..json import ClassJSONEncoder, ClassJSONDecoder, JSONValue, validate_json from ..utils import Literal, shorten_text, TypeSwaps, swap_type __version__ = '0.1.0' _T_co = TypeVar('_T_co', covariant=True) Hash = NewType('Hash', str) # symbolic type for a JSON-like container including custom classes Composite = NewType('Composite', object) HashResolver = Callable[[Hash], 'Hashed[object]'] def hash_text(text: Union[str, bytes]) -> Hash: if isinstance(text, str): text = text.encode() return Hash(hashlib.sha1(text).hexdigest()) class Hashed(ABC, Generic[_T_co]): def __init__(self) -> None: assert not hasattr(self, '_hashid') self._hashid = hash_text(self.spec) @property @abstractmethod def spec(self) -> bytes: ... @classmethod @abstractmethod def from_spec(cls, spec: bytes, resolve: HashResolver) -> 'Hashed[_T_co]': ... @property @abstractmethod def label(self) -> str: ... @property @abstractmethod def value(self) -> _T_co: ... @property def components(self) -> Iterable['Hashed[object]']: """To be overwritten by derived classes when the object contains references to other Hashed objects. 
""" return () def metadata(self) -> Optional[bytes]: return None def set_metadata(self, metadata: bytes) -> None: raise NotImplementedError def __str__(self) -> str: return f'{self.tag}: {self.label}' def __repr__(self) -> str: return f'<{self.__class__.__name__} {self}>' @property def hashid(self) -> Hash: return self._hashid @property def tag(self) -> str: return self.hashid[:6] class HashedBytes(Hashed[bytes]): def __init__(self, content: bytes) -> None: self._content = content Hashed.__init__(self) self._label = repr(shorten_text(content, 20)) @property def spec(self) -> bytes: return self.value @classmethod def from_spec(cls, spec: bytes, resolve: HashResolver) -> 'HashedBytes': return cls(spec) @property def label(self) -> str: return self._label @property def value(self) -> bytes: return self._content class HashedCompositeLike(Hashed[Composite]): type_swaps: TypeSwaps = {bytes: HashedBytes} def __init__(self, jsonstr: str, components: Iterable[Hashed[object]]) -> None: self._jsonstr = jsonstr self._components = {comp.hashid: comp for comp in components} Hashed.__init__(self) self._label = repr(self.resolve(lambda hashed: Literal(hashed.label))) @property @abstractmethod def value(self) -> Composite: ... 
@property def spec(self) -> bytes: return json.dumps([self._jsonstr, *sorted(self._components)]).encode() @classmethod def from_spec(cls, spec: bytes, resolve: HashResolver) -> 'HashedCompositeLike': jsonstr, *hashids = json.loads(spec) return cls(jsonstr, (resolve(h) for h in hashids)) @property def label(self) -> str: return self._label @property def components(self) -> Iterable[Hashed[object]]: return self._components.values() def resolve( self, handler: Callable[['Hashed[object]'], object] = lambda x: x ) -> Composite: def hook(type_tag: str, dct: Dict[str, JSONValue]) -> object: if type_tag == 'Hashed': return handler(self._components[cast(Hash, dct['hashid'])]) return dct obj = json.loads(self._jsonstr, hook=hook, cls=ClassJSONDecoder) return cast(Composite, obj) @classmethod def _default(cls, o: object) -> Optional[Tuple[object, str, Dict[str, JSONValue]]]: o = swap_type(o, cls.type_swaps) if isinstance(o, Hashed): return (o, 'Hashed', {'hashid': o.hashid}) return None @classmethod def parse_object(cls, obj: object) -> Tuple[str, Set[Hashed[object]]]: classes = (Hashed,) + tuple(cls.type_swaps) validate_json(obj, lambda x: isinstance(x, classes)) components: Set[Hashed[object]] = set() jsonstr = json.dumps( obj, sort_keys=True, tape=components, default=cls._default, cls=ClassJSONEncoder, ) return jsonstr, components class HashedComposite(HashedCompositeLike): def __init__(self, jsonstr: str, components: Iterable[Hashed[object]]) -> None: HashedCompositeLike.__init__(self, jsonstr, components) self._value = self.resolve(lambda comp: comp.value) @property def value(self) -> Composite: return self._value PK!W,, mona/json.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. 
import json
from pathlib import PosixPath
from typing import (
    Any,
    Set,
    Type,
    Dict,
    Callable,
    cast,
    Tuple,
    Optional,
    NewType,
    Union,
    TypeVar,
    Iterable,
)

from .graph import traverse_id
from .errors import CompositeError

_T = TypeVar('_T')

# Type aliases for the tagged-JSON machinery below.
# JSONContainer should be Union[List[JSONValue], Dict[str, JSONValue]]
JSONContainer = NewType('JSONContainer', object)
JSONValue = Union[None, bool, int, float, str, JSONContainer]
JSONConverter = Callable[[_T], Dict[str, JSONValue]]
JSONAdapter = Callable[[Dict[str, JSONValue]], _T]
JSONDefault = Callable[[_T], Optional[Tuple[Any, str, Dict[str, JSONValue]]]]
JSONHook = Callable[[str, Dict[str, JSONValue]], Union[_T, Dict[str, JSONValue]]]
ClassRegister = Dict[Type[Any], Tuple[JSONConverter[Any], JSONAdapter[Any]]]

# Registry of classes with a custom JSON form: class -> (encoder, decoder).
# Encoded instances are tagged with a '_type' key carrying the class name
# (see ClassJSONEncoder.default / ClassJSONDecoder._my_object_hook).
registered_classes: ClassRegister = {
    PosixPath: (
        lambda p: {'path': str(p)},
        lambda dct: PosixPath(cast(str, dct['path'])),
    )
}


def validate_json(obj: Any, hook: Optional[Callable[[Any], bool]] = None) -> None:
    """Walk *obj* and raise :exc:`CompositeError` if it is not JSON-like.

    Accepted leaves are None, str, int, float, bool, any class in
    ``registered_classes``, and anything the optional *hook* claims
    (hook returns true => treated as a leaf).  Containers may be lists
    and dicts with string keys; anything else raises.

    The walk itself is delegated to ``traverse_id`` (from .graph), which
    presumably de-duplicates by object identity -- this loop is driven
    purely for its validation side effects.  TODO confirm traverse_id
    semantics against its definition.
    """
    classes = tuple(registered_classes)

    def parents(o: Any) -> Iterable[Any]:
        # Return the children of o to keep traversing, or () for a leaf.
        if o is None or isinstance(o, (str, int, float, bool)):
            return ()
        if isinstance(o, classes):
            return ()
        if hook and hook(o):
            return ()
        elif isinstance(o, list):
            return o
        elif isinstance(o, dict):
            for key in o:
                if not isinstance(key, str):
                    raise CompositeError('Dict keys must be strings')
            return o.values()
        else:
            raise CompositeError(f'Unknown object: {o!r}')

    for _ in traverse_id([obj], parents):
        pass


class ClassJSONEncoder(json.JSONEncoder):
    """JSON encoder that tags registered/custom objects with '_type'.

    :param tape: a set into which every object swapped by *default* is
        recorded, so the caller can collect the components it emitted
    :param default: callback returning ``(replacement, type_tag, dct)``
        for objects it handles, or None to fall through
    """

    def __init__(
        self, *args: Any, tape: Set[Any], default: JSONDefault[Any], **kwargs: Any
    ) -> None:
        super().__init__(*args, **kwargs)
        self._default = default
        self._tape = tape
        self._classes = tuple(registered_classes)
        # Precomputed class -> encoder map (decoders are unused here).
        self._default_encs = {
            cls: enc for cls, (enc, dec) in registered_classes.items()
        }

    def default(self, o: Any) -> JSONValue:
        """Encode registered classes first, then consult the injected default."""
        type_tag: Optional[str] = None
        if isinstance(o, self._classes):
            dct = self._default_encs[o.__class__](o)
            type_tag = o.__class__.__name__
        else:
            encoded = self._default(o)
            if encoded is not None:
                o, type_tag, dct = encoded
                # Remember the (possibly swapped) object for the caller.
                self._tape.add(o)
        if type_tag is not None:
            return cast(JSONContainer, {'_type': type_tag, **dct})
        # Unhandled object: let the base class raise TypeError.
        return cast(JSONValue, super().default(o))


class ClassJSONDecoder(json.JSONDecoder):
    """JSON decoder that resolves '_type'-tagged dicts.

    Registered classes are decoded via ``registered_classes``; unknown
    tags are passed to the injected *hook*.
    """

    def __init__(self, *args: Any, hook: JSONHook[Any], **kwargs: Any) -> None:
        assert 'object_hook' not in kwargs
        kwargs['object_hook'] = self._my_object_hook
        super().__init__(*args, **kwargs)
        self._hook = hook
        # Precomputed class-name -> decoder map (encoders are unused here).
        self._default_decs = {
            cls.__name__: dec for cls, (enc, dec) in registered_classes.items()
        }

    def _my_object_hook(self, dct: Dict[str, JSONValue]) -> Any:
        """Turn a tagged dict back into an object; plain dicts pass through."""
        try:
            type_tag = dct.pop('_type')
            assert isinstance(type_tag, str)
        except KeyError:
            return dct
        if type_tag in self._default_decs:
            return self._default_decs[type_tag](dct)
        return self._hook(type_tag, dct)

# --- archive residue: PK local header of mona/pluggable.py follows ---
# PK!o  mona/pluggable.py
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import logging
from typing import Dict, Any, TypeVar, cast, List, Generator, Awaitable, Generic

log = logging.getLogger(__name__)

_T = TypeVar('_T')
_P = TypeVar('_P', bound='Pluggable')


class Plugin(Generic[_P]):
    """Base class for plugins that attach themselves to a :class:`Pluggable`.

    Calling a plugin instance with a pluggable registers the plugin on it
    under the plugin's name (the ``name`` class attribute, falling back to
    the class name).
    """

    name: str

    def __call__(self, pluggable: _P) -> None:
        pluggable.register_plugin(self._name, self)

    @property
    def _name(self) -> str:
        return cast(str, getattr(self, 'name', self.__class__.__name__))


class Pluggable:
    """Mixin that holds named plugins and chains calls through them.

    ``run_plugins``/``run_plugins_async`` call the method *func* on each
    registered plugin in registration order (or reversed), threading an
    accumulator *start* through the chain: each plugin receives the previous
    plugin's return value as its first argument (unless it is None) and the
    final value is returned.
    """

    def __init__(self: _P) -> None:
        # Insertion-ordered registry: plugin name -> plugin instance.
        self._plugins: Dict[str, Plugin[_P]] = {}

    def register_plugin(self: _P, name: str, plugin: Plugin[_P]) -> None:
        """Register *plugin* under *name* (replacing any previous one)."""
        self._plugins[name] = plugin

    def _get_plugins(self: _P, reverse: bool = False) -> List[Plugin[_P]]:
        """Return the plugins in registration order, optionally reversed."""
        plugins = list(self._plugins.values())
        if reverse:
            plugins.reverse()
        return plugins

    def _run_plugins(
        self, func: str, start: Any, *args: Any, reverse: bool = False, **kwargs: Any
    ) -> Generator[Any, Any, None]:
        """Yield each plugin's ``func`` result; the caller sends back the
        (possibly awaited) value to become the next plugin's input.
        """
        # BUG FIX: 'reverse' was accepted but never forwarded, so
        # run_plugins(..., reverse=True) silently ran in forward order.
        for plugin in self._get_plugins(reverse):
            all_args = args if start is None else (start, *args)
            try:
                start = yield getattr(plugin, func)(*all_args, **kwargs)
            except Exception:
                log.error(f'Error in plugin {plugin._name!r}')
                raise

    async def run_plugins_async(
        self, func: str, *args: Any, start: _T, reverse: bool = False, **kwargs: Any
    ) -> _T:
        """Async variant of :meth:`run_plugins`: each plugin result is awaited."""
        gen = self._run_plugins(func, start, *args, reverse=reverse, **kwargs)
        try:
            start = await cast(Awaitable[_T], next(gen))
            while True:
                start = await cast(Awaitable[_T], gen.send(start))
        except StopIteration:
            return start

    def run_plugins(
        self, func: str, *args: Any, start: _T, reverse: bool = False, **kwargs: Any
    ) -> _T:
        """Thread *start* through every plugin's *func* and return the result."""
        gen = self._run_plugins(func, start, *args, reverse=reverse, **kwargs)
        try:
            start = cast(_T, next(gen))
            while True:
                start = cast(_T, gen.send(start))
        except StopIteration:
            return start

# --- archive residue: mona/plugins/__init__.py (separate zip member) ---
# from .parallel import Parallel
# from .cache import Cache
# from .files import FileManager
# from .tmpdir import TmpdirManager
# --- archive residue: PK local header of mona/plugins/cache.py follows ---
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0.
If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import logging import sqlite3 import pickle from enum import Enum from weakref import WeakValueDictionary from typing import ( Any, Optional, NamedTuple, Union, Sequence, Dict, cast, Type, Tuple, TypeVar, List, ) from ..sessions import Session, SessionPlugin from ..futures import State, Future from ..tasks import Task from ..utils import Pathable, get_timestamp, get_fullname, import_fullname from ..hashing import Hash, Hashed log = logging.getLogger(__name__) _T_co = TypeVar('_T_co', covariant=True) WeakDict = WeakValueDictionary class TaskRow(NamedTuple): hashid: Hash state: str side_effects: Optional[str] = None result_type: Optional[str] = None result: Union[Hash, bytes, None] = None class ObjectRow(NamedTuple): hashid: Hash typetag: str spec: bytes class ResultType(Enum): HASHED = 0 PICKLED = 1 class SessionRow(NamedTuple): sessionid: int created: str class TargetRow(NamedTuple): objectid: Hash sessionid: int label: Optional[str] metadata: Optional[bytes] class CachedTask(Task[_T_co]): def __init__(self, hashid: Hash) -> None: self._hashid = hashid self._args = () Future.__init__(self, []) # type: ignore class WriteAccess(Enum): EAGER = 0 ON_EXIT = 1 NEVER = 2 class Cache(SessionPlugin): name = 'db_cache' def __init__( self, db: sqlite3.Connection, write: str = 'eager', full_restore: bool = False ) -> None: self._db = db self._objects: Dict[Hash, Hashed[object]] = {} self._object_cache: WeakDict[Hash, Hashed[object]] = WeakDict() self._write = WriteAccess[write.upper()] self._full_restore = full_restore def __repr__(self) -> str: return f'' @property def db(self) -> sqlite3.Connection: return self._db def _store_objects(self, objs: Sequence[Hashed[object]]) -> None: obj_rows = [ ObjectRow(obj.hashid, get_fullname(obj.__class__), obj.spec) for obj in objs ] self._db.executemany('INSERT OR IGNORE INTO objects VALUES (?,?,?)', obj_rows) def 
_store_targets(self, objs: Sequence[Hashed[object]]) -> None: sessionid = Session.active().storage['cache:sessionid'] target_rows = [ TargetRow( obj.hashid, sessionid, obj.label if isinstance(obj, Task) else None, obj.metadata(), ) for obj in objs ] self._db.executemany( 'INSERT OR IGNORE INTO targets VALUES (?,?,?,?)', target_rows ) def _update_state(self, task: Task[object]) -> None: self._db.execute( 'UPDATE tasks SET state = ? WHERE hashid = ?', (task.state.name, task.hashid), ) def update_state(self, task: Task[object]) -> None: self._update_state(task) self._db.commit() def _store_result(self, task: Task[object]) -> None: result: Union[Hash, bytes] hashed: Hashed[object] if task.state is State.AWAITING: hashed = task.future_result() result_type = ResultType.HASHED else: assert task.state is State.DONE hashed_or_obj = task.resolve() if isinstance(hashed_or_obj, Hashed): result_type = ResultType.HASHED hashed = hashed_or_obj else: result_type = ResultType.PICKLED result = pickle.dumps(hashed_or_obj) if result_type is ResultType.HASHED: result = hashed.hashid side_effects = ','.join( t.hashid for t in Session.active().get_side_effects(task) ) self._db.execute( 'REPLACE INTO tasks VALUES (?,?,?,?,?)', TaskRow( task.hashid, task.state.name, side_effects, result_type.name, result ), ) def _get_task_row(self, hashid: Hash) -> Optional[TaskRow]: raw_row = self._db.execute( 'SELECT * FROM tasks WHERE hashid = ?', (hashid,) ).fetchone() if not raw_row: return None return TaskRow(*raw_row) def _get_target_row(self, hashid: Hash) -> TargetRow: raw_row = self._db.execute( 'SELECT * FROM targets WHERE objectid = ? 
ORDER BY sessionid DESC LIMIT 1', (hashid,), ).fetchone() assert raw_row return TargetRow(*raw_row) def _get_object_factory(self, hashid: Hash) -> Tuple[bytes, Type[Hashed[object]]]: raw_row = self._db.execute( 'SELECT * FROM objects WHERE hashid = ?', (hashid,) ).fetchone() assert raw_row row = ObjectRow(*raw_row) factory = import_fullname(row.typetag) assert issubclass(factory, Hashed) return row.spec, factory def _get_object(self, hashid: Hash) -> Hashed[object]: obj: Optional[Hashed[object]] = self._object_cache.get(hashid) if obj: return obj spec, factory = self._get_object_factory(hashid) if factory is Task and not self._full_restore: task_row = self._get_task_row(hashid) assert task_row if State[task_row.state] > State.HAS_RUN: obj = CachedTask(hashid) if not obj: obj = factory.from_spec(spec, self._get_object) assert hashid == obj.hashid metadata = self._get_target_row(hashid).metadata if metadata is not None: obj.set_metadata(metadata) if isinstance(obj, Task): obj, registered = Session.active().register_task(obj) if registered: if not self._full_restore: self._to_restore.append(obj) self._object_cache[hashid] = obj return obj def _get_result(self, row: TaskRow) -> object: if State[row.state] < State.HAS_RUN: return None assert row.result_type result_type = ResultType[row.result_type] if result_type is ResultType.PICKLED: assert isinstance(row.result, bytes) result: object = pickle.loads(row.result) else: assert result_type is ResultType.HASHED assert isinstance(row.result, str) result = self._get_object(row.result) return result def _restore_task(self, task: Task[object]) -> None: row = self._get_task_row(task.hashid) assert row if State[row.state] < State.HAS_RUN: return log.debug(f'Restoring from cache: {task}') assert State[row.state] > State.HAS_RUN task.set_running() sess = Session.active() if self._full_restore and row.side_effects: side_effects: List[Task[object]] = [] for hashid in row.side_effects.split(','): child_task = 
self._get_object(cast(Hash, hashid)) assert isinstance(child_task, Task) sess.add_side_effect_of(task, child_task) side_effects.append(child_task) self._to_restore.extend(reversed(side_effects)) task.set_has_run() sess.set_result(task, self._get_result(row)) def save_hashed(self, objs: Sequence[Hashed[object]]) -> None: if self._write is WriteAccess.EAGER: self._store_objects(objs) self._store_targets(objs) self._db.commit() else: self._objects.update({o.hashid: o for o in objs}) def _store_session(self, sess: Session) -> None: cur = self._db.execute( 'INSERT INTO sessions VALUES (?,?)', (None, get_timestamp()) ) sess.storage['cache:sessionid'] = cur.lastrowid def post_enter(self, sess: Session) -> None: if self._write is WriteAccess.EAGER: self._store_session(sess) def post_create(self, task: Task[object]) -> None: row = self._get_task_row(task.hashid) if row: self._to_restore = [task] tasks: List[Task[object]] = [] while self._to_restore: t = self._to_restore.pop() self._restore_task(t) tasks.append(t) delattr(self, '_to_restore') elif self._write is WriteAccess.EAGER: tasks = [task] self._db.execute( 'INSERT INTO tasks VALUES (?,?,?,?,?)', TaskRow(task.hashid, task.state.name), ) self._store_objects(tasks) if self._write is WriteAccess.EAGER: self._store_targets(tasks) self._db.commit() def post_task_run(self, task: Task[object]) -> None: if self._write is not WriteAccess.EAGER: return self._store_result(task) if task.state < State.DONE: task.add_done_callback(lambda task: self.update_state(task)) self._db.commit() def pre_exit(self, sess: Session) -> None: if self._write is not WriteAccess.ON_EXIT: return self._store_session(sess) for task in sess.all_tasks(): if task.state > State.HAS_RUN: self._store_result(task) else: self._update_state(task) objects = [*self._objects.values(), *sess.all_tasks()] self._store_objects(objects) self._store_targets(objects) self._objects.clear() self._db.commit() @classmethod def from_path(cls, path: Pathable, **kwargs: Any) -> 
'Cache': db = sqlite3.connect(path) db.execute( """\ CREATE TABLE IF NOT EXISTS objects ( hashid TEXT PRIMARY KEY, typetag TEXT, spec BLOB ) """ ) db.execute( """\ CREATE TABLE IF NOT EXISTS tasks ( hashid TEXT PRIMARY KEY, state TEXT, side_effects TEXT, result_type TEXT, result BLOB, FOREIGN KEY (hashid) REFERENCES objects(hashid) ) """ ) db.execute( """\ CREATE TABLE IF NOT EXISTS sessions ( sessionid INTEGER PRIMARY KEY, created TEXT ) """ ) db.execute( """\ CREATE TABLE IF NOT EXISTS targets ( objectid TEXT, sessionid INTEGER, label TEXT, metadata BLOB, PRIMARY KEY (objectid, sessionid), FOREIGN KEY (objectid) REFERENCES objects(hashid), FOREIGN KEY (sessionid) REFERENCES sessions(sessionid) ) """ ) return Cache(db, **kwargs) PK! >>mona/plugins/files.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import shutil import hashlib from pathlib import Path from ..hashing import Hash from ..sessions import Session, SessionPlugin from ..utils import make_nonwritable, make_writable, Pathable from ..errors import FilesError from ..files import FileManager as _FileManager from typing import Dict, Union __version__ = '0.2.0' class FileManager(_FileManager, SessionPlugin): name = 'file_manager' def __init__(self, root: Union[str, Pathable], eager: bool = True) -> None: self._root = Path(root).resolve() self._cache: Dict[Hash, bytes] = {} self._path_cache: Dict[Path, Hash] = {} self._eager = eager def __repr__(self) -> str: return f'' def _path(self, hashid: Hash, must_exist: bool = False) -> Path: path = self._root / hashid[:2] / hashid[2:] if must_exist and not path.exists(): raise FilesError(f'Missing in manager: {hashid}') return path def _path_primed(self, hashid: Hash) -> Path: path = self._path(hashid) path.parent.mkdir(exist_ok=True) return path def __contains__(self, hashid: Hash) -> bool: return hashid in 
self._cache or self._path(hashid).is_file() def post_enter(self, sess: Session) -> None: sess.storage['file_manager'] = self def _store_bytes(self, hashid: Hash, content: bytes) -> None: stored_path = self._path_primed(hashid) stored_path.write_bytes(content) make_nonwritable(stored_path) def store_bytes(self, content: bytes) -> 'Hash': hashid = Hash(hashlib.sha1(content).hexdigest()) if hashid not in self: self._cache[hashid] = content if self._eager: self._store_bytes(hashid, content) return hashid def _store_path(self, hashid: Hash, path: Path, keep: bool) -> None: stored_path = self._path_primed(hashid) if keep: shutil.copy(path, stored_path) else: path.rename(stored_path) make_nonwritable(stored_path) def store_path(self, path: Path, *, keep: bool) -> 'Hash': hashid = self._path_cache.get(path) if hashid: return hashid sha1 = hashlib.sha1() with path.open('rb') as f: while True: data = f.read(2 ** 20) if not data: break sha1.update(data) hashid = Hash(sha1.hexdigest()) if hashid not in self: # TODO this is not good with large files self._cache[hashid] = path.read_bytes() if self._eager: self._store_path(hashid, path, keep) return self._path_cache.setdefault(path, hashid) def get_bytes(self, hashid: Hash) -> bytes: try: return self._cache[hashid] except KeyError: pass path = self._path(hashid, must_exist=True) return self._cache.setdefault(hashid, path.read_bytes()) def target_in(self, path: Path, hashid: Hash, *, mutable: bool) -> None: content = self._cache.get(hashid) if content: path.write_bytes(content) if not mutable: make_nonwritable(path) return stored_path = self._path(hashid, must_exist=True) if mutable: shutil.copy(stored_path, path) make_writable(path) else: if path.exists(): path.unlink() path.symlink_to(stored_path) def store_cache(self) -> None: for hashid, content in self._cache.items(): self._store_bytes(hashid, content) PK!h*mona/plugins/parallel.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. 
If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import os import asyncio import logging from contextlib import asynccontextmanager from typing import Any, TypeVar, AsyncGenerator, Optional, Set, cast from ..graph import NodeExecuted from ..tasks import Task, Corofunc from ..sessions import Session, SessionPlugin, TaskExecute log = logging.getLogger(__name__) _T = TypeVar('_T') class Parallel(SessionPlugin): name = 'parallel' def __init__(self, ncores: int = None) -> None: self._ncores = ncores or os.cpu_count() or 1 self._available = self._ncores self._asyncio_tasks: Set[asyncio.Task[Any]] = set() self._pending: Optional[int] = None self._registered_exceptions = 0 def post_enter(self, sess: Session) -> None: sess.storage['scheduler'] = self.run_coro async def pre_run(self) -> None: self._sem = asyncio.BoundedSemaphore(self._ncores) self._lock = asyncio.Lock() async def post_run(self) -> None: if not self._asyncio_tasks: return log.info(f'Cancelling {len(self._asyncio_tasks)} running tasks...') for task in self._asyncio_tasks: task.cancel() await asyncio.gather(*self._asyncio_tasks) assert not self._asyncio_tasks log.info('All tasks cancelled') def _release(self, ncores: int) -> None: if self._pending is None: for _ in range(ncores): self._sem.release() self._available += ncores else: self._pending += ncores def _stop(self) -> None: assert self._pending is None self._pending = 0 log.info(f'Stopping scheduler') def ignored_exception(self) -> None: if self._registered_exceptions == 0: return self._registered_exceptions -= 1 if self._registered_exceptions > 0: return assert self._pending is not None log.info(f'Resuming scheduler with {self._pending} cores') pending = self._pending self._pending = None self._release(pending) def wrap_execute(self, execute: TaskExecute) -> TaskExecute: async def _execute(task: Task[Any], done: NodeExecuted[Task[Any]]) -> None: try: await execute(task, done) except Exception as 
e: if not isinstance(e, asyncio.CancelledError): done((task, e, ())) asyncio_task = asyncio.current_task() assert asyncio_task self._asyncio_tasks.remove(asyncio_task) async def spawn_execute(*args: Any) -> None: asyncio_task = asyncio.create_task(_execute(*args)) self._asyncio_tasks.add(asyncio_task) return spawn_execute @asynccontextmanager async def _acquire(self, ncores: int) -> AsyncGenerator[None, None]: async with self._lock: for _ in range(ncores): await self._sem.acquire() self._available -= 1 try: yield except Exception: if self._registered_exceptions == 0: self._stop() self._registered_exceptions += 1 raise finally: self._release(ncores) async def run_coro(self, corofunc: Corofunc[_T], *args: Any, **kwargs: Any) -> _T: task = Session.active().running_task n = cast(int, task.storage.get('ncores', 1)) if n > self._available: log.debug( f'Waiting for {n-self._available}/{n} ' f'unavailable cores for "{task}"' ) waited = True else: waited = False async with self._acquire(n): if waited: log.debug(f'All {n} cores available for "{task}", resuming') return await corofunc(*args, **kwargs) PK!NCmona/plugins/tmpdir.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. 
import logging
import shutil
from tempfile import mkdtemp
from pathlib import Path
from contextlib import contextmanager
from typing import Iterator

from ..sessions import Session, SessionPlugin
from ..utils import Pathable
from ..rules.dirtask import TmpdirManager as _TmpdirManager

log = logging.getLogger(__name__)


class TmpdirManager(_TmpdirManager, SessionPlugin):
    """Session plugin that hands out temporary directories under a fixed root.

    Registers itself in the session storage under
    'dir_task:tmpdir_manager', where dir tasks look it up.
    """

    name = 'tmpdir_manager'

    def __init__(self, root: Pathable) -> None:
        self._root = Path(root).resolve()

    def post_enter(self, sess: Session) -> None:
        sess.storage['dir_task:tmpdir_manager'] = self

    @contextmanager
    def tempdir(self) -> Iterator[str]:
        """Yield a fresh temporary directory path for the running task.

        The directory name is prefixed with the task's short hash.  It is
        removed only on a clean exit; on an exception it is left on disk --
        presumably for post-mortem inspection (TODO confirm intent).
        """
        task = Session.active().running_task
        path = mkdtemp(prefix=f'{task.hashid[:6]}_', dir=str(self._root))
        log.debug(f'Created tempdir for "{task.label}": {path}')
        try:
            yield path
        except Exception:
            # Deliberately skip cleanup so the directory survives failure.
            raise
        else:
            shutil.rmtree(path)

# --- archive residue: mona/rules/__init__.py (separate zip member) ---
from .rules import Rule
from .dirtask import dir_task, DirtaskTmpdir
# --- archive residue: PK local header of mona/rules/dirtask.py follows ---
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import logging
import subprocess
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import (
    Dict,
    ContextManager,
    Any,
    Optional,
    Callable,
    List,
    Union,
    Tuple,
    Sequence,
)
from typing_extensions import Protocol, runtime

from ..utils import make_executable, Pathable
from ..sessions import Session
from ..rules import Rule
from ..runners import run_process
from ..errors import InvalidInput
from ..files import File

__version__ = '0.2.0'

log = logging.getLogger(__name__)

# A dir-task input is either a managed File or a (path, symlink-target) pair.
DirtaskInput = Union[File, Tuple[Path, str]]


@runtime
class TmpdirManager(Protocol):
    """Structural interface for objects that can provide temp directories."""

    def tempdir(self) -> ContextManager[Pathable]:
        ...
class DirTaskProcessError(subprocess.CalledProcessError):
    """CalledProcessError enriched with the captured STDOUT/STDERR contents."""

    def __init__(self, stdout: bytes, stderr: bytes, *args: Any) -> None:
        super().__init__(*args)
        self.stdout = stdout
        self.stderr = stderr

    def __str__(self) -> str:
        return '\n'.join(
            [
                'STDOUT:',
                self.stdout.decode(),
                '',
                'STDERR:',
                self.stderr.decode(),
                '',
                super().__str__(),
            ]
        )


class DirtaskTmpdir:
    """
    Context manager of a temporary directory that collects created files.

    :param output_filter: true for files to be collected
    """

    def __init__(self, output_filter: Callable[[str], bool] = None) -> None:
        sess = Session.active()
        dirmngr = sess.storage.get('dir_task:tmpdir_manager')
        assert not dirmngr or isinstance(dirmngr, TmpdirManager)
        self._dirmngr = dirmngr
        self._output_filter = output_filter

    def has_tmpdir_manager(self) -> bool:
        """True when the session provides a tmpdir manager (kept on failure)."""
        return self._dirmngr is not None

    def __enter__(self) -> Path:
        # Fall back to a plain TemporaryDirectory when no manager is present.
        self._ctx = (
            TemporaryDirectory() if not self._dirmngr else self._dirmngr.tempdir()
        )
        self._tmpdir = Path(self._ctx.__enter__())
        return self._tmpdir

    def __exit__(self, exc_type: Any, *args: Any) -> None:
        try:
            if not exc_type:
                # On clean exit, harvest every regular file that passes the
                # filter, keyed by its path relative to the tempdir.
                self._outputs: Dict[str, File] = {}
                for path in self._tmpdir.glob('**/*'):
                    if not path.is_file():
                        continue
                    relpath = str(path.relative_to(self._tmpdir))
                    if self._output_filter and not self._output_filter(relpath):
                        continue
                    file = File.from_path(path, self._tmpdir, keep=False)
                    self._outputs[relpath] = file
        finally:
            self._ctx.__exit__(exc_type, *args)

    def result(self) -> Dict[str, File]:
        """
        The collection of files created in the temporary directory.

        This is available only after leaving the context.
        """
        return self._outputs


def checkout_files(
    root: Path,
    exe: Optional[File],
    files: Sequence[DirtaskInput],
    mutable: bool = False,
) -> None:
    """Materialize *files* (and *exe*, made executable) under *root*.

    File inputs are checked out via File.target_in(); (path, target) pairs
    become symlinks to *target*.
    """
    assert root.exists()
    if exe:
        files = [exe, *files]
    for file in files:
        if isinstance(file, File):
            path = file.path
        else:
            path, target = file
        (root / path.parent).mkdir(parents=True, exist_ok=True)
        if isinstance(file, File):
            file.target_in(root, mutable=mutable)
        else:
            (root / path).symlink_to(target)
    if exe:
        make_executable(root / exe.path)


@Rule
async def dir_task(exe: File, inputs: List[DirtaskInput]) -> Dict[str, File]:
    """
    Task rule with an executable and a collection of files as inputs and
    a collection of output files as output.
    """
    for file in [exe, *inputs]:
        # BUG FIX: the declared input type is Tuple[Path, str], but the
        # check accepted only lists, rejecting annotated tuple inputs.
        # checkout_files() handles both, so accept both here.
        if not (
            isinstance(file, File)
            or isinstance(file, (list, tuple))
            and len(file) == 2
            and isinstance(file[0], Path)
            and isinstance(file[1], str)
        ):
            raise InvalidInput(str(file))
    # Names of the inputs, excluded from the collected outputs.
    input_names = {
        str(inp if isinstance(inp, File) else inp[0]) for inp in [exe, *inputs]
    }
    dirtask_tmpdir = DirtaskTmpdir(lambda p: p not in input_names)
    with dirtask_tmpdir as tmpdir:
        checkout_files(tmpdir, exe, inputs)
        out_path, err_path = tmpdir / 'STDOUT', tmpdir / 'STDERR'
        try:
            with out_path.open('w') as stdout, err_path.open('w') as stderr:
                await run_process(
                    str(tmpdir / exe.path), stdout=stdout, stderr=stderr, cwd=tmpdir
                )
        except subprocess.CalledProcessError as e:
            # With a tmpdir manager the directory survives for inspection,
            # so re-raise as-is; otherwise wrap with the captured output
            # before the tempdir disappears.
            if dirtask_tmpdir.has_tmpdir_manager():
                raise
            exc: Optional[subprocess.CalledProcessError] = e
        else:
            exc = None
        if exc:
            out = out_path.read_bytes()
            err = err_path.read_bytes()
            raise DirTaskProcessError(out, err, exc.returncode, exc.cmd)
    return dirtask_tmpdir.result()

# --- archive residue: PK local header of mona/rules/rules.py follows ---
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import inspect
from functools import wraps
from typing import Any, TypeVar, Generic, List, Callable

from ..tasks import Task, Corofunc
from ..sessions import Session
from ..errors import MonaError
from ..hashing import Hashed, hash_function

_T = TypeVar('_T')
# A factory producing one extra hashed argument appended to every task call.
ArgFactory = Callable[[], Hashed[object]]


class Rule(Generic[_T]):
    """Decorator that turns a coroutine function into a rule, which is
    a callable that generates a task instead of actually calling the
    coroutine.

    :param corofunc: a coroutine function
    """

    def __init__(self, corofunc: Corofunc[_T]) -> None:
        if not inspect.iscoroutinefunction(corofunc):
            raise MonaError(f'Task function is not a coroutine: {corofunc}')
        self._corofunc = corofunc
        self._extra_arg_factories: List[ArgFactory] = []
        wraps(corofunc)(self)

    def _ensure_extra_args(self) -> None:
        # Materialize the extra arguments and the combined hash exactly once;
        # the presence of '_hash' marks the rule as finalized.
        if hasattr(self, '_hash'):
            return
        self._extra_args = [make() for make in self._extra_arg_factories]
        parts = [hash_function(self._corofunc)]
        parts.extend(extra.hashid for extra in self._extra_args)
        self._hash = ','.join(parts)

    def _func_hash(self) -> str:
        """Hash of the coroutine combined with all extra-argument hashes."""
        self._ensure_extra_args()
        return self._hash

    def __call__(self, *args: Any, **kwargs: Any) -> Task[_T]:
        """Create a task in the active session instead of running the coroutine."""
        self._ensure_extra_args()
        assert 'rule' not in kwargs
        kwargs['rule'] = self._corofunc.__name__
        session = Session.active()
        return session.create_task(
            self._corofunc, *args, *self._extra_args, **kwargs
        )

    def add_extra_arg(self, factory: ArgFactory) -> None:
        """Register *factory*; must happen before the rule is first called."""
        assert not hasattr(self, '_extra_args')
        self._extra_arg_factories.append(factory)

    @property
    def corofunc(self) -> Corofunc[_T]:
        """The wrapped coroutine function."""
        return self._corofunc

# --- archive residue: PK local header of mona/runners.py follows ---
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import logging
import asyncio
import subprocess
from typing import Any, TypeVar, Callable, Optional, Tuple, Union
from typing_extensions import Protocol, runtime

from .tasks import Corofunc
from .sessions import Session

log = logging.getLogger(__name__)

_T = TypeVar('_T')
# A subprocess yields either stdout, or (stdout, stderr) when stderr is piped.
ProcessOutput = Union[bytes, Tuple[bytes, bytes]]

__version__ = '0.1.0'


@runtime
class Scheduler(Protocol):
    """Structural interface of the session-provided coroutine scheduler."""

    async def __call__(self, corofunc: Corofunc[_T], *args: Any, **kwargs: Any) -> _T:
        ...


def _scheduler() -> Optional[Scheduler]:
    """Return the active session's scheduler, or None when none is installed."""
    scheduler = Session.active().storage.get('scheduler')
    if scheduler is None:
        return None
    assert isinstance(scheduler, Scheduler)
    return scheduler


async def run_shell(cmd: str, **kwargs: Any) -> ProcessOutput:
    """Execute a command in a shell.

    Wrapper around :func:`asyncio.create_subprocess_shell` that handles
    errors and whose behavior can be modified by session plugins.

    :param cmd: a shell command to be executed
    :param kwargs: all keyword arguments are passed to
        :func:`asyncio.create_subprocess_shell`. `PIPE` is passed to `stdin`
        and `stdout` keyword arguments by default.

    Return the standard output as bytes if no error output was generated or a
    tuple of bytes containing standard and error outputs.
    """
    scheduler = _scheduler()
    assert 'shell' not in kwargs
    kwargs['shell'] = True
    if scheduler:
        return await scheduler(_run_process, cmd, **kwargs)
    return await _run_process(cmd, **kwargs)


async def run_process(*args: str, **kwargs: Any) -> ProcessOutput:
    """Create a subprocess.

    Wrapper around :func:`asyncio.create_subprocess_exec` that handles errors
    and whose behavior can be modified by session plugins.

    :param args: arguments of the subprocess
    :param kwargs: all keyword arguments are passed to
        :func:`asyncio.create_subprocess_exec`. `PIPE` is passed to `stdin`
        and `stdout` keyword arguments by default.

    Return the standard output as bytes if no error output was generated or a
    tuple of bytes containing standard and error outputs.
    """
    scheduler = _scheduler()
    if scheduler:
        return await scheduler(_run_process, args, **kwargs)
    return await _run_process(args, **kwargs)


async def _run_process(
    args: Union[str, Tuple[str, ...]],
    shell: bool = False,
    input: Optional[bytes] = None,
    **kwargs: Any,
) -> Union[bytes, Tuple[bytes, bytes]]:
    """Spawn the process, feed *input*, and return its captured output.

    Raises :exc:`subprocess.CalledProcessError` on a nonzero exit code.
    On cancellation the child is terminated and awaited before re-raising.
    """
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', subprocess.PIPE)
    if shell:
        assert isinstance(args, str)
        proc = await asyncio.create_subprocess_shell(args, **kwargs)
    else:
        assert isinstance(args, tuple)
        proc = await asyncio.create_subprocess_exec(*args, **kwargs)
    try:
        stdout, stderr = await proc.communicate(input)
    except asyncio.CancelledError:
        try:
            proc.terminate()
        except ProcessLookupError:
            # Process already gone; nothing to reap.
            pass
        else:
            await proc.wait()
        raise
    if proc.returncode:
        log.error(f'Got nonzero exit code in {args!r}')
        # IMPROVEMENT: attach the captured output to the exception instead
        # of discarding it -- same exception type, so existing handlers
        # (which read returncode/cmd) are unaffected.
        raise subprocess.CalledProcessError(
            proc.returncode, args, output=stdout, stderr=stderr
        )
    if stderr is None:
        return stdout
    return stdout, stderr


async def run_thread(func: Callable[..., _T], *args: Any) -> _T:
    """Run a callable in a new thread.

    Wrapper around :meth:`loop.run_in_executor` whose behavior can be
    modified by session plugins.

    :param func: a callable
    :param args: positional arguments to the callable.

    Return the result of the callable.
    """
    scheduler = _scheduler()
    if scheduler:
        return await scheduler(_run_thread, func, *args)
    return await _run_thread(func, *args)


async def _run_thread(func: Callable[..., _T], *args: Any) -> _T:
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func, *args)

# --- archive residue: mona/sci/__init__.py (empty member) ---
# --- archive residue: mona/sci/aims/__init__.py (separate zip member) ---
# from .aims import Aims, SpeciesDefaults
# from .parse import parse_aims
# --- archive residue: PK local header of mona/sci/aims/aims.py follows ---
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import shutil from copy import deepcopy from pathlib import Path from collections import OrderedDict from typing import Dict, Any, Tuple, Callable, cast from ...rules.dirtask import dir_task from ...files import File from ...tasks import Task from ...errors import MonaError, InvalidInput from ...pluggable import Plugin, Pluggable from ...hashing import hash_function from ..geomlib import Molecule, Atom from .dsl import parse_aims_input, expand_dicts __version__ = '0.1.0' class AimsPlugin(Plugin['Aims']): def process(self, kwargs: Dict[str, Any]) -> None: pass def _func_hash(self) -> str: return hash_function(self.process) class Aims(Pluggable): """Instances of this class are task factories that create directory tasks that represent calculations with FHI-aims. """ def __init__(self) -> None: Pluggable.__init__(self) for factory in default_plugins: factory()(self) def __call__(self, *, label: str = None, **kwargs: Any) -> Task[Dict[str, File]]: """Create an FHI-aims. :param kwargs: processed by individual plugins """ self.run_plugins('process', kwargs, start=None) script = File.from_str('aims.sh', kwargs.pop('script')) inputs = [File.from_str(name, cont) for name, cont in kwargs.pop('inputs')] if kwargs: raise InvalidInput(f'Unknown Aims kwargs: {list(kwargs.keys())}') return dir_task(script, inputs, label=label) def _func_hash(self) -> str: return ','.join( [ hash_function(Aims.__call__), *(cast(AimsPlugin, p)._func_hash() for p in self._get_plugins()), ] ) class SpeciesDir(AimsPlugin): def __init__(self) -> None: self._speciesdirs: Dict[Tuple[str, str], Path] = {} def process(self, kwargs: Dict[str, Any]) -> None: sp_def_key = aims, sp_def = kwargs['aims'], kwargs.pop('species_defaults') speciesdir = self._speciesdirs.get(sp_def_key) if not speciesdir: pathname = shutil.which(aims) if not pathname: pathname = shutil.which('aims-master') if not pathname: raise MonaError(f'Aims "{aims}" not found') path = Path(pathname) speciesdir = path.parents[1] / 
'aimsfiles/species_defaults' / sp_def self._speciesdirs[sp_def_key] = speciesdir # type: ignore kwargs['speciesdir'] = speciesdir class Atoms(AimsPlugin): def process(self, kwargs: Dict[str, Any]) -> None: if 'atoms' in kwargs: kwargs['geom'] = Molecule([Atom(*args) for args in kwargs.pop('atoms')]) class SpeciesDefaults(AimsPlugin): """Aims plugin that handles adding species defaults to control.in. """ def __init__(self, mod: Callable[..., Any] = None) -> None: self._species_defs: Dict[Tuple[Path, str], Dict[str, Any]] = {} self._mod = mod def process(self, kwargs: Dict[str, Any]) -> None: speciesdir = kwargs.pop('speciesdir') all_species = {(a.number, a.species) for a in kwargs['geom'].centers} species_defs = [] for Z, species in sorted(all_species): if (speciesdir, species) not in self._species_defs: species_def = parse_aims_input( (speciesdir / f'{Z:02d}_{species}_default').read_text() )['species'][0] self._species_defs[speciesdir, species] = species_def else: species_def = self._species_defs[speciesdir, species] species_defs.append(species_def) if self._mod: species_defs = deepcopy(species_defs) self._mod(species_defs, kwargs) kwargs['species_defs'] = species_defs def _func_hash(self) -> str: if not self._mod: return super()._func_hash() funcs = self.process, self._mod return ','.join(hash_function(f) for f in funcs) # type: ignore class Control(AimsPlugin): def process(self, kwargs: Dict[str, Any]) -> None: species_tags = [] for spec in kwargs.pop('species_defs'): spec = OrderedDict(spec) while spec: tag, value = spec.popitem(last=False) if tag == 'angular_grids': species_tags.append((tag, value)) for grid in spec.pop('grids'): species_tags.extend(grid.items()) elif tag == 'basis': for basis in value: species_tags.extend(basis.items()) else: species_tags.append((tag, value)) species_tags = [(t, expand_dicts(v)) for t, v in species_tags] tags = [*kwargs.pop('tags').items(), *species_tags] lines = [] for tag, value in tags: if value is None: continue if value 
is (): lines.append(tag) elif isinstance(value, list): lines.extend(f'{tag} {p2f(v)}' for v in value) else: lines.append(f'{tag} {p2f(value)}') kwargs['control'] = '\n'.join(lines) class Geom(AimsPlugin): def process(self, kwargs: Dict[str, Any]) -> None: kwargs['geometry'] = kwargs.pop('geom').dumps('aims') class Core(AimsPlugin): def process(self, kwargs: Dict[str, Any]) -> None: kwargs['inputs'] = [ ('control.in', kwargs.pop('control')), ('geometry.in', kwargs.pop('geometry')), ] class Script(AimsPlugin): def process(self, kwargs: Dict[str, Any]) -> None: aims, check = kwargs.pop('aims'), kwargs.pop('check', True) lines = ['#!/bin/bash', 'set -e', f'AIMS={aims} run_aims'] if check: lines.append('egrep "Have a nice day|stop_if_parser" STDOUT >/dev/null') kwargs['script'] = '\n'.join(lines) default_plugins = [SpeciesDir, Atoms, SpeciesDefaults, Control, Geom, Core, Script] def p2f(value: Any, nospace: bool = False) -> str: if isinstance(value, bool): return f'.{str(value).lower()}.' if isinstance(value, tuple): return (' ' if not nospace else ':').join(p2f(x) for x in value) if isinstance(value, dict): return ' '.join( f'{p2f(k)}={p2f(v, nospace=True)}' if v is not None else f'{p2f(k)}' for k, v in sorted(value.items()) ) return str(value) PK!{UQmona/sci/aims/aims.txInput: species+=Species ; Species: 'species' species=ID ( ('mass' mass=FLOAT_) ('nucleus' nucleus=FLOAT_) ('l_hartree' l_hartree=INT) ('cut_pot' cut_pot=CutPot) ('basis_dep_cutoff' (basis_dep_cutoff=FLOAT_ | basis_dep_cutoff=BOOL_)) ('radial_base' radial_base=RadialBase) ('radial_multiplier' radial_multiplier=INT) ('cite_reference' cite_reference=/\S+/)? ('basis_acc' basis_acc=FLOAT_)? ('include_min_basis' include_min_basis=BOOL_)? ('pure_gauss' pure_gauss=BOOL_)? 
('angular_grids' ( angular_grids='auto' | angular_grids='specified' grids+=InnerGridShell grids=OuterGridShell )) ('valence' valence=Occupation | 'ion_occ' ion_occ=Occupation)* basis+=BasisFunction )# ; CutPot: onset=FLOAT_ width=FLOAT_ scale=FLOAT_ ; RadialBase: number=INT radius=FLOAT_ ; InnerGridShell: 'division' division=Division ; Division: radius=FLOAT_ points=LebedevInt ; OuterGridShell: 'outer_grid' outer_grid=LebedevInt ; Occupation: n=INT l=Angular occupation=FLOAT_ ; BasisFunction: 'ionic' ionic=IonicBasisFunction | 'hydro' hydro=HydroBasisFunction | 'gaussian' gaussian=GaussianBasisFunction ; IonicBasisFunction: n=INT l=Angular (radius=FLOAT_ | radius='auto') ; HydroBasisFunction: n=INT l=Angular z_eff=FLOAT_ ; GaussianBasisFunction: L=INT ('1' alpha=FLOAT_ | /(?!1$)\d+/ (alpha=FLOAT_ coeff=FLOAT_)+) ; BOOL_: /.true.|.false./ ; Angular: /[spdfghi]/ ; LebedevInt: /6|14|26|38|50|86|110|146|170|194|302|350|434|590|770|974|1202|1454|1730|2030|2354|2702|3074|3470|3890|4334|4802|5294|5810/ ; FLOAT_: /-?(\d*\.\d+|\d+\.\d*|\d+)([de]-?\d+)?/ ; Comment: /#.*$/ ; PK!Gvvmona/sci/aims/dsl.pyfrom importlib import resources from typing import Dict, Any, cast from textx.metamodel import metamodel_from_str, TextXClass # type: ignore from .. 
import aims __version__ = '0.1.0' _bools = {'.true.': True, '.false.': False} _aims_mm = metamodel_from_str( resources.read_text(aims, 'aims.tx'), match_filters={ 'FLOAT_': lambda s: float(s.replace('d', 'e')), 'LebedevInt': int, 'BOOL_': lambda s: _bools[s], }, auto_init_attributes=False, ) def _model_to_dict(o: Any) -> Any: if isinstance(o, TextXClass): return { k: _model_to_dict(v) for k, v in vars(o).items() if k[0] != '_' and k != 'parent' } if isinstance(o, list): return [_model_to_dict(x) for x in o] return o def expand_dicts(o: Any) -> Any: if isinstance(o, dict): return tuple(map(expand_dicts, o.values())) if isinstance(o, list): return list(map(expand_dicts, o)) return o def parse_aims_input(source: str) -> Dict[str, Any]: model = _aims_mm.model_from_str(source) return cast(Dict[str, Any], _model_to_dict(model)) PK!Ly y mona/sci/aims/parse.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import io import xml.etree.ElementTree as ET from typing import Any, Type, Dict, IO import numpy as np # type: ignore from ...files import File from ...rules import Rule @Rule async def parse_aims(outputs: Dict[str, File]) -> Any: """Task rule with an output of :class:`mona.sci.aims.Aims` as input and a dictionary of parsed results as output. 
""" stdout = outputs['results.xml'].read_text() parsed = parse_xml(io.StringIO(stdout)) energies = {x['name']: x['value'][0] for x in parsed['energy']} return {'energy': energies['Total energy']} def parse_xml(source: IO[str]) -> Any: root = ET.parse(source).getroot() return parse_xmlelem(root) def parse_xmlelem(elem: Any) -> Any: results = {} children = {c.tag for c in elem} for child in children: child_elems = elem.findall(child) child_results = [] for child_elem in child_elems: if 'type' in child_elem.attrib: if 'size' in child_elem.attrib: child_elem_results = parse_xmlarr(child_elem) else: child_elem_results = float(child_elem.text) elif len(list(child_elem)): child_elem_results = parse_xmlelem(child_elem) else: child_elem_results = child_elem.text.strip() child_results.append(child_elem_results) if len(child_results) == 1: results[child] = child_results[0] else: results[child] = child_results return results def parse_xmlarr(xmlarr: Any, axis: int = None, typef: Type[Any] = None) -> Any: if axis is None: axis = len(xmlarr.attrib['size'].split()) - 1 if not typef: typename = xmlarr.attrib['type'] if typename == 'dble' or typename == 'real': typef = float elif typename == 'int': typef = int else: raise Exception('Unknown array type') if axis > 0: lst = [ parse_xmlarr(v, axis - 1, typef)[..., None] for v in xmlarr.findall('vector') ] return np.concatenate(lst, axis) else: return np.array([typef(x) for x in xmlarr.text.split()]) PK!Nxmona/sci/atom-data.csv"number","symbol","name","vdw radius","covalent radius","mass","ionization energy" 1,"H","hydrogen",1.2,0.38,1.0079,13.5984 2,"He","helium",1.4,0.32,4.0026,24.5874 3,"Li","lithium",1.82,1.34,6.941,5.3917 4,"Be","beryllium",1.53,0.9,9.0122,9.3227 5,"B","boron",1.92,0.82,10.811,8.298 6,"C","carbon",1.7,0.77,12.0107,11.2603 7,"N","nitrogen",1.55,0.75,14.0067,14.5341 8,"O","oxygen",1.52,0.73,15.9994,13.6181 9,"F","fluorine",1.47,0.71,18.9984,17.4228 10,"Ne","neon",1.54,0.69,20.1797,21.5645 
11,"Na","sodium",2.27,1.54,22.9897,5.1391 12,"Mg","magnesium",1.73,1.3,24.305,7.6462 13,"Al","aluminium",1.84,1.18,26.9815,5.9858 14,"Si","silicon",2.1,1.11,28.0855,8.1517 15,"P","phosphorus",1.8,1.06,30.9738,10.4867 16,"S","sulfur",1.8,1.02,32.065,10.36 17,"Cl","chlorine",1.75,0.99,35.453,12.9676 18,"Ar","argon",1.88,0.97,39.948,15.7596 19,"K","potassium",2.75,1.96,39.0983,4.3407 20,"Ca","calcium",2.31,1.74,40.078,6.1132 21,"Sc","scandium",2.11,1.44,44.9559,6.5615 22,"Ti","titanium",,1.36,47.867,6.8281 23,"V","vanadium",,1.25,50.9415,6.7462 24,"Cr","chromium",,1.27,51.9961,6.7665 25,"Mn","manganese",,1.39,54.938,7.434 26,"Fe","iron",,1.25,55.845,7.9024 27,"Co","cobalt",,1.26,58.9332,7.881 28,"Ni","nickel",1.63,1.21,58.6934,7.6398 29,"Cu","copper",1.4,1.38,63.546,7.7264 30,"Zn","zinc",1.39,1.31,65.39,9.3942 31,"Ga","gallium",1.87,1.26,69.723,5.9993 32,"Ge","germanium",2.11,1.22,72.64,7.8994 33,"As","arsenic",1.85,1.19,74.9216,9.7886 34,"Se","selenium",1.9,1.16,78.96,9.7524 35,"Br","bromine",1.85,1.14,79.904,11.8138 36,"Kr","krypton",2.02,1.1,83.8,13.9996 37,"Rb","rubidium",3.03,2.11,85.4678,4.1771 38,"Sr","strontium",2.49,1.92,87.62,5.6949 39,"Y","yttrium",,1.62,88.9059,6.2173 40,"Zr","zirconium",,1.48,91.224,6.6339 41,"Nb","niobium",,1.37,92.9064,6.7589 42,"Mo","molybdenum",,1.45,95.94,7.0924 43,"Tc","technetium",,1.56,98,7.28 44,"Ru","ruthenium",,1.26,101.07,7.3605 45,"Rh","rhodium",,1.35,102.9055,7.4589 46,"Pd","palladium",1.63,1.31,106.42,8.3369 47,"Ag","silver",1.72,1.53,107.8682,7.5762 48,"Cd","cadmium",1.58,1.48,112.411,8.9938 49,"In","indium",1.93,1.44,114.818,5.7864 50,"Sn","tin",2.17,1.41,118.71,7.3439 51,"Sb","antimony",2.06,1.38,121.76,8.6084 52,"Te","tellurium",2.06,1.35,127.6,9.0096 53,"I","iodine",1.98,1.33,126.9045,10.4513 54,"Xe","xenon",2.16,1.3,131.293,12.1298 55,"Cs","caesium",3.43,2.25,132.9055,3.8939 56,"Ba","barium",2.68,1.98,137.327,5.2117 57,"La","lanthanum",,1.69,138.9055,5.5769 58,"Ce","cerium",,,140.116,5.5387 
59,"Pr","praseodymium",,,140.9077,5.473 60,"Nd","neodymium",,,144.24,5.525 61,"Pm","promethium",,,145,5.582 62,"Sm","samarium",,,150.36,5.6437 63,"Eu","europium",,,151.964,5.6704 64,"Gd","gadolinium",,,157.25,6.1501 65,"Tb","terbium",,,158.9253,5.8638 66,"Dy","dysprosium",,,162.5,5.9389 67,"Ho","holmium",,,164.9303,6.0215 68,"Er","erbium",,,167.259,6.1077 69,"Tm","thulium",,,168.9342,6.1843 70,"Yb","ytterbium",,,173.04,6.2542 71,"Lu","lutetium",,1.6,174.967,5.4259 72,"Hf","hafnium",,1.5,178.49,6.8251 73,"Ta","tantalum",,1.38,180.9479,7.5496 74,"W","tungsten",,1.46,183.84,7.864 75,"Re","rhenium",,1.59,186.207,7.8335 76,"Os","osmium",,1.28,190.23,8.4382 77,"Ir","iridium",,1.37,192.217,8.967 78,"Pt","platinum",1.75,1.28,195.078,8.9587 79,"Au","gold",1.66,1.44,196.9665,9.2255 80,"Hg","mercury",1.55,1.49,200.59,10.4375 81,"Tl","thallium",1.96,1.48,204.3833,6.1082 82,"Pb","lead",2.02,1.47,207.2,7.4167 83,"Bi","bismuth",2.07,1.46,208.9804,7.2856 84,"Po","polonium",1.97,,209,8.417 85,"At","astatine",2.02,,210,9.3 86,"Rn","radon",2.2,1.45,222,10.7485 87,"Fr","francium",3.48,,223,4.0727 88,"Ra","radium",2.83,,226,5.2784 89,"Ac","actinium",,,227,5.17 90,"Th","thorium",,,232.0381,6.3067 91,"Pa","protactinium",,,231.0359,5.89 92,"U","uranium",1.86,,238.0289,6.1941 PK!0D0Dmona/sci/geomlib.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import os import csv import json from io import StringIO from copy import deepcopy from importlib import resources from collections import OrderedDict from itertools import chain, product, repeat from typing import ( List, Tuple, DefaultDict, Iterator, IO, Sized, Iterable, Union, Dict, Any, TYPE_CHECKING, TypeVar, Type, cast, ) from .. 
import sci if TYPE_CHECKING: import numpy as np # type: ignore else: np = None # lazy-loaded in Molecule constructor __version__ = '0.1.0' Vec = Tuple[float, float, float] _M = TypeVar('_M', bound='Molecule') bohr = 0.529_177_210_92 with resources.open_text(sci, 'atom-data.csv') as f: species_data = OrderedDict( (r['symbol'], {**r, 'number': int(r['number'])}) # type: ignore for r in csv.DictReader((l for l in f), quoting=csv.QUOTE_NONNUMERIC) ) _string_cache: Dict[Any, str] = {} def no_neg_zeros(r: Any) -> Any: return [0.0 if abs(x) < 1e-8 else x for x in r] class Atom: """ Represents a single atom. :param species: atom type :param coord: atom coordinate """ def __init__(self, species: str, coord: Vec, **flags: Any) -> None: self.species = species self.coord: Vec = cast(Vec, tuple(coord)) self.flags = flags @property def mass(self) -> float: """atom mass""" mass: float = species_data[self.species]['mass'] return mass @property def number(self) -> int: """atom number""" return int(species_data[self.species]['number']) @property def covalent_radius(self) -> float: """covalent radius""" r: float = species_data[self.species]['covalent radius'] return r def copy(self) -> 'Atom': """Create a copy.""" return Atom(self.species, self.coord, **deepcopy(self.flags)) class Molecule(Sized, Iterable[Atom]): """Represents a molecule. :param atoms: atoms """ def __init__(self, atoms: List[Atom], **flags: Any) -> None: global np if np is None: import numpy as np self._atoms = atoms self.flags = flags @classmethod def from_coords( cls: Type[_M], species: List[str], coords: List[Vec], **flags: Any ) -> _M: """Alternative constructor. 
:param species: atom types :param coords: coordinates """ return cls([Atom(sp, coord) for sp, coord in zip(species, coords)], **flags) @property def species(self) -> List[str]: """atom types""" return [atom.species for atom in self] @property def numbers(self) -> List[int]: """atom numbers""" return [atom.number for atom in self] @property def mass(self) -> float: """molecular mass""" return sum(atom.mass for atom in self) @property def cms(self) -> Any: """center of mass""" masses = np.array([atom.mass for atom in self]) return (masses[:, None] * self.xyz).sum(0) / self.mass @property def inertia(self) -> Any: """inertia tensor""" masses = np.array([atom.mass for atom in self]) coords_w = np.sqrt(masses)[:, None] * (self.xyz - self.cms) A = np.array([np.diag(np.full(3, r)) for r in np.sum(coords_w ** 2, 1)]) B = coords_w[:, :, None] * coords_w[:, None, :] return np.sum(A - B, 0) def __getitem__(self, i: int) -> Atom: return self._atoms[i] @property def coords(self) -> List[Vec]: """coordinates""" return [atom.coord for atom in self] def __repr__(self) -> str: return "<{} '{}'>".format(self.__class__.__name__, self.formula) @property def xyz(self) -> Any: """coordinates as a numpy array""" return np.array(self.coords) @property def formula(self) -> str: """formula""" counter = DefaultDict[str, int](int) for species in self.species: counter[species] += 1 return ''.join(f'{sp}{n if n > 1 else ""}' for sp, n in sorted(counter.items())) def bondmatrix(self, scale: float) -> Any: """Return a connectivity matrix.""" xyz = self.xyz Rs = np.array([atom.covalent_radius for atom in self]) dmatrix = np.sqrt(np.sum((xyz[None, :] - xyz[:, None]) ** 2, 2)) thrmatrix = scale * (Rs[None, :] + Rs[:, None]) return dmatrix < thrmatrix def get_fragments(self, scale: float = 1.3) -> List['Molecule']: """Return a list of clusters of connected atoms.""" bond = self.bondmatrix(scale) ifragments = getfragments(bond) fragments = [ Molecule([self._atoms[i].copy() for i in fragment]) for 
fragment in ifragments ] return fragments def hash(self) -> int: if len(self) == 1: return self[0].number return hash(tuple(np.round(sorted(np.linalg.eigvalsh(self.inertia)), 3))) def shifted(self: _M, delta: Vec) -> _M: """Return a new molecule shifted in space.""" m = self.copy() for atom in m: c = atom.coord atom.coord = (c[0] + delta[0], c[1] + delta[1], c[2] + delta[2]) return m def __add__(self: _M, other: object) -> _M: if not isinstance(other, Molecule): return NotImplemented geom = self.copy() geom._atoms.extend(other.copy()) return geom def centered(self: _M) -> _M: """Return a new molecule with a center of mass at origin.""" return self.shifted(-self.cms) def rotated( self: _M, axis: Union[str, int] = None, phi: float = None, center: Vec = None, rotmat: Any = None, ) -> _M: """Return a new rotated molecule.""" if rotmat is None: assert axis and phi phi = phi * np.pi / 180 rotmat = np.array( [ [1, 0, 0], [0, np.cos(phi), -np.sin(phi)], [0, np.sin(phi), np.cos(phi)], ] ) if isinstance(axis, str): shift = {'x': 0, 'y': 1, 'z': 2}[axis] else: shift = axis for i in [0, 1]: rotmat = np.roll(rotmat, shift, i) center = np.array(center) if center else self.cms m = self.copy() for atom in m: atom.coord = tuple(center + rotmat.dot(atom.coord - center)) # type: ignore return m @property def centers(self) -> Iterator[Atom]: """iterator over atoms""" yield from self._atoms def __iter__(self) -> Iterator[Atom]: yield from (atom for atom in self._atoms if not atom.flags.get('ghost')) def __len__(self) -> int: return len([atom for atom in self._atoms if not atom.flags.get('ghost')]) def __format__(self, fmt: str) -> str: fp = StringIO() self.dump(fp, fmt) return fp.getvalue() def items(self) -> Iterator[Tuple[str, Vec]]: """iterator over tuples of atom type and coordinate""" for atom in self: yield atom.species, atom.coord dumps = __format__ def dump(self, f: IO[str], fmt: str) -> None: """Write a molecule to a file.""" if fmt == '': f.write(repr(self)) elif fmt == 
'xyz': f.write('{}\n'.format(len(self))) f.write('Formula: {}\n'.format(self.formula)) for species, coord in self.items(): f.write( '{:>2} {}\n'.format( species, ' '.join('{:15.8}'.format(x) for x in no_neg_zeros(coord)), ) ) elif fmt == 'aims': for i, atom in enumerate(self.centers): species, r = atom.species, atom.coord ghost = atom.flags.get('ghost', False) key = (species, r, ghost, fmt) r = no_neg_zeros(r) try: f.write(_string_cache[key]) except KeyError: kind = 'atom' if not ghost else 'empty' s = f'{kind} {r[0]:15.8f} {r[1]:15.8f} {r[2]:15.8f} {species:>2}\n' f.write(s) _string_cache[key] = s for con in self.flags.get('constrains', {}).get(i, []): f.write(f'constrain_relaxation {con}\n') elif fmt == 'mopac': f.write('* Formula: {}\n'.format(self.formula)) for species, coord in self.items(): f.write( '{:>2} {}\n'.format( species, ' '.join('{:15.8} 1'.format(x) for x in no_neg_zeros(coord)), ) ) else: raise ValueError("Unknown format: '{}'".format(fmt)) def copy(self: _M) -> _M: """Create a cpoy.""" return type(self)([atom.copy() for atom in self._atoms]) def ghost(self: _M) -> _M: m = self.copy() for atom in m: atom.flags['ghost'] = True return m def write(self, filename: str) -> None: """Write to a file.""" ext = os.path.splitext(filename)[1] if ext == '.xyz': fmt = 'xyz' elif ext == '.xyzc': fmt = 'xyzc' elif ext == '.aims' or os.path.basename(filename) == 'geometry.in': fmt = 'aims' elif ext == '.mopac': fmt = 'mopac' with open(filename, 'w') as f: self.dump(f, fmt) class Crystal(Molecule): """Represents a crystal. Inherits from :class:`Molecule`. :param atoms: atoms :param lattice: lattice vectors """ def __init__(self, atoms: List[Atom], lattice: List[Vec], **flags: Any) -> None: super().__init__(atoms, **flags) self.lattice = lattice @classmethod def from_coords( # type: ignore cls, species: List[str], coords: List[Vec], lattice: List[Vec], **flags ) -> 'Crystal': """Alternative constructor. 
:param species: atom types :param coords: coordinates :param lattice: lattice vectors """ return cls( [Atom(sp, coord) for sp, coord in zip(species, coords)], lattice, **flags ) def dump(self, f: IO[str], fmt: str) -> None: if fmt == '': f.write(repr(self)) elif fmt == 'aims': for label, r in zip('abc', self.lattice): x, y, z = no_neg_zeros(r) f.write(f'lattice_vector {x:15.8f} {y:15.8f} {z:15.8f}\n') for con in self.flags.get('constrains', {}).get(label, []): f.write(f'constrain_relaxation {con}\n') super().dump(f, fmt) elif fmt == 'vasp': f.write(f'Formula: {self.formula}\n') f.write(f'{1:15.8f}\n') for r in self.lattice: x, y, z = no_neg_zeros(r) f.write(f'{x:15.8f} {y:15.8f} {z:15.8f}\n') species: Dict[str, List[Atom]] = OrderedDict( (sp, []) for sp in set(self.species) ) f.write(' '.join(species.keys()) + '\n') for atom in self: species[atom.species].append(atom) f.write(' '.join(str(len(atoms)) for atoms in species.values()) + '\n') f.write('cartesian\n') for atom in chain(*species.values()): r = no_neg_zeros(atom.coord) s = f'{r[0]:15.8f} {r[1]:15.8f} {r[2]:15.8f}\n' f.write(s) else: raise ValueError(f'Unknown format: {fmt!r}') def copy(self) -> 'Crystal': return Crystal([atom.copy() for atom in self._atoms], self.lattice.copy()) def rotated( self, axis: Union[str, int] = None, phi: float = None, center: Vec = None, rotmat: Any = None, ) -> 'Crystal': assert center is None g = super().rotated(axis, phi, (0, 0, 0), rotmat) m = Molecule.from_coords(['_'] * 3, self.lattice) m = m.rotated(axis, phi, (0, 0, 0), rotmat) g.lattice = m.coords return g @property def abc(self) -> Any: """latice vectors as a numpy array""" return np.array(self.lattice) def get_kgrid(self, density: float = 0.06) -> Tuple[int, int, int]: """Return a k-point grid with a given density.""" rec_lattice = 2 * np.pi * np.linalg.inv(self.abc.T) rec_lens = np.sqrt((rec_lattice ** 2).sum(1)) nkpts = np.ceil(rec_lens / (density * bohr)) return int(nkpts[0]), int(nkpts[1]), int(nkpts[2]) def 
supercell(self, ns: Tuple[int, int, int]) -> 'Crystal': """Create a supercell.""" abc = self.abc latt_vectors = np.array( [ sum(s * vec for s, vec in zip(shift, abc)) for shift in product(*map(range, ns)) ] ) species = list(chain.from_iterable(repeat(self.species, len(latt_vectors)))) coords = [ (x, y, z) for x, y, z in (self.xyz[None, :, :] + latt_vectors[:, None, :]).reshape( (-1, 3) ) ] lattice = [(x, y, z) for x, y, z in abc * np.array(ns)[:, None]] return Crystal.from_coords(species, coords, lattice) def normalized(self) -> 'Crystal': xyz = ( np.mod(self.xyz @ np.linalg.inv(self.lattice) + 1e-10, 1) - 1e-10 ) @ self.lattice return Crystal.from_coords(self.species, xyz, self.lattice.copy()) def get_vec(ws: List[str]) -> Vec: return float(ws[0]), float(ws[1]), float(ws[2]) def load(fp: IO[str], fmt: str) -> Molecule: # noqa: C901 """Read a molecule or a crystal from a file object.""" if fmt == 'xyz': n = int(fp.readline()) try: flags = json.loads(fp.readline()) except json.decoder.JSONDecodeError: flags = {} species = [] coords = [] for _ in range(n): ws = fp.readline().split() species.append(ws[0]) coords.append(get_vec(ws[1:4])) return Molecule.from_coords(species, coords, **flags) elif fmt == 'xyzc': n = int(fp.readline()) lattice = [] for _ in range(3): lattice.append(get_vec(fp.readline().split())) species = [] coords = [] for _ in range(n): ws = fp.readline().split() species.append(ws[0]) coords.append(get_vec(ws[1:4])) return Crystal.from_coords(species, coords, lattice) if fmt == 'aims': atoms = [] lattice = [] while True: line = fp.readline() if line == '': break line = line.strip() if not line or line.startswith('#'): continue ws = line.split() what = ws[0] if what in ['atom', 'empty']: atoms.append(Atom(ws[4], get_vec(ws[1:4]), ghost=what == 'empty')) elif what == 'lattice_vector': lattice.append(get_vec(ws[1:4])) if lattice: assert len(lattice) == 3 return Crystal(atoms, lattice) else: return Molecule(atoms) raise ValueError(f'Unknown format: {fmt}') 
def loads(s: str, fmt: str) -> Molecule: """Read a molecule or a crystal from a string.""" fp = StringIO(s) return load(fp, fmt) def readfile(path: str, fmt: str = None) -> Molecule: """Read a molecule or a crystal from a path.""" if not fmt: ext = os.path.splitext(path)[1] if ext == '.xyz': fmt = 'xyz' elif ext == '.aims' or os.path.basename(path) == 'geometry.in': fmt = 'aims' elif ext == '.xyzc': fmt = 'xyzc' else: raise RuntimeError('Cannot determine format') with open(path) as f: return load(f, fmt) def getfragments(C: Any) -> List[List[int]]: n = C.shape[0] assigned = [-1 for _ in range(n)] # fragment index, otherwise -1 ifragment = 0 # current fragment index queue = [0 for _ in range(n)] # allocate queue of neighbors for elem in range(n): # iterate over elements if assigned[elem] >= 0: # skip if assigned continue queue[0], a, b = elem, 0, 1 # queue starting with the element itself while b - a > 0: # until queue is exhausted node, a = queue[a], a + 1 # pop from queue assigned[node] = ifragment # assign node neighbors = np.flatnonzero(C[node, :]) # list of neighbors for neighbor in neighbors: if not (assigned[neighbor] >= 0 or neighbor in queue[a:b]): # add to queue if not assigned or in queue queue[b], b = neighbor, b + 1 ifragment += 1 fragments = [ [i for i, f in enumerate(assigned) if f == fragment] for fragment in range(ifragment) ] return fragments PK!u`mona/sci/tex.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. 
from typing import Dict from jinja2 import Template def jinja_tex(tex_template: str, ctx: Dict[str, object]) -> str: jinja_template = Template( tex_template, variable_start_string=r'<<', variable_end_string='>>', block_start_string='<+', block_end_string='+>', comment_start_string='<#', comment_end_string='#>', trim_blocks=True, autoescape=False, ) return jinja_template.render(ctx) PK!۴==mona/sessions.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import logging import warnings from functools import wraps from itertools import chain from collections import defaultdict import asyncio from contextvars import ContextVar from contextlib import contextmanager, asynccontextmanager from typing import ( Any, Dict, Callable, Optional, TypeVar, Iterator, NamedTuple, cast, Iterable, List, Tuple, Union, Awaitable, AsyncGenerator, FrozenSet, Sequence, ) from .hashing import Hash, Hashed from .tasks import Task, HashedFuture, State, maybe_hashed, Corofunc from .graph import ( traverse, traverse_async, NodeExecuted, Action, Priority, default_priority, NodeException, ) from .futures import STATE_COLORS from .utils import Literal, split, call_if from .errors import SessionError, TaskError, FutureError, MonaError from .pluggable import Plugin, Pluggable __version__ = '0.1.0' log = logging.getLogger(__name__) _T = TypeVar('_T') TaskExecute = Callable[[Task[Any], NodeExecuted[Task[Any]]], Awaitable[None]] ExceptionHandler = Callable[[Task[Any], Exception], bool] TaskFilter = Callable[[Task[Any]], bool] _active_session: ContextVar[Optional['Session']] = ContextVar( 'active_session', default=None ) class SessionPlugin(Plugin['Session']): def post_enter(self, sess: 'Session') -> None: pass def pre_exit(self, sess: 'Session') -> None: pass async def pre_run(self) -> None: pass async def post_run(self) -> None: pass def 
post_task_run(self, task: Task[Any]) -> None: pass def save_hashed(self, objs: Sequence[Hashed[Any]]) -> None: pass def ignored_exception(self) -> None: pass def wrap_execute(self, exe: TaskExecute) -> TaskExecute: return exe def post_create(self, task: Task[Any]) -> None: pass class Graph(NamedTuple): deps: Dict[Hash, FrozenSet[Hash]] side_effects: Dict[Hash, List[Hash]] backflow: Dict[Hash, FrozenSet[Hash]] class Session(Pluggable): """The Session context manager represents a session in which tasks can be created and executed. :param plugins: plugins to load. This is equivalent to calling each plugin with the created session as an argument :param warn: warn at the end of session if some created tasks were not executed """ def __init__( self, plugins: Iterable[SessionPlugin] = None, warn: bool = True ) -> None: Pluggable.__init__(self) for plugin in plugins or (): plugin(self) self._tasks: Dict[Hash, Task[Any]] = {} self._graph = Graph({}, defaultdict(list), {}) self._running_task: ContextVar[Optional[Task[Any]]] = ContextVar('running_task') self._running_task.set(None) self._storage: Dict[str, Any] = {} self._warn = warn self._skipped = False def _check_active(self) -> None: sess = _active_session.get() if sess is None or sess is not self: raise SessionError(f'Not active: {self!r}', self) @property def storage(self) -> Dict[str, Any]: """A ganeral string-based dictionary available when a session is active.""" self._check_active() return self._storage def get_side_effects(self, task: Task[Any]) -> Iterable[Task[Any]]: return tuple(self._tasks[h] for h in self._graph.side_effects[task.hashid]) def all_tasks(self) -> Iterable[Task[object]]: yield from self._tasks.values() def __enter__(self) -> 'Session': assert _active_session.get() is None self._active_session_token = _active_session.set(self) self.run_plugins('post_enter', self, start=None) return self def _filter_tasks(self, cond: TaskFilter) -> List[Task[Any]]: return list(filter(cond, self._tasks.values())) 
def __exit__(self, exc_type: Any, *args: Any) -> None: assert _active_session.get() is self self.run_plugins('pre_exit', self, start=None) _active_session.reset(self._active_session_token) del self._active_session_token if self._warn and not self._skipped and exc_type is None: tasks_not_run = self._filter_tasks(lambda t: t.state < State.RUNNING) if tasks_not_run: warnings.warn(f'tasks have never run: {tasks_not_run}', RuntimeWarning) self._tasks.clear() self._storage.clear() self._graph.deps.clear() self._graph.side_effects.clear() self._graph.backflow.clear() @property def running_task(self) -> Task[Any]: """Currently running task. This would be usually used from within a task coroutine. """ task = self._running_task.get() if task: return task raise SessionError(f'No running task: {self!r}', self) @contextmanager def _running_task_ctx(self, task: Task[Any]) -> Iterator[None]: assert not self._running_task.get() self._running_task.set(task) try: yield finally: assert self._running_task.get() is task self._running_task.set(None) def _process_objects(self, objs: Iterable[Hashed[Any]]) -> List[Task[Any]]: objs = list( traverse(objs, lambda o: o.components, lambda o: isinstance(o, Task)) ) tasks, objs = cast( Tuple[List[Task[Any]], List[Hashed[Any]]], split(objs, lambda o: isinstance(o, Task)), ) for task in tasks: if task.hashid not in self._tasks: raise TaskError(f'Not in session: {task!r}', task) self.run_plugins('save_hashed', objs, start=None) return tasks def register_task(self, task: Task[_T]) -> Tuple[Task[_T], bool]: try: return self._tasks[task.hashid], False except KeyError: pass self._tasks[task.hashid] = task task.register() arg_tasks = self._process_objects(task.args) self._graph.deps[task.hashid] = frozenset(t.hashid for t in arg_tasks) return task, True def add_side_effect_of(self, caller: Task[object], callee: Task[object]) -> None: self._graph.side_effects[caller.hashid].append(callee.hashid) def create_task( self, corofunc: Corofunc[_T], *args: Any, 
**kwargs: Any ) -> Task[_T]: """Create a new task. :param corofunc: a coroutine function to be executed :param args: arguments to the coroutine :param kwargs: keyword arguments passed to :class:`~mona.tasks.Task` """ task = Task(corofunc, *args, **kwargs) caller = self._running_task.get() if caller: self.add_side_effect_of(caller, task) task, registered = self.register_task(task) if registered: self.run_plugins('post_create', task, start=None) return task @asynccontextmanager async def run_context(self) -> AsyncGenerator[None, None]: await self.run_plugins_async('pre_run', start=None) try: yield finally: await self.run_plugins_async('post_run', start=None) async def _run_task(self, task: Task[_T]) -> Union[_T, Hashed[_T]]: async with self.run_context(): return await self.run_task_async(task) def run_task(self, task: Task[_T]) -> Union[_T, Hashed[_T]]: """Run a task. :param task: task to run Return the result of the task's coroutine function or it's hashed instance if hashable. """ return asyncio.run(self._run_task(task)) def set_result(self, task: Task[_T], result: Union[_T, Hashed[_T]]) -> None: if not isinstance(result, Hashed): task.set_result(result) return if not isinstance(result, HashedFuture) or result.done(): task.set_result(result) else: log.debug(f'{task}: has run, pending: {result}') task.set_future_result(result) result.add_done_callback(lambda fut: task.set_done()) result.register() backflow = self._process_objects([result]) self._graph.backflow[task.hashid] = frozenset(t.hashid for t in backflow) async def run_task_async(self, task: Task[_T]) -> Union[_T, Hashed[_T]]: if task.state < State.READY: raise TaskError(f'Not ready: {task!r}', task) if task.state > State.READY: raise TaskError(f'Task was already run: {task!r}', task) task.set_running() with self._running_task_ctx(task): raw_result = await task.corofunc(*(arg.value for arg in task.args)) task.set_has_run() side_effects = self.get_side_effects(task) if side_effects: log.debug(f'{task}: created 
tasks: {list(map(Literal, side_effects))}') result = cast(_T, maybe_hashed(raw_result)) or raw_result self.set_result(task, result) self.run_plugins('post_task_run', task, start=None) return result async def _traverse_execute( self, task: Task[Any], done: NodeExecuted[Task[Any]] ) -> None: await self.run_task_async(task) backflow = (self._tasks[h] for h in self._graph.backflow.get(task.hashid, ())) done((task, None, backflow)) def eval(self, *args: Any, **kwargs: Any) -> Any: """Blocking version of :meth:`eval_async`.""" return asyncio.run(self.eval_async(*args, **kwargs)) # TODO reduce complexity async def _eval_async( # noqa: C901 self, obj: Any, depth: bool = False, priority: Priority = default_priority, exception_handler: ExceptionHandler = None, task_filter: TaskFilter = None, limit: int = None, ) -> Any: """Evaluate the given object by running all tasks it contains as well as any new generated tasks. :param obj: any hashable object :param depth: traverse DAG depth-first if true, breadth-first otherwise :param priority: prioritize steps in DAG traversal in order :param exception_handler: callable that accepts a task and an exception it raised and returns True if the exception should be ignored :param task_filter: callable that accepts a task and returns True if the task should be executed :param limit: limit of the number of executed task Return the evaluated object. 
""" fut = maybe_hashed(obj) if not isinstance(fut, HashedFuture): return obj fut.register() exceptions = {} traversal = traverse_async( self._process_objects([fut]), lambda task: ( self._tasks[h] for h in chain( self._graph.deps[task.hashid], self._graph.backflow.get(task.hashid, ()), ) ), lambda task, reg: call_if( task.state < State.RUNNING, task.add_ready_callback, lambda t: reg(t) ), self.run_plugins('wrap_execute', start=self._traverse_execute), depth, priority, ) n_executed = 0 shutdown = False do_step: bool = None # type: ignore while True: try: step_or_exception = await traversal.asend(do_step) except StopAsyncIteration: break if isinstance(step_or_exception, NodeException): task, exc = step_or_exception if isinstance(exc, (MonaError, AssertionError, asyncio.CancelledError)): raise exc else: assert isinstance(exc, Exception) if exception_handler and exception_handler(task, exc): self.run_plugins('ignored_exception', start=None) exceptions[task] = exc task.set_error() log.info(f'Handled {exc!r} from {task!r}') do_step = True continue raise exc action, task, progress = step_or_exception progress_line = ' '.join(f'{k}={v}' for k, v in progress.items()) tag = action.name if task: tag += f': {task.label}' log.debug(f'{tag}, progress: {progress_line}') if action is Action.EXECUTE: do_step = not shutdown if do_step: n_executed += 1 if limit: assert n_executed <= limit if n_executed == limit: log.info('Maximum number of executed tasks reached') shutdown = True log.info(f'{task}: will run') elif action is Action.TRAVERSE: if task.done(): do_step = False elif not task_filter: do_step = True else: do_step = task_filter(task) if not do_step: self._skipped = True log.info('Finished') try: return fut.value except FutureError: if exceptions: msg = f'Cannot evaluate future because of errors: {exceptions}' log.warning(msg) elif self._skipped: log.info('Cannot evaluate future because tasks were skipped') else: tasks_not_done = self._filter_tasks(lambda t: not t.done()) if 
tasks_not_done: msg = f'Task dependency cycle: {tasks_not_done}' raise MonaError(msg) else: raise return fut @wraps(_eval_async) async def eval_async(self, *args: Any, **kwargs: Any) -> Any: async with self.run_context(): return await self._eval_async(*args, **kwargs) def dot_graph(self, *args: Any, **kwargs: Any) -> Any: """Generate `graphviz.Digraph `_ for the task DAG. Requires `Graphviz `_. """ from graphviz import Digraph # type: ignore tasks: Union[List[Hash], FrozenSet[Hash]] dot = Digraph(*args, **kwargs) for child, parents in self._graph.deps.items(): task_obj = self._tasks[child] dot.node(child, repr(Literal(task_obj)), color=STATE_COLORS[task_obj.state]) for parent in parents: dot.edge(child, parent) for origin, tasks in self._graph.side_effects.items(): for task in tasks: dot.edge(origin, task, style='dotted') for target, tasks in self._graph.backflow.items(): for task in tasks: dot.edge( task, target, style='tapered', penwidth='7', dir='back', arrowtail='none', ) return dot @classmethod def active(cls) -> 'Session': session = _active_session.get() if session is None: raise MonaError('No active session') return session PK!qo/)) mona/tasks.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. 
import logging import json from abc import abstractmethod import asyncio import inspect import pickle from typing import ( Callable, Optional, List, TypeVar, cast, Tuple, Union, Awaitable, Dict, Iterable, ) from .futures import Future, State from .hashing import ( Hashed, Composite, HashedCompositeLike, HashedComposite, hash_function, HashResolver, ) from .utils import get_fullname, Maybe, Empty, swap_type, import_fullname from .errors import FutureError, TaskError, CompositeError log = logging.getLogger(__name__) _T = TypeVar('_T') _T_co = TypeVar('_T_co', covariant=True) _U = TypeVar('_U') Corofunc = Callable[..., Awaitable[_T]] def ensure_hashed(obj: object) -> Hashed[object]: """Turn any object into a Hashed object. Return Hashed objects without change. Wraps composites into a TaskComposite or a HashedComposite. Raises InvalidJSONObject when not possible. """ obj = swap_type(obj, TaskComposite.type_swaps) if isinstance(obj, Hashed): return obj jsonstr, components = TaskComposite.parse_object(obj) if any(isinstance(comp, HashedFuture) for comp in components): return TaskComposite(jsonstr, components) return HashedComposite(jsonstr, components) def maybe_hashed(obj: object) -> Optional[Hashed[object]]: """Wraps maybe_hashed() with return value None on error.""" try: return ensure_hashed(obj) except CompositeError: return None # Although this class could be hashable in principle, this would require # dispatching all futures via a session in the same way that tasks are. # See test_identical_futures() for an example of what wouldn't work. class HashedFuture(Hashed[_T_co], Future): """ Represents a hashed future. Inherits abstract methods spec() and label() from Hashed, implements abstract property value and adds abstract method result(). """ @property @abstractmethod def spec(self) -> bytes: ... @property @abstractmethod def label(self) -> str: ... @abstractmethod def result(self) -> _T_co: ... 
@property def value(self) -> _T_co: if self.done(): return self.result() raise FutureError(f'Not done: {self!r}', self) def default_result(self) -> _T_co: raise FutureError(f'No default: {self!r}', self) @property def value_or_default(self) -> _T_co: if self.done(): return self.result() return self.default_result() def __repr__(self) -> str: return f'<{self.__class__.__name__} {self} state={self.state.name}>' class Task(HashedFuture[_T_co]): def __init__( self, corofunc: Corofunc[_T_co], *args: object, label: str = None, default: Maybe[_T_co] = Empty._, rule: str = None, ) -> None: self._corofunc = corofunc self._args = tuple(map(ensure_hashed, args)) Hashed.__init__(self) Future.__init__( self, (arg for arg in self._args if isinstance(arg, HashedFuture)) ) self._default = default if label: self._label = label else: arg_list = ', '.join(a.label for a in self._args) arg_list = arg_list if len(arg_list) < 50 else '...' self._label = f'{self._corofunc.__qualname__}({arg_list})' self._result: Union[_T_co, Hashed[_T_co], Empty] = Empty._ self._storage: Dict[str, object] = {} self._rule = rule @property def spec(self) -> bytes: return json.dumps( [ get_fullname(self._corofunc), hash_function(self._corofunc), *(fut.hashid for fut in self._args), ] ).encode() @classmethod def from_spec(cls, spec: bytes, resolve: HashResolver) -> 'Task[_T_co]': rule_name, corohash, *arg_hashes = json.loads(spec) corofunc = import_fullname(rule_name).corofunc assert inspect.iscoroutinefunction(corofunc) assert hash_function(corofunc) == corohash args = (resolve(h) for h in arg_hashes) return cls(corofunc, *args) @property def label(self) -> str: return self._label def result(self) -> _T_co: return self.resolve(lambda res: res.value) @property def corofunc(self) -> Corofunc[_T_co]: return self._corofunc @property def args(self) -> Tuple[Hashed[object], ...]: return self._args @property def rule(self) -> Optional[str]: return self._rule @property def storage(self) -> Dict[str, object]: return 
self._storage def __getitem__(self, key: object) -> 'TaskComponent[object]': return self.get(key) def get( self, key: object, default: Maybe[object] = Empty._ ) -> 'TaskComponent[object]': return TaskComponent(self, [key], default) def resolve( self, handler: Callable[[Hashed[_T_co]], _U] = None ) -> Union[_U, _T_co]: if isinstance(self._result, Empty): raise TaskError(f'Has not run: {self!r}', self) if not isinstance(self._result, Hashed): return self._result handler = handler or (lambda x: x) # type: ignore return handler(self._result) # type: ignore def default_result(self) -> _T_co: if not isinstance(self._default, Empty): return self._default if isinstance(self._result, HashedFuture): return cast(HashedFuture[_T_co], self._result).default_result() raise TaskError(f'Has no defualt: {self!r}', self) def metadata(self) -> Optional[bytes]: return pickle.dumps((self._default, self._label, self._rule)) def set_metadata(self, metadata: bytes) -> None: self._default, self._label, self._rule = pickle.loads(metadata) def set_running(self) -> None: assert self._state is State.READY self._state = State.RUNNING def set_error(self) -> None: assert self._state is State.RUNNING self._state = State.ERROR def set_has_run(self) -> None: assert self._state is State.RUNNING self._state = State.HAS_RUN def set_result(self, result: Union[_T_co, Hashed[_T_co]]) -> None: assert self._state is State.HAS_RUN assert not isinstance(result, HashedFuture) or result.done() self._result = result self.set_done() def set_future_result(self, result: HashedFuture[_T_co]) -> None: assert self.state is State.HAS_RUN assert not result.done() self._state = State.AWAITING self._result = result def future_result(self) -> HashedFuture[_T_co]: if self._state < State.AWAITING: raise TaskError(f'Do not have future: {self!r}', self) if self._state > State.AWAITING: raise TaskError(f'Already done: {self!r}', self) assert isinstance(self._result, HashedFuture) return self._result def call(self) -> _T_co: 
return asyncio.run(self.call_async()) async def call_async(self) -> _T_co: args = [ arg.value_or_default if isinstance(arg, HashedFuture) else arg.value for arg in self.args ] return await self._corofunc(*args) class TaskComponent(HashedFuture[_T_co]): def __init__( self, task: Task[object], keys: List[object], default: Maybe[_T_co] = Empty._ ) -> None: self._task = task self._keys = keys Hashed.__init__(self) Future.__init__(self, [cast(HashedFuture[object], task)]) self._default = default self._label = ''.join([self._task.label, *(f'[{k!r}]' for k in self._keys)]) self.add_ready_callback(lambda self: self.set_done()) @property def spec(self) -> bytes: return json.dumps([self._task.hashid, *self._keys]).encode() @classmethod def from_spec(cls, spec: bytes, resolve: HashResolver) -> 'TaskComponent[_T_co]': task_hash, *keys = json.loads(spec) task = cast(Task[object], resolve(task_hash)) return cls(task, keys) @property def label(self) -> str: return self._label @property def components(self) -> Iterable[Hashed[object]]: return (self._task,) def result(self) -> _T_co: return self.resolve(lambda task: task.result()) def __getitem__(self, key: object) -> 'TaskComponent[object]': return self.get(key) def get( self, key: object, default: Maybe[object] = Empty._ ) -> 'TaskComponent[object]': return TaskComponent(self._task, self._keys + [key], default) @property def task(self) -> Task[object]: return self._task def resolve(self, handler: Callable[[Task[object]], object]) -> _T_co: obj = handler(self._task) for key in self._keys: obj = obj[key] # type: ignore return cast(_T_co, obj) def default_result(self) -> _T_co: if not isinstance(self._default, Empty): return self._default return self.resolve(lambda task: task.default_result()) def metadata(self) -> Optional[bytes]: return pickle.dumps(self._default) def set_metadata(self, metadata: bytes) -> None: self._default = pickle.loads(metadata) # the semantics may imply that the component is taken immediately after # 
execution, but it is only taken by the child task, so that if the component # does not exist, the exception is raised only later class TaskComposite(HashedCompositeLike, HashedFuture[Composite]): # type: ignore def __init__(self, jsonstr: str, components: Iterable[Hashed[object]]) -> None: components = list(components) futures = [comp for comp in components if isinstance(comp, HashedFuture)] assert futures Future.__init__(self, futures) HashedCompositeLike.__init__(self, jsonstr, components) self.add_ready_callback(lambda self: self.set_done()) # override abstract property in HashedCompositeLike value = HashedFuture.value # type: ignore def result(self) -> Composite: return self.resolve(lambda comp: comp.value) def default_result(self) -> Composite: return self.resolve( lambda comp: cast(object, comp.value_or_default) if isinstance(comp, HashedFuture) else comp.value ) PK!K mona/utils.py# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import os import stat import importlib from enum import Enum from datetime import datetime from typing import Any, Callable, TypeVar, Union, List, Tuple, Iterable, Dict, Type _T = TypeVar('_T') _V = TypeVar('_V') Maybe = Union[_T, 'Empty'] Pathable = Union[str, 'os.PathLike[str]'] TypeSwaps = Dict[Type[Any], Callable[[Any], Any]] # Ideally Empty.EMPTY could be used directly, but mypy doesn't understand that # yet, so isisntance() it is. 
class Empty(Enum): """Absence of a value.""" _ = 0 def get_fullname(obj: Union[Callable[..., Any], Type[Any]]) -> str: return f'{obj.__module__}:{obj.__qualname__}' def import_fullname(fullname: str) -> Any: module_name, qualname = fullname.split(':') module = importlib.import_module(module_name) return getattr(module, qualname) def shorten_text(s: Union[str, bytes], n: int) -> str: if len(s) > n: s = s[: n - 3] shortened = True else: shortened = False if isinstance(s, bytes): try: text = s.decode() except UnicodeDecodeError: return '' else: text = s return f'{text.rstrip()}...' if shortened else text class Literal(str): def __repr__(self) -> str: return str.__repr__(self)[1:-1] def swap_type(o: Any, swaps: TypeSwaps) -> Any: if o.__class__ in swaps: return swaps[o.__class__](o) return o # TODO ignore existing permissions def make_executable(path: Pathable) -> None: st = os.stat(path) os.chmod(path, st.st_mode | stat.S_IEXEC) def make_nonwritable(path: Pathable) -> None: os.chmod( path, stat.S_IMODE(os.lstat(path).st_mode) & ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH), ) def make_writable(path: Pathable) -> None: os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) | stat.S_IWUSR) def get_timestamp() -> str: return datetime.now().isoformat(timespec='seconds') def call_if(cond: bool, func: Callable[..., None], *args: Any, **kwargs: Any) -> None: if cond: func(*args, **kwargs) def split( iterable: Iterable[_T], first: Callable[[_T], bool] ) -> Tuple[List[_T], List[_T]]: left: List[_T] = [] right: List[_T] = [] for item in iterable: (left if first(item) else right).append(item) return left, right def groupby(iterable: Iterable[_T], key: Callable[[_T], _V]) -> Dict[_V, List[_T]]: groups: Dict[_V, List[_T]] = {} for x in iterable: groups.setdefault(key(x), []).append(x) return groups PK!H#u(*%mona-0.2.1.dist-info/entry_points.txtN+I/N.,()Kz9V@PK!xFVAVAmona-0.2.1.dist-info/LICENSEMozilla Public License Version 2.0 ================================== 1. 
Definitions -------------- 1.1. "Contributor" means each individual or legal entity that creates, contributes to the creation of, or owns Covered Software. 1.2. "Contributor Version" means the combination of the Contributions of others (if any) used by a Contributor and that particular Contributor's Contribution. 1.3. "Contribution" means Covered Software of a particular Contributor. 1.4. "Covered Software" means Source Code Form to which the initial Contributor has attached the notice in Exhibit A, the Executable Form of such Source Code Form, and Modifications of such Source Code Form, in each case including portions thereof. 1.5. "Incompatible With Secondary Licenses" means (a) that the initial Contributor has attached the notice described in Exhibit B to the Covered Software; or (b) that the Covered Software was made available under the terms of version 1.1 or earlier of the License, but not also under the terms of a Secondary License. 1.6. "Executable Form" means any form of the work other than Source Code Form. 1.7. "Larger Work" means a work that combines Covered Software with other material, in a separate file or files, that is not Covered Software. 1.8. "License" means this document. 1.9. "Licensable" means having the right to grant, to the maximum extent possible, whether at the time of the initial grant or subsequently, any and all of the rights conveyed by this License. 1.10. "Modifications" means any of the following: (a) any file in Source Code Form that results from an addition to, deletion from, or modification of the contents of Covered Software; or (b) any new file in Source Code Form that contains any Covered Software. 1.11. 
"Patent Claims" of a Contributor means any patent claim(s), including without limitation, method, process, and apparatus claims, in any patent Licensable by such Contributor that would be infringed, but for the grant of the License, by the making, using, selling, offering for sale, having made, import, or transfer of either its Contributions or its Contributor Version. 1.12. "Secondary License" means either the GNU General Public License, Version 2.0, the GNU Lesser General Public License, Version 2.1, the GNU Affero General Public License, Version 3.0, or any later versions of those licenses. 1.13. "Source Code Form" means the form of the work preferred for making modifications. 1.14. "You" (or "Your") means an individual or a legal entity exercising rights under this License. For legal entities, "You" includes any entity that controls, is controlled by, or is under common control with You. For purposes of this definition, "control" means (a) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (b) ownership of more than fifty percent (50%) of the outstanding shares or beneficial ownership of such entity. 2. License Grants and Conditions -------------------------------- 2.1. Grants Each Contributor hereby grants You a world-wide, royalty-free, non-exclusive license: (a) under intellectual property rights (other than patent or trademark) Licensable by such Contributor to use, reproduce, make available, modify, display, perform, distribute, and otherwise exploit its Contributions, either on an unmodified basis, with Modifications, or as part of a Larger Work; and (b) under Patent Claims of such Contributor to make, use, sell, offer for sale, have made, import, and otherwise transfer either its Contributions or its Contributor Version. 2.2. 
Effective Date The licenses granted in Section 2.1 with respect to any Contribution become effective for each Contribution on the date the Contributor first distributes such Contribution. 2.3. Limitations on Grant Scope The licenses granted in this Section 2 are the only rights granted under this License. No additional rights or licenses will be implied from the distribution or licensing of Covered Software under this License. Notwithstanding Section 2.1(b) above, no patent license is granted by a Contributor: (a) for any code that a Contributor has removed from Covered Software; or (b) for infringements caused by: (i) Your and any other third party's modifications of Covered Software, or (ii) the combination of its Contributions with other software (except as part of its Contributor Version); or (c) under Patent Claims infringed by Covered Software in the absence of its Contributions. This License does not grant any rights in the trademarks, service marks, or logos of any Contributor (except as may be necessary to comply with the notice requirements in Section 3.4). 2.4. Subsequent Licenses No Contributor makes additional grants as a result of Your choice to distribute the Covered Software under a subsequent version of this License (see Section 10.2) or under the terms of a Secondary License (if permitted under the terms of Section 3.3). 2.5. Representation Each Contributor represents that the Contributor believes its Contributions are its original creation(s) or it has sufficient rights to grant the rights to its Contributions conveyed by this License. 2.6. Fair Use This License is not intended to limit any rights You have under applicable copyright doctrines of fair use, fair dealing, or other equivalents. 2.7. Conditions Sections 3.1, 3.2, 3.3, and 3.4 are conditions of the licenses granted in Section 2.1. 3. Responsibilities ------------------- 3.1. 
Distribution of Source Form All distribution of Covered Software in Source Code Form, including any Modifications that You create or to which You contribute, must be under the terms of this License. You must inform recipients that the Source Code Form of the Covered Software is governed by the terms of this License, and how they can obtain a copy of this License. You may not attempt to alter or restrict the recipients' rights in the Source Code Form. 3.2. Distribution of Executable Form If You distribute Covered Software in Executable Form then: (a) such Covered Software must also be made available in Source Code Form, as described in Section 3.1, and You must inform recipients of the Executable Form how they can obtain a copy of such Source Code Form by reasonable means in a timely manner, at a charge no more than the cost of distribution to the recipient; and (b) You may distribute such Executable Form under the terms of this License, or sublicense it under different terms, provided that the license for the Executable Form does not attempt to limit or alter the recipients' rights in the Source Code Form under this License. 3.3. Distribution of a Larger Work You may create and distribute a Larger Work under terms of Your choice, provided that You also comply with the requirements of this License for the Covered Software. If the Larger Work is a combination of Covered Software with a work governed by one or more Secondary Licenses, and the Covered Software is not Incompatible With Secondary Licenses, this License permits You to additionally distribute such Covered Software under the terms of such Secondary License(s), so that the recipient of the Larger Work may, at their option, further distribute the Covered Software under the terms of either this License or such Secondary License(s). 3.4. 
Notices You may not remove or alter the substance of any license notices (including copyright notices, patent notices, disclaimers of warranty, or limitations of liability) contained within the Source Code Form of the Covered Software, except that You may alter any license notices to the extent required to remedy known factual inaccuracies. 3.5. Application of Additional Terms You may choose to offer, and to charge a fee for, warranty, support, indemnity or liability obligations to one or more recipients of Covered Software. However, You may do so only on Your own behalf, and not on behalf of any Contributor. You must make it absolutely clear that any such warranty, support, indemnity, or liability obligation is offered by You alone, and You hereby agree to indemnify every Contributor for any liability incurred by such Contributor as a result of warranty, support, indemnity or liability terms You offer. You may include additional disclaimers of warranty and limitations of liability specific to any jurisdiction. 4. Inability to Comply Due to Statute or Regulation --------------------------------------------------- If it is impossible for You to comply with any of the terms of this License with respect to some or all of the Covered Software due to statute, judicial order, or regulation then You must: (a) comply with the terms of this License to the maximum extent possible; and (b) describe the limitations and the code they affect. Such description must be placed in a text file included with all distributions of the Covered Software under this License. Except to the extent prohibited by statute or regulation, such description must be sufficiently detailed for a recipient of ordinary skill to be able to understand it. 5. Termination -------------- 5.1. The rights granted under this License will terminate automatically if You fail to comply with any of its terms. 
However, if You become compliant, then the rights granted under this License from a particular Contributor are reinstated (a) provisionally, unless and until such Contributor explicitly and finally terminates Your grants, and (b) on an ongoing basis, if such Contributor fails to notify You of the non-compliance by some reasonable means prior to 60 days after You have come back into compliance. Moreover, Your grants from a particular Contributor are reinstated on an ongoing basis if such Contributor notifies You of the non-compliance by some reasonable means, this is the first time You have received notice of non-compliance with this License from such Contributor, and You become compliant prior to 30 days after Your receipt of the notice. 5.2. If You initiate litigation against any entity by asserting a patent infringement claim (excluding declaratory judgment actions, counter-claims, and cross-claims) alleging that a Contributor Version directly or indirectly infringes any patent, then the rights granted to You by any and all Contributors for the Covered Software under Section 2.1 of this License shall terminate. 5.3. In the event of termination under Sections 5.1 or 5.2 above, all end user license agreements (excluding distributors and resellers) which have been validly granted by You or Your distributors under this License prior to termination shall survive termination. ************************************************************************ * * * 6. Disclaimer of Warranty * * ------------------------- * * * * Covered Software is provided under this License on an "as is" * * basis, without warranty of any kind, either expressed, implied, or * * statutory, including, without limitation, warranties that the * * Covered Software is free of defects, merchantable, fit for a * * particular purpose or non-infringing. The entire risk as to the * * quality and performance of the Covered Software is with You. 
* * Should any Covered Software prove defective in any respect, You * * (not any Contributor) assume the cost of any necessary servicing, * * repair, or correction. This disclaimer of warranty constitutes an * * essential part of this License. No use of any Covered Software is * * authorized under this License except under this disclaimer. * * * ************************************************************************ ************************************************************************ * * * 7. Limitation of Liability * * -------------------------- * * * * Under no circumstances and under no legal theory, whether tort * * (including negligence), contract, or otherwise, shall any * * Contributor, or anyone who distributes Covered Software as * * permitted above, be liable to You for any direct, indirect, * * special, incidental, or consequential damages of any character * * including, without limitation, damages for lost profits, loss of * * goodwill, work stoppage, computer failure or malfunction, or any * * and all other commercial damages or losses, even if such party * * shall have been informed of the possibility of such damages. This * * limitation of liability shall not apply to liability for death or * * personal injury resulting from such party's negligence to the * * extent applicable law prohibits such limitation. Some * * jurisdictions do not allow the exclusion or limitation of * * incidental or consequential damages, so this exclusion and * * limitation may not apply to You. * * * ************************************************************************ 8. Litigation ------------- Any litigation relating to this License may be brought only in the courts of a jurisdiction where the defendant maintains its principal place of business and such litigation shall be governed by laws of that jurisdiction, without reference to its conflict-of-law provisions. Nothing in this Section shall prevent a party's ability to bring cross-claims or counter-claims. 9. 
Miscellaneous ---------------- This License represents the complete agreement concerning the subject matter hereof. If any provision of this License is held to be unenforceable, such provision shall be reformed only to the extent necessary to make it enforceable. Any law or regulation which provides that the language of a contract shall be construed against the drafter shall not be used to construe this License against a Contributor. 10. Versions of the License --------------------------- 10.1. New Versions Mozilla Foundation is the license steward. Except as provided in Section 10.3, no one other than the license steward has the right to modify or publish new versions of this License. Each version will be given a distinguishing version number. 10.2. Effect of New Versions You may distribute the Covered Software under the terms of the version of the License under which You originally received the Covered Software, or under the terms of any subsequent version published by the license steward. 10.3. Modified Versions If you create software not governed by this License, and you want to create a new license for such software, you may create and use a modified version of this License if you rename the license and remove any references to the name of the license steward (except to note that such modified license differs from this License). 10.4. Distributing Source Code Form that is Incompatible With Secondary Licenses If You choose to distribute Source Code Form that is Incompatible With Secondary Licenses under the terms of this version of the License, the notice described in Exhibit B of this License must be attached. Exhibit A - Source Code Form License Notice ------------------------------------------- This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. 
If it is not possible or desirable to put the notice in a particular file, then You may include the notice in a location (such as a LICENSE file in a relevant directory) where a recipient would be likely to look for such a notice. You may add additional accurate notices of copyright ownership. Exhibit B - "Incompatible With Secondary Licenses" Notice --------------------------------------------------------- This Source Code Form is "Incompatible With Secondary Licenses", as defined by the Mozilla Public License, v. 2.0. PK!H$TTmona-0.2.1.dist-info/WHEEL = 0 нR \C HCoo~ \B"|lchлYh|hzWٓ7}|v }PK!HXw mona-0.2.1.dist-info/METADATAV{o6_m1,vw YX"UR|ݑrg&MHϸ?`jI/ B+wWErNy.;cSm{]@\Εv3$HuS/;EQOI _\` TCe>`oDEl|D fx?GӜ[+PԐqWY6# 왪ct9l@>(*T%fqXfz F}`'eit 3`ՌAAsD:7z+SnլuH-_'6~F{EzfRTُd.a-3\[g\z/b#/絼bWԁu睴ΛkJI=ѧ; \C9BE6I;UD&.wo者}bqD1*%AzIoicFvd%8Gk6(4x:6˴\@z=@y4nYv8D~EDh匜.:D?,Pq8%/ͮEl~O^_Lǜ%o׆{5aAҢ*Kt4EYP7Pj+6#EXadI1J ŗ0!E)^Lߨ(z>>zr5d}me1K\BY'_K"XLbY`65tV (ʹ|$f 'WMJ\*i)D9aBIc`%^,6؃ד9-׽kȁ[˼cA1oz8--ei$ד-}8=4>kn6l~zh<ɹX.ľԩN q&=s\e(ϢʣtuqO+%X05˅ۅMs&9s|LOV:+?N}RExUU$A^vIO&NmG(,uXIr4 +6{$0zNp!^\rrv$2Y,QYqÇ` S8>R8]Ŕ?>퐘ΉWJț]r2CiJ3!/6.m#P3^O#6 ?y' )-ooX456 !8vQ嘩1X*_Q~Gć8nm{13@٪]TNZ)Sï5Z W^V~+Zkڇ?(Q8/mߛidža )h#l'UK^ !O&#u]1bwM:t\kGPK!Hz C- mona-0.2.1.dist-info/RECORDmַ|f@! H88az鞥g;oW]{^VehVO}vvm'+lju]Z(֕ˣ(w_{βKfdc e,o6(O",$@)g]k#*&Of-퇨Y&C{q=;'6n@K[K4v/Uc>B'%z!- Вv"E`~,p}`Q#BC1Ϭ> "6Ug|/L*Hn>{D1C$~; G %]S6$ *_NMS&:Ujb&k[ / sK?DRGLߧY|׌`]sv=p͏>^)RB!e"-[Tsk[{UИا:A9=X~C}qq.#C+!WhdR2۳fԐeU҈lGy_W;(1\/_sq5Vڤ6vr >ޔcٖdIxDw۹| w($?PU섟RVd!=·@$%K:,c& %ٟ^tZR,z)m< g mD$v4I~l{1[Α|:P^+Q]!|Sk/˸܁tJy2hq#]S:&D $^Mu;N^@Lpjv@X1xu!/NpXm!Se-JE$*2R2i2:cZX+_H!@Ƕc ~_SH]h}Hr`E@Ǵ4-R%h"-tN>f[7RiSI3{m۪.d*'U~aW4JmecH}7sNs/rZa.lNz"k#.x6!_^>Eɢrz`(F *'(܀CxZCW"yj\n5@?Q WQw||Z%"h_/Flyƶ^R:s`DƍD{QovȞ|aF~??~p቙"M;YL[\ĎQ]u5 |G9OJK\{/ CsRg:fܲ5@* *\rM؛<"tOurhcYABܪ ﳺwTnb,IS#iWii~Zp sg">$Dx6!7T[8ڝsں(q#~Ka>(LT]cBiq|C)Rh{-p?(a%{="~6"QSX98E,[I]Q 'sB_PK!Ĩ>nnmona/__init__.pyPK!į**mona/cli/__init__.pyPK!r  mona/cli/app.pyPK! r: mona/cli/cli.pyPK! 
&&&mona/cli/glob.pyPK!}] ] e*mona/cli/table.pyPK!4mona/errors.pyPK!9d 9mona/files.pyPK!~> > KMmona/futures.pyPK!2fH   Xmona/graph.pyPK!(%nmona/hashing/__init__.pyPK!إbRRomona/hashing/func.pyPK!))Smona/hashing/hashing.pyPK!W,, mona/json.pyPK!o  mona/pluggable.pyPK!^- yyLmona/plugins/__init__.pyPK!rjg*g*mona/plugins/cache.pyPK! >>mona/plugins/files.pyPK!h*mona/plugins/parallel.pyPK!NCmona/plugins/tmpdir.pyPK!lEEmona/rules/__init__.pyPK!Ұ>''Kmona/rules/dirtask.pyPK!s ,mona/rules/rules.pyPK!큐::mona/runners.pyPK!*-mona/sci/__init__.pyPK!FF\-mona/sci/aims/__init__.pyPK!F22-mona/sci/aims/aims.pyPK!{UQ>Hmona/sci/aims/aims.txPK!GvvOmona/sci/aims/dsl.pyPK!Ly y Smona/sci/aims/parse.pyPK!Nxs]mona/sci/atom-data.csvPK!0D0D>lmona/sci/geomlib.pyPK!u`mona/sci/tex.pyPK!۴==kmona/sessions.pyPK!qo/)) Mmona/tasks.pyPK!K Lmona/utils.pyPK!H#u(*%*&mona-0.2.1.dist-info/entry_points.txtPK!xFVAVA&mona-0.2.1.dist-info/LICENSEPK!H$TT%hmona-0.2.1.dist-info/WHEELPK!HXw hmona-0.2.1.dist-info/METADATAPK!Hz C- Dnmona-0.2.1.dist-info/RECORDPK)) u