// MathJax

Regression and Classification Analysis

Regression Analysis

Cross-Sectional Analysis

OLS+WLS+GLS+GLM

$${\displaystyle \begin{aligned} \mathbf{y} = &\mathbf{X}\boldsymbol{\beta} + \boldsymbol{\varepsilon}, \quad \boldsymbol{\varepsilon} \sim \mathcal{N}(\mathbf{0}, \sigma_{\boldsymbol{\varepsilon}}^{2}\mathbf{I}_{n}) \\ \end{aligned} }$$
import numpy as np
import pandas as pd
import statsmodels.api as sm

# Simulate 300 observations of a linear model with an intercept and three
# Gaussian regressors. Draw order: X1, X2, X3, then the noise term.
n_obs = 300
X1 = np.random.normal(0, 3, size=(n_obs,))
X2 = np.random.normal(0, 2, size=(n_obs,))
X3 = np.random.normal(0, 2, size=(n_obs,))
y = .1 + 2*X1 + 3*X2 + 1*X3 + np.random.normal(0, 3, size=n_obs)
endog = y
exog = np.column_stack([np.ones_like(y), X1, X2, X3])

# Every estimator below rebinds `result`; only the last fit (GLM) is summarised.
ols_fit = sm.OLS(endog=endog, exog=exog).fit()
result = ols_fit

# WLS, weighting each observation by the inverse of its squared OLS residual.
inverse_squared_resid = 1/ols_fit.resid**2
result = sm.WLS(endog=endog, exog=exog, weights=inverse_squared_resid).fit()

# GLS with the diagonal of (OLS residual variance x identity) as sigma.
resid_variance = ols_fit.resid.var(ddof=0)
sigma_diagonal = np.diag(np.kron(resid_variance, np.identity(endog.shape[0])))
result = sm.GLS(endog=endog, exog=exog, sigma=sigma_diagonal).fit()

# GLM with a Gaussian family and identity link.
gaussian_identity = sm.families.Gaussian(sm.families.links.Identity())
result = sm.GLM(endog=endog, exog=exog, family=gaussian_identity).fit()
result.summary()
$${\displaystyle \begin{aligned} \mathbf{y} = &\underbrace{\mathbf{X}}_{\text{Uncertainty}}\boldsymbol{\beta} + \boldsymbol{\varepsilon}, \quad \boldsymbol{\varepsilon} \sim \mathcal{N}(\mathbf{0}, \sigma_{\boldsymbol{\varepsilon}}^{2}\mathbf{I}_{n}) \\ \end{aligned} }$$

IV2SLS

import numpy as np
import pandas as pd
from statsmodels.sandbox.regression import gmm

n_obs = 1000

# Instrument Z and structural error epsilon (draw order: Z, epsilon, X-noise).
Z = np.random.normal(0, 1, size=n_obs)
epsilon = np.random.normal(0, 1, size=n_obs)

# Subtract the sample projection of epsilon on Z so that the two are
# uncorrelated in this sample.
slope_on_Z = np.cov(epsilon, Z)[0,1] / np.cov(Z, Z)[0,1]
epsilon = epsilon - slope_on_Z * Z

# X contains epsilon, so it is correlated with the error term of y.
X = 3*Z + 1 + epsilon + np.random.normal(size=n_obs)
y = 5*X + 4 + epsilon

columns = ['y', 'const', 'X', 'Z']
data = pd.DataFrame(np.column_stack([y, np.ones_like(y), X, Z]), columns=columns)

iv_model = gmm.IV2SLS(
    data['y'],
    exog=data[['const', 'X']],
    instrument=data[['const', 'Z']],
)
result = iv_model.fit()
result.summary()
import numpy as np
import pandas as pd
import linearmodels.iv.model as lm

sample_size = 1000

# Same data-generating process as the statsmodels variant:
# draw Z, draw epsilon, orthogonalise epsilon against Z, then build X and y.
Z = np.random.normal(0, 1, size=sample_size)
epsilon = np.random.normal(0, 1, size=sample_size)
cov_eps_z = np.cov(epsilon, Z)[0,1]
var_z = np.cov(Z, Z)[0,1]
epsilon = epsilon - ( cov_eps_z / var_z ) * Z
X = 3*Z + 1 + epsilon + np.random.normal(size=sample_size)
y = 5*X + 4 + epsilon
data = pd.DataFrame({'y': y, 'const': np.ones_like(y), 'X': X, 'Z': Z})

# linearmodels separates exogenous regressors, endogenous regressors and instruments.
iv_spec = lm.IV2SLS(
    dependent=data['y'],
    exog=data['const'],
    endog=data['X'],
    instruments=data['Z'],
)
result = iv_spec.fit(cov_type="homoskedastic", debiased=True)

result.wu_hausman()           # Wu-Hausman test of exogeneity
result.wooldridge_regression  # Wooldridge's regression test of exogeneity (Wu-Hausman test results)
result.sargan                 # Sargan's test
result.summary

 

$${\displaystyle \begin{aligned} \mathbf{y} = &\mathbf{X}\boldsymbol{\beta} + \underbrace{\boldsymbol{\varepsilon}}_{\text{Uncertainty}}, \quad \boldsymbol{\varepsilon} \sim \mathcal{N}(\mathbf{0}, \sigma_{\boldsymbol{\varepsilon}}^{2}\mathbf{I}_{n}) \\ \end{aligned} }$$

Nested Group Effect

import numpy as np
import pandas as pd
import statsmodels.api as sm
import statsmodels.formula.api as smf

# Simulate a nested design: 20 G1 groups, each containing 100 G2 sub-groups.
n_group1 = 20; n_group2 = 100
# Zero-filled frame indexed by every (G1, G2) pair; column 'e' is only a placeholder to add effects to.
data = pd.DataFrame(data=0, columns=['e'], index=pd.MultiIndex.from_product([np.arange(n_group1), np.arange(n_group2)], names=['G1', 'G2']))
# The lambda runs per G1 group and adds a single normal draw (scale 2) to all rows of that group.
G1_effect = data.groupby(['G1'], group_keys=False)[['e']].apply(lambda e: e + np.random.normal(0, 2)).rename(columns={'e':'G1_Effect'})
# Same idea per (G1, G2) pair with scale 3: the G2 effect is specific to its G1 parent (nested).
G1G2_effect = data.groupby(['G1', 'G2'], group_keys=False)[['e']].apply(lambda e: e + np.random.normal(0, 3)).rename(columns={'e':'G1G2_Effect'})
data = pd.concat([data, G1_effect, G1G2_effect], axis=1)
# Stack ten copies of the frame, i.e. ten rows per (G1, G2) pair.
data = pd.concat([data]*10, axis=0)
# Response = G1 effect + nested effect + observation noise (scale 1).
data['y'] = data['G1_Effect'] + data['G1G2_Effect'] + np.random.normal(0, 1, size=data.shape[0])

# Random intercept per G1 group plus a variance component for G2 levels within each G1 group.
result = smf.mixedlm(
	"y ~ 1", 
    re_formula="1", 
    vc_formula={"G2": "0 + C(G2)"},
    groups="G1",
    data=data.reset_index(),
).fit() # nested effect

# Empirical variances to compare against the fitted variance components.
print(data['y'].var())
print(data.groupby('G1')['y'].mean().var())
print(data.groupby('G2')['y'].mean().var()) # ~ 0 
# Variance of the (G1, G2) cell means minus the variance of the G1 means.
print(data.groupby(['G1', 'G2'])['y'].mean().var() - data.groupby('G1')['y'].mean().var())
result.summary()

Crossed Group Effect

import numpy as np
import pandas as pd
import statsmodels.api as sm
import statsmodels.formula.api as smf

# Simulate a crossed design: every G1 level is combined with every G2 level.
n_group1 = 20; n_group2 = 100
# Zero-filled frame indexed by every (G1, G2) pair; column 'e' is only a placeholder to add effects to.
group = pd.DataFrame(data=0, columns=['e'], index=pd.MultiIndex.from_product([np.arange(n_group1), np.arange(n_group2)], names=['G1', 'G2']))
# The lambda runs per G1 group and adds a single normal draw (scale 2) to all rows of that group.
G1_effect = group.groupby(['G1'], group_keys=False)[['e']].apply(lambda e: e + np.random.normal(0, 2)).rename(columns={'e':'G1_Effect'})
# Grouped by G2 alone (scale 3), so a G2 level gets the same effect under every G1 level (crossed).
G2_effect = group.groupby(['G2'], group_keys=False)[['e']].apply(lambda e: e + np.random.normal(0, 3)).rename(columns={'e':'G2_Effect'})
group = pd.concat([group, G1_effect, G2_effect], axis=1)
# Stack ten copies of the frame, i.e. ten rows per (G1, G2) pair.
group = pd.concat([group]*10, axis=0)
# Response = G1 effect + G2 effect + observation noise (scale 1).
group['y'] = group['G1_Effect'] + group['G2_Effect'] + np.random.normal(0, 1, size=group.shape[0])

# All rows share one constant grouping label; G1 and G2 enter only as variance components.
result = smf.mixedlm(
	"y ~ 1", 
    re_formula=None, 
    vc_formula={"G1": "0 + C(G1)", "G2": "0 + C(G2)"},
    groups=np.ones_like(group['y']),
    data=group.reset_index(),
).fit() # crossed effect

# Empirical checks: (variance of group means, mean of within-group variances) per grouping.
print(group['y'].var(ddof=1))
print(group.groupby(['G1'])['y'].mean().var(ddof=1), group.groupby(['G1'])['y'].var(ddof=1).mean())
print(group.groupby(['G2'])['y'].mean().var(ddof=1), group.groupby(['G2'])['y'].var(ddof=1).mean())
print(group.groupby(['G1', 'G2'])['y'].mean().var(ddof=1), group.groupby(['G1', 'G2'])['y'].var(ddof=1).mean())
# NOTE(review): `display` is not imported in this cell — presumably the IPython/Jupyter builtin; confirm.
display(result.summary())

# Per-level dummy regressions without intercept, for comparison with the mixed model.
g1_result = smf.ols('y ~ 0+C(G1)', data=group.reset_index()).fit()
g2_result = smf.ols('y ~ 0+C(G2)', data=group.reset_index()).fit()
print(g1_result.params.mean(), g1_result.fittedvalues.var(ddof=1))
print(g2_result.params.mean(), g2_result.fittedvalues.var(ddof=1))

 

$${\displaystyle \begin{aligned} \mathbf{y} = &\mathbf{X}\underbrace{\boldsymbol{\beta}}_{\text{Uncertainty}} + \boldsymbol{\varepsilon}, \quad \boldsymbol{\varepsilon} \sim \mathcal{N}(\mathbf{0}, \sigma_{\boldsymbol{\varepsilon}}^{2}\mathbf{I}_{n}) \\ \end{aligned} }$$
import numpy as np
import pandas as pd
import statsmodels.api as sm
import statsmodels.formula.api as smf
import statsmodels.stats.api as sms

np.random.seed(0)
N = np.random.normal(size=1000)        # numeric regressor
C = np.random.randint(3, size=1000)    # group label in {0, 1, 2}
X = np.c_[N, C]

# Each group has its own (intercept, slope); noise is drawn group by group,
# in the order 0, 1, 2.
rows_by_group = [np.where(C==label)[0] for label in range(3)]
line_by_group = [(1, 2), (2, 3), (3, 5)]
responses = []
for rows, (intercept, slope) in zip(rows_by_group, line_by_group):
    responses.append(intercept + slope * X[rows, 0] + np.random.normal(0, 1, size=rows.shape[0]))
y0, y1, y2 = responses

# Re-order the design matrix group by group so that it lines up with y.
y = np.concatenate([y0, y1, y2])
X = np.concatenate([X[rows] for rows in rows_by_group], axis=0)
data = pd.DataFrame(np.column_stack([y, X]), columns=['y', 'X', 'group'])
data['group'] = data['group'].astype('category')
data['const'] = np.ones_like(data['y'])

# REML: restricted maximum likelihood
# Varying-coefficient model for intercepts and slopes
optimizers = ['nm', 'bfgs', 'lbfgs', 'powell', 'cg', 'basinhopping', 'minimize']
mixed_model = smf.mixedlm(formula='y ~ 1 + X', re_formula='~ 1 + X', groups=data['group'], data=data)
result = mixed_model.fit(
    method=optimizers[1],
    start_params=None,
    reml=True,
    niter_sa=0,
    do_cg=True,
    fe_pen=None,
    cov_pen=None,
    free=None,
    full_output=False,
)
display(result.summary())

# Group-specific coefficients: random effects shifted by the fixed effects.
random_effects = pd.DataFrame(result.random_effects).rename(index={'Group':'Intercept'})
beta1 = random_effects.add(result.fe_params, axis=0)
# Reference: separate intercept and slope per group estimated by plain OLS.
beta2 = smf.ols('y ~ 0 + group + group:X', data=data).fit().summary()

 

Factor Analysis

$${\displaystyle \begin{aligned} \mathbf{X} - \mathbf{M} = \underbrace{\mathbf{L}}_{\text{Loadings}} \; \underbrace{\mathbf{F}}_{\text{Factor}} + \boldsymbol{\varepsilon}, \quad \boldsymbol{\varepsilon} \sim \mathcal{N}(\mathbf{0}, \boldsymbol{\Sigma}) \\ \end{aligned} }$$
import numpy as np
import statsmodels.api as sm

# Simulate 400 observations of 5 variables driven by 3 latent factors.
# Draw order: loadings, factor scores, then the idiosyncratic noise.
L = np.random.normal(size=(5,3)) # loadings
F = np.random.normal(size=(400, 3)) # factor scores
noise_cov = np.eye(5)*[10,10,10,1,1]
noise = np.random.multivariate_normal(np.zeros(5), noise_cov, size=400)
X = (L@F.T).T + noise

extraction_methods = ['pa', 'ml']
rotations = ['varimax', 'quartimax', 'biquartimax', 'equamax', 'oblimin', 'parsimax', 'parsimony', 'biquartimin', 'promax']

factor_model = sm.Factor(
    endog=X,
    n_factor=3,
    corr=None,
    method=extraction_methods[0],
    smc=True,
    endog_names=None,
    nobs=None,
    missing='drop',
)
result = factor_model.fit()
result.rotate(method=rotations[-1])
result.summary()

# Reconstruct X from the estimated loadings and scores and inspect what is left.
L_ = result.loadings
F_ = result.factor_scoring()
centered = X - X.mean(axis=0)
residuals = (centered - (L_@F_.T).T)
np.column_stack([residuals.mean(axis=0), residuals.var(axis=0)])

 

 

 

Time Series Analysis

Arch in Mean

import numpy as np
import pandas as pd
from arch.univariate import ARCHInMean, ARCH, Normal

white_noise = np.random.normal(0, 1, size=1000) # Standard Noise

# The recursion below reads index t-1 and t-2. At t=0 and t=1 those negative
# indices wrap around to the (not yet written) tail of each array, so the
# buffers must start at zero: with np.empty_like the first steps would be
# seeded from uninitialised memory and could propagate garbage/NaN.
y = np.zeros_like(white_noise)
epsilon = np.zeros_like(white_noise)
sigma2 = np.zeros_like(white_noise)
for t, std_noise in enumerate(white_noise):
    # ARCH(2) conditional variance.
    sigma2[t] = 2 + .5*epsilon[t-1]**2 + .3*epsilon[t-2]**2
    epsilon[t] = std_noise * np.sqrt(sigma2[t])
    # AR(1) mean with the conditional volatility entering the mean equation.
    y[t] = -.3*y[t-1] + .7*np.sqrt(sigma2[t]) + epsilon[t] # kappa

data = pd.DataFrame(np.c_[y] ,columns=['y'], index=pd.date_range(end='00:00:00', periods=y.shape[0], freq='D'))
# lags: scalar > HAR, lags: 1d-array > AR
model = ARCHInMean(
    data['y'],
    constant=False,
    lags=[1],
    form=["log", "vol", "var"][1],
    volatility=ARCH(2),
    distribution=Normal(),
    rescale=False,
)
result = model.fit(disp=0)
result.summary()

Factor Analysis

#

 

 

Panel Analysis

#

 

 

 

Classification Analysis

 

 

 

 

 

 

 


Cross Validation

Big Data Task: Classification

  • [X-(y) relevant split] StratifiedKFold, RepeatedStratifiedKFold, StratifiedShuffleSplit
  • [X-(G-y) relevant split] GroupKFold, GroupShuffleSplit

Big Data Task: Regression

  • [X-y irrelevant split] KFold, RepeatedKFold, ShuffleSplit
  • [X-(G-y) relevant split] GroupKFold, GroupShuffleSplit

Big Data Task: Time Series

  • [X-y irrelevant split] TimeSeriesSplit

Small Data Task

  • [X-(G-y) relevant split] LeavePGroupsOut, LeaveOneGroupOut
  • [X-y irrelevant split] LeavePOut, LeaveOneOut
import numpy as np
import pandas as pd
from sklearn.datasets import make_classification, make_regression
from sklearn.model_selection import TimeSeriesSplit
from sklearn.model_selection import LeaveOneGroupOut, LeaveOneOut, LeavePGroupsOut, LeavePOut
from sklearn.model_selection import GroupKFold, StratifiedGroupKFold, GroupShuffleSplit
from sklearn.model_selection import StratifiedKFold, RepeatedStratifiedKFold, StratifiedShuffleSplit 
from sklearn.model_selection import KFold, RepeatedKFold, ShuffleSplit

# Classification data: stratified splitters need y to balance the classes per fold.
X, y = make_classification(n_samples=3000, n_features=30, n_classes=2, weights=[0.6, 0.4], flip_y=0)

# Each assignment replaces the previous generator; they are alternatives, not a pipeline.
stratified_kfold = StratifiedKFold(n_splits=5, shuffle=False, random_state=None)
cv_spliter = stratified_kfold.split(X, y)

repeated_stratified_kfold = RepeatedStratifiedKFold(n_splits=5, n_repeats=3, random_state=None)
cv_spliter = repeated_stratified_kfold.split(X, y)

stratified_shuffle = StratifiedShuffleSplit(n_splits=5, train_size=0.75, test_size=.25, random_state=None)
cv_spliter = stratified_shuffle.split(X, y)


# Regression data: these splitters only look at the number of rows in X.
X, y = make_regression(
    n_samples=3000,
    n_features=30,
    n_informative=5,
    n_targets=1,
    bias=0.0,
    effective_rank=None,
    tail_strength=0.5,
    noise=0.0,
    shuffle=True,
    coef=False,
    random_state=None,
)

# Each assignment replaces the previous generator; they are alternatives, not a pipeline.
kfold = KFold(n_splits=5, shuffle=False, random_state=None)
cv_spliter = kfold.split(X)

repeated_kfold = RepeatedKFold(n_splits=5, n_repeats=3, random_state=None)
cv_spliter = repeated_kfold.split(X)

shuffle_split = ShuffleSplit(n_splits=5, train_size=0.75, test_size=.25, random_state=None)
cv_spliter = shuffle_split.split(X)


# Tiny six-row data set for the exhaustive / small-sample splitters.
X = np.array([[1, 2], [3, 4], [5, 6], [7, 8], [3, 4], [5, 6]])
y = np.array([1, 2, 1, 2, 1, 2])

# Each assignment replaces the previous generator; they are alternatives, not a pipeline.
shuffle_split = ShuffleSplit(n_splits=5, train_size=0.75, test_size=.25, random_state=None)
cv_spliter = shuffle_split.split(X)

leave_three_out = LeavePOut(p=3)
cv_spliter = leave_three_out.split(X)

leave_one_out = LeaveOneOut()
cv_spliter = leave_one_out.split(X)

# pd.DataFrame(cv_spliter, columns=['X', 'y'])
time_series_split = TimeSeriesSplit(n_splits=5, max_train_size=None, test_size=None, gap=0)
cv_spliter = time_series_split.split(y)


# Sixteen samples that belong to four groups ('a'..'d') of unequal size.
X = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
y = [2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32]
groups = ['a','a', 'a', 'b', 'b', 'b', 'c', 'c', 'c', 'c', 'd', 'd', 'd', 'd', 'd', 'd']

# GroupKFold cannot produce more folds than there are distinct groups. With the
# default n_splits=5 and only four groups the generator raises ValueError as
# soon as it is iterated, so the number of folds is tied to the group count.
cv_spliter = GroupKFold(n_splits=len(set(groups))).split(X, y, groups=groups)

# StratifiedGroupKFold stratifies on class labels and needs at least one class
# with n_splits or more members. `y` above holds 16 distinct values (one sample
# per "class"), which fails on iteration, so a binary label is used here.
y_class = [0, 1] * (len(y) // 2)
cv_spliter = StratifiedGroupKFold(n_splits=2).split(X, y_class, groups=groups)

cv_spliter = GroupShuffleSplit().split(X, y, groups=groups)

cross validation

# https://scikit-learn.org/stable/modules/model_evaluation.html
# binary_class_scoring = ['accuracy', 'balanced_accuracy', 'recall', 'average_precision', 'precision', 'f1', 'jaccard', 'roc_auc', 'roc_auc_ovr', 'roc_auc_ovo']
# multi_class_scoring = ['accuracy', 'balanced_accuracy', 'recall_micro', 'recall_macro', 'recall_weighted', 'precision_micro', 'precision_macro', 'precision_weighted', 'f1_micro', 'f1_macro', 'f1_weighted', 'jaccard_micro', 'jaccard_macro', 'jaccard_weighted', 'roc_auc_ovr_weighted', 'roc_auc_ovo_weighted']
# regression_scoring = ['r2', 'explained_variance', 'max_error', 'neg_mean_squared_error', 'neg_mean_absolute_error', 'neg_root_mean_squared_error', 'neg_median_absolute_error', 'neg_mean_absolute_percentage_error']
from sklearn.datasets import make_classification, make_regression
from sklearn.model_selection import cross_validate, RepeatedStratifiedKFold, RepeatedKFold 
from sklearn.linear_model import LinearRegression, LogisticRegression

# NOTE: `fit_params=None` was dropped from every call below. It is the default
# value, and the keyword itself was deprecated in scikit-learn 1.4 and removed
# in 1.6 (replaced by `params`), where passing it raises TypeError.

# classification
binary_class_scoring = ['accuracy', 'balanced_accuracy', 'recall', 'average_precision', 'precision', 'f1', 'jaccard', 'roc_auc', 'roc_auc_ovr', 'roc_auc_ovo']
multi_class_scoring = ['accuracy', 'balanced_accuracy', 'recall_micro', 'recall_macro', 'recall_weighted', 'precision_micro', 'precision_macro', 'precision_weighted', 'f1_micro', 'f1_macro', 'f1_weighted', 'jaccard_micro', 'jaccard_macro', 'jaccard_weighted', 'roc_auc_ovr_weighted', 'roc_auc_ovo_weighted']

X, y = make_classification(n_samples=3000, n_features=30, n_classes=2, weights=[0.6, 0.4], flip_y=0)
cv = RepeatedStratifiedKFold(n_splits=5, n_repeats=3, random_state=None) # StratifiedKFold(n_splits=5, shuffle=False, random_state=None) # cross validation & randomness control
# The second call overwrites the first; they show the binary and the multi-class scorer sets.
cv_results = cross_validate(LogisticRegression(), X, y, scoring=binary_class_scoring, cv=cv, return_train_score=True, return_estimator=False, n_jobs=-1, verbose=0)
cv_results = cross_validate(LogisticRegression(), X, y, scoring=multi_class_scoring, cv=cv, return_train_score=True, return_estimator=False, n_jobs=-1, verbose=0)


# regression
regression_scoring = ['r2', 'explained_variance', 'max_error', 'neg_mean_squared_error', 'neg_mean_absolute_error', 'neg_root_mean_squared_error', 'neg_median_absolute_error', 'neg_mean_absolute_percentage_error']

X, y = make_regression(n_samples=3000, n_features=30, n_informative=5, n_targets=1, bias=0.0, effective_rank=None, tail_strength=0.5, noise=0.0, shuffle=True, coef=False, random_state=None)
cv = RepeatedKFold(n_splits=10, n_repeats=3, random_state=None) # KFold(n_splits=10, shuffle=False, random_state=None)
cv_results = cross_validate(LinearRegression(), X, y, scoring=regression_scoring, cv=cv, return_train_score=True, return_estimator=False, n_jobs=-1, verbose=0)
from sklearn.datasets import make_classification, make_regression
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression, LogisticRegression
from mlxtend.evaluate import bias_variance_decomp

# classification: 0-1 loss decomposition on a stratified 70/30 split
X, y = make_classification(n_samples=3000, n_features=30, n_classes=2, weights=[0.6, 0.4], flip_y=0)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=None, shuffle=True, stratify=y,
)
decomposition = bias_variance_decomp(
    LogisticRegression(), X_train, y_train, X_test, y_test,
    loss='0-1_loss', random_seed=None,
)
avg_expected_loss, avg_bias, avg_var = decomposition

# regression: squared-error decomposition on a plain 70/30 split
X, y = make_regression(
    n_samples=3000,
    n_features=30,
    n_informative=5,
    n_targets=1,
    bias=0.0,
    effective_rank=None,
    tail_strength=0.5,
    noise=0.0,
    shuffle=True,
    coef=False,
    random_state=None,
)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=None, shuffle=True,
)
decomposition = bias_variance_decomp(
    LinearRegression(), X_train, y_train, X_test, y_test,
    loss='mse', random_seed=None,
)
avg_expected_loss, avg_bias, avg_var = decomposition

grid search

# https://scikit-learn.org/stable/modules/model_evaluation.html
# binary_class_scoring = ['accuracy', 'balanced_accuracy', 'recall', 'average_precision', 'precision', 'f1', 'jaccard', 'roc_auc', 'roc_auc_ovr', 'roc_auc_ovo']
# multi_class_scoring = ['accuracy', 'balanced_accuracy', 'recall_micro', 'recall_macro', 'recall_weighted', 'precision_micro', 'precision_macro', 'precision_weighted', 'f1_micro', 'f1_macro', 'f1_weighted', 'jaccard_micro', 'jaccard_macro', 'jaccard_weighted', 'roc_auc_ovr_weighted', 'roc_auc_ovo_weighted']
# regression_scoring = ['r2', 'explained_variance', 'max_error', 'neg_mean_squared_error', 'neg_mean_absolute_error', 'neg_root_mean_squared_error', 'neg_median_absolute_error', 'neg_mean_absolute_percentage_error']
from sklearn.datasets import make_classification, make_regression
from sklearn.model_selection import GridSearchCV, RepeatedStratifiedKFold, RepeatedKFold 
from sklearn.linear_model import LinearRegression, LogisticRegression

# classification
binary_class_scoring = ['accuracy', 'balanced_accuracy', 'recall', 'average_precision', 'precision', 'f1', 'jaccard', 'roc_auc', 'roc_auc_ovr', 'roc_auc_ovo']
multi_class_scoring = ['accuracy', 'balanced_accuracy', 'recall_micro', 'recall_macro', 'recall_weighted', 'precision_micro', 'precision_macro', 'precision_weighted', 'f1_micro', 'f1_macro', 'f1_weighted', 'jaccard_micro', 'jaccard_macro', 'jaccard_weighted', 'roc_auc_ovr_weighted', 'roc_auc_ovo_weighted']
logistic_grid = {'fit_intercept':[False, True], 'penalty':['l2']}

X, y = make_classification(n_samples=3000, n_features=30, n_classes=2, weights=[0.6, 0.4], flip_y=0)
cv = RepeatedStratifiedKFold(n_splits=5, n_repeats=3, random_state=None) # StratifiedKFold(n_splits=5, shuffle=False, random_state=None) # cross validation & randomness control
# Two alternative searches; the second assignment is the one that gets fitted.
classifier = GridSearchCV(
    estimator=LogisticRegression(),
    param_grid=logistic_grid,
    cv=cv,
    scoring=binary_class_scoring[0],
    return_train_score=True,
)
classifier = GridSearchCV(
    estimator=LogisticRegression(),
    param_grid=logistic_grid,
    cv=cv,
    scoring=multi_class_scoring[0],
    return_train_score=True,
)
classifier.fit(X, y)
cv_results = classifier.cv_results_
classifier.best_estimator_ # fitted
classifier.estimator       # not fitted

# regression
regression_scoring = ['r2', 'explained_variance', 'max_error', 'neg_mean_squared_error', 'neg_mean_absolute_error', 'neg_root_mean_squared_error', 'neg_median_absolute_error', 'neg_mean_absolute_percentage_error']
linear_grid = {'fit_intercept':[False, True], 'positive':[False, True]}

X, y = make_regression(n_samples=3000, n_features=30, n_informative=5, n_targets=1, bias=0.0, effective_rank=None, tail_strength=0.5, noise=0.0, shuffle=True, coef=False, random_state=None)
cv = RepeatedKFold(n_splits=10, n_repeats=3, random_state=None) # KFold(n_splits=10, shuffle=False, random_state=None)
regressor = GridSearchCV(
    estimator=LinearRegression(),
    param_grid=linear_grid,
    cv=cv,
    scoring=regression_scoring[1],
    return_train_score=True,
)
regressor.fit(X, y)
cv_results = regressor.cv_results_
regressor.best_estimator_ # fitted
regressor.estimator       # not fitted

 

 

 


Evaluation

Classification: Precision&Recall Trade-Off

import numpy as np
import pandas as pd
import seaborn as sns
from sklearn import metrics
#from sklearn.metrics import roc_curve, precision_recall_curve, auc

def cls_evaluation(y_true, y_pred):
    """Display an extended confusion matrix and return classification metrics.

    Parameters
    ----------
    y_true, y_pred : array-like of labels
        Actual and predicted class labels, in the same order.

    Returns
    -------
    dict
        scikit-learn metric values keyed by metric name. Averaged metrics use
        ``average='micro'``.

    Notes
    -----
    The styled table is shown through ``display``, which this code expects to
    exist as a global (presumably the IPython/Jupyter builtin).
    """
    # version 1: base confusion matrix (rows: actual, columns: predicted, with 'All' margins)
    confusion_matrix = pd.crosstab(index=[y_true], columns=[y_pred], margins=True, margins_name='All', dropna=False, normalize=False) # .unstack(level=0).stack(level=1)

    # version 2: extended confusion matrix
    # Row-normalised counts, appended as extra '<label>|A' columns.
    conditional_probability_for_actual = confusion_matrix.div(confusion_matrix.loc[:, 'All'], axis=0).rename(
        columns={label: str(label)+'|A' for label in confusion_matrix.columns})
    # Column-normalised counts, appended as extra '<label>|P' rows. The rename is
    # keyed on the ROW labels: keying it on the column labels leaves rows
    # unrenamed whenever the predicted label set differs from the actual one,
    # which yields a duplicated index that Styler cannot render.
    conditional_probability_for_prediction = confusion_matrix.div(confusion_matrix.loc['All', :], axis=1).rename(
        index={label: str(label)+'|P' for label in confusion_matrix.index})
    confusion_matrix = pd.concat([confusion_matrix, conditional_probability_for_actual], axis=1)
    confusion_matrix = pd.concat([confusion_matrix, conditional_probability_for_prediction], axis=0)
    display(confusion_matrix.iloc[:, :].style.background_gradient(cmap=sns.light_palette("green", as_cmap=True)))

    result = dict(); average='micro' # 'binary', 'macro', 'micro', 'weighted', 'samples'
    result['confusion_matrix'] = metrics.confusion_matrix(y_true, y_pred)
    result['accuracy_score'] = metrics.accuracy_score(y_true, y_pred)
    result['balanced_accuracy_score'] = metrics.balanced_accuracy_score(y_true, y_pred)
    result['recall_score'] = metrics.recall_score(y_true, y_pred, average=average)
    result['precision_score'] = metrics.precision_score(y_true, y_pred, average=average)
    result['f1_score'] = metrics.f1_score(y_true, y_pred, average=average) # precision&recall trade-off
    result['fbeta_score'] = metrics.fbeta_score(y_true, y_pred, beta=2, average=average)
    result['jaccard_score'] = metrics.jaccard_score(y_true, y_pred, average=average)
    result['hamming_loss'] = metrics.hamming_loss(y_true, y_pred)
    result['matthews_corrcoef'] = metrics.matthews_corrcoef(y_true, y_pred)
    result['multilabel_confusion_matrix'] = metrics.multilabel_confusion_matrix(y_true, y_pred)
    result['zero_one_loss'] = metrics.zero_one_loss(y_true, y_pred)
    return result

# Random three-class labels: actual first, then predicted.
n_samples = 100
y_true = np.random.randint(3, size=n_samples)
y_pred = np.random.randint(3, size=n_samples)
df = pd.DataFrame({'TRUE': y_true, 'PRED': y_pred})

cls_evaluation(df['TRUE'], df['PRED'])

Regression: Bias&Variance Trade-Off

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn import metrics

def reg_evaluation(y_true, y_pred):
    """Plot residual diagnostics and return regression metrics.

    Parameters
    ----------
    y_true, y_pred : pandas.Series
        Actual and predicted values. ``y_true.index`` supplies the x positions
        of the residual segments.

    Returns
    -------
    dict
        scikit-learn regression metrics keyed by metric name.
    """
    y_resid = y_true - y_pred

    fig = plt.figure(figsize=(30,5))
    ax0 = plt.subplot2grid((2,5), (0,0), colspan=3, rowspan=1); ax0.set_title('Bias-Variance')
    ax1 = plt.subplot2grid((2,5), (1,0), colspan=3, rowspan=1, sharex=ax0, sharey=ax0)
    ax2 = plt.subplot2grid((2,5), (0,3), colspan=1, rowspan=2); ax2.set_title('Correlation'); ax2.set_xlabel('PRED'); ax2.set_ylabel('RESID')
    ax3 = plt.subplot2grid((2,5), (0,4), colspan=1, rowspan=2); ax3.set_title('Residual Histogram')
    for axis, hidden_sides in ((ax0, ['top', 'bottom', 'right']), (ax1, ['top']), (ax2, ['top', 'right'])):
        for side in hidden_sides:
            axis.spines[side].set_visible(False)

    # Actual vs. predicted values as markers.
    ax0.plot(y_true, lw=0, marker='o', markerfacecolor='w', color='black', label='TRUE')
    ax0.plot(y_pred, lw=0, ms=10, marker='x', color='r', label='PRED')
    ax0.get_xaxis().set_visible(False)
    ax0.legend(frameon=False)

    # One vertical segment per observation, spanning the TRUE and PRED values.
    # `vlines` takes its limits in data coordinates; `axvline` interprets
    # ymin/ymax as fractions of the axes height, so it cannot draw these.
    ax1.vlines(y_true.index, np.minimum(y_true, y_pred), np.maximum(y_true, y_pred), colors='r')

    # Homoscedasticity/Heteroscedasticity
    ax2.scatter(y_pred, y_resid, label='Homoscedasticity/Heteroscedasticity', color='black')
    boundary = max(y_pred.std(), y_resid.std())
    ax2.set_xlim(-boundary*10, boundary*10)
    ax2.set_ylim(-boundary*10, boundary*10)
    ax2.legend(frameon=False)

    # Residual distribution with its mean marked.
    ax3.hist(y_resid, bins=30, edgecolor='white')
    ax3.axvline(y_resid.mean(), c='r')

    result = dict()
    result['r2_score'] = metrics.r2_score(y_true, y_pred)
    result['explained_variance_score'] = metrics.explained_variance_score(y_true, y_pred)
    result['mean_squared_error'] = metrics.mean_squared_error(y_true, y_pred)
    result['mean_absolute_error'] = metrics.mean_absolute_error(y_true, y_pred)
    result['median_absolute_error'] = metrics.median_absolute_error(y_true, y_pred)
    result['max_error'] = metrics.max_error(y_true, y_pred)
    result['mean_tweedie_deviance'] = metrics.mean_tweedie_deviance(y_true, y_pred)

    return result

# Predictions are the truth plus an error with mean 3 (bias) and scale 4 (variance).
n_samples = 300
y_true = np.random.normal(0, 10, size=n_samples)
prediction_error = np.random.normal(3, 4, size=n_samples) # bias, variance
y_pred = y_true + prediction_error
df = pd.DataFrame({'TRUE': y_true, 'PRED': y_pred})

reg_evaluation(df['TRUE'], df['PRED'])

 

Time Series

# [1, Theoretical Simple Assumption] y = f(X1, X2, X3, ...) + noise
# >>> (1-1) X means deterministic variables (explanatory variable)
# >>> (1-2) noise is independent probabilistic variable 
# >>> (1-3) the variance of y comes from noise

# [2, Experimental Calculation] y_hat = f_hat(X1, X2, X3, ...)
# >>> (2-1) X is given explanatory variables, the 'hat' means approximation

# [3] Estimation: y - y_hat = residual
# >>> (3-1) residual means approximation of noises
# >>> (3-2) if an estimator is well defined, the correlation between y_hat and the residual is expected to be 0.

# [4, Evaluation] endog:y = fittedvalues:y_hat + resid:residual
# >>> (4-1) variances of endog, fittedvalues, resid  
# >>> (4-2) variance of fittedvalues = f_hat(X1, X2, X3, ...): variance by propagation of uncertainty between explanatory variables with function(linear map or nonlinear map)

import numpy as np
import pandas as pd
import statsmodels
import statsmodels.tsa.api as smt

def tsa_evaluation(y_true, y_pred, num_params=1):
    """Compute sum-of-squares, correlation and error statistics for a fit.

    Parameters
    ----------
    y_true : pandas.Series or 1-D array-like
        Observed values (``endog``).
    y_pred : pandas.Series or 1-D array-like
        Fitted values (``fittedvalues``), in the same order as ``y_true``.
    num_params : int, default 1
        Number of estimated parameters. It divides ``ssr`` for ``msr`` and is
        subtracted from the sample size in the denominators of ``mse``,
        ``rmse``, ``mae`` and ``mape``.

    Returns
    -------
    dict
        Statistic name -> value.
    """
    def _to_series(values, index):
        # Leave Series untouched; wrap anything else so that Series methods exist.
        if isinstance(values, pd.Series):
            return values
        return pd.Series(np.asarray(values), index=index)

    # A plain array/list borrows the index of the argument that already is a
    # Series (if any), so the subtraction below pairs values by position.
    shared_index = next((v.index for v in (y_true, y_pred) if isinstance(v, pd.Series)), None)
    endog = _to_series(y_true, shared_index)
    fittedvalues = _to_series(y_pred, shared_index)
    resid = endog - fittedvalues
    dof_resid = resid.shape[0] - num_params

    result = dict()
    # result['sst'] = result['ssr'] + result['sse'] holds only for linear regression (least squares with intercept)
    # 1 = result['r2'] + result['k2'] holds by construction

    result['sst'] = ((endog - endog.mean())**2).sum() # total sum of squares
    result['ssr'] = ((fittedvalues - endog.mean())**2).sum() # sum of squares of the fit around the mean of endog
    result['sse'] = (resid**2).sum() # residual sum of squares

    result['sse/ssr'] = result['sse']/result['ssr'] # residual vs. fitted sum of squares
    result['ssr/sst:explained variance'] = result['ssr']/result['sst'] # fraction of variance explained
    result['sse/sst:unexplained variance'] = result['sse']/result['sst'] # fraction of variance unexplained (FVU)

    # r2 <= 1; it becomes negative when the fit is worse than predicting the mean of endog.
    result['r2'] = 1 - (result['sse']/result['sst']) # coefficient of determination
    result['k2'] = 1 - result['r2'] # coefficient of alienation

    cov_y_yhat = np.cov(endog, fittedvalues)
    result['y-yhat:corr'] = np.corrcoef(endog, fittedvalues)[0,1] # [ -1 <= correlation <= 1 ]
    result['y-yhat:cov'] = cov_y_yhat[0,1]
    result['y-yhat:leverage'] = cov_y_yhat[0,1]/cov_y_yhat[0,0] # cov(y, y_hat) / var(y)
    result['yhat-resid:corr'] = np.corrcoef(fittedvalues, resid)[0,1]
    result['yhat-resid:cov'] = np.cov(fittedvalues, resid)[0,1]

    result['mst'] = endog.var(ddof=1) # sample variance of endog
    result['msr'] = result['ssr'] / num_params
    result['mse'] = result['sse'] / dof_resid

    result['rmse'] = np.sqrt(result['sse'] / dof_resid)
    result['mae'] = resid.abs().sum() / dof_resid
    result['mape'] = (resid.abs() / endog.abs()).sum() / dof_resid # infinite when an observed value is 0
    return result

# Random-walk target on a daily index; the prediction adds a slower random walk plus white noise.
n_periods = 300
daily_index = pd.date_range('00:00:00', periods=n_periods, freq='D')
y_true = pd.Series(data=np.random.normal(0, 1, size=n_periods).cumsum(), index=daily_index)
drift_error = .3*np.random.normal(0, 1, size=n_periods).cumsum()
white_error = .1*np.random.normal(0, 1, size=n_periods)
y_pred = y_true + drift_error + white_error
# Built from the raw values, so the frame gets a default integer index.
df = pd.DataFrame({'TRUE': y_true.to_numpy(), 'PRED': y_pred.to_numpy()})

tsa_evaluation(df['TRUE'], df['PRED'], num_params=1)
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn import metrics
#from sklearn.metrics import roc_curve, precision_recall_curve, auc

def tsa_evaluation(y_true, y_pred):
    """Score a forecast as a binary up/down classification.

    Both series are converted to 0/1 labels: 1 where the expanding mean of the
    series, measured relative to its first value, is positive, else 0. The
    labels are then evaluated like any classifier output.

    Parameters
    ----------
    y_true, y_pred : pandas.Series
        Actual and predicted series, in the same order.

    Returns
    -------
    dict
        scikit-learn metric values keyed by metric name. Averaged metrics use
        ``average='micro'``.

    Notes
    -----
    The styled table is shown through ``display``, which this code expects to
    exist as a global (presumably the IPython/Jupyter builtin).
    """
    y_true = (y_true - y_true.iloc[0]).expanding(min_periods=1).mean().apply(lambda x: 1 if x > 0 else 0)
    y_pred = (y_pred - y_pred.iloc[0]).expanding(min_periods=1).mean().apply(lambda x: 1 if x > 0 else 0)

    # version 1: base confusion matrix (rows: actual, columns: predicted, with 'All' margins)
    confusion_matrix = pd.crosstab(index=[y_true], columns=[y_pred], margins=True, margins_name='All', dropna=False, normalize=False) # .unstack(level=0).stack(level=1)

    # version 2: extended confusion matrix
    # Row-normalised counts, appended as extra '<label>|A' columns.
    conditional_probability_for_actual = confusion_matrix.div(confusion_matrix.loc[:, 'All'], axis=0).rename(
        columns={label: str(label)+'|A' for label in confusion_matrix.columns})
    # Column-normalised counts, appended as extra '<label>|P' rows. The rename is
    # keyed on the ROW labels: keying it on the column labels leaves rows
    # unrenamed whenever one of the two series never takes a label the other
    # does, which yields a duplicated index that Styler cannot render.
    conditional_probability_for_prediction = confusion_matrix.div(confusion_matrix.loc['All', :], axis=1).rename(
        index={label: str(label)+'|P' for label in confusion_matrix.index})
    confusion_matrix = pd.concat([confusion_matrix, conditional_probability_for_actual], axis=1)
    confusion_matrix = pd.concat([confusion_matrix, conditional_probability_for_prediction], axis=0)
    display(confusion_matrix.iloc[:, :].style.background_gradient(cmap=sns.light_palette("green", as_cmap=True)))

    result = dict(); average='micro' # 'binary', 'macro', 'micro', 'weighted', 'samples'
    result['confusion_matrix'] = metrics.confusion_matrix(y_true, y_pred)
    result['accuracy_score'] = metrics.accuracy_score(y_true, y_pred)
    result['balanced_accuracy_score'] = metrics.balanced_accuracy_score(y_true, y_pred)
    result['recall_score'] = metrics.recall_score(y_true, y_pred, average=average)
    result['precision_score'] = metrics.precision_score(y_true, y_pred, average=average)
    result['f1_score'] = metrics.f1_score(y_true, y_pred, average=average) # precision&recall trade-off
    result['fbeta_score'] = metrics.fbeta_score(y_true, y_pred, beta=2, average=average)
    result['jaccard_score'] = metrics.jaccard_score(y_true, y_pred, average=average)
    result['hamming_loss'] = metrics.hamming_loss(y_true, y_pred)
    result['matthews_corrcoef'] = metrics.matthews_corrcoef(y_true, y_pred)
    result['multilabel_confusion_matrix'] = metrics.multilabel_confusion_matrix(y_true, y_pred)
    result['zero_one_loss'] = metrics.zero_one_loss(y_true, y_pred)
    return result


# Random-walk target on a daily index; the prediction adds a slower random walk plus white noise.
n_periods = 300
daily_index = pd.date_range('00:00:00', periods=n_periods, freq='D')
y_true = pd.Series(data=np.random.normal(0, 1, size=n_periods).cumsum(), index=daily_index)
drift_error = .5*np.random.normal(0, 1, size=n_periods).cumsum()
white_error = .1*np.random.normal(0, 1, size=n_periods)
y_pred = y_true + drift_error + white_error
# Built from the raw values, so the frame gets a default integer index.
df = pd.DataFrame({'TRUE': y_true.to_numpy(), 'PRED': y_pred.to_numpy()})

tsa_evaluation(df['TRUE'], df['PRED'])

 

 

 


Task: Binary Classification

: precision&recall trade-off

Models

from sklearn.datasets import make_classification, make_regression
from sklearn import linear_model, ensemble, naive_bayes, tree, neighbors, discriminant_analysis, svm, neural_network

X, y = make_classification(n_samples=3000, n_features=10, n_classes=2, n_clusters_per_class=1, weights=[.6, .4], flip_y=0)

# Catalogue of interchangeable binary classifiers, instantiated in order.
# The last entry is the one that is fitted below.
candidate_classifiers = [
    linear_model.LogisticRegression(penalty='l2', max_iter=500),
    ensemble.AdaBoostClassifier(),
    ensemble.GradientBoostingClassifier(),
    ensemble.BaggingClassifier(),
    ensemble.ExtraTreesClassifier(),
    ensemble.RandomForestClassifier(),
    tree.DecisionTreeClassifier(),
    tree.ExtraTreeClassifier(),
    neighbors.KNeighborsClassifier(),
    neighbors.RadiusNeighborsClassifier(),
    naive_bayes.GaussianNB(),
    discriminant_analysis.LinearDiscriminantAnalysis(),
    discriminant_analysis.QuadraticDiscriminantAnalysis(),
    # kernel: 'linear', 'poly', 'rbf', 'sigmoid', 'precomputed'; default='rbf'
    svm.SVC(kernel='poly', probability=True, max_iter=10000),
    neural_network.MLPClassifier(hidden_layer_sizes=(100,), max_iter=100, activation='relu', solver='adam', learning_rate='adaptive'),
]
classifier = candidate_classifiers[-1]

classifier.fit(X, y)
classifier.predict(X)
classifier.predict_proba(X)

Metrics

from sklearn import metrics
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import make_classification

# Imbalanced two-class problem (requested class weights 10% / 90%).
X, y = make_classification(n_samples=300, n_features=8, n_informative=5, n_redundant=1, n_repeated=1, n_classes=2, n_clusters_per_class=1, weights=[1/10, 9/10])
classifier = LogisticRegression()
classifier.fit(X, y)

# Predictions are made on the rows used for fitting, so these are in-sample scores.
y_true = y 
y_pred = classifier.predict(X)

## CLASSIFICATION
# average=None asks for one score per class instead of a single aggregated value.
metrics.confusion_matrix(y_true, y_pred)
metrics.accuracy_score(y_true, y_pred)
metrics.balanced_accuracy_score(y_true, y_pred)
metrics.recall_score(y_true, y_pred, average=None)
metrics.precision_score(y_true, y_pred, average=None)
metrics.f1_score(y_true, y_pred, average=None)
metrics.fbeta_score(y_true, y_pred, beta=2, average=None)
metrics.jaccard_score(y_true, y_pred, average=None)
metrics.matthews_corrcoef(y_true, y_pred)
metrics.hamming_loss(y_true, y_pred)
metrics.multilabel_confusion_matrix(y_true, y_pred)
metrics.zero_one_loss(y_true, y_pred)

Pipelines

#

Validation

# https://scikit-learn.org/stable/modules/model_evaluation.html

import pandas as pd
from sklearn.datasets import make_classification
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.preprocessing import PowerTransformer, QuantileTransformer, StandardScaler, Normalizer, RobustScaler, MinMaxScaler, MaxAbsScaler
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.model_selection import cross_val_score, cross_validate, GridSearchCV, StratifiedKFold, RepeatedStratifiedKFold
from sklearn.metrics import roc_curve, precision_recall_curve, auc

def parameter_validation_scores(cv_results):
    """Tabulate a ``GridSearchCV.cv_results_``-style mapping, one row per parameter candidate.

    Parameters
    ----------
    cv_results : dict
        Mapping of result name to a per-candidate sequence. Keys ending in
        ``train_score``, ``test_score`` or ``time`` are collected; the
        ``'params'`` entry (a list of parameter dicts) becomes the row index.

    Returns
    -------
    pandas.DataFrame
        Columns are a two-level MultiIndex ``(group, name)`` with group in
        ``'train_score'``, ``'test_score'``, ``'time'`` and the name being the
        key with its ``_<group>`` suffix removed (e.g. ``'split0'``, ``'mean'``).
        A group with no matching keys is skipped, so results produced with
        ``return_train_score=False`` are accepted.
    """
    def collect(suffix):
        # Keys are filtered by suffix; the short name drops the trailing '_<suffix>'.
        tail = '_' + suffix
        keys = [key for key in cv_results if key.endswith(suffix)]
        if not keys:
            return None
        frame = pd.DataFrame({key: pd.Series(cv_results[key]) for key in keys})
        frame.columns = pd.MultiIndex.from_tuples(
            [(suffix, key[:-len(tail)] if key.endswith(tail) else key) for key in keys]
        )
        return frame

    blocks = [collect('train_score'), collect('test_score'), collect('time')]
    scores = pd.concat([block for block in blocks if block is not None], axis=1)
    scores.index = pd.MultiIndex.from_frame(pd.DataFrame(cv_results['params']))
    return scores

def scenario_validation_scores(cv_results):
    """Tabulate a ``cross_validate``-style result mapping, one row per CV split.

    Parameters
    ----------
    cv_results : dict
        Mapping of result name to a per-split sequence. Keys starting with
        ``train`` or ``test`` and keys ending in ``time`` are collected.

    Returns
    -------
    pandas.DataFrame
        Columns are a two-level MultiIndex ``(group, name)`` with group in
        ``'train_score'``, ``'test_score'``, ``'time'``. Only the leading
        ``train_`` / ``test_`` prefix (or trailing ``_time`` suffix) is removed
        from the key. A group with no matching keys is skipped, so results
        produced with ``return_train_score=False`` are accepted. The row index
        is named ``'split'``.
    """
    def strip_prefix(key, prefix):
        return key[len(prefix):] if key.startswith(prefix) else key

    def strip_suffix(key, suffix):
        return key[:-len(suffix)] if key.endswith(suffix) else key

    def build(label, pairs):
        # pairs: [(key in cv_results, short column name), ...]
        if not pairs:
            return None
        frame = pd.DataFrame({key: pd.Series(cv_results[key]) for key, _ in pairs})
        frame.columns = pd.MultiIndex.from_tuples([(label, short) for _, short in pairs])
        return frame

    blocks = [
        build('train_score', [(key, strip_prefix(key, 'train_')) for key in cv_results if key.startswith('train')]),
        build('test_score', [(key, strip_prefix(key, 'test_')) for key in cv_results if key.startswith('test')]),
        build('time', [(key, strip_suffix(key, '_time')) for key in cv_results if key.endswith('time')]),
    ]
    scores = pd.concat([block for block in blocks if block is not None], axis=1)
    scores.index.name = 'split'  # Repeated Stratified KFold: n_repeats * k_fold rows
    return scores

