dock.py 32.7 KB
Newer Older
André Anjos's avatar
André Anjos committed
1
2
3
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

Samuel GAIST's avatar
Samuel GAIST committed
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
###################################################################################
#                                                                                 #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/               #
# Contact: beat.support@idiap.ch                                                  #
#                                                                                 #
# Redistribution and use in source and binary forms, with or without              #
# modification, are permitted provided that the following conditions are met:     #
#                                                                                 #
# 1. Redistributions of source code must retain the above copyright notice, this  #
# list of conditions and the following disclaimer.                                #
#                                                                                 #
# 2. Redistributions in binary form must reproduce the above copyright notice,    #
# this list of conditions and the following disclaimer in the documentation       #
# and/or other materials provided with the distribution.                          #
#                                                                                 #
# 3. Neither the name of the copyright holder nor the names of its contributors   #
# may be used to endorse or promote products derived from this software without   #
# specific prior written permission.                                              #
#                                                                                 #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED   #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE          #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE    #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL      #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR      #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER      #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,   #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE   #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.            #
#                                                                                 #
###################################################################################

André Anjos's avatar
André Anjos committed
36

37
38
39
40
41
42
43
"""
====
dock
====

Docker helper classes
"""
44
import ast
Samuel GAIST's avatar
Samuel GAIST committed
45
import logging
André Anjos's avatar
André Anjos committed
46
import os
47
import socket
Samuel GAIST's avatar
Samuel GAIST committed
48
import subprocess as sp  # nosec
49
50
import tempfile
import time
André Anjos's avatar
André Anjos committed
51

Samuel GAIST's avatar
Samuel GAIST committed
52
53
import docker
import simplejson as json
Samuel GAIST's avatar
Samuel GAIST committed
54

55
56
from packaging import version

57
58
from beat.core import stats

59
60
from .utils import build_env_name

61
logger = logging.getLogger(__name__)
André Anjos's avatar
André Anjos committed
62
63


64
class Host(object):
Samuel GAIST's avatar
Samuel GAIST committed
65
    """An object of this class can connect to the docker host and resolve stuff"""
66

Philip ABBET's avatar
Philip ABBET committed
67
    images_cache = {}
68

69
    def __init__(self, images_cache=None, raise_on_errors=True, discover=True):
André Anjos's avatar
André Anjos committed
70

Philip ABBET's avatar
Philip ABBET committed
71
        # Initialisations
72
        self.raise_on_errors = raise_on_errors
Philip ABBET's avatar
Philip ABBET committed
73
74
75
76
77
        self.images_cache_filename = images_cache
        self.base_url = None
        self.containers = []
        self.processing_environments = {}
        self.db_environments = {}
78

Philip ABBET's avatar
Philip ABBET committed
79
        # (If necessary) Load the known infos about the images
Samuel GAIST's avatar
Samuel GAIST committed
80
81
82
83
        if (self.images_cache_filename is not None) and os.path.exists(
            self.images_cache_filename
        ):
            with open(self.images_cache_filename, "r") as f:
84
                Host.images_cache = json.load(f)
85

Philip ABBET's avatar
Philip ABBET committed
86
        # Discover the environments
87
        if discover:
Samuel GAIST's avatar
Samuel GAIST committed
88
89
90
            (
                self.processing_environments,
                self.db_environments,
91
92
93
            ) = self._discover_environments_using_labels()

            if not self.db_environments and not self.processing_environments:
94
95
96
97
                (
                    self.processing_environments,
                    self.db_environments,
                ) = self._discover_environments_using_describe()
André Anjos's avatar
André Anjos committed
98

Philip ABBET's avatar
Philip ABBET committed
99
100
        # (If necessary) Save the known infos about the images
        if self.images_cache_filename is not None:
Samuel GAIST's avatar
Samuel GAIST committed
101
            with open(self.images_cache_filename, "w") as f:
102
                json.dump(Host.images_cache, f, indent=4)
103

Philip ABBET's avatar
Philip ABBET committed
104
105
    def __contains__(self, key):
        return (key in self.processing_environments) or (key in self.db_environments)
André Anjos's avatar
André Anjos committed
106

Philip ABBET's avatar
Philip ABBET committed
107
    def __str__(self):
Samuel GAIST's avatar
Samuel GAIST committed
108
        s = "Docker host"
109

Philip ABBET's avatar
Philip ABBET committed
110
        if self.base_url is not None:
Samuel GAIST's avatar
Samuel GAIST committed
111
            s += " (%s)" % self.base_url
112

Philip ABBET's avatar
Philip ABBET committed
113
        return s
André Anjos's avatar
André Anjos committed
114

Philip ABBET's avatar
Philip ABBET committed
115
    def env2docker(self, key):
116
        """Returns a nice docker image name given a BEAT environment key"""
André Anjos's avatar
André Anjos committed
117

Philip ABBET's avatar
Philip ABBET committed
118
        attrs = self.processing_environments[key]
Samuel GAIST's avatar
Samuel GAIST committed
119
        return attrs["image"]
120

Philip ABBET's avatar
Philip ABBET committed
121
    def db2docker(self, db_names):
122
        """Returns a nice docker image name given a database name"""
123

Philip ABBET's avatar
Philip ABBET committed
124
        def _all_in(db_names, databases):
Samuel GAIST's avatar
Samuel GAIST committed
125
            return len([x for x in db_names if x in databases]) == len(db_names)
126

Samuel GAIST's avatar
Samuel GAIST committed
127
128
129
130
131
        attrs = [
            x
            for x in self.db_environments.values()
            if _all_in(db_names, x["databases"])
        ][0]
132

Samuel GAIST's avatar
Samuel GAIST committed
133
        return attrs["image"]
134

135
136
137
138
139
140
    def dbenv2docker(self, key):
        """Returns a nice docker image name given a BEAT database environment key"""

        attrs = self.db_environments[key]
        return attrs["image"]

Philip ABBET's avatar
Philip ABBET committed
141
142
143
    def teardown(self):
        for container in self.containers:
            self.rm(container)
144

Philip ABBET's avatar
Philip ABBET committed
145
        self.containers = []
André Anjos's avatar
André Anjos committed
146

Philip ABBET's avatar
Philip ABBET committed
147
148
    def __exit__(self, *exc):
        self.teardown()
André Anjos's avatar
André Anjos committed
149

Philip ABBET's avatar
Philip ABBET committed
150
151
    def full_environment_name(self, name):
        try:
Samuel GAIST's avatar
Samuel GAIST committed
152
153
154
155
156
157
            return list(
                filter(
                    lambda x: x.startswith(name + " ("),
                    self.processing_environments.keys(),
                )
            )[0]
Samuel GAIST's avatar
Samuel GAIST committed
158
        except IndexError:
Philip ABBET's avatar
Philip ABBET committed
159
            try:
Samuel GAIST's avatar
Samuel GAIST committed
160
161
162
163
164
                return list(
                    filter(
                        lambda x: x.startswith(name + " ("), self.db_environments.keys()
                    )
                )[0]
Samuel GAIST's avatar
Samuel GAIST committed
165
            except IndexError:
Philip ABBET's avatar
Philip ABBET committed
166
                return None
167

Philip ABBET's avatar
Philip ABBET committed
168
169
    @property
    def ip(self):
        """The IP address of the docker host

        The address is the local name of a UDP socket connected to
        ``8.8.8.8``. The socket is always closed before returning so that no
        file descriptor is leaked by repeated accesses.
        """

        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            # connecting to a UDP address doesn't send packets
            s.connect(("8.8.8.8", 1))
            return s.getsockname()[0]
        finally:
            s.close()
174

175
    def _discover_environments_using_describe(self):
        """Returns dictionaries containing information about docker environments

        The local images are listed with ``docker images``. Every image whose
        ``repository:tag`` contains ``beat.env.`` is asked to describe itself
        by running its ``describe`` application. A description holding a
        ``databases`` entry is registered as a database environment, any
        other one as a processing environment.


        Returns:

          tuple: ``(environments, db_environments)``, two dictionaries mapping
          the key built by ``build_env_name`` to the description of the
          environment, to which the ``image`` name is added. Both are empty
          if the list of images cannot be retrieved.


        Raises:

          RuntimeError: if you set ``raise_on_errors`` in the constructor and I
            found environments that override each other for their description
            keys (``<name>(<version>)``), which should be unique.

        """

        def _describe(image):
            """Tries to run the "describe" app on the image, collect results

            Results are memoized in ``Host.images_cache``. An empty
            dictionary is returned if the execution fails or if its output
            cannot be parsed.
            """

            if image in Host.images_cache:
                return Host.images_cache[image]

            cmd = ["docker", "run", "--rm=true", image, "describe"]

            (status, stdout, stderr) = self._exec(cmd)

            if status != 0:
                logger.warning(
                    "Execution failed with status {}: \n"
                    "stdout: '{}'\n"
                    "stderr: '{}'".format(status, stdout, stderr)
                )
                return {}

            try:
                infos = json.loads(stdout)
            except Exception as e:
                logger.warning(
                    "Ignoring potential environment at `%s' since "
                    "`describe' output cannot be parsed: %s",
                    image,
                    str(e),
                )
                return {}

            Host.images_cache[image] = infos
            return infos

        def _image_tag(image):
            """Returns the tag of an image name, ``None`` if it has none

            Only names containing at least one ``/`` are inspected; the tag
            is whatever follows the last ``:`` of the last path component.
            """

            parts = image.split("/")
            if len(parts) > 1:
                parts = parts[-1].split(":")
                if len(parts) > 1:
                    return parts[-1]

            return None

        def _numeric(tag):
            """Converts a dotted tag into a tuple of integers (may raise)"""

            return tuple(int(x) for x in tag.split("."))

        def _must_replace(image, environments, key):
            """Tells if ``image`` must replace the one registered under ``key``"""

            # this check avoids we do a new environment and, by mistake,
            # override it with a previous version or the contrary.
            if self.raise_on_errors:
                raise RuntimeError(
                    "Environments at '%s' and '%s' have the "
                    "same name ('%s'). Distinct environments must be "
                    "uniquely named. Fix this and re-start."
                    % (image, environments[key]["image"], key)
                )

            new_version = _image_tag(image)
            previous_version = _image_tag(environments[key]["image"])

            replacement = False
            keep = False

            if (new_version is not None) and (previous_version is not None):
                if new_version == "latest":
                    replacement = True
                elif previous_version == "latest":
                    keep = True
                else:
                    try:
                        new_numbers = _numeric(new_version)
                    except ValueError:
                        # the new tag cannot be compared: keep what we have
                        keep = True
                    else:
                        try:
                            previous_numbers = _numeric(previous_version)
                        except ValueError:
                            # only the new tag is comparable: prefer it
                            replacement = True
                        else:
                            if new_numbers > previous_numbers:
                                replacement = True
                            else:
                                keep = True

            elif new_version is not None:
                replacement = True
            elif previous_version is not None:
                keep = True

            if replacement:
                logger.debug(
                    "Overriding **existing** environment '%s' in image '%s'",
                    key,
                    environments[key]["image"],
                )
            elif keep:
                logger.debug(
                    "Environment '%s' already existing in image '%s', we'll keep it",
                    key,
                    environments[key]["image"],
                )
                return False
            else:
                logger.warning(
                    "Overriding **existing** environment '%s' image "
                    "with '%s'. To avoid this warning make "
                    "sure your docker images do not contain environments "
                    "with the same names",
                    key,
                    environments[key]["image"],
                )

            return True

        environments = {}
        db_environments = {}

        cmd = ["docker", "images", "--format", "{{.Repository}}:{{.Tag}}"]

        (status, stdout, stderr) = self._exec(cmd)

        if status != 0:
            logger.error(
                "Failed to retrieve the list of docker images, reason:\n\n%s", stderr
            )
            return (environments, db_environments)

        images = [x for x in stdout.split("\n") if "beat.env." in x]

        for image in images:
            # Call the "describe" application on each existing image
            description = _describe(image)

            if not description:
                # the image name must go through a placeholder, otherwise the
                # logging module fails to format the record
                logger.debug("Description not found for '%s'", image)
                continue

            key = build_env_name(description)

            if "databases" in description:
                registry = db_environments
            else:
                registry = environments

            if (key in registry) and not _must_replace(image, registry, key):
                continue

            registry[key] = description
            registry[key]["image"] = image

            logger.info("Registered '%s' -> '%s'", key, image)

        logger.debug(
            "Found %d environments and %d database environments",
            len(environments),
            len(db_environments),
        )

        return (environments, db_environments)
350

351
352
353
354
355
356
    def _discover_environments_using_labels(self):
        """Search BEAT runtime environments using docker labels

        Images labelled ``beat.env.type=database`` and
        ``beat.env.type=execution`` are listed through the docker client. The
        information of each tagged image is read from its ``beat.env.name``,
        ``beat.env.version``, ``beat.env.revision`` and, when present,
        ``beat.env.databases`` and ``beat.env.capabilities`` labels. Every
        image retained for an environment is also recorded in
        ``Host.images_cache`` under its first tag.


        Returns:

          tuple: ``(environments, db_environments)``, two dictionaries mapping
          the key built by ``build_env_name`` to the information of the
          image. Two empty dictionaries are returned if listing the images
          fails and ``raise_on_errors`` is not set.


        Raises:

          RuntimeError: if ``raise_on_errors`` is set and two different images
            provide the same environment.

          Exception: whatever listing the images raised, if
            ``raise_on_errors`` is set.

        """

        def _must_replace(key, image, environments):
            """Tells if ``image`` is more recent than the one under ``key``"""

            environment = environments[key]
            if environment["image"] not in image.tags:
                logger.warning(
                    "Different images providing the same environment: {} VS {}".format(
                        environment["image"], image.tags
                    )
                )
                if self.raise_on_errors:
                    raise RuntimeError(
                        "Environments at '%s' and '%s' have the "
                        "same name ('%s'). Distinct environments must be "
                        "uniquely named. Fix this and re-start."
                        % (image.tags[0], environments[key]["image"], key)
                    )
                else:
                    logger.debug("Keeping more recent")

            current_version = "{}{}".format(
                environment["version"], environment["revision"]
            )
            new_version = "{}{}".format(
                image.labels["beat.env.version"], image.labels["beat.env.revision"]
            )
            current_version = version.parse(current_version)
            new_version = version.parse(new_version)
            return new_version > current_version

        def _parse_image_info(image):
            """Builds the environment information from the labels of an image"""

            labels = image.labels
            data = {
                "image": image.tags[0],
                "name": labels["beat.env.name"],
                "version": labels["beat.env.version"],
                "revision": labels["beat.env.revision"],
            }

            # these optional labels hold python literals
            database_list = labels.get("beat.env.databases")
            if database_list:
                data["databases"] = ast.literal_eval(database_list)

            capabilities = labels.get("beat.env.capabilities")
            if capabilities:
                data["capabilities"] = ast.literal_eval(capabilities)

            return data

        def _process_image_list(image_list):
            """Returns the environments provided by a list of images"""

            environments = {}

            for image in image_list:
                if not image.tags:
                    logger.warning("Untagged image, skipping")
                    continue

                image_info = _parse_image_info(image)
                key = build_env_name(image_info)
                image_name = image_info["image"]

                if key in environments:
                    if _must_replace(key, image, environments):
                        environments[key] = image_info
                        # the replacing image must be known by the cache too,
                        # otherwise ``start`` cannot see its capabilities
                        Host.images_cache[image_name] = image_info
                        logger.info("Updated '%s' -> '%s'", key, image_name)
                else:
                    environments[key] = image_info
                    Host.images_cache[image_name] = image_info
                    logger.info("Registered '%s' -> '%s'", key, image_name)

            return environments

        client = docker.from_env()

        def _list_images(environment_type):
            """Lists the images of a given type, ``None`` on a logged failure"""

            try:
                return client.images.list(
                    filters={"label": ["beat.env.type=" + environment_type]}
                )
            except Exception as e:
                if self.raise_on_errors:
                    raise
                logger.error("Docker error: {}".format(e))
                return None

        databases = _list_images("database")
        if databases is None:
            return {}, {}
        db_environments = _process_image_list(databases)

        executors = _list_images("execution")
        if executors is None:
            return {}, {}
        environments = _process_image_list(executors)

        logger.debug(
            "Found %d environments and %d database environments",
            len(environments),
            len(db_environments),
        )

        return environments, db_environments

Philip ABBET's avatar
Philip ABBET committed
460
    def create_container(self, image, command):
        """Builds a :py:class:`Container` for an image or an environment key

        Parameters:

          image (:py:class:`str`): Either a docker image name or the key of
            a known processing or database environment, in which case it is
            replaced by the image name registered for that environment.

          command: The command, handed over to :py:class:`Container` as is.


        Returns:

          :py:class:`Container`: The new container object.

        """

        # Replace by a real image name. Database environment keys must go
        # through ``dbenv2docker``: ``env2docker`` only knows the processing
        # environments and would raise a ``KeyError`` for them.
        if image in self.processing_environments:
            image = self.env2docker(image)
        elif image in self.db_environments:
            image = self.dbenv2docker(image)

        return Container(image, command)
466

Philip ABBET's avatar
Philip ABBET committed
467
468
    def start(self, container, virtual_memory_in_megabytes=0, max_cpu_percent=0):
        """Creates and starts a container in detached mode

        A ``docker run`` command line is assembled from the settings of the
        container and executed through ``self._exec``. Whenever docker prints
        something on its standard output, that output (newlines removed) is
        stored in ``container.id`` and the container is added to
        ``self.containers``.


        Parameters:

          container (:py:class:`Container`): The container.

          virtual_memory_in_megabytes (:py:class:`int`, Optional): The maximum
            amount of memory the user process can consume on the host. If not
            specified, a memory limit is not set.

          max_cpu_percent (:py:class:`float`, Optional): The maximum amount of
            CPU the user process may consume on the host. The value ``100``
            equals to using 100% of a single core. If not specified, then a CPU
            limitation is not put in place.


        Raises:

          RuntimeError: if the docker command exits with a non-zero status.
            The container is removed first if it got an identifier.

        """

        cmd = ["docker", "run", "--tty", "--interactive", "--detach", "--read-only"]

        # Optional single options, already formatted by the container
        for attribute in ("network", "user", "name", "workdir", "entrypoint"):
            option = getattr(container, attribute)
            if option:
                cmd.append(option)

        # Give access to the GPUs to the images declaring that capability,
        # provided that the nvidia driver is available on this machine
        image_infos = Host.images_cache.get(container.image, {})
        if "gpu" in image_infos.get("capabilities", ()):
            if os.path.exists("/proc/driver/nvidia"):
                cmd.append("--gpus=all")

        if virtual_memory_in_megabytes:
            # For this to work properly, memory swap limitation has to be
            # enabled on the kernel. This typically goes by setting
            # "cgroup_enable=memory" as a boot parameter to kernels which are
            # compiled with this support.
            # More info: https://docs.docker.com/engine/installation/linux/ubuntulinux/#/enable-memory-and-swap-accounting
            logger.debug(
                "Setting maximum memory to %dMB", virtual_memory_in_megabytes
            )
            cmd += [
                "--memory=%dm" % virtual_memory_in_megabytes,
                "--memory-swap=%dm" % virtual_memory_in_megabytes,
            ]

        if max_cpu_percent:
            # The period corresponds to the scheduling interval for the CFS in
            # Linux. The quota corresponds to a fraction or a multiple of the
            # period, the container will get. A quota that is 2x the period
            # gets the container up to 200% cpu time (2 cores). If the quota is
            # 0.5x the period, the container gets up to 50% the cpu time. Each
            # core represents 100%. A system with 2 cores has 200% computing
            # power.
            #
            # More info:
            # https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt
            #
            # For this to work properly, CPU bandwidth provisioning for the
            # Linux CFS must be enabled on the kernel. More info on how to do
            # it: http://www.blaess.fr/christophe/2012/01/07/linux-3-2-cfs-cpu-bandwidth-english-version/
            #
            # If your system is running on a virtual machine, having more cores
            # available to docker engine normally translates to more precise
            # scheduling.
            period = 100000  # microseconds
            quota = max_cpu_percent / 100.0

            logger.debug("Setting CPU quota to %d%%", max_cpu_percent)

            cmd += [
                "--cpu-period=%d" % period,
                "--cpu-quota=%d" % int(quota * period),
            ]

        # Volumes, tmpfs entries, exposed ports and environment variables
        cmd += container.volumes
        cmd += container.temporary_filesystems
        cmd += container.ports
        cmd += container.environment_variables

        cmd.append(container.image)
        cmd += container.command

        # Instantiate the container
        logger.debug(
            'Creation and start of a container: image=%s, command="%s"',
            container.image,
            " ".join(container.command),
        )

        logger.debug("Docker command: %s", " ".join(cmd))

        exit_code, output, errors = self._exec(cmd)

        if output != "":
            container.id = output.replace("\n", "")
            self.containers.append(container)

        if exit_code != 0:
            message = "Failed to create the container, reason:\n\n%s" % errors
            logger.error(message)

            if container.id is not None:
                self.rm(container)

            raise RuntimeError(message)

        logger.debug("Container ID: '%s'", container.id)
596

Philip ABBET's avatar
Philip ABBET committed
597
    def wait(self, container, timeout=None):
598
        """Wait for the container to finish its job
599

Philip ABBET's avatar
Philip ABBET committed
600
        Parameters:
André Anjos's avatar
André Anjos committed
601

André Anjos's avatar
André Anjos committed
602
603
604
          timeout (:py:class:`float`, Optional): A timeout in seconds to wait
            for the user process to finish. If a timeout value is not given,
            waits forever.
605
606
        """

Samuel GAIST's avatar
Samuel GAIST committed
607
608
609
        (status, stdout, stderr) = self._exec(
            ["docker", "wait", container.id], timeout=timeout
        )
610
611
612
613
        if status != 0:
            return None

        return int(stdout)
André Anjos's avatar
André Anjos committed
614

Philip ABBET's avatar
Philip ABBET committed
615
    def status(self, container):
        """Checks the status of a given container

        Returns the ``State.Status`` entry of the first element output by
        ``docker inspect``, or ``None`` if the command fails or if its output
        cannot be interpreted.
        """

        logger.debug("Inspect container %s", container.id)

        exit_code, output, _ = self._exec(["docker", "inspect", container.id])
        if exit_code != 0:
            return None

        try:
            description = json.loads(output)
            return description[0]["State"]["Status"]
        except Exception:
            return None
André Anjos's avatar
André Anjos committed
628

Philip ABBET's avatar
Philip ABBET committed
629
    def logs(self, container):
        """Returns the logs of a container

        Parameters:

          container (:py:class:`Container`): The container to query. Only its
            ``id`` is used.

        Returns:

          str: The standard output of ``docker logs`` with ``\\r\\n`` line
          endings converted to ``\\n``, or an empty string when the command
          does not end with a zero exit status.

        """

        returncode, output, _ = self._exec(["docker", "logs", container.id])

        if returncode == 0:
            return output.replace("\r\n", "\n")

        return ""
André Anjos's avatar
André Anjos committed
637

Philip ABBET's avatar
Philip ABBET committed
638
    def statistics(self, container):
        """Returns the statistics about a container

        A single (non-streamed) sample is requested from the docker API and
        converted with ``stats.cpu_statistics`` and
        ``stats.memory_statistics``. The raw sample is then stored on
        ``container._stats`` so that the next call can hand the previous CPU
        figures to ``stats.cpu_statistics``.

        Parameters:

          container (:py:class:`Container`): The container to query

        Returns:

          dict: A dictionary with the keys ``cpu`` and ``memory``

        """

        sample = docker.APIClient().stats(container.id, stream=False)

        # If CPU statistics can't be retrieved, fall back on the previous ones
        # reported within the same sample
        if "system_cpu_usage" not in sample["cpu_stats"]:
            sample["cpu_stats"] = dict(sample["precpu_stats"])

        # If memory statistics can't be retrieved, use zeroed placeholders
        if len(sample["memory_stats"]) == 0:
            sample["memory_stats"] = dict(limit=0, max_usage=0)

        if container._stats:
            previous_cpu = container._stats["cpu_stats"]
        else:
            previous_cpu = None

        # merge statistics
        cpu = stats.cpu_statistics(previous_cpu, sample["cpu_stats"])
        memory = stats.memory_statistics(sample["memory_stats"])

        # Remember this sample for the next call
        container._stats = sample

        return dict(cpu=cpu, memory=memory)

    def rm(self, container):
        """Removes a given container, stopping it first if it is not done

        Nothing happens for a container without an ``id``. Otherwise, unless
        the container is reported as ``created`` or ``exited``, it is stopped
        with ``docker container stop`` before ``docker rm`` is issued. The
        container is finally dropped from ``self.containers`` and its ``id``
        is reset to ``None``.

        Parameters:

          container (:py:class:`Container`): The container to remove

        """

        if container.id is None:
            return

        state = self.status(container)

        if state != "created" and state != "exited":
            logger.warning(
                "Killing container '%s' which is on state '%s'", container.id, state
            )
            self._exec(["docker", "container", "stop", container.id])

        logger.debug("Remove container %s", container.id)
        # The outcome of the removal command itself is not inspected
        self._exec(["docker", "rm", container.id])

        self.containers.remove(container)
        container.id = None
Philip ABBET's avatar
Philip ABBET committed
685
686

    def kill(self, container):
        """Stop a container

        ``docker kill`` is only issued when the container is currently
        reported as ``running``; in any other state this is a no-op.

        Parameters:

          container (:py:class:`Container`): The container to kill

        """

        if self.status(container) != "running":
            return

        self._exec(["docker", "kill", container.id])
Philip ABBET's avatar
Philip ABBET committed
691
692

    def run(self, image, command):
        """Runs a command and retrieves its status and output

        A container is created, started and waited for, then its logs are
        collected. Whatever happens, a container that could be created is
        removed before returning.

        Parameters:

          image: Image to create the container from (passed as is to
            ``create_container``)

          command: Command to execute (passed as is to ``create_container``)

        Returns:

          tuple: ``(status, output)`` as returned by ``wait`` and ``logs``,
          or ``(1, None)`` if any step raised an exception.

        """

        container = None

        try:
            container = self.create_container(image, command)
            self.start(container)
            outcome = (self.wait(container), self.logs(container))

        except Exception:
            outcome = (1, None)

        finally:
            if container is not None:
                self.rm(container)

        return outcome
André Anjos's avatar
André Anjos committed
710

711
712
713
714
    def get_ipaddress(self, container):
        """Returns the ip address of the given container

        The address is obtained through ``docker inspect`` with a format
        string that prints the ``IPAddress`` of every entry found in
        ``.NetworkSettings.Networks``.

        Parameters:

          container (:py:class:`Container`): The container to query

        Returns:

          str: The output of the command stripped of its newlines, or
          ``None`` (after logging an error) when the command fails.

        """

        inspect_cmd = [
            "docker",
            "inspect",
            "--format",
            "{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}",
            container.id,
        ]

        returncode, output, errors = self._exec(inspect_cmd)

        if returncode == 0:
            return output.replace("\n", "")

        logger.error(
            "Failed to retrieve the ip address of the container, reason:\n\n%s",
            errors,
        )
        return None
732

733
734
735
736
737
    def _exec(self, command, timeout=None):
        """Runs a command and captures its exit status and output streams

        The standard output and error of the process are redirected to named
        temporary files, which are read back (in text mode) once the process
        is over and deleted before returning.

        Parameters:

          command (list): Program and arguments to execute

          timeout (:py:class:`float`, Optional): Maximum number of seconds to
            wait for the process to finish. If a timeout value is not given,
            waits forever.

        Returns:

          tuple: ``(status, stdout, stderr)`` where ``status`` is

          * the exit code of the process when it finished on its own,
          * ``None`` when it had to be killed because ``timeout`` expired
            (``stdout`` and ``stderr`` then hold what was written so far),
          * ``-1`` when it could not be started and ``raise_on_errors`` is
            not set; ``stdout`` is then ``None`` and ``stderr`` is the
            exception that was caught.

        Raises:

          IOError: If the process cannot be started and ``raise_on_errors``
            is set.

        """

        def _read_stream(stream):
            # The child wrote through the inherited descriptor: reopen the
            # file by name to read its content from the beginning as text
            with open(stream.name, "r") as f:
                return f.read()

        # Context managers guarantee both temporary files are closed (and
        # therefore deleted) on every exit path, exceptions included
        with tempfile.NamedTemporaryFile() as process_stdout:
            with tempfile.NamedTemporaryFile() as process_stderr:
                try:
                    process = sp.Popen(
                        command, stdout=process_stdout, stderr=process_stderr
                    )
                except IOError as e:
                    if self.raise_on_errors:
                        raise

                    logger.error("Docker can not be found")
                    return (-1, None, e)

                try:
                    # timeout=None waits for as long as the process runs
                    process.wait(timeout=timeout)
                    status = process.returncode
                except sp.TimeoutExpired:
                    process.kill()
                    # Reap the killed process so that it does not linger as a
                    # zombie and its output is complete before reading it
                    process.wait()
                    status = None

                return (
                    status,
                    _read_stream(process_stdout),
                    _read_stream(process_stderr),
                )


771
# ----------------------------------------------------------
772
773


774
class Container:
775
776
777
778
779
780
781
    """This class represents a Docker container with its set of parameters

    Parameters:
        :param str image: Name of the image to use to build the container

        :param str command: Command to execute in the container.
    """
André Anjos's avatar
André Anjos committed
782

Philip ABBET's avatar
Philip ABBET committed
783
784
785
    def __init__(self, image, command):
        """Prepare the description of a container

        Besides storing the given parameters, the image is looked up through
        the docker client: if it carries a ``beat.env.custom_tmpfs`` label,
        its JSON content (a mapping of path to size) is merged into the
        default temporary filesystems.

        Parameters:
            :param str image: Name of the image to use to build the container

            :param str command: Command to execute in the container.
        """

        # Values provided by the caller
        self.image = image
        self.command = command

        # Public settings, unset by default
        self.network_name = None
        self.uid = None
        self.id = None

        # Options filled in through the set_*/add_* methods
        self._name = None
        self._workdir = None
        self._entrypoint = None
        self._volumes = {}
        self._ports = {}
        self._environment_variables = {}
        self._temporary_filesystems = {"/tmp": "500k", "/run": "500k"}  # nosec

        # Last raw statistics sample collected for this container
        self._stats = None

        labels = docker.from_env().images.get(image).labels
        custom_tmpfs = labels.get("beat.env.custom_tmpfs")

        if custom_tmpfs is None:
            return

        # Entries from the label override the defaults sharing the same path
        for path, size in json.loads(custom_tmpfs).items():
            self._temporary_filesystems[path] = size

809
    def set_name(self, name):
Samuel GAIST's avatar
Samuel GAIST committed
810
        """Set the name to be used by the container in place of the docker
811
812
813
        auto generated one.
        """
        self._name = name
814

815
    def set_workdir(self, workdir):
Samuel GAIST's avatar
Samuel GAIST committed
816
        """Set the work folder to be used by the container"""
817
818
        self._workdir = workdir

819
    def set_entrypoint(self, entrypoint):
Samuel GAIST's avatar
Samuel GAIST committed
820
        """Set the entry point to be used by the container"""
821
822
        self._entrypoint = entrypoint

Philip ABBET's avatar
Philip ABBET committed
823
    def add_volume(self, path, mount_path, read_only=True):
824
825
826
827
828
829
830
831
832
833
        """Add a volume to be mounted on the container

        Parameters:
            :param str path: Source path of the volume on disk

            :param str mount_path: Path of the volume in the container

            :param boolean read_only: Whether the volume will be read only
        """

Samuel GAIST's avatar
Samuel GAIST committed
834
        self._volumes[path] = {"bind": mount_path, "mode": "ro" if read_only else "rw"}
835

836
837
838
839
840
841
842
843
844
845
    def add_tmpfs(self, path, size):
        """Add a tmpfs to be mounted on the container

        Parameters:
            :param str path: Target path for the tmpfs
            :param str size: Size of the tmps. Unlimited if empty
        """

        self._temporary_filesystems[path] = size

846
    def add_port(self, container_port, host_port, host_address=None):
847
848
849
850
851
852
853
854
855
856
        """Add a port binding

        Parameters:
            :param int container_port: Port to bind from the container

            :param int host_port: Port to bind to on the host

            :param str host_address: Address of the host
        """

857
858
859
860
        if host_address is not None:
            value = (host_address, host_port)
        else:
            value = [host_port]
861

862
        self._ports[container_port] = value
863

864
865
866
867
868
869
870
871
872
873
874
    def add_environment_variable(self, name, value):
        """Add an environment variable

        Parameters:
            :param str name: Name of the variable

            :param str value: Content of the variable
        """

        self._environment_variables[name] = value

875
    def reset_ports(self):
876
877
        """Empty the port bindings"""

878
879
        self._ports = {}

880
881
    @property
    def name(self):
Samuel GAIST's avatar
Samuel GAIST committed
882
        name = ""
883
        if self._name:
Samuel GAIST's avatar
Samuel GAIST committed
884
            name = "--name=%s" % self._name
885
886
        return name

887
888
    @property
    def workdir(self):
Samuel GAIST's avatar
Samuel GAIST committed
889
        workdir = ""
890
        if self._workdir:
Samuel GAIST's avatar
Samuel GAIST committed
891
            workdir = "--workdir=%s" % self._workdir
892
893
        return workdir

894
895
    @property
    def entrypoint(self):
Samuel GAIST's avatar
Samuel GAIST committed
896
        entrypoint = ""
897
        if self._entrypoint:
Samuel GAIST's avatar
Samuel GAIST committed
898
            entrypoint = "--entrypoint=%s" % self._entrypoint
899
900
        return entrypoint

901
902
903
904
905
906
907
908
    @property
    def volumes(self):
        """Returns the volumes of this container in a suitable form to build
        a command to start the container.
        """

        volumes = []
        for k, v in self._volumes.items():
Samuel GAIST's avatar
Samuel GAIST committed
909
910
911
912
913
914
915
916
917
918
            if k.startswith("nfs://"):
                addr, src = k[6:].split(":")
                volumes.append(
                    "--mount=type=volume,"
                    "dst={dst},"
                    "volume-driver=local,"
                    "volume-opt=type=nfs,"
                    "volume-opt=device=:{src},"
                    "volume-opt=o=addr={addr}".format(dst=v["bind"], src=src, addr=addr)
                )
919
            else:
Samuel GAIST's avatar
Samuel GAIST committed
920
                if k.startswith("file://"):
921
                    k = k[6:]
Samuel GAIST's avatar
Samuel GAIST committed
922
                volumes.append("--volume=%s:%s:%s" % (k, v["bind"], v["mode"]))
923
        return volumes
924

925
926
927
928
929
930
931
932
933
934
935
    @property
    def temporary_filesystems(self):
        tempfs_list = []
        for path, size in self._temporary_filesystems.items():
            tmpfs_string = "--tmpfs={}:rw,noexec,nosuid".format(path)
            if size:
                tmpfs_string += ",size={}".format(size)

            tempfs_list.append(tmpfs_string)
        return tempfs_list

936
    @property
937
938
939
940
941
942
943
    def ports(self):
        """Returns the ports of this container in a suitable form to build
        a command to start the container.
        """

        ports = []
        for k, v in self._ports.items():
Samuel GAIST's avatar
Samuel GAIST committed
944
            ports.append("-p")
945
            if isinstance(v, tuple):
Samuel GAIST's avatar
Samuel GAIST committed
946
                ports.append("%s:%d:%d" % (v[0], v[1], k))
947
            else:
Samuel GAIST's avatar
Samuel GAIST committed
948
                ports.append("%d:%d" % (v[0], k))
949
        return ports
950

951
952
    @property
    def environment_variables(self):
Samuel GAIST's avatar
Samuel GAIST committed
953
        """Returns the environment variables to set on this container."""
954
955
956

        environment_variables = []
        for k, v in self._environment_variables.items():
Samuel GAIST's avatar
Samuel GAIST committed
957
            environment_variables.append("--env={}={}".format(k, v))
958
959
        return environment_variables

960
961
    @property
    def network(self):
Samuel GAIST's avatar
Samuel GAIST committed
962
        network = ""
963
        if self.network_name:
Samuel GAIST's avatar
Samuel GAIST committed
964
            network = "--network=" + self.network_name
965
        return network
966
967
968

    @property
    def user(self):
Samuel GAIST's avatar
Samuel GAIST committed
969
        user = ""
970
        if self.uid:
Samuel GAIST's avatar
Samuel GAIST committed
971
            user = "--user={0}:{0}".format(self.uid)
972
973
        return user

974
    @property
975
    def command_line(self):
976
977
978
979
980
981
982
        """Returns the complete docker command to start the container and
        execute the specified command.

        Returns:
            str: Command to execute
        """

983
        cmd = "docker run -ti --rm=true "
984
        cmd += "%s " % self.network
985
        cmd += "%s " % self.user
Samuel GAIST's avatar
Samuel GAIST committed
986
987
988
        cmd += " ".join(self.volumes)
        cmd += " ".join(self.ports)
        cmd += " ".join(self.environment_variables)
989
        cmd += "%s " % self.name
990
        cmd += "%s " % self.workdir
991
        cmd += "%s " % self.entrypoint
992

993
        cmd += "%s " % self.image
994