Skip to content
Snippets Groups Projects
Commit 42b07ac9 authored by André Anjos's avatar André Anjos :speech_balloon:
Browse files

Initial version

parents
No related branches found
No related tags found
No related merge requests found
*~
*.swp
*.pyc
bin
eggs
parts
.installed.cfg
.mr.developer.cfg
*.egg-info
src
develop-eggs
This diff is collapsed.
##############################################################################
#
# Copyright (c) 2006 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Bootstrap a buildout-based project
Simply run this script in a directory containing a buildout.cfg.
The script accepts buildout command-line options, so you can
use the -c option to specify an alternate configuration file.
"""
import os, shutil, sys, tempfile, urllib, urllib2, subprocess
from optparse import OptionParser
if sys.platform == 'win32':
def quote(c):
if ' ' in c:
return '"%s"' % c # work around spawn lamosity on windows
else:
return c
else:
quote = str
# See zc.buildout.easy_install._has_broken_dash_S for motivation and comments.
stdout, stderr = subprocess.Popen(
[sys.executable, '-Sc',
'try:\n'
' import ConfigParser\n'
'except ImportError:\n'
' print 1\n'
'else:\n'
' print 0\n'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
has_broken_dash_S = bool(int(stdout.strip()))
# In order to be more robust in the face of system Pythons, we want to
# run without site-packages loaded. This is somewhat tricky, in
# particular because Python 2.6's distutils imports site, so starting
# with the -S flag is not sufficient. However, we'll start with that:
if not has_broken_dash_S and 'site' in sys.modules:
# We will restart with python -S.
args = sys.argv[:]
args[0:0] = [sys.executable, '-S']
args = map(quote, args)
os.execv(sys.executable, args)
# Now we are running with -S. We'll get the clean sys.path, import site
# because distutils will do it later, and then reset the path and clean
# out any namespace packages from site-packages that might have been
# loaded by .pth files.
clean_path = sys.path[:]
import site # imported because of its side effects
sys.path[:] = clean_path
for k, v in sys.modules.items():
if k in ('setuptools', 'pkg_resources') or (
hasattr(v, '__path__') and
len(v.__path__) == 1 and
not os.path.exists(os.path.join(v.__path__[0], '__init__.py'))):
# This is a namespace package. Remove it.
sys.modules.pop(k)
is_jython = sys.platform.startswith('java')
setuptools_source = 'http://peak.telecommunity.com/dist/ez_setup.py'
distribute_source = 'http://python-distribute.org/distribute_setup.py'
# parsing arguments
def normalize_to_url(option, opt_str, value, parser):
if value:
if '://' not in value: # It doesn't smell like a URL.
value = 'file://%s' % (
urllib.pathname2url(
os.path.abspath(os.path.expanduser(value))),)
if opt_str == '--download-base' and not value.endswith('/'):
# Download base needs a trailing slash to make the world happy.
value += '/'
else:
value = None
name = opt_str[2:].replace('-', '_')
setattr(parser.values, name, value)
usage = '''\
[DESIRED PYTHON FOR BUILDOUT] bootstrap.py [options]
Bootstraps a buildout-based project.
Simply run this script in a directory containing a buildout.cfg, using the
Python that you want bin/buildout to use.
Note that by using --setup-source and --download-base to point to
local resources, you can keep this script from going over the network.
'''
parser = OptionParser(usage=usage)
parser.add_option("-v", "--version", dest="version",
help="use a specific zc.buildout version")
parser.add_option("-d", "--distribute",
action="store_true", dest="use_distribute", default=False,
help="Use Distribute rather than Setuptools.")
parser.add_option("--setup-source", action="callback", dest="setup_source",
callback=normalize_to_url, nargs=1, type="string",
help=("Specify a URL or file location for the setup file. "
"If you use Setuptools, this will default to " +
setuptools_source + "; if you use Distribute, this "
"will default to " + distribute_source + "."))
parser.add_option("--download-base", action="callback", dest="download_base",
callback=normalize_to_url, nargs=1, type="string",
help=("Specify a URL or directory for downloading "
"zc.buildout and either Setuptools or Distribute. "
"Defaults to PyPI."))
parser.add_option("--eggs",
help=("Specify a directory for storing eggs. Defaults to "
"a temporary directory that is deleted when the "
"bootstrap script completes."))
parser.add_option("-t", "--accept-buildout-test-releases",
dest='accept_buildout_test_releases',
action="store_true", default=False,
help=("Normally, if you do not specify a --version, the "
"bootstrap script and buildout gets the newest "
"*final* versions of zc.buildout and its recipes and "
"extensions for you. If you use this flag, "
"bootstrap and buildout will get the newest releases "
"even if they are alphas or betas."))
parser.add_option("-c", None, action="store", dest="config_file",
help=("Specify the path to the buildout configuration "
"file to be used."))
options, args = parser.parse_args()
# if -c was provided, we push it back into args for buildout's main function
if options.config_file is not None:
args += ['-c', options.config_file]
if options.eggs:
eggs_dir = os.path.abspath(os.path.expanduser(options.eggs))
else:
eggs_dir = tempfile.mkdtemp()
if options.setup_source is None:
if options.use_distribute:
options.setup_source = distribute_source
else:
options.setup_source = setuptools_source
if options.accept_buildout_test_releases:
args.append('buildout:accept-buildout-test-releases=true')
args.append('bootstrap')
try:
import pkg_resources
import setuptools # A flag. Sometimes pkg_resources is installed alone.
if not hasattr(pkg_resources, '_distribute'):
raise ImportError
except ImportError:
ez_code = urllib2.urlopen(
options.setup_source).read().replace('\r\n', '\n')
ez = {}
exec ez_code in ez
setup_args = dict(to_dir=eggs_dir, download_delay=0)
if options.download_base:
setup_args['download_base'] = options.download_base
if options.use_distribute:
setup_args['no_fake'] = True
ez['use_setuptools'](**setup_args)
if 'pkg_resources' in sys.modules:
reload(sys.modules['pkg_resources'])
import pkg_resources
# This does not (always?) update the default working set. We will
# do it.
for path in sys.path:
if path not in pkg_resources.working_set.entries:
pkg_resources.working_set.add_entry(path)
cmd = [quote(sys.executable),
'-c',
quote('from setuptools.command.easy_install import main; main()'),
'-mqNxd',
quote(eggs_dir)]
if not has_broken_dash_S:
cmd.insert(1, '-S')
find_links = options.download_base
if not find_links:
find_links = os.environ.get('bootstrap-testing-find-links')
if find_links:
cmd.extend(['-f', quote(find_links)])
if options.use_distribute:
setup_requirement = 'distribute'
else:
setup_requirement = 'setuptools'
ws = pkg_resources.working_set
setup_requirement_path = ws.find(
pkg_resources.Requirement.parse(setup_requirement)).location
env = dict(
os.environ,
PYTHONPATH=setup_requirement_path)
requirement = 'zc.buildout'
version = options.version
if version is None and not options.accept_buildout_test_releases:
# Figure out the most recent final version of zc.buildout.
import setuptools.package_index
_final_parts = '*final-', '*final'
def _final_version(parsed_version):
for part in parsed_version:
if (part[:1] == '*') and (part not in _final_parts):
return False
return True
index = setuptools.package_index.PackageIndex(
search_path=[setup_requirement_path])
if find_links:
index.add_find_links((find_links,))
req = pkg_resources.Requirement.parse(requirement)
if index.obtain(req) is not None:
best = []
bestv = None
for dist in index[req.project_name]:
distv = dist.parsed_version
if _final_version(distv):
if bestv is None or distv > bestv:
best = [dist]
bestv = distv
elif distv == bestv:
best.append(dist)
if best:
best.sort()
version = best[-1].version
if version:
requirement = '=='.join((requirement, version))
cmd.append(requirement)
if is_jython:
import subprocess
exitcode = subprocess.Popen(cmd, env=env).wait()
else: # Windows prefers this, apparently; otherwise we would prefer subprocess
exitcode = os.spawnle(*([os.P_WAIT, sys.executable] + cmd + [env]))
if exitcode != 0:
sys.stdout.flush()
sys.stderr.flush()
print ("An error occurred when trying to install zc.buildout. "
"Look above this message for any errors that "
"were output by easy_install.")
sys.exit(exitcode)
ws.add_entry(eggs_dir)
ws.require(requirement)
import zc.buildout.buildout
zc.buildout.buildout.main(args)
if not options.eggs: # clean up temporary egg directory
shutil.rmtree(eggs_dir)
; vim: set fileencoding=utf-8 :
; Andre Anjos <andre.anjos@idiap.ch>
; Mon 16 Apr 08:29:18 2012 CEST
[buildout]
parts = python
develop = .
eggs = bob.db.replay
[python]
recipe = zc.recipe.egg
interpreter = python
eggs = ${buildout:eggs}
; vim: set fileencoding=utf-8 :
; Andre Anjos <andre.anjos@idiap.ch>
; Mon 16 Apr 08:29:18 2012 CEST
; Example buildout recipe using a local (off-root) Bob installation
[buildout]
parts = bob python tests
develop = .
eggs = bob
bob.db.replay
[bob]
recipe = local.bob.recipe:config
; Set here the private installation directory for Bob
install-directory = /scratch/aanjos/bob-master/build/newdb/release
[python]
recipe = zc.recipe.egg
interpreter = python
eggs = ${buildout:eggs}
[tests]
recipe = pbp.recipe.noserunner
eggs = ${buildout:eggs}
pbp.recipe.noserunner
script = tests.py
========================
Replay Attack Database
========================
This package contains the access API and descriptions for the `Replay Attack
Database <http://www.idiap.ch/dataset/replayattack/>`_. The actual raw data for
the database should be downloaded from the given URL, this package contains
only the `Bob <http://idiap.ch/software/bob/>`_ accessor methods to use the DB
directly from python, with our certified protocols.
You would normally not install this package unless you are maintaining it. What
you would do instead is to tie it in at the package you need to **use** it.
There are a few ways to achieve this:
1. You can add this package as a requirement at the ``setup.py`` for your own
`satellite package
<https://github.com/idiap/bob/wiki/Virtual-Work-Environments-with-Buildout>`_
or to your Buildout ``.cfg`` file, if you prefer it that way. With this
method, this package gets automatically downloaded and installed on your
working environment, or
2. You can manually download and install this package using commands like
``easy_install`` or ``pip``.
The package is available in two different distribution formats:
a. You can download it from `PyPI <http://pypi.python.org/pypi>`_, or
b. You can download it in its source form from `its git repository
<http://github.com/bioidiap/bob.db.replay.git>`_. When you download the
version at the git repository, you will need to run a command to recreate the
backend SQLite file required for its operation. This means that the database
raw files must be installed somewhere in this case. With option ``a`` you can
run in `dummy` mode and only download the raw data files for the database
once you are happy with your setup.
You can mix and match points 1/2 and a/b above based on your requirements. Here
are some examples:
Modify your setup.py and download from PyPI
===========================================
That is the easiest. Edit your ``setup.py`` in your satellite package and add
the following entry in the ``install_requires`` section (note: ``...`` means
`whatever extra stuff you may have in-between`, don't put that on your
script)::
install_requires=[
...
"bob.db.replay >= 1.1",
],
Proceed normally with your ``boostrap/buildout`` steps and you should be all
set. That means you can now import ``bob.db.replay`` into your scripts.
Modify your buildout.cfg and download from git
==============================================
You will need to add a dependence to `mr.developer
<http://pypi.python.org/pypi/mr.developer/>`_ to be able to install from our
git repositories. Your ``buildout.cfg`` file should contain the following
lines::
[buildout]
...
extensions = mr.developer
auto-checkout = *
eggs = bob
...
bob.db.replay
[sources]
bob.db.replay = git https://github.com/bioidiap/bob.db.replay.git
...
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.dos.anjos@gmail.com>
# Wed 18 May 09:28:44 2011
"""The Idiap Replay attack database consists of Photo and Video attacks to
different identities under different illumination conditions.
"""
import os
def dbname():
'''Returns the database name'''
return 'replay'
def version():
'''Returns the current version number defined in setup.py (DRY)'''
import pkg_resources # part of setuptools
return pkg_resources.require('bob.db.replay')[0].version
def location():
'''Returns the directory that contains the data'''
return os.path.dirname(os.path.realpath(__file__))
def files():
'''Returns a python iterable with all auxiliary files needed'''
return ('db.sql3',)
def type():
'''Returns the type of auxiliary files you have for this database
If you return 'sqlite', then we append special actions such as 'dbshell'
on 'bob_dbmanage.py' automatically for you. Otherwise, we don't.
If you use auxiliary text files, just return 'text'. We may provide
special services for those types in the future.
'''
return 'sqlite'
# these are required for the dbmanage.py driver
from .query import Database
from .commands import add_commands
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
# Tue 20 Mar 19:20:22 2012 CET
"""Checks for installed files.
"""
import os
import sys
# Driver API
# ==========
def checkfiles(args):
"""Checks existence files based on your criteria"""
from .query import Database
db = Database()
r = db.files(
directory=args.directory,
extension=args.extension,
protocol=args.protocol,
support=args.support,
groups=args.group,
cls=args.cls,
light=args.light,
)
# go through all files, check if they are available on the filesystem
good = {}
bad = {}
for id, f in r.items():
if os.path.exists(f): good[id] = f
else: bad[id] = f
# report
output = sys.stdout
if args.selftest:
from bob.db.utils import null
output = null()
if bad:
for id, f in bad.items():
output.write('Cannot find file "%s"\n' % (f,))
output.write('%d files (out of %d) were not found at "%s"\n' % \
(len(bad), len(r), args.directory))
return 0
def add_command(subparsers):
"""Add specific subcommands that the action "checkfiles" can use"""
from argparse import SUPPRESS
parser = subparsers.add_parser('checkfiles', help=checkfiles.__doc__)
from .query import Database
db = Database()
if not db.is_valid():
protocols = ('waiting','for','database','creation')
else:
protocols = db.protocols()
parser.add_argument('-d', '--directory', dest="directory", default='', help="if given, this path will be prepended to every entry checked (defaults to '%(default)s')")
parser.add_argument('-e', '--extension', dest="extension", default='', help="if given, this extension will be appended to every entry checked (defaults to '%(default)s')")
parser.add_argument('-c', '--class', dest="cls", default='', help="if given, limits the check to a particular subset of the data that corresponds to the given class (defaults to '%(default)s')", choices=('real', 'attack', 'enroll'))
parser.add_argument('-g', '--group', dest="group", default='', help="if given, this value will limit the check to those files belonging to a particular protocolar group. (defaults to '%(default)s')", choices=db.groups())
parser.add_argument('-s', '--support', dest="support", default='', help="if given, this value will limit the check to those files using this type of attack support. (defaults to '%(default)s')", choices=db.attack_supports())
parser.add_argument('-x', '--protocol', dest="protocol", default='', help="if given, this value will limit the check to those files for a given protocol. (defaults to '%(default)s')", choices=protocols)
parser.add_argument('-l', '--light', dest="light", default='', help="if given, this value will limit the check to those files shot under a given lighting. (defaults to '%(default)s')", choices=db.lights())
parser.add_argument('--self-test', dest="selftest", default=False,
action='store_true', help=SUPPRESS)
parser.set_defaults(func=checkfiles) #action
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
# Tue 28 Jun 2011 15:20:09 CEST
"""Commands this database can respond to.
"""
import os
import sys
def reverse(args):
"""Returns a list of file database identifiers given the path stems"""
from .query import Database
db = Database()
output = sys.stdout
if args.selftest:
from bob.db.utils import null
output = null()
r = db.reverse(args.path)
for id in r: output.write('%d\n' % id)
if not r: return 1
return 0
def reverse_command(subparsers):
"""Adds the specific options for the reverse command"""
from argparse import SUPPRESS
parser = subparsers.add_parser('reverse', help=reverse.__doc__)
parser.add_argument('path', nargs='+', type=str, help="one or more path stems to look up. If you provide more than one, files which cannot be reversed will be omitted from the output.")
parser.add_argument('--self-test', dest="selftest", default=False,
action='store_true', help=SUPPRESS)
parser.set_defaults(func=reverse) #action
def path(args):
"""Returns a list of fully formed paths or stems given some file id"""
from .query import Database
db = Database()
output = sys.stdout
if args.selftest:
from bob.db.utils import null
output = null()
r = db.paths(args.id, prefix=args.directory, suffix=args.extension)
for path in r: output.write('%s\n' % path)
if not r: return 1
return 0
def path_command(subparsers):
"""Adds the specific options for the path command"""
from argparse import SUPPRESS
parser = subparsers.add_parser('path', help=path.__doc__)
parser.add_argument('-d', '--directory', dest="directory", default='', help="if given, this path will be prepended to every entry returned (defaults to '%(default)s')")
parser.add_argument('-e', '--extension', dest="extension", default='', help="if given, this extension will be appended to every entry returned (defaults to '%(default)s')")
parser.add_argument('id', nargs='+', type=int, help="one or more file ids to look up. If you provide more than one, files which cannot be found will be omitted from the output. If you provide a single id to lookup, an error message will be printed if the id does not exist in the database. The exit status will be non-zero in such case.")
parser.add_argument('--self-test', dest="selftest", default=False,
action='store_true', help=SUPPRESS)
parser.set_defaults(func=path) #action
def add_commands(parser):
"""Adds my subset of options and arguments to the top-level parser. For
details on syntax, please consult:
http://docs.python.org/dev/library/argparse.html
The strategy assumed here is that each command will have its own set of
options that are relevant to that command. So, we just scan such commands and
attach the options from those.
"""
from . import dbname, location, version, type, files
from bob.db.utils import standard_commands
from . import __doc__ as dbdoc
from argparse import RawDescriptionHelpFormatter
# creates a top-level parser for this database
myname = dbname()
top_level = parser.add_parser(myname,
formatter_class=RawDescriptionHelpFormatter,
help="Photo/Video Replay attack database", description=dbdoc)
top_level.set_defaults(dbname=myname)
top_level.set_defaults(location=location())
top_level.set_defaults(version=version())
top_level.set_defaults(type=type())
top_level.set_defaults(files=files())
# declare it has subparsers for each of the supported commands
subparsers = top_level.add_subparsers(title="subcommands")
# attach standard commands
standard_commands(subparsers, type(), files())
# get the "create" action from a submodule
from .create import add_command as create_command
create_command(subparsers)
# get the "dumplist" action from a submodule
from .dumplist import add_command as dumplist_command
dumplist_command(subparsers)
# get the "checkfiles" action from a submodule
from .checkfiles import add_command as checkfiles_command
checkfiles_command(subparsers)
# adds the "reverse" command
reverse_command(subparsers)
# adds the "path" command
path_command(subparsers)
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.dos.anjos@gmail.com>
# Thu 12 May 08:19:50 2011
"""This script creates the Replay-Attack database in a single pass.
"""
import os
import fnmatch
from .models import *
def add_clients(session, protodir, verbose):
"""Add clients to the replay attack database."""
for client in open(os.path.join(protodir, 'clients.txt'), 'rt'):
s = client.strip().split(' ', 2)
if not s: continue #empty line
id = int(s[0])
set = s[1]
if verbose: print "Adding client %d on '%s' set..." % (id, set)
session.add(Client(id, set))
def add_real_lists(session, protodir, verbose):
"""Adds all RCD filelists"""
def add_real_list(session, filename):
"""Adds an RCD filelist and materializes RealAccess'es."""
def parse_real_filename(f):
"""Parses the RCD filename and break it in the relevant chunks."""
v = os.path.splitext(os.path.basename(f))[0].split('_')
client_id = int(v[0].replace('client',''))
path = os.path.splitext(f)[0] #keep only the filename stem
purpose = v[3]
light = v[4]
if len(v) == 6: take = int(v[5]) #authentication session
else: take = 1 #enrollment session
return [client_id, path, light], [purpose, take]
for fname in open(filename, 'rt'):
s = fname.strip()
if not s: continue #emtpy line
filefields, realfields = parse_real_filename(s)
filefields[0] = session.query(Client).filter(Client.id == filefields[0]).one()
file = File(*filefields)
session.add(file)
realfields.insert(0, file)
session.add(RealAccess(*realfields))
add_real_list(session, os.path.join(protodir, 'real-train.txt'))
add_real_list(session, os.path.join(protodir, 'real-devel.txt'))
add_real_list(session, os.path.join(protodir, 'real-test.txt'))
add_real_list(session, os.path.join(protodir, 'recognition-train.txt'))
add_real_list(session, os.path.join(protodir, 'recognition-devel.txt'))
add_real_list(session, os.path.join(protodir, 'recognition-test.txt'))
def add_attack_lists(session, protodir, verbose):
"""Adds all RAD filelists"""
def add_attack_list(session, filename):
"""Adds an RAD filelist and materializes Attacks."""
def parse_attack_filename(f):
"""Parses the RAD filename and break it in the relevant chunks."""
v = os.path.splitext(os.path.basename(f))[0].split('_')
attack_device = v[1] #print, mobile or highdef
client_id = int(v[2].replace('client',''))
path = os.path.splitext(f)[0] #keep only the filename stem
sample_device = v[4] #highdef or mobile
sample_type = v[5] #photo or video
light = v[6]
attack_support = f.split('/')[-2]
return [client_id, path, light], [attack_support, attack_device, sample_type, sample_device]
for fname in open(filename, 'rt'):
s = fname.strip()
if not s: continue #emtpy line
filefields, attackfields = parse_attack_filename(s)
filefields[0] = session.query(Client).filter(Client.id == filefields[0]).one()
file = File(*filefields)
session.add(file)
attackfields.insert(0, file)
session.add(Attack(*attackfields))
add_attack_list(session,os.path.join(protodir, 'attack-grandtest-allsupports-train.txt'))
add_attack_list(session,os.path.join(protodir, 'attack-grandtest-allsupports-devel.txt'))
add_attack_list(session,os.path.join(protodir, 'attack-grandtest-allsupports-test.txt'))
def define_protocols(session, protodir, verbose):
"""Defines all available protocols"""
#figures out which protocols to use
valid = {}
for fname in fnmatch.filter(os.listdir(protodir), 'attack-*-allsupports-train.txt'):
s = fname.split('-', 4)
consider = True
files = {}
for grp in ('train', 'devel', 'test'):
# check attack file
attack = os.path.join(protodir, 'attack-%s-allsupports-%s.txt' % (s[1], grp))
if not os.path.exists(attack):
if verbose:
print "Not considering protocol %s as attack list '%s' was not found" % (s[1], attack)
consider = False
# check real file
real = os.path.join(protodir, 'real-%s-allsupports-%s.txt' % (s[1], grp))
if not os.path.exists(real):
alt_real = os.path.join(protodir, 'real-%s.txt' % (grp,))
if not os.path.exists(alt_real):
if verbose:
print "Not considering protocol %s as real list '%s' or '%s' were not found" % (s[1], real, alt_real)
consider = False
else:
real = alt_real
if consider: files[grp] = (attack, real)
if consider: valid[s[1]] = files
for protocol, groups in valid.iteritems():
if verbose: print "Creating protocol '%s'..." % protocol
# create protocol on the protocol table
obj = Protocol(name=protocol)
for grp, flist in groups.iteritems():
counter = 0
for fname in open(flist[0], 'rt'):
s = os.path.splitext(fname.strip())[0]
q = session.query(Attack).join(File).filter(File.path == s).one()
q.protocols.append(obj)
counter += 1
if verbose: print " -> %5s/%-6s: %d files" % (grp, "attack", counter)
counter = 0
for fname in open(flist[1], 'rt'):
s = os.path.splitext(fname.strip())[0]
q = session.query(RealAccess).join(File).filter(File.path == s).one()
q.protocols.append(obj)
counter += 1
if verbose: print " -> %5s/%-6s: %d files" % (grp, "real", counter)
session.add(obj)
def create_tables(args):
"""Creates all necessary tables (only to be used at the first time)"""
from bob.db.utils import connection_string
from sqlalchemy import create_engine
engine = create_engine(connection_string(args.type, args.location,
args.files[0]), echo=args.verbose)
Client.metadata.create_all(engine)
RealAccess.metadata.create_all(engine)
Attack.metadata.create_all(engine)
Protocol.metadata.create_all(engine)
# Driver API
# ==========
def create(args):
"""Creates or re-creates this database"""
from bob.db.utils import session
dbfile = os.path.join(args.location, args.files[0])
args.verbose = 0 if args.verbose is None else sum(args.verbose)
if args.recreate:
if args.verbose and os.path.exists(dbfile):
print('unlinking %s...' % dbfile)
if os.path.exists(dbfile): os.unlink(dbfile)
if not os.path.exists(os.path.dirname(dbfile)):
os.makedirs(os.path.dirname(dbfile))
# the real work...
create_tables(args)
s = session(args.type, args.location, args.files[0], echo=(args.verbose >= 2))
add_clients(s, args.protodir, args.verbose)
add_real_lists(s, args.protodir, args.verbose)
add_attack_lists(s, args.protodir, args.verbose)
define_protocols(s, args.protodir, args.verbose)
s.commit()
s.close()
return 0
def add_command(subparsers):
"""Add specific subcommands that the action "create" can use"""
parser = subparsers.add_parser('create', help=create.__doc__)
parser.add_argument('-R', '--recreate', action='store_true', default=False,
help="If set, I'll first erase the current database")
parser.add_argument('-v', '--verbose', action='append_const', const=1,
help="Do SQL operations in a verbose way")
parser.add_argument('-D', '--protodir', action='store',
default='/idiap/group/replay/database/protocols/replayattack-database/protocols',
metavar='DIR',
help="Change the relative path to the directory containing the protocol definitions for replay attacks (defaults to %(default)s)")
parser.set_defaults(func=create) #action
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.dos.anjos@gmail.com>
# Thu 12 May 14:02:28 2011
"""Dumps lists of files.
"""
import os
import sys
# Driver API
# ==========
def dumplist(args):
"""Dumps lists of files based on your criteria"""
from .query import Database
db = Database()
r = db.files(
directory=args.directory,
extension=args.extension,
protocol=args.protocol,
support=args.support,
groups=args.group,
cls=args.cls,
light=args.light,
)
output = sys.stdout
if args.selftest:
from bob.db.utils import null
output = null()
for id, f in r.items():
output.write('%s\n' % (f,))
return 0
def add_command(subparsers):
"""Add specific subcommands that the action "dumplist" can use"""
from argparse import SUPPRESS
parser = subparsers.add_parser('dumplist', help=dumplist.__doc__)
from .query import Database
db = Database()
if not db.is_valid():
protocols = ('waiting','for','database','creation')
else:
protocols = db.protocols()
parser.add_argument('-d', '--directory', dest="directory", default='', help="if given, this path will be prepended to every entry returned (defaults to '%(default)s')")
parser.add_argument('-e', '--extension', dest="extension", default='', help="if given, this extension will be appended to every entry returned (defaults to '%(default)s')")
parser.add_argument('-c', '--class', dest="cls", default='', help="if given, limits the dump to a particular subset of the data that corresponds to the given class (defaults to '%(default)s')", choices=('real', 'attack', 'enroll'))
parser.add_argument('-g', '--group', dest="group", default='', help="if given, this value will limit the output files to those belonging to a particular protocolar group. (defaults to '%(default)s')", choices=db.groups())
parser.add_argument('-s', '--support', dest="support", default='', help="if given, this value will limit the output files to those using this type of attack support. (defaults to '%(default)s')", choices=db.attack_supports())
parser.add_argument('-x', '--protocol', dest="protocol", default='', help="if given, this value will limit the output files to those for a given protocol. (defaults to '%(default)s')", choices=protocols)
parser.add_argument('-l', '--light', dest="light", default='', help="if given, this value will limit the output files to those shot under a given lighting. (defaults to '%(default)s')", choices=db.lights())
parser.add_argument('--self-test', dest="selftest", default=False,
action='store_true', help=SUPPRESS)
parser.set_defaults(func=dumplist) #action
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.dos.anjos@gmail.com>
# Wed 11 May 18:52:38 2011
"""Table models and functionality for the Replay Attack DB.
"""
from sqlalchemy import Table, Column, Integer, String, ForeignKey
from bob.db.sqlalchemy_migration import Enum, relationship
from sqlalchemy.orm import backref
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Client(Base):
__tablename__ = 'client'
set_choices = ('train', 'devel', 'test')
id = Column(Integer, primary_key=True)
set = Column(Enum(*set_choices))
def __init__(self, id, set):
self.id = id
self.set = set
def __repr__(self):
return "<Client('%s', '%s')>" % (self.id, self.set)
class File(Base):
__tablename__ = 'file'
light_choices = ('controlled', 'adverse')
id = Column(Integer, primary_key=True)
client_id = Column(Integer, ForeignKey('client.id')) # for SQL
path = Column(String(100), unique=True)
light = Column(Enum(*light_choices))
# for Python
client = relationship(Client, backref=backref('files', order_by=id))
def __init__(self, client, path, light):
self.client = client
self.path = path
self.light = light
def __repr__(self):
print "<File('%s')>" % self.path
# Intermediate mapping from RealAccess's to Protocol's
realaccesses_protocols = Table('realaccesses_protocols', Base.metadata,
Column('realaccess_id', Integer, ForeignKey('realaccess.id')),
Column('protocol_id', Integer, ForeignKey('protocol.id')),
)
# Intermediate mapping from Attack's to Protocol's
attacks_protocols = Table('attacks_protocols', Base.metadata,
Column('attack_id', Integer, ForeignKey('attack.id')),
Column('protocol_id', Integer, ForeignKey('protocol.id')),
)
class Protocol(Base):
__tablename__ = 'protocol'
id = Column(Integer, primary_key=True)
name = Column(String(20), unique=True)
def __init__(self, name):
self.name = name
def __repr__(self):
return "<Protocol('%s')>" % (self.name,)
class RealAccess(Base):
__tablename__ = 'realaccess'
purpose_choices = ('authenticate', 'enroll')
id = Column(Integer, primary_key=True)
file_id = Column(Integer, ForeignKey('file.id')) # for SQL
purpose = Column(Enum(*purpose_choices))
take = Column(Integer)
# for Python
file = relationship(File, backref=backref('realaccess', order_by=id))
protocols = relationship("Protocol", secondary=realaccesses_protocols,
backref='realaccesses')
def __init__(self, file, purpose, take):
self.file = file
self.purpose = purpose
self.take = take
def __repr__(self):
return "<RealAccess('%s')>" % (self.file.path)
class Attack(Base):
__tablename__ = 'attack'
attack_support_choices = ('fixed', 'hand')
attack_device_choices = ('print', 'mobile', 'highdef', 'mask')
sample_type_choices = ('video', 'photo')
sample_device_choices = ('mobile', 'highdef')
id = Column(Integer, primary_key=True)
file_id = Column(Integer, ForeignKey('file.id')) # for SQL
attack_support = Column(Enum(*attack_support_choices))
attack_device = Column(Enum(*attack_device_choices))
sample_type = Column(Enum(*sample_type_choices))
sample_device = Column(Enum(*sample_device_choices))
# for Python
file = relationship(File, backref=backref('attack', order_by=id))
protocols = relationship("Protocol", secondary=attacks_protocols,
backref='attacks')
def __init__(self, file, attack_support, attack_device, sample_type, sample_device):
self.file = file
self.attack_support = attack_support
self.attack_device = attack_device
self.sample_type = sample_type
self.sample_device = sample_device
def __repr__(self):
return "<Attack('%s')>" % (self.file.path)
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.dos.anjos@gmail.com>
# Tue 17 May 13:58:09 2011
"""This module provides the Dataset interface allowing the user to query the
replay attack database in the most obvious ways.
"""
import os
import logging
from bob.db import utils
from .models import *
from . import dbname, type, location, files
SQLITE_FILE = os.path.join(location(), files()[0])
class Database(object):
"""The dataset class opens and maintains a connection opened to the Database.
It provides many different ways to probe for the characteristics of the data
and for the data itself inside the database.
"""
def __init__(self):
# opens a session to the database - keep it open until the end
self.connect()
def connect(self):
"""Tries connecting or re-connecting to the database"""
if not os.path.exists(SQLITE_FILE):
self.session = None
else:
self.session = utils.session(type(), location(), files()[0])
def is_valid(self):
"""Returns if a valid session has been opened for reading the database"""
return self.session is not None
def files(self, directory=None, extension=None,
support=Attack.attack_support_choices,
protocol='grandtest',
groups=Client.set_choices,
cls=('attack', 'real'),
light=File.light_choices):
"""Returns a set of filenames for the specific query by the user.
Keyword Parameters:
directory
A directory name that will be prepended to the final filepath returned
extension
A filename extension that will be appended to the final filepath returned
support
One of the valid support types as returned by attack_supports() or all,
as a tuple. If you set this parameter to an empty string or the value
None, we use reset it to the default, which is to get all.
protocol
The protocol for the attack. One of the ones returned by protocols(). If
you set this parameter to an empty string or the value None, we use reset
it to the default, "grandtest".
groups
One of the protocolar subgroups of data as returned by groups() or a
tuple with several of them. If you set this parameter to an empty string
or the value None, we use reset it to the default which is to get all.
cls
Either "attack", "real", "enroll" or any combination of those (in a
tuple). Defines the class of data to be retrieved. If you set this
parameter to an empty string or the value None, we use reset it to the
default, ("real", "attack").
light
One of the lighting conditions as returned by lights() or a combination
of the two (in a tuple), which is also the default.
Returns: A dictionary containing the resolved filenames considering all
the filtering criteria. The keys of the dictionary are unique identities
for each file in the replay attack database. Conserve these numbers if you
wish to save processing results later on.
"""
if not self.is_valid():
raise RuntimeError, "Database '%s' cannot be found at expected location '%s'. Create it and then try re-connecting using Database.connect()" % (dbname(), SQLITE_FILE)
def check_validity(l, obj, valid, default):
"""Checks validity of user input data against a set of valid values"""
if not l: return default
elif isinstance(l, str): return check_validity((l,), obj, valid, default)
for k in l:
if k not in valid:
raise RuntimeError, 'Invalid %s "%s". Valid values are %s, or lists/tuples of those' % (obj, k, valid)
return l
def make_path(stem, directory, extension):
if not extension: extension = ''
if directory: return os.path.join(directory, stem + extension)
return stem + extension
# check if groups set are valid
VALID_GROUPS = self.groups()
groups = check_validity(groups, "group", VALID_GROUPS, VALID_GROUPS)
# check if supports set are valid
VALID_SUPPORTS = self.attack_supports()
support = check_validity(support, "support", VALID_SUPPORTS, VALID_SUPPORTS)
# by default, do NOT grab enrollment data from the database
VALID_CLASSES = ('real', 'attack', 'enroll')
cls = check_validity(cls, "class", VALID_CLASSES, ('real', 'attack'))
# check protocol validity
if not protocol: protocol = 'grandtest' #default
VALID_PROTOCOLS = self.protocols()
if protocol not in VALID_PROTOCOLS:
raise RuntimeError, 'Invalid protocol "%s". Valid values are %s' % \
(protocol, VALID_PROTOCOLS)
# resolve protocol object
protocol = self.protocol(protocol)
# checks if the light is valid
VALID_LIGHTS = self.lights()
light = check_validity(light, "light", VALID_LIGHTS, VALID_LIGHTS)
# now query the database
retval = {}
# real-accesses are simpler to query
if 'enroll' in cls:
q = self.session.query(RealAccess).join(File).join(Client).filter(Client.set.in_(groups)).filter(RealAccess.purpose=='enroll').filter(File.light.in_(light)).order_by(Client.id)
for key, value in [(k.file.id, k.file.path) for k in q]:
retval[key] = make_path(str(value), directory, extension)
# real-accesses are simpler to query
if 'real' in cls:
q = self.session.query(RealAccess).join(File).join(Client).filter(RealAccess.protocols.contains(protocol)).filter(Client.set.in_(groups)).filter(File.light.in_(light)).order_by(Client.id)
for key, value in [(k.file.id, k.file.path) for k in q]:
retval[key] = make_path(str(value), directory, extension)
# attacks will have to be filtered a little bit more
if 'attack' in cls:
q = self.session.query(Attack).join(File).join(Client).filter(Attack.protocols.contains(protocol)).filter(Client.set.in_(groups)).filter(Attack.attack_support.in_(support)).filter(File.light.in_(light)).order_by(Client.id)
for key, value in [(k.file.id, k.file.path) for k in q]:
retval[key] = make_path(str(value), directory, extension)
return retval
def protocols(self):
"""Returns the names of all registered protocols"""
if not self.is_valid():
raise RuntimeError, "Database '%s' cannot be found at expected location '%s'. Create it and then try re-connecting using Database.connect()" % (dbname(), SQLITE_FILE)
return tuple([k.name for k in self.session.query(Protocol)])
def has_protocol(self, name):
"""Tells if a certain protocol is available"""
if not self.is_valid():
raise RuntimeError, "Database '%s' cannot be found at expected location '%s'. Create it and then try re-connecting using Database.connect()" % (dbname(), SQLITE_FILE)
return self.session.query(Protocol).filter(Protocol.name==name).count() != 0
def protocol(self, name):
"""Returns the protocol object in the database given a certain name. Raises
an error if that does not exist."""
if not self.is_valid():
raise RuntimeError, "Database '%s' cannot be found at expected location '%s'. Create it and then try re-connecting using Database.connect()" % (dbname(), SQLITE_FILE)
return self.session.query(Protocol).filter(Protocol.name==name).one()
def groups(self):
"""Returns the names of all registered groups"""
return Client.set_choices
def lights(self):
"""Returns light variations available in the database"""
return File.light_choices
def attack_supports(self):
"""Returns attack supports available in the database"""
return Attack.attack_support_choices
def attack_devices(self):
"""Returns attack devices available in the database"""
return Attack.attack_device_choices
def attack_sampling_devices(self):
"""Returns sampling devices available in the database"""
return Attack.sample_device_choices
def attack_sample_types(self):
"""Returns attack sample types available in the database"""
return Attack.sample_type_choices
def paths(self, ids, prefix='', suffix=''):
"""Returns a full file paths considering particular file ids, a given
directory and an extension
Keyword Parameters:
id
The ids of the object in the database table "file". This object should be
a python iterable (such as a tuple or list).
prefix
The bit of path to be prepended to the filename stem
suffix
The extension determines the suffix that will be appended to the filename
stem.
Returns a list (that may be empty) of the fully constructed paths given the
file ids.
"""
if not self.is_valid():
raise RuntimeError, "Database '%s' cannot be found at expected location '%s'. Create it and then try re-connecting using Database.connect()" % (dbname(), SQLITE_FILE)
fobj = self.session.query(File).filter(File.id.in_(ids))
retval = []
for p in ids:
retval.extend([os.path.join(prefix, str(k.path) + suffix)
for k in fobj if k.id == p])
return retval
def reverse(self, paths):
"""Reverses the lookup: from certain stems, returning file ids
Keyword Parameters:
paths
The filename stems I'll query for. This object should be a python
iterable (such as a tuple or list)
Returns a list (that may be empty).
"""
if not self.is_valid():
raise RuntimeError, "Database '%s' cannot be found at expected location '%s'. Create it and then try re-connecting using Database.connect()" % (dbname(), SQLITE_FILE)
fobj = self.session.query(File).filter(File.path.in_(paths))
retval = []
for p in paths:
retval.extend([k.id for k in fobj if k.path == p])
return retval
def save_one(self, id, obj, directory, extension):
"""Saves a single object supporting the bob save() protocol.
This method will call save() on the the given object using the correct
database filename stem for the given id.
Keyword Parameters:
id
The id of the object in the database table "file".
obj
The object that needs to be saved, respecting the bob save() protocol.
directory
This is the base directory to which you want to save the data. The
directory is tested for existence and created if it is not there with
os.makedirs()
extension
The extension determines the way each of the arrays will be saved.
"""
if not self.is_valid():
raise RuntimeError, "Database '%s' cannot be found at expected location '%s'. Create it and then try re-connecting using Database.connect()" % (dbname(), SQLITE_FILE)
from bob.io import save
fobj = self.session.query(File).filter_by(id=id).one()
fullpath = os.path.join(directory, str(fobj.path) + extension)
fulldir = os.path.dirname(fullpath)
utils.makedirs_safe(fulldir)
save(obj, fullpath)
def save(self, data, directory, extension):
"""This method takes a dictionary of blitz arrays or bob.database.Array's
and saves the data respecting the original arrangement as returned by
files().
Keyword Parameters:
data
A dictionary with two keys 'real' and 'attack', each containing a
dictionary mapping file ids from the original database to an object that
supports the bob "save()" protocol.
directory
This is the base directory to which you want to save the data. The
directory is tested for existence and created if it is not there with
os.makedirs()
extension
The extension determines the way each of the arrays will be saved.
"""
for key, value in data:
self.save_one(key, value, directory, extension)
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
# Mon Aug 8 12:40:24 2011 +0200
#
# Copyright (C) 2011-2012 Idiap Research Institute, Martigny, Switzerland
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""A few checks at the replay attack database.
"""
import os, sys
import unittest
from . import db as replay_db
class ReplayDatabaseTest(unittest.TestCase):
"""Performs various tests on the replay attack database."""
def test01_queryRealAccesses(self):
db = replay_db.Database()
f = db.files(cls='real')
self.assertEqual(len(set(f.values())), 200) #200 unique auth sessions
for k,v in f.items():
self.assertTrue( (v.find('authenticate') != -1) )
self.assertTrue( (v.find('real') != -1) )
self.assertTrue( (v.find('webcam') != -1) )
train = db.files(cls='real', groups='train')
self.assertEqual(len(set(train.values())), 60)
dev = db.files(cls='real', groups='devel')
self.assertEqual(len(set(dev.values())), 60)
test = db.files(cls='real', groups='test')
self.assertEqual(len(set(test.values())), 80)
#tests train, devel and test files are distinct
s = set(train.values() + dev.values() + test.values())
self.assertEqual(len(s), 200)
def queryAttackType(self, protocol, N):
db = replay_db.Database()
f = db.files(cls='attack', protocol=protocol)
self.assertEqual(len(set(f.values())), N)
for k,v in f.items():
self.assertTrue(v.find('attack') != -1)
train = db.files(cls='attack', groups='train', protocol=protocol)
self.assertEqual(len(set(train.values())), int(round(0.3*N)))
dev = db.files(cls='attack', groups='devel', protocol=protocol)
self.assertEqual(len(set(dev.values())), int(round(0.3*N)))
test = db.files(cls='attack', groups='test', protocol=protocol)
self.assertEqual(len(set(test.values())), int(round(0.4*N)))
#tests train, devel and test files are distinct
s = set(train.values() + dev.values() + test.values())
self.assertEqual(len(s), N)
def test02_queryAttacks(self):
self.queryAttackType('grandtest', 1000)
def test03_queryPrintAttacks(self):
self.queryAttackType('print', 200)
def test04_queryMobileAttacks(self):
self.queryAttackType('mobile', 400)
def test05_queryHighDefAttacks(self):
self.queryAttackType('highdef', 400)
def test06_queryPhotoAttacks(self):
self.queryAttackType('photo', 600)
def test07_queryVideoAttacks(self):
self.queryAttackType('video', 400)
def test08_queryEnrollments(self):
db = replay_db.Database()
f = db.files(cls='enroll')
self.assertEqual(len(set(f.values())), 100) #50 clients, 2 conditions
for k,v in f.items():
self.assertTrue(v.find('enroll') != -1)
def test09_manage_get(self):
from bob.db.script.dbmanage import main
self.assertEqual(main('replay get --dry-run'.split()), 0)
self.assertEqual(main('replay get --version=bla --dry-run'.split()), 0)
self.assertEqual(main('replay get --version=1.3.4 --verbose --dry-run'.split()), 0)
def test10_manage_put(self):
from bob.db.script.dbmanage import main
self.assertEqual(main('replay put --dry-run tmp'.split()), 0)
self.assertEqual(main('replay put --version=bla --dry-run tmp'.split()), 0)
self.assertEqual(main('replay put --version=1.3.4 --verbose --dry-run tmp'.split()), 0)
def test11_manage_location(self):
from bob.db.script.dbmanage import main
self.assertEqual(main('replay location'.split()), 0)
def test12_manage_dumplist_1(self):
from bob.db.script.dbmanage import main
self.assertEqual(main('replay dumplist --self-test'.split()), 0)
def test13_manage_dumplist_2(self):
from bob.db.script.dbmanage import main
self.assertEqual(main('replay dumplist --class=attack --group=devel --support=hand --protocol=highdef --self-test'.split()), 0)
def test14_manage_checkfiles(self):
from bob.db.script.dbmanage import main
self.assertEqual(main('replay checkfiles --self-test'.split()), 0)
setup.py 0 → 100644
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
# Sex 10 Ago 2012 14:22:33 CEST
from setuptools import setup, find_packages
# The only thing we do in this file is to call the setup() function with all
# parameters that define our package.
setup(
name='bob.db.replay',
version='master',
description='Replay Attack Database Access API for Bob',
url='http://github.com/bioidiap/bob.db.replay',
license='LICENSE.txt',
author_email='Andre Anjos <andre.anjos@idiap.ch>',
#long_description=open('doc/howto.rst').read(),
# This line is required for any distutils based packaging.
packages=find_packages(),
# Always include your .sql3 file with the package data. This will make sure
# it is packaged correctly for PyPI uploads or for any other package
# management system such as Ubuntu's.
package_data = {
'replay': [
'db/db.sql3',
],
},
install_requires=[
"bob == master", # base signal proc./machine learning library
],
entry_points={
'console_scripts': [
# for tests or db creation, enable the following line:
#'driver.py = bob.db.script.dbmanage:main',
],
'bob.db': [
'replay = replay.db',
]
},
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment