PK! None: """ Dummy function so that tests cover this module. >>> __dummy__() """ pass PK!preacher/app/__init__.pyPK!preacher/app/cli/__init__.pyPK!&Tpreacher/app/cli/application.pyfrom multiprocessing import Pool from typing import Callable, Iterable, Iterator import ruamel.yaml as yaml from preacher.core.scenario_running import ScenarioResult, run_scenario from preacher.compilation.scenario import ScenarioCompiler from preacher.presentation.logging import LoggingPresentation MapFunction = Callable[ [ Callable[[str], ScenarioResult], Iterable[str] ], Iterator[ScenarioResult] ] class Application: def __init__( self, view: LoggingPresentation, base_url: str, retry: int = 0, ): self._view = view self._base_url = base_url self._retry = retry self._scenario_compiler = ScenarioCompiler() self._is_succeeded = True @property def is_succeeded(self) -> bool: return self._is_succeeded def run( self, config_paths: Iterable[str], map_func: MapFunction = map, ) -> None: results = map_func(self._run_each, config_paths) for result in results: self._is_succeeded &= result.status.is_succeeded self._view.show_scenario_result(result) def run_concurrently( self, config_paths: Iterable[str], concurrency: int, ) -> None: with Pool(concurrency) as pool: self.run(config_paths, map_func=pool.imap) def _run_each(self, config_path: str) -> ScenarioResult: with open(config_path) as config_file: config = yaml.safe_load(config_file) scenario = self._scenario_compiler.compile(config) return run_scenario( scenario, base_url=self._base_url, retry=self._retry, ) PK!u preacher/app/cli/main.py"""Preacher CLI.""" from __future__ import annotations import argparse import logging import sys from preacher import __version__ as VERSION from preacher.presentation.logging import LoggingPresentation from .application import Application DEFAULT_URL = 'http://localhost:5042' DEFAULT_URL_DESCRIPTION = 'the sample' HANDLER = logging.StreamHandler() LOGGER = logging.getLogger(__name__) LOGGER.addHandler(HANDLER) LOGGING_LEVEL_MAP = { 'skipped': logging.DEBUG, 'success': logging.INFO, 'unstable': logging.WARN, 'failure': logging.ERROR, } def zero_or_positive_int(value: str) -> int: int_value = int(value) if int_value < 0: raise argparse.ArgumentTypeError( f"must be positive or 0, given {int_value}" ) return int_value def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser() parser.add_argument( 'scenario', nargs='+', help='scenario file paths' ) parser.add_argument( '-v', '--version', action='version', version=VERSION, ) parser.add_argument( '-u', '--url', metavar='url', help='specify the base URL', default='http://localhost:5042', ) parser.add_argument( '-l', '--level', choices=LOGGING_LEVEL_MAP.keys(), help='show only above or equal to this level', default='success', ) parser.add_argument( '-r', '--retry', type=zero_or_positive_int, metavar='num', help='max retry count', default=0, ) parser.add_argument( '-c', '--scenario-concurrency', type=int, metavar='num', help='concurrency for scenarios', default=1, ) return parser.parse_args() def main() -> None: """Main.""" args = parse_args() level = LOGGING_LEVEL_MAP[args.level] HANDLER.setLevel(level) LOGGER.setLevel(level) view = LoggingPresentation(LOGGER) base_url = args.url retry = args.retry app = Application(view=view, base_url=base_url, retry=retry) scenario_paths = args.scenario scenario_concurrency = args.scenario_concurrency app.run_concurrently(scenario_paths, concurrency=scenario_concurrency) if not app.is_succeeded: sys.exit(1) PK! preacher/compilation/__init__.pyPK![preacher/compilation/case.py"""Case compilation.""" from __future__ import annotations from collections.abc import Mapping from typing import Any, Optional, Union from preacher.core.case import Case from .error import CompilationError from .request import RequestCompiler from .response_description import ResponseDescriptionCompiler from .util import run_on_key _KEY_LABEL = 'label' _KEY_REQUEST = 'request' _KEY_RESPONSE = 'response' class CaseCompiler: """ >>> from unittest.mock import MagicMock, sentinel >>> def default_request_compiler() -> RequestCompiler: ... return MagicMock( ... spec=RequestCompiler, ... compile=MagicMock(return_value=sentinel.request), ... ) >>> def default_response_compiler() -> ResponseDescriptionCompiler: ... return MagicMock( ... spec=ResponseDescriptionCompiler, ... compile=MagicMock(return_value=sentinel.response_description), ... ) When given an empty object, then generates a default case. >>> request_compiler = default_request_compiler() >>> response_compiler = default_response_compiler() >>> compiler = CaseCompiler(request_compiler, response_compiler) >>> case = compiler.compile({}) >>> case.request sentinel.request >>> case.response_description sentinel.response_description >>> request_compiler.compile.assert_called_once_with({}) >>> response_compiler.compile.assert_called_once_with({}) When given a not string label, then raises a compilation error. >>> compiler = CaseCompiler() >>> compiler.compile({'label': []}) Traceback (most recent call last): ... preacher.compilation.error.CompilationError: ...: label When given an invalid type request, then raises a compilation error. >>> compiler = CaseCompiler() >>> compiler.compile({'request': []}) Traceback (most recent call last): ... preacher.compilation.error.CompilationError: ...: request When a request compilation fails, then raises a compilation error. >>> request_compiler = MagicMock( ... spec=RequestCompiler, ... compile=MagicMock( ... side_effect=CompilationError(message='message', path=['foo']) ... ), ... ) >>> compiler = CaseCompiler(request_compiler) >>> compiler.compile({}) Traceback (most recent call last): ... preacher.compilation.error.CompilationError: message: request.foo When given an invalid type response description, then raises a compilation error. >>> request_compiler = default_request_compiler() >>> compiler = CaseCompiler(request_compiler) >>> compiler.compile({'response': 'str'}) Traceback (most recent call last): ... preacher.compilation.error.CompilationError: ...: response When a response description compilation fails, then raises a compilation error. >>> request_compiler = default_request_compiler() >>> response_compiler = MagicMock( ... spec=ResponseDescriptionCompiler, ... compile=MagicMock( ... side_effect=CompilationError(message='message', path=['bar']), ... ), ... ) >>> compiler = CaseCompiler(request_compiler, response_compiler) >>> compiler.compile({'request': '/path'}) Traceback (most recent call last): ... preacher.compilation.error.CompilationError: message: response.bar Creates a case only with a request. >>> request_compiler = default_request_compiler() >>> response_compiler = default_response_compiler() >>> compiler = CaseCompiler(request_compiler, response_compiler) >>> case = compiler.compile({'request': '/path'}) >>> case.label >>> case.request sentinel.request >>> request_compiler.compile.assert_called_once_with('/path') Creates a case. >>> request_compiler = default_request_compiler() >>> response_compiler = default_response_compiler() >>> compiler = CaseCompiler(request_compiler, response_compiler) >>> case = compiler.compile({ ... 'label': 'label', ... 'request': {'path': '/path'}, ... 'response': {'key': 'value'}, ... }) >>> case.label 'label' >>> case.request sentinel.request >>> case.response_description sentinel.response_description >>> request_compiler.compile.assert_called_once_with({'path': '/path'}) >>> response_compiler.compile.assert_called_once_with({'key': 'value'}) When given invalid default request, then raises a compilation error. >>> case_compiler = CaseCompiler() >>> case_compiler.of_default({'request': []}) Traceback (most recent call last): ... preacher.compilation.error.CompilationError: ...: request Accepts default values. >>> request_compiler = MagicMock( ... RequestCompiler, ... of_default=MagicMock(return_value=sentinel.foo), ... ) >>> response_compiler = default_response_compiler() >>> compiler = CaseCompiler(request_compiler, response_compiler) >>> default_compiler = compiler.of_default({}) >>> default_compiler.request_compiler sentinel.foo """ def __init__( self, request_compiler: Optional[RequestCompiler] = None, response_compiler: Optional[ResponseDescriptionCompiler] = None ): self._request_compiler = request_compiler or RequestCompiler() self._response_compiler = ( response_compiler or ResponseDescriptionCompiler() ) @property def request_compiler(self) -> RequestCompiler: return self._request_compiler def compile(self, obj: Mapping) -> Case: label = obj.get(_KEY_LABEL) if label is not None and not isinstance(label, str): raise CompilationError( message=f'Case.{_KEY_LABEL} must be a string', path=[_KEY_LABEL], ) request = run_on_key( _KEY_REQUEST, self._request_compiler.compile, _extract_request(obj) ) response_description = run_on_key( _KEY_RESPONSE, self._response_compiler.compile, _extract_response(obj), ) return Case( label=label, request=request, response_description=response_description, ) def of_default(self, obj: Mapping) -> CaseCompiler: request_compiler = run_on_key( _KEY_REQUEST, self._request_compiler.of_default, _extract_request(obj), ) return CaseCompiler(request_compiler=request_compiler) def _extract_request(obj: Any) -> Union[Mapping, str]: target = obj.get(_KEY_REQUEST, {}) if not isinstance(target, Mapping) and not isinstance(target, str): raise CompilationError( message='must be a string or a mapping', path=[_KEY_REQUEST], ) return target def _extract_response(obj: Any) -> Mapping: target = obj.get('response', {}) if not isinstance(target, Mapping): raise CompilationError('must be a mapping', path=[_KEY_RESPONSE]) return target PK!{v]PP preacher/compilation/datetime.py"""Datetime compilations.""" import re from datetime import timedelta from .error import CompilationError RELATIVE_DATETIME_PATTERN = re.compile( r'([+\-]?\d+)\s*(day|hour|minute|second)s?' ) def compile_timedelta(value: str) -> timedelta: normalized = value.strip().lower() if normalized == 'now': return timedelta() match = RELATIVE_DATETIME_PATTERN.match(normalized) if not match: raise CompilationError(f'Invalid datetime format: {value}') offset = int(match.group(1)) unit = match.group(2) + 's' return timedelta(**{unit: offset}) PK!7uHH#preacher/compilation/description.py"""Description compilation.""" from collections.abc import Mapping from typing import Optional from preacher.core.description import Description from .error import CompilationError from .predicate import PredicateCompiler from .extraction import ExtractionCompiler from .util import run_on_key, map_on_key _KEY_DESCRIBE = 'describe' _KEY_SHOULD = 'should' class DescriptionCompiler: def __init__( self, extraction_compiler: Optional[ExtractionCompiler] = None, predicate_compiler: Optional[PredicateCompiler] = None, ): self._extraction_compiler = extraction_compiler or ExtractionCompiler() self._predicate_compiler = predicate_compiler or PredicateCompiler() def compile(self, obj: Mapping): extraction_obj = obj.get(_KEY_DESCRIBE) if ( not isinstance(extraction_obj, Mapping) and not isinstance(extraction_obj, str) ): raise CompilationError( message='Description.describe must be a mapping or a string', path=[_KEY_DESCRIBE], ) extraction = run_on_key( _KEY_DESCRIBE, self._extraction_compiler.compile, extraction_obj ) predicate_objs = obj.get(_KEY_SHOULD, []) if not isinstance(predicate_objs, list): predicate_objs = [predicate_objs] predicates = list(map_on_key( _KEY_SHOULD, self._predicate_compiler.compile, predicate_objs, )) return Description(extraction=extraction, predicates=predicates) PK!Hwpreacher/compilation/error.py"""Compilation errors.""" from __future__ import annotations from typing import List, Optional class CompilationError(Exception): """Compilation errors.""" def __init__( self, message: str, path: List[str] = [], cause: Optional[Exception] = None, ): super().__init__(message) self._message = message self._path = path self._cause = cause @property def path(self) -> List[str]: return self._path def of_parent(self, parent_path: List[str]) -> CompilationError: return CompilationError( message=self._message, path=parent_path + self._path, cause=self._cause, ) def __str__(self): message = super().__str__() if not self._path: return message path = '.'.join(self._path) return f'{message}: {path}' PK!Q]]"preacher/compilation/extraction.py"""Extraction compilation.""" from collections.abc import Mapping from typing import Union from preacher.core.extraction import Extraction, with_jq from .error import CompilationError _EXTRACTION_MAP = { 'jq': with_jq, } _EXTRACTION_KEYS = frozenset(_EXTRACTION_MAP.keys()) class ExtractionCompiler: def compile(self, obj: Union[Mapping, str]) -> Extraction: if isinstance(obj, str): return compile({'jq': obj}) keys = _EXTRACTION_KEYS.intersection(obj.keys()) if len(keys) != 1: raise CompilationError( f'Extraction must have only 1 valid key, but has {len(keys)}' ) key = next(iter(keys)) return _EXTRACTION_MAP[key](obj[key]) def compile(obj: Union[Mapping, str]) -> Extraction: """ >>> compile({}) Traceback (most recent call last): ... preacher.compilation.error.CompilationError: ... has 0 >>> compile('.foo')({'foo': 'bar'}) 'bar' >>> compile({'jq': '.foo'})({'foo': 'bar'}) 'bar' """ compiler = ExtractionCompiler() return compiler.compile(obj) PK!FY/ / preacher/compilation/matcher.py"""Matcher compilation.""" from collections.abc import Mapping from typing import Any import hamcrest from hamcrest.core.matcher import Matcher from .error import CompilationError from .util import run_on_key _STATIC_MATCHER_MAP = { # For objects. 'be_null': hamcrest.is_(hamcrest.none()), 'not_be_null': hamcrest.is_(hamcrest.not_none()), # For collections. 'be_empty': hamcrest.is_(hamcrest.empty()), } _MATCHER_FUNCTION_MAP_TAKING_SINGLE_VALUE = { # For objects. 'equal': lambda expected: hamcrest.is_(hamcrest.equal_to(expected)), 'have_length': hamcrest.has_length, # For numbers. 'be_greater_than': ( lambda value: hamcrest.is_(hamcrest.greater_than(value)) ), 'be_greater_than_or_equal_to': ( lambda value: hamcrest.is_(hamcrest.greater_than_or_equal_to(value)) ), 'be_less_than': ( lambda value: hamcrest.is_(hamcrest.less_than(value)) ), 'be_less_than_or_equal_to': ( lambda value: hamcrest.is_(hamcrest.less_than_or_equal_to(value)) ), # For strings. 'contain_string': hamcrest.contains_string, 'start_with': hamcrest.starts_with, 'end_with': hamcrest.ends_with, 'match_regexp': hamcrest.matches_regexp, } _MATCHER_FUNCTION_MAP_TAKING_SINGLE_MATCHER = { 'be': hamcrest.is_, 'not': hamcrest.not_, 'have_item': hamcrest.has_item, } def _compile_taking_single_matcher(key: str, value: Any): func = _MATCHER_FUNCTION_MAP_TAKING_SINGLE_MATCHER[key] if isinstance(value, str) or isinstance(value, Mapping): inner = run_on_key(key, compile, value) else: inner = hamcrest.equal_to(value) return func(inner) def compile(obj: Any) -> Matcher: if isinstance(obj, str): if obj in _STATIC_MATCHER_MAP: return _STATIC_MATCHER_MAP[obj] if isinstance(obj, Mapping): if len(obj) != 1: raise CompilationError( f'Must have only 1 element, but has {len(obj)}' ) key, value = next(iter(obj.items())) if key in _MATCHER_FUNCTION_MAP_TAKING_SINGLE_VALUE: return _MATCHER_FUNCTION_MAP_TAKING_SINGLE_VALUE[key](value) if key in _MATCHER_FUNCTION_MAP_TAKING_SINGLE_MATCHER: return _compile_taking_single_matcher(key, value) return hamcrest.equal_to(obj) PK!,@=22!preacher/compilation/predicate.py"""Predicate compilation.""" from collections.abc import Mapping from datetime import datetime from typing import Any, Callable from hamcrest import greater_than, less_than from hamcrest.core.matcher import Matcher from preacher.core.description import Predicate from preacher.core.datetime import now, parse_datetime from preacher.core.predicate import MatcherPredicate, DynamicMatcherPredicate from .datetime import compile_timedelta from .error import CompilationError from .matcher import compile as compile_matcher from .util import run_on_key PREDICATE_MAP = { 'be_before': lambda value: _compile_datetime_predicate('be_before', value, less_than), 'be_after': lambda value: _compile_datetime_predicate('be_after', value, greater_than), } class PredicateCompiler: def compile(self, obj: Any) -> Predicate: if isinstance(obj, Mapping): if len(obj) != 1: raise CompilationError( f'Must have only 1 element, but has {len(obj)}' ) key, value = next(iter(obj.items())) if key in PREDICATE_MAP: return PREDICATE_MAP[key](value) matcher = compile_matcher(obj) return MatcherPredicate(matcher) def _compile_datetime_predicate( key: str, obj: Any, matcher_func: Callable[[datetime], Matcher], ) -> DynamicMatcherPredicate: if not isinstance(obj, str): raise CompilationError(message=f'Must be a string', path=[key]) delta = run_on_key(key, compile_timedelta, obj) def _matcher_factory(*args: Any, **kwargs: Any) -> Matcher: origin = kwargs.get('request_datetime') or now() return matcher_func(origin + delta) return DynamicMatcherPredicate( matcher_factory=_matcher_factory, converter=parse_datetime, ) PK!K preacher/compilation/request.py"""Request compilation.""" from __future__ import annotations from collections.abc import Mapping from dataclasses import dataclass from typing import Any, Mapping as MappingType, Optional, Union from preacher.core.request import Request from .error import CompilationError from .util import or_default _KEY_PATH = 'path' _KEY_HEADERS = 'headers' _KEY_PARAMS = 'params' @dataclass(frozen=True) class _Compiled: path: Optional[str] = None headers: Optional[MappingType[str, str]] = None params: Optional[MappingType[str, Any]] = None def to_request( self: _Compiled, default_path: str = '', default_headers: Mapping = {}, default_params: Mapping = {}, ) -> Request: return Request( path=or_default(self.path, default_path), headers=or_default(self.headers, default_headers), params=or_default(self.params, default_params), ) def _compile(obj: Union[Mapping, str]) -> _Compiled: if isinstance(obj, str): return _compile({_KEY_PATH: obj}) path = obj.get(_KEY_PATH) if path is not None and not isinstance(path, str): raise CompilationError('Must be a string', path=[_KEY_PATH]) headers = obj.get(_KEY_HEADERS) if headers is not None and not isinstance(headers, Mapping): raise CompilationError('Must be a mapping', path=[_KEY_HEADERS]) params = obj.get(_KEY_PARAMS) if params is not None and not isinstance(params, Mapping): raise CompilationError('Must be a mapping', path=[_KEY_PARAMS]) return _Compiled(path=path, headers=headers, params=params) class RequestCompiler: def __init__( self, path: str = '', headers: Mapping = {}, params: Mapping = {}, ): self._path = path self._headers = headers self._params = params def compile(self, obj: Union[Mapping, str]) -> Request: compiled = _compile(obj) return compiled.to_request( default_path=self._path, default_headers=self._headers, default_params=self._params, ) def of_default(self, obj: Union[Mapping, str]) -> RequestCompiler: compiled = _compile(obj) return RequestCompiler( path=or_default(compiled.path, self._path), headers=or_default(compiled.headers, self._headers), params=or_default(compiled.params, self._params), ) PK!g  ,preacher/compilation/response_description.py"""Response description compilations.""" from collections.abc import Mapping from typing import Any, Optional, Iterator from preacher.core.description import Description from preacher.core.response_description import ResponseDescription from .error import CompilationError from .description import DescriptionCompiler from .predicate import PredicateCompiler from .util import map_on_key _KEY_STATUS_CODE = 'status_code' _KEY_HEADERS = 'headers' _KEY_BODY = 'body' class ResponseDescriptionCompiler: def __init__( self, predicate_compiler: Optional[PredicateCompiler] = None, description_compiler: Optional[DescriptionCompiler] = None, ): self._predicate_compiler = predicate_compiler or PredicateCompiler() self._description_compiler = ( description_compiler or DescriptionCompiler( predicate_compiler=self._predicate_compiler ) ) def compile(self, obj: Mapping) -> ResponseDescription: status_code_predicate_objs = obj.get(_KEY_STATUS_CODE, []) if not isinstance(status_code_predicate_objs, list): status_code_predicate_objs = [status_code_predicate_objs] status_code_predicates = list(map_on_key( key=_KEY_STATUS_CODE, func=self._predicate_compiler.compile, items=status_code_predicate_objs, )) headers_descriptions = list(self._compile_descs(_KEY_HEADERS, obj)) body_descriptions = list(self._compile_descs(_KEY_BODY, obj)) return ResponseDescription( status_code_predicates=status_code_predicates, headers_descriptions=headers_descriptions, body_descriptions=body_descriptions, ) def _compile_descs(self, key: str, obj: Any) -> Iterator[Description]: desc_objs = obj.get(key, []) if isinstance(desc_objs, Mapping): desc_objs = [desc_objs] if not isinstance(desc_objs, list): message = f'ResponseDescription.{key} must be a list or a mapping' raise CompilationError(message=message, path=[key]) return map_on_key(key=key, func=self._compile_desc, items=desc_objs) def _compile_desc(self, obj: Any) -> Description: if not isinstance(obj, Mapping): raise CompilationError('Description must be a mapping') return self._description_compiler.compile(obj) PK!6HljKK preacher/compilation/scenario.py"""Scenario compilation.""" from collections.abc import Mapping from functools import partial from typing import Any, Optional from preacher.core.scenario import Scenario from preacher.core.case import Case from .error import CompilationError from .case import CaseCompiler from .util import map_on_key, run_on_key _KEY_LABEL = 'label' _KEY_DEFAULT = 'default' _KEY_CASES = 'cases' class ScenarioCompiler: """ When given an empty object, then generates an empty scenario. >>> from unittest.mock import MagicMock, call, patch, sentinel >>> case_compiler = MagicMock(CaseCompiler) >>> compiler = ScenarioCompiler(case_compiler=case_compiler) >>> scenario = compiler.compile({}) >>> scenario.label >>> list(scenario.cases()) [] >>> case_compiler.of_default.assert_called_once_with({}) When given not an object, then raises a compilation error. >>> ScenarioCompiler().compile({'cases': ''}) Traceback (most recent call last): ... preacher.compilation.error.CompilationError: ...: cases When given a not string label, then raises a compilation error. >>> ScenarioCompiler().compile({'label': []}) Traceback (most recent call last): ... preacher.compilation.error.CompilationError: ...: label When given not mapping default, then raises a compilation error. >>> ScenarioCompiler().compile({'default': ''}) Traceback (most recent call last): ... preacher.compilation.error.CompilationError: ...: default When given a not mapping case, then raises a compilation error. >>> ScenarioCompiler().compile({'cases': ['']}) Traceback (most recent call last): ... preacher.compilation.error.CompilationError: ...: cases[0] Generates an iterator of cases. >>> default_case_compiler = MagicMock( ... CaseCompiler, ... compile=MagicMock(return_value=sentinel.case), ... ) >>> case_compiler = MagicMock( ... CaseCompiler, ... of_default=MagicMock(return_value=default_case_compiler), ... ) >>> compiler = ScenarioCompiler(case_compiler=case_compiler) >>> scenario = compiler.compile({ ... 'label': 'label', ... 'default': {'a': 'b'}, ... 'cases': [{}, {'k': 'v'}], ... }) >>> scenario.label 'label' >>> list(scenario.cases()) [sentinel.case, sentinel.case] >>> case_compiler.of_default.assert_called_once_with({'a': 'b'}) >>> default_case_compiler.compile.assert_has_calls([ ... call({}), ... call({'k': 'v'})], ... ) """ def __init__(self, case_compiler: Optional[CaseCompiler] = None): self._case_compiler = case_compiler or CaseCompiler() def compile(self, obj: Mapping) -> Scenario: label = obj.get(_KEY_LABEL) if label is not None and not isinstance(label, str): raise CompilationError( message='Must be a string', path=[_KEY_LABEL], ) default = obj.get(_KEY_DEFAULT, {}) if not isinstance(default, Mapping): raise CompilationError( message='Must be a mapping', path=[_KEY_DEFAULT], ) case_compiler = run_on_key( _KEY_DEFAULT, self._case_compiler.of_default, default, ) case_objs = obj.get(_KEY_CASES, []) if not isinstance(case_objs, list): raise CompilationError(message='Must be a list', path=[_KEY_CASES]) cases = map_on_key( _KEY_CASES, partial(_compile_case, case_compiler), case_objs, ) return Scenario(label=label, cases=list(cases)) def _compile_case(case_compiler: CaseCompiler, obj: Any) -> Case: if not isinstance(obj, Mapping): raise CompilationError(f'Case must be a mapping') return case_compiler.compile(obj) PK!O\ ??preacher/compilation/util.py"""Utilities for compilations.""" from typing import Callable, Iterable, Iterator, Optional, TypeVar from .error import CompilationError T = TypeVar('T') U = TypeVar('U') def run_on_key( key: str, func: Callable[[T], U], arg: T, ) -> U: """ >>> def succeeding_func(arg): ... return arg >>> run_on_key('key', succeeding_func, 1) 1 >>> def failing_func(arg): ... raise CompilationError(message='message', path=['path']) >>> run_on_key('key', failing_func, 1) Traceback (most recent call last): ... preacher.compilation.error.CompilationError: message: key.path """ try: return func(arg) except CompilationError as error: raise error.of_parent([key]) def map_on_key( key: str, func: Callable[[T], U], items: Iterable[T], ) -> Iterator[U]: """ >>> def succeeding_func(arg): ... return arg >>> results = map_on_key('key', succeeding_func, [1, 2, 3]) >>> next(results) 1 >>> next(results) 2 >>> next(results) 3 >>> def failing_func(arg): ... if arg == 2: ... raise CompilationError(message='message', path=['path']) ... return arg >>> results = map_on_key('key', failing_func, [1, 2, 3]) >>> next(results) 1 >>> next(results) Traceback (most recent call last): ... preacher.compilation.error.CompilationError: message: key[1].path >>> next(results) Traceback (most recent call last): ... StopIteration """ for idx, item in enumerate(items): try: yield func(item) except CompilationError as error: raise error.of_parent([f'{key}[{idx}]']) def or_default(value: Optional[T], default_value: T) -> T: if value is None: return default_value return value PK!preacher/core/__init__.pyPK!@Opreacher/core/case.py"""Test case.""" from dataclasses import dataclass from typing import Optional from .request import Request from .response_description import ( ResponseDescription, ResponseVerification, ) from .status import Status, merge_statuses from .verification import Verification @dataclass(frozen=True) class CaseResult: status: Status request: Verification response: Optional[ResponseVerification] = None label: Optional[str] = None class Case: def __init__( self, request: Request, response_description: ResponseDescription, label: Optional[str] = None, ): self._label = label self._request = request self._response_description = response_description def __call__(self, base_url: str, retry: int = 0) -> CaseResult: if retry < 0: raise ValueError( f'Retry count must be positive or 0, given {retry}' ) for _ in range(1 + retry): result = self._run(base_url) if result.status.is_succeeded: break return result def _run(self, base_url: str) -> CaseResult: try: response = self._request(base_url) except Exception as error: return CaseResult( status=Status.FAILURE, request=Verification.of_error(error), ) request_verification = Verification.succeed() response_verification = self._response_description( status_code=response.status_code, headers=response.headers, body=response.body, request_datetime=response.request_datetime, ) status = merge_statuses( request_verification.status, response_verification.status, ) return CaseResult( status=status, request=request_verification, response=response_verification, label=self._label, ) @property def label(self) -> Optional[str]: return self._label @property def request(self) -> Request: return self._request @property def response_description(self) -> ResponseDescription: return self._response_description PK!gw datetime.datetime: return datetime.datetime.now(_system_timezone()) def parse_datetime(value: str) -> datetime.datetime: try: return aniso8601.parse_datetime(value) except ValueError: raise ValueError(f'An invalid datetime format: {value}') def _system_timezone() -> datetime.timezone: localtime = time.localtime() return datetime.timezone( offset=datetime.timedelta(seconds=localtime.tm_gmtoff), name=localtime.tm_zone, ) PK!8xF33preacher/core/description.py"""Description.""" from typing import Any, Callable, List from .status import merge_statuses from .verification import Verification Extraction = Callable[[Any], Any] Predicate = Callable class Description: def __init__(self, extraction: Extraction, predicates: List[Predicate]): self._extraction = extraction self._predicates = predicates def __call__(self, value: Any, **kwargs: Any) -> Verification: """`**kwargs` will be delegated to predicates.""" try: verified_value = self._extraction(value) except Exception as error: return Verification.of_error(error) verifications = [ predicate(verified_value, **kwargs) for predicate in self._predicates ] status = merge_statuses(v.status for v in verifications) return Verification(status, children=verifications) @property def extraction(self) -> Extraction: return self._extraction @property def predicates(self) -> List[Predicate]: return self._predicates PK!Lpreacher/core/extraction.py"""Extraction.""" from .description import Extraction from pyjq import compile as jq_compile def with_jq(query: str) -> Extraction: """ Returns a extractor of given `jq`. >>> extract = with_jq('.foo') >>> extract({'not_foo': 'bar'}) >>> extract({'foo': 'bar'}) 'bar' >>> extract({'foo': ['bar', 'baz', 1, 2]}) ['bar', 'baz', 1, 2] """ return jq_compile(query).first PK!뺀preacher/core/predicate.py"""Predicate.""" from typing import Any, Callable from hamcrest import assert_that from hamcrest.core.matcher import Matcher from .status import Status from .verification import Verification class MatcherPredicate: """Predicate of a Hamcrest matcher.""" def __init__(self, matcher: Matcher): self._matcher = matcher def __call__(self, actual: Any, **kwargs: Any) -> Verification: try: assert_that(actual, self._matcher) except AssertionError as error: message = str(error).strip() return Verification(status=Status.UNSTABLE, message=message) except Exception as error: return Verification.of_error(error) return Verification.succeed() class DynamicMatcherPredicate: """Predicate of a dynamic Hamcrest matcher and conversion.""" def __init__( self, matcher_factory: Callable, converter: Callable[[Any], Any] = lambda x: x, ): self._matcher_factory = matcher_factory self._converter = converter def __call__(self, actual: Any, **kwargs: Any) -> Verification: try: matcher = self._matcher_factory(**kwargs) predicated_value = self._converter(actual) except Exception as error: return Verification.of_error(error) predicate = MatcherPredicate(matcher) return predicate(predicated_value, **kwargs) PK!~44preacher/core/request.py"""Request.""" from copy import copy from dataclasses import dataclass from datetime import datetime from typing import Any, Mapping import requests from .datetime import now from preacher import __version__ as _VERSION _DEFAULT_HEADERS = {'User-Agent': f'Preacher {_VERSION}'} @dataclass(frozen=True) class Response: status_code: int headers: Mapping[str, str] body: str request_datetime: datetime class Request: def __init__( self, path: str = '', headers: Mapping[str, str] = {}, params: Mapping[str, Any] = {}, ): self._path = path self._headers = headers self._params = params def __call__(self, base_url: str) -> Response: headers = copy(_DEFAULT_HEADERS) headers.update(self._headers) request_datetime = now() res = requests.get( base_url + self._path, headers=headers, params=self._params, ) return Response( status_code=res.status_code, headers={ # Convert to the normal dictionary to adapt jq. # Names are converted to lower case to normalize. name.lower(): value for (name, value) in res.headers.items() }, body=res.text, request_datetime=request_datetime ) @property def path(self) -> str: return self._path @property def headers(self) -> Mapping: return self._headers @property def params(self) -> Mapping: return self._params PK!u G G %preacher/core/response_description.py"""Response descriptions.""" import json from dataclasses import dataclass from typing import Any, List, Mapping from .description import Description, Predicate from .status import Status, merge_statuses from .verification import Verification @dataclass(frozen=True) class ResponseVerification: status: Status status_code: Verification headers: Verification body: Verification class ResponseDescription: def __init__( self, status_code_predicates: List[Predicate] = [], headers_descriptions: List[Description] = [], body_descriptions: List[Description] = [], ): self._status_code_predicates = status_code_predicates self._headers_descriptions = headers_descriptions self._body_descriptions = body_descriptions def __call__( self, status_code: int, headers: Mapping[str, str], body: str, **kwargs: Any, ) -> ResponseVerification: """`**kwargs` will be delegated to descriptions.""" status_code_verification = self._verify_status_code( status_code, **kwargs, ) try: headers_verification = self._verify_headers(headers, **kwargs) except Exception as error: headers_verification = Verification.of_error(error) try: body_verification = self._verify_body(body, **kwargs) except Exception as error: body_verification = Verification.of_error(error) status = merge_statuses( status_code_verification.status, headers_verification.status, body_verification.status, ) return ResponseVerification( status=status, status_code=status_code_verification, headers=headers_verification, body=body_verification, ) @property def status_code_predicates(self) -> List[Predicate]: return self._status_code_predicates @property def headers_descriptions(self) -> List[Description]: return self._headers_descriptions @property def body_descriptions(self) -> List[Description]: return self._body_descriptions def _verify_status_code( self, code: int, **kwargs: Any, ) -> Verification: children = [ predicate(code, **kwargs) for predicate in self._status_code_predicates ] status = merge_statuses(v.status for v in children) return Verification(status=status, children=children) def _verify_headers( self, header: Mapping[str, str], **kwargs: Any, ) -> Verification: verifications = [ describe(header, **kwargs) for describe in self._headers_descriptions ] status = merge_statuses(v.status for v in verifications) return Verification(status=status, children=verifications) def _verify_body(self, body: str, **kwargs: Any) -> Verification: if not self._body_descriptions: return Verification.skipped() data = json.loads(body) verifications = [ describe(data, **kwargs) for describe in self._body_descriptions ] status = merge_statuses(v.status for v in verifications) return Verification(status=status, children=verifications) PK!A!preacher/core/scenario.py"""Scenario""" from typing import Iterator, List, Optional from .case import Case class Scenario: """ When given no cases, then skips. >>> scenario = Scenario() >>> scenario.label >>> list(scenario.cases()) [] When given a label, then returns it. >>> scenario = Scenario(label='label') >>> scenario.label 'label' When given cases, then iterates them. >>> from unittest.mock import sentinel >>> scenario = Scenario(cases=[sentinel.case1, sentinel.case2]) >>> cases = scenario.cases() >>> next(cases) sentinel.case1 >>> next(cases) sentinel.case2 """ def __init__(self, label: Optional[str] = None, cases: List[Case] = []): self._label = label self._cases = cases @property def label(self) -> Optional[str]: return self._label def cases(self) -> Iterator[Case]: return iter(self._cases) PK!NI!preacher/core/scenario_running.py"""Scenario running helpers.""" from dataclasses import dataclass from typing import List, Optional from .case import CaseResult from .scenario import Scenario from .status import Status, merge_statuses @dataclass(frozen=True) class ScenarioResult: label: Optional[str] status: Status case_results: List[CaseResult] def run_scenario( scenario: Scenario, base_url: str, retry: int = 0, ) -> ScenarioResult: case_results = [ case(base_url=base_url, retry=retry) for case in scenario.cases() ] status = merge_statuses(result.status for result in case_results) return ScenarioResult( label=scenario.label, status=status, case_results=case_results, ) PK!n preacher/core/status.py"""Status.""" from __future__ import annotations from collections.abc import Iterable from enum import Enum from functools import reduce, singledispatch class Status(Enum): """ >>> Status.SKIPPED.is_succeeded True >>> Status.SKIPPED.merge(Status.SUCCESS) SUCCESS >>> Status.SKIPPED.merge(Status.UNSTABLE) UNSTABLE >>> Status.SKIPPED.merge(Status.FAILURE) FAILURE >>> Status.SUCCESS.is_succeeded True >>> Status.SUCCESS.merge(Status.SKIPPED) SUCCESS >>> Status.SUCCESS.merge(Status.SUCCESS) SUCCESS >>> Status.SUCCESS.merge(Status.UNSTABLE) UNSTABLE >>> Status.SUCCESS.merge(Status.FAILURE) FAILURE >>> Status.UNSTABLE.is_succeeded False >>> Status.UNSTABLE.merge(Status.SKIPPED) UNSTABLE >>> Status.UNSTABLE.merge(Status.SUCCESS) UNSTABLE >>> Status.UNSTABLE.merge(Status.UNSTABLE) UNSTABLE >>> Status.UNSTABLE.merge(Status.FAILURE) FAILURE >>> Status.FAILURE.is_succeeded False >>> Status.FAILURE.merge(Status.SKIPPED) FAILURE >>> Status.FAILURE.merge(Status.SUCCESS) FAILURE >>> Status.FAILURE.merge(Status.UNSTABLE) FAILURE >>> Status.FAILURE.merge(Status.FAILURE) FAILURE """ # Numbers stand for the priorities for merging. SKIPPED = 0 SUCCESS = 1 UNSTABLE = 2 FAILURE = 3 @property def is_succeeded(self): return self.value <= Status.SUCCESS.value def merge(self, other: Status): return max(self, other, key=lambda status: status.value) def __str__(self) -> str: return self.name def __repr__(self) -> str: return str(self) @singledispatch def merge_statuses(*args) -> Status: """ >>> merge_statuses(1) Traceback (most recent call last): ... ValueError: (1,) For varargs. >>> merge_statuses(Status.UNSTABLE) UNSTABLE >>> merge_statuses(Status.SUCCESS, Status.FAILURE, Status.UNSTABLE) FAILURE For iterables. >>> merge_statuses([]) SKIPPED >>> merge_statuses([Status.SUCCESS, Status.UNSTABLE, Status.FAILURE]) FAILURE """ raise ValueError(str(args)) @merge_statuses.register def _merge_statuses_for_varargs(*statuses: Status): return merge_statuses(statuses) @merge_statuses.register def _merge_statuses_for_iterable(statuses: Iterable): return reduce(lambda lhs, rhs: lhs.merge(rhs), statuses, Status.SKIPPED) PK!Dkpreacher/core/verification.py"""Verification.""" from __future__ import annotations from dataclasses import dataclass from typing import Collection, Optional from .status import Status @dataclass(frozen=True) class Verification: status: Status message: Optional[str] = None children: Collection[Verification] = tuple() @staticmethod def skipped() -> Verification: return Verification(status=Status.SKIPPED) @staticmethod def succeed() -> Verification: return Verification(status=Status.SUCCESS) @staticmethod def of_error(error: Exception) -> Verification: return Verification( status=Status.FAILURE, message=f'{error.__class__.__name__}: {error}', ) PK!!preacher/presentation/__init__.pyPK!  preacher/presentation/logging.py"""Logging presentation.""" import contextlib import logging import io from typing import Iterator from preacher.core.case import CaseResult from preacher.core.response_description import ResponseVerification from preacher.core.scenario_running import ScenarioResult from preacher.core.status import Status from preacher.core.verification import Verification _LEVEL_MAP = { Status.SKIPPED: logging.DEBUG, Status.SUCCESS: logging.INFO, Status.UNSTABLE: logging.WARN, Status.FAILURE: logging.ERROR, } class LoggingPresentation: def __init__(self, logger: logging.Logger): self._logger = logger self._indent = '' def show_scenario_result(self, scenario_result: ScenarioResult) -> None: status = scenario_result.status level = _LEVEL_MAP[status] label = scenario_result.label or 'Not labeled scenario' self._log(level, '%s: %s', label, status) with self._nested(): for case_result in scenario_result.case_results: self.show_case_result(case_result) self._log(level, '') def show_case_result(self, case_result: CaseResult) -> None: status = case_result.status level = _LEVEL_MAP[status] label = case_result.label or 'Not labeled case' self._log(level, '%s: %s', label, status) with self._nested(): self.show_verification( verification=case_result.request, label='Request', ) response = case_result.response if response: self.show_response_verification(response) def show_response_verification( self, verification: ResponseVerification, label: str = 'Response', ) -> None: status = verification.status level = _LEVEL_MAP[status] self._log(level, f'%s: %s', label, status) with self._nested(): self.show_verification( verification=verification.status_code, label='Status Code', ) self.show_verification( verification=verification.headers, label='Headers', child_label='Description', ) self.show_verification( verification=verification.body, label='Body', child_label='Description', ) def show_verification( self, verification: Verification, label: str, child_label: str = 'Predicate', ) -> None: status = verification.status level = _LEVEL_MAP[status] self._log(level, f'%s: %s', label, status) message = verification.message if message: with self._nested(): self._multi_line_message(level, message) with self._nested(): for idx, child in enumerate(verification.children): self.show_verification(child, f'{child_label} {idx + 1}') def _log(self, level: int, message: str, *args) -> None: self._logger.log(level, self._indent + message, *args) def _multi_line_message(self, level: int, message: str) -> None: for line in io.StringIO(message): self._log(level, line.rstrip()) @contextlib.contextmanager def _nested(self) -> Iterator[None]: original = self._indent self._indent += '..' yield self._indent = original PK!HD+`3;)preacher-0.9.8.dist-info/entry_points.txtN+I/N.,()*(JMLH-Mɴq zyV PK!T-- preacher-0.9.8.dist-info/LICENSEMIT License Copyright (c) 2019 Yu Mochizuki Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HڽTUpreacher-0.9.8.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!HE >!preacher-0.9.8.dist-info/METADATAY[s6~ǯdrHYr&'x&Mv/"DB$P6s&Qi"8-,fyq2Qt/;VV(tk'3)TM@5_te"j!r*TJy^(M!tixET@39u*d@-Pe_3v?$E$WއHOfg2gJ:]4OA{77z~I y~uOӬ,+_cgbfqy}M_Lٯ}K ̧4 *ΡRLגm3E`Z2JLi06S:#HlѺ$ݡ[}so]Σ&'@Equ) U#FJ&]d:ƕ)jKW3rfQڧe5TT%\@qmyI&3|B>הINe!iV7 / tJ0(ҙB㹨PyJR DiΙ,3NҼuWh3. 6E {U 2ob9L=W8U`,a܍Y} dB/g3b s{^ +:EԒ.ieW|Ê*筻5Q'U.&Kji;nc:Q1k1߰:ׁm䴷i@gELq.{8bZ7B VmJ(]ѵZONKk7CRTrZ(ɂ`)`[LTk@?`83yPv4P  VUA z(W,e v/w,bΛr(+r%bwyΕ2} ۸ 1xEw฾ ͸O|peC߉yNnlg-{8BR#˚= ~'𮠡9ٍ~c7p\>` og0ܠ1Nd& 3?#C)#>cqvlB.IŰkA&ks;0a E ;REQ=C zm}'6LwX =gΧO7L~ zp}>ۇdYCak!R@kMj(lOqW#_#}.,lĩLˢ.1k 4/CۛUn)+r-k T$ %h$)Am9Y:j=8Y3.4ʖYuw6GM 28*,tm& ޠ+CI_hv̾ &-CA50cE'& Ah͉1ḆTLm*{JI( ~B+e2͖z>[V >Cb瞦@.d㣎<,6(r"BgX:/#9?'ߺ'3P8E?/ =꠱ր))`DRi`Toj#D!Cs _,.y,W܄|£VosUz bXsc]0~"_yۼ֦u[PɩYJ+Ag XD!,_f@` }$޵( p[hŹaMU[ONQEPBygxa [>ÓGM(J?WE@q,~V|a/?.yS:yY,12ZYbɼݞ"xui7He5 Q)'؜z[rB.XPyq~dtݒ ^)So %OiMD&2?زI9R,*5+zL9/RotjpBKDY'?0}G #շ}MVǸ;ṲAVq]͐cWRs%E.2I_9AoDr +]wy}<&57&Nj, =z2F뵟Yȳ eWU7}Y ߣpbx|+/[}iiʹ#!T9G)(il aPK!!preacher-0.9.8.dist-info/METADATAPK!H spreacher-0.9.8.dist-info/RECORDPK$$ y