X, y = make_classification(n_samples=10000, n_features=10, n_classes=2, weights=[0.6, 0.4], flip_y=0)
cv = RepeatedStratifiedKFold(n_splits=5, n_repeats=3, random_state=None) # StratifiedKFold(n_splits=5, shuffle=False, random_state=None) # cross validation & randomness control

# version1: multiple parameters
# The scoring list documents the available binary scorers; [-3] selects 'roc_auc',
# so the grid search is single-metric and produces '*_test_score' / '*_train_score' keys.
classifier = make_pipeline(PowerTransformer(method='yeo-johnson', standardize=True), Normalizer(), LinearDiscriminantAnalysis())
classifier = GridSearchCV(estimator=classifier, param_grid={'lineardiscriminantanalysis__priors':[(.1, .9), (.5, .5), None]}, cv=cv, scoring=['accuracy', 'balanced_accuracy', 'recall', 'average_precision', 'precision', 'f1', 'jaccard', 'roc_auc', 'roc_auc_ovr', 'roc_auc_ovo'][-3], return_train_score=True)
classifier.fit(X, y)
parameter_validation_scores(classifier.cv_results_)

# version2: multiple scoring
# v.s. cross_val_score(classifier, X, y, scoring='f1', cv=cv, n_jobs=-1, verbose=0)
# `fit_params` is no longer passed: it was only ever set to its default (None) and
# recent scikit-learn releases replaced that keyword with `params`.
classifier = Pipeline([('powertransformer', PowerTransformer(method='yeo-johnson', standardize=True)), ('normalizer', Normalizer()), ('lineardiscriminantanalysis', LinearDiscriminantAnalysis())])
cv_results = cross_validate(classifier, X, y, scoring=['accuracy', 'balanced_accuracy', 'recall', 'average_precision', 'precision', 'f1', 'jaccard', 'roc_auc', 'roc_auc_ovr', 'roc_auc_ovo'], cv=cv, return_train_score=True, return_estimator=False, n_jobs=-1, verbose=0)
scenario_validation_scores(cv_results)

# additional: trade-off metrics
# Curves are computed on the data the pipeline was just fitted on (in-sample).
y_true = y.copy()
y_prob = classifier.fit(X,y).predict_proba(X).copy()
fpr, tpr1, thresholds1 = roc_curve(y_true, y_prob[:,1])
ppv, tpr2, thresholds2 = precision_recall_curve(y_true, y_prob[:,1])
roc_auc = auc(fpr, tpr1) # X-axis(fpr): fall-out / y-axis(tpr): recall
pr_auc = auc(tpr2, ppv) # X-axis(tpr): recall / y-axis(ppv): precision

Evaluation

from datetime import datetime
from functools import wraps
import joblib
import os
from tqdm import tqdm
from copy import deepcopy
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.compose import ColumnTransformer, make_column_transformer
from sklearn.pipeline import FeatureUnion, Pipeline, make_pipeline, make_union
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import FunctionTransformer, Binarizer, KBinsDiscretizer, OrdinalEncoder, OneHotEncoder, PolynomialFeatures, SplineTransformer, LabelBinarizer, LabelEncoder
from sklearn.preprocessing import PowerTransformer, QuantileTransformer, StandardScaler, Normalizer, RobustScaler, MinMaxScaler, MaxAbsScaler
from sklearn import decomposition
from sklearn import linear_model, naive_bayes, discriminant_analysis, svm, neighbors, tree, ensemble
from sklearn.feature_selection import chi2, f_classif, f_regression, r_regression, mutual_info_classif, mutual_info_regression
from sklearn.feature_selection import SelectKBest, SelectPercentile, SelectFromModel, RFE, RFECV, SequentialFeatureSelector
from sklearn.model_selection import train_test_split, cross_validate, GridSearchCV, StratifiedKFold, RepeatedStratifiedKFold
from sklearn import metrics

class ResultVisualizer:
    """Static plotting helpers for comparing modelling scenarios."""

    @staticmethod
    def training_scenario(cv_results):
        """Draw box plots of train/validation scores per scenario and return the FacetGrid.

        ``cv_results`` is a mapping ``scenario name -> {score name -> per-split values}``
        (in this file it is filled from ``cross_validate`` outputs). Only score
        names starting with ``train`` or ``test`` are plotted; timing entries are dropped.
        """
        # Columns = scenarios, rows = score names, each cell holds the per-split values.
        cv_results = pd.DataFrame(cv_results)
        cv_1st_normalization = list()
        for name in cv_results.keys():
            # Expand each cell into one column per split, then melt to long form:
            # one row per (score_name, split) with the value in 'score'.
            cv_result = cv_results[name].apply(lambda x: pd.Series(x)).stack(0).to_frame().reset_index().rename(columns={0:'score', 'level_0':'score_name', 'level_1':'split'})
            cv_result['scenario'] = name
            cv_1st_normalization.append(cv_result)

        cv_result = pd.concat(cv_1st_normalization, axis=0).reset_index(drop=True)
        # 'domain' is first used as a temporary flag to discard non-score rows (e.g. timings) ...
        cv_result['domain'] = cv_result['score_name'].apply(lambda x: 'target' if x.startswith('train') or x.startswith('test') else 'nontarget')
        cv_result = cv_result.loc[lambda x: x['domain'] == 'target'].copy().reset_index(drop=True) 
        # ... and is then overwritten with the facet-row label.
        cv_result['domain'] = cv_result['score_name'].apply(lambda x: 'train' if x.startswith('train') else 'validation')
        # Everything after the first '_' is the metric name, e.g. 'test_f1' -> 'f1'.
        cv_result['scoring_type'] = cv_result['score_name'].apply(lambda x: '_'.join(x.split('_')[1:]))

        sns.set_theme(style="ticks") # white, dark, whitegrid, darkgrid, ticks
        g = sns.FacetGrid(cv_result, col="scoring_type",  row="domain", aspect=2, height=3) 
        g.map_dataframe(sns.boxplot, x="scenario", y="score", hue='split') # x: numerical variable, y: numerical variable, hue: categorical variable
        g.fig.suptitle('Scenario Evaluation')
        g.add_legend()
        g.tight_layout()
        return g
    
    @staticmethod
    def tradeoff_curve(curve_points):        
        """Scatter the ROC and precision-recall points of every scenario side by side.

        ``curve_points`` maps ``scenario -> (y_true, y_prob)``; column 1 of
        ``y_prob`` is used as the score of the positive class. Nothing is returned.
        """
        roc_curve_points_by_scenario = list()
        pr_curve_points_by_scenario = list()
        for scenario, (y_true, y_prob) in curve_points.items():
            roc_curve_points = metrics.roc_curve(y_true, y_prob[:, 1]) # ['FPR', 'TPR', 'Threshold']
            roc_curve_points = pd.DataFrame(roc_curve_points).T
            roc_curve_points.columns = ['FPR', 'TPR', 'Threshold']
            roc_curve_points.insert(0, 'Scenario', scenario)
            roc_curve_points_by_scenario.append(roc_curve_points)
            
            pr_curve_points = metrics.precision_recall_curve(y_true, y_prob[:, 1]) # ['PPV', 'TPR', 'Threshold']
            pr_curve_points = pd.DataFrame(pr_curve_points).T
            pr_curve_points.columns = ['PPV', 'TPR', 'Threshold']
            pr_curve_points.insert(0, 'Scenario', scenario)
            pr_curve_points_by_scenario.append(pr_curve_points)
            
        roc_curve_points_by_scenario = pd.concat(roc_curve_points_by_scenario, axis=0)
        pr_curve_points_by_scenario = pd.concat(pr_curve_points_by_scenario, axis=0)
                
        plt.figure(figsize=(30,5))
        # Left panel (ROC): ticks and labels on the top and left edges.
        ax0 = plt.subplot2grid((1,2), (0,0))
        ax0.spines['bottom'].set_visible(False)
        ax0.spines['right'].set_visible(False)
        ax0.tick_params(top=True, bottom=False, left=True, right=False, labeltop=True, labelbottom=False, labelleft=True, labelright=False)

        # Right panel (PR): ticks and labels on the top and right edges.
        ax1 = plt.subplot2grid((1,2), (0,1))        
        ax1.spines['bottom'].set_visible(False)
        ax1.spines['left'].set_visible(False)
        ax1.tick_params(top=True, bottom=False, left=False, right=True, labeltop=True, labelbottom=False, labelleft=False, labelright=True)

        sns.scatterplot(data=roc_curve_points_by_scenario, x='FPR', y='TPR', hue='Scenario', ax=ax0)
        sns.scatterplot(data=pr_curve_points_by_scenario, x='TPR', y='PPV', hue='Scenario', ax=ax1)
        plt.tight_layout()
        

def evaluation(*args, **kwargs):
    """Decorator factory that logs binary-classification metrics of a prediction function.

    The factory arguments are accepted but not used. The decorated function is
    called as ``func(model, X, y)`` and must return ``(y, y_pred, y_prob)``
    with labels encoded as 0 / 1.

    The returned wrapper accepts ``model, X, y`` plus the bookkeeping keywords
    ``model_name, model_info, domain_kind, scenario, note`` (stored verbatim in
    the log), ``reposit`` (dump ``model`` with joblib into ``model_repository/``)
    and ``verbose`` (show the accumulated log). It appends one row to
    ``model_repository/.evaluation.csv`` and returns
    ``(y_pred, y_prob, evaluation_log_copy)``.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(model, X, y, model_name=None, model_info=None, domain_kind=None, scenario=None, note=None, reposit=False, verbose=False):
            model_repository = 'model_repository'
            log_path = os.path.join(model_repository, '.evaluation.csv')
            current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

            # The CSV log is written on every call, so the directory must exist
            # even when the model itself is not being stored.
            os.makedirs(model_repository, exist_ok=True)
            if reposit:
                timestamp = current_time.replace('-', '').replace(' ', '').replace(':', '')
                saving_name = ('' if model_name is None else str(model_name)) + timestamp
                joblib.dump(model, os.path.join(model_repository, f'pipeline_{saving_name}.joblib'))

            y, y_pred, y_prob = func(model, X, y)

            # Force a 2x2 table ordered [0, 1] on both axes; a class that never
            # occurs (in y or in y_pred) becomes a zero row/column in the right place,
            # so the positional lookups below always mean what their names say.
            confusion_matrix = pd.crosstab(index=y, columns=y_pred).reindex(index=[0, 1], columns=[0, 1], fill_value=0)
            TN = confusion_matrix.iloc[0, 0]  # actual 0, predicted 0
            FP = confusion_matrix.iloc[0, 1]  # actual 0, predicted 1
            FN = confusion_matrix.iloc[1, 0]  # actual 1, predicted 0
            TP = confusion_matrix.iloc[1, 1]  # actual 1, predicted 1

            summary = {
                'datetime': [current_time],
                'model': [model_name],
                'information': [model_info],
                'scenario': [scenario],
                'domain_kind': [domain_kind],
                'total': [y.shape[0]],
                'true_positive': [TP],
                'true_negative': [TN],
                'false_positive': [FP],
                'false_negative': [FN],
                'accuracy': [metrics.accuracy_score(y, y_pred)],
                'precision': [metrics.precision_score(y, y_pred)],
                'recall': [metrics.recall_score(y, y_pred)],
                'f1': [metrics.f1_score(y, y_pred)],
                'note': [note],
            }

            record = pd.DataFrame(summary)
            if os.path.exists(log_path):
                history = pd.concat([pd.read_csv(log_path), record], axis=0).reset_index(drop=True)
            else:
                history = record
            history.to_csv(log_path, index=False)

            if verbose:
                # `display` only exists inside IPython/Jupyter; fall back to print elsewhere.
                try:
                    display(history)
                except NameError:
                    print(history)

            return y_pred, y_prob, history.copy()
        return wrapper
    return decorator

@evaluation(description="my_description")
def prediction_with(model, X, y, model_name=None, model_info=None, domain_kind=None, scenario=None, note=None, reposit=False, verbose=False):
    """Return ``(y, predicted labels, predicted class probabilities)`` for ``X``.

    The keyword arguments are not read here; the ``evaluation`` wrapper takes
    the same keywords and calls this function with ``(model, X, y)`` only.
    """
    predicted_labels = model.predict(X)
    predicted_probabilities = model.predict_proba(X)
    return (y, predicted_labels, predicted_probabilities)

class CategoricalColumnTransformer(BaseEstimator, TransformerMixin):
    """Select the most informative categorical columns, then one-hot encode them.

    The inner pipeline is ``SelectPercentile(mutual_info_classif)`` followed by
    ``OneHotEncoder(handle_unknown='ignore', sparse_output=False)``.

    The pipeline is built and fitted in ``fit`` on the data passed to it, not in
    ``__init__`` on module-level globals. This lets scikit-learn clone the
    transformer and refit it on each cross-validation training fold, so the
    feature selection never sees the validation labels.

    Parameters
    ----------
    percentile : int, default=30
        Percentage of columns kept by the selector.
    """
    def __init__(self, percentile=30):
        # Only store hyper-parameters here so get_params/clone behave as expected.
        self.percentile = percentile

    def fit(self, X, y=None):
        """Fit the selector and encoder on ``X`` (``y`` is required by the selector)."""
        self.base = make_pipeline(
            SelectPercentile(mutual_info_classif, percentile=self.percentile),
            OneHotEncoder(handle_unknown='ignore', sparse_output=False),
        ).fit(X, y)
        return self

    def transform(self, X):
        """Apply the fitted selection and encoding; ``fit`` must have been called."""
        return self.base.transform(X)

    
class NumericalColumnTransformer(BaseEstimator, TransformerMixin):
    """Select the most informative numerical columns, then expand them with cubic splines.

    The inner pipeline is ``SelectPercentile(f_classif)`` followed by a
    ``SplineTransformer`` (degree 3, 5 uniform knots, constant extrapolation).

    The pipeline is built and fitted in ``fit`` on the data passed to it, not in
    ``__init__`` on module-level globals. This lets scikit-learn clone the
    transformer and refit it on each cross-validation training fold, so the
    feature selection never sees the validation labels.

    Parameters
    ----------
    percentile : int, default=30
        Percentage of columns kept by the selector.
    """
    def __init__(self, percentile=30):
        # Only store hyper-parameters here so get_params/clone behave as expected.
        self.percentile = percentile

    def fit(self, X, y=None):
        """Fit the selector and spline basis on ``X`` (``y`` is required by the selector)."""
        self.base = make_pipeline(
            SelectPercentile(f_classif, percentile=self.percentile),
            SplineTransformer(degree=3, n_knots=5, knots=['uniform', 'quantile'][0], extrapolation=['error', 'constant', 'linear', 'continue', 'periodic'][1], include_bias=True, order=['C', 'F'][0]),
        ).fit(X, y)
        return self

    def transform(self, X):
        """Apply the fitted selection and spline expansion; ``fit`` must have been called."""
        return self.base.transform(X)


# data preprocessing
# The first five generated features are binned into 10 ordinal levels to act as
# categorical inputs; the remaining five stay continuous.
categorical_columns = ['X0', 'X1', 'X2', 'X3', 'X4']
numerical_columns = ['X5', 'X6', 'X7', 'X8', 'X9']
X, y = make_classification(n_samples=1000, n_features=10, n_classes=2, weights=[0.6, 0.4], flip_y=0, random_state=0)
data = pd.DataFrame(np.c_[X, y], columns=categorical_columns + numerical_columns + ['y'])
data.loc[:, categorical_columns] = KBinsDiscretizer(n_bins=10, encode=['ordinal', 'onehot-dense', 'onehot', ][0], strategy=['uniform', 'quantile', 'kmeans'][0]).fit_transform(data[categorical_columns])
X = data.loc[:, data.columns != 'y']
y = data.loc[:, data.columns == 'y']['y']
# `shuffle` must be a real boolean: None is rejected by parameter validation in
# recent scikit-learn, while older releases treated it like True (only an explicit
# False disables shuffling). True therefore keeps the same shuffled 80/20 split.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=.2, shuffle=True, stratify=None, random_state=0)


# modeling through pipelining
# Free text stored in the 'note' column of the evaluation log.
notification = """NOTE"""
# Pass-through used with FunctionTransformer to leave columns untouched.
def identity(X): return X
cct = CategoricalColumnTransformer()
nct = NumericalColumnTransformer()

# One pipeline per scenario: a column-wise preprocessing step followed by a classifier.
# make_pipeline names each step after its lowercased class name; the last step's
# name is what the training loop uses to look up the matching parameter grid.
pipelines = dict()
pipelines['scenario_1'] = make_pipeline(make_column_transformer((cct, categorical_columns), (nct, numerical_columns)), linear_model.LogisticRegression()) 
pipelines['scenario_2'] = make_pipeline(make_column_transformer((cct, categorical_columns), (KBinsDiscretizer(n_bins=10, encode=['ordinal', 'onehot-dense', 'onehot', ][2], strategy=['uniform', 'quantile', 'kmeans'][0]), numerical_columns)), naive_bayes.MultinomialNB()) 
pipelines['scenario_3'] = make_pipeline(make_column_transformer((FunctionTransformer(identity), categorical_columns), (nct, numerical_columns)), discriminant_analysis.LinearDiscriminantAnalysis()) 
pipelines['scenario_4'] = make_pipeline(make_column_transformer((cct, categorical_columns), (nct, numerical_columns)), svm.SVC()) 
pipelines['scenario_5'] = make_pipeline(make_column_transformer((cct, categorical_columns), (nct, numerical_columns)), neighbors.KNeighborsClassifier()) 
pipelines['scenario_6'] = make_pipeline(make_column_transformer((cct, categorical_columns), (nct, numerical_columns)), tree.ExtraTreeClassifier()) 
pipelines['scenario_7'] = make_pipeline(make_column_transformer((cct, categorical_columns), (nct, numerical_columns)), ensemble.GradientBoostingClassifier()) 

# syntax: <estimator>__<parameter>
# Grids are keyed by the pipeline's final step name, so each key must match the
# lowercased class name of the classifier used in `pipelines`.
param_grids = dict()
param_grids['logisticregression'] = {
    'logisticregression__fit_intercept' : [False],
    'logisticregression__solver' : ['newton-cholesky'],
    'logisticregression__C':[.001, .005, .007,]
}
param_grids['multinomialnb'] = {
    'multinomialnb__class_prior':[(.1, .9), (.2, .8), (.3, .7)],
}
param_grids['lineardiscriminantanalysis'] = {
    'lineardiscriminantanalysis__priors':[(.1, .9), (.2, .8), (.3, .7)],
}
# probability=True is needed because the evaluation step calls predict_proba.
param_grids['svc'] = {
    'svc__probability':[True],
    'svc__C' : [.01, .05, .07],
    'svc__kernel' : ['rbf'],
}
param_grids['kneighborsclassifier'] = {
    'kneighborsclassifier__n_neighbors': [20, 30],
    'kneighborsclassifier__metric': ['minkowski'],    
    'kneighborsclassifier__weights': ['uniform'],
    'kneighborsclassifier__algorithm': ['auto'],
    'kneighborsclassifier__p' : [1],
    'kneighborsclassifier__leaf_size' : [30, 50, 100],
}
param_grids['extratreeclassifier'] = {
    'extratreeclassifier__max_depth':[10, 20, 30], 
    'extratreeclassifier__min_impurity_decrease':[.01, .05, .1],    
}
param_grids['gradientboostingclassifier'] = {
    'gradientboostingclassifier__n_estimators':[10, 30, 50], 
    'gradientboostingclassifier__subsample':[.7,.8, 1], 
    'gradientboostingclassifier__min_samples_split':[2], 
    'gradientboostingclassifier__min_impurity_decrease':[.01, .05, .1],    
}



# https://scikit-learn.org/stable/modules/model_evaluation.html
#binary_scoring = ['accuracy', 'balanced_accuracy', 'recall', 'average_precision', 'precision', 'f1', 'jaccard', 'roc_auc', 'roc_auc_ovr', 'roc_auc_ovo']
cv_results = dict(); curve_points = dict()
cross_validation = RepeatedStratifiedKFold(n_splits=5, n_repeats=3,random_state=0) # StratifiedKFold(n_splits=5, shuffle=False, random_state=None)
# Iterate over a deep copy so `pipelines` can be updated with the fitted searches inside the loop.
for scenario, pipeline in tqdm(deepcopy(pipelines).items()):    
    # The final step name (e.g. 'logisticregression') selects the parameter grid.
    model_name = list(pipeline.named_steps.keys())[-1]
    param_grid = param_grids[model_name]

    # [STEP1]: validation for multiple parameter
    pipeline = GridSearchCV(estimator=pipeline, param_grid=param_grid, cv=cross_validation, scoring='accuracy', return_train_score=True)
    pipeline.fit(X_train, y_train)
    pipelines.update({scenario:pipeline})
    #pipeline.cv_results_
    #pipeline.best_params_
    #pipeline.best_estimator_
    #pipeline.best_score_
        
    # [STEP2]: prediction & evaluation
    # NOTE: the third return value rebinds the module-level name `evaluation`
    # (the decorator factory defined above) to the log DataFrame.
    scenario_description = '/'.join(list(map(lambda step: step[0], pipeline.get_params()['estimator__steps'])))
    best_param = '/'.join(list(map(lambda param: '_'.join(param[0].split('__')[1:]) + ': '+str(param[1]), pipeline.best_params_.items())))
    y_train_pred, y_train_prob, evaluation = prediction_with(pipeline, X_train, y_train, model_name=model_name, model_info=best_param, domain_kind='train', scenario=scenario_description, reposit=True, note=notification)
    y_test_pred, y_test_prob, evaluation = prediction_with(pipeline, X_test, y_test, model_name=model_name, model_info=best_param, domain_kind='test', scenario=scenario_description, note=notification)
    
    # [STEP3]: scenario visualization data
    # `fit_params` is no longer passed: it was only ever set to its default (None)
    # and recent scikit-learn releases replaced that keyword with `params`.
    cv_results[scenario] = cross_validate(pipeline.best_estimator_, X_train, y_train, scoring=['accuracy', 'recall', 'precision', 'f1'], cv=cross_validation, return_train_score=True, return_estimator=False, n_jobs=-1, verbose=0) # cross-validation multiple scoring
    curve_points[scenario] = (y_test, y_test_prob)

    
ResultVisualizer.training_scenario(cv_results)  
ResultVisualizer.tradeoff_curve(curve_points)

#y_pred, y_prob, evaluation = prediction_with(pipelines['scenario_1'], X_test[:10], y_test[:10], note='virtual_test', reposit=False, verbose=True)
# Two log rows (train + test) are written per scenario.
evaluation.tail(int(len(pipelines)*2))

Binary Confusion Matrix

import numpy as np
import pandas as pd
import seaborn as sns
from sklearn import metrics
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import make_classification

X, y = make_classification(
    n_samples=1000, n_features=3, n_informative=2, n_redundant=1, n_repeated=0, n_classes=2, n_clusters_per_class=1, 
    weights=[.6, .4], flip_y=0, class_sep=1.0, hypercube=True, shift=0.0, scale=1.0, shuffle=True, random_state=0)

classifier = LogisticRegression(penalty='l1', solver='liblinear')
classifier.fit(X, y)

# In-sample predictions: the model is scored on the rows it was fitted on.
y_true = y 
y_pred = classifier.predict(X)
df = pd.DataFrame(np.c_[y_true, y_pred], columns=['ACTUAL', 'PRED'])



# version 1: base confusion matrix
# Rows = actual class, columns = predicted class; margins=True adds the 'All' totals.
confusion_matrix = pd.crosstab(index=[df['ACTUAL']], columns=[df['PRED']], margins=True, margins_name='All', dropna=False, normalize=False) # .unstack(level=0).stack(level=1)

# version 2: extended confusion matrix
# '<label>|A' columns: every row divided by its row total, i.e. shares conditional on the actual class.
conditional_probability_for_actual = confusion_matrix.div(confusion_matrix.loc[:, 'All'], axis=0).rename(columns=dict(map(lambda x: (x, str(x)+'|A'), confusion_matrix.columns)))
# '<label>|P' rows: every column divided by its column total, i.e. shares conditional on the prediction.
# The row rename is built from the column labels, so it relies on rows and columns sharing the same labels.
conditional_probability_for_prediction = confusion_matrix.div(confusion_matrix.loc['All', :], axis=1).rename(index=dict(map(lambda x: (x, str(x)+'|P'), confusion_matrix.columns)))
# Append the '|A' block to the right, then the '|P' block underneath.
confusion_matrix = pd.concat([confusion_matrix, conditional_probability_for_actual], axis=1)
confusion_matrix = pd.concat([confusion_matrix, conditional_probability_for_prediction], axis=0)
confusion_matrix.iloc[:, :].style.background_gradient(cmap=sns.light_palette("green", as_cmap=True))

ROC/PR Curve for Binary Classification

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.metrics import roc_curve, precision_recall_curve, auc

X, y = make_classification(n_samples=100, n_features=5, n_informative=2, n_redundant=1, n_repeated=1, n_classes=2, n_clusters_per_class=1)
classifier = LogisticRegression()
classifier.fit(X, y)

# In-sample predictions: the model is scored on the rows it was fitted on.
y_true = y 
y_prob = classifier.predict_proba(X)
y_pred = classifier.predict(X)

# Operating point of `predict`, read from the confusion matrix
# (rows = actual class, columns = predicted class: [[TN, FP], [FN, TP]]).
# NOTE: this rebinds the imported `confusion_matrix` function to its result.
confusion_matrix = confusion_matrix(y_true, y_pred)
recall = confusion_matrix[1, 1]/(confusion_matrix[1, 0]+confusion_matrix[1, 1])     # TP / (TP + FN)
fallout = confusion_matrix[0, 1]/(confusion_matrix[0, 0]+confusion_matrix[0, 1])    # FP / (FP + TN)
precision = confusion_matrix[1, 1]/(confusion_matrix[0, 1]+confusion_matrix[1, 1])  # TP / (TP + FP)
fpr, tpr1, thresholds1 = roc_curve(y_true, y_prob[:,1])
ppv, tpr2, thresholds2 = precision_recall_curve(y_true, y_prob[:,1])

# visualization
print('- ROC AUC:', auc(fpr, tpr1))
print('- PR AUC:', auc(tpr2, ppv))
print(classification_report(y_true, y_pred, target_names=['down', 'up']))
plt.figure(figsize=(25, 7))
ax0 = plt.subplot2grid((1,2), (0,0))
ax1 = plt.subplot2grid((1,2), (0,1))

ax0.plot(fpr, tpr1, 'o-') # X-axis(fpr): fall-out / y-axis(tpr): recall
ax0.plot([fallout], [recall], 'bo', ms=10) # operating point of classifier.predict
ax0.plot([0, 1], [0, 1], 'k--')
ax0.set_xlabel('Fall-Out')
ax0.set_ylabel('Recall')

ax1.plot(tpr2, ppv, 'o-') # X-axis(tpr): recall / y-axis(ppv): precision
ax1.plot([recall], [precision], 'bo', ms=10) # operating point of classifier.predict
ax1.plot([0, 1], [1, 0], 'k--')
ax1.set_xlabel('Recall')
ax1.set_ylabel('Precision')
plt.show()

 

 

 


Task: Multi-Class Classification

: precision&recall trade-off

Models

from sklearn.datasets import make_classification, make_regression
from sklearn import linear_model, ensemble, naive_bayes, tree, neighbors, discriminant_analysis, svm, neural_network

# Three-class toy problem with a 60/30/10 class split and no label noise (flip_y=0).
X, y = make_classification(n_samples=3000, n_features=10, n_classes=3, n_clusters_per_class=1, weights=[.6, .3, .1], flip_y=0)

# Catalogue of candidate estimators. Every assignment rebinds `classifier`, so
# only the LAST one (MLPClassifier) is actually fitted below; keep a single line
# (or comment out the rest) to try a different model.
classifier = linear_model.LogisticRegression(penalty='l2', max_iter=500)
classifier = ensemble.AdaBoostClassifier()
classifier = ensemble.GradientBoostingClassifier()
classifier = ensemble.BaggingClassifier()
classifier = ensemble.ExtraTreesClassifier()
classifier = ensemble.RandomForestClassifier()
classifier = tree.DecisionTreeClassifier()
classifier = tree.ExtraTreeClassifier()
classifier = neighbors.KNeighborsClassifier()
classifier = neighbors.RadiusNeighborsClassifier()
classifier = naive_bayes.GaussianNB()
classifier = discriminant_analysis.LinearDiscriminantAnalysis()
classifier = discriminant_analysis.QuadraticDiscriminantAnalysis()
classifier = svm.SVC(kernel='poly', probability=True, max_iter=10000) # kernel{‘linear’, ‘poly’, ‘rbf’, ‘sigmoid’, ‘precomputed’}, default=’rbf’
classifier = neural_network.MLPClassifier(hidden_layer_sizes=(100,), max_iter=100, activation='relu', solver='adam', learning_rate='adaptive')

# Fit on the full data set, then predict labels and class probabilities for the same rows.
classifier.fit(X, y)
classifier.predict(X)
classifier.predict_proba(X)

Metrics

from sklearn import metrics
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import make_classification

# Five-class problem with unequal requested class weights (10/30/20/10/30 %).
X, y = make_classification(n_samples=300, n_features=8, n_informative=5, n_redundant=1, n_repeated=1, n_classes=5, n_clusters_per_class=1, weights=[1/10, 3/10, 2/10, 1/10, 3/10])
classifier = LogisticRegression()
classifier.fit(X, y)

# Predictions are made on the rows used for fitting, so these are in-sample scores.
y_true = y 
y_pred = classifier.predict(X)

## CLASSIFICATION
# average='micro' pools the counts of all classes into a single score per metric.
metrics.confusion_matrix(y_true, y_pred)
metrics.accuracy_score(y_true, y_pred)
metrics.balanced_accuracy_score(y_true, y_pred)
metrics.recall_score(y_true, y_pred, average='micro')
metrics.precision_score(y_true, y_pred, average='micro')
metrics.f1_score(y_true, y_pred, average='micro')
metrics.fbeta_score(y_true, y_pred, beta=2, average='micro')
metrics.jaccard_score(y_true, y_pred, average='micro')
metrics.hamming_loss(y_true, y_pred)
metrics.matthews_corrcoef(y_true, y_pred)
metrics.multilabel_confusion_matrix(y_true, y_pred)
metrics.zero_one_loss(y_true, y_pred)

Pipelines

#

Validation

# https://scikit-learn.org/stable/modules/model_evaluation.html

import pandas as pd
from sklearn.datasets import make_classification
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.preprocessing import PowerTransformer, QuantileTransformer, StandardScaler, Normalizer, RobustScaler, MinMaxScaler, MaxAbsScaler
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.model_selection import cross_val_score, cross_validate, GridSearchCV, StratifiedKFold, RepeatedStratifiedKFold

def parameter_validation_scores(cv_results):
    """Tabulate a ``GridSearchCV.cv_results_``-style mapping, one row per parameter candidate.

    Parameters
    ----------
    cv_results : dict
        Mapping of result name to a per-candidate sequence. Keys ending in
        ``train_score``, ``test_score`` or ``time`` are collected; the
        ``'params'`` entry (a list of parameter dicts) becomes the row index.

    Returns
    -------
    pandas.DataFrame
        Columns are a two-level MultiIndex ``(group, name)`` with group in
        ``'train_score'``, ``'test_score'``, ``'time'`` and the name being the
        key with its ``_<group>`` suffix removed (e.g. ``'split0'``, ``'mean'``).
        A group with no matching keys is skipped, so results produced with
        ``return_train_score=False`` are accepted.
    """
    def collect(suffix):
        # Keys are filtered by suffix; the short name drops the trailing '_<suffix>'.
        tail = '_' + suffix
        keys = [key for key in cv_results if key.endswith(suffix)]
        if not keys:
            return None
        frame = pd.DataFrame({key: pd.Series(cv_results[key]) for key in keys})
        frame.columns = pd.MultiIndex.from_tuples(
            [(suffix, key[:-len(tail)] if key.endswith(tail) else key) for key in keys]
        )
        return frame

    blocks = [collect('train_score'), collect('test_score'), collect('time')]
    scores = pd.concat([block for block in blocks if block is not None], axis=1)
    scores.index = pd.MultiIndex.from_frame(pd.DataFrame(cv_results['params']))
    return scores

def scenario_validation_scores(cv_results):
    """Tabulate a ``cross_validate``-style result mapping, one row per CV split.

    Parameters
    ----------
    cv_results : dict
        Mapping of result name to a per-split sequence. Keys starting with
        ``train`` or ``test`` and keys ending in ``time`` are collected.

    Returns
    -------
    pandas.DataFrame
        Columns are a two-level MultiIndex ``(group, name)`` with group in
        ``'train_score'``, ``'test_score'``, ``'time'``. Only the leading
        ``train_`` / ``test_`` prefix (or trailing ``_time`` suffix) is removed
        from the key. A group with no matching keys is skipped, so results
        produced with ``return_train_score=False`` are accepted. The row index
        is named ``'split'``.
    """
    def strip_prefix(key, prefix):
        return key[len(prefix):] if key.startswith(prefix) else key

    def strip_suffix(key, suffix):
        return key[:-len(suffix)] if key.endswith(suffix) else key

    def build(label, pairs):
        # pairs: [(key in cv_results, short column name), ...]
        if not pairs:
            return None
        frame = pd.DataFrame({key: pd.Series(cv_results[key]) for key, _ in pairs})
        frame.columns = pd.MultiIndex.from_tuples([(label, short) for _, short in pairs])
        return frame

    blocks = [
        build('train_score', [(key, strip_prefix(key, 'train_')) for key in cv_results if key.startswith('train')]),
        build('test_score', [(key, strip_prefix(key, 'test_')) for key in cv_results if key.startswith('test')]),
        build('time', [(key, strip_suffix(key, '_time')) for key in cv_results if key.endswith('time')]),
    ]
    scores = pd.concat([block for block in blocks if block is not None], axis=1)
    scores.index.name = 'split'  # Repeated Stratified KFold: n_repeats * k_fold rows
    return scores

X, y = make_classification(n_samples=10000, n_features=5, n_classes=3, n_clusters_per_class=1, weights=[.6, .3, .1], flip_y=0, n_informative=4, n_redundant=0, n_repeated=0)
cv = RepeatedStratifiedKFold(n_splits=5, n_repeats=3, random_state=None) # StratifiedKFold(n_splits=5, shuffle=False, random_state=None) # cross validation & randomness control

# version1: multiple parameters
# The scoring list documents the available multi-class scorers; [1] selects
# 'balanced_accuracy', so the grid search is single-metric and produces
# '*_test_score' / '*_train_score' keys.
classifier = make_pipeline(PowerTransformer(method='yeo-johnson', standardize=True), Normalizer(), LinearDiscriminantAnalysis())
classifier = GridSearchCV(estimator=classifier, param_grid={'lineardiscriminantanalysis__priors':[(.1, .1, .8), (.4, .3, .3), None]}, cv=cv, scoring=['accuracy', 'balanced_accuracy', 'recall_micro', 'recall_macro', 'recall_weighted', 'precision_micro', 'precision_macro', 'precision_weighted', 'f1_micro', 'f1_macro', 'f1_weighted', 'jaccard_micro', 'jaccard_macro', 'jaccard_weighted', 'roc_auc_ovr_weighted', 'roc_auc_ovo_weighted'][1], return_train_score=True)
classifier.fit(X, y)
parameter_validation_scores(classifier.cv_results_)

# version2: multiple scoring
# v.s. cross_val_score(classifier, X, y, scoring='accuracy', cv=cv, n_jobs=-1, verbose=0)
# `fit_params` is no longer passed: it was only ever set to its default (None) and
# recent scikit-learn releases replaced that keyword with `params`.
classifier = Pipeline([('powertransformer', PowerTransformer(method='yeo-johnson', standardize=True)), ('normalizer', Normalizer()), ('lineardiscriminantanalysis', LinearDiscriminantAnalysis())])
cv_results = cross_validate(classifier, X, y, scoring=['accuracy', 'balanced_accuracy', 'recall_micro', 'recall_macro', 'recall_weighted', 'precision_micro', 'precision_macro', 'precision_weighted', 'f1_micro', 'f1_macro', 'f1_weighted', 'jaccard_micro', 'jaccard_macro', 'jaccard_weighted', 'roc_auc_ovr_weighted', 'roc_auc_ovo_weighted'], cv=cv, return_train_score=True, return_estimator=False, n_jobs=-1, verbose=0)
scenario_validation_scores(cv_results)

Evaluation

#

 

Multi Class Confusion Matrix

import numpy as np
import pandas as pd
import seaborn as sns
from sklearn import metrics
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import make_classification

# Synthetic 5-class problem; random_state=0 keeps the generated data reproducible.
X, y = make_classification(
    n_samples=3000, n_features=7, n_informative=4, n_redundant=1, n_repeated=0, n_classes=5, n_clusters_per_class=1, 
    weights=None, flip_y=0, class_sep=1.0, hypercube=True, shift=0.0, scale=1.0, shuffle=True, random_state=0)

classifier = LogisticRegression(penalty='l1', solver='liblinear')
classifier.fit(X, y)

# In-sample predictions, paired with the true labels in one frame for cross-tabulation.
y_true = y 
y_pred = classifier.predict(X)
df = pd.DataFrame(np.c_[y_true, y_pred], columns=['ACTUAL', 'PRED'])



# version 1: base confusion matrix
# Rows are actual classes, columns are predicted classes; 'All' holds the row/column totals.
confusion_matrix = pd.crosstab(index=[df['ACTUAL']], columns=[df['PRED']], margins=True, margins_name='All', dropna=False, normalize=False) # .unstack(level=0).stack(level=1)

# version 2: extended confusion matrix
# Row-normalised block: each count divided by its actual-class total (columns suffixed '|A').
conditional_probability_for_actual = confusion_matrix.div(confusion_matrix.loc[:, 'All'], axis=0).rename(columns=dict(map(lambda x: (x, str(x)+'|A'), confusion_matrix.columns)))
# Column-normalised block: each count divided by its predicted-class total (rows suffixed '|P').
# The rename map is built from the row labels, because it is the index that gets renamed;
# using the column labels would leave a never-predicted class with its bare label,
# which would then collide with the count rows after the concat below.
conditional_probability_for_prediction = confusion_matrix.div(confusion_matrix.loc['All', :], axis=1).rename(index=dict(map(lambda x: (x, str(x)+'|P'), confusion_matrix.index)))
confusion_matrix = pd.concat([confusion_matrix, conditional_probability_for_actual], axis=1)
confusion_matrix = pd.concat([confusion_matrix, conditional_probability_for_prediction], axis=0)
confusion_matrix.iloc[:, :].style.background_gradient(cmap=sns.light_palette("green", as_cmap=True))

ROC/PR Curve for Multi Class Classification

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.metrics import roc_curve, precision_recall_curve, auc

def multiclass_roc_curve(y_true, y_prob):
    """Compute a one-vs-rest ROC curve and its AUC for every class.

    Parameters
    ----------
    y_true : array-like of shape (n_samples,)
        True class labels.
    y_prob : array-like of shape (n_samples, n_classes)
        Per-class scores. Column ``i`` is matched with the ``i``-th label of
        ``y_true`` in sorted order.

    Returns
    -------
    fpr, tpr, thr, auc_ : dict
        Four dicts keyed by class position (0 .. n_classes-1) holding the
        false-positive rates, true-positive rates, thresholds and area
        under the curve of each one-vs-rest problem.
    """
    import numpy as np
    from sklearn.metrics import roc_curve, auc

    y_true = np.asarray(y_true)
    y_prob = np.asarray(y_prob)
    classes = np.unique(y_true)  # np.unique already returns the labels sorted

    # One indicator column per class. Built by hand because label_binarize
    # collapses the two-class case into a single column, which made the
    # per-class column lookup below fail for binary problems.
    y_enco = (y_true[:, None] == classes[None, :]).astype(int)

    fpr = dict()
    tpr = dict()
    thr = dict()
    auc_ = dict()
    for class_idx in range(classes.shape[0]):
        fpr[class_idx], tpr[class_idx], thr[class_idx] = roc_curve(y_enco[:, class_idx], y_prob[:, class_idx])
        auc_[class_idx] = auc(fpr[class_idx], tpr[class_idx])
    return fpr, tpr, thr, auc_

def multiclass_pr_curve(y_true, y_prob):
    """Compute a one-vs-rest precision-recall curve and its AUC for every class.

    Parameters
    ----------
    y_true : array-like of shape (n_samples,)
        True class labels.
    y_prob : array-like of shape (n_samples, n_classes)
        Per-class scores. Column ``i`` is matched with the ``i``-th label of
        ``y_true`` in sorted order.

    Returns
    -------
    ppv, tpr, thr, auc_ : dict
        Four dicts keyed by class position (0 .. n_classes-1) holding the
        precision values, recall values, thresholds and the area under the
        recall/precision curve of each one-vs-rest problem.
    """
    import numpy as np
    from sklearn.metrics import precision_recall_curve, auc

    y_true = np.asarray(y_true)
    y_prob = np.asarray(y_prob)
    classes = np.unique(y_true)  # np.unique already returns the labels sorted

    # One indicator column per class. Built by hand because label_binarize
    # collapses the two-class case into a single column, which made the
    # per-class column lookup below fail for binary problems.
    y_enco = (y_true[:, None] == classes[None, :]).astype(int)

    ppv = dict()
    tpr = dict()
    thr = dict()
    auc_ = dict()
    for class_idx in range(classes.shape[0]):
        ppv[class_idx], tpr[class_idx], thr[class_idx] = precision_recall_curve(y_enco[:, class_idx], y_prob[:, class_idx])
        # Recall goes on the x axis and precision on the y axis.
        auc_[class_idx] = auc(tpr[class_idx], ppv[class_idx])
    return ppv, tpr, thr, auc_

# Five-class toy problem with uneven class weights; no random_state is passed, so the data is unseeded.
X, y = make_classification(n_samples=300, n_features=8, n_informative=5, n_redundant=1, n_repeated=1, n_classes=5, n_clusters_per_class=1, weights=[1/10, 3/10, 2/10, 1/10, 3/10])
classifier = LogisticRegression()
classifier.fit(X, y)

# In-sample labels and class probabilities feed the per-class curves.
y_true = y 
y_pred = classifier.predict(X)
y_prob = classifier.predict_proba(X)
fpr, tpr1, thr1, auc1 = multiclass_roc_curve(y_true, y_prob)
ppv, tpr2, thr2, auc2 = multiclass_pr_curve(y_true, y_prob)

# visualization
# target_names lists five names, one per generated class.
print(classification_report(y_true, y_pred, target_names=['A', 'B', 'C', 'D', 'E']))
plt.figure(figsize=(25, 7))
ax0 = plt.subplot2grid((1,2), (0,0))
ax1 = plt.subplot2grid((1,2), (0,1))
# Left panel: ROC curve per class; right panel: precision-recall curve per class. Legends show "class | AUC".
for class_idx in range(np.unique(y_true).shape[0]):
    ax0.plot(fpr[class_idx], tpr1[class_idx], 'o-', ms=5, label=str(class_idx) + f' | {round(auc1[class_idx], 2)}')    
    ax1.plot(tpr2[class_idx], ppv[class_idx], 'o-', ms=5, label=str(class_idx) + f' | {round(auc2[class_idx], 2)}')

# Dashed diagonal on the ROC panel as a reference line.
ax0.plot([0, 1], [0, 1], 'k--')
ax0.set_xlabel('Fall-Out')
ax0.set_ylabel('Recall')
ax0.legend() 
# Dashed anti-diagonal on the precision-recall panel as a visual reference.
ax1.plot([0, 1], [1, 0], 'k--')
ax1.set_xlabel('Recall')
ax1.set_ylabel('Precision')
ax1.legend()
plt.show()

Imbalanced Class Evaluation Metrics

#

 

 

 


Task: Multi-Label Classification

: precision&recall trade-off

Models

#

Metrics

#

Pipelines

#

Validation

#

Evaluation

#

 

 


Task: Regression

: bias&variance trade-off

Models

import numpy as np
from sklearn.datasets import make_classification, make_regression
from sklearn import linear_model, ensemble, neighbors, tree, svm, neural_network

# Noise-free synthetic regression data (noise=0.0); random_state=None leaves it unseeded.
X, y = make_regression(n_samples=3000, n_features=10, n_informative=5, n_targets=1, bias=0.0, effective_rank=None, tail_strength=0.5, noise=0.0, shuffle=True, coef=False, random_state=None)

# Catalogue of interchangeable regressors: each assignment overwrites the previous one,
# so only the last estimator (MLPRegressor) is the one fitted below.
regressor = linear_model.LinearRegression()
regressor = linear_model.Ridge(alpha=.5)
regressor = linear_model.Lasso(alpha=0.1)
# NOTE(review): the `normalize` argument may not exist in newer scikit-learn releases - confirm against the installed version.
regressor = linear_model.LassoLars(alpha=.1, normalize=False)
regressor = linear_model.ElasticNet()
regressor = linear_model.BayesianRidge()
regressor = ensemble.AdaBoostRegressor()
regressor = ensemble.GradientBoostingRegressor()
regressor = ensemble.BaggingRegressor()
regressor = ensemble.ExtraTreesRegressor()
regressor = ensemble.RandomForestRegressor()
regressor = neighbors.KNeighborsRegressor()
regressor = neighbors.RadiusNeighborsRegressor()
regressor = tree.DecisionTreeRegressor()
regressor = tree.ExtraTreeRegressor()
regressor = svm.LinearSVR(max_iter=1000)
regressor = svm.SVR(kernel='poly', max_iter=1000) # kernel: {‘linear’, ‘poly’, ‘rbf’, ‘sigmoid’, ‘precomputed’}, default=’rbf’
regressor = svm.NuSVR(max_iter=1000)
regressor = neural_network.MLPRegressor(hidden_layer_sizes=(100,), max_iter=10, activation='relu', solver='adam', learning_rate='adaptive')

# Fit and predict on the same data (in-sample).
regressor.fit(X, y)
regressor.predict(X)

Metrics

from sklearn import metrics
from sklearn.linear_model import LinearRegression
from sklearn.datasets import make_regression

# Noise-free synthetic regression data fitted with ordinary least squares.
X, y = make_regression(n_samples=3000, n_features=10, n_informative=5, n_targets=1, bias=0.0, effective_rank=None, tail_strength=0.5, noise=0.0, shuffle=True, coef=False, random_state=None)
regressor = LinearRegression()
regressor.fit(X, y)

# In-sample predictions: every metric below compares the training targets with their fitted values.
y_true = y 
y_pred = regressor.predict(X)

# REGRESSION
# Each call returns a value that is not stored; in a notebook only the last expression is displayed.
metrics.explained_variance_score(y_true, y_pred)
metrics.max_error(y_true, y_pred)
metrics.mean_absolute_error(y_true, y_pred)
metrics.mean_squared_error(y_true, y_pred)
metrics.median_absolute_error(y_true, y_pred)
metrics.r2_score(y_true, y_pred)
metrics.mean_tweedie_deviance(y_true, y_pred)

Pipelines

#

Validation

# https://scikit-learn.org/stable/modules/model_evaluation.html

import pandas as pd
from sklearn.datasets import make_regression
from sklearn.preprocessing import PowerTransformer, QuantileTransformer, StandardScaler, Normalizer, RobustScaler, MinMaxScaler, MaxAbsScaler
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.model_selection import cross_val_score, cross_validate, GridSearchCV, KFold, RepeatedKFold
from sklearn.linear_model import LinearRegression

def parameter_validation_scores(cv_results):
    """Arrange grid-search ``cv_results`` into one row per parameter candidate.

    Parameters
    ----------
    cv_results : dict
        Mapping such as ``GridSearchCV.cv_results_``. Keys ending in
        ``train_score``, ``test_score`` and ``time`` are collected; the
        ``params`` entry (a list of parameter dicts) becomes the row index.

    Returns
    -------
    pandas.DataFrame
        Columns are a two-level MultiIndex ``(group, name)`` where ``group``
        is ``'train_score'``, ``'test_score'`` or ``'time'`` and ``name`` is
        the key with the group suffix removed (e.g. ``split0``, ``mean``).
        A group with no matching keys (for instance train scores when the
        search ran with ``return_train_score=False``) is left out instead of
        raising ``KeyError``.
    """
    def strip_suffix(name, suffix):
        # Remove only a trailing suffix; str.replace would also hit interior matches.
        return name[:-len(suffix)] if name.endswith(suffix) else name

    def collect(key_suffix, group, label_suffix):
        # Gather every entry whose key ends with key_suffix into a candidate-by-key table.
        selected = [(key, values) for key, values in cv_results.items() if key.endswith(key_suffix)]
        if not selected:
            return None
        table = pd.DataFrame(selected)
        # Column 0 holds the keys, column 1 the per-candidate arrays: expand and transpose
        # so that candidates become rows and keys become columns.
        table = table[1].apply(lambda x: pd.Series(x)).T.rename(columns=table[0].to_dict())
        table.columns = pd.MultiIndex.from_tuples(
            [(group, strip_suffix(column, label_suffix)) for column in table.columns])
        return table

    groups = [
        collect('train_score', 'train_score', '_train_score'),
        collect('test_score', 'test_score', '_test_score'),
        collect('time', 'time', '_time'),
    ]
    scores = pd.concat([group for group in groups if group is not None], axis=1)
    # Index the rows by the parameter combination each candidate was evaluated with.
    scores.index = pd.MultiIndex.from_frame(pd.DataFrame(cv_results['params']))
    return scores

def scenario_validation_scores(cv_results):
    """Arrange ``cross_validate``-style results into one row per split.

    Parameters
    ----------
    cv_results : dict
        Mapping of result name to per-split values. Keys starting with
        ``train`` or ``test`` and keys ending in ``time`` are collected.

    Returns
    -------
    pandas.DataFrame
        Columns are a two-level MultiIndex ``(group, name)`` where ``group``
        is ``'train_score'``, ``'test_score'`` or ``'time'`` and ``name`` is
        the key without its ``train_`` / ``test_`` prefix or ``_time``
        suffix. The row index is named ``'split'``. A group with no matching
        keys (for instance train scores when they were not requested) is
        left out instead of raising ``KeyError``.
    """
    def strip_prefix(name, prefix):
        # Remove only a leading prefix; str.replace would also mangle names such as 'test_latest_score'.
        return name[len(prefix):] if name.startswith(prefix) else name

    def strip_suffix(name, suffix):
        return name[:-len(suffix)] if name.endswith(suffix) else name

    def collect(selector, group, relabel):
        # Gather every entry accepted by selector into a split-by-key table.
        selected = [(key, values) for key, values in cv_results.items() if selector(key)]
        if not selected:
            return None
        table = pd.DataFrame(selected)
        # Column 0 holds the keys, column 1 the per-split arrays: expand and transpose
        # so that splits become rows and keys become columns.
        table = table[1].apply(lambda x: pd.Series(x)).T.rename(columns=table[0].to_dict())
        table.columns = pd.MultiIndex.from_tuples(
            [(group, relabel(column)) for column in table.columns])
        return table

    groups = [
        collect(lambda key: key.startswith('train'), 'train_score', lambda name: strip_prefix(name, 'train_')),
        collect(lambda key: key.startswith('test'), 'test_score', lambda name: strip_prefix(name, 'test_')),
        collect(lambda key: key.endswith('time'), 'time', lambda name: strip_suffix(name, '_time')),
    ]
    scores = pd.concat([group for group in groups if group is not None], axis=1)
    scores.index.name = 'split' # Repeated KFold: n_repeats * k_fold
    return scores #.swaplevel(0,1,axis=1).sort_index(axis=1)

# Noise-free synthetic regression data; random_state=None leaves it unseeded.
X, y = make_regression(n_samples=3000, n_features=10, n_informative=5, n_targets=1, bias=0.0, effective_rank=None, tail_strength=0.5, noise=0.0, shuffle=True, coef=False, random_state=None)
# 10 folds repeated 3 times, i.e. 30 splits per evaluation.
cv = RepeatedKFold(n_splits=10, n_repeats=3, random_state=None) # KFold(n_splits=10, shuffle=False, random_state=None)

# version1: multiple parameters
# Grid over fit_intercept x positive; the trailing [1] selects 'explained_variance' as the single scoring metric.
regressor = make_pipeline(PowerTransformer(method='yeo-johnson', standardize=True), Normalizer(), LinearRegression())
regressor = GridSearchCV(estimator=regressor, param_grid={'linearregression__fit_intercept':[False, True], 'linearregression__positive':[False, True]}, cv=cv, scoring=['r2', 'explained_variance', 'max_error', 'neg_mean_squared_error', 'neg_mean_absolute_error', 'neg_root_mean_squared_error', 'neg_median_absolute_error', 'neg_mean_absolute_percentage_error'][1], return_train_score=True)
regressor.fit(X, y)
# One row per parameter candidate.
parameter_validation_scores(regressor.cv_results_)

# version2: multiple scoring
# v.s. cross_val_score(regressor, X, y, cv=cv, scoring='r2', fit_params=None, n_jobs=-1, verbose=0)
# Same pipeline with explicit step names; every metric in the list is evaluated on every split.
regressor = Pipeline([('powertransformer', PowerTransformer(method='yeo-johnson', standardize=True)), ('normalizer', Normalizer()), ('linearregression', LinearRegression())])
cv_results = cross_validate(regressor, X, y, cv=cv, scoring=['r2', 'explained_variance', 'max_error', 'neg_mean_squared_error', 'neg_mean_absolute_error', 'neg_root_mean_squared_error', 'neg_median_absolute_error', 'neg_mean_absolute_percentage_error'], fit_params=None, return_train_score=True, return_estimator=False, n_jobs=-1, verbose=0)
# One row per split.
scenario_validation_scores(cv_results)

Evaluation

from datetime import datetime
from functools import wraps
import joblib
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.datasets import make_regression, make_classification
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.model_selection import cross_val_score, GridSearchCV, StratifiedKFold, KFold
from sklearn import metrics

class TemplateTransformer(BaseEstimator, TransformerMixin):
    """Drop the feature with the largest variance inflation factor (VIF).

    The column selection is learned in ``fit`` and replayed in ``transform``
    so that training and later data keep the same columns in the same order.
    Recomputing the VIF ranking on whatever data reaches ``transform`` could
    drop a different feature, or reorder the kept ones, and silently misalign
    the inputs of the downstream estimator.

    Attributes
    ----------
    features_by_vif_ : list of int
        Positions of the retained columns, ordered by ascending VIF as
        measured on the data passed to ``fit``.
    """
    def __init__(self):
        pass

    @staticmethod
    def _select_features_by_vif(X):
        # Rank columns by ascending VIF and keep all but the last (largest-VIF) one.
        import pandas as pd
        from statsmodels.stats.outliers_influence import variance_inflation_factor
        return pd.Series(
            data = [variance_inflation_factor(X, i) for i in range(X.shape[1])],
            index = range(X.shape[1])).sort_values(ascending=True).iloc[:X.shape[1] - 1].index.tolist()

    def fit(self, X, y=None):
        """Learn which columns to keep from ``X``; ``y`` is ignored. Returns ``self``."""
        self.features_by_vif_ = self._select_features_by_vif(X)
        return self

    def transform(self, X):
        """Return ``X`` restricted to the learned columns.

        ``X`` is indexed as ``X[:, columns]``, so it must support NumPy-style
        2-D indexing. When ``fit`` has not been called, the selection is
        computed from ``X`` itself, matching the previous stateless behaviour.
        """
        features_by_vif = getattr(self, 'features_by_vif_', None)
        if features_by_vif is None:
            features_by_vif = self._select_features_by_vif(X)
        return X[:, features_by_vif]

def evaluation(*args, **kwargs):
    """Decorator factory that appends a one-row regression report to a prediction function.

    The positional and keyword arguments given to the factory are accepted
    but not used. The decorated function is called as ``func(model, X, y)``
    and must return ``(y, y_pred)``; the wrapper returns ``(y_pred, report)``
    where ``report`` is a single-row DataFrame with the columns
    ``datetime``, ``model``, ``domain``, ``MAE``, ``MAPE``, ``MSE`` and ``R2``.
    ``verbose`` is accepted by the wrapper but has no effect.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(model, X, y, model_name='model', domain_kind='train', verbose=False):
            # Only model, X and y are forwarded; the labels are used for the report alone.
            y_true, y_hat = func(model, X, y)
            report = pd.DataFrame({
                'datetime': [datetime.now().strftime('%Y-%m-%d %H:%M:%S')],
                'model': [model_name],
                'domain': [domain_kind],
                'MAE': [metrics.mean_absolute_error(y_true, y_hat)],
                'MAPE': [metrics.mean_absolute_percentage_error(y_true, y_hat)],
                'MSE': [metrics.mean_squared_error(y_true, y_hat)],
                'R2': [metrics.r2_score(y_true, y_hat)],
            })
            return y_hat, report
        return wrapper
    return decorator

