Commit cd062475 authored by Manuel Günther's avatar Manuel Günther

Made database handling more safe -> allow parallel database access by changing...

Made database handling more safe -> allow parallel database access by changing the type of database that is written (old databases are still readable); jman ls will now sort the jobs according to their ID.
parent 3c823c0f
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
import os import os
import time import time
import anydbm import gdbm, anydbm
from cPickle import dumps, loads from cPickle import dumps, loads
from .tools import qsub, qstat, qdel, logger from .tools import qsub, qstat, qdel, logger
from .setshell import environ from .setshell import environ
...@@ -322,20 +322,38 @@ class JobManager: ...@@ -322,20 +322,38 @@ class JobManager:
""" """
self.state_file = statefile self.state_file = statefile
self.state_db = anydbm.open(self.state_file, 'c') self.context = environ(context)
self.job = {} self.job = {}
if os.path.exists(self.state_file):
try:
db = gdbm.open(self.state_file, 'r')
except:
db = anydbm.open(self.state_file, 'r')
logger.debug("Loading previous state...") logger.debug("Loading previous state...")
for k in self.state_db.keys(): for ks in db.keys():
ki = loads(k) ki = loads(ks)
self.job[ki] = loads(self.state_db[k]) self.job[ki] = loads(db[ks])
logger.debug("Job %d loaded" % ki) logger.debug("Job %d loaded" % ki)
self.context = environ(context) db.close()
def __del__(self): def __del__(self):
"""Safely terminates the JobManager""" """Safely terminates the JobManager"""
try:
db = gdbm.open(self.state_file, 'c')
except:
db = anydbm.open(self.state_file, 'c')
# synchronize jobs
for ks in sorted(db.keys()):
ki = loads(ks)
if ki not in self.job:
del db[ks]
logger.debug("Job %d deleted from database" % ki)
for ki in sorted(self.job.keys()):
ks = dumps(ki)
db[ks] = dumps(self.job[ki])
logger.debug("Job %d added or updated in database" % ki)
db.close()
db = anydbm.open(self.state_file, 'n') # erase previously recorded jobs
for k in sorted(self.job.keys()): db[dumps(k)] = dumps(self.job[k])
if not self.job: if not self.job:
logger.debug("Removing file %s because there are no more jobs to store" \ logger.debug("Removing file %s because there are no more jobs to store" \
% self.state_file) % self.state_file)
...@@ -402,7 +420,7 @@ class JobManager: ...@@ -402,7 +420,7 @@ class JobManager:
header = ' '.join(header) header = ' '.join(header)
return '\n'.join([header] + [delimiter] + \ return '\n'.join([header] + [delimiter] + \
[self[k].row(fmt, maxcmdline) for k in self.job]) [self[k].row(fmt, maxcmdline) for k in sorted(self.job.keys())])
def clear(self): def clear(self):
"""Clear the whole job queue""" """Clear the whole job queue"""
......
...@@ -2,7 +2,7 @@ from setuptools import setup, find_packages ...@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup( setup(
name='gridtk', name='gridtk',
version='0.3.4', version='0.3.5',
description='SGE Grid Submission and Monitoring Tools for Idiap', description='SGE Grid Submission and Monitoring Tools for Idiap',
url='https://github.com/idiap/gridtk', url='https://github.com/idiap/gridtk',
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment