worker.py 13.5 KB
Newer Older
Philip ABBET's avatar
Philip ABBET committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


"""Starts the worker process (%(version)s)

Usage:
  %(prog)s [-v ... | --verbose ...] [ --name=<name>] [--prefix=<path>]
           [--cache=<path>] [--docker] <address>
  %(prog)s (--help | -h)
  %(prog)s (--version | -V)


Options:
  -h, --help                Show this screen
  -V, --version             Show version
  -v, --verbose             Increases the output verbosity level
  -n <name>, --name=<name>  The unique name of this worker on the database.
                            This is typically the assigned hostname of the node,
                            but not necessarily [default: %(hostname)s]
  -p, --prefix=<path>       Comma-separated list of the prefix(es) of your local data [default: .]
  -c, --cache=<path>        Cache prefix, otherwise defaults to '<prefix>/cache'

"""

import os
import sys
Philip ABBET's avatar
Philip ABBET committed
52
53
import logging
import zmq
Philip ABBET's avatar
Philip ABBET committed
54
55
import signal
import simplejson
Philip ABBET's avatar
Philip ABBET committed
56
import multiprocessing
57
58
59
60
61
try:
    import Queue
except ImportError:
    import queue as Queue

Philip ABBET's avatar
Philip ABBET committed
62
import tempfile
Philip ABBET's avatar
Philip ABBET committed
63
64
65
66
67
68
69
70
71
72
from docopt import docopt
from socket import gethostname

from ..version import __version__
from ..execution.local import LocalExecutor
from ..execution.docker import DockerExecutor
from ..dock import Host
from ..worker import WorkerController

# Shutdown flag: set to True by the SIGTERM/SIGINT handler installed in main()
# and polled by connect_to_scheduler() and the main loop so they can exit.
stop = False

# Module logger; it is None until main() has configured logging and assigned it.
logger = None
#----------------------------------------------------------


class ExecutionProcess(multiprocessing.Process):
    """Runs one job in a separate process and reports back through a queue.

    The parent communicates with this process only through ``queue``. From
    :py:meth:`run`, these items are put on it:

    * the string ``'STARTED'``, right after the process begins;
    * then a dict describing the outcome, one of:

      - ``{'result': ...}``, holding whatever ``executor.process()`` returned;
      - ``{'error': ..., 'details': ...}``, when the executor is not valid
        (``details`` is ``executor.errors``);
      - ``{'system_error': ...}``, holding a formatted traceback when an
        exception escapes executor creation or execution.

    Parameters:
      queue: channel back to the parent (``main()`` passes a
        ``multiprocessing.Queue``)
      job_id: identifier of the job, used in log messages
      prefix, data, cache: forwarded unchanged to the executor
      docker (bool): use a ``DockerExecutor`` instead of a ``LocalExecutor``
      images_cache: Docker images cache path handed to ``Host``; only used
        when ``docker`` is true
    """

    def __init__(self, queue, job_id, prefix, data, cache, docker, images_cache=None):
        super(ExecutionProcess, self).__init__()

        # Channel back to the parent, and the job this process works on
        self.queue, self.job_id = queue, job_id

        # Execution settings, forwarded as-is to the executor in run()
        self.prefix = prefix
        self.data = data
        self.cache = cache
        self.docker = docker
        self.images_cache = images_cache

    def _create_executor(self):
        """Build the local or Docker executor configured for this job."""
        if not self.docker:
            return LocalExecutor(self.prefix, self.data, cache=self.cache)

        docker_host = Host(images_cache=self.images_cache, raise_on_errors=False)
        return DockerExecutor(docker_host, self.prefix, self.data, cache=self.cache)

    def run(self):
        """Execute the job and put its outcome on ``self.queue``.

        This is the body of the child process (invoked through ``start()``).
        """
        # Signal handlers set by the parent may be inherited (fork start
        # method); restore the defaults so that terminate() (SIGTERM) and
        # SIGINT end this process immediately.
        for signum in (signal.SIGTERM, signal.SIGINT):
            signal.signal(signum, signal.SIG_DFL)

        logger.debug("Process (pid=%d) started for job #%s", self.pid, self.job_id)

        self.queue.put('STARTED')

        try:
            executor = self._create_executor()

            if not executor.valid:
                self.queue.put({
                    'error': "Failed to load the execution information",
                    'details': executor.errors,
                })
                # NOTE: leaves without closing the queue or logging "done"
                return

            with executor:
                outcome = executor.process()

            self.queue.put({'result': outcome})
        except BaseException:
            # Catch-all on purpose: every failure must be reported to the
            # parent, which is waiting for an outcome on the queue.
            import traceback
            self.queue.put({'system_error': traceback.format_exc()})

        self.queue.close()

        logger.debug("Process (pid=%d) done", self.pid)
        return 0


#----------------------------------------------------------


def connect_to_scheduler(address, name):
    """Connect to the scheduler and wait for it to acknowledge this worker.

    A ZeroMQ DEALER socket with identity *name* is connected to *address*
    (``tcp://`` is prepended when no ``scheme://`` is present), and
    ``WorkerController.READY`` is sent. The socket is then polled every
    100 ms until a reply arrives or the module-level ``stop`` flag is set.

    Parameters:
      address (str): scheduler address, with or without a scheme prefix
      name (str): identity of this worker on the socket

    Returns:
      tuple: ``(context, socket, poller)`` once ``WorkerController.ACK`` has
      been received. Returns ``(None, None, None)`` when another reply arrives
      or ``stop`` is set; in that case the socket and context are released.
    """
    context = zmq.Context()
    socket = context.socket(zmq.DEALER)
    socket.setsockopt_string(zmq.IDENTITY, name)

    if '://' not in address:
        address = 'tcp://' + address

    socket.connect(address)
    logger.info("Connected to '%s'", address)

    poller = zmq.Poller()
    poller.register(socket, zmq.POLLIN)

    # Announce ourselves, then wait for the acknowledgement
    socket.send(WorkerController.READY)
    logger.info("Waiting for the scheduler...")

    acknowledged = False
    while not stop:
        events = dict(poller.poll(100))
        if events.get(socket) != zmq.POLLIN:
            continue

        reply = socket.recv()
        if reply == WorkerController.ACK:
            acknowledged = True
        else:
            logger.error("Can't talk with the scheduler at '%s', expected '%s', got '%s'",
                         address, WorkerController.ACK, reply)
        break

    # Give up on an unexpected reply, or if a shutdown was requested meanwhile
    if stop or not acknowledged:
        socket.setsockopt(zmq.LINGER, 0)
        socket.close()
        context.destroy()
        return (None, None, None)

    logger.info("The scheduler answered")
    return (context, socket, poller)

Philip ABBET's avatar
Philip ABBET committed
184

Philip ABBET's avatar
Philip ABBET committed
185
#----------------------------------------------------------
Philip ABBET's avatar
Philip ABBET committed
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209


def _to_bytes(value):
    """Return *value* in a form accepted as a ZeroMQ frame.

    ``bytes`` (``str`` on Python 2) are returned unchanged and text is encoded
    as UTF-8. Any other object is passed through untouched, so pyzmq applies
    its own rules (e.g. for buffers).
    """
    if isinstance(value, bytes):
        return value
    if hasattr(value, 'encode'):
        return value.encode('utf-8')
    return value


def _send_multipart(socket, parts):
    """Send *parts* as one multipart message, encoding text frames first."""
    socket.send_multipart([_to_bytes(part) for part in parts])


def _poll_execution_process(execution_process):
    """Return the outcome reported by an execution process, if available.

    Returns the dict the child put on its queue. If the child exited without
    reporting anything, returns a synthetic ``{'system_error': ...}`` dict.
    Returns None while the child is still running without an outcome.
    """
    # Sample liveness *before* reading the queue: a process that has put items
    # on a queue does not terminate until they are flushed to the pipe, so if
    # it was already dead, an empty queue means no outcome will ever arrive.
    alive = execution_process.is_alive()
    try:
        return execution_process.queue.get_nowait()
    except Queue.Empty:
        if alive:
            return None

    return dict(
        system_error="Execution process (pid=%s) of job #%s terminated unexpectedly "
                     "(exit code: %s)" % (execution_process.pid,
                                          execution_process.job_id,
                                          execution_process.exitcode)
    )


def _build_message(execution_process, result):
    """Translate an outcome dict from an ExecutionProcess into a message.

    Returns the list of frames to send to the scheduler:
    ``[DONE|JOB_ERROR, job_id, json]`` for a result,
    ``[JOB_ERROR, job_id, *details]`` for an invalid execution, and
    ``[ERROR, job_id, traceback]`` for a system error.
    """
    job_id = execution_process.job_id

    if 'result' in result:
        content = simplejson.dumps(result['result'])

        status = WorkerController.DONE
        if result['result']['status'] != 0:
            status = WorkerController.JOB_ERROR

        logger.info("Job #%s completed", job_id)
        logger.debug('send: """%s"""', content.rstrip())

        return [status, job_id, content]

    if 'error' in result:
        logger.error(result['error'])
        return [WorkerController.JOB_ERROR, job_id] + list(result['details'])

    logger.error(result['system_error'])
    return [WorkerController.ERROR, job_id, result['system_error']]


def main(user_input=None):
    """Run the worker until it is asked to stop.

    Parses the command line (see the module docstring), configures logging
    and connects to the scheduler at ``<address>``. It then serves the
    scheduler's commands:

    * EXECUTE starts an :py:class:`ExecutionProcess`;
    * CANCEL terminates one;
    * SCHEDULER_SHUTDOWN makes the worker wait for the scheduler to return.

    Outcomes of finished jobs are forwarded to the scheduler as they arrive.

    Parameters:
      user_input (list): arguments to parse instead of ``sys.argv[1:]``

    Returns:
      int: 0 on normal termination. 1 if the prefix or cache path does not
      exist, or if the first connection to the scheduler fails.
    """
    global logger

    # Parse the command-line arguments
    if user_input is not None:
        arguments = user_input
    else:
        arguments = sys.argv[1:]

    prog = os.path.basename(sys.argv[0])
    completions = dict(
        prog=prog,
        version=__version__,
        hostname=gethostname(),
    )
    args = docopt(
        __doc__ % completions,
        argv=arguments,
        options_first=True,
        version='v%s' % __version__,
    )

    # Setup the logging. The worker name is embedded in a %-style format
    # string, so escape any '%' it contains.
    worker_name = args['--name'].replace('%', '%%')
    formatter = logging.Formatter(fmt="[%(asctime)s - Worker '" + worker_name +
                                      "' - %(name)s] %(levelname)s: %(message)s",
                                  datefmt="%d/%b/%Y %H:%M:%S")

    log_handler = logging.StreamHandler()
    log_handler.setFormatter(formatter)

    beat_core_logger = logging.getLogger('beat.core')
    beat_core_logger.addHandler(log_handler)

    beat_backend_logger = logging.getLogger('beat.backend.python')
    beat_backend_logger.addHandler(log_handler)

    # -v: INFO everywhere; -vv: DEBUG for beat.core; -vvv: DEBUG everywhere
    verbosity = args['--verbose']
    if verbosity == 1:
        beat_core_logger.setLevel(logging.INFO)
        beat_backend_logger.setLevel(logging.INFO)
    elif verbosity == 2:
        beat_core_logger.setLevel(logging.DEBUG)
        beat_backend_logger.setLevel(logging.INFO)
    elif verbosity >= 3:
        beat_core_logger.setLevel(logging.DEBUG)
        beat_backend_logger.setLevel(logging.DEBUG)
    else:
        beat_core_logger.setLevel(logging.WARNING)
        beat_backend_logger.setLevel(logging.WARNING)

    logger = logging.getLogger(__name__)

    # Check the prefix path
    prefix = args['--prefix'] if args['--prefix'] is not None else '.'
    if not os.path.exists(prefix):
        logger.error("Prefix not found at: '%s'", prefix)
        return 1

    # Check the cache path
    cache = args['--cache'] if args['--cache'] is not None else os.path.join(prefix, 'cache')
    if not os.path.exists(cache):
        logger.error("Cache not found at: '%s'", cache)
        return 1

    # Install a signal handler that requests a graceful shutdown
    def on_signal(signum, frame):
        global stop

        # Ignore further signals
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        logger.info("Signal %d caught, terminating...", signum)
        stop = True

    signal.signal(signal.SIGTERM, on_signal)
    signal.signal(signal.SIGINT, on_signal)

    # (If necessary) Docker-related initialisations
    docker_images_cache = None
    if args['--docker']:
        docker_images_cache = os.path.join(tempfile.gettempdir(), 'beat-docker-images.json')
        logger.info("Using docker images cache: '%s'", docker_images_cache)
        # NOTE(review): the Host instance is not used afterwards; the call is
        # kept for its side effects (presumably filling the images cache).
        Host(images_cache=docker_images_cache, raise_on_errors=False)

    # Establish a connection with the scheduler
    (context, socket, poller) = connect_to_scheduler(args['<address>'], args['--name'])
    if context is None:
        return 1

    # Process the requests
    execution_processes = []
    scheduler_available = True

    try:
        while not stop:
            # If necessary, wait for the comeback of the scheduler
            if not scheduler_available:
                (context, socket, poller) = connect_to_scheduler(args['<address>'], args['--name'])
                if context is None:
                    break
                scheduler_available = True

            # Send the result of the processing (if any). Iterate over a copy:
            # finished processes are removed from the list inside the loop.
            for execution_process in list(execution_processes):
                result = _poll_execution_process(execution_process)
                if result is None:
                    continue

                execution_process.join()
                _send_multipart(socket, _build_message(execution_process, result))
                execution_processes.remove(execution_process)

            # Poll more often while jobs are running, to report them promptly
            if len(execution_processes) == 0:
                timeout = 1000  # ms
            else:
                timeout = 100

            socks = dict(poller.poll(timeout))
            if not (socket in socks) or (socks[socket] != zmq.POLLIN):
                continue

            # Read the next command
            parts = socket.recv_multipart()
            command = parts[0]
            logger.debug("recv: %s", command)

            # Command: execute <job-id> <json-command>
            if command == WorkerController.EXECUTE:
                job_id = parts[1]
                data = simplejson.loads(parts[2])

                # Start the execution
                logger.info("Running '%s' with job id #%s", data['algorithm'], job_id)

                execution_process = ExecutionProcess(multiprocessing.Queue(), job_id, prefix,
                                                     data, cache, docker=args['--docker'],
                                                     images_cache=docker_images_cache)
                execution_process.start()

                # Block until the child reports 'STARTED'.
                # NOTE(review): this waits forever if the child dies first.
                execution_process.queue.get()

                execution_processes.append(execution_process)

            # Command: cancel <job-id>
            elif command == WorkerController.CANCEL:
                job_id = parts[1]

                execution_process = next(
                    (p for p in execution_processes if p.job_id == job_id), None)
                if execution_process is None:
                    _send_multipart(socket, [
                        WorkerController.ERROR,
                        b"Unknown job: " + _to_bytes(job_id),
                    ])
                    continue

                # Kill the processing process
                logger.info("Cancelling the job #%s", execution_process.job_id)

                execution_process.terminate()
                execution_process.join()
                execution_processes.remove(execution_process)

                _send_multipart(socket, [
                    WorkerController.CANCELLED,
                    job_id,
                ])

            # Command: scheduler shutdown
            elif command == WorkerController.SCHEDULER_SHUTDOWN:
                logger.info("The scheduler shut down, we will wait for it")
                scheduler_available = False

                socket.setsockopt(zmq.LINGER, 0)
                socket.close()
                context.destroy()

                poller = None
                socket = None
                context = None

    finally:
        # Cleanup. This also runs when an exception escapes the loop, so that
        # child processes, sockets and the images cache are not left behind.
        if socket:
            socket.send(WorkerController.EXIT)

        for execution_process in execution_processes:
            execution_process.terminate()
            execution_process.join()

        if context:
            socket.setsockopt(zmq.LINGER, 0)
            socket.close()
            context.destroy()

        if (docker_images_cache is not None) and os.path.exists(docker_images_cache):
            os.remove(docker_images_cache)

    return 0