PKYuM`xone/__init__.py"""Frequently used functions for financial data analysis""" __version__ = '0.0.5' __submodules__ = [ 'utils', 'calendar', 'files', 'procs', 'plots', ] PKXMA|ssxone/__main__.pydef main(): import xdoctest xdoctest.doctest_module('xone') if __name__ == '__main__': main() PKXMR xone/bbg.pyASSETS = ['Equity', 'Index', 'Curncy', 'Corp'] if __name__ == '__main__': """ CommandLine: python -m xone.bbg all """ import xdoctest xdoctest.doctest_module() PK3M@? xone/cache.pyimport hashlib import json import pandas as pd import sys import inspect from functools import wraps from xone import utils, files, logs def cache_file(symbol, func, has_date, root, date_type='date'): """ Data file Args: symbol: symbol func: use function to categorize data has_date: contains date in data file root: root path date_type: parameters pass to utils.cur_time, [date, time, time_path, ...] Returns: str: date file """ cur_mod = sys.modules[func.__module__] data_tz = getattr(cur_mod, 'DATA_TZ') if hasattr(cur_mod, 'DATA_TZ') else 'UTC' cur_dt = utils.cur_time(typ=date_type, tz=data_tz, trading=False) if has_date: if hasattr(cur_mod, 'FILE_WITH_DATE'): file_fmt = getattr(cur_mod, 'FILE_WITH_DATE') else: file_fmt = '{root}/{typ}/{symbol}/{cur_dt}.parq' else: if hasattr(cur_mod, 'FILE_NO_DATE'): file_fmt = getattr(cur_mod, 'FILE_NO_DATE') else: file_fmt = '{root}/{typ}/{symbol}.parq' return data_file( file_fmt=file_fmt, root=root, cur_dt=cur_dt, typ=func.__name__, symbol=symbol ) def update_data(func): """ Decorator to save data more easily. Use parquet as data format Args: func: function to load data from data source Returns: wrapped function """ default = dict([ (param.name, param.default) for param in inspect.signature(func).parameters.values() if param.default != getattr(inspect, '_empty') ]) @wraps(func) def wrapper(*args, **kwargs): default.update(kwargs) kwargs.update(default) cur_mod = sys.modules[func.__module__] logger = logs.get_logger(name_or_func=f'{cur_mod.__name__}.{func.__name__}', types='stream') root_path = cur_mod.DATA_PATH date_type = kwargs.pop('date_type', 'date') save_static = kwargs.pop('save_static', True) save_dynamic = kwargs.pop('save_dynamic', True) symbol = kwargs.get('symbol') file_kw = dict(func=func, symbol=symbol, root=root_path, date_type=date_type) d_file = cache_file(has_date=True, **file_kw) s_file = cache_file(has_date=False, **file_kw) cached = kwargs.pop('cached', False) if cached and save_static and files.exists(s_file): logger.info(f'Reading data from {s_file} ...') return pd.read_parquet(s_file) data = func(*args, **kwargs) if save_static: files.create_folder(s_file, is_file=True) save_data(data=data, file_fmt=s_file, append=False) logger.info(f'Saved data file to {s_file} ...') if save_dynamic: drop_dups = kwargs.pop('drop_dups', None) files.create_folder(d_file, is_file=True) save_data(data=data, file_fmt=d_file, append=True, drop_dups=drop_dups) logger.info(f'Saved data file to {d_file} ...') return data return wrapper def save_data(data, file_fmt, append=False, drop_dups=None, info=None, **kwargs): """ Save data to file Args: data: pd.DataFrame file_fmt: data file format in terms of f-strings append: if append data to existing data drop_dups: list, drop duplicates in columns info: dict, infomation to be hashed and passed to f-strings **kwargs: additional parameters for f-strings Examples: >>> data = pd.DataFrame([[1, 2], [3, 4]], columns=['a', 'b']) >>> save_data(data, '{ROOT}/daily/{typ}.parq', ROOT='/data', typ='earnings') """ from xone import utils d_file = data_file(file_fmt=file_fmt, info=info, **kwargs) if append and files.exists(d_file): data = pd.DataFrame(pd.concat([pd.read_parquet(d_file), data], sort=False)) if drop_dups is not None: data.drop_duplicates(subset=utils.tolist(drop_dups), inplace=True) if not data.empty: data.to_parquet(d_file) return data def data_file(file_fmt, info=None, **kwargs): """ Data file name for given infomation Args: file_fmt: file format in terms of f-strings info: dict, to be hashed and then pass to f-string using 'hash_key' these info will also be passed to f-strings **kwargs: arguments for f-strings Returns: str: data file name """ from xone import utils if isinstance(info, dict): kwargs['hash_key'] = hashlib.md5(json.dumps(info).encode('utf-8')).hexdigest() kwargs.update(info) return utils.fstr(fmt=file_fmt, **kwargs) PKXM?_1FFxone/calendar.pyimport pandas as pd import sys from pandas.tseries import holiday class USTradingCalendar(holiday.AbstractHolidayCalendar): rules = [ holiday.Holiday('NewYearsDay', month=1, day=1, observance=holiday.nearest_workday), holiday.USMartinLutherKingJr, holiday.USPresidentsDay, holiday.GoodFriday, holiday.USMemorialDay, holiday.Holiday('USIndependenceDay', month=7, day=4, observance=holiday.nearest_workday), holiday.USLaborDay, holiday.USThanksgivingDay, holiday.Holiday('Christmas', month=12, day=25, observance=holiday.nearest_workday) ] def trading_dates(start, end, calendar='US'): """ Trading dates for given exchange Args: start: start date end: end date calendar: exchange as string Returns: pd.DatetimeIndex: datetime index Examples: >>> bus_dates = ['2018-12-24', '2018-12-26', '2018-12-27'] >>> trd_dates = trading_dates(start='2018-12-23', end='2018-12-27') >>> assert len(trd_dates) == len(bus_dates) >>> assert pd.Series(trd_dates == pd.DatetimeIndex(bus_dates)).all() """ kw = dict(start=pd.Timestamp(start, tz='UTC').date(), end=pd.Timestamp(end, tz='UTC').date()) us_cal = getattr(sys.modules[__name__], f'{calendar}TradingCalendar')() return pd.DatetimeIndex(freq='B', **kw).drop(us_cal.holidays(**kw)) if __name__ == '__main__': """ CommandLine: python -m xone.calendar all """ import xdoctest xdoctest.doctest_module() PKXM/ xone/files.pyimport pandas as pd import os import re import glob import time DATE_FMT = '\d{4}-(0?[1-9]|1[012])-(0?[1-9]|[12][0-9]|3[01])' def exists(path): """ Check path or file exists (use os.path.exists) Args: path: path or file """ return os.path.exists(path=path) def create_folder(path_name, is_file=False): """ Make folder as well as all parent folders if not exists Args: path_name: full path name is_file: whether input is name of file """ assert isinstance(path_name, str) path_sep = path_name.replace('\\', '/').split('/') for i in range(1, len(path_sep) + (0 if is_file else 1)): cur_path = '/'.join(path_sep[:i]) if not os.path.exists(cur_path): os.mkdir(cur_path) def all_files(path_name, keyword='', ext='', full_path=True, has_date=False, date_fmt=DATE_FMT): """ Search all files with criteria Args: path_name: full path name keyword: keyword to search ext: file extensions, split by ',' full_path: whether return full path (default True) has_date: whether has date in file name (default False) date_fmt: date format to check for has_date parameter Returns: list: all file names fulfilled criteria """ if not os.path.exists(path=path_name): return [] if keyword or ext: to_find = (('*%s*' % keyword) if keyword else '*') + '.' + (ext if ext else '*') files = [f for f in glob.iglob('/'.join([path_name, to_find]))] files = [ f.replace('\\', '/').split('/')[-1] for f in sorted(files, key=os.path.getmtime, reverse=True) if f[0] != '~' ] else: files = [ f for f in os.listdir(path=path_name) if os.path.isfile('/'.join([path_name, f])) and (f[0] != '~') ] if has_date: r = re.compile(date_fmt) files = filter(lambda vv: r.match(vv) is not None, files) if full_path: return ['/'.join([path_name, f]) for f in files] return files def all_folders(path_name, keyword='', has_date=False, date_fmt=DATE_FMT): """ Search all folders with criteria Args: path_name: full path name keyword: keyword to search has_date: whether has date in file name (default False) date_fmt: date format to check for has_date parameter Returns: list: all folder names fulfilled criteria """ if not os.path.exists(path=path_name): return [] if keyword: to_find = (('*%s*' % keyword) if keyword else '*') + '.*' files = [f for f in glob.iglob('/'.join([path_name, to_find]))] files = [ f.replace('\\', '/').split('/')[-1] for f in files if f[0] != '~' ] else: files = [ f for f in os.listdir(path=path_name) if os.path.isdir('/'.join([path_name, f])) and (f[0] != '~') ] if keyword != '': keyword = keyword.lower() files = filter(lambda vv: keyword in vv.lower(), files) if has_date: r = re.compile(date_fmt) files = filter(lambda vv: r.match(vv) is not None, files) return ['/'.join([path_name, f]) for f in files] def latest_file(path_name, keyword='', ext='', debug=False): """ Latest modified file in folder Args: path_name: full path name keyword: keyword to search ext: file extension debug: print out debug message if not found Returns: str: latest file name """ files = all_files(path_name=path_name, keyword=keyword, ext=ext, full_path=True) if len(files) == 0: if debug: print('File is not found in folder: %s' % path_name) return '' modified_time = [os.path.getmtime(f) for f in files] files = [f for (dt, f) in sorted(zip(modified_time, files))] return files[-1] def file_modified_time(file_name): return pd.to_datetime(time.ctime(os.path.getmtime(filename=file_name))) if __name__ == '__main__': """ CommandLine: python -m xone.files all """ import xdoctest xdoctest.doctest_module() PK%PL!f xone/logs.pyimport logging from xone import utils def get_logger( name_or_func, log_file='', level=logging.INFO, fmt='%(asctime)s:%(name)s:%(levelname)s:%(message)s', types='stream' ): """ Generate logger Args: name_or_func: logger name or current running function log_file: logger file level: level of logs - debug, info, error fmt: log formats types: file or stream, or both Returns: logger Examples: >>> get_logger(name_or_func='download_data', level='debug', types='stream') >>> get_logger(name_or_func='preprocess', log_file='pre.log', types='file|stream') """ if isinstance(level, str): level = getattr(logging, level.upper()) log_name = name_or_func if isinstance(name_or_func, str) else utils.func_scope(name_or_func) logger = logging.getLogger(name=log_name) logger.setLevel(level=level) if not len(logger.handlers): formatter = logging.Formatter(fmt=fmt) if 'file' in types: file_handler = logging.FileHandler(log_file) file_handler.setFormatter(fmt=formatter) logger.addHandler(file_handler) if 'stream' in types: stream_handler = logging.StreamHandler() stream_handler.setFormatter(fmt=formatter) logger.addHandler(stream_handler) return logger PKLL3)) xone/plots.pyfrom xone import utils def plot_multi(data, cols=None, spacing=.06, color_map=None, plot_kw=None, **kwargs): """ Plot data with multiple scaels together Args: data: DataFrame of data cols: columns to be plotted spacing: spacing between legends color_map: customized colors in map plot_kw: kwargs for each plot **kwargs: kwargs for the first plot Returns: ax for plot Examples: >>> import pandas as pd >>> import numpy as np >>> >>> idx = range(5) >>> data = pd.DataFrame(dict(a=np.exp(idx), b=idx), index=idx) >>> # plot_multi(data=data, cols=['a', 'b'], plot_kw=[dict(style='.-'), dict()]) """ import matplotlib.pyplot as plt from pandas import plotting if cols is None: cols = data.columns if plot_kw is None: plot_kw = [dict(), dict()] if len(cols) == 0: return num_colors = len(utils.flatten(cols)) # Get default color style from pandas colors = getattr(getattr(plotting, '_style'), '_get_standard_colors')(num_colors=num_colors) if color_map is None: color_map = dict() fig = plt.figure() ax, lines, labels, c_idx = None, [], [], 0 for n in range(0, len(cols)): if isinstance(cols[n], (list, tuple)): ylabel = ' / '.join(cols[n]) color = [ color_map.get(cols[n][_ - c_idx], colors[_ % len(colors)]) for _ in range(c_idx, c_idx + len(cols[n])) ] c_idx += len(cols[n]) else: ylabel = cols[n] color = color_map.get(cols[n], colors[c_idx % len(colors)]) c_idx += 1 if ax is None: # First y-axes ax = data.loc[:, cols[n]].plot( label=cols[n], color=color, legend=False, zorder=n, **plot_kw[0], **kwargs ) ax.set_ylabel(ylabel=ylabel) line, label = ax.get_legend_handles_labels() ax.spines['left'].set_edgecolor('#D5C4A1') ax.spines['left'].set_alpha(.5) else: # Multiple y-axes ax_new = ax.twinx() ax_new.spines['right'].set_position(('axes', 1 + spacing * (n - 1))) data.loc[:, cols[n]].plot( ax=ax_new, label=cols[n], color=color, legend=False, zorder=n, **plot_kw[n] ) ax_new.set_ylabel(ylabel=ylabel) line, label = ax_new.get_legend_handles_labels() ax_new.spines['right'].set_edgecolor('#D5C4A1') ax_new.spines['right'].set_alpha(.5) ax_new.grid(False) # Proper legend position lines += line labels += label fig.legend(lines, labels, loc=8, prop=dict(), ncol=num_colors).set_zorder(len(cols)) ax.set_xlabel(' \n ') return ax def plot_h(data, cols, wspace=.1, plot_kw=None, **kwargs): """ Plot horizontally Args: data: DataFrame of data cols: columns to be plotted wspace: spacing between plots plot_kw: kwargs for each plot **kwargs: kwargs for the whole plot Returns: axes for plots Examples: >>> import pandas as pd >>> import numpy as np >>> >>> idx = range(5) >>> data = pd.DataFrame(dict(a=np.exp(idx), b=idx), index=idx) >>> # plot_h(data=data, cols=['a', 'b'], wspace=.2, plot_kw=[dict(style='.-'), dict()]) """ import matplotlib.pyplot as plt if plot_kw is None: plot_kw = [dict()] * len(cols) fig, axes = plt.subplots(nrows=1, ncols=len(cols), **kwargs) plt.subplots_adjust(wspace=wspace) for n in range(len(cols)): data.loc[:, cols[n]].plot(ax=axes[n], **plot_kw[n]) return axes PKXM b b xone/procs.pyimport sys import queue from multiprocessing import Process, cpu_count from itertools import product import win32process import win32api def run(func, keys, max_procs=None, show_proc=False, affinity=None, **kwargs): """ Provide interface for multiprocessing Args: func: callable functions keys: keys in kwargs that want to use process max_procs: max number of processes show_proc: whether to show process affinity: CPU affinity **kwargs: kwargs for func """ if max_procs is None: max_procs = cpu_count() kw_arr = saturate_kwargs(keys=keys, **kwargs) if len(kw_arr) == 0: return if isinstance(affinity, int): win32process.SetProcessAffinityMask(win32api.GetCurrentProcess(), affinity) task_queue = queue.Queue() while len(kw_arr) > 0: for _ in range(max_procs): if len(kw_arr) == 0: break kw = kw_arr.pop(0) p = Process(target=func, kwargs=kw) p.start() sys.stdout.flush() task_queue.put(p) if show_proc: signature = ', '.join([f'{k}={v}' for k, v in kw.items()]) print(f'[{func.__name__}] ({signature})') while not task_queue.empty(): p = task_queue.get() p.join() def saturate_kwargs(keys, **kwargs): """ Saturate all combinations of kwargs Args: keys: keys in kwargs that want to use process **kwargs: kwargs for func """ # Validate if keys are in kwargs and if they are iterable if isinstance(keys, str): keys = [keys] keys = [k for k in keys if k in kwargs and hasattr(kwargs.get(k, None), '__iter__')] if len(keys) == 0: return [] # Saturate coordinates of kwargs kw_corr = list(product(*(range(len(kwargs[k])) for k in keys))) # Append all possible values kw_arr = [] for corr in kw_corr: kw_arr.append( dict(zip(keys, [kwargs[keys[i]][corr[i]] for i in range(len(keys))])) ) # All combinations of kwargs of inputs for k in keys: kwargs.pop(k, None) kw_arr = [{**k, **kwargs} for k in kw_arr] return kw_arr if __name__ == '__main__': """ CommandLine: python -m xone.procs all """ import xdoctest xdoctest.doctest_module() PKXuM˱N)>)> xone/utils.pyimport numpy as np import pandas as pd import sys import inspect import json def tolist(iterable): """ Simpler implementation of flatten method Args: iterable: any array or value Returns: list: list of unique values Examples: >>> tolist('xyz') ['xyz'] >>> tolist(['ab', 'cd', 'xy', 'ab']) ['ab', 'cd', 'xy'] """ return pd.Series(iterable).drop_duplicates().tolist() def fmt_dt(dt, fmt='%Y-%m-%d'): """ Format date string Args: dt: any date format fmt: output date format Returns: str: date format Examples: >>> fmt_dt(dt='2018-12') '2018-12-01' >>> fmt_dt(dt='2018-12-31', fmt='%Y%m%d') '20181231' """ return pd.Timestamp(dt).strftime(fmt) def trade_day(dt, cal='US'): """ Latest trading day w.r.t given dt Args: dt: date of reference cal: trading calendar Returns: pd.Timestamp: last trading day Examples: >>> trade_day('2018-12-25').strftime('%Y-%m-%d') '2018-12-24' """ from xone import calendar dt = pd.Timestamp(dt).date() return calendar.trading_dates(start=dt - pd.Timedelta('10D'), end=dt, calendar=cal)[-1] def cur_time(typ='date', tz='US/Eastern', trading=True, cal='US'): """ Current time Args: typ: one of ['date', 'time', 'time_path', 'raw', ''] tz: timezone trading: check if current date is trading day cal: trading calendar Returns: relevant current time or date Examples: >>> cur_time(typ='date') >>> cur_time(typ='time', tz='UTC') >>> cur_time(typ='time_path', tz='Asia/Hong_Kong') >>> cur_time(typ='raw', tz='Europe/London') >>> cur_time(typ='') >>> cur_dt = pd.Timestamp('today', tz='US/Eastern').strftime('%Y-%m-%d') >>> cur_time(typ='date', trading=False) == cur_dt True """ dt = pd.Timestamp('now', tz=tz) if typ == 'date': if trading: return trade_day(dt=dt, cal=cal).strftime('%Y-%m-%d') else: return dt.strftime('%Y-%m-%d') if typ == 'time': return dt.strftime('%Y-%m-%d %H:%M:%S') if typ == 'time_path': return dt.strftime('%Y-%m-%d/%H-%M-%S') if typ == 'raw': return dt return trade_day(dt).date() if trading else dt.date() def align_data(*args): """ Resample and aligh data for defined frequency Args: *args: DataFrame of data to be aligned Returns: pd.DataFrame: aligned data with renamed columns Examples: >>> start = '2018-09-10T10:10:00' >>> tz = 'Australia/Sydney' >>> idx = pd.DatetimeIndex(start=start, periods=6, freq='min').tz_localize(tz) >>> close_1 = [31.08, 31.10, 31.11, 31.07, 31.04, 31.04] >>> vol_1 = [10166, 69981, 14343, 10096, 11506, 9718] >>> d1 = pd.DataFrame(dict(price=close_1, volume=vol_1), index=idx) >>> d1 price volume 2018-09-10 10:10:00+10:00 31.08 10166 2018-09-10 10:11:00+10:00 31.10 69981 2018-09-10 10:12:00+10:00 31.11 14343 2018-09-10 10:13:00+10:00 31.07 10096 2018-09-10 10:14:00+10:00 31.04 11506 2018-09-10 10:15:00+10:00 31.04 9718 >>> close_2 = [70.81, 70.78, 70.85, 70.79, 70.79, 70.79] >>> vol_2 = [4749, 6762, 4908, 2002, 9170, 9791] >>> d2 = pd.DataFrame(dict(price=close_2, volume=vol_2), index=idx) >>> d2 price volume 2018-09-10 10:10:00+10:00 70.81 4749 2018-09-10 10:11:00+10:00 70.78 6762 2018-09-10 10:12:00+10:00 70.85 4908 2018-09-10 10:13:00+10:00 70.79 2002 2018-09-10 10:14:00+10:00 70.79 9170 2018-09-10 10:15:00+10:00 70.79 9791 >>> align_data(d1, d2) price_1 volume_1 price_2 volume_2 2018-09-10 10:10:00+10:00 31.08 10166 70.81 4749 2018-09-10 10:11:00+10:00 31.10 69981 70.78 6762 2018-09-10 10:12:00+10:00 31.11 14343 70.85 4908 2018-09-10 10:13:00+10:00 31.07 10096 70.79 2002 2018-09-10 10:14:00+10:00 31.04 11506 70.79 9170 2018-09-10 10:15:00+10:00 31.04 9718 70.79 9791 """ res = pd.DataFrame(pd.concat([ d.loc[~d.index.duplicated(keep='first')].rename( columns=lambda vv: '%s_%d' % (vv, i + 1) ) for i, d in enumerate(args) ], axis=1)) data_cols = [col for col in res.columns if col[-2:] == '_1'] other_cols = [col for col in res.columns if col[-2:] != '_1'] res.loc[:, other_cols] = res.loc[:, other_cols].fillna(method='pad') return res.dropna(subset=data_cols) def cat_data(data_kw): """ Concatenate data with ticker as sub column index Args: data_kw: key = ticker, value = pd.DataFrame Returns: pd.DataFrame Examples: >>> start = '2018-09-10T10:10:00' >>> tz = 'Australia/Sydney' >>> idx = pd.DatetimeIndex(start=start, periods=6, freq='min').tz_localize(tz) >>> close_1 = [31.08, 31.10, 31.11, 31.07, 31.04, 31.04] >>> vol_1 = [10166, 69981, 14343, 10096, 11506, 9718] >>> d1 = pd.DataFrame(dict(price=close_1, volume=vol_1), index=idx) >>> close_2 = [70.81, 70.78, 70.85, 70.79, 70.79, 70.79] >>> vol_2 = [4749, 6762, 4908, 2002, 9170, 9791] >>> d2 = pd.DataFrame(dict(price=close_2, volume=vol_2), index=idx) >>> cat_data({'BHP AU': d1, 'RIO AU': d2}) ticker BHP AU RIO AU price volume price volume 2018-09-10 10:10:00+10:00 31.08 10166 70.81 4749 2018-09-10 10:11:00+10:00 31.10 69981 70.78 6762 2018-09-10 10:12:00+10:00 31.11 14343 70.85 4908 2018-09-10 10:13:00+10:00 31.07 10096 70.79 2002 2018-09-10 10:14:00+10:00 31.04 11506 70.79 9170 2018-09-10 10:15:00+10:00 31.04 9718 70.79 9791 """ if len(data_kw) == 0: return pd.DataFrame() return pd.DataFrame(pd.concat([ data.assign(ticker=ticker).set_index('ticker', append=True) .unstack('ticker').swaplevel(0, 1, axis=1) for ticker, data in data_kw.items() ], axis=1)) def flatten(iterable, maps=None, unique=False): """ Flatten any array of items to list Args: iterable: any array or value maps: map items to values unique: drop duplicates Returns: list: flattened list References: https://stackoverflow.com/a/40857703/1332656 Examples: >>> flatten('abc') ['abc'] >>> flatten(1) [1] >>> flatten(1.) [1.0] >>> flatten(['ab', 'cd', ['xy', 'zz']]) ['ab', 'cd', 'xy', 'zz'] >>> flatten(['ab', ['xy', 'zz']], maps={'xy': '0x'}) ['ab', '0x', 'zz'] """ if iterable is None: return [] if maps is None: maps = dict() if isinstance(iterable, (str, int, float)): return [maps.get(iterable, iterable)] else: x = [maps.get(item, item) for item in _to_gen_(iterable)] return list(set(x)) if unique else x def _to_gen_(iterable): """ Recursively iterate lists and tuples """ from collections import Iterable for elm in iterable: if isinstance(elm, Iterable) and not isinstance(elm, (str, bytes)): yield from flatten(elm) else: yield elm def to_frame(data_list, exc_cols=None): """ Dict in Python 3.6 keeps insertion order, but cannot be relied upon This method is to keep column names in order In Python 3.7 this method is redundant Args: data_list: list of dict exc_cols: exclude columns Returns: pd.DataFrame Example: >>> d_list = [ >>> dict(id=1, symbol='1 HK', price=88.8), >>> dict(id=700, symbol='700 HK', price=350.) >>> ] >>> >>> to_frame(d_list).columns.tolist() ['id', 'symbol', 'price'] >>> to_frame(d_list, ['price']).columns.tolist() ['id', 'symbol'] """ from collections import OrderedDict return pd.DataFrame( pd.Series(data_list).apply(OrderedDict).tolist() ).drop(columns=[] if exc_cols is None else exc_cols) def spline_curve(x, y, step, val_min=0, val_max=None, kind='quadratic', **kwargs): """ Fit spline curve for given x, y values Args: x: x-values y: y-values step: step size for interpolation val_min: minimum value of result val_max: maximum value of result kind: for scipy.interpolate.interp1d Specifies the kind of interpolation as a string (‘linear’, ‘nearest’, ‘zero’, ‘slinear’, ‘quadratic’, ‘cubic’, ‘previous’, ‘next’, where ‘zero’, ‘slinear’, ‘quadratic’ and ‘cubic’ refer to a spline interpolation of zeroth, first, second or third order; ‘previous’ and ‘next’ simply return the previous or next value of the point) or as an integer specifying the order of the spline interpolator to use. Default is ‘linear’. **kwargs: additional parameters for interp1d Returns: pd.Series: fitted curve Examples: >>> x = pd.Series([1, 2, 3]) >>> y = pd.Series([np.exp(1), np.exp(2), np.exp(3)]) >>> r = spline_curve(x=x, y=y, step=.5, val_min=3, val_max=18, fill_value='extrapolate') >>> r.round(2).index.tolist() [1.0, 1.5, 2.0, 2.5, 3.0] >>> r.round(2).tolist() [3.0, 4.05, 7.39, 12.73, 18.0] >>> y_df = pd.DataFrame(dict(a=[np.exp(1), np.exp(2), np.exp(3)], b=[2, 3, 4])) >>> r_df = spline_curve(x=x, y=y_df, step=.5, val_min=3, fill_value='extrapolate') >>> r_df.round(2) a b 1.0 3.00 3.0 1.5 4.05 3.0 2.0 7.39 3.0 2.5 12.73 3.5 3.0 20.09 4.0 """ from scipy.interpolate import interp1d from collections import OrderedDict if isinstance(y, pd.DataFrame): return pd.DataFrame(OrderedDict([(col, spline_curve( x, y.loc[:, col], step=step, val_min=val_min, val_max=val_max, kind=kind )) for col in y.columns])) fitted_curve = interp1d(x, y, kind=kind, **kwargs) new_x = np.arange(x.min(), x.max() + step / 2., step=step) return pd.Series( new_x, index=new_x, name=y.name if hasattr(y, 'name') else None ).apply(fitted_curve).clip(val_min, val_max) def func_scope(func): cur_mod = sys.modules[func.__module__] return f'{cur_mod.__name__}.{func.__name__}' def format_float(digit=0, is_pct=False): """ Number display format for pandas Args: digit: number of digits to keep if negative, add one space in front of positive pct is_pct: % display Returns: lambda function to format floats Examples: >>> format_float(0)(1e5) '100,000' >>> format_float(1)(1e5) '100,000.0' >>> format_float(-1, True)(.2) ' 20.0%' >>> format_float(-1, True)(-.2) '-20.0%' >>> pd.options.display.float_format = format_float(2) """ if is_pct: space = ' ' if digit < 0 else '' fmt = f'{{:{space}.{abs(int(digit))}%}}' return lambda vv: 'NaN' if np.isnan(vv) else fmt.format(vv) else: return lambda vv: 'NaN' if np.isnan(vv) else ( f'{{:,.{digit}f}}'.format(vv) if vv else '-' + ' ' * abs(digit) ) class FString(object): def __init__(self, str_fmt): self.str_fmt = str_fmt def __str__(self): kwargs = inspect.currentframe().f_back.f_globals.copy() kwargs.update(inspect.currentframe().f_back.f_locals) return self.str_fmt.format(**kwargs) def fstr(fmt, **kwargs): """ Delayed evaluation of f-strings Args: fmt: f-string but in terms of normal string, i.e., '{path}/{file}.parq' **kwargs: variables for f-strings, i.e., path, file = '/data', 'daily' Returns: FString object References: https://stackoverflow.com/a/42497694/1332656 https://stackoverflow.com/a/4014070/1332656 Examples: >>> fmt = '{file}.parq' >>> file = 'data' >>> fstr(fmt, file=file) 'data.parq' """ locals().update(kwargs) return f'{FString(str_fmt=fmt)}' def to_str(data, fmt='{key}={value}', sep=', ', public_only=True): """ Convert dict to string Args: data: dict fmt: how key and value being represented sep: how pairs of key and value are seperated public_only: if display public members only Returns: str: string representation of dict Examples: >>> test_dict = dict(b=1, a=0, c=2, _d=3) >>> to_str(test_dict) '{b=1, a=0, c=2}' >>> to_str(test_dict, sep='|') '{b=1|a=0|c=2}' >>> to_str(test_dict, public_only=False) '{b=1, a=0, c=2, _d=3}' """ assert isinstance(data, dict) if public_only: keys = list(filter(lambda vv: vv[0] != '_', data.keys())) else: keys = list(data.keys()) return '{' + sep.join([ to_str(data=v, fmt=fmt, sep=sep) if isinstance(v, dict) else fstr(fmt=fmt, key=k, value=v) for k, v in data.items() if k in keys ]) + '}' def inst_repr(instance, fmt='str', public_only=True): """ Generate class instance signature from its __dict__ From python 3.6 dict is ordered and order of attributes will be preserved automatically Args: instance: class instance fmt: ['json', 'str'] public_only: if display public members only Returns: str: string or json representation of instance """ if not hasattr(instance, '__dict__'): return '' if public_only: inst_dict = {k: v for k, v in instance.__dict__.items() if k[0] != '_'} else: inst_dict = instance.__dict__ if fmt == 'json': return json.dumps(inst_dict, indent=2) elif fmt == 'str': return to_str(inst_dict, public_only=public_only) return '' def load_module(full_path): """ Load module from full path Args: full_path: module full path name Returns: python module References: https://stackoverflow.com/a/67692/1332656 Examples: load_module('/path/to/file.py') """ from importlib import util file_name = full_path.replace('\\', '/').split('/')[-1] assert file_name[-3:] == '.py' module_name = file_name[:-3] spec = util.spec_from_file_location(name=module_name, location=full_path) module = util.module_from_spec(spec=spec) spec.loader.exec_module(module=module) return module class AttributeDict(dict): """ Dot access support for dict attributes References: https://stackoverflow.com/a/5021467/1332656 """ __getattr__ = dict.__getitem__ __setattr__ = dict.__setitem__ if __name__ == '__main__': """ CommandLine: python -m xone.utils all """ import xdoctest xdoctest.doctest_module(__file__) PKo U#:|8bI'0/dM__N*&!^ruP vd ̞PIF_ IwWE+9Wp3sBJɄv|o2<>H?kH ˶srO/&ȣHioJ C PV).ĞLo NrmWp/Ϻ"u]>1|yacd eSn*ܼFi[Nn]’hu@Ao]xO1o;K5e8uxDnCX}ˢd"iv׃o5445]Dz. o[hY۾DsW%c`@+WTWxn&N9稙Bd\q^W'NYf zb PKYuM`xone/__init__.pyPKXMA|ssxone/__main__.pyPKXMR xone/bbg.pyPK3M@? xxone/cache.pyPKXM?_1FFjxone/calendar.pyPKXM/ xone/files.pyPK%PL!f ,xone/logs.pyPKLL3)) 2xone/plots.pyPKXM b b Axone/procs.pyPKXuM˱N)>)> bKxone/utils.pyPK