Source code for pactools.dar_model.base_lattice

from abc import ABCMeta, abstractmethod

import numpy as np
from scipy import linalg

from .base_dar import BaseDAR
from ..utils.arma import ki2ai


class BaseLattice(BaseDAR):
    __metaclass__ = ABCMeta

    def __init__(self, iter_newton=0, eps_newton=0.001, **kwargs):
        """Creates a base Lattice model with Taylor expansion

        iter_newton : maximum number of Newton-Raphson iterations
        eps_newton  : threshold to stop Newton-Raphson iterations
        """
        super(BaseLattice, self).__init__(**kwargs)
        self.iter_newton = iter_newton
        self.eps_newton = eps_newton

    def synthesis(self, sigdriv=None, sigin_init=None):
        """Create a signal from fitted model

        sigdriv      : driving signal
        sigdriv_init : shape (1, self.ordar_) previous point of the past

        """
        ordar_ = self.ordar_
        if sigdriv is None:
            sigdriv = self.sigdriv
            try:
                newbasis = self.basis_
            except AttributeError:
                newbasis = self._make_basis()
        else:
            newbasis = self._make_basis(sigdriv=sigdriv)

        n_epochs, n_points = sigdriv.shape

        if sigin_init is None:
            sigin_init = np.random.randn(1, ordar_)
        sigin_init = np.atleast_2d(sigin_init)
        if sigin_init.shape[1] < ordar_:
            raise (ValueError, "initialization of incorrect length: %d instead"
                   "of %d (self.ordar_)" % (sigin_init.shape[1], ordar_))

        # initialization
        signal = np.zeros(n_points)
        signal[:ordar_] = sigin_init[0, -ordar_:]

        # compute basis corresponding to sigdriv
        newbasis = self._make_basis(sigdriv=sigdriv)

        # synthesis of the signal
        random_sig = np.random.randn(n_epochs, n_points - ordar_)
        burn_in = np.random.randn(n_epochs, ordar_)
        sigout = self.synthesize(random_sig, burn_in=burn_in,
                                 newbasis=newbasis)

        return sigout

    # ------------------------------------------------ #
    # Functions that are added to the derived class    #
    # ------------------------------------------------ #
    def synthesize(self, random_sig, burn_in=None, newbasis=None):
        """Apply the inverse lattice filter to synthesize
        an AR signal

        random_sig : array containing the excitation starting at t=0
        burn_in    : auxiliary excitations before t=0
        newbasis   : if None, we use the basis used for fitting (self.basis_)
                    else, we use the given basis.

        returns
        -------
        sigout : array containing the synthesized signal

        """
        if newbasis is None:
            basis = self.basis_
        else:
            basis = newbasis
        # -------- select ordar (prefered: ordar_)
        ordar = self.ordar_

        # -------- prepare excitation signal and burn in phase
        if burn_in is None:
            tburn = 0
            excitation = random_sig
        else:
            tburn = burn_in.size
            excitation = np.hstack((burn_in, random_sig))

        n_basis, n_epochs, n_points = basis.shape
        n_epochs, n_points_minus_ordar_ = random_sig.shape

        # -------- allocate arrays for forward and backward residuals
        e_forward = np.zeros(n_epochs, ordar + 1)
        e_backward = np.zeros(n_epochs, ordar + 1)

        # -------- prepare parcor coefficients
        parcor_list = self._develop_parcor(self.AR_, basis)
        parcor_list = self.decode(parcor_list)

        gain = self._develop_gain(basis, squared=False, log=False)

        # -------- create the output signal
        sigout = np.zeros(n_epochs, n_points_minus_ordar_)
        for t in range(-tburn, n_points_minus_ordar_):
            e_forward[:, ordar] = excitation[:, t + tburn] * gain[0, max(t, 0)]
            for p in range(ordar, 0, -1):
                e_forward[:, p - 1] = (
                    e_forward[:, p] -
                    parcor_list[p - 1, :, max(t, 0)] * e_backward[:, p - 1])
            for p in range(ordar, 0, -1):
                e_backward[:, p] = (
                    e_backward[:, p - 1] +
                    parcor_list[p - 1, :, max(t, 0)] * e_forward[:, p - 1])
            e_backward[:, 0] = e_forward[:, 0]
            sigout[:, max(t, 0)] = e_forward[:, 0]

        return sigout

    def cell(self, parcor_list, forward_residual, backward_residual):
        """Apply a single cell of the direct lattice filter
        to whiten the original signal

        parcor_list       : array, lattice coefficients
        forward_residual  : array, forward residual from previous cell
        backward_residual : array, backward residual from previous cell

        the input arrays should have same shape as self.sigin

        returns:
        forward_residual  : forward residual after this cell
        backward_residual : backward residual after this cell

        """
        f_residual = np.copy(forward_residual)
        b_residual = np.zeros(backward_residual.shape)
        delayed = np.copy(backward_residual[:, 0:-1])

        # -------- apply the cell
        b_residual[:, 1:] = delayed + (parcor_list[:, 1:] * f_residual[:, 1:])
        b_residual[:, 0] = parcor_list[:, 0] * f_residual[:, 0]
        f_residual[:, 1:] += parcor_list[:, 1:] * delayed
        return f_residual, b_residual

    def whiten(self):
        """Apply the direct lattice filter to whiten the original signal

        returns:
        residual : array containing the residual (whitened) signal
        backward : array containing the backward residual (whitened) signal

        """
        # compute the error on both the train and test part
        sigin = self.sigin
        basis = self.basis_

        # -------- select ordar (prefered: ordar_)
        ordar = self.ordar_
        n_epochs, n_points = sigin.shape

        residual = np.copy(sigin)
        backward = np.copy(sigin)
        for k in range(ordar):
            parcor_list = self._develop_parcor(self.AR_[k], basis)
            parcor_list = self.decode(parcor_list)
            residual, backward = self.cell(parcor_list, residual, backward)

        return residual, backward

    def _develop_parcor(self, LAR, basis):
        single_dim = LAR.ndim == 1
        LAR = np.atleast_2d(LAR)
        # n_basis, n_epochs, n_points = basis.shape
        # ordar, n_basis = LAR.shape
        # ordar, n_epochs, n_points = lar_list.shape
        lar_list = np.tensordot(LAR, basis, axes=([1], [0]))

        if single_dim:
            # n_epochs, n_points = lar_list.shape
            lar_list = lar_list[0, :, :]
        return lar_list

    # ------------------------------------------------ #
    # Functions that should be overloaded              #
    # ------------------------------------------------ #
    @abstractmethod
    def decode(self, lar):
        """Extracts parcor coefficients from encoded version (e.g. LAR)

        lar : array containing the encoded coefficients

        returns:
        ki : array containing the decoded coefficients (same size as lar)

        """
        pass

    @abstractmethod
    def encode(self, ki):
        """Encodes parcor coefficients to parameterized coefficients
        (for instance LAR)

        ki : array containing the original parcor coefficients

        returns:
        lar : array containing the encoded coefficients (same size as ki)

        """
        pass

    @abstractmethod
    def _common_gradient(self, p, ki):
        """Compute common factor in gradient. The gradient is computed as
        G[p] = sum from t=1 to T {g[p,t] * F(t)}
        where F(t) is the vector of driving signal and its powers
        g[p,t] = (e_forward[p, t] * e_backward[p-1, t-1]
                  + e_backward[p, t] * e_forward[p-1, t]) * phi'[k[p,t]]
        phi is the encoding function, and phi' is its derivative.

        p  : order corresponding to the current lattice cell
        ki : array containing the original parcor coefficients

        returns:
        g : array containing the factors (size (n_epochs, n_points - 1))

        """
        pass

    @abstractmethod
    def _common_hessian(self, p, ki):
        """Compute common factor in Hessian. The Hessian is computed as
        H[p] = sum from t=1 to T {F(t) * h[p,t] * F(t).T}
        where F(t) is the vector of driving signal and its powers
        h[p,t] = (e_forward[p, t-1]**2 + e_backward[p-1, t-1]**2)
                    * phi'[k[p,t]]**2
                 + (e_forward[p, t] * e_backward[p-1, t-1]
                    e_backward[p, t] * e_forward[p-1, t]) * phi''[k[p,t]]
        phi is the encoding function, phi' is its first derivative,
        and phi'' is its second derivative.

        p  : order corresponding to the current lattice cell
        ki : array containing the original parcor coefficients

        returns:
        h : array containing the factors (size (n_epochs, n_points - 1))

        """
        pass

    # ------------------------------------------------ #
    # Functions that overload abstract methods         #
    # ------------------------------------------------ #
    def _next_model(self):
        """Compute the AR model at successive orders

        Acts as a generator that stores the result in self.AR_
        Creates the models with orders from 0 to self.ordar

        Typical usage:
        for AR_ in A._next_model():
            A.AR_ = AR_
            A.ordar_ = AR_.shape[0]

        """
        if self.basis_ is None:
            raise ValueError(
                '%s: basis_ does not yet exist' % self.__class__.__name__)

        # --------  get the training data
        sigin, basis, weights = self._get_train_data([self.sigin, self.basis_])

        if weights is not None:
            weights = np.sqrt(weights)
            w_basis = basis * weights
        else:
            w_basis = basis

        # -------- select signal, basis and regression signals
        n_basis, n_epochs, n_points = basis.shape
        ordar_ = self.ordar
        scale = 1.0 / n_points

        # -------- prepare residual signals
        forward_res = np.copy(sigin)
        backward_res = np.copy(sigin)

        self.forward_residual = np.empty((ordar_ + 1, n_epochs, n_points))
        self.backward_residual = np.empty((ordar_ + 1, n_epochs, n_points))
        self.forward_residual[0] = forward_res
        self.backward_residual[0] = backward_res

        # -------- model at order 0
        AR_ = np.empty((0, self.n_basis))
        yield AR_

        # -------- loop on successive orders
        for k in range(0, ordar_):
            # -------- prepare initial estimation (driven parcor)
            if weights is not None:
                # the weights and basis are not delay as backward_res /!\
                forward_res = weights[:, k + 1:] * forward_res[:, k + 1:]
                backward_res = weights[:, k + 1:] * backward_res[:, k:-1]
            else:
                forward_res = forward_res[:, k + 1:]
                backward_res = backward_res[:, k:-1]

            forward_regressor = basis[:, :, k + 1:] * forward_res
            backward_regressor = basis[:, :, k + 1:] * backward_res

            # this reshape method will throw an error if a copy is needed
            forward_regressor.shape = (n_basis, -1)
            backward_regressor.shape = (n_basis, -1)
            backward_res.shape = (1, -1)

            R = (np.dot(forward_regressor, forward_regressor.T) + np.dot(
                backward_regressor, backward_regressor.T))
            r = np.dot(forward_regressor, backward_res.T)
            backward_res.shape = (n_epochs, -1)

            R *= scale
            r *= scale * 2.0
            parcor = -np.linalg.solve(R, r).T

            # n_basis, n_epochs, n_points = basis.shape
            # n_epochs, n_points = parcor_list.shape
            parcor_list = self._develop_parcor(parcor.ravel(), basis)
            parcor_list = np.maximum(parcor_list, -0.999999)
            parcor_list = np.minimum(parcor_list, 0.999999)

            lar_list = self.encode(parcor_list[:, k:])
            if weights is not None:
                lar_list *= weights[:, k:]

            R2 = scale * np.dot(w_basis[:, :, k:].reshape(n_basis, -1),
                                w_basis[:, :, k:].reshape(n_basis, -1).T)
            r2 = scale * np.dot(w_basis[:, :, k:].reshape(n_basis, -1),
                                lar_list.reshape(-1, 1))
            LAR = np.linalg.solve(R2, r2).T

            # -------- Newton-Raphson refinement
            dLAR = np.inf
            itnum = 0
            while (itnum < self.iter_newton
                   and np.amax(np.abs(dLAR)) > self.eps_newton):
                itnum += 1

                # -------- compute next residual
                lar_list = self._develop_parcor(LAR.ravel(), basis)
                parcor_list = self.decode(lar_list)
                forward_res_next, backward_res_next = self.cell(
                    parcor_list, self.forward_residual[k],
                    self.backward_residual[k])
                self.forward_residual[k + 1] = forward_res_next
                self.backward_residual[k + 1] = backward_res_next

                # -------- correct the current vector
                g = self._common_gradient(k + 1, parcor_list)
                if weights is not None:
                    g *= weights[:, 1:n_points]
                # n_epochs, n_points - 1 = g.shape = h.shape
                # n_basis, n_epochs, n_points = basis.shape
                gradient = 2.0 * np.dot(w_basis[:, :, 1:n_points].reshape(
                    n_basis, -1), g.reshape(-1, 1))
                gradient.shape = (n_basis, 1)

                h = self._common_hessian(k + 1, parcor_list)
                hessian = 2.0 * np.dot(
                    (w_basis[:, :, 1:n_points] * h).reshape(n_basis, -1),
                    w_basis[:, :, 1:n_points].reshape(n_basis, -1).T)

                dLAR = np.reshape(
                    linalg.solve(hessian, gradient), (1, n_basis))
                LAR -= dLAR

            # -------- save current cell and residuals
            lar_list = self._develop_parcor(LAR.ravel(), basis)
            parcor_list = self.decode(lar_list)

            forward_res, backward_res = self.cell(parcor_list,
                                                  self.forward_residual[k],
                                                  self.backward_residual[k])
            self.forward_residual[k + 1] = forward_res
            self.backward_residual[k + 1] = backward_res

            AR_ = np.vstack((AR_, np.reshape(LAR, (1, n_basis))))
            yield AR_

    def _estimate_error(self, recompute=False):
        """Estimates the prediction error

        uses self.sigin, self.basis_ and AR_

        """
        if not recompute:
            # if residual are stored, simply return the right one
            try:
                self.residual_ = self.forward_residual[self.ordar_]
            # otherwise, compute the prediction error
            except AttributeError:
                recompute = True

        if recompute:
            self.residual_, _ = self.whiten()

    def _develop(self, basis):
        """Compute the AR models and gains at instants fixed by newcols

        returns:
        ARcols : array containing the AR parts
        Gcols  : array containing the gains

        The size of ARcols is (1 + ordar, n_epochs, n_columns)
        The size of Gcols is (1, n_epochs, n_columns)

        """
        n_basis, n_epochs, n_points = basis.shape
        ordar = self.ordar_

        # -------- expand on the basis
        AR_cols = np.ones((1, n_epochs, n_points))
        if ordar > 0:
            parcor_list = self._develop_parcor(self.AR_, basis)
            parcor_list = self.decode(parcor_list)
            AR_cols = np.vstack((AR_cols, ki2ai(parcor_list)))
        G_cols = self._develop_gain(basis, squared=False, log=False)

        return AR_cols, G_cols