Skip to content

Commit

Permalink
ENH: add pupil deconvolution method, ported from pyeparse
Browse files Browse the repository at this point in the history
This basically ports the code from pyeparse, with slight updates to work with mne and match mne and modern python conventions.
  • Loading branch information
scott-huberty committed Mar 18, 2024
1 parent 538bfb1 commit 8d5d749
Show file tree
Hide file tree
Showing 7 changed files with 361 additions and 7 deletions.
3 changes: 3 additions & 0 deletions doc/api/preprocessing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ Projections:
convert_units
get_screen_visual_angle
interpolate_blinks
deconvolve
pupil_zscores
pupil_kernel

EEG referencing:

Expand Down
23 changes: 23 additions & 0 deletions doc/references.bib
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,17 @@ @article{HippEtAl2012
year = {2012}
}

@article{Hoeks1993,
author = {Hoeks, Bert and Levelt, Willem J M},
doi = {10.3758/BF03204445},
journal = {Behavior Research Methods, Instruments, & Computers},
number = {1},
pages = {16--26},
title = {Pupillary dilation as a measure of attention: a quantitative system analysis},
volume = {25},
year = {1993}
}

@article{HoldgrafEtAl2016,
author = {Holdgraf, Christopher R. and {de Heer}, Wendy and Pasley, Brian and Rieger, Jochem and Crone, Nathan and Lin, Jack J. and Knight, Robert T. and Theunissen, Frédéric E.},
doi = {10.1038/ncomms13654},
Expand Down Expand Up @@ -2153,6 +2164,18 @@ @inproceedings{StrohmeierEtAl2015
pages = {21--24}
}

@article{Wierda2012,
title = "Pupil dilation deconvolution reveals the dynamics of attention
at high temporal resolution",
author = "Wierda, Stefan M and van Rijn, Hedderik and Taatgen, Niels A and
Martens, Sander",
journal = "Proceedings of the National Academy of Sciences",
volume = 109,
number = 22,
pages = "8456--8460",
year = 2012,
}

@misc{WikipediaSI,
author = "{Wikipedia contributors}",
title = "International System of Units — {Wikipedia}{,} The Free Encyclopedia",
Expand Down
2 changes: 1 addition & 1 deletion ignore_words.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ pres
aas
vor
connec
Hoeks
hoeks
2 changes: 1 addition & 1 deletion mne/preprocessing/eyetracking/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@

from .eyetracking import set_channel_types_eyetrack, convert_units
from .calibration import Calibration, read_eyelink_calibration
from ._pupillometry import interpolate_blinks
from ._pupillometry import interpolate_blinks, deconvolve, pupil_kernel, pupil_zscores
from .utils import get_screen_visual_angle
269 changes: 268 additions & 1 deletion mne/preprocessing/eyetracking/_pupillometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,20 @@

import numpy as np

from mne import BaseEpochs
from mne._fiff.pick import _picks_to_idx
from mne.parallel import parallel_func

from ..._fiff.constants import FIFF
from ...io import BaseRaw
from ...utils import _check_preload, _validate_type, logger, warn
from ...utils import (
_check_option,
_check_preload,
_validate_type,
fill_doc,
logger,
warn,
)


def interpolate_blinks(raw, buffer=0.05, match="BAD_blink", interpolate_gaze=False):
Expand Down Expand Up @@ -115,3 +126,259 @@ def _interpolate_blinks(raw, buffer, blink_annots, interpolate_gaze):
)
else:
warn("No channels were interpolated.")


@fill_doc
def pupil_zscores(epochs, baseline=(None, 0)):
"""Get normalized pupil data.
This function normalizes pupil responses within each epoch by subtracting
the mean pupil response during a specified baseline period and then dividing
by the standard deviation of all data (across time). This may help to compare
pupil responses across epochs or participants.
Parameters
----------
epochs : instance of Epochs
The epochs with pupil channels.
%(pupil_baseline)s
Returns
-------
pupil_data : array
An array of pupil size data, shape (``n_epochs``, ``n_channels``, ``n_times``).
"""
# Code ported from https://github.com/pyeparse/pyeparse
_check_preload(epochs, "Z-score normalization")
_validate_type(epochs, BaseEpochs, "epochs")
_validate_type(baseline, (tuple, list, np.ndarray), "baseline")

pupil_picks = _picks_to_idx(epochs.info, "pupil", allow_empty=True)
if not pupil_picks.any():
raise RuntimeError("no pupil data")
if len(baseline) != 2:
raise RuntimeError("baseline must be a 2-element list")
baseline = np.array(baseline)
if baseline[0] is None:
baseline[0] = epochs.times[0]
if baseline[1] is None:
baseline[1] = epochs.times[-1]
baseline = epochs.time_as_index(baseline)
zs = epochs.get_data(pupil_picks)
std = np.nanstd(zs.flat)
bl = np.nanmean(zs[..., baseline[0] : baseline[1] + 1], axis=-1)
zs -= bl[:, np.newaxis, :]
zs /= std
return zs


@fill_doc
def deconvolve(
epochs,
spacing=0.1,
baseline=(None, 0),
bounds=None,
max_iter=500,
kernel=None,
n_jobs=1,
acc=1e-6,
method="minimize",
reg=100,
):
"""Deconvolve pupillary responses.
Parameters
----------
epochs : instance of Epochs
The epochs with pupil data to deconvolve.
spacing : float | array
Spacing of time points to use for deconvolution. Can also
be an array to directly specify time points to use.
%(pupil_baseline)s
This is passed to :func:`~mne.preprocessing.eyetracking.pupil_zscores`.
bounds : array of shape (2,) | None
Limits for deconvolution values. Can be, e.g. ``(0, np.inf)`` to
constrain to positive values. If ``None``, no bounds are used. Default is
``None``.
max_iter : int
Maximum number of iterations of minimization algorithm. Default is ``500``.
kernel : array | None
Kernel to assume when doing deconvolution. If ``None``, the
Hoeks and Levelt (1993) kernel will be used. :footcite:p:`Hoeks1993`.
%(n_jobs)s
acc : float
The requested accuracy. Lower accuracy generally means smoother
fits.
method : str
Can be ``"minimize"`` to use SLSQP or ``"inverse"`` to use
Tikhonov-regularized pseudoinverse. Default is ``"minimize"``.
reg : float
Regularization factor for pseudoinverse calculation. Only used if method is
``"inverse"``. Default is 100.
Returns
-------
fit : array
Array of fits, of shape (``n_epochs``, ``n_channels``, ``n_fit_times``).
times : array
The array of times at which points were fit.
Notes
-----
This method is adapted from:
Wierda et al., 2012, "Pupil dilation deconvolution reveals the
dynamics of attention at high temporal resolution." :footcite:p:`Wierda2012`
Our implementation does not, by default, force all weights to be
greater than zero. It also does not do first-order detrending,
which the Wierda paper discusses implementing.
References
----------
.. footbibliography::
"""
from scipy import linalg

# Code ported from https://github.com/pyeparse/pyeparse
_validate_type(spacing, (float, np.ndarray, tuple, list), "spacing")
_validate_type(bounds, (type(None), tuple, list, np.ndarray), "bounds")
_validate_type(max_iter, int, "max_iter")
_validate_type(kernel, (np.ndarray, type(None)), "kernel")
_validate_type(n_jobs, int, "n_jobs")
_validate_type(acc, float, "acc")
_validate_type(method, str, "method")
_check_option("method", method, ["minimize", "inverse"])
_validate_type(reg, (int, float), "reg")

if bounds is not None:
bounds = np.array(bounds)
if bounds.ndim != 1 or bounds.size != 2:
raise RuntimeError("bounds must be 2-element array or None")
if kernel is None:
kernel = pupil_kernel(epochs.info["sfreq"])
else:
kernel = np.array(kernel, np.float64)
if kernel.ndim != 1:
raise TypeError("kernel must be 1D")

# get the data (and make sure it exists)
pupil_data = pupil_zscores(epochs, baseline=baseline)

# set up parallel function (and check n_jobs)
parallel, p_fun, n_jobs = parallel_func(_do_deconv, n_jobs)

# figure out where the samples go
n_samp = len(epochs.times)
if not isinstance(spacing, (np.ndarray, tuple, list)):
times = np.arange(epochs.times[0], epochs.times[-1], spacing)
times = np.unique(times)
else:
times = np.asanyarray(spacing)
samples = epochs.time_as_index(times)
if len(samples) == 0:
warn("No usable samples")
return np.array([]), np.array([])

# convert bounds to slsqp representation
if bounds is not None:
bounds = np.array([bounds for _ in range(len(samples))])
else:
bounds = [] # compatible with old version of scipy

# Build the convolution matrix
conv_mat = np.zeros((n_samp, len(samples)))
for li, loc in enumerate(samples):
eidx = min(loc + len(kernel), n_samp)
conv_mat[loc:eidx, li] = kernel[: eidx - loc]

# do the fitting
if method == "inverse":
u, s, v = linalg.svd(conv_mat, full_matrices=False)
# Threshold small singular values
s[s < 1e-7 * s[0]] = 0
# Regularize non-zero singular values
s[s > 0] /= s[s > 0] ** 2 + reg
inv_conv_mat = np.dot(v.T, s[:, np.newaxis] * u.T)
fit = np.dot(pupil_data, inv_conv_mat.T)
else: # minimize
fit_fails = parallel(
p_fun(data, conv_mat, bounds, max_iter, acc)
for data in np.array_split(pupil_data, n_jobs)
)
fit = np.concatenate([f[0] for f in fit_fails])
fails = np.concatenate([f[1] for f in fit_fails])
if np.any(fails):
reasons = ", ".join(str(r) for r in np.setdiff1d(np.unique(fails), [0]))
warn(
f"{np.sum(fails != 0)} out of {len(fails)} fits "
f"did not converge (reasons: {reasons})"
)
return fit, times


def _do_deconv(pupil_data, conv_mat, bounds, max_iter, acc):
"""Parallelize deconvolution helper function."""
# Code ported from https://github.com/pyeparse/pyeparse
from scipy.optimize import fmin_slsqp

x0 = np.zeros(conv_mat.shape[1])
fit = np.empty((pupil_data.shape[0], pupil_data.shape[1], conv_mat.shape[1]))
failed = np.empty(fit.shape)
for ei, data in enumerate(pupil_data):
out = fmin_slsqp(
_score,
x0,
args=(data, conv_mat),
epsilon=1e-4,
bounds=bounds,
disp=False,
full_output=True,
iter=max_iter,
acc=acc,
)
fit[ei, :, :] = out[0]
failed[ei, :, :] = out[3]
return fit, failed


def _score(vals, x_0, conv_mat):
return np.mean((x_0 - conv_mat.dot(vals)) ** 2)


def pupil_kernel(sfreq, dur=4.0, t_max=0.930, n=10.1, s=1.0):
"""Generate pupil response kernel modeled as an Erlang gamma function.
Parameters
----------
sfreq : int
Sampling frequency (samples/second) to use in generating the kernel.
dur : float
Length (in seconds) of the generated kernel. Default is ``4.0`` seconds.
t_max : float
Time (in seconds) where the response maximum is stipulated to occur. Default is
``0.930`` seconds, as in Hoeks and Levelt (1993). :footcite:p:`Hoeks1993`.
n : float
Number of negative-exponential layers in the cascade defining the
gamma function. Default is ``10.1``, as in Hoeks and Levelt (1993).
:footcite:p:`Hoeks1993`.
s : float | None
Desired value for the area under the kernel. If ``None``, no scaling is
performed. Default is ``1.0``.
Returns
-------
h : array
The generated kernel.
References
----------
.. footbibliography::
"""
# Code ported from https://github.com/pyeparse/pyeparse
n_samp = int(np.round(sfreq * dur))
t = np.arange(n_samp, dtype=float) / sfreq
h = (t**n) * np.exp(-n * t / t_max)
scal = 1.0 if s is None else float(s) / (np.sum(h) * (t[1] - t[0]))
h = scal * h
return h
Loading

0 comments on commit 8d5d749

Please sign in to comment.