==== transformer/.urlignore_example ====

360yield
abmr.net
addthis.com
adfarm
adform.net
adition.com
adkontekst.pl
admatic.com
admeira.ch
adnxs.com
adsrvr.org
akstat
atdmt.com
bidswitch.net
bing
bluekai.com
calotag.com
casalemedia.com
cloudfront.net
connectad.io
converge-digital.com
creativefactory.zalando
crwdcntrl.net
demdex.net
doubleclick.net
effitarget.com
email-reflex.com
exelator.com
experianmarketingservices.digital
facebook
google
himediads.com
ibillboard.com
intelliad.de
keyxel.com
krxd.net
liadm.com
lkqd.net
mathtag.com
mediabong.com
metrigo.com
metrigo.zalan.do
mookie1.com
mpulse
nscontext.eu
nuggad
opecloud.com
openx.net
pixel
pubmatic.com
pubmine.com
reflex
rtb-seller.com
sara.media
semasio.net
sharethis.com
socialaudience.nl
static-img
tapad.com
usabilla
vdopia.com
videoplaza.tv
vimeo.com
vimeocdn.com
vmg.host
yieldlab.net
ztat.net
admatic.com

==== transformer/__init__.py ====

"""
:mod:`transformer` -- Main API
============================================

This module exports the functions that should cover most use-cases of any
Transformer user.
"""
import pkg_resources

from .transform import dumps, dump

__version__ = pkg_resources.get_distribution("har-transformer").version

__all__ = ["dumps", "dump"]

==== transformer/__main__.py ====

from .cli import script_entrypoint

if __name__ == "__main__":
    script_entrypoint()

==== transformer/blacklist.py ====

import os
import logging


def on_blacklist(url):
    """
    Checks for matching URLs in an ignore file (blacklist) from user's current
    directory.
    """
    blacklist_file = f"{os.getcwd()}/.urlignore"
    try:
        with open(blacklist_file) as file:
            blacklist = [line.rstrip("\n") for line in file if len(line) > 1]
            for blacklist_item in blacklist:
                if blacklist_item in url:
                    return True
            return False
    except OSError as err:
        logging.debug(
            "Could not read blacklist file %s. Reason: %s", blacklist_file, err
        )
        return False

==== transformer/builders_python.py ====

"""
Hypothesis builders for property-based testing of the transformer.python module.
"""
import re
import string
from typing import Optional

from hypothesis.searchstrategy import SearchStrategy
from hypothesis.strategies import (
    integers,
    text,
    lists,
    builds,
    deferred,
    one_of,
    recursive,
    just,
    none,
    booleans,
    floats,
    tuples,
    dictionaries,
)

from transformer import python as py

# Strategy for indentation levels we want to test with (just "no indentation" or
# "one-level indentation" because "two-level indentation" will likely be the
# same, and we don't want the tests running for too long).
indent_levels = integers(min_value=0, max_value=1)


def ascii_text(min_size: int = 0, max_size: Optional[int] = 5) -> SearchStrategy[str]:
    """
    Strategy for ASCII strings, with a default max_size to avoid wasting time
    generating too much.
    """
    return text(string.printable, min_size=min_size, max_size=max_size)


_ascii_inline = re.sub(r"[\r\n\v\f]", "", string.printable)


def ascii_inline_text(
    min_size: int = 0, max_size: Optional[int] = 5
) -> SearchStrategy[str]:
    """Similar to ascii_text, but does not generate multiline strings."""
    return text(_ascii_inline, min_size=min_size, max_size=max_size)


# Strategy for identifiers, i.e. strings that can be used as symbols (function
# names, etc.) in Python programs.
# Unqualified identifiers cannot contain ".", qualified identifiers can
# (but only between unqualified identifiers, i.e. not at the beginning or end).
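# A small sketch (not part of the original module) of how these strategies can be
# sampled while debugging, assuming Hypothesis is installed and this module is
# importable as transformer.builders_python:
#
#     >>> from transformer.builders_python import identifiers
#     >>> identifiers.example()  # may print e.g. "abc" or "ab.cd"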
unqualified_identifiers = text(string.ascii_letters, min_size=1, max_size=5) qualified_identifiers = lists(unqualified_identifiers, min_size=2, max_size=3).map( ".".join ) identifiers = unqualified_identifiers | qualified_identifiers # Strategy for python.Line objects. lines = builds(py.Line, ascii_inline_text(), indent_levels) # Strategy for lists of strings to be used as comment text in tests. comments = lists(ascii_inline_text(), max_size=3) # Strategy for python.Statement objects. Basically, if you ask for a Statement, # you get an instance of one of Statement's subclasses. # All Statement subclasses should be mentioned here. # We need deferred to break the cyclic dependency between builds() that depend statements. statements = deferred( lambda: one_of( opaque_blocks, functions, decorations, classes, standalones, assignments, ifelses, imports, ) ) _atomic_blocks = text(string.ascii_letters + string.punctuation, min_size=1, max_size=3) _complex_blocks = recursive( _atomic_blocks, lambda b: text(string.whitespace, min_size=1, max_size=2).flatmap( lambda ws: tuples(b, b).map(ws.join) ), max_leaves=8, ) # Strategy for python.OpaqueBlock. We don't want whitespace-only comments # (should be ignored by the syntax tree, which requires boilerplate in tests) # so we build the text by joining non-whitespace strings together. # Since this strategy is the default in the statements strategy, which is widely # used in tests, it should be as fast as possible. opaque_blocks = builds(py.OpaqueBlock, block=_complex_blocks, comments=comments) # Strategy for python.Function objects. functions = builds( py.Function, name=unqualified_identifiers, params=lists(unqualified_identifiers, max_size=2), statements=lists(statements, max_size=2), comments=comments, ) # Strategy for python.Class objects. classes = builds( py.Class, name=unqualified_identifiers, statements=lists(statements, max_size=2), superclasses=lists(identifiers, max_size=2), comments=comments, ) # Strategy for python.Decoration objects. decorations = builds( py.Decoration, decorator=identifiers, target=one_of(functions, classes), comments=comments, ) # Strategy for python.Expression objects. Basically, if you ask for an Expression, # you get an instance of one of Expression's subclasses. # All Expression subclasses should be mentioned here. # Uses deferred for the same reason as Statement. expressions = deferred(lambda: one_of(symbols, literals, function_calls, binary_ops)) # Strategy for python.Standalone objects. standalones = builds(py.Standalone, expr=expressions, comments=comments) # Strategy for python.FString objects. The first examples are supposed to # trigger interpolation behavior to show that it doesn't happen with FString. fstrings = builds( py.FString, one_of(just(""), just("{a}"), text(min_size=1, max_size=5)) ) # Strategy for python.Literal objects. # The recursion doesn't use the set data type because python.Statement and # python.Expression objects are not hashable (because they are not immutable); # this is also why we use unqualified_identifiers as dictionary keys. literals = recursive( one_of(none(), booleans(), integers(), floats(), text(max_size=5)).map(py.Literal) | fstrings, lambda x: one_of( lists(x, max_size=2), tuples(x), dictionaries(unqualified_identifiers, x, max_size=2), ).map(py.Literal), max_leaves=8, ) # Strategy for python.Symbol objects. symbols = builds(py.Symbol, identifiers) # Strategy for python.FunctionCall objects. 
The size of argument collections is # limited for performance reasons and because handling 3 arguments is (hopefully) # the same as handling 2 arguments. function_calls = builds( py.FunctionCall, name=identifiers, positional_args=lists(expressions, max_size=2), named_args=dictionaries(unqualified_identifiers, expressions, max_size=2), ) # Strategy for reasonable operator names: "+", "++", "in", etc. operators = text(string.ascii_letters + string.punctuation, min_size=1, max_size=2) # Strategy for python.BinaryOp. binary_ops = builds(py.BinaryOp, lhs=expressions, op=operators, rhs=expressions) # Strategy for python.Assignment. assignments = builds(py.Assignment, lhs=identifiers, rhs=expressions, comments=comments) # Strategy for python.IfElse. The size of Statement sub-lists is limited for # performance reasons and because handling 3 statements is (hopefully) the same # as handling 2 statements. ifelses = builds( py.IfElse, condition_blocks=lists( tuples(expressions, lists(statements, max_size=2)), min_size=1, max_size=3 ), else_block=one_of(none(), lists(statements, max_size=2)), comments=comments, ) # Strategy for python.Import without an alias part. multi_imports = builds( py.Import, targets=lists(identifiers, min_size=1, max_size=2), source=one_of(none(), identifiers), ) # Strategy for python.Import with an alias part. aliased_imports = builds( py.Import, targets=tuples(identifiers), source=one_of(none(), identifiers), alias=unqualified_identifiers, ) # Strategy for python.Import. imports = multi_imports | aliased_imports PK!6W W transformer/cli.py""" Transformer: Convert web browser sessions (HAR files) into Locust load testing scenarios (locustfiles). Usage: transformer [-p ]... [...] transformer --help transformer --version Options: --help Print this help message and exit. -p, --plugin= Use the specified plugin. Repeatable. --version Print version information and exit. Documentation & code: https://github.com/zalando-incubator/transformer """ import logging import sys from pathlib import Path from typing import Sequence, cast, Tuple import ecological from docopt import docopt from transformer import __version__, dump class Config(ecological.AutoConfig, prefix="transformer"): input_paths: Tuple[Path, ...] = () plugins: Tuple[str, ...] = () def read_config(cli_args: Sequence[str]) -> Config: """ Combine command-line arguments & options (managed by docopt) with environment variables (managed by Ecological) into Ecological's Config class. Special cases: - If input paths are provided both from the environment and the command-line, only the paths provided from the command-line are taken into account. - If plugins are provided both from the environment and the command-line, the union of both groups is taken into account. """ arguments = docopt(__doc__, version=__version__, argv=cli_args) # TODO: remove this redundancy once Ecological can re-read the environment # at run-time while still having a compile-time definition (Config). # See https://github.com/jmcs/ecological/issues/20. class conf(ecological.AutoConfig, prefix="transformer"): input_paths: Tuple[Path, ...] 
= () plugins: Tuple[str] = () paths = arguments[""] if paths: if conf.input_paths: logging.warning("TRANSFORMER_INPUT_PATHS overwritten with CLI arguments") conf.input_paths = paths conf.input_paths = tuple(Path(p) for p in conf.input_paths) plugins = arguments["--plugin"] if plugins: if conf.plugins: logging.warning("TRANSFORMER_PLUGINS merged with CLI -p/--plugin options") conf.plugins = (*conf.plugins, *plugins) return cast(Config, conf) def script_entrypoint() -> None: """ Entrypoint for the "transformer" program (which reads arguments from the command-line and the environment). This is an alternative to using directly Scenario.from_path and locust.locustfile as a library API in another Python program. """ logging.basicConfig( level=logging.INFO, format="%(asctime)s\t%(levelname)s\t%(message)s" ) config = read_config(cli_args=sys.argv[1:]) if not config.input_paths: logging.error("No input paths provided in environment nor command-line!") logging.info("Did you mean to provide env TRANSFORMER_INPUT_PATHS=[...]?") logging.info("Otherwise, here is the command-line manual:") print(__doc__, file=sys.stderr) exit(1) try: dump(file=sys.stdout, scenario_paths=config.input_paths, plugins=config.plugins) except ImportError as err: logging.error(f"Failed loading plugins: {err}") exit(2) except Exception: url = "https://github.com/zalando-incubator/Transformer/issues" logging.exception(f"Please help us fix this error by reporting it! {url}") exit(3) PK!p=transformer/helpers.pyimport json from typing import Iterable def zip_kv_pairs(pairs: Iterable) -> dict: return {pair.name: pair.value for pair in pairs} """ Use this with caution, as it is global and mutable! See also DUMMY_HAR_STRING. """ _DUMMY_HAR_DICT = { "log": { "entries": [ { "startedDateTime": "2018-01-01", "request": {"method": "GET", "url": "https://www.zalando.de"}, } ] } } DUMMY_HAR_STRING = json.dumps(_DUMMY_HAR_DICT) PK!м{uutransformer/locust.pyimport enum import warnings from typing import Sequence, List, Union, Iterator import transformer.plugins as plug import transformer.python as py from transformer.plugins.contracts import Plugin from transformer.scenario import Scenario from transformer.task import Task, Task2 LOCUST_MAX_WAIT_DELAY = 10 LOCUST_MIN_WAIT_DELAY = 0 LOCUSTFILE_COMMENT = """ File automatically generated by Transformer: https://github.bus.zalan.do/TIP/transformer """.strip() def _locust_task(task: Union[Task, Task2]) -> py.Function: """ Transforms a Task into the Python code expected by Locust. This function is private because it does not return a complete Locust task (the @task decorator is missing) and should therefore not be used for that purpose by unsuspecting users. """ if isinstance(task, Task): # TODO: remove when Task2 has replaced Task. # See https://github.com/zalando-incubator/Transformer/issues/11. task = Task2.from_task(task) return py.Function(name=task.name, params=["self"], statements=task.statements) class TaskSetType(enum.Enum): Set = "TaskSet" Sequence = "TaskSequence" def locust_taskset(scenario: Scenario) -> py.Class: """ Transforms a scenario (potentially containing other scenarios) into a Locust TaskSet definition. 
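    As a rough sketch (illustrative only, not verbatim Transformer output), a
    scenario containing a single task would serialize along these lines::

        class some_scenario(TaskSequence):
            @seq_task(1)
            def some_task(self):
                ...

    where ``some_scenario`` and ``some_task`` are hypothetical names derived
    from the input HAR file.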
""" if any(isinstance(child, Task) for child in scenario.children): ts_type = TaskSetType.Sequence else: ts_type = TaskSetType.Set fields: List[py.Statement] = [] for i, child in enumerate(scenario.children, start=1): seq_decorator = f"seq_task({i})" if isinstance(child, (Task2, Task)): fields.append(py.Decoration(seq_decorator, _locust_task(child))) elif isinstance(child, Scenario): field = py.Decoration(f"task({child.weight})", locust_taskset(child)) if ts_type is TaskSetType.Sequence: field = py.Decoration(seq_decorator, field) fields.append(field) else: wrong_type = child.__class__.__qualname__ scenario_type = scenario.__class__.__qualname__ raise TypeError( f"unexpected type {wrong_type} in {scenario_type}.children: {child!r}" ) return py.Class(scenario.name, superclasses=[str(ts_type.value)], statements=fields) def locust_classes(scenarios: Sequence[Scenario]) -> List[py.Class]: """ Transforms scenarios into all Python classes needed by Locust (TaskSet and Locust classes). The only missing parts before a fully functional locustfile are: - integrating all necessary set-up/tear-down statements: - Python imports, - apply global plugins, - etc. - serializing everything via transformer.python. """ classes = [] for scenario in scenarios: taskset = locust_taskset(scenario) locust_class = py.Class( name=f"LocustFor{taskset.name}", superclasses=["HttpLocust"], statements=[ py.Assignment("task_set", py.Symbol(taskset.name)), py.Assignment("weight", py.Literal(scenario.weight)), py.Assignment("min_wait", py.Literal(LOCUST_MIN_WAIT_DELAY)), py.Assignment("max_wait", py.Literal(LOCUST_MAX_WAIT_DELAY)), ], ) classes.append(taskset) classes.append(locust_class) return classes def locust_program(scenarios: Sequence[Scenario]) -> py.Program: """ Converts a ScenarioGroup into a Locust File. """ global_code_blocks = { # TODO: Replace me with a plugin framework that accesses the full tree. # See https://github.com/zalando-incubator/Transformer/issues/11. block_name: py.OpaqueBlock("\n".join(block), comments=[block_name]) for scenario in scenarios for block_name, block in scenario.global_code_blocks.items() } return [ py.Import(["re"], comments=[LOCUSTFILE_COMMENT]), py.Import( ["HttpLocust", "TaskSequence", "TaskSet", "seq_task", "task"], source="locust", ), *locust_classes(scenarios), *global_code_blocks.values(), ] def locustfile_lines( scenarios: Sequence[Scenario], program_plugins: Sequence[Plugin] ) -> Iterator[str]: """ Converts the provided scenarios into a stream of Python statements and iterate on the resulting lines. """ program = plug.apply(program_plugins, locust_program(scenarios)) for stmt in program: for line in stmt.lines(): yield str(line) def locustfile(scenarios: Sequence[Scenario]) -> str: """ Simple wrapper around locustfile_lines joining all lines with "\n". This function is deprecated and will be removed in a future version. Do not rely on it. Reason: It does not provide significant value over locustfile_lines and has a less clear name and a less flexible API. It does not support new generation plugins contracts like OnPythonProgram. Deprecated since: v1.0.2. 
""" warnings.warn(DeprecationWarning("locustfile: use locustfile_lines instead")) return "\n".join(locustfile_lines(scenarios, ())) PK!@y3transformer/naming.pyimport re import zlib DIGIT_RX = re.compile(r"[0-9]") ENDS_WITH_ADLER32 = re.compile(r"_[0-9]+\Z") def to_identifier(string: str) -> str: """ Replace everything except letters, digits and underscore with underscores, allowing the resulting name to be used as identifier in a Python program. A checksum is added at the end (to avoid collisions) if at least one replacement is made, or if the input already ends like a checksum (otherwise, for any input X, we have: to_identifier(X) == to_identifier(to_identifier(X)) i.e. a collision). """ safe_name = re.sub(r"[^_a-z0-9]", "_", string, flags=re.IGNORECASE) if DIGIT_RX.match(safe_name): safe_name = f"_{safe_name}" if safe_name == string and not ENDS_WITH_ADLER32.search(string): return string unique_suffix: int = zlib.adler32(string.encode()) return f"{safe_name}_{unique_suffix}" PK!;VVtransformer/plugins/__init__.py""" :mod:`transformer.plugins` -- Plugin System =========================================== This module exposes the API needed to create your own Transformer plugins. """ from .resolve import resolve from .contracts import plugin, Contract, apply, group_by_contract __all__ = ["resolve", "plugin", "Contract", "apply", "group_by_contract"] PK!2:ww transformer/plugins/contracts.py""" :mod:`transformer.plugins.contracts` -- Contracts for Plugin Authors ==================================================================== This module defines the various :ref:`Transformer contracts ` and their helper functions. """ import enum from collections import defaultdict from typing import NewType, Iterable, TypeVar, List, DefaultDict class Contract(enum.Flag): """ Enumeration of all supported :ref:`plugin contracts `. Each specific contract defines a way for plugins to be used in Transformer. Any Python function may become a Transformer plugin by announcing that it implements at least one contract, using the :func:`@plugin ` decorator. """ OnTask = enum.auto() #: The :term:`OnTask` contract. OnScenario = enum.auto() #: The :term:`OnScenario` contract. OnPythonProgram = enum.auto() #: The :term:`OnPythonProgram` contract. # Historically Transformer has only one plugin contract, which transformed a # sequence of Task objects into another such sequence. Operating on a full list # of tasks (instead of task by task) offered more leeway: a plugin could e.g. # add a new task, or change only the first task. # However this OnTaskSequence model is too constraining for some use-cases, # e.g. when a plugin needs to inject code in the global scope, and having to # deal with a full, immutable list of tasks in plugins that independently # operate on each task implies a lot of verbosity and redundancy. # For these reasons, other plugin contracts were created to offer a more # varied choice for plugin implementers. # See https://github.com/zalando-incubator/Transformer/issues/10. OnTaskSequence = enum.auto() #: Deprecated. Plugin = NewType("Plugin", callable) class InvalidContractError(ValueError): """ Raised for plugin functions associated with invalid contracts. What an "invalid contract" represents is not strictly specified, but this includes at least objects that are not members of the Contract enumeration. """ class InvalidPluginError(ValueError): """ Raised when trying to use as plugin a function that has not been marked as such. """ def plugin(c: Contract): """ Documented in dev.rst. 
""" if not isinstance(c, Contract): suggestions = (f"@plugin(Contract.{x.name})" for x in Contract) raise InvalidContractError( f"{c!r} is not a {Contract.__qualname__}. " f"Did you mean {', '.join(suggestions)}?" ) def _decorate(f: callable) -> callable: f._transformer_plugin_contract = c return f return _decorate def contract(f: Plugin) -> Contract: """ Returns the contract associated to a plugin function. :raise InvalidPluginError: if f is not a plugin. """ try: return getattr(f, "_transformer_plugin_contract") except AttributeError: raise InvalidPluginError(f) from None _T = TypeVar("_T") def apply(plugins: Iterable[Plugin], init: _T) -> _T: """ Applies each plugin to init in order, and returns the result. This just wraps a very simple but common operation. """ for p in plugins: init = p(init) return init _BASE_CONTRACTS = ( Contract.OnTask, Contract.OnTaskSequence, Contract.OnScenario, Contract.OnPythonProgram, ) def group_by_contract(plugins: Iterable[Plugin]) -> DefaultDict[Contract, List[Plugin]]: """ Groups plugins in lists according to their contracts. Each plugin is found in as many lists as it implements base contracts. Lists keep the order of the original plugins iterable. """ res = defaultdict(list) for p in plugins: c = contract(p) for bc in _BASE_CONTRACTS: if c & bc: # Contract is an enum.Flag: & computes the intersection. res[bc].append(p) return res PK!v&transformer/plugins/dummy.pyimport logging from typing import cast from transformer.plugins import plugin, Contract from transformer.request import Request from transformer.scenario import Scenario from transformer.task import Task @plugin(Contract.OnScenario) def f(s: Scenario) -> Scenario: first_req = first(s) logging.info(f"The first request was {first_req.url.geturl()}") return s def first(s: Scenario) -> Request: while isinstance(s, Scenario): s = s.children[0] return cast(Task, s).request PK!.jtransformer/plugins/resolve.pyimport importlib import inspect import logging from types import ModuleType from typing import Iterator from transformer.plugins.contracts import ( Plugin, Contract, InvalidContractError, contract, InvalidPluginError, ) def resolve(name: str) -> Iterator[Plugin]: """ Transform a plugin name into the corresponding, actual plugins. The name of a plugin is the name of a Python module containing (at least) one function decorated with @plugin (from the contracts module). The "resolve" function loads that module and returns these plugin functions found inside the module. :raise ImportError: if name does not match an accessible module. :raise TypeError: from load_load_plugins_from_module. :raise InvalidContractError: from load_load_plugins_from_module. :raise NoPluginError: from load_load_plugins_from_module. """ module = importlib.import_module(name) yield from load_plugins_from_module(module) class NoPluginError(ValueError): """ Raised for Python modules that should but don't contain any plugin function. """ def load_plugins_from_module(module: ModuleType) -> Iterator[Plugin]: """ :param module: Python module from which to load plugin functions. :raise TypeError: if module is not a Python module. :raise InvalidContractError: if a function is associated to an invalid contract. :raise NoPluginError: if module doesn't contain at least one plugin function. 
""" if not inspect.ismodule(module): raise TypeError(f"expected a module, got {module!r}") nb_plugins = 0 for _, obj in inspect.getmembers(module, inspect.isfunction): try: c = contract(obj) except InvalidPluginError: logging.debug(f"ignoring {_n(obj)}: not decorated with @plugin") continue if not isinstance(c, Contract): msg = f"{_n(obj)} associated to an invalid contract {c!r}" raise InvalidContractError(msg) nb_plugins += 1 yield obj if nb_plugins < 1: raise NoPluginError(module) def _n(x) -> str: return getattr(x, "__qualname__", None) or repr(x) PK!ތ'transformer/plugins/sanitize_headers.md# Sanitizing headers The [`sanitize_headers` plugin](sanitize_headers.py) should be used for processing scenarios generated in the Chrome browser, but is also advised to use it whenever cookies handling is important. The plugin removes Chrome-specific, RFC-non-compliant headers starting with `:`. Examples of such headers: ``` :authority: chrome.google.com :method: POST :path: /reviews/json/search :scheme: https ``` Additionally, the plugin: - converts header names to lowercase, which simplifies further header overriding, - ignores the `cookie` header, as cookies are handled by [Locust's _HttpSession_][http-session]. [http-session]: https://docs.locust.io/en/stable/api.html#httpsession-class PK!ÿss'transformer/plugins/sanitize_headers.pyfrom transformer.plugins import plugin, Contract from transformer.request import Header from transformer.task import Task2 @plugin(Contract.OnTask) def plugin(task: Task2) -> Task2: """ Removes Chrome-specific, RFC-non-compliant headers starting with `:`. Converts header names to lowercase to simplify further overriding. Removes the cookie header as it is handled by Locust's HttpSession. """ task.request.headers = [ Header(name=h.name.lower(), value=h.value) for h in task.request.headers if not h.name.startswith(":") and h.name.lower() != "cookie" ] return task PK!e e %transformer/plugins/test_contracts.pyimport pytest from hypothesis import given from hypothesis.strategies import from_type from transformer.plugins import apply, group_by_contract from .contracts import ( Contract, plugin, contract, InvalidContractError, InvalidPluginError, ) @given(from_type(Contract)) def test_contract_returns_contract_associated_with_plugin_decorator(c: Contract): @plugin(c) def foo(): ... assert contract(foo) is c def test_plugin_decorator_raises_with_invalid_contract(): with pytest.raises(InvalidContractError): @plugin(2) def foo(): ... def test_plugin_decorator_raises_without_contract(): with pytest.raises(InvalidContractError): @plugin def foo(): ... def test_contract_raises_with_invalid_plugin(): def foo(): ... 
with pytest.raises(InvalidPluginError): contract(foo) def test_plugin_is_exported_by_the_transformer_plugins_module(): try: from transformer.plugins import plugin except ImportError: pytest.fail("plugin should be exported by transformer.plugins") def test_Contract_is_exported_by_the_transformer_plugins_module(): try: from transformer.plugins import Contract except ImportError: pytest.fail("Contract should be exported by transformer.plugins") class TestApply: def test_return_init_unchanged_without_plugins(self): x = object() assert apply([], x) is x def test_return_plugin_result(self): @plugin(Contract.OnTask) def plugin_a(x: str) -> str: return x + "a" assert apply([plugin_a], "z") == "za" def test_runs_plugins_in_succession_on_input(self): @plugin(Contract.OnTask) def plugin_a(x: str) -> str: return x + "a" @plugin(Contract.OnTask) def plugin_b(x: str) -> str: return x + "b" assert apply((plugin_a, plugin_b), "") == "ab" assert apply((plugin_b, plugin_a, plugin_b), "") == "bab" class TestGroupByContract: def test_return_empty_dict_when_no_plugins(self): assert group_by_contract([]) == {} def test_index_plugins_with_simple_contracts_by_their_contract(self): @plugin(Contract.OnTask) def plugin_a(): pass @plugin(Contract.OnTask) def plugin_b(): pass @plugin(Contract.OnScenario) def plugin_z(): pass assert group_by_contract((plugin_a, plugin_b, plugin_z)) == { Contract.OnScenario: [plugin_z], Contract.OnTask: [plugin_a, plugin_b], } def test_index_plugins_with_complex_contracts_by_their_basic_contracts(self): @plugin(Contract.OnTask) def plugin_task(): pass @plugin(Contract.OnTask | Contract.OnScenario | Contract.OnPythonProgram) def plugin_multi(): pass assert group_by_contract((plugin_task, plugin_multi)) == { Contract.OnTask: [plugin_task, plugin_multi], Contract.OnScenario: [plugin_multi], Contract.OnPythonProgram: [plugin_multi], } PK!j梺!transformer/plugins/test_dummy.pyimport logging import os from pathlib import Path import transformer from transformer.helpers import DUMMY_HAR_STRING def test_dummy_plugin_works(tmp_path: Path, caplog): har_path = tmp_path / "test.har" har_path.write_text(DUMMY_HAR_STRING) caplog.set_level(logging.INFO) with open(os.path.devnull, "w") as f: transformer.dump(f, [har_path], plugins=["transformer.plugins.dummy"]) assert "The first request was https://www.zalando.de" in caplog.text PK!zr #transformer/plugins/test_resolve.pyimport logging import random import sys import uuid from pathlib import Path from types import ModuleType import pytest from hypothesis import given from hypothesis._strategies import permutations from transformer.plugins.contracts import plugin, Contract from .resolve import load_plugins_from_module, resolve, NoPluginError @pytest.fixture() def module_root(tmp_path: Path, monkeypatch) -> Path: monkeypatch.setattr(sys, "path", [str(tmp_path), *sys.path]) return tmp_path class TestResolve: def test_raises_for_module_not_found(self): modname = f"that_module_does_not_exist.{uuid.uuid4().hex}" with pytest.raises(ImportError): list(resolve(modname)) # must force evaluation of the generator def test_calls_load_plugins_from_module_with_module(self, module_root: Path): modname = "ab.cd.ef" modpath = Path(*modname.split(".")).with_suffix(".py") Path(module_root, modpath.parent).mkdir(parents=True) with Path(module_root, modpath).open("w") as f: f.write("from transformer.plugins.contracts import plugin, Contract\n") f.write("@plugin(Contract.OnTask)\n") f.write("def f(t):\n") f.write(" ...\n") f.write("def helper(t):\n") f.write(" ...\n") plugins = 
list(resolve(modname)) assert len(plugins) == 1 f = plugins[0] assert callable(f) assert f.__name__ == "f" def test_resolve_is_exported_by_the_transformer_plugins_module(self): try: from transformer.plugins import resolve except ImportError: pytest.fail("resolve should be exported by transformer.plugins") @pytest.fixture() def module() -> ModuleType: """Creates and returns an empty module.""" return ModuleType(f"fake_{random.randint(0, 99999999)}") class TestLoadPluginsFromModule: def test_raises_error_for_non_module(self): class A: pass with pytest.raises(TypeError): # Iterators are lazy, we need list() list(load_plugins_from_module(A)) def not_a_plugin(_): ... def plugin_not_a_plugin_either(_): ... @plugin(Contract.OnTask) def plugin_valid(_): ... @given(permutations((not_a_plugin, plugin_not_a_plugin_either, plugin_valid))) def test_ignores_non_plugin_stuff_in_module(self, module, caplog, functions): for f in functions: module.__dict__[f.__name__] = f caplog.clear() caplog.set_level(logging.DEBUG) plugins = list(load_plugins_from_module(module)) plugin_valid = next(f for f in functions if f.__name__ == "plugin_valid") assert plugins == [plugin_valid] non_plugin_functions = {f for f in functions if f is not plugin_valid} print(f">>> log messages: {caplog.messages}") for f in non_plugin_functions: assert any( f.__name__ in msg for msg in caplog.messages ), "ignored function names should be logged" def test_raises_for_modules_without_any_plugin(self, module): with pytest.raises(NoPluginError, match=module.__name__): # must force evaluation of the generator list(load_plugins_from_module(module)) PK!0sII,transformer/plugins/test_sanitize_headers.pyfrom datetime import datetime from urllib.parse import urlparse from transformer.request import HttpMethod, Header, Request from transformer.task import Task2 from .sanitize_headers import plugin def test_its_name_is_resolvable(): from transformer.plugins import resolve assert list(resolve("transformer.plugins.sanitize_headers")) == [plugin] TS = datetime(1970, 1, 1) def task_with_header(name: str, value: str) -> Task2: return Task2( name="some task", request=Request( timestamp=TS, method=HttpMethod.GET, url=urlparse("https://example.com"), name="task_name", headers=[Header(name=name, value=value)], post_data={}, query=[], ), ) def test_it_removes_headers_beginning_with_a_colon(): task = task_with_header(":non-rfc-header", "some value") sanitized_headers = plugin(task).request.headers assert len(sanitized_headers) == 0 def test_it_downcases_header_names(): task = task_with_header("Some Name", "some value") sanitized_headers = plugin(task).request.headers header_names = {h.name for h in sanitized_headers} assert "some name" in header_names def test_it_removes_cookies(): task = task_with_header("Cookie", "some value") sanitized_headers = plugin(task).request.headers assert len(sanitized_headers) == 0 def test_it_does_not_change_nor_remove_other_headers(): task = task_with_header("some other header", "some value") sanitized_headers = plugin(task).request.headers assert len(sanitized_headers) == 1 PK!e,pptransformer/python.py""" :mod:`transformer.python` -- Python Syntax Tree =============================================== Transformer's Python Syntax Tree framework allows you to create and manipulate Python source code without bothering with irrelevant, style-related details. It is the main API for writing :term:`OnPythonProgram` plugins. 
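A minimal sketch (assuming only the classes documented in this module) of
building and serializing a one-line program::

    from transformer.python import Assignment, Literal

    for line in Assignment("x", Literal(3)).lines():
        print(line)  # prints "x = 3"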
A non-goal of this framework is *customization of style*: users should rely on an external tool (such as `black`_) if they need style customization of their generated locustfile. .. _black: https://github.com/ambv/black """ import re from types import MappingProxyType from typing import ( Sequence, Mapping, Any, List, Type, Set, Optional, Tuple, cast, Iterable, Callable, TypeVar, ClassVar, ) from dataclasses import dataclass IMMUTABLE_EMPTY_DICT = MappingProxyType({}) @dataclass class Line: """ A line of text and its associated indentation level. This class allows not to constantly copy strings to add a new indentation level at every scope of the syntax tree. .. attribute:: text :any:`str` -- Text contained by this line. .. attribute:: indent_level :any:`int` -- Indentation level of :attr:`text` in the line. """ text: str indent_level: int = 0 INDENT_UNIT: ClassVar[str] = " " * 4 def __str__(self) -> str: """ Textual representation of this line, with :attr:`text` indented according to :attr:`indent_level`. """ return f"{self.INDENT_UNIT * self.indent_level}{self.text}" def clone(self) -> "Line": """ Creates an exact but disconnected copy of self. Useful in tests. """ return type(self)(text=self.text, indent_level=self.indent_level) def _resplit(parts: Iterable[str]) -> List[str]: """ Given a list of strings, returns a list of lines, by splitting each string into multiple lines where it contains newlines. >>> _resplit([]) [] >>> _resplit(['a', 'b']) ['a', 'b'] >>> _resplit(['a', 'b\\nc\\nd']) ['a', 'b', 'c', 'd'] """ return [line for part in parts for line in part.splitlines()] class Statement: """ Python distinguishes between statements and expressions: basically, statements cannot be assigned to a variable, whereas expressions can. For our purpose, another distinction is important: statements may span over multiple lines (and not just for style), whereas all expressions can be expressed in a single line. This class serves as abstract base for all implementors of :meth:`lines` and handles comment processing for them. """ def __init__(self, comments: Sequence[str] = ()) -> None: """ :param comments: Comment lines attached to this statement. """ self._comments = _resplit(comments) @property def comments(self) -> List[str]: """ Comment lines attached to this statement. This is a :class:`property` to ensure that modifications of this list preserve the invariant "one element = one line". """ self._comments = _resplit(self._comments) return self._comments @comments.setter def comments(self, value: List[str]): self._comments = value def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: """ All Line objects necessary to represent this Statement, along with the appropriate indentation level. :param indent_level: How much indentation to apply to the least indented line of this statement. :param comments: Whether existing comments attached to *self* should be included in the result. """ raise NotImplementedError def comment_lines(self, indent_level: int) -> List[Line]: """ Converts self.comments from str to Line with ``#`` prefixes. """ return [Line(f"# {s}", indent_level) for s in self.comments] def attach_comment(self, line: Line) -> List[Line]: """ Attach a comment to *line*: inline if *self.comments* is just one line, on dedicated new lines above otherwise. 
""" comments = self.comments if not comments: return [line] if len(comments) == 1: line.text += f" # {comments[0]}" return [line] lines = self.comment_lines(line.indent_level) lines.append(line) return lines def __eq__(self, o: object) -> bool: return ( isinstance(o, self.__class__) and self.comments == cast(__class__, o).comments ) # Handy alias for type signatures. Program = Sequence[Statement] class OpaqueBlock(Statement): """ A block of code already represented as a string. This helps moving existing code (e.g. in plugins) from our ad-hoc "blocks of code" framework to the syntax tree framework defined in this module. It also allows to express Python constructs that would otherwise not yet be representable with this AST framework. """ PREFIX_RX = re.compile(r"\s+") TAB_SIZE = 8 def __init__(self, block: str, comments: Sequence[str] = ()) -> None: """ :param block: String representing a block of Python code. """ super().__init__(comments) if not block.strip(): raise ValueError(f"OpaqueBlock can't be empty but got {block!r}") self.block = block def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: raw_lines = [l.expandtabs(self.TAB_SIZE) for l in self.block.splitlines()] first_nonempty_line = next(i for i, l in enumerate(raw_lines) if l.strip()) after_last_nonempty_line = next( len(raw_lines) - i for i, l in enumerate(reversed(raw_lines)) if l.strip() ) raw_lines = raw_lines[first_nonempty_line:after_last_nonempty_line] indents = [self.PREFIX_RX.match(l) for l in raw_lines] shortest_indent = min(len(p.group()) if p else 0 for p in indents) block_lines = [Line(l[shortest_indent:], indent_level) for l in raw_lines] if comments: return [*self.comment_lines(indent_level), *block_lines] return block_lines def __repr__(self) -> str: return "{}({!r}, comments={!r})".format( self.__class__.__qualname__, self.block, self.comments ) def __eq__(self, o: object) -> bool: return super().__eq__(o) and self.block == cast(__class__, o).block class Function(Statement): """ A function definition (``def ...``). """ def __init__( self, name: str, params: Sequence[str], statements: Sequence[Statement], comments: Sequence[str] = (), ) -> None: """ :param name: Name of this function. :param params: Names of each parameter of this function. :param statements: Body of this function. """ super().__init__(comments) self.name = name self.params = list(params) self.statements = list(statements) def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: param_list = ", ".join(self.params) body_lines = [ line for stmt in self.statements for line in stmt.lines(indent_level + 1, comments) ] or [Line("pass", indent_level + 1)] top = Line(f"def {self.name}({param_list}):", indent_level) if comments: return [*self.attach_comment(top), *body_lines] return [top, *body_lines] def __repr__(self) -> str: return "{}(name={!r}, params={!r}, statements={!r}, comments={!r})".format( self.__class__.__qualname__, self.name, self.params, self.statements, self.comments, ) def __eq__(self, o: object) -> bool: return ( super().__eq__(o) and self.name == cast(__class__, o).name and self.params == cast(__class__, o).params and self.statements == cast(__class__, o).statements ) class Decoration(Statement): """ A function or class definition to which is applied a decorator (e.g. ``@task``). """ def __init__( self, decorator: str, target: Statement, comments: Sequence[str] = () ) -> None: """ :param decorator: Name of the decorator applied to *target*. 
:param target: Function or class definition to which is applied *decorator*. """ super().__init__(comments) self.decorator = decorator self.target = target def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: top = Line(f"@{self.decorator}", indent_level) target_lines = self.target.lines(indent_level, comments) if comments: return [*self.attach_comment(top), *target_lines] return [top, *target_lines] def __repr__(self) -> str: return "{}({!r}, {!r}, comments={!r})".format( self.__class__.__qualname__, self.decorator, self.target, self.comments ) def __eq__(self, o: object) -> bool: return ( super().__eq__(o) and self.decorator == cast(__class__, o).decorator and self.target == cast(__class__, o).target ) class Class(Statement): """ A class definition. """ def __init__( self, name: str, statements: Sequence[Statement], superclasses: Sequence[str] = (), comments: Sequence[str] = (), ) -> None: """ :param name: Name of this class. :param statements: Fields of this class: methods, attributes, etc. :param superclasses: Names of each superclass of this class. In fact anything in the "function argument" format can be used here, like keyword-arguments (but in a string!). """ super().__init__(comments) self.name = name self.statements = list(statements) self.superclasses = list(superclasses) def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: superclasses = "" if self.superclasses: superclasses = "({})".format(", ".join(self.superclasses)) body = [ line for stmt in self.statements for line in stmt.lines(indent_level + 1, comments) ] or [Line("pass", indent_level + 1)] top = Line(f"class {self.name}{superclasses}:", indent_level) if comments: return [*self.attach_comment(top), *body] return [top, *body] def __repr__(self) -> str: return ( "{}(name={!r}, statements={!r}, " "superclasses={!r}, comments={!r})" ).format( self.__class__.__qualname__, self.name, self.statements, self.superclasses, self.comments, ) def __eq__(self, o: object) -> bool: return ( super().__eq__(o) and self.name == cast(__class__, o).name and self.statements == cast(__class__, o).statements and self.superclasses == cast(__class__, o).superclasses ) class Expression: """ See the documentation of :class:`Statement` for why Expression is a separate class. An expression is still a statement in Python (e.g. functions can be called anywhere), but this :class:`Expression` class is **not** a :class:`Statement` because we can't attach comments to arbitrary expressions (e.g. between braces). If you need to use an :class:`Expression` as a :class:`Statement`, see the :class:`Standalone` wrapper class. This class serves as abstract base for all our implementors of :meth:`__str__`. """ def __str__(self) -> str: raise NotImplementedError def __eq__(self, o: object) -> bool: return isinstance(o, self.__class__) class Standalone(Statement): """ Wraps an :class:`Expression` so that it can be used as a :class:`Statement`. """ def __init__(self, expr: Expression, comments: Sequence[str] = ()) -> None: """ :param expr: The wrapped expression. """ super().__init__(comments) self.expr = expr def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: """ An :class:`Expression` E used as a :class:`Statement` is serialized as the result of :samp:`str({E})` on its own :class:`Line`. 
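        A doctest-style sketch:

        >>> [str(line) for line in Standalone(Symbol("x")).lines()]
        ['x']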
""" line = Line(str(self.expr), indent_level) if comments: return self.attach_comment(line) return [line] def __repr__(self) -> str: return "{}({!r}, comments={!r})".format( self.__class__.__qualname__, self.expr, self.comments ) def __eq__(self, o: object) -> bool: return super().__eq__(o) and self.expr == cast(__class__, o).expr def _all_subclasses_of(cls: Type) -> Set[Type]: """ All subclasses of *cls*, including non-direct ones (child of child of ...). """ direct_subclasses = set(cls.__subclasses__()) return direct_subclasses.union( s for d in direct_subclasses for s in _all_subclasses_of(d) ) class Literal(Expression): """ All literal Python expressions (integers, strings, lists, etc.). Everything will be serialized using :func:`repr`, except :class:`Expression` objects that could be contained in a composite value like ``list``: they will be serialized with :func:`str`, as is probably expected. Thus: >>> str(Literal([1, {"a": FString("-{x}")}])) "[1, {'a': f'-{x}'}]" instead of something like ``[1, {'a': FString('-{x}')}]``. .. seealso:: :class:`FString` """ def __init__(self, value: Any) -> None: """ :param value: The Python literal represented by this node. """ super().__init__() self.value = value _REPR_BY_EXPR_CLS = None def __str__(self) -> str: # This is not pretty, but repr() doesn't accept a visitor we could use # to say "just this time, use that code to serialize Expression objects". if Literal._REPR_BY_EXPR_CLS is None: Literal._REPR_BY_EXPR_CLS = { c: c.__repr__ for c in _all_subclasses_of(Expression) } try: for k in Literal._REPR_BY_EXPR_CLS.keys(): k.__repr__ = k.__str__ return repr(self.value) finally: for k, _repr in Literal._REPR_BY_EXPR_CLS.items(): k.__repr__ = _repr def __repr__(self) -> str: return f"{self.__class__.__qualname__}({self.value!r})" def __eq__(self, o: object) -> bool: return super().__eq__(o) and self.value == cast(__class__, o).value class FString(Literal): """ f-strings_ are strings that capture values from their environment. They cannot be handled in :class:`Literal` because they are a "trick" of the Python parser: *before* the program runs, they lose their ``f`` prefix and their template is evaluated, so when :class:`Literal` is instantiated, they are only a normal string that tried to capture values from Transformer's context (instead of *the locustfile's* context). .. _f-strings: https://docs.python.org/3/whatsnew/3.6.html#whatsnew36-pep498 """ def __init__(self, s: str) -> None: """ :param s: The template of this f-string, for example ``a{x}b`` for the f-string ``f"a{x}b"``. """ if not isinstance(s, str): raise TypeError( f"expecting a format string, got {s.__class__.__qualname__}: {s!r}" ) super().__init__(s) def __str__(self) -> str: return "f" + repr(str(self.value)) class Symbol(Expression): """ The name of something (variable, function, etc.). Avoids any kind of string quoting and escaping that would happen with :class:`Literal`. >>> str(Literal("x")) "'x'" >>> str(Symbol("x")) 'x' The provided argument's type is explicitly checked and a :class:`TypeError` may be raised to avoid confusion when a user expects e.g. ``Symbol(True)`` to work like ``Symbol("True")``. """ def __init__(self, name: str) -> None: """ :param name: Textual representation of this symbol. Will be forwarded without modification to the locustfile. 
""" super().__init__() if not isinstance(name, str): raise TypeError( f"expected symbol name, got {name.__class__.__qualname__}: {name!r}" ) self.name = name def __str__(self) -> str: return self.name def __repr__(self) -> str: return f"{self.__class__.__qualname__}({self.name!r})" def __eq__(self, o: object) -> bool: return super().__eq__(o) and self.name == cast(__class__, o).name class FunctionCall(Expression): """ The invocation of a function or method. """ def __init__( self, name: str, positional_args: Sequence[Expression] = (), named_args: Mapping[str, Expression] = IMMUTABLE_EMPTY_DICT, ) -> None: """ :param name: Name of the function that is called. :param positional_args: Positional arguments associated with this call, if any. :param named_args: Keyword-arguments associated with this call, if any. """ super().__init__() self.name = name self.positional_args = list(positional_args) self.named_args = dict(named_args) def __str__(self) -> str: args = [str(a) for a in self.positional_args] + [ f"{k}={v}" for k, v in self.named_args.items() ] return f"{self.name}({', '.join(args)})" def __repr__(self) -> str: return "{}({!r}, {!r}, {!r})".format( self.__class__.__qualname__, self.name, self.positional_args, self.named_args, ) def __eq__(self, o: object) -> bool: return ( super().__eq__(o) and self.name == cast(__class__, o).name and self.positional_args == cast(__class__, o).positional_args and self.named_args == cast(__class__, o).named_args ) class BinaryOp(Expression): """ The invocation of a binary operator. To avoid any precedence error in the generated code, operands that are also BinaryOps are always surrounded by braces (even when not necessary, as in "1 + (2 + 3)", as a more subtle behavior would increase the complexity of the implementation without much benefit. """ def __init__(self, lhs: Expression, op: str, rhs: Expression) -> None: """ :param lhs: Left-hand side operand of this operation. :param op: Name of the operator, like ``+``. :param rhs: Right-hand side operand of this operation. """ super().__init__() self.lhs = lhs self.op = op self.rhs = rhs def __str__(self) -> str: operands = [self.lhs, self.rhs] return f" {self.op} ".join( f"({x})" if isinstance(x, BinaryOp) else str(x) for x in operands ) def __eq__(self, o: object) -> bool: return ( super().__eq__(o) and self.lhs == cast(__class__, o).lhs and self.op == cast(__class__, o).op and self.rhs == cast(__class__, o).rhs ) class Assignment(Statement): """ The assignment of a value to a variable. For our purposes, we don't treat multiple assignment via tuples differently. We also don't support chained assignments such as ``a = b = 1``. """ def __init__(self, lhs: str, rhs: Expression, comments: Sequence[str] = ()) -> None: """ :param lhs: Variable name (or names) the *rhs* is assigned to. :param rhs: Expression which value is assigned to *lhs*. """ super().__init__(comments) self.lhs = lhs self.rhs = rhs def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: line = Line(f"{self.lhs} = {self.rhs}", indent_level) if comments: return self.attach_comment(line) return [line] def __eq__(self, o: object) -> bool: return ( super().__eq__(o) and self.lhs == cast(__class__, o).lhs and self.rhs == cast(__class__, o).rhs ) def __repr__(self) -> str: return "{}(lhs={!r}, rhs={!r}, comments={!r})".format( self.__class__.__qualname__, self.lhs, self.rhs, self.comments ) class IfElse(Statement): """ The if/elif/else construct, where elif and else are optional and elif can be repeated. 
""" def __init__( self, condition_blocks: Sequence[Tuple[Expression, Sequence[Statement]]], else_block: Optional[Sequence[Statement]] = None, comments: Sequence[str] = (), ) -> None: """ :param condition_blocks: Pairs of condition and statements. Each pair is composed of an expression representing a condition, and a list of statements corresponding to that condition. This represents an if/elif/.../elif sequence, where there is always an "if" clause and an arbitrary number of "elif" clauses. :param else_block: Statements representing the "else" clause, if any. :raise ValueError: If there is not at least one element in *condition_blocks*. """ super().__init__(comments) self.condition_blocks = [ (cond, list(stmts)) for cond, stmts in condition_blocks ] self._assert_consistency() self.else_block = else_block def _assert_consistency(self): if not self.condition_blocks: raise ValueError("can't have an if without at least one block") def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: self._assert_consistency() lines = [] for i, block in enumerate(self.condition_blocks): keyword = "if" if i == 0 else "elif" lines.append(Line(f"{keyword} {block[0]}:", indent_level)) lines.extend( [ line for stmt in block[1] for line in stmt.lines(indent_level + 1, comments) ] or [Line("pass", indent_level + 1)] ) if self.else_block: lines.append(Line("else:", indent_level)) lines.extend( [ line for stmt in self.else_block for line in stmt.lines(indent_level + 1, comments) ] ) if comments: # There is always a first line, or _assert_consistency would fail. return [*self.attach_comment(lines[0]), *lines[1:]] return lines def __eq__(self, o: object) -> bool: return ( super().__eq__(o) and self.condition_blocks == cast(__class__, o).condition_blocks and self.else_block == cast(__class__, o).else_block ) def __repr__(self) -> str: return "{}(condition_blocks={!r}, else_block={!r}, comments={!r})".format( self.__class__.__qualname__, self.condition_blocks, self.else_block, self.comments, ) class Import(Statement): """ The import statement in all its forms: ``import X``, ``import X as A``, ``from M import X``, ``from M import X as A``, and ``from M import X, Y``. Combined imports like ``from M import X, Y`` are split for simplicity. """ def __init__( self, targets: Sequence[str], source: Optional[str] = None, alias: Optional[str] = None, comments: Sequence[str] = (), ) -> None: """ :param targets: What is imported: *X* in :samp:`import {X}` and :samp:`from M import {X}`. :param source: From where *targets* are imported, if applicable: *M* in :samp:`from {M} import X`. :param alias: Alias for a single-element *targets*: *A* in :samp:`import X as {A}` and :samp:`from M import X as {A}`. :raise ValueError: If *targets* is empty, or if *alias* is specified even though there are multiple *targets*. 
""" super().__init__(comments) self.targets = list(targets) self.source = source self.alias = alias self._assert_consistency() def _assert_consistency(self): if not self.targets: raise ValueError("expected at least one import target") if len(self.targets) > 1 and self.alias: raise ValueError("alias forbidden for multiple import targets") def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: self._assert_consistency() import_kw = f"from {self.source} import" if self.source else "import" alias_clause = f" as {self.alias}" if self.alias else "" lines = [ Line(f"{import_kw} {target}{alias_clause}", indent_level) for target in self.targets ] if comments: return [*self.comment_lines(indent_level), *lines] return lines def __eq__(self, o: object) -> bool: return ( super().__eq__(o) and self.targets == cast(__class__, o).targets and self.source == cast(__class__, o).source and self.alias == cast(__class__, o).alias ) def __repr__(self) -> str: return "{}(targets={!r}, source={!r}, alias={!r}, comments={!r})".format( self.__class__.__qualname__, self.targets, self.source, self.alias, self.comments, ) _T = TypeVar("_T") @dataclass class ExpressionView(Expression): """ A "proxy" for an object that is not an :class:`Expression`. .. |Expr| replace:: :class:`Expression` .. |EV| replace:: :class:`ExpressionView` |EV| allows to mix non-|Expr| objects in the syntax tree, along with a function capable of transforming these objects into actual |Expr| objects at any time. This is useful when these objects are easier to manipulate than their |Expr| equivalent. .. |Request| replace:: :class:`Request ` For example: any |Request| object can be converted into an equivalent |Expr|, but |Request| has a simpler API than |Expr| for request-oriented operations like accessing the URL, etc. |EV| can "wrap" a |Request| to pretend that the |Request| is an |Expr| (with all associated benefits of being part of the syntax tree), but still support the |Request| API. .. attribute:: target :any:`() → T ` -- A function (without parameters) returning the wrapped, non-|Expr| object. The benefit of :attr:`target` being a function (instead of a direct reference to the wrapped object) is that it allows to specify some mutable field of an object. See for example :class:`Task2 `, which contains an |EV| wrapping its own :attr:`request ` attribute. If the value of that attribute changes, the |EV| will refer to the new value (found by accessing the attribute via *self*), not the old value (which would still be referenced by a non-callable :attr:`target`). .. attribute:: converter :any:`T → ` |Expr| -- A function capable of transforming the result of :attr:`target` into an |Expr|. The result of :attr:`converter` is computed each time this |EV| has to behave like an |Expr|, for example when passed as argument to :any:`str`. .. attribute:: name :any:`str` -- Purely descriptive: makes the inspection of data structures containing |EV| objects more comfortable. """ target: Callable[[], _T] converter: Callable[[_T], Expression] name: str def __str__(self) -> str: return str(self.converter(self.target())) PK!uZZtransformer/request.py""" :mod:`transformer.request` -- HTTP requests read from HAR ========================================================= Representation of HAR Request objects. 
""" import enum from datetime import datetime from typing import Iterator, List, Optional from urllib.parse import urlparse, SplitResult import pendulum from dataclasses import dataclass from transformer.naming import to_identifier class HttpMethod(enum.Enum): """ Enumeration of supported HTTP method types. """ GET = enum.auto() #: GET POST = enum.auto() #: POST PUT = enum.auto() #: PUT OPTIONS = enum.auto() #: OPTIONS DELETE = enum.auto() #: DELETE @dataclass(frozen=True) class Header: """ An HTTP header, as recorded in a HAR file (headers__). __ http://www.softwareishard.com/blog/har-12-spec/#headers """ name: str value: str @dataclass(frozen=True) class QueryPair: """ A pair of query parameters, as recorded in a HAR file (queryString__). __ http://www.softwareishard.com/blog/har-12-spec/#queryString """ name: str value: str @dataclass class Request: """ An HTTP request, as recorded in a HAR file (request__). __ http://www.softwareishard.com/blog/har-12-spec/#request Note that *post_data*, if present, will be a dict of the same format as recorded in the HAR file (postData__ -- although it is not consistently followed by HAR generators). __ http://www.softwareishard.com/blog/har-12-spec/#postData .. attribute:: timestamp :class:`~datetime.datetime` -- Time at which the request was recorded. .. attribute:: method :class:`HttpMethod` -- HTTP method of the request. .. attribute:: url :class:`urllib.parse.SplitResult` -- URL targeted by the request. .. attribute:: headers :annotation: = [] :class:`~typing.List` of :class:`Header` -- HTTP headers sent with the request. .. attribute:: post_data :annotation: = None :data:`~typing.Optional` :any:`dict` -- If :attr:`method` is ``POST``, the corresponding data payload. .. attribute:: query :annotation: = [] :class:`~typing.List` of :class:`QueryPair` -- Key-value arguments sent as part of the :attr:`url`'s `query string`__. __ https://en.wikipedia.org/wiki/Query_string .. attribute:: name :annotation: = None :data:`~typing.Optional` :any:`str` -- Value provided for :class:`locust.clients.HttpSession`'s "dynamic" ``name`` parameter. See `Grouping requests to URLs with dynamic parameters`__ for details. __ https://docs.locust.io/en/stable/writing-a-locustfile.html #grouping-requests-to-urls-with-dynamic-parameters """ timestamp: datetime method: HttpMethod url: SplitResult headers: List[Header] = () post_data: Optional[dict] = None query: List[QueryPair] = () name: Optional[str] = None def __post_init__(self): self.headers = list(self.headers) self.query = list(self.query) @classmethod def from_har_entry(cls, entry: dict) -> "Request": """ Creates a request from a HAR entry__. __ http://www.softwareishard.com/blog/har-12-spec/#entries :raise KeyError: if *entry* is not a valid HAR "entry" object. :raise ValueError: if the ``request.startedDateTime`` value cannot be interpreted as a timestamp. """ request = entry["request"] return Request( timestamp=pendulum.parse(entry["startedDateTime"]), method=HttpMethod[request["method"]], url=urlparse(request["url"]), name=None, headers=[ Header(name=d["name"], value=d["value"]) for d in request.get("headers", []) ], post_data=request.get("postData"), query=[ QueryPair(name=d["name"], value=d["value"]) for d in request.get("queryString", []) ], ) @classmethod def all_from_har(cls, har: dict) -> Iterator["Request"]: """ Generates requests for all entries__ in a given HAR top-level object. 
__ http://www.softwareishard.com/blog/har-12-spec/#entries """ for entry in har["log"]["entries"]: yield cls.from_har_entry(entry) def task_name(self) -> str: """ Generates a simple name to be used as :attr:`~transformer.task.Task2.name` by the :term:`task` of this request. """ return "_".join( ( self.method.name, self.url.scheme, to_identifier(self.url.hostname), to_identifier(self.url.path), str(abs(hash(self))), ) ) def __hash__(self) -> int: return hash( ( self.timestamp, self.method, self.url, tuple(self.headers), repr(self.post_data) if self.post_data else None, tuple(self.query), ) ) PK!_JQ7Q7transformer/scenario.py""" :mod:`transformer.scenario` -- Grouping related tasks into scenarios ==================================================================== .. |Scenario| replace:: :class:`Scenario` Transformer creates a |Scenario| object for each HAR file it reads, so each :term:`task` (representing an HTTP request in a HAR file) is part of a :term:`scenario`. Transformer also creates |Scenario| objects for directories that contain HAR files, so a :term:`scenario` can also be part of another :term:`scenario`. To sum up, |Scenario| objects form a tree_, the leaves of which are all :term:`task` objects. This hierarchical structure maps cleanly to Locust's :class:`~locust.core.TaskSet` objects, which can also be nested and have a weight. .. _tree: https://en.wikipedia.org/wiki/Tree_(data_structure) """ import json import logging from collections import defaultdict from pathlib import Path from typing import Sequence, Mapping, Union, Set, List, Optional, Dict, Tuple import dataclasses from dataclasses import dataclass import transformer.plugins as plug from transformer.naming import to_identifier from transformer.plugins.contracts import Plugin from transformer.request import Request from transformer.task import Task, Task2 WEIGHT_FILE_SUFFIX = ".weight" DEFAULT_WEIGHT = 1 class SkippableScenarioError(ValueError): # noqa: B903 """ Raised when a Scenario cannot be created from the provided input path. If related to the creation of a Scenario B inside a larger Scenario A (i.e. B would be in A.children), A catches this exception, logs a warning, and moves on to the next potential child. """ def __init__(self, scenario_path: Path, reason: Union[Exception, str]) -> None: self.path = scenario_path self.reason = reason class DanglingWeightError(SkippableScenarioError): """ Raised when a scenario directory contains weight files that don't correspond to any scenario. """ pass class CollidingScenariosError(SkippableScenarioError): """ Raised when scenarios created from different paths end up having the same name. The only way this happens is if the paths are identical save for their extension (e.g. ``.har`` vs ``.json``), or if there is a bug (collision) in :func:`~transformer.naming.to_identifier` (which should never happen). """ pass class WeightValueError(ValueError): # noqa: B903 """ Raised when the weight file associated to a scenario contains errors. """ def __init__(self, scenario_path: Path, reason: Union[Exception, str]) -> None: self.path = scenario_path self.reason = reason @dataclass class Scenario: """ A web browsing session that we want to emulate, i.e. a sequence of :term:`tasks ` to be performed. .. attribute:: name :any:`str` -- Name of the corresponding :class:`~locust.core.TaskSet`. .. attribute:: children |Sequence| [ :class:`~transformer.task.Task2` :data:`| ` :class:`Scenario` ] -- Tasks and scenarios that are part of this scenario. .. 
attribute:: origin :data:`~typing.Optional` :class:`~pathlib.Path` -- Path to the HAR file or directory this scenario represents. .. attribute:: weight :annotation: = 1 :any:`int` -- Weight of this scenario. See :ref:`Specifying-weights` and :ref:`Hierarchical-scenarios` for details. """ name: str children: Sequence[Union[Task, Task2, "Scenario"]] origin: Optional[Path] weight: int = 1 @classmethod def from_path( cls, path: Path, plugins: Sequence[Plugin] = (), ts_plugins: Sequence[Plugin] = (), short_name: bool = False, ) -> "Scenario": """ Makes a :class:`Scenario` (possibly containing sub-scenarios) out of the provided *path*, which may point to either: - a HAR file (like :file:`x/y/z.har`), - a scenario directory (a directory containing HAR files or other scenario directories). :raise SkippableScenarioError: if path is neither a directory nor a HAR file, or is a directory containing dangling weight files :param path: path to the HAR file or scenario directory. :param plugins: list of :term:`OnScenario` plugins to apply. :param ts_plugins: deprecated -- for backward compatibility only. :param short_name: whether the returned scenarios have names based only on their path's basename, instead of the full path. By default *False* to avoid generating homonym scenarios (and therefore homonym :class:~locust.core.TaskSet` classes), but *True* when generating sub-scenarios (:attr:`children`) from a directory *path* (because then the names are "scoped" by the parent directory). """ if path.is_dir(): return cls.from_dir( path, plugins, ts_plugins=ts_plugins, short_name=short_name ) else: return cls.from_har_file( path, plugins, ts_plugins=ts_plugins, short_name=short_name ) @classmethod def from_dir( cls, path: Path, plugins: Sequence[Plugin], ts_plugins: Sequence[Plugin], short_name: bool, ) -> "Scenario": """ Makes a :class:`Scenario` out of the provided directory *path*. *path* must represent a "scenario directory", which contains at least one HAR file or another scenario directory. Symbolic link loops are not checked but forbidden! There may exist a weight file :file:`{path}.weight`. If so, its contents will be used as :attr:`weight` after calling :meth:`weight_from_path`. Errors are handled this way: #. If *path* itself cannot be transformed into a scenario, raise :exc:`SkippableScenarioError`. #. For each child of *path*, apply (1) but catch the exception and display a warning about skipping that child. (If all children are skipped, (1) applies to *path* itself.) Therefore: - If the directory contains weight files that don't match any HAR file or subdirectory, an error is emitted as this is probably a mistake. - If the directory contains files or directories that cannot be converted into scenarios (e.g. non-JSON files or :file:`.git` directories), a message is emitted and the file or subdirectory is skipped. :param path: path to the directory. :param plugins: list of :term:`OnScenario` plugins to apply. :param ts_plugins: deprecated -- for backward compatibility only. :param short_name: whether to simplify the resulting :class:`~locust.core.TaskSet` class name. If *short_name* is *False*, that class name is guaranteed to be unique across all TaskSets of the locustfile, but this is generally not necessary and results in less readable class names. :raise SkippableScenarioError: if the directory contains dangling weight files or no sub-scenarios. 
""" try: children = list(path.iterdir()) except OSError as err: raise SkippableScenarioError(path, err) weight_files: Set[Path] = { child for child in children if child.suffix == WEIGHT_FILE_SUFFIX } scenarios: List[Scenario] = [] for child in children: if child in weight_files: continue try: scenario = cls.from_path( child, plugins, ts_plugins=ts_plugins, short_name=True ) except SkippableScenarioError as err: logging.warning( "while searching for HAR files, skipping %s: %s", child, err.reason ) else: scenarios.append(scenario) cls._check_dangling_weights(path, scenarios, weight_files) if not scenarios: raise SkippableScenarioError(path, "no scenarios inside the directory") cls._check_name_collisions(path, scenarios) return Scenario( name=to_identifier(path.with_suffix("").name if short_name else str(path)), children=tuple(scenarios), origin=path, weight=cls.weight_from_path(path), ) @classmethod def _check_name_collisions(cls, path: Path, scenarios: List["Scenario"]): scenarios_by_name: Dict[str, List[Scenario]] = defaultdict(list) for s in scenarios: scenarios_by_name[s.name].append(s) colliding_paths: Set[Tuple[Path, ...]] = { tuple(x.origin for x in xs) for xs in scenarios_by_name.values() if len(xs) > 1 } if colliding_paths: groups = "; ".join( " vs ".join(repr(s.name) for s in group) for group in colliding_paths ) logging.error( "%s contains scenarios with colliding names: %s", path, groups ) raise CollidingScenariosError(path, "scenarios have colliding names") @classmethod def _check_dangling_weights(cls, path, scenarios, weight_files): scenario_names = {s.origin.with_suffix("").name for s in scenarios} dangling_weight_files = [ f for f in weight_files if f.with_suffix("").name not in scenario_names ] if dangling_weight_files: hint = ", ".join(str(f) for f in dangling_weight_files) logging.error( "%s contains weight files that don't correspond to any scenarios: %s", path, hint, ) logging.info( "For any value of X, if there exists a weight file X.weight, " "there must exist either an X.har file or an X scenario subdirectory." ) raise DanglingWeightError(path, "contains dangling weight files") @classmethod def from_har_file( cls, path: Path, plugins: Sequence[Plugin], ts_plugins: Sequence[Plugin], short_name: bool, ) -> "Scenario": """ Creates a Scenario given a HAR file. :raise SkippableScenarioError: if path is unreadable or not a HAR file """ try: with path.open() as file: har = json.load(file) requests = Request.all_from_har(har) tasks = Task.from_requests(requests) # TODO: Remove this when Contract.OnTaskSequence is removed. tasks = plug.apply(ts_plugins, tasks) # TODO: Remove Task-to-Task2 conversion once both are merged. tasks = tuple(plug.apply(plugins, Task2.from_task(t)) for t in tasks) return Scenario( name=to_identifier( path.with_suffix("").name if short_name else str(path) ), children=tuple(tasks), origin=path, weight=cls.weight_from_path(path), ) except (OSError, json.JSONDecodeError, UnicodeDecodeError) as err: raise SkippableScenarioError(path, err) @classmethod def weight_from_path(cls, path: Path) -> int: """ Reads the weight file corresponding to path, or returns a default weight if the weight file doesn't exist. 
:param path: represents either a HAR file or a scenario directory :raise WeightValueError: if the weight file exists but its contents cannot be interpreted as a weight """ weight_path = path.with_suffix(WEIGHT_FILE_SUFFIX) try: weight = weight_path.read_text().strip() except OSError as err: logging.info( f"No {weight_path} provided for {path}: " f"assigning default weight {DEFAULT_WEIGHT} ({err})" ) return DEFAULT_WEIGHT if not weight.isdecimal() or int(weight) == 0: logging.error( f"invalid weight file %s: weights must be positive integers, got %r", weight_path, weight, ) raise WeightValueError(path, weight) return int(weight) @property def global_code_blocks(self) -> Mapping[str, Sequence[str]]: """ .. deprecated:: 1.0.2 This attribute is only kept for backward compatibility purposes. It exists because Transformer's first plugin system didn't have :term:`OnPythonProgram`, so plugins had to specify the top-level locustfile code blocks they needed (e.g. imports, global variables) at the :class:`Task` level and let the plugin system percolate these code blocks through the scenario tree. This explains why tasks have the similar :any:`transformer.task.Task2.global_code_blocks` field. """ # TODO: Replace me with a plugin framework that accesses the full tree. # See https://github.com/zalando-incubator/Transformer/issues/11. return { block_name: block_lines for child in self.children for block_name, block_lines in child.global_code_blocks.items() } def apply_plugins(self, plugins: Sequence[Plugin]) -> "Scenario": """ Recursively builds a new scenario tree from the leaves by applying all *plugins* to each cloned scenario subtree. Does not do anything if *plugins* is empty. :param plugins: the plugins to apply. See :ref:`Specifying-plugins` for details. """ if not plugins: return self children = [ c.apply_plugins(plugins) if isinstance(c, Scenario) else c for c in self.children ] return plug.apply(plugins, dataclasses.replace(self, children=children)) PK!k` or :ref:`grouping `: these come with :term:`scenarios `. """ import json from collections import OrderedDict from json import JSONDecodeError from types import MappingProxyType from typing import ( Iterable, NamedTuple, Iterator, Sequence, Optional, Mapping, Dict, List, Tuple, cast, ) import dataclasses from dataclasses import dataclass import transformer.python as py from transformer.blacklist import on_blacklist from transformer.helpers import zip_kv_pairs from transformer.request import HttpMethod, Request, QueryPair IMMUTABLE_EMPTY_DICT = MappingProxyType({}) TIMEOUT = 30 ACTION_INDENTATION_LEVEL = 12 JSON_MIME_TYPE = "application/json" class LocustRequest(NamedTuple): """ All parameters for the request performed by the Locust client object. .. deprecated:: 1.0.2 Only used by :class:`Task`, which is itself deprecated. Use :class:`Task2` instead of :class:`Task`. """ method: HttpMethod url: str headers: Mapping[str, str] post_data: dict = MappingProxyType({}) query: Sequence[QueryPair] = () name: Optional[str] = None @classmethod def from_request(cls, r: Request) -> "LocustRequest": return LocustRequest( method=r.method, url=repr(r.url.geturl()), headers=zip_kv_pairs(r.headers), post_data=r.post_data, query=r.query, name=repr(r.name or r.url.geturl()), ) @dataclass class Task2: """ Represents a :term:`task`, i.e. an HTTP request along with some optional pre- and post-processing code. .. attribute:: name :any:`str` -- Name of the corresponding :any:`locust.core.task` function in the locustfile. .. 
attribute:: request :any:`transformer.request.Request` -- HTTP request executed by this task. .. attribute:: statements :any:`Sequence ` of |Statement| -- Body of the corresponding :any:`locust.core.task` function in the locustfile. One of these statements contains an |ExpressionView| pointing to :attr:`request`. The other statements (if any) represent pre- or post-processing code for that request, depending on whether they appear before or after the statement containing the |ExpressionView|. .. warning:: Plugins should be careful if they replace the |ExpressionView| object found in :attr:`statements`. Other plugins should still be able to change :attr:`request` and expect to see these changes reflected in :attr:`statements` via |ExpressionView|. .. attribute:: global_code_blocks :any:`Mapping ` of :any:`str` to |Statement| .. deprecated:: 1.0.2 This attribute is only kept for backward compatibility purposes. It exists because Transformer's first plugin system didn't have :term:`OnPythonProgram`, so plugins had to specify the top-level locustfile code blocks they needed (e.g. imports, global variables) at the :class:`Task` level and let the plugin system percolate these code blocks through the scenario tree. This explains why scenarios have the similar :any:`transformer.scenario.Scenario.global_code_blocks` field. """ name: str request: Request statements: Sequence[py.Statement] = () # TODO: Replace me with a plugin framework that accesses the full tree. # See https://github.com/zalando-incubator/Transformer/issues/11. global_code_blocks: Mapping[str, Sequence[str]] = IMMUTABLE_EMPTY_DICT def __post_init__(self,) -> None: self.statements = list(self.statements) self.global_code_blocks = { k: list(v) for k, v in self.global_code_blocks.items() } @classmethod def from_requests(cls, requests: Iterable[Request]) -> Iterator["Task2"]: """ Generates a set of tasks from a given set of HTTP requests. Each request will be turned into an unevaluated function call (:class:`transformer.python.FunctionCall`) making the actual request. The returned tasks are ordered by increasing :any:`timestamp ` of the corresponding request. """ # TODO: Update me when merging Task with Task2: "statements" needs to # contain a ExpressionView to Task2.request. # See what is done in from_task (but without the LocustRequest part). # See https://github.com/zalando-incubator/Transformer/issues/11. for req in sorted(requests, key=lambda r: r.timestamp): if not on_blacklist(req.url.netloc): yield cls(name=req.task_name(), request=req, statements=...) @classmethod def from_task(cls, task: "Task") -> "Task2": # TODO: Remove me as soon as the old Task is no longer used and Task2 is # renamed to Task. # See https://github.com/zalando-incubator/Transformer/issues/11. 
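# Minimal sketch of why `statements` holds an ExpressionView: an OnTask plugin can
# simply mutate the request, and the statement rendered later follows suit. The
# plugin name is hypothetical; transformer/test_locust.py exercises the same
# behaviour in test_it_renders_a_locustfile_template_with_plugin_change_task_name.
from transformer.plugins import Contract, plugin
from transformer.task import Task2


@plugin(Contract.OnTask)
def rename_request(t: Task2) -> Task2:
    t.request.name = "changed_name"  # reflected when the task's request is rendered
    return t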
t = cls(name=task.name, request=task.request) if task.locust_request: expr_view = py.ExpressionView( name="this task's request field", target=lambda: task.locust_request, converter=lreq_to_expr, ) else: expr_view = py.ExpressionView( name="this task's request field", target=lambda: t.request, converter=req_to_expr, ) t.statements = [ *[py.OpaqueBlock(x) for x in task.locust_preprocessing], py.Assignment("response", expr_view), *[py.OpaqueBlock(x) for x in task.locust_postprocessing], ] return t NOOP_HTTP_METHODS = {HttpMethod.GET, HttpMethod.OPTIONS, HttpMethod.DELETE} def req_to_expr(r: Request) -> py.FunctionCall: url = py.Literal(str(r.url.geturl())) headers = zip_kv_pairs(r.headers) args: Dict[str, py.Expression] = OrderedDict( url=url, name=py.Literal(r.name) if r.name else url, timeout=py.Literal(TIMEOUT), allow_redirects=py.Literal(False), ) if headers: args["headers"] = py.Literal(headers) if r.method is HttpMethod.POST: if r.post_data: rpd = RequestsPostData.from_har_post_data(r.post_data) args.update(rpd.as_kwargs()) elif r.method is HttpMethod.PUT: if r.post_data: rpd = RequestsPostData.from_har_post_data(r.post_data) args.update(rpd.as_kwargs()) args.setdefault("params", py.Literal([])) cast(py.Literal, args["params"]).value.extend( _params_from_name_value_dicts([dataclasses.asdict(q) for q in r.query]) ) elif r.method not in NOOP_HTTP_METHODS: raise ValueError(f"unsupported HTTP method: {r.method!r}") method = r.method.name.lower() return py.FunctionCall(name=f"self.client.{method}", named_args=args) def lreq_to_expr(lr: LocustRequest) -> py.FunctionCall: # TODO: Remove me once LocustRequest no longer exists. # See https://github.com/zalando-incubator/Transformer/issues/11. url = _peel_off_repr(lr.url) name = _peel_off_repr(lr.name) if lr.name else url args: Dict[str, py.Expression] = OrderedDict( url=url, name=name, timeout=py.Literal(TIMEOUT), allow_redirects=py.Literal(False), ) if lr.headers: args["headers"] = py.Literal(lr.headers) if lr.method is HttpMethod.POST: if lr.post_data: rpd = RequestsPostData.from_har_post_data(lr.post_data) args.update(rpd.as_kwargs()) elif lr.method is HttpMethod.PUT: if lr.post_data: rpd = RequestsPostData.from_har_post_data(lr.post_data) args.update(rpd.as_kwargs()) args.setdefault("params", py.Literal([])) cast(py.Literal, args["params"]).value.extend( _params_from_name_value_dicts([dataclasses.asdict(q) for q in lr.query]) ) elif lr.method not in NOOP_HTTP_METHODS: raise ValueError(f"unsupported HTTP method: {lr.method!r}") method = lr.method.name.lower() return py.FunctionCall(name=f"self.client.{method}", named_args=args) def _peel_off_repr(s: str) -> py.Literal: """ Reverse the effect of LocustRequest's repr() calls on url and name. """ if s.startswith("f"): return py.FString(eval(s[1:], {}, {})) return py.Literal(eval(s, {}, {})) class Task(NamedTuple): """ One step of "doing something" on a website. This basically represents a @task in Locust-speak. .. deprecated:: 1.0.2 Use :class:`Task2` instead. :class:`Task` is kept for backward compatibility with existing plugins that have not yet migrated to :class:`Task2`. Transformer will automatically convert :class:`Task` objects into :class:`Task2` objects using :meth:`Task2.from_task`. 
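# Minimal sketch of what req_to_expr produces for a plain GET request; the URL is
# hypothetical, and the rendered call matches the locustfile snippets expected in
# transformer/test_locust.py.
from datetime import datetime
from urllib.parse import urlparse

from transformer.request import HttpMethod, Request
from transformer.task import req_to_expr

r = Request(
    timestamp=datetime(2018, 1, 1),
    method=HttpMethod.GET,
    url=urlparse("https://example.com/home"),
)
assert str(req_to_expr(r)) == (
    "self.client.get(url='https://example.com/home', "
    "name='https://example.com/home', timeout=30, allow_redirects=False)"
)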
""" name: str request: Request locust_request: Optional[LocustRequest] = None locust_preprocessing: Sequence[str] = () locust_postprocessing: Sequence[str] = () global_code_blocks: Mapping[str, Sequence[str]] = MappingProxyType({}) @classmethod def from_requests(cls, requests: Iterable[Request]) -> Iterator["Task"]: """ Generates a set of Tasks from a given set of Requests. """ for req in sorted(requests, key=lambda r: r.timestamp): if on_blacklist(req.url.netloc): continue else: yield cls(name=req.task_name(), request=req) def inject_headers(self, headers: dict): if self.locust_request is None: original_locust_request = LocustRequest.from_request(self.request) else: original_locust_request = self.locust_request new_locust_request = original_locust_request._replace( headers={**original_locust_request.headers, **headers} ) task = self._replace(locust_request=new_locust_request) return task def replace_url(self, url: str): if self.locust_request is None: original_locust_request = LocustRequest.from_request(self.request) else: original_locust_request = self.locust_request new_locust_request = original_locust_request._replace(url=url) return self._replace(locust_request=new_locust_request) @dataclass class RequestsPostData: """ Data to be sent via HTTP POST, along with which API of the requests library to use. """ data: Optional[py.Literal] = None params: Optional[py.Literal] = None json: Optional[py.Literal] = None def as_kwargs(self) -> Dict[str, py.Expression]: return {k: v for k, v in dataclasses.asdict(self).items() if v is not None} @classmethod def from_har_post_data(cls, post_data: dict) -> "RequestsPostData": """ Converts a HAR postData object into a RequestsPostData instance. :param post_data: a HAR "postData" object, see http://www.softwareishard.com/blog/har-12-spec/#postData. :raise ValueError: if *post_data* is invalid. """ try: return _from_har_post_data(post_data) except ValueError as err: raise ValueError(f"invalid HAR postData object: {post_data!r}") from err def _from_har_post_data(post_data: dict) -> RequestsPostData: mime_k = "mimeType" try: mime: str = post_data[mime_k] except KeyError: raise ValueError(f"missing {mime_k!r} field") from None rpd = RequestsPostData() # The "text" and "params" fields are supposed to be mutually # exclusive (according to the HAR spec) but nobody respects that. # Often, both text and params are provided for x-www-form-urlencoded. text_k, params_k = "text", "params" if text_k not in post_data and params_k not in post_data: raise ValueError(f"should contain {text_k!r} or {params_k!r}") _extract_text(mime, post_data, text_k, rpd) try: params = _params_from_post_data(params_k, post_data) if params is not None: rpd.params = py.Literal(params) except (KeyError, UnicodeEncodeError, TypeError) as err: raise ValueError("unreadable params field") from err return rpd def _extract_text( mime: str, post_data: dict, text_k: str, rpd: RequestsPostData ) -> None: text = post_data.get(text_k) if mime == JSON_MIME_TYPE: if text is None: raise ValueError(f"missing {text_k!r} field for {JSON_MIME_TYPE} content") try: rpd.json = py.Literal(json.loads(text)) except JSONDecodeError as err: raise ValueError(f"unreadable JSON from field {text_k!r}") from err elif text is not None: # Probably application/x-www-form-urlencoded. 
try: rpd.data = py.Literal(text.encode()) except UnicodeEncodeError as err: raise ValueError(f"cannot encode the {text_k!r} field in UTF-8") from err def _params_from_post_data( key: str, post_data: dict ) -> Optional[List[Tuple[bytes, bytes]]]: """ Extracts the *key* list from *post_data* and calls _params_from_name_value_dicts with that list. :raise TypeError: if the object at *key* is built using unexpected data types. """ params = post_data.get(key) if params is None: return None if not isinstance(params, list): raise TypeError(f"the {key!r} field should be a list") return _params_from_name_value_dicts(params) def _params_from_name_value_dicts( dicts: Iterable[Mapping[str, str]] ) -> List[Tuple[bytes, bytes]]: """ Converts a HAR "params" element [0] into a list of tuples that can be used as value for requests' "params" keyword-argument. [0]: http://www.softwareishard.com/blog/har-12-spec/#params [1]: http://docs.python-requests.org/en/master/user/quickstart/ #more-complicated-post-requests :raise KeyError: if one of the elements doesn't contain a "name" or "value" field. :raise UnicodeEncodeError: if an element's "name" or "value" string cannot be encoded in UTF-8. """ return [(d["name"].encode(), d["value"].encode()) for d in dicts] PK!M~CCtransformer/test_blacklist.py# pylint: skip-file import io import os import logging from unittest.mock import patch from transformer.blacklist import on_blacklist class TestBlacklist: @patch("builtins.open") def test_it_returns_false_and_logs_error_if_the_blacklist_does_not_exist( self, mock_open, caplog ): mock_open.side_effect = FileNotFoundError caplog.set_level(logging.DEBUG) assert on_blacklist("") is False assert f"Could not read blacklist file {os.getcwd()}/.urlignore" in caplog.text @patch("builtins.open") def test_it_returns_false_if_the_blacklist_is_empty(self, mock_open): mock_open.return_value = io.StringIO("") assert on_blacklist("") is False @patch("builtins.open") def test_it_returns_false_if_url_is_not_on_blacklist(self, mock_open): mock_open.return_value = io.StringIO("www.amazon.com") assert on_blacklist("www.zalando.de") is False @patch("builtins.open") def test_it_returns_true_if_url_is_on_blacklist(self, mock_open): mock_open.return_value = io.StringIO("www.google.com\nwww.amazon.com") assert on_blacklist("www.amazon.com") is True @patch("builtins.open") def test_it_returns_true_if_a_partial_match_is_found(self, mock_open): mock_open.return_value = io.StringIO("www.amazon.com") assert on_blacklist("http://www.amazon.com/") is True @patch("builtins.open") def test_it_ignores_empty_lines(self, mock_open): mock_open.return_value = io.StringIO("\nwww.amazon.com") assert on_blacklist("www.zalando.de") is False PK!؏.transformer/test_cli.pyfrom pathlib import Path from .cli import read_config class TestReadConfig: def test_paths_from_env(self, monkeypatch): monkeypatch.setenv("TRANSFORMER_INPUT_PATHS", """["/x/y", "a/b"]""") conf = read_config([]) assert conf.input_paths == (Path("/x/y"), Path("a/b")) def test_paths_from_cli(self): conf = read_config(["/x/y", "a/b"]) assert conf.input_paths == (Path("/x/y"), Path("a/b")) def test_paths_from_cli_overwrite_those_from_env(self, monkeypatch): monkeypatch.setenv("TRANSFORMER_INPUT_PATHS", """["/x/y", "a/b"]""") conf = read_config(["u/v/w"]) assert conf.input_paths == (Path("u/v/w"),) def test_plugins_from_env(self, monkeypatch): monkeypatch.setenv("TRANSFORMER_PLUGINS", """["a", "b.c.d"]""") conf = read_config([]) assert conf.plugins == ("a", "b.c.d") def test_plugins_from_cli(self): conf = 
read_config(["-p", "a", "XXX", "--plugin", "b.c.d"]) assert conf.plugins == ("a", "b.c.d") def test_merge_plugins_from_env_and_cli(self, monkeypatch): monkeypatch.setenv("TRANSFORMER_PLUGINS", """["a", "b.c.d"]""") conf = read_config(["-p", "e.f", "XXX", "--plugin", "g"]) assert conf.plugins == ("a", "b.c.d", "e.f", "g") PK! كsvvtransformer/test_helpers.pyfrom transformer.helpers import zip_kv_pairs from transformer.request import Header class TestZipKVPairs: def test_it_returns_a_dict_given_a_list_of_named_tuples(self): name = "some name" value = "some value" result = zip_kv_pairs([Header(name=name, value=value)]) assert isinstance(result, dict) assert result[name] == value PK!P//transformer/test_locust.pyimport string from typing import cast from unittest.mock import MagicMock import pytest import transformer.plugins as plug from transformer.locust import locustfile, locust_taskset from transformer.request import HttpMethod from transformer.scenario import Scenario from transformer.task import Task, TIMEOUT from transformer.plugins import plugin, Contract from transformer.task import Task2 class TestLocustfile: def test_it_renders_a_locustfile_template(self): a_name = "some_task" a_request = MagicMock() a_request.method = HttpMethod.GET a_request.url.scheme = "some_scheme" a_request.url.hostname = "some_hostname" a_request.url.path = "some_path" a_request.url.geturl() a_request.url.geturl.return_value = "some_url" a_request.name = None task = Task(a_name, a_request) scenario = Scenario(name="SomeScenario", children=[task], origin=None) scenario_group = Scenario( name="ScenarioGroup", children=[scenario], weight=2, origin=None ) script = locustfile([scenario_group]) expected = string.Template( """ # File automatically generated by Transformer: # https://github.bus.zalan.do/TIP/transformer import re from locust import HttpLocust from locust import TaskSequence from locust import TaskSet from locust import seq_task from locust import task class ScenarioGroup(TaskSet): @task(1) class SomeScenario(TaskSequence): @seq_task(1) def some_task(self): response = self.client.get(url='some_url', name='some_url', timeout=$TIMEOUT, allow_redirects=False) class LocustForScenarioGroup(HttpLocust): task_set = ScenarioGroup weight = 2 min_wait = 0 max_wait = 10 """ ).safe_substitute({"TIMEOUT": TIMEOUT}) assert expected.strip() == script.strip() def test_it_renders_a_locustfile_template_with_plugin_change_task_name(self): @plugin(Contract.OnTask) def plugin_change_task_name(t: Task2) -> Task2: t.request.name = "changed_name" return t a_name = "some_task" a_request = MagicMock() a_request.method = HttpMethod.GET a_request.url.scheme = "some_scheme" a_request.url.hostname = "some_hostname" a_request.url.path = "some_path" a_request.url.geturl() a_request.url.geturl.return_value = "some_url" a_request.name = None task = plug.apply((plugin_change_task_name,), Task(a_name, a_request)) scenario = Scenario(name="SomeScenario", children=[task], origin=None) scenario_group = Scenario( name="ScenarioGroup", children=[scenario], weight=2, origin=None ) script = locustfile([scenario_group]) expected = string.Template( """ # File automatically generated by Transformer: # https://github.bus.zalan.do/TIP/transformer import re from locust import HttpLocust from locust import TaskSequence from locust import TaskSet from locust import seq_task from locust import task class ScenarioGroup(TaskSet): @task(1) class SomeScenario(TaskSequence): @seq_task(1) def some_task(self): response = self.client.get(url='some_url', 
name='changed_name', timeout=$TIMEOUT, allow_redirects=False) class LocustForScenarioGroup(HttpLocust): task_set = ScenarioGroup weight = 2 min_wait = 0 max_wait = 10 """ ).safe_substitute({"TIMEOUT": TIMEOUT}) assert expected.strip() == script.strip() def test_generates_passed_global_code_blocks(): def mock(name, blocks=None): m = MagicMock(spec=Scenario, children=[], global_code_blocks=blocks or {}) # https://docs.python.org/3/library/unittest.mock.html#mock-names-and-the-name-attribute m.name = name return m sg1 = Scenario( "sg1", children=[mock("a", blocks={"b1": ["ab"]}), mock("b", blocks={"b2": ["cd"]})], origin=None, ) sg2 = Scenario("sg2", children=[mock("c")], origin=None) sg3 = Scenario( "sg3", children=[mock("d", blocks={"b3": ["yz"], "b2": ["yyy", "zzz"]})], origin=None, ) code = locustfile([sg1, sg2, sg3]) assert code.endswith( "\n# b1\nab\n# b2\nyyy\nzzz\n# b3\nyz" ), "the latter b2 block should override the former" def test_locust_taskset_raises_on_malformed_scenario(): bad_child = cast(Scenario, 7) bad_scenario = Scenario(name="x", children=[bad_child], origin=None) with pytest.raises(TypeError, match=r"unexpected type .*\bchildren"): locust_taskset(bad_scenario) PK!transformer/test_naming.pyimport re from hypothesis import given, example, assume from hypothesis.strategies import text, from_regex from transformer.naming import to_identifier DIGITS_SUFFIX_RX = re.compile(r"_[0-9]+\Z") class TestToIdentifier: @given(text(min_size=1, max_size=3)) @example("0") def test_its_output_can_always_be_used_as_python_identifier(self, s: str): exec(f"{to_identifier(s)} = 2") @given(text(), text()) @example("x y", to_identifier("x y")) def test_it_has_no_collisions(self, a: str, b: str): assert a == b or to_identifier(a) != to_identifier(b) @given(from_regex(re.compile(r"[a-z_][a-z0-9_]*", re.IGNORECASE), fullmatch=True)) def test_it_does_not_add_suffix_when_not_necessary(self, input: str): assume(not DIGITS_SUFFIX_RX.search(input)) assert to_identifier(input) == input def test_it_adds_prefix_to_inputs_starting_with_digit(self): assert to_identifier("0").startswith("_") PK!Ҽ}}transformer/test_python.pyimport pprint import string from typing import List from unittest.mock import patch import pytest from hypothesis import given from hypothesis.strategies import ( lists, text, integers, floats, one_of, booleans, none, dictionaries, ) import transformer.python as py from transformer.builders_python import ( indent_levels, ascii_inline_text, lines, opaque_blocks, statements, expressions, functions, decorations, classes, standalones, literals, function_calls, binary_ops, symbols, assignments, ifelses, imports, ) class TestLine: def test_str_is_identity_with_indent_level_zero(self): assert str(py.Line("abc", 0)) == "abc" def test_default_indent_level_is_zero(self): assert str(py.Line("abc")) == "abc" def test_default_indent_unit_is_four_spaces(self): assert py.Line.INDENT_UNIT == " " def test_str_indents_as_much_indent_units_as_provided_indent_level(self): assert ( str(py.Line("abc", 1)) == py.Line.INDENT_UNIT + "abc" ), "indent level 1 means one time INDENT_UNIT" assert str(py.Line("x", 2)) == py.Line.INDENT_UNIT * 2 + "x" @given(lines, lines) def test_equal_iff_text_and_indent_are_equal(self, a: py.Line, b: py.Line): assert a != b or (a.text == b.text and a.indent_level == b.indent_level) def test_repr(self): line = py.Line(text="a'\" b", indent_level=3) assert repr(line) == "Line(text='a\\'\" b', indent_level=3)" class TestStatement: def test_lines_must_be_implemented(self): with 
pytest.raises(NotImplementedError): py.Statement().lines() @given(statements, indent_levels, lists(ascii_inline_text(min_size=1), max_size=2)) def test_comment_lines(self, stmt: py.Statement, level: int, comments: List[str]): stmt.comments = comments x = py.Line.INDENT_UNIT assert [str(l) for l in stmt.comment_lines(level)] == [ x * level + f"# {line}" for line in comments ] @given(statements, lines) def test_attach_comment_without_comment_changes_nothing( self, stmt: py.Statement, line: py.Line ): clone = line.clone() stmt.comments.clear() # Some comments may have been generated by Hypothesis. lines = stmt.attach_comment(line) assert len(lines) == 1, "no additional lines must be created" l = lines[0] assert l is line, "the same Line object must be returned" assert l.text == clone.text, "Line.text must not change" assert l.indent_level == clone.indent_level, "Line.indent_level must not change" @given(statements, lines, ascii_inline_text(min_size=1)) def test_attach_comment_with_one_line_comment_changes_only_text( self, stmt: py.Statement, line: py.Line, comment: str ): clone = line.clone() stmt.comments = [comment] lines = stmt.attach_comment(line) assert len(lines) == 1, "no additional lines must be created" l = lines[0] assert l is line, "the same Line object must be returned" assert ( l.text == f"{clone.text} # {comment}" ), "the comment is appended to Line.text" assert l.indent_level == clone.indent_level, "Line.indent_level must not change" @given(statements, lines, lists(ascii_inline_text(min_size=1), min_size=2)) def test_attach_comment_with_multiline_comment_adds_lines_above( self, stmt: py.Statement, line: py.Line, comments: List[str] ): clone = line.clone() stmt.comments = comments lines = stmt.attach_comment(line) assert len(lines) == 1 + len(comments) l = lines[-1] assert l is line, "the same Line object must be returned last" assert l.text == clone.text, "Line.text must not change" assert l.indent_level == clone.indent_level, "Line.indent_level must not change" assert lines[:-1] == [ py.Line(f"# {s}", clone.indent_level) for s in comments ], "all lines but the last are standalone comment lines" class TestOpaqueBlock: @given(opaque_blocks, opaque_blocks) def test_equal_iff_components_are_equal(self, a: py.OpaqueBlock, b: py.OpaqueBlock): assert a != b or (a.block == b.block and a.comments == b.comments) @given(text(string.whitespace, max_size=5)) def test_lines_raises_for_empty_input_block(self, block: str): with pytest.raises(ValueError): py.OpaqueBlock(block) def test_lines_returns_block_lines_if_top_and_bottom_are_not_empty(self): ob = py.OpaqueBlock(" a\n b\n\n\n c") assert len(ob.lines()) == 5 def test_lines_returns_block_lines_without_empty_top_and_bottom(self): ob = py.OpaqueBlock("\n\n a\n b\n\n\n c\n\n\n") assert len(ob.lines()) == 5 X = py.Line.INDENT_UNIT @pytest.mark.parametrize( "input_block, indent_level, expected", [ ("x", 0, "x"), ("x", 1, " x"), (" x", 1, " x"), (" x", 2, " x"), (" x", 0, "x"), ("x\nx", 0, "x\nx"), ("x\nx", 1, " x\n x"), (" x\n x", 1, " x\n x"), ("x\n x", 1, " x\n x"), ("x\nx\n", 1, " x\n x"), (" x\n x\n", 1, " x\n x"), ("x\n x\n", 1, " x\n x"), ("x\n x\n x", 0, "x\n x\n x"), ("x\n x\n x", 1, " x\n x\n x"), ("\nx\n x", 0, "x\n x"), ("\nx\n x", 1, " x\n x"), ("\n x\n x", 0, "x\n x"), ("\n x\n x", 1, " x\n x"), ("\tx\n\t x", 1, " x\n x"), ("\tx\n\t\tx", 1, f" x\n {' ' * py.OpaqueBlock.TAB_SIZE}x"), ], ) def test_lines_indents_correctly( self, input_block: str, indent_level: int, expected: str ): lines = 
py.OpaqueBlock(input_block).lines(indent_level) print("lines =") pprint.pprint(lines) with patch("transformer.python.Line.INDENT_UNIT", " "): assert "\n".join(str(line) for line in lines) == expected @given(opaque_blocks, indent_levels, ascii_inline_text(min_size=1)) def test_lines_displays_comment_always_above( self, ob: py.OpaqueBlock, level: int, comment: str ): x = py.Line.INDENT_UNIT ob.comments = [comment] assert [str(l) for l in ob.lines(level)] == [ x * level + f"# {comment}", *[str(l) for l in ob.lines(level, comments=False)], ] @given(indent_levels) def test_lines_with_hidden_comments(self, level: int): x = py.Line.INDENT_UNIT ob = py.OpaqueBlock("hello", comments=["1", "2"]) assert [str(l) for l in ob.lines(level, comments=False)] == [ x * level + "hello" ] def test_repr(self): text = " a'\" b " assert ( repr(py.OpaqueBlock(block=text, comments=["hi"])) == "OpaqueBlock(' a\\'\" b ', comments=['hi'])" ) class TestFunction: @given(indent_levels) def test_lines_with_no_params_and_no_body(self, level: int): f = py.Function("f", params=[], statements=[]) x = py.Line.INDENT_UNIT assert [str(l) for l in f.lines(level)] == [ x * level + "def f():", x * (level + 1) + "pass", ] @given(indent_levels) def test_lines_with_simple_body(self, level: int): f = py.Function( "f", params=[], statements=[py.OpaqueBlock("print('Hello!')"), py.OpaqueBlock("return")], ) x = py.Line.INDENT_UNIT assert [str(l) for l in f.lines(level)] == [ x * level + "def f():", x * (level + 1) + "print('Hello!')", x * (level + 1) + "return", ] def test_lines_with_simple_params(self): f = py.Function("f", params=["x", "y"], statements=[]) assert [str(l) for l in f.lines()] == [ "def f(x, y):", py.Line.INDENT_UNIT + "pass", ] def test_lines_with_complex_params(self): f = py.Function( "f", params=["x: int", "abc: bool = True", "*z: str"], statements=[] ) assert [str(l) for l in f.lines()] == [ "def f(x: int, abc: bool = True, *z: str):", py.Line.INDENT_UNIT + "pass", ] @given(indent_levels) def test_lines_with_nested_body(self, level: int): x = py.Line.INDENT_UNIT f = py.Function( "func", params=[], statements=[ py.Assignment("a", py.Literal(2)), py.IfElse( [(py.Literal(True), [py.Assignment("b", py.Literal(3))])], [ py.Assignment("b", py.Literal(4)), py.Assignment("c", py.Literal(1)), ], ), ], ) assert [str(l) for l in f.lines(level)] == [ x * level + "def func():", x * (level + 1) + "a = 2", x * (level + 1) + "if True:", x * (level + 2) + "b = 3", x * (level + 1) + "else:", x * (level + 2) + "b = 4", x * (level + 2) + "c = 1", ] @given(indent_levels) def test_lines_with_comments(self, level: int): stmt = py.OpaqueBlock("foo", comments=["x"]) f = py.Function("f", params=[], statements=[stmt], comments=["1", "2"]) x = py.Line.INDENT_UNIT assert [str(l) for l in f.lines(level)] == [ x * level + "# 1", x * level + "# 2", x * level + "def f():", *[str(l) for l in stmt.lines(level + 1)], ] @given(indent_levels) def test_lines_with_hidden_comments(self, level: int): stmt = py.OpaqueBlock("foo", comments=["x"]) f = py.Function("f", params=[], statements=[stmt], comments=["1", "2"]) x = py.Line.INDENT_UNIT assert [str(l) for l in f.lines(level, comments=False)] == [ x * level + "def f():", *[str(l) for l in stmt.lines(level + 1, comments=False)], ] def test_repr(self): stmts = [py.OpaqueBlock("raise")] assert ( repr(py.Function(name="f", params=["a"], statements=stmts, comments=["hi"])) == f"Function(name='f', params=['a'], statements={stmts!r}, comments=['hi'])" ) @given(functions, functions) def 
test_equal_iff_components_are_equal(self, a: py.Function, b: py.Function): assert a != b or ( a.name == b.name and a.params == b.params and a.statements == b.statements and a.comments == b.comments ) class TestDecoration: @given(decorations, decorations) def test_equal_iff_components_are_equal(self, a: py.Decoration, b: py.Decoration): assert a != b or ( a.decorator == b.decorator and a.target == b.target and a.comments == b.comments ) @given(indent_levels) def test_with_a_function(self, level: int): f = py.Function("f", params=[], statements=[py.Assignment("a", py.Symbol("f"))]) d = py.Decoration("task(2)", f) x = py.Line.INDENT_UNIT assert [str(l) for l in d.lines(level)] == [ x * level + "@task(2)", *[str(l) for l in f.lines(level)], ] @given(indent_levels) def test_with_a_class(self, level: int): c = py.Class( "C", superclasses=[], statements=[py.Assignment("a: int", py.Literal(1))] ) d = py.Decoration("task", c) x = py.Line.INDENT_UNIT assert [str(l) for l in d.lines(level)] == [ x * level + "@task", *[str(l) for l in c.lines(level)], ] @given(indent_levels) def test_nested_decorators(self, level: int): f = py.Function("f", params=[], statements=[py.Assignment("a", py.Symbol("f"))]) first = py.Decoration("task(2)", f) second = py.Decoration("task_seq(1)", first) x = py.Line.INDENT_UNIT assert [str(l) for l in second.lines(level)] == [ x * level + "@task_seq(1)", x * level + "@task(2)", *[str(l) for l in f.lines(level)], ] @given(indent_levels) def test_lines_with_comments(self, level: int): f = py.Function("f", params=[], statements=[], comments=["1", "2"]) d = py.Decoration("task", f, comments=["x", "y"]) x = py.Line.INDENT_UNIT assert [str(l) for l in d.lines(level)] == [ x * level + "# x", x * level + "# y", x * level + "@task", x * level + "# 1", x * level + "# 2", x * level + "def f():", x * (level + 1) + "pass", ] @given(indent_levels) def test_lines_with_hidden_comments(self, level: int): f = py.Function("f", params=[], statements=[], comments=["1", "2"]) d = py.Decoration("task", f, comments=["x", "y"]) x = py.Line.INDENT_UNIT assert [str(l) for l in d.lines(level, comments=False)] == [ x * level + "@task", x * level + "def f():", x * (level + 1) + "pass", ] def test_repr(self): f = py.Function("f", params=[], statements=[]) assert ( repr(py.Decoration("task", f, comments=["hi"])) == f"Decoration('task', {f!r}, comments=['hi'])" ) class TestClass: @given(classes, classes) def test_equal_iff_components_are_equal(self, a: py.Class, b: py.Class): assert a != b or ( a.name == b.name and a.statements == b.statements and a.superclasses == b.superclasses and a.comments == b.comments ) @given(indent_levels) def test_empty_class(self, level: int): c = py.Class("A", statements=[]) x = py.Line.INDENT_UNIT assert [str(l) for l in c.lines(level)] == [ x * level + "class A:", x * (level + 1) + "pass", ] @given( lists( text(string.ascii_letters, min_size=1, max_size=2), min_size=0, max_size=3 ) ) def test_class_with_superclasses(self, names: List[str]): c = py.Class("A", statements=[], superclasses=names) x = py.Line.INDENT_UNIT if names: expected = "(" + ", ".join(names) + ")" else: expected = "" assert [str(l) for l in c.lines()] == [f"class A{expected}:", x + "pass"] @given(indent_levels) def test_class_with_fields(self, level: int): c = py.Class( "A", statements=[ py.Assignment("a", py.Literal(2)), py.Assignment("b", py.Literal(3)), ], ) x = py.Line.INDENT_UNIT assert [str(l) for l in c.lines(level)] == [ x * level + "class A:", x * (level + 1) + "a = 2", x * (level + 1) + "b = 3", ] 
@given(indent_levels) def test_lines_with_comments(self, level: int): stmt = py.OpaqueBlock("foo", comments=["x"]) c = py.Class("C", statements=[stmt], superclasses=[], comments=["1", "2"]) x = py.Line.INDENT_UNIT assert [str(l) for l in c.lines(level)] == [ x * level + "# 1", x * level + "# 2", x * level + "class C:", *[str(l) for l in stmt.lines(level + 1)], ] @given(indent_levels) def test_lines_with_hidden_comments(self, level: int): stmt = py.OpaqueBlock("foo", comments=["x"]) c = py.Class("C", statements=[stmt], superclasses=[], comments=["1", "2"]) x = py.Line.INDENT_UNIT assert [str(l) for l in c.lines(level, comments=False)] == [ x * level + "class C:", *[str(l) for l in stmt.lines(level + 1, comments=False)], ] def test_repr(self): stmts = [py.OpaqueBlock("raise")] assert ( repr( py.Class( name="C", statements=stmts, superclasses=["A"], comments=["hi"] ) ) == f"Class(name='C', statements={stmts!r}, superclasses=['A'], comments=['hi'])" ) class TestExpression: def test_str_must_be_implemented(self): with pytest.raises(NotImplementedError): str(py.Expression()) class TestStandalone: @given(standalones, standalones) def test_equal_iff_components_are_equal(self, a: py.Standalone, b: py.Standalone): assert a != b or (a.expr == b.expr and a.comments == b.comments) @given(expressions, indent_levels) def test_lines_returns_the_expression_as_single_line( self, e: py.Expression, level: int ): stmt = py.Standalone(e) lines = stmt.lines(level) assert len(lines) == 1 line = lines[0] assert line.indent_level == level assert line.text == str(e) @given(indent_levels) def test_lines_with_comments(self, level: int): x = py.Line.INDENT_UNIT ob = py.Standalone(py.Symbol("a"), comments=["1", "2"]) assert [str(l) for l in ob.lines(level)] == [ x * level + "# 1", x * level + "# 2", x * level + "a", ] @given(indent_levels) def test_lines_with_hidden_comments(self, level: int): x = py.Line.INDENT_UNIT ob = py.Standalone(py.Symbol("a"), comments=["1", "2"]) assert [str(l) for l in ob.lines(level, comments=False)] == [x * level + "a"] def test_repr(self): expr = py.Symbol("a") assert ( repr(py.Standalone(expr, comments=["hi"])) == f"Standalone({expr!r}, comments=['hi'])" ) class TestLiteral: @given(literals, literals) def test_equal_iff_components_are_equal(self, a: py.Literal, b: py.Literal): assert a != b or a.value == b.value scalars = one_of(none(), booleans(), text(max_size=5), integers(), floats()) @given(scalars) def test_literal_scalar_uses_repr(self, x): assert str(py.Literal(x)) == repr(x) @given(lists(scalars)) def test_literal_list_of_scalars_uses_repr(self, x: list): assert str(py.Literal(x)) == repr(x) @given(dictionaries(scalars, scalars)) def test_literal_dict_of_scalars_uses_repr(self, x: dict): assert str(py.Literal(x)) == repr(x) def test_literal_composites_with_expr_use_repr_except_for_expr(self): lit = py.Literal("b") assert repr(lit) == "Literal('b')", "repr works on literal" assert str(py.Literal([1, {"a": lit}])) == "[1, {'a': 'b'}]" assert repr(lit) == "Literal('b')", "repr is still working on literal" class TestFString: def test_strings_appear_as_fstrings(self): assert str(py.FString("")) == "f''" assert str(py.FString("ab")) == "f'ab'" assert str(py.FString("a'b")) == """ f"a'b" """.strip() assert str(py.FString('a"b')) == """ f'a"b' """.strip() def test_non_strings_raise_error(self): with pytest.raises(TypeError): assert str(py.FString(24)) def test_format_template_is_not_replaced(self): a = 2 assert str(py.FString("a {a} {} {a!r}")) == "f'a {a} {} {a!r}'" class TestSymbol: 
@given(symbols, symbols) def test_equal_iff_components_are_equal(self, a: py.Symbol, b: py.Symbol): assert a != b or a.name == b.name @given(text(string.ascii_letters)) def test_strings_appear_unchanged(self, s: str): assert str(py.Symbol(s)) == s def test_non_strings_raise_error(self): with pytest.raises(TypeError): assert str(py.Symbol(True)) def test_repr(self): assert repr(py.Symbol(" x'\" y ")) == "Symbol(' x\\'\" y ')" class TestFunctionCall: @given(function_calls, function_calls) def test_equal_iff_components_are_equal( self, a: py.FunctionCall, b: py.FunctionCall ): assert a != b or ( a.name == b.name and a.positional_args == b.positional_args and a.named_args == b.named_args ) def test_with_no_args(self): assert str(py.FunctionCall("f")) == "f()" def test_with_positional_args(self): assert str(py.FunctionCall("f", [py.Literal(2)])) == "f(2)" def test_with_kwargs(self): assert ( str( py.FunctionCall( "f", named_args={"a": py.Literal(2), "bc": py.Literal("x")} ) ) == "f(a=2, bc='x')" ) def test_with_positional_and_kwargs(self): assert ( str( py.FunctionCall( "m.f", [py.Literal(True), py.FunctionCall("g", [py.Symbol("f")])], {"a": py.Literal(2), "bc": py.Literal("x")}, ) ) == "m.f(True, g(f), a=2, bc='x')" ) def test_repr(self): arg = py.Symbol("a") kwarg = py.Symbol("v") assert ( repr(py.FunctionCall("f", [arg], {"k": kwarg})) == f"FunctionCall('f', [{arg!r}], {{'k': {kwarg!r}}})" ) class TestBinaryOp: @given(binary_ops, binary_ops) def test_equal_iff_components_are_equal(self, a: py.BinaryOp, b: py.BinaryOp): assert a != b or (a.op == b.op and a.lhs == b.lhs and a.rhs == b.rhs) def test_simple(self): assert str(py.BinaryOp(py.Literal(2), "**", py.Literal(10))) == "2 ** 10" def test_nested(self): assert ( str( py.BinaryOp( py.Literal(2), "+", py.BinaryOp( py.BinaryOp(py.Literal(3), "-", py.Literal(4)), "*", py.Literal(5), ), ) ) == "2 + ((3 - 4) * 5)" ) class TestAssignment: @given(assignments, assignments) def test_equal_iff_components_are_equal(self, a: py.Assignment, b: py.Assignment): assert a != b or ( a.lhs == b.lhs and a.rhs == b.rhs and a.comments == b.comments ) @given(indent_levels) def test_simple(self, level: int): x = py.Line.INDENT_UNIT assert [str(l) for l in py.Assignment("foo", py.Literal(3)).lines(level)] == [ x * level + "foo = 3" ] @given(indent_levels) def test_lines_with_comments(self, level: int): x = py.Line.INDENT_UNIT stmt = py.Assignment("x", py.Symbol("a"), comments=["1", "2"]) assert [str(l) for l in stmt.lines(level)] == [ x * level + "# 1", x * level + "# 2", x * level + "x = a", ] @given(indent_levels) def test_lines_with_hidden_comments(self, level: int): x = py.Line.INDENT_UNIT stmt = py.Assignment("x", py.Symbol("a"), comments=["1", "2"]) assert [str(l) for l in stmt.lines(level, comments=False)] == [ x * level + "x = a" ] def test_repr(self): rhs = py.Symbol("a") assert ( repr(py.Assignment("x", rhs, comments=["hi"])) == f"Assignment(lhs='x', rhs={rhs!r}, comments=['hi'])" ) class TestIfElse: @given(ifelses, ifelses) def test_equal_iff_components_are_equal(self, a: py.IfElse, b: py.IfElse): assert a != b or ( a.condition_blocks == b.condition_blocks and a.else_block == b.else_block and a.comments == b.comments ) def test_init_with_no_condition_raises_error(self): with pytest.raises(ValueError): py.IfElse([]) with pytest.raises(ValueError): py.IfElse([], else_block=[]) with pytest.raises(ValueError): py.IfElse([], else_block=[py.Assignment("a", py.Literal(2))]) @given(indent_levels) def test_lines_for_single_if(self, level: int): x = 
py.Line.INDENT_UNIT assert [ str(l) for l in py.IfElse( [ ( py.BinaryOp(py.Symbol("t"), "is", py.Literal(None)), [py.Assignment("t", py.Literal(1))], ) ] ).lines(level) ] == [x * level + "if t is None:", x * (level + 1) + "t = 1"] @given(indent_levels) def test_lines_for_if_else(self, level: int): x = py.Line.INDENT_UNIT assert [ str(l) for l in py.IfElse( [ ( py.BinaryOp(py.Symbol("t"), "is", py.Literal(None)), [py.Assignment("t", py.Literal(1))], ) ], [py.Assignment("t", py.Literal(2))], ).lines(level) ] == [ x * level + "if t is None:", x * (level + 1) + "t = 1", x * level + "else:", x * (level + 1) + "t = 2", ] @given(indent_levels) def test_lines_for_if_elif(self, level: int): x = py.Line.INDENT_UNIT assert [ str(l) for l in py.IfElse( [ ( py.BinaryOp(py.Symbol("t"), "is", py.Literal(None)), [py.Assignment("t", py.Literal(1))], ), ( py.Literal(False), [ py.Assignment("t", py.Literal(2)), py.Standalone(py.FunctionCall("f", [py.Symbol("t")])), ], ), ] ).lines(level) ] == [ x * level + "if t is None:", x * (level + 1) + "t = 1", x * level + "elif False:", x * (level + 1) + "t = 2", x * (level + 1) + "f(t)", ] @given(indent_levels) def test_lines_for_if_elif_else_with_no_statements(self, level: int): x = py.Line.INDENT_UNIT assert [ str(l) for l in py.IfElse( [ (py.BinaryOp(py.Symbol("t"), "is", py.Literal(None)), []), (py.Literal(False), []), (py.Literal(True), []), ], [], ).lines(level) ] == [ x * level + "if t is None:", x * (level + 1) + "pass", x * level + "elif False:", x * (level + 1) + "pass", x * level + "elif True:", x * (level + 1) + "pass", ] @given(indent_levels) def test_lines_with_comments(self, level: int): x = py.Line.INDENT_UNIT cond = py.Literal(True) if_true = py.Assignment("x", py.Symbol("a"), comments=["tx", "ty"]) if_false = py.Assignment("x", py.Symbol("b"), comments=["fx", "fy"]) stmt = py.IfElse([(cond, [if_true])], [if_false], comments=["1", "2"]) assert [str(l) for l in stmt.lines(level)] == [ x * level + "# 1", x * level + "# 2", x * level + "if True:", *[str(l) for l in if_true.lines(level + 1)], x * level + "else:", *[str(l) for l in if_false.lines(level + 1)], ] @given(indent_levels) def test_lines_with_hidden_comments(self, level: int): x = py.Line.INDENT_UNIT cond = py.Literal(True) if_true = py.Assignment("x", py.Symbol("a"), comments=["tx", "ty"]) if_false = py.Assignment("x", py.Symbol("b"), comments=["fx", "fy"]) stmt = py.IfElse([(cond, [if_true])], [if_false], comments=["1", "2"]) assert [str(l) for l in stmt.lines(level, comments=False)] == [ x * level + "if True:", *[str(l) for l in if_true.lines(level + 1, comments=False)], x * level + "else:", *[str(l) for l in if_false.lines(level + 1, comments=False)], ] def test_repr(self): cond = py.Literal(True) if_true = [(cond, [py.Assignment("x", py.Symbol("a"))])] stmt = py.IfElse(condition_blocks=if_true, comments=["hi"]) assert ( repr(stmt) == f"IfElse(condition_blocks={if_true!r}, else_block=None, comments=['hi'])" ) class TestImport: @given(imports, imports) def test_equal_iff_components_are_equal(self, a: py.Import, b: py.Import): assert a != b or ( a.targets == b.targets and a.source == b.source and a.alias == b.alias and a.comments == b.comments ) def test_init_without_targets_raises_error(self): with pytest.raises(ValueError): py.Import([]) @given(indent_levels) def test_lines_without_targets_raises_error(self, level: int): i = py.Import(["safe"]) i.targets.clear() with pytest.raises(ValueError): i.lines(level) @given(indent_levels) def test_lines_with_single_target(self, level: int): x = 
py.Line.INDENT_UNIT name = "locust.http" assert [str(l) for l in py.Import([name]).lines(level)] == [ x * level + f"import {name}" ] @given(indent_levels) def test_lines_with_multiple_targets(self, level: int): x = py.Line.INDENT_UNIT names = ["locust.http", "math", "a.b.c"] assert [str(l) for l in py.Import(names).lines(level)] == [ x * level + f"import {name}" for name in names ] @given(indent_levels) def test_lines_with_single_target_and_alias(self, level: int): x = py.Line.INDENT_UNIT name = "transformer.python" alias = "py" assert [str(l) for l in py.Import([name], alias=alias).lines(level)] == [ x * level + f"import {name} as {alias}" ] def test_init_with_multiple_targets_and_alias_raises_error(self): with pytest.raises(ValueError): py.Import(["a", "b"], alias="c") @given(indent_levels) def test_lines_with_multiple_targets_and_alias_raises_error(self, level: int): i = py.Import(["safe"], alias="A") i.targets.append("oops") with pytest.raises(ValueError): i.lines(level) @given(indent_levels, lists(text(string.ascii_letters, min_size=1), min_size=1)) def test_lines_with_targets_and_source(self, level: int, targets: List[str]): x = py.Line.INDENT_UNIT source = "bar" assert [str(l) for l in py.Import(targets, source).lines(level)] == [ x * level + f"from {source} import {target}" for target in targets ] @given(indent_levels) def test_lines_with_comments(self, level: int): x = py.Line.INDENT_UNIT stmt = py.Import(["a", "b", "c"], comments=["1", "2"]) assert [str(l) for l in stmt.lines(level)] == [ x * level + "# 1", x * level + "# 2", x * level + "import a", x * level + "import b", x * level + "import c", ] @given(indent_levels) def test_lines_with_hidden_comments(self, level: int): x = py.Line.INDENT_UNIT stmt = py.Import(["a", "b", "c"], comments=["1", "2"]) assert [str(l) for l in stmt.lines(level, comments=False)] == [ x * level + "import a", x * level + "import b", x * level + "import c", ] def test_repr(self): stmt = py.Import(targets=["a", "b"], comments=["hi"]) assert ( repr(stmt) == f"Import(targets=['a', 'b'], source=None, alias=None, comments=['hi'])" ) class TestExpressionView: def test_wraps_int_into_literal(self): def f(x: int) -> py.Literal: return py.Literal(x * 2) ev = py.ExpressionView(name="hello", target=lambda: 7, converter=f) assert ev.converter(ev.target()) == py.Literal(14) assert str(ev) == "14" PK!7transformer/test_request.py# pylint: skip-file from unittest.mock import MagicMock import pytest from transformer.request import * class TestFromHarEntry: def test_it_returns_an_error_given_an_invalid_dict(self): with pytest.raises(KeyError): invalid_dict = {"some": "data"} Request.from_har_entry(invalid_dict) def test_it_returns_a_request_given_a_get_request(self): get_request = { "request": {"method": "GET", "url": ""}, "startedDateTime": "2018-01-01", } request = Request.from_har_entry(get_request) assert isinstance(request, Request) assert request.method == HttpMethod.GET def test_it_returns_a_request_given_a_post_request(self): post_request = { "request": { "method": "POST", "url": "", "postData": "{'some name': 'some value'}", }, "startedDateTime": "2018-01-01", } request = Request.from_har_entry(post_request) assert isinstance(request, Request) assert request.method == HttpMethod.POST assert request.post_data == "{'some name': 'some value'}" def test_it_returns_a_request_given_a_put_request(self): put_request = { "request": { "method": "PUT", "url": "", "postData": "{'some name': 'some value'}", "queryString": [{"name": "some name", "value": "some value"}], }, 
"startedDateTime": "2018-01-01", } request = Request.from_har_entry(put_request) assert isinstance(request, Request) assert request.method == HttpMethod.PUT assert request.post_data == "{'some name': 'some value'}" assert request.query == [QueryPair(name="some name", value="some value")] def test_it_returns_a_request_with_headers_given_an_options_request(self): options_request = { "request": { "method": "OPTIONS", "url": "", "postData": "", "headers": [{"name": "Access-Control-Request-Method", "value": "POST"}], }, "startedDateTime": "2018-01-01", } request = Request.from_har_entry(options_request) assert isinstance(request, Request) assert request.method == HttpMethod.OPTIONS assert request.headers == [ Header(name="Access-Control-Request-Method", value="POST") ] def test_it_returns_a_request_with_a_query_given_a_delete_request_with_a_query( self ): delete_request = { "request": { "method": "DELETE", "url": "", "queryString": [{"name": "some name", "value": "some value"}], }, "startedDateTime": "2018-01-01", } request = Request.from_har_entry(delete_request) assert isinstance(request, Request) assert request.method == HttpMethod.DELETE assert request.query == [QueryPair(name="some name", value="some value")] class TestAllFromHar: @pytest.mark.skip(reason="Doesn't raise AssertionError; to be investigated.") def test_it_returns_an_error_given_an_invalid_dict(self): with pytest.raises(AssertionError): invalid_dict = {"some": "data"} Request.all_from_har(invalid_dict) def test_it_returns_a_list_of_requests_given_a_valid_dict(self): valid_dict = { "log": { "entries": [ { "request": {"method": "GET", "url": "", "postData": ""}, "startedDateTime": "2018-01-01", } ] } } assert isinstance(Request.all_from_har(valid_dict), Iterator) for request in Request.all_from_har(valid_dict): assert isinstance(request, Request) class TestTaskName: def test_it_generates_a_task_name_given_a_request(self): a_request = MagicMock() a_request.method.name = "some_name" a_request.url.scheme = "some_scheme" a_request.url.hostname = "some_hostname" a_request.url.path = "some_path" a_task_name = Request.task_name(a_request) a_duplicate_task_name = Request.task_name(a_request) assert a_task_name == a_duplicate_task_name a_request.method.name = "some_other_name" a_different_task_name = Request.task_name(a_request) assert a_task_name != a_different_task_name PK!ՔAl))transformer/test_scenario.pyimport logging import re import string from pathlib import Path from typing import List from unittest.mock import MagicMock, patch import pytest from hypothesis import given from hypothesis.strategies import lists, text, recursive, tuples from transformer.helpers import DUMMY_HAR_STRING, _DUMMY_HAR_DICT from transformer.scenario import Scenario, SkippableScenarioError, WeightValueError from transformer.task import Task paths = recursive( text(string.printable.replace("/", ""), min_size=1, max_size=3).filter( lambda s: s != "." and s != ".." 
), lambda x: tuples(x, x).map("/".join), max_leaves=4, ).map(Path) class TestScenario: @patch("transformer.scenario.Path.is_dir", MagicMock(return_value=False)) @patch("transformer.scenario.Path.iterdir", MagicMock(return_value=())) @patch("transformer.scenario.Path.open") @patch("transformer.scenario.json.load", MagicMock(return_value=_DUMMY_HAR_DICT)) @given(paths=lists(paths, unique=True, min_size=2)) def test_names_are_unique(*_, paths: List[Path]): scenario_names = [Scenario.from_path(path).name for path in paths] assert sorted(set(scenario_names)) == sorted(scenario_names) assert len(paths) == len(scenario_names) def test_creation_from_scenario_directory_with_weight_file(self, tmp_path: Path): root_path = tmp_path / "some-path" root_path.mkdir() expected_weight = 7 root_path.with_suffix(".weight").write_text(str(expected_weight)) nb_har_files = 2 for i in range(nb_har_files): root_path.joinpath(f"{i}.har").write_text(DUMMY_HAR_STRING) result = Scenario.from_path(root_path) assert len(result.children) == nb_har_files assert result.weight == expected_weight class TestFromPath: def test_on_har_raises_error_with_incorrect_har(self, tmp_path: Path): not_har_path = tmp_path / "not.har" not_har_path.write_text("not JSON!") with pytest.raises(SkippableScenarioError): Scenario.from_path(not_har_path) def test_on_dir_ignores_some_incorrect_hars(self, tmp_path: Path): not_har_path = tmp_path / "not.har" not_har_path.write_text("not JSON!") har_path = tmp_path / "good.har" har_path.write_text(DUMMY_HAR_STRING) scenario = Scenario.from_path(tmp_path) assert len(scenario.children) == 1 assert scenario.children[0].origin == har_path def test_on_dir_raises_error_with_all_incorrect_hars(self, tmp_path: Path): for i in range(2): tmp_path.joinpath(f"{i}.nothar").write_text("not JSON!") with pytest.raises(SkippableScenarioError): Scenario.from_path(tmp_path) def test_on_dir_with_dangling_weights_raises_error( self, tmp_path: Path, caplog ): (tmp_path / "ok.har").write_text(DUMMY_HAR_STRING) (tmp_path / "fail.weight").write_text("7") caplog.set_level(logging.INFO) with pytest.raises(SkippableScenarioError): Scenario.from_path(tmp_path) assert "weight file" in caplog.text assert any( r.levelname == "ERROR" for r in caplog.records ), "at least one ERROR logged" def test_records_global_code_blocks_from_tasks(self): t1_blocks = {"t1-1": ["abc"], "t1-2": ["def"]} t1 = Task("t1", request=MagicMock(), global_code_blocks=t1_blocks) t2 = Task("t2", request=MagicMock()) t3_blocks = {"t3-1": ("xyz",)} t3 = Task("t3", request=MagicMock(), global_code_blocks=t3_blocks) scenario = Scenario("scenario", [t1, t2, t3], origin=None) assert scenario.global_code_blocks == {**t1_blocks, **t3_blocks} def test_group_records_global_code_blocks_from_scenarios(self): t1_blocks = {"t1-1": ["abc"], "t1-2": ["def"]} t1 = Task("t1", request=MagicMock(), global_code_blocks=t1_blocks) t2 = Task("t2", request=MagicMock()) t3_blocks = {"t3-1": ("xyz",)} t3 = Task("t3", request=MagicMock(), global_code_blocks=t3_blocks) s1 = Scenario("s1", [t1, t2], origin=None) s2 = Scenario("s2", [t3], origin=None) sg = Scenario("sg", [s1, s2], origin=None) assert sg.global_code_blocks == {**t1_blocks, **t3_blocks} def test_group_records_global_code_blocks_uniquely(self): common_blocks = {"x": ["a", "b"]} t1 = Task( "t1", request=MagicMock(), global_code_blocks={**common_blocks, "t1b": ["uvw"]}, ) t2 = Task( "t2", request=MagicMock(), global_code_blocks={**common_blocks, "t2b": ["xyz"]}, ) s1 = Scenario("s1", [t1], origin=None) s2 = Scenario("s2", [t2], 
origin=None) sg = Scenario("sg", [s1, s2], origin=None) assert sg.global_code_blocks == { **common_blocks, "t1b": ["uvw"], "t2b": ["xyz"], } def test_without_weight_file_has_weight_1(self, tmp_path: Path): har_path = tmp_path / "test.har" har_path.write_text(DUMMY_HAR_STRING) assert Scenario.from_path(har_path).weight == 1 def test_with_weight_file_has_corresponding_weight(self, tmp_path: Path): weight_path = tmp_path / "test.weight" weight_path.write_text("74") har_path = tmp_path / "test.har" har_path.write_text(DUMMY_HAR_STRING) assert Scenario.from_path(har_path).weight == 74 @pytest.mark.parametrize("weight", [0, -2, 2.1, -2.1, "NaN", "abc", " "]) def test_with_invalid_weight_raises_error_and_never_skips( self, tmp_path: Path, weight ): legit_har_path = tmp_path / "legit.har" legit_har_path.write_text(DUMMY_HAR_STRING) bad_weight_path = tmp_path / "test.weight" bad_weight_path.write_text(str(weight)) bad_weight_har_path = tmp_path / "test.har" bad_weight_har_path.write_text(DUMMY_HAR_STRING) with pytest.raises(WeightValueError): # If from_path was skipping the bad scenario/weight pair, it # would not raise because there is another valid scenario, # legit.har. Scenario.from_path(tmp_path) def test_with_many_weight_files_selects_weight_based_on_name( self, tmp_path: Path ): expected_weight_path = tmp_path / "test.weight" expected_weight_path.write_text("7") first_wrong_weight_path = tmp_path / "a.weight" first_wrong_weight_path.write_text("2") second_wrong_weight_path = tmp_path / "1.weight" second_wrong_weight_path.write_text("4") har_path = tmp_path / "test.har" har_path.write_text(DUMMY_HAR_STRING) assert Scenario.from_path(har_path).weight == 7 def test_uses_full_path_for_scenario_name(self, tmp_path: Path): har_basename = "e3ee4a1ef0817cde0a0a78c056e7cb35" har_path = tmp_path / har_basename har_path.write_text(DUMMY_HAR_STRING) scenario = Scenario.from_path(har_path) words_in_scenario_name = { m.group() for m in re.finditer(r"[A-Za-z0-9]+", scenario.name) } assert har_basename in words_in_scenario_name words_in_parent_path = { m.group() for m in re.finditer(r"[A-Za-z0-9]+", str(tmp_path)) } words_in_scenario_name_not_from_har_basename = words_in_scenario_name - { har_basename } assert ( words_in_parent_path <= words_in_scenario_name_not_from_har_basename ), "all components of the parent path must be in the scenario name" def test_uses_full_path_for_parents_and_basename_for_children( self, tmp_path: Path ): root_basename = "615010a656a5bb29d1898f163619611f" root = tmp_path / root_basename root.mkdir() for i in range(2): (root / f"s{i}.har").write_text(DUMMY_HAR_STRING) root_scenario = Scenario.from_path(root) words_in_root_scenario_name = { m.group() for m in re.finditer(r"[A-Za-z0-9]+", root_scenario.name) } words_in_root_path = { m.group() for m in re.finditer(r"[A-Za-z0-9]+", str(root)) } assert ( words_in_root_path <= words_in_root_scenario_name ), "parent scenario's name must come from full path" assert len(root_scenario.children) == 2 child_scenario_names = {c.name for c in root_scenario.children} assert child_scenario_names == { "s0", "s1", }, "child scenarios have short names" def test_raises_error_for_colliding_scenario_names_from_har_files( self, tmp_path: Path, caplog ): (tmp_path / "good.har").write_text(DUMMY_HAR_STRING) (tmp_path / "bad.har").write_text(DUMMY_HAR_STRING) (tmp_path / "bad.json").write_text(DUMMY_HAR_STRING) caplog.set_level(logging.ERROR) with pytest.raises(SkippableScenarioError): Scenario.from_path(tmp_path) assert "colliding names" in caplog.text 
assert "bad.har" in caplog.text assert "bad.json" in caplog.text def test_raises_error_for_colliding_scenario_names_from_directory_and_file( self, tmp_path: Path, caplog ): directory = tmp_path / "x" directory.mkdir() # directory needs to contain a HAR file, otherwise Transformer will # not consider it a scenario. (directory / "a.har").write_text(DUMMY_HAR_STRING) (tmp_path / "x.har").write_text(DUMMY_HAR_STRING) caplog.set_level(logging.ERROR) with pytest.raises(SkippableScenarioError): Scenario.from_path(tmp_path) assert "colliding names" in caplog.text assert re.search(r"\bx\b", caplog.text) assert re.search(r"\bx.har\b", caplog.text) PK!aŞXKXKtransformer/test_task.py# pylint: skip-file import enum import io from unittest.mock import MagicMock, Mock from unittest.mock import patch from urllib.parse import urlparse import pytest from hypothesis import given from hypothesis.strategies import composite, sampled_from, booleans from transformer import python as py from transformer.request import Header, QueryPair from transformer.task import ( Task, Request, HttpMethod, TIMEOUT, LocustRequest, Task2, RequestsPostData, JSON_MIME_TYPE, req_to_expr, lreq_to_expr, ) class TestTask: class TestFromRequests: def test_it_returns_a_task(self): request = MagicMock() request.timestamp = 1 second_request = MagicMock() second_request.timestamp = 2 assert all( isinstance(t, Task) for t in Task.from_requests([request, second_request]) ) @patch("builtins.open") def test_it_doesnt_create_a_task_if_the_url_is_on_the_blacklist( self, mock_open ): mock_open.return_value = io.StringIO("amazon") request = MagicMock() request.url = MagicMock() request.url.netloc = "www.amazon.com" task = Task.from_requests([request]) assert len(list(task)) == 0 @patch("builtins.open") def test_it_creates_a_task_if_the_path_not_host_is_on_the_blacklist( self, mock_open ): mock_open.return_value = io.StringIO("search\namazon") request = MagicMock() request.url = urlparse("https://www.google.com/search?&q=amazon") task = Task.from_requests([request]) assert len(list(task)) == 1 class TestReplaceURL: def test_it_creates_a_locust_request_when_there_is_none(self): task = Task(name="some name", request=MagicMock()) modified_task = Task.replace_url(task, "") assert modified_task.locust_request def test_it_returns_a_task_with_the_replaced_url(self): locust_request = LocustRequest( method=MagicMock(), url=MagicMock(), headers=MagicMock() ) task = Task( name="some name", request=MagicMock(), locust_request=locust_request ) expected_url = 'f"http://a.b.c/{some.value}/"' modified_task = Task.replace_url(task, expected_url) assert modified_task.locust_request.url == expected_url class TestTask2: class TestFromTask: def test_without_locust_request_it_proxies_the_request(self): req = Mock(spec_set=Request) task = Task(name="T", request=req) task2 = Task2.from_task(task) assert task2.name == "T" assert task2.request == req assert len(task2.statements) == 1 assign = task2.statements[0] assert isinstance(assign, py.Assignment) assert assign.lhs == "response" assert isinstance(assign.rhs, py.ExpressionView) assert assign.rhs.target() == task2.request assert assign.rhs.converter is req_to_expr def test_with_locust_request_it_proxies_it(self): lr = Mock(spec_set=LocustRequest) req = Mock(spec_set=Request) task = Task(name="T", request=req, locust_request=lr) task2 = Task2.from_task(task) assert task2.name == "T" assert task2.request == req assert len(task2.statements) == 1 assign = task2.statements[0] assert isinstance(assign, py.Assignment) assert 
assign.lhs == "response" assert isinstance(assign.rhs, py.ExpressionView) assert assign.rhs.target() == lr assert assign.rhs.converter is lreq_to_expr class _KindOfDict(enum.Flag): Text = enum.auto() Params = enum.auto() Both = Text | Params _formats = sampled_from(("json", "www")) _kinds_of_dicts = sampled_from(_KindOfDict) # From http://www.softwareishard.com/blog/har-12-spec/#postData. @composite def har_post_dicts(draw, format=None): format = format or draw(_formats) if format == "json": d = {"mimeType": "application/json", "text": """{"a":"b", "c": "d"}"""} if draw(booleans()): d["params"] = [] if draw(booleans()): d["comment"] = "" return d d = {"mimeType": "application/x-www-form-urlencoded"} kind = draw(_kinds_of_dicts) if kind & _KindOfDict.Text: d["text"] = "a=b&c=d" if draw(booleans()): d.setdefault("params", []) if kind & _KindOfDict.Params: d["params"] = [{"name": "a", "value": "b"}, {"name": "c", "value": "d"}] if draw(booleans()): d.setdefault("text", "") return d class TestRequestPostData: def test_as_kwargs_only_shows_defined(self): v, w = MagicMock(), MagicMock() assert RequestsPostData(data=v).as_kwargs() == {"data": v} assert RequestsPostData(params=v, json=w).as_kwargs() == { "params": v, "json": w, } class TestFromHarPostData: @given(har_post_dicts(format="json")) def test_it_selects_json_approach_for_json_format(self, d: dict): rpd = RequestsPostData.from_har_post_data(d) assert rpd.json == py.Literal({"a": "b", "c": "d"}) assert rpd.data is None @given(har_post_dicts(format="www")) def test_it_selects_data_approach_for_urlencoded_format(self, d: dict): rpd = RequestsPostData.from_har_post_data(d) assert rpd.json is None assert rpd.data == py.Literal(b"a=b&c=d") or rpd.params == py.Literal( [(b"a", b"b"), (b"c", b"d")] ) @given(har_post_dicts()) def test_it_doesnt_raise_error_on_valid_input(self, d: dict): RequestsPostData.from_har_post_data(d) def test_it_raises_on_post_data_without_text_or_params(self): with pytest.raises(ValueError): RequestsPostData.from_har_post_data({"mimeType": "nil"}) def test_it_raises_on_invalid_json(self): with pytest.raises(ValueError): RequestsPostData.from_har_post_data( {"mimeType": JSON_MIME_TYPE, "text": "not json"} ) @pytest.mark.parametrize( "mime,kwarg,val", ( (JSON_MIME_TYPE, "json", {}), ("application/x-www-form-urlencoded", "data", b"{}"), ), ) def test_it_accepts_both_params_and_text(self, mime: str, kwarg, val): expected_fields = { "params": py.Literal([(b"n", b"v")]), kwarg: py.Literal(val), } assert RequestsPostData.from_har_post_data( { "mimeType": mime, "text": "{}", "params": [{"name": "n", "value": "v"}], } ) == RequestsPostData(**expected_fields) class TestReqToExpr: def test_it_supports_get_requests(self): url = "http://abc.de" r = Request( timestamp=MagicMock(), method=HttpMethod.GET, url=urlparse(url), headers=[Header("a", "b")], query=[QueryPair("x", "y")], # query is currently ignored for GET ) assert req_to_expr(r) == py.FunctionCall( name="self.client.get", named_args={ "url": py.Literal(url), "name": py.Literal(url), "headers": py.Literal({"a": "b"}), "timeout": py.Literal(TIMEOUT), "allow_redirects": py.Literal(False), }, ) def test_it_supports_urlencoded_post_requests(self): url = "http://abc.de" r = Request( timestamp=MagicMock(), method=HttpMethod.POST, url=urlparse(url), headers=[Header("a", "b")], post_data={ "mimeType": "application/x-www-form-urlencoded", "params": [{"name": "x", "value": "y"}], "text": "z=7", }, ) assert req_to_expr(r) == py.FunctionCall( name="self.client.post", named_args={ "url": 
py.Literal(url), "name": py.Literal(url), "headers": py.Literal({"a": "b"}), "timeout": py.Literal(TIMEOUT), "allow_redirects": py.Literal(False), "data": py.Literal(b"z=7"), "params": py.Literal([(b"x", b"y")]), }, ) def test_it_supports_json_post_requests(self): url = "http://abc.de" r = Request( timestamp=MagicMock(), method=HttpMethod.POST, url=urlparse(url), headers=[Header("a", "b")], post_data={ "mimeType": "application/json", "params": [{"name": "x", "value": "y"}], "text": """{"z": 7}""", }, ) assert req_to_expr(r) == py.FunctionCall( name="self.client.post", named_args={ "url": py.Literal(url), "name": py.Literal(url), "headers": py.Literal({"a": "b"}), "timeout": py.Literal(TIMEOUT), "allow_redirects": py.Literal(False), "json": py.Literal({"z": 7}), "params": py.Literal([(b"x", b"y")]), }, ) def test_it_supports_empty_post_requests(self): url = "http://abc.de" r = Request( timestamp=MagicMock(), method=HttpMethod.POST, url=urlparse(url), headers=[Header("a", "b")], post_data=None, ) assert req_to_expr(r) == py.FunctionCall( name="self.client.post", named_args={ "url": py.Literal(url), "name": py.Literal(url), "headers": py.Literal({"a": "b"}), "timeout": py.Literal(TIMEOUT), "allow_redirects": py.Literal(False), }, ) def test_it_supports_put_requests_with_payload(self): url = "http://abc.de" r = Request( timestamp=MagicMock(), method=HttpMethod.PUT, url=urlparse(url), headers=[Header("a", "b")], query=[QueryPair("c", "d")], post_data={ "mimeType": "application/json", "params": [{"name": "x", "value": "y"}], "text": """{"z": 7}""", }, ) assert req_to_expr(r) == py.FunctionCall( name="self.client.put", named_args={ "url": py.Literal(url), "name": py.Literal(url), "headers": py.Literal({"a": "b"}), "timeout": py.Literal(TIMEOUT), "allow_redirects": py.Literal(False), "json": py.Literal({"z": 7}), "params": py.Literal([(b"x", b"y"), (b"c", b"d")]), }, ) def test_it_supports_put_requests_without_payload(self): url = "http://abc.de" r = Request( timestamp=MagicMock(), method=HttpMethod.PUT, url=urlparse(url), headers=[Header("a", "b")], query=[QueryPair("c", "d")], post_data=None, ) assert req_to_expr(r) == py.FunctionCall( name="self.client.put", named_args={ "url": py.Literal(url), "name": py.Literal(url), "headers": py.Literal({"a": "b"}), "timeout": py.Literal(TIMEOUT), "allow_redirects": py.Literal(False), "params": py.Literal([(b"c", b"d")]), }, ) def test_it_uses_the_custom_name_if_provided(self): url = "http://abc.de" name = "my-req" r = Request( name=name, timestamp=MagicMock(), method=HttpMethod.GET, url=urlparse(url) ) assert req_to_expr(r) == py.FunctionCall( name="self.client.get", named_args={ "url": py.Literal(url), "name": py.Literal(name), "timeout": py.Literal(TIMEOUT), "allow_redirects": py.Literal(False), }, ) class TestLreqToExpr: def test_it_supports_get_requests(self): url = "http://abc.de" r = LocustRequest.from_request( Request( timestamp=MagicMock(), method=HttpMethod.GET, url=urlparse(url), headers=[Header("a", "b")], query=[QueryPair("x", "y")], # query is currently ignored for GET ) ) assert lreq_to_expr(r) == py.FunctionCall( name="self.client.get", named_args={ "url": py.Literal(url), "name": py.Literal(url), "headers": py.Literal({"a": "b"}), "timeout": py.Literal(TIMEOUT), "allow_redirects": py.Literal(False), }, ) def test_it_supports_fstring_urls(self): url = "http://abc.{tld}" r = LocustRequest(method=HttpMethod.GET, url=f"f'{url}'", headers={"a": "b"}) assert lreq_to_expr(r) == py.FunctionCall( name="self.client.get", named_args={ "url": 
py.FString(url), "name": py.FString(url), "headers": py.Literal({"a": "b"}), "timeout": py.Literal(TIMEOUT), "allow_redirects": py.Literal(False), }, ) def test_it_supports_urlencoded_post_requests(self): url = "http://abc.de" r = LocustRequest.from_request( Request( timestamp=MagicMock(), method=HttpMethod.POST, url=urlparse(url), headers=[Header("a", "b")], post_data={ "mimeType": "application/x-www-form-urlencoded", "params": [{"name": "x", "value": "y"}], "text": "z=7", }, ) ) assert lreq_to_expr(r) == py.FunctionCall( name="self.client.post", named_args={ "url": py.Literal(url), "name": py.Literal(url), "headers": py.Literal({"a": "b"}), "timeout": py.Literal(TIMEOUT), "allow_redirects": py.Literal(False), "data": py.Literal(b"z=7"), "params": py.Literal([(b"x", b"y")]), }, ) def test_it_supports_json_post_requests(self): url = "http://abc.de" r = LocustRequest.from_request( Request( timestamp=MagicMock(), method=HttpMethod.POST, url=urlparse(url), headers=[Header("a", "b")], post_data={ "mimeType": "application/json", "params": [{"name": "x", "value": "y"}], "text": """{"z": 7}""", }, ) ) assert lreq_to_expr(r) == py.FunctionCall( name="self.client.post", named_args={ "url": py.Literal(url), "name": py.Literal(url), "headers": py.Literal({"a": "b"}), "timeout": py.Literal(TIMEOUT), "allow_redirects": py.Literal(False), "json": py.Literal({"z": 7}), "params": py.Literal([(b"x", b"y")]), }, ) def test_it_supports_empty_post_requests(self): url = "http://abc.de" r = LocustRequest.from_request( Request( timestamp=MagicMock(), method=HttpMethod.POST, url=urlparse(url), headers=[Header("a", "b")], post_data=None, ) ) assert lreq_to_expr(r) == py.FunctionCall( name="self.client.post", named_args={ "url": py.Literal(url), "name": py.Literal(url), "headers": py.Literal({"a": "b"}), "timeout": py.Literal(TIMEOUT), "allow_redirects": py.Literal(False), }, ) def test_it_supports_put_requests_with_payload(self): url = "http://abc.de" r = LocustRequest.from_request( Request( timestamp=MagicMock(), method=HttpMethod.PUT, url=urlparse(url), headers=[Header("a", "b")], query=[QueryPair("c", "d")], post_data={ "mimeType": "application/json", "params": [{"name": "x", "value": "y"}], "text": """{"z": 7}""", }, ) ) assert lreq_to_expr(r) == py.FunctionCall( name="self.client.put", named_args={ "url": py.Literal(url), "name": py.Literal(url), "headers": py.Literal({"a": "b"}), "timeout": py.Literal(TIMEOUT), "allow_redirects": py.Literal(False), "json": py.Literal({"z": 7}), "params": py.Literal([(b"x", b"y"), (b"c", b"d")]), }, ) def test_it_supports_put_requests_without_payload(self): url = "http://abc.de" r = LocustRequest.from_request( Request( timestamp=MagicMock(), method=HttpMethod.PUT, url=urlparse(url), headers=[Header("a", "b")], query=[QueryPair("c", "d")], post_data=None, ) ) assert lreq_to_expr(r) == py.FunctionCall( name="self.client.put", named_args={ "url": py.Literal(url), "name": py.Literal(url), "headers": py.Literal({"a": "b"}), "timeout": py.Literal(TIMEOUT), "allow_redirects": py.Literal(False), "params": py.Literal([(b"c", b"d")]), }, ) def test_it_uses_the_custom_name_if_provided(self): url = "http://abc.de" name = "my-req" r = LocustRequest.from_request( Request( name=name, timestamp=MagicMock(), method=HttpMethod.GET, url=urlparse(url), ) ) assert lreq_to_expr(r) == py.FunctionCall( name="self.client.get", named_args={ "url": py.Literal(url), "name": py.Literal(name), "timeout": py.Literal(TIMEOUT), "allow_redirects": py.Literal(False), }, ) PK!CDDI I 
transformer/test_transform.py# pylint: skip-file import io from pathlib import Path import pytest import transformer import transformer.transform as tt from transformer.helpers import DUMMY_HAR_STRING from transformer.locust import locustfile_lines from transformer.plugins import plugin, Contract class TestTransform: def test_it_returns_a_locustfile_program_given_scenario_path(self, tmp_path: Path): har_path = tmp_path / "some.har" har_path.write_text(DUMMY_HAR_STRING) locustfile_contents = str(tt.transform(har_path)) try: compile(locustfile_contents, "locustfile.py", "exec") except Exception as exception: pytest.fail(f"Compiling locustfile failed. [{exception}].") def test_it_uses_default_plugins(self, tmp_path: Path, monkeypatch): har_path = tmp_path / "some.har" har_path.write_text(DUMMY_HAR_STRING) times_plugin_called = 0 # We don't need to specify a plugin signature here because signatures # are only checked at plugin name resolution. def fake_plugin(tasks): nonlocal times_plugin_called times_plugin_called += 1 return tasks monkeypatch.setattr(tt, "DEFAULT_PLUGINS", [fake_plugin]) tt.transform(har_path, plugins=[]) # explicitly provide no plugins assert times_plugin_called == 1 def dump_as_str(*args, **kwargs): """ Wraps transformer.dump by passing it a StringIO buffer as file argument and returning the final contents of that buffer. This makes transformer.dump behave like transformer.dumps, and thus allows to test their output more easily. """ s = io.StringIO() transformer.dump(s, *args, **kwargs) return s.getvalue() class TestDumpAndDumps: @pytest.mark.parametrize("f", (transformer.dumps, dump_as_str)) def test_with_no_paths_it_returns_empty_locustfile(self, f): expected_empty_locustfile = "\n".join( locustfile_lines(scenarios=[], program_plugins=()) ) assert f([]) == expected_empty_locustfile def test_dump_and_dumps_have_same_output_for_simple_har(self, tmp_path): har_path = tmp_path / "some.har" har_path.write_text(DUMMY_HAR_STRING) assert transformer.dumps([tmp_path]) == dump_as_str([tmp_path]) @pytest.mark.parametrize( "f,with_default,expected_times_called", ( (f, *case) for f in (transformer.dumps, dump_as_str) for case in ((True, 1), (False, 0)) ), ) def test_it_uses_default_plugins( self, tmp_path: Path, monkeypatch, f, with_default, expected_times_called ): har_path = tmp_path / "some.har" har_path.write_text(DUMMY_HAR_STRING) times_plugin_called = 0 @plugin(Contract.OnScenario) def fake_plugin(t): nonlocal times_plugin_called times_plugin_called += 1 return t monkeypatch.setattr(tt, "DEFAULT_PLUGINS", [fake_plugin]) f([har_path], with_default_plugins=with_default) assert times_plugin_called == expected_times_called PK!xYtransformer/transform.py""" :mod:transformer.transform -- Entrypoint functions ================================================== Defines user-facing functions performing all or most of the HAR-to-locustfile conversion. """ import warnings from pathlib import Path from typing import Sequence, Union, Iterable, TextIO, Iterator, TypeVar import transformer.plugins as plug from transformer.locust import locustfile, locustfile_lines from transformer.plugins import sanitize_headers, Contract from transformer.plugins.contracts import Plugin from transformer.scenario import Scenario DEFAULT_PLUGINS = (sanitize_headers.plugin,) def transform( scenarios_path: Union[str, Path], plugins: Sequence[Plugin] = (), with_default_plugins: bool = True, ) -> str: """ This function is deprecated and will be removed in a future version. Do not rely on it. 
    Reason: It only accepts one scenario path at a time, and requires plugins
    to be already resolved (and therefore that users use
    transformer.plugins.resolve, which is kind of low-level).
    Both dumps & dump lift these constraints and have a more familiar naming
    (see json.dump/s, etc.).

    Deprecated since: v1.0.2.
    """
    warnings.warn(DeprecationWarning("transform: use dump or dumps instead"))
    if with_default_plugins:
        plugins = (*DEFAULT_PLUGINS, *plugins)
    return locustfile([Scenario.from_path(Path(scenarios_path), plugins)])


LaxPath = Union[str, Path]
PluginName = str


def dumps(
    scenario_paths: Iterable[LaxPath],
    plugins: Sequence[PluginName] = (),
    with_default_plugins: bool = True,
) -> str:
    """
    Transforms the provided *scenario_paths* using the provided *plugins*, and
    returns the resulting locustfile code as a string.

    See also: :func:`dump`

    :param scenario_paths: paths to scenario files (HAR) or directories
    :param plugins: names of plugins to use
    :param with_default_plugins: whether the default plugins should be used in
        addition to those provided (recommended: True)
    """
    return "\n".join(_dump_as_lines(scenario_paths, plugins, with_default_plugins))


def dump(
    file: TextIO,
    scenario_paths: Iterable[LaxPath],
    plugins: Sequence[PluginName] = (),
    with_default_plugins: bool = True,
) -> None:
    """
    Transforms the provided *scenario_paths* using the provided *plugins*, and
    writes the resulting locustfile code in the provided *file*.

    See also: :func:`dumps`

    :param file: an object with a `writelines` method (as specified by
        io.TextIOBase), e.g. `sys.stdout` or the result of `open`.
    :param scenario_paths: paths to scenario files (HAR) or directories.
    :param plugins: names of plugins to use.
    :param with_default_plugins: whether the default plugins should be used in
        addition to those provided (recommended: True).
    """
    file.writelines(
        intersperse("\n", _dump_as_lines(scenario_paths, plugins, with_default_plugins))
    )


def _dump_as_lines(
    scenario_paths: Iterable[LaxPath],
    plugins: Sequence[PluginName],
    with_default_plugins: bool,
) -> Iterator[str]:
    plugins = [p for name in plugins for p in plug.resolve(name)]
    if with_default_plugins:
        plugins = (*DEFAULT_PLUGINS, *plugins)
    plugins_for = plug.group_by_contract(plugins)
    scenarios = [
        Scenario.from_path(
            path, plugins_for[Contract.OnTask], plugins_for[Contract.OnTaskSequence]
        ).apply_plugins(plugins_for[Contract.OnScenario])
        for path in scenario_paths
    ]
    yield from locustfile_lines(scenarios, plugins_for[Contract.OnPythonProgram])


T = TypeVar("T")


def intersperse(delim: T, iterable: Iterable[T]) -> Iterator[T]:
    """
    >>> list(intersperse(",", "a"))
    ['a']
    >>> list(intersperse(",", ""))
    []
    >>> list(intersperse(",", "abc"))
    ['a', ',', 'b', ',', 'c']
    >>> list(intersperse(",", ["a", "b", "c"]))
    ['a', ',', 'b', ',', 'c']
    """
    it = iter(iterable)
    try:
        yield next(it)
    except StopIteration:
        return
    for x in it:
        yield delim
        yield x

PK! har_transformer-1.1.0.dist-info/entry_points.txt
PK! har_transformer-1.1.0.dist-info/LICENSE
MIT License

Copyright (c) 2019 Zalando SE

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
PK! har_transformer-1.1.0.dist-info/WHEEL
PK! har_transformer-1.1.0.dist-info/METADATA
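A minimal usage sketch for the dumps/dump API defined in transformer/transform.py above, assuming a HAR recording exists at the placeholder path scenarios/example.har (the output filename is likewise arbitrary):

import transformer

# dumps() returns the generated locustfile as a single string, built with the
# default plugins (e.g. sanitize_headers).
locustfile_code = transformer.dumps(["scenarios/example.har"])

# dump() writes the same output to any object exposing writelines(), such as
# sys.stdout or the result of open().
with open("locustfile.py", "w") as output:
    transformer.dump(output, ["scenarios/example.har"])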