PK!IDh**project_manager/__init__.pyfrom .main import cli __all__ = ['cli'] PK!+Y$project_manager/commands/__init__.pyfrom .build import main as build from .run import main as run from .gather import main as gather __all__ = ['build', 'run', 'gather'] PK! !project_manager/commands/build.pyimport os import copy import shutil import operator import functools import itertools from pprint import pprint import sh import anyconfig from tqdm import tqdm from ..utils import load_config, assign_to_dict, dict_to_keyset def main(config_path: str, dry: bool = False): config = load_config(config_path) base_config = anyconfig.load(config['base_config']) # if needed prepare environment if not dry: shutil.rmtree(config['working_dir'], ignore_errors=True) os.makedirs(config['working_dir']) exec_dir = os.getcwd() os.chdir(config['working_dir']) # setup and run schedule special_extra_keys = ['repetitions'] # these get handled individually config_schedule = [] for entry in config['config_parameters']: tmp = [] # handle possible pairing if 'paired' in entry: # sanity checks for e in entry['paired']: if len(e['values']) != len(entry['values']): raise RuntimeError( f'Invalid pairing for "{entry["key"]}" & "{e["key"]}"') # generate associations paired_data = [] for i in range(len(entry['values'])): case_values = [(c['key'], c['values'][i]) for c in entry['paired']] paired_data.append([{ 'key': key, 'value': val } for key, val in case_values]) else: paired_data = [None] * len(entry['values']) # add to schedule for val, pair in zip(entry['values'], paired_data): tmp.append(('config', entry['key'], val, pair)) config_schedule.append(tmp) extra_schedule = [[('extra', k, v, None) for v in vs] for k, vs in config['extra_parameters'].items() if k not in special_extra_keys] schedule = config_schedule + extra_schedule for spec in tqdm( itertools.product(*schedule), total=functools.reduce(operator.mul, [len(s) for s in schedule], 1), desc='Setting up environments' ): # create custom config cur_conf = copy.deepcopy(base_config) for t, k, v, p in spec: if t != 'config': continue # set main parameter assign_to_dict(cur_conf, k, v) # set potential paired parameters if p is not None: for entry in p: k, v = entry['key'], entry['value'] assign_to_dict(cur_conf, k, v) # assert subset relation (so only valid keys are used) cur_keys = dict_to_keyset(cur_conf) base_keys = dict_to_keyset(base_config) if cur_keys != base_keys: msg = 'Generated config is invalid.\n' only_cur = cur_keys - base_keys msg += 'Only in generated config:\n' for k in only_cur: msg += f' > {k}\n' raise RuntimeError(msg) # make spec sortable c = lambda x: '+'.join(x) if isinstance(x, list) else x # noqa: E731 c2 = lambda x: str(x).replace('/', '_') # noqa: E731 spec = [(t, c(k), v, p) for t, k, v, p in spec] # extract extra info extra_info = {k: v for t, k, v, p in sorted(spec) if t == 'extra'} repetition_count = config['extra_parameters']['repetitions'] for rep in range(repetition_count): # assemble index idx = ';'.join([f'{k}:{c2(v)}' for t, k, v, p in sorted(spec)]) rep_app = f';repetition:{rep+1}' if repetition_count > 1 else '' target_dir = f'run__{idx}{rep_app}' # abort if in dry run if dry: print(target_dir) pprint(cur_conf) pprint(extra_info) print() continue # setup environment if os.path.isdir( os.path.join(exec_dir, config['project_source']) ): shutil.copytree( os.path.join(exec_dir, config['project_source']), target_dir) else: sh.git.clone(config['project_source'], target_dir) if 'git_branch' in extra_info: sh.git.checkout(extra_info['git_branch'], _cwd=target_dir) conf_name = os.path.basename(config['base_config']) anyconfig.dump(cur_conf, f'{target_dir}/{conf_name}') for sym in config['symlinks']: sym_path = os.path.relpath(os.path.join( exec_dir, os.path.dirname(config_path), sym), start=target_dir) if not os.path.exists(os.path.join(target_dir, sym_path)): print(f'Cannot find "{sym_path}"') sym_base = os.path.basename(os.path.normpath(sym)) os.symlink( sym_path, os.path.join(target_dir, sym_base), target_is_directory=os.path.isdir(sym)) PK!|"project_manager/commands/gather.pyimport os import shutil from ..utils import load_config def main(config_path: str, output: str) -> None: config = load_config(config_path) if output is None: target_dir = os.path.join(config['working_dir'], 'aggregated_results') else: target_dir = output shutil.rmtree(target_dir, ignore_errors=True) os.makedirs(target_dir) # iterate over individual pipeline runs for entry in os.scandir(config['working_dir']): if not entry.name.startswith('run__'): continue print(entry.name) # iterate over requested result directories for dir_ in config['result_dirs']: for file_ in os.scandir( os.path.join(entry.path, dir_) ): print(f' > {file_.name}') # assemble new filename raw_file, ext = os.path.splitext(file_) idx = entry.name[3:] suf_file = os.path.basename(raw_file) + idx + ext # extract file os.makedirs(os.path.join(target_dir, dir_), exist_ok=True) shutil.copyfile( file_.path, os.path.join(target_dir, dir_, suf_file)) PK!~~project_manager/commands/run.pyimport os from tqdm import tqdm from ..utils import load_config def main(config_path: str, dry: bool): config = load_config(config_path) for entry in tqdm( os.scandir(config['working_dir']), total=len(os.listdir(config['working_dir'])) ): if not entry.name.startswith('run__'): continue # switch to correct directory print(entry.name) wd = os.getcwd() os.chdir(entry.path) # execute commands for cmd in config['exec_command']: print(f' > {cmd}') if not dry: os.system(cmd) os.chdir(wd) PK!j* 6 6 $project_manager/config_validation.pyimport sys from jsonschema import validators def extend_with_default(validator_class): validate_properties = validator_class.VALIDATORS['properties'] def set_defaults(validator, properties, instance, schema): for property, subschema in properties.items(): if 'default' in subschema: instance.setdefault(property, subschema['default']) for error in validate_properties( validator, properties, instance, schema, ): yield error return validators.extend( validator_class, {'properties': set_defaults}) def validate_config(data, schema): Validator = validators.validator_for(schema) DefaultValidator = extend_with_default(Validator) v = DefaultValidator(schema) if not v.is_valid(data): print('Invalid config:') for error in v.iter_errors(data): print(f' - {error.message}') print('Aborting...') sys.exit(-1) return data def main(data): schema = { 'definitions': { 'config_items': { 'type': 'object', 'properties': { 'key': {'type': ['string', 'array']}, 'values': {'type': 'array'}, 'paired': { 'type': 'array', 'items': {'$ref': '#/definitions/config_items'} } }, 'additionalProperties': False, 'required': ['key', 'values'] } }, 'type': 'object', 'properties': { 'project_source': {'type': 'string'}, 'working_dir': {'type': 'string'}, 'exec_command': { 'type': 'array', 'items': {'type': 'string'}, 'default': [] }, 'result_dirs': { 'type': 'array', 'items': {'type': 'string'}, 'default': [] }, 'symlinks': { 'type': 'array', 'items': {'type': 'string'}, 'default': [] }, 'base_config': {'type': 'string'}, 'config_parameters': { 'type': 'array', 'items': {'$ref': '#/definitions/config_items'} }, 'extra_parameters': { 'type': 'object', 'properties': { 'git_branch': { 'type': 'array' }, 'repetitions': { 'type': 'integer', 'default': 1 } }, 'additionalProperties': False, 'default': {'repetitions': 1} } }, 'additionalProperties': False, 'required': ['project_source', 'working_dir', 'base_config'] } return validate_config(data, schema) if __name__ == '__main__': print(main({ 'project_source': 'foo', 'working_dir': 'bar', 'base_config': 'baz' })) PK![m+project_manager/main.py""" Automatically span a matrix of configurations using the `build` command. Then execute each pipeline using `run`. And finally aggregate the obtained results using the `gather` command. """ import click @click.group() def cli() -> None: """Automate multi-config simulation runs.""" pass @cli.command(help='Setup environments.') @click.option( '--config', '-c', 'config_path', default='config.yaml', type=click.Path(exists=True, dir_okay=False), help='Config file to use.') @click.option( '--dry', '-d', default=False, is_flag=True, help='Conduct dry run.') def build(config_path: str, dry: bool) -> None: from .commands import build as build_cmd build_cmd(config_path, dry) @cli.command(help='Run simulations in each environment.') @click.option( '--config', '-c', 'config_path', default='config.yaml', type=click.Path(exists=True, dir_okay=False), help='Config file to use.') @click.option( '--dry', '-d', default=False, is_flag=True, help='Conduct dry run.') def run(config_path: str, dry: bool) -> None: from .commands import run as run_cmd run_cmd(config_path, dry) @cli.command(help='Gather results from each run.') @click.option( '--config', '-c', 'config_path', default='config.yaml', type=click.Path(exists=True, dir_okay=False), help='Config file to use.') @click.option( '--output', '-o', default=None, type=click.Path(exists=False, file_okay=False), help='Path to store aggregated results at.') def gather(config_path: str, output: str) -> None: from .commands import gather as gather_cmd gather_cmd(config_path, output) if __name__ == '__main__': cli() PK!!project_manager/tests/__init__.pyPK!5!project_manager/tests/config.yamlproject_source: dummy_project working_dir: tmp exec_command: - python3 run.py result_dirs: - results base_config: dummy_project/my_conf.yaml config_parameters: - key: message values: [A, B, null, D] PK!ĤL0project_manager/tests/dummy_project/my_conf.yamlmessage: 'this is important' PK!dd*project_manager/tests/dummy_project/run.pyimport os import yaml def main(): with open('my_conf.yaml') as fd: config = yaml.load(fd) msg = config['message'] os.makedirs('results') with open('results/data.txt', 'w') as fd: if msg is None: fd.write('special') else: fd.write(config['message']) if __name__ == '__main__': main() PK!/project_manager/tests/test_config_validation.pyimport copy import yaml from ..config_validation import main as validate def test_minimal_config(): data = { 'project_source': '/path/to/project/', 'working_dir': 'results/', 'base_config': '/path/to/project/config.yaml' } validate(data) assert data == { 'project_source': '/path/to/project/', 'working_dir': 'results/', 'base_config': '/path/to/project/config.yaml', 'extra_parameters': {'repetitions': 1}, 'exec_command': [], 'result_dirs': [], 'symlinks': [] } def test_maximal_config(): data = yaml.load(""" project_source: '/path/to/project/' working_dir: 'output/' exec_command: - snakemake -pr result_dirs: - images - results base_config: '/path/to/project/config.yaml' symlinks: - ../data/ config_parameters: - key: param1 values: [0, 1, 2, d] paired: - key: param2 values: [a, b, c, 6] - key: [nested, param3] values: ['a', 'b', 'c'] extra_parameters: git_branch: ['master'] repetitions: 1 """) orig_data = copy.deepcopy(data) validate(data) assert data == orig_data PK!)project_manager/tests/test_integration.pyimport os import shutil import yaml import pytest from click.testing import CliRunner from ..main import cli from ..commands import build def test_dummy(): root = os.path.join(os.getcwd(), 'project_manager', 'tests') runner = CliRunner() with runner.isolated_filesystem(): # setup environment root_iso = os.getcwd() shutil.copy(os.path.join(root, 'config.yaml'), 'config.yaml') shutil.copytree(os.path.join(root, 'dummy_project'), 'dummy_project') # run commands os.chdir(root_iso) result_build = runner.invoke(cli, ['build']) assert result_build.exit_code == 0 os.chdir(root_iso) result_run = runner.invoke(cli, ['run']) assert result_run.exit_code == 0 os.chdir(root_iso) result_gather = runner.invoke(cli, ['gather']) assert result_gather.exit_code == 0 # check output os.chdir(root_iso) all_data = [] for entry in os.scandir('tmp/aggregated_results/results/'): expected_data = entry.name.split('.')[0].split(':')[1] # handle special case (null/None in config) if expected_data == 'None': expected_data = 'special' with open(entry.path) as fd: data = fd.read() assert expected_data == data all_data.append(data) with open('config.yaml') as fd: config = yaml.load(fd) # only one entry in list, thus this must be the message expected_all_data = [v if v is not None else 'special' for v in config['config_parameters'][0]['values']] assert set(all_data) == set(expected_all_data) def test_key_misspelling(): runner = CliRunner() with runner.isolated_filesystem(): # setup environment with open('my_conf.yaml', 'w') as fd: fd.write(""" actual_key: 42 another_key: foo """) with open('config.yaml', 'w') as fd: fd.write(""" project_source: fubar working_dir: tmp base_config: my_conf.yaml config_parameters: - key: misspelled_key values: [invalid] - key: another_key values: [bar, baz, qux] """) # run commands with pytest.raises(RuntimeError, match='> misspelled_key'): build('config.yaml') def test_mismatching_key_pairing_name(): runner = CliRunner() with runner.isolated_filesystem(): # setup environment with open('my_conf.yaml', 'w') as fd: fd.write(""" my_key: 1 other_key: a """) with open('config.yaml', 'w') as fd: fd.write(""" project_source: fubar working_dir: tmp base_config: my_conf.yaml config_parameters: - key: my_key values: [2,3,4] paired: - key: other_key_wrong values: [b,c,d] """) # run commands with pytest.raises( RuntimeError, match='> other_key_wrong' ): build('config.yaml') def test_mismatching_key_pairing_length(): runner = CliRunner() with runner.isolated_filesystem(): # setup environment with open('my_conf.yaml', 'w') as fd: fd.write(""" my_key: 1 other_key: a """) with open('config.yaml', 'w') as fd: fd.write(""" project_source: fubar working_dir: tmp base_config: my_conf.yaml config_parameters: - key: my_key values: [2,3,4] paired: - key: other_key values: [b,c] """) # run commands with pytest.raises( RuntimeError, match='Invalid pairing for "my_key" & "other_key"' ): build('config.yaml') PK!project_manager/utils.pyimport operator import functools import yaml from .config_validation import main as validate def load_config(fname): with open(fname) as fd: config = yaml.load(fd) return validate(config) def get_by_keylist(root, items): return functools.reduce(operator.getitem, items, root) def set_by_keylist(root, items, value): get_by_keylist(root, items[:-1])[items[-1]] = value def assign_to_dict(dict_, key, value): if isinstance(key, list): set_by_keylist(dict_, key, value) else: dict_[key] = value def dict_to_keyset(d): all_keys = set() for k, v in d.items(): if isinstance(v, dict): cur = (k, dict_to_keyset(v)) else: cur = k all_keys.add(cur) return frozenset(all_keys) PK!H9&?,70project_manager-0.0.3.dist-info/entry_points.txtN+I/N.,()*(JM.MKLO-E[%drqPK!HڽTU%project_manager-0.0.3.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!H!(project_manager-0.0.3.dist-info/METADATATێH}ﯨ@$ hG̮if"'nӗ=q$d<]ԩ:]՗E)Hߢu| ZsOE} Oث4v9-TؠAH Y+<c/LiKso]e /L-lî Yv<Ń)6B͗_Uļ ?iѥ8̳=W9H29T x&]x2iB*PߋuƣݝIc܏=&1BwDؐ?Ꮣ鐏G2a2}|qvEMac0NQ}/8||8eI23"fۙ}suÙ)Bo.2es-XqiW']r1=CWXFў*H_w-ͭO>Yf;C*1oVªߵw%rSdeCiV+'vQORدQ'pJm"]__W{dMB7Ό:6 k3j¨g R}cuF٦}*^F 5k V!].YS41KPrTt^MD$@8l#h;#)K_JPg62DUY#m^P~K٘t c4z&L¼v l`{gBeژ!wNugFIz>h\ q&i M{G۞ا PK!HUYG&project_manager-0.0.3.dist-info/RECORDɎ:}? ]0$06` fL1i-)ݩ˲9m$*,iְmAkD?tξZ[potE(˧Y?ĸ[}Fw=YEx&1r}RyBpھ O0a1z잟` c7Pc[]-qB@hǾ&} ( [9bϕ VmZT`* vqƂK:lv\L9Nd[#k0gl,ͳ#,V;5@/)_\HUI0lY3ĉBW|ƚoA^Q."-!3 gWOӉXYn9~hꕏ).*4n?ȫ3Gk_X-QU~4,%Ng> zl#Gn%Ro/`0TwL-j.V>s0Z?Lc/˹>%MŠXQz.^(Az|;IUkSA){TTG+g;U8K-}&hc3 uzS|SFN^%wn99et,:C aU0օ{W`pOl< $RUTZr/QX[R ݪ"$eFx8Aֹ>1t=Eq SD U:Fh({ ayFOtF\FrU=Y 6WW-7%8&`9n:%ct#k5ŏsmH{[FPK!IDh**project_manager/__init__.pyPK!+Y$cproject_manager/commands/__init__.pyPK! !-project_manager/commands/build.pyPK!|"bproject_manager/commands/gather.pyPK!~~nproject_manager/commands/run.pyPK!j* 6 6 $)project_manager/config_validation.pyPK![m+)project_manager/main.pyPK!!X0project_manager/tests/__init__.pyPK!5!0project_manager/tests/config.yamlPK!ĤL01project_manager/tests/dummy_project/my_conf.yamlPK!dd*2project_manager/tests/dummy_project/run.pyPK!/3project_manager/tests/test_config_validation.pyPK!)8project_manager/tests/test_integration.pyPK!Gproject_manager/utils.pyPK!H9&?,70Jproject_manager-0.0.3.dist-info/entry_points.txtPK!HڽTU%xKproject_manager-0.0.3.dist-info/WHEELPK!H!(Lproject_manager-0.0.3.dist-info/METADATAPK!HUYG&TOproject_manager-0.0.3.dist-info/RECORDPKR