PK!@E H hb/cli.py"""Hash Brown CLI""" from concurrent.futures import ProcessPoolExecutor from glob import iglob from pathlib import Path from time import time from typing import Tuple import click from hb.main import Checksum def _shorten(error: Exception) -> str: return str(error).partition("] ")[-1] def _compute(algorithm: str, path: str, given: str) -> Tuple[int, str]: try: actual = Checksum(path).get(algorithm) except OSError as error: return (2, f"{click.style(_shorten(error), fg='yellow')}") else: if given: if actual == given: return (0, f"{Checksum.print(algorithm, path, given)} {click.style('OK', fg='green')}") return (1, f"{Checksum.print(algorithm, path, given)} {click.style(f'ACTUAL: {actual}', fg='red')}") return (0, Checksum.print(algorithm, path, actual)) def _algorithm_mode(algorithm: str, path: str, given: str, parallel: str) -> None: computed = 0 with ProcessPoolExecutor(max_workers=None if parallel else 1) as executor: for filename in iglob(path, recursive=True): if not Path(filename).is_file(): continue future = executor.submit(_compute, algorithm, filename, given) future.add_done_callback(lambda f: click.echo(f.result()[1])) computed += 1 if not computed: click.echo(f"No files matched the pattern: '{path}'") def _check_mode(path: str, parallel: str) -> None: def _cb(code: int, result: str) -> None: if code: click.echo(result) with ProcessPoolExecutor(max_workers=None if parallel else 1) as executor: for algorithm, filename, given in Checksum.parse(path): future = executor.submit(_compute, algorithm, filename, given) future.add_done_callback(lambda f: _cb(f.result()[0], f.result()[1])) @click.version_option(version=Checksum.VERSION) @click.command(context_settings={"help_option_names": ["-h", "--help"]}) @click.option("-a", "--algorithm", type=click.Choice(Checksum.SUPPORTED)) @click.option("-c", "--check", is_flag=True, help="Read checksums from a file.") @click.option("-g", "--given", help="See if the given checksum `TEXT` matches the computed checksum. (use with -a)") @click.option("-p", "--parallel", is_flag=True, default=False, help="Process files in parallel.") @click.option("-t", "--timer", is_flag=True, help="Display elapsed time in seconds.") @click.argument("file") def cli(**kwargs: str) -> None: """Hash Brown: Compute and verify checksums.""" start_time = time() try: if kwargs["algorithm"]: _algorithm_mode(kwargs["algorithm"], kwargs["file"], kwargs["given"], kwargs["parallel"]) elif kwargs["check"]: _check_mode(kwargs["file"], kwargs["parallel"]) else: pass except (OSError, ValueError) as error: click.echo(_shorten(error)) if kwargs["timer"]: click.echo(f"# {time() - start_time:.3f}s") if __name__ == "__main__": cli() PK!snn hb/main.py"""Hash Brown""" import hashlib import re import zlib from pathlib import Path from threading import Thread from time import sleep from typing import Dict, IO, List, Tuple class Checksum(): """Compute various checksums. Digest, hash, and checksum are all referred to as checksum for simplicity. """ SUPPORTED = ("blake2b", "blake2s", "md5", "sha1", "sha224", "sha256", "sha384", "sha512", "adler32", "crc32") VERSION = "1.3.0" @staticmethod def parse(path: str) -> List[Tuple[str, ...]]: """Parse lines from a checksum file.""" parsed_lines = [] with Path(path).open("r") as lines: for line in lines: line = line.strip() if not line or line[0] == "#": # skip blank lines and comments continue match = re.match(r"(\w+) \((.+)\) = (\w+)", line) if not match: raise ValueError(f"Bad line in checksum file: '{line}'") parsed_lines.append(tuple(match.group(1, 2, 3))) return parsed_lines @staticmethod def print(algorithm: str, path: str, checksum: str) -> str: """BSD style checksum output.""" return f"{algorithm} ({Path(path)}) = {checksum}" def __init__(self, path: str, threshold: int = 200) -> None: self._path = Path(path) self.checksums: Dict[str, str] = {} self.filesize = self._path.stat().st_size self.threshold = threshold def _progress(self, file: IO) -> None: def _p(file: IO) -> None: while not file.closed: print(f"{int(file.tell() / self.filesize * 100)}%", end="\r") sleep(0.2) if self.filesize > self.threshold * 1024 * 1024: Thread(target=_p, args=(file,)).start() def _hashlib_compute(self, name: str) -> str: result = hashlib.new(name) with self._path.open("rb") as lines: self._progress(lines) for line in lines: result.update(line) return result.hexdigest() def _zlib_compute(self, name: str) -> str: if name == "adler32": result = 1 update = zlib.adler32 elif name == "crc32": result = 0 update = zlib.crc32 with self._path.open("rb") as lines: self._progress(lines) for line in lines: result = update(line, result) return hex(result)[2:].zfill(8) def compute(self, algorithm: str) -> str: """Compute a checksum.""" if algorithm not in Checksum.SUPPORTED: raise ValueError(f"Unsupported algorithm: '{algorithm}'") elif algorithm in ["adler32", "crc32"]: result = self._zlib_compute(algorithm) else: result = self._hashlib_compute(algorithm) self.checksums[algorithm] = result return result def get(self, algorithm: str) -> str: """Same as `compute` but does not recalculate the checksum if it is already known.""" if algorithm in self.checksums: return self.checksums[algorithm] return self.compute(algorithm) @property def path(self) -> str: """The path to calculate.""" return str(self._path) @path.setter def path(self, path: str) -> None: """Set new path and clear the checksums dictionary.""" self._path = Path(path) self.checksums = {} @property def blake2b(self) -> str: """blake2b""" return self.get("blake2b") @property def blake2s(self) -> str: """blake2s""" return self.get("blake2s") @property def md5(self) -> str: """md5""" return self.get("md5") @property def sha1(self) -> str: """sha1""" return self.get("sha1") @property def sha224(self) -> str: """sha224""" return self.get("sha224") @property def sha256(self) -> str: """sha256""" return self.get("sha256") @property def sha384(self) -> str: """sha384""" return self.get("sha384") @property def sha512(self) -> str: """sha512""" return self.get("sha512") @property def adler32(self) -> str: """adler32""" return self.get("adler32") @property def crc32(self) -> str: """crc32""" return self.get("crc32") PK!H!!#hb-1.3.0.dist-info/entry_points.txtN+I/N.,()HHKɴb..PK!r++hb-1.3.0.dist-info/LICENSEMIT License Copyright (c) 2018 Ching Chow Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HlŃTThb-1.3.0.dist-info/WHEEL A н#J@Z|Jmqvh&#hڭw!Ѭ"J˫( } %PK!Hd hb-1.3.0.dist-info/METADATAVmo6_!lŲkд t+P0E$6;ʯI[2sǻ{zrjZrL||I8վx=gc2!Zܕ!YTՄQI(3r`)U,1w0yxe9Y-չVws)/nwS5- hsIӾ3iL1Z:Uw% *5fdvw>?AK_^|[{JH ,@Ґd<&!q#y*[(bJ#EiɤJv;[jBBlF ]IUj,6ǏL|kzj2JZi[\1lkմ(%-{5i+jG7M;~6I@Zл;*bJnӁu3ЬAMJ ƽYVV6""+9_~250zkWhߋӋwmNu.@΋(Tbh:3ŁjnRӒ#bUy7<UyZ"V[ְEAl6켥^^_b`8t`!8(6ABG y A^!Eo 4fQ/FҞ*e!7M"$c+@jD5AD D ;S$D"=M2ZꦪTm]J-x o0 %)sX*lqP}g%2* rSp Ӳj j"ץ2SxjHz ~l+ya{ X%ԁj_Y6M۶m[sڮ盧аS s o' ">8d1`G|%\;[h*Z=)0z (nXS# tI@J۲!2h\ |TSnj~҆h?]~[PPKq'g9NE0T3~d?"wV%M?x׺x]~sUie4(D}MeeBުSF[1Lmxbɓ]^WƄpuF bJMié=oFuK:/!ػh9w^; {: 2>F54s`HEpU-+."9(GOp_SYOw?o]f K3*E/NPK!@E H hb/cli.pyPK!snn hb/main.pyPK!H!!#hb-1.3.0.dist-info/entry_points.txtPK!r++hb-1.3.0.dist-info/LICENSEPK!HlŃTTZ"hb-1.3.0.dist-info/WHEELPK!Hd "hb-1.3.0.dist-info/METADATAPK!Hb#)_U(hb-1.3.0.dist-info/RECORDPK *