Commit b3f66798 authored by Manuel Günther

Added the machine name for running jobs. This adds a `machine_name` column to the Job and ArrayJob tables, so databases created by older versions cannot be read any more.

parent 3f8c067e
......@@ -3,6 +3,7 @@ from __future__ import print_function
import os
import subprocess
import socket # to get the host name
from .models import Base, Job, ArrayJob, Status
from .tools import logger
......@@ -117,8 +118,11 @@ class JobManager:
return
job = jobs[0]
# get the machine name we are executing on; this might only work at idiap
machine_name = socket.gethostname()
# set the 'executing' status to the job
job.execute(array_id)
job.execute(array_id, machine_name)
if job.status == 'failure':
# there has been a dependent job that has failed before
......@@ -170,12 +174,12 @@ class JobManager:
# configuration for jobs
if print_dependencies:
fields = ("job-id", "queue", "status", "job-name", "dependencies", "submitted command line")
lengths = (20, 9, 14, 20, 30, 43)
lengths = (20, 14, 14, 20, 30, 43)
format = "{0:^%d} {1:^%d} {2:^%d} {3:^%d} {4:^%d} {5:<%d}" % lengths
dependency_length = lengths[4]
else:
fields = ("job-id", "queue", "status", "job-name", "submitted command line")
lengths = (20, 9, 14, 20, 43)
lengths = (20, 14, 14, 20, 43)
format = "{0:^%d} {1:^%d} {2:^%d} {3:^%d} {4:<%d}" % lengths
dependency_length = 0
......
......@@ -27,6 +27,7 @@ class ArrayJob(Base):
job_id = Column(Integer, ForeignKey('Job.unique'))
status = Column(Enum(*Status))
result = Column(Integer)
machine_name = Column(String(10))
job = relationship("Job", backref='array', order_by=id)
......@@ -35,6 +36,7 @@ class ArrayJob(Base):
self.job_id = job_id
self.status = Status[0]
self.result = None
self.machine_name = None # will be set later, by the Job class
def std_out_file(self):
return self.job.std_out_file() + "." + str(self.id) if self.job.log_dir else None
......@@ -52,9 +54,10 @@ class ArrayJob(Base):
"""Formats the current job into a nicer string to fit into a table."""
job_id = "%d - %d" % (self.job.id, self.id)
queue = self.job.queue_name if self.machine_name is None else self.machine_name
status = "%s" % self.status + (" (%d)" % self.result if self.result is not None else "" )
return format.format(job_id, self.job.queue_name, status)
return format.format(job_id, queue, status)
class Job(Base):
......@@ -65,6 +68,7 @@ class Job(Base):
command_line = Column(String(255)) # The command line to execute, converted to one string
name = Column(String(20)) # A hand-chosen name for the task
queue_name = Column(String(20)) # The name of the queue
machine_name = Column(String(10)) # The name of the machine in which the job is run
grid_arguments = Column(String(255)) # The kwargs arguments for the job submission (e.g. in the grid)
id = Column(Integer, unique = True) # The ID of the job as given from the grid
log_dir = Column(String(255)) # The directory where the log files will be put to
......@@ -74,11 +78,12 @@ class Job(Base):
status = Column(Enum(*Status))
result = Column(Integer)
def __init__(self, command_line, name = None, log_dir = None, array_string = None, queue_name = 'local', stop_on_failure = False, **kwargs):
def __init__(self, command_line, name = None, log_dir = None, array_string = None, queue_name = 'local', machine_name = None, stop_on_failure = False, **kwargs):
"""Constructs a Job object without an ID (needs to be set later)."""
self.command_line = dumps(command_line)
self.name = name
self.queue_name = queue_name # will be set during the queue command later
self.machine_name = machine_name # will be set during the execute command later
self.grid_arguments = dumps(kwargs)
self.log_dir = log_dir
self.stop_on_failure = stop_on_failure
......@@ -128,13 +133,17 @@ class Job(Base):
array_job.status = new_status
def execute(self, array_id = None):
def execute(self, array_id = None, machine_name = None):
"""Sets the status of this job to 'executing'."""
self.status = 'executing'
if array_id is not None:
for array_job in self.array:
if array_job.id == array_id:
array_job.status = 'executing'
if machine_name is not None:
array_job.machine_name = machine_name
elif machine_name is not None:
self.machine_name = machine_name
# sometimes, the 'finish' command did not work for array jobs,
# so check if any old job still has the 'executing' flag set
......@@ -205,13 +214,15 @@ class Job(Base):
def __str__(self):
id = "%d" % self.id
if self.machine_name: m = "%s - %s" % (self.queue_name, self.machine_name)
else: m = self.queue_name
if self.array: a = "[%d-%d:%d]" % self.get_array()
else: a = ""
if self.name is not None: n = "<Job: %s %s - '%s'>" % (id, a, self.name)
else: n = "<Job: %s>" % id
if self.result is not None: r = "%s (%d)" % (self.status, self.result)
else: r = "%s" % self.status
return "%s : %s -- %s" % (n, r, " ".join(self.get_command_line()))
return "%s | %s : %s -- %s" % (n, m, r, " ".join(self.get_command_line()))
def format(self, format, dependencies = 0, limit_command_line = None):
"""Formats the current job into a nicer string to fit into a table."""
......@@ -221,14 +232,15 @@ class Job(Base):
job_id = "%d" % self.id + (" [%d-%d:%d]" % self.get_array() if self.array else "")
status = "%s" % self.status + (" (%d)" % self.result if self.result is not None else "" )
queue = self.queue_name if self.machine_name is None else self.machine_name
if dependencies:
deps = str([dep.id for dep in self.get_jobs_we_wait_for()])
if dependencies < len(deps):
deps = deps[:dependencies-3] + '...'
return format.format(job_id, self.queue_name, status, self.name, deps, command_line)
return format.format(job_id, queue, status, self.name, deps, command_line)
else:
return format.format(job_id, self.queue_name, status, self.name, command_line)
return format.format(job_id, queue, status, self.name, command_line)
......
......@@ -100,7 +100,7 @@ class JobManagerSGE(JobManager):
# iterate over all jobs
jobs = self.get_jobs(job_ids)
for job in jobs:
if job.status == 'executing':
if job.status in ('queued', 'executing'):
status = qstat(job.id, context=self.context)
if len(status) == 0:
job.status = 'failure'
......
......@@ -9,7 +9,7 @@ if sys.version_info[:2] < (2, 7) or ((3,0) <= sys.version_info[:2] < (3,2)):
setup(
name='gridtk',
version='1.0.4a1',
version='1.1.1a0',
description='SGE Grid and Local Submission and Monitoring Tools for Idiap',
url='https://github.com/idiap/gridtk',
......
Markdown is supported
0% Try again or attach a new file
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment