PK!x쬦 hb/cli.py"""Hash Brown CLI""" from concurrent.futures import ProcessPoolExecutor from glob import iglob from pathlib import Path from time import time from typing import Any, Tuple import click from hb.main import Checksum def _shorten(error: Exception) -> str: msg = str(error) return msg.partition("] ")[-1] if "] " in msg else msg def _compute(algorithm: str, path: str, given: str) -> Tuple[int, str]: try: actual = Checksum(path).get(algorithm) except OSError as error: return (2, f"{click.style(_shorten(error), fg='yellow')}") else: if given: if actual == given: return (0, f"{Checksum.print(algorithm, path, given)} {click.style('OK', fg='green')}") return (1, f"{Checksum.print(algorithm, path, given)} {click.style(f'BAD', fg='red')}") return (0, Checksum.print(algorithm, path, actual)) def _algorithm_mode(algorithm: str, path: str, given: str, parallel: bool) -> None: computed = 0 with ProcessPoolExecutor(max_workers=None if parallel else 1) as executor: for filename in iglob(path, recursive=True): if not Path(filename).is_file(): continue future = executor.submit(_compute, algorithm, filename, given) future.add_done_callback(lambda f: click.echo(f.result()[1])) computed += 1 if not computed: click.echo(f"No files matched the pattern: '{path}'") def _check_mode(path: str, quiet: bool, parallel: bool) -> None: def _cb(code: int, result: str) -> None: if not quiet: click.echo(result) elif code: click.echo(result) with ProcessPoolExecutor(max_workers=None if parallel else 1) as executor: for algorithm, filename, given in Checksum.parse(path): future = executor.submit(_compute, algorithm, filename, given) future.add_done_callback(lambda f: _cb(f.result()[0], f.result()[1])) @click.version_option(version=Checksum.VERSION) @click.command(context_settings={"help_option_names": ["-h", "--help"]}) @click.option("-a", "--algorithm", type=click.Choice(Checksum.SUPPORTED)) @click.option("-c", "--check", is_flag=True, help="Read checksums from a file.") @click.option("-g", "--given", help="See if the given checksum `TEXT` matches the computed checksum. (use with -a)") @click.option("-p", "--parallel", is_flag=True, default=False, help="Process files in parallel.") @click.option("-q", "--quiet", is_flag=True, default=False, help="Hide results that are OK. (use with -c)") @click.option("-t", "--timer", is_flag=True, help="Display elapsed time in seconds.") @click.argument("path") def cli(**kwargs: Any) -> None: """Hash Brown: Compute and verify checksums.""" start_time = time() try: if kwargs["algorithm"]: _algorithm_mode(**{k: v for k, v in kwargs.items() if k in ["algorithm", "path", "given", "parallel"]}) elif kwargs["check"]: _check_mode(**{k: v for k, v in kwargs.items() if k in ["path", "quiet", "parallel"]}) else: pass except (OSError, ValueError) as error: click.echo(_shorten(error)) if kwargs["timer"]: click.echo(f"# {time() - start_time:.3f}s") if __name__ == "__main__": cli() PK!Hy> hb/main.py"""Hash Brown""" from contextlib import contextmanager from pathlib import Path from sys import stderr from threading import Thread from time import sleep from typing import Dict, Generator, IO, List, Tuple import hashlib import re import zlib class Checksum(): """Compute various checksums. Digest, hash, and checksum are all referred to as checksum for simplicity. """ SUPPORTED = ("blake2b", "blake2s", "md5", "sha1", "sha224", "sha256", "sha384", "sha512", "adler32", "crc32") VERSION = "1.4.2" @staticmethod def parse(path: str) -> List[Tuple[str, ...]]: """Parse lines from a checksum file.""" parsed_lines = [] with Path(path).open("r") as lines: for line in lines: line = line.strip() if not line or line[0] == "#": # skip blank lines and comments continue match = re.match(r"(\w+) \((.+)\) = (\w+)", line) if not match: raise ValueError(f"Bad line in checksum file: '{line}'") parsed_lines.append(tuple(match.group(1, 2, 3))) return parsed_lines @staticmethod def print(algorithm: str, path: str, checksum: str) -> str: """BSD style checksum output.""" return f"{algorithm} ({Path(path)}) = {checksum}" def __init__(self, path: str, threshold: int = 200) -> None: self._path = Path(path) self.checksums: Dict[str, str] = {} self.filesize = self._path.stat().st_size self.threshold = threshold @contextmanager def _progress_open(self) -> Generator: def _p(file: IO) -> None: while not file.closed: print(f"{int(file.tell() / self.filesize * 100)}%", end="\r", file=stderr) sleep(0.2) print(" ", end="\r", file=stderr) # clear the progress display with open(self._path, "rb") as lines: thread = Thread(target=_p, args=(lines,)) if self.filesize > self.threshold * 1024 * 1024: thread.start() yield lines if thread.is_alive(): thread.join() def _hashlib_compute(self, name: str) -> str: result = hashlib.new(name) with self._progress_open() as lines: for line in lines: result.update(line) return result.hexdigest() def _zlib_compute(self, name: str) -> str: if name == "adler32": result = 1 update = zlib.adler32 elif name == "crc32": result = 0 update = zlib.crc32 with self._progress_open() as lines: for line in lines: result = update(line, result) return hex(result)[2:].zfill(8) def compute(self, algorithm: str) -> str: """Compute a checksum.""" if algorithm not in Checksum.SUPPORTED: raise ValueError(f"Unsupported algorithm: '{algorithm}'") elif algorithm in ["adler32", "crc32"]: result = self._zlib_compute(algorithm) else: result = self._hashlib_compute(algorithm) self.checksums[algorithm] = result return result def get(self, algorithm: str) -> str: """Same as `compute` but does not recalculate the checksum if it is already known.""" if algorithm in self.checksums: return self.checksums[algorithm] return self.compute(algorithm) @property def path(self) -> str: """The path to calculate.""" return str(self._path) @path.setter def path(self, path: str) -> None: """Set new path and clear the checksums dictionary.""" self._path = Path(path) self.checksums = {} @property def blake2b(self) -> str: """blake2b""" return self.get("blake2b") @property def blake2s(self) -> str: """blake2s""" return self.get("blake2s") @property def md5(self) -> str: """md5""" return self.get("md5") @property def sha1(self) -> str: """sha1""" return self.get("sha1") @property def sha224(self) -> str: """sha224""" return self.get("sha224") @property def sha256(self) -> str: """sha256""" return self.get("sha256") @property def sha384(self) -> str: """sha384""" return self.get("sha384") @property def sha512(self) -> str: """sha512""" return self.get("sha512") @property def adler32(self) -> str: """adler32""" return self.get("adler32") @property def crc32(self) -> str: """crc32""" return self.get("crc32") PK!H!!#hb-1.4.2.dist-info/entry_points.txtN+I/N.,()HHKɴb..PK!r++hb-1.4.2.dist-info/LICENSEMIT License Copyright (c) 2018 Ching Chow Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!H\TThb-1.4.2.dist-info/WHEEL 1 0 нR \I$ơ7.ZON `h6oi14m,b4>4ɛpK>X;baP>PK!HDjƩ hb-1.4.2.dist-info/METADATAWn8}Wp}H Kulnmд t@Q5P*RN w(Sg@, je9"ӳ>F$h Ϛ4EAŋJaGh.TքQA5/3r`)c͌֕uy;e9Y=U}R˻Һ J7:Qb'(ɑ)rڳzDFrȆra80p;3ƾu ?^e=~MXPxOaBVL4Ս" MN0;гrkYPČeyL !M¡d`K籓P%WZ>-hsr\Uʨ jzr۷7X.'ϣ8iy;, b^N&/_߁_2iQ-s;j_;_fD;Yqn ybv3t?uAmZpvKEy3pßRHt50 ?__`I֘˲RJ*y8!yeX$ھWXumbp+bXduk&`|`e#gke)=pr"9|6[23-ŀ^ݸ%qEuSʻ1M20^X5, X2V+(KݎE2t/g&gIj^Ǐ|4㎬n|Dt-뷏+lqJS!,k:Z_ gEgY 1ȔPrH#6]L tڟ߇CzOt'&Az)& 'i%QIRPQAgPW<6;#Q] r]G?A/2]oݞa? iHx !'ǧ[K7XH 4!i-}g[|~3fU+H7(G 7p18uc,- L"{ԧ}E$X3t*ht%QzA0̍Х@M#U4G!iC &*m1j T(BL;J1/F l)y'dDI ӕdadd0L_O[5:RN,F{L4rc dJa,La}V ="l!ME& 5aT͗#웇?hV=D@{fͲmI}Miڎeg/7E?0 i OM^[쟘q! 0X؇]o*Zc Zf ax;{u6v/?mt4/ޫJ9A+e[Nje۳ſOa6+$_*W(چPK!HQRhb-1.4.2.dist-info/RECORDmr0@}G\t!"J ">7 ` 4_7tM({Og,\lI`2^%\'~>ύ{6SТ[^L?}ݺ:ڍLYS+WsD6^|Ckb;sQS U~N t`3ɾ ON8w"vQ8Tg`ݖwq?^gƪis5B{X˴t:B4qŶc(v|#-(( (y=;qQE׳n3#ȑ?PK!x쬦 hb/cli.pyPK!Hy>  hb/main.pyPK!H!!#hb-1.4.2.dist-info/entry_points.txtPK!r++ hb-1.4.2.dist-info/LICENSEPK!H\TTv$hb-1.4.2.dist-info/WHEELPK!HDjƩ %hb-1.4.2.dist-info/METADATAPK!HQR*hb-1.4.2.dist-info/RECORDPKk,