Commit dfca1e82 authored by Philip ABBET's avatar Philip ABBET
Browse files

[scripts] worker.py: supports several jobs in parallel

parent 92dadf8f
......@@ -73,10 +73,11 @@ stop = False
class ExecutionProcess(multiprocessing.Process):
def __init__(self, queue, prefix, data, cache, docker, images_cache=None):
def __init__(self, queue, job_id, prefix, data, cache, docker, images_cache=None):
super(ExecutionProcess, self).__init__()
self.queue = queue
self.job_id = job_id
self.prefix = prefix
self.data = data
self.cache = cache
......@@ -235,57 +236,57 @@ def main(user_input=None):
# Process the requests
execution_process = None
current_job_id = None
execution_processes = []
while not stop:
# Send the result of the processing (if any)
if (execution_process is not None) and not execution_process.is_alive():
if execution_process.exitcode == 0:
result = execution_process.queue.get()
else:
result = dict(system_error='Execution error in the subprocess')
for execution_process in execution_processes:
if not execution_process.is_alive():
if execution_process.exitcode == 0:
result = execution_process.queue.get()
else:
result = dict(system_error='Execution error in the subprocess')
if result.has_key('result'):
content = simplejson.dumps(result['result'])
if result.has_key('result'):
content = simplejson.dumps(result['result'])
status = WorkerController.DONE
if result['result']['status'] != 0:
status = WorkerController.JOB_ERROR
status = WorkerController.DONE
if result['result']['status'] != 0:
status = WorkerController.JOB_ERROR
logger.info("Job #%s completed", current_job_id)
logger.debug('send: """%s"""' % content.rstrip())
logger.info("Job #%s completed", execution_process.job_id)
logger.debug('send: """%s"""' % content.rstrip())
message = [
status,
current_job_id,
content
]
elif result.has_key('error'):
logger.error(result['error'])
message = [
status,
execution_process.job_id,
content
]
elif result.has_key('error'):
logger.error(result['error'])
message = [
WorkerController.JOB_ERROR,
current_job_id,
]
message = [
WorkerController.JOB_ERROR,
execution_process.job_id,
]
message += result['details']
message += result['details']
else:
logger.error(result['system_error'])
else:
logger.error(result['system_error'])
message = [
WorkerController.ERROR,
current_job_id,
result['system_error']
]
message = [
WorkerController.ERROR,
execution_process.job_id,
result['system_error']
]
socket.send_multipart(message)
socket.send_multipart(message)
execution_process = None
execution_processes.remove(execution_process)
if execution_process is None:
if len(execution_processes) == 0:
timeout = 1000 # ms
else:
timeout = 100
......@@ -309,46 +310,42 @@ def main(user_input=None):
job_id = parts[1]
data = simplejson.loads(parts[2])
# Check that the worker isn't busy
if execution_process is not None:
socket.send_multipart([
WorkerController.ERROR,
job_id,
'Worker is already busy'
])
continue
# Start the execution
logger.info("Running '%s' with job id #%s", data['algorithm'], job_id)
current_job_id = job_id
execution_process = ExecutionProcess(multiprocessing.Queue(), prefix, data, cache,
docker=args['--docker'], images_cache=docker_images_cache)
execution_process = ExecutionProcess(multiprocessing.Queue(), job_id, prefix,
data, cache, docker=args['--docker'],
images_cache=docker_images_cache)
execution_process.start()
execution_process.queue.get()
execution_processes.append(execution_process)
# Command: cancel
elif command == WorkerController.CANCEL:
# Check that the worker is busy
if execution_process is None:
job_id = parts[1]
try:
execution_process = [ p for p in execution_processes if p.job_id == job_id ]
except:
socket.send_multipart([
WorkerController.ERROR,
"Worker isn't busy"
"Unknown job: %s" % job_id
])
continue
# Kill the processing thread
logger.info("Cancelling the job #%s", current_job_id)
logger.info("Cancelling the job #%s", execution_process.job_id)
execution_process.terminate()
execution_process.join()
execution_process = None
execution_processes.remove(execution_process)
socket.send_multipart([
WorkerController.CANCELLED,
current_job_id,
job_id,
])
......@@ -356,7 +353,7 @@ def main(user_input=None):
# Cleanup
if execution_process is not None:
for execution_process in execution_processes:
execution_process.terminate()
execution_process.join()
......
......@@ -339,7 +339,7 @@ class TestOneWorker(unittest.TestCase):
config['algorithm'] = 'user/integers_echo_slow/1'
self.controller.execute(WORKER1, 1, config)
self.controller.cancel(WORKER1)
self.controller.cancel(WORKER1, 1)
(worker, status, job_id, data) = self._wait()
......@@ -349,8 +349,8 @@ class TestOneWorker(unittest.TestCase):
self.assertEqual(len(data), 0)
def test_error_cancel_free_worker(self):
self.controller.cancel(WORKER1)
def test_error_cancel_unknown_job(self):
self.controller.cancel(WORKER1, 1)
(worker, status, job_id, data) = self._wait()
......
......@@ -93,9 +93,10 @@ class WorkerController(object):
])
def cancel(self, worker):
def cancel(self, worker, job_id):
self.socket.send_multipart([
str(worker),
str(job_id),
WorkerController.CANCEL
])
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment