PK!}ZZkrampy/__init__.py"""krampy provides useful utilities for other Python projects.""" import logging VERSION = (0, 1, 1) __version__ = ".".join(map(str, VERSION)) logger = logging.getLogger("krampy") logger.setLevel("INFO") from .iterator import Iterator # from solver import RootFinder, fzero # from trigonometry import * # from general import * # import unit PK!DV--krampy/general.pyimport errno import os import sys from fnmatch import fnmatch import numpy as np """General utilities.""" import errno import os import re from fnmatch import fnmatch import numpy def mkdir_p(path): """Create directory if it doesn't exist""" try: os.makedirs(path) except OSError as exc: if exc.errno == errno.EEXIST and os.path.isdir(path): pass else: raise def slugify(value): """Normalize a string, convert to lowercase, remove non-alpha characters, and convert spaces to hypens. Inspired by Django: https://stackoverflow.com/questions/295135/turn-a-string-into-a-valid-filename. """ value = re.sub(r"[^\w\s\.-]", "", value).strip() value = re.sub(r"[-\s]+", "-", value) return value def camel_to_snake(camel_string): """Convert a CamelCase string to snake_string format.""" first_cap_re = re.compile(r"(.)([A-Z][a-z]+)") all_cap_re = re.compile(r"([a-z0-9])([A-Z])") sub_1 = first_cap_re.sub(r"\1_\2", camel_string) return all_cap_re.sub(r"\1_\2", sub_1).lower() def force_extension(filename, ext): """Force a file extension to be replaced in fileName""" # Remove existing extension split = filename.split(".") if len(split) > 1: filename = ".".join(split[:-1]) return "".join((filename, ext)) def match(match_list, search_str, startindex=0, ignore=None, **kwargs): """Return the first index of an item in provided iterable where the search string is matched.""" for i, val in enumerate(match_list[startindex:], start=startindex): if fnmatch(str(val), search_str) and i >= startindex: if val is not None and not val == ignore: return i if "default" in kwargs: return kwargs.get("default") raise ValueError("Search string {0} not found".format(search_str)) def index_match(index_list, match_list, search_str, **kwargs): """Index an array by matching a wildcard string in another list. Parameters ---------- index_list : list of object The list to return a value from. match_list : list of str The list containing strings to match with the wildcard. searc_str : str A wildcard string to match. Returns ------- object The first object in the index list that matches the wildcard. """ index = match(match_list, search_str, **kwargs) if not isinstance(index, int): return kwargs.get("default") return index_list[index] # def str2bool(str): # return str.lower() == 'true' # # # def writeTimeHistoryFile(fileName, *args, **kwargs): # delimiter = kwargs.get('delimiter', '\t') # ff = open(fileName, 'w') # # Write header # ff.write('# ') # for arg in args: # if not arg[0] == '': # ff.write(arg[0]) # if arg == args[-1]: # ff.write('\n') # else: # ff.write(delimiter) # # Write each row of values, and each arg is a different column # for i in range(len(args[0][1])): # for arg in args: # ff.write('{0:0.6e}'.format(arg[1][i])) # if arg == args[-1]: # ff.write('\n') # else: # ff.write(delimiter) # ff.close() # # # def resampleTime(t, y, dt): # tNew = np.linspace(t[0], t[-1], int((t[-1] - t[0]) / dt)) # yNew = np.zeros_like(tNew) # for i in range(len(tNew) - 1): # ind = np.nonzero(t <= tNew[i])[0][-1] # # yNew[i] = y[ind] + (y[ind + 1] - y[ind]) * (tNew[i] - t[ind]) / (t[ind + 1] - t[ind]) # # return tNew, yNew # # # def deriv(f, x): # ''' # Calculate derivative of function f at points x using central differencing, # except at the ends where one-sided differencing is used. # ''' # dfdx = np.zeros_like(x) # for i in range(len(x)): # if i == 0: # ind = [0, 1] # elif i == len(x) - 1: # ind = [-1, 0] # else: # ind = [-1, 1] # dfdx[i] = (f[i + ind[1]] - f[i + ind[0]]) / (x[i + ind[1]] - x[i + ind[0]]) # return dfdx # # # def mkdir_p(path): # '''Create directory if it doesn't exist''' # try: # os.makedirs(path) # except OSError as exc: # if exc.errno == errno.EEXIST and os.path.isdir(path): # pass # else: # raise # # # def symlink_f(source, target): # '''Force-create symlink at target pointing to soirce''' # try: # os.symlink(source, target) # except OSError: # os.remove(target) # os.symlink(source, target) # except: # raise OSError('Failed to create symlink') # # # def cumtrapz(x, y, reverse=False): # ''' Cumulative trapezoidal integration of a function.''' # if reverse: # x = x[::-1] # y = y[::-1] # # I = np.zeros_like(x) # area = 0.5 * np.diff(x) * (y[1:] + y[:-1]) # for i in range(1, len(I)): # I[i] = I[i - 1] + area[i - 1] # # if reverse: # I = I[::-1] # # return I # # # def parseKeywordArgs(args=sys.argv[1:]): # return parseArgs(args)[1] # # # def parseArgs(args=sys.argv[1:]): # '''Parse argument list, typically system arguments from calling script as executable. # ''' # newargs = [] # kwargs = {} # while len(args) > 0: # arg = args.pop(0) # if '=' in arg: # key, _, val = arg.partition('=') # if arg == 'True': # kwargs[key] = True # elif val == 'False': # kwargs[key] = False # else: # try: # kwargs[key] = float(val) # except: # kwargs[key] = val # else: # newargs.append(arg) # return tuple(newargs), kwargs # # def match(matchList, searchStr, startindex=0, ignore=None, **kwargs): # '''Return the first index of an item in provided iterable where the search string is matched.''' # for i, val in enumerate(matchList[startindex:], start=startindex): # if fnmatch(str(val), searchStr) and i >= startindex: # if val is not None and not val == ignore: # return i # if 'default' in kwargs: # return kwargs.get('default') # raise ValueError('Search string {0} not found'.format(searchStr)) # # # def indexMatch(indexList, matchList, searchStr, **kwargs): # '''Index an array indexList by matching searchStr with the first matching element of searchList.''' # index = match(matchList, searchStr, **kwargs) # if not isinstance(index, int): # return kwargs.get('default') # return indexList[index] # def trapz(x, f): # # Trapezoidal integration # I = np.argsort(x) # x = x[I] # f = f[I] # f[np.nonzero(np.abs(f) == float('Inf'))] = 0.0 # # return 0.5 * np.sum((x[1:] - x[0:-1]) * (f[1:] + f[0:-1])) # # # def integrate(x, f): # return trapz(x, f) # # # def growPoints(x0, x1, xMax, rate=1.1): # dx = x1 - x0 # x = [x1] # # if dx > 0: # done = lambda xt: xt > xMax # elif dx < 0: # done = lambda xt: xt < xMax # else: # done = lambda xt: True # # while not done(x[-1]): # x.append(x[-1] + dx) # dx *= rate # # return np.array(x[1:]) # # # def fillPoints(x0, x1, L, pctLast, targetRate=1.1): # dxLast = pctLast * L # x2 = x1 + np.sign(x1 - x0) * (L - 0.5 * dxLast) # # func = lambda rate: growPoints(x0, x1, x2, rate) # res1 = lambda rate: np.abs(np.diff(func(rate)[-2:])[0]) - dxLast # res2 = lambda rate: func(rate)[-1] - x2 # # return func(fzero(res2, fzero(res1, targetRate))) # # # def getDerivative(f, x, direction='c'): # dx = 1e-6 # fr = f(x + dx) # fl = f(x - dx) # # if direction[0].lower() == 'r' or np.isnan(fl): # return (fr - f(x)) / dx # elif direction[0].lower() == 'l' or np.isnan(fr): # return (f(x) - fl) / dx # else: # return (f(x + dx) - f(x - dx)) / (2 * dx) # # # def rotatePt(oldPt, basePt, ang): # relPos = np.array(oldPt) - np.array(basePt) # newPos = rotateVec(relPos, ang) # newPt = basePt + newPos # # return newPt # # # def forceExtension(fileName, ext): # '''Force a file extension to be replaced in fileName''' # # Remove existing extension # split = fileName.split('.') # if len(split) > 1: # fileName = '.'.join(split[:-1]) # # return ''.join((fileName, ext)) # # # def getFiles(directory, pattern): # '''Function returns files in data directory matching provided wildcard pattern''' # return [os.path.join(directory, f) for f in os.listdir(directory) if fnmatch(f, pattern)] # def extrap1d(interpolator): # xs = interpolator.x # ys = interpolator.y # # def pointwise(x): # if x < xs[0]: # return ys[0] + (x - xs[0]) * (ys[1] - ys[0]) / (xs[1] - xs[0]) # elif x > xs[-1]: # return ys[-1] + (x - xs[-1]) * (ys[-1] - ys[-2]) / (xs[-1] - xs[-2]) # else: # return interpolator(x) # # return pointwise # # def minMax(x): # return min(x), max(x) # # def sign(x): # if x > 0: # return 1.0 # elif x < 0: # return -1.0 # else: # return 0.0 # # def heaviside(x): # if x > 0: # return 1.0 # elif x < 0: # return 0.0 # else: # return 0.5 # # def inRange(x, lims): # if x >= lims[0] and x <= lims[1]: # return True # else: # return False # # def createIfNotExist(dirName): # if not os.path.exists(dirName): # os.makedirs(dirName)#, 0755) # # def writeasdict(filename, *args, **kwargs): # dataFormat = kwargs.get('dataFormat', '>+10.8e') # ff = open(filename, 'w') # for name, value in args: # ff.write('{2:{0}} : {3:{1}}\n'.format('<14', dataFormat, name, value)) # ff.close() # # def writeaslist(filename, *args, **kwargs): # headerFormat = kwargs.get('headerFormat', '<15') # dataFormat = kwargs.get('dataFormat', '>+10.8e') # ff = open(filename, 'w') # write(ff, headerFormat, [item for item in [arg[0] for arg in args]]) # for value in zip(*[arg[1] for arg in args]): # write(ff, dataFormat, value) # ff.close() # # def write(ff, writeFormat, items): # if isinstance(items[0], str): # ff.write('# ') # else: # ff.write(' ') # ff.write(''.join('{1:{0}} '.format(writeFormat, item) for item in items) + '\n') # # def sortDirByNum(dirStr, direction='forward'): # num = np.array([float(''.join(i for i in dir.lower() if i.isdigit() or i == '.').strip('.')) for dir in dirStr]) # if direction == 'reverse': # ind = np.argsort(num)[::-1] # else: # ind = np.argsort(num) # return [dirStr[i] for i in ind], num[ind] # # def getFG(x): # txMax = 5.0 # N = 20 # # pt = np.array([-1., 0., 1.]) * np.sqrt(3./5.) # w = np.array([5., 8., 5.]) / 9 # # t = np.linspace(0.0, txMax / x, N + 1) # dt = 0.5 * (t[1] - t[0]) # # f = 0. # g = 0. # for i in range(N): # ti = t[i] + dt * (pt + 1) # F = w * dt * np.exp(-ti * x) / (ti**2 + 1) # # f += np.sum(F) # g += np.sum(F * ti) # # return f, g # # def ensureDict(Dict): # if isinstance(Dict, str): # Dict = Dictionary(Dict) # return Dict # # def rm_rf(dList): # dList = map(None, dList) # for d in dList: # for path in (os.path.join(d,f) for f in os.listdir(d)): # if os.path.isdir(path): # rm_rf(path) # else: # os.unlink(path) # os.rmdir(d) # # def checkDir(f): # if not os.path.isdir(f): # os.makedirs(f) # # def listdirNoHidden(d): # f = os.listdir(d) # rmInd = [] # for i in range(len(f)): # if f[i].startswith('.'): # rmInd.append(i) # for i in range(len(rmInd)): # del f[rmInd[i]] # return f PK!5,,krampy/io_old.py"""A module holding a class to extend the built-in dictionary. The Dictionary class is a subclass of the built-in dict type, and contains methods for loading and instantiating the dictionary from text files. Dictionary file import relies on converting the input file to json format, then creating a dict object from the json string and performing some smart type conversion for dictionary keys and values. The input dictionary should be similar in format to a json dictionary, with the following allowable modifications: - Comma delimiters are not necessary between lines, as they will be automatically added - The file does not need to begin and end with curly brackets - Strings do not need to be surrounded by quotes, although they may be to force import as a string The load methods us regular expression patterns defined in the Pattern class to parse the tokens in the file. Patterns for numbers, words, None, NaN, Inf, and environment variables exist. """ import json import os import re class Pattern(object): """A small helper object for storing Regex patterns. Class Attributes ---------------- DELIMITER : SRE_Pattern Any single character of the following set (in parens): (:,{}[]) NUMBER : SRE_Pattern Any number, including int, float, or exponential patterns. Will only match if entire string is number. LITERAL : SRE_Pattern Any normal chars or path delimiters (/\.) surrounded by single- or double-quotes BOOL : SRE_Pattern Case-insensitive boolean values, True or False NONE : SRE_Pattern Case-insensitive None WORD : SRE_Pattern A continuous string of normal chars, including plus and minus signs ENV : SRE_Pattern A continuous string of normal chars preceded by a dollar sign, $ NANINF : SRE_Pattern Case-insensitive NaN or Inf, optionally with sign Usage ----- Pattern.NUMBER.match('1.455e+10') will return a re.MatchObject """ DELIMITER = re.compile(r"[:,\{\}\[\]]") NUMBER = re.compile(r"\A[-+]?(\d+(\.\d*)?|\.\d+)([eE][-+]?\d+)?\Z") LITERAL = re.compile(r"(\"[\w\\\/\.]*\")|(\'[\w\\\/\.]*\')") BOOL = re.compile(r"(True|False)", re.IGNORECASE) NONE = re.compile(r"(None)", re.IGNORECASE) WORD = re.compile(r"[-+\w]+") ENV = re.compile(r"\$(\w+)") NANINF = re.compile(r"[+-]?(nan|inf)", re.IGNORECASE) class Dictionary(dict): """An file-based extension of the standard dict object Dictionaries can be easily read from files, which may also depend themselves on recursive loading of sub-Dictionaries. Parameters ---------- construct_from : str, dict, or None, optional, default=None If a dict is provided, the values are copied into this instance. If a string is provided which starts with a curly bracket "{", the string is parsed via the load_from_string method, which aims to be "smart" in cleaning the string into json format and subsequently loading it as a dictionary. If the string does not begin with a curly bracket, it is taken to be a filename, which is in-turn loaded as a string and passed to the load_from_string method. If no value or None is provided, a blank dictionary is instantiated. """ def __init__(self, construct_from=None): source_file = None # If no argument, just call default dict constructor if construct_from is None: super().__init__() return # Call appropriate function to load dictionary values if isinstance(construct_from, dict): self.load_from_dict(construct_from) elif isinstance(construct_from, str): if construct_from.strip().startswith("{"): self.load_from_string(construct_from) else: source_file = construct_from self.load_from_file(construct_from) else: raise ValueError("Argument to Dictionary must be string, dict, or None") # If specified, read values from a base dictionary # All local values override the base dictionary values base_dict_dir = self.read("baseDict", default=None) if base_dict_dir is not None: # Allow for loading dictionary from different directory by tracing # relative references from original file directory if base_dict_dir.startswith("."): if source_file is not None: # Begin from the directory the source_file lies in dir_name = os.path.dirname(source_file) else: # Otherwise, use the current directory dir_name = "." base_dict_dir = os.path.join(dir_name, base_dict_dir) base_dict = Dictionary(base_dict_dir) # Use values in base_dict if they don't exist in this Dictionary for key, val in base_dict.items(): if not key in self: self.update({key: val}) def load_from_file(self, filename): """Load Dictionary information from a text file. Arguments --------- filename : str Name or path of file to load. """ # Convert file format to appropriate string, then load dict from the string dict_list = [] with open(filename) as ff: for line in ff: # Remove comment strings, everything after # discarded line = line.split("#")[0].strip() if line != "": dict_list.append(line) dict_string = ",".join(dict_list) self.load_from_string(dict_string) def load_from_string(self, in_string): """Load Dictionary information from a string. Convert the string to a json-compatible string, then pass to load_from_json. Arguments --------- in_string : str String to parse. """ # Surround string with curly brackets if it's not already if not in_string.startswith("{"): in_string = in_string.join("{}") # Replace multiple consecutive commas with one in_string = re.sub(",,+", ",", in_string) # Replace [, or {, or ,] or ,} with bracket only def repl(m): return m.group(1) in_string = re.sub(r"([\{\[]),", repl, in_string) in_string = re.sub(r",([\]\}])", repl, in_string) in_string = in_string.replace("}{", "},{") # Replace environment variables with their value & surround by quotes def repl(m): return os.environ[m.group(1)].join('""') in_string = Pattern.ENV.sub(repl, in_string) # Split in_string into a list of delimiters and sub-strings in_list = [] while len(in_string) > 0: in_string = in_string.strip() match = Pattern.DELIMITER.search(in_string) if match: # Process any words before the delimiter if match.start() > 0: word = in_string[: match.start()].strip() if Pattern.LITERAL.match(word): # Surround literals by escaped double-quotes word = word[1:-1].join(('\\"', '\\"')).join('""') elif Pattern.BOOL.match(word): # Convert boolean values to lowercase word = word.lower() elif Pattern.NUMBER.match(word): pass elif Pattern.WORD.match(word): # Surround words by double-quotes word = word.join('""') else: # Surround by literal double-quotes, and regular ones word = word.join(('\\"', '\\"')).join('""') in_list.append(word) in_list.append(match.group(0)) in_string = in_string[match.end() :] else: # No more delimiters break json_string = "".join(in_list) self.load_from_json(json_string) def load_from_json(self, json_string): """Load Dictionary information from a json-formatted string. The string is first loaded by the json module as a dict, then passed to load_from_dict. Arguments --------- json_string : str String to parse. """ try: dict_ = json.loads(json_string) except: raise ValueError("Error converting string to json: {0}".format(json_string)) self.load_from_dict(dict_) def load_from_dict(self, dict_): """Load Dictionary information from a standard dict. The values are copied to this Dictionary and parsed, with special handling of certain variables. Arguments --------- dict_ : dict Dictionary to copy. """ for key, val in dict_.items(): if isinstance(val, str): # Process NaN and infinity match = Pattern.NANINF.match(val) if match: val = float(match.group(0)) elif Pattern.LITERAL.match(val): # Remove quotes from literal string val = val[1:-1] elif Pattern.NONE.match(val): # Convert None string to None val = None # Evaluate if unit. is in the string elif "unit." in val: try: val = eval(val) except: ValueError("Cannot process the value {0}: {1}".format(key, val)) elif isinstance(val, dict): val = Dictionary(val) # Remove quotes from key if Pattern.LITERAL.match(key): key = key[1:-1] self.update({key: val}) def read(self, key, default=None, type_=None): """Load value from dictionary. Arguments --------- key : str Dictionary key. default : optional, default=None Default value if key not in Dictionary. type_ : optional, default=No conversion Type to convert dictionary value to. """ val = self.read_or_default(key, default) if type_ is not None: val = type_(val) return val def read_or_default(self, key, default): """Load value from dictionary, or return default if not found. Arguments --------- key : str Dictionary key. default : Default value if key not in Dictionary. """ return self.get(key, default) def read_load_or_default(self, key, default): """Load value from dictionary. If value is a string, will load attribute of string name from config. Arguments --------- key : str Dictionary key. default : Default value if key not in Dictionary. """ val = self.read(key, default) if isinstance(val, str): # Here because if in preamble there is # a circular import from planingfsi import config val = getattr(config, val) return val PK!Sp!p!krampy/iotools/IOTools.pyimport csv import os import sys import xlwt import xlrd import xlutils.copy as xlcp import krampy as kp class OutputWorkbook: def __init__(self, name, unitDict=None): self.name = name if os.path.exists(name): self.xlObj = xlrd.open_workbook(name) self.sheetNames = self.xlObj.sheet_names() self.sheetNumCells = [(s.nrows, s.ncols) for s in self.xlObj.sheets()] self.xlObj = xlcp.copy(self.xlObj) else: self.sheetNames = [] self.xlObj = xlwt.Workbook() self.sheets = [] self.unitDict = unitDict def addSheet(self, sheetName): sheet = OutputWorksheet(self, sheetName, self.unitDict) self.sheets.append(sheet) return sheet def save(self): try: for i in range(10): if i == 0: newFileName = self.name else: newFileName = "{0}_{1}.xls".format(self.name.replace(".xls", ""), i) try: self.xlObj.save(newFileName) print("Output workbook saved to {0}".format(newFileName)) return None except: i += 0 except: raise NameError("Cannot find an appropriate output file path.") class OutputWorksheet: def __init__(self, parent, name, unitDict=None): if not unitDict: unitDict = {"nd": "-"} self.name = name # If sheet already exists, load it, otherwise create it if self.name in parent.sheetNames: ind = parent.sheetNames.index(self.name) self.xlObj = parent.xlObj.get_sheet(ind) for i in range(parent.sheetNumCells[ind][0]): for j in range(parent.sheetNumCells[ind][1]): self.xlObj.write(i, j, "") else: self.xlObj = parent.xlObj.add_sheet(self.name) self.activeRow = 0 self.unitDict = unitDict def getHeaderString(self, header): if not isinstance(header, str): name = header[0] unit = header[1] else: name = header unit = "nd" return "{0} [{1}]".format(name, self.unitDict.get(unit, unit)) def writeHeaders(self, *args): self.writeHeader(*args) def writeHeader(self, *args): self.writeRow(*[self.getHeaderString(arg) for arg in args]) def writeRow(self, *values): # Write values to the next row activeCol = 0 for value in values: if not hasattr(value, "__iter__"): value = [value] for valuei in value: self.xlObj.write(self.activeRow, activeCol, valuei) activeCol += 1 self.activeRow += 1 def writeCell(self, indx, indy, value): self.xlObj.write(indx, indy, value) class OutputCSV: def __init__(self, name, unitDict=None): self.name = name self.fObj = open(name, "wb") self.writer = csv.writer(self.fObj, delimiter=",") self.activeRow = 0 if not unitDict: self.unitDict = {"nd": "-"} else: self.unitDict = unitDict def __enter__(self): return self def __exit__(self, type, value, traceback): self.close() return isinstance(value, TypeError) def save(self): self.close() def close(self): self.fObj.close() def getHeaderString(self, header): if not isinstance(header, str): name = header[0] unit = header[1] else: name = header unit = "nd" return "{0} [{1}]".format(name, self.unitDict.get(unit, unit)) def writeHeaders(self, *args): self.writeHeader(*args) def writeHeader(self, *args): self.writeRow(*[self.getHeaderString(arg) for arg in args]) def writeRow(self, *values): # Concatenate all arguments into one list and write row to file # Stacks each value as if they were lists combined writeList = [] for value in values: if hasattr(value, "__iter__"): writeList += [v for v in value] else: writeList += [value] self.writer.writerow(writeList) class CombinedOut: def __init__(self, *files): self.files = files def write(self, obj): for f in self.files: f.write(obj) class LogFile: def __init__(self, *fileNames, **kwargs): dirName = kwargs.get("dirName", ".") kp.mkdir_p(dirName) self.files = [open(os.path.join(dirName, ff), "w") for ff in fileNames] sys.stdout = CombinedOut(sys.stdout, *self.files) sys.stderr = CombinedOut(sys.stderr, *self.files) def close(self): for ff in self.files: ff.close() class TimestampedLogFile(LogFile): def __init__(self, baseName, startTime, **kwargs): timeString = startTime.replace(microsecond=0).isoformat().replace(":", "-") LogFile.__init__(self, "{0}_{1}.dat".format(baseName, timeString), **kwargs) def writeRowToFile(f, *values): # Write values to the next row activeCol = 0 for i, value in enumerate(values): if not hasattr(value, "__iter__"): value = [value] for j, valuei in enumerate(value): if i > 0 or j > 0: f.write(", ") f.write("{0}".format(valuei)) activeCol += 1 f.write("\n") def getLineCSV(f): return [float(s.replace(" ", "")) for s in f.readline().split(",")] def _getOutCell(outSheet, rowIndex, colIndex): """ HACK: Extract the internal xlwt cell representation. """ row = outSheet._Worksheet__rows.get(rowIndex) if not row: return None cell = row._Row__cells.get(colIndex) return cell def setOutCell(outSheet, row, col, value): """ Change cell value without changing formatting. """ previousCell = _getOutCell(outSheet, row, col) outSheet.write(row, col, value) if previousCell: newCell = _getOutCell(outSheet, row, col) if newCell: newCell.xf_idx = previousCell.xf_idx # Thread-safe printing def safePrint(s=""): print("{0}\n".format(s)) # Print functions for various sections, etc. HEADER_WIDTH = 100 TEXT_WIDTH = HEADER_WIDTH - 6 def printHeaderLine(s=""): if len(s) > TEXT_WIDTH: printHeaderLine(s[:TEXT_WIDTH]) printHeaderLine(s[TEXT_WIDTH:]) else: safePrint("|| {0:{1}s} ||".format(s, TEXT_WIDTH)) def printSpacerLine(): safePrint("=" * HEADER_WIDTH) def printSubsection(s): safePrint() printSpacerLine() safePrint(s) printSpacerLine() safePrint() def printHeader(*args): """ Print header. Each argument will be printed on its own line. A value of None provided will be printed as a blank line. A spacer line is printed before and after the provided arguments. """ printSpacerLine() for arg in args: if arg is not None: printHeaderLine(arg) else: printHeaderLine() printSpacerLine() # Function to convert a list to a string (for printing) def list2str(l, formatString=""): s = "[" for i, li in enumerate(l): s += "{0:{1}}".format(li, formatString) if i < len(l) - 1: s += ", " s += "]" return s # def selectFile(*filetypes, **kwargs): # from Tkinter import Tk # from tkFileDialog import askopenfilename, askopenfilenames # # title = kwargs.get('title', 'Please select file(s)') # root = Tk() # root.withdraw() # filename = askopenfilename(title=title, # filetypes=[filetype for filetype in filetypes]+[('All files', '*.*')]) # # # Error if no file selected # if len(filename) == 0: # raise NameError('Please select one or more appropriate files') # # return filename # # def selectFiles(*filetypes, **kwargs): # from Tkinter import Tk # from tkFileDialog import askopenfilename, askopenfilenames # # title = kwargs.get('title', 'Please select file(s)') # root = Tk() # root.withdraw() # filenames = root.tk.splitlist(askopenfilenames(title=title, # filetypes=[filetype for filetype in filetypes]+[('All files', '*.*')])) # # # Error if no file selected # if len(filenames) == 0: # raise NameError('Please select one or more appropriate files') # return filenames PK!N<  krampy/iotools/InputSheet.pyimport sys import xlrd import krampy as kp # A class to automatically and flexibly collect the data from a row of an input excel sheet class InputRow: def storeValue(self, varName, value): if value == '': value = None elif isinstance(value, str): # one-length unicode strings if value.lower() == 'y': value = True elif value.lower() == 'n': value = False setattr(self, varName, value) class InputSheet: def __init__(self, bookName, sheetName, varList, **kwargs): if isinstance(varList, dict): self.varList = varList else: self.varList = {} for var in varList: if (isinstance(var, tuple) or isinstance(var, list)) and len(var) == 3: self.varList[var[0]] = (var[1], var[2]) else: self.varList[var[0]] = var[1] self.numHeaderRows = kwargs.get('num_header_rows', kwargs.get('numHeaderRows', 1)) self.headerMatchRow = kwargs.get('header_match_row', kwargs.get('headerMatchRow', 0)) self.rowList = [] self.numRows = 0 self.defaultValue = kwargs.get('default', None) self.loadSheet(bookName, sheetName) def __iter__(self): for row in self.rowList: yield row def __getitem__(self, index): return self.rowList[index] def getRowByMatch(self, attributeName, value): return self[kp.match(self.getList(attributeName), value)] def getRowValue(self, indexVar, matchVar, matchValue): return kp.indexMatch(self.getList(indexVar), self.getList(matchVar), matchValue) def getList(self, varName): return [getattr(row, varName) for row in self.rowList] def getColNum(self, wildcard, default=None): try: return kp.match(self.header, wildcard) except (ValueError, IndexError): if default is not None: return default else: print 'Could not match wildcard {0} to a column'.format(wildcard) return None def loadSheet(self, bookName, sheetName): if isinstance(bookName, xlrd.Book): book = bookName else: book = xlrd.open_workbook(bookName) if sheetName in book.sheet_names(): sheet = book.sheet_by_name(sheetName) # Store header self.header = sheet.row_values(self.headerMatchRow) # Create a temporary dictionary where each entry is a column self.numRows = sheet.nrows - self.numHeaderRows tmpDict = {} for key, val in self.varList.iteritems(): if isinstance(val, tuple): val = val[0] colNum = self.getColNum(val, default=sys.maxint) if colNum is not None and colNum < sys.maxint: tmpDict[key] = sheet.col_values(colNum, start_rowx=self.numHeaderRows) else: tmpDict[key] = [self.defaultValue for _ in range(self.numRows)] # Add rows to row list for i in range(self.numRows): row = InputRow() for key, val in self.varList.iteritems(): if isinstance(val, tuple): row.storeValue(key, map(val[1], [tmpDict[key][i]])[0]) else: row.storeValue(key, tmpDict[key][i]) self.rowList.append(row) else: print 'Sheet "{0}" does not exist in workbook "{1}". Skipping import.'.format(sheetName, bookName) PK!]krampy/iotools/__init__.py# from IOTools import * # from InputSheet import InputSheet from .dictionary import Dictionary, load_dict_from_file # from Stopwatch import Stopwatch PK!WEqqkrampy/iotools/dictionary.pyimport json import os import re import warnings from collections import UserDict from krampy import logger def replace_single_quotes_with_double_quotes(string): """Replace all single-quoted strings with double-quotes.""" def repl(m): return m.group(1).join('""') return re.sub(r"'(.+?)'", repl, string) def replace_environment_variables(string): """Replace environment variables with their value.""" def repl(m): return os.environ[m.group(1)] return re.sub("\$(\w+)", repl, string) def add_quotes_to_words(string): """Find words inside a string and surround with double-quotes.""" quoted_pattern = re.compile('(".+?")') word_pattern = re.compile("([\w.-]+)") # Any number, integer, float, or exponential number_pattern = re.compile(r"[-+]?[0-9]*\.?[0-9]+([eE][-+]?[0-9]+)?") matches = quoted_pattern.split(string) def repl(m): """Add quotes, only if it isn't a number.""" value = m.group(1) if number_pattern.match(value): return value return value.join('""') return "".join( word_pattern.sub(repl, m) if i % 2 == 0 else m for i, m in enumerate(matches) ) def jsonify_string(string): """Loop through a string, ensuring double-quotes are used to comply with json standard. * Find pattern. * Add everything up until the pattern to new copy of string. * Add pattern with single-quotes substituted for double-quotes. * If there's a double-quote inside single-quotes, it means we have an apostrophe. * Add everything up to the double quote. * Once they are all added, which will eventually include the apostrophe, normal matching will proceed. """ if not string.startswith("{"): string = string.join("{}") string = replace_single_quotes_with_double_quotes(string) string = add_quotes_to_words(string) string = replace_environment_variables(string) string = re.sub(",+", ",", string) string = ( string.replace("[,", "[") .replace("{,", "{") .replace(",]", "]") .replace(",}", "}") .replace("}{", "},{") ) logger.debug('JSONified string: "{}"'.format(string)) return string def load_dict_from_file(filename): """Read a file, which is a less strict JSON format, and return a dictionary.""" logger.debug('Loading Dictionary from file "{}"'.format(filename)) with open(filename) as f: dict_iter = (line.split("#")[0].strip() for line in f.readlines()) dict_ = load_dict_from_string(",".join(dict_iter)) # If specified, read values from a base dictionary # All local values override the base dictionary values base_dict_dir = dict_.get("baseDict", dict_.get("base_dict")) if base_dict_dir: base_dict_dir = os.path.split(base_dict_dir) # Tracing relative references from original file directory if base_dict_dir[0].startswith("."): base_dict_dir = os.path.abspath( os.path.join(os.path.dirname(filename), *base_dict_dir) ) base_dict = Dictionary(from_file=base_dict_dir) dict_.update({k: v for k, v in base_dict.items() if k not in dict_}) return dict_ def load_dict_from_string(string): """Convert string to JSON string, convert to a dictionary, and return.""" logger.debug('Loading Dictionary from string: "{}"'.format(string)) json_string = jsonify_string(string) try: dict_ = json.loads(json_string) except json.decoder.JSONDecodeError: raise ValueError('Error loading from json string: "{}"'.format(json_string)) # Provide specialized handling of certain strings for key, val in dict_.items(): if isinstance(val, str): match = re.match(r"([+-]?nan|[+-]?inf)", val) if match: dict_[key] = float(match.group(1)) elif "unit." in val: # noinspection PyUnresolvedReferences from krampy import unit dict_[key] = eval(val) return dict_ class Dictionary(UserDict): """A deprecated class for loading dictionary from file.""" def __init__(self, from_dict=None, from_file=None, from_string=None): message = ( "The krampy.iotools.Dictionary class is deprecated. " "Consider using the function load_dict_from_file instead." ) warnings.warn(message, DeprecationWarning) super().__init__() if from_dict: self.update(from_dict) elif from_file: self.update(load_dict_from_file(from_file)) elif from_string: self.update(load_dict_from_string(from_string)) PK!krampy/iterator.pyimport itertools from collections import namedtuple from .iotools import load_dict_from_file class ProductIterator(object): """An iterator, wrapped around itertools.product, returning namedtuple objects.""" def __init__(self, *args, from_dict=None, ignore_dict=None, **kwargs): self.items = {} if from_dict: self.items.update(from_dict) elif args: self.items.update(dict(args)) elif kwargs: self.items.update(dict(**kwargs)) self.ignore_items = {} if ignore_dict: self.ignore_items = ProductIterator(from_dict=ignore_dict) def __contains__(self, item): return any(it == item for it in iter(self)) def __iter__(self): keys = self.items.keys() values = [v if hasattr(v, "__iter__") else [v] for v in self.items.values()] iterator = itertools.product(*values) class_ = namedtuple("IteratorItem", keys) for it in iterator: if it not in self.ignore_items: yield class_(*it) class Iterator(object): def __init__(self, *args, from_dict=None, from_dict_file=None): self.sub_iterators = [] if from_dict: self.load_from_dict(from_dict) elif from_dict_file: dict_ = load_dict_from_file(from_dict_file) self.load_from_dict(dict_) elif len(args) > 0: iterator = ProductIterator(*args) self.sub_iterators.append(iterator) def load_from_dict(self, dict_): # Process dicts in iterDicts list separately if "iter_dicts" in dict_: for iter_dict in dict_["iter_dicts"]: iterator = Iterator(from_dict=iter_dict) self.sub_iterators.extend(iterator.sub_iterators) dict_.pop("iter_dicts") if "product_dict" in dict_: iterator = ProductIterator( from_dict=dict_["product_dict"], ignore_dict=dict_.get("ignore_dict") ) self.sub_iterators.append(iterator) def __iter__(self): """Recursively iterate through all iterators.""" for iterator in self.sub_iterators: for item in iterator: yield item PK!ֶ^V11krampy/krampy_old.pyimport os from fnmatch import fnmatch import numpy as np class RootFinder: def __init__(self, func, xo, method, **kwargs): self.func = func self.dim = len(xo) self.xMin = kwargs.get("xMin", -np.ones_like(xo) * float("Inf")) self.xMax = kwargs.get("xMax", np.ones_like(xo) * float("Inf")) self.maxIt = kwargs.get("maxIt", 100) self.dx0 = kwargs.get("firstStep", 1e-6) self.errLim = kwargs.get("errLim", 1e-6) self.relax = kwargs.get("relax", 1.0) self.dxMax = kwargs.get("dxMax", np.ones_like(xo) * float("Inf")) self.dxMaxInc = kwargs.get("dxMaxInc", self.dxMax) self.dxMaxDec = kwargs.get("dxMaxDec", self.dxMax) self.relax = kwargs.get("relax", 1.0) self.derivativeMethod = kwargs.get("derivativeMethod", "right") self.err = 1.0 self.it = 0 self.J = None self.xOld = None self.fOld = None self.maxJacobianResetStep = kwargs.get("maxJacobianResetStep", 5) self.converged = False # Calculate function value at initial point self.x = xo # Convert to numpy arrays for v in ["x", "xMin", "xMax", "dxMax", "dxMaxInc", "dxMaxDec"]: exec("self.{0} = np.array([a for a in self.{0}])".format(v)) self.evalF() if method.lower() == "broyden": self.getStep = self.getStepBroyden else: self.getStep = self.getStepSecant def reinitialize(self, xo): self.err = 1.0 self.it = 0 self.J = None # Calculate function value at initial point self.x = xo self.evalF() def setMaxStep(self, *args): if len(args) == 1: self.dxMaxInc = args[0] self.dxMaxDec = args[0] else: self.dxMaxInc = args[0] self.dxMaxDec = args[1] def limitStep(self, dx=None): if dx is None: dx = self.dx dx *= self.relax x = self.x + dx x = np.max(np.vstack((x, self.xMin)), axis=0) x = np.min(np.vstack((x, self.xMax)), axis=0) dx = x - self.x dxLimPct = np.ones_like(dx) for i in range(len(dxLimPct)): if dx[i] > 0: dxLimPct[i] = np.min([dx[i], self.dxMaxInc[i]]) / dx[i] elif dx[i] < 0: dxLimPct[i] = np.max([dx[i], -self.dxMaxDec[i]]) / dx[i] dx *= np.min(dxLimPct) self.dx = dx return dx def storePrevStep(self): self.xOld = self.x * 1.0 self.fOld = self.f * 1.0 def evalErr(self): self.err = np.max(np.abs(self.dx + 1e-8)) if self.df is not None: self.err += np.max(np.abs(self.df + 1e-8)) else: self.err += np.max(np.abs(self.f)) def evalF(self): self.f = self.func(self.x) if self.fOld is not None: self.df = self.f - self.fOld else: self.df = None def takeStep(self, dx=None): if dx is not None: self.dx = dx self.storePrevStep() self.x += self.dx self.evalF() self.it += 1 def getStepSecant(self): if self.it == 0: self.dx = np.ones_like(self.x) * self.dx0 else: self.dx = -self.f * (self.x - self.xOld) / (self.f - self.fOld + 1e-8) return self.dx def reset_jacobian(self): fo = self.f * 1.0 xo = self.x * 1.0 self.J = np.zeros((self.dim, self.dim)) for i in range(self.dim): self.dx = np.zeros_like(self.x) self.dx[i] = self.dx0 self.takeStep() self.J[:, i] = self.df / self.dx[i] self.f = fo self.x = xo self.step = 0 def getStepBroyden(self): if self.it == 0 or self.J is None: self.reset_jacobian() dx = np.reshape(self.dx, (self.dim, 1)) df = np.reshape(self.df, (self.dim, 1)) self.J += np.dot(df - np.dot(self.J, dx), dx.T) / np.linalg.norm(dx) ** 2 dx *= 0.0 dof = [ not x <= xMin and not x >= xMax for x, xMin, xMax in zip(self.x, self.xMin, self.xMax) ] if any(dof): A = -self.J b = self.f.reshape(self.dim, 1) dx[np.ix_(dof)] = np.linalg.solve(A[np.ix_(dof, dof)], b[np.ix_(dof)]) if any(np.abs(self.f) - np.abs(self.fOld) > 0.0): self.step += 1 if self.step >= self.maxJacobianResetStep: dx = np.ones_like(dx) * self.dx0 self.J = None self.dx = dx.reshape(self.dim) return self.dx def solve(self): while self.err >= self.errLim and self.it < self.maxIt: self.getStep() self.limitStep() self.takeStep() self.evalErr() self.converged = self.err < self.errLim and not any(self.x <= self.xMin) return self.x def mag(vec): return np.sum(np.array([veci ** 2 for veci in vec])) ** 0.5 def ang2vec(ang): return np.array([np.cos(ang), np.sin(ang)]) def ang2vecd(ang): return np.array([cosd(ang), sind(ang)]) def deg2rad(ang): return ang * np.pi / 180 def rad2deg(ang): return ang * 180 / np.pi def cosd(ang): return np.cos(deg2rad(ang)) def sind(ang): return np.sin(deg2rad(ang)) def tand(ang): return np.tan(deg2rad(ang)) def acosd(slope): return rad2deg(np.arccos(slope)) def asind(slope): return rad2deg(np.arcsin(slope)) def atand(slope): return rad2deg(np.arctan(slope)) def atand2(y, x): return rad2deg(np.arctan2(y, x)) def cumdiff(x): return np.sum(np.diff(x)) def fzero(f, xo, **kwargs): maxIt = kwargs.get("maxIt", 100) dx0 = kwargs.get("firstStep", 1e-6) errLim = kwargs.get("errLim", 1e-6) printout = kwargs.get("printout", False) xname = kwargs.get("xname", "x") xscale = kwargs.get("xscale", 1.0) xmin = kwargs.get("xmin", -float("Inf")) xmax = kwargs.get("xmax", float("Inf")) nspace = kwargs.get("nspace", 0.0) method = kwargs.get("method", "Secant") if nspace == 0: space = " " else: space = " " * nspace if method == "Secant" or method == "Newton": def grad(x): dx = 1e-6 return (f(x + dx) - f(x - dx)) / (2 * dx) err = 1 it = 0 xOld = xo fOld = f(xOld) xNew = xOld + dx0 while err >= errLim and it < maxIt: fNew = f(xNew) if printout: string = "{5}Iteration {0}: {1} = {2:f}, {3} = {4:5.3e}" print((string.format(it, xname, xNew * xscale, "residual", err, space))) if method == "Newton": dx = -fNew / grad(xOld) elif method == "Secant": dx = -fNew * (xNew - xOld) / (fNew - fOld) xOld = xNew fOld = fNew xNew += dx xNew = max(min(xNew, xmax), xmin) dx = xNew - xOld err = np.abs(dx) it += 1 return xNew elif method == "Golden": GR = (np.sqrt(5) - 1) / 2 X = np.array([xmin, xmax - GR * (xmax - xmin), xmax]) F = np.zeros_like(X) for i in range(len(X)): F[i] = np.abs(f(X[i])) err = 1 it = 0 while err > errLim: xNew = X[0] + GR * (X[2] - X[0]) fNew = np.abs(f(xNew)) if fNew < F[1]: X = np.array([X[1], xNew, X[2]]) F = np.array([F[1], fNew, X[2]]) else: X = np.array([xNew, X[1], X[0]]) F = np.array([fNew, F[1], F[0]]) err = np.abs(X[2] - X[0]) it += 1 return f((X[2] + X[0]) / 2) def integrate(x, f): # Trapezoidal integration I = np.argsort(x) x = x[I] f = f[I] f[np.nonzero(np.abs(f) == float("Inf"))] = 0.0 return 0.5 * np.sum((x[1:] - x[0:-1]) * (f[1:] + f[0:-1])) def growPoints(x0, x1, xMax, rate=1.1): dx = x1 - x0 x = [x1] def done(xt): if dx > 0: return xt > xMax elif dx < 0: return xt < xMax else: return True while not done(x[-1]): x.append(x[-1] + dx) dx *= rate return np.array(x[1:]) def fillPoints(x0, x1, L, pctLast, targetRate=1.1): dxLast = pctLast * L x2 = x1 + np.sign(x1 - x0) * (L - 0.5 * dxLast) def func(rate): return growPoints(x0, x1, x2, rate) def res1(rate): return np.abs(np.diff(func(rate)[-2:])[0]) - dxLast def res2(rate): return func(rate)[-1] - x2 return func(fzero(res2, fzero(res1, targetRate))) def getDerivative(f, x, direction="c"): dx = 1e-6 fr = f(x + dx) fl = f(x - dx) if direction[0].lower() == "r" or np.isnan(fl): return (fr - f(x)) / dx elif direction[0].lower() == "l" or np.isnan(fr): return (f(x) - fl) / dx else: return (f(x + dx) - f(x - dx)) / (2 * dx) def cross2(a, b): return a[0] * b[1] - a[1] * b[0] def rotateVec(v, ang): C = cosd(ang) S = sind(ang) return np.array([C * v[0] - S * v[1], S * v[0] + C * v[1]]) def rotatePt(oldPt, basePt, ang): relPos = np.array(oldPt) - np.array(basePt) newPos = rotateVec(relPos, ang) newPt = basePt + newPos return newPt def extrap1d(interpolator): xs = interpolator.x ys = interpolator.y def pointwise(x): if x < xs[0]: return ys[0] + (x - xs[0]) * (ys[1] - ys[0]) / (xs[1] - xs[0]) elif x > xs[-1]: return ys[-1] + (x - xs[-1]) * (ys[-1] - ys[-2]) / (xs[-1] - xs[-2]) else: return interpolator(x) return pointwise def minMax(x): return min(x), max(x) def sign(x): if x > 0: return 1.0 elif x < 0: return -1.0 else: return 0.0 def heaviside(x): if x > 0: return 1.0 elif x < 0: return 0.0 else: return 0.5 def inRange(x, lims): if x >= lims[0] and x <= lims[1]: return True else: return False def createIfNotExist(dirName): if not os.path.exists(dirName): os.makedirs(dirName) # , 0755) def writeasdict(filename, *args, **kwargs): dataFormat = kwargs.get("dataFormat", ">+10.8e") ff = open(filename, "w") for name, value in args: ff.write("{2:{0}} : {3:{1}}\n".format("<14", dataFormat, name, value)) ff.close() def writeaslist(filename, *args, **kwargs): headerFormat = kwargs.get("headerFormat", "<15") dataFormat = kwargs.get("dataFormat", ">+10.8e") ff = open(filename, "w") write(ff, headerFormat, [item for item in [arg[0] for arg in args]]) for value in zip(*[arg[1] for arg in args]): write(ff, dataFormat, value) ff.close() def write(ff, writeFormat, items): if isinstance(items[0], str): ff.write("# ") else: ff.write(" ") ff.write("".join("{1:{0}} ".format(writeFormat, item) for item in items) + "\n") def sortDirByNum(dirStr, direction="forward"): num = np.array( [ float("".join(i for i in d.lower() if i.isdigit() or i == ".").strip(".")) for d in dirStr ] ) if direction == "reverse": ind = np.argsort(num)[::-1] else: ind = np.argsort(num) return [dirStr[i] for i in ind], num[ind] def getFG(x): txMax = 5.0 N = 20 pt = np.array([-1.0, 0.0, 1.0]) * np.sqrt(3.0 / 5.0) w = np.array([5.0, 8.0, 5.0]) / 9 t = np.linspace(0.0, txMax / x, N + 1) dt = 0.5 * (t[1] - t[0]) f = 0.0 g = 0.0 for i in range(N): ti = t[i] + dt * (pt + 1) F = w * dt * np.exp(-ti * x) / (ti ** 2 + 1) f += np.sum(F) g += np.sum(F * ti) return f, g def rm_rf(dList): dList = list(dList) for d in dList: for path in (os.path.join(d, f) for f in os.listdir(d)): if os.path.isdir(path): rm_rf(path) else: os.unlink(path) os.rmdir(d) def find_files(path=".", pattern="*"): """Return list of files in path matching the pattern. Args ---- path : str, optional Path to search. Default is current directory. pattern : str Wildcard pattern to match. """ return [d for d in os.listdir(path) if fnmatch(d, pattern)] def checkDir(f): if not os.path.isdir(f): os.makedirs(f) def listdir_nohidden(d): return [di for di in os.listdir(d) if not fnmatch(di, ".*")] listdirNoHidden = listdir_nohidden PK!纥**krampy/solver.pyimport numpy import numpy as np class RootFinder: def __init__(self, func, xo, method, **kwargs): self.func = func self.dim = len(xo) self.xMin = kwargs.get("xMin", -np.ones_like(xo) * float("Inf")) self.xMax = kwargs.get("xMax", np.ones_like(xo) * float("Inf")) self.maxIt = kwargs.get("maxIt", 100) self.dx0 = kwargs.get("firstStep", 1e-6) self.errLim = kwargs.get("errLim", 1e-6) self.relax = kwargs.get("relax", 1.0) self.dxMax = kwargs.get("dxMax", np.ones_like(xo) * float("Inf")) self.dxMaxInc = kwargs.get("dxMaxInc", self.dxMax) self.dxMaxDec = kwargs.get("dxMaxDec", self.dxMax) self.relax = kwargs.get("relax", 1.0) self.derivativeMethod = kwargs.get("derivativeMethod", "right") self.err = 1.0 self.it = 0 self.J = None self.xOld = None self.fOld = None self.maxJacobianResetStep = kwargs.get("maxJacobianResetStep", 5) self.converged = False # Calculate function value at initial point self.x = xo # Convert to numpy arrays for v in ["x", "xMin", "xMax", "dxMax", "dxMaxInc", "dxMaxDec"]: exec("self.{0} = np.array([a for a in self.{0}])".format(v)) self.evalF() if method.lower() == "broyden": self.getStep = self.getStepBroyden else: self.getStep = self.getStepSecant def reinitialize(self, xo): self.err = 1.0 self.it = 0 self.J = None # Calculate function value at initial point self.x = xo self.evalF() def setMaxStep(self, *args): if len(args) == 1: self.dxMaxInc = args[0] self.dxMaxDec = args[0] else: self.dxMaxInc = args[0] self.dxMaxDec = args[1] def limitStep(self, dx=None): if dx is None: dx = self.dx dx *= self.relax x = self.x + dx x = np.max(np.vstack((x, self.xMin)), axis=0) x = np.min(np.vstack((x, self.xMax)), axis=0) dx = x - self.x dxLimPct = np.ones_like(dx) for i in range(len(dxLimPct)): if dx[i] > 0: dxLimPct[i] = np.min([dx[i], self.dxMaxInc[i]]) / dx[i] elif dx[i] < 0: dxLimPct[i] = np.max([dx[i], -self.dxMaxDec[i]]) / dx[i] dx *= np.min(dxLimPct) self.dx = dx return dx def storePrevStep(self): self.xOld = self.x * 1.0 self.fOld = self.f * 1.0 def evalErr(self): self.err = np.max(np.abs(self.dx + 1e-8)) if self.df is not None: self.err += np.max(np.abs(self.df + 1e-8)) else: self.err += np.max(np.abs(self.f)) def evalF(self): self.f = self.func(self.x) if self.fOld is not None: self.df = self.f - self.fOld else: self.df = None def getStepSecant(self): if self.it == 0: self.dx = np.ones_like(self.x) * self.dx0 else: self.dx = -self.f * (self.x - self.xOld) / (self.f - self.fOld + 1e-8) return self.dx def takeStep(self, dx=None): if not dx is None: self.dx = dx self.storePrevStep() self.x += self.dx self.evalF() self.it += 1 def resetJacobian(self): fo = self.f * 1.0 xo = self.x * 1.0 self.J = np.zeros((self.dim, self.dim)) for i in range(self.dim): self.dx = np.zeros_like(self.x) self.dx[i] = self.dx0 self.takeStep() self.J[:, i] = self.df / self.dx[i] self.f = fo self.x = xo self.step = 0 def getStepBroyden(self): if self.it == 0 or self.J is None: self.resetJacobian() dx = np.reshape(self.dx, (self.dim, 1)) df = np.reshape(self.df, (self.dim, 1)) self.J += np.dot(df - np.dot(self.J, dx), dx.T) / np.linalg.norm(dx) ** 2 dx *= 0.0 dof = [ not x <= xMin and not x >= xMax for x, xMin, xMax in zip(self.x, self.xMin, self.xMax) ] if any(dof): A = -self.J b = self.f.reshape(self.dim, 1) dx[np.ix_(dof)] = np.linalg.solve(A[np.ix_(dof, dof)], b[np.ix_(dof)]) if any(np.abs(self.f) - np.abs(self.fOld) > 0.0): self.step += 1 if self.step >= self.maxJacobianResetStep: dx = np.ones_like(dx) * self.dx0 self.J = None self.dx = dx.reshape(self.dim) return self.dx def solve(self): while self.err >= self.errLim and self.it < self.maxIt: self.getStep() self.limitStep() self.takeStep() self.evalErr() self.converged = self.err < self.errLim and not any(self.x <= self.xMin) return self.x class RootFinderNew(RootFinder): def takeStep(self, dx=None): if not dx is None: self.dx = dx self.storePrevStep() self.x += self.dx self.evalF() self.it += 1 def resetJacobian(self): fo = self.f * 1.0 xo = self.x * 1.0 self.J = np.zeros((self.dim, self.dim)) for i in range(self.dim): self.dx = np.zeros_like(self.x) self.dx[i] = self.dx0 self.takeStep() self.J[:, i] = self.df / self.dx[i] self.f = fo self.x = xo self.step = 0 def getStepBroyden(self): if self.it == 0 or self.J is None: self.resetJacobian() dx = np.reshape(self.dx, (self.dim, 1)) df = np.reshape(self.df, (self.dim, 1)) self.J += np.dot(df - np.dot(self.J, dx), dx.T) / np.linalg.norm(dx) ** 2 dx *= 0.0 dof = [ not x <= xMin and not x >= xMax for x, xMin, xMax in zip(self.x, self.xMin, self.xMax) ] if any(dof): A = -self.J b = self.f.reshape(self.dim, 1) dx[np.ix_(dof)] = np.linalg.solve(A[np.ix_(dof, dof)], b[np.ix_(dof)]) if any(np.abs(self.f) - np.abs(self.fOld) > 0.0): self.step += 1 if self.step >= self.maxJacobianResetStep: dx = np.ones_like(dx) * self.dx0 self.J = None self.dx = dx.reshape(self.dim) return self.dx def solve(self): while self.err >= self.errLim and self.it < self.maxIt: self.getStep() self.limitStep() self.takeStep() self.evalErr() self.converged = self.err < self.errLim and not any(self.x <= self.xMin) return self.x def fzero(func, x_init, **kwargs): """Find the root of a function func with an initial guess x_init. Parameters ---------- func : function The function for which to find the zero-crossing point. x_init : float The initial guess of the function's solution. Keyword Parameters ------------------ max_it : int Maximum number of iterations (default=100) first_step : float Length of first step (default=1e-6) err_lim : float Tolerance for iteration residual (default=1e-6) xmin : float Minimum value for x-variable (default=-Inf) xmax : float Maximum value for x-variable (default=+Inf) """ max_it = kwargs.get("maxIt", 100) first_step = kwargs.get("first_step", 1e-6) err_lim = kwargs.get("err_lim", 1e-6) x_min = kwargs.get("xmin", -float("Inf")) x_max = kwargs.get("xmax", float("Inf")) error = 1.0 it_num = 0 x_old = x_init f_old = func(x_old) x_new = x_old + first_step while error >= err_lim and it_num < max_it: f_new = func(x_new) delta_x = -f_new * (x_new - x_old) / (f_new - f_old) x_old, f_old = x_new, f_new x_new = max(min(x_new + delta_x, x_max), x_min) error = numpy.abs(x_new - x_old) it_num += 1 return x_new # def fzero(f, xo, **kwargs): # ''' # Find the root of a function f with an initial guess xo. # # Available keyword arguments: # maxIt : maximum number of iterations (default=100) # firstStep : length of first step (default=1e-6) # errLim : tolerance for iteration residual (default=1e-6) # printOut : Boolean for whether to print results as solver runs (default=False) # xscale : scale factor for x-variable (default=1.0) # xmin : minimum value for x-variable (default=-Inf) # xmax : maximum value for x-variable (default=+Inf) # method : method to use for solving, options: ['Secant', 'Golden'] (default='Secant') # ''' # maxIt = kwargs.get('maxIt', 100) # dx0 = kwargs.get('firstStep', 1e-6) # errLim = kwargs.get('errLim', 1e-6) # printout = kwargs.get('printout', False) # xname = kwargs.get('xname', 'x') # xscale = kwargs.get('xscale', 1.0) # xmin = kwargs.get('xmin', -float('Inf')) # xmax = kwargs.get('xmax', float('Inf')) # nspace = kwargs.get('nspace', 0.0) # method = kwargs.get('method', 'Secant') # # if nspace == 0: # space = ' ' # else: # space = ' ' * nspace # # if method == 'Secant' or method == 'Newton': # def grad(x): # dx = 1e-6 # return (f(x+dx) - f(x-dx)) / (2*dx) # # err = 1 # it = 0 # xOld = xo # fOld = f(xOld) # xNew = xOld + dx0 # while err >= errLim and it < maxIt: # fNew = f(xNew) # # if printout: # print '{5}Iteration {0}: {1} = {2:f}, {3} = {4:5.3e}'.format(it, xname, xNew*xscale, 'residual', err, space) # # if method == 'Newton': # dx = -fNew / grad(xOld) # elif method == 'Secant': # dx = -fNew * (xNew - xOld) / (fNew - fOld) # # xOld = xNew # fOld = fNew # xNew += dx # # xNew = max(min(xNew, xmax), xmin) # dx = xNew - xOld # # err = np.abs(dx) # it += 1 # return xNew # # elif method == 'Golden': # GR = (np.sqrt(5) - 1) / 2 # X = np.array([xmin, xmax - GR * (xmax - xmin), xmax]) # F = np.zeros_like(X) # for i in range(len(X)): # F[i] = np.abs(f(X[i])) # # err = 1 # it = 0 # while err > errLim: # xNew = X[0] + GR * (X[2] - X[0]) # fNew = np.abs(f(xNew)) # # if fNew < F[1]: # X = np.array([X[1], xNew, X[2]]) # F = np.array([F[1], fNew, X[2]]) # else: # X = np.array([xNew, X[1], X[0]]) # F = np.array([fNew, F[1], F[0]]) # err = np.abs(X[2] - X[0]) # it += 1 # # return f((X[2] + X[0]) / 2) PK!']]krampy/stopwatch.py"""A simple stopwatch for measuring walltime.""" import datetime class Stopwatch(object): """A simple class representing a stopwatch to measure wall clock time. Parameters ---------- start : bool, optional If True, the Stopwatch will be started on instantiation. Attributes ---------- start_time : datetime The time the Stopwatch was started. stop_time : datetime The time the Stopwatch was stopped. """ def __init__(self, start=False): self.start_time = None self.stop_time = None if start: self.start() def start(self): """Start the clock.""" self.start_time = datetime.datetime.now() def stop(self): """Stop the clock.""" self.stop_time = datetime.datetime.now() def get_elapsed_time(self): """Return the Stopwatch elapsed time. If the stop time is set, return the difference between stop and start time. Otherwise, use the current time as the stop time. Returns ------- datetime.datetime The elapsed time. """ if self.stop_time is None: return datetime.datetime.now() - self.start_time return self.stop_time - self.start_time def get_elapsed_seconds(self): """Return the elapsed time in seconds.""" return self.get_elapsed_time().total_seconds() def print_elapsed_time(self, stop=False, print_func=print): """Print the elapsed time to the screen. Parameters ---------- stop : bool, optional If True, the Stopwatch will also be stopped when printing the elapsed time. print_func : function, optional A logger function to handle logging. Defaults to print function. """ if stop: self.stop() minutes, seconds = divmod(self.get_elapsed_seconds(), 60) hours, minutes = divmod(minutes, 60) print_func( "\nExecution completed in %d hours, %d minutes, %d seconds", hours, minutes, seconds, ) PK!>-krampy/trig.py"""Convenient trigonometric functions.""" import math import numpy def mag(vec): """Return the magnitude of a vector.""" return numpy.linalg.norm(vec) def deg2rad(ang): """Convert an angle from degrees to radians. Parameters ---------- ang : float An angle, in degrees. Returns ------- float The angle in radians. """ return ang * numpy.pi / 180 def rad2deg(ang): """Convert an angle from radians to degrees. Parameters ---------- ang : float An angle, in radians. Returns ------- float The angle in degrees. """ return ang * 180.0 / numpy.pi def cosd(ang): """Return the cosine of an angle specified in degrees. Parameters ---------- ang : float An angle, in degrees. """ return numpy.cos(deg2rad(ang)) def sind(ang): """Return the sine of an angle specified in degrees. Parameters ---------- ang : float An angle, in degrees. """ return numpy.sin(deg2rad(ang)) def tand(ang): """Return the tangent of an angle specified in degrees. Parameters ---------- ang : float An angle, in degrees. """ return math.tan(deg2rad(ang)) def acosd(slope): """Return the arccosine of an angle specified in degrees. Returns ------- float An angle, in degrees. """ return rad2deg(math.acos(slope)) def asind(slope): """Return the arcsine of an angle specified in degrees. Returns ------- float An angle, in degrees. """ return rad2deg(math.asin(slope)) def atand(slope): """Return the arctangent of an angle specified in degrees. Returns ------- float An angle, in degrees. """ return rad2deg(math.atan(slope)) def atand2(delta_y, delta_x): """Return the arctan2 of an angle specified in degrees. Returns ------- float An angle, in degrees. """ return rad2deg(numpy.arctan2(delta_y, delta_x)) def ang2vec(ang): """Convert angle in radians to a 3-d unit vector in the x-y plane.""" return numpy.array([numpy.cos(ang), numpy.sin(ang), 0.0]) def ang2vecd(ang): """Convert angle in degrees to a 3-d unit vector in the x-y plane.""" return ang2vec(deg2rad(ang)) def angd2vec(ang): """Deprecated alias for ang2vecd.""" return ang2vecd(ang) def angd2vec2d(ang): """Convert angle in degrees to a 2-d unit vector in the x-y plane.""" return ang2vecd(ang)[:2] def quaternion_to_euler_angles(angle, x_comp, y_comp, z_comp): """Convert a quaternion to Euler angles. Parameters ---------- angle : float The angle to rotate about the component vector. x_comp, y_comp, z_comp : float, float, float The x, y, and z components of the vector about which to rotate. """ alpha = math.degrees( math.atan2( +2.0 * (angle * x_comp + y_comp * z_comp), +1.0 - 2.0 * (x_comp ** 2 + y_comp ** 2), ) ) beta = math.degrees( math.asin(max([min([+2.0 * (angle * y_comp - z_comp * x_comp), 1.0]), -1.0])) ) gamma = math.degrees( math.atan2( +2.0 * (angle * z_comp + x_comp * y_comp), +1.0 - 2.0 * (y_comp ** 2 + z_comp ** 2), ) ) return alpha, beta, gamma def rotate_vec_2d(vec, ang): """Rotate a 2d vector v by angle ang in degrees.""" vec3d = numpy.zeros(3) vec3d[:2] = vec return rotate_vec(vec3d, 0, 0, ang)[:2] def rotate_vec(vec, ang_x=0.0, ang_y=0.0, ang_z=0.0, about=numpy.zeros(3)): """Rotate a 3d vector v by angle ang in degrees.""" rot_x = numpy.array( [[1, 0, 0], [0, cosd(ang_x), -sind(ang_x)], [0, sind(ang_x), cosd(ang_x)]] ) rot_y = numpy.array( [[cosd(ang_y), 0, sind(ang_y)], [0, 1, 0], [-sind(ang_y), 0, cosd(ang_y)]] ) rot_z = numpy.array( [[cosd(ang_z), -sind(ang_z), 0], [sind(ang_z), cosd(ang_z), 0], [0, 0, 1]] ) return ( numpy.dot( numpy.dot(numpy.dot(rot_x, rot_y), rot_z), (numpy.asarray(vec) - about).T ) + about ) def cross2(vec_a, vec_b): """Return the cross product of two 2d vectors.""" return vec_a[0] * vec_b[1] - vec_a[1] * vec_b[0] PK!S krampy/unit.py""" A simple module for storing unit conversion factors to convert to SI. Usage: import unit a length e.g. can be represented as 1.0 * unit.ft to represent one foot, which will return 0.3048 meters. """ import math # Acceleration due to gravity grav = 9.80665 # Convert length to m m = 1.0 cm = m / 100.0 mm = m / 1000.0 ft = 0.3048 inch = ft / 12.0 # Convert speed to m/s mps = 1.0 fps = ft kts = 0.5144444 # Convert mass to kg kg = 1.0 lbm = kg / 2.20462 # Convert force to N N = 1.0 lb = grav * lbm lbf = 1.0 * lb # lbs-force mtf = grav * 1000.0 # metric tonnes-force stf = N * lb * 2000.0 rad = 1.0 deg = math.pi / 180.0 PK!)(;krampy/unit_old.py"""This module is used to store factors for unit conversion. Usage: import planingfsi.unit as unit value = 15.0 * unit.ft """ import math # Convert length to m m = 1.0 cm = m / 100.0 mm = m / 1000.0 ft = 0.3048 inch = ft / 12.0 # Convert speed to m/s mps = 1.0 fps = ft # Convert mass to kg kg = 1.0 lbm = kg / 2.20462 # Convert force to N N = 1.0 lb = 9.81 * lbm lbf = 1.0 * lb rad = 1.0 deg = math.pi / 180.0 PK!f?vM))planingfsi/__init__.py"""PlaningFSI is a Python package for solving 2d FSI problems. PlaningFSI solves the general FSI problem of 2-D planing surfaces on a free surface, where each surface can be rigid or flexible. The substructures are part of a rigid body, which can be free in sinkage and trim. The fluid solver is based on linearized potential-flow assumptions, and the structural solver considers a large-deformation simple beam element. """ import logging from .__version__ import __version__ logger = logging.getLogger("planingfsi") logger.setLevel(logging.DEBUG) PK!$planingfsi/__version__.py__version__ = "0.1.1" PK!LYYplaningfsi/cli.py"""Command line program for PlaningFSI. This script is called from the command line with options to plot or load results from file. It reads the fluid and solid body properties from dictionary files, assembles the problem, and runs it. """ import click import krampy from . import config from . import logger from .fe.femesh import Mesh from .fsi.simulation import Simulation @click.group(name="planingfsi", help="Run the PlaningFSI program") @click.option("post_mode", "--post", is_flag=True) @click.option("--plot_save", is_flag=True) @click.option("new_case", "--new", is_flag=True) def run_planingfsi(post_mode, plot_save, new_case): if post_mode: logger.info("Running in post-processing mode") config.plotting.save = True config.plotting.plot_any = True config.io.results_from_file = True if plot_save: config.plotting.save = True config.plotting.plot_any = True if new_case: krampy.rm_rf(krampy.find_files(config.path.case_dir, "[0-9]*")) simulation = Simulation() simulation.load_input_files() simulation.run() @run_planingfsi.command(name="mesh") @click.argument("mesh_dict", required=False) @click.option("plot_show", "--show", is_flag=True) @click.option("plot_save", "--save", is_flag=True) @click.option("--verbose", is_flag=True) def generate_mesh(mesh_dict, plot_show, plot_save, verbose): if not mesh_dict: config.path.mesh_dict_dir = mesh_dict mesh = Mesh() exec(open(config.path.mesh_dict_dir).read()) mesh.display(disp=verbose) mesh.plot(show=plot_show, save=plot_save) mesh.write() PK!88planingfsi/config.py"""This module is used to store the global configuration. Values are stored after reading the configDict file, and values can be accessed by other packages and modules by importing the config module. Usage: from planingfsi import config The global attributes can then be simply accessed via config.attribute_name """ import math import os from pathlib import Path import matplotlib from krampy.iotools import load_dict_from_file from planingfsi import logger DICT_NAME = "configDict" logger.info("Loading values from {0}".format(DICT_NAME)) class ConfigItem(object): """A descriptor to represent a configuration item. Attributes are loaded from a dictionary with fancy default handling. """ def __init__(self, alt_key=None, alt_keys=None, default=None, type_=None): self.name = None self.alt_keys = [] if alt_key: self.alt_keys.append(alt_key) if alt_keys: self.alt_keys.extend(alt_keys) self.default = default self.type_ = type_ def __get__(self, instance, owner): """Retrieve the value from the instance dictionary. If it doesn't exist, return the default.""" if self.name in instance.__dict__: return instance.__dict__[self.name] return self.default def __set__(self, instance, value): """When the value is set, try to convert it and then store it in the instance dictionary.""" if value is not None and self.type_ is not None: value = self.type_(value) instance.__dict__[self.name] = value def __set_name__(self, _, name): self.name = name @property def keys(self): """A list of keys to look for when reading in the value.""" return [self.name] + self.alt_keys def get_from_dict(self, dict_): """Try to read all keys from the dictionary until a non-None value is found. Returns the default value if no appropriate value is found in the dictionary. """ for key in self.keys: value = dict_.get(key) if value is not None: return value raise KeyError( 'None of the following keys "{}" found in dictionary.'.format(self.keys) ) class SubConfig(object): """An empty class used simply for dividing the configuration into different sections. Also useful in helping define the namespace scopes. """ def __init__(self): if Path(DICT_NAME).exists(): self.load_from_dict(DICT_NAME) def load_from_dict(self, dict_name: str): """Load the configuration from a dictionary file. Parameters ---------- dict_name The path to the dictionary file. """ dict_ = load_dict_from_file(dict_name) config_items = {key: item for key, item in self.__class__.__dict__.items()} for key, config_item in self.__class__.__dict__.items(): if isinstance(config_item, ConfigItem): try: value = config_item.get_from_dict(dict_) except KeyError: pass else: setattr(self, config_item.name, value) class FlowConfig(SubConfig): density = ConfigItem(alt_key="rho", default=998.2) gravity = ConfigItem(alt_key="g", default=9.81) kinematic_viscosity = ConfigItem(alt_key="nu", default=1e-6) waterline_height = ConfigItem(alt_key="hWL", default=0.0) num_dim = ConfigItem(alt_key="dim", default=2) include_friction = ConfigItem(alt_key="shearCalc", default=False) _froude_num = ConfigItem(alt_key="Fr", default=None, type_=float) _flow_speed = ConfigItem(alt_key="U", default=None, type_=float) @property def reference_length(self): return body.reference_length @property def flow_speed(self): """The flow speed is the native variable to store free-stream velocity. However, if Froude number is set from input file, that should override the flow speed input. """ if self._froude_num is not None: if self._flow_speed is not None: raise ValueError( "Only one flow speed variable (either Froude number or flow " "speed) must be set in {0}".format(DICT_NAME) ) self.froude_num = self._froude_num elif self._flow_speed is None: raise ValueError("Must specify either U or Fr in {0}".format(DICT_NAME)) return self._flow_speed @flow_speed.setter def flow_speed(self, value): """Set the raw flow speed variable and ensure raw Froude number is not also set.""" self._flow_speed = value self._froude_num = None @property def froude_num(self): return self.flow_speed / math.sqrt(self.gravity * self.reference_length) @froude_num.setter def froude_num(self, value): self.flow_speed = value * math.sqrt(self.gravity * self.reference_length) @property def stagnation_pressure(self): return 0.5 * self.density * self.flow_speed ** 2 @property def k0(self): return self.gravity / self.flow_speed ** 2 @property def lam(self): return 2 * math.pi / self.k0 class BodyConfig(SubConfig): xCofG = ConfigItem(default=0.0) yCofG = ConfigItem(default=0.0) _xCofR = ConfigItem(type_=float) _yCofR = ConfigItem(type_=float) mass = ConfigItem(alt_key="m", default=1.0) _weight = ConfigItem(alt_key="W") reference_length = ConfigItem(alt_keys=["Lref", "Lc"], default=1.0) _cushion_pressure = ConfigItem(alt_key="Pc", default=0.0) _seal_pressure = ConfigItem(alt_key="Ps", default=0.0) # TODO: Do these belong here? PcBar = ConfigItem() PsBar = ConfigItem() # Rigid body motion parameters time_step = ConfigItem("timeStep", default=1e-3) relax_rigid_body = ConfigItem("rigidBodyRelax", default=1.0) motion_method = ConfigItem("motionMethod", default="Physical") motion_jacobian_first_step = ConfigItem("motionJacobianFirstStep", default=1e-6) bow_seal_tip_load = ConfigItem("bowSealTipLoad", default=0.0) tip_constraint_ht = ConfigItem("tipConstraintHt", type_=float) seal_load_pct = ConfigItem("sealLoadPct", default=1.0) cushion_force_method = ConfigItem("cushionForceMethod", default="Fixed") initial_draft = ConfigItem("initialDraft", default=0.0) initial_trim = ConfigItem("initialTrim", default=0.0) max_draft_step = ConfigItem("maxDraftStep", default=1e-3) max_trim_step = ConfigItem("maxTrimStep", default=1e-3) max_draft_acc = ConfigItem("maxDraftAcc", default=1000.0) max_trim_acc = ConfigItem("maxTrimAcc", default=1000.0) free_in_draft = ConfigItem("freeInDraft", default=False) free_in_trim = ConfigItem("freeInTrim", default=False) draft_damping = ConfigItem("draftDamping", default=1000.0) trim_damping = ConfigItem("trimDamping", default=500.0) _relax_draft = ConfigItem("draftRelax", type_=float) _relax_trim = ConfigItem("trimRelax", type_=float) @property def relax_draft(self): if self._relax_draft is not None: return self._relax_draft return body.relax_rigid_body @property def relax_trim(self): if self._relax_trim is not None: return self._relax_trim return body.relax_rigid_body @property def Pc(self): return self._cushion_pressure @property def PcBar(self): return self._cushion_pressure * self.reference_length / self.weight @PcBar.setter def PcBar(self, value): self._cushion_pressure = value * self.weight / self.reference_length @property def Ps(self): return self._seal_pressure @property def PsBar(self): if self._cushion_pressure == 0.0: return 0.0 return self._seal_pressure / self._cushion_pressure @PsBar.setter def PsBar(self, value): self._seal_pressure = value * self._cushion_pressure @property def xCofR(self): if self._xCofR is not None: return self._xCofR return self.xCofG @property def yCofR(self): if self._yCofR is not None: return self._yCofR return self.yCofG @property def weight(self): if self._weight is not None: return self._weight return self.mass * flow.gravity @weight.setter def weight(self, value): self.mass = value / flow.gravity class PlotConfig(SubConfig): pType = ConfigItem(alt_key="pScaleType", default="stagnation") _pScale = ConfigItem(alt_key="pScale", default=1.0) _pScalePct = ConfigItem(alt_key="pScalePct", default=1.0) _pScaleHead = ConfigItem(alt_key="pScaleHead", default=1.0) growth_rate = ConfigItem(alt_key="growthRate", default=1.1) CofR_grid_len = ConfigItem(alt_key="CofRGridLen", default=0.5) fig_format = ConfigItem(alt_key="figFormat", default="eps") pressure_limiter = ConfigItem("pressureLimiter", default=False) # Load plot extents ext_e = ConfigItem("extE", default=0.1) ext_w = ConfigItem("extW", default=0.1) ext_n = ConfigItem("extN", default=0.1) ext_s = ConfigItem("extS", default=0.1) xmin = ConfigItem("plotXMin", type_=float) xmax = ConfigItem("plotXMax", type_=float) ymin = ConfigItem("plotYMin", type_=float) ymax = ConfigItem("plotYMax", type_=float) lambda_min = ConfigItem("lamMin", default=-1.0) lambda_max = ConfigItem("lamMax", default=1.0) _x_fs_min = ConfigItem("xFSMin", type_=float) _x_fs_max = ConfigItem("xFSMax", type_=float) # Whether to save, show, or watch plots save = ConfigItem("plotSave", default=False) show_pressure = ConfigItem("plotPressure", default=False) show = ConfigItem("plotShow", default=False) _watch = ConfigItem("plotWatch", default=False) @property def watch(self): return self._watch or self.show @property def plot_any(self): return self.show or self.save or self.watch or self.show_pressure @property def x_fs_min(self): if self._x_fs_min is not None: return self._x_fs_min if self.xmin is not None: return self.xmin return self.lambda_min * flow.lam @property def x_fs_max(self): if self._x_fs_max is not None: return self._x_fs_max if self.xmax is not None: return self.xmax return self.lambda_max * flow.lam @property def pScale(self): if plotting.pType == "stagnation": return flow.stagnation_pressure elif plotting.pType == "cushion": return body.Pc if body.Pc > 0.0 else 1.0 elif plotting.pType == "hydrostatic": return flow.density * flow.gravity * self._pScaleHead return self._pScale * self._pScalePct class PathConfig(SubConfig): # Directories and file formats case_dir = ConfigItem("caseDir", default=".") fig_dir_name = ConfigItem("figDirName", default="figures") body_dict_dir = ConfigItem("bodyDictDir", default="bodyDict") input_dict_dir = ConfigItem("inputDictDir", default="inputDict") cushion_dict_dir = ConfigItem( alt_keys=["pressureCushionDictDir", "cushionDictDir"], default="cushionDict" ) mesh_dir = ConfigItem("meshDir", default="mesh") mesh_dict_dir = ConfigItem("meshDictDir", default="meshDict") class IOConfig(SubConfig): data_format = ConfigItem("dataFormat", default="txt") write_interval = ConfigItem("writeInterval", default=1) write_time_histories = ConfigItem("writeTimeHistories", default=False) results_from_file = ConfigItem("resultsFromFile", default=False) class SolverConfig(SubConfig): # Parameters for wetted length solver wetted_length_solver = ConfigItem("wettedLengthSolver", default="Secant") wetted_length_tol = ConfigItem("wettedLengthTol", default=1e-6) wetted_length_relax = ConfigItem("wettedLengthRelax", default=1.0) wetted_length_max_it = ConfigItem("wettedLengthMaxIt", default=20) wetted_length_max_it_0 = ConfigItem("wettedLengthMaxIt0", default=100) wetted_length_max_step_pct = ConfigItem("wettedLengthMaxStepPct", default=0.2) _wetted_length_max_step_pct_inc = ConfigItem( "wettedLengthMaxStepPctInc", type_=float ) _wetted_length_max_step_pct_dec = ConfigItem( "wettedLengthMaxStepPctDec", type_=float ) wetted_length_max_jacobian_reset_step = ConfigItem( "wettedLengthMaxJacobianResetStep", default=100 ) max_it = ConfigItem("maxIt", default=1) num_ramp_it = ConfigItem("rampIt", default=0) relax_initial = ConfigItem("relaxI", default=0.01) relax_final = ConfigItem("relaxF", default=0.5) max_residual = ConfigItem("tolerance", default=1e-6) pretension = ConfigItem("pretension", default=0.1) relax_FEM = ConfigItem(alt_keys=["FEMRelax", "relaxFEM"], default=1.0) max_FEM_disp = ConfigItem("maxFEMDisp", default=1.0) num_damp = ConfigItem("numDamp", default=0.0) @property def wetted_length_max_step_pct_inc(self): if self._wetted_length_max_step_pct_inc is not None: return self._wetted_length_max_step_pct_inc return self.wetted_length_max_step_pct @property def wetted_length_max_step_pct_dec(self): if self._wetted_length_max_step_pct_dec is not None: return self._wetted_length_max_step_pct_dec return self.wetted_length_max_step_pct # Flow-related variables flow = FlowConfig() body = BodyConfig() plotting = PlotConfig() path = PathConfig() io = IOConfig() solver = SolverConfig() def load_from_file(filename): for c in [flow, body, plotting, path, io, solver]: c.load_from_dict(filename) # Initialized constants ramp = 1.0 has_free_structure = False it_dir = "" it = -1 # Use tk by default. Otherwise try Agg. Otherwise, disable plotting. _fallback_engine = "Agg" if os.environ.get("DISPLAY") is None: matplotlib.use(_fallback_engine) else: try: from matplotlib import pyplot except ImportError: try: matplotlib.use(_fallback_engine) except ImportError: plotting.plot_any = False PK!planingfsi/fe/__init__.pyPK!(VF  planingfsi/fe/felib.pyimport numpy as np from scipy.interpolate import interp1d import planingfsi.config as config # import planingfsi.krampy as kp class Node: obj = [] @classmethod def get_index(cls, ind): return cls.obj[ind] @classmethod def All(cls): return [o for o in cls.obj] @classmethod def count(cls): return len(cls.All()) @classmethod def find_nearest(cls): return [o for o in cls.obj] def __init__(self): self.node_num = len(Node.obj) Node.obj.append(self) self.x = 0.0 self.y = 0.0 self.dof = [self.node_num * config.flow.num_dim + i for i in [0, 1]] self.is_dof_fixed = [True] * config.flow.num_dim self.fixed_load = np.zeros(config.flow.num_dim) self.line_xy = None def set_coordinates(self, x, y): self.x = x self.y = y def move_coordinates(self, dx, dy): self.x += dx self.y += dy def get_coordinates(self): return self.x, self.y def plot(self, sty=None): if self.line_xy is not None: self.line_xy.set_data(self.x, self.y) class Element: obj = [] def __init__(self): self.element_num = len(Element.obj) Element.obj.append(self) # self.fluidS = [] # self.fluidP = [] self.node = [None] * 2 self.dof = [0] * config.flow.num_dim self.length = 0.0 self.initial_length = None self.qp = np.zeros((2)) self.qs = np.zeros((2)) self.lineEl = None self.lineEl0 = None self.plot_on = True def set_properties(self, **kwargs): length = kwargs.get("length", None) axialForce = kwargs.get("axialForce", None) EA = kwargs.get("EA", None) if not length == None: self.length = length self.initial_length = length if not axialForce == None: self.axial_force = axialForce self.initial_axial_force = axialForce if not EA == None: self.EA = EA def set_nodes(self, nodeList): self.node = nodeList self.dof = [dof for nd in self.node for dof in nd.dof] self.update_geometry() self.init_pos = [np.array(nd.get_coordinates()) for nd in self.node] def set_parent(self, parent): self.parent = parent def update_geometry(self): x = [nd.x for nd in self.node] y = [nd.y for nd in self.node] self.length = ((x[1] - x[0]) ** 2 + (y[1] - y[0]) ** 2) ** 0.5 if self.initial_length is None: self.initial_length = self.length self.gamma = kp.atand2(y[1] - y[0], x[1] - x[0]) def set_pressure_and_shear(self, qp, qs): self.qp = qp self.qs = qs def plot(self): if self.lineEl is not None and self.plot_on: self.lineEl.set_data([nd.x for nd in self.node], [nd.y for nd in self.node]) if self.lineEl0 is not None and self.plot_on: basePt = [self.parent.parent.xCofR0, self.parent.parent.yCofR0] pos = [ kp.rotatePt(pos, basePt, self.parent.parent.trim) - np.array([0, self.parent.parent.draft]) for pos in self.init_pos ] x, y = list(zip(*[[posi[i] for i in range(2)] for posi in pos])) self.lineEl0.set_data(x, y) class TrussElement(Element): def __init__(self): Element.__init__(self) self.initial_axial_force = 0.0 self.EA = 0.0 def get_stiffness_and_force(self): # Stiffness matrices in local coordinates KL = ( np.array([[1, 0, -1, 0], [0, 0, 0, 0], [-1, 0, 1, 0], [0, 0, 0, 0]]) * self.EA / self.length ) KNL = ( np.array([[1, 0, -1, 0], [0, 1, 0, -1], [-1, 0, 1, 0], [0, -1, 0, 1]]) * self.axial_force / self.length ) # Force vectors in local coordinates FL = np.array([[self.qs[0]], [self.qp[0]], [self.qs[1]], [self.qp[1]]]) FNL = np.array([[1], [0], [-1], [0]]) * self.axial_force # Add linear and nonlinear components K = KL + KNL F = FL + FNL # Rotate stiffness and force matrices into global coordinates C = kp.cosd(self.gamma) S = kp.sind(self.gamma) TM = np.array([[C, S, 0, 0], [-S, C, 0, 0], [0, 0, C, S], [0, 0, -S, C]]) K = np.dot(np.dot(TM.T, K), TM) F = np.dot(TM.T, F) return K, F def update_geometry(self): Element.update_geometry(self) self.axial_force = (1 - config.ramp) * self.initial_axial_force + self.EA * ( self.length - self.initial_length ) / self.initial_length class RigidElement(Element): pass # def __init__(self): # super().__init__(self) PK!$5$5planingfsi/fe/femesh.pyimport os import numpy as np import matplotlib.pyplot as plt import planingfsi.config as config import krampy as kp class Mesh: @classmethod def get_pt_by_id(cls, ID): return Point.find_by_id(ID) def __init__(self): self.submesh = [] self.add_point(0, "dir", [0, 0]) def add_submesh(self, name=""): submesh = Submesh(name) self.submesh.append(submesh) return submesh def add_point(self, ID, method, position, **kwargs): P = Point() P.set_id(ID) if method == "dir": P.set_position(np.array(position)) elif method == "rel": base_pt_id, ang, R = position P.set_position( Point.find_by_id(base_pt_id).get_position() + R * kp.ang2vecd(ang) ) elif method == "con": base_pt_id, dim, val = position ang = kwargs.get("angle", 0 if dim == "x" else 90) base_pt = Point.find_by_id(base_pt_id).get_position() if dim == "x": P.set_position( np.array([val, base_pt[1] + (val - base_pt[0]) * kp.tand(ang)]) ) elif dim == "y": P.set_position( np.array([base_pt[0] + (val - base_pt[1]) / kp.tand(ang), val]) ) else: print("Incorrect dimension specification") elif method == "pct": base_pt_id, end_pt_id, pct = position base_pt = Point.find_by_id(base_pt_id).get_position() end_pt = Point.find_by_id(end_pt_id).get_position() P.set_position((1 - pct) * base_pt + pct * end_pt) else: raise NameError( "Incorrect position specification method for point, ID: {0}".format(ID) ) return P def add_point_along_curve(self, ID, curve, pct): P = Point() P.set_id(ID) P.set_position(map(curve.get_shape_func(), [pct])[0]) return P def add_load(self, pt_id, F): Point.find_by_id(pt_id).add_fixed_load(F) def fix_all_points(self): for pt in Point.All(): pt.set_fixed_dof("x", "y") def fix_points(self, pt_id_list): for pt in [Point.find_by_id(pt_id) for pt_id in pt_id_list]: pt.set_fixed_dof("x", "y") def rotate_points(self, base_pt_id, angle, pt_id_list): for pt in [Point.find_by_id(pt_id) for pt_id in pt_id_list]: pt.rotate(base_pt_id, angle) def rotate_all_points(self, base_pt_id, angle): for pt in Point.All(): pt.rotate(base_pt_id, angle) def move_points(self, dx, dy, pt_id_list): for pt in [Point.find_by_id(pt_id) for pt_id in pt_id_list]: pt.move(dx, dy) def move_all_points(self, dx, dy): for pt in Point.All(): pt.move(dx, dy) def scale_all_points(self, sf, base_pt_id=0): base_pt = Point.find_by_id(base_pt_id).get_position() for pt in Point.All(): pt.set_position((pt.get_position() - base_pt) * sf + base_pt) def get_diff(self, pt0, pt1): return ( Point.find_by_id(pt1).get_position() - Point.find_by_id(pt0).get_position() ) def get_length(self, pt0, pt1): return np.linalg.norm(self.get_diff(pt0, pt1)) def display(self, **kwargs): if kwargs.get("disp", False): Shape.print_all() print(("Line count: {0}".format(Line.count()))) print(("Point count: {0}".format(Point.count()))) def plot(self, **kwargs): show = kwargs.get("show", False) save = kwargs.get("save", False) plot = show or save if plot: plt.figure(figsize=(16, 14)) plt.axis("equal") plt.xlabel(r"$x$", size=18) plt.ylabel(r"$y$", size=18) Shape.plot_all() lims = plt.gca().get_xlim() ext = (lims[1] - lims[0]) * 0.1 plt.xlim([lims[0] - ext, lims[1] + ext]) # Process optional arguments and save or show figure if save: saved_file_name = kwargs.get("fileName", "meshLayout") plt.savefig(saved_file_name + ".eps", format="eps") if show: plt.show() def write(self): kp.createIfNotExist(config.path.mesh_dir) x, y = list(zip(*[pt.get_position() for pt in Point.All()])) kp.writeaslist( os.path.join(config.path.mesh_dir, "nodes.txt"), ["x", x], ["y", y] ) x, y = list(zip(*[pt.get_fixed_dof() for pt in Point.All()])) kp.writeaslist( os.path.join(config.path.mesh_dir, "fixedDOF.txt"), ["x", x], ["y", y], headerFormat=">1", dataFormat=">1", ) x, y = list(zip(*[pt.get_fixed_load() for pt in Point.All()])) kp.writeaslist( os.path.join(config.path.mesh_dir, "fixedLoad.txt"), ["x", x], ["y", y], headerFormat=">6", dataFormat="6.4e", ) for sm in self.submesh: sm.write() class Submesh(Mesh): def __init__(self, name): Mesh.__init__(self) self.name = name self.line = [] def add_curve(self, pt_id1, pt_id2, **kwargs): arc_length = kwargs.get("arcLen", None) radius = kwargs.get("radius", None) C = Curve(kwargs.get("Nel", 1)) C.set_id(kwargs.get("ID", -1)) C.set_end_pts_by_id(pt_id1, pt_id2) if arc_length is not None: C.set_arc_length(arc_length) elif radius is not None: C.set_radius(radius) else: C.set_arc_length(0.0) C.distribute_points() for pt in C.pt: pt.set_used() self.line += [l for l in C.get_lines()] return C def write(self): if len(self.line) > 0: ptL, ptR = list( zip( *[ [pt.get_index() for pt in l._get_element_coords()] for l in self.line ] ) ) kp.writeaslist( os.path.join( config.path.mesh_dir, "elements_{0}.txt".format(self.name) ), ["ptL", ptL], ["ptR", ptR], headerFormat="<4", dataFormat=">4", ) class Shape: obj = [] @classmethod def All(cls): return [o for o in cls.obj] @classmethod def count(cls): return len(cls.All()) @classmethod def plot_all(cls): for o in cls.obj: o.plot() @classmethod def print_all(cls): for o in cls.obj: o.display() @classmethod def find_by_id(cls, ID): if ID == -1: return None else: obj = [a for a in cls.All() if a.ID == ID] if len(obj) == 0: return None else: return obj[0] def __init__(self): self.set_index(Shape.count()) self.ID = -1 Shape.obj.append(self) def set_id(self, ID): existing = self.__class__.find_by_id(ID) if existing is not None: existing.set_id(-1) self.ID = ID def set_index(self, ind): self.ind = ind def get_id(self): return self.ID def get_index(self): return self.ind def plot(self): return 0 class Point(Shape): obj = [] def __init__(self): Shape.__init__(self) self.set_index(Point.count()) Point.obj.append(self) self.pos = np.zeros(2) self.is_dof_fixed = [True] * 2 self.fixed_load = np.zeros(2) self.is_used = False def set_used(self, used=True): self.is_used = True self.set_free_dof("x", "y") def get_fixed_load(self): return self.fixed_load def add_fixed_load(self, F): for i in range(2): self.fixed_load[i] += F[i] def set_free_dof(self, *args): for arg in args: if arg == "x": self.is_dof_fixed[0] = False if arg == "y": self.is_dof_fixed[1] = False def set_fixed_dof(self, *args): for arg in args: if arg == "x": self.is_dof_fixed[0] = True if arg == "y": self.is_dof_fixed[1] = True def get_fixed_dof(self): return self.is_dof_fixed def move(self, dx, dy): self.set_position(self.get_position() + np.array([dx, dy])) def set_position(self, pos): self.pos = pos def get_position(self): return self.pos def get_x_pos(self): return self.pos[0] def get_y_pos(self): return self.pos[1] def rotate(self, base_pt_id, angle): base_pt = Point.find_by_id(base_pt_id).get_position() self.set_position(kp.rotateVec(self.get_position() - base_pt, angle) + base_pt) def display(self): print( ( "{0} {1}: ID = {2}, Pos = {3}".format( self.__class__.__name__, self.get_index(), self.get_id(), self.get_position(), ) ) ) def plot(self): if self.ID == -1: plt.plot(self.pos[0], self.pos[1], "r*") else: plt.plot(self.pos[0], self.pos[1], "ro") plt.text(self.pos[0], self.pos[1], " {0}".format(self.ID)) class Curve(Shape): obj = [] def __init__(self, Nel=1): Shape.__init__(self) self.set_index(Curve.count()) Curve.obj.append(self) self.pt = [] self.line = [] self._end_pts = [] self.Nel = Nel self.plot_sty = "m--" def set_end_pts_by_id(self, pt_id1, pt_id2): self.set_end_pts([Point.find_by_id(pid) for pid in [pt_id1, pt_id2]]) def get_shape_func(self): xy = [pt.get_position() for pt in self._end_pts] if self.radius == 0.0: return lambda s: xy[0] * (1 - s) + xy[1] * s else: x, y = list(zip(*xy)) gam = np.arctan2(y[1] - y[0], x[1] - x[0]) alf = self.arc_length / (2 * self.radius) return lambda s: self._end_pts[0].get_position() + 2 * self.radius * np.sin( s * alf ) * kp.ang2vec(gam + (s - 1) * alf) def set_radius(self, R): self.radius = R self.calculate_chord() self.calculate_arc_length() def set_arc_length(self, arc_length): self.calculate_chord() if self.chord >= arc_length: self.arc_length = 0.0 self.set_radius(0.0) else: self.arc_length = arc_length self.calculate_radius() def calculate_radius(self): self.radius = 1 / self.calculate_curvature() def calculate_chord(self): x, y = list(zip(*[pt.get_position() for pt in self._end_pts])) self.chord = ((x[1] - x[0]) ** 2 + (y[1] - y[0]) ** 2) ** 0.5 def calculate_arc_length(self): if self.radius == 0: self.arc_length = self.chord else: f = lambda s: self.chord / (2 * self.radius) - np.sin(s / (2 * self.radius)) # Keep increasing guess until fsolve finds the first non-zero root self.arc_length = kp.fzero(f, self.chord + 1e-6) def calculate_curvature(self): f = lambda x: x * self.chord / 2 - np.sin(x * self.arc_length / 2) # Keep increasing guess until fsolve finds the first non-zero root kap = 0.0 kap0 = 0.0 while kap <= 1e-6: kap = kp.fzero(f, kap0) kap0 += 0.02 return kap def distribute_points(self): self.pt.append(self._end_pts[0]) if not self.Nel == 1: # Distribute N points along a parametric curve defined by f(s), s in [0,1] s = np.linspace(0.0, 1.0, self.Nel + 1)[1:-1] for xy in map(self.get_shape_func(), s): P = Point() self.pt.append(P) P.set_position(xy) self.pt.append(self._end_pts[1]) self.generate_lines() def generate_lines(self): for ptSt, ptEnd in zip(self.pt[:-1], self.pt[1:]): L = Line() L.set_end_pts([ptSt, ptEnd]) self.line.append(L) def get_lines(self): return self.line def set_pts(self, pt): self.pt = pt def set_end_pts(self, end_pt): self._end_pts = end_pt def _get_element_coords(self): return self.pt def getPtIDs(self): return [pt.get_id() for pt in self.pt] def display(self): print( ( "{0} {1}: ID = {2}, Pt IDs = {3}".format( self.__class__.__name__, self.get_index(), self.get_id(), self.getPtIDs(), ) ) ) def plot(self): x, y = list(zip(*[pt.get_position() for pt in self.pt])) plt.plot(x, y, self.plot_sty) class Line(Curve): obj = [] def __init__(self): Curve.__init__(self) self.set_index(Line.count()) Line.obj.append(self) self.plot_sty = "b-" def set_end_pts(self, endPt): Curve.set_end_pts(self, endPt) self.set_pts(endPt) PK!planingfsi/fe/structure.pyimport os import numpy as np from scipy.interpolate import interp1d # import planingfsi.io from planingfsi import config # from planingfsi import krampy as kp # from planingfsi import io from . import felib as fe class FEStructure: """Parent object for solving the finite-element structure. Consists of several rigid bodies and substructures. """ def __init__(self): self.rigid_body = [] self.substructure = [] self.node = [] self.res = 1.0 def add_rigid_body(self, dict_=None): if dict_ is None: dict_ = planingfsi.io.Dictionary() rigid_body = RigidBody(dict_) self.rigid_body.append(rigid_body) return rigid_body def add_substructure(self, dict_=None): if dict_ is None: dict_ = planingfsi.io.Dictionary() ss_type = dict_.read("substructureType", "rigid") if ss_type.lower() == "flexible" or ss_type.lower() == "truss": ss = FlexibleSubstructure(dict_) FlexibleSubstructure.obj.append(ss) elif ss_type.lower() == "torsionalspring": ss = TorsionalSpringSubstructure(dict_) else: ss = RigidSubstructure(dict_) self.substructure.append(ss) # Find parent body and add substructure to it body = [ b for b in self.rigid_body if b.name == dict_.read("bodyName", "default") ] if len(body) > 0: body = body[0] else: body = self.rigid_body[0] body.add_substructure(ss) ss.add_parent(body) print( ( "Adding Substructure {0} of type {1} to rigid body {2}".format( ss.name, ss.type_, body.name ) ) ) return ss def initialize_rigid_bodies(self): for bd in self.rigid_body: bd.initialize_position() def update_fluid_forces(self): for bd in self.rigid_body: bd.update_fluid_forces() def calculate_response(self): if config.io.results_from_file: self.load_response() else: for bd in self.rigid_body: bd.update_position() bd.update_substructure_positions() def get_residual(self): self.res = 0.0 for bd in self.rigid_body: if bd.free_in_draft or bd.free_in_trim: self.res = np.max([np.abs(bd.res_l), self.res]) self.res = np.max([np.abs(bd.res_m), self.res]) self.res = np.max([FlexibleSubstructure.res, self.res]) def load_response(self): self.update_fluid_forces() for bd in self.rigid_body: bd.load_motion() for ss in bd.substructure: ss.load_coordinates() ss.update_geometry() def write_results(self): for bd in self.rigid_body: bd.write_motion() for ss in bd.substructure: ss.write_coordinates() def plot(self): for body in self.rigid_body: for struct in body.substructure: struct.plot() def load_mesh(self): # Create all nodes x, y = np.loadtxt(os.path.join(config.path.mesh_dir, "nodes.txt"), unpack=True) xf, yf = np.loadtxt( os.path.join(config.path.mesh_dir, "fixedDOF.txt"), unpack=True ) fx, fy = np.loadtxt( os.path.join(config.path.mesh_dir, "fixedLoad.txt"), unpack=True ) for xx, yy, xxf, yyf, ffx, ffy in zip(x, y, xf, yf, fx, fy): nd = fe.Node() nd.set_coordinates(xx, yy) nd.is_dof_fixed = [bool(xxf), bool(yyf)] nd.fixed_load = np.array([ffx, ffy]) self.node.append(nd) for struct in self.substructure: struct.load_mesh() if ( struct.type_ == "rigid" or struct.type_ == "rotating" or struct.type_ == "torsionalSpring" ): struct.set_fixed_dof() for ss in self.substructure: ss.set_attachments() class RigidBody(object): def __init__(self, dict_): self.dict_ = dict_ self.num_dim = 2 self.draft = 0.0 self.trim = 0.0 self.name = self.dict_.read("bodyName", "default") self.weight = self.dict_.read( "W", self.dict_.read("loadPct", 1.0) * config.body.weight ) self.weight *= config.body.seal_load_pct self.m = self.dict_.read("m", self.weight / config.flow.gravity) self.Iz = self.dict_.read("Iz", self.m * config.body.reference_length ** 2 / 12) self.has_planing_surface = self.dict_.read("hasPlaningSurface", False) var = [ "max_draft_step", "max_trim_step", "free_in_draft", "free_in_trim", "draft_damping", "trim_damping", "max_draft_acc", "max_trim_acc", "xCofG", "yCofG", "xCofR", "yCofR", "initial_draft", "initial_trim", "relax_draft", "relax_trim", "time_step", "num_damp", ] for v in var: if v in self.dict_: setattr(self, v, self.dict_.read(v)) elif hasattr(config.body, v): setattr(self, v, getattr(config.body, v)) elif hasattr(config.solver, v): setattr(self, v, getattr(config.solver, v)) elif hasattr(config, v): setattr(self, v, getattr(config, v)) else: raise ValueError("Cannot find symbol: {0}".format(v)) # setattr(self, v, self.dict_.read(v, getattr(config.body, getattr(config, v)))) # self.xCofR = self.dict_.read('xCofR', self.xCofG) # self.yCofR = self.dict_.read('yCofR', self.yCofG) self.xCofR0 = self.xCofR self.yCofR0 = self.yCofR self.max_disp = np.array([self.max_draft_step, self.max_trim_step]) self.free_dof = np.array([self.free_in_draft, self.free_in_trim]) self.c_damp = np.array([self.draft_damping, self.trim_damping]) self.max_acc = np.array([self.max_draft_acc, self.max_trim_acc]) self.relax = np.array([self.relax_draft, self.relax_trim]) if self.free_in_draft or self.free_in_trim: config.has_free_structure = True self.v = np.zeros((self.num_dim)) self.a = np.zeros((self.num_dim)) self.v_old = np.zeros((self.num_dim)) self.a_old = np.zeros((self.num_dim)) self.beta = self.dict_.read("beta", 0.25) self.gamma = self.dict_.read("gamma", 0.5) self.D = 0.0 self.L = 0.0 self.M = 0.0 self.Da = 0.0 self.La = 0.0 self.Ma = 0.0 self.solver = None self.disp_old = 0.0 self.res_old = None self.two_ago_disp = 0.0 self.predictor = True self.f_old = 0.0 self.two_ago_f = 0.0 self.res_l = 1.0 self.res_m = 1.0 self.J = None self.J_tmp = None # Assign displacement function depending on specified method self.get_disp = lambda: (0.0, 0.0) if any(self.free_dof): if config.body.motion_method == "Secant": self.get_disp = self.get_disp_secant elif config.body.motion_method == "Broyden": self.get_disp = self.get_disp_broyden elif config.body.motion_method == "BroydenNew": self.get_disp = self.get_disp_broyden_new elif config.body.motion_method == "Physical": self.get_disp = self.get_disp_physical elif config.body.motion_method == "Newmark-Beta": self.get_disp = self.get_disp_newmark_beta elif config.body.motion_method == "PhysicalNoMass": self.get_disp = self.get_disp_physical_no_mass elif config.body.motion_method == "Sep": self.get_disp = self.getDispSep self.trim_solver = None self.draft_solver = None self.substructure = [] self.node = None print(("Adding Rigid Body: {0}".format(self.name))) def add_substructure(self, ss): self.substructure.append(ss) def store_nodes(self): self.node = [] for ss in self.substructure: for nd in ss.node: if not any([n.node_num == nd.node_num for n in self.node]): self.node.append(nd) def initialize_position(self): self.set_position(self.initial_draft, self.initial_trim) def set_position(self, draft, trim): self.update_position(draft - self.draft, trim - self.trim) def update_position(self, dDraft=None, dTrim=None): if dDraft is None: dDraft, dTrim = self.get_disp() if np.isnan(dDraft): dDraft = 0.0 if np.isnan(dTrim): dTrim = 0.0 if self.node is None: self.store_nodes() for nd in self.node: xo, yo = nd.get_coordinates() newPos = kp.rotatePt([xo, yo], [self.xCofR, self.yCofR], dTrim) nd.move_coordinates(newPos[0] - xo, newPos[1] - yo - dDraft) for s in self.substructure: s.update_geometry() self.xCofG, self.yCofG = kp.rotatePt( [self.xCofG, self.yCofG], [self.xCofR, self.yCofR], dTrim ) self.yCofG -= dDraft self.yCofR -= dDraft self.draft += dDraft self.trim += dTrim self.print_motion() def update_substructure_positions(self): FlexibleSubstructure.update_all() for ss in self.substructure: print(("Updating position for substructure: {0}".format(ss.name))) # if ss.type.lower() == 'flexible': # ss.getPtDispFEM() if ss.type_.lower() == "torsionalspring" or ss.type_.lower() == "rigid": ss.updateAngle() def update_fluid_forces(self): self.reset_loads() for ss in self.substructure: ss.update_fluid_forces() self.D += ss.D self.L += ss.L self.M += ss.M self.Da += ss.Da self.La += ss.La self.Ma += ss.Ma self.res_l = self.get_res_lift() self.res_m = self.get_res_moment() def reset_loads(self): self.D *= 0.0 self.L *= 0.0 self.M *= 0.0 if config.body.cushion_force_method.lower() == "assumed": self.L += ( config.body.Pc * config.body.reference_length * kp.cosd(config.trim) ) def get_disp_physical(self): disp = self.limit_disp(self.time_step * self.v) for i in range(self.num_dim): if np.abs(disp[i]) == np.abs(self.max_disp[i]): self.v[i] = disp[i] / self.time_step self.v += self.time_step * self.a self.a = np.array( [self.weight - self.L, self.M - self.weight * (self.xCofG - self.xCofR)] ) # self.a -= self.Cdamp * (self.v + self.v**3) * config.ramp self.a -= self.c_damp * self.v * config.ramp self.a /= np.array([self.m, self.Iz]) self.a = np.min( np.vstack( (np.abs(self.a), np.array([self.max_draft_acc, self.max_trim_acc])) ), axis=0, ) * np.sign(self.a) # accLimPct = np.min(np.vstack((np.abs(self.a), self.maxAcc)), axis=0) * np.sign(self.a) # for i in range(len(self.a)): # if self.a[i] == 0.0 or not self.freeDoF[i]: # accLimPct[i] = 1.0 # else: # accLimPct[i] /= self.a[i] # # self.a *= np.min(accLimPct) disp *= config.ramp return disp def get_disp_newmark_beta(self): self.a = np.array( [self.weight - self.L, self.M - self.weight * (self.xCofG - self.xCofR)] ) # self.a -= self.Cdamp * self.v * config.ramp self.a /= np.array([self.m, self.Iz]) self.a = np.min( np.vstack( (np.abs(self.a), np.array([self.max_draft_acc, self.max_trim_acc])) ), axis=0, ) * np.sign(self.a) dv = (1 - self.gamma) * self.a_old + self.gamma * self.a dv *= self.time_step dv *= 1 - self.numDamp self.v += dv disp = 0.5 * (1 - 2 * self.beta) * self.a_old + self.beta * self.a disp *= self.time_step disp += self.v_old disp *= self.time_step self.a_old = self.a self.v_old = self.v disp *= self.relax disp *= config.ramp disp = self.limit_disp(disp) # disp *= config.ramp return disp def get_disp_physical_no_mass(self): F = np.array( [ self.weight - self.L, self.M - self.weight * (config.body.xCofG - config.body.xCofR), ] ) # F -= self.Cdamp * self.v if self.predictor: disp = F / self.c_damp * self.time_step self.predictor = False else: disp = 0.5 * self.time_step / self.c_damp * (F - self.two_ago_f) self.predictor = True disp *= self.relax * config.ramp disp = self.limit_disp(disp) # self.v = disp / self.timeStep self.two_ago_f = self.f_old self.f_old = disp * self.c_damp / self.time_step return disp def get_disp_secant(self): if self.solver is None: self.resFun = lambda x: np.array( [ self.L - self.weight, self.M - self.weight * (config.body.xCofG - config.body.xCofR), ] ) self.solver = kp.RootFinder( self.resFun, np.array([config.body.initial_draft, config.body.initial_trim]), "secant", dxMax=self.max_disp * self.free_dof, ) if not self.disp_old is None: self.solver.takeStep(self.disp_old) # Solve for limited displacement disp = self.solver.limitStep(self.solver.getStep()) self.two_ago_disp = self.disp_old self.disp_old = disp return disp def reset_jacobian(self): if self.J_tmp is None: self.Jit = 0 self.J_tmp = np.zeros((self.num_dim, self.num_dim)) self.step = 0 self.Jfo = self.resFun(self.x) self.res_old = self.Jfo * 1.0 else: f = self.resFun(self.x) self.J_tmp[:, self.Jit] = (f - self.Jfo) / self.disp_old[self.Jit] self.Jit += 1 disp = np.zeros((self.num_dim)) if self.Jit < self.num_dim: disp[self.Jit] = config.body.motion_jacobian_first_step if self.Jit > 0: disp[self.Jit - 1] = -config.body.motion_jacobian_first_step self.disp_old = disp if self.Jit >= self.num_dim: self.J = self.J_tmp * 1.0 self.J_tmp = None self.disp_old = None return disp def get_disp_broyden_new(self): if self.solver is None: self.resFun = lambda x: np.array( [ f for f, free_dof in zip( [self.get_res_lift(), self.get_res_moment()], self.free_dof ) if free_dof ] ) maxDisp = [ m for m, free_dof in zip(self.max_disp, self.free_dof) if free_dof ] self.x = np.array( [ f for f, free_dof in zip([self.draft, self.trim], self.free_dof) if free_dof ] ) self.solver = kp.RootFinder(self.resFun, self.x, "broyden", dxMax=maxDisp) if not self.disp_old is None: self.solver.takeStep(self.disp_old) # Solve for limited displacement disp = self.solver.limitStep(self.solver.getStep()) self.two_ago_disp = self.disp_old self.disp_old = disp return disp def get_disp_broyden(self): if self.solver is None: self.resFun = lambda x: np.array( [self.L - self.weight, self.M - self.weight * (self.xCofG - self.xCofR)] ) # self.resFun = lambda x: np.array([self.get_res_moment(), self.get_res_lift()]) self.solver = 1.0 self.x = np.array([self.draft, self.trim]) if self.J is None: disp = self.reset_jacobian() else: self.f = self.resFun(self.x) if not self.disp_old is None: self.x += self.disp_old dx = np.reshape(self.disp_old, (self.num_dim, 1)) df = np.reshape(self.f - self.res_old, (self.num_dim, 1)) self.J += ( np.dot(df - np.dot(self.J, dx), dx.T) / np.linalg.norm(dx) ** 2 ) dof = self.free_dof dx = np.zeros_like(self.x) A = -self.J[np.ix_(dof, dof)] b = self.f.reshape(self.num_dim, 1)[np.ix_(dof)] dx[np.ix_(dof)] = np.linalg.solve(A, b) if self.res_old is not None: if any(np.abs(self.f) - np.abs(self.res_old) > 0.0): self.step += 1 if self.step >= 6: print("\nResetting Jacobian for Motion\n") disp = self.reset_jacobian() disp = dx.reshape(self.num_dim) # disp = self.solver.getStep() disp *= self.relax disp = self.limit_disp(disp) self.disp_old = disp self.res_old = self.f * 1.0 self.step += 1 return disp def limit_disp(self, disp): dispLimPct = np.min(np.vstack((np.abs(disp), self.max_disp)), axis=0) * np.sign( disp ) for i in range(len(disp)): if disp[i] == 0.0 or not self.free_dof[i]: dispLimPct[i] = 1.0 else: dispLimPct[i] /= disp[i] return disp * np.min(dispLimPct) * self.free_dof def get_res_lift(self): if np.isnan(self.L): res = 1.0 else: res = (self.L - self.weight) / ( config.flow.stagnation_pressure * config.body.reference_length + 1e-6 ) return np.abs(res * self.free_in_draft) def get_res_moment(self): if np.isnan(self.M): res = 1.0 else: if self.xCofG == self.xCofR and self.M == 0.0: res = 1.0 else: res = (self.M - self.weight * (self.xCofG - self.xCofR)) / ( config.flow.stagnation_pressure * config.body.reference_length ** 2 + 1e-6 ) return np.abs(res * self.free_in_trim) def print_motion(self): print(("Rigid Body Motion: {0}".format(self.name))) print((" CofR: ({0}, {1})".format(self.xCofR, self.yCofR))) print((" CofG: ({0}, {1})".format(self.xCofG, self.yCofG))) print((" Draft: {0:5.4e}".format(self.draft))) print((" Trim Angle: {0:5.4e}".format(self.trim))) print((" Lift Force: {0:5.4e}".format(self.L))) print((" Drag Force: {0:5.4e}".format(self.D))) print((" Moment: {0:5.4e}".format(self.M))) print((" Lift Force Air: {0:5.4e}".format(self.La))) print((" Drag Force Air: {0:5.4e}".format(self.Da))) print((" Moment Air: {0:5.4e}".format(self.Ma))) print((" Lift Res: {0:5.4e}".format(self.res_l))) print((" Moment Res: {0:5.4e}".format(self.res_m))) def write_motion(self): kp.writeasdict( os.path.join( config.it_dir, "motion_{0}.{1}".format(self.name, config.io.data_format) ), ["xCofR", self.xCofR], ["yCofR", self.yCofR], ["xCofG", self.xCofG], ["yCofG", self.yCofG], ["draft", self.draft], ["trim", self.trim], ["liftRes", self.res_l], ["momentRes", self.res_m], ["Lift", self.L], ["Drag", self.D], ["Moment", self.M], ["LiftAir", self.La], ["DragAir", self.Da], ["MomentAir", self.Ma], ) for ss in self.substructure: if ss.type_ == "torsionalSpring": ss.writeDeformation() def load_motion(self): K = kp.dict_ionary( os.path.join( config.it_dir, "motion_{0}.{1}".format(self.name, config.io.data_format) ) ) self.xCofR = K.read("xCofR", np.nan) self.yCofR = K.read("yCofR", np.nan) self.xCofG = K.read("xCofG", np.nan) self.yCofG = K.read("yCofG", np.nan) self.draft = K.read("draft", np.nan) self.trim = K.read("trim", np.nan) self.res_l = K.read("liftRes", np.nan) self.res_m = K.read("momentRes", np.nan) self.L = K.read("Lift", np.nan) self.D = K.read("Drag", np.nan) self.M = K.read("Moment", np.nan) self.La = K.read("LiftAir", np.nan) self.Da = K.read("DragAir", np.nan) self.Ma = K.read("MomentAir", np.nan) class Substructure: count = 0 obj = [] @classmethod def All(cls): return [o for o in cls.obj] @classmethod def find_by_name(cls, name): return [o for o in cls.obj if o.name == name][0] def __init__(self, dict_): self.index = Substructure.count Substructure.count += 1 Substructure.obj.append(self) self.dict_ = dict_ self.name = self.dict_.read("substructureName", "") self.type_ = self.dict_.read("substructureType", "rigid") self.interpolator = None self.seal_pressure = self.dict_.read_load_or_default("Ps", 0.0) self.seal_pressure_method = self.dict_.read("PsMethod", "constant") self.seal_over_pressure_pct = self.dict_.read_load_or_default( "overPressurePct", 1.0 ) self.cushion_pressure_type = self.dict_.read("cushionPressureType", None) self.tip_load = self.dict_.read_load_or_default("tipLoad", 0.0) self.tip_constraint_height = self.dict_.read("tipConstraintHt", None) self.struct_interp_type = self.dict_.read("structInterpType", "linear") self.struct_extrap = self.dict_.read("structExtrap", True) self.line_fluid_pressure = None self.line_air_pressure = None self.fluidS = [] self.fluidP = [] self.airS = [] self.airP = [] self.U = 0.0 def add_parent(self, parent): self.parent = parent def get_residual(self): return 0.0 def set_interpolator(self, interpolator): self.interpolator = interpolator def set_element_properties(self): for el in self.el: el.set_properties(length=self.get_arc_length() / len(self.el)) def load_mesh(self): ndSt, ndEnd = np.loadtxt( os.path.join(config.path.mesh_dir, "elements_{0}.txt".format(self.name)), unpack=True, ) if isinstance(ndSt, float): ndSt = [int(ndSt)] ndEnd = [int(ndEnd)] else: ndSt = [int(nd) for nd in ndSt] ndEnd = [int(nd) for nd in ndEnd] ndInd = ndSt + [ndEnd[-1]] # Generate Element list self.node = [fe.Node.get_index(i) for i in ndInd] self.set_interp_function() self.el = [self.element_type() for _ in ndSt] self.set_element_properties() for ndSti, ndEndi, el in zip(ndSt, ndEnd, self.el): el.set_nodes([fe.Node.get_index(ndSti), fe.Node.get_index(ndEndi)]) el.set_parent(self) def set_interp_function(self): self.node_arc_length = np.zeros(len(self.node)) for i, nd0, nd1 in zip( list(range(len(self.node) - 1)), self.node[:-1], self.node[1:] ): self.node_arc_length[i + 1] = ( self.node_arc_length[i] + ((nd1.x - nd0.x) ** 2 + (nd1.y - nd0.y) ** 2) ** 0.5 ) if len(self.node_arc_length) == 2: self.struct_interp_type = "linear" elif len(self.node_arc_length) == 3 and not self.struct_interp_type == "linear": self.struct_interp_type = "quadratic" x, y = [np.array(xx) for xx in zip(*[(nd.x, nd.y) for nd in self.node])] self.interp_func_x, self.interp_func_y = ( interp1d(self.node_arc_length, x), interp1d(self.node_arc_length, y, kind=self.struct_interp_type), ) if self.struct_extrap: self.interp_func_x, self.interp_func_y = self.extrap_coordinates( self.interp_func_x, self.interp_func_y ) def extrap_coordinates(self, fxi, fyi): def extrap1d(interpolator): xs = interpolator.x ys = interpolator.y def pointwise(xi): if xi < xs[0]: return ys[0] + (xi - xs[0]) * (ys[1] - ys[0]) / (xs[1] - xs[0]) elif xi > xs[-1]: return ys[-1] + (xi - xs[-1]) * (ys[-1] - ys[-2]) / ( xs[-1] - xs[-2] ) else: return interpolator(xi) def ufunclike(xs): return np.array(list(map(pointwise, np.array([xs]))))[0] return ufunclike return extrap1d(fxi), extrap1d(fyi) def get_coordinates(self, si): return self.interp_func_x(si), self.interp_func_y(si) def get_xcoordinates(self, s): return self.get_coordinates(s)[0] def get_ycoordinates(self, s): return self.get_coordinates(s)[1] def get_arc_length(self): return max(self.node_arc_length) def write_coordinates(self): kp.writeaslist( os.path.join( config.it_dir, "coords_{0}.{1}".format(self.name, config.io.data_format) ), ["x [m]", [nd.x for nd in self.node]], ["y [m]", [nd.y for nd in self.node]], ) def load_coordinates(self): x, y = np.loadtxt( os.path.join( config.it_dir, "coords_{0}.{1}".format(self.name, config.io.data_format) ), unpack=True, ) for xx, yy, nd in zip(x, y, self.node): nd.set_coordinates(xx, yy) def update_fluid_forces(self): self.fluidS = [] self.fluidP = [] self.airS = [] self.airP = [] self.D = 0.0 self.L = 0.0 self.M = 0.0 self.Da = 0.0 self.La = 0.0 self.Ma = 0.0 if self.interpolator is not None: sMin, sMax = self.interpolator.get_min_max_s() for i, el in enumerate(self.el): # Get pressure at end points and all fluid points along element nodeS = [self.node_arc_length[i], self.node_arc_length[i + 1]] if self.interpolator is not None: s, pressure_fluid, tau = self.interpolator.get_loads_in_range( nodeS[0], nodeS[1] ) # Limit pressure to be below stagnation pressure if config.plotting.pressure_limiter: pressure_fluid = np.min( np.hstack( ( pressure_fluid, np.ones_like(pressure_fluid) * config.flow.stagnation_pressure, ) ), axis=0, ) else: s = np.array(nodeS) pressure_fluid = np.zeros_like(s) tau = np.zeros_like(s) ss = nodeS[1] Pc = 0.0 if self.interpolator is not None: if ss > sMax: Pc = self.interpolator.fluid.upstream_pressure elif ss < sMin: Pc = self.interpolator.fluid.downstream_pressure elif self.cushion_pressure_type == "Total": Pc = config.body.Pc # Store fluid and air pressure components for element (for # plotting) if i == 0: self.fluidS += [s[0]] self.fluidP += [pressure_fluid[0]] self.airS += [nodeS[0]] self.airP += [Pc - self.seal_pressure] self.fluidS += [ss for ss in s[1:]] self.fluidP += [pp for pp in pressure_fluid[1:]] self.airS += [ss for ss in nodeS[1:]] if self.seal_pressure_method.lower() == "hydrostatic": self.airP += [ Pc - self.seal_pressure + config.flow.density * config.flow.gravity * (self.interp_func_y(si) - config.flow.waterline_height) for si in nodeS[1:] ] else: self.airP += [Pc - self.seal_pressure for _ in nodeS[1:]] # Apply ramp to hydrodynamic pressure pressure_fluid *= config.ramp ** 2 # Add external cushion pressure to external fluid pressure pressure_cushion = np.zeros_like(s) Pc = 0.0 for ii, ss in enumerate(s): if self.interpolator is not None: if ss > sMax: Pc = self.interpolator.fluid.upstream_pressure elif ss < sMin: Pc = self.interpolator.fluid.downstream_pressure elif self.cushion_pressure_type == "Total": Pc = config.body.Pc pressure_cushion[ii] = Pc # Calculate internal pressure if self.seal_pressure_method.lower() == "hydrostatic": pressure_internal = ( self.seal_pressure - config.flow.density * config.flow.gravity * ( np.array([self.interp_func_y(si) for si in s]) - config.flow.waterline_height ) ) else: pressure_internal = ( self.seal_pressure * np.ones_like(s) * self.seal_over_pressure_pct ) pressure_external = pressure_fluid + pressure_cushion pressure_total = pressure_external - pressure_internal # Integrate pressure profile, calculate center of pressure and # distribute force to nodes integral = kp.integrate(s, pressure_total) if integral == 0.0: qp = np.zeros(2) else: pct = ( kp.integrate(s, s * pressure_total) / integral - s[0] ) / kp.cumdiff(s) qp = integral * np.array([1 - pct, pct]) integral = kp.integrate(s, tau) if integral == 0.0: qs = np.zeros(2) else: pct = (kp.integrate(s, s * tau) / integral - s[0]) / kp.cumdiff(s) qs = -integral * np.array([1 - pct, pct]) el.set_pressure_and_shear(qp, qs) # Calculate external force and moment for rigid body calculation if ( config.body.cushion_force_method.lower() == "integrated" or config.body.cushion_force_method.lower() == "assumed" ): if config.body.cushion_force_method.lower() == "integrated": integrand = pressure_external elif config.body.cushion_force_method.lower() == "assumed": integrand = pressure_fluid n = list(map(self.get_normal_vector, s)) t = [kp.rotateVec(ni, -90) for ni in n] f = [ -pi * ni + taui * ti for pi, taui, ni, ti in zip(integrand, tau, n, t) ] r = [ np.array([pt[0] - self.parent.xCofR, pt[1] - self.parent.yCofR]) for pt in map(self.get_coordinates, s) ] m = [kp.cross2(ri, fi) for ri, fi in zip(r, f)] self.D -= kp.integrate(s, np.array(zip(*f)[0])) self.L += kp.integrate(s, np.array(zip(*f)[1])) self.M += kp.integrate(s, np.array(m)) else: if self.interpolator is not None: self.D = self.interpolator.fluid.D self.L = self.interpolator.fluid.L self.M = self.interpolator.fluid.M integrand = pressure_cushion n = list(map(self.get_normal_vector, s)) t = [kp.rotateVec(ni, -90) for ni in n] f = [-pi * ni + taui * ti for pi, taui, ni, ti in zip(integrand, tau, n, t)] r = [ np.array([pt[0] - self.parent.xCofR, pt[1] - self.parent.yCofR]) for pt in map(self.get_coordinates, s) ] m = [kp.cross2(ri, fi) for ri, fi in zip(r, f)] self.Da -= kp.integrate(s, np.array(list(zip(*f))[0])) self.La += kp.integrate(s, np.array(list(zip(*f))[1])) self.Ma += kp.integrate(s, np.array(m)) def get_normal_vector(self, s): dxds = kp.getDerivative(lambda si: self.get_coordinates(si)[0], s) dyds = kp.getDerivative(lambda si: self.get_coordinates(si)[1], s) return kp.rotateVec(kp.ang2vecd(kp.atand2(dyds, dxds)), -90) def plot_pressure_profiles(self): if self.line_fluid_pressure is not None: self.line_fluid_pressure.set_data( self.get_pressure_plot_points(self.fluidS, self.fluidP) ) if self.line_air_pressure is not None: self.line_air_pressure.set_data( self.get_pressure_plot_points(self.airS, self.airP) ) def get_pressure_plot_points(self, s0, p0): sp = [(s, p) for s, p in zip(s0, p0) if not np.abs(p) < 1e-4] if len(sp) > 0: s0, p0 = list(zip(*sp)) nVec = list(map(self.get_normal_vector, s0)) coords0 = [np.array(self.get_coordinates(s)) for s in s0] coords1 = [ c + config.plotting.pScale * p * n for c, p, n in zip(coords0, p0, nVec) ] return list( zip( *[ xyi for c0, c1 in zip(coords0, coords1) for xyi in [c0, c1, np.ones(2) * np.nan] ] ) ) else: return [], [] def update_geometry(self): self.set_interp_function() def plot(self): for el in self.el: el.plot() # for nd in [self.node[0],self.node[-1]]: # nd.plot() self.plot_pressure_profiles() def set_attachments(self): return None def set_angle(self): return None class FlexibleSubstructure(Substructure): obj = [] res = 0.0 @classmethod def update_all(cls): num_dof = fe.Node.count() * config.flow.num_dim Kg = np.zeros((num_dof, num_dof)) Fg = np.zeros((num_dof, 1)) Ug = np.zeros((num_dof, 1)) # Assemble global matrices for all substructures together for ss in cls.obj: ss.update_fluid_forces() ss.assemble_global_stiffness_and_force() Kg += ss.K Fg += ss.F for nd in fe.Node.All(): for i in range(2): Fg[nd.dof[i]] += nd.fixed_load[i] # Determine fixed degrees of freedom dof = [False for _ in Fg] for nd in fe.Node.All(): for dofi, fdofi in zip(nd.dof, nd.is_dof_fixed): dof[dofi] = not fdofi # Solve FEM linear matrix equation if any(dof): Ug[np.ix_(dof)] = np.linalg.solve(Kg[np.ix_(dof, dof)], Fg[np.ix_(dof)]) cls.res = np.max(np.abs(Ug)) Ug *= config.relax_FEM Ug *= np.min([config.max_FEM_disp / np.max(Ug), 1.0]) for nd in fe.Node.All(): nd.move_coordinates(Ug[nd.dof[0], 0], Ug[nd.dof[1], 0]) for ss in cls.obj: ss.update_geometry() def __init__(self, dict_): # FlexibleSubstructure.obj.append(self) Substructure.__init__(self, dict_) self.element_type = fe.TrussElement self.pretension = self.dict_.read("pretension", -0.5) self.EA = self.dict_.read("EA", 5e7) self.K = None self.F = None # self.U = None config.has_free_structure = True def get_residual(self): return np.max(np.abs(self.U)) def initialize_matrices(self): num_dof = fe.Node.count() * config.flow.num_dim self.K = np.zeros((num_dof, num_dof)) self.F = np.zeros((num_dof, 1)) self.U = np.zeros((num_dof, 1)) def assemble_global_stiffness_and_force(self): if self.K is None: self.initialize_matrices() else: self.K *= 0 self.F *= 0 for el in self.el: self.add_loads_from_element(el) def add_loads_from_element(self, el): K, F = el.get_stiffness_and_force() self.K[np.ix_(el.dof, el.dof)] += K self.F[np.ix_(el.dof)] += F # def getPtDispFEM(self): # if self.K is None: # self.initializeMatrices() ## self.U *= 0.0 # self.update_fluid_forces() # self.assembleGlobalStiffnessAndForce() # # dof = [False for dof in self.F] # for nd in self.node: # for dofi, fdofi in zip(nd.dof, nd.fixedDOF): # dof[dofi] = not fdofi # if any(dof): ## self.U[np.ix_(dof)] = np.linalg.solve(self.K[np.ix_(dof,dof)], self.F[np.ix_(dof)]) # # # Relax displacement and limit step if necessary # self.U *= config.relaxFEM # self.U *= np.min([config.maxFEMDisp / np.max(self.U), 1.0]) # # for nd in self.node: # nd.moveCoordinates(self.U[nd.dof[0],0], self.U[nd.dof[1],0]) # # self.update_geometry() def set_element_properties(self): Substructure.set_element_properties(self) for el in self.el: el.set_properties(axialForce=-self.pretension, EA=self.EA) def update_geometry(self): for el in self.el: el.update_geometry() Substructure.set_interp_function(self) class RigidSubstructure(Substructure): def __init__(self, dict_): Substructure.__init__(self, dict_) self.element_type = fe.RigidElement def set_attachments(self): return None def updateAngle(self): return None def set_fixed_dof(self): for nd in self.node: for j in range(config.flow.num_dim): nd.is_dof_fixed[j] = True class TorsionalSpringSubstructure(FlexibleSubstructure, RigidSubstructure): def __init__(self, dict_): FlexibleSubstructure.__init__(self, dict_) self.element_type = fe.RigidElement self.tip_load_pct = self.dict_.read("tipLoadPct", 0.0) self.base_pt_pct = self.dict_.read("basePtPct", 1.0) self.spring_constant = self.dict_.read("spring_constant", 1000.0) self.theta = 0.0 self.Mt = 0.0 # TODO self.MOld = None self.relax = self.dict_.read("relaxAng", config.body.relax_rigid_body) self.attach_pct = self.dict_.read("attachPct", 0.0) self.attached_node = None self.attached_element = None self.minimum_angle = self.dict_.read("minimumAngle", -float("Inf")) self.max_angle_step = self.dict_.read("maxAngleStep", float("Inf")) config.has_free_structure = True def load_mesh(self): Substructure.load_mesh(self) self.set_fixed_dof() if self.base_pt_pct == 1.0: self.base_pt = self.node[-1].get_coordinates() elif self.base_pt_pct == 0.0: self.base_pt = self.node[0].get_coordinates() else: self.base_pt = self.get_coordinates( self.base_pt_pct * self.get_arc_length() ) self.set_element_properties() self.set_angle(self.dict_.read("initialAngle", 0.0)) def set_attachments(self): attached_substructure_name = self.dict_.read("attachedSubstructure", None) if attached_substructure_name is not None: self.attached_substructure = Substructure.find_by_name( attached_substructure_name ) else: self.attached_substructure = None if self.dict_.read("attachedSubstructureEnd", "End").lower() == "start": self.attached_ind = 0 else: self.attached_ind = -1 if self.attached_node is None and self.attached_substructure is not None: self.attached_node = self.attached_substructure.node[self.attached_ind] self.attached_element = self.attached_substructure.el[self.attached_ind] def update_fluid_forces(self): self.fluidS = [] self.fluidP = [] self.airS = [] self.airP = [] self.D = 0.0 self.L = 0.0 self.M = 0.0 self.Dt = 0.0 self.Lt = 0.0 self.Mt = 0.0 self.Da = 0.0 self.La = 0.0 self.Ma = 0.0 if self.interpolator is not None: s_min, s_max = self.interpolator.get_min_max_s() for i, el in enumerate(self.el): # Get pressure at end points and all fluid points along element node_s = [self.node_arc_length[i], self.node_arc_length[i + 1]] if self.interpolator is not None: s, pressure_fluid, tau = self.interpolator.get_loads_in_range( node_s[0], node_s[1] ) # Limit pressure to be below stagnation pressure if config.plotting.pressure_limiter: pressure_fluid = np.min( np.hstack( ( pressure_fluid, np.ones_like(pressure_fluid) * config.flow.stagnation_pressure, ) ), axis=0, ) else: s = np.array(node_s) pressure_fluid = np.zeros_like(s) tau = np.zeros_like(s) ss = node_s[1] Pc = 0.0 if self.interpolator is not None: if ss > s_max: Pc = self.interpolator.fluid.get_upstream_pressure() elif ss < s_min: Pc = self.interpolator.fluid.get_downstream_pressure() elif self.cushion_pressure_type == "Total": Pc = config.body.Pc # Store fluid and air pressure components for element (for # plotting) if i == 0: self.fluidS += [s[0]] self.fluidP += [pressure_fluid[0]] self.airS += [node_s[0]] self.airP += [Pc - self.seal_pressure] self.fluidS += [ss for ss in s[1:]] self.fluidP += [pp for pp in pressure_fluid[1:]] self.airS += [ss for ss in node_s[1:]] self.airP += [Pc - self.seal_pressure for _ in node_s[1:]] # Apply ramp to hydrodynamic pressure pressure_fluid *= config.ramp ** 2 # Add external cushion pressure to external fluid pressure pressure_cushion = np.zeros_like(s) Pc = 0.0 for ii, ss in enumerate(s): if self.interpolator is not None: if ss > s_max: Pc = self.interpolator.fluid.get_upstream_pressure() elif ss < s_min: Pc = self.interpolator.fluid.get_downstream_pressure() elif self.cushion_pressure_type == "Total": Pc = config.body.Pc pressure_cushion[ii] = Pc pressure_internal = self.seal_pressure * np.ones_like(s) pressure_external = pressure_fluid + pressure_cushion pressure_total = pressure_external - pressure_internal # Calculate external force and moment for rigid body calculation if ( config.body.cushion_force_method.lower() == "integrated" or config.body.cushion_force_method.lower() == "assumed" ): if config.body.cushion_force_method.lower() == "integrated": integrand = pressure_external elif config.body.cushion_force_method.lower() == "assumed": integrand = pressure_fluid n = list(map(self.get_normal_vector, s)) t = [kp.rotateVec(ni, -90) for ni in n] fC = [ -pi * ni + taui * ti for pi, taui, ni, ti in zip(pressure_external, tau, n, t) ] fFl = [ -pi * ni + taui * ti for pi, taui, ni, ti in zip(pressure_fluid, tau, n, t) ] f = fC + fFl print( ("Cushion Lift-to-Weight: {0}".format(fC[1] / config.body.weight)) ) r = [ np.array([pt[0] - config.body.xCofR, pt[1] - config.body.yCofR]) for pt in map(self.get_coordinates, s) ] m = [kp.cross2(ri, fi) for ri, fi in zip(r, f)] self.D -= kp.integrate(s, np.array(zip(*f)[0])) self.L += kp.integrate(s, np.array(zip(*f)[1])) self.M += kp.integrate(s, np.array(m)) else: if self.interpolator is not None: self.D = self.interpolator.fluid.D self.L = self.interpolator.fluid.L self.M = self.interpolator.fluid.M # Apply pressure loading for moment calculation # integrand = pFl integrand = pressure_total n = list(map(self.get_normal_vector, s)) t = [kp.rotateVec(ni, -90) for ni in n] f = [-pi * ni + taui * ti for pi, taui, ni, ti in zip(integrand, tau, n, t)] r = [ np.array([pt[0] - self.base_pt[0], pt[1] - self.base_pt[1]]) for pt in map(self.get_coordinates, s) ] m = [kp.cross2(ri, fi) for ri, fi in zip(r, f)] fx, fy = list(zip(*f)) self.Dt += kp.integrate(s, np.array(fx)) self.Lt += kp.integrate(s, np.array(fy)) self.Mt += kp.integrate(s, np.array(m)) integrand = pressure_cushion n = list(map(self.get_normal_vector, s)) t = [kp.rotateVec(ni, -90) for ni in n] f = [-pi * ni + taui * ti for pi, taui, ni, ti in zip(integrand, tau, n, t)] r = [ np.array([pt[0] - self.parent.xCofR, pt[1] - self.parent.yCofR]) for pt in map(self.get_coordinates, s) ] m = [kp.cross2(ri, fi) for ri, fi in zip(r, f)] self.Da -= kp.integrate(s, np.array(zip(*f)[0])) self.La += kp.integrate(s, np.array(zip(*f)[1])) self.Ma += kp.integrate(s, np.array(m)) # Apply tip load tipC = self.get_coordinates(self.tip_load_pct * self.get_arc_length()) tipR = np.array([tipC[i] - self.base_pt[i] for i in [0, 1]]) tipF = np.array([0.0, self.tip_load]) * config.ramp tipM = kp.cross2(tipR, tipF) self.Lt += tipF[1] self.Mt += tipM # Apply moment from attached substructure # el = self.attachedEl # attC = self.attachedNode.get_coordinates() # attR = np.array([attC[i] - self.basePt[i] for i in [0,1]]) # attF = el.axialForce * kp.ang2vec(el.gamma + 180) # attM = kp.cross2(attR, attF) * config.ramp ## attM = np.min([np.abs(attM), np.abs(self.Mt)]) * kp.sign(attM) # if np.abs(attM) > 2 * np.abs(tipM): ## attM = attM * np.abs(tipM) / np.abs(attM) # self.Mt += attM def updateAngle(self): if np.isnan(self.Mt): theta = 0.0 else: theta = -self.Mt if not self.spring_constant == 0.0: theta /= self.spring_constant dTheta = (theta - self.theta) * self.relax dTheta = np.min([np.abs(dTheta), self.max_angle_step]) * np.sign(dTheta) self.set_angle(self.theta + dTheta) def set_angle(self, ang): dTheta = np.max([ang, self.minimum_angle]) - self.theta if self.attached_node is not None and not any( [nd == self.attached_node for nd in self.node] ): attNd = [self.attached_node] else: attNd = [] # basePt = np.array([c for c in self.basePt]) basePt = np.array([c for c in self.node[-1].get_coordinates()]) for nd in self.node + attNd: oldPt = np.array([c for c in nd.get_coordinates()]) newPt = kp.rotatePt(oldPt, basePt, -dTheta) nd.set_coordinates(newPt[0], newPt[1]) self.theta += dTheta self.residual = dTheta self.update_geometry() print((" Deformation for substructure {0}: {1}".format(self.name, self.theta))) def get_residual(self): return self.residual # return self.theta + self.Mt / self.spring_constant def writeDeformation(self): kp.writeasdict( os.path.join( config.it_dir, "deformation_{0}.{1}".format(self.name, config.io.data_format), ), ["angle", self.theta], ) PK!planingfsi/fsi/__init__.pyPK!'&1&1planingfsi/fsi/figure.pyimport os import numpy as np import matplotlib.pyplot as plt import planingfsi.config as config # import planingfsi.krampy as kp class FSIFigure: def __init__(self, solid, fluid): self.solid = solid self.fluid = fluid plt.figure(figsize=(16, 12)) if config.plotting.watch: plt.ion() self.geometryAx = plt.axes([0.05, 0.6, 0.9, 0.35]) for nd in self.solid.node: nd.line_x_yline_xy, = plt.plot([], [], "ro") self.fluid.lineFS, = plt.plot([], [], "b-") self.fluid.lineFSi, = plt.plot([], [], "b--") for struct in self.solid.substructure: struct.line_air_pressure, = plt.plot([], [], "g-") struct.line_fluid_pressure, = plt.plot([], [], "r-") for el in struct.el: el.lineEl0, = plt.plot([], [], "k--") el.lineEl, = plt.plot([], [], "k-", linewidth=2) self.lineCofR = [] for bd in self.solid.rigid_body: CofRPlot( self.geometryAx, bd, symbol=False, style="k--", marker="s", fill=False ) self.lineCofR.append( CofRPlot(self.geometryAx, bd, symbol=False, style="k-", marker="o") ) xMin, xMax = kp.minMax( [nd.x for struct in self.solid.substructure for nd in struct.node] ) yMin, yMax = kp.minMax( [nd.y for struct in self.solid.substructure for nd in struct.node] ) if config.plotting.xmin is not None: xMin = config.plotting.xmin config.plotting.ext_w = 0.0 if config.plotting.xmax is not None: xMax = config.plotting.xmax config.plotting.ext_e = 0.0 if config.plotting.ymin is not None: yMin = config.plotting.ymin config.plotting.ext_s = 0.0 if config.plotting.ymax is not None: yMax = config.plotting.ymax config.plotting.ext_n = 0.0 plt.xlabel(r"$x$ [m]", fontsize=22) plt.ylabel(r"$y$ [m]", fontsize=22) plt.xlim( [ xMin - (xMax - xMin) * config.plotting.ext_w, xMax + (xMax - xMin) * config.plotting.ext_e, ] ) plt.ylim( [ yMin - (yMax - yMin) * config.plotting.ext_s, yMax + (yMax - yMin) * config.plotting.ext_n, ] ) plt.gca().set_aspect("equal") self.TXT = plt.text( 0.05, 0.95, "", ha="left", va="top", transform=plt.gca().transAxes ) self.subplot = [] bd = [bd for bd in self.solid.rigid_body] if len(bd) > 0: self.subplot.append(ForceSubplot([0.70, 0.30, 0.25, 0.2], bd[0])) self.subplot.append(MotionSubplot([0.70, 0.05, 0.25, 0.2], bd[0])) if len(bd) > 1: self.subplot.append(ForceSubplot([0.05, 0.30, 0.25, 0.2], bd[1])) self.subplot.append(MotionSubplot([0.05, 0.05, 0.25, 0.2], bd[1])) self.subplot.append(ResidualSubplot([0.40, 0.05, 0.25, 0.45], self.solid)) def update(self): self.solid.plot() self.fluid.plot_free_surface() self.TXT.set_text( ( r"Iteration {0}" + "\n" + r"$Fr={1:>8.3f}$" + "\n" + r"$\bar{{P_c}}={2:>8.3f}$" ).format(config.it, config.flow.froude_num, config.body.PcBar) ) # Update each lower subplot for s in self.subplot: s.update( self.solid.res < config.solver.max_residual and config.it > config.solver.num_ramp_it ) for l in self.lineCofR: l.update() plt.draw() if config.plotting.save: self.save() def writeTimeHistories(self): for s in self.subplot: if isinstance(s, TimeHistory): s.write() def save(self): plt.savefig( os.path.join( config.path.fig_dir_name, "frame{1:04d}.{0}".format(config.plotting.fig_format, config.it), ), format=config.plotting.fig_format, ) # , dpi=300) def show(self): plt.show(block=True) class PlotSeries: def __init__(self, xFunc, yFunc, **kwargs): self.x = [] self.y = [] self.additionalSeries = [] self.getX = xFunc self.getY = yFunc self.seriesType = kwargs.get("type", "point") self.ax = kwargs.get("ax", plt.gca()) self.legEnt = kwargs.get("legEnt", None) self.ignoreFirst = kwargs.get("ignoreFirst", False) self.line, = self.ax.plot([], [], kwargs.get("sty", "k-")) if self.seriesType == "history+curr": self.additionalSeries.append( PlotSeries( xFunc, yFunc, ax=self.ax, type="point", sty=kwargs.get("currSty", "ro"), ) ) def update(self, final=False): if self.seriesType == "point": self.x = self.getX() self.y = self.getY() if final: self.line.set_marker("*") self.line.set_markerfacecolor("y") self.line.set_markersize(10) else: if self.ignoreFirst and self.getX() == 0: self.x.append(np.nan) self.y.append(np.nan) else: self.x.append(self.getX()) self.y.append(self.getY()) self.line.set_data(self.x, self.y) for s in self.additionalSeries: s.update(final) class TimeHistory: def __init__(self, pos, name="default"): self.series = [] self.ax = [] self.title = "" self.xlabel = "" self.ylabel = "" self.name = name self.addAxes(plt.axes(pos)) def addAxes(self, ax): self.ax.append(ax) def addYAxes(self): self.addAxes(plt.twinx(self.ax[0])) def addSeries(self, series): self.series.append(series) return series def setProperties(self, axInd=0, **kwargs): plt.setp(self.ax[axInd], **kwargs) def createLegend(self, axInd=0): line, name = list( zip(*[(s.line, s.legEnt) for s in self.series if s.legEnt is not None]) ) self.ax[axInd].legend(line, name, loc="lower left") def update(self, final=False): for s in self.series: s.update(final) for ax in self.ax: xMin, xMax = 0.0, config.it + 5 yMin = np.nan for i, l in enumerate(ax.get_lines()): y = l.get_data()[1] if len(np.shape(y)) == 0: y = np.array([y]) try: y = y[np.ix_(~np.isnan(y))] except: y = [] if not np.shape(y)[0] == 0: yMinL = np.min(y) - 0.02 yMaxL = np.max(y) + 0.02 if i == 0 or np.isnan(yMin): yMin, yMax = yMinL, yMaxL else: yMin = np.min([yMin, yMinL]) yMax = np.max([yMax, yMaxL]) ax.set_xlim([xMin, xMax]) if ~np.isnan(yMin) and ~np.isnan(yMax): ax.set_ylim([yMin, yMax]) def write(self): ff = open("{0}_timeHistories.txt".format(self.name), "w") x = self.series[0].x y = list(zip(*[s.y for s in self.series])) for xi, yi in zip(x, y): ff.write("{0:4.0f}".format(xi)) for yii in yi: ff.write(" {0:8.6e}".format(yii)) ff.write("\n") ff.close() class MotionSubplot(TimeHistory): def __init__(self, pos, body): TimeHistory.__init__(self, pos, body.name) itFunc = lambda: config.it drftFunc = lambda: body.draft trimFunc = lambda: body.trim self.addYAxes() # Add plot series to appropriate axis self.addSeries( PlotSeries( itFunc, drftFunc, ax=self.ax[0], sty="b-", type="history+curr", legEnt="Draft", ) ) self.addSeries( PlotSeries( itFunc, trimFunc, ax=self.ax[1], sty="r-", type="history+curr", legEnt="Trim", ) ) self.setProperties( title=r"Motion History: {0}".format(body.name), xlabel=r"Iteration", ylabel=r"$d$ [m]", ) self.setProperties(1, ylabel=r"$\theta$ [deg]") self.createLegend() class ForceSubplot(TimeHistory): def __init__(self, pos, body): TimeHistory.__init__(self, pos, body.name) itFunc = lambda: config.it liftFunc = lambda: body.L / body.weight dragFunc = lambda: body.D / body.weight momFunc = lambda: body.M / (body.weight * config.body.reference_length) self.addSeries( PlotSeries( itFunc, liftFunc, sty="r-", type="history+curr", legEnt="Lift", ignoreFirst=True, ) ) self.addSeries( PlotSeries( itFunc, dragFunc, sty="b-", type="history+curr", legEnt="Drag", ignoreFirst=True, ) ) self.addSeries( PlotSeries( itFunc, momFunc, sty="g-", type="history+curr", legEnt="Moment", ignoreFirst=True, ) ) self.setProperties( title=r"Force & Moment History: {0}".format(body.name), # xlabel=r'Iteration', \ ylabel=r"$\mathcal{D}/W$, $\mathcal{L}/W$, $\mathcal{M}/WL_c$", ) self.createLegend() class ResidualSubplot(TimeHistory): def __init__(self, pos, solid): TimeHistory.__init__(self, pos, "residuals") itFunc = lambda: config.it col = ["r", "b", "g"] for bd, coli in zip(solid.rigid_body, col): self.addSeries( PlotSeries( itFunc, bd.get_res_lift, sty="{0}-".format(coli), type="history+curr", legEnt="ResL: {0}".format(bd.name), ignoreFirst=True, ) ) self.addSeries( PlotSeries( itFunc, bd.get_res_moment, sty="{0}--".format(coli), type="history+curr", legEnt="ResM: {0}".format(bd.name), ignoreFirst=True, ) ) self.addSeries( PlotSeries( itFunc, lambda: np.abs(solid.res), sty="k-", type="history+curr", legEnt="Total", ignoreFirst=True, ) ) self.setProperties(title=r"Residual History", yscale="log") self.createLegend() class CofRPlot: def __init__(self, ax, body, **kwargs): self.ax = ax self.body = body self.symbol = kwargs.get("symbol", True) style = kwargs.get("style", "k-") fill = kwargs.get("fill", True) marker = kwargs.get("marker", "o") if not fill: fcol = "w" else: fcol = "k" self.line, = self.ax.plot([], [], style) self.lineCofG, = self.ax.plot( [], [], "k" + marker, markersize=8, markerfacecolor=fcol ) self.update() def update(self): l = config.plotting.CofR_grid_len c = np.array([self.body.xCofR, self.body.yCofR]) hvec = kp.rotateVec(np.array([0.5 * l, 0.0]), self.body.trim) vvec = kp.rotateVec(np.array([0.0, 0.5 * l]), self.body.trim) pts = [c - hvec, c + hvec, np.ones(2) * np.nan, c - vvec, c + vvec] self.line.set_data(list(zip(*pts))) self.lineCofG.set_data(self.body.xCofG, self.body.yCofG) PK!T planingfsi/fsi/interpolator.pyimport numpy as np from scipy.optimize import fmin import planingfsi.config as config # import planingfsi.krampy as kp class Interpolator: def __init__(self, solid, fluid, dict_=""): self.solid = solid self.fluid = fluid self.solid.set_interpolator(self) self.fluid.interpolator = self self.solidPositionFunction = None self.fluidPressureFunction = None self.getBodyHeight = self.getSurfaceHeightFixedX self.sSep = [] self.sImm = [] if isinstance(dict_, str): dict_ = kp.Dictionary(dict_) # self.Dict = dict_ self.sSepPctStart = dict_.read("sSepPctStart", 0.5) self.sImmPctStart = dict_.read("sImmPctStart", 0.9) def setBodyHeightFunction(self, funcName): exec("self.getBodyHeight = self.{0}".format(funcName)) def set_solid_position_function(self, func): self.solidPositionFunction = func def set_fluid_pressure_function(self, func): self.fluidPressureFunction = func def getSurfaceHeightFixedX(self, x): s = np.max([self.getSFixedX(x, 0.5), 0.0]) return self.get_coordinates(s)[1] def get_coordinates(self, s): return self.solidPositionFunction(s) def get_min_max_s(self): pts = self.fluid._get_element_coords() return [self.getSFixedX(x) for x in [pts[0], pts[-1]]] def getSFixedX(self, fixedX, soPct=0.5): return kp.fzero( lambda s: self.get_coordinates(s)[0] - fixedX, soPct * self.solid.get_arc_length(), ) def getSFixedY(self, fixedY, soPct): return kp.fzero( lambda s: self.get_coordinates(s)[1] - fixedY, soPct * self.solid.get_arc_length(), ) @property def immersed_length(self): if self.sImm == []: self.sImm = self.sImmPctStart * self.solid.get_arc_length() self.sImm = kp.fzero( lambda s: self.get_coordinates(s)[1] - config.flow.waterline_height, self.sImm, ) return self.get_coordinates(self.sImm)[0] def get_separation_point(self): def yCoord(s): return self.get_coordinates(s[0])[1] if self.sSep == []: self.sSep = self.sSepPctStart * self.solid.get_arc_length() self.sSep = fmin(yCoord, np.array([self.sSep]), disp=False, xtol=1e-6)[0] self.sSep = np.max([self.sSep, 0.0]) return self.get_coordinates(self.sSep) def get_loads_in_range(self, s0, s1): x = [self.solidPositionFunction(s)[0] for s in [s0, s1]] x, p, tau = self.fluidPressureFunction(x[0], x[1]) s = np.array([self.getSFixedX(xx, 0.5) for xx in x]) return s, p, tau PK!,!!planingfsi/fsi/simulation.pyimport os import numpy as np import planingfsi.config as config # import planingfsi.io # import planingfsi.krampy as kp # from planingfsi import krampy, io from planingfsi.fe.structure import FEStructure from planingfsi.fsi.interpolator import Interpolator from planingfsi.potentialflow.solver import PotentialPlaningSolver from .figure import FSIFigure class Simulation(object): """Simulation object to manage the FSI problem. Handles the iteration between the fluid and solid solvers. Attributes ---------- solid_solver : FEStructure Solid solver. fluid_solver : PotentialPlaningSolver Fluid solver. """ def __init__(self): # Create solid and fluid solver objects self.solid_solver = FEStructure() self.fluid_solver = PotentialPlaningSolver() # def setFluidPressureFunc(self, func): # self.fluidPressureFunc = func # # def setSolidPositionFunc(self, func): # self.solidPositionFunc = func def update_fluid_response(self): self.fluid_solver.calculate_response() self.solid_solver.update_fluid_forces() def update_solid_response(self): self.solid_solver.calculate_response() def create_dirs(self): config.it_dir = os.path.join(config.path.case_dir, "{0}".format(config.it)) config.path.fig_dir_name = os.path.join( config.path.case_dir, config.path.fig_dir_name ) if self.check_output_interval() and not config.io.results_from_file: kp.createIfNotExist(config.path.case_dir) kp.createIfNotExist(config.it_dir) if config.plotting.save: kp.createIfNotExist(config.path.fig_dir_name) if config.it == 0: for f in os.listdir(config.path.fig_dir_name): os.remove(os.path.join(config.path.fig_dir_name, f)) def load_input_files(self): # Add all rigid bodies if os.path.exists(config.path.body_dict_dir): for dict_name in krampy.listdir_nohidden(config.path.body_dict_dir): dict_ = planingfsi.io.Dictionary( os.path.join(config.path.body_dict_dir, dict_name) ) self.solid_solver.add_rigid_body(dict_) else: self.solid_solver.add_rigid_body() # Add all substructures for dict_name in krampy.listdir_nohidden(config.path.input_dict_dir): dict_ = planingfsi.io.Dictionary( os.path.join(config.path.input_dict_dir, dict_name) ) substructure = self.solid_solver.add_substructure(dict_) if dict_.read("hasPlaningSurface", False): planing_surface = self.fluid_solver.add_planing_surface(dict_) interpolator = Interpolator(substructure, planing_surface, dict_) interpolator.set_solid_position_function(substructure.get_coordinates) interpolator.set_fluid_pressure_function( planing_surface.get_loads_in_range ) # Add all pressure cushions if os.path.exists(config.path.cushion_dict_dir): for dict_name in krampy.listdir_nohidden(config.path.cushion_dict_dir): dict_ = planingfsi.io.Dictionary( os.path.join(config.path.cushion_dict_dir, dict_name) ) self.fluid_solver.add_pressure_cushion(dict_) def run(self): """Run the fluid-structure interaction simulation by iterating between the fluid and solid solvers. """ config.it = 0 self.solid_solver.load_mesh() if config.io.results_from_file: self.create_dirs() self.apply_ramp() self.it_dirs = kp.sortDirByNum(kp.find_files("[0-9]*"))[1] if config.plotting.plot_any: self.figure = FSIFigure(self.solid_solver, self.fluid_solver) # Initialize body at specified trim and draft self.solid_solver.initialize_rigid_bodies() self.update_fluid_response() # Iterate between solid and fluid solvers until equilibrium while config.it <= config.solver.num_ramp_it or ( self.get_residual() >= config.solver.max_residual and config.it <= config.solver.max_it ): # Calculate response if config.has_free_structure: self.apply_ramp() self.update_solid_response() self.update_fluid_response() self.solid_solver.get_residual() else: self.solid_solver.res = 0.0 # Write, print, and plot results self.create_dirs() self.write_results() self.print_status() self.update_figure() # Increment iteration count self.increment() if config.plotting.save and not config.plotting.fig_format == "png": config.plotting.fig_format = "png" self.figure.save() if config.io.write_time_histories: self.figure.write_time_histories() print("Execution complete") if config.plotting.show: self.figure.show() def increment(self): if config.io.results_from_file: old_ind = np.nonzero(config.it == self.it_dirs)[0][0] if not old_ind == len(self.it_dirs) - 1: config.it = int(self.it_dirs[old_ind + 1]) else: config.it = config.max_it + 1 self.create_dirs() else: config.it += 1 def update_figure(self): if config.plotting.plot_any: self.figure.update() def get_body_res(self, x): self.solid_solver.get_pt_disp_rb(x[0], x[1]) self.solid_solver.update_nodal_positions() self.update_fluid_response() # Write, print, and plot results self.create_dirs() self.write_results() self.print_status() self.update_figure() # Update iteration number depending on whether loading existing or # simply incrementing by 1 if config.io.results_from_file: if config.it < len(self.it_dirs) - 1: config.it = int(self.it_dirs[config.it + 1]) else: config.it = config.max_it it += 1 else: config.it += 1 config.res_l = self.solid_solver.rigid_body[0].get_res_l() config.res_m = self.solid_solver.rigid_body[0].get_res_moment() print("Rigid Body Residuals:") print((" Lift: {0:0.4e}".format(config.res_l))) print((" Moment: {0:0.4e}\n".format(config.res_m))) return np.array([config.res_l, config.res_m]) def apply_ramp(self): if config.io.results_from_file: self.loadResults() else: if config.solver.num_ramp_it == 0: ramp = 1.0 else: ramp = np.min((config.it / float(config.solver.num_ramp_it), 1.0)) config.ramp = ramp config.relax_FEM = ( 1 - ramp ) * config.solver.relax_initial + ramp * config.solver.relax_final def get_residual(self): if config.io.results_from_file: return 1.0 return kp.Dictionary( os.path.join(config.it_dir, "overallQuantities.txt") ).read("Residual", 0.0) else: return self.solid_solver.res def print_status(self): print( "Residual after iteration {1:>4d}: {0:5.3e}".format( self.get_residual(), config.it ) ) def check_output_interval(self): return ( config.it >= config.solver.max_it or self.get_residual() < config.solver.max_residual or np.mod(config.it, config.io.write_interval) == 0 ) def write_results(self): if self.check_output_interval() and not config.io.results_from_file: kp.writeasdict( os.path.join(config.it_dir, "overallQuantities.txt"), ["Ramp", config.ramp], ["Residual", self.solid_solver.res], ) self.fluid_solver.write_results() self.solid_solver.write_results() def load_results(self): K = kp.Dictionary(os.path.join(config.it_dir, "overallQuantities.txt")) config.ramp = K.read("Ramp", 0.0) self.solid_solver.res = K.read("Residual", 0.0) PK!$planingfsi/potentialflow/__init__.pyPK!Y55+planingfsi/potentialflow/pressureelement.py"""Module containing definitions of different types of pressure element.""" import numpy as np from scipy.special import sici # TODO: Move all plotting commands to its own module try: import matplotlib.pyplot as plt except ImportError: plt = None pass import planingfsi.config as config # import planingfsi.krampy as kp _PLOT_INFINITY = 50.0 def _get_aux_fg(lam): """Return f and g functions, which are dependent on the auxiliary sine and cosine functions. Combined into one function to reduce number of calls to scipy.special.sici(). """ lam = abs(lam) (aux_sine, aux_cosine) = sici(lam) (sine, cosine) = np.sin(lam), np.cos(lam) return ( cosine * (0.5 * np.pi - aux_sine) + sine * aux_cosine, sine * (0.5 * np.pi - aux_sine) - cosine * aux_cosine, ) def _get_gamma1(lam, aux_g): """Return result of first integral (Gamma_1 in my thesis). Args ---- lam : float Non-dimensional x-position. aux_g : float Second result of `_get_aux_fg` Returns ------- float """ if lam > 0: return (aux_g + np.log(lam)) / np.pi else: return (aux_g + np.log(-lam)) / np.pi + (2 * np.sin(lam) - lam) def _get_gamma2(lam, aux_f): """Return result of second integral (Gamma_2 in my thesis). Args ---- lam : float Non-dimensional x-position. aux_f : float First result of `_get_aux_fg` Returns ------- float """ return kp.sign(lam) * aux_f / np.pi + kp.heaviside(-lam) * (2 * np.cos(lam) - 1) def _get_gamma3(lam, aux_f): """Return result of third integral (Gamma_3 in my thesis). Args ---- lam : float Non-dimensional x-position. aux_f : float First result of `_get_aux_fg` Returns ------- float """ return ( -kp.sign(lam) * aux_f / np.pi - 2 * kp.heaviside(-lam) * np.cos(lam) - kp.heaviside(lam) ) def _eval_left_right(f, x, dx=1e-6): """Evaluate a function by taking the average of the values just above and below the x value. Used to avoid singularity/Inf/NaN. Args ---- f : function Function handle. x : float Argument of function. dx : float, optional Step size. Returns ------ float : Function value. """ return (f(x + dx) + f(x - dx)) / 2 class PressureElement(object): """Abstract base class to represent all different types of pressure elements. Attributes ---------- x_coord : float x-location of element. z_coord : float z-coordinate of element, if on body. pressure : float The pressure/strength of the element. shear_stress : float Shear stress at the element. width : float Width of element. is_source : bool True of element is a source. is_on_body : bool True if on body. Used for force calculation. parent : PressurePatch Pressure patch that this element belongs to. """ def __init__( self, x_coord=np.nan, z_coord=np.nan, pressure=np.nan, shear_stress=0.0, width=np.nan, is_source=False, is_on_body=False, parent=None, ): # TODO: Replace x_coord with coord pointer to Coordinate object, # (e.g. self.coords.x). self.x_coord = x_coord self.z_coord = z_coord self._pressure = pressure self.shear_stress = shear_stress self._width = np.zeros(2) self.width = width self.is_source = is_source self.is_on_body = is_on_body self.parent = parent @property def width(self): return sum(self._width) @property def pressure(self): return self._pressure @pressure.setter def pressure(self, value): self._pressure = value def get_influence_coefficient(self, x_coord): """Return _get_local_influence_coefficient coefficient of element. Args ---- x_coord : float Dimensional x-coordinate. """ x_rel = x_coord - self.x_coord if self.width == 0.0: return 0.0 elif x_rel == 0.0 or x_rel == self._width[1] or x_rel == -self._width[0]: influence = _eval_left_right(self._get_local_influence_coefficient, x_rel) else: influence = self._get_local_influence_coefficient(x_rel) return influence / (config.flow.density * config.flow.gravity) def get_influence(self, x_coord): """Return _get_local_influence_coefficient for actual pressure. Args ---- x_coord : float Dimensional x-coordinate. """ return self.get_influence_coefficient(x_coord) * self.pressure def _get_local_influence_coefficient(self, x_coord): """Return influence coefficient in iso-geometric coordinates. Each subclass must implement its own method. Args ---- x_coord : float x-coordinate, centered at element location. """ pass def __repr__(self): """Print element attributes.""" string = ( "{0}: (x,z) = ({1}, {2}), width = {3}," "is_source = {4}, p = {5}, is_on_body = {6}" ) return string.format( self.__class__.__name__, self.x_coord, self.z_coord, self.width, self.is_source, self.pressure, self.is_on_body, ) def plot(self, x_coords, color="b"): """Plot pressure element shape.""" _pressure = np.array([self.pressure(xi) for xi in x_coords]) plt.plot(x_coords, _pressure, color=color, linetype="-") plt.plot( self.x_coord * np.ones(2), [0.0, self.pressure], color=color, linetype="--" ) class AftHalfTriangularPressureElement(PressureElement): """Pressure element that is triangular in profile but towards aft direction. Args ---- **kwargs Same as PressureElement arguments, with the following defaults overridden: is_on_body : True """ def __init__(self, **kwargs): kwargs["is_on_body"] = kwargs.get("is_on_body", True) super().__init__(**kwargs) @PressureElement.width.setter def width(self, width): self._width[0] = width @PressureElement.pressure.getter def pressure(self, x_coord=None): """Return shape of pressure profile.""" if x_coord is None: return self._pressure _x_rel = x_coord - self.x_coord if _x_rel < -self.width or _x_rel > 0.0: return 0.0 else: return self._pressure * (1 + _x_rel / self.width) def _get_local_influence_coefficient(self, x_rel): lambda_0 = config.flow.k0 * x_rel aux_f, aux_g = _get_aux_fg(lambda_0) _influence = _get_gamma1(lambda_0, aux_g) / (self.width * config.flow.k0) _influence += _get_gamma2(lambda_0, aux_f) lambda_2 = config.flow.k0 * (x_rel + self.width) __, aux_g = _get_aux_fg(lambda_2) _influence -= _get_gamma1(lambda_2, aux_g) / (self.width * config.flow.k0) return _influence def plot(self): """Plot pressure element shape.""" x_coords = self.x_coord - np.array([self.width, 0]) super().plot(x_coords, color="g") class ForwardHalfTriangularPressureElement(PressureElement): """Pressure element that is triangular in profile but towards forward direction. Args ---- **kwargs Same as PressureElement arguments, with the following defaults overridden: is_on_body : True """ def __init__(self, **kwargs): kwargs["is_on_body"] = kwargs.get("is_on_body", True) super().__init__(**kwargs) @PressureElement.width.setter def width(self, width): self._width[1] = width @PressureElement.pressure.getter def pressure(self, x_coord=None): """Return shape of pressure profile.""" if x_coord is None: return self._pressure _x_rel = x_coord - self.x_coord if _x_rel > self.width or _x_rel < 0.0: return 0.0 else: return self._pressure * (1 - _x_rel / self.width) def _get_local_influence_coefficient(self, x_coord): """Return _get_local_influence_coefficient coefficient in iso-geometric coordinates. Args ---- x_coord : float x-coordinate, centered at element location. """ lambda_0 = config.flow.k0 * x_coord aux_f, aux_g = _get_aux_fg(lambda_0) influence = _get_gamma1(lambda_0, aux_g) / (self.width * config.flow.k0) influence -= _get_gamma2(lambda_0, aux_f) lambda_1 = config.flow.k0 * (x_coord - self.width) _, aux_g = _get_aux_fg(lambda_1) influence -= _get_gamma1(lambda_1, aux_g) / (self.width * config.flow.k0) return influence def plot(self): """Plot pressure element shape.""" x_coords = self.x_coord + np.array([0, self.width]) super().plot(x_coords, color="b") class CompleteTriangularPressureElement(PressureElement): """Pressure element that is triangular in profile towards both directions. Args ---- **kwargs Same as PressureElement arguments, with the following defaults overridden: is_on_body : True """ def __init__(self, **kwargs): kwargs["is_on_body"] = kwargs.get("is_on_body", True) kwargs["width"] = kwargs.get("width", np.zeros(2)) super().__init__(**kwargs) @PressureElement.width.setter def width(self, width): if not len(width) == 2: raise ValueError("Width must be length-two array") else: self._width = np.array(width) @PressureElement.pressure.getter def pressure(self, x_coord=None): """Return shape of pressure profile.""" if x_coord is None: return self._pressure _x_rel = x_coord - self.x_coord if _x_rel > self._width[1] or _x_rel < -self._width[0]: return 0.0 elif _x_rel < 0.0: return self._pressure * (1 + _x_rel / self._width[0]) else: return self._pressure * (1 - _x_rel / self._width[1]) def _get_local_influence_coefficient(self, x_coord): """Return _get_local_influence_coefficient coefficient in iso-geometric coordinates. Args ---- x_coord : float x-coordinate, centered at element location. """ lambda_0 = config.flow.k0 * x_coord _, aux_g = _get_aux_fg(lambda_0) influence = ( _get_gamma1(lambda_0, aux_g) * self.width / (self._width[1] * self._width[0]) ) lambda_1 = config.flow.k0 * (x_coord - self._width[1]) _, aux_g = _get_aux_fg(lambda_1) influence -= _get_gamma1(lambda_1, aux_g) / self._width[1] lambda_2 = config.flow.k0 * (x_coord + self._width[0]) _, aux_g = _get_aux_fg(lambda_2) influence -= _get_gamma1(lambda_2, aux_g) / self._width[0] return influence / config.flow.k0 def plot(self): """Plot pressure element shape.""" x_coords = self.x_coord + np.array([-self._width[0], 0.0, self._width[1]]) super().plot(x_coords, color="r") class AftSemiInfinitePressureBand(PressureElement): """Semi-infinite pressure band in aft direction. Args ---- **kwargs Same as PressureElement arguments """ @PressureElement.pressure.getter def pressure(self, x_coord=0.0): """Return shape of pressure profile.""" x_rel = x_coord - self.x_coord if x_rel > 0.0: return 0.0 else: return self._pressure def _get_local_influence_coefficient(self, x_coord): """Return _get_local_influence_coefficient coefficient in iso-geometric coordinates. Args ---- x_coord : float x-coordinate, centered at element location. """ lambda_0 = config.flow.k0 * x_coord aux_f, _ = _get_aux_fg(lambda_0) influence = _get_gamma2(lambda_0, aux_f) return influence def plot(self): """Plot pressure element shape.""" x_coords = self.x_coord + np.array([-_PLOT_INFINITY, 0]) super().plot(x_coords, color="r") class ForwardSemiInfinitePressureBand(PressureElement): """Semi-infinite pressure band in forward direction. Args ---- **kwargs Same as PressureElement arguments """ @PressureElement.pressure.getter def pressure(self, x_coord=None): """Return shape of pressure profile.""" if x_coord is None: return self._pressure _x_rel = x_coord - self.x_coord if _x_rel < 0.0: return 0.0 else: return self._pressure def _get_local_influence_coefficient(self, x_coord): """Return _get_local_influence_coefficient coefficient in iso-geometric coordinates. Args ---- x_coord : float x-coordinate, centered at element location. """ lambda_0 = config.flow.k0 * x_coord aux_f, _ = _get_aux_fg(lambda_0) influence = _get_gamma3(lambda_0, aux_f) return influence def plot(self): """Plot pressure element shape.""" x_coords = self.x_coord + np.array([0, _PLOT_INFINITY]) super().plot(x_coords, color="r") PK!JvLvL)planingfsi/potentialflow/pressurepatch.py"""Classes representing a pressure patch on the free surface.""" import os import numpy as np from scipy.interpolate import interp1d from scipy.optimize import fmin import planingfsi.config as config # import planingfsi.krampy as kp import planingfsi.potentialflow.pressureelement as pe class PressurePatch(object): """Abstract base class representing a patch of pressure elements on the free surface. Attributes ---------- pressure_elements : list List of pressure elements. base_pt : float x-location of base point. length : float Length of pressure patch. is_kutta_unknown : bool True of trailing edge pressure unknown neighbor_up : PressurePatch PressurePatch instance upstream of this one. neighbor_down : PressurePatch PressurePatch instance downstream of this one. interpolator : FSIInterpolator Object to get interpolated body position if a `PlaningSurface` force_dict : dict Dictionary of global force values derived from element pressure/shear stress. parent : PotentialPlaningCalculation Parent solver this patch belongs to. """ def __init__(self, neighbor_up=None, neighbor_down=None, parent=None): self.pressure_elements = [] self._end_pts = np.zeros(2) self.is_kutta_unknown = False self._neighbor_up = None self._neighbor_down = None self.neighbor_up = neighbor_up self.neighbor_down = neighbor_down self.interpolator = None self.force_dict = {} self.parent = parent @property def base_pt(self): return self._end_pts[0] @base_pt.setter def base_pt(self, x): self._end_pts[0] = x @property def length(self): return self._end_pts[1] - self._end_pts[0] @property def neighbor_up(self): return self._neighbor_up @neighbor_up.setter def neighbor_up(self, obj): if obj is not None: self._neighbor_up = obj self._neighbor_up.neighbor_down = self @property def neighbor_down(self): return self._neighbor_down @neighbor_down.setter def neighbor_down(self, obj): if obj is not None: self._neighbor_down = obj self._neighbor_down.neighbor_up = self def _reset_element_coords(self): """Re-distribute pressure element positions given the length and end points of this patch. """ x = self._get_element_coords() for i, el in enumerate(self.pressure_elements): el.x_coord = x[i] if not el.is_source: el.z_coord = self.interpolator.getBodyHeight(x[i]) if isinstance(el, pe.CompleteTriangularPressureElement): el.width = [x[i] - x[i - 1], x[i + 1] - x[i]] elif isinstance(el, pe.ForwardHalfTriangularPressureElement): el.width = x[i + 1] - x[i] elif isinstance(el, pe.AftHalfTriangularPressureElement): el.width = x[i] - x[i - 1] elif isinstance(el, pe.AftSemiInfinitePressureBand): el.width = np.inf else: raise ValueError("Invalid Element Type!") def get_free_surface_height(self, x): """Get free surface height at a position x due to the elements on this patch. Args ---- x : float x-position at which to calculate free-surface height. """ return sum([el.get_influence(x) for el in self.pressure_elements]) def calculate_wave_drag(self): """Calculate wave drag of patch. Returns ------- None """ xo = -10.1 * config.flow.lam xTrough, = fmin(self.get_free_surface_height, xo, disp=False) xCrest, = fmin(lambda x: -self.get_free_surface_height(x), xo, disp=False) self.Dw = ( 0.0625 * config.flow.density * config.flow.gravity * ( self.get_free_surface_height(xCrest) - self.get_free_surface_height(xTrough) ) ** 2 ) def print_forces(self): """Print forces to screen.""" print(("Forces and Moment for {0}:".format(self.patch_name))) print((" Total Drag [N] : {0:6.4e}".format(self.D))) print((" Wave Drag [N] : {0:6.4e}".format(self.Dw))) print((" Pressure Drag [N] : {0:6.4e}".format(self.Dp))) print((" Frictional Drag [N] : {0:6.4e}".format(self.Df))) print((" Total Lift [N] : {0:6.4e}".format(self.L))) print((" Pressure Lift [N] : {0:6.4e}".format(self.Lp))) print((" Frictional Lift [N] : {0:6.4e}".format(self.Lf))) print((" Moment [N-m] : {0:6.4e}".format(self.M))) def write_forces(self): """Write forces to file.""" kp.writeasdict( os.path.join( config.it_dir, "forces_{0}.{1}".format(self.patch_name, config.io.data_format), ), ["Drag", self.D], ["WaveDrag", self.Dw], ["PressDrag", self.Dp], ["FricDrag", self.Df], ["Lift", self.L], ["PressLift", self.Lp], ["FricLift", self.Lf], ["Moment", self.M], ["BasePt", self.base_pt], ["Length", self.length], ) def load_forces(self): """Load forces from file.""" K = kp.Dictionary( os.path.join( config.it_dir, "forces_{0}.{1}".format(self.patch_name, config.io.data_format), ) ) self.D = K.read("Drag", 0.0) self.Dw = K.read("WaveDrag", 0.0) self.Dp = K.read("PressDrag", 0.0) self.Df = K.read("FricDrag", 0.0) self.L = K.read("Lift", 0.0) self.Lp = K.read("PressLift", 0.0) self.Lf = K.read("FricLift", 0.0) self.M = K.read("Moment", 0.0) self.base_pt = K.read("BasePt", 0.0) self.length = K.read("Length", 0.0) class PressureCushion(PressurePatch): """Pressure Cushion consisting solely of source elements. Args ---- dict_ : krampy.Dictionary or str Dictionary instance or string with dictionary path to load properties. **kwargs : dict Keyword arguments. Attributes ---------- patch_name : str Name of patch base_pt : float Right end of `PressurePatch`. """ count = 0 def __init__(self, dict_, **kwargs): super(self.__class__, self).__init__(self, **kwargs) self.index = PressureCushion.count PressureCushion.count += 1 self.patch_name = dict_.read( "pressureCushionName", "pressureCushion{0}".format(self.index) ) # TODO: cushion_type variability should be managed by sub-classing # PressurePatch self.cushion_type = dict_.read("cushionType", "") self.cushion_pressure = kwargs.get( "Pc", dict_.read_load_or_default("cushionPressure", 0.0) ) self.neighbor_up = PlaningSurface.find_by_name( dict_.read("upstreamPlaningSurface", "") ) self.neighbor_down = PlaningSurface.find_by_name( dict_.read("downstreamPlaningSurface", "") ) if self.neighbor_down is not None: self.neighbor_down.upstream_pressure = self.cushion_pressure if self.neighbor_up is not None: self.neighbor_up.kutta_pressure = self.cushion_pressure if self.cushion_type == "infinite": # Dummy element, will have 0 pressure self.pressure_elements += [ pe.AftSemiInfinitePressureBand(is_source=True, is_on_body=False) ] self.pressure_elements += [ pe.AftSemiInfinitePressureBand(is_source=True, is_on_body=False) ] self._end_pts[0] = -1000.0 # doesn't matter where else: self.pressure_elements += [ pe.ForwardHalfTriangularPressureElement( is_source=True, is_on_body=False ) ] Nfl = dict_.read("numElements", 10) self.smoothing_factor = dict_.read("smoothingFactor", np.nan) for n in [self.neighbor_down, self.neighbor_up]: if n is None and ~np.isnan(self.smoothing_factor): self.pressure_elements += [ pe.CompleteTriangularPressureElement( is_source=True, is_on_body=False ) for __ in range(Nfl) ] self.end_pts = [ dict_.read_load_or_default(key, 0.0) for key in ["downstreamLoc", "upstreamLoc"] ] self.pressure_elements += [ pe.AftHalfTriangularPressureElement(is_source=True, is_on_body=False) ] self._update_end_pts() @property def base_pt(self): if self.neighbor_up is None: return self._end_pts[1] else: return self.neighbor_up.base_pt @base_pt.setter def base_pt(self, x): self._end_pts[1] = x def _get_element_coords(self): """Return x-locations of all elements.""" if not self.cushion_type == "smoothed" or np.isnan(self.smoothing_factor): return self.end_pts else: add_width = np.arctanh(0.99) * self.length / (2 * self.smoothing_factor) addL = np.linspace( -add_width, add_width, len(self.pressure_elements) / 2 + 1 ) x = np.hstack((self.end_pts[0] + addL, self.end_pts[1] + addL)) return x def _update_end_pts(self): if self.neighbor_down is not None: self._end_pts[0] = self.neighbor_down.end_pts[1] if self.neighbor_up is not None: self._end_pts[1] = self.neighbor_up.end_pts[0] self._reset_element_coords() # Set source pressure of elements for elNum, el in enumerate(self.pressure_elements): el.is_source = True if self.cushion_type == "smoothed" and not np.isnan(self.smoothing_factor): alf = 2 * self.smoothing_factor / self.length el.set_source_pressure( 0.5 * self.cushion_pressure * ( np.tanh(alf * el.x_coord) - np.tanh(alf * (el.x_coord - self.length)) ) ) # for infinite pressure cushion, first element is dummy, set to # zero, second is semiInfinitePressureBand and set to cushion # pressure elif self.cushion_type == "infinite": if elNum == 0: el.pressure = 0.0 else: el.pressure = self.cushion_pressure else: el.pressure = self.cushion_pressure @PressurePatch.length.setter def length(self, length): self._end_pts[0] = self._end_pts[1] - length def calculate_forces(self): self.calculate_wave_drag() class PlaningSurface(PressurePatch): """Planing Surface consisting of unknown elements.""" count = 0 all = [] @classmethod def find_by_name(cls, name): """Return first planing surface matching provided name. Returns ------- PlaningSurface instance or None of no match found """ if not name == "": matches = [o for o in cls.all if o.patch_name == name] if len(matches) > 0: return matches[0] return None def __init__(self, dict_, **kwargs): PressurePatch.__init__(self) self.index = PlaningSurface.count PlaningSurface.count += 1 PlaningSurface.all.append(self) self.patch_name = dict_.read("substructureName", "") Nfl = dict_.read("Nfl", 0) self.point_spacing = dict_.read("pointSpacing", "linear") self.initial_length = dict_.read("initialLength", None) self.minimum_length = dict_.read("minimumLength", 0.0) self.maximum_length = dict_.read("maximum_length", float("Inf")) self.spring_constant = dict_.read("springConstant", 1e4) self.kutta_pressure = kwargs.get( "kuttaPressure", dict_.read_load_or_default("kuttaPressure", 0.0) ) self._upstream_pressure = kwargs.get( "upstreamPressure", dict_.read_load_or_default("upstreamPressure", 0.0) ) self.is_initialized = False self.is_active = True self.is_kutta_unknown = True self.is_sprung = dict_.read("isSprung", False) if self.is_sprung: self.initial_length = 0.0 self.minimum_length = 0.0 self.maximum_length = 0.0 self.pressure_elements += [ pe.ForwardHalfTriangularPressureElement(is_on_body=False) ] self.pressure_elements += [ pe.ForwardHalfTriangularPressureElement( is_source=True, pressure=self.kutta_pressure ) ] self.pressure_elements += [ pe.CompleteTriangularPressureElement() for __ in range(Nfl - 1) ] self.pressure_elements += [ pe.AftHalfTriangularPressureElement( is_source=True, pressure=self.upstream_pressure ) ] for el in self.pressure_elements: el.parent = self # Define point spacing if self.point_spacing == "cosine": self.pct = 0.5 * (1 - kp.cosd(np.linspace(0, 180, Nfl + 1))) else: self.pct = np.linspace(0.0, 1.0, Nfl + 1) self.pct /= self.pct[-2] self.pct = np.hstack((0.0, self.pct)) @property def upstream_pressure(self): if self.neighbor_up is not None: return self.neighbor_up.cushion_pressure else: return 0.0 @property def downstream_pressure(self): if self.neighbor_down is not None: return self.neighbor_down.cushion_pressure else: return 0.0 @upstream_pressure.setter def upstream_pressure(self, pressure): self._upstream_pressure = pressure el = self.pressure_elements[-1] el.pressure = pressure el.is_source = True el.z_coord = np.nan @PressurePatch.length.setter def length(self, length): length = np.max([length, 0.0]) x0 = self.interpolator.get_separation_point()[0] self._end_pts = [x0, x0 + length] self._reset_element_coords() def initialize_end_pts(self): """Initialize end points to be at separation point and wetted length root. """ self.base_pt = self.interpolator.get_separation_point()[0] if not self.is_initialized: if self.initial_length is None: self.length = self.interpolator.immersed_length - self.base_pt else: self.length = self.initial_length self.is_initialized = True def _reset_element_coords(self): """Set width of first element to be twice as wide.""" PressurePatch._reset_element_coords(self) x = self._get_element_coords() self.pressure_elements[0].width = x[2] - x[0] def _get_element_coords(self): """Get position of pressure elements.""" return self.pct * self.length + self._end_pts[0] def get_residual(self): """Get residual to drive first element pressure to zero.""" if self.length <= 0.0: return 0.0 else: return self.pressure_elements[0].pressure / config.flow.stagnation_pressure def calculate_forces(self): # TODO: Re-factor, this is messy." if self.length > 0.0: el = [el for el in self.pressure_elements if el.is_on_body] self.x = np.array([eli.x_coord for eli in el]) self.p = np.array([eli.pressure for eli in el]) self.p += self.pressure_elements[0].pressure self.shear_stress = np.array([eli.shear_stress for eli in el]) self.s = np.array([self.interpolator.getSFixedX(xx) for xx in self.x]) self.fP = interp1d(self.x, self.p, bounds_error=False, fill_value=0.0) self.fTau = interp1d( self.x, self.shear_stress, bounds_error=False, fill_value=0.0 ) AOA = kp.atand(self._get_body_derivative(self.x)) self.Dp = kp.integrate(self.s, self.p * kp.sind(AOA)) self.Df = kp.integrate(self.s, self.shear_stress * kp.cosd(AOA)) self.Lp = kp.integrate(self.s, self.p * kp.cosd(AOA)) self.Lf = -kp.integrate(self.s, self.shear_stress * kp.sind(AOA)) self.D = self.Dp + self.Df self.L = self.Lp + self.Lf self.M = kp.integrate( self.x, self.p * kp.cosd(AOA) * (self.x - config.body.xCofR) ) else: self.Dp = 0.0 self.Df = 0.0 self.Lp = 0.0 self.Lf = 0.0 self.D = 0.0 self.L = 0.0 self.M = 0.0 self.x = [] if self.is_sprung: self._apply_spring() self.calculate_wave_drag() def get_loads_in_range(self, x0, x1): """Return pressure and shear stress values at points between two arguments. """ # Get indices within range unless length is zero if self.length > 0.0: ind = np.nonzero((self.x > x0) * (self.x < x1))[0] else: ind = [] # Output all values within range if not ind == []: x = np.hstack((x0, self.x[ind], x1)) p = np.hstack((self.fP(x0), self.p[ind], self.fP(x1))) tau = np.hstack((self.fTau(x0), self.shear_stress[ind], self.fTau(x1))) else: x = np.array([x0, x1]) p = np.zeros_like(x) tau = np.zeros_like(x) return x, p, tau def _apply_spring(self): xs = self.pressure_elements[0].x_coord zs = self.pressure_elements[0].z_coord disp = zs - self.parent.get_free_surface_height(xs) Fs = -self.spring_constant * disp self.L += Fs self.M += Fs * (xs - config.body.xCofR) def _calculate_shear_stress(self): def shear_stress_func(xx): if xx == 0.0: return 0.0 else: Rex = config.flow.flow_speed * xx / config.flow.kinematic_viscosity return ( 0.332 * config.flow.density * config.flow.flow_speed ** 2 * Rex ** -0.5 ) x = self._get_element_coords()[0:-1] s = np.array([self.interpolator.getSFixedX(xx) for xx in x]) s = s[-1] - s for si, el in zip(s, self.pressure_elements): el.shear_stress = shear_stress_func(si) def _get_body_derivative(self, x, direction="r"): if isinstance(x, float): x = [x] return np.array( [ kp.getDerivative(self.interpolator.getBodyHeight, xx, direction) for xx in x ] ) PK!kVWVW"planingfsi/potentialflow/solver.py"""Fundamental module for constructing and solving planing potential flow problems """ from os.path import join import numpy as np from scipy.optimize import fmin import planingfsi.config as config # import planingfsi.krampy as kp # from planingfsi import io from planingfsi.potentialflow.pressurepatch import PlaningSurface from planingfsi.potentialflow.pressurepatch import PressureCushion if config.plotting.plot_any: import matplotlib.pyplot as plt class PotentialPlaningSolver(object): """The top-level object which handles calculation of the potential flow problem. Pointers to the planing surfaces and pressure elements are stored for reference during initialization and problem setup. The potential planing calculation is then solved during iteration with the structural solver. """ # Class method to handle singleton behavior __instance = None def __new__(cls): if cls.__instance is None: cls.__instance = object.__new__(cls) cls.__instance.init() return cls.__instance def init(self): # Not __init__. This method gets called by __new__ on the first # instance. self.X = None self.D = None self.Dw = None self.Dp = None self.Df = None self.L = None self.Lp = None self.Lf = None self.M = None self.p = None self.shear_stress = None self.xFS = None self.zFS = None self.x = None self.xBar = None self.xHist = None self.storeLen = None self.max_len = None self.planing_surfaces = [] self.pressure_cushions = [] self.pressure_patches = [] self.pressure_elements = [] self.lineFS = None self.lineFSi = None self.solver = None self.min_len = None self.init_len = None def add_pressure_patch(self, instance): """Add pressure patch to the calculation. Args ---- instance : PressurePatch The pressure patch object to add. Returns ------- None """ self.pressure_patches.append(instance) self.pressure_elements += [el for el in instance.pressure_elements] return None def add_planing_surface(self, dict_, **kwargs): """Add planing surface to the calculation from a dictionary file name. Args ---- dict_ : planingfsi.io.Dictionary The dictionary file. Returns ------- PlaningSurface Instance created from dictionary. """ kwargs["parent"] = self instance = PlaningSurface(dict_, **kwargs) self.planing_surfaces.append(instance) self.add_pressure_patch(instance) return instance def add_pressure_cushion(self, dict_, **kwargs): """Add pressure cushion to the calculation from a dictionary file name. Args ---- dict_ : planingfsi.io.Dictionary The dictionary file. Returns ------- PressureCushion Instance created from dictionary. """ kwargs["parent"] = self instance = PressureCushion(dict_, **kwargs) self.pressure_cushions.append(instance) self.add_pressure_patch(instance) return instance def get_planing_surface_by_name(self, name): """Return planing surface by name. Args ---- name : str Name of planing surface. Returns ------- PlaningSurface Planing surface object match """ l = [surf for surf in self.planing_surfaces if surf.patch_name == name] if len(l) > 0: return l[0] else: return None def print_element_status(self): """Print status of each element.""" for el in self.pressure_elements: el.print_element() return None def calculate_pressure(self): """Calculate pressure of each element to satisfy body boundary conditions. """ # Form lists of unknown and source elements unknownEl = [ el for el in self.pressure_elements if not el.is_source and el.width > 0.0 ] nanEl = [el for el in self.pressure_elements if el.width <= 0.0] sourceEl = [el for el in self.pressure_elements if el.is_source] # Form arrays to build linear system # x location of unknown elements X = np.array([el.x_coord for el in unknownEl]) # influence coefficient matrix (to be reshaped) A = np.array([el.get_influence_coefficient(x) for x in X for el in unknownEl]) # height of each unknown element above waterline Z = np.array([el.z_coord for el in unknownEl]) - config.flow.waterline_height # subtract contribution from source elements Z -= np.array([np.sum([el.get_influence(x) for el in sourceEl]) for x in X]) # Solve linear system dim = len(unknownEl) if not dim == 0: p = np.linalg.solve(np.reshape(A, (dim, dim)), np.reshape(Z, (dim, 1)))[ :, 0 ] else: p = np.zeros_like(Z) # Apply calculated pressure to the elements for pi, el in zip(p, unknownEl): el.pressure = pi for el in nanEl: el.pressure = 0.0 return None def get_residual(self, Lw): """Return array of residuals of each planing surface to satisfy Kutta condition. Args ---- Lw : np.ndarray Array of wetted lengths of planing surfaces. Returns ------- np.ndarray Array of trailing edge residuals of each planing surface. """ # Set length of each planing surface for Lwi, p in zip(Lw, self.planing_surfaces): p.length = np.min([Lwi, p.maximum_length]) # Update bounds of pressure cushions for p in self.pressure_cushions: p._update_end_pts() # Solve for unknown pressures and output residual self.calculate_pressure() res = np.array([p.get_residual() for p in self.planing_surfaces]) def array_to_string(array): return ", ".join(["{0:11.4e}".format(a) for a in array]).join("[]") print(" Lw: ", array_to_string(Lw)) print(" Residual:", array_to_string(res)) print() return res def calculate_response(self): """Calculate response, including pressure and free-surface profiles. Will load results from file if specified. """ if config.io.results_from_file: self.load_response() else: self.calculate_response_unknown_wetted_length() def calculate_response_unknown_wetted_length(self): """Calculate potential flow problem via iteration to find wetted length of all planing surfaces to satisfy all trailing edge conditions. """ # Reset results so they will be recalculated after new solution self.xFS = None self.X = None self.xHist = [] # Initialize planing surface lengths and then solve until residual is # *zero* if len(self.planing_surfaces) > 0: for p in self.planing_surfaces: p.initialize_end_pts() print(" Solving for wetted length:") if self.min_len is None: self.min_len = np.array( [p.minimum_length for p in self.planing_surfaces] ) self.max_len = np.array( [p.maximum_length for p in self.planing_surfaces] ) self.init_len = np.array( [p.initial_length for p in self.planing_surfaces] ) else: for i, p in enumerate(self.planing_surfaces): L = p.get_length() self.init_len[i] = p.initial_length if ~np.isnan(L) and L - self.min_len[i] > 1e-6: # and self.solver.it < self.solver.maxIt: self.init_len[i] = L * 1.0 else: self.init_len[i] = p.initial_length dxMaxDec = config.solver.wetted_length_max_step_pct_dec * ( self.init_len - self.min_len ) dxMaxInc = config.solver.wetted_length_max_step_pct_inc * ( self.init_len - self.min_len ) for i, p in enumerate(self.planing_surfaces): if p.initial_length == 0.0: self.init_len[i] = 0.0 if self.solver is None: self.solver = kp.RootFinder( self.get_residual, self.init_len * 1.0, config.solver.wetted_length_solver, xMin=self.min_len, xMax=self.max_len, errLim=config.solver.wetted_length_tol, dxMaxDec=dxMaxDec, dxMaxInc=dxMaxInc, firstStep=1e-6, maxIt=config.solver.wetted_length_max_it_0, maxJacobianResetStep=config.solver.wetted_length_max_jacobian_reset_step, relax=config.solver.wetted_length_relax, ) else: self.solver.maxIt = config.solver.wetted_length_max_it self.solver.reinitialize(self.init_len * 1.0) self.solver.setMaxStep(dxMaxInc, dxMaxDec) if any(self.init_len > 0.0): self.solver.solve() self.calculate_pressure_and_shear_profile() def plot_residuals_over_range(self): """Plot residuals.""" self.storeLen = np.array([p.get_length() for p in self.planing_surfaces]) xx, yy = list(zip(*self.xHist)) N = 10 L = np.linspace(0.001, 0.25, N) X = np.zeros((N, N)) Y = np.zeros((N, N)) Z1 = np.zeros((N, N)) Z2 = np.zeros((N, N)) for i, Li in enumerate(L): for j, Lj in enumerate(L): x = np.array([Li, Lj]) y = self.get_residual(x) X[i, j] = x[0] Y[i, j] = x[1] Z1[i, j] = y[0] Z2[i, j] = y[1] for i, Zi, seal in zip([2, 3], [Z1, Z2], ["bow", "stern"]): plt.figure(i) plt.contourf(X, Y, Zi, 50) plt.gray() plt.colorbar() plt.contour(X, Y, Z1, np.array([0.0]), colors="b") plt.contour(X, Y, Z2, np.array([0.0]), colors="b") plt.plot(xx, yy, "k.-") if self.solver.converged: plt.plot(xx[-1], yy[-1], "y*", markersize=10) else: plt.plot(xx[-1], yy[-1], "rx", markersize=8) plt.title("Residual for {0} seal trailing edge pressure".format(seal)) plt.xlabel("Bow seal length") plt.ylabel("Stern seal length") plt.savefig("{0}SealResidual_{1}.png".format(seal, config.it), format="png") plt.clf() self.get_residual(self.storeLen) def calculate_pressure_and_shear_profile(self): """Calculate pressure and shear stress profiles over plate surface.""" if self.X is None: if config.flow.include_friction: for p in self.planing_surfaces: p._calculate_shear_stress() # Calculate forces on each patch for p in self.pressure_patches: p.calculate_forces() # Calculate pressure profile if len(self.pressure_patches) > 0: self.X = np.sort( np.unique( np.hstack( [p._get_element_coords() for p in self.pressure_patches] ) ) ) self.p = np.zeros_like(self.X) self.shear_stress = np.zeros_like(self.X) else: self.X = np.array([-1e6, 1e6]) self.p = np.zeros_like(self.X) self.shear_stress = np.zeros_like(self.X) for el in self.pressure_elements: if el.is_on_body: self.p[el.x_coord == self.X] += el.pressure self.shear_stress[el.x_coord == self.X] += el.shear_stress # Calculate total forces as sum of forces on each patch for var in ["D", "Dp", "Df", "Dw", "L", "Lp", "Lf", "M"]: setattr( self, var, sum([getattr(p, var) for p in self.pressure_patches]) ) f = self.get_free_surface_height xo = -10.1 * config.flow.lam xTrough, = fmin(f, xo, disp=False) xCrest, = fmin(lambda x: -f(x), xo, disp=False) self.Dw = ( 0.0625 * config.flow.density * config.flow.gravity * (f(xCrest) - f(xTrough)) ** 2 ) # Calculate center of pressure self.xBar = kp.integrate(self.X, self.p * self.X) / self.L if config.plotting.show_pressure: self.plot_pressure() def calculate_free_surface_profile(self): """Calculate free surface profile.""" if self.xFS is None: xFS = [] # Grow points upstream and downstream from first and last plate for surf in self.planing_surfaces: if surf.length > 0: pts = surf._get_element_coords() xFS.append( kp.growPoints( pts[1], pts[0], config.plotting.x_fs_min, config.plotting.growth_rate, ) ) xFS.append( kp.growPoints( pts[-2], pts[-1], config.plotting.x_fs_max, config.plotting.growth_rate, ) ) # Add points from each planing surface fsList = [ patch._get_element_coords() for patch in self.planing_surfaces if patch.length > 0 ] if len(fsList) > 0: xFS.append(np.hstack(fsList)) # Add points from each pressure cushion xFS.append( np.linspace(config.plotting.x_fs_min, config.plotting.x_fs_max, 100) ) # for patch in self.pressure_cushions: # if patch.neighborDown is not None: # ptsL = patch.neighborDown._get_element_coords() # else: # ptsL = np.array([patch.endPt[0] - 0.01, patch.endPt[0]]) # # if patch.neighborDown is not None: # ptsR = patch.neighborUp._get_element_coords() # else: # ptsR = np.array([patch.endPt[1], patch.endPt[1] + 0.01]) # # xEnd = ptsL[-1] + 0.5 * patch.get_length() # xFS.append( # kp.growPoints(ptsL[-2], ptsL[-1], # xEnd, config.growthRate)) # xFS.append( # kp.growPoints(ptsR[1], ptsR[0], xEnd, config.growthRate)) # Sort x locations and calculate surface heights if len(xFS) > 0: self.xFS = np.sort(np.unique(np.hstack(xFS))) self.zFS = np.array(list(map(self.get_free_surface_height, self.xFS))) else: self.xFS = np.array([config.xFSMin, config.xFSMax]) self.zFS = np.zeros_like(self.xFS) def get_free_surface_height(self, x): """Return free surface height at a given x-position consideringthe contributions from all pressure patches. Args ---- x : float x-position at which free surface height shall be returned. Returns ------- float Free-surface position at input x-position """ return sum( [patch.get_free_surface_height(x) for patch in self.pressure_patches] ) def get_free_surface_derivative(self, x, direction="c"): """Return slope (derivative) of free-surface profile. Args ---- x : float x-position Returns ------- float Derivative or slope of free-surface profile """ ddx = kp.getDerivative(self.get_free_surface_height, x, direction=direction) return ddx def write_results(self): """Write results to file.""" # Post-process results from current solution # self.calculate_pressure_and_shear_profile() self.calculate_free_surface_profile() if self.D is not None: self.write_forces() if self.X is not None: self.write_pressure_and_shear() if self.xFS is not None: self.write_free_surface() def write_pressure_and_shear(self): """Write pressure and shear stress profiles to data file.""" if len(self.pressure_elements) > 0: kp.writeaslist( join( config.it_dir, "pressureAndShear.{0}".format(config.io.data_format) ), ["x [m]", self.X], ["p [Pa]", self.p], ["shear_stress [Pa]", self.shear_stress], ) def write_free_surface(self): """Write free-surface profile to file.""" if len(self.pressure_elements) > 0: kp.writeaslist( join(config.it_dir, "freeSurface.{0}".format(config.io.data_format)), ["x [m]", self.xFS], ["y [m]", self.zFS], ) def write_forces(self): """Write forces to file.""" if len(self.pressure_elements) > 0: kp.writeasdict( join(config.it_dir, "forces_total.{0}".format(config.io.data_format)), ["Drag", self.D], ["WaveDrag", self.Dw], ["PressDrag", self.Dp], ["FricDrag", self.Df], ["Lift", self.L], ["PressLift", self.Lp], ["FricLift", self.Lf], ["Moment", self.M], ) for patch in self.pressure_patches: patch.write_forces() def load_response(self): """Load results from file.""" self.load_forces() self.load_pressure_and_shear() self.load_free_surface() def load_pressure_and_shear(self): """Load pressure and shear stress from file.""" self.x, self.p, self.shear_stress = np.loadtxt( join(config.it_dir, "pressureAndShear.{0}".format(config.io.data_format)), unpack=True, ) for el in [ el for patch in self.planing_surfaces for el in patch.pressure_elements ]: compare = np.abs(self.x - el.get_xloc()) < 1e-6 if any(compare): el.set_pressure(self.p[compare][0]) el.set_shear_stress(self.shear_stress[compare][0]) for p in self.planing_surfaces: p.calculate_forces() def load_free_surface(self): """Load free surface coordinates from file.""" try: Data = np.loadtxt( join(config.it_dir, "freeSurface.{0}".format(config.io.data_format)) ) self.xFS = Data[:, 0] self.zFS = Data[:, 1] except IOError: self.zFS = np.zeros_like(self.xFS) return None def load_forces(self): """Load forces from file.""" K = kp.Dictionary( join(config.it_dir, "forces_total.{0}".format(config.io.data_format)) ) self.D = K.read("Drag", 0.0) self.Dw = K.read("WaveDrag", 0.0) self.Dp = K.read("PressDrag", 0.0) self.Df = K.read("FricDrag", 0.0) self.L = K.read("Lift", 0.0) self.Lp = K.read("PressLift", 0.0) self.Lf = K.read("FricLift", 0.0) self.M = K.read("Moment", 0.0) for patch in self.pressure_patches: patch.load_forces() def plot_pressure(self): """Create a plot of the pressure and shear stress profiles.""" plt.figure(figsize=(5.0, 5.0)) plt.xlabel(r"$x/L_i$") plt.ylabel(r"$p/(1/2\rho U^2)$") for el in self.pressure_elements: el.plot() plt.plot(self.X, self.p, "k-") plt.plot(self.X, self.shear_stress * 1000, "c--") # Scale y axis by stagnation pressure for line in plt.gca().lines: x, y = line.get_data() line.set_data( x / config.body.reference_length * 2, y / config.flow.stagnation_pressure, ) plt.xlim([-1.0, 1.0]) # plt.xlim(kp.minMax(self.X / config.Lref * 2)) plt.ylim( [0.0, np.min([1.0, 1.2 * np.max(self.p / config.flow.stagnation_pressure)])] ) plt.savefig( "pressureElements.{0}".format(config.figFormat), format=config.figFormat ) plt.figure(1) return None def plot_free_surface(self): """Create a plot of the free surface profile.""" self.calculate_free_surface_profile() if self.lineFS is not None: self.lineFS.set_data(self.xFS, self.zFS) endPts = np.array([config.plotting.x_fs_min, config.plotting.x_fs_max]) if self.lineFSi is not None: self.lineFSi.set_data( endPts, config.flow.waterline_height * np.ones_like(endPts) ) return None PK!H#Dc+planingfsi-0.1.1.dist-info/entry_points.txtN+I/N.,()JOK-J,IM-ΰ-IKO+δJ&fqA%܂= \PK!;66"planingfsi-0.1.1.dist-info/LICENSEMIT License Copyright (c) 2018 Matthew Robert Kramer Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HnHTU planingfsi-0.1.1.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!HF #planingfsi-0.1.1.dist-info/METADATAV[O#7~8+^fvUUۈоJ8B6sBU*EɌ> FQ(e}5NiaA?yk*yI&2փB(kѬv_ dk,*F&BEkjHqE=܉JI4LLoy+陀3!wkB dn ULc(f[24?[>bZ C&e>s缽2S sWT2+x~jO,nS2o̥BiU42X+wBJa{>~ࣃ#Nۨ"Î330jۻH_@oE2^_MTSR0{(JNie>Fgo% KfHE2n c{0k>ϧrޯ+V u| czsZ}un>7حEε QV8tN3-"xbih-z!ms\&:5ߥ݀wҳ=)%|A&z @R=j!ĕ \*XM^"gse$[X*b)ZLDŽB!,qӯk)+t FX @,mʥLFiݛB[8#>"5%nIDz^{龤I%YbN)dF+98iKTV{~NP ~Tv!ZB˔%PE-B 4C.a\4hpeLwg v;yi+ Uҥp۝O!~mjF^U-Pۮ6yUIxt4.8Gsي_ /WMWe[}p@M$FfP׶e-c7]_rw` :Иw>ބn8;,SS.[%Z63} i? cwwwFyPK!HEaa !planingfsi-0.1.1.dist-info/RECORDɮyfp0`0x0{x|As++=90U߰!?R+P>C"}?pwyT2AYі~J!`,FY8fMI=<3ڥ1hBN/JF]F"q/(Te.~@]+d 2H<@aR?>LEW|:nqks g:%& A?)9dc Rwo2>7'o٫C!M;cx%:-Y2mD@mEWnű%Hr}0(kJ74F'Oq瞋EhФ1C0xa-"ltvҢ}dD-syS>GqsHQ—9z׃(s4_2:MOA5&(&p?pm? d$XyM8k{=o`_kK>B;SL""oFJ4=wrm{mG }UcԐ˼KJKoL$ :D&5uE0ι=a?^gԛNY8waH0oZpzS @ߒ*Oln|28ہ'RXN!Ŭ'e۔%{4@,k*+'nxYÌ 5ef.?POx*>l O #Im__3|#6L\Gr4Ѯ-h`9# s"j?a,손0!Uf rηc0sX6#rinfȿny^OOA+k4YNۇ ՠ!LS8ɿgb8b~ʾx10>4 6}C^,o6xV38/PK!}ZZkrampy/__init__.pyPK!DV--krampy/general.pyPK!5,,U/krampy/io_old.pyPK!Sp!p!e\krampy/iotools/IOTools.pyPK!N<   ~krampy/iotools/InputSheet.pyPK!]dkrampy/iotools/__init__.pyPK!WEqq3krampy/iotools/dictionary.pyPK!ޞkrampy/iterator.pyPK!ֶ^V11קkrampy/krampy_old.pyPK!纥**krampy/solver.pyPK!']]krampy/stopwatch.pyPK!>-V krampy/trig.pyPK!S ^krampy/unit.pyPK!)(;!krampy/unit_old.pyPK!f?vM))"planingfsi/__init__.pyPK!$Q%planingfsi/__version__.pyPK!LYY%planingfsi/cli.pyPK!88&,planingfsi/config.pyPK!udplaningfsi/fe/__init__.pyPK!(VF  dplaningfsi/fe/felib.pyPK!$5$5wplaningfsi/fe/femesh.pyPK!Eplaningfsi/fe/structure.pyPK!splaningfsi/fsi/__init__.pyPK!'&1&1splaningfsi/fsi/figure.pyPK!T %planingfsi/fsi/interpolator.pyPK!,!!#planingfsi/fsi/simulation.pyPK!$planingfsi/potentialflow/__init__.pyPK!Y55+0planingfsi/potentialflow/pressureelement.pyPK!JvLvL)*planingfsi/potentialflow/pressurepatch.pyPK!kVWVW"Tplaningfsi/potentialflow/solver.pyPK!H#Dc+}planingfsi-0.1.1.dist-info/entry_points.txtPK!;66" planingfsi-0.1.1.dist-info/LICENSEPK!HnHTU planingfsi-0.1.1.dist-info/WHEELPK!HF #planingfsi-0.1.1.dist-info/METADATAPK!HEaa !oplaningfsi-0.1.1.dist-info/RECORDPK##