Skip to content

Commit

Permalink
add experimental paralellisation to esst
Browse files Browse the repository at this point in the history
  • Loading branch information
Lucas Weber authored and Lucas Weber committed Nov 23, 2023
1 parent bc1d7e8 commit 21856c0
Showing 1 changed file with 51 additions and 9 deletions.
60 changes: 51 additions & 9 deletions changepoynt/algorithms/esst.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import matplotlib.pyplot as plt
import numpy as np
from typing import Callable
from functools import partial
from changepoynt.algorithms.base_algorithm import Algorithm
from changepoynt.utils import normalization
from changepoynt.utils import linalg as lg
import fbpca
import multiprocessing as mp


class ESST(Algorithm):
Expand All @@ -15,7 +17,7 @@ class ESST(Algorithm):
"""

def __init__(self, window_length: int, n_windows: int = None, lag: int = None, rank: int = 5, scoring_step: int = 1,
scale: bool = True) -> None:
scale: bool = True, parallel=False) -> None:
"""
We initialize the matrix profile and the subsequent floss using only the window length used for comarisons.
Expand All @@ -29,6 +31,7 @@ def __init__(self, window_length: int, n_windows: int = None, lag: int = None, r
self.scale = scale
self.lag = lag
self.scoring_step = scoring_step
self.parallel = parallel

# set some default values when they have not been specified
if self.n_windows is None:
Expand Down Expand Up @@ -66,8 +69,12 @@ def transform(self, time_series: np.ndarray) -> np.ndarray:
else:
time_series = time_series.copy()

return _transform(time_series, starting_point, self.window_length, self.n_windows, self.lag, self.scoring_step,
self.methods['rsvd'])
if self.parallel:
return _transform_parallel(time_series, starting_point, self.window_length, self.n_windows, self.lag,
self.scoring_step, self.methods['rsvd'])
else:
return _transform(time_series, starting_point, self.window_length, self.n_windows, self.lag,
self.scoring_step, self.methods['rsvd'])


def _transform(time_series: np.ndarray, start_idx: int, window_length: int, n_windows: int, lag: int, scoring_step: int,
Expand Down Expand Up @@ -104,6 +111,41 @@ def _transform(time_series: np.ndarray, start_idx: int, window_length: int, n_wi
return score


def _transform_parallel(time_series: np.ndarray, start_idx: int, window_length: int, n_windows: int, lag: int,
scoring_step: int, scoring_function: Callable) -> np.ndarray:
"""
Compute heavy and hopefully jit compilable score computation for the SST method. It does not do any parameter
checking and can throw cryptic errors. It's only used for internal use as a private function.
:param time_series: 1D array containing the time series to be scored
:param start_idx: integer defining the start sample index for the score computation
:param window_length: the size of the time series window for each column of the hankel matrix
:param n_windows: amount of columns in the hankel matrix
"""

# initialize a scoring array with no values yet
score = np.zeros_like(time_series)

# compute the offset
offset = (n_windows + lag)

# make the generator for the hankel matrices
gener = (np.concatenate(
(lg.compile_hankel(time_series, idx-lag, window_length, n_windows),
lg.compile_hankel(time_series, idx, window_length, n_windows)
),
axis=1)
for idx in range(start_idx, time_series.shape[0], scoring_step))

# make a process pool with batches
with mp.Pool(mp.cpu_count()) as pp:
for idx, result in enumerate(pp.imap(scoring_function, gener, chunksize=100)):
idx = start_idx + idx*scoring_step
score[idx - offset - scoring_step // 2:idx - offset + (scoring_step + 1) // 2] = result

return score


def skewness(pdf_matrix: np.ndarray, x: np.ndarray) -> np.ndarray:
"""
Compute the skewness of a matrix of probability density functions given as a numpy array.
Expand Down Expand Up @@ -168,19 +210,19 @@ def _main():
# make synthetic step function
np.random.seed(123)
# synthetic (frequency change)
x0 = np.sin(2 * np.pi * 1 * np.linspace(0, 10, 1000))
x1 = np.sin(2 * np.pi * 2 * np.linspace(0, 10, 1000))
x2 = np.sin(2 * np.pi * 8 * np.linspace(0, 10, 1000))
x3 = np.sin(2 * np.pi * 4 * np.linspace(0, 10, 1000))
x0 = np.sin(2 * np.pi * 1 * np.linspace(0, 10, 10000))
x1 = np.sin(2 * np.pi * 2 * np.linspace(0, 10, 10000))
x2 = np.sin(2 * np.pi * 8 * np.linspace(0, 10, 10000))
x3 = np.sin(2 * np.pi * 4 * np.linspace(0, 10, 10000))
x = np.hstack([x0, x1, x2, x3])
x += np.random.rand(x.size)

# create the method
esst_recognizer = ESST(50)
esst_recognizer = ESST(50, parallel=True)

# compute the score
start = time()
score = esst_recognizer.transform(x)
score1 = esst_recognizer.transform(x)
print(f'Computation for {len(x)} signal values took {time()-start} s.')


Expand Down

0 comments on commit 21856c0

Please sign in to comment.