Commit d66176e9 authored by Manuel Günther's avatar Manuel Günther

First running version of bob.bio.video

parents
*~
*.swp
*.pyc
bin
eggs
parts
.installed.cfg
.mr.developer.cfg
*.egg-info
src
develop-eggs
sphinx
dist
This diff is collapsed.
include README.rst bootstrap-buildout.py buildout.cfg COPYING version.txt
recursive-include doc *.py *.rst
Example buildout environment
============================
This simple example demonstrates how to wrap Bob-based scripts in buildout
environments. This may be useful for homework assignments, tests or as a way to
distribute code to reproduce your publication. In summary, if you need to give
out code to others, we recommend you do it following this template so your code
can be tested, documented and run in an orderly fashion.
Installation
------------
.. note::
To follow these instructions locally you will need a local copy of this
package. For that, you can use the github tarball API to download the package::
$ wget --no-check-certificate https://github.com/idiap/bob.project.example/tarball/master -O- | tar xz
$ mv idiap-bob.project* bob.project.example
Documentation and Further Information
-------------------------------------
Please refer to the latest Bob user guide, accessible from the `Bob website
<http://idiap.github.com/bob/>`_, for instructions on how to create your own packages based on
this example. In particular, the section entitled `Organize Your Work in
Satellite Packages <http://www.idiap.ch/software/bob/docs/releases/last/sphinx/html/OrganizeYourCode.html>`_
contains details on how to set up, build and roll out your code.
# see http://peak.telecommunity.com/DevCenter/setuptools#namespace-packages
# NOTE(review): the two identical stanzas below appear to be two concatenated
# namespace-package ``__init__`` stubs (e.g. ``bob/`` and ``bob/bio/``); each
# registers its directory as part of a setuptools namespace package.
__import__('pkg_resources').declare_namespace(__name__)
# see http://peak.telecommunity.com/DevCenter/setuptools#namespace-packages
__import__('pkg_resources').declare_namespace(__name__)
from .utils import *
from . import preprocessor
from . import extractor
from . import algorithm
from . import test
import bob.bio.base
import bob.io.base
from .. import utils
class Algorithm (bob.bio.base.algorithm.Algorithm):
  """Wrapper that applies a (single-sample) algorithm to each frame of a video.

  The wrapped ``algorithm`` is given either as a registered resource name or as
  an instance of :py:class:`bob.bio.base.algorithm.Algorithm`.  Projection is
  performed frame by frame; enrollment and scoring fuse the features of all
  (selected) frames.

  Parameters
  ----------
  algorithm : str or :py:class:`bob.bio.base.algorithm.Algorithm`
    The algorithm applied to the individual frames.
  frame_selector : :py:class:`utils.FrameSelector`
    Selects which frames of a frame container are used; by default, all frames.
  compressed_io : bool
    If ``True``, features are written/read via :py:func:`utils.save_compressed`
    and :py:func:`utils.load_compressed`.
  """

  def __init__(self,
      algorithm,
      frame_selector = utils.FrameSelector(selection_style='all'),
      compressed_io = False
  ):
    # load algorithm configuration: accept a resource name or an instance
    if isinstance(algorithm, str):
      self.algorithm = bob.bio.base.load_resource(algorithm, "algorithm")
    elif isinstance(algorithm, bob.bio.base.algorithm.Algorithm):
      self.algorithm = algorithm
    else:
      # fixed typo in the error message ("interpreter" -> "interpreted")
      raise ValueError("The given algorithm could not be interpreted")

    # register the wrapped algorithm's capabilities with the base class
    bob.bio.base.algorithm.Algorithm.__init__(
        self,
        self.algorithm.performs_projection,
        self.algorithm.requires_projector_training,
        self.algorithm.split_training_features_by_client,
        self.algorithm.use_projected_features_for_enrollment,
        self.algorithm.requires_enroller_training,
        algorithm=algorithm,
        frame_selector=frame_selector,
        compressed_io=compressed_io
    )

    self.frame_selector = frame_selector
    # if we select the frames during projection, for enrollment we use all
    # frames; otherwise select frames during enrollment (or enroller training)
    self.enroll_frame_selector = (lambda i : i) if self.use_projected_features_for_enrollment else frame_selector
    self.compressed_io = compressed_io

  # PROJECTION

  def train_projector(self, data_list, projector_file):
    """Trains the projector using features from selected frames."""
    if self.split_training_features_by_client:
      training_features = [[frame[1] for frame_container in client_containers for frame in self.frame_selector(frame_container)] for client_containers in data_list]
    else:
      training_features = [frame[1] for frame_container in data_list for frame in self.frame_selector(frame_container)]
    self.algorithm.train_projector(training_features, projector_file)

  def load_projector(self, projector_file):
    """Loads the projector of the wrapped algorithm."""
    return self.algorithm.load_projector(projector_file)

  def project(self, frame_container):
    """Projects each selected frame and returns the results in a frame container."""
    fc = utils.FrameContainer()
    for index, frame, quality in self.frame_selector(frame_container):
      projected = self.algorithm.project(frame)
      # copy array-like results so the container does not keep a view into
      # memory that the wrapped algorithm may reuse
      features = projected if isinstance(projected, (list, tuple)) else projected.copy()
      fc.add(index, features, quality)
    return fc

  def save_feature(self, frame_container, projected_file):
    """Saves the projected frame container to file.

    NOTE(review): the parameter was originally named ``frames`` while the body
    referenced the undefined name ``frame_container`` (a NameError); the
    parameter has been renamed to match the body (framework callers pass it
    positionally).
    """
    if self.compressed_io:
      return utils.save_compressed(frame_container, projected_file, self.algorithm.write_feature)
    else:
      frame_container.save(bob.io.base.HDF5File(projected_file, 'w'), self.algorithm.write_feature)

  def read_feature(self, projected_file):
    """Reads a projected frame container from file."""
    if self.compressed_io:
      return utils.load_compressed(projected_file, self.algorithm.read_feature)
    else:
      return utils.FrameContainer(bob.io.base.HDF5File(projected_file), self.algorithm.read_feature)

  # ENROLLMENT

  def train_enroller(self, training_frames, enroller_file):
    """Trains the enroller with features of all selected frames, arranged by client."""
    features = [[frame[1] for frame_container in client_frames for frame in self.enroll_frame_selector(frame_container)] for client_frames in training_frames]
    self.algorithm.train_enroller(features, enroller_file)

  def load_enroller(self, enroller_file):
    """Loads the enroller of the wrapped algorithm."""
    self.algorithm.load_enroller(enroller_file)

  def enroll(self, enroll_frames):
    """Enrolls the model from features of all frames of all videos."""
    features = [frame[1] for frame_container in enroll_frames for frame in self.enroll_frame_selector(frame_container)]
    return self.algorithm.enroll(features)

  def save_model(self, model, filename):
    """Saves the model using the wrapped algorithm's save function."""
    self.algorithm.save_model(model, filename)

  # SCORING

  def read_model(self, filename):
    """Reads the model using the wrapped algorithm's read function."""
    return self.algorithm.read_model(filename)

  def read_probe(self, filename):
    """Reads the probe using the wrapped algorithm's read function.

    Falls back to the wrapped algorithm's reader when the file cannot be
    opened as a frame container (IOError).
    """
    try:
      if self.compressed_io:
        return utils.load_compressed(filename, self.algorithm.read_probe)
      else:
        return utils.FrameContainer(bob.io.base.HDF5File(filename), self.algorithm.read_probe)
    except IOError:
      return self.algorithm.read_probe(filename)

  def score(self, model, probe):
    """Computes the score between the given model and the probe, which is a list of frames."""
    if isinstance(probe, utils.FrameContainer):
      features = [frame[1] for frame in probe]
      return self.algorithm.score_for_multiple_probes(model, features)
    else:
      return self.algorithm.score(model, probe)

  def score_for_multiple_probes(self, model, probes):
    """Computes the score between the given model and several probes, each a list of frames.

    NOTE(review): the original comprehension read
    ``[frame[1] for frame in probe for probe in probes]`` — the clauses were in
    the wrong order, raising a NameError on ``probe``; fixed below.
    """
    probe = [frame[1] for probe in probes for frame in probe]
    return self.algorithm.score_for_multiple_probes(model, probe)
from .Algorithm import Algorithm
from . import mobio
from . import youtube
#!/usr/bin/env python

"""Configuration resource for the (video version of the) MOBIO database."""

import bob.db.mobio
import bob.bio.base

# Base directory of the original MOBIO video files; adapt this to your setup.
mobio_video_directory = "[YOUR_MOBIO_VIDEO_DIRECTORY]"

database = bob.bio.base.database.DatabaseBobZT(
    database = bob.db.mobio.Database(
        original_directory = mobio_video_directory,
        original_extension = '.mp4',
    ),
    name = "mobio",
    # default protocol; other protocols (e.g. 'female') can be selected at runtime
    protocol = 'male',
    models_depend_on_protocol = True,

    # restrict the training sets; 'twothirds-subsampled' presumably sub-samples
    # the world set -- TODO confirm against bob.db.mobio's protocol definitions
    all_files_options = {'subworld' : 'twothirds-subsampled'},
    extractor_training_options = {'subworld' : 'twothirds-subsampled'},
    projector_training_options = {'subworld' : 'twothirds-subsampled'},
    enroller_training_options = {'subworld' : 'twothirds-subsampled'},
)
#!/usr/bin/env python

"""Configuration resource for the YouTube Faces database."""

import bob.db.youtube
import bob.bio.base

# Base directory of the original YouTube Faces data; adapt this to your setup.
youtube_directory = "[YOUR_YOUTUBE_DIRECTORY]"

database = bob.bio.base.database.DatabaseBobZT(
    database = bob.db.youtube.Database(
        original_directory = youtube_directory,
    ),
    name = "youtube",
    # default protocol; other folds (e.g. 'fold7') can be selected at runtime
    protocol = 'fold1',
    models_depend_on_protocol = True,
    training_depends_on_protocol = True,

    # restrict file lists to the 'fivefolds' sub-world -- TODO confirm against
    # bob.db.youtube's protocol definitions
    all_files_options = {'subworld' : 'fivefolds'},
    extractor_training_options = {'subworld' : 'fivefolds'},
    projector_training_options = {'subworld' : 'fivefolds'},
    enroller_training_options = {'subworld' : 'fivefolds'},
)
import bob.bio.base
import bob.io.base
import os
from .. import utils
class Extractor (bob.bio.base.extractor.Extractor):
  """Wrapper that applies a (single-image) feature extractor to each frame of a video.

  Parameters
  ----------
  extractor : str or :py:class:`bob.bio.base.extractor.Extractor`
    The extractor applied to the individual frames; a registered resource name
    or an instance.
  frame_selector : :py:class:`utils.FrameSelector`
    Selects which frames of a frame container are used; by default, all frames.
  compressed_io : bool
    If ``True``, features are written/read via :py:func:`utils.save_compressed`
    and :py:func:`utils.load_compressed`.
  """

  def __init__(self,
      extractor,
      frame_selector = utils.FrameSelector(selection_style='all'),
      compressed_io = False
  ):
    # load extractor configuration: accept a resource name or an instance
    if isinstance(extractor, str):
      self.extractor = bob.bio.base.load_resource(extractor, "extractor")
    elif isinstance(extractor, bob.bio.base.extractor.Extractor):
      self.extractor = extractor
    else:
      # fixed typo in the error message ("interpreter" -> "interpreted")
      raise ValueError("The given extractor could not be interpreted")

    self.frame_selector = frame_selector
    self.compressed_io = compressed_io
    # register extractor's details
    bob.bio.base.extractor.Extractor.__init__(self, requires_training=self.extractor.requires_training, split_training_data_by_client=self.extractor.split_training_data_by_client, extractor=extractor, frame_selector=frame_selector, compressed_io=compressed_io)

  def __call__(self, frame_container, annotations=None):
    """Extracts features from the selected frames and returns a frame container."""
    # now, go through the frames and extract the features
    fc = utils.FrameContainer()
    for index, frame, quality in self.frame_selector(frame_container):
      # extract features
      extracted = self.extractor(frame)
      # add features to new frame container
      fc.add(index, extracted, quality)
    return fc

  def read_feature(self, filename):
    """Reads an extracted frame container from file."""
    if self.compressed_io:
      return utils.load_compressed(filename, self.extractor.read_feature)
    else:
      return utils.FrameContainer(bob.io.base.HDF5File(filename), self.extractor.read_feature)

  def write_feature(self, frame_container, filename):
    """Writes the extracted frame container to file.

    NOTE(review): the uncompressed branch originally called
    ``self.extractor.save_feature`` while the compressed branch (and
    ``read_feature``) used the ``write_feature``/``read_feature`` pair;
    the uncompressed branch now uses ``write_feature`` for consistency.
    """
    if self.compressed_io:
      return utils.save_compressed(frame_container, filename, self.extractor.write_feature)
    else:
      frame_container.save(bob.io.base.HDF5File(filename, 'w'), self.extractor.write_feature)

  def train(self, data_list, extractor_file):
    """Trains the feature extractor with the image data of the given frames."""
    if self.split_training_data_by_client:
      features = [[frame[1] for frame_container in client_containers for frame in self.frame_selector(frame_container)] for client_containers in data_list]
    else:
      features = [frame[1] for frame_container in data_list for frame in self.frame_selector(frame_container)]
    self.extractor.train(features, extractor_file)

  def load(self, extractor_file):
    """Loads the trained extractor of the wrapped extractor."""
    self.extractor.load(extractor_file)
from .Extractor import Extractor
import bob.bio.base
import numpy
import glob
import os
import bob.io.base
from .. import utils
class Preprocessor (bob.bio.base.preprocessor.Preprocessor):
  """Wrapper that applies a (single-image) preprocessor to each frame of a video.

  Parameters
  ----------
  preprocessor : str or :py:class:`bob.bio.base.preprocessor.Preprocessor`
    The preprocessor applied to the individual frames; a registered resource
    name or an instance.
  frame_selector : :py:class:`utils.FrameSelector`
    Selects which frames of the original data are kept.
  quality_function : callable or None
    If given, called with each preprocessed frame to compute its quality;
    otherwise the preprocessor's ``quality`` attribute is used, if it exists.
  compressed_io : bool
    If ``True``, data is written/read via :py:func:`utils.save_compressed`
    and :py:func:`utils.load_compressed`.
  """

  def __init__(self,
      preprocessor = 'landmark-detect',
      frame_selector = utils.FrameSelector(),
      quality_function = None,
      compressed_io = False
  ):
    # load preprocessor configuration: accept a resource name or an instance
    if isinstance(preprocessor, str):
      self.preprocessor = bob.bio.base.load_resource(preprocessor, "preprocessor")
    elif isinstance(preprocessor, bob.bio.base.preprocessor.Preprocessor):
      self.preprocessor = preprocessor
    else:
      # fixed message: it previously said "algorithm ... interpreter"
      raise ValueError("The given preprocessor could not be interpreted")

    bob.bio.base.preprocessor.Preprocessor.__init__(self, preprocessor=preprocessor, frame_selector=frame_selector, compressed_io=compressed_io)
    self.frame_selector = frame_selector
    self.quality_function = quality_function
    self.compressed_io = compressed_io

  def __call__(self, frame_container, annotations=None):
    """Preprocesses all frames of the given frame container and returns a new frame container.

    Faces are extracted for all frames in the given frame container, using the ``preprocessor`` specified in the constructor.
    If given, the annotations need to be in a dictionary.
    The key is either the frame number (for video data) or the image name (for image list data).
    The value is another dictionary, building the relation between keypoint names and their location, e.g., {'leye' : (le_y, le_x), 'reye' : (re_y, re_x)}
    The annotations for the according frames, if present, are passed to the preprocessor.
    """
    fc = utils.FrameContainer()
    for index, frame, _ in frame_container:
      # look up this frame's annotations, tolerating missing entries
      # (the original used ``annotations[index]`` and raised a KeyError for
      # frames without annotations, contradicting the "if present" contract)
      annots = annotations.get(index) if annotations is not None else None
      # preprocess image (by default: detect a face)
      preprocessed = self.preprocessor(frame, annots)
      if preprocessed is not None:
        # compute the quality of the detection
        if self.quality_function is not None:
          quality = self.quality_function(preprocessed)
        elif hasattr(self.preprocessor, 'quality'):
          quality = self.preprocessor.quality
        else:
          quality = None
        # copy (when possible) so the container does not keep a shared view
        if hasattr(preprocessed, 'copy'):
          preprocessed = preprocessed.copy()
        fc.add(index, preprocessed, quality)
    return fc

  def read_original_data(self, data):
    """Reads the original data (video file or image list) into a frame container."""
    return self.frame_selector(data)

  def read_data(self, filename):
    """Reads a preprocessed frame container from file."""
    if self.compressed_io:
      return utils.load_compressed(filename, self.preprocessor.read_data)
    else:
      return utils.FrameContainer(bob.io.base.HDF5File(filename), self.preprocessor.read_data)

  def save_data(self, frame_container, filename):
    """Writes the preprocessed frame container to file."""
    if self.compressed_io:
      return utils.save_compressed(frame_container, filename, self.preprocessor.write_data)
    else:
      frame_container.save(bob.io.base.HDF5File(filename, 'w'), self.preprocessor.write_data)
