This package is part of the signal-processing and machine learning toolbox Bob.
Motivation
This package provides a way to define efficient processing pipelines, based on the concept of "streams", to load, process, and save time-series data stored in HDF5 files. The stream abstraction lets you define pipelines of processing steps with an easy syntax, and gives access to the processed data through a "numpy-like" API. It was designed to minimize computation and disk access by loading only the necessary data and by buffering "frames" of processed data.
Installation
Complete Bob's installation instructions. Then, to install this package, run:
$ conda install bob.io.stream
Example
To use bob.io.stream to read/write data for a new project with a sensor, it is recommended to follow these steps:
1. Write a Python file defining the metadata that you will need for your streams and stream files. There should be only one such file per project, defining the metadata for the whole project. The metadata values don't matter in this file: they will be read or written for each data file.
    import datetime

    import numpy as np

    from bob.io.stream import MetaDatum

    ### StreamFile metadata ###
    Version = MetaDatum("version", value="v1.0.1")                   # e.g. version of the project's sensor
    DateTime = MetaDatum("datetime", value=datetime.datetime.now())  # e.g. date the data file is written

    ### Stream metadata ###
    # Global stream metadata, e.g. parameters of the sensor that captured this stream.
    SensorParam1 = MetaDatum("param1", value=1000, unit="xx")
    SensorParam2 = MetaDatum("param2", value=2.1, unit="xx")

    # This file should also define the metadata that will be used during the processing.
    ProcessingParam1 = MetaDatum("procparam1", value="xyz")

    # Metadata that stores information for each point of the data time series, for instance timestamps
    # that keep track of when each data point was recorded. The values don't matter here, but they
    # should be a numpy array.
    Timestamps = MetaDatum("timestamps", value=np.nan * np.empty(20), unit="1e-7s")
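Keeping all the MetaDatum definitions in this single module means that every acquisition and processing configuration in the project refers to the same metadata names and units, so the data files they produce stay consistent with each other.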
2. Write a configuration file defining your processing pipeline and setting the streams' metadata. Unlike the previous file, there can (and should) be one config file per processing or acquisition that is performed.
    from bob.io.stream import Stream, MetaData

    # Import the metadata instances from your project's metadata file.
    from myproject.metadata import (
        Version,
        DateTime,
        SensorParam1,
        SensorParam2,
        ProcessingParam1,
        Timestamps,
    )

    ###### StreamFile metadata ######
    stream_file_metadata = MetaData()
    stream_file_metadata.add_metadatum(Version)
    stream_file_metadata.add_metadatum(DateTime)

    ###### Raw sensor stream metadata ######
    _stream_metadata = MetaData()
    _stream_metadata.add_metadatum(Timestamps)
    _stream_metadata.add_metadatum(SensorParam1)
    _stream_metadata.add_metadatum(SensorParam2)

    # Raw streams inherit all of their metadata.
    _raw_stream_metadata_info = {
        Timestamps.name: {"per_frame": True, "herited": True},
        SensorParam1.name: {"per_frame": False, "herited": True},
        SensorParam2.name: {"per_frame": False, "herited": True},
    }

    # E.g. there are 3 data streams in the StreamFile.
    _stream1 = Stream("stream1", metadata=_stream_metadata, metadata_info=_raw_stream_metadata_info)
    _stream2 = Stream("stream2", metadata=_stream_metadata, metadata_info=_raw_stream_metadata_info)
    _stream3 = Stream("stream3", metadata=_stream_metadata, metadata_info=_raw_stream_metadata_info)

    ###### Processing streams ######
    # Define your pipeline here, e.g. views and processing filters.
    # View streams inherit their metadata from their parents.
    _stream1_view = _stream1[1:12:3]            # numpy-like slicing: keeps frames 1, 4, 7 and 10
    _stream2_view = _stream2[2::2].normalize()  # every second frame, starting at frame 2

    # In general, stream filters don't automatically inherit metadata, as the metadata could change.
    _stream2_view_metadata = MetaData()
    _stream2_view_metadata.add_metadatum(ProcessingParam1)
    _stream2_view.set_metadata(_stream2_view_metadata, metadata_info={"per_frame": False, "herited": False})

    _processing_stream1 = _stream1_view.adjust(
        _stream2_view, metadata=_stream_metadata, metadata_info=_raw_stream_metadata_info
    )
    _pipeline_output = _processing_stream1.stack(_stream2_view).stack(_stream3)

    # It is recommended to consider the inner variables of the config file as private.
    # The streams to be used for the processing can be exposed, for instance in a dictionary.
    streams = {"processing_stream": _processing_stream1, "nn_input": _pipeline_output}
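The leading underscores mark the intermediate streams as private by convention: once the configuration is loaded with import_config (see step 3), consumer scripts are only expected to touch the exposed streams dictionary.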
3. Use your streams from other Python scripts (e.g. training scripts).
    from bob.io.stream import StreamFile
    from bob.io.stream.utils import import_config

    from myproject import mymodel

    data_config = import_config("path/to/config/file.py")

    with StreamFile("data_file.h5", data_config.stream_file_metadata, mode="r") as data_f:
        nn_input = data_config.streams["nn_input"]
        nn_input.set_source(data_f)
        # Do stuff with the input data (and metadata).
        mymodel(nn_input[0:5])
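The same pattern works for any stream exposed by the configuration. As a minimal sketch (reusing the hypothetical "processing_stream" entry and file names from the steps above, which are illustrative and not part of the library API), requesting a slice of a stream only triggers the work needed for that slice:

    from bob.io.stream import StreamFile
    from bob.io.stream.utils import import_config

    data_config = import_config("path/to/config/file.py")

    with StreamFile("data_file.h5", data_config.stream_file_metadata, mode="r") as data_f:
        proc = data_config.streams["processing_stream"]
        proc.set_source(data_f)
        # Thanks to the lazy "numpy-like" API, only the data required to build
        # these 2 processed frames is read from disk.
        first_frames = proc[0:2]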
Contact
For questions or to report issues with this software package, contact our development mailing list.