Module ablation.ablation

ablation.py: feature ablation methods for the Ablation class and helpers

Source code
"""
ablation.py
About: Feature ablation methods for the Ablation class
"""
from copy import deepcopy
from itertools import chain
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd

from .dataset import NumpyDataset
from .utils.evaluate import append_dict_lists, eval_model_performance
from .utils.transform import le_to_ohe, ohe_to_le


def local_explanation_importance(
    exp: np.ndarray, relative=False
) -> np.ndarray:
    """Average absolute value of ranked local explanations. For example, the first
        value in the returned array is the average of largest local absolute feature
        importance across all samples.

    Args:
        exp (np.ndarray): local explanations of shape (samples, features)
        relative (bool): return importance relative to sum of all features

    Returns:
        np.ndarray: average importance of ranked local explanations
    """
    abs_exp = np.abs(exp)
    if relative:
        abs_exp = abs_exp / abs_exp.sum(-1, keepdims=True)
    ordered_exp = np.sort(abs_exp, -1)[:, ::-1]
    avg_importance = ordered_exp.mean(0)
    return avg_importance


class Ablation:
    """Class to perform Ablations using feature importance"""

    def __init__(
        self,
        perturbation_distribution: np.ndarray,
        model,
        dataset: NumpyDataset,
        X: np.ndarray,
        y: np.ndarray,
        explanation_values: np.ndarray,
        explanation_values_dense: np.ndarray,
        random_feat_idx: np.ndarray = None,
        local: bool = True,
        scoring_methods: List[str] = ["log_loss", "abs_diff", "auroc"],
        scoring_step: float = 0,
    ):
        """Constructor for ablation

        Args:
            perturbation_distribution (np.ndarray): perturbed dataset as a numpy array
            model: a pre-trained model
            dataset (NumpyDataset): dataset object providing the original feature
                                    names and the one-hot-encoding aggregation map
            X (np.ndarray): original features dataset
            y (np.ndarray): target values
            explanation_values (np.ndarray): explanation values used for feature importance (e.g. shap values)
            explanation_values_dense (np.ndarray): explanation values aggregated to the
                                                   original (label-encoded) features
            random_feat_idx (np.ndarray): indices of random sanity-check features
            local (bool): If True, perturbs data based on rank ordered local rather than global explanations.
                          Defaults to True.
            scoring_methods (List[str]): List of scoring methods ('log_loss', 'abs_diff', 'auroc', 'aupr')
            scoring_step (float): Fraction of features to ablate between model evaluations,
                                  in [0, 1). If 0, evaluates after every feature.

        """

        self.perturbation_distribution = perturbation_distribution
        self.model = model
        self.dataset = deepcopy(dataset)
        self.X = deepcopy(X)
        self.y = deepcopy(y)
        self.explanation_values = explanation_values
        self.explanation_values_dense = explanation_values_dense
        self.random_feat_idx = random_feat_idx
        self.local = local

        for s in scoring_methods:
            assert s in [
                "auroc",
                "aupr",
                "log_loss",
                "abs_diff",
            ], f"{s} is not a valid scoring method! "
        self.scoring_methods = scoring_methods

        assert (
            0 <= scoring_step < 1
        ), "scoring_step must be in [0, 1)"

        self.scoring_step = scoring_step

    def _steps(self):
        """Calculate steps at which to evaluate model performance
            1. perturb dataset at least once per feature
            2. record model evaluation at each score step
            3. provide normalized pct step for each evaluation point

        Returns:
            Tuple[np.ndarray,np.ndarray,np.ndarray]: perturb steps, score steps, percent steps
        """

        # n_features = self.X.shape[1]
        n_features = len(self.dataset.original_feature_names)
        # fraction of features included at each step
        if self.scoring_step == 0:
            # If scoring_step is 0 default to scoring at each feature
            pct_steps = np.arange(0, n_features + 1) / n_features
        else:
            pct_steps = np.arange(0, 1 + self.scoring_step, self.scoring_step)
        # number of features at which to provide evaluation (exclude initial evaluation with all features)
        score_steps = (pct_steps * (n_features - 1)).astype(int)[1:]

        # Number of perturbations at minimum = n_features
        # Additional repeat steps are taken if the number of scoring steps > n_features
        perturb_steps = (
            score_steps
            if len(score_steps) > n_features
            else np.arange(n_features)
        )

        return perturb_steps, score_steps, pct_steps

    def ablate_features(self, plot=False) -> pd.DataFrame:
        """Main function for ablation

        Args:
            plot (bool, optional): Plot ablation. Defaults to False. Currently unused.

        Returns:
            pd.DataFrame: long-format results with scores, score changes,
                          explanation importance, and percent-of-features steps
        """
        perturb_steps, score_steps, pct_steps = self._steps()
        ranked_features = self._sorted_feature_rankings()

        # Convert dataset to LE from OHE
        self.X = ohe_to_le(self.X, self.dataset.agg_map)

        scores = {s: [] for s in self.scoring_methods}
        score_no_perturbation = self._get_model_performance()
        scores = append_dict_lists(scores, score_no_perturbation)

        indx = np.arange(len(self.X))
        # Replace features in order of importance with perturbation, calculate and store new score
        for i in perturb_steps:
            # Replace the i-th most important feature with the perturbation distribution
            feature_idx = ranked_features[i]
            self.X[indx, feature_idx] = self.perturbation_distribution[
                indx, feature_idx
            ]
            if i in score_steps:
                score_perturbed = self._get_model_performance()
                scores = append_dict_lists(scores, score_perturbed)

        n_steps = len(pct_steps)
        n_scores = len(self.scoring_methods)

        scores = {k: np.array(s) for k, s in scores.items()}
        score_change = {
            k: np.concatenate([s[1:] - s[:-1], [0]]) for k, s in scores.items()
        }
        exp_importance = np.concatenate(
            [local_explanation_importance(self.explanation_values_dense), [0]]
        )

        results = {
            "pct_steps": list(pct_steps) * n_scores,
            "score_name": sum([[k] * n_steps for k in scores], []),
            "scores": np.concatenate(list(scores.values())),
            "score_change": np.concatenate(list(score_change.values())),
            "exp_importance": list(exp_importance) * n_scores,
        }

        # Restore Data Format
        # NOTE Test if self.X here equals self.dataset.X
        # self.X = le_to_ohe(self.X,self.dataset.agg_map)

        return pd.DataFrame(results)

    def _get_model_performance(self) -> Dict[str, float]:
        """Return performance of the model's predicted probabilities

        Returns:
            Dict[str, float]: model performance for each scoring method
        """
        return {
            s: eval_model_performance(
                self.model, le_to_ohe(self.X, self.dataset.agg_map), self.y, s
            )
            for s in self.scoring_methods
        }

    def _sorted_feature_rankings(self):
        """Generate ranked feature list, given explanation values

        Returns:
            np.array: Array of feature indices, in order of feature importance
                     If local, will return with shape (features, samples)
        """

        # vals = np.abs(self.explanation_values)
        vals = np.abs(self.explanation_values_dense)
        if not self.local:
            vals = vals.mean(0)

        return np.argsort(-vals).transpose()

    def random_sanity_check_idx(self) -> Tuple[float, float]:
        """Sanity check for random features

        Global: first index at which a random feature appears in the ranked global feature importance
        Local: minimum weighted rank among all random features

        Returns:
            Tuple[float, float]: random feature rank, random feature percent rank
        """

        if self.random_feat_idx is None:
            return self.X.shape[1], 1.0

        ranked_features = self._sorted_feature_rankings()
        if self.local:
            min_rank = len(ranked_features)
            for rand_feat_idx in self.random_feat_idx:
                is_random_feat = ranked_features == rand_feat_idx
                weighted_random_rank = sum(
                    is_random_feat.sum(1)
                    * np.arange(len(is_random_feat))
                    / is_random_feat.sum()
                )
                min_rank = min(min_rank, weighted_random_rank)

            # median_rank = np.where(
            #     is_random_feat.sum(1).cumsum() > is_random_feat.sum() / 2
            # )[0].min()

            return min_rank, min_rank / self.X.shape[1]

        is_random_feat = np.isin(ranked_features, self.random_feat_idx)
        random_rank = np.where(is_random_feat)[0].min()

        return random_rank, random_rank / self.X.shape[1]

    def random_sanity_check_value(self) -> float:
        """Sanity check for random feature explanation values

        Returns:
            float: largest global (mean absolute) explanation value among the random features
        """

        if self.random_feat_idx is None:
            return 0.0

        global_importance = np.abs(self.explanation_values).mean(0)
        random_feat_importance = global_importance[self.random_feat_idx].max()

        return random_feat_importance

Functions

def local_explanation_importance(exp: numpy.ndarray, relative=False) -> numpy.ndarray

Average absolute value of ranked local explanations. For example, the first value in the returned array is the average, across all samples, of each sample's largest absolute feature importance.

Args

exp : np.ndarray
local explanations of shape (samples, features)
relative : bool
return importance relative to sum of all features

Returns

np.ndarray
average importance of ranked local explanations
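A minimal usage sketch (the input array is illustrative; the module is assumed importable as ablation.ablation):

import numpy as np

from ablation.ablation import local_explanation_importance

exp = np.array(
    [
        [0.5, -1.0, 0.2],
        [-0.3, 0.8, 0.1],
        [0.9, -0.2, 0.4],
    ]
)

# Sort |values| per sample (descending), then average each rank position
# across samples: the mean largest |value|, then the mean second largest, ...
avg = local_explanation_importance(exp)
# Per-row sorted |values|: [1.0, 0.5, 0.2], [0.8, 0.3, 0.1], [0.9, 0.4, 0.2]
print(avg)  # approximately [0.9, 0.4, 0.1667]

# With relative=True, each row is first normalized to sum to 1 before ranking.
rel = local_explanation_importance(exp, relative=True)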

Classes

class Ablation (perturbation_distribution: numpy.ndarray, model, dataset: NumpyDataset, X: numpy.ndarray, y: numpy.ndarray, explanation_values: numpy.ndarray, explanation_values_dense: numpy.ndarray, random_feat_idx: numpy.ndarray = None, local: bool = True, scoring_methods: List[str] = ['log_loss', 'abs_diff', 'auroc'], scoring_step: float = 0)

Class to perform Ablations using feature importance

Constructor for ablation

Args

perturbation_distribution : np.ndarray
perturbed dataset as a numpy array
model
a pre-trained model
dataset : NumpyDataset
dataset object providing the original feature names and the one-hot-encoding aggregation map
X : np.ndarray
original features dataset
y : np.ndarray
target values
explanation_values : np.ndarray
explanation values used for feature importance (e.g. shap values)
explanation_values_dense : np.ndarray
explanation values aggregated to the original (label-encoded) features
random_feat_idx : np.ndarray
indices of random sanity-check features
local : bool
If True, perturbs data based on rank ordered local rather than global explanations. Defaults to True.
scoring_methods : List[str]
List of scoring methods ('log_loss', 'abs_diff', 'auroc', 'aupr')
scoring_step : float
Fraction of features to ablate between model evaluations, in [0, 1). If 0, evaluates after every feature.
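A hypothetical construction sketch; model, dataset, X, y, and the explanation arrays (exp_values, exp_values_dense) are placeholders assumed to come from the surrounding pipeline, and the permutation perturbation is just one illustrative choice:

import numpy as np

from ablation.ablation import Ablation
from ablation.utils.transform import ohe_to_le

# Assumed to already exist: a fitted classifier `model`, a NumpyDataset
# `dataset`, one-hot-encoded features `X`, targets `y`, and explanation
# arrays `exp_values` (per OHE column) / `exp_values_dense` (per original feature).
rng = np.random.default_rng(0)

# ablate_features indexes the perturbation in label-encoded (LE) feature
# space, so build it from an LE view of X. Shuffling each column
# independently is one simple choice, not one prescribed by this module.
X_le = ohe_to_le(X, dataset.agg_map)
perturbation = np.column_stack(
    [rng.permutation(X_le[:, j]) for j in range(X_le.shape[1])]
)

abl = Ablation(
    perturbation_distribution=perturbation,
    model=model,
    dataset=dataset,
    X=X,
    y=y,
    explanation_values=exp_values,
    explanation_values_dense=exp_values_dense,
    local=True,                            # rank features per sample
    scoring_methods=["log_loss", "auroc"],
    scoring_step=0.1,                      # score after every 10% of features
)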

Methods

def ablate_features(self, plot=False) -> pandas.DataFrame

Main function for ablation

Args

plot : bool, optional
Plot ablation. Defaults to False. Currently unused.

Returns

pd.DataFrame
long-format results with scores, score changes, explanation importance, and percent-of-features steps
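Hypothetical usage, continuing the construction sketch above; the column names follow the source:

results = abl.ablate_features()

# Long-format DataFrame, one row per (scoring method, evaluation step):
#   pct_steps       fraction of features ablated so far
#   score_name      scoring method ('log_loss', 'auroc', ...)
#   scores          model performance at that step
#   score_change    difference to the next step (0 for the last step)
#   exp_importance  average ranked explanation importance (0 for the last step)
log_loss_curve = results[results["score_name"] == "log_loss"]
print(log_loss_curve[["pct_steps", "scores", "score_change"]])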
def random_sanity_check_idx(self) -> Tuple[float, float]

Sanity check for random features

Global: first index at which a random feature appears in the ranked global feature importance
Local: minimum weighted rank among all random features

Returns

Tuple[float, float]
random feature rank, random feature percent rank
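A hypothetical check, assuming the Ablation instance was built with random_feat_idx pointing at deliberately uninformative features:

rank, pct_rank = abl.random_sanity_check_idx()

# Rank 0 is the most important position, so a trustworthy explainer should
# push random features toward the bottom, i.e. pct_rank close to 1.0.
if pct_rank < 0.5:
    print(f"Warning: a random feature ranks in the top half (rank {rank})")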
def random_sanity_check_value(self) -> float

Sanity check for random feature explanation values

Returns

float
largest global (mean absolute) explanation value among the random features
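A hypothetical companion check on explanation magnitudes, again continuing the sketch above:

import numpy as np

# Largest mean-|explanation| assigned to any random feature, compared against
# the most important real feature. A value near zero indicates a sane explainer.
max_random_importance = abl.random_sanity_check_value()
global_importance = np.abs(abl.explanation_values).mean(0)
print(max_random_importance / global_importance.max())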