@evaluation(description="my_description")
def prediction(model, X, y, model_name='model', domain_kind='train', verbose=False):
    """Return ``y`` unchanged together with ``model.predict(X)``.

    ``model_name``, ``domain_kind`` and ``verbose`` are not used in this body.
    """
    return y, model.predict(X)
    
# Noise-free synthetic regression data; random_state=None leaves it unseeded.
X, y = make_regression(n_samples=3000, n_features=10, n_informative=5, n_targets=1, bias=0.0, effective_rank=None, tail_strength=0.5, noise=0.0, shuffle=True, coef=False, random_state=None)
feature_space = FeatureUnion(transformer_list=[('TemplateTransformer', TemplateTransformer()),])

# One named pipeline per candidate model.
pipelines = dict()
pipelines['LinearRegression'] = Pipeline(steps=[('FeatureSpace', feature_space), ('LinearRegression', LinearRegression())])

# syntax: <estimator>__<parameter>
param_grids = dict()
param_grids['LinearRegression'] = dict(
    LinearRegression__fit_intercept = [False, True]
)

# https://scikit-learn.org/stable/modules/model_evaluation.html
names = []
results = []
evaluation_frames = []
for idx, (name, pipeline) in enumerate(pipelines.items()):
    # Look the grid up by pipeline name: pairing the two dicts by insertion order would
    # silently mis-pair or truncate when their keys differ. A missing grid raises KeyError.
    param_grid = param_grids[name]
    scorings = ['neg_mean_squared_error']
    scoring = scorings[0]
    
    cross_validation = KFold(n_splits=10, shuffle=True, random_state=None)
    pipeline = GridSearchCV(estimator=pipeline, param_grid=param_grid, cv=cross_validation, scoring=scoring)
    pipeline.fit(X, y)

    # [STEP3]: save & load
    joblib.dump(pipeline, f'{idx}{name}_pipeline.joblib')
    pipeline = joblib.load(f'{idx}{name}_pipeline.joblib')

    # [STEP4]: prediction & evaluation
    # The report gets its own name so the `evaluation` decorator factory is not overwritten.
    y_pred, evaluation_frame = prediction(pipeline, X, y, model_name=name, domain_kind='train')
    evaluation_frames.append(evaluation_frame)
        
    #print('*', name)
    names.append(name)
    # Scores the whole grid search inside each outer fold.
    results.append(cross_val_score(pipeline, X, y, cv=cross_validation, scoring=scoring))

