Commit 9e5b07a6 authored by Vincent POLLET's avatar Vincent POLLET
Browse files

Remove temporary test file

parent ff8a2a1f
Pipeline #43092 passed with stage
in 5 minutes and 12 seconds
from bob.io.stream import Stream, StreamFile, StreamFilter, stream_filter #, StreamServer, StreamRemote
import numpy as np
import bob.io.base
# create data sets
num_frame = 10
num_chan = 3
num_width = 40
num_height = 30
data_shape = (num_frame, num_chan, num_width, num_height)
data_a = np.random.random_sample(data_shape)
data_b = np.random.random_integers(5000, size=data_shape)
assert(data_a.shape == data_shape)
assert(data_b.shape == data_shape)
# create data file
f = bob.io.base.HDF5File('api_test.h5', 'w')
f.set('data_a', data_a)
f.set('data_b', data_b)
del f
# TODO timestamps
# TODO config
# minimal streams with no config
f = StreamFile('api_test.h5')
stream_a = Stream('data_a', f)
stream_b = Stream('data_b', f)
assert(stream_a.shape == data_a.shape)
assert(stream_b.shape == data_b.shape)
assert(stream_a.timestamps == None)
assert(stream_b.timestamps == None)
assert(stream_a.camera == None)
assert(stream_b.camera == None)
# load full datasets
ld_a = stream_a.load()
ld_b = stream_b.load()
assert(ld_a.shape == data_a.shape)
assert(ld_b.shape == data_b.shape)
assert(np.array_equal(ld_a, data_a))
assert(np.array_equal(ld_b, data_b))
# try some indices slices cases and compare data
tests = [ ( None, None, None),
( None, 3, None),
( 5, None, None),
( None, None, 3),
( 1, 10, 3),
( 9, 0, -3),
( -5, -1, None),
( 10, 0, -3)]
for t in tests:
s = slice(t[0], t[1], t[2])
dd = data_a[s]
sd = stream_a.load(s)
assert(np.array_equal(sd, dd))
# test loading one frame with [] operator
# TODO should that really be the right behaviour???
data = stream_a[1]
assert(isinstance(data, np.ndarray))
assert(data.shape == data_a[1].shape)
assert(np.array_equal(data, data_a[1]))
# test view
stream_sl = stream_a[1:4]
data_sl = data_a[1:4]
assert(stream_sl.shape == data_sl.shape)
assert(stream_sl.ndim == data_sl.ndim)
assert(np.array_equal(stream_sl.load(), data_sl))
stream_sl = stream_a[1::3]
data_sl = data_a[1::3]
assert(stream_sl.shape == data_sl.shape)
assert(stream_sl.ndim == data_sl.ndim)
assert(np.array_equal(stream_sl.load(), data_sl))
stream_sl = stream_a[:,1]
data_sl = data_a[:,1]
assert(stream_sl.shape == data_sl.shape)
assert(stream_sl.ndim == data_sl.ndim)
assert(np.array_equal(stream_sl.load(), data_sl))
stream_sl = stream_a[:,:,1:2:3,4:5:6]
data_sl = data_a[:,:,1:2:3,4:5:6]
assert(stream_sl.shape == data_sl.shape)
assert(stream_sl.ndim == data_sl.ndim)
assert(np.array_equal(stream_sl.load(), data_sl))
stream_sl = stream_a[:,:,-1:-20:3,-4:5:-6]
data_sl = data_a[:,:,-1:-20:3,-4:5:-6]
assert(stream_sl.shape == data_sl.shape)
assert(stream_sl.ndim == data_sl.ndim)
assert(np.array_equal(stream_sl.load(), data_sl))
for t in tests:
s = slice(t[0], t[1], t[2])
dd = data_a[s]
sd = stream_a[s]
assert(sd.shape == dd.shape)
assert(sd.ndim == dd.ndim)
assert(np.array_equal(sd.load(), dd))
# test put method & simple filters
def test(data):
assert(np.array_equal(data, data_a[0]))
stream0 = Stream('test')
stream1 = stream0.filter(process_frame = lambda data: data)
stream2 = stream1.filter(process_frame = test)
stream0.put(data_a[0])
# test saving
fo = StreamFile('api_test2.h5', mode='w')
stream = Stream('test')
save_filter = stream.save(fo)
for i in range(data_a.shape[0]):
stream.put(data_a[i])
# test written file
fi = StreamFile('api_test2.h5', mode='r')
stream = Stream('test', fi)
data = stream.load()
assert(np.array_equal(data, data_a))
###########
#fo = StreamFile('api_test.h5', 'w')
#so = Stream('out')
#co = so.clean()
#co.save(fo)
#for i in range(10):
# so[i] = data
# or
#
# so.append(data)
# or
#fo = StreamFile('test.h5', 'w')
#so = Stream('out')
#co = so.clean()
#for i in range(10):
# so[i] = data
#fo.save(co)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment