PK!zplaningfsi/__init__.py"""PlaningFSI is a Python package for solving 2d FSI problems. PlaningFSI solves the general FSI problem of 2-D planing surfaces on a free surface, where each surface can be rigid or flexible. The substructures are part of a rigid body, which can be free in sinkage and trim. The fluid solver is based on linearized potential-flow assumptions, and the structural solver considers a large-deformation simple beam element. """ import logging logger = logging.getLogger("planingfsi") logger.setLevel(logging.DEBUG) PK!planingfsi/__version__.py__version__ = "0.1.0-dev" PK!LYYplaningfsi/cli.py"""Command line program for PlaningFSI. This script is called from the command line with options to plot or load results from file. It reads the fluid and solid body properties from dictionary files, assembles the problem, and runs it. """ import click import krampy from . import config from . import logger from .fe.femesh import Mesh from .fsi.simulation import Simulation @click.group(name="planingfsi", help="Run the PlaningFSI program") @click.option("post_mode", "--post", is_flag=True) @click.option("--plot_save", is_flag=True) @click.option("new_case", "--new", is_flag=True) def run_planingfsi(post_mode, plot_save, new_case): if post_mode: logger.info("Running in post-processing mode") config.plotting.save = True config.plotting.plot_any = True config.io.results_from_file = True if plot_save: config.plotting.save = True config.plotting.plot_any = True if new_case: krampy.rm_rf(krampy.find_files(config.path.case_dir, "[0-9]*")) simulation = Simulation() simulation.load_input_files() simulation.run() @run_planingfsi.command(name="mesh") @click.argument("mesh_dict", required=False) @click.option("plot_show", "--show", is_flag=True) @click.option("plot_save", "--save", is_flag=True) @click.option("--verbose", is_flag=True) def generate_mesh(mesh_dict, plot_show, plot_save, verbose): if not mesh_dict: config.path.mesh_dict_dir = mesh_dict mesh = Mesh() exec(open(config.path.mesh_dict_dir).read()) mesh.display(disp=verbose) mesh.plot(show=plot_show, save=plot_save) mesh.write() PK!88planingfsi/config.py"""This module is used to store the global configuration. Values are stored after reading the configDict file, and values can be accessed by other packages and modules by importing the config module. Usage: from planingfsi import config The global attributes can then be simply accessed via config.attribute_name """ import math import os from pathlib import Path import matplotlib from krampy.iotools import load_dict_from_file from planingfsi import logger DICT_NAME = "configDict" logger.info("Loading values from {0}".format(DICT_NAME)) class ConfigItem(object): """A descriptor to represent a configuration item. Attributes are loaded from a dictionary with fancy default handling. """ def __init__(self, alt_key=None, alt_keys=None, default=None, type_=None): self.name = None self.alt_keys = [] if alt_key: self.alt_keys.append(alt_key) if alt_keys: self.alt_keys.extend(alt_keys) self.default = default self.type_ = type_ def __get__(self, instance, owner): """Retrieve the value from the instance dictionary. If it doesn't exist, return the default.""" if self.name in instance.__dict__: return instance.__dict__[self.name] return self.default def __set__(self, instance, value): """When the value is set, try to convert it and then store it in the instance dictionary.""" if value is not None and self.type_ is not None: value = self.type_(value) instance.__dict__[self.name] = value def __set_name__(self, _, name): self.name = name @property def keys(self): """A list of keys to look for when reading in the value.""" return [self.name] + self.alt_keys def get_from_dict(self, dict_): """Try to read all keys from the dictionary until a non-None value is found. Returns the default value if no appropriate value is found in the dictionary. """ for key in self.keys: value = dict_.get(key) if value is not None: return value raise KeyError( 'None of the following keys "{}" found in dictionary.'.format(self.keys) ) class SubConfig(object): """An empty class used simply for dividing the configuration into different sections. Also useful in helping define the namespace scopes. """ def __init__(self): if Path(DICT_NAME).exists(): self.load_from_dict(DICT_NAME) def load_from_dict(self, dict_name: str): """Load the configuration from a dictionary file. Parameters ---------- dict_name The path to the dictionary file. """ dict_ = load_dict_from_file(dict_name) config_items = {key: item for key, item in self.__class__.__dict__.items()} for key, config_item in self.__class__.__dict__.items(): if isinstance(config_item, ConfigItem): try: value = config_item.get_from_dict(dict_) except KeyError: pass else: setattr(self, config_item.name, value) class FlowConfig(SubConfig): density = ConfigItem(alt_key="rho", default=998.2) gravity = ConfigItem(alt_key="g", default=9.81) kinematic_viscosity = ConfigItem(alt_key="nu", default=1e-6) waterline_height = ConfigItem(alt_key="hWL", default=0.0) num_dim = ConfigItem(alt_key="dim", default=2) include_friction = ConfigItem(alt_key="shearCalc", default=False) _froude_num = ConfigItem(alt_key="Fr", default=None, type_=float) _flow_speed = ConfigItem(alt_key="U", default=None, type_=float) @property def reference_length(self): return body.reference_length @property def flow_speed(self): """The flow speed is the native variable to store free-stream velocity. However, if Froude number is set from input file, that should override the flow speed input. """ if self._froude_num is not None: if self._flow_speed is not None: raise ValueError( "Only one flow speed variable (either Froude number or flow " "speed) must be set in {0}".format(DICT_NAME) ) self.froude_num = self._froude_num elif self._flow_speed is None: raise ValueError("Must specify either U or Fr in {0}".format(DICT_NAME)) return self._flow_speed @flow_speed.setter def flow_speed(self, value): """Set the raw flow speed variable and ensure raw Froude number is not also set.""" self._flow_speed = value self._froude_num = None @property def froude_num(self): return self.flow_speed / math.sqrt(self.gravity * self.reference_length) @froude_num.setter def froude_num(self, value): self.flow_speed = value * math.sqrt(self.gravity * self.reference_length) @property def stagnation_pressure(self): return 0.5 * self.density * self.flow_speed ** 2 @property def k0(self): return self.gravity / self.flow_speed ** 2 @property def lam(self): return 2 * math.pi / self.k0 class BodyConfig(SubConfig): xCofG = ConfigItem(default=0.0) yCofG = ConfigItem(default=0.0) _xCofR = ConfigItem(type_=float) _yCofR = ConfigItem(type_=float) mass = ConfigItem(alt_key="m", default=1.0) _weight = ConfigItem(alt_key="W") reference_length = ConfigItem(alt_keys=["Lref", "Lc"], default=1.0) _cushion_pressure = ConfigItem(alt_key="Pc", default=0.0) _seal_pressure = ConfigItem(alt_key="Ps", default=0.0) # TODO: Do these belong here? PcBar = ConfigItem() PsBar = ConfigItem() # Rigid body motion parameters time_step = ConfigItem("timeStep", default=1e-3) relax_rigid_body = ConfigItem("rigidBodyRelax", default=1.0) motion_method = ConfigItem("motionMethod", default="Physical") motion_jacobian_first_step = ConfigItem("motionJacobianFirstStep", default=1e-6) bow_seal_tip_load = ConfigItem("bowSealTipLoad", default=0.0) tip_constraint_ht = ConfigItem("tipConstraintHt", type_=float) seal_load_pct = ConfigItem("sealLoadPct", default=1.0) cushion_force_method = ConfigItem("cushionForceMethod", default="Fixed") initial_draft = ConfigItem("initialDraft", default=0.0) initial_trim = ConfigItem("initialTrim", default=0.0) max_draft_step = ConfigItem("maxDraftStep", default=1e-3) max_trim_step = ConfigItem("maxTrimStep", default=1e-3) max_draft_acc = ConfigItem("maxDraftAcc", default=1000.0) max_trim_acc = ConfigItem("maxTrimAcc", default=1000.0) free_in_draft = ConfigItem("freeInDraft", default=False) free_in_trim = ConfigItem("freeInTrim", default=False) draft_damping = ConfigItem("draftDamping", default=1000.0) trim_damping = ConfigItem("trimDamping", default=500.0) _relax_draft = ConfigItem("draftRelax", type_=float) _relax_trim = ConfigItem("trimRelax", type_=float) @property def relax_draft(self): if self._relax_draft is not None: return self._relax_draft return body.relax_rigid_body @property def relax_trim(self): if self._relax_trim is not None: return self._relax_trim return body.relax_rigid_body @property def Pc(self): return self._cushion_pressure @property def PcBar(self): return self._cushion_pressure * self.reference_length / self.weight @PcBar.setter def PcBar(self, value): self._cushion_pressure = value * self.weight / self.reference_length @property def Ps(self): return self._seal_pressure @property def PsBar(self): if self._cushion_pressure == 0.0: return 0.0 return self._seal_pressure / self._cushion_pressure @PsBar.setter def PsBar(self, value): self._seal_pressure = value * self._cushion_pressure @property def xCofR(self): if self._xCofR is not None: return self._xCofR return self.xCofG @property def yCofR(self): if self._yCofR is not None: return self._yCofR return self.yCofG @property def weight(self): if self._weight is not None: return self._weight return self.mass * flow.gravity @weight.setter def weight(self, value): self.mass = value / flow.gravity class PlotConfig(SubConfig): pType = ConfigItem(alt_key="pScaleType", default="stagnation") _pScale = ConfigItem(alt_key="pScale", default=1.0) _pScalePct = ConfigItem(alt_key="pScalePct", default=1.0) _pScaleHead = ConfigItem(alt_key="pScaleHead", default=1.0) growth_rate = ConfigItem(alt_key="growthRate", default=1.1) CofR_grid_len = ConfigItem(alt_key="CofRGridLen", default=0.5) fig_format = ConfigItem(alt_key="figFormat", default="eps") pressure_limiter = ConfigItem("pressureLimiter", default=False) # Load plot extents ext_e = ConfigItem("extE", default=0.1) ext_w = ConfigItem("extW", default=0.1) ext_n = ConfigItem("extN", default=0.1) ext_s = ConfigItem("extS", default=0.1) xmin = ConfigItem("plotXMin", type_=float) xmax = ConfigItem("plotXMax", type_=float) ymin = ConfigItem("plotYMin", type_=float) ymax = ConfigItem("plotYMax", type_=float) lambda_min = ConfigItem("lamMin", default=-1.0) lambda_max = ConfigItem("lamMax", default=1.0) _x_fs_min = ConfigItem("xFSMin", type_=float) _x_fs_max = ConfigItem("xFSMax", type_=float) # Whether to save, show, or watch plots save = ConfigItem("plotSave", default=False) show_pressure = ConfigItem("plotPressure", default=False) show = ConfigItem("plotShow", default=False) _watch = ConfigItem("plotWatch", default=False) @property def watch(self): return self._watch or self.show @property def plot_any(self): return self.show or self.save or self.watch or self.show_pressure @property def x_fs_min(self): if self._x_fs_min is not None: return self._x_fs_min if self.xmin is not None: return self.xmin return self.lambda_min * flow.lam @property def x_fs_max(self): if self._x_fs_max is not None: return self._x_fs_max if self.xmax is not None: return self.xmax return self.lambda_max * flow.lam @property def pScale(self): if plotting.pType == "stagnation": return flow.stagnation_pressure elif plotting.pType == "cushion": return body.Pc if body.Pc > 0.0 else 1.0 elif plotting.pType == "hydrostatic": return flow.density * flow.gravity * self._pScaleHead return self._pScale * self._pScalePct class PathConfig(SubConfig): # Directories and file formats case_dir = ConfigItem("caseDir", default=".") fig_dir_name = ConfigItem("figDirName", default="figures") body_dict_dir = ConfigItem("bodyDictDir", default="bodyDict") input_dict_dir = ConfigItem("inputDictDir", default="inputDict") cushion_dict_dir = ConfigItem( alt_keys=["pressureCushionDictDir", "cushionDictDir"], default="cushionDict" ) mesh_dir = ConfigItem("meshDir", default="mesh") mesh_dict_dir = ConfigItem("meshDictDir", default="meshDict") class IOConfig(SubConfig): data_format = ConfigItem("dataFormat", default="txt") write_interval = ConfigItem("writeInterval", default=1) write_time_histories = ConfigItem("writeTimeHistories", default=False) results_from_file = ConfigItem("resultsFromFile", default=False) class SolverConfig(SubConfig): # Parameters for wetted length solver wetted_length_solver = ConfigItem("wettedLengthSolver", default="Secant") wetted_length_tol = ConfigItem("wettedLengthTol", default=1e-6) wetted_length_relax = ConfigItem("wettedLengthRelax", default=1.0) wetted_length_max_it = ConfigItem("wettedLengthMaxIt", default=20) wetted_length_max_it_0 = ConfigItem("wettedLengthMaxIt0", default=100) wetted_length_max_step_pct = ConfigItem("wettedLengthMaxStepPct", default=0.2) _wetted_length_max_step_pct_inc = ConfigItem( "wettedLengthMaxStepPctInc", type_=float ) _wetted_length_max_step_pct_dec = ConfigItem( "wettedLengthMaxStepPctDec", type_=float ) wetted_length_max_jacobian_reset_step = ConfigItem( "wettedLengthMaxJacobianResetStep", default=100 ) max_it = ConfigItem("maxIt", default=1) num_ramp_it = ConfigItem("rampIt", default=0) relax_initial = ConfigItem("relaxI", default=0.01) relax_final = ConfigItem("relaxF", default=0.5) max_residual = ConfigItem("tolerance", default=1e-6) pretension = ConfigItem("pretension", default=0.1) relax_FEM = ConfigItem(alt_keys=["FEMRelax", "relaxFEM"], default=1.0) max_FEM_disp = ConfigItem("maxFEMDisp", default=1.0) num_damp = ConfigItem("numDamp", default=0.0) @property def wetted_length_max_step_pct_inc(self): if self._wetted_length_max_step_pct_inc is not None: return self._wetted_length_max_step_pct_inc return self.wetted_length_max_step_pct @property def wetted_length_max_step_pct_dec(self): if self._wetted_length_max_step_pct_dec is not None: return self._wetted_length_max_step_pct_dec return self.wetted_length_max_step_pct # Flow-related variables flow = FlowConfig() body = BodyConfig() plotting = PlotConfig() path = PathConfig() io = IOConfig() solver = SolverConfig() def load_from_file(filename): for c in [flow, body, plotting, path, io, solver]: c.load_from_dict(filename) # Initialized constants ramp = 1.0 has_free_structure = False it_dir = "" it = -1 # Use tk by default. Otherwise try Agg. Otherwise, disable plotting. _fallback_engine = "Agg" if os.environ.get("DISPLAY") is None: matplotlib.use(_fallback_engine) else: try: from matplotlib import pyplot except ImportError: try: matplotlib.use(_fallback_engine) except ImportError: plotting.plot_any = False PK!planingfsi/fe/__init__.pyPK!(VF  planingfsi/fe/felib.pyimport numpy as np from scipy.interpolate import interp1d import planingfsi.config as config # import planingfsi.krampy as kp class Node: obj = [] @classmethod def get_index(cls, ind): return cls.obj[ind] @classmethod def All(cls): return [o for o in cls.obj] @classmethod def count(cls): return len(cls.All()) @classmethod def find_nearest(cls): return [o for o in cls.obj] def __init__(self): self.node_num = len(Node.obj) Node.obj.append(self) self.x = 0.0 self.y = 0.0 self.dof = [self.node_num * config.flow.num_dim + i for i in [0, 1]] self.is_dof_fixed = [True] * config.flow.num_dim self.fixed_load = np.zeros(config.flow.num_dim) self.line_xy = None def set_coordinates(self, x, y): self.x = x self.y = y def move_coordinates(self, dx, dy): self.x += dx self.y += dy def get_coordinates(self): return self.x, self.y def plot(self, sty=None): if self.line_xy is not None: self.line_xy.set_data(self.x, self.y) class Element: obj = [] def __init__(self): self.element_num = len(Element.obj) Element.obj.append(self) # self.fluidS = [] # self.fluidP = [] self.node = [None] * 2 self.dof = [0] * config.flow.num_dim self.length = 0.0 self.initial_length = None self.qp = np.zeros((2)) self.qs = np.zeros((2)) self.lineEl = None self.lineEl0 = None self.plot_on = True def set_properties(self, **kwargs): length = kwargs.get("length", None) axialForce = kwargs.get("axialForce", None) EA = kwargs.get("EA", None) if not length == None: self.length = length self.initial_length = length if not axialForce == None: self.axial_force = axialForce self.initial_axial_force = axialForce if not EA == None: self.EA = EA def set_nodes(self, nodeList): self.node = nodeList self.dof = [dof for nd in self.node for dof in nd.dof] self.update_geometry() self.init_pos = [np.array(nd.get_coordinates()) for nd in self.node] def set_parent(self, parent): self.parent = parent def update_geometry(self): x = [nd.x for nd in self.node] y = [nd.y for nd in self.node] self.length = ((x[1] - x[0]) ** 2 + (y[1] - y[0]) ** 2) ** 0.5 if self.initial_length is None: self.initial_length = self.length self.gamma = kp.atand2(y[1] - y[0], x[1] - x[0]) def set_pressure_and_shear(self, qp, qs): self.qp = qp self.qs = qs def plot(self): if self.lineEl is not None and self.plot_on: self.lineEl.set_data([nd.x for nd in self.node], [nd.y for nd in self.node]) if self.lineEl0 is not None and self.plot_on: basePt = [self.parent.parent.xCofR0, self.parent.parent.yCofR0] pos = [ kp.rotatePt(pos, basePt, self.parent.parent.trim) - np.array([0, self.parent.parent.draft]) for pos in self.init_pos ] x, y = list(zip(*[[posi[i] for i in range(2)] for posi in pos])) self.lineEl0.set_data(x, y) class TrussElement(Element): def __init__(self): Element.__init__(self) self.initial_axial_force = 0.0 self.EA = 0.0 def get_stiffness_and_force(self): # Stiffness matrices in local coordinates KL = ( np.array([[1, 0, -1, 0], [0, 0, 0, 0], [-1, 0, 1, 0], [0, 0, 0, 0]]) * self.EA / self.length ) KNL = ( np.array([[1, 0, -1, 0], [0, 1, 0, -1], [-1, 0, 1, 0], [0, -1, 0, 1]]) * self.axial_force / self.length ) # Force vectors in local coordinates FL = np.array([[self.qs[0]], [self.qp[0]], [self.qs[1]], [self.qp[1]]]) FNL = np.array([[1], [0], [-1], [0]]) * self.axial_force # Add linear and nonlinear components K = KL + KNL F = FL + FNL # Rotate stiffness and force matrices into global coordinates C = kp.cosd(self.gamma) S = kp.sind(self.gamma) TM = np.array([[C, S, 0, 0], [-S, C, 0, 0], [0, 0, C, S], [0, 0, -S, C]]) K = np.dot(np.dot(TM.T, K), TM) F = np.dot(TM.T, F) return K, F def update_geometry(self): Element.update_geometry(self) self.axial_force = (1 - config.ramp) * self.initial_axial_force + self.EA * ( self.length - self.initial_length ) / self.initial_length class RigidElement(Element): pass # def __init__(self): # super().__init__(self) PK!$5$5planingfsi/fe/femesh.pyimport os import numpy as np import matplotlib.pyplot as plt import planingfsi.config as config import krampy as kp class Mesh: @classmethod def get_pt_by_id(cls, ID): return Point.find_by_id(ID) def __init__(self): self.submesh = [] self.add_point(0, "dir", [0, 0]) def add_submesh(self, name=""): submesh = Submesh(name) self.submesh.append(submesh) return submesh def add_point(self, ID, method, position, **kwargs): P = Point() P.set_id(ID) if method == "dir": P.set_position(np.array(position)) elif method == "rel": base_pt_id, ang, R = position P.set_position( Point.find_by_id(base_pt_id).get_position() + R * kp.ang2vecd(ang) ) elif method == "con": base_pt_id, dim, val = position ang = kwargs.get("angle", 0 if dim == "x" else 90) base_pt = Point.find_by_id(base_pt_id).get_position() if dim == "x": P.set_position( np.array([val, base_pt[1] + (val - base_pt[0]) * kp.tand(ang)]) ) elif dim == "y": P.set_position( np.array([base_pt[0] + (val - base_pt[1]) / kp.tand(ang), val]) ) else: print("Incorrect dimension specification") elif method == "pct": base_pt_id, end_pt_id, pct = position base_pt = Point.find_by_id(base_pt_id).get_position() end_pt = Point.find_by_id(end_pt_id).get_position() P.set_position((1 - pct) * base_pt + pct * end_pt) else: raise NameError( "Incorrect position specification method for point, ID: {0}".format(ID) ) return P def add_point_along_curve(self, ID, curve, pct): P = Point() P.set_id(ID) P.set_position(map(curve.get_shape_func(), [pct])[0]) return P def add_load(self, pt_id, F): Point.find_by_id(pt_id).add_fixed_load(F) def fix_all_points(self): for pt in Point.All(): pt.set_fixed_dof("x", "y") def fix_points(self, pt_id_list): for pt in [Point.find_by_id(pt_id) for pt_id in pt_id_list]: pt.set_fixed_dof("x", "y") def rotate_points(self, base_pt_id, angle, pt_id_list): for pt in [Point.find_by_id(pt_id) for pt_id in pt_id_list]: pt.rotate(base_pt_id, angle) def rotate_all_points(self, base_pt_id, angle): for pt in Point.All(): pt.rotate(base_pt_id, angle) def move_points(self, dx, dy, pt_id_list): for pt in [Point.find_by_id(pt_id) for pt_id in pt_id_list]: pt.move(dx, dy) def move_all_points(self, dx, dy): for pt in Point.All(): pt.move(dx, dy) def scale_all_points(self, sf, base_pt_id=0): base_pt = Point.find_by_id(base_pt_id).get_position() for pt in Point.All(): pt.set_position((pt.get_position() - base_pt) * sf + base_pt) def get_diff(self, pt0, pt1): return ( Point.find_by_id(pt1).get_position() - Point.find_by_id(pt0).get_position() ) def get_length(self, pt0, pt1): return np.linalg.norm(self.get_diff(pt0, pt1)) def display(self, **kwargs): if kwargs.get("disp", False): Shape.print_all() print(("Line count: {0}".format(Line.count()))) print(("Point count: {0}".format(Point.count()))) def plot(self, **kwargs): show = kwargs.get("show", False) save = kwargs.get("save", False) plot = show or save if plot: plt.figure(figsize=(16, 14)) plt.axis("equal") plt.xlabel(r"$x$", size=18) plt.ylabel(r"$y$", size=18) Shape.plot_all() lims = plt.gca().get_xlim() ext = (lims[1] - lims[0]) * 0.1 plt.xlim([lims[0] - ext, lims[1] + ext]) # Process optional arguments and save or show figure if save: saved_file_name = kwargs.get("fileName", "meshLayout") plt.savefig(saved_file_name + ".eps", format="eps") if show: plt.show() def write(self): kp.createIfNotExist(config.path.mesh_dir) x, y = list(zip(*[pt.get_position() for pt in Point.All()])) kp.writeaslist( os.path.join(config.path.mesh_dir, "nodes.txt"), ["x", x], ["y", y] ) x, y = list(zip(*[pt.get_fixed_dof() for pt in Point.All()])) kp.writeaslist( os.path.join(config.path.mesh_dir, "fixedDOF.txt"), ["x", x], ["y", y], headerFormat=">1", dataFormat=">1", ) x, y = list(zip(*[pt.get_fixed_load() for pt in Point.All()])) kp.writeaslist( os.path.join(config.path.mesh_dir, "fixedLoad.txt"), ["x", x], ["y", y], headerFormat=">6", dataFormat="6.4e", ) for sm in self.submesh: sm.write() class Submesh(Mesh): def __init__(self, name): Mesh.__init__(self) self.name = name self.line = [] def add_curve(self, pt_id1, pt_id2, **kwargs): arc_length = kwargs.get("arcLen", None) radius = kwargs.get("radius", None) C = Curve(kwargs.get("Nel", 1)) C.set_id(kwargs.get("ID", -1)) C.set_end_pts_by_id(pt_id1, pt_id2) if arc_length is not None: C.set_arc_length(arc_length) elif radius is not None: C.set_radius(radius) else: C.set_arc_length(0.0) C.distribute_points() for pt in C.pt: pt.set_used() self.line += [l for l in C.get_lines()] return C def write(self): if len(self.line) > 0: ptL, ptR = list( zip( *[ [pt.get_index() for pt in l._get_element_coords()] for l in self.line ] ) ) kp.writeaslist( os.path.join( config.path.mesh_dir, "elements_{0}.txt".format(self.name) ), ["ptL", ptL], ["ptR", ptR], headerFormat="<4", dataFormat=">4", ) class Shape: obj = [] @classmethod def All(cls): return [o for o in cls.obj] @classmethod def count(cls): return len(cls.All()) @classmethod def plot_all(cls): for o in cls.obj: o.plot() @classmethod def print_all(cls): for o in cls.obj: o.display() @classmethod def find_by_id(cls, ID): if ID == -1: return None else: obj = [a for a in cls.All() if a.ID == ID] if len(obj) == 0: return None else: return obj[0] def __init__(self): self.set_index(Shape.count()) self.ID = -1 Shape.obj.append(self) def set_id(self, ID): existing = self.__class__.find_by_id(ID) if existing is not None: existing.set_id(-1) self.ID = ID def set_index(self, ind): self.ind = ind def get_id(self): return self.ID def get_index(self): return self.ind def plot(self): return 0 class Point(Shape): obj = [] def __init__(self): Shape.__init__(self) self.set_index(Point.count()) Point.obj.append(self) self.pos = np.zeros(2) self.is_dof_fixed = [True] * 2 self.fixed_load = np.zeros(2) self.is_used = False def set_used(self, used=True): self.is_used = True self.set_free_dof("x", "y") def get_fixed_load(self): return self.fixed_load def add_fixed_load(self, F): for i in range(2): self.fixed_load[i] += F[i] def set_free_dof(self, *args): for arg in args: if arg == "x": self.is_dof_fixed[0] = False if arg == "y": self.is_dof_fixed[1] = False def set_fixed_dof(self, *args): for arg in args: if arg == "x": self.is_dof_fixed[0] = True if arg == "y": self.is_dof_fixed[1] = True def get_fixed_dof(self): return self.is_dof_fixed def move(self, dx, dy): self.set_position(self.get_position() + np.array([dx, dy])) def set_position(self, pos): self.pos = pos def get_position(self): return self.pos def get_x_pos(self): return self.pos[0] def get_y_pos(self): return self.pos[1] def rotate(self, base_pt_id, angle): base_pt = Point.find_by_id(base_pt_id).get_position() self.set_position(kp.rotateVec(self.get_position() - base_pt, angle) + base_pt) def display(self): print( ( "{0} {1}: ID = {2}, Pos = {3}".format( self.__class__.__name__, self.get_index(), self.get_id(), self.get_position(), ) ) ) def plot(self): if self.ID == -1: plt.plot(self.pos[0], self.pos[1], "r*") else: plt.plot(self.pos[0], self.pos[1], "ro") plt.text(self.pos[0], self.pos[1], " {0}".format(self.ID)) class Curve(Shape): obj = [] def __init__(self, Nel=1): Shape.__init__(self) self.set_index(Curve.count()) Curve.obj.append(self) self.pt = [] self.line = [] self._end_pts = [] self.Nel = Nel self.plot_sty = "m--" def set_end_pts_by_id(self, pt_id1, pt_id2): self.set_end_pts([Point.find_by_id(pid) for pid in [pt_id1, pt_id2]]) def get_shape_func(self): xy = [pt.get_position() for pt in self._end_pts] if self.radius == 0.0: return lambda s: xy[0] * (1 - s) + xy[1] * s else: x, y = list(zip(*xy)) gam = np.arctan2(y[1] - y[0], x[1] - x[0]) alf = self.arc_length / (2 * self.radius) return lambda s: self._end_pts[0].get_position() + 2 * self.radius * np.sin( s * alf ) * kp.ang2vec(gam + (s - 1) * alf) def set_radius(self, R): self.radius = R self.calculate_chord() self.calculate_arc_length() def set_arc_length(self, arc_length): self.calculate_chord() if self.chord >= arc_length: self.arc_length = 0.0 self.set_radius(0.0) else: self.arc_length = arc_length self.calculate_radius() def calculate_radius(self): self.radius = 1 / self.calculate_curvature() def calculate_chord(self): x, y = list(zip(*[pt.get_position() for pt in self._end_pts])) self.chord = ((x[1] - x[0]) ** 2 + (y[1] - y[0]) ** 2) ** 0.5 def calculate_arc_length(self): if self.radius == 0: self.arc_length = self.chord else: f = lambda s: self.chord / (2 * self.radius) - np.sin(s / (2 * self.radius)) # Keep increasing guess until fsolve finds the first non-zero root self.arc_length = kp.fzero(f, self.chord + 1e-6) def calculate_curvature(self): f = lambda x: x * self.chord / 2 - np.sin(x * self.arc_length / 2) # Keep increasing guess until fsolve finds the first non-zero root kap = 0.0 kap0 = 0.0 while kap <= 1e-6: kap = kp.fzero(f, kap0) kap0 += 0.02 return kap def distribute_points(self): self.pt.append(self._end_pts[0]) if not self.Nel == 1: # Distribute N points along a parametric curve defined by f(s), s in [0,1] s = np.linspace(0.0, 1.0, self.Nel + 1)[1:-1] for xy in map(self.get_shape_func(), s): P = Point() self.pt.append(P) P.set_position(xy) self.pt.append(self._end_pts[1]) self.generate_lines() def generate_lines(self): for ptSt, ptEnd in zip(self.pt[:-1], self.pt[1:]): L = Line() L.set_end_pts([ptSt, ptEnd]) self.line.append(L) def get_lines(self): return self.line def set_pts(self, pt): self.pt = pt def set_end_pts(self, end_pt): self._end_pts = end_pt def _get_element_coords(self): return self.pt def getPtIDs(self): return [pt.get_id() for pt in self.pt] def display(self): print( ( "{0} {1}: ID = {2}, Pt IDs = {3}".format( self.__class__.__name__, self.get_index(), self.get_id(), self.getPtIDs(), ) ) ) def plot(self): x, y = list(zip(*[pt.get_position() for pt in self.pt])) plt.plot(x, y, self.plot_sty) class Line(Curve): obj = [] def __init__(self): Curve.__init__(self) self.set_index(Line.count()) Line.obj.append(self) self.plot_sty = "b-" def set_end_pts(self, endPt): Curve.set_end_pts(self, endPt) self.set_pts(endPt) PK!planingfsi/fe/structure.pyimport os import numpy as np from scipy.interpolate import interp1d # import planingfsi.io from planingfsi import config # from planingfsi import krampy as kp # from planingfsi import io from . import felib as fe class FEStructure: """Parent object for solving the finite-element structure. Consists of several rigid bodies and substructures. """ def __init__(self): self.rigid_body = [] self.substructure = [] self.node = [] self.res = 1.0 def add_rigid_body(self, dict_=None): if dict_ is None: dict_ = planingfsi.io.Dictionary() rigid_body = RigidBody(dict_) self.rigid_body.append(rigid_body) return rigid_body def add_substructure(self, dict_=None): if dict_ is None: dict_ = planingfsi.io.Dictionary() ss_type = dict_.read("substructureType", "rigid") if ss_type.lower() == "flexible" or ss_type.lower() == "truss": ss = FlexibleSubstructure(dict_) FlexibleSubstructure.obj.append(ss) elif ss_type.lower() == "torsionalspring": ss = TorsionalSpringSubstructure(dict_) else: ss = RigidSubstructure(dict_) self.substructure.append(ss) # Find parent body and add substructure to it body = [ b for b in self.rigid_body if b.name == dict_.read("bodyName", "default") ] if len(body) > 0: body = body[0] else: body = self.rigid_body[0] body.add_substructure(ss) ss.add_parent(body) print( ( "Adding Substructure {0} of type {1} to rigid body {2}".format( ss.name, ss.type_, body.name ) ) ) return ss def initialize_rigid_bodies(self): for bd in self.rigid_body: bd.initialize_position() def update_fluid_forces(self): for bd in self.rigid_body: bd.update_fluid_forces() def calculate_response(self): if config.io.results_from_file: self.load_response() else: for bd in self.rigid_body: bd.update_position() bd.update_substructure_positions() def get_residual(self): self.res = 0.0 for bd in self.rigid_body: if bd.free_in_draft or bd.free_in_trim: self.res = np.max([np.abs(bd.res_l), self.res]) self.res = np.max([np.abs(bd.res_m), self.res]) self.res = np.max([FlexibleSubstructure.res, self.res]) def load_response(self): self.update_fluid_forces() for bd in self.rigid_body: bd.load_motion() for ss in bd.substructure: ss.load_coordinates() ss.update_geometry() def write_results(self): for bd in self.rigid_body: bd.write_motion() for ss in bd.substructure: ss.write_coordinates() def plot(self): for body in self.rigid_body: for struct in body.substructure: struct.plot() def load_mesh(self): # Create all nodes x, y = np.loadtxt(os.path.join(config.path.mesh_dir, "nodes.txt"), unpack=True) xf, yf = np.loadtxt( os.path.join(config.path.mesh_dir, "fixedDOF.txt"), unpack=True ) fx, fy = np.loadtxt( os.path.join(config.path.mesh_dir, "fixedLoad.txt"), unpack=True ) for xx, yy, xxf, yyf, ffx, ffy in zip(x, y, xf, yf, fx, fy): nd = fe.Node() nd.set_coordinates(xx, yy) nd.is_dof_fixed = [bool(xxf), bool(yyf)] nd.fixed_load = np.array([ffx, ffy]) self.node.append(nd) for struct in self.substructure: struct.load_mesh() if ( struct.type_ == "rigid" or struct.type_ == "rotating" or struct.type_ == "torsionalSpring" ): struct.set_fixed_dof() for ss in self.substructure: ss.set_attachments() class RigidBody(object): def __init__(self, dict_): self.dict_ = dict_ self.num_dim = 2 self.draft = 0.0 self.trim = 0.0 self.name = self.dict_.read("bodyName", "default") self.weight = self.dict_.read( "W", self.dict_.read("loadPct", 1.0) * config.body.weight ) self.weight *= config.body.seal_load_pct self.m = self.dict_.read("m", self.weight / config.flow.gravity) self.Iz = self.dict_.read("Iz", self.m * config.body.reference_length ** 2 / 12) self.has_planing_surface = self.dict_.read("hasPlaningSurface", False) var = [ "max_draft_step", "max_trim_step", "free_in_draft", "free_in_trim", "draft_damping", "trim_damping", "max_draft_acc", "max_trim_acc", "xCofG", "yCofG", "xCofR", "yCofR", "initial_draft", "initial_trim", "relax_draft", "relax_trim", "time_step", "num_damp", ] for v in var: if v in self.dict_: setattr(self, v, self.dict_.read(v)) elif hasattr(config.body, v): setattr(self, v, getattr(config.body, v)) elif hasattr(config.solver, v): setattr(self, v, getattr(config.solver, v)) elif hasattr(config, v): setattr(self, v, getattr(config, v)) else: raise ValueError("Cannot find symbol: {0}".format(v)) # setattr(self, v, self.dict_.read(v, getattr(config.body, getattr(config, v)))) # self.xCofR = self.dict_.read('xCofR', self.xCofG) # self.yCofR = self.dict_.read('yCofR', self.yCofG) self.xCofR0 = self.xCofR self.yCofR0 = self.yCofR self.max_disp = np.array([self.max_draft_step, self.max_trim_step]) self.free_dof = np.array([self.free_in_draft, self.free_in_trim]) self.c_damp = np.array([self.draft_damping, self.trim_damping]) self.max_acc = np.array([self.max_draft_acc, self.max_trim_acc]) self.relax = np.array([self.relax_draft, self.relax_trim]) if self.free_in_draft or self.free_in_trim: config.has_free_structure = True self.v = np.zeros((self.num_dim)) self.a = np.zeros((self.num_dim)) self.v_old = np.zeros((self.num_dim)) self.a_old = np.zeros((self.num_dim)) self.beta = self.dict_.read("beta", 0.25) self.gamma = self.dict_.read("gamma", 0.5) self.D = 0.0 self.L = 0.0 self.M = 0.0 self.Da = 0.0 self.La = 0.0 self.Ma = 0.0 self.solver = None self.disp_old = 0.0 self.res_old = None self.two_ago_disp = 0.0 self.predictor = True self.f_old = 0.0 self.two_ago_f = 0.0 self.res_l = 1.0 self.res_m = 1.0 self.J = None self.J_tmp = None # Assign displacement function depending on specified method self.get_disp = lambda: (0.0, 0.0) if any(self.free_dof): if config.body.motion_method == "Secant": self.get_disp = self.get_disp_secant elif config.body.motion_method == "Broyden": self.get_disp = self.get_disp_broyden elif config.body.motion_method == "BroydenNew": self.get_disp = self.get_disp_broyden_new elif config.body.motion_method == "Physical": self.get_disp = self.get_disp_physical elif config.body.motion_method == "Newmark-Beta": self.get_disp = self.get_disp_newmark_beta elif config.body.motion_method == "PhysicalNoMass": self.get_disp = self.get_disp_physical_no_mass elif config.body.motion_method == "Sep": self.get_disp = self.getDispSep self.trim_solver = None self.draft_solver = None self.substructure = [] self.node = None print(("Adding Rigid Body: {0}".format(self.name))) def add_substructure(self, ss): self.substructure.append(ss) def store_nodes(self): self.node = [] for ss in self.substructure: for nd in ss.node: if not any([n.node_num == nd.node_num for n in self.node]): self.node.append(nd) def initialize_position(self): self.set_position(self.initial_draft, self.initial_trim) def set_position(self, draft, trim): self.update_position(draft - self.draft, trim - self.trim) def update_position(self, dDraft=None, dTrim=None): if dDraft is None: dDraft, dTrim = self.get_disp() if np.isnan(dDraft): dDraft = 0.0 if np.isnan(dTrim): dTrim = 0.0 if self.node is None: self.store_nodes() for nd in self.node: xo, yo = nd.get_coordinates() newPos = kp.rotatePt([xo, yo], [self.xCofR, self.yCofR], dTrim) nd.move_coordinates(newPos[0] - xo, newPos[1] - yo - dDraft) for s in self.substructure: s.update_geometry() self.xCofG, self.yCofG = kp.rotatePt( [self.xCofG, self.yCofG], [self.xCofR, self.yCofR], dTrim ) self.yCofG -= dDraft self.yCofR -= dDraft self.draft += dDraft self.trim += dTrim self.print_motion() def update_substructure_positions(self): FlexibleSubstructure.update_all() for ss in self.substructure: print(("Updating position for substructure: {0}".format(ss.name))) # if ss.type.lower() == 'flexible': # ss.getPtDispFEM() if ss.type_.lower() == "torsionalspring" or ss.type_.lower() == "rigid": ss.updateAngle() def update_fluid_forces(self): self.reset_loads() for ss in self.substructure: ss.update_fluid_forces() self.D += ss.D self.L += ss.L self.M += ss.M self.Da += ss.Da self.La += ss.La self.Ma += ss.Ma self.res_l = self.get_res_lift() self.res_m = self.get_res_moment() def reset_loads(self): self.D *= 0.0 self.L *= 0.0 self.M *= 0.0 if config.body.cushion_force_method.lower() == "assumed": self.L += ( config.body.Pc * config.body.reference_length * kp.cosd(config.trim) ) def get_disp_physical(self): disp = self.limit_disp(self.time_step * self.v) for i in range(self.num_dim): if np.abs(disp[i]) == np.abs(self.max_disp[i]): self.v[i] = disp[i] / self.time_step self.v += self.time_step * self.a self.a = np.array( [self.weight - self.L, self.M - self.weight * (self.xCofG - self.xCofR)] ) # self.a -= self.Cdamp * (self.v + self.v**3) * config.ramp self.a -= self.c_damp * self.v * config.ramp self.a /= np.array([self.m, self.Iz]) self.a = np.min( np.vstack( (np.abs(self.a), np.array([self.max_draft_acc, self.max_trim_acc])) ), axis=0, ) * np.sign(self.a) # accLimPct = np.min(np.vstack((np.abs(self.a), self.maxAcc)), axis=0) * np.sign(self.a) # for i in range(len(self.a)): # if self.a[i] == 0.0 or not self.freeDoF[i]: # accLimPct[i] = 1.0 # else: # accLimPct[i] /= self.a[i] # # self.a *= np.min(accLimPct) disp *= config.ramp return disp def get_disp_newmark_beta(self): self.a = np.array( [self.weight - self.L, self.M - self.weight * (self.xCofG - self.xCofR)] ) # self.a -= self.Cdamp * self.v * config.ramp self.a /= np.array([self.m, self.Iz]) self.a = np.min( np.vstack( (np.abs(self.a), np.array([self.max_draft_acc, self.max_trim_acc])) ), axis=0, ) * np.sign(self.a) dv = (1 - self.gamma) * self.a_old + self.gamma * self.a dv *= self.time_step dv *= 1 - self.numDamp self.v += dv disp = 0.5 * (1 - 2 * self.beta) * self.a_old + self.beta * self.a disp *= self.time_step disp += self.v_old disp *= self.time_step self.a_old = self.a self.v_old = self.v disp *= self.relax disp *= config.ramp disp = self.limit_disp(disp) # disp *= config.ramp return disp def get_disp_physical_no_mass(self): F = np.array( [ self.weight - self.L, self.M - self.weight * (config.body.xCofG - config.body.xCofR), ] ) # F -= self.Cdamp * self.v if self.predictor: disp = F / self.c_damp * self.time_step self.predictor = False else: disp = 0.5 * self.time_step / self.c_damp * (F - self.two_ago_f) self.predictor = True disp *= self.relax * config.ramp disp = self.limit_disp(disp) # self.v = disp / self.timeStep self.two_ago_f = self.f_old self.f_old = disp * self.c_damp / self.time_step return disp def get_disp_secant(self): if self.solver is None: self.resFun = lambda x: np.array( [ self.L - self.weight, self.M - self.weight * (config.body.xCofG - config.body.xCofR), ] ) self.solver = kp.RootFinder( self.resFun, np.array([config.body.initial_draft, config.body.initial_trim]), "secant", dxMax=self.max_disp * self.free_dof, ) if not self.disp_old is None: self.solver.takeStep(self.disp_old) # Solve for limited displacement disp = self.solver.limitStep(self.solver.getStep()) self.two_ago_disp = self.disp_old self.disp_old = disp return disp def reset_jacobian(self): if self.J_tmp is None: self.Jit = 0 self.J_tmp = np.zeros((self.num_dim, self.num_dim)) self.step = 0 self.Jfo = self.resFun(self.x) self.res_old = self.Jfo * 1.0 else: f = self.resFun(self.x) self.J_tmp[:, self.Jit] = (f - self.Jfo) / self.disp_old[self.Jit] self.Jit += 1 disp = np.zeros((self.num_dim)) if self.Jit < self.num_dim: disp[self.Jit] = config.body.motion_jacobian_first_step if self.Jit > 0: disp[self.Jit - 1] = -config.body.motion_jacobian_first_step self.disp_old = disp if self.Jit >= self.num_dim: self.J = self.J_tmp * 1.0 self.J_tmp = None self.disp_old = None return disp def get_disp_broyden_new(self): if self.solver is None: self.resFun = lambda x: np.array( [ f for f, free_dof in zip( [self.get_res_lift(), self.get_res_moment()], self.free_dof ) if free_dof ] ) maxDisp = [ m for m, free_dof in zip(self.max_disp, self.free_dof) if free_dof ] self.x = np.array( [ f for f, free_dof in zip([self.draft, self.trim], self.free_dof) if free_dof ] ) self.solver = kp.RootFinder(self.resFun, self.x, "broyden", dxMax=maxDisp) if not self.disp_old is None: self.solver.takeStep(self.disp_old) # Solve for limited displacement disp = self.solver.limitStep(self.solver.getStep()) self.two_ago_disp = self.disp_old self.disp_old = disp return disp def get_disp_broyden(self): if self.solver is None: self.resFun = lambda x: np.array( [self.L - self.weight, self.M - self.weight * (self.xCofG - self.xCofR)] ) # self.resFun = lambda x: np.array([self.get_res_moment(), self.get_res_lift()]) self.solver = 1.0 self.x = np.array([self.draft, self.trim]) if self.J is None: disp = self.reset_jacobian() else: self.f = self.resFun(self.x) if not self.disp_old is None: self.x += self.disp_old dx = np.reshape(self.disp_old, (self.num_dim, 1)) df = np.reshape(self.f - self.res_old, (self.num_dim, 1)) self.J += ( np.dot(df - np.dot(self.J, dx), dx.T) / np.linalg.norm(dx) ** 2 ) dof = self.free_dof dx = np.zeros_like(self.x) A = -self.J[np.ix_(dof, dof)] b = self.f.reshape(self.num_dim, 1)[np.ix_(dof)] dx[np.ix_(dof)] = np.linalg.solve(A, b) if self.res_old is not None: if any(np.abs(self.f) - np.abs(self.res_old) > 0.0): self.step += 1 if self.step >= 6: print("\nResetting Jacobian for Motion\n") disp = self.reset_jacobian() disp = dx.reshape(self.num_dim) # disp = self.solver.getStep() disp *= self.relax disp = self.limit_disp(disp) self.disp_old = disp self.res_old = self.f * 1.0 self.step += 1 return disp def limit_disp(self, disp): dispLimPct = np.min(np.vstack((np.abs(disp), self.max_disp)), axis=0) * np.sign( disp ) for i in range(len(disp)): if disp[i] == 0.0 or not self.free_dof[i]: dispLimPct[i] = 1.0 else: dispLimPct[i] /= disp[i] return disp * np.min(dispLimPct) * self.free_dof def get_res_lift(self): if np.isnan(self.L): res = 1.0 else: res = (self.L - self.weight) / ( config.flow.stagnation_pressure * config.body.reference_length + 1e-6 ) return np.abs(res * self.free_in_draft) def get_res_moment(self): if np.isnan(self.M): res = 1.0 else: if self.xCofG == self.xCofR and self.M == 0.0: res = 1.0 else: res = (self.M - self.weight * (self.xCofG - self.xCofR)) / ( config.flow.stagnation_pressure * config.body.reference_length ** 2 + 1e-6 ) return np.abs(res * self.free_in_trim) def print_motion(self): print(("Rigid Body Motion: {0}".format(self.name))) print((" CofR: ({0}, {1})".format(self.xCofR, self.yCofR))) print((" CofG: ({0}, {1})".format(self.xCofG, self.yCofG))) print((" Draft: {0:5.4e}".format(self.draft))) print((" Trim Angle: {0:5.4e}".format(self.trim))) print((" Lift Force: {0:5.4e}".format(self.L))) print((" Drag Force: {0:5.4e}".format(self.D))) print((" Moment: {0:5.4e}".format(self.M))) print((" Lift Force Air: {0:5.4e}".format(self.La))) print((" Drag Force Air: {0:5.4e}".format(self.Da))) print((" Moment Air: {0:5.4e}".format(self.Ma))) print((" Lift Res: {0:5.4e}".format(self.res_l))) print((" Moment Res: {0:5.4e}".format(self.res_m))) def write_motion(self): kp.writeasdict( os.path.join( config.it_dir, "motion_{0}.{1}".format(self.name, config.io.data_format) ), ["xCofR", self.xCofR], ["yCofR", self.yCofR], ["xCofG", self.xCofG], ["yCofG", self.yCofG], ["draft", self.draft], ["trim", self.trim], ["liftRes", self.res_l], ["momentRes", self.res_m], ["Lift", self.L], ["Drag", self.D], ["Moment", self.M], ["LiftAir", self.La], ["DragAir", self.Da], ["MomentAir", self.Ma], ) for ss in self.substructure: if ss.type_ == "torsionalSpring": ss.writeDeformation() def load_motion(self): K = kp.dict_ionary( os.path.join( config.it_dir, "motion_{0}.{1}".format(self.name, config.io.data_format) ) ) self.xCofR = K.read("xCofR", np.nan) self.yCofR = K.read("yCofR", np.nan) self.xCofG = K.read("xCofG", np.nan) self.yCofG = K.read("yCofG", np.nan) self.draft = K.read("draft", np.nan) self.trim = K.read("trim", np.nan) self.res_l = K.read("liftRes", np.nan) self.res_m = K.read("momentRes", np.nan) self.L = K.read("Lift", np.nan) self.D = K.read("Drag", np.nan) self.M = K.read("Moment", np.nan) self.La = K.read("LiftAir", np.nan) self.Da = K.read("DragAir", np.nan) self.Ma = K.read("MomentAir", np.nan) class Substructure: count = 0 obj = [] @classmethod def All(cls): return [o for o in cls.obj] @classmethod def find_by_name(cls, name): return [o for o in cls.obj if o.name == name][0] def __init__(self, dict_): self.index = Substructure.count Substructure.count += 1 Substructure.obj.append(self) self.dict_ = dict_ self.name = self.dict_.read("substructureName", "") self.type_ = self.dict_.read("substructureType", "rigid") self.interpolator = None self.seal_pressure = self.dict_.read_load_or_default("Ps", 0.0) self.seal_pressure_method = self.dict_.read("PsMethod", "constant") self.seal_over_pressure_pct = self.dict_.read_load_or_default( "overPressurePct", 1.0 ) self.cushion_pressure_type = self.dict_.read("cushionPressureType", None) self.tip_load = self.dict_.read_load_or_default("tipLoad", 0.0) self.tip_constraint_height = self.dict_.read("tipConstraintHt", None) self.struct_interp_type = self.dict_.read("structInterpType", "linear") self.struct_extrap = self.dict_.read("structExtrap", True) self.line_fluid_pressure = None self.line_air_pressure = None self.fluidS = [] self.fluidP = [] self.airS = [] self.airP = [] self.U = 0.0 def add_parent(self, parent): self.parent = parent def get_residual(self): return 0.0 def set_interpolator(self, interpolator): self.interpolator = interpolator def set_element_properties(self): for el in self.el: el.set_properties(length=self.get_arc_length() / len(self.el)) def load_mesh(self): ndSt, ndEnd = np.loadtxt( os.path.join(config.path.mesh_dir, "elements_{0}.txt".format(self.name)), unpack=True, ) if isinstance(ndSt, float): ndSt = [int(ndSt)] ndEnd = [int(ndEnd)] else: ndSt = [int(nd) for nd in ndSt] ndEnd = [int(nd) for nd in ndEnd] ndInd = ndSt + [ndEnd[-1]] # Generate Element list self.node = [fe.Node.get_index(i) for i in ndInd] self.set_interp_function() self.el = [self.element_type() for _ in ndSt] self.set_element_properties() for ndSti, ndEndi, el in zip(ndSt, ndEnd, self.el): el.set_nodes([fe.Node.get_index(ndSti), fe.Node.get_index(ndEndi)]) el.set_parent(self) def set_interp_function(self): self.node_arc_length = np.zeros(len(self.node)) for i, nd0, nd1 in zip( list(range(len(self.node) - 1)), self.node[:-1], self.node[1:] ): self.node_arc_length[i + 1] = ( self.node_arc_length[i] + ((nd1.x - nd0.x) ** 2 + (nd1.y - nd0.y) ** 2) ** 0.5 ) if len(self.node_arc_length) == 2: self.struct_interp_type = "linear" elif len(self.node_arc_length) == 3 and not self.struct_interp_type == "linear": self.struct_interp_type = "quadratic" x, y = [np.array(xx) for xx in zip(*[(nd.x, nd.y) for nd in self.node])] self.interp_func_x, self.interp_func_y = ( interp1d(self.node_arc_length, x), interp1d(self.node_arc_length, y, kind=self.struct_interp_type), ) if self.struct_extrap: self.interp_func_x, self.interp_func_y = self.extrap_coordinates( self.interp_func_x, self.interp_func_y ) def extrap_coordinates(self, fxi, fyi): def extrap1d(interpolator): xs = interpolator.x ys = interpolator.y def pointwise(xi): if xi < xs[0]: return ys[0] + (xi - xs[0]) * (ys[1] - ys[0]) / (xs[1] - xs[0]) elif xi > xs[-1]: return ys[-1] + (xi - xs[-1]) * (ys[-1] - ys[-2]) / ( xs[-1] - xs[-2] ) else: return interpolator(xi) def ufunclike(xs): return np.array(list(map(pointwise, np.array([xs]))))[0] return ufunclike return extrap1d(fxi), extrap1d(fyi) def get_coordinates(self, si): return self.interp_func_x(si), self.interp_func_y(si) def get_xcoordinates(self, s): return self.get_coordinates(s)[0] def get_ycoordinates(self, s): return self.get_coordinates(s)[1] def get_arc_length(self): return max(self.node_arc_length) def write_coordinates(self): kp.writeaslist( os.path.join( config.it_dir, "coords_{0}.{1}".format(self.name, config.io.data_format) ), ["x [m]", [nd.x for nd in self.node]], ["y [m]", [nd.y for nd in self.node]], ) def load_coordinates(self): x, y = np.loadtxt( os.path.join( config.it_dir, "coords_{0}.{1}".format(self.name, config.io.data_format) ), unpack=True, ) for xx, yy, nd in zip(x, y, self.node): nd.set_coordinates(xx, yy) def update_fluid_forces(self): self.fluidS = [] self.fluidP = [] self.airS = [] self.airP = [] self.D = 0.0 self.L = 0.0 self.M = 0.0 self.Da = 0.0 self.La = 0.0 self.Ma = 0.0 if self.interpolator is not None: sMin, sMax = self.interpolator.get_min_max_s() for i, el in enumerate(self.el): # Get pressure at end points and all fluid points along element nodeS = [self.node_arc_length[i], self.node_arc_length[i + 1]] if self.interpolator is not None: s, pressure_fluid, tau = self.interpolator.get_loads_in_range( nodeS[0], nodeS[1] ) # Limit pressure to be below stagnation pressure if config.plotting.pressure_limiter: pressure_fluid = np.min( np.hstack( ( pressure_fluid, np.ones_like(pressure_fluid) * config.flow.stagnation_pressure, ) ), axis=0, ) else: s = np.array(nodeS) pressure_fluid = np.zeros_like(s) tau = np.zeros_like(s) ss = nodeS[1] Pc = 0.0 if self.interpolator is not None: if ss > sMax: Pc = self.interpolator.fluid.upstream_pressure elif ss < sMin: Pc = self.interpolator.fluid.downstream_pressure elif self.cushion_pressure_type == "Total": Pc = config.body.Pc # Store fluid and air pressure components for element (for # plotting) if i == 0: self.fluidS += [s[0]] self.fluidP += [pressure_fluid[0]] self.airS += [nodeS[0]] self.airP += [Pc - self.seal_pressure] self.fluidS += [ss for ss in s[1:]] self.fluidP += [pp for pp in pressure_fluid[1:]] self.airS += [ss for ss in nodeS[1:]] if self.seal_pressure_method.lower() == "hydrostatic": self.airP += [ Pc - self.seal_pressure + config.flow.density * config.flow.gravity * (self.interp_func_y(si) - config.flow.waterline_height) for si in nodeS[1:] ] else: self.airP += [Pc - self.seal_pressure for _ in nodeS[1:]] # Apply ramp to hydrodynamic pressure pressure_fluid *= config.ramp ** 2 # Add external cushion pressure to external fluid pressure pressure_cushion = np.zeros_like(s) Pc = 0.0 for ii, ss in enumerate(s): if self.interpolator is not None: if ss > sMax: Pc = self.interpolator.fluid.upstream_pressure elif ss < sMin: Pc = self.interpolator.fluid.downstream_pressure elif self.cushion_pressure_type == "Total": Pc = config.body.Pc pressure_cushion[ii] = Pc # Calculate internal pressure if self.seal_pressure_method.lower() == "hydrostatic": pressure_internal = ( self.seal_pressure - config.flow.density * config.flow.gravity * ( np.array([self.interp_func_y(si) for si in s]) - config.flow.waterline_height ) ) else: pressure_internal = ( self.seal_pressure * np.ones_like(s) * self.seal_over_pressure_pct ) pressure_external = pressure_fluid + pressure_cushion pressure_total = pressure_external - pressure_internal # Integrate pressure profile, calculate center of pressure and # distribute force to nodes integral = kp.integrate(s, pressure_total) if integral == 0.0: qp = np.zeros(2) else: pct = ( kp.integrate(s, s * pressure_total) / integral - s[0] ) / kp.cumdiff(s) qp = integral * np.array([1 - pct, pct]) integral = kp.integrate(s, tau) if integral == 0.0: qs = np.zeros(2) else: pct = (kp.integrate(s, s * tau) / integral - s[0]) / kp.cumdiff(s) qs = -integral * np.array([1 - pct, pct]) el.set_pressure_and_shear(qp, qs) # Calculate external force and moment for rigid body calculation if ( config.body.cushion_force_method.lower() == "integrated" or config.body.cushion_force_method.lower() == "assumed" ): if config.body.cushion_force_method.lower() == "integrated": integrand = pressure_external elif config.body.cushion_force_method.lower() == "assumed": integrand = pressure_fluid n = list(map(self.get_normal_vector, s)) t = [kp.rotateVec(ni, -90) for ni in n] f = [ -pi * ni + taui * ti for pi, taui, ni, ti in zip(integrand, tau, n, t) ] r = [ np.array([pt[0] - self.parent.xCofR, pt[1] - self.parent.yCofR]) for pt in map(self.get_coordinates, s) ] m = [kp.cross2(ri, fi) for ri, fi in zip(r, f)] self.D -= kp.integrate(s, np.array(zip(*f)[0])) self.L += kp.integrate(s, np.array(zip(*f)[1])) self.M += kp.integrate(s, np.array(m)) else: if self.interpolator is not None: self.D = self.interpolator.fluid.D self.L = self.interpolator.fluid.L self.M = self.interpolator.fluid.M integrand = pressure_cushion n = list(map(self.get_normal_vector, s)) t = [kp.rotateVec(ni, -90) for ni in n] f = [-pi * ni + taui * ti for pi, taui, ni, ti in zip(integrand, tau, n, t)] r = [ np.array([pt[0] - self.parent.xCofR, pt[1] - self.parent.yCofR]) for pt in map(self.get_coordinates, s) ] m = [kp.cross2(ri, fi) for ri, fi in zip(r, f)] self.Da -= kp.integrate(s, np.array(list(zip(*f))[0])) self.La += kp.integrate(s, np.array(list(zip(*f))[1])) self.Ma += kp.integrate(s, np.array(m)) def get_normal_vector(self, s): dxds = kp.getDerivative(lambda si: self.get_coordinates(si)[0], s) dyds = kp.getDerivative(lambda si: self.get_coordinates(si)[1], s) return kp.rotateVec(kp.ang2vecd(kp.atand2(dyds, dxds)), -90) def plot_pressure_profiles(self): if self.line_fluid_pressure is not None: self.line_fluid_pressure.set_data( self.get_pressure_plot_points(self.fluidS, self.fluidP) ) if self.line_air_pressure is not None: self.line_air_pressure.set_data( self.get_pressure_plot_points(self.airS, self.airP) ) def get_pressure_plot_points(self, s0, p0): sp = [(s, p) for s, p in zip(s0, p0) if not np.abs(p) < 1e-4] if len(sp) > 0: s0, p0 = list(zip(*sp)) nVec = list(map(self.get_normal_vector, s0)) coords0 = [np.array(self.get_coordinates(s)) for s in s0] coords1 = [ c + config.plotting.pScale * p * n for c, p, n in zip(coords0, p0, nVec) ] return list( zip( *[ xyi for c0, c1 in zip(coords0, coords1) for xyi in [c0, c1, np.ones(2) * np.nan] ] ) ) else: return [], [] def update_geometry(self): self.set_interp_function() def plot(self): for el in self.el: el.plot() # for nd in [self.node[0],self.node[-1]]: # nd.plot() self.plot_pressure_profiles() def set_attachments(self): return None def set_angle(self): return None class FlexibleSubstructure(Substructure): obj = [] res = 0.0 @classmethod def update_all(cls): num_dof = fe.Node.count() * config.flow.num_dim Kg = np.zeros((num_dof, num_dof)) Fg = np.zeros((num_dof, 1)) Ug = np.zeros((num_dof, 1)) # Assemble global matrices for all substructures together for ss in cls.obj: ss.update_fluid_forces() ss.assemble_global_stiffness_and_force() Kg += ss.K Fg += ss.F for nd in fe.Node.All(): for i in range(2): Fg[nd.dof[i]] += nd.fixed_load[i] # Determine fixed degrees of freedom dof = [False for _ in Fg] for nd in fe.Node.All(): for dofi, fdofi in zip(nd.dof, nd.is_dof_fixed): dof[dofi] = not fdofi # Solve FEM linear matrix equation if any(dof): Ug[np.ix_(dof)] = np.linalg.solve(Kg[np.ix_(dof, dof)], Fg[np.ix_(dof)]) cls.res = np.max(np.abs(Ug)) Ug *= config.relax_FEM Ug *= np.min([config.max_FEM_disp / np.max(Ug), 1.0]) for nd in fe.Node.All(): nd.move_coordinates(Ug[nd.dof[0], 0], Ug[nd.dof[1], 0]) for ss in cls.obj: ss.update_geometry() def __init__(self, dict_): # FlexibleSubstructure.obj.append(self) Substructure.__init__(self, dict_) self.element_type = fe.TrussElement self.pretension = self.dict_.read("pretension", -0.5) self.EA = self.dict_.read("EA", 5e7) self.K = None self.F = None # self.U = None config.has_free_structure = True def get_residual(self): return np.max(np.abs(self.U)) def initialize_matrices(self): num_dof = fe.Node.count() * config.flow.num_dim self.K = np.zeros((num_dof, num_dof)) self.F = np.zeros((num_dof, 1)) self.U = np.zeros((num_dof, 1)) def assemble_global_stiffness_and_force(self): if self.K is None: self.initialize_matrices() else: self.K *= 0 self.F *= 0 for el in self.el: self.add_loads_from_element(el) def add_loads_from_element(self, el): K, F = el.get_stiffness_and_force() self.K[np.ix_(el.dof, el.dof)] += K self.F[np.ix_(el.dof)] += F # def getPtDispFEM(self): # if self.K is None: # self.initializeMatrices() ## self.U *= 0.0 # self.update_fluid_forces() # self.assembleGlobalStiffnessAndForce() # # dof = [False for dof in self.F] # for nd in self.node: # for dofi, fdofi in zip(nd.dof, nd.fixedDOF): # dof[dofi] = not fdofi # if any(dof): ## self.U[np.ix_(dof)] = np.linalg.solve(self.K[np.ix_(dof,dof)], self.F[np.ix_(dof)]) # # # Relax displacement and limit step if necessary # self.U *= config.relaxFEM # self.U *= np.min([config.maxFEMDisp / np.max(self.U), 1.0]) # # for nd in self.node: # nd.moveCoordinates(self.U[nd.dof[0],0], self.U[nd.dof[1],0]) # # self.update_geometry() def set_element_properties(self): Substructure.set_element_properties(self) for el in self.el: el.set_properties(axialForce=-self.pretension, EA=self.EA) def update_geometry(self): for el in self.el: el.update_geometry() Substructure.set_interp_function(self) class RigidSubstructure(Substructure): def __init__(self, dict_): Substructure.__init__(self, dict_) self.element_type = fe.RigidElement def set_attachments(self): return None def updateAngle(self): return None def set_fixed_dof(self): for nd in self.node: for j in range(config.flow.num_dim): nd.is_dof_fixed[j] = True class TorsionalSpringSubstructure(FlexibleSubstructure, RigidSubstructure): def __init__(self, dict_): FlexibleSubstructure.__init__(self, dict_) self.element_type = fe.RigidElement self.tip_load_pct = self.dict_.read("tipLoadPct", 0.0) self.base_pt_pct = self.dict_.read("basePtPct", 1.0) self.spring_constant = self.dict_.read("spring_constant", 1000.0) self.theta = 0.0 self.Mt = 0.0 # TODO self.MOld = None self.relax = self.dict_.read("relaxAng", config.body.relax_rigid_body) self.attach_pct = self.dict_.read("attachPct", 0.0) self.attached_node = None self.attached_element = None self.minimum_angle = self.dict_.read("minimumAngle", -float("Inf")) self.max_angle_step = self.dict_.read("maxAngleStep", float("Inf")) config.has_free_structure = True def load_mesh(self): Substructure.load_mesh(self) self.set_fixed_dof() if self.base_pt_pct == 1.0: self.base_pt = self.node[-1].get_coordinates() elif self.base_pt_pct == 0.0: self.base_pt = self.node[0].get_coordinates() else: self.base_pt = self.get_coordinates( self.base_pt_pct * self.get_arc_length() ) self.set_element_properties() self.set_angle(self.dict_.read("initialAngle", 0.0)) def set_attachments(self): attached_substructure_name = self.dict_.read("attachedSubstructure", None) if attached_substructure_name is not None: self.attached_substructure = Substructure.find_by_name( attached_substructure_name ) else: self.attached_substructure = None if self.dict_.read("attachedSubstructureEnd", "End").lower() == "start": self.attached_ind = 0 else: self.attached_ind = -1 if self.attached_node is None and self.attached_substructure is not None: self.attached_node = self.attached_substructure.node[self.attached_ind] self.attached_element = self.attached_substructure.el[self.attached_ind] def update_fluid_forces(self): self.fluidS = [] self.fluidP = [] self.airS = [] self.airP = [] self.D = 0.0 self.L = 0.0 self.M = 0.0 self.Dt = 0.0 self.Lt = 0.0 self.Mt = 0.0 self.Da = 0.0 self.La = 0.0 self.Ma = 0.0 if self.interpolator is not None: s_min, s_max = self.interpolator.get_min_max_s() for i, el in enumerate(self.el): # Get pressure at end points and all fluid points along element node_s = [self.node_arc_length[i], self.node_arc_length[i + 1]] if self.interpolator is not None: s, pressure_fluid, tau = self.interpolator.get_loads_in_range( node_s[0], node_s[1] ) # Limit pressure to be below stagnation pressure if config.plotting.pressure_limiter: pressure_fluid = np.min( np.hstack( ( pressure_fluid, np.ones_like(pressure_fluid) * config.flow.stagnation_pressure, ) ), axis=0, ) else: s = np.array(node_s) pressure_fluid = np.zeros_like(s) tau = np.zeros_like(s) ss = node_s[1] Pc = 0.0 if self.interpolator is not None: if ss > s_max: Pc = self.interpolator.fluid.get_upstream_pressure() elif ss < s_min: Pc = self.interpolator.fluid.get_downstream_pressure() elif self.cushion_pressure_type == "Total": Pc = config.body.Pc # Store fluid and air pressure components for element (for # plotting) if i == 0: self.fluidS += [s[0]] self.fluidP += [pressure_fluid[0]] self.airS += [node_s[0]] self.airP += [Pc - self.seal_pressure] self.fluidS += [ss for ss in s[1:]] self.fluidP += [pp for pp in pressure_fluid[1:]] self.airS += [ss for ss in node_s[1:]] self.airP += [Pc - self.seal_pressure for _ in node_s[1:]] # Apply ramp to hydrodynamic pressure pressure_fluid *= config.ramp ** 2 # Add external cushion pressure to external fluid pressure pressure_cushion = np.zeros_like(s) Pc = 0.0 for ii, ss in enumerate(s): if self.interpolator is not None: if ss > s_max: Pc = self.interpolator.fluid.get_upstream_pressure() elif ss < s_min: Pc = self.interpolator.fluid.get_downstream_pressure() elif self.cushion_pressure_type == "Total": Pc = config.body.Pc pressure_cushion[ii] = Pc pressure_internal = self.seal_pressure * np.ones_like(s) pressure_external = pressure_fluid + pressure_cushion pressure_total = pressure_external - pressure_internal # Calculate external force and moment for rigid body calculation if ( config.body.cushion_force_method.lower() == "integrated" or config.body.cushion_force_method.lower() == "assumed" ): if config.body.cushion_force_method.lower() == "integrated": integrand = pressure_external elif config.body.cushion_force_method.lower() == "assumed": integrand = pressure_fluid n = list(map(self.get_normal_vector, s)) t = [kp.rotateVec(ni, -90) for ni in n] fC = [ -pi * ni + taui * ti for pi, taui, ni, ti in zip(pressure_external, tau, n, t) ] fFl = [ -pi * ni + taui * ti for pi, taui, ni, ti in zip(pressure_fluid, tau, n, t) ] f = fC + fFl print( ("Cushion Lift-to-Weight: {0}".format(fC[1] / config.body.weight)) ) r = [ np.array([pt[0] - config.body.xCofR, pt[1] - config.body.yCofR]) for pt in map(self.get_coordinates, s) ] m = [kp.cross2(ri, fi) for ri, fi in zip(r, f)] self.D -= kp.integrate(s, np.array(zip(*f)[0])) self.L += kp.integrate(s, np.array(zip(*f)[1])) self.M += kp.integrate(s, np.array(m)) else: if self.interpolator is not None: self.D = self.interpolator.fluid.D self.L = self.interpolator.fluid.L self.M = self.interpolator.fluid.M # Apply pressure loading for moment calculation # integrand = pFl integrand = pressure_total n = list(map(self.get_normal_vector, s)) t = [kp.rotateVec(ni, -90) for ni in n] f = [-pi * ni + taui * ti for pi, taui, ni, ti in zip(integrand, tau, n, t)] r = [ np.array([pt[0] - self.base_pt[0], pt[1] - self.base_pt[1]]) for pt in map(self.get_coordinates, s) ] m = [kp.cross2(ri, fi) for ri, fi in zip(r, f)] fx, fy = list(zip(*f)) self.Dt += kp.integrate(s, np.array(fx)) self.Lt += kp.integrate(s, np.array(fy)) self.Mt += kp.integrate(s, np.array(m)) integrand = pressure_cushion n = list(map(self.get_normal_vector, s)) t = [kp.rotateVec(ni, -90) for ni in n] f = [-pi * ni + taui * ti for pi, taui, ni, ti in zip(integrand, tau, n, t)] r = [ np.array([pt[0] - self.parent.xCofR, pt[1] - self.parent.yCofR]) for pt in map(self.get_coordinates, s) ] m = [kp.cross2(ri, fi) for ri, fi in zip(r, f)] self.Da -= kp.integrate(s, np.array(zip(*f)[0])) self.La += kp.integrate(s, np.array(zip(*f)[1])) self.Ma += kp.integrate(s, np.array(m)) # Apply tip load tipC = self.get_coordinates(self.tip_load_pct * self.get_arc_length()) tipR = np.array([tipC[i] - self.base_pt[i] for i in [0, 1]]) tipF = np.array([0.0, self.tip_load]) * config.ramp tipM = kp.cross2(tipR, tipF) self.Lt += tipF[1] self.Mt += tipM # Apply moment from attached substructure # el = self.attachedEl # attC = self.attachedNode.get_coordinates() # attR = np.array([attC[i] - self.basePt[i] for i in [0,1]]) # attF = el.axialForce * kp.ang2vec(el.gamma + 180) # attM = kp.cross2(attR, attF) * config.ramp ## attM = np.min([np.abs(attM), np.abs(self.Mt)]) * kp.sign(attM) # if np.abs(attM) > 2 * np.abs(tipM): ## attM = attM * np.abs(tipM) / np.abs(attM) # self.Mt += attM def updateAngle(self): if np.isnan(self.Mt): theta = 0.0 else: theta = -self.Mt if not self.spring_constant == 0.0: theta /= self.spring_constant dTheta = (theta - self.theta) * self.relax dTheta = np.min([np.abs(dTheta), self.max_angle_step]) * np.sign(dTheta) self.set_angle(self.theta + dTheta) def set_angle(self, ang): dTheta = np.max([ang, self.minimum_angle]) - self.theta if self.attached_node is not None and not any( [nd == self.attached_node for nd in self.node] ): attNd = [self.attached_node] else: attNd = [] # basePt = np.array([c for c in self.basePt]) basePt = np.array([c for c in self.node[-1].get_coordinates()]) for nd in self.node + attNd: oldPt = np.array([c for c in nd.get_coordinates()]) newPt = kp.rotatePt(oldPt, basePt, -dTheta) nd.set_coordinates(newPt[0], newPt[1]) self.theta += dTheta self.residual = dTheta self.update_geometry() print((" Deformation for substructure {0}: {1}".format(self.name, self.theta))) def get_residual(self): return self.residual # return self.theta + self.Mt / self.spring_constant def writeDeformation(self): kp.writeasdict( os.path.join( config.it_dir, "deformation_{0}.{1}".format(self.name, config.io.data_format), ), ["angle", self.theta], ) PK!planingfsi/fsi/__init__.pyPK!'&1&1planingfsi/fsi/figure.pyimport os import numpy as np import matplotlib.pyplot as plt import planingfsi.config as config # import planingfsi.krampy as kp class FSIFigure: def __init__(self, solid, fluid): self.solid = solid self.fluid = fluid plt.figure(figsize=(16, 12)) if config.plotting.watch: plt.ion() self.geometryAx = plt.axes([0.05, 0.6, 0.9, 0.35]) for nd in self.solid.node: nd.line_x_yline_xy, = plt.plot([], [], "ro") self.fluid.lineFS, = plt.plot([], [], "b-") self.fluid.lineFSi, = plt.plot([], [], "b--") for struct in self.solid.substructure: struct.line_air_pressure, = plt.plot([], [], "g-") struct.line_fluid_pressure, = plt.plot([], [], "r-") for el in struct.el: el.lineEl0, = plt.plot([], [], "k--") el.lineEl, = plt.plot([], [], "k-", linewidth=2) self.lineCofR = [] for bd in self.solid.rigid_body: CofRPlot( self.geometryAx, bd, symbol=False, style="k--", marker="s", fill=False ) self.lineCofR.append( CofRPlot(self.geometryAx, bd, symbol=False, style="k-", marker="o") ) xMin, xMax = kp.minMax( [nd.x for struct in self.solid.substructure for nd in struct.node] ) yMin, yMax = kp.minMax( [nd.y for struct in self.solid.substructure for nd in struct.node] ) if config.plotting.xmin is not None: xMin = config.plotting.xmin config.plotting.ext_w = 0.0 if config.plotting.xmax is not None: xMax = config.plotting.xmax config.plotting.ext_e = 0.0 if config.plotting.ymin is not None: yMin = config.plotting.ymin config.plotting.ext_s = 0.0 if config.plotting.ymax is not None: yMax = config.plotting.ymax config.plotting.ext_n = 0.0 plt.xlabel(r"$x$ [m]", fontsize=22) plt.ylabel(r"$y$ [m]", fontsize=22) plt.xlim( [ xMin - (xMax - xMin) * config.plotting.ext_w, xMax + (xMax - xMin) * config.plotting.ext_e, ] ) plt.ylim( [ yMin - (yMax - yMin) * config.plotting.ext_s, yMax + (yMax - yMin) * config.plotting.ext_n, ] ) plt.gca().set_aspect("equal") self.TXT = plt.text( 0.05, 0.95, "", ha="left", va="top", transform=plt.gca().transAxes ) self.subplot = [] bd = [bd for bd in self.solid.rigid_body] if len(bd) > 0: self.subplot.append(ForceSubplot([0.70, 0.30, 0.25, 0.2], bd[0])) self.subplot.append(MotionSubplot([0.70, 0.05, 0.25, 0.2], bd[0])) if len(bd) > 1: self.subplot.append(ForceSubplot([0.05, 0.30, 0.25, 0.2], bd[1])) self.subplot.append(MotionSubplot([0.05, 0.05, 0.25, 0.2], bd[1])) self.subplot.append(ResidualSubplot([0.40, 0.05, 0.25, 0.45], self.solid)) def update(self): self.solid.plot() self.fluid.plot_free_surface() self.TXT.set_text( ( r"Iteration {0}" + "\n" + r"$Fr={1:>8.3f}$" + "\n" + r"$\bar{{P_c}}={2:>8.3f}$" ).format(config.it, config.flow.froude_num, config.body.PcBar) ) # Update each lower subplot for s in self.subplot: s.update( self.solid.res < config.solver.max_residual and config.it > config.solver.num_ramp_it ) for l in self.lineCofR: l.update() plt.draw() if config.plotting.save: self.save() def writeTimeHistories(self): for s in self.subplot: if isinstance(s, TimeHistory): s.write() def save(self): plt.savefig( os.path.join( config.path.fig_dir_name, "frame{1:04d}.{0}".format(config.plotting.fig_format, config.it), ), format=config.plotting.fig_format, ) # , dpi=300) def show(self): plt.show(block=True) class PlotSeries: def __init__(self, xFunc, yFunc, **kwargs): self.x = [] self.y = [] self.additionalSeries = [] self.getX = xFunc self.getY = yFunc self.seriesType = kwargs.get("type", "point") self.ax = kwargs.get("ax", plt.gca()) self.legEnt = kwargs.get("legEnt", None) self.ignoreFirst = kwargs.get("ignoreFirst", False) self.line, = self.ax.plot([], [], kwargs.get("sty", "k-")) if self.seriesType == "history+curr": self.additionalSeries.append( PlotSeries( xFunc, yFunc, ax=self.ax, type="point", sty=kwargs.get("currSty", "ro"), ) ) def update(self, final=False): if self.seriesType == "point": self.x = self.getX() self.y = self.getY() if final: self.line.set_marker("*") self.line.set_markerfacecolor("y") self.line.set_markersize(10) else: if self.ignoreFirst and self.getX() == 0: self.x.append(np.nan) self.y.append(np.nan) else: self.x.append(self.getX()) self.y.append(self.getY()) self.line.set_data(self.x, self.y) for s in self.additionalSeries: s.update(final) class TimeHistory: def __init__(self, pos, name="default"): self.series = [] self.ax = [] self.title = "" self.xlabel = "" self.ylabel = "" self.name = name self.addAxes(plt.axes(pos)) def addAxes(self, ax): self.ax.append(ax) def addYAxes(self): self.addAxes(plt.twinx(self.ax[0])) def addSeries(self, series): self.series.append(series) return series def setProperties(self, axInd=0, **kwargs): plt.setp(self.ax[axInd], **kwargs) def createLegend(self, axInd=0): line, name = list( zip(*[(s.line, s.legEnt) for s in self.series if s.legEnt is not None]) ) self.ax[axInd].legend(line, name, loc="lower left") def update(self, final=False): for s in self.series: s.update(final) for ax in self.ax: xMin, xMax = 0.0, config.it + 5 yMin = np.nan for i, l in enumerate(ax.get_lines()): y = l.get_data()[1] if len(np.shape(y)) == 0: y = np.array([y]) try: y = y[np.ix_(~np.isnan(y))] except: y = [] if not np.shape(y)[0] == 0: yMinL = np.min(y) - 0.02 yMaxL = np.max(y) + 0.02 if i == 0 or np.isnan(yMin): yMin, yMax = yMinL, yMaxL else: yMin = np.min([yMin, yMinL]) yMax = np.max([yMax, yMaxL]) ax.set_xlim([xMin, xMax]) if ~np.isnan(yMin) and ~np.isnan(yMax): ax.set_ylim([yMin, yMax]) def write(self): ff = open("{0}_timeHistories.txt".format(self.name), "w") x = self.series[0].x y = list(zip(*[s.y for s in self.series])) for xi, yi in zip(x, y): ff.write("{0:4.0f}".format(xi)) for yii in yi: ff.write(" {0:8.6e}".format(yii)) ff.write("\n") ff.close() class MotionSubplot(TimeHistory): def __init__(self, pos, body): TimeHistory.__init__(self, pos, body.name) itFunc = lambda: config.it drftFunc = lambda: body.draft trimFunc = lambda: body.trim self.addYAxes() # Add plot series to appropriate axis self.addSeries( PlotSeries( itFunc, drftFunc, ax=self.ax[0], sty="b-", type="history+curr", legEnt="Draft", ) ) self.addSeries( PlotSeries( itFunc, trimFunc, ax=self.ax[1], sty="r-", type="history+curr", legEnt="Trim", ) ) self.setProperties( title=r"Motion History: {0}".format(body.name), xlabel=r"Iteration", ylabel=r"$d$ [m]", ) self.setProperties(1, ylabel=r"$\theta$ [deg]") self.createLegend() class ForceSubplot(TimeHistory): def __init__(self, pos, body): TimeHistory.__init__(self, pos, body.name) itFunc = lambda: config.it liftFunc = lambda: body.L / body.weight dragFunc = lambda: body.D / body.weight momFunc = lambda: body.M / (body.weight * config.body.reference_length) self.addSeries( PlotSeries( itFunc, liftFunc, sty="r-", type="history+curr", legEnt="Lift", ignoreFirst=True, ) ) self.addSeries( PlotSeries( itFunc, dragFunc, sty="b-", type="history+curr", legEnt="Drag", ignoreFirst=True, ) ) self.addSeries( PlotSeries( itFunc, momFunc, sty="g-", type="history+curr", legEnt="Moment", ignoreFirst=True, ) ) self.setProperties( title=r"Force & Moment History: {0}".format(body.name), # xlabel=r'Iteration', \ ylabel=r"$\mathcal{D}/W$, $\mathcal{L}/W$, $\mathcal{M}/WL_c$", ) self.createLegend() class ResidualSubplot(TimeHistory): def __init__(self, pos, solid): TimeHistory.__init__(self, pos, "residuals") itFunc = lambda: config.it col = ["r", "b", "g"] for bd, coli in zip(solid.rigid_body, col): self.addSeries( PlotSeries( itFunc, bd.get_res_lift, sty="{0}-".format(coli), type="history+curr", legEnt="ResL: {0}".format(bd.name), ignoreFirst=True, ) ) self.addSeries( PlotSeries( itFunc, bd.get_res_moment, sty="{0}--".format(coli), type="history+curr", legEnt="ResM: {0}".format(bd.name), ignoreFirst=True, ) ) self.addSeries( PlotSeries( itFunc, lambda: np.abs(solid.res), sty="k-", type="history+curr", legEnt="Total", ignoreFirst=True, ) ) self.setProperties(title=r"Residual History", yscale="log") self.createLegend() class CofRPlot: def __init__(self, ax, body, **kwargs): self.ax = ax self.body = body self.symbol = kwargs.get("symbol", True) style = kwargs.get("style", "k-") fill = kwargs.get("fill", True) marker = kwargs.get("marker", "o") if not fill: fcol = "w" else: fcol = "k" self.line, = self.ax.plot([], [], style) self.lineCofG, = self.ax.plot( [], [], "k" + marker, markersize=8, markerfacecolor=fcol ) self.update() def update(self): l = config.plotting.CofR_grid_len c = np.array([self.body.xCofR, self.body.yCofR]) hvec = kp.rotateVec(np.array([0.5 * l, 0.0]), self.body.trim) vvec = kp.rotateVec(np.array([0.0, 0.5 * l]), self.body.trim) pts = [c - hvec, c + hvec, np.ones(2) * np.nan, c - vvec, c + vvec] self.line.set_data(list(zip(*pts))) self.lineCofG.set_data(self.body.xCofG, self.body.yCofG) PK!T planingfsi/fsi/interpolator.pyimport numpy as np from scipy.optimize import fmin import planingfsi.config as config # import planingfsi.krampy as kp class Interpolator: def __init__(self, solid, fluid, dict_=""): self.solid = solid self.fluid = fluid self.solid.set_interpolator(self) self.fluid.interpolator = self self.solidPositionFunction = None self.fluidPressureFunction = None self.getBodyHeight = self.getSurfaceHeightFixedX self.sSep = [] self.sImm = [] if isinstance(dict_, str): dict_ = kp.Dictionary(dict_) # self.Dict = dict_ self.sSepPctStart = dict_.read("sSepPctStart", 0.5) self.sImmPctStart = dict_.read("sImmPctStart", 0.9) def setBodyHeightFunction(self, funcName): exec("self.getBodyHeight = self.{0}".format(funcName)) def set_solid_position_function(self, func): self.solidPositionFunction = func def set_fluid_pressure_function(self, func): self.fluidPressureFunction = func def getSurfaceHeightFixedX(self, x): s = np.max([self.getSFixedX(x, 0.5), 0.0]) return self.get_coordinates(s)[1] def get_coordinates(self, s): return self.solidPositionFunction(s) def get_min_max_s(self): pts = self.fluid._get_element_coords() return [self.getSFixedX(x) for x in [pts[0], pts[-1]]] def getSFixedX(self, fixedX, soPct=0.5): return kp.fzero( lambda s: self.get_coordinates(s)[0] - fixedX, soPct * self.solid.get_arc_length(), ) def getSFixedY(self, fixedY, soPct): return kp.fzero( lambda s: self.get_coordinates(s)[1] - fixedY, soPct * self.solid.get_arc_length(), ) @property def immersed_length(self): if self.sImm == []: self.sImm = self.sImmPctStart * self.solid.get_arc_length() self.sImm = kp.fzero( lambda s: self.get_coordinates(s)[1] - config.flow.waterline_height, self.sImm, ) return self.get_coordinates(self.sImm)[0] def get_separation_point(self): def yCoord(s): return self.get_coordinates(s[0])[1] if self.sSep == []: self.sSep = self.sSepPctStart * self.solid.get_arc_length() self.sSep = fmin(yCoord, np.array([self.sSep]), disp=False, xtol=1e-6)[0] self.sSep = np.max([self.sSep, 0.0]) return self.get_coordinates(self.sSep) def get_loads_in_range(self, s0, s1): x = [self.solidPositionFunction(s)[0] for s in [s0, s1]] x, p, tau = self.fluidPressureFunction(x[0], x[1]) s = np.array([self.getSFixedX(xx, 0.5) for xx in x]) return s, p, tau PK!,!!planingfsi/fsi/simulation.pyimport os import numpy as np import planingfsi.config as config # import planingfsi.io # import planingfsi.krampy as kp # from planingfsi import krampy, io from planingfsi.fe.structure import FEStructure from planingfsi.fsi.interpolator import Interpolator from planingfsi.potentialflow.solver import PotentialPlaningSolver from .figure import FSIFigure class Simulation(object): """Simulation object to manage the FSI problem. Handles the iteration between the fluid and solid solvers. Attributes ---------- solid_solver : FEStructure Solid solver. fluid_solver : PotentialPlaningSolver Fluid solver. """ def __init__(self): # Create solid and fluid solver objects self.solid_solver = FEStructure() self.fluid_solver = PotentialPlaningSolver() # def setFluidPressureFunc(self, func): # self.fluidPressureFunc = func # # def setSolidPositionFunc(self, func): # self.solidPositionFunc = func def update_fluid_response(self): self.fluid_solver.calculate_response() self.solid_solver.update_fluid_forces() def update_solid_response(self): self.solid_solver.calculate_response() def create_dirs(self): config.it_dir = os.path.join(config.path.case_dir, "{0}".format(config.it)) config.path.fig_dir_name = os.path.join( config.path.case_dir, config.path.fig_dir_name ) if self.check_output_interval() and not config.io.results_from_file: kp.createIfNotExist(config.path.case_dir) kp.createIfNotExist(config.it_dir) if config.plotting.save: kp.createIfNotExist(config.path.fig_dir_name) if config.it == 0: for f in os.listdir(config.path.fig_dir_name): os.remove(os.path.join(config.path.fig_dir_name, f)) def load_input_files(self): # Add all rigid bodies if os.path.exists(config.path.body_dict_dir): for dict_name in krampy.listdir_nohidden(config.path.body_dict_dir): dict_ = planingfsi.io.Dictionary( os.path.join(config.path.body_dict_dir, dict_name) ) self.solid_solver.add_rigid_body(dict_) else: self.solid_solver.add_rigid_body() # Add all substructures for dict_name in krampy.listdir_nohidden(config.path.input_dict_dir): dict_ = planingfsi.io.Dictionary( os.path.join(config.path.input_dict_dir, dict_name) ) substructure = self.solid_solver.add_substructure(dict_) if dict_.read("hasPlaningSurface", False): planing_surface = self.fluid_solver.add_planing_surface(dict_) interpolator = Interpolator(substructure, planing_surface, dict_) interpolator.set_solid_position_function(substructure.get_coordinates) interpolator.set_fluid_pressure_function( planing_surface.get_loads_in_range ) # Add all pressure cushions if os.path.exists(config.path.cushion_dict_dir): for dict_name in krampy.listdir_nohidden(config.path.cushion_dict_dir): dict_ = planingfsi.io.Dictionary( os.path.join(config.path.cushion_dict_dir, dict_name) ) self.fluid_solver.add_pressure_cushion(dict_) def run(self): """Run the fluid-structure interaction simulation by iterating between the fluid and solid solvers. """ config.it = 0 self.solid_solver.load_mesh() if config.io.results_from_file: self.create_dirs() self.apply_ramp() self.it_dirs = kp.sortDirByNum(kp.find_files("[0-9]*"))[1] if config.plotting.plot_any: self.figure = FSIFigure(self.solid_solver, self.fluid_solver) # Initialize body at specified trim and draft self.solid_solver.initialize_rigid_bodies() self.update_fluid_response() # Iterate between solid and fluid solvers until equilibrium while config.it <= config.solver.num_ramp_it or ( self.get_residual() >= config.solver.max_residual and config.it <= config.solver.max_it ): # Calculate response if config.has_free_structure: self.apply_ramp() self.update_solid_response() self.update_fluid_response() self.solid_solver.get_residual() else: self.solid_solver.res = 0.0 # Write, print, and plot results self.create_dirs() self.write_results() self.print_status() self.update_figure() # Increment iteration count self.increment() if config.plotting.save and not config.plotting.fig_format == "png": config.plotting.fig_format = "png" self.figure.save() if config.io.write_time_histories: self.figure.write_time_histories() print("Execution complete") if config.plotting.show: self.figure.show() def increment(self): if config.io.results_from_file: old_ind = np.nonzero(config.it == self.it_dirs)[0][0] if not old_ind == len(self.it_dirs) - 1: config.it = int(self.it_dirs[old_ind + 1]) else: config.it = config.max_it + 1 self.create_dirs() else: config.it += 1 def update_figure(self): if config.plotting.plot_any: self.figure.update() def get_body_res(self, x): self.solid_solver.get_pt_disp_rb(x[0], x[1]) self.solid_solver.update_nodal_positions() self.update_fluid_response() # Write, print, and plot results self.create_dirs() self.write_results() self.print_status() self.update_figure() # Update iteration number depending on whether loading existing or # simply incrementing by 1 if config.io.results_from_file: if config.it < len(self.it_dirs) - 1: config.it = int(self.it_dirs[config.it + 1]) else: config.it = config.max_it it += 1 else: config.it += 1 config.res_l = self.solid_solver.rigid_body[0].get_res_l() config.res_m = self.solid_solver.rigid_body[0].get_res_moment() print("Rigid Body Residuals:") print((" Lift: {0:0.4e}".format(config.res_l))) print((" Moment: {0:0.4e}\n".format(config.res_m))) return np.array([config.res_l, config.res_m]) def apply_ramp(self): if config.io.results_from_file: self.loadResults() else: if config.solver.num_ramp_it == 0: ramp = 1.0 else: ramp = np.min((config.it / float(config.solver.num_ramp_it), 1.0)) config.ramp = ramp config.relax_FEM = ( 1 - ramp ) * config.solver.relax_initial + ramp * config.solver.relax_final def get_residual(self): if config.io.results_from_file: return 1.0 return kp.Dictionary( os.path.join(config.it_dir, "overallQuantities.txt") ).read("Residual", 0.0) else: return self.solid_solver.res def print_status(self): print( "Residual after iteration {1:>4d}: {0:5.3e}".format( self.get_residual(), config.it ) ) def check_output_interval(self): return ( config.it >= config.solver.max_it or self.get_residual() < config.solver.max_residual or np.mod(config.it, config.io.write_interval) == 0 ) def write_results(self): if self.check_output_interval() and not config.io.results_from_file: kp.writeasdict( os.path.join(config.it_dir, "overallQuantities.txt"), ["Ramp", config.ramp], ["Residual", self.solid_solver.res], ) self.fluid_solver.write_results() self.solid_solver.write_results() def load_results(self): K = kp.Dictionary(os.path.join(config.it_dir, "overallQuantities.txt")) config.ramp = K.read("Ramp", 0.0) self.solid_solver.res = K.read("Residual", 0.0) PK!$planingfsi/potentialflow/__init__.pyPK!Y55+planingfsi/potentialflow/pressureelement.py"""Module containing definitions of different types of pressure element.""" import numpy as np from scipy.special import sici # TODO: Move all plotting commands to its own module try: import matplotlib.pyplot as plt except ImportError: plt = None pass import planingfsi.config as config # import planingfsi.krampy as kp _PLOT_INFINITY = 50.0 def _get_aux_fg(lam): """Return f and g functions, which are dependent on the auxiliary sine and cosine functions. Combined into one function to reduce number of calls to scipy.special.sici(). """ lam = abs(lam) (aux_sine, aux_cosine) = sici(lam) (sine, cosine) = np.sin(lam), np.cos(lam) return ( cosine * (0.5 * np.pi - aux_sine) + sine * aux_cosine, sine * (0.5 * np.pi - aux_sine) - cosine * aux_cosine, ) def _get_gamma1(lam, aux_g): """Return result of first integral (Gamma_1 in my thesis). Args ---- lam : float Non-dimensional x-position. aux_g : float Second result of `_get_aux_fg` Returns ------- float """ if lam > 0: return (aux_g + np.log(lam)) / np.pi else: return (aux_g + np.log(-lam)) / np.pi + (2 * np.sin(lam) - lam) def _get_gamma2(lam, aux_f): """Return result of second integral (Gamma_2 in my thesis). Args ---- lam : float Non-dimensional x-position. aux_f : float First result of `_get_aux_fg` Returns ------- float """ return kp.sign(lam) * aux_f / np.pi + kp.heaviside(-lam) * (2 * np.cos(lam) - 1) def _get_gamma3(lam, aux_f): """Return result of third integral (Gamma_3 in my thesis). Args ---- lam : float Non-dimensional x-position. aux_f : float First result of `_get_aux_fg` Returns ------- float """ return ( -kp.sign(lam) * aux_f / np.pi - 2 * kp.heaviside(-lam) * np.cos(lam) - kp.heaviside(lam) ) def _eval_left_right(f, x, dx=1e-6): """Evaluate a function by taking the average of the values just above and below the x value. Used to avoid singularity/Inf/NaN. Args ---- f : function Function handle. x : float Argument of function. dx : float, optional Step size. Returns ------ float : Function value. """ return (f(x + dx) + f(x - dx)) / 2 class PressureElement(object): """Abstract base class to represent all different types of pressure elements. Attributes ---------- x_coord : float x-location of element. z_coord : float z-coordinate of element, if on body. pressure : float The pressure/strength of the element. shear_stress : float Shear stress at the element. width : float Width of element. is_source : bool True of element is a source. is_on_body : bool True if on body. Used for force calculation. parent : PressurePatch Pressure patch that this element belongs to. """ def __init__( self, x_coord=np.nan, z_coord=np.nan, pressure=np.nan, shear_stress=0.0, width=np.nan, is_source=False, is_on_body=False, parent=None, ): # TODO: Replace x_coord with coord pointer to Coordinate object, # (e.g. self.coords.x). self.x_coord = x_coord self.z_coord = z_coord self._pressure = pressure self.shear_stress = shear_stress self._width = np.zeros(2) self.width = width self.is_source = is_source self.is_on_body = is_on_body self.parent = parent @property def width(self): return sum(self._width) @property def pressure(self): return self._pressure @pressure.setter def pressure(self, value): self._pressure = value def get_influence_coefficient(self, x_coord): """Return _get_local_influence_coefficient coefficient of element. Args ---- x_coord : float Dimensional x-coordinate. """ x_rel = x_coord - self.x_coord if self.width == 0.0: return 0.0 elif x_rel == 0.0 or x_rel == self._width[1] or x_rel == -self._width[0]: influence = _eval_left_right(self._get_local_influence_coefficient, x_rel) else: influence = self._get_local_influence_coefficient(x_rel) return influence / (config.flow.density * config.flow.gravity) def get_influence(self, x_coord): """Return _get_local_influence_coefficient for actual pressure. Args ---- x_coord : float Dimensional x-coordinate. """ return self.get_influence_coefficient(x_coord) * self.pressure def _get_local_influence_coefficient(self, x_coord): """Return influence coefficient in iso-geometric coordinates. Each subclass must implement its own method. Args ---- x_coord : float x-coordinate, centered at element location. """ pass def __repr__(self): """Print element attributes.""" string = ( "{0}: (x,z) = ({1}, {2}), width = {3}," "is_source = {4}, p = {5}, is_on_body = {6}" ) return string.format( self.__class__.__name__, self.x_coord, self.z_coord, self.width, self.is_source, self.pressure, self.is_on_body, ) def plot(self, x_coords, color="b"): """Plot pressure element shape.""" _pressure = np.array([self.pressure(xi) for xi in x_coords]) plt.plot(x_coords, _pressure, color=color, linetype="-") plt.plot( self.x_coord * np.ones(2), [0.0, self.pressure], color=color, linetype="--" ) class AftHalfTriangularPressureElement(PressureElement): """Pressure element that is triangular in profile but towards aft direction. Args ---- **kwargs Same as PressureElement arguments, with the following defaults overridden: is_on_body : True """ def __init__(self, **kwargs): kwargs["is_on_body"] = kwargs.get("is_on_body", True) super().__init__(**kwargs) @PressureElement.width.setter def width(self, width): self._width[0] = width @PressureElement.pressure.getter def pressure(self, x_coord=None): """Return shape of pressure profile.""" if x_coord is None: return self._pressure _x_rel = x_coord - self.x_coord if _x_rel < -self.width or _x_rel > 0.0: return 0.0 else: return self._pressure * (1 + _x_rel / self.width) def _get_local_influence_coefficient(self, x_rel): lambda_0 = config.flow.k0 * x_rel aux_f, aux_g = _get_aux_fg(lambda_0) _influence = _get_gamma1(lambda_0, aux_g) / (self.width * config.flow.k0) _influence += _get_gamma2(lambda_0, aux_f) lambda_2 = config.flow.k0 * (x_rel + self.width) __, aux_g = _get_aux_fg(lambda_2) _influence -= _get_gamma1(lambda_2, aux_g) / (self.width * config.flow.k0) return _influence def plot(self): """Plot pressure element shape.""" x_coords = self.x_coord - np.array([self.width, 0]) super().plot(x_coords, color="g") class ForwardHalfTriangularPressureElement(PressureElement): """Pressure element that is triangular in profile but towards forward direction. Args ---- **kwargs Same as PressureElement arguments, with the following defaults overridden: is_on_body : True """ def __init__(self, **kwargs): kwargs["is_on_body"] = kwargs.get("is_on_body", True) super().__init__(**kwargs) @PressureElement.width.setter def width(self, width): self._width[1] = width @PressureElement.pressure.getter def pressure(self, x_coord=None): """Return shape of pressure profile.""" if x_coord is None: return self._pressure _x_rel = x_coord - self.x_coord if _x_rel > self.width or _x_rel < 0.0: return 0.0 else: return self._pressure * (1 - _x_rel / self.width) def _get_local_influence_coefficient(self, x_coord): """Return _get_local_influence_coefficient coefficient in iso-geometric coordinates. Args ---- x_coord : float x-coordinate, centered at element location. """ lambda_0 = config.flow.k0 * x_coord aux_f, aux_g = _get_aux_fg(lambda_0) influence = _get_gamma1(lambda_0, aux_g) / (self.width * config.flow.k0) influence -= _get_gamma2(lambda_0, aux_f) lambda_1 = config.flow.k0 * (x_coord - self.width) _, aux_g = _get_aux_fg(lambda_1) influence -= _get_gamma1(lambda_1, aux_g) / (self.width * config.flow.k0) return influence def plot(self): """Plot pressure element shape.""" x_coords = self.x_coord + np.array([0, self.width]) super().plot(x_coords, color="b") class CompleteTriangularPressureElement(PressureElement): """Pressure element that is triangular in profile towards both directions. Args ---- **kwargs Same as PressureElement arguments, with the following defaults overridden: is_on_body : True """ def __init__(self, **kwargs): kwargs["is_on_body"] = kwargs.get("is_on_body", True) kwargs["width"] = kwargs.get("width", np.zeros(2)) super().__init__(**kwargs) @PressureElement.width.setter def width(self, width): if not len(width) == 2: raise ValueError("Width must be length-two array") else: self._width = np.array(width) @PressureElement.pressure.getter def pressure(self, x_coord=None): """Return shape of pressure profile.""" if x_coord is None: return self._pressure _x_rel = x_coord - self.x_coord if _x_rel > self._width[1] or _x_rel < -self._width[0]: return 0.0 elif _x_rel < 0.0: return self._pressure * (1 + _x_rel / self._width[0]) else: return self._pressure * (1 - _x_rel / self._width[1]) def _get_local_influence_coefficient(self, x_coord): """Return _get_local_influence_coefficient coefficient in iso-geometric coordinates. Args ---- x_coord : float x-coordinate, centered at element location. """ lambda_0 = config.flow.k0 * x_coord _, aux_g = _get_aux_fg(lambda_0) influence = ( _get_gamma1(lambda_0, aux_g) * self.width / (self._width[1] * self._width[0]) ) lambda_1 = config.flow.k0 * (x_coord - self._width[1]) _, aux_g = _get_aux_fg(lambda_1) influence -= _get_gamma1(lambda_1, aux_g) / self._width[1] lambda_2 = config.flow.k0 * (x_coord + self._width[0]) _, aux_g = _get_aux_fg(lambda_2) influence -= _get_gamma1(lambda_2, aux_g) / self._width[0] return influence / config.flow.k0 def plot(self): """Plot pressure element shape.""" x_coords = self.x_coord + np.array([-self._width[0], 0.0, self._width[1]]) super().plot(x_coords, color="r") class AftSemiInfinitePressureBand(PressureElement): """Semi-infinite pressure band in aft direction. Args ---- **kwargs Same as PressureElement arguments """ @PressureElement.pressure.getter def pressure(self, x_coord=0.0): """Return shape of pressure profile.""" x_rel = x_coord - self.x_coord if x_rel > 0.0: return 0.0 else: return self._pressure def _get_local_influence_coefficient(self, x_coord): """Return _get_local_influence_coefficient coefficient in iso-geometric coordinates. Args ---- x_coord : float x-coordinate, centered at element location. """ lambda_0 = config.flow.k0 * x_coord aux_f, _ = _get_aux_fg(lambda_0) influence = _get_gamma2(lambda_0, aux_f) return influence def plot(self): """Plot pressure element shape.""" x_coords = self.x_coord + np.array([-_PLOT_INFINITY, 0]) super().plot(x_coords, color="r") class ForwardSemiInfinitePressureBand(PressureElement): """Semi-infinite pressure band in forward direction. Args ---- **kwargs Same as PressureElement arguments """ @PressureElement.pressure.getter def pressure(self, x_coord=None): """Return shape of pressure profile.""" if x_coord is None: return self._pressure _x_rel = x_coord - self.x_coord if _x_rel < 0.0: return 0.0 else: return self._pressure def _get_local_influence_coefficient(self, x_coord): """Return _get_local_influence_coefficient coefficient in iso-geometric coordinates. Args ---- x_coord : float x-coordinate, centered at element location. """ lambda_0 = config.flow.k0 * x_coord aux_f, _ = _get_aux_fg(lambda_0) influence = _get_gamma3(lambda_0, aux_f) return influence def plot(self): """Plot pressure element shape.""" x_coords = self.x_coord + np.array([0, _PLOT_INFINITY]) super().plot(x_coords, color="r") PK!JvLvL)planingfsi/potentialflow/pressurepatch.py"""Classes representing a pressure patch on the free surface.""" import os import numpy as np from scipy.interpolate import interp1d from scipy.optimize import fmin import planingfsi.config as config # import planingfsi.krampy as kp import planingfsi.potentialflow.pressureelement as pe class PressurePatch(object): """Abstract base class representing a patch of pressure elements on the free surface. Attributes ---------- pressure_elements : list List of pressure elements. base_pt : float x-location of base point. length : float Length of pressure patch. is_kutta_unknown : bool True of trailing edge pressure unknown neighbor_up : PressurePatch PressurePatch instance upstream of this one. neighbor_down : PressurePatch PressurePatch instance downstream of this one. interpolator : FSIInterpolator Object to get interpolated body position if a `PlaningSurface` force_dict : dict Dictionary of global force values derived from element pressure/shear stress. parent : PotentialPlaningCalculation Parent solver this patch belongs to. """ def __init__(self, neighbor_up=None, neighbor_down=None, parent=None): self.pressure_elements = [] self._end_pts = np.zeros(2) self.is_kutta_unknown = False self._neighbor_up = None self._neighbor_down = None self.neighbor_up = neighbor_up self.neighbor_down = neighbor_down self.interpolator = None self.force_dict = {} self.parent = parent @property def base_pt(self): return self._end_pts[0] @base_pt.setter def base_pt(self, x): self._end_pts[0] = x @property def length(self): return self._end_pts[1] - self._end_pts[0] @property def neighbor_up(self): return self._neighbor_up @neighbor_up.setter def neighbor_up(self, obj): if obj is not None: self._neighbor_up = obj self._neighbor_up.neighbor_down = self @property def neighbor_down(self): return self._neighbor_down @neighbor_down.setter def neighbor_down(self, obj): if obj is not None: self._neighbor_down = obj self._neighbor_down.neighbor_up = self def _reset_element_coords(self): """Re-distribute pressure element positions given the length and end points of this patch. """ x = self._get_element_coords() for i, el in enumerate(self.pressure_elements): el.x_coord = x[i] if not el.is_source: el.z_coord = self.interpolator.getBodyHeight(x[i]) if isinstance(el, pe.CompleteTriangularPressureElement): el.width = [x[i] - x[i - 1], x[i + 1] - x[i]] elif isinstance(el, pe.ForwardHalfTriangularPressureElement): el.width = x[i + 1] - x[i] elif isinstance(el, pe.AftHalfTriangularPressureElement): el.width = x[i] - x[i - 1] elif isinstance(el, pe.AftSemiInfinitePressureBand): el.width = np.inf else: raise ValueError("Invalid Element Type!") def get_free_surface_height(self, x): """Get free surface height at a position x due to the elements on this patch. Args ---- x : float x-position at which to calculate free-surface height. """ return sum([el.get_influence(x) for el in self.pressure_elements]) def calculate_wave_drag(self): """Calculate wave drag of patch. Returns ------- None """ xo = -10.1 * config.flow.lam xTrough, = fmin(self.get_free_surface_height, xo, disp=False) xCrest, = fmin(lambda x: -self.get_free_surface_height(x), xo, disp=False) self.Dw = ( 0.0625 * config.flow.density * config.flow.gravity * ( self.get_free_surface_height(xCrest) - self.get_free_surface_height(xTrough) ) ** 2 ) def print_forces(self): """Print forces to screen.""" print(("Forces and Moment for {0}:".format(self.patch_name))) print((" Total Drag [N] : {0:6.4e}".format(self.D))) print((" Wave Drag [N] : {0:6.4e}".format(self.Dw))) print((" Pressure Drag [N] : {0:6.4e}".format(self.Dp))) print((" Frictional Drag [N] : {0:6.4e}".format(self.Df))) print((" Total Lift [N] : {0:6.4e}".format(self.L))) print((" Pressure Lift [N] : {0:6.4e}".format(self.Lp))) print((" Frictional Lift [N] : {0:6.4e}".format(self.Lf))) print((" Moment [N-m] : {0:6.4e}".format(self.M))) def write_forces(self): """Write forces to file.""" kp.writeasdict( os.path.join( config.it_dir, "forces_{0}.{1}".format(self.patch_name, config.io.data_format), ), ["Drag", self.D], ["WaveDrag", self.Dw], ["PressDrag", self.Dp], ["FricDrag", self.Df], ["Lift", self.L], ["PressLift", self.Lp], ["FricLift", self.Lf], ["Moment", self.M], ["BasePt", self.base_pt], ["Length", self.length], ) def load_forces(self): """Load forces from file.""" K = kp.Dictionary( os.path.join( config.it_dir, "forces_{0}.{1}".format(self.patch_name, config.io.data_format), ) ) self.D = K.read("Drag", 0.0) self.Dw = K.read("WaveDrag", 0.0) self.Dp = K.read("PressDrag", 0.0) self.Df = K.read("FricDrag", 0.0) self.L = K.read("Lift", 0.0) self.Lp = K.read("PressLift", 0.0) self.Lf = K.read("FricLift", 0.0) self.M = K.read("Moment", 0.0) self.base_pt = K.read("BasePt", 0.0) self.length = K.read("Length", 0.0) class PressureCushion(PressurePatch): """Pressure Cushion consisting solely of source elements. Args ---- dict_ : krampy.Dictionary or str Dictionary instance or string with dictionary path to load properties. **kwargs : dict Keyword arguments. Attributes ---------- patch_name : str Name of patch base_pt : float Right end of `PressurePatch`. """ count = 0 def __init__(self, dict_, **kwargs): super(self.__class__, self).__init__(self, **kwargs) self.index = PressureCushion.count PressureCushion.count += 1 self.patch_name = dict_.read( "pressureCushionName", "pressureCushion{0}".format(self.index) ) # TODO: cushion_type variability should be managed by sub-classing # PressurePatch self.cushion_type = dict_.read("cushionType", "") self.cushion_pressure = kwargs.get( "Pc", dict_.read_load_or_default("cushionPressure", 0.0) ) self.neighbor_up = PlaningSurface.find_by_name( dict_.read("upstreamPlaningSurface", "") ) self.neighbor_down = PlaningSurface.find_by_name( dict_.read("downstreamPlaningSurface", "") ) if self.neighbor_down is not None: self.neighbor_down.upstream_pressure = self.cushion_pressure if self.neighbor_up is not None: self.neighbor_up.kutta_pressure = self.cushion_pressure if self.cushion_type == "infinite": # Dummy element, will have 0 pressure self.pressure_elements += [ pe.AftSemiInfinitePressureBand(is_source=True, is_on_body=False) ] self.pressure_elements += [ pe.AftSemiInfinitePressureBand(is_source=True, is_on_body=False) ] self._end_pts[0] = -1000.0 # doesn't matter where else: self.pressure_elements += [ pe.ForwardHalfTriangularPressureElement( is_source=True, is_on_body=False ) ] Nfl = dict_.read("numElements", 10) self.smoothing_factor = dict_.read("smoothingFactor", np.nan) for n in [self.neighbor_down, self.neighbor_up]: if n is None and ~np.isnan(self.smoothing_factor): self.pressure_elements += [ pe.CompleteTriangularPressureElement( is_source=True, is_on_body=False ) for __ in range(Nfl) ] self.end_pts = [ dict_.read_load_or_default(key, 0.0) for key in ["downstreamLoc", "upstreamLoc"] ] self.pressure_elements += [ pe.AftHalfTriangularPressureElement(is_source=True, is_on_body=False) ] self._update_end_pts() @property def base_pt(self): if self.neighbor_up is None: return self._end_pts[1] else: return self.neighbor_up.base_pt @base_pt.setter def base_pt(self, x): self._end_pts[1] = x def _get_element_coords(self): """Return x-locations of all elements.""" if not self.cushion_type == "smoothed" or np.isnan(self.smoothing_factor): return self.end_pts else: add_width = np.arctanh(0.99) * self.length / (2 * self.smoothing_factor) addL = np.linspace( -add_width, add_width, len(self.pressure_elements) / 2 + 1 ) x = np.hstack((self.end_pts[0] + addL, self.end_pts[1] + addL)) return x def _update_end_pts(self): if self.neighbor_down is not None: self._end_pts[0] = self.neighbor_down.end_pts[1] if self.neighbor_up is not None: self._end_pts[1] = self.neighbor_up.end_pts[0] self._reset_element_coords() # Set source pressure of elements for elNum, el in enumerate(self.pressure_elements): el.is_source = True if self.cushion_type == "smoothed" and not np.isnan(self.smoothing_factor): alf = 2 * self.smoothing_factor / self.length el.set_source_pressure( 0.5 * self.cushion_pressure * ( np.tanh(alf * el.x_coord) - np.tanh(alf * (el.x_coord - self.length)) ) ) # for infinite pressure cushion, first element is dummy, set to # zero, second is semiInfinitePressureBand and set to cushion # pressure elif self.cushion_type == "infinite": if elNum == 0: el.pressure = 0.0 else: el.pressure = self.cushion_pressure else: el.pressure = self.cushion_pressure @PressurePatch.length.setter def length(self, length): self._end_pts[0] = self._end_pts[1] - length def calculate_forces(self): self.calculate_wave_drag() class PlaningSurface(PressurePatch): """Planing Surface consisting of unknown elements.""" count = 0 all = [] @classmethod def find_by_name(cls, name): """Return first planing surface matching provided name. Returns ------- PlaningSurface instance or None of no match found """ if not name == "": matches = [o for o in cls.all if o.patch_name == name] if len(matches) > 0: return matches[0] return None def __init__(self, dict_, **kwargs): PressurePatch.__init__(self) self.index = PlaningSurface.count PlaningSurface.count += 1 PlaningSurface.all.append(self) self.patch_name = dict_.read("substructureName", "") Nfl = dict_.read("Nfl", 0) self.point_spacing = dict_.read("pointSpacing", "linear") self.initial_length = dict_.read("initialLength", None) self.minimum_length = dict_.read("minimumLength", 0.0) self.maximum_length = dict_.read("maximum_length", float("Inf")) self.spring_constant = dict_.read("springConstant", 1e4) self.kutta_pressure = kwargs.get( "kuttaPressure", dict_.read_load_or_default("kuttaPressure", 0.0) ) self._upstream_pressure = kwargs.get( "upstreamPressure", dict_.read_load_or_default("upstreamPressure", 0.0) ) self.is_initialized = False self.is_active = True self.is_kutta_unknown = True self.is_sprung = dict_.read("isSprung", False) if self.is_sprung: self.initial_length = 0.0 self.minimum_length = 0.0 self.maximum_length = 0.0 self.pressure_elements += [ pe.ForwardHalfTriangularPressureElement(is_on_body=False) ] self.pressure_elements += [ pe.ForwardHalfTriangularPressureElement( is_source=True, pressure=self.kutta_pressure ) ] self.pressure_elements += [ pe.CompleteTriangularPressureElement() for __ in range(Nfl - 1) ] self.pressure_elements += [ pe.AftHalfTriangularPressureElement( is_source=True, pressure=self.upstream_pressure ) ] for el in self.pressure_elements: el.parent = self # Define point spacing if self.point_spacing == "cosine": self.pct = 0.5 * (1 - kp.cosd(np.linspace(0, 180, Nfl + 1))) else: self.pct = np.linspace(0.0, 1.0, Nfl + 1) self.pct /= self.pct[-2] self.pct = np.hstack((0.0, self.pct)) @property def upstream_pressure(self): if self.neighbor_up is not None: return self.neighbor_up.cushion_pressure else: return 0.0 @property def downstream_pressure(self): if self.neighbor_down is not None: return self.neighbor_down.cushion_pressure else: return 0.0 @upstream_pressure.setter def upstream_pressure(self, pressure): self._upstream_pressure = pressure el = self.pressure_elements[-1] el.pressure = pressure el.is_source = True el.z_coord = np.nan @PressurePatch.length.setter def length(self, length): length = np.max([length, 0.0]) x0 = self.interpolator.get_separation_point()[0] self._end_pts = [x0, x0 + length] self._reset_element_coords() def initialize_end_pts(self): """Initialize end points to be at separation point and wetted length root. """ self.base_pt = self.interpolator.get_separation_point()[0] if not self.is_initialized: if self.initial_length is None: self.length = self.interpolator.immersed_length - self.base_pt else: self.length = self.initial_length self.is_initialized = True def _reset_element_coords(self): """Set width of first element to be twice as wide.""" PressurePatch._reset_element_coords(self) x = self._get_element_coords() self.pressure_elements[0].width = x[2] - x[0] def _get_element_coords(self): """Get position of pressure elements.""" return self.pct * self.length + self._end_pts[0] def get_residual(self): """Get residual to drive first element pressure to zero.""" if self.length <= 0.0: return 0.0 else: return self.pressure_elements[0].pressure / config.flow.stagnation_pressure def calculate_forces(self): # TODO: Re-factor, this is messy." if self.length > 0.0: el = [el for el in self.pressure_elements if el.is_on_body] self.x = np.array([eli.x_coord for eli in el]) self.p = np.array([eli.pressure for eli in el]) self.p += self.pressure_elements[0].pressure self.shear_stress = np.array([eli.shear_stress for eli in el]) self.s = np.array([self.interpolator.getSFixedX(xx) for xx in self.x]) self.fP = interp1d(self.x, self.p, bounds_error=False, fill_value=0.0) self.fTau = interp1d( self.x, self.shear_stress, bounds_error=False, fill_value=0.0 ) AOA = kp.atand(self._get_body_derivative(self.x)) self.Dp = kp.integrate(self.s, self.p * kp.sind(AOA)) self.Df = kp.integrate(self.s, self.shear_stress * kp.cosd(AOA)) self.Lp = kp.integrate(self.s, self.p * kp.cosd(AOA)) self.Lf = -kp.integrate(self.s, self.shear_stress * kp.sind(AOA)) self.D = self.Dp + self.Df self.L = self.Lp + self.Lf self.M = kp.integrate( self.x, self.p * kp.cosd(AOA) * (self.x - config.body.xCofR) ) else: self.Dp = 0.0 self.Df = 0.0 self.Lp = 0.0 self.Lf = 0.0 self.D = 0.0 self.L = 0.0 self.M = 0.0 self.x = [] if self.is_sprung: self._apply_spring() self.calculate_wave_drag() def get_loads_in_range(self, x0, x1): """Return pressure and shear stress values at points between two arguments. """ # Get indices within range unless length is zero if self.length > 0.0: ind = np.nonzero((self.x > x0) * (self.x < x1))[0] else: ind = [] # Output all values within range if not ind == []: x = np.hstack((x0, self.x[ind], x1)) p = np.hstack((self.fP(x0), self.p[ind], self.fP(x1))) tau = np.hstack((self.fTau(x0), self.shear_stress[ind], self.fTau(x1))) else: x = np.array([x0, x1]) p = np.zeros_like(x) tau = np.zeros_like(x) return x, p, tau def _apply_spring(self): xs = self.pressure_elements[0].x_coord zs = self.pressure_elements[0].z_coord disp = zs - self.parent.get_free_surface_height(xs) Fs = -self.spring_constant * disp self.L += Fs self.M += Fs * (xs - config.body.xCofR) def _calculate_shear_stress(self): def shear_stress_func(xx): if xx == 0.0: return 0.0 else: Rex = config.flow.flow_speed * xx / config.flow.kinematic_viscosity return ( 0.332 * config.flow.density * config.flow.flow_speed ** 2 * Rex ** -0.5 ) x = self._get_element_coords()[0:-1] s = np.array([self.interpolator.getSFixedX(xx) for xx in x]) s = s[-1] - s for si, el in zip(s, self.pressure_elements): el.shear_stress = shear_stress_func(si) def _get_body_derivative(self, x, direction="r"): if isinstance(x, float): x = [x] return np.array( [ kp.getDerivative(self.interpolator.getBodyHeight, xx, direction) for xx in x ] ) PK!kVWVW"planingfsi/potentialflow/solver.py"""Fundamental module for constructing and solving planing potential flow problems """ from os.path import join import numpy as np from scipy.optimize import fmin import planingfsi.config as config # import planingfsi.krampy as kp # from planingfsi import io from planingfsi.potentialflow.pressurepatch import PlaningSurface from planingfsi.potentialflow.pressurepatch import PressureCushion if config.plotting.plot_any: import matplotlib.pyplot as plt class PotentialPlaningSolver(object): """The top-level object which handles calculation of the potential flow problem. Pointers to the planing surfaces and pressure elements are stored for reference during initialization and problem setup. The potential planing calculation is then solved during iteration with the structural solver. """ # Class method to handle singleton behavior __instance = None def __new__(cls): if cls.__instance is None: cls.__instance = object.__new__(cls) cls.__instance.init() return cls.__instance def init(self): # Not __init__. This method gets called by __new__ on the first # instance. self.X = None self.D = None self.Dw = None self.Dp = None self.Df = None self.L = None self.Lp = None self.Lf = None self.M = None self.p = None self.shear_stress = None self.xFS = None self.zFS = None self.x = None self.xBar = None self.xHist = None self.storeLen = None self.max_len = None self.planing_surfaces = [] self.pressure_cushions = [] self.pressure_patches = [] self.pressure_elements = [] self.lineFS = None self.lineFSi = None self.solver = None self.min_len = None self.init_len = None def add_pressure_patch(self, instance): """Add pressure patch to the calculation. Args ---- instance : PressurePatch The pressure patch object to add. Returns ------- None """ self.pressure_patches.append(instance) self.pressure_elements += [el for el in instance.pressure_elements] return None def add_planing_surface(self, dict_, **kwargs): """Add planing surface to the calculation from a dictionary file name. Args ---- dict_ : planingfsi.io.Dictionary The dictionary file. Returns ------- PlaningSurface Instance created from dictionary. """ kwargs["parent"] = self instance = PlaningSurface(dict_, **kwargs) self.planing_surfaces.append(instance) self.add_pressure_patch(instance) return instance def add_pressure_cushion(self, dict_, **kwargs): """Add pressure cushion to the calculation from a dictionary file name. Args ---- dict_ : planingfsi.io.Dictionary The dictionary file. Returns ------- PressureCushion Instance created from dictionary. """ kwargs["parent"] = self instance = PressureCushion(dict_, **kwargs) self.pressure_cushions.append(instance) self.add_pressure_patch(instance) return instance def get_planing_surface_by_name(self, name): """Return planing surface by name. Args ---- name : str Name of planing surface. Returns ------- PlaningSurface Planing surface object match """ l = [surf for surf in self.planing_surfaces if surf.patch_name == name] if len(l) > 0: return l[0] else: return None def print_element_status(self): """Print status of each element.""" for el in self.pressure_elements: el.print_element() return None def calculate_pressure(self): """Calculate pressure of each element to satisfy body boundary conditions. """ # Form lists of unknown and source elements unknownEl = [ el for el in self.pressure_elements if not el.is_source and el.width > 0.0 ] nanEl = [el for el in self.pressure_elements if el.width <= 0.0] sourceEl = [el for el in self.pressure_elements if el.is_source] # Form arrays to build linear system # x location of unknown elements X = np.array([el.x_coord for el in unknownEl]) # influence coefficient matrix (to be reshaped) A = np.array([el.get_influence_coefficient(x) for x in X for el in unknownEl]) # height of each unknown element above waterline Z = np.array([el.z_coord for el in unknownEl]) - config.flow.waterline_height # subtract contribution from source elements Z -= np.array([np.sum([el.get_influence(x) for el in sourceEl]) for x in X]) # Solve linear system dim = len(unknownEl) if not dim == 0: p = np.linalg.solve(np.reshape(A, (dim, dim)), np.reshape(Z, (dim, 1)))[ :, 0 ] else: p = np.zeros_like(Z) # Apply calculated pressure to the elements for pi, el in zip(p, unknownEl): el.pressure = pi for el in nanEl: el.pressure = 0.0 return None def get_residual(self, Lw): """Return array of residuals of each planing surface to satisfy Kutta condition. Args ---- Lw : np.ndarray Array of wetted lengths of planing surfaces. Returns ------- np.ndarray Array of trailing edge residuals of each planing surface. """ # Set length of each planing surface for Lwi, p in zip(Lw, self.planing_surfaces): p.length = np.min([Lwi, p.maximum_length]) # Update bounds of pressure cushions for p in self.pressure_cushions: p._update_end_pts() # Solve for unknown pressures and output residual self.calculate_pressure() res = np.array([p.get_residual() for p in self.planing_surfaces]) def array_to_string(array): return ", ".join(["{0:11.4e}".format(a) for a in array]).join("[]") print(" Lw: ", array_to_string(Lw)) print(" Residual:", array_to_string(res)) print() return res def calculate_response(self): """Calculate response, including pressure and free-surface profiles. Will load results from file if specified. """ if config.io.results_from_file: self.load_response() else: self.calculate_response_unknown_wetted_length() def calculate_response_unknown_wetted_length(self): """Calculate potential flow problem via iteration to find wetted length of all planing surfaces to satisfy all trailing edge conditions. """ # Reset results so they will be recalculated after new solution self.xFS = None self.X = None self.xHist = [] # Initialize planing surface lengths and then solve until residual is # *zero* if len(self.planing_surfaces) > 0: for p in self.planing_surfaces: p.initialize_end_pts() print(" Solving for wetted length:") if self.min_len is None: self.min_len = np.array( [p.minimum_length for p in self.planing_surfaces] ) self.max_len = np.array( [p.maximum_length for p in self.planing_surfaces] ) self.init_len = np.array( [p.initial_length for p in self.planing_surfaces] ) else: for i, p in enumerate(self.planing_surfaces): L = p.get_length() self.init_len[i] = p.initial_length if ~np.isnan(L) and L - self.min_len[i] > 1e-6: # and self.solver.it < self.solver.maxIt: self.init_len[i] = L * 1.0 else: self.init_len[i] = p.initial_length dxMaxDec = config.solver.wetted_length_max_step_pct_dec * ( self.init_len - self.min_len ) dxMaxInc = config.solver.wetted_length_max_step_pct_inc * ( self.init_len - self.min_len ) for i, p in enumerate(self.planing_surfaces): if p.initial_length == 0.0: self.init_len[i] = 0.0 if self.solver is None: self.solver = kp.RootFinder( self.get_residual, self.init_len * 1.0, config.solver.wetted_length_solver, xMin=self.min_len, xMax=self.max_len, errLim=config.solver.wetted_length_tol, dxMaxDec=dxMaxDec, dxMaxInc=dxMaxInc, firstStep=1e-6, maxIt=config.solver.wetted_length_max_it_0, maxJacobianResetStep=config.solver.wetted_length_max_jacobian_reset_step, relax=config.solver.wetted_length_relax, ) else: self.solver.maxIt = config.solver.wetted_length_max_it self.solver.reinitialize(self.init_len * 1.0) self.solver.setMaxStep(dxMaxInc, dxMaxDec) if any(self.init_len > 0.0): self.solver.solve() self.calculate_pressure_and_shear_profile() def plot_residuals_over_range(self): """Plot residuals.""" self.storeLen = np.array([p.get_length() for p in self.planing_surfaces]) xx, yy = list(zip(*self.xHist)) N = 10 L = np.linspace(0.001, 0.25, N) X = np.zeros((N, N)) Y = np.zeros((N, N)) Z1 = np.zeros((N, N)) Z2 = np.zeros((N, N)) for i, Li in enumerate(L): for j, Lj in enumerate(L): x = np.array([Li, Lj]) y = self.get_residual(x) X[i, j] = x[0] Y[i, j] = x[1] Z1[i, j] = y[0] Z2[i, j] = y[1] for i, Zi, seal in zip([2, 3], [Z1, Z2], ["bow", "stern"]): plt.figure(i) plt.contourf(X, Y, Zi, 50) plt.gray() plt.colorbar() plt.contour(X, Y, Z1, np.array([0.0]), colors="b") plt.contour(X, Y, Z2, np.array([0.0]), colors="b") plt.plot(xx, yy, "k.-") if self.solver.converged: plt.plot(xx[-1], yy[-1], "y*", markersize=10) else: plt.plot(xx[-1], yy[-1], "rx", markersize=8) plt.title("Residual for {0} seal trailing edge pressure".format(seal)) plt.xlabel("Bow seal length") plt.ylabel("Stern seal length") plt.savefig("{0}SealResidual_{1}.png".format(seal, config.it), format="png") plt.clf() self.get_residual(self.storeLen) def calculate_pressure_and_shear_profile(self): """Calculate pressure and shear stress profiles over plate surface.""" if self.X is None: if config.flow.include_friction: for p in self.planing_surfaces: p._calculate_shear_stress() # Calculate forces on each patch for p in self.pressure_patches: p.calculate_forces() # Calculate pressure profile if len(self.pressure_patches) > 0: self.X = np.sort( np.unique( np.hstack( [p._get_element_coords() for p in self.pressure_patches] ) ) ) self.p = np.zeros_like(self.X) self.shear_stress = np.zeros_like(self.X) else: self.X = np.array([-1e6, 1e6]) self.p = np.zeros_like(self.X) self.shear_stress = np.zeros_like(self.X) for el in self.pressure_elements: if el.is_on_body: self.p[el.x_coord == self.X] += el.pressure self.shear_stress[el.x_coord == self.X] += el.shear_stress # Calculate total forces as sum of forces on each patch for var in ["D", "Dp", "Df", "Dw", "L", "Lp", "Lf", "M"]: setattr( self, var, sum([getattr(p, var) for p in self.pressure_patches]) ) f = self.get_free_surface_height xo = -10.1 * config.flow.lam xTrough, = fmin(f, xo, disp=False) xCrest, = fmin(lambda x: -f(x), xo, disp=False) self.Dw = ( 0.0625 * config.flow.density * config.flow.gravity * (f(xCrest) - f(xTrough)) ** 2 ) # Calculate center of pressure self.xBar = kp.integrate(self.X, self.p * self.X) / self.L if config.plotting.show_pressure: self.plot_pressure() def calculate_free_surface_profile(self): """Calculate free surface profile.""" if self.xFS is None: xFS = [] # Grow points upstream and downstream from first and last plate for surf in self.planing_surfaces: if surf.length > 0: pts = surf._get_element_coords() xFS.append( kp.growPoints( pts[1], pts[0], config.plotting.x_fs_min, config.plotting.growth_rate, ) ) xFS.append( kp.growPoints( pts[-2], pts[-1], config.plotting.x_fs_max, config.plotting.growth_rate, ) ) # Add points from each planing surface fsList = [ patch._get_element_coords() for patch in self.planing_surfaces if patch.length > 0 ] if len(fsList) > 0: xFS.append(np.hstack(fsList)) # Add points from each pressure cushion xFS.append( np.linspace(config.plotting.x_fs_min, config.plotting.x_fs_max, 100) ) # for patch in self.pressure_cushions: # if patch.neighborDown is not None: # ptsL = patch.neighborDown._get_element_coords() # else: # ptsL = np.array([patch.endPt[0] - 0.01, patch.endPt[0]]) # # if patch.neighborDown is not None: # ptsR = patch.neighborUp._get_element_coords() # else: # ptsR = np.array([patch.endPt[1], patch.endPt[1] + 0.01]) # # xEnd = ptsL[-1] + 0.5 * patch.get_length() # xFS.append( # kp.growPoints(ptsL[-2], ptsL[-1], # xEnd, config.growthRate)) # xFS.append( # kp.growPoints(ptsR[1], ptsR[0], xEnd, config.growthRate)) # Sort x locations and calculate surface heights if len(xFS) > 0: self.xFS = np.sort(np.unique(np.hstack(xFS))) self.zFS = np.array(list(map(self.get_free_surface_height, self.xFS))) else: self.xFS = np.array([config.xFSMin, config.xFSMax]) self.zFS = np.zeros_like(self.xFS) def get_free_surface_height(self, x): """Return free surface height at a given x-position consideringthe contributions from all pressure patches. Args ---- x : float x-position at which free surface height shall be returned. Returns ------- float Free-surface position at input x-position """ return sum( [patch.get_free_surface_height(x) for patch in self.pressure_patches] ) def get_free_surface_derivative(self, x, direction="c"): """Return slope (derivative) of free-surface profile. Args ---- x : float x-position Returns ------- float Derivative or slope of free-surface profile """ ddx = kp.getDerivative(self.get_free_surface_height, x, direction=direction) return ddx def write_results(self): """Write results to file.""" # Post-process results from current solution # self.calculate_pressure_and_shear_profile() self.calculate_free_surface_profile() if self.D is not None: self.write_forces() if self.X is not None: self.write_pressure_and_shear() if self.xFS is not None: self.write_free_surface() def write_pressure_and_shear(self): """Write pressure and shear stress profiles to data file.""" if len(self.pressure_elements) > 0: kp.writeaslist( join( config.it_dir, "pressureAndShear.{0}".format(config.io.data_format) ), ["x [m]", self.X], ["p [Pa]", self.p], ["shear_stress [Pa]", self.shear_stress], ) def write_free_surface(self): """Write free-surface profile to file.""" if len(self.pressure_elements) > 0: kp.writeaslist( join(config.it_dir, "freeSurface.{0}".format(config.io.data_format)), ["x [m]", self.xFS], ["y [m]", self.zFS], ) def write_forces(self): """Write forces to file.""" if len(self.pressure_elements) > 0: kp.writeasdict( join(config.it_dir, "forces_total.{0}".format(config.io.data_format)), ["Drag", self.D], ["WaveDrag", self.Dw], ["PressDrag", self.Dp], ["FricDrag", self.Df], ["Lift", self.L], ["PressLift", self.Lp], ["FricLift", self.Lf], ["Moment", self.M], ) for patch in self.pressure_patches: patch.write_forces() def load_response(self): """Load results from file.""" self.load_forces() self.load_pressure_and_shear() self.load_free_surface() def load_pressure_and_shear(self): """Load pressure and shear stress from file.""" self.x, self.p, self.shear_stress = np.loadtxt( join(config.it_dir, "pressureAndShear.{0}".format(config.io.data_format)), unpack=True, ) for el in [ el for patch in self.planing_surfaces for el in patch.pressure_elements ]: compare = np.abs(self.x - el.get_xloc()) < 1e-6 if any(compare): el.set_pressure(self.p[compare][0]) el.set_shear_stress(self.shear_stress[compare][0]) for p in self.planing_surfaces: p.calculate_forces() def load_free_surface(self): """Load free surface coordinates from file.""" try: Data = np.loadtxt( join(config.it_dir, "freeSurface.{0}".format(config.io.data_format)) ) self.xFS = Data[:, 0] self.zFS = Data[:, 1] except IOError: self.zFS = np.zeros_like(self.xFS) return None def load_forces(self): """Load forces from file.""" K = kp.Dictionary( join(config.it_dir, "forces_total.{0}".format(config.io.data_format)) ) self.D = K.read("Drag", 0.0) self.Dw = K.read("WaveDrag", 0.0) self.Dp = K.read("PressDrag", 0.0) self.Df = K.read("FricDrag", 0.0) self.L = K.read("Lift", 0.0) self.Lp = K.read("PressLift", 0.0) self.Lf = K.read("FricLift", 0.0) self.M = K.read("Moment", 0.0) for patch in self.pressure_patches: patch.load_forces() def plot_pressure(self): """Create a plot of the pressure and shear stress profiles.""" plt.figure(figsize=(5.0, 5.0)) plt.xlabel(r"$x/L_i$") plt.ylabel(r"$p/(1/2\rho U^2)$") for el in self.pressure_elements: el.plot() plt.plot(self.X, self.p, "k-") plt.plot(self.X, self.shear_stress * 1000, "c--") # Scale y axis by stagnation pressure for line in plt.gca().lines: x, y = line.get_data() line.set_data( x / config.body.reference_length * 2, y / config.flow.stagnation_pressure, ) plt.xlim([-1.0, 1.0]) # plt.xlim(kp.minMax(self.X / config.Lref * 2)) plt.ylim( [0.0, np.min([1.0, 1.2 * np.max(self.p / config.flow.stagnation_pressure)])] ) plt.savefig( "pressureElements.{0}".format(config.figFormat), format=config.figFormat ) plt.figure(1) return None def plot_free_surface(self): """Create a plot of the free surface profile.""" self.calculate_free_surface_profile() if self.lineFS is not None: self.lineFS.set_data(self.xFS, self.zFS) endPts = np.array([config.plotting.x_fs_min, config.plotting.x_fs_max]) if self.lineFSi is not None: self.lineFSi.set_data( endPts, config.flow.waterline_height * np.ones_like(endPts) ) return None PK!H#Dc+planingfsi-0.1.0.dist-info/entry_points.txtN+I/N.,()JOK-J,IM-ΰ-IKO+δJ&fqA%܂= \PK!;66"planingfsi-0.1.0.dist-info/LICENSEMIT License Copyright (c) 2018 Matthew Robert Kramer Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HnHTU planingfsi-0.1.0.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!Hl #planingfsi-0.1.0.dist-info/METADATAVmO7_1UD"PS^헨R|[k/\wf_ҪTB<̘T*~k'jp ުִ*q/='XQ[[MYܤua& J'BX6%ҹQhbϮ« 9Sq9MZg$]I1mj{7H*V":xW`]q'[cHrc~ڰPj׈Og6cqݑ(?䰇GϞʜ4 Հ!P l;L6"W@*GM٧"8-DF)nLqOAeBL=0% C6@]%.!GS^IK %)ڝvRa8yLv׊l%O*ѹVwH6C4[=ܐn88JWj\B<TLYRaJ9dH:[7+b w\ e̸#αiV(TSxm4%-}oB>~jF֮٘qcA츣,jeh8>Y딣{wWsAtױfΫCj"ɞ^+.ƶ{r*[!nǾpal3A y&_HaRgMt_O*GP徼cP7M%SiFԪ6ohE(=S`H,7{qpɿiW0N *O,%_PI 4_4[TK2ӳj`)RDtv`CUO=^9K$r.EV"gg U7ˑ 㜍=^ ƆvC2{3< mT=ļ7Un{SՆ$y wCRlK}&R\囅˧zB|EЉ$%/PK!HU|$!planingfsi-0.1.0.dist-info/RECORDչXm%BBlbX B=vjj:2Ag=C| B d?6bdR$LXM[QDoV{] zN|8R]m>rmN`(#gUixp{ƂRv'R}S nK7*S-qp s})QR-E*yMHB"a_wZ/s< Z*Hcaq&*45 #!ڦ n3XSYTGT+SM8rF I3ۃj\lPuw}U/$92,1aQA8oĐ,EfHeV6wҾ_wk`P]N\IWRELe^9zU6)r4AyHai܁]/":i.F{ʒ4}S_m) 0t$=j^-֘早lZ>OuUpr2|Gu|_uK6k=0?g:XV,^vk qg@G bٯ&G^LڒE}('IesfPA՟M.sxȏegK5L]lN3V:5M~ŞD8!84vlܤRIYvV-&!7 ȗEAPK!zplaningfsi/__init__.pyPK!8planingfsi/__version__.pyPK!LYYplaningfsi/cli.pyPK!88 planingfsi/config.pyPK!`Aplaningfsi/fe/__init__.pyPK!(VF  Aplaningfsi/fe/felib.pyPK!$5$5Tplaningfsi/fe/femesh.pyPK!0planingfsi/fe/structure.pyPK!|Pplaningfsi/fsi/__init__.pyPK!'&1&1Pplaningfsi/fsi/figure.pyPK!T planingfsi/fsi/interpolator.pyPK!,!!planingfsi/fsi/simulation.pyPK!$ٮplaningfsi/potentialflow/__init__.pyPK!Y55+planingfsi/potentialflow/pressureelement.pyPK!JvLvL)planingfsi/potentialflow/pressurepatch.pyPK!kVWVW"1planingfsi/potentialflow/solver.pyPK!H#Dc+hplaningfsi-0.1.0.dist-info/entry_points.txtPK!;66"planingfsi-0.1.0.dist-info/LICENSEPK!HnHTU kplaningfsi-0.1.0.dist-info/WHEELPK!Hl #planingfsi-0.1.0.dist-info/METADATAPK!HU|$!planingfsi-0.1.0.dist-info/RECORDPK1