PK!F$$triflow/__init__.py#!/usr/bin/env python # coding=utf8 from triflow.core import schemes # noqa from triflow.core.model import Model # noqa from triflow.core.simulation import Simulation # noqa from triflow.plugins.container import TriflowContainer as Container # noqa from triflow.plugins.displays import TriflowDisplay as Display # noqa import logging from logging import NullHandler logging.getLogger(__name__).addHandler(NullHandler()) retrieve_container = Container.retrieve display_fields = Display.display_fields display_probe = Display.display_probe PK!triflow/core/__init__.py#!/usr/bin/env python # coding=utf8 import logging from logging import NullHandler logging.getLogger(__name__).addHandler(NullHandler()) PK!,,triflow/core/compilers.py#!/usr/bin/env python # coding=utf8 from functools import partial import numpy as np from scipy.sparse import csc_matrix from sympy import lambdify def theano_compiler(model): """Take a triflow model and return optimized theano routines. Parameters ---------- model: triflow.Model: Model to compile Returns ------- (theano function, theano_function): Optimized routine that compute the evolution equations and their jacobian matrix. """ from theano import tensor as T from theano.ifelse import ifelse import theano.sparse as ths from theano import function def th_Min(a, b): if isinstance(a, T.TensorVariable) or isinstance(b, T.TensorVariable): return T.where(a < b, a, b) return min(a, b) def th_Max(a, b): if isinstance(a, T.TensorVariable) or isinstance(b, T.TensorVariable): return T.where(a < b, b, a) return max(a, b) def th_Heaviside(a): if isinstance(a, T.TensorVariable): return T.where(a < 0, 1, 1) return 0 if a < 0 else 1 mapargs = {arg: T.vector(arg) for arg, sarg in zip(model._args, model._symbolic_args)} to_feed = mapargs.copy() x_th = mapargs['x'] N = x_th.size L = x_th[-1] - x_th[0] dx = L / (N - 1) to_feed['dx'] = dx periodic = T.scalar("periodic", dtype="int32") middle_point = int((model._window_range - 1) / 2) th_args = [mapargs[key] for key in [*model._indep_vars, *model._dep_vars, *model._help_funcs, *model._pars]] + [periodic] map_extended = {} for (varname, discretisation_tree) in \ model._symb_vars_with_spatial_diff_order.items(): pad_left, pad_right = model._bounds th_arg = mapargs[varname] per_extended_var = T.concatenate([th_arg[pad_left:], th_arg, th_arg[:pad_right]]) edge_extended_var = T.concatenate([[th_arg[0]] * middle_point, th_arg, [th_arg[-1]] * middle_point]) extended_var = ifelse(periodic, per_extended_var, edge_extended_var) map_extended[varname] = extended_var for order in range(pad_left, pad_right + 1): if order != 0: var = ("{}_{}{}").format(varname, 'm' if order < 0 else 'p', np.abs(order)) else: var = varname new_var = extended_var[order - pad_left: extended_var.size + order - pad_right] to_feed[var] = new_var F = lambdify((model._symbolic_args), expr=model.F_array.tolist(), modules=[T, {"Max": th_Max, "Min": th_Min, "Heaviside": th_Heaviside}])( *[to_feed[key] for key in model._args] ) F = T.concatenate(F, axis=0).reshape((model._nvar, N)).T F = T.stack(F).flatten() J = lambdify((model._symbolic_args), expr=model.J_array.tolist(), modules=[T, {"Max": th_Max, "Min": th_Min, "Heaviside": th_Heaviside}])( *[to_feed[key] for key in model._args] ) J = [j if j != 0 else T.constant(0.) for j in J] J = [j if not isinstance(j, (int, float)) else T.constant(j) for j in J] J = T.stack([T.repeat(j, N) if j.ndim == 0 else j for j in J]) J = J[model._sparse_indices[0]].T.squeeze() i = T.arange(N).dimshuffle([0, 'x']) idx = T.arange(N * model._nvar).reshape((N, model._nvar)).T edge_extended_idx = T.concatenate([T.repeat(idx[:, :1], middle_point, axis=1), idx, T.repeat(idx[:, -1:], middle_point, axis=1)], axis=1).T.flatten() per_extended_idx = T.concatenate([idx[:, -middle_point:], idx, idx[:, :middle_point]], axis=1).T.flatten() extended_idx = ifelse(periodic, per_extended_idx, edge_extended_idx) rows = T.tile(T.arange(model._nvar), model._window_range * model._nvar) + i * model._nvar cols = T.repeat(T.arange(model._window_range * model._nvar), model._nvar) + i * model._nvar rows = rows[:, model._sparse_indices].reshape(J.shape).flatten() cols = extended_idx[cols][:, model._sparse_indices] \ .reshape(J.shape).flatten() permutation = T.argsort(cols) J = J.flatten()[permutation] rows = rows[permutation] cols = cols[permutation] count = T.zeros((N * model._nvar + 1,), dtype=int) uq, cnt = T.extra_ops.Unique(False, False, True)(cols) count = T.set_subtensor(count[uq + 1], cnt) indptr = T.cumsum(count) shape = T.stack([N * model._nvar, N * model._nvar]) sparse_J = ths.CSC(J, rows, indptr, shape) F_theano_function = function(inputs=th_args, outputs=F, on_unused_input='ignore', allow_input_downcast=True) J_theano_function = function(inputs=th_args, outputs=sparse_J, on_unused_input='ignore', allow_input_downcast=True) return F_theano_function, J_theano_function def numpy_compiler(model): """Take a triflow model and return optimized numpy routines. Parameters ---------- model: triflow.Model: Model to compile Returns ------- (numpy function, numpy function): Optimized routine that compute the evolution equations and their jacobian matrix. """ def np_Min(args): a, b = args return np.where(a < b, a, b) def np_Max(args): a, b = args return np.where(a < b, b, a) def np_Heaviside(a): return np.where(a < 0, 1, 1) f_func = lambdify((model._symbolic_args), expr=model.F_array.tolist(), modules=[{"amax": np_Max, "amin": np_Min, "Heaviside": np_Heaviside}, "numpy"]) j_func = lambdify((model._symbolic_args), expr=model._J_sparse_array.tolist(), modules=[{"amax": np_Max, "amin": np_Min, "Heaviside": np_Heaviside}, "numpy"]) compute_F = partial(compute_F_numpy, model, f_func) compute_J = partial(compute_J_numpy, model, j_func) return compute_F, compute_J def init_computation_numpy(model, *input_args): mapargs = {key: input_args[i] for i, key in enumerate([*model._indep_vars, *model._dep_vars, *model._help_funcs, *[*model._pars, "periodic"]])} x = mapargs["x"] N = x.size L = x[-1] - x[0] dx = L / (N - 1) periodic = mapargs["periodic"] middle_point = int((model._window_range - 1) / 2) args = [mapargs[key] for key in [*model._indep_vars, *model._dep_vars, *model._help_funcs, *model._pars]] + [periodic] mapargs['dx'] = dx map_extended = mapargs.copy() for (varname, discretisation_tree) in \ model._symb_vars_with_spatial_diff_order.items(): pad_left, pad_right = model._bounds arg = mapargs[varname] if periodic: extended_var = np.concatenate([arg[pad_left:], arg, arg[:pad_right]]) else: extended_var = np.concatenate([[arg[0]] * middle_point, arg, [arg[-1]] * middle_point]) map_extended[varname] = extended_var for order in range(pad_left, pad_right + 1): if order != 0: var = ("{}_{}{}").format(varname, 'm' if order < 0 else 'p', np.abs(order)) else: var = varname new_var = extended_var[order - pad_left: extended_var.size + order - pad_right] map_extended[var] = new_var return args, map_extended, N, middle_point, periodic def compute_F_numpy(model, f_func, *input_args): args, map_extended, N, middle_point, periodic = \ init_computation_numpy(model, *input_args) F = f_func(*[map_extended[key] for key in model._args]) F = np.concatenate(F, axis=0).reshape((model._nvar, N)).T F = np.stack(F).flatten() return F def compute_J_numpy(model, j_func, *input_args): args, map_extended, N, middle_point, periodic = \ init_computation_numpy(model, *input_args) J = j_func(*[map_extended[key] for key in model._args]) J = np.stack([np.repeat(j, N) if len( np.array(j).shape) == 0 else j for j in J]) J = J.T.squeeze() i = np.arange(N)[:, None] idx = np.arange(N * model._nvar).reshape((N, model._nvar)).T if periodic: extended_idx = np.concatenate([idx[:, -middle_point:], idx, idx[:, :middle_point]], axis=1).T.flatten() else: extended_idx = np.concatenate([np.repeat(idx[:, :1], middle_point, axis=1), idx, np.repeat(idx[:, -1:], middle_point, axis=1)], axis=1).T.flatten() rows = np.tile(np.arange(model._nvar), model._window_range * model._nvar) + i * model._nvar cols = np.repeat(np.arange(model._window_range * model._nvar), model._nvar) + i * model._nvar rows = rows[:, model._sparse_indices].reshape(J.shape) cols = extended_idx[cols][:, model._sparse_indices].reshape(J.shape) rows = rows.flatten() cols = cols.flatten() sparse_J = csc_matrix((J.flatten(), (rows, cols)), shape=(N * model._nvar, N * model._nvar)) return sparse_J PK!˽triflow/core/fields.py#!/usr/bin/env python # coding=utf8 from copy import copy, deepcopy import numpy as np import pandas as pd from xarray import Dataset def reduce_fields(coords, dependent_variables, helper_functions, data): Field = BaseFields.factory(coords, dependent_variables, helper_functions) return Field(**data) class BaseFields(Dataset): """Specialized container which expose the data as a structured numpy array, give access to the dependants variables and the herlpers function as attributes (as a numpy rec array) and is able to give access to a flat view of the dependent variables only (which is needed by the ode solvers for all the linear algebra manipulation). Parameters ---------- **inputs : numpy.array named argument providing x, the dependent variables and the helper functions. All of these are mendatory and a KeyError will be raised if a data is missing. Attributes ---------- array : numpy.array vanilla numpy array containing the data size : int Number of discretisation nodes """ # noqa @staticmethod def factory(coords, dependent_variables, helper_functions): """Fields factory generating specialized container build around a triflow Model and xarray. Parameters ---------- coords: iterable of str: coordinates name. First coordinate have to be shared with all variables dependent_variables : iterable tuple (name, coords) coordinates and name of the dependent variables helper_functions : iterable tuple (name, coords) coordinates and name of the helpers functions Returns ------- triflow.BaseFields Specialized container which expose the data as a structured numpy array """ Field = type('Field', BaseFields.__bases__, dict(BaseFields.__dict__)) Field._coords = coords Field.dependent_variables_info = dependent_variables Field.helper_functions_info = helper_functions Field._var_info = [*list(Field.dependent_variables_info), *list(Field.helper_functions_info)] Field.dependent_variables = [dep[0] for dep in Field.dependent_variables_info] Field.helper_functions = [dep[0] for dep in Field.helper_functions_info] Field._keys, Field._coords_info = zip(*Field._var_info) return Field @staticmethod def factory1D(dependent_variables, helper_functions): """Fields factory generating specialized container build around a triflow Model and xarray. Wrapper for 1D data. Parameters ---------- dependent_variables : iterable for str name of the dependent variables helper_functions : iterable of str name of the helpers functions Returns ------- triflow.BaseFields Specialized container which expose the data as a structured numpy array """ return BaseFields.factory(("x", ), [(name, ("x", )) for name in dependent_variables], [(name, ("x", )) for name in helper_functions],) def __init__(self, **inputs): Dataset.__init__(self, data_vars={key: (coords, inputs[key]) for key, coords in self._var_info}, coords={coord: inputs[coord] for coord in self._coords}) def __reduce__(self): return (reduce_fields, (self._coords, self.dependent_variables_info, self.helper_functions_info, {key: self[key] for key in self.keys()})) def copy(self, deep=True): new_dataset = Dataset.copy(self, deep) new_dataset.__dict__.update({key: (deepcopy(value) if deep else copy(value)) for key, value in self.__dict__.items()}) return new_dataset def __copy__(self, deep=True): return self.copy(deep) @property def size(self): """numpy.ndarray.view: view of the dependent variables of the main numpy array """ # noqa return self["x"].size @property def uarray(self): """numpy.ndarray.view: view of the dependent variables of the main numpy array """ # noqa return self[self.dependent_variables] @property def uflat(self): """return a flatten **copy** of the main numpy array with only the dependant variables. Be carefull, modification of these data will not be reflected on the main array! """ # noqa aligned_arrays = [self[key].values[[(slice(None) if c in coords else None) for c in self._coords]].T for key, coords in self.dependent_variables_info] return np.vstack(aligned_arrays).flatten("F") def keys(self): return [*self._coords, *self._keys] def to_df(self): if len(self.coords) > 1: raise ValueError("CSV files only available for 1D arrays") data = {key: self[key] for key in self._keys} df = pd.DataFrame(dict(**data), index=self[self._coords[0]]) return df def fill(self, uflat): rarray = uflat.reshape((self[self._coords[0]].size, -1)) ptr = 0 for var, coords in self.dependent_variables_info: coords = list(coords) coords.remove(self._coords[0]) next_ptr = ptr + sum([self.coords[key].size for key in coords]) next_ptr = ptr + 1 if next_ptr == ptr else next_ptr self[var][:] = rarray[:, ptr:next_ptr].squeeze() ptr = next_ptr def to_csv(self, path): self.to_df().to_csv(path) def to_clipboard(self): self.to_df().to_clipboard() PK!WVpUpUtriflow/core/model.py#!/usr/bin/env python # coding=utf8 import logging import sys from functools import partial from itertools import product from pickle import dump, load from pprint import pformat import numpy as np from sympy import (Derivative, Function, Max, Min, Symbol, SympifyError, symbols, sympify) from .compilers import numpy_compiler, theano_compiler from .fields import BaseFields from .routines import F_Routine, J_Routine logging.getLogger(__name__).addHandler(logging.NullHandler()) logging = logging.getLogger(__name__) sys.setrecursionlimit(40000) EPS = 1E-6 def _generate_sympify_namespace(independent_variables, dependent_variables, helper_functions): """Generate the link between the symbols of the derivatives and the sympy Derivative operation. Parameters ---------- independent_variable : str name of the independant variable ("x") dependent_variables : iterable of str names of the dependent variables helper_functions : iterable of str names of the helper functions Returns ------- dict dictionnary containing the symbol to parse as keys and the sympy expression to evaluate instead as values. """ # noqa independent_variable = independent_variables[0] # TEMP FIX BEFORE REAL ND symbolic_independent_variable = Symbol(independent_variable) def partial_derivative(symbolic_independent_variable, i, expr): return Derivative(expr, symbolic_independent_variable, i) namespace = {independent_variable: symbolic_independent_variable} namespace.update({'d%s' % (independent_variable * i): partial(partial_derivative, symbolic_independent_variable, i) for i in range(1, 10)}) namespace.update({'d%s%s' % (independent_variable * order, var): Derivative(Function(var)(independent_variable), independent_variable, order) for order, var in product(range(1, 10), dependent_variables + helper_functions)}) logging.debug("sympy namespace: %s" % namespace) return namespace def _reduce_model(eq_diffs, dep_vars, pars, help_functions, bdc_conditions): model = Model(eq_diffs, dep_vars, pars, help_functions, bdc_conditions) return model class Model: """Contain finite difference approximation and routine of the dynamical system Take a mathematical form as input, use Sympy to transform it as a symbolic expression, perform the finite difference approximation and expose theano optimized routine for both the right hand side of the dynamical system and Jacobian matrix approximation. Parameters ---------- differential_equations : iterable of str or str the right hand sides of the partial differential equations written as :math:`\\frac{\partial U}{\partial t} = F(U)`, where the spatial derivative can be written as `dxxU` or `dx(U, 2)` or with the sympy notation `Derivative(U, x, x)` dependent_variables : iterable of str or str the dependent variables with the same order as the temporal derivative of the previous arg. parameters : iterable of str or str, optional, default None list of the parameters. Can be feed with a scalar of an array with the same size help_functions : None, optional All fields which have not to be solved with the time derivative but will be derived in space. double: bool, optional Choose if the dtypes are float64 (the default) or float32 Attributes ---------- F : triflow.F_Routine Callable used to compute the right hand side of the dynamical system F_array : numpy.ndarray of sympy.Expr Symbolic expressions of the right hand side of the dynamical system J : triflow.J_Routine Callable used to compute the Jacobian of the dynamical system J_array : numpy.ndarray of sympy.Expr Symbolic expressions of the Jacobian side of the dynamical system Properties ---------- fields_template: Model specific Fields container used to store and access to the model variables in an efficient way. Methods ------- save: Save a binary of the Model with pre-optimized F and J routines Examples -------- A simple diffusion equation: >>> from triflow import Model >>> model = Model("k * dxxU", "U", "k") A coupled system of convection-diffusion equation: >>> from triflow import Model >>> model = Model(["k1 * dxxU - c1 * dxV", ... "k2 * dxxV - c2 * dxU",], ... ["U", "V"], ["k1", "k2", "c1", "c2"]) """ # noqa def __init__(self, differential_equations, dependent_variables, parameters=None, help_functions=None, bdc_conditions=None, compiler="theano", simplify=False, fdiff_jac=False, double=True, hold_compilation=False): if compiler == "theano": compiler = theano_compiler if compiler == "numpy": compiler = numpy_compiler logging.debug('enter __init__ Model') self._double = double self._symb_t = Symbol("t") indep_vars = ["x"] # coerce the inputs the way to have coherent types def coerce(arg): if arg is None: return tuple() else: if isinstance(arg, (str, )): return tuple([arg]) return tuple(arg) (self._diff_eqs, self._indep_vars, self._dep_vars, self._pars, self._help_funcs, self._bdcs) = map(coerce, (differential_equations, indep_vars, dependent_variables, parameters, help_functions, bdc_conditions)) self._nvar = len(self._dep_vars) # generate the sympy namespace which will connect the math input into # sympy operation. sympify_namespace = {} sympify_namespace.update(_generate_sympify_namespace( self._indep_vars, self._dep_vars, self._help_funcs)) # parse the inputs in order to have Sympy symbols and expressions (self._symb_diff_eqs, self._symb_indep_vars, self._symb_dep_vars, self._symb_pars, self._symb_help_funcs, self._symb_bdcs) = self._sympify_model(self._diff_eqs, self._indep_vars, self._dep_vars, self._pars, self._help_funcs, self._bdcs, sympify_namespace) # we will need to extract the order of the != spatial derivative # in order to know the size of the ghost area at the limit of self._symb_vars_with_spatial_diff_order = {str(svar.func): {(svar.func, 0)} for svar in (self._symb_dep_vars + self._symb_help_funcs)} # Use finite difference scheme to generate a spatial approximation and # have a rhs which will only be a function of the discretized dependent # variables and help functions. approximated_diff_eqs = self._approximate_derivative( self._symb_diff_eqs, self._symb_indep_vars, self._symb_dep_vars, self._symb_help_funcs) self._dbdcs = self._approximate_derivative( self._symb_bdcs, self._symb_indep_vars, self._symb_dep_vars, self._symb_help_funcs) logging.debug("approximated equations: {}".format( approximated_diff_eqs)) # We compute the size of the the ghost area at the limit of # the spatial domain self._bounds = self._extract_bounds( self._dep_vars, self._symb_vars_with_spatial_diff_order) self._window_range = self._bounds[-1] - self._bounds[0] + 1 # We obtain a Fortran like flattened vector containing all the discrete # dependent variable needed for the spatial approximation around the # local node i U = self._extract_unknowns( self._dep_vars, self._bounds, self._symb_vars_with_spatial_diff_order).flatten('F') # We do the same as for the U variable but with all the discrete # variables, dependent variables and help functions. self._discrete_variables = self._extract_unknowns( self._dep_vars + self._help_funcs, self._bounds, self._symb_vars_with_spatial_diff_order).flatten('F') # We expose a numpy.ndarray filled with the rhs of our approximated # dynamical system self.F_array = np.array(approximated_diff_eqs) if simplify: self.F_array = np.array([eq.simplify() for eq in self.F_array.tolist()]) # We compute the jacobian as the partial derivative of all equation of # our system according to all the discrete variable in U. if fdiff_jac: self.J_array = np.array([ [(diff_eq.subs(u, u + EPS) - diff_eq) / EPS for u in U] for diff_eq in approximated_diff_eqs]).flatten('F') else: self.J_array = np.array([ [diff_eq.diff(u) for u in U] for diff_eq in approximated_diff_eqs]).flatten('F') if simplify: self.J_array = np.array([eq.expand().simplify() for eq in self.J_array.tolist()]) # We flag and store the null entry of the Jacobian matrix self._sparse_indices = np.where(self.J_array != 0) # We drop all the null term of the Jacobian matrix, because we target # a sparse matrix storage for memory saving and efficient linalg ops. self._J_sparse_array = self.J_array[self._sparse_indices] if hold_compilation: return # We compile the math with a theano based compiler (default) self.compile(compiler) def compile(self, compiler): F_function, J_function = compiler(self) logging.debug('compile F') self.F = F_Routine(self.F_array, (self._dep_vars + self._help_funcs), self._pars, F_function) logging.debug('compile J') self.J = J_Routine(self._J_sparse_array, (self._dep_vars + self._help_funcs), self._pars, J_function) @property def fields_template(self): return BaseFields.factory1D(self._dep_vars, self._help_funcs) @property def _args(self): return list(map(str, self._symbolic_args)) @property def _symbolic_args(self): return ([*list(self._symb_indep_vars), *list(self._discrete_variables), *list(self._symb_pars), Symbol('dx')]) def save(self, filename): """Save the model as a binary pickle file. Parameters ---------- filename : str name of the file where the model is saved. Returns ------- None """ with open(filename, 'wb') as f: dump(self, f) def __repr__(self): repr = """{equations} Variables --------- unknowns: {vars} helpers: {helps} parameters: {pars}""" repr = repr.format( vars=", ".join(self._dep_vars), helps=", ".join(self._help_funcs) if self._pars else None, equations="\n".join(self._diff_eqs), pars=", ".join(self._pars) if self._pars else None) return repr @staticmethod def load(filename): """load a pre-compiled triflow model. The internal of theano allow a caching of the model. Will be slow if it is the first time the model is loaded on the system. Parameters ---------- filename : str path of the pre-compiled model Returns ------- triflow.core.Model triflow pre-compiled model """ with open(filename, 'rb') as f: return load(f) def _extract_bounds(self, variables, dict_symbol): bounds = (0, 0) for var in variables: dvars, orders = zip(*dict_symbol[var]) bounds = (min(bounds[0], min(orders)), max(bounds[1], max(orders)) ) return bounds def _extract_unknowns(self, vars, bounds, dict_symbol): unknowns = np.zeros((len(vars), bounds[-1] - bounds[0] + 1), dtype=object) for i, var in enumerate(vars): dvars, orders = zip(*dict_symbol[var]) for j, order in enumerate(range(bounds[0], bounds[1] + 1)): if order == 0: unknowns[i, j] = Symbol(var) if order < 0: unknowns[i, j] = Symbol( '{}_m{}'.format(var, np.abs(order))) if order > 0: unknowns[i, j] = Symbol( '{}_p{}'.format(var, np.abs(order))) return unknowns def _finite_diff_scheme(self, U, order): logging.debug("finite diff approximation %i, %s" % (order, U)) dx = Symbol('dx') var_label = str(U) if order == 1: Um1 = Symbol('%s_m1' % var_label) Up1 = Symbol('%s_p1' % var_label) self._symb_vars_with_spatial_diff_order[var_label].add((Um1, -1)) self._symb_vars_with_spatial_diff_order[var_label].add((Up1, 1)) return (1 / 2 * Up1 - 1 / 2 * Um1) / dx if order == 2: Um1 = Symbol('%s_m1' % var_label) Up1 = Symbol('%s_p1' % var_label) self._symb_vars_with_spatial_diff_order[var_label].add((Um1, -1)) self._symb_vars_with_spatial_diff_order[var_label].add((Up1, 1)) return (Up1 - 2 * U + Um1) / dx ** 2 if order == 3: Um1 = Symbol('%s_m1' % var_label) Um2 = Symbol('%s_m2' % var_label) Up1 = Symbol('%s_p1' % var_label) Up2 = Symbol('%s_p2' % var_label) self._symb_vars_with_spatial_diff_order[var_label].add((Um1, -1)) self._symb_vars_with_spatial_diff_order[var_label].add((Up1, 1)) self._symb_vars_with_spatial_diff_order[var_label].add((Um2, -2)) self._symb_vars_with_spatial_diff_order[var_label].add((Up2, 2)) return (-1 / 2 * Um2 + Um1 - Up1 + 1 / 2 * Up2) / dx ** 3 if order == 4: Um1 = Symbol('%s_m1' % var_label) Um2 = Symbol('%s_m2' % var_label) Up1 = Symbol('%s_p1' % var_label) Up2 = Symbol('%s_p2' % var_label) self._symb_vars_with_spatial_diff_order[var_label].add((Um1, -1)) self._symb_vars_with_spatial_diff_order[var_label].add((Up1, 1)) self._symb_vars_with_spatial_diff_order[var_label].add((Um2, -2)) self._symb_vars_with_spatial_diff_order[var_label].add((Up2, 2)) return (Um2 - 4 * Um1 + 6 * U - 4 * Up1 + Up2) / dx ** 4 raise NotImplementedError('Finite difference up ' 'to 5th order not implemented yet') def _upwind_scheme(self, a, U, accuracy): dx = Symbol('dx') var_label = str(U) ap = Max(a, 0) am = Min(a, 0) if accuracy == 1: Um1 = Symbol('%s_m1' % var_label) Up1 = Symbol('%s_p1' % var_label) self._symb_vars_with_spatial_diff_order[var_label].add((Um1, -1)) self._symb_vars_with_spatial_diff_order[var_label].add((Up1, 1)) Um = (U - Um1) / dx Up = (Up1 - U) / dx return ap * Um + am * Up elif accuracy == 2: Um1 = Symbol('%s_m1' % var_label) Up1 = Symbol('%s_p1' % var_label) Um2 = Symbol('%s_m2' % var_label) Up2 = Symbol('%s_p2' % var_label) self._symb_vars_with_spatial_diff_order[var_label].add((Um1, -1)) self._symb_vars_with_spatial_diff_order[var_label].add((Up1, 1)) self._symb_vars_with_spatial_diff_order[var_label].add((Um2, -2)) self._symb_vars_with_spatial_diff_order[var_label].add((Up2, 2)) Um = (3 * U - 4 * Um1 + Um2) / (2 * dx) Up = (-3 * U + 4 * Up1 - Up2) / (2 * dx) return ap * Um + am * Up elif accuracy == 3: Um1 = Symbol('%s_m1' % var_label) Up1 = Symbol('%s_p1' % var_label) Um2 = Symbol('%s_m2' % var_label) Up2 = Symbol('%s_p2' % var_label) self._symb_vars_with_spatial_diff_order[var_label].add((Um1, -1)) self._symb_vars_with_spatial_diff_order[var_label].add((Up1, 1)) self._symb_vars_with_spatial_diff_order[var_label].add((Um2, -2)) self._symb_vars_with_spatial_diff_order[var_label].add((Up2, 2)) Um = (2 * Up1 + 3 * U - 6 * Um1 + Um2) / (6 * dx) Up = (-2 * Um1 - 3 * U + 6 * Up1 - Up2) / (6 * dx) return ap * Um + am * Up raise NotImplementedError('Upwind up ' 'to 2nd order not implemented yet') def _sympify_model(self, diff_eqs, indep_vars, dep_vars, pars, help_functions, bdc_conditions, sympify_namespace): logging.debug('enter _sympify_model') logging.debug(pformat(diff_eqs)) logging.debug(pformat(dep_vars)) logging.debug(pformat(pars)) logging.debug(pformat(help_functions)) symbolic_indep_vars = tuple([(Symbol(indep_var)) for indep_var in indep_vars]) symbolic_dep_vars = tuple([Function(dep_var)(*symbolic_indep_vars) for dep_var in dep_vars]) symbolic_help_functions = tuple([Function(help_function) (*symbolic_indep_vars) for help_function in help_functions]) symbolic_pars = symbols(pars) def sympify_equations(equations): try: return tuple( [sympify(func, locals=sympify_namespace) .subs(zip(map(Symbol, dep_vars), (symbolic_dep_vars + symbolic_help_functions))) .doit() for func in equations]) except (TypeError, SympifyError): raise ValueError("badly formated differential equations") symbolic_diff_eqs, symbolic_bdcs = map(sympify_equations, (diff_eqs, bdc_conditions)) return (symbolic_diff_eqs, symbolic_indep_vars, symbolic_dep_vars, symbolic_pars, symbolic_help_functions, symbolic_bdcs) def _approximate_derivative(self, symbolic_diff_eqs: tuple, symbolic_indep_vars: tuple, symbolic_dep_vars: tuple, symbolic_fields: tuple) -> tuple: logging.debug('enter _approximate_derivative') approximated_diff_eqs = [] for func in symbolic_diff_eqs: afunc = func for derivative in func.find(Derivative): var = Symbol(str(derivative.args[0].func)) logging.debug("{}, {}".format(derivative, var)) order = len(derivative.args) - 1 afunc = afunc.replace( derivative, self._finite_diff_scheme(var, order)) afunc = afunc.subs([(var, Symbol(str(var.func))) for var in symbolic_dep_vars + symbolic_fields]) afunc = afunc.replace(Function("upwind"), self._upwind_scheme) approximated_diff_eqs.append(afunc.expand()) return tuple(approximated_diff_eqs) def __reduce__(self): return (_reduce_model, (self._diff_eqs, self._dep_vars, self._pars, self._help_funcs, self._bdcs)) PK![4 4 triflow/core/routines.py#!/usr/bin/env python # coding=utf8 import numpy as np import sympy as sp class ModelRoutine: def __init__(self, matrix, args, pars, ufunc, reduced=False): self.pars = list(pars) + ['periodic'] self.matrix = matrix self.args = args self._ufunc = ufunc def __repr__(self): return sp.Matrix(self.matrix.tolist()).__repr__() class F_Routine(ModelRoutine): """Compute the right hand side of the dynamical system :math:`\\frac{\\partial U}{\\partial t} = F(U)` Parameters ---------- fields : triflow.Fields triflow fields container generated by a triflow.Model containing the actual state of the dependent variables and helper functions. pars : dict dictionnary with the different physical parameters of the model and the 'periodic' key. Returns ------- numpy.ndarray flat array containing the right hand side of the dynamical system. """ # noqa def __call__(self, fields, pars): uargs = [fields['x'].values, *[fields[key].values for key in self.args]] pargs = [pars[key] + fields["x"].values * 0 if key != 'periodic' else pars[key] for key in self.pars] F = self._ufunc(*uargs, *pargs) return F def diff_approx(self, fields, pars, eps=1E-8): fpars = {key: pars[key] for key in self.pars} fpars['dx'] = (fields['x'][-1] - fields['x'][0]) / fields['x'].size U = fields.uflat J = np.zeros((U.size, U.size)) F = self(fields, pars) for i, u in enumerate(U): fields_plus = fields.copy() Up = fields_plus.uflat Up[i] += eps fields_plus.fill(Up) Fplus = self(fields_plus, pars) J[i] = (Fplus - F) / (eps) return J.T class J_Routine(ModelRoutine): """Compute the right hand side of the dynamical system :math:`\\frac{\\partial U}{\\partial t} = F(U)` Parameters ---------- fields : triflow.Fields triflow fields container generated by a triflow.Model containing the actual state of the dependent variables and helper functions. pars : dict dictionnary with the different physical parameters of the model and the 'periodic' key. sparse : bool, optional, default True whether should the matrix returned as dense or sparse form. Returns ------- scipy.sparse.CSC or numpy.ndarray: sparse or dense form (depending of the `sparse` argument) of the Jacobian approximation of the dynamical system right hand side. """ # noqa def __call__(self, fields, pars, sparse=True): uargs = [fields['x'].values, *[fields[key].values for key in self.args]] pargs = [(pars[key] + fields['x'] * 0).values if key != 'periodic' else pars[key] for key in self.pars] J = self._ufunc(*uargs, *pargs) return J if sparse else J.todense() PK!F5n[UUtriflow/core/schemes.py#!/usr/bin/env python # coding=utf8 """This module regroups all the implemented temporal schemes. They are written as callable class which take the model and some control arguments at the init, and perform a computation step every time they are called. The following solvers are implemented: * Backward and Forward Euler, Crank-Nicolson method (with the Theta class) * Some Rosenbrock Wanner schemes (up to the 6th order) with time controler * All the scipy.integrate.ode integrators with the scipy_ode class. """ import logging from functools import wraps import numpy as np import scipy.sparse as sps from scipy.integrate import ode from scipy.interpolate import interp1d from scipy.linalg import norm from toolz import memoize logging.getLogger(__name__).addHandler(logging.NullHandler()) logging = logging.getLogger(__name__) def null_hook(t, fields, pars): return fields, pars def time_stepping(scheme, tol=1E-1, ord=2, m=10, reject_factor=2): internal_dt = None def one_step(t, fields, dt, pars, hook): dt_ = dt while True: t_, fields_ = scheme(t, fields, m * dt_, pars, hook) for i in range(10): t, fields = scheme(t, fields, dt_, pars, hook) errs = [np.linalg.norm(fields_[key] - fields[key], ord) / (m**2 - 1) for key in fields.dependent_variables] err = max(errs) dt_ = np.sqrt(dt ** 2 * tol / err) if dt_ < dt / reject_factor: continue break return t, fields, dt_ @wraps(scheme) def adaptatif_scheme(t, fields, dt, pars, hook=null_hook): nonlocal internal_dt next_step = t + dt internal_dt = internal_dt if internal_dt else dt while t + internal_dt <= next_step: t, fields, internal_dt = one_step(t, fields, internal_dt / m, pars, hook) if t < next_step: t, fields = scheme(t, fields, next_step - t, pars, hook) return t, fields return adaptatif_scheme class ROW_general: """Rosenbrock Wanner class of temporal solvers The implementation and the different parameters can be found in http://www.digibib.tu-bs.de/?docid=00055262 """ @memoize def __cache__(self, N): Id = sps.eye(N, format='csc') return Id def __init__(self, model, alpha, gamma, b, b_pred=None, time_stepping=False, tol=None, max_iter=None, dt_min=None, safety_factor=0.9, recompute_target=True): self._internal_dt = None self._model = model self._alpha = alpha self._gamma = gamma self._b = b self._b_pred = b_pred self._s = len(b) self._time_control = time_stepping self._internal_iter = None self._tol = tol self._safety_factor = safety_factor self._max_iter = max_iter self._dt_min = dt_min self._recompute_target = recompute_target self._interp_cache = None def __call__(self, t, fields, dt, pars, hook=null_hook): """Perform a step of the solver: took a time and a system state as a triflow Fields container and return the next time step with updated container. Parameters ---------- t : float actual time step fields : triflow.Fields actual system state in a triflow Fields dt : float temporal step-size pars : dict physical parameters of the model hook : callable, optional any callable taking the actual time, fields and parameters and return modified fields and parameters. Will be called every internal time step and can be used to include time dependent or conditionnal parameters, boundary conditions... container Returns ------- tuple : t, fields updated time and fields container Raises ------ NotImplementedError raised if a time stepping is requested but the scheme do not provide the b predictor coefficients. ValueError raised if time_stepping is True and tol is not provided. """ # noqa if self._time_control: return self._variable_step(t, fields, dt, pars, hook=hook) t, fields, _ = self._fixed_step(t, fields, dt, pars, hook=hook) fields, pars = hook(t, fields, pars) return t, fields def _fixed_step(self, t, fields, dt, pars, hook=null_hook): fields = fields.copy() fields, pars = hook(t, fields, pars) J = self._model.J(fields, pars) Id = self.__cache__(fields.uflat.size) self._A = A = Id - self._gamma[0, 0] * dt * J luf = sps.linalg.factorized(A) ks = [] fields_i = fields.copy() for i in np.arange(self._s): fields_i.fill(fields.uflat + sum([self._alpha[i, j] * ks[j] for j in range(i)])) F = self._model.F(fields_i, pars) ks.append(luf(dt * F + dt * (J @ sum([self._gamma[i, j] * ks[j] for j in range(i)]) if i > 0 else 0) ) ) U = fields.uflat.copy() U = U + sum([bi * ki for bi, ki in zip(self._b, ks)]) U_pred = (U + sum([bi * ki for bi, ki in zip(self._b_pred, ks)]) if self._b_pred is not None else None) fields.fill(U) return t + dt, fields, (norm(U - U_pred, np.inf) if U_pred is not None else None) def _variable_step(self, t, fields, dt, pars, hook=null_hook): self._next_time_step = t + dt self._max_dt = t self._internal_iter = 0 try: fields.fill(self._interp_cache(self._next_time_step)) return self._next_time_step, fields except (TypeError, ValueError): pass if not self._recompute_target: dt = self._internal_dt = (1E-6 if self._internal_dt is None else self._internal_dt) else: dt = self._internal_dt = min((1E-6 if self._internal_dt is None else self._internal_dt), dt) while True: self._err = None while (self._err is None or self._err > self._tol): new_t, new_fields, self._err = self._fixed_step(t, fields, dt, pars, hook) logging.debug("error: {}".format(self._err)) dt = self._internal_dt = (self._safety_factor * dt * np.sqrt(self._tol / self._err)) logging.debug('dt computed after err below tol: %g' % dt) logging.debug('ROS_vart, t %g' % t) if new_t >= self._next_time_step: target_dt = self._next_time_step - t logging.debug('new t more than next expected time step, ' 'compute with proper timestep %g' % target_dt) if self._recompute_target: t, fields, self._err = self._fixed_step( t, fields, self._next_time_step - t, pars, hook) else: self._interp_cache = interp1d([t, new_t], [fields.uflat[None], new_fields.uflat[None]], axis=0) fields.fill(self._interp_cache(self._next_time_step)) self._internal_iter += 1 fields, pars = hook(t, fields, pars) return self._next_time_step, fields t = new_t fields = new_fields.copy() self._internal_iter += 1 if self._internal_iter > (self._max_iter if self._max_iter else self._internal_iter + 1): raise RuntimeError("Rosebrock internal iteration " "above max iterations authorized") if dt < (self._dt_min if self._dt_min else dt * .5): raise RuntimeError("Rosebrock internal time step " "less than authorized") class ROS2(ROW_general): """Second order Rosenbrock scheme, without time stepping Parameters ---------- model : triflow.Model triflow Model """ def __init__(self, model): gamma = np.array([[2.928932188134E-1, 0], [-5.857864376269E-1, 2.928932188134E-1]]) alpha = np.array([[0, 0], [1, 0]]) b = np.array([1 / 2, 1 / 2]) super().__init__(model, alpha, gamma, b, time_stepping=False) class ROS3PRw(ROW_general): """Third order Rosenbrock scheme, with time stepping Parameters ---------- model : triflow.Model triflow Model tol : float, optional, default 1E-2 tolerance factor for the time stepping. The time step will adapt to ensure that the maximum relative error on all fields stay under that value. time_stepping : bool, optional, default True allow a variable internal time-step to ensure good agreement between computing performance and accuracy. max_iter : float or None, optional, default None maximum internal iteration allowed dt_min : float or None, optional, default None minimum internal time step allowed recompute_target : bool, optional, default False if True a new computation is done when the target time is exceeded, interpolation used otherwise. """ # noqa def __init__(self, model, tol=1E-1, time_stepping=True, max_iter=None, dt_min=None, recompute_target=True): alpha = np.zeros((3, 3)) gamma = np.zeros((3, 3)) gamma_i = 7.8867513459481287e-01 b = [5.0544867840851759e-01, -1.1571687603637559e-01, 6.1026819762785800e-01] b_pred = [2.8973180237214197e-01, 1.0000000000000001e-01, 6.1026819762785800e-01] alpha[1, 0] = 2.3660254037844388e+00 alpha[2, 0] = 5.0000000000000000e-01 alpha[2, 1] = 7.6794919243112270e-01 gamma[0, 0] = gamma[1, 1] = gamma[2, 2] = gamma_i gamma[1, 0] = -2.3660254037844388e+00 gamma[2, 0] = -8.6791218280355165e-01 gamma[2, 1] = -8.7306695894642317e-01 super().__init__(model, alpha, gamma, b, b_pred=b_pred, time_stepping=time_stepping, tol=tol, max_iter=max_iter, dt_min=dt_min, recompute_target=recompute_target) class ROS3PRL(ROW_general): """4th order Rosenbrock scheme, with time stepping Parameters ---------- model : triflow.Model triflow Model tol : float, optional, default 1E-2 tolerance factor for the time stepping. The time step will adapt to ensure that the maximum relative error on all fields stay under that value. time_stepping : bool, optional, default True allow a variable internal time-step to ensure good agreement between computing performance and accuracy. max_iter : float or None, optional, default None maximum internal iteration allowed dt_min : float or None, optional, default None minimum internal time step allowed recompute_target : bool, optional, default False if True a new computation is done when the target time is exceeded, interpolation used otherwise. """ # noqa def __init__(self, model, tol=1E-1, time_stepping=True, max_iter=None, dt_min=None, recompute_target=True): alpha = np.zeros((4, 4)) gamma = np.zeros((4, 4)) gamma_i = 4.3586652150845900e-01 b = [2.1103008548132443e-03, 8.8607515441580453e-01, -3.2405197677907682e-01, 4.3586652150845900e-01] b_pred = [5.0000000000000000e-01, 3.8752422953298199e-01, -2.0949226315045236e-01, 3.2196803361747034e-01] alpha[1, 0] = .5 alpha[2, 0] = .5 alpha[2, 1] = .5 alpha[3, 0] = .5 alpha[3, 1] = .5 alpha[3, 2] = 0 for i in range(len(b)): gamma[i, i] = gamma_i gamma[1, 0] = -5.0000000000000000e-01 gamma[2, 0] = -7.9156480420464204e-01 gamma[2, 1] = 3.5244216792751432e-01 gamma[3, 0] = -4.9788969914518677e-01 gamma[3, 1] = 3.8607515441580453e-01 gamma[3, 2] = -3.2405197677907682e-01 super().__init__(model, alpha, gamma, b, b_pred=b_pred, time_stepping=time_stepping, tol=tol, max_iter=max_iter, dt_min=dt_min, recompute_target=recompute_target) class RODASPR(ROW_general): """6th order Rosenbrock scheme, with time stepping Parameters ---------- model : triflow.Model triflow Model tol : float, optional, default 1E-2 tolerance factor for the time stepping. The time step will adapt to ensure that the maximum relative error on all fields stay under that value. time_stepping : bool, optional, default True allow a variable internal time-step to ensure good agreement between computing performance and accuracy. max_iter : float or None, optional, default None maximum internal iteration allowed dt_min : float or None, optional, default None minimum internal time step allowed recompute_target : bool, optional, default False if True a new computation is done when the target time is exceeded, interpolation used otherwise. """ # noqa def __init__(self, model, tol=1E-1, time_stepping=True, max_iter=None, dt_min=None, recompute_target=True): alpha = np.zeros((6, 6)) gamma = np.zeros((6, 6)) b = [-7.9683251690137014E-1, 6.2136401428192344E-2, 1.1198553514719862E00, 4.7198362114404874e-1, -1.0714285714285714E-1, 2.5e-1] b_pred = [-7.3844531665375115e0, -3.0593419030174646e-1, 7.8622074209377981e0, 5.7817993590145966e-1, 2.5e-1, 0] alpha[1, 0] = 7.5E-1 alpha[2, 0] = 7.5162877593868457E-2 alpha[2, 1] = 2.4837122406131545E-2 alpha[3, 0] = 1.6532708886396510e0 alpha[3, 1] = 2.1545706385445562e-1 alpha[3, 2] = -1.3157488872766792e0 alpha[4, 0] = 1.9385003738039885e1 alpha[4, 1] = 1.2007117225835324e0 alpha[4, 2] = -1.9337924059522791e1 alpha[4, 3] = -2.4779140110062559e-1 alpha[5, 0] = -7.3844531665375115e0 alpha[5, 1] = -3.0593419030174646e-1 alpha[5, 2] = 7.8622074209377981e0 alpha[5, 3] = 5.7817993590145966e-1 alpha[5, 4] = 2.5e-1 gamma_i = .25 for i in range(len(b)): gamma[i, i] = gamma_i gamma[1, 0] = -7.5e-1 gamma[2, 0] = -8.8644e-2 gamma[2, 1] = -2.868897e-2 gamma[3, 0] = -4.84700e0 gamma[3, 1] = -3.1583e-1 gamma[3, 2] = 4.9536568e0 gamma[4, 0] = -2.67694569e1 gamma[4, 1] = -1.5066459e0 gamma[4, 2] = 2.720013e1 gamma[4, 3] = 8.25971337e-1 gamma[5, 0] = 6.58762e0 gamma[5, 1] = 3.6807059e-1 gamma[5, 2] = -6.74235e0 gamma[5, 3] = -1.061963e-1 gamma[5, 4] = -3.57142857e-1 super().__init__(model, alpha, gamma, b, b_pred=b_pred, time_stepping=time_stepping, tol=tol, max_iter=max_iter, dt_min=dt_min, recompute_target=recompute_target) class scipy_ode: """Proxy written around the scipy.integrate.ode class. Give access to all the scpy integrators. Parameters ---------- model : triflow.Model triflow Model integrator : str, optional, default 'vode' name of the chosen scipy integration scheme. **integrator_kwargs extra arguments provided to the scipy integration scheme. """ # noqa def __init__(self, model, jac=False, integrator='vode', **integrator_kwargs): def func_scipy_proxy(t, U, fields, pars, hook): fields.fill(U) fields, pars = hook(t, fields, pars) return model.F(fields, pars) def jacob_scipy_proxy(t, U, fields, pars, hook): fields.fill(U) fields, pars = hook(t, fields, pars) return model.J(fields, pars, sparse=False) self._solv = ode(func_scipy_proxy, jac=jacob_scipy_proxy if jac else None) self._solv.set_integrator(integrator, **integrator_kwargs) def __call__(self, t, fields, dt, pars, hook=null_hook): """Perform a step of the solver: took a time and a system state as a triflow Fields container and return the next time step with updated container. Parameters ---------- t : float actual time step fields : triflow.Fields actual system state in a triflow Fields dt : float temporal step-size pars : dict physical parameters of the model hook : callable, optional any callable taking the actual time, fields and parameters and return modified fields and parameters. Will be called every internal time step and can be used to include time dependent or conditionnal parameters, boundary conditions... container Returns ------- tuple : t, fields updated time and fields container Raises ------ RuntimeError Description """ # noqa solv = self._solv fields, pars = hook(t, fields, pars) solv.set_initial_value(fields.uflat, t) solv.set_f_params(fields, pars, hook) solv.set_jac_params(fields, pars, hook) U = solv.integrate(t + dt) fields.fill(U) fields, _ = hook(t + dt, fields, pars) return t + dt, fields class Theta: """Simple theta-based scheme where theta is a weight if theta = 0, the scheme is a forward-euler scheme if theta = 1, the scheme is a backward-euler scheme if theta = 0.5, the scheme is called a Crank-Nicolson scheme Parameters ---------- model : triflow.Model triflow Model theta : int, optional, default 1 weight of the theta-scheme solver : callable, optional, default scipy.sparse.linalg.spsolve method able to solve a Ax = b linear equation with A a sparse matrix. Take A and b as argument and return x. """ # noqa def __init__(self, model, theta=1, solver=sps.linalg.spsolve): self._model = model self._theta = theta self._solver = solver def __call__(self, t, fields, dt, pars, hook=null_hook): """Perform a step of the solver: took a time and a system state as a triflow Fields container and return the next time step with updated container. Parameters ---------- t : float actual time step fields : triflow.Fields actual system state in a triflow Fields container dt : float temporal step-size pars : dict physical parameters of the model hook : callable, optional any callable taking the actual time, fields and parameters and return modified fields and parameters. Will be called every internal time step and can be used to include time dependent or conditionnal parameters, boundary conditions... Returns ------- tuple : t, fields updated time and fields container """ # noqa fields = fields.copy() fields, pars = hook(t, fields, pars) F = self._model.F(fields, pars) J = self._model.J(fields, pars) U = fields.uflat B = dt * (F - self._theta * J @ U) + U J = (sps.identity(U.size, format='csc') - self._theta * dt * J) fields.fill(self._solver(J, B)) fields, _ = hook(t + dt, fields, pars) return t + dt, fields PK!rjKY=Y=triflow/core/simulation.py#!/usr/bin/env python # coding=utf8 import inspect import logging import pprint import time import warnings from collections import namedtuple from uuid import uuid1 import pendulum import streamz import tqdm from numpy import isclose from . import schemes from ..plugins.container import TriflowContainer logging.getLogger(__name__).addHandler(logging.NullHandler()) logging = logging.getLogger(__name__) def is_interactive(): import __main__ as main return not hasattr(main, '__file__') tqdm = tqdm.tqdm_notebook if is_interactive() else tqdm.tqdm class Timer: def __init__(self, last, total): self.last = last self.total = total def __repr__(self): repr = """last: {last} total: {total}""" return repr.format(last=(pendulum.now() .subtract( seconds=self.last) .diff()), total=(pendulum.now() .subtract( seconds=self.total) .diff())) def null_hook(t, fields, pars): return fields, pars PostProcess = namedtuple( "PostProcess", ["name", "function", "description"]) class Simulation(object): """High level container used to run simulation build on triflow Model. This object is an iterable which will yield every time step until the parameters 'tmax' is reached if provided. By default, the solver use a 6th order ROW solver, an implicit method with integrated time-stepping. Parameters ---------- model : triflow.Model Contain finite difference approximation and routine of the dynamical system fields : triflow.BaseFields or dict (any mappable) triflow container or mappable filled with initial conditions parameters : dict physical parameters of the simulation dt : float time stepping for output. if time_stepping is False, the internal time stepping will be the same. t : float, optional, default 0. initial time tmax : float, optional, default None Control the end of the simulation. If None (the default), the com- putation will continue until interrupted by the user (using Ctrl-C or a SIGTERM signal). id : None, optional Name of the simulation. A 2 word slug will be generated if not provided. hook : callable, optional, default null_hook. Any callable taking the actual time, fields and parameters and return modified fields and parameters. Will be called every internal time step and can be used to include time dependent or conditionnal parameters, boundary conditions... The default null_hook has no impact on the computation. scheme : callable, optional, default triflow.schemes.RODASPR An callable object which take the simulation state and return the next step. Its signature is scheme.__call__(fields, t, dt, pars, hook) and it should return the next time and the updated fields. It take the model and extra positional and named arguments. time_stepping : boolean, default True Indicate if the time step is controlled by an algorithm dependant of the temporal scheme (see the doc on time stepping for extra info). **kwargs extra arguments passed to the scheme. Attributes ---------- dt : float output time step fields : triflow.Fields triflow container filled with actual data i : int actual iteration id : str name of the simulation model : triflow.Model triflow Model used in the simulation parameters : dict physical parameters of the simulation status : str status of the simulation, one of the following one: ('created', 'running', 'finished', 'failed') t : float actual time tmax : float or None, default None stopping time of the simulation. Not stopping if set to None. Properties ---------- post_processes: list of triflow.core.simulation.PostProcess contain all the post processing function attached to the simulation. container: triflow.TriflowContainer give access to the attached container, if any. timer: triflow.core.simulation.Timer return the cpu time of the previous step and the total running time of the simulation. stream: streamz.Stream Streamz starting point, fed by the simulation state after each time_step. This interface is used for post-processing, saving the data on disk by the TriflowContainer and display the fields in real-time. Examples -------- >>> import numpy as np >>> import triflow >>> model = triflow.Model(["k1 * dxxU", ... "k2 * dxxV"], ... ["U", "V"], ... ["k1", "k2"]) >>> x = np.linspace(0, 100, 1000, endpoint=False) >>> U = np.cos(x * 2 * np.pi / 100) >>> V = np.sin(x * 2 * np.pi / 100) >>> fields = model.fields_template(x=x, U=U, V=V) >>> pars = {'k1': 1, 'k2': 1, 'periodic': True} >>> simulation = triflow.Simulation(model, fields, pars, dt=5., tmax=50.) >>> for t, fields in simulation: ... pass >>> print(t) 50.0 """ # noqa def __init__(self, model, fields, parameters, dt, t=0, tmax=None, id=None, hook=null_hook, scheme=schemes.RODASPR, time_stepping=True, **kwargs): def intersection_kwargs(kwargs, function): """Inspect the function signature to identify the relevant keys in a dictionary of named parameters. """ func_signature = inspect.signature(function) func_parameters = func_signature.parameters kwargs = {key: value for key, value in kwargs.items() if key in func_parameters} return kwargs kwargs["time_stepping"] = time_stepping self.id = str(uuid1())[:6] if not id else id self.model = model self.parameters = parameters self.fields = model.fields_template(**fields) self.t = t self.user_dt = self.dt = dt self.tmax = tmax self.i = 0 self._stream = streamz.Stream() self._pprocesses = [] self._scheme = scheme(model, **intersection_kwargs(kwargs, scheme.__init__)) if (time_stepping and self._scheme not in [schemes.RODASPR, schemes.ROS3PRL, schemes.ROS3PRw]): self._scheme = schemes.time_stepping( self._scheme, **intersection_kwargs(kwargs, schemes.time_stepping)) self.status = 'created' self._total_running = 0 self._last_running = 0 self._created_timestamp = pendulum.now() self._started_timestamp = None self._last_timestamp = None self._actual_timestamp = pendulum.now() self._hook = hook self._container = None self._iterator = self.compute() def _compute_one_step(self, t, fields, pars): """ Compute one step of the simulation, then update the timers. """ fields, pars = self._hook(t, fields, pars) self.dt = (self.tmax - t if self.tmax and (t + self.dt >= self.tmax) else self.dt) before_compute = time.clock() t, fields = self._scheme(t, fields, self.dt, pars, hook=self._hook) after_compute = time.clock() self._last_running = after_compute - before_compute self._total_running += self._last_running self._last_timestamp = self._actual_timestamp self._actual_timestamp = pendulum.now() return t, fields, pars def compute(self): """Generator which yield the actual state of the system every dt. Yields ------ tuple : t, fields Actual time and updated fields container. """ fields = self.fields t = self.t pars = self.parameters self._started_timestamp = pendulum.now() self.stream.emit(self) try: while True: t, fields, pars = self._compute_one_step(t, fields, pars) self.i += 1 self.t = t self.fields = fields self.parameters = pars for pprocess in self.post_processes: pprocess.function(self) self.stream.emit(self) yield self.t, self.fields if self.tmax and (isclose(self.t, self.tmax)): self._end_simulation() return except RuntimeError: self.status = 'failed' raise def _end_simulation(self): if self.container: self.container.flush() self.container.merge() def run(self, progress=True, verbose=False): """Compute all steps of the simulation. Be careful: if tmax is not set, this function will result in an infinit loop. Returns ------- (t, fields): last time and result fields. """ total_iter = int((self.tmax // self.user_dt) if self.tmax else None) log = logging.info if verbose else logging.debug if progress: with tqdm(initial=(self.i if self.i < total_iter else total_iter), total=total_iter) as pbar: for t, fields in self: pbar.update(1) log("%s running: t: %g" % (self.id, t)) try: return t, fields except UnboundLocalError: warnings.warn("Simulation already ended") for t, fields in self: log("%s running: t: %g" % (self.id, t)) try: return t, fields except UnboundLocalError: warnings.warn("Simulation already ended") def __repr__(self): repr = """{simulation_name:=^30} created: {created_date} started: {started_date} last: {last_date} time: {t:g} iteration: {iter:g} last step: {step_time} total time: {running_time} Physical parameters ------------------- {parameters} Hook function ------------- {hook_source} =========== Model =========== {model_repr}""" repr = repr.format(simulation_name=" %s " % self.id, parameters="\n\t".join( [("%s:" % key).ljust(12) + pprint.pformat(value) for key, value in self.parameters.items()]), t=self.t, iter=self.i, model_repr=self.model, hook_source=inspect.getsource(self._hook), step_time=(None if not self._last_running else pendulum.now() .subtract( seconds=self._last_running) .diff()), running_time=(pendulum.now() .subtract( seconds=self._total_running) .diff()), created_date=(self._created_timestamp .to_cookie_string()), started_date=(self._started_timestamp .to_cookie_string() if self._started_timestamp else "None"), last_date=(self._last_timestamp .to_cookie_string() if self._last_timestamp else "None")) return repr def attach_container(self, path=None, save="all", mode="w", nbuffer=50, force=False): """add a Container to the simulation which allows some persistance to the simulation. Parameters ---------- path : str or None (default: None) path for the container. If None (the default), the data lives only in memory (and are available with `simulation.container`) mode : str, optional "a" or "w" (default "w") save : str, optional "all" will save every time-step, "last" will only get the last time step nbuffer : int, optional wait until nbuffer data in the Queue before save on disk. timeout : int, optional wait until timeout since last flush before save on disk. force : bool, optional (default False) if True, remove the target folder if not empty. if False, raise an error. """ self._container = TriflowContainer("%s/%s" % (path, self.id) if path else None, save=save, mode=mode, metadata=self.parameters, force=force, nbuffer=nbuffer) self._container.connect(self.stream) return self._container @property def post_processes(self): return self._pprocesses @property def stream(self): return self._stream @property def container(self): return self._container @property def timer(self): return Timer(self._last_running, self._total_running) def add_post_process(self, name, post_process, description=""): """add a post-process Parameters ---------- name : str name of the post-traitment post_process : callback (function of a class with a __call__ method or a streamz.Stream). this callback have to accept the simulation state as parameter and return the modifield simulation state. if a streamz.Stream is provided, it will me plugged_in with the previous streamz (and ultimately to the initial_stream). All these stream accept and return the simulation state. description : str, optional, Default is "". give extra information about the post-processing """ self._pprocesses.append(PostProcess(name=name, function=post_process, description=description)) self._pprocesses[-1].function(self) def remove_post_process(self, name): """remove a post-process Parameters ---------- name : str name of the post-process to remove. """ self._pprocesses = [post_process for post_process in self._pprocesses if post_process.name != name] def __iter__(self): return self.compute() def __next__(self): return next(self._iterator) PK!O $$triflow/plugins/__init__.py#!/usr/bin/env python # coding=utf8 PK!Br1triflow/plugins/container.py#!/usr/bin/env python # coding=utf8 import json import logging import warnings from collections import deque, namedtuple from uuid import uuid1 import yaml from path import Path from streamz import collect from xarray import concat, open_dataset, open_mfdataset log = logging.getLogger(__name__) log.handlers = [] log.addHandler(logging.NullHandler()) FieldsData = namedtuple("FieldsData", ["data", "metadata"]) class AttrDict(dict): def __init__(self, *args, **kwargs): super(AttrDict, self).__init__(*args, **kwargs) self.__dict__ = self def coerce_attr(key, value): value_type = type(value) if value_type in [int, float, str]: return value for cast in (int, float, str): try: value = cast(value) log.debug("Illegal netCDF type ({}) of attribute for {}, " "casted to {}".format(value_type, key, cast)) return value except TypeError: pass raise TypeError("Illegal netCDF type ({}) of attribute for {}, " "auto-casting failed, tried to cast to " "int, float and str") class TriflowContainer: def __init__(self, path=None, mode='a', *, save="all", metadata={}, force=False, nbuffer=50): self._nbuffer = nbuffer self._mode = mode self._metadata = metadata self.save = save self._cached_data = deque([], self._n_save) self._collector = None self.path = path = Path(path).abspath() if path else None if not path: return if self._mode == "w" and force: path.rmtree_p() if self._mode == "w" and not force and path.exists(): raise FileExistsError( "Directory %s exists, set force=True to override it" % path) if self._mode == "r" and not path.exists(): raise FileNotFoundError("Container not found.") path.makedirs_p() with open(self.path / 'metadata.yml', 'w') as yaml_file: yaml.dump(self._metadata, yaml_file, default_flow_style=False) @property def save(self): return "last" if self._n_save else "all" @save.setter def save(self, value): if value == "all": self._n_save = None elif value == "last" or value == -1: self._n_save = 1 else: raise ValueError('save argument accept only "all", "last" or -1 ' 'as value, not %s' % value) def _expand_fields(self, t, fields): fields = fields.assign_coords(t=t).expand_dims("t") for key, value in self._metadata.items(): fields.attrs[key] = coerce_attr(key, value) self._cached_data.append(fields) return fields def _concat_fields(self, fields): if fields: return concat(fields, dim="t") def connect(self, stream): def get_t_fields(simul): return simul.t, simul.fields def expand_fields(inps): return self._expand_fields(*inps) def get_last(list_fields): return list_fields[-1] accumulation_stream = (stream .map(get_t_fields) .map(expand_fields)) self._collector = collect(accumulation_stream) if self.save == "all": self._collector.map(self._concat_fields).sink(self._write) else: self._collector.map(get_last).sink(self._write) (accumulation_stream .partition(self._nbuffer) .sink(self._collector.flush)) return self._collector def flush(self): if self._collector: self._collector.flush() def _write(self, concatenated_fields): if concatenated_fields is not None and self.path: target_file = self.path / "data_%i.nc" % uuid1() concatenated_fields.to_netcdf(target_file) self._cached_data = deque([], self._n_save) if self.save == "last": [file.remove() for file in self.path.glob("data_*.nc") if file != target_file] def __repr__(self): repr = """path: {path} {data}""".format(path=self.path, data=self.data) return repr def __del__(self): self.flush() @property def data(self): try: if self.path: return open_mfdataset(self.path / "data*.nc") return self._concat_fields(self._cached_data) except OSError: return @property def metadata(self): try: if self.path: with open(self.path / 'metadata.yml', 'r') as yaml_file: return yaml.load(yaml_file) return self._metadata except OSError: return @metadata.setter def metadata(self, parameters): if self._mode == "r": return for key, value in parameters.items(): self._metadata[key] = value if self.path: with open(self.path / 'info.yml', 'w') as yaml_file: yaml.dump(self._metadata, yaml_file, default_flow_style=False) @staticmethod def retrieve(path, isel='all', lazy=True): path = Path(path) try: data = open_dataset(path / "data.nc") lazy = True except FileNotFoundError: data = open_mfdataset(path / "data*.nc", concat_dim="t").sortby("t") try: with open(Path(path) / 'metadata.yml', 'r') as yaml_file: metadata = yaml.load(yaml_file) except FileNotFoundError: # Ensure retro-compatibility with older version with open(path.glob("Treant.*.json")[0]) as f: metadata = json.load(f)["categories"] if isel == 'last': data = data.isel(t=-1) elif isel == 'all': pass elif isinstance(isel, dict): data = data.isel(**isel) else: data = data.isel(t=isel) if not lazy: return FieldsData(data=data.load(), metadata=AttrDict(**metadata)) return FieldsData(data=data, metadata=AttrDict(**metadata)) @staticmethod def get_last(path): warnings.warn( "get_last method is deprecied and will be removed in a " "future version. Please use retrieve(path, 'last') instead.", DeprecationWarning ) return TriflowContainer.retrieve(path, isel=[-1], lazy=False) @staticmethod def get_all(path): warnings.warn( "get_last method is deprecied and will be removed in a " "future version. Please use retrieve(path) instead.", DeprecationWarning ) return TriflowContainer.retrieve(path, isel="all", lazy=False) def merge(self, override=True): if self.path: return TriflowContainer.merge_datafiles(self.path, override=override) @staticmethod def merge_datafiles(path, override=False): path = Path(path) if (path / "data.nc").exists() and not override: raise FileExistsError(path / "data.nc") (path / "data.nc").remove_p() split_data = open_mfdataset(path / "data*.nc", concat_dim="t").sortby("t") split_data.to_netcdf(path / "data.nc") merged_data = open_dataset(path / "data.nc", chunks=split_data.chunks) if not split_data.equals(merged_data): (path / "data.nc").remove() raise IOError("Unable to merge data ") [file.remove() for file in path.files("data_*.nc")] return path / "data.nc" PK!HHtriflow/plugins/displays.py#!/usr/bin/env python # coding=utf8 import logging import multiprocessing as mp import os import warnings from collections import deque from uuid import uuid4 import coolname from holoviews import Curve, DynamicMap, Layout, streams from path import Path # noqa log = logging.getLogger(__name__) log.handlers = [] log.addHandler(logging.NullHandler()) def is_interactive(): import __main__ as main return not hasattr(main, '__file__') def manage_display_import(): global MPLRenderer import matplotlib as mpl if os.environ.get('DISPLAY', '') == '': log.info('no display found. Using non-interactive Agg backend') with warnings.catch_warnings(): warnings.simplefilter("ignore") mpl.use('Agg') from holoviews.plotting.mpl import MPLRenderer # noqa if is_interactive(): from holoviews import notebook_extension notebook_extension("bokeh") manage_display_import() class TriflowDisplay: def __init__(self, skel_data, plot_function, on_disk=None, on_disk_name="triflow_plot", **renderer_args): self.on_disk = None self._plot_pipe = streams.Pipe(data=skel_data) self._dynmap = (DynamicMap(plot_function, streams=[self._plot_pipe]) .opts("Curve [width=600] {+framewise}")) self._writers = [] if on_disk: self._renderer = MPLRenderer.instance() target_dir = Path(on_disk) target_dir.makedirs_p() def save_curves(data): target_file = target_dir / "%s_%i" % (on_disk_name, data.i) process = mp.Process(target=self._renderer.save, args=(self.hv_curve, target_file), kwargs=renderer_args) self._writers.append(process) process.start() self._plot_pipe.add_subscriber(save_curves) def _repr_mimebundle_(self, *args, **kwargs): return self.hv_curve._repr_mimebundle_(*args, **kwargs) def connect(self, stream): stream.sink(self._plot_pipe.send) @property def hv_curve(self): return self._dynmap.collate() def __add__(self, other): if isinstance(other, TriflowDisplay): return self._dynmap + other._dynmap return self._dynmap + other def __mul__(self, other): if isinstance(other, TriflowDisplay): return self._dynmap * other._dynmap self._dynmap * other @staticmethod def display_fields(simul, keys="all", on_disk=None, on_disk_name=None, **renderer_args): def plot_function(data): nonlocal keys curves = [] keys = data.fields.data_vars if keys == "all" else keys for var in keys if not isinstance(keys, str) else [keys]: displayed_field = data.fields[var] if displayed_field.dims != ('x', ): continue curves.append(Curve((displayed_field.squeeze()))) return Layout(curves).cols(1) if on_disk and not on_disk_name: on_disk_name = "%s_%s" % (simul.id, "-".join(keys)) display = TriflowDisplay(simul, plot_function, on_disk=on_disk, on_disk_name=on_disk_name, **renderer_args) display.connect(simul.stream) return display @staticmethod def display_probe(simul, function, xlabel=None, ylabel=None, buffer=None, on_disk=None, on_disk_name=None, **renderer_args): history = deque([], buffer) if not xlabel: xlabel = coolname.generate_slug(2) if not ylabel: ylabel = function.__name__ if ylabel == '': warnings.warn("Anonymous function used, appending random prefix " "to avoid label confusion") ylabel += str(uuid4())[:8] def plot_function(data): history.append(function(simul)) return Curve(history, kdims=xlabel, vdims=ylabel) if on_disk and not on_disk_name: on_disk_name = "simul.id_%s" % ylabel display = TriflowDisplay(simul, plot_function, on_disk=on_disk, on_disk_name=on_disk_name, **renderer_args) display.connect(simul.stream) return display PK!Etriflow-0.5.1.dist-info/LICENSECopyright 2018 Nicolas Cellier Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!H=BTTtriflow-0.5.1.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]n0H*J>mlcAPK!HBV; triflow-0.5.1.dist-info/METADATAY}s۶sVihIVvzK\m˲"! I(Ym{(Yrw(L:96X$,XZraMQz3Og t*fNLfVeJ6ӿܔbfj1:Ξv 1 2+HM¹j||<t3MRS&eNUi*ڬM1Q=7-7qXƅTrҩZNT1:5`TT!u>).PC%rF^mpvr/y"+cqqy.TUmV*=޻6Ze9oDaSn|7nqY};&׿ I u2|larjm vG'ʦ6 N+I8%JY7'0!Vŋud>cw?9u-7ACV)(2kUզWѽ̺/U^]Q\Z/4IL=?lWĮ)2(M9Ax3_xvktK']c\-Wᩙ^ OkYIt[zZ}"1)rCMuZynmkU2>Cs5ypE_:\C/OooR@(6ڊ\~x\.U>w~4څJT +qXPe TRNBYtQT:_ZRLeJ\B{U=fǣ% lEB5̔r%*ϱ53c)d((Ŋ0ڑ$0\ gD0iZA!׀6:]*T:SB3#` XrNk1']%4J\e6A&~ '^`<P!a@B3kj0&G/Ln`6a7p2Fu7cqui>=O J[, #*8-q\n ^zNJfnnH$BKX'bN #%\tIGXc'! 5j3Ҭ)=ι*!kFoUf!4jYo[wh7wݿ 1y1'%#@xOxfHJِ Vf=y󦻒ZCxjSt5AS8(ji%Zh) k1)?^E%@xGSXD G(Gћ(N<X졌lmU1TCeվ6ҫ*"bqm9?#};#n!-pU&U=3X ԇ\MFk;#biR68<~e`B'*OܸzxSOf_bdՍ. k4LroSSfs͘C^߀h`9PYS{(@̒_'?4>(} nGD Z;Ҏ3 W~gvԪ|Jw`/Gi!M;|9E¬{;a鏗Qo?>˿`7b'іXⲜIn^r wd;^]E?ox1%w}ً'ı8/|83ؚ0rE4KV/>t}9!oH,i;lnyl0NK0څC!% 1H1%E.t?;Ĕ m^E+F)O1ǩy[q}}LV<(n6Z.XnJTÈ}#$9ҏV@WvqBORtI)۰PxK3\PoqH=%)Z/r~vs9錺_܆wԍqDR=s?,=B{73ޮ<[B(J{hYfj`0c9Qi7;b_h#.P_sڀSEAXUJڛ x dJvoGVE) BorϘ7CUlw]>dQq[W>@Y =)A5`>}yϿ0 }ó檓Gჯ;?:fv PK!Htriflow-0.5.1.dist-info/RECORDuGPmx D(!wܶd7թ"/{u1["5gwFn;)K /(ICeQi4}?K/b U+Y+*$̫4U[Y?<[<֙oAkFdS l[J#@/2/2}t⸙F~̴ƋF͵ $ WjҬ|xQsRНrE?9ci,y(^TEr&nKRYL&qmy (|U^CRug#biX̊=ON(n7XTSES?f#qyy[3QK/)UJ ؖӥ]+NݼMZtC_(!YG *GLz>ZOC$Vĉ9ԛ1^HbACbC[FsĔ*T_=< nnkY,ds5@$}(FqD:o'l#W83vf_)kWokg}5),@F_QOLFN !vn=iᗦ&8,:'kckN ]YFGok}Ɣ!{~8%l n'PK!F$$triflow/__init__.pyPK!Utriflow/core/__init__.pyPK!,,triflow/core/compilers.pyPK!˽80triflow/core/fields.pyPK!WVpUpU)Ktriflow/core/model.pyPK![4 4 ̠triflow/core/routines.pyPK!F5n[UU6triflow/core/schemes.pyPK!rjKY=Y=ytriflow/core/simulation.pyPK!O $$ @triflow/plugins/__init__.pyPK!Br1g@triflow/plugins/container.pyPK!HH_triflow/plugins/displays.pyPK!E4rtriflow-0.5.1.dist-info/LICENSEPK!H=BTTvtriflow-0.5.1.dist-info/WHEELPK!HBV; wtriflow-0.5.1.dist-info/METADATAPK!Htriflow-0.5.1.dist-info/RECORDPK6