worker.py 12.8 KB
Newer Older
1
2
3
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

Samuel GAIST's avatar
Samuel GAIST committed
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
###################################################################################
#                                                                                 #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/               #
# Contact: beat.support@idiap.ch                                                  #
#                                                                                 #
# Redistribution and use in source and binary forms, with or without              #
# modification, are permitted provided that the following conditions are met:     #
#                                                                                 #
# 1. Redistributions of source code must retain the above copyright notice, this  #
# list of conditions and the following disclaimer.                                #
#                                                                                 #
# 2. Redistributions in binary form must reproduce the above copyright notice,    #
# this list of conditions and the following disclaimer in the documentation       #
# and/or other materials provided with the distribution.                          #
#                                                                                 #
# 3. Neither the name of the copyright holder nor the names of its contributors   #
# may be used to endorse or promote products derived from this software without   #
# specific prior written permission.                                              #
#                                                                                 #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED   #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE          #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE    #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL      #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR      #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER      #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,   #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE   #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.            #
#                                                                                 #
###################################################################################

36
37
38
39
40
41
42
43

"""Starts the worker process (%(version)s)

Based on the Majordomo Protocol worker example of the ZMQ Guide.

Usage:
  %(prog)s [-v ... | --verbose ...] [ --name=<name>] [--prefix=<path>]
           [--cache=<path>] [--docker] [--docker-network=<name>]
44
45
           [--port-range=<range>] [--cache-mount-point=<cache_mount_point>]
           <broker_address>
46
47
48
49
50
  %(prog)s (--help | -h)
  %(prog)s (--version | -V)


Options:
51
52
53
54
55
56
57
58
59
60
61
  -h, --help                              Show this screen
  -V, --version                           Show version
  -v, --verbose                           Increases the output verbosity level
  -n <name>, --name=<name>                The unique name of this worker on the database.
                                          This is typically the assigned hostname of the node,
                                          but not necessarily [default: %(hostname)s]
  -p, --prefix=<path>                     Comma-separated list of the prefix(es) of your local data [default: .]
  -c, --cache=<path>                      Cache prefix, otherwise defaults to '<prefix>/cache'
  --docker-network=<name>                 Name of the docker network to use
  --port-range=<range>                    Range of port usable for communication with containers
  --cache-mount-point=<cache_mount_point> NFS mount point to use for cache setup
62
63
64
65
66
67
68

"""

import os
import sys
import signal
import tempfile
Samuel GAIST's avatar
Samuel GAIST committed
69
import simplejson as json
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111

import zmq

from socket import gethostname
from docopt import docopt

from ..bcpapi import BCP
from ..bcpapi.worker import BeatComputationWorker
from ..bcpapi.processor import BeatComputationProcessor
from ..bcpapi.execution import ExecutionProcess
from ..dock import Host
from ..utils import find_free_port
from ..utils import setup_logging
from ..version import __version__


logger = None


def setup_signal_handler():
    """Install a signal handler"""

    def handler(signum, frame):
        global logger
        logger.info("Signal %d caught, stopping...", signum)
        global stop
        stop = True

    signal.signal(signal.SIGTERM, handler)
    signal.signal(signal.SIGINT, handler)


def run(
    broker_address,
    service_name=gethostname(),
    verbose=1,
    prefix=None,
    cache=None,
    use_docker=False,
    docker_network_name=None,
    docker_port_range=None,
    docker_images_cache=None,
112
    docker_cache_mount_point=None,
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
):
    """Start the worker

    Parameters:

        broker_address (str): Address of the broker
        service_name (str): Name to advertise
        verbose (int): Verbosity level
        prefix (str): Path to the prefix
        cache (str): Path to the cache
        use_docker (bool): Whether to use docker as backend
        docker_network_name (str): Docker network name to use
        docker_port_range (str): Port range in the format <start:end>
        docker_images_cache (str): Path to docker images cache
    """

    global logger

    if logger is None:
        if isinstance(service_name, bytes):
            name = service_name.decode("utf-8")
        else:
            name = service_name

        logging_name = "Worker '{}'".format(name)
        logger = setup_logging(verbose, logging_name, __name__)

    setup_signal_handler()

    poller = zmq.Poller()

    port = find_free_port()
    processor_address = "tcp://*:{}".format(port)
    worker_address = "tcp://localhost:{}".format(port)

    worker = BeatComputationWorker(poller, broker_address, service_name, verbose == 3)
149
    processor = BeatComputationProcessor(poller, processor_address, verbose == 3)
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207

    execution_processes = []
    global stop
    stop = False
    while not stop:
        try:
            items = dict(poller.poll(worker.timeout))
        except KeyboardInterrupt:
            stop = True
            continue

        if processor.sink in items:
            reply = processor.process()
            reply.pop(0)  # processor name
            status = reply[0]
            job_id = reply[1]

            # Processing job status
            if status == BCP.BCPP_JOB_DONE:
                logger.info("Job {} done".format(job_id))
                execution_process = next(
                    (p for p in execution_processes if p.job_id == job_id), None
                )
                if execution_process is None:
                    logger.warning("Done job {} not found".format(job_id))
                    reply = None
                else:
                    execution_processes.remove(execution_process)
                    del execution_process

            if verbose:
                logger.info("Sending {}".format(reply))

            if reply is not None:
                worker.send(reply)

        if worker.worker in items:
            request = worker.process()

            if request is None:
                # Received something other than request
                continue

            command = request.pop(0)
            # Command: execute <job-id> <json-command>
            if command == BCP.BCPE_EXECUTE:
                job_id = request.pop(0)
                job_data = request.pop(0)
                data = json.loads(job_data)

                reply = [BCP.BCPP_JOB_RECEIVED, job_id]
                worker.send(reply)

                if use_docker:
                    if docker_network_name:
                        data["network_name"] = docker_network_name
                    if docker_port_range:
                        data["port_range"] = docker_port_range
208
209
                    if docker_cache_mount_point:
                        data["cache_mount_point"] = docker_cache_mount_point
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326

                # Start the execution
                logger.info("Running '%s' with job id #%s", data["algorithm"], job_id)

                execution_process = ExecutionProcess(
                    worker_address,
                    job_id,
                    prefix,
                    data,
                    cache,
                    docker=use_docker,
                    images_cache=docker_images_cache,
                )
                execution_processes.append(execution_process)
                execution_process.start()
                execution_process.queue.get()

                reply = [BCP.BCPP_JOB_STARTED, job_id]
                worker.send(reply)

            # Command: cancel
            elif command == BCP.BCPE_CANCEL:
                job_id = request.pop(0)

                execution_process = next(
                    (p for p in execution_processes if p.job_id == job_id), None
                )

                if execution_process is None:
                    reply = [BCP.BCPP_ERROR, b"Unknown job: %s" % job_id]
                else:
                    # Kill the processing thread
                    logger.info("Cancelling the job #%s", execution_process.job_id)
                    execution_process.terminate()
                    execution_process.join()
                    execution_processes.remove(execution_process)
                    del execution_process
                    reply = [BCP.BCPP_JOB_CANCELLED, job_id]

                worker.send(reply)

            # Command: scheduler shutdown
            elif command == BCP.BCPE_SCHEDULER_SHUTDOWN:
                logger.info("The scheduler shut down, we will wait for it")
                worker.destroy()

    worker.destroy()
    # Cleanup
    for execution_process in execution_processes:
        execution_process.terminate()
        execution_process.join()

    processor.destroy()

    if (docker_images_cache is not None) and os.path.exists(docker_images_cache):
        os.remove(docker_images_cache)

    return 0


def main(argv=None):
    if argv is None:
        argv = sys.argv[1:]

    prog = os.path.basename(__name__.split(".")[0])
    completions = dict(prog=prog, version=__version__, hostname=gethostname())
    args = docopt(
        __doc__ % completions,
        argv=argv,
        options_first=True,
        version="v%s" % __version__,
    )
    broker_address = args.pop("<broker_address>")

    global logger

    logging_name = "Worker '" + args.get("--name") + "'"
    logger = setup_logging(args.get("--verbose"), logging_name)

    # Check the prefix path
    prefix = args["--prefix"] if args["--prefix"] is not None else "."
    if not os.path.exists(prefix):
        logger.error("Prefix not found at: '%s'", prefix)
        return 1

    # Check the cache path
    cache = (
        args["--cache"]
        if args["--cache"] is not None
        else os.path.join(prefix, "cache")
    )
    if not os.path.exists(cache):
        logger.error("Cache not found at: '%s'", cache)
        return 1

    # (If necessary) Docker-related initialisations
    docker_images_cache = None
    docker_network_name = None
    docker_port_range = None
    if args["--docker"]:
        docker_images_cache = os.path.join(
            tempfile.gettempdir(), "beat-docker-images.json"
        )
        logger.info("Using docker images cache: '%s'", docker_images_cache)
        Host(images_cache=docker_images_cache, raise_on_errors=False)

        docker_network_name = args.get("--docker-network", None)
        if docker_network_name:
            logger.info("Using docker network: '%s'", docker_network_name)

        docker_port_range = args.get("--port-range", None)
        if docker_port_range:
            if len(docker_port_range.split(":")) != 2:
                logger.error("Invalid port range %s" % docker_port_range)
                return 1
            logger.info("Using port range %s", docker_port_range)

327
328
329
330
331
332
333
334
        docker_cache_mount_point = args.get("--cache-mount-point", None)
        if docker_cache_mount_point:
            if not docker_cache_mount_point.startswith("nfs://"):
                raise RuntimeError(
                    "Invalid nfs mount point: {}".format(docker_cache_mount_point)
                )
            logger.info("Using volume cache mount point %s", docker_cache_mount_point)

335
336
337
338
339
340
341
342
343
344
    return run(
        broker_address,
        service_name=args.get("--name"),
        verbose=args.get("--verbose"),
        prefix=prefix,
        cache=cache,
        use_docker=args["--docker"],
        docker_network_name=docker_network_name,
        docker_port_range=docker_port_range,
        docker_images_cache=docker_images_cache,
345
        docker_cache_mount_point=docker_cache_mount_point,
346
347
348
349
350
    )


if __name__ == "__main__":
    main()