# One row per evaluated pipeline, stacked in iteration order.
eval_table = pd.concat(evaluation_frames, axis=0)

fig = plt.figure(figsize=(25,7)); layout=(1,1); axes = dict()
axes[0] = plt.subplot2grid(layout, (0,0), fig=fig)
axes[0].boxplot(results)
axes[0].set_title('Evaluate Algorithms')
axes[0].set_xticklabels(names)

# NOTE(review): `display` is not imported in this snippet; presumably supplied by the notebook environment.
display(eval_table)
plt.show()

 

 

 

 


Task: Time Series Analysis(1): Scikit-Learn

: bias&variance trade-off

Models

#

 

Metrics

#

 

Pipelines

#

 

Validation

#

 

Evaluation

#

 

 

 


Task: Time Series Analysis(2): Statsmodels

import numpy as np
import pandas as pd
from tqdm import tqdm
import statsmodels.api as sm
import statsmodels.tsa.api as smt
from statsmodels.tsa.deterministic import DeterministicProcess, Fourier, Seasonality, TimeTrend
from sklearn.model_selection import TimeSeriesSplit

def future_frame(series, f_steps=20):
    """Build a design frame covering the observed index plus ``f_steps`` future steps.

    Deterministic regressors (quadratic time trend with constant, Fourier
    terms of period 52 and order 3, seasonal dummies of period 11) are
    generated for ``series.index`` and for ``f_steps`` out-of-sample steps.
    The result has a ``sample-domain`` column (``'in'`` / ``'out'``), a
    ``ts`` column holding ``series`` on the in-sample rows, and a
    ``rel-trend`` column derived from the index year, month and day. The
    linear ``trend`` column is dropped. The index is named ``'Date'``.
    """
    deterministic_terms = [
        TimeTrend(constant=True, order=2),
        Fourier(period=52, order=3),
        Seasonality(period=11),
    ]
    process = DeterministicProcess(series.index, additional_terms=deterministic_terms)

    # Tag each part before stacking so the two domains stay distinguishable.
    in_sample = process.in_sample()
    in_sample['sample-domain'] = 'in'
    out_of_sample = process.out_of_sample(steps=f_steps)
    out_of_sample['sample-domain'] = 'out'

    frame = pd.concat([in_sample, out_of_sample], axis=0)
    frame.index = frame.index.rename('Date')
    observed_rows = frame['sample-domain'] == 'in'
    frame.loc[observed_rows, 'ts'] = series

    # Time Series Feature Engineering
    frame = frame.drop(columns=['trend'])
    dates = frame.index
    frame['rel-trend'] = dates.year + dates.month/12 + dates.day/365

    return frame

def tsa_evaluation(y_true, y_pred, num_params=1):
    """Summarise fit quality with sums of squares, co-movement and error metrics.

    Parameters
    ----------
    y_true : Series-like
        Observed values.
    y_pred : Series-like
        Fitted or forecast values. ``y_true - y_pred`` must support
        ``.apply`` (e.g. a pandas Series).
    num_params : int, default 1
        Divides ``ssr`` to give ``msr`` and is subtracted from the sample
        size in the denominators of ``mse``, ``rmse``, ``mae`` and ``mape``.

    Returns
    -------
    dict
        Keys, in order: ``sst``, ``ssr``, ``sse``, ``(sse/ssr)``,
        ``(ssr/sst)explained variance``, ``(ssr/sst)unexplained variance``,
        ``r2``, ``k2``, ``y-yhat:corr``, ``y-yhat:cov``, ``y-yhat:leverage``,
        ``yhat-resid:corr``, ``yhat-resid:cov``, ``mst``, ``msr``, ``mse``,
        ``rmse``, ``mae``, ``mape``.
    """
    endog = y_true
    fittedvalues = y_pred
    resid = y_true - y_pred
    resid_dof = resid.shape[0] - num_params

    # Sums of squares around the mean of the observations.
    # sst = ssr + sse holds only for linear regression; r2 + k2 = 1 always holds.
    sst = ((endog - endog.mean())**2).sum()
    ssr = ((fittedvalues - endog.mean())**2).sum()
    sse = resid.apply(lambda x: x**2).sum()

    r2 = 1 - (sse/sst)  # coefficient of determination
    cov_y_yhat = np.cov(endog, fittedvalues)
    abs_resid = resid.apply(np.abs)
    mse = sse/resid_dof

    return {
        'sst': sst,
        'ssr': ssr,
        'sse': sse,

        # Ratios between the sums of squares.
        '(sse/ssr)': sse/ssr,
        '(ssr/sst)explained variance': ssr/sst,
        # Despite its '(ssr/sst)' label, this entry holds sse/sst (fraction of variance unexplained).
        '(ssr/sst)unexplained variance': sse/sst,

        'r2': r2,
        'k2': 1 - r2,  # coefficient of alienation

        # Co-movement between observations, fitted values and residuals.
        'y-yhat:corr': np.corrcoef(endog, fittedvalues)[0,1],
        'y-yhat:cov': cov_y_yhat[0,1],
        'y-yhat:leverage': cov_y_yhat[0,1]/cov_y_yhat[0,0],
        'yhat-resid:corr': np.corrcoef(fittedvalues, resid)[0,1],
        'yhat-resid:cov': np.cov(fittedvalues, resid)[0,1],

        # Mean squares.
        'mst': endog.var(ddof=1),
        'msr': ssr / num_params,
        'mse': mse,

        # Error magnitudes; all three share the (n - num_params) denominator.
        'rmse': np.sqrt(mse),
        'mae': abs_resid.sum()/resid_dof,
        # fittedvalues + resid reconstructs the observations used as the percentage base.
        'mape': (abs_resid / (fittedvalues + resid).apply(np.abs)).sum() / resid_dof,
    }

# given data
# Simulate 600 ARMA observations (after a 50-step burn-in) and integrate them with cumsum.
y = smt.ArmaProcess(ar=[1, .3, -.5], ma=[1, .1, .3]).generate_sample(nsample=600, burnin=50).cumsum()
# Daily index ending at the timestamp parsed from '00:00:00' (presumably midnight of the current day - TODO confirm).
y = pd.Series(y, index=pd.date_range(end='00:00:00', periods=y.shape[0], freq='D'))

# Length of each training window and of each forecast horizon, in periods.
training_periods = 300
forecasting_steps = 20

# Split the design frame into observed target, observed regressors and future regressors.
y_future_frame = future_frame(y, f_steps=forecasting_steps).asfreq('D')
endog_past = y_future_frame.loc[lambda x: x['sample-domain'] == 'in', 'ts'].copy()
exog_past = y_future_frame.loc[lambda x: x['sample-domain'] == 'in'].drop(['ts', 'sample-domain'], axis=1).copy()
exog_futrue = y_future_frame.loc[lambda x: x['sample-domain'] == 'out'].drop(['ts', 'sample-domain'], axis=1).copy()

# cross-validation method: n-step ahead
# Each split trains on at most `training_periods` points and tests on the next `forecasting_steps` points.
train_evaluations = list()
test_evaluations = list()
ts_iterator = tqdm(
    enumerate(
        TimeSeriesSplit(
            n_splits=int((y.shape[0]-training_periods)/forecasting_steps), max_train_size=training_periods, test_size=forecasting_steps, gap=0).split(y)), total=int((y.shape[0]-training_periods)/forecasting_steps))
for split_idx, (train_idx, test_idx) in ts_iterator: # n_splits = (nsample - max_train_size) / test_size
    # The "validation" window is the tail of the training window, so it is scored in-sample.
    endog_train = endog_past.iloc[train_idx].copy() # y_true: train
    endog_val = endog_train.iloc[-forecasting_steps:].copy() # y_true: validation
    endog_test = endog_past.iloc[test_idx].copy() # y_true: test
    exog_train = exog_past.iloc[train_idx].copy()
    exog_val = exog_train.iloc[-forecasting_steps:].copy()    
    exog_test = exog_past.iloc[test_idx].copy()

    #model = smt.SARIMAX(endog=endog_train, exog=exog_train, order=(2,1,2), trend='n', freq='D').fit(disp=0)
    #y_pred_val = model.predict(start=endog_val.index[0], end=endog_val.index[-1])
    #y_pred_test = model.forecast(steps=forecasting_steps, exog=exog_test)
    # OLS on the deterministic regressors only.
    model = sm.OLS(endog=endog_train, exog=exog_train).fit()    
    y_pred_val = model.predict(exog_val)
    y_pred_test = model.predict(exog_test)
    
    evaluation_train = tsa_evaluation(endog_val, y_pred_val)
    evaluation_test = tsa_evaluation(endog_test, y_pred_test)
    # Attach the window boundaries; for the train report the Test* fields describe the validation window.
    evaluation_train.update(TrainStart=endog_train.index[0], TrainEnd=endog_train.index[-1], TrainPeriods=endog_train.shape[0], TestStart=endog_val.index[0], TestEnd=endog_val.index[-1], TestPeriods=endog_val.shape[0])
    evaluation_test.update(TrainStart=endog_train.index[0], TrainEnd=endog_train.index[-1], TrainPeriods=endog_train.shape[0], TestStart=endog_test.index[0], TestEnd=endog_test.index[-1], TestPeriods=endog_test.shape[0])
    # One single-column frame per split, keyed by metric name.
    evaluation_train = pd.DataFrame(data=evaluation_train.values(), index=evaluation_train.keys(), columns=pd.Index([f'split{split_idx}'], name='n_split'))
    evaluation_test = pd.DataFrame(data=evaluation_test.values(), index=evaluation_test.keys(), columns=pd.Index([f'split{split_idx}'], name='n_split'))
    train_evaluations.append(evaluation_train)
    test_evaluations.append(evaluation_test)

