# === sankeyview/__init__.py ===

"""View flow data as Sankey diagrams."""

__version__ = '1.1.6'

from .dataset import Dataset
from .partition import Partition, Group
from .sankey_definition import SankeyDefinition, ProcessGroup, Waypoint, \
    Bundle, Elsewhere
from .view_graph import view_graph
from .results_graph import results_graph
from .augment_view_graph import elsewhere_bundles, augment
from .sankey_view import sankey_view
from .hierarchy import Hierarchy
from .graph_to_sankey import graph_to_sankey
from .save_sankey import save_sankey_data, serialise_data

__all__ = ['Dataset', 'Partition', 'Group', 'SankeyDefinition',
           'ProcessGroup', 'Waypoint', 'Bundle', 'Elsewhere', 'view_graph',
           'results_graph', 'elsewhere_bundles', 'augment', 'sankey_view',
           'Hierarchy', 'graph_to_sankey', 'save_sankey_data',
           'serialise_data']
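# === added example (not part of the package): end-to-end usage ===
# A minimal sketch of how the public API above fits together, assuming
# sankeyview 1.1.6 with its contemporary dependencies (pandas, networkx 1.x,
# palettable) are installed. The process names and data are illustrative only.
import pandas as pd
from sankeyview import (Dataset, SankeyDefinition, ProcessGroup, Bundle,
                        sankey_view, graph_to_sankey)

flows = pd.DataFrame({'source': ['a'], 'target': ['b'],
                      'material': ['m'], 'time': [1], 'value': [3.0]})
dataset = Dataset(flows)
nodes = {'a': ProcessGroup(selection=['a']),
         'b': ProcessGroup(selection=['b'])}
sdd = SankeyDefinition(nodes, [Bundle('a', 'b')], [['a'], ['b']])
graph, groups = sankey_view(sdd, dataset)
sankey_data = graph_to_sankey(graph, groups)  # dict of nodes/links/order/groups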
""" for v in new_waypoints.values(): assert isinstance(v, Waypoint) # copy G and order G = G.copy() R = len(G.ordering.layers) # XXX sorting makes order deterministic, which can affect final placement # of waypoints for k, bundle in sorted(new_bundles.items(), reverse=True): assert len(bundle.waypoints) == 1 w = bundle.waypoints[0] if bundle.to_elsewhere: u = G.node[bundle.source]['node'] r, _, _ = G.ordering.indices(bundle.source) d_rank = +1 if u.direction == 'R' else -1 G.add_node(w, node=new_waypoints[w]) r, G.ordering = check_order_edges(G.ordering, r, d_rank) this_rank = G.ordering.layers[r + d_rank] prev_rank = G.ordering.layers[r] G.add_edge(bundle.source, w, bundles=[k]) i, j = new_node_indices(G, this_rank, prev_rank, w, side='below') G.ordering = G.ordering.insert(r + d_rank, i, j, w) elif bundle.from_elsewhere: u = G.node[bundle.target]['node'] r, _, _ = G.ordering.indices(bundle.target) d_rank = +1 if u.direction == 'R' else -1 G.add_node(w, node=new_waypoints[w]) r, G.ordering = check_order_edges(G.ordering, r, -d_rank) this_rank = G.ordering.layers[r - d_rank] prev_rank = G.ordering.layers[r] G.add_edge(w, bundle.target, bundles=[k]) i, j = new_node_indices(G, this_rank, prev_rank, w, side='below') G.ordering = G.ordering.insert(r - d_rank, i, j, w) else: assert False, "Should not call augment() with non-elsewhere bundle" return G def check_order_edges(ordering, r, dr): layers = ordering.layers nb = len(layers[0]) if layers else 1 if r + dr >= len(layers): layers = layers + tuple(() for i in range(nb)) elif r + dr < 0: layers = tuple(() for i in range(nb)) + layers r += 1 return r, Ordering(layers) PKMeJ~j!!sankeyview/dataset.pyimport pandas as pd import networkx as nx from .partition import Partition def leaves_below(tree, node): return set(sum(([vv for vv in v if tree.out_degree(vv) == 0] for k, v in nx.dfs_successors(tree, node).items()), [])) class Resolver: def __init__(self, df, column): self.df = df self.column = column def __iter__(self): # XXX hack to avoid __getitem__ being called with integer indices, but # to have non-zero len. 
# === sankeyview/dataset.py ===

import pandas as pd
import networkx as nx

from .partition import Partition


def leaves_below(tree, node):
    return set(sum(([vv for vv in v if tree.out_degree(vv) == 0]
                    for k, v in nx.dfs_successors(tree, node).items()), []))


class Resolver:
    def __init__(self, df, column):
        self.df = df
        self.column = column

    def __iter__(self):
        # XXX hack to avoid __getitem__ being called with integer indices,
        # but to have non-zero len.
        return iter(['keys'])

    def __getitem__(self, k):
        if not self.column:
            col = k
        elif k == 'id':
            col = self.column
        else:
            col = '{}.{}'.format(self.column, k)
        return self.df[col]


def eval_selection(df, column, sel):
    if isinstance(sel, (list, tuple)):
        return df[column].isin(sel)
    elif isinstance(sel, str):
        resolver = Resolver(df, column)
        return df.eval(sel, local_dict={}, global_dict={},
                       resolvers=(resolver, ))
    else:
        raise TypeError('Unknown selection type: %s' % type(sel))


class Dataset:
    def __init__(self, flows, dim_process=None, dim_material=None,
                 dim_time=None):
        if dim_process is not None and not dim_process.index.is_unique:
            raise ValueError('dim_process index not unique')
        if dim_material is not None and not dim_material.index.is_unique:
            raise ValueError('dim_material index not unique')
        if dim_time is not None and not dim_time.index.is_unique:
            raise ValueError('dim_time index not unique')

        # Fixed bug: make sure flows index is unique
        flows = flows.reset_index(drop=True)

        self._flows = flows
        self._dim_process = dim_process
        self._dim_material = dim_material
        self._dim_time = dim_time

        self._table = flows
        if dim_process is not None:
            self._table = self._table \
                .join(dim_process.add_prefix('source.'), on='source') \
                .join(dim_process.add_prefix('target.'), on='target')
        if dim_material is not None:
            self._table = self._table \
                .join(dim_material.add_prefix('material.'), on='material')
        if dim_time is not None:
            self._table = self._table \
                .join(dim_time.add_prefix('time.'), on='time')

    def partition(self, dimension, processes=None):
        """Partition of all values of `dimension` within `processes`."""
        if processes:
            q = (self._table.source.isin(processes) |
                 self._table.target.isin(processes))
            values = self._table.loc[q, dimension].unique()
        else:
            values = self._table[dimension].unique()
        return Partition.Simple(dimension, values)

    def apply_view(self, process_groups, bundles, flow_selection=None):
        return _apply_view(self, process_groups, bundles, flow_selection)

    def save(self, filename):
        with pd.HDFStore(filename) as store:
            store['flows'] = self._flows
            if self._dim_process is not None:
                store['dim_process'] = self._dim_process
            if self._dim_material is not None:
                store['dim_material'] = self._dim_material
            if self._dim_time is not None:
                store['dim_time'] = self._dim_time

    @classmethod
    def from_hdf(cls, filename):
        with pd.HDFStore(filename) as store:
            return cls(
                store['flows'],
                store['dim_process'] if 'dim_process' in store else None,
                store['dim_material'] if 'dim_material' in store else None,
                store['dim_time'] if 'dim_time' in store else None)

    @classmethod
    def from_csv(cls, flows_filename, dim_process_filename=None,
                 dim_material_filename=None, dim_time_filename=None):
        def read(filename):
            if filename is not None:
                return pd.read_csv(filename).set_index('id')
            else:
                return None

        flows = pd.read_csv(flows_filename)
        dim_process = read(dim_process_filename)
        dim_material = read(dim_material_filename)
        dim_time = read(dim_time_filename)
        return cls(flows, dim_process, dim_material, dim_time)


def find_flows(flows, source_query, target_query, flow_query=None,
               ignore_edges=None):
    """Filter flows according to source_query, target_query, and flow_query."""
    if flow_query is not None:
        flows = flows[eval_selection(flows, '', flow_query)]

    if source_query is None and target_query is None:
        raise ValueError('source_query and target_query cannot both be None')
    elif source_query is None and target_query is not None:
        qt = eval_selection(flows, 'target', target_query)
        qs = (~eval_selection(flows, 'source', target_query) &
              ~flows.index.isin(ignore_edges or []))
    elif source_query is not None and target_query is None:
        qs = eval_selection(flows, 'source', source_query)
        qt = (~eval_selection(flows, 'target', source_query) &
              ~flows.index.isin(ignore_edges or []))
    else:
        qs = eval_selection(flows, 'source', source_query)
        qt = eval_selection(flows, 'target', target_query)

    f = flows[qs & qt]

    if source_query is None:
        internal_source = None
    else:
        internal_source = flows[qs & eval_selection(flows, 'target',
                                                    source_query)]
    if target_query is None:
        internal_target = None
    else:
        internal_target = flows[qt & eval_selection(flows, 'source',
                                                    target_query)]

    return f, internal_source, internal_target


def _apply_view(dataset, process_groups, bundles, flow_selection):
    # What we want to warn about is flows between process_groups in the
    # view_graph; they are "used", since they appear in Elsewhere bundles,
    # but the connection isn't visible.
    used_edges = set()
    used_internal = set()
    used_process_groups = set()
    bundle_flows = {}
    table = dataset._table
    if flow_selection:
        table = table[eval_selection(table, '', flow_selection)]

    for k, bundle in bundles.items():
        if bundle.from_elsewhere or bundle.to_elsewhere:
            continue  # do these afterwards
        source = process_groups[bundle.source]
        target = process_groups[bundle.target]
        flows, internal_source, internal_target = \
            find_flows(table, source.selection, target.selection,
                       bundle.flow_selection)
        assert len(used_edges.intersection(
            flows.index.values)) == 0, 'duplicate bundle'
        bundle_flows[k] = flows
        used_edges.update(flows.index.values)
        used_process_groups.update(flows.source)
        used_process_groups.update(flows.target)
        # Also mark internal edges as "used"
        used_internal.update(internal_source.index.values)
        used_internal.update(internal_target.index.values)

    for k, bundle in bundles.items():
        if bundle.from_elsewhere and bundle.to_elsewhere:
            raise ValueError('Cannot have flow from Elsewhere to Elsewhere')
        elif bundle.from_elsewhere:
            target = process_groups[bundle.target]
            flows, _, _ = find_flows(table, None, target.selection,
                                     bundle.flow_selection, used_edges)
            used_process_groups.add(bundle.target)
        elif bundle.to_elsewhere:
            source = process_groups[bundle.source]
            flows, _, _ = find_flows(table, source.selection, None,
                                     bundle.flow_selection, used_edges)
            used_process_groups.add(bundle.source)
        else:
            continue
        bundle_flows[k] = flows

    # XXX shouldn't this check processes in selections, not process groups?

    # Check set of process_groups
    relevant_flows = dataset._flows[
        dataset._flows.source.isin(used_process_groups) &
        dataset._flows.target.isin(used_process_groups)]
    unused_flows = relevant_flows[
        ~relevant_flows.index.isin(used_edges) &
        ~relevant_flows.index.isin(used_internal)]

    return bundle_flows, unused_flows
# === sankeyview/dummy_nodes.py ===

from .ordering import new_node_indices
from .sankey_definition import Waypoint


def add_dummy_nodes(G, v, w, bundle_key, bundle_index=0, node_kwargs=None):
    if node_kwargs is None:
        node_kwargs = {}

    V = G.get_node(v)
    W = G.get_node(w)
    H = G.copy()

    rv, iv, jv = H.ordering.indices(v)
    rw, iw, jw = H.ordering.indices(w)

    if rw > rv:
        p = rv if V.direction == 'L' else rv + 1
        q = rw if W.direction == 'L' else rw - 1
        new_ranks = list(range(p, q + 1))
        d = 'R'
    elif rv > rw:
        p = rv if V.direction == 'R' else rv - 1
        q = rw if W.direction == 'R' else rw + 1
        new_ranks = list(range(p, q - 1, -1))
        d = 'L'
    else:
        new_ranks = []

    if not new_ranks:
        _add_edge(H, v, w, bundle_key)
        return H

    u = v
    for r in new_ranks:
        idr = '__{}_{}_{}'.format(v, w, r)
        # Only add and position dummy nodes the first time -- bundles can
        # share a dummy node, leading to this happening more than once
        if idr not in H.node:
            _add_edge(H, u, idr, bundle_key)
            if r == rv:
                i, j = iv, jv + (+1 if V.direction == 'R' else -1)
            else:
                prev_rank = H.ordering.layers[r + 1 if d == 'L' else r - 1]
                i, j = new_node_indices(H, H.ordering.layers[r], prev_rank,
                                        idr,
                                        side='below' if d == 'L' else 'above')
            H.ordering = H.ordering.insert(r, i, j, idr)
            H.add_node(idr, node=Waypoint(direction=d, **node_kwargs))
        else:
            _add_edge(H, u, idr, bundle_key)
        u = idr
    _add_edge(H, u, w, bundle_key)

    return H


def _add_edge(G, v, w, bundle_key):
    if G.has_edge(v, w):
        G[v][w]['bundles'].append(bundle_key)
    else:
        G.add_edge(v, w, bundles=[bundle_key])


# === sankeyview/graph_to_sankey.py ===

import itertools

from palettable.colorbrewer import qualitative, sequential
import numpy as np


# From matplotlib.colours
def rgb2hex(rgb):
    'Given an rgb or rgba sequence of 0-1 floats, return the hex string'
    return '#%02x%02x%02x' % tuple(
        [int(np.round(val * 255)) for val in rgb[:3]])


def graph_to_sankey(G, groups=None, palette=None, sample=None, hue=None,
                    hue_range=None, hue_norm=False, flow_color=None):
    """Convert to display format, set colours, titles etc."""
    if groups is None:
        groups = []

    def get_data(data, key):
        if key == 'value':
            return data[key]
        else:
            return data['measures'][key]

    if sample is None:
        get_value = lambda data, key: float(get_data(data, key))
    elif sample == 'mean':
        get_value = lambda data, key: get_data(data, key).mean()
    else:
        get_value = lambda data, key: get_data(data, key)[sample]

    if flow_color is None and hue is None:
        # qualitative colours based on material
        if palette is None:
            palette = 'Pastel1_8'
        if isinstance(palette, str):
            try:
                palette = getattr(qualitative, palette).hex_colors
            except AttributeError:
                raise ValueError(
                    'No qualitative palette called {}'.format(palette)) \
                    from None
        if not isinstance(palette, dict):
            materials = sorted(set([m for v, w, (m, t) in G.edges(keys=True)]))
            palette = {m: v
                       for m, v in zip(materials, itertools.cycle(palette))}
        get_color = lambda m, data: palette[m]

    elif flow_color is None and hue is not None:
        if palette is None:
            palette = 'Reds_9'
        if isinstance(palette, str):
            try:
                palette = getattr(sequential, palette).mpl_colormap
            except AttributeError:
                raise ValueError(
                    'No sequential palette called {}'.format(palette)) \
                    from None

        if hue_norm:
            get_hue = lambda data: \
                get_value(data, hue) / get_value(data, 'value')
        elif callable(hue):
            get_hue = hue
        else:
            get_hue = lambda data: get_value(data, hue)

        values = np.array([get_hue(data)
                           for _, _, data in G.edges(data=True)])
        if hue_range is None:
            vmin, vmax = values.min(), values.max()
        else:
            vmin, vmax = hue_range
        get_color = lambda m, data: rgb2hex(
            palette((get_hue(data) - vmin) / (vmax - vmin)))

    else:
        get_color = flow_color

    links = []
    nodes = []

    for v, w, (m, t), data in G.edges(keys=True, data=True):
        links.append({
            'source': v,
            'target': w,
            'type': m,
            'time': t,
            'value': get_value(data, 'value'),
            'bundles': [str(x) for x in data.get('bundles', [])],
            'color': get_color(m, data),
            'title': str(m),
            'opacity': 1.0,
        })

    for u, data in G.nodes(data=True):
        nodes.append({
            'id': u,
            'title': str(data.get('title', u)),
            'style': data.get('type', 'default'),
            'direction': 'l' if data.get('direction', 'R') == 'L' else 'r',
            'visibility': 'hidden' if data.get('title') == '' else 'visible',
        })

    return {
        'nodes': nodes,
        'links': links,
        'order': G.ordering.layers,
        'groups': groups,
    }
# === sankeyview/hierarchy.py ===

import networkx as nx


class Hierarchy:
    def __init__(self, tree, column):
        self.tree = tree
        self.column = column

    def _leaves_below(self, node):
        leaves = sum(([vv for vv in v if self.tree.out_degree(vv) == 0]
                      for k, v in nx.dfs_successors(self.tree, node).items()),
                     [])
        return sorted(leaves) or [node]

    def __call__(self, *nodes):
        """Return process IDs below the given nodes in the tree."""
        s = set()
        for node in nodes:
            if self.tree.in_degree(node) == 0:
                return None  # all
            s.update(self._leaves_below(node))
        if len(s) == 1:
            query = '{} == "{}"'.format(self.column, s.pop())
        else:
            query = '{} in {}'.format(self.column, repr(sorted(s)))
        return query
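# === added example (not part of the package): Hierarchy queries ===
# A minimal sketch (the tree and column name are illustrative): leaves below
# the chosen node become a pandas query string suitable for use as a
# ProcessGroup selection, and a root node stands for "all processes".
import networkx as nx
from sankeyview import Hierarchy

tree = nx.DiGraph([('*', 'farms'), ('farms', 'farm1'), ('farms', 'farm2')])
h = Hierarchy(tree, 'process.id')
assert h('farms') == "process.id in ['farm1', 'farm2']"
assert h('*') is None   # a root node means "all processes"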
# === sankeyview/jupyter.py ===

from collections import defaultdict

try:
    from ipysankeywidget import SankeyWidget
    from ipywidgets import Layout
except ImportError:
    SankeyWidget = None

try:
    import graphviz
except ImportError:
    graphviz = None

from .utils import pairwise
from .sankey_view import sankey_view
from .augment_view_graph import augment, elsewhere_bundles
from .view_graph import view_graph
from .graph_to_sankey import graph_to_sankey
from .sankey_definition import Waypoint


def show_sankey(sankey_definition, dataset, palette=None, width=700,
                height=500, margins=None, align_link_types=False,
                measure='value', agg_measures=None, hue=None,
                override_node_layout=None, override_link_layout=None):

    if SankeyWidget is None:
        raise RuntimeError('ipysankeywidget is required')

    if margins is None:
        margins = {'top': 25, 'bottom': 10, 'left': 130, 'right': 130}

    if isinstance(hue, str):
        if hue == measure:
            hue_func = lambda data: data[hue]
        else:
            hue_func = lambda data: data['measures'][hue]
    elif callable(hue):
        hue_func = hue
    elif hue is None:
        hue_func = None
    else:
        raise ValueError('hue must be attribute name, callable, or None')

    G, groups = sankey_view(sankey_definition, dataset, measure, agg_measures)
    value = graph_to_sankey(G, groups, palette=palette, hue=hue_func)
    if align_link_types:
        value['alignLinkTypes'] = True

    # XXX experimental overrides
    if override_node_layout is not None:
        for node in value['nodes']:
            override = override_node_layout.get(node['id'], {})
            if 'y' in override:
                node['forceY'] = override['y']
    if override_link_layout is not None:
        value['overrideLinks'] = override_link_layout

    return SankeyWidget(nodes=value['nodes'],
                        links=value['links'],
                        order=value['order'],
                        align_link_types=align_link_types,
                        layout=Layout(width=str(width), height=str(height)),
                        margins=margins)


# def show_sankey_definition(sankey_definition, filename=None,
#                            directory=None, xlabels=None, labels=None):
#     if xlabels is None:
#         xlabels = {}
#     if labels is None:
#         labels = {}
#     g = graphviz.Digraph(graph_attr=dict(splines='true', rankdir='LR'),
#                          node_attr=dict(fontsize='12', width='0.5',
#                                         height='0.3'))
#     for r, bands in enumerate(sankey_definition.ordering.layers):
#         subgraph = graphviz.Digraph()
#         for i, rank in enumerate(bands):
#             for j, u in enumerate(rank):
#                 node_style = ('solid' if u in sankey_definition.process_groups
#                               else 'dashed')
#                 attr = dict(label=u, shape='box', style=node_style)
#                 if u in xlabels:
#                     attr['xlabel'] = xlabels[u]
#                 if u in labels:
#                     attr['label'] = labels[u]
#                 subgraph.node(u, **attr)
#         subgraph.body.append('rank=same;')
#         g.subgraph(subgraph)
#     # invisible edges to get order right
#     for r, bands in enumerate(sankey_definition.ordering.layers):
#         for i, rank in enumerate(bands):
#             for a, b in pairwise(rank):
#                 g.edge(a, b, color='white')
#     for bundle in sankey_definition.bundles.values():
#         v, w = bundle.source, bundle.target
#         if w is Elsewhere:
#             pass
#         elif v is Elsewhere:
#             pass
#         else:
#             rv, jv = find_order(sankey_definition.ordering.layers, v)
#             rw, jw = find_order(sankey_definition.ordering.layers, w)
#             if rv == rw and jv > jw:
#                 g.edge(str(w), str(v), dir='back')
#             else:
#                 g.edge(str(v), str(w))
#     if filename:
#         if filename.endswith('.png'):
#             g.format = 'png'
#         elif filename.endswith('.xdot'):
#             g.format = 'xdot'
#         g.render(filename=filename, directory=directory, cleanup=True)
#     return g


def show_view_graph(sankey_definition, include_elsewhere=False, filename=None,
                    directory=None, xlabels=None, labels=None,
                    include_coords=False):
    if graphviz is None:
        raise RuntimeError('graphviz is required')
    if xlabels is None:
        xlabels = {}
    if labels is None:
        labels = {}

    GV = view_graph(sankey_definition)
    if include_elsewhere:
        new_waypoints, new_bundles = elsewhere_bundles(sankey_definition)
        GV = augment(GV, new_waypoints, new_bundles)

    g = graphviz.Digraph(
        graph_attr=dict(splines='true', rankdir='LR'),
        node_attr=dict(fontsize='12', width='0.5', height='0.3'))

    # band_heights = defaultdict(int)
    # for bands in oV:
    #     for i, rank in enumerate(bands):
    #         band_heights[i] = max(band_heights[i], len(rank))

    for r, bands in enumerate(GV.ordering.layers):
        # j0 = 0
        subgraph = graphviz.Digraph()
        for i, rank in enumerate(bands):
            for j, u in enumerate(rank):
                node = GV.node[u]['node']
                if '_' in u:
                    attr = dict(label='', shape='point', width='0.1')
                elif isinstance(node, Waypoint):
                    if u.startswith('from ') or u.startswith('to '):
                        attr = dict(label=u, shape='plaintext')
                    else:
                        attr = dict(label=u, shape='box', style='dashed')
                else:
                    attr = dict(label=u, shape='box')
                if u in xlabels:
                    attr['xlabel'] = xlabels[u]
                if u in labels:
                    attr['label'] = labels[u]
                if include_coords:
                    attr['label'] += '\n({}, {}, {})'.format(r, i, j)
                # pos = (r * 1.2, -0.6 * (j0 + j))
                subgraph.node(u, **attr)
                # subgraph.node(u, pos='{},{}!'.format(*pos), pin='True',
                #               **attr)
            # j0 += band_heights[i]
        subgraph.body.append('rank=same;')
        g.subgraph(subgraph)

    # invisible edges to get order right
    for r, bands in enumerate(GV.ordering.layers):
        for i, rank in enumerate(bands):
            for a, b in pairwise(rank):
                g.edge(a, b, color='white')

    for v, w in GV.edges():
        rv, jv = find_order(GV.ordering.layers, v)
        rw, jw = find_order(GV.ordering.layers, w)
        if rv == rw and jv > jw:
            g.edge(w, v, dir='back')
        else:
            g.edge(v, w)

    # r0 = -0.5
    # r1 = len(oV) + 0.5
    # j = 0.5
    # for i in range(1, len(band_heights)):
    #     attr = dict(pin='True', shape='none', label='')
    #     g.node('__{}a'.format(j), pos='{},{}!'.format(r0*1.2, -0.6*j), **attr)
    #     g.node('__{}b'.format(j), pos='{},{}!'.format(r1*1.2, -0.6*j), **attr)
    #     g.edge('__{}a'.format(j), '__{}b'.format(j), arrowhead='none',
    #            style='dotted')
    #     j += band_heights[i]

    if filename:
        g.format = 'png'
        g.render(filename=filename, directory=directory, cleanup=True)

    return g


def find_order(order, process_group):
    for r, bands in enumerate(order):
        j = 0
        for i, rank in enumerate(bands):
            for u in rank:
                if u == process_group:
                    return (r, j)
                j += 1
    raise ValueError('process_group "{}" not found'.format(process_group))


def show_view_graph_pos(sankey_definition, include_elsewhere=False,
                        filename=None, directory=None, xlabels=None,
                        labels=None, include_coords=False):
    if xlabels is None:
        xlabels = {}
    if labels is None:
        labels = {}

    GV = view_graph(sankey_definition)
    if include_elsewhere:
        new_waypoints, new_bundles = elsewhere_bundles(sankey_definition)
        GV = augment(GV, new_waypoints, new_bundles)
    oV = GV.ordering.layers

    g = graphviz.Digraph(engine='neato',
                         graph_attr=dict(splines='true'),
                         node_attr=dict(fontsize='12', width='0.5',
                                        height='0.3'))

    band_heights = defaultdict(int)
    for bands in oV:
        for i, rank in enumerate(bands):
            band_heights[i] = max(band_heights[i], len(rank))

    for r, bands in enumerate(oV):
        j0 = 0
        for i, rank in enumerate(bands):
            for j, u in enumerate(rank):
                node = GV.node[u]['node']
                if '_' in u:
                    attr = dict(label='', shape='point', width='0.1')
                elif isinstance(node, Waypoint):
                    if u.startswith('from ') or u.startswith('to '):
                        attr = dict(label=u, shape='plaintext')
                    else:
                        attr = dict(label=u, shape='box', style='dashed')
                else:
                    attr = dict(label=u, shape='box')
                if u in xlabels:
                    attr['xlabel'] = xlabels[u]
                if u in labels:
                    attr['label'] = labels[u]
                if include_coords:
                    attr['label'] += '\n({}, {}, {})'.format(r, i, j)
                pos = (r * 1.2, -0.6 * (j0 + j))
                g.node(u, pos='{},{}!'.format(*pos), pin='True', **attr)
            j0 += band_heights[i]

    for v, w in GV.edges():
        g.edge(v, w)

    r0 = -0.5
    r1 = len(oV) + 0.5
    j = 0.5
    for i in range(1, len(band_heights)):
        attr = dict(pin='True', shape='none', label='')
        g.node('__{}a'.format(j), pos='{},{}!'.format(r0 * 1.2, -0.6 * j),
               **attr)
        g.node('__{}b'.format(j), pos='{},{}!'.format(r1 * 1.2, -0.6 * j),
               **attr)
        g.edge('__{}a'.format(j), '__{}b'.format(j), arrowhead='none',
               style='dotted')
        j += band_heights[i]

    if filename:
        g.format = 'png'
        g.render(filename=filename, directory=directory, cleanup=True)

    return g


# === sankeyview/layered_graph.py ===

import networkx as nx

from .sankey_definition import Ordering


class LayeredMixin(object):
    def __init__(self):
        super().__init__()
        self.ordering = Ordering([])

    def copy(self):
        new = super().copy()
        new.ordering = self.ordering
        return new

    def remove_node(self, u):
        super().remove_node(u)
        self.ordering = self.ordering.remove(u)

    def get_node(self, u):
        """Get the ProcessGroup or Waypoint associated with `u`."""
        return self.node[u]['node']


class LayeredGraph(LayeredMixin, nx.DiGraph):
    pass


class MultiLayeredGraph(LayeredMixin, nx.MultiDiGraph):
    pass
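# === added example (not part of the package): LayeredGraph behaviour ===
# A minimal sketch showing that the mixin above keeps the node ordering in
# sync with the graph: removing a node also removes it from the ordering,
# and layers that become empty are dropped.
from sankeyview import ProcessGroup
from sankeyview.layered_graph import LayeredGraph
from sankeyview.ordering import Ordering

G = LayeredGraph()
G.add_node('a', node=ProcessGroup(selection=['a']))
G.ordering = Ordering([['a']])
assert G.get_node('a').selection == ['a']
G.remove_node('a')
assert G.ordering == Ordering([])   # emptied layers are dropped as well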
# === sankeyview/ordering.py ===

import bisect

from .utils import pairwise

import attr


def _convert_layers(layers):
    """Wrap nodes in a single band, if none are specified."""
    for item in layers:
        if any(isinstance(x, str) for x in item):
            return tuple((tuple(layer_nodes), ) for layer_nodes in layers)
    return tuple(tuple(tuple(band_nodes) for band_nodes in layer_bands)
                 for layer_bands in layers)


@attr.s(slots=True, frozen=True, repr=False)
class Ordering(object):
    layers = attr.ib(convert=_convert_layers)

    def __repr__(self):
        def format_layer(layer):
            return '; '.join(', '.join(band) for band in layer)
        return 'Ordering( {} )'.format(
            ' | '.join(format_layer(layer) for layer in self.layers))

    def insert(self, i, j, k, value):
        def __insert(band):
            return band[:k] + (value, ) + band[k:]

        def _insert(layer):
            return [__insert(band) if j == jj else band
                    for jj, band in enumerate(layer)]

        layers = [_insert(layer) if i == ii else layer
                  for ii, layer in enumerate(self.layers)]
        return Ordering(layers)

    def remove(self, value):
        def __remove(band):
            return tuple(x for x in band if x != value)

        def _remove(layer):
            return tuple(__remove(band) for band in layer)

        layers = tuple(_remove(layer) for layer in self.layers)
        # remove unused ranks from layers
        layers = tuple(layer for layer in layers
                       if any(rank for rank in layer))
        return Ordering(layers)

    def indices(self, value):
        for r, bands in enumerate(self.layers):
            for i, rank in enumerate(bands):
                if value in rank:
                    return r, i, rank.index(value)
        raise ValueError('node "{}" not in ordering'.format(value))


def flatten_bands(bands):
    L = []
    idx = []
    i = 0
    for band in bands:
        L.extend(band)
        idx.append(i)
        i += len(band)
    return L, idx


def unflatten_bands(L, idx):
    bands = []
    for i0, i1 in pairwise(idx + [len(L)]):
        bands.append(L[i0:i1])
    return bands


def band_index(idx, i):
    for iband, i0 in reversed(list(enumerate(idx))):
        if i >= i0:
            return iband
    return len(idx)


def new_node_indices(G, this_bands, other_bands, new_process_group,
                     side='below'):
    assert side in ('above', 'below')

    this_layer, this_idx = flatten_bands(this_bands)
    other_layer, other_idx = flatten_bands(other_bands)

    # Position of new node, and which band in other_bands it would be
    new_pos = median_value(
        neighbour_positions(G, other_layer, new_process_group))
    if new_pos == -1:
        # no connection -- default value?
        return (0, 0)
    new_band = band_index(other_idx, new_pos)

    # Position of other nodes in layer
    existing_pos = [median_value(neighbour_positions(G, other_layer, u))
                    for u in this_layer]
    existing_pos = fill_unknown(existing_pos, side)

    # New node should be in new_band, at a position depending on the
    # existing nodes in that band.
    candidates = [pos for pos in existing_pos
                  if band_index(other_idx, pos) == new_band]
    index = (bisect.bisect_right(candidates, new_pos) if side == 'below'
             else bisect.bisect_left(candidates, new_pos))

    return (new_band, index)


def median_value(positions):
    N = len(positions)
    m = N // 2
    if N == 0:
        return -1
    elif N % 2 == 1:
        return positions[m]
    elif N == 2:
        return (positions[0] + positions[1]) / 2
    else:
        left = positions[m - 1] - positions[0]
        right = positions[-1] - positions[m]
        return (positions[m - 1] * right + positions[m] * left) / (
            left + right)


def neighbour_positions(G, rank, u):
    # neighbouring positions on other rank
    positions = []
    for i, n in enumerate(rank):
        if G.has_edge(n, u) or G.has_edge(u, n):
            positions.append(i)
    return sorted(positions)


def fill_unknown(values, side):
    assert side in ('above', 'below')
    if not values:
        return []
    if side == 'above':
        y = values[::-1]
        a = y[0] if y[0] >= 0 else len(y)
    else:
        y = values
        a = y[0] if y[0] >= 0 else 0
    z = []
    for x in y:
        if x >= 0:
            a = x
        z.append(a)
    if side == 'above':
        return z[::-1]
    else:
        return z


# === sankeyview/partition.py ===

import attr


def _validate_query(instance, attribute, value):
    if value:
        if not all(isinstance(x, tuple) and len(x) == 2 for x in value):
            raise ValueError('All elements of query should be 2-tuples')


@attr.s(slots=True, frozen=True)
class Group(object):
    label = attr.ib(convert=str)
    query = attr.ib(convert=tuple, validator=_validate_query)


@attr.s(slots=True, frozen=True)
class Partition(object):
    groups = attr.ib(default=attr.Factory(tuple), convert=tuple)

    @property
    def labels(self):
        return [g.label for g in self.groups]

    @classmethod
    def Simple(cls, dimension, values):
        def make_group(v):
            if isinstance(v, tuple):
                label, items = v
            else:
                label, items = v, (v, )
            return Group(label, [(dimension, tuple(items))])

        groups = [make_group(v) for v in values]

        # Check for duplicates
        seen_values = set()
        for i, group in enumerate(groups):
            for j, value in enumerate(group.query[0][1]):
                if value in seen_values:
                    raise ValueError(
                        'Duplicate value "{}" in partition (value {} of '
                        'group {})'.format(value, j, i))
                seen_values.add(value)

        return cls(groups)

    def __add__(self, other):
        return Partition(self.groups + other.groups)

    def __mul__(self, other):
        """Cartesian product"""
        groups = [
            Group('{}/{}'.format(g1.label, g2.label), g1.query + g2.query)
            for g1 in self.groups for g2 in other.groups
        ]
        return Partition(groups)
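# === added example (not part of the package): combining partitions ===
# A minimal sketch of Partition.Simple and the + / * operators defined above;
# the dimension names and values are illustrative. A plain value becomes its
# own group, and a (label, values) tuple groups several values under one
# label.
from sankeyview import Partition

p1 = Partition.Simple('material', ['wood', ('other', ['steel', 'glass'])])
assert p1.labels == ['wood', 'other']
p2 = Partition.Simple('time', [1, 2])
assert (p1 * p2).labels == ['wood/1', 'wood/2', 'other/1', 'other/2']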
# === sankeyview/results_graph.py ===

import pandas as pd

from .layered_graph import MultiLayeredGraph, Ordering
from .partition import Partition, Group
from .sankey_definition import ProcessGroup


def results_graph(view_graph, bundle_flows, flow_partition=None,
                  time_partition=None, measure='value', agg_measures=None):
    G = MultiLayeredGraph()
    groups = []

    # Add nodes to graph and to order
    layers = []
    for r, bands in enumerate(view_graph.ordering.layers):
        o = [[] for band in bands]
        for i, rank in enumerate(bands):
            for u in rank:
                node = view_graph.get_node(u)
                group_nodes = []
                for x, xtitle in nodes_from_partition(u, node.partition):
                    o[i].append(x)
                    group_nodes.append(x)
                    if node.partition is None:
                        title = u if node.title is None else node.title
                    else:
                        title = xtitle
                    G.add_node(x, {
                        'type': ('process' if isinstance(node, ProcessGroup)
                                 else 'group'),
                        'direction': node.direction,
                        'title': title,
                    })
                groups.append({
                    'id': u,
                    'type': ('process' if isinstance(node, ProcessGroup)
                             else 'group'),
                    'title': node.title or '',
                    'nodes': group_nodes
                })
        layers.append(o)
    G.ordering = Ordering(layers)

    # Add edges to graph
    for v, w, data in view_graph.edges(data=True):
        flows = pd.concat([bundle_flows[bundle]
                           for bundle in data['bundles']],
                          ignore_index=True)
        gv = view_graph.get_node(v).partition
        gw = view_graph.get_node(w).partition
        gf = data.get('flow_partition') or flow_partition or None
        gt = time_partition or None
        edges = group_flows(flows, v, gv, w, gw, gf, gt, measure,
                            agg_measures)
        for _, _, _, d in edges:
            d['bundles'] = data['bundles']
        G.add_edges_from(edges)

    # remove unused nodes
    unused = [u for u, deg in G.degree_iter() if deg == 0]
    for u in unused:
        G.remove_node(u)

    # remove unused nodes from groups
    groups = [
        dict(g, nodes=[x for x in g['nodes'] if x not in unused])
        for g in groups
    ]
    groups = [g for g in groups if len(g['nodes']) > 0]

    return G, groups


def nodes_from_partition(u, partition):
    if partition is None:
        return [('{}^*'.format(u), '*')]
    else:
        # _ -> other
        return [('{}^{}'.format(u, value), value)
                for value in partition.labels + ['_']]


def group_flows(flows, v, partition1, w, partition2, flow_partition,
                time_partition, measure, agg_measures=None):
    if agg_measures is None:
        agg_measures = {}
    agg_all_measures = dict(agg_measures)
    agg_all_measures[measure] = 'sum'

    e = flows.copy()
    set_partition_keys(e, partition1, 'k1', v + '^', process_side='source')
    set_partition_keys(e, partition2, 'k2', w + '^', process_side='target')
    set_partition_keys(e, flow_partition, 'k3', '')
    set_partition_keys(e, time_partition, 'k4', '')
    grouped = e.groupby(['k1', 'k2', 'k3', 'k4'])

    if 'sample' in flows:
        def data(group):
            agg = group.groupby('sample').agg(agg_all_measures)
            d = {'value': agg[measure].values}
            if agg_measures:
                d['measures'] = {k: agg[k].values for k in agg_measures}
            return d
    else:
        def data(group):
            agg = group.groupby(lambda x: '').agg(agg_all_measures)
            d = {'value': agg[measure].iloc[0]}
            if agg_measures:
                d['measures'] = {k: agg[k].iloc[0] for k in agg_measures}
            return d

    return [
        (source, target, (material, time), data(group))
        for (source, target, material, time), group in grouped
    ]


def set_partition_keys(df, partition, key_column, prefix, process_side=None):
    if partition is None:
        partition = Partition([Group('*', [])])
    df[key_column] = prefix + '_'  # other
    seen = (df.index != df.index)  # False
    for group in partition.groups:
        q = (df.index == df.index)  # True
        for dim, values in group.query:
            if dim.startswith('process') and process_side:
                dim = process_side + dim[7:]
            q = q & df[dim].isin(values)
        if any(q & seen):
            dup = df[q & seen]
            raise ValueError('Duplicate values in group {} ({}): {}'
                             .format(group, process_side, ', '.join(
                                 ['{}-{}'.format(e.source, e.target)
                                  for _, e in dup.iterrows()])))
        df.loc[q, key_column] = prefix + str(group.label)
        seen = seen | q
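# === added example (not part of the package): result node ids ===
# A minimal sketch of nodes_from_partition above: each view-graph node becomes
# one result node per partition group, plus a '_' catch-all; without a
# partition it becomes a single '{u}^*' node.
from sankeyview import Partition
from sankeyview.results_graph import nodes_from_partition

assert nodes_from_partition('a', None) == [('a^*', '*')]
p = Partition.Simple('material', ['wood'])
assert nodes_from_partition('a', p) == [('a^wood', 'wood'), ('a^_', '_')]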
# === sankeyview/sankey_definition.py ===

import attr

from . import sentinel
from .ordering import Ordering


# SankeyDefinition

def _convert_bundles_to_dict(bundles):
    if not isinstance(bundles, dict):
        bundles = {k: v for k, v in enumerate(bundles)}
    return bundles


def _convert_ordering(ordering):
    if isinstance(ordering, Ordering):
        return ordering
    else:
        return Ordering(ordering)


def _validate_bundles(instance, attribute, bundles):
    # Check bundles
    for k, b in bundles.items():
        if not b.from_elsewhere:
            if b.source not in instance.nodes:
                raise ValueError('Unknown source "{}" in bundle {}'.format(
                    b.source, k))
            if not isinstance(instance.nodes[b.source], ProcessGroup):
                raise ValueError(
                    'Source of bundle {} is not a process group'.format(k))
        if not b.to_elsewhere:
            if b.target not in instance.nodes:
                raise ValueError('Unknown target "{}" in bundle {}'.format(
                    b.target, k))
            if not isinstance(instance.nodes[b.target], ProcessGroup):
                raise ValueError(
                    'Target of bundle {} is not a process group'.format(k))
        for u in b.waypoints:
            if u not in instance.nodes:
                raise ValueError('Unknown waypoint "{}" in bundle {}'.format(
                    u, k))
            if not isinstance(instance.nodes[u], Waypoint):
                raise ValueError(
                    'Waypoint "{}" of bundle {} is not a waypoint'.format(
                        u, k))


def _validate_ordering(instance, attribute, ordering):
    for layer_bands in ordering.layers:
        for band_nodes in layer_bands:
            for u in band_nodes:
                if u not in instance.nodes:
                    raise ValueError(
                        'Unknown node "{}" in ordering'.format(u))


@attr.s(slots=True, frozen=True)
class SankeyDefinition(object):
    nodes = attr.ib()
    bundles = attr.ib(convert=_convert_bundles_to_dict,
                      validator=_validate_bundles)
    ordering = attr.ib(convert=_convert_ordering,
                       validator=_validate_ordering)
    flow_selection = attr.ib(default=None)
    flow_partition = attr.ib(default=None)
    time_partition = attr.ib(default=None)

    def copy(self):
        return self.__class__(self.nodes.copy(), self.bundles.copy(),
                              self.ordering, self.flow_selection,
                              self.flow_partition, self.time_partition)


# ProcessGroup

def _validate_direction(instance, attribute, value):
    if value not in 'LR':
        raise ValueError('direction must be L or R')


@attr.s(slots=True)
class ProcessGroup(object):
    selection = attr.ib(default=None)
    partition = attr.ib(default=None)
    direction = attr.ib(validator=_validate_direction, default='R')
    title = attr.ib(
        default=None,
        validator=attr.validators.optional(attr.validators.instance_of(str)))


# Waypoint

@attr.s(slots=True)
class Waypoint(object):
    partition = attr.ib(default=None)
    direction = attr.ib(validator=_validate_direction, default='R')
    title = attr.ib(
        default=None,
        validator=attr.validators.optional(attr.validators.instance_of(str)))


# Bundle

Elsewhere = sentinel.create('Elsewhere')


def _validate_flow_selection(instance, attribute, value):
    if instance.source == instance.target and not value:
        raise ValueError('flow_selection is required for bundle with same '
                         'source and target')


@attr.s(slots=True)
class Bundle(object):
    source = attr.ib()
    target = attr.ib()
    waypoints = attr.ib(default=attr.Factory(tuple), convert=tuple)
    flow_selection = attr.ib(default=None, validator=_validate_flow_selection)
    flow_partition = attr.ib(default=None)
    default_partition = attr.ib(default=None)

    @property
    def to_elsewhere(self):
        return self.target is Elsewhere

    @property
    def from_elsewhere(self):
        return self.source is Elsewhere
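# === added example (not part of the package): definition validation ===
# A minimal sketch showing the validators above at work: a bundle that
# references a node missing from `nodes` is rejected when the
# SankeyDefinition is constructed (names illustrative).
from sankeyview import SankeyDefinition, ProcessGroup, Bundle

nodes = {'a': ProcessGroup(selection=['a']),
         'b': ProcessGroup(selection=['b'])}
try:
    SankeyDefinition(nodes, [Bundle('a', 'c')], [['a'], ['b']])
except ValueError as err:
    assert 'Unknown target "c" in bundle 0' in str(err)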
# === sankeyview/sankey_view.py ===

from .augment_view_graph import augment, elsewhere_bundles
from .view_graph import view_graph
from .results_graph import results_graph


def sankey_view(sankey_definition, dataset, measure='value',
                agg_measures=None):

    # Calculate the view graph (adding dummy nodes)
    GV = view_graph(sankey_definition)

    # Add implicit to/from Elsewhere bundles to the view definition to
    # ensure consistency.
    new_waypoints, new_bundles = elsewhere_bundles(sankey_definition)
    GV2 = augment(GV, new_waypoints, new_bundles)

    # XXX messy
    bundles2 = dict(sankey_definition.bundles, **new_bundles)

    # Get the flows selected by the bundles
    bundle_flows, unused_flows = dataset.apply_view(
        sankey_definition.nodes, bundles2, sankey_definition.flow_selection)

    # Calculate the results graph (actual Sankey data)
    GR, groups = results_graph(
        GV2, bundle_flows,
        flow_partition=sankey_definition.flow_partition,
        time_partition=sankey_definition.time_partition,
        measure=measure,
        agg_measures=agg_measures)

    return GR, groups


# === sankeyview/save_sankey.py ===

"""Save Sankey data to JSON format."""

import json

from .sankey_view import sankey_view
from .graph_to_sankey import graph_to_sankey


def _convert_node(node):
    return {
        'id': node['id'],
        'title': node['title'],
        'metadata': {'direction': node['direction']},
        'style': {'hidden': node['visibility'] == 'hidden',
                  'type': node['style']},
    }


def save_sankey_data(filename, sankey_definition, dataset, palette=None,
                     measure='value'):
    """Save Sankey data to `filename`. Similar to show_sankey()."""
    graph, groups = sankey_view(sankey_definition, dataset, measure)
    value = graph_to_sankey(graph, groups, palette=palette)
    with open(filename, 'wt') as f:
        json.dump(serialise_data(value), f)


def serialise_data(value, version='2'):
    """Serialise `value` returned by graph_to_sankey."""
    if version != '2':
        raise ValueError('Only version 2 is supported')
    return _value_version_2(value)


def _value_version_2(value):
    nodes = [
        dict(id=node['id'],
             title=node['title'],
             metadata=dict(direction=node['direction']),
             style=dict(hidden=(node['visibility'] == 'hidden')))
        for node in value['nodes']
    ]
    links = [
        {
            'source': link['source'],
            'target': link['target'],
            'type': link['type'],
            'data': dict(value=link['value']),
            'style': {'color': link['color']}
        }
        for link in value['links']
    ]
    return {
        'format': 'sankey-v2',
        'metadata': {
            'layers': value['order'],
        },
        'nodes': nodes,
        'links': links,
    }
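# === added example (not part of the package): serialised document shape ===
# A minimal sketch of serialise_data() above, fed a hand-built `value` dict
# in the shape graph_to_sankey() returns (the node and link contents are
# illustrative only).
from sankeyview import serialise_data

value = {'nodes': [{'id': 'a', 'title': 'a', 'direction': 'r',
                    'visibility': 'visible', 'style': 'default'}],
         'links': [{'source': 'a', 'target': 'b', 'type': '*',
                    'value': 1.0, 'color': '#dddddd'}],
         'order': [[['a']], [['b']]],
         'groups': []}
doc = serialise_data(value)
assert doc['format'] == 'sankey-v2'
assert doc['metadata']['layers'] == [[['a']], [['b']]]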
# === sankeyview/sentinel.py ===

#!/usr/bin/env python
# coding: utf-8

"""
Create sentinel and singleton objects.

Copyright 2014 © Eddie Antonio Santos. MIT licensed.
"""

import inspect

__all__ = ['create']
__version__ = '0.1.1'


def get_caller_module():
    """
    Returns the name of the caller's module as a string.

    >>> get_caller_module()
    '__main__'
    """
    stack = inspect.stack()
    assert len(stack) > 1
    caller = stack[2][0]
    return caller.f_globals['__name__']


def create(name, mro=(object,), extra_methods={}, *args, **kwargs):
    """
    create(name, mro=(object,), extra_methods={}, ...) -> Sentinel instance

    Creates a new sentinel instance. This is a singleton instance kind
    of like the builtin None, and Ellipsis.

    Method resolution order (MRO) for the anonymous class can be
    specified (i.e., it can be a subclass). Provide the mro as a tuple
    of all classes that it inherits from. If only one class, provide a
    1-tuple: e.g., (Cls,).

    Additionally, extra class attributes, such as methods, can be
    provided in the extra_methods dict.

    The following methods are provided, but can be overridden:

    __repr__()
        Returns the class name, similar to None and Ellipsis.
    __copy__()
    __deepcopy__()
        Always return the same singleton instance such that
        ``copy(Sentinel) is Sentinel`` is true.
    __reduce__()
        Provided for proper pickling prowess. That is,
        ``pickle.loads(pickle.dumps(Sentinel)) is Sentinel`` is true.

    Finally, the remaining arguments and keyword arguments are passed to
    the super class's __init__(). This is helpful when instantiating
    base classes such as a tuple.
    """
    cls_dict = {}
    cls_dict.update(
        # Provide a nice, clean, self-documenting __repr__
        __repr__=lambda self: name,
        # Provide a copy and deepcopy implementation which simply return
        # the same exact instance.
        __deepcopy__=lambda self, _memo: self,
        __copy__=lambda self: self,
        # Provide a hook for pickling the sentinel.
        __reduce__=lambda self: name)

    # If the default MRO is given, then it's safe to prevent the singleton
    # instance from having an instance dictionary.
    if mro == (object,):
        cls_dict.update(__slots__=())

    cls_dict.update(extra_methods)

    anon_type = type(name, mro, cls_dict)

    # Stack introspection -- make the singleton always belong to the module
    # of its caller. If we don't do this, pickling using __reduce__() will
    # fail!
    anon_type.__module__ = get_caller_module()

    # Return the singleton instance of this new, "anonymous" type.
    return anon_type(*args, **kwargs)


# === sankeyview/utils.py ===

import itertools


def pairwise(iterable):
    "s -> (s0, s1), (s1, s2), (s2, s3), ..."
    a, b = itertools.tee(iterable)
    next(b, None)
    return zip(a, b)
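# === added example (not part of the package): sentinel and pairwise ===
# A minimal sketch of the two helpers above: sentinel.create() is how the
# Elsewhere singleton in sankey_definition.py is built (Token here is a
# hypothetical sentinel), and pairwise() yields consecutive pairs.
import copy
from sankeyview.sentinel import create
from sankeyview.utils import pairwise

Token = create('Token')
assert repr(Token) == 'Token'
assert copy.deepcopy(Token) is Token

assert list(pairwise('ABC')) == [('A', 'B'), ('B', 'C')]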
# === sankeyview/view_graph.py ===

from .layered_graph import LayeredGraph
from .utils import pairwise
from .sankey_definition import Elsewhere
from .dummy_nodes import add_dummy_nodes


def view_graph(sankey_definition):
    G = LayeredGraph()

    for k, node in sankey_definition.nodes.items():
        G.add_node(k, node=node)

    G.ordering = sankey_definition.ordering
    G = _add_bundles_to_graph(G, sankey_definition.bundles,
                              _bundle_order(sankey_definition))

    return G


def _add_bundles_to_graph(G, bundles, sort_key):
    for k, bundle in sorted(bundles.items(), key=sort_key):
        nodes = (bundle.source, ) + bundle.waypoints + (bundle.target, )
        for iw, (a, b) in enumerate(pairwise(nodes)):
            # No need to add waypoints to get to Elsewhere -- it is
            # everywhere!
            if a is not Elsewhere and b is not Elsewhere:
                G = add_dummy_nodes(G, a, b, k, iw, _dummy_kw(bundle))

    # check flow partitions are compatible
    for v, w, data in G.edges(data=True):
        flow_partitions = list({bundles[b].flow_partition
                                for b in data['bundles']})
        if len(flow_partitions) > 1:
            raise ValueError('Multiple flow partitions in bundles: {}'.format(
                ', '.join(str(b) for b in data['bundles'])))
        if flow_partitions[0]:
            data['flow_partition'] = flow_partitions[0]

    return G


def _dummy_kw(bundle):
    return dict(title='', partition=bundle.default_partition or None)


def _bundle_order(sankey_definition):
    def keyfunc(item):
        k, bundle = item
        if bundle.to_elsewhere or bundle.from_elsewhere:
            # bundle to elsewhere: last
            return (2, 0)
        r0, _, _ = sankey_definition.ordering.indices(bundle.source)
        r1, _, _ = sankey_definition.ordering.indices(bundle.target)
        if r1 > r0:
            # forwards bundles: shortest first
            return (0, r1 - r0)
        else:
            # backwards bundles: after forwards bundles, longest first
            return (1, r1 - r0)
    return keyfunc


# === sankeyview-1.1.6.dist-info/LICENSE ===

The MIT License (MIT)

Copyright (c) 2016 Rick Lupton

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.