PK]SO߽agilkia/__init__.py"""Automated smart testing strategies for web services. This 'agilkia' package is for testing web services and managing set of traces. Traces may come from user interactions, or from automated test suites, etc. The main data structure for traces is the ``TraceSet``: * class TraceSet supports loading/saving traces as JSON, converting to Pandas, etc. * class Trace is used by TraceSet, and contains a list of Events. * Each Event is a dict that contains at least the following keys: * "action" gives the name of the action (a string); * "inputs" is a dict of input parameter names to values; * "outputs" is a dict of output parameter names to values. Automated test generation facilities include: * RandomTester generates random test sequences. * SmartTester generates tests from an ML model (Currently this is included in RandomTester.generate_trace_ml, but this will be split into a separate class shortly.) """ # This package follows a 'Convenience Store' model. # That is, it directly exports all the features that will be useful to users. # They do not need to import sub-modules. # # See the article: "What’s __init__ for me?" by Jacob Deppen on TowardsDataScience.com: # https://towardsdatascience.com/whats-init-for-me-d70a312da583 __version__ = '0.3.1' from . random_tester import (read_input_rules, RandomTester, uniq, build_interface, print_signatures, DUMP_WSDL, DUMP_SIGNATURES, GOOD_PASSWORD) from . json_traces import (Event, Trace, TraceSet, TraceEncoder, TRACE_SET_VERSION, xml_decode, all_action_names, safe_name, default_map_to_chars, trace_to_string, traces_to_pandas) PKSOrՈՈagilkia/json_traces.py# -*- coding: utf-8 -*- """ Data structures for Traces and Sets of Traces. This defines the 'Trace' and 'TraceSet' classes, plus helper functions. NOTE: private data fields (starting with '_') will not be stored in the JSON files. For example, each Trace object has a '_parent' point to its TraceSet, but this is not stored in the JSON file, since the hierarchical structure of the JSON already captures the parent-child relationship between TraceSet and Trace. NOTE: file version upgrade policy: JSON trace file version numbers follow the usual Semantic Versioning scheme: (Major.Minor.Patch). TraceSet.upgrade_json_data' currently just prints a warning message when a program running older code reads a JSON file with a newer MINOR version number. This allows graceful updating of one program at a time, but does have the danger that a older program may read newer data (with a warning), then SAVE that data in the slightly older format, thus losing some data. But a strict version-equality means that all programs have to be updated simultaneously, which is a pain. TODO: * DONE: add save_as_arff() method like to_pandas. * DONE: store event_chars into meta_data. * DONE: store signatures into meta_data. * DONE: create Event class and make it dict-like. * DONE: add support for splitting traces into 'sessions' via splitting or grouping. * DONE: add support for clustering traces * DONE: add support for visualising the clusters (TSNE). * add unit tests for clustering... * read/restore TraceSet.clusters field? Or move into meta-data? * split RandomTester into SmartTester subclass (better meta-data). * add 'meta_data' to Trace and Event objects too (replace properties) * add ActionChars class? * extend to_pandas() to allow user-defined columns to be added. @author: utting@usc.edu.au """ import os import sys from pathlib import Path # object-oriented filenames! from collections import defaultdict import json import decimal import datetime import re import xml.etree.ElementTree as ET import pandas as pd # type: ignore import sklearn.cluster # type: ignore import sklearn.preprocessing # type: ignore import matplotlib.pyplot as plt import matplotlib.cm as pltcm from sklearn.manifold import TSNE # liac-arff from https://pypi.org/project/liac-arff (via pip) # import arff # type: ignore from typing import List, Set, Mapping, Dict, Union, Any, Optional, cast TRACE_SET_VERSION = "0.1.3" MetaData = Optional[Dict[str, Any]] def safe_name(string: str) -> str: """Returns 'string' with all non-alpha-numeric characters replaced by '_'.""" return re.sub("[^A-Za-z0-9]", "_", string) class Event: """An Event is a dictionary-like object that records all the details of an event. This includes at least: * self.action (str): the full action name. * self.inputs (Dict[str,Any]): the named inputs and their values. * self.outputs (Dict[str,Any]): the named outputs and their values. * self.properties (Dict[str,Any]): any extra properties such as "timestamp". Note: if self.properties["timestamp"] is present, it should be in ISO 8601 format. """ def __init__(self, action: str, inputs: Dict[str, Any], outputs: Dict[str, Any], properties: Optional[Dict[str, Any]] = None): self.action = action self.inputs = inputs self.outputs = outputs self.properties = {} if properties is None else properties @property def status(self) -> int: """Read-only status of the operation, where 0 means success. If no output 'Status' is available, this method always returns 0. """ return int(self.outputs.get("Status", "0")) @property def error_message(self) -> str: """Read-only error message output by this operation. If no output['Error'] field is available, this method always returns "". """ return self.outputs.get("Error", "") class Trace: """Represents a single trace, which contains a sequence of events. """ def __init__(self, events: List[Event], parent: 'TraceSet' = None, random_state=None): """Create a Trace object from a list of events. Args: events: the sequence of Events that make up this trace. parent: the TraceSet that this trace is part of. random_state: If this trace was generated using some randomness, you should supply this optional parameter, to record the state of the random generator at the start of the sequence. For example, rand_state=rand.getstate(). """ if events and not isinstance(events[0], Event): raise Exception("Events required, not: " + str(events[0]) + " ...") self.events = events self._parent = parent self.random_state = random_state def trace_set(self): """Returns the TraceSet that this trace is part of, or None if not known.""" return self._parent def __iter__(self): return self.events.__iter__() def __len__(self): return len(self.events) def __getitem__(self, key): return self.events[key] def append(self, event: Event): if not isinstance(event, Event): raise Exception("Event required, not: " + str(event)) self.events.append(event) def action_counts(self) -> Dict[str, int]: """Returns a dictionary of how many times each action occurs in this trace. Returns: A dictionary of counts that can be used for clustering traces. """ result = defaultdict(int) for ev in self.events: result[ev.action] += 1 return result def action_status_counts(self) -> Dict[str, int]: """Counts how many times each action-status pair occurs in this trace. Returns: A dictionary of counts that can be used for clustering traces. """ result = defaultdict(int) for ev in self.events: key = ev.action + "_" + str(ev.status) result[key] += 1 return result def to_string(self, to_char: Dict[str, str] = None, compress: List[str] = None, color_status: bool = False): """Return a one-line summary of this trace, one character per event. See 'trace_to_string' for details. NOTE: throws an exception if no to_char map is given and this trace has no parent. """ if to_char is None: if self._parent is None: raise Exception("Cannot view trace with no parent and no to_char map.") to_char = self._parent.get_event_chars() return trace_to_string(self.events, to_char, compress=compress, color_status=color_status) def __str__(self): try: return self.to_string() except Exception: return "..." class TraceSet: """Represents a set of traces, either generated or recorded. Invariants: * forall tr:self.traces (tr._parent is self) (TODO: set _parent to None when a trace is removed?) * self.meta_data is a dict with keys: date, source at least. """ meta_data: Dict[str, Any] _event_chars: Optional[Dict[str, str]] def __init__(self, traces: List[Trace], meta_data: Dict[str, Any] = None): """Create a TraceSet object from a list of Traces. Args: traces: the list of Traces. meta_data: a dictionary that captures the important meta-data for this set of traces. If this TraceSet is to be saved into a file, the meta-data should include at least the GDF (General Data Format) compulsory fields, which are based on the Dublin Core: "date" in ISO 8601 format; "dataset" for the official name of this TraceSet; "source" for the origin of the dataset; and any other meta-data that is available. """ self.version = TRACE_SET_VERSION if meta_data is None: self.meta_data = self.get_default_meta_data() else: self.meta_data = meta_data.copy() self.traces = traces self.clusters: List[int] = None self._cluster_data: pd.DataFrame = None for tr in self.traces: if isinstance(tr, Trace): tr._parent = self else: raise Exception("TraceSet expects List[Trace], not: " + str(type(tr))) self._event_chars = None # recalculated if set of traces grows. def __iter__(self): return self.traces.__iter__() def __len__(self): return len(self.traces) def __getitem__(self, key): return self.traces[key] def message(self, msg: str): """Print a progress message.""" print(" ", msg) @classmethod def get_default_meta_data(cls) -> Dict[str, Any]: """Generates some basic meta-data such as date, user and command line.""" now = datetime.datetime.now().isoformat() user = os.path.expanduser('~').split('/')[-1] # usually correct, but can be tricked. meta_data: Dict[str, Any] = { "date": now, "author": user, "dataset": "unknown", "action_chars": None } if len(sys.argv) > 0: meta_data["source"] = sys.argv[0] # the path to the running script/tool. meta_data["cmdline"] = sys.argv return meta_data def get_meta(self, key: str) -> Optional[MetaData]: """Returns requested meta data, or None if that key does not exist.""" if key in self.meta_data: return self.meta_data[key] else: return None def append(self, trace: Trace): """Appends the given trace into this set. This also sets its parent to be this set. """ if not isinstance(trace, Trace): raise Exception("Trace required, not: " + str(trace)) trace._parent = self self.traces.append(trace) self._event_chars = None # we will recalculate this later def set_event_chars(self, given: Mapping[str, str] = None): """Sets up the event-to-char map that is used to visualise traces. This will calculate a default mapping for any actions that are not in given. (See 'default_map_to_chars'). Args: given: optional pre-allocation of a few action names to chars. For good readability of the printed traces, it is recommended that extremely common actions should be mapped to 'small' characters like '.' or ','. """ if given is None: new_given = self.get_meta("action_chars") else: self.meta_data["action_chars"] = given # override any previous given map. new_given = cast(Dict[str, str], given).copy() # copy so we don't change orginal. actions = all_action_names(self.traces) self._event_chars = default_map_to_chars(actions, given=new_given) def get_event_chars(self): """Gets the event-to-char map that is used to visualise traces. This maps each action name to a single character. If set_event_chars has not been called, this getter will calculate and cache a default mapping from action names to characters. """ if self._event_chars is None: self.set_event_chars() return self._event_chars def __str__(self): name = self.meta_data["dataset"] # required meta data return f"TraceSet '{name}' with {len(self)} traces." def save_to_json(self, file: Path) -> None: if isinstance(file, str): print(f"WARNING: converting {file} to Path. Please learn to speak pathlib.") file = Path(file) with file.open("w") as output: json.dump(self, output, indent=2, cls=TraceEncoder) @classmethod def load_from_json(cls, file: Path) -> 'TraceSet': if isinstance(file, str): print(f"WARNING: converting {file} to Path. Please learn to speak pathlib.") file = Path(file) if not isinstance(file, Path): raise Exception(f"load_from_json requires Path, not {file} (type={type(file)})") # with open(filename, "r") as input: data = json.loads(file.read_text()) # Now check version and upgrade if necessary. if isinstance(data, list): # this file was pre-TraceSet, so just a list of lists of events. mtime = datetime.datetime.fromtimestamp(file.stat().st_mtime).isoformat() meta = {"date": mtime, "dataset": file.name, "source": "Upgraded from version 0.1"} traces = cls([], meta) for ev_list in data: events = [cls._create_event_objects("0.1", ev) for ev in ev_list] traces.append(Trace(events)) return traces elif isinstance(data, dict) and data.get("__class__", None) == "TraceSet": return cls.upgrade_json_data(data) else: raise Exception("unknown JSON file format: " + str(data)[0:60]) @classmethod def upgrade_json_data(cls, json_data: Dict) -> 'TraceSet': version = json_data["version"] if version.startswith("0.1."): # This JSON file is compatible with our code. # First, convert json_data dicts to Trace and TraceSet objects. traceset = TraceSet([], json_data["meta_data"]) for tr_data in json_data["traces"]: assert tr_data["__class__"] == "Trace" rand = tr_data.get("random_state", None) events = [cls._create_event_objects(version, ev) for ev in tr_data["events"]] traceset.append(Trace(events, random_state=rand)) # Next, see if any little updates are needed. if version == TRACE_SET_VERSION or version == "0.1.2": pass # nothing more to do. elif version == "0.1.1": # Move given_event_chars into meta_data["action_chars"] # Note: traceset["version"] has already been updated to the latest. traceset.meta_data["actions_chars"] = json_data["given_event_chars"] else: # The JSON must be from a newer 0.1.x version, so give a warning. print(f"WARNING: reading {version} TraceSet using {TRACE_SET_VERSION} code.") print(f" Some data may be lost. Please upgrade this program.") return traceset raise Exception(f"upgrade of TraceSet v{version} to v{TRACE_SET_VERSION} not supported.") @classmethod def _create_event_objects(cls, version: str, ev: Dict[str, Any]) -> Event: special = ["action", "inputs", "outputs"] action = ev["action"] inputs = ev["inputs"] outputs = ev["outputs"] if version <= "0.1.2": props = {key: ev[key] for key in ev if key not in special} else: props = ev["properties"] return Event(action, inputs, outputs, props) def to_pandas(self) -> pd.DataFrame: """Converts all the traces into a single Pandas DataFrame (one event/row). The first three columns are 'Trace' and 'Event' which give the number of the trace and the position of the event within that trace, and 'Action' which is the name of the action of the event. Each named input value is recorded in a separate column. For outputs, by default there are just 'Status' (int) and 'Error' (str) columns. """ return traces_to_pandas(self.traces) def arff_type(self, pandas_type: str) -> Union[str, List[str]]: """Maps each Pandas data type to the closest ARFF type.""" if pd.api.types.is_integer_dtype(pandas_type): return "INTEGER" if pd.api.types.is_float_dtype(pandas_type): return "REAL" if pd.api.types.is_bool_dtype(pandas_type): return ["False", "True"] return "STRING" # TODO: check column to see if NOMINAL is better? # raise Exception(f"do not know how to translate Pandas type {pandas_type} to ARFF.") def save_to_arff(self, file: Path, name=None) -> None: """Save all the events in all traces into an ARFF file for machine learning. Args: filename: the name of the file to save into. Should end with '.arff'. name: optional relation name to identify this data inside the ARFF file. The default is the base name of 'file'. """ if isinstance(file, str): print(f"WARNING: converting {file} to Path. Please learn to speak pathlib.") file = Path(file) if name is None: name = file.stem data = self.to_pandas() attributes = [(n, self.arff_type(t)) for (n, t) in zip(data.columns, data.dtypes)] try: import arff except ImportError: print("Please install ARFF support before using save_to_arff.") print("It is a pip only package: pip install liac-arff") return with file.open("w") as output: contents = { "relation": safe_name(name), "attributes": attributes, "data": data.values, # [[tr] for tr in trace_summaries], "description": "Events from " + name } arff.dump(contents, output) def with_traces_split(self, start_action: str = None, input_name: str = None, comparator=None) -> 'TraceSet': """Returns a new TraceSet with each trace in this set split into shorter traces. It accepts several split criteria, and will start a new trace whenever any of those criteria are true. At least one criteria must be supplied. Args: start_action: the name of an action that starts a new trace. input_name: the name of an input. Whenever the value of this input changes, then a new trace should be started. Note that events with this input missing are ignored for this splitting criteria. TODO: comparator: a function that takes two events and returns True iff the second event should start a new trace. (Not implemented yet) TODO: add an end_action criteria? Returns: a new TraceSet, usually with more traces and shorter traces. """ if start_action is None and input_name is None: raise Exception("split_traces requires at least one split criteria.") traces2 = TraceSet([], self.meta_data) # TODO: update meta data with split info? for old in self.traces: curr_trace = Trace([]) traces2.append(curr_trace) prev_input = None for event in old: input_value = event.inputs.get(input_name, None) input_changed = input_value != prev_input and input_value is not None if (event.action == start_action or input_changed) and len(curr_trace) > 0: curr_trace = Trace([]) traces2.append(curr_trace) curr_trace.append(event) if input_value is not None: prev_input = input_value # NOTE: we could check end_action here. return traces2 def with_traces_grouped_by(self, name: str, property: bool = False) -> 'TraceSet': """Returns a new TraceSet with each trace grouped into shorter traces. It generates a new trace for each distinct value of the given input or property name. Args: name: the name of an input. A new trace is started for each value of this input (or property). Note that events with this value missing are totally discarded. property: True means group by the property called name, rather than an input. Returns: a new TraceSet, usually with more traces and shorter traces. """ # TODO: update meta data with split info? traces2 = TraceSet([], self.meta_data) for old in self.traces: groups = defaultdict(list) # for each value this stores a list of Events. for event in old: if property: value = event.properties.get(name, None) else: value = event.inputs.get(name, None) if value is not None: groups[value].append(event) for event_list in groups.values(): traces2.append(Trace(event_list)) return traces2 def get_trace_data(self, method: str = "action_counts") -> pd.DataFrame: """Returns a Pandas table of statistics/data about each trace. This can gather data using any of the zero-parameter data-gathering methods of the Trace class that returns a Dict[str, number] for some kind of number. The default is the ``action_counts()`` method, which corresponds to the `bag-of-words' algorithm. Note: you can add more data-gathering methods by defining a subclass of Trace and using that subclass when you create Trace objects. Returns: A table of data that can be used for clustering or machine learning. """ trace_data = [getattr(tr, method)() for tr in self.traces] data = pd.DataFrame(trace_data) data.fillna(value=0, inplace=True) return data def create_clusters(self, data: pd.DataFrame, algorithm=None, normalize: bool = True) -> int: """Runs a clustering algorithm on the given data and remembers the clusters. Args: data: a Pandas DataFrame, typically from get_trace_data(). algorithm: a clustering algorithm (default is MeanShift()). normalize: False uses data unchanged, True uses a normalized copy of data, using the sklearn.preprocessing.RobustScaler class, because it is more robust in the presence of outlier values, and our trace data often has very large outliers. See the study: https://schlieplab.org/Static/Publications/2008-IEEENeuralNets-ComparingNormalizations.pdf Returns: The number of clusters generated.""" if algorithm is None: algorithm = sklearn.cluster.MeanShift() if normalize: # transformer = sklearn.preprocessing.RobustScaler().fit(data) transformer = sklearn.preprocessing.MinMaxScaler().fit(data) self._cluster_data = pd.DataFrame(transformer.transform(data), columns=data.columns) else: self._cluster_data = data alg_name = str(algorithm).split("(")[0] self.message(f"running {alg_name} on {len(data)} traces.") algorithm.fit(self._cluster_data) self.clusters = algorithm.labels_ return max(self.clusters) + 1 def visualize_clusters(self): """Visualize the clusters from create_clusters() using TSNE.""" data = self._cluster_data if data is None or self.clusters is None: raise Exception("You must call create_clusters() before visualizing them!") self.message("running TSNE...") model = TSNE() tsne_obj = model.fit_transform(data) print(tsne_obj[0:5]) # All the following complex stuff is for adding a 'show label on mouse over' feature # to the TSNE display. It works when run from command line, but not in Jupyter/Spyder! # Surely there must be an easier way than doing all this... # Code adapted from: # https://stackoverflow.com/questions/55891285/how-to-make-labels-appear- # when-hovering-over-a-point-in-multiple-axis/55892690#55892690 fig, ax = plt.subplots() # Choose a colormap. See bottom of the matplotlib page: # https://matplotlib.org/3.1.0/tutorials/colors/colormaps.html colors = pltcm.get_cmap('hsv') print("colors=", colors) sc = plt.scatter(tsne_obj[:, 0], tsne_obj[:, 1], c=self.clusters, cmap=colors) names = [str(tr) for tr in self.traces] # these are in same order as tsne_df rows. annot = ax.annotate("", xy=(0, 0), xytext=(20, 20), textcoords="offset points", bbox=dict(boxstyle="round", fc="w"), arrowprops=dict(arrowstyle="->"), ) annot.set_visible(False) def update_annot(ind): pos = sc.get_offsets()[ind["ind"][0]] annot.xy = pos # text = "{}, {}".format(" ".join(list(map(str, ind["ind"]))), # " ".join([str(names[n]) for n in ind["ind"]])) text = "\n".join([f"{n}: {str(names[n])}" for n in ind["ind"]]) annot.set_text(text) # annot.get_bbox_patch().set_facecolor(cmap(norm(c[ind["ind"][0]]))) # annot.get_bbox_patch().set_alpha(0.4) def hover(event): vis = annot.get_visible() if event.inaxes == ax: cont, ind = sc.contains(event) if cont: update_annot(ind) annot.set_visible(True) fig.canvas.draw_idle() else: if vis: annot.set_visible(False) fig.canvas.draw_idle() fig.canvas.mpl_connect("motion_notify_event", hover) plt.show() def get_cluster(self, num: int) -> List[Trace]: """Gets a list of all the Trace objects that are in the given cluster.""" if self.clusters is None: raise Exception("You must call create_clusters() before get_cluster(_)") if len(self.clusters) != len(self.traces): raise Exception("Traces have changed, so you must call create_clusters() again.") return [tr for (i, tr) in zip(self.clusters, self.traces) if i == num] class TraceEncoder(json.JSONEncoder): """Custom JSON encoder because objects from zeep could not be serialised. Based on ideas from this blog entry by 'The Fellow' (Ouma Rodgers): https://medium.com/python-pandemonium/json-the-python-way-91aac95d4041. This does not handle XML objects, as they should be decoded via xml_decode first. """ def default(self, obj): if isinstance(obj, (dict, list, tuple, str, int, float, bool)): return super().default(obj) # JSON already handles these if isinstance(obj, decimal.Decimal): return float(round(obj, 6)) # f"{o:.5f}" if isinstance(obj, (bytes, bytearray)): return "BYTES..." # TODO: handle these better: repr(o)? if isinstance(obj, (set, frozenset)): return list(obj) if isinstance(obj, (datetime.date, datetime.datetime, datetime.time)): return obj.isoformat() # as a string if hasattr(obj, "__dict__"): result = { "__class__": obj.__class__.__name__, "__module__": obj.__module__ } if len(obj.__dict__) == 1 and "__values__" in obj.__dict__: # zeep seems to hide the attributes in a __values__ dict. # We lift them up to the top level to make the json more readable. self._add_public_attributes(result, obj.__dict__["__values__"]) else: self._add_public_attributes(result, obj.__dict__) return result raise Exception("JSON serialisation not implemented yet for: " + str(obj) + " type " + str(type(obj)) + " dir:" + ",".join(dir(obj))) def _add_public_attributes(self, result, attrs) -> None: for (name, value) in attrs.items(): if not name.startswith("_"): result[name] = value def xml_decode(obj: ET.Element) -> Union[str, Dict[str, Any]]: """Custom XML encoder to decode XML into a Python dictionary suitable for JSON encoding. This roughly follows the ideas from: https://www.xml.com/pub/a/2006/05/31/converting-between-xml-and-json.html. For simple XML objects with no attributes and no children, this returns just the text string. For more complex XML objects, it returns a dictionary. Note that the top-level tag of 'obj' is assumed to be handled by the caller. That is, the caller will typically do ```d[tag] = xml_decode(obj)``` where xml_decode will return either a simple string, or a dictionary. """ if len(obj) == 0 and len(obj.attrib) == 0: return cast(str, obj.text) else: # return obj as a dictionary result: Dict[str, Any] = {} for (n, v) in obj.attrib.items(): result[n] = v # child objects are more tricky, since some tags may appear multiple times. # If a tag appears multiple times, we map it to a list of child objects. curr_tag = "" curr_list: List[Union[str, Dict[str, Any]]] = [] for child in obj: if child.tag != curr_tag: # save the child(ren) we have just finished if len(curr_list) > 0: result[curr_tag] = curr_list if len(curr_list) > 1 else curr_list[0] curr_list = [] curr_tag = child.tag curr_list.append(xml_decode(child)) if len(curr_list) > 0: result[curr_tag] = curr_list if len(curr_list) > 1 else curr_list[0] if obj.text and obj.text.strip(): # ignore text that is just whitespace result["text"] = obj.text return result def default_map_to_chars(actions: Set[str], given: Dict[str, str] = None) -> Dict[str, str]: """Tries to guess a useful default mapping from action names to single characters. Args: actions: the names of all the actions. given: optional pre-allocation of a few action names to chars. You can use this to override the default behaviour. Returns: A map from every name in actions to a unique single character. """ names: List[str] = sorted(list(actions)) result: Dict[str, str] = {} if given is None else given.copy() # TODO: a better algorithm might be to break up compound words and look for word prefixes? curr_prefix = "" pass2 = [] for i in range(len(names)): name = names[i] if name in result: continue # given # skip over any prefix that was in common with previous name. if name.startswith(curr_prefix): pos = len(curr_prefix) else: pos = 0 # check ahead for common prefixes first if i + 1 < len(names): nxt = names[i + 1] if nxt.startswith(name) and name[0] not in result.values(): result[name] = name[0] curr_prefix = name continue prefix = max([p for p in range(max(len(name), len(nxt))) if name[0:p] == nxt[0:p]]) # print(f" found prefix {prefix} of {name} and {nxt}") curr_prefix = name[0:prefix] else: prefix = 0 curr_prefix = "" if prefix > 0 and prefix > pos: pos = prefix done = False for j in range(pos, len(name)): if name[pos] not in result.values(): result[name] = name[pos] done = True break if not done: pass2.append(name) # Pass 2 (all visible ASCII chars except " and ') allchars = "".join([chr(n) for n in range(42, 127)]) + "!#$%&()" for name in pass2: for ch in name + allchars: if ch not in result.values(): result[name] = ch break # move onto next name in pass2 return result def all_action_names(traces: List[Trace]) -> Set[str]: """Collects all the action names that appear in the given traces.""" result = set() for tr in traces: for ev in tr.events: action = ev.action result.add(action) return result def trace_to_string(trace: List[Event], to_char: Mapping[str, str], compress: List[str] = None, color_status: bool = False) -> str: """Converts a trace to a short summary string, one character per action. Args: trace: the sequence of JSON-like events, with an "action" field. to_char: maps each action name to a single character. This map must include every action name that appears in the traces. A suitable map can be constructed via TraceSet.get_event_chars(). compress: a list of Action names. Repeated events will be compressed if in this list. color_status: True means color the string red where status is non-zero. This uses ANSI escape sequences, so needs to be printed to a terminal. Returns: a summary string. """ compress_set = set() if compress is None else set(compress) chars = [] prev_action = None for ev in trace: action = ev.action if action == prev_action and action in compress_set: # NOTE: we color compressed output just based on the first event. pass else: if color_status and ev.status != 0: chars.append("\033[91m") # start RED chars.append(to_char[action]) chars.append("\033[0m") # turn off color else: chars.append(to_char[action]) prev_action = action return "".join(chars) def traces_to_pandas(traces: List[Trace]) -> pd.DataFrame: """Collects all events into a single Pandas DataFrame. Columns include the trace number, the event number, the action name, each input parameter, the result status and error message. TODO: we could convert complex values to strings before sending to Pandas? """ rows = [] for tr_num in range(len(traces)): events = traces[tr_num].events for ev_num in range(len(events)): event = events[ev_num] row = {"Trace": tr_num, "Event": ev_num, "Action": event.action} # we add "Status" and "Error" first, so that those columns come before inputs. row["Status"] = event.status row["Error"] = event.error_message row.update(event.inputs.items()) rows.append(row) return pd.DataFrame(rows) PKsQOEEůAAagilkia/random_tester.py""" Simple random test generator for SOAP web services. Author: Mark Utting, 2019 TODO: * provide a way of generating related inputs like (lat,long) together. """ import csv import requests import zeep # type: ignore import zeep.helpers # type: ignore import getpass import operator import random import numpy # type: ignore import unittest from pathlib import Path from pprint import pprint from typing import Tuple, List, Mapping, Dict, Any, Optional, Union from . json_traces import Event, Trace, TraceSet # A signature of a method maps "input"/"output" to the dictionary of input/output names and types. Signature = Mapping[str, Mapping[str, str]] InputRules = Dict[str, List[str]] # TODO: make these user-configurable DUMP_WSDL = False # save each *.wsdl file into current directory. DUMP_SIGNATURES = False # save summary of methods into *_signatures.txt GOOD_PASSWORD = "" def read_input_rules(file: Path) -> InputRules: """Reads a CSV file of input values. The header line of the CSV file should contain headers: Name,Frequency,Value. (but the Frequency column is optional, and missing frequencies default to 1). For example if one line contains 'size,3,100' and another contains 'size,2,200', then the resulting input rules will define a 3/5 chance of size being 100, and a 2/5 chance of it being 200. """ input_rules: InputRules = {} with open(file, "r") as input: for row in csv.DictReader(input): name = row["Name"] freq = row.get("Frequency", "") freq_int = int(freq) if freq else 1 value = row["Value"] value_list = input_rules.get(name, []) for i in range(freq_int): value_list.append(value) input_rules[name] = value_list # update it after appending new values print(input_rules) return input_rules def summary(value) -> str: """Returns a one-line summary of the given value.""" s = str(value).replace("\n", "").replace(" ", "") return s[:60] def uniq(d): """Returns the unique value of a dictionary, else an empty dictionary.""" result = {} for k, v in d.items(): if result == {} or result == v: result = v return result # temp hack - ITM 3 ports have slight differences. else: print(f"WARNING: uniq sees different values.\n" + " val1={result}\n val2={v}") return {} return result class TestUniq(unittest.TestCase): """Some unit tests of the uniq function.""" def test_normal(self): self.assertEqual("def", uniq({"abc": "def"})) # TODO: assert uniq({"abc":"one", "xyz":"two"}) == {} def test_duplicate_values(self): self.assertEquals("one", uniq({"abc": "one", "xyz": "one"})) def parse_elements(elements): """Helper function for build_interface.""" all_elements = {} for name, element in elements: all_elements[name] = {} all_elements[name]['optional'] = element.is_optional if hasattr(element.type, 'elements'): all_elements[name]['type'] = parse_elements(element.type.elements) else: all_elements[name]['type'] = str(element.type) return all_elements def build_interface(client: zeep.Client) -> Dict[str, Dict[str, Any]]: """Returns a nested dictionary structure for the methods of client. Typical usage to get a method called "Login" is: ```build_interface(client)[service][port]["operations"]["Login"]``` """ interface: Dict[str, Dict[str, Any]] = {} for service in client.wsdl.services.values(): interface[service.name] = {} for port in service.ports.values(): interface[service.name][port.name] = {} operations: Dict[str, Any] = {} for operation in port.binding._operations.values(): operations[operation.name] = {} operations[operation.name]['input'] = {} elements = operation.input.body.type.elements operations[operation.name]['input'] = parse_elements(elements) interface[service.name][port.name]['operations'] = operations return interface def print_signatures(client: zeep.Client, out): """Print a short summary of each operation signature offered by client.""" # From: https://stackoverflow.com/questions/50089400/introspecting-a-wsdl-with-python-zeep for service in client.wsdl.services.values(): out.write(f"service: {service.name}\n") for port in service.ports.values(): out.write(f" port: {port.name}\n") operations = sorted( port.binding._operations.values(), key=operator.attrgetter('name')) for operation in operations: action = operation.name inputs = operation.input.signature() outputs = operation.output.signature() out.write(f" {action}({inputs}) --> ({outputs})\n") class RandomTester: """Does random testing of a given web service. Give it a URL to a web service (or a list of URLs if there are several web services), and it will read the WSDL specifications from those web services and generate any number of random test sequences to test the methods. For more sophisticated (user-directed) testing you can also: * supply a username and password if login credentials are needed. * supply the subset of method names that you want to focus on testing (default is all). * supply a set of default input values (or generation functions) for each data type. * supply a set of input values (or generation functions) for each named input parameter. """ def __init__(self, urls: Union[str, List[str]], methods_to_test: List[str] = None, input_rules: Dict[str, List] = None, rand: random.Random = None, action_chars: Mapping[str, str] = None, verbose: bool = False): """Creates a random tester for the server url and set of web services on that server. Args: urls (str or List[str]): URLs to the web services, used to find the WSDL files. methods_to_test (List[str]): only these methods will be tested (None means all). input_rules (Dict[str,List]): maps each input parameter name to a list of possible values, one of which will be chosen randomly. rand (random.Random): the random number generator used to generate tests. action_chars (Mapping[str, str]): optional action-to-character map, for visualisation. verbose (bool): True means print progress messages during test generation. """ self.urls = [urls] if isinstance(urls, str) else urls self.username: Optional[str] = None self.password: Optional[str] = None self.random = random.Random() if rand is None else rand self.verbose = verbose self.clients_and_methods: List[Tuple[zeep.Service, Dict[str, Signature]]] = [] self.methods_to_test = methods_to_test self.methods_allowed = [] if methods_to_test is None else methods_to_test # maps each parameter to list of possible 'values' self.named_input_rules = {} if input_rules is None else input_rules meta = TraceSet.get_default_meta_data() meta["source"] = "RandomTester" meta["web_services"] = self.urls meta["methods_to_test"] = methods_to_test meta["input_rules"] = input_rules meta["method_signatures"] = {} # see add_web_service meta["action_chars"] = action_chars new_trace = Trace([], random_state=self.random.getstate()) self.curr_events = new_trace.events # mutable list to append to. self.trace_set = TraceSet([], meta) self.trace_set.append(new_trace) for w in self.urls: self.add_web_service(w) def set_username(self, username: str, password: str = None): """Set the username and (optional) password to be used for the subsequent operations. If password is not supplied, this method will immediately interactively prompt for it. """ self.username = username self.trace_set.meta_data["username"] = username self.password = password or getpass.getpass(f"Please enter password for user {username}:") def add_web_service(self, url: str): """Add another web service using the given url.""" wsdl = url + ("" if url.upper().endswith("WSDL") else ".asmx?WSDL") name = url.split("/")[-1] print(" loading WSDL: ", wsdl) if DUMP_WSDL: # save the WSDL for reference r = requests.get(wsdl, allow_redirects=True) open(f"{name}.wsdl", 'wb').write(r.content) # now create the client interface for this web service client = zeep.Client(wsdl=wsdl) interface = build_interface(client) pprint([(k, len(v["operations"])) for k, v in uniq(interface).items()]) if DUMP_SIGNATURES: # save summary of this web service into a signatures file with open(f"{name}_signatures.txt", "w") as sig: print_signatures(client, sig) if not uniq(interface): print(f"WARNING: web service {name} has empty interface?") pprint(interface) else: ops = uniq(uniq(interface))["operations"] self.clients_and_methods.append((client, ops)) self.trace_set.meta_data["method_signatures"].update(ops) if self.methods_to_test is None: self.methods_allowed += list(ops.keys()) def _find_method(self, name: str) -> Tuple[zeep.Client, Signature]: """Find the given method in one of the web services and returns its signature.""" for (client, interface) in self.clients_and_methods: if name in interface: return client, interface[name] raise Exception(f"could not find {name} in any WSDL specifications.") def choose_input_value(self, arg_name: str) -> str: """Choose an appropriate value for the input argument called 'arg_name'. If no set of input rules is defined for 'arg_name', then 'generate_input_value' is called to generate a suitable input value. Subclasses can override this. Args: arg_name (str): the name of the input parameter. Returns: a string if successful, or None if no suitable value was found. """ values = self.named_input_rules.get(arg_name, None) if values is None: return self.generate_input_value(arg_name) val = self.random.choice(values) return val def generate_input_value(self, arg_name: str) -> Any: """Can be overridden in subclasses to generate smart values for an input argument.""" print(f"ERROR: please define possible parameter values for input {arg_name}") return None def _insert_password(self, arg_value: str) -> str: if arg_value == GOOD_PASSWORD: if self.password is None: raise Exception("Please call set_username before using " + GOOD_PASSWORD) return self.password else: return arg_value def get_methods(self) -> Mapping[str, Signature]: """Return the set of all method names in all the web services.""" methods = {} for (client, interface) in self.clients_and_methods: methods.update(interface) return methods def call_method(self, name: str, args: Dict[str, Any] = None): """Call the web service name(args) and add the result to trace. Args: name (str): the name of the method to call. args (dict): the input values for the method. If args=None, then this method uses 'choose_input_value' to choose appropriate values for each argument value of the method. Returns: Before the call, this method replaces some symbolic inputs by actual concrete values. For example the correct password token is replaced by the real password -- this avoids recording the real password in the inputs of the trace. Returns: all the data returned by the method. """ (client, signature) = self._find_method(name) inputs = signature["input"] if args is None: args = {n: self.choose_input_value(n) for n in inputs.keys()} if None in args.values(): print(f"skipping method {name}. Please define missing input values.") return None if self.verbose: print(f" call {name}{args}") # insert special secret argument values if requested args_list = [self._insert_password(arg) for (n, arg) in args.items()] out = getattr(client.service, name)(*args_list) # we call it 'action' so it gets printed before 'inputs' (alphabetical order). self.curr_events.append(Event(name, args, out)) if self.verbose: print(f" -> {summary(out)}") return out def generate_trace(self, start=True, length=20, methods: List[str] = None) -> Trace: """Generates the requested length of test steps, choosing methods at random. Args: start (bool): True means that a new trace is started (unless current one is empty). length (int): The number of steps to generate (default=20). methods (List[str]): only these methods will be chosen (None means all are allowed) Returns: the whole of the current trace that has been generated so far. """ if start: if len(self.curr_events) > 0: new_trace = Trace([], random_state=self.random.getstate()) self.curr_events = new_trace.events # mutable list to append to. self.trace_set.append(new_trace) if methods is None: methods = self.methods_allowed for i in range(length): # TODO: continue while Status==0? self.call_method(self.random.choice(methods)) return self.trace_set.traces[-1] def setup_feature_data(self): """Must be called before the first call to get_trace_features.""" actions = self.methods_allowed nums = len(actions) self.action2number = dict(zip(actions, range(nums))) if self.verbose: print("Action 2 num:", self.action2number) def get_action_counts(self, events: List[Event]) -> List[int]: """Returns an array of counts - how many times each event occurs in trace.""" result = [0 for k in self.action2number.keys()] for ev in events: action_num = self.action2number[ev.action] result[action_num] += 1 return result def get_trace_features(self) -> List[int]: """Returns a vector of numeric features suitable for input to an ML model. The results returned by this function must match the training set of the ML model. Currently this returns an array of counts - how many times each event occurs in the whole current trace, and how many times in the most recent 8 events. """ prefix = self.get_action_counts(self.curr_events) suffix = self.get_action_counts(self.curr_events[-8:]) return prefix+suffix def generate_trace_ml(self, model, start=True, length=20): """Generates the requested length of test steps, choosing methods using the given model. Args: model (Any): the ML model to use to generate the next event. This model must support the 'predict_proba' method. start (bool): True means that a new trace is started, beginning with a "Login" call. length (int): The number of steps to generate (default=20). Returns: the whole of the current trace that has been generated so far. """ self.setup_feature_data() # start a new (empty) trace if requested. self.generate_trace(start=start, length=0) for i in range(length): features = self.get_trace_features() [proba] = model.predict_proba([features]) [action_num] = numpy.random.choice(len(proba), p=proba, size=1) action = self.methods_allowed[action_num] if self.verbose: print(i, features, action, ",".join([f"{int(p*100)}" for p in proba])) self.call_method(action) return self.curr_events if __name__ == "__main__": unittest.main() PK@[POӓ#agilkia-0.3.1.dist-info/LICENSE.txtCopyright 2019 Mark Utting Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.PK!HPOagilkia-0.3.1.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,szd&Y)r$[)T&UrPK!HƱ agilkia-0.3.1.dist-info/METADATAVR#7-`nHĉLxI1[5[Im35Uy}}=WSG~ 2Av~Sk[ 2W}Sm^rs}V2<.T+Oh+7שS)g!NuUIjn{_N&F+9Fu0ƒmS:9/6*o^`v<9@(~'i0^Vv\k/V/9t\g=cA?vhXO4#ENiq![=]2tŘjiJ5X }McGp}AIF?+n9B^+0;j*r0>Q4n#`g M<+srM*`J!a Xt3VE_xIi^{nMVz  lN N}B,l;.і+ ܣ^y^Cf]J̥s@6".XaܫCx}%1ݠUac 5so+)⭈R(9>]$Sgeގg5Z zUTVp1L  L2zL#NvڱvLX["e!bPHgI5⠹0.n3 (#VH>pL<`t6]!!ѷ9Ʀ t"T9Fiڴip OqEIۧN\q;ǓvV =WTV4Wdå\nCe9*@?-zkYL5h4/>ױNN!l꒣+'c^R͡O&*C2q2ZvU!`Wv&~s͑1ht~rv1>Kg3w1kU#s]>&GpUgM`oG+z 'tSg}Gk,"zoEKS5oONuqn5}k)-nq]KT99o//OﮆBr/+Ò-0G"jjE8,5M+&F}G^զ# p6Fh1Rc KSvYf<h1gR8U՞7iHR|"Sݽ~僧*quxz1:y`={k :nݧ<,zkHGp&0mv]