Module ablation.dataset

data_loader.py: Load datasets for Ablation

Source code
"""
data_loader.py
About: Load datasets for Ablation
"""
from dataclasses import dataclass
from os import path
from typing import List, Union
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, StandardScaler
DATA_PATH = path.join(path.dirname(path.abspath(__file__)), "data")
@dataclass
class NumpyDataset:
    """Train/test split in NumPy form, plus feature names before and after encoding."""

    X_train: np.ndarray
    y_train: np.ndarray
    X_test: np.ndarray
    y_test: np.ndarray
n_classes: int
# Feature names (pre and post encoding, respectively)
feature_names: List[str]
original_feature_names: List[str]
def __post_init__(self):
"""
Populate necessary fields after dataclass initialization. In this
case, generate the aggregation mapping.
"""
self._agg_map = self.calculate_aggregation_map()
def stratified_subsample(self, percentage, random_state=42):
_, self.X_train, _, self.y_train = train_test_split(
self.X_train,
self.y_train,
test_size=int(len(self.y_train) * percentage),
random_state=random_state,
stratify=self.y_train,
)
_, self.X_test, _, self.y_test = train_test_split(
self.X_test,
self.y_test,
test_size=int(len(self.y_test) * percentage),
random_state=random_state,
stratify=self.y_test,
)
def add_random_features(self, n_rand_features=4, random_state=42):
np.random.seed(random_state)
random_train = np.random.normal(0, 1, (len(self.X_train), n_rand_features))
random_test = np.random.normal(0, 1, (len(self.X_test), n_rand_features))
self.X_train = np.concatenate([self.X_train, random_train], -1)
self.X_test = np.concatenate([self.X_test, random_test], -1)
# Calculate the random feature names
random_feature_names = [f"#RANDOM{idx}#" for idx in range(n_rand_features)]
# Add random feature names to list of post-encoded features
self.feature_names += random_feature_names
# Add random feature names to list of original features
self.original_feature_names += random_feature_names
# Re-calculate map since features have been added
self._agg_map = self.calculate_aggregation_map()
    def calculate_aggregation_map(self) -> Union[List[List[int]], None]:
        """
        Generate a list of index lists mapping each original (pre-encoding) feature to
        the encoded columns derived from it. If the pre- and post-encoding feature sets
        have the same length, the mapping would be 1-to-1, so return None; downstream
        code should interpret None as there being no difference between pre- and
        post-encoding features.
        """
        if len(self.feature_names) == len(self.original_feature_names):
            return None
        return [
            [
                j
                for (j, val) in enumerate(self.feature_names)
                if (x == val) or (x == "_".join(val.split("_")[:-1]))
            ]
            for x in self.original_feature_names
        ]
@property
def agg_map(self):
return self._agg_map
@property
def random_feat_idx(self):
return np.array(
[idx for (idx, val) in enumerate(self.feature_names) if "#RANDOM" in val]
)
@property
def dense_random_feat_idx(self):
return np.array(
[
idx
for (idx, val) in enumerate(self.original_feature_names)
if "#RANDOM" in val
]
)
def load_data(
dataset: str,
dataset_sample_percentage: float = 1,
n_rand_features: int = 0,
) -> NumpyDataset:
"""Load preprocessed data from the list
Args:
dataset (str): Dataset name to be loaded
dataset_sample_percentage (float): stratified subsample of dataset
n_rand_features (int): number of random features to append for sanity checking explanation
Returns:
NumpyDataset: dataset
"""
options = {
"german": prepare_german_data,
"adult": prepare_adult_data,
"spambase": prepare_spambase_data,
"har": prepare_har_data,
"synthetic": prepare_synthetic_data,
"synthetic_multiclass": prepare_synthetic_multiclass_data,
"synthetic_cat": prepare_synthetic_categorical_data,
}
dataset = options[dataset]()
if dataset_sample_percentage < 1:
dataset.stratified_subsample(dataset_sample_percentage)
if n_rand_features > 0:
dataset.add_random_features(n_rand_features)
return dataset
def prepare_german_data() -> NumpyDataset:
"""Prepare German dataset
Returns:
NumpyDataset: dataset
"""
data = pd.read_csv(path.join(DATA_PATH, "MyGermanData.csv"))
X, y = data.drop("credit.rating", axis=1), data["credit.rating"]
cat_ix = X.select_dtypes(include=["object"]).columns
num_ix = X.select_dtypes(include=["int64"]).columns
X_train, y_train, X_test, y_test = split_dataset(
X, y.values.flatten(), test_perc=0.2
)
encoder = OneHotEncoder()
scaler = StandardScaler()
ct = ColumnTransformer(
[("categoricals", encoder, cat_ix), ("numericals", scaler, num_ix)],
remainder="passthrough",
verbose_feature_names_out=False,
)
X_train = ct.fit_transform(X_train)
X_test = ct.transform(X_test)
return NumpyDataset(
X_train=X_train,
y_train=y_train,
X_test=X_test,
y_test=y_test,
n_classes=2,
feature_names=list(ct.get_feature_names_out()),
original_feature_names=cat_ix.tolist() + num_ix.tolist(),
)
def prepare_adult_data() -> NumpyDataset:
"""Prepare Adult dataset
Returns:
NumpyDataset: dataset
"""
column_names = [
"age",
"workclass",
"fnlwgt",
"education",
"education-num",
"marital-status",
"occupation",
"relationship",
"race",
"sex",
"capital-gain",
"capital-loss",
"hours-per-week",
"native-country",
"income-over-50k",
]
training_data = pd.read_csv(
path.join(DATA_PATH, "adult.data"), names=column_names, index_col=False
)
test_data = pd.read_csv(
path.join(DATA_PATH, "adult.test"), names=column_names, index_col=False
)
training_data["income-over-50k"] = training_data["income-over-50k"].map(
{" >50K": 1, " <=50K": 0}
)
test_data["income-over-50k"] = test_data["income-over-50k"].map(
{" >50K": 1, " <=50K": 0}
)
X_train = training_data.drop("income-over-50k", axis=1)
y_train = training_data["income-over-50k"]
X_test = test_data.drop("income-over-50k", axis=1)
y_test = test_data["income-over-50k"]
cat_ix = X_train.select_dtypes(include=["object"]).columns
num_ix = X_train.select_dtypes(include=["int64"]).columns
encoder = OneHotEncoder()
scaler = StandardScaler()
ct = ColumnTransformer(
[("categoricals", encoder, cat_ix), ("numericals", scaler, num_ix)],
remainder="passthrough",
verbose_feature_names_out=False,
)
X_train = ct.fit_transform(X_train)
X_test = ct.transform(X_test)
return NumpyDataset(
X_train=X_train.toarray(),
y_train=y_train.values.flatten(),
X_test=X_test.toarray(),
y_test=y_test.values.flatten(),
n_classes=2,
feature_names=list(ct.get_feature_names_out()),
original_feature_names=cat_ix.tolist() + num_ix.tolist(),
)
def prepare_spambase_data() -> NumpyDataset:
"""Prepare Spambase dataset
Returns:
NumpyDataset: dataset
"""
X = pd.read_csv(path.join(DATA_PATH, "spambase-X.csv"))
y = pd.read_csv(path.join(DATA_PATH, "spambase-y.csv"))
X_train, y_train, X_test, y_test = split_dataset(
X.values, y.values.flatten(), test_perc=0.2
)
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
return NumpyDataset(
X_train=X_train,
y_train=y_train,
X_test=X_test,
y_test=y_test,
n_classes=len(np.unique(y)),
feature_names=list(X.columns),
original_feature_names=list(X.columns),
)
def prepare_har_data() -> NumpyDataset:
"""Prepare HAR dataset
Returns:
NumpyDataset: dataset
"""
# Load Datasets
x_train = pd.read_pickle(path.join(DATA_PATH, "har_x_train.pkl"))
y_train = pd.read_pickle(path.join(DATA_PATH, "har_y_train.pkl"))
x_test = pd.read_pickle(path.join(DATA_PATH, "har_x_test.pkl"))
y_test = pd.read_pickle(path.join(DATA_PATH, "har_y_test.pkl"))
# Load Columns
    feature_names = pd.read_table(
        path.join(DATA_PATH, "har_features.txt"),
        sep=r"\s+",
        header=None,
    )
    feature_names = feature_names[1].values
# Standard Scaler
scaler = StandardScaler().fit(x_train)
x_train = scaler.transform(x_train)
x_test = scaler.transform(x_test)
    # Convert y to numpy arrays
    y_train = np.array(y_train)
    y_test = np.array(y_test)
    # Shift labels from 1..n_classes to 0..n_classes - 1
    y_train -= 1
    y_test -= 1
return NumpyDataset(
X_train=x_train,
y_train=y_train,
X_test=x_test,
y_test=y_test,
n_classes=len(np.unique(y_train)),
feature_names=list(feature_names),
original_feature_names=list(feature_names),
)
def split_dataset(
X: np.ndarray,
y: np.ndarray,
val_perc: float = 0.0,
test_perc: float = 0.2,
):
assert test_perc > 0, "Must have a test set"
(
X_train,
X_test,
y_train,
y_test,
) = train_test_split(X, y, test_size=test_perc, random_state=42)
if val_perc > 0:
(X_train, X_val, y_train, y_val,) = train_test_split(
X_train,
y_train,
test_size=val_perc / (1 - test_perc),
random_state=42,
)
return X_train, y_train, X_val, y_val, X_test, y_test
return X_train, y_train, X_test, y_test
def prepare_synthetic_data() -> NumpyDataset:
"""Prepare synthetic dataset
Returns:
NumpyDataset: dataset
"""
np.random.seed(42)
d = 20
n = 1000
X = np.random.normal(size=(n, d))
coef = np.random.normal(size=(d, 1))
prob = 1 / (1 + np.exp(-X @ coef))
y = (prob > np.random.rand(n, 1)).astype(int).squeeze()
X_train, y_train, X_test, y_test = split_dataset(X, y, test_perc=0.2)
return NumpyDataset(
X_train=X_train,
y_train=y_train,
X_test=X_test,
y_test=y_test,
n_classes=len(np.unique(y)),
feature_names=[f"feat_{i}" for i in range(X_train.shape[1])],
original_feature_names=[f"feat_{i}" for i in range(X_train.shape[1])],
)
def prepare_synthetic_multiclass_data() -> NumpyDataset:
"""Prepare synthetic dataset
Returns:
NumpyDataset: dataset
"""
np.random.seed(42)
d = 20
n = 1000
n_classes = 3
X = np.random.normal(size=(n, d))
coef = np.random.normal(size=(d, n_classes))
prob = np.exp(X @ coef) / np.exp(X @ coef).sum(-1, keepdims=True)
y = np.array([np.random.choice(list(range(n_classes)), p=p) for p in prob])
X_train, y_train, X_test, y_test = split_dataset(X, y, test_perc=0.2)
return NumpyDataset(
X_train=X_train,
y_train=y_train,
X_test=X_test,
y_test=y_test,
n_classes=len(np.unique(y)),
feature_names=[f"feat_{i}" for i in range(X_train.shape[1])],
original_feature_names=[f"feat_{i}" for i in range(X_train.shape[1])],
)
def prepare_synthetic_categorical_data() -> NumpyDataset:
"""Prepare synthetic dataset
Returns:
NumpyDataset: dataset
"""
np.random.seed(42)
d = 20
n = 1000
n_cat = 5
n_cont = d - n_cat
levels = 6
X_cont = np.random.normal(size=(n, n_cont))
X_cat = (np.random.rand(n, n_cat) * levels).astype(int)
X = np.hstack([X_cont, X_cat])
coef_cont = np.random.normal(size=(n_cont, 1))
coef_cat = np.random.normal(size=(1, n_cat, levels))
X_cat_mult_coef = (np.eye(levels)[X_cat] * coef_cat).sum(axis=-1)
logit_cat = X_cat_mult_coef.sum(axis=-1, keepdims=True)
logit_cont = X_cont @ coef_cont
logits = logit_cont + logit_cat
prob = 1 / (1 + np.exp(-logits))
y = (prob > np.random.rand(n, 1)).astype(int).squeeze()
X_df = pd.DataFrame(X, columns=[f"feat_{i}" for i in range(d)])
cat_feat = [f"feat_{i}" for i in range(n_cont, d)]
cont_feat = [f"feat_{i}" for i in range(n_cont)]
X_df[cat_feat] = X_df[cat_feat].astype(int)
encoder = OneHotEncoder()
scaler = StandardScaler()
X_train, y_train, X_test, y_test = split_dataset(X_df, y, test_perc=0.2)
ct = ColumnTransformer(
[
("categoricals", encoder, cat_feat),
("numericals", scaler, cont_feat),
],
remainder="passthrough",
verbose_feature_names_out=False,
)
X_train = ct.fit_transform(X_train)
X_test = ct.transform(X_test)
return NumpyDataset(
X_train=X_train,
y_train=y_train,
X_test=X_test,
y_test=y_test,
n_classes=len(np.unique(y)),
feature_names=list(ct.get_feature_names_out()),
original_feature_names=list(X_df.columns),
)
Functions
def load_data(dataset: str, dataset_sample_percentage: float = 1, n_rand_features: int = 0) -> NumpyDataset

    Load a preprocessed dataset by name.

    Args
        dataset : str
            Name of the dataset to load; one of "german", "adult", "spambase", "har",
            "synthetic", "synthetic_multiclass", "synthetic_cat"
        dataset_sample_percentage : float
            Fraction of the data to keep via a stratified subsample (1 keeps the full dataset)
        n_rand_features : int
            Number of random features to append for sanity-checking explanations

    Returns
        NumpyDataset
            dataset
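A minimal usage sketch (hypothetical: it assumes the packaged data files are present under the module's data/ directory and that the function is importable as shown):

# Import path assumed from the module name shown above.
from ablation.dataset import load_data

# Load the German credit data, keep a stratified 50% subsample,
# and append 4 standard-normal "random" features for sanity checks.
ds = load_data("german", dataset_sample_percentage=0.5, n_rand_features=4)

print(ds.X_train.shape, ds.n_classes)  # (n_rows, n_encoded_features + 4), 2
print(ds.random_feat_idx)              # column indices of the #RANDOM<i># features
print(ds.agg_map is None)              # False: one-hot encoding expands the feature set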
def prepare_adult_data() -> NumpyDataset

    Prepare Adult dataset.

    Returns
        NumpyDataset: dataset
def prepare_german_data() -> NumpyDataset

    Prepare German dataset.

    Returns
        NumpyDataset: dataset
def prepare_har_data() -> NumpyDataset

    Prepare HAR dataset.

    Returns
        NumpyDataset: dataset
def prepare_spambase_data() -> NumpyDataset

    Prepare Spambase dataset.

    Returns
        NumpyDataset: dataset
def prepare_synthetic_categorical_data() -> NumpyDataset

    Prepare synthetic dataset with categorical features.

    Returns
        NumpyDataset: dataset
def prepare_synthetic_data() -> NumpyDataset

    Prepare synthetic binary classification dataset.

    Returns
        NumpyDataset: dataset
def prepare_synthetic_multiclass_data() -> NumpyDataset

    Prepare synthetic multiclass dataset.

    Returns
        NumpyDataset: dataset
def split_dataset(X: numpy.ndarray, y: numpy.ndarray, val_perc: float = 0.0, test_perc: float = 0.2)

    Split X and y into train/test partitions (and optionally a validation partition) with a fixed
    random_state of 42. Returns (X_train, y_train, X_test, y_test), or
    (X_train, y_train, X_val, y_val, X_test, y_test) when val_perc > 0.
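A small sketch of the two return shapes (hypothetical values; import path assumed):

import numpy as np
from ablation.dataset import split_dataset  # import path assumed

X = np.random.normal(size=(100, 5))
y = np.random.randint(0, 2, size=100)

# Default: train/test only -> 4-tuple
X_tr, y_tr, X_te, y_te = split_dataset(X, y, test_perc=0.2)

# With a validation split -> 6-tuple; val_perc is measured against the full dataset
X_tr, y_tr, X_val, y_val, X_te, y_te = split_dataset(X, y, val_perc=0.1, test_perc=0.2)
print(len(y_val))  # 10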
Classes
class NumpyDataset (X_train: numpy.ndarray, y_train: numpy.ndarray, X_test: numpy.ndarray, y_test: numpy.ndarray, n_classes: int, feature_names: List[str], original_feature_names: List[str])

    Dataclass holding a train/test split in NumPy form, the number of classes, and the feature
    names after encoding (feature_names) and before encoding (original_feature_names).
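A toy construction sketch (hypothetical values; when the pre- and post-encoding feature lists coincide, __post_init__ leaves agg_map as None):

import numpy as np
from ablation.dataset import NumpyDataset  # import path assumed

ds = NumpyDataset(
    X_train=np.array([[0.1, 1.2], [0.7, -0.3]]),
    y_train=np.array([0, 1]),
    X_test=np.array([[0.5, 0.0]]),
    y_test=np.array([1]),
    n_classes=2,
    feature_names=["age", "income"],           # post-encoding column names
    original_feature_names=["age", "income"],  # pre-encoding column names
)
print(ds.agg_map)  # None: identical feature sets need no aggregation map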
Class variables
var X_test : numpy.ndarray
var X_train : numpy.ndarray
var feature_names : List[str]
var n_classes : int
var original_feature_names : List[str]
var y_test : numpy.ndarray
var y_train : numpy.ndarray
Instance variables
var agg_map

    The sparse-to-dense aggregation map computed by calculate_aggregation_map(), or None when the
    pre- and post-encoding feature sets have the same length.
var dense_random_feat_idx

    Indices of the appended #RANDOM<i># features within original_feature_names (the pre-encoding
    feature list).
var random_feat_idx

    Indices of the appended #RANDOM<i># features within feature_names (the post-encoding feature
    list).
Methods
def add_random_features(self, n_rand_features=4, random_state=42)

    Append n_rand_features standard-normal columns to X_train and X_test, add matching
    #RANDOM<i># entries to both feature-name lists, and recompute the aggregation map.
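A usage sketch (hypothetical; it assumes a dataset loaded via load_data, where Spambase has purely numeric features and therefore no encoding expansion):

from ablation.dataset import load_data  # import path assumed

ds = load_data("spambase")
n_before = ds.X_train.shape[1]

ds.add_random_features(n_rand_features=4, random_state=0)

print(ds.X_train.shape[1] == n_before + 4)  # True: four extra columns appended
print(ds.feature_names[-4:])                # ['#RANDOM0#', '#RANDOM1#', '#RANDOM2#', '#RANDOM3#']
print(ds.random_feat_idx)                   # indices of those columns in X_train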
def calculate_aggregation_map(self) -> Optional[List[List[int]]]

    Generate a list of index lists mapping each original (pre-encoding) feature to the encoded
    columns derived from it. If the pre- and post-encoding feature sets have the same length, no
    mapping is needed and None is returned; downstream code should interpret None as there being
    no difference between pre- and post-encoding features.
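A worked example with toy feature names (hypothetical): each inner list collects the encoded column indices whose name either equals the original name or equals it after stripping a one-hot suffix.

import numpy as np
from ablation.dataset import NumpyDataset  # import path assumed

ds = NumpyDataset(
    X_train=np.zeros((2, 4)),
    y_train=np.array([0, 1]),
    X_test=np.zeros((1, 4)),
    y_test=np.array([0]),
    n_classes=2,
    feature_names=["job_a", "job_b", "job_c", "income"],  # post-encoding
    original_feature_names=["job", "income"],             # pre-encoding
)
print(ds.calculate_aggregation_map())  # [[0, 1, 2], [3]]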
def stratified_subsample(self, percentage, random_state=42)

    Replace X_train/y_train and X_test/y_test in place with stratified subsamples containing
    int(len(y) * percentage) rows each.
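A usage sketch (hypothetical; the kept size is int(len(y) * percentage), so it is truncated rather than rounded):

from ablation.dataset import load_data  # import path assumed

ds = load_data("adult")
n_train = len(ds.y_train)

ds.stratified_subsample(0.25)  # keep ~25% of train and test, preserving class ratios

print(len(ds.y_train) == int(n_train * 0.25))  # True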