diff --git a/xbob/learn/mlp/cost.cpp b/xbob/learn/mlp/cost.cpp new file mode 100644 index 0000000000000000000000000000000000000000..54a9b242e83dd4f86f71afd753141fe542eaccb3 --- /dev/null +++ b/xbob/learn/mlp/cost.cpp @@ -0,0 +1,57 @@ +/** + * @author Andre Anjos <andre.anjos@idiap.ch> + * @date Tue 29 Apr 09:26:02 2014 CEST + * + * @brief Bindings for Cost functions + * + * Copyright (C) 2011-2014 Idiap Research Institute, Martigny, Switzerland + */ + +#define XBOB_LEARN_MLP_MODULE +#include <xbob.blitz/cppapi.h> +#include <xbob.blitz/cleanup.h> +#include <xbob.learn.mlp/api.h> +#include <structmember.h> + +/**************************************** + * Implementation of Machine base class * + ****************************************/ + +PyDoc_STRVAR(s_machine_str, XBOB_EXT_MODULE_PREFIX ".Machine"); + +PyDoc_STRVAR(s_machine_doc, +"Machine(shape)\n\ +Machine(config)\n\ +Machine(other)\n\ +\n\ +A Multi-layer Perceptron Machine.\n\ +\n\ +An MLP Machine is a representation of a Multi-Layer Perceptron.\n\ +This implementation is feed-forward and fully-connected. The\n\ +implementation allows setting of input normalization values and\n\ +a global activation function. References to fully-connected\n\ +feed-forward networks:\n\ +\n\ + Bishop's Pattern Recognition and Machine Learning, Chapter 5.\n\ + Figure 5.1 shows what we mean.\n\ +\n\ +MLPs are normally multi-layered systems, with one or more hidden\n\ +layers. As a special case, this implementation also supports\n\ +connecting the input directly to the output by means of a single\n\ +weight matrix. This is equivalent to a\n\ +:py:class:`xbob.learn.linear.Machine`, with the advantage that it\n\ +can be trained by the trainers defined in this package.\n\ +\n\ +An MLP can be constructed in different ways. In the first form,\n\ +the user specifies the machine shape as a sequence (e.g. a tuple).\n\ +The sequence should contain the number of inputs (first element),\n\ +the number of outputs (last element) and the number of neurons in\n\ +each hidden layer (the elements between the first and last element\n\ +of the given tuple). The activation function will be set to\n\ +hyperbolic tangent. 
The machine remains **uninitialized**.\n\ +In the second form the user passes a pre-opened HDF5 file\n\ +pointing to the machine information to be loaded in memory.\n\ +Finally, in the last form (copy constructor), the user passes\n\ +another :py:class:`Machine` that will be fully copied.\n\ +"); + diff --git a/xbob/learn/mlp/include/xbob.learn.mlp/api.h b/xbob/learn/mlp/include/xbob.learn.mlp/api.h index cb421e5b75f74409614b824dd4de9d34dda9ec3c..189c06f7cb7cde4bf63876f5e0b8ee8fa941f38d 100644 --- a/xbob/learn/mlp/include/xbob.learn.mlp/api.h +++ b/xbob/learn/mlp/include/xbob.learn.mlp/api.h @@ -26,6 +26,13 @@ enum _PyBobLearnMLP_ENUM{ PyBobLearnMLPMachine_Type_NUM, PyBobLearnMLPMachine_Check_NUM, PyBobLearnMLPMachine_NewFromSize_NUM, + // Bindings for xbob.learn.mlp.Cost and variants + PyBobLearnCost_Type_NUM, + PyBobLearnCost_Check_NUM, + PyBobLearnSquareError_Type_NUM, + PyBobLearnSquareError_Check_NUM, + PyBobLearnCrossEntropyLoss_Type_NUM, + PyBobLearnCrossEntropyLoss_Check_NUM, // Total number of C API pointers PyXbobLearnMLP_API_pointers }; @@ -53,6 +60,36 @@ typedef struct { #define PyBobLearnMLPMachine_NewFromSize_RET PyObject* #define PyBobLearnMLPMachine_NewFromSize_PROTO (Py_ssize_t i, Py_ssize_t o) +typedef struct { + PyObject_HEAD + bob::trainer::Cost* cxx; +} PyBobLearnCostObject; + +#define PyBobLearnCost_Type_TYPE PyTypeObject + +#define PyBobLearnCost_Check_RET int +#define PyBobLearnCost_Check_PROTO (PyObject* o) + +typedef struct { + PyBobLearnCostObject parent; + bob::trainer::SquareError* cxx; +} PyBobLearnSquareErrorObject; + +#define PyBobLearnSquareError_Type_TYPE PyTypeObject + +#define PyBobLearnSquareError_Check_RET int +#define PyBobLearnSquareError_Check_PROTO (PyObject* o) + +typedef struct { + PyBobLearnCostObject parent; + bob::trainer::CrossEntropyLoss* cxx; +} PyBobLearnCrossEntropyLossObject; + +#define PyBobLearnCrossEntropyLoss_Type_TYPE PyTypeObject + +#define PyBobLearnCrossEntropyLoss_Check_RET int +#define PyBobLearnCrossEntropyLoss_Check_PROTO (PyObject* o) + #ifdef XBOB_LEARN_MLP_MODULE /* This section is used when compiling `xbob.learn.mlp' itself */ @@ -63,9 +100,9 @@ typedef struct { extern int PyXbobLearnMLP_APIVersion; - /****************************************** + /*************************************** * Bindings for xbob.learn.mlp.Machine * - ******************************************/ + ***************************************/ extern PyBobLearnMLPMachine_Type_TYPE PyBobLearnMLPMachine_Type; @@ -73,6 +110,22 @@ typedef struct { PyBobLearnMLPMachine_NewFromSize_RET PyBobLearnMLPMachine_NewFromSize PyBobLearnMLPMachine_NewFromSize_PROTO; + /************************************ + * Bindings for xbob.learn.mlp.Cost * + ************************************/ + + extern PyBobLearnCost_Type_TYPE PyBobLearnCost_Type; + + PyBobLearnCost_Check_RET PyBobLearnCost_Check PyBobLearnCost_Check_PROTO; + + extern PyBobLearnSquareError_Type_TYPE PyBobLearnSquareError_Type; + + PyBobLearnSquareError_Check_RET PyBobLearnSquareError_Check PyBobLearnSquareError_Check_PROTO; + + extern PyBobLearnCrossEntropyLoss_Type_TYPE PyBobLearnCrossEntropyLoss_Type; + + PyBobLearnCrossEntropyLoss_Check_RET PyBobLearnCrossEntropyLoss_Check PyBobLearnCrossEntropyLoss_Check_PROTO; + #else /* This section is used in modules that use `xbob.learn.mlp's' C-API */ @@ -115,6 +168,22 @@ typedef struct { # define PyBobLearnMLPMachine_NewFromSize (*(PyBobLearnMLPMachine_NewFromSize_RET (*)PyBobLearnMLPMachine_NewFromSize_PROTO) 
PyXbobLearnMLP_API[PyBobLearnMLPMachine_NewFromSize_NUM]) + /************************************ + * Bindings for xbob.learn.mlp.Cost * + ************************************/ + +# define PyBobLearnCost_Type (*(PyBobLearnCost_Type_TYPE *)PyXbobLearnMLP_API[PyBobLearnCost_Type_NUM]) + +# define PyBobLearnCost_Check (*(PyBobLearnCost_Check_RET (*)PyBobLearnCost_Check_PROTO) PyXbobLearnMLP_API[PyBobLearnCost_Check_NUM]) + +# define PyBobLearnSquareError_Type (*(PyBobLearnSquareError_Type_TYPE *)PyXbobLearnMLP_API[PyBobLearnSquareError_Type_NUM]) + +# define PyBobLearnSquareError_Check (*(PyBobLearnSquareError_Check_RET (*)PyBobLearnSquareError_Check_PROTO) PyXbobLearnMLP_API[PyBobLearnSquareError_Check_NUM]) + +# define PyBobLearnCrossEntropyLoss_Type (*(PyBobLearnCrossEntropyLoss_Type_TYPE *)PyXbobLearnMLP_API[PyBobLearnCrossEntropyLoss_Type_NUM]) + +# define PyBobLearnCrossEntropyLoss_Check (*(PyBobLearnCrossEntropyLoss_Check_RET (*)PyBobLearnCrossEntropyLoss_Check_PROTO) PyXbobLearnMLP_API[PyBobLearnCrossEntropyLoss_Check_NUM]) + # if !defined(NO_IMPORT_ARRAY) /** diff --git a/xbob/learn/mlp/main.cpp b/xbob/learn/mlp/main.cpp index 0982f400eb4736573ed01c4c4982e4cea121c4b2..e7d647f1aed990239897f68fd3500539d2dde96a 100644 --- a/xbob/learn/mlp/main.cpp +++ b/xbob/learn/mlp/main.cpp @@ -41,6 +41,15 @@ static PyObject* create_module (void) { PyBobLearnMLPMachine_Type.tp_new = PyType_GenericNew; if (PyType_Ready(&PyBobLearnMLPMachine_Type) < 0) return 0; + PyBobLearnCost_Type.tp_new = PyType_GenericNew; + if (PyType_Ready(&PyBobLearnCost_Type) < 0) return 0; + + PyBobLearnSquareError_Type.tp_base = &PyBobLearnCost_Type; + if (PyType_Ready(&PyBobLearnSquareError_Type) < 0) return 0; + + PyBobLearnCrossEntropyLoss_Type.tp_base = &PyBobLearnCost_Type; + if (PyType_Ready(&PyBobLearnCrossEntropyLoss_Type) < 0) return 0; + # if PY_VERSION_HEX >= 0x03000000 PyObject* m = PyModule_Create(&module_definition); # else @@ -57,6 +66,15 @@ static PyObject* create_module (void) { Py_INCREF(&PyBobLearnMLPMachine_Type); if (PyModule_AddObject(m, "Machine", (PyObject *)&PyBobLearnMLPMachine_Type) < 0) return 0; + Py_INCREF(&PyBobLearnCost_Type); + if (PyModule_AddObject(m, "Cost", (PyObject *)&PyBobLearnCost_Type) < 0) return 0; + + Py_INCREF(&PyBobLearnSquareError_Type); + if (PyModule_AddObject(m, "SquareError", (PyObject *)&PyBobLearnSquareError_Type) < 0) return 0; + + Py_INCREF(&PyBobLearnCrossEntropyLoss_Type); + if (PyModule_AddObject(m, "CrossEntropyLoss", (PyObject *)&PyBobLearnCrossEntropyLoss_Type) < 0) return 0; + static void* PyXbobLearnMLP_API[PyXbobLearnMLP_API_pointers]; /* exhaustive list of C APIs */ @@ -77,6 +95,22 @@ static PyObject* create_module (void) { PyXbobLearnMLP_API[PyBobLearnMLPMachine_NewFromSize_NUM] = (void *)&PyBobLearnMLPMachine_NewFromSize; + /************************************ + * Bindings for xbob.learn.mlp.Cost * + ************************************/ + + PyXbobLearnMLP_API[PyBobLearnCost_Type_NUM] = (void *)&PyBobLearnCost_Type; + + PyXbobLearnMLP_API[PyBobLearnCost_Check_NUM] = (void *)&PyBobLearnCost_Check; + + PyXbobLearnMLP_API[PyBobLearnSquareError_Type_NUM] = (void *)&PyBobLearnSquareError_Type; + + PyXbobLearnMLP_API[PyBobLearnSquareError_Check_NUM] = (void *)&PyBobLearnSquareError_Check; + + PyXbobLearnMLP_API[PyBobLearnCrossEntropyLoss_Type_NUM] = (void *)&PyBobLearnCrossEntropyLoss_Type; + + PyXbobLearnMLP_API[PyBobLearnCrossEntropyLoss_Check_NUM] = (void *)&PyBobLearnCrossEntropyLoss_Check; + #if PY_VERSION_HEX >= 0x02070000 /* defines the PyCapsule */ diff 
--git a/xbob/learn/mlp/test_backprop.py b/xbob/learn/mlp/test_backprop.py new file mode 100644 index 0000000000000000000000000000000000000000..3673e1281b20aa57bad9ae10adcf07c68066cd51 --- /dev/null +++ b/xbob/learn/mlp/test_backprop.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : +# Andre Anjos <andre.anjos@idiap.ch> +# Tue Jul 19 09:47:23 2011 +0200 +# +# Copyright (C) 2011-2013 Idiap Research Institute, Martigny, Switzerland + +"""Tests for BackProp MLP training. +""" + +import numpy + +from .. import MLPBaseTrainer, MLPBackPropTrainer, CrossEntropyLoss, SquareError +from ...machine import HyperbolicTangentActivation, LogisticActivation, IdentityActivation, MLP + +class PythonBackProp(MLPBaseTrainer): + """A simple version of the vanilla BackProp algorithm written in Python + """ + + def __init__(self, batch_size, cost, machine, train_biases, + learning_rate=0.1, momentum=0.0): + + super(PythonBackProp, self).__init__(batch_size, cost, machine, train_biases) + self.previous_derivatives = [numpy.zeros(k.shape, dtype=float) for k in machine.weights] + self.previous_bias_derivatives = [numpy.zeros(k.shape, dtype=float) for k in machine.biases] + + self.learning_rate = learning_rate + self.momentum = momentum + + def train(self, machine, input, target): + + # Run dataset through + prev_cost = self.cost(machine, input, target) + self.backward_step(machine, input, target) + + # Updates weights and biases + new_weights = list(machine.weights) + for k,W in enumerate(new_weights): + new_weights[k] = W - (((1-self.momentum)*self.learning_rate*self.derivatives[k]) + (self.momentum*self.previous_derivatives[k])) + self.previous_derivatives = [self.learning_rate*k for k in self.derivatives] + machine.weights = new_weights + + if self.train_biases: + new_biases = list(machine.biases) + for k,B in enumerate(new_biases): + new_biases[k] = B - (((1-self.momentum)*self.learning_rate*self.bias_derivatives[k]) + (self.momentum*self.previous_bias_derivatives[k])) + self.previous_bias_derivatives = [self.learning_rate*k for k in self.bias_derivatives] + machine.biases = new_biases + + return prev_cost, self.cost(machine, input, target) + +def check_training(machine, cost, bias_training, batch_size, learning_rate, + momentum): + + python_machine = MLP(machine) + + X = numpy.random.rand(batch_size, machine.weights[0].shape[0]) + T = numpy.zeros((batch_size, machine.weights[-1].shape[1])) + + python_trainer = PythonBackProp(batch_size, cost, machine, bias_training, + learning_rate, momentum) + cxx_trainer = MLPBackPropTrainer(batch_size, cost, machine, bias_training) + cxx_trainer.learning_rate = learning_rate + cxx_trainer.momentum = momentum + + # checks previous state matches + for k,D in enumerate(cxx_trainer.previous_derivatives): + assert numpy.allclose(D, python_trainer.previous_derivatives[k]) + for k,D in enumerate(cxx_trainer.previous_bias_derivatives): + assert numpy.allclose(D, python_trainer.previous_bias_derivatives[k]) + for k,W in enumerate(machine.weights): + assert numpy.allclose(W, python_machine.weights[k]) + for k,B in enumerate(machine.biases): + assert numpy.allclose(B, python_machine.biases[k]) + assert numpy.alltrue(machine.input_subtract == python_machine.input_subtract) + assert numpy.alltrue(machine.input_divide == python_machine.input_divide) + + prev_cost, cost = python_trainer.train(python_machine, X, T) + assert cost <= prev_cost #this should always be true for a fixed dataset + cxx_trainer.train(machine, X, T) + + # checks each component of machine 
and trainer, make sure they match + for k,D in enumerate(cxx_trainer.derivatives): + assert numpy.allclose(D, python_trainer.derivatives[k]) + for k,D in enumerate(cxx_trainer.bias_derivatives): + assert numpy.allclose(D, python_trainer.bias_derivatives[k]) + for k,W in enumerate(machine.weights): + assert numpy.allclose(W, python_machine.weights[k]) + for k,B in enumerate(machine.biases): + assert numpy.allclose(B, python_machine.biases[k]) + assert numpy.alltrue(machine.input_subtract == python_machine.input_subtract) + assert numpy.alltrue(machine.input_divide == python_machine.input_divide) + +def test_2in_1out_nobias(): + + machine = MLP((2, 1)) + machine.randomize() + machine.hidden_activation = LogisticActivation() + machine.output_activation = LogisticActivation() + machine.biases = 0 + + BATCH_SIZE = 10 + cost = CrossEntropyLoss(machine.output_activation) + + for k in range(10): + check_training(machine, cost, False, BATCH_SIZE, 0.1, 0.0) + +def test_1in_2out_nobias(): + + machine = MLP((1, 2)) + machine.randomize() + machine.hidden_activation = LogisticActivation() + machine.output_activation = LogisticActivation() + machine.biases = 0 + + BATCH_SIZE = 10 + cost = CrossEntropyLoss(machine.output_activation) + + for k in range(10): + check_training(machine, cost, False, BATCH_SIZE, 0.1, 0.0) + +def test_2in_3_1out_nobias(): + + machine = MLP((2, 3, 1)) + machine.randomize() + machine.hidden_activation = HyperbolicTangentActivation() + machine.output_activation = HyperbolicTangentActivation() + machine.biases = 0 + + BATCH_SIZE = 10 + cost = SquareError(machine.output_activation) + + for k in range(10): + check_training(machine, cost, False, BATCH_SIZE, 0.1, 0.0) + +def test_100in_10_10_5out_nobias(): + + machine = MLP((100, 10, 10, 5)) + machine.randomize() + machine.hidden_activation = HyperbolicTangentActivation() + machine.output_activation = HyperbolicTangentActivation() + machine.biases = 0 + + BATCH_SIZE = 10 + cost = SquareError(machine.output_activation) + + for k in range(10): + check_training(machine, cost, False, BATCH_SIZE, 0.1, 0.0) + +def test_2in_3_1out(): + + machine = MLP((2, 3, 1)) + machine.randomize() + machine.hidden_activation = HyperbolicTangentActivation() + machine.output_activation = HyperbolicTangentActivation() + + BATCH_SIZE = 10 + cost = SquareError(machine.output_activation) + + for k in range(10): + check_training(machine, cost, True, BATCH_SIZE, 0.1, 0.0) + +def test_20in_10_5_3out(): + + machine = MLP((20, 10, 5, 3)) + machine.randomize() + machine.hidden_activation = HyperbolicTangentActivation() + machine.output_activation = HyperbolicTangentActivation() + + BATCH_SIZE = 10 + cost = SquareError(machine.output_activation) + + for k in range(10): + check_training(machine, cost, True, BATCH_SIZE, 0.1, 0.0) + +def test_20in_10_5_3out_with_momentum(): + + machine = MLP((20, 10, 5, 3)) + machine.randomize() + machine.hidden_activation = HyperbolicTangentActivation() + machine.output_activation = HyperbolicTangentActivation() + + BATCH_SIZE = 10 + cost = SquareError(machine.output_activation) + + for k in range(10): + check_training(machine, cost, True, BATCH_SIZE, 0.1, 0.1) diff --git a/xbob/learn/mlp/test_cost.py b/xbob/learn/mlp/test_cost.py new file mode 100644 index 0000000000000000000000000000000000000000..6d151d0eac96809821a9676ceb70b1e21b3a8388 --- /dev/null +++ b/xbob/learn/mlp/test_cost.py @@ -0,0 +1,260 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : +# Andre Anjos <andre.dos.anjos@gmail.com> +# Fri 7 Jun 08:59:24 2013 + +"""Test 
cost functions +""" + +import numpy +import math + +from .. import SquareError, CrossEntropyLoss +from ...machine import LogisticActivation, IdentityActivation +from . import gradient + +def is_close(x, y, eps=1e-10): + return (abs(x - y) < eps) + +def rand_safe0(eps=2e-4): + return numpy.random.rand()*(1-2*eps)+eps + +def rand_safe(n, eps=2e-4): + return numpy.random.rand(n)*(1-2*eps)+eps + +def rand_safe2(n, p, eps=2e-4): + return numpy.random.rand(n,p)*(1-2*eps)+eps + +def rand_safe3(n, p, q, eps=2e-4): + return numpy.random.rand(n,p,q)*(1-2*eps)+eps + +def rand_safe4(n, p, q, r, eps=2e-4): + return numpy.random.rand(n,p,q,r)*(1-2*eps)+eps + +def test_square_error(): + + op = SquareError(IdentityActivation()) + x = rand_safe(10) #10 random numbers between 0 and 1 + y = rand_safe(10) #10 random numbers between 0 and 1 + + # go for an exact match + for p,q in zip(x,y): + expected = 0.5*math.pow(p-q,2) + assert is_close(op.f(p,q), expected), 'SquareError does not perform as expected %g != %g' % (op.f(p,q), expected) + +def test_square_error_derivative(): + + op = SquareError(IdentityActivation()) + x = rand_safe(10) #10 random numbers between 0 and 1 + y = rand_safe(10) #10 random numbers between 0 and 1 + + # go for an exact match + for p,q in zip(x,y): + expected = p-q + assert is_close(op.f_prime(p,q), expected), 'SquareError derivative does not perform as expected %g != %g' % (op.f_prime(p,q), expected) + + # go for approximation + for p,q in zip(x,y): + absdiff = abs(op.f_prime(p,q)-gradient.estimate(op.f,p,args=(q,))) + assert absdiff < 1e-4, 'SquareError derivative and estimation do not match to 10^-4: |%g-%g| = %g' % (op.f_prime(p,q), gradient.estimate(op.f,p,args=(q,)), absdiff) + +def test_square_error_error(): + + act = LogisticActivation() + op = SquareError(act) + x = rand_safe(10) #10 random numbers between 0 and 1 + y = rand_safe(10) #10 random numbers between 0 and 1 + + # go for an exact match + for p,q in zip(x,y): + expected = p*(1-p)*(p-q) + assert is_close(op.error(p,q), expected), 'SquareError error does not perform as expected %g != %g' % (op.error(p,q), expected) + +def test_cross_entropy(): + + op = CrossEntropyLoss(LogisticActivation()) + x = rand_safe(10) #10 random numbers between 0 and 1 + y = rand_safe(10) #10 random numbers between 0 and 1 + + # go for an exact match + for p,q in zip(x,y): + expected = -q*math.log(p) - (1-q)*math.log(1-p) + assert is_close(op.f(p,q), expected), 'CrossEntropyLoss does not perform as expected %g != %g' % (op.f(p,q), expected) + +def test_cross_entropy_derivative(): + + op = CrossEntropyLoss(LogisticActivation()) + x = rand_safe(10, eps=0.2) #10 random numbers between 0.2 and 0.8 + y = rand_safe(10, eps=0.2) #10 random numbers between 0.2 and 0.8 + + # go for an exact match + for p,q in zip(x,y): + expected = (p-q)/(p*(1-p)) + assert is_close(op.f_prime(p,q), expected), 'CrossEntropyLoss derivative does not perform as expected %g != %g' % (op.f_prime(p,q), expected) + + # go for approximation + for p,q in zip(x,y): + reldiff = abs((op.f_prime(p,q)-gradient.estimate(op.f,p,args=(q,))) / op.f_prime(p,q)) + assert reldiff < 1e-3, 'CrossEntropyLoss derivative and estimation do not match to 10^-3 (relative): %g != %g (relative difference = %g)' % (op.f_prime(p,q), gradient.estimate(op.f,p,args=(q,)), reldiff) + +def test_square_error_equality(): + + op1 = SquareError(IdentityActivation()) + op2 = SquareError(IdentityActivation()) + + assert op1 == op2 + +def test_cross_entropy_equality(): + + op1 = CrossEntropyLoss(IdentityActivation()) + op2 = CrossEntropyLoss(IdentityActivation()) + + 
assert op1 == op2 + +def test_cross_entropy_error_with_logistic(): + + act = LogisticActivation() + op = CrossEntropyLoss(act) + x = rand_safe(10) #10 random numbers between 0 and 1 + y = rand_safe(10) #10 random numbers between 0 and 1 + + # go for an exact match + for p,q in zip(x,y): + expected = p-q + assert is_close(op.error(p,q), expected), 'CrossEntropyLoss+LogisticActivation error does not perform as expected %g != %g' % (op.error(p,q), expected) + +def test_cross_entropy_error_without_logistic(): + + act = IdentityActivation() + op = CrossEntropyLoss(act) + x = rand_safe(10) #10 random numbers between 0 and 1 + y = rand_safe(10) #10 random numbers between 0 and 1 + + # go for an exact match + for p,q in zip(x,y): + expected = (p-q)/(p*(1-p)) + assert is_close(op.error(p,q), expected), 'CrossEntropyLoss+IdentityActivation error does not perform as expected %g != %g' % (op.error(p,q), expected) + +def test_cross_entropy_activation_detection(): + + op = CrossEntropyLoss(LogisticActivation()) + assert op.logistic_activation + + op = CrossEntropyLoss(IdentityActivation()) + assert op.logistic_activation == False + +def test_1d_ndarray(): + + C = rand_safe0() + op = SquareError(IdentityActivation()) + O = rand_safe(10) #10 random numbers between 0 and 1 + T = rand_safe(10) #10 random numbers between 0 and 1 + + Y = op(O,T) + assert Y.shape == O.shape + assert Y.dtype == numpy.dtype(float) + + Y_f = op.f(O,T) + assert Y_f.shape == O.shape + assert Y.dtype == numpy.dtype(float) + + Y_f_prime = op.f_prime(O,T) + assert Y_f_prime.shape == O.shape + assert Y.dtype == numpy.dtype(float) + + Y_error = op.error(O,T) + assert Y_error.shape == O.shape + assert Y.dtype == numpy.dtype(float) + + for k,(o,t) in enumerate(zip(O,T)): + assert is_close(op(o,t), Y[k]) + assert is_close(op.f(o,t), Y_f[k]) + assert is_close(op.f_prime(o,t), Y_f_prime[k]) + assert is_close(op.error(o,t), Y_error[k]) + +def test_2d_ndarray(): + + C = rand_safe0() + op = SquareError(IdentityActivation()) + O = rand_safe2(3,3) + T = rand_safe2(3,3) + + Y = op(O,T) + assert Y.shape == O.shape + assert Y.dtype == numpy.dtype(float) + + Y_f = op.f(O,T) + assert Y_f.shape == O.shape + assert Y.dtype == numpy.dtype(float) + + Y_f_prime = op.f_prime(O,T) + assert Y_f_prime.shape == O.shape + assert Y.dtype == numpy.dtype(float) + + Y_error = op.error(O,T) + assert Y_error.shape == O.shape + assert Y.dtype == numpy.dtype(float) + + for k,(o,t) in enumerate(zip(O.flat,T.flat)): + assert is_close(op(o,t), Y.flat[k]) + assert is_close(op.f(o,t), Y_f.flat[k]) + assert is_close(op.f_prime(o,t), Y_f_prime.flat[k]) + assert is_close(op.error(o,t), Y_error.flat[k]) + +def test_3d_ndarray(): + + C = rand_safe0() + op = SquareError(IdentityActivation()) + O = rand_safe3(3,3,3) + T = rand_safe3(3,3,3) + + Y = op(O,T) + assert Y.shape == O.shape + assert Y.dtype == numpy.dtype(float) + + Y_f = op.f(O,T) + assert Y_f.shape == O.shape + assert Y.dtype == numpy.dtype(float) + + Y_f_prime = op.f_prime(O,T) + assert Y_f_prime.shape == O.shape + assert Y.dtype == numpy.dtype(float) + + Y_error = op.error(O,T) + assert Y_error.shape == O.shape + assert Y.dtype == numpy.dtype(float) + + for k,(o,t) in enumerate(zip(O.flat,T.flat)): + assert is_close(op(o,t), Y.flat[k]) + assert is_close(op.f(o,t), Y_f.flat[k]) + assert is_close(op.f_prime(o,t), Y_f_prime.flat[k]) + assert is_close(op.error(o,t), Y_error.flat[k]) + +def test_4d_ndarray(): + + C = rand_safe0() + op = SquareError(IdentityActivation()) + O = rand_safe4(2,2,2,2) + T = rand_safe4(2,2,2,2) + 
+ Y = op(O,T) + assert Y.shape == O.shape + assert Y.dtype == numpy.dtype(float) + + Y_f = op.f(O,T) + assert Y_f.shape == O.shape + assert Y.dtype == numpy.dtype(float) + + Y_f_prime = op.f_prime(O,T) + assert Y_f_prime.shape == O.shape + assert Y.dtype == numpy.dtype(float) + + Y_error = op.error(O,T) + assert Y_error.shape == O.shape + assert Y.dtype == numpy.dtype(float) + + for k,(o,t) in enumerate(zip(O.flat,T.flat)): + assert is_close(op(o,t), Y.flat[k]) + assert is_close(op.f(o,t), Y_f.flat[k]) + assert is_close(op.f_prime(o,t), Y_f_prime.flat[k]) + assert is_close(op.error(o,t), Y_error.flat[k]) diff --git a/xbob/learn/mlp/test_rprop.py b/xbob/learn/mlp/test_rprop.py new file mode 100644 index 0000000000000000000000000000000000000000..16aae8a1beb42bc9c12f4ff862605b7a8dfde7c7 --- /dev/null +++ b/xbob/learn/mlp/test_rprop.py @@ -0,0 +1,234 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : +# Andre Anjos <andre.anjos@idiap.ch> +# Thu Jul 14 18:53:07 2011 +0200 +# +# Copyright (C) 2011-2013 Idiap Research Institute, Martigny, Switzerland + +"""Tests for RProp MLP training. +""" + +import numpy + +from .. import MLPBaseTrainer, MLPRPropTrainer, CrossEntropyLoss, SquareError +from ...machine import HyperbolicTangentActivation, LogisticActivation, IdentityActivation, MLP + +def sign(x): + """A handy sign function""" + if (x == 0): return 0 + if (x < 0) : return -1 + return +1 + +class PythonRProp(MLPBaseTrainer): + """A simple version of the R-Prop algorithm written in Python + """ + + def __init__(self, batch_size, cost, machine, train_biases): + + super(PythonRProp, self).__init__(batch_size, cost, machine, train_biases) + + # some constants for RProp + self.DELTA0 = 0.1 + self.DELTA_MIN = 1e-6 + self.DELTA_MAX = 50 + self.ETA_MINUS = 0.5 + self.ETA_PLUS = 1.2 + + # initial derivatives + self.previous_derivatives = [numpy.zeros(k.shape, dtype=float) for k in machine.weights] + self.previous_bias_derivatives = [numpy.zeros(k.shape, dtype=float) for k in machine.biases] + + # initial deltas + self.deltas = [self.DELTA0*numpy.ones(k.shape, dtype=float) for k in machine.weights] + self.bias_deltas = [self.DELTA0*numpy.ones(k.shape, dtype=float) for k in machine.biases] + + def train(self, machine, input, target): + + # Run dataset through + prev_cost = self.cost(machine, input, target) + self.backward_step(machine, input, target) + + # Updates weights and biases + weight_updates = [i * j for (i,j) in zip(self.previous_derivatives, self.derivatives)] + + # Iterate over each weight and bias and see what to do: + new_weights = machine.weights + for k, up in enumerate(weight_updates): + for i in range(up.shape[0]): + for j in range(up.shape[1]): + if up[i,j] > 0: + self.deltas[k][i,j] = min(self.deltas[k][i,j]*self.ETA_PLUS, self.DELTA_MAX) + new_weights[k][i,j] -= sign(self.derivatives[k][i,j]) * self.deltas[k][i,j] + self.previous_derivatives[k][i,j] = self.derivatives[k][i,j] + elif up[i,j] < 0: + self.deltas[k][i,j] = max(self.deltas[k][i,j]*self.ETA_MINUS, self.DELTA_MIN) + new_weights[k][i,j] -= self.deltas[k][i,j] + self.previous_derivatives[k][i,j] = 0 + else: + new_weights[k][i,j] -= sign(self.derivatives[k][i,j]) * self.deltas[k][i,j] + self.previous_derivatives[k][i,j] = self.derivatives[k][i,j] + machine.weights = new_weights + + if self.train_biases: + bias_updates = [i * j for (i,j) in zip(self.previous_bias_derivatives, self.bias_derivatives)] + new_biases = machine.biases + for k, up in enumerate(bias_updates): + for i in range(up.shape[0]): + if up[i] > 0: + self.bias_deltas[k][i] = 
min(self.bias_deltas[k][i]*self.ETA_PLUS, self.DELTA_MAX) + new_biases[k][i] -= sign(self.bias_derivatives[k][i]) * self.bias_deltas[k][i] + self.previous_bias_derivatives[k][i] = self.bias_derivatives[k][i] + elif up[i] < 0: + self.bias_deltas[k][i] = max(self.bias_deltas[k][i]*self.ETA_MINUS, self.DELTA_MIN) + new_biases[k][i] -= self.bias_deltas[k][i] + self.previous_bias_derivatives[k][i] = 0 + else: + new_biases[k][i] -= sign(self.bias_derivatives[k][i]) * self.bias_deltas[k][i] + self.previous_bias_derivatives[k][i] = self.bias_derivatives[k][i] + machine.biases = new_biases + + else: + machine.biases = 0 + + return prev_cost, self.cost(machine, input, target) + +def check_training(machine, cost, bias_training, batch_size): + + python_machine = MLP(machine) + + X = numpy.random.rand(batch_size, machine.weights[0].shape[0]) + T = numpy.zeros((batch_size, machine.weights[-1].shape[1])) + + python_trainer = PythonRProp(batch_size, cost, machine, bias_training) + cxx_trainer = MLPRPropTrainer(batch_size, cost, machine, bias_training) + + # checks previous state matches + for k,D in enumerate(cxx_trainer.deltas): + assert numpy.allclose(D, python_trainer.deltas[k]) + for k,D in enumerate(cxx_trainer.bias_deltas): + assert numpy.allclose(D, python_trainer.bias_deltas[k]) + for k,D in enumerate(cxx_trainer.previous_derivatives): + assert numpy.allclose(D, python_trainer.previous_derivatives[k]) + for k,D in enumerate(cxx_trainer.previous_bias_derivatives): + assert numpy.allclose(D, python_trainer.previous_bias_derivatives[k]) + for k,W in enumerate(machine.weights): + assert numpy.allclose(W, python_machine.weights[k]) + for k,B in enumerate(machine.biases): + assert numpy.allclose(B, python_machine.biases[k]) + assert numpy.alltrue(machine.input_subtract == python_machine.input_subtract) + assert numpy.alltrue(machine.input_divide == python_machine.input_divide) + + prev_cost, cost = python_trainer.train(python_machine, X, T) + #assert cost <= prev_cost #not true for R-Prop + cxx_trainer.train(machine, X, T) + + # checks each component of machine and trainer, make sure they match + for k,D in enumerate(cxx_trainer.deltas): + assert numpy.allclose(D, python_trainer.deltas[k]) + for k,D in enumerate(cxx_trainer.bias_deltas): + assert numpy.allclose(D, python_trainer.bias_deltas[k]) + for k,D in enumerate(cxx_trainer.derivatives): + assert numpy.allclose(D, python_trainer.derivatives[k]) + for k,D in enumerate(cxx_trainer.bias_derivatives): + assert numpy.allclose(D, python_trainer.bias_derivatives[k]) + for k,W in enumerate(machine.weights): + assert numpy.allclose(W, python_machine.weights[k]) + for k,B in enumerate(machine.biases): + assert numpy.allclose(B, python_machine.biases[k]) + assert numpy.alltrue(machine.input_subtract == python_machine.input_subtract) + assert numpy.alltrue(machine.input_divide == python_machine.input_divide) + +def test_2in_1out_nobias(): + + machine = MLP((2, 1)) + machine.randomize() + machine.hidden_activation = LogisticActivation() + machine.output_activation = LogisticActivation() + machine.biases = 0 + + BATCH_SIZE = 10 + cost = CrossEntropyLoss(machine.output_activation) + + for k in range(10): + check_training(machine, cost, False, BATCH_SIZE) + +def test_1in_2out_nobias(): + + machine = MLP((1, 2)) + machine.randomize() + machine.hidden_activation = LogisticActivation() + machine.output_activation = LogisticActivation() + machine.biases = 0 + + BATCH_SIZE = 10 + cost = CrossEntropyLoss(machine.output_activation) + + for k in range(10): + check_training(machine, cost, 
False, BATCH_SIZE) + +def test_2in_3_1out_nobias(): + + machine = MLP((2, 3, 1)) + machine.randomize() + machine.hidden_activation = HyperbolicTangentActivation() + machine.output_activation = HyperbolicTangentActivation() + machine.biases = 0 + + BATCH_SIZE = 10 + cost = SquareError(machine.output_activation) + + for k in range(10): + check_training(machine, cost, False, BATCH_SIZE) + +def test_100in_10_10_5out_nobias(): + + machine = MLP((100, 10, 10, 5)) + machine.randomize() + machine.hidden_activation = HyperbolicTangentActivation() + machine.output_activation = HyperbolicTangentActivation() + machine.biases = 0 + + BATCH_SIZE = 10 + cost = SquareError(machine.output_activation) + + for k in range(10): + check_training(machine, cost, False, BATCH_SIZE) + +def test_2in_3_1out(): + + machine = MLP((2, 3, 1)) + machine.randomize() + machine.hidden_activation = HyperbolicTangentActivation() + machine.output_activation = HyperbolicTangentActivation() + + BATCH_SIZE = 10 + cost = SquareError(machine.output_activation) + + for k in range(10): + check_training(machine, cost, True, BATCH_SIZE) + +def test_20in_10_5_3out(): + + machine = MLP((20, 10, 5, 3)) + machine.randomize() + machine.hidden_activation = HyperbolicTangentActivation() + machine.output_activation = HyperbolicTangentActivation() + + BATCH_SIZE = 10 + cost = SquareError(machine.output_activation) + + for k in range(10): + check_training(machine, cost, True, BATCH_SIZE) + +def test_20in_10_5_3out_with_momentum(): + + machine = MLP((20, 10, 5, 3)) + machine.randomize() + machine.hidden_activation = HyperbolicTangentActivation() + machine.output_activation = HyperbolicTangentActivation() + + BATCH_SIZE = 10 + cost = SquareError(machine.output_activation) + + for k in range(10): + check_training(machine, cost, True, BATCH_SIZE) diff --git a/xbob/learn/mlp/test_shuffler.py b/xbob/learn/mlp/test_shuffler.py new file mode 100644 index 0000000000000000000000000000000000000000..90de091bc471b2c6ac99ce24a19ecfe213da6dc4 --- /dev/null +++ b/xbob/learn/mlp/test_shuffler.py @@ -0,0 +1,222 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : +# Andre Anjos <andre.anjos@idiap.ch> +# Thu Jul 14 12:51:05 2011 +0200 +# +# Copyright (C) 2011-2013 Idiap Research Institute, Martigny, Switzerland + +"""All kinds of tests on the DataShuffler class +""" + +import os, sys +import unittest +import time +import bob +import numpy + +class DataShufflerTest(unittest.TestCase): + """Performs various shuffer tests.""" + + def setUp(self): + + self.set1 = [] + self.data1 = numpy.array([1, 0, 0], dtype='float64') + self.target1 = numpy.array([1], dtype='float64') + self.set1.append(self.data1) + self.set1.append(self.data1*2) + self.set1.append(self.data1*3) + self.set1 = numpy.array(self.set1) + + self.set2 = [] + self.data2 = numpy.array([0, 1, 0], dtype='float64') + self.target2 = numpy.array([2], dtype='float64') + self.set2.append(self.data2) + self.set2.append(self.data2*2) + self.set2.append(self.data2*3) + self.set2 = numpy.array(self.set2) + + self.set3 = [] + self.data3 = numpy.array([0, 0, 1], dtype='float64') + self.target3 = numpy.array([3], dtype='float64') + self.set3.append(self.data3) + self.set3.append(self.data3*2) + self.set3.append(self.data3*3) + self.set3 = numpy.array(self.set3) + + def test01_Initialization(self): + + # Test if we can correctly initialize the shuffler + + shuffle = bob.trainer.DataShuffler([self.set1, self.set2, self.set3], + [self.target1, self.target2, self.target3]) + + self.assertEqual(shuffle.data_width, 3) + 
self.assertEqual(shuffle.target_width, 1) + + def test01a_InitializationWithArrays(self): + + # Test if we can initialize the shuffler with simple arrays + data = [ + numpy.zeros((10,2), 'float64'), + numpy.ones ((10,2), 'float64'), + ] + + target = [ + numpy.array([+1,+1], 'float64'), + numpy.array([-1,-1], 'float64'), + ] + + shuffle = bob.trainer.DataShuffler(data, target) + self.assertEqual(shuffle.data_width, 2) + self.assertEqual(shuffle.target_width, 2) + + def test02_Drawing(self): + + # Tests that drawing works in a particular way + + N = 6 #multiple of number of classes + + shuffle = bob.trainer.DataShuffler([self.set1, self.set2, self.set3], + [self.target1, self.target2, self.target3]) + + [data, target] = shuffle(N) + + self.assertEqual(data.shape, (N, shuffle.data_width)) + self.assertEqual(target.shape, (N, shuffle.target_width)) + + # Finally, we also test if the data is well separated. We must get 2 + # of each class since N is a multiple of the number of classes (3) + class1_count = len([data[i,:] for i in range(N) \ + if numpy.dot(data[i,:], self.data1) != 0]) + self.assertEqual(class1_count, 2) + class2_count = len([data[i,:] for i in range(N) \ + if numpy.dot(data[i,:], self.data2) != 0]) + self.assertEqual(class2_count, 2) + class3_count = len([data[i,:] for i in range(N) \ + if numpy.dot(data[i,:], self.data3) != 0]) + self.assertEqual(class3_count, 2) + + N = 28 #not multiple anymore + + [data, target] = shuffle(N) + + self.assertEqual(data.shape, (N, shuffle.data_width)) + self.assertEqual(target.shape, (N, shuffle.target_width)) + + # Finally, we also test if the data is well separated. Since N is no + # longer a multiple of 3, the samples should split as 10/9/9 + class1_count = len([data[i,:] for i in range(N) \ + if numpy.dot(data[i,:], self.data1) != 0]) + self.assertEqual(class1_count, 10) + class2_count = len([data[i,:] for i in range(N) \ + if numpy.dot(data[i,:], self.data2) != 0]) + self.assertEqual(class2_count, 9) + class3_count = len([data[i,:] for i in range(N) \ + if numpy.dot(data[i,:], self.data3) != 0]) + self.assertEqual(class3_count, 9) + + def test03_Seeding(self): + + # Test if we can correctly set the seed and that this act is effective + + # First test that, by making two shufflers, we get different replies + shuffle1 = bob.trainer.DataShuffler([self.set1, self.set2, self.set3], + [self.target1, self.target2, self.target3]) + shuffle2 = bob.trainer.DataShuffler([self.set1, self.set2, self.set3], + [self.target1, self.target2, self.target3]) + + N = 100 + + # This will use the current time as seed. + [data1, target1] = shuffle1(N) + time.sleep(1) # Sleeps 1 second to make sure we get different seeds + [data2, target2] = shuffle2(N) + + self.assertFalse( (data1 == data2).all() ) + # Note targets will always be the same given N because of the internal + # design of the C++ DataShuffler. + + # Now show that drawing twice does not produce the same replies! + # This indicates that the internal random generator is updated at each draw + # as one expects. 
+ [data1_2, target1_2] = shuffle1(N) + + self.assertFalse( (data1 == data1_2).all() ) + + # Finally show that, by setting the seed, we can get the same results + shuffle1 = bob.trainer.DataShuffler([self.set1, self.set2, self.set3], + [self.target1, self.target2, self.target3]) + shuffle2 = bob.trainer.DataShuffler([self.set1, self.set2, self.set3], + [self.target1, self.target2, self.target3]) + + # A great seed if you are working in python (the microseconds) + rng1 = bob.core.random.mt19937(32) + rng2 = bob.core.random.mt19937(32) + + [data1, target1] = shuffle1(rng1, N) + [data2, target2] = shuffle2(rng2, N) + + self.assertTrue( (data1 == data2).all() ) + + def test04_Normalization(self): + + # Tests that the shuffler can get the std. normalization right + # Compares results to numpy + shuffle = bob.trainer.DataShuffler([self.set1, self.set2, self.set3], + [self.target1, self.target2, self.target3]) + + npy = numpy.array([[1,0,0], [2,0,0], [3,0,0], + [0,1,0], [0,2,0], [0,3,0], + [0,0,1], [0,0,2], [0,0,3]], 'float64') + precalc_mean = numpy.array(numpy.mean(npy,0)) + precalc_stddev = numpy.array(numpy.std(npy,0, ddof=1)) + [mean, stddev] = shuffle.stdnorm() + + self.assertTrue( (mean == precalc_mean).all() ) + self.assertTrue( (stddev == precalc_stddev).all() ) + + # Now we set the stdnorm flag on and expect data + self.assertFalse( shuffle.auto_stdnorm ) + shuffle.auto_stdnorm = True + self.assertTrue( shuffle.auto_stdnorm ) + + [data, target] = shuffle(10000) + + # Makes sure the data is approximately zero mean and has std.dev. ~ 1 + # Note: Results will not be of a better precision because we only have 9 + # samples in the Shuffler... + self.assertEqual( round(data.mean()), 0 ) + self.assertEqual( round(numpy.std(data, ddof=1)), 1 ) + + def test05_NormalizationBig(self): + + rng = bob.core.random.mt19937() + + set1 = [] + draw25 = bob.core.random.normal_float64(mean=2.0, sigma=5.0) + for i in range(10000): + set1.append(numpy.array([draw25(rng)], dtype='float64')) + set1 = numpy.array(set1) + target1 = numpy.array([1], dtype='float64') + + set2 = [] + draw32 = bob.core.random.normal_float64(mean=3.0, sigma=2.0) + for i in range(10000): + set2.append(numpy.array([draw32(rng)], dtype='float64')) + set2 = numpy.array(set2) + target2 = numpy.array([2], dtype='float64') + + shuffle = bob.trainer.DataShuffler([set1, set2], [target1, target2]) + shuffle.auto_stdnorm = True + prev_mean, prev_stddev = shuffle.stdnorm() + + [data, target] = shuffle(200000) + self.assertTrue( abs(data.mean()) < 1e-1 ) + self.assertTrue( abs(numpy.std(data, ddof=1) - 1.0) < 1e-1 ) + + #note that resetting auto_stdnorm will make the whole go back to normal, + #but the std normalization values remain the same... 
+ shuffle.auto_stdnorm = False + back_mean, back_stddev = shuffle.stdnorm() + self.assertTrue( abs( (back_mean - prev_mean ).sum() ) < 1e-10) + self.assertTrue( abs( (back_stddev - prev_stddev).sum() ) < 1e-10) diff --git a/xbob/learn/mlp/test_utils.py b/xbob/learn/mlp/test_utils.py index 4c7b56a1fd7f0ba8a3aef3004d56ba9c7b6af212..4286c5edbdfb4461deaa861e63abef4ad5a1a3f5 100644 --- a/xbob/learn/mlp/test_utils.py +++ b/xbob/learn/mlp/test_utils.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # vim: set fileencoding=utf-8 : # Andre Anjos <andre.anjos@idiap.ch> -# Thu 13 Jun 2013 15:54:19 CEST +# Thu 13 Jun 2013 15:54:19 CEST """Pythonic implementations of Multi-Layer Perceptrons for code testing """ @@ -14,7 +14,7 @@ class Machine: def __init__(self, bias, weights, hidden_activation, output_activation): """Initializes the MLP with a number of inputs and outputs. Weights are initialized randomly with the specified seed. - + Keyword parameters: bias @@ -71,7 +71,7 @@ class Machine: Returns the outputs of the network for each row in X. Accumulates hidden layer outputs and activations (for backward step). At the end of this procedure: - + self.a Input, including the bias term for all layers. 1 example per row. Bias = first column. @@ -96,3 +96,164 @@ class Machine: self.a.append(self.output_activation(self.z[-1])) return self.a[-1] + +class TrainableMachine(Machine): + """Represents a Multi-Layer Perceptron Machine with a single hidden layer""" + + def backward(self, b): + """Executes the backward step for training. + + In this phase, we calculate the error on the output layer and then use + back-propagation to estimate the error on the hidden layer. We then use + this estimated error to calculate the difference between what each layer + outputs and the expected value. + + Keyword parameters: + + b + This is the error back-propagated through the last neuron by any of the + available :py:class:`bob.trainer.Cost` functors. Every row in b matches + one example. + + self.d + + The updates for each synapse are simply the multiplication of the a's and + errors on each end. One important remark to get this computation right: + one must generate a weight change matrix that is of the same size as the + weight matrix. If that is not the case, something is wrong in the logic. + + self.d[L] = self.a[L-1].T * self.b[L] / number-of-examples + + N.B.: This **is** a matrix multiplication, despite the fact that ``a`` + and ``b`` are vectors. + + Returns the derivative estimations for every weight in the network + """ + + self.b = [b] + + # calculate the estimated errors on each layer ($\delta$) + for k,w in reversed(list(enumerate(self.weights[1:]))): + if self.has_bias: + delta = numpy.dot(self.b[0], w[1:].T) + act = self.a[k+1][:,1:] + else: + delta = numpy.dot(self.b[0], w.T) + act = self.a[k+1] + self.b.insert(0, delta*self.hidden_activation.f_prime_from_f(act)) + + self.d = [] + for a,b in zip(self.a[:-1], self.b): + self.d.append(numpy.dot(a.T, b) / len(b)) + + return self.d + + def cost(self, cost_object, target): + """Calculates the cost given the target. + + This method must be called after `forward` has been called. 
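+ + The value returned is ``cost_object.f(self.a[-1], target)`` averaged over the + examples (one example per row) and then summed over the output units.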
+ """ + + return cost_object.f(self.a[-1], target).mean(axis=0).sum() + + def unroll(self): + """Unroll its own parameters so it becomes a linear vector""" + + return numpy.hstack([k.flat for k in self.weights]) + + def roll(self, v): + """Roll-up the parameters again, undoes ``unroll()`` above.""" + + retval = [] + marks = list(numpy.cumsum([k.size for k in self.weights])) + marks.insert(0, 0) + + for k,w in enumerate(self.weights): + retval.append(v[marks[k]:marks[k+1]].reshape(w.shape)) + + return retval + +def estimate(f, x, epsilon=1e-4, args=()): + """Estimate the gradient for a given callable f + + Suppose you have a function :math:`f'(x)` that purportedly computes + :math`\frac{\partial f(x)}{\partial x}`. You'd like to check if :math:`f'(x)` + is outputting correct derivative values. You can then use this function to + estimate the gradient around a point and compare it to the output of + :math:`f'(x)`. The estimation can have a precision of up to a few decimal + houses. + + Imagine a random value for :math:`x`, called :math:`x_t` (for test). Now + imagine you modify one of the elements in :math:`x_t` so that + :math:`x_{t+\epsilon}` has that element added with a small (positive) value + :math:`\epsilon` and :math:`x_{t-\epsilon}` has the same value subtracted. + + In this case, one can use a truncated Taylor expansion of the derivative + to calculate the approximate supposed value: + + .. math:: + f'(x_t) \sim \frac{f(x_{t+\epsilon}) - f(x_{t-\epsilon})}{2\epsilon} + + The degree to which these two values should approximate each other will + depend on the details of :math:`f(x)`. But assuming :math:`\epsilon = + 10^{-4}`, you’ll usually find that the left- and right-hand sides of the + above will agree to at least 4 significant digits (and often many more). + + Keyword arguments: + + f + The function which you'd like to have the gradient estimated for. + + x + The input to ``f``. This must be the first parameter ``f`` receives. If + that is not the case, you must write a wrapper around ``f`` so it does the + parameter inversion and provide that wrapper instead. + + If f expects a multi-dimensional array, than this entry should be a + :py:class:`numpy.ndarray` with as many dimensions as required for f. + + precision + The epsilon step + + args (optional) + Extra arguments (a tuple) to ``f`` + + This function returns the estimated value for :math:`f'(x)` given ``x``. + + .. note:: + + Gradient estimation is a powerful tool for testing if a function is + correctly computing the derivative of another function, but can be quite + slow. It therefore is not a good replacement for writing specific code that + can compute the derivative of ``f``. + """ + epsilon = 1e-4 + + if isinstance(x, numpy.ndarray): + + retval = numpy.ndarray(x.shape, dtype=x.dtype) + for k in range(x.size): + xt_plus = x.copy() + xt_plus.flat[k] += epsilon + xt_minus = x.copy() + xt_minus.flat[k] -= epsilon + retval.flat[k] = (f(xt_plus,*args) - f(xt_minus,*args)) / (2*epsilon) + return retval + + else: # x is scalar + return (f(x+epsilon, *args) - f(x-epsilon, *args)) / (2*epsilon) + +def estimate_for_machine(machine, X, cost, target): + + def func(weights): + old = machine.weights + machine.weights = machine.roll(weights) + retval = cost.f(machine.forward(X), target).mean(axis=0).sum() + machine.weights = old + return retval + + weights = machine.unroll() + est = estimate(func, weights) + machine.weights = machine.roll(weights) + + return machine.roll(est) #format using the machine's organization