Commit 8daded4a authored by Tiago de Freitas Pereira's avatar Tiago de Freitas Pereira

Merge branch '19-submit-extactly-the-same-job-several-times' into 'master'

Resolve "Submit extactly the same job several times"

Closes #19

See merge request !10
parents 6b1dc47d bb504220
Pipeline #14148 passed with stages
in 8 minutes and 13 seconds
......@@ -99,6 +99,15 @@ being executed. Just submit jobs with the ``--stop-on-failure`` option.
The ``--stop-on-failure`` option is under development and might not work
properly. Use this option with care.
Also, you can submit the same job several times in a way that each one will
depend on the last one. This is useful when for GPU training when your jobs
gets killed because you run out of time but you want to submit the same job
again.
.. code-block:: sh
$ jman submit --repeat 5 -- myscript.py
While the jobs run, the output and error stream are captured in log files, which are written into a ``logs`` directory.
This directory can be changed by specifying:
......
......@@ -132,9 +132,12 @@ def submit(args):
kwargs['dry_run'] = args.dry_run
kwargs['stop_on_failure'] = args.stop_on_failure
# submit the job
job_id = jm.submit(args.job, **kwargs)
# submit the job(s)
for _ in range(args.repeat):
job_id = jm.submit(args.job, **kwargs)
dependencies = kwargs.get('dependencies', [])
dependencies.append(job_id)
kwargs['dependencies'] = dependencies
if args.print_id:
print (job_id, end='')
......@@ -293,6 +296,7 @@ def main(command_line_options = None):
submit_parser.add_argument('-t', '--array', '--parametric', metavar='(first-)last(:step)', help="Creates a parametric (array) job. You must specify the 'last' value, but 'first' (default=1) and 'step' (default=1) can be specified as well (when specifying 'step', 'first' has to be given, too).")
submit_parser.add_argument('-z', '--dry-run', action='store_true', help='Do not really submit anything, just print out what would submit in this case')
submit_parser.add_argument('-i', '--io-big', action='store_true', help='Sets "io_big" on the submitted jobs so it limits the machines in which the job is submitted to those that can do high-throughput.')
submit_parser.add_argument('-r', '--repeat', type=int, metavar='N', default=1, help='Submits the job N times. Each job will depend on the job before.')
submit_parser.add_argument('-o', '--print-id', action='store_true', help='Prints the new job id (so that they can be parsed by automatic scripts).')
submit_parser.add_argument('job', metavar='command', nargs=argparse.REMAINDER, help = "The job that should be executed. Sometimes a -- is required to separate the job from other command line options.")
submit_parser.set_defaults(func=submit)
......
......@@ -59,6 +59,7 @@ class GridTKTest(unittest.TestCase):
jman.main([self.jman, '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_1', bash, script_1])
jman.main([self.jman, '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_2', '--dependencies', '1', '--parametric', '1-7:2', bash, script_2])
jman.main([self.jman, '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_3', '--dependencies', '1', '2', '--exec-dir', rdir, bash, "test_array.sh"])
jman.main([self.jman, '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_1', '--repeat', '2', bash, script_1])
# check that the database was created successfully
self.assertTrue(os.path.exists(self.database))
......@@ -67,19 +68,24 @@ class GridTKTest(unittest.TestCase):
# test that the list command works (should also work with the "default" grid manager
jman.main([self.jman, '--database', self.database, 'list', '--job-ids', '1'])
jman.main([self.jman, '--database', self.database, 'list', '--job-ids', '2', '--print-array-jobs', '--print-dependencies', '--print-times'])
jman.main([self.jman, '--database', self.database, 'list', '--job-ids', '4-5', '--print-array-jobs', '--print-dependencies', '--print-times'])
# get insight into the database
job_manager = gridtk.local.JobManagerLocal(database=self.database)
session = job_manager.lock()
jobs = list(session.query(Job))
self.assertEqual(len(jobs), 3)
self.assertEqual(len(jobs), 5)
self.assertEqual(jobs[0].id, 1)
self.assertEqual(jobs[1].id, 2)
self.assertEqual(jobs[2].id, 3)
self.assertEqual(jobs[3].id, 4)
self.assertEqual(jobs[4].id, 5)
self.assertEqual(len(jobs[1].array), 4)
self.assertEqual(jobs[0].status, 'submitted')
self.assertEqual(jobs[1].status, 'submitted')
self.assertEqual(jobs[2].status, 'submitted')
self.assertEqual(jobs[3].status, 'submitted')
self.assertEqual(jobs[4].status, 'submitted')
self.assertTrue(all(j.submit_time is not None for j in jobs))
self.assertTrue(all(j.start_time is None for j in jobs))
self.assertTrue(all(j.finish_time is None for j in jobs))
......@@ -97,6 +103,14 @@ class GridTKTest(unittest.TestCase):
self.assertEqual(waited[0].id, 1)
self.assertEqual(waited[1].id, 2)
# check dependencies for --repeat
waiting = jobs[3].get_jobs_waiting_for_us()
self.assertEqual(len(waiting), 1)
self.assertEqual(waiting[0].id, 5)
waited = jobs[4].get_jobs_we_wait_for()
self.assertEqual(len(waited), 1)
self.assertEqual(waited[0].id, 4)
job_manager.unlock()
# now, start the local execution of the job in a parallel job
......@@ -111,7 +125,7 @@ class GridTKTest(unittest.TestCase):
# now, the first job needs to have status failure, and the second needs to be queued
session = job_manager.lock()
jobs = list(session.query(Job))
self.assertEqual(len(jobs), 3)
self.assertEqual(len(jobs), 5)
if jobs[0].status in ('submitted', 'queued', 'executing'):
# on slow machines, we don0t want the tests to fail, so we just skip
job_manager.unlock()
......@@ -147,7 +161,7 @@ class GridTKTest(unittest.TestCase):
# Job 1 and two array jobs of job two should be finished now, the other two still need to be queued
session = job_manager.lock()
jobs = list(session.query(Job))
self.assertEqual(len(jobs), 3)
self.assertEqual(len(jobs), 5)
if jobs[0].status in ('queued', 'executing') or jobs[1].status == 'queued':
# on slow machines, we don0t want the tests to fail, so we just skip
job_manager.unlock()
......@@ -195,7 +209,7 @@ class GridTKTest(unittest.TestCase):
# check that exactly four output and four error files have been created
files = os.listdir(self.log_dir)
self.assertEqual(len(files), 12)
self.assertEqual(len(files), 16)
for i in range(1,8,2):
self.assertTrue('test_2.o2.%d'%i in files)
self.assertTrue('test_2.e2.%d'%i in files)
......@@ -203,7 +217,7 @@ class GridTKTest(unittest.TestCase):
# check that all array jobs are finished now
session = job_manager.lock()
jobs = list(session.query(Job))
self.assertEqual(len(jobs), 3)
self.assertEqual(len(jobs), 5)
self.assertEqual(jobs[1].status, 'failure')
self.assertEqual(jobs[1].array[0].status, 'failure')
self.assertEqual(jobs[1].array[0].result, 1)
......@@ -232,7 +246,7 @@ class GridTKTest(unittest.TestCase):
jman.main([self.jman, '--database', self.database, 'report'])
# clean-up
jman.main([self.jman, '--local', '--database', self.database, 'delete', '--job-ids', '1-3'])
jman.main([self.jman, '--local', '--database', self.database, 'delete', '--job-ids', '1-5'])
# check that the database and the log files are gone
self.assertEqual(len(os.listdir(self.temp_dir)), 0)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment