dock.py 20.4 KB
Newer Older
André Anjos's avatar
André Anjos committed
1
2
3
4
5
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
6
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/           #
André Anjos's avatar
André Anjos committed
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


import os
30
31
import six
import simplejson
32
import socket
33
import tarfile
34
35
36
37
import tempfile
import time
import docker
import subprocess as sp
38
39
40

import logging
logger = logging.getLogger(__name__)
André Anjos's avatar
André Anjos committed
41
42
43
44

from . import stats


45
class Host(object):
Philip ABBET's avatar
Philip ABBET committed
46
    '''An object of this class can connect to the docker host and resolve stuff'''
47

Philip ABBET's avatar
Philip ABBET committed
48
    images_cache = {}
49

André Anjos's avatar
André Anjos committed
50

51
    def __init__(self, images_cache=None, raise_on_errors=True):
André Anjos's avatar
André Anjos committed
52

Philip ABBET's avatar
Philip ABBET committed
53
54
55
56
57
58
        # Initialisations
        self.images_cache_filename = images_cache
        self.base_url = None
        self.containers = []
        self.processing_environments = {}
        self.db_environments = {}
59

Philip ABBET's avatar
Philip ABBET committed
60
61
62
63
        # (If necessary) Load the known infos about the images
        if (self.images_cache_filename is not None) and os.path.exists(self.images_cache_filename):
            with open(self.images_cache_filename, 'r') as f:
                Host.images_cache = simplejson.load(f)
64

Philip ABBET's avatar
Philip ABBET committed
65
66
        # Discover the environments
        (self.processing_environments, self.db_environments) = self._discover_environments(raise_on_errors)
André Anjos's avatar
André Anjos committed
67

Philip ABBET's avatar
Philip ABBET committed
68
69
70
71
        # (If necessary) Save the known infos about the images
        if self.images_cache_filename is not None:
            with open(self.images_cache_filename, 'w') as f:
                simplejson.dump(Host.images_cache, f, indent=4)
72

André Anjos's avatar
André Anjos committed
73

Philip ABBET's avatar
Philip ABBET committed
74
75
    def __contains__(self, key):
        return (key in self.processing_environments) or (key in self.db_environments)
André Anjos's avatar
André Anjos committed
76
77


Philip ABBET's avatar
Philip ABBET committed
78
79
    def __str__(self):
        s = 'Docker host'
80

Philip ABBET's avatar
Philip ABBET committed
81
82
        if self.base_url is not None:
            s += ' (%s)' % self.base_url
83

Philip ABBET's avatar
Philip ABBET committed
84
        return s
André Anjos's avatar
André Anjos committed
85
86


Philip ABBET's avatar
Philip ABBET committed
87
88
    def env2docker(self, key):
        '''Returns a nice docker image name given a BEAT environment key'''
André Anjos's avatar
André Anjos committed
89

Philip ABBET's avatar
Philip ABBET committed
90
        attrs = self.processing_environments[key]
91
        return attrs['image']
92

93

Philip ABBET's avatar
Philip ABBET committed
94
95
    def db2docker(self, db_names):
        '''Returns a nice docker image name given a database name'''
96

Philip ABBET's avatar
Philip ABBET committed
97
98
        def _all_in(db_names, databases):
            return len([ x for x in db_names if x in databases ]) == len(db_names)
99

Philip ABBET's avatar
Philip ABBET committed
100
        attrs = [ x for x in self.db_environments.values() if _all_in(db_names, x['databases']) ][0]
101

102
        return attrs['image']
103
104


Philip ABBET's avatar
Philip ABBET committed
105
106
107
    def teardown(self):
        for container in self.containers:
            self.rm(container)
108

Philip ABBET's avatar
Philip ABBET committed
109
        self.containers = []
André Anjos's avatar
André Anjos committed
110
111


Philip ABBET's avatar
Philip ABBET committed
112
113
    def __exit__(self, *exc):
        self.teardown()
André Anjos's avatar
André Anjos committed
114
115


Philip ABBET's avatar
Philip ABBET committed
116
117
    def full_environment_name(self, name):
        try:
118
            return list(filter(lambda x: x.startswith(name  + ' ('), self.processing_environments.keys()))[0]
Philip ABBET's avatar
Philip ABBET committed
119
120
        except:
            try:
121
                return list(filter(lambda x: x.startswith(name  + ' ('), self.db_environments.keys()))[0]
Philip ABBET's avatar
Philip ABBET committed
122
123
            except:
                return None
124

Philip ABBET's avatar
Philip ABBET committed
125
126
127
128
129
130
    @property
    def ip(self):
        '''The IP address of the docker host

        This is the local address of a UDP socket connected to ``8.8.8.8``.

        '''

        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        try:
            s.connect(('8.8.8.8', 1))  # connecting to a UDP address doesn't send packets
            return s.getsockname()[0]
        finally:
            # Release the descriptor instead of waiting for garbage collection
            s.close()
131
132


Philip ABBET's avatar
Philip ABBET committed
133
134
    def _discover_environments(self, raise_on_errors=True):
        '''Returns information about the docker environments found on the host

        Images are listed with ``docker images``; only the names containing
        ``beat`` are considered, and each of them is asked to describe itself
        (``docker run --rm=true <image> describe``). Descriptions are stored in
        ``Host.images_cache``, so an image that is already known is not run
        again.

        Parameters:

          raise_on_errors (bool, Optional): If we should raise an exception
            (``RuntimeError``) in case installed environments override each other
            and we can't know which to use.


        Returns:

          A tuple ``(environments, db_environments)`` of dictionaries keyed by
          ``<name> (<version>)``. Descriptions having a ``databases`` entry go
          to the second dictionary, all the others to the first one. Each
          registered description gets an extra ``image`` entry holding the
          docker image name. Both dictionaries are empty if the list of images
          cannot be retrieved.


        Raises:

          RuntimeError: if you set ``raise_on_errors`` and I found environments
            that override each other for their description keys (``<name>
            (<version>)``), which should be unique.

        '''

        def _describe(image):
            '''Tries to run the "describe" app on the image, collect results'''

            if image in Host.images_cache:
                return Host.images_cache[image]

            cmd = [
                'docker',
                'run',
                '--rm=true',
                image,
                'describe'
            ]

            (status, stdout, stderr) = self._exec(cmd)

            if status == 0:
                try:
                    infos = simplejson.loads(stdout)
                    Host.images_cache[image] = infos
                    return infos
                except Exception as e:
                    logger.warning("Ignoring potential environment at `%s' since " \
                            "`describe' output cannot be parsed: %s", image, str(e))
            else:
                logger.warning("Execution failed with status {}: \n"
                               "stdout: '{}'\n"
                               "stderr: '{}'".format(status, stdout, stderr))

            # An empty description tells the caller to skip this image
            return {}


        def _tag(image_name):
            '''Returns the tag of an image name, ``None`` if none is found'''

            # The registry/namespace part is dropped first, so that a ':' in it
            # is not mistaken for the tag separator.
            # NOTE(review): names without any '/' never yield a tag here --
            # confirm this is intended.
            parts = image_name.split('/')
            if len(parts) > 1:
                parts = parts[-1].split(':')
                if len(parts) > 1:
                    return parts[-1]

            return None


        def _numbers(version):
            '''Converts a dotted version to a tuple of ints, ``None`` on failure'''

            try:
                return tuple([ int(x) for x in version.split('.') ])
            except ValueError:
                return None


        def _must_replace(image, environments, key):
            '''Tells if ``image`` must take over the entry ``environments[key]``'''

            # this check avoids we do a new environment and, by mistake, override
            # it with a previous version or the contrary.
            if raise_on_errors:
                raise RuntimeError("Environments at '%s' and '%s' have the " \
                        "same name ('%s'). Distinct environments must be " \
                        "uniquely named. Fix this and re-start." % \
                        (image, environments[key]['image'], key))

            new_version = _tag(image)
            previous_version = _tag(environments[key]['image'])

            replacement = False
            keep = False

            if (new_version is not None) and (previous_version is not None):
                if new_version == 'latest':
                    replacement = True
                elif previous_version == 'latest':
                    keep = True
                else:
                    new_numbers = _numbers(new_version)

                    if new_numbers is None:
                        # The new tag is not numeric: trust the existing entry
                        keep = True
                    else:
                        previous_numbers = _numbers(previous_version)

                        # A non-numeric existing tag loses against a numeric one
                        if (previous_numbers is None) or (new_numbers > previous_numbers):
                            replacement = True
                        else:
                            keep = True

            elif new_version is not None:
                replacement = True
            elif previous_version is not None:
                keep = True

            if replacement:
                logger.debug("Overriding **existing** environment '%s' in image '%s'", key, environments[key]['image'])
            elif keep:
                logger.debug("Environment '%s' already existing in image '%s', we'll keep it", key, environments[key]['image'])
                return False
            else:
                # Neither image carries a usable tag: override, but say so
                logger.warning("Overriding **existing** environment '%s' image " \
                        "with '%s'. To avoid this warning make " \
                        "sure your docker images do not contain environments " \
                        "with the same names", key, environments[key]['image'])

            return True


        environments = {}
        db_environments = {}

        cmd = [
            'docker',
            'images',
            '--format',
            '{{.Repository}}:{{.Tag}}',
        ]

        (status, stdout, stderr) = self._exec(cmd)

        if status != 0:
            logger.error("Failed to retrieve the list of docker images, reason:\n\n%s", stderr)
            return (environments, db_environments)

        images = [ x for x in stdout.split('\n') if 'beat' in x ]

        for image in images:
            # Call the "describe" application on each existing image
            description = _describe(image)

            if not description:
                continue

            key = description['name'] + ' (' + description['version'] + ')'

            # Database environments are told apart by their 'databases' entry
            if 'databases' in description:
                registry = db_environments
            else:
                registry = environments

            if (key in registry) and not _must_replace(image, registry, key):
                continue

            registry[key] = description
            registry[key]['image'] = image

            logger.info("Registered '%s' -> '%s'", key, image)


        logger.debug("Found %d environments and %d database environments", len(environments),
                     len(db_environments))

        return (environments, db_environments)
