Skip to content

Commit

Permalink
Added: user_guide and PSI
Browse files Browse the repository at this point in the history
  • Loading branch information
shreeya-cy committed Jul 29, 2024
1 parent 3b3c751 commit a52996f
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 309 deletions.
112 changes: 78 additions & 34 deletions stream_viz/feature_drift/f_drift_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,19 @@ class FeatureDriftDetector(DriftDetector):
Size of the gap between segments when computing gradual drift (default is 50).
p_val_threshold : float, optional
P-value threshold for gradual drift detection (default is 0.0001).
psi_threshold : float, optional
psi threshold for Population Stability Index (default is 0.0001).
"""

def __init__(
self,
features_list: List[str],
categorical_features: List[str],
window_size: int = 300,
ks_test_pval: float = 0.001,
gap_size: int = 50,
p_val_threshold: float = 0.0001,
psi_threshold: float = 0.12,
) -> None:
self._drift_records: List[Dict[str, str]] = []
self._valid_keys: set[str] = get_fd_drift_type_keys()
Expand All @@ -45,8 +49,10 @@ def __init__(
self._moving_avg: pd.DataFrame = pd.DataFrame(columns=features_list)
self.p_val: float = ks_test_pval
self.p_val_grad: float = p_val_threshold
self.psi_threshold: float = psi_threshold
self._drift_tp_df: pd.DataFrame = pd.DataFrame(columns=features_list)
self._feature_data_df: pd.DataFrame = pd.DataFrame(columns=features_list)
self.categorical_features: List[str] = categorical_features

def update(self, x_i: Dict[str, float], y_i: int, tpt: int) -> None:
"""
Expand Down Expand Up @@ -78,9 +84,14 @@ def detect_drift(self, tpt: int) -> None:
"""
window_df = pd.DataFrame(self._window)
for feature in window_df.columns:
drift_detected, drift_type = self._detect_drift_using_ks(
window_df[feature].values
)
if feature in self.categorical_features:
drift_detected, drift_type = self._detect_drift_using_psi(
window_df[feature].values
)
else:
drift_detected, drift_type = self._detect_drift_using_ks(
window_df[feature].values
)
if drift_detected:
self._drift_tp_df.loc[tpt, feature] = drift_type

Expand Down Expand Up @@ -125,6 +136,44 @@ def _detect_drift_using_ks(

return False, None

def _detect_drift_using_psi(
self, window_data: np.ndarray
) -> Tuple[bool, Optional[str]]:
first_half = window_data[: self.window_size // 2]
second_half = window_data[self.window_size // 2 :]

grad_first_part = window_data[: (self.window_size // 2) - (self.gap_size // 2)]
grad_second_part = window_data[(self.window_size // 2) + (self.gap_size // 2) :]

psi_value = self.calculate_psi(first_half, second_half)
grad_psi_value = self.calculate_psi(grad_first_part, grad_second_part)

if psi_value > self.psi_threshold:
mean_diff = np.mean(second_half) - np.mean(first_half)
if np.abs(mean_diff) > np.std(window_data):
return True, "sudden_drift"
elif mean_diff > 0:
return True, "linear_drift"

if grad_psi_value > self.psi_threshold:
return True, "gradual_drift"

return False, None

def calculate_psi(self, expected, actual, buckets=10):
expected_percents = np.histogram(expected, bins=buckets, range=(0, 1))[0] / len(
expected
)
actual_percents = np.histogram(actual, bins=buckets, range=(0, 1))[0] / len(
actual
)
expected_percents = np.where(expected_percents == 0, 0.01, expected_percents)
actual_percents = np.where(actual_percents == 0, 0.01, actual_percents)
psi_values = (actual_percents - expected_percents) * np.log(
actual_percents / expected_percents
)
return np.sum(psi_values)

def plot(self, feature_name: str, window_size: Optional[int] = None) -> None:
"""
Plot the feature values over time, highlighting detected drift points.
Expand Down Expand Up @@ -228,40 +277,35 @@ def drift_records(self, drift_record: FeatureDriftType) -> None:
normal.read_csv_data(_NORMAL_DATA_PATH)
normal.encode_data()

# Create a mapping of original to encoded column names
encoded_categorical_cols = normal.X_encoded_data.columns[
normal.X_encoded_data.columns.str.startswith("c")
]
original_to_encoded_categorical_cols = {
original: encoded
for original, encoded in zip(
normal.original_categorical_cols, encoded_categorical_cols
)
}

# As the KS test is only for numerical features
X_numerical = normal.X_encoded_data[normal.original_numerical_cols]
# X_categorical = normal.X_encoded_data[normal.original_categorical_cols]
dt_streamer = DataStreamer(
fd_detector_obj=FeatureDriftDetector(X_numerical.columns)
X_categorical = normal.X_encoded_data[encoded_categorical_cols]
all_features = X_numerical.columns.tolist() + X_categorical.columns.tolist()
fd_detector = FeatureDriftDetector(
features_list=all_features,
categorical_features=encoded_categorical_cols.tolist(),
)
dt_streamer.stream_data(X_df=X_numerical, y_df=normal.y_encoded_data)

dt_streamer = DataStreamer(fd_detector_obj=fd_detector)
dt_streamer.stream_data(X_df=normal.X_encoded_data, y_df=normal.y_encoded_data)

# Plot feature drift for a numerical features
dt_streamer.fd_detector_obj.plot(feature_name=X_numerical.columns[0])

# ----- Test: Feature Drift Detection for numerical variables on Dummy drift data -----
# features_list = ["n_feature_1", "n_feature_2"]
# drift_detector = FeatureDriftDetector(
# features_list=features_list, window_size=100, ks_test_pval=0.001
# )
#
# # Generate data for 3 distributions for each feature
# random_state = np.random.RandomState(seed=42)
# dist_a_f1 = random_state.normal(0.8, 0.05, 1000)
# dist_b_f1 = random_state.normal(0.4, 0.02, 1000)
# dist_c_f1 = random_state.normal(0.6, 0.1, 1000)
#
# dist_a_f2 = random_state.normal(0.3, 0.04, 1000)
# dist_b_f2 = random_state.normal(0.7, 0.03, 1000)
# dist_c_f2 = random_state.normal(0.5, 0.05, 1000)
#
# # Concatenate data to simulate a data stream with 2 drifts for each feature
# stream_f1 = np.concatenate((dist_a_f1, dist_b_f1, dist_c_f1))
# stream_f2 = np.concatenate((dist_a_f2, dist_b_f2, dist_c_f2))
#
# # Simulate streaming data update
# for i, (val_f1, val_f2) in enumerate(zip(stream_f1, stream_f2)):
# x_i = {"n_feature_1": val_f1, "n_feature_2": val_f2}
# drift_detector.update(x_i, 1, i)
#
# drift_detector._drift_tp_df.head()
# drift_detector._moving_avg_df.head()
# Plot feature drift for a categorical features
dt_streamer.fd_detector_obj.plot(feature_name=X_categorical.columns[0])

# dt = FeatureDriftDetector(fd_detector_obj=normal)
# dt.plot("n0")
# dt.plot("c5_b")
Loading

0 comments on commit a52996f

Please sign in to comment.