dock.py 21.4 KB
Newer Older
André Anjos's avatar
André Anjos committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


'''Implementation of docker-based asynchronous execution of user code

Provides :py:class:`Host`, which connects to a docker daemon and discovers the
BEAT environments installed there as docker images, and :py:class:`Popen`, a
process-like wrapper around a single docker container.
'''

import os
import tarfile

import logging
logger = logging.getLogger(__name__)

import six
import docker
import requests
import simplejson
import socket

from . import stats


class Host(object):
  '''An object of this class can connect to the docker host and resolve stuff

  A :py:class:`Host` keeps a docker client (created by :py:meth:`setup`), the
  BEAT environments discovered on the daemon and the list of containers it
  created, so they can be removed by :py:meth:`teardown`. It can be used as a
  context manager (``setup()`` on enter, ``teardown()`` on exit).


  Parameters:

    host (str, Optional): Name of the docker host. Only used when ``port`` is
      also given: both are then combined into ``base_url="http://host:port"``.

    port (int, Optional): Port of the docker host (see ``host``).

    images_cache (str, Optional): Path of a JSON file in which the output of
      the ``describe`` command of each image is cached between runs.

    kwargs: Any other keyword argument is forwarded to ``docker.Client()``
      when :py:meth:`setup` is called.

  '''

  # Class-level (shared by all instances): maps an image name to the parsed
  # output of its ``describe`` command
  images_cache = {}


  def __init__(self, **kwargs):

    self.client = None
    self.containers = []  # containers created by (and to be removed by) us

    if 'host' in kwargs and 'port' in kwargs:
      host = kwargs.pop('host')
      port = kwargs.pop('port')
      kwargs['base_url'] = "http://%s:%s" % (host, port)

    self.images_cache_filename = kwargs.pop('images_cache', None)

    self.kwargs = kwargs
    self.environments = {}
    self.db_environments = {}


  def setup(self, raise_on_errors=True):
    '''Connects to the docker daemon and discovers the available environments

    Parameters:

      raise_on_errors (bool, Optional): Forwarded to
        :py:meth:`_discover_environments`.

    '''

    # settings derived from the process environment (see
    # ``docker.utils.kwargs_from_env``) override the constructor arguments
    self.kwargs.update(**docker.utils.kwargs_from_env(assert_hostname=False))

    self.client = docker.Client(**self.kwargs)

    if (self.images_cache_filename is not None) and \
        os.path.exists(self.images_cache_filename):
      with open(self.images_cache_filename, 'r') as f:
        Host.images_cache = simplejson.load(f)

    (self.environments, self.db_environments) = \
        self._discover_environments(raise_on_errors)

    # persist the (possibly extended) cache for the next run
    if self.images_cache_filename is not None:
      with open(self.images_cache_filename, 'w') as f:
        simplejson.dump(Host.images_cache, f, indent=4)


  def __contains__(self, key):
    return (key in self.environments) or (key in self.db_environments)


  def __str__(self):
    return self.kwargs.get('base_url', 'Host')


  def env2docker(self, key):
    '''Returns a nice docker image name given a BEAT environment key

    The image tag is preferred; the short image id is used when there is none.

    Raises:

      KeyError: if ``key`` is not a known (non-database) environment

    '''

    attrs = self.environments[key]

    if attrs['tag'] is not None:
      return attrs['tag']

    return attrs['short_id']


  def db2docker(self, db_names):
    '''Returns a nice docker image name given a database name

    Parameters:

      db_names (list): Names of databases that must all be provided by the
        selected database environment.


    Returns:

      str: The tag (or, if there is none, the short id) of the first database
        environment providing all of ``db_names``.


    Raises:

      IndexError: if no database environment provides all of ``db_names``

    '''

    candidates = [ x for x in self.db_environments.values()
        if all(name in x['databases'] for name in db_names) ]
    attrs = candidates[0]

    if attrs['tag'] is not None:
      return attrs['tag']

    return attrs['short_id']


  def teardown(self):
    '''Removes (killing if needed) all containers created through this host'''

    # iterate over a copy: rm() removes each container from self.containers,
    # and mutating the list being iterated would skip every other entry
    for container in list(self.containers):
      self.rm(container)


  def __enter__(self):
    self.setup()
    return self


  def __exit__(self, *exc):
    self.teardown()


  @property
  def ip(self):
    '''The IP address of the local interface used for outgoing traffic

    NOTE(review): this is the address of the machine running this code; it
    only corresponds to the docker host if the daemon runs locally.
    '''

    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
      s.connect(('8.8.8.8', 1))  # connecting to a UDP address doesn't send packets
      return s.getsockname()[0]
    finally:
      s.close()


  def _discover_environments(self, raise_on_errors=True):
    '''Returns a dictionary containing information about docker environments

    Only images whose first repository tag contains ``beat`` are considered.
    Each of them is run with the ``describe`` command, whose JSON output
    describes the environment. Descriptions containing a ``databases`` entry
    are registered as database environments.


    Parameters:

      raise_on_errors (bool, Optional): If we should raise an exception
        (``RuntimeError``) in case installed environments override each other
        and we can't know which to use.


    Returns:

      tuple: ``(environments, db_environments)``, two dictionaries mapping
        ``<name> (<version>)`` keys to the image descriptions, augmented with
        the keys ``image``, ``tag``, ``short_id`` and ``nickname``.


    Raises:

      RuntimeError: if you set ``raise_on_errors`` and I found environments
        that override each other for their description keys (``<name>
        (<version>)``), which should be unique.

    '''

    def _describe(image):
      '''Tries to run the "describe" app on the image, collect results

      Successful results are memoized in ``Host.images_cache``. Returns an
      empty dict if the command fails or its output cannot be parsed.
      '''

      if image in Host.images_cache:
        return Host.images_cache[image]

      status, output = self.get_statusoutput(image, ['describe'])
      if status == 0:
        try:
          infos = simplejson.loads(output)
          Host.images_cache[image] = infos
          return infos
        except Exception as e:
          logger.warning("Ignoring potential environment at `%s' since " \
                  "`describe' output cannot be parsed: %s", image, str(e))
      return {}


    def _tag_version(image_tag):
      '''Returns ``<version>`` from a ``<repository>/<name>:<version>`` tag

      Returns ``None`` if the tag has no ``/`` or if its last component has no
      ``:``.
      '''

      # NOTE(review): tags without a ``/`` (e.g. ``beat.env:1.0``) never yield
      # a version - confirm this is intended
      parts = image_tag.split('/')
      if len(parts) > 1:
        parts = parts[-1].split(':')
        if len(parts) > 1:
          return parts[-1]
      return None


    def _must_replace(image_tag, environments, key):
      '''Decides if ``image_tag`` replaces the image registered under ``key``

      A ``latest`` version is preferred; otherwise numeric dotted versions are
      compared and the highest wins. If only one side has a version, that side
      wins; if neither has one, the new image replaces the old with a warning.
      Returns ``True`` to replace, ``False`` to keep the registered entry.
      '''

      # this check avoids we do a new environment and, by mistake, override
      # it with a previous version or the contrary.
      if raise_on_errors:
        raise RuntimeError("Environments at `%s' and `%s' have the " \
                "same name (`%s'). Distinct environments must be " \
                "uniquely named. Fix this and re-start." % \
                (image_tag, environments[key]['image'], key))

      new_version = _tag_version(image_tag)
      previous_version = _tag_version(environments[key]['tag'])

      replacement = False
      keep = False

      if (new_version is not None) and (previous_version is not None):
        if new_version == 'latest':
          replacement = True
        elif previous_version == 'latest':
          keep = True
        else:
          try:
            new_tuple = tuple(int(x) for x in new_version.split('.'))
          except ValueError:
            # the new version is not numeric: stick with the registered one
            keep = True
          else:
            try:
              previous_tuple = tuple(int(x) for x in previous_version.split('.'))
            except ValueError:
              # only the new version is numeric: prefer it
              replacement = True
            else:
              if new_tuple > previous_tuple:
                replacement = True
              else:
                keep = True

      elif new_version is not None:
        replacement = True
      elif previous_version is not None:
        keep = True

      if replacement:
        logger.debug("Overriding **existing** environment `%s' in image `%s'", key, environments[key]['tag'])
      elif keep:
        logger.debug("Environment `%s' already existing in image `%s', we'll keep it", key, environments[key]['tag'])
        return False
      else:
        logger.warning("Overriding **existing** environment `%s' image " \
                "with `%s'. To avoid this warning make " \
                "sure your docker images do not contain environments " \
                "with the same names", key, environments[key]['image'])

      return True


    environments = {}
    db_environments = {}

    for image in self.client.images():
      # call the "describe" application on each existing image with "beat" in
      # its name
      tag = image['RepoTags'][0] if image['RepoTags'] else None
      if (tag is None) or ('beat' not in tag):
        continue

      # ids look like ``sha256:<hex>``; [-1] also copes with a bare ``<hex>``
      short_id = image['Id'].split(':')[-1][:12]
      logger.debug("Checking image `%s' (%s)...", tag, short_id)
      description = _describe(tag or short_id)
      if not description:
        continue

      key = description['name'] + ' (' + description['version'] + ')'

      target = db_environments if 'databases' in description else environments

      if (key in target) and not _must_replace(tag, target, key):
        continue

      target[key] = description
      target[key]['image'] = image['Id']
      target[key]['tag'] = tag
      target[key]['short_id'] = short_id
      target[key]['nickname'] = tag or short_id

      logger.info("Registered `%s' -> `%s (%s)'", key, tag, short_id)

    logger.debug("Found %d environments and %d database environments", len(environments),
                 len(db_environments))

    return (environments, db_environments)


  def put_path(self, container, src, dest='/tmp', chmod=None):
    """Puts a given src path into a destination folder

    This method will copy a given ``src`` path into a ``dest`` folder. It
    optionally changes the mode of the resulting path on the container so it
    is accessible by other users than root.


    Parameters:

      container (dict): The container where to copy the path, as returned by
        :py:meth:`create_container` (it must have an ``Id`` key).

      src (str): The path, on the host, of the archive to be copied over the
        container. The path may point to a file or directory on the host. Data
        will be copied recursively.

      dest (str, Optional): A path, on the container, where to copy the input
        data. By default, we put data into the temporary directory of the
        container.

      chmod (str, Optional): An optional string to set the mode of the
        resulting path (files and directory). A common reason for this sort of
        thing is to set directories to 755 but files to 644 with the following
        value ``u+rwX,go+rX,go-w``. This will be applied recursively to the
        resulting path on the container (``dest`` joined with the basename of
        ``src``). If you set this value to ``None``, then the chmod command
        will not be run.

    """

    # The docker API only accepts in-memory tar archives. tarfile writes
    # bytes, so a bytes buffer is required (a text StringIO fails on Py3).
    buf = six.BytesIO()
    with tarfile.open(mode='w', fileobj=buf) as tar:
      tar.add(src, arcname=os.path.basename(src))
    archive = buf.getvalue()

    # Place the tarball into the container
    path = os.path.join(dest, os.path.basename(src))
    logger.debug("[docker] archive -> %s@%s", container['Id'][:12], dest)
    self.client.put_archive(container, dest, archive)

    if chmod is not None:
      # Change permissions of the copied path only (never of ``dest`` itself,
      # which defaults to the container's /tmp)
      ex = self.client.exec_create(container, cmd=['chmod', '-R', chmod, path])
      self.client.exec_start(ex)  # waits until it is executed


  def create_container(self, image, command, tmp_path=None, host_args=None,
      **args):
    """Prepares the docker container for running the user code


    Parameters:

      image (str): A unique identifier for the image to create. If it is a
        known environment key, it is replaced by the matching image name.

      command (list): A list of strings with the command to run inside the
        container.

      tmp_path (str): A path with a file name or directory that will be
        copied into the container (inside ``/tmp``), supposedly with
        information that is used by the command.

      host_args (dict): A dictionary that will be transformed into a
        ``HostConfig`` object that will be used for the creation of the
        container.  It is the place too add/drop capabilities for the container
        or to set maximum resource usage (CPU, memory, etc). Keys accepted are
        not arbitrary. Check docker_py's manual for details.

      args (dict): A list of extra arguments to pass to the underlying
        ``create_container()`` call from the docker Python API. If a
        ``volumes`` dictionary is given, it is used as the ``binds`` of the
        host configuration and replaced by the list of its ``bind`` targets.


    Returns:

      dict: The container handle returned by the docker client (with an
        ``Id`` key), already preloaded and ready to run. The container is not
        started by this method. It is tracked for removal by
        :py:meth:`teardown`.

    """

    if image in self: #replace by a real image name
      image = self.env2docker(image)

    # cap the size of the container logs
    config_args = dict(
        log_config = docker.utils.LogConfig(type='',
          config={'max-size': '1M', 'max-file': '1'}),
        )

    if host_args is not None:
      config_args.update(host_args)

    if 'volumes' in args:
      config_args['binds'] = args['volumes']
      args['volumes'] = [ v['bind'] for v in args['volumes'].values() ]

    # see bug: https://github.com/docker/docker-py/issues/1195
    # fix on docker-engine 1.13.0 and superior
    # see changelog: https://github.com/docker/docker/blob/master/CHANGELOG.md
    args['network_disabled'] = False

    # for HostConfig (keys are not arbitrary) - see:
    # http://docker-py.readthedocs.io/en/latest/hostconfig/
    args['host_config'] = self.client.create_host_config(**config_args)

    logger.debug("[docker] create_container %s %s", image, ' '.join(command))
    container = self.client.create_container(image=image, command=command,
        **args)
    self.containers.append(container)

    if tmp_path is not None:
      self.put_path(container, tmp_path)

    return container


  def start(self, container):
    '''Starts a given container'''

    logger.debug("[docker] start %s", container['Id'][:12])
    self.client.start(container)


  def status(self, container):
    '''Checks the status of a given container

    Returns:

      str: The ``State.Status`` reported by docker inspection, or ``None`` if
        the container could not be inspected.

    '''

    try:
      logger.debug("[docker] inspect %s", container['Id'][:12])
      z = self.client.inspect_container(container)
      return z['State']['Status']
    except Exception:
      return None


  def rm(self, container):
    '''Removes a given container. If it is not done, kill it first'''

    status = self.status(container)

    if status not in ('created', 'exited'):
      logger.warning("Killing container `%s' which is on state `%s'",
          container['Id'], status)
      self.client.kill(container)

    logger.debug("[docker] rm %s", container['Id'][:12])
    self.client.remove_container(container)

    # containers not created through this host are not tracked
    if container in self.containers:
      self.containers.remove(container)


  def get_statusoutput(self, image, command, **kwargs):
    '''Runs a command and retrieves status and output

    A container is created from ``image``, started, waited for and removed.


    Returns:

      tuple: ``(status, output)`` as returned by the docker client's ``wait``
        and ``logs`` calls, or ``(1, None)`` if any step fails.

    '''

    container = None
    try:
      container = self.create_container(image=image, command=command, **kwargs)
      self.start(container)
      status = self.client.wait(container)
      output = self.client.logs(container)
    except Exception as e:
      # best-effort: report failure through the status, but keep a trace
      logger.debug("[docker] running %s on `%s' failed: %s", command, image, e)
      return 1, None
    finally:
      if container is not None:
        self.rm(container)

    return status, output
class Popen(object):
  '''Process-like manager for asynchronously executing user code in a container

  The container is created and started as soon as the object is constructed.
  Its standard output and standard error are obtained from the docker logs of
  the container through :py:attr:`stdout` and :py:attr:`stderr`. Using the
  object as a context manager removes the container on exit.


  Parameters:

    host (:py:class:`Host`): The docker host where to start the container.
      The host must be properly initialized, including starting the
      appropriate docker-machine, if that is the case (environment setup).

    image (str): A string uniquely identifying the image from which to start
      the container

    command (list): A list of strings with the command to run inside the
      container.

    tmp_archive (str, Optional): A path (file or directory), on the host, to
      copy into the temporary directory of the container, supposedly with
      information that is used by the command.

    virtual_memory_in_megabytes (int, Optional): The maximum amount of memory
      the user process can consume on the host. If not specified (or ``0``),
      a memory limit is not set.

    max_cpu_percent (float, Optional): The maximum amount of CPU the user
      process may consume on the host. The value ``100`` equals to using 100%
      of a single core. If not specified (or ``0``), then a CPU limitation is
      not put in place.

    args: Extra keyword arguments forwarded to the container creation. The
      keys ``user``, ``tty``, ``detach`` and ``stdin_open`` are always
      overridden.

  '''

  def __init__(self, host, image, command, tmp_archive=None,
      virtual_memory_in_megabytes=0, max_cpu_percent=0, **args):

    self.host = host

    args.update({
        'user': 'root', #user `nobody' cannot access the tmp archive...
        'tty': False,
        'detach': True,
        'stdin_open': False,
        })

    host_args = dict()

    if virtual_memory_in_megabytes:
      # For this to work properly, memory swap limitation has to be enabled on
      # the kernel. This typically goes by setting "cgroup_enable=memory" as a
      # boot parameter to kernels which are compiled with this support.
      # More info: https://docs.docker.com/engine/installation/linux/ubuntulinux/#/enable-memory-and-swap-accounting
      host_args['mem_limit'] = str(virtual_memory_in_megabytes) + 'm'
      # same value for memory+swap: the container gets no extra swap
      host_args['memswap_limit'] = host_args['mem_limit']
      logger.debug('Setting maximum memory to %dMB',
          virtual_memory_in_megabytes)

    if max_cpu_percent:
      # The period corresponds to the scheduling interval for the CFS in Linux
      # The quota corresponds to a fraction or a multiple of the period, the
      # container will get. A quota that is 2x the period gets the container up
      # to 200% cpu time (2 cores). If the quota is 0.5x the period, the
      # container gets up to 50% the cpu time. Each core represents 100%. A
      # system with 2 cores has 200% computing power.
      #
      # More info:
      # https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt
      #
      # For this to work properly, CPU bandwidth provisioning for the Linux CFS
      # must be enabled on the kernel. More info on how to do it: http://www.blaess.fr/christophe/2012/01/07/linux-3-2-cfs-cpu-bandwidth-english-version/
      #
      # If your system is running on a virtual machine, having more cores
      # available to docker engine normally translates to more precise
      # scheduling.
      period = 100000 #microseconds

      quota = max_cpu_percent/100.
      host_args['cpu_period'] = period
      host_args['cpu_quota'] = int(period * quota)
      logger.debug('Setting CPU quota to %d%%', max_cpu_percent)

    # creates the container
    self.container = self.host.create_container(image=image,
        command=command, tmp_path=tmp_archive,
        host_args=host_args, **args)

    # Starts the container
    self.host.start(self.container)

    # raw sample of the last statistics() call; used as CPU reference by the
    # next call (no sample taken yet)
    self.previous_stats = None


  def __enter__(self):
    return self


  def __exit__(self, *exc):
    self.rm()


  @property
  def pid(self):
    '''Short (12 characters) id of the container, used as a process id'''
    return self.container['Id'][:12]


  def wait(self, timeout=None):
    '''Blocks until the container finishes

    Delegates to the docker client's ``wait`` call on the container.


    Parameters:

      timeout (float, Optional): A timeout in seconds to wait for the user
        process to finish. If a timeout value is not given, waits forever.


    Returns:

      int: Returns the status code of the process


    Raises:

      requests.exceptions.ReadTimeout: if the process times out

    '''

    return self.host.client.wait(self.container, timeout)


  @property
  def stdout(self):
    '''Returns the stdout (read from the container logs)'''

    return self.host.client.logs(self.container, stdout=True, stderr=False)


  @property
  def stderr(self):
    '''Returns the stderr (read from the container logs)'''

    return self.host.client.logs(self.container, stdout=False, stderr=True)


  def rm(self):
    '''Remove the container. If it is not stopped yet, kill it first'''

    self.host.rm(self.container)


  def status(self):
    '''Returns my own "docker" status'''

    return self.host.status(self.container)


  def kill(self):
    '''Stop container, if it is running'''

    if self.status() == 'running':
      self.host.client.kill(self.container)


  def _stats(self):
    '''Takes a single (non-streamed) sample of the docker stats'''

    return self.host.client.stats(self.container, decode=True, stream=False)


  def statistics(self):
    '''Returns resource usage statistics of the container

    Takes one sample of the docker stats of the container and converts it with
    the helpers of the ``stats`` module. The CPU statistics are computed with
    the CPU sample of the previous call (if any) as reference.


    Returns:

      stats (dict): A dictionary with keys ``cpu`` and ``memory`` holding the
        collected statistics


    Raises:

      Any exception raised by the docker client while sampling is propagated.

    '''

    data = self._stats()

    # If CPU statistics can't be retrieved, fall back on the pre-CPU sample
    if 'system_cpu_usage' not in data['cpu_stats']:
      data['cpu_stats'] = dict(data['precpu_stats'])

    # If memory statistics can't be retrieved
    if not data['memory_stats']:
      data['memory_stats'] = dict(
          limit=0,
          max_usage=0
      )

    previous_cpu = self.previous_stats['cpu_stats'] \
        if self.previous_stats else None

    # merge statistics
    retval = dict(
            cpu=stats.cpu_statistics(previous_cpu, data['cpu_stats']),
            memory=stats.memory_statistics(data['memory_stats']),
            )

    self.previous_stats = data

    return retval