Commit d0f060ed authored by Samuel GAIST's avatar Samuel GAIST
Browse files

[experiments] Implement basic monitoring of an experiment

It provides a simple ncurses based cli GUI.
parent e4e0ce00
......@@ -40,6 +40,13 @@ import glob
import click
import simplejson
import six
import curses
import textwrap
import threading
import queue
import signal
from datetime import timedelta
from beat.core.experiment import Experiment
from beat.core.execution import DockerExecutor
......@@ -840,8 +847,9 @@ def plot(ctx, names, force, remote, show, output_folder):
@experiments.command()
@click.argument("name", nargs=1)
@click.option("--watch", help="Start monitoring the execution", is_flag=True)
@click.pass_context
def start(ctx, name):
def start(ctx, name, watch):
"""Start an experiment on the platform"""
config = ctx.meta.get("config")
......@@ -854,6 +862,8 @@ def start(ctx, name):
name, webapi.platform, six.moves.http_client.responses[status]
)
)
elif watch:
ctx.invoke(monitor, name=name)
@experiments.command()
......@@ -927,3 +937,223 @@ def status(ctx, name):
else:
data = simplejson.loads(answer)
print(simplejson.dumps(data, indent=4))
# The monitoring implementation has been inspired from
# https://medium.com/greedygame-engineering/an-elegant-way-to-run-periodic-tasks-in-python-61b7c477b679
class ProgramKilled(Exception):
"""CTRL + C has been used"""
pass
def signal_handler(signum, frame):
"""Basic signal handler for processing keyboard interruption"""
raise ProgramKilled
class ExperimentMonitor(threading.Thread):
"""Thread doing the monitoring of an experiment"""
def __init__(self, interval, config, name):
super(ExperimentMonitor, self).__init__()
self.daemon = False
self.stop_event = threading.Event()
self.interval = interval
self.config = config
self.name = name
self.stopped = False
self.queue = queue.Queue()
def stop(self):
"""Stop the thread cleanly"""
self.stopped = True
self.stop_event.set()
self.join()
def run(self):
"""Periodically calls the platform instance to get the status of the
selected experiment.
"""
fields = ",".join(["status", "blocks_status", "done"])
self.stopped = False
with common.make_webapi(self.config) as webapi:
first_run = True
while not self.stop_event.wait(
0 if first_run else self.interval.total_seconds()
):
status, answer = webapi.get(
"/api/v1/experiments/{}/?fields={}".format(self.name, fields)
)
if status != six.moves.http_client.OK:
logger.error(
"failed to get current state of {} on `{}', reason: {}".format(
self.name,
webapi.platform,
six.moves.http_client.responses[status],
)
)
self.stop_event.set()
self.queue.put({"error": status})
else:
data = simplejson.loads(answer)
self.queue.put(data)
if first_run:
first_run = False
def replace_line(pad, line, text):
"""Replaces the content of a ncurses pad line"""
pad.move(line, 0)
pad.clrtoeol()
pad.addstr(line, 0, text)
def process_input(monitor, pad, delta, pad_height, height, width):
"""Processes the keyboard input of an ncurses pad"""
if pad:
try:
ch = pad.getch()
except curses.error:
pass
else:
if ch == curses.KEY_UP:
delta = max(delta - 1, 0)
elif ch == curses.KEY_DOWN:
delta = min(delta + 1, pad_height - height)
elif ch == ord("q"):
monitor.stop()
pad.refresh(delta, 0, 0, 0, height - 1, width - 1)
return delta
@experiments.command()
@click.argument("name", nargs=1)
@click.pass_context
def monitor(ctx, name):
"""Monitor a running experiment"""
config = ctx.meta.get("config")
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
monitor = ExperimentMonitor(interval=timedelta(seconds=5), config=config, name=name)
monitor.start()
stdscr = curses.initscr()
curses.noecho()
curses.cbreak()
initialised = False
killed = False
pad = None
line = 0
delta = 0
height, width = stdscr.getmaxyx()
pad_height = height
STATIC_LINE_COUNT = 3 # Number of known lines that will be shown
while True:
try:
try:
data = monitor.queue.get(True, 0.2)
except queue.Empty:
delta = process_input(monitor, pad, delta, pad_height, height, width)
else:
if "error" in data:
killed = True
break
height, width = stdscr.getmaxyx()
if not initialised:
nb_blocks = len(data["blocks_status"]) + STATIC_LINE_COUNT
pad_height = max(nb_blocks, height)
pad = curses.newpad(pad_height, width)
pad.timeout(200)
pad.keypad(True)
line = 0
replace_line(
pad,
line,
textwrap.shorten("Name: {name}".format(name=name), width=width),
)
line += 1
replace_line(
pad,
line,
textwrap.shorten("Status: {status}".format(**data), width=width),
)
blocks = data["blocks_status"]
text_width = int(width / 2)
for block_name, block_status in blocks.items():
line += 1
pad.move(line, 0)
pad.clrtoeol()
pad.addstr(
line,
0,
textwrap.shorten(
"Name: {block_name} ".format(block_name=block_name),
width=text_width,
),
)
pad.addstr(
line,
text_width,
textwrap.shorten(
"Status: {block_status}".format(block_status=block_status),
width=text_width,
),
)
pad.refresh(delta, 0, 0, 0, height - 1, width - 1)
if data["done"]:
monitor.stop()
if not initialised:
initialised = True
finally:
delta = process_input(monitor, pad, delta, pad_height, height, width)
if not monitor.isAlive():
break
except ProgramKilled:
monitor.stop()
killed = True
break
if not killed:
line += 1
pad.timeout(-1)
pad.addstr(
line,
0,
textwrap.shorten("Experiment done, press any key to leave", width=width),
curses.A_BOLD,
)
pad.move(line, 0)
pad.refresh(pad_height - height, 0, 0, 0, height - 1, width - 1)
pad.getkey()
curses.echo()
curses.nocbreak()
curses.endwin()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment