agent.py 13.9 KB
Newer Older
André Anjos's avatar
André Anjos committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


import os
import shutil
31
import simplejson
32
import glob
André Anjos's avatar
André Anjos committed
33
34
35
36

import logging
logger = logging.getLogger(__name__)

37
38
import gevent
import zmq.green as zmq
André Anjos's avatar
André Anjos committed
39

40
41
import requests
from gevent import monkey
42
43
monkey.patch_socket(dns=False)
monkey.patch_ssl()
44

André Anjos's avatar
André Anjos committed
45
from . import utils
46
from . import dock
André Anjos's avatar
André Anjos committed
47
48
from . import baseformat

49
from beat.backend.python.message_handler import MessageHandler
50

André Anjos's avatar
André Anjos committed
51

52
53
54
55
56
57
58
59
60
61
class Server(MessageHandler):
  '''A 0MQ server for our communication with the user process

  On top of the commands handled by
  :py:class:`beat.backend.python.message_handler.MessageHandler`, this server
  answers the following output-related requests from the user process:

  * ``wrt <output> <data>``: unpacks and writes data to an output, replies
    ``ack``
  * ``idm <output>``: is data missing on the output? replies ``tru``/``fal``
  * ``oic <output>``: is the output connected? replies ``tru``/``fal``


  Parameters:

    input_list: The inputs, forwarded as-is to the base class.

    output_list: The outputs, indexed by name. Presumably an
      ``OutputList``-like container that returns ``None`` for unknown names
      (plain mappings raising ``KeyError`` are handled too) - TODO confirm
      against callers.

    host_address (str): Address the 0MQ ``PAIR`` socket binds to. A random
      port, at or above 50000, is picked and appended to :py:attr:`address`.

  '''

  def __init__(self, input_list, output_list, host_address):

    # Starts our 0MQ server
    self.context = zmq.Context()
    self.socket = self.context.socket(zmq.PAIR)

    self.address = 'tcp://' + host_address
    try:
      port = self.socket.bind_to_random_port(self.address, min_port=50000)
    except Exception:
      # no port could be bound: release the context (and its socket) instead
      # of leaking it, then let the caller see the original error
      self.context.destroy()
      raise

    self.address += ':%d' % port
    logger.debug("zmq server bound to `%s'", self.address)

    super(Server, self).__init__(input_list, self.context, self.socket)

    self.output_list = output_list

    # implementations
    self.callbacks.update(dict(
      wrt = self.write,
      idm = self.is_data_missing,
      oic = self.output_is_connected,
    ))


  def destroy(self):
    '''Destroys the 0MQ context, which also closes the sockets it owns'''

    self.context.destroy()


  def __str__(self):
    return 'Server(%s)' % self.address


  def _get_output_candidate(self, name):
    '''Returns the output called ``name``

    Raises:

      RuntimeError: if no output with that name exists

    '''

    try:
      retval = self.output_list[name]
    except KeyError:
      # mapping-like output lists raise instead of returning None
      retval = None

    if retval is None:
      raise RuntimeError("Could not find output `%s'" % name)
    return retval


  def _reply_bool(self, value):
    '''Sends ``tru`` or ``fal`` to the client, depending on ``value``'''

    what = 'tru' if value else 'fal'
    logger.debug('send: %s', what)
    self.socket.send(what)


  def write(self, name, packed):
    """Syntax: wrt output data"""

    logger.debug('recv: wrt %s <bin> (size=%d)', name, len(packed))

    # Get output object - raises RuntimeError if it does not exist, so no
    # further None-check is needed here
    output_candidate = self._get_output_candidate(name)

    data = output_candidate.data_sink.dataformat.type()
    data.unpack(packed)
    output_candidate.write(data)

    logger.debug('send: ack')
    self.socket.send('ack')


  def is_data_missing(self, name):
    """Syntax: idm output"""

    logger.debug('recv: idm %s', name)

    output_candidate = self._get_output_candidate(name)
    self._reply_bool(output_candidate.isDataMissing())


  def output_is_connected(self, name):
    """Syntax: oic output"""

    logger.debug('recv: oic %s', name)

    output_candidate = self._get_output_candidate(name)
    self._reply_bool(output_candidate.isConnected())



André Anjos's avatar
André Anjos committed
134
class Agent(object):
  '''Handles synchronous commands.

  We use the greenlets for this implementation. Objects of this class are in
  charge of two separate tasks:

  1. Handling the execution of the user process (in a docker container) and,
     optionally, of a databases provider process (in a second container)
  2. Implementing a 0MQ-based API for I/O that the user process can query
     (see :py:class:`Server`)

  Objects of this class are meant to be used as context managers: entering
  the context creates the temporary directories used to hand configurations
  over to the containers, exiting it removes them.


  Parameters:

    virtual_memory_in_megabytes (int): The amount of virtual memory
      (in Megabytes) available for the job. If set to zero, no limit will be
      applied.

    max_cpu_percent (int): The maximum amount of CPU usage allowed in a system.
      This number must be an integer number between 0 and
      ``100*number_of_cores`` in your system. For instance, if your system has
      2 cores, this number can go between 0 and 200. If it is <= 0, then we
      don't track CPU usage.

  '''

  def __init__(self, virtual_memory_in_megabytes, max_cpu_percent):

    self.virtual_memory_in_megabytes = virtual_memory_in_megabytes
    self.max_cpu_percent = max_cpu_percent
    self.tempdir = None      # configuration handed to the user process
    self.db_tempdir = None   # configuration handed to the databases provider
    self.process = None
    self.db_process = None
    self.server = None


  def __enter__(self):
    '''Start of context manager: creates the temporary directories'''

    logger.debug("Entering processing context...")

    # Creates a temporary directory for the user process
    self.tempdir = utils.temporary_directory()
    logger.debug("Created temporary directory `%s'", self.tempdir)

    self.db_tempdir = utils.temporary_directory()
    logger.debug("Created temporary directory `%s'", self.db_tempdir)

    self.process = None
    self.db_process = None

    return self


  def __exit__(self, exc_type, exc_value, traceback):
    '''End of context manager: removes the temporary directories'''

    if self.tempdir is not None and os.path.exists(self.tempdir):
      shutil.rmtree(self.tempdir)
      self.tempdir = None

    if self.db_tempdir is not None and os.path.exists(self.db_tempdir):
      shutil.rmtree(self.db_tempdir)
      self.db_tempdir = None

    self.process = None
    self.db_process = None

    logger.debug("Exiting processing context...")


  def _relocate_database_paths(self, configuration):
    '''Points the dumped database configurations to in-container paths

    Rewrites the ``root_folder`` entry of every database JSON file dumped
    under ``<db_tempdir>/prefix/databases`` to ``/databases/<name>``, which
    is where :py:meth:`_database_volumes` mounts it. Nothing is changed when
    the configuration defines ``datasets_root_path``.

    Returns:

      dict: maps each database name to its original (host) ``root_folder``;
      empty when ``datasets_root_path`` is set.

    '''

    database_paths = {}
    if 'datasets_root_path' in configuration.data:
      return database_paths

    root_folder = os.path.join(self.db_tempdir, 'prefix', 'databases')

    for db_name in configuration.databases.keys():
      json_path = os.path.join(root_folder, db_name + '.json')

      with open(json_path, 'r') as f:
        db_data = simplejson.load(f)

      database_paths[db_name] = db_data['root_folder']
      db_data['root_folder'] = os.path.join('/databases', db_name)

      with open(json_path, 'w') as f:
        simplejson.dump(db_data, f, indent=4)

    return database_paths


  @staticmethod
  def _database_volumes(configuration, database_paths):
    '''Returns the read-only volumes to mount in the databases container'''

    if 'datasets_root_path' in configuration.data:
      root = configuration.data['datasets_root_path']
      return {root: {'bind': root, 'mode': 'ro'}}

    return {
      db_path: {
        'bind': os.path.join('/databases', db_name),
        'mode': 'ro',
      }
      for db_name, db_path in database_paths.items()
    }


  def _stop_db_process(self):
    '''Kills the databases provider container (if any) and waits for it'''

    if self.db_process is not None:
      self.db_process.kill()
      self.db_process.wait()


  def run(self, configuration, host, timeout_in_minutes=0, daemon=0, db_address=None):
    """Runs the algorithm code


    Parameters:

      configuration (object): A *valid*, preloaded
        :py:class:`beat.core.execution.Executor` object.

      host (:py:class:Host): A configured docker host that will execute the
        user process. If the host does not have access to the required
        environment, an exception will be raised.

      timeout_in_minutes (int): The number of minutes to wait for the user
        process to execute. After this amount of time, the user process is
        killed with :py:attr:`signal.SIGKILL`. If set to zero, no timeout will
        be applied.

      daemon (int): If this variable is set, then we don't really start the
        user process, but just kick out 0MQ server, print the command-line and
        sleep for that many seconds. You're supposed to start the client by
        hand then and debug it.

      db_address (str, optional): Address given on the command-line of the
        databases provider process. If ``None`` (the default), no databases
        provider is configured nor started.


    Returns:

      dict: with keys ``stdout``, ``stderr``, ``status``, ``timed_out``,
      ``statistics``, ``system_error`` and ``user_error``. If a databases
      provider ran, its outputs are appended to ``stdout`` and ``stderr``.


    Raises:

      RuntimeError: if the environment of the user process, or of the
        databases provider, is not available on the docker host.

    """

    # Recursively copies configuration data to <tempdir>/prefix
    configuration.dump_runner_configuration(self.tempdir)

    database_paths = {}
    if db_address is not None:
      configuration.dump_databases_provider_configuration(self.db_tempdir)
      # Modify the paths to the databases in the dumped configuration files
      database_paths = self._relocate_database_paths(configuration)

    # Figures out the images to use. This is done before allocating the 0MQ
    # server, so that a missing environment does not leak it.
    envkey = '%(name)s (%(version)s)' % configuration.data['environment']
    if envkey not in host:
      raise RuntimeError("Environment `%s' is not available on docker " \
          "host `%s' - available environments are %s" % (envkey, host,
            ", ".join(host.environments.keys())))

    if db_address is not None:
      # Use the names of all databases, not the keys of ``database_paths``:
      # that dictionary stays empty when ``datasets_root_path`` is set
      db_names = list(configuration.databases.keys())
      try:
        db_envkey = host.db2docker(db_names)
      except Exception:
        raise RuntimeError("No environment found for the databases `%s' " \
            "- available environments are %s" % (
              ", ".join(db_names),
              ", ".join(host.db_environments.keys())))

    # Server for our single client
    self.server = Server(configuration.input_list, configuration.output_list,
        host.ip)

    # Launches the process (0MQ client)
    tmp_dir = os.path.join('/tmp', os.path.basename(self.tempdir))
    cmd = ['execute', self.server.address, tmp_dir]
    if logger.getEffectiveLevel() <= logging.DEBUG: cmd.insert(1, '--debug')

    if daemon > 0:
      image = host.env2docker(envkey)
      logger.debug("Daemon mode: start the user process with the following " \
              "command: `docker run -ti %s %s'", image, ' '.join(cmd))
      # the container started below only sleeps, leaving time to start the
      # client by hand
      cmd = ['sleep', str(daemon)]
      logger.debug("Daemon mode: sleeping for %d seconds", daemon)

    elif db_address is not None:
      db_tmp_dir = os.path.join('/tmp', os.path.basename(self.db_tempdir))
      db_cmd = ['databases_provider', db_address, db_tmp_dir]

      # Note: we only support one databases image loaded at the same time
      self.db_process = dock.Popen(
        host,
        db_envkey,
        command=db_cmd,
        tmp_archive=self.db_tempdir,
        volumes=self._database_volumes(configuration, database_paths),
      )

    # The whole cache is shared with the user process, unless in proxy mode
    volumes = {}
    if not configuration.proxy_mode:
      volumes[configuration.cache] = {
        'bind': '/cache',
        'mode': 'rw',
      }

    # Started in both modes: in daemon mode, ``cmd`` is the ``sleep`` command
    self.process = dock.Popen(
      host,
      envkey,
      command=cmd,
      tmp_archive=self.tempdir,
      virtual_memory_in_megabytes=self.virtual_memory_in_megabytes,
      max_cpu_percent=self.max_cpu_percent,
      volumes=volumes
    )

    # provide a tip on how to stop the test
    if daemon > 0:
      logger.debug("To stop the daemon, press CTRL-c or kill the user " \
              "process with `docker kill %s`", self.process.pid)

    # Serve asynchronously
    self.server.set_process(self.process)
    self.server.start()

    timed_out = False
    timeout = (60*timeout_in_minutes) if timeout_in_minutes else None

    try:
      status = self.process.wait(timeout)

    except requests.exceptions.ReadTimeout:
      logger.warning("user process has timed out after %d minutes",
              timeout_in_minutes)
      self.process.kill()
      status = self.process.wait()
      timed_out = True

    except KeyboardInterrupt: #developer pushed CTRL-C
      logger.info("stopping user process on CTRL-C console request")
      self.process.kill()
      status = self.process.wait()

    finally:
      self.server.stop.set()
      # the databases provider only serves the user process: stop it in all
      # cases, including normal termination of the user process
      self._stop_db_process()

    # Collects final information and returns to caller
    process = self.process
    self.process = None
    retval = dict(
        stdout = process.stdout,
        stderr = process.stderr,
        status = status,
        timed_out = timed_out,
        statistics = self.server.last_statistics,
        system_error = self.server.system_error,
        user_error = self.server.user_error,
        )
    process.rm()

    if self.db_process is not None:
      retval['stdout'] += '\n' + self.db_process.stdout
      retval['stderr'] += '\n' + self.db_process.stderr
      self.db_process.rm()
      self.db_process = None

    self.server.destroy()
    self.server = None
    return retval


  def kill(self):
    """Stops the user process by force - to be called from signal handlers"""

    if self.server is not None:
      self.server.kill()