diff --git a/beat/core/agent.py b/beat/core/agent.py index 8b61307b46d61fd8f5449843e450bd930c4ab04d..ea50d0f2d47bfa6dc8289f6bd114f19096f7e7a9 100755 --- a/beat/core/agent.py +++ b/beat/core/agent.py @@ -56,6 +56,9 @@ class Server(gevent.Greenlet): self.stop = gevent.event.Event() self.stop.clear() + self.must_kill = gevent.event.Event() + self.must_kill.clear() + # Starts our 0MQ server self.context = zmq.Context() self.socket = self.context.socket(zmq.PAIR) @@ -102,6 +105,10 @@ class Server(gevent.Greenlet): while not self.stop.is_set(): #keep on + if self.must_kill.is_set(): + self.process.kill() + self.must_kill.clear() + timeout = 1000 #ms socks = dict(self.poller.poll(timeout)) #yields to the next greenlet @@ -329,6 +336,10 @@ class Server(gevent.Greenlet): self._acknowledge() + def kill(self): + self.must_kill.set() + + class Agent(object): '''Handles synchronous commands. @@ -360,6 +371,7 @@ class Agent(object): self.max_cpu_percent = max_cpu_percent self.tempdir = None self.process = None + self.server = None def __enter__(self): @@ -413,7 +425,7 @@ class Agent(object): configuration.dump_runner_configuration(self.tempdir) # Server for our single client - server = Server(configuration.input_list, configuration.output_list, + self.server = Server(configuration.input_list, configuration.output_list, host.ip) # Figures out the image to use @@ -425,7 +437,7 @@ class Agent(object): # Launches the process (0MQ client) tmp_dir = os.path.join('/tmp', os.path.basename(self.tempdir)) - cmd = ['execute', server.address, tmp_dir] + cmd = ['execute', self.server.address, tmp_dir] if logger.getEffectiveLevel() <= logging.DEBUG: cmd.insert(1, '--debug') if daemon > 0: @@ -450,8 +462,8 @@ class Agent(object): "process with `docker kill %s`", self.process.pid) # Serve asynchronously - server.set_process(self.process) - server.start() + self.server.set_process(self.process) + self.server.start() timed_out = False @@ -472,7 +484,7 @@ class Agent(object): status = self.process.wait() finally: - server.stop.set() + self.server.stop.set() # Collects final information and returns to caller process = self.process @@ -482,16 +494,17 @@ class Agent(object): stderr = process.stderr, status = status, timed_out = timed_out, - statistics = server.last_statistics, - system_error = server.system_error, - user_error = server.user_error, + statistics = self.server.last_statistics, + system_error = self.server.system_error, + user_error = self.server.user_error, ) process.rm() + self.server = None return retval def kill(self): """Stops the user process by force - to be called from signal handlers""" - if self.process is not None: - self.process.kill() + if self.server is not None: + self.server.kill()