# --- wtk/__init__.py ---

'''A Wasserstein Subsequence Kernel for Time Series'''

__version__ = '0.2.1'

import numpy as np
import ot

from .utilities import ensure_psd

from sklearn.metrics import pairwise
from sklearn.preprocessing import scale


def transform_to_dist_matrix(time_series_train, time_series_test, k,
                             normalized=False):
    '''
    Computes the Wasserstein distance matrices for training and test
    data. If time_series_train is of shape n x m, the resulting
    training matrix is of shape n x n. If time_series_test is of shape
    o x m, the resulting test matrix is of shape o x n.

    Args:
        time_series_train (np.ndarray): Training time series
        time_series_test (np.ndarray): Test time series
        k (int): Subsequence length
        normalized (bool): Whether to normalize subsequences or not

    Returns:
        np.ndarray: Distance matrix for training
        np.ndarray: Distance matrix for testing
    '''
    return pairwise_subsequence_kernel(time_series_train,
                                       time_series_test,
                                       k,
                                       wasserstein_kernel,
                                       normalized=normalized)


def get_kernel_matrix(distance_matrix, psd=False, gamma=0.1, tol=1e-8):
    '''
    Returns a kernel matrix from a given distance matrix by calculating
    np.exp(-gamma * distance_matrix).

    Args:
        distance_matrix (np.ndarray): Square distance matrix
        psd (bool): Whether to ensure positive semi-definiteness
        gamma (float): Gamma for the kernel matrix calculation
        tol (float): Tolerance when removing negative eigenvalues

    Returns:
        np.ndarray: The kernel matrix
    '''
    M = np.exp(-gamma * distance_matrix)

    # Remove negative eigenvalues if requested; the Laplacian kernel of
    # a Wasserstein distance matrix is not guaranteed to be psd.
    return M if not psd else ensure_psd(M, tol=tol)


def subsequences(time_series, k):
    '''
    Extracts all contiguous subsequences of length k from a univariate
    time series, using stride tricks to avoid copying the data. For a
    time series of length n, the result is a view of shape
    (n - k + 1, k).
    '''
    time_series = np.asarray(time_series)
    n = time_series.size

    shape = (n - k + 1, k)
    strides = time_series.strides * 2

    return np.lib.stride_tricks.as_strided(
        time_series, shape=shape, strides=strides
    )


def wasserstein_kernel(subsequences_1, subsequences_2, metric='euclidean'):
    '''
    Calculates the distance between two time series using their
    corresponding sets of subsequences. The metric used to align them
    may be changed optionally.
    '''
    C = ot.dist(subsequences_1, subsequences_2, metric=metric)
    return ot.emd2([], [], C)


def binarized_wasserstein_kernel(subsequences_1, subsequences_2,
                                 metric='euclidean'):
    '''
    Calculates the distance between two time series using their
    corresponding sets of subsequences. The ground costs used for the
    alignment are a thresholded version of the Euclidean (or other)
    distance: entries at or below the 5th percentile of the cost
    matrix become 0, all others 1.
    '''
    C = ot.dist(subsequences_1, subsequences_2, metric=metric)
    C = (C > np.percentile(C, 5)).astype(int)
    return ot.emd2([], [], C)


def linear_kernel(subsequences_1, subsequences_2):
    '''
    Calculates the linear kernel between two time series using their
    corresponding sets of subsequences.
    '''
    K_lin = pairwise.linear_kernel(subsequences_1, subsequences_2)

    n = subsequences_1.shape[0]
    m = subsequences_2.shape[0]

    return np.sum(K_lin) / (n * m)


def polynomial_kernel(subsequences_1, subsequences_2, p=2, c=1.0):
    '''
    Calculates the polynomial kernel between two time series using
    their corresponding sets of subsequences.
    '''
    K_poly = pairwise.polynomial_kernel(subsequences_1, subsequences_2,
                                        degree=p, coef0=c, gamma=1.0)

    n = subsequences_1.shape[0]
    m = subsequences_2.shape[0]

    return np.sum(K_poly) / (n * m)
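# Usage example (a minimal sketch; the toy arrays below are illustrative
# and not part of the package):
#
#     >>> import numpy as np
#     >>> from wtk import subsequences, wasserstein_kernel
#     >>> ts_a = np.array([0.0, 1.0, 0.5, 2.0, 1.5])
#     >>> ts_b = np.array([1.0, 0.0, 0.5, 1.0, 2.0])
#     >>> s_a = subsequences(ts_a, k=3)  # shape (3, 3)
#     >>> s_b = subsequences(ts_b, k=3)  # shape (3, 3)
#     >>> wasserstein_kernel(s_a, s_b) >= 0.0  # distances are non-negative
#     True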
def rbf_kernel(subsequences_1, subsequences_2):
    '''
    Calculates the RBF kernel between two time series using their
    corresponding sets of subsequences.
    '''
    K_rbf = pairwise.rbf_kernel(subsequences_1, subsequences_2)

    n = subsequences_1.shape[0]
    m = subsequences_2.shape[0]

    return np.sum(K_rbf) / (n * m)


def custom_rbf_kernel(subsequences_1, subsequences_2, e_dist, gamma):
    '''
    Calculates an RBF kernel from a precomputed Euclidean distance
    matrix e_dist, so that the distances can be reused across a whole
    grid of gamma values.
    '''
    n = subsequences_1.shape[0]
    m = subsequences_2.shape[0]

    K_rbf = np.exp(-gamma * (e_dist**2))
    return np.sum(K_rbf) / (n * m)


def brownian_bridge_kernel(subsequences_1, subsequences_2, c):
    '''
    Calculates the Brownian bridge kernel max(0, c - |x - y|) between
    all pairs of subsequences and averages the result.
    '''
    n = subsequences_1.shape[0]
    m = subsequences_2.shape[0]

    # Broadcast to *all* pairs of subsequences; the resulting array has
    # shape (n, m, k), matching the n * m normalisation below.
    K_brown = c - np.abs(
        subsequences_1[:, None, :] - subsequences_2[None, :, :]
    )
    K_brown[K_brown < 0] = 0

    return np.sum(K_brown) / (n * m)


def pairwise_subsequence_kernel(
        time_series_train,
        time_series_test,
        k,
        functor=wasserstein_kernel,
        par_grid=(1,),
        normalized=False):
    '''
    Applies a calculation functor to all pairs of a data set. As
    a result, two matrices will be calculated:

    1. The square matrix between all pairs of training samples
    2. The rectangular matrix between test samples (rows) and
       training samples (columns)

    These matrices can be fed directly to a classifier. Notice that
    this function will apply the kernel to *each* of the subsequences
    in both time series.
    '''
    n = len(time_series_train)
    m = len(time_series_test)

    K_train = np.zeros((n, n))  # Need to initialise with zeros for symmetry
    K_test = np.empty((m, n))   # Since this is rectangular, no need for zeros

    if functor in (custom_rbf_kernel, brownian_bridge_kernel):
        K_par_train = []
        K_par_test = []
        for i in range(len(par_grid)):
            K_par_train.append(np.zeros((n, n)))
            K_par_test.append(np.empty((m, n)))

    # Create subsequences of the time series. These cannot be easily
    # shared with other calls of the method.
    subsequences_train = dict()
    subsequences_test = dict()

    for i, ts_i in enumerate(time_series_train):
        subsequences_train[i] = subsequences(ts_i, k)

    for i, ts_i in enumerate(time_series_test):
        subsequences_test[i] = subsequences(ts_i, k)

    # Evaluate the functor for *all* relevant pairs, while filling up
    # the initially empty kernel matrices.
    for i, ts_i in enumerate(time_series_train):
        for j, ts_j in enumerate(time_series_train[i:]):
            s_i = subsequences_train[i]      # first shapelet
            s_j = subsequences_train[i + j]  # second shapelet

            if normalized:
                s_i = scale(s_i, axis=1)
                s_j = scale(s_j, axis=1)

            if functor in (custom_rbf_kernel, brownian_bridge_kernel):
                if functor == custom_rbf_kernel:
                    # The Euclidean distances can be shared over the
                    # whole parameter grid.
                    e_dist = pairwise.euclidean_distances(s_i, s_j)

                for idx, par in enumerate(par_grid):
                    if functor == custom_rbf_kernel:
                        K_par_train[idx][i, i + j] = functor(s_i, s_j,
                                                             e_dist, par)
                    else:
                        K_par_train[idx][i, i + j] = functor(s_i, s_j, par)
            else:
                K_train[i, i + j] = functor(s_i, s_j)

        for j, ts_j in enumerate(time_series_test):
            s_i = subsequences_train[i]  # first shapelet

            # Second shapelet; notice that there is no index shift in
            # comparison to the code above.
            s_j = subsequences_test[j]

            if normalized:
                s_i = scale(s_i, axis=1)
                s_j = scale(s_j, axis=1)

            if functor in (custom_rbf_kernel, brownian_bridge_kernel):
                if functor == custom_rbf_kernel:
                    e_dist = pairwise.euclidean_distances(s_i, s_j)

                for idx, par in enumerate(par_grid):
                    if functor == custom_rbf_kernel:
                        K_par_test[idx][j, i] = functor(s_i, s_j,
                                                        e_dist, par)
                    else:
                        K_par_test[idx][j, i] = functor(s_i, s_j, par)
            else:
                # Fill the test matrix; since it has different dimensions
                # than the training matrix, the indices are swapped here.
                K_test[j, i] = functor(s_i, s_j)

    # Make the matrices symmetric since we only filled the upper
    # triangular part in the code above. Note that this doubles the
    # diagonal; for distance functors such as the Wasserstein distance
    # the diagonal is zero, so this is harmless.
    if functor in (custom_rbf_kernel, brownian_bridge_kernel):
        for k_idx in range(len(par_grid)):
            K_train_cur = K_par_train[k_idx]
            K_par_train[k_idx] = K_train_cur + K_train_cur.T

        K_train = K_par_train
        K_test = K_par_test
    else:
        K_train = K_train + K_train.T

    return K_train, K_test
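# Usage example (a minimal sketch on random data; shapes and values are
# illustrative only):
#
#     >>> import numpy as np
#     >>> from wtk import transform_to_dist_matrix, get_kernel_matrix
#     >>> rng = np.random.RandomState(0)
#     >>> X_train = rng.randn(10, 50)  # 10 time series of length 50
#     >>> X_test = rng.randn(5, 50)    # 5 time series of length 50
#     >>> D_train, D_test = transform_to_dist_matrix(X_train, X_test, k=10)
#     >>> D_train.shape, D_test.shape
#     ((10, 10), (5, 10))
#     >>> K_train = get_kernel_matrix(D_train, psd=True, gamma=0.1)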
# --- wtk/main.py ---

#!/usr/bin/env python3

import argparse
import logging

import numpy as np

from wtk import pairwise_subsequence_kernel
from wtk import wasserstein_kernel
from wtk.utilities import read_ucr_data, custom_grid_search_cv, ensure_psd

from sklearn.svm import SVC
from sklearn.metrics import accuracy_score

ENSURE_PSD = True


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('TRAIN', type=str)
    parser.add_argument('TEST', type=str)
    parser.add_argument('--k', '-k', type=int, default=10,
                        help='Subsequence (shapelet) length')

    args = parser.parse_args()

    X_train, y_train, name = read_ucr_data(args.TRAIN)
    X_test, y_test, _ = read_ucr_data(args.TEST)

    logging.basicConfig()
    logger = logging.getLogger(name)
    logger.setLevel(level=logging.INFO)
    logger.info('Starting analysis')

    K_train, K_test = pairwise_subsequence_kernel(
        X_train, X_test, args.k, wasserstein_kernel
    )

    logger.info('Wasserstein distance computed, starting SVM grid search...')

    param_grid = {
        'C': np.logspace(-3, 5, num=9),
    }

    gammas = np.logspace(-4, 1, num=6)

    kernel_matrices_train = []
    kernel_matrices_test = []
    kernel_params = []

    for g in gammas:
        M_train = np.exp(-g * K_train)
        M_test = np.exp(-g * K_test)

        # Ensure positive semi-definiteness of the training matrix by
        # removing negative eigenvalues.
        if ENSURE_PSD:
            M_train = ensure_psd(M_train)

        kernel_matrices_train.append(M_train)
        kernel_matrices_test.append(M_test)
        kernel_params.append(g)

    svm = SVC(kernel='precomputed')

    # Grid search over C and the kernel matrices, i.e. over gamma
    gs, best_params = custom_grid_search_cv(svm, param_grid,
                                            kernel_matrices_train, y_train,
                                            cv=5)

    # Store the best parameters
    gamma = kernel_params[best_params['K_idx']]
    C = best_params['params']['C']

    print(f"Best C: {C}")
    print(f"Best gamma: {gamma}")

    y_pred = gs.predict(kernel_matrices_test[best_params['K_idx']])
    accuracy = accuracy_score(y_test, y_pred)

    logger.info('Accuracy = {:2.2f}'.format(accuracy * 100))
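# Example invocation (a sketch; the data paths are placeholders for the
# train/test splits of a UCR data set in the old comma-separated format,
# and assume the package has been installed):
#
#     python -m wtk.main data/Coffee/Coffee_TRAIN data/Coffee/Coffee_TEST -k 20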
# --- wtk/utilities.py ---

'''
Contains various utility functions for data handling, pre-processing,
and so on.
'''

import logging
import os

import numpy as np

from numpy.linalg import eigh

from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, make_scorer
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import ParameterGrid
from sklearn.model_selection._validation import _fit_and_score
from sklearn.base import clone, ClassifierMixin, BaseEstimator


def strip_suffix(s, suffix):
    '''
    Removes a suffix from a string if the string contains it. Else, the
    string will not be modified and no error will be raised.
    '''
    if not s.endswith(suffix):
        return s
    return s[:len(s) - len(suffix)]


def get_ucr_dataset(data_dir: str, dataset_name: str):
    '''
    Loads the train and test data from a folder in which the UCR data
    sets are stored.
    '''
    X_train, y_train, _ = read_ucr_data(
        os.path.join(data_dir, dataset_name, f'{dataset_name}_TRAIN'))
    X_test, y_test, _ = read_ucr_data(
        os.path.join(data_dir, dataset_name, f'{dataset_name}_TEST'))

    return X_train, y_train, X_test, y_test


def read_ucr_data(filename):
    '''
    Loads a UCR data set from a file, returning the samples and the
    respective labels. Also extracts the data set name such that one
    may easily display results.
    '''
    data = np.loadtxt(filename, delimiter=',')

    Y = data[:, 0]
    X = data[:, 1:]

    # Remove all potential suffixes to obtain the data set name. This is
    # somewhat inefficient, but we only have to do it once.
    name = os.path.basename(filename)
    name = strip_suffix(name, '_TRAIN')
    name = strip_suffix(name, '_TEST')

    return X, Y, name


def custom_grid_search_cv(model, param_grid, precomputed_kernels, y, cv=5):
    '''
    Custom grid search for an array of precomputed kernel matrices:
    performs a stratified k-fold cross-validation over all combinations
    of kernel matrix and parameter setting, then refits the best model
    on the full data.
    '''
    # 1. Stratified k-fold
    cv = StratifiedKFold(n_splits=cv, shuffle=False)
    results = []
    for train_index, test_index in cv.split(precomputed_kernels[0], y):
        split_results = []
        params = []  # list of dicts; it is the same for every split

        # Run over the kernels first
        for K_idx, K in enumerate(precomputed_kernels):
            # Run over the parameters
            for p in list(ParameterGrid(param_grid)):
                # Note that this relies on a private scikit-learn API
                # whose signature and return value may change between
                # versions.
                sc = _fit_and_score(clone(model), K, y,
                                    scorer=make_scorer(accuracy_score),
                                    train=train_index, test=test_index,
                                    verbose=0, parameters=p,
                                    fit_params=None)
                split_results.append(sc)
                params.append({'K_idx': K_idx, 'params': p})

        results.append(split_results)

    # Collect the results and average over all splits
    results = np.array(results)
    fin_results = results.mean(axis=0)

    # Select the best result
    best_idx = np.argmax(fin_results)

    # Return the fitted model and the best parameters
    ret_model = clone(model).set_params(**params[best_idx]['params'])
    return ret_model.fit(precomputed_kernels[params[best_idx]['K_idx']], y), \
        params[best_idx]


def ensure_psd(K, tol=1e-8):
    '''
    Helper function to ensure positive semi-definiteness of a kernel
    matrix: eigenvalues below -tol are set to zero and the matrix is
    reassembled from its eigendecomposition.
    '''
    w, v = eigh(K)

    if (w < -tol).sum() >= 1:
        neg = np.argwhere(w < -tol)
        w[neg] = 0

        return v.dot(np.diag(w)).dot(v.T)

    return K


def krein_svm_grid_search(K_train: np.ndarray,
                          K_test: np.ndarray,
                          y_train: np.ndarray,
                          y_test: np.ndarray,
                          param_grid: dict = {'C': np.logspace(-3, 5, num=9)},
                          gammas: np.ndarray = np.logspace(-4, 1, num=6)):
    '''
    Runs the full SVM pipeline on precomputed distance matrices: builds
    Laplacian kernel matrices for a grid of gamma values, ensures their
    positive semi-definiteness, selects C and gamma by cross-validation,
    and reports the test accuracy.
    '''
    logger = logging.getLogger()
    logger.info('Starting analysis')

    kernel_matrices_train = []
    kernel_matrices_test = []
    kernel_params = []

    for g in gammas:
        M_train = np.exp(-g * K_train)
        M_test = np.exp(-g * K_test)

        # Ensure positive semi-definiteness of the training matrix by
        # removing negative eigenvalues.
        M_train = ensure_psd(M_train)

        kernel_matrices_train.append(M_train)
        kernel_matrices_test.append(M_test)
        kernel_params.append(g)

    svm = SVC(kernel='precomputed')

    # Grid search over C and the kernel matrices, i.e. over gamma
    gs, best_params = custom_grid_search_cv(svm, param_grid,
                                            kernel_matrices_train, y_train,
                                            cv=5)

    # Store the best parameters
    gamma = kernel_params[best_params['K_idx']]
    C = best_params['params']['C']

    print(f"Best C: {C}")
    print(f"Best gamma: {gamma}")

    y_pred = gs.predict(kernel_matrices_test[best_params['K_idx']])
    accuracy = accuracy_score(y_test, y_pred)

    logger.info('Accuracy = {:2.2f}'.format(accuracy * 100))

    return gs
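# Usage example for ensure_psd (a minimal sketch with a deliberately
# indefinite matrix; the values are illustrative only):
#
#     >>> import numpy as np
#     >>> K = np.array([[1.0, 2.0], [2.0, 1.0]])  # eigenvalues -1 and 3
#     >>> K_psd = ensure_psd(K)
#     >>> np.linalg.eigvalsh(K_psd).min() >= -1e-8
#     True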
class KreinSVC(BaseEstimator, ClassifierMixin):
    '''
    An SVC which ensures positive semi-definiteness of the kernel
    matrix before training. It also computes the kernel matrix from
    the given distance matrix:

        K = np.exp(-self.psd_gamma * D_matrix)
    '''

    def __init__(self, C=1.0, kernel='precomputed', degree=3,
                 gamma='auto_deprecated', coef0=0.0, shrinking=True,
                 probability=False, tol=1e-3, cache_size=200,
                 class_weight=None, verbose=False, max_iter=-1,
                 decision_function_shape='ovr', random_state=None,
                 psd_tol=1e-8, psd_gamma=1.0):
        self.C = C
        self.kernel = kernel
        self.shrinking = shrinking
        self.probability = probability
        self.verbose = verbose
        self.cache_size = cache_size
        self.class_weight = class_weight
        self.max_iter = max_iter
        self.decision_function_shape = decision_function_shape
        self.random_state = random_state
        self.psd_tol = psd_tol
        self.psd_gamma = psd_gamma

        self.svc = SVC(C=C,
                       kernel=kernel,
                       degree=degree,
                       gamma=gamma,
                       coef0=coef0,
                       shrinking=shrinking,
                       probability=probability,
                       tol=tol,
                       cache_size=cache_size,
                       class_weight=class_weight,
                       verbose=verbose,
                       max_iter=max_iter,
                       decision_function_shape=decision_function_shape,
                       random_state=random_state)

    def get_params(self, deep=True):
        return {
            'C': self.C,
            'kernel': self.kernel,
            'shrinking': self.shrinking,
            'probability': self.probability,
            'verbose': self.verbose,
            'cache_size': self.cache_size,
            'class_weight': self.class_weight,
            'max_iter': self.max_iter,
            'decision_function_shape': self.decision_function_shape,
            'random_state': self.random_state,
            'psd_tol': self.psd_tol,
            'psd_gamma': self.psd_gamma
        }

    def set_params(self, **parameters):
        for parameter, value in parameters.items():
            setattr(self, parameter, value)
        return self

    def fit(self, X, y, sample_weight=None):
        '''
        Args:
            X (np.ndarray): Distance matrix
        '''
        M_train = np.exp(-self.psd_gamma * X)
        K = ensure_psd(M_train, tol=self.psd_tol)
        self.svc.fit(K, y, sample_weight)
        return self

    def predict(self, X):
        M_test = np.exp(-self.psd_gamma * X)
        return self.svc.predict(M_test)

    def predict_proba(self, X):
        # Apply the same distance-to-kernel transformation as predict()
        # before delegating to the underlying SVC.
        M_test = np.exp(-self.psd_gamma * X)
        return self.svc.predict_proba(M_test)
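# End-to-end example (illustrative only; the data directory and data set
# name are placeholders and assume the UCR archive layout expected by
# get_ucr_dataset):
#
#     >>> from wtk import transform_to_dist_matrix
#     >>> from wtk.utilities import get_ucr_dataset, KreinSVC
#     >>> X_train, y_train, X_test, y_test = get_ucr_dataset('data', 'Coffee')
#     >>> D_train, D_test = transform_to_dist_matrix(X_train, X_test, k=20)
#     >>> clf = KreinSVC(C=1.0, psd_gamma=0.1).fit(D_train, y_train)
#     >>> y_pred = clf.predict(D_test)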