docker.py 26.2 KB
Newer Older
André Anjos's avatar
André Anjos committed
1
2
3
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

Samuel GAIST's avatar
Samuel GAIST committed
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
###################################################################################
#                                                                                 #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/               #
# Contact: beat.support@idiap.ch                                                  #
#                                                                                 #
# Redistribution and use in source and binary forms, with or without              #
# modification, are permitted provided that the following conditions are met:     #
#                                                                                 #
# 1. Redistributions of source code must retain the above copyright notice, this  #
# list of conditions and the following disclaimer.                                #
#                                                                                 #
# 2. Redistributions in binary form must reproduce the above copyright notice,    #
# this list of conditions and the following disclaimer in the documentation       #
# and/or other materials provided with the distribution.                          #
#                                                                                 #
# 3. Neither the name of the copyright holder nor the names of its contributors   #
# may be used to endorse or promote products derived from this software without   #
# specific prior written permission.                                              #
#                                                                                 #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED   #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE          #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE    #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL      #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR      #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER      #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,   #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE   #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.            #
#                                                                                 #
###################################################################################

André Anjos's avatar
André Anjos committed
36

37
38
39
40
"""
======
docker
======
André Anjos's avatar
André Anjos committed
41

42
43
Execution utilities
"""
Samuel GAIST's avatar
Samuel GAIST committed
44
import logging
André Anjos's avatar
André Anjos committed
45
import os
46
import shutil
Samuel GAIST's avatar
Samuel GAIST committed
47

Samuel GAIST's avatar
Samuel GAIST committed
48
49
from collections import namedtuple

50
import requests
51
import simplejson as json
52

53
from beat.backend.python.data import getAllFilenames
Samuel GAIST's avatar
Samuel GAIST committed
54
from beat.backend.python.execution import MessageHandler
André Anjos's avatar
André Anjos committed
55

56
from .. import stats
57
from .. import utils
58
from .remote import RemoteExecutor
59

60
61
logger = logging.getLogger(__name__)

Samuel GAIST's avatar
Samuel GAIST committed
62

63
64
class DockerExecutor(RemoteExecutor):
    """DockerExecutor runs the code given an execution block information, externally
André Anjos's avatar
André Anjos committed
65
66


Philip ABBET's avatar
Philip ABBET committed
67
    Parameters:
André Anjos's avatar
André Anjos committed
68

André Anjos's avatar
André Anjos committed
69
70
71
      host (:py:class:`.dock.Host`): A configured docker host that will
        execute the user process. If the host does not have access to the
        required environment, an exception will be raised.
72

Philip ABBET's avatar
Philip ABBET committed
73
      prefix (str): Establishes the prefix of your installation.
André Anjos's avatar
André Anjos committed
74

Philip ABBET's avatar
Philip ABBET committed
75
76
77
78
      data (dict, str): The piece of data representing the block to be executed.
        It must validate against the schema defined for execution blocks. If a
        string is passed, it is supposed to be a fully qualified absolute path to
        a JSON file containing the block execution information.
André Anjos's avatar
André Anjos committed
79

André Anjos's avatar
André Anjos committed
80
      cache (:py:class:`str`, Optional): If your cache is not located under
Philip ABBET's avatar
Philip ABBET committed
81
82
        ``<prefix>/cache``, then specify a full path here. It will be used
        instead.
André Anjos's avatar
André Anjos committed
83

André Anjos's avatar
André Anjos committed
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
      dataformat_cache (:py:class:`dict`, Optional): A dictionary mapping
        dataformat names to loaded dataformats. This parameter is optional and,
        if passed, may greatly speed-up database loading times as dataformats
        that are already loaded may be re-used. If you use this parameter, you
        must guarantee that the cache is refreshed as appropriate in case the
        underlying dataformats change.

      database_cache (:py:class:`dict`, Optional): A dictionary mapping
        database names to loaded databases. This parameter is optional and, if
        passed, may greatly speed-up database loading times as databases that
        are already loaded may be re-used. If you use this parameter, you must
        guarantee that the cache is refreshed as appropriate in case the
        underlying databases change.

      algorithm_cache (:py:class:`dict`, Optional): A dictionary mapping
        algorithm names to loaded algorithms. This parameter is optional and,
        if passed, may greatly speed-up algorithm loading times as algorithms
        that are already loaded may be re-used. If you use this parameter, you
        must guarantee that the cache is refreshed as appropriate in case the
        underlying algorithms change.

      library_cache (:py:class:`dict`, Optional): A dictionary mapping library
        names to loaded libraries. This parameter is optional and, if passed,
        may greatly speed-up library loading times as libraries that are
        already loaded may be re-used. If you use this parameter, you must
        guarantee that the cache is refreshed as appropriate in case the
        underlying libraries change.
André Anjos's avatar
André Anjos committed
111
112


Philip ABBET's avatar
Philip ABBET committed
113
    Attributes:
André Anjos's avatar
André Anjos committed
114

Philip ABBET's avatar
Philip ABBET committed
115
      cache (str): The path to the cache currently being used
André Anjos's avatar
André Anjos committed
116

Philip ABBET's avatar
Philip ABBET committed
117
118
      errors (list): A list containing errors found while loading this execution
        block.
André Anjos's avatar
André Anjos committed
119

Philip ABBET's avatar
Philip ABBET committed
120
121
      data (dict): The original data for this executor, as loaded by our JSON
        decoder.
André Anjos's avatar
André Anjos committed
122

André Anjos's avatar
André Anjos committed
123
124
      algorithm (.algorithm.Algorithm): An object representing the algorithm to
        be run.
André Anjos's avatar
André Anjos committed
125

Philip ABBET's avatar
Philip ABBET committed
126
      databases (dict): A dictionary in which keys are strings with database
André Anjos's avatar
André Anjos committed
127
        names and values are :py:class:`.database.Database`, representing the
Philip ABBET's avatar
Philip ABBET committed
128
129
        databases required for running this block. The dictionary may be empty
        in case all inputs are taken from the file cache.
André Anjos's avatar
André Anjos committed
130

Philip ABBET's avatar
Philip ABBET committed
131
132
133
134
      views (dict): A dictionary in which the keys are tuples pointing to the
        ``(<database-name>, <protocol>, <set>)`` and the value is a setup view
        for that particular combination of details. The dictionary may be empty
        in case all inputs are taken from the file cache.
André Anjos's avatar
André Anjos committed
135

André Anjos's avatar
André Anjos committed
136
137
      input_list (beat.backend.python.inputs.InputList): A list of inputs that
        will be served to the algorithm.
André Anjos's avatar
André Anjos committed
138

André Anjos's avatar
André Anjos committed
139
140
      output_list (beat.backend.python.outputs.OutputList): A list of outputs
        that the algorithm will produce.
André Anjos's avatar
André Anjos committed
141

Philip ABBET's avatar
Philip ABBET committed
142
143
      data_sources (list): A list with all data-sources created by our execution
        loader.
André Anjos's avatar
André Anjos committed
144

Philip ABBET's avatar
Philip ABBET committed
145
146
      data_sinks (list): A list with all data-sinks created by our execution
        loader. These are useful for clean-up actions in case of problems.
André Anjos's avatar
André Anjos committed
147

Philip ABBET's avatar
Philip ABBET committed
148
    """
André Anjos's avatar
André Anjos committed
149

150
151
152
    # Fixed mount points used *inside* the containers for, respectively, the
    # prefix (configuration) volume and the cache volume
    CONTAINER_PREFIX_PATH = "/beat/prefix"
    CONTAINER_CACHE_PATH = "/beat/cache"

Samuel GAIST's avatar
Samuel GAIST committed
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
    def __init__(
        self,
        host,
        prefix,
        data,
        cache=None,
        dataformat_cache=None,
        database_cache=None,
        algorithm_cache=None,
        library_cache=None,
        custom_root_folders=None,
    ):
        """Initializes the remote executor machinery and keeps the docker host.

        All parameters are described in the class documentation; ``host.ip``
        is forwarded to the base class as the remote address.
        """
        super().__init__(
            prefix,
            data,
            host.ip,
            cache=cache,
            dataformat_cache=dataformat_cache,
            database_cache=database_cache,
            algorithm_cache=algorithm_cache,
            library_cache=library_cache,
            custom_root_folders=custom_root_folders,
        )

        # Docker host that will create and run the user containers
        self.host = host
André Anjos's avatar
André Anjos committed
180

Samuel GAIST's avatar
Samuel GAIST committed
181
182
183
    def __create_db_container(
        self, datasets_uid, network_name, configuration_name=None
    ):
        """Creates and starts the docker container providing the databases.

        Parameters:

          datasets_uid (int): UID under which the container process runs.

          network_name (str): Name of the docker network to attach the
            container to.

          configuration_name (:py:class:`str`, Optional): Extra argument
            appended to the ``databases_provider`` command line (used for the
            loop databases).

        Returns:

          dict: With keys ``configuration_path`` (temporary prefix dumped for
          the container), ``container`` (the container object), ``address``
          and ``port`` (where the databases provider listens).
        """

        # Configuration and needed files
        databases_configuration_path = utils.temporary_directory()
        self.dump_databases_provider_configuration(databases_configuration_path)

        # Modify the paths to the databases in the dumped configuration files
        root_folder = os.path.join(databases_configuration_path, "prefix", "databases")

        # Per-database record: host path of the raw data and requested
        # docker environment name (may be None)
        DatabaseInfo = namedtuple("DatabaseInfo", ["path", "environment"])

        databases_infos = {}

        for (
            db_name,
            db_object,
        ) in self.databases.items():
            json_path = os.path.join(root_folder, db_name + ".json")

            with open(json_path, "r") as f:
                db_data = json.load(f)

            # Rewrite the root folder so that, inside the container, the
            # database points at its /databases/<name> mount
            system_path = db_data["root_folder"]
            container_path = os.path.join("/databases", db_name)
            db_data["root_folder"] = container_path

            with open(json_path, "w") as f:
                json.dump(db_data, f, indent=4)

            databases_infos[db_name] = DatabaseInfo(
                system_path, utils.build_env_name(db_object.environment)
            )

        databases_environment = None

        # Databases that explicitly request a docker environment
        requesting_environments = {
            name: info
            for name, info in databases_infos.items()
            if info.environment is not None
        }

        if requesting_environments:
            # Either all selected databases request an environment, or none do
            if len(requesting_environments) != len(self.databases):
                raise RuntimeError(
                    "Selected databases ({}) are not all providing"
                    " an environment.".format(list(self.databases.keys()))
                )

            requested_environments = {
                info.environment
                for info in requesting_environments.values()
                if info.environment is not None
            }
            if len(requested_environments) > 1:
                raise RuntimeError(
                    "Selected databases ({}) are requesting different environments,"
                    "only one is supported".format(list(requesting_environments.keys()))
                )

            # All databases are requesting the same environment
            db_environment = next(iter(requested_environments))
            try:
                databases_environment = self.host.dbenv2docker(db_environment)
            except Exception:
                raise RuntimeError(
                    "Environment {} not found - available environments are {}".format(
                        db_environment, list(self.host.db_environments.keys())
                    )
                )

        if not databases_environment:
            # Determine the docker image to use for the databases
            database_list = databases_infos.keys()
            try:
                databases_environment = self.host.db2docker(database_list)
            except Exception:
                raise RuntimeError(
                    "No environment found for the databases `%s' "
                    "- available environments are %s"
                    % (
                        ", ".join(database_list),
                        ", ".join(self.host.db_environments.keys()),
                    )
                )

        # Creation of the container
        # Note: we only support one databases image loaded at the same time
        database_port = utils.find_free_port()

        cmd = [
            "databases_provider",
            "0.0.0.0:%d" % database_port,
            self.CONTAINER_PREFIX_PATH,
            self.CONTAINER_CACHE_PATH,
        ]

        if configuration_name:
            cmd.append(configuration_name)

        if logger.getEffectiveLevel() <= logging.DEBUG:
            # insert right after the program name so positional args keep order
            cmd.insert(1, "--debug")

        databases_info_name = "beat_db_%s" % utils.id_generator()
        databases_info = self.host.create_container(databases_environment, cmd)
        databases_info.uid = datasets_uid
        databases_info.network_name = network_name
        databases_info.set_name(databases_info_name)

        # Specify the volumes to mount inside the container
        databases_info.add_volume(
            databases_configuration_path, self.CONTAINER_PREFIX_PATH
        )
        databases_info.add_volume(self.cache, self.CONTAINER_CACHE_PATH)

        # One raw-data volume per database, mounted where the rewritten
        # configuration expects it
        for db_name, db_info in databases_infos.items():
            databases_info.add_volume(db_info.path, os.path.join("/databases", db_name))

        # Start the container; the free port found above may have been taken
        # in the meantime, in which case we pick a new one and retry
        while True:
            try:
                databases_info.add_port(
                    database_port, database_port, host_address=self.host.ip
                )

                self.host.start(databases_info)

                break
            except Exception as e:
                # NOTE(review): any failure other than a port clash is
                # silently swallowed here and the loop exits with the
                # container not started — confirm callers detect this
                if str(e).find("port is already allocated") < 0:
                    break

                databases_info.reset_ports()
                database_port = utils.find_free_port()

                # Rewrite the bind address in the command line to use the
                # newly selected port
                cmd = [
                    x if not x.startswith("0.0.0.0:") else "0.0.0.0:%d" % database_port
                    for x in cmd
                ]

                databases_info.command = cmd

        database_ip = self.host.get_ipaddress(databases_info)
        retval = dict(
            configuration_path=databases_configuration_path,
            container=databases_info,
            address=database_ip,
            port=database_port,
        )
        return retval

331
332
333
    def __setup_io_volumes(
        self, algorithm_container, docker_cache_mount_point, configuration
    ):
334
335
336
337
338
339
340
341
342
343
344
345
346
        """Setup all the volumes for input and output files.

        Parameters:

          algorithm_container: container that will execute an algorithm

          configuration: json object containing the algorithm parameters
        """

        for item in configuration["inputs"].values():
            file_path = item["path"]
            source_path = os.path.join(self.cache, file_path)

347
348
349
350
351
352
353
354
355
356
357
358
359
360
            if docker_cache_mount_point is None:
                if os.path.isfile(source_path):
                    algorithm_container.add_volume(
                        source_path, os.path.join(self.CONTAINER_CACHE_PATH, file_path)
                    )
                else:
                    all_files = getAllFilenames(source_path)
                    for file_list in all_files:
                        for file_ in file_list:
                            target_path = file_[len(self.cache) + 1 :]
                            cache_path = os.path.join(
                                self.CONTAINER_CACHE_PATH, target_path
                            )
                            algorithm_container.add_volume(file_, cache_path)
361
            else:
362
363
364
365
                input_folder = file_path[: file_path.rfind("/")]
                source_folder = os.path.join(docker_cache_mount_point, input_folder)
                target_folder = os.path.join(self.CONTAINER_CACHE_PATH, input_folder)
                algorithm_container.add_volume(source_folder, target_folder)
366
367
368
369
370
371

        def __add_writable_volume(file_path):
            output_folder = file_path[: file_path.rfind("/")]
            source_folder = os.path.join(self.cache, output_folder)
            if not os.path.exists(source_folder):
                os.makedirs(source_folder)
372
373
374
375

            if docker_cache_mount_point is not None:
                source_folder = os.path.join(docker_cache_mount_point, output_folder)

376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
            algorithm_container.add_volume(
                source_folder,
                os.path.join(self.CONTAINER_CACHE_PATH, output_folder),
                read_only=False,
            )

        for item in configuration.get("outputs", {}).values():
            file_path = item["path"]
            __add_writable_volume(file_path)

        result = configuration.get("result")
        if result:
            file_path = result["path"]
            __add_writable_volume(file_path)

391
392
    def __setup_databases_raw_access(self, algorithm_container):
        """Add volumes to the algorithm container if the database allows that"""
393
394
395

        for database_name, database in self.databases.items():
            db_data = database.data
396
397
398
399
            if db_data.get("direct_rawdata_access", False):
                algorithm_container.add_volume(
                    db_data["root_folder"], os.path.join("/databases", database_name)
                )
400

Samuel GAIST's avatar
Samuel GAIST committed
401
402
403
    def process(
        self, virtual_memory_in_megabytes=0, max_cpu_percent=0, timeout_in_minutes=0
    ):
Philip ABBET's avatar
Philip ABBET committed
404
        """Executes the user algorithm code using an external program.
André Anjos's avatar
André Anjos committed
405

Philip ABBET's avatar
Philip ABBET committed
406
407
        The execution interface follows the backend API as described in our
        documentation.
André Anjos's avatar
André Anjos committed
408

André Anjos's avatar
André Anjos committed
409
410
411
412
413
        We use green subprocesses in this implementation. Each co-process is
        linked to us via 2 uni-directional pipes which work as datain and
        dataout end-points. The parent process (i.e. the current one)
        establishes the connection to the child and then can pass/receive
        commands, data and logs.
André Anjos's avatar
André Anjos committed
414

André Anjos's avatar
André Anjos committed
415
416
417
418
419
420
        Usage of the data pipes (datain, dataout) is **synchronous** - you send
        a command and block for an answer. The co-process is normally
        controlled by the current process, except for data requests, which are
        user-code driven.  The nature of our problem does not require an
        *asynchronous* implementation which, in turn, would require a much more
        complex set of dependencies (on asyncio or Twisted for example).
André Anjos's avatar
André Anjos committed
421
422


Philip ABBET's avatar
Philip ABBET committed
423
        Parameters:
André Anjos's avatar
André Anjos committed
424

André Anjos's avatar
André Anjos committed
425
426
427
          virtual_memory_in_megabytes (:py:class:`int`, Optional): The amount
            of virtual memory (in Megabytes) available for the job. If set to
            zero, no limit will be applied.
André Anjos's avatar
André Anjos committed
428

André Anjos's avatar
André Anjos committed
429
430
431
432
433
434
435
436
437
438
          max_cpu_percent (:py:class:`int`, Optional): The maximum amount of
            CPU usage allowed in a system. This number must be an integer
            number between 0 and ``100*number_of_cores`` in your system. For
            instance, if your system has 2 cores, this number can go between 0
            and 200. If it is <= 0, then we don't track CPU usage.

          timeout_in_minutes (:py:class:`int`, Optional): The number of minutes
            to wait for the user process to execute. After this amount of time,
            the user process is killed with ``signal.SIGKILL``. If set to zero,
            no timeout will be applied.
André Anjos's avatar
André Anjos committed
439
440


Philip ABBET's avatar
Philip ABBET committed
441
        Returns:
André Anjos's avatar
André Anjos committed
442

André Anjos's avatar
André Anjos committed
443
444
          dict: A dictionary which is JSON formattable containing the summary
          of this block execution.
André Anjos's avatar
André Anjos committed
445

Philip ABBET's avatar
Philip ABBET committed
446
        """
André Anjos's avatar
André Anjos committed
447

Philip ABBET's avatar
Philip ABBET committed
448
        if not self.valid:
Samuel GAIST's avatar
Samuel GAIST committed
449
450
451
            raise RuntimeError(
                "execution information is bogus:\n  * %s" % "\n  * ".join(self.errors)
            )
André Anjos's avatar
André Anjos committed
452

Philip ABBET's avatar
Philip ABBET committed
453
        # Determine the docker image to use for the processing
454
        processing_environment = utils.build_env_name(self.data["environment"])
Philip ABBET's avatar
Philip ABBET committed
455
        if processing_environment not in self.host:
Samuel GAIST's avatar
Samuel GAIST committed
456
457
458
459
460
461
462
463
464
            raise RuntimeError(
                "Environment `%s' is not available on docker "
                "host `%s' - available environments are %s"
                % (
                    processing_environment,
                    self.host,
                    ", ".join(self.host.processing_environments.keys()),
                )
            )
465

Philip ABBET's avatar
Philip ABBET committed
466
467
        # Creates the message handler
        algorithm_container = None
468

Philip ABBET's avatar
Philip ABBET committed
469
470
        def _kill():
            self.host.kill(algorithm_container)
471

472
        address = self.host.ip
Samuel GAIST's avatar
Samuel GAIST committed
473
        port_range = self.data.pop("port_range", None)
474
        if port_range:
Samuel GAIST's avatar
Samuel GAIST committed
475
            min_port, max_port = port_range.split(":")
476
            port = utils.find_free_port_in_range(int(min_port), int(max_port))
Samuel GAIST's avatar
Samuel GAIST committed
477
            address += ":{}".format(port)
478

479
480
        volume_cache_mount_point = self.data.pop("cache_mount_point", None)

481
        self.message_handler = MessageHandler(address, kill_callback=_kill)
482

Samuel GAIST's avatar
Samuel GAIST committed
483
        # ----- (If necessary) Instantiate the docker container that provide the databases
484

485
        datasets_uid = self.data.pop("datasets_uid", os.geteuid())
Samuel GAIST's avatar
Samuel GAIST committed
486
        network_name = self.data.pop("network_name", "bridge")
487
        databases_infos = {}
488

489
        if len(self.databases) > 0:
Samuel GAIST's avatar
Samuel GAIST committed
490
491
492
            databases_infos["db"] = self.__create_db_container(
                datasets_uid, network_name
            )
493

Samuel GAIST's avatar
Samuel GAIST committed
494
        # ----- Instantiate the algorithm container
495

496
497
498
        # Configuration and needed files
        configuration_path = utils.temporary_directory()
        self.dump_runner_configuration(configuration_path)
499

500
501
502
        loop_algorithm_container = None
        loop_algorithm_container_ip = None
        loop_algorithm_container_port = None
503

504
505
        if self.loop_algorithm is not None:
            if len(self.databases) > 0:
Samuel GAIST's avatar
Samuel GAIST committed
506
507
508
                databases_infos["loop_db"] = self.__create_db_container(
                    datasets_uid, network_name, "loop"
                )
509

510
            loop_algorithm_container_port = utils.find_free_port()
Philip ABBET's avatar
Philip ABBET committed
511
            cmd = [
Samuel GAIST's avatar
Samuel GAIST committed
512
513
                "loop_execute",
                "0.0.0.0:{}".format(loop_algorithm_container_port),
514
515
                self.CONTAINER_PREFIX_PATH,
                self.CONTAINER_CACHE_PATH,
Philip ABBET's avatar
Philip ABBET committed
516
            ]
517

518
            if len(self.databases) > 0:
Samuel GAIST's avatar
Samuel GAIST committed
519
520
521
522
523
524
                cmd.append(
                    "tcp://{}:{}".format(
                        databases_infos["loop_db"]["address"],
                        databases_infos["loop_db"]["port"],
                    )
                )
525

526
            if logger.getEffectiveLevel() <= logging.DEBUG:
Samuel GAIST's avatar
Samuel GAIST committed
527
                cmd.insert(1, "--debug")
528

Samuel GAIST's avatar
Samuel GAIST committed
529
530
531
            loop_algorithm_container = self.host.create_container(
                processing_environment, cmd
            )
532
533
            loop_algorithm_container.uid = datasets_uid
            loop_algorithm_container.network_name = network_name
534

535
            # Volumes
Samuel GAIST's avatar
Samuel GAIST committed
536
            loop_algorithm_container.add_volume(
537
538
                configuration_path, self.CONTAINER_PREFIX_PATH
            )
539
540
541
            self.__setup_io_volumes(
                loop_algorithm_container, volume_cache_mount_point, self.data["loop"]
            )
542

543
            self.__setup_databases_raw_access(loop_algorithm_container)
544

Philip ABBET's avatar
Philip ABBET committed
545
            # Start the container
Samuel GAIST's avatar
Samuel GAIST committed
546
547
548
549
550
551
552
            self.host.start(
                loop_algorithm_container,
                virtual_memory_in_megabytes=virtual_memory_in_megabytes,
                max_cpu_percent=max_cpu_percent,
            )
            loop_algorithm_container_ip = self.host.get_ipaddress(
                loop_algorithm_container
553
            )
554

Philip ABBET's avatar
Philip ABBET committed
555
556
        # Command to execute
        cmd = [
Samuel GAIST's avatar
Samuel GAIST committed
557
            "execute",
558
            "--cache={}".format(self.CONTAINER_CACHE_PATH),
559
            self.message_handler.address,
560
            self.CONTAINER_PREFIX_PATH,
Philip ABBET's avatar
Philip ABBET committed
561
        ]
562

563
        if len(self.databases) > 0:
Samuel GAIST's avatar
Samuel GAIST committed
564
565
566
567
            cmd.append(
                "tcp://%s:%d"
                % (databases_infos["db"]["address"], databases_infos["db"]["port"])
            )
568
569

        if self.loop_algorithm is not None:
Samuel GAIST's avatar
Samuel GAIST committed
570
            cmd.append(
571
                "--loop=tcp://%s:%d"
Samuel GAIST's avatar
Samuel GAIST committed
572
573
                % (loop_algorithm_container_ip, loop_algorithm_container_port)
            )
574

Philip ABBET's avatar
Philip ABBET committed
575
        if logger.getEffectiveLevel() <= logging.DEBUG:
Samuel GAIST's avatar
Samuel GAIST committed
576
            cmd.insert(1, "--debug")
577

Philip ABBET's avatar
Philip ABBET committed
578
579
        # Creation of the container
        algorithm_container = self.host.create_container(processing_environment, cmd)
580
        algorithm_container.uid = datasets_uid
581
        algorithm_container.network_name = network_name
582

Philip ABBET's avatar
Philip ABBET committed
583
        # Volumes
584
        algorithm_container.add_volume(configuration_path, self.CONTAINER_PREFIX_PATH)
585
586
587
        self.__setup_io_volumes(
            algorithm_container, volume_cache_mount_point, self.data
        )
588

589
        self.__setup_databases_raw_access(algorithm_container)
590

Philip ABBET's avatar
Philip ABBET committed
591
        # Start the container
Samuel GAIST's avatar
Samuel GAIST committed
592
593
594
595
        self.host.start(
            algorithm_container,
            virtual_memory_in_megabytes=virtual_memory_in_megabytes,
            max_cpu_percent=max_cpu_percent,
Philip ABBET's avatar
Philip ABBET committed
596
        )
597

Philip ABBET's avatar
Philip ABBET committed
598
599
        # Process the messages until the container is done
        self.message_handler.start()
600

Philip ABBET's avatar
Philip ABBET committed
601
        timed_out = False
602

Philip ABBET's avatar
Philip ABBET committed
603
604
605
        try:
            timeout = (60 * timeout_in_minutes) if timeout_in_minutes else None
            status = self.host.wait(algorithm_container, timeout)
606

Philip ABBET's avatar
Philip ABBET committed
607
        except requests.exceptions.ReadTimeout:
Samuel GAIST's avatar
Samuel GAIST committed
608
            logger.warning(
Samuel GAIST's avatar
Samuel GAIST committed
609
610
                "user process has timed out after %d minutes", timeout_in_minutes
            )
611
612
            timed_out = True

Philip ABBET's avatar
Philip ABBET committed
613
614
            self.host.kill(algorithm_container)
            status = self.host.wait(algorithm_container)
615

Samuel GAIST's avatar
Samuel GAIST committed
616
        except KeyboardInterrupt:  # Developer pushed CTRL-C
Philip ABBET's avatar
Philip ABBET committed
617
618
619
            logger.info("stopping user process on CTRL-C console request")
            self.host.kill(algorithm_container)
            status = self.host.wait(algorithm_container)
620

621
        finally:
622
623
            for name, databases_info in databases_infos.items():
                logger.debug("Stopping database container " + name)
Samuel GAIST's avatar
Samuel GAIST committed
624
                container = databases_info["container"]
625
626
                self.host.kill(container)
                self.host.wait(container)
627

Philip ABBET's avatar
Philip ABBET committed
628
629
            self.message_handler.stop.set()
            self.message_handler.join()
630

Philip ABBET's avatar
Philip ABBET committed
631
        # Collects final information and returns to caller
632
        container_log = self.host.logs(algorithm_container)
633
634

        if status != 0:
Samuel GAIST's avatar
Samuel GAIST committed
635
            stdout = ""
636
            stderr = container_log
637
638
        else:
            stdout = container_log
Samuel GAIST's avatar
Samuel GAIST committed
639
            stderr = ""
640
641
642
643

        if logger.getEffectiveLevel() <= logging.DEBUG:
            logger.debug("Log of the container: " + container_log)

Philip ABBET's avatar
Philip ABBET committed
644
        retval = dict(
Samuel GAIST's avatar
Samuel GAIST committed
645
646
647
648
649
650
651
            status=status,
            stdout=stdout,
            stderr=stderr,
            timed_out=timed_out,
            statistics=self.host.statistics(algorithm_container),
            system_error=self.message_handler.system_error,
            user_error=self.message_handler.user_error,
Philip ABBET's avatar
Philip ABBET committed
652
        )
653

Samuel GAIST's avatar
Samuel GAIST committed
654
655
        retval["statistics"]["data"] = self.message_handler.statistics
        stats.update(retval["statistics"]["data"], self.io_statistics)
656

Philip ABBET's avatar
Philip ABBET committed
657
        self.host.rm(algorithm_container)
André Anjos's avatar
André Anjos committed
658

659
        for name, databases_info in databases_infos.items():
Samuel GAIST's avatar
Samuel GAIST committed
660
            container = databases_info["container"]
661
            db_container_log = self.host.logs(container)
662
663

            if logger.getEffectiveLevel() <= logging.DEBUG:
Samuel GAIST's avatar
Samuel GAIST committed
664
665
666
                logger.debug(
                    "Log of the" + name + "database container: " + db_container_log
                )
667

668
            if status != 0:
Samuel GAIST's avatar
Samuel GAIST committed
669
                retval["stderr"] += "\n" + db_container_log
670
            else:
Samuel GAIST's avatar
Samuel GAIST committed
671
                retval["stdout"] += "\n" + db_container_log
672

673
674
675
676
            self.host.rm(container)

        if loop_algorithm_container:
            self.host.rm(loop_algorithm_container)
André Anjos's avatar
André Anjos committed
677

Philip ABBET's avatar
Philip ABBET committed
678
679
        self.message_handler.destroy()
        self.message_handler = None
André Anjos's avatar
André Anjos committed
680

681
        if not self.debug:
682
            for _, databases_info in databases_infos.items():
Samuel GAIST's avatar
Samuel GAIST committed
683
                shutil.rmtree(databases_info["configuration_path"])
684
685
            shutil.rmtree(configuration_path)

Philip ABBET's avatar
Philip ABBET committed
686
        return retval