Commit c18502b0 authored by André Anjos's avatar André Anjos 💬
Browse files

[models] Move heart-rate and face detection to this package

parent 2b86a4e9
......@@ -8,8 +8,16 @@
import os
import sys
import pkg_resources
import numpy
import bob.io.base
import bob.db.base
from bob.db.base.driver import Interface as BaseInterface
DATABASE_LOCATION = '/idiap/resource/database/HCI_Tagging'
# Driver API
# ==========
......@@ -33,6 +41,49 @@ def dumplist(args):
return 0
def create_meta(args):
"""Runs the face detection, heart-rate estimation, save outputs at package"""
from . import Database
db = Database()
objects = db.objects()
if args.selftest:
objects = objects[:5]
basedir = pkg_resources.resource_filename(__name__, 'data')
for obj in objects:
try:
max_faces = 0
if args.selftest: max_faces = 2
detections = obj.run_face_detector(args.directory, max_faces)
quality = []
arr = []
for k in sorted(detections.keys()):
det = detections[k]
quality.append(det['quality'])
bb = det['boundingbox']
arr.append([k, bb.topleft[1], bb.topleft[0], bb.size[1], bb.size[0]])
output = obj.make_path(basedir, '.hdf5')
outdir = os.path.dirname(output)
if not os.path.exists(outdir): os.makedirs(outdir)
h5 = bob.io.base.HDF5File(output, 'w')
h5.set('detections', numpy.array(arr))
h5.set_attribute('quality', numpy.array(quality))
h5.set_attribute('heartrate_bpm',
obj.estimate_heartrate_in_bpm(args.directory))
h5.close()
finally:
if args.selftest:
if os.path.exists(basedir):
import shutil
shutil.rmtree(basedir)
return 0
def checkfiles(args):
"""Checks the existence of the files based on your criteria"""
......@@ -95,7 +146,7 @@ class Interface(BaseInterface):
# add the dumplist command
dump_message = "Dumps list of files based on your criteria"
dump_parser = subparsers.add_parser('dumplist', help=dump_message)
dump_parser.add_argument('-d', '--directory', dest="directory", default='', help="if given, this path will be prepended to every entry returned (defaults to '%(default)s')")
dump_parser.add_argument('-d', '--directory', dest="directory", default=DATABASE_LOCATION, help="if given, this path will be prepended to every entry returned (defaults to '%(default)s')")
dump_parser.add_argument('-e', '--extension', dest="extension", default='', help="if given, this extension will be appended to every entry returned (defaults to '%(default)s')")
dump_parser.add_argument('--self-test', dest="selftest", default=False, action='store_true', help=SUPPRESS)
dump_parser.set_defaults(func=dumplist) #action
......@@ -103,7 +154,14 @@ class Interface(BaseInterface):
# add the checkfiles command
check_message = "Check if the files exist, based on your criteria"
check_parser = subparsers.add_parser('checkfiles', help=check_message)
check_parser.add_argument('-d', '--directory', dest="directory", default='', help="if given, this path will be prepended to every entry returned (defaults to '%(default)s')")
check_parser.add_argument('-d', '--directory', dest="directory", default=DATABASE_LOCATION, help="if given, this path will be prepended to every entry returned (defaults to '%(default)s')")
check_parser.add_argument('-e', '--extension', dest="extension", default='', help="if given, this extension will be appended to every entry returned (defaults to '%(default)s')")
check_parser.add_argument('--self-test', dest="selftest", default=False, action='store_true', help=SUPPRESS)
check_parser.set_defaults(func=checkfiles) #action
# add the create_meta command
meta_message = create_meta.__doc__
meta_parser = subparsers.add_parser('mkmeta', help=create_meta.__doc__)
meta_parser.add_argument('-d', '--directory', dest="directory", default=DATABASE_LOCATION, help="This path points to the location where the database raw files are installed (defaults to '%(default)s')")
meta_parser.add_argument('--self-test', dest="selftest", default=False, action='store_true', help=SUPPRESS)
meta_parser.set_defaults(func=create_meta) #action
......@@ -4,9 +4,12 @@
# Wed 30 Sep 2015 12:13:47 CEST
import os
import logging
import numpy
import bob.db.base
import bob.io.base
import bob.ip.facedetect
def bdf_load_signal(fn, name='EXG3', start=None, end=None):
......@@ -191,6 +194,55 @@ class File(object):
return bob.io.video.reader(path)
def run_face_detector(self, directory, max_frames=0):
"""Runs bob.ip.facedetect stock detector on the whole video.
Parameters:
directory
A directory name that leads to the location the database is installed
on the local disk
max_frames (int): If set, delimits the maximum number of frames to treat
from the associated video file.
Returns:
dict: A dictionary containing the detected face bounding boxes and
quality information.
"""
detections = {}
data = self.load_video(directory)
if max_frames: data = data[:max_frames]
for k, frame in enumerate(data):
bb, quality = bob.ip.facedetect.detect_single_face(frame)
detections[k] = {'boundingbox': bb, 'quality': quality}
return detections
def estimate_heartrate_in_bpm(self, directory):
"""Estimates the person's heart rate using the ECG sensor data
Keyword parameters:
directory
A directory name that leads to the location the database is installed
on the local disk
"""
from .utils import estimate_average_heartrate, chooser
estimates = []
for channel in ('EXG1', 'EXG2', 'EXG3'):
signal, freq = bdf_load_signal(self.make_path(directory), channel)
avg_hr, peaks = estimate_average_heartrate(signal, freq)
estimates.append(avg_hr)
return chooser(estimates)
def save(self, data, directory=None, extension='.hdf5'):
"""Saves the input data at the specified location and using the given
extension.
......@@ -210,5 +262,6 @@ class File(object):
"""
path = self.make_path(directory, extension)
bob.db.base.utils.makedirs_safe(os.path.dirname(path))
if not os.path.exists(os.path.dirname(path)):
os.makedirs(os.path.dirname(path))
bob.io.base.save(data, path)
......@@ -10,7 +10,8 @@ import os, sys
import unittest
from . import Database
DATABASE_LOCATION = '/idiap/resource/database/HCI_Tagging'
from .driver import DATABASE_LOCATION
def db_available(test):
"""Decorator for detecting if we're running the test at Idiap"""
......@@ -100,3 +101,18 @@ class CmdLineTest(unittest.TestCase):
]
self.assertEqual(main(args), 0)
@db_available
def test03_can_create_meta(self):
from bob.db.base.script.dbmanage import main
args = [
'hci_tagging',
'mkmeta',
'--self-test',
'--directory=%s' % DATABASE_LOCATION,
]
self.assertEqual(main(args), 0)
#!/usr/bin/env python
# encoding: utf-8
# Andre Anjos <andre.anjos@idiap.ch>
# Thu 1 Oct 11:00:44 CEST 2015
'''Utilities for Remote Photo-Plethysmography Benchmarking'''
import numpy
import bob.ip.facedetect
import antispoofing.utils.faceloc
from mne.preprocessing.ecg import qrs_detector
def estimate_average_heartrate(s, sampling_frequency):
'''Estimates the average heart rate taking as base the input signal and its
sampling frequency.
This method will use the Pam-Tompkins detector available the MNE package to
clean-up and estimate the heart-beat frequency based on the ECG sensor
information provided.
Returns:
float: The estimated average heart-rate in beats-per-minute
'''
peaks = qrs_detector(sampling_frequency, s)
instantaneous_rates = (sampling_frequency * 60) / numpy.diff(peaks)
# remove instantaneous rates which are lower than 30, higher than 240
selector = (instantaneous_rates>30) & (instantaneous_rates<240)
return float(numpy.nan_to_num(instantaneous_rates[selector].mean())), peaks
def plot_signal(s, sampling_frequency):
'''Estimates the heart rate taking as base the input signal and its sampling
frequency, plots QRS peaks discovered on the base signal.
This method will use the Pam-Tompkins detector available the MNE package to
clean-up and estimate the heart-beat frequency based on the ECG sensor
information provided.
Returns:
float: The estimated average heart-rate in beats-per-minute
'''
avg, peaks = estimate_average_heartrate(s, sampling_frequency)
import matplotlib.pyplot as plt
fig, ax = plt.subplots(1)
fig.set_size_inches(18, 6, forward=True)
ax.plot(numpy.arange(0, len(s)/sampling_frequency, 1/sampling_frequency),
s, label='Raw signal');
xmin, xmax, ymin, ymax = plt.axis()
ax.vlines(peaks / sampling_frequency, ymin, ymax, colors='r', label='P-T QRS detector')
plt.xlim(0, len(s)/sampling_frequency)
plt.ylabel('uV')
plt.xlabel('time (s)')
plt.title('Average heart-rate = %d bpm' % avg)
ax.grid(True)
ax.legend(loc='best', fancybox=True, framealpha=0.5)
plt.show()
return avg, peaks
def chooser(average_rates):
'''Chooses the averate heart-rate from the estimates of 3 sensors. Avoid
rates from sensors which are far way from the other ones.'''
agreement = 3. #bpm
non_zero = [k for k in average_rates if int(k)]
if len(non_zero) == 0: return 0 #unknown!
elif len(non_zero) == 1: return non_zero[0]
elif len(non_zero) == 2:
agree = abs(non_zero[0] - non_zero[1]) < agreement
if agree: return numpy.mean(non_zero)
else: #chooses the lowest
return sorted(non_zero)[0]
# else, there are 3 values and we must do a more complex heuristic
r0_agrees_with_r1 = abs(average_rates[0] - average_rates[1]) < agreement
r0_agrees_with_r2 = abs(average_rates[0] - average_rates[2]) < agreement
r1_agrees_with_r2 = abs(average_rates[1] - average_rates[2]) < agreement
if r0_agrees_with_r1:
if r1_agrees_with_r2: #all 3 agree
return numpy.mean(average_rates)
else: #exclude r2
return numpy.mean(average_rates[:2])
else:
if r1_agrees_with_r2: #exclude r0
return numpy.mean(average_rates[1:])
else: #no agreement at all pick mid-way
return sorted(average_rates)[1]
if r1_agrees_with_r2:
if r0_agrees_with_r1: #all 3 agree
return numpy.mean(average_rates)
else: #exclude r0
return numpy.mean(average_rates[1:])
else:
if r0_agrees_with_r1: #exclude r2
return numpy.mean(average_rates[:2])
else: #no agreement at all pick middle way
return sorted(average_rates)[1]
def detect_faces_on_video(data):
"""Detect faces on a video sequence using bob.ip.facedetect"""
detections = {}
for k, frame in enumerate(data):
bb, quality = bob.ip.facedetect.detect_single_face(frame)
if quality > 20:
detections[k] = antispoofing.utils.faceloc.BoundingBox(bb.topleft[1], bb.topleft[0], bb.size[1], bb.size[0])
return detections
......@@ -41,6 +41,8 @@ setup(
'bob.db.base',
'bob.io.video',
'python-edf',
'bob.ip.facedetect',
'mne',
],
classifiers=[
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment