From a31bbd3923434aab6ba69d4e63880e35a34a0c94 Mon Sep 17 00:00:00 2001 From: Tiago Freitas Pereira Date: Fri, 15 May 2020 18:36:24 +0200 Subject: [PATCH 1/6] Make a sampleset work transparently with list of DelayedSamples --- bob/pipelines/sample.py | 10 ++++++++++ bob/pipelines/tests/test_samples.py | 21 ++++++++++++++++++++- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/bob/pipelines/sample.py b/bob/pipelines/sample.py index 746bfb0..8318d1e 100644 --- a/bob/pipelines/sample.py +++ b/bob/pipelines/sample.py @@ -99,19 +99,29 @@ class SampleSet(MutableSequence, _ReprMixin): _copy_attributes(self, parent.__dict__) _copy_attributes(self, kwargs) + + def _load(self): + if isinstance(self.samples, DelayedSample): + self.samples = self.samples.data + def __len__(self): + self._load() return len(self.samples) def __getitem__(self, item): + self._load() return self.samples.__getitem__(item) def __setitem__(self, key, item): + self._load() return self.samples.__setitem__(key, item) def __delitem__(self, item): + self._load() return self.samples.__delitem__(item) def insert(self, index, item): + self._load() # if not item in self.samples: self.samples.insert(index, item) diff --git a/bob/pipelines/tests/test_samples.py b/bob/pipelines/tests/test_samples.py index fc19d76..d622b70 100644 --- a/bob/pipelines/tests/test_samples.py +++ b/bob/pipelines/tests/test_samples.py @@ -2,7 +2,10 @@ import bob.pipelines as mario import numpy import copy - +import pickle +import tempfile +import functools +import os def test_sampleset_collection(): @@ -28,3 +31,19 @@ def test_sampleset_collection(): # Testing iterator for i in sampleset: assert isinstance(i, mario.Sample) + + + def _load(path): + return pickle.loads(open(path, "rb").read()) + + # Testing delayed sample in the sampleset + with tempfile.TemporaryDirectory() as dir_name: + + samples = [mario.Sample(data, key=str(i)) for i, data in enumerate(X)] + filename = os.path.join(dir_name, "samples.pkl") + with open(filename, "wb") as f: + f.write(pickle.dumps(samples)) + + sampleset = mario.SampleSet(mario.DelayedSample(functools.partial(_load, filename)), key=1) + + assert len(sampleset)==n_samples \ No newline at end of file -- GitLab From 6d4994a8237a5261fbd9ca1b0599e4eac0ecac0d Mon Sep 17 00:00:00 2001 From: Tiago Freitas Pereira Date: Fri, 22 May 2020 12:30:37 +0200 Subject: [PATCH 2/6] Adjusted some defaults for our SGE cluster --- bob/pipelines/distributed/sge.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/bob/pipelines/distributed/sge.py b/bob/pipelines/distributed/sge.py index 7ee17c5..3c1eea9 100644 --- a/bob/pipelines/distributed/sge.py +++ b/bob/pipelines/distributed/sge.py @@ -262,6 +262,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster): # removal before we remove it. 
# Here the goal is to wait 2 minutes before scaling down since # it is very expensive to get jobs on the SGE grid + self.adapt(minimum=min_jobs, maximum=max_jobs, wait_count=60, interval=1000) def _get_worker_spec_options(self, job_spec): @@ -277,6 +278,9 @@ class SGEMultipleQueuesCluster(JobQueueCluster): "io_big=TRUE," if "io_big" in job_spec and job_spec["io_big"] else "" ) + memory = _get_key_from_spec(job_spec, "memory")[:-1] + new_resource_spec += f"mem_free={memory}," + queue = _get_key_from_spec(job_spec, "queue") if queue != "all.q": new_resource_spec += f"{queue}=TRUE" @@ -440,7 +444,9 @@ class SchedulerResourceRestriction(Scheduler): """ def __init__(self, *args, **kwargs): - super(SchedulerResourceRestriction, self).__init__(*args, **kwargs) + super(SchedulerResourceRestriction, self).__init__( + allowed_failures=15, synchronize_worker_interval="240s", *args, **kwargs, + ) self.handlers[ "get_no_worker_tasks_resource_restrictions" ] = self.get_no_worker_tasks_resource_restrictions -- GitLab From 38860b2106e986a36c2ef3642b3294ab868377ae Mon Sep 17 00:00:00 2001 From: Tiago Freitas Pereira Date: Fri, 22 May 2020 12:31:43 +0200 Subject: [PATCH 3/6] Allowed to set the argument dask.bag.Bag.partition_size --- bob/pipelines/wrappers.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/bob/pipelines/wrappers.py b/bob/pipelines/wrappers.py index 202eee3..2c8a1e4 100644 --- a/bob/pipelines/wrappers.py +++ b/bob/pipelines/wrappers.py @@ -216,12 +216,13 @@ class CheckpointWrapper(BaseWrapper, TransformerMixin): features, com_feat_index = [], 0 for s, p, should_compute in zip(samples, paths, should_compute_list): if should_compute: - feat = computed_features[com_feat_index] - features.append(feat) + feat = computed_features[com_feat_index] com_feat_index += 1 # save the computed feature if p is not None: self.save(feat) + feat = self.load(s, p) + features.append(feat) else: features.append(self.load(s, p)) return features @@ -398,16 +399,20 @@ class ToDaskBag(TransformerMixin, BaseEstimator): Number of partitions used in :any:`dask.bag.from_sequence` """ - def __init__(self, npartitions=None, **kwargs): + def __init__(self, npartitions=None, partition_size=None, **kwargs): super().__init__(**kwargs) self.npartitions = npartitions + self.partition_size = partition_size def fit(self, X, y=None): return self def transform(self, X): logger.debug(f"{_frmt(self)}.transform") - return dask.bag.from_sequence(X, npartitions=self.npartitions) + if self.partition_size is None: + return dask.bag.from_sequence(X, npartitions=self.npartitions) + else: + return dask.bag.from_sequence(X, partition_size=self.partition_size) def _more_tags(self): return {"stateless": True, "requires_fit": False} -- GitLab From ab9cda40d8892e6c0670660a8669e5bc79e88416 Mon Sep 17 00:00:00 2001 From: Tiago Freitas Pereira Date: Fri, 22 May 2020 16:21:48 +0200 Subject: [PATCH 4/6] Implemented some defaults for the Scheduler to work better in our cluster --- bob/pipelines/distributed/sge.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/bob/pipelines/distributed/sge.py b/bob/pipelines/distributed/sge.py index 3c1eea9..202521e 100644 --- a/bob/pipelines/distributed/sge.py +++ b/bob/pipelines/distributed/sge.py @@ -251,7 +251,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster): loop=loop, silence_logs=silence_logs, asynchronous=asynchronous, - name=name, + name=name, ) max_jobs = get_max_jobs(sge_job_spec) @@ -279,7 +279,7 @@ class 
SGEMultipleQueuesCluster(JobQueueCluster): ) memory = _get_key_from_spec(job_spec, "memory")[:-1] - new_resource_spec += f"mem_free={memory}," + new_resource_spec += (f"mem_free={memory},") queue = _get_key_from_spec(job_spec, "queue") if queue != "all.q": @@ -289,7 +289,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster): return { "queue": queue, - "memory": _get_key_from_spec(job_spec, "memory"), + "memory": "0", "cores": 1, "processes": 1, "log_directory": self.log_directory, @@ -299,7 +299,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster): "protocol": self.protocol, "security": None, "resources": _get_key_from_spec(job_spec, "resources"), - "env_extra": self.env_extra, + "env_extra": self.env_extra, } def scale(self, n_jobs, sge_job_spec_key="default"): @@ -445,7 +445,11 @@ class SchedulerResourceRestriction(Scheduler): def __init__(self, *args, **kwargs): super(SchedulerResourceRestriction, self).__init__( - allowed_failures=15, synchronize_worker_interval="240s", *args, **kwargs, + idle_timeout=3600, + allowed_failures=500, + synchronize_worker_interval="240s", + *args, + **kwargs, ) self.handlers[ "get_no_worker_tasks_resource_restrictions" -- GitLab From dadcaec26970346d591993ab38a90e3cb6598052 Mon Sep 17 00:00:00 2001 From: Tiago Freitas Pereira Date: Fri, 22 May 2020 16:25:15 +0200 Subject: [PATCH 5/6] Implemented functions to load and save samples from HDF5File --- bob/pipelines/__init__.py | 4 +- bob/pipelines/sample.py | 83 ++++++++++++++++++++++++++++- bob/pipelines/tests/test_samples.py | 62 ++++++++++++++++----- conda/meta.yaml | 1 + requirements.txt | 1 + 5 files changed, 136 insertions(+), 15 deletions(-) diff --git a/bob/pipelines/__init__.py b/bob/pipelines/__init__.py index 948ffdd..cb884d2 100644 --- a/bob/pipelines/__init__.py +++ b/bob/pipelines/__init__.py @@ -1,5 +1,5 @@ from . import utils -from .sample import Sample, DelayedSample, SampleSet +from .sample import Sample, DelayedSample, SampleSet, sample_to_hdf5, hdf5_to_sample from .wrappers import ( BaseWrapper, DelayedSamplesCall, @@ -8,7 +8,7 @@ from .wrappers import ( DaskWrapper, ToDaskBag, wrap, - dask_tags, + dask_tags, ) from . import distributed from . import transformers diff --git a/bob/pipelines/sample.py b/bob/pipelines/sample.py index 8318d1e..e060514 100644 --- a/bob/pipelines/sample.py +++ b/bob/pipelines/sample.py @@ -3,6 +3,8 @@ from collections.abc import MutableSequence, Sequence from .utils import vstack_features import numpy as np +import os +import h5py def _copy_attributes(s, d): @@ -24,6 +26,31 @@ class _ReprMixin: + ")" ) + def __eq__(self, other): + sorted_self = { + k: v for k, v in sorted(self.__dict__.items(), key=lambda item: item[0]) + } + sorted_other = { + k: v for k, v in sorted(other.__dict__.items(), key=lambda item: item[0]) + } + + for s, o in zip(sorted_self, sorted_other): + # Checking keys + if s != o: + return False + + # Checking values + if isinstance(sorted_self[s], np.ndarray) and isinstance( + sorted_self[o], np.ndarray + ): + if not np.allclose(sorted_self[s], sorted_other[o]): + return False + else: + if sorted_self[s] != sorted_other[o]: + return False + + return True + class Sample(_ReprMixin): """Representation of sample. 
A Sample is a simple container that wraps a @@ -99,7 +126,6 @@ class SampleSet(MutableSequence, _ReprMixin): _copy_attributes(self, parent.__dict__) _copy_attributes(self, kwargs) - def _load(self): if isinstance(self.samples, DelayedSample): self.samples = self.samples.data @@ -146,5 +172,60 @@ class SampleBatch(Sequence, _ReprMixin): def _reader(s): # adding one more dimension to data so they get stacked sample-wise return s.data[None, ...] + arr = vstack_features(_reader, self.samples, dtype=dtype) return np.asarray(arr, dtype, *args, **kwargs) + + +def sample_to_hdf5(sample, hdf5): + """ + Saves the content of sample to hdf5 file + + Paremeters: + ----------- + + sample: :any:`Sample` or :any:`DelayedSample` or :any:`list` + Sample to be saved + + hdf5: :any:`h5py.File` + Pointer to a HDF5 file for writing + + """ + if isinstance(sample, list): + for i, s in enumerate(sample): + group = hdf5.create_group(str(i)) + sample_to_hdf5(s, group) + else: + for s in sample.__dict__: + hdf5[s] = sample.__dict__[s] + + +def hdf5_to_sample(hdf5): + """ + Reads the content of a HDF5File and returns a :any:`Sample` + + Paremeters: + ----------- + + hdf5: :any:`h5py.File` + Pointer to a HDF5 file for reading + + """ + + # Checking if it has groups + has_groups = np.sum([isinstance(hdf5[k], h5py.Group) for k in hdf5.keys()]) > 0 + + if has_groups: + # If has groups, returns a list of Samples + samples = [] + for k in hdf5.keys(): + group = hdf5[k] + samples.append(hdf5_to_sample(group)) + return samples + else: + # If hasn't groups, returns a sample + sample = Sample(None) + for k in hdf5.keys(): + sample.__dict__[k] = hdf5[k].value + + return sample diff --git a/bob/pipelines/tests/test_samples.py b/bob/pipelines/tests/test_samples.py index d622b70..e8065ce 100644 --- a/bob/pipelines/tests/test_samples.py +++ b/bob/pipelines/tests/test_samples.py @@ -1,23 +1,32 @@ -import bob.pipelines as mario -import numpy +from bob.pipelines import ( + Sample, + DelayedSample, + SampleSet, + sample_to_hdf5, + hdf5_to_sample, +) +import bob.io.base +import numpy as np import copy import pickle import tempfile import functools import os +import h5py + def test_sampleset_collection(): n_samples = 10 - X = numpy.ones(shape=(n_samples, 2), dtype=int) - sampleset = mario.SampleSet( - [mario.Sample(data, key=str(i)) for i, data in enumerate(X)], key="1" + X = np.ones(shape=(n_samples, 2), dtype=int) + sampleset = SampleSet( + [Sample(data, key=str(i)) for i, data in enumerate(X)], key="1" ) assert len(sampleset) == n_samples # Testing insert - sample = mario.Sample(X, key=100) + sample = Sample(X, key=100) sampleset.insert(1, sample) assert len(sampleset) == n_samples + 1 @@ -30,20 +39,49 @@ def test_sampleset_collection(): # Testing iterator for i in sampleset: - assert isinstance(i, mario.Sample) - + assert isinstance(i, Sample) def _load(path): return pickle.loads(open(path, "rb").read()) # Testing delayed sample in the sampleset with tempfile.TemporaryDirectory() as dir_name: - - samples = [mario.Sample(data, key=str(i)) for i, data in enumerate(X)] + + samples = [Sample(data, key=str(i)) for i, data in enumerate(X)] filename = os.path.join(dir_name, "samples.pkl") with open(filename, "wb") as f: f.write(pickle.dumps(samples)) - sampleset = mario.SampleSet(mario.DelayedSample(functools.partial(_load, filename)), key=1) + sampleset = SampleSet(DelayedSample(functools.partial(_load, filename)), key=1) + + assert len(sampleset) == n_samples + + +def test_sample_hdf5(): + n_samples = 10 + X = np.ones(shape=(n_samples, 
2), dtype=int) + + samples = [Sample(data, key=str(i), subject="Subject") for i, data in enumerate(X)] + with tempfile.TemporaryDirectory() as dir_name: + + # Single sample + filename = os.path.join(dir_name, "sample.hdf5") + + with h5py.File(filename, "w", driver="core") as hdf5: + sample_to_hdf5(samples[0], hdf5) + + with h5py.File(filename, "r") as hdf5: + sample = hdf5_to_sample(hdf5) + + assert sample == samples[0] + + # List of samples + filename = os.path.join(dir_name, "samples.hdf5") + with h5py.File(filename, "w", driver="core") as hdf5: + sample_to_hdf5(samples, hdf5) + + with h5py.File(filename, "r") as hdf5: + samples_deserialized = hdf5_to_sample(hdf5) - assert len(sampleset)==n_samples \ No newline at end of file + compare = [a == b for a, b in zip(samples_deserialized, samples)] + assert np.sum(compare) == 10 diff --git a/conda/meta.yaml b/conda/meta.yaml index 7060167..fbad296 100644 --- a/conda/meta.yaml +++ b/conda/meta.yaml @@ -36,6 +36,7 @@ requirements: - dask-jobqueue - distributed - scikit-learn + - h5py test: imports: diff --git a/requirements.txt b/requirements.txt index 00990b5..75b3b71 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,4 @@ scikit-learn dask distributed dask-jobqueue +h5py \ No newline at end of file -- GitLab From 6a9d848e42a64a2e9cbf2a80be86329573dacd0c Mon Sep 17 00:00:00 2001 From: Tiago Freitas Pereira Date: Fri, 22 May 2020 16:44:36 +0200 Subject: [PATCH 6/6] Fixed sphinx issue --- bob/pipelines/sample.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/bob/pipelines/sample.py b/bob/pipelines/sample.py index e060514..90056ab 100644 --- a/bob/pipelines/sample.py +++ b/bob/pipelines/sample.py @@ -181,15 +181,14 @@ def sample_to_hdf5(sample, hdf5): """ Saves the content of sample to hdf5 file - Paremeters: - ----------- + Parameters + ---------- sample: :any:`Sample` or :any:`DelayedSample` or :any:`list` Sample to be saved - hdf5: :any:`h5py.File` + hdf5: `h5py.File` Pointer to a HDF5 file for writing - """ if isinstance(sample, list): for i, s in enumerate(sample): @@ -204,12 +203,11 @@ def hdf5_to_sample(hdf5): """ Reads the content of a HDF5File and returns a :any:`Sample` - Paremeters: - ----------- + Parameters + ---------- - hdf5: :any:`h5py.File` + hdf5: `h5py.File` Pointer to a HDF5 file for reading - """ # Checking if it has groups -- GitLab
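Usage sketches for the series follow; any name not present in the patches is illustrative.

Patch 1/6 lets a SampleSet hold a single DelayedSample in place of its list of samples: every MutableSequence operation now goes through _load(), which replaces self.samples with the resolved list on first access. A minimal sketch of the resulting behaviour, mirroring the pickle-backed storage used in the test (the helper name _load_pickle is an assumption, not part of the API):

    import functools
    import os
    import pickle
    import tempfile

    import bob.pipelines as mario

    def _load_pickle(path):
        # resolves the delayed list of samples from disk
        with open(path, "rb") as f:
            return pickle.load(f)

    with tempfile.TemporaryDirectory() as dir_name:
        samples = [mario.Sample(data, key=str(i)) for i, data in enumerate(range(3))]
        path = os.path.join(dir_name, "samples.pkl")
        with open(path, "wb") as f:
            pickle.dump(samples, f)

        # the list is only unpickled on first access
        sampleset = mario.SampleSet(
            mario.DelayedSample(functools.partial(_load_pickle, path)), key="1"
        )
        assert len(sampleset) == 3      # len() triggers _load()
        assert sampleset[0].key == "0"  # now a plain in-memory list

The resolution is one-way: once loaded, the SampleSet behaves exactly like one constructed from a list.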
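Patches 2/6 and 4/6 tune the cluster for the slow job turnaround of an SGE grid: adapt(minimum=min_jobs, maximum=max_jobs, wait_count=60, interval=1000) makes the adaptive controller require 60 consecutive scale-down recommendations before releasing workers (the in-code comment targets roughly two minutes of grace, since jobs are expensive to reacquire on the grid), and the scheduler is now created with idle_timeout=3600, allowed_failures=500 and synchronize_worker_interval="240s" so that slowly-starting grid jobs are not mistaken for failures. The adaptive policy itself is plain dask.distributed and can be previewed without a grid; a sketch with LocalCluster standing in for SGEMultipleQueuesCluster (the minimum/maximum values are placeholders):

    from dask.distributed import Client, LocalCluster

    cluster = LocalCluster(n_workers=0)
    # wait_count * interval sets how long a scale-down recommendation must
    # persist before workers are actually retired
    cluster.adapt(minimum=1, maximum=48, wait_count=60, interval=1000)
    client = Client(cluster)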
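Patch 3/6 exposes the second partitioning knob of dask.bag.from_sequence on ToDaskBag: npartitions fixes how many partitions the bag gets, while partition_size fixes how many samples each partition holds, and the patched transform() uses partition_size whenever it is set. A sketch:

    import bob.pipelines as mario

    samples = [mario.Sample(data, key=str(i)) for i, data in enumerate(range(1000))]

    # ten partitions of roughly 100 samples each
    bag = mario.ToDaskBag(npartitions=10).transform(samples)

    # partitions of exactly 50 samples (20 partitions here)
    bag = mario.ToDaskBag(partition_size=50).transform(samples)

Keeping both values as plain constructor attributes, rather than deriving one from the other in __init__, follows the scikit-learn convention that estimators only store their parameters at construction time.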
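Patches 5/6 and 6/6 add a symmetric serialization pair: sample_to_hdf5 writes each sample attribute as one HDF5 dataset (a list of samples becomes one numbered group per sample), hdf5_to_sample rebuilds a Sample, or a list of them when groups are present, and the new _ReprMixin.__eq__ makes round-trip checks possible by comparing attributes (np.allclose for arrays, == otherwise). A round-trip sketch; note that the hdf5[k].value accessor used by hdf5_to_sample was removed in h5py 3.0, so this assumes h5py 2.x (on newer h5py the equivalent spelling is hdf5[k][()]):

    import h5py
    import numpy as np

    from bob.pipelines import Sample, hdf5_to_sample, sample_to_hdf5

    sample = Sample(np.ones(2), key="0", subject="Subject")

    with h5py.File("sample.hdf5", "w") as hdf5:
        sample_to_hdf5(sample, hdf5)  # writes datasets: data, key, subject

    with h5py.File("sample.hdf5", "r") as hdf5:
        restored = hdf5_to_sample(hdf5)

    assert restored == sample  # attribute-wise comparison from _ReprMixin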