dock.py 20.8 KB
Newer Older
André Anjos's avatar
André Anjos committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


29
'''Docker-based execution of user code: host discovery and container handling
'''

import os
33
import tarfile
André Anjos's avatar
André Anjos committed
34
35
36
37

import logging
logger = logging.getLogger(__name__)

38
39
40
41
import six
import docker
import requests
import simplejson
42
import socket
André Anjos's avatar
André Anjos committed
43
44
45
46

from . import stats


47
48
class Host(object):
  '''Connects to a docker host and resolves BEAT environments into images

  The object keeps a docker client, the list of containers it created (so
  they can be removed by :py:meth:`teardown`) and two dictionaries describing
  the images discovered on the host: ``environments`` (execution
  environments) and ``db_environments`` (database environments), both keyed
  by ``name (version)``.

  It can be used as a context manager: :py:meth:`setup` is called on entry
  and :py:meth:`teardown` on exit.


  Parameters:

    kwargs (dict): Keyword arguments forwarded to ``docker.Client()``. If
      both ``host`` and ``port`` are given, they are replaced by a single
      ``base_url`` entry of the form ``http://HOST:PORT``.

  '''

  def __init__(self, **kwargs):

    self.client = None
    self.containers = []

    if 'host' in kwargs and 'port' in kwargs:
      host = kwargs.pop('host')
      port = kwargs.pop('port')
      kwargs['base_url'] = "http://%s:%s" % (host, port)

    self.kwargs = kwargs
    self.environments = {}
    self.db_environments = {}


  def setup(self, raise_on_errors=True):
    '''Creates the docker client and discovers the installed environments

    Settings found in the process environment (as returned by
    ``docker.utils.kwargs_from_env()``) are merged over the keyword arguments
    given at construction time.


    Parameters:

      raise_on_errors (bool, Optional): Forwarded to
        :py:meth:`_discover_environments`.

    '''

    self.kwargs.update(**docker.utils.kwargs_from_env(assert_hostname=False))

    self.client = docker.Client(**self.kwargs)

    (self.environments, self.db_environments) = \
        self._discover_environments(raise_on_errors)


  def __contains__(self, key):
    '''Tells if ``key`` names a known (database or execution) environment'''

    return (key in self.environments) or (key in self.db_environments)


  def __str__(self):
    # ``base_url`` only exists if it was given at construction time or was
    # found in the process environment by ``setup()``; never raise from here
    return self.kwargs.get('base_url', 'docker (default host)')


  @staticmethod
  def _image_name(attrs):
    '''Returns the tag of an environment or, if untagged, its short id'''

    if attrs['tag'] is not None:
      return attrs['tag']

    return attrs['short_id']


  def env2docker(self, key):
    '''Returns a nice docker image name given a BEAT environment key

    Raises:

      KeyError: if ``key`` is not a known execution environment.

    '''

    return self._image_name(self.environments[key])


  def db2docker(self, db_names):
    '''Returns a nice docker image name given a list of database names

    The first database environment providing **all** the databases listed in
    ``db_names`` is used.


    Raises:

      IndexError: if no database environment provides all the databases.

    '''

    candidates = [
        x for x in self.db_environments.values()
        if all(name in x['databases'] for name in db_names)
        ]

    if not candidates:
      raise IndexError("No database environment provides all of: %s" % \
          ', '.join([str(name) for name in db_names]))

    return self._image_name(candidates[0])


  def teardown(self):
    '''Removes all containers created through this object'''

    # rm() removes the container from ``self.containers``: iterate over a
    # copy, otherwise every other container would be skipped
    for container in list(self.containers):
      self.rm(container)


  def __enter__(self):
    self.setup()
    return self


  def __exit__(self, *exc):
    self.teardown()


  @property
  def ip(self):
    '''The local IP address of the interface used to reach external hosts'''

    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
      # connecting to a UDP address doesn't send packets
      s.connect(('8.8.8.8', 1))
      return s.getsockname()[0]
    finally:
      s.close()


  def _discover_environments(self, raise_on_errors=True):
    '''Returns dictionaries containing information about docker environments

    Every image with "beat" in its first repository tag is started with the
    ``describe`` command. The JSON document it prints is used as the
    description of the environment, to which the keys ``image``, ``tag``,
    ``short_id`` and ``nickname`` are added.


    Parameters:

      raise_on_errors (bool, Optional): If we should raise an exception
        (``RuntimeError``) in case installed environments override each other
        and we can't know which to use.


    Returns:

      tuple: Two dictionaries, ``(environments, db_environments)``, both
        keyed by ``name (version)``. Descriptions containing a ``databases``
        entry go into the second dictionary.


    Raises:

      RuntimeError: if you set ``raise_on_errors`` and I found environments
        that override each other for their description keys (``name
        (version)``), which should be unique.

    '''

    def _describe(image):
      '''Tries to run the "describe" app on the image, collect results'''

      status, output = self.get_statusoutput(image, ['describe'])
      if status == 0:
        try:
          return simplejson.loads(output)
        except Exception as e:
          logger.warning("Ignoring potential environment at `%s' since " \
                  "`describe' output cannot be parsed: %s", image, str(e))
      return {}


    def _version_of(tag):
      '''Returns the version in ``repository/name:version`` (or ``None``)'''

      parts = tag.split('/')
      if len(parts) > 1:
        parts = parts[-1].split(':')
        if len(parts) > 1:
          return parts[-1]
      return None


    def _as_numbers(version):
      '''Converts "1.2.3" into (1, 2, 3), ``None`` if it is not numeric'''

      try:
        return tuple([int(x) for x in version.split('.')])
      except ValueError:
        return None


    def _must_replace(image_tag, environments, key):
      '''Decides if the image ``image_tag`` replaces the one known as ``key``'''

      # this check avoids we do a new environment and, by mistake, override
      # it with a previous version or the contrary.
      if raise_on_errors:
        raise RuntimeError("Environments at `%s' and `%s' have the " \
                "same name (`%s'). Distinct environments must be " \
                "uniquely named. Fix this and re-start." % \
                (image_tag, environments[key]['image'], key))

      new_version = _version_of(image_tag)
      previous_version = _version_of(environments[key]['tag'])

      replacement = False
      keep = False

      if (new_version is not None) and (previous_version is not None):
        if new_version == 'latest':
          replacement = True
        elif previous_version == 'latest':
          keep = True
        else:
          new_numbers = _as_numbers(new_version)
          previous_numbers = _as_numbers(previous_version)

          if new_numbers is None:
            # cannot rank the new image: stay with what we have
            keep = True
          elif previous_numbers is None:
            # only the new image has a version we understand
            replacement = True
          elif new_numbers > previous_numbers:
            replacement = True
          else:
            keep = True

      elif new_version is not None:
        replacement = True
      elif previous_version is not None:
        keep = True

      if replacement:
        logger.debug("Overriding **existing** environment `%s' in image `%s'", key, environments[key]['tag'])
      elif keep:
        logger.debug("Environment `%s' already existing in image `%s', we'll keep it", key, environments[key]['tag'])
        return False
      else:
        logger.warning("Overriding **existing** environment `%s' image " \
                "with `%s'. To avoid this warning make " \
                "sure your docker images do not contain environments " \
                "with the same names", key, environments[key]['image'])

      return True


    environments = {}
    db_environments = {}

    for image in self.client.images():
      # call the "describe" application on each existing image with "beat" in
      # its name
      repo_tags = image.get('RepoTags')
      tag = repo_tags[0] if repo_tags else None
      if (tag is None) or ('beat' not in tag):
        continue

      # ids normally look like "sha256:0123...": keep what follows the prefix
      short_id = image['Id'].split(':')[-1][:12]
      logger.debug("Checking image `%s' (%s)...", tag, short_id)
      description = _describe(tag or short_id)
      if not description:
        continue

      key = description['name'] + ' (' + description['version'] + ')'

      # database environments announce the databases they provide
      if 'databases' in description:
        registry = db_environments
      else:
        registry = environments

      if (key in registry) and not _must_replace(tag, registry, key):
        continue

      description['image'] = image['Id']
      description['tag'] = tag
      description['short_id'] = short_id
      description['nickname'] = tag or short_id
      registry[key] = description

      logger.info("Registered `%s' -> `%s (%s)'", key, tag, short_id)

    logger.debug("Found %d environments and %d database environments", len(environments),
                 len(db_environments))

    return (environments, db_environments)


  def put_path(self, container, src, dest='/tmp', chmod=None):
    """Puts a given src path into a destination folder

    This method will copy a given ``src`` path into a ``dest`` folder. It
    optionally changes the mode of the destination folder on the container so
    it is accessible by other users than root.


    Parameters:

      container (str, dict): The container where to copy the path.

      src (str): The path, on the host, of the archive to be copied over the
        container. The path may point to a file or directory on the host. Data
        will be copied recursively.

      dest (str, Optional): A path, on the container, where to copy the input
        data. By default, we put data into the temporary directory of the
        container.

      chmod (str, Optional): An optional string to set the mode of the
        destination (files and directory). A common reason for this sort of
        thing is to set directories to 755 but files to 644 with the following
        value ``u+rwX,go+rX,go-w``. This is applied recursively (``chmod
        -R``) to ``dest`` on the container. If you set this value to
        ``None``, then the chmod command will not be run.

    """

    # The docker API only accepts in-memory tar archives. Tar data is binary:
    # a bytes buffer is required (a text buffer fails on Python 3)
    buf = six.BytesIO()
    with tarfile.open(mode='w', fileobj=buf) as tar:
      tar.add(src, arcname=os.path.basename(src))
    archive = buf.getvalue()

    # Place the tarball into the container
    logger.debug("[docker] archive -> %s@%s", container['Id'][:12], dest)
    self.client.put_archive(container, dest, archive)

    if chmod is not None:
      # Change permissions to access the path
      ex = self.client.exec_create(container, cmd=['chmod', '-R', chmod, dest])
      self.client.exec_start(ex) #waits until it is executed


  def create_container(self, image, command, tmp_path=None, host_args=None,
      **args):
    """Prepares the docker container for running the user code


    Parameters:

      image (str): A unique identifier for the image to create. If it is the
        key of a known (execution or database) environment, it is replaced by
        the corresponding docker image name.

      command (list): A list of strings with the command to run inside the
        container.

      tmp_path (str): A path with a file name or directory that will be
        copied into the container (inside ``/tmp``), supposedly with
        information that is used by the command.

      host_args (dict): A dictionary that will be transformed into a
        ``HostConfig`` object that will be used for the creation of the
        container.  It is the place to add/drop capabilities for the container
        or to set maximum resource usage (CPU, memory, etc). Keys accepted are
        not arbitrary. Check docker_py's manual for details.

      args (dict): A list of extra arguments to pass to the underlying
        ``create_container()`` call from the docker Python API. If it
        contains ``volumes``, that dictionary is used as the ``binds`` of the
        host configuration and replaced by the list of its ``bind`` values.


    Returns:

      docker.Container: The container, already preloaded and ready to run. The
        container is not started by this method.

    """

    # replace by a real image name; ``image in self`` is also true for
    # database environments, which env2docker() does not know about
    if image in self.environments:
      image = self.env2docker(image)
    elif image in self.db_environments:
      image = self._image_name(self.db_environments[image])

    config_args = dict(
        log_config = docker.utils.LogConfig(type='',
          config={'max-size': '1M', 'max-file': '1'}),
        )

    if host_args is not None:
      config_args.update(host_args)

    if 'volumes' in args:
      config_args['binds'] = args['volumes']
      args['volumes'] = [ v['bind'] for v in args['volumes'].values() ]

    # see bug: https://github.com/docker/docker-py/issues/1195
    # fix on docker-engine 1.13.0 and superior
    # see changelog: https://github.com/docker/docker/blob/master/CHANGELOG.md
    args['network_disabled'] = False

    # for HostConfig (keys are not arbitrary) - see:
    # http://docker-py.readthedocs.io/en/latest/hostconfig/
    args['host_config'] = self.client.create_host_config(**config_args)

    logger.debug("[docker] create_container %s %s", image, ' '.join(command))
    container = self.client.create_container(image=image, command=command,
        **args)

    # track it right away, so teardown() removes it even if the copy fails
    self.containers.append(container)

    if tmp_path is not None:
      self.put_path(container, tmp_path)

    return container


  def start(self, container):
    '''Starts a given container'''

    logger.debug("[docker] start %s", container['Id'][:12])
    self.client.start(container)


  def status(self, container):
    '''Checks the status of a given container

    Returns:

      str: The ``Status`` entry of the container state as reported by docker
        or ``None``, if the container cannot be inspected.

    '''

    try:
      logger.debug("[docker] inspect %s", container['Id'][:12])
      state = self.client.inspect_container(container)
      return state['State']['Status']
    except Exception as e:
      # best-effort: callers interpret ``None`` as "unknown"
      logger.debug("[docker] inspect failed: %s", e)
      return None


  def rm(self, container):
    '''Removes a given container. If it is not done, kill it first'''

    status = self.status(container)

    if status not in ('created', 'exited'):
      logger.warning("Killing container `%s' which is on state `%s'",
          container['Id'], status)
      self.client.kill(container)

    logger.debug("[docker] rm %s", container['Id'][:12])
    self.client.remove_container(container)

    # the container may not have been created through this object
    if container in self.containers:
      self.containers.remove(container)


  def get_statusoutput(self, image, command, **kwargs):
    '''Runs a command and retrieves status and output

    A container is created from ``image``, started, waited for and finally
    removed. Extra keyword arguments go to :py:meth:`create_container`.


    Returns:

      tuple: ``(status, output)`` as returned by the ``wait()`` and ``logs()``
        calls of the docker client or ``(1, None)`` if anything fails.

    '''

    container = None
    try:
      container = self.create_container(image=image, command=command, **kwargs)
      self.start(container)
      status = self.client.wait(container)
      output = self.client.logs(container)
    except Exception:
      return 1, None
    finally:
      if container is not None: self.rm(container)

    return status, output
André Anjos's avatar
André Anjos committed
440
441


442
443
class Popen(object):
  '''Process-like manager for asynchronously executing user code in a container

  The container is created and started at construction time and runs in the
  background. Its standard output and standard error are retrieved, on
  demand, from the logs kept by docker (see :py:attr:`stdout` and
  :py:attr:`stderr`).

  Objects of this class can be used as context managers: the container is
  removed on exit.


  Parameters:

    host (:py:class:`Host`): The docker host where to start the container.
      The host must be properly initialized, including starting the
      appropriate docker-machine, if that is the case (environment setup).

    image (str): A string uniquely identifying the image from which to start
      the container

    command (list): A list of strings with the command to run inside the
      container.

    tmp_archive (str, Optional): A path, on the host, handed to
      :py:meth:`Host.create_container` as ``tmp_path``: it is copied into the
      temporary directory of the container, supposedly with information that
      is used by the command.

    virtual_memory_in_megabytes (int, Optional): The maximum amount of memory
      the user process can consume on the host. If not specified, a memory
      limit is not set.

    max_cpu_percent (float, Optional): The maximum amount of CPU the user
      process may consume on the host. The value ``100`` equals to using 100%
      of a single core. If not specified, then a CPU limitation is not put in
      place.

    args (dict): Extra keyword arguments for
      :py:meth:`Host.create_container`. The keys ``user``, ``tty``,
      ``detach`` and ``stdin_open`` are always overwritten.

  '''

  def __init__(self, host, image, command, tmp_archive=None,
      virtual_memory_in_megabytes=0, max_cpu_percent=0, **args):

    self.host = host

    args.update({
        'user': 'root', #user `nobody' cannot access the tmp archive...
        'tty': False,
        'detach': True,
        'stdin_open': False,
        })

    host_args = dict()

    if virtual_memory_in_megabytes:
      # For this to work properly, memory swap limitation has to be enabled on
      # the kernel. This typically goes by setting "cgroup_enable=memory" as a
      # boot parameter to kernels which are compiled with this support.
      # More info: https://docs.docker.com/engine/installation/linux/ubuntulinux/#/enable-memory-and-swap-accounting

      host_args['mem_limit'] = str(virtual_memory_in_megabytes) + 'm'
      host_args['memswap_limit'] = host_args['mem_limit']
      logger.debug('Setting maximum memory to %dMB',
          virtual_memory_in_megabytes)

    if max_cpu_percent:
      # The period corresponds to the scheduling interval for the CFS in Linux
      # The quota corresponds to a fraction or a multiple of the period, the
      # container will get. A quota that is 2x the period gets the container up
      # to 200% cpu time (2 cores). If the quota is 0.5x the period, the
      # container gets up to 50% the cpu time. Each core represents 100%. A
      # system with 2 cores has 200% computing power.
      #
      # More info:
      # https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt
      #
      # For this to work properly, CPU bandwidth provisioning for the Linux CFS
      # must be enabled on the kernel. More info on how to do it: http://www.blaess.fr/christophe/2012/01/07/linux-3-2-cfs-cpu-bandwidth-english-version/
      #
      # If your system is running on a virtual machine, having more cores
      # available to docker engine normally translates to more precise
      # scheduling.
      period = 100000 #microseconds

      quota = max_cpu_percent/100.
      host_args['cpu_period'] = period
      host_args['cpu_quota'] = int(period * quota)
      logger.debug('Setting CPU quota to %d%%', max_cpu_percent)

    # creates the container
    self.container = self.host.create_container(image=image,
        command=command, tmp_path=tmp_archive,
        host_args=host_args, **args)

    # Starts the container
    self.host.start(self.container)

    # No statistics were collected so far: the first call to statistics()
    # has no previous CPU reading to compare against
    self.previous_stats = None


  def __enter__(self):
    return self


  def __exit__(self, *exc):
    self.rm()


  @property
  def pid(self):
    '''The first 12 characters of the container identifier'''

    return self.container['Id'][:12]


  def wait(self, timeout=None):
    '''Waits until the container stops

    This call is forwarded to the ``wait()`` method of the docker client of
    the host.


    Parameters:

      timeout (float, Optional): A timeout in seconds to wait for the user
        process to finish. If a timeout value is not given, waits forever.


    Returns:

      int: Returns the status code of the process


    Raises:

      requests.exceptions.ReadTimeout: if the process times out

    '''

    return self.host.client.wait(self.container, timeout)


  @property
  def stdout(self):
    '''Returns the stdout'''

    return self.host.client.logs(self.container, stdout=True, stderr=False)


  @property
  def stderr(self):
    '''Returns the stderr'''

    return self.host.client.logs(self.container, stdout=False, stderr=True)


  def rm(self):
    '''Remove the container. If it is not stopped yet, kill it first'''

    self.host.rm(self.container)


  def status(self):
    '''Returns my own "docker" status'''

    return self.host.status(self.container)


  def kill(self):
    '''Kills the container, if it is currently running'''

    if self.status() == 'running':
        self.host.client.kill(self.container)


  def _stats(self):
    '''Returns a single (non-streamed), decoded statistics sample'''

    return self.host.client.stats(self.container, decode=True, stream=False)


  def statistics(self):
    '''Returns usage statistics of the container, as reported by docker

    The CPU usage is computed against the sample collected by the previous
    call to this method (if any). The current sample is then stored for the
    next call.


    Returns:

      stats (dict): A dictionary object with all collected statistics, under
        the keys ``cpu`` and ``memory``

    '''

    data = self._stats()

    # If CPU statistics can't be retrieved
    if 'system_cpu_usage' not in data['cpu_stats']:
        data['cpu_stats'] = dict(data['precpu_stats'])

    # If memory statistics can't be retrieved
    if not data['memory_stats']:
        data['memory_stats'] = dict(
            limit=0,
            max_usage=0
        )

    previous_cpu = self.previous_stats['cpu_stats'] \
        if self.previous_stats else None

    # merge statistics
    retval = dict(
            cpu=stats.cpu_statistics(previous_cpu, data['cpu_stats']),
            memory=stats.memory_statistics(data['memory_stats']),
            )

    self.previous_stats = data

    return retval