Commit d58c61d1 authored by Vincent POLLET's avatar Vincent POLLET
Browse files

Merge branch 'unit_test' into 'sliced_stream'

Unit test

See merge request !12
parents 02f5832d 9e5b07a6
Pipeline #43121 passed with stage
in 9 minutes and 33 seconds
from bob.io.stream import Stream, StreamFile, StreamFilter, stream_filter #, StreamServer, StreamRemote
import numpy as np
import bob.io.base
# create data sets
num_frame = 10
num_chan = 3
num_width = 40
num_height = 30
data_shape = (num_frame, num_chan, num_width, num_height)
data_a = np.random.random_sample(data_shape)
data_b = np.random.random_integers(5000, size=data_shape)
assert(data_a.shape == data_shape)
assert(data_b.shape == data_shape)
# create data file
f = bob.io.base.HDF5File('api_test.h5', 'w')
f.set('data_a', data_a)
f.set('data_b', data_b)
del f
# TODO timestamps
# TODO config
# minimal streams with no config
f = StreamFile('api_test.h5')
stream_a = Stream('data_a', f)
stream_b = Stream('data_b', f)
assert(stream_a.shape == data_a.shape)
assert(stream_b.shape == data_b.shape)
assert(stream_a.timestamps == None)
assert(stream_b.timestamps == None)
assert(stream_a.camera == None)
assert(stream_b.camera == None)
# load full datasets
ld_a = stream_a.load()
ld_b = stream_b.load()
assert(ld_a.shape == data_a.shape)
assert(ld_b.shape == data_b.shape)
assert(np.array_equal(ld_a, data_a))
assert(np.array_equal(ld_b, data_b))
# try some indices slices cases and compare data
tests = [ ( None, None, None),
( None, 3, None),
( 5, None, None),
( None, None, 3),
( 1, 10, 3),
( 9, 0, -3),
( -5, -1, None),
( 10, 0, -3)]
for t in tests:
s = slice(t[0], t[1], t[2])
dd = data_a[s]
sd = stream_a.load(s)
assert(np.array_equal(sd, dd))
# test loading one frame with [] operator
# TODO should that really be the right behaviour???
data = stream_a[1]
assert(isinstance(data, np.ndarray))
assert(data.shape == data_a[1].shape)
assert(np.array_equal(data, data_a[1]))
# test view
stream_sl = stream_a[1:4]
data_sl = data_a[1:4]
assert(stream_sl.shape == data_sl.shape)
assert(stream_sl.ndim == data_sl.ndim)
assert(np.array_equal(stream_sl.load(), data_sl))
stream_sl = stream_a[1::3]
data_sl = data_a[1::3]
assert(stream_sl.shape == data_sl.shape)
assert(stream_sl.ndim == data_sl.ndim)
assert(np.array_equal(stream_sl.load(), data_sl))
stream_sl = stream_a[:,1]
data_sl = data_a[:,1]
assert(stream_sl.shape == data_sl.shape)
assert(stream_sl.ndim == data_sl.ndim)
assert(np.array_equal(stream_sl.load(), data_sl))
stream_sl = stream_a[:,:,1:2:3,4:5:6]
data_sl = data_a[:,:,1:2:3,4:5:6]
assert(stream_sl.shape == data_sl.shape)
assert(stream_sl.ndim == data_sl.ndim)
assert(np.array_equal(stream_sl.load(), data_sl))
stream_sl = stream_a[:,:,-1:-20:3,-4:5:-6]
data_sl = data_a[:,:,-1:-20:3,-4:5:-6]
assert(stream_sl.shape == data_sl.shape)
assert(stream_sl.ndim == data_sl.ndim)
assert(np.array_equal(stream_sl.load(), data_sl))
for t in tests:
s = slice(t[0], t[1], t[2])
dd = data_a[s]
sd = stream_a[s]
assert(sd.shape == dd.shape)
assert(sd.ndim == dd.ndim)
assert(np.array_equal(sd.load(), dd))
# test put method & simple filters
def test(data):
assert(np.array_equal(data, data_a[0]))
stream0 = Stream('test')
stream1 = stream0.filter(process_frame = lambda data: data)
stream2 = stream1.filter(process_frame = test)
stream0.put(data_a[0])
# test saving
fo = StreamFile('api_test2.h5', mode='w')
stream = Stream('test')
save_filter = stream.save(fo)
for i in range(data_a.shape[0]):
stream.put(data_a[i])
# test written file
fi = StreamFile('api_test2.h5', mode='r')
stream = Stream('test', fi)
data = stream.load()
assert(np.array_equal(data, data_a))
###########
#fo = StreamFile('api_test.h5', 'w')
#so = Stream('out')
#co = so.clean()
#co.save(fo)
#for i in range(10):
# so[i] = data
# or
#
# so.append(data)
# or
#fo = StreamFile('test.h5', 'w')
#so = Stream('out')
#co = so.clean()
#for i in range(10):
# so[i] = data
#fo.save(co)
......@@ -2,11 +2,12 @@ import json
from copy import deepcopy
from string import Formatter
from functools import wraps
import numpy as np
from builtins import str
from .utils import StreamArray, get_index_list, get_axis_size
from scipy.spatial import cKDTree
import numpy as np
from .utils import StreamArray, get_index_list, get_axis_size
from .stream_file import StreamFile
......@@ -334,6 +335,17 @@ class StreamView(StreamFilter):
else:
raise Exception('view_indices should be a tuple of int / slice object(s) or None')
if not all(self.shape): # true if a dimension in shape is 0
if not all(self.shape):
raise ValueError(
str(view_indices)
+ " in stream with shape "
+ str(parent.shape)
+ " results in empty stream. (shape "
+ str(self.shape)
+ ")"
)
# shape property
@property
def shape(self):
......
......@@ -25,7 +25,7 @@ class StreamFile:
self.camera_config = None
def __enter__(self):
pass
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if isinstance(self.hdf5_file, HDF5File):
......
import numpy as np
from scipy.spatial import cKDTree
from skimage import transform
import cv2 as cv
......
......@@ -5,15 +5,16 @@
Test Units
"""
# ==============================================================================
from bob.io.stream import StreamFile, Stream
from bob.ip.stereo import StereoParameters
from bob.ip.stereo.test.test import is_close_enough
import os
from bob.io.base import load
import numpy as np
from bob.io.base import load, HDF5File
from pkg_resources import resource_filename
from bob.ip.stereo import StereoParameters
from bob.ip.stereo.test.test import is_close_enough
import numpy as np
from bob.io.stream import StreamFile, Stream
# ==============================================================================
......@@ -29,11 +30,115 @@ def resource_path(relative_path, package="bob.io.stream"):
return resource_filename(package, relative_path)
def test_stream_write():
"""Test writing and reading back a file through Stream and StreamFile."""
test_data = [
np.arange(5 * 3 * 40 * 52, dtype=np.int16).reshape((5, 3, 40, 52)),
np.arange(5 * 1 * 5 * 5, dtype=np.int8).reshape((5, 1, 5, 5)),
np.arange(1 * 1 * 500 * 400).astype(np.float).reshape((1, 1, 500, 400)),
np.arange(12 * 52).astype(np.float64).reshape((12, 52)),
]
for a_test_data in test_data:
with StreamFile(resource_path("test/data/save_test.h5"), mode="w") as output_file:
stream = Stream("test_data")
save_filter = stream.save(output_file)
for i in range(a_test_data.shape[0]):
stream.put(a_test_data[i])
with StreamFile(resource_path("test/data/save_test.h5"), mode="r") as input_file:
stream = Stream("test_data", input_file)
data = stream.load()
assert np.array_equal(data, a_test_data)
assert data.dtype == a_test_data.dtype
os.remove(resource_path("test/data/save_test.h5"))
def test_stream():
"""
Test that a few transforms (stereo, reproject, adjust, colormap, select, warp) applied with streams provide a
similar result to saved groundtruth.
"""
"""Test some functionality of the stream class: shape, ndim, slicing (view), etc..."""
# create data sets
data_shape = (10, 3, 40, 30) # #frames, #channels, with, height
data_a = np.random.random_sample(data_shape)
data_b = np.random.random_integers(5000, size=data_shape)
# create data file
f = HDF5File(resource_path("test/data/stream_test.h5"), "w")
f.set("data_a", data_a)
f.set("data_b", data_b)
del f
# Streams attributes when config is specified
f = StreamFile(
resource_path("test/data/input_example.h5"),
resource_path("config/idiap_face_streams.json"),
resource_path("config/idiap_face_calibration.json", "bob.ip.stereo"),
)
color = Stream("color", f)
assert color.shape == (1, 3, 1920, 1200)
assert color.timestamps[0] == 46399548
assert color.camera is not None
# Streams attributes when not specified.
f = StreamFile(resource_path("test/data/stream_test.h5"))
stream_a = Stream("data_a", f)
stream_b = Stream("data_b", f)
assert stream_a.shape == data_a.shape
assert stream_b.shape == data_b.shape
assert stream_a.timestamps == None
assert stream_b.timestamps == None
assert stream_a.camera == None
assert stream_b.camera == None
# Test loading entire datasets
ld_a = stream_a.load()
ld_b = stream_b.load()
assert ld_a.shape == data_a.shape
assert ld_b.shape == data_b.shape
assert np.array_equal(ld_a, data_a)
assert np.array_equal(ld_b, data_b)
# Test slicing over the first dimension
test_slices = [
slice(None, None, None),
slice(5, None, None),
slice(None, 3, None),
slice(None, None, 3),
slice(1, 10, 3),
slice(9, 0, -3),
slice(-5, -1, None),
slice(10, 0, -3),
]
for a_slice in test_slices:
gt_slice = data_a[a_slice]
s_slice = stream_a[a_slice].load()
s_l_slice = stream_a.load(a_slice)
assert np.array_equal(s_slice, gt_slice), "Slice " + str(a_slice) + " assertation failed."
assert np.array_equal(s_l_slice, gt_slice), "Slice " + str(a_slice) + " assertation failed."
# test slicing over other dimensions:
test_indices = [
(slice(None, None, None), 1),
(slice(None, None, None), slice(None, None, None), slice(1, 2, 3), slice(4, 5, 6)),
(slice(None, None, None), slice(None, None, None), slice(-1, -6, -3), slice(-4, 5, -6)),
]
for index in test_indices:
gt_slice = data_a[index]
s_slice = stream_a[index]
assert s_slice.shape == gt_slice.shape, "index " + str(index) + " assertation failed."
assert s_slice.ndim == gt_slice.ndim, "index " + str(index) + " assertation failed."
assert np.array_equal(s_slice.load(), gt_slice), "index " + str(index) + " assertation failed."
os.remove(resource_path("test/data/stream_test.h5"))
def test_filters():
"""Test that a few filters provide a similar result to saved groundtruth."""
gt_color = load(resource_path("test/data/reprojection_color.png"))
gt_depth = load(resource_path("test/data/reprojection_depth.png"))
......@@ -80,6 +185,10 @@ def test_stream():
warp_swir_norm = swir_norm.warp(color)
warp_thermal = thermal.normalize().warp(color)
# User defined operations through "filter": eg clipping values bellow average
test_func = lambda data: np.where(data > data.mean(), data - data.mean(), data.mean())
user_filter = color.filter(process_frame=test_func)
# landmarks and bounding box
color.bounding_box[0] = bounding_box
color.image_points[0] = landmarks
......@@ -104,3 +213,6 @@ def test_stream():
assert np.array_equal(rep_color.bounding_box[0], gt_bounding_box)
assert np.allclose(rep_color.image_points[0], gt_landmarks)
# user filter results
assert np.array_equal(user_filter[0], test_func(color[0]))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment