diff --git a/beat/core/scripts/worker.py b/beat/core/scripts/worker.py
index 72d6343f664b319614d0dae9a854d9a88e4b348a..770786dd2da2e081de4e5503fb6561bb24e3eb65 100755
--- a/beat/core/scripts/worker.py
+++ b/beat/core/scripts/worker.py
@@ -73,10 +73,11 @@ stop = False
 
 class ExecutionProcess(multiprocessing.Process):
 
-    def __init__(self, queue, prefix, data, cache, docker, images_cache=None):
+    def __init__(self, queue, job_id, prefix, data, cache, docker, images_cache=None):
         super(ExecutionProcess, self).__init__()
 
         self.queue = queue
+        self.job_id = job_id
         self.prefix = prefix
         self.data = data
         self.cache = cache
@@ -235,57 +236,58 @@ def main(user_input=None):
 
 
     # Process the requests
-    execution_process = None
-    current_job_id = None
+    execution_processes = []
 
     while not stop:
         # Send the result of the processing (if any)
-        if (execution_process is not None) and not execution_process.is_alive():
-            if execution_process.exitcode == 0:
-                result = execution_process.queue.get()
-            else:
-                result = dict(system_error='Execution error in the subprocess')
+        # Iterate over a copy: finished processes are removed from the list below
+        for execution_process in execution_processes[:]:
+            if not execution_process.is_alive():
+                if execution_process.exitcode == 0:
+                    result = execution_process.queue.get()
+                else:
+                    result = dict(system_error='Execution error in the subprocess')
 
-            if result.has_key('result'):
-                content = simplejson.dumps(result['result'])
+                if result.has_key('result'):
+                    content = simplejson.dumps(result['result'])
 
-                status = WorkerController.DONE
-                if result['result']['status'] != 0:
-                    status = WorkerController.JOB_ERROR
+                    status = WorkerController.DONE
+                    if result['result']['status'] != 0:
+                        status = WorkerController.JOB_ERROR
 
-                logger.info("Job #%s completed", current_job_id)
-                logger.debug('send: """%s"""' % content.rstrip())
+                    logger.info("Job #%s completed", execution_process.job_id)
+                    logger.debug('send: """%s"""' % content.rstrip())
 
-                message = [
-                    status,
-                    current_job_id,
-                    content
-                ]
-            elif result.has_key('error'):
-                logger.error(result['error'])
+                    message = [
+                        status,
+                        execution_process.job_id,
+                        content
+                    ]
+                elif result.has_key('error'):
+                    logger.error(result['error'])
 
-                message = [
-                    WorkerController.JOB_ERROR,
-                    current_job_id,
-                ]
+                    message = [
+                        WorkerController.JOB_ERROR,
+                        execution_process.job_id,
+                    ]
 
-                message += result['details']
+                    message += result['details']
 
-            else:
-                logger.error(result['system_error'])
+                else:
+                    logger.error(result['system_error'])
 
-                message = [
-                    WorkerController.ERROR,
-                    current_job_id,
-                    result['system_error']
-                ]
+                    message = [
+                        WorkerController.ERROR,
+                        execution_process.job_id,
+                        result['system_error']
+                    ]
 
-            socket.send_multipart(message)
+                socket.send_multipart(message)
 
-            execution_process = None
+                execution_processes.remove(execution_process)
 
 
-        if execution_process is None:
+        if not execution_processes:
             timeout = 1000 # ms
         else:
             timeout = 100
@@ -309,46 +311,43 @@ def main(user_input=None):
             job_id = parts[1]
             data = simplejson.loads(parts[2])
 
-            # Check that the worker isn't busy
-            if execution_process is not None:
-                socket.send_multipart([
-                    WorkerController.ERROR,
-                    job_id,
-                    'Worker is already busy'
-                ])
-                continue
-
             # Start the execution
             logger.info("Running '%s' with job id #%s", data['algorithm'], job_id)
 
-            current_job_id = job_id
-            execution_process = ExecutionProcess(multiprocessing.Queue(), prefix, data, cache,
-                                                 docker=args['--docker'], images_cache=docker_images_cache)
+            execution_process = ExecutionProcess(multiprocessing.Queue(), job_id, prefix,
+                                                 data, cache, docker=args['--docker'],
+                                                 images_cache=docker_images_cache)
             execution_process.start()
 
             execution_process.queue.get()
 
+            execution_processes.append(execution_process)
+
 
         # Command: cancel
         elif command == WorkerController.CANCEL:
-            # Check that the worker is busy
+            job_id = parts[1]
+
+            # Check that the job is known to this worker (None when no process matches)
+            execution_process = next(
+                (p for p in execution_processes if p.job_id == job_id), None)
             if execution_process is None:
                 socket.send_multipart([
                     WorkerController.ERROR,
-                    "Worker isn't busy"
+                    "Unknown job: %s" % job_id
                 ])
                 continue
 
             # Kill the processing thread
-            logger.info("Cancelling the job #%s", current_job_id)
+            logger.info("Cancelling the job #%s", execution_process.job_id)
 
             execution_process.terminate()
             execution_process.join()
-            execution_process = None
+            execution_processes.remove(execution_process)
 
             socket.send_multipart([
                 WorkerController.CANCELLED,
-                current_job_id,
+                job_id,
             ])
 
 
@@ -356,7 +355,7 @@ def main(user_input=None):
 
 
     # Cleanup
-    if execution_process is not None:
+    for execution_process in execution_processes:
         execution_process.terminate()
         execution_process.join()
 
diff --git a/beat/core/test/test_worker.py b/beat/core/test/test_worker.py
index 84ff35a31424b41cf6072c1a86b49a004d313bf3..408f056957f756e3f97d899a4a3c8b6a08aff4c7 100644
--- a/beat/core/test/test_worker.py
+++ b/beat/core/test/test_worker.py
@@ -339,7 +339,7 @@ class TestOneWorker(unittest.TestCase):
         config['algorithm'] = 'user/integers_echo_slow/1'
 
         self.controller.execute(WORKER1, 1, config)
-        self.controller.cancel(WORKER1)
+        self.controller.cancel(WORKER1, 1)
 
         (worker, status, job_id, data) = self._wait()
 
@@ -349,8 +349,8 @@ class TestOneWorker(unittest.TestCase):
         self.assertEqual(len(data), 0)
 
 
-    def test_error_cancel_free_worker(self):
-        self.controller.cancel(WORKER1)
+    def test_error_cancel_unknown_job(self):
+        self.controller.cancel(WORKER1, 1)
 
         (worker, status, job_id, data) = self._wait()
 
diff --git a/beat/core/worker.py b/beat/core/worker.py
index e244bd2051696a15b78d3163df5642c6295ec277..2e85379d242b28a9322ab8ff88d787df28435163 100755
--- a/beat/core/worker.py
+++ b/beat/core/worker.py
@@ -93,9 +93,10 @@ class WorkerController(object):
         ])
 
 
-    def cancel(self, worker):
+    def cancel(self, worker, job_id):
         self.socket.send_multipart([
             str(worker),
-            WorkerController.CANCEL
+            WorkerController.CANCEL,
+            str(job_id),
         ])
 