PK!4 Everyone is permitted to copy and distribute verbatim or modified copies of this license document, and changing it is allowed as long as the name is changed. DO WHAT THE FUCK YOU WANT TO BUT IT'S NOT MY FAULT PUBLIC LICENSE TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION 0. You just DO WHAT THE FUCK YOU WANT TO. 1. Do not hold the author(s), creator(s), developer(s) or distributor(s) liable for anything that happens or goes wrong with your use of the work. PK!simple_test_process/__init__.pyPK!q==simple_test_process/__main__.pyfrom .parseArgs import parseArgs from .runProcess import runProcess import sys def printErr(msg): print(msg, file=sys.stderr) kwargs = parseArgs(*sys.argv[1:]).__dict__ result = runProcess(**kwargs) if result.stdout: print(result.stdout) if result.stderr: printErr(result.stderr) exit(result.code) PK!#.,,#simple_test_process/fns/__init__.pyfrom .all_ import all_ from .any_ import any_ from .appendOne import appendOne from .applyOneTo import applyOneTo from .assign import assign from .discardFirst import discardFirst from .discardWhen import discardWhen from .endsWith import endsWith from .forEach import forEach from .getAttributeKeys import getAttributeKeys from .getListOfCollectionKeys import getListOfCollectionKeys from .getListOfCollectionValues import getListOfCollectionValues from .iif import iif from .invoke import invoke from .invokeAttr import invokeAttr from .isEmpty import isEmpty from .isLaden import isLaden from .joinWith import joinWith from .keepWhen import keepWhen from .map_ import map_ from .noop import noop from .passThrough import passThrough from .prependStr import prependStr from .raise_ import raise_ from .split import split from .toWrittenList import toWrittenList __all__ = [ "all_", "any_", "appendOne", "applyOneTo", "assign", "discardFirst", "discardWhen", "endsWith", "forEach", "getAttributeKeys", "getListOfCollectionKeys", "getListOfCollectionValues", "iif", "invoke", "invokeAttr", "isEmpty", "isLaden", "joinWith", "keepWhen", "map_", "noop", "passThrough", "prependStr", "raise_", "split", "toWrittenList", ] PK!]simple_test_process/fns/all_.py# ------- # # Imports # # ------- # from types import SimpleNamespace from .internal.makeGenericCallFn import makeGenericCallFn from .internal.getTypedResult import getTypedResult from .decorators.argIsCallable import argIsCallable # ---- # # Main # # ---- # @argIsCallable def all_(predicate): fnName = all.__name__ callPredicate = makeGenericCallFn(predicate, 3, fnName) def all_inner(collection): typedAll = getTypedResult(collection, typeToAll, fnName) return typedAll(callPredicate, collection) return all_inner # ------- # # Helpers # # ------- # def all_list(callPredicate, aList): for idx, el in enumerate(aList): if not callPredicate(el, idx, aList): return False return True def all_simpleNamespace(callPredicate, aSimpleNamespace): for key, val in aSimpleNamespace.__dict__.items(): if not callPredicate(val, key, aSimpleNamespace): return False return True typeToAll = {list: all_list, SimpleNamespace: all_simpleNamespace} PK!v_simple_test_process/fns/any_.py# ------- # # Imports # # ------- # from .internal.makeGenericCallFn import makeGenericCallFn from .internal.getTypedResult import getTypedResult from .decorators.argIsCallable import argIsCallable # ---- # # Main # # ---- # @argIsCallable def any_(predicate): fnName = any.__name__ callPredicate = makeGenericCallFn(predicate, 3, fnName) def any_inner(collection): typedAny = getTypedResult(collection, typeToAny, fnName) return typedAny(callPredicate, collection) return any_inner # ------- # # Helpers # # ------- # def any_list(callPredicate, aList): for idx, el in enumerate(aList): if callPredicate(el, idx, aList): return True return False typeToAny = {list: any_list} PK!*>B$simple_test_process/fns/appendOne.py# ------- # # Imports # # ------- # from .internal.getTypedResult import getTypedResult # ---- # # Main # # ---- # def appendOne(el): def appendOne_inner(collection): typedAppendOne = getTypedResult( collection, typeToAppendOne, appendOne.__name__ ) return typedAppendOne(el, collection) return appendOne_inner # ------- # # Helpers # # ------- # def appendOne_list(el, aList): return aList + [el] typeToAppendOne = {list: appendOne_list} PK!G7`%simple_test_process/fns/applyOneTo.pyfrom .decorators.argIsCallable import argIsCallable @argIsCallable def applyOneTo(callable): def applyOneTo_inner(arg): return callable(arg) return applyOneTo_inner PK!: A!simple_test_process/fns/assign.py# ------- # # Imports # # ------- # from .internal.getTypedResult import getTypedResult from types import SimpleNamespace from copy import copy # ---- # # Main # # ---- # def assign(collection): fnName = assign.__name__ assignFn = getTypedResult(collection, typeToAssign, fnName) return assignFn(collection) # ------- # # Helpers # # ------- # def assign_simpleNamespace(primary): def assign_simpleNamespace_inner(secondary): result = copy(primary) for k, v in secondary.__dict__.items(): if k not in result.__dict__: setattr(result, k, v) return result return assign_simpleNamespace_inner def assign_dict(primary): def assign_dict_inner(secondary): result = copy(primary) for k, v in secondary.items(): if k not in result: result[k] = v return result return assign_dict_inner typeToAssign = {dict: assign_dict, SimpleNamespace: assign_simpleNamespace} PK!.simple_test_process/fns/decorators/__init__.pyPK![`)gEE3simple_test_process/fns/decorators/argIsCallable.pyfrom inspect import signature import wrapt @wrapt.decorator def argIsCallable(fn, _instance, args, kwargs): if not callable(args[0]): argName = list(signature(fn).parameters)[0] fnName = fn.__name__ raise ValueError(f"{fnName} requires {argName} to be callable") return fn(*args, **kwargs) PK!waII0simple_test_process/fns/decorators/argIsClass.pyfrom inspect import isclass, signature import wrapt @wrapt.decorator def argIsClass(fn, _instance, args, kwargs): if not isclass(args[0]): argName = list(signature(fn).parameters)[0] fnName = fn.__name__ raise ValueError(f"{fnName} requires {argName} to be a class") return fn(*args, **kwargs) PK!dZT3simple_test_process/fns/decorators/argIsInstance.pyfrom inspect import signature from ..internal.raise_ import raise_ import wrapt def argIsInstance(aType, fnName=None): @wrapt.decorator def wrapper(fn, _instance, args, kwargs): nonlocal fnName if not isinstance(args[0], aType): argName = list(signature(fn).parameters)[0] fnName = fnName or fn.__name__ typePassed = type(args[0]) typeName = aType.__name__ raise_( ValueError, f""" {fnName} requires {argName} to be an instance of {typeName} type passed: {typePassed.__name__} """, ) return fn(*args, **kwargs) return wrapper PK!5simple_test_process/fns/decorators/argIsListOfType.pyfrom ordered_set import OrderedSet import wrapt from ..internal.discardWhen import discardWhen from ..internal.get import get from ..internal.getArgName import getArgName from ..internal.isLaden import isLaden from ..internal.isType import isType from ..internal.joinWith import joinWith from ..internal.map_ import map_ from ..internal.passThrough import passThrough from ..internal.raise_ import raise_ from ..internal.sort import sort from ..internal.toType import toType def argIsListOfType(aType): @wrapt.decorator def wrapper(fn, _instance, args, kwargs): typePassed = type(args[0]) fnName = fn.__name__ typeName = aType.__name__ if typePassed is not list: argName = getArgName(fn) raise_( ValueError, f"""\ {fnName} requires {argName} to have the type list type passed: {typePassed.__name__} """, ) invalidTypes = discardWhen(isType(aType))(args[0]) if isLaden(invalidTypes): argName = getArgName(fn) invalidTypeNames = passThrough( invalidTypes, [ map_(toType), OrderedSet, list, map_(get("__name__")), sort, joinWith(", "), ], ) raise_( ValueError, f"""\ {fnName} requires {argName} to be a list of {typeName} invalid types passed: {invalidTypeNames} """, ) return fn(*args, **kwargs) return wrapper PK!E/simple_test_process/fns/decorators/argIsType.pyfrom inspect import signature from ..internal.raise_ import raise_ import wrapt def argIsType(aType): @wrapt.decorator def wrapper(fn, _instance, args, kwargs): typePassed = type(args[0]) if typePassed is not aType: argName = list(signature(fn).parameters)[0] fnName = fn.__name__ typeName = aType.__name__ raise_( ValueError, f""" {fnName} requires {argName} to have the type {typeName} type passed: {typePassed.__name__} """, ) return fn(*args, **kwargs) return wrapper PK!ǥ&&'simple_test_process/fns/discardFirst.py# ------- # # Imports # # ------- # from .internal.getTypedResult import getTypedResult # ---- # # Main # # ---- # def discardFirst(n): fnName = discardFirst.__name__ def discardFirst_inner(collection): discardFn = getTypedResult(collection, typeToDiscardFirst, fnName) return discardFn(n, collection) return discardFirst_inner # ------- # # Helpers # # ------- # def discardFirst_viaSlice(n, sliceAble): return sliceAble[n:] typeToDiscardFirst = {list: discardFirst_viaSlice, str: discardFirst_viaSlice} PK!| ;;&simple_test_process/fns/discardWhen.pyfrom .internal.discardWhen import discardWhen # noqa f401 PK!t8#simple_test_process/fns/endsWith.pyfrom .decorators.argIsType import argIsType @argIsType(str) def endsWith(prefix): @argIsType(str) def endsWith_inner(fullStr): return fullStr.endswith(prefix) return endsWith_inner PK!`c??"simple_test_process/fns/forEach.py# ------- # # Imports # # ------- # from types import SimpleNamespace from .internal.makeGenericCallFn import makeGenericCallFn from .internal.getTypedResult import getTypedResult from .decorators.argIsCallable import argIsCallable # ---- # # Main # # ---- # @argIsCallable def forEach(fn): fnName = forEach.__name__ callFn = makeGenericCallFn(fn, 3, fnName) def forEach_inner(collection): typedForEach = getTypedResult(collection, typeToForEach, fnName) return typedForEach(callFn, collection) return forEach_inner # ------- # # Helpers # # ------- # def forEach_dict(callFn, aDict): for key, val in aDict.items(): callFn(val, key, aDict) return aDict def forEach_list(callFn, aList): for idx, el in enumerate(aList): callFn(el, idx, aList) return aList def forEach_simpleNamespace(callFn, aSimpleNamespace): forEach_dict(callFn, aSimpleNamespace.__dict__) return aSimpleNamespace typeToForEach = { dict: forEach_dict, list: forEach_list, SimpleNamespace: forEach_simpleNamespace, } PK!FF+simple_test_process/fns/getAttributeKeys.pydef getAttributeKeys(something): return something.__dict__.keys() PK!$2simple_test_process/fns/getListOfCollectionKeys.py# ------- # # Imports # # ------- # from types import SimpleNamespace from .internal.getTypedResult import getTypedResult # ---- # # Main # # ---- # def getListOfCollectionKeys(collection): fnName = getListOfCollectionKeys.__name__ typedKeys = getTypedResult(collection, typeToKeys, fnName) return typedKeys(collection) # ------- # # Helpers # # ------- # def getListOfCollectionKeys_dict(aDict): return list(aDict.keys()) def getListOfCollectionKeys_simpleNamespace(aSimpleNamespace): return list(aSimpleNamespace.__dict__.keys()) typeToKeys = { dict: getListOfCollectionKeys_dict, SimpleNamespace: getListOfCollectionKeys_simpleNamespace, } PK!ȇ#4simple_test_process/fns/getListOfCollectionValues.py# ------- # # Imports # # ------- # from types import SimpleNamespace from .internal.getTypedResult import getTypedResult # ---- # # Main # # ---- # def getListOfCollectionValues(collection): fnName = getListOfCollectionValues.__name__ typedValues = getTypedResult(collection, typeToValues, fnName) return typedValues(collection) # ------- # # Helpers # # ------- # def getListOfCollectionValues_dict(aDict): return list(aDict.values()) def getListOfCollectionValues_simpleNamespace(aSimpleNamespace): return list(aSimpleNamespace.__dict__.values()) typeToValues = { dict: getListOfCollectionValues_dict, SimpleNamespace: getListOfCollectionValues_simpleNamespace, } PK!R++simple_test_process/fns/iif.pyfrom .internal.iif import iif # noqa f401 PK!B u/simple_test_process/fns/internal/NotCallable.pyfrom types import SimpleNamespace from .mAssignToSelf import mAssignToSelf from .raise_ import raise_ import os class NotCallable(SimpleNamespace): def __init__(self, fnName, **typeToFn): self._fnName = fnName self._typeToFn = typeToFn mAssignToSelf(typeToFn, self) def __call__(self, *args, **kwargs): availableKeys = list(self._typeToFn.keys()) fnName = self._fnName raise_( TypeError, f""" The utility '{fnName}' is not callable because it needs to know what to return in the case of an empty list. example usage: {fnName}.{availableKeys[0]}([...]) available keys: {os.linesep.join(availableKeys)} """, ) PK!,simple_test_process/fns/internal/__init__.pyPK!F/simple_test_process/fns/internal/discardWhen.py# ------- # # Imports # # ------- # from .getTypedResult import getTypedResult from .makeGenericCallFn import makeGenericCallFn from ..decorators.argIsCallable import argIsCallable # ---- # # Main # # ---- # @argIsCallable def discardWhen(predicate): fnName = discardWhen.__name__ shouldDiscard = makeGenericCallFn(predicate, 3, fnName) def discardWhen_inner(collection): typedDiscardWhen = getTypedResult(collection, typeToDiscardWhen, fnName) return typedDiscardWhen(shouldDiscard, collection) return discardWhen_inner # ------- # # Helpers # # ------- # def discardWhen_list(shouldDiscard, aList): result = [] for idx, el in enumerate(aList): if not shouldDiscard(el, idx, aList): result.append(el) return result def discardWhen_dict(shouldDiscard, aDict): result = {} for key, val in aDict.items(): if shouldDiscard(val, key, aDict): result[key] = val return result typeToDiscardWhen = {list: discardWhen_list, dict: discardWhen_dict} PK!ss'simple_test_process/fns/internal/get.pydef get(attrName): def get_inner(something): return getattr(something, attrName) return get_inner PK!)jj.simple_test_process/fns/internal/getArgName.pyfrom inspect import signature def getArgName(fn, idx=0): return list(signature(fn).parameters)[idx] PK!X2simple_test_process/fns/internal/getFnSignature.pyfrom inspect import signature from .raise_ import raise_ def getFnSignature(fn, callerName): try: return signature(fn) except Exception as e: raise_( ValueError, f"""\ '{callerName}' is unable to get the signature of the passed callable callable passed: {fn.__name__} one reason this could occur is the callable is written in c (e.g. the builtin 'str' callable). """, fromException=e, ) PK!k:simple_test_process/fns/internal/getNumPositionalParams.py# ------- # # Imports # # ------- # from inspect import Parameter from math import inf from .getFnSignature import getFnSignature from .iif import iif # ---- # # Init # # ---- # _nonVarPositionalParamKinds = { Parameter.POSITIONAL_ONLY, Parameter.POSITIONAL_OR_KEYWORD, } # ---- # # Main # # ---- # # # Returns a tuple # numRequired: number of required positional params # numAllowed: number of allowed positional params # def getNumPositionalParams(fn, callerName): sig = getFnSignature(fn, callerName) numAllowed = iif(_hasVarPositionalParam(sig), inf, 0) numRequired = 0 for p in sig.parameters.values(): if p.kind in _nonVarPositionalParamKinds: numAllowed += 1 if p.default is Parameter.empty: numRequired += 1 return (numRequired, numAllowed) # ------- # # Helpers # # ------- # def _hasVarPositionalParam(sig): for p in sig.parameters.values(): if p.kind is Parameter.VAR_POSITIONAL: return True PK!w<2simple_test_process/fns/internal/getTypedResult.py# # README # - I'm not sure what to call this. Its purpose is to centralize the type # validation, minimizing copy/paste. # # ------- # # Imports # # ------- # from .raise_ import raise_ from .get import get # ---- # # Init # # ---- # getName = get("__name__") # # we need a unique object here to ensure the 'typeToSomething' doesn't lead us # to a 'None' value # nothing = object() # ---- # # Main # # ---- # def getTypedResult(value, typeToSomething, fnName): valueType = type(value) result = typeToSomething.get(valueType, nothing) if _isSomething(result): return result supportedTypes = ", ".join(map(getName, typeToSomething.keys())) raise_( ValueError, f"""\ {fnName} doesn't support the type '{valueType.__name__}' supported types: {supportedTypes} """, ) # ------- # # Helpers # # ------- # def _isSomething(x): return x is not nothing PK!;||'simple_test_process/fns/internal/iif.pydef iif(condition, whenTruthy, whenFalsey): if condition: return whenTruthy else: return whenFalsey PK!ws{33+simple_test_process/fns/internal/isLaden.pydef isLaden(aList): return len(aList) is not 0 PK!Ȋ4simple_test_process/fns/internal/isOnlyWhitespace.pyimport re onlyWhitespaceRe = re.compile(r"\s*$") def isOnlyWhitespace(aString): return onlyWhitespaceRe.match(aString) is not None PK!f*simple_test_process/fns/internal/isType.pyfrom inspect import isclass def isType(aType): if not isclass(aType): raise ValueError("isType requires argument 'aType' to pass isclass") def isType_inner(something): return type(something) is aType return isType_inner PK!v_R*++,simple_test_process/fns/internal/joinWith.py# ------- # # Imports # # ------- # from ordered_set import OrderedSet from .getTypedResult import getTypedResult # ---- # # Main # # ---- # def joinWith(separator): def joinWith_inner(collection): typedJoinWith = getTypedResult(collection, typeToJoinWith, "joinWith") return typedJoinWith(separator, collection) return joinWith_inner # ------- # # Helpers # # ------- # def joinWith_iterable(separator, aList): return separator.join(aList) typeToJoinWith = {list: joinWith_iterable, OrderedSet: joinWith_iterable} PK!K1simple_test_process/fns/internal/mAssignToSelf.py# # assigns dict key-values onto self as attributes # return secondaryObj # ** mutates secondaryObj # def mAssignToSelf(aDict, aSelf): for k, v in aDict.items(): setattr(aSelf, k, v) return aSelf PK!K.simple_test_process/fns/internal/makeCallFn.py# ------- # # Imports # # ------- # from math import inf from .returnFirstArgument import returnFirstArgument as identity from .getNumPositionalParams import getNumPositionalParams # ---- # # Main # # ---- # def makeCallFn(fn, callerName, *, modifyResult=identity): (_, allowed) = getNumPositionalParams(fn, callerName) if allowed is inf: return lambda *args, **kwargs: modifyResult(fn(*args, **kwargs)) else: return lambda *args, **kwargs: modifyResult( fn(*args[:allowed], **kwargs) ) PK!T-5simple_test_process/fns/internal/makeGenericCallFn.py# ------- # # Imports # # ------- # from math import inf from .getNumPositionalParams import getNumPositionalParams from .raise_ import raise_ # ---- # # Main # # ---- # def makeGenericCallFn(fn, maxParams, callerName): required, allowed = getNumPositionalParams(fn, callerName) if allowed is inf: allowed = maxParams if required > maxParams: raise_( ValueError, f""" {callerName} can only take functions with up to {maxParams} positional params. The function '{fn.__name__}' requires {required} """, ) return lambda *args, **kwargs: fn(*args[:allowed], **kwargs) PK!mC(simple_test_process/fns/internal/map_.py# ------- # # Imports # # ------- # from types import SimpleNamespace from .makeGenericCallFn import makeGenericCallFn from .getTypedResult import getTypedResult from ..decorators.argIsCallable import argIsCallable # ---- # # Main # # ---- # @argIsCallable def map_(mapperFn): fnName = map_.__name__ callMapperFn = makeGenericCallFn(mapperFn, 3, fnName) def map_inner(collection): typedMap = getTypedResult(collection, typeToMap, fnName) return typedMap(callMapperFn, collection) return map_inner # ------- # # Helpers # # ------- # def map_list(callMapperFn, aList): result = [] for idx, el in enumerate(aList): result.append(callMapperFn(el, idx, aList)) return result def map_simpleNamespace(callMapperFn, aSimpleNamespace): result = SimpleNamespace() for key, val in aSimpleNamespace.__dict__.items(): setattr(result, key, callMapperFn(val, key, aSimpleNamespace)) return result typeToMap = {list: map_list, SimpleNamespace: map_simpleNamespace} PK!yy/simple_test_process/fns/internal/passThrough.pyfrom .reduce import reduce def passThrough(arg, fnList): return reduce(lambda result, fn: fn(result), arg)(fnList) PK!W*simple_test_process/fns/internal/raise_.pyfrom tedent import tedent from .isOnlyWhitespace import isOnlyWhitespace import os def raise_(errorClass, message, *, fromException=None): allLines = message.split(os.linesep) if isOnlyWhitespace(allLines[0]) and isOnlyWhitespace(allLines[-1]): message = tedent(message) err = errorClass(message) if fromException is None: raise err else: raise err from fromException PK!Et*simple_test_process/fns/internal/reduce.py# ------- # # Imports # # ------- # from .makeGenericCallFn import makeGenericCallFn from .getTypedResult import getTypedResult from ..decorators.argIsCallable import argIsCallable # ---- # # Main # # ---- # @argIsCallable def reduce(fn, initial): reducerFn = makeGenericCallFn(fn, 4, "reduce") def reduce_inner(collection): typedReduce = getTypedResult(collection, typeToReduce, "reduce") return typedReduce(reducerFn, initial, collection) return reduce_inner # ------- # # Helpers # # ------- # def reduce_list(reducerFn, initial, aList): result = initial for idx, el in enumerate(aList): result = reducerFn(result, el, idx, aList) return result typeToReduce = {list: reduce_list} PK!pjj7simple_test_process/fns/internal/returnFirstArgument.pyfrom .iif import iif def returnFirstArgument(*args, **kwargs): return iif(len(args), args[0], None) PK!\xx3simple_test_process/fns/internal/sanitizeAscDesc.pyfrom .raise_ import raise_ ascDescSet = {"asc", "desc"} def sanitizeAscDesc(ascOrDesc): sanitized = ascOrDesc.lower() if sanitized not in ascDescSet: raise_( ValueError, f""" ascOrDesc must be either 'asc' or 'desc' (case insensitive) value given: {ascOrDesc} """, ) return sanitized PK!0oo(simple_test_process/fns/internal/sort.py# ------- # # Imports # # ------- # from .getTypedResult import getTypedResult # ---- # # Main # # ---- # def sort(collection): typedSort = getTypedResult(collection, typeToSort, "sort") return typedSort(collection) # ------- # # Helpers # # ------- # def sort_viaSorted(something): return sorted(something) typeToSort = {list: sort_viaSorted} PK!V;22*simple_test_process/fns/internal/toType.pydef toType(something): return type(something) PK!sbss!simple_test_process/fns/invoke.pyfrom .decorators.argIsCallable import argIsCallable @argIsCallable def invoke(aCallable): return aCallable() PK!%tt%simple_test_process/fns/invokeAttr.pydef invokeAttr(key): def invokeAttr_inner(obj): return getattr(obj, key)() return invokeAttr_inner PK!¢["simple_test_process/fns/isEmpty.pyfrom types import SimpleNamespace # TODO: make 'isEmpty' generic like the other utils def isEmpty(lenAble): if isinstance(lenAble, SimpleNamespace): return len(lenAble.__dict__) is 0 else: return len(lenAble) is 0 PK!X33"simple_test_process/fns/isLaden.pyfrom .internal.isLaden import isLaden # noqa f401 PK!955#simple_test_process/fns/joinWith.pyfrom .internal.joinWith import joinWith # noqa f401 PK!*xNOO#simple_test_process/fns/keepWhen.py# ------- # # Imports # # ------- # from types import SimpleNamespace from .internal.makeGenericCallFn import makeGenericCallFn from .internal.getTypedResult import getTypedResult from .decorators.argIsCallable import argIsCallable # ---- # # Main # # ---- # @argIsCallable def keepWhen(predicate): fnName = keepWhen.__name__ shouldKeep = makeGenericCallFn(predicate, 3, fnName) def keepWhen_inner(collection): typedKeepWhen = getTypedResult(collection, typeToKeepWhen, fnName) return typedKeepWhen(shouldKeep, collection) return keepWhen_inner # ------- # # Helpers # # ------- # def keepWhen_list(shouldKeep, aList): result = [] for idx, el in enumerate(aList): if shouldKeep(el, idx, aList): result.append(el) return result def keepWhen_dict(shouldKeep, aDict): result = {} for key, val in aDict.items(): if shouldKeep(val, key, aDict): result[key] = val return result def keepWhen_simpleNamespace(shouldKeep, aSimpleNamespace): result = SimpleNamespace() for key, val in aSimpleNamespace.__dict__.items(): if shouldKeep(val, key, aSimpleNamespace): setattr(result, key, val) return result typeToKeepWhen = { list: keepWhen_list, dict: keepWhen_dict, SimpleNamespace: keepWhen_simpleNamespace, } PK!}--simple_test_process/fns/map_.pyfrom .internal.map_ import map_ # noqa f401 PK!!$$simple_test_process/fns/noop.pydef noop(*args, **kwargs): pass PK!F;;&simple_test_process/fns/passThrough.pyfrom .internal.passThrough import passThrough # noqa f401 PK!76E  %simple_test_process/fns/prependStr.pyfrom .decorators.argIsInstance import argIsInstance @argIsInstance(str) def prependStr(prependThis): fnName = prependStr.__name__ @argIsInstance(str, fnName) def prependStr_inner(toThis): return prependThis + toThis return prependStr_inner PK!(11!simple_test_process/fns/raise_.pyfrom .internal.raise_ import raise_ # noqa f401 PK!:ʼtt simple_test_process/fns/split.pydef split(separator): def split_inner(aString): return aString.split(separator) return split_inner PK!i(simple_test_process/fns/toWrittenList.pyfrom .internal.joinWith import joinWith def toWrittenList(aList): andSeparated = aList[-2:] commaSeparated = aList[:-2] commaSeparated.append(joinWith(" and ")(andSeparated)) return joinWith(", ")(commaSeparated) PK!Ό +simple_test_process/onlyKeepGreppedTests.py# ------- # # Imports # # ------- # import re from .fns import all_, any_, forEach, isEmpty as areEmpty, keepWhen, map_ # ---- # # Main # # ---- # # # This method mutates state # def onlyKeepGreppedTests(state, grepArgs): if all_(areEmpty)(grepArgs): return state grepArgs = map_(allToRegexes)(grepArgs) markGreppedTests(state, grepArgs.grepTests) markGreppedSuites(state, grepArgs.grepSuites) markGreppedTestsAndSuites(state, grepArgs.grep) trimNonMarkedTestsAndSuites(state) removeAllMarks(state) return state # ------- # # Helpers # # ------- # def mark(testOrSuite): testOrSuite._keep = True if testOrSuite.parentSuite: mark(testOrSuite.parentSuite) def allToRegexes(grepStrings): return map_(lambda aString: re.compile(aString))(grepStrings) def regexMatches(someString): def regexMatches_inner(aRegex): return bool(aRegex.search(someString)) return regexMatches_inner def markTestIfLabelMatches(regexes): def markTestIfLabelMatches_inner(testOrSuite): if any_(regexMatches(testOrSuite.label))(regexes): mark(testOrSuite) return markTestIfLabelMatches_inner def markGreppedTests(stateOrSuite, grepTests): forEach(markTestIfLabelMatches(grepTests))(stateOrSuite.tests) forEach(lambda suite: markGreppedTests(suite, grepTests))( stateOrSuite.suites ) def markAllTestsIfLabelMatches(regexes): def markAllTestsIfLabelMatches_inner(suite): if any_(regexMatches(suite.label))(regexes): forEach(mark)(suite.tests) return markAllTestsIfLabelMatches_inner def markGreppedSuites(stateOrSuite, grepSuites): forEach(markAllTestsIfLabelMatches(grepSuites))(stateOrSuite.suites) forEach(lambda suite: markGreppedSuites(suite, grepSuites))( stateOrSuite.suites ) def markGreppedTestsAndSuites(stateOrSuite, grep): forEach(markTestIfLabelMatches(grep))(stateOrSuite.tests) forEach(markAllTestsIfLabelMatches(grep))(stateOrSuite.suites) forEach(lambda suite: markGreppedTestsAndSuites(suite, grep))( stateOrSuite.suites ) def isMarked(testOrSuite): return getattr(testOrSuite, "_keep", False) def trimNonMarkedTestsAndSuites(stateOrSuite): stateOrSuite.tests = keepWhen(isMarked)(stateOrSuite.tests) stateOrSuite.suites = keepWhen(isMarked)(stateOrSuite.suites) forEach(trimNonMarkedTestsAndSuites)(stateOrSuite.suites) def removeMark(testOrSuite): delattr(testOrSuite, "_keep") def removeAllMarks(stateOrSuite): forEach(removeMark)(stateOrSuite.tests) forEach(removeMark)(stateOrSuite.suites) forEach(removeAllMarks)(stateOrSuite.suites) PK!3 simple_test_process/parseArgs.py# ------- # # Imports # # ------- # from case_conversion import camelcase from copy import deepcopy from types import SimpleNamespace as o import case_conversion from .fns import ( getListOfCollectionKeys, map_, passThrough, prependStr, raise_, toWrittenList, ) # ---- # # Init # # ---- # # # case_conversion.dashcase takes more than a single argument so `map_` will pass # it the index and original array which may screw things up # def dashcase(aString): return case_conversion.dashcase(aString) _grepArgs = o(grep=[], grepSuites=[], grepTests=[]) _grepArgsKeys = passThrough( _grepArgs, [getListOfCollectionKeys, map_(dashcase), map_(prependStr("--"))] ) _availableGrepArgsKeys = toWrittenList(_grepArgsKeys) # ---- # # Main # # ---- # # # We can assume the argument order # 0 reporter # 1 silent # 2?+ grep | grepSuites | grepTests # # # and we can also assume # - reporter is a non-relative module name # - silent is a string boolean # - grepArgs may or may not exist. Validation is only for debugging purposes # as calling code should be reliable. # # def parseArgs(*args): return o(reporter=args[0], silent=args[1], grepArgs=parseGrepArgs(args[2:])) # ------- # # Helpers # # ------- # def parseGrepArgs(grepArgs): result = deepcopy(_grepArgs) i = 0 grepArgsLen = len(grepArgs) while i < grepArgsLen: arg = grepArgs[i] if arg not in _grepArgsKeys: raise_( ValueError, f""" key '{arg}' is invalid available grep keys: {_availableGrepArgsKeys} """, ) grepKey = arg i += 1 if i == grepArgsLen: raise_( ValueError, f""" a value must be given to '{grepKey}' """, ) grepVals = getattr(result, camelcase(grepKey)) grepVals.append(grepArgs[i]) i += 1 return result PK!S"simple_test_process/runAllTests.py# ------- # # Imports # # ------- # from traceback import format_exc from .fns import forEach, invoke # ---- # # Main # # ---- # def runAllTests(stateOrSuite): stateOrSuite.before() forEach(runTest)(stateOrSuite.tests) forEach(runAllTests)(stateOrSuite.suites) stateOrSuite.after() # ------- # # Helpers # # ------- # def runTest(aTest): parent = aTest.parentSuite or aTest.rootState try: forEach(invoke)(parent.beforeEach) aTest.before() aTest.fn() aTest.succeeded = True except Exception as e: aTest.succeeded = False aTest.rootState.succeeded = False propagateFailure(aTest.parentSuite) aTest.formattedException = format_exc() aTest.error = e finally: aTest.after() forEach(invoke)(parent.afterEach) def propagateFailure(aSuite): if aSuite is None: return if aSuite.succeeded: aSuite.succeeded = False propagateFailure(aSuite.parentSuite) PK!6YLkk!simple_test_process/runProcess.py# ------- # # Imports # # ------- # from os import path from tedent import tedent from traceback import format_exc from types import SimpleNamespace as o from .onlyKeepGreppedTests import onlyKeepGreppedTests from .runAllTests import runAllTests from .state import initState from .utils import gatherTests, importTests, makeCmd, twoLineSeps from .validateAndGetReportFn import validateAndGetReportFn import os import toml from .fns import ( forEach, iif, isEmpty, joinWith, map_, noop, passThrough, prependStr, ) # ---- # # Main # # ---- # def runProcess(*, reporter, silent, grepArgs): try: silent = silent == "True" cliResult = o(stderr=None, stdout=None, code=None) after = noop before = noop if not silent: validationResult = validateAndGetReportFn( reporter, silent, cliResult ) if validationResult.hasError: return validationResult.cliResult report = validationResult.report reportOpts = None if path.isfile("pyproject.toml"): allProjectSettings = toml.load("pyproject.toml") result = getValueAtPath(allProjectSettings, ["tool", reporter]) if result.hasValue: reportOpts = result.value result = getValueAtPath( allProjectSettings, ["tool", "simple_test"] ) if result.hasValue: if "before" in result.value: before = makeCmd(result.value["before"], "before") if "after" in result.value: after = makeCmd(result.value["after"], "after") state = initState() state.before = before state.after = after importTests() forEach(gatherTests)(state.suites) onlyKeepGreppedTests(state, grepArgs) runAllTests(state) if not state.testsFound: if not silent: cliResult.stderr = tedent( f""" No tests were found in any python files under the project's tests directory: '{path.join(os.getcwd(), 'tests')}' Remember you define tests by decorating a function with @test("test label") """ ) cliResult.code = 2 return cliResult if isEmpty(state.tests) and isEmpty(state.suites): # grepArgs must contain something if this code is reached if not silent: cliResult.stderr = ( "Your grep options failed to match any suites or tests" ) cliResult.code = 2 return cliResult if not silent: if reportOpts is None: report(state) else: report(state, reportOpts) cliResult.code = iif(state.succeeded, 0, 1) return cliResult except Exception: if not silent: cliResult.stderr = ( os.linesep + "An error occurred during simple_test_process" + twoLineSeps + format_exc() ) cliResult.code = 2 return cliResult # ------- # # Helpers # # ------- # def toFormattedGrepArgs(greppedStrings, grepKey): return ( grepKey + ":" + os.linesep + passThrough( greppedStrings, [map_(prependStr(" ")), joinWith(os.linesep)] ) ) def getValueAtPath(aDict, pathToValue): result = o(hasValue=None, value=None) val = aDict for segment in pathToValue: if segment not in val: result.hasValue = False return result val = val[segment] result.hasValue = True result.value = val return result PK!>Lsimple_test_process/state.pyfrom copy import copy, deepcopy from types import SimpleNamespace as o from .fns import appendOne, assign, noop _initialState = o( tests=[], suites=[], currentSuite=None, testsFound=False, succeeded=True, after=noop, afterEach=[], before=noop, beforeEach=[], ) state = deepcopy(_initialState) def _getState(): return state def initState(): global state state = deepcopy(_initialState) return state def _getCommon(*, label, fn, parentSuite, rootState, after, before): return o( label=label, fn=fn, parentSuite=parentSuite, rootState=rootState, after=after, before=before, ) def addSuite(label, after, afterEach, before, beforeEach, fn): currentSuite = state.currentSuite parent = currentSuite or state if afterEach is noop: afterEach = copy(parent.afterEach) else: afterEach = appendOne(afterEach)(parent.afterEach) if beforeEach is noop: beforeEach = copy(parent.beforeEach) else: beforeEach = appendOne(beforeEach)(parent.beforeEach) newSuite = assign( o( tests=[], suites=[], succeeded=True, afterEach=afterEach, beforeEach=beforeEach, ) )( _getCommon( label=label, fn=fn, parentSuite=currentSuite, rootState=state, after=after, before=before, ) ) parent.suites.append(newSuite) return state def addTest(label, after, before, fn): global state currentSuite = state.currentSuite test = _getCommon( label=label, fn=fn, parentSuite=currentSuite, rootState=state, after=after, before=before, ) if currentSuite is None: state.tests.append(test) else: currentSuite.tests.append(test) state.testsFound = True return state PK!wsimple_test_process/suite.pyfrom .state import addSuite from .fns import noop def suite(label, *, after=noop, afterEach=noop, before=noop, beforeEach=noop): def wrapper(fn): addSuite(label, after, afterEach, before, beforeEach, fn) return wrapper PK!2tӳsimple_test_process/test.pyfrom .state import addTest from .fns import noop def test(label, *, after=noop, before=noop): def wrapper(fn): addTest(label, after, before, fn) return wrapper PK!ɬ simple_test_process/utils.py# ------- # # Imports # # ------- # from glob import glob from os import path from subprocess import run from .suite import suite from .test import test import importlib.util import os import sys from .fns import ( discardWhen, endsWith, forEach, joinWith, map_, passThrough, raise_, split, ) # ---- # # Main # # ---- # twoLineSeps = os.linesep + os.linesep # # I'm choosing to gather all the tests prior to running any because I feel that # will be a simpler design. # def gatherTests(aSuite): oldCurrentSuite = aSuite.rootState.currentSuite aSuite.rootState.currentSuite = aSuite aSuite.fn() forEach(gatherTests)(aSuite.suites) aSuite.rootState.currentSuite = oldCurrentSuite # # I can't find a clean way to do this so I'm rolling my own. The python # import system is inherently hacky anyway :( # def importTests(): globStr = path.join("tests", "**", "*.py") passThrough( globStr, [ recursiveGlob, discardWhen(endsWith("__init__.py")), map_(toModulePath), forEach(importModule), ], ) def importModule(modulePath): try: spec = importlib.util.find_spec(modulePath) testModule = importlib.util.module_from_spec(spec) testModule.test = test testModule.suite = suite spec.loader.exec_module(testModule) except Exception as e: raise Exception(f"Error occurred while importing '{modulePath}'") from e def makeCmd(someStr, beforeOrAfter): fname = someStr.strip() _base, ext = path.splitext(fname) # not sure why this is necessary :( relativeFname = path.join(".", fname) if ext == ".py": return lambda: run([sys.executable, relativeFname]) elif ext == ".sh": return lambda: run([relativeFname]) raise_( ValueError, f""" incorrect value found in pyproject.toml -> tool.simple_test -> {beforeOrAfter} file extension must be .py or .sh. extension found: '{ext}' full path to pyproject.toml: {path.join(os.getcwd(), "pyproject.toml")} """, ) def recursiveGlob(globStr): return glob(globStr, recursive=True) def toModulePath(filePathFromTestsDir): return passThrough( filePathFromTestsDir, [removeExtension, split(os.sep), joinWith(".")] ) def removeExtension(filePath): return os.path.splitext(filePath)[0] PK!,,-simple_test_process/validateAndGetReportFn.py# ------- # # Imports # # ------- # from traceback import format_exc from types import SimpleNamespace as o from .utils import twoLineSeps import importlib import os # ---- # # Main # # ---- # def validateAndGetReportFn(reporter, silent, cliResult): validationResult = o(report=None, cliResult=cliResult, hasError=False) try: reporterModule = importlib.import_module(reporter) except: if not silent: err = "An error occurred while importing the reporter" cliResult.stderr = os.linesep + err + twoLineSeps + format_exc() cliResult.code = 2 validationResult.hasError = True return validationResult if hasattr(reporterModule, "report") and callable(reporterModule.report): validationResult.report = reporterModule.report else: if not silent: cliResult.stderr = ( os.linesep + "the reporter must expose a callable 'report'" ) cliResult.code = 2 validationResult.hasError = True return validationResult PK!HڽTU)simple_test_process-0.4.0.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!H>,simple_test_process-0.4.0.dist-info/METADATAo0W#H@MeK7>Ϲ؄<%r>ϝ$J7 Q9[X4XCTk, #>c,~a8U5#C_up` H !q *:- D- |Í.݊ƙYo_kg7ϋC΅:?,h5'3F,#ݝM`82FVm4F ̥$nj^_yq\BEWWg|'DTgm2 mT%ވwi%!€*"ev'G8W!z?U0aW9PFGm~'cS;6T,5,ѻȅ~3v E9sHCmmQ|Zפ<<*qmj ZAگt_oXpvL\5ەRDQ?PK!HwŬ *simple_test_process-0.4.0.dist-info/RECORDYǖH*$@xЄ6NܪW[9r)ĉ&P]i;x!J R ^J \c+%Ѹ2rD Ei0t=a=n xAJt4C(΍ 2NzjT ^;ٽKÜDڄEh&5r|tE#ʮ?`!l~OD'H0-9xCySM*EU[X7  ?z= #[rb!LQo* ݄ǽ*ܴ)^4!]6Yɴ# 8=.e~'lץmX9yx7?T]K[u5Ba® ;psZ$mWٛ;e7_y*2OR%0kh>y+x=t>}ۤaS$A_8>?vl¨z:_ƅN˳̦zHsu&$Рd_!)~k9Rv%Oe+ϹưK:]j$HX49E6* S?|J)Qˬۓh$i^,LEXzqJR5P~ռ@*.L<?9M1aE?;`fs~<{YQߍm3BxE#M/Pʄ"73D.gu(&TY #,dpC. -RE'0Vadڝ\E O?i\p|v(3 aG1~Ym纭 c9%/0 uM1#;}MczI׽([(1A/}-&XlF*9[ş[u7?&:YzӲQ`|.&G,~jN2<2/&'tϕ,VTOIwJdT˪gO$-KV(Y8q*U)h62(5yJHAXiWQρvj-7 ,1z@cc6M&-ԹIb呻ܤdfll "8?xejEdYNMvGĠŻoLPIX>l2Vi ɘΛ[LsrRjky@smm_T=HtfURå찐|2D, Bb 4, e.ۄ^ѥ{9%q-.Gu; YH1(aqȠ#R<ֽ|%uKAQJL|Ak}ۿtOA*^GCc7*zRK.C>P ЬyJ3޻ڭ4^cF}99Iihc|Rz4 YCl-Ͽφ [L濏okaS'c\5ش#v7pT1ۦ}@,(,ߋa"Ckx (|گ;qH@)mӾAu) K{v`ȉO+~\@JM*N-k٪oXN{;wr,Ǽצ|>7Aumу͉w<98ȡG穮ռW( Tڮ["MPMC_I`6KzL#fU<*i؄g d>7O'5:<<|,%q+V[φPB 9?7S{`wXzbtq--O+]g/);|:{c\RyZDD/o%( o";T/E{+-33;3e]4!~AldpM 0G%0xNSX9|³Pd!yWMf}Fhձ+F/`qBnkxxKKt?rHGkh$8(0xk;{Wc791Ahq!hAWnڙB7՜to|=U^W&?^C8A2eNL%Ty5Jbl,ܚrdH~桊w_A @#6JTG/Fp Xa֊9H4eR P|ǯ˥N`Cuy>^h\>/̟s]ZX:^'}t)6U 6y+Exz8m>3D\Hwn<(NfE#tػ҅*?(OZ't*H"EJL]KqL|{g.Ytm1m@&S S>Z[5l'Zikn_M}&| !o/֣ld޻Q} !hV4~OO A"nQX`ypwx[Ţ>a?[|R .mlHh ~Bm+mkx x9L?~08 77dS T= i3tߌ^<{׸InD@h ˳ ZB)LT(x&*&Gib!p?PK!4B$5simple_test_process/fns/appendOne.pyPK!G7`%jsimple_test_process/fns/applyOneTo.pyPK!: A!esimple_test_process/fns/assign.pyPK!.simple_test_process/fns/decorators/__init__.pyPK![`)gEE3simple_test_process/fns/decorators/argIsCallable.pyPK!waII0ssimple_test_process/fns/decorators/argIsClass.pyPK!dZT3 simple_test_process/fns/decorators/argIsInstance.pyPK!5*simple_test_process/fns/decorators/argIsListOfType.pyPK!E/*&simple_test_process/fns/decorators/argIsType.pyPK!ǥ&&')simple_test_process/fns/discardFirst.pyPK!| ;;&n+simple_test_process/fns/discardWhen.pyPK!t8#+simple_test_process/fns/endsWith.pyPK!`c??",simple_test_process/fns/forEach.pyPK!FF+y1simple_test_process/fns/getAttributeKeys.pyPK!$22simple_test_process/fns/getListOfCollectionKeys.pyPK!ȇ#45simple_test_process/fns/getListOfCollectionValues.pyPK!R++8simple_test_process/fns/iif.pyPK!B u/8simple_test_process/fns/internal/NotCallable.pyPK!,;simple_test_process/fns/internal/__init__.pyPK!F/,<simple_test_process/fns/internal/discardWhen.pyPK!ss'@simple_test_process/fns/internal/get.pyPK!)jj.MAsimple_test_process/fns/internal/getArgName.pyPK!X2Bsimple_test_process/fns/internal/getFnSignature.pyPK!k:YDsimple_test_process/fns/internal/getNumPositionalParams.pyPK!w<2Hsimple_test_process/fns/internal/getTypedResult.pyPK!;||'Lsimple_test_process/fns/internal/iif.pyPK!ws{33+qMsimple_test_process/fns/internal/isLaden.pyPK!Ȋ4Msimple_test_process/fns/internal/isOnlyWhitespace.pyPK!f*Nsimple_test_process/fns/internal/isType.pyPK!v_R*++, Psimple_test_process/fns/internal/joinWith.pyPK!K1Rsimple_test_process/fns/internal/mAssignToSelf.pyPK!K.Ssimple_test_process/fns/internal/makeCallFn.pyPK!T-5Vsimple_test_process/fns/internal/makeGenericCallFn.pyPK!mC(Ysimple_test_process/fns/internal/map_.pyPK!yy/]]simple_test_process/fns/internal/passThrough.pyPK!W*#^simple_test_process/fns/internal/raise_.pyPK!Et* `simple_test_process/fns/internal/reduce.pyPK!pjj7Lsimple_test_process/state.pyPK!wsimple_test_process/suite.pyPK!2tӳڥsimple_test_process/test.pyPK!ɬ Ʀsimple_test_process/utils.pyPK!,,-simple_test_process/validateAndGetReportFn.pyPK!HڽTU)#simple_test_process-0.4.0.dist-info/WHEELPK!H>,simple_test_process-0.4.0.dist-info/METADATAPK!HwŬ *simple_test_process-0.4.0.dist-info/RECORDPKHH,