#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


'''Implementation of subprocess-based asynchronous running with greenlets
'''

import io
import logging
import os
import socket
import tarfile

import docker
import requests
import simplejson
import six

from . import stats

logger = logging.getLogger(__name__)


class Host(object):
  '''Connects to a docker host and resolves BEAT environments to images

  The object keeps track of every container it creates (``self.containers``)
  so that :py:meth:`teardown` (or leaving the ``with`` block) can remove them.


  Parameters:

    host (str, Optional): Name or address of the docker daemon. Only taken
      into account if ``port`` is given as well, in which case both are
      replaced by a ``base_url`` entry of the form ``http://<host>:<port>``.

    port (int, Optional): TCP port of the docker daemon (see ``host``).

    kwargs (dict): Any other keyword is stored untouched and handed over to
      ``docker.Client()`` when :py:meth:`setup` is called.

  '''

  def __init__(self, **kwargs):

    self.client = None
    self.containers = []

    if 'host' in kwargs and 'port' in kwargs:
      host = kwargs.pop('host')
      port = kwargs.pop('port')
      kwargs['base_url'] = "http://%s:%s" % (host, port)

    self.kwargs = kwargs
    self.environments = {}


  def setup(self, raise_on_errors=True):
    '''Creates the docker client and discovers the available environments

    Parameters:

      raise_on_errors (bool, Optional): Forwarded to
        :py:meth:`_discover_environments`.

    '''

    self.client = docker.Client(**self.kwargs)
    self.environments = self._discover_environments(raise_on_errors)


  def __contains__(self, key):
    return key in self.environments


  def __str__(self):
    # no 'base_url' is stored when the host was built without host/port
    return self.kwargs.get('base_url', '<default>')


  def env2docker(self, key):
    '''Returns a nice docker image name given a BEAT environment key'''

    attrs = self.environments[key]

    if attrs['tag'] is not None: return attrs['tag']
    return attrs['short_id']


  def teardown(self):
    '''Removes every container still tracked by this host'''

    # rm() takes the container out of self.containers: iterate over a copy,
    # otherwise every other container would be skipped
    for container in list(self.containers): self.rm(container)


  def __enter__(self):
    self.setup()
    return self


  def __exit__(self, *exc):
    self.teardown()


  @property
  def ip(self):
    '''The local address of a UDP socket connected towards ``8.8.8.8``'''

    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
      s.connect(('8.8.8.8', 1))  # connecting to a UDP address doesn't send packets
      return s.getsockname()[0]
    finally:
      s.close()


  @staticmethod
  def _tag_version(tag):
    '''Returns the version suffix of ``<repo>/<name>:<version>`` or ``None``

    Only tags that contain at least one ``/`` are inspected; the version is
    whatever follows the last ``:`` of the last path component.
    '''

    parts = tag.split('/')
    if len(parts) < 2: return None

    parts = parts[-1].split(':')
    if len(parts) < 2: return None

    return parts[-1]


  @staticmethod
  def _version_numbers(version):
    '''Converts ``"1.2.3"`` into ``(1, 2, 3)``, ``None`` if not all-numeric'''

    try:
      return tuple(int(x) for x in version.split('.'))
    except ValueError:
      return None


  @classmethod
  def _prefers_new_tag(cls, new_tag, previous_tag):
    '''Decides which of two images providing the same environment wins

    Returns:

      bool: ``True`` if the image tagged ``new_tag`` should replace the one
        tagged ``previous_tag``, ``False`` if the previous one should be
        kept, ``None`` if neither tag carries a version.

    '''

    new_version = cls._tag_version(new_tag)
    previous_version = cls._tag_version(previous_tag)

    if new_version is None and previous_version is None: return None
    if previous_version is None: return True
    if new_version is None: return False

    # "latest" on the new image always wins, then "latest" on the previous
    if new_version == 'latest': return True
    if previous_version == 'latest': return False

    new_numbers = cls._version_numbers(new_version)
    if new_numbers is None: return False #cannot rank the new one: keep

    previous_numbers = cls._version_numbers(previous_version)
    if previous_numbers is None: return True #cannot rank the old one: replace

    return new_numbers > previous_numbers


  def _describe(self, image):
    '''Tries to run the "describe" app on the image, collect results'''

    status, output = self.get_statusoutput(image, ['describe'])
    if status == 0:
      try:
        return simplejson.loads(output)
      except Exception as e:
        logger.warning("Ignoring potential environment at `%s' since " \
                "`describe' output cannot be parsed: %s", image, str(e))
    return {}


  def _discover_environments(self, raise_on_errors=True):
    '''Returns a dictionary containing information about docker environments

    Every image whose first tag contains ``beat`` is asked to ``describe``
    itself. The returned dictionary maps ``<name> (<version>)`` to that
    description, extended with the ``image``, ``tag``, ``short_id`` and
    ``nickname`` keys.


    Parameters:

      raise_on_errors (bool, Optional): If we should raise an exception
        (``RuntimeError``) in case installed environments override each other
        and we can't know which to use. When unset, the image with the most
        recent version in its tag is kept (see :py:meth:`_prefers_new_tag`).


    Raises:

      RuntimeError: if you set ``raise_on_errors`` and I found environments
        that override each other for their description keys (``<name>
        (<version>)``), which should be unique.

    '''

    retval = {}

    for image in self.client.images():

      # call the "describe" application on each existing image with "beat" in
      # its name
      repo_tags = image.get('RepoTags')
      tag = repo_tags[0] if repo_tags else None

      if (tag is None) or ('beat' not in tag):
        continue

      # works with and without the "<algorithm>:" prefix on the identifier
      short_id = image['Id'].split(':')[-1][:12]
      logger.debug("Checking image `%s' (%s)...", tag, short_id)
      description = self._describe(tag)

      if not description: continue

      key = description['name'] + ' (' + description['version'] + ')'

      if key in retval:

        previous = retval[key]

        # this check avoids we do a new environment and, by mistake, override
        # it with a previous version or the contrary.
        if raise_on_errors:
          raise RuntimeError("Environments at `%s' and `%s' have the " \
                  "same name (`%s'). Distinct environments must be " \
                  "uniquely named. Fix this and re-start." % \
                  (tag, previous['image'], key))

        prefer_new = self._prefers_new_tag(tag, previous['tag'])

        if prefer_new is True:
          logger.debug("Overriding **existing** environment `%s' in image `%s'", key, previous['tag'])
        elif prefer_new is False:
          logger.debug("Environment `%s' already existing in image `%s', we'll keep it", key, previous['tag'])
          continue
        else:
          logger.warning("Overriding **existing** environment `%s' image " \
                  "with `%s' (it was `%s'). To avoid this warning make " \
                  "sure your docker images do not contain environments " \
                  "with the same names", key, image['Id'],
                  previous['image'])

      retval[key] = description
      retval[key]['image'] = image['Id']
      retval[key]['tag'] = tag
      retval[key]['short_id'] = short_id
      retval[key]['nickname'] = tag
      logger.info("Registered `%s' -> `%s (%s)'", key, tag, short_id)

    logger.debug("Found %d environments", len(retval))
    return retval


  def put_path(self, container, src, dest='/tmp', chmod=None):
    '''Puts a given src path into a destination folder

    This method will copy a given ``src`` path into a ``dest`` folder. It
    optionally changes the mode of ``dest`` on the container so it is
    accessible by other users than root.


    Parameters:

      container (str, dict): The container where to copy the path.

      src (str): The path, on the host, of the data to be copied over the
        container. The path may point to a file or directory on the host. Data
        will be copied recursively.

      dest (str, Optional): A path, on the container, where to copy the input
        data. By default, we put data into the temporary directory of the
        container.

      chmod (str, Optional): An optional string to set the mode of ``dest``
        (files and directories). A common reason for this sort of thing is to
        set directories to 755 but files to 644 with the following value
        ``u+rwX,go+rX,go-w``. This is applied recursively (``chmod -R``) to
        ``dest`` on the container. If you set this value to ``None``, then
        the chmod command will not be run.

    '''

    # The docker API only accepts in-memory tar archives. tarfile writes
    # bytes, so the buffer must be binary on both Python 2 and 3.
    buf = io.BytesIO()
    with tarfile.open(mode='w', fileobj=buf) as tar:
      tar.add(src, arcname=os.path.basename(src))
    archive = buf.getvalue()

    # Place the tarball into the container
    logger.debug("[docker] archive -> %s@%s", container['Id'][:12], dest)
    self.client.put_archive(container, dest, archive)

    if chmod is not None:
      # Change permissions to access the path
      ex = self.client.exec_create(container, cmd=['chmod', '-R', chmod, dest])
      self.client.exec_start(ex) #waits until it is executed


  def create_container(self, image, command, tmp_path=None, host_args=None,
      **args):
    '''Prepares the docker container for running the user code


    Parameters:

      image (str): A unique identifier for the image to create. If it is the
        key of a known environment, it is replaced by the matching docker
        image name.

      command (list): A list of strings with the command to run inside the
        container.

      tmp_path (str): A path with a file name or directory that will be
        copied into the container (inside ``/tmp``), supposedly with
        information that is used by the command.

      host_args (dict): A dictionary that will be transformed into a
        ``HostConfig`` object that will be used for the creation of the
        container.  It is the place to add/drop capabilities for the container
        or to set maximum resource usage (CPU, memory, etc). Keys accepted are
        not arbitrary. Check docker_py's manual for details.

      args (dict): A list of extra arguments to pass to the underlying
        ``create_container()`` call from the docker Python API.


    Returns:

      docker.Container: The container, already preloaded and ready to run. The
        container is not started by this method.

    '''

    if image in self: #replace by a real image name
      image = self.env2docker(image)

    config_args = dict(
        log_config = docker.utils.LogConfig(type='',
          config={'max-size': '1M', 'max-file': '1'}),
        )

    if host_args is not None:
      config_args.update(host_args)

    # see bug: https://github.com/docker/docker-py/issues/1195
    # fix on docker-engine 1.13.0 and superior
    # see changelog: https://github.com/docker/docker/blob/master/CHANGELOG.md
    args['network_disabled'] = False

    # for HostConfig (keys are not arbitrary) - see:
    # http://docker-py.readthedocs.io/en/latest/hostconfig/
    args['host_config'] = self.client.create_host_config(**config_args)

    logger.debug("[docker] create_container %s %s", image, ' '.join(command))
    container = self.client.create_container(image=image, command=command,
        **args)

    # track it right away so teardown() removes it even if the copy fails
    self.containers.append(container)

    if tmp_path is not None:
      self.put_path(container, tmp_path)

    return container


  def start(self, container):
    '''Starts a given container'''

    logger.debug("[docker] start %s", container['Id'][:12])
    self.client.start(container)


  def status(self, container):
    '''Checks the status of a given container

    Returns:

      str: The ``State.Status`` entry reported when inspecting the container
        or ``None`` if the inspection failed for any reason.

    '''

    try:
      logger.debug("[docker] inspect %s", container['Id'][:12])
      z = self.client.inspect_container(container)
      return z['State']['Status']
    except Exception:
      return None


  def rm(self, container):
    '''Removes a given container. If it is not done, kill it first'''

    status = self.status(container)

    if status not in ('created', 'exited'):
      logger.warning("Killing container `%s' which is on state `%s'",
          container['Id'], status)
      self.client.kill(container)

    logger.debug("[docker] rm %s", container['Id'][:12])
    self.client.remove_container(container)

    # containers not created through this host are not tracked
    if container in self.containers:
      self.containers.remove(container)


  def get_statusoutput(self, image, command, **kwargs):
    '''Runs a command and retrieves status and output

    Parameters:

      image (str): The image (or environment key) to run the command on.

      command (list): The command to run inside the container.

      kwargs (dict): Forwarded to :py:meth:`create_container`.


    Returns:

      tuple: ``(status, output)`` as returned by the ``wait()`` and ``logs()``
        calls of the docker client, or ``(1, None)`` if anything failed on the
        way. The container is removed in both cases.

    '''

    container = None
    try:
      container = self.create_container(image=image, command=command, **kwargs)
      self.start(container)
      status = self.client.wait(container)
      output = self.client.logs(container)
    except Exception:
      # best-effort call: report the failure through the status, but leave a
      # trace of what went wrong
      logger.debug("[docker] running %s on `%s' failed", command, image,
          exc_info=True)
      return 1, None
    finally:
      if container is not None: self.rm(container)

    return status, output


class Popen(object):
  '''Process-like manager for asynchronously executing user code in a container

  The container is created and started by the constructor. Its standard
  output and standard error are fetched from the docker host on demand,
  through the :py:attr:`stdout` and :py:attr:`stderr` properties.


  Parameters:

    host (:py:class:`Host`): The docker host where to start the container.
      The host must be properly initialized, including starting the
      appropriate docker-machine, if that is the case (environment setup).

    image (str): A string uniquely identifying the image from which to start
      the container

    command (list): A list of strings with the command to run inside the
      container.

    tmp_archive (str, Optional): A path (file or directory) to copy into the
      temporary directory of the container, supposedly with information that
      is used by the command.

    virtual_memory_in_megabytes (int, Optional): The maximum amount of memory
      the user process can consume on the host. If not specified, a memory
      limit is not set.

    max_cpu_percent (float, Optional): The maximum amount of CPU the user
      process may consume on the host. The value ``100`` equals to using 100%
      of a single core. If not specified, then a CPU limitation is not put in
      place.

    args (dict): Extra keywords forwarded to ``host.create_container()``. The
      ``user``, ``tty``, ``detach`` and ``stdin_open`` entries are always
      overwritten.

  '''

  def __init__(self, host, image, command, tmp_archive=None,
      virtual_memory_in_megabytes=0, max_cpu_percent=0, **args):

    self.host = host

    args.update({
        'user': 'root', #user `nobody' cannot access the tmp archive...
        'tty': False,
        'detach': True,
        'stdin_open': False,
        })

    host_args = dict()

    if virtual_memory_in_megabytes:
      # For this to work properly, memory swap limitation has to be enabled on
      # the kernel. This typically goes by setting "cgroup_enable=memory" as a
      # boot parameter to kernels which are compiled with this support.
      # More info: https://docs.docker.com/engine/installation/linux/ubuntulinux/#/enable-memory-and-swap-accounting

      host_args['mem_limit'] = str(virtual_memory_in_megabytes) + 'm'
      host_args['memswap_limit'] = host_args['mem_limit']
      logger.debug('Setting maximum memory to %dMB',
          virtual_memory_in_megabytes)

    if max_cpu_percent:
      # The period corresponds to the scheduling interval for the CFS in Linux
      # The quota corresponds to a fraction or a multiple of the period, the
      # container will get. A quota that is 2x the period gets the container up
      # to 200% cpu time (2 cores). If the quota is 0.5x the period, the
      # container gets up to 50% the cpu time. Each core represents 100%. A
      # system with 2 cores has 200% computing power.
      #
      # More info:
      # https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt
      #
      # For this to work properly, CPU bandwidth provisioning for the Linux CFS
      # must be enabled on the kernel. More info on how to do it: http://www.blaess.fr/christophe/2012/01/07/linux-3-2-cfs-cpu-bandwidth-english-version/
      #
      # If your system is running on a virtual machine, having more cores
      # available to docker engine normally translates to more precise
      # scheduling.
      period = 100000 #microseconds

      quota = max_cpu_percent/100.
      host_args['cpu_period'] = period
      host_args['cpu_quota'] = int(period * quota)
      logger.debug('Setting CPU quota to %d%%', max_cpu_percent)

    # creates the container
    self.container = self.host.create_container(image=image,
        command=command, tmp_path=tmp_archive,
        host_args=host_args, **args)

    # Starts the container
    self.host.start(self.container)

    # No statistics were sampled yet, see statistics()
    self.previous_stats = None


  def __enter__(self):
    return self


  def __exit__(self, *exc):
    self.rm()


  @property
  def pid(self):
    '''The first 12 characters of the container identifier'''

    return self.container['Id'][:12]


  def wait(self, timeout=None):
    '''Waits until the container stops

    This is a thin wrapper around the ``wait()`` call of the docker client.


    Parameters:

      timeout (float, Optional): A timeout in seconds to wait for the user
        process to finish. It is handed over as is to the docker client.


    Returns:

      The value returned by the ``wait()`` call of the docker client (the
      exit status of the container).


    Raises:

      requests.exceptions.ReadTimeout: if the process times out
        (NOTE(review): raised by the docker client, not by this method -
        confirm against the docker-py version in use)

    '''

    return self.host.client.wait(self.container, timeout)


  @property
  def stdout(self):
    '''Returns the stdout'''

    return self.host.client.logs(self.container, stdout=True, stderr=False)


  @property
  def stderr(self):
    '''Returns the stderr'''

    return self.host.client.logs(self.container, stdout=False, stderr=True)


  def rm(self):
    '''Remove the container. If it is not stopped yet, kill it first'''

    self.host.rm(self.container)


  def status(self):
    '''Returns my own "docker" status'''

    return self.host.status(self.container)


  def kill(self):
    '''Kills the container, if it is currently running'''

    if self.status() == 'running':
      self.host.client.kill(self.container)


  def _stats(self):
    '''Fetches one (non-streamed, decoded) statistics sample from docker'''

    return self.host.client.stats(self.container, decode=True, stream=False)


  def statistics(self):
    '''Returns usage statistics sampled from the docker host

    The CPU figures are computed against the sample taken by the previous
    call to this method (or ``None`` on the first call).


    Returns:

      stats (dict): A dictionary with two entries: ``cpu``, as computed by
        ``stats.cpu_statistics()`` and ``memory``, as computed by
        ``stats.memory_statistics()``.

    '''

    data = self._stats()

    previous_cpu = self.previous_stats['cpu_stats'] \
        if self.previous_stats else None

    # merge statistics
    retval = dict(
            cpu=stats.cpu_statistics(previous_cpu, data['cpu_stats']),
            memory=stats.memory_statistics(data['memory_stats']),
            )

    # remembered so the next call can compute CPU usage differences
    self.previous_stats = data

    return retval