grid.py 4.94 KB
Newer Older
1
2
3
4
5
from __future__ import print_function

import sys
import os
import math
Manuel Günther's avatar
Manuel Günther committed
6
from .. import grid
7
8
9
10
11
12
from .command_line import command_line

import bob.core
import logging
logger = logging.getLogger("bob.bio.base")

13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def indices(list_to_split, number_of_parallel_jobs, task_id=None):
  """Returns the ``(start, end)`` index range of ``list_to_split`` that the current job should process.

  The list is divided into ``number_of_parallel_jobs`` contiguous chunks of (almost) equal size,
  and the chunk is selected by the 1-based task id of the current job.

  **Parameters:**

  list_to_split : sized sequence
    The list whose elements should be distributed over the parallel jobs.

  number_of_parallel_jobs : int or None
    The number of parallel jobs; if ``None`` or at most ``1``, no splitting is required.

  task_id : int or str or None
    The 1-based id of the current task; if ``None``, the ``SGE_TASK_ID`` environment variable is read.

  **Returns:**

  ``None`` if no splitting is required.
  ``(0, len(list_to_split))`` if no task id is available (e.g., because a sub-job is executed locally),
  so that the whole list is processed.
  Otherwise the ``(start, end)`` range of the current task; for valid (positive) task ids,
  ``0 <= start <= end <= len(list_to_split)`` holds, so surplus jobs receive an empty range.
  """
  if number_of_parallel_jobs is None or number_of_parallel_jobs <= 1:
    return None

  number_of_objects = len(list_to_split)

  # test if the 'SGE_TASK_ID' environment variable is set
  sge_task_id = os.getenv('SGE_TASK_ID') if task_id is None else task_id
  # SGE sets SGE_TASK_ID to 'undefined' for jobs that are not array jobs
  if sge_task_id is None or str(sge_task_id).strip() == 'undefined':
    # task id is not set, so this function is not called from a grid job
    # hence, we process the whole list
    return (0, number_of_objects)

  job_id = int(sge_task_id) - 1
  # compute number of files to be executed by each job (ceiling division)
  number_of_objects_per_job = int(math.ceil(number_of_objects / float(number_of_parallel_jobs)))
  # clamp both ends, so that jobs beyond the last non-empty chunk get an empty range instead of start > end
  start = min(job_id * number_of_objects_per_job, number_of_objects)
  end = min((job_id + 1) * number_of_objects_per_job, number_of_objects)
  return (start, end)
33
34
35


class GridSubmission:
Manuel Günther's avatar
Manuel Günther committed
36
  def __init__(self, args, command_line_parameters, executable = 'verify.py', first_fake_job_id = 0):
    """Locates the executable to run and, if grid submission is requested, sets up a GridTK job manager.

    **Parameters:**

    args : namespace
      The parsed command line arguments; ``args.grid``, ``args.verbose`` and ``args.gridtk_database_file`` are read here.

    command_line_parameters : [str] or None
      The command line options that are forwarded to each submitted job; ``sys.argv[1:]`` is used when ``None``.

    executable : str
      The name of the script that the submitted jobs execute; it is searched with
      ``bob.extension.find_executable`` using the ``'bin'`` prefix.

    first_fake_job_id : int
      The value from which fake job ids are counted when jobs are not really submitted.

    **Raises:**

    IOError
      If ``executable`` or, when ``args.grid`` is set, the ``jman`` script cannot be found.

    Note that ``self.args``, ``self.command_line``, ``self.fake_job_id``, ``self.job_manager`` and
    ``self.submitted_job_ids`` are only set when ``args.grid`` is not ``None``.
    """
    import bob.extension

    parameters = sys.argv[1:] if command_line_parameters is None else command_line_parameters

    # locate the script that the jobs will run
    candidates = bob.extension.find_executable(executable, prefixes = ['bin'])
    if len(candidates) == 0:
      raise IOError("Could not find the '%s' executable." % executable)
    found = candidates[0]
    assert os.path.isfile(found)
    self.executable = found

    if args.grid is None:
      # no grid configuration given: nothing else to prepare
      return

    assert isinstance(args.grid, grid.Grid)

    # locate the jman wrapper script of GridTK
    wrappers = bob.extension.find_executable('jman', prefixes = ['bin'])
    if len(wrappers) == 0:
      raise IOError("Could not find the 'jman' executable. Have you installed GridTK?")
    jman = wrappers[0]
    assert os.path.isfile(jman)

    self.args = args
    # forward all options except '--skip...' ones and the listed flags
    removed_flags = ('-q', '--dry-run', '-o', '--execute-only')
    self.command_line = [option for option in parameters if not (option.startswith('--skip') or option in removed_flags)]
    self.fake_job_id = first_fake_job_id

    import gridtk
    # let the GridTK logger use the verbosity level requested on the command line
    bob.core.log.set_verbosity_level(bob.core.log.setup("gridtk"), args.verbose)
    if args.grid.is_local():
      manager_class = gridtk.local.JobManagerLocal
    else:
      manager_class = gridtk.sge.JobManagerSGE
    self.job_manager = manager_class(database = args.gridtk_database_file, wrapper_script=jman)
    self.submitted_job_ids = []

71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101

  def submit(self, command, number_of_parallel_jobs = 1, dependencies=None, name = None, **kwargs):
    """Submits a grid job with the given command, which is added to the default command line.

    **Parameters:**

    command : str
      White-space separated options that are appended to ``self.executable`` and ``self.command_line``.

    number_of_parallel_jobs : int
      If larger than 1, an array job ``(1, number_of_parallel_jobs, 1)`` is submitted.

    dependencies : [int] or None
      Ids of jobs that this job depends on; ``self.args.external_dependencies`` are always appended.

    name : str or None
      The name of the job; if not given, the second token of ``command`` is used
      (or the first token, when ``command`` consists of a single token).
      The log directory of the job is ``os.path.join(self.args.grid_log_directory, name)``.

    kwargs
      Further keyword arguments forwarded to ``self.job_manager.submit``.

    **Returns:**

    The id returned by the job manager, or -- when ``self.args.dry_run`` is set -- an incremented fake job id.
    """
    # build a fresh list (never mutate a caller's or a default argument); accepts tuples and None as well
    dependencies = list(dependencies or []) + list(self.args.external_dependencies or [])

    # create the command to be executed
    command_parts = command.split()
    cmd = [self.executable] + self.command_line + command_parts

    # if no job name is specified, create one
    if name is None:
      name = command_parts[1] if len(command_parts) > 1 else command_parts[0]
    # generate log directory
    log_dir = os.path.join(self.args.grid_log_directory, name)

    # generate job array
    array = (1, number_of_parallel_jobs, 1) if number_of_parallel_jobs > 1 else None

    if self.args.dry_run:
      # do not submit anything, only report what would have been submitted
      self.fake_job_id += 1
      print ('would have submitted job', name, 'with id', self.fake_job_id, 'with parameters', kwargs, end='')
      if array:
        print (' using', array[1], 'parallel jobs', end='')
      print (' as:', command_line(cmd), '\nwith dependencies', dependencies)
      return self.fake_job_id

    # submit the job to the job manager
    job_id = self.job_manager.submit(
        command_line = cmd,
        name = name,
        array = array,
        dependencies = dependencies,
        log_dir = log_dir,
        stop_on_failure = self.args.stop_on_failure,
        **kwargs
    )
    # lazy %-formatting: the message is only rendered if INFO logging is enabled
    logger.info("submitted: job '%s' with id '%d' and dependencies '%s'", name, job_id, dependencies)
    self.submitted_job_ids.append(job_id)
    return job_id


  def grid_job_id(self):
    """Returns the value of the ``JOB_ID`` environment variable converted to ``int``, or ``None`` if it is not set."""
    job_id = os.getenv('JOB_ID')
    return None if job_id is None else int(job_id)

  def execute_local(self):
    """Starts the local deamon and waits until it has finished."""
    logger.info("Starting jman deamon to run the jobs on the local machine.")
    failures = self.job_manager.run_scheduler(job_ids=self.submitted_job_ids, parallel_jobs=self.args.grid.number_of_parallel_processes, sleep_time=self.args.grid.scheduler_sleep_time, die_when_finished=True, nice=self.args.nice)
    if failures:
      logger.error("The jobs with the following IDS did not finish successfully: '%s'.", ', '.join([str(f) for f in failures]))
      self.job_manager.report(job_ids = failures[:1], output=False)