Commit bfe48f7a authored by Tiago de Freitas Pereira's avatar Tiago de Freitas Pereira
Browse files

Implemented new database interface

Implemented new database interface

Implemented new database interface
parent 8d60fa49
Pipeline #43903 failed with stage
in 13 minutes and 5 seconds
from .csv_dataset import CSVDatasetDevEval
from .file import BioFile
from .file import BioFileSet
from .database import BioDatabase
from .database import ZTBioDatabase
from .filelist import FileListBioDatabase
from . import filelist


# gets sphinx autodoc done right - don't remove it
def __appropriate__(*args):
    """Says object was actually declared here, and not in the import module.
    Fixing sphinx warnings of not being able to find classes, when path is shortened.

    Parameters:

      *args: An iterable of objects to modify

    Resolves `Sphinx referencing issues
    <https://github.com/sphinx-doc/sphinx/issues/3048>`
    """
    for obj in args:
        obj.__module__ = __name__


# BUGFIX: the previous revision carried diff-hunk residue ("......@@ ...")
# inside this call and listed `ZTBioDatabase` twice with a missing comma
# after `FileListBioDatabase`.
__appropriate__(
    BioFile,
    BioFileSet,
    BioDatabase,
    ZTBioDatabase,
    FileListBioDatabase,
    CSVDatasetDevEval,
)

__all__ = [_ for _ in dir() if not _.startswith('_')]
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
import os
from bob.pipelines import Sample, DelayedSample, SampleSet
import csv
import bob.io.base
import functools
from abc import ABCMeta, abstractmethod
class CSVSampleLoaderAbstract(metaclass=ABCMeta):
    """
    Base class converting CSV files in the format below to either a list of
    :any:`bob.pipelines.DelayedSample` or :any:`bob.pipelines.SampleSet`

    .. code-block:: text

       PATH,SUBJECT
       path_1,subject_1
       path_2,subject_2
       path_i,subject_j
       ...

    .. note::
       This class should be extended

    Parameters
    ----------

        data_loader:
            A python function that can be called parameterlessly, to load the
            sample in question from whatever medium

        extension:
            The file extension

    """

    def __init__(self, data_loader, extension=""):
        # Callable used by subclasses to read the raw data behind each row.
        self.data_loader = data_loader
        # File extension forwarded to subclasses when resolving paths.
        self.extension = extension
        # Sample attributes that must NOT be copied onto SampleSets.
        self.excluding_attributes = ["_data", "load", "key"]

    @abstractmethod
    def __call__(self, filename):
        """Parse ``filename`` and return one sample per data row."""
        pass

    @abstractmethod
    def convert_row_to_sample(self, row, header):
        """Build a single sample out of one CSV ``row``."""
        pass

    @abstractmethod
    def convert_samples_to_samplesets(self, samples, group_by_subject=True):
        """Wrap plain ``samples`` into sample sets."""
        pass
class CSVToSampleLoader(CSVSampleLoaderAbstract):
    """
    Simple mechanism to convert CSV files in the format below to either a list of
    :any:`bob.pipelines.DelayedSample` or :any:`bob.pipelines.SampleSet`
    """

    def __call__(self, filename):
        """Parse ``filename`` and return one :any:`bob.pipelines.DelayedSample`
        per data row.

        Raises
        ------
        ValueError
            If the CSV header lacks the mandatory ``subject`` or ``path`` fields.
        """

        def check_header(header):
            """
            A header should have at least "SUBJECT" AND "PATH"
            """
            header = [h.lower() for h in header]
            if "subject" not in header:
                raise ValueError(
                    "The field `subject` is not available in your dataset."
                )

            if "path" not in header:
                raise ValueError("The field `path` is not available in your dataset.")

        with open(filename) as cf:
            reader = csv.reader(cf)
            header = next(reader)

            check_header(header)
            return [self.convert_row_to_sample(row, header) for row in reader]

    def convert_row_to_sample(self, row, header):
        """Turn one CSV row into a :any:`bob.pipelines.DelayedSample`.

        The first two columns are ``path`` and ``subject``; any further
        columns become metadata attributes named after their header fields.
        """
        path = row[0]
        subject = row[1]
        # Extra columns become sample metadata, keyed by their header names.
        kwargs = dict(zip(header[2:], row[2:]))

        return DelayedSample(
            # BUGFIX: the extension is a filename suffix, not a sub-directory.
            # `os.path.join(path, self.extension)` produced e.g. `path_1/.hdf5`
            # (or a trailing separator for an empty extension).
            functools.partial(self.data_loader, path + self.extension),
            key=path,
            subject=subject,
            **kwargs,
        )

    def convert_samples_to_samplesets(self, samples, group_by_subject=True):
        """Wrap ``samples`` into :any:`bob.pipelines.SampleSet` objects.

        When ``group_by_subject`` is True all samples sharing a subject are
        collected in a single SampleSet; otherwise each sample gets its own.
        Sample metadata (except the attributes in ``self.excluding_attributes``)
        is copied onto the enclosing SampleSet.
        """

        def get_attribute_from_sample(sample):
            return dict(
                [
                    [attribute, sample.__dict__[attribute]]
                    for attribute in list(sample.__dict__.keys())
                    if attribute not in self.excluding_attributes
                ]
            )

        if group_by_subject:
            # Grouping sample sets
            sample_sets = dict()
            for s in samples:
                if s.subject not in sample_sets:
                    sample_sets[s.subject] = SampleSet(
                        [s], **get_attribute_from_sample(s)
                    )
                else:
                    # BUGFIX: append only when the set already exists.
                    # Previously this ran unconditionally, so the first
                    # sample of each subject was inserted twice.
                    sample_sets[s.subject].append(s)
            return list(sample_sets.values())
        else:
            return [SampleSet([s], **get_attribute_from_sample(s)) for s in samples]
class CSVDatasetDevEval:
    """
    Generic filelist dataset for :any:`bob.bio.base.pipelines.VanillaBiometrics` pipeline.
    Check :ref:`vanilla_biometrics_features` for more details about the Vanilla Biometrics Dataset
    interface.

    To create a new dataset, you need to provide a directory structure similar to the one below:

    .. code-block:: text

       my_dataset/
       my_dataset/my_protocol/
       my_dataset/my_protocol/train.csv
       my_dataset/my_protocol/dev_enroll.csv
       my_dataset/my_protocol/dev_probe.csv
       my_dataset/my_protocol/eval_enroll.csv
       my_dataset/my_protocol/eval_probe.csv
       ...

    In the above directory structure, inside of `my_dataset` should contain the directories with all
    evaluation protocols this dataset might have.
    Inside of the `my_protocol` directory should contain at least two csv files:

     - dev_enroll.csv
     - dev_probe.csv

    Those csv files should contain in each row i-) the path to raw data and ii-) the subject label
    for enrollment (:ref:`bob.bio.base.pipelines.vanilla_biometrics.abstract_classes.Database.references`) and
    probing (:ref:`bob.bio.base.pipelines.vanilla_biometrics.abstract_classes.Database.probes`).
    The structure of each CSV file should be as below:

    .. code-block:: text

       PATH,SUBJECT
       path_1,subject_1
       path_2,subject_2
       path_i,subject_j
       ...

    You might want to ship metadata within your Samples (e.g gender, age, annotation, ...)
    To do so is simple, just do as below:

    .. code-block:: text

       PATH,SUBJECT,METADATA_1,METADATA_2,METADATA_k
       path_1,subject_1,A,B,C
       path_2,subject_2,A,B,1
       path_i,subject_j,2,3,4
       ...

    The files `my_dataset/my_protocol/eval_enroll.csv` and `my_dataset/my_protocol/eval_probe.csv`
    are optional and they are used in case a protocol contains data for evaluation.

    Finally, the content of the file `my_dataset/my_protocol/train.csv` is used in the case a protocol
    contains data for training (:ref:`bob.bio.base.pipelines.vanilla_biometrics.abstract_classes.Database.background_model_samples`)

    Parameters
    ----------

        dataset_path: str
          Absolute path of the dataset protocol description

        protocol: str
          The name of the protocol

        csv_to_sample_loader: :any:`CSVSampleLoaderAbstract`
          Mechanism used to convert each CSV row into a sample
    """

    def __init__(
        self,
        dataset_path,
        protocol,
        csv_to_sample_loader=CSVToSampleLoader(
            data_loader=bob.io.base.load, extension=""
        ),
    ):
        def get_paths():
            # Resolve (and validate) all protocol CSV files on disk.
            if not os.path.exists(dataset_path):
                raise ValueError(f"The path `{dataset_path}` was not found")

            # TODO: Unzip file if dataset path is a zip
            protocol_path = os.path.join(dataset_path, protocol)
            if not os.path.exists(protocol_path):
                raise ValueError(f"The protocol `{protocol}` was not found")

            train_csv = os.path.join(protocol_path, "train.csv")
            dev_enroll_csv = os.path.join(protocol_path, "dev_enroll.csv")
            dev_probe_csv = os.path.join(protocol_path, "dev_probe.csv")
            eval_enroll_csv = os.path.join(protocol_path, "eval_enroll.csv")
            eval_probe_csv = os.path.join(protocol_path, "eval_probe.csv")

            # The minimum required is to have `dev_enroll_csv` and `dev_probe_csv`

            # Train (optional)
            train_csv = train_csv if os.path.exists(train_csv) else None

            # Eval (optional)
            eval_enroll_csv = (
                eval_enroll_csv if os.path.exists(eval_enroll_csv) else None
            )
            eval_probe_csv = eval_probe_csv if os.path.exists(eval_probe_csv) else None

            # Dev (mandatory)
            if not os.path.exists(dev_enroll_csv):
                raise ValueError(
                    f"The file `{dev_enroll_csv}` is required and it was not found"
                )

            if not os.path.exists(dev_probe_csv):
                raise ValueError(
                    f"The file `{dev_probe_csv}` is required and it was not found"
                )

            return (
                train_csv,
                dev_enroll_csv,
                dev_probe_csv,
                eval_enroll_csv,
                eval_probe_csv,
            )

        (
            self.train_csv,
            self.dev_enroll_csv,
            self.dev_probe_csv,
            self.eval_enroll_csv,
            self.eval_probe_csv,
        ) = get_paths()

        def get_dict_cache():
            # Lazily-filled cache: CSVs are parsed at most once per instance.
            cache = dict()
            cache["train"] = None
            cache["dev_enroll_csv"] = None
            cache["dev_probe_csv"] = None
            cache["eval_enroll_csv"] = None
            cache["eval_probe_csv"] = None
            return cache

        self.cache = get_dict_cache()
        self.csv_to_sample_loader = csv_to_sample_loader

    def background_model_samples(self):
        """Returns the training samples (parsed from `train.csv`), cached."""
        self.cache["train"] = (
            self.csv_to_sample_loader(self.train_csv)
            if self.cache["train"] is None
            else self.cache["train"]
        )

        return self.cache["train"]

    def _get_samplesets(self, group="dev", purpose="enroll", group_by_subject=False):
        # Map (group, purpose) to the cache slot / attribute holding the CSV path.
        if purpose == "enroll":
            cache_label = "dev_enroll_csv" if group == "dev" else "eval_enroll_csv"
        else:
            cache_label = "dev_probe_csv" if group == "dev" else "eval_probe_csv"

        if self.cache[cache_label] is not None:
            return self.cache[cache_label]

        # The attribute name matches the cache label (set in __init__).
        probes_data = self.csv_to_sample_loader(self.__dict__[cache_label])
        sample_sets = self.csv_to_sample_loader.convert_samples_to_samplesets(
            probes_data, group_by_subject=group_by_subject
        )

        self.cache[cache_label] = sample_sets

        return self.cache[cache_label]

    def references(self, group="dev"):
        """Returns enrollment sample sets, one per subject."""
        return self._get_samplesets(
            group=group, purpose="enroll", group_by_subject=True
        )

    def probes(self, group="dev"):
        """Returns probe sample sets, one per sample."""
        return self._get_samplesets(
            group=group, purpose="probe", group_by_subject=False
        )
from .models import FileListFile
from .query import FileListBioDatabase
from .driver import Interface
# gets sphinx autodoc done right - don't remove it
def __appropriate__(*args):
"""Says object was actually declared here, and not in the import module.
Fixing sphinx warnings of not being able to find classes, when path is shortened.
Parameters:
*args: An iterable of objects to modify
Resolves `Sphinx referencing issues
<https://github.com/sphinx-doc/sphinx/issues/3048>`
"""
for obj in args:
obj.__module__ = __name__
# Re-declare the public classes as members of this package (for Sphinx).
__appropriate__(
    FileListFile,
    FileListBioDatabase,
    Interface,
)

# Export every public (non-underscore) name defined or imported above.
__all__ = [_ for _ in dir() if not _.startswith('_')]
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Laurent El Shafey <laurent.el-shafey@idiap.ch>
#
# Copyright (C) 2011-2013 Idiap Research Institute, Martigny, Switzerland
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Commands the Verification Filelists database can respond to.
"""
import os
import sys
from bob.db.base.driver import Interface as BaseInterface
def dumplist(args):
    """Dumps lists of files based on your criteria"""
    from .query import FileListBioDatabase

    database = FileListBioDatabase(
        args.list_directory, 'bio_filelist', use_dense_probe_file_list=False)

    selected = database.objects(
        purposes=args.purpose,
        groups=args.group,
        classes=args.sclass,
        protocol=args.protocol,
    )

    # Under --self-test the listing is swallowed instead of printed.
    if args.selftest:
        from bob.db.base.utils import null
        out = null()
    else:
        out = sys.stdout

    for obj in selected:
        out.write('%s\n' % obj.make_path(directory=args.directory, extension=args.extension))

    return 0
def checkfiles(args):
    """Checks existence of files based on your criteria"""
    from .query import FileListBioDatabase

    database = FileListBioDatabase(
        args.list_directory, 'bio_filelist', use_dense_probe_file_list=False)

    all_objects = database.objects(protocol=args.protocol)

    # go through all files, check if they are available on the filesystem
    missing = [
        obj for obj in all_objects
        if not os.path.exists(obj.make_path(args.directory, args.extension))
    ]

    # report
    if args.selftest:
        from bob.db.base.utils import null
        out = null()
    else:
        out = sys.stdout

    if missing:
        for obj in missing:
            out.write('Cannot find file "%s"\n' % obj.make_path(args.directory, args.extension))
        out.write('%d files (out of %d) were not found at "%s"\n' %
                  (len(missing), len(all_objects), args.directory))

    return 0
class Interface(BaseInterface):
    """`bob.db` driver interface exposing the 'dumplist' and 'checkfiles'
    command-line actions for the verification file-list database."""

    def name(self):
        # Short name this database registers under the bob.db driver.
        return 'bio_filelist'

    def version(self):
        import pkg_resources  # part of setuptools
        # The file-list database ships inside bob.bio.base, so report its version.
        return pkg_resources.require('bob.bio.base')[0].version

    def files(self):
        # No data files are distributed with this (file-list based) database.
        return ()

    def type(self):
        # Backend is plain-text file lists, not an SQLite database.
        return 'text'

    def add_commands(self, parser):
        """Registers the 'dumplist' and 'checkfiles' sub-commands on ``parser``."""
        from . import __doc__ as docs

        subparsers = self.setup_parser(parser,
                                       "Face Verification File Lists database", docs)

        import argparse

        # the "dumplist" action
        parser = subparsers.add_parser('dumplist', help=dumplist.__doc__)
        parser.add_argument('-l', '--list-directory', required=True,
                            help="The directory which contains the file lists.")
        parser.add_argument('-d', '--directory', default='',
                            help="if given, this path will be prepended to every entry returned.")
        parser.add_argument('-e', '--extension', default='',
                            help="if given, this extension will be appended to every entry returned.")
        parser.add_argument('-u', '--purpose',
                            help="if given, this value will limit the output files to those designed for the given purposes.",
                            choices=('enroll', 'probe', ''))
        parser.add_argument('-g', '--group',
                            help="if given, this value will limit the output files to those belonging to a particular protocolar group.",
                            choices=('dev', 'eval', 'world', 'optional_world_1', 'optional_world_2', ''))
        parser.add_argument('-c', '--class', dest="sclass",
                            help="if given, this value will limit the output files to those belonging to the given classes.",
                            choices=('client', 'impostor', ''))
        parser.add_argument('-p', '--protocol', default=None,
                            help="If set, the protocol is appended to the directory that contains the file lists.")
        parser.add_argument('--self-test', dest="selftest", action='store_true', help=argparse.SUPPRESS)
        parser.set_defaults(func=dumplist)  # action

        # the "checkfiles" action
        parser = subparsers.add_parser('checkfiles', help=checkfiles.__doc__)
        parser.add_argument('-l', '--list-directory', required=True,
                            help="The directory which contains the file lists.")
        parser.add_argument('-d', '--directory', dest="directory", default='',
                            help="if given, this path will be prepended to every entry returned.")
        parser.add_argument('-e', '--extension', dest="extension", default='',
                            help="if given, this extension will be appended to every entry returned.")
        parser.add_argument('-p', '--protocol', default=None,
                            help="If set, the protocol is appended to the directory that contains the file lists.")
        parser.add_argument('--self-test', dest="selftest", action='store_true', help=argparse.SUPPRESS)
        parser.set_defaults(func=checkfiles)  # action
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# @author: Manuel Guenther <Manuel.Guenther@idiap.ch>
# @date: Wed Oct 24 10:47:43 CEST 2012
#
# Copyright (C) 2011-2013 Idiap Research Institute, Martigny, Switzerland
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
This file defines a simple interface that are comparable with other bob.db databases.
"""
import os
import fileinput
import re
class FileListFile(object):
    """A single entry of a verification file list.

    Initialize the File object with the minimum required data.

    If the ``model_id`` is not specified, ``model_id`` and ``client_id`` are identical.
    If the ``claimed_id`` is not specified, it is expected to be the ``client_id``.

    Parameters
    ----------

    file_name : str
        The path of this file, relative to the basic directory.
        Please do not specify any file extensions.
        This value also serves as the unique id of the file.

    client_id : various type
        The id of the client, this file belongs to.
        The type of it is dependent on your implementation.
        If you use an SQL database, this should be an SQL type like Integer or String.

    model_id : various type or ``None``
        The id of the model associated with this file; defaults to ``client_id``.
        Kept private (see the note in the constructor).

    claimed_id : various type or ``None``
        The id claimed during verification; defaults to ``client_id``.
    """

    def __init__(self, file_name, client_id, model_id=None, claimed_id=None):
        # super(FileListFile, self).__init__(client_id=client_id, path=file_name, file_id=file_name)
        super(FileListFile, self).__init__()
        self.client_id = client_id
        self.path = file_name
        self.id = file_name

        # Note: in case of probe files, model ids are considered to be the ids of the model for the given probe file.
        # Hence, there might be several probe files with the same file id, but different model ids.
        # Therefore, please DO NOT USE the model_id outside of this class (or the according database queries).

        # when the model id is not specified, we use the client id instead
        self._model_id = client_id if model_id is None else model_id
        # when the claimed id is not specified, we use the client id instead
        self.claimed_id = client_id if claimed_id is None else claimed_id
#############################################################################
# internal access functions for the file lists; do not export!
#############################################################################
class ListReader(object):
    def __init__(self, store_lists):
        # Caches filled lazily by the read methods below (presumably keyed
        # per list file / per directory -- confirm against the full class).
        self.m_read_lists = {}
        self.m_model_dicts = {}
        # If True, parsed lists are kept in the caches above for reuse
        # (presumably; verify against the methods that consult this flag).
        self.m_store_lists = store_lists
def _read_multi_column_list(self, list_file):
rows = []
if not os.path.isfile(list_file):
raise RuntimeError('File %s does not exist.' % (list_file,))
try:
for line in fileinput.input(list_file):
if line.strip().startswith('#'):
continue
parsed_line = re.findall('[\w/(-.)]+', line)
if len(parsed_line):
# perform some sanity checks
if len(parsed_line) not in (2, 3, 4):
raise IOError("The read line '%s' from file '%s' could not be parsed successfully!" % (
line.rstrip(), list_file))
if len(rows) and len(rows[0]) != len(parsed_line):
raise IOError(
"The parsed line '%s' from file '%s' has a different number of elements than the first parsed line '%s'!" % (
parsed_line, list_file, rows[0]))
# append the read line
rows.append(parsed_line)
fileinput.close()
except IOError as e:
raise RuntimeError("Error reading the file '%s' : '%s'." % (list_file, e))