import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn import model_selection
from .dar_model import DAR, extract_driver
from .utils.validation import check_is_fitted
from .utils.progress_bar import ProgressBar
from .utils.parallel import Parallel
class DARSklearn(DAR, BaseEstimator):
    """Different interface to DAR models, to use in scikit-learn's GridSearchCV

    The sampling frequency is given at construction time, and the signals
    are passed as a single ``X`` object (a MultipleArray), so that the model
    follows the ``fit(X, y)`` / ``transform(X, y)`` / ``score(X, y)``
    calling convention.

    Parameters
    ----------
    fs : float
        Sampling frequency

    max_ordar : int >= 0
        Maximum ordar over a potential cross-validation scheme.
        The log-likelihood does not use the first max_ordar points in its
        computation, to fairly compare different ordar over cross-validation.

    ordar : int >= 0
        Order of the autoregressive model (p)

    ordriv : int >= 0
        Order of the taylor expansion for sigdriv (m)

    normalize : boolean
        If True, the basis vectors are normalized to unit energy.

    ortho : boolean
        If True, the basis vectors are orthogonalized.

    center : boolean
        If True, we subtract the mean in sigin

    iter_gain : int >=0
        Maximum number of iteration in gain estimation

    eps_gain : float >= 0
        Threshold to stop iterations in gain estimation

    progress_bar : boolean
        Forwarded unchanged to the DAR constructor.

    use_driver_phase : boolean
        If True, we divide the driver by its instantaneous amplitude.

    warn_gain_estimation_failure : boolean
        Forwarded unchanged to the DAR constructor.

    Examples
    --------
    >>> from sklearn.model_selection import GridSearchCV
    >>> from pactools.grid_search import DARSklearn, MultipleArray
    >>> model = DARSklearn(fs=fs, max_ordar=30)
    >>> param_grid = {'ordar': [10, 20, 30], 'ordriv': [0, 1, 2]}
    >>> gscv = GridSearchCV(model, param_grid=param_grid)
    >>> X = MultipleArray(sigin, sigdriv, sigdriv_imag)
    >>> gscv.fit(X)
    >>> print(gscv.cv_results_)
    """

    def __init__(self, fs, max_ordar, ordar=1, ordriv=0, normalize=False,
                 ortho=True, center=True, iter_gain=10, eps_gain=1.0e-4,
                 progress_bar=False, use_driver_phase=False,
                 warn_gain_estimation_failure=False):
        # The DAR selection criterion is always disabled (criterion=None)
        # in this wrapper; every other option is forwarded as given.
        super(DARSklearn, self).__init__(
            ordar=ordar, ordriv=ordriv, criterion=None, normalize=normalize,
            ortho=ortho, center=center, iter_gain=iter_gain,
            eps_gain=eps_gain, progress_bar=progress_bar,
            use_driver_phase=use_driver_phase, max_ordar=max_ordar,
            warn_gain_estimation_failure=warn_gain_estimation_failure)
        # fs is stored on the estimator since fit/transform only receive X
        self.fs = fs

    def fit(self, X, y=None):
        """Fit the DAR model

        Parameters
        ----------
        X : MultipleArray with three signals:
            sigin : array, shape (n_epochs, n_points)
                Signal that is to be modeled
            sigdriv : array, shape (n_epochs, n_points)
                Signal that drives the model
            sigdriv_imag : array, shape (n_epochs, n_points), or None
                Second driver, containing the imaginary part of the driver
                Is not None only if extract_complex is True.

        y : None
            Not used

        Returns
        -------
        self
        """
        sigin, sigdriv, sigdriv_imag = X.to_list()
        super(DARSklearn, self).fit(sigin=sigin, sigdriv=sigdriv, fs=self.fs,
                                    sigdriv_imag=sigdriv_imag)
        return self

    def fit_transform(self, X, y=None):
        """Fit the model and transform sigin into the residuals

        Parameters
        ----------
        X : MultipleArray with three signals:
            sigin : array, shape (n_epochs, n_points)
                Signal that is to be modeled
            sigdriv : array, shape (n_epochs, n_points)
                Signal that drives the model
            sigdriv_imag : array, shape (n_epochs, n_points), or None
                Second driver, containing the imaginary part of the driver
                Is not None only if extract_complex is True.

        y : None
            Not used

        Returns
        -------
        residuals : array, shape (n_epochs, n_points)
            Residuals of sigin unexplained by the model
        """
        self.fit(X, y)
        return self.transform(X, y)

    def transform(self, X, y=None):
        """Transform sigin into the residuals unexplained by the fitted model

        Parameters
        ----------
        X : MultipleArray with three signals:
            sigin : array, shape (n_epochs, n_points)
                Signal that is to be modeled
            sigdriv : array, shape (n_epochs, n_points)
                Signal that drives the model
            sigdriv_imag : array, shape (n_epochs, n_points), or None
                Second driver, containing the imaginary part of the driver
                Is not None only if extract_complex is True.

        y : None
            Not used

        Returns
        -------
        residuals : array, shape (n_epochs, n_points)
            Residuals of sigin unexplained by the fitted model.
        """
        # 'AR_' is the attribute used here as the marker of a fitted model
        check_is_fitted(self, 'AR_')
        sigin, sigdriv, sigdriv_imag = X.to_list()
        return super(DARSklearn, self).transform(
            sigin=sigin, sigdriv=sigdriv, fs=self.fs,
            sigdriv_imag=sigdriv_imag)

    def score(self, X, y=None):
        """Difference in log-likelihood of this model and a model AR(0)

        We subtract the log-likelihood of an autoregressive model at order 0,
        in order to have a reference stable over cross-validation splits.

        Parameters
        ----------
        X : MultipleArray with three signals:
            sigin : array, shape (n_epochs, n_points)
                Signal that is to be modeled
            sigdriv : array, shape (n_epochs, n_points)
                Signal that drives the model
            sigdriv_imag : array, shape (n_epochs, n_points), or None
                Second driver, containing the imaginary part of the driver
                Is not None only if extract_complex is True.

        y : None
            Not used

        Returns
        -------
        score : float
            Difference in log-likelihood of this model and a model AR(0).
            The higher, the better.

        Raises
        ------
        ValueError
            If max_ordar is None.
        """
        # The residuals are discarded: the call is made for its effect on
        # the model state read just below (presumably it refreshes
        # self.logl and self.tmax on X -- confirm against DAR.transform).
        self.transform(X, y)
        score = self.logl * self.tmax

        max_ordar = self.max_ordar
        if max_ordar is None:
            raise ValueError(
                'max_ordar should not be None, since it biases the grid search'
                ' over ordar. Set max_ordar to the maximum value of ordar in '
                'the grid search, or bigger.')

        # remove the log likelihood of an AR(0) to better compare likelihoods
        score -= self._estimate_log_likelihood_ref(skip=max_ordar)[0]
        return score
class AddDriverDelay(BaseEstimator, TransformerMixin):
    """Transformer which adds a time delay in the driver of a DAR model

    The drivers are tapered at both ends with half of a Blackman window,
    then circularly shifted along the last axis with ``np.roll``.
    The modeled signal ``sigin`` is passed through untouched.

    Parameters
    ----------
    delay : int
        Shift, in number of samples, given to ``np.roll`` for the drivers.

    n_decay : int >= 0
        Number of samples tapered at each end of the drivers before the
        shift. With 0, no taper is applied. It should not exceed the number
        of points of the drivers.

    Examples
    --------
    >>> from sklearn.model_selection import GridSearchCV
    >>> from sklearn.pipeline import Pipeline
    >>> from pactools.grid_search import DARSklearn, AddDriverDelay
    >>> from pactools.grid_search import MultipleArray
    >>> param_grid = {
    ...    'dar__ordar': [10, 20, 30],
    ...    'dar__ordriv': [0, 1, 2],
    ...    'add__delay': [-10, 0, 10]
    ... }
    >>> model = Pipeline(steps=[
    ...     ('add', AddDriverDelay()),
    ...     ('dar', DARSklearn(fs=fs, max_ordar=30)),
    ... ])
    >>> gscv = GridSearchCV(model, param_grid=param_grid)
    >>> X = MultipleArray(sigin, sigdriv, sigdriv_imag)
    >>> gscv.fit(X)
    >>> print(gscv.cv_results_)
    """

    def __init__(self, delay=0, n_decay=30):
        self.delay = delay
        self.n_decay = n_decay

    def fit(self, X, y=None):
        """No fit is needed"""
        return self

    def transform(self, X, y=None):
        """Apply the temporal delay on sigdriv and sigdriv_imag

        Parameters
        ----------
        X : MultipleArray with three signals:
            sigin : array, shape (n_epochs, n_points)
                Signal that is to be modeled
            sigdriv : array, shape (n_epochs, n_points)
                Signal that drives the model
            sigdriv_imag : array, shape (n_epochs, n_points), or None
                Second driver, containing the imaginary part of the driver
                Is not None only if extract_complex is True.

        y : None
            Not used

        Returns
        -------
        Xt : MultipleArray with three signals:
            sigin : array, shape (n_epochs, n_points)
                Signal that is to be modeled
            sigdriv : array, shape (n_epochs, n_points)
                Signal that drives the model
            sigdriv_imag : array, shape (n_epochs, n_points), or None
                Second driver, containing the imaginary part of the driver
                Is not None only if extract_complex is True.

        Raises
        ------
        ValueError
            If n_decay is negative.
        """
        sigin, sigdriv, sigdriv_imag = X.to_list()

        delay = self.delay
        n_decay = self.n_decay
        if n_decay < 0:
            raise ValueError(
                'n_decay should be non-negative, got %r' % (n_decay, ))

        # window decay for temporal continuity after np.roll
        # (rising half of a Blackman window; mirrored for the falling edge)
        window = np.blackman(n_decay * 2 - 1)[:n_decay] if n_decay else None

        def window_and_roll(sig):
            # n_decay == 0 must skip the taper entirely: sig[..., -0:] would
            # select the whole signal, not an empty tail.
            if window is not None:
                if np.issubdtype(sig.dtype, np.inexact):
                    sig = sig.copy()  # copy to avoid modifying original
                else:
                    # integer/boolean signals cannot hold the tapered values
                    # in place; astype returns a float copy
                    sig = sig.astype(np.float64)
                sig[..., :n_decay] *= window
                sig[..., -n_decay:] *= window[::-1]
            # np.roll returns a new array, so the input is never modified
            return np.roll(sig, delay, axis=-1)

        sigdriv = window_and_roll(sigdriv)
        if sigdriv_imag is not None:
            sigdriv_imag = window_and_roll(sigdriv_imag)

        return MultipleArray(sigin, sigdriv, sigdriv_imag)
class MultipleArray(object):
    """Store a list of arrays into a single object and handle indexing for them

    This is to be used as X in GridSearchCV(DARSklearn()).fit(X)
    Some of the arrays can be None.

    Parameters
    ----------
    *signals : several ndarray or None
        Stored signals

    Examples
    --------
    >>> import numpy as np
    >>> from pactools.grid_search import MultipleArray
    >>> ma = MultipleArray(np.ones((3, 10)), None, None, np.zeros((3, 10, 2)))
    >>> print(ma)
    >>> print(ma[1:])

    >>> from sklearn.model_selection import GridSearchCV
    >>> from pactools.grid_search import DARSklearn
    >>> model = DARSklearn(fs=fs, max_ordar=30)
    >>> param_grid = {'ordar': [10, 20, 30], 'ordriv': [0, 1, 2]}
    >>> gscv = GridSearchCV(model, param_grid=param_grid)
    >>> X = MultipleArray(sigin, sigdriv, sigdriv_imag)
    >>> gscv.fit(X)
    """

    def __init__(self, *signals):
        # kept as the tuple of positional arguments, None entries included
        self.signals = signals

    @property
    def shape(self):
        """Shape of the first stored array that is not None

        Raises
        ------
        AttributeError
            If no array is stored (no signal at all, or only None).
        """
        for sig in self.signals:
            if sig is not None:
                return sig.shape
        raise AttributeError(
            'MultipleArray stores no array, so it has no shape.')

    def take(self, *args, **kwargs):
        """Call ``take`` on every stored array, keeping None entries as None

        All arguments are forwarded to the ``take`` method of each array.

        Returns
        -------
        taken : MultipleArray
            New container holding the results, in the same order.
        """
        taken = [
            sig.take(*args, **kwargs) if sig is not None else None
            for sig in self.signals
        ]
        return MultipleArray(*taken)

    def __str__(self):
        return 'MultipleArray:\n' + '\n'.join(
            [str(sig) for sig in self.signals])

    def __repr__(self):
        return 'MultipleArray:\n' + '\n'.join(
            [repr(sig) for sig in self.signals])

    def __getitem__(self, indices):
        """Index every stored array with ``indices``, keeping None as None

        Returns
        -------
        selected : MultipleArray
            New container holding the indexed arrays, in the same order.
        """
        selected = [
            sig[indices] if sig is not None else None
            for sig in self.signals
        ]
        return MultipleArray(*selected)

    def to_list(self):
        """Return the stored signals, in the order they were given"""
        return self.signals
class GridSearchCVProgressBar(model_selection.GridSearchCV):
    """Monkey patch Parallel to have a progress bar during grid search

    The patch is installed each time the parameter candidates are requested:
    ``model_selection._search.Parallel`` is replaced, module-wide, by a
    subclass which wraps the iterable of tasks into a ProgressBar.
    The previous value is not restored by this class.
    """

    def _get_param_iterator(self):
        """Return ParameterGrid instance for the given param_grid

        The candidates are materialized into a list, so that they can be
        counted to size the progress bar; this list is what is returned.
        """
        candidates = list(
            super(GridSearchCVProgressBar, self)._get_param_iterator())

        splitter = model_selection._split.check_cv(self.cv, None)
        # one task per (candidate, split) pair; 3 splits are assumed when the
        # splitter does not expose `n_splits`
        n_tasks = len(candidates) * getattr(splitter, 'n_splits', 3)

        class ParallelProgressBar(Parallel):
            def __call__(self, iterable):
                # a fresh bar for every parallel call
                bar = ProgressBar(max_value=n_tasks, title='GridSearchCV')
                return super(ParallelProgressBar, self).__call__(
                    bar(iterable))

        # Monkey patch
        model_selection._search.Parallel = ParallelProgressBar

        return candidates