bob / bob.pad.face, commit 0b334636
Authored Nov 10, 2020 by Amir MOHAMMADI

Merge branch 'dask-pipelines' into 'master'

Remove deprecated code

See merge request !108

Parents: cdce425f, 676fac8a
Pipeline #45486 failed with stages in 5 minutes and 10 seconds
Changes: 24   Pipelines: 2
bob/pad/face/__init__.py

-from . import extractor, preprocessor, database, test
+from . import extractor, preprocessor, database


def get_config():
...
bob/pad/face/config/vanilla_pad/qm_svm.py

...
@@ -8,7 +8,7 @@ from bob.pad.face.extractor import ImageQualityMeasure
 from sklearn.svm import SVC
 from sklearn.model_selection import GridSearchCV
 from sklearn.pipeline import make_pipeline
-from bob.pad.base.pipelines.vanilla_pad import FrameContainersToFrames
+from bob.pad.face.transformer import VideoToFrames
 import bob.pipelines as mario

 database = globals().get("database")
...
@@ -42,7 +42,7 @@ extractor = mario.wrap(
 )

 # new stuff #
-frame_cont_to_array = FrameContainersToFrames()
+frame_cont_to_array = VideoToFrames()

 param_grid = [{"C": [1, 10, 100, 1000], "kernel": ["linear"]},
...
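For context, here is a minimal sketch of how the grid-searched SVC configured in this file is typically assembled with plain scikit-learn. The bob.pipelines (mario) wrapping and the VideoToFrames transformer are bob-specific and omitted; the feature array and labels are hypothetical placeholders, not part of the original config.

import numpy
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

param_grid = [
    {"C": [1, 10, 100, 1000], "kernel": ["linear"]},
]

# hypothetical image-quality-measure features: one row per sample
quality_features = numpy.random.rand(100, 128)
labels = numpy.random.randint(0, 2, size=100)

classifier = make_pipeline(
    StandardScaler(),
    GridSearchCV(SVC(), param_grid=param_grid, cv=3),
)
classifier.fit(quality_features, labels)
scores = classifier.decision_function(quality_features)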
bob/pad/face/extractor/LTSS.py (deleted, 100644 → 0)

#!/usr/bin/env python
# encoding: utf-8

import numpy

from bob.bio.base.extractor import Extractor

from bob.core.log import setup
logger = setup("bob.pad.face")

from scipy.fftpack import rfft


class LTSS(Extractor, object):
    """Compute Long-term spectral statistics of a pulse signal.

    The features are described in the following article:

    H. Muckenhirn, P. Korshunov, M. Magimai-Doss, and S. Marcel
    Long-Term Spectral Statistics for Voice Presentation Attack Detection,
    IEEE/ACM Trans. Audio, Speech and Language Processing. vol 25, n. 11, 2017

    Attributes
    ----------
    window_size : :obj:`int`
        The size of the window where FFT is computed
    framerate : :obj:`int`
        The sampling frequency of the signal (i.e the framerate ...)
    nfft : :obj:`int`
        Number of points to compute the FFT
    debug : :obj:`bool`
        Plot stuff
    concat : :obj:`bool`
        Flag if you would like to concatenate features from the 3 color channels
    time : :obj:`int`
        The length of the signal to consider (in seconds)

    """

    def __init__(self, window_size=25, framerate=25, nfft=64, concat=False,
                 debug=False, time=0, **kwargs):
        """Init function

        Parameters
        ----------
        window_size : :obj:`int`
            The size of the window where FFT is computed
        framerate : :obj:`int`
            The sampling frequency of the signal (i.e the framerate ...)
        nfft : :obj:`int`
            Number of points to compute the FFT
        debug : :obj:`bool`
            Plot stuff
        concat : :obj:`bool`
            Flag if you would like to concatenate features from the 3 color channels
        time : :obj:`int`
            The length of the signal to consider (in seconds)

        """
        super(LTSS, self).__init__(**kwargs)
        self.framerate = framerate
        # TODO: try to use window size as NFFT - Guillaume HEUSCH, 04-07-2018
        self.nfft = nfft
        self.debug = debug
        self.window_size = window_size
        self.concat = concat
        self.time = time

    def _get_ltss(self, signal):
        """Compute long term spectral statistics for a signal

        Parameters
        ----------
        signal : :py:class:`numpy.ndarray`
            The signal

        Returns
        -------
        :py:class:`numpy.ndarray`
            The spectral statistics of the signal.

        """
        window_stride = int(self.window_size / 2)

        # log-magnitude of DFT coefficients
        log_mags = []

        # go through windows
        for w in range(0, (signal.shape[0] - self.window_size), window_stride):
            # n is even, as a consequence the fft is as follows [y(0), Re(y(1)), Im(y(1)), ..., Re(y(n/2))]
            # i.e. each coefficient, except first and last, is represented by two numbers (real + imaginary)
            fft = rfft(signal[w:w + self.window_size], n=self.nfft)

            # the magnitude is the norm of the complex numbers, so its size is n/2 + 1
            mags = numpy.zeros((int(self.nfft / 2) + 1), dtype=numpy.float64)

            # first coeff is real
            if abs(fft[0]) < 1:
                mags[0] = 1
            else:
                mags[0] = abs(fft[0])

            # go through coeffs 2 to n/2
            index = 1
            for i in range(1, (fft.shape[0] - 1), 2):
                mags[index] = numpy.sqrt(fft[i] ** 2 + fft[i + 1] ** 2)
                if mags[index] < 1:
                    mags[index] = 1
                index += 1

            # last coeff is real too
            if abs(fft[-1]) < 1:
                mags[index] = 1
            else:
                mags[index] = abs(fft[-1])

            log_mags.append(numpy.log(mags))

        # build final feature
        log_mags = numpy.array(log_mags)
        mean = numpy.mean(log_mags, axis=0)
        std = numpy.std(log_mags, axis=0)
        ltss = numpy.concatenate([mean, std])

        return ltss

    def __call__(self, signal):
        """Computes the long-term spectral statistics for given pulse signals.

        Parameters
        ----------
        signal: numpy.ndarray
            The signal

        Returns
        -------
        feature: numpy.ndarray
            the computed LTSS features

        """
        # sanity check
        if signal.ndim == 1:
            if numpy.isnan(numpy.sum(signal)):
                return

        if signal.ndim == 2 and (signal.shape[1] == 3):
            for i in range(signal.shape[1]):
                if numpy.isnan(numpy.sum(signal[:, i])):
                    return

        # truncate the signal according to time
        if self.time > 0:
            number_of_frames = self.time * self.framerate

            # check that the truncated signal is not longer
            # than the original one
            if number_of_frames < signal.shape[0]:
                if signal.ndim == 1:
                    signal = signal[:number_of_frames]
                if signal.ndim == 2 and (signal.shape[1] == 3):
                    new_signal = numpy.zeros((number_of_frames, 3))
                    for i in range(signal.shape[1]):
                        new_signal[:, i] = signal[:number_of_frames, i]
                    signal = new_signal
            else:
                logger.warning(
                    "Sequence should be truncated to {}, but only contains {} => keeping original one".format(
                        number_of_frames, signal.shape[0]))

        # also, be sure that the window_size is not bigger that the signal
        if self.window_size > int(signal.shape[0] / 2):
            self.window_size = int(signal.shape[0] / 2)
            logger.warning("Window size reduced to {}".format(self.window_size))

        # we have a single pulse
        if signal.ndim == 1:
            feature = self._get_ltss(signal)

        # pulse for the 3 color channels
        if signal.ndim == 2 and (signal.shape[1] == 3):
            if not self.concat:
                feature = self._get_ltss(signal[:, 1])
            else:
                ltss = []
                for i in range(signal.shape[1]):
                    ltss.append(self._get_ltss(signal[:, i]))
                feature = numpy.concatenate([ltss[0], ltss[1], ltss[2]])

        if numpy.isnan(numpy.sum(feature)):
            logger.warn("Feature not extracted")
            return
        if numpy.sum(feature) == 0:
            logger.warn("Feature not extracted")
            return

        return feature
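For reference, a minimal usage sketch of the deleted LTSS extractor, assuming the bob.bio.base, bob.core, and scipy dependencies it imports are installed; the import path simply mirrors the deleted file location, and the pulse signal is a synthetic placeholder rather than real rPPG output.

import numpy
from bob.pad.face.extractor.LTSS import LTSS  # module path mirrors the deleted file

# a synthetic 10-second single-channel "pulse" sampled at 25 fps
pulse = numpy.random.randn(250)

extractor = LTSS(window_size=25, framerate=25, nfft=64, concat=False)
feature = extractor(pulse)
# mean and std of the log-magnitude spectra over windows: nfft / 2 + 1 values each
print(feature.shape)  # (66,)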
bob/pad/face/extractor/LiSpectralFeatures.py (deleted, 100644 → 0)

#!/usr/bin/env python
# encoding: utf-8

import numpy

from bob.bio.base.extractor import Extractor

from bob.core.log import setup
logger = setup("bob.pad.face")


class LiSpectralFeatures(Extractor, object):
    """Compute features from pulse signals in the three color channels.

    The features are described in the following article:

    X. Li, J. Komulainen, G. Zhao, P-C Yuen and M. Pietikainen,
    Generalized Face Anti-spoofing by Detecting Pulse From Face Videos
    Intl Conf. on Pattern Recognition (ICPR), 2016.

    Attributes
    ----------
    framerate : :obj:`int`
        The sampling frequency of the signal (i.e the framerate ...)
    nfft : :obj:`int`
        Number of points to compute the FFT
    debug : :obj:`bool`
        Plot stuff

    """

    def __init__(self, framerate=25, nfft=512, debug=False, **kwargs):
        """Init function

        Parameters
        ----------
        framerate : :obj:`int`
            The sampling frequency of the signal (i.e the framerate ...)
        nfft : :obj:`int`
            Number of points to compute the FFT
        debug : :obj:`bool`
            Plot stuff

        """
        super(LiSpectralFeatures, self).__init__()
        self.framerate = framerate
        self.nfft = nfft
        self.debug = debug

    def __call__(self, signal):
        """Compute the frequency features for the given signal.

        Parameters
        ----------
        signal : :py:class:`numpy.ndarray`
            The signal

        Returns
        -------
        :py:class:`numpy.ndarray`
            the computed features

        """
        # sanity check
        assert signal.ndim == 2 and signal.shape[1] == 3, "You should provide 3 pulse signals"
        for i in range(3):
            if numpy.isnan(numpy.sum(signal[:, i])):
                return

        feature = numpy.zeros(6)

        # when keypoints have not been detected, the pulse is zero everywhere
        # hence, no psd and no features
        zero_counter = 0
        for i in range(3):
            if numpy.sum(signal[:, i]) == 0:
                zero_counter += 1
        if zero_counter == 3:
            logger.warn("Feature is all zeros")
            return feature

        # get the frequency spectrum
        spectrum_dim = int((self.nfft / 2) + 1)
        ffts = numpy.zeros((3, spectrum_dim))
        f = numpy.fft.fftfreq(self.nfft) * self.framerate
        f = abs(f[:spectrum_dim])
        for i in range(3):
            ffts[i] = abs(numpy.fft.rfft(signal[:, i], n=self.nfft))

        # find the max of the frequency spectrum in the range of interest
        first = numpy.where(f > 0.7)[0]
        last = numpy.where(f < 4)[0]
        first_index = first[0]
        last_index = last[-1]
        range_of_interest = range(first_index, last_index + 1, 1)

        # build the feature vector
        for i in range(3):
            total_power = numpy.sum(ffts[i, range_of_interest])
            max_power = numpy.max(ffts[i, range_of_interest])
            feature[i] = max_power
            if total_power == 0:
                feature[i + 3] = 0
            else:
                feature[i + 3] = max_power / total_power

        # plot stuff, if asked for
        if self.debug:
            from matplotlib import pyplot
            for i in range(3):
                max_idx = numpy.argmax(ffts[i, range_of_interest])
                f_max = f[range_of_interest[max_idx]]
                logger.debug("Inferred HR = {}".format(f_max * 60))
                pyplot.plot(f, ffts[i], 'k')
                xmax, xmin, ymax, ymin = pyplot.axis()
                pyplot.vlines(f[range_of_interest[max_idx]], ymin, ymax, color='red')
                pyplot.vlines(f[first_index], ymin, ymax, color='green')
                pyplot.vlines(f[last_index], ymin, ymax, color='green')
                pyplot.show()

        if numpy.isnan(numpy.sum(feature)):
            logger.warn("Feature not extracted")
            return

        return feature
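For reference, a minimal usage sketch of the deleted LiSpectralFeatures extractor, assuming the bob.bio.base and bob.core dependencies it imports are available; the three-channel pulse and the import path (taken from the file location) are placeholders.

import numpy
from bob.pad.face.extractor.LiSpectralFeatures import LiSpectralFeatures

# a synthetic 20-second RGB pulse at 25 fps: shape (frames, 3)
pulses = numpy.random.randn(500, 3)

extractor = LiSpectralFeatures(framerate=25, nfft=512)
feature = extractor(pulses)
# 6 values: per-channel max power in the 0.7-4 Hz band and its ratio to the total power in that band
print(feature)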
bob/pad/face/extractor/NormalizeLength.py (deleted, 100644 → 0)

#!/usr/bin/env python
# encoding: utf-8

import numpy

from bob.bio.base.extractor import Extractor

import logging
logger = logging.getLogger("bob.pad.face")


class NormalizeLength(Extractor, object):
    """
    Normalize the length of feature vectors, such that
    they all have the same dimensions

    **Parameters:**

    length: int
      The final length of the final feature vector

    requires_training: boolean
      This extractor actually may requires "training".
      The goal here is to retrieve the length of the shortest sequence

    debug: boolean
      Plot stuff
    """

    def __init__(self, length=-1, debug=False, requires_training=True, **kwargs):
        super(NormalizeLength, self).__init__(requires_training=requires_training, **kwargs)
        self.length = length
        self.debug = debug

    def __call__(self, signal):
        """
        Normalize the length of the signal

        **Parameters:**

        signal: numpy.array
          The signal

        **Returns:**

        signal: numpy.array
          the signal with the provided length
        """
        # we have a single pulse signal
        if signal.ndim == 1:
            signal = signal[:self.length]

        # we have 3 pulse signal (Li's preprocessing)
        # in this case, return the signal corresponding to the green channel
        if signal.ndim == 2 and (signal.shape[1] == 3):
            signal = signal[:self.length, 1]

        if numpy.isnan(numpy.sum(signal)):
            return

        if signal.shape[0] < self.length:
            logger.debug("signal shorter than training shape: {} vs {}".format(
                signal.shape[0], self.length))
            import sys
            sys.exit()
            # NOTE: the padding code below is never reached because of the
            # sys.exit() call above
            tmp = numpy.zeros((self.length), dtype=signal.dtype)
            tmp[:, signal.shape[0]]
            signal = tmp

        if self.debug:
            from matplotlib import pyplot
            pyplot.plot(signal, 'k')
            pyplot.title('Signal truncated')
            pyplot.show()

        return signal

    def train(self, training_data, extractor_file):
        """
        This function determines the shortest length across the training set.
        It will be used to normalize the length of all the sequences.

        **Parameters:**

        training_data : [object] or [[object]]
          A list of *preprocessed* data that can be used for training the extractor.
          Data will be provided in a single list, if ``split_training_features_by_client = False`` was specified in the constructor,
          otherwise the data will be split into lists, each of which contains the data of a single (training-)client.

        extractor_file : str
          The file to write.
          This file should be readable with the :py:meth:`load` function.
        """
        self.length = 100000
        for i in range(len(training_data)):
            if training_data[i].shape[0] < self.length:
                self.length = training_data[i].shape[0]
        logger.info("Signals will be truncated to {} dimensions".format(self.length))
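For reference, a minimal usage sketch of the deleted NormalizeLength extractor, assuming bob.bio.base is installed; the training pulses and import path are placeholders.

import numpy
from bob.pad.face.extractor.NormalizeLength import NormalizeLength

normalizer = NormalizeLength()
# three synthetic pulses of unequal length; train() keeps the shortest one
training_pulses = [numpy.random.randn(n) for n in (260, 250, 255)]
normalizer.train(training_pulses, extractor_file=None)  # extractor_file is ignored by this implementation

pulse = numpy.random.randn(300)
normalized = normalizer(pulse)
print(normalized.shape)  # (250,)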
bob/pad/face/extractor/OpticalFlow.py (deleted, 100644 → 0)

from bob.bio.video import FrameContainer
from bob.io.base import HDF5File
from bob.ip.optflow.liu.cg import flow
from collections import Iterator
from functools import partial
import bob.pipelines as mario
import logging

logger = logging.getLogger(__name__)


def _check_frame(frame):
    if frame.dtype == "uint8":
        return frame.astype("float64") / 255.0
    return frame.astype("float64")


class _Reader:
    def __init__(self, i1, flow_method):
        self.i1 = _check_frame(i1)
        self.flow_method = flow_method

    def __call__(self, i2):
        i2 = _check_frame(i2)
        flows = self.flow_method(self.i1, i2)[:2]
        self.i1 = i2
        return flows


class OpticalFlow(object):
    """An optical flow extractor

    For more information see :any:`bob.ip.optflow.liu.cg.flow`.

    Attributes
    ----------
    alpha : float
        Regularization weight
    inner : int
        The number of inner fixed point iterations
    iterations : int
        The number of conjugate-gradient (CG) iterations
    min_width : int
        Width of the coarsest level
    outer : int
        The number of outer fixed point iterations
    ratio : float
        Downsample ratio
    """

    def __init__(self, alpha=0.02, ratio=0.75, min_width=30, outer=20, inner=1,
                 iterations=50, **kwargs):
        super().__init__(**kwargs)
        self.alpha = alpha
        self.ratio = ratio
        self.min_width = min_width
        self.outer = outer
        self.inner = inner
        self.iterations = iterations

    def __call__(self, video):
        """Computes optical flows on a video

        Please note that the video should either be uint8 or float64 with values from 0
        to 1.

        Parameters
        ----------
        video : numpy.ndarray
            The video. Can be a FrameContainer, generator, bob.io.video.reader, or a
            numpy array.

        Returns
        -------
        numpy.ndarray
            The flows calculated for each pixel. The output shape will be
            [number_of_frames - 1, 2, height, width].
        """
        if isinstance(video, FrameContainer):
            video = video.as_array()

        if not isinstance(video, Iterator):
            video = iter(video)
        i1 = next(video)

        reader = _Reader(
            i1,
            partial(
                flow,
                alpha=self.alpha,
                ratio=self.ratio,
                min_width=self.min_width,
                n_outer_fp_iterations=self.outer,
                n_inner_fp_iterations=self.inner,
                n_cg_iterations=self.iterations,
            ),
        )

        flows = mario.utils.vstack_features(reader, video)
        shape = list(flows.shape)
        shape[0] = 2
        shape.insert(0, -1)
        return flows.reshape(shape)

    def write_feature(self, feature, feature_file):
        if not isinstance(feature_file, HDF5File):
            feature_file = HDF5File(feature_file, "w")
        feature_file.set("uv", feature)
        feature_file.set_attribute("method", "liu.cg", "uv")
        feature_file.set_attribute("alpha", self.alpha, "uv")
        feature_file.set_attribute("ratio", self.ratio, "uv")
        feature_file.set_attribute("min_width", self.min_width, "uv")
        feature_file.set_attribute("n_outer_fp_iterations", self.outer, "uv")
        feature_file.set_attribute("n_inner_fp_iterations", self.inner, "uv")
        feature_file.set_attribute("n_iterations", self.iterations, "uv")

    def read_feature(self, feature_file):
        if not isinstance(feature_file, HDF5File):
            feature_file = HDF5File(feature_file, "r")
        return feature_file["uv"]
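For reference, a minimal usage sketch of the deleted OpticalFlow extractor, assuming the bob.ip.optflow.liu, bob.bio.video, bob.io.base, and bob.pipelines stack this file was written against is installed (and a Python version where `from collections import Iterator` still works); the synthetic grayscale video and the import path are assumptions for illustration only.

import numpy
from bob.pad.face.extractor.OpticalFlow import OpticalFlow

# a synthetic 5-frame 64x64 video; uint8 frames get scaled to [0, 1] by _check_frame
video = numpy.random.randint(0, 256, size=(5, 64, 64), dtype="uint8")

extractor = OpticalFlow(alpha=0.02, iterations=50)
flows = extractor(video)  # per the docstring: [number_of_frames - 1, 2, height, width]
extractor.write_feature(flows, "flows.hdf5")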
bob/pad/face/extractor/PPGSecure.py (deleted, 100644 → 0)

#!/usr/bin/env python
# encoding: utf-8

import numpy

from bob.bio.base.extractor import Extractor

from bob.core.log import setup
logger = setup("bob.pad.face")


class PPGSecure(Extractor, object):
    """Extract frequency spectra from pulse signals.

    The feature are extracted according to what is described in
    the following article:

    E.M Nowara, A. Sabharwal and A. Veeraraghavan,
    "PPGSecure: Biometric Presentation Attack Detection using Photoplethysmograms",
    IEEE Intl Conf. on Automatic Face and Gesture Recognition, 2017.

    Attributes
    ----------
    framerate : :obj:`int`
        The sampling frequency of the signal (i.e the framerate ...)
    nfft : :obj:`int`
        Number of points to compute the FFT
    debug : :obj:`bool`
        Plot stuff