298
299


Philip ABBET's avatar
Philip ABBET committed
300
    def create_container(self, image, command):
        '''Creates the description of a container for an image or environment

        Parameters:

          image (str): A docker image name or the key of a known environment.
            In the latter case it is replaced by the result of
            ``env2docker()``.

          command: The command to run, handed to the container as is.

        Returns:

          A new :py:class:`Container`. Nothing is started here.

        '''

        # Known environment keys are translated, anything else is used verbatim
        image_name = self.env2docker(image) if image in self else image

        return Container(image_name, command)
306

307

Philip ABBET's avatar
Philip ABBET committed
308
309
    def start(self, container, virtual_memory_in_megabytes=0, max_cpu_percent=0):
        """Starts the execution of a container

        The container is launched with ``docker run -tid`` (``nvidia-docker``
        for images whose cached description lists the ``gpu`` capability).
        When the command prints something on its standard output, that output
        becomes ``container.id`` and the container is registered in
        ``self.containers``.


        Parameters:

          container (:py:class:`Container`): The container.

          virtual_memory_in_megabytes (int, Optional): The maximum amount of memory
            the user process can consume on the host. If not specified, a memory
            limit is not set.

          max_cpu_percent (float, Optional): The maximum amount of CPU the user
            process may consume on the host. The value ``100`` equals to using 100%
            of a single core. If not specified, then a CPU limitation is not put in
            place.


        Raises:

          RuntimeError: if the docker command returns a non-zero status. A
            container that got an identifier is removed before raising.

        """

        docker_cmd = ['docker', 'run', '-tid']

        # Images known to need a GPU are started through nvidia-docker
        known_images = Host.images_cache
        if container.image in known_images:
            description = known_images[container.image]
            if ('capabilities' in description) and ('gpu' in description['capabilities']):
                docker_cmd[0] = 'nvidia-docker'

        if virtual_memory_in_megabytes:
            # For this to work properly, memory swap limitation has to be enabled on
            # the kernel. This typically goes by setting "cgroup_enable=memory" as a
            # boot parameter to kernels which are compiled with this support.
            # More info: https://docs.docker.com/engine/installation/linux/ubuntulinux/#/enable-memory-and-swap-accounting
            logger.debug('Setting maximum memory to %dMB' % virtual_memory_in_megabytes)
            docker_cmd += [
                '--memory=%dm' % virtual_memory_in_megabytes,
                '--memory-swap=%dm' % virtual_memory_in_megabytes,
            ]

        if max_cpu_percent:
            # The period corresponds to the scheduling interval for the CFS in Linux
            # The quota corresponds to a fraction or a multiple of the period, the
            # container will get. A quota that is 2x the period gets the container up
            # to 200% cpu time (2 cores). If the quota is 0.5x the period, the
            # container gets up to 50% the cpu time. Each core represents 100%. A
            # system with 2 cores has 200% computing power.
            #
            # More info:
            # https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt
            #
            # For this to work properly, CPU bandwidth provisioning for the Linux CFS
            # must be enabled on the kernel. More info on how to do it: http://www.blaess.fr/christophe/2012/01/07/linux-3-2-cfs-cpu-bandwidth-english-version/
            #
            # If your system is running on a virtual machine, having more cores
            # available to docker engine normally translates to more precise
            # scheduling.
            period = 100000 #microseconds
            quota = max_cpu_percent / 100.0

            logger.debug('Setting CPU quota to %d%%' % max_cpu_percent)
            docker_cmd += [
                '--cpu-period=%d' % period,
                '--cpu-quota=%d' % int(quota * period),
            ]

        # Mount the volumes
        for host_path, mount in container.volumes.items():
            docker_cmd.append('--volume=%s:%s:%s' % (host_path, mount['bind'], mount['mode']))

        # Expose the ports: a tuple also carries the host address to bind to
        for container_port, binding in container.ports.items():
            docker_cmd.append('-p')
            if isinstance(binding, tuple):
                published = '%s:%d:%d' % (binding[0], binding[1], container_port)
            else:
                published = '%d:%d' % (binding[0], container_port)
            docker_cmd.append(published)

        docker_cmd.append(container.image)
        docker_cmd.extend(container.command)

        # Instantiate the container
        logger.debug("Creation and start of a container: image=%s, command=\"%s\"",
                     container.image, ' '.join(container.command))

        logger.debug("Docker command: %s", ' '.join(docker_cmd))

        (status, stdout, stderr) = self._exec(docker_cmd)

        # Whatever was printed is taken as the identifier of the container
        if stdout != '':
            container.id = stdout.replace('\n', '')
            self.containers.append(container)

        if status == 0:
            logger.debug("Container ID: '%s'", container.id)
            return

        message = "Failed to create the container, reason:\n\n%s" % stderr
        logger.error(message)

        if container.id is not None:
            self.rm(container)

        raise RuntimeError(message)
416
417


Philip ABBET's avatar
Philip ABBET committed
418
419
    def wait(self, container, timeout=None):
        '''Wait for the container to finish its job
420

Philip ABBET's avatar
Philip ABBET committed
421
        Parameters:
André Anjos's avatar
André Anjos committed
422

Philip ABBET's avatar
Philip ABBET committed
423
424
425
          timeout (float, Optional): A timeout in seconds to wait for the user
            process to finish. If a timeout value is not given, waits forever.
        '''
426
427
428
429
430
431
        (status, stdout, stderr) = self._exec(['docker', 'wait', container.id],
                                              timeout=timeout)
        if status != 0:
            return None

        return int(stdout)
André Anjos's avatar
André Anjos committed
432
433


Philip ABBET's avatar
Philip ABBET committed
434
435
    def status(self, container):
        '''Checks the status of a given container

        Parameters:

          container (:py:class:`Container`): The container to inspect.

        Returns:

          The ``State`` / ``Status`` entry of the first element printed by
          ``docker inspect``, or ``None`` if the command fails or its output
          does not have the expected layout.

        '''

        logger.debug("Inspect container %s", container.id)

        (status, stdout, stderr) = self._exec(['docker', 'inspect', container.id])
        if status != 0:
            return None

        try:
            return simplejson.loads(stdout)[0]['State']['Status']
        except (ValueError, LookupError, TypeError):
            # Invalid JSON, or JSON without the expected entries. Anything else
            # (e.g. KeyboardInterrupt) is deliberately not swallowed.
            return None
André Anjos's avatar
André Anjos committed
447
448


Philip ABBET's avatar
Philip ABBET committed
449
450
    def logs(self, container):
        '''Returns the logs of a container'''
451
452
453
454
455
        (status, stdout, stderr) = self._exec(['docker', 'logs', container.id])
        if status != 0:
            return ''

        return stdout.replace('\r\n', '\n')
André Anjos's avatar
André Anjos committed
456
457


Philip ABBET's avatar
Philip ABBET committed
458
    def statistics(self, container):
        '''Returns the statistics about a container

        Parameters:

          container (:py:class:`Container`): The container to query. The raw
            data retrieved here is kept in ``container._stats``; its
            ``cpu_stats`` are handed to ``stats.cpu_statistics()`` as the
            previous CPU reading on the next call.

        Returns:

          A dictionary with two entries, ``cpu`` and ``memory``, respectively
          computed by ``stats.cpu_statistics()`` and
          ``stats.memory_statistics()``.

        '''

        client = docker.APIClient()

        try:
            data = client.stats(container.id, decode=True, stream=False)
        finally:
            # A client is created on every call: release its connections
            client.close()

        # If CPU statistics can't be retrieved
        if 'system_cpu_usage' not in data['cpu_stats']:
            data['cpu_stats'] = dict(data['precpu_stats'])

        # If memory statistics can't be retrieved
        if len(data['memory_stats']) == 0:
            data['memory_stats'] = dict(
                limit=0,
                max_usage=0
            )

        previous_cpu = container._stats['cpu_stats'] if container._stats else None

        # merge statistics
        retval = dict(
            cpu=stats.cpu_statistics(previous_cpu, data['cpu_stats']),
            memory=stats.memory_statistics(data['memory_stats']),
        )

        container._stats = data

        return retval


    def rm(self, container):
        '''Removes a given container. If it is not done, kill it first'''

492
        if container.id is None:
Philip ABBET's avatar
Philip ABBET committed
493
494
495
496
497
            return

        status = self.status(container)

        if status not in ('created', 'exited'):
498
499
500
501
502
            logger.warn("Killing container '%s' which is on state '%s'", container.id, status)
            self._exec(['docker', 'container', 'stop', container.id])

        logger.debug("Remove container %s", container.id)
        (status, stdout, stderr) = self._exec(['docker', 'rm', container.id])
Philip ABBET's avatar
Philip ABBET committed
503
504
505

        self.containers.remove(container)

506
        container.id = None
Philip ABBET's avatar
Philip ABBET committed
507
508
509
510
511
512


    def kill(self, container):
        '''Stop a container'''

        if self.status(container) == 'running':
513
            self._exec(['docker', 'kill', container.id])
Philip ABBET's avatar
Philip ABBET committed
514
515
516
517
518
519
520
521
522
523
524


    def run(self, image, command):
        '''Runs a command and retrieves its status and output

        A container is created, started and waited for, then its logs are
        collected. The container is always removed before returning.

        Parameters:

          image (str): A docker image name or the key of a known environment.

          command: The command to run in the container.

        Returns:

          A tuple ``(status, output)`` holding the results of ``wait()`` and
          ``logs()``, or ``(1, None)`` if any of the steps raised an exception.

        '''

        container = None
        try:
            container = self.create_container(image, command)
            self.start(container)
            status = self.wait(container)
            output = self.logs(container)

        except Exception:
            # Callers only get (1, None): keep the reason in the logs
            logger.warning("Failed to run a command in image '%s'", image, exc_info=True)
            return 1, None

        finally:
            if container is not None:
                self.rm(container)

        return status, output
André Anjos's avatar
André Anjos committed
534
535


536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
    def _exec(self, command, timeout=None):
        process_stdout = tempfile.NamedTemporaryFile()
        process_stderr = tempfile.NamedTemporaryFile()

        def _read_streams():
            with open(process_stdout.name, 'r') as f:
                stdout = f.read()

            with open(process_stderr.name, 'r') as f:
                stderr = f.read()

            return (stdout, stderr)

        process = sp.Popen(command, stdout=process_stdout, stderr=process_stderr)

        if timeout is None:
            process.communicate()
        else:
            start = time.time()
            while process.poll() is None:
                time.sleep(0.1)
                if time.time() - start >= timeout:
                    process.kill()
                    (stdout, stderr) = _read_streams()
                    return (None, stdout, stderr)

        (stdout, stderr) = _read_streams()

        return (process.returncode, stdout, stderr)


567
#----------------------------------------------------------
568
569


570
class Container:
    '''Description of a docker container: image, command, volumes and ports'''

    def __init__(self, image, command):
        '''Stores the image and the command, without any volume or port'''

        self.id = None       # no identifier until one is assigned from outside
        self._stats = None   # no statistics recorded yet
        self.image = image
        self.command = command
        self.volumes = {}    # host path -> {'bind': ..., 'mode': ...}
        self.ports = {}      # container port -> host binding


    def add_volume(self, path, mount_path, read_only=True):
        '''Registers ``path`` to be mounted at ``mount_path``

        The volume is read-only (``ro``) unless ``read_only`` is false, in
        which case it is read-write (``rw``).

        '''

        mode = 'ro' if read_only else 'rw'
        self.volumes[path] = dict(bind=mount_path, mode=mode)


    def add_port(self, container_port, host_port, host_address=None):
        '''Registers the publication of ``container_port`` on ``host_port``

        The binding is stored as a tuple ``(host_address, host_port)`` when a
        host address is given, as a one-element list ``[host_port]`` otherwise.

        '''

        if host_address is None:
            self.ports[container_port] = [host_port]
        else:
            self.ports[container_port] = (host_address, host_port)


    def reset_ports(self):
        '''Forgets all the registered ports'''

        self.ports = {}


    @property
    def command_line(self):
        '''A ``docker run`` command line describing this container'''

        pieces = ["docker run -ti --rm=true"]

        for host_path, mount in self.volumes.items():
            pieces.append("--volume %s:%s:%s" % (host_path, mount['bind'], mount['mode']))

        # A tuple binding also carries the host address
        for container_port, binding in self.ports.items():
            if isinstance(binding, tuple):
                pieces.append("-p %s:%d:%d" % (binding[0], binding[1], container_port))
            else:
                pieces.append("-p %d:%d" % (binding[0], container_port))

        pieces.append("%s" % self.image)
        pieces.append('"%s"' % ' '.join(self.command))

        return ' '.join(pieces)