from .Preprocessor import Preprocessor
import os
import numpy
import bob.bio.base
import bob.bio.base.test.dummy.algorithm
import bob.bio.video
import bob.io.base
import bob.io.base.test_utils
import bob.io.image
import bob.io.video
from nose.plugins.skip import SkipTest
import pkg_resources
regenerate_refs = False
def test_algorithm():
  """Tests the video Algorithm wrapper: projector/enroller training, projection, enrollment and scoring."""
  filename = bob.io.base.test_utils.temporary_filename()
  # load test data
  extracted_file = pkg_resources.resource_filename("bob.bio.video.test", "data/extracted.hdf5")
  extractor = bob.bio.video.extractor.Extractor('dummy', compressed_io=False)
  extracted = extractor.read_feature(extracted_file)

  # use video tool with dummy face recognition tool, which contains all required functionality
  algorithm = bob.bio.video.algorithm.Algorithm(bob.bio.base.test.dummy.algorithm.DummyAlgorithm(), compressed_io=False)

  try:
    # projector training
    algorithm.train_projector([extracted] * 25, filename)
    assert os.path.exists(filename)

    # second wrapper constructed from the resource string form
    algorithm2 = bob.bio.video.algorithm.Algorithm("bob.bio.base.test.dummy.algorithm.DummyAlgorithm()", compressed_io=False)
    # load projector; will perform checks internally
    algorithm2.load_projector(filename)

    projected = algorithm2.project(extracted)

    # compare against (or regenerate) the stored reference projection
    reference_file = pkg_resources.resource_filename("bob.bio.video.test", "data/projected.hdf5")
    if regenerate_refs:
      algorithm.write_feature(projected, reference_file)
    projected2 = algorithm.read_feature(reference_file)
    assert projected.is_similar_to(projected2)
  finally:
    # always remove the temporary projector file
    if os.path.exists(filename):
      os.remove(filename)

  try:
    # perform enroller training
    algorithm.train_enroller([[projected] * 5] * 5, filename)
    assert os.path.exists(filename)
    # load projector; will perform checks internally
    algorithm2.load_enroller(filename)

    # enroll features
    model = algorithm2.enroll([projected] * 5)
    # compare against (or regenerate) the stored reference model
    reference_file = pkg_resources.resource_filename("bob.bio.video.test", "data/model.hdf5")
    if regenerate_refs:
      algorithm.write_model(model, reference_file)
    model2 = algorithm2.read_model(reference_file)
    assert numpy.allclose(model, model2)
  finally:
    # always remove the temporary enroller file
    if os.path.exists(filename):
      os.remove(filename)

  # score
  score = algorithm.score(model, projected)
  assert abs(score - 691.868650) < 1e-4, "The score %f is not close to %f" % (score, 691.868650)
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# @author: Manuel Guenther <Manuel.Guenther@idiap.ch>
# @date: Thu May 24 10:41:42 CEST 2012
#
# Copyright (C) 2011-2012 Idiap Research Institute, Martigny, Switzerland
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
from nose.plugins.skip import SkipTest
import bob.bio.base
def _check_database(database, groups = ('dev',), protocol = None, training_depends = False, models_depend = False):
  """Runs generic sanity checks on a bob.bio database interface."""
  assert isinstance(database, bob.bio.base.database.DatabaseBob)
  if protocol:
    database.protocol = protocol

  # all file lists required for running an experiment must be non-empty
  assert len(database.all_files()) > 0
  assert len(database.training_files('train_extractor')) > 0
  assert len(database.training_files('train_enroller', arrange_by_client = True)) > 0

  for grp in groups:
    ids = database.model_ids(grp)
    assert len(ids) > 0
    first = ids[0]
    # each model must resolve to a client and have enroll and probe files
    assert database.client_id_from_model_id(first) is not None
    assert len(database.enroll_files(first, grp)) > 0
    assert len(database.probe_files(first, grp)) > 0

  # protocol-dependency flags must match the expectation
  assert database.training_depends_on_protocol == training_depends
  assert database.models_depend_on_protocol == models_depend
