Commit ce1c62ae authored by André Anjos's avatar André Anjos 💬
Browse files

[utils] Add debugging tools for face detection, heart-rate estimation

parent 0996bac8
......@@ -4,127 +4,12 @@
# Wed 30 Sep 2015 12:13:47 CEST
import os
import logging
import pkg_resources
import numpy
import bob.db.base
import bob.io.base
import bob.ip.facedetect
def bdf_load_signal(fn, name='EXG3', start=None, end=None):
"""Loads a signal named ``name`` from the BDF filenamed ``fn``
Parameters:
fn (path): The full path to the file to read
name (str): The name of the channel to read.
start (int, option): Start time in seconds
end (int, optional): End time in seconds
List of physiological channels used (there are more available, but contain no
meaningful data) on the Mahnob HCI-Tagging database:
These are the 32 electrodes from the EEG cap (measurements in uV; for full
positioning details, see the full database description report, available on
the database website):
* AF3
* AF4
* C3
* C4
* CP1
* CP2
* CP5
* CP6
* Cz
* F3
* F4
* F7
* F8
* FC1
* FC2
* FC5
* FC6
* Fp1
* Fp2
* Fz
* O1
* O2
* Oz
* P3
* P4
* P7
* P8
* PO3
* PO4
* Pz
* T7
* T8
These are ECG sensors (measurements in uV):
* EXG1: Upper right corner of chest, under clavicle bone
* EXG2: Upper left corner of chest, under clavicle bone
* EXG3: Left side of abdomen (very clean)
Other sensors:
* GSR1: Galvanic skin response (in Ohm)
* Resp: Respiration belt (in uV)
* Status: Status channel containing markers (Boolean)
* Temp: Skin temperature on the left pinky (Celsius)
"""
import edflib
with edflib.EdfReader(fn) as e:
# get the status information, so we how the video is synchronized
status_index = e.getSignalTextLabels().index('Status')
sample_frequency = e.samplefrequency(status_index)
status_size = e.samples_in_file(status_index)
status = numpy.zeros((status_size,), dtype='float64')
e.readsignal(status_index, 0, status_size, status)
status = status.round().astype('int')
nz_status = status.nonzero()[0]
# because we're interested in the video bits, make sure to get data
# from that period only
video_start = nz_status[0]
video_end = nz_status[-1]
# retrieve information from this rather chaotic API
index = e.getSignalTextLabels().index(name)
sample_frequency = e.samplefrequency(index)
video_start_seconds = video_start/sample_frequency
if start is not None:
start += video_start_seconds
start *= sample_frequency
if start < video_start: start = video_start
start = int(start)
else:
start = video_start
if end is not None:
end += video_start_seconds
end *= sample_frequency
if end > video_end: end = video_end
end = int(end)
else:
end = video_end
# now read the data into a numpy array (read everything)
container = numpy.zeros((end-start,), dtype='float64')
e.readsignal(index, start, end-start, container)
return container, sample_frequency
from . import utils
class File(object):
......@@ -253,7 +138,7 @@ class File(object):
estimates = []
for channel in ('EXG1', 'EXG2', 'EXG3'):
signal, freq = bdf_load_signal(self.make_path(directory), channel)
signal, freq = utils.bdf_load_signal(self.make_path(directory), channel)
avg_hr, peaks = estimate_average_heartrate(signal, freq)
estimates.append(avg_hr)
return chooser(estimates)
......
......@@ -5,12 +5,130 @@
'''Utilities for Remote Photo-Plethysmography Benchmarking'''
import os
import numpy
import bob.io.video
import bob.ip.draw
import bob.ip.facedetect
import antispoofing.utils.faceloc
from mne.preprocessing.ecg import qrs_detector
def bdf_load_signal(fn, name='EXG3', start=None, end=None):
"""Loads a signal named ``name`` from the BDF filenamed ``fn``
Parameters:
fn (path): The full path to the file to read
name (str): The name of the channel to read.
start (int, option): Start time in seconds
end (int, optional): End time in seconds
List of physiological channels used (there are more available, but contain no
meaningful data) on the Mahnob HCI-Tagging database:
These are the 32 electrodes from the EEG cap (measurements in uV; for full
positioning details, see the full database description report, available on
the database website):
* AF3
* AF4
* C3
* C4
* CP1
* CP2
* CP5
* CP6
* Cz
* F3
* F4
* F7
* F8
* FC1
* FC2
* FC5
* FC6
* Fp1
* Fp2
* Fz
* O1
* O2
* Oz
* P3
* P4
* P7
* P8
* PO3
* PO4
* Pz
* T7
* T8
These are ECG sensors (measurements in uV):
* EXG1: Upper right corner of chest, under clavicle bone
* EXG2: Upper left corner of chest, under clavicle bone
* EXG3: Left side of abdomen (very clean)
Other sensors:
* GSR1: Galvanic skin response (in Ohm)
* Resp: Respiration belt (in uV)
* Status: Status channel containing markers (Boolean)
* Temp: Skin temperature on the left pinky (Celsius)
"""
import edflib
with edflib.EdfReader(fn) as e:
# get the status information, so we how the video is synchronized
status_index = e.getSignalTextLabels().index('Status')
sample_frequency = e.samplefrequency(status_index)
status_size = e.samples_in_file(status_index)
status = numpy.zeros((status_size,), dtype='float64')
e.readsignal(status_index, 0, status_size, status)
status = status.round().astype('int')
nz_status = status.nonzero()[0]
# because we're interested in the video bits, make sure to get data
# from that period only
video_start = nz_status[0]
video_end = nz_status[-1]
# retrieve information from this rather chaotic API
index = e.getSignalTextLabels().index(name)
sample_frequency = e.samplefrequency(index)
video_start_seconds = video_start/sample_frequency
if start is not None:
start += video_start_seconds
start *= sample_frequency
if start < video_start: start = video_start
start = int(start)
else:
start = video_start
if end is not None:
end += video_start_seconds
end *= sample_frequency
if end > video_end: end = video_end
end = int(end)
else:
end = video_end
# now read the data into a numpy array (read everything)
container = numpy.zeros((end-start,), dtype='float64')
e.readsignal(index, start, end-start, container)
return container, sample_frequency
def estimate_average_heartrate(s, sampling_frequency):
'''Estimates the average heart rate taking as base the input signal and its
sampling frequency.
......@@ -33,7 +151,7 @@ def estimate_average_heartrate(s, sampling_frequency):
return float(numpy.nan_to_num(instantaneous_rates[selector].mean())), peaks
def plot_signal(s, sampling_frequency):
def plot_signal(s, sampling_frequency, channel_name):
'''Estimates the heart rate taking as base the input signal and its sampling
frequency, plots QRS peaks discovered on the base signal.
......@@ -49,11 +167,7 @@ def plot_signal(s, sampling_frequency):
avg, peaks = estimate_average_heartrate(s, sampling_frequency)
import matplotlib.pyplot as plt
fig, ax = plt.subplots(1)
fig.set_size_inches(18, 6, forward=True)
ax = plt.gca()
ax.plot(numpy.arange(0, len(s)/sampling_frequency, 1/sampling_frequency),
s, label='Raw signal');
xmin, xmax, ymin, ymax = plt.axis()
......@@ -61,10 +175,9 @@ def plot_signal(s, sampling_frequency):
plt.xlim(0, len(s)/sampling_frequency)
plt.ylabel('uV')
plt.xlabel('time (s)')
plt.title('Average heart-rate = %d bpm' % avg)
plt.title('Channel %s - Average heart-rate = %d bpm' % (channel_name, avg))
ax.grid(True)
ax.legend(loc='best', fancybox=True, framealpha=0.5)
plt.show()
return avg, peaks
......@@ -114,12 +227,44 @@ def chooser(average_rates):
return sorted(average_rates)[1]
def detect_faces_on_video(data):
"""Detect faces on a video sequence using bob.ip.facedetect"""
def annotate_video(video, annotations, output, thickness=3,
color=(255, 0, 0)):
'''Annotates the input video with the detected bounding boxes'''
detections = {}
for k, frame in enumerate(data):
bb, quality = bob.ip.facedetect.detect_single_face(frame)
if quality > 20:
detections[k] = antispoofing.utils.faceloc.BoundingBox(bb.topleft[1], bb.topleft[0], bb.size[1], bb.size[0])
return detections
directory = os.path.dirname(output)
if not os.path.exists(directory): os.makedirs(directory)
writer = bob.io.video.writer(output, height=video.height, width=video.width,
framerate=video.frame_rate, codec=video.codec_name)
for k, frame in enumerate(video):
bb = annotations.get(k)
if bb is not None:
for t in range(thickness):
bob.ip.draw.box(frame, (bb[1]-t, bb[0]-t),
(bb[3]+2*t, bb[2]+2*t), color)
writer.append(frame)
del writer
def explain_heartrate(obj, dbdir, output):
'''Explains why the currently chosen heart-rate is what it is'''
import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
directory = os.path.dirname(output)
if not os.path.exists(directory): os.makedirs(directory)
# plots
estimates = []
pp = PdfPages(output)
for k, channel in enumerate(('EXG1', 'EXG2', 'EXG3')):
plt.figure(figsize=(12,4))
signal, freq = bdf_load_signal(obj.make_path(dbdir), channel)
avg_hr, peaks = plot_signal(signal, freq, channel)
estimates.append(avg_hr)
pp.savefig()
estimated = chooser(estimates)
pp.close()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment