PK!>>transformer/.urlignore_example360yield abmr.net addthis.com adfarm adform.net adition.com adkontekst.pl admatic.com admeira.ch adnxs.com adsrvr.org akstat atdmt.com bidswitch.net bing bluekai.com calotag.com casalemedia.com cloudfront.net connectad.io converge-digital.com creativefactory.zalando crwdcntrl.net demdex.net doubleclick.net effitarget.com email-reflex.com exelator.com experianmarketingservices.digital facebook google himediads.com ibillboard.com intelliad.de keyxel.com krxd.net liadm.com lkqd.net mathtag.com mediabong.com metrigo.com metrigo.zalan.do mookie1.com mpulse nscontext.eu nuggad opecloud.com openx.net pixel pubmatic.com pubmine.com reflex rtb-seller.com sara.media semasio.net sharethis.com socialaudience.nl static-img tapad.com usabilla vdopia.com videoplaza.tv vimeo.com vimeocdn.com vmg.host yieldlab.net ztat.net admatic.com PK!)Ԧtransformer/__init__.pyimport pkg_resources from .transform import dumps, dump __version__ = pkg_resources.get_distribution("har-transformer").version __all__ = ["dumps", "dump"] PK!WWtransformer/__main__.pyfrom .cli import script_entrypoint if __name__ == "__main__": script_entrypoint() PK!"nwwtransformer/blacklist.pyimport os import logging def on_blacklist(url): """ Checks for matching URLs in an ignore file (blacklist) from user's current directory. """ blacklist_file = f"{os.getcwd()}/.urlignore" try: with open(blacklist_file) as file: blacklist = [line.rstrip("\n") for line in file if len(line) > 1] for blacklist_item in blacklist: if blacklist_item in url: return True return False except OSError as err: logging.debug( "Could not read blacklist file %s. Reason: %s", blacklist_file, err ) return False PK!'BB transformer/builders_decision.pyfrom hypothesis.strategies import text, builds, booleans, just from .decision import Decision reasons = text(max_size=2) yes_decisions = builds(Decision, valid=just(True), reason=reasons) no_decisions = builds(Decision, valid=just(False), reason=reasons) decisions = builds(Decision, valid=booleans(), reason=reasons) PK!?8transformer/builders_python.py""" Hypothesis builders for property-based testing of the transformer.python module. """ import re import string from typing import Optional from hypothesis.searchstrategy import SearchStrategy from hypothesis.strategies import ( integers, text, lists, builds, deferred, one_of, recursive, just, none, booleans, floats, tuples, dictionaries, ) from transformer import python as py # Strategy for indentation levels we want to test with (just "no indentation" or # "one-level indentation" because "two-level indentation" will likely be the # same, and we don't want the tests running for too long). indent_levels = integers(min_value=0, max_value=1) def ascii_text(min_size: int = 0, max_size: Optional[int] = 5) -> SearchStrategy[str]: """ Strategy for ASCII strings, with a default max_size to avoid wasting time generating too much. """ return text(string.printable, min_size=min_size, max_size=max_size) _ascii_inline = re.sub(r"[\r\n\v\f]", "", string.printable) def ascii_inline_text( min_size: int = 0, max_size: Optional[int] = 5 ) -> SearchStrategy[str]: """Similar to ascii_text, but does not generate multiline strings.""" return text(_ascii_inline, min_size=min_size, max_size=max_size) # Strategy for identifiers, i.e. strings that can be used as symbols (function # names, etc.) in Python programs. # Unqualified identifiers cannot contain ".", qualified identifiers can # (but only between unqualified identifiers, i.e. not at the beginning or end). unqualified_identifiers = text(string.ascii_letters, min_size=1, max_size=5) qualified_identifiers = lists(unqualified_identifiers, min_size=2, max_size=3).map( ".".join ) identifiers = unqualified_identifiers | qualified_identifiers # Strategy for python.Line objects. lines = builds(py.Line, ascii_inline_text(), indent_levels) # Strategy for lists of strings to be used as comment text in tests. comments = lists(ascii_inline_text(), max_size=3) # Strategy for python.Statement objects. Basically, if you ask for a Statement, # you get an instance of one of Statement's subclasses. # All Statement subclasses should be mentioned here. # We need deferred to break the cyclic dependency between builds() that depend statements. statements = deferred( lambda: one_of( opaque_blocks, functions, decorations, classes, standalones, assignments, ifelses, imports, ) ) _atomic_blocks = text(string.ascii_letters + string.punctuation, min_size=1, max_size=3) _complex_blocks = recursive( _atomic_blocks, lambda b: text(string.whitespace, min_size=1, max_size=2).flatmap( lambda ws: tuples(b, b).map(ws.join) ), max_leaves=8, ) # Strategy for python.OpaqueBlock. We don't want whitespace-only comments # (should be ignored by the syntax tree, which requires boilerplate in tests) # so we build the text by joining non-whitespace strings together. # Since this strategy is the default in the statements strategy, which is widely # used in tests, it should be as fast as possible. opaque_blocks = builds(py.OpaqueBlock, block=_complex_blocks, comments=comments) # Strategy for python.Function objects. functions = builds( py.Function, name=unqualified_identifiers, params=lists(unqualified_identifiers, max_size=2), statements=lists(statements, max_size=2), comments=comments, ) # Strategy for python.Class objects. classes = builds( py.Class, name=unqualified_identifiers, statements=lists(statements, max_size=2), superclasses=lists(identifiers, max_size=2), comments=comments, ) # Strategy for python.Decoration objects. decorations = builds( py.Decoration, decorator=identifiers, target=one_of(functions, classes), comments=comments, ) # Strategy for python.Expression objects. Basically, if you ask for an Expression, # you get an instance of one of Expression's subclasses. # All Expression subclasses should be mentioned here. # Uses deferred for the same reason as Statement. expressions = deferred(lambda: one_of(symbols, literals, function_calls, binary_ops)) # Strategy for python.Standalone objects. standalones = builds(py.Standalone, expr=expressions, comments=comments) # Strategy for python.FString objects. The first examples are supposed to # trigger interpolation behavior to show that it doesn't happen with FString. fstrings = builds( py.FString, one_of(just(""), just("{a}"), text(min_size=1, max_size=5)) ) # Strategy for python.Literal objects. # The recursion doesn't use the set data type because python.Statement and # python.Expression objects are not hashable (because they are not immutable); # this is also why we use unqualified_identifiers as dictionary keys. literals = recursive( one_of(none(), booleans(), integers(), floats(), text(max_size=5)).map(py.Literal) | fstrings, lambda x: one_of( lists(x, max_size=2), tuples(x), dictionaries(unqualified_identifiers, x, max_size=2), ).map(py.Literal), max_leaves=8, ) # Strategy for python.Symbol objects. symbols = builds(py.Symbol, identifiers) # Strategy for python.FunctionCall objects. The size of argument collections is # limited for performance reasons and because handling 3 arguments is (hopefully) # the same as handling 2 arguments. function_calls = builds( py.FunctionCall, name=identifiers, positional_args=lists(expressions, max_size=2), named_args=dictionaries(unqualified_identifiers, expressions, max_size=2), ) # Strategy for reasonable operator names: "+", "++", "in", etc. operators = text(string.ascii_letters + string.punctuation, min_size=1, max_size=2) # Strategy for python.BinaryOp. binary_ops = builds(py.BinaryOp, lhs=expressions, op=operators, rhs=expressions) # Strategy for python.Assignment. assignments = builds(py.Assignment, lhs=identifiers, rhs=expressions, comments=comments) # Strategy for python.IfElse. The size of Statement sub-lists is limited for # performance reasons and because handling 3 statements is (hopefully) the same # as handling 2 statements. ifelses = builds( py.IfElse, condition_blocks=lists( tuples(expressions, lists(statements, max_size=2)), min_size=1, max_size=3 ), else_block=one_of(none(), lists(statements, max_size=2)), comments=comments, ) # Strategy for python.Import without an alias part. multi_imports = builds( py.Import, targets=lists(identifiers, min_size=1, max_size=2), source=one_of(none(), identifiers), ) # Strategy for python.Import with an alias part. aliased_imports = builds( py.Import, targets=tuples(identifiers), source=one_of(none(), identifiers), alias=unqualified_identifiers, ) # Strategy for python.Import. imports = multi_imports | aliased_imports PK!6W W transformer/cli.py""" Transformer: Convert web browser sessions (HAR files) into Locust load testing scenarios (locustfiles). Usage: transformer [-p ]... [...] transformer --help transformer --version Options: --help Print this help message and exit. -p, --plugin= Use the specified plugin. Repeatable. --version Print version information and exit. Documentation & code: https://github.com/zalando-incubator/transformer """ import logging import sys from pathlib import Path from typing import Sequence, cast, Tuple import ecological from docopt import docopt from transformer import __version__, dump class Config(ecological.AutoConfig, prefix="transformer"): input_paths: Tuple[Path, ...] = () plugins: Tuple[str, ...] = () def read_config(cli_args: Sequence[str]) -> Config: """ Combine command-line arguments & options (managed by docopt) with environment variables (managed by Ecological) into Ecological's Config class. Special cases: - If input paths are provided both from the environment and the command-line, only the paths provided from the command-line are taken into account. - If plugins are provided both from the environment and the command-line, the union of both groups is taken into account. """ arguments = docopt(__doc__, version=__version__, argv=cli_args) # TODO: remove this redundancy once Ecological can re-read the environment # at run-time while still having a compile-time definition (Config). # See https://github.com/jmcs/ecological/issues/20. class conf(ecological.AutoConfig, prefix="transformer"): input_paths: Tuple[Path, ...] = () plugins: Tuple[str] = () paths = arguments[""] if paths: if conf.input_paths: logging.warning("TRANSFORMER_INPUT_PATHS overwritten with CLI arguments") conf.input_paths = paths conf.input_paths = tuple(Path(p) for p in conf.input_paths) plugins = arguments["--plugin"] if plugins: if conf.plugins: logging.warning("TRANSFORMER_PLUGINS merged with CLI -p/--plugin options") conf.plugins = (*conf.plugins, *plugins) return cast(Config, conf) def script_entrypoint() -> None: """ Entrypoint for the "transformer" program (which reads arguments from the command-line and the environment). This is an alternative to using directly Scenario.from_path and locust.locustfile as a library API in another Python program. """ logging.basicConfig( level=logging.INFO, format="%(asctime)s\t%(levelname)s\t%(message)s" ) config = read_config(cli_args=sys.argv[1:]) if not config.input_paths: logging.error("No input paths provided in environment nor command-line!") logging.info("Did you mean to provide env TRANSFORMER_INPUT_PATHS=[...]?") logging.info("Otherwise, here is the command-line manual:") print(__doc__, file=sys.stderr) exit(1) try: dump(file=sys.stdout, scenario_paths=config.input_paths, plugins=config.plugins) except ImportError as err: logging.error(f"Failed loading plugins: {err}") exit(2) except Exception: url = "https://github.com/zalando-incubator/Transformer/issues" logging.exception(f"Please help us fix this error by reporting it! {url}") exit(3) PK!T/  transformer/decision.pyfrom typing import NamedTuple, Union, Iterable, Optional class Decision(NamedTuple): valid: bool reason: str def __bool__(self) -> bool: return self.valid def __eq__(self, o: object) -> bool: return isinstance(o, self.__class__) and self.valid == o.valid @classmethod def yes(cls, reason: str = "ok") -> "Decision": return Decision(valid=True, reason=reason) @classmethod def no(cls, reason: str) -> "Decision": return Decision(valid=False, reason=reason) @classmethod def whether(cls, cond: Union[bool, "Decision"], reason: str) -> "Decision": if isinstance(cond, Decision): if cond: return cond return Decision.no(f"{reason}: {cond.reason}") return Decision.yes(reason) if cond else Decision.no(reason) @classmethod def all(cls, decisions: Iterable["Decision"]) -> "Decision": for d in decisions: if not d: return d return Decision.yes() @classmethod def any( cls, decisions: Iterable["Decision"], reason: Optional[str] = None ) -> "Decision": recorded_decisions = [] nb_bad_cases = 0 BAD_CASES_THRESHOLD = 5 for d in decisions: if d: return Decision.yes(f"{reason}: {d.reason}") if reason else d if nb_bad_cases <= BAD_CASES_THRESHOLD: recorded_decisions.append(d) nb_bad_cases += 1 if nb_bad_cases <= BAD_CASES_THRESHOLD: cases = str([d.reason for d in recorded_decisions]) else: cases = f"{nb_bad_cases} invalid cases" msg = f"no valid case: {cases}" if reason: msg = f"{reason}: {msg}" return Decision.no(msg) PK!p=transformer/helpers.pyimport json from typing import Iterable def zip_kv_pairs(pairs: Iterable) -> dict: return {pair.name: pair.value for pair in pairs} """ Use this with caution, as it is global and mutable! See also DUMMY_HAR_STRING. """ _DUMMY_HAR_DICT = { "log": { "entries": [ { "startedDateTime": "2018-01-01", "request": {"method": "GET", "url": "https://www.zalando.de"}, } ] } } DUMMY_HAR_STRING = json.dumps(_DUMMY_HAR_DICT) PK!м{uutransformer/locust.pyimport enum import warnings from typing import Sequence, List, Union, Iterator import transformer.plugins as plug import transformer.python as py from transformer.plugins.contracts import Plugin from transformer.scenario import Scenario from transformer.task import Task, Task2 LOCUST_MAX_WAIT_DELAY = 10 LOCUST_MIN_WAIT_DELAY = 0 LOCUSTFILE_COMMENT = """ File automatically generated by Transformer: https://github.bus.zalan.do/TIP/transformer """.strip() def _locust_task(task: Union[Task, Task2]) -> py.Function: """ Transforms a Task into the Python code expected by Locust. This function is private because it does not return a complete Locust task (the @task decorator is missing) and should therefore not be used for that purpose by unsuspecting users. """ if isinstance(task, Task): # TODO: remove when Task2 has replaced Task. # See https://github.com/zalando-incubator/Transformer/issues/11. task = Task2.from_task(task) return py.Function(name=task.name, params=["self"], statements=task.statements) class TaskSetType(enum.Enum): Set = "TaskSet" Sequence = "TaskSequence" def locust_taskset(scenario: Scenario) -> py.Class: """ Transforms a scenario (potentially containing other scenarios) into a Locust TaskSet definition. """ if any(isinstance(child, Task) for child in scenario.children): ts_type = TaskSetType.Sequence else: ts_type = TaskSetType.Set fields: List[py.Statement] = [] for i, child in enumerate(scenario.children, start=1): seq_decorator = f"seq_task({i})" if isinstance(child, (Task2, Task)): fields.append(py.Decoration(seq_decorator, _locust_task(child))) elif isinstance(child, Scenario): field = py.Decoration(f"task({child.weight})", locust_taskset(child)) if ts_type is TaskSetType.Sequence: field = py.Decoration(seq_decorator, field) fields.append(field) else: wrong_type = child.__class__.__qualname__ scenario_type = scenario.__class__.__qualname__ raise TypeError( f"unexpected type {wrong_type} in {scenario_type}.children: {child!r}" ) return py.Class(scenario.name, superclasses=[str(ts_type.value)], statements=fields) def locust_classes(scenarios: Sequence[Scenario]) -> List[py.Class]: """ Transforms scenarios into all Python classes needed by Locust (TaskSet and Locust classes). The only missing parts before a fully functional locustfile are: - integrating all necessary set-up/tear-down statements: - Python imports, - apply global plugins, - etc. - serializing everything via transformer.python. """ classes = [] for scenario in scenarios: taskset = locust_taskset(scenario) locust_class = py.Class( name=f"LocustFor{taskset.name}", superclasses=["HttpLocust"], statements=[ py.Assignment("task_set", py.Symbol(taskset.name)), py.Assignment("weight", py.Literal(scenario.weight)), py.Assignment("min_wait", py.Literal(LOCUST_MIN_WAIT_DELAY)), py.Assignment("max_wait", py.Literal(LOCUST_MAX_WAIT_DELAY)), ], ) classes.append(taskset) classes.append(locust_class) return classes def locust_program(scenarios: Sequence[Scenario]) -> py.Program: """ Converts a ScenarioGroup into a Locust File. """ global_code_blocks = { # TODO: Replace me with a plugin framework that accesses the full tree. # See https://github.com/zalando-incubator/Transformer/issues/11. block_name: py.OpaqueBlock("\n".join(block), comments=[block_name]) for scenario in scenarios for block_name, block in scenario.global_code_blocks.items() } return [ py.Import(["re"], comments=[LOCUSTFILE_COMMENT]), py.Import( ["HttpLocust", "TaskSequence", "TaskSet", "seq_task", "task"], source="locust", ), *locust_classes(scenarios), *global_code_blocks.values(), ] def locustfile_lines( scenarios: Sequence[Scenario], program_plugins: Sequence[Plugin] ) -> Iterator[str]: """ Converts the provided scenarios into a stream of Python statements and iterate on the resulting lines. """ program = plug.apply(program_plugins, locust_program(scenarios)) for stmt in program: for line in stmt.lines(): yield str(line) def locustfile(scenarios: Sequence[Scenario]) -> str: """ Simple wrapper around locustfile_lines joining all lines with "\n". This function is deprecated and will be removed in a future version. Do not rely on it. Reason: It does not provide significant value over locustfile_lines and has a less clear name and a less flexible API. It does not support new generation plugins contracts like OnPythonProgram. Deprecated since: v1.0.2. """ warnings.warn(DeprecationWarning("locustfile: use locustfile_lines instead")) return "\n".join(locustfile_lines(scenarios, ())) PK!@y3transformer/naming.pyimport re import zlib DIGIT_RX = re.compile(r"[0-9]") ENDS_WITH_ADLER32 = re.compile(r"_[0-9]+\Z") def to_identifier(string: str) -> str: """ Replace everything except letters, digits and underscore with underscores, allowing the resulting name to be used as identifier in a Python program. A checksum is added at the end (to avoid collisions) if at least one replacement is made, or if the input already ends like a checksum (otherwise, for any input X, we have: to_identifier(X) == to_identifier(to_identifier(X)) i.e. a collision). """ safe_name = re.sub(r"[^_a-z0-9]", "_", string, flags=re.IGNORECASE) if DIGIT_RX.match(safe_name): safe_name = f"_{safe_name}" if safe_name == string and not ENDS_WITH_ADLER32.search(string): return string unique_suffix: int = zlib.adler32(string.encode()) return f"{safe_name}_{unique_suffix}" PK!transformer/plugins/__init__.pyfrom .resolve import resolve from .contracts import plugin, Contract, apply, group_by_contract __all__ = ["resolve", "plugin", "Contract", "apply", "group_by_contract"] PK!߿ transformer/plugins/contracts.py""" This module defines the various contracts, i.e. types of plugins supported by Transformer. The term "contract" indicates that these types constrain what plugin implementors can do in Transformer. Transformer plugins are just functions that accept certain inputs and have certain outputs. Different types of plugins have different input and output types. Not all types of plugins can be applied at the same point in Transformer's pipeline (e.g. python.Program objects are built much later than Task objects), hence the multiplicity of contracts. # Plugin contracts ## OnTask Kind of "stateless" plugins that operate independently on each task. When implementing one, imagine their execution could be parallelized by Transformer in the future. Example: a plugin that injects a header in all requests. ## OnScenario Kind of plugins that operate on scenarios. Each scenario is the root of a tree composed of smaller scenarios and tasks (the leaves of this tree). Therefore, in an OnScenario plugin, you have the possibility of inspecting the subtree and making decisions based on that. However, OnScenario plugins will be applied to all scenarios by Transformer, so you don't need to recursively apply the plugin yourself on all subtrees. If you do that, the plugin will be applied many times more than necessary. Example: a plugin that keeps track of how long each scenario runs. ## OnPythonProgram Kind of plugins that operate on the whole syntax tree. The input and output of this kind of plugins is the complete, final locustfile generated by Transformer, represented as a syntax tree. OnPythonProgram plugins therefore have the most freedom compared to other plugin kinds, because they can change anything. Their downside is that manipulating the syntax tree is more complex than the scenario tree or individual tasks. Example: a plugin that injects some code in the global scope. # Other contracts This module also defines: ## Plugin Any supported contract of Transformer plugin. """ import enum from collections import defaultdict from typing import Callable, NewType, Iterable, TypeVar, List, DefaultDict class Contract(enum.Flag): """ Enumeration of all supported plugin contracts. Each contract defines a way for plugins to be used in Transformer. Any function may become a Transformer plugin by announcing that it implements at least one contract, using the @plugin decorator. """ OnTask = enum.auto() OnScenario = enum.auto() OnPythonProgram = enum.auto() # Historically Transformer has only one plugin contract, which transformed a # sequence of Task objects into another such sequence. Operating on a full list # of tasks (instead of task by task) offered more leeway: a plugin could e.g. # add a new task, or change only the first task. # However this OnTaskSequence model is too constraining for some use-cases, # e.g. when a plugin needs to inject code in the global scope, and having to # deal with a full, immutable list of tasks in plugins that independently # operate on each task implies a lot of verbosity and redundancy. # For these reasons, other plugin contracts were created to offer a more # varied choice for plugin implementers. # See https://github.com/zalando-incubator/Transformer/issues/10. OnTaskSequence = enum.auto() Plugin = NewType("Plugin", callable) class InvalidContractError(ValueError): """ Raised for plugin functions associated with invalid contracts. What an "invalid contract" represents is not strictly specified, but this includes at least objects that are not members of the Contract enumeration. """ class InvalidPluginError(ValueError): """ Raised when trying to use as plugin a function that has not been marked as such. """ def plugin(c: Contract) -> Callable[[callable], callable]: """ Function decorator. Use it to associate a function to a Contract, making it a Transformer plugin that will be detected by resolve(). :param c: the contract to associate to the decorated function. :raise InvalidContractError: if c is not a valid contract. """ if not isinstance(c, Contract): suggestions = (f"@plugin(Contract.{x.name})" for x in Contract) raise InvalidContractError( f"{c!r} is not a {Contract.__qualname__}. " f"Did you mean {', '.join(suggestions)}?" ) def _decorate(f: callable) -> callable: f._transformer_plugin_contract = c return f return _decorate def contract(f: Plugin) -> Contract: """ Returns the contract associated to a plugin function. :raise InvalidPluginError: if f is not a plugin. """ try: return getattr(f, "_transformer_plugin_contract") except AttributeError: raise InvalidPluginError(f) from None _T = TypeVar("_T") def apply(plugins: Iterable[Plugin], init: _T) -> _T: """ Applies each plugin to init in order, and returns the result. This just wraps a very simple but common operation. """ for p in plugins: init = p(init) return init _BASE_CONTRACTS = ( Contract.OnTask, Contract.OnTaskSequence, Contract.OnScenario, Contract.OnPythonProgram, ) def group_by_contract(plugins: Iterable[Plugin]) -> DefaultDict[Contract, List[Plugin]]: """ Groups plugins in lists according to their contracts. Each plugin is found in as many lists as it implements base contracts. Lists keep the order of the original plugins iterable. """ res = defaultdict(list) for p in plugins: c = contract(p) for bc in _BASE_CONTRACTS: if c & bc: # Contract is an enum.Flag: & computes the intersection. res[bc].append(p) return res PK!v&transformer/plugins/dummy.pyimport logging from typing import cast from transformer.plugins import plugin, Contract from transformer.request import Request from transformer.scenario import Scenario from transformer.task import Task @plugin(Contract.OnScenario) def f(s: Scenario) -> Scenario: first_req = first(s) logging.info(f"The first request was {first_req.url.geturl()}") return s def first(s: Scenario) -> Request: while isinstance(s, Scenario): s = s.children[0] return cast(Task, s).request PK!.jtransformer/plugins/resolve.pyimport importlib import inspect import logging from types import ModuleType from typing import Iterator from transformer.plugins.contracts import ( Plugin, Contract, InvalidContractError, contract, InvalidPluginError, ) def resolve(name: str) -> Iterator[Plugin]: """ Transform a plugin name into the corresponding, actual plugins. The name of a plugin is the name of a Python module containing (at least) one function decorated with @plugin (from the contracts module). The "resolve" function loads that module and returns these plugin functions found inside the module. :raise ImportError: if name does not match an accessible module. :raise TypeError: from load_load_plugins_from_module. :raise InvalidContractError: from load_load_plugins_from_module. :raise NoPluginError: from load_load_plugins_from_module. """ module = importlib.import_module(name) yield from load_plugins_from_module(module) class NoPluginError(ValueError): """ Raised for Python modules that should but don't contain any plugin function. """ def load_plugins_from_module(module: ModuleType) -> Iterator[Plugin]: """ :param module: Python module from which to load plugin functions. :raise TypeError: if module is not a Python module. :raise InvalidContractError: if a function is associated to an invalid contract. :raise NoPluginError: if module doesn't contain at least one plugin function. """ if not inspect.ismodule(module): raise TypeError(f"expected a module, got {module!r}") nb_plugins = 0 for _, obj in inspect.getmembers(module, inspect.isfunction): try: c = contract(obj) except InvalidPluginError: logging.debug(f"ignoring {_n(obj)}: not decorated with @plugin") continue if not isinstance(c, Contract): msg = f"{_n(obj)} associated to an invalid contract {c!r}" raise InvalidContractError(msg) nb_plugins += 1 yield obj if nb_plugins < 1: raise NoPluginError(module) def _n(x) -> str: return getattr(x, "__qualname__", None) or repr(x) PK!ތ'transformer/plugins/sanitize_headers.md# Sanitizing headers The [`sanitize_headers` plugin](sanitize_headers.py) should be used for processing scenarios generated in the Chrome browser, but is also advised to use it whenever cookies handling is important. The plugin removes Chrome-specific, RFC-non-compliant headers starting with `:`. Examples of such headers: ``` :authority: chrome.google.com :method: POST :path: /reviews/json/search :scheme: https ``` Additionally, the plugin: - converts header names to lowercase, which simplifies further header overriding, - ignores the `cookie` header, as cookies are handled by [Locust's _HttpSession_][http-session]. [http-session]: https://docs.locust.io/en/stable/api.html#httpsession-class PK!l'transformer/plugins/sanitize_headers.pyfrom transformer.helpers import zip_kv_pairs from transformer.plugins import plugin, Contract from transformer.task import Task2 @plugin(Contract.OnTask) def plugin(task: Task2) -> Task2: """ Removes Chrome-specific, RFC-non-compliant headers starting with `:`. Converts header names to lowercase to simplify further overriding. Removes the cookie header as it is handled by Locust's HttpSession. """ headers = task.request.headers if not isinstance(headers, dict): headers = zip_kv_pairs(headers) sanitized_headers = { k.lower(): v for (k, v) in headers.items() if not k.startswith(":") and k.lower() != "cookie" } task.request = task.request._replace(headers=sanitized_headers) return task PK!e e %transformer/plugins/test_contracts.pyimport pytest from hypothesis import given from hypothesis.strategies import from_type from transformer.plugins import apply, group_by_contract from .contracts import ( Contract, plugin, contract, InvalidContractError, InvalidPluginError, ) @given(from_type(Contract)) def test_contract_returns_contract_associated_with_plugin_decorator(c: Contract): @plugin(c) def foo(): ... assert contract(foo) is c def test_plugin_decorator_raises_with_invalid_contract(): with pytest.raises(InvalidContractError): @plugin(2) def foo(): ... def test_plugin_decorator_raises_without_contract(): with pytest.raises(InvalidContractError): @plugin def foo(): ... def test_contract_raises_with_invalid_plugin(): def foo(): ... with pytest.raises(InvalidPluginError): contract(foo) def test_plugin_is_exported_by_the_transformer_plugins_module(): try: from transformer.plugins import plugin except ImportError: pytest.fail("plugin should be exported by transformer.plugins") def test_Contract_is_exported_by_the_transformer_plugins_module(): try: from transformer.plugins import Contract except ImportError: pytest.fail("Contract should be exported by transformer.plugins") class TestApply: def test_return_init_unchanged_without_plugins(self): x = object() assert apply([], x) is x def test_return_plugin_result(self): @plugin(Contract.OnTask) def plugin_a(x: str) -> str: return x + "a" assert apply([plugin_a], "z") == "za" def test_runs_plugins_in_succession_on_input(self): @plugin(Contract.OnTask) def plugin_a(x: str) -> str: return x + "a" @plugin(Contract.OnTask) def plugin_b(x: str) -> str: return x + "b" assert apply((plugin_a, plugin_b), "") == "ab" assert apply((plugin_b, plugin_a, plugin_b), "") == "bab" class TestGroupByContract: def test_return_empty_dict_when_no_plugins(self): assert group_by_contract([]) == {} def test_index_plugins_with_simple_contracts_by_their_contract(self): @plugin(Contract.OnTask) def plugin_a(): pass @plugin(Contract.OnTask) def plugin_b(): pass @plugin(Contract.OnScenario) def plugin_z(): pass assert group_by_contract((plugin_a, plugin_b, plugin_z)) == { Contract.OnScenario: [plugin_z], Contract.OnTask: [plugin_a, plugin_b], } def test_index_plugins_with_complex_contracts_by_their_basic_contracts(self): @plugin(Contract.OnTask) def plugin_task(): pass @plugin(Contract.OnTask | Contract.OnScenario | Contract.OnPythonProgram) def plugin_multi(): pass assert group_by_contract((plugin_task, plugin_multi)) == { Contract.OnTask: [plugin_task, plugin_multi], Contract.OnScenario: [plugin_multi], Contract.OnPythonProgram: [plugin_multi], } PK!j梺!transformer/plugins/test_dummy.pyimport logging import os from pathlib import Path import transformer from transformer.helpers import DUMMY_HAR_STRING def test_dummy_plugin_works(tmp_path: Path, caplog): har_path = tmp_path / "test.har" har_path.write_text(DUMMY_HAR_STRING) caplog.set_level(logging.INFO) with open(os.path.devnull, "w") as f: transformer.dump(f, [har_path], plugins=["transformer.plugins.dummy"]) assert "The first request was https://www.zalando.de" in caplog.text PK!zr #transformer/plugins/test_resolve.pyimport logging import random import sys import uuid from pathlib import Path from types import ModuleType import pytest from hypothesis import given from hypothesis._strategies import permutations from transformer.plugins.contracts import plugin, Contract from .resolve import load_plugins_from_module, resolve, NoPluginError @pytest.fixture() def module_root(tmp_path: Path, monkeypatch) -> Path: monkeypatch.setattr(sys, "path", [str(tmp_path), *sys.path]) return tmp_path class TestResolve: def test_raises_for_module_not_found(self): modname = f"that_module_does_not_exist.{uuid.uuid4().hex}" with pytest.raises(ImportError): list(resolve(modname)) # must force evaluation of the generator def test_calls_load_plugins_from_module_with_module(self, module_root: Path): modname = "ab.cd.ef" modpath = Path(*modname.split(".")).with_suffix(".py") Path(module_root, modpath.parent).mkdir(parents=True) with Path(module_root, modpath).open("w") as f: f.write("from transformer.plugins.contracts import plugin, Contract\n") f.write("@plugin(Contract.OnTask)\n") f.write("def f(t):\n") f.write(" ...\n") f.write("def helper(t):\n") f.write(" ...\n") plugins = list(resolve(modname)) assert len(plugins) == 1 f = plugins[0] assert callable(f) assert f.__name__ == "f" def test_resolve_is_exported_by_the_transformer_plugins_module(self): try: from transformer.plugins import resolve except ImportError: pytest.fail("resolve should be exported by transformer.plugins") @pytest.fixture() def module() -> ModuleType: """Creates and returns an empty module.""" return ModuleType(f"fake_{random.randint(0, 99999999)}") class TestLoadPluginsFromModule: def test_raises_error_for_non_module(self): class A: pass with pytest.raises(TypeError): # Iterators are lazy, we need list() list(load_plugins_from_module(A)) def not_a_plugin(_): ... def plugin_not_a_plugin_either(_): ... @plugin(Contract.OnTask) def plugin_valid(_): ... @given(permutations((not_a_plugin, plugin_not_a_plugin_either, plugin_valid))) def test_ignores_non_plugin_stuff_in_module(self, module, caplog, functions): for f in functions: module.__dict__[f.__name__] = f caplog.clear() caplog.set_level(logging.DEBUG) plugins = list(load_plugins_from_module(module)) plugin_valid = next(f for f in functions if f.__name__ == "plugin_valid") assert plugins == [plugin_valid] non_plugin_functions = {f for f in functions if f is not plugin_valid} print(f">>> log messages: {caplog.messages}") for f in non_plugin_functions: assert any( f.__name__ in msg for msg in caplog.messages ), "ignored function names should be logged" def test_raises_for_modules_without_any_plugin(self, module): with pytest.raises(NoPluginError, match=module.__name__): # must force evaluation of the generator list(load_plugins_from_module(module)) PK!<,transformer/plugins/test_sanitize_headers.pyfrom datetime import datetime from urllib.parse import urlparse from transformer.request import HttpMethod, Header, Request from transformer.task import Task2 from .sanitize_headers import plugin def test_its_name_is_resolvable(): from transformer.plugins import resolve assert list(resolve("transformer.plugins.sanitize_headers")) == [plugin] TS = datetime(1970, 1, 1) def task_with_header(name: str, value: str) -> Task2: return Task2( name="some task", request=Request( timestamp=TS, method=HttpMethod.GET, url=urlparse("https://example.com"), headers=[Header(name=name, value=value)], post_data={}, query=[], ), ) def test_it_removes_headers_beginning_with_a_colon(): task = task_with_header(":non-rfc-header", "some value") sanitized_headers = plugin(task).request.headers assert len(sanitized_headers) == 0 def test_it_downcases_header_names(): task = task_with_header("Some Name", "some value") sanitized_headers = plugin(task).request.headers assert "some name" in sanitized_headers def test_it_removes_cookies(): task = task_with_header("Cookie", "some value") sanitized_headers = plugin(task).request.headers assert len(sanitized_headers) == 0 def test_it_does_not_change_nor_remove_other_headers(): task = task_with_header("some other header", "some value") sanitized_headers = plugin(task).request.headers assert len(sanitized_headers) == 1 PK!s=*U*Utransformer/python.pyimport re from types import MappingProxyType from typing import ( Sequence, Mapping, Any, List, Type, Set, Optional, Tuple, cast, Iterable, ) IMMUTABLE_EMPTY_DICT = MappingProxyType({}) class Line: """ A line of text and its associated indentation level. This class allows not to constantly copy strings to add a new indentation level at every scope of the AST. """ INDENT_UNIT: str = " " * 4 def __init__(self, text: str, indent_level: int = 0) -> None: self.text = text self.indent_level = indent_level def __str__(self) -> str: return f"{self.INDENT_UNIT * self.indent_level}{self.text}" def __repr__(self) -> str: return "{}(text={!r}, indent_level={!r})".format( self.__class__.__qualname__, self.text, self.indent_level ) def clone(self) -> "Line": """ Creates an exact but disconnected copy of self. Useful in tests. """ return self.__class__(text=self.text, indent_level=self.indent_level) def __eq__(self, o: object) -> bool: return ( isinstance(o, self.__class__) and self.text == cast(__class__, o).text and self.indent_level == cast(__class__, o).indent_level ) def _resplit(parts: Iterable[str]) -> List[str]: """ Given a list of strings, returns a list of lines, by splitting each string into multiple lines where it contains newlines. >>> _resplit([]) [] >>> _resplit(['a', 'b']) ['a', 'b'] >>> _resplit(['a', 'b\\nc\\nd']) ['a', 'b', 'c', 'd'] """ return [line for part in parts for line in part.splitlines()] class Statement: """ Python distinguishes between statements and expressions: basically, statements cannot be assigned to a variable, whereas expressions can. For our purpose, another distinction is important: statements may span over multiple lines (and not just for style), whereas all expressions can be expressed in a single line. This class serves as abstract base for all implementors of lines() and handles comment processing for them. """ def __init__(self, comments: Sequence[str] = ()) -> None: self._comments = _resplit(comments) @property def comments(self) -> List[str]: self._comments = _resplit(self._comments) return self._comments @comments.setter def comments(self, value: List[str]): self._comments = value def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: """ All Line objects necessary to represent this Statement, along with the appropriate indentation level. :param indent_level: How much indentation to apply to the least indented line of this statement. :param comments: Whether existing comments attached to self should be included in the result. """ raise NotImplementedError def comment_lines(self, indent_level: int) -> List[Line]: """ Converts self.comments from str to Line with "#" prefixes. """ return [Line(f"# {s}", indent_level) for s in self.comments] def attach_comment(self, line: Line) -> List[Line]: """ Attach a comment to line: inline if self.comments is just one line, on dedicated new lines above otherwise. """ comments = self.comments if not comments: return [line] if len(comments) == 1: line.text += f" # {comments[0]}" return [line] lines = self.comment_lines(line.indent_level) lines.append(line) return lines def __eq__(self, o: object) -> bool: return ( isinstance(o, self.__class__) and self.comments == cast(__class__, o).comments ) # Handy alias for type signatures. Program = Sequence[Statement] class OpaqueBlock(Statement): """ A block of code already represented as a string. This helps moving existing code (e.g. in plugins) from our ad-hoc "blocks of code" framework to the AST framework defined in this module. It also allows to express Python constructs that would otherwise not yet be representable with this AST framework. """ PREFIX_RX = re.compile(r"\s+") TAB_SIZE = 8 def __init__(self, block: str, comments: Sequence[str] = ()) -> None: super().__init__(comments) if not block.strip(): raise ValueError(f"OpaqueBlock can't be empty but got {block!r}") self.block = block def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: raw_lines = [l.expandtabs(self.TAB_SIZE) for l in self.block.splitlines()] first_nonempty_line = next(i for i, l in enumerate(raw_lines) if l.strip()) after_last_nonempty_line = next( len(raw_lines) - i for i, l in enumerate(reversed(raw_lines)) if l.strip() ) raw_lines = raw_lines[first_nonempty_line:after_last_nonempty_line] indents = [self.PREFIX_RX.match(l) for l in raw_lines] shortest_indent = min(len(p.group()) if p else 0 for p in indents) block_lines = [Line(l[shortest_indent:], indent_level) for l in raw_lines] if comments: return [*self.comment_lines(indent_level), *block_lines] return block_lines def __repr__(self) -> str: return "{}({!r}, comments={!r})".format( self.__class__.__qualname__, self.block, self.comments ) def __eq__(self, o: object) -> bool: return super().__eq__(o) and self.block == cast(__class__, o).block class Function(Statement): """ A function definition (def ...). """ def __init__( self, name: str, params: Sequence[str], statements: Sequence[Statement], comments: Sequence[str] = (), ) -> None: super().__init__(comments) self.name = name self.params = list(params) self.statements = list(statements) def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: param_list = ", ".join(self.params) body_lines = [ line for stmt in self.statements for line in stmt.lines(indent_level + 1, comments) ] or [Line("pass", indent_level + 1)] top = Line(f"def {self.name}({param_list}):", indent_level) if comments: return [*self.attach_comment(top), *body_lines] return [top, *body_lines] def __repr__(self) -> str: return "{}(name={!r}, params={!r}, statements={!r}, comments={!r})".format( self.__class__.__qualname__, self.name, self.params, self.statements, self.comments, ) def __eq__(self, o: object) -> bool: return ( super().__eq__(o) and self.name == cast(__class__, o).name and self.params == cast(__class__, o).params and self.statements == cast(__class__, o).statements ) class Decoration(Statement): """ A function or class definition to which is applied a decorator (e.g. @task). """ def __init__( self, decorator: str, target: Statement, comments: Sequence[str] = () ) -> None: super().__init__(comments) self.decorator = decorator self.target = target def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: top = Line(f"@{self.decorator}", indent_level) target_lines = self.target.lines(indent_level, comments) if comments: return [*self.attach_comment(top), *target_lines] return [top, *target_lines] def __repr__(self) -> str: return "{}({!r}, {!r}, comments={!r})".format( self.__class__.__qualname__, self.decorator, self.target, self.comments ) def __eq__(self, o: object) -> bool: return ( super().__eq__(o) and self.decorator == cast(__class__, o).decorator and self.target == cast(__class__, o).target ) class Class(Statement): """ A class definition. """ def __init__( self, name: str, statements: Sequence[Statement], superclasses: Sequence[str] = (), comments: Sequence[str] = (), ) -> None: super().__init__(comments) self.name = name self.statements = list(statements) self.superclasses = list(superclasses) def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: superclasses = "" if self.superclasses: superclasses = "({})".format(", ".join(self.superclasses)) body = [ line for stmt in self.statements for line in stmt.lines(indent_level + 1, comments) ] or [Line("pass", indent_level + 1)] top = Line(f"class {self.name}{superclasses}:", indent_level) if comments: return [*self.attach_comment(top), *body] return [top, *body] def __repr__(self) -> str: return ( "{}(name={!r}, statements={!r}, " "superclasses={!r}, comments={!r})" ).format( self.__class__.__qualname__, self.name, self.statements, self.superclasses, self.comments, ) def __eq__(self, o: object) -> bool: return ( super().__eq__(o) and self.name == cast(__class__, o).name and self.statements == cast(__class__, o).statements and self.superclasses == cast(__class__, o).superclasses ) class Expression: """ See the documentation of Statement for why Expression is a separate class. An expression is still a statement in Python (e.g. functions can be called anywhere), but this Expression class is NOT a Statement because we can't attach comments to arbitrary expressions (e.g. between braces). If you need to use an Expression as a Statement, see the Standalone wrapper class. This class serves as abstract base for all our implementors of __str__(). """ def __str__(self) -> str: raise NotImplementedError def __eq__(self, o: object) -> bool: return isinstance(o, self.__class__) class Standalone(Statement): """ Wraps an Expression so that it can be used as a Statement. """ def __init__(self, expr: Expression, comments: Sequence[str] = ()) -> None: super().__init__(comments) self.expr = expr def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: """ An Expression E used as a Statement is serialized as the result of str(E) on its own Line. """ line = Line(str(self.expr), indent_level) if comments: return self.attach_comment(line) return [line] def __repr__(self) -> str: return "{}({!r}, comments={!r})".format( self.__class__.__qualname__, self.expr, self.comments ) def __eq__(self, o: object) -> bool: return super().__eq__(o) and self.expr == cast(__class__, o).expr def _all_subclasses_of(cls: Type) -> Set[Type]: """ All subclasses of cls, including non-direct ones (child of child of ...). """ direct_subclasses = set(cls.__subclasses__()) return direct_subclasses.union( s for d in direct_subclasses for s in _all_subclasses_of(d) ) class Literal(Expression): """ All literal Python expressions (integers, strings, lists, etc.). Everything will be serialized using repr(), except Expression objects that could be contained in a composite value like list: they will be serialized with str(), as is probably expected. Thus: >>> str(Literal([1, {"a": FString("-{x}")}])) "[1, {'a': f'-{x}'}]" instead of something like "[1, {'a': FString('-{x}')}]". """ def __init__(self, value: Any) -> None: super().__init__() self.value = value _REPR_BY_EXPR_CLS = None def __str__(self) -> str: # This is not pretty, but repr() doesn't accept a visitor we could use # to say "just this time, use that code to serialize Expression objects". if Literal._REPR_BY_EXPR_CLS is None: Literal._REPR_BY_EXPR_CLS = { c: c.__repr__ for c in _all_subclasses_of(Expression) } try: for k in Literal._REPR_BY_EXPR_CLS.keys(): k.__repr__ = k.__str__ return repr(self.value) finally: for k, _repr in Literal._REPR_BY_EXPR_CLS.items(): k.__repr__ = _repr def __repr__(self) -> str: return f"{self.__class__.__qualname__}({self.value!r})" def __eq__(self, o: object) -> bool: return super().__eq__(o) and self.value == cast(__class__, o).value class FString(Literal): """ f-strings cannot be handled like most literals because they are evaluated first, so they lose their "f" prefix and their template is executed too early. """ def __init__(self, s: str) -> None: if not isinstance(s, str): raise TypeError( f"expecting a format string, got {s.__class__.__qualname__}: {s!r}" ) super().__init__(s) def __str__(self) -> str: return "f" + repr(str(self.value)) class Symbol(Expression): """ The name of something (variable, function, etc.). Avoids any kind of text transformation that would happen with Literal. >>> str(Literal("x")) "'x'" >>> str(Symbol("x")) 'x' The provided argument's type is explicitly checked and a TypeError may be raised to avoid confusion when a user expects e.g. Symbol(True) to work like Symbol("True"). """ def __init__(self, name: str) -> None: super().__init__() if not isinstance(name, str): raise TypeError( f"expected symbol name, got {name.__class__.__qualname__}: {name!r}" ) self.name = name def __str__(self) -> str: return self.name def __repr__(self) -> str: return f"{self.__class__.__qualname__}({self.name!r})" def __eq__(self, o: object) -> bool: return super().__eq__(o) and self.name == cast(__class__, o).name class FunctionCall(Expression): """ The invocation of a function or method. """ def __init__( self, name: str, positional_args: Sequence[Expression] = (), named_args: Mapping[str, Expression] = IMMUTABLE_EMPTY_DICT, ) -> None: super().__init__() self.name = name self.positional_args = list(positional_args) self.named_args = dict(named_args) def __str__(self) -> str: args = [str(a) for a in self.positional_args] + [ f"{k}={v}" for k, v in self.named_args.items() ] return f"{self.name}({', '.join(args)})" def __repr__(self) -> str: return "{}({!r}, {!r}, {!r})".format( self.__class__.__qualname__, self.name, self.positional_args, self.named_args, ) def __eq__(self, o: object) -> bool: return ( super().__eq__(o) and self.name == cast(__class__, o).name and self.positional_args == cast(__class__, o).positional_args and self.named_args == cast(__class__, o).named_args ) class BinaryOp(Expression): """ The invocation of a binary operator. To avoid any precedence error in the generated code, operands that are also BinaryOps are always surrounded by braces (even when not necessary, as in "1 + (2 + 3)", as a more subtle behavior has increased complexity of implementation without much benefit. """ def __init__(self, lhs: Expression, op: str, rhs: Expression) -> None: super().__init__() self.lhs = lhs self.op = op self.rhs = rhs def __str__(self) -> str: operands = [self.lhs, self.rhs] return f" {self.op} ".join( f"({x})" if isinstance(x, BinaryOp) else str(x) for x in operands ) def __eq__(self, o: object) -> bool: return ( super().__eq__(o) and self.lhs == cast(__class__, o).lhs and self.op == cast(__class__, o).op and self.rhs == cast(__class__, o).rhs ) class Assignment(Statement): """ The assignment of a value to a variable. For our purposes, we don't treat multiple assignment via tuples differently. We also don't support chained assignments such as "a = b = 1". """ def __init__(self, lhs: str, rhs: Expression, comments: Sequence[str] = ()) -> None: super().__init__(comments) self.lhs = lhs self.rhs = rhs def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: line = Line(f"{self.lhs} = {self.rhs}", indent_level) if comments: return self.attach_comment(line) return [line] def __eq__(self, o: object) -> bool: return ( super().__eq__(o) and self.lhs == cast(__class__, o).lhs and self.rhs == cast(__class__, o).rhs ) def __repr__(self) -> str: return "{}(lhs={!r}, rhs={!r}, comments={!r})".format( self.__class__.__qualname__, self.lhs, self.rhs, self.comments ) class IfElse(Statement): """ The if/elif/else construct, where elif and else are optional and elif can be repeated. """ def __init__( self, condition_blocks: Sequence[Tuple[Expression, Sequence[Statement]]], else_block: Optional[Sequence[Statement]] = None, comments: Sequence[str] = (), ) -> None: super().__init__(comments) self.condition_blocks = [ (cond, list(stmts)) for cond, stmts in condition_blocks ] self._assert_consistency() self.else_block = else_block def _assert_consistency(self): if not self.condition_blocks: raise ValueError("can't have an if without at least one block") def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: self._assert_consistency() lines = [] for i, block in enumerate(self.condition_blocks): keyword = "if" if i == 0 else "elif" lines.append(Line(f"{keyword} {block[0]}:", indent_level)) lines.extend( [ line for stmt in block[1] for line in stmt.lines(indent_level + 1, comments) ] or [Line("pass", indent_level + 1)] ) if self.else_block: lines.append(Line("else:", indent_level)) lines.extend( [ line for stmt in self.else_block for line in stmt.lines(indent_level + 1, comments) ] ) if comments: # There is always a first line, or _assert_consistency would fail. return [*self.attach_comment(lines[0]), *lines[1:]] return lines def __eq__(self, o: object) -> bool: return ( super().__eq__(o) and self.condition_blocks == cast(__class__, o).condition_blocks and self.else_block == cast(__class__, o).else_block ) def __repr__(self) -> str: return "{}(condition_blocks={!r}, else_block={!r}, comments={!r})".format( self.__class__.__qualname__, self.condition_blocks, self.else_block, self.comments, ) class Import(Statement): """ The import statement in all its forms: "import", "import X as A", "from M import X", "from M import X as A", and "from M import X, Y". """ def __init__( self, targets: Sequence[str], source: Optional[str] = None, alias: Optional[str] = None, comments: Sequence[str] = (), ) -> None: super().__init__(comments) self.targets = list(targets) self.source = source self.alias = alias self._assert_consistency() def _assert_consistency(self): if not self.targets: raise ValueError("expected at least one import target") if len(self.targets) > 1 and self.alias: raise ValueError("alias forbidden for multiple import targets") def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: self._assert_consistency() import_kw = f"from {self.source} import" if self.source else "import" alias_clause = f" as {self.alias}" if self.alias else "" lines = [ Line(f"{import_kw} {target}{alias_clause}", indent_level) for target in self.targets ] if comments: return [*self.comment_lines(indent_level), *lines] return lines def __eq__(self, o: object) -> bool: return ( super().__eq__(o) and self.targets == cast(__class__, o).targets and self.source == cast(__class__, o).source and self.alias == cast(__class__, o).alias ) def __repr__(self) -> str: return "{}(targets={!r}, source={!r}, alias={!r}, comments={!r})".format( self.__class__.__qualname__, self.targets, self.source, self.alias, self.comments, ) PK!0O transformer/request.py# -*- coding: utf-8 -*- """ A representation of a HAR Request. """ import enum from datetime import datetime from typing import Iterator, NamedTuple, List from urllib.parse import urlparse, SplitResult import pendulum from transformer.naming import to_identifier class HttpMethod(enum.Enum): """ Enumeration of HTTP method types. """ GET = enum.auto() POST = enum.auto() PUT = enum.auto() OPTIONS = enum.auto() DELETE = enum.auto() class Header(NamedTuple): """ HTTP header as recorded in HAR file. """ name: str value: str class QueryPair(NamedTuple): """ Query String as recorded in HAR file. """ name: str value: str class Request(NamedTuple): """ An HTTP request as recorded in a HAR file. """ timestamp: datetime method: HttpMethod url: SplitResult headers: List[Header] post_data: dict query: List[QueryPair] @classmethod def from_har_entry(cls, entry: dict) -> "Request": """ Creates a request from a HAR entry. """ request = entry["request"] return Request( timestamp=pendulum.parse(entry["startedDateTime"]), method=HttpMethod[request["method"]], url=urlparse(request["url"]), headers=[ Header(name=d["name"], value=d["value"]) for d in request.get("headers", []) ], post_data=request.get("postData", {}), query=[ QueryPair(name=d["name"], value=d["value"]) for d in request.get("queryString", []) ], ) @classmethod def all_from_har(cls, har: dict) -> Iterator["Request"]: """ Generates requests for all entries in a given HAR file. """ for entry in har["log"]["entries"]: yield cls.from_har_entry(entry) def task_name(self) -> str: """ Generates a simple name suitable for use as a Python function. """ return "_".join( ( self.method.name, self.url.scheme, to_identifier(self.url.hostname), to_identifier(self.url.path), str(abs(hash(self))), ) ) def __hash__(self) -> int: return hash( ( self.timestamp, self.method, self.url, tuple(self.post_data) if self.post_data else None, ) ) PK![A9+9+transformer/scenario.pyimport json import logging from collections import defaultdict from pathlib import Path from typing import ( Sequence, Mapping, Union, Set, List, Optional, Dict, Tuple, NamedTuple, ) import transformer.plugins as plug from transformer.naming import to_identifier from transformer.plugins.contracts import Plugin from transformer.request import Request from transformer.task import Task, Task2 WEIGHT_FILE_SUFFIX = ".weight" DEFAULT_WEIGHT = 1 class SkippableScenarioError(ValueError): # noqa: B903 """ Raised when a Scenario cannot be created from the provided input path. If related to the creation of a Scenario B inside a larger Scenario A (i.e. B would be in A.children), A catches this exception, logs a warning, and moves on to the next potential child. """ def __init__(self, scenario_path: Path, reason: Union[Exception, str]) -> None: self.path = scenario_path self.reason = reason class DanglingWeightError(SkippableScenarioError): """ Raised when a scenario directory contains weight files that don't correspond to any scenario. """ pass class CollidingScenariosError(SkippableScenarioError): """ Raised when scenarios created from different paths end up having the same name. The only way this happens is if the paths are identical save for the extension (e.g. ".har" vs ".json"), or if there is a bug (collision) in transformer.naming.to_identifier (which should never happen). """ pass class WeightValueError(ValueError): # noqa: B903 """ Raised when the weight file associated to a scenario contains errors. """ def __init__(self, scenario_path: Path, reason: Union[Exception, str]) -> None: self.path = scenario_path self.reason = reason class Scenario(NamedTuple): """ A user's web session that we want to emulate, i.e. a sequence of tasks to be performed in order. """ name: str children: Sequence[Union[Task, Task2, "Scenario"]] origin: Optional[Path] weight: int = 1 @classmethod def from_path( cls, path: Path, plugins: Sequence[Plugin] = (), ts_plugins: Sequence[Plugin] = (), short_name: bool = False, ) -> "Scenario": """ Makes a Scenario (possibly containing sub-scenarios) out of the provided path, which may point to either: - a HAR file (x/y/z.har), - a scenario directory (a directory containing HAR files or other scenario directories). :raise SkippableScenarioError: if path is neither a directory nor a HAR file, or is a directory containing dangling weight files :param short_name: whether the returned scenarios have names based only on their path's basename, instead of the full path. By default False to avoid generating homonym scenarios, but True when generating sub-scenarios (children) from a directory path (because then the names are "scoped" by the parent directory). """ if path.is_dir(): return cls.from_dir( path, plugins, ts_plugins=ts_plugins, short_name=short_name ) else: return cls.from_har_file( path, plugins, ts_plugins=ts_plugins, short_name=short_name ) @classmethod def from_dir( cls, path: Path, plugins: Sequence[Plugin], ts_plugins: Sequence[Plugin], short_name: bool, ) -> "Scenario": """ Makes a Scenario out of the provided directory path. The directory must be a "scenario directory", which means that it must contain at least one HAR file or another scenario directory. Symbolic link loops are not checked but forbidden! There may exist a weight file .weight. If so, its contents will be used as weight for the Scenario by calling weight_from_path. Errors are handled this way: 1. If path itself cannot be transformed into a scenario, raise SkippableScenarioError. 2. For each child of path, apply (1) but catch the exception and display a warning about skipping this child. (If all children are skipped, (1) applies to path itself.) Therefore: - If the directory contains weight files that don't match any HAR file or subdirectory, an error will be emitted as this is probably a mistake. - If the directory contains files or directory that cannot be converted into scenarios (e.g. non-JSON files or .git directories), a message is emitted and the file or subdirectory is skipped. :raise SkippableScenarioError: if the directory contains dangling weight files or no sub-scenarios. """ try: children = list(path.iterdir()) except OSError as err: raise SkippableScenarioError(path, err) weight_files: Set[Path] = { child for child in children if child.suffix == WEIGHT_FILE_SUFFIX } scenarios: List[Scenario] = [] for child in children: if child in weight_files: continue try: scenario = cls.from_path( child, plugins, ts_plugins=ts_plugins, short_name=True ) except SkippableScenarioError as err: logging.warning( "while searching for HAR files, skipping %s: %s", child, err.reason ) else: scenarios.append(scenario) cls._check_dangling_weights(path, scenarios, weight_files) if not scenarios: raise SkippableScenarioError(path, "no scenarios inside the directory") cls._check_name_collisions(path, scenarios) return Scenario( name=to_identifier(path.with_suffix("").name if short_name else str(path)), children=tuple(scenarios), origin=path, weight=cls.weight_from_path(path), ) @classmethod def _check_name_collisions(cls, path: Path, scenarios: List["Scenario"]): scenarios_by_name: Dict[str, List[Scenario]] = defaultdict(list) for s in scenarios: scenarios_by_name[s.name].append(s) colliding_paths: Set[Tuple[Path, ...]] = { tuple(x.origin for x in xs) for xs in scenarios_by_name.values() if len(xs) > 1 } if colliding_paths: groups = "; ".join( " vs ".join(repr(s.name) for s in group) for group in colliding_paths ) logging.error( "%s contains scenarios with colliding names: %s", path, groups ) raise CollidingScenariosError(path, "scenarios have colliding names") @classmethod def _check_dangling_weights(cls, path, scenarios, weight_files): scenario_names = {s.origin.with_suffix("").name for s in scenarios} dangling_weight_files = [ f for f in weight_files if f.with_suffix("").name not in scenario_names ] if dangling_weight_files: hint = ", ".join(str(f) for f in dangling_weight_files) logging.error( "%s contains weight files that don't correspond to any scenarios: %s", path, hint, ) logging.info( "For any value of X, if there exists a weight file X.weight, " "there must exist either an X.har file or an X scenario subdirectory." ) raise DanglingWeightError(path, "contains dangling weight files") @classmethod def from_har_file( cls, path: Path, plugins: Sequence[Plugin], ts_plugins: Sequence[Plugin], short_name: bool, ) -> "Scenario": """ Creates a Scenario given a HAR file. :raise SkippableScenarioError: if path is unreadable or not a HAR file """ try: with path.open() as file: har = json.load(file) requests = Request.all_from_har(har) tasks = Task.from_requests(requests) # TODO: Remove this when Contract.OnTaskSequence is removed. tasks = plug.apply(ts_plugins, tasks) # TODO: Remove Task-to-Task2 conversion once both are merged. tasks = tuple(plug.apply(plugins, Task2.from_task(t)) for t in tasks) return Scenario( name=to_identifier( path.with_suffix("").name if short_name else str(path) ), children=tuple(tasks), origin=path, weight=cls.weight_from_path(path), ) except (OSError, json.JSONDecodeError, UnicodeDecodeError) as err: raise SkippableScenarioError(path, err) @classmethod def weight_from_path(cls, path: Path) -> int: """ Reads the weight file corresponding to path, or returns a default weight if the weight file doesn't exist. :param path: represents either a HAR file or a scenario directory :raise WeightValueError: if the weight file exists but its contents cannot be interpreted as a weight """ weight_path = path.with_suffix(WEIGHT_FILE_SUFFIX) try: weight = weight_path.read_text().strip() except OSError as err: logging.info( f"No {weight_path} provided for {path}: " f"assigning default weight {DEFAULT_WEIGHT} ({err})" ) return DEFAULT_WEIGHT if not weight.isdecimal() or int(weight) == 0: logging.error( f"invalid weight file %s: weights must be positive integers, got %r", weight_path, weight, ) raise WeightValueError(path, weight) return int(weight) @property def global_code_blocks(self) -> Mapping[str, Sequence[str]]: # TODO: Replace me with a plugin framework that accesses the full tree. # See https://github.com/zalando-incubator/Transformer/issues/11. return { block_name: block_lines for child in self.children for block_name, block_lines in child.global_code_blocks.items() } def apply_plugins(self, plugins: Sequence[Plugin]) -> "Scenario": """ Recursively builds a new scenario tree from the leaves by applying all plugins to each cloned scenario subtree. Does not do anything if plugins is empty. """ if not plugins: return self children = [ c.apply_plugins(plugins) if isinstance(c, Scenario) else c for c in self.children ] return plug.apply(plugins, self._replace(children=children)) PK!{5transformer/task.py# -*- coding: utf-8 -*- """ A representation of a Locust Task. """ import json from types import MappingProxyType from typing import Iterable, NamedTuple, Iterator, Sequence, Optional, Mapping, List import transformer.python as py from transformer.blacklist import on_blacklist from transformer.helpers import zip_kv_pairs from transformer.request import HttpMethod, Request, QueryPair IMMUTABLE_EMPTY_DICT = MappingProxyType({}) TIMEOUT = 30 ACTION_INDENTATION_LEVEL = 12 class LocustRequest(NamedTuple): """ All parameters for the request performed by the Locust client object. """ method: HttpMethod url: str headers: Mapping[str, str] post_data: dict = MappingProxyType({}) query: Sequence[QueryPair] = () @classmethod def from_request(cls, r: Request) -> "LocustRequest": return LocustRequest( method=r.method, url=repr(r.url.geturl()), headers=zip_kv_pairs(r.headers), post_data=r.post_data, query=r.query, ) NOOP_HTTP_METHODS = {HttpMethod.GET, HttpMethod.OPTIONS, HttpMethod.DELETE} def as_locust_action(self) -> str: args = { "url": self.url, "name": self.url, "headers": self.headers, "timeout": TIMEOUT, "allow_redirects": False, } if self.method is HttpMethod.POST: post_data = _parse_post_data(self.post_data) args[post_data["key"]] = post_data["data"] elif self.method is HttpMethod.PUT: post_data = _parse_post_data(self.post_data) args["params"] = zip_kv_pairs(self.query) args[post_data["key"]] = post_data["data"] elif self.method not in self.NOOP_HTTP_METHODS: raise ValueError(f"unsupported HTTP method: {self.method!r}") method = self.method.name.lower() named_args = ", ".join(f"{k}={v}" for k, v in args.items()) return f"response = self.client.{method}({named_args})" class Task2: def __init__( self, name: str, request: Request, statements: Sequence[py.Statement] = (), # TODO: Replace me with a plugin framework that accesses the full tree. # See https://github.com/zalando-incubator/Transformer/issues/11. global_code_blocks: Mapping[str, Sequence[str]] = IMMUTABLE_EMPTY_DICT, ) -> None: self.name = name self.request = request self.statements = list(statements) self.global_code_blocks = {k: list(v) for k, v in global_code_blocks.items()} @classmethod def from_requests(cls, requests: Iterable[Request]) -> Iterator["Task2"]: """ Generates a set of tasks from a given set of HTTP requests. Each request will be turned into an unevaluated function call making the actual request. The returned tasks are ordered by increasing timestamp of the corresponding request. """ # TODO: Update me when merging Task with Task2: "statements" needs to # contain the equivalent of LocustRequest. # See https://github.com/zalando-incubator/Transformer/issues/11. for req in sorted(requests, key=lambda r: r.timestamp): if not on_blacklist(req.url.netloc): yield cls(name=req.task_name(), request=req, statements=...) @classmethod def from_task(cls, task: "Task") -> "Task2": # TODO: Remove me as soon as the old Task is no longer used and Task2 is # renamed to Task. # See https://github.com/zalando-incubator/Transformer/issues/11. locust_request = task.locust_request if locust_request is None: locust_request = LocustRequest.from_request(task.request) return cls( name=task.name, request=task.request, statements=[ py.OpaqueBlock(block) for block in [ *task.locust_preprocessing, locust_request.as_locust_action(), *task.locust_postprocessing, ] ], ) class Task(NamedTuple): """ One step of "doing something" on a website. This basically represents a @task in Locust-speak. """ name: str request: Request locust_request: Optional[LocustRequest] = None locust_preprocessing: Sequence[str] = () locust_postprocessing: Sequence[str] = () global_code_blocks: Mapping[str, Sequence[str]] = MappingProxyType({}) @classmethod def from_requests(cls, requests: Iterable[Request]) -> Iterator["Task"]: """ Generates a set of Tasks from a given set of Requests. """ for req in sorted(requests, key=lambda r: r.timestamp): if on_blacklist(req.url.netloc): continue else: yield cls(name=req.task_name(), request=req) def as_locust_action(self, indentation=ACTION_INDENTATION_LEVEL) -> str: """ Converts a Task into a Locust Action. """ action: List[str] = [] for preprocessing in self.locust_preprocessing: action.append(_indent(preprocessing, indentation)) if self.locust_request is None: locust_request = LocustRequest.from_request(self.request) else: locust_request = self.locust_request action.append(locust_request.as_locust_action()) for postprocessing in self.locust_postprocessing: action.append(_indent(postprocessing, indentation)) return "\n".join(action) def inject_headers(self, headers: dict): if self.locust_request is None: original_locust_request = LocustRequest.from_request(self.request) else: original_locust_request = self.locust_request new_locust_request = original_locust_request._replace( headers={**original_locust_request.headers, **headers} ) task = self._replace(locust_request=new_locust_request) return task def replace_url(self, url: str): if self.locust_request is None: original_locust_request = LocustRequest.from_request(self.request) else: original_locust_request = self.locust_request new_locust_request = original_locust_request._replace(url=url) return self._replace(locust_request=new_locust_request) def _indent(input_string: str, requested_indentation: int) -> str: output_string = "" indentation = requested_indentation initial_leading_spaces = 0 for i, line in enumerate(input_string.splitlines()): leading_spaces = len(line) - len(line.lstrip()) if leading_spaces > 0: # We need to check the indentation of the second line in order to # account for the case where the existing indentation is greater than # the requested; it is used for reapplying sub-level-indentation e.g. # to if statements. if i == 1: initial_leading_spaces = leading_spaces else: indentation = requested_indentation + ( leading_spaces - initial_leading_spaces ) line = line.lstrip() output_string += line.rjust(len(line) + indentation, " ") + "\n" return output_string def _parse_post_data(post_data: dict) -> dict: data = post_data.get("text") mime: str = post_data.get("mimeType") if mime == "application/json": key = "json" # Workaround for bug in chrome-har: # https://github.com/sitespeedio/chrome-har/issues/23 # TODO: Remove once bug fixed. if data is None: params = post_data.get("params") if params is None: data = "" else: data = {} for param in params: data[param.get("name")] = param.get("value") else: data = json.loads(data) else: key = "data" if data: data = data.encode() return {"key": key, "data": data} PK!M~CCtransformer/test_blacklist.py# pylint: skip-file import io import os import logging from unittest.mock import patch from transformer.blacklist import on_blacklist class TestBlacklist: @patch("builtins.open") def test_it_returns_false_and_logs_error_if_the_blacklist_does_not_exist( self, mock_open, caplog ): mock_open.side_effect = FileNotFoundError caplog.set_level(logging.DEBUG) assert on_blacklist("") is False assert f"Could not read blacklist file {os.getcwd()}/.urlignore" in caplog.text @patch("builtins.open") def test_it_returns_false_if_the_blacklist_is_empty(self, mock_open): mock_open.return_value = io.StringIO("") assert on_blacklist("") is False @patch("builtins.open") def test_it_returns_false_if_url_is_not_on_blacklist(self, mock_open): mock_open.return_value = io.StringIO("www.amazon.com") assert on_blacklist("www.zalando.de") is False @patch("builtins.open") def test_it_returns_true_if_url_is_on_blacklist(self, mock_open): mock_open.return_value = io.StringIO("www.google.com\nwww.amazon.com") assert on_blacklist("www.amazon.com") is True @patch("builtins.open") def test_it_returns_true_if_a_partial_match_is_found(self, mock_open): mock_open.return_value = io.StringIO("www.amazon.com") assert on_blacklist("http://www.amazon.com/") is True @patch("builtins.open") def test_it_ignores_empty_lines(self, mock_open): mock_open.return_value = io.StringIO("\nwww.amazon.com") assert on_blacklist("www.zalando.de") is False PK!؏.transformer/test_cli.pyfrom pathlib import Path from .cli import read_config class TestReadConfig: def test_paths_from_env(self, monkeypatch): monkeypatch.setenv("TRANSFORMER_INPUT_PATHS", """["/x/y", "a/b"]""") conf = read_config([]) assert conf.input_paths == (Path("/x/y"), Path("a/b")) def test_paths_from_cli(self): conf = read_config(["/x/y", "a/b"]) assert conf.input_paths == (Path("/x/y"), Path("a/b")) def test_paths_from_cli_overwrite_those_from_env(self, monkeypatch): monkeypatch.setenv("TRANSFORMER_INPUT_PATHS", """["/x/y", "a/b"]""") conf = read_config(["u/v/w"]) assert conf.input_paths == (Path("u/v/w"),) def test_plugins_from_env(self, monkeypatch): monkeypatch.setenv("TRANSFORMER_PLUGINS", """["a", "b.c.d"]""") conf = read_config([]) assert conf.plugins == ("a", "b.c.d") def test_plugins_from_cli(self): conf = read_config(["-p", "a", "XXX", "--plugin", "b.c.d"]) assert conf.plugins == ("a", "b.c.d") def test_merge_plugins_from_env_and_cli(self, monkeypatch): monkeypatch.setenv("TRANSFORMER_PLUGINS", """["a", "b.c.d"]""") conf = read_config(["-p", "e.f", "XXX", "--plugin", "g"]) assert conf.plugins == ("a", "b.c.d", "e.f", "g") PK!]JK K transformer/test_decision.pyfrom typing import List import pytest from hypothesis import given, assume from hypothesis.strategies import booleans, lists from .builders_decision import reasons, decisions, yes_decisions, no_decisions from .decision import Decision @given(decisions) def test_bool_relies_on_value_only(d: Decision): assert bool(d) == d.valid @given(decisions, decisions) def test_equality_relies_on_value_only(a: Decision, b: Decision): assert (a == b) == (a.valid == b.valid) class TestYes: def test_is_true(self): assert Decision.yes().valid is True def test_without_argument_has_dummy_reason(self): assert Decision.yes().reason.lower() == "ok" @given(reasons) def test_with_argument_records_reason(self, r: str): assert Decision.yes(r).reason == r class TestNo: @given(reasons) def test_is_false_regardless_of_reason(self, r: str): assert Decision.no(r).valid is False def test_without_argument_raises_error(self): with pytest.raises(Exception): Decision.no() @given(reasons) def test_reason_is_recorded_and_accessible(self, r: str): assert Decision.no(r).reason == r class TestWhether: @given(decisions, reasons) def test_wrapper_propagates_wrapped_value(self, d: Decision, r: str): assert Decision.whether(d, r).valid == d.valid @given(yes_decisions, reasons) def test_reuses_wrapped_reason_when_true(self, y: Decision, r: str): assert Decision.whether(y, r).reason == y.reason @given(no_decisions, reasons) def test_enriches_wrapped_reason_when_false(self, n: Decision, r: str): assert Decision.whether(n, r).reason == f"{r}: {n.reason}" @given(booleans()) def test_accepts_raw_bool(self, b: bool): d = Decision.whether(b, "x") assert d.valid == b assert d.reason == "x", "reason is always recorded" class TestAll: @given(lists(booleans())) def test_behaves_like_builtin(self, bs: List[bool]): bs_as_decisions = (Decision(valid=b, reason="") for b in bs) assert all(bs) == bool(Decision.all(bs_as_decisions)) def test_reuse_first_wrapped_no_reason(self): y = Decision.yes() n1 = Decision.no("n1") n2 = Decision.no("n2") assert Decision.all((y, n1, y, n2, y)) == n1 class TestAny: @given(lists(booleans())) def test_behaves_like_builtin(self, bs: List[bool]): bs_as_decisions = (Decision(valid=b, reason="") for b in bs) assert any(bs) == bool(Decision.any(bs_as_decisions)) def test_reuse_first_wrapped_yes_reason(self): y1 = Decision.yes("y1") y2 = Decision.yes("y2") n = Decision.no("n") assert Decision.any((n, y1, n, y2, n)) == y1 @given(lists(no_decisions, max_size=5)) def test_reason_lists_invalid_cases_if_5_or_less(self, ds: List[Decision]): d = Decision.any(ds) assert str([x.reason for x in ds]) in d.reason def test_reason_mentions_number_of_invalid_cases_if_more_than_5(self): nb_cases = 10 ds = [Decision.no("X") for _ in range(nb_cases)] d = Decision.any(ds) assert str(nb_cases) in d.reason @given(decisions, reasons) def test_provided_reason_prefixes_default_message(self, d: Decision, r: str): assume(r) assert Decision.any([d], r).reason == f"{r}: {Decision.any([d]).reason}" PK! كsvvtransformer/test_helpers.pyfrom transformer.helpers import zip_kv_pairs from transformer.request import Header class TestZipKVPairs: def test_it_returns_a_dict_given_a_list_of_named_tuples(self): name = "some name" value = "some value" result = zip_kv_pairs([Header(name=name, value=value)]) assert isinstance(result, dict) assert result[name] == value PK!2hw transformer/test_locust.pyimport string from typing import cast from unittest.mock import MagicMock import pytest from transformer.locust import locustfile, locust_taskset from transformer.request import HttpMethod from transformer.scenario import Scenario from transformer.task import Task, TIMEOUT class TestLocustfile: def test_it_renders_a_locustfile_template(self): a_name = "some_task" a_request = MagicMock() a_request.method = HttpMethod.GET a_request.url.scheme = "some_scheme" a_request.url.hostname = "some_hostname" a_request.url.path = "some_path" a_request.url.geturl() a_request.url.geturl.return_value = "some_url" task = Task(a_name, a_request) scenario = Scenario(name="SomeScenario", children=[task], origin=None) scenario_group = Scenario( name="ScenarioGroup", children=[scenario], weight=2, origin=None ) script = locustfile([scenario_group]) expected = string.Template( """ # File automatically generated by Transformer: # https://github.bus.zalan.do/TIP/transformer import re from locust import HttpLocust from locust import TaskSequence from locust import TaskSet from locust import seq_task from locust import task class ScenarioGroup(TaskSet): @task(1) class SomeScenario(TaskSequence): @seq_task(1) def some_task(self): response = self.client.get(url='some_url', name='some_url', headers={}, timeout=$TIMEOUT, allow_redirects=False) class LocustForScenarioGroup(HttpLocust): task_set = ScenarioGroup weight = 2 min_wait = 0 max_wait = 10 """ ).safe_substitute({"TIMEOUT": TIMEOUT}) assert expected.strip() == script.strip() def test_generates_passed_global_code_blocks(): sg1 = Scenario( "sg1", children=[ MagicMock( spec_set=Scenario, children=[], global_code_blocks={"b1": ["ab"]} ), MagicMock( spec_set=Scenario, children=[], global_code_blocks={"b2": ["cd"]} ), ], origin=None, ) sg2 = Scenario( "sg2", children=[MagicMock(spec_set=Scenario, children=[], global_code_blocks={})], origin=None, ) sg3 = Scenario( "sg3", children=[ MagicMock( spec_set=Scenario, children=[], global_code_blocks={"b3": ["yz"], "b2": ["yyy", "zzz"]}, ) ], origin=None, ) code = locustfile([sg1, sg2, sg3]) assert code.endswith( "\n# b1\nab\n# b2\nyyy\nzzz\n# b3\nyz" ), "the latter b2 block should override the former" def test_locust_taskset_raises_on_malformed_scenario(): bad_child = cast(Scenario, 7) bad_scenario = Scenario(name="x", children=[bad_child], origin=None) with pytest.raises(TypeError, match=r"unexpected type .*\bchildren"): locust_taskset(bad_scenario) PK!transformer/test_naming.pyimport re from hypothesis import given, example, assume from hypothesis.strategies import text, from_regex from transformer.naming import to_identifier DIGITS_SUFFIX_RX = re.compile(r"_[0-9]+\Z") class TestToIdentifier: @given(text(min_size=1, max_size=3)) @example("0") def test_its_output_can_always_be_used_as_python_identifier(self, s: str): exec(f"{to_identifier(s)} = 2") @given(text(), text()) @example("x y", to_identifier("x y")) def test_it_has_no_collisions(self, a: str, b: str): assert a == b or to_identifier(a) != to_identifier(b) @given(from_regex(re.compile(r"[a-z_][a-z0-9_]*", re.IGNORECASE), fullmatch=True)) def test_it_does_not_add_suffix_when_not_necessary(self, input: str): assume(not DIGITS_SUFFIX_RX.search(input)) assert to_identifier(input) == input def test_it_adds_prefix_to_inputs_starting_with_digit(self): assert to_identifier("0").startswith("_") PK!3λ||transformer/test_python.pyimport pprint import string from typing import List from unittest.mock import patch import pytest from hypothesis import given from hypothesis.strategies import ( lists, text, integers, floats, one_of, booleans, none, dictionaries, ) import transformer.python as py from transformer.builders_python import ( indent_levels, ascii_inline_text, lines, opaque_blocks, statements, expressions, functions, decorations, classes, standalones, literals, function_calls, binary_ops, symbols, assignments, ifelses, imports, ) class TestLine: def test_str_is_identity_with_indent_level_zero(self): assert str(py.Line("abc", 0)) == "abc" def test_default_indent_level_is_zero(self): assert str(py.Line("abc")) == "abc" def test_default_indent_unit_is_four_spaces(self): assert py.Line.INDENT_UNIT == " " def test_str_indents_as_much_indent_units_as_provided_indent_level(self): assert ( str(py.Line("abc", 1)) == py.Line.INDENT_UNIT + "abc" ), "indent level 1 means one time INDENT_UNIT" assert str(py.Line("x", 2)) == py.Line.INDENT_UNIT * 2 + "x" @given(lines, lines) def test_equal_iff_text_and_indent_are_equal(self, a: py.Line, b: py.Line): assert a != b or (a.text == b.text and a.indent_level == b.indent_level) def test_repr(self): line = py.Line(text="a'\" b", indent_level=3) assert repr(line) == "Line(text='a\\'\" b', indent_level=3)" class TestStatement: def test_lines_must_be_implemented(self): with pytest.raises(NotImplementedError): py.Statement().lines() @given(statements, indent_levels, lists(ascii_inline_text(min_size=1), max_size=2)) def test_comment_lines(self, stmt: py.Statement, level: int, comments: List[str]): stmt.comments = comments x = py.Line.INDENT_UNIT assert [str(l) for l in stmt.comment_lines(level)] == [ x * level + f"# {line}" for line in comments ] @given(statements, lines) def test_attach_comment_without_comment_changes_nothing( self, stmt: py.Statement, line: py.Line ): clone = line.clone() stmt.comments.clear() # Some comments may have been generated by Hypothesis. lines = stmt.attach_comment(line) assert len(lines) == 1, "no additional lines must be created" l = lines[0] assert l is line, "the same Line object must be returned" assert l.text == clone.text, "Line.text must not change" assert l.indent_level == clone.indent_level, "Line.indent_level must not change" @given(statements, lines, ascii_inline_text(min_size=1)) def test_attach_comment_with_one_line_comment_changes_only_text( self, stmt: py.Statement, line: py.Line, comment: str ): clone = line.clone() stmt.comments = [comment] lines = stmt.attach_comment(line) assert len(lines) == 1, "no additional lines must be created" l = lines[0] assert l is line, "the same Line object must be returned" assert ( l.text == f"{clone.text} # {comment}" ), "the comment is appended to Line.text" assert l.indent_level == clone.indent_level, "Line.indent_level must not change" @given(statements, lines, lists(ascii_inline_text(min_size=1), min_size=2)) def test_attach_comment_with_multiline_comment_adds_lines_above( self, stmt: py.Statement, line: py.Line, comments: List[str] ): clone = line.clone() stmt.comments = comments lines = stmt.attach_comment(line) assert len(lines) == 1 + len(comments) l = lines[-1] assert l is line, "the same Line object must be returned last" assert l.text == clone.text, "Line.text must not change" assert l.indent_level == clone.indent_level, "Line.indent_level must not change" assert lines[:-1] == [ py.Line(f"# {s}", clone.indent_level) for s in comments ], "all lines but the last are standalone comment lines" class TestOpaqueBlock: @given(opaque_blocks, opaque_blocks) def test_equal_iff_components_are_equal(self, a: py.OpaqueBlock, b: py.OpaqueBlock): assert a != b or (a.block == b.block and a.comments == b.comments) @given(text(string.whitespace, max_size=5)) def test_lines_raises_for_empty_input_block(self, block: str): with pytest.raises(ValueError): py.OpaqueBlock(block) def test_lines_returns_block_lines_if_top_and_bottom_are_not_empty(self): ob = py.OpaqueBlock(" a\n b\n\n\n c") assert len(ob.lines()) == 5 def test_lines_returns_block_lines_without_empty_top_and_bottom(self): ob = py.OpaqueBlock("\n\n a\n b\n\n\n c\n\n\n") assert len(ob.lines()) == 5 X = py.Line.INDENT_UNIT @pytest.mark.parametrize( "input_block, indent_level, expected", [ ("x", 0, "x"), ("x", 1, " x"), (" x", 1, " x"), (" x", 2, " x"), (" x", 0, "x"), ("x\nx", 0, "x\nx"), ("x\nx", 1, " x\n x"), (" x\n x", 1, " x\n x"), ("x\n x", 1, " x\n x"), ("x\nx\n", 1, " x\n x"), (" x\n x\n", 1, " x\n x"), ("x\n x\n", 1, " x\n x"), ("x\n x\n x", 0, "x\n x\n x"), ("x\n x\n x", 1, " x\n x\n x"), ("\nx\n x", 0, "x\n x"), ("\nx\n x", 1, " x\n x"), ("\n x\n x", 0, "x\n x"), ("\n x\n x", 1, " x\n x"), ("\tx\n\t x", 1, " x\n x"), ("\tx\n\t\tx", 1, f" x\n {' ' * py.OpaqueBlock.TAB_SIZE}x"), ], ) def test_lines_indents_correctly( self, input_block: str, indent_level: int, expected: str ): lines = py.OpaqueBlock(input_block).lines(indent_level) print("lines =") pprint.pprint(lines) with patch("transformer.python.Line.INDENT_UNIT", " "): assert "\n".join(str(line) for line in lines) == expected @given(opaque_blocks, indent_levels, ascii_inline_text(min_size=1)) def test_lines_displays_comment_always_above( self, ob: py.OpaqueBlock, level: int, comment: str ): x = py.Line.INDENT_UNIT ob.comments = [comment] assert [str(l) for l in ob.lines(level)] == [ x * level + f"# {comment}", *[str(l) for l in ob.lines(level, comments=False)], ] @given(indent_levels) def test_lines_with_hidden_comments(self, level: int): x = py.Line.INDENT_UNIT ob = py.OpaqueBlock("hello", comments=["1", "2"]) assert [str(l) for l in ob.lines(level, comments=False)] == [ x * level + "hello" ] def test_repr(self): text = " a'\" b " assert ( repr(py.OpaqueBlock(block=text, comments=["hi"])) == "OpaqueBlock(' a\\'\" b ', comments=['hi'])" ) class TestFunction: @given(indent_levels) def test_lines_with_no_params_and_no_body(self, level: int): f = py.Function("f", params=[], statements=[]) x = py.Line.INDENT_UNIT assert [str(l) for l in f.lines(level)] == [ x * level + "def f():", x * (level + 1) + "pass", ] @given(indent_levels) def test_lines_with_simple_body(self, level: int): f = py.Function( "f", params=[], statements=[py.OpaqueBlock("print('Hello!')"), py.OpaqueBlock("return")], ) x = py.Line.INDENT_UNIT assert [str(l) for l in f.lines(level)] == [ x * level + "def f():", x * (level + 1) + "print('Hello!')", x * (level + 1) + "return", ] def test_lines_with_simple_params(self): f = py.Function("f", params=["x", "y"], statements=[]) assert [str(l) for l in f.lines()] == [ "def f(x, y):", py.Line.INDENT_UNIT + "pass", ] def test_lines_with_complex_params(self): f = py.Function( "f", params=["x: int", "abc: bool = True", "*z: str"], statements=[] ) assert [str(l) for l in f.lines()] == [ "def f(x: int, abc: bool = True, *z: str):", py.Line.INDENT_UNIT + "pass", ] @given(indent_levels) def test_lines_with_nested_body(self, level: int): x = py.Line.INDENT_UNIT f = py.Function( "func", params=[], statements=[ py.Assignment("a", py.Literal(2)), py.IfElse( [(py.Literal(True), [py.Assignment("b", py.Literal(3))])], [ py.Assignment("b", py.Literal(4)), py.Assignment("c", py.Literal(1)), ], ), ], ) assert [str(l) for l in f.lines(level)] == [ x * level + "def func():", x * (level + 1) + "a = 2", x * (level + 1) + "if True:", x * (level + 2) + "b = 3", x * (level + 1) + "else:", x * (level + 2) + "b = 4", x * (level + 2) + "c = 1", ] @given(indent_levels) def test_lines_with_comments(self, level: int): stmt = py.OpaqueBlock("foo", comments=["x"]) f = py.Function("f", params=[], statements=[stmt], comments=["1", "2"]) x = py.Line.INDENT_UNIT assert [str(l) for l in f.lines(level)] == [ x * level + "# 1", x * level + "# 2", x * level + "def f():", *[str(l) for l in stmt.lines(level + 1)], ] @given(indent_levels) def test_lines_with_hidden_comments(self, level: int): stmt = py.OpaqueBlock("foo", comments=["x"]) f = py.Function("f", params=[], statements=[stmt], comments=["1", "2"]) x = py.Line.INDENT_UNIT assert [str(l) for l in f.lines(level, comments=False)] == [ x * level + "def f():", *[str(l) for l in stmt.lines(level + 1, comments=False)], ] def test_repr(self): stmts = [py.OpaqueBlock("raise")] assert ( repr(py.Function(name="f", params=["a"], statements=stmts, comments=["hi"])) == f"Function(name='f', params=['a'], statements={stmts!r}, comments=['hi'])" ) @given(functions, functions) def test_equal_iff_components_are_equal(self, a: py.Function, b: py.Function): assert a != b or ( a.name == b.name and a.params == b.params and a.statements == b.statements and a.comments == b.comments ) class TestDecoration: @given(decorations, decorations) def test_equal_iff_components_are_equal(self, a: py.Decoration, b: py.Decoration): assert a != b or ( a.decorator == b.decorator and a.target == b.target and a.comments == b.comments ) @given(indent_levels) def test_with_a_function(self, level: int): f = py.Function("f", params=[], statements=[py.Assignment("a", py.Symbol("f"))]) d = py.Decoration("task(2)", f) x = py.Line.INDENT_UNIT assert [str(l) for l in d.lines(level)] == [ x * level + "@task(2)", *[str(l) for l in f.lines(level)], ] @given(indent_levels) def test_with_a_class(self, level: int): c = py.Class( "C", superclasses=[], statements=[py.Assignment("a: int", py.Literal(1))] ) d = py.Decoration("task", c) x = py.Line.INDENT_UNIT assert [str(l) for l in d.lines(level)] == [ x * level + "@task", *[str(l) for l in c.lines(level)], ] @given(indent_levels) def test_nested_decorators(self, level: int): f = py.Function("f", params=[], statements=[py.Assignment("a", py.Symbol("f"))]) first = py.Decoration("task(2)", f) second = py.Decoration("task_seq(1)", first) x = py.Line.INDENT_UNIT assert [str(l) for l in second.lines(level)] == [ x * level + "@task_seq(1)", x * level + "@task(2)", *[str(l) for l in f.lines(level)], ] @given(indent_levels) def test_lines_with_comments(self, level: int): f = py.Function("f", params=[], statements=[], comments=["1", "2"]) d = py.Decoration("task", f, comments=["x", "y"]) x = py.Line.INDENT_UNIT assert [str(l) for l in d.lines(level)] == [ x * level + "# x", x * level + "# y", x * level + "@task", x * level + "# 1", x * level + "# 2", x * level + "def f():", x * (level + 1) + "pass", ] @given(indent_levels) def test_lines_with_hidden_comments(self, level: int): f = py.Function("f", params=[], statements=[], comments=["1", "2"]) d = py.Decoration("task", f, comments=["x", "y"]) x = py.Line.INDENT_UNIT assert [str(l) for l in d.lines(level, comments=False)] == [ x * level + "@task", x * level + "def f():", x * (level + 1) + "pass", ] def test_repr(self): f = py.Function("f", params=[], statements=[]) assert ( repr(py.Decoration("task", f, comments=["hi"])) == f"Decoration('task', {f!r}, comments=['hi'])" ) class TestClass: @given(classes, classes) def test_equal_iff_components_are_equal(self, a: py.Class, b: py.Class): assert a != b or ( a.name == b.name and a.statements == b.statements and a.superclasses == b.superclasses and a.comments == b.comments ) @given(indent_levels) def test_empty_class(self, level: int): c = py.Class("A", statements=[]) x = py.Line.INDENT_UNIT assert [str(l) for l in c.lines(level)] == [ x * level + "class A:", x * (level + 1) + "pass", ] @given( lists( text(string.ascii_letters, min_size=1, max_size=2), min_size=0, max_size=3 ) ) def test_class_with_superclasses(self, names: List[str]): c = py.Class("A", statements=[], superclasses=names) x = py.Line.INDENT_UNIT if names: expected = "(" + ", ".join(names) + ")" else: expected = "" assert [str(l) for l in c.lines()] == [f"class A{expected}:", x + "pass"] @given(indent_levels) def test_class_with_fields(self, level: int): c = py.Class( "A", statements=[ py.Assignment("a", py.Literal(2)), py.Assignment("b", py.Literal(3)), ], ) x = py.Line.INDENT_UNIT assert [str(l) for l in c.lines(level)] == [ x * level + "class A:", x * (level + 1) + "a = 2", x * (level + 1) + "b = 3", ] @given(indent_levels) def test_lines_with_comments(self, level: int): stmt = py.OpaqueBlock("foo", comments=["x"]) c = py.Class("C", statements=[stmt], superclasses=[], comments=["1", "2"]) x = py.Line.INDENT_UNIT assert [str(l) for l in c.lines(level)] == [ x * level + "# 1", x * level + "# 2", x * level + "class C:", *[str(l) for l in stmt.lines(level + 1)], ] @given(indent_levels) def test_lines_with_hidden_comments(self, level: int): stmt = py.OpaqueBlock("foo", comments=["x"]) c = py.Class("C", statements=[stmt], superclasses=[], comments=["1", "2"]) x = py.Line.INDENT_UNIT assert [str(l) for l in c.lines(level, comments=False)] == [ x * level + "class C:", *[str(l) for l in stmt.lines(level + 1, comments=False)], ] def test_repr(self): stmts = [py.OpaqueBlock("raise")] assert ( repr( py.Class( name="C", statements=stmts, superclasses=["A"], comments=["hi"] ) ) == f"Class(name='C', statements={stmts!r}, superclasses=['A'], comments=['hi'])" ) class TestExpression: def test_str_must_be_implemented(self): with pytest.raises(NotImplementedError): str(py.Expression()) class TestStandalone: @given(standalones, standalones) def test_equal_iff_components_are_equal(self, a: py.Standalone, b: py.Standalone): assert a != b or (a.expr == b.expr and a.comments == b.comments) @given(expressions, indent_levels) def test_lines_returns_the_expression_as_single_line( self, e: py.Expression, level: int ): stmt = py.Standalone(e) lines = stmt.lines(level) assert len(lines) == 1 line = lines[0] assert line.indent_level == level assert line.text == str(e) @given(indent_levels) def test_lines_with_comments(self, level: int): x = py.Line.INDENT_UNIT ob = py.Standalone(py.Symbol("a"), comments=["1", "2"]) assert [str(l) for l in ob.lines(level)] == [ x * level + "# 1", x * level + "# 2", x * level + "a", ] @given(indent_levels) def test_lines_with_hidden_comments(self, level: int): x = py.Line.INDENT_UNIT ob = py.Standalone(py.Symbol("a"), comments=["1", "2"]) assert [str(l) for l in ob.lines(level, comments=False)] == [x * level + "a"] def test_repr(self): expr = py.Symbol("a") assert ( repr(py.Standalone(expr, comments=["hi"])) == f"Standalone({expr!r}, comments=['hi'])" ) class TestLiteral: @given(literals, literals) def test_equal_iff_components_are_equal(self, a: py.Literal, b: py.Literal): assert a != b or a.value == b.value scalars = one_of(none(), booleans(), text(max_size=5), integers(), floats()) @given(scalars) def test_literal_scalar_uses_repr(self, x): assert str(py.Literal(x)) == repr(x) @given(lists(scalars)) def test_literal_list_of_scalars_uses_repr(self, x: list): assert str(py.Literal(x)) == repr(x) @given(dictionaries(scalars, scalars)) def test_literal_dict_of_scalars_uses_repr(self, x: dict): assert str(py.Literal(x)) == repr(x) def test_literal_composites_with_expr_use_repr_except_for_expr(self): lit = py.Literal("b") assert repr(lit) == "Literal('b')", "repr works on literal" assert str(py.Literal([1, {"a": lit}])) == "[1, {'a': 'b'}]" assert repr(lit) == "Literal('b')", "repr is still working on literal" class TestFString: def test_strings_appear_as_fstrings(self): assert str(py.FString("")) == "f''" assert str(py.FString("ab")) == "f'ab'" assert str(py.FString("a'b")) == """ f"a'b" """.strip() assert str(py.FString('a"b')) == """ f'a"b' """.strip() def test_non_strings_raise_error(self): with pytest.raises(TypeError): assert str(py.FString(24)) def test_format_template_is_not_replaced(self): a = 2 assert str(py.FString("a {a} {} {a!r}")) == "f'a {a} {} {a!r}'" class TestSymbol: @given(symbols, symbols) def test_equal_iff_components_are_equal(self, a: py.Symbol, b: py.Symbol): assert a != b or a.name == b.name @given(text(string.ascii_letters)) def test_strings_appear_unchanged(self, s: str): assert str(py.Symbol(s)) == s def test_non_strings_raise_error(self): with pytest.raises(TypeError): assert str(py.Symbol(True)) def test_repr(self): assert repr(py.Symbol(" x'\" y ")) == "Symbol(' x\\'\" y ')" class TestFunctionCall: @given(function_calls, function_calls) def test_equal_iff_components_are_equal( self, a: py.FunctionCall, b: py.FunctionCall ): assert a != b or ( a.name == b.name and a.positional_args == b.positional_args and a.named_args == b.named_args ) def test_with_no_args(self): assert str(py.FunctionCall("f")) == "f()" def test_with_positional_args(self): assert str(py.FunctionCall("f", [py.Literal(2)])) == "f(2)" def test_with_kwargs(self): assert ( str( py.FunctionCall( "f", named_args={"a": py.Literal(2), "bc": py.Literal("x")} ) ) == "f(a=2, bc='x')" ) def test_with_positional_and_kwargs(self): assert ( str( py.FunctionCall( "m.f", [py.Literal(True), py.FunctionCall("g", [py.Symbol("f")])], {"a": py.Literal(2), "bc": py.Literal("x")}, ) ) == "m.f(True, g(f), a=2, bc='x')" ) def test_repr(self): arg = py.Symbol("a") kwarg = py.Symbol("v") assert ( repr(py.FunctionCall("f", [arg], {"k": kwarg})) == f"FunctionCall('f', [{arg!r}], {{'k': {kwarg!r}}})" ) class TestBinaryOp: @given(binary_ops, binary_ops) def test_equal_iff_components_are_equal(self, a: py.BinaryOp, b: py.BinaryOp): assert a != b or (a.op == b.op and a.lhs == b.lhs and a.rhs == b.rhs) def test_simple(self): assert str(py.BinaryOp(py.Literal(2), "**", py.Literal(10))) == "2 ** 10" def test_nested(self): assert ( str( py.BinaryOp( py.Literal(2), "+", py.BinaryOp( py.BinaryOp(py.Literal(3), "-", py.Literal(4)), "*", py.Literal(5), ), ) ) == "2 + ((3 - 4) * 5)" ) class TestAssignment: @given(assignments, assignments) def test_equal_iff_components_are_equal(self, a: py.Assignment, b: py.Assignment): assert a != b or ( a.lhs == b.lhs and a.rhs == b.rhs and a.comments == b.comments ) @given(indent_levels) def test_simple(self, level: int): x = py.Line.INDENT_UNIT assert [str(l) for l in py.Assignment("foo", py.Literal(3)).lines(level)] == [ x * level + "foo = 3" ] @given(indent_levels) def test_lines_with_comments(self, level: int): x = py.Line.INDENT_UNIT stmt = py.Assignment("x", py.Symbol("a"), comments=["1", "2"]) assert [str(l) for l in stmt.lines(level)] == [ x * level + "# 1", x * level + "# 2", x * level + "x = a", ] @given(indent_levels) def test_lines_with_hidden_comments(self, level: int): x = py.Line.INDENT_UNIT stmt = py.Assignment("x", py.Symbol("a"), comments=["1", "2"]) assert [str(l) for l in stmt.lines(level, comments=False)] == [ x * level + "x = a" ] def test_repr(self): rhs = py.Symbol("a") assert ( repr(py.Assignment("x", rhs, comments=["hi"])) == f"Assignment(lhs='x', rhs={rhs!r}, comments=['hi'])" ) class TestIfElse: @given(ifelses, ifelses) def test_equal_iff_components_are_equal(self, a: py.IfElse, b: py.IfElse): assert a != b or ( a.condition_blocks == b.condition_blocks and a.else_block == b.else_block and a.comments == b.comments ) def test_init_with_no_condition_raises_error(self): with pytest.raises(ValueError): py.IfElse([]) with pytest.raises(ValueError): py.IfElse([], else_block=[]) with pytest.raises(ValueError): py.IfElse([], else_block=[py.Assignment("a", py.Literal(2))]) @given(indent_levels) def test_lines_for_single_if(self, level: int): x = py.Line.INDENT_UNIT assert [ str(l) for l in py.IfElse( [ ( py.BinaryOp(py.Symbol("t"), "is", py.Literal(None)), [py.Assignment("t", py.Literal(1))], ) ] ).lines(level) ] == [x * level + "if t is None:", x * (level + 1) + "t = 1"] @given(indent_levels) def test_lines_for_if_else(self, level: int): x = py.Line.INDENT_UNIT assert [ str(l) for l in py.IfElse( [ ( py.BinaryOp(py.Symbol("t"), "is", py.Literal(None)), [py.Assignment("t", py.Literal(1))], ) ], [py.Assignment("t", py.Literal(2))], ).lines(level) ] == [ x * level + "if t is None:", x * (level + 1) + "t = 1", x * level + "else:", x * (level + 1) + "t = 2", ] @given(indent_levels) def test_lines_for_if_elif(self, level: int): x = py.Line.INDENT_UNIT assert [ str(l) for l in py.IfElse( [ ( py.BinaryOp(py.Symbol("t"), "is", py.Literal(None)), [py.Assignment("t", py.Literal(1))], ), ( py.Literal(False), [ py.Assignment("t", py.Literal(2)), py.Standalone(py.FunctionCall("f", [py.Symbol("t")])), ], ), ] ).lines(level) ] == [ x * level + "if t is None:", x * (level + 1) + "t = 1", x * level + "elif False:", x * (level + 1) + "t = 2", x * (level + 1) + "f(t)", ] @given(indent_levels) def test_lines_for_if_elif_else_with_no_statements(self, level: int): x = py.Line.INDENT_UNIT assert [ str(l) for l in py.IfElse( [ (py.BinaryOp(py.Symbol("t"), "is", py.Literal(None)), []), (py.Literal(False), []), (py.Literal(True), []), ], [], ).lines(level) ] == [ x * level + "if t is None:", x * (level + 1) + "pass", x * level + "elif False:", x * (level + 1) + "pass", x * level + "elif True:", x * (level + 1) + "pass", ] @given(indent_levels) def test_lines_with_comments(self, level: int): x = py.Line.INDENT_UNIT cond = py.Literal(True) if_true = py.Assignment("x", py.Symbol("a"), comments=["tx", "ty"]) if_false = py.Assignment("x", py.Symbol("b"), comments=["fx", "fy"]) stmt = py.IfElse([(cond, [if_true])], [if_false], comments=["1", "2"]) assert [str(l) for l in stmt.lines(level)] == [ x * level + "# 1", x * level + "# 2", x * level + "if True:", *[str(l) for l in if_true.lines(level + 1)], x * level + "else:", *[str(l) for l in if_false.lines(level + 1)], ] @given(indent_levels) def test_lines_with_hidden_comments(self, level: int): x = py.Line.INDENT_UNIT cond = py.Literal(True) if_true = py.Assignment("x", py.Symbol("a"), comments=["tx", "ty"]) if_false = py.Assignment("x", py.Symbol("b"), comments=["fx", "fy"]) stmt = py.IfElse([(cond, [if_true])], [if_false], comments=["1", "2"]) assert [str(l) for l in stmt.lines(level, comments=False)] == [ x * level + "if True:", *[str(l) for l in if_true.lines(level + 1, comments=False)], x * level + "else:", *[str(l) for l in if_false.lines(level + 1, comments=False)], ] def test_repr(self): cond = py.Literal(True) if_true = [(cond, [py.Assignment("x", py.Symbol("a"))])] stmt = py.IfElse(condition_blocks=if_true, comments=["hi"]) assert ( repr(stmt) == f"IfElse(condition_blocks={if_true!r}, else_block=None, comments=['hi'])" ) class TestImport: @given(imports, imports) def test_equal_iff_components_are_equal(self, a: py.Import, b: py.Import): assert a != b or ( a.targets == b.targets and a.source == b.source and a.alias == b.alias and a.comments == b.comments ) def test_init_without_targets_raises_error(self): with pytest.raises(ValueError): py.Import([]) @given(indent_levels) def test_lines_without_targets_raises_error(self, level: int): i = py.Import(["safe"]) i.targets.clear() with pytest.raises(ValueError): i.lines(level) @given(indent_levels) def test_lines_with_single_target(self, level: int): x = py.Line.INDENT_UNIT name = "locust.http" assert [str(l) for l in py.Import([name]).lines(level)] == [ x * level + f"import {name}" ] @given(indent_levels) def test_lines_with_multiple_targets(self, level: int): x = py.Line.INDENT_UNIT names = ["locust.http", "math", "a.b.c"] assert [str(l) for l in py.Import(names).lines(level)] == [ x * level + f"import {name}" for name in names ] @given(indent_levels) def test_lines_with_single_target_and_alias(self, level: int): x = py.Line.INDENT_UNIT name = "transformer.python" alias = "py" assert [str(l) for l in py.Import([name], alias=alias).lines(level)] == [ x * level + f"import {name} as {alias}" ] def test_init_with_multiple_targets_and_alias_raises_error(self): with pytest.raises(ValueError): py.Import(["a", "b"], alias="c") @given(indent_levels) def test_lines_with_multiple_targets_and_alias_raises_error(self, level: int): i = py.Import(["safe"], alias="A") i.targets.append("oops") with pytest.raises(ValueError): i.lines(level) @given(indent_levels, lists(text(string.ascii_letters, min_size=1), min_size=1)) def test_lines_with_targets_and_source(self, level: int, targets: List[str]): x = py.Line.INDENT_UNIT source = "bar" assert [str(l) for l in py.Import(targets, source).lines(level)] == [ x * level + f"from {source} import {target}" for target in targets ] @given(indent_levels) def test_lines_with_comments(self, level: int): x = py.Line.INDENT_UNIT stmt = py.Import(["a", "b", "c"], comments=["1", "2"]) assert [str(l) for l in stmt.lines(level)] == [ x * level + "# 1", x * level + "# 2", x * level + "import a", x * level + "import b", x * level + "import c", ] @given(indent_levels) def test_lines_with_hidden_comments(self, level: int): x = py.Line.INDENT_UNIT stmt = py.Import(["a", "b", "c"], comments=["1", "2"]) assert [str(l) for l in stmt.lines(level, comments=False)] == [ x * level + "import a", x * level + "import b", x * level + "import c", ] def test_repr(self): stmt = py.Import(targets=["a", "b"], comments=["hi"]) assert ( repr(stmt) == f"Import(targets=['a', 'b'], source=None, alias=None, comments=['hi'])" ) PK!7transformer/test_request.py# pylint: skip-file from unittest.mock import MagicMock import pytest from transformer.request import * class TestFromHarEntry: def test_it_returns_an_error_given_an_invalid_dict(self): with pytest.raises(KeyError): invalid_dict = {"some": "data"} Request.from_har_entry(invalid_dict) def test_it_returns_a_request_given_a_get_request(self): get_request = { "request": {"method": "GET", "url": ""}, "startedDateTime": "2018-01-01", } request = Request.from_har_entry(get_request) assert isinstance(request, Request) assert request.method == HttpMethod.GET def test_it_returns_a_request_given_a_post_request(self): post_request = { "request": { "method": "POST", "url": "", "postData": "{'some name': 'some value'}", }, "startedDateTime": "2018-01-01", } request = Request.from_har_entry(post_request) assert isinstance(request, Request) assert request.method == HttpMethod.POST assert request.post_data == "{'some name': 'some value'}" def test_it_returns_a_request_given_a_put_request(self): put_request = { "request": { "method": "PUT", "url": "", "postData": "{'some name': 'some value'}", "queryString": [{"name": "some name", "value": "some value"}], }, "startedDateTime": "2018-01-01", } request = Request.from_har_entry(put_request) assert isinstance(request, Request) assert request.method == HttpMethod.PUT assert request.post_data == "{'some name': 'some value'}" assert request.query == [QueryPair(name="some name", value="some value")] def test_it_returns_a_request_with_headers_given_an_options_request(self): options_request = { "request": { "method": "OPTIONS", "url": "", "postData": "", "headers": [{"name": "Access-Control-Request-Method", "value": "POST"}], }, "startedDateTime": "2018-01-01", } request = Request.from_har_entry(options_request) assert isinstance(request, Request) assert request.method == HttpMethod.OPTIONS assert request.headers == [ Header(name="Access-Control-Request-Method", value="POST") ] def test_it_returns_a_request_with_a_query_given_a_delete_request_with_a_query( self ): delete_request = { "request": { "method": "DELETE", "url": "", "queryString": [{"name": "some name", "value": "some value"}], }, "startedDateTime": "2018-01-01", } request = Request.from_har_entry(delete_request) assert isinstance(request, Request) assert request.method == HttpMethod.DELETE assert request.query == [QueryPair(name="some name", value="some value")] class TestAllFromHar: @pytest.mark.skip(reason="Doesn't raise AssertionError; to be investigated.") def test_it_returns_an_error_given_an_invalid_dict(self): with pytest.raises(AssertionError): invalid_dict = {"some": "data"} Request.all_from_har(invalid_dict) def test_it_returns_a_list_of_requests_given_a_valid_dict(self): valid_dict = { "log": { "entries": [ { "request": {"method": "GET", "url": "", "postData": ""}, "startedDateTime": "2018-01-01", } ] } } assert isinstance(Request.all_from_har(valid_dict), Iterator) for request in Request.all_from_har(valid_dict): assert isinstance(request, Request) class TestTaskName: def test_it_generates_a_task_name_given_a_request(self): a_request = MagicMock() a_request.method.name = "some_name" a_request.url.scheme = "some_scheme" a_request.url.hostname = "some_hostname" a_request.url.path = "some_path" a_task_name = Request.task_name(a_request) a_duplicate_task_name = Request.task_name(a_request) assert a_task_name == a_duplicate_task_name a_request.method.name = "some_other_name" a_different_task_name = Request.task_name(a_request) assert a_task_name != a_different_task_name PK!ՔAl))transformer/test_scenario.pyimport logging import re import string from pathlib import Path from typing import List from unittest.mock import MagicMock, patch import pytest from hypothesis import given from hypothesis.strategies import lists, text, recursive, tuples from transformer.helpers import DUMMY_HAR_STRING, _DUMMY_HAR_DICT from transformer.scenario import Scenario, SkippableScenarioError, WeightValueError from transformer.task import Task paths = recursive( text(string.printable.replace("/", ""), min_size=1, max_size=3).filter( lambda s: s != "." and s != ".." ), lambda x: tuples(x, x).map("/".join), max_leaves=4, ).map(Path) class TestScenario: @patch("transformer.scenario.Path.is_dir", MagicMock(return_value=False)) @patch("transformer.scenario.Path.iterdir", MagicMock(return_value=())) @patch("transformer.scenario.Path.open") @patch("transformer.scenario.json.load", MagicMock(return_value=_DUMMY_HAR_DICT)) @given(paths=lists(paths, unique=True, min_size=2)) def test_names_are_unique(*_, paths: List[Path]): scenario_names = [Scenario.from_path(path).name for path in paths] assert sorted(set(scenario_names)) == sorted(scenario_names) assert len(paths) == len(scenario_names) def test_creation_from_scenario_directory_with_weight_file(self, tmp_path: Path): root_path = tmp_path / "some-path" root_path.mkdir() expected_weight = 7 root_path.with_suffix(".weight").write_text(str(expected_weight)) nb_har_files = 2 for i in range(nb_har_files): root_path.joinpath(f"{i}.har").write_text(DUMMY_HAR_STRING) result = Scenario.from_path(root_path) assert len(result.children) == nb_har_files assert result.weight == expected_weight class TestFromPath: def test_on_har_raises_error_with_incorrect_har(self, tmp_path: Path): not_har_path = tmp_path / "not.har" not_har_path.write_text("not JSON!") with pytest.raises(SkippableScenarioError): Scenario.from_path(not_har_path) def test_on_dir_ignores_some_incorrect_hars(self, tmp_path: Path): not_har_path = tmp_path / "not.har" not_har_path.write_text("not JSON!") har_path = tmp_path / "good.har" har_path.write_text(DUMMY_HAR_STRING) scenario = Scenario.from_path(tmp_path) assert len(scenario.children) == 1 assert scenario.children[0].origin == har_path def test_on_dir_raises_error_with_all_incorrect_hars(self, tmp_path: Path): for i in range(2): tmp_path.joinpath(f"{i}.nothar").write_text("not JSON!") with pytest.raises(SkippableScenarioError): Scenario.from_path(tmp_path) def test_on_dir_with_dangling_weights_raises_error( self, tmp_path: Path, caplog ): (tmp_path / "ok.har").write_text(DUMMY_HAR_STRING) (tmp_path / "fail.weight").write_text("7") caplog.set_level(logging.INFO) with pytest.raises(SkippableScenarioError): Scenario.from_path(tmp_path) assert "weight file" in caplog.text assert any( r.levelname == "ERROR" for r in caplog.records ), "at least one ERROR logged" def test_records_global_code_blocks_from_tasks(self): t1_blocks = {"t1-1": ["abc"], "t1-2": ["def"]} t1 = Task("t1", request=MagicMock(), global_code_blocks=t1_blocks) t2 = Task("t2", request=MagicMock()) t3_blocks = {"t3-1": ("xyz",)} t3 = Task("t3", request=MagicMock(), global_code_blocks=t3_blocks) scenario = Scenario("scenario", [t1, t2, t3], origin=None) assert scenario.global_code_blocks == {**t1_blocks, **t3_blocks} def test_group_records_global_code_blocks_from_scenarios(self): t1_blocks = {"t1-1": ["abc"], "t1-2": ["def"]} t1 = Task("t1", request=MagicMock(), global_code_blocks=t1_blocks) t2 = Task("t2", request=MagicMock()) t3_blocks = {"t3-1": ("xyz",)} t3 = Task("t3", request=MagicMock(), global_code_blocks=t3_blocks) s1 = Scenario("s1", [t1, t2], origin=None) s2 = Scenario("s2", [t3], origin=None) sg = Scenario("sg", [s1, s2], origin=None) assert sg.global_code_blocks == {**t1_blocks, **t3_blocks} def test_group_records_global_code_blocks_uniquely(self): common_blocks = {"x": ["a", "b"]} t1 = Task( "t1", request=MagicMock(), global_code_blocks={**common_blocks, "t1b": ["uvw"]}, ) t2 = Task( "t2", request=MagicMock(), global_code_blocks={**common_blocks, "t2b": ["xyz"]}, ) s1 = Scenario("s1", [t1], origin=None) s2 = Scenario("s2", [t2], origin=None) sg = Scenario("sg", [s1, s2], origin=None) assert sg.global_code_blocks == { **common_blocks, "t1b": ["uvw"], "t2b": ["xyz"], } def test_without_weight_file_has_weight_1(self, tmp_path: Path): har_path = tmp_path / "test.har" har_path.write_text(DUMMY_HAR_STRING) assert Scenario.from_path(har_path).weight == 1 def test_with_weight_file_has_corresponding_weight(self, tmp_path: Path): weight_path = tmp_path / "test.weight" weight_path.write_text("74") har_path = tmp_path / "test.har" har_path.write_text(DUMMY_HAR_STRING) assert Scenario.from_path(har_path).weight == 74 @pytest.mark.parametrize("weight", [0, -2, 2.1, -2.1, "NaN", "abc", " "]) def test_with_invalid_weight_raises_error_and_never_skips( self, tmp_path: Path, weight ): legit_har_path = tmp_path / "legit.har" legit_har_path.write_text(DUMMY_HAR_STRING) bad_weight_path = tmp_path / "test.weight" bad_weight_path.write_text(str(weight)) bad_weight_har_path = tmp_path / "test.har" bad_weight_har_path.write_text(DUMMY_HAR_STRING) with pytest.raises(WeightValueError): # If from_path was skipping the bad scenario/weight pair, it # would not raise because there is another valid scenario, # legit.har. Scenario.from_path(tmp_path) def test_with_many_weight_files_selects_weight_based_on_name( self, tmp_path: Path ): expected_weight_path = tmp_path / "test.weight" expected_weight_path.write_text("7") first_wrong_weight_path = tmp_path / "a.weight" first_wrong_weight_path.write_text("2") second_wrong_weight_path = tmp_path / "1.weight" second_wrong_weight_path.write_text("4") har_path = tmp_path / "test.har" har_path.write_text(DUMMY_HAR_STRING) assert Scenario.from_path(har_path).weight == 7 def test_uses_full_path_for_scenario_name(self, tmp_path: Path): har_basename = "e3ee4a1ef0817cde0a0a78c056e7cb35" har_path = tmp_path / har_basename har_path.write_text(DUMMY_HAR_STRING) scenario = Scenario.from_path(har_path) words_in_scenario_name = { m.group() for m in re.finditer(r"[A-Za-z0-9]+", scenario.name) } assert har_basename in words_in_scenario_name words_in_parent_path = { m.group() for m in re.finditer(r"[A-Za-z0-9]+", str(tmp_path)) } words_in_scenario_name_not_from_har_basename = words_in_scenario_name - { har_basename } assert ( words_in_parent_path <= words_in_scenario_name_not_from_har_basename ), "all components of the parent path must be in the scenario name" def test_uses_full_path_for_parents_and_basename_for_children( self, tmp_path: Path ): root_basename = "615010a656a5bb29d1898f163619611f" root = tmp_path / root_basename root.mkdir() for i in range(2): (root / f"s{i}.har").write_text(DUMMY_HAR_STRING) root_scenario = Scenario.from_path(root) words_in_root_scenario_name = { m.group() for m in re.finditer(r"[A-Za-z0-9]+", root_scenario.name) } words_in_root_path = { m.group() for m in re.finditer(r"[A-Za-z0-9]+", str(root)) } assert ( words_in_root_path <= words_in_root_scenario_name ), "parent scenario's name must come from full path" assert len(root_scenario.children) == 2 child_scenario_names = {c.name for c in root_scenario.children} assert child_scenario_names == { "s0", "s1", }, "child scenarios have short names" def test_raises_error_for_colliding_scenario_names_from_har_files( self, tmp_path: Path, caplog ): (tmp_path / "good.har").write_text(DUMMY_HAR_STRING) (tmp_path / "bad.har").write_text(DUMMY_HAR_STRING) (tmp_path / "bad.json").write_text(DUMMY_HAR_STRING) caplog.set_level(logging.ERROR) with pytest.raises(SkippableScenarioError): Scenario.from_path(tmp_path) assert "colliding names" in caplog.text assert "bad.har" in caplog.text assert "bad.json" in caplog.text def test_raises_error_for_colliding_scenario_names_from_directory_and_file( self, tmp_path: Path, caplog ): directory = tmp_path / "x" directory.mkdir() # directory needs to contain a HAR file, otherwise Transformer will # not consider it a scenario. (directory / "a.har").write_text(DUMMY_HAR_STRING) (tmp_path / "x.har").write_text(DUMMY_HAR_STRING) caplog.set_level(logging.ERROR) with pytest.raises(SkippableScenarioError): Scenario.from_path(tmp_path) assert "colliding names" in caplog.text assert re.search(r"\bx\b", caplog.text) assert re.search(r"\bx.har\b", caplog.text) PK!걎Z(Z(transformer/test_task.py# pylint: skip-file import io import json from unittest.mock import MagicMock from unittest.mock import patch from urllib.parse import urlparse import pytest from transformer.request import Header from transformer.task import ( Task, Request, HttpMethod, QueryPair, TIMEOUT, LocustRequest, ) class TestFromRequests: def test_it_returns_a_task(self): request = MagicMock() request.timestamp = 1 second_request = MagicMock() second_request.timestamp = 2 assert all( isinstance(t, Task) for t in Task.from_requests([request, second_request]) ) @patch("builtins.open") def test_it_doesnt_create_a_task_if_the_url_is_on_the_blacklist(self, mock_open): mock_open.return_value = io.StringIO("amazon") request = MagicMock() request.url = MagicMock() request.url.netloc = "www.amazon.com" task = Task.from_requests([request]) assert len(list(task)) == 0 @patch("builtins.open") def test_it_creates_a_task_if_the_path_not_host_is_on_the_blacklist( self, mock_open ): mock_open.return_value = io.StringIO("search\namazon") request = MagicMock() request.url = urlparse("https://www.google.com/search?&q=amazon") task = Task.from_requests([request]) assert len(list(task)) == 1 class TestAsLocustAction: def test_it_returns_an_error_given_an_unsupported_http_method(self): a_request_with_an_unsupported_http_method = MagicMock() task = Task("some_task", a_request_with_an_unsupported_http_method) with pytest.raises(ValueError): task.as_locust_action() def test_it_returns_a_string(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.GET task = Task("some_task", a_request) assert isinstance(task.as_locust_action(), str) def test_it_returns_action_from_locust_request(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.GET locust_request = LocustRequest( method=HttpMethod.GET, url=repr("http://locust-task"), headers={} ) task = Task("some_task", request=a_request, locust_request=locust_request) action = task.as_locust_action() assert action.startswith("response = self.client.get(url='http://locust-task'") def test_it_returns_task_using_get_given_a_get_http_method(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.GET task = Task("some_task", a_request) action = task.as_locust_action() assert action.startswith("response = self.client.get(") def test_it_returns_a_task_using_post_given_a_post_http_method(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.POST a_request.post_data = {} task = Task("some_task", a_request) action = task.as_locust_action() assert action.startswith("response = self.client.post(") def test_it_returns_a_task_using_put_given_a_put_http_method(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.PUT a_request.post_data = {"text": "{'some key': 'some value'}"} a_request.query = [QueryPair(name="some name", value="some value")] task = Task("some_task", a_request) action = task.as_locust_action() assert action.startswith("response = self.client.put(") assert "params={'some name': 'some value'}" in action assert "data=b\"{'some key': 'some value'}\"" in action def test_it_returns_a_task_using_options_given_an_options_http_method(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.OPTIONS a_request.headers = [Header(name="Access-Control-Request-Method", value="POST")] task = Task("some_task", a_request) action = task.as_locust_action() assert action.startswith("response = self.client.options(") assert "headers={'Access-Control-Request-Method': 'POST'" in action def test_it_returns_a_task_using_delete_given_a_delete_http_method(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.DELETE a_request.url = urlparse("http://www.some.web.site/?some_name=some_value") task = Task("some_task", a_request) action = task.as_locust_action() assert action.startswith("response = self.client.delete(") assert "?some_name=some_value" in action def test_it_provides_timeout_to_requests(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.GET task = Task("some_task", a_request) action = task.as_locust_action() assert f"timeout={TIMEOUT}" in action def test_it_injects_headers(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.GET a_request.headers = [Header(name="some_header", value="some_value")] task = Task("some_task", a_request) action = task.as_locust_action() assert "some_value" in action def test_it_encodes_data_in_task_for_text_mime(self): decoded_value = '{"formatted": "54,95 €"}' a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.POST a_request.post_data = {"text": decoded_value} task = Task("some_task", a_request) action = task.as_locust_action() assert str(decoded_value.encode()) in action def test_it_encodes_data_in_task_for_json_mime(self): decoded_value = '{"formatted": "54,95 €"}' a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.POST a_request.post_data = {"text": decoded_value, "mimeType": "application/json"} task = Task("some_task", a_request) action = task.as_locust_action() assert str(json.loads(decoded_value)) in action def test_it_converts_post_params_to_post_text(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.POST a_request.post_data = { "mimeType": "application/json", "params": [ {"name": "username", "value": "some user"}, {"name": "password", "value": "some password"}, ], } task = Task("some task", a_request) action = task.as_locust_action() assert "'username': 'some user'" in action assert "'password': 'some password'" in action def test_it_creates_a_locust_request_when_there_is_none(self): task = Task(name="some name", request=MagicMock()) modified_task = Task.inject_headers(task, {}) assert modified_task.locust_request def test_it_returns_a_task_with_the_injected_headers(self): locust_request = LocustRequest( method=MagicMock(), url=MagicMock(), headers={"x-forwarded-for": ""} ) task = Task( name="some name", request=MagicMock(), locust_request=locust_request ) expected_headers = {"x-forwarded-for": "1.2.3.4"} modified_task = Task.inject_headers(task, headers=expected_headers) assert isinstance(modified_task, Task) headers = modified_task.locust_request.headers assert len(headers) == 1 assert headers == expected_headers class TestIndentation: def test_pre_processing_returns_an_indented_string_given_an_indentation(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.GET task = Task("some_task", a_request) new_pre_processings = (*task.locust_preprocessing, "def some_function():") task = task._replace(locust_preprocessing=new_pre_processings) action = task.as_locust_action(indentation=2) assert action.startswith(" def some_function():") def test_post_processing_returns_an_indented_string_given_an_indentation(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.GET task = Task("some_task", a_request) new_post_processings = (*task.locust_postprocessing, "def some_function():") task = task._replace(locust_postprocessing=new_post_processings) action = task.as_locust_action(indentation=2) assert " def some_function():" in action def test_it_applies_indentation_to_all_pre_processings(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.GET task = Task("some_task", a_request) new_pre_processings = ( *task.locust_preprocessing, "def some_function():", "def some_other_function():", ) task = task._replace(locust_preprocessing=new_pre_processings) action = task.as_locust_action(indentation=2) assert action.startswith( " def some_function():\n\n def some_other_function():" ) def test_it_respects_sub_indentation_levels(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.GET task = Task("some_task", a_request) new_pre_processings = ( *task.locust_preprocessing, "\n def function():\n if True:\n print(True)", ) task = task._replace(locust_preprocessing=new_pre_processings) action = task.as_locust_action(indentation=1) assert action.startswith(" \n def function():\n if True:\n print(True)") class TestReplaceURL: def test_it_creates_a_locust_request_when_there_is_none(self): task = Task(name="some name", request=MagicMock()) modified_task = Task.replace_url(task, "") assert modified_task.locust_request def test_it_returns_a_task_with_the_replaced_url(self): locust_request = LocustRequest( method=MagicMock(), url=MagicMock(), headers=MagicMock() ) task = Task( name="some name", request=MagicMock(), locust_request=locust_request ) expected_url = 'f"http://a.b.c/{some.value}/"' modified_task = Task.replace_url(task, expected_url) assert modified_task.locust_request.url == expected_url PK!CDDI I transformer/test_transform.py# pylint: skip-file import io from pathlib import Path import pytest import transformer import transformer.transform as tt from transformer.helpers import DUMMY_HAR_STRING from transformer.locust import locustfile_lines from transformer.plugins import plugin, Contract class TestTransform: def test_it_returns_a_locustfile_program_given_scenario_path(self, tmp_path: Path): har_path = tmp_path / "some.har" har_path.write_text(DUMMY_HAR_STRING) locustfile_contents = str(tt.transform(har_path)) try: compile(locustfile_contents, "locustfile.py", "exec") except Exception as exception: pytest.fail(f"Compiling locustfile failed. [{exception}].") def test_it_uses_default_plugins(self, tmp_path: Path, monkeypatch): har_path = tmp_path / "some.har" har_path.write_text(DUMMY_HAR_STRING) times_plugin_called = 0 # We don't need to specify a plugin signature here because signatures # are only checked at plugin name resolution. def fake_plugin(tasks): nonlocal times_plugin_called times_plugin_called += 1 return tasks monkeypatch.setattr(tt, "DEFAULT_PLUGINS", [fake_plugin]) tt.transform(har_path, plugins=[]) # explicitly provide no plugins assert times_plugin_called == 1 def dump_as_str(*args, **kwargs): """ Wraps transformer.dump by passing it a StringIO buffer as file argument and returning the final contents of that buffer. This makes transformer.dump behave like transformer.dumps, and thus allows to test their output more easily. """ s = io.StringIO() transformer.dump(s, *args, **kwargs) return s.getvalue() class TestDumpAndDumps: @pytest.mark.parametrize("f", (transformer.dumps, dump_as_str)) def test_with_no_paths_it_returns_empty_locustfile(self, f): expected_empty_locustfile = "\n".join( locustfile_lines(scenarios=[], program_plugins=()) ) assert f([]) == expected_empty_locustfile def test_dump_and_dumps_have_same_output_for_simple_har(self, tmp_path): har_path = tmp_path / "some.har" har_path.write_text(DUMMY_HAR_STRING) assert transformer.dumps([tmp_path]) == dump_as_str([tmp_path]) @pytest.mark.parametrize( "f,with_default,expected_times_called", ( (f, *case) for f in (transformer.dumps, dump_as_str) for case in ((True, 1), (False, 0)) ), ) def test_it_uses_default_plugins( self, tmp_path: Path, monkeypatch, f, with_default, expected_times_called ): har_path = tmp_path / "some.har" har_path.write_text(DUMMY_HAR_STRING) times_plugin_called = 0 @plugin(Contract.OnScenario) def fake_plugin(t): nonlocal times_plugin_called times_plugin_called += 1 return t monkeypatch.setattr(tt, "DEFAULT_PLUGINS", [fake_plugin]) f([har_path], with_default_plugins=with_default) assert times_plugin_called == expected_times_called PK!Acctransformer/transform.py""" Entrypoint for Transformer when used as a library. """ import warnings from pathlib import Path from typing import Sequence, Union, Iterable, TextIO, Iterator, TypeVar import transformer.plugins as plug from transformer.locust import locustfile, locustfile_lines from transformer.plugins import sanitize_headers, Contract from transformer.plugins.contracts import Plugin from transformer.scenario import Scenario DEFAULT_PLUGINS = (sanitize_headers.plugin,) def transform( scenarios_path: Union[str, Path], plugins: Sequence[Plugin] = (), with_default_plugins: bool = True, ) -> str: """ This function is deprecated and will be removed in a future version. Do not rely on it. Reason: It only accepts one scenario path at a time, and requires plugins to be already resolved (and therefore that users use transformer.plugins.resolve, which is kind of low-level). Both dumps & dump lift these constraints and have a more familiar naming (see json.dump/s, etc.). Deprecated since: v1.0.2. """ warnings.warn(DeprecationWarning("transform: use dump or dumps instead")) if with_default_plugins: plugins = (*DEFAULT_PLUGINS, *plugins) return locustfile([Scenario.from_path(Path(scenarios_path), plugins)]) LaxPath = Union[str, Path] PluginName = str def dumps( scenario_paths: Iterable[LaxPath], plugins: Sequence[PluginName] = (), with_default_plugins: bool = True, ) -> str: """ Transforms the provided scenarios using the provided plugins, and returns the resulting locustfile code as a string. See also: dump :param scenario_paths: paths to scenario files (HAR) or directories :param plugins: names of plugins to use :param with_default_plugins: whether the default plugins should be used in addition to those provided (recommended: True) """ return "\n".join(_dump_as_lines(scenario_paths, plugins, with_default_plugins)) def dump( file: TextIO, scenario_paths: Iterable[LaxPath], plugins: Sequence[PluginName] = (), with_default_plugins: bool = True, ) -> None: """ Transforms the provided scenarios using the provided plugins, and writes the resulting locustfile code in the provided file. See also: dumps :param file: an object with a `writelines` method (as specified by io.TextIOBase), e.g. `sys.stdout` or the result of `open`. :param scenario_paths: paths to scenario files (HAR) or directories. :param plugins: names of plugins to use. :param with_default_plugins: whether the default plugins should be used in addition to those provided (recommended: True). """ file.writelines( intersperse("\n", _dump_as_lines(scenario_paths, plugins, with_default_plugins)) ) def _dump_as_lines( scenario_paths: Iterable[LaxPath], plugins: Sequence[PluginName], with_default_plugins: bool, ) -> Iterator[str]: plugins = [p for name in plugins for p in plug.resolve(name)] if with_default_plugins: plugins = (*DEFAULT_PLUGINS, *plugins) plugins_for = plug.group_by_contract(plugins) scenarios = [ Scenario.from_path( path, plugins_for[Contract.OnTask], plugins_for[Contract.OnTaskSequence] ).apply_plugins(plugins_for[Contract.OnScenario]) for path in scenario_paths ] yield from locustfile_lines(scenarios, plugins_for[Contract.OnPythonProgram]) T = TypeVar("T") def intersperse(delim: T, iterable: Iterable[T]) -> Iterator[T]: """ >>> list(intersperse(",", "a")) ['a'] >>> list(intersperse(",", "")) [] >>> list(intersperse(",", "abc")) ['a', ',', 'b', ',', 'c'] >>> list(intersperse(",", ["a", "b", "c"])) ['a', ',', 'b', ',', 'c'] """ it = iter(iterable) try: yield next(it) except StopIteration: return for x in it: yield delim yield x PK!Ha76A0har_transformer-1.0.2.dist-info/entry_points.txtN+I/N.,()*)J+N/M-Eb%dZAŧUgpqPK!d*++'har_transformer-1.0.2.dist-info/LICENSEMIT License Copyright (c) 2019 Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HڽTU%har_transformer-1.0.2.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!H/WT- (har_transformer-1.0.2.dist-info/METADATAVr6S^0I$f.M ݙ2FQc[$e4}>Id6 H>534z?3ȇ TyF\/ʘ",'"˨Z)@$S>]ÂL3TD6O>y&ւ1R fY~$Mi QR#TЄp#kOOl] !1 ϓ^eEaB aԒs-9]5i(O58P{[0gQtQJ PCBJ%V,wD!S"Q4:\<)02dOo,qBb*$,a 3tzs"%[TȌNj$bYΔM~J;*X w pK0w+):E$:_H"M-udy\Ef;=L_Yd!3)4G׽%ӑҺt- j쿧X9!hʓ9kS%Utf\72OZ (,[6qŵq_dVQ|k?z=ZeT[i@w*hF ̒PɛrM8aGEc쇇 `0.''`pHiCa=@ % !TQNQ={ټ5]^LVX[Tqbہ;>!88X֢bnW1v#tFDܪTh*Ɠ ܖ.VbXe"bT-"3rO\SWv?l2W~\Sy:a|=smh3oƵcӆw@f#[255Pk$D v nͻFNkb6wlFʙ=h:CH}.q[吐ǐ%Nz Sb`O{t}{hA\U_b=$[UgR`%7S n nU:@5,ihq+y+dpE;K,~t1veԏx`P(A%Mq`7V:HhzpF y0rdħ!^y?qEx$s3b:x1̊k*q vB_po8@jV=k:u3~|X 2 DSoâ7;:q3X%66H yY"f}_VH)#Um\ a(۬CFJD]=Ɗ]6n_ 4̰^PLœLgc2X2&VW73>֙#Ã)AU%M푯ͻĥt 0KA<@?Djay**^Mx^:a B+&r@qikf~ ?G&|'bSSUV'e"ʵa7֣I 2"֌#t 1{iM9&Yt (PN'e ,]>:`pφ  8^;. Y#HMmsx2GБ ;93~7j}ADHTXí$bl7 ?!>2;? PKkԍlgG^gؐн ̟s v6ۀ-QE.`ʦhZy2vy7Yg- zS\KÂ؎I3~an7-Ac@|v7Ɗj]cxEd.L$fv0(~Wujh4sMQ̥Xpz߭޽KXx$K_rߓBRtA!߽Awԇ[!.cZ/Ib(bmY&x)LJ3h֧1J e}׌ lS˧L9:Nm ''pkSs/Cc*vC\@żoQ ^U%w[9UrA;C^0+E'?c.¤zᔋ>U3Qh9 kڞEiG$J\5MBm `'dLKx4vo!2-BqVIQ>transformer/.urlignore_examplePK!)Ԧztransformer/__init__.pyPK!WWMtransformer/__main__.pyPK!"nwwtransformer/blacklist.pyPK!'BB transformer/builders_decision.pyPK!?8 transformer/builders_python.pyPK!6W W $transformer/cli.pyPK!T/  1transformer/decision.pyPK!p=8transformer/helpers.pyPK!м{uu;transformer/locust.pyPK!@y3Otransformer/naming.pyPK!tStransformer/plugins/__init__.pyPK!߿ [Ttransformer/plugins/contracts.pyPK!v&Xktransformer/plugins/dummy.pyPK!.jmtransformer/plugins/resolve.pyPK!ތ'Svtransformer/plugins/sanitize_headers.mdPK!l'_ytransformer/plugins/sanitize_headers.pyPK!e e %|transformer/plugins/test_contracts.pyPK!j梺!Rtransformer/plugins/test_dummy.pyPK!zr #vtransformer/plugins/test_resolve.pyPK!<,transformer/plugins/test_sanitize_headers.pyPK!s=*U*Utransformer/python.pyPK!0O Etransformer/request.pyPK![A9+9+ltransformer/scenario.pyPK!{5)transformer/task.pyPK!M~CCJtransformer/test_blacklist.pyPK!؏.Ptransformer/test_cli.pyPK!]JK K Utransformer/test_decision.pyPK! كsvvOctransformer/test_helpers.pyPK!2hw dtransformer/test_locust.pyPK!ptransformer/test_naming.pyPK!3λ||ttransformer/test_python.pyPK!7transformer/test_request.pyPK!ՔAl))transformer/test_scenario.pyPK!걎Z(Z(-transformer/test_task.pyPK!CDDI I ;Vtransformer/test_transform.pyPK!Accbtransformer/transform.pyPK!Ha76A0Xrhar_transformer-1.0.2.dist-info/entry_points.txtPK!d*++'rhar_transformer-1.0.2.dist-info/LICENSEPK!HڽTU%Lwhar_transformer-1.0.2.dist-info/WHEELPK!H/WT- (whar_transformer-1.0.2.dist-info/METADATAPK!H[bq &}}har_transformer-1.0.2.dist-info/RECORDPK**O 2