#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###################################################################################
#                                                                                 #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/               #
# Contact: beat.support@idiap.ch                                                  #
#                                                                                 #
# Redistribution and use in source and binary forms, with or without              #
# modification, are permitted provided that the following conditions are met:     #
#                                                                                 #
# 1. Redistributions of source code must retain the above copyright notice, this  #
# list of conditions and the following disclaimer.                                #
#                                                                                 #
# 2. Redistributions in binary form must reproduce the above copyright notice,    #
# this list of conditions and the following disclaimer in the documentation       #
# and/or other materials provided with the distribution.                          #
#                                                                                 #
# 3. Neither the name of the copyright holder nor the names of its contributors   #
# may be used to endorse or promote products derived from this software without   #
# specific prior written permission.                                              #
#                                                                                 #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED   #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE          #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE    #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL      #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR      #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER      #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,   #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE   #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.            #
#                                                                                 #
###################################################################################

"""
======
docker
======

Execution utilities for running blocks inside docker containers
"""

import os
46
import shutil
47
import logging
48
import requests
49
import simplejson as json
50

51
from beat.backend.python.execution import MessageHandler
52
from beat.backend.python.data import getAllFilenames
André Anjos's avatar
André Anjos committed
53

54
from .. import stats
55
from .. import utils
André Anjos's avatar
André Anjos committed
56

57
from .remote import RemoteExecutor
58

59

60
61
# Module-level logger. Its effective level also decides whether ``--debug`` is
# added to the commands executed inside the docker containers.
logger = logging.getLogger(__name__)

class DockerExecutor(RemoteExecutor):
    """DockerExecutor runs the code given an execution block information, externally


    Parameters:

      host (:py:class:`.dock.Host`): A configured docker host that will
        execute the user process. If the host does not have access to the
        required environment, an exception will be raised.

      prefix (str): Establishes the prefix of your installation.

      data (dict, str): The piece of data representing the block to be executed.
        It must validate against the schema defined for execution blocks. If a
        string is passed, it is supposed to be a fully qualified absolute path to
        a JSON file containing the block execution information.

      cache (:py:class:`str`, Optional): If your cache is not located under
        ``<prefix>/cache``, then specify a full path here. It will be used
        instead.

      dataformat_cache (:py:class:`dict`, Optional): A dictionary mapping
        dataformat names to loaded dataformats. This parameter is optional and,
        if passed, may greatly speed-up dataformat loading times as dataformats
        that are already loaded may be re-used. If you use this parameter, you
        must guarantee that the cache is refreshed as appropriate in case the
        underlying dataformats change.

      database_cache (:py:class:`dict`, Optional): A dictionary mapping
        database names to loaded databases. This parameter is optional and, if
        passed, may greatly speed-up database loading times as databases that
        are already loaded may be re-used. If you use this parameter, you must
        guarantee that the cache is refreshed as appropriate in case the
        underlying databases change.

      algorithm_cache (:py:class:`dict`, Optional): A dictionary mapping
        algorithm names to loaded algorithms. This parameter is optional and,
        if passed, may greatly speed-up algorithm loading times as algorithms
        that are already loaded may be re-used. If you use this parameter, you
        must guarantee that the cache is refreshed as appropriate in case the
        underlying algorithms change.

      library_cache (:py:class:`dict`, Optional): A dictionary mapping library
        names to loaded libraries. This parameter is optional and, if passed,
        may greatly speed-up library loading times as libraries that are
        already loaded may be re-used. If you use this parameter, you must
        guarantee that the cache is refreshed as appropriate in case the
        underlying libraries change.

      custom_root_folders (Optional): Forwarded unchanged to the
        :py:class:`.remote.RemoteExecutor` constructor (presumably overrides
        the root folders of the databases - TODO confirm there).


    Attributes:

      host (:py:class:`.dock.Host`): The docker host running the containers.

      cache (str): The path to the cache currently being used

      errors (list): A list containing errors found while loading this execution
        block.

      data (dict): The original data for this executor, as loaded by our JSON
        decoder.

      algorithm (.algorithm.Algorithm): An object representing the algorithm to
        be run.

      databases (dict): A dictionary in which keys are strings with database
        names and values are :py:class:`.database.Database`, representing the
        databases required for running this block. The dictionary may be empty
        in case all inputs are taken from the file cache.

      views (dict): A dictionary in which the keys are tuples pointing to the
        ``(<database-name>, <protocol>, <set>)`` and the value is a setup view
        for that particular combination of details. The dictionary may be empty
        in case all inputs are taken from the file cache.

      input_list (beat.backend.python.inputs.InputList): A list of inputs that
        will be served to the algorithm.

      output_list (beat.backend.python.outputs.OutputList): A list of outputs
        that the algorithm will produce.

      data_sources (list): A list with all data-sources created by our execution
        loader.

      data_sinks (list): A list with all data-sinks created by our execution
        loader. These are useful for clean-up actions in case of problems.

    """

    # Locations, inside the containers, where the prefix (dumped configuration)
    # and the cache are mounted.
    CONTAINER_PREFIX_PATH = "/beat/prefix"
    CONTAINER_CACHE_PATH = "/beat/cache"

    def __init__(
        self,
        host,
        prefix,
        data,
        cache=None,
        dataformat_cache=None,
        database_cache=None,
        algorithm_cache=None,
        library_cache=None,
        custom_root_folders=None,
    ):

        super(DockerExecutor, self).__init__(
            prefix,
            data,
            host.ip,
            cache=cache,
            dataformat_cache=dataformat_cache,
            database_cache=database_cache,
            algorithm_cache=algorithm_cache,
            library_cache=library_cache,
            custom_root_folders=custom_root_folders,
        )

        # Initialisations
        self.host = host

    def __create_db_container(
        self, datasets_uid, network_name, configuration_name=None
    ):
        """Create and start the container serving the databases of this block.

        Parameters:

          datasets_uid: Value assigned to the ``uid`` attribute of the created
            container.

          network_name (str): Name of the docker network the container joins.

          configuration_name (str, Optional): If given, appended to the
            ``databases_provider`` command line (``process`` passes ``"loop"``
            for the databases of the loop algorithm).

        Returns:

          dict: With keys ``configuration_path`` (temporary directory holding
          the dumped databases configuration), ``container`` (the started
          container), ``address`` and ``port`` (where the databases provider
          listens).

        Raises:

          RuntimeError: If no docker environment is available for the
            databases.

          Exception: Any error raised by the docker host while starting the
            container, except a port conflict (which triggers a retry on
            another free port).
        """

        # Configuration and needed files
        databases_configuration_path = utils.temporary_directory()
        self.dump_databases_provider_configuration(databases_configuration_path)

        # Modify the paths to the databases in the dumped configuration files:
        # each root folder is remembered (host side, used for the volume
        # mounts below) and replaced by its mount point inside the container.
        root_folder = os.path.join(databases_configuration_path, "prefix", "databases")

        database_paths = {}

        for db_name in self.databases.keys():
            json_path = os.path.join(root_folder, db_name + ".json")

            with open(json_path, "r") as f:
                db_data = json.load(f)

            database_paths[db_name] = db_data["root_folder"]
            db_data["root_folder"] = os.path.join("/databases", db_name)

            with open(json_path, "w") as f:
                json.dump(db_data, f, indent=4)

        # Determine the docker image to use for the databases
        try:
            databases_environment = self.host.db2docker(database_paths.keys())
        except Exception:
            raise RuntimeError(
                "No environment found for the databases `%s' "
                "- available environments are %s"
                % (
                    ", ".join(database_paths.keys()),
                    ", ".join(self.host.db_environments.keys()),
                )
            )

        # Creation of the container
        # Note: we only support one databases image loaded at the same time
        database_port = utils.find_free_port()

        cmd = [
            "databases_provider",
            "0.0.0.0:%d" % database_port,
            self.CONTAINER_PREFIX_PATH,
            self.CONTAINER_CACHE_PATH,
        ]

        if configuration_name:
            cmd.append(configuration_name)

        if logger.getEffectiveLevel() <= logging.DEBUG:
            cmd.insert(1, "--debug")

        databases_info_name = "beat_db_%s" % utils.id_generator()
        databases_info = self.host.create_container(databases_environment, cmd)
        databases_info.uid = datasets_uid
        databases_info.network_name = network_name
        databases_info.set_name(databases_info_name)

        # Specify the volumes to mount inside the container
        databases_info.add_volume(
            databases_configuration_path, self.CONTAINER_PREFIX_PATH
        )
        databases_info.add_volume(self.cache, self.CONTAINER_CACHE_PATH)

        for db_name, db_path in database_paths.items():
            databases_info.add_volume(db_path, os.path.join("/databases", db_name))

        # Start the container. The port picked above may be taken by the time
        # docker binds it: in that case pick another one and retry.
        while True:
            try:
                databases_info.add_port(
                    database_port, database_port, host_address=self.host.ip
                )

                self.host.start(databases_info)

                break
            except Exception as e:
                if "port is already allocated" not in str(e):
                    # Any other failure is fatal: going on would query the
                    # address of a container that never started.
                    raise

                databases_info.reset_ports()
                database_port = utils.find_free_port()

                # The listening address on the command line must follow the
                # newly chosen port
                cmd = [
                    x if not x.startswith("0.0.0.0:") else "0.0.0.0:%d" % database_port
                    for x in cmd
                ]
                databases_info.command = cmd

        database_ip = self.host.get_ipaddress(databases_info)
        retval = dict(
            configuration_path=databases_configuration_path,
            container=databases_info,
            address=database_ip,
            port=database_port,
        )
        return retval

    def __setup_io_volumes(
        self, algorithm_container, docker_cache_mount_point, configuration
    ):
        """Setup all the volumes for input and output files.

        Inputs are mounted read-only; output and result folders are created in
        the cache if needed and mounted writable.

        Parameters:

          algorithm_container: container that will execute an algorithm

          docker_cache_mount_point (str, None): if ``None``, every input file
            is mounted individually from ``self.cache``; otherwise input and
            output folders are mounted from this path instead of
            ``self.cache``.

          configuration: json object containing the algorithm parameters
            (its ``inputs``, and optionally ``outputs`` and ``result``,
            entries are read)
        """

        for item in configuration["inputs"].values():
            file_path = item["path"]
            source_path = os.path.join(self.cache, file_path)

            if docker_cache_mount_point is None:
                if os.path.isfile(source_path):
                    algorithm_container.add_volume(
                        source_path, os.path.join(self.CONTAINER_CACHE_PATH, file_path)
                    )
                else:
                    all_files = getAllFilenames(source_path)
                    for file_list in all_files:
                        for file_ in file_list:
                            # Path of the file relative to the cache root
                            target_path = file_[len(self.cache) + 1 :]
                            cache_path = os.path.join(
                                self.CONTAINER_CACHE_PATH, target_path
                            )
                            algorithm_container.add_volume(file_, cache_path)
            else:
                input_folder = file_path[: file_path.rfind("/")]
                source_folder = os.path.join(docker_cache_mount_point, input_folder)
                target_folder = os.path.join(self.CONTAINER_CACHE_PATH, input_folder)
                algorithm_container.add_volume(source_folder, target_folder)

        def __add_writable_volume(file_path):
            # Mount (writable) the cache folder that will contain file_path,
            # creating it on the host first if necessary
            output_folder = file_path[: file_path.rfind("/")]
            source_folder = os.path.join(self.cache, output_folder)
            if not os.path.exists(source_folder):
                os.makedirs(source_folder)

            if docker_cache_mount_point is not None:
                source_folder = os.path.join(docker_cache_mount_point, output_folder)

            algorithm_container.add_volume(
                source_folder,
                os.path.join(self.CONTAINER_CACHE_PATH, output_folder),
                read_only=False,
            )

        for item in configuration.get("outputs", {}).values():
            file_path = item["path"]
            __add_writable_volume(file_path)

        result = configuration.get("result")
        if result:
            file_path = result["path"]
            __add_writable_volume(file_path)

    def process(
        self, virtual_memory_in_megabytes=0, max_cpu_percent=0, timeout_in_minutes=0
    ):
        """Executes the user algorithm code using an external program.

        The execution interface follows the backend API as described in our
        documentation.

        The user code runs inside a docker container started on ``self.host``.
        If required, additional containers are started beforehand for the
        loop algorithm and to serve the databases. The algorithm container
        communicates with the current process through a
        :py:class:`beat.backend.python.execution.MessageHandler`, whose
        address is passed on the container command line and which relays the
        messages exchanged with the user code.

        Note: the keys ``port_range``, ``cache_mount_point``, ``datasets_uid``
        and ``network_name`` are popped from ``self.data`` by this method.


        Parameters:

          virtual_memory_in_megabytes (:py:class:`int`, Optional): The amount
            of virtual memory (in Megabytes) available for the job. If set to
            zero, no limit will be applied.

          max_cpu_percent (:py:class:`int`, Optional): The maximum amount of
            CPU usage allowed in a system. This number must be an integer
            number between 0 and ``100*number_of_cores`` in your system. For
            instance, if your system has 2 cores, this number can go between 0
            and 200. If it is <= 0, then we don't track CPU usage.

          timeout_in_minutes (:py:class:`int`, Optional): The number of minutes
            to wait for the user process to execute. After this amount of time,
            the user container is killed through the docker host. If set to
            zero, no timeout will be applied.


        Returns:

          dict: A dictionary which is JSON formattable containing the summary
          of this block execution, with the keys ``status``, ``stdout``,
          ``stderr``, ``timed_out``, ``statistics``, ``system_error`` and
          ``user_error``.


        Raises:

          RuntimeError: If the execution information is invalid, or if the
            required processing environment is not available on the host.

        """

        if not self.valid:
            raise RuntimeError(
                "execution information is bogus:\n  * %s" % "\n  * ".join(self.errors)
            )

        # Determine the docker image to use for the processing
        processing_environment = "%(name)s (%(version)s)" % self.data["environment"]
        if processing_environment not in self.host:
            raise RuntimeError(
                "Environment `%s' is not available on docker "
                "host `%s' - available environments are %s"
                % (
                    processing_environment,
                    self.host,
                    ", ".join(self.host.processing_environments.keys()),
                )
            )

        # Creates the message handler
        algorithm_container = None

        def _kill():
            # Reads algorithm_container when called, i.e. after the container
            # has been created below
            self.host.kill(algorithm_container)

        address = self.host.ip
        port_range = self.data.pop("port_range", None)
        if port_range:
            min_port, max_port = port_range.split(":")
            port = utils.find_free_port_in_range(int(min_port), int(max_port))
            address += ":{}".format(port)

        volume_cache_mount_point = self.data.pop("cache_mount_point", None)

        self.message_handler = MessageHandler(address, kill_callback=_kill)

        # ----- (If necessary) Instantiate the docker container that provide the databases

        datasets_uid = self.data.pop("datasets_uid", None)
        network_name = self.data.pop("network_name", "bridge")
        databases_infos = {}

        if len(self.databases) > 0:
            databases_infos["db"] = self.__create_db_container(
                datasets_uid, network_name
            )

        # ----- Instantiate the algorithm container

        # Configuration and needed files
        configuration_path = utils.temporary_directory()
        self.dump_runner_configuration(configuration_path)

        loop_algorithm_container = None
        loop_algorithm_container_ip = None
        loop_algorithm_container_port = None

        if self.loop_algorithm is not None:
            if len(self.databases) > 0:
                databases_infos["loop_db"] = self.__create_db_container(
                    datasets_uid, network_name, "loop"
                )

            loop_algorithm_container_port = utils.find_free_port()
            cmd = [
                "loop_execute",
                "0.0.0.0:{}".format(loop_algorithm_container_port),
                self.CONTAINER_PREFIX_PATH,
                self.CONTAINER_CACHE_PATH,
            ]

            if len(self.databases) > 0:
                cmd.append(
                    "tcp://{}:{}".format(
                        databases_infos["loop_db"]["address"],
                        databases_infos["loop_db"]["port"],
                    )
                )

            if logger.getEffectiveLevel() <= logging.DEBUG:
                cmd.insert(1, "--debug")

            loop_algorithm_container = self.host.create_container(
                processing_environment, cmd
            )
            loop_algorithm_container.uid = datasets_uid
            loop_algorithm_container.network_name = network_name

            # Volumes
            loop_algorithm_container.add_volume(
                configuration_path, self.CONTAINER_PREFIX_PATH
            )
            self.__setup_io_volumes(
                loop_algorithm_container, volume_cache_mount_point, self.data["loop"]
            )

            # Start the container
            self.host.start(
                loop_algorithm_container,
                virtual_memory_in_megabytes=virtual_memory_in_megabytes,
                max_cpu_percent=max_cpu_percent,
            )
            loop_algorithm_container_ip = self.host.get_ipaddress(
                loop_algorithm_container
            )

        # Command to execute
        cmd = [
            "execute",
            "--cache={}".format(self.CONTAINER_CACHE_PATH),
            self.message_handler.address,
            self.CONTAINER_PREFIX_PATH,
        ]

        if len(self.databases) > 0:
            cmd.append(
                "tcp://%s:%d"
                % (databases_infos["db"]["address"], databases_infos["db"]["port"])
            )

        if self.loop_algorithm is not None:
            cmd.append(
                "--loop=tcp://%s:%d"
                % (loop_algorithm_container_ip, loop_algorithm_container_port)
            )

        if logger.getEffectiveLevel() <= logging.DEBUG:
            cmd.insert(1, "--debug")

        # Creation of the container
        algorithm_container = self.host.create_container(processing_environment, cmd)
        algorithm_container.uid = datasets_uid
        algorithm_container.network_name = network_name

        # Volumes
        algorithm_container.add_volume(configuration_path, self.CONTAINER_PREFIX_PATH)
        self.__setup_io_volumes(
            algorithm_container, volume_cache_mount_point, self.data
        )

        # Start the container
        self.host.start(
            algorithm_container,
            virtual_memory_in_megabytes=virtual_memory_in_megabytes,
            max_cpu_percent=max_cpu_percent,
        )

        # Process the messages until the container is done
        self.message_handler.start()

        timed_out = False

        try:
            timeout = (60 * timeout_in_minutes) if timeout_in_minutes else None
            status = self.host.wait(algorithm_container, timeout)

        except requests.exceptions.ReadTimeout:
            logger.warning(
                "user process has timed out after %d minutes", timeout_in_minutes
            )
            timed_out = True

            self.host.kill(algorithm_container)
            status = self.host.wait(algorithm_container)

        except KeyboardInterrupt:  # Developer pushed CTRL-C
            logger.info("stopping user process on CTRL-C console request")
            self.host.kill(algorithm_container)
            status = self.host.wait(algorithm_container)

        finally:
            # The database containers and the message handler are stopped
            # whatever the outcome of the wait
            for name, databases_info in databases_infos.items():
                logger.debug("Stopping database container %s", name)
                container = databases_info["container"]
                self.host.kill(container)
                self.host.wait(container)

            self.message_handler.stop.set()
            self.message_handler.join()

        # Collects final information and returns to caller
        container_log = self.host.logs(algorithm_container)

        if status != 0:
            stdout = ""
            stderr = container_log
        else:
            stdout = container_log
            stderr = ""

        logger.debug("Log of the container: %s", container_log)

        retval = dict(
            status=status,
            stdout=stdout,
            stderr=stderr,
            timed_out=timed_out,
            statistics=self.host.statistics(algorithm_container),
            system_error=self.message_handler.system_error,
            user_error=self.message_handler.user_error,
        )

        retval["statistics"]["data"] = self.message_handler.statistics
        stats.update(retval["statistics"]["data"], self.io_statistics)

        self.host.rm(algorithm_container)

        for name, databases_info in databases_infos.items():
            container = databases_info["container"]
            db_container_log = self.host.logs(container)

            logger.debug(
                "Log of the %s database container: %s", name, db_container_log
            )

            # The database logs go to the same stream as the algorithm's
            if status != 0:
                retval["stderr"] += "\n" + db_container_log
            else:
                retval["stdout"] += "\n" + db_container_log

            self.host.rm(container)

        if loop_algorithm_container:
            self.host.rm(loop_algorithm_container)

        self.message_handler.destroy()
        self.message_handler = None

        # Temporary configuration folders are kept in debug mode for inspection
        if not self.debug:
            for databases_info in databases_infos.values():
                shutil.rmtree(databases_info["configuration_path"])

            shutil.rmtree(configuration_path)

        return retval