Commit 4cdcd6fd authored by Vincent POLLET's avatar Vincent POLLET
Browse files

Merge sliced_stream branch

parents 7fe1e766 02f5832d
Pipeline #43137 failed with stage
in 4 minutes and 59 seconds
......@@ -77,14 +77,19 @@ assert(isinstance(data, np.ndarray))
assert(data.shape == data_a[1].shape)
assert(np.array_equal(data, data_a[1]))
# test slicing in first dimension
# test view
stream_sl = stream_a[1:4]
data_sl = data_a[1:4]
assert(stream_sl.shape == data_sl.shape)
assert(stream_sl.ndim == data_sl.ndim)
#assert(np.array_equal(stream_sl.load(), data_sl))
assert(np.array_equal(stream_sl.load(), data_sl))
stream_sl = stream_a[1::3]
data_sl = data_a[1::3]
assert(stream_sl.shape == data_sl.shape)
assert(stream_sl.ndim == data_sl.ndim)
assert(np.array_equal(stream_sl.load(), data_sl))
stream_sl = stream_a[:,1]
data_sl = data_a[:,1]
......@@ -104,6 +109,39 @@ assert(stream_sl.shape == data_sl.shape)
assert(stream_sl.ndim == data_sl.ndim)
assert(np.array_equal(stream_sl.load(), data_sl))
for t in tests:
s = slice(t[0], t[1], t[2])
dd = data_a[s]
sd = stream_a[s]
assert(sd.shape == dd.shape)
assert(sd.ndim == dd.ndim)
assert(np.array_equal(sd.load(), dd))
# test put method & simple filters
def test(data):
assert(np.array_equal(data, data_a[0]))
stream0 = Stream('test')
stream1 = stream0.filter(process_frame = lambda data: data)
stream2 = stream1.filter(process_frame = test)
stream0.put(data_a[0])
# test saving
fo = StreamFile('api_test2.h5', mode='w')
stream = Stream('test')
save_filter = stream.save(fo)
for i in range(data_a.shape[0]):
stream.put(data_a[i])
# test written file
fi = StreamFile('api_test2.h5', mode='r')
stream = Stream('test', fi)
data = stream.load()
assert(np.array_equal(data, data_a))
###########
#fo = StreamFile('api_test.h5', 'w')
......
......@@ -5,7 +5,7 @@ from functools import wraps
import numpy as np
from builtins import str
from .utils import StreamArray
from .utils import StreamArray, get_index_list, get_axis_size
from .stream_file import StreamFile
......@@ -18,12 +18,16 @@ class Stream:
def __init__(self, name=None, parent=None):
self.name = name
self.parent = parent
self.child = None
if isinstance(parent, Stream):
parent.child = self
self.reset()
# reset stream
def reset(self):
self._loaded = None
self._data = None
self._shape = None
self.__bounding_box = StreamArray(self)
self.__image_points = StreamArray(self)
......@@ -63,14 +67,34 @@ class Stream:
# shape property
@property
def shape(self):
# if parent is file return dataset shape
if isinstance(self.parent, StreamFile):
return self.parent.get_stream_shape(self.name)
else:
# if has parent return parent shape
elif self.parent is not None:
return self.parent.shape
# if parent is None return internal _shape
else:
return self._shape
# recursively set shape (if not yet defined)
@shape.setter
def shape(self, value):
raise Exception("not yet implemented")
# can only set shape if undefined
if self.shape is not None:
raise Exception("shape is already set")
# set dataset dimension (should this be allowed?)
if isinstance(self.parent, StreamFile):
parent.set_dataset_shape(name=self.name, shape=value)
# set recursively
elif self.parent is not None:
self.parent.shape = value
# end of recursion
else:
if isinstance(value, tuple):
self._shape = value
else:
raise(Exception('shape must be a tuple of int or None'))
# ndim property
@property
......@@ -89,7 +113,7 @@ class Stream:
def timestamps(self, value):
raise Exception("not yet implemented")
# camera
# camera property
@property
def camera(self):
if isinstance(self.parent, StreamFile):
......@@ -101,7 +125,7 @@ class Stream:
def camera(self, value):
raise Exception("not yet implemented")
# bounding_box
# bounding_box property
@property
def bounding_box(self):
if isinstance(self.parent, StreamFile):
......@@ -109,7 +133,7 @@ class Stream:
else:
return self.parent.bounding_box
# image points
# image points property
@property
def image_points(self):
if isinstance(self.parent, StreamFile):
......@@ -149,55 +173,9 @@ class Stream:
def __setitem__(self, index, data):
raise Exception("not yet implemented")
# get list of frame indices
def get_frame_indices(self, index):
# None index is equivalent to [:] i.e. slice(None, None, None)
if index is None:
index = slice(None, None, None)
# frame index transform to list
if isinstance(index, int):
indices = [index]
# slice transform to list
elif isinstance(index, slice):
# start value: handle None and negative
if index.start is not None:
if index.start < 0:
start = self.shape[0] + index.start
else:
start = index.start
# boundary case
if start >= self.shape[0]:
start = self.shape[0] - 1
else:
start = 0
# stop value: handle None and negative
if index.stop is not None:
if index.stop < 0:
stop = self.shape[0] + index.stop
else:
stop = index.stop
# boundary case
if stop >= self.shape[0]:
stop = self.shape[0] - 1
else:
stop = self.shape[0]
# step value: handle None
if index.step is not None:
step = index.step
else:
step = 1
# generate list
indices = list(range(start, stop, step))
# pass lists thru
elif isinstance(index, list):
indices = index
else:
raise Exception("index can only be None, int, slice or list")
return indices
# load one or several frames
def load(self, index=None):
indices = self.get_frame_indices(index)
indices = get_index_list(index, self.shape[0])
# return buffered data OR load from file OR process data
if self._loaded == indices and self._data is not None:
# print('loaded', self.name)
......@@ -208,18 +186,19 @@ class Stream:
self._loaded = indices
return self._data
# get size of a given (sliced) axis
def get_axis_size(self, axis, _slice=None):
if _slice is None:
return self.shape[axis]
else:
if isinstance(_slice, int):
return 1
elif isinstance(_slice, slice):
# TODO check this works always
return len(range(*_slice.indices(self.shape[axis])))
else:
raise Exception('_slice can be None, int or slice')
# put one frame downstream
# TODO define behaviour across all states
def put(self, data, timestamp=None):
# set _shape to keep track of size of frames
# TODO shape property (and ndim property)
if self._shape is None:
self._shape = tuple([None, *data.shape])
# check if same shape
elif data.shape != self._shape[1:]:
raise(Exception('data must be the same shape as previous frames, use reset() to clear shape.'))
# put downstream
if self.child is not None:
self.child.put(data, timestamp)
# filters
def get_available_filters(self):
......@@ -244,25 +223,27 @@ def stream_filter(name):
################################################################################
# Stream Filters
### Filter general class ###
### Filter general class & simple filter ###
@stream_filter('filter')
class StreamFilter(Stream):
def __init__(self, name, parent):
super().__init__(name=name, parent=parent)
pass
def get_frame_indices(self, index):
return super().get_frame_indices(index)
def __init__(self, name, parent, process_frame=None):
self.__process_frame = process_frame
self.filter_name = name
super().__init__(name=parent.name, parent=parent)
def process(self, data, indices):
assert isinstance(indices, list)
return np.stack([self.process_frame(data[i], i, indices[i]) for i in range(data.shape[0])])
def process_frame(self, data, data_index, stream_index):
return data
if self.__process_frame is not None:
return self.__process_frame(data)
else:
return data
# load one or several frames
def load(self, index=None):
indices = self.get_frame_indices(index)
indices = get_index_list(index, self.shape[0])
# return buffered data OR process data
if self._loaded == indices and self._data is not None:
# print('loaded', self.name)
......@@ -274,11 +255,28 @@ class StreamFilter(Stream):
self._loaded = indices
return self._data
# put one frame downstream
# TODO define behaviour across all states
def put(self, data, timestamp=None):
# process data
# TODO check shape
# -1 implies frame is the latest
indices = [-1]
data = np.stack([data])
data = self.process(data, indices)
data = data[0]
# buffer data
self._loaded = indices
self._data = data
# put downstream
if self.child is not None:
self.child.put(data, timestamp)
################################################################################
### astype ###
# TODO make that work more as numpy...
@stream_filter("astype")
class AsType(StreamFilter):
class StreamAsType(StreamFilter):
def __init__(self, name, parent, dtype=None):
super().__init__(name=name, parent=parent)
if dtype is not None:
......@@ -342,7 +340,7 @@ class StreamView(StreamFilter):
if self.bulk_view is not None and self.parent.ndim <= len(self.bulk_view):
raise Exception('view dimension exceed parent dimension')
# first dimension ...
__shape = [self.parent.get_axis_size(0, self.frame_view)]
__shape = [get_axis_size(self.parent.shape, 0, self.frame_view)]
# ... and others
for d in range(1, self.parent.ndim):
if self.bulk_view is not None and d-1 < len(self.bulk_view):
......@@ -351,9 +349,9 @@ class StreamView(StreamFilter):
if isinstance(__view_index, int):
pass
else:
__shape.append(self.parent.get_axis_size(d, __view_index))
__shape.append(get_axis_size(self.parent.shape, d, __view_index))
else:
__shape.append(self.parent.get_axis_size(d))
__shape.append(get_axis_size(self.parent.shape, d))
return tuple(__shape)
# ndim property
......@@ -368,17 +366,11 @@ class StreamView(StreamFilter):
__ndim -= 1
return __ndim
# translate frame indices
def get_frame_indices(self, index):
if self.frame_view is None:
return super().get_frame_indices(index)
elif isinstance(self.frame_view, slice):
# TODO perform translation....
return super().get_frame_indices(index)
# load one or several frames
def load(self, index=None):
indices = self.get_frame_indices(index)
print('view load', indices)
parent_indices = get_index_list(self.frame_view, self.parent.shape[0])
view_indices = get_index_list(index, self.shape[0])
indices = [parent_indices[i] for i in view_indices]
return super().load(indices)
def process(self, data, indices):
......@@ -394,4 +386,62 @@ class StreamView(StreamFilter):
return super().process(data, indices)
def process_frame(self, data, data_index, stream_index):
return data[self.__bulk_view_full]
\ No newline at end of file
return data[self.__bulk_view_full]
################################################################################
### ajust ###
@stream_filter("adjust")
class StreamAdjust(StreamFilter):
def __init__(self, adjust_to, name, parent):
super().__init__(name=name, parent=parent)
self.adjust_to = adjust_to
def set_source(self, src):
super().set_source(src)
self.adjust_to.set_source(src)
@property
def shape(self):
return (self.adjust_to.shape[0], self.parent.shape[1], self.parent.shape[2], self.parent.shape[3])
@property
def timestamps(self):
return self.adjust_to.timestamps
def load(self, index):
# TODO load only relevant data if cropped
if isinstance(index, tuple):
index = index[0]
print("WARNING: cropping not yet implemented")
# original stream indices
old_indices = get_index_list(index, self.shape[0])
selected_timestamps = [self.adjust_to.timestamps[i] for i in old_indices]
kdtree = cKDTree(self.parent.timestamps[:, np.newaxis])
def get_index(val, kdtree):
_, i = kdtree.query(val, k=1)
return i
new_indices = [get_index(ts[np.newaxis], kdtree) for ts in selected_timestamps]
if False:
print("DEBUG: indices alignement:")
print(old_indices)
print(new_indices)
print(self.parent.timestamps)
print(self.adjust_to.timestamps)
return super().load(new_indices)
################################################################################
### save ###
@stream_filter("save")
class StreamAdjust(StreamFilter):
def __init__(self, file, name, parent):
super().__init__(name=name, parent=parent)
self.file = file
def put(self, data, timestamp=None):
if self.file is not None and isinstance(self.file, StreamFile):
self.file.put_frame(self.name, data)
else:
raise(Exception('save: file is not defined'))
return data
\ No newline at end of file
......@@ -232,3 +232,7 @@ class StreamFile:
# TODO rotate if requested
return data
# write frame data
def put_frame(self, name, data):
self.hdf5_file.append(name, data)
......@@ -41,6 +41,67 @@ def _crop(video, mask, crop):
mask = mask[:, starty:starty + height, startx:startx + width]
return video, mask
################################################################################
# return a list of indices given an index (int / slice / list / None) and a size
# TODO: check all boundary cases
def get_index_list(index, size):
# None index is equivalent to [:] i.e. slice(None, None, None)
if index is None:
index = slice(None, None, None)
# frame index transform to list
if isinstance(index, int):
indices = [index]
# slice transform to list
elif isinstance(index, slice):
# start value: handle None and negative
if index.start is not None:
if index.start < 0:
start = size + index.start
else:
start = index.start
# boundary case
if start >= size:
start = size - 1
else:
start = 0
# stop value: handle None and negative
if index.stop is not None:
if index.stop < 0:
stop = size + index.stop
else:
stop = index.stop
# boundary case
if stop >= size:
stop = size - 1
else:
stop = size
# step value: handle None
if index.step is not None:
step = index.step
else:
step = 1
# generate list
indices = list(range(start, stop, step))
# pass lists thru
elif isinstance(index, list):
indices = index
else:
raise Exception("index can only be None, int, slice or list")
return indices
################################################################################
# get size of a given (sliced) axis
def get_axis_size(shape, axis, _slice=None):
if _slice is None:
return shape[axis]
else:
if isinstance(_slice, int):
return 1
elif isinstance(_slice, slice):
# TODO check this works always
return len(range(*_slice.indices(shape[axis])))
else:
raise Exception('_slice can be None, int or slice')
################################################################################
class StreamArray():
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment