=== modified file 'doc/changes.rst'
@@ -2,6 +2,16 @@
***************
+.. _version_0_12_1:
+
+Version 0.12.1
+==============
+
+* Enforce limits in the scheduler monitor on how long jobs can run
+  and how large their log files can grow (see the timeout example
+  below).
+* When killing off a job's process, escalate through the SIGINT,
+  SIGTERM and SIGKILL signals.
+
.. _version_0_12:
Version 0.12
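
Concretely, the new run-time limit works by clamping each job's own
timeout from below, mirroring the max() call added to
lava_scheduler_daemon/board.py later in this patch. A minimal sketch
with made-up values:

    # Worked example of the minimum-timeout clamp (values made up).
    MIN_JOB_TIMEOUT = 24 * 60 * 60            # one day, the shipped default
    job_timeout = 600                         # a job asking for ten minutes
    effective_timeout = max(job_timeout, MIN_JOB_TIMEOUT)
    assert effective_timeout == MIN_JOB_TIMEOUT   # too-low timeouts are raised
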
=== modified file 'fake-dispatcher'
@@ -1,4 +1,14 @@
#!/bin/sh
+
+# This is just a silly fake dispatcher script that can be used to test
+# the scheduler without requiring real hardware to be present. To use
+# it, run "lava-server manage scheduler --use-fake". If you want to
+# test something in particular, just hack it as required...
+
+# Ignore SIGINT (signal 2) so that this script does not exit when the
+# scheduler first tries to kill it, to test the killing of jobs that
+# do not die easily.
+trap "" 2
+
echo starting processing $1
echo error >&2
for i in `seq 100`; do
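
A quick way to check that the trap behaves as intended is a sketch
along these lines (hypothetical: it assumes the script is executable
in the current directory and that its loop is still running when the
signals arrive):

    # Hypothetical check that fake-dispatcher survives SIGINT.
    import signal
    import subprocess
    import time

    proc = subprocess.Popen(['./fake-dispatcher', 'some-job.json'])
    time.sleep(1)
    proc.send_signal(signal.SIGINT)    # ignored thanks to the trap above
    time.sleep(1)
    assert proc.poll() is None         # still running after SIGINT
    proc.kill()                        # SIGKILL cannot be trapped
    proc.wait()
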
=== modified file 'lava_scheduler_app/extension.py'
@@ -65,3 +65,20 @@
def contribute_to_settings(self, settings_module):
super(SchedulerExtension, self).contribute_to_settings(settings_module)
settings_module['INSTALLED_APPS'].append('django_tables2')
+ from_module = settings_module.get('SCHEDULER_DAEMON_OPTIONS', {})
+ settings_module['SCHEDULER_DAEMON_OPTIONS'] = {
+ 'LOG_FILE_PATH': None,
+ 'LOG_LEVEL': "WARNING",
+ # 500 megs should be enough for anyone
+ 'LOG_FILE_SIZE_LIMIT': 500*1024*1024,
+ # Jobs always specify a timeout, but I suspect it's often too low,
+ # so we don't let it go below this value, which defaults to a day.
+ 'MIN_JOB_TIMEOUT': 24*60*60,
+ }
+ settings_module['SCHEDULER_DAEMON_OPTIONS'].update(from_module)
+
+ def contribute_to_settings_ex(self, settings_module, settings_object):
+ super(SchedulerExtension, self).contribute_to_settings_ex(
+ settings_module, settings_object)
+ settings_module['SCHEDULER_DAEMON_OPTIONS'].update(
+ settings_object.get_setting('SCHEDULER_DAEMON_OPTIONS', {}))
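
With this merge order a deployment's settings only need to name the
keys they want to change; everything else keeps the defaults above. A
hypothetical settings fragment:

    # Hypothetical fragment of a deployment settings module.  The extension
    # merges this dict over its built-in defaults, so unset keys (LOG_LEVEL,
    # LOG_FILE_SIZE_LIMIT, ...) keep the values defined above.
    SCHEDULER_DAEMON_OPTIONS = {
        'LOG_FILE_PATH': '/var/log/lava/lava-scheduler.log',
        'MIN_JOB_TIMEOUT': 2 * 24 * 60 * 60,  # let jobs run for up to two days
    }
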
=== modified file 'lava_scheduler_app/management/commands/__init__.py'
@@ -1,23 +1,99 @@
-import logging
-import sys
+import logging.config
+from optparse import make_option
from django.core.management.base import BaseCommand
class SchedulerCommand(BaseCommand):
+ option_list = BaseCommand.option_list + (
+ make_option('-l', '--loglevel',
+ action='store',
+ default=None,
+ help="Log level, default is taken from settings."),
+ make_option('-f', '--logfile',
+ action='store',
+ default=None,
+ help="Path to log file, default is taken from settings."),
+ )
+
log_prefix = ''
- def _configure_logging(self, loglevel, logfile=None):
- logger = logging.getLogger('')
- if logfile is None:
- handler = logging.StreamHandler(sys.stderr)
+
+ _DEFAULT_LOGGING = {
+ 'version': 1,
+ 'disable_existing_loggers': True,
+ 'root': {
+ 'level': None,
+ 'handlers': ['default'],
+ },
+ 'formatters': {
+ 'default': {
+ 'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s'
+ },
+ },
+ 'handlers': {
+ 'default': {
+ 'level': 'DEBUG',
+ 'class': 'logging.FileHandler',
+ 'formatter': 'default'
+ }
+ },
+ }
+
+ def _configure(self, options):
+ from django.conf import settings
+
+ daemon_options = settings.SCHEDULER_DAEMON_OPTIONS.copy()
+ if options['logfile'] is not None:
+ daemon_options['LOG_FILE_PATH'] = options['logfile']
+ if options['loglevel'] is not None:
+ daemon_options['LOG_LEVEL'] = options['loglevel']
+
+ if daemon_options['LOG_FILE_PATH'] in [None, '-']:
+ handler = {
+ 'level': 'DEBUG',
+ 'class': 'logging.StreamHandler',
+ 'formatter': 'default',
+ }
else:
- handler = logging.FileHandler(logfile)
+ handler = {
+ 'level': 'DEBUG',
+ 'class': 'logging.FileHandler',
+ 'filename': daemon_options['LOG_FILE_PATH'],
+ 'formatter': 'default'
+ }
+
fmt = "%(asctime)s [%(levelname)s] [%(name)s] %(message)s"
if self.log_prefix:
fmt = self.log_prefix + ' ' + fmt
- handler.setFormatter(logging.Formatter(fmt))
- logger.addHandler(handler)
- logger.setLevel(getattr(logging, loglevel.upper()))
+
+ LOGGING = {
+ 'version': 1,
+ 'disable_existing_loggers': True,
+ 'root': {
+ 'level': daemon_options['LOG_LEVEL'].upper(),
+ 'handlers': ['default'],
+ },
+ 'formatters': {'default': {'format': fmt}},
+ 'handlers': {'default': handler}
+ }
+
+ try:
+ import lava.raven
+ except ImportError:
+ pass
+ else:
+ LOGGING['handlers']['sentry'] = {
+ 'level': 'ERROR',
+ 'class': 'raven.contrib.django.handlers.SentryHandler',
+ }
+ LOGGING['root']['handlers'].append('sentry')
+
+ logging.config.dictConfig(LOGGING)
+
+ return daemon_options
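
The net effect of _configure together with the extension above is a
three-level precedence: command-line flags beat SCHEDULER_DAEMON_OPTIONS
from settings, which beat the built-in defaults. A minimal standalone
sketch (values made up):

    # Standalone sketch of the option precedence (all values made up).
    defaults = {'LOG_LEVEL': 'WARNING', 'LOG_FILE_PATH': None}
    from_settings = {'LOG_LEVEL': 'INFO'}            # SCHEDULER_DAEMON_OPTIONS
    cli = {'loglevel': None, 'logfile': '/tmp/sched.log'}   # parsed -l / -f

    merged = dict(defaults)
    merged.update(from_settings)                     # settings beat defaults
    if cli['logfile'] is not None:
        merged['LOG_FILE_PATH'] = cli['logfile']     # flags beat everything
    if cli['loglevel'] is not None:
        merged['LOG_LEVEL'] = cli['loglevel']

    assert merged == {'LOG_LEVEL': 'INFO', 'LOG_FILE_PATH': '/tmp/sched.log'}
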
=== modified file 'lava_scheduler_app/management/commands/scheduler.py'
@@ -35,14 +35,6 @@
dest="dispatcher",
default="lava-dispatch",
help="Dispatcher command to invoke"),
- make_option('-l', '--loglevel',
- action='store',
- default='WARNING',
- help="Log level, default is WARNING"),
- make_option('-f', '--logfile',
- action='store',
- default=None,
- help="Path to log file"),
)
def handle(self, *args, **options):
@@ -51,10 +43,9 @@
from twisted.internet import reactor
from lava_scheduler_daemon.service import BoardSet
-
from lava_scheduler_daemon.dbjobsource import DatabaseJobSource
- self._configure_logging(options['loglevel'], options['logfile'])
+ daemon_options = self._configure(options)
source = DatabaseJobSource()
@@ -67,7 +58,6 @@
else:
dispatcher = options['dispatcher']
service = BoardSet(
- source, dispatcher, reactor, log_file=options['logfile'],
- log_level=options['loglevel'])
+ source, dispatcher, reactor, daemon_options=daemon_options)
reactor.callWhenRunning(service.startService)
reactor.run()
=== modified file 'lava_scheduler_app/management/commands/schedulermonitor.py'
@@ -16,7 +16,6 @@
# You should have received a copy of the GNU Affero General Public License
# along with LAVA Scheduler. If not, see <http://www.gnu.org/licenses/>.
-from optparse import make_option
import simplejson
@@ -27,40 +26,19 @@
class Command(SchedulerCommand):
help = "Run the LAVA test job scheduler"
- option_list = SchedulerCommand.option_list + (
- make_option('--use-fake',
- action='store_true',
- dest='use_fake',
- default=False,
- help="Use fake dispatcher (for testing)"),
- make_option('--dispatcher',
- action="store",
- dest="dispatcher",
- default="lava-dispatch",
- help="Dispatcher command to invoke"),
- make_option('-l', '--loglevel',
- action='store',
- default='WARNING',
- help="Log level, default is WARNING"),
- make_option('-f', '--logfile',
- action='store',
- default=None,
- help="Path to log file"),
- )
log_prefix = 'M'
def handle(self, *args, **options):
from twisted.internet import reactor
from lava_scheduler_daemon.board import Job
+ daemon_options = self._configure(options)
source = DatabaseJobSource()
dispatcher, board_name, json_file = args
job = Job(
simplejson.load(open(json_file)), dispatcher,
- source, board_name, reactor, log_file=options['logfile'],
- log_level=options['loglevel'])
+ source, board_name, reactor, daemon_options=daemon_options)
def run():
job.run().addCallback(lambda result: reactor.stop())
reactor.callWhenRunning(run)
- self._configure_logging(options['loglevel'], options['logfile'])
reactor.run()
=== modified file 'lava_scheduler_daemon/board.py'
@@ -4,6 +4,7 @@
import tempfile
import logging
+from twisted.internet.error import ProcessExitedAlready
from twisted.internet.protocol import ProcessProtocol
from twisted.internet import defer, task
from twisted.protocols.basic import LineReceiver
@@ -16,14 +17,15 @@
failure.getTraceback())
return eb
+OOB_FD = 3
+
class OOBDataProtocol(LineReceiver):
- logger = logging.getLogger(__name__ + '.OOBDataProtocol')
-
delimiter = '\n'
def __init__(self, source, board_name, _source_lock):
+ self.logger = logging.getLogger(__name__ + '.OOBDataProtocol')
self.source = source
self.board_name = board_name
self._source_lock = _source_lock
@@ -41,18 +43,22 @@
class DispatcherProcessProtocol(ProcessProtocol):
- logger = logging.getLogger(__name__ + '.DispatcherProcessProtocol')
- def __init__(self, deferred, log_file, source, board_name, _source_lock):
+ def __init__(self, deferred, log_file, job):
+ self.logger = logging.getLogger(__name__ + '.DispatcherProcessProtocol')
self.deferred = deferred
self.log_file = log_file
- self.source = source
- self.oob_data = OOBDataProtocol(source, board_name, _source_lock)
+ self.job = job
+ self.oob_data = OOBDataProtocol(
+ job.source, job.board_name, job._source_lock)
def childDataReceived(self, childFD, data):
- if childFD == 3:
+ if childFD == OOB_FD:
self.oob_data.dataReceived(data)
self.log_file.write(data)
+ if self.log_file.tell() > self.job.daemon_options['LOG_FILE_SIZE_LIMIT']:
+ if not self.job._killing:
+ self.job.cancel("exceeded log size limit")
self.log_file.flush()
def processEnded(self, reason):
@@ -62,55 +68,96 @@
class Job(object):
- logger = logging.getLogger(__name__ + '.Job')
def __init__(self, job_data, dispatcher, source, board_name, reactor,
- log_file, log_level):
+ daemon_options):
self.job_data = job_data
self.dispatcher = dispatcher
self.source = source
self.board_name = board_name
+ self.logger = logging.getLogger(__name__ + '.Job.' + board_name)
self.reactor = reactor
+ self.daemon_options = daemon_options
self._json_file = None
self._source_lock = defer.DeferredLock()
self._checkCancel_call = task.LoopingCall(self._checkCancel)
+ self._signals = ['SIGINT', 'SIGINT', 'SIGTERM', 'SIGTERM', 'SIGKILL']
+ self._time_limit_call = None
+ self._killing = False
+ self.job_log_file = None
def _checkCancel(self):
- return self._source_lock.run(
- self.source.jobCheckForCancellation, self.board_name).addCallback(
- self._maybeCancel)
+ if self._killing:
+ self.cancel()
+ else:
+ return self._source_lock.run(
+ self.source.jobCheckForCancellation, self.board_name).addCallback(
+ self._maybeCancel)
+
+ def cancel(self, reason=None):
+ if not self._killing and reason is None:
+ reason = "killing job for unknown reason"
+ if not self._killing:
+ self.logger.info(reason)
+ self.job_log_file.write("\n%s\n" % reason.upper())
+ self._killing = True
+ if self._signals:
+ signame = self._signals.pop(0)
+ else:
+ self.logger.warning("self._signals is empty!")
+ signame = 'SIGKILL'
+ self.logger.info(
+ 'attempting to kill job with signal %s' % signame)
+ try:
+ self._protocol.transport.signalProcess(getattr(signal, signame))
+ except ProcessExitedAlready:
+ pass
def _maybeCancel(self, cancel):
if cancel:
- self._protocol.transport.signalProcess(signal.SIGINT)
+ self.cancel("killing job by user request")
+ else:
+ self.logger.debug('not cancelling')
+
+ def _time_limit_exceeded(self):
+ self._time_limit_call = None
+ self.cancel("killing job for exceeding timeout")
def run(self):
d = self.source.getLogFileForJobOnBoard(self.board_name)
return d.addCallback(self._run).addErrback(
catchall_errback(self.logger))
- def _run(self, log_file):
+ def _run(self, job_log_file):
d = defer.Deferred()
json_data = self.job_data
fd, self._json_file = tempfile.mkstemp()
with os.fdopen(fd, 'wb') as f:
json.dump(json_data, f)
self._protocol = DispatcherProcessProtocol(
- d, log_file, self.source, self.board_name, self._source_lock)
+ d, job_log_file, self)
+ self.job_log_file = job_log_file
self.reactor.spawnProcess(
self._protocol, self.dispatcher, args=[
- self.dispatcher, self._json_file, '--oob-fd', '3'],
- childFDs={0:0, 1:'r', 2:'r', 3:'r'}, env=None)
+ self.dispatcher, self._json_file, '--oob-fd', str(OOB_FD)],
+ childFDs={0:0, 1:'r', 2:'r', OOB_FD:'r'}, env=None)
self._checkCancel_call.start(10)
+ timeout = max(
+ json_data['timeout'], self.daemon_options['MIN_JOB_TIMEOUT'])
+ self._time_limit_call = self.reactor.callLater(
+ timeout, self._time_limit_exceeded)
d.addBoth(self._exited)
return d
+
def _exited(self, exit_code):
self.logger.info("job finished on %s", self.job_data['target'])
if self._json_file is not None:
os.unlink(self._json_file)
self.logger.info("reporting job completed")
- self._source_lock.run(self._checkCancel_call.stop)
+ if self._time_limit_call is not None:
+ self._time_limit_call.cancel()
+ self._checkCancel_call.stop()
return self._source_lock.run(
self.source.jobCompleted, self.board_name, exit_code).addCallback(
lambda r:exit_code)
@@ -125,17 +172,16 @@
class MonitorJob(object):
- logger = logging.getLogger(__name__ + '.MonitorJob')
def __init__(self, job_data, dispatcher, source, board_name, reactor,
- log_file, log_level):
+ daemon_options):
+ self.logger = logging.getLogger(__name__ + '.MonitorJob')
self.job_data = job_data
self.dispatcher = dispatcher
self.source = source
self.board_name = board_name
self.reactor = reactor
- self.log_file = log_file
- self.log_level = log_level
+ self.daemon_options = daemon_options
self._json_file = None
def run(self):
@@ -147,9 +193,9 @@
args = [
'setsid', 'lava-server', 'manage', 'schedulermonitor',
self.dispatcher, str(self.board_name), self._json_file,
- '-l', self.log_level]
- if self.log_file:
- args.extend(['-f', self.log_file])
+ '-l', self.daemon_options['LOG_LEVEL']]
+ if self.daemon_options['LOG_FILE_PATH']:
+ args.extend(['-f', self.daemon_options['LOG_FILE_PATH']])
self.logger.info('executing "%s"', ' '.join(args))
self.reactor.spawnProcess(
SimplePP(d), 'setsid', childFDs={0:0, 1:1, 2:2},
@@ -213,14 +259,12 @@
job_cls = MonitorJob
- def __init__(self, source, board_name, dispatcher, reactor, log_file,
- log_level, job_cls=None):
+ def __init__(self, source, board_name, dispatcher, reactor,
+ daemon_options, job_cls=None):
self.source = source
self.board_name = board_name
self.dispatcher = dispatcher
self.reactor = reactor
- self.log_file = log_file
- self.log_level = log_level
+ self.daemon_options = daemon_options
if job_cls is not None:
self.job_cls = job_cls
self.running_job = None
@@ -301,7 +345,7 @@
self.logger.info("starting job %r", job_data)
self.running_job = self.job_cls(
job_data, self.dispatcher, self.source, self.board_name,
- self.reactor, self.log_file, self.log_level)
+ self.reactor, self.daemon_options)
d = self.running_job.run()
d.addCallbacks(self._cbJobFinished, self._ebJobFinished)
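
Because _checkCancel runs in a LoopingCall every 10 seconds and calls
cancel() again while _killing is set, a process that ignores signals
is re-signalled roughly every 10 seconds, walking down the escalation
list. A standalone sketch of the resulting schedule:

    # Standalone sketch of the escalation schedule implemented by Job.cancel:
    # each 10-second cancel-check tick sends the next signal in the list,
    # falling back to SIGKILL once the list is exhausted.
    signals = ['SIGINT', 'SIGINT', 'SIGTERM', 'SIGTERM', 'SIGKILL']

    def next_signal():
        return signals.pop(0) if signals else 'SIGKILL'

    for tick in range(7):
        print('t=%3ds: %s' % (tick * 10, next_signal()))
    # t=  0s: SIGINT ... t= 40s: SIGKILL, then SIGKILL on every later tick
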
=== modified file 'lava_scheduler_daemon/service.py'
@@ -9,15 +9,13 @@
class BoardSet(Service):
- logger = logging.getLogger(__name__ + '.BoardSet')
-
- def __init__(self, source, dispatcher, reactor, log_file, log_level):
+ def __init__(self, source, dispatcher, reactor, daemon_options):
+ self.logger = logging.getLogger(__name__ + '.BoardSet')
self.source = source
self.boards = {}
self.dispatcher = dispatcher
self.reactor = reactor
- self.log_file = log_file
- self.log_level = log_level
+ self.daemon_options = daemon_options
self._update_boards_call = LoopingCall(self._updateBoards)
self._update_boards_call.clock = reactor
@@ -37,7 +35,7 @@
else:
new_boards[board_name] = Board(
self.source, board_name, self.dispatcher, self.reactor,
- self.log_file, self.log_level)
+ self.daemon_options)
new_boards[board_name].start()
for board in self.boards.values():
board.stop()
=== modified file 'lava_scheduler_daemon/tests/test_board.py'
@@ -36,7 +36,7 @@
class TestJob(object):
- def __init__(self, job_data, dispatcher, source, board_name, reactor):
+ def __init__(self, job_data, dispatcher, source, board_name, reactor, options):
self.json_data = job_data
self.dispatcher = dispatcher
self.reactor = reactor
@@ -76,7 +76,7 @@
def make_board(self, board_name):
board = Board(
- self.source, board_name, 'script', self.clock, job_cls=TestJob)
+ self.source, board_name, 'script', self.clock, None, job_cls=TestJob)
board.logger.addHandler(self._handler)
board.logger.setLevel(logging.DEBUG)
return board