PK!Ďpyproject.toml[tool.poetry] name = "har-transformer" version = "1.0.0" description = "A tool to convert HAR files into a locustfile." authors = [ "Serhii Cherniavskyi ", "Thibaut Le Page ", "Brian Maher ", "Oliwia Zaremba " ] license = "MIT" readme = "README.md" homepage = "https://github.com/zalando-incubator/transformer" repository = "https://github.com/zalando-incubator/transformer" keywords = ["load testing", "locust", "har"] classifiers=[ "Programming Language :: Python :: 3", "License :: OSI Approved :: MIT License", "Topic :: Software Development :: Testing :: Traffic Generation", "Topic :: Software Development :: Code Generators", "Topic :: Internet :: WWW/HTTP", ] packages = [{ include = "transformer" }] # pyproject.toml must be included: Transformer dynamically extracts the version # from there. include = ["pyproject.toml"] [tool.poetry.scripts] transformer = "transformer:cli.script_entrypoint" [tool.poetry.dependencies] python = "^3.6" pendulum = "^2.0" chevron = "^0.13" docopt = "^0.6.2" ecological = "^1.6" tomlkit = "^0.5.3" [tool.poetry.dev-dependencies] locustio = "^0.9.0" pytest = "^3.9" pytest-cov = "*" hypothesis = "^4.4" black = {version = "*",allows-prereleases = true} flake8 = "^3.7" flake8-bugbear = "^18.8" flake8-docstrings = "^1.3" flake8-tidy-imports = "^2.0" [build-system] requires = ["poetry>=0.12"] build-backend = "poetry.masonry.api" PK!>>transformer/.urlignore_example360yield abmr.net addthis.com adfarm adform.net adition.com adkontekst.pl admatic.com admeira.ch adnxs.com adsrvr.org akstat atdmt.com bidswitch.net bing bluekai.com calotag.com casalemedia.com cloudfront.net connectad.io converge-digital.com creativefactory.zalando crwdcntrl.net demdex.net doubleclick.net effitarget.com email-reflex.com exelator.com experianmarketingservices.digital facebook google himediads.com ibillboard.com intelliad.de keyxel.com krxd.net liadm.com lkqd.net mathtag.com mediabong.com metrigo.com metrigo.zalan.do mookie1.com mpulse nscontext.eu nuggad opecloud.com openx.net pixel pubmatic.com pubmine.com reflex rtb-seller.com sara.media semasio.net sharethis.com socialaudience.nl static-img tapad.com usabilla vdopia.com videoplaza.tv vimeo.com vimeocdn.com vmg.host yieldlab.net ztat.net admatic.com PK! 88transformer/__init__.pyfrom .__version__ import version __all__ = ["version"] PK!WWtransformer/__main__.pyfrom .cli import script_entrypoint if __name__ == "__main__": script_entrypoint() PK!2yytransformer/__version__.pyfrom pathlib import Path from typing import cast import tomlkit from tomlkit.toml_document import TOMLDocument def pyproject() -> TOMLDocument: with Path(__file__).parent.parent.joinpath("pyproject.toml").open() as f: pyproject = f.read() return tomlkit.parse(pyproject) def version() -> str: return cast(str, pyproject()["tool"]["poetry"]["version"]) PK!"nwwtransformer/blacklist.pyimport os import logging def on_blacklist(url): """ Checks for matching URLs in an ignore file (blacklist) from user's current directory. """ blacklist_file = f"{os.getcwd()}/.urlignore" try: with open(blacklist_file) as file: blacklist = [line.rstrip("\n") for line in file if len(line) > 1] for blacklist_item in blacklist: if blacklist_item in url: return True return False except OSError as err: logging.debug( "Could not read blacklist file %s. Reason: %s", blacklist_file, err ) return False PK!'BB transformer/builders_decision.pyfrom hypothesis.strategies import text, builds, booleans, just from .decision import Decision reasons = text(max_size=2) yes_decisions = builds(Decision, valid=just(True), reason=reasons) no_decisions = builds(Decision, valid=just(False), reason=reasons) decisions = builds(Decision, valid=booleans(), reason=reasons) PK!?8transformer/builders_python.py""" Hypothesis builders for property-based testing of the transformer.python module. """ import re import string from typing import Optional from hypothesis.searchstrategy import SearchStrategy from hypothesis.strategies import ( integers, text, lists, builds, deferred, one_of, recursive, just, none, booleans, floats, tuples, dictionaries, ) from transformer import python as py # Strategy for indentation levels we want to test with (just "no indentation" or # "one-level indentation" because "two-level indentation" will likely be the # same, and we don't want the tests running for too long). indent_levels = integers(min_value=0, max_value=1) def ascii_text(min_size: int = 0, max_size: Optional[int] = 5) -> SearchStrategy[str]: """ Strategy for ASCII strings, with a default max_size to avoid wasting time generating too much. """ return text(string.printable, min_size=min_size, max_size=max_size) _ascii_inline = re.sub(r"[\r\n\v\f]", "", string.printable) def ascii_inline_text( min_size: int = 0, max_size: Optional[int] = 5 ) -> SearchStrategy[str]: """Similar to ascii_text, but does not generate multiline strings.""" return text(_ascii_inline, min_size=min_size, max_size=max_size) # Strategy for identifiers, i.e. strings that can be used as symbols (function # names, etc.) in Python programs. # Unqualified identifiers cannot contain ".", qualified identifiers can # (but only between unqualified identifiers, i.e. not at the beginning or end). unqualified_identifiers = text(string.ascii_letters, min_size=1, max_size=5) qualified_identifiers = lists(unqualified_identifiers, min_size=2, max_size=3).map( ".".join ) identifiers = unqualified_identifiers | qualified_identifiers # Strategy for python.Line objects. lines = builds(py.Line, ascii_inline_text(), indent_levels) # Strategy for lists of strings to be used as comment text in tests. comments = lists(ascii_inline_text(), max_size=3) # Strategy for python.Statement objects. Basically, if you ask for a Statement, # you get an instance of one of Statement's subclasses. # All Statement subclasses should be mentioned here. # We need deferred to break the cyclic dependency between builds() that depend statements. statements = deferred( lambda: one_of( opaque_blocks, functions, decorations, classes, standalones, assignments, ifelses, imports, ) ) _atomic_blocks = text(string.ascii_letters + string.punctuation, min_size=1, max_size=3) _complex_blocks = recursive( _atomic_blocks, lambda b: text(string.whitespace, min_size=1, max_size=2).flatmap( lambda ws: tuples(b, b).map(ws.join) ), max_leaves=8, ) # Strategy for python.OpaqueBlock. We don't want whitespace-only comments # (should be ignored by the syntax tree, which requires boilerplate in tests) # so we build the text by joining non-whitespace strings together. # Since this strategy is the default in the statements strategy, which is widely # used in tests, it should be as fast as possible. opaque_blocks = builds(py.OpaqueBlock, block=_complex_blocks, comments=comments) # Strategy for python.Function objects. functions = builds( py.Function, name=unqualified_identifiers, params=lists(unqualified_identifiers, max_size=2), statements=lists(statements, max_size=2), comments=comments, ) # Strategy for python.Class objects. classes = builds( py.Class, name=unqualified_identifiers, statements=lists(statements, max_size=2), superclasses=lists(identifiers, max_size=2), comments=comments, ) # Strategy for python.Decoration objects. decorations = builds( py.Decoration, decorator=identifiers, target=one_of(functions, classes), comments=comments, ) # Strategy for python.Expression objects. Basically, if you ask for an Expression, # you get an instance of one of Expression's subclasses. # All Expression subclasses should be mentioned here. # Uses deferred for the same reason as Statement. expressions = deferred(lambda: one_of(symbols, literals, function_calls, binary_ops)) # Strategy for python.Standalone objects. standalones = builds(py.Standalone, expr=expressions, comments=comments) # Strategy for python.FString objects. The first examples are supposed to # trigger interpolation behavior to show that it doesn't happen with FString. fstrings = builds( py.FString, one_of(just(""), just("{a}"), text(min_size=1, max_size=5)) ) # Strategy for python.Literal objects. # The recursion doesn't use the set data type because python.Statement and # python.Expression objects are not hashable (because they are not immutable); # this is also why we use unqualified_identifiers as dictionary keys. literals = recursive( one_of(none(), booleans(), integers(), floats(), text(max_size=5)).map(py.Literal) | fstrings, lambda x: one_of( lists(x, max_size=2), tuples(x), dictionaries(unqualified_identifiers, x, max_size=2), ).map(py.Literal), max_leaves=8, ) # Strategy for python.Symbol objects. symbols = builds(py.Symbol, identifiers) # Strategy for python.FunctionCall objects. The size of argument collections is # limited for performance reasons and because handling 3 arguments is (hopefully) # the same as handling 2 arguments. function_calls = builds( py.FunctionCall, name=identifiers, positional_args=lists(expressions, max_size=2), named_args=dictionaries(unqualified_identifiers, expressions, max_size=2), ) # Strategy for reasonable operator names: "+", "++", "in", etc. operators = text(string.ascii_letters + string.punctuation, min_size=1, max_size=2) # Strategy for python.BinaryOp. binary_ops = builds(py.BinaryOp, lhs=expressions, op=operators, rhs=expressions) # Strategy for python.Assignment. assignments = builds(py.Assignment, lhs=identifiers, rhs=expressions, comments=comments) # Strategy for python.IfElse. The size of Statement sub-lists is limited for # performance reasons and because handling 3 statements is (hopefully) the same # as handling 2 statements. ifelses = builds( py.IfElse, condition_blocks=lists( tuples(expressions, lists(statements, max_size=2)), min_size=1, max_size=3 ), else_block=one_of(none(), lists(statements, max_size=2)), comments=comments, ) # Strategy for python.Import without an alias part. multi_imports = builds( py.Import, targets=lists(identifiers, min_size=1, max_size=2), source=one_of(none(), identifiers), ) # Strategy for python.Import with an alias part. aliased_imports = builds( py.Import, targets=tuples(identifiers), source=one_of(none(), identifiers), alias=unqualified_identifiers, ) # Strategy for python.Import. imports = multi_imports | aliased_imports PK!fj \ transformer/cli.py""" Transformer: Convert web browser sessions (HAR files) into Locust load testing scenarios (locustfiles). Usage: transformer [-p ]... [...] transformer --help transformer --version Options: --help Print this help message and exit. -p, --plugin= Use the specified plugin. Repeatable. --version Print version information and exit. """ import logging import sys from pathlib import Path from typing import Sequence, cast, Tuple import ecological from docopt import docopt from transformer import version from transformer import plugins as plug from transformer.locust import locustfile from transformer.scenario import Scenario class Config(ecological.AutoConfig, prefix="transformer"): input_paths: Tuple[Path, ...] = () plugins: Tuple[str, ...] = () def read_config(cli_args: Sequence[str]) -> Config: """ Combine command-line arguments & options (managed by docopt) with environment variables (managed by Ecological) into Ecological's Config class. Special cases: - If input paths are provided both from the environment and the command-line, only the paths provided from the command-line are taken into account. - If plugins are provided both from the environment and the command-line, the union of both groups is taken into account. """ arguments = docopt(__doc__, version=version(), argv=cli_args) # TODO: remove this redundancy once Ecological can re-read the environment # at run-time while still having a compile-time definition (Config). # See https://github.com/jmcs/ecological/issues/20. class conf(ecological.AutoConfig, prefix="transformer"): input_paths: Tuple[Path, ...] = () plugins: Tuple[str] = () paths = arguments[""] if paths: if conf.input_paths: logging.warning("TRANSFORMER_INPUT_PATHS overwritten with CLI arguments") conf.input_paths = paths conf.input_paths = tuple(Path(p) for p in conf.input_paths) plugins = arguments["--plugin"] if plugins: if conf.plugins: logging.warning("TRANSFORMER_PLUGINS merged with CLI -p/--plugin options") conf.plugins = (*conf.plugins, *plugins) return cast(Config, conf) def script_entrypoint() -> None: """ Entrypoint for the "transformer" program (which reads arguments from the command-line and the environment). This is an alternative to using directly Scenario.from_path and locust.locustfile as a library API in another Python program. """ logging.basicConfig( level=logging.INFO, format="%(asctime)s\t%(levelname)s\t%(message)s" ) config = read_config(cli_args=sys.argv[1:]) if not config.input_paths: logging.error("No input paths provided in environment nor command-line!") logging.info("Did you mean to provide env TRANSFORMER_INPUT_PATHS=[...]?") logging.info("Otherwise, here is the command-line manual:") print(__doc__, file=sys.stderr) exit(1) plugins = tuple(p for name in config.plugins for p in plug.resolve(name)) scenarios = [Scenario.from_path(path, plugins) for path in config.input_paths] print(str(locustfile(scenarios))) PK!T/  transformer/decision.pyfrom typing import NamedTuple, Union, Iterable, Optional class Decision(NamedTuple): valid: bool reason: str def __bool__(self) -> bool: return self.valid def __eq__(self, o: object) -> bool: return isinstance(o, self.__class__) and self.valid == o.valid @classmethod def yes(cls, reason: str = "ok") -> "Decision": return Decision(valid=True, reason=reason) @classmethod def no(cls, reason: str) -> "Decision": return Decision(valid=False, reason=reason) @classmethod def whether(cls, cond: Union[bool, "Decision"], reason: str) -> "Decision": if isinstance(cond, Decision): if cond: return cond return Decision.no(f"{reason}: {cond.reason}") return Decision.yes(reason) if cond else Decision.no(reason) @classmethod def all(cls, decisions: Iterable["Decision"]) -> "Decision": for d in decisions: if not d: return d return Decision.yes() @classmethod def any( cls, decisions: Iterable["Decision"], reason: Optional[str] = None ) -> "Decision": recorded_decisions = [] nb_bad_cases = 0 BAD_CASES_THRESHOLD = 5 for d in decisions: if d: return Decision.yes(f"{reason}: {d.reason}") if reason else d if nb_bad_cases <= BAD_CASES_THRESHOLD: recorded_decisions.append(d) nb_bad_cases += 1 if nb_bad_cases <= BAD_CASES_THRESHOLD: cases = str([d.reason for d in recorded_decisions]) else: cases = f"{nb_bad_cases} invalid cases" msg = f"no valid case: {cases}" if reason: msg = f"{reason}: {msg}" return Decision.no(msg) PK!p=transformer/helpers.pyimport json from typing import Iterable def zip_kv_pairs(pairs: Iterable) -> dict: return {pair.name: pair.value for pair in pairs} """ Use this with caution, as it is global and mutable! See also DUMMY_HAR_STRING. """ _DUMMY_HAR_DICT = { "log": { "entries": [ { "startedDateTime": "2018-01-01", "request": {"method": "GET", "url": "https://www.zalando.de"}, } ] } } DUMMY_HAR_STRING = json.dumps(_DUMMY_HAR_DICT) PK!c4transformer/locust.pyimport enum from typing import Sequence, List, Union import transformer.python as py from transformer.scenario import Scenario from transformer.task import Task, Task2 LOCUST_MAX_WAIT_DELAY = 10 LOCUST_MIN_WAIT_DELAY = 0 LOCUSTFILE_COMMENT = """ File automatically generated by Transformer: https://github.bus.zalan.do/TIP/transformer """.strip() def _locust_task(task: Union[Task, Task2]) -> py.Function: """ Transforms a Task into the Python code expected by Locust. This function is private because it does not return a complete Locust task (the @task decorator is missing) and should therefore not be used for that purpose by unsuspecting users. """ if isinstance(task, Task): # TODO: remove when Task2 has replaced Task. # See https://github.com/zalando-incubator/Transformer/issues/11. task = Task2.from_task(task) return py.Function(name=task.name, params=["self"], statements=task.statements) class TaskSetType(enum.Enum): Set = "TaskSet" Sequence = "TaskSequence" def locust_taskset(scenario: Scenario) -> py.Class: """ Transforms a scenario (potentially containing other scenarios) into a Locust TaskSet definition. """ if any(isinstance(child, Task) for child in scenario.children): ts_type = TaskSetType.Sequence else: ts_type = TaskSetType.Set fields: List[py.Statement] = [] for i, child in enumerate(scenario.children, start=1): seq_decorator = f"seq_task({i})" if isinstance(child, Task): fields.append(py.Decoration(seq_decorator, _locust_task(child))) elif isinstance(child, Scenario): field = py.Decoration(f"task({child.weight})", locust_taskset(child)) if ts_type is TaskSetType.Sequence: field = py.Decoration(seq_decorator, field) fields.append(field) else: wrong_type = child.__class__.__qualname__ scenario_type = scenario.__class__.__qualname__ raise TypeError( f"unexpected type {wrong_type} in {scenario_type}.children: {child!r}" ) return py.Class(scenario.name, superclasses=[str(ts_type.value)], statements=fields) def locust_classes(scenarios: Sequence[Scenario]) -> List[py.Class]: """ Transforms scenarios into all Python classes needed by Locust (TaskSet and Locust classes). The only missing parts before a fully functional locustfile are: - integrating all necessary set-up/tear-down statements: - Python imports, - apply global plugins, - etc. - serializing everything via transformer.python. """ classes = [] for scenario in scenarios: taskset = locust_taskset(scenario) locust_class = py.Class( name=f"LocustFor{taskset.name}", superclasses=["HttpLocust"], statements=[ py.Assignment("task_set", py.Symbol(taskset.name)), py.Assignment("weight", py.Literal(scenario.weight)), py.Assignment("min_wait", py.Literal(LOCUST_MIN_WAIT_DELAY)), py.Assignment("max_wait", py.Literal(LOCUST_MAX_WAIT_DELAY)), ], ) classes.append(taskset) classes.append(locust_class) return classes def locust_program(scenarios: Sequence[Scenario]) -> py.Program: """ Converts a ScenarioGroup into a Locust File. """ global_code_blocks = { # TODO: Replace me with a plugin framework that accesses the full tree. # See https://github.com/zalando-incubator/Transformer/issues/11. block_name: py.OpaqueBlock("\n".join(block), comments=[block_name]) for scenario in scenarios for block_name, block in scenario.global_code_blocks.items() } return [ py.Import(["re"], comments=[LOCUSTFILE_COMMENT]), py.Import( ["HttpLocust", "TaskSequence", "TaskSet", "seq_task", "task"], source="locust", ), *locust_classes(scenarios), *global_code_blocks.values(), ] def locustfile(scenarios: Sequence[Scenario]) -> str: return "\n".join( str(line) for stmt in locust_program(scenarios) for line in stmt.lines() ) PK!@y3transformer/naming.pyimport re import zlib DIGIT_RX = re.compile(r"[0-9]") ENDS_WITH_ADLER32 = re.compile(r"_[0-9]+\Z") def to_identifier(string: str) -> str: """ Replace everything except letters, digits and underscore with underscores, allowing the resulting name to be used as identifier in a Python program. A checksum is added at the end (to avoid collisions) if at least one replacement is made, or if the input already ends like a checksum (otherwise, for any input X, we have: to_identifier(X) == to_identifier(to_identifier(X)) i.e. a collision). """ safe_name = re.sub(r"[^_a-z0-9]", "_", string, flags=re.IGNORECASE) if DIGIT_RX.match(safe_name): safe_name = f"_{safe_name}" if safe_name == string and not ENDS_WITH_ADLER32.search(string): return string unique_suffix: int = zlib.adler32(string.encode()) return f"{safe_name}_{unique_suffix}" PK!6transformer/plugins/README.md# Plugins ## List of available plugins - [`sanitize_headers`](sanitize_headers.md) ## How to use plugins In order to use a plugin, pass a method of a [`Plugin`](__init__.py) signature: - a single argument of type `Sequence[Task]` - returned value of type `Sequence[Task]` to the `transform.main` function like so: ```python from transformer import transform from transformer.plugins import sanitize_headers paths = [...] locustfile = transform.main(paths, plugins=[sanitize_headers.plugin]) ``` [`Plugin`]: __init__.py [`sanitize_headers`]: sanitize_headers.py ## Writing custom plugins It's possible to use custom plugins. A valid plugin must be a method with the signature of the [`Plugin`][]. Each plugin has access to all `Task`s that are generated from the HAR file, and can modify them in different way, depending on the intent: - the `locust_request` attribute can be modified in order to change the request itself, e.g. its URL or parameters, - the `locust_preprocessing` attribute can be modified in order to add some logic **before** the request, - the `locust_postprocessing` attribute can be modified in order to add some logic **after** the request; at this stage, the `response` object from the request can also be used. Each plugin must take a full collection of tasks (`Sequence[Task]`) as argument, and return it as well, which means that both: modified and untouched tasks should be returned, with respect to their order. See [`sanitize_headers`][] plugin as an example of the implementation. PK!-44transformer/plugins/__init__.pyfrom .resolve import resolve __all__ = ["resolve"] PK! transformer/plugins/contracts.py""" This module defines the various contracts, i.e. types of plugins supported by Transformer. The term "contract" indicates that these types constrain what plugin implementors can do in Transformer. Transformer plugins are just functions that accept certain inputs and have certain outputs. Different types of plugins have different input and output types. Not all types of plugins can be applied at the same point in Transformer's pipeline (e.g. python.Program objects are built much later than Task objects), hence the multiplicity of contracts. These input and output types are formalized here using Python's annotation syntax and the typing module. In addition to the plugin contracts, this module provides an "isvalid" method for checking whether arbitrary Python objects conform to a given plugin contract. # Plugin kinds ## OnTask Kind of "stateless" plugins that operate independently on each task. When implementing one, imagine their execution could be parallelized by Transformer in the future. Example: a plugin that injects a header in all requests. ## OnScenario Kind of plugins that operate on scenarios. Each scenario is the root of a tree composed of smaller scenarios and tasks (the leaves of this tree). Therefore, in an OnScenario plugin, you have the possibility of inspecting the subtree and making decisions based on that. However, OnScenario plugins will be applied to all scenarios by Transformer, so you don't need to recursively apply the plugin yourself on all subtrees. If you do that, the plugin will be applied many times more than necessary. Example: a plugin that keeps track of how long each scenario runs. ## OnPythonProgram Kind of plugins that operate on the whole syntax tree. The input and output of this kind of plugins is the complete, final locustfile generated by Transformer, represented as a syntax tree. OnPythonProgram plugins therefore have the most freedom compared to other plugin kinds, because they can change anything. Their downside is that manipulating the syntax tree is more complex than the scenario tree or individual tasks. Example: a plugin that injects some code in the global scope. # Other types This module also defines: ## Plugin Any supported kind of Transformer plugin. """ import inspect from typing import Sequence, Callable, Union, Dict, Any from transformer import python from transformer.decision import Decision from transformer.task import Task, Task2 PluginValidator = Callable[[inspect.Signature], Decision] _PluginValidatorDecorator = Callable[[PluginValidator], PluginValidator] _PLUGIN_VALIDATORS: Dict[type, PluginValidator] = {} def _register_contract(plugin_type: type) -> None: if plugin_type in _PLUGIN_VALIDATORS: raise ValueError(f"{plugin_type} already registered; cannot register it again") *expected_params, expected_return = plugin_type.__args__ def _validator(sig: inspect.Signature) -> Decision: if sig.return_annotation != expected_return: return Decision.no( f"expected {expected_return}, got {sig.return_annotation}" ) actual_params = [p.annotation for p in sig.parameters.values()] if actual_params != expected_params: return Decision.no( f"expected parameters {expected_params}, got {actual_params}" ) return Decision.yes() _PLUGIN_VALIDATORS[plugin_type] = _validator OnTask = Callable[[Task2], Task2] OnScenario = Callable[["scenario.Scenario"], "scenario.Scenario"] OnPythonProgram = Callable[[python.Program], python.Program] # Historically Transformer has only one kind of plugin, which transformed a # sequence of Task objects into another such sequence. Operating on a full list # of tasks (instead of task by task) offered more leeway: a plugin could e.g. # add a new task, or change only the first task. # However this OnTaskSequence model is too constraining for some use-cases, # e.g. when a plugin needs to inject code in the global scope, and having to # deal with a full, immutable list of tasks in plugins that independently # operate on each task implies a lot of verbosity and redundancy. # For these reasons, other plugin kinds were created to offer a more varied # choice for plugin implementers. # See https://github.com/zalando-incubator/Transformer/issues/10. OnTaskSequence = Callable[[Sequence[Task]], Sequence[Task]] Plugin = Union[OnTask, OnScenario, OnPythonProgram, OnTaskSequence] for plugin_type in Plugin.__args__: _register_contract(plugin_type) def isvalid(plugin_type: type, obj: Any) -> Decision: """ Checks whether obj is an implementation of the plugin contract plugin_type. The return value is basically a boolean, with an additional string describing the reason for this decision. :param plugin_type: plugin contract to verify obj against :param obj: any Python object :return: whether obj is conform to the plugin_type contract :raise TypeError: if plugin_type is not a plugin contract """ if plugin_type is Plugin: return Decision.any( (isvalid(t, obj) for t in Plugin.__args__), "should be valid for a Plugin subtype", ) if not callable(obj): return Decision.no(f"{obj!r} is not a function") try: validator = _PLUGIN_VALIDATORS[plugin_type] except KeyError: raise TypeError(f"no Plugin contract registered for {plugin_type}") try: actual_signature = inspect.signature(obj) except (ValueError, TypeError) as err: return Decision.no(f"could not extract signature from {obj!r}: {err}") return Decision.whether( validator(actual_signature), f"{obj.__name__!r} should implement {plugin_type}" ) PK!ȹtransformer/plugins/dummy.pyimport logging from typing import Sequence from transformer.task import Task def plugin(tasks: Sequence[Task]) -> Sequence[Task]: logging.info(f"The first request was {tasks[0].request.url.geturl()}") return tasks PK!!}transformer/plugins/resolve.pyimport importlib import inspect import logging from types import ModuleType from typing import Iterator from transformer.plugins import contracts from transformer.plugins.contracts import Plugin def resolve(name: str) -> Iterator[Plugin]: """ Transform a plugin name into the corresponding, actual plugins. The name of a plugin is the name of a Python module containing (at least) one function which name begins with "plugin" and which is annotated according to one of the "plugin contracts" (defined in the contracts module). The "resolve" function loads that module and returns these plugin functions found inside the module. """ try: module = importlib.import_module(name) except ImportError as err: logging.error(f"failed to import plugin module {name!r}: {err}") return iter(()) return load_plugins_from_module(module) PLUGIN_PREFIX = "plugin" def load_plugins_from_module(module: ModuleType) -> Iterator[Plugin]: if not inspect.ismodule(module): raise TypeError(f"expected a module, got {module!r}") at_least_once = False for obj_name, obj in inspect.getmembers(module, inspect.isfunction): if obj_name.startswith(PLUGIN_PREFIX): valid = contracts.isvalid(Plugin, obj) if not valid: logging.warning(f"ignoring {obj_name}: {valid.reason}") else: at_least_once = True yield obj else: logging.debug(f"ignoring {obj_name}: doesn't start with {PLUGIN_PREFIX!r}") if not at_least_once: logging.error(f"module {module} doesn't contain plugin functions") PK!'transformer/plugins/sanitize_headers.md# Sanitizing headers The [`sanitize_headers` plugin](sanitize_headers.py) should be used for processing scenarios that were generated in the Chrome browser, but is advised to be used whenever cookies handling is important. The plugin removes Chrome-specific, RFC non-compliant headers starting with ":". Example of such headers: ``` :authority: chrome.google.com :method: POST :path: /reviews/json/search :scheme: https ``` Additionally the plugin: - maps header keys to lowercase, which makes further overriding of headers deterministic, - ignores the `cookie` header, as cookies are handled by [Locust's `HttpSession`][http-session]. [http-session]: https://docs.locust.io/en/stable/api.html#httpsession-classPK!tXQQ'transformer/plugins/sanitize_headers.pyfrom typing import Sequence from transformer.task import Task, LocustRequest from transformer.helpers import zip_kv_pairs def plugin(tasks: Sequence[Task]) -> Sequence[Task]: """ Removes Chrome-specific, RFC non-compliant headers starting with ":". Maps header keys to lowercase to make overriding deterministic. Removes cookie header as it is handled by Locust's HttpSession. """ modified_tasks = [] for task in tasks: if task.locust_request is None: task = task._replace( locust_request=LocustRequest.from_request(task.request) ) headers = task.locust_request.headers if not isinstance(headers, dict): headers = zip_kv_pairs(headers) sanitized_headers = { k.lower(): v for (k, v) in headers.items() if not k.startswith(":") and k.lower() != "cookie" } task = task._replace( locust_request=task.locust_request._replace(headers=sanitized_headers) ) modified_tasks.append(task) return modified_tasks PK!l@bb%transformer/plugins/test_contracts.pyfrom typing import Callable from unittest.mock import MagicMock import pytest from transformer.task import Task2 from .contracts import isvalid, Plugin, OnTask class TestIsvalid: def test_no_if_obj_is_not_a_function_regardless_of_plugin(self): class A: pass assert not isvalid(MagicMock(), A()) assert not isvalid(MagicMock(), 2) assert not isvalid(MagicMock(), "x") def test_raises_error_for_unknown_plugin(self): def f(_: int) -> int: ... IntPlugin = Callable[[int], int] with pytest.raises(TypeError): isvalid(IntPlugin, f) def test_no_if_obj_has_no_signature(self): def f(task): return task assert not isvalid(OnTask, f) def test_no_if_obj_has_wrong_signature(self): def f(b: bool) -> bool: return b assert not isvalid(OnTask, f) def test_yes_if_obj_has_right_signature(self): def f(t: Task2) -> Task2: return t assert isvalid(OnTask, f) def test_isvalid_plugin_false_if_false_for_all_plugin_subtypes(self): def f(t: bool) -> bool: return t assert not isvalid(Plugin, f) def test_isvalid_plugin_true_if_true_for_a_plugin_subtype(self): def f(t: Task2) -> Task2: return t assert isvalid(Plugin, f) PK!&v3!transformer/plugins/test_dummy.pyimport logging from pathlib import Path from transformer.helpers import DUMMY_HAR_STRING from transformer.scenario import Scenario def test_dummy_plugin_works(tmp_path: Path, caplog): from transformer.plugins import dummy har_path = tmp_path / "test.har" har_path.write_text(DUMMY_HAR_STRING) caplog.set_level(logging.INFO) Scenario.from_path(har_path, plugins=[dummy.plugin]) assert "https://www.zalando.de" in caplog.text PK!|9ɦ #transformer/plugins/test_resolve.pyimport logging import random import sys import uuid from pathlib import Path from types import ModuleType import pytest from transformer.task import Task2 from .resolve import load_plugins_from_module, resolve @pytest.fixture() def module_root(tmp_path: Path, monkeypatch) -> Path: monkeypatch.setattr(sys, "path", [str(tmp_path), *sys.path]) return tmp_path class TestResolve: def test_returns_empty_and_logs_for_module_not_found(self, caplog): modname = f"that_module_does_not_exist.{uuid.uuid4().hex}" assert list(resolve(modname)) == [] assert f"failed to import plugin module {modname!r}" in caplog.text def test_calls_load_plugins_from_module_with_module(self, module_root: Path): modname = "ab.cd.ef" modpath = Path(*modname.split(".")).with_suffix(".py") Path(module_root, modpath.parent).mkdir(parents=True) with Path(module_root, modpath).open("w") as f: f.write("from transformer.plugins.contracts import Task2\n") f.write("def plugin_f(t: Task2) -> Task2:\n") f.write(" ...\n") plugins = list(resolve(modname)) assert len(plugins) == 1 f = plugins[0] assert f.__name__ == "plugin_f" def test_resolve_is_exported_by_the_transformer_plugins_module(self): try: from transformer.plugins import resolve except ImportError: pytest.fail("resolve should be exported by transformer.plugins") @pytest.fixture() def module() -> ModuleType: """Creates and returns an empty module.""" return ModuleType(f"fake_{random.randint(0, 99999999)}") class TestLoadPluginsFromModule: def test_raises_error_for_non_module(self): class A: pass with pytest.raises(TypeError): # Iterators are lazy, we need list() list(load_plugins_from_module(A)) def test_ignores_non_plugin_stuff_in_module(self, module, caplog): def signature_not_a_plugin(_: Task2) -> Task2: ... def plugin_prefixed_but_no(): ... def plugin_valid(_: Task2) -> Task2: ... non_plugin_functions = (signature_not_a_plugin, plugin_prefixed_but_no) functions = (*non_plugin_functions, plugin_valid) for f in functions: module.__dict__[f.__name__] = f caplog.clear() caplog.set_level(logging.DEBUG) plugins = list(load_plugins_from_module(module)) assert plugins == [plugin_valid] print(f">>> log messages: {caplog.messages}") for f in non_plugin_functions: assert any( f.__name__ in msg for msg in caplog.messages ), "ignored function names should be logged" def test_empty_iterator_for_modules_without_any_plugin(self, module, caplog): plugins = list(load_plugins_from_module(module)) assert plugins == [] assert module.__name__ in caplog.text PK!%HH,transformer/plugins/test_sanitize_headers.py# pylint: skip-file from unittest.mock import MagicMock from transformer.task import Task, LocustRequest from transformer.request import HttpMethod, Header from .sanitize_headers import plugin def test_it_removes_headers_beginning_with_a_colon(): tasks = [ Task( name="some task", request=None, locust_request=LocustRequest( method=HttpMethod.GET, url="", headers=[Header(name=":non-rfc-header", value="some value")], ), ) ] sanitized_headers = plugin(tasks)[0].locust_request.headers assert len(sanitized_headers) == 0 def test_it_downcases_header_names(): tasks = [ Task( name="some task", request=None, locust_request=LocustRequest( method=HttpMethod.GET, url="", headers=[Header(name="Some Name", value="some value")], ), ) ] sanitized_headers = plugin(tasks)[0].locust_request.headers assert "some name" in sanitized_headers def test_it_removes_cookies(): tasks = [ Task( name="someTask", request=None, locust_request=LocustRequest( method=HttpMethod.GET, url="", headers=[Header(name="cookie", value="some value")], ), ) ] sanitized_headers = plugin(tasks)[0].locust_request.headers assert len(sanitized_headers) == 0 def test_it_does_not_remove_other_headers(): tasks = [ Task( name="someTask", request=None, locust_request=LocustRequest( method=HttpMethod.GET, url="", headers=[Header(name="some other header", value="some value")], ), ) ] sanitized_headers = plugin(tasks)[0].locust_request.headers assert len(sanitized_headers) == 1 def test_it_creates_a_locust_request_if_none_exist(): tasks = [Task(name="some task", request=MagicMock())] assert plugin(tasks)[0].locust_request PK!s=*U*Utransformer/python.pyimport re from types import MappingProxyType from typing import ( Sequence, Mapping, Any, List, Type, Set, Optional, Tuple, cast, Iterable, ) IMMUTABLE_EMPTY_DICT = MappingProxyType({}) class Line: """ A line of text and its associated indentation level. This class allows not to constantly copy strings to add a new indentation level at every scope of the AST. """ INDENT_UNIT: str = " " * 4 def __init__(self, text: str, indent_level: int = 0) -> None: self.text = text self.indent_level = indent_level def __str__(self) -> str: return f"{self.INDENT_UNIT * self.indent_level}{self.text}" def __repr__(self) -> str: return "{}(text={!r}, indent_level={!r})".format( self.__class__.__qualname__, self.text, self.indent_level ) def clone(self) -> "Line": """ Creates an exact but disconnected copy of self. Useful in tests. """ return self.__class__(text=self.text, indent_level=self.indent_level) def __eq__(self, o: object) -> bool: return ( isinstance(o, self.__class__) and self.text == cast(__class__, o).text and self.indent_level == cast(__class__, o).indent_level ) def _resplit(parts: Iterable[str]) -> List[str]: """ Given a list of strings, returns a list of lines, by splitting each string into multiple lines where it contains newlines. >>> _resplit([]) [] >>> _resplit(['a', 'b']) ['a', 'b'] >>> _resplit(['a', 'b\\nc\\nd']) ['a', 'b', 'c', 'd'] """ return [line for part in parts for line in part.splitlines()] class Statement: """ Python distinguishes between statements and expressions: basically, statements cannot be assigned to a variable, whereas expressions can. For our purpose, another distinction is important: statements may span over multiple lines (and not just for style), whereas all expressions can be expressed in a single line. This class serves as abstract base for all implementors of lines() and handles comment processing for them. """ def __init__(self, comments: Sequence[str] = ()) -> None: self._comments = _resplit(comments) @property def comments(self) -> List[str]: self._comments = _resplit(self._comments) return self._comments @comments.setter def comments(self, value: List[str]): self._comments = value def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: """ All Line objects necessary to represent this Statement, along with the appropriate indentation level. :param indent_level: How much indentation to apply to the least indented line of this statement. :param comments: Whether existing comments attached to self should be included in the result. """ raise NotImplementedError def comment_lines(self, indent_level: int) -> List[Line]: """ Converts self.comments from str to Line with "#" prefixes. """ return [Line(f"# {s}", indent_level) for s in self.comments] def attach_comment(self, line: Line) -> List[Line]: """ Attach a comment to line: inline if self.comments is just one line, on dedicated new lines above otherwise. """ comments = self.comments if not comments: return [line] if len(comments) == 1: line.text += f" # {comments[0]}" return [line] lines = self.comment_lines(line.indent_level) lines.append(line) return lines def __eq__(self, o: object) -> bool: return ( isinstance(o, self.__class__) and self.comments == cast(__class__, o).comments ) # Handy alias for type signatures. Program = Sequence[Statement] class OpaqueBlock(Statement): """ A block of code already represented as a string. This helps moving existing code (e.g. in plugins) from our ad-hoc "blocks of code" framework to the AST framework defined in this module. It also allows to express Python constructs that would otherwise not yet be representable with this AST framework. """ PREFIX_RX = re.compile(r"\s+") TAB_SIZE = 8 def __init__(self, block: str, comments: Sequence[str] = ()) -> None: super().__init__(comments) if not block.strip(): raise ValueError(f"OpaqueBlock can't be empty but got {block!r}") self.block = block def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: raw_lines = [l.expandtabs(self.TAB_SIZE) for l in self.block.splitlines()] first_nonempty_line = next(i for i, l in enumerate(raw_lines) if l.strip()) after_last_nonempty_line = next( len(raw_lines) - i for i, l in enumerate(reversed(raw_lines)) if l.strip() ) raw_lines = raw_lines[first_nonempty_line:after_last_nonempty_line] indents = [self.PREFIX_RX.match(l) for l in raw_lines] shortest_indent = min(len(p.group()) if p else 0 for p in indents) block_lines = [Line(l[shortest_indent:], indent_level) for l in raw_lines] if comments: return [*self.comment_lines(indent_level), *block_lines] return block_lines def __repr__(self) -> str: return "{}({!r}, comments={!r})".format( self.__class__.__qualname__, self.block, self.comments ) def __eq__(self, o: object) -> bool: return super().__eq__(o) and self.block == cast(__class__, o).block class Function(Statement): """ A function definition (def ...). """ def __init__( self, name: str, params: Sequence[str], statements: Sequence[Statement], comments: Sequence[str] = (), ) -> None: super().__init__(comments) self.name = name self.params = list(params) self.statements = list(statements) def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: param_list = ", ".join(self.params) body_lines = [ line for stmt in self.statements for line in stmt.lines(indent_level + 1, comments) ] or [Line("pass", indent_level + 1)] top = Line(f"def {self.name}({param_list}):", indent_level) if comments: return [*self.attach_comment(top), *body_lines] return [top, *body_lines] def __repr__(self) -> str: return "{}(name={!r}, params={!r}, statements={!r}, comments={!r})".format( self.__class__.__qualname__, self.name, self.params, self.statements, self.comments, ) def __eq__(self, o: object) -> bool: return ( super().__eq__(o) and self.name == cast(__class__, o).name and self.params == cast(__class__, o).params and self.statements == cast(__class__, o).statements ) class Decoration(Statement): """ A function or class definition to which is applied a decorator (e.g. @task). """ def __init__( self, decorator: str, target: Statement, comments: Sequence[str] = () ) -> None: super().__init__(comments) self.decorator = decorator self.target = target def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: top = Line(f"@{self.decorator}", indent_level) target_lines = self.target.lines(indent_level, comments) if comments: return [*self.attach_comment(top), *target_lines] return [top, *target_lines] def __repr__(self) -> str: return "{}({!r}, {!r}, comments={!r})".format( self.__class__.__qualname__, self.decorator, self.target, self.comments ) def __eq__(self, o: object) -> bool: return ( super().__eq__(o) and self.decorator == cast(__class__, o).decorator and self.target == cast(__class__, o).target ) class Class(Statement): """ A class definition. """ def __init__( self, name: str, statements: Sequence[Statement], superclasses: Sequence[str] = (), comments: Sequence[str] = (), ) -> None: super().__init__(comments) self.name = name self.statements = list(statements) self.superclasses = list(superclasses) def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: superclasses = "" if self.superclasses: superclasses = "({})".format(", ".join(self.superclasses)) body = [ line for stmt in self.statements for line in stmt.lines(indent_level + 1, comments) ] or [Line("pass", indent_level + 1)] top = Line(f"class {self.name}{superclasses}:", indent_level) if comments: return [*self.attach_comment(top), *body] return [top, *body] def __repr__(self) -> str: return ( "{}(name={!r}, statements={!r}, " "superclasses={!r}, comments={!r})" ).format( self.__class__.__qualname__, self.name, self.statements, self.superclasses, self.comments, ) def __eq__(self, o: object) -> bool: return ( super().__eq__(o) and self.name == cast(__class__, o).name and self.statements == cast(__class__, o).statements and self.superclasses == cast(__class__, o).superclasses ) class Expression: """ See the documentation of Statement for why Expression is a separate class. An expression is still a statement in Python (e.g. functions can be called anywhere), but this Expression class is NOT a Statement because we can't attach comments to arbitrary expressions (e.g. between braces). If you need to use an Expression as a Statement, see the Standalone wrapper class. This class serves as abstract base for all our implementors of __str__(). """ def __str__(self) -> str: raise NotImplementedError def __eq__(self, o: object) -> bool: return isinstance(o, self.__class__) class Standalone(Statement): """ Wraps an Expression so that it can be used as a Statement. """ def __init__(self, expr: Expression, comments: Sequence[str] = ()) -> None: super().__init__(comments) self.expr = expr def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: """ An Expression E used as a Statement is serialized as the result of str(E) on its own Line. """ line = Line(str(self.expr), indent_level) if comments: return self.attach_comment(line) return [line] def __repr__(self) -> str: return "{}({!r}, comments={!r})".format( self.__class__.__qualname__, self.expr, self.comments ) def __eq__(self, o: object) -> bool: return super().__eq__(o) and self.expr == cast(__class__, o).expr def _all_subclasses_of(cls: Type) -> Set[Type]: """ All subclasses of cls, including non-direct ones (child of child of ...). """ direct_subclasses = set(cls.__subclasses__()) return direct_subclasses.union( s for d in direct_subclasses for s in _all_subclasses_of(d) ) class Literal(Expression): """ All literal Python expressions (integers, strings, lists, etc.). Everything will be serialized using repr(), except Expression objects that could be contained in a composite value like list: they will be serialized with str(), as is probably expected. Thus: >>> str(Literal([1, {"a": FString("-{x}")}])) "[1, {'a': f'-{x}'}]" instead of something like "[1, {'a': FString('-{x}')}]". """ def __init__(self, value: Any) -> None: super().__init__() self.value = value _REPR_BY_EXPR_CLS = None def __str__(self) -> str: # This is not pretty, but repr() doesn't accept a visitor we could use # to say "just this time, use that code to serialize Expression objects". if Literal._REPR_BY_EXPR_CLS is None: Literal._REPR_BY_EXPR_CLS = { c: c.__repr__ for c in _all_subclasses_of(Expression) } try: for k in Literal._REPR_BY_EXPR_CLS.keys(): k.__repr__ = k.__str__ return repr(self.value) finally: for k, _repr in Literal._REPR_BY_EXPR_CLS.items(): k.__repr__ = _repr def __repr__(self) -> str: return f"{self.__class__.__qualname__}({self.value!r})" def __eq__(self, o: object) -> bool: return super().__eq__(o) and self.value == cast(__class__, o).value class FString(Literal): """ f-strings cannot be handled like most literals because they are evaluated first, so they lose their "f" prefix and their template is executed too early. """ def __init__(self, s: str) -> None: if not isinstance(s, str): raise TypeError( f"expecting a format string, got {s.__class__.__qualname__}: {s!r}" ) super().__init__(s) def __str__(self) -> str: return "f" + repr(str(self.value)) class Symbol(Expression): """ The name of something (variable, function, etc.). Avoids any kind of text transformation that would happen with Literal. >>> str(Literal("x")) "'x'" >>> str(Symbol("x")) 'x' The provided argument's type is explicitly checked and a TypeError may be raised to avoid confusion when a user expects e.g. Symbol(True) to work like Symbol("True"). """ def __init__(self, name: str) -> None: super().__init__() if not isinstance(name, str): raise TypeError( f"expected symbol name, got {name.__class__.__qualname__}: {name!r}" ) self.name = name def __str__(self) -> str: return self.name def __repr__(self) -> str: return f"{self.__class__.__qualname__}({self.name!r})" def __eq__(self, o: object) -> bool: return super().__eq__(o) and self.name == cast(__class__, o).name class FunctionCall(Expression): """ The invocation of a function or method. """ def __init__( self, name: str, positional_args: Sequence[Expression] = (), named_args: Mapping[str, Expression] = IMMUTABLE_EMPTY_DICT, ) -> None: super().__init__() self.name = name self.positional_args = list(positional_args) self.named_args = dict(named_args) def __str__(self) -> str: args = [str(a) for a in self.positional_args] + [ f"{k}={v}" for k, v in self.named_args.items() ] return f"{self.name}({', '.join(args)})" def __repr__(self) -> str: return "{}({!r}, {!r}, {!r})".format( self.__class__.__qualname__, self.name, self.positional_args, self.named_args, ) def __eq__(self, o: object) -> bool: return ( super().__eq__(o) and self.name == cast(__class__, o).name and self.positional_args == cast(__class__, o).positional_args and self.named_args == cast(__class__, o).named_args ) class BinaryOp(Expression): """ The invocation of a binary operator. To avoid any precedence error in the generated code, operands that are also BinaryOps are always surrounded by braces (even when not necessary, as in "1 + (2 + 3)", as a more subtle behavior has increased complexity of implementation without much benefit. """ def __init__(self, lhs: Expression, op: str, rhs: Expression) -> None: super().__init__() self.lhs = lhs self.op = op self.rhs = rhs def __str__(self) -> str: operands = [self.lhs, self.rhs] return f" {self.op} ".join( f"({x})" if isinstance(x, BinaryOp) else str(x) for x in operands ) def __eq__(self, o: object) -> bool: return ( super().__eq__(o) and self.lhs == cast(__class__, o).lhs and self.op == cast(__class__, o).op and self.rhs == cast(__class__, o).rhs ) class Assignment(Statement): """ The assignment of a value to a variable. For our purposes, we don't treat multiple assignment via tuples differently. We also don't support chained assignments such as "a = b = 1". """ def __init__(self, lhs: str, rhs: Expression, comments: Sequence[str] = ()) -> None: super().__init__(comments) self.lhs = lhs self.rhs = rhs def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: line = Line(f"{self.lhs} = {self.rhs}", indent_level) if comments: return self.attach_comment(line) return [line] def __eq__(self, o: object) -> bool: return ( super().__eq__(o) and self.lhs == cast(__class__, o).lhs and self.rhs == cast(__class__, o).rhs ) def __repr__(self) -> str: return "{}(lhs={!r}, rhs={!r}, comments={!r})".format( self.__class__.__qualname__, self.lhs, self.rhs, self.comments ) class IfElse(Statement): """ The if/elif/else construct, where elif and else are optional and elif can be repeated. """ def __init__( self, condition_blocks: Sequence[Tuple[Expression, Sequence[Statement]]], else_block: Optional[Sequence[Statement]] = None, comments: Sequence[str] = (), ) -> None: super().__init__(comments) self.condition_blocks = [ (cond, list(stmts)) for cond, stmts in condition_blocks ] self._assert_consistency() self.else_block = else_block def _assert_consistency(self): if not self.condition_blocks: raise ValueError("can't have an if without at least one block") def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: self._assert_consistency() lines = [] for i, block in enumerate(self.condition_blocks): keyword = "if" if i == 0 else "elif" lines.append(Line(f"{keyword} {block[0]}:", indent_level)) lines.extend( [ line for stmt in block[1] for line in stmt.lines(indent_level + 1, comments) ] or [Line("pass", indent_level + 1)] ) if self.else_block: lines.append(Line("else:", indent_level)) lines.extend( [ line for stmt in self.else_block for line in stmt.lines(indent_level + 1, comments) ] ) if comments: # There is always a first line, or _assert_consistency would fail. return [*self.attach_comment(lines[0]), *lines[1:]] return lines def __eq__(self, o: object) -> bool: return ( super().__eq__(o) and self.condition_blocks == cast(__class__, o).condition_blocks and self.else_block == cast(__class__, o).else_block ) def __repr__(self) -> str: return "{}(condition_blocks={!r}, else_block={!r}, comments={!r})".format( self.__class__.__qualname__, self.condition_blocks, self.else_block, self.comments, ) class Import(Statement): """ The import statement in all its forms: "import", "import X as A", "from M import X", "from M import X as A", and "from M import X, Y". """ def __init__( self, targets: Sequence[str], source: Optional[str] = None, alias: Optional[str] = None, comments: Sequence[str] = (), ) -> None: super().__init__(comments) self.targets = list(targets) self.source = source self.alias = alias self._assert_consistency() def _assert_consistency(self): if not self.targets: raise ValueError("expected at least one import target") if len(self.targets) > 1 and self.alias: raise ValueError("alias forbidden for multiple import targets") def lines(self, indent_level: int = 0, comments: bool = True) -> List[Line]: self._assert_consistency() import_kw = f"from {self.source} import" if self.source else "import" alias_clause = f" as {self.alias}" if self.alias else "" lines = [ Line(f"{import_kw} {target}{alias_clause}", indent_level) for target in self.targets ] if comments: return [*self.comment_lines(indent_level), *lines] return lines def __eq__(self, o: object) -> bool: return ( super().__eq__(o) and self.targets == cast(__class__, o).targets and self.source == cast(__class__, o).source and self.alias == cast(__class__, o).alias ) def __repr__(self) -> str: return "{}(targets={!r}, source={!r}, alias={!r}, comments={!r})".format( self.__class__.__qualname__, self.targets, self.source, self.alias, self.comments, ) PK!0O transformer/request.py# -*- coding: utf-8 -*- """ A representation of a HAR Request. """ import enum from datetime import datetime from typing import Iterator, NamedTuple, List from urllib.parse import urlparse, SplitResult import pendulum from transformer.naming import to_identifier class HttpMethod(enum.Enum): """ Enumeration of HTTP method types. """ GET = enum.auto() POST = enum.auto() PUT = enum.auto() OPTIONS = enum.auto() DELETE = enum.auto() class Header(NamedTuple): """ HTTP header as recorded in HAR file. """ name: str value: str class QueryPair(NamedTuple): """ Query String as recorded in HAR file. """ name: str value: str class Request(NamedTuple): """ An HTTP request as recorded in a HAR file. """ timestamp: datetime method: HttpMethod url: SplitResult headers: List[Header] post_data: dict query: List[QueryPair] @classmethod def from_har_entry(cls, entry: dict) -> "Request": """ Creates a request from a HAR entry. """ request = entry["request"] return Request( timestamp=pendulum.parse(entry["startedDateTime"]), method=HttpMethod[request["method"]], url=urlparse(request["url"]), headers=[ Header(name=d["name"], value=d["value"]) for d in request.get("headers", []) ], post_data=request.get("postData", {}), query=[ QueryPair(name=d["name"], value=d["value"]) for d in request.get("queryString", []) ], ) @classmethod def all_from_har(cls, har: dict) -> Iterator["Request"]: """ Generates requests for all entries in a given HAR file. """ for entry in har["log"]["entries"]: yield cls.from_har_entry(entry) def task_name(self) -> str: """ Generates a simple name suitable for use as a Python function. """ return "_".join( ( self.method.name, self.url.scheme, to_identifier(self.url.hostname), to_identifier(self.url.path), str(abs(hash(self))), ) ) def __hash__(self) -> int: return hash( ( self.timestamp, self.method, self.url, tuple(self.post_data) if self.post_data else None, ) ) PK!jXP| ' 'transformer/scenario.pyimport json import logging from collections import defaultdict from pathlib import Path from typing import ( NamedTuple, Sequence, Mapping, Union, Set, List, Optional, Dict, Tuple, ) from transformer.naming import to_identifier from transformer.plugins.contracts import OnTaskSequence from transformer.request import Request from transformer.task import Task, Task2 WEIGHT_FILE_SUFFIX = ".weight" DEFAULT_WEIGHT = 1 class SkippableScenarioError(ValueError): # noqa: B903 """ Raised when a Scenario cannot be created from the provided input path. If related to the creation of a Scenario B inside a larger Scenario A (i.e. B would be in A.children), A catches this exception, logs a warning, and moves on to the next potential child. """ def __init__(self, scenario_path: Path, reason: Union[Exception, str]) -> None: self.path = scenario_path self.reason = reason class DanglingWeightError(SkippableScenarioError): """ Raised when a scenario directory contains weight files that don't correspond to any scenario. """ pass class CollidingScenariosError(SkippableScenarioError): """ Raised when scenarios created from different paths end up having the same name. The only way this happens is if the paths are identical save for the extension (e.g. ".har" vs ".json"), or if there is a bug (collision) in transformer.naming.to_identifier (which should never happen). """ pass class WeightValueError(ValueError): # noqa: B903 """ Raised when the weight file associated to a scenario contains errors. """ def __init__(self, scenario_path: Path, reason: Union[Exception, str]) -> None: self.path = scenario_path self.reason = reason class Scenario(NamedTuple): """ A user's web session that we want to emulate, i.e. a sequence of tasks to be performed in order. """ name: str children: Sequence[Union[Task, Task2, "Scenario"]] origin: Optional[Path] weight: int = 1 @classmethod def from_path( cls, path: Path, plugins: Sequence[OnTaskSequence] = (), short_name: bool = False, ) -> "Scenario": """ Makes a Scenario (possibly containing sub-scenarios) out of the provided path, which may point to either: - a HAR file (x/y/z.har), - a scenario directory (a directory containing HAR files or other scenario directories). :raise SkippableScenarioError: if path is neither a directory nor a HAR file, or is a directory containing dangling weight files :param short_name: whether the returned scenarios have names based only on their path's basename, instead of the full path. By default False to avoid generating homonym scenarios, but True when generating sub-scenarios (children) from a directory path (because then the names are "scoped" by the parent directory). """ if path.is_dir(): return cls.from_dir(path, plugins, short_name=short_name) else: return cls.from_har_file(path, plugins, short_name=short_name) @classmethod def from_dir( cls, path: Path, plugins: Sequence[OnTaskSequence], short_name: bool ) -> "Scenario": """ Makes a Scenario out of the provided directory path. The directory must be a "scenario directory", which means that it must contain at least one HAR file or another scenario directory. Symbolic link loops are not checked but forbidden! There may exist a weight file .weight. If so, its contents will be used as weight for the Scenario by calling weight_from_path. Errors are handled this way: 1. If path itself cannot be transformed into a scenario, raise SkippableScenarioError. 2. For each child of path, apply (1) but catch the exception and display a warning about skipping this child. (If all children are skipped, (1) applies to path itself.) Therefore: - If the directory contains weight files that don't match any HAR file or subdirectory, an error will be emitted as this is probably a mistake. - If the directory contains files or directory that cannot be converted into scenarios (e.g. non-JSON files or .git directories), a message is emitted and the file or subdirectory is skipped. :raise SkippableScenarioError: if the directory contains dangling weight files or no sub-scenarios. """ try: children = list(path.iterdir()) except OSError as err: raise SkippableScenarioError(path, err) weight_files: Set[Path] = { child for child in children if child.suffix == WEIGHT_FILE_SUFFIX } scenarios: List[Scenario] = [] for child in children: if child in weight_files: continue try: scenario = cls.from_path(child, plugins, short_name=True) except SkippableScenarioError as err: logging.warning( "while searching for HAR files, skipping %s: %s", child, err.reason ) else: scenarios.append(scenario) cls._check_dangling_weights(path, scenarios, weight_files) if not scenarios: raise SkippableScenarioError(path, "no scenarios inside the directory") cls._check_name_collisions(path, scenarios) return Scenario( name=to_identifier(path.with_suffix("").name if short_name else str(path)), children=tuple(scenarios), origin=path, weight=cls.weight_from_path(path), ) @classmethod def _check_name_collisions(cls, path: Path, scenarios: List["Scenario"]): scenarios_by_name: Dict[str, List[Scenario]] = defaultdict(list) for s in scenarios: scenarios_by_name[s.name].append(s) colliding_paths: Set[Tuple[Path, ...]] = { tuple(x.origin for x in xs) for xs in scenarios_by_name.values() if len(xs) > 1 } if colliding_paths: groups = "; ".join( " vs ".join(repr(s.name) for s in group) for group in colliding_paths ) logging.error( "%s contains scenarios with colliding names: %s", path, groups ) raise CollidingScenariosError(path, "scenarios have colliding names") @classmethod def _check_dangling_weights(cls, path, scenarios, weight_files): scenario_names = {s.origin.with_suffix("").name for s in scenarios} dangling_weight_files = [ f for f in weight_files if f.with_suffix("").name not in scenario_names ] if dangling_weight_files: hint = ", ".join(str(f) for f in dangling_weight_files) logging.error( "%s contains weight files that don't correspond to any scenarios: %s", path, hint, ) logging.info( "For any value of X, if there exists a weight file X.weight, " "there must exist either an X.har file or an X scenario subdirectory." ) raise DanglingWeightError(path, "contains dangling weight files") @classmethod def from_har_file( cls, path: Path, plugins: Sequence[OnTaskSequence], short_name: bool ) -> "Scenario": """ Creates a Scenario given a HAR file. :raise SkippableScenarioError: if path is unreadable or not a HAR file """ try: with path.open() as file: har = json.load(file) requests = Request.all_from_har(har) tasks = list(Task.from_requests(requests)) for plugin in plugins: tasks = plugin(tasks) return Scenario( name=to_identifier( path.with_suffix("").name if short_name else str(path) ), children=tuple(tasks), origin=path, weight=cls.weight_from_path(path), ) except (OSError, json.JSONDecodeError, UnicodeDecodeError) as err: raise SkippableScenarioError(path, err) @classmethod def weight_from_path(cls, path: Path) -> int: """ Reads the weight file corresponding to path, or returns a default weight if the weight file doesn't exist. :param path: represents either a HAR file or a scenario directory :raise WeightValueError: if the weight file exists but its contents cannot be interpreted as a weight """ weight_path = path.with_suffix(WEIGHT_FILE_SUFFIX) try: weight = weight_path.read_text().strip() except OSError as err: logging.info( f"No {weight_path} provided for {path}: " f"assigning default weight {DEFAULT_WEIGHT} ({err})" ) return DEFAULT_WEIGHT if not weight.isdecimal() or int(weight) == 0: logging.error( f"invalid weight file %s: weights must be positive integers, got %r", weight_path, weight, ) raise WeightValueError(path, weight) return int(weight) @property def global_code_blocks(self) -> Mapping[str, Sequence[str]]: # TODO: Replace me with a plugin framework that accesses the full tree. # See https://github.com/zalando-incubator/Transformer/issues/11. return { block_name: block_lines for child in self.children for block_name, block_lines in child.global_code_blocks.items() } PK!{5transformer/task.py# -*- coding: utf-8 -*- """ A representation of a Locust Task. """ import json from types import MappingProxyType from typing import Iterable, NamedTuple, Iterator, Sequence, Optional, Mapping, List import transformer.python as py from transformer.blacklist import on_blacklist from transformer.helpers import zip_kv_pairs from transformer.request import HttpMethod, Request, QueryPair IMMUTABLE_EMPTY_DICT = MappingProxyType({}) TIMEOUT = 30 ACTION_INDENTATION_LEVEL = 12 class LocustRequest(NamedTuple): """ All parameters for the request performed by the Locust client object. """ method: HttpMethod url: str headers: Mapping[str, str] post_data: dict = MappingProxyType({}) query: Sequence[QueryPair] = () @classmethod def from_request(cls, r: Request) -> "LocustRequest": return LocustRequest( method=r.method, url=repr(r.url.geturl()), headers=zip_kv_pairs(r.headers), post_data=r.post_data, query=r.query, ) NOOP_HTTP_METHODS = {HttpMethod.GET, HttpMethod.OPTIONS, HttpMethod.DELETE} def as_locust_action(self) -> str: args = { "url": self.url, "name": self.url, "headers": self.headers, "timeout": TIMEOUT, "allow_redirects": False, } if self.method is HttpMethod.POST: post_data = _parse_post_data(self.post_data) args[post_data["key"]] = post_data["data"] elif self.method is HttpMethod.PUT: post_data = _parse_post_data(self.post_data) args["params"] = zip_kv_pairs(self.query) args[post_data["key"]] = post_data["data"] elif self.method not in self.NOOP_HTTP_METHODS: raise ValueError(f"unsupported HTTP method: {self.method!r}") method = self.method.name.lower() named_args = ", ".join(f"{k}={v}" for k, v in args.items()) return f"response = self.client.{method}({named_args})" class Task2: def __init__( self, name: str, request: Request, statements: Sequence[py.Statement] = (), # TODO: Replace me with a plugin framework that accesses the full tree. # See https://github.com/zalando-incubator/Transformer/issues/11. global_code_blocks: Mapping[str, Sequence[str]] = IMMUTABLE_EMPTY_DICT, ) -> None: self.name = name self.request = request self.statements = list(statements) self.global_code_blocks = {k: list(v) for k, v in global_code_blocks.items()} @classmethod def from_requests(cls, requests: Iterable[Request]) -> Iterator["Task2"]: """ Generates a set of tasks from a given set of HTTP requests. Each request will be turned into an unevaluated function call making the actual request. The returned tasks are ordered by increasing timestamp of the corresponding request. """ # TODO: Update me when merging Task with Task2: "statements" needs to # contain the equivalent of LocustRequest. # See https://github.com/zalando-incubator/Transformer/issues/11. for req in sorted(requests, key=lambda r: r.timestamp): if not on_blacklist(req.url.netloc): yield cls(name=req.task_name(), request=req, statements=...) @classmethod def from_task(cls, task: "Task") -> "Task2": # TODO: Remove me as soon as the old Task is no longer used and Task2 is # renamed to Task. # See https://github.com/zalando-incubator/Transformer/issues/11. locust_request = task.locust_request if locust_request is None: locust_request = LocustRequest.from_request(task.request) return cls( name=task.name, request=task.request, statements=[ py.OpaqueBlock(block) for block in [ *task.locust_preprocessing, locust_request.as_locust_action(), *task.locust_postprocessing, ] ], ) class Task(NamedTuple): """ One step of "doing something" on a website. This basically represents a @task in Locust-speak. """ name: str request: Request locust_request: Optional[LocustRequest] = None locust_preprocessing: Sequence[str] = () locust_postprocessing: Sequence[str] = () global_code_blocks: Mapping[str, Sequence[str]] = MappingProxyType({}) @classmethod def from_requests(cls, requests: Iterable[Request]) -> Iterator["Task"]: """ Generates a set of Tasks from a given set of Requests. """ for req in sorted(requests, key=lambda r: r.timestamp): if on_blacklist(req.url.netloc): continue else: yield cls(name=req.task_name(), request=req) def as_locust_action(self, indentation=ACTION_INDENTATION_LEVEL) -> str: """ Converts a Task into a Locust Action. """ action: List[str] = [] for preprocessing in self.locust_preprocessing: action.append(_indent(preprocessing, indentation)) if self.locust_request is None: locust_request = LocustRequest.from_request(self.request) else: locust_request = self.locust_request action.append(locust_request.as_locust_action()) for postprocessing in self.locust_postprocessing: action.append(_indent(postprocessing, indentation)) return "\n".join(action) def inject_headers(self, headers: dict): if self.locust_request is None: original_locust_request = LocustRequest.from_request(self.request) else: original_locust_request = self.locust_request new_locust_request = original_locust_request._replace( headers={**original_locust_request.headers, **headers} ) task = self._replace(locust_request=new_locust_request) return task def replace_url(self, url: str): if self.locust_request is None: original_locust_request = LocustRequest.from_request(self.request) else: original_locust_request = self.locust_request new_locust_request = original_locust_request._replace(url=url) return self._replace(locust_request=new_locust_request) def _indent(input_string: str, requested_indentation: int) -> str: output_string = "" indentation = requested_indentation initial_leading_spaces = 0 for i, line in enumerate(input_string.splitlines()): leading_spaces = len(line) - len(line.lstrip()) if leading_spaces > 0: # We need to check the indentation of the second line in order to # account for the case where the existing indentation is greater than # the requested; it is used for reapplying sub-level-indentation e.g. # to if statements. if i == 1: initial_leading_spaces = leading_spaces else: indentation = requested_indentation + ( leading_spaces - initial_leading_spaces ) line = line.lstrip() output_string += line.rjust(len(line) + indentation, " ") + "\n" return output_string def _parse_post_data(post_data: dict) -> dict: data = post_data.get("text") mime: str = post_data.get("mimeType") if mime == "application/json": key = "json" # Workaround for bug in chrome-har: # https://github.com/sitespeedio/chrome-har/issues/23 # TODO: Remove once bug fixed. if data is None: params = post_data.get("params") if params is None: data = "" else: data = {} for param in params: data[param.get("name")] = param.get("value") else: data = json.loads(data) else: key = "data" if data: data = data.encode() return {"key": key, "data": data} PK!M~CCtransformer/test_blacklist.py# pylint: skip-file import io import os import logging from unittest.mock import patch from transformer.blacklist import on_blacklist class TestBlacklist: @patch("builtins.open") def test_it_returns_false_and_logs_error_if_the_blacklist_does_not_exist( self, mock_open, caplog ): mock_open.side_effect = FileNotFoundError caplog.set_level(logging.DEBUG) assert on_blacklist("") is False assert f"Could not read blacklist file {os.getcwd()}/.urlignore" in caplog.text @patch("builtins.open") def test_it_returns_false_if_the_blacklist_is_empty(self, mock_open): mock_open.return_value = io.StringIO("") assert on_blacklist("") is False @patch("builtins.open") def test_it_returns_false_if_url_is_not_on_blacklist(self, mock_open): mock_open.return_value = io.StringIO("www.amazon.com") assert on_blacklist("www.zalando.de") is False @patch("builtins.open") def test_it_returns_true_if_url_is_on_blacklist(self, mock_open): mock_open.return_value = io.StringIO("www.google.com\nwww.amazon.com") assert on_blacklist("www.amazon.com") is True @patch("builtins.open") def test_it_returns_true_if_a_partial_match_is_found(self, mock_open): mock_open.return_value = io.StringIO("www.amazon.com") assert on_blacklist("http://www.amazon.com/") is True @patch("builtins.open") def test_it_ignores_empty_lines(self, mock_open): mock_open.return_value = io.StringIO("\nwww.amazon.com") assert on_blacklist("www.zalando.de") is False PK!؏.transformer/test_cli.pyfrom pathlib import Path from .cli import read_config class TestReadConfig: def test_paths_from_env(self, monkeypatch): monkeypatch.setenv("TRANSFORMER_INPUT_PATHS", """["/x/y", "a/b"]""") conf = read_config([]) assert conf.input_paths == (Path("/x/y"), Path("a/b")) def test_paths_from_cli(self): conf = read_config(["/x/y", "a/b"]) assert conf.input_paths == (Path("/x/y"), Path("a/b")) def test_paths_from_cli_overwrite_those_from_env(self, monkeypatch): monkeypatch.setenv("TRANSFORMER_INPUT_PATHS", """["/x/y", "a/b"]""") conf = read_config(["u/v/w"]) assert conf.input_paths == (Path("u/v/w"),) def test_plugins_from_env(self, monkeypatch): monkeypatch.setenv("TRANSFORMER_PLUGINS", """["a", "b.c.d"]""") conf = read_config([]) assert conf.plugins == ("a", "b.c.d") def test_plugins_from_cli(self): conf = read_config(["-p", "a", "XXX", "--plugin", "b.c.d"]) assert conf.plugins == ("a", "b.c.d") def test_merge_plugins_from_env_and_cli(self, monkeypatch): monkeypatch.setenv("TRANSFORMER_PLUGINS", """["a", "b.c.d"]""") conf = read_config(["-p", "e.f", "XXX", "--plugin", "g"]) assert conf.plugins == ("a", "b.c.d", "e.f", "g") PK!]JK K transformer/test_decision.pyfrom typing import List import pytest from hypothesis import given, assume from hypothesis.strategies import booleans, lists from .builders_decision import reasons, decisions, yes_decisions, no_decisions from .decision import Decision @given(decisions) def test_bool_relies_on_value_only(d: Decision): assert bool(d) == d.valid @given(decisions, decisions) def test_equality_relies_on_value_only(a: Decision, b: Decision): assert (a == b) == (a.valid == b.valid) class TestYes: def test_is_true(self): assert Decision.yes().valid is True def test_without_argument_has_dummy_reason(self): assert Decision.yes().reason.lower() == "ok" @given(reasons) def test_with_argument_records_reason(self, r: str): assert Decision.yes(r).reason == r class TestNo: @given(reasons) def test_is_false_regardless_of_reason(self, r: str): assert Decision.no(r).valid is False def test_without_argument_raises_error(self): with pytest.raises(Exception): Decision.no() @given(reasons) def test_reason_is_recorded_and_accessible(self, r: str): assert Decision.no(r).reason == r class TestWhether: @given(decisions, reasons) def test_wrapper_propagates_wrapped_value(self, d: Decision, r: str): assert Decision.whether(d, r).valid == d.valid @given(yes_decisions, reasons) def test_reuses_wrapped_reason_when_true(self, y: Decision, r: str): assert Decision.whether(y, r).reason == y.reason @given(no_decisions, reasons) def test_enriches_wrapped_reason_when_false(self, n: Decision, r: str): assert Decision.whether(n, r).reason == f"{r}: {n.reason}" @given(booleans()) def test_accepts_raw_bool(self, b: bool): d = Decision.whether(b, "x") assert d.valid == b assert d.reason == "x", "reason is always recorded" class TestAll: @given(lists(booleans())) def test_behaves_like_builtin(self, bs: List[bool]): bs_as_decisions = (Decision(valid=b, reason="") for b in bs) assert all(bs) == bool(Decision.all(bs_as_decisions)) def test_reuse_first_wrapped_no_reason(self): y = Decision.yes() n1 = Decision.no("n1") n2 = Decision.no("n2") assert Decision.all((y, n1, y, n2, y)) == n1 class TestAny: @given(lists(booleans())) def test_behaves_like_builtin(self, bs: List[bool]): bs_as_decisions = (Decision(valid=b, reason="") for b in bs) assert any(bs) == bool(Decision.any(bs_as_decisions)) def test_reuse_first_wrapped_yes_reason(self): y1 = Decision.yes("y1") y2 = Decision.yes("y2") n = Decision.no("n") assert Decision.any((n, y1, n, y2, n)) == y1 @given(lists(no_decisions, max_size=5)) def test_reason_lists_invalid_cases_if_5_or_less(self, ds: List[Decision]): d = Decision.any(ds) assert str([x.reason for x in ds]) in d.reason def test_reason_mentions_number_of_invalid_cases_if_more_than_5(self): nb_cases = 10 ds = [Decision.no("X") for _ in range(nb_cases)] d = Decision.any(ds) assert str(nb_cases) in d.reason @given(decisions, reasons) def test_provided_reason_prefixes_default_message(self, d: Decision, r: str): assume(r) assert Decision.any([d], r).reason == f"{r}: {Decision.any([d]).reason}" PK! كsvvtransformer/test_helpers.pyfrom transformer.helpers import zip_kv_pairs from transformer.request import Header class TestZipKVPairs: def test_it_returns_a_dict_given_a_list_of_named_tuples(self): name = "some name" value = "some value" result = zip_kv_pairs([Header(name=name, value=value)]) assert isinstance(result, dict) assert result[name] == value PK!2hw transformer/test_locust.pyimport string from typing import cast from unittest.mock import MagicMock import pytest from transformer.locust import locustfile, locust_taskset from transformer.request import HttpMethod from transformer.scenario import Scenario from transformer.task import Task, TIMEOUT class TestLocustfile: def test_it_renders_a_locustfile_template(self): a_name = "some_task" a_request = MagicMock() a_request.method = HttpMethod.GET a_request.url.scheme = "some_scheme" a_request.url.hostname = "some_hostname" a_request.url.path = "some_path" a_request.url.geturl() a_request.url.geturl.return_value = "some_url" task = Task(a_name, a_request) scenario = Scenario(name="SomeScenario", children=[task], origin=None) scenario_group = Scenario( name="ScenarioGroup", children=[scenario], weight=2, origin=None ) script = locustfile([scenario_group]) expected = string.Template( """ # File automatically generated by Transformer: # https://github.bus.zalan.do/TIP/transformer import re from locust import HttpLocust from locust import TaskSequence from locust import TaskSet from locust import seq_task from locust import task class ScenarioGroup(TaskSet): @task(1) class SomeScenario(TaskSequence): @seq_task(1) def some_task(self): response = self.client.get(url='some_url', name='some_url', headers={}, timeout=$TIMEOUT, allow_redirects=False) class LocustForScenarioGroup(HttpLocust): task_set = ScenarioGroup weight = 2 min_wait = 0 max_wait = 10 """ ).safe_substitute({"TIMEOUT": TIMEOUT}) assert expected.strip() == script.strip() def test_generates_passed_global_code_blocks(): sg1 = Scenario( "sg1", children=[ MagicMock( spec_set=Scenario, children=[], global_code_blocks={"b1": ["ab"]} ), MagicMock( spec_set=Scenario, children=[], global_code_blocks={"b2": ["cd"]} ), ], origin=None, ) sg2 = Scenario( "sg2", children=[MagicMock(spec_set=Scenario, children=[], global_code_blocks={})], origin=None, ) sg3 = Scenario( "sg3", children=[ MagicMock( spec_set=Scenario, children=[], global_code_blocks={"b3": ["yz"], "b2": ["yyy", "zzz"]}, ) ], origin=None, ) code = locustfile([sg1, sg2, sg3]) assert code.endswith( "\n# b1\nab\n# b2\nyyy\nzzz\n# b3\nyz" ), "the latter b2 block should override the former" def test_locust_taskset_raises_on_malformed_scenario(): bad_child = cast(Scenario, 7) bad_scenario = Scenario(name="x", children=[bad_child], origin=None) with pytest.raises(TypeError, match=r"unexpected type .*\bchildren"): locust_taskset(bad_scenario) PK!transformer/test_naming.pyimport re from hypothesis import given, example, assume from hypothesis.strategies import text, from_regex from transformer.naming import to_identifier DIGITS_SUFFIX_RX = re.compile(r"_[0-9]+\Z") class TestToIdentifier: @given(text(min_size=1, max_size=3)) @example("0") def test_its_output_can_always_be_used_as_python_identifier(self, s: str): exec(f"{to_identifier(s)} = 2") @given(text(), text()) @example("x y", to_identifier("x y")) def test_it_has_no_collisions(self, a: str, b: str): assert a == b or to_identifier(a) != to_identifier(b) @given(from_regex(re.compile(r"[a-z_][a-z0-9_]*", re.IGNORECASE), fullmatch=True)) def test_it_does_not_add_suffix_when_not_necessary(self, input: str): assume(not DIGITS_SUFFIX_RX.search(input)) assert to_identifier(input) == input def test_it_adds_prefix_to_inputs_starting_with_digit(self): assert to_identifier("0").startswith("_") PK!3λ||transformer/test_python.pyimport pprint import string from typing import List from unittest.mock import patch import pytest from hypothesis import given from hypothesis.strategies import ( lists, text, integers, floats, one_of, booleans, none, dictionaries, ) import transformer.python as py from transformer.builders_python import ( indent_levels, ascii_inline_text, lines, opaque_blocks, statements, expressions, functions, decorations, classes, standalones, literals, function_calls, binary_ops, symbols, assignments, ifelses, imports, ) class TestLine: def test_str_is_identity_with_indent_level_zero(self): assert str(py.Line("abc", 0)) == "abc" def test_default_indent_level_is_zero(self): assert str(py.Line("abc")) == "abc" def test_default_indent_unit_is_four_spaces(self): assert py.Line.INDENT_UNIT == " " def test_str_indents_as_much_indent_units_as_provided_indent_level(self): assert ( str(py.Line("abc", 1)) == py.Line.INDENT_UNIT + "abc" ), "indent level 1 means one time INDENT_UNIT" assert str(py.Line("x", 2)) == py.Line.INDENT_UNIT * 2 + "x" @given(lines, lines) def test_equal_iff_text_and_indent_are_equal(self, a: py.Line, b: py.Line): assert a != b or (a.text == b.text and a.indent_level == b.indent_level) def test_repr(self): line = py.Line(text="a'\" b", indent_level=3) assert repr(line) == "Line(text='a\\'\" b', indent_level=3)" class TestStatement: def test_lines_must_be_implemented(self): with pytest.raises(NotImplementedError): py.Statement().lines() @given(statements, indent_levels, lists(ascii_inline_text(min_size=1), max_size=2)) def test_comment_lines(self, stmt: py.Statement, level: int, comments: List[str]): stmt.comments = comments x = py.Line.INDENT_UNIT assert [str(l) for l in stmt.comment_lines(level)] == [ x * level + f"# {line}" for line in comments ] @given(statements, lines) def test_attach_comment_without_comment_changes_nothing( self, stmt: py.Statement, line: py.Line ): clone = line.clone() stmt.comments.clear() # Some comments may have been generated by Hypothesis. lines = stmt.attach_comment(line) assert len(lines) == 1, "no additional lines must be created" l = lines[0] assert l is line, "the same Line object must be returned" assert l.text == clone.text, "Line.text must not change" assert l.indent_level == clone.indent_level, "Line.indent_level must not change" @given(statements, lines, ascii_inline_text(min_size=1)) def test_attach_comment_with_one_line_comment_changes_only_text( self, stmt: py.Statement, line: py.Line, comment: str ): clone = line.clone() stmt.comments = [comment] lines = stmt.attach_comment(line) assert len(lines) == 1, "no additional lines must be created" l = lines[0] assert l is line, "the same Line object must be returned" assert ( l.text == f"{clone.text} # {comment}" ), "the comment is appended to Line.text" assert l.indent_level == clone.indent_level, "Line.indent_level must not change" @given(statements, lines, lists(ascii_inline_text(min_size=1), min_size=2)) def test_attach_comment_with_multiline_comment_adds_lines_above( self, stmt: py.Statement, line: py.Line, comments: List[str] ): clone = line.clone() stmt.comments = comments lines = stmt.attach_comment(line) assert len(lines) == 1 + len(comments) l = lines[-1] assert l is line, "the same Line object must be returned last" assert l.text == clone.text, "Line.text must not change" assert l.indent_level == clone.indent_level, "Line.indent_level must not change" assert lines[:-1] == [ py.Line(f"# {s}", clone.indent_level) for s in comments ], "all lines but the last are standalone comment lines" class TestOpaqueBlock: @given(opaque_blocks, opaque_blocks) def test_equal_iff_components_are_equal(self, a: py.OpaqueBlock, b: py.OpaqueBlock): assert a != b or (a.block == b.block and a.comments == b.comments) @given(text(string.whitespace, max_size=5)) def test_lines_raises_for_empty_input_block(self, block: str): with pytest.raises(ValueError): py.OpaqueBlock(block) def test_lines_returns_block_lines_if_top_and_bottom_are_not_empty(self): ob = py.OpaqueBlock(" a\n b\n\n\n c") assert len(ob.lines()) == 5 def test_lines_returns_block_lines_without_empty_top_and_bottom(self): ob = py.OpaqueBlock("\n\n a\n b\n\n\n c\n\n\n") assert len(ob.lines()) == 5 X = py.Line.INDENT_UNIT @pytest.mark.parametrize( "input_block, indent_level, expected", [ ("x", 0, "x"), ("x", 1, " x"), (" x", 1, " x"), (" x", 2, " x"), (" x", 0, "x"), ("x\nx", 0, "x\nx"), ("x\nx", 1, " x\n x"), (" x\n x", 1, " x\n x"), ("x\n x", 1, " x\n x"), ("x\nx\n", 1, " x\n x"), (" x\n x\n", 1, " x\n x"), ("x\n x\n", 1, " x\n x"), ("x\n x\n x", 0, "x\n x\n x"), ("x\n x\n x", 1, " x\n x\n x"), ("\nx\n x", 0, "x\n x"), ("\nx\n x", 1, " x\n x"), ("\n x\n x", 0, "x\n x"), ("\n x\n x", 1, " x\n x"), ("\tx\n\t x", 1, " x\n x"), ("\tx\n\t\tx", 1, f" x\n {' ' * py.OpaqueBlock.TAB_SIZE}x"), ], ) def test_lines_indents_correctly( self, input_block: str, indent_level: int, expected: str ): lines = py.OpaqueBlock(input_block).lines(indent_level) print("lines =") pprint.pprint(lines) with patch("transformer.python.Line.INDENT_UNIT", " "): assert "\n".join(str(line) for line in lines) == expected @given(opaque_blocks, indent_levels, ascii_inline_text(min_size=1)) def test_lines_displays_comment_always_above( self, ob: py.OpaqueBlock, level: int, comment: str ): x = py.Line.INDENT_UNIT ob.comments = [comment] assert [str(l) for l in ob.lines(level)] == [ x * level + f"# {comment}", *[str(l) for l in ob.lines(level, comments=False)], ] @given(indent_levels) def test_lines_with_hidden_comments(self, level: int): x = py.Line.INDENT_UNIT ob = py.OpaqueBlock("hello", comments=["1", "2"]) assert [str(l) for l in ob.lines(level, comments=False)] == [ x * level + "hello" ] def test_repr(self): text = " a'\" b " assert ( repr(py.OpaqueBlock(block=text, comments=["hi"])) == "OpaqueBlock(' a\\'\" b ', comments=['hi'])" ) class TestFunction: @given(indent_levels) def test_lines_with_no_params_and_no_body(self, level: int): f = py.Function("f", params=[], statements=[]) x = py.Line.INDENT_UNIT assert [str(l) for l in f.lines(level)] == [ x * level + "def f():", x * (level + 1) + "pass", ] @given(indent_levels) def test_lines_with_simple_body(self, level: int): f = py.Function( "f", params=[], statements=[py.OpaqueBlock("print('Hello!')"), py.OpaqueBlock("return")], ) x = py.Line.INDENT_UNIT assert [str(l) for l in f.lines(level)] == [ x * level + "def f():", x * (level + 1) + "print('Hello!')", x * (level + 1) + "return", ] def test_lines_with_simple_params(self): f = py.Function("f", params=["x", "y"], statements=[]) assert [str(l) for l in f.lines()] == [ "def f(x, y):", py.Line.INDENT_UNIT + "pass", ] def test_lines_with_complex_params(self): f = py.Function( "f", params=["x: int", "abc: bool = True", "*z: str"], statements=[] ) assert [str(l) for l in f.lines()] == [ "def f(x: int, abc: bool = True, *z: str):", py.Line.INDENT_UNIT + "pass", ] @given(indent_levels) def test_lines_with_nested_body(self, level: int): x = py.Line.INDENT_UNIT f = py.Function( "func", params=[], statements=[ py.Assignment("a", py.Literal(2)), py.IfElse( [(py.Literal(True), [py.Assignment("b", py.Literal(3))])], [ py.Assignment("b", py.Literal(4)), py.Assignment("c", py.Literal(1)), ], ), ], ) assert [str(l) for l in f.lines(level)] == [ x * level + "def func():", x * (level + 1) + "a = 2", x * (level + 1) + "if True:", x * (level + 2) + "b = 3", x * (level + 1) + "else:", x * (level + 2) + "b = 4", x * (level + 2) + "c = 1", ] @given(indent_levels) def test_lines_with_comments(self, level: int): stmt = py.OpaqueBlock("foo", comments=["x"]) f = py.Function("f", params=[], statements=[stmt], comments=["1", "2"]) x = py.Line.INDENT_UNIT assert [str(l) for l in f.lines(level)] == [ x * level + "# 1", x * level + "# 2", x * level + "def f():", *[str(l) for l in stmt.lines(level + 1)], ] @given(indent_levels) def test_lines_with_hidden_comments(self, level: int): stmt = py.OpaqueBlock("foo", comments=["x"]) f = py.Function("f", params=[], statements=[stmt], comments=["1", "2"]) x = py.Line.INDENT_UNIT assert [str(l) for l in f.lines(level, comments=False)] == [ x * level + "def f():", *[str(l) for l in stmt.lines(level + 1, comments=False)], ] def test_repr(self): stmts = [py.OpaqueBlock("raise")] assert ( repr(py.Function(name="f", params=["a"], statements=stmts, comments=["hi"])) == f"Function(name='f', params=['a'], statements={stmts!r}, comments=['hi'])" ) @given(functions, functions) def test_equal_iff_components_are_equal(self, a: py.Function, b: py.Function): assert a != b or ( a.name == b.name and a.params == b.params and a.statements == b.statements and a.comments == b.comments ) class TestDecoration: @given(decorations, decorations) def test_equal_iff_components_are_equal(self, a: py.Decoration, b: py.Decoration): assert a != b or ( a.decorator == b.decorator and a.target == b.target and a.comments == b.comments ) @given(indent_levels) def test_with_a_function(self, level: int): f = py.Function("f", params=[], statements=[py.Assignment("a", py.Symbol("f"))]) d = py.Decoration("task(2)", f) x = py.Line.INDENT_UNIT assert [str(l) for l in d.lines(level)] == [ x * level + "@task(2)", *[str(l) for l in f.lines(level)], ] @given(indent_levels) def test_with_a_class(self, level: int): c = py.Class( "C", superclasses=[], statements=[py.Assignment("a: int", py.Literal(1))] ) d = py.Decoration("task", c) x = py.Line.INDENT_UNIT assert [str(l) for l in d.lines(level)] == [ x * level + "@task", *[str(l) for l in c.lines(level)], ] @given(indent_levels) def test_nested_decorators(self, level: int): f = py.Function("f", params=[], statements=[py.Assignment("a", py.Symbol("f"))]) first = py.Decoration("task(2)", f) second = py.Decoration("task_seq(1)", first) x = py.Line.INDENT_UNIT assert [str(l) for l in second.lines(level)] == [ x * level + "@task_seq(1)", x * level + "@task(2)", *[str(l) for l in f.lines(level)], ] @given(indent_levels) def test_lines_with_comments(self, level: int): f = py.Function("f", params=[], statements=[], comments=["1", "2"]) d = py.Decoration("task", f, comments=["x", "y"]) x = py.Line.INDENT_UNIT assert [str(l) for l in d.lines(level)] == [ x * level + "# x", x * level + "# y", x * level + "@task", x * level + "# 1", x * level + "# 2", x * level + "def f():", x * (level + 1) + "pass", ] @given(indent_levels) def test_lines_with_hidden_comments(self, level: int): f = py.Function("f", params=[], statements=[], comments=["1", "2"]) d = py.Decoration("task", f, comments=["x", "y"]) x = py.Line.INDENT_UNIT assert [str(l) for l in d.lines(level, comments=False)] == [ x * level + "@task", x * level + "def f():", x * (level + 1) + "pass", ] def test_repr(self): f = py.Function("f", params=[], statements=[]) assert ( repr(py.Decoration("task", f, comments=["hi"])) == f"Decoration('task', {f!r}, comments=['hi'])" ) class TestClass: @given(classes, classes) def test_equal_iff_components_are_equal(self, a: py.Class, b: py.Class): assert a != b or ( a.name == b.name and a.statements == b.statements and a.superclasses == b.superclasses and a.comments == b.comments ) @given(indent_levels) def test_empty_class(self, level: int): c = py.Class("A", statements=[]) x = py.Line.INDENT_UNIT assert [str(l) for l in c.lines(level)] == [ x * level + "class A:", x * (level + 1) + "pass", ] @given( lists( text(string.ascii_letters, min_size=1, max_size=2), min_size=0, max_size=3 ) ) def test_class_with_superclasses(self, names: List[str]): c = py.Class("A", statements=[], superclasses=names) x = py.Line.INDENT_UNIT if names: expected = "(" + ", ".join(names) + ")" else: expected = "" assert [str(l) for l in c.lines()] == [f"class A{expected}:", x + "pass"] @given(indent_levels) def test_class_with_fields(self, level: int): c = py.Class( "A", statements=[ py.Assignment("a", py.Literal(2)), py.Assignment("b", py.Literal(3)), ], ) x = py.Line.INDENT_UNIT assert [str(l) for l in c.lines(level)] == [ x * level + "class A:", x * (level + 1) + "a = 2", x * (level + 1) + "b = 3", ] @given(indent_levels) def test_lines_with_comments(self, level: int): stmt = py.OpaqueBlock("foo", comments=["x"]) c = py.Class("C", statements=[stmt], superclasses=[], comments=["1", "2"]) x = py.Line.INDENT_UNIT assert [str(l) for l in c.lines(level)] == [ x * level + "# 1", x * level + "# 2", x * level + "class C:", *[str(l) for l in stmt.lines(level + 1)], ] @given(indent_levels) def test_lines_with_hidden_comments(self, level: int): stmt = py.OpaqueBlock("foo", comments=["x"]) c = py.Class("C", statements=[stmt], superclasses=[], comments=["1", "2"]) x = py.Line.INDENT_UNIT assert [str(l) for l in c.lines(level, comments=False)] == [ x * level + "class C:", *[str(l) for l in stmt.lines(level + 1, comments=False)], ] def test_repr(self): stmts = [py.OpaqueBlock("raise")] assert ( repr( py.Class( name="C", statements=stmts, superclasses=["A"], comments=["hi"] ) ) == f"Class(name='C', statements={stmts!r}, superclasses=['A'], comments=['hi'])" ) class TestExpression: def test_str_must_be_implemented(self): with pytest.raises(NotImplementedError): str(py.Expression()) class TestStandalone: @given(standalones, standalones) def test_equal_iff_components_are_equal(self, a: py.Standalone, b: py.Standalone): assert a != b or (a.expr == b.expr and a.comments == b.comments) @given(expressions, indent_levels) def test_lines_returns_the_expression_as_single_line( self, e: py.Expression, level: int ): stmt = py.Standalone(e) lines = stmt.lines(level) assert len(lines) == 1 line = lines[0] assert line.indent_level == level assert line.text == str(e) @given(indent_levels) def test_lines_with_comments(self, level: int): x = py.Line.INDENT_UNIT ob = py.Standalone(py.Symbol("a"), comments=["1", "2"]) assert [str(l) for l in ob.lines(level)] == [ x * level + "# 1", x * level + "# 2", x * level + "a", ] @given(indent_levels) def test_lines_with_hidden_comments(self, level: int): x = py.Line.INDENT_UNIT ob = py.Standalone(py.Symbol("a"), comments=["1", "2"]) assert [str(l) for l in ob.lines(level, comments=False)] == [x * level + "a"] def test_repr(self): expr = py.Symbol("a") assert ( repr(py.Standalone(expr, comments=["hi"])) == f"Standalone({expr!r}, comments=['hi'])" ) class TestLiteral: @given(literals, literals) def test_equal_iff_components_are_equal(self, a: py.Literal, b: py.Literal): assert a != b or a.value == b.value scalars = one_of(none(), booleans(), text(max_size=5), integers(), floats()) @given(scalars) def test_literal_scalar_uses_repr(self, x): assert str(py.Literal(x)) == repr(x) @given(lists(scalars)) def test_literal_list_of_scalars_uses_repr(self, x: list): assert str(py.Literal(x)) == repr(x) @given(dictionaries(scalars, scalars)) def test_literal_dict_of_scalars_uses_repr(self, x: dict): assert str(py.Literal(x)) == repr(x) def test_literal_composites_with_expr_use_repr_except_for_expr(self): lit = py.Literal("b") assert repr(lit) == "Literal('b')", "repr works on literal" assert str(py.Literal([1, {"a": lit}])) == "[1, {'a': 'b'}]" assert repr(lit) == "Literal('b')", "repr is still working on literal" class TestFString: def test_strings_appear_as_fstrings(self): assert str(py.FString("")) == "f''" assert str(py.FString("ab")) == "f'ab'" assert str(py.FString("a'b")) == """ f"a'b" """.strip() assert str(py.FString('a"b')) == """ f'a"b' """.strip() def test_non_strings_raise_error(self): with pytest.raises(TypeError): assert str(py.FString(24)) def test_format_template_is_not_replaced(self): a = 2 assert str(py.FString("a {a} {} {a!r}")) == "f'a {a} {} {a!r}'" class TestSymbol: @given(symbols, symbols) def test_equal_iff_components_are_equal(self, a: py.Symbol, b: py.Symbol): assert a != b or a.name == b.name @given(text(string.ascii_letters)) def test_strings_appear_unchanged(self, s: str): assert str(py.Symbol(s)) == s def test_non_strings_raise_error(self): with pytest.raises(TypeError): assert str(py.Symbol(True)) def test_repr(self): assert repr(py.Symbol(" x'\" y ")) == "Symbol(' x\\'\" y ')" class TestFunctionCall: @given(function_calls, function_calls) def test_equal_iff_components_are_equal( self, a: py.FunctionCall, b: py.FunctionCall ): assert a != b or ( a.name == b.name and a.positional_args == b.positional_args and a.named_args == b.named_args ) def test_with_no_args(self): assert str(py.FunctionCall("f")) == "f()" def test_with_positional_args(self): assert str(py.FunctionCall("f", [py.Literal(2)])) == "f(2)" def test_with_kwargs(self): assert ( str( py.FunctionCall( "f", named_args={"a": py.Literal(2), "bc": py.Literal("x")} ) ) == "f(a=2, bc='x')" ) def test_with_positional_and_kwargs(self): assert ( str( py.FunctionCall( "m.f", [py.Literal(True), py.FunctionCall("g", [py.Symbol("f")])], {"a": py.Literal(2), "bc": py.Literal("x")}, ) ) == "m.f(True, g(f), a=2, bc='x')" ) def test_repr(self): arg = py.Symbol("a") kwarg = py.Symbol("v") assert ( repr(py.FunctionCall("f", [arg], {"k": kwarg})) == f"FunctionCall('f', [{arg!r}], {{'k': {kwarg!r}}})" ) class TestBinaryOp: @given(binary_ops, binary_ops) def test_equal_iff_components_are_equal(self, a: py.BinaryOp, b: py.BinaryOp): assert a != b or (a.op == b.op and a.lhs == b.lhs and a.rhs == b.rhs) def test_simple(self): assert str(py.BinaryOp(py.Literal(2), "**", py.Literal(10))) == "2 ** 10" def test_nested(self): assert ( str( py.BinaryOp( py.Literal(2), "+", py.BinaryOp( py.BinaryOp(py.Literal(3), "-", py.Literal(4)), "*", py.Literal(5), ), ) ) == "2 + ((3 - 4) * 5)" ) class TestAssignment: @given(assignments, assignments) def test_equal_iff_components_are_equal(self, a: py.Assignment, b: py.Assignment): assert a != b or ( a.lhs == b.lhs and a.rhs == b.rhs and a.comments == b.comments ) @given(indent_levels) def test_simple(self, level: int): x = py.Line.INDENT_UNIT assert [str(l) for l in py.Assignment("foo", py.Literal(3)).lines(level)] == [ x * level + "foo = 3" ] @given(indent_levels) def test_lines_with_comments(self, level: int): x = py.Line.INDENT_UNIT stmt = py.Assignment("x", py.Symbol("a"), comments=["1", "2"]) assert [str(l) for l in stmt.lines(level)] == [ x * level + "# 1", x * level + "# 2", x * level + "x = a", ] @given(indent_levels) def test_lines_with_hidden_comments(self, level: int): x = py.Line.INDENT_UNIT stmt = py.Assignment("x", py.Symbol("a"), comments=["1", "2"]) assert [str(l) for l in stmt.lines(level, comments=False)] == [ x * level + "x = a" ] def test_repr(self): rhs = py.Symbol("a") assert ( repr(py.Assignment("x", rhs, comments=["hi"])) == f"Assignment(lhs='x', rhs={rhs!r}, comments=['hi'])" ) class TestIfElse: @given(ifelses, ifelses) def test_equal_iff_components_are_equal(self, a: py.IfElse, b: py.IfElse): assert a != b or ( a.condition_blocks == b.condition_blocks and a.else_block == b.else_block and a.comments == b.comments ) def test_init_with_no_condition_raises_error(self): with pytest.raises(ValueError): py.IfElse([]) with pytest.raises(ValueError): py.IfElse([], else_block=[]) with pytest.raises(ValueError): py.IfElse([], else_block=[py.Assignment("a", py.Literal(2))]) @given(indent_levels) def test_lines_for_single_if(self, level: int): x = py.Line.INDENT_UNIT assert [ str(l) for l in py.IfElse( [ ( py.BinaryOp(py.Symbol("t"), "is", py.Literal(None)), [py.Assignment("t", py.Literal(1))], ) ] ).lines(level) ] == [x * level + "if t is None:", x * (level + 1) + "t = 1"] @given(indent_levels) def test_lines_for_if_else(self, level: int): x = py.Line.INDENT_UNIT assert [ str(l) for l in py.IfElse( [ ( py.BinaryOp(py.Symbol("t"), "is", py.Literal(None)), [py.Assignment("t", py.Literal(1))], ) ], [py.Assignment("t", py.Literal(2))], ).lines(level) ] == [ x * level + "if t is None:", x * (level + 1) + "t = 1", x * level + "else:", x * (level + 1) + "t = 2", ] @given(indent_levels) def test_lines_for_if_elif(self, level: int): x = py.Line.INDENT_UNIT assert [ str(l) for l in py.IfElse( [ ( py.BinaryOp(py.Symbol("t"), "is", py.Literal(None)), [py.Assignment("t", py.Literal(1))], ), ( py.Literal(False), [ py.Assignment("t", py.Literal(2)), py.Standalone(py.FunctionCall("f", [py.Symbol("t")])), ], ), ] ).lines(level) ] == [ x * level + "if t is None:", x * (level + 1) + "t = 1", x * level + "elif False:", x * (level + 1) + "t = 2", x * (level + 1) + "f(t)", ] @given(indent_levels) def test_lines_for_if_elif_else_with_no_statements(self, level: int): x = py.Line.INDENT_UNIT assert [ str(l) for l in py.IfElse( [ (py.BinaryOp(py.Symbol("t"), "is", py.Literal(None)), []), (py.Literal(False), []), (py.Literal(True), []), ], [], ).lines(level) ] == [ x * level + "if t is None:", x * (level + 1) + "pass", x * level + "elif False:", x * (level + 1) + "pass", x * level + "elif True:", x * (level + 1) + "pass", ] @given(indent_levels) def test_lines_with_comments(self, level: int): x = py.Line.INDENT_UNIT cond = py.Literal(True) if_true = py.Assignment("x", py.Symbol("a"), comments=["tx", "ty"]) if_false = py.Assignment("x", py.Symbol("b"), comments=["fx", "fy"]) stmt = py.IfElse([(cond, [if_true])], [if_false], comments=["1", "2"]) assert [str(l) for l in stmt.lines(level)] == [ x * level + "# 1", x * level + "# 2", x * level + "if True:", *[str(l) for l in if_true.lines(level + 1)], x * level + "else:", *[str(l) for l in if_false.lines(level + 1)], ] @given(indent_levels) def test_lines_with_hidden_comments(self, level: int): x = py.Line.INDENT_UNIT cond = py.Literal(True) if_true = py.Assignment("x", py.Symbol("a"), comments=["tx", "ty"]) if_false = py.Assignment("x", py.Symbol("b"), comments=["fx", "fy"]) stmt = py.IfElse([(cond, [if_true])], [if_false], comments=["1", "2"]) assert [str(l) for l in stmt.lines(level, comments=False)] == [ x * level + "if True:", *[str(l) for l in if_true.lines(level + 1, comments=False)], x * level + "else:", *[str(l) for l in if_false.lines(level + 1, comments=False)], ] def test_repr(self): cond = py.Literal(True) if_true = [(cond, [py.Assignment("x", py.Symbol("a"))])] stmt = py.IfElse(condition_blocks=if_true, comments=["hi"]) assert ( repr(stmt) == f"IfElse(condition_blocks={if_true!r}, else_block=None, comments=['hi'])" ) class TestImport: @given(imports, imports) def test_equal_iff_components_are_equal(self, a: py.Import, b: py.Import): assert a != b or ( a.targets == b.targets and a.source == b.source and a.alias == b.alias and a.comments == b.comments ) def test_init_without_targets_raises_error(self): with pytest.raises(ValueError): py.Import([]) @given(indent_levels) def test_lines_without_targets_raises_error(self, level: int): i = py.Import(["safe"]) i.targets.clear() with pytest.raises(ValueError): i.lines(level) @given(indent_levels) def test_lines_with_single_target(self, level: int): x = py.Line.INDENT_UNIT name = "locust.http" assert [str(l) for l in py.Import([name]).lines(level)] == [ x * level + f"import {name}" ] @given(indent_levels) def test_lines_with_multiple_targets(self, level: int): x = py.Line.INDENT_UNIT names = ["locust.http", "math", "a.b.c"] assert [str(l) for l in py.Import(names).lines(level)] == [ x * level + f"import {name}" for name in names ] @given(indent_levels) def test_lines_with_single_target_and_alias(self, level: int): x = py.Line.INDENT_UNIT name = "transformer.python" alias = "py" assert [str(l) for l in py.Import([name], alias=alias).lines(level)] == [ x * level + f"import {name} as {alias}" ] def test_init_with_multiple_targets_and_alias_raises_error(self): with pytest.raises(ValueError): py.Import(["a", "b"], alias="c") @given(indent_levels) def test_lines_with_multiple_targets_and_alias_raises_error(self, level: int): i = py.Import(["safe"], alias="A") i.targets.append("oops") with pytest.raises(ValueError): i.lines(level) @given(indent_levels, lists(text(string.ascii_letters, min_size=1), min_size=1)) def test_lines_with_targets_and_source(self, level: int, targets: List[str]): x = py.Line.INDENT_UNIT source = "bar" assert [str(l) for l in py.Import(targets, source).lines(level)] == [ x * level + f"from {source} import {target}" for target in targets ] @given(indent_levels) def test_lines_with_comments(self, level: int): x = py.Line.INDENT_UNIT stmt = py.Import(["a", "b", "c"], comments=["1", "2"]) assert [str(l) for l in stmt.lines(level)] == [ x * level + "# 1", x * level + "# 2", x * level + "import a", x * level + "import b", x * level + "import c", ] @given(indent_levels) def test_lines_with_hidden_comments(self, level: int): x = py.Line.INDENT_UNIT stmt = py.Import(["a", "b", "c"], comments=["1", "2"]) assert [str(l) for l in stmt.lines(level, comments=False)] == [ x * level + "import a", x * level + "import b", x * level + "import c", ] def test_repr(self): stmt = py.Import(targets=["a", "b"], comments=["hi"]) assert ( repr(stmt) == f"Import(targets=['a', 'b'], source=None, alias=None, comments=['hi'])" ) PK!7transformer/test_request.py# pylint: skip-file from unittest.mock import MagicMock import pytest from transformer.request import * class TestFromHarEntry: def test_it_returns_an_error_given_an_invalid_dict(self): with pytest.raises(KeyError): invalid_dict = {"some": "data"} Request.from_har_entry(invalid_dict) def test_it_returns_a_request_given_a_get_request(self): get_request = { "request": {"method": "GET", "url": ""}, "startedDateTime": "2018-01-01", } request = Request.from_har_entry(get_request) assert isinstance(request, Request) assert request.method == HttpMethod.GET def test_it_returns_a_request_given_a_post_request(self): post_request = { "request": { "method": "POST", "url": "", "postData": "{'some name': 'some value'}", }, "startedDateTime": "2018-01-01", } request = Request.from_har_entry(post_request) assert isinstance(request, Request) assert request.method == HttpMethod.POST assert request.post_data == "{'some name': 'some value'}" def test_it_returns_a_request_given_a_put_request(self): put_request = { "request": { "method": "PUT", "url": "", "postData": "{'some name': 'some value'}", "queryString": [{"name": "some name", "value": "some value"}], }, "startedDateTime": "2018-01-01", } request = Request.from_har_entry(put_request) assert isinstance(request, Request) assert request.method == HttpMethod.PUT assert request.post_data == "{'some name': 'some value'}" assert request.query == [QueryPair(name="some name", value="some value")] def test_it_returns_a_request_with_headers_given_an_options_request(self): options_request = { "request": { "method": "OPTIONS", "url": "", "postData": "", "headers": [{"name": "Access-Control-Request-Method", "value": "POST"}], }, "startedDateTime": "2018-01-01", } request = Request.from_har_entry(options_request) assert isinstance(request, Request) assert request.method == HttpMethod.OPTIONS assert request.headers == [ Header(name="Access-Control-Request-Method", value="POST") ] def test_it_returns_a_request_with_a_query_given_a_delete_request_with_a_query( self ): delete_request = { "request": { "method": "DELETE", "url": "", "queryString": [{"name": "some name", "value": "some value"}], }, "startedDateTime": "2018-01-01", } request = Request.from_har_entry(delete_request) assert isinstance(request, Request) assert request.method == HttpMethod.DELETE assert request.query == [QueryPair(name="some name", value="some value")] class TestAllFromHar: @pytest.mark.skip(reason="Doesn't raise AssertionError; to be investigated.") def test_it_returns_an_error_given_an_invalid_dict(self): with pytest.raises(AssertionError): invalid_dict = {"some": "data"} Request.all_from_har(invalid_dict) def test_it_returns_a_list_of_requests_given_a_valid_dict(self): valid_dict = { "log": { "entries": [ { "request": {"method": "GET", "url": "", "postData": ""}, "startedDateTime": "2018-01-01", } ] } } assert isinstance(Request.all_from_har(valid_dict), Iterator) for request in Request.all_from_har(valid_dict): assert isinstance(request, Request) class TestTaskName: def test_it_generates_a_task_name_given_a_request(self): a_request = MagicMock() a_request.method.name = "some_name" a_request.url.scheme = "some_scheme" a_request.url.hostname = "some_hostname" a_request.url.path = "some_path" a_task_name = Request.task_name(a_request) a_duplicate_task_name = Request.task_name(a_request) assert a_task_name == a_duplicate_task_name a_request.method.name = "some_other_name" a_different_task_name = Request.task_name(a_request) assert a_task_name != a_different_task_name PK!ՔAl))transformer/test_scenario.pyimport logging import re import string from pathlib import Path from typing import List from unittest.mock import MagicMock, patch import pytest from hypothesis import given from hypothesis.strategies import lists, text, recursive, tuples from transformer.helpers import DUMMY_HAR_STRING, _DUMMY_HAR_DICT from transformer.scenario import Scenario, SkippableScenarioError, WeightValueError from transformer.task import Task paths = recursive( text(string.printable.replace("/", ""), min_size=1, max_size=3).filter( lambda s: s != "." and s != ".." ), lambda x: tuples(x, x).map("/".join), max_leaves=4, ).map(Path) class TestScenario: @patch("transformer.scenario.Path.is_dir", MagicMock(return_value=False)) @patch("transformer.scenario.Path.iterdir", MagicMock(return_value=())) @patch("transformer.scenario.Path.open") @patch("transformer.scenario.json.load", MagicMock(return_value=_DUMMY_HAR_DICT)) @given(paths=lists(paths, unique=True, min_size=2)) def test_names_are_unique(*_, paths: List[Path]): scenario_names = [Scenario.from_path(path).name for path in paths] assert sorted(set(scenario_names)) == sorted(scenario_names) assert len(paths) == len(scenario_names) def test_creation_from_scenario_directory_with_weight_file(self, tmp_path: Path): root_path = tmp_path / "some-path" root_path.mkdir() expected_weight = 7 root_path.with_suffix(".weight").write_text(str(expected_weight)) nb_har_files = 2 for i in range(nb_har_files): root_path.joinpath(f"{i}.har").write_text(DUMMY_HAR_STRING) result = Scenario.from_path(root_path) assert len(result.children) == nb_har_files assert result.weight == expected_weight class TestFromPath: def test_on_har_raises_error_with_incorrect_har(self, tmp_path: Path): not_har_path = tmp_path / "not.har" not_har_path.write_text("not JSON!") with pytest.raises(SkippableScenarioError): Scenario.from_path(not_har_path) def test_on_dir_ignores_some_incorrect_hars(self, tmp_path: Path): not_har_path = tmp_path / "not.har" not_har_path.write_text("not JSON!") har_path = tmp_path / "good.har" har_path.write_text(DUMMY_HAR_STRING) scenario = Scenario.from_path(tmp_path) assert len(scenario.children) == 1 assert scenario.children[0].origin == har_path def test_on_dir_raises_error_with_all_incorrect_hars(self, tmp_path: Path): for i in range(2): tmp_path.joinpath(f"{i}.nothar").write_text("not JSON!") with pytest.raises(SkippableScenarioError): Scenario.from_path(tmp_path) def test_on_dir_with_dangling_weights_raises_error( self, tmp_path: Path, caplog ): (tmp_path / "ok.har").write_text(DUMMY_HAR_STRING) (tmp_path / "fail.weight").write_text("7") caplog.set_level(logging.INFO) with pytest.raises(SkippableScenarioError): Scenario.from_path(tmp_path) assert "weight file" in caplog.text assert any( r.levelname == "ERROR" for r in caplog.records ), "at least one ERROR logged" def test_records_global_code_blocks_from_tasks(self): t1_blocks = {"t1-1": ["abc"], "t1-2": ["def"]} t1 = Task("t1", request=MagicMock(), global_code_blocks=t1_blocks) t2 = Task("t2", request=MagicMock()) t3_blocks = {"t3-1": ("xyz",)} t3 = Task("t3", request=MagicMock(), global_code_blocks=t3_blocks) scenario = Scenario("scenario", [t1, t2, t3], origin=None) assert scenario.global_code_blocks == {**t1_blocks, **t3_blocks} def test_group_records_global_code_blocks_from_scenarios(self): t1_blocks = {"t1-1": ["abc"], "t1-2": ["def"]} t1 = Task("t1", request=MagicMock(), global_code_blocks=t1_blocks) t2 = Task("t2", request=MagicMock()) t3_blocks = {"t3-1": ("xyz",)} t3 = Task("t3", request=MagicMock(), global_code_blocks=t3_blocks) s1 = Scenario("s1", [t1, t2], origin=None) s2 = Scenario("s2", [t3], origin=None) sg = Scenario("sg", [s1, s2], origin=None) assert sg.global_code_blocks == {**t1_blocks, **t3_blocks} def test_group_records_global_code_blocks_uniquely(self): common_blocks = {"x": ["a", "b"]} t1 = Task( "t1", request=MagicMock(), global_code_blocks={**common_blocks, "t1b": ["uvw"]}, ) t2 = Task( "t2", request=MagicMock(), global_code_blocks={**common_blocks, "t2b": ["xyz"]}, ) s1 = Scenario("s1", [t1], origin=None) s2 = Scenario("s2", [t2], origin=None) sg = Scenario("sg", [s1, s2], origin=None) assert sg.global_code_blocks == { **common_blocks, "t1b": ["uvw"], "t2b": ["xyz"], } def test_without_weight_file_has_weight_1(self, tmp_path: Path): har_path = tmp_path / "test.har" har_path.write_text(DUMMY_HAR_STRING) assert Scenario.from_path(har_path).weight == 1 def test_with_weight_file_has_corresponding_weight(self, tmp_path: Path): weight_path = tmp_path / "test.weight" weight_path.write_text("74") har_path = tmp_path / "test.har" har_path.write_text(DUMMY_HAR_STRING) assert Scenario.from_path(har_path).weight == 74 @pytest.mark.parametrize("weight", [0, -2, 2.1, -2.1, "NaN", "abc", " "]) def test_with_invalid_weight_raises_error_and_never_skips( self, tmp_path: Path, weight ): legit_har_path = tmp_path / "legit.har" legit_har_path.write_text(DUMMY_HAR_STRING) bad_weight_path = tmp_path / "test.weight" bad_weight_path.write_text(str(weight)) bad_weight_har_path = tmp_path / "test.har" bad_weight_har_path.write_text(DUMMY_HAR_STRING) with pytest.raises(WeightValueError): # If from_path was skipping the bad scenario/weight pair, it # would not raise because there is another valid scenario, # legit.har. Scenario.from_path(tmp_path) def test_with_many_weight_files_selects_weight_based_on_name( self, tmp_path: Path ): expected_weight_path = tmp_path / "test.weight" expected_weight_path.write_text("7") first_wrong_weight_path = tmp_path / "a.weight" first_wrong_weight_path.write_text("2") second_wrong_weight_path = tmp_path / "1.weight" second_wrong_weight_path.write_text("4") har_path = tmp_path / "test.har" har_path.write_text(DUMMY_HAR_STRING) assert Scenario.from_path(har_path).weight == 7 def test_uses_full_path_for_scenario_name(self, tmp_path: Path): har_basename = "e3ee4a1ef0817cde0a0a78c056e7cb35" har_path = tmp_path / har_basename har_path.write_text(DUMMY_HAR_STRING) scenario = Scenario.from_path(har_path) words_in_scenario_name = { m.group() for m in re.finditer(r"[A-Za-z0-9]+", scenario.name) } assert har_basename in words_in_scenario_name words_in_parent_path = { m.group() for m in re.finditer(r"[A-Za-z0-9]+", str(tmp_path)) } words_in_scenario_name_not_from_har_basename = words_in_scenario_name - { har_basename } assert ( words_in_parent_path <= words_in_scenario_name_not_from_har_basename ), "all components of the parent path must be in the scenario name" def test_uses_full_path_for_parents_and_basename_for_children( self, tmp_path: Path ): root_basename = "615010a656a5bb29d1898f163619611f" root = tmp_path / root_basename root.mkdir() for i in range(2): (root / f"s{i}.har").write_text(DUMMY_HAR_STRING) root_scenario = Scenario.from_path(root) words_in_root_scenario_name = { m.group() for m in re.finditer(r"[A-Za-z0-9]+", root_scenario.name) } words_in_root_path = { m.group() for m in re.finditer(r"[A-Za-z0-9]+", str(root)) } assert ( words_in_root_path <= words_in_root_scenario_name ), "parent scenario's name must come from full path" assert len(root_scenario.children) == 2 child_scenario_names = {c.name for c in root_scenario.children} assert child_scenario_names == { "s0", "s1", }, "child scenarios have short names" def test_raises_error_for_colliding_scenario_names_from_har_files( self, tmp_path: Path, caplog ): (tmp_path / "good.har").write_text(DUMMY_HAR_STRING) (tmp_path / "bad.har").write_text(DUMMY_HAR_STRING) (tmp_path / "bad.json").write_text(DUMMY_HAR_STRING) caplog.set_level(logging.ERROR) with pytest.raises(SkippableScenarioError): Scenario.from_path(tmp_path) assert "colliding names" in caplog.text assert "bad.har" in caplog.text assert "bad.json" in caplog.text def test_raises_error_for_colliding_scenario_names_from_directory_and_file( self, tmp_path: Path, caplog ): directory = tmp_path / "x" directory.mkdir() # directory needs to contain a HAR file, otherwise Transformer will # not consider it a scenario. (directory / "a.har").write_text(DUMMY_HAR_STRING) (tmp_path / "x.har").write_text(DUMMY_HAR_STRING) caplog.set_level(logging.ERROR) with pytest.raises(SkippableScenarioError): Scenario.from_path(tmp_path) assert "colliding names" in caplog.text assert re.search(r"\bx\b", caplog.text) assert re.search(r"\bx.har\b", caplog.text) PK!걎Z(Z(transformer/test_task.py# pylint: skip-file import io import json from unittest.mock import MagicMock from unittest.mock import patch from urllib.parse import urlparse import pytest from transformer.request import Header from transformer.task import ( Task, Request, HttpMethod, QueryPair, TIMEOUT, LocustRequest, ) class TestFromRequests: def test_it_returns_a_task(self): request = MagicMock() request.timestamp = 1 second_request = MagicMock() second_request.timestamp = 2 assert all( isinstance(t, Task) for t in Task.from_requests([request, second_request]) ) @patch("builtins.open") def test_it_doesnt_create_a_task_if_the_url_is_on_the_blacklist(self, mock_open): mock_open.return_value = io.StringIO("amazon") request = MagicMock() request.url = MagicMock() request.url.netloc = "www.amazon.com" task = Task.from_requests([request]) assert len(list(task)) == 0 @patch("builtins.open") def test_it_creates_a_task_if_the_path_not_host_is_on_the_blacklist( self, mock_open ): mock_open.return_value = io.StringIO("search\namazon") request = MagicMock() request.url = urlparse("https://www.google.com/search?&q=amazon") task = Task.from_requests([request]) assert len(list(task)) == 1 class TestAsLocustAction: def test_it_returns_an_error_given_an_unsupported_http_method(self): a_request_with_an_unsupported_http_method = MagicMock() task = Task("some_task", a_request_with_an_unsupported_http_method) with pytest.raises(ValueError): task.as_locust_action() def test_it_returns_a_string(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.GET task = Task("some_task", a_request) assert isinstance(task.as_locust_action(), str) def test_it_returns_action_from_locust_request(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.GET locust_request = LocustRequest( method=HttpMethod.GET, url=repr("http://locust-task"), headers={} ) task = Task("some_task", request=a_request, locust_request=locust_request) action = task.as_locust_action() assert action.startswith("response = self.client.get(url='http://locust-task'") def test_it_returns_task_using_get_given_a_get_http_method(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.GET task = Task("some_task", a_request) action = task.as_locust_action() assert action.startswith("response = self.client.get(") def test_it_returns_a_task_using_post_given_a_post_http_method(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.POST a_request.post_data = {} task = Task("some_task", a_request) action = task.as_locust_action() assert action.startswith("response = self.client.post(") def test_it_returns_a_task_using_put_given_a_put_http_method(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.PUT a_request.post_data = {"text": "{'some key': 'some value'}"} a_request.query = [QueryPair(name="some name", value="some value")] task = Task("some_task", a_request) action = task.as_locust_action() assert action.startswith("response = self.client.put(") assert "params={'some name': 'some value'}" in action assert "data=b\"{'some key': 'some value'}\"" in action def test_it_returns_a_task_using_options_given_an_options_http_method(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.OPTIONS a_request.headers = [Header(name="Access-Control-Request-Method", value="POST")] task = Task("some_task", a_request) action = task.as_locust_action() assert action.startswith("response = self.client.options(") assert "headers={'Access-Control-Request-Method': 'POST'" in action def test_it_returns_a_task_using_delete_given_a_delete_http_method(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.DELETE a_request.url = urlparse("http://www.some.web.site/?some_name=some_value") task = Task("some_task", a_request) action = task.as_locust_action() assert action.startswith("response = self.client.delete(") assert "?some_name=some_value" in action def test_it_provides_timeout_to_requests(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.GET task = Task("some_task", a_request) action = task.as_locust_action() assert f"timeout={TIMEOUT}" in action def test_it_injects_headers(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.GET a_request.headers = [Header(name="some_header", value="some_value")] task = Task("some_task", a_request) action = task.as_locust_action() assert "some_value" in action def test_it_encodes_data_in_task_for_text_mime(self): decoded_value = '{"formatted": "54,95 €"}' a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.POST a_request.post_data = {"text": decoded_value} task = Task("some_task", a_request) action = task.as_locust_action() assert str(decoded_value.encode()) in action def test_it_encodes_data_in_task_for_json_mime(self): decoded_value = '{"formatted": "54,95 €"}' a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.POST a_request.post_data = {"text": decoded_value, "mimeType": "application/json"} task = Task("some_task", a_request) action = task.as_locust_action() assert str(json.loads(decoded_value)) in action def test_it_converts_post_params_to_post_text(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.POST a_request.post_data = { "mimeType": "application/json", "params": [ {"name": "username", "value": "some user"}, {"name": "password", "value": "some password"}, ], } task = Task("some task", a_request) action = task.as_locust_action() assert "'username': 'some user'" in action assert "'password': 'some password'" in action def test_it_creates_a_locust_request_when_there_is_none(self): task = Task(name="some name", request=MagicMock()) modified_task = Task.inject_headers(task, {}) assert modified_task.locust_request def test_it_returns_a_task_with_the_injected_headers(self): locust_request = LocustRequest( method=MagicMock(), url=MagicMock(), headers={"x-forwarded-for": ""} ) task = Task( name="some name", request=MagicMock(), locust_request=locust_request ) expected_headers = {"x-forwarded-for": "1.2.3.4"} modified_task = Task.inject_headers(task, headers=expected_headers) assert isinstance(modified_task, Task) headers = modified_task.locust_request.headers assert len(headers) == 1 assert headers == expected_headers class TestIndentation: def test_pre_processing_returns_an_indented_string_given_an_indentation(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.GET task = Task("some_task", a_request) new_pre_processings = (*task.locust_preprocessing, "def some_function():") task = task._replace(locust_preprocessing=new_pre_processings) action = task.as_locust_action(indentation=2) assert action.startswith(" def some_function():") def test_post_processing_returns_an_indented_string_given_an_indentation(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.GET task = Task("some_task", a_request) new_post_processings = (*task.locust_postprocessing, "def some_function():") task = task._replace(locust_postprocessing=new_post_processings) action = task.as_locust_action(indentation=2) assert " def some_function():" in action def test_it_applies_indentation_to_all_pre_processings(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.GET task = Task("some_task", a_request) new_pre_processings = ( *task.locust_preprocessing, "def some_function():", "def some_other_function():", ) task = task._replace(locust_preprocessing=new_pre_processings) action = task.as_locust_action(indentation=2) assert action.startswith( " def some_function():\n\n def some_other_function():" ) def test_it_respects_sub_indentation_levels(self): a_request = MagicMock(spec_set=Request) a_request.method = HttpMethod.GET task = Task("some_task", a_request) new_pre_processings = ( *task.locust_preprocessing, "\n def function():\n if True:\n print(True)", ) task = task._replace(locust_preprocessing=new_pre_processings) action = task.as_locust_action(indentation=1) assert action.startswith(" \n def function():\n if True:\n print(True)") class TestReplaceURL: def test_it_creates_a_locust_request_when_there_is_none(self): task = Task(name="some name", request=MagicMock()) modified_task = Task.replace_url(task, "") assert modified_task.locust_request def test_it_returns_a_task_with_the_replaced_url(self): locust_request = LocustRequest( method=MagicMock(), url=MagicMock(), headers=MagicMock() ) task = Task( name="some name", request=MagicMock(), locust_request=locust_request ) expected_url = 'f"http://a.b.c/{some.value}/"' modified_task = Task.replace_url(task, expected_url) assert modified_task.locust_request.url == expected_url PK!transformer/test_transform.py# pylint: skip-file import json from pathlib import Path import pytest import transformer.transform from transformer.helpers import _DUMMY_HAR_DICT class TestTransform: def test_it_returns_a_locustfile_string_given_scenario_path(self, tmp_path: Path): har_path = tmp_path / "some.har" with har_path.open("w") as file: json.dump(_DUMMY_HAR_DICT, file) locustfile_contents = str(transformer.transform.transform(har_path)) try: compile(locustfile_contents, "locustfile.py", "exec") except Exception as exception: pytest.fail(f"Compiling locustfile failed. [{exception}].") PK!etransformer/transform.py# -*- coding: utf-8 -*- """ Entrypoint for Transformer. """ from pathlib import Path from typing import Sequence, Union import transformer.python as py from transformer.locust import locustfile from transformer.plugins.contracts import OnTaskSequence from transformer.scenario import Scenario def transform( scenarios_path: Union[str, Path], plugins: Sequence[OnTaskSequence] = () ) -> py.Program: return locustfile([Scenario.from_path(Path(scenarios_path), plugins)]) PK!HV6A0har_transformer-1.0.0.dist-info/entry_points.txtN+I/N.,()*)J+N/M-Eb[%dAŧUgpqPK!d*++'har_transformer-1.0.0.dist-info/LICENSEMIT License Copyright (c) 2019 Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HڽTU%har_transformer-1.0.0.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!Hq=#(har_transformer-1.0.0.dist-info/METADATAZrȕ}ϯȦ6K.Œ,-QİQ0UY@VUF,UG?LL?ϽX\<|p||'"Y "Hˎ024x٫|'?hӨWm}*Z&1So)Brk7 FR5LSM> l wö[!!h$`vo,6{#\Hf41wb$J w"%&{Nj/lkYp+keGC\[iY{ =ޑ7|Iv_!f 1./p{L.`Dff~J̵x褩8;4 ;jf/,]I Z t& ʣJ0Vl}$.{gq<)%,U3pcɉZ@YBdf,˓L/Ƈ̑J AhZZ9_0|Q _"h@"".ֶAƱ]S%J`#]~"XP@Seo,b1ΩO8ਵL \XǫXe|n*5,a2Nb-ȃn3{ȍ3RAޑc[OɢH9H}TKUj(o:f}/4(k>@(myKG$ۑ"IxH3]zKN8YӞ^222l S m]rZ1d>lTz#@d4ĭa[a9]iGQ/ rohOKoL`„ -y/,`=@dX 9˽]v ƒΣ!ĥc ?І(8L4t(2zS|-s-(ƠϴRvpɶ[mw<_OKxzE( ٧-SO`u|;d+lxRDOi8V,(NkMg4JE%s[B7.ܯ@ W4r8i9:MHg% " r&6˼G+`0D@lg8myf$pG>࿂0إa=3$UeK[> W|!HJhG#<9߹p" `)..n }[J{Pz`_[V}aߪ/aa2|G )}Y+cdR^Rqyp>y -O[ܨ`ߗyjpPO+O)%R4ǭv8S QxQ+ + '9pŜJ[rKޱЪ㻶/"6Ϲ4e qq[*㛙J ^[mP+wyA;.&c'5-ugwjq :#!͈BnNp{H-n.75ʟ- rAorsfNj2N!omiW>ig.|-_wF)W:L?H:skM ,LNk;5ot }>z‹x~e x][p;PK!H&har_transformer-1.0.0.dist-info/RECORDǶXE-7a =_dug-=Ug/8qBtuÏ. O<'Uhz4;߉oeiӰ)a281t^?뮌?]UEnxeSDlhV5F5i Ͱ`] lC>RN|^ZTt\Sqs4Caczm[kExe"ЦXbﯳ-a"Ep2w)F:R|'E/a P͘2~Hrxs'st P7EQ+λlԛ`F/+*<%&ϋF\suHި8V1 6WFVm@%m  6{a1N0C]'TG@o>\8"J@U\٩ЪչjP\YM'0o$*/8rp]M'd7sicqAd;~rB+`|?abcT83AZ(bV00hWUZ^Ҽ>r!_I۔Aβ/d,Kk1NOg8Qgp=fe,-QufB{,IsT F?p=ZGz딬|˜ i&5#A]ퟃUfِNt>Jbr>-:jɏm]X TD{_{#[!FT\-93tQ_SBj8E"7B JLgAL&hio(.jmT_@IQ< q鋧9d5[c Up= ._xMԡ Rƫg*R-s8$7U"I8kM?x}’şbp':SGHgP0dԏ"Cfjǣ(+pksCDϽS/?gອ<-A;?Ό ( ejf\Y ݷ1ƿ>Aʡ4M~⹓CٲE"Jؐ@OE~OŠf/CXXF" /R1X$UyxoqXO:zIlCGI_9IQ`omB~1'(\&?qM"q)8K!ۡM߉(۷1kdt7bO V cځ*xլIN f_mCq2c1ɭWtdԪc6ڇAމt&x`ra1_K26]3N]4-:賹LޱսdPb+j3B?}x]hFz !X{CDȱK'}൬ϛ# :->transformer/.urlignore_examplePK! 88 transformer/__init__.pyPK!WW transformer/__main__.pyPK!2yy transformer/__version__.pyPK!"nwwB transformer/blacklist.pyPK!'BB transformer/builders_decision.pyPK!?8otransformer/builders_python.pyPK!fj \ +transformer/cli.pyPK!T/  k8transformer/decision.pyPK!p=?transformer/helpers.pyPK!c4Atransformer/locust.pyPK!@y3Rtransformer/naming.pyPK!6Vtransformer/plugins/README.mdPK!-44\transformer/plugins/__init__.pyPK! ]transformer/plugins/contracts.pyPK!ȹstransformer/plugins/dummy.pyPK!!}utransformer/plugins/resolve.pyPK!'{transformer/plugins/sanitize_headers.mdPK!tXQQ'~transformer/plugins/sanitize_headers.pyPK!l@bb%wtransformer/plugins/test_contracts.pyPK!&v3!transformer/plugins/test_dummy.pyPK!|9ɦ #"transformer/plugins/test_resolve.pyPK!%HH, transformer/plugins/test_sanitize_headers.pyPK!s=*U*Utransformer/python.pyPK!0O transformer/request.pyPK!jXP| ' 'transformer/scenario.pyPK!{5`&transformer/task.pyPK!M~CCFtransformer/test_blacklist.pyPK!؏. Mtransformer/test_cli.pyPK!]JK K PRtransformer/test_decision.pyPK! كsvv_transformer/test_helpers.pyPK!2hw atransformer/test_locust.pyPK!^mtransformer/test_naming.pyPK!3λ||\qtransformer/test_python.pyPK!7Otransformer/test_request.pyPK!ՔAl))qtransformer/test_scenario.pyPK!걎Z(Z(1*transformer/test_task.pyPK!Rtransformer/test_transform.pyPK!eUtransformer/transform.pyPK!HV6A0Whar_transformer-1.0.0.dist-info/entry_points.txtPK!d*++'$Xhar_transformer-1.0.0.dist-info/LICENSEPK!HڽTU%\har_transformer-1.0.0.dist-info/WHEELPK!Hq=#(+]har_transformer-1.0.0.dist-info/METADATAPK!H&5lhar_transformer-1.0.0.dist-info/RECORDPK-- mt