...
 
Commits (2)
......@@ -40,6 +40,13 @@ import glob
import click
import simplejson
import six
import curses
import textwrap
import threading
import queue
import signal
from datetime import timedelta
from beat.core.experiment import Experiment
from beat.core.execution import DockerExecutor
......@@ -840,8 +847,9 @@ def plot(ctx, names, force, remote, show, output_folder):
@experiments.command()
@click.argument("name", nargs=1)
@click.option("--watch", help="Start monitoring the execution", is_flag=True)
@click.pass_context
def start(ctx, name):
def start(ctx, name, watch):
"""Start an experiment on the platform"""
config = ctx.meta.get("config")
......@@ -854,6 +862,8 @@ def start(ctx, name):
name, webapi.platform, six.moves.http_client.responses[status]
)
)
elif watch:
ctx.invoke(monitor, name=name)
@experiments.command()
......@@ -895,8 +905,8 @@ def reset(ctx, name):
@experiments.command()
@click.argument("name", nargs=1)
@click.pass_context
def status(ctx, name):
"""Shows the current status of a running experiment"""
def runstatus(ctx, name):
"""Shows the status of an experiment on the platform"""
config = ctx.meta.get("config")
......@@ -927,3 +937,223 @@ def status(ctx, name):
else:
data = simplejson.loads(answer)
print(simplejson.dumps(data, indent=4))
# The monitoring implementation has been inspired from
# https://medium.com/greedygame-engineering/an-elegant-way-to-run-periodic-tasks-in-python-61b7c477b679
class ProgramKilled(Exception):
"""CTRL + C has been used"""
pass
def signal_handler(signum, frame):
"""Basic signal handler for processing keyboard interruption"""
raise ProgramKilled
class ExperimentMonitor(threading.Thread):
"""Thread doing the monitoring of an experiment"""
def __init__(self, interval, config, name):
super(ExperimentMonitor, self).__init__()
self.daemon = False
self.stop_event = threading.Event()
self.interval = interval
self.config = config
self.name = name
self.stopped = False
self.queue = queue.Queue()
def stop(self):
"""Stop the thread cleanly"""
self.stopped = True
self.stop_event.set()
self.join()
def run(self):
"""Periodically calls the platform instance to get the status of the
selected experiment.
"""
fields = ",".join(["status", "blocks_status", "done"])
self.stopped = False
with common.make_webapi(self.config) as webapi:
first_run = True
while not self.stop_event.wait(
0 if first_run else self.interval.total_seconds()
):
status, answer = webapi.get(
"/api/v1/experiments/{}/?fields={}".format(self.name, fields)
)
if status != six.moves.http_client.OK:
logger.error(
"failed to get current state of {} on `{}', reason: {}".format(
self.name,
webapi.platform,
six.moves.http_client.responses[status],
)
)
self.stop_event.set()
self.queue.put({"error": status})
else:
data = simplejson.loads(answer)
self.queue.put(data)
if first_run:
first_run = False
def replace_line(pad, line, text):
"""Replaces the content of a ncurses pad line"""
pad.move(line, 0)
pad.clrtoeol()
pad.addstr(line, 0, text)
def process_input(monitor, pad, delta, pad_height, height, width):
"""Processes the keyboard input of an ncurses pad"""
if pad:
try:
ch = pad.getch()
except curses.error:
pass
else:
if ch == curses.KEY_UP:
delta = max(delta - 1, 0)
elif ch == curses.KEY_DOWN:
delta = min(delta + 1, pad_height - height)
elif ch == ord("q"):
monitor.stop()
pad.refresh(delta, 0, 0, 0, height - 1, width - 1)
return delta
@experiments.command()
@click.argument("name", nargs=1)
@click.pass_context
def monitor(ctx, name):
"""Monitor a running experiment"""
config = ctx.meta.get("config")
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
monitor = ExperimentMonitor(interval=timedelta(seconds=5), config=config, name=name)
monitor.start()
stdscr = curses.initscr()
curses.noecho()
curses.cbreak()
initialised = False
killed = False
pad = None
line = 0
delta = 0
height, width = stdscr.getmaxyx()
pad_height = height
STATIC_LINE_COUNT = 3 # Number of known lines that will be shown
while True:
try:
try:
data = monitor.queue.get(True, 0.2)
except queue.Empty:
delta = process_input(monitor, pad, delta, pad_height, height, width)
else:
if "error" in data:
killed = True
break
height, width = stdscr.getmaxyx()
if not initialised:
nb_blocks = len(data["blocks_status"]) + STATIC_LINE_COUNT
pad_height = max(nb_blocks, height)
pad = curses.newpad(pad_height, width)
pad.timeout(200)
pad.keypad(True)
line = 0
replace_line(
pad,
line,
textwrap.shorten("Name: {name}".format(name=name), width=width),
)
line += 1
replace_line(
pad,
line,
textwrap.shorten("Status: {status}".format(**data), width=width),
)
blocks = data["blocks_status"]
text_width = int(width / 2)
for block_name, block_status in blocks.items():
line += 1
pad.move(line, 0)
pad.clrtoeol()
pad.addstr(
line,
0,
textwrap.shorten(
"Name: {block_name} ".format(block_name=block_name),
width=text_width,
),
)
pad.addstr(
line,
text_width,
textwrap.shorten(
"Status: {block_status}".format(block_status=block_status),
width=text_width,
),
)
pad.refresh(delta, 0, 0, 0, height - 1, width - 1)
if data["done"]:
monitor.stop()
if not initialised:
initialised = True
finally:
delta = process_input(monitor, pad, delta, pad_height, height, width)
if not monitor.isAlive():
break
except ProgramKilled:
monitor.stop()
killed = True
break
if not killed:
line += 1
pad.timeout(-1)
pad.addstr(
line,
0,
textwrap.shorten("Experiment done, press any key to leave", width=width),
curses.A_BOLD,
)
pad.move(line, 0)
pad.refresh(pad_height - height, 0, 0, 0, height - 1, width - 1)
pad.getkey()
curses.echo()
curses.nocbreak()
curses.endwin()
......@@ -275,3 +275,35 @@ def test_draw():
nose.tools.assert_true(
os.path.exists(os.path.join(tmp_prefix, "experiments", obj + ".png"))
)
@slow
@nose.tools.with_setup(teardown=cleanup)
@skipif(disconnected, "missing test platform (%s)" % platform)
def test_start():
obj = "user/user/double_triangle/1/double_triangle"
nose.tools.eq_(call("start", obj), 0)
@slow
@nose.tools.with_setup(teardown=cleanup)
@skipif(disconnected, "missing test platform (%s)" % platform)
def test_cancel():
obj = "user/user/double_triangle/1/double_triangle"
nose.tools.eq_(call("cancel", obj), 0)
@slow
@nose.tools.with_setup(teardown=cleanup)
@skipif(disconnected, "missing test platform (%s)" % platform)
def test_reset():
obj = "user/user/double_triangle/1/double_triangle"
nose.tools.eq_(call("reset", obj), 0)
@slow
@nose.tools.with_setup(teardown=cleanup)
@skipif(disconnected, "missing test platform (%s)" % platform)
def test_runstatus():
obj = "user/user/double_triangle/1/double_triangle"
nose.tools.eq_(call("runstatus", obj), 0)
......@@ -46,6 +46,7 @@ requirements:
- click
- click-plugins
- graphviz
- ncurses
test:
requires:
......