Commit e9b884e6 authored by Philip ABBET's avatar Philip ABBET
Browse files

Modify the kill mechanism of the docker containers, needed for beat.web

parent 301eb2db
Pipeline #6778 passed with stage
in 4 minutes and 34 seconds
......@@ -56,6 +56,9 @@ class Server(gevent.Greenlet):
self.stop = gevent.event.Event()
self.stop.clear()
self.must_kill = gevent.event.Event()
self.must_kill.clear()
# Starts our 0MQ server
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PAIR)
......@@ -102,6 +105,10 @@ class Server(gevent.Greenlet):
while not self.stop.is_set(): #keep on
if self.must_kill.is_set():
self.process.kill()
self.must_kill.clear()
timeout = 1000 #ms
socks = dict(self.poller.poll(timeout)) #yields to the next greenlet
......@@ -329,6 +336,10 @@ class Server(gevent.Greenlet):
self._acknowledge()
def kill(self):
self.must_kill.set()
class Agent(object):
'''Handles synchronous commands.
......@@ -360,6 +371,7 @@ class Agent(object):
self.max_cpu_percent = max_cpu_percent
self.tempdir = None
self.process = None
self.server = None
def __enter__(self):
......@@ -413,7 +425,7 @@ class Agent(object):
configuration.dump_runner_configuration(self.tempdir)
# Server for our single client
server = Server(configuration.input_list, configuration.output_list,
self.server = Server(configuration.input_list, configuration.output_list,
host.ip)
# Figures out the image to use
......@@ -425,7 +437,7 @@ class Agent(object):
# Launches the process (0MQ client)
tmp_dir = os.path.join('/tmp', os.path.basename(self.tempdir))
cmd = ['execute', server.address, tmp_dir]
cmd = ['execute', self.server.address, tmp_dir]
if logger.getEffectiveLevel() <= logging.DEBUG: cmd.insert(1, '--debug')
if daemon > 0:
......@@ -450,8 +462,8 @@ class Agent(object):
"process with `docker kill %s`", self.process.pid)
# Serve asynchronously
server.set_process(self.process)
server.start()
self.server.set_process(self.process)
self.server.start()
timed_out = False
......@@ -472,7 +484,7 @@ class Agent(object):
status = self.process.wait()
finally:
server.stop.set()
self.server.stop.set()
# Collects final information and returns to caller
process = self.process
......@@ -482,16 +494,17 @@ class Agent(object):
stderr = process.stderr,
status = status,
timed_out = timed_out,
statistics = server.last_statistics,
system_error = server.system_error,
user_error = server.user_error,
statistics = self.server.last_statistics,
system_error = self.server.system_error,
user_error = self.server.user_error,
)
process.rm()
self.server = None
return retval
def kill(self):
"""Stops the user process by force - to be called from signal handlers"""
if self.process is not None:
self.process.kill()
if self.server is not None:
self.server.kill()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment