Skip to content

Commit

Permalink
Merge pull request #10 from aditya0by0/feature_drift_PSI
Browse files Browse the repository at this point in the history
Feature drift for categorical variables using PSI
  • Loading branch information
aditya0by0 authored Aug 3, 2024
2 parents 3b3c751 + 77b2bf9 commit 6ccf351
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 74 deletions.
2 changes: 1 addition & 1 deletion stream_viz/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

# This library enables interactive plots in the notebook.
# Enables functionality to hover over data points to see values, zoom in/out, and pan the plot.
import mpld3
# import mpld3
import pandas as pd
from IPython.core.display_functions import display
from ipywidgets import HBox, IntSlider, SelectMultiple, VBox, interactive_output
Expand Down
169 changes: 124 additions & 45 deletions stream_viz/feature_drift/f_drift_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from scipy.stats import ks_2samp

from stream_viz.base import DriftDetector
from stream_viz.data_encoders.cfpdss_data_encoder import CfpdssDataEncoder
from stream_viz.utils.drifts_types import FeatureDriftType, get_fd_drift_type_keys


Expand All @@ -26,27 +27,38 @@ class FeatureDriftDetector(DriftDetector):
Size of the gap between segments when computing gradual drift (default is 50).
p_val_threshold : float, optional
P-value threshold for gradual drift detection (default is 0.0001).
psi_threshold : float, optional
psi threshold for Population Stability Index (default is 0.0001).
"""

def __init__(
self,
features_list: List[str],
data_encoder: CfpdssDataEncoder,
window_size: int = 300,
ks_test_pval: float = 0.001,
gap_size: int = 50,
p_val_threshold: float = 0.0001,
psi_threshold: float = 0.12,
) -> None:
self._data_encoder = data_encoder
self._drift_records: List[Dict[str, str]] = []
self._valid_keys: set[str] = get_fd_drift_type_keys()
self.window_size: int = window_size
self.gap_size: int = gap_size
self._window: Deque[Dict[str, float]] = deque(maxlen=window_size)
self._drift_timepoints: List[int] = []
self._moving_avg: pd.DataFrame = pd.DataFrame(columns=features_list)
self._moving_avg: pd.DataFrame = pd.DataFrame(
columns=self._data_encoder.X_encoded_data.columns
)
self.p_val: float = ks_test_pval
self.p_val_grad: float = p_val_threshold
self._drift_tp_df: pd.DataFrame = pd.DataFrame(columns=features_list)
self._feature_data_df: pd.DataFrame = pd.DataFrame(columns=features_list)
self.psi_threshold: float = psi_threshold
self._drift_tp_df: pd.DataFrame = pd.DataFrame(
columns=self._data_encoder.X_encoded_data.columns
)
self._feature_data_df: pd.DataFrame = pd.DataFrame(
columns=self._data_encoder.X_encoded_data.columns
)

def update(self, x_i: Dict[str, float], y_i: int, tpt: int) -> None:
"""
Expand Down Expand Up @@ -78,9 +90,19 @@ def detect_drift(self, tpt: int) -> None:
"""
window_df = pd.DataFrame(self._window)
for feature in window_df.columns:
drift_detected, drift_type = self._detect_drift_using_ks(
window_df[feature].values
)
if feature in self._data_encoder.categorical_column_mapping.values():
drift_detected, drift_type = self._detect_drift_using_psi(
window_df[feature].values
)
elif feature in self._data_encoder.numerical_column_mapping.values():
drift_detected, drift_type = self._detect_drift_using_ks(
window_df[feature].values
)
else:
raise ValueError(
f"Feature {feature} not supported (neither in categorical or numerical mapping of the "
f"encoder)."
)
if drift_detected:
self._drift_tp_df.loc[tpt, feature] = drift_type

Expand Down Expand Up @@ -125,6 +147,74 @@ def _detect_drift_using_ks(

return False, None

def _detect_drift_using_psi(
self, window_data: np.ndarray
) -> Tuple[bool, Optional[str]]:
"""
Detect drift using the Population Stability Index (PSI).
Parameters
----------
window_data : np.ndarray
Array of feature values in the current window.
Returns
-------
Tuple[bool, Optional[str]]
A tuple indicating whether drift was detected and the type of drift.
"""
first_half = window_data[: self.window_size // 2]
second_half = window_data[self.window_size // 2 :]

grad_first_part = window_data[: (self.window_size // 2) - (self.gap_size // 2)]
grad_second_part = window_data[(self.window_size // 2) + (self.gap_size // 2) :]

psi_value = self.calculate_psi(first_half, second_half)
grad_psi_value = self.calculate_psi(grad_first_part, grad_second_part)

if psi_value > self.psi_threshold:
mean_diff = np.mean(second_half) - np.mean(first_half)
if np.abs(mean_diff) > np.std(window_data):
return True, "sudden_drift"
elif mean_diff > 0:
return True, "linear_drift"

if grad_psi_value > self.psi_threshold:
return True, "gradual_drift"

return False, None

def calculate_psi(self, expected, actual, buckets=10):
"""
Calculate the Population Stability Index (PSI) between two distributions.
Parameters
----------
expected : np.ndarray
The expected distribution (first half of the window data).
actual : np.ndarray
The actual distribution (second half of the window data).
buckets : int, optional
Number of buckets to divide the distributions into (default is 10).
Returns
-------
float
The PSI value.
"""
expected_percents = np.histogram(expected, bins=buckets, range=(0, 1))[0] / len(
expected
)
actual_percents = np.histogram(actual, bins=buckets, range=(0, 1))[0] / len(
actual
)
expected_percents = np.where(expected_percents == 0, 0.01, expected_percents)
actual_percents = np.where(actual_percents == 0, 0.01, actual_percents)
psi_values = (actual_percents - expected_percents) * np.log(
actual_percents / expected_percents
)
return np.sum(psi_values)

def plot(self, feature_name: str, window_size: Optional[int] = None) -> None:
"""
Plot the feature values over time, highlighting detected drift points.
Expand All @@ -139,6 +229,18 @@ def plot(self, feature_name: str, window_size: Optional[int] = None) -> None:
if window_size is None:
window_size = self.window_size

if feature_name in self._data_encoder.X_encoded_data.columns:
# if user provides feature name according to encoded data
pass

# if user provides feature name in original dataset
elif feature_name in self._data_encoder.original_categorical_cols:
feature_name = self._data_encoder.categorical_column_mapping[feature_name]
elif feature_name in self._data_encoder.numerical_column_mapping:
feature_name = self._data_encoder.numerical_column_mapping[feature_name]
else:
raise ValueError(f"Feature {feature_name} not recognized")

plt.figure(figsize=(10, 6))

drift_type_temp_label = []
Expand Down Expand Up @@ -180,7 +282,7 @@ def plot(self, feature_name: str, window_size: Optional[int] = None) -> None:
plt.ylabel(f"{feature_name}")
plt.grid(True)
plt.xticks(np.arange(0, len(feature_data), 1000))
plt.legend()
plt.legend(loc="upper right") # Position legend at the top right
plt.show()

@property
Expand Down Expand Up @@ -228,40 +330,17 @@ def drift_records(self, drift_record: FeatureDriftType) -> None:
normal.read_csv_data(_NORMAL_DATA_PATH)
normal.encode_data()

# As the KS test is only for numerical features
X_numerical = normal.X_encoded_data[normal.original_numerical_cols]
# X_categorical = normal.X_encoded_data[normal.original_categorical_cols]
dt_streamer = DataStreamer(
fd_detector_obj=FeatureDriftDetector(X_numerical.columns)
)
dt_streamer.stream_data(X_df=X_numerical, y_df=normal.y_encoded_data)

dt_streamer.fd_detector_obj.plot(feature_name=X_numerical.columns[0])

# ----- Test: Feature Drift Detection for numerical variables on Dummy drift data -----
# features_list = ["n_feature_1", "n_feature_2"]
# drift_detector = FeatureDriftDetector(
# features_list=features_list, window_size=100, ks_test_pval=0.001
# )
#
# # Generate data for 3 distributions for each feature
# random_state = np.random.RandomState(seed=42)
# dist_a_f1 = random_state.normal(0.8, 0.05, 1000)
# dist_b_f1 = random_state.normal(0.4, 0.02, 1000)
# dist_c_f1 = random_state.normal(0.6, 0.1, 1000)
#
# dist_a_f2 = random_state.normal(0.3, 0.04, 1000)
# dist_b_f2 = random_state.normal(0.7, 0.03, 1000)
# dist_c_f2 = random_state.normal(0.5, 0.05, 1000)
#
# # Concatenate data to simulate a data stream with 2 drifts for each feature
# stream_f1 = np.concatenate((dist_a_f1, dist_b_f1, dist_c_f1))
# stream_f2 = np.concatenate((dist_a_f2, dist_b_f2, dist_c_f2))
#
# # Simulate streaming data update
# for i, (val_f1, val_f2) in enumerate(zip(stream_f1, stream_f2)):
# x_i = {"n_feature_1": val_f1, "n_feature_2": val_f2}
# drift_detector.update(x_i, 1, i)
#
# drift_detector._drift_tp_df.head()
# drift_detector._moving_avg_df.head()
fd_detector = FeatureDriftDetector(data_encoder=normal)

dt_streamer = DataStreamer(fd_detector_obj=fd_detector)
dt_streamer.stream_data(X_df=normal.X_encoded_data, y_df=normal.y_encoded_data)

# Plot feature drift for a numerical features
dt_streamer.fd_detector_obj.plot(feature_name="n0")

# Plot feature drift for a categorical features
dt_streamer.fd_detector_obj.plot(feature_name="c6")

# dt = FeatureDriftDetector(fd_detector_obj=normal)
# dt.plot("n0")
# dt.plot("c5_b")
Loading

0 comments on commit 6ccf351

Please sign in to comment.