docker.py 17.4 KB
Newer Older
André Anjos's avatar
André Anjos committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################

28
29
30
31
"""
======
docker
======

Execution utilities
"""
André Anjos's avatar
André Anjos committed
35
36

import os
37
38
import requests
import simplejson
Philip ABBET's avatar
Philip ABBET committed
39
import zmq
40
import docker
41
42

import logging
André Anjos's avatar
André Anjos committed
43
44
logger = logging.getLogger(__name__)

45
from .. import stats
46
47
from .. import message_handler
from .. import utils
André Anjos's avatar
André Anjos committed
48

49
from .remote import RemoteExecutor
50

51

52
53
class DockerExecutor(RemoteExecutor):
    """DockerExecutor runs the user code of an execution block inside docker
    containers


    Parameters:

      host (:py:class:`.dock.Host`): A configured docker host that will
        execute the user process. If the host does not have access to the
        required environment, an exception will be raised.

      prefix (str): Establishes the prefix of your installation.

      data (dict, str): The piece of data representing the block to be executed.
        It must validate against the schema defined for execution blocks. If a
        string is passed, it is supposed to be a fully qualified absolute path to
        a JSON file containing the block execution information.

      cache (:py:class:`str`, Optional): If your cache is not located under
        ``<prefix>/cache``, then specify a full path here. It will be used
        instead.

      dataformat_cache (:py:class:`dict`, Optional): A dictionary mapping
        dataformat names to loaded dataformats. This parameter is optional and,
        if passed, may greatly speed-up database loading times as dataformats
        that are already loaded may be re-used. If you use this parameter, you
        must guarantee that the cache is refreshed as appropriate in case the
        underlying dataformats change.

      database_cache (:py:class:`dict`, Optional): A dictionary mapping
        database names to loaded databases. This parameter is optional and, if
        passed, may greatly speed-up database loading times as databases that
        are already loaded may be re-used. If you use this parameter, you must
        guarantee that the cache is refreshed as appropriate in case the
        underlying databases change.

      algorithm_cache (:py:class:`dict`, Optional): A dictionary mapping
        algorithm names to loaded algorithms. This parameter is optional and,
        if passed, may greatly speed-up database loading times as algorithms
        that are already loaded may be re-used. If you use this parameter, you
        must guarantee that the cache is refreshed as appropriate in case the
        underlying algorithms change.

      library_cache (:py:class:`dict`, Optional): A dictionary mapping library
        names to loaded libraries. This parameter is optional and, if passed,
        may greatly speed-up library loading times as libraries that are
        already loaded may be re-used. If you use this parameter, you must
        guarantee that the cache is refreshed as appropriate in case the
        underlying libraries change.

      custom_root_folders (Optional): Forwarded unchanged to the
        :py:class:`.remote.RemoteExecutor` constructor.


    Attributes:

      host (:py:class:`.dock.Host`): The docker host given at construction
        time, used to create, start, stop and remove the containers.

      cache (str): The path to the cache currently being used

      errors (list): A list containing errors found while loading this execution
        block.

      data (dict): The original data for this executor, as loaded by our JSON
        decoder.

      algorithm (.algorithm.Algorithm): An object representing the algorithm to
        be run.

      databases (dict): A dictionary in which keys are strings with database
        names and values are :py:class:`.database.Database`, representing the
        databases required for running this block. The dictionary may be empty
        in case all inputs are taken from the file cache.

      views (dict): A dictionary in which the keys are tuples pointing to the
        ``(<database-name>, <protocol>, <set>)`` and the value is a setup view
        for that particular combination of details. The dictionary may be empty
        in case all inputs are taken from the file cache.

      input_list (beat.backend.python.inputs.InputList): A list of inputs that
        will be served to the algorithm.

      output_list (beat.backend.python.outputs.OutputList): A list of outputs
        that the algorithm will produce.

      data_sources (list): A list with all data-sources created by our execution
        loader.

      data_sinks (list): A list with all data-sinks created by our execution
        loader. These are useful for clean-up actions in case of problems.

    """

    def __init__(self, host, prefix, data, cache=None, dataformat_cache=None,
            database_cache=None, algorithm_cache=None, library_cache=None,
            custom_root_folders=None):

        super(DockerExecutor, self).__init__(prefix, data, host.ip, cache=cache,
                                             dataformat_cache=dataformat_cache,
                                             database_cache=database_cache,
                                             algorithm_cache=algorithm_cache,
                                             library_cache=library_cache,
                                             custom_root_folders=custom_root_folders)

        # Initialisations
        self.host = host


    def process(self, virtual_memory_in_megabytes=0, max_cpu_percent=0,
                timeout_in_minutes=0):
        """Executes the user algorithm code inside a docker container.

        If the block requires databases, a container running
        ``databases_provider`` is started first, and its ``tcp://`` address is
        appended to the command line of the algorithm container. A
        ``MessageHandler`` bound to the docker host IP (optionally on a port
        chosen from the ``port_range`` entry of the block data) serves the
        algorithm container while it runs; its address is passed on the
        ``execute`` command line.

        Note that the ``port_range``, ``datasets_uid`` and ``network_name``
        entries are removed from ``self.data`` by this call.


        Parameters:

          virtual_memory_in_megabytes (:py:class:`int`, Optional): The amount
            of virtual memory (in Megabytes) available for the job. If set to
            zero, no limit will be applied.

          max_cpu_percent (:py:class:`int`, Optional): The maximum amount of
            CPU usage allowed in a system. This number must be an integer
            number between 0 and ``100*number_of_cores`` in your system. For
            instance, if your system has 2 cores, this number can go between 0
            and 200. If it is <= 0, then we don't track CPU usage.

          timeout_in_minutes (:py:class:`int`, Optional): The number of minutes
            to wait for the user process to execute. After this amount of time,
            the algorithm container is killed through the docker host and
            ``timed_out`` is set in the returned dictionary. If set to zero, no
            timeout will be applied.


        Returns:

          dict: A dictionary which is JSON formattable containing the summary
          of this block execution, with the keys ``status``, ``stdout``,
          ``stderr``, ``timed_out``, ``statistics``, ``system_error`` and
          ``user_error``.


        Raises:

          RuntimeError: If the execution information is invalid, if the
            processing environment is not available on the docker host, or if
            no environment provides the required databases. Errors raised by
            the docker host while starting the containers are propagated.

        """

        if not self.valid:
            raise RuntimeError("execution information is bogus:\n  * %s" % \
                    '\n  * '.join(self.errors))

        # Determine the docker image to use for the processing
        processing_environment = '%(name)s (%(version)s)' % self.data['environment']
        if processing_environment not in self.host:
            raise RuntimeError("Environment `%s' is not available on docker " \
                "host `%s' - available environments are %s" % (processing_environment,
                  self.host, ", ".join(self.host.processing_environments.keys())))

        # Creates the message handler. The kill callback reads
        # ``algorithm_container`` at call time, so it targets the container
        # created further below.
        algorithm_container = None

        def _kill():
            self.host.kill(algorithm_container)

        address = self.host.ip
        port_range = self.data.pop('port_range', None)
        if port_range:
            min_port, max_port = port_range.split(':')
            port = utils.find_free_port_in_range(int(min_port), int(max_port))
            address += ':{}'.format(port)

        self.message_handler = message_handler.MessageHandler(address,
                                                              kill_callback=_kill)

        #----- (If necessary) Instantiate the docker container that provide the databases
        # Popped before any configuration is dumped, as in the original flow
        datasets_uid = self.data.pop('datasets_uid', None)
        network_name = self.data.pop('network_name', 'bridge')

        databases_container = None
        databases_address = None
        if self.databases:
            databases_container, databases_address = \
                self._start_databases_container(datasets_uid, network_name)

        #----- Instantiate the algorithm container
        try:
            # Configuration and needed files
            configuration_path = utils.temporary_directory()
            self.dump_runner_configuration(configuration_path)

            # Command to execute
            cmd = [
                'execute',
                '--cache=/beat/cache',
                self.message_handler.address,
                '/beat/prefix'
            ]

            if databases_address is not None:
                cmd.append(databases_address)

            if logger.isEnabledFor(logging.DEBUG):
                cmd.insert(1, '--debug')

            # Creation of the container
            algorithm_container = self.host.create_container(processing_environment, cmd)
            algorithm_container.uid = datasets_uid
            algorithm_container.network_name = network_name

            # Volumes
            algorithm_container.add_volume(configuration_path, '/beat/prefix')
            algorithm_container.add_volume(self.cache, '/beat/cache', read_only=False)

            # Start the container
            self.host.start(algorithm_container,
                            virtual_memory_in_megabytes=virtual_memory_in_megabytes,
                            max_cpu_percent=max_cpu_percent
            )

        except Exception:
            # The clean-up below is otherwise only reached once the user
            # process runs: do not leave the databases container behind
            if databases_container is not None:
                self.host.kill(databases_container)
                self.host.wait(databases_container)
                self.host.rm(databases_container)
            raise

        # Process the messages until the container is done
        self.message_handler.start()

        timed_out = False

        try:
            timeout = (60 * timeout_in_minutes) if timeout_in_minutes else None
            status = self.host.wait(algorithm_container, timeout)

        except requests.exceptions.ReadTimeout:
            logger.warning("user process has timed out after %d minutes", timeout_in_minutes)
            timed_out = True

            self.host.kill(algorithm_container)
            status = self.host.wait(algorithm_container)

        except KeyboardInterrupt: # Developer pushed CTRL-C
            logger.info("stopping user process on CTRL-C console request")
            self.host.kill(algorithm_container)
            status = self.host.wait(algorithm_container)

        finally:
            if databases_container is not None:
                self.host.kill(databases_container)
                self.host.wait(databases_container)

            self.message_handler.stop.set()
            self.message_handler.join()

        # Collects final information and returns to caller
        container_log = self.host.logs(algorithm_container)

        if status != 0:
            stdout = ''
            stderr = container_log
        else:
            stdout = container_log
            stderr = ''

        logger.debug("Log of the container: %s", container_log)

        retval = dict(
            status = status,
            stdout = stdout,
            stderr = stderr,
            timed_out = timed_out,
            statistics = self.host.statistics(algorithm_container),
            system_error = self.message_handler.system_error,
            user_error = self.message_handler.user_error,
        )

        retval['statistics']['data'] = self.message_handler.statistics
        stats.update(retval['statistics']['data'], self.io_statistics)

        self.host.rm(algorithm_container)

        if databases_container is not None:
            db_container_log = self.host.logs(databases_container)

            logger.debug("Log of the database container: %s", db_container_log)

            if status != 0:
                retval['stderr'] += '\n' + db_container_log
            else:
                retval['stdout'] += '\n' + db_container_log

            self.host.rm(databases_container)

        self.message_handler.destroy()
        self.message_handler = None

        return retval


    def _start_databases_container(self, datasets_uid, network_name):
        """Creates and starts the container running ``databases_provider``.

        The databases configuration is dumped in a temporary directory and the
        ``root_folder`` of every database is rewritten to
        ``/databases/<name>``, where the real folder is mounted.


        Parameters:

          datasets_uid: Value assigned to the container ``uid`` attribute.

          network_name (str): Name of the docker network the container joins.


        Returns:

          tuple: ``(container, address)`` where ``address`` is the
          ``tcp://<ip>:<port>`` end-point of the databases provider.


        Raises:

          RuntimeError: If no environment provides the required databases.
            Errors from the docker host other than a port conflict are
            propagated.

        """

        # Configuration and needed files
        databases_configuration_path = utils.temporary_directory()
        self.dump_databases_provider_configuration(databases_configuration_path)

        # Modify the paths to the databases in the dumped configuration files
        root_folder = os.path.join(databases_configuration_path, 'prefix', 'databases')

        database_paths = {}

        for db_name in self.databases:
            json_path = os.path.join(root_folder, db_name + '.json')

            with open(json_path, 'r') as f:
                db_data = simplejson.load(f)

            database_paths[db_name] = db_data['root_folder']
            db_data['root_folder'] = os.path.join('/databases', db_name)

            with open(json_path, 'w') as f:
                simplejson.dump(db_data, f, indent=4)

        # Determine the docker image to use for the databases
        try:
            databases_environment = self.host.db2docker(database_paths.keys())
        except Exception:
            raise RuntimeError("No environment found for the databases `%s' " \
                "- available environments are %s" % (
                  ", ".join(database_paths.keys()),
                  ", ".join(self.host.db_environments.keys())))

        # Creation of the container
        # Note: we only support one databases image loaded at the same time
        database_port = utils.find_free_port()

        cmd = [
            'databases_provider',
            '0.0.0.0:%d' % database_port,
            '/beat/prefix',
            '/beat/cache'
        ]

        if logger.isEnabledFor(logging.DEBUG):
            cmd.insert(1, '--debug')

        databases_container = self.host.create_container(databases_environment, cmd)
        databases_container.uid = datasets_uid
        databases_container.network_name = network_name

        # Specify the volumes to mount inside the container
        databases_container.add_volume(databases_configuration_path, '/beat/prefix')
        databases_container.add_volume(self.cache, '/beat/cache')

        for db_name, db_path in database_paths.items():
            databases_container.add_volume(db_path, os.path.join('/databases', db_name))

        # Start the container. The free port may be grabbed by someone else
        # before docker binds it: in that case pick a new one and retry. Any
        # other failure is fatal - the algorithm could not reach the databases.
        while True:
            try:
                databases_container.add_port(database_port, database_port,
                                             host_address=self.host.ip)
                self.host.start(databases_container)
                break
            except Exception as e:
                if 'port is already allocated' not in str(e):
                    raise

                databases_container.reset_ports()
                database_port = utils.find_free_port()

                cmd = ['0.0.0.0:%d' % database_port if x.startswith('0.0.0.0:') else x
                       for x in cmd]
                databases_container.command = cmd

        # Fall back to the host IP when the container address on the network
        # cannot be determined (or is empty, as on the ``host`` network)
        database_ip = self._container_ip_on_network(database_port, network_name) \
            or self.host.ip

        return databases_container, 'tcp://%s:%d' % (database_ip, database_port)


    @staticmethod
    def _container_ip_on_network(port, network_name):
        """Returns the IP on ``network_name`` of a running container exposing
        ``port``, or ``None`` if none is found.
        """

        wanted = str(port)
        client = docker.from_env()

        for container in client.containers.list():
            network_settings = container.attrs['NetworkSettings']

            # Port keys have the form ``<port>/<protocol>`` (e.g. ``8080/tcp``):
            # compare the number exactly, a prefix test would match 80 to 8080
            exposed = [p.split('/')[0] for p in (network_settings.get('Ports') or {})]
            if wanted not in exposed:
                continue

            network = (network_settings.get('Networks') or {}).get(network_name)
            if network and network.get('IPAddress'):
                return network['IPAddress']

        return None