docker.py 22.9 KB
Newer Older
André Anjos's avatar
André Anjos committed
1
2
3
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

Samuel GAIST's avatar
Samuel GAIST committed
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
###################################################################################
#                                                                                 #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/               #
# Contact: beat.support@idiap.ch                                                  #
#                                                                                 #
# Redistribution and use in source and binary forms, with or without              #
# modification, are permitted provided that the following conditions are met:     #
#                                                                                 #
# 1. Redistributions of source code must retain the above copyright notice, this  #
# list of conditions and the following disclaimer.                                #
#                                                                                 #
# 2. Redistributions in binary form must reproduce the above copyright notice,    #
# this list of conditions and the following disclaimer in the documentation       #
# and/or other materials provided with the distribution.                          #
#                                                                                 #
# 3. Neither the name of the copyright holder nor the names of its contributors   #
# may be used to endorse or promote products derived from this software without   #
# specific prior written permission.                                              #
#                                                                                 #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED   #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE          #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE    #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL      #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR      #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER      #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,   #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE   #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.            #
#                                                                                 #
###################################################################################

André Anjos's avatar
André Anjos committed
36

37
38
39
40
"""
======
docker
======

Execution utilities: runs execution blocks inside Docker containers (see
:py:class:`DockerExecutor`).
"""
André Anjos's avatar
André Anjos committed
44
45

import os
46
import shutil
47
import logging
48
import requests
49
import simplejson as json
50

51
from beat.backend.python.execution import MessageHandler
52
from beat.backend.python.data import getAllFilenames
André Anjos's avatar
André Anjos committed
53

54
from .. import stats
55
from .. import utils
André Anjos's avatar
André Anjos committed
56

57
from .remote import RemoteExecutor
58

59

60
61
# Module-level logger. Besides regular logging, its effective level decides
# whether ``--debug`` is forwarded to the containers started by this module.
logger = logging.getLogger(name=__name__)

Samuel GAIST's avatar
Samuel GAIST committed
62

