#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###################################################################################
#                                                                                 #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/               #
# Contact: beat.support@idiap.ch                                                  #
#                                                                                 #
# Redistribution and use in source and binary forms, with or without              #
# modification, are permitted provided that the following conditions are met:     #
#                                                                                 #
# 1. Redistributions of source code must retain the above copyright notice, this  #
# list of conditions and the following disclaimer.                                #
#                                                                                 #
# 2. Redistributions in binary form must reproduce the above copyright notice,    #
# this list of conditions and the following disclaimer in the documentation       #
# and/or other materials provided with the distribution.                          #
#                                                                                 #
# 3. Neither the name of the copyright holder nor the names of its contributors   #
# may be used to endorse or promote products derived from this software without   #
# specific prior written permission.                                              #
#                                                                                 #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED   #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE          #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE    #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL      #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR      #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER      #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,   #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE   #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.            #
#                                                                                 #
###################################################################################
"""
==================
Algorithm executor
==================

A class that can set up and execute algorithm blocks on the backend
"""
import logging

import os
import simplejson
import zmq

from ..algorithm import Algorithm
from ..helpers import create_inputs_from_configuration
from ..helpers import create_outputs_from_configuration
from ..helpers import AccessMode
from .. import stats

from .loop import LoopChannel


# Module-level logger named after this module, so its verbosity can be
# configured through the standard logging hierarchy
logger = logging.getLogger(__name__)

