Commit

Added: PSI

shreeya-cy committed Jul 29, 2024
1 parent c20e6be commit 12d4745
Showing 1 changed file with 213 additions and 77 deletions.
stream_viz/feature_drift/f_drift_detector.py (290 changes: 213 additions & 77 deletions)
@@ -11,93 +11,219 @@


class FeatureDriftDetector(DriftDetector):
"""
Class for detecting feature drift in streaming data, using the Kolmogorov-Smirnov test for numerical features and the Population Stability Index (PSI) for categorical features.

Parameters
----------
features_list : List[str]
List of feature names to monitor for drift.
categorical_features : List[str]
List of categorical feature names; these are monitored with PSI.
window_size : int, optional
Size of the window to use for drift detection (default is 300).
ks_test_pval : float, optional
P-value threshold for the Kolmogorov-Smirnov test (default is 0.001).
gap_size : int, optional
Size of the gap between segments when computing gradual drift (default is 50).
p_val_threshold : float, optional
P-value threshold for gradual drift detection (default is 0.0001).
psi_threshold : float, optional
Threshold for the Population Stability Index (default is 0.12).
"""

def __init__(
self,
features_list: List[str],
categorical_features: List[str],
window_size: int = 300,
ks_test_pval: float = 0.001,
gap_size: int = 50,
p_val_threshold: float = 0.0001,
psi_threshold: float = 0.12,
) -> None:
self._drift_records: List[Dict[str, str]] = []
self._valid_keys: set[str] = get_fd_drift_type_keys()
self.window_size: int = window_size
self.gap_size: int = gap_size
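# Sliding window that keeps only the most recent `window_size` data points.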
self._window: Deque[Dict[str, float]] = deque(maxlen=window_size)
self._drift_timepoints: List[int] = []
self._moving_avg: pd.DataFrame = pd.DataFrame(columns=features_list)
self._ks_test_pval: float = ks_test_pval
self.p_val: float = ks_test_pval
self.p_val_grad: float = p_val_threshold
self.psi_threshold: float = psi_threshold
self._drift_tp_df: pd.DataFrame = pd.DataFrame(columns=features_list)
self._feature_data_df: pd.DataFrame = pd.DataFrame(columns=features_list)
self.categorical_features: List[str] = categorical_features

def update(self, x_i: Dict[str, float], y_i: int, tpt: int) -> None:
"""
Update the feature drift detector with a new data point and detect drift if the window is full.

Parameters
----------
x_i : Dict[str, float]
Dictionary of feature values at the current time point.
y_i : int
Target value at the current time point.
tpt : int
Current time point.
"""
self._window.append(x_i)
self._feature_data_df.loc[tpt] = x_i

if len(self._window) == self.window_size:
self.detect_drift(tpt)

def detect_drift(self, tpt: int) -> None:
"""
Detect drift in the current window of data.

Parameters
----------
tpt : int
Current time point.
"""
window_df = pd.DataFrame(self._window)
for feature in window_df.columns:
if feature in self.categorical_features:
drift_detected, drift_type = self._detect_drift_using_psi(
window_df[feature].values
)
else:
drift_detected, drift_type = self._detect_drift_using_ks(
window_df[feature].values
)
if drift_detected:
self._drift_tp_df.loc[tpt, feature] = drift_type

self._moving_avg.loc[tpt, feature] = window_df[feature].mean()

def _detect_drift_using_ks(
self, window_data: np.ndarray
) -> Tuple[bool, Optional[str]]:
"""
Detect drift in a numerical feature using the Kolmogorov-Smirnov test.

Parameters
----------
window_data : np.ndarray
Array of feature values in the current window.

Returns
-------
Tuple[bool, Optional[str]]
A tuple indicating whether drift was detected and the type of drift.
"""
first_half = window_data[: self.window_size // 2]
second_half = window_data[self.window_size // 2 :]

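# For gradual drift, compare two segments with the middle `gap_size` points left out, so a slow transition between the halves is easier to detect.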
grad_first_part = window_data[: (self.window_size // 2) - (self.gap_size // 2)]
grad_second_part = window_data[(self.window_size // 2) + (self.gap_size // 2) :]

ks_stat, p_value = ks_2samp(first_half, second_half)
grad_ks_stat, grad_p_value = ks_2samp(grad_first_part, grad_second_part)

if p_value < self.p_val:
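# Classify the change: a mean shift larger than one standard deviation of the window is treated as sudden drift, a smaller positive shift as linear drift.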
mean_diff = np.mean(second_half) - np.mean(first_half)
if np.abs(mean_diff) > np.std(window_data):
return True, "sudden_drift"
elif mean_diff > 0:
return True, "linear_drift"
# else:
# return True, "gradual_drift"

if grad_p_value < self.p_val_grad:
return True, "gradual_drift"

return False, None

def _detect_drift_using_psi(
self, window_data: np.ndarray
) -> Tuple[bool, Optional[str]]:
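"""
Detect drift in a categorical feature using the Population Stability Index (PSI).

Parameters
----------
window_data : np.ndarray
Array of encoded feature values in the current window.

Returns
-------
Tuple[bool, Optional[str]]
A tuple indicating whether drift was detected and the type of drift.
"""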
first_half = window_data[: self.window_size // 2]
second_half = window_data[self.window_size // 2 :]

grad_first_part = window_data[: (self.window_size // 2) - (self.gap_size // 2)]
grad_second_part = window_data[(self.window_size // 2) + (self.gap_size // 2) :]

psi_value = self.calculate_psi(first_half, second_half)
grad_psi_value = self.calculate_psi(grad_first_part, grad_second_part)

if psi_value > self.psi_threshold:
mean_diff = np.mean(second_half) - np.mean(first_half)
if np.abs(mean_diff) > np.std(window_data):
return True, "sudden_drift"
elif mean_diff > 0:
return True, "linear_drift"

if grad_psi_value > self.psi_threshold:
return True, "gradual_drift"

return False, None

def calculate_psi(self, expected, actual, buckets=10):
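"""
Compute the Population Stability Index between an expected and an actual sample.

Values are bucketed into `buckets` histogram bins over the range (0, 1), so inputs are assumed to be encoded/scaled into that range; empty bins are floored at 0.01 to avoid division by zero and log of zero.
"""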
expected_percents = np.histogram(expected, bins=buckets, range=(0, 1))[0] / len(
expected
)
actual_percents = np.histogram(actual, bins=buckets, range=(0, 1))[0] / len(
actual
)
expected_percents = np.where(expected_percents == 0, 0.01, expected_percents)
actual_percents = np.where(actual_percents == 0, 0.01, actual_percents)
psi_values = (actual_percents - expected_percents) * np.log(
actual_percents / expected_percents
)
return np.sum(psi_values)

def plot(self, feature_name: str, window_size: Optional[int] = None) -> None:
"""
Plot the feature values over time, highlighting detected drift points.
# plt.plot(moving_avg.index, moving_avg, color='orange', linestyle='-', label=f'{feature} Moving Mean with trimming')
Parameters
----------
feature_name : str
The name of the feature to plot.
window_size : Optional[int], optional
Size of the window for calculating moving average (default is None, uses instance's window_size).
"""
if window_size is None:
window_size = self.window_size

plt.figure(figsize=(10, 6))

drift_type_temp_label = []
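# Mark each detected drift point with a vertical line, colour-coded by drift type; each drift type is labelled in the legend only once.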
for idx, drift_type in self._drift_tp_df[feature_name].dropna().items():
if drift_type == "sudden_drift":
color = "red"
elif drift_type == "linear_drift":
color = "orange"
elif drift_type == "gradual_drift":
color = "blue"
else:
color = "yellow"

plt.axvline(
x=idx,
color=color,
linestyle="--",
label=(
f"{drift_type}" if drift_type not in drift_type_temp_label else ""
),
alpha=0.5,
)
drift_type_temp_label.append(drift_type)

feature_data = self._feature_data_df[feature_name]
plt.scatter(feature_data.index, feature_data, marker="o", s=2)

moving_mean = feature_data.rolling(window=window_size).mean()
plt.plot(
feature_data.index,
moving_mean,
color="black",
linestyle="-",
label=f"{feature_name} Moving Mean",
)

plt.title(f"{feature_name} vs. Time")
plt.xlabel("Time")
plt.ylabel(f"{feature_name}")
@@ -108,10 +234,31 @@ def plot(self, feature_name, window_size=None):

@property
def drift_records(self) -> List[FeatureDriftType]:
"""
Property to get drift records.
Returns
-------
List[FeatureDriftType]
List of detected drift records.
"""
return self._drift_records

@drift_records.setter
def drift_records(self, drift_record: FeatureDriftType) -> None:
"""
Property setter to add a drift record if valid.
Parameters
----------
drift_record : FeatureDriftType
A dictionary representing a drift record.
Raises
------
ValueError
If the drift record is invalid.
"""
if isinstance(drift_record, dict) and self._validate_drift(drift_record):
self._drift_records.append(drift_record)
else:
@@ -124,52 +271,41 @@ def drift_records(self, drift_record: FeatureDriftType):
NormalDataEncoder,
)
from stream_viz.data_streamer import DataStreamer
from stream_viz.utils.constants import _NORMAL_DATA_PATH

normal = NormalDataEncoder()
normal.read_csv_data(_NORMAL_DATA_PATH)
normal.encode_data()

# Create a mapping of original to encoded column names
encoded_categorical_cols = normal.X_encoded_data.columns[
normal.X_encoded_data.columns.str.startswith("c")
]
original_to_encoded_categorical_cols = {
original: encoded
for original, encoded in zip(
normal.original_categorical_cols, encoded_categorical_cols
)
}

# As the KS test is only for numerical features
X_numerical = normal.X_encoded_data[normal.original_numerical_cols]

X_categorical = normal.X_encoded_data[encoded_categorical_cols]
all_features = X_numerical.columns.tolist() + X_categorical.columns.tolist()
fd_detector = FeatureDriftDetector(
features_list=all_features,
categorical_features=encoded_categorical_cols.tolist(),
)

dt_streamer = DataStreamer(fd_detector_obj=fd_detector)
dt_streamer.stream_data(X_df=normal.X_encoded_data, y_df=normal.y_encoded_data)

# Plot feature drift for a numerical feature
dt_streamer.fd_detector_obj.plot(feature_name=X_numerical.columns[0])

# Plot feature drift for a categorical feature
dt_streamer.fd_detector_obj.plot(feature_name=X_categorical.columns[0])

# dt = FeatureDriftDetector(fd_detector_obj=normal)
# dt.plot("n0")
# dt.plot("c5_b")