63
64
class DockerExecutor(RemoteExecutor):
    """DockerExecutor runs the code of an execution block inside Docker containers.

    The algorithm runs in a container created from the block's processing
    environment. When the block reads from databases, a container running the
    ``databases_provider`` command is started as well; when the block has a
    loop algorithm, a ``loop_execute`` container (and, if databases are used,
    a second databases container) is started too.


    Parameters:

      host (:py:class:`.dock.Host`): A configured docker host that will
        execute the user process. If the host does not have access to the
        required environment, an exception will be raised.

      prefix (str): Establishes the prefix of your installation.

      data (dict, str): The piece of data representing the block to be executed.
        It must validate against the schema defined for execution blocks. If a
        string is passed, it is supposed to be a fully qualified absolute path to
        a JSON file containing the block execution information.

      cache (:py:class:`str`, Optional): If your cache is not located under
        ``<prefix>/cache``, then specify a full path here. It will be used
        instead.

      dataformat_cache (:py:class:`dict`, Optional): A dictionary mapping
        dataformat names to loaded dataformats. This parameter is optional and,
        if passed, may greatly speed-up dataformat loading times as dataformats
        that are already loaded may be re-used. If you use this parameter, you
        must guarantee that the cache is refreshed as appropriate in case the
        underlying dataformats change.

      database_cache (:py:class:`dict`, Optional): A dictionary mapping
        database names to loaded databases. This parameter is optional and, if
        passed, may greatly speed-up database loading times as databases that
        are already loaded may be re-used. If you use this parameter, you must
        guarantee that the cache is refreshed as appropriate in case the
        underlying databases change.

      algorithm_cache (:py:class:`dict`, Optional): A dictionary mapping
        algorithm names to loaded algorithms. This parameter is optional and,
        if passed, may greatly speed-up algorithm loading times as algorithms
        that are already loaded may be re-used. If you use this parameter, you
        must guarantee that the cache is refreshed as appropriate in case the
        underlying algorithms change.

      library_cache (:py:class:`dict`, Optional): A dictionary mapping library
        names to loaded libraries. This parameter is optional and, if passed,
        may greatly speed-up library loading times as libraries that are
        already loaded may be re-used. If you use this parameter, you must
        guarantee that the cache is refreshed as appropriate in case the
        underlying libraries change.

      custom_root_folders (Optional): Forwarded unchanged to the
        :py:class:`.remote.RemoteExecutor` constructor; see its documentation
        for the expected format.


    Attributes:

      host (:py:class:`.dock.Host`): The docker host used to create, start,
        wait for and remove all the containers of this execution.

      cache (str): The path to the cache currently being used

      errors (list): A list containing errors found while loading this execution
        block.

      data (dict): The original data for this executor, as loaded by our JSON
        decoder.

      algorithm (.algorithm.Algorithm): An object representing the algorithm to
        be run.

      databases (dict): A dictionary in which keys are strings with database
        names and values are :py:class:`.database.Database`, representing the
        databases required for running this block. The dictionary may be empty
        in case all inputs are taken from the file cache.

      views (dict): A dictionary in which the keys are tuples pointing to the
        ``(<database-name>, <protocol>, <set>)`` and the value is a setup view
        for that particular combination of details. The dictionary may be empty
        in case all inputs are taken from the file cache.

      input_list (beat.backend.python.inputs.InputList): A list of inputs that
        will be served to the algorithm.

      output_list (beat.backend.python.outputs.OutputList): A list of outputs
        that the algorithm will produce.

      data_sources (list): A list with all data-sources created by our execution
        loader.

      data_sinks (list): A list with all data-sinks created by our execution
        loader. These are useful for clean-up actions in case of problems.

    """

    # Locations, inside the containers, where the dumped configuration
    # ("prefix") and the cache are mounted
    CONTAINER_PREFIX_PATH = "/beat/prefix"
    CONTAINER_CACHE_PATH = "/beat/cache"

    def __init__(
        self,
        host,
        prefix,
        data,
        cache=None,
        dataformat_cache=None,
        database_cache=None,
        algorithm_cache=None,
        library_cache=None,
        custom_root_folders=None,
    ):

        super(DockerExecutor, self).__init__(
            prefix,
            data,
            host.ip,
            cache=cache,
            dataformat_cache=dataformat_cache,
            database_cache=database_cache,
            algorithm_cache=algorithm_cache,
            library_cache=library_cache,
            custom_root_folders=custom_root_folders,
        )

        # Initialisations
        self.host = host

    def __create_db_container(
        self, datasets_uid, network_name, configuration_name=None
    ):
        """Create and start a container serving the databases of this block.

        The databases provider configuration is dumped into a new temporary
        directory, and the ``root_folder`` of every database JSON file in it
        is rewritten to ``/databases/<name>``, where the original folder is
        mounted inside the container.

        Parameters:

          datasets_uid: Value assigned to the container's ``uid`` attribute
            (may be ``None``).

          network_name (str): Value assigned to the container's
            ``network_name`` attribute.

          configuration_name (str, Optional): If set, appended to the
            ``databases_provider`` command line (``"loop"`` is used for the
            databases of the loop algorithm).

        Returns:

          dict: With the keys ``configuration_path`` (temporary directory
          holding the dumped configuration), ``container`` (the started
          container), ``address`` and ``port`` (where the provider listens).

        Raises:

          RuntimeError: If no docker environment provides the databases.

          Exception: Whatever ``host.start`` raises, for any failure other than
            the requested port being already allocated.
        """

        # Configuration and needed files
        databases_configuration_path = utils.temporary_directory()
        self.dump_databases_provider_configuration(databases_configuration_path)

        # Modify the paths to the databases in the dumped configuration files:
        # inside the container, each database lives under /databases/<name>
        root_folder = os.path.join(databases_configuration_path, "prefix", "databases")

        database_paths = {}

        for db_name in self.databases.keys():
            json_path = os.path.join(root_folder, db_name + ".json")

            with open(json_path, "r") as f:
                db_data = json.load(f)

            database_paths[db_name] = db_data["root_folder"]
            db_data["root_folder"] = os.path.join("/databases", db_name)

            with open(json_path, "w") as f:
                json.dump(db_data, f, indent=4)

        # Determine the docker image to use for the databases
        try:
            databases_environment = self.host.db2docker(database_paths.keys())
        except Exception:
            raise RuntimeError(
                "No environment found for the databases `%s' "
                "- available environments are %s"
                % (
                    ", ".join(database_paths.keys()),
                    ", ".join(self.host.db_environments.keys()),
                )
            )

        # Creation of the container
        # Note: we only support one databases image loaded at the same time
        database_port = utils.find_free_port()

        cmd = [
            "databases_provider",
            "0.0.0.0:%d" % database_port,
            self.CONTAINER_PREFIX_PATH,
            self.CONTAINER_CACHE_PATH,
        ]

        if configuration_name:
            cmd.append(configuration_name)

        if logger.getEffectiveLevel() <= logging.DEBUG:
            cmd.insert(1, "--debug")

        databases_info_name = "beat_db_%s" % utils.id_generator()
        databases_info = self.host.create_container(databases_environment, cmd)
        databases_info.uid = datasets_uid
        databases_info.network_name = network_name
        databases_info.set_name(databases_info_name)

        # Specify the volumes to mount inside the container
        databases_info.add_volume(
            databases_configuration_path, self.CONTAINER_PREFIX_PATH
        )
        databases_info.add_volume(self.cache, self.CONTAINER_CACHE_PATH)

        for db_name, db_path in database_paths.items():
            databases_info.add_volume(db_path, os.path.join("/databases", db_name))

        # Start the container. When docker reports the chosen port as already
        # allocated (presumably taken between find_free_port() and the start),
        # retry with a new port, updating the command line accordingly.
        while True:
            try:
                databases_info.add_port(
                    database_port, database_port, host_address=self.host.ip
                )

                self.host.start(databases_info)

                break
            except Exception as e:
                if "port is already allocated" not in str(e):
                    # Not recoverable by retrying: propagate the error instead
                    # of carrying on with a container that never started
                    raise

                databases_info.reset_ports()
                database_port = utils.find_free_port()

                cmd = [
                    x if not x.startswith("0.0.0.0:") else "0.0.0.0:%d" % database_port
                    for x in cmd
                ]
                databases_info.command = cmd

        database_ip = self.host.get_ipaddress(databases_info)
        retval = dict(
            configuration_path=databases_configuration_path,
            container=databases_info,
            address=database_ip,
            port=database_port,
        )
        return retval

    def __setup_io_volumes(self, algorithm_container, configuration):
        """Setup all the volumes for input and output files.

        Input files are mounted at the same cache-relative location inside the
        container. The folders receiving outputs and the result are created in
        the cache if missing and mounted writable.

        Parameters:

          algorithm_container: container that will execute an algorithm

          configuration (dict): json object containing the algorithm
            parameters; it must have an ``inputs`` mapping and may have an
            ``outputs`` mapping and a ``result`` entry, each item carrying a
            ``path`` relative to the cache
        """

        for item in configuration["inputs"].values():
            file_path = item["path"]
            source_path = os.path.join(self.cache, file_path)

            if os.path.isfile(source_path):
                algorithm_container.add_volume(
                    source_path, os.path.join(self.CONTAINER_CACHE_PATH, file_path)
                )
            else:
                # Not a single file: mount every file getAllFilenames() reports
                # for this path (presumably its data/index files)
                for file_list in getAllFilenames(source_path):
                    for file_ in file_list:
                        # relpath() stays correct even if self.cache ends
                        # with a path separator
                        target_path = os.path.relpath(file_, self.cache)
                        cache_path = os.path.join(
                            self.CONTAINER_CACHE_PATH, target_path
                        )
                        algorithm_container.add_volume(file_, cache_path)

        def _add_writable_volume(file_path):
            # Mount the folder containing ``file_path`` read-write, creating
            # it in the cache first if needed. dirname() also behaves for
            # paths without any "/" (rfind-based slicing chopped a character).
            output_folder = os.path.dirname(file_path)
            source_folder = os.path.join(self.cache, output_folder)
            if not os.path.exists(source_folder):
                os.makedirs(source_folder)
            algorithm_container.add_volume(
                source_folder,
                os.path.join(self.CONTAINER_CACHE_PATH, output_folder),
                read_only=False,
            )

        for item in configuration.get("outputs", {}).values():
            _add_writable_volume(item["path"])

        result = configuration.get("result")
        if result:
            _add_writable_volume(result["path"])

    def process(
        self, virtual_memory_in_megabytes=0, max_cpu_percent=0, timeout_in_minutes=0
    ):
        """Executes the user algorithm code inside docker containers.

        The execution interface follows the backend API as described in our
        documentation.

        The database provider container(s) and the loop algorithm container
        are started first when needed. Then the algorithm container is started,
        and its requests are served through a
        :py:class:`beat.backend.python.execution.MessageHandler` until it
        finishes, times out or is interrupted with CTRL-C. All containers are
        removed afterwards. The temporary configuration folders are deleted
        unless ``self.debug`` is set.

        The message handler is bound to the docker host address. If the block
        data contains a ``port_range`` entry (``"<min>:<max>"``), a free port
        within that range is used. The ``port_range``, ``datasets_uid`` and
        ``network_name`` entries are popped from the block data.


        Parameters:

          virtual_memory_in_megabytes (:py:class:`int`, Optional): The amount
            of virtual memory (in Megabytes) available for the job. If set to
            zero, no limit will be applied.

          max_cpu_percent (:py:class:`int`, Optional): The maximum amount of
            CPU usage allowed in a system. This number must be an integer
            number between 0 and ``100*number_of_cores`` in your system. For
            instance, if your system has 2 cores, this number can go between 0
            and 200. If it is <= 0, then we don't track CPU usage.

          timeout_in_minutes (:py:class:`int`, Optional): The number of minutes
            to wait for the user process to execute. After this amount of time,
            the user container is killed through the docker host. If set to
            zero, no timeout will be applied.


        Returns:

          dict: A dictionary which is JSON formattable containing the summary
          of this block execution, with the keys ``status``, ``stdout``,
          ``stderr``, ``timed_out``, ``statistics``, ``system_error`` and
          ``user_error``.


        Raises:

          RuntimeError: If the execution information is invalid, or if the
            processing environment is not available on the docker host.

        """

        if not self.valid:
            raise RuntimeError(
                "execution information is bogus:\n  * %s" % "\n  * ".join(self.errors)
            )

        # Determine the docker image to use for the processing
        processing_environment = "%(name)s (%(version)s)" % self.data["environment"]
        if processing_environment not in self.host:
            raise RuntimeError(
                "Environment `%s' is not available on docker "
                "host `%s' - available environments are %s"
                % (
                    processing_environment,
                    self.host,
                    ", ".join(self.host.processing_environments.keys()),
                )
            )

        # Creates the message handler
        algorithm_container = None

        def _kill():
            # Closure over the local name: kills the container assigned to
            # ``algorithm_container`` at the time the callback is invoked
            self.host.kill(algorithm_container)

        address = self.host.ip
        port_range = self.data.pop("port_range", None)
        if port_range:
            min_port, max_port = port_range.split(":")
            port = utils.find_free_port_in_range(int(min_port), int(max_port))
            address += ":{}".format(port)

        self.message_handler = MessageHandler(address, kill_callback=_kill)

        # ----- (If necessary) Instantiate the docker container that provide the databases

        datasets_uid = self.data.pop("datasets_uid", None)
        network_name = self.data.pop("network_name", "bridge")
        databases_infos = {}

        if self.databases:
            databases_infos["db"] = self.__create_db_container(
                datasets_uid, network_name
            )

        # ----- Instantiate the algorithm container

        # Configuration and needed files
        configuration_path = utils.temporary_directory()
        self.dump_runner_configuration(configuration_path)

        loop_algorithm_container = None
        loop_algorithm_container_ip = None
        loop_algorithm_container_port = None

        if self.loop_algorithm is not None:
            if self.databases:
                databases_infos["loop_db"] = self.__create_db_container(
                    datasets_uid, network_name, "loop"
                )

            loop_algorithm_container_port = utils.find_free_port()
            cmd = [
                "loop_execute",
                "0.0.0.0:{}".format(loop_algorithm_container_port),
                self.CONTAINER_PREFIX_PATH,
                self.CONTAINER_CACHE_PATH,
            ]

            if self.databases:
                cmd.append(
                    "tcp://{}:{}".format(
                        databases_infos["loop_db"]["address"],
                        databases_infos["loop_db"]["port"],
                    )
                )

            if logger.getEffectiveLevel() <= logging.DEBUG:
                cmd.insert(1, "--debug")

            loop_algorithm_container = self.host.create_container(
                processing_environment, cmd
            )
            loop_algorithm_container.uid = datasets_uid
            loop_algorithm_container.network_name = network_name

            # Volumes
            loop_algorithm_container.add_volume(
                configuration_path, self.CONTAINER_PREFIX_PATH
            )
            self.__setup_io_volumes(loop_algorithm_container, self.data["loop"])

            # Start the container
            self.host.start(
                loop_algorithm_container,
                virtual_memory_in_megabytes=virtual_memory_in_megabytes,
                max_cpu_percent=max_cpu_percent,
            )
            loop_algorithm_container_ip = self.host.get_ipaddress(
                loop_algorithm_container
            )

        # Command to execute
        cmd = [
            "execute",
            "--cache={}".format(self.CONTAINER_CACHE_PATH),
            self.message_handler.address,
            self.CONTAINER_PREFIX_PATH,
        ]

        if self.databases:
            cmd.append(
                "tcp://%s:%d"
                % (databases_infos["db"]["address"], databases_infos["db"]["port"])
            )

        if self.loop_algorithm is not None:
            cmd.append(
                "--loop=tcp://%s:%d"
                % (loop_algorithm_container_ip, loop_algorithm_container_port)
            )

        if logger.getEffectiveLevel() <= logging.DEBUG:
            cmd.insert(1, "--debug")

        # Creation of the container
        algorithm_container = self.host.create_container(processing_environment, cmd)
        algorithm_container.uid = datasets_uid
        algorithm_container.network_name = network_name

        # Volumes
        algorithm_container.add_volume(configuration_path, self.CONTAINER_PREFIX_PATH)
        self.__setup_io_volumes(algorithm_container, self.data)

        # Start the container
        self.host.start(
            algorithm_container,
            virtual_memory_in_megabytes=virtual_memory_in_megabytes,
            max_cpu_percent=max_cpu_percent,
        )

        # Process the messages until the container is done
        self.message_handler.start()

        timed_out = False

        try:
            timeout = (60 * timeout_in_minutes) if timeout_in_minutes else None
            status = self.host.wait(algorithm_container, timeout)

        except requests.exceptions.ReadTimeout:
            logger.warning(
                "user process has timed out after %d minutes", timeout_in_minutes
            )
            timed_out = True

            self.host.kill(algorithm_container)
            status = self.host.wait(algorithm_container)

        except KeyboardInterrupt:  # Developer pushed CTRL-C
            logger.info("stopping user process on CTRL-C console request")
            self.host.kill(algorithm_container)
            status = self.host.wait(algorithm_container)

        finally:
            for name, databases_info in databases_infos.items():
                logger.debug("Stopping database container %s", name)
                container = databases_info["container"]
                self.host.kill(container)
                self.host.wait(container)

            self.message_handler.stop.set()
            self.message_handler.join()

        # Collects final information and returns to caller
        container_log = self.host.logs(algorithm_container)

        # The container log is reported as stderr on failure, stdout otherwise
        if status != 0:
            stdout = ""
            stderr = container_log
        else:
            stdout = container_log
            stderr = ""

        if logger.getEffectiveLevel() <= logging.DEBUG:
            logger.debug("Log of the container: %s", container_log)

        retval = dict(
            status=status,
            stdout=stdout,
            stderr=stderr,
            timed_out=timed_out,
            statistics=self.host.statistics(algorithm_container),
            system_error=self.message_handler.system_error,
            user_error=self.message_handler.user_error,
        )

        retval["statistics"]["data"] = self.message_handler.statistics
        stats.update(retval["statistics"]["data"], self.io_statistics)

        self.host.rm(algorithm_container)

        for name, databases_info in databases_infos.items():
            container = databases_info["container"]
            db_container_log = self.host.logs(container)

            if logger.getEffectiveLevel() <= logging.DEBUG:
                logger.debug(
                    "Log of the %s database container: %s", name, db_container_log
                )

            # Database logs are appended to the same stream as the algorithm log
            if status != 0:
                retval["stderr"] += "\n" + db_container_log
            else:
                retval["stdout"] += "\n" + db_container_log

            self.host.rm(container)

        if loop_algorithm_container:
            self.host.rm(loop_algorithm_container)

        self.message_handler.destroy()
        self.message_handler = None

        if not self.debug:
            for databases_info in databases_infos.values():
                shutil.rmtree(databases_info["configuration_path"])

            shutil.rmtree(configuration_path)

        return retval