PK!8&&reprobench/__init__.pyname = "reprobench" VERSION = "0.1.0" PK!| > $reprobench/runners/slurm/__init__.pyimport math import subprocess import sys from pathlib import Path import click from loguru import logger from reprobench.core.base import Runner from reprobench.core.bootstrap import bootstrap from reprobench.core.db import Run from reprobench.runners.base import BaseRunner from reprobench.utils import get_db_path, init_db, read_config from .utils import create_ranges, get_nodelist class SlurmRunner(BaseRunner): def __init__(self, config, **kwargs): super().__init__(config, **kwargs) self.port = kwargs.pop("port") self.num_workers = kwargs.pop("num_workers", None) def prepare(self): init_db(self.db_path) limits = self.config["limits"] num_jobs = Run.select(Run.id).where(Run.status < Run.DONE).count() jobs_per_worker = int(math.ceil(1.0 * num_jobs / self.num_workers)) time_limit_minutes = int(math.ceil(limits["time"] / 60.0)) self.cpu_count = limits.get("cores", 1) # @TODO improve this self.time_limit = 2 * time_limit_minutes * jobs_per_worker self.mem_limit = 2 * limits["memory"] def spawn_server(self): logger.info("Spawning server...") server_cmd = f"{sys.exec_prefix}/bin/reprobench -vvv server --database={self.db_path} --port={self.port}" server_submit_cmd = [ "sbatch", "--parsable", f"--time={self.time_limit}", f"--job-name={self.config['title']}-benchmark-server", f"--output={self.output_dir}/slurm-server.out", "--wrap", server_cmd, ] logger.debug(server_submit_cmd) self.server_job = subprocess.check_output(server_submit_cmd).decode().strip() logger.info("Waiting for the server to be assigned...") self.server_host = get_nodelist(self.server_job) logger.info(f"Server spawned at {self.server_host}, job id: {self.server_job}") self.server_address = f"tcp://{self.server_host}:{self.port}" def spawn_workers(self): logger.info("Spawning workers...") worker_cmd = f"{sys.exec_prefix}/bin/reprobench -vvv worker --host={self.server_host} --port={self.port}" worker_submit_cmd = [ "sbatch", "--parsable", f"--ntasks={self.num_workers}", f"--time={self.time_limit}", f"--mem={self.mem_limit}", f"--cpus-per-task={self.cpu_count}", f"--job-name={self.config['title']}-benchmark-worker", f"--output={self.output_dir}/slurm-worker.out", "--wrap", f"srun {worker_cmd}", ] logger.debug(worker_submit_cmd) self.worker_job = subprocess.check_output(worker_submit_cmd).decode().strip() logger.info(f"Workers job id: {self.worker_job}") @click.command("slurm") @click.option( "-o", "--output-dir", type=click.Path(file_okay=False, writable=True, resolve_path=True), default="./output", show_default=True, ) @click.option("--resume", is_flag=True, default=False) @click.option("-w", "--num-workers", type=int, required=True) @click.option("-p", "--port", default=31313, show_default=True) @click.argument("config", type=click.Path()) def cli(config, **kwargs): config = read_config(config) runner = SlurmRunner(config, **kwargs) runner.run() if __name__ == "__main__": cli() PK!w !reprobench/runners/slurm/utils.pyimport subprocess from itertools import tee, zip_longest # https://stackoverflow.com/a/3430312/9314778 def pairwise_longest(iterable): "variation of pairwise in http://docs.python.org/library/itertools.html#recipes" a, b = tee(iterable) next(b, None) return zip_longest(a, b) def takeuntil(predicate, iterable): """returns all elements before and including the one for which the predicate is true variation of http://docs.python.org/library/itertools.html#itertools.takewhile""" for x in iterable: yield x if predicate(x): break def get_range(it): "gets a range from a pairwise iterator" rng = list(takeuntil(lambda args: (args[1] is None) or (args[1] - args[0] > 1), it)) if rng: b, e = rng[0][0], rng[-1][0] return "%d-%d" % (b, e) if b != e else str(b) def create_ranges(zones): it = pairwise_longest(zones) return ",".join(iter(lambda: get_range(it), None)) def get_nodelist(job_step): """ Blocks until job step is assigned a node """ while True: cmd = ["sacct", "-n", "--parsable2", "-j", job_step, "-o", "NodeList"] output = subprocess.check_output(cmd) if len(output) > 0 and output != b"None assigned\n": return output.decode().strip() PK!ϗ?reprobench/task_sources/base.pyclass BaseTaskSource(object): def __init__(self, path=None, **kwargs): self.path = path def setup(self): return [] PK!11'reprobench/task_sources/doi/__init__.pyfrom reprobench.task_sources.url import UrlSource from reprobench.task_sources.doi.zenodo import ZenodoHandler, ZenodoSandboxHandler class DOISource(UrlSource): handlers = [ZenodoHandler, ZenodoSandboxHandler] def __init__(self, doi, **kwargs): super().__init__(**kwargs) self.doi = doi for handler in self.handlers: if handler.is_compatible(self.doi): self.urls = handler.get_urls(self.doi) break else: raise NotImplementedError(f"No handler for doi: {doi}") PK!RO#reprobench/task_sources/doi/base.pyclass BaseDOIHandler(object): @classmethod def is_compatible(cls, doi): return False @classmethod def get_urls(cls, doi): return [] PK!Sʺ%reprobench/task_sources/doi/zenodo.pyimport requests from reprobench.task_sources.doi.base import BaseDOIHandler class ZenodoHandler(BaseDOIHandler): doi_prefix = "10.5281/zenodo." api_url = "https://zenodo.org/api" @classmethod def is_compatible(cls, doi): return doi.startswith(cls.doi_prefix) @classmethod def get_urls(cls, doi): record_id = doi[len(cls.doi_prefix) :] # remove doi_prefix url = "{}/records/{}".format(cls.api_url, record_id) record = requests.get(url).json() return [file["links"]["self"] for file in record["files"]] class ZenodoSandboxHandler(ZenodoHandler): doi_prefix = "10.5072/zenodo." api_url = "https://sandbox.zenodo.org/api" PK!< reprobench/task_sources/local.pyfrom pathspec import PathSpec from pathlib import Path from .base import BaseTaskSource class LocalSource(BaseTaskSource): def __init__(self, path=None, patterns="", **kwargs): super().__init__(path) self.patterns = patterns def setup(self): spec = PathSpec.from_lines("gitwildmatch", self.patterns.splitlines()) matches = spec.match_tree(self.path) return map(lambda match: (Path(self.path) / match).resolve(), matches) PK!,assreprobench/task_sources/url.pyfrom pathlib import Path from zipfile import ZipFile from loguru import logger from reprobench.utils import download_file, extract_archives from .local import LocalSource class UrlSource(LocalSource): def __init__( self, urls=None, path=None, patterns="", skip_existing=True, extract_archives=True, **kwargs, ): super().__init__(path, patterns=patterns) self.urls = urls or [] self.extract_archives = extract_archives self.skip_existing = skip_existing def setup(self): root = Path(self.path) root.mkdir(parents=True, exist_ok=True) for url in self.urls: filename = url.split("/")[-1].split("?")[0] path = root / filename if not path.exists() or not self.skip_existing: logger.debug(f"Downloading {url} to {path}") download_file(url, path) else: logger.debug(f"Skipping already downloaded file {path}") if self.extract_archives: extract_archives(path) return super().setup() PK!reprobench/tools/base.pyPK!Rbreprobench/tools/executable.pyfrom pathlib import Path from loguru import logger from reprobench.core.base import Tool class ExecutableTool(Tool): name = "Basic Executable Tool" path = None prefix = "--" @classmethod def is_ready(cls): return True def get_arguments(self): return [f"{self.prefix}{key}={value}" for key, value in self.parameters.items()] def get_cmdline(self): return [self.path, *self.get_arguments()] def get_out_path(self): return Path(self.cwd) / "run.out" def get_err_path(self): return Path(self.cwd) / "run.err" def get_output(self): return self.get_out_path().read_bytes() def get_error(self): return self.get_err_path().read_bytes() def run(self, executor): logger.debug([*self.get_cmdline(), self.task]) executor.run( [*self.get_cmdline(), self.task], directory=self.cwd, out_path=self.get_out_path(), err_path=self.get_err_path(), ) PK!ؙ reprobench/utils.pyimport importlib import logging import os import re import signal import subprocess import tarfile import time import zipfile from pathlib import Path from shutil import which import msgpack import requests import strictyaml from playhouse.apsw_ext import APSWDatabase from tqdm import tqdm from reprobench.core.db import db from reprobench.core.schema import schema from reprobench.core.exceptions import ExecutableNotFoundError log = logging.getLogger(__name__) def find_executable(executable): path = which(executable) if path is None: raise ExecutableNotFoundError return path def silent_run(command): log.debug(f"Running: {command}") return subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) def import_class(path): module_path, tail = ".".join(path.split(".")[:-1]), path.split(".")[-1] module = importlib.import_module(module_path) return getattr(module, tail) def copyfileobj(fsrc, fdst, callback, length=16 * 1024): while True: buf = fsrc.read(length) if not buf: break fdst.write(buf) callback(len(buf)) def download_file(url, dest): r = requests.get(url, stream=True) with tqdm( total=int(r.headers.get("content-length", 0)), unit="B", unit_scale=True, unit_divisor=1024, ) as progress_bar: progress_bar.set_postfix(file=Path(dest).name, refresh=False) with open(dest, "wb") as f: copyfileobj(r.raw, f, progress_bar.update) ranged_numbers_re = re.compile(r"(?P\d+)\.\.(?P\d+)(\.\.(?P\d+))?") def is_range_str(range_str): return ranged_numbers_re.match(range_str) def str_to_range(range_str): matches = ranged_numbers_re.match(range_str).groupdict() start = int(matches["start"]) end = int(matches["end"]) if matches["step"]: return range(start, end, int(matches["step"])) return range(start, end) def encode_message(obj): return msgpack.packb(obj, use_bin_type=True) def decode_message(msg): return msgpack.unpackb(msg, raw=False) def send_event(socket, event_type, payload=None): """ Used in the worker with a DEALER socket """ socket.send_multipart([event_type, encode_message(payload)]) def recv_event(socket): """ Used in the SUB handler """ event_type, payload, address = socket.recv_multipart() return event_type, decode_message(payload), address def clean_up(): signal.signal(signal.SIGTERM, signal.SIG_IGN) os.killpg(os.getpgid(0), signal.SIGTERM) time.sleep(1) os.killpg(os.getpgid(0), signal.SIGKILL) def get_db_path(output_dir): return str((Path(output_dir) / f"benchmark.db").resolve()) def init_db(db_path): database = APSWDatabase(db_path) db.initialize(database) def read_config(config_path): with open(config_path, "r") as f: config_text = f.read() config = strictyaml.load(config_text, schema=schema).data return config def extract_zip(path, dest): if not dest.is_dir(): with zipfile.ZipFile(path, "r") as f: f.extractall(dest) def extract_tar(path, dest): if not dest.is_dir(): with tarfile.TarFile.open(path) as f: f.extractall(dest) def extract_archives(path): extract_path = Path(path).with_name(path.stem) if zipfile.is_zipfile(path): extract_zip(path, extract_path) elif tarfile.is_tarfile(path): extract_tar(path, extract_path) PK!HJ.:+reprobench-0.7.2.dist-info/entry_points.txtN+I/N.,()*J-(OJKΰE0r3s2PK!XB33"reprobench-0.7.2.dist-info/LICENSEMIT License Copyright (c) 2019 Rakha Kanz Kautsar Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!Hu)GTU reprobench-0.7.2.dist-info/WHEEL HM K-*ϳR03rOK-J,/R(O-)$qzd&Y)r$UV&UrPK!HS111#reprobench-0.7.2.dist-info/METADATAoO0Sd8ICctP*` 4!׹&^NYM|<].Ak}(Gn 3 Il꣎sS0e9r-.2p?YWKdsprmv>1s.(\٭sRKS R< ܤ<`fFjU?&6g5<TA +ELM3w4rna$!g9\Dbh8O u/JԿM# Z(YYx@$\[$DGqxR"n{c*H a%zk;/W/h ?X &,ڰ`ЙgSD>//`L1ܶ!!?}3 􅳯dl eƊʱnvv;q\j}61 I™K,=Ŗ'T04^5g_2Eo R~pr:;jD|#_tNqxh)rMQO7s+8/]})+X<=I^@Ƽ%"ݮ'(U.tc#,b2b5;heaO GVC(_!&|3B7f]Vk go^1<@(B|!_D^+riDݺ)1&ø㔚 4U\ru1:'ZqEçӆcR#(XF<!Ty[b~ vM-Y]rpOE-Yʜ'fM|V?c&-q6eߓX-o~nur_Oyd8-W#a6syoF BȀ/wޫڀ&#nC^ξ+@_2 xo-8{cցmc2G1zl_j4Sۦɱ݌bwnCŲZ? XU;Wkn.=:t&Xj57xcΔ3KVdfCL,kJ)CF bUo#OcQmctI.X_XRTѬəIv(@(~d *闬iJ=z(.f1)MkVT(RK eP PV ºqB^]˭yw!t")[{bGCu7̨M-qzn1ԫQ%[ڂuNUjQ= ylKfp뮻ι|9ֳsXu \o9LweOz.!,݄sP-ڔ; EǛH\=Vn0S0XEK81!./UAм`KF[!r¤¥&>b>i7|٫dyj% 4λMTP GK pWQo??F"| v01;a$̞Aw+ ߿.JXPf^a禹٬.^9#k#w{oϧ~Ugot ʧָU~vFS,V}bGgyPK!8&&reprobench/__init__.pyPK!| > $}reprobench/runners/slurm/__init__.pyPK!w !reprobench/runners/slurm/utils.pyPK!ϗ?ereprobench/task_sources/base.pyPK!11'.reprobench/task_sources/doi/__init__.pyPK!RO#reprobench/task_sources/doi/base.pyPK!Sʺ%reprobench/task_sources/doi/zenodo.pyPK!< reprobench/task_sources/local.pyPK!,assreprobench/task_sources/url.pyPK!Lreprobench/tools/base.pyPK!Rbreprobench/tools/executable.pyPK!ؙ reprobench/utils.pyPK!HJ.:+reprobench-0.7.2.dist-info/entry_points.txtPK!XB33"reprobench-0.7.2.dist-info/LICENSEPK!Hu)GTU reprobench-0.7.2.dist-info/WHEELPK!HS111##reprobench-0.7.2.dist-info/METADATAPK!H;Y !reprobench-0.7.2.dist-info/RECORDPK$$