PKQ3LSVVfloweaver/__init__.py"""View flow data as Sankey diagrams.""" __version__ = '2.0.0a' from .dataset import Dataset from .partition import Partition, Group from .sankey_definition import SankeyDefinition, ProcessGroup, Waypoint, Bundle, Elsewhere from .view_graph import view_graph from .results_graph import results_graph from .augment_view_graph import elsewhere_bundles, augment from .hierarchy import Hierarchy from .sankey_data import SankeyData, SankeyLink, SankeyNode from .color_scales import CategoricalScale, QuantitativeScale from .weave import weave __all__ = ['Dataset', 'Partition', 'Group', 'SankeyDefinition', 'ProcessGroup', 'Waypoint', 'Bundle', 'Elsewhere', 'view_graph', 'results_graph', 'elsewhere_bundles', 'augment', 'Hierarchy', 'weave', 'save_sankey_data', 'serialise_data', 'CategoricalScale', 'QuantitativeScale'] PKKQfloweaver/augment_view_graph.pyimport networkx as nx from .sankey_definition import ProcessGroup, Waypoint, Bundle, Elsewhere from .ordering import new_node_indices, Ordering def elsewhere_bundles(sankey_definition): """Find new bundles and waypoints needed, so that every process group has a bundle to Elsewhere and a bundle from Elsewhere. """ # Build set of existing bundles to/from elsewhere. has_to_elsewhere = set() has_from_elsewhere = set() for bundle in sankey_definition.bundles.values(): assert not (bundle.source is Elsewhere and bundle.target is Elsewhere) if bundle.target is Elsewhere: # XXX they might have different flow_selections? # if bundle.source in has_to_elsewhere: # raise ValueError('duplicate bundles to elsewhere from {}'.format(bundle.source)) has_to_elsewhere.add(bundle.source) if bundle.source is Elsewhere: # XXX they might have different flow_selections? # if bundle.target in has_from_elsewhere: # raise ValueError('duplicate bundles from elsewhere to {}'.format(bundle.target)) has_from_elsewhere.add(bundle.target) # For each process group, add new bundles to/from elsewhere if not already # existing. Each one should have a waypoint of rank +/- 1. R = len(sankey_definition.ordering.layers) new_waypoints = {} new_bundles = {} # Add elsewhere bundles to all process groups if there are no bundles to start with no_bundles = (len(sankey_definition.bundles) == 0) for u, process_group in sankey_definition.nodes.items(): # Skip waypoints if not isinstance(process_group, ProcessGroup): continue waypoint_title = '→' if process_group.direction == 'R' else '←' d_rank = +1 if process_group.direction == 'R' else -1 r, _, _ = sankey_definition.ordering.indices(u) if no_bundles or (0 <= r + d_rank < R and u not in has_to_elsewhere): dummy_id = '__{}>'.format(u) assert dummy_id not in sankey_definition.nodes new_waypoints[dummy_id] = Waypoint( direction=process_group.direction, title=waypoint_title) new_bundles[dummy_id] = Bundle(u, Elsewhere, waypoints=[dummy_id]) if no_bundles or (0 <= r - d_rank < R and u not in has_from_elsewhere): dummy_id = '__>{}'.format(u) assert dummy_id not in sankey_definition.nodes new_waypoints[dummy_id] = Waypoint( direction=process_group.direction, title=waypoint_title) new_bundles[dummy_id] = Bundle(Elsewhere, u, waypoints=[dummy_id]) return new_waypoints, new_bundles def augment(G, new_waypoints, new_bundles): """Add waypoints for new_bundles to layered graph G. """ for v in new_waypoints.values(): assert isinstance(v, Waypoint) # copy G and order G = G.copy() R = len(G.ordering.layers) # XXX sorting makes order deterministic, which can affect final placement # of waypoints for k, bundle in sorted(new_bundles.items(), reverse=True): assert len(bundle.waypoints) == 1 w = bundle.waypoints[0] if bundle.to_elsewhere: u = G.node[bundle.source]['node'] r, _, _ = G.ordering.indices(bundle.source) d_rank = +1 if u.direction == 'R' else -1 G.add_node(w, node=new_waypoints[w]) r, G.ordering = check_order_edges(G.ordering, r, d_rank) this_rank = G.ordering.layers[r + d_rank] prev_rank = G.ordering.layers[r] G.add_edge(bundle.source, w, bundles=[k]) i, j = new_node_indices(G, this_rank, prev_rank, w, side='below') G.ordering = G.ordering.insert(r + d_rank, i, j, w) elif bundle.from_elsewhere: u = G.node[bundle.target]['node'] r, _, _ = G.ordering.indices(bundle.target) d_rank = +1 if u.direction == 'R' else -1 G.add_node(w, node=new_waypoints[w]) r, G.ordering = check_order_edges(G.ordering, r, -d_rank) this_rank = G.ordering.layers[r - d_rank] prev_rank = G.ordering.layers[r] G.add_edge(w, bundle.target, bundles=[k]) i, j = new_node_indices(G, this_rank, prev_rank, w, side='below') G.ordering = G.ordering.insert(r - d_rank, i, j, w) else: assert False, "Should not call augment() with non-elsewhere bundle" return G def check_order_edges(ordering, r, dr): layers = ordering.layers nb = len(layers[0]) if layers else 1 if r + dr >= len(layers): layers = layers + tuple(() for i in range(nb)) elif r + dr < 0: layers = tuple(() for i in range(nb)) + layers r += 1 return r, Ordering(layers) PK3LkU__floweaver/color_scales.py"""Color scales for Sankey diagrams. author: Rick Lupton created: 2018-01-19 """ import numpy as np from palettable.colorbrewer import qualitative, sequential # From matplotlib.colours def rgb2hex(rgb): 'Given an rgb or rgba sequence of 0-1 floats, return the hex string' return '#%02x%02x%02x' % tuple([int(np.round(val * 255)) for val in rgb[:3]]) class CategoricalScale: def __init__(self, attr, palette=None): self.attr = attr self.palette, self.lookup = prep_qualitative_palette(palette) self._next = 0 def set_domain(self, domain): self.lookup = {} self._next = 0 for d in domain: self.lookup[d] = self.palette[self._next] self._next = (self._next + 1) % len(self.palette) def __call__(self, link, measures): palette = self.get_palette() value = self.get_value(link, measures) if value in self.lookup: return self.lookup[value] else: color = self.palette[self._next] self._next = (self._next + 1) % len(self.palette) self.lookup[value] = color return color def get_value(self, link, measures): if self.attr in ('source', 'target', 'type', 'time'): return getattr(link, self.attr) else: return measures[self.attr] def get_palette(self): return self.palette def prep_qualitative_palette(palette): # qualitative colours based on material if palette is None: palette = 'Pastel1_8' if isinstance(palette, str): try: palette = getattr(qualitative, palette).hex_colors except AttributeError: raise ValueError('No qualitative palette called {}'.format(palette)) from None if isinstance(palette, dict): return palette.values(), palette else: return palette, {} # if not isinstance(palette, dict): # materials = sorted(set([m for v, w, (m, t) in G.edges(keys=True)])) # palette = {m: v # for m, v in zip(materials, itertools.cycle(palette))} class QuantitativeScale: def __init__(self, attr, palette=None, intensity=None, domain=None): self.attr = attr self.palette = self._prep_palette(palette) self.domain = domain self.intensity = intensity def set_domain_from(self, data): values = np.array([ # XXX need link here self.get_value(None, measures) for measures in data ]) self.set_domain((values.min(), values.max())) def set_domain(self, domain): assert len(domain) == 2 self.domain = domain def get_domain(self): return self.domain def _prep_palette(self, palette): if palette is None: palette = 'Reds_9' if isinstance(palette, str): try: palette = getattr(sequential, palette).mpl_colormap except AttributeError: raise ValueError('No sequential palette called {}'.format(palette)) from None return palette def get_palette(self): return self.palette def get_value(self, link, measures): return measures[self.attr] def __call__(self, link, measures): value = self.get_value(link, measures) if self.intensity is not None: value /= measures[self.intensity] if self.domain is not None: vmin, vmax = self.domain normed = (value - vmin) / (vmax - vmin) else: normed = value palette = self.get_palette() color = rgb2hex(palette(normed)) return color PKK~j!!floweaver/dataset.pyimport pandas as pd import networkx as nx from .partition import Partition def leaves_below(tree, node): return set(sum(([vv for vv in v if tree.out_degree(vv) == 0] for k, v in nx.dfs_successors(tree, node).items()), [])) class Resolver: def __init__(self, df, column): self.df = df self.column = column def __iter__(self): # XXX hack to avoid __getitem__ being called with integer indices, but # to have non-zero len. return iter(['keys']) def __getitem__(self, k): if not self.column: col = k elif k == 'id': col = self.column else: col = '{}.{}'.format(self.column, k) return self.df[col] def eval_selection(df, column, sel): if isinstance(sel, (list, tuple)): return df[column].isin(sel) elif isinstance(sel, str): resolver = Resolver(df, column) return df.eval(sel, local_dict={}, global_dict={}, resolvers=(resolver, )) else: raise TypeError('Unknown selection type: %s' % type(sel)) class Dataset: def __init__(self, flows, dim_process=None, dim_material=None, dim_time=None): if dim_process is not None and not dim_process.index.is_unique: raise ValueError('dim_process index not unique') if dim_material is not None and not dim_material.index.is_unique: raise ValueError('dim_material index not unique') if dim_time is not None and not dim_time.index.is_unique: raise ValueError('dim_time index not unique') # Fixed bug: make sure flows index is unique flows = flows.reset_index(drop=True) self._flows = flows self._dim_process = dim_process self._dim_material = dim_material self._dim_time = dim_time self._table = flows if dim_process is not None: self._table = self._table \ .join(dim_process.add_prefix('source.'), on='source') \ .join(dim_process.add_prefix('target.'), on='target') if dim_material is not None: self._table = self._table \ .join(dim_material.add_prefix('material.'), on='material') if dim_time is not None: self._table = self._table \ .join(dim_time.add_prefix('time.'), on='time') def partition(self, dimension, processes=None): """Partition of all values of `dimension` within `processes`""" if processes: q = (self._table.source.isin(processes) | self._table.target.isin(processes)) values = self._table.loc[q, dimension].unique() else: values = self._table[dimension].unique() return Partition.Simple(dimension, values) def apply_view(self, process_groups, bundles, flow_selection=None): return _apply_view(self, process_groups, bundles, flow_selection) def save(self, filename): with pd.HDFStore(filename) as store: store['flows'] = self._flows if self._dim_process is not None: store['dim_process'] = self._dim_process if self._dim_material is not None: store['dim_material'] = self._dim_material if self._dim_time is not None: store['dim_time'] = self._dim_time @classmethod def from_hdf(cls, filename): with pd.HDFStore(filename) as store: return cls(store['flows'], store['dim_process'] if 'dim_process' in store else None, store['dim_material'] if 'dim_material' in store else None, store['dim_time'] if 'dim_time' in store else None) @classmethod def from_csv(cls, flows_filename, dim_process_filename=None, dim_material_filename=None, dim_time_filename=None): def read(filename): if filename is not None: return pd.read_csv(filename).set_index('id') else: return None flows = pd.read_csv(flows_filename) dim_process = read(dim_process_filename) dim_material = read(dim_material_filename) dim_time = read(dim_time_filename) return cls(flows, dim_process, dim_material, dim_time) def find_flows(flows, source_query, target_query, flow_query=None, ignore_edges=None): """Filter flows according to source_query, target_query, and flow_query. """ if flow_query is not None: flows = flows[eval_selection(flows, '', flow_query)] if source_query is None and target_query is None: raise ValueError('source_query and target_query cannot both be None') elif source_query is None and target_query is not None: qt = eval_selection(flows, 'target', target_query) qs = (~eval_selection(flows, 'source', target_query) & ~flows.index.isin(ignore_edges or [])) elif source_query is not None and target_query is None: qs = eval_selection(flows, 'source', source_query) qt = (~eval_selection(flows, 'target', source_query) & ~flows.index.isin(ignore_edges or [])) else: qs = eval_selection(flows, 'source', source_query) qt = eval_selection(flows, 'target', target_query) f = flows[qs & qt] if source_query is None: internal_source = None else: internal_source = flows[qs & eval_selection(flows, 'target', source_query)] if target_query is None: internal_target = None else: internal_target = flows[qt & eval_selection(flows, 'source', target_query)] return f, internal_source, internal_target def _apply_view(dataset, process_groups, bundles, flow_selection): # What we want to warn about is flows between process_groups in the view_graph; they # are "used", since they appear in Elsewhere bundles, but the connection # isn't visible. used_edges = set() used_internal = set() used_process_groups = set() bundle_flows = {} table = dataset._table if flow_selection: table = table[eval_selection(table, '', flow_selection)] for k, bundle in bundles.items(): if bundle.from_elsewhere or bundle.to_elsewhere: continue # do these afterwards source = process_groups[bundle.source] target = process_groups[bundle.target] flows, internal_source, internal_target = \ find_flows(table, source.selection, target.selection, bundle.flow_selection) assert len(used_edges.intersection( flows.index.values)) == 0, 'duplicate bundle' bundle_flows[k] = flows used_edges.update(flows.index.values) used_process_groups.update(flows.source) used_process_groups.update(flows.target) # Also marked internal edges as "used" used_internal.update(internal_source.index.values) used_internal.update(internal_target.index.values) for k, bundle in bundles.items(): if bundle.from_elsewhere and bundle.to_elsewhere: raise ValueError('Cannot have flow from Elsewhere to Elsewhere') elif bundle.from_elsewhere: target = process_groups[bundle.target] flows, _, _ = find_flows(table, None, target.selection, bundle.flow_selection, used_edges) used_process_groups.add(bundle.target) elif bundle.to_elsewhere: source = process_groups[bundle.source] flows, _, _ = find_flows(table, source.selection, None, bundle.flow_selection, used_edges) used_process_groups.add(bundle.source) else: continue bundle_flows[k] = flows # XXX shouldn't this check processes in selections, not process groups? # Check set of process_groups relevant_flows = dataset._flows[dataset._flows.source.isin( used_process_groups) & dataset._flows.target.isin(used_process_groups)] unused_flows = relevant_flows[~relevant_flows.index.isin(used_edges) & ~relevant_flows.index.isin(used_internal)] return bundle_flows, unused_flows PKK floweaver/dummy_nodes.pyfrom .ordering import new_node_indices from .sankey_definition import Waypoint def add_dummy_nodes(G, v, w, bundle_key, bundle_index=0, node_kwargs=None): if node_kwargs is None: node_kwargs = {} V = G.get_node(v) W = G.get_node(w) H = G.copy() rv, iv, jv = H.ordering.indices(v) rw, iw, jw = H.ordering.indices(w) if rw > rv: p = rv if V.direction == 'L' else rv + 1 q = rw if W.direction == 'L' else rw - 1 new_ranks = list(range(p, q + 1)) d = 'R' elif rv > rw: p = rv if V.direction == 'R' else rv - 1 q = rw if W.direction == 'R' else rw + 1 new_ranks = list(range(p, q - 1, -1)) d = 'L' else: new_ranks = [] if not new_ranks: _add_edge(H, v, w, bundle_key) return H u = v for r in new_ranks: idr = '__{}_{}_{}'.format(v, w, r) # Only add and position dummy nodes the first time -- bundles can share # a dummy node leading to this happening more than once if idr not in H.node: _add_edge(H, u, idr, bundle_key) if r == rv: i, j = iv, jv + (+1 if V.direction == 'R' else -1) else: prev_rank = H.ordering.layers[r + 1 if d == 'L' else r - 1] i, j = new_node_indices(H, H.ordering.layers[r], prev_rank, idr, side='below' if d == 'L' else 'above') H.ordering = H.ordering.insert(r, i, j, idr) H.add_node(idr, node=Waypoint(direction=d, **node_kwargs)) else: _add_edge(H, u, idr, bundle_key) u = idr _add_edge(H, u, w, bundle_key) return H def _add_edge(G, v, w, bundle_key): if G.has_edge(v, w): G[v][w]['bundles'].append(bundle_key) else: G.add_edge(v, w, bundles=[bundle_key]) PKK'aaafloweaver/graph_to_sankey.pyimport itertools from palettable.colorbrewer import qualitative, sequential import numpy as np # From matplotlib.colours def rgb2hex(rgb): 'Given an rgb or rgba sequence of 0-1 floats, return the hex string' return '#%02x%02x%02x' % tuple([int(np.round(val * 255)) for val in rgb[:3]]) def graph_to_sankey(G, groups=None, palette=None, sample=None, hue=None, hue_range=None, hue_norm=False, flow_color=None): """Convert to display format, set colours, titles etc.""" if groups is None: groups = [] def get_data(data, key): if key == 'value': return data[key] else: return data['measures'][key] if sample is None: get_value = lambda data, key: float(get_data(data, key)) elif sample == 'mean': get_value = lambda data, key: get_data(data, key).mean() elif callable(sample): get_value = lambda data, key: sample(get_data(data, key)) else: get_value = lambda data, key: get_data(data, key)[sample] if flow_color is None and hue is None: # qualitative colours based on material if palette is None: palette = 'Pastel1_8' if isinstance(palette, str): try: palette = getattr(qualitative, palette).hex_colors except AttributeError: raise ValueError('No qualitative palette called {}'.format(palette)) from None if not isinstance(palette, dict): materials = sorted(set([m for v, w, (m, t) in G.edges(keys=True)])) palette = {m: v for m, v in zip(materials, itertools.cycle(palette))} get_color = lambda m, data: palette[m] elif flow_color is None and hue is not None: if palette is None: palette = 'Reds_9' if isinstance(palette, str): try: palette = getattr(sequential, palette).mpl_colormap except AttributeError: raise ValueError('No sequential palette called {}'.format(palette)) from None if hue_norm: get_hue = lambda data: get_value(data, hue) / get_value(data, 'value') elif callable(hue): get_hue = hue else: get_hue = lambda data: get_value(data, hue) values = np.array([get_hue(data) for _, _, data in G.edges(data=True)]) if hue_range is None: vmin, vmax = values.min(), values.max() else: vmin, vmax = hue_range get_color = lambda m, data: rgb2hex(palette((get_hue(data) - vmin) / (vmax - vmin))) else: get_color = flow_color links = [] nodes = [] for v, w, (m, t), data in G.edges(keys=True, data=True): links.append({ 'source': v, 'target': w, 'type': m, 'time': t, 'value': get_value(data, 'value'), 'bundles': [str(x) for x in data.get('bundles', [])], 'color': get_color(m, data), 'title': str(m), 'opacity': 1.0, }) for u, data in G.nodes(data=True): nodes.append({ 'id': u, 'title': str(data.get('title', u)), 'style': data.get('type', 'default'), 'direction': 'l' if data.get('direction', 'R') == 'L' else 'r', 'visibility': 'hidden' if data.get('title') == '' else 'visible', }) return { 'nodes': nodes, 'links': links, 'order': G.ordering.layers, 'groups': groups, } PKKwQQfloweaver/hierarchy.pyimport networkx as nx class Hierarchy: def __init__(self, tree, column): self.tree = tree self.column = column def _leaves_below(self, node): leaves = sum(([vv for vv in v if self.tree.out_degree(vv) == 0] for k, v in nx.dfs_successors(self.tree, node).items()), []) return sorted(leaves) or [node] def __call__(self, *nodes): """Return process IDs below the given nodes in the tree""" s = set() for node in nodes: if self.tree.in_degree(node) == 0: return None # all s.update(self._leaves_below(node)) if len(s) == 1: query = '{} == "{}"'.format(self.column, s.pop()) else: query = '{} in {}'.format(self.column, repr(sorted(s))) return query PKKBfloweaver/layered_graph.pyimport networkx as nx from .sankey_definition import Ordering class LayeredMixin(object): def __init__(self): super().__init__() self.ordering = Ordering([]) def copy(self): new = super().copy() new.ordering = self.ordering return new def remove_node(self, u): super().remove_node(u) self.ordering = self.ordering.remove(u) def get_node(self, u): """Get the ProcessGroup or Waypoint associated with `u`""" return self.node[u]['node'] class LayeredGraph(LayeredMixin, nx.DiGraph): pass class MultiLayeredGraph(LayeredMixin, nx.MultiDiGraph): pass PKK;zzfloweaver/ordering.pyimport bisect from .utils import pairwise import attr def _convert_layers(layers): """Wrap nodes in a single band, if none are specified.""" for item in layers: if any(isinstance(x, str) for x in item): return tuple((tuple(layer_nodes), ) for layer_nodes in layers) return tuple(tuple(tuple(band_nodes) for band_nodes in layer_bands) for layer_bands in layers) @attr.s(slots=True, frozen=True, repr=False) class Ordering(object): layers = attr.ib(convert=_convert_layers) def __repr__(self): def format_layer(layer): return '; '.join(', '.join(band) for band in layer) return 'Ordering( {} )'.format(' | '.join(format_layer(layer) for layer in self.layers)) def insert(self, i, j, k, value): def __insert(band): return band[:k] + (value, ) + band[k:] def _insert(layer): return [__insert(band) if j == jj else band for jj, band in enumerate(layer)] layers = [_insert(layer) if i == ii else layer for ii, layer in enumerate(self.layers)] return Ordering(layers) def remove(self, value): def __remove(band): return tuple(x for x in band if x != value) def _remove(layer): return tuple(__remove(band) for band in layer) layers = tuple(_remove(layer) for layer in self.layers) # remove unused ranks from layers layers = tuple(layer for layer in layers if any(rank for rank in layer)) return Ordering(layers) def indices(self, value): for r, bands in enumerate(self.layers): for i, rank in enumerate(bands): if value in rank: return r, i, rank.index(value) raise ValueError('node "{}" not in ordering'.format(value)) def flatten_bands(bands): L = [] idx = [] i = 0 for band in bands: L.extend(band) idx.append(i) i += len(band) return L, idx def unflatten_bands(L, idx): bands = [] for i0, i1 in pairwise(idx + [len(L)]): bands.append(L[i0:i1]) return bands def band_index(idx, i): for iband, i0 in reversed(list(enumerate(idx))): if i >= i0: return iband return len(idx) def new_node_indices(G, this_bands, other_bands, new_process_group, side='below'): assert side in ('above', 'below') this_layer, this_idx = flatten_bands(this_bands) other_layer, other_idx = flatten_bands(other_bands) # Position of new node, and which band in other_bands it would be new_pos = median_value(neighbour_positions(G, other_layer, new_process_group)) if new_pos == -1: # no connection -- default value? return (0, 0) new_band = band_index(other_idx, new_pos) # Position of other nodes in layer existing_pos = [median_value(neighbour_positions(G, other_layer, u)) for u in this_layer] existing_pos = fill_unknown(existing_pos, side) # New node should be in new_band, at a position depending on the existing # nodes in that band. candidates = [pos for pos in existing_pos if band_index(other_idx, pos) == new_band] index = (bisect.bisect_right(candidates, new_pos) if side == 'below' else bisect.bisect_left(candidates, new_pos)) return (new_band, index) def median_value(positions): N = len(positions) m = N // 2 if N == 0: return -1 elif N % 2 == 1: return positions[m] elif N == 2: return (positions[0] + positions[1]) / 2 else: left = positions[m - 1] - positions[0] right = positions[-1] - positions[m] return (positions[m - 1] * right + positions[m] * left) / ( left + right) def neighbour_positions(G, rank, u): # neighbouring positions on other rank positions = [] for i, n in enumerate(rank): if G.has_edge(n, u) or G.has_edge(u, n): positions.append(i) return sorted(positions) def fill_unknown(values, side): assert side in ('above', 'below') if not values: return [] if side == 'above': y = values[::-1] a = y[0] if y[0] >= 0 else len(y) else: y = values a = y[0] if y[0] >= 0 else 0 z = [] for x in y: if x >= 0: a = x z.append(a) if side == 'above': return z[::-1] else: return z PKK.Cfloweaver/partition.pyimport attr def _validate_query(instance, attribute, value): if value: if not all(isinstance(x, tuple) and len(x) == 2 for x in value): raise ValueError('All elements of query should be 2-tuples') @attr.s(slots=True, frozen=True) class Group(object): label = attr.ib(convert=str) query = attr.ib(convert=tuple, validator=_validate_query) @attr.s(slots=True, frozen=True) class Partition(object): groups = attr.ib(default=attr.Factory(tuple), convert=tuple) @property def labels(self): return [g.label for g in self.groups] @classmethod def Simple(cls, dimension, values): def make_group(v): if isinstance(v, tuple): label, items = v else: label, items = v, (v, ) return Group(label, [(dimension, tuple(items))]) groups = [make_group(v) for v in values] # Check for duplicates seen_values = set() for i, group in enumerate(groups): for j, value in enumerate(group.query[0][1]): if value in seen_values: raise ValueError('Duplicate value "{}" in partition (value {} of group {})' .format(value, j, i)) seen_values.add(value) return cls(groups) def __add__(self, other): return Partition(self.groups + other.groups) def __mul__(self, other): """Cartesian product""" groups = [ Group('{}/{}'.format(g1.label, g2.label), g1.query + g2.query) for g1 in self.groups for g2 in other.groups ] return Partition(groups) PK J3LjXFFfloweaver/results_graph.pyimport pandas as pd from .layered_graph import MultiLayeredGraph, Ordering from .partition import Partition, Group from .sankey_definition import ProcessGroup def results_graph(view_graph, bundle_flows, flow_partition=None, time_partition=None, measures='value'): G = MultiLayeredGraph() groups = [] # Add nodes to graph and to order layers = [] for r, bands in enumerate(view_graph.ordering.layers): o = [[] for band in bands] for i, rank in enumerate(bands): for u in rank: node = view_graph.get_node(u) group_nodes = [] for x, xtitle in nodes_from_partition(u, node.partition): o[i].append(x) group_nodes.append(x) if node.partition == None: title = u if node.title is None else node.title else: title = xtitle G.add_node(x, { 'type': ('process' if isinstance(node, ProcessGroup) else 'group'), 'direction': node.direction, 'title': title, }) groups.append({ 'id': u, 'type': ('process' if isinstance(node, ProcessGroup) else 'group'), 'title': node.title or '', 'nodes': group_nodes }) layers.append(o) G.ordering = Ordering(layers) # Add edges to graph for v, w, data in view_graph.edges(data=True): flows = pd.concat([bundle_flows[bundle] for bundle in data['bundles']], ignore_index=True) gv = view_graph.get_node(v).partition gw = view_graph.get_node(w).partition gf = data.get('flow_partition') or flow_partition or None gt = time_partition or None edges = group_flows(flows, v, gv, w, gw, gf, gt, measures) for _, _, _, d in edges: d['bundles'] = data['bundles'] G.add_edges_from(edges) # remove unused nodes unused = [u for u, deg in G.degree_iter() if deg == 0] for u in unused: G.remove_node(u) # remove unused nodes from groups def filter_groups(g): if len(g['nodes']) == 0: return False if len(g['nodes']) == 1: return G.node[g['nodes'][0]]['title'] != (g['title'] or g['id']) return True groups = [ dict(g, nodes=[x for x in g['nodes'] if x not in unused]) for g in groups ] groups = [g for g in groups if filter_groups(g)] return G, groups def nodes_from_partition(u, partition): if partition is None: return [('{}^*'.format(u), '*')] else: # _ -> other return [('{}^{}'.format(u, value), value) for value in partition.labels + ['_']] def agg_one_group(agg): def data(group): result = group.groupby(lambda x: '').agg(agg) return {k: result[k].iloc[0] for k in result} return data def group_flows(flows, v, partition1, w, partition2, flow_partition, time_partition, measures): if callable(measures): data = measures elif isinstance(measures, str): data = agg_one_group({measures: 'sum'}) elif isinstance(measures, list): data = agg_one_group({k: 'sum' for k in measures}) elif isinstance(measures, dict): data = agg_one_group(measures) else: raise ValueError('measure must be str, list, dict or callable') e = flows.copy() set_partition_keys(e, partition1, 'k1', v + '^', process_side='source') set_partition_keys(e, partition2, 'k2', w + '^', process_side='target') set_partition_keys(e, flow_partition, 'k3', '') set_partition_keys(e, time_partition, 'k4', '') grouped = e.groupby(['k1', 'k2', 'k3', 'k4']) return [ (source, target, (material, time), {'measures': data(group)}) for (source, target, material, time), group in grouped ] def set_partition_keys(df, partition, key_column, prefix, process_side=None): if partition is None: partition = Partition([Group('*', [])]) df[key_column] = prefix + '_' # other seen = (df.index != df.index) # False for group in partition.groups: q = (df.index == df.index) # True for dim, values in group.query: if dim.startswith('process') and process_side: dim = process_side + dim[7:] q = q & df[dim].isin(values) if any(q & seen): dup = df[q & seen] raise ValueError('Duplicate values in group {} ({}): {}' .format(group, process_side, ', '.join( ['{}-{}'.format(e.source, e.target) for _, e in dup.iterrows()]))) df.loc[q, key_column] = prefix + str(group.label) seen = seen | q PK-3LDRےfloweaver/sankey_data.py"""Data structure to represent Sankey diagram data. Author: Rick Lupton Created: 2018-01-15 """ import json import attr from collections import defaultdict from .sankey_definition import _validate_direction, _convert_ordering from .ordering import Ordering try: from ipysankeywidget import SankeyWidget from ipywidgets import Layout except ImportError: SankeyWidget = None _validate_opt_str = attr.validators.optional(attr.validators.instance_of(str)) @attr.s(slots=True, frozen=True) class SankeyData(object): nodes = attr.ib() links = attr.ib() groups = attr.ib(default=attr.Factory(list)) ordering = attr.ib(convert=_convert_ordering, default=Ordering([[]])) def to_json(self, filename=None): """Convert data to JSON-ready dictionary.""" data = { 'nodes': [n.to_json() for n in self.nodes], 'links': [l.to_json() for l in self.links], 'order': self.ordering.layers, 'groups': self.groups, } if filename is None: return data else: with open(filename, 'wt') as f: json.dump(data, f) def to_widget(self, width=700, height=500, margins=None, align_link_types=False): if SankeyWidget is None: raise RuntimeError('ipysankeywidget is required') if margins is None: margins = { 'top': 25, 'bottom': 10, 'left': 130, 'right': 130, } value = self.to_json() return SankeyWidget(nodes=value['nodes'], links=value['links'], order=value['order'], groups=value['groups'], align_link_types=align_link_types, layout=Layout(width=str(width), height=str(height)), margins=margins) @attr.s(slots=True, frozen=True) class SankeyNode(object): id = attr.ib(validator=attr.validators.instance_of(str)) title = attr.ib(default=None, validator=_validate_opt_str) direction = attr.ib(validator=_validate_direction, default='R') hidden = attr.ib(default=False) style = attr.ib(default=None, validator=_validate_opt_str) def to_json(self): """Convert node to JSON-ready dictionary.""" return { 'id': self.id, 'title': self.title if self.title is not None else self.id, 'style': { 'direction': self.direction.lower(), 'hidden': self.hidden is True or self.title == '', 'type': self.style if self.style is not None else 'default', }, } def _validate_opacity(instance, attr, value): if not isinstance(value, float): raise ValueError('opacity must be a number') if value < 0 or value > 1: raise ValueError('opacity must be between 0 and 1') @attr.s(slots=True, frozen=True) class SankeyLink(object): source = attr.ib(validator=attr.validators.instance_of(str)) target = attr.ib(validator=attr.validators.instance_of(str)) type = attr.ib(default=None, validator=_validate_opt_str) time = attr.ib(default=None, validator=_validate_opt_str) value = attr.ib(default=0.0, convert=float) title = attr.ib(default=None, validator=_validate_opt_str) color = attr.ib(default=None, validator=_validate_opt_str) opacity = attr.ib(default=1.0, convert=float, validator=_validate_opacity) def to_json(self): """Convert link to JSON-ready dictionary.""" return { 'source': self.source, 'target': self.target, 'type': self.type, 'time': self.time, 'value': self.value, 'title': self.title, # XXX format # 'style': { 'color': self.color, 'opacity': self.opacity, # } } PKâ3LWCfloweaver/sankey_definition.pyimport attr from . import sentinel from .ordering import Ordering # SankeyDefinition def _convert_bundles_to_dict(bundles): if not isinstance(bundles, dict): bundles = {k: v for k, v in enumerate(bundles)} return bundles def _convert_ordering(ordering): if isinstance(ordering, Ordering): return ordering else: return Ordering(ordering) def _validate_bundles(instance, attribute, bundles): # Check bundles for k, b in bundles.items(): if not b.from_elsewhere: if b.source not in instance.nodes: raise ValueError('Unknown source "{}" in bundle {}'.format( b.source, k)) if not isinstance(instance.nodes[b.source], ProcessGroup): raise ValueError( 'Source of bundle {} is not a process group'.format(k)) if not b.to_elsewhere: if b.target not in instance.nodes: raise ValueError('Unknown target "{}" in bundle {}'.format( b.target, k)) if not isinstance(instance.nodes[b.target], ProcessGroup): raise ValueError( 'Target of bundle {} is not a process group'.format(k)) for u in b.waypoints: if u not in instance.nodes: raise ValueError('Unknown waypoint "{}" in bundle {}'.format( u, k)) if not isinstance(instance.nodes[u], Waypoint): raise ValueError( 'Waypoint "{}" of bundle {} is not a waypoint'.format(u, k)) def _validate_ordering(instance, attribute, ordering): for layer_bands in ordering.layers: for band_nodes in layer_bands: for u in band_nodes: if u not in instance.nodes: raise ValueError('Unknown node "{}" in ordering'.format(u)) @attr.s(slots=True, frozen=True) class SankeyDefinition(object): nodes = attr.ib() bundles = attr.ib(convert=_convert_bundles_to_dict, validator=_validate_bundles) ordering = attr.ib(convert=_convert_ordering, validator=_validate_ordering) flow_selection = attr.ib(default=None) flow_partition = attr.ib(default=None) time_partition = attr.ib(default=None) def copy(self): return self.__class__(self.nodes.copy(), self.bundles.copy(), self.ordering, self.flow_partition, self.flow_selection, self.time_partition) # ProcessGroup def _validate_direction(instance, attribute, value): if value not in 'LR': raise ValueError('direction must be L or R') @attr.s(slots=True) class ProcessGroup(object): """A ProcessGroup represents a group of processes from the underlying dataset. The processes to include are defined by the `selection`. By default they are all lumped into one node in the diagram, but by defining a `partition` this can be controlled. Attributes ---------- selection : list or string If a list of strings, they are taken as process ids. If a single string, it is taken as a Pandas query string run against the process table. partition : Partition, optional Defines how to split the ProcessGroup into subgroups. direction : 'R' or 'L' Direction of flow, default 'R' (left-to-right). title : string, optional Label for the ProcessGroup. If not set, the ProcessGroup id will be used. """ selection = attr.ib(default=None) partition = attr.ib(default=None) direction = attr.ib(validator=_validate_direction, default='R') title = attr.ib( default=None, validator=attr.validators.optional(attr.validators.instance_of(str))) # Waypoint @attr.s(slots=True) class Waypoint(object): """A Waypoint represents a control point along a :class:`Bundle` of flows. There are two reasons to define Waypoints: to control the routing of :class:`Bundle` s of flows through the diagram, and to split flows according to some attributes by setting a `partition`. Attributes ---------- partition : Partition, optional Defines how to split the Waypoint into subgroups. direction : 'R' or 'L' Direction of flow, default 'R' (left-to-right). title : string, optional Label for the Waypoint. If not set, the Waypoint id will be used. """ partition = attr.ib(default=None) direction = attr.ib(validator=_validate_direction, default='R') title = attr.ib( default=None, validator=attr.validators.optional(attr.validators.instance_of(str))) # Bundle Elsewhere = sentinel.create('Elsewhere') def _validate_flow_selection(instance, attribute, value): if instance.source == instance.target and not value: raise ValueError('flow_selection is required for bundle with same ' 'source and target') @attr.s(frozen=True, slots=True) class Bundle(object): """A Bundle represents a set of flows between two :class:`ProcessGroup`s. Attributes ---------- source : string The id of the :class:`ProcessGroup` at the start of the Bundle. target : string The id of the :class:`ProcessGroup` at the end of the Bundle. waypoints : list of strings Optional list of ids of :class:`Waypoint`s the Bundle should pass through. flow_selection : string, optional Query string to filter the flows included in this Bundle. flow_partition : Partition, optional Defines how to split the flows in the Bundle into sub-flows. Often you want the same Partition for all the Bundles in the diagram, see :attr:`SankeyDefinition.flow_partition`. default_partition : Partition, optional Defines the Partition applied to any Waypoints automatically added to route the Bundle across layers of the diagram. """ source = attr.ib() target = attr.ib() waypoints = attr.ib(default=attr.Factory(tuple), convert=tuple) flow_selection = attr.ib(default=None, validator=_validate_flow_selection) flow_partition = attr.ib(default=None) default_partition = attr.ib(default=None) @property def to_elsewhere(self): """True if the target of the Bundle is Elsewhere (outside the system boundary).""" return self.target is Elsewhere @property def from_elsewhere(self): """True if the source of the Bundle is Elsewhere (outside the system boundary).""" return self.source is Elsewhere PKK3LxgRffloweaver/save_sankey.py"""Save Sankey data to JSON format. """ import json def serialise_data(value, version='2'): """Serialise `value` returned by graph_to_sankey.""" if version != '2': raise ValueError('Only version 2 is supported') return _value_version_2(value) def _value_version_2(value): nodes = [ dict(id=node['id'], title=node['title'], metadata=dict(direction=node['direction']), style=dict(hidden=(node['visibility'] == 'hidden'))) for node in value['nodes'] ] links = [ { 'source': link['source'], 'target': link['target'], 'type': link['type'], 'data': dict(value=link['value']), 'style': {'color': link['color']} } for link in value['links'] ] return { 'format': 'sankey-v2', 'metadata': { 'layers': value['order'], }, 'nodes': nodes, 'links': links, } PKKҔk floweaver/sentinel.py#!/usr/bin/env python # coding: utf-8 """ Create sentinel and singleton objects. Copyright 2014 © Eddie Antonio Santos. MIT licensed. """ import inspect __all__ = ['create'] __version__ = '0.1.1' def get_caller_module(): """ Returns the name of the caller's module as a string. >>> get_caller_module() '__main__' """ stack = inspect.stack() assert len(stack) > 1 caller = stack[2][0] return caller.f_globals['__name__'] def create(name, mro=(object,), extra_methods={}, *args, **kwargs): """ create(name, mro=(object,), extra_methods={}, ...) -> Sentinel instance Creates a new sentinel instance. This is a singleton instance kind of like the builtin None, and Ellipsis. Method resolution order (MRO) for the anonymous class can be specified (i.e., it can be a subclass). Provide the mro as tuple of all classes that it inherits from. If only one class, provide a 1-tuple: e.g., (Cls,). Additionally extra class attributes, such as methods can be provided in the extra_methods dict. The following methods are provided, but can be overridden: __repr__() Returns the class name, similar to None and Ellipsis. __copy__() __deepcopy__() Always return the same singleton instance such that ``copy(Sentinel) is Sentinel`` is true. __reduce__() Provided for proper pickling prowess. That is, ``pickle.loads(pickle.dumps(Sentinel)) is Sentinel`` is true. Finally, the remain arguments and keyword arguments are passed to the super class's __init__(). This is helpful when for instantiating base classes such as a tuple. """ cls_dict = {} cls_dict.update( # Provide a nice, clean, self-documenting __repr__ __repr__=lambda self: name, # Provide a copy and deepcopy implementation which simply return the # same exact instance. __deepcopy__=lambda self, _memo: self, __copy__=lambda self: self, # Provide a hook for pickling the sentinel. __reduce__=lambda self: name ) # If the default MRO is given, then it's safe to prevent the singleton # instance from having a instance dictionary. if mro == (object,): cls_dict.update(__slots__=()) cls_dict.update(extra_methods) anon_type = type(name, mro, cls_dict) # Stack introspection -- make the singleton always belong to the module of # its caller. If we don't do this, pickling using __reduce__() will fail! anon_type.__module__ = get_caller_module() # Return the singleton instance of this new, "anonymous" type. return anon_type(*args, **kwargs) PKKfloweaver/utils.pyimport itertools def pairwise(iterable): "s -> (s0,s1), (s1,s2), (s2, s3), ..." a, b = itertools.tee(iterable) next(b, None) return zip(a, b) PKKj%66floweaver/view_graph.pyfrom .layered_graph import LayeredGraph from .utils import pairwise from .sankey_definition import Elsewhere from .dummy_nodes import add_dummy_nodes def view_graph(sankey_definition): G = LayeredGraph() for k, node in sankey_definition.nodes.items(): G.add_node(k, node=node) G.ordering = sankey_definition.ordering G = _add_bundles_to_graph(G, sankey_definition.bundles, _bundle_order(sankey_definition)) return G def _add_bundles_to_graph(G, bundles, sort_key): for k, bundle in sorted(bundles.items(), key=sort_key): nodes = (bundle.source, ) + bundle.waypoints + (bundle.target, ) for iw, (a, b) in enumerate(pairwise(nodes)): # No need to add waypoints to get to Elsewhere -- it is # everywhere! if a is not Elsewhere and b is not Elsewhere: G = add_dummy_nodes(G, a, b, k, iw, _dummy_kw(bundle)) # check flow partitions are compatible for v, w, data in G.edges(data=True): flow_partitions = list({bundles[b].flow_partition for b in data['bundles']}) if len(flow_partitions) > 1: raise ValueError('Multiple flow partitions in bundles: {}'.format( ', '.join(str(b) for b in data['bundles']))) if flow_partitions[0]: data['flow_partition'] = flow_partitions[0] return G def _dummy_kw(bundle): return dict(title='', partition=bundle.default_partition or None) def _bundle_order(sankey_definition): def keyfunc(item): k, bundle = item if bundle.to_elsewhere or bundle.from_elsewhere: # bundle to elsewhere: last return (2, 0) r0, _, _ = sankey_definition.ordering.indices(bundle.source) r1, _, _ = sankey_definition.ordering.indices(bundle.target) if r1 > r0: # forwards bundles: shortest first return (0, r1 - r0) else: # backwards bundles: after forwards bundles, longest first return (1, r1 - r0) return keyfunc PK3LE@floweaver/weave.pyimport attr import numpy as np import pandas as pd import itertools from .dataset import Dataset from .sankey_data import SankeyData, SankeyNode, SankeyLink from .augment_view_graph import augment, elsewhere_bundles from .view_graph import view_graph from .results_graph import results_graph from .color_scales import CategoricalScale, QuantitativeScale from palettable.colorbrewer import qualitative, sequential # From matplotlib.colours def rgb2hex(rgb): 'Given an rgb or rgba sequence of 0-1 floats, return the hex string' return '#%02x%02x%02x' % tuple([int(np.round(val * 255)) for val in rgb[:3]]) def weave(sankey_definition, dataset, measures='value', link_width=None, link_color=None, palette=None): # Accept DataFrames as datasets -- assume it's the flow table if isinstance(dataset, pd.DataFrame): dataset = Dataset(dataset) # Calculate the view graph (adding dummy nodes) GV = view_graph(sankey_definition) # Add implicit to/from Elsewhere bundles to the view definition to ensure # consistency. new_waypoints, new_bundles = elsewhere_bundles(sankey_definition) GV2 = augment(GV, new_waypoints, new_bundles) # XXX messy bundles2 = dict(sankey_definition.bundles, **new_bundles) # Get the flows selected by the bundles bundle_flows, unused_flows = dataset.apply_view( sankey_definition.nodes, bundles2, sankey_definition.flow_selection) # Calculate the results graph (actual Sankey data) GR, groups = results_graph(GV2, bundle_flows, flow_partition=sankey_definition.flow_partition, time_partition=sankey_definition.time_partition, measures=measures) # Default link width is same as default measure if link_width is None: if not isinstance(measures, str): raise ValueError(('If you set a complicated measure function, ' 'you need to set link_width too.')) link_width = measures if callable(link_width): get_value = lambda link, measures: link_width(measures) elif isinstance(link_width, str): get_value = lambda link, measures: float(measures[link_width]) else: raise ValueError('link_width must be a str or callable') # Default link color is categorical scale based on link type if link_color is None: link_color = CategoricalScale('type', palette=palette) elif isinstance(link_color, str): link_color = CategoricalScale(link_color, palette=palette) elif not callable(link_color): raise TypeError('link_color must be a str or callable') # Set domain for quantitative colors, if not already set if hasattr(link_color, 'set_domain_from'): link_color.set_domain_from([data['measures'] for _, _, data in GR.edges(data=True)]) # Package result links = [ make_link(get_value, link_color, v, w, m, t, data) for v, w, (m, t), data in GR.edges(keys=True, data=True) ] nodes = [ make_node(u, data) for u, data in GR.nodes(data=True) ] result = SankeyData(nodes, links, groups, GR.ordering.layers) return result # maybe this function should be customisable? def make_link(get_value, get_color, v, w, m, t, data): link = SankeyLink( source=v, target=w, type=m, time=t, title=str(m) ) return attr.evolve( link, value=get_value(link, data['measures']), color=get_color(link, data['measures']), ) def make_node(u, data): return SankeyNode( id=u, title=data.get('title'), style=data.get('type'), direction=data.get('direction', 'R'), # XXX not setting hidden here -- should have logic here or in to_json()? ) def prep_qualitative_palette(G, palette): # qualitative colours based on material if palette is None: palette = 'Pastel1_8' if isinstance(palette, str): try: palette = getattr(qualitative, palette).hex_colors except AttributeError: raise ValueError('No qualitative palette called {}'.format(palette)) from None if not isinstance(palette, dict): materials = sorted(set([m for v, w, (m, t) in G.edges(keys=True)])) palette = {m: v for m, v in zip(materials, itertools.cycle(palette))} PKMId66"floweaver-2.0.0a.dist-info/LICENSEThe MIT License (MIT) Copyright (c) 2016 Rick Lupton Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!H١Wd floweaver-2.0.0a.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,Q0343 /, (-JLR()*M ILR(4KM̫#DPK!H˴݇#floweaver-2.0.0a.dist-info/METADATAeMO@+樉Ǎ$BI@ae֧ tZ"/vܜ*O\_:pt R.1qjaJFdotC~ړ}e+i}ks>h1DCa[(^q-J^Q)w#ey1R_i/'MkǓckL:v-,v.Km 6DQS{eGJ{xoeC@7[H"(Xf[0oiWӛhrX .S^G%l.>\c4GylBK1MԾqi/(nW {jiՆւHۆS4VV HqG4\u-kΉftފ1?I܀X ] 9z Q}UpxLVkR5%x@xf̬wӁY,Ka akyBɒG*P*2YaV~KvU08ޅ[EZpAtgmva7,$'hm(.5j!UFӣ“{֋N۲hPKQ3LSVVfloweaver/__init__.pyPKKQfloweaver/augment_view_graph.pyPK3LkU__floweaver/color_scales.pyPKK~j!!t%floweaver/dataset.pyPKK eGfloweaver/dummy_nodes.pyPKK'aaaeOfloweaver/graph_to_sankey.pyPKKwQQ^floweaver/hierarchy.pyPKKBafloweaver/layered_graph.pyPKK;zzIdfloweaver/ordering.pyPKK.Cvfloweaver/partition.pyPK J3LjXFF}floweaver/results_graph.pyPK-3LDRے/floweaver/sankey_data.pyPKâ3LWCfloweaver/sankey_definition.pyPKK3LxgRf;floweaver/save_sankey.pyPKKҔk Lfloweaver/sentinel.pyPKK,floweaver/utils.pyPKKj%66floweaver/view_graph.pyPK3LE@gfloweaver/weave.pyPKMId66"9floweaver-2.0.0a.dist-info/LICENSEPK!H١Wd floweaver-2.0.0a.dist-info/WHEELPK!H˴݇#Dfloweaver-2.0.0a.dist-info/METADATAPK!HopX!floweaver-2.0.0a.dist-info/RECORDPK"