PKcOO&bamboos/__init__.py"""Wrapper Functions for pandas, numpy, and scikit learn""" __version__ = "0.2.0" __author__ = "adityasidharta" from bamboos.color import color from bamboos.date import date_single, date_double from bamboos.encode import ( fit_binary, fit_categorical, fit_label, fit_onehot, transform_binary, transform_categorical, transform_label, transform_onehot, ) PKzNbamboos/analysis.pyPKvNbamboos/benchmark.pyPKrNLbamboos/color.pycolor_wheel = [ "#e6194B", "#3cb44b", "#ffe119", "#4363d8", "#f58231", "#911eb4", "#42d4f4", "#f032e6", "#bfef45", "#fabebe", "#469990", "#e6beff", "#9A6324", "#fffac8", "#800000", "#aaffc3", "#808000", "#ffd8b1", "#000075", "#a9a9a9", "#ffffff", "#000000", ] def color(idx): return color_wheel[idx % (len(color_wheel) - 1)] PKeN@((bamboos/date.pyimport calendar import datetime as dt from typing import Any import numpy as np import pandas as pd from bamboos.utils.dates import ( DAYS_IN_MONTH, DAYS_IN_YEAR, MONTH_IN_YEAR, SECOND_IN_MINUTE, SECOND_IN_HOUR, MINUTE_IN_HOUR, HOUR_IN_DAY, BUSINESS_OPEN, BUSINESS_CLOSE, MIDNIGHT_START, MORNING_START, AFTERNOON_START, NIGHT_START, NIGHT_END, SATURDAY, SUNDAY, ) def _get_max_day(row: pd.Series, col_name: str): """ for each row in the pandas DataFrame, give the number of days in that month, given the year and month """ if (pd.isnull(row[col_name + "_year"])) or (pd.isnull(row[col_name + "_year"])): return np.nan return calendar.monthrange(int(row[col_name + "_year"]), int(row[col_name + "_month"]))[1] def _get_cyclical_sin(df: pd.Series, col_name: str, col_type: str, col_max: Any): """ Perform cyclical encoding for the following col_type (month, days, hours, etc) by computing the cosine and sine """ return np.sin(2. * np.pi * df["{}_{}".format(col_name, col_type)] / col_max) def _get_cyclical_cos(df: pd.Series, col_name: str, col_type: str, col_max: Any): """ Perform cyclical encoding for the following col_type (month, days, hours, etc) by computing the cosine and sine """ return np.cos(2. * np.pi * df["{}_{}".format(col_name, col_type)] / col_max) def date_single(input_df: pd.DataFrame, col_name: str, cur_time: dt.datetime = dt.datetime.now()): """ Perform Feature Engineering on a single datetime column. """ df = input_df[[col_name]].copy() df[col_name] = pd.to_datetime(df[col_name]) df[col_name + "_age"] = cur_time.year - df[col_name].dt.year df[col_name + "_year"] = df[col_name].dt.year df[col_name + "_month"] = df[col_name].dt.month df[col_name + "_day"] = df[col_name].dt.day df[col_name + "_hour"] = df[col_name].dt.hour df[col_name + "_minute"] = df[col_name].dt.minute df[col_name + "_second"] = df[col_name].dt.second df[col_name + "_day_of_week"] = df[col_name].dt.dayofweek df[col_name + "_day_of_year"] = df[col_name].dt.dayofyear df[col_name + "_week_of_year"] = df[col_name].dt.weekofyear df[col_name + "_is_weekend"] = (df[col_name + "_day_of_week"] == SATURDAY) | ( df[col_name + "_day_of_week"] == SUNDAY ) df[col_name + "_year_elapsed"] = (cur_time - df[col_name]).dt.days / DAYS_IN_YEAR df[col_name + "_month_elapsed"] = (cur_time - df[col_name]).dt.days / DAYS_IN_MONTH df[col_name + "_day_elapsed"] = (cur_time - df[col_name]).dt.days df[col_name + "_month_sin"] = _get_cyclical_sin(df, col_name, "month", MONTH_IN_YEAR) df[col_name + "_month_cos"] = _get_cyclical_cos(df, col_name, "month", MONTH_IN_YEAR) df[col_name + "_day_sin"] = _get_cyclical_sin(df, col_name, "day", df[col_name + "_max_day"]) df[col_name + "_day_cos"] = _get_cyclical_cos(df, col_name, "day", df[col_name + "_max_day"]) df[col_name + "_hour_sin"] = _get_cyclical_sin(df, col_name, "hour", HOUR_IN_DAY) df[col_name + "_hour_cos"] = _get_cyclical_cos(df, col_name, "hour", HOUR_IN_DAY) df[col_name + "_minute_sin"] = _get_cyclical_sin(df, col_name, "minute", MINUTE_IN_HOUR) df[col_name + "_minute_cos"] = _get_cyclical_cos(df, col_name, "minute", MINUTE_IN_HOUR) df[col_name + "_second_sin"] = _get_cyclical_sin(df, col_name, "second", SECOND_IN_MINUTE) df[col_name + "_second_cos"] = _get_cyclical_cos(df, col_name, "second", SECOND_IN_MINUTE) df[col_name + "_is_year_start"] = df[col_name].dt.is_year_start df[col_name + "_is_year_end"] = df[col_name].dt.is_year_end df[col_name + "_is_quarter_start"] = df[col_name].dt.is_quarter_start df[col_name + "_is_quarter_end"] = df[col_name].dt.is_quarter_end df[col_name + "_is_month_start"] = df[col_name].dt.is_month_start df[col_name + "_is_month_end"] = df[col_name].dt.is_month_end df[col_name + "_is_business_hour"] = (df[col_name + "_hour"] > BUSINESS_OPEN) & ( df[col_name + "_hour"] < BUSINESS_CLOSE ) df[col_name + "_period"] = pd.cut( df[col_name + "_hour"], bins=[MIDNIGHT_START, MORNING_START, AFTERNOON_START, NIGHT_START, NIGHT_END], labels=["dawn", "morning", "afternoon", "night"], ) return df.remove(columns=col_name) def date_double(input_df: pd.DataFrame, begin_col: str, end_col: str): """ Perform Feature Engineering on DataFrame with two connected Datetime columns. One specifying the start date of an event, and the other one specifying the end date of the event. """ df = input_df[[begin_col, end_col]].copy() df[begin_col] = pd.to_datetime(df[begin_col]) df[end_col] = pd.to_datetime(df[end_col]) df["{}_{}_year".format(begin_col, end_col)] = (df[end_col] - df[begin_col]).dt.days / DAYS_IN_YEAR df["{}_{}_month".format(begin_col, end_col)] = (df[end_col] - df[begin_col]).dt.days / DAYS_IN_MONTH df["{}_{}_days".format(begin_col, end_col)] = (df[end_col] - df[begin_col]).dt.days df["{}_{}_hour".format(begin_col, end_col)] = (df[end_col] - df[begin_col]).dt.seconds / SECOND_IN_HOUR df["{}_{}_minute".format(begin_col, end_col)] = (df[end_col] - df[begin_col]).dt.seconds / SECOND_IN_MINUTE df["{}_{}_second".format(begin_col, end_col)] = (df[end_col] - df[begin_col]).dt.seconds return df.drop(columns=[begin_col, end_col]) PKzNK,nbamboos/encode.pyfrom typing import Any, Dict, List, Tuple import numpy as np import pandas as pd from category_encoders import OrdinalEncoder, OneHotEncoder, BinaryEncoder from bamboos.utils.dataframe import insert_df def fit_label(input_df, cols, na_value=None): df = input_df.copy() if na_value is not None: for col in cols: df[col] = df[col].replace({na_value: np.nan}) encoder = OrdinalEncoder(cols) encoder = encoder.fit(df) for idx in range(len(encoder.mapping)): encoder.mapping[idx]["mapping"].loc[np.nan] = -2 result_df = encoder.transform(df) for col in cols: result_df[col] = result_df[col].replace({-1: 0, -2: 0}) result_df[col] = result_df[col].astype(int) model = {"encoder": encoder, "cols": cols, "na_value": na_value} return result_df, model def transform_label(input_df, model): df = input_df.copy() encoder = model["encoder"] cols = model["cols"] na_value = model["na_value"] if na_value is not None: for col in cols: df[col] = df[col].replace({na_value: np.nan}) result_df = encoder.transform(df) for col in cols: result_df[col] = result_df[col].replace({-1: 0, -2: 0}) result_df[col] = result_df[col].astype(int) return result_df def fit_onehot(input_df, cols, na_value=None): df = input_df.copy() if na_value is not None: for col in cols: df[col] = df[col].replace({na_value: np.nan}) drop_cols = ["{}_nan".format(col) for col in cols] encoder = OneHotEncoder(cols=cols, use_cat_names=True) encoder = encoder.fit(df) result_df = encoder.transform(df) for drop_col in drop_cols: if drop_col in result_df.columns: result_df = result_df.drop(columns=[drop_col]) model = {"encoder": encoder, "cols": cols, "na_value": na_value, "drop_cols": drop_cols} return result_df, model def transform_onehot(input_df, model): df = input_df.copy() encoder = model["encoder"] cols = model["cols"] na_value = model["na_value"] drop_cols = model["drop_cols"] if na_value is not None: for col in cols: df[col] = df[col].replace({na_value: np.nan}) result_df = encoder.transform(df) for drop_col in drop_cols: if drop_col in result_df.columns: result_df = result_df.drop(columns=[drop_col]) return result_df def fit_binary(input_df, cols, na_value=None): df = input_df.copy() if na_value is not None: for col in cols: df[col] = df[col].replace({na_value: np.nan}) encoder = BinaryEncoder(cols=cols, drop_invariant=True) encoder = encoder.fit(df) for idx in range(len(encoder.base_n_encoder.ordinal_encoder.mapping)): encoder.base_n_encoder.ordinal_encoder.mapping[idx]["mapping"].loc[np.nan] = -2 result_df = encoder.transform(df) model = {"encoder": encoder, "cols": cols, "na_value": na_value} return result_df, model def transform_binary(input_df, model): df = input_df.copy() encoder = model["encoder"] cols = model["cols"] na_value = model["na_value"] if na_value is not None: for col in cols: df[col] = df[col].replace({na_value: np.nan}) result_df = encoder.transform(df) return result_df def fit_categorical(input_df, cols, na_value=None, max_onehot=10, max_binary=1000): df = input_df.copy() if na_value is not None: for col in cols: df[col] = df[col].replace({na_value: np.nan}) onehot_cols = [] label_cols = [] binary_cols = [] for col in cols: col_values = df[col].values cardinality = len(np.unique(col_values[~pd.isnull(col_values)])) if cardinality < max_onehot: onehot_cols.append(col) elif cardinality < max_binary: label_cols.append(col) else: binary_cols.append(col) df, onehot_model = fit_onehot(df, onehot_cols, na_value) df, label_model = fit_label(df, label_cols, na_value) result_df, binary_model = fit_binary(df, binary_cols, na_value) model = { "onehot_model": onehot_model, "label_model": label_model, "binary_model": binary_model, "onehot_cols": onehot_cols, "label_cols": label_cols, "binary_cols": binary_cols, "cols": cols, "na_value": na_value, "max_onehot": max_onehot, "max_binary": max_binary, } return result_df, model def transform_categorical(input_df, model): df = input_df.copy() onehot_model = model["onehot_model"] label_model = model["label_model"] binary_model = model["binary_model"] cols = model["cols"] na_value = model["na_value"] if na_value is not None: for col in cols: df[col] = df[col].replace({na_value: np.nan}) df = transform_onehot(df, onehot_model) df = transform_label(df, label_model) result_df = transform_binary(df, binary_model) return result_df def fit_stats( input_df: pd.DataFrame, stat_cols: List[str], target_cols: List[str], metric_cols: Any = None ) -> Tuple[pd.DataFrame, Dict[str, Dict[str, pd.DataFrame]]]: for col_name in ["stat_cols", "target_cols", "metric_cols", "default"]: assert col_name not in stat_cols, "Please don't use {} as a Stats column name".format(col_name) assert col_name not in target_cols, "Please don't use {} as a Target column name".format(col_name) df = input_df.copy() df = df[stat_cols + target_cols] if not metric_cols: metric_cols = ["mean", "median", "std", "min", "max"] if isinstance(metric_cols, dict): assert set(stat_cols) == set(metric_cols) stats_encoder = dict() # type: dict for stat_col in stat_cols: stats_encoder[stat_col] = dict() if isinstance(metric_cols, dict): stat_metric_cols = metric_cols[stat_col] else: stat_metric_cols = metric_cols for target_col in target_cols: agg_df = df.groupby(stat_col)[target_col].agg(stat_metric_cols).reset_index() default_df = pd.DataFrame([df[target_col].agg(stat_metric_cols)]).reset_index(drop=True) default_df[stat_col] = "default" default_df = default_df[[stat_col] + stat_metric_cols] full_agg_df = agg_df.append(default_df) stat_colname = ["{}_{}_{}".format(stat_col, target_col, metrics_col) for metrics_col in stat_metric_cols] full_agg_df.columns = [stat_col] + stat_colname stats_encoder[stat_col][target_col] = full_agg_df stats_encoder["stat_cols"] = stat_cols stats_encoder["target_cols"] = target_cols stats_encoder["metric_cols"] = metric_cols result_df = transform_stats(input_df, stats_encoder) return result_df, stats_encoder def transform_stats(input_df: pd.DataFrame, stats_encoder_dict: Dict[str, Any]) -> pd.DataFrame: stat_cols = stats_encoder_dict["stat_cols"] target_cols = stats_encoder_dict["target_cols"] metric_cols = stats_encoder_dict["metric_cols"] result_df = input_df.copy() # pylint: disable=cell-var-from-loop for stat_col in stat_cols: if isinstance(metric_cols, dict): stat_metric_cols = metric_cols[stat_col] else: stat_metric_cols = metric_cols for target_col in target_cols[::-1]: stat_col_idx = result_df.columns.get_loc(stat_col) + 1 stat_colname = ["{}_{}_{}".format(stat_col, target_col, metrics_col) for metrics_col in stat_metric_cols] small_df = result_df[[stat_col]].copy() small_df[stat_col] = small_df[stat_col].apply( lambda x: "default" if x not in stats_encoder_dict[stat_col][target_col][stat_col].tolist() else x ) agg_df = small_df.merge(stats_encoder_dict[stat_col][target_col], how="left", on=stat_col, validate="m:1")[ stat_colname ] result_df = insert_df(result_df, agg_df, stat_col_idx) return result_df PKvNbamboos/ensemble.pyPKvNbamboos/explain.pyPKeN{{' bamboos/explore.pyimport matplotlib.pyplot as plt import numpy as np import seaborn as sns from matplotlib_venn import venn2 def get_venn(array_a, array_b, name_a, name_b): set_a = set(array_a) set_b = set(array_b) n_a = len(set_a) n_b = len(set_b) n_a_b = len(set_a.intersection(set_b)) n_a_notb = n_a - n_a_b n_b_nota = n_b - n_a_b sns.set() plt.figure(figsize=(15, 5)) venn2(subsets=(n_a_notb, n_b_nota, n_a_b), set_labels=(name_a, name_b)) plt.show() def get_quantile(array, intervals=0.01): for interval in np.arange(intervals, 1 + intervals, intervals): print("Quantile {:.3f} : {}".format(interval, array.quantile(interval))) PKvN bamboos/io.pyPKeN=4eRbamboos/logger.pyimport logging import os from bamboos.utils.dates import get_datetime class Logger: def __init__(self): self.name = None self.datetime = None self.level = None self.logger = None self.ch = None self.fh = None self.is_setup = False def setup_logger(self, name, logger_path, level=logging.INFO): self.name = name self.datetime = get_datetime() self.level = level self.logger = logging.getLogger(name) self.logger.setLevel(level) self.ch = self.get_stream_handler() self.fh = self.get_file_handler(os.path.join(logger_path, "{}_{}.log".format(self.name, self.datetime))) self.logger.addHandler(self.ch) self.logger.addHandler(self.fh) self.is_setup = True def get_file_handler(self, path): fh = logging.FileHandler(path) formatter = logging.Formatter("%(asctime)s - %(message)s") fh.setLevel(self.level) fh.setFormatter(formatter) return fh def get_stream_handler(self): ch = logging.StreamHandler() formatter = logging.Formatter("%(asctime)s - %(message)s") ch.setLevel(self.level) ch.setFormatter(formatter) return ch def info(self, msg): assert self.is_setup, "Please Setup the Logger First" return self.logger.info(msg) def debug(self, msg): assert self.is_setup, "Please Setup the Logger First" return self.logger.debug(msg) def error(self, msg): assert self.is_setup, "Please Setup the Logger First" return self.logger.error(msg) def warning(self, msg): assert self.is_setup, "Please Setup the Logger First" return self.logger.warning(msg) logger = Logger() PKLyNbamboos/metric.pyPKNbamboos/prepare.pyPKvNbamboos/select.pyPKvNbamboos/tune.pyPKvNbamboos/visualize.pyPKvNbamboos/utils/__init__.pyPKNvXRRbamboos/utils/dataframe.pyimport pandas as pd def locate_col(df: pd.DataFrame, col_name: str): """ Return index after col_name within the dataframe df Args: df: DataFrame of interest col_name: Name of columns within the DataFrame Returns: (int): index of the column within the dataframe """ idx = df.columns.get_loc(col_name) + 1 return idx def insert_df(input_outer_df: pd.DataFrame, input_inner_df: pd.DataFrame, loc: int): """ Insert `input_inner_df` into `input_outer_df_df` at specified index, `loc`. Args: input_outer_df (pd.DataFrame): DataFrame which will be inserted by another DataFrame input_inner_df (pd.DataFrame): DataFrame to be inserted loc (int): location index of insertion Returns: (pd.DataFrame), `outer_df` with `inner_df` inserted in between according to the specified index """ assert isinstance(input_outer_df, pd.DataFrame) assert isinstance(input_inner_df, pd.DataFrame) outer_df = input_outer_df.copy() inner_df = input_inner_df.copy() outer_df = outer_df.reset_index(drop=True) inner_df = inner_df.reset_index(drop=True) if len(outer_df) != len(inner_df): raise ValueError("len is not the same") return pd.concat([outer_df.iloc[:, :loc], inner_df, outer_df.iloc[:, loc:]], axis=1, join_axes=[outer_df.index]) PKNibamboos/utils/dates.pyimport datetime DAYS_IN_MONTH = 30.41 DAYS_IN_YEAR = 365.25 MONTH_IN_YEAR = 12. SECOND_IN_MINUTE = 60. SECOND_IN_HOUR = 3600. MINUTE_IN_HOUR = 60. HOUR_IN_DAY = 24. BUSINESS_OPEN = 9. BUSINESS_CLOSE = 17. MIDNIGHT_START = -1 MORNING_START = 6 AFTERNOON_START = 12 NIGHT_START = 18 NIGHT_END = 24 SATURDAY = 5 SUNDAY = 6 def get_datetime(): return datetime.datetime.now().strftime("%Y%m%d-%H%M%S") PKNbamboos/utils/format.pyPK}VNbamboos/utils/model/__init__.pyPK!~N!bamboos/utils/model/base_model.pyfrom typing import Any import numpy as np from sklearn.preprocessing import OneHotEncoder class Model: def __init__(self, name: str, model: Any, pred_type: str, threshold: float) -> None: self.name = name self.model = model self.num_class = None self.pred_type = pred_type self.threshold = threshold def fit(self, X_train, y_train): raise NotImplementedError() def predict(self, X_test): raise NotImplementedError() def evaluate(self, X_val, y_val, metric, **kwargs): y_pred = self.predict(X_val) metric_value = metric(y_val, y_pred, **kwargs) return metric_value def predict_proba(self, X_test): raise NotImplementedError() def evaluate_proba(self, X_val, y_val, metric, **kwargs): y_score = self.predict_proba(X_val) if y_score is None: return np.nan if self.pred_type == "binary": result = metric(y_val, y_score, **kwargs) else: assert self.pred_type == "multiclass" if isinstance(y_val, np.ndarray): y_val_ohe = OneHotEncoder(categories=[range(self.num_class)], sparse=False).fit_transform( y_val.reshape(-1, 1) ) else: y_val_ohe = OneHotEncoder(categories=[range(self.num_class)], sparse=False).fit_transform( y_val.values.reshape(-1, 1) ) result = metric(y_val_ohe, y_score, **kwargs) return result PK!~N%bamboos/utils/model/ligthgbm_model.pyimport lightgbm as lgb import numpy as np from bamboos.utils.model.base_model import Model class LGBModel(Model): def __init__(self, name: str, pred_type: str, threshold: float = 0.5, **kwargs) -> None: super().__init__(name, None, pred_type, threshold) self.kwargs = kwargs if self.pred_type == "multiclass": assert "num_class" in self.kwargs.keys() self.num_class = self.kwargs["num_class"] def fit(self, X_train, y_train): lgb_train = lgb.Dataset(X_train, y_train, free_raw_data=False) if self.pred_type == "binary": params = {"task": "train", "objective": "binary", "verbosity": -1} elif self.pred_type == "multiclass": params = {"task": "train", "objective": "multiclass", "verbosity": -1} elif self.pred_type == "regression": params = {"task": "train", "objective": "regression", "verbosity": -1} else: raise ValueError("pred_type should be one of the following: ['binary', 'multiclass', 'regression']") for key, value in self.kwargs.items(): if key != "num_boost_round": params[key] = value if "num_boost_round" in self.kwargs.keys(): self.model = lgb.train(params, lgb_train, self.kwargs.get("num_boost_round")) else: self.model = lgb.train(params, lgb_train) def predict(self, X_test): if self.pred_type == "binary": prob = self.model.predict(X_test) pred = np.where(prob >= self.threshold, 1, 0) elif self.pred_type == "multiclass": pred = np.argmax(self.model.predict(X_test), axis=1) elif self.pred_type == "regression": pred = self.model.predict(X_test) return pred def predict_proba(self, X_test): if self.pred_type in ["binary", "multiclass"]: result = self.model.predict(X_test) else: raise ValueError("pred_type should be on of the following: ['binary', 'multiclass']") return result PK!~NH{ bamboos/utils/model/model_zoo.pyfrom typing import Any from sklearn.ensemble import ( AdaBoostClassifier, AdaBoostRegressor, BaggingClassifier, BaggingRegressor, ExtraTreesClassifier, ExtraTreesRegressor, GradientBoostingClassifier, GradientBoostingRegressor, RandomForestClassifier, RandomForestRegressor, ) from sklearn.linear_model import ElasticNet, Lasso, LinearRegression, LogisticRegression, Ridge, RidgeClassifier from sklearn.naive_bayes import GaussianNB from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor from sklearn.neural_network import MLPClassifier, MLPRegressor from sklearn.svm import LinearSVC, LinearSVR, SVC, SVR from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor from bamboos.utils.model.ligthgbm_model import LGBModel from bamboos.utils.model.sklearn_model import SkLearnModel from bamboos.utils.model.xgboost_model import XGBoostModel estimator_dict = { "LinearRegression": LinearRegression, "Ridge": Ridge, "Lasso": Lasso, "ElasticNet": ElasticNet, "SVR": SVR, "LinearSVR": LinearSVR, "KNeighborsRegressor": KNeighborsRegressor, "DecisionTreeRegressor": DecisionTreeRegressor, "AdaBoostRegressor": AdaBoostRegressor, "BaggingRegressor": BaggingRegressor, "ExtraTreesRegressor": ExtraTreesRegressor, "GradientBoostingRegressor": GradientBoostingRegressor, "RandomForestRegressor": RandomForestRegressor, "MLPRegressor": MLPRegressor, "LogisticRegression": LogisticRegression, "RidgeClassifier": RidgeClassifier, "SVC": SVC, "LinearSVC": LinearSVC, "GaussianNB": GaussianNB, "KNeighborsClassifier": KNeighborsClassifier, "DecisionTreeClassifier": DecisionTreeClassifier, "AdaBoostClassifier": AdaBoostClassifier, "BaggingClassifier": BaggingClassifier, "ExtraTreesClassifier": ExtraTreesClassifier, "GradientBoostingClassifier": GradientBoostingClassifier, "RandomForestClassifier": RandomForestClassifier, "MLPClassifier": MLPClassifier, } def get_estimator(model_name: str): if model_name in estimator_dict.keys(): return estimator_dict[model_name] raise ValueError("model_name is not inside catalyst model dictionary") def createModel(model_name: str, model_type: str, num_class: int = None, **kwargs): if model_type == "multiclass": if num_class is None: raise AssertionError("For multiclass model, num_class must be provided") model: Any if "XGBoost" in model_name: if num_class: model = XGBoostModel(model_name, model_type, num_class=num_class, **kwargs) else: model = XGBoostModel(model_name, model_type, **kwargs) elif "LightGBM" in model_name: if num_class: model = LGBModel(model_name, model_type, num_class=num_class, **kwargs) else: model = LGBModel(model_name, model_type, **kwargs) else: estimator = get_estimator(model_name) if num_class: model = SkLearnModel(model_name, estimator(**kwargs), model_type, num_class=num_class) else: model = SkLearnModel(model_name, estimator(**kwargs), model_type) return model def regression_model_dict() -> dict: """ Wrapper function containing dictionary of all sklearn, xgboost, and light gbm models for regression dataset. Returns: Dictionary containing all sklearn, xgboost, and light gbm models for regression dataset """ return { "LinearRegression": createModel("LinearRegression", "regression"), "Ridge": createModel("Ridge", "regression"), "Lasso": createModel("Lasso", "regression"), "ElasticNet": createModel("ElasticNet", "regression"), "KNeighborsRegressor": createModel("KNeighborsRegressor", "regression"), "DecisionTreeRegressor": createModel("DecisionTreeRegressor", "regression"), "AdaBoostRegressor": createModel("AdaBoostRegressor", "regression"), "BaggingRegressor": createModel("BaggingRegressor", "regression"), "ExtraTreesRegressor": createModel("ExtraTreesRegressor", "regression", n_estimators=100), "GradientBoostingRegressor": createModel("GradientBoostingRegressor", "regression"), "RandomForestRegressor": createModel("RandomForestRegressor", "regression", n_estimators=100), "XGBoost": createModel("XGBoostRegressor", "regression", num_boost_round=100), "LightGBM": createModel("LightGBMRegressor", "regression", num_boost_round=100), } def binary_model_dict() -> dict: """ Wrapper function containing dictionary of all sklearn, xgboost, and light gbm models for binary dataset. Returns: Dictionary containing all sklearn, xgboost, and light gbm models for binary dataset """ return { "LogisticRegression": createModel("LogisticRegression", "binary", solver="lbfgs", max_iter=1000), "RidgeClassifier": createModel("RidgeClassifier", "binary"), "GaussianNB": createModel("GaussianNB", "binary"), "KNeighborsClassifier": createModel("KNeighborsClassifier", "binary"), "DecisionTreeClassifier": createModel("DecisionTreeClassifier", "binary"), "AdaBoostClassifier": createModel("AdaBoostClassifier", "binary"), "BaggingClassifier": createModel("BaggingClassifier", "binary"), "ExtraTreesClassifier": createModel("ExtraTreesClassifier", "binary", n_estimators=100), "GradientBoostingClassifier": createModel("GradientBoostingClassifier", "binary"), "RandomForestClassifier": createModel("RandomForestClassifier", "binary", n_estimators=100), "XGBoost": createModel("XGBoostBinary", "binary", num_boost_round=100), "LightGBM": createModel("LightGBMBinary", "binary", num_boost_round=100), } def multiclass_model_dict(num_class: int) -> dict: """ Wrapper function containing dictionary of all sklearn, xgboost, and light gbm models for multiclass dataset. Args: num_class (int): Number of class in the multiclass dataset Returns: Dictionary containing all sklearn, xgboost, and light gbm models for multiclass dataset """ return { "LogisticRegression": createModel( "LogisticRegression", "multiclass", num_class=num_class, multi_class="multinomial", solver="lbfgs", max_iter=1000, ), "RidgeClassifier": createModel("RidgeClassifier", "multiclass", num_class=num_class), "GaussianNB": createModel("GaussianNB", "multiclass", num_class=num_class), "KNeighborsClassifier": createModel("KNeighborsClassifier", "multiclass", num_class=num_class), "DecisionTreeClassifier": createModel("DecisionTreeClassifier", "multiclass", num_class=num_class), "ExtraTreesClassifier": createModel( "ExtraTreesClassifier", "multiclass", num_class=num_class, n_estimators=100 ), "RandomForestClassifier": createModel( "RandomForestClassifier", "multiclass", num_class=num_class, n_estimators=100 ), "XGBoost": createModel("XGBoostBinary", "multiclass", num_class=num_class, num_boost_round=100), "LightGBM": createModel("LightGBMBinary", "multiclass", num_class=num_class, num_boost_round=100), } PK!~N"fAA bamboos/utils/model/param_zoo.pyfrom hyperopt import hp from hyperopt.pyll.base import scope from sklearn.tree import DecisionTreeRegressor, DecisionTreeClassifier regression_param_dict = { "Ridge": { "alpha": hp.uniform("alpha", 0.01, 10.0), "solver": hp.choice("solver", ["auto", "svd", "cholesky", "lsqr", "sparse_cg", "sag", "saga"]), }, "Lasso": {"alpha": hp.uniform("alpha", 0.01, 10.0)}, "ElasticNet": {"alpha": hp.uniform("alpha", 0.01, 10.0), "l1_ratio": hp.uniform("l1_ratio", 0.01, 0.99)}, "LinearSVR": { "epsilon": hp.uniform("epsilon", 0., 0.3), "C": hp.uniform("C", 0.1, 5.), "loss": hp.choice("loss", ["epsilon_insensitive", "squared_epsilon_insensitive"]), }, "KNeighborsRegressor": { "n_neighbors": scope.int(hp.quniform("n_neighbours", 1, 100, 1)), "algorithm": hp.choice("algorithm", ["ball_tree", "kd_tree", "brute", "auto"]), "weights": hp.choice("weights", ["uniform", "distance"]), "leaf_size": scope.int(hp.quniform("leaf_size", 5, 100, 1)), "p": scope.int(hp.choice("p", [1, 2])), "n_jobs": -1, }, "DecisionTreeRegressor": { "criterion": hp.choice("criterion", ["mse", "friedman_mse", "mae"]), "splitter": hp.choice("splitter", ["best", "random"]), "max_depth": scope.int(hp.quniform("max_depth", 5, 1000, 1)), "min_samples_split": hp.choice("min_samples_split", [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1]), "min_samples_leaf": hp.choice("min_samples_leaf", [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1]), "max_features": hp.choice("max_features", ["auto", "sqrt", "log2"]), }, "AdaBoostRegressor": { "base_estimator": hp.choice("base_estimator", [DecisionTreeRegressor(max_depth=n) for n in range(3, 50)]), "n_estimators": scope.int(hp.quniform("n_estimators", 50, 1000, 1)), "learning_rate": hp.quniform("learning_rate", 0.01, 10., 0.01), "loss": hp.choice("loss", ["linear", "square", "exponential"]), }, "BaggingRegressor": { "n_jobs": -1, "base_estimator": hp.choice( "base_estimator", [None] + [DecisionTreeRegressor(max_depth=n) for n in range(3, 50)] ), "n_estimators": 100, "max_samples": hp.quniform("max_samples", 0.1, 1.0, 0.01), "max_features": hp.quniform("max_features", 0.1, 1.0, 0.01), "bootstrap": hp.choice("bootstrap", [True, False]), "bootstrap_features": hp.choice("bootstrap_features", [True, False]), }, "ExtraTreesRegressor": { "n_jobs": -1, "n_estimators": 100, "criterion": hp.choice("criterion", ["mae", "mse"]), "max_depth": scope.int(hp.quniform("max_depth", 5, 1000, 1)), "min_samples_split": hp.choice("min_samples_split", [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1]), "min_samples_leaf": hp.choice("min_samples_leaf", [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1]), "max_features": hp.choice("max_features", ["auto", "sqrt", "log2"]), "bootstrap": hp.choice("bootstrap", [True, False]), }, "GradientBoostingRegressor": { "loss": hp.choice("loss", ["ls", "lad", "huber", "quantile"]), "learning_rate": hp.quniform("learning_rate", 0.01, 0.5, 0.01), "n_estimators": scope.int(hp.quniform("n_estimators", 100, 1000, 100)), "subsample": hp.quniform("subsample", 0.5, 1.0, 0.1), "criterion": hp.choice("criterion", ["friedman_mse", "mse", "mae"]), "max_depth": scope.int(hp.quniform("max_depth", 5, 1000, 1)), "min_samples_split": hp.choice("min_samples_split", [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1]), "min_samples_leaf": hp.choice("min_samples_leaf", [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1]), "max_features": hp.choice("max_features", ["auto", "sqrt", "log2"]), "alpha": hp.quniform("alpha", 0.1, 0.99, 0.01), }, "RandomForestRegressor": { "n_jobs": -1, "n_estimators": scope.int(hp.quniform("n_estimators", 100, 1000, 100)), "criterion": hp.choice("criterion", ["mae", "mse"]), "max_depth": scope.int(hp.quniform("max_depth", 5, 1000, 1)), "min_samples_split": hp.choice("min_samples_split", [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1]), "min_samples_leaf": hp.choice("min_samples_leaf", [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1]), "max_features": hp.choice("max_features", ["auto", "sqrt", "log2"]), "bootstrap": hp.choice("bootstrap", [True, False]), }, "MLPRegressor": { "hidden_layer_sizes": hp.choice( "hidden_layer_sizes", [(n_nodes,) * n_layer for n_nodes in range(100, 1001, 50) for n_layer in range(1, 4, 1)], ), "activation": hp.choice("activation", ["logistic", "tanh", "relu"]), "solver": hp.choice("solver", ["lbfgs", "sgd", "adam"]), "alpha": hp.quniform("alpha", 0.00001, 0.001, 0.00001), "learning_rate": hp.choice("learning_rate", ["constant", "invscaling", "adaptive"]), "learning_rate_init": hp.quniform("learning_rate_init", 0.001, 1.0, 0.001), "max_iter": scope.int(hp.quniform("max_iter", 100, 1000, 100)), }, "XGBoost": { "booster": hp.choice("booster", ["gbtree", "gblinear", "dart"]), "eta": hp.quniform("eta", 0.01, 1.01, 0.01), "max_depth": scope.int(hp.quniform("max_depth", 3, 50, 1)), "subsample": hp.quniform("subsample", 0.5, 1.0, 0.1), "colsample_bytree": hp.quniform("colsample_bytree", 0.5, 1.0, 0.1), "max_leaves": scope.int(hp.choice("max_leaves", [10 * (2 ** x) for x in range(0, 11, 1)])), "num_boost_round": scope.int(hp.quniform("num_boost_round", 100, 1000, 100)), "gamma": scope.int(hp.choice("gamma", [0, 1, 2])), }, "LightGBM": { "boosting": hp.choice("booster", ["gbdt", "dart"]), "eta": hp.quniform("eta", 0.01, 1.01, 0.01), "num_leaves": scope.int(hp.choice("num_leaves", [10 * (2 ** x) for x in range(0, 11, 1)])), "max_depth": scope.int(hp.quniform("max_depth", 3, 50, 1)), "min_data_in_leaf": scope.int(hp.quniform("min_data_in_leaf", 1, 100, 1)), "bagging_fraction": hp.quniform("bagging_fraction", 0.5, 1.0, 0.1), "bagging_freq": scope.int(hp.choice("bagging_freq", [0, 1, 2, 3, 4, 5])), "feature_fraction": hp.quniform("feature_fraction", 0.5, 1.0, 0.1), "num_boost_round": scope.int(hp.quniform("num_boost_round", 100, 1000, 100)), "min_gain_to_split": scope.int(hp.choice("min_gain_to_split", [0, 1, 2])), }, } binary_param_dict = { "LogisticRegression": { "penalty": hp.choice("penalty", ["l2"]), "C": hp.quniform("C", 0.1, 3.0, 0.1), "solver": hp.choice("solver", ["newton-cg", "sag", "lbfgs"]), "max_iter": 1000, }, "KNeighborsClassifier": { "n_neighbors": scope.int(hp.quniform("n_neighbours", 1, 100, 1)), "algorithm": hp.choice("algorithm", ["ball_tree", "kd_tree", "brute", "auto"]), "weights": hp.choice("weights", ["uniform", "distance"]), "leaf_size": scope.int(hp.quniform("leaf_size", 5, 100, 1)), "p": scope.int(hp.choice("p", [1, 2])), "n_jobs": -1, }, "DecisionTreeClassifier": { "criterion": hp.choice("criterion", ["gini", "entropy"]), "splitter": hp.choice("splitter", ["best", "random"]), "max_depth": scope.int(hp.quniform("max_depth", 5, 1000, 1)), "min_samples_split": hp.choice("min_samples_split", [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1]), "min_samples_leaf": hp.choice("min_samples_leaf", [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1]), "max_features": hp.choice("max_features", ["auto", "sqrt", "log2"]), }, "AdaBoostClassifier": { "base_estimator": hp.choice("base_estimator", [DecisionTreeClassifier(max_depth=n) for n in range(1, 50)]), "n_estimators": scope.int(hp.quniform("n_estimators", 50, 1000, 1)), "learning_rate": hp.quniform("learning_rate", 0.01, 10., 0.01), }, "BaggingClassifier": { "n_jobs": -1, "base_estimator": hp.choice( "base_estimator", [None] + [DecisionTreeClassifier(max_depth=n) for n in range(1, 50)] ), "n_estimators": 100, "max_samples": hp.quniform("max_samples", 0.1, 1.0, 0.01), "max_features": hp.quniform("max_features", 0.1, 1.0, 0.01), "bootstrap": hp.choice("bootstrap", [True, False]), "bootstrap_features": hp.choice("bootstrap_features", [True, False]), }, "ExtraTreesClassifier": { "n_jobs": -1, "n_estimators": 100, "criterion": hp.choice("criterion", ["gini", "entropy"]), "max_depth": scope.int(hp.quniform("max_depth", 5, 1000, 1)), "min_samples_split": hp.choice("min_samples_split", [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1]), "min_samples_leaf": hp.choice("min_samples_leaf", [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1]), "max_features": hp.choice("max_features", ["auto", "sqrt", "log2"]), "bootstrap": hp.choice("bootstrap", [True, False]), }, "GradientBoostingClassifier": { "loss": hp.choice("loss", ["deviance", "exponential"]), "learning_rate": hp.quniform("learning_rate", 0.01, 0.5, 0.01), "n_estimators": scope.int(hp.quniform("n_estimators", 100, 1000, 100)), "subsample": hp.quniform("subsample", 0.5, 1.0, 0.1), "criterion": hp.choice("criterion", ["friedman_mse", "mse", "mae"]), "max_depth": scope.int(hp.quniform("max_depth", 3, 1000, 1)), "min_samples_split": hp.choice("min_samples_split", [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1]), "min_samples_leaf": hp.choice("min_samples_leaf", [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1]), "max_features": hp.choice("max_features", ["auto", "sqrt", "log2"]), }, "RandomForestClassifier": { "n_jobs": -1, "n_estimators": scope.int(hp.quniform("n_estimators", 100, 1000, 100)), "criterion": hp.choice("criterion", ["gini", "entropy"]), "max_depth": scope.int(hp.quniform("max_depth", 5, 1000, 1)), "min_samples_split": hp.choice("min_samples_split", [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1]), "min_samples_leaf": hp.choice("min_samples_leaf", [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1]), "max_features": hp.choice("max_features", ["auto", "sqrt", "log2"]), "bootstrap": hp.choice("bootstrap", [True, False]), }, "MLPClassifier": { "hidden_layer_sizes": hp.choice( "hidden_layer_sizes", [(n_nodes,) * n_layer for n_nodes in range(100, 1001, 50) for n_layer in range(1, 4, 1)], ), "activation": hp.choice("activation", ["logistic", "tanh", "relu"]), "solver": hp.choice("solver", ["lbfgs", "sgd", "adam"]), "alpha": hp.quniform("alpha", 0.00001, 0.001, 0.00001), "learning_rate": hp.choice("learning_rate", ["constant", "invscaling", "adaptive"]), "learning_rate_init": hp.quniform("learning_rate_init", 0.001, 1.0, 0.001), "max_iter": scope.int(hp.quniform("max_iter", 100, 1000, 100)), }, "XGBoost": { "booster": hp.choice("booster", ["gbtree", "gblinear", "dart"]), "eta": hp.quniform("eta", 0.01, 1.01, 0.01), "max_depth": scope.int(hp.quniform("max_depth", 3, 50, 1)), "subsample": hp.quniform("subsample", 0.5, 1.0, 0.1), "colsample_bytree": hp.quniform("colsample_bytree", 0.5, 1.0, 0.1), "max_leaves": scope.int(hp.choice("max_leaves", [10 * (2 ** x) for x in range(0, 11, 1)])), "num_boost_round": scope.int(hp.quniform("num_boost_round", 100, 1000, 100)), "gamma": scope.int(hp.choice("gamma", [0, 1, 2])), }, "LightGBM": { "boosting": hp.choice("booster", ["gbdt", "dart"]), "eta": hp.quniform("eta", 0.01, 1.01, 0.01), "num_leaves": scope.int(hp.choice("num_leaves", [10 * (2 ** x) for x in range(0, 11, 1)])), "max_depth": scope.int(hp.quniform("max_depth", 3, 50, 1)), "min_data_in_leaf": scope.int(hp.quniform("min_data_in_leaf", 1, 100, 1)), "bagging_fraction": hp.quniform("bagging_fraction", 0.5, 1.0, 0.1), "bagging_freq": scope.int(hp.choice("bagging_freq", [1, 2, 3, 4, 5])), "feature_fraction": hp.quniform("feature_fraction", 0.5, 1.0, 0.1), "num_boost_round": scope.int(hp.quniform("num_boost_round", 100, 1000, 100)), "min_gain_to_split": scope.int(hp.choice("min_gain_to_split", [0, 1, 2])), }, } multiclass_param_dict = { "LogisticRegression": { "penalty": hp.choice("penalty", ["l2"]), "C": hp.quniform("C", 0.1, 3.0, 0.1), "solver": hp.choice("solver", ["newton-cg", "sag", "lbfgs"]), "max_iter": 1000, }, "KNeighborsClassifier": { "n_neighbors": scope.int(hp.quniform("n_neighbours", 1, 100, 1)), "algorithm": hp.choice("algorithm", ["ball_tree", "kd_tree", "brute", "auto"]), "weights": hp.choice("weights", ["uniform", "distance"]), "leaf_size": scope.int(hp.quniform("leaf_size", 5, 100, 1)), "p": scope.int(hp.choice("p", [1, 2])), "n_jobs": -1, }, "DecisionTreeClassifier": { "criterion": hp.choice("criterion", ["gini", "entropy"]), "splitter": hp.choice("splitter", ["best", "random"]), "max_depth": scope.int(hp.quniform("max_depth", 5, 1000, 1)), "min_samples_split": hp.choice("min_samples_split", [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1]), "min_samples_leaf": hp.choice("min_samples_leaf", [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1]), "max_features": hp.choice("max_features", ["auto", "sqrt", "log2"]), }, "ExtraTreesClassifier": { "n_jobs": -1, "n_estimators": 100, "criterion": hp.choice("criterion", ["gini", "entropy"]), "max_depth": scope.int(hp.quniform("max_depth", 5, 1000, 1)), "min_samples_split": hp.choice("min_samples_split", [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1]), "min_samples_leaf": hp.choice("min_samples_leaf", [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1]), "max_features": hp.choice("max_features", ["auto", "sqrt", "log2"]), "bootstrap": hp.choice("bootstrap", [True, False]), }, "RandomForestClassifier": { "n_jobs": -1, "n_estimators": scope.int(hp.quniform("n_estimators", 100, 1000, 100)), "criterion": hp.choice("criterion", ["gini", "entropy"]), "max_depth": scope.int(hp.quniform("max_depth", 5, 1000, 1)), "min_samples_split": hp.choice("min_samples_split", [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1]), "min_samples_leaf": hp.choice("min_samples_leaf", [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1]), "max_features": hp.choice("max_features", ["auto", "sqrt", "log2"]), "bootstrap": hp.choice("bootstrap", [True, False]), }, "MLPClassifier": { "hidden_layer_sizes": hp.choice( "hidden_layer_sizes", [(n_nodes,) * n_layer for n_nodes in range(100, 1001, 50) for n_layer in range(1, 4, 1)], ), "activation": hp.choice("activation", ["logistic", "tanh", "relu"]), "solver": hp.choice("solver", ["lbfgs", "sgd", "adam"]), "alpha": hp.quniform("alpha", 0.00001, 0.001, 0.00001), "learning_rate": hp.choice("learning_rate", ["constant", "invscaling", "adaptive"]), "learning_rate_init": hp.quniform("learning_rate_init", 0.001, 1.0, 0.001), "max_iter": scope.int(hp.quniform("max_iter", 100, 1000, 100)), }, "XGBoost": { "booster": hp.choice("booster", ["gbtree", "gblinear", "dart"]), "eta": hp.quniform("eta", 0.01, 1.01, 0.01), "max_depth": scope.int(hp.quniform("max_depth", 3, 50, 1)), "subsample": hp.quniform("subsample", 0.5, 1.0, 0.1), "colsample_bytree": hp.quniform("colsample_bytree", 0.5, 1.0, 0.1), "max_leaves": scope.int(hp.choice("max_leaves", [10 * (2 ** x) for x in range(0, 11, 1)])), "num_boost_round": scope.int(hp.quniform("num_boost_round", 100, 1000, 100)), "gamma": scope.int(hp.choice("gamma", [0, 1, 2])), }, "LightGBM": { "boosting": hp.choice("booster", ["gbdt", "dart"]), "eta": hp.quniform("eta", 0.01, 1.01, 0.01), "num_leaves": scope.int(hp.choice("num_leaves", [10 * (2 ** x) for x in range(0, 11, 1)])), "max_depth": scope.int(hp.quniform("max_depth", 3, 50, 1)), "min_data_in_leaf": scope.int(hp.quniform("min_data_in_leaf", 1, 100, 1)), "bagging_fraction": hp.quniform("bagging_fraction", 0.5, 1.0, 0.1), "bagging_freq": scope.int(hp.choice("bagging_freq", [1, 2, 3, 4, 5])), "feature_fraction": hp.quniform("feature_fraction", 0.5, 1.0, 0.1), "num_boost_round": scope.int(hp.quniform("num_boost_round", 100, 1000, 100)), "min_gain_to_split": scope.int(hp.choice("min_gain_to_split", [0, 1, 2])), }, } PKeNwTI$bamboos/utils/model/sklearn_model.pyimport warnings from sklearn.base import BaseEstimator from bamboos.utils.model.base_model import Model class SkLearnModel(Model): def __init__(self, name: str, model: BaseEstimator, pred_type: str, threshold: float = 0.5, **kwargs) -> None: super().__init__(name, model, pred_type, threshold) self.kwargs = kwargs if self.pred_type == "multiclass": assert "num_class" in self.kwargs.keys() self.num_class = self.kwargs["num_class"] def fit(self, X_train, y_train): self.model.fit(X_train, y_train) def predict(self, X_test): if self.pred_type == "binary": result = self.model.predict(X_test) elif self.pred_type == "multiclass": result = self.model.predict(X_test) else: assert self.pred_type == "regression" result = self.model.predict(X_test) return result def predict_proba(self, X_test): if self.pred_type == "binary": if not hasattr(self.model, "predict_proba"): warnings.warn("Model {} does not have attribute predict_proba. Returning None".format(self.name)) result = None else: result = self.model.predict_proba(X_test)[:, 1] elif self.pred_type == "multiclass": if not hasattr(self.model, "predict_proba"): warnings.warn("Model {} does not have attribute predict_proba. Returning None".format(self.name)) result = None else: result = self.model.predict_proba(X_test) else: raise ValueError("pred_type should be on of the following: ['binary', 'multiclass']") return result PK!~N` {{$bamboos/utils/model/xgboost_model.pyimport numpy as np import xgboost as xgb from bamboos.utils.model.base_model import Model class XGBoostModel(Model): def __init__(self, name: str, pred_type: str, threshold: float = 0.5, **kwargs) -> None: super().__init__(name, None, pred_type, threshold) self.kwargs = kwargs if self.pred_type == "multiclass": assert "num_class" in self.kwargs.keys() self.num_class = self.kwargs["num_class"] def fit(self, X_train, y_train): dtrain = xgb.DMatrix(X_train, label=y_train) if self.pred_type == "binary": params = {"objective": "binary:logistic", "silent": 1} elif self.pred_type == "multiclass": params = {"objective": "multi:softprob", "silent": 1} else: assert self.pred_type == "regression" params = {"objective": "reg:linear", "silent": 1} for key, value in self.kwargs.items(): params[key] = value if "num_boost_round" in self.kwargs.keys(): self.model = xgb.train(params, dtrain, self.kwargs.get("num_boost_round"), verbose_eval=False) else: self.model = xgb.train(params, dtrain, verbose_eval=False) def predict(self, X_test): dtest = xgb.DMatrix(X_test) if self.pred_type == "binary": prob = self.model.predict(dtest) pred = np.where(prob >= self.threshold, 1, 0) elif self.pred_type == "multiclass": if np.all(np.isnan(self.model.predict(dtest))): # Return array of NaN if model predicts all NaN pred = self.model.predict(dtest)[:, 0] else: pred = np.argmax(self.model.predict(dtest), axis=1) else: assert self.pred_type == "regression" pred = self.model.predict(dtest) return pred def predict_proba(self, X_test): dtest = xgb.DMatrix(X_test) if self.pred_type in ["binary", "multiclass"]: result = self.model.predict(dtest) else: raise ValueError("pred_type should be on of the following: ['binary', 'multiclass']") return result PKvN=%::bamboos-0.2.0.dist-info/LICENSEMIT License Copyright (c) 2019 Aditya Kelvianto Sidharta Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HMuSabamboos-0.2.0.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,szd&Y)r$[)T&UD"PK!HDiV bamboos-0.2.0.dist-info/METADATAR]K0|`{zE\d~1JrhUrGht+-d^ 埾,@),aB[jv[\ЖX`#(}GP"a^Gt.p(X9)OwCqOi DM]Su9us(z"o5]>/čo՜'grKPK!Ha bamboos-0.2.0.dist-info/RECORDɒJ}? Tȸ(""H!ddzn5W X|qO*lguK;c]I;2"2^ ݋-2?.uE֦[p? ְ{(v%ۈ6G8.wz}yPQVAT<-EM٠EL߫0KPӓMMn86DEV0  &|…<|:io["63E/cr0̇s(i#t*(,k%Cg&tHs_| k[¼+Na VD521Ei<~_@j%*a 9ʛ)4?UsrԼm쑬5+1o  \a{GOԢmuqGL?24ym=,L .]~aU.{:*(XrwC~%z2E LF!Sz\ -ڒMHٽNꓖ4 RvqB5rb8$ht{:* 3IeYV_Z\NP@uC. }xX$HǷʄ~~NFP^"ڮ) 7- uFvb\tW(X ϰbr([ 6M38q8`Flߋ 2]Q_w)N/MbS&hSe;IY D^_Ӿ~߬6l3r(qkJ*CѠr8N/)z<槦,-}7MAlRq[ʆ F<8u:3;wМb?Ǎ$i蝹>ΤƆCů<޷x