Preprocessing manual
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
with warnings.catch_warnings():
    warnings.simplefilter(action='ignore', category=FutureWarning)
Numpy/Pandas Preprocessing
- numpy
import numpy as np
np.c_[np.random.normal(size=(100, 2)), np.random.normal(size=(100, 2))] # 100*4
np.r_[np.random.normal(size=(100, 2)), np.random.normal(size=(100, 2))] # 200*2
np.mgrid[-1:1:100j] # 100,
np.mgrid[-1:1:100j, -1:1:30j] # 2*100*30
np.kron(np.arange(5), np.ones(3)) # [0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4]
np.repeat(np.arange(5), repeats=3) # [0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4]
np.tile(np.arange(3), reps=5) # [0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2]
arr = np.array([[8,1,7], [4,3,9], [5,2,6]])
np.apply_along_axis(func1d=sorted, axis=1, arr=arr)
- dataframe, series, index
import pandas as pd
index = pd.Index(['A', 'B', 'C'], name='Alphabet')
index = pd.MultiIndex.from_product([['A', 'B', 'C'],['1', '2']], names=['Alphabet', 'Number'])
index = pd.MultiIndex.from_tuples([('A', '1'), ('A', '2'), ('B', '1'), ('B', '2'), ('C', '1'), ('C', '2')], names=['Alphabet', 'Number'])
index = pd.MultiIndex.from_arrays([('A', 'A', 'B', 'B', 'C', 'C'), ('1', '2', '1', '2', '1', '2')], names=['Alphabet', 'Number'])
index = pd.MultiIndex.from_frame(pd.DataFrame([('A', '1'), ('A', '2'), ('B', '1'), ('B', '2'), ('C', '1'), ('C', '2')], columns=['Alphabet', 'Number']))
index.tolist()
index.to_series()
index.to_frame()
index.names # <-> index.name
index.values
index.sort_values(ascending=False)
index.difference([('A', '1'), ('A', '2')], sort=False)
index.drop([('A', '1'), ('A', '2')])
index.drop('A', level=0)
index[pd.Series([0])]
index[pd.Series([1])]
index[pd.Series([2])]
index[0]
index[1]
index[2]
index[slice(0,2)], index[index.get_loc('A')], index[index.get_locs(['A'])] # for single-level key
index[slice(2,4)], index[index.get_loc('B')], index[index.get_locs(['B'])] # for single-level key
index[slice(4,6)], index[index.get_loc('C')], index[index.get_locs(['C'])] # for single-level key
index[index.get_locs(['A', '1'])] # for sequential multi-level
index[index.get_locs(['A', '2'])] # for sequential multi-level
index.get_level_values(0)
index.droplevel(level=[0])
index.swaplevel(0,1)
index.reorder_levels(['Number', 'Alphabet'])
series = pd.Series([1,2,3], index=pd.Index(['A', 'B', 'C'], name='Alphabet'), name='Number')
series.to_frame()
series.to_dict()
series.tolist()
series.name
series.index
series.values
series.sort_index(ascending=False)
series.sort_values(ascending=False)
series['A'].item()
series['B'].item()
series['C'].item()
frame = pd.DataFrame([[1],[2],[3]], index=pd.Index(['A', 'B', 'C'], name='Alphabet'), columns=['Number'])
frame.columns
frame.index
frame.values
frame.sort_index(ascending=False)
frame.sort_values(by=['Number'], ascending=False)
frame['Number']['A'].item()
frame['Number']['B'].item()
frame['Number']['C'].item()
- search
import numpy as np
import pandas as pd
dataframe = pd.DataFrame(np.random.randint(0, 3, (100, 5)), columns=['G1', 'G2', 'G3', 'A', 'B'])
dataframe['G1'] = dataframe['G1'].apply(lambda x: 'g1_0' if x == 0 else 'g1_1' if x == 1 else 'g1_2')
dataframe['G2'] = dataframe['G2'].apply(lambda x: 'g2_0' if x == 0 else 'g2_1' if x == 1 else 'g2_2')
dataframe['G3'] = dataframe['G3'].apply(lambda x: 'g3_0' if x == 0 else 'g3_1' if x == 1 else 'g3_2')
dataframe = dataframe.set_index(['G1', 'G2', 'G3']).sort_index()
# return index
dataframe.index.get_level_values(0) # return index of level 0
dataframe.index.get_level_values(1) # return index of level 1
dataframe.index.get_level_values(2) # return index of level 2
dataframe.columns.get_level_values(0)
# return dataframe for searching index-key condition
dataframe.xs(key='g1_0', level=0, axis=0)
dataframe.xs(key='g2_0', level=1, axis=0)
dataframe.xs(key='g3_0', level=2, axis=0)
dataframe.loc[['g1_0', 'g1_1', 'g1_2']] # zeroth single-level index with multiple keys
dataframe.loc[('g1_0', 'g2_0', 'g3_0')] # multiple-level index
dataframe.loc[dataframe.index.get_level_values(0) == 'g1_0'] # single-level index
dataframe.loc[dataframe.index.get_level_values(1) == 'g2_0'] # single-level index
dataframe.loc[dataframe.index.get_level_values(2) == 'g3_0'] # single-level index
dataframe.loc[dataframe.index.get_level_values(0).isin(['g1_0', 'g1_1', 'g1_2'])] # single-level index with multiple keys
dataframe.loc[dataframe.index.get_level_values(1).isin(['g2_0', 'g2_1', 'g2_2'])] # single-level index with multiple keys
dataframe.loc[dataframe.index.get_level_values(2).isin(['g3_0', 'g3_1', 'g3_2'])] # single-level index with multiple keys
dataframe.iloc[dataframe.index.get_loc('g1_0')] # zeroth single-level index
dataframe.iloc[dataframe.index.get_locs(['g1_0'])] # sequential multi-level index
dataframe.iloc[dataframe.index.get_locs(['g1_0', 'g2_0'])] # sequential multi-level index
dataframe.iloc[dataframe.index.get_locs(['g1_0', 'g2_0', 'g3_0'])] # sequential multi-level index
dataframe.iloc[dataframe.index.get_locs(['g1_0', 'g2_0', 'g3_0']), 0] # sequential multi-level index, column
dataframe.iloc[dataframe.index.get_locs(['g1_0', 'g2_0', 'g3_0']), 1] # sequential multi-level index, column
dataframe.loc[('g1_0',)] # sequential multi-level index (partial key); .at requires a unique (row, column) pair
dataframe.loc[('g1_0', 'g2_0')] # sequential multi-level index (partial key)
dataframe.loc[('g1_0', 'g2_0', 'g3_0'), 'A'] # sequential multi-level index (full key with column)
# return dataframe for searching data-value condition
dataframe.loc[dataframe['A'] == 0]
dataframe.loc[dataframe['B'] == 0]
# update after searching data
index_condition1 = lambda x: (x.index.get_level_values(level=1) == 'g2_0')&(x.index.get_level_values(level=2) == 'g3_0')
index_condition2 = lambda x: (x.index.get_level_values(level=1) == 'g2_1')|(x.index.get_level_values(level=2) == 'g3_1')
dataframe.loc[index_condition1] = dataframe.loc[index_condition1] + 10
dataframe.loc[index_condition2] = dataframe.loc[index_condition2] - 10
value_condition1 = lambda x: x.index[x['A'] == 1]
value_condition2 = lambda x: x.index[x['B'] == 1]
dataframe.loc[value_condition1] = dataframe.loc[value_condition1] - 20
dataframe.loc[value_condition2] = dataframe.loc[value_condition2] - 20
- manipulation
import numpy as np
import pandas as pd
dataframe = pd.DataFrame(np.random.randint(0, 3, (100, 5)), columns=['G1', 'G2', 'G3', 'A', 'B'])
dataframe['G1'] = dataframe['G1'].apply(lambda x: 'g1_0' if x == 0 else 'g1_1' if x == 1 else 'g1_2')
dataframe['G2'] = dataframe['G2'].apply(lambda x: 'g2_0' if x == 0 else 'g2_1' if x == 1 else 'g2_2')
dataframe['G3'] = dataframe['G3'].apply(lambda x: 'g3_0' if x == 0 else 'g3_1' if x == 1 else 'g3_2')
dataframe = dataframe.set_index(['G1', 'G2', 'G3']).sort_index(level=None, axis=0, ascending=True)
dataframe = dataframe.groupby(['G1', 'G2', 'G3']).count()
dataframe.unstack(level=0, fill_value=None).stack(level=1)
dataframe.unstack(level=1, fill_value=None).stack(level=1)
dataframe.unstack(level=2, fill_value=None).stack(level=1)
dataframe.swaplevel(0, 1).sort_index(level=0, axis=0, ascending=True) # dataframe.swaplevel(1, 0)
dataframe.swaplevel(1, 2).sort_index(level=1, axis=0, ascending=True) # dataframe.swaplevel(2, 1)
dataframe.swaplevel(2, 0).sort_index(level=2, axis=0, ascending=True) # dataframe.swaplevel(0, 2)
- group aggregation/disaggregation
import numpy as np
import pandas as pd
dataframe = pd.DataFrame(np.ones((10,2)), columns=['A', 'B'])
df_column_A = dataframe[['A']]; s_column_A = dataframe['A']
df_column_B = dataframe[['B']]; s_column_B = dataframe['B']
# operation for dataframe: assign & pipe
dataframe.assign(C=lambda df: df['A'] + df['B'] + df.size) # return frame (Recommended)
dataframe.pipe(lambda df: df[['A', 'B']] + df.size) # return frame (Recommended)
dataframe.pipe(lambda df: df['A'] + df['B'] + df.size) # return series (Recommended)
dataframe.pipe(lambda df: df.size) # return scalar value (Recommended)
dataframe[['A']].pipe(lambda df_column_A: df_column_A + df_column_A.size) # return dataframe
dataframe[['B']].pipe(lambda df_column_B: df_column_B + df_column_B.size) # return dataframe
dataframe['A'].pipe(lambda s_column_A: s_column_A + s_column_A.size) # return series
dataframe['B'].pipe(lambda s_column_B: s_column_B + s_column_B.size) # return series
# operation for series: apply
dataframe.apply(lambda df_column: df_column + df_column.size) # return dataframe
dataframe.apply(lambda df_column: df_column.sum() + df_column.size) # return series
dataframe[['A']].apply(lambda df_column_A: df_column_A) # return dataframe
dataframe[['B']].apply(lambda df_column_B: df_column_B) # return dataframe
dataframe['A'].apply(lambda element: element) # return series (Recommended)
dataframe['B'].apply(lambda element: element) # return series (Recommended)
# operation for elementwise: applymap
dataframe.applymap(lambda element: element) # (Recommended; DataFrame.map in pandas >= 2.1)
# window operation for series: rolling, expanding, ewm
dataframe['A'].rolling(window=4, min_periods=None, center=False, closed='right').mean() # return series
dataframe['B'].rolling(window=4, min_periods=None, center=False, closed='left').mean() # return series
dataframe['A'].expanding().sum() # return series
dataframe['B'].expanding().sum() # return series
dataframe['A'].ewm(com=1).sum() # return series
dataframe['B'].ewm(com=1).sum() # return series
# groupby operation
dataframe = pd.DataFrame(np.c_[np.random.randint(0, 2, 10), np.ones((10,2))], columns=['G', 'A', 'B'])
dataframe.groupby(['G']).apply(lambda grp_df: grp_df[['G', 'A', 'B']]) # return dataframe
dataframe.groupby(['G']).apply(lambda grp_df: grp_df['A'] + grp_df['B']) # return series
dataframe.groupby(['G']).apply(lambda grp_df: grp_df['A'] + grp_df['B'] + grp_df[['G', 'A', 'B']].size) # return series
dataframe.groupby(['G'])[['G', 'A', 'B']].apply(lambda grp_df: 10*grp_df[['G', 'A', 'B']]) # return dataframe
dataframe.groupby(['G'])[['A', 'B']].apply(lambda grp_df: 10*grp_df[['A', 'B']]) # return dataframe
dataframe.groupby(['G'])['A'].apply(lambda grp_s_column_A: 10*grp_s_column_A) # return series
dataframe.groupby(['G'])['B'].apply(lambda grp_s_column_B: 10*grp_s_column_B) # return series
dataframe.groupby(['G']).pipe(lambda grp_df: grp_df[['G', 'A', 'B']]).apply(lambda grp_df: 10*grp_df) # return dataframe
dataframe.groupby(['G']).pipe(lambda grp_df: grp_df[['A', 'B']]).apply(lambda grp_df: 10*grp_df) # return dataframe
dataframe.groupby(['G']).pipe(lambda grp_df: grp_df['A']).apply(lambda grp_s_column_A: 10*grp_s_column_A) # return series
dataframe.groupby(['G']).pipe(lambda grp_df: grp_df['B']).apply(lambda grp_s_column_B: 10*grp_s_column_B) # return series
# groupby&rolling operation
dataframe.groupby(['G']).rolling(window=3, center=True)[['A', 'B']].apply(lambda grp_df: grp_df.sum()) # return dataframe
dataframe.groupby(['G']).rolling(window=3, center=True)['A'].apply(lambda grp_s_column_A: grp_s_column_A.sum()) # return series
dataframe.groupby(['G']).rolling(window=3, center=True)['B'].apply(lambda grp_s_column_B: grp_s_column_B.sum()) # return series
dataframe.groupby(['G'])[['A', 'B']].rolling(window=3, center=True).apply(lambda grp_df: grp_df.sum()) # return dataframe
dataframe.groupby(['G'])['A'].rolling(window=3, center=True).apply(lambda grp_s_column_A: grp_s_column_A.sum()) # return series
dataframe.groupby(['G'])['B'].rolling(window=3, center=True).apply(lambda grp_s_column_B: grp_s_column_B.sum()) # return series
dataframe.groupby(['G']).pipe(lambda grp_df: grp_df[['A', 'B']]).rolling(window=3, center=True).apply(lambda grp_df: grp_df.sum()) # return dataframe
dataframe.groupby(['G']).pipe(lambda grp_df: grp_df['A']).rolling(window=3, center=True).apply(lambda grp_s_column_A: grp_s_column_A.sum()) # return series
dataframe.groupby(['G']).pipe(lambda grp_df: grp_df['B']).rolling(window=3, center=True).apply(lambda grp_s_column_B: grp_s_column_B.sum()) # return series
import numpy as np
import pandas as pd
dataframe = pd.DataFrame(np.random.randint(0, 3, (100, 5)), columns=['G1', 'G2', 'G3', 'A', 'B'])
dataframe['G1'] = dataframe['G1'].apply(lambda x: 'g1_0' if x == 0 else 'g1_1' if x == 1 else 'g1_2')
dataframe['G2'] = dataframe['G2'].apply(lambda x: 'g2_0' if x == 0 else 'g2_1' if x == 1 else 'g2_2')
dataframe['G3'] = dataframe['G3'].apply(lambda x: 'g3_0' if x == 0 else 'g3_1' if x == 1 else 'g3_2')
dataframe['I'] = np.arange(dataframe.shape[0]) # primary key
# crosstab: 1D aggregation by index
dataframe.groupby(['G1', 'A'])['A'].count().unstack()
pd.crosstab(index=[dataframe['G1']], columns=[dataframe['A']], margins=True, margins_name='All')
pd.crosstab(index=[dataframe['G1']], columns=[dataframe['A']], values=dataframe['A'], aggfunc=['count'], margins=True, margins_name='All')
# crosstab: 2D aggregation by index&columns
dataframe.groupby(['G1', 'G2', 'A'])['A'].count().unstack().sum(axis=1).unstack()
pd.crosstab(index=[dataframe['G1']], columns=[dataframe['G2']], values=dataframe['A'], aggfunc=['count'], margins=True, margins_name='All')
dataframe.groupby(['G1', 'G2', 'A'])['A'].sum().unstack().sum(axis=1).unstack()
pd.crosstab(index=[dataframe['G1']], columns=[dataframe['G2']], values=dataframe['A'], aggfunc=['sum'], margins=True, margins_name='All')
# pivot: disaggregation
dataframe.set_index(['I', 'A']).unstack(level=1)
dataframe.pivot(index=['I'], columns=['A']) # note) index with primary key
pd.pivot(dataframe, index=['I'], columns=['A']) # note) index with primary key
- merge, concatenation
import numpy as np
import pandas as pd
dataframe1 = pd.DataFrame(np.c_[np.arange(30), np.random.randint(0, 3, (30, 5))], columns=['PK', 'G1', 'G2', 'G3', 'A', 'B'])
dataframe2 = pd.DataFrame(np.c_[np.arange(30), np.random.randint(3, 6, (30, 5))], columns=['PK', 'G1', 'G2', 'A', 'B', 'C'])
dataframe1.merge(dataframe2, how='left', on=['A', 'B'], suffixes=['_df1', '_df2'], validate='m:m')
dataframe1.merge(dataframe2, how='left', on=['PK', 'A', 'B'], suffixes=['_df1', '_df2'], validate='1:m')
pd.merge(left=dataframe1, right=dataframe2, how='left', on=['A', 'B'], suffixes=['_df1', '_df2'], validate='m:m')
pd.merge(left=dataframe1, right=dataframe2, how='left', on=['PK', 'A', 'B'], suffixes=['_df1', '_df2'], validate='1:m')
dataframe1 = pd.DataFrame(np.c_[np.arange(30), np.random.randint(0, 3, (30, 5))], columns=['PK', 'G1', 'G2', 'G3', 'A', 'B']).set_index(['PK'])
dataframe2 = pd.DataFrame(np.c_[np.arange(30), np.random.randint(3, 6, (30, 5))], columns=['PK', 'G1', 'G2', 'A', 'B', 'C']).set_index(['PK'])
pd.concat([dataframe1, dataframe2], axis=1, join='outer', keys=None, levels=None, names=None, verify_integrity=False)
Model-Specific Design
linear model
- logistic regression
- assumption:
- transform of measurable set
- transform of measure
naive bayes model
- bernoulli naive bayes classifier
- assumption:
- transform of measurable set
- transform of measure
- multinomial naive bayes classifier
- assumption:
- transform of measurable set
- transform of measure
- gaussian naive bayes classifier
- assumption:
- transform of measurable set
- transform of measure
- PCA, FactorAnalysis
discriminant analysis
- linear discriminant analysis
- assumption:
- transform of measurable set
- Binarizer, SplineTransformer
- transform of measure
- Multivariate Normal Distribution: QuantileTransformer, PowerTransformer
- quadratic discriminant analysis
- assumption:
- transform of measurable set
- transform of measure
support vector machine
- support vector classifier
- assumption:
- transform of measurable set
- transform of measure
tree
- decision tree
- assumption:
- transform of measurable set
- transform of measure
- random forest
- assumption:
- transform of measurable set
- transform of measure
k-nearest neighbor
- k-nearest neighbor
- assumption:
- transform of measurable set
- transform of measure
ensemble
- boosting
- assumption:
- transform of measurable set
- transform of measure
- bagging
- assumption:
- transform of measurable set
- transform of measure
- stacking
- assumption:
- transform of measurable set
- transform of measure
Data Engineering
Categorical Data Manipulation
Ordinal Variables
ordinal mapping
#
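A minimal sketch of ordinal mapping, assuming an ordered column 'Grade' with levels low < mid < high (column name and level order are illustrative):
import pandas as pd
df = pd.DataFrame({'Grade': ['low', 'high', 'mid', 'low', 'mid']})  # hypothetical ordered categorical column
order = {'low': 0, 'mid': 1, 'high': 2}  # explicit rank for each level
df['Grade_ordinal'] = df['Grade'].map(order)  # map levels to integer ranks
df['Grade_ordinal_cat'] = pd.Categorical(df['Grade'], categories=['low', 'mid', 'high'], ordered=True).codes  # same ranks via an ordered categorical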
Nominal Variables
onehot-encoding
import numpy as np
import pandas as pd
# Scalar Encoding
df = pd.DataFrame(data=list(map(lambda x: dict(enumerate(list('ABCD')))[x], np.random.randint(4, size=1000))), columns=['X'])
scalar_encoding = pd.get_dummies(df['X'], prefix='Encoding').astype(int)
pd.concat([df, scalar_encoding], axis=1).drop_duplicates()
# Vector Encoding
ds1 = pd.Series(data=list(map(lambda x: dict(enumerate(list('ABC')))[x], np.random.randint(3, size=1000))), name='X1')
ds2 = pd.Series(data=list(map(lambda x: dict(enumerate(list('PQ')))[x], np.random.randint(2, size=1000))), name='X2')
df = pd.concat([ds1, ds2], axis=1)
vector_encoding = pd.get_dummies(df[['X1', 'X2']], prefix='Encoding').astype(int)
pd.concat([df, vector_encoding], axis=1).drop_duplicates()
import numpy as np
import pandas as pd
from patsy import dmatrix
# Scalar Encoding
df = pd.DataFrame(data=list(map(lambda x: dict(enumerate(list('ABCD')))[x], np.random.randint(4, size=1000))), columns=['X'])
scalar_encoding = dmatrix('C(X) + 0', data=df, return_type='dataframe')
scalar_encoding = dmatrix('C(X, levels=["D", "C", "B", "A"]) + 0', data=df, return_type='dataframe') # full-rank
scalar_encoding = dmatrix('C(X, Treatment(reference="A")) + 1', data=df, return_type='dataframe') # reduced-rank
pd.concat([df, scalar_encoding], axis=1).drop_duplicates()
# Vector Encoding
ds1 = pd.Series(data=list(map(lambda x: dict(enumerate(list('ABC')))[x], np.random.randint(3, size=1000))), name='X1')
ds2 = pd.Series(data=list(map(lambda x: dict(enumerate(list('PQR')))[x], np.random.randint(3, size=1000))), name='X2')
ds3 = pd.Series(data=list(map(lambda x: dict(enumerate(list('XYZ')))[x], np.random.randint(3, size=1000))), name='X3')
df = pd.concat([ds1, ds2, ds3], axis=1)
vector_encoding = dmatrix('C(X1):C(X2) + 0', data=df, return_type='dataframe')
vector_encoding = dmatrix('C(X1):C(X2) + 1', data=df, return_type='dataframe')
vector_encoding = dmatrix('C(X1) + C(X2) + 0', data=df, return_type='dataframe')
vector_encoding = dmatrix('C(X1) + C(X2) + C(X3) + 0', data=df, return_type='dataframe')
vector_encoding = dmatrix('C(X1) + C(X2, Treatment("P")) + C(X3, Treatment("X")) + 0', data=df, return_type='dataframe')
pd.concat([df, vector_encoding], axis=1).drop_duplicates()
import numpy as np
import pandas as pd
from sklearn.preprocessing import OneHotEncoder
# Scalar Encoding
df = pd.DataFrame(data=list(map(lambda x: dict(enumerate(list('ABCD')))[x], np.random.randint(4, size=1000))), columns=['X'])
encoder = OneHotEncoder(handle_unknown='ignore')
scalar_encoding = pd.DataFrame(np.c_[df.values, encoder.fit_transform(df).toarray()])
scalar_encoding.drop_duplicates()
# Vector Encoding
ds1 = pd.Series(data=list(map(lambda x: dict(enumerate(list('ABC')))[x], np.random.randint(3, size=1000))), name='X1')
ds2 = pd.Series(data=list(map(lambda x: dict(enumerate(list('PQ')))[x], np.random.randint(2, size=1000))), name='X2')
df = pd.concat([ds1, ds2], axis=1)
encoder = OneHotEncoder(handle_unknown='ignore', categories='auto', drop=None)
vector_encoding = pd.DataFrame(np.c_[df.values, encoder.fit_transform(df).toarray()])
vector_encoding.drop_duplicates()
embedding
import numpy as np
import pandas as pd
df = pd.DataFrame(data=list(map(lambda x: dict(enumerate(list('ABCD')))[x], np.random.randint(4, size=1000))), columns=['X'])
encoding = pd.get_dummies(df['X'], prefix='Encoding').astype(int)
embedding = pd.DataFrame(data=(np.random.normal(size=(10, 4)) @ encoding.values.T).T, columns=map(lambda x: 'Embedding_'+str(x), range(10)))
pd.concat([df, embedding], axis=1)
frequency encoding: per-category frequency of a target instance of the response variable
#
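A minimal sketch, assuming a nominal column X and a binary response y (illustrative names); each category is replaced by the frequency of the positive response within it:
import numpy as np
import pandas as pd
df = pd.DataFrame({'X': np.random.choice(list('ABCD'), size=1000),
                   'y': np.random.randint(0, 2, size=1000)})
freq = df.groupby('X')['y'].mean()  # frequency of y == 1 per category
df['X_freq'] = df['X'].map(freq)    # replace each category by its frequency
df[['X', 'X_freq']].drop_duplicates()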
specific-distribution mapping
import numpy as np
import pandas as pd
ds = pd.Series(data=list(map(lambda x: dict(enumerate(list('ABCDEFGHIJKLMNOPQRSTUVWXYZ')))[x], np.random.binomial(n=25, p=1/3, size=1000))), name='Category')
dist = ds.value_counts().sample(frac=1)
dist = pd.concat([dist[0::2].sort_values(), dist[1::2].sort_values()[::-1]], axis=0)
dist = dist.to_frame().reset_index().reset_index().rename(columns={'index':'RV'})
mapper = pd.Series(data=dist['RV'].values, index=dist['Category'].tolist())
df = pd.concat([ds, ds.apply(lambda category: mapper[category]).rename('RV')], axis=1)
df
Unified structure with numerical data
#
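A minimal sketch, assuming a nominal column X1 and a numerical column X2 (illustrative names); sklearn's ColumnTransformer one-hot encodes the nominal part and standardizes the numerical part into a single design matrix:
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
df = pd.DataFrame({'X1': np.random.choice(list('ABC'), size=1000),
                   'X2': np.random.normal(size=1000)})
ct = ColumnTransformer([('nominal', OneHotEncoder(sparse_output=False), ['X1']),
                        ('numerical', StandardScaler(), ['X2'])])
unified = pd.DataFrame(ct.fit_transform(df), columns=ct.get_feature_names_out())
unified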
Numerical Data Manipulation
Continuous Variables
#
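A minimal sketch of basic continuous-variable manipulations (illustrative column); heavier distributional transforms (PowerTransformer, QuantileTransformer) are covered under Data transformation below:
import numpy as np
import pandas as pd
s = pd.Series(np.random.lognormal(mean=0, sigma=1, size=1000), name='X')
s.describe()                                                    # location/dispersion summary
s_log = np.log1p(s)                                             # log transform for right-skewed data
s_clip = s.clip(lower=s.quantile(.01), upper=s.quantile(.99))   # winsorize extreme tails
s_std = (s - s.mean()) / s.std()                                # manual standardization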
Unified structure with categorical data
#
Time Series Data Manipulation
Time Feature
import pandas as pd
import numpy as np
df = pd.DataFrame(data=np.random.normal(size=1000), index=pd.date_range(start='2000-01-01', periods=1000, freq='d'), columns=['value']).asfreq('d')
df['year'] = df.index.year
df['quarterofyear'] = df.index.quarter
df['monthofyear'] = df.index.month
df['weekofyear'] = df.index.isocalendar().week # week of year
df['dayofyear'] = df.index.dayofyear
df['dayofmonth'] = df.index.day
df['dayofweek'] = df.index.dayofweek
df['nameofmonthofyear'] = df.index.month_name()
df['nameofdayofweek'] = df.index.day_name()
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import statsmodels.tsa.api as smt
ma = [1,]
ar = [1, 0,0,.5]
y = smt.ArmaProcess(ar=ar, ma=ma).generate_sample(nsample=300, burnin=50)
y = pd.Series(y, index=pd.date_range(end='00:00:00', periods=y.shape[0], freq='D'))
fig = plt.figure(figsize=(30, 5)); gs = mpl.gridspec.GridSpec(2, 10)
pd.plotting.autocorrelation_plot(y, ax=fig.add_subplot(gs[0, 0:5]))
pd.plotting.lag_plot(y, lag=1, ax=fig.add_subplot(gs[0, 5:10]))
smt.graphics.plot_acf(y, lags=30, ax=fig.add_subplot(gs[1, 0:5]))
smt.graphics.plot_pacf(y, lags=30, ax=fig.add_subplot(gs[1, 5:10]))
plt.tight_layout()
Future Frame
import numpy as np
import pandas as pd
import statsmodels.api as sm
import statsmodels.tsa.api as smt
from statsmodels.tsa.deterministic import DeterministicProcess, Fourier, Seasonality, TimeTrend
def future_frame(data, f_steps=20):
    data = data if isinstance(data, pd.DataFrame) else data.rename('ts').to_frame()
    data.index = data.index.rename('Date')
    trend = TimeTrend(constant=True, order=2)
    fourier = Fourier(period=52, order=3)
    seasonal = Seasonality(period=11)
    dp = DeterministicProcess(data.index, additional_terms=[trend, fourier, seasonal])
    insample = dp.in_sample(); insample['sample-domain'] = 'in'
    outofsample = dp.out_of_sample(steps=f_steps); outofsample['sample-domain'] = 'out'
    f_frame = pd.concat([insample, outofsample], axis=0)
    f_frame.index = f_frame.index.rename('Date')
    f_frame = f_frame.merge(data, how='left', on='Date', validate='1:1')
    f_frame['rel-trend'] = f_frame.index.year + f_frame.index.month/12 + f_frame.index.day/365
    return f_frame
y = pd.Series(data=np.random.normal(0, 1, size=5300).cumsum(), index=pd.date_range('00:00:00', periods=5300, freq='D').rename('Date'))
y_future_frame = future_frame(y, f_steps=20)
endog = y_future_frame['ts'].copy()
endog_train = y_future_frame.loc[lambda x: x['sample-domain'] == 'in', 'ts'].copy()
endog_test = y_future_frame.loc[lambda x: x['sample-domain'] == 'out', 'ts'].copy()
exog = y_future_frame.drop(['ts', 'sample-domain'], axis=1).copy()
exog_train = y_future_frame.loc[lambda x: x['sample-domain'] == 'in'].drop(['ts', 'sample-domain'], axis=1).copy()
exog_test = y_future_frame.loc[lambda x: x['sample-domain'] == 'out'].drop(['ts', 'sample-domain'], axis=1).copy()
y_future_frame
Cross Validation & Long-Term Period Evaluation: Data Splitting
import pandas as pd
import statsmodels.tsa.api as smt
from sklearn.model_selection import TimeSeriesSplit
y = smt.ArmaProcess(ar=[1, .3, -.5], ma=[1, .1, .3]).generate_sample(nsample=500, burnin=50).cumsum()
y = pd.Series(y, index=pd.date_range(end='00:00:00', periods=y.shape[0], freq='D'))
cv = TimeSeriesSplit(n_splits=int((500-50)/10), max_train_size=50, test_size=10, gap=0) # n_splits = (nsample - max_train_size) / test_size
pd.DataFrame(cv.split(y), columns=['train_index', 'test_index'])
Hypothesis Test
import numpy as np
from scipy import stats
import statsmodels.stats.api as sms
y = np.random.normal(size=1000)
sp_stat, sp_pval = stats.shapiro(y) # Normality [ sp_pval > 0.05 ]
dw_stat = sms.durbin_watson(y) # Autocorrelation [ dw_stat < 1.5: positive autocorrelation, dw_stat > 2.5: negative autocorrelation, ~2: none ]
bl_stat, bl_p = stats.bartlett(y[:500], y[500:]) # Heteroscedasticity (unequal variances) [ bl_p < 0.05 ]
import numpy as np
import pandas as pd
import statsmodels.tsa.api as smt
import statsmodels.stats.api as sms
y = smt.ArmaProcess(ar=[1, .3, -.5], ma=[1, .1, .3]).generate_sample(nsample=500, burnin=50).cumsum()
stats, p = sms.normal_ad(y) # [Normality] Anderson–Darling Test
stats, pval, used_lag, nobs, critical_values, icbest = smt.adfuller(y, maxlag=None, regression='c', autolag='AIC', store=False, regresults=False) # [Stationary:Stochastic Trend] ADF(Augmented Dickey-Fuller) Test
stats, pval, used_lag, critical_values = smt.kpss(y, regression= ['c', 'ct'][0], nlags=['auto', 'legacy'][0], store = False) # [Stationary:Deterministic Trend] KPSS Test
lm_stats, lm_pval, f_stats, f_pval = sms.het_breuschpagan(y, exog_het=np.c_[np.ones_like(y), np.arange(y.shape[0])]) # [Heteroskedasticity] Breusch–Pagan Test
return_df = sms.acorr_ljungbox(x=y, lags=None, boxpierce=False, model_df=0, period=None, return_df=True, auto_lag=False) # [Autocorrelation] Ljung–Box Test
Time Series Integration
import numpy as np
import pandas as pd
y = np.random.normal(size=1000)
data = pd.DataFrame(y.cumsum(), columns=['y'])
data['D1_y'] = data['y'].diff().fillna(data['y'].iloc[0])
data['I1D1_y'] = data['D1_y'].cumsum()
y = np.random.normal(size=1000)
data = pd.DataFrame(y.cumsum().cumsum(), columns=['y'])
data['D1_y'] = data['y'].diff().fillna(data['y'].iloc[0])
data['D2_y'] = data['D1_y'].diff().fillna(data['D1_y'].iloc[0])
data['I1D2_y'] = data['D2_y'].cumsum()
data['I2D2_y'] = data['D2_y'].cumsum().cumsum()
y = np.random.normal(size=1000)
data = pd.DataFrame(y.cumsum().cumsum().cumsum(), columns=['y'])
data['D1_y'] = data['y'].diff().fillna(data['y'].iloc[0])
data['D2_y'] = data['D1_y'].diff().fillna(data['D1_y'].iloc[0])
data['D3_y'] = data['D2_y'].diff().fillna(data['D2_y'].iloc[0])
data['I1D3_y'] = data['D3_y'].cumsum()
data['I2D3_y'] = data['D3_y'].cumsum().cumsum()
data['I3D3_y'] = data['D3_y'].cumsum().cumsum().cumsum()
data
Panel Data Manipulation
Data Panelization
#
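A minimal sketch, assuming wide time-series data with one column per entity (illustrative names); stacking produces a long panel indexed by (Entity, Date):
import numpy as np
import pandas as pd
wide = pd.DataFrame(np.random.normal(size=(100, 3)),
                    index=pd.date_range('2000-01-01', periods=100, freq='D').rename('Date'),
                    columns=pd.Index(['E1', 'E2', 'E3'], name='Entity'))
panel = wide.stack().rename('value').to_frame()  # long format with (Date, Entity) MultiIndex
panel = panel.swaplevel(0, 1).sort_index()       # reorder to (Entity, Date) panel indexing
panel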
Panel Cross-Validation
#
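A minimal sketch, assuming a panel indexed by (Entity, Date) as above; time-ordered splits are taken over the date level so each fold respects temporal order within every entity (GroupKFold over entities is the alternative when leakage across entities is the concern):
import numpy as np
import pandas as pd
from sklearn.model_selection import TimeSeriesSplit
dates = pd.date_range('2000-01-01', periods=100, freq='D')
panel = pd.DataFrame({'value': np.random.normal(size=300)},
                     index=pd.MultiIndex.from_product([['E1', 'E2', 'E3'], dates], names=['Entity', 'Date']))
cv = TimeSeriesSplit(n_splits=5)
for train_idx, test_idx in cv.split(dates):
    train = panel.loc[(slice(None), list(dates[train_idx])), :]  # all entities, training dates
    test = panel.loc[(slice(None), list(dates[test_idx])), :]    # all entities, test dates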
Missing value pre-processing
import numpy as np
import pandas as pd
from sklearn.impute import KNNImputer, SimpleImputer
X = [[1, 2, np.inf],
[3, 4, 3],
[np.nan, 6, 5],
[8, 8, 7]]
X = pd.DataFrame(X).replace([np.inf, -np.inf], np.nan)
imputer = SimpleImputer()
imputer = KNNImputer(n_neighbors=2)
X_new = imputer.fit_transform(X)
X_new
Cross-sectional Interpolation
#
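A minimal sketch, assuming several contemporaneous series in columns (illustrative names); missing values are filled from the same timestamp's cross-sectional mean rather than from the series' own past:
import numpy as np
import pandas as pd
data = pd.DataFrame(np.random.normal(size=(100, 3)),
                    index=pd.date_range('2000-01-01', periods=100, freq='D'),
                    columns=['TS1', 'TS2', 'TS3'])
data.iloc[::7, 0] = np.nan                                    # inject missing values in TS1
cross_mean = data.mean(axis=1)                                # row-wise (cross-sectional) mean
data_filled = data.apply(lambda col: col.fillna(cross_mean))  # fill each series from the cross-section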
Time Series Interpolation
Trend Interpolation
import numpy as np
import pandas as pd
import statsmodels.tsa.api as smt
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import WhiteKernel, RBF, Matern, RationalQuadratic, ExpSineSquared, DotProduct, ConstantKernel
def gaussian_interpolation(series, constant_length=1.0, rbf_length=1.0):
    if (series.isna().sum() == 0) or (series.isna().sum() >= series.shape[0]-1):
        return series
    else:
        t_train = (series.index.year + series.index.month/12 + series.index.day/365)[~series.isna()] # non_missing_index
        t_test = (series.index.year + series.index.month/12 + series.index.day/365)[series.isna()] # missing_index
        y_train = series[~series.isna()] # non_missing_values
        gpr = GaussianProcessRegressor(kernel=ConstantKernel(constant_length, constant_value_bounds="fixed") * RBF(rbf_length, length_scale_bounds="fixed"))
        gpr = gpr.fit(t_train.values[:, np.newaxis], y_train.values[:, np.newaxis])
        y_test = gpr.predict(t_test.values[:, np.newaxis])
        series = series.fillna(pd.Series(y_test, index=series.index[series.isna()]))
        return series
size = 500
data = pd.DataFrame(np.c_[
smt.ArmaProcess(ar=[1], ma=[1]).generate_sample(nsample=size, burnin=500).cumsum(),
smt.ArmaProcess(ar=[1, -.3, .2], ma=[1, .3, 1]).generate_sample(nsample=size, burnin=500).cumsum(),
], index=pd.date_range('00:00:00', periods=size, freq='D'), columns=['TS1', 'TS2'])
data.loc[data.index.to_series().sample(frac=.50).index, 'TS1'] = None
data.loc[data.index.to_series().sample(frac=.05).index, 'TS2'] = None
data['GP_TS1'] = data['TS1'].resample('m').apply(gaussian_interpolation)
data['GP_TS2'] = data['TS2'].resample('m').apply(gaussian_interpolation)
# interpolation visualization
data['GP_TS1'].plot(figsize=(30, 5))
data['TS1'].plot(figsize=(30, 5))
Cycle/Seasonal Interpolation
import numpy as np
import pandas as pd
import statsmodels.tsa.api as smt
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import WhiteKernel, RBF, Matern, RationalQuadratic, ExpSineSquared, DotProduct, ConstantKernel
def gaussian_interpolation(series, constant_length=1.0, rbf_length=1.0, ess_length=1.0, periodicity=12.0):
    if (series.isna().sum() == 0) or (series.isna().sum() >= series.shape[0]-1):
        return series
    else:
        t_train = (series.index.year + series.index.month/12 + series.index.day/365)[~series.isna()] # non_missing_index
        t_test = (series.index.year + series.index.month/12 + series.index.day/365)[series.isna()] # missing_index
        y_train = series[~series.isna()] # non_missing_values
        gpr = GaussianProcessRegressor(kernel=ConstantKernel(constant_length, constant_value_bounds="fixed") * RBF(rbf_length, length_scale_bounds="fixed") * ExpSineSquared(length_scale=ess_length, periodicity=periodicity, length_scale_bounds='fixed', periodicity_bounds='fixed'))
        gpr = gpr.fit(t_train.values[:, np.newaxis], y_train.values[:, np.newaxis])
        y_test = gpr.predict(t_test.values[:, np.newaxis])
        series = series.fillna(pd.Series(y_test, index=series.index[series.isna()]))
        return series
size = 500
data = pd.DataFrame(np.c_[
smt.ArmaProcess(ar=[1, 0,0,0,0,0,0,0,0,0,0,0, .8], ma=[1]).generate_sample(nsample=size, burnin=500).cumsum(),
smt.ArmaProcess(ar=[1, 0,0,0,0,.8], ma=[1, .3, 1]).generate_sample(nsample=size, burnin=500).cumsum(),
], index=pd.date_range('00:00:00', periods=size, freq='D'), columns=['TS1', 'TS2'])
data.loc[data.index.to_series().sample(frac=.50).index, 'TS1'] = None
data.loc[data.index.to_series().sample(frac=.20).index, 'TS2'] = None
data['GP_TS1'] = data['TS1'].resample('m').apply(gaussian_interpolation)
data['GP_TS2'] = data['TS2'].resample('m').apply(gaussian_interpolation)
# interpolation visualization
data['GP_TS1'].plot(figsize=(30, 5))
data['TS1'].plot(figsize=(30, 5))
Noise Interpolation
import numpy as np
import pandas as pd
import statsmodels.tsa.api as smt
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import WhiteKernel, RBF, Matern, RationalQuadratic, ExpSineSquared, DotProduct, ConstantKernel
def gaussian_interpolation(series):
    if (series.isna().sum() == 0) or (series.isna().sum() >= series.shape[0]-1):
        return series
    else:
        t_train = (series.index.year + series.index.month/12 + series.index.day/365)[~series.isna()] # non_missing_index
        t_test = (series.index.year + series.index.month/12 + series.index.day/365)[series.isna()] # missing_index
        y_train = series[~series.isna()] # non_missing_values
        gpr = GaussianProcessRegressor(optimizer=None, alpha=0, normalize_y=False,
            kernel=ConstantKernel(1.0, constant_value_bounds="fixed") * RBF(1, length_scale_bounds="fixed") + WhiteKernel(noise_level=1**2, noise_level_bounds='fixed'),
        )
        gpr = gpr.fit(t_train.values[:, np.newaxis], y_train.values[:, np.newaxis])
        y_test = gpr.predict(t_test.values[:, np.newaxis])
        series = series.fillna(pd.Series(y_test, index=series.index[series.isna()]))
        return series
size = 500
data = pd.DataFrame(np.c_[
smt.ArmaProcess(ar=[1], ma=[1]).generate_sample(nsample=size, burnin=500),
smt.ArmaProcess(ar=[1, -.3, .2], ma=[1, .3, 1]).generate_sample(nsample=size, burnin=500),
], index=pd.date_range('00:00:00', periods=size, freq='D'), columns=['TS1', 'TS2'])
data.loc[data.index.to_series().sample(frac=.50).index, 'TS1'] = None
data.loc[data.index.to_series().sample(frac=.20).index, 'TS2'] = None
data['GP_TS1'] = data['TS1'].resample('m').apply(gaussian_interpolation)
data['GP_TS2'] = data['TS2'].resample('m').apply(gaussian_interpolation)
# interpolation visualization
data['GP_TS1'].plot(figsize=(30, 5))
data['TS1'].plot(figsize=(30, 5))
Outlier pre-processing
import pandas as pd
from sklearn.datasets import make_classification
from sklearn.ensemble import IsolationForest
from sklearn.covariance import EllipticEnvelope
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
X, y = make_classification(n_samples=10000, weights=[0.99, 0.01], flip_y=0)
outlier_detector = EllipticEnvelope(contamination=0.01)
outlier_detector = LocalOutlierFactor()
outlier_detector = OneClassSVM(nu=0.01)
outlier_detector = IsolationForest(contamination=0.1)
anomaly = outlier_detector.fit_predict(X)
# select all rows that are not outliers
mask = anomaly != -1
X_new, y_new = X[mask, :], y[mask]
print('- NEW:', X_new.shape, y_new.shape)
Validation ANOVA table
Model | Outlier Preprocessor | Scaling | Contamination | Cross Validation | Score |
Linear Model | EllipticEnvelope | Maxabs | probs | repeated number | scores |
Linear Model | EllipticEnvelope | Minmax | probs | repeated number | scores |
Linear Model | EllipticEnvelope | Robust | probs | repeated number | scores |
Linear Model | LocalOutlierFactor | Maxabs | probs | repeated number | scores |
Linear Model | LocalOutlierFactor | Minmax | probs | repeated number | scores |
Linear Model | LocalOutlierFactor | Robust | probs | repeated number | scores |
Linear Model | OneClassSVM | Maxabs | probs | repeated number | scores |
Linear Model | OneClassSVM | Minmax | probs | repeated number | scores |
Linear Model | OneClassSVM | Robust | probs | repeated number | scores |
Linear Model | IsolationForest | Maxabs | probs | repeated number | scores |
Linear Model | IsolationForest | Minmax | probs | repeated number | scores |
Linear Model | IsolationForest | Robust | probs | repeated number | scores |
binary classification
# https://scikit-learn.org/stable/modules/model_evaluation.html
import numpy as np
import pandas as pd
from scipy import stats
import statsmodels.api as sm
from statsmodels.formula.api import ols
from statsmodels.stats.anova import AnovaRM
from statsmodels.stats.multicomp import MultiComparison
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import PowerTransformer, QuantileTransformer, StandardScaler, Normalizer, RobustScaler, MinMaxScaler, MaxAbsScaler
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.model_selection import cross_val_score, cross_validate, GridSearchCV, StratifiedKFold
from sklearn.ensemble import IsolationForest
from sklearn.covariance import EllipticEnvelope
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
X, y = make_classification(n_samples=10000, n_features=10, n_classes=2, weights=[0.6, 0.4], flip_y=0)
cv = StratifiedKFold(n_splits=5, shuffle=False, random_state=None) # cross validation & randomness control
# RobustScaler, MinMaxScaler
pipelines = dict()
pipelines['R'] = make_pipeline(RobustScaler(), LogisticRegression())
pipelines['M'] = make_pipeline(MinMaxScaler(), LogisticRegression())
pipelines['A'] = make_pipeline(MaxAbsScaler(), LogisticRegression())
treatments = dict()
for scaler, pipeline in pipelines.items():
    for contamination in [0, .01, .05, .10, .2, .5]:
        #EllipticEnvelope(contamination=0.01, random_state=None)
        #LocalOutlierFactor()
        #OneClassSVM(nu=0.01)
        #IsolationForest(contamination=0.1, bootstrap=False, random_state=None, verbose=0)
        anomaly = IsolationForest(contamination=contamination, bootstrap=False, random_state=None, verbose=0).fit_predict(X) if contamination != 0 else np.ones((X.shape[0],))
        X_new, y_new = X[(anomaly != -1), :], y[(anomaly != -1)]
        scores = cross_validate(pipeline, X_new, y_new, scoring=['accuracy', 'recall', 'precision', 'f1'], cv=cv, fit_params=None, return_train_score=False, return_estimator=False, n_jobs=-1, verbose=0) # evaluate on the outlier-filtered data
        scores = pd.DataFrame(scores)
        # before preprocessing
        scores['before_size'] = y.shape[0]
        freq = pd.Series(y).value_counts()
        ratio = freq/freq.sum()
        freq = pd.concat([pd.DataFrame(freq).T]*scores.shape[0], axis=0, ignore_index=True)
        ratio = pd.concat([pd.DataFrame(ratio).T]*scores.shape[0], axis=0, ignore_index=True)
        freq.columns = map(lambda column: 'before_freq_'+str(column), freq.columns)
        ratio.columns = map(lambda column: 'before_ratio_'+str(column), ratio.columns)
        scores = pd.concat([scores, freq, ratio], axis=1)
        # after preprocessing
        scores['after_size'] = y_new.shape[0]
        freq = pd.Series(y_new).value_counts()
        ratio = freq/freq.sum()
        freq = pd.concat([pd.DataFrame(freq).T]*scores.shape[0], axis=0, ignore_index=True)
        ratio = pd.concat([pd.DataFrame(ratio).T]*scores.shape[0], axis=0, ignore_index=True)
        freq.columns = map(lambda column: 'after_freq_'+str(column), freq.columns)
        ratio.columns = map(lambda column: 'after_ratio_'+str(column), ratio.columns)
        scores = pd.concat([scores, freq, ratio], axis=1)
        scores.index = pd.MultiIndex.from_tuples(map(lambda x: (scaler, contamination, x), scores.index))
        scores.index.names = ['scaler', 'contamination', 'subject']
        treatments[(scaler, contamination)] = scores
treatments = pd.concat(treatments.values(), axis=0).reset_index()
anova_data = treatments[['scaler', 'contamination', 'subject', 'test_recall']].copy()
"""
mixed_anova_data = anova_data.set_index(['scaler', 'contamination', 'subject']).unstack(1)
mixed_anova_data.index = pd.MultiIndex.from_tuples(map(lambda x: (x[0][0], x[1]), zip(mixed_anova_data.index, range(mixed_anova_data.index.shape[0]))))
mixed_anova_data = mixed_anova_data.stack(1).reset_index().rename(columns={'level_0':'scaler', 'level_1':'subject'})"""
# anova
anova_table = sm.stats.anova_lm(ols("""test_recall ~ C(scaler) + C(contamination) + C(scaler):C(contamination)""", data=anova_data).fit(), typ=2)
display(anova_table)
# post-hoc
mc = MultiComparison(anova_data['test_recall'], anova_data['scaler']+anova_data['contamination'].astype(str))
tukeyhsd = mc.tukeyhsd() # TUKEY HONESTLY SIGNIFICANT DIFFERENCE (HSD)
tukeyhsd.plot_simultaneous()
summary = pd.DataFrame(tukeyhsd.summary().data)
summary.columns = summary.iloc[0]
summary = summary.drop(0, axis=0).set_index(['group1', 'group2']).sort_values(['reject', 'meandiff'], ascending=[False, False])
display(summary)
Imbalanced sample pre-processing
from collections import Counter
from sklearn.datasets import make_classification
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import classification_report
from sklearn.model_selection import cross_val_score
X, y = make_classification(n_samples=10000, weights=[0.99, 0.01], flip_y=0)
# UnderSampler
from imblearn.under_sampling import *
#X_new, y_new = RandomUnderSampler(random_state=None).fit_resample(X, y) # or .fit_sample(X, y)
#X_new, y_new = TomekLinks().fit_resample(X, y) # or .fit_sample(X, y)
#X_new, y_new = CondensedNearestNeighbour().fit_resample(X, y) # or .fit_sample(X, y)
#X_new, y_new = OneSidedSelection().fit_resample(X, y) # or .fit_sample(X, y)
#X_new, y_new = EditedNearestNeighbours().fit_resample(X, y) # or .fit_sample(X, y)
#X_new, y_new = NeighbourhoodCleaningRule().fit_resample(X, y) # or .fit_sample(X, y)
# OverSampler
from imblearn.over_sampling import *
#X_new, y_new = RandomOverSampler().fit_resample(X, y) # or .fit_sample(X, y)
#X_new, y_new = ADASYN().fit_resample(X, y) # or .fit_sample(X, y)
#X_new, y_new = SMOTE().fit_resample(X, y) # or .fit_sample(X, y)
# CombinedSampler
from imblearn.combine import *
#X_new, y_new = SMOTEENN().fit_resample(X, y) # or .fit_sample(X, y)
X_new, y_new = SMOTETomek().fit_resample(X, y) # or .fit_sample(X, y)
classifier = DecisionTreeClassifier()
scores1 = cross_val_score(classifier, X, y, scoring='f1_micro', cv=5, n_jobs=-1)
scores2 = cross_val_score(classifier, X_new, y_new, scoring='f1_micro', cv=5, n_jobs=-1)
"""
classifier1 = DecisionTreeClassifier().fit(X, y)
classifier2 = DecisionTreeClassifier().fit(X_new, y_new)
print(classification_report(y, classifier1.predict(X)))
print(classification_report(y_new, classifier2.predict(X_new)))
"""
print('F-measure: %.3f' % scores1.mean())
print(' - y:', Counter(y))
print('F-measure: %.3f' % scores2.mean())
print(' - y_new:', Counter(y_new))
Validation ANOVA table
Model | Sampling | Method | Strategy | Cross Validation | Score |
Decision Tree | under | Random | ratio | repeated number | scores |
Decision Tree | under | TomekLinks | ratio | repeated number | scores |
Decision Tree | under | CondensedNearestNeighbour | ratio | repeated number | scores |
Decision Tree | under | OneSidedSelection | ratio | repeated number | scores |
Decision Tree | under | EditedNearestNeighbours | ratio | repeated number | scores |
Decision Tree | under | NeighbourhoodCleaningRule | ratio | repeated number | scores |
Decision Tree | over | Random | ratio | repeated number | scores |
Decision Tree | over | ADASYN | ratio | repeated number | scores |
Decision Tree | over | SMOTE | ratio | repeated number | scores |
Decision Tree | combined | SMOTEENN | ratio | repeated number | scores |
Decision Tree | combined | SMOTETomek | ratio | repeated number | scores |
binary classification
import numpy as np
import pandas as pd
from scipy import stats
from statsmodels.stats.anova import AnovaRM
from statsmodels.stats.multicomp import MultiComparison
from imblearn.under_sampling import *
from imblearn.over_sampling import *
from imblearn.combine import *
from sklearn.datasets import make_classification
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import classification_report
from sklearn.model_selection import cross_val_score, cross_validate, StratifiedKFold
X, y = make_classification(n_samples=1000, weights=[0.9, .1], n_classes=2, n_clusters_per_class=1, flip_y=0)
cv = StratifiedKFold(n_splits=5, shuffle=False, random_state=None) # cross validation & randomness control
balancing_samplers = dict()
balancing_samplers['Under'] = dict()
balancing_samplers['Over'] = dict()
balancing_samplers['Combined'] = dict()
balancing_samplers['Under']['RandomUnderSampler'] = RandomUnderSampler(sampling_strategy='auto', random_state=None)
balancing_samplers['Under']['TomekLinks'] = TomekLinks(sampling_strategy='auto')
balancing_samplers['Under']['CondensedNearestNeighbour'] = CondensedNearestNeighbour(sampling_strategy='auto', n_neighbors=None, n_seeds_S=1, random_state=None)
balancing_samplers['Under']['OneSidedSelection'] = OneSidedSelection(sampling_strategy='auto', n_neighbors=None, n_seeds_S=1, random_state=None)
balancing_samplers['Under']['EditedNearestNeighbours'] = EditedNearestNeighbours(sampling_strategy='auto', n_neighbors=3)
balancing_samplers['Under']['NeighbourhoodCleaningRule'] = NeighbourhoodCleaningRule(sampling_strategy='auto', n_neighbors=3, threshold_cleaning=0.5)
balancing_samplers['Over']['RandomOverSampler'] = RandomOverSampler(sampling_strategy='auto', random_state=None)
balancing_samplers['Over']['ADASYN'] = ADASYN(sampling_strategy='auto', n_neighbors=5, random_state=None)
balancing_samplers['Over']['SMOTE'] = SMOTE(sampling_strategy='auto', k_neighbors=5, random_state=None)
balancing_samplers['Combined']['SMOTEENN'] = SMOTEENN(sampling_strategy='auto', smote=None, enn=None, random_state=None)
balancing_samplers['Combined']['SMOTETomek'] = SMOTETomek(sampling_strategy='auto', smote=None, tomek=None, random_state=None)
balancing_samplers['All'] = dict([('Identity', lambda _: _)] + list(balancing_samplers['Under'].items()) + list(balancing_samplers['Over'].items()) + list(balancing_samplers['Combined'].items()))
treatments = dict()
for sampling_strategy in [0, .5, .8, .9]:
    X_new, y_new = SMOTETomek(sampling_strategy=sampling_strategy, smote=None, tomek=None, random_state=None).fit_resample(X, y) if sampling_strategy != 0 else (X, y)
    classifier = DecisionTreeClassifier()
    scores = cross_validate(classifier, X_new, y_new, scoring=['accuracy', 'recall', 'precision', 'f1'], cv=cv, fit_params=None, return_train_score=False, return_estimator=False, n_jobs=-1, verbose=0)
    scores = pd.DataFrame(scores)
    # before preprocessing
    scores['before_size'] = y.shape[0]
    freq = pd.Series(y).value_counts()
    ratio = freq/freq.sum()
    freq = pd.concat([pd.DataFrame(freq).T]*scores.shape[0], axis=0, ignore_index=True)
    ratio = pd.concat([pd.DataFrame(ratio).T]*scores.shape[0], axis=0, ignore_index=True)
    freq.columns = map(lambda column: 'before_freq_'+str(column), freq.columns)
    ratio.columns = map(lambda column: 'before_ratio_'+str(column), ratio.columns)
    scores = pd.concat([scores, freq, ratio], axis=1)
    # after preprocessing
    scores['after_size'] = y_new.shape[0]
    freq = pd.Series(y_new).value_counts()
    ratio = freq/freq.sum()
    freq = pd.concat([pd.DataFrame(freq).T]*scores.shape[0], axis=0, ignore_index=True)
    ratio = pd.concat([pd.DataFrame(ratio).T]*scores.shape[0], axis=0, ignore_index=True)
    freq.columns = map(lambda column: 'after_freq_'+str(column), freq.columns)
    ratio.columns = map(lambda column: 'after_ratio_'+str(column), ratio.columns)
    scores = pd.concat([scores, freq, ratio], axis=1)
    scores.index = pd.MultiIndex.from_tuples(map(lambda x: (sampling_strategy, x), scores.index))
    scores.index.names = ['sampling_strategy', 'subject']
    treatments[sampling_strategy] = scores
treatments = pd.concat(treatments.values(), axis=0).reset_index()
display(treatments.groupby('sampling_strategy')[['test_recall']].mean())
data = treatments.copy()
# anova
anova_table = AnovaRM(data=data, depvar='test_recall', within=['sampling_strategy'], subject='subject').fit().anova_table
display(anova_table)
# post-hoc
mc = MultiComparison(data['test_recall'], data['sampling_strategy'])
table, _, _ = mc.allpairtest(stats.ttest_ind, method= "bonf") # or method="sidak"
summary = pd.DataFrame(table.data)
summary.columns = summary.iloc[0]
summary = summary.drop(0, axis=0).set_index(['group1', 'group2']).sort_values(['reject'], ascending=[False])
display(summary)
Data transformation
transform of measurable set
# https://scikit-learn.org/stable/modules/preprocessing.html
# https://scikit-learn.org/stable/auto_examples/preprocessing/plot_all_scaling.html
import numpy as np
import pandas as pd
from scipy import stats
from sklearn.preprocessing import Binarizer, KBinsDiscretizer, OrdinalEncoder, OneHotEncoder, PolynomialFeatures, SplineTransformer, LabelBinarizer, LabelEncoder
from sklearn.preprocessing import PowerTransformer, QuantileTransformer, StandardScaler, Normalizer, RobustScaler, MinMaxScaler, MaxAbsScaler
import seaborn as sns
size = 10000
categorical_columns = ['Binary', 'Discrete_B', 'Discrete_P', 'Discrete_M1', 'Discrete_M2', 'Discrete_M3', 'Discrete_U', 'Discrete_G', 'Discrete_N']
numerical_columns = ['Continuous_N', 'Continuous_U', 'Continuous_E']
data = pd.DataFrame(np.c_[
stats.bernoulli.rvs(p=2/3, size=size),
stats.binom.rvs(n=5, p=2/3, size=size),
stats.poisson.rvs(mu=5, size=size),
stats.multinomial.rvs(n=5, p=[.3, .3, .3], size=size),
stats.randint.rvs(0, 10, size=size),
stats.geom.rvs(p=.2, size=size),
stats.nbinom.rvs(n=5, p=.2, size=size),
stats.norm.rvs(0,1, size=size),
stats.uniform.rvs(0,10, size=size),
stats.expon.rvs(0, 1, size=size),
], columns= categorical_columns + numerical_columns)
data.columns = pd.MultiIndex.from_tuples(map(lambda column: ('Categorical', column) if column in categorical_columns else ('Numerical', column), data.columns))
# for categorical variables
Binarizer(threshold = data['Categorical']['Discrete_B'].mean()).fit_transform(data['Categorical'][['Discrete_B']])
OrdinalEncoder().fit_transform(data['Categorical'][['Binary', 'Discrete_B']])
OneHotEncoder(sparse_output=False).fit_transform(data['Categorical'])
PolynomialFeatures(degree=2, interaction_only=False, include_bias=True, order=['C', 'F'][0]).fit_transform(data['Categorical'])
SplineTransformer(degree=3, n_knots=5, knots=['uniform', 'quantile'][0], extrapolation=['error', 'constant', 'linear', 'continue', 'periodic'][1], include_bias=True, order=['C', 'F'][0]).fit_transform(data['Categorical'])
# for numerical variables
KBinsDiscretizer(n_bins=10, encode=['ordinal', 'onehot-dense', 'onehot', ][0], strategy=['uniform', 'quantile', 'kmeans'][0]).fit_transform(data['Numerical'])
KBinsDiscretizer(n_bins=10, encode=['ordinal', 'onehot-dense', 'onehot', ][2], strategy=['uniform', 'quantile', 'kmeans'][0]).fit_transform(data['Numerical'])
Binarizer(threshold = data['Numerical']['Continuous_N'].mean()).fit_transform(data['Numerical'][['Continuous_N']])
PolynomialFeatures(degree=2, interaction_only=False, include_bias=True, order=['C', 'F'][0]).fit_transform(data['Numerical'])
SplineTransformer(degree=3, n_knots=5, knots=['uniform', 'quantile'][0], extrapolation=['error', 'constant', 'linear', 'continue', 'periodic'][1], include_bias=True, order=['C', 'F'][0]).fit_transform(data['Numerical'])
transform of measure
# https://scikit-learn.org/stable/modules/preprocessing.html
# https://scikit-learn.org/stable/auto_examples/preprocessing/plot_all_scaling.html
import numpy as np
import pandas as pd
from scipy import stats
from sklearn.preprocessing import Binarizer, KBinsDiscretizer, OrdinalEncoder, LabelBinarizer, LabelEncoder, OneHotEncoder
from sklearn.preprocessing import PowerTransformer, QuantileTransformer, StandardScaler, Normalizer, RobustScaler, MinMaxScaler, MaxAbsScaler
size = 1000
categorical_columns = ['Binary', 'Discrete_B', 'Discrete_P', 'Discrete_M1', 'Discrete_M2', 'Discrete_M3', 'Discrete_U', 'Discrete_G', 'Discrete_N']
numerical_columns = ['Continuous_N', 'Continuous_U', 'Continuous_E']
data = pd.DataFrame(np.c_[
stats.bernoulli.rvs(p=2/3, size=size),
stats.binom.rvs(n=5, p=2/3, size=size),
stats.poisson.rvs(mu=5, size=size),
stats.multinomial.rvs(n=5, p=[.3, .3, .3], size=size),
stats.randint.rvs(0, 10, size=size),
stats.geom.rvs(p=.2, size=size),
stats.nbinom.rvs(n=5, p=.2, size=size),
stats.norm.rvs(0,1, size=size),
stats.uniform.rvs(0,10, size=size),
stats.expon.rvs(0, 1, size=size),
], columns= categorical_columns + numerical_columns)
data.columns = pd.MultiIndex.from_tuples(map(lambda column: ('Categorical', column) if column in categorical_columns else ('Numerical', column), data.columns))
transformer = PowerTransformer(method='yeo-johnson') # yeo-johnson, box-cox
transformer = QuantileTransformer(n_quantiles=100, output_distribution='normal') # normal, uniform
transformer = MinMaxScaler()
transformer = RobustScaler()
transformer = StandardScaler()
transformer = Normalizer()
# Linear Transform
# Standardization, or mean removal and variance scaling
transformer = StandardScaler().fit(data)
transformer.transform(data).mean(axis=0) # 0
transformer.transform(data).std(axis=0) # 1
transformer = MinMaxScaler().fit(data)
transformer.transform(data).min(axis=0) # 0
transformer.transform(data).max(axis=0) # 1
transformer = MaxAbsScaler().fit(data)
transformer.transform(data).min(axis=0) # 0 or -1 (if a feature include negative values)
transformer.transform(data).max(axis=0) # 1
transformer = RobustScaler(quantile_range=(25.0, 75.0)).fit(data)
np.percentile(transformer.transform(data), [10, 50, 90]) # per feature the median maps to 0 and values are scaled by the IQR (25th-75th percentile range)
# Non-Linear Transform Uniform-like
transformer = QuantileTransformer(output_distribution='uniform').fit(data) # collapses outliers by mapping them onto the predefined range boundaries (0 and 1)
transformer.transform(data).min(axis=0) # 0
transformer.transform(data).max(axis=0) # 1
# Non-Linear Transform with Gaussian-like
transformer = QuantileTransformer(output_distribution='normal').fit(data)
transformer.transform(data).min(axis=0) # fixed value
transformer.transform(data).max(axis=0) # fixed value
transformer = PowerTransformer(method='yeo-johnson', standardize=True).fit(data) # yeo-johnson, box-cox (positive data)
transformer.transform(data).mean(axis=0) # 0
transformer.transform(data).std(axis=0) # 1
transformer.lambdas_
# Constraint
transformer = Normalizer(norm='l2').fit(data)
(transformer.transform(data)**2).sum(axis=1)
Validation ANOVA table
Model | Normality | Homoscedasticity | Vector Space | Cross-Validation | Score |
LDA | Y | Y | Y | repeated number | cost-sensitive metric |
LDA | Y | Y | N | repeated number | cost-sensitive metric |
LDA | Y | N | Y | repeated number | cost-sensitive metric |
LDA | Y | N | N | repeated number | cost-sensitive metric |
LDA | N | Y | Y | repeated number | cost-sensitive metric |
LDA | N | Y | N | repeated number | cost-sensitive metric |
LDA | N | N | Y | repeated number | cost-sensitive metric |
LDA | N | N | N | repeated number | cost-sensitive metric |
binary classification
# https://scikit-learn.org/stable/modules/model_evaluation.html
import pandas as pd
from scipy import stats
import statsmodels.api as sm
from statsmodels.formula.api import ols
from statsmodels.stats.anova import AnovaRM
from statsmodels.stats.multicomp import MultiComparison
from sklearn.datasets import make_classification
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.preprocessing import PowerTransformer, QuantileTransformer, StandardScaler, Normalizer, RobustScaler, MinMaxScaler, MaxAbsScaler
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.model_selection import cross_val_score, cross_validate, GridSearchCV, StratifiedKFold
X, y = make_classification(n_samples=10000, n_features=10, n_classes=2, weights=[0.6, 0.4], flip_y=0)
cv = StratifiedKFold(n_splits=5, shuffle=False, random_state=None) # cross validation & randomness control
# N: Normality, H: Heteroscedasticity, V: Vector Space
pipelines = dict()
pipelines['_NHV'] = make_pipeline(PowerTransformer(method='yeo-johnson', standardize=True), Normalizer(), LinearDiscriminantAnalysis())
pipelines['_NH'] = make_pipeline(PowerTransformer(method='yeo-johnson', standardize=True), LinearDiscriminantAnalysis())
pipelines['_NV'] = make_pipeline(PowerTransformer(method='yeo-johnson', standardize=False), Normalizer(), LinearDiscriminantAnalysis())
pipelines['_N'] = make_pipeline(PowerTransformer(method='yeo-johnson', standardize=False), LinearDiscriminantAnalysis())
pipelines['_HV'] = make_pipeline(StandardScaler(), Normalizer(), LinearDiscriminantAnalysis())
pipelines['_H'] = make_pipeline(StandardScaler(), LinearDiscriminantAnalysis())
pipelines['_V'] = make_pipeline(Normalizer(), LinearDiscriminantAnalysis())
pipelines['_'] = make_pipeline(LinearDiscriminantAnalysis())
treatments = dict()
for treatment, pipeline in pipelines.items():
    #cross_val_score(pipeline, X, y, scoring='f1', cv=cv, fit_params=None, n_jobs=-1, verbose=0)
    scores = cross_validate(pipeline, X, y, scoring=['accuracy', 'recall', 'precision', 'f1'], cv=cv, fit_params=None, return_train_score=False, return_estimator=False, n_jobs=-1, verbose=0)
    scores = pd.DataFrame(scores)
    scores.index = pd.MultiIndex.from_tuples(map(lambda x: (treatment, x), scores.index))
    scores.index.names = ['treatment', 'subject']
    treatments[treatment] = scores
treatments = pd.concat(treatments.values(), axis=0).reset_index()
treatments['normality'] = treatments['treatment'].apply(lambda x: 'N_Yes' if 'N' in list(x) else 'N_No')
treatments['heteroscedasticity'] = treatments['treatment'].apply(lambda x: 'H_Yes' if 'H' in list(x) else 'H_No')
treatments['vectorspace'] = treatments['treatment'].apply(lambda x: 'V_Yes' if 'V' in list(x) else 'V_No')
data = treatments.copy()#.set_index(['normality', 'heteroscedasticity', 'vectorspace'])
display(data.groupby(['treatment'])[['test_recall']].mean())
# anova
anova_table = sm.stats.anova_lm(ols("""
test_recall ~ C(normality) + C(heteroscedasticity) + C(vectorspace)
+ C(normality):C(heteroscedasticity) + C(heteroscedasticity):C(vectorspace) + C(normality):C(vectorspace) + C(normality):C(heteroscedasticity):C(vectorspace)
""", data=data).fit(), typ=2)
display(anova_table)
# post-hoc
mc = MultiComparison(data['test_recall'], data['treatment'])
tukeyhsd = mc.tukeyhsd() # TUKEY HONESTLY SIGNIFICANT DIFFERENCE (HSD)
tukeyhsd.plot_simultaneous()
summary = pd.DataFrame(tukeyhsd.summary().data)
summary.columns = summary.iloc[0]
summary = summary.drop(0, axis=0).set_index(['group1', 'group2']).sort_values(['reject', 'meandiff'], ascending=[False, False])
display(summary)
Dimensionality reduction
import numpy as np
import pandas as pd
from scipy import stats
from sklearn.manifold import Isomap
from sklearn.manifold import LocallyLinearEmbedding
from sklearn.manifold import MDS
from sklearn.manifold import SpectralEmbedding
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
from sklearn.decomposition import FactorAnalysis
from sklearn.decomposition import TruncatedSVD
size = 100
data = pd.DataFrame(np.c_[
stats.bernoulli.rvs(p=2/3, size=size),
stats.binom.rvs(n=5, p=2/3, size=size),
stats.poisson.rvs(mu=5, size=size),
stats.multinomial.rvs(n=5, p=[.3, .3, .3], size=size),
np.random.normal(0,1, size=size)
], columns=['Binary', 'Discrete_B', 'Discrete_P', 'Category_1', 'Category_2', 'Category_3', 'Continuous'])
reduction = Isomap()
reduction = LocallyLinearEmbedding()
reduction = MDS()
reduction = SpectralEmbedding()
reduction = TSNE()
reduction = PCA(n_components=3)
reduction = FactorAnalysis(n_components=3)
reduction = TruncatedSVD(n_components=3)
data_new = reduction.fit_transform(data)
data_new
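A small optional check, sketched for the linear reductions only: PCA (and TruncatedSVD) expose the share of variance each retained component keeps, which the manifold methods above do not.
pca = PCA(n_components=3).fit(data)
pca.explained_variance_ratio_ # per-component share of variance
pca.explained_variance_ratio_.sum() # total variance retained by the 3 components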
Data grouping: numerical variable to categorical variable
binning
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# pd.cut: equal width binning
# pd.qcut: equal frequency binning
size = 1000
num_bin = 10
df = pd.DataFrame(columns=['X_Category', 'X_Numerical_Discrete', 'X_Numerical_Continuous'],
data = np.c_[
np.array(list(map(lambda x: dict(enumerate(list('ABCDEFGHIJKLMNOPQRSTUVWXYZ')))[x], np.random.randint(0, 26, size=size)))),
np.random.binomial(n=100, p=.3, size=size),
np.random.normal(loc=0, scale=10, size=size),
])
df['X_Numerical_Discrete'] = df['X_Numerical_Discrete'].astype(int)
df['X_Numerical_Continuous'] = df['X_Numerical_Continuous'].astype(float)
df['X_Category_GROUPING'] = df['X_Category'].apply(lambda x: 'Class_A' if x in list('ABCDEFG') else 'Class_B' if x in list('HIJKLMN') else 'Class_C' if x in list('OPQRSTU') else 'Class_D')
df['X_Numerical_Discrete_EW_BINNING'] = pd.cut(df['X_Numerical_Discrete'], bins=num_bin, precision=6, retbins=False)
df['X_Numerical_Discrete_EF_BINNING'] = pd.qcut(df['X_Numerical_Discrete'], q=num_bin, precision=6, duplicates='drop', retbins=False)
df['X_Numerical_Continuous_EW_BINNING'] = pd.cut(df['X_Numerical_Continuous'], bins=num_bin, precision=6, retbins=False)
df['X_Numerical_Continuous_EF_BINNING'] = pd.qcut(df['X_Numerical_Continuous'], q=num_bin, precision=6, duplicates='drop', retbins=False)
plt.figure(figsize=(30,15))
df['X_Category_GROUPING'].value_counts().sort_index().plot(kind='bar', ax=plt.subplot2grid((3,2), (0,0), colspan=2))
df['X_Numerical_Discrete_EW_BINNING'].value_counts().sort_index().plot(kind='bar', ax=plt.subplot2grid((3,2), (1,0)))
df['X_Numerical_Discrete_EF_BINNING'].value_counts().sort_index().plot(kind='bar', ax=plt.subplot2grid((3,2), (1,1)))
df['X_Numerical_Continuous_EW_BINNING'].value_counts().sort_index().plot(kind='bar', ax=plt.subplot2grid((3,2), (2,0)))
df['X_Numerical_Continuous_EF_BINNING'].value_counts().sort_index().plot(kind='bar', ax=plt.subplot2grid((3,2), (2,1)))
plt.tight_layout()
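When the same cuts must later be applied to unseen data, retbins=True returns the learned edges; a minimal sketch follows (the new_values series is purely illustrative).
binned, edges = pd.cut(df['X_Numerical_Continuous'], bins=num_bin, precision=6, retbins=True)
new_values = pd.Series(np.random.normal(loc=0, scale=10, size=10)) # hypothetical unseen data
pd.cut(new_values, bins=edges) # reuse the training-time edges; values outside the range become NaN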
clustering
from sklearn.cluster import KMeans
import numpy as np
import pandas as pd
from scipy import stats
size = 100
individual_features = ['X_IContinuous1', 'X_IContinuous2', 'X_IContinuous3']
grouping_features = ['X_GContinuous1', 'X_GContinuous2', 'X_GContinuous3']
data = pd.DataFrame(np.c_[
stats.norm.rvs(0,1, size=size),
stats.norm.rvs(0,1, size=size),
stats.norm.rvs(0,1, size=size),
stats.norm.rvs(0,1, size=size),
stats.norm.rvs(0,1, size=size),
stats.norm.rvs(0,1, size=size),
], columns= individual_features + grouping_features)
dependence_index = data['X_GContinuous1'].sample(frac=.8).index
data.loc[dependence_index, ['X_GContinuous2', 'X_GContinuous3']] = data.loc[dependence_index, ['X_GContinuous2', 'X_GContinuous3']].values + 1*data.loc[dependence_index, ['X_GContinuous1']].values
# individual features
for i_feature in individual_features:
kmeans = KMeans(n_clusters=10, random_state=0, n_init="auto").fit(data[[i_feature]])
data.loc[:, i_feature] = kmeans.cluster_centers_[kmeans.labels_] # clustering center features
# grouping features
kmeans = KMeans(n_clusters=10, random_state=0, n_init="auto").fit(data[grouping_features])
data.loc[:, grouping_features] = kmeans.cluster_centers_[kmeans.labels_] # clustering center features
data
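A pipeline-friendly alternative (sketch): scikit-learn's KBinsDiscretizer with strategy='kmeans' performs a comparable per-feature 1D k-means binning. Note that the columns of data were already replaced by cluster centers above, so in practice this would be applied to the original continuous columns.
from sklearn.preprocessing import KBinsDiscretizer
discretizer = KBinsDiscretizer(n_bins=10, encode='ordinal', strategy='kmeans')
binned_labels = discretizer.fit_transform(data[individual_features]) # per-column k-means bin index
discretizer.bin_edges_ # learned bin edges per column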
Data quantification: categorical variable to numerical variable
onehot encoding and embedding
import numpy as np
import pandas as pd
df = pd.DataFrame(data=list(map(lambda x: dict(enumerate(list('ABCD')))[x], np.random.randint(4, size=1000))), columns=['X_Category'])
encoding = pd.get_dummies(df['X_Category'], prefix='Encoding').astype(int)
embedding = pd.DataFrame(data=(np.random.normal(size=(10, 4)) @ encoding.values.T).T, columns=map(lambda x: 'Embedding_'+str(x), range(10)))
pd.concat([df, encoding, embedding], axis=1)
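The same one-hot step with scikit-learn's OneHotEncoder (sketch), which can sit inside a Pipeline or ColumnTransformer and tolerates unseen categories at transform time; sparse_output is the newer spelling of the old sparse argument.
from sklearn.preprocessing import OneHotEncoder
encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore') # sparse=False on older scikit-learn
encoded = encoder.fit_transform(df[['X_Category']])
encoder.get_feature_names_out() # e.g. ['X_Category_A', 'X_Category_B', 'X_Category_C', 'X_Category_D']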
conditional encoding for binary classification
import numpy as np
import pandas as pd
from scipy import stats
size = 100
categorical_columns = ['X_Binary', 'X_Discrete_B', 'X_Discrete_P', 'X_Category_1', 'X_Category_2', 'X_Category_3']
numerical_columns = ['X_Continuous1', 'X_Continuous2', 'X_Continuous3']
data = pd.DataFrame(np.c_[
stats.bernoulli.rvs(p=2/3, size=size),
stats.binom.rvs(n=5, p=2/3, size=size),
stats.poisson.rvs(mu=5, size=size),
stats.multinomial.rvs(n=5, p=[.3, .3, .3], size=size),
stats.norm.rvs(0,1, size=size),
stats.norm.rvs(0,1, size=size),
stats.norm.rvs(0,1, size=size),
stats.bernoulli.rvs(p=2/3, size=size),
], columns= categorical_columns + numerical_columns + ['Y'])
target_instance = data['Y'].unique()[1]
mapping_tables = dict()
for c_column in categorical_columns:
conditional_probs = (data.groupby([c_column, 'Y'])['Y'].count() / data.groupby([c_column])[c_column].count()).to_frame().rename(columns={0:'c_probs'})
mapping_tables[c_column] = conditional_probs.unstack(1).fillna(0)['c_probs']
data[c_column] = data[c_column].apply(lambda x: mapping_tables[c_column][target_instance][x])
data
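Scoring new observations with the learned encoding, a sketch built on the mapping_tables above: categories never seen at fit time have no entry in the table, so the global target rate is one simple fallback (the new_values series is hypothetical).
global_rate = (data['Y'] == target_instance).mean() # fallback rate for unseen categories
new_values = pd.Series([0.0, 1.0, 5.0]) # hypothetical new X_Binary values
new_values.map(mapping_tables['X_Binary'][target_instance]).fillna(global_rate)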
conditional encoding for multi-class classification
import numpy as np
import pandas as pd
from scipy import stats
size = 100
categorical_columns = ['X_Binary', 'X_DiscreteB', 'X_DiscreteP', 'X_Category1', 'X_Category2', 'X_Category3']
numerical_columns = ['X_Continuous1', 'X_Continuous2', 'X_Continuous3']
data = pd.DataFrame(np.c_[
stats.bernoulli.rvs(p=2/3, size=size),
stats.binom.rvs(n=5, p=2/3, size=size),
stats.poisson.rvs(mu=5, size=size),
stats.multinomial.rvs(n=5, p=[.3, .3, .3], size=size),
stats.norm.rvs(0,1, size=size),
stats.norm.rvs(0,1, size=size),
stats.norm.rvs(0,1, size=size),
stats.binom.rvs(n=5, p=2/3, size=size),
], columns= categorical_columns + numerical_columns + ['Y'])
mapping_tables = dict()
for c_column in categorical_columns:
conditional_probs = (data.groupby([c_column, 'Y'])['Y'].count() / data.groupby([c_column])[c_column].count()).to_frame().rename(columns={0:'c_probs'})
conditional_probs = conditional_probs.unstack(1).fillna(0)['c_probs']
conditional_probs.columns = list(map(lambda i: c_column + f'_{i}', range(len(conditional_probs.columns))))
mapping_tables[c_column] = conditional_probs.reset_index()
data = data.merge(mapping_tables[c_column], on=c_column, how='left', validate='m:1')
data = data[data.columns.difference(categorical_columns, sort=False)]
data
Pipelines
pipeline examples
# https://scikit-learn.org/stable/modules/model_evaluation.html
# binary_class_scoring = ['accuracy', 'balanced_accuracy', 'recall', 'average_precision', 'precision', 'f1', 'jaccard', 'roc_auc', 'roc_auc_ovr', 'roc_auc_ovo']
# multi_class_scoring = ['accuracy', 'balanced_accuracy', 'recall_micro', 'recall_macro', 'recall_weighted', 'precision_micro', 'precision_macro', 'precision_weighted', 'f1_micro', 'f1_macro', 'f1_weighted', 'jaccard_micro', 'jaccard_macro', 'jaccard_weighted', 'roc_auc_ovr_weighted', 'roc_auc_ovo_weighted']
# regression_scoring = ['r2', 'explained_variance', 'max_error', 'neg_mean_squared_error', 'neg_mean_absolute_error', 'neg_root_mean_squared_error', 'neg_median_absolute_error', 'neg_mean_absolute_percentage_error']
# Preprocessing Framework
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.decomposition import PCA, FactorAnalysis
from sklearn.preprocessing import Binarizer, KBinsDiscretizer, OrdinalEncoder, LabelBinarizer, LabelEncoder, OneHotEncoder
from sklearn.preprocessing import FunctionTransformer, PowerTransformer, QuantileTransformer, StandardScaler, Normalizer, RobustScaler, MinMaxScaler, MaxAbsScaler
# Pipelining Framework
from copy import deepcopy
from sklearn.datasets import make_classification, make_regression
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.feature_selection import chi2, f_classif, f_regression, r_regression, mutual_info_classif, mutual_info_regression
from sklearn.feature_selection import SelectKBest, RFECV, SequentialFeatureSelector
from sklearn.model_selection import RepeatedStratifiedKFold, RepeatedKFold, cross_validate, GridSearchCV
from sklearn.pipeline import make_pipeline, make_union, Pipeline, FeatureUnion
from sklearn.compose import make_column_selector, make_column_transformer, ColumnTransformer, TransformedTargetRegressor
from sklearn.base import BaseEstimator, TransformerMixin
X, y = make_classification(n_samples=3000, n_features=30, n_classes=2, weights=[0.6, 0.4], flip_y=0)
pipelines = dict()
pipelines['condition'] = make_pipeline(make_column_transformer((StandardScaler(), slice(0, 15)), (FunctionTransformer(lambda x: x), slice(15, 30))), LogisticRegression())
pipelines['pooling'] = make_pipeline(make_union(PowerTransformer(method='yeo-johnson', standardize=True), Normalizer(), PCA(n_components=7), FactorAnalysis(n_components=7)), LogisticRegression())
pipelines['imputer'] = make_pipeline(KNNImputer(), LogisticRegression())
pipelines['outlier'] = make_pipeline(QuantileTransformer(output_distribution='normal'), LogisticRegression())
pipelines['linearity'] = make_pipeline(PowerTransformer(), StandardScaler(), Normalizer(), LogisticRegression())
pipelines['dimensionality'] = make_pipeline(FactorAnalysis(n_components=7), LogisticRegression())
pipelines['rfecv'] = make_pipeline(RFECV(estimator=LogisticRegression()), LogisticRegression())
pipelines['sfs'] = make_pipeline(SequentialFeatureSelector(estimator=LogisticRegression(), direction='forward', n_features_to_select=3), LogisticRegression())
cv_results = dict()
cv = RepeatedStratifiedKFold(n_splits=5, n_repeats=3, random_state=None)
for name, pipeline in deepcopy(pipelines).items():
cv_results[name] = cross_validate(pipeline, X, y, scoring=['accuracy', 'recall', 'precision', 'f1'], cv=cv, return_train_score=True, fit_params=None, return_estimator=False, n_jobs=-1, verbose=0)
#pipeline = GridSearchCV(estimator=pipeline, param_grid={}, cv=cv, scoring=['accuracy', 'balanced_accuracy', 'recall', 'average_precision', 'precision', 'f1', 'jaccard', 'roc_auc', 'roc_auc_ovr', 'roc_auc_ovo'][0], return_train_score=True)
#cv_results[name] = pipeline.fit(X, y).cv_results_
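A compact comparison of the scenarios afterwards (sketch, assuming pandas is available): average each pipeline's fold scores into one row per scenario.
import pandas as pd
summary = pd.DataFrame({name: pd.DataFrame(result).mean() for name, result in cv_results.items()}).T
summary.filter(like='test_') # mean test accuracy/recall/precision/f1 per pipeline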
feature union
import numpy as np
from sklearn.decomposition import PCA, FactorAnalysis
from sklearn.preprocessing import Binarizer, KBinsDiscretizer, OrdinalEncoder, LabelBinarizer, LabelEncoder, OneHotEncoder
from sklearn.preprocessing import FunctionTransformer, PowerTransformer, QuantileTransformer, StandardScaler, Normalizer, RobustScaler, MinMaxScaler, MaxAbsScaler
from sklearn.pipeline import make_pipeline, make_union, Pipeline, FeatureUnion
X = np.random.normal(size=(1000, 10))
def identity(x): return x
FeatureUnion([('FactorAnalysis', FactorAnalysis(n_components=3))]).fit_transform(X).shape # (1000, 3)
FeatureUnion([('FunctionTransformer', FunctionTransformer(identity)), ('FactorAnalysis', FactorAnalysis(n_components=3))]).fit_transform(X).shape # (1000, 13)
FeatureUnion([('StandardScaler', StandardScaler()), ('FactorAnalysis', FactorAnalysis(n_components=3))]).fit_transform(X).shape # (1000, 13)
FeatureUnion([('StandardScaler', StandardScaler()), ('Normalizer', Normalizer()), ('FactorAnalysis', FactorAnalysis(n_components=3))]).fit_transform(X).shape # (1000, 23)
FeatureUnion([('Pipeline', Pipeline([('StandardScaler', StandardScaler()), ('Normalizer', Normalizer())])), ('FactorAnalysis', FactorAnalysis(n_components=3))]).fit_transform(X).shape # (1000, 13)
make_union(FactorAnalysis(n_components=3)).fit_transform(X).shape # (1000, 3)
make_union(FunctionTransformer(identity), FactorAnalysis(n_components=3)).fit_transform(X).shape # (1000, 13)
make_union(StandardScaler(), FactorAnalysis(n_components=3)).fit_transform(X).shape # (1000, 13)
make_union(Normalizer(), StandardScaler(), FactorAnalysis(n_components=3)).fit_transform(X).shape # (1000, 23)
make_union(make_pipeline(Normalizer(), StandardScaler()), FactorAnalysis(n_components=3)).fit_transform(X).shape # (1000, 13)
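On recent scikit-learn versions the stacked columns can also be named (sketch): each transformer's outputs are prefixed with its step name.
union = make_union(StandardScaler(), FactorAnalysis(n_components=3)).fit(X)
union.get_feature_names_out() # e.g. ['standardscaler__x0', ..., 'factoranalysis__factoranalysis0', ...]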
column transformer
import numpy as np
from sklearn.decomposition import PCA, FactorAnalysis
from sklearn.preprocessing import Binarizer, KBinsDiscretizer, OrdinalEncoder, LabelBinarizer, LabelEncoder, OneHotEncoder
from sklearn.preprocessing import FunctionTransformer, PowerTransformer, QuantileTransformer, StandardScaler, Normalizer, RobustScaler, MinMaxScaler, MaxAbsScaler
from sklearn.pipeline import make_pipeline, make_union, Pipeline, FeatureUnion
from sklearn.compose import make_column_transformer, ColumnTransformer, TransformedTargetRegressor
X = np.random.normal(size=(1000, 10))
def identity(x): return x
ColumnTransformer([('passthrough', 'passthrough', slice(0,5))]).fit_transform(X).shape # (1000, 5)
ColumnTransformer([('passthrough1', 'passthrough', slice(0,5)), ('passthrough2', 'passthrough', slice(8,10))]).fit_transform(X).shape # (1000, 7)
ColumnTransformer([('drop', 'drop', slice(0,5))]).fit_transform(X).shape # (1000, 0)
ColumnTransformer([('FunctionTransformer', FunctionTransformer(identity), slice(0,5))]).fit_transform(X).shape # (1000, 5)
FeatureUnion([('ColumnTransformer', ColumnTransformer([('passthrough', 'passthrough', slice(0,5))]))]).fit_transform(X).shape # (1000, 5)
FeatureUnion([('ColumnTransformer', ColumnTransformer([('passthrough', 'passthrough', slice(0,5))])), ('FunctionTransformer', FunctionTransformer(identity))]).fit_transform(X).shape # (1000, 15)
FeatureUnion([('ColumnTransformer', ColumnTransformer([('FunctionTransformer', FunctionTransformer(identity), slice(0,5))])), ('FunctionTransformer', FunctionTransformer(identity))]).fit_transform(X).shape # (1000, 15)
FeatureUnion([('ColumnTransformer1', ColumnTransformer([('FunctionTransformer', FunctionTransformer(identity), slice(0,5))])), ('ColumnTransformer2', ColumnTransformer([('FunctionTransformer', FunctionTransformer(identity), slice(5,10))]))]).fit_transform(X).shape # (1000, 10)
make_column_transformer(('passthrough', slice(0,5))).fit_transform(X).shape # (1000, 5)
make_column_transformer(('passthrough', slice(0,5)), ('passthrough', slice(8,10))).fit_transform(X).shape # (1000, 7)
make_column_transformer(('drop', slice(0,5))).fit_transform(X).shape # (1000, 0)
make_column_transformer((FunctionTransformer(identity), slice(0,5))).fit_transform(X).shape # (1000, 5)
make_union(make_column_transformer(('passthrough', slice(0,5)))).fit_transform(X).shape # (1000, 5)
make_union(make_column_transformer(('passthrough', slice(0,5))), FunctionTransformer(identity)).fit_transform(X).shape # (1000, 15)
make_union(make_column_transformer((FunctionTransformer(identity), slice(0,5))), FunctionTransformer(identity)).fit_transform(X).shape # (1000, 15)
make_union(make_column_transformer((FunctionTransformer(identity), slice(0,5))), make_column_transformer((FunctionTransformer(identity), slice(5,10)))).fit_transform(X).shape # (1000, 10)
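With a DataFrame input, columns can also be selected by name or dtype through make_column_selector; the sketch below uses a small illustrative frame (toy and its column names are not from the text).
import pandas as pd
from sklearn.compose import make_column_selector
toy = pd.DataFrame({'num1': np.random.normal(size=100), 'num2': np.random.normal(size=100), 'cat': np.random.choice(list('AB'), size=100)})
ct = make_column_transformer((StandardScaler(), make_column_selector(dtype_include=np.number)), (OneHotEncoder(), make_column_selector(dtype_include=object)))
ct.fit_transform(toy).shape # (100, 4): 2 scaled numeric columns + 2 one-hot columns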
custom transformer
# https://scikit-learn.org/stable/modules/model_evaluation.html
# binary_class_scoring = ['accuracy', 'balanced_accuracy', 'recall', 'average_precision', 'precision', 'f1', 'jaccard', 'roc_auc', 'roc_auc_ovr', 'roc_auc_ovo']
# multi_class_scoring = ['accuracy', 'balanced_accuracy', 'recall_micro', 'recall_macro', 'recall_weighted', 'precision_micro', 'precision_macro', 'precision_weighted', 'f1_micro', 'f1_macro', 'f1_weighted', 'jaccard_micro', 'jaccard_macro', 'jaccard_weighted', 'roc_auc_ovr_weighted', 'roc_auc_ovo_weighted']
# regression_scoring = ['r2', 'explained_variance', 'max_error', 'neg_mean_squared_error', 'neg_mean_absolute_error', 'neg_root_mean_squared_error', 'neg_median_absolute_error', 'neg_mean_absolute_percentage_error']
# Preprocessing Framework
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.decomposition import PCA, FactorAnalysis
from sklearn.preprocessing import Binarizer, KBinsDiscretizer, OrdinalEncoder, LabelBinarizer, LabelEncoder, OneHotEncoder
from sklearn.preprocessing import FunctionTransformer, PowerTransformer, QuantileTransformer, StandardScaler, Normalizer, RobustScaler, MinMaxScaler, MaxAbsScaler
# Pipelining Framework
from copy import deepcopy
from sklearn.datasets import make_classification, make_regression
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.feature_selection import chi2, f_classif, f_regression, r_regression, mutual_info_classif, mutual_info_regression
from sklearn.feature_selection import SelectKBest, RFECV, SequentialFeatureSelector
from sklearn.model_selection import RepeatedStratifiedKFold, RepeatedKFold, cross_validate, GridSearchCV
from sklearn.pipeline import make_pipeline, make_union, Pipeline, FeatureUnion
from sklearn.compose import make_column_selector, make_column_transformer, ColumnTransformer, TransformedTargetRegressor
from sklearn.base import BaseEstimator, TransformerMixin
class CustomTransformer(BaseEstimator, TransformerMixin):
def __init__(self):
pass
def fit(self, X, y=None):
return self
def transform(self, X, y=None):
return X
X, y = make_classification(n_samples=3000, n_features=10, n_classes=2, weights=[0.6, 0.4], flip_y=0)
pipelines = dict()
pipelines['data_condition'] = make_pipeline(
make_column_transformer(
(CustomTransformer(), slice(0, 5)),
(FunctionTransformer(lambda x: x), slice(5, 10))
),
SequentialFeatureSelector(estimator=LogisticRegression(), direction='backward'),
LogisticRegression()
)
cv_results = dict()
cv = RepeatedStratifiedKFold(n_splits=5, n_repeats=3, random_state=None)
for name, pipeline in deepcopy(pipelines).items():
cv_results[name] = cross_validate(pipeline, X, y, scoring=['accuracy', 'recall', 'precision', 'f1'], cv=cv, return_train_score=True, fit_params=None, return_estimator=False, n_jobs=-1, verbose=0)
#pipeline = GridSearchCV(estimator=pipeline, param_grid={}, cv=cv, scoring=['accuracy', 'balanced_accuracy', 'recall', 'average_precision', 'precision', 'f1', 'jaccard', 'roc_auc', 'roc_auc_ovr', 'roc_auc_ovo'][0], return_train_score=True)
#cv_results[name] = pipeline.fit(X, y).cv_results_
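A slightly more realistic custom transformer (sketch; MeanCenterer is a hypothetical name): statistics learned in fit() are stored with a trailing underscore and reused in transform(), so each cross-validation fold only ever sees training-fold statistics.
import numpy as np
class MeanCenterer(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        self.mean_ = np.asarray(X).mean(axis=0) # learned on the training fold only
        return self
    def transform(self, X, y=None):
        return np.asarray(X) - self.mean_
make_pipeline(MeanCenterer(), LogisticRegression()).fit(X, y)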
custom preprocessor: fourier
import numpy as np
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression
from sklearn.compose import make_column_transformer
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import make_pipeline, make_union
class Transformer(BaseEstimator, TransformerMixin):
def __init__(self, depth=10):
self.depth = depth
def fit(self, X, y=None):
return self
def transform(self, X):
T = 40
f = lambda k: np.c_[np.sin(k*((2*np.pi)/T)*X), np.cos(k*((2*np.pi)/T)*X)]
extended_X = np.array([f(k) for k in range(1, self.depth)])
extended_X = np.c_[extended_X[:, :, 0].T, extended_X[:, :, 1].T]
X = np.c_[X, extended_X]
return X
X_train = np.linspace(-10, 10, 1000)
X_domain = np.linspace(-20, 20, 2000)
y = X_train**2 + np.random.normal(0, 10, size=X_train.shape[0])
model = make_pipeline(Transformer(depth=2), LinearRegression())
model.fit(X_train[:, np.newaxis], y)
plt.plot(X_train, y)
plt.plot(X_domain, model.predict(X_domain[:, np.newaxis]))
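The expansion depth can be tuned like any other pipeline hyperparameter (sketch): make_pipeline names the step after its class, so the parameter is addressed as transformer__depth.
from sklearn.model_selection import GridSearchCV
search = GridSearchCV(make_pipeline(Transformer(), LinearRegression()), param_grid={'transformer__depth': [2, 5, 10, 20]}, cv=5)
search.fit(X_train[:, np.newaxis], y)
search.best_params_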
custom preprocessor: bessel
#
custom preprocessor: legendre
import numpy as np
import pandas as pd
from scipy.special import lpmv, lpn, lqn, lpmn, lqmn
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression
from sklearn.compose import make_column_transformer
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import make_pipeline, make_union
class Transformer(BaseEstimator, TransformerMixin):
def __init__(self, depth):
self.depth = depth
def fit(self, X, y=None):
return self
def transform(self, X):
v = 1
scaled_X = np.cos(X)
f = lambda m,n: np.c_[lpmv(m, n, scaled_X)]
extended_X = np.array([f(m,n) for n in range(1, self.depth) for m in range(0, v)])
extended_X = extended_X.squeeze().T
X = np.c_[X, scaled_X, extended_X]
return X
X_train = np.linspace(-10, 10, 1000)
X_domain = np.linspace(-20, 20, 2000)
y = X_train**2 + np.random.normal(0, 10, size=X_train.shape[0])
model = make_pipeline(Transformer(depth=10), LinearRegression())
model.fit(X_train[:, np.newaxis], y)
plt.plot(X_train, y)
plt.plot(X_domain, model.predict(X_domain[:, np.newaxis]))
import numpy as np
import pandas as pd
from scipy.special import lpmv, lpn, lqn, lpmn, lqmn
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression
from sklearn.compose import make_column_transformer
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import make_pipeline, make_union
class Transformer(BaseEstimator, TransformerMixin):
def __init__(self, depth):
self.depth = depth
def fit(self, X, y=None):
return self
def transform(self, X):
v = 1
scaled_X = np.cos(X.squeeze())
f1 = lambda m,n: np.c_[pd.Series(scaled_X).apply(lambda x: pd.Series(lpmn(m, n, x)[0].sum(axis=1))).T]
f2 = lambda m,n: np.c_[pd.Series(scaled_X).apply(lambda x: pd.Series(lqmn(m, n, x)[0].sum(axis=1))).T]
extended_X1 = np.array([f1(v,n) for n in range(max(1, v), self.depth)])
extended_X2 = np.array([f2(v,n) for n in range(max(1, v), self.depth)])
extended_X1 = np.column_stack([extended_X1[:, i, :].T for i in range(v+1)])
extended_X2 = np.column_stack([extended_X2[:, i, :].T for i in range(v+1)])
X = np.c_[X, scaled_X, extended_X1, extended_X2]
return X
X_train = np.linspace(-10, 10, 1000)
X_domain = np.linspace(-20, 20, 2000)
y = X_train**2 + np.random.normal(0, 10, size=X_train.shape[0])
model = make_pipeline(Transformer(depth=10), LinearRegression())
model.fit(X_train[:, np.newaxis], y)
plt.plot(X_train, y)
plt.plot(X_domain, model.predict(X_domain[:, np.newaxis]))
custom preprocessor: chaotic function fitting
from functools import partial
import numpy as np
from scipy.special import jv, yn, iv, kn, hankel1, hankel2
from scipy.special import lpn, lqn, lpmv
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression
from sklearn.compose import make_column_transformer
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import make_pipeline, make_union
from sklearn.preprocessing import MaxAbsScaler
class Transformer(BaseEstimator, TransformerMixin):
def __init__(self, basis=None):
self.basis = basis
def fit(self, X, y=None):
return self
def transform(self, X, y=None):
if self.basis == 'fourier':
T = 40
f = lambda k: np.c_[np.sin(k*((2*np.pi)/T)*X), np.cos(k*((2*np.pi)/T)*X)]
extended_X = np.array([f(k) for k in range(1, 10)])
extended_X = np.hstack(extended_X) # stack the per-order blocks column-wise: (n_samples, n_orders*2); a plain reshape would interleave samples
X = np.c_[X, extended_X]
elif self.basis == 'bessel':
T = 1
f = lambda k: np.c_[jv(k, X/T)] # Bessel function of the first kind
#f = lambda k: np.c_[yn(k, X/T)] # Bessel function of the second kind
f = lambda k: np.c_[iv(k, X/T)] # Modified Bessel function of the first kind
#f = lambda k: np.c_[kn(k, X/T)] # Modified Bessel function of the second kind
extended_X = np.array([f(k) for k in range(1, 10)])
extended_X = np.hstack(extended_X) # stack the per-order blocks column-wise: (n_samples, n_orders); a plain reshape would interleave samples
X = np.c_[X, extended_X]
elif self.basis == 'legendre':
T = 1
#f = lambda k: np.c_[lpn(k, X/T)] # Legendre function of the first kind
#f = lambda k: np.c_[lqn(k, X/T)] # Legendre function of the second kind
scaled_X = MaxAbsScaler().fit_transform(X)
f = lambda k: np.c_[lpmv(0, k,scaled_X)] # Associated legendre function
extended_X = np.array([f(k) for k in range(1, 100)])
extended_X = np.hstack(extended_X) # stack the per-order blocks column-wise: (n_samples, n_orders); a plain reshape would interleave samples
X = np.c_[X, extended_X]
else:
X = np.c_[X]
return X
X_train = np.r_[np.linspace(-2, -0.001, 1000), np.linspace(0.001, 2, 1000)]
X_domain = np.r_[np.linspace(-2, -0.001, 1000), np.linspace(0.001, 2, 1000)]
y = np.sin(1/X_train)
model = make_pipeline(Transformer(basis='legendre'), LinearRegression())
model.fit(X_train[:, np.newaxis], y)
plt.plot(X_train, y)
plt.plot(X_domain, model.predict(X_domain[:, np.newaxis]))
Feature selection
- Statistic-based Filtering
    - ANOVA Filter
- Model agnostic information-based feature selection
    - Sequential feature selection: Forward, Backward, Stepwise
        - Classification: accuracy
        - Regression: r-squared, AIC, BIC
- Model specific information-based feature selection (Feature importance / Coefficient)
    - Recursive feature elimination: RFE, RFECV (RFE with optimal number of selected features)
    - Select from model (Threshold)
classification(discrete variable; target): single feature selector
# https://scikit-learn.org/stable/modules/model_evaluation.html
#binary_class_scoring = ['accuracy', 'balanced_accuracy', 'recall', 'average_precision', 'precision', 'f1', 'jaccard', 'roc_auc', 'roc_auc_ovr', 'roc_auc_ovo']
#multi_class_scoring = ['accuracy', 'balanced_accuracy', 'recall_micro', 'recall_macro', 'recall_weighted', 'precision_micro', 'precision_macro', 'precision_weighted', 'f1_micro', 'f1_macro', 'f1_weighted', 'jaccard_micro', 'jaccard_macro', 'jaccard_weighted', 'roc_auc_ovr_weighted', 'roc_auc_ovo_weighted']
from sklearn.datasets import make_classification
from sklearn.feature_selection import chi2, f_classif, f_regression, r_regression, mutual_info_classif, mutual_info_regression
from sklearn.feature_selection import SelectKBest, SelectPercentile, SelectFromModel, RFE, RFECV, SequentialFeatureSelector
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import LogisticRegression
X, y = make_classification(n_samples=3000, n_features=30, n_classes=2, weights=[0.6, 0.4], flip_y=0)
feature_selector = SelectKBest(f_classif, k=20) # after fit(), feature_selector.scores_
feature_selector = SelectPercentile(f_classif, percentile=30) # after fit(), feature_selector.scores_
feature_selector = RFE(estimator=DecisionTreeClassifier(), n_features_to_select=10) # after fit(), feature_selector.estimator_.feature_importances_, feature_selector.estimator_.coef_
feature_selector = RFECV(estimator=DecisionTreeClassifier()) # after fit(), feature_selector.estimator_.feature_importances_, feature_selector.estimator_.coef_
feature_selector = SelectFromModel(estimator=DecisionTreeClassifier(), threshold='0.2*mean') # after fit(), feature_selector.estimator_.feature_importances_, feature_selector.estimator_.coef_, feature_selector.threshold_
feature_selector = SequentialFeatureSelector(estimator=DecisionTreeClassifier(), n_features_to_select=3, scoring='accuracy', direction=["forward", "backward"][0])
selected_transformed_data = feature_selector.fit_transform(X, y)
selected_data = X[:, feature_selector.get_support()]
feature_selector.get_support()
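After fitting a univariate selector, the per-feature statistics can be ranked directly (sketch, assuming pandas is available).
import pandas as pd
selector = SelectKBest(f_classif, k=20).fit(X, y)
pd.DataFrame({'score': selector.scores_, 'pvalue': selector.pvalues_}).sort_values('score', ascending=False).head()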
classification(discrete variable; target): combined feature selector
# https://scikit-learn.org/stable/modules/model_evaluation.html
#binary_class_scoring = ['accuracy', 'balanced_accuracy', 'recall', 'average_precision', 'precision', 'f1', 'jaccard', 'roc_auc', 'roc_auc_ovr', 'roc_auc_ovo']
#multi_class_scoring = ['accuracy', 'balanced_accuracy', 'recall_micro', 'recall_macro', 'recall_weighted', 'precision_micro', 'precision_macro', 'precision_weighted', 'f1_micro', 'f1_macro', 'f1_weighted', 'jaccard_micro', 'jaccard_macro', 'jaccard_weighted', 'roc_auc_ovr_weighted', 'roc_auc_ovo_weighted']
from sklearn.datasets import make_classification
from sklearn.feature_selection import chi2, f_classif, f_regression, r_regression, mutual_info_classif, mutual_info_regression
from sklearn.feature_selection import SelectKBest, SelectPercentile, SelectFromModel, RFE, RFECV, SequentialFeatureSelector
from sklearn.pipeline import make_pipeline, make_union, Pipeline, FeatureUnion
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import LogisticRegression
X, y = make_classification(n_samples=3000, n_features=30, n_classes=2, weights=[0.6, 0.4], flip_y=0)
feature_selector = make_pipeline(
SelectPercentile(f_classif, percentile=30),
RFE(estimator=LogisticRegression(), n_features_to_select=5),
SequentialFeatureSelector(estimator=DecisionTreeClassifier(), n_features_to_select=3, scoring='accuracy', direction=["forward", "backward"][0])
)
selected_transformed_data = feature_selector.fit_transform(X, y)
selected_data = X[:, feature_selector['SelectPercentile'.lower()].get_support()][:, feature_selector['RFE'.lower()].get_support()][:, feature_selector['SequentialFeatureSelector'.lower()].get_support()]
regression(continuous variable; target): single feature selector
# https://scikit-learn.org/stable/modules/model_evaluation.html
# regression_scoring = ['r2', 'explained_variance', 'max_error', 'neg_mean_squared_error', 'neg_mean_absolute_error', 'neg_root_mean_squared_error', 'neg_median_absolute_error', 'neg_mean_absolute_percentage_error']
from sklearn.datasets import make_regression
from sklearn.feature_selection import chi2, f_classif, f_regression, r_regression, mutual_info_classif, mutual_info_regression
from sklearn.feature_selection import SelectKBest, SelectPercentile, SelectFromModel, RFE, RFECV, SequentialFeatureSelector
from sklearn.tree import DecisionTreeRegressor
from sklearn.linear_model import LinearRegression
X, y = make_regression(n_samples=3000, n_features=30, n_informative=5, n_targets=1, bias=0.0, effective_rank=None, tail_strength=0.5, noise=0.0, shuffle=True, coef=False, random_state=None)
feature_selector = SelectKBest(f_regression, k=20) # after fit(), feature_selector.scores_
feature_selector = SelectPercentile(f_regression, percentile=30) # after fit(), feature_selector.scores_
feature_selector = RFE(estimator=DecisionTreeRegressor(), n_features_to_select=10) # after fit(), feature_selector.estimator_.feature_importances_, feature_selector.estimator_.coef_
feature_selector = RFECV(estimator=DecisionTreeRegressor()) # after fit(), feature_selector.estimator_.feature_importances_, feature_selector.estimator_.coef_
feature_selector = SelectFromModel(estimator=DecisionTreeRegressor(), threshold='0.2*mean') # after fit(), feature_selector.estimator_.feature_importances_, feature_selector.estimator_.coef_, feature_selector.threshold_
feature_selector = SequentialFeatureSelector(estimator=DecisionTreeRegressor(), n_features_to_select=3, scoring='r2', direction=["forward", "backward"][0])
selected_transformed_data = feature_selector.fit_transform(X, y)
selected_data = X[:, feature_selector.get_support()] # feature_selector.support_
regression(continuous variable; target): combined feature selector
# https://scikit-learn.org/stable/modules/model_evaluation.html
# regression_scoring = ['r2', 'explained_variance', 'max_error', 'neg_mean_squared_error', 'neg_mean_absolute_error', 'neg_root_mean_squared_error', 'neg_median_absolute_error', 'neg_mean_absolute_percentage_error']
from sklearn.datasets import make_regression
from sklearn.feature_selection import chi2, f_classif, f_regression, r_regression, mutual_info_classif, mutual_info_regression
from sklearn.feature_selection import SelectKBest, SelectPercentile, SelectFromModel, RFE, RFECV, SequentialFeatureSelector
from sklearn.pipeline import make_pipeline, make_union, Pipeline, FeatureUnion
from sklearn.tree import DecisionTreeRegressor
from sklearn.linear_model import LinearRegression
X, y = make_regression(n_samples=3000, n_features=30, n_informative=5, n_targets=1, bias=0.0, effective_rank=None, tail_strength=0.5, noise=0.0, shuffle=True, coef=False, random_state=None)
feature_selector = make_pipeline(
SelectPercentile(f_regression, percentile=30),
RFE(estimator=LinearRegression(), n_features_to_select=5),
SequentialFeatureSelector(estimator=DecisionTreeRegressor(), n_features_to_select=3, scoring='r2', direction=["forward", "backward"][0])
)
selected_transformed_data = feature_selector.fit_transform(X, y)
selected_data = X[:, feature_selector['SelectPercentile'.lower()].get_support()][:, feature_selector['RFE'.lower()].get_support()][:, feature_selector['SequentialFeatureSelector'.lower()].get_support()] # feature_selector.support_
Binary classification
Recursive feature elimination (RFE)
from copy import deepcopy
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.datasets import make_classification
from sklearn.tree import DecisionTreeClassifier # feature selector estimator: feature importances
from sklearn.linear_model import LogisticRegression # feature selector estimator: coefficient
from sklearn.ensemble import RandomForestClassifier # pipeline estimator: feature importances
from sklearn.preprocessing import PowerTransformer, QuantileTransformer, StandardScaler, Normalizer, RobustScaler, MinMaxScaler, MaxAbsScaler
from sklearn.decomposition import PCA, TruncatedSVD
from sklearn.pipeline import FeatureUnion, make_union, Pipeline, make_pipeline
from sklearn.feature_selection import RFE, RFECV
from sklearn.model_selection import cross_val_score, cross_validate, GridSearchCV, StratifiedKFold, RepeatedStratifiedKFold
from sklearn.metrics import roc_curve, precision_recall_curve, auc
X, y = make_classification(n_samples=3000, n_features=20, n_classes=2, weights=[0.6, 0.4], flip_y=0)
pipelines = dict()
pipelines['scenario_1'] = make_pipeline(
make_union(
PowerTransformer(method='yeo-johnson', standardize=True),
Normalizer(),
PCA(n_components=7),
),
RFE(estimator=DecisionTreeClassifier(), n_features_to_select=10), # feature selector
RandomForestClassifier()
) # total features 47 > selected features 10
pipelines['scenario_2'] = Pipeline([
('featureunion', FeatureUnion([
('powertransformer', PowerTransformer(method='yeo-johnson', standardize=True)),
('normalizer', Normalizer()),
('pca', PCA(n_components=7)),])
),
('rfe', RFE(estimator=DecisionTreeClassifier(), n_features_to_select=10)), # feature selector
('randomforestclassifier', RandomForestClassifier())]
) # total features 47 > selected features 10
pipelines['scenario_3'] = make_pipeline(
PowerTransformer(method='yeo-johnson', standardize=True),
Normalizer(),
RFE(estimator=DecisionTreeClassifier(), n_features_to_select=10), # feature selector
RandomForestClassifier()
) # total features 20 > selected features 10
pipelines['scenario_4'] = make_pipeline(
PowerTransformer(method='yeo-johnson', standardize=True),
Normalizer(),
PCA(n_components=7),
RFE(estimator=DecisionTreeClassifier(), n_features_to_select=5), # feature selector
RandomForestClassifier()
) # total features 7 > selected features 5
#binary_class_scoring = ['accuracy', 'balanced_accuracy', 'recall', 'average_precision', 'precision', 'f1', 'jaccard', 'roc_auc', 'roc_auc_ovr', 'roc_auc_ovo']
#multi_class_scoring = ['accuracy', 'balanced_accuracy', 'recall_micro', 'recall_macro', 'recall_weighted', 'precision_micro', 'precision_macro', 'precision_weighted', 'f1_micro', 'f1_macro', 'f1_weighted', 'jaccard_micro', 'jaccard_macro', 'jaccard_weighted', 'roc_auc_ovr_weighted', 'roc_auc_ovo_weighted']
cv_results = dict()
cv = RepeatedStratifiedKFold(n_splits=5, n_repeats=3, random_state=None) # StratifiedKFold(n_splits=5, shuffle=False, random_state=None) # cross validation & randomness control
for name, pipeline in deepcopy(pipelines).items():
cv_results[name] = cross_validate(pipeline, X, y, scoring=['accuracy', 'recall', 'precision', 'f1'], cv=cv, return_train_score=True, fit_params=None, return_estimator=False, n_jobs=-1, verbose=0)
pipeline.fit(X, y)
pipelines.update({name:pipeline})
selected_features_importance = pipeline['randomforestclassifier'].feature_importances_
# pipeline.named_steps
# pipeline['rfe'].ranking_ # pipeline['rfe'].n_features_in_ : number of input features
# pipeline['rfe'].support_ # pipeline['rfe'].n_features_ : number of selected features
# pipeline['rfe'].get_feature_names_out() # selected features
# visualization
cv_results = pd.DataFrame(cv_results)
cv_1st_normalization = list()
for name in cv_results.keys():
cv_result = cv_results[name].apply(lambda x: pd.Series(x)).stack(0).to_frame().reset_index().rename(columns={0:'score', 'level_0':'score_name', 'level_1':'fold'})
cv_result['scenario'] = name
cv_1st_normalization.append(cv_result)
cv_result = pd.concat(cv_1st_normalization, axis=0).reset_index(drop=True)
cv_result['domain'] = cv_result['score_name'].apply(lambda x: 'target' if x.startswith('train') or x.startswith('test') else 'nontarget')
cv_result = cv_result.loc[lambda x: x['domain'] == 'target'].copy().reset_index(drop=True)
cv_result['domain'] = cv_result['score_name'].apply(lambda x: 'train' if x.startswith('train') else 'test')
cv_result['scoring_type'] = cv_result['score_name'].apply(lambda x: '_'.join(x.split('_')[1:]))
sns.set_theme(style="ticks") # white, dark, whitegrid, darkgrid, ticks
g = sns.FacetGrid(cv_result, col="scoring_type", row="domain", aspect=2, height=3)
# sns.scatterplot, sns.lineplot
# sns.boxplot(outlier), sns.violinplot(variance), sns.boxenplot(variance), sns.stripplot(distribution), sns.pointplot(mean, variance), sns.barplot(mean)
g.map_dataframe(sns.boxplot, x="scenario", y="score", hue='fold') # x: numerical variable, y: numerical variable, hue: categorical variable
g.fig.suptitle('Scenario Evaluation')
g.add_legend()
g.tight_layout()
Sequential feature selection(SFS)
from copy import deepcopy
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.datasets import make_classification
from sklearn.tree import DecisionTreeClassifier # feature selector estimator: feature importances
from sklearn.linear_model import LogisticRegression # feature selector estimator: coefficient
from sklearn.ensemble import RandomForestClassifier # pipeline estimator: feature importances
from sklearn.preprocessing import PowerTransformer, QuantileTransformer, StandardScaler, Normalizer, RobustScaler, MinMaxScaler, MaxAbsScaler
from sklearn.decomposition import PCA, TruncatedSVD
from sklearn.pipeline import FeatureUnion, make_union, Pipeline, make_pipeline
from sklearn.feature_selection import SequentialFeatureSelector
from sklearn.model_selection import cross_val_score, cross_validate, GridSearchCV, StratifiedKFold, RepeatedStratifiedKFold
from sklearn.metrics import roc_curve, precision_recall_curve, auc
X, y = make_classification(n_samples=3000, n_features=10, n_classes=2, weights=[0.6, 0.4], flip_y=0)
pipelines = dict()
pipelines['scenario_1'] = make_pipeline(SequentialFeatureSelector(estimator=LogisticRegression(), direction='forward', n_features_to_select=3), LogisticRegression())
pipelines['scenario_2'] = make_pipeline(SequentialFeatureSelector(estimator=LogisticRegression(), direction='forward', n_features_to_select='auto'), LogisticRegression())
pipelines['scenario_3'] = make_pipeline(SequentialFeatureSelector(estimator=LogisticRegression(), direction='backward', n_features_to_select=9), LogisticRegression())
#binary_class_scoring = ['accuracy', 'balanced_accuracy', 'recall', 'average_precision', 'precision', 'f1', 'jaccard', 'roc_auc', 'roc_auc_ovr', 'roc_auc_ovo']
#multi_class_scoring = ['accuracy', 'balanced_accuracy', 'recall_micro', 'recall_macro', 'recall_weighted', 'precision_micro', 'precision_macro', 'precision_weighted', 'f1_micro', 'f1_macro', 'f1_weighted', 'jaccard_micro', 'jaccard_macro', 'jaccard_weighted', 'roc_auc_ovr_weighted', 'roc_auc_ovo_weighted']
cv_results = dict()
cv = RepeatedStratifiedKFold(n_splits=5, n_repeats=3, random_state=None) # StratifiedKFold(n_splits=5, shuffle=False, random_state=None) # cross validation & randomness control
for name, pipeline in deepcopy(pipelines).items():
cv_results[name] = cross_validate(pipeline, X, y, scoring=['accuracy', 'recall', 'precision', 'f1'], cv=cv, return_train_score=True, fit_params=None, return_estimator=False, n_jobs=-1, verbose=0)
pipeline.fit(X, y)
pipelines.update({name:pipeline})
coefficient = pipeline['logisticregression'].coef_
# pipeline.named_steps
# pipeline['sequentialfeatureselector'].support_ # mask of selected features
# pipeline['sequentialfeatureselector'].n_features_to_select_ # number of selected features
# pipeline['sequentialfeatureselector'].get_feature_names_out() # selected features
# visualization
cv_results = pd.DataFrame(cv_results)
cv_1st_normalization = list()
for name in cv_results.keys():
cv_result = cv_results[name].apply(lambda x: pd.Series(x)).stack(0).to_frame().reset_index().rename(columns={0:'score', 'level_0':'score_name', 'level_1':'fold'})
cv_result['scenario'] = name
cv_1st_normalization.append(cv_result)
cv_result = pd.concat(cv_1st_normalization, axis=0).reset_index(drop=True)
cv_result['domain'] = cv_result['score_name'].apply(lambda x: 'target' if x.startswith('train') or x.startswith('test') else 'nontarget')
cv_result = cv_result.loc[lambda x: x['domain'] == 'target'].copy().reset_index(drop=True)
cv_result['domain'] = cv_result['score_name'].apply(lambda x: 'train' if x.startswith('train') else 'test')
cv_result['scoring_type'] = cv_result['score_name'].apply(lambda x: '_'.join(x.split('_')[1:]))
sns.set_theme(style="ticks") # white, dark, whitegrid, darkgrid, ticks
g = sns.FacetGrid(cv_result, col="scoring_type", row="domain", aspect=2, height=3)
# sns.scatterplot, sns.lineplot
# sns.boxplot(outlier), sns.violinplot(variance), sns.boxenplot(variance), sns.stripplot(distribution), sns.pointplot(mean, variance), sns.barplot(mean)
g.map_dataframe(sns.boxplot, x="scenario", y="score", hue='fold') # x: numerical variable, y: numerical variable, hue: categorical variable
g.fig.suptitle('Scenario Evaluation')
g.add_legend()
g.tight_layout()
Select from model
from copy import deepcopy
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.datasets import make_classification
from sklearn.tree import DecisionTreeClassifier # feature selector estimator: feature importances
from sklearn.linear_model import LogisticRegression # feature selector estimator: coefficient
from sklearn.ensemble import RandomForestClassifier # pipeline estimator: feature importances
from sklearn.preprocessing import PowerTransformer, QuantileTransformer, StandardScaler, Normalizer, RobustScaler, MinMaxScaler, MaxAbsScaler
from sklearn.decomposition import PCA, TruncatedSVD
from sklearn.pipeline import FeatureUnion, make_union, Pipeline, make_pipeline
from sklearn.feature_selection import SelectFromModel
from sklearn.model_selection import cross_val_score, cross_validate, GridSearchCV, StratifiedKFold, RepeatedStratifiedKFold
from sklearn.metrics import roc_curve, precision_recall_curve, auc
X, y = make_classification(n_samples=3000, n_features=10, n_classes=2, weights=[0.6, 0.4], flip_y=0)
pipelines = dict()
pipelines['scenario_1'] = make_pipeline(SelectFromModel(estimator=LogisticRegression(), threshold='0.1*mean', max_features=5), RandomForestClassifier())
pipelines['scenario_2'] = make_pipeline(SelectFromModel(estimator=LogisticRegression(), threshold='0.2*mean', max_features=5), RandomForestClassifier())
pipelines['scenario_3'] = make_pipeline(SelectFromModel(estimator=LogisticRegression(), threshold='0.3*mean', max_features=5), RandomForestClassifier())
#binary_class_scoring = ['accuracy', 'balanced_accuracy', 'recall', 'average_precision', 'precision', 'f1', 'jaccard', 'roc_auc', 'roc_auc_ovr', 'roc_auc_ovo']
#multi_class_scoring = ['accuracy', 'balanced_accuracy', 'recall_micro', 'recall_macro', 'recall_weighted', 'precision_micro', 'precision_macro', 'precision_weighted', 'f1_micro', 'f1_macro', 'f1_weighted', 'jaccard_micro', 'jaccard_macro', 'jaccard_weighted', 'roc_auc_ovr_weighted', 'roc_auc_ovo_weighted']
cv_results = dict()
cv = RepeatedStratifiedKFold(n_splits=5, n_repeats=3, random_state=None) # StratifiedKFold(n_splits=5, shuffle=False, random_state=None) # cross validation & randomness control
for name, pipeline in deepcopy(pipelines).items():
cv_results[name] = cross_validate(pipeline, X, y, scoring=['accuracy', 'recall', 'precision', 'f1'], cv=cv, return_train_score=True, fit_params=None, return_estimator=False, n_jobs=-1, verbose=0)
pipeline.fit(X, y)
pipelines.update({name:pipeline})
# pipeline.named_steps
# pipeline['selectfrommodel'].get_support() # mask of selected features
# pipeline['selectfrommodel'].threshold_ # threshold actually applied
# pipeline['selectfrommodel'].get_feature_names_out() # selected features
# visualization
cv_results = pd.DataFrame(cv_results)
cv_1st_normalization = list()
for name in cv_results.keys():
cv_result = cv_results[name].apply(lambda x: pd.Series(x)).stack(0).to_frame().reset_index().rename(columns={0:'score', 'level_0':'score_name', 'level_1':'fold'})
cv_result['scenario'] = name
cv_1st_normalization.append(cv_result)
cv_result = pd.concat(cv_1st_normalization, axis=0).reset_index(drop=True)
cv_result['domain'] = cv_result['score_name'].apply(lambda x: 'target' if x.startswith('train') or x.startswith('test') else 'nontarget')
cv_result = cv_result.loc[lambda x: x['domain'] == 'target'].copy().reset_index(drop=True)
cv_result['domain'] = cv_result['score_name'].apply(lambda x: 'train' if x.startswith('train') else 'test')
cv_result['scoring_type'] = cv_result['score_name'].apply(lambda x: '_'.join(x.split('_')[1:]))
sns.set_theme(style="ticks") # white, dark, whitegrid, darkgrid, ticks
g = sns.FacetGrid(cv_result, col="scoring_type", row="domain", aspect=2, height=3)
# sns.scatterplot, sns.lineplot
# sns.boxplot(outlier), sns.violinplot(variance), sns.boxenplot(variance), sns.stripplot(distribution), sns.pointplot(mean, variance), sns.barplot(mean)
g.map_dataframe(sns.boxplot, x="scenario", y="score", hue='fold') # x: numerical variable, y: numerical variable, hue: categorical variable
g.fig.suptitle('Scenario Evaluation')
g.add_legend()
g.tight_layout()
Multi-class classification
#
Regression
#
Reference
Scikit-learn preprocessing API
from sklearn import preprocessing
#preprocessing.add_dummy_feature(X)
#preprocessing.binarize(X)
#preprocessing.label_binarize(y)
#preprocessing.maxabs_scale(X)
#preprocessing.minmax_scale(X)
#preprocessing.normalize(X)
#preprocessing.quantile_transform(X)
#preprocessing.robust_scale(X)
#preprocessing.scale(X)
#preprocessing.power_transform(X)
preprocessing.Binarizer()
preprocessing.FunctionTransformer()
preprocessing.KBinsDiscretizer()
preprocessing.KernelCenterer()
preprocessing.LabelBinarizer()
preprocessing.LabelEncoder()
preprocessing.MultiLabelBinarizer()
preprocessing.MaxAbsScaler()
preprocessing.MinMaxScaler()
preprocessing.Normalizer()
preprocessing.OneHotEncoder()
preprocessing.OrdinalEncoder()
preprocessing.PolynomialFeatures()
preprocessing.PowerTransformer()
preprocessing.QuantileTransformer()
preprocessing.RobustScaler()
preprocessing.StandardScaler()
from sklearn import model_selection
#model_selection.train_test_split(X, y)
#model_selection.LeavePGroupsOut(n_groups)
#model_selection.LeavePOut(p)
#model_selection.PredefinedSplit(test_fold)
model_selection.check_cv()
model_selection.GroupKFold()
model_selection.GroupShuffleSplit()
model_selection.KFold()
model_selection.LeaveOneGroupOut()
model_selection.LeaveOneOut()
model_selection.RepeatedKFold()
model_selection.RepeatedStratifiedKFold()
model_selection.ShuffleSplit()
model_selection.StratifiedKFold()
model_selection.StratifiedShuffleSplit()
model_selection.TimeSeriesSplit()
from sklearn import manifold
#manifold.locally_linear_embedding(X)
#manifold.smacof(dissimilarities)
#manifold.spectral_embedding(adjacency)
#manifold.trustworthiness(X, X_embedded)
manifold.Isomap()
manifold.LocallyLinearEmbedding()
manifold.MDS()
manifold.SpectralEmbedding()
manifold.TSNE()
from sklearn import mixture
mixture.BayesianGaussianMixture()
mixture.GaussianMixture()
from sklearn import gaussian_process
#gaussian_process.kernels.CompoundKernel(kernels)
#gaussian_process.kernels.Exponentiation(kernel, exponent)
#gaussian_process.kernels.Hyperparameter(name, value_type, bounds)
#gaussian_process.kernels.Kernel()
#gaussian_process.kernels.Product(k1, k2)
#gaussian_process.kernels.Sum(k1, k2)
gaussian_process.kernels.ConstantKernel()
gaussian_process.kernels.DotProduct()
gaussian_process.kernels.ExpSineSquared()
gaussian_process.kernels.Matern()
gaussian_process.kernels.PairwiseKernel()
gaussian_process.kernels.RBF()
gaussian_process.kernels.RationalQuadratic()
gaussian_process.kernels.WhiteKernel()
from sklearn import feature_selection
#feature_selection.SelectFromModel(estimator)
#feature_selection.SequentialFeatureSelector(estimator)
#feature_selection.RFE(estimator)
#feature_selection.RFECV(estimator)
#feature_selection.chi2(X, y)
#feature_selection.f_classif(X, y)
#feature_selection.f_regression(X, y)
#feature_selection.r_regression(X, y)
#feature_selection.mutual_info_classif(X, y, *)
#feature_selection.mutual_info_regression(X, y, *)
feature_selection.GenericUnivariateSelect()
feature_selection.SelectPercentile()
feature_selection.SelectKBest()
feature_selection.SelectFpr()
feature_selection.SelectFdr()
feature_selection.SelectFwe()
feature_selection.VarianceThreshold()
from sklearn import feature_extraction
#feature_extraction.image.extract_patches_2d(image, patch_size)
#feature_extraction.image.grid_to_graph(n_x, n_y)
#feature_extraction.image.img_to_graph(img)
#feature_extraction.image.reconstruct_from_patches_2d(patches, image_size)
feature_extraction.DictVectorizer()
feature_extraction.FeatureHasher()
feature_extraction.image.PatchExtractor()
feature_extraction.text.CountVectorizer()
feature_extraction.text.HashingVectorizer()
feature_extraction.text.TfidfTransformer()
feature_extraction.text.TfidfVectorizer()
from sklearn import decomposition
#decomposition.SparseCoder(dictionary)
#decomposition.dict_learning(X, n_components)
#decomposition.dict_learning_online(X)
#decomposition.fastica(X)
#decomposition.non_negative_factorization(X)
#decomposition.sparse_encode(X, dictionary)
decomposition.DictionaryLearning()
decomposition.FactorAnalysis()
decomposition.FastICA()
decomposition.IncrementalPCA()
decomposition.KernelPCA()
decomposition.LatentDirichletAllocation()
decomposition.MiniBatchDictionaryLearning()
decomposition.MiniBatchSparsePCA()
decomposition.NMF()
decomposition.PCA()
decomposition.SparsePCA()
decomposition.TruncatedSVD()
from sklearn import cluster
#cluster.affinity_propagation(S)
#cluster.cluster_optics_dbscan(reachability, core_distances, ordering, eps)
#cluster.cluster_optics_xi(reachability, predecessor, ordering, min_samples)
#cluster.compute_optics_graph(X)
#cluster.dbscan(X)
#cluster.estimate_bandwidth(X)
#cluster.k_means(X, n_clusters)
#cluster.kmeans_plusplus(X, n_clusters)
#cluster.mean_shift(X)
#cluster.spectral_clustering(affinity)
#cluster.ward_tree(X)
cluster.AffinityPropagation()
cluster.AgglomerativeClustering()
cluster.Birch()
cluster.DBSCAN()
cluster.FeatureAgglomeration()
cluster.KMeans()
cluster.MiniBatchKMeans()
cluster.MeanShift()
cluster.OPTICS()
cluster.SpectralClustering()
cluster.SpectralBiclustering()
cluster.SpectralCoclustering()
from sklearn import covariance
#covariance.empirical_covariance(X)
#covariance.graphical_lasso(emp_cov, alpha)
#covariance.ledoit_wolf(X)
#covariance.oas(X)
#covariance.shrunk_covariance(emp_cov)
covariance.EmpiricalCovariance()
covariance.EllipticEnvelope()
covariance.GraphicalLasso()
covariance.GraphicalLassoCV()
covariance.LedoitWolf()
covariance.MinCovDet()
covariance.OAS()
covariance.ShrunkCovariance()
from sklearn import compose
#compose.ColumnTransformer(transformers)
#compose.make_column_transformer(*transformers)
compose.TransformedTargetRegressor()
compose.make_column_selector()