class AlgorithmExecutor(object):
    """Executes the user algorithm of an execution block

    Parameters:

      socket (zmq.Socket): A pre-connected socket to send and receive messages
        from.

      directory (str): The path to a directory containing all the information
        required to run the user experiment. The block configuration is read
        from ``configuration.json`` inside it, and ``prefix`` inside it is used
        as the prefix for loading the algorithm and its dependencies.

      dataformat_cache (:py:class:`dict`, Optional): A dictionary mapping
        dataformat names to loaded dataformats. This parameter is optional and,
        if passed, may greatly speed-up dataformat loading times as dataformats
        that are already loaded may be re-used. If you use this parameter, you
        must guarantee that the cache is refreshed as appropriate in case the
        underlying dataformats change.

      database_cache (:py:class:`dict`, Optional): A dictionary mapping
        database names to loaded databases. This parameter is optional and, if
        passed, may greatly speed-up database loading times as databases that
        are already loaded may be re-used. If you use this parameter, you must
        guarantee that the cache is refreshed as appropriate in case the
        underlying databases change. It is only used when ``db_socket`` is not
        set.

      library_cache (:py:class:`dict`, Optional): A dictionary mapping library
        names to loaded libraries. This parameter is optional and, if passed,
        may greatly speed-up library loading times as libraries that are
        already loaded may be re-used. If you use this parameter, you must
        guarantee that the cache is refreshed as appropriate in case the
        underlying libraries change.

      cache_root (str, Optional): Root directory of the data cache, forwarded
        to the input and output factories. Defaults to ``/cache``.

      db_socket (zmq.Socket, Optional): If set, databases are accessed
        remotely through this socket instead of locally, and the socket is
        notified once processing is done.

      loop_socket (zmq.Socket, Optional): If set, a
        :py:class:`LoopChannel` is created on top of it, it is passed to the
        outputs of non-legacy algorithms, and it is notified once processing
        is done.
    """

    def __init__(
        self,
        socket,
        directory,
        dataformat_cache=None,
        database_cache=None,
        library_cache=None,
        cache_root="/cache",
        db_socket=None,
        loop_socket=None,
    ):

        self.socket = socket
        self.db_socket = db_socket
        self.loop_socket = loop_socket
        self.loop_channel = None

        # The block configuration is stored as UTF-8 encoded JSON
        self.configuration = os.path.join(directory, "configuration.json")
        with open(self.configuration, "rb") as f:
            self.data = simplejson.loads(f.read().decode("utf-8"))

        self.prefix = os.path.join(directory, "prefix")
        self._runner = None  # created lazily by the ``runner`` property

        # Temporary caches, if the user has not set them, for performance
        database_cache = database_cache if database_cache is not None else {}
        dataformat_cache = dataformat_cache if dataformat_cache is not None else {}
        library_cache = library_cache if library_cache is not None else {}

        # Load the algorithm
        self.algorithm = Algorithm(
            self.prefix, self.data["algorithm"], dataformat_cache, library_cache
        )

        # With a database socket, databases are accessed remotely through it
        # and the local database cache is not handed over
        if db_socket:
            db_access_mode = AccessMode.REMOTE
            databases = None
        else:
            db_access_mode = AccessMode.LOCAL
            databases = database_cache

        is_legacy = self.algorithm.type == Algorithm.LEGACY

        # Loads algorithm inputs (identical for every algorithm type)
        (self.input_list, self.data_loaders) = create_inputs_from_configuration(
            self.data,
            self.algorithm,
            self.prefix,
            cache_root,
            cache_access=AccessMode.LOCAL,
            db_access=db_access_mode,
            socket=self.db_socket,
            databases=databases,
        )

        # Loads algorithm outputs; only non-legacy algorithms get their outputs
        # connected to the loop socket
        output_options = {} if is_legacy else {"loop_socket": self.loop_socket}
        (self.output_list, _) = create_outputs_from_configuration(
            self.data,
            self.algorithm,
            self.prefix,
            cache_root,
            input_list=self.input_list,
            data_loaders=self.data_loaders,
            **output_options
        )

        if self.loop_socket:
            self.loop_channel = LoopChannel(self.loop_socket)
            self.loop_channel.setup(self.algorithm, self.prefix)

    @property
    def runner(self):
        """Returns the algorithm runner

        This property allows for lazy loading of the runner: it is created
        from the algorithm on first access and re-used afterwards.
        """

        if self._runner is None:
            self._runner = self.algorithm.runner()
        return self._runner

    def setup(self):
        """Sets up the algorithm to start processing

        Passes the block's ``parameters`` to the runner and returns whatever
        the runner's ``setup`` returns.
        """

        retval = self.runner.setup(self.data["parameters"])
        logger.debug("User algorithm is setup")
        return retval

    def prepare(self):
        """Prepares the algorithm

        Passes the data loaders to the runner and returns whatever the
        runner's ``prepare`` returns.
        """

        retval = self.runner.prepare(self.data_loaders)
        logger.debug("User algorithm is prepared")
        return retval

    def process(self):
        """Executes the user algorithm code using the current interpreter.

        Autonomous algorithms are called once with all the data loaders. The
        other algorithm types are called once per step of the main input
        group. Analysis blocks receive their single output as ``output``;
        other blocks receive the whole output list as ``outputs``.

        Returns:

          bool: ``True`` on success. ``False`` if the user algorithm returned
          a falsy value, or (for sequential non-analysis algorithms) raised
          an exception.

        Raises:

          RuntimeError: if any output is left with missing data.
        """

        if self.algorithm.is_autonomous:
            if self.analysis:
                result = self.runner.process(
                    data_loaders=self.data_loaders, output=self.output_list[0]
                )
            else:
                result = self.runner.process(
                    data_loaders=self.data_loaders,
                    outputs=self.output_list,
                    loop_channel=self.loop_channel,
                )

            if not result:
                return False

        else:
            if self.algorithm.type == Algorithm.LEGACY:
                logger.warning(
                    "%s is using LEGACY I/O API, please upgrade this algorithm as soon as possible",
                    self.algorithm.name,
                )

            while self.input_list.hasMoreData():
                main_group = self.input_list.main_group
                # Access restriction is lifted only around next(); presumably
                # so that user code cannot advance the group itself
                # (NOTE(review): confirm against the inputs implementation)
                main_group.restricted_access = False
                main_group.next()
                main_group.restricted_access = True

                if self.algorithm.type == Algorithm.LEGACY:
                    if self.analysis:
                        result = self.runner.process(
                            inputs=self.input_list, output=self.output_list[0]
                        )
                    else:
                        result = self.runner.process(
                            inputs=self.input_list, outputs=self.output_list
                        )

                elif self.algorithm.is_sequential:
                    if self.analysis:
                        result = self.runner.process(
                            inputs=self.input_list,
                            data_loaders=self.data_loaders,
                            output=self.output_list[0],
                        )
                    else:
                        try:
                            result = self.runner.process(
                                inputs=self.input_list,
                                data_loaders=self.data_loaders,
                                outputs=self.output_list,
                                loop_channel=self.loop_channel,
                            )
                        except Exception:
                            # Do not silently discard the user error: log it
                            # with its traceback, then report the failure
                            # through the regular falsy-result path below
                            logger.exception(
                                "%s raised an exception while processing data",
                                self.algorithm.name,
                            )
                            result = None

                if not result:
                    self.done({})
                    return False

        for output in self.output_list:
            output.close()

        missing_data_outputs = [x for x in self.output_list if x.isDataMissing()]

        if missing_data_outputs:
            raise RuntimeError(
                "Missing data on the following output(s): %s"
                % ", ".join([x.name for x in missing_data_outputs])
            )

        # Send the done command
        statistics = stats.io_statistics(self.data, self.input_list, self.output_list)

        logger.debug("Statistics: %s", simplejson.dumps(statistics, indent=4))

        self.done(statistics)

        return True

    def done(self, statistics):
        """Notifies the infrastructure that the execution is done

        The database socket (if any) and the main socket each receive a
        two-part message: ``don`` followed by the JSON-encoded ``statistics``.
        The loop socket (if any) receives a single ``don`` message. Each
        send is followed by a blocking wait for the acknowledgement.

        Parameters:

          statistics (dict): I/O statistics to report (may be empty).
        """

        if self.db_socket:
            logger.debug("send to db: (don) done")
            self.db_socket.send_string("don", zmq.SNDMORE)
            self.db_socket.send_string(simplejson.dumps(statistics))

            answer = self.db_socket.recv()  # ack
            logger.debug("recv from db: %s", answer)

        if self.loop_socket:
            logger.debug("send to loop: (don) done")
            self.loop_socket.send_string("don")

            answer = self.loop_socket.recv()  # ack
            logger.debug("recv from loop: %s", answer)

        logger.debug("send: (don) done")
        self.socket.send_string("don", zmq.SNDMORE)
        self.socket.send_string(simplejson.dumps(statistics))

        answer = self.socket.recv()  # ack
        logger.debug("recv: %s", answer)

    @property
    def schema_version(self):
        """Returns the schema version of the block configuration (1 if absent)"""
        return self.data.get("schema_version", 1)

    @property
    def analysis(self):
        """A boolean that indicates if the current block is an analysis block

        A block is considered an analysis block when its configuration has a
        ``result`` entry.
        """

        return "result" in self.data