Commit 757dcda3 authored by André Anjos's avatar André Anjos 💬
Browse files

Start to implement FrameExtractor

parent 3c68fb9f
......@@ -28,6 +28,7 @@ setup(
install_requires=[
'setuptools',
'xbob.blitz',
'xbob.io',
],
nameapace_packages=[
......
/**
* @author Andre Anjos <andre.anjos@idiap.ch>
* @date Thu 6 Feb 09:00:05 2014
*
* @brief Bindings to the base class bob::ap::FrameExtractor
*/
#include <xbob.blitz/cppapi.h>
#include <xbob.blitz/cleanup.h>
#include <bob/ap/FrameExtractor.h>
PyDoc_STRVAR(s_frame_extractor_str, XBOB_EXT_MODULE_PREFIX ".FrameExtractor");
PyDoc_STRVAR(s_frame_extractor_doc,
"FrameExtractor(sampling_frequency, [win_length_ms=20., [win_shift_ms=10.]]) -> new FrameExtractor\n\
FrameExtractor(other) -> new FrameExtractor\n\
\n\
This class is a base type for classes that perform audio processing on a\n\
frame basis. It *can* be instantiated from Python, but not very useful by\n\
itself.\n\
\n\
You can instantiate objects of this class by passing a set of construction\n\
parameters or another object of which the base type is ``FrameExtractor``.\n\
\n\
Parameters:\n\
\n\
sampling_frequency\n\
[float] the sampling frequency/frequency rate\n\
\n\
win_length_ms\n\
[float] the window length in miliseconds\n\
\n\
win_shift_ms\n\
[float] the window shift in miliseconds\n\
\n\
other\n\
[FrameExtractor] an object of which is or inherits from a FrameExtractor\n\
that will be deep-copied into a new instance.\n\
\n\
"
);
/**
* Represents either an FrameExtractor
*/
typedef struct {
PyObject_HEAD
bob::ap::FrameExtractor* cxx;
} PyBobApFrameExtractorObject;
extern PyTypeObject PyBobApFrameExtractor_Type; //forward declaration
int PyBobApFrameExtractor_Check(PyObject* o) {
return PyObject_IsInstance(o, reinterpret_cast<PyObject*>(&PyBobApFrameExtractor_Type));
}
static void PyBobApFrameExtractor_Delete (PyBobApFrameExtractorObject* o) {
delete o->cxx;
Py_TYPE(o)->tp_free((PyObject*)o);
}
static int PyBobApFrameExtractor_InitCopy
(PyBobApFrameExtractorObject* self, PyObject* args, PyObject* kwds) {
/* Parses input arguments in a single shot */
static const char* const_kwlist[] = {"sampling_frequency, ", 0};
static char** kwlist = const_cast<char**>(const_kwlist);
PyObject* other = 0;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O!", kwlist,
&PyBobApFrameExtractor_Type, &other)) return -1;
auto copy = reinterpret_cast<PyBobApFrameExtractorObject*>(other);
try {
self->cxx = new bob::ap::FrameExtractor(*(copy->cxx));
if (!self->cxx) {
PyErr_Format(PyExc_MemoryError, "cannot create new object of type `%s' - no more memory", Py_TYPE(self)->tp_name);
return -1;
}
}
catch (std::exception& ex) {
PyErr_SetString(PyExc_RuntimeError, ex.what());
return -1;
}
catch (...) {
PyErr_Format(PyExc_RuntimeError, "cannot create new object of type `%s' - unknown exception thrown", Py_TYPE(self)->tp_name);
return -1;
}
return 0;
}
static int PyBobApFrameExtractor_InitParameters(PyBobApFrameExtractorObject* self,
PyObject *args, PyObject* kwds) {
/* Parses input arguments in a single shot */
static const char* const_kwlist[] = {
"sampling_frequency",
"win_length_ms",
"win_shift_ms",
0};
static char** kwlist = const_cast<char**>(const_kwlist);
double sampling_frequency = 0.;
double win_length_ms = 20.;
double win_shift_ms = 10.;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "d|dd", kwlist, &length)) return -1;
try {
self->cxx = new bob::ap::FrameExtractor(sampling_frequency, win_length_ms, win_shift_ms);
if (!self->cxx) {
PyErr_Format(PyExc_MemoryError, "cannot create new object of type `%s' - no more memory", Py_TYPE(self)->tp_name);
return -1;
}
}
catch (std::exception& ex) {
PyErr_SetString(PyExc_RuntimeError, ex.what());
return -1;
}
catch (...) {
PyErr_Format(PyExc_RuntimeError, "cannot create new object of type `%s' - unknown exception thrown", Py_TYPE(self)->tp_name);
return -1;
}
return 0; ///< SUCCESS
}
static int PyBobApFrameExtractor_Init(PyBobApFrameExtractorObject* self,
PyObject* args, PyObject* kwds) {
Py_ssize_t nargs = (args?PyTuple_Size(args):0) + (kwds?PyDict_Size(kwds):0);
switch (nargs) {
case 1:
{
PyObject* arg = 0; ///< borrowed (don't delete)
if (PyTuple_Size(args)) arg = PyTuple_GET_ITEM(args, 0);
else {
PyObject* tmp = PyDict_Values(kwds);
auto tmp_ = make_safe(tmp);
arg = PyList_GET_ITEM(tmp, 0);
}
if (PyBobApFrameExtractor_Check(arg)) {
return PyBobApFrameExtractor_InitCopy(self, args, kwds);
}
else {
return PyBobApFrameExtractor_InitParameters(self, args, kwds);
}
PyErr_Format(PyExc_TypeError, "cannot initialize `%s' with `%s' (see help)", Py_TYPE(self)->tp_name, Py_TYPE(arg)->tp_name);
}
break;
default:
return PyBobApFrameExtractor_InitParameters(self, args, kwds);
}
return -1;
}
static PyObject* PyBobApFrameExtractor_Repr(PyBobApFrameExtractorObject* self) {
return
# if PY_VERSION_HEX >= 0x03000000
PyUnicode_FromFormat
# else
PyString_FromFormat
# endif
("%s(sampling_frequency=%f, win_length_ms=%f, win_shift_ms=%f)", Py_TYPE(self)->tp_name, self->cxx->getSamplingFrequency(), self->cxx->getWinLengthMs(), self->cxx->getWinShiftMs());
}
static PyObject* PyBobApFrameExtractor_RichCompare (PyBobApFrameExtractorObject* self,
PyObject* other, int op) {
if (!PyBobApFrameExtractor_Check(other)) {
PyErr_Format(PyExc_TypeError, "cannot compare `%s' with `%s'",
Py_TYPE(self)->tp_name, Py_TYPE(other)->tp_name);
return 0;
}
auto other_ = reinterpret_cast<PyBobApFrameExtractorObject*>(other);
switch (op) {
case Py_EQ:
if (self->cxx->operator==(*other_->cxx)) Py_RETURN_TRUE;
Py_RETURN_FALSE;
break;
case Py_NE:
if (self->cxx->operator!=(*other_->cxx)) Py_RETURN_TRUE;
Py_RETURN_FALSE;
break;
default:
Py_INCREF(Py_NotImplemented);
return Py_NotImplemented;
}
}
PyDoc_STRVAR(s_sampling_frequency_str, "sampling_frequency");
PyDoc_STRVAR(s_sampling_frequency_doc,
"The sampling frequency/frequency rate"
);
static PyObject* PyBobApFrameExtractor_GetSamplingFrequency
(PyBobApFrameExtractorObject* self, void* /*closure*/) {
return Py_BuildValue("d", self->cxx->getSamplingFrequency());
}
static int PyBobApFrameExtractor_SetSamplingFrequency
(PyBobApFrameExtractorObject* self, PyObject* o, void* /*closure*/) {
if (!PyNumber_Check(o)) {
PyErr_Format(PyExc_TypeError, "`%s' sampling frequency can only be set using a number, not `%s'", Py_TYPE(self)->tp_name, Py_TYPE(o)->tp_name);
return -1;
}
double d = PyFloat_AsFloat(o);
if (PyErr_Occurred()) return -1;
try {
self->cxx->setSamplingFrequency(d);
}
catch (std::exception& ex) {
PyErr_SetString(PyExc_RuntimeError, ex.what());
return -1;
}
catch (...) {
PyErr_Format(PyExc_RuntimeError, "cannot reset `sampling_frequency' of %s: unknown exception caught", Py_TYPE(self)->tp_name);
return -1;
}
return 0;
}
static PyGetSetDef PyBobApFrameExtractor_getseters[] = {
{
s_sampling_frequency_str,
(getter)PyBobApFrameExtractor_GetSamplingFrequency,
(setter)PyBobApFrameExtractor_SetSamplingFrequency,
s_sampling_frequency_doc,
0
},
{0} /* Sentinel */
};
PyDoc_STRVAR(s_shape_str, "get_shape");
PyDoc_STRVAR(s_shape_doc,
"Computes the shape of the output features, given the size of an input\n\
array or an input array.\n\
");
static PyObject* PyBobApFrameExtractor_GetShape
(PyBobApFrameExtractorObject* self, void* /*closure*/) {
return Py_BuildValue("(n)", self->cxx->getLength());
}
PyTypeObject PyBobApFrameExtractor_Type = {
PyVarObject_HEAD_INIT(0, 0)
s_frame_extractor_str, /*tp_name*/
sizeof(PyBobApFrameExtractorObject), /*tp_basicsize*/
0, /*tp_itemsize*/
(destructor)PyBobApFrameExtractor_Delete, /*tp_dealloc*/
0, /*tp_print*/
0, /*tp_getattr*/
0, /*tp_setattr*/
0, /*tp_compare*/
(reprfunc)PyBobApFrameExtractor_Repr, /*tp_repr*/
0, /*tp_as_number*/
0, /*tp_as_sequence*/
0, /*tp_as_mapping*/
0, /*tp_hash */
(ternaryfunc)PyBobApFrameExtractor_Call, /* tp_call */
(reprfunc)PyBobApFrameExtractor_Repr, /*tp_str*/
0, /*tp_getattro*/
0, /*tp_setattro*/
0, /*tp_as_buffer*/
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /*tp_flags*/
s_frame_extractor_doc, /* tp_doc */
0, /* tp_traverse */
0, /* tp_clear */
(richcmpfunc)PyBobApFrameExtractor_RichCompare, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
0, /* tp_methods */
0, /* tp_members */
PyBobApFrameExtractor_getseters, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
(initproc)PyBobApFrameExtractor_Init, /* tp_init */
0, /* tp_alloc */
0, /* tp_new */
};
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Elie Khoury <Elie.Khoury@idiap.ch>
#
# Copyright (C) 2011-2013 Idiap Research Institute, Martigny, Switzerland
import os, sys
import unittest
import bob
import numpy
import array
import math
import time
#############################################################################
# Tests blitz-based extrapolation implementation with values returned
#############################################################################
########################## Values used for the computation ##################
eps = 1e-3
#############################################################################
numpy.set_printoptions(precision=2, threshold=numpy.nan, linewidth=200)
def _read(filename):
"""Read video.FrameContainer containing preprocessed frames"""
fileName, fileExtension = os.path.splitext(filename)
wav_filename = filename
import scipy.io.wavfile
rate, data = scipy.io.wavfile.read(str(wav_filename)) # the data is read in its native format
if data.dtype =='int16':
data = numpy.cast['float'](data)
return [rate,data]
def compare(v1, v2, width):
return abs(v1-v2) <= width
def mel_python(f):
import math
return 2595.0*math.log10(1.+f/700.0)
def mel_inv_python(value):
return 700.0 * (10 ** (value / 2595.0) - 1)
def sig_norm(win_length, frame, flag):
gain = 0.0
for i in range(win_length):
gain = gain + frame[i] * frame[i]
ENERGY_FLOOR = 1.0
if gain < ENERGY_FLOOR:
gain = math.log(ENERGY_FLOOR)
else:
gain = math.log(gain)
if(flag and gain != 0.0):
for i in range(win_length):
frame[i] = frame[i] / gain
return gain
def pre_emphasis(frame, win_length, a):
if (a < 0.0) or (a >= 1.0):
print("Error: The emphasis coeff. should be between 0 and 1")
if (a == 0.0):
return frame
else:
for i in range(win_length - 1, 0, -1):
frame[i] = frame[i] - a * frame[i - 1]
frame[0] = (1. - a) * frame[0]
return frame
def hamming_window(vector, hamming_kernel, win_length):
for i in range(win_length):
vector[i] = vector[i] * hamming_kernel[i]
return vector
def log_filter_bank(x, n_filters, p_index, win_size):
x1 = numpy.array(x, dtype=numpy.complex128)
complex_ = bob.sp.fft(x1)
for i in range(0, int(win_size / 2) + 1):
re = complex_[i].real
im = complex_[i].imag
x[i] = math.sqrt(re * re + im * im)
filters = log_triangular_bank(x, n_filters, p_index)
return filters
def log_triangular_bank(data, n_filters, p_index):
a = 1.0 / (p_index[1:n_filters+2] - p_index[0:n_filters+1] + 1)
vec1 = list(numpy.arange(p_index[i], p_index[i + 1]) for i in range(0, n_filters))
vec2 = list(numpy.arange(p_index[i+1], p_index[i + 2] + 1) for i in range(0, n_filters))
res_ = numpy.array([(numpy.sum(data[vec1[i]]*(1.0 - a [i]* (p_index[i + 1]-(vec1[i])))) +
numpy.sum(data[vec2[i]] * (1.0 - a[i+1] * ( (vec2[i]) - p_index[i + 1]))))
for i in range(0, n_filters)])
FBANK_OUT_FLOOR = 1.0
filters = numpy.log(numpy.where(res_ < FBANK_OUT_FLOOR, FBANK_OUT_FLOOR, res_))
return filters
def dct_transform(filters, n_filters, dct_kernel, n_ceps, dct_norm):
if dct_norm:
dct_coeff = numpy.sqrt(2.0/(n_filters))
else :
dct_coeff = 1.0
ceps = numpy.zeros(n_ceps + 1)
vec = numpy.array(range(1, n_filters + 1))
for i in range(1, n_ceps + 1):
ceps[i - 1] = numpy.sum(filters[vec - 1] * dct_kernel[i - 1][0:n_filters])
ceps[i - 1] = ceps[i - 1] * dct_coeff
return ceps
def cepstral_features_extraction(obj, rate_wavsample, win_length_ms, win_shift_ms, n_filters, n_ceps, dct_norm, f_min, f_max, delta_win,
pre_emphasis_coef, mel_scale, with_energy, with_delta, with_delta_delta):
#########################
## Initialisation part ##
#########################
c = bob.ap.Ceps(rate_wavsample[0], win_length_ms, win_shift_ms, n_filters, n_ceps, f_min, f_max, delta_win, pre_emphasis_coef)
c.dct_norm = dct_norm
c.mel_scale = mel_scale
c.with_energy = with_energy
c.with_delta = with_delta
c.with_delta_delta = with_delta_delta
#ct = bob.ap.TestCeps(c)
sf = rate_wavsample[0]
data = rate_wavsample[1]
win_length = int (sf * win_length_ms / 1000)
win_shift = int (sf * win_shift_ms / 1000)
win_size = int (2.0 ** math.ceil(math.log(win_length) / math.log(2)))
m = int (math.log(win_size) / math.log(2))
# Hamming initialisation
cst = 2 * math.pi / (win_length - 1.0)
hamming_kernel = numpy.zeros(win_length)
for i in range(win_length):
hamming_kernel[i] = (0.54 - 0.46 * math.cos(i * cst))
# Compute cut-off frequencies
p_index = numpy.array(numpy.zeros(n_filters + 2), dtype=numpy.int16)
if(mel_scale):
# Mel scale
m_max = mel_python(f_max)
#obj.assertAlmostEqual(ct.herz_to_mel(f_max), m_max, 7, "Error in Mel...")
m_min = mel_python(f_min)
#obj.assertAlmostEqual(ct.herz_to_mel(f_min), m_min, 7, "Error in Mel...")
for i in range(n_filters + 2):
alpha = ((i) / (n_filters + 1.0))
f = mel_inv_python(m_min * (1 - alpha) + m_max * alpha)
#obj.assertAlmostEqual(ct.mel_to_herz(m_min * (1 - alpha) + m_max * alpha), f, 7, "Error in MelInv...")
factor = f / (sf * 1.0)
p_index[i] = int (round((win_size) * factor))
else:
#linear scale
for i in range(n_filters + 2):
alpha = (i) / (n_filters + 1.0)
f = f_min * (1.0 - alpha) + f_max * alpha
p_index[i] = int (round((win_size / (sf * 1.0) * f)))
#Cosine transform initialisation
dct_kernel = [ [ 0 for i in range(n_filters) ] for j in range(n_ceps) ]
for i in range(1, n_ceps + 1):
for j in range(1, n_filters + 1):
dct_kernel[i - 1][j - 1] = math.cos(math.pi * i * (j - 0.5) / n_filters)
######################################
### End of the Initialisation part ###
######################################
######################################
### Core code ###
######################################
data_size = data.shape[0]
n_frames = int(1 + (data_size - win_length) / win_shift)
# create features set
ceps_sequence = numpy.zeros(n_ceps)
dim0 = n_ceps
if(with_energy):
dim0 += + 1
dim = dim0
if(with_delta):
dim += dim0
if(with_delta_delta):
dim += dim0
else:
with_delta_delta = False
params = [ [ 0 for i in range(dim) ] for j in range(n_frames) ]
# compute cepstral coefficients
delta = 0
for i in range(n_frames):
# create a frame
frame = numpy.zeros(win_size, dtype=numpy.float64)
som = 0.0
vec = numpy.arange(win_length)
frame[vec] = data[vec + i * win_shift]
som = numpy.sum(frame)
som = som / win_size
frame = frame - som
if (with_energy):
energy = sig_norm(win_length, frame, False)
#e1 = ct.log_energy(frame)
#obj.assertAlmostEqual(e1, energy, 7, "Error in Energy Computation...")
f2 = numpy.copy(frame)
# pre-emphasis filtering
frame = pre_emphasis(frame, win_length, pre_emphasis_coef)
#ct.pre_emphasis(f2)
#for kk in range(len(frame)):
# obj.assertAlmostEqual(frame[kk], f2[kk], 7, "Error in Pre-Emphasis Computation...")
# Hamming windowing
f2 = numpy.copy(frame)
frame = hamming_window(frame, hamming_kernel, win_length)
#ct.hamming_window(f2)
#for kk in range(len(frame)):
# obj.assertAlmostEqual(frame[kk], f2[kk], 7, "Error in Pre-Emphasis Computation...")
f2=numpy.copy(frame)
filters = log_filter_bank(frame, n_filters, p_index, win_size)
#filt2 = ct.log_filter_bank(f2, win_size, n_filters)
#for kk in range(len(filters)):
# obj.assertAlmostEqual(filters[kk], filt2[kk], 7, "Error in log Filtering")
ceps = dct_transform(filters, n_filters, dct_kernel, n_ceps, dct_norm)
#ceps2 = ct.apply_dct(n_ceps)
if(with_energy):
d1 = n_ceps + 1
ceps[n_ceps] = energy
#print(ceps)
else:
d1 = n_ceps
# stock the results in params matrix
vec=numpy.arange(d1)
params[i][0:d1]=ceps[vec]
# compute Delta coefficient
if(with_delta):
som = 0.0
for i in range(1,delta_win+1):
som = som + i*i
som = som *2
for i in range(n_frames):
for k in range(n_ceps):
params[i][d1+k] = 0.0
for l in range(1, delta_win+1):
if (i+l < n_frames):
p_ind = i+l
else:
p_ind = n_frames - 1
if (i-l > 0):
n_ind = i-l
else:
n_ind = 0
params[i][d1+k] = params[i][d1+k] + l * (params[p_ind][k] - params[n_ind][k])
params[i][d1+k] = params[i][d1+k] / som
# compute Delta of the Energy
if(with_delta and with_energy):
som = 0.0
vec=numpy.arange(1,delta_win+1)
som = 2.0* numpy.sum(vec*vec)
for i in range(n_frames):
k = n_ceps
params[i][d1+k] = 0.0
for l in range(1, delta_win+1):
if (i+l < n_frames):
p_ind = i+l
else:
p_ind = n_frames - 1