#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###################################################################################
#                                                                                 #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/               #
# Contact: beat.support@idiap.ch                                                  #
#                                                                                 #
# Redistribution and use in source and binary forms, with or without              #
# modification, are permitted provided that the following conditions are met:     #
#                                                                                 #
# 1. Redistributions of source code must retain the above copyright notice, this  #
# list of conditions and the following disclaimer.                                #
#                                                                                 #
# 2. Redistributions in binary form must reproduce the above copyright notice,    #
# this list of conditions and the following disclaimer in the documentation       #
# and/or other materials provided with the distribution.                          #
#                                                                                 #
# 3. Neither the name of the copyright holder nor the names of its contributors   #
# may be used to endorse or promote products derived from this software without   #
# specific prior written permission.                                              #
#                                                                                 #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED   #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE          #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE    #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL      #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR      #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER      #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,   #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE   #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.            #
#                                                                                 #
###################################################################################

"""
=====
local
=====

Execution utilities
"""


import os
import sys
import tempfile
import shutil
import zmq
import time
import logging

from beat.backend.python.execution import AlgorithmExecutor
from beat.backend.python.execution import MessageHandler
from beat.backend.python.execution import LoopExecutor
from beat.backend.python.execution import LoopMessageHandler

from .base import BaseExecutor

logger = logging.getLogger(__name__)


class LocalExecutor(BaseExecutor):
    """LocalExecutor runs the code described by an execution block

    The algorithm (and the loop algorithm, when the block has one) is driven
    from the current Python process through
    :py:class:`beat.backend.python.execution.AlgorithmExecutor` and
    :py:class:`beat.backend.python.execution.LoopExecutor`, which talk to
    their message handlers (created for ``127.0.0.1``) over ZeroMQ ``PAIR``
    sockets.


    Parameters:

      prefix (str): Establishes the prefix of your installation.

      data (dict, str): The piece of data representing the block to be executed.
        It must validate against the schema defined for execution blocks. If a
        string is passed, it is supposed to be a fully qualified absolute path to
        a JSON file containing the block execution information.

      cache (:py:class:`str`, Optional): If your cache is not located under
        ``<prefix>/cache``, then specify a full path here. It will be used
        instead.

      dataformat_cache (:py:class:`dict`, Optional): A dictionary mapping
        dataformat names to loaded dataformats. This parameter is optional and,
        if passed, may greatly speed-up database loading times as dataformats
        that are already loaded may be re-used. If you use this parameter, you
        must guarantee that the cache is refreshed as appropriate in case the
        underlying dataformats change.

      database_cache (:py:class:`dict`, Optional): A dictionary mapping
        database names to loaded databases. This parameter is optional and, if
        passed, may greatly speed-up database loading times as databases that
        are already loaded may be re-used. If you use this parameter, you must
        guarantee that the cache is refreshed as appropriate in case the
        underlying databases change.

      algorithm_cache (:py:class:`dict`, Optional): A dictionary mapping
        algorithm names to loaded algorithms. This parameter is optional and,
        if passed, may greatly speed-up database loading times as algorithms
        that are already loaded may be re-used. If you use this parameter, you
        must guarantee that the cache is refreshed as appropriate in case the
        underlying algorithms change.

      library_cache (:py:class:`dict`, Optional): A dictionary mapping library
        names to loaded libraries. This parameter is optional and, if passed,
        may greatly speed-up library loading times as libraries that are
        already loaded may be re-used. If you use this parameter, you must
        guarantee that the cache is refreshed as appropriate in case the
        underlying libraries change.

      custom_root_folders (:py:class:`dict`, Optional): A dictionary where the
        keys are database identifiers (``<db_name>/<version>``) and the values
        are paths to the given database's files. These values will override the
        value found in the database's metadata.


    Attributes:

      cache (str): The path to the cache currently being used

      errors (list): A list containing errors found while loading this execution
        block.

      data (dict): The original data for this executor, as loaded by our JSON
        decoder.

      algorithm (.algorithm.Algorithm): An object representing the
        algorithm to be run.

      databases (dict): A dictionary in which keys are strings with database
        names and values are :py:class:`.database.Database`, representing the
        databases required for running this block. The dictionary may be empty
        in case all inputs are taken from the file cache.

      views (dict): A dictionary in which the keys are tuples pointing to the
        ``(<database-name>, <protocol>, <set>)`` and the value is a setup view
        for that particular combination of details. The dictionary may be empty
        in case all inputs are taken from the file cache.

      input_list (beat.backend.python.inputs.InputList): A list of inputs that
        will be served to the algorithm.

      output_list (beat.backend.python.outputs.OutputList): A list of outputs
        that the algorithm will produce.

      data_sources (list): A list with all data-sources created by our execution
        loader.

      data_sinks (list): A list with all data-sinks created by our execution
        loader. These are useful for clean-up actions in case of problems.

      custom_root_folders (dict): A dictionary where the keys are database
        identifiers (`<db_name>/<version>`) and the values are paths to the
        given database's files. These values will override the value found
        in the database's metadata.

      working_dir (str): Temporary directory created by :py:meth:`process`
        (``None`` outside of a run). It is removed when the run finishes.

    """

    def __init__(
        self,
        prefix,
        data,
        cache=None,
        dataformat_cache=None,
        database_cache=None,
        algorithm_cache=None,
        library_cache=None,
        custom_root_folders=None,
    ):

        super(LocalExecutor, self).__init__(
            prefix,
            data,
            cache=cache,
            dataformat_cache=dataformat_cache,
            database_cache=database_cache,
            algorithm_cache=algorithm_cache,
            library_cache=library_cache,
            custom_root_folders=custom_root_folders,
        )

        # Per-run resources, created by process() and released by __cleanup()
        self.working_dir = None
        self.executor = None
        self.message_handler = None
        self.executor_socket = None

        self.loop_executor = None
        self.loop_message_handler = None
        self.loop_socket = None

        self.zmq_context = None

    def __cleanup(self):
        """Release every per-run resource acquired by :py:meth:`process`.

        Each attribute is reset to ``None`` once its resource is released, so
        calling this method again (or after a partial run) does not touch
        already-released resources.
        """
        if self.loop_executor:
            self.loop_executor.wait()
            self.loop_executor.close()
            self.loop_executor = None

        for handler in [self.message_handler, self.loop_message_handler]:
            if handler:
                handler.kill()
                handler.join()
                handler.destroy()
        self.message_handler = None
        self.loop_message_handler = None

        for socket in [self.executor_socket, self.loop_socket]:
            if socket:
                # Do not block on undelivered messages when closing
                socket.setsockopt(zmq.LINGER, 0)
                socket.close()
        self.executor_socket = None
        self.loop_socket = None

        if self.zmq_context is not None:
            self.zmq_context.destroy()
            self.zmq_context = None

        if self.working_dir is not None:
            shutil.rmtree(self.working_dir)
            self.working_dir = None

    def __run_loop_executor(self, working_prefix):
        """Export the loop algorithm and drive its setup, prepare and process
        steps.

        Parameters:

          working_prefix (str): Prefix (inside the working directory) to which
            the loop algorithm is exported.

        Raises:

          RuntimeError: If the loop algorithm setup or prepare step fails.
        """
        self.loop_algorithm.export(working_prefix)
        self.loop_message_handler = LoopMessageHandler("127.0.0.1")
        self.loop_socket = self.zmq_context.socket(zmq.PAIR)
        self.loop_socket.connect(self.loop_message_handler.address)

        self.loop_executor = LoopExecutor(
            self.loop_message_handler,
            self.working_dir,
            database_cache=self.databases,
            cache_root=self.cache,
        )

        if not self.loop_executor.setup():
            raise RuntimeError(
                "Loop algorithm {} setup failed".format(self.loop_algorithm.name)
            )

        if not self.loop_executor.prepare():
            raise RuntimeError(
                "Loop algorithm {} prepare failed".format(self.loop_algorithm.name)
            )

        # __cleanup() later calls wait() on the loop executor, so this is
        # presumably non-blocking -- TODO confirm against LoopExecutor.
        self.loop_executor.process()

    def __prepare_executor(self):
        """Create the main algorithm executor and run its setup and prepare
        steps.

        Raises:

          RuntimeError: If setup or prepare fails. An exception raised during
            setup is logged and reported as a setup failure.
        """
        self.executor = AlgorithmExecutor(
            self.executor_socket,
            self.working_dir,
            database_cache=self.databases,
            cache_root=self.cache,
            loop_socket=self.loop_socket,
        )

        try:
            status = self.executor.setup()
        except Exception:
            # Keep the user-visible error a RuntimeError, but don't lose the
            # original traceback.
            logger.exception(
                "Setup of algorithm %s raised an exception", self.algorithm.name
            )
            status = 0

        if not status:
            raise RuntimeError("Algorithm {} setup failed".format(self.algorithm.name))

        if not self.executor.prepare():
            raise RuntimeError(
                "Algorithm {} prepare failed".format(self.algorithm.name)
            )

    def process(
        self, virtual_memory_in_megabytes=0, max_cpu_percent=0, timeout_in_minutes=0
    ):
        """Executes the user algorithm code

        All resources acquired during the run (message handlers, ZeroMQ
        context and sockets, temporary working directory) are released before
        this method returns or raises.

        Parameters:

          virtual_memory_in_megabytes (:py:class:`int`, Optional): The amount
            of virtual memory (in Megabytes) available for the job. **Not
            enforced by this executor**: the value is accepted but ignored.

          max_cpu_percent (:py:class:`int`, Optional): The maximum amount of
            CPU usage allowed in a system (between 0 and
            ``100*number_of_cores``). **Not enforced by this executor**: the
            value is accepted but ignored.

          timeout_in_minutes (int): The number of minutes to wait for the user
            process to execute. **Not enforced by this executor**: the value
            is accepted but ignored, and ``timed_out`` is always ``False``.

        Returns:

          dict: A dictionary which is JSON formattable containing the summary of
            this block execution. ``status`` is ``0`` on success and ``1`` when
            the algorithm's process step raised an exception, in which case
            ``user_error`` holds the formatted traceback. CPU and memory
            statistics are placeholders (zeros); ``stdout``/``stderr`` are
            empty.

        Raises:

          RuntimeError: If the execution information is invalid, or if a
            setup, prepare or process step reports failure.

        """

        def _create_result(status, error_message=""):
            return {
                "status": status,
                "statistics": {
                    "data": self.io_statistics,
                    "cpu": {
                        "user": 0.0,
                        "system": 0.0,
                        "total": 0.0,
                        "percent": 0.0,
                        "processors": 1,
                    },
                    "memory": {"rss": 0, "limit": 0, "percent": 0.0},
                },
                "stderr": "",
                "stdout": "",
                "system_error": "",
                "user_error": error_message,
                "timed_out": False,
            }

        def _process_exception(exception, prefix, contribution_kind):
            """Format the exception currently being handled.

            The traceback starts at the first frame whose file lives under
            ``<prefix>/<contribution_kind>/`` (that path prefix is stripped
            from the output). If no frame matches, the whole traceback is
            kept. Must be called from within an ``except`` clause.
            """
            import traceback

            _, _, exc_traceback = sys.exc_info()
            tb = traceback.extract_tb(exc_traceback)

            contributions_prefix = os.path.join(prefix, contribution_kind) + os.sep

            first_line = 0
            for index, frame in enumerate(tb):
                if frame[0].startswith(contributions_prefix):
                    first_line = index
                    break

            s = "".join(traceback.format_list(tb[first_line:]))
            s = s.replace(contributions_prefix, "").strip()

            return "%s\n%s: %s" % (s, type(exception).__name__, exception)

        if not self.valid:
            raise RuntimeError(
                "execution information is bogus:\n  * %s" % "\n  * ".join(self.errors)
            )

        error_message = None

        # Every resource acquired inside this block is released by
        # __cleanup(), whether the run succeeds, reports a failure or raises.
        try:
            self.message_handler = MessageHandler("127.0.0.1")
            self.message_handler.start()

            self.zmq_context = zmq.Context()
            self.executor_socket = self.zmq_context.socket(zmq.PAIR)
            self.executor_socket.connect(self.message_handler.address)

            self.working_dir = tempfile.mkdtemp(prefix=__name__)
            working_prefix = os.path.join(self.working_dir, "prefix")

            self.dump_runner_configuration(self.working_dir)
            self.algorithm.export(working_prefix)

            if self.loop_algorithm:
                self.__run_loop_executor(working_prefix)

            self.__prepare_executor()

            _start = time.time()

            try:
                processed = self.executor.process()
            except Exception as e:
                # Errors raised while processing are reported in the result
                # instead of being propagated to the caller.
                error_message = _process_exception(e, self.prefix, "databases")
            else:
                if not processed:
                    raise RuntimeError(
                        "Algorithm {} process failed".format(self.algorithm.name)
                    )

                proc_time = time.time() - _start

                # some local information
                logger.debug("Total processing time was %.3f seconds", proc_time)
        finally:
            self.__cleanup()

        # Built after cleanup, as in the original flow
        if error_message is not None:
            return _create_result(1, error_message)

        return _create_result(0)