"""
NeuralFilterCell
****************

This module implements a basic trainable all-pole first-order filter using PyTorch.


Copyright (c) 2018 Idiap Research Institute, http://www.idiap.ch/

Written by Francois Marelli <Francois.Marelli@idiap.ch>

This file is part of neural_filters.

neural_filters is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License version 3 as
published by the Free Software Foundation.

neural_filters is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with neural_filters. If not, see <http://www.gnu.org/licenses/>.

"""

import numpy as np
import torch
from torch.nn import Parameter
from torch.nn import functional as F
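# NOTE: Recurrent and VariableRecurrent are private helpers from older PyTorch
# releases (they were removed in later versions); this module relies on that API.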
from torch.nn._functions.rnn import Recurrent, VariableRecurrent
from torch.nn.utils.rnn import PackedSequence

from . import asig


class NeuralFilter(torch.nn.Module):
    """
    A trainable first-order all-pole filter :math:`\\frac{1}{1 - P z^{-1}}`

    * **hidden_size** (int) - the size of the data vector
    """

    def __init__(self, hidden_size):
        super(NeuralFilter, self).__init__()

        self.hidden_size = hidden_size

        self.bias_forget = Parameter(torch.Tensor(hidden_size))

        self.reset_parameters()

    def reset_parameters(self, init=None, min_modulus=0, max_modulus=1):
        if init is None:
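            # default initialization: spread the pole moduli evenly over
            # (min_modulus, max_modulus), one at the midpoint of each of the
            # hidden_size equal sub-intervals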
            parts = self.hidden_size * 2
            ranges = np.arange(1, parts, 2)

            init = ranges * (max_modulus - min_modulus) / parts + min_modulus

        if not isinstance(init, np.ndarray):
            init = np.array(init, ndmin=1)

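        # map the target pole moduli into the pre-sigmoid parameter space
        # (asig is assumed to be the inverse of the sigmoid applied in step/forward)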
        init_modulus = asig(init)
        ten_init = torch.from_numpy(init_modulus)
        self.bias_forget.data.copy_(ten_init)

    def __repr__(self):
        s = '{name}({hidden_size})'
        return s.format(name=self.__class__.__name__, **self.__dict__)

    def check_forward_args(self, input_var, hidden, batch_sizes):
        is_input_packed = batch_sizes is not None
        expected_input_dim = 2 if is_input_packed else 3
        if input_var.dim() != expected_input_dim:
            raise RuntimeError(
                'input must have {} dimensions, got {}'.format(
                    expected_input_dim, input_var.dim()))
        if self.hidden_size != input_var.size(-1):
            raise RuntimeError(
                'input.size(-1) must be equal to hidden_size. Expected {}, got {}'.format(
                    self.hidden_size, input_var.size(-1)))

        if is_input_packed:
            mini_batch = int(batch_sizes[0])
        else:
            mini_batch = input_var.size(1)

        expected_hidden_size = (mini_batch, self.hidden_size)

        def check_hidden_size(hx, expected_hidden_size, msg='Expected hidden size {}, got {}'):
            if tuple(hx.size()) != expected_hidden_size:
                raise RuntimeError(msg.format(expected_hidden_size, tuple(hx.size())))

        check_hidden_size(hidden, expected_hidden_size,
                          'Expected hidden[0] size {}, got {}')

    def step(self, input_var, hidden, a=None):
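        # one recurrence step of the all-pole filter: h[t] = P * h[t-1] + x[t],
        # with pole P = sigmoid(bias_forget), which keeps the filter stable (|P| < 1)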
        if a is None:
            a = F.sigmoid(self.bias_forget)

        next_state = (a * hidden) + input_var
        return next_state

    def forward(self, input_var, hidden=None):
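        # input_var is a (seq_len, batch, hidden_size) tensor or a PackedSequence;
        # returns the filtered sequence and the last hidden state, like the
        # built-in recurrent layers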
        is_packed = isinstance(input_var, PackedSequence)
        if is_packed:
            input_var, batch_sizes = input_var
            max_batch_size = int(batch_sizes[0])
        else:
            batch_sizes = None
            max_batch_size = input_var.size(1)

        if hidden is None:
            hidden = input_var.data.new_zeros(max_batch_size, self.hidden_size, requires_grad=False)

        self.check_forward_args(input_var, hidden, batch_sizes)

        # compute this once for all steps for efficiency
        a = F.sigmoid(self.bias_forget)

        func = Recurrent(self.step) if batch_sizes is None else VariableRecurrent(self.step)
        nexth, output = func(input_var, hidden, (a,), batch_sizes)

        if is_packed:
            output = PackedSequence(output, batch_sizes)

        return output, nexth

    @property
    def gradients(self):
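        # gradient of the loss w.r.t. bias_forget as a (hidden_size, 1) column
        # vector, or zeros if no backward pass has been run yet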
        grad = self.bias_forget.grad
        if grad is not None:
            gradient = grad.data.numpy()
            return gradient.reshape((gradient.size, 1))
        else:
            return np.zeros((self.hidden_size, 1))

    @property
    def denominator(self):
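        # denominator coefficients [1, -P] of each channel's transfer function
        # 1 / (1 - P z^-1), as a (hidden_size, 2) array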
        forgetgate = F.sigmoid(self.bias_forget).data.numpy()
        forgetgate = forgetgate.reshape((forgetgate.size, 1))
        one = np.ones(forgetgate.shape)
        denom = np.concatenate((one, -forgetgate), axis=1)
        return denom
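

# ---------------------------------------------------------------------------
# Minimal usage sketch, not part of the original module. It assumes the old
# PyTorch release this file targets (one that still provides
# torch.nn._functions.rnn) and the (seq_len, batch, hidden_size) input layout
# used by forward() above. Run it as a module, e.g.
# `python -m neural_filters.neural_filter`, so the relative import resolves.
if __name__ == '__main__':
    hidden_size = 3
    cell = NeuralFilter(hidden_size)

    # random input sequence: 20 time steps, batch of 4, hidden_size channels
    sequence = torch.randn(20, 4, hidden_size)

    output, last_state = cell(sequence)

    # dummy loss, only to exercise backward() and the gradients property
    output.pow(2).mean().backward()

    print(cell)              # NeuralFilter(3)
    print(cell.denominator)  # per-channel denominator coefficients [1, -P]
    print(cell.gradients)    # d(loss)/d(bias_forget) as a column vector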