def _check_database_zt(database, groups = ('dev', 'eval'), protocol = None, training_depends = False, models_depend = False):
  """Runs the generic database checks plus the ZT-norm-specific ones."""
  # generic checks first
  _check_database(database, groups, protocol, training_depends, models_depend)
  assert isinstance(database, bob.bio.base.database.DatabaseBobZT)

  for grp in groups:
    t_ids = database.t_model_ids(grp)
    assert len(t_ids) > 0
    first_t = t_ids[0]
    # T-norm models must resolve to a client and provide enrollment files;
    # Z-norm probe files must exist for the group
    assert database.client_id_from_model_id(first_t) is not None
    assert len(database.t_enroll_files(first_t, grp)) > 0
    assert len(database.z_probe_files(grp)) > 0
def test_mobio():
  """Checks the 'mobio-video' database resource for the default and 'female' protocols."""
  database = bob.bio.base.load_resource('mobio-video', 'database')
  try:
    # None keeps the resource's default protocol
    for proto in (None, 'female'):
      _check_database_zt(database, protocol=proto, models_depend=True)
  except IOError as e:
    raise SkipTest("The database could not be queried; probably the db.sql3 file is missing. Here is the import error: '%s'" % e)
def test_youtube():
  """Checks the 'youtube' database resource for the default and 'fold7' protocols."""
  database = bob.bio.base.load_resource('youtube', 'database')
  try:
    # None keeps the resource's default protocol
    for proto in (None, 'fold7'):
      _check_database_zt(database, protocol=proto, training_depends=True, models_depend=True)
  except IOError as e:
    raise SkipTest("The database could not be queried; probably the db.sql3 file is missing. Here is the import error: '%s'" % e)
import os
import numpy
import bob.bio.base
import bob.bio.base.test.dummy.extractor
import bob.bio.video
import bob.io.base
import bob.io.base.test_utils
import bob.io.image
import bob.io.video
import pkg_resources
regenerate_refs = False
def test_extractor():
  """Tests the video Extractor wrapper: training, loading, extraction and reference comparison."""
  filename = bob.io.base.test_utils.temporary_filename()
  try:
    # load test data
    preprocessed_video_file = pkg_resources.resource_filename("bob.bio.video.test", "data/preprocessed.hdf5")
    preprocessor = bob.bio.video.preprocessor.Preprocessor('face-crop-eyes', compressed_io=False)
    preprocessed_video = preprocessor.read_data(preprocessed_video_file)

    extractor = bob.bio.video.extractor.Extractor(bob.bio.base.test.dummy.extractor.DummyExtractor(), compressed_io=False)
    # train on several copies of the same video
    extractor.train([preprocessed_video]*5, filename)
    assert os.path.exists(filename)

    # a second wrapper, constructed from the resource name, must load the training result
    extractor2 = bob.bio.video.extractor.Extractor("dummy", compressed_io=False)
    extractor2.load(filename)

    extracted = extractor2(preprocessed_video)
    assert isinstance(extracted, bob.bio.video.FrameContainer)

    # compare against (or regenerate) the stored reference features
    reference_file = pkg_resources.resource_filename("bob.bio.video.test", "data/extracted.hdf5")
    if regenerate_refs:
      extracted.save(bob.io.base.HDF5File(reference_file, 'w'))
    reference_data = bob.bio.video.FrameContainer(bob.io.base.HDF5File(reference_file, 'r'))
    assert extracted.is_similar_to(reference_data)
  finally:
    # always remove the temporary extractor file
    if os.path.exists(filename):
      os.remove(filename)
import os
import numpy
import bob.io.base
import bob.io.image
import bob.io.video
import bob.bio.base
import bob.bio.video
import bob.db.verification.utils
from nose.plugins.skip import SkipTest
import pkg_resources
regenerate_refs = False
def test_annotations():
  """Tests the video preprocessor on an image list with externally provided annotations."""
  # use annotations to grep
  image_files = [pkg_resources.resource_filename("bob.bio.face.test", "data/testimage.jpg")]
  # annotations are keyed by the image base name (image-list data)
  annotations = {os.path.basename(image_files[0]) : bob.db.verification.utils.read_annotation_file(pkg_resources.resource_filename("bob.bio.face.test", "data/testimage.pos"), 'named')}

  # video preprocessor using a face crop preprocessor
  frame_selector = bob.bio.video.FrameSelector(selection_style="all")
  preprocessor = bob.bio.video.preprocessor.Preprocessor('face-crop-eyes', frame_selector, compressed_io=False)

  # read original data: the image list becomes a one-frame container
  original = preprocessor.read_original_data(image_files)
  assert isinstance(original, bob.bio.video.FrameContainer)
  assert len(original) == 1
  assert original[0][0] == os.path.basename(image_files[0])

  # preprocess data including annotations
  preprocessed = preprocessor(original, annotations)
  assert isinstance(preprocessed, bob.bio.video.FrameContainer)
  assert len(preprocessed) == 1
  assert preprocessed[0][0] == os.path.basename(image_files[0])
  # no quality function given and 'face-crop-eyes' presumably has no quality attribute
  assert preprocessed[0][2] is None
  # cropped frame must match the stored reference crop
  assert numpy.allclose(preprocessed[0][1], bob.io.base.load(pkg_resources.resource_filename("bob.bio.face.test", "data/cropped.hdf5")))
def test_detect():
  """Checks the 'face-detect' video preprocessor against the stored reference."""
  # load the test video and spread-select three frames
  avi_path = pkg_resources.resource_filename("bob.bio.video.test", "data/testvideo.avi")
  selector = bob.bio.video.FrameSelector(max_number_of_frames=3, selection_style="spread")
  face_detector = bob.bio.video.preprocessor.Preprocessor('face-detect', selector, compressed_io=False)

  raw_frames = face_detector.read_original_data(avi_path)
  assert isinstance(raw_frames, bob.bio.video.FrameContainer)

  detected = face_detector(raw_frames)
  assert isinstance(detected, bob.bio.video.FrameContainer)

  # compare against (or regenerate) the stored reference
  ref_path = pkg_resources.resource_filename("bob.bio.video.test", "data/preprocessed.hdf5")
  if regenerate_refs:
    detected.save(bob.io.base.HDF5File(ref_path, 'w'))
  reference = bob.bio.video.FrameContainer(bob.io.base.HDF5File(ref_path, 'r'))
  assert detected.is_similar_to(reference)
def test_flandmark():
  """Checks the 'landmark-detect' video preprocessor against the stored reference."""
  # load the test video and spread-select three frames
  avi_path = pkg_resources.resource_filename("bob.bio.video.test", "data/testvideo.avi")
  selector = bob.bio.video.FrameSelector(max_number_of_frames=3, selection_style="spread")
  landmark_detector = bob.bio.video.preprocessor.Preprocessor('landmark-detect', selector, compressed_io=False)

  raw_frames = landmark_detector.read_original_data(avi_path)
  assert isinstance(raw_frames, bob.bio.video.FrameContainer)

  detected = landmark_detector(raw_frames)
  assert isinstance(detected, bob.bio.video.FrameContainer)

  # compare against (or regenerate) the stored reference
  ref_path = pkg_resources.resource_filename("bob.bio.video.test", "data/preprocessed-flandmark.hdf5")
  if regenerate_refs:
    detected.save(bob.io.base.HDF5File(ref_path, 'w'))
  reference = bob.bio.video.FrameContainer(bob.io.base.HDF5File(ref_path, 'r'))
  assert detected.is_similar_to(reference)
import os
import numpy
import bob.io.base
import bob.io.base.test_utils
import bob.io.image
import bob.io.video
import bob.db.verification.utils
from nose.plugins.skip import SkipTest
import bob.bio.base
import bob.bio.video