Commit b2e79a6c authored by Tiago de Freitas Pereira

Extractors to transformers

parent 4d3e7ce8
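The hunks below port the DCTBlocks, GridGraph and LGBPHS extractors from the legacy bob.bio.base Extractor interface to scikit-learn transformers (TransformerMixin + BaseEstimator). As a rough sketch of how the new interface is meant to be called (assumptions: bob.bio.face and scikit-learn are installed, the 80x64 random array only stands in for a cropped grey-level face image, and the DCTBlocks parameters mirror the updated test further down):

import numpy
from sklearn.pipeline import make_pipeline
from bob.bio.face.extractor import DCTBlocks

# stand-in for a cropped, grey-level face image (2D, float64)
image = numpy.random.rand(80, 64)

# 8x8 non-overlapping blocks, 15 DCT coefficients per block
dct = DCTBlocks(8, (0, 0), 15)

# the transformer is stateless (fit() just returns self), so transform()
# can be called directly on a single image ...
features = dct.transform(image)    # 2D array: one row of DCT coefficients per block

# ... or the extractor can be chained like any other scikit-learn estimator
features = make_pipeline(dct).fit_transform(image)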
@@ -8,9 +8,12 @@ import bob.ip.base
import numpy
from bob.bio.base.extractor import Extractor
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.utils import check_array
from bob.pipelines.sample import SampleBatch
class DCTBlocks(Extractor):
class DCTBlocks(TransformerMixin, BaseEstimator):
"""Extracts *Discrete Cosine Transform* (DCT) features from (overlapping) image blocks.
These features are based on the :py:class:`bob.ip.base.DCTFeatures` class.
@@ -51,16 +54,12 @@ class DCTBlocks(Extractor):
auto_reduce_coefficients=False,
):
# call base class constructor
Extractor.__init__(
self,
block_size=block_size,
block_overlap=block_overlap,
number_of_dct_coefficients=number_of_dct_coefficients,
normalize_blocks=normalize_blocks,
normalize_dcts=normalize_dcts,
auto_reduce_coefficients=auto_reduce_coefficients,
)
self.block_size = block_size
self.block_overlap = block_overlap
self.number_of_dct_coefficients = number_of_dct_coefficients
self.normalize_blocks = normalize_blocks
self.normalize_dcts = normalize_dcts
self.auto_reduce_coefficients = auto_reduce_coefficients
# block parameters
block_size = (
@@ -89,13 +88,12 @@ class DCTBlocks(Extractor):
)
self.number_of_dct_coefficients = number_of_dct_coefficients
self.block_size = block_size
self.block_size = block_size
self.block_overlap = block_overlap
self.normalize_blocks = normalize_blocks
self.normalize_dcts = normalize_dcts
self._init_non_pickables()
def _init_non_pickables(self):
self.dct_features = bob.ip.base.DCTFeatures(
self.number_of_dct_coefficients,
@@ -105,7 +103,7 @@ class DCTBlocks(Extractor):
self.normalize_dcts,
)
def __call__(self, image):
def transform(self, X):
"""__call__(image) -> feature
Computes and returns DCT blocks for the given input image.
@@ -121,21 +119,29 @@ class DCTBlocks(Extractor):
The extracted DCT features for all blocks inside the image.
The first index is the block index, while the second index is the DCT coefficient.
"""
assert isinstance(image, numpy.ndarray)
assert image.ndim == 2
assert image.dtype == numpy.float64
# Computes DCT features
return self.dct_features(image)
def _extract(image):
assert isinstance(image, numpy.ndarray)
assert image.ndim == 2
assert image.dtype == numpy.float64
# re-define the train function to get it non-documented
def train(*args, **kwargs):
raise NotImplementedError(
"This function is not implemented and should not be called."
)
# Computes DCT features
return self.dct_features(image)
if isinstance(X, SampleBatch):
extracted = []
X = check_array(X, allow_nd=True)
for x in X:
extracted.append(_extract(x))
return extracted
else:
return _extract(X)
def _more_tags(self):
return {"stateless": True, "requires_fit": False}
def load(*args, **kwargs):
pass
def fit(self, X, y=None):
return self
def __getstate__(self):
d = dict(self.__dict__)
......
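Besides single 2D arrays, the new transform() above also accepts a bob.pipelines SampleBatch (the isinstance(X, SampleBatch) branch). A minimal sketch of that path, assuming Sample lives next to SampleBatch in bob.pipelines.sample, that a SampleBatch behaves like a sequence of its samples' data arrays, and that all images in the batch share one shape so that check_array can stack them:

import numpy
from bob.pipelines.sample import Sample, SampleBatch
from bob.bio.face.extractor import DCTBlocks

# three equally sized stand-in images, wrapped into samples
images = [numpy.random.rand(80, 64) for _ in range(3)]
batch = SampleBatch([Sample(data) for data in images])

dct = DCTBlocks(8, (0, 0), 15)
features = dct.transform(batch)    # a list with one feature array per image
assert len(features) == 3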
@@ -7,10 +7,12 @@ import bob.io.base
import numpy
import math
from bob.bio.base.extractor import Extractor
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.utils import check_array
from bob.pipelines.sample import SampleBatch
class GridGraph(Extractor):
class GridGraph(TransformerMixin, BaseEstimator):
"""Extracts Gabor jets in a grid structure [GHW12]_ using functionalities from :ref:`bob.ip.gabor <bob.ip.gabor>`.
The grid can be either aligned to the eye locations (in which case the grid might be rotated), or a fixed grid graph can be extracted.
@@ -71,25 +73,6 @@ class GridGraph(Extractor):
first_node=None, # one or two integral values, or None -> automatically determined
):
# call base class constructor
Extractor.__init__(
self,
gabor_directions=gabor_directions,
gabor_scales=gabor_scales,
gabor_sigma=gabor_sigma,
gabor_maximum_frequency=gabor_maximum_frequency,
gabor_frequency_step=gabor_frequency_step,
gabor_power_of_k=gabor_power_of_k,
gabor_dc_free=gabor_dc_free,
normalize_gabor_jets=normalize_gabor_jets,
eyes=eyes,
nodes_between_eyes=nodes_between_eyes,
nodes_along_eyes=nodes_along_eyes,
nodes_above_eyes=nodes_above_eyes,
nodes_below_eyes=nodes_below_eyes,
node_distance=node_distance,
first_node=first_node,
)
self.gabor_directions = gabor_directions
self.gabor_scales = gabor_scales
@@ -145,13 +128,12 @@ class GridGraph(Extractor):
if not hasattr(self, "_aligned_graph"):
self._aligned_graph = None
if isinstance(self.node_distance, (int, float)):
self.node_distance = (int(self.node_distance), int(self.node_distance))
self._graph = None
def _extractor(self, image):
"""Creates an extractor based on the given image.
If an aligned graph was specified in the constructor, it is simply returned.
@@ -203,7 +185,7 @@ class GridGraph(Extractor):
return self._graph
def __call__(self, image):
def transform(self, X):
"""__call__(image) -> feature
Returns a list of Gabor jets extracted from the given image.
@@ -220,24 +202,34 @@ class GridGraph(Extractor):
The 2D location of the jet's nodes is not returned.
"""
assert image.ndim == 2
assert isinstance(image, numpy.ndarray)
image = image.astype(numpy.float64)
assert image.dtype == numpy.float64
def _extract(image):
assert image.ndim == 2
assert isinstance(image, numpy.ndarray)
image = image.astype(numpy.float64)
assert image.dtype == numpy.float64
extractor = self._extractor(image)
extractor = self._extractor(image)
# perform Gabor wavelet transform
self.gwt.transform(image, self.trafo_image)
# extract face graph
jets = extractor.extract(self.trafo_image)
# perform Gabor wavelet transform
self.gwt.transform(image, self.trafo_image)
# extract face graph
jets = extractor.extract(self.trafo_image)
# normalize the Gabor jets of the graph only
if self.normalize_jets:
[j.normalize() for j in jets]
# normalize the Gabor jets of the graph only
if self.normalize_jets:
[j.normalize() for j in jets]
# return the extracted face graph
return self.__class__.serialize_jets(jets)
# return the extracted face graph
return self.__class__.serialize_jets(jets)
if isinstance(X, SampleBatch):
extracted = []
X = check_array(X, allow_nd=True)
for x in X:
extracted.append(_extract(x))
return extracted
else:
return _extract(X)
def write_feature(self, feature, feature_file):
"""Writes the feature extracted by the `__call__` function to the given file.
@@ -276,15 +268,6 @@ class GridGraph(Extractor):
bob.ip.gabor.load_jets(bob.io.base.HDF5File(feature_file))
)
# re-define the train function to get it non-documented
def train(*args, **kwargs):
raise NotImplementedError(
"This function is not implemented and should not be called."
)
def load(*args, **kwargs):
pass
def __getstate__(self):
d = dict(self.__dict__)
d.pop("gwt")
@@ -305,3 +288,9 @@ class GridGraph(Extractor):
sj.jet = jet.jet
serialize_jets.append(sj)
return serialize_jets
def _more_tags(self):
return {"stateless": True, "requires_fit": False}
def fit(self, X, y=None):
return self
\ No newline at end of file
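Mirroring the updated test_graphs() further down, a short sketch of how the ported GridGraph transformer is called on a single image; the 80x64 random array is again only a placeholder for a cropped face, and node_distance=24 is taken from that test:

import numpy
from bob.bio.face.extractor import GridGraph

# placeholder for a cropped, grey-level face image
image = numpy.random.rand(80, 64)

# regular grid with one node every 24 pixels, default Gabor wavelet family
graph = GridGraph(node_distance=24)
jets = graph.transform(image)    # list of (serialized) Gabor jets, one per grid node

# features can still be stored and re-loaded with the unchanged HDF5 helpers, e.g.
# graph.write_feature(jets, "graph_regular.hdf5")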
@@ -8,10 +8,12 @@ import bob.ip.base
import numpy
import math
from bob.bio.base.extractor import Extractor
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.utils import check_array
from bob.pipelines.sample import SampleBatch
class LGBPHS(Extractor):
class LGBPHS(TransformerMixin, BaseEstimator):
"""Extracts *Local Gabor Binary Pattern Histogram Sequences* (LGBPHS) [ZSG05]_ from the images, using functionality from :ref:`bob.ip.base <bob.ip.base>` and :ref:`bob.ip.gabor <bob.ip.gabor>`.
The block size and the overlap of the blocks can be varied, as well as the parameters of the Gabor wavelet (:py:class:`bob.ip.gabor.Transform`) and the LBP extractor (:py:class:`bob.ip.base.LBP`).
@@ -80,30 +82,6 @@ class LGBPHS(Extractor):
sparse_histogram=False,
split_histogram=None,
):
# call base class constructor
Extractor.__init__(
self,
block_size=block_size,
block_overlap=block_overlap,
gabor_directions=gabor_directions,
gabor_scales=gabor_scales,
gabor_sigma=gabor_sigma,
gabor_maximum_frequency=gabor_maximum_frequency,
gabor_frequency_step=gabor_frequency_step,
gabor_power_of_k=gabor_power_of_k,
gabor_dc_free=gabor_dc_free,
use_gabor_phases=use_gabor_phases,
lbp_radius=lbp_radius,
lbp_neighbor_count=lbp_neighbor_count,
lbp_uniform=lbp_uniform,
lbp_circular=lbp_circular,
lbp_rotation_invariant=lbp_rotation_invariant,
lbp_compare_to_average=lbp_compare_to_average,
lbp_add_average=lbp_add_average,
sparse_histogram=sparse_histogram,
split_histogram=split_histogram,
)
# block parameters
self.block_size = (
block_size
@@ -214,7 +192,7 @@ class LGBPHS(Extractor):
values.append(array[i])
return numpy.array([indices, values], dtype=numpy.float64)
def __call__(self, image):
def transform(self, X):
"""__call__(image) -> feature
Extracts the local Gabor binary pattern histogram sequence from the given image.
@@ -230,79 +208,81 @@ class LGBPHS(Extractor):
The extracted local Gabor binary pattern histogram sequence.
Its shape depends on the ``split_histogram`` parameter.
"""
""""""
assert image.ndim == 2
assert isinstance(image, numpy.ndarray)
assert image.dtype == numpy.float64
# perform GWT on image
if self.trafo_image is None or self.trafo_image.shape[1:3] != image.shape:
# create trafo image
self.trafo_image = numpy.ndarray(
(self.gwt.number_of_wavelets, image.shape[0], image.shape[1]),
numpy.complex128,
)
# perform Gabor wavelet transform
self.gwt.transform(image, self.trafo_image)
jet_length = self.gwt.number_of_wavelets * (2 if self.use_phases else 1)
lgbphs_array = None
# iterate through the layers of the trafo image
for j in range(self.gwt.number_of_wavelets):
# compute absolute part of complex response
abs_image = numpy.abs(self.trafo_image[j])
# Computes LBP histograms
abs_blocks = bob.ip.base.lbphs(
abs_image, self.lbp, self.block_size, self.block_overlap
)
# Converts to Blitz array (of different dimensionalities)
self.n_bins = abs_blocks.shape[1]
self.n_blocks = abs_blocks.shape[0]
if self.split is None:
shape = (self.n_blocks * self.n_bins * jet_length,)
elif self.split == "blocks":
shape = (self.n_blocks, self.n_bins * jet_length)
elif self.split == "wavelets":
shape = (jet_length, self.n_bins * self.n_blocks)
elif self.split == "both":
shape = (jet_length * self.n_blocks, self.n_bins)
else:
raise ValueError(
"The split parameter must be one of ['blocks', 'wavelets', 'both'] or None"
def _extract(image):
assert image.ndim == 2
assert isinstance(image, numpy.ndarray)
assert image.dtype == numpy.float64
# perform GWT on image
if self.trafo_image is None or self.trafo_image.shape[1:3] != image.shape:
# create trafo image
self.trafo_image = numpy.ndarray(
(self.gwt.number_of_wavelets, image.shape[0], image.shape[1]),
numpy.complex128,
)
# create new array if not done yet
if lgbphs_array is None:
lgbphs_array = numpy.ndarray(shape, "float64")
# perform Gabor wavelet transform
self.gwt.transform(image, self.trafo_image)
# fill the array with the absolute values of the Gabor wavelet transform
self._fill(lgbphs_array, abs_blocks, j)
jet_length = self.gwt.number_of_wavelets * (2 if self.use_phases else 1)
if self.use_phases:
# compute phase part of complex response
phase_image = numpy.angle(self.trafo_image[j])
lgbphs_array = None
# iterate through the layers of the trafo image
for j in range(self.gwt.number_of_wavelets):
# compute absolute part of complex response
abs_image = numpy.abs(self.trafo_image[j])
# Computes LBP histograms
phase_blocks = bob.ip.base.lbphs(
phase_image, self.lbp, self.block_size, self.block_overlap
abs_blocks = bob.ip.base.lbphs(
abs_image, self.lbp, self.block_size, self.block_overlap
)
# fill the array with the phases at the end of the blocks
self._fill(lgbphs_array, phase_blocks, j + self.gwt.number_of_wavelets)
# return the concatenated list of all histograms
return self._sparsify(lgbphs_array)
# re-define the train function to get it non-documented
def train(*args, **kwargs):
raise NotImplementedError(
"This function is not implemented and should not be called."
)
def load(*args, **kwargs):
pass
# Converts to Blitz array (of different dimensionalities)
self.n_bins = abs_blocks.shape[1]
self.n_blocks = abs_blocks.shape[0]
if self.split is None:
shape = (self.n_blocks * self.n_bins * jet_length,)
elif self.split == "blocks":
shape = (self.n_blocks, self.n_bins * jet_length)
elif self.split == "wavelets":
shape = (jet_length, self.n_bins * self.n_blocks)
elif self.split == "both":
shape = (jet_length * self.n_blocks, self.n_bins)
else:
raise ValueError(
"The split parameter must be one of ['blocks', 'wavelets', 'both'] or None"
)
# create new array if not done yet
if lgbphs_array is None:
lgbphs_array = numpy.ndarray(shape, "float64")
# fill the array with the absolute values of the Gabor wavelet transform
self._fill(lgbphs_array, abs_blocks, j)
if self.use_phases:
# compute phase part of complex response
phase_image = numpy.angle(self.trafo_image[j])
# Computes LBP histograms
phase_blocks = bob.ip.base.lbphs(
phase_image, self.lbp, self.block_size, self.block_overlap
)
# fill the array with the phases at the end of the blocks
self._fill(lgbphs_array, phase_blocks, j + self.gwt.number_of_wavelets)
# return the concatenated list of all histograms
return self._sparsify(lgbphs_array)
if isinstance(X, SampleBatch):
extracted = []
X = check_array(X, allow_nd=True)
for x in X:
extracted.append(_extract(x))
return extracted
else:
return _extract(X)
def __getstate__(self):
d = dict(self.__dict__)
@@ -313,3 +293,9 @@ class LGBPHS(Extractor):
def __setstate__(self, d):
self.__dict__ = d
self._init_non_pickables()
def _more_tags(self):
return {"stateless": True, "requires_fit": False}
def fit(self, X, y=None):
return self
\ No newline at end of file
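The same calling pattern applies to the ported LGBPHS transformer. The block parameters below are illustrative choices; the Gabor and LBP settings are left at their defaults:

import numpy
from bob.bio.face.extractor import LGBPHS

# placeholder for a cropped, grey-level face image
image = numpy.random.rand(80, 64)

# non-overlapping 8x8 blocks; with split_histogram=None (the default) the
# per-block, per-wavelet histograms are concatenated into a single vector
lgbphs = LGBPHS(block_size=8, block_overlap=0)
histogram = lgbphs.transform(image)    # 1D local Gabor binary pattern histogram sequence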
@@ -33,124 +33,144 @@ import pkg_resources
regenerate_refs = False
def _compare(data, reference, write_function = bob.bio.base.save, read_function = bob.bio.base.load, atol = 1e-5, rtol = 1e-8):
# write reference?
if regenerate_refs:
write_function(data, reference)
# compare reference
reference = read_function(reference)
assert numpy.allclose(data, reference, atol=atol, rtol=rtol)
def _compare(
data,
reference,
write_function=bob.bio.base.save,
read_function=bob.bio.base.load,
atol=1e-5,
rtol=1e-8,
):
# write reference?
if regenerate_refs:
write_function(data, reference)
def _data():
return bob.bio.base.load(pkg_resources.resource_filename('bob.bio.face.test', 'data/cropped.hdf5'))
# compare reference
reference = read_function(reference)
assert numpy.allclose(data, reference, atol=atol, rtol=rtol)
def test_dct_blocks():
# read input
data = _data()
dct = bob.bio.base.load_resource('dct-blocks', 'extractor', preferred_package='bob.bio.face')
assert isinstance(dct, bob.bio.face.extractor.DCTBlocks)
assert isinstance(dct, bob.bio.base.extractor.Extractor)
assert not dct.requires_training
def _data():
return bob.bio.base.load(
pkg_resources.resource_filename("bob.bio.face.test", "data/cropped.hdf5")
)
# generate smaller extractor, using mixed tuple and int input for the block size and overlap
dct = bob.bio.face.extractor.DCTBlocks(8, (0,0), 15)
# extract features
feature = dct(data)
assert feature.ndim == 2
# feature dimension is one lower than the number of DCT coefficients, since blocks are normalized by default
assert feature.shape == (80, 14)
reference = pkg_resources.resource_filename('bob.bio.face.test', 'data/dct_blocks.hdf5')
_compare(feature, reference, dct.write_feature, dct.read_feature)
def test_dct_blocks():
# read input
data = _data()
dct = bob.bio.base.load_resource(
"dct-blocks", "extractor", preferred_package="bob.bio.face"
)
assert isinstance(dct, bob.bio.face.extractor.DCTBlocks)
# generate smaller extractor, using mixed tuple and int input for the block size and overlap
dct = bob.bio.face.extractor.DCTBlocks(8, (0, 0), 15)
# extract features
feature = dct.transform(data)
assert feature.ndim == 2
# feature dimension is one lower than the number of DCT coefficients, since blocks are normalized by default
assert feature.shape == (80, 14)
reference = pkg_resources.resource_filename(
"bob.bio.face.test", "data/dct_blocks.hdf5"
)
_compare(feature, reference)
def test_graphs():
data = _data()
graph = bob.bio.base.load_resource('grid-graph', 'extractor', preferred_package='bob.bio.face')
assert isinstance(graph, bob.bio.face.extractor.GridGraph)
assert isinstance(graph, bob.bio.base.extractor.Extractor)
assert not graph.requires_training
# generate smaller extractor, using mixed tuple and int input for the node distance and first location
graph = bob.bio.face.extractor.GridGraph(node_distance = 24)
# extract features
feature = graph(data)
reference = pkg_resources.resource_filename('bob.bio.face.test', 'data/graph_regular.hdf5')
# write reference?
if regenerate_refs:
graph.write_feature(feature, reference)
# compare reference
reference = graph.read_feature(reference)
assert len(reference) == len(feature)
assert all(isinstance(f, bob.ip.gabor.Jet) for f in feature)
assert all(numpy.allclose(r.jet, f.jet) for r,f in zip(reference, feature))
# get reference face graph extractor
cropper = bob.bio.base.load_resource('face-crop-eyes', 'preprocessor', preferred_package='bob.bio.face')
eyes = cropper.cropped_positions
# generate aligned graph extractor
graph = bob.bio.face.extractor.GridGraph(
# setup of the aligned grid
eyes = eyes,
nodes_between_eyes = 4,
nodes_along_eyes = 2,
nodes_above_eyes = 2,
nodes_below_eyes = 7
)
nodes = graph._extractor(data).nodes
assert len(nodes) == 100
assert numpy.allclose(nodes[22], eyes['reye'])
assert numpy.allclose(nodes[27], eyes['leye'])
assert nodes[0] < eyes['reye']
assert nodes[-1] > eyes['leye']
data = _data()
graph = bob.bio.base.load_resource(
"grid-graph", "extractor", preferred_package="bob.bio.face"
)
assert isinstance(graph, bob.bio.face.extractor.GridGraph)
# generate smaller extractor, using mixed tuple and int input for the node distance and first location
graph = bob.bio.face.extractor.GridGraph(node_distance=24)
# extract features
feature = graph.transform(data)
reference = pkg_resources.resource_filename(
"bob.bio.face.test", "data/graph_regular.hdf5"
)
# write reference?
if regenerate_refs:
graph.write_feature(feature, reference)
# compare reference
reference = graph.read_feature(reference)
assert len(reference) == len(feature)
assert all(isinstance(f, bob.ip.gabor.Jet) for f in feature)
assert all(numpy.allclose(r.jet, f.jet) for r, f in zip(reference, feature))
# get reference face graph extractor
cropper = bob.bio.base.load_resource(
"face-crop-eyes", "preprocessor", preferred_package="bob.bio.face"
)
eyes = cropper.cropped_positions
# generate aligned graph extractor
graph = bob.bio.face.extractor.GridGraph(
# setup of the aligned grid
eyes=eyes,
nodes_between_eyes=4,