Regression and Classification Analysis
Regression Analysis
Cross-Sectional Analysis
OLS+WLS+GLS+GLM
$${\displaystyle \begin{aligned} \mathbf{y} &= \mathbf{X}\boldsymbol{\beta} + \boldsymbol{\varepsilon}, \quad \boldsymbol{\varepsilon} \sim \mathcal{N}(\mathbf{0}, \sigma_{\boldsymbol{\varepsilon}}^{2}\mathbf{I}_{n}) \end{aligned} }$$
import numpy as np
import pandas as pd
import statsmodels.api as sm
X1 = np.random.normal(0, 3, size=(300,))
X2 = np.random.normal(0, 2, size=(300,))
X3 = np.random.normal(0, 2, size=(300,))
y = .1 + 2*X1 + 3*X2 + 1*X3 + np.random.normal(0, 3, size=300)
endog = y
exog = np.c_[np.ones_like(y), X1, X2, X3]
ols_result = sm.OLS(endog=endog, exog=exog).fit()  # OLS: homoskedastic errors
result = ols_result
result = sm.WLS(endog=endog, exog=exog, weights=1/ols_result.resid**2).fit()  # WLS: weights ~ 1/Var(eps_i), here estimated from the OLS residuals
result = sm.GLS(endog=endog, exog=exog, sigma=ols_result.resid.var(ddof=0)*np.identity(endog.shape[0])).fit()  # GLS: full error covariance matrix (diagonal here, so equivalent to OLS)
result = sm.GLM(endog=endog, exog=exog, family=sm.families.Gaussian(sm.families.links.Identity())).fit()  # GLM with Gaussian family and identity link reproduces OLS
result.summary()
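A quick sanity check (a sketch; result holds whichever fit was assigned last): the estimates should land near the simulated coefficients [.1, 2, 3, 1].
print(result.params)      # point estimates, expected near [.1, 2, 3, 1]
print(result.bse)         # standard errors
print(result.conf_int())  # 95% confidence intervals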
$${\displaystyle \begin{aligned} \mathbf{y} &= \underbrace{\mathbf{X}}_{\text{Uncertainty}}\boldsymbol{\beta} + \boldsymbol{\varepsilon}, \quad \boldsymbol{\varepsilon} \sim \mathcal{N}(\mathbf{0}, \sigma_{\boldsymbol{\varepsilon}}^{2}\mathbf{I}_{n}) \end{aligned} }$$
IV2SLS
import numpy as np
import pandas as pd
from statsmodels.sandbox.regression import gmm
Z = np.random.normal(0, 1, size=1000)
epsilon = np.random.normal(0, 1, size=1000)
epsilon = epsilon - (np.cov(epsilon, Z)[0,1] / np.cov(Z, Z)[0,1]) * Z  # project out Z: a valid instrument must be uncorrelated with the structural error
X = 3*Z + 1 + epsilon + np.random.normal(size=1000)
y = 5*X + 4 + epsilon
data = pd.DataFrame(np.c_[y, np.ones_like(y), X, Z], columns=['y', 'const', 'X', 'Z'])
result = gmm.IV2SLS(data['y'], exog=data[['const', 'X']], instrument=data[['const', 'Z']]).fit()
result.summary()
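For intuition, the same point estimates can be reproduced with two explicit OLS stages (a minimal sketch; the stage-2 standard errors are not IV-corrected).
import numpy as np
import statsmodels.api as sm
stage1 = sm.OLS(data['X'], data[['const', 'Z']]).fit()                       # stage 1: endogenous X on the instruments
stage2 = sm.OLS(data['y'], np.c_[data['const'], stage1.fittedvalues]).fit()  # stage 2: y on the stage-1 fitted values of X
stage2.params  # the slope on fitted X matches the IV2SLS estimate (~5)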
import numpy as np
import pandas as pd
import linearmodels.iv.model as lm
Z = np.random.normal(0, 1, size=1000)
epsilon = np.random.normal(0, 1, size=1000)
epsilon = epsilon - (np.cov(epsilon, Z)[0,1] / np.cov(Z, Z)[0,1]) * Z  # project out Z: a valid instrument must be uncorrelated with the structural error
X = 3*Z + 1 + epsilon + np.random.normal(size=1000)
y = 5*X + 4 + epsilon
data = pd.DataFrame(np.c_[y, np.ones_like(y), X, Z], columns=['y', 'const', 'X', 'Z'])
result = lm.IV2SLS(dependent=data['y'], exog=data['const'], endog=data['X'], instruments=data['Z']).fit(cov_type="homoskedastic", debiased=True)
result.wu_hausman() # Wu-Hausman test of exogeneity
result.wooldridge_regression # Wooldridge's regression test of exogeneity (a regression-based variant of the Wu-Hausman test)
result.sargan # Sargan's test
result.summary
$${\displaystyle \begin{aligned} \mathbf{y} &= \mathbf{X}\boldsymbol{\beta} + \underbrace{\boldsymbol{\varepsilon}}_{\text{Uncertainty}}, \quad \boldsymbol{\varepsilon} \sim \mathcal{N}(\mathbf{0}, \sigma_{\boldsymbol{\varepsilon}}^{2}\mathbf{I}_{n}) \end{aligned} }$$
Nested Group Effect
import numpy as np
import pandas as pd
import statsmodels.api as sm
import statsmodels.formula.api as smf
n_group1 = 20; n_group2 = 100
data = pd.DataFrame(data=0, columns=['e'], index=pd.MultiIndex.from_product([np.arange(n_group1), np.arange(n_group2)], names=['G1', 'G2']))
G1_effect = data.groupby(['G1'], group_keys=False)[['e']].apply(lambda e: e + np.random.normal(0, 2)).rename(columns={'e':'G1_Effect'})
G1G2_effect = data.groupby(['G1', 'G2'], group_keys=False)[['e']].apply(lambda e: e + np.random.normal(0, 3)).rename(columns={'e':'G1G2_Effect'})
data = pd.concat([data, G1_effect, G1G2_effect], axis=1)
data = pd.concat([data]*10, axis=0)  # replicate each (G1, G2) cell 10 times
data['y'] = data['G1_Effect'] + data['G1G2_Effect'] + np.random.normal(0, 1, size=data.shape[0])
result = smf.mixedlm(
"y ~ 1",
re_formula="1",
vc_formula={"G2": "0 + C(G2)"},
groups="G1",
data=data.reset_index(),
).fit() # nested effect
print(data['y'].var())
print(data.groupby('G1')['y'].mean().var())
print(data.groupby('G2')['y'].mean().var()) # ~ 0: G2 labels repeat across every G1, so averaging over G1 washes out the nested effects
print(data.groupby(['G1', 'G2'])['y'].mean().var() - data.groupby('G1')['y'].mean().var())
result.summary()
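The fitted variance components can be checked against the simulated standard deviations (G1: 2, nested G1xG2: 3, residual: 1); a sketch using MixedLMResults attributes.
print(result.cov_re)  # variance of the G1 random intercept, expected near 4
print(result.vcomp)   # variance component of G2 nested within G1, expected near 9
print(result.scale)   # residual variance, expected near 1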
Crossed Group Effect
import numpy as np
import pandas as pd
import statsmodels.api as sm
import statsmodels.formula.api as smf
n_group1 = 20; n_group2 = 100
group = pd.DataFrame(data=0, columns=['e'], index=pd.MultiIndex.from_product([np.arange(n_group1), np.arange(n_group2)], names=['G1', 'G2']))
G1_effect = group.groupby(['G1'], group_keys=False)[['e']].apply(lambda e: e + np.random.normal(0, 2)).rename(columns={'e':'G1_Effect'})
G2_effect = group.groupby(['G2'], group_keys=False)[['e']].apply(lambda e: e + np.random.normal(0, 3)).rename(columns={'e':'G2_Effect'})
group = pd.concat([group, G1_effect, G2_effect], axis=1)
group = pd.concat([group]*10, axis=0)
group['y'] = group['G1_Effect'] + group['G2_Effect'] + np.random.normal(0, 1, size=group.shape[0])
result = smf.mixedlm(
"y ~ 1",
re_formula=None,
vc_formula={"G1": "0 + C(G1)", "G2": "0 + C(G2)"},
    groups=np.ones_like(group['y']),  # a single grouping level: both crossed effects enter via vc_formula
data=group.reset_index(),
).fit() # crossed effect
print(group['y'].var(ddof=1))
print(group.groupby(['G1'])['y'].mean().var(ddof=1), group.groupby(['G1'])['y'].var(ddof=1).mean())
print(group.groupby(['G2'])['y'].mean().var(ddof=1), group.groupby(['G2'])['y'].var(ddof=1).mean())
print(group.groupby(['G1', 'G2'])['y'].mean().var(ddof=1), group.groupby(['G1', 'G2'])['y'].var(ddof=1).mean())
display(result.summary())
g1_result = smf.ols('y ~ 0+C(G1)', data=group.reset_index()).fit()
g2_result = smf.ols('y ~ 0+C(G2)', data=group.reset_index()).fit()
print(g1_result.params.mean(), g1_result.fittedvalues.var(ddof=1))
print(g2_result.params.mean(), g2_result.fittedvalues.var(ddof=1))
$${\displaystyle \begin{aligned} \mathbf{y} &= \mathbf{X}\underbrace{\boldsymbol{\beta}}_{\text{Uncertainty}} + \boldsymbol{\varepsilon}, \quad \boldsymbol{\varepsilon} \sim \mathcal{N}(\mathbf{0}, \sigma_{\boldsymbol{\varepsilon}}^{2}\mathbf{I}_{n}) \end{aligned} }$$
import numpy as np
import pandas as pd
import statsmodels.api as sm
import statsmodels.formula.api as smf
import statsmodels.stats.api as sms
np.random.seed(0)
N = np.random.normal(size=1000)
C = np.random.randint(3, size=1000)
X = np.c_[N, C]
y0 = 1 + 2 * X[np.where(C==0)[0], 0] + np.random.normal(0, 1, size=np.where(C==0)[0].shape[0])
y1 = 2 + 3 * X[np.where(C==1)[0], 0] + np.random.normal(0, 1, size=np.where(C==1)[0].shape[0])
y2 = 3 + 5 * X[np.where(C==2)[0], 0] + np.random.normal(0, 1, size=np.where(C==2)[0].shape[0])
y = np.r_[y0, y1, y2]
X = np.r_[X[np.where(C==0)[0]], X[np.where(C==1)[0]], X[np.where(C==2)[0]]]
data = pd.DataFrame(np.c_[y, X], columns=['y', 'X', 'group'])
data['group'] = data['group'].astype('category')
data['const'] = np.ones_like(data['y'])
# REML: restricted maximum likelihood
# Varying-coefficient model for intercepts and slopes
result = smf.mixedlm(formula='y ~ 1 + X', re_formula='~ 1 + X', groups=data['group'], data=data).fit(method=['nm', 'bfgs', 'lbfgs', 'powell', 'cg', 'basinhopping', 'minimize'][1], start_params=None, reml=True, niter_sa=0, do_cg=True, fe_pen=None, cov_pen=None, free=None, full_output=False)
display(result.summary())
beta1 = pd.DataFrame(result.random_effects).rename(index={'Group':'Intercept'}).add(result.fe_params, axis=0)  # group-level coefficients = fixed effects + random-effect BLUPs
beta2 = smf.ols('y ~ 0 + group + group:X', data=data).fit().params  # per-group OLS coefficients for comparison
Factor Analysis
$${\displaystyle \begin{aligned} \mathbf{X} - \mathbf{M} &= \underbrace{\mathbf{L}}_{\text{Loadings}} \; \underbrace{\mathbf{F}}_{\text{Factor}} + \boldsymbol{\varepsilon}, \quad \boldsymbol{\varepsilon} \sim \mathcal{N}(\mathbf{0}, \boldsymbol{\Sigma}) \end{aligned} }$$
import numpy as np
import statsmodels.api as sm
L = np.random.normal(size=(5,3)) # loadings
F = np.random.normal(size=(400, 3)) # factor scores
X = (L@F.T).T + np.random.multivariate_normal(np.zeros(5), np.eye(5)*[10,10,10,1,1], size=400)
result = sm.Factor(endog=X, n_factor=3, corr=None, method=['pa', 'ml'][0], smc=True, endog_names=None, nobs=None, missing='drop').fit()
result.rotate(method=['varimax', 'quartimax', 'biquartimax', 'equamax', 'oblimin', 'parsimax', 'parsimony', 'biquartimin', 'promax'][-1])
result.summary()
L_ = result.loadings
F_ = result.factor_scoring()
residuals = (X - X.mean(axis=0) - (L_@F_.T).T)
np.c_[residuals.mean(axis=0), residuals.var(axis=0)]
Time Series Analysis
ARCH in Mean
import numpy as np
import pandas as pd
from arch.univariate import ARCHInMean, ARCH, Normal
white_noise = np.random.normal(0, 1, size=1000) # Standard Noise
y = np.zeros_like(white_noise)        # zero-initialized so lagged terms are well defined at t = 0, 1
epsilon = np.zeros_like(white_noise)
sigma2 = np.zeros_like(white_noise)
for t, std_noise in enumerate(white_noise):
    sigma2[t] = 2 + .5*epsilon[t-1]**2 + .3*epsilon[t-2]**2  # ARCH(2) conditional variance
    epsilon[t] = std_noise * np.sqrt(sigma2[t])
    y[t] = -.3*y[t-1] + .7*np.sqrt(sigma2[t]) + epsilon[t]   # AR(1) mean with in-mean volatility term (kappa = .7)
data = pd.DataFrame(np.c_[y], columns=['y'], index=pd.date_range(end='00:00:00', periods=y.shape[0], freq='D'))
result = ARCHInMean(data['y'], constant=False, lags=[1], form=["log", "vol", "var"][1], volatility=ARCH(2), distribution=Normal(), rescale=False).fit(disp=0)  # lags as a scalar -> HAR components; lags as a 1d array -> AR lags
result.summary()
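As a rough check that the fit recovers the simulated dynamics, correlate the estimated conditional volatility with the true one (a sketch).
fitted_vol = result.conditional_volatility               # pandas Series aligned with the data index
true_vol = pd.Series(np.sqrt(sigma2), index=data.index)  # simulated conditional volatility
print(fitted_vol.corr(true_vol))                         # expected close to 1; the NaN head from the AR lag is ignored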
Factor Analysis
#
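A minimal sketch, assuming a state-space dynamic factor model via statsmodels (one AR(1) common factor driving four observed series; all names and sizes here are illustrative).
import numpy as np
import pandas as pd
import statsmodels.api as sm
f = np.zeros(300)
for t in range(1, 300):
    f[t] = .8*f[t-1] + np.random.normal()  # latent AR(1) common factor
loadings = np.array([1., .5, -.7, .3])
X = np.outer(f, loadings) + np.random.normal(0, .5, size=(300, 4))
data = pd.DataFrame(X, index=pd.date_range('2000-01-01', periods=300, freq='D'))
result = sm.tsa.DynamicFactor(data, k_factors=1, factor_order=1).fit(disp=0)
result.summary()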
Panel Analysis
#
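A minimal sketch, assuming linearmodels' PanelOLS with entity fixed effects on a simulated entity-time panel (names and sizes are illustrative).
import numpy as np
import pandas as pd
from linearmodels.panel import PanelOLS
n_entity, n_time = 50, 20
index = pd.MultiIndex.from_product([np.arange(n_entity), pd.date_range('2000-01-01', periods=n_time, freq='D')], names=['entity', 'time'])
data = pd.DataFrame(index=index)
data['X'] = np.random.normal(size=len(data))
entity_effect = np.repeat(np.random.normal(0, 2, size=n_entity), n_time)  # time-invariant entity effect
data['y'] = 1 + 2*data['X'] + entity_effect + np.random.normal(size=len(data))
result = PanelOLS.from_formula('y ~ 1 + X + EntityEffects', data=data).fit()
result.summary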
Classification Analysis
Cross Validation
Big Data Task: Classification
- [X-(y) relevant split] StratifiedKFold, RepeatedStratifiedKFold, StratifiedShuffleSplit
- [X-(G-y) relevant split] GroupKFold, GroupShuffleSplit
Big Data Task: Regression
- [X-y irrelevant split] KFold, RepeatedKFold, ShuffleSplit
- [X-(G-y) relevant split] GroupKFold, GroupShuffleSplit
Big Data Task: Time Series
- [X-y irrelevant split] TimeSeriesSplit
Small Data Task
- [X-(G-y) relevant split] LeavePGroupsOut, LeaveOneGroupOut
- [X-y irrelevant split] LeavePOut, LeaveOneOut
import numpy as np
import pandas as pd
from sklearn.datasets import make_classification, make_regression
from sklearn.model_selection import TimeSeriesSplit
from sklearn.model_selection import LeaveOneGroupOut, LeaveOneOut, LeavePGroupsOut, LeavePOut
from sklearn.model_selection import GroupKFold, StratifiedGroupKFold, GroupShuffleSplit
from sklearn.model_selection import StratifiedKFold, RepeatedStratifiedKFold, StratifiedShuffleSplit
from sklearn.model_selection import KFold, RepeatedKFold, ShuffleSplit
X, y = make_classification(n_samples=3000, n_features=30, n_classes=2, weights=[0.6, 0.4], flip_y=0)
cv_splitter = StratifiedKFold(n_splits=5, shuffle=False, random_state=None).split(X, y)
cv_splitter = RepeatedStratifiedKFold(n_splits=5, n_repeats=3, random_state=None).split(X, y)
cv_splitter = StratifiedShuffleSplit(n_splits=5, train_size=0.75, test_size=.25, random_state=None).split(X, y)
X, y = make_regression(n_samples=3000, n_features=30, n_informative=5, n_targets=1, bias=0.0, effective_rank=None, tail_strength=0.5, noise=0.0, shuffle=True, coef=False, random_state=None)
cv_splitter = KFold(n_splits=5, shuffle=False, random_state=None).split(X)
cv_splitter = RepeatedKFold(n_splits=5, n_repeats=3, random_state=None).split(X)
cv_splitter = ShuffleSplit(n_splits=5, train_size=0.75, test_size=.25, random_state=None).split(X)
X = np.array([[1, 2],
[3, 4],
[5, 6],
[7, 8],
[3, 4],
[5, 6]])
y = np.array([1,
2,
1,
2,
1,
2])
cv_splitter = ShuffleSplit(n_splits=5, train_size=0.75, test_size=.25, random_state=None).split(X)
cv_splitter = LeavePOut(p=3).split(X)
cv_splitter = LeaveOneOut().split(X)
cv_splitter = TimeSeriesSplit(n_splits=5, max_train_size=None, test_size=None, gap=0).split(y)  # each split yields a (train_index, test_index) pair
X = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
y = [0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1]  # class labels, required by the stratified variant
groups = ['a', 'a', 'a', 'b', 'b', 'b', 'c', 'c', 'c', 'c', 'd', 'd', 'd', 'd', 'd', 'd']
cv_splitter = GroupKFold(n_splits=4).split(X, y, groups=groups)  # n_splits must not exceed the number of distinct groups (4)
cv_splitter = StratifiedGroupKFold(n_splits=2).split(X, y, groups=groups)
cv_splitter = GroupShuffleSplit(n_splits=5).split(X, y, groups=groups)
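Each splitter is a generator of (train_index, test_index) pairs; a quick sketch confirming that group-aware splitters never leak a group across the boundary.
import numpy as np
for train_index, test_index in GroupKFold(n_splits=4).split(X, y, groups=groups):
    train_groups = np.unique(np.asarray(groups)[train_index])
    test_groups = np.unique(np.asarray(groups)[test_index])
    print(train_groups, test_groups)  # the two label sets are always disjoint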
cross validation
# https://scikit-learn.org/stable/modules/model_evaluation.html
# binary_class_scoring = ['accuracy', 'balanced_accuracy', 'recall', 'average_precision', 'precision', 'f1', 'jaccard', 'roc_auc', 'roc_auc_ovr', 'roc_auc_ovo']
# multi_class_scoring = ['accuracy', 'balanced_accuracy', 'recall_micro', 'recall_macro', 'recall_weighted', 'precision_micro', 'precision_macro', 'precision_weighted', 'f1_micro', 'f1_macro', 'f1_weighted', 'jaccard_micro', 'jaccard_macro', 'jaccard_weighted', 'roc_auc_ovr_weighted', 'roc_auc_ovo_weighted']
# regression_scoring = ['r2', 'explained_variance', 'max_error', 'neg_mean_squared_error', 'neg_mean_absolute_error', 'neg_root_mean_squared_error', 'neg_median_absolute_error', 'neg_mean_absolute_percentage_error']
from sklearn.datasets import make_classification, make_regression
from sklearn.model_selection import cross_validate, RepeatedStratifiedKFold, RepeatedKFold
from sklearn.linear_model import LinearRegression, LogisticRegression
# classification
X, y = make_classification(n_samples=3000, n_features=30, n_classes=2, weights=[0.6, 0.4], flip_y=0)
cv = RepeatedStratifiedKFold(n_splits=5, n_repeats=3, random_state=None) # StratifiedKFold(n_splits=5, shuffle=False, random_state=None) # cross validation & randomness control
cv_results = cross_validate(LogisticRegression(), X, y, scoring=['accuracy', 'balanced_accuracy', 'recall', 'average_precision', 'precision', 'f1', 'jaccard', 'roc_auc', 'roc_auc_ovr', 'roc_auc_ovo'], cv=cv, return_train_score=True, fit_params=None, return_estimator=False, n_jobs=-1, verbose=0)
cv_results = cross_validate(LogisticRegression(), X, y, scoring=['accuracy', 'balanced_accuracy', 'recall_micro', 'recall_macro', 'recall_weighted', 'precision_micro', 'precision_macro', 'precision_weighted', 'f1_micro', 'f1_macro', 'f1_weighted', 'jaccard_micro', 'jaccard_macro', 'jaccard_weighted', 'roc_auc_ovr_weighted', 'roc_auc_ovo_weighted'], cv=cv, return_train_score=True, fit_params=None, return_estimator=False, n_jobs=-1, verbose=0)
# regression
X, y = make_regression(n_samples=3000, n_features=30, n_informative=5, n_targets=1, bias=0.0, effective_rank=None, tail_strength=0.5, noise=0.0, shuffle=True, coef=False, random_state=None)
cv = RepeatedKFold(n_splits=10, n_repeats=3, random_state=None) # KFold(n_splits=10, shuffle=False, random_state=None)
cv_results = cross_validate(LinearRegression(), X, y, scoring=['r2', 'explained_variance', 'max_error', 'neg_mean_squared_error', 'neg_mean_absolute_error', 'neg_root_mean_squared_error', 'neg_median_absolute_error', 'neg_mean_absolute_percentage_error'], cv=cv, return_train_score=True, fit_params=None, return_estimator=False, n_jobs=-1, verbose=0)
from sklearn.datasets import make_classification, make_regression
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression, LogisticRegression
from mlxtend.evaluate import bias_variance_decomp
# classification
X, y = make_classification(n_samples=3000, n_features=30, n_classes=2, weights=[0.6, 0.4], flip_y=0)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=None, shuffle=True, stratify=y)
avg_expected_loss, avg_bias, avg_var = bias_variance_decomp(LogisticRegression(), X_train, y_train, X_test, y_test, loss='0-1_loss', random_seed=None)
# regression
X, y = make_regression(n_samples=3000, n_features=30, n_informative=5, n_targets=1, bias=0.0, effective_rank=None, tail_strength=0.5, noise=0.0, shuffle=True, coef=False, random_state=None)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=None, shuffle=True)
avg_expected_loss, avg_bias, avg_var = bias_variance_decomp(LinearRegression(), X_train, y_train, X_test, y_test, loss='mse', random_seed=None)
grid search
# https://scikit-learn.org/stable/modules/model_evaluation.html
# binary_class_scoring = ['accuracy', 'balanced_accuracy', 'recall', 'average_precision', 'precision', 'f1', 'jaccard', 'roc_auc', 'roc_auc_ovr', 'roc_auc_ovo']
# multi_class_scoring = ['accuracy', 'balanced_accuracy', 'recall_micro', 'recall_macro', 'recall_weighted', 'precision_micro', 'precision_macro', 'precision_weighted', 'f1_micro', 'f1_macro', 'f1_weighted', 'jaccard_micro', 'jaccard_macro', 'jaccard_weighted', 'roc_auc_ovr_weighted', 'roc_auc_ovo_weighted']
# regression_scoring = ['r2', 'explained_variance', 'max_error', 'neg_mean_squared_error', 'neg_mean_absolute_error', 'neg_root_mean_squared_error', 'neg_median_absolute_error', 'neg_mean_absolute_percentage_error']
from sklearn.datasets import make_classification, make_regression
from sklearn.model_selection import GridSearchCV, RepeatedStratifiedKFold, RepeatedKFold
from sklearn.linear_model import LinearRegression, LogisticRegression
# classification
X, y = make_classification(n_samples=3000, n_features=30, n_classes=2, weights=[0.6, 0.4], flip_y=0)
cv = RepeatedStratifiedKFold(n_splits=5, n_repeats=3, random_state=None) # StratifiedKFold(n_splits=5, shuffle=False, random_state=None) # cross validation & randomness control
classifier = GridSearchCV(estimator=LogisticRegression(), param_grid={'fit_intercept':[False, True], 'penalty':['l2']}, cv=cv, scoring=['accuracy', 'balanced_accuracy', 'recall', 'average_precision', 'precision', 'f1', 'jaccard', 'roc_auc', 'roc_auc_ovr', 'roc_auc_ovo'][0], return_train_score=True)
classifier = GridSearchCV(estimator=LogisticRegression(), param_grid={'fit_intercept':[False, True], 'penalty':['l2']}, cv=cv, scoring=['accuracy', 'balanced_accuracy', 'recall_micro', 'recall_macro', 'recall_weighted', 'precision_micro', 'precision_macro', 'precision_weighted', 'f1_micro', 'f1_macro', 'f1_weighted', 'jaccard_micro', 'jaccard_macro', 'jaccard_weighted', 'roc_auc_ovr_weighted', 'roc_auc_ovo_weighted'][0], return_train_score=True)
cv_results = classifier.fit(X, y).cv_results_
classifier.best_estimator_ # fitted
classifier.estimator # not fitted
# regression
X, y = make_regression(n_samples=3000, n_features=30, n_informative=5, n_targets=1, bias=0.0, effective_rank=None, tail_strength=0.5, noise=0.0, shuffle=True, coef=False, random_state=None)
cv = RepeatedKFold(n_splits=10, n_repeats=3, random_state=None) # KFold(n_splits=10, shuffle=False, random_state=None)
regressor = GridSearchCV(estimator=LinearRegression(), param_grid={'fit_intercept':[False, True], 'positive':[False, True]}, cv=cv, scoring=['r2', 'explained_variance', 'max_error', 'neg_mean_squared_error', 'neg_mean_absolute_error', 'neg_root_mean_squared_error', 'neg_median_absolute_error', 'neg_mean_absolute_percentage_error'][1], return_train_score=True)
cv_results = regressor.fit(X, y).cv_results_
regressor.best_estimator_ # fitted
regressor.estimator # not fitted
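cv_results_ flattens neatly into a DataFrame for inspection (a sketch; mean_train_score is present because return_train_score=True).
import pandas as pd
scores = pd.DataFrame(regressor.cv_results_)
scores[['params', 'mean_train_score', 'mean_test_score', 'rank_test_score']]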
Evaluation
Classification: Precision&Recall Trade-Off
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn import metrics
#from sklearn.metrics import roc_curve, precision_recall_curve, auc
def cls_evaluation(y_true, y_pred):
# version 1: base confusion matrix
confusion_matrix = pd.crosstab(index=[y_true], columns=[y_pred], margins=True, margins_name='All', dropna=False, normalize=False) # .unstack(level=0).stack(level=1)
# version 2: extended confusion matrix
conditional_probability_for_actual = confusion_matrix.div(confusion_matrix.loc[:, 'All'], axis=0).rename(columns=dict(map(lambda x: (x, str(x)+'|A'), confusion_matrix.columns)))
conditional_probability_for_prediction = confusion_matrix.div(confusion_matrix.loc['All', :], axis=1).rename(index=dict(map(lambda x: (x, str(x)+'|P'), confusion_matrix.columns)))
confusion_matrix = pd.concat([confusion_matrix, conditional_probability_for_actual], axis=1)
confusion_matrix = pd.concat([confusion_matrix, conditional_probability_for_prediction], axis=0)
display(confusion_matrix.iloc[:, :].style.background_gradient(cmap=sns.light_palette("green", as_cmap=True)))
result = dict(); average='micro' # 'binary', 'macro', 'micro', 'weighted', 'samples'
result['confusion_matrix'] = metrics.confusion_matrix(y_true, y_pred)
result['accuracy_score'] = metrics.accuracy_score(y_true, y_pred)
result['balanced_accuracy_score'] = metrics.balanced_accuracy_score(y_true, y_pred)
result['recall_score'] = metrics.recall_score(y_true, y_pred, average=average)
result['precision_score'] = metrics.precision_score(y_true, y_pred, average=average)
result['f1_score'] = metrics.f1_score(y_true, y_pred, average=average) # precision&recall trade-off
result['fbeta_score'] = metrics.fbeta_score(y_true, y_pred, beta=2, average=average)
result['jaccard_score'] = metrics.jaccard_score(y_true, y_pred, average=average)
result['hamming_loss'] = metrics.hamming_loss(y_true, y_pred)
result['matthews_corrcoef'] = metrics.matthews_corrcoef(y_true, y_pred)
result['multilabel_confusion_matrix'] = metrics.multilabel_confusion_matrix(y_true, y_pred)
result['zero_one_loss'] = metrics.zero_one_loss(y_true, y_pred)
return result
y_true = np.random.randint(3, size=100)
y_pred = np.random.randint(3, size=100)
df = pd.DataFrame(np.c_[y_true, y_pred], columns=['TRUE', 'PRED'])
cls_evaluation(df['TRUE'], df['PRED'])
Regression: Bias&Variance Trade-Off
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn import metrics
def reg_evaluation(y_true, y_pred):
y_resid = y_true - y_pred
fig = plt.figure(figsize=(30,5))
ax0 = plt.subplot2grid((2,5), (0,0), colspan=3, rowspan=1); ax0.set_title('Bias-Variance')
ax1 = plt.subplot2grid((2,5), (1,0), colspan=3, rowspan=1, sharex=ax0, sharey=ax0);
ax2 = plt.subplot2grid((2,5), (0,3), colspan=1, rowspan=2); ax2.set_title('Correlation'); ax2.set_xlabel('PRED'); ax2.set_ylabel('RESID')
ax3 = plt.subplot2grid((2,5), (0,4), colspan=1, rowspan=2); ax3.set_title('Residual Histogram')
[ax0.spines[side].set_visible(False) for side in ['top', 'bottom', 'right']]
[ax1.spines[side].set_visible(False) for side in ['top']]
[ax2.spines[side].set_visible(False) for side in ['top', 'right']]
ax0.plot(y_true, lw=0, marker='o', markerfacecolor='w', color='black', label='TRUE')
ax0.plot(y_pred, lw=0, ms=10, marker='x', color='r', label='PRED')
ax0.get_xaxis().set_visible(False)
ax0.legend(frameon=False)
    for x, y1, y2 in zip(y_true.index, y_true, y_pred):
        ax1.plot([x, x], [min(y1, y2), max(y1, y2)], c='r')  # residual as a vertical segment (axvline's ymin/ymax are axes fractions, not data coordinates)
# Homoscedasticity/Heteroscedasticity
ax2.scatter(y_pred, y_resid, label='Homoscedasticity/Heteroscedasticity', color='black')
boundary = max(y_pred.std(), y_resid.std())
ax2.set_xlim(-boundary*10, boundary*10)
ax2.set_ylim(-boundary*10, boundary*10)
ax2.legend(frameon=False)
ax3.hist(y_resid, bins=30, edgecolor='white')
ax3.axvline(y_resid.mean(), c='r')
result = dict()
result['r2_score'] = metrics.r2_score(y_true, y_pred)
result['explained_variance_score'] = metrics.explained_variance_score(y_true, y_pred)
result['mean_squared_error'] = metrics.mean_squared_error(y_true, y_pred)
result['mean_absolute_error'] = metrics.mean_absolute_error(y_true, y_pred)
result['median_absolute_error'] = metrics.median_absolute_error(y_true, y_pred)
result['max_error'] = metrics.max_error(y_true, y_pred)
result['mean_tweedie_deviance'] = metrics.mean_tweedie_deviance(y_true, y_pred)
return result
y_true = np.random.normal(0, 10, size=300)
y_pred = y_true + np.random.normal(3, 4, size=300) # bias, variance
df = pd.DataFrame(np.c_[y_true, y_pred], columns=['TRUE', 'PRED'])
reg_evaluation(df['TRUE'], df['PRED'])
Time Series
# [1, Theoretical Simple Assumption] y = f(X1, X2, X3, ...) + noise
# >>> (1-1) X means deterministic variables (explanatory variable)
# >>> (1-2) noise is independent probabilistic variable
# >>> (1-3) the variance of y comes from noise
# [2, Experimental Calculation] y_hat = f_hat(X1, X2, X3, ...)
# >>> (2-1) X is given explanatory variables, the 'hat' means approximation
# [3] Estimation: y - y_hat = residual
# >>> (3-1) residual means approximation of noises
# >>> (3-2) if an estimator is well defined, the correlation between y_hat and residual is expected to be 0.
# [4, Evaluation] endog:y = fittedvalues:y_hat + resid:residual
# >>> (4-1) variances of endog, fittedvalues, resid
# >>> (4-2) variance of fittedvalues = f_hat(X1, X2, X3, ...): variance from propagation of uncertainty through the explanatory variables under the fitted map (linear or nonlinear)
import numpy as np
import pandas as pd
import statsmodels
import statsmodels.tsa.api as smt
def tsa_evaluation(y_true, y_pred, num_params=1):
endog = y_true
fittedvalues = y_pred
resid = y_true - y_pred
result = dict()
# result['sst'] = result['ssr'] + result['sse'], for only Linear Regression
# 1 = result['r2'] + result['k2'], for all statistical cases
result['sst'] = ((endog - endog.mean())**2).sum() # deviation volume > all-data variances
result['ssr'] = ((fittedvalues - endog.mean())**2).sum() # deviation volume > all-data variances
result['sse'] = resid.apply(lambda x: x**2).sum() # deviation volume > all-data variances
result['sse/ssr'] = result['sse']/result['ssr'] # deviation volume ratio
result['ssr/sst:explained variance'] = result['ssr']/result['sst'] # deviation volume ratio > fraction of variance explained
result['sse/sst:unexplained variance'] = result['sse']/result['sst'] # deviation volume ratio > fraction of variance unexplained (FVU)
    result['r2'] = 1 - (result['sse']/result['sst']) # [ 0 < r2 < 1 ] coefficient of determination: common variance shared between the variables (~covariance)
    result['k2'] = 1 - result['r2'] # [ 0 < k2 < 1 ] coefficient of alienation: common variance not shared between the variables
result['y-yhat:corr'] = np.corrcoef(endog, fittedvalues)[0,1] # [ -1 < correlation < 1 ]
result['y-yhat:cov'] = np.cov(endog, fittedvalues)[0,1]
result['y-yhat:leverage'] = np.cov(endog, fittedvalues)[0,1]/np.cov(endog, fittedvalues)[0,0] # [ 0 < leverage < 1 ]
result['yhat-resid:corr'] = np.corrcoef(fittedvalues, resid)[0,1]
result['yhat-resid:cov'] = np.cov(fittedvalues, resid)[0,1]
result['mst'] = endog.var(ddof=1) # deviation area > individual variance
result['msr'] = ((fittedvalues - endog.mean())**2).sum() / num_params # deviation area > individual variance
result['mse'] = resid.apply(lambda x: x**2).sum()/(resid.shape[0]-num_params) # deviation area > individual variance
result['rmse'] = np.sqrt(resid.apply(lambda x: x**2).sum()/(resid.shape[0]-num_params)) # deviation length
result['mae'] = resid.apply(np.abs).sum()/(resid.shape[0]-num_params) # deviation length
result['mape'] = (resid.apply(np.abs) / (fittedvalues + resid).apply(np.abs)).sum() / (resid.shape[0]-num_params) # deviation length
return result
y_true = pd.Series(data=np.random.normal(0, 1, size=300).cumsum(), index=pd.date_range('00:00:00', periods=300, freq='D'))
y_pred = y_true + .3*np.random.normal(0, 1, size=300).cumsum() + .1*np.random.normal(0, 1, size=300)
df = pd.DataFrame(np.c_[y_true, y_pred], columns=['TRUE', 'PRED'])
tsa_evaluation(df['TRUE'], df['PRED'], num_params=1)
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn import metrics
#from sklearn.metrics import roc_curve, precision_recall_curve, auc
def tsa_evaluation(y_true, y_pred):
    y_true = (y_true - y_true.iloc[0]).expanding(min_periods=1).mean().apply(lambda x: 1 if x > 0 else 0)  # binarized direction: is the running mean above the starting level?
    y_pred = (y_pred - y_pred.iloc[0]).expanding(min_periods=1).mean().apply(lambda x: 1 if x > 0 else 0)
# version 1: base confusion matrix
confusion_matrix = pd.crosstab(index=[y_true], columns=[y_pred], margins=True, margins_name='All', dropna=False, normalize=False) # .unstack(level=0).stack(level=1)
# version 2: extended confusion matrix
conditional_probability_for_actual = confusion_matrix.div(confusion_matrix.loc[:, 'All'], axis=0).rename(columns=dict(map(lambda x: (x, str(x)+'|A'), confusion_matrix.columns)))
conditional_probability_for_prediction = confusion_matrix.div(confusion_matrix.loc['All', :], axis=1).rename(index=dict(map(lambda x: (x, str(x)+'|P'), confusion_matrix.columns)))
confusion_matrix = pd.concat([confusion_matrix, conditional_probability_for_actual], axis=1)
confusion_matrix = pd.concat([confusion_matrix, conditional_probability_for_prediction], axis=0)
display(confusion_matrix.iloc[:, :].style.background_gradient(cmap=sns.light_palette("green", as_cmap=True)))
result = dict(); average='micro' # 'binary', 'macro', 'micro', 'weighted', 'samples'
result['confusion_matrix'] = metrics.confusion_matrix(y_true, y_pred)
result['accuracy_score'] = metrics.accuracy_score(y_true, y_pred)
result['balanced_accuracy_score'] = metrics.balanced_accuracy_score(y_true, y_pred)
result['recall_score'] = metrics.recall_score(y_true, y_pred, average=average)
result['precision_score'] = metrics.precision_score(y_true, y_pred, average=average)
result['f1_score'] = metrics.f1_score(y_true, y_pred, average=average) # precision&recall trade-off
result['fbeta_score'] = metrics.fbeta_score(y_true, y_pred, beta=2, average=average)
result['jaccard_score'] = metrics.jaccard_score(y_true, y_pred, average=average)
result['hamming_loss'] = metrics.hamming_loss(y_true, y_pred)
result['matthews_corrcoef'] = metrics.matthews_corrcoef(y_true, y_pred)
result['multilabel_confusion_matrix'] = metrics.multilabel_confusion_matrix(y_true, y_pred)
result['zero_one_loss'] = metrics.zero_one_loss(y_true, y_pred)
return result
y_true = pd.Series(data=np.random.normal(0, 1, size=300).cumsum(), index=pd.date_range('00:00:00', periods=300, freq='D'))
y_pred = y_true + .5*np.random.normal(0, 1, size=300).cumsum() + .1*np.random.normal(0, 1, size=300)
df = pd.DataFrame(np.c_[y_true, y_pred], columns=['TRUE', 'PRED'])
tsa_evaluation(df['TRUE'], df['PRED'])
Task: Binary Classification
: precision&recall trade-off
Models
from sklearn.datasets import make_classification, make_regression
from sklearn import linear_model, ensemble, naive_bayes, tree, neighbors, discriminant_analysis, svm, neural_network
X, y = make_classification(n_samples=3000, n_features=10, n_classes=2, n_clusters_per_class=1, weights=[.6, .4], flip_y=0)
classifier = linear_model.LogisticRegression(penalty='l2', max_iter=500)
classifier = ensemble.AdaBoostClassifier()
classifier = ensemble.GradientBoostingClassifier()
classifier = ensemble.BaggingClassifier()
classifier = ensemble.ExtraTreesClassifier()
classifier = ensemble.RandomForestClassifier()
classifier = tree.DecisionTreeClassifier()
classifier = tree.ExtraTreeClassifier()
classifier = neighbors.KNeighborsClassifier()
classifier = neighbors.RadiusNeighborsClassifier()
classifier = naive_bayes.GaussianNB()
classifier = discriminant_analysis.LinearDiscriminantAnalysis()
classifier = discriminant_analysis.QuadraticDiscriminantAnalysis()
classifier = svm.SVC(kernel='poly', probability=True, max_iter=10000) # kernel: {'linear', 'poly', 'rbf', 'sigmoid', 'precomputed'}, default='rbf'
classifier = neural_network.MLPClassifier(hidden_layer_sizes=(100,), max_iter=100, activation='relu', solver='adam', learning_rate='adaptive')
classifier.fit(X, y)
classifier.predict(X)
classifier.predict_proba(X)
Metrics
from sklearn import metrics
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=300, n_features=8, n_informative=5, n_redundant=1, n_repeated=1, n_classes=2, n_clusters_per_class=1, weights=[1/10, 9/10])
classifier = LogisticRegression()
classifier.fit(X, y)
y_true = y
y_pred = classifier.predict(X)
## CLASSIFICATION
metrics.confusion_matrix(y_true, y_pred)
metrics.accuracy_score(y_true, y_pred)
metrics.balanced_accuracy_score(y_true, y_pred)
metrics.recall_score(y_true, y_pred, average=None)
metrics.precision_score(y_true, y_pred, average=None)
metrics.f1_score(y_true, y_pred, average=None)
metrics.fbeta_score(y_true, y_pred, beta=2, average=None)
metrics.jaccard_score(y_true, y_pred, average=None)
metrics.matthews_corrcoef(y_true, y_pred)
metrics.hamming_loss(y_true, y_pred)
metrics.multilabel_confusion_matrix(y_true, y_pred)
metrics.zero_one_loss(y_true, y_pred)
Pipelines
#
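A minimal pipeline sketch (preprocessing chained to a classifier); the Validation block below extends this with grid search and cross-validation.
from sklearn.datasets import make_classification
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
X, y = make_classification(n_samples=3000, n_features=10, n_classes=2, weights=[.6, .4], flip_y=0)
pipeline = make_pipeline(StandardScaler(), LogisticRegression(max_iter=500))
pipeline.fit(X, y)
pipeline.predict_proba(X)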
Validation
# https://scikit-learn.org/stable/modules/model_evaluation.html
import pandas as pd
from sklearn.datasets import make_classification
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.preprocessing import PowerTransformer, QuantileTransformer, StandardScaler, Normalizer, RobustScaler, MinMaxScaler, MaxAbsScaler
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.model_selection import cross_val_score, cross_validate, GridSearchCV, StratifiedKFold, RepeatedStratifiedKFold
from sklearn.metrics import roc_curve, precision_recall_curve, auc
def parameter_validation_scores(cv_results):
train_scores = pd.DataFrame(list(filter(lambda score: score[0].endswith('train_score') , cv_results.items())))
train_scores = train_scores[1].apply(lambda x: pd.Series(x)).T.rename(columns=train_scores[0].to_dict())
train_scores.columns = pd.MultiIndex.from_tuples(map(lambda column: ('train_score', column.replace('_train_score', '')), train_scores.columns))
test_scores = pd.DataFrame(list(filter(lambda score: score[0].endswith('test_score') , cv_results.items())))
test_scores = test_scores[1].apply(lambda x: pd.Series(x)).T.rename(columns=test_scores[0].to_dict())
test_scores.columns = pd.MultiIndex.from_tuples(map(lambda column: ('test_score', column.replace('_test_score', '')), test_scores.columns))
time_scores = pd.DataFrame(list(filter(lambda score: score[0].endswith('time') , cv_results.items())))
time_scores = time_scores[1].apply(lambda x: pd.Series(x)).T.rename(columns=time_scores[0].to_dict())
time_scores.columns = pd.MultiIndex.from_tuples(map(lambda column: ('time', column.replace('_time', '')), time_scores.columns))
scores = pd.concat([train_scores, test_scores, time_scores], axis=1)
scores.index = pd.MultiIndex.from_frame(pd.DataFrame(cv_results['params']))
return scores
def scenario_validation_scores(cv_results):
train_scores = pd.DataFrame(list(filter(lambda score: score[0].startswith('train') , cv_results.items())))
train_scores = train_scores[1].apply(lambda x: pd.Series(x)).T.rename(columns=train_scores[0].to_dict())
train_scores.columns = pd.MultiIndex.from_tuples(map(lambda column: ('train_score', column.replace('train_', '')), train_scores.columns))
test_scores = pd.DataFrame(list(filter(lambda score: score[0].startswith('test') , cv_results.items())))
test_scores = test_scores[1].apply(lambda x: pd.Series(x)).T.rename(columns=test_scores[0].to_dict())
test_scores.columns = pd.MultiIndex.from_tuples(map(lambda column: ('test_score', column.replace('test_', '')), test_scores.columns))
time_scores = pd.DataFrame(list(filter(lambda score: score[0].endswith('time') , cv_results.items())))
time_scores = time_scores[1].apply(lambda x: pd.Series(x)).T.rename(columns=time_scores[0].to_dict())
time_scores.columns = pd.MultiIndex.from_tuples(map(lambda column: ('time', column.replace('_time', '')), time_scores.columns))
scores = pd.concat([train_scores, test_scores, time_scores], axis=1)
    scores.index.name = 'split' # RepeatedStratifiedKFold: n_repeats * k_fold splits
return scores #.swaplevel(0,1,axis=1).sort_index(axis=1)
X, y = make_classification(n_samples=10000, n_features=10, n_classes=2, weights=[0.6, 0.4], flip_y=0)
cv = RepeatedStratifiedKFold(n_splits=5, n_repeats=3, random_state=None) # StratifiedKFold(n_splits=5, shuffle=False, random_state=None) # cross validation & randomness control
# version1: multiple parameters
classifier = make_pipeline(PowerTransformer(method='yeo-johnson', standardize=True), Normalizer(), LinearDiscriminantAnalysis())
classifier = GridSearchCV(estimator=classifier, param_grid={'lineardiscriminantanalysis__priors':[(.1, .9), (.5, .5), None]}, cv=cv, scoring=['accuracy', 'balanced_accuracy', 'recall', 'average_precision', 'precision', 'f1', 'jaccard', 'roc_auc', 'roc_auc_ovr', 'roc_auc_ovo'][-3], return_train_score=True)
classifier.fit(X, y)
parameter_validation_scores(classifier.cv_results_)
# version2: multiple scoring
# v.s. cross_val_score(classifier, X, y, scoring='f1', cv=cv, fit_params=None, n_jobs=-1, verbose=0)
classifier = Pipeline([('powertransformer', PowerTransformer(method='yeo-johnson', standardize=True)), ('normalizer', Normalizer()), ('lineardiscriminantanalysis', LinearDiscriminantAnalysis())])
cv_results = cross_validate(classifier, X, y, scoring=['accuracy', 'balanced_accuracy', 'recall', 'average_precision', 'precision', 'f1', 'jaccard', 'roc_auc', 'roc_auc_ovr', 'roc_auc_ovo'], cv=cv, return_train_score=True, fit_params=None, return_estimator=False, n_jobs=-1, verbose=0)
scenario_validation_scores(cv_results)
# additional: trade-off metrics
y_true = y.copy()
y_prob = classifier.fit(X,y).predict_proba(X).copy()
fpr, tpr1, thresholds1 = roc_curve(y_true, y_prob[:,1])
ppv, tpr2, thresholds2 = precision_recall_curve(y_true, y_prob[:,1])
roc_auc = auc(fpr, tpr1) # X-axis(fpr): fall-out / y-axis(tpr): recall
pr_auc = auc(tpr2, ppv) # X-axis(tpr): recall / y-axis(ppv): precision
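The same curves can drive threshold selection; a sketch picking the probability cut-off that maximizes F1 along the PR curve.
import numpy as np
f1_scores = 2 * ppv[:-1] * tpr2[:-1] / (ppv[:-1] + tpr2[:-1] + 1e-12)  # F1 at each candidate threshold
best_threshold = thresholds2[np.argmax(f1_scores)]
y_pred_tuned = (y_prob[:, 1] >= best_threshold).astype(int)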
Evaluation
from datetime import datetime
from functools import wraps
import joblib
import os
from tqdm import tqdm
from copy import deepcopy
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.compose import ColumnTransformer, make_column_transformer
from sklearn.pipeline import FeatureUnion, Pipeline, make_pipeline, make_union
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import FunctionTransformer, Binarizer, KBinsDiscretizer, OrdinalEncoder, OneHotEncoder, PolynomialFeatures, SplineTransformer, LabelBinarizer, LabelEncoder
from sklearn.preprocessing import PowerTransformer, QuantileTransformer, StandardScaler, Normalizer, RobustScaler, MinMaxScaler, MaxAbsScaler
from sklearn import decomposition
from sklearn import linear_model, naive_bayes, discriminant_analysis, svm, neighbors, tree, ensemble
from sklearn.feature_selection import chi2, f_classif, f_regression, r_regression, mutual_info_classif, mutual_info_regression
from sklearn.feature_selection import SelectKBest, SelectPercentile, SelectFromModel, RFE, RFECV, SequentialFeatureSelector
from sklearn.model_selection import train_test_split, cross_validate, GridSearchCV, StratifiedKFold, RepeatedStratifiedKFold
from sklearn import metrics
class ResultVisualizer:
@staticmethod
def training_scenario(cv_results):
cv_results = pd.DataFrame(cv_results)
cv_1st_normalization = list()
for name in cv_results.keys():
cv_result = cv_results[name].apply(lambda x: pd.Series(x)).stack(0).to_frame().reset_index().rename(columns={0:'score', 'level_0':'score_name', 'level_1':'split'})
cv_result['scenario'] = name
cv_1st_normalization.append(cv_result)
cv_result = pd.concat(cv_1st_normalization, axis=0).reset_index(drop=True)
cv_result['domain'] = cv_result['score_name'].apply(lambda x: 'target' if x.startswith('train') or x.startswith('test') else 'nontarget')
cv_result = cv_result.loc[lambda x: x['domain'] == 'target'].copy().reset_index(drop=True)
cv_result['domain'] = cv_result['score_name'].apply(lambda x: 'train' if x.startswith('train') else 'validation')
cv_result['scoring_type'] = cv_result['score_name'].apply(lambda x: '_'.join(x.split('_')[1:]))
sns.set_theme(style="ticks") # white, dark, whitegrid, darkgrid, ticks
g = sns.FacetGrid(cv_result, col="scoring_type", row="domain", aspect=2, height=3)
g.map_dataframe(sns.boxplot, x="scenario", y="score", hue='split') # x: numerical variable, y: numerical variable, hue: categorical variable
g.fig.suptitle('Scenario Evaluation')
g.add_legend()
g.tight_layout()
return g
@staticmethod
def tradeoff_curve(curve_points):
roc_curve_points_by_scenario = list()
pr_curve_points_by_scenario = list()
for scenario, (y_true, y_prob) in curve_points.items():
roc_curve_points = metrics.roc_curve(y_true, y_prob[:, 1]) # ['FPR', 'TPR', 'Threshold']
roc_curve_points = pd.DataFrame(roc_curve_points).T
roc_curve_points.columns = ['FPR', 'TPR', 'Threshold']
roc_curve_points.insert(0, 'Scenario', scenario)
roc_curve_points_by_scenario.append(roc_curve_points)
pr_curve_points = metrics.precision_recall_curve(y_true, y_prob[:, 1]) # ['PPV', 'TPR', 'Threshold']
pr_curve_points = pd.DataFrame(pr_curve_points).T
pr_curve_points.columns = ['PPV', 'TPR', 'Threshold']
pr_curve_points.insert(0, 'Scenario', scenario)
pr_curve_points_by_scenario.append(pr_curve_points)
roc_curve_points_by_scenario = pd.concat(roc_curve_points_by_scenario, axis=0)
pr_curve_points_by_scenario = pd.concat(pr_curve_points_by_scenario, axis=0)
plt.figure(figsize=(30,5))
ax0 = plt.subplot2grid((1,2), (0,0))
ax0.spines['bottom'].set_visible(False)
ax0.spines['right'].set_visible(False)
ax0.tick_params(top=True, bottom=False, left=True, right=False, labeltop=True, labelbottom=False, labelleft=True, labelright=False)
ax1 = plt.subplot2grid((1,2), (0,1))
ax1.spines['bottom'].set_visible(False)
ax1.spines['left'].set_visible(False)
ax1.tick_params(top=True, bottom=False, left=False, right=True, labeltop=True, labelbottom=False, labelleft=False, labelright=True)
sns.scatterplot(data=roc_curve_points_by_scenario, x='FPR', y='TPR', hue='Scenario', ax=ax0)
sns.scatterplot(data=pr_curve_points_by_scenario, x='TPR', y='PPV', hue='Scenario', ax=ax1)
plt.tight_layout()
def evaluation(*args, **kwargs):
def decorator(func):
@wraps(func)
def wrapper(model, X, y, model_name=None, model_info=None, domain_kind=None, scenario=None, note=None, reposit=False, verbose=False):
model_repository = 'model_repository'
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
if reposit:
# model_repository
saving_name = model_name + current_time.replace('-', '').replace(' ', '').replace(':', '')
os.mkdir(model_repository) if not model_repository in os.listdir() else None
joblib.dump(model, f'./{model_repository}/pipeline_{saving_name}.joblib')
pipeline = joblib.load(f'./{model_repository}/pipeline_{saving_name}.joblib')
y, y_pred, y_prob = func(model, X, y)
confusion_matrix = pd.crosstab(index=y, columns=y_pred)
if confusion_matrix.columns.size != 2:
confusion_matrix[pd.Index([0,1]).difference(confusion_matrix.columns).item()] = 0
elif confusion_matrix.index.size != 2:
confusion_matrix = confusion_matrix.T
confusion_matrix[pd.Index([0,1]).difference(confusion_matrix.columns).item()] = 0
confusion_matrix = confusion_matrix.T
TP = confusion_matrix.iloc[1, 1]
TN = confusion_matrix.iloc[0, 0]
FP = confusion_matrix.iloc[0, 1]
FN = confusion_matrix.iloc[1, 0]
summary = dict()
summary['datetime'] = [current_time]
summary['model'] = [model_name]
summary['information'] = [model_info]
summary['scenario'] = [scenario]
summary['domain_kind'] = [domain_kind]
summary['total'] = [y.shape[0]]
summary['true_positive'] = [TP]
summary['true_negative'] = [TN]
summary['false_positive'] = [FP]
summary['false_negative'] = [FN]
summary['accuracy'] = [metrics.accuracy_score(y, y_pred)]
summary['precision'] = [metrics.precision_score(y, y_pred)]
summary['recall'] = [metrics.recall_score(y, y_pred)]
summary['f1'] = [metrics.f1_score(y, y_pred)]
summary['note'] = [note]
evaluation = pd.concat([pd.read_csv(f'./{model_repository}/.evaluation.csv'), pd.DataFrame(summary)], axis=0).reset_index(drop=True) if '.evaluation.csv' in os.listdir(model_repository) else pd.DataFrame(summary)
evaluation.to_csv(f'./{model_repository}/.evaluation.csv', index=False)
if verbose:
display(evaluation)
return y_pred, y_prob, evaluation.copy()
return wrapper
return decorator
@evaluation(description="my_description")
def prediction_with(model, X, y, model_name=None, model_info=None, domain_kind=None, scenario=None, note=None, reposit=False, verbose=False):
y_pred = model.predict(X)
y_prob = model.predict_proba(X)
return y, y_pred, y_prob
class CategoricalColumnTransformer(BaseEstimator, TransformerMixin):
def __init__(self):
self.base = make_pipeline(
SelectPercentile(mutual_info_classif, percentile=30),
OneHotEncoder(handle_unknown='ignore', sparse_output=False),
).fit(X_train[categorical_columns], y_train)
def fit(self, X, y=None):
return self
def transform(self, X):
X = self.base.transform(X)
return X
class NumericalColumnTransformer(BaseEstimator, TransformerMixin):
def __init__(self):
self.base = make_pipeline(
SelectPercentile(f_classif, percentile=30),
SplineTransformer(degree=3, n_knots=5, knots=['uniform', 'quantile'][0], extrapolation=['error', 'constant', 'linear', 'continue', 'periodic'][1], include_bias=True, order=['C', 'F'][0]),
).fit(X_train[numerical_columns], y_train)
def fit(self, X, y=None):
return self
def transform(self, X):
X = self.base.transform(X)
return X
# data preprocessing
categorical_columns = ['X0', 'X1', 'X2', 'X3', 'X4']
numerical_columns = ['X5', 'X6', 'X7', 'X8', 'X9']
X, y = make_classification(n_samples=1000, n_features=10, n_classes=2, weights=[0.6, 0.4], flip_y=0, random_state=0)
data = pd.DataFrame(np.c_[X, y], columns=categorical_columns + numerical_columns + ['y'])
data.loc[:, categorical_columns] = KBinsDiscretizer(n_bins=10, encode=['ordinal', 'onehot-dense', 'onehot', ][0], strategy=['uniform', 'quantile', 'kmeans'][0]).fit_transform(data[categorical_columns])
X = data.loc[:, data.columns != 'y']
y = data.loc[:, data.columns == 'y']['y']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=.2, shuffle=True, stratify=None, random_state=0)
# modeling through pipelining
notification = """NOTE"""
def identity(X): return X
cct = CategoricalColumnTransformer()
nct = NumericalColumnTransformer()
pipelines = dict()
pipelines['scenario_1'] = make_pipeline(make_column_transformer((cct, categorical_columns), (nct, numerical_columns)), linear_model.LogisticRegression())
pipelines['scenario_2'] = make_pipeline(make_column_transformer((cct, categorical_columns), (KBinsDiscretizer(n_bins=10, encode=['ordinal', 'onehot-dense', 'onehot', ][2], strategy=['uniform', 'quantile', 'kmeans'][0]), numerical_columns)), naive_bayes.MultinomialNB())
pipelines['scenario_3'] = make_pipeline(make_column_transformer((FunctionTransformer(identity), categorical_columns), (nct, numerical_columns)), discriminant_analysis.LinearDiscriminantAnalysis())
pipelines['scenario_4'] = make_pipeline(make_column_transformer((cct, categorical_columns), (nct, numerical_columns)), svm.SVC())
pipelines['scenario_5'] = make_pipeline(make_column_transformer((cct, categorical_columns), (nct, numerical_columns)), neighbors.KNeighborsClassifier())
pipelines['scenario_6'] = make_pipeline(make_column_transformer((cct, categorical_columns), (nct, numerical_columns)), tree.ExtraTreeClassifier())
pipelines['scenario_7'] = make_pipeline(make_column_transformer((cct, categorical_columns), (nct, numerical_columns)), ensemble.GradientBoostingClassifier())
# syntax: <estimator>__<parameter>
param_grids = dict()
param_grids['logisticregression'] = {
'logisticregression__fit_intercept' : [False],
'logisticregression__solver' : ['newton-cholesky'],
'logisticregression__C':[.001, .005, .007,]
}
param_grids['multinomialnb'] = {
'multinomialnb__class_prior':[(.1, .9), (.2, .8), (.3, .7)],
}
param_grids['lineardiscriminantanalysis'] = {
'lineardiscriminantanalysis__priors':[(.1, .9), (.2, .8), (.3, .7)],
}
param_grids['svc'] = {
'svc__probability':[True],
'svc__C' : [.01, .05, .07],
'svc__kernel' : ['rbf'],
}
param_grids['kneighborsclassifier'] = {
'kneighborsclassifier__n_neighbors': [20, 30],
'kneighborsclassifier__metric': ['minkowski'],
'kneighborsclassifier__weights': ['uniform'],
'kneighborsclassifier__algorithm': ['auto'],
'kneighborsclassifier__p' : [1],
'kneighborsclassifier__leaf_size' : [30, 50, 100],
}
param_grids['extratreeclassifier'] = {
'extratreeclassifier__max_depth':[10, 20, 30],
'extratreeclassifier__min_impurity_decrease':[.01, .05, .1],
}
param_grids['gradientboostingclassifier'] = {
'gradientboostingclassifier__n_estimators':[10, 30, 50],
'gradientboostingclassifier__subsample':[.7,.8, 1],
'gradientboostingclassifier__min_samples_split':[2],
'gradientboostingclassifier__min_impurity_decrease':[.01, .05, .1],
}
# https://scikit-learn.org/stable/modules/model_evaluation.html
#binary_scoring = ['accuracy', 'balanced_accuracy', 'recall', 'average_precision', 'precision', 'f1', 'jaccard', 'roc_auc', 'roc_auc_ovr', 'roc_auc_ovo']
cv_results = dict(); curve_points = dict()
cross_validation = RepeatedStratifiedKFold(n_splits=5, n_repeats=3,random_state=0) # StratifiedKFold(n_splits=5, shuffle=False, random_state=None)
for scenario, pipeline in tqdm(deepcopy(pipelines).items()):
model_name = list(pipeline.named_steps.keys())[-1]
param_grid = param_grids[model_name]
# [STEP1]: validation for multiple parameter
pipeline = GridSearchCV(estimator=pipeline, param_grid=param_grid, cv=cross_validation, scoring='accuracy', return_train_score=True)
pipeline.fit(X_train, y_train)
pipelines.update({scenario:pipeline})
#pipeline.cv_results_
#pipeline.best_params_
#pipeline.best_estimator_
#pipeline.best_score_
# [STEP2]: prediction & evaluation
scenario_description = '/'.join(list(map(lambda step: step[0], pipeline.get_params()['estimator__steps'])))
best_param = '/'.join(list(map(lambda param: '_'.join(param[0].split('__')[1:]) + ': '+str(param[1]), pipeline.best_params_.items())))
y_train_pred, y_train_prob, evaluation = prediction_with(pipeline, X_train, y_train, model_name=model_name, model_info=best_param, domain_kind='train', scenario=scenario_description, reposit=True, note=notification)
y_test_pred, y_test_prob, evaluation = prediction_with(pipeline, X_test, y_test, model_name=model_name, model_info=best_param, domain_kind='test', scenario=scenario_description, note=notification)
# [STEP3]: scenario visualization data
cv_results[scenario] = cross_validate(pipeline.best_estimator_, X_train, y_train, scoring=['accuracy', 'recall', 'precision', 'f1'], cv=cross_validation, return_train_score=True, fit_params=None, return_estimator=False, n_jobs=-1, verbose=0) # cross-validation multiple scoring
curve_points[scenario] = (y_test, y_test_prob)
ResultVisualizer.training_scenario(cv_results)
ResultVisualizer.tradeoff_curve(curve_points)
#y_pred, y_prob, evaluation = prediction_with(pipelines['scenario_1'], X_test[:10], y_test[:10], note='virtual_test', reposit=False, verbose=True)
evaluation.tail(int(len(pipelines)*2))
Binary Confusion Matrix
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn import metrics
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import make_classification
X, y = make_classification(
n_samples=1000, n_features=3, n_informative=2, n_redundant=1, n_repeated=0, n_classes=2, n_clusters_per_class=1,
weights=[.6, .4], flip_y=0, class_sep=1.0, hypercube=True, shift=0.0, scale=1.0, shuffle=True, random_state=0)
classifier = LogisticRegression(penalty='l1', solver='liblinear')
classifier.fit(X, y)
y_true = y
y_pred = classifier.predict(X)
df = pd.DataFrame(np.c_[y_true, y_pred], columns=['ACTUAL', 'PRED'])
# version 1: base confusion matrix
confusion_matrix = pd.crosstab(index=[df['ACTUAL']], columns=[df['PRED']], margins=True, margins_name='All', dropna=False, normalize=False) # .unstack(level=0).stack(level=1)
# version 2: extended confusion matrix
conditional_probability_for_actual = confusion_matrix.div(confusion_matrix.loc[:, 'All'], axis=0).rename(columns=dict(map(lambda x: (x, str(x)+'|A'), confusion_matrix.columns)))
conditional_probability_for_prediction = confusion_matrix.div(confusion_matrix.loc['All', :], axis=1).rename(index=dict(map(lambda x: (x, str(x)+'|P'), confusion_matrix.columns)))
confusion_matrix = pd.concat([confusion_matrix, conditional_probability_for_actual], axis=1)
confusion_matrix = pd.concat([confusion_matrix, conditional_probability_for_prediction], axis=0)
confusion_matrix.iloc[:, :].style.background_gradient(cmap=sns.light_palette("green", as_cmap=True))
ROC/PR Curve for Binary Classification
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.metrics import roc_curve, precision_recall_curve, auc
X, y = make_classification(n_samples=100, n_features=5, n_informative=2, n_redundant=1, n_repeated=1, n_classes=2, n_clusters_per_class=1)
classifier = LogisticRegression()
classifier.fit(X, y)
y_true = y
y_prob = classifier.predict_proba(X)
y_pred = classifier.predict(X)
confusion_matrix = confusion_matrix(y_true, y_pred)
recall = confusion_matrix[1, 1]/(confusion_matrix[1, 0]+confusion_matrix[1, 1])     # TPR = TP/(FN+TP)
fallout = confusion_matrix[0, 1]/(confusion_matrix[0, 0]+confusion_matrix[0, 1])    # FPR = FP/(TN+FP)
precision = confusion_matrix[1, 1]/(confusion_matrix[0, 1]+confusion_matrix[1, 1])  # PPV = TP/(FP+TP)
fpr, tpr1, thresholds1 = roc_curve(y_true, y_prob[:,1])
ppv, tpr2, thresholds2 = precision_recall_curve(y_true, y_prob[:,1])
# visualization
print('- ROC AUC:', auc(fpr, tpr1))
print('- PR AUC:', auc(tpr2, ppv))
print(classification_report(y_true, y_pred, target_names=['down', 'up']))
plt.figure(figsize=(25, 7))
ax0 = plt.subplot2grid((1,2), (0,0))
ax1 = plt.subplot2grid((1,2), (0,1))
ax0.plot(fpr, tpr1, 'o-') # X-axis(fpr): fall-out / y-axis(tpr): recall
ax0.plot([fallout], [recall], 'bo', ms=10)
ax0.plot([0, 1], [0, 1], 'k--')
ax0.set_xlabel('Fall-Out')
ax0.set_ylabel('Recall')
ax1.plot(tpr2, ppv, 'o-') # X-axis(tpr): recall / y-axis(ppv): precision
ax1.plot([recall], [precision], 'bo', ms=10)
ax1.plot([0, 1], [1, 0], 'k--')
ax1.set_xlabel('Recall')
ax1.set_ylabel('Precision')
plt.show()
Task: Multi-Class Classification
: precision&recall trade-off
Models
from sklearn.datasets import make_classification, make_regression
from sklearn import linear_model, ensemble, naive_bayes, tree, neighbors, discriminant_analysis, svm, neural_network
X, y = make_classification(n_samples=3000, n_features=10, n_classes=3, n_clusters_per_class=1, weights=[.6, .3, .1], flip_y=0)
classifier = linear_model.LogisticRegression(penalty='l2', max_iter=500)
classifier = ensemble.AdaBoostClassifier()
classifier = ensemble.GradientBoostingClassifier()
classifier = ensemble.BaggingClassifier()
classifier = ensemble.ExtraTreesClassifier()
classifier = ensemble.RandomForestClassifier()
classifier = tree.DecisionTreeClassifier()
classifier = tree.ExtraTreeClassifier()
classifier = neighbors.KNeighborsClassifier()
classifier = neighbors.RadiusNeighborsClassifier()
classifier = naive_bayes.GaussianNB()
classifier = discriminant_analysis.LinearDiscriminantAnalysis()
classifier = discriminant_analysis.QuadraticDiscriminantAnalysis()
classifier = svm.SVC(kernel='poly', probability=True, max_iter=10000) # kernel: {'linear', 'poly', 'rbf', 'sigmoid', 'precomputed'}, default='rbf'
classifier = neural_network.MLPClassifier(hidden_layer_sizes=(100,), max_iter=100, activation='relu', solver='adam', learning_rate='adaptive')
classifier.fit(X, y)
classifier.predict(X)
classifier.predict_proba(X)
Metrics
from sklearn import metrics
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=300, n_features=8, n_informative=5, n_redundant=1, n_repeated=1, n_classes=5, n_clusters_per_class=1, weights=[1/10, 3/10, 2/10, 1/10, 3/10])
classifier = LogisticRegression()
classifier.fit(X, y)
y_true = y
y_pred = classifier.predict(X)
## CLASSIFICATION
metrics.confusion_matrix(y_true, y_pred)
metrics.accuracy_score(y_true, y_pred)
metrics.balanced_accuracy_score(y_true, y_pred)
metrics.recall_score(y_true, y_pred, average='micro')
metrics.precision_score(y_true, y_pred, average='micro')
metrics.f1_score(y_true, y_pred, average='micro')
metrics.fbeta_score(y_true, y_pred, beta=2, average='micro')
metrics.jaccard_score(y_true, y_pred, average='micro')
metrics.hamming_loss(y_true, y_pred)
metrics.matthews_corrcoef(y_true, y_pred)
metrics.multilabel_confusion_matrix(y_true, y_pred)
metrics.zero_one_loss(y_true, y_pred)
Pipelines
#
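A minimal pipeline sketch mirroring the binary case, with a three-class target.
from sklearn.datasets import make_classification
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
X, y = make_classification(n_samples=3000, n_features=10, n_classes=3, n_clusters_per_class=1, weights=[.6, .3, .1], flip_y=0)
pipeline = make_pipeline(StandardScaler(), LogisticRegression(max_iter=500))
pipeline.fit(X, y)
pipeline.predict_proba(X)  # one probability column per class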
Validation
# https://scikit-learn.org/stable/modules/model_evaluation.html
import pandas as pd
from sklearn.datasets import make_classification
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.preprocessing import PowerTransformer, QuantileTransformer, StandardScaler, Normalizer, RobustScaler, MinMaxScaler, MaxAbsScaler
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.model_selection import cross_val_score, cross_validate, GridSearchCV, StratifiedKFold, RepeatedStratifiedKFold
def parameter_validation_scores(cv_results):
    train_scores = pd.DataFrame(list(filter(lambda score: score[0].endswith('train_score'), cv_results.items())))
    train_scores = train_scores[1].apply(lambda x: pd.Series(x)).T.rename(columns=train_scores[0].to_dict())
    train_scores.columns = pd.MultiIndex.from_tuples(map(lambda column: ('train_score', column.replace('_train_score', '')), train_scores.columns))
    test_scores = pd.DataFrame(list(filter(lambda score: score[0].endswith('test_score'), cv_results.items())))
    test_scores = test_scores[1].apply(lambda x: pd.Series(x)).T.rename(columns=test_scores[0].to_dict())
    test_scores.columns = pd.MultiIndex.from_tuples(map(lambda column: ('test_score', column.replace('_test_score', '')), test_scores.columns))
    time_scores = pd.DataFrame(list(filter(lambda score: score[0].endswith('time'), cv_results.items())))
    time_scores = time_scores[1].apply(lambda x: pd.Series(x)).T.rename(columns=time_scores[0].to_dict())
    time_scores.columns = pd.MultiIndex.from_tuples(map(lambda column: ('time', column.replace('_time', '')), time_scores.columns))
    scores = pd.concat([train_scores, test_scores, time_scores], axis=1)
    scores.index = pd.MultiIndex.from_frame(pd.DataFrame(cv_results['params']))
    return scores
def scenario_validation_scores(cv_results):
    train_scores = pd.DataFrame(list(filter(lambda score: score[0].startswith('train'), cv_results.items())))
    train_scores = train_scores[1].apply(lambda x: pd.Series(x)).T.rename(columns=train_scores[0].to_dict())
    train_scores.columns = pd.MultiIndex.from_tuples(map(lambda column: ('train_score', column.replace('train_', '')), train_scores.columns))
    test_scores = pd.DataFrame(list(filter(lambda score: score[0].startswith('test'), cv_results.items())))
    test_scores = test_scores[1].apply(lambda x: pd.Series(x)).T.rename(columns=test_scores[0].to_dict())
    test_scores.columns = pd.MultiIndex.from_tuples(map(lambda column: ('test_score', column.replace('test_', '')), test_scores.columns))
    time_scores = pd.DataFrame(list(filter(lambda score: score[0].endswith('time'), cv_results.items())))
    time_scores = time_scores[1].apply(lambda x: pd.Series(x)).T.rename(columns=time_scores[0].to_dict())
    time_scores.columns = pd.MultiIndex.from_tuples(map(lambda column: ('time', column.replace('_time', '')), time_scores.columns))
    scores = pd.concat([train_scores, test_scores, time_scores], axis=1)
    scores.index.name = 'split' # Repeated Stratified KFold: n_repeats * n_splits
    return scores #.swaplevel(0,1,axis=1).sort_index(axis=1)
X, y = make_classification(n_samples=10000, n_features=5, n_classes=3, n_clusters_per_class=1, weights=[.6, .3, .1], flip_y=0, n_informative=4, n_redundant=0, n_repeated=0)
cv = RepeatedStratifiedKFold(n_splits=5, n_repeats=3, random_state=None) # StratifiedKFold(n_splits=5, shuffle=False, random_state=None) # cross validation & randomness control
# version1: multiple parameters
classifier = make_pipeline(PowerTransformer(method='yeo-johnson', standardize=True), Normalizer(), LinearDiscriminantAnalysis())
classifier = GridSearchCV(estimator=classifier, param_grid={'lineardiscriminantanalysis__priors':[(.1, .1, .8), (.4, .3, .3), None]}, cv=cv, scoring=['accuracy', 'balanced_accuracy', 'recall_micro', 'recall_macro', 'recall_weighted', 'precision_micro', 'precision_macro', 'precision_weighted', 'f1_micro', 'f1_macro', 'f1_weighted', 'jaccard_micro', 'jaccard_macro', 'jaccard_weighted', 'roc_auc_ovr_weighted', 'roc_auc_ovo_weighted'][1], return_train_score=True)
classifier.fit(X, y)
parameter_validation_scores(classifier.cv_results_)
# version2: multiple scoring
# v.s. cross_val_score(classifier, X, y, scoring='accuracy', cv=cv, fit_params=None, n_jobs=-1, verbose=0)
classifier = Pipeline([('powertransformer', PowerTransformer(method='yeo-johnson', standardize=True)), ('normalizer', Normalizer()), ('lineardiscriminantanalysis', LinearDiscriminantAnalysis())])
cv_results = cross_validate(classifier, X, y, scoring=['accuracy', 'balanced_accuracy', 'recall_micro', 'recall_macro', 'recall_weighted', 'precision_micro', 'precision_macro', 'precision_weighted', 'f1_micro', 'f1_macro', 'f1_weighted', 'jaccard_micro', 'jaccard_macro', 'jaccard_weighted', 'roc_auc_ovr_weighted', 'roc_auc_ovo_weighted'], cv=cv, return_train_score=True, fit_params=None, return_estimator=False, n_jobs=-1, verbose=0)
scenario_validation_scores(cv_results)
Evaluation
#
Multi-Class Confusion Matrix
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn import metrics
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import make_classification
X, y = make_classification(
    n_samples=3000, n_features=7, n_informative=4, n_redundant=1, n_repeated=0, n_classes=5, n_clusters_per_class=1,
    weights=None, flip_y=0, class_sep=1.0, hypercube=True, shift=0.0, scale=1.0, shuffle=True, random_state=0)
classifier = LogisticRegression(penalty='l1', solver='liblinear')
classifier.fit(X, y)
y_true = y
y_pred = classifier.predict(X)
df = pd.DataFrame(np.c_[y_true, y_pred], columns=['ACTUAL', 'PRED'])
# version 1: base confusion matrix
confusion_matrix = pd.crosstab(index=[df['ACTUAL']], columns=[df['PRED']], margins=True, margins_name='All', dropna=False, normalize=False) # .unstack(level=0).stack(level=1)
# version 2: extended confusion matrix
conditional_probability_for_actual = confusion_matrix.div(confusion_matrix.loc[:, 'All'], axis=0).rename(columns=dict(map(lambda x: (x, str(x)+'|A'), confusion_matrix.columns)))
conditional_probability_for_prediction = confusion_matrix.div(confusion_matrix.loc['All', :], axis=1).rename(index=dict(map(lambda x: (x, str(x)+'|P'), confusion_matrix.columns)))
confusion_matrix = pd.concat([confusion_matrix, conditional_probability_for_actual], axis=1)
confusion_matrix = pd.concat([confusion_matrix, conditional_probability_for_prediction], axis=0)
confusion_matrix.iloc[:, :].style.background_gradient(cmap=sns.light_palette("green", as_cmap=True))
ROC/PR Curves for Multi-Class Classification
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.metrics import roc_curve, precision_recall_curve, auc
def multiclass_roc_curve(y_true, y_prob):
    import numpy as np
    from sklearn.metrics import roc_curve, auc
    from sklearn.preprocessing import label_binarize
    y_enco = label_binarize(y_true, classes=np.sort(np.unique(y_true)).tolist())
    fpr = dict()
    tpr = dict()
    thr = dict()
    auc_ = dict()
    for class_idx in range(np.unique(y_true).shape[0]):
        fpr[class_idx], tpr[class_idx], thr[class_idx] = roc_curve(y_enco[:, class_idx], y_prob[:, class_idx])
        auc_[class_idx] = auc(fpr[class_idx], tpr[class_idx])
    return fpr, tpr, thr, auc_
def multiclass_pr_curve(y_true, y_prob):
    import numpy as np
    from sklearn.metrics import precision_recall_curve, auc
    from sklearn.preprocessing import label_binarize
    y_enco = label_binarize(y_true, classes=np.sort(np.unique(y_true)).tolist())
    ppv = dict()
    tpr = dict()
    thr = dict()
    auc_ = dict()
    for class_idx in range(np.unique(y_true).shape[0]):
        ppv[class_idx], tpr[class_idx], thr[class_idx] = precision_recall_curve(y_enco[:, class_idx], y_prob[:, class_idx])
        auc_[class_idx] = auc(tpr[class_idx], ppv[class_idx])
    return ppv, tpr, thr, auc_
X, y = make_classification(n_samples=300, n_features=8, n_informative=5, n_redundant=1, n_repeated=1, n_classes=5, n_clusters_per_class=1, weights=[1/10, 3/10, 2/10, 1/10, 3/10])
classifier = LogisticRegression()
classifier.fit(X, y)
y_true = y
y_pred = classifier.predict(X)
y_prob = classifier.predict_proba(X)
fpr, tpr1, thr1, auc1 = multiclass_roc_curve(y_true, y_prob)
ppv, tpr2, thr2, auc2 = multiclass_pr_curve(y_true, y_prob)
# visualization
print(classification_report(y_true, y_pred, target_names=['A', 'B', 'C', 'D', 'E']))
plt.figure(figsize=(25, 7))
ax0 = plt.subplot2grid((1,2), (0,0))
ax1 = plt.subplot2grid((1,2), (0,1))
for class_idx in range(np.unique(y_true).shape[0]):
    ax0.plot(fpr[class_idx], tpr1[class_idx], 'o-', ms=5, label=str(class_idx) + f' | {round(auc1[class_idx], 2)}')
    ax1.plot(tpr2[class_idx], ppv[class_idx], 'o-', ms=5, label=str(class_idx) + f' | {round(auc2[class_idx], 2)}')
ax0.plot([0, 1], [0, 1], 'k--')
ax0.set_xlabel('Fall-Out')
ax0.set_ylabel('Recall')
ax0.legend()
ax1.plot([0, 1], [1, 0], 'k--')
ax1.set_xlabel('Recall')
ax1.set_ylabel('Precision')
ax1.legend()
plt.show()
Imbalanced-Class Evaluation Metrics
#
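This subsection is a stub in the original; as a sketch, these sklearn metrics stay informative when classes are imbalanced (all calls are from the metrics module used throughout this post):
from sklearn import metrics
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
X, y = make_classification(n_samples=3000, n_features=10, n_classes=2, weights=[.9, .1], flip_y=0)
classifier = LogisticRegression(max_iter=500).fit(X, y)
y_true, y_pred = y, classifier.predict(X)
metrics.balanced_accuracy_score(y_true, y_pred)    # recall averaged over classes, insensitive to class sizes
metrics.matthews_corrcoef(y_true, y_pred)          # balanced even when classes differ greatly in size
metrics.f1_score(y_true, y_pred, average='macro')  # unweighted mean of per-class F1
metrics.cohen_kappa_score(y_true, y_pred)          # agreement corrected for chance
print(metrics.classification_report(y_true, y_pred))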
Task: Multi-Label Classification
: precision & recall trade-off
Models
#
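This subsection is a stub in the original; a minimal multi-label sketch (MultiOutputClassifier wrapping one binary estimator per label is one standard sklearn approach, assumed here):
from sklearn.datasets import make_multilabel_classification
from sklearn.linear_model import LogisticRegression
from sklearn.multioutput import MultiOutputClassifier
X, Y = make_multilabel_classification(n_samples=3000, n_features=10, n_classes=4, n_labels=2)
classifier = MultiOutputClassifier(LogisticRegression(max_iter=500))  # one binary classifier per label column
classifier.fit(X, Y)
classifier.predict(X)  # returns an indicator matrix, one column per label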
Metrics
#
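Also a stub in the original; a sketch of multi-label metrics on label-indicator matrices (the averaging choices below are illustrative):
from sklearn import metrics
from sklearn.datasets import make_multilabel_classification
from sklearn.linear_model import LogisticRegression
from sklearn.multioutput import MultiOutputClassifier
X, Y = make_multilabel_classification(n_samples=3000, n_features=10, n_classes=4, n_labels=2)
Y_pred = MultiOutputClassifier(LogisticRegression(max_iter=500)).fit(X, Y).predict(X)
metrics.hamming_loss(Y, Y_pred)                      # fraction of wrong labels over all (sample, label) pairs
metrics.jaccard_score(Y, Y_pred, average='samples')  # per-sample intersection-over-union, averaged
metrics.f1_score(Y, Y_pred, average='samples')       # per-sample F1, averaged
metrics.accuracy_score(Y, Y_pred)                    # exact-match ratio (all labels must agree)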
Pipelines
#
Validation
#
Evaluation
#
Task: Regression
: bias & variance trade-off
Models
import numpy as np
from sklearn.datasets import make_classification, make_regression
from sklearn import linear_model, ensemble, neighbors, tree, svm, neural_network
X, y = make_regression(n_samples=3000, n_features=10, n_informative=5, n_targets=1, bias=0.0, effective_rank=None, tail_strength=0.5, noise=0.0, shuffle=True, coef=False, random_state=None)
regressor = linear_model.LinearRegression()
regressor = linear_model.Ridge(alpha=.5)
regressor = linear_model.Lasso(alpha=0.1)
regressor = linear_model.LassoLars(alpha=.1, normalize=False)
regressor = linear_model.ElasticNet()
regressor = linear_model.BayesianRidge()
regressor = ensemble.AdaBoostRegressor()
regressor = ensemble.GradientBoostingRegressor()
regressor = ensemble.BaggingRegressor()
regressor = ensemble.ExtraTreesRegressor()
regressor = ensemble.RandomForestRegressor()
regressor = neighbors.KNeighborsRegressor()
regressor = neighbors.RadiusNeighborsRegressor()
regressor = tree.DecisionTreeRegressor()
regressor = tree.ExtraTreeRegressor()
regressor = svm.LinearSVR(max_iter=1000)
regressor = svm.SVR(kernel='poly', max_iter=1000) # kernel: {'linear', 'poly', 'rbf', 'sigmoid', 'precomputed'}, default='rbf'
regressor = svm.NuSVR(max_iter=1000)
regressor = neural_network.MLPRegressor(hidden_layer_sizes=(100,), max_iter=10, activation='relu', solver='adam', learning_rate='adaptive')
regressor.fit(X, y)
regressor.predict(X)
Metrics
from sklearn import metrics
from sklearn.linear_model import LinearRegression
from sklearn.datasets import make_regression
X, y = make_regression(n_samples=3000, n_features=10, n_informative=5, n_targets=1, bias=0.0, effective_rank=None, tail_strength=0.5, noise=0.0, shuffle=True, coef=False, random_state=None)
regressor = LinearRegression()
regressor.fit(X, y)
y_true = y
y_pred = regressor.predict(X)
# REGRESSION
metrics.explained_variance_score(y_true, y_pred)
metrics.max_error(y_true, y_pred)
metrics.mean_absolute_error(y_true, y_pred)
metrics.mean_squared_error(y_true, y_pred)
metrics.median_absolute_error(y_true, y_pred)
metrics.r2_score(y_true, y_pred)
metrics.mean_tweedie_deviance(y_true, y_pred)
Pipelines
#
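This subsection is a stub in the original; a minimal regression pipeline sketch, mirroring the one under Validation below:
from sklearn.datasets import make_regression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import PowerTransformer, Normalizer
from sklearn.linear_model import LinearRegression
X, y = make_regression(n_samples=3000, n_features=10, n_informative=5)
# transformers and estimator are fit as one unit, avoiding train/test leakage in later CV
pipeline = make_pipeline(PowerTransformer(method='yeo-johnson', standardize=True), Normalizer(), LinearRegression())
pipeline.fit(X, y)
pipeline.predict(X)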
Validation
# https://scikit-learn.org/stable/modules/model_evaluation.html
import pandas as pd
from sklearn.datasets import make_regression
from sklearn.preprocessing import PowerTransformer, QuantileTransformer, StandardScaler, Normalizer, RobustScaler, MinMaxScaler, MaxAbsScaler
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.model_selection import cross_val_score, cross_validate, GridSearchCV, KFold, RepeatedKFold
from sklearn.linear_model import LinearRegression
def parameter_validation_scores(cv_results):
    train_scores = pd.DataFrame(list(filter(lambda score: score[0].endswith('train_score'), cv_results.items())))
    train_scores = train_scores[1].apply(lambda x: pd.Series(x)).T.rename(columns=train_scores[0].to_dict())
    train_scores.columns = pd.MultiIndex.from_tuples(map(lambda column: ('train_score', column.replace('_train_score', '')), train_scores.columns))
    test_scores = pd.DataFrame(list(filter(lambda score: score[0].endswith('test_score'), cv_results.items())))
    test_scores = test_scores[1].apply(lambda x: pd.Series(x)).T.rename(columns=test_scores[0].to_dict())
    test_scores.columns = pd.MultiIndex.from_tuples(map(lambda column: ('test_score', column.replace('_test_score', '')), test_scores.columns))
    time_scores = pd.DataFrame(list(filter(lambda score: score[0].endswith('time'), cv_results.items())))
    time_scores = time_scores[1].apply(lambda x: pd.Series(x)).T.rename(columns=time_scores[0].to_dict())
    time_scores.columns = pd.MultiIndex.from_tuples(map(lambda column: ('time', column.replace('_time', '')), time_scores.columns))
    scores = pd.concat([train_scores, test_scores, time_scores], axis=1)
    scores.index = pd.MultiIndex.from_frame(pd.DataFrame(cv_results['params']))
    return scores
def scenario_validation_scores(cv_results):
    train_scores = pd.DataFrame(list(filter(lambda score: score[0].startswith('train'), cv_results.items())))
    train_scores = train_scores[1].apply(lambda x: pd.Series(x)).T.rename(columns=train_scores[0].to_dict())
    train_scores.columns = pd.MultiIndex.from_tuples(map(lambda column: ('train_score', column.replace('train_', '')), train_scores.columns))
    test_scores = pd.DataFrame(list(filter(lambda score: score[0].startswith('test'), cv_results.items())))
    test_scores = test_scores[1].apply(lambda x: pd.Series(x)).T.rename(columns=test_scores[0].to_dict())
    test_scores.columns = pd.MultiIndex.from_tuples(map(lambda column: ('test_score', column.replace('test_', '')), test_scores.columns))
    time_scores = pd.DataFrame(list(filter(lambda score: score[0].endswith('time'), cv_results.items())))
    time_scores = time_scores[1].apply(lambda x: pd.Series(x)).T.rename(columns=time_scores[0].to_dict())
    time_scores.columns = pd.MultiIndex.from_tuples(map(lambda column: ('time', column.replace('_time', '')), time_scores.columns))
    scores = pd.concat([train_scores, test_scores, time_scores], axis=1)
    scores.index.name = 'split' # Repeated KFold: n_repeats * n_splits
    return scores #.swaplevel(0,1,axis=1).sort_index(axis=1)
X, y = make_regression(n_samples=3000, n_features=10, n_informative=5, n_targets=1, bias=0.0, effective_rank=None, tail_strength=0.5, noise=0.0, shuffle=True, coef=False, random_state=None)
cv = RepeatedKFold(n_splits=10, n_repeats=3, random_state=None) # KFold(n_splits=10, shuffle=False, random_state=None)
# version1: multiple parameters
regressor = make_pipeline(PowerTransformer(method='yeo-johnson', standardize=True), Normalizer(), LinearRegression())
regressor = GridSearchCV(estimator=regressor, param_grid={'linearregression__fit_intercept':[False, True], 'linearregression__positive':[False, True]}, cv=cv, scoring=['r2', 'explained_variance', 'max_error', 'neg_mean_squared_error', 'neg_mean_absolute_error', 'neg_root_mean_squared_error', 'neg_median_absolute_error', 'neg_mean_absolute_percentage_error'][1], return_train_score=True)
regressor.fit(X, y)
parameter_validation_scores(regressor.cv_results_)
# version2: multiple scoring
# v.s. cross_val_score(regressor, X, y, cv=cv, scoring='r2', fit_params=None, n_jobs=-1, verbose=0)
regressor = Pipeline([('powertransformer', PowerTransformer(method='yeo-johnson', standardize=True)), ('normalizer', Normalizer()), ('linearregression', LinearRegression())])
cv_results = cross_validate(regressor, X, y, cv=cv, scoring=['r2', 'explained_variance', 'max_error', 'neg_mean_squared_error', 'neg_mean_absolute_error', 'neg_root_mean_squared_error', 'neg_median_absolute_error', 'neg_mean_absolute_percentage_error'], fit_params=None, return_train_score=True, return_estimator=False, n_jobs=-1, verbose=0)
scenario_validation_scores(cv_results)
Evaluation
from datetime import datetime
from functools import wraps
import joblib
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.datasets import make_regression, make_classification
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.model_selection import cross_val_score, GridSearchCV, StratifiedKFold, KFold
from sklearn import metrics
class TemplateTransformer(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass
    def fit(self, X, y=None):
        return self
    def transform(self, X):
        # feature selection: keep the X.shape[1]-1 lowest-VIF features, i.e. drop the one with the largest variance inflation factor
        import pandas as pd
        from statsmodels.stats.outliers_influence import variance_inflation_factor
        features_by_vif = pd.Series(
            data=[variance_inflation_factor(X, i) for i in range(X.shape[1])],
            index=range(X.shape[1])).sort_values(ascending=True).iloc[:X.shape[1] - 1].index.tolist()
        return X[:, features_by_vif]
def evaluation(*args, **kwargs):
    def decorator(func):
        @wraps(func)
        def wrapper(model, X, y, model_name='model', domain_kind='train', verbose=False):
            y, y_pred = func(model, X, y)
            if verbose:
                pass
            summary = dict()
            summary['datetime'] = [datetime.now().strftime('%Y-%m-%d %H:%M:%S')]
            summary['model'] = [model_name]
            summary['domain'] = [domain_kind]
            summary['MAE'] = [metrics.mean_absolute_error(y, y_pred)]
            summary['MAPE'] = [metrics.mean_absolute_percentage_error(y, y_pred)]
            summary['MSE'] = [metrics.mean_squared_error(y, y_pred)]
            summary['R2'] = [metrics.r2_score(y, y_pred)]
            evaluation = pd.DataFrame(summary)
            return y_pred, evaluation
        return wrapper
    return decorator

@evaluation(description="my_description")
def prediction(model, X, y, model_name='model', domain_kind='train', verbose=False):
    y_pred = model.predict(X)
    return y, y_pred
X, y = make_regression(n_samples=3000, n_features=10, n_informative=5, n_targets=1, bias=0.0, effective_rank=None, tail_strength=0.5, noise=0.0, shuffle=True, coef=False, random_state=None)
feature_space = FeatureUnion(transformer_list=[('TemplateTransformer', TemplateTransformer()),])
pipelines = dict()
pipelines['LinearRegression'] = Pipeline(steps=[('FeatureSpace', feature_space), ('LinearRegression', LinearRegression())])
# syntax: <estimator>__<parameter>
param_grids = dict()
param_grids['LinearRegression'] = dict(
    LinearRegression__fit_intercept=[False, True]
)
# https://scikit-learn.org/stable/modules/model_evaluation.html
names = []
results = []
for idx, ((name, pipeline), param_grid) in enumerate(zip(pipelines.items(), param_grids.values())):
    scorings = ['neg_mean_squared_error']
    scoring = scorings[0]
    cross_validation = KFold(n_splits=10, shuffle=True, random_state=None)
    pipeline = GridSearchCV(estimator=pipeline, param_grid=param_grid, cv=cross_validation, scoring=scoring)
    pipeline.fit(X, y)
    # [STEP3]: save & load
    joblib.dump(pipeline, f'{idx}{name}_pipeline.joblib')
    pipeline = joblib.load(f'{idx}{name}_pipeline.joblib')
    # [STEP4]: prediction & evaluation
    y_pred, evaluation = prediction(pipeline, X, y, model_name=name, domain_kind='train')
    eval_table = evaluation if idx == 0 else pd.concat([eval_table, evaluation], axis=0)
    #print('*', name)
    names.append(name)
    results.append(cross_val_score(pipeline, X, y, cv=cross_validation, scoring=scoring))
fig = plt.figure(figsize=(25,7)); layout=(1,1); axes = dict()
axes[0] = plt.subplot2grid(layout, (0,0), fig=fig)
axes[0].boxplot(results)
axes[0].set_title('Evaluate Algorithms')
axes[0].set_xticklabels(names)
display(eval_table)
plt.show()
Task: Time Series Analysis (1): Scikit-Learn
: bias & variance trade-off
Models
#
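This subsection is a stub in the original; one common scikit-learn treatment of a univariate series, sketched under the assumption that lagged values serve as features, is to cast forecasting as supervised regression:
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
# build a supervised frame from lagged copies of the series
series = pd.Series(np.random.normal(size=500).cumsum())
frame = pd.concat({f'lag{l}': series.shift(l) for l in range(1, 6)}, axis=1).assign(y=series).dropna()
X, y = frame.drop('y', axis=1), frame['y']
regressor = RandomForestRegressor().fit(X, y)
regressor.predict(X.tail(1))  # one-step-ahead prediction from the latest lags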
Metrics
#
Pipelines
#
Validation
#
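Also a stub in the original; for validation, TimeSeriesSplit keeps folds in temporal order. A sketch continuing the lag-feature setup above (rebuilt here so the snippet is self-contained):
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import TimeSeriesSplit, cross_val_score
series = pd.Series(np.random.normal(size=500).cumsum())
frame = pd.concat({f'lag{l}': series.shift(l) for l in range(1, 6)}, axis=1).assign(y=series).dropna()
X, y = frame.drop('y', axis=1), frame['y']
cv = TimeSeriesSplit(n_splits=5, test_size=20, gap=0)  # each test fold lies strictly after its training fold
cross_val_score(RandomForestRegressor(), X, y, cv=cv, scoring='neg_mean_absolute_error')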
Evaluation
#
Task: Time Series Analysis (2): Statsmodels
import numpy as np
import pandas as pd
from tqdm import tqdm
import statsmodels.api as sm
import statsmodels.tsa.api as smt
from statsmodels.tsa.deterministic import DeterministicProcess, Fourier, Seasonality, TimeTrend
from sklearn.model_selection import TimeSeriesSplit
def future_frame(series, f_steps=20):
    trend = TimeTrend(constant=True, order=2)
    fourier = Fourier(period=52, order=3)
    seasonal = Seasonality(period=11)
    dp = DeterministicProcess(series.index, additional_terms=[trend, fourier, seasonal])
    insample = dp.in_sample(); insample['sample-domain'] = 'in'
    outofsample = dp.out_of_sample(steps=f_steps); outofsample['sample-domain'] = 'out'
    f_frame = pd.concat([insample, outofsample], axis=0)
    f_frame.index = f_frame.index.rename('Date')
    f_frame.loc[lambda x: x['sample-domain'] == 'in', 'ts'] = series
    # Time Series Feature Engineering
    f_frame.drop(['trend'], axis=1, inplace=True)
    f_frame['rel-trend'] = f_frame.index.year + f_frame.index.month/12 + f_frame.index.day/365
    return f_frame
def tsa_evaluation(y_true, y_pred, num_params=1):
    endog = y_true
    fittedvalues = y_pred
    resid = y_true - y_pred
    result = dict()
    # result['sst'] = result['ssr'] + result['sse'] holds only for linear regression with an intercept
    # 1 = result['r2'] + result['k2'] holds in all statistical cases
    result['sst'] = ((endog - endog.mean())**2).sum() # total sum of squares
    result['ssr'] = ((fittedvalues - endog.mean())**2).sum() # regression (explained) sum of squares
    result['sse'] = resid.apply(lambda x: x**2).sum() # error (residual) sum of squares
    result['(sse/ssr)'] = result['sse']/result['ssr'] # deviation volume ratio
    result['(ssr/sst)explained variance'] = result['ssr']/result['sst'] # fraction of variance explained
    result['(sse/sst)unexplained variance'] = result['sse']/result['sst'] # fraction of variance unexplained (FVU)
    result['r2'] = 1 - (result['sse']/result['sst']) # [ 0 < r2 < 1 ] coefficient of determination: common variance shared between the variables (~covariance)
    result['k2'] = 1 - result['r2'] # [ 0 < k2 < 1 ] coefficient of alienation: common variance not shared between the variables
    result['y-yhat:corr'] = np.corrcoef(endog, fittedvalues)[0,1] # [ -1 < correlation < 1 ]
    result['y-yhat:cov'] = np.cov(endog, fittedvalues)[0,1]
    result['y-yhat:leverage'] = np.cov(endog, fittedvalues)[0,1]/np.cov(endog, fittedvalues)[0,0] # [ 0 < leverage < 1 ]
    result['yhat-resid:corr'] = np.corrcoef(fittedvalues, resid)[0,1]
    result['yhat-resid:cov'] = np.cov(fittedvalues, resid)[0,1]
    result['mst'] = endog.var(ddof=1) # mean square total
    result['msr'] = ((fittedvalues - endog.mean())**2).sum() / num_params # mean square regression
    result['mse'] = resid.apply(lambda x: x**2).sum()/(resid.shape[0]-num_params) # mean square error
    result['rmse'] = np.sqrt(resid.apply(lambda x: x**2).sum()/(resid.shape[0]-num_params)) # root mean square error
    result['mae'] = resid.apply(np.abs).sum()/(resid.shape[0]-num_params) # mean absolute error
    result['mape'] = (resid.apply(np.abs) / (fittedvalues + resid).apply(np.abs)).sum() / (resid.shape[0]-num_params) # mean absolute percentage error (fittedvalues + resid == y_true)
    return result
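For reference, tsa_evaluation is built on the standard variance decomposition (standard definitions, written in the notation used elsewhere in this post):
$${\displaystyle \begin{aligned} \mathrm{SST} &= \sum_{i=1}^{n}(y_{i}-\bar{y})^{2}, \quad \mathrm{SSR} = \sum_{i=1}^{n}(\hat{y}_{i}-\bar{y})^{2}, \quad \mathrm{SSE} = \sum_{i=1}^{n}(y_{i}-\hat{y}_{i})^{2} \\ R^{2} &= 1-\frac{\mathrm{SSE}}{\mathrm{SST}}, \quad k^{2} = 1-R^{2}, \quad \mathrm{SST} = \mathrm{SSR}+\mathrm{SSE} \ \text{(OLS with intercept only)} \end{aligned} }$$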
# given data
y = smt.ArmaProcess(ar=[1, .3, -.5], ma=[1, .1, .3]).generate_sample(nsample=600, burnin=50).cumsum()
y = pd.Series(y, index=pd.date_range(end='00:00:00', periods=y.shape[0], freq='D'))
training_periods = 300
forecasting_steps = 20
y_future_frame = future_frame(y, f_steps=forecasting_steps).asfreq('D')
endog_past = y_future_frame.loc[lambda x: x['sample-domain'] == 'in', 'ts'].copy()
exog_past = y_future_frame.loc[lambda x: x['sample-domain'] == 'in'].drop(['ts', 'sample-domain'], axis=1).copy()
exog_future = y_future_frame.loc[lambda x: x['sample-domain'] == 'out'].drop(['ts', 'sample-domain'], axis=1).copy()
# cross-validation method: n-step ahead
train_evaluations = list()
test_evaluations = list()
ts_iterator = tqdm(
    enumerate(
        TimeSeriesSplit(
            n_splits=int((y.shape[0]-training_periods)/forecasting_steps), max_train_size=training_periods, test_size=forecasting_steps, gap=0).split(y)),
    total=int((y.shape[0]-training_periods)/forecasting_steps))
for split_idx, (train_idx, test_idx) in ts_iterator: # n_splits = (nsample - max_train_size) / test_size
    endog_train = endog_past.iloc[train_idx].copy() # y_true: train
    endog_val = endog_train.iloc[-forecasting_steps:].copy() # y_true: validation
    endog_test = endog_past.iloc[test_idx].copy() # y_true: test
    exog_train = exog_past.iloc[train_idx].copy()
    exog_val = exog_train.iloc[-forecasting_steps:].copy()
    exog_test = exog_past.iloc[test_idx].copy()
    #model = smt.SARIMAX(endog=endog_train, exog=exog_train, order=(2,1,2), trend='n', freq='D').fit(disp=0)
    #y_pred_val = model.predict(start=endog_val.index[0], end=endog_val.index[-1])
    #y_pred_test = model.forecast(steps=forecasting_steps, exog=exog_test)
    model = sm.OLS(endog=endog_train, exog=exog_train).fit()
    y_pred_val = model.predict(exog_val)
    y_pred_test = model.predict(exog_test)
    evaluation_train = tsa_evaluation(endog_val, y_pred_val)
    evaluation_test = tsa_evaluation(endog_test, y_pred_test)
    evaluation_train.update(TrainStart=endog_train.index[0], TrainEnd=endog_train.index[-1], TrainPeriods=endog_train.shape[0], TestStart=endog_val.index[0], TestEnd=endog_val.index[-1], TestPeriods=endog_val.shape[0])
    evaluation_test.update(TrainStart=endog_train.index[0], TrainEnd=endog_train.index[-1], TrainPeriods=endog_train.shape[0], TestStart=endog_test.index[0], TestEnd=endog_test.index[-1], TestPeriods=endog_test.shape[0])
    evaluation_train = pd.DataFrame(data=evaluation_train.values(), index=evaluation_train.keys(), columns=pd.Index([f'split{split_idx}'], name='n_split'))
    evaluation_test = pd.DataFrame(data=evaluation_test.values(), index=evaluation_test.keys(), columns=pd.Index([f'split{split_idx}'], name='n_split'))
    train_evaluations.append(evaluation_train)
    test_evaluations.append(evaluation_test)
past_evaluation_train = pd.concat(train_evaluations, axis=1).T.reset_index().set_index(['n_split', 'TrainStart', 'TrainEnd', 'TrainPeriods', 'TestStart','TestEnd', 'TestPeriods']); past_evaluation_train['domain'] = 'train'
past_evaluation_test = pd.concat(test_evaluations, axis=1).T.reset_index().set_index(['n_split', 'TrainStart', 'TrainEnd', 'TrainPeriods', 'TestStart','TestEnd', 'TestPeriods']); past_evaluation_test['domain'] = 'test'
past_evaluation = pd.concat([past_evaluation_train, past_evaluation_test], axis=0).reset_index().set_index(['n_split', 'domain', 'TrainStart', 'TrainEnd', 'TrainPeriods', 'TestStart','TestEnd', 'TestPeriods']).astype(float).sort_values(by=['n_split'])
final_result = sm.OLS(endog=endog_past.iloc[-training_periods:], exog=exog_past.iloc[-training_periods:]).fit()
y_future_frame.loc[lambda x: x['sample-domain'] == 'out', 'ts'] = final_result.predict(exog_future)
past_evaluation.groupby(['domain']).mean().T
Reference
Scikit-learn modeling API
from sklearn import linear_model
#linear_model.enet_path(X, y)
#linear_model.lars_path(X, y)
#linear_model.lars_path_gram(Xy, Gram)
#linear_model.lasso_path(X, y)
#linear_model.orthogonal_mp(X, y)
#linear_model.orthogonal_mp_gram(Gram, Xy)
#linear_model.ridge_regression(X, y, alpha)
linear_model.LogisticRegression()
linear_model.LogisticRegressionCV()
linear_model.PassiveAggressiveClassifier()
linear_model.Perceptron()
linear_model.RidgeClassifier()
linear_model.RidgeClassifierCV()
linear_model.SGDClassifier()
linear_model.ElasticNet()
linear_model.ElasticNetCV()
linear_model.Lars()
linear_model.LarsCV()
linear_model.Lasso()
linear_model.LassoCV()
linear_model.LassoLars()
linear_model.LassoLarsCV()
linear_model.LassoLarsIC()
linear_model.OrthogonalMatchingPursuit()
linear_model.OrthogonalMatchingPursuitCV()
linear_model.ARDRegression()
linear_model.BayesianRidge()
linear_model.MultiTaskElasticNet()
linear_model.MultiTaskElasticNetCV()
linear_model.MultiTaskLasso()
linear_model.MultiTaskLassoCV()
linear_model.HuberRegressor()
linear_model.RANSACRegressor()
linear_model.TheilSenRegressor()
linear_model.PoissonRegressor()
linear_model.TweedieRegressor()
linear_model.GammaRegressor()
linear_model.PassiveAggressiveRegressor()
from sklearn import ensemble
#ensemble.StackingClassifier(estimators)
#ensemble.StackingRegressor(estimators)
#ensemble.VotingClassifier(estimators)
#ensemble.VotingRegressor(estimators)
ensemble.AdaBoostClassifier()
ensemble.AdaBoostRegressor()
ensemble.BaggingClassifier()
ensemble.BaggingRegressor()
ensemble.ExtraTreesClassifier()
ensemble.ExtraTreesRegressor()
ensemble.GradientBoostingClassifier()
ensemble.GradientBoostingRegressor()
ensemble.IsolationForest()
ensemble.RandomForestClassifier()
ensemble.RandomForestRegressor()
ensemble.RandomTreesEmbedding()
from sklearn import naive_bayes
naive_bayes.BernoulliNB()
naive_bayes.CategoricalNB()
naive_bayes.ComplementNB()
naive_bayes.GaussianNB()
naive_bayes.MultinomialNB()
from sklearn import neighbors
#neighbors.BallTree(X[, leaf_size, metric])
#neighbors.KDTree(X[, leaf_size, metric])
#neighbors.kneighbors_graph(X, n_neighbors)
#neighbors.radius_neighbors_graph(X, radius)
neighbors.KernelDensity()
neighbors.KNeighborsClassifier()
neighbors.KNeighborsRegressor()
neighbors.KNeighborsTransformer()
neighbors.LocalOutlierFactor()
neighbors.RadiusNeighborsClassifier()
neighbors.RadiusNeighborsRegressor()
neighbors.RadiusNeighborsTransformer()
neighbors.NearestCentroid()
neighbors.NearestNeighbors()
neighbors.NeighborhoodComponentsAnalysis()
from sklearn import neural_network
neural_network.BernoulliRBM()
neural_network.MLPClassifier()
neural_network.MLPRegressor()
from sklearn import svm
svm.LinearSVC()
svm.LinearSVR()
svm.NuSVC()
svm.NuSVR()
svm.OneClassSVM()
svm.SVC()
svm.SVR()
from sklearn import tree
#tree.export_graphviz(decision_tree)
#tree.export_text(decision_tree)
tree.DecisionTreeClassifier()
tree.DecisionTreeRegressor()
tree.ExtraTreeClassifier()
tree.ExtraTreeRegressor()
from sklearn import discriminant_analysis
discriminant_analysis.LinearDiscriminantAnalysis()
discriminant_analysis.QuadraticDiscriminantAnalysis()
from sklearn import gaussian_process
gaussian_process.GaussianProcessClassifier()
gaussian_process.GaussianProcessRegressor()
from sklearn import pipeline
#pipeline.FeatureUnion(transformer_list)
#pipeline.Pipeline(steps)
#pipeline.make_pipeline(*steps[, memory, verbose])
#pipeline.make_union(*transformers[, n_jobs, ...])
from sklearn import model_selection
#model_selection.GridSearchCV(estimator, param_grid)
#model_selection.ParameterGrid(param_grid)
#model_selection.ParameterSampler(param_distributions, n_iter)
#model_selection.RandomizedSearchCV(estimator, param_distributions)
Scikit-learn evaluation API
from sklearn import metrics
## CLASSIFICATION
metrics.accuracy_score(y_true, y_pred)
metrics.balanced_accuracy_score(y_true, y_pred)
metrics.classification_report(y_true, y_pred)
metrics.confusion_matrix(y_true, y_pred)
metrics.f1_score(y_true, y_pred)
metrics.fbeta_score(y_true, y_pred, beta)
metrics.hamming_loss(y_true, y_pred)
metrics.jaccard_score(y_true, y_pred)
metrics.matthews_corrcoef(y_true, y_pred)
metrics.multilabel_confusion_matrix(y_true, y_pred)
metrics.precision_score(y_true, y_pred)
metrics.recall_score(y_true, y_pred)
metrics.zero_one_loss(y_true, y_pred)
#metrics.brier_score_loss(y_true, y_prob)
#metrics.auc(x, y)
#metrics.average_precision_score(y_true, y_pred)
#metrics.cohen_kappa_score(y1, y2)
#metrics.dcg_score(y_true, y_score)
#metrics.det_curve(y_true, y_score)
#metrics.hinge_loss(y_true, pred_decision)
#metrics.log_loss(y_true, y_pred)
#metrics.ndcg_score(y_true, y_score)
#metrics.precision_recall_curve(y_true, y_pred)
#metrics.precision_recall_fscore_support()
#metrics.roc_auc_score(y_true, y_score)
#metrics.roc_curve(y_true, y_score)
#metrics.top_k_accuracy_score(y_true, y_score)
# REGRESSION
metrics.explained_variance_score(y_true, y_pred)
metrics.max_error(y_true, y_pred)
metrics.mean_absolute_error(y_true, y_pred)
metrics.mean_squared_error(y_true, y_pred)
metrics.median_absolute_error(y_true, y_pred)
metrics.mean_absolute_percentage_error(y_true, y_pred)
metrics.r2_score(y_true, y_pred)
metrics.mean_tweedie_deviance(y_true, y_pred)
#metrics.mean_squared_log_error(y_true, y_pred)
#metrics.mean_poisson_deviance(y_true, y_pred)
#metrics.mean_gamma_deviance(y_true, y_pred)
#metrics.d2_tweedie_score(y_true, y_pred)
#metrics.mean_pinball_loss(y_true, y_pred)
ANOVA for parameter validation (grid search ANOVA)
# https://scikit-learn.org/stable/modules/model_evaluation.html
import pandas as pd
from sklearn.datasets import make_classification
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.preprocessing import PowerTransformer, QuantileTransformer, StandardScaler, Normalizer, RobustScaler, MinMaxScaler, MaxAbsScaler
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.model_selection import cross_val_score, cross_validate, GridSearchCV, StratifiedKFold
def GridSearchANOVA(cross_validation_result, verbose=True):
    from itertools import combinations
    import pandas as pd
    from scipy import stats
    import statsmodels.api as sm
    from statsmodels.formula.api import ols
    from statsmodels.stats.multicomp import MultiComparison
    # ANOVA data: reshape cv_results_ so each (parameter combination, domain, split) score is one row
    evaluation = pd.DataFrame(cross_validation_result).drop('params', axis=1)
    evaluation = evaluation.set_index(list(filter(lambda column: column.startswith('param'), evaluation.columns)))
    evaluation = evaluation[list(filter(lambda column: column.startswith('split'), evaluation.columns))]
    evaluation.columns = pd.MultiIndex.from_tuples(map(lambda column: (column.split('_')[1], column.split('_')[0][5:]), evaluation.columns))
    evaluation.index.names = list(map(lambda index: index[6:], evaluation.index.names))
    evaluation = evaluation.stack(0)
    evaluation.index.names = list(evaluation.index.names)[:-1] + ['domain'] # train/test
    factors = list(evaluation.index.names)
    evaluation = evaluation.stack(0).to_frame().rename(columns={0:'score'})
    evaluation.index.names = list(evaluation.index.names[:-1]) + ['subject']
    evaluation = evaluation.reset_index().fillna('None')
    for factor in factors:
        evaluation[factor] = evaluation[factor].astype(str)
    # concatenate the factor levels into a single group label per row
    evaluation['paired_factor'] = eval("+' & '+".join(list(map(lambda factor: "evaluation['"+factor+"']", factors))))
    score_means = evaluation.groupby(factors)[['score']].mean()
    if verbose:
        display(score_means)
    # ANOVA Assumption
    import pingouin as pg
    assumptions = dict()
    assumptions['homoscedasticity'] = pg.homoscedasticity(data=evaluation, dv='score', group='paired_factor')
    assumptions['normality'] = pg.normality(data=evaluation, dv='score', group='paired_factor') # Shapiro-Wilk test
    # Factorial ANOVA: main effects plus all interaction terms
    factors = list(map(lambda factor: 'C('+factor+')', factors))
    variance_sources = list()
    for i, sources in enumerate(map(lambda i: combinations(factors, i), range(1, len(factors)+1))):
        sources = list(sources)
        i += 1
        for source in sources:
            source = ':'.join(list(source)) if i != 1 else source[0]
            variance_sources.append(source)
    patsy_formula = ' ~ ' + ' + '.join(variance_sources)
    anova_table = sm.stats.anova_lm(ols('score' + patsy_formula, data=evaluation).fit(), typ=2)
    mc = MultiComparison(evaluation['score'], evaluation['paired_factor'])
    posthoc_results = mc.tukeyhsd() # Tukey honestly significant difference (HSD)
    anova_summary = posthoc_results.summary()
    anova_summary = pd.DataFrame(anova_summary.data[1:], columns=anova_summary.data[0])
    if verbose:
        display(anova_table)
        display(anova_summary)
        display(posthoc_results.plot_simultaneous())
    return assumptions, score_means, anova_table, anova_summary
X, y = make_classification(n_samples=10000, n_features=10, n_classes=2, weights=[0.6, 0.4], flip_y=0)
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=None) # cross validation & randomness control
classifier = make_pipeline(PowerTransformer(method='yeo-johnson', standardize=True), Normalizer(), LinearDiscriminantAnalysis())
classifier = GridSearchCV(estimator=classifier, cv=cv, scoring=['accuracy', 'recall', 'precision', 'f1'][1], return_train_score=True,
    param_grid={
        'powertransformer__standardize': [True, False],
        'lineardiscriminantanalysis__priors': [(.1, .9), (.5, .5), None],
        'lineardiscriminantanalysis__solver': ['svd', 'lsqr', 'eigen'],
    })
classifier.fit(X, y)
assumptions, score_means, anova_table, anova_summary = GridSearchANOVA(classifier.cv_results_, verbose=True)
ANOVA for scenario validation (ScenarioANOVA)
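This section was left as a stub; as a minimal sketch of the idea (names here are illustrative), treat each candidate pipeline as a scenario, collect per-split scores with cross_validate, and test whether the score distributions differ:
import pandas as pd
from scipy import stats
from sklearn.datasets import make_classification
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import PowerTransformer, Normalizer
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_validate, StratifiedKFold
X, y = make_classification(n_samples=10000, n_features=10, n_classes=2, weights=[0.6, 0.4], flip_y=0)
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=0)
scenarios = {
    'lda': make_pipeline(PowerTransformer(method='yeo-johnson', standardize=True), Normalizer(), LinearDiscriminantAnalysis()),
    'logistic': make_pipeline(PowerTransformer(method='yeo-johnson', standardize=True), Normalizer(), LogisticRegression(max_iter=500)),
}
scores = {name: cross_validate(pipeline, X, y, cv=cv, scoring='f1')['test_score'] for name, pipeline in scenarios.items()}
# one-way ANOVA across scenarios: are the per-split score distributions distinguishable?
f_stat, p_value = stats.f_oneway(*scores.values())
display(pd.DataFrame(scores).agg(['mean', 'std']))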