diff --git a/stream_viz/feature_drift/f_drift_detector.py b/stream_viz/feature_drift/f_drift_detector.py
index 643f19c..ad24b36 100644
--- a/stream_viz/feature_drift/f_drift_detector.py
+++ b/stream_viz/feature_drift/f_drift_detector.py
@@ -11,83 +11,196 @@ class FeatureDriftDetector(DriftDetector):
-    def __init__(self, features_list: List[str], window_size=100, ks_test_pval=0.001):
+    """
+    Class for detecting feature drift in streaming data, using the Kolmogorov-Smirnov
+    test for numerical features and the Population Stability Index (PSI) for
+    categorical features.
+
+    Parameters
+    ----------
+    features_list : List[str]
+        List of feature names to monitor for drift.
+    categorical_features : List[str]
+        List of categorical feature names, monitored with the PSI test.
+    window_size : int, optional
+        Size of the window to use for drift detection (default is 300).
+    ks_test_pval : float, optional
+        P-value threshold for the Kolmogorov-Smirnov test (default is 0.001).
+    gap_size : int, optional
+        Size of the gap between segments when computing gradual drift (default is 50).
+    p_val_threshold : float, optional
+        P-value threshold for gradual drift detection (default is 0.0001).
+    psi_threshold : float, optional
+        PSI threshold for the Population Stability Index test (default is 0.12).
+    """
+
+    def __init__(
+        self,
+        features_list: List[str],
+        categorical_features: List[str],
+        window_size: int = 300,
+        ks_test_pval: float = 0.001,
+        gap_size: int = 50,
+        p_val_threshold: float = 0.0001,
+        psi_threshold: float = 0.12,
+    ) -> None:
         self._drift_records: List[Dict[str, str]] = []
         self._valid_keys: set[str] = get_fd_drift_type_keys()
         self.window_size: int = window_size
+        self.gap_size: int = gap_size
         self._window: Deque[Dict[str, float]] = deque(maxlen=window_size)
         self._drift_timepoints: List[int] = []
         self._moving_avg: pd.DataFrame = pd.DataFrame(columns=features_list)
-        self._ks_test_pval: float = ks_test_pval
+        self.p_val: float = ks_test_pval
+        self.p_val_grad: float = p_val_threshold
+        self.psi_threshold: float = psi_threshold
         self._drift_tp_df: pd.DataFrame = pd.DataFrame(columns=features_list)
         self._feature_data_df: pd.DataFrame = pd.DataFrame(columns=features_list)
+        self.categorical_features: List[str] = categorical_features
+
+    def update(self, x_i: Dict[str, float], y_i: int, tpt: int) -> None:
+        """
+        Update the feature drift detector with a new data point and detect drift once the window is full.

-    def update(self, x_i: Dict[str, float], y_i: int, tpt: int):
+        Parameters
+        ----------
+        x_i : Dict[str, float]
+            Dictionary of feature values at the current time point.
+        y_i : int
+            Target value at the current time point.
+        tpt : int
+            Current time point.
+        """
         self._window.append(x_i)
         self._feature_data_df.loc[tpt] = x_i

         if len(self._window) == self.window_size:
             self.detect_drift(tpt)

-    def detect_drift(self, tpt: int):
+    def detect_drift(self, tpt: int) -> None:
+        """
+        Detect drift in the current window of data.
+
+        Parameters
+        ----------
+        tpt : int
+            Current time point.
+ """ window_df = pd.DataFrame(self._window) for feature in window_df.columns: - drift_detected, drift_type = self._detect_drift_using_ks( - window_df[feature].values, self.window_size, self._ks_test_pval - ) + if feature in self.categorical_features: + drift_detected, drift_type = self._detect_drift_using_psi( + window_df[feature].values + ) + else: + drift_detected, drift_type = self._detect_drift_using_ks( + window_df[feature].values + ) if drift_detected: self._drift_tp_df.loc[tpt, feature] = drift_type self._moving_avg.loc[tpt, feature] = window_df[feature].mean() - @staticmethod def _detect_drift_using_ks( - window_data: np.ndarray, win_size: int, p_val: float + self, window_data: np.ndarray ) -> Tuple[bool, Optional[str]]: - first_half = window_data[: win_size // 2] - second_half = window_data[win_size // 2 :] + """ + Detect drift using the Kolmogorov-Smirnov test. + + Parameters + ---------- + window_data : np.ndarray + Array of feature values in the current window. + + Returns + ------- + Tuple[bool, Optional[str]] + A tuple indicating whether drift was detected and the type of drift. + """ + first_half = window_data[: self.window_size // 2] + second_half = window_data[self.window_size // 2 :] + + grad_first_part = window_data[: (self.window_size // 2) - (self.gap_size // 2)] + grad_second_part = window_data[(self.window_size // 2) + (self.gap_size // 2) :] ks_stat, p_value = ks_2samp(first_half, second_half) - if p_value < p_val: + grad_ks_stat, grad_p_value = ks_2samp(grad_first_part, grad_second_part) + + if p_value < self.p_val: mean_diff = np.mean(second_half) - np.mean(first_half) if np.abs(mean_diff) > np.std(window_data): return True, "sudden_drift" elif mean_diff > 0: return True, "linear_drift" - else: - return True, "gradual_drift" + # else: + # return True, "gradual_drift" + + if grad_p_value < self.p_val_grad: + return True, "gradual_drift" return False, None - def plot(self, feature_name, window_size=None): - if window_size is None: - window_size = self.window_size - feature_data = self._feature_data_df[feature_name] - plt.figure(figsize=(10, 6)) - plt.scatter(feature_data.index, feature_data, marker="o", s=2) + def _detect_drift_using_psi( + self, window_data: np.ndarray + ) -> Tuple[bool, Optional[str]]: + first_half = window_data[: self.window_size // 2] + second_half = window_data[self.window_size // 2 :] - moving_mean = feature_data.rolling(window=window_size).mean() - plt.plot( - feature_data.index, - moving_mean, - color="black", - linestyle="-", - label=f"{feature_name} Moving Mean", + grad_first_part = window_data[: (self.window_size // 2) - (self.gap_size // 2)] + grad_second_part = window_data[(self.window_size // 2) + (self.gap_size // 2) :] + + psi_value = self.calculate_psi(first_half, second_half) + grad_psi_value = self.calculate_psi(grad_first_part, grad_second_part) + + if psi_value > self.psi_threshold: + mean_diff = np.mean(second_half) - np.mean(first_half) + if np.abs(mean_diff) > np.std(window_data): + return True, "sudden_drift" + elif mean_diff > 0: + return True, "linear_drift" + + if grad_psi_value > self.psi_threshold: + return True, "gradual_drift" + + return False, None + + def calculate_psi(self, expected, actual, buckets=10): + expected_percents = np.histogram(expected, bins=buckets, range=(0, 1))[0] / len( + expected + ) + actual_percents = np.histogram(actual, bins=buckets, range=(0, 1))[0] / len( + actual + ) + expected_percents = np.where(expected_percents == 0, 0.01, expected_percents) + actual_percents = np.where(actual_percents 
+        psi_values = (actual_percents - expected_percents) * np.log(
+            actual_percents / expected_percents
         )
+        return np.sum(psi_values)

-        # drift_points, drift_types, moving_avg = self.detect_feature_drift(
-        #     feature_data, window_size, 3
-        # )
+    def plot(self, feature_name: str, window_size: Optional[int] = None) -> None:
+        """
+        Plot the feature values over time, highlighting detected drift points.

-        # plt.plot(moving_avg.index, moving_avg, color='orange', linestyle='-', label=f'{feature} Moving Mean with trimming')
+        Parameters
+        ----------
+        feature_name : str
+            The name of the feature to plot.
+        window_size : Optional[int], optional
+            Size of the window used for the moving average (default is None, which uses the instance's window_size).
+        """
+        if window_size is None:
+            window_size = self.window_size
+
+        plt.figure(figsize=(10, 6))
         drift_type_temp_label = []
-        for idx, drift_type in self._drift_tp_df[feature_name].items():
-            color = (
-                "red"
-                if drift_type == "Sudden Drift"
-                else "orange" if drift_type == "Linear Drift" else "blue"
-            )
+        for idx, drift_type in self._drift_tp_df[feature_name].dropna().items():
+            if drift_type == "sudden_drift":
+                color = "red"
+            elif drift_type == "linear_drift":
+                color = "orange"
+            elif drift_type == "gradual_drift":
+                color = "blue"
+            else:
+                color = "yellow"
+
+            plt.axvline(
                 x=idx,
                 color=color,
@@ -95,9 +208,22 @@ def plot(self, feature_name, window_size=None):
                 label=(
                     f"{drift_type}" if drift_type not in drift_type_temp_label else ""
                 ),
+                alpha=0.5,
             )
             drift_type_temp_label.append(drift_type)

+        feature_data = self._feature_data_df[feature_name]
+        plt.scatter(feature_data.index, feature_data, marker="o", s=2)
+
+        moving_mean = feature_data.rolling(window=window_size).mean()
+        plt.plot(
+            feature_data.index,
+            moving_mean,
+            color="black",
+            linestyle="-",
+            label=f"{feature_name} Moving Mean",
+        )
+
         plt.title(f"{feature_name} vs. Time")
         plt.xlabel("Time")
         plt.ylabel(f"{feature_name}")
@@ -108,10 +234,31 @@ def plot(self, feature_name, window_size=None):
     @property
     def drift_records(self) -> List[FeatureDriftType]:
+        """
+        Property to get drift records.
+
+        Returns
+        -------
+        List[FeatureDriftType]
+            List of detected drift records.
+        """
         return self._drift_records

     @drift_records.setter
-    def drift_records(self, drift_record: FeatureDriftType):
+    def drift_records(self, drift_record: FeatureDriftType) -> None:
+        """
+        Property setter to add a drift record if valid.
+
+        Parameters
+        ----------
+        drift_record : FeatureDriftType
+            A dictionary representing a drift record.
+
+        Raises
+        ------
+        ValueError
+            If the drift record is invalid.
+ """ if isinstance(drift_record, dict) and self._validate_drift(drift_record): self._drift_records.append(drift_record) else: @@ -124,52 +271,41 @@ def drift_records(self, drift_record: FeatureDriftType): NormalDataEncoder, ) from stream_viz.data_streamer import DataStreamer - from stream_viz.utils.constants import _MISSING_DATA_PATH, _NORMAL_DATA_PATH - - # Cfpdss data encoding with missing values - # missing = MissingDataEncoder() - # missing.read_csv_data( - # filepath_or_buffer=_MISSING_DATA_PATH, - # index_col=[0], - # ) + from stream_viz.utils.constants import _NORMAL_DATA_PATH + normal = NormalDataEncoder() normal.read_csv_data(_NORMAL_DATA_PATH) normal.encode_data() + # Create a mapping of original to encoded column names + encoded_categorical_cols = normal.X_encoded_data.columns[ + normal.X_encoded_data.columns.str.startswith("c") + ] + original_to_encoded_categorical_cols = { + original: encoded + for original, encoded in zip( + normal.original_categorical_cols, encoded_categorical_cols + ) + } + # As the KS test is only for numerical features X_numerical = normal.X_encoded_data[normal.original_numerical_cols] - - dt_streamer = DataStreamer( - fd_detector_obj=FeatureDriftDetector(X_numerical.columns) + X_categorical = normal.X_encoded_data[encoded_categorical_cols] + all_features = X_numerical.columns.tolist() + X_categorical.columns.tolist() + fd_detector = FeatureDriftDetector( + features_list=all_features, + categorical_features=encoded_categorical_cols.tolist(), ) - dt_streamer.stream_data(X_df=X_numerical, y_df=normal.y_encoded_data) + dt_streamer = DataStreamer(fd_detector_obj=fd_detector) + dt_streamer.stream_data(X_df=normal.X_encoded_data, y_df=normal.y_encoded_data) + + # Plot feature drift for a numerical features dt_streamer.fd_detector_obj.plot(feature_name=X_numerical.columns[0]) - # ----- Test: Feature Drift Detection for numerical variables on Dummy drift data ----- - # features_list = ["n_feature_1", "n_feature_2"] - # drift_detector = FeatureDriftDetector( - # features_list=features_list, window_size=100, ks_test_pval=0.001 - # ) - # - # # Generate data for 3 distributions for each feature - # random_state = np.random.RandomState(seed=42) - # dist_a_f1 = random_state.normal(0.8, 0.05, 1000) - # dist_b_f1 = random_state.normal(0.4, 0.02, 1000) - # dist_c_f1 = random_state.normal(0.6, 0.1, 1000) - # - # dist_a_f2 = random_state.normal(0.3, 0.04, 1000) - # dist_b_f2 = random_state.normal(0.7, 0.03, 1000) - # dist_c_f2 = random_state.normal(0.5, 0.05, 1000) - # - # # Concatenate data to simulate a data stream with 2 drifts for each feature - # stream_f1 = np.concatenate((dist_a_f1, dist_b_f1, dist_c_f1)) - # stream_f2 = np.concatenate((dist_a_f2, dist_b_f2, dist_c_f2)) - # - # # Simulate streaming data update - # for i, (val_f1, val_f2) in enumerate(zip(stream_f1, stream_f2)): - # x_i = {"n_feature_1": val_f1, "n_feature_2": val_f2} - # drift_detector.update(x_i, 1, i) - # - # drift_detector._drift_tp_df.head() - # drift_detector._moving_avg_df.head() + # Plot feature drift for a categorical features + dt_streamer.fd_detector_obj.plot(feature_name=X_categorical.columns[0]) + + # dt = FeatureDriftDetector(fd_detector_obj=normal) + # dt.plot("n0") + # dt.plot("c5_b")