diff --git a/beat/core/scripts/worker.py b/beat/core/scripts/worker.py index 24754d33d5d9e41f5f2b0d124ebf25c3d9fca265..72d6343f664b319614d0dae9a854d9a88e4b348a 100755 --- a/beat/core/scripts/worker.py +++ b/beat/core/scripts/worker.py @@ -144,15 +144,23 @@ def main(user_input=None): ) - # Change the verbosity level + # Setup the logging + formatter = logging.Formatter(fmt="[%(asctime)s - Worker '" + args['--name'] + \ + "' - %(name)s] %(levelname)s: %(message)s", + datefmt="%d/%b/%Y %H:%M:%S") + + handler = logging.StreamHandler() + handler.setFormatter(formatter) + + logger = logging.getLogger(__name__) + logger.addHandler(handler) + if args['--verbose'] == 1: - logging.basicConfig(format='[%(name)s] %(levelname)s: %(message)s', level=logging.INFO) + logger.setLevel(logging.INFO) elif args['--verbose'] >= 2: - logging.basicConfig(format='[%(name)s] %(levelname)s: %(message)s', level=logging.DEBUG) + logger.setLevel(logging.DEBUG) else: - logging.basicConfig(format='[%(name)s] %(levelname)s: %(message)s', level=logging.WARNING) - - logger = logging.getLogger(__name__) + logger.setLevel(logging.WARNING) # Check the prefix path @@ -240,12 +248,14 @@ def main(user_input=None): if result.has_key('result'): content = simplejson.dumps(result['result']) - logger.debug('send: """%s"""' % content.rstrip()) status = WorkerController.DONE if result['result']['status'] != 0: status = WorkerController.JOB_ERROR + logger.info("Job #%s completed", current_job_id) + logger.debug('send: """%s"""' % content.rstrip()) + message = [ status, current_job_id, @@ -302,14 +312,14 @@ def main(user_input=None): # Check that the worker isn't busy if execution_process is not None: socket.send_multipart([ - WorkerController.JOB_ERROR, + WorkerController.ERROR, job_id, 'Worker is already busy' ]) continue # Start the execution - logger.info("Running '%s'", data['algorithm']) + logger.info("Running '%s' with job id #%s", data['algorithm'], job_id) current_job_id = job_id execution_process = ExecutionProcess(multiprocessing.Queue(), prefix, data, cache, @@ -330,6 +340,8 @@ def main(user_input=None): continue # Kill the processing thread + logger.info("Cancelling the job #%s", current_job_id) + execution_process.terminate() execution_process.join() execution_process = None