# Stack the per-split reports (one row per split), index them by split and window boundaries, and tag the domain.
past_evaluation_train = pd.concat(train_evaluations, axis=1).T.reset_index().set_index(['n_split', 'TrainStart', 'TrainEnd', 'TrainPeriods', 'TestStart','TestEnd', 'TestPeriods']); past_evaluation_train['domain'] = 'train'
past_evaluation_test = pd.concat(test_evaluations, axis=1).T.reset_index().set_index(['n_split', 'TrainStart', 'TrainEnd', 'TrainPeriods', 'TestStart','TestEnd', 'TestPeriods']); past_evaluation_test['domain'] = 'test'
# Combine both domains and cast the metric columns to float.
past_evaluation = pd.concat([past_evaluation_train, past_evaluation_test], axis=0).reset_index().set_index(['n_split', 'domain', 'TrainStart', 'TrainEnd', 'TrainPeriods', 'TestStart','TestEnd', 'TestPeriods']).astype(float).sort_values(by=['n_split'])

# Refit on the most recent training window and write the forecast into the out-of-sample rows.
final_result = sm.OLS(endog=endog_past.iloc[-training_periods:], exog=exog_past.iloc[-training_periods:]).fit()
y_future_frame.loc[lambda x: x['sample-domain'] == 'out', 'ts'] = final_result.predict(exog_futrue)

# Average every metric per domain; metrics end up as rows.
past_evaluation.groupby(['domain']).mean().T

 

 

 

 

 


Reference

Scikit-learn modeling API

더보기
# Reference listing of the linear_model module. Commented entries are
# functions/paths that need arguments; the rest are estimators built with defaults.
from sklearn import linear_model
#linear_model.enet_path(X, y)
#linear_model.lars_path(X, y)
#linear_model.lars_path_gram(Xy, Gram)
#linear_model.lasso_path(X, y)
#linear_model.orthogonal_mp(X, y)
#linear_model.orthogonal_mp_gram(Gram, Xy)
#linear_model.ridge_regression(X, y, alpha)
linear_model.LogisticRegression()
linear_model.LogisticRegressionCV()
linear_model.PassiveAggressiveClassifier()
linear_model.Perceptron()
linear_model.RidgeClassifier()
linear_model.RidgeClassifierCV()
linear_model.SGDClassifier()
linear_model.ElasticNet()
linear_model.ElasticNetCV()
linear_model.Lars()
linear_model.LarsCV()
linear_model.Lasso()
linear_model.LassoCV()
linear_model.LassoLars()
linear_model.LassoLarsCV()
linear_model.LassoLarsIC()
linear_model.OrthogonalMatchingPursuit()
linear_model.OrthogonalMatchingPursuitCV()
linear_model.ARDRegression()
linear_model.BayesianRidge()
linear_model.MultiTaskElasticNet()
linear_model.MultiTaskElasticNetCV()
linear_model.MultiTaskLasso()
linear_model.MultiTaskLassoCV()
linear_model.HuberRegressor()
linear_model.RANSACRegressor()
linear_model.TheilSenRegressor()
linear_model.PoissonRegressor()
linear_model.TweedieRegressor()
linear_model.GammaRegressor()
linear_model.PassiveAggressiveRegressor()

# Reference listing of the ensemble module. Commented entries need an
# `estimators` argument; the rest are built with defaults.
from sklearn import ensemble
#ensemble.StackingClassifier(estimators)
#ensemble.StackingRegressor(estimators)
#ensemble.VotingClassifier(estimators)
#ensemble.VotingRegressor(estimators)
ensemble.AdaBoostClassifier()
ensemble.AdaBoostRegressor()
ensemble.BaggingClassifier()
ensemble.BaggingRegressor()
ensemble.ExtraTreesClassifier()
ensemble.ExtraTreesRegressor()
ensemble.GradientBoostingClassifier()
ensemble.GradientBoostingRegressor()
ensemble.IsolationForest()
ensemble.RandomForestClassifier()
ensemble.RandomForestRegressor()
ensemble.RandomTreesEmbedding()

# Reference listing of the naive_bayes module (estimators built with defaults).
from sklearn import naive_bayes
naive_bayes.BernoulliNB()
naive_bayes.CategoricalNB()
naive_bayes.ComplementNB()
naive_bayes.GaussianNB()
naive_bayes.MultinomialNB()

# Reference listing of the neighbors module. Commented entries need data
# arguments; the rest are built with defaults.
from sklearn import neighbors
#neighbors.BallTree(X[, leaf_size, metric])
#neighbors.KDTree(X[, leaf_size, metric])
#neighbors.kneighbors_graph(X, n_neighbors)
#neighbors.radius_neighbors_graph(X, radius)
neighbors.KernelDensity()
neighbors.KNeighborsClassifier()
neighbors.KNeighborsRegressor()
neighbors.KNeighborsTransformer()
neighbors.LocalOutlierFactor()
neighbors.RadiusNeighborsClassifier()
neighbors.RadiusNeighborsRegressor()
neighbors.RadiusNeighborsTransformer()
neighbors.NearestCentroid()
neighbors.NearestNeighbors()
neighbors.NeighborhoodComponentsAnalysis()

# Reference listing of the neural_network module (estimators built with defaults).
from sklearn import neural_network
neural_network.BernoulliRBM()
neural_network.MLPClassifier()
neural_network.MLPRegressor()

# Reference listing of the svm module (estimators built with defaults).
from sklearn import svm
svm.LinearSVC()
svm.LinearSVR()
svm.NuSVC()
svm.NuSVR()
svm.OneClassSVM()
svm.SVC()
svm.SVR()

# Reference listing of the tree module. Commented entries are export helpers
# that need a fitted tree; the rest are estimators built with defaults.
from sklearn import tree
#tree.export_graphviz(decision_tree)
#tree.export_text(decision_tree)
tree.DecisionTreeClassifier()
tree.DecisionTreeRegressor()
tree.ExtraTreeClassifier()
tree.ExtraTreeRegressor()

# Reference listing of the discriminant_analysis module (estimators built with defaults).
from sklearn import discriminant_analysis
discriminant_analysis.LinearDiscriminantAnalysis()
discriminant_analysis.QuadraticDiscriminantAnalysis()

# Reference listing of the gaussian_process module (estimators built with defaults).
from sklearn import gaussian_process
gaussian_process.GaussianProcessClassifier()
gaussian_process.GaussianProcessRegressor()

# Reference listing of the pipeline module; every entry needs arguments, so all are left commented.
from sklearn import pipeline
#pipeline.FeatureUnion(transformer_list)
#pipeline.Pipeline(steps)
#pipeline.make_pipeline(*steps[, memory, verbose])
#pipeline.make_union(*transformers[, n_jobs, ...])

# Reference listing of the model_selection search utilities; every entry needs arguments, so all are left commented.
from sklearn import model_selection
#model_selection.GridSearchCV(estimator, param_grid)
#model_selection.ParameterGrid(param_grid)
#model_selection.ParameterSampler(param_distributions, n_iter)
#model_selection.RandomizedSearchCV(estimator, param_distributions)

Scikit-learn evaluation API

더보기
from sklearn import metrics

## CLASSIFICATION
# Label-based metrics: called with true labels and predicted labels.
# y_true, y_pred and beta are placeholders that this listing does not define.
metrics.accuracy_score(y_true, y_pred)
metrics.balanced_accuracy_score(y_true, y_pred)
metrics.classification_report(y_true, y_pred)
metrics.confusion_matrix(y_true, y_pred)
metrics.f1_score(y_true, y_pred)
# beta is passed by keyword because fbeta_score declares it as a keyword-only parameter.
metrics.fbeta_score(y_true, y_pred, beta=beta)
metrics.hamming_loss(y_true, y_pred)
metrics.jaccard_score(y_true, y_pred)
metrics.matthews_corrcoef(y_true, y_pred)
metrics.multilabel_confusion_matrix(y_true, y_pred)
metrics.precision_score(y_true, y_pred)
metrics.recall_score(y_true, y_pred)
metrics.zero_one_loss(y_true, y_pred)
# Entries below are left commented in this listing.
#metrics.brier_score_loss(y_true, y_prob)
#metrics.auc(x, y)
#metrics.average_precision_score(y_true, y_pred)
#metrics.cohen_kappa_score(y1, y2)
#metrics.dcg_score(y_true, y_score)
#metrics.det_curve(y_true, y_score)
#metrics.hinge_loss(y_true, pred_decision)
#metrics.log_loss(y_true, y_pred)
#metrics.ndcg_score(y_true, y_score)
#metrics.precision_recall_curve(y_true, y_pred)
#metrics.precision_recall_fscore_support()
#metrics.roc_auc_score(y_true, y_score)
#metrics.roc_curve(y_true, y_score)
#metrics.top_k_accuracy_score(y_true, y_score)

# REGRESSION
# Every metric takes the true targets and the predictions.
# y_true and y_pred are placeholders that this listing does not define.
metrics.explained_variance_score(y_true, y_pred)
metrics.max_error(y_true, y_pred)
metrics.mean_absolute_error(y_true, y_pred)
metrics.mean_squared_error(y_true, y_pred)
metrics.median_absolute_error(y_true, y_pred)
# Called with (y_true, y_pred) like its siblings; the bare call raised TypeError.
metrics.mean_absolute_percentage_error(y_true, y_pred)
metrics.r2_score(y_true, y_pred)
metrics.mean_tweedie_deviance(y_true, y_pred)
# Entries below are left commented in this listing.
#metrics.mean_squared_log_error(y_true, y_pred)
#metrics.mean_poisson_deviance(y_true, y_pred)
#metrics.mean_gamma_deviance(y_true, y_pred)
#metrics.d2_tweedie_score(y_true, y_pred)
#metrics.mean_pinball_loss(y_true, y_pred)

ANOVA for parameter validation (grid search ANOVA)

더보기
# https://scikit-learn.org/stable/modules/model_evaluation.html

import pandas as pd
from sklearn.datasets import make_classification
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.preprocessing import PowerTransformer, QuantileTransformer, StandardScaler, Normalizer, RobustScaler, MinMaxScaler, MaxAbsScaler
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.model_selection import cross_val_score, cross_validate, GridSearchCV, StratifiedKFold

def GridSearchANOVA(cross_validation_result, verbose=True):
    """Run a factorial ANOVA and a Tukey HSD post-hoc test on grid-search scores.

    Every ``param_<name>`` column of the result, plus the train/test
    ``domain``, is treated as a categorical factor; each per-split score is
    one observation.

    Parameters
    ----------
    cross_validation_result : mapping
        Data convertible to a ``pandas.DataFrame`` that contains a ``params``
        column, ``param_<name>`` columns and ``split<k>_<domain>_...`` score
        columns (for example ``GridSearchCV.cv_results_``; the script in this
        file fits it with ``return_train_score=True``).
    verbose : bool, default True
        When true, the score means, the ANOVA table, the Tukey summary and
        the simultaneous-comparison plot are shown through ``display``.
        ``display`` is not imported here; it is expected to be supplied by
        the running environment (presumably an IPython/Jupyter session).

    Returns
    -------
    assumptions : dict
        ``'homoscedasticity'`` and ``'normality'`` results from pingouin,
        computed per factor combination.
    score_means : pandas.DataFrame
        Mean score for every combination of factors.
    anova_table : pandas.DataFrame
        Type-2 ANOVA table of the OLS model containing all main effects and
        all interaction terms.
    anova_summary : pandas.DataFrame
        Tukey HSD pairwise-comparison table over the factor combinations.
    """
    from itertools import combinations
    import pandas as pd
    import statsmodels.api as sm
    from statsmodels.formula.api import ols
    from statsmodels.stats.multicomp import MultiComparison

    # ANOVA data: keep only the per-split scores, indexed by the parameters
    evaluation = pd.DataFrame(cross_validation_result).drop('params', axis=1)
    param_columns = [column for column in evaluation.columns if column.startswith('param')]
    evaluation = evaluation.set_index(param_columns)
    split_columns = [column for column in evaluation.columns if column.startswith('split')]
    evaluation = evaluation[split_columns]
    # 'split<k>_<domain>_...' -> (<domain>, <k>)
    evaluation.columns = pd.MultiIndex.from_tuples(
        [(column.split('_')[1], column.split('_')[0][5:]) for column in evaluation.columns]
    )
    # Drop the leading 'param_' from every index level name
    evaluation.index.names = [name[6:] for name in evaluation.index.names]
    evaluation = evaluation.stack(0)
    evaluation.index.names = list(evaluation.index.names)[:-1] + ['domain']  # train/test
    factors = list(evaluation.index.names)

    # Long format: one row per (factor combination, split) with its score
    evaluation = evaluation.stack(0).to_frame().rename(columns={0: 'score'})
    evaluation.index.names = list(evaluation.index.names[:-1]) + ['subject']
    evaluation = evaluation.reset_index().fillna('None')

    for factor in factors:
        evaluation[factor] = evaluation[factor].astype(str)

    # Label of the full factor combination, e.g. "a & b & train".
    # Built by direct concatenation so factor names never pass through eval().
    paired_factor = evaluation[factors[0]]
    for factor in factors[1:]:
        paired_factor = paired_factor + ' & ' + evaluation[factor]
    evaluation['paired_factor'] = paired_factor

    score_means = evaluation.groupby(factors)[['score']].mean()
    if verbose:
        display(score_means)

    # ANOVA Assumption
    import pingouin as pg
    assumptions = dict()
    assumptions['homoscedasticity'] = pg.homoscedasticity(data=evaluation, dv='score', group='paired_factor')
    assumptions['normality'] = pg.normality(data=evaluation, dv='score', group='paired_factor')  # Shapiro-Wilk test

    # Factorial ANOVA: main effects plus every interaction order
    terms = ['C(' + factor + ')' for factor in factors]
    variance_sources = []
    for order in range(1, len(terms) + 1):
        # ':'.join on a single term yields the term itself (a main effect)
        variance_sources.extend(':'.join(combination) for combination in combinations(terms, order))
    patsy_formula = ' ~ ' + ' + '.join(variance_sources)
    anova_table = sm.stats.anova_lm(ols('score' + patsy_formula, data=evaluation).fit(), typ=2)

    mc = MultiComparison(evaluation['score'], evaluation['paired_factor'])
    posthoc_results = mc.tukeyhsd()  # TUKEY HONESTLY SIGNIFICANT DIFFERENCE (HSD)
    anova_summary = posthoc_results.summary()
    # First row of the summary data is the header
    anova_summary = pd.DataFrame(anova_summary.data[1:], columns=anova_summary.data[0])

    if verbose:
        display(anova_table)
        display(anova_summary)
        display(posthoc_results.plot_simultaneous())
    return assumptions, score_means, anova_table, anova_summary


# Synthetic binary classification data: 10000 samples, 10 features,
# class weights 0.6 / 0.4, flip_y=0.
X, y = make_classification(
    n_samples=10000,
    n_features=10,
    n_classes=2,
    weights=[0.6, 0.4],
    flip_y=0,
)

# cross validation & randomness control: shuffled stratified 5-fold;
# random_state=None leaves the shuffle unseeded.
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=None)

# Pipeline under search: power transform -> normalizer -> LDA.
classifier = make_pipeline(
    PowerTransformer(method='yeo-johnson', standardize=True),
    Normalizer(),
    LinearDiscriminantAnalysis(),
)

# Index [1] picks 'recall' from the list of candidate scorers.
# Grid keys follow the '<step name>__<parameter>' convention of make_pipeline.
classifier = GridSearchCV(
    estimator=classifier,
    cv=cv,
    scoring=['accuracy', 'recall', 'precision', 'f1'][1],
    return_train_score=True,
    param_grid={
        'powertransformer__standardize': [True, False],
        'lineardiscriminantanalysis__priors': [(.1, .9), (.5, .5), None],
        'lineardiscriminantanalysis__solver': ['svd', 'lsqr', 'eigen'],
    },
)
classifier.fit(X, y)

# Analyse the per-split train/test scores of every grid point.
assumpation, score_means, anova_table, anova_summary = GridSearchANOVA(
    classifier.cv_results_,
    verbose=True,
)

ANOVA for scenario validation (ScenarioANOVA)

+ Recent posts