Commit 69d4dc3b authored by André Anjos's avatar André Anjos 💬

[async] Single-thread implementation based on greenlets

parent b4466e92
Pipeline #959 failed with stage
......@@ -28,25 +28,28 @@
import os
import shutil
import threading
import logging
logger = logging.getLogger(__name__)
import zmq
import gevent
import zmq.green as zmq
from . import utils
from . import async
from . import baseformat
class Server(threading.Thread):
class Server(gevent.Greenlet):
'''A 0MQ server for our communication with the user process'''
def __init__(self, input_list, output_list):
super(Server, self).__init__()
self._stop = threading.Event()
# An event unblocking a graceful stop
self.stop = gevent.event.Event()
self.stop.clear()
# Starts our 0MQ server
self.context = zmq.Context()
......@@ -78,14 +81,6 @@ class Server(threading.Thread):
)
def stop(self):
self._stop.set()
def stopped(self):
return self._stop.isSet()
def set_process(self, process):
self.process = process
......@@ -94,12 +89,10 @@ class Server(threading.Thread):
logger.debug("0MQ server thread started")
while not self.stopped():
while not self.stop.is_set(): #keep on
timeout = 1000 #ms
socks = dict(self.poller.poll(1000)) #blocks here, for 1 second at most
if self.stopped(): break #break immediately
socks = dict(self.poller.poll(timeout)) #yields to the next greenlet
if self.socket in socks and socks[self.socket] == zmq.POLLIN:
......@@ -126,7 +119,7 @@ class Server(threading.Thread):
logger.error(message, exc_info=True)
self.system_error = message
self.process.kill()
self.stop()
self.stop.set()
break
else:
......@@ -135,7 +128,7 @@ class Server(threading.Thread):
logger.error(message)
self.system_error = message
self.process.kill()
self.stop()
self.stop.set()
break
self.socket.setsockopt(zmq.LINGER, 0)
......@@ -296,7 +289,7 @@ class Server(threading.Thread):
logger.debug('send: ack')
self.socket.send('ack')
logger.debug('setting stop condition for 0MQ server thread')
self.stop()
self.stop.set()
def done(self, wait_time):
......@@ -329,20 +322,8 @@ class Server(threading.Thread):
class Agent(object):
'''Handles asynchronous stdout/stderr readout and synchronous commands.
We use the standard subprocess/threading modules for this implementation.
Each co-process is linked to us via 5 uni-directional pipes which data work
as stdout, stderr, stdin, datain and dataout end-points. The parent
co-process establishes the connection to the child and then can pass/receive
commands, data and logs.
Usage of the data pipes (datain, dataout) is **synchronous** - you send a
command and block for an answer. The co-process is normally controlled by the
(current) parent process, except for data requests, which are algorithm
driven. The nature of our problem does not require an *asynchronous*
implementation which, in turn, would require a much more complex set of
dependencies (asyncio or Twisted).
Objects of this class are in charge of three separate tasks:
We use the greenlets for this implementation. Objects of this class are in
charge of three separate tasks:
1. Handling the execution of the user process (as a separate process)
2. Making sure the user process does not consume more resources than it is
......@@ -468,7 +449,7 @@ class Agent(object):
timeout = (60*timeout_in_minutes) if timeout_in_minutes else None
status = self.process.wait(timeout)
except async.subprocess.TimeoutExpired:
except async.TimeoutExpired:
logger.warn("user process has timed out after %d minutes",
timeout_in_minutes)
self.process.kill()
......@@ -481,13 +462,7 @@ class Agent(object):
status = self.process.wait()
finally:
server.stop()
timeout = 5 #seconds
server.join(timeout=timeout) #at most, wait ``timeout``
if server.is_alive():
logger.error("0MQ poller thread is still alive after a timeout " \
"of %d seconds", timeout)
server.stop.set()
# If status is negative, convert it to a positive value (group signal)
if status < 0: status *= -1
......
......@@ -26,17 +26,12 @@
###############################################################################
'''Implementation of subprocess-based asynchronous running with threads
'''Implementation of subprocess-based asynchronous running with greenlets
'''
import os
import sys
import errno
import signal
import time
import threading
import collections
import pkg_resources
import distutils.spawn
import logging
......@@ -44,19 +39,22 @@ logger = logging.getLogger(__name__)
import psutil
import distutils.version
python_version = distutils.version.LooseVersion('%d.%d.%d' % \
sys.version_info[:3])
version_330 = distutils.version.LooseVersion('3.3.0')
if python_version < version_330:
import subprocess32 as subprocess
else:
import subprocess
import gevent
import gevent.timeout
import gevent.subprocess
import pkg_resources
from . import stats
class circbuffer(object):
# Figures out the expected TimeoutExpired exception
import six
if six.PY2:
from gevent.timeout import Timeout as TimeoutExpired
else:
from subprocess import TimeoutExpired as TimeoutExpired
class _circbuffer(object):
'''A configurable circular buffer used for outputting stdout/sterr
You may used it like this::
......@@ -99,47 +97,22 @@ class circbuffer(object):
logger.debug('[%s] closed', self.name)
return self.buf.clear()
def __str__(self):
return '[%s] %d bytes' % (self.name, len(self.buf))
class Reader(threading.Thread):
def __init__(self, name, lock, collector, source):
super(Reader, self).__init__(name=name)
self.__stop = threading.Event()
self.lock = lock
self.collector = collector
self.source = source
def stop(self):
self.__stop.set()
def stopped(self):
return self.__stop.isSet()
def run(self):
while not self.stopped():
try:
logger.debug('[%s] Blocking on read()', self.name)
data = os.read(self.source.fileno(), 65536).decode('utf-8') #no GIL
logger.debug('[%s] Read operation exited successfully', self.name)
except ValueError as e: #file is already closed
logger.debug('[%s] Read operation on closed file', self.name)
data = ""
if data == "": #process has finished
logger.debug('[%s] Data is empty, stopping...', self.name)
self.stop()
with self.lock: self.collector.write(data)
def _read_stream(stream, buf):
'''Reads stream, write on buffer, yields if blocked'''
return
try:
if not stream.closed:
buf.write(stream.read())
except RuntimeError:
pass
def sandbox_memory(cmd, virtual_memory_in_megabytes):
"""Returns the command-line for a memory-sandbox executable"""
def _sandbox_memory(cmd, virtual_memory_in_megabytes):
'''Returns the command-line for a memory-sandbox executable'''
if virtual_memory_in_megabytes > 0:
logger.info("Setting maximum virtual memory usage to %d megabyte(s)",
......@@ -158,15 +131,8 @@ def sandbox_memory(cmd, virtual_memory_in_megabytes):
return cmd
def pop(d, key, default):
"""Reads a key from the dictionary ``d`` returns it and removes it"""
retval = d.get(key, default)
if key in d: del d[key]
return retval
def resolve_cpulimit_path(exe):
"""Returns the path to cpulimit"""
'''Returns the path to cpulimit'''
FIXED_LOCATIONS = [
'/usr/local/bin/cpulimit',
......@@ -206,14 +172,14 @@ def resolve_cpulimit_path(exe):
return retval
class Popen(subprocess.Popen):
"""Manager for an asynchronous process.
class Popen(gevent.subprocess.Popen):
'''Manager for an asynchronous process.
The process will be run in the background, and its standard output and
standard error will be read asynchronously, into a limited size circular
buffer. This implementation, despite using Python threads, will be able to
execute the readout in parallel, since the stream ``read()`` operation
unblocks the Python global interpreter lock (GIL).
buffer. This implementation, despite using Greenlets, will be able to execute
the readout in parallel, since the stream ``read()`` operation yields the
next greenlet
Parameters:
......@@ -248,7 +214,7 @@ class Popen(subprocess.Popen):
OSError: If ``cmd`` points to something that cannot be executed.
"""
'''
def __init__(self, cmd, buflen=65500, virtual_memory_in_megabytes=0,
max_cpu_percent=0, cpulimit_path=None):
......@@ -256,44 +222,23 @@ class Popen(subprocess.Popen):
debug = logger.getEffectiveLevel() <= logging.DEBUG
name = os.path.basename(cmd[0])
self.__stdout = circbuffer(buflen, name='%s:stdout' % name)
self.__stderr = circbuffer(buflen, name='%s:stderr' % name)
self.__lock = threading.Lock()
self.__stdout = _circbuffer(buflen, name='%s:stdout' % name)
self.__stderr = _circbuffer(buflen, name='%s:stderr' % name)
# hooks-in memory usage containment
virtual_memory_in_megabytes = max(virtual_memory_in_megabytes, 0)
cmd = sandbox_memory(cmd, virtual_memory_in_megabytes)
cmd = _sandbox_memory(cmd, virtual_memory_in_megabytes)
logger.debug("Running command `%s'" % ' '.join(cmd))
self.__stdout_thread = None
self.__stderr_thread = None
super(Popen, self).__init__(
cmd,
stdin=None,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdout=gevent.subprocess.PIPE,
stderr=gevent.subprocess.PIPE,
bufsize=1 if debug else -1,
)
# setup threads to read stdout and stderr
self.__stdout_thread = Reader(
name="%s-stdout-thread" % name,
lock=self.__lock,
collector=self.__stdout, #write here
source=self.stdout, #read from this stream
)
self.__stdout_thread.start()
self.__stderr_thread = Reader(
name="%s-stderr-thread" % name,
lock=self.__lock,
collector=self.__stderr, #write here
source=self.stderr, #read from this stream
)
self.__stderr_thread.start()
# if we need to use a cpu limitation
max_cpu_percent = max(0, max_cpu_percent)
max_cpu_percent = min(max_cpu_percent, 100*psutil.cpu_count())
......@@ -315,44 +260,69 @@ class Popen(subprocess.Popen):
self.cpulimit_process = None
def __del__(self):
if self.__stdout_thread and self.__stdout_thread.is_alive():
self.stdout.close() #unblock any read() operations on this stream
if self.__stderr_thread and self.__stderr_thread.is_alive():
self.stderr.close() #unblock any read() operations on this stream
def wait(self, timeout=None):
'''Reads stdout and stderr until the underlying processes finishes
Implements a modified version of :py:meth:`subprocess.Popen.wait`, in which
we read the stdout and stderr data into a circular buffer, keep only the
last N bytes of each stream.
This method will call :py:meth:`file.readline` on both stdout and stderr
streams attached to the process. These methods are "green". They will yield
once they are blocked.
Returns:
int: Returns the status code of the process
Raises:
gevent.timeout.Timeout: under Python 2, if the process times out
subprocess.TimeoutExpired: under Python 3, if the process times out
'''
gevent.spawn(_read_stream, self.stdout, self.__stdout)
gevent.spawn(_read_stream, self.stderr, self.__stderr)
retval = super(Popen, self).wait(timeout)
if retval is None and timeout is not None:
raise TimeoutExpired(timeout)
return retval
def peek_stdout(self):
'''Returns the last N bytes of stdout'''
return self.__stdout.read()
def peek_stderr(self):
'''Returns the last N bytes of stderr'''
return self.__stderr.read()
def kill(self):
"""Before killing myself, make sure to kill all children."""
'''Before killing myself, make sure to kill all children.'''
p = psutil.Process(self.pid)
for child in p.children(): child.kill()
super(Popen, self).kill()
self.stdout.close()
self.stderr.close()
self.wait() #avoids zombies
if self.cpulimit_process:
# the cpulimit process is --lazy, so it should die automatically
# after the attached process is killed
self.cpulimit_process.stdout.close()
self.cpulimit_process.stderr.close()
self.cpulimit_process.wait() #avoids zombie process
def peek_stdout(self):
"""Returns both stdout and stderr as strings"""
with self.__lock:
return self.__stdout.read()
def peek_stderr(self):
"""Returns both stdout and stderr as strings"""
with self.__lock:
return self.__stderr.read()
def statistics(self):
"""If the process is still active, returns usage statistics by ``pusutil``
'''If the process is still active, returns usage statistics by ``pusutil``
Returns:
......@@ -364,7 +334,7 @@ class Popen(subprocess.Popen):
RuntimeError: In case the process is not active anymore.
"""
'''
def sum_tuples(t):
retval = list(t[0])
......
......@@ -39,7 +39,7 @@ import pkg_resources
import nose
import nose.tools
from ..async import Popen, subprocess, resolve_cpulimit_path
from ..async import Popen, resolve_cpulimit_path, TimeoutExpired
from .utils import slow
# in case you want to see the printouts dynamically, set to ``True``
......@@ -72,12 +72,12 @@ def test_echo_hello_world():
def test_timeout():
sleep_for = 10 # seconds
sleep_for = 100 # seconds
p = Popen(["sleep", str(sleep_for)])
try:
p.wait(timeout=0.5)
retval = p.wait(timeout=0.5)
assert False, "timeout never occurred after %d seconds" % sleep_for
except subprocess.TimeoutExpired as e:
except TimeoutExpired as e:
p.kill()
status = p.wait()
nose.tools.eq_(status, -signal.SIGKILL)
......@@ -120,15 +120,16 @@ def test_limit_stdout():
sys.executable,
"-c",
'; '.join([
"for k in range(2**17): __import__('sys').stdout.write('%d ' % k)",
"__import__('sys').stdout.flush()",
"import sys",
"sys.stdout.write(' '.join([str(k) for k in range(2**17)]))",
"sys.stdout.flush()",
]),
])
status = p.wait()
nose.tools.eq_(status, 0)
data = p.peek_stdout()
nose.tools.eq_(len(data), 65500)
expected = '%d ' % ((2**17)-1)
expected = str((2**17)-1)
assert data.endswith(expected)
nose.tools.eq_(p.peek_stderr(), '')
......@@ -139,19 +140,47 @@ def test_limit_stderr():
sys.executable,
"-c",
'; '.join([
"for k in range(2**17): __import__('sys').stderr.write('%d ' % k)",
"__import__('sys').stderr.flush()",
"import sys",
"sys.stderr.write(' '.join([str(k) for k in range(2**17)]))",
"sys.stderr.flush()",
]),
])
status = p.wait()
nose.tools.eq_(status, 0)
data = p.peek_stderr()
nose.tools.eq_(len(data), 65500)
expected = '%d ' % ((2**17)-1)
expected = str((2**17)-1)
assert data.endswith(expected)
nose.tools.eq_(p.peek_stdout(), '')
def test_limit_both():
p = Popen([
sys.executable,
'-c' ,
'; '.join([
"import sys",
"sys.stderr.write(' '.join([str(k) for k in range(2**17)]))",
"sys.stderr.flush()",
"sys.stdout.write(' '.join([str(k) for k in range(2**17)]))",
"sys.stdout.flush()",
]),
])
status = p.wait()
nose.tools.eq_(status, 0)
data = p.peek_stdout()
nose.tools.eq_(len(data), 65500)
expected = str((2**17)-1)
assert data.endswith(expected)
data = p.peek_stderr()
nose.tools.eq_(len(data), 65500)
expected = str((2**17)-1)
assert data.endswith(expected)
def run_cpulimit(processes, max_cpu_percent, sleep_time):
program = pkg_resources.resource_filename(__name__, 'cpu_stress.py')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment