PK{vPOagilkia/__init__.py"""Automated smart testing strategies for web services. This 'agilkia' package is for testing web services and managing set of traces. Traces may come from user interactions, or from automated test suites, etc. The main data structure for traces is the ``TraceSet``: * class TraceSet supports loading/saving traces as JSON, converting to Pandas, etc. * class Trace is used by TraceSet, and contains a list of Events. * Each Event is a dict that contains at least the following keys: * "action" gives the name of the action (a string); * "inputs" is a dict of input parameter names to values; * "outputs" is a dict of output parameter names to values. Automated test generation facilities include: * RandomTester generates random test sequences. * SmartTester generates tests from an ML model (Currently this is included in RandomTester.generate_trace_ml, but this will be split into a separate class shortly.) """ # This package follows a 'Convenience Store' model. # That is, it directly exports all the features that will be useful to users. # They do not need to import sub-modules. # # See the article: "What’s __init__ for me?" by Jacob Deppen on TowardsDataScience.com: # https://towardsdatascience.com/whats-init-for-me-d70a312da583 __version__ = '0.2.2' from . random_tester import (read_input_rules, RandomTester, uniq, build_interface, print_signatures, DUMP_WSDL, DUMP_SIGNATURES, GOOD_PASSWORD) from . json_traces import (Trace, TraceSet, TraceEncoder, TRACE_SET_VERSION, xml_decode, all_action_names, event_status, safe_name, default_map_to_chars, trace_to_string, traces_to_pandas) PKZPO5YUYUagilkia/json_traces.py# -*- coding: utf-8 -*- """ Data structures for Traces and Sets of Traces. This defines the 'Trace' and 'TraceSet' classes, plus helper functions. NOTE: private data fields (starting with '_') will not be stored in the JSON files. For example, each Trace object has a '_parent' point to its TraceSet, but this is not stored in the JSON file, since the hierarchical structure of the JSON already captures the parent-child relationship between TraceSet and Trace. NOTE: file version upgrade policy: JSON trace file version numbers follow the usual Semantic Versioning scheme: (Major.Minor.Patch). TraceSet.upgrade_json_data' currently just prints a warning message when a program running older code reads a JSON file with a newer MINOR version number. This allows graceful updating of one program at a time, but does have the danger that a older program may read newer data (with a warning), then SAVE that data in the slightly older format, thus losing some data. But a strict version-equality means that all programs have to be updated simultaneously, which is a pain. TODO: * DONE: add save_as_arff() method like to_pandas. * DONE: store event_chars into meta_data. * DONE: store signatures into meta_data. * split RandomTester into SmartTester subclass (better meta-data). * add ActionChars class? * extend to_pandas() to allow user-defined columns to be added. * add support for clustering traces and visualising the clusters. @author: utting@usc.edu.au """ import os import sys from pathlib import Path # object-oriented filenames! import json import decimal import datetime import re import xml.etree.ElementTree as ET import pandas as pd import arff # liac-arff from https://pypi.org/project/liac-arff from typing import List, Set, Mapping, Dict, Union # Define some type synonyms # ========================= # An event is a dictionary that maps string keys to either a string or a nested dictionary. # Every event has at least these keys: "action":str, "inputs":dict, "outputs":dict Event = Dict[str, Union[str, Mapping[str, str]]] TRACE_SET_VERSION = "0.1.2" def safe_name(string: str) -> str: """Returns 'string' with all non-alpha-numeric characters replaced by '_'.""" return re.sub("[^A-Za-z0-9]", "_", string) class Trace: """Represents a single trace, which contains a sequence of events. """ def __init__(self, events: List[Event], parent: 'TraceSet' = None, random_state=None): """Create a Trace object from a list of events. Args: events: the sequence of Events that make up this trace. parent: the TraceSet that this trace is part of. random_state: If this trace was generated using some randomness, you should supply this optional parameter, to record the state of the random generator at the start of the sequence. For example, rand_state=rand.getstate(). """ self.events = events self._parent = parent self.random_state = random_state def trace_set(self): """Returns the TraceSet that this trace is part of, or None if not known.""" return self._parent def __iter__(self): return self.events.__iter__() def to_string(self, to_char: Mapping[str, str] = None, compress: List[str] = None, color_status: bool = False): """Return a one-line summary of this trace, one character per event. See 'trace_to_string' for details. NOTE: throws an exception if no to_char map is given and this trace has no parent. """ if to_char is None: if self._parent is None: raise Exception("Cannot view trace with no parent and no to_char map.") to_char = self._parent.get_event_chars() return trace_to_string(self.events, to_char, compress=compress, color_status=color_status) def __str__(self): try: return self.to_string() except Exception: return "..." class TraceSet: """Represents a set of traces, either generated or recorded. Invariants: * forall tr:self.traces (tr._parent is self) (TODO: set _parent to None when a trace is removed?) * self.meta_data is a dict with keys: date, source at least. """ def __init__(self, traces: List[Trace], meta_data: Mapping[str, str] = None): """Create a TraceSet object from a list of Traces. Args: traces: the list of Traces. meta_data: a dictionary that captures the important meta-data for this set of traces. If this TraceSet is to be saved into a file, the meta-data should include at least the GDF (General Data Format) compulsory fields, which are based on the Dublin Core: "date" in ISO 8601 format; "dataset" for the official name of this TraceSet; "source" for the origin of the dataset; and any other meta-data that is available. """ self.version = TRACE_SET_VERSION if meta_data is None: meta_data = self.get_default_meta_data() self.meta_data = meta_data self.traces = traces for tr in self.traces: if isinstance(tr, Trace): tr._parent = self else: raise Exception("TraceSet expects List[Trace], not: " + str(type(tr))) self._event_chars = None # recalculated if set of traces grows. def __iter__(self): return self.traces.__iter__() @classmethod def get_default_meta_data(cls): """Generates some basic meta-data such as date, user and command line.""" now = datetime.datetime.now().isoformat() user = os.path.expanduser('~').split('/')[-1] # usually correct, but can be tricked. meta_data = {"date": now, "author": user, "dataset": "unknown", "action_chars": None} if len(sys.argv) > 0: meta_data["source"] = sys.argv[0] # the path to the running script/tool. meta_data["cmdline"] = sys.argv return meta_data def get_meta(self, key: str) -> any: """Returns requested meta data, or None if that key does not exist.""" if key in self.meta_data: return self.meta_data[key] else: return None def append(self, trace: Trace): """Appends the given trace into this set. This also sets its parent to be this set. """ trace._parent = self self.traces.append(trace) self._event_chars = None # we will recalculate this later def set_event_chars(self, given: Mapping[str, str] = None): """Sets up the event-to-char map that is used to visualise traces. This will calculate a default mapping for any actions that are not in given. (See 'default_map_to_chars'). Args: given: optional pre-allocation of a few action names to chars. For good readability of the printed traces, it is recommended that extremely common actions should be mapped to 'small' characters like '.' or ','. """ if given is not None: self.meta_data["action_chars"] = given # override any previous given map. else: given = self.get_meta("action_chars") actions = all_action_names(self.traces) self._event_chars = default_map_to_chars(actions, given=given) def get_event_chars(self): """Gets the event-to-char map that is used to visualise traces. This maps each action name to a single character. If set_event_chars has not been called, this getter will calculate and cache a default mapping from action names to characters. """ if self._event_chars is None: self.set_event_chars() return self._event_chars def __str__(self): name = self.meta_data["dataset"] # required meta data return f"TraceSet '{name}' with {len(self.traces)} traces." def save_to_json(self, file: Path) -> None: if isinstance(file, str): print(f"WARNING: converting {file} to Path. Please learn to speak pathlib.") file = Path(file) with file.open("w") as output: json.dump(self, output, indent=2, cls=TraceEncoder) @classmethod def load_from_json(cls, file: Path) -> 'TraceSet': if isinstance(file, str): print(f"WARNING: converting {file} to Path. Please learn to speak pathlib.") file = Path(file) if not isinstance(file, Path): raise Exception(f"load_from_json requires Path, not {file} (type={type(file)})") # with open(filename, "r") as input: data = json.loads(file.read_text()) # Now check version and upgrade if necessary. if isinstance(data, list): # this file was pre-TraceSet, so just a list of lists of events. mtime = datetime.datetime.fromtimestamp(file.stat().st_mtime).isoformat() meta = {"date": mtime, "dataset": file.name, "source": "Upgraded from version 0.1"} traces = cls([], meta) for ev_list in data: traces.append(Trace(ev_list)) return traces elif isinstance(data, dict) and data.get("__class__", None) == "TraceSet": return cls.upgrade_json_data(data) else: raise Exception("unknown JSON file format: " + str(data)[0:60]) @classmethod def upgrade_json_data(cls, json_data: Dict) -> 'TraceSet': version = json_data["version"] if version.startswith("0.1."): # This JSON file is compatible with our code. # First, convert json_data dicts to Trace and TraceSet objects. traceset = TraceSet([], json_data["meta_data"]) for tr_data in json_data["traces"]: assert tr_data["__class__"] == "Trace" rand = tr_data.get("random_state", None) traceset.append(Trace(tr_data["events"], random_state=rand)) # Next, see if any little updates are needed. if version == TRACE_SET_VERSION: pass # nothing extra to do. elif version == "0.1.1": # Move given_event_chars into meta_data["action_chars"] # Note: traceset["version"] has already been updated to the latest. traceset.meta_data["actions_chars"] = json_data["given_event_chars"] else: # The JSON must be from a newer 0.1.x version, so give a warning. print(f"WARNING: reading {version} TraceSet using {TRACE_SET_VERSION} code.") print(f" Some data may be lost. Please upgrade this program.") return traceset raise Exception(f"upgrade of TraceSet v{version} to v{TRACE_SET_VERSION} not supported.") def to_pandas(self) -> pd.DataFrame: """Converts all the traces into a single Pandas DataFrame (one event/row). The first three columns are 'Trace' and 'Event' which give the number of the trace and the position of the event within that trace, and 'Action' which is the name of the action of the event. Each named input value is recorded in a separate column. For outputs, by default there are just 'Status' (int) and 'Error' (str) columns. """ return traces_to_pandas(self.traces) def arff_type(self, pandas_type: str) -> str: """Maps each Pandas data type to the closest ARFF type.""" if pd.api.types.is_integer_dtype(pandas_type): return "INTEGER" if pd.api.types.is_float_dtype(pandas_type): return "REAL" if pd.api.types.is_bool_dtype(pandas_type): return ["False", "True"] return "STRING" # TODO: check column to see if NOMINAL is better? # raise Exception(f"do not know how to translate Pandas type {pandas_type} to ARFF.") def save_to_arff(self, file: Path, name=None) -> None: """Save all the events in all traces into an ARFF file for machine learning. Args: filename: the name of the file to save into. Should end with '.arff'. name: optional relation name to identify this data inside the ARFF file. The default is the base name of 'file'. """ if isinstance(file, str): print(f"WARNING: converting {file} to Path. Please learn to speak pathlib.") file = Path(file) if name is None: name = file.stem data = self.to_pandas() attributes = [(n, self.arff_type(t)) for (n, t) in zip(data.columns, data.dtypes)] with file.open("w") as output: contents = { "relation": safe_name(name), "attributes": attributes, "data": data.values, # [[tr] for tr in trace_summaries], "description": "Events from " + name } arff.dump(contents, output) class TraceEncoder(json.JSONEncoder): """Custom JSON encoder because objects from zeep could not be serialised. Based on ideas from this blog entry by 'The Fellow' (Ouma Rodgers): https://medium.com/python-pandemonium/json-the-python-way-91aac95d4041. This does not handle XML objects, as they should be decoded via xml_decode first. """ def default(self, obj): if isinstance(obj, (dict, list, tuple, str, int, float, bool)): return super().default(obj) # JSON already handles these if isinstance(obj, decimal.Decimal): return float(round(obj, 6)) # f"{o:.5f}" if isinstance(obj, (bytes, bytearray)): return "BYTES..." # TODO: handle these better: repr(o)? if isinstance(obj, (set, frozenset)): return list(obj) if isinstance(obj, (datetime.date, datetime.datetime, datetime.time)): return obj.isoformat() # as a string if hasattr(obj, "__dict__"): result = { "__class__": obj.__class__.__name__, "__module__": obj.__module__ } if len(obj.__dict__) == 1 and "__values__" in obj.__dict__: # zeep seems to hide the attributes in a __values__ dict. # We lift them up to the top level to make the json more readable. self._add_public_attributes(result, obj.__dict__["__values__"]) else: self._add_public_attributes(result, obj.__dict__) return result raise Exception("JSON serialisation not implemented yet for: " + str(obj) + " type " + str(type(obj)) + " dir:" + ",".join(dir(obj))) def _add_public_attributes(self, result, attrs) -> None: for (name, value) in attrs.items(): if not name.startswith("_"): result[name] = value def xml_decode(obj: ET.Element) -> Union[str, Mapping[str, any]]: """Custom XML encoder to decode XML into a Python dictionary suitable for JSON encoding. This roughly follows the ideas from: https://www.xml.com/pub/a/2006/05/31/converting-between-xml-and-json.html. For simple XML objects with no attributes and no children, this returns just the text string. For more complex XML objects, it returns a dictionary. Note that the top-level tag of 'obj' is assumed to be handled by the caller. That is, the caller will typically do ```d[tag] = xml_decode(obj)``` where xml_decode will return either a simple string, or a dictionary. """ if len(obj) == 0 and len(obj.attrib) == 0: return obj.text else: # return obj as a dictionary result = {} for (n, v) in obj.attrib.items(): result[n] = v # child objects are more tricky, since some tags may appear multiple times. # If a tag appears multiple times, we map it to a list of child objects. curr_tag = None curr_list = [] for child in obj: if child.tag != curr_tag: # save the child(ren) we have just finished if len(curr_list) > 0: result[curr_tag] = curr_list if len(curr_list) > 1 else curr_list[0] curr_list = [] curr_tag = child.tag curr_list.append(xml_decode(child)) if len(curr_list) > 0: result[curr_tag] = curr_list if len(curr_list) > 1 else curr_list[0] if obj.text and obj.text.strip(): # ignore text that is just whitespace result["text"] = obj.text return result def default_map_to_chars(actions: List[str], given: Mapping[str, str] = None) -> Mapping[str, str]: """Tries to guess a useful default mapping from action names to single characters. Args: actions: the names of all the actions. given: optional pre-allocation of a few action names to chars. You can use this to override the default behaviour. Returns: A map from every name in actions to a unique single character. """ names = sorted(actions) result = {} if given is None else given # TODO: a better algorithm might be to break up compound words and look for word prefixes? curr_prefix = "" pass2 = [] for i in range(len(names)): name = names[i] if name in result: continue # given # skip over any prefix that was in common with previous name. if name.startswith(curr_prefix): pos = len(curr_prefix) else: pos = 0 # check ahead for common prefixes first if i + 1 < len(names): nxt = names[i + 1] if nxt.startswith(name) and name[0] not in result.values(): result[name] = name[0] curr_prefix = name continue prefix = max([p for p in range(max(len(name), len(nxt))) if name[0:p] == nxt[0:p]]) # print(f" found prefix {prefix} of {name} and {nxt}") curr_prefix = name[0:prefix] else: prefix = 0 curr_prefix = "" if prefix > 0 and prefix > pos: pos = prefix done = False for j in range(pos, len(name)): if name[pos] not in result.values(): result[name] = name[pos] done = True break if not done: pass2.append(name) # Pass 2 (all visible ASCII chars except " and ') allchars = "".join([chr(n) for n in range(42, 127)]) + "!#$%&()" for name in pass2: for ch in name + allchars: if ch not in result.values(): result[name] = ch break # move onto next name in pass2 return result def all_action_names(traces: List[Trace]) -> Set[str]: """Collects all the action names that appear in the given traces.""" result = set() for tr in traces: for ev in tr.events: action = ev["action"] result.add(action) return result def event_status(event: Event) -> int: """Get the status result for the given event.""" return int(event["outputs"]["Status"]) def trace_to_string(trace: List[Event], to_char: Mapping[str, str], compress: List[str] = None, color_status: bool = False) -> str: """Converts a trace to a short summary string, one character per action. Args: trace: the sequence of JSON-like events, with an "action" field. to_char: maps each action name to a single character. This map must include every action name that appears in the traces. A suitable map can be constructed via TraceSet.get_event_chars(). compress: a list of Action names. Repeated events will be compressed if in this list. color_status: True means color the string red where status is non-zero. This uses ANSI escape sequences, so needs to be printed to a terminal. Returns: a summary string. """ compress_set = set() if compress is None else set(compress) chars = [] prev_action = None for ev in trace: action = ev["action"] if action == prev_action and action in compress_set: # NOTE: we color compressed output just based on the first event. pass else: if color_status and event_status(ev) != 0: chars.append("\033[91m") # start RED chars.append(to_char[action]) chars.append("\033[0m") # turn off color else: chars.append(to_char[action]) prev_action = action return "".join(chars) def traces_to_pandas(traces: List[Trace]) -> pd.DataFrame: """Collects all events into a single Pandas DataFrame. Columns include the trace number, the event number, the action name, each input parameter, the result status and error message. TODO: we could convert complex values to strings before sending to Pandas? """ rows = [] for tr_num in range(len(traces)): events = traces[tr_num].events for ev_num in range(len(events)): event = events[ev_num] row = {"Trace": tr_num, "Event": ev_num, "Action": event["action"]} # we add "Status" and "Error" first, so that those columns come before inputs. row["Status"] = event_status(event) row["Error"] = event["outputs"].get("Error", None) row.update(event["inputs"].items()) rows.append(row) return pd.DataFrame(rows) PK vPO2D:[?[?agilkia/random_tester.py""" Simple random test generator for SOAP web services. Author: Mark Utting, 2019 TODO: * provide a way of generating related inputs like (lat,long) together. """ import csv import requests import zeep import zeep.helpers import getpass import operator import random import numpy import unittest from pathlib import Path from pprint import pprint from typing import Tuple, List, Mapping from . json_traces import Trace, TraceSet # A signature of a method maps "input"/"output" to the dictionary of input/output names and types. Signature = Mapping[str, Mapping[str, str]] # TODO: make these user-configurable DUMP_WSDL = False # save each *.wsdl file into current directory. DUMP_SIGNATURES = False # save summary of methods into *_signatures.txt GOOD_PASSWORD = "" def read_input_rules(file: Path) -> Mapping[str, List[str]]: """Reads a CSV file of input values. The header line of the CSV file should contain headers: Name,Frequency,Value. (but the Frequency column is optional, and missing frequencies default to 1). For example if one line contains 'size,3,100' and another contains 'size,2,200', then the resulting input rules will define a 3/5 chance of size being 100, and a 2/5 chance of it being 200. """ input_rules = {} with open(file, "r") as input: for row in csv.DictReader(input): name = row["Name"] freq = row.get("Frequency", "") freq_int = int(freq) if freq else 1 value = row["Value"] value_list = input_rules.get(name, []) for i in range(freq_int): value_list.append(value) input_rules[name] = value_list # update it after appending new values print(input_rules) return input_rules def summary(value) -> str: """Returns a one-line summary of the given value.""" s = str(value).replace("\n", "").replace(" ", "") return s[:60] def uniq(d): """Returns the unique value of a dictionary, else an empty dictionary.""" result = {} for k, v in d.items(): if result == {} or result == v: result = v return result # temp hack - ITM 3 ports have slight differences. else: print(f"WARNING: uniq sees different values.\n" + " val1={result}\n val2={v}") return {} return result class TestUniq(unittest.TestCase): """Some unit tests of the uniq function.""" def test_normal(self): self.assertEqual("def", uniq({"abc": "def"})) # TODO: assert uniq({"abc":"one", "xyz":"two"}) == {} def test_duplicate_values(self): self.assertEquals("one", uniq({"abc": "one", "xyz": "one"})) def parse_elements(elements): """Helper function for build_interface.""" all_elements = {} for name, element in elements: all_elements[name] = {} all_elements[name]['optional'] = element.is_optional if hasattr(element.type, 'elements'): all_elements[name]['type'] = parse_elements(element.type.elements) else: all_elements[name]['type'] = str(element.type) return all_elements def build_interface(client: zeep.Client) -> Mapping[str, Mapping]: """Returns a nested dictionary structure for the methods of client. Typical usage to get a method called "Login" is: ```build_interface(client)[service][port]["operations"]["Login"]``` """ interface = {} for service in client.wsdl.services.values(): interface[service.name] = {} for port in service.ports.values(): interface[service.name][port.name] = {} operations = {} for operation in port.binding._operations.values(): operations[operation.name] = {} operations[operation.name]['input'] = {} elements = operation.input.body.type.elements operations[operation.name]['input'] = parse_elements(elements) interface[service.name][port.name]['operations'] = operations return interface def print_signatures(client: zeep.Client, out): """Print a short summary of each operation signature offered by client.""" # From: https://stackoverflow.com/questions/50089400/introspecting-a-wsdl-with-python-zeep for service in client.wsdl.services.values(): out.write(f"service: {service.name}\n") for port in service.ports.values(): out.write(f" port: {port.name}\n") operations = sorted( port.binding._operations.values(), key=operator.attrgetter('name')) for operation in operations: action = operation.name inputs = operation.input.signature() outputs = operation.output.signature() out.write(f" {action}({inputs}) --> ({outputs})\n") class RandomTester: """Does random testing of a given web service. Give it a URL to a web service (or a list of URLs if there are several web services), and it will read the WSDL specifications from those web services and generate any number of random test sequences to test the methods. For more sophisticated (user-directed) testing you can also: * supply a username and password if login credentials are needed. * supply the subset of method names that you want to focus on testing (default is all). * supply a set of default input values (or generation functions) for each data type. * supply a set of input values (or generation functions) for each named input parameter. """ def __init__(self, urls, methods_to_test=None, input_rules=None, rand=random.Random(), action_chars=None, verbose=False): """Creates a random tester for the server url and set of web services on that server. Args: urls (str or List[str]): URLs to the web services, used to find the WSDL files. methods_to_test (List[str]): only these methods will be tested (None means all). input_rules (Dict[str,List]): maps each input parameter name to a list of possible values, one of which will be chosen randomly. rand (random.Random): the random number generator used to generate tests. action_chars (Mapping[str, str]): optional action-to-character map, for visualisation. verbose (bool): True means print progress messages during test generation. """ self.urls = [urls] if isinstance(urls, str) else urls self.username = None self.password = None self.random = rand self.verbose = verbose self.clients_and_methods = [] # List[(zeep.Service, Dict[str, Signature)] self.methods_to_test = methods_to_test self.methods_allowed = [] if methods_to_test is None else methods_to_test # maps each parameter to list of possible 'values' self.named_input_rules = {} if input_rules is None else input_rules meta = TraceSet.get_default_meta_data() meta["source"] = "RandomTester" meta["web_services"] = self.urls meta["methods_to_test"] = methods_to_test meta["input_rules"] = input_rules meta["method_signatures"] = {} # see add_web_service meta["action_chars"] = action_chars new_trace = Trace([], self.random.getstate()) self.curr_events = new_trace.events # mutable list to append to. self.trace_set = TraceSet([], meta) self.trace_set.append(new_trace) for w in self.urls: self.add_web_service(w) def set_username(self, username, password=None): """Set the username and (optional) password to be used for the subsequent operations. If password is not supplied, this method will immediately interactively prompt for it. """ self.username = username self.trace_set.meta_data["username"] = username self.password = password or getpass.getpass(f"Please enter password for user {username}:") def add_web_service(self, url): """Add another web service using the given url.""" wsdl = url + ("" if url.upper().endswith("WSDL") else ".asmx?WSDL") name = url.split("/")[-1] print(" loading WSDL: ", wsdl) if DUMP_WSDL: # save the WSDL for reference r = requests.get(wsdl, allow_redirects=True) open(f"{name}.wsdl", 'wb').write(r.content) # now create the client interface for this web service client = zeep.Client(wsdl=wsdl) interface = build_interface(client) pprint([(k, len(v["operations"])) for k, v in uniq(interface).items()]) if DUMP_SIGNATURES: # save summary of this web service into a signatures file with open(f"{name}_signatures.txt", "w") as sig: print_signatures(client, sig) if not uniq(interface): print(f"WARNING: web service {name} has empty interface?") pprint(interface) else: ops = uniq(uniq(interface))["operations"] self.clients_and_methods.append((client, ops)) self.trace_set.meta_data["method_signatures"].update(ops) if self.methods_to_test is None: self.methods_allowed += list(ops.keys()) def _find_method(self, name) -> Tuple[zeep.Client, Mapping[str, Signature]]: """Find the given method in one of the web services and returns its signature.""" for (client, interface) in self.clients_and_methods: if name in interface: return client, interface[name] raise Exception(f"could not find {name} in any WSDL specifications.") def choose_input_value(self, arg_name: str) -> str: """Choose an appropriate value for the input argument called 'arg_name'. If no set of input rules is defined for 'arg_name', then 'generate_input_value' is called to generate a suitable input value. Subclasses can override this. Args: arg_name (str): the name of the input parameter. Returns: a string if successful, or None if no suitable value was found. """ values = self.named_input_rules.get(arg_name, None) if values is None: return self.generate_input_value(arg_name) val = self.random.choice(values) return val def generate_input_value(self, arg_name: str) -> any: """Can be overridden in subclasses to generate smart values for an input argument.""" print(f"ERROR: please define possible parameter values for input {arg_name}") return None def _insert_password(self, arg_value: str) -> str: if arg_value == GOOD_PASSWORD: return self.password else: return arg_value def get_methods(self) -> Mapping[str, Signature]: """Return the set of all method names in all the web services.""" methods = {} for (client, interface) in self.clients_and_methods: methods.update(interface) return methods def call_method(self, name, args=None): """Call the web service name(args) and add the result to trace. Args: name (str): the name of the method to call. args (dict): the input values for the method. If args=None, then this method uses 'choose_input_value' to choose appropriate values for each argument value of the method. Returns: Before the call, this method replaces some symbolic inputs by actual concrete values. For example the correct password token is replaced by the real password -- this avoids recording the real password in the inputs of the trace. Returns: all the data returned by the method. """ (client, signature) = self._find_method(name) inputs = signature["input"] if args is None: args = {n: self.choose_input_value(n) for n in inputs.keys()} if None in args.values(): print(f"skipping method {name}. Please define missing input values.") return None if self.verbose: print(f" call {name}{args}") # insert special secret argument values if requested args_list = [self._insert_password(arg) for (n, arg) in args.items()] out = getattr(client.service, name)(*args_list) # we call it 'action' so it gets printed before 'inputs' (alphabetical order). self.curr_events.append({"action": name, "inputs": args, "outputs": out}) if self.verbose: print(f" -> {summary(out)}") return out def generate_trace(self, start=True, length=20, methods=None) -> Trace: """Generates the requested length of test steps, choosing methods at random. Args: start (bool): True means that a new trace is started (unless current one is empty). length (int): The number of steps to generate (default=20). methods (List[str]): only these methods will be chosen (None means all are allowed) Returns: the whole of the current trace that has been generated so far. """ if start: if len(self.curr_events) > 0: new_trace = Trace([], self.random.getstate()) self.curr_events = new_trace.events # mutable list to append to. self.trace_set.append(new_trace) if methods is None: methods = self.methods_allowed for i in range(length): # TODO: continue while Status==0? self.call_method(self.random.choice(methods)) return self.trace_set.traces[-1] def setup_feature_data(self): """Must be called before the first call to get_trace_features.""" actions = self.methods_allowed nums = len(actions) self.action2number = dict(zip(actions, range(nums))) if self.verbose: print("Action 2 num:", self.action2number) def get_action_counts(self, trace) -> List[int]: """Returns an array of counts - how many times each event occurs in trace.""" result = [0 for k in self.action2number.keys()] for ev in trace: action_num = self.action2number[ev["action"]] result[action_num] += 1 return result def get_trace_features(self) -> List[int]: """Returns a vector of numeric features suitable for input to an ML model. The results returned by this function must match the training set of the ML model. Currently this returns an array of counts - how many times each event occurs in the whole current trace, and how many times in the most recent 8 events. """ prefix = self.get_action_counts(self.curr_events) suffix = self.get_action_counts(self.curr_events[-8:]) return prefix+suffix def generate_trace_ml(self, model, start=True, length=20): """Generates the requested length of test steps, choosing methods using the given model. Args: model (any): the ML model to use to generate the next event. This model must support the 'predict_proba' method. start (bool): True means that a new trace is started, beginning with a "Login" call. length (int): The number of steps to generate (default=20). Returns: the whole of the current trace that has been generated so far. """ self.setup_feature_data() # start a new (empty) trace if requested. self.generate_trace(start=start, length=0) for i in range(length): features = self.get_trace_features() [proba] = model.predict_proba([features]) [action_num] = numpy.random.choice(len(proba), p=proba, size=1) action = self.methods_allowed[action_num] if self.verbose: print(i, features, action, ",".join([f"{int(p*100)}" for p in proba])) self.call_method(action) return self.curr_events if __name__ == "__main__": unittest.main() PK@[POӓ#agilkia-0.2.2.dist-info/LICENSE.txtCopyright 2019 Mark Utting Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.PK!HPOagilkia-0.2.2.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,szd&Y)r$[)T&UrPK!Hw1? agilkia-0.2.2.dist-info/METADATA}Vn7}W TZĉZ 5lq"yhڥ$JQag+۱3g ud_=IŹUD[-ŝi+Ivİs=گePy ]LӉV\+7өxcs)!NtV$y ;|&NWx_=x.ުܺƹ-62BQ-Lߦ˩6R 1GguDМuT.Q ʧʪDVNfЧdO*:]9EPE\-Jsmf}GcCj*dсm eۢQ JcѣAn?v(eI?ϟjc|ЕK]WQ4L_ qg5ԧVNl[ )U\*UrA?6Zc ;d Ϲ ms}jyiNjgo/^tv3D5(3گ t|rhƁ)٠9FN cD}XZ!k橙WSi5P0r`avLaq@oܼUqk"\~Xz%ϵM;,#".FH窱.t&G5 |81`u?|"W!|]YZI* ݞ2_ D_#E ͤfƻ /Mѧ/-ޱkGG^BHӢt|qqxs?xw8h}MzE2YI昻 mo}twtxI^h<#k2jW4Uyu{kզݻ4.4c-5f :_J2DȢD7'jF]3&dSBdj\]?_6.yr[ݪrtn<1o-^Fdf_*2b UՔs) f7aݨbݭ{>n>A_їྶ6PK!H,&dagilkia-0.2.2.dist-info/RECORDuv0}, e1TDE79$X@}ӍT;]N?`>"" b#G+Qhֆ|)Dqmj(0Tj1*q_ܝ MiF٬^4M=#O\/lD S~k̲"" '!M0&GΓ[lf =׾&PL[|A VX2^=YO>vaͰ._qLdc Zf7PK{vPOagilkia/__init__.pyPKZPO5YUYUagilkia/json_traces.pyPK vPO2D:[?[?\agilkia/random_tester.pyPK@[POӓ#agilkia-0.2.2.dist-info/LICENSE.txtPK!HPOnagilkia-0.2.2.dist-info/WHEELPK!Hw1? agilkia-0.2.2.dist-info/METADATAPK!H,&dagilkia-0.2.2.dist-info/RECORDPK