=== modified file '.bzrignore'
@@ -1,1 +1,3 @@
*.egg-info
+./twistd.pid
+./_trial_temp
=== added file 'fake-dispatcher'
@@ -0,0 +1,9 @@
+#!/bin/sh
+# Stand-in dispatcher for testing: "runs" the job file given as $1 by
+# echoing it back after a delay, writing to both stdout and stderr.
+echo starting processing "$1"
+echo error >&2
+# Pretend the job takes a while.
+sleep 10
+cat "$1"
+echo ending
=== added file 'lava-scheduler-daemon.tac'
@@ -0,0 +1,26 @@
+import logging
+import sys
+
+from twisted.application import service
+from twisted.application import internet
+from twisted.python import filepath
+from twisted.internet import reactor
+
+from lava_scheduler_daemon.service import BoardSet
+from lava_scheduler_daemon.jobsource import DirectoryJobSource
+
+# Configure logging first: DirectoryJobSource logs from its constructor, and
+# those messages would be lost if the handler were installed afterwards.
+logger = logging.getLogger('')
+handler = logging.StreamHandler(sys.stdout)
+handler.setFormatter(logging.Formatter("[%(name)s] %(message)s"))
+logger.addHandler(handler)
+logger.setLevel(logging.DEBUG)
+
+# twistd looks for a module-level variable named 'application' in a .tac file.
+application = service.Application("lava scheduler daemon")
+
+# Jobs are read from /tmp/lava-jobs and handed to the 'fake-dispatcher' script.
+source = DirectoryJobSource(filepath.FilePath('/tmp/lava-jobs'))
+board_set = BoardSet(source, 'fake-dispatcher', reactor)
+board_set.setServiceParent(application)
=== added directory 'lava_scheduler_daemon'
=== added file 'lava_scheduler_daemon/__init__.py'
=== added file 'lava_scheduler_daemon/board.py'
@@ -0,0 +1,256 @@
+import json
+import os
+import tempfile
+import logging
+
+from twisted.internet.protocol import ProcessProtocol
+from twisted.internet import defer
+
+
+logger = logging.getLogger(__name__)
+
+
+class DispatcherProcessProtocol(ProcessProtocol):
+    """Collect the output of a dispatcher process in a temporary log file.
+
+    When the process ends, `deferred` is fired with the path of the log
+    file. The file is removed by a clean-up step that is appended to
+    `deferred` after it has fired.
+    """
+
+    logger = logger.getChild('DispatcherProcessProtocol')
+
+    def __init__(self, deferred):
+        self.deferred = deferred
+
+    def connectionMade(self):
+        fd, self._logpath = tempfile.mkstemp()
+        self._output = os.fdopen(fd, 'wb')
+
+    def outReceived(self, text):
+        self._output.write(text)
+
+    # stderr output goes into the same log file as stdout.
+    errReceived = outReceived
+
+    def _cleanUp(self, result):
+        os.unlink(self._logpath)
+        return result
+
+    def processEnded(self, reason):
+        # This discards the process exit value.
+        self._output.close()
+        self.deferred.callback(self._logpath)
+        # addBoth, not addCallback: the log file must also be removed when
+        # one of the callbacks already attached to the deferred failed.
+        self.deferred.addBoth(self._cleanUp)
+
+
+class Job(object):
+    """Run one job by spawning the dispatcher on a temporary JSON file."""
+
+    logger = logger.getChild('Job')
+
+    def __init__(self, json_data, dispatcher, reactor):
+        self.json_data = json_data
+        self.dispatcher = dispatcher
+        self.reactor = reactor
+        self._json_file = None
+
+    def run(self):
+        """Dump the job data to a temporary file and spawn the dispatcher.
+
+        Returns a Deferred that fires with the path of the log file once
+        the dispatcher process has ended.
+        """
+        d = defer.Deferred()
+        fd, self._json_file = tempfile.mkstemp()
+        with os.fdopen(fd, 'wb') as f:
+            json.dump(self.json_data, f)
+        self.reactor.spawnProcess(
+            DispatcherProcessProtocol(d), self.dispatcher,
+            args=[self.dispatcher, self._json_file],
+            childFDs={0:0, 1:'r', 2:'r'})
+        d.addBoth(self._exited)
+        return d
+
+    def _exited(self, log_file_path):
+        # Runs on success and failure alike so that the temporary job file
+        # is always removed; the result is passed through unchanged.
+        self.logger.info("job finished on %s", self.json_data['target'])
+        if self._json_file is not None:
+            os.unlink(self._json_file)
+        return log_file_path
+
+
+class Board(object):
+    """
+
+    A board runs jobs. A board can be in four main states:
+
+     * stopped (S)
+       * the board is not looking for or processing jobs
+     * checking (C)
+       * a call to check for a new job is in progress
+     * waiting (W)
+       * no job was found by the last call to getJobForBoard and so the board
+         is waiting for a while before calling again.
+     * running (R)
+       * a job is running (or a job has completed but the call to jobCompleted
+         on the job source has not)
+
+    In addition, because we can't stop a job instantly nor abort a check for a
+    new job safely (because if getJobForBoard returns a job, it has already
+    been marked as started), there are variations on the 'checking' and
+    'running' states -- 'checking with stop requested' (C+S) and 'running with
+    stop requested' (R+S). Even this is a little simplistic as there is the
+    possibility of .start() being called before the process of stopping
+    completes, but we deal with this by deferring any actions taken by
+    .start() until the board is really stopped.
+
+    Events that cause state transitions are:
+
+     * start() is called. We cheat and pretend that this can only happen in
+       the stopped state by stopping first, and then move into the C state.
+
+     * stop() is called. If we are in the C or R state we move to C+S or R+S
+       respectively. If we are in S, C+S or R+S, we stay there. If we are
+       in W, we just move straight to S.
+
+     * getJobForBoard() returns a job. We can only be in C or C+S here, and
+       move into R or R+S respectively.
+
+     * getJobForBoard() indicates that there is no job to perform. Again we
+       can only be in C or C+S and move into W or S respectively.
+
+     * a job completes (i.e. the call to jobCompleted() on the source
+       returns). We can only be in R or R+S and move to C or S respectively.
+
+     * the timer that being in state W implies expires. We move into C.
+
+    The cheating around start means that interleaving start and stop calls may
+    not always do what you expect. So don't mess around in that way please.
+    """
+
+    logger = logger.getChild('Board')
+
+    job_cls = Job
+
+    def __init__(self, source, board_name, dispatcher, reactor, job_cls=None):
+        self.source = source
+        self.board_name = board_name
+        self.dispatcher = dispatcher
+        self.reactor = reactor
+        if job_cls is not None:
+            self.job_cls = job_cls
+        self.running_job = None
+        self._check_call = None
+        self._stopping_deferreds = []
+        self.logger = self.logger.getChild(board_name)
+        self.checking = False
+
+    def _state_name(self):
+        """Return the short name of the current state (see class docstring)."""
+        if self.running_job:
+            state = "R"
+        elif self._check_call:
+            assert not self._stopping_deferreds
+            state = "W"
+        elif self.checking:
+            state = "C"
+        else:
+            assert not self._stopping_deferreds
+            state = "S"
+        if self._stopping_deferreds:
+            state += "+S"
+        return state
+
+    def start(self):
+        self.logger.debug("start requested")
+        self.stop().addCallback(self._start)
+
+    def _start(self, ignored):
+        self.logger.debug("starting")
+        self._stopping_deferreds = []
+        self._checkForJob()
+
+    def stop(self):
+        """Request that the board stops looking for and running jobs.
+
+        Returns a Deferred that fires with None once the board has stopped.
+        """
+        self.logger.debug("stopping")
+        if self._check_call is not None:
+            self._check_call.cancel()
+            self._check_call = None
+
+        if self.running_job is not None or self.checking:
+            self.logger.debug("job running; deferring stop")
+            self._stopping_deferreds.append(defer.Deferred())
+            return self._stopping_deferreds[-1]
+        else:
+            self.logger.debug("stopping immediately")
+            return defer.succeed(None)
+
+    def _checkForJob(self):
+        self.logger.debug("checking for job")
+        self._check_call = None
+        self.checking = True
+        self.source.getJobForBoard(self.board_name).addCallbacks(
+            self._maybeStartJob, self._ebCheckForJob)
+
+    def _ebCheckForJob(self, result):
+        # logger.exception() is only meaningful inside an except block, so
+        # log the traceback carried by the Failure explicitly.
+        self.logger.error(
+            "checking for job failed:\n%s", result.getTraceback())
+        self._maybeStartJob(None)
+
+    def _finish_stop(self):
+        self.logger.debug(
+            "calling %s deferreds returned from stop()",
+            len(self._stopping_deferreds))
+        for d in self._stopping_deferreds:
+            d.callback(None)
+        self._stopping_deferreds = []
+
+    def _maybeStartJob(self, json_data):
+        self.checking = False
+        if json_data is None:
+            self.logger.debug("no job found")
+            if self._stopping_deferreds:
+                self._finish_stop()
+            else:
+                self._check_call = self.reactor.callLater(
+                    10, self._checkForJob)
+            return
+        self.logger.debug("starting job")
+        self.running_job = self.job_cls(
+            json_data, self.dispatcher, self.reactor)
+        d = self.running_job.run()
+        d.addCallbacks(self._cbJobFinished, self._ebJobFinished)
+
+    def _cbJobFinished(self, log_file_path):
+        self.logger.info("reporting job completed")
+        self.source.jobCompleted(
+            self.board_name, log_file_path).addCallbacks(
+            self._cbJobCompleted, self._ebJobCompleted)
+
+    def _ebJobFinished(self, result):
+        self.logger.error("job failed:\n%s", result.getTraceback())
+        # The job is over even though it failed: clear the running state and
+        # honour any pending stop, exactly as after a successful job.
+        self._cbJobCompleted(None)
+
+    def _ebJobCompleted(self, result):
+        self.logger.error(
+            "reporting job completion failed:\n%s", result.getTraceback())
+        # Don't get stuck in the running state if the job source misbehaves.
+        self._cbJobCompleted(None)
+
+    def _cbJobCompleted(self, result):
+        self.running_job = None
+        if self._stopping_deferreds:
+            self._finish_stop()
+        else:
+            self._checkForJob()
=== added file 'lava_scheduler_daemon/jobsource.py'
@@ -0,0 +1,108 @@
+import json
+import logging
+
+from twisted.internet import defer
+
+from zope.interface import (
+    implements,
+    Interface,
+    )
+
+logger = logging.getLogger(__name__)
+
+
+class IJobSource(Interface):
+
+    def getBoardList():
+        """Get the list of currently configured board names."""
+
+    def getJobForBoard(board_name):
+        """Return the json data of a job for board_name to run.
+
+        The job should be marked as started before it is returned.
+        """
+
+    def jobCompleted(board_name, log_file_path):
+        """Mark the job currently running on `board_name` as completed."""
+
+
+class DirectoryJobSource(object):
+    """A job source backed by a directory on disk.
+
+    The directory must contain a 'boards' subdirectory with one directory
+    per board. Jobs are '*.json' files: they are picked up from 'incoming',
+    moved into the board's directory while they run and moved to 'completed'
+    afterwards.
+    """
+
+    implements(IJobSource)
+
+    logger = logger.getChild('DirectoryJobSource')
+
+    def __init__(self, directory):
+        self.directory = directory
+        if not self.directory.isdir():
+            self.logger.critical("%s is not a directory", self.directory)
+            raise RuntimeError("%s must be a directory" % self.directory)
+        boards = self.directory.child('boards')
+        if not boards.isdir():
+            self.logger.critical("%s is not a directory", boards)
+            raise RuntimeError("%s must be a directory" % boards)
+        for subdir in 'incoming', 'completed', 'broken':
+            subdir = self.directory.child(subdir)
+            if not subdir.isdir():
+                subdir.createDirectory()
+        self.logger.info("starting to look for jobs in %s", self.directory)
+
+    def _getBoardList(self):
+        return self.directory.child('boards').listdir()
+
+    def getBoardList(self):
+        return defer.maybeDeferred(self._getBoardList)
+
+    def _jsons(self, kind):
+        """Yield (parsed json, FilePath) for each '*.json' file in `kind`."""
+        files = self.directory.child(kind).globChildren("*.json")
+        for json_file in files:
+            # Close each file as soon as it is parsed instead of leaking the
+            # handle until garbage collection.
+            with json_file.open() as f:
+                json_data = json.load(f)
+            yield (json_data, json_file)
+
+    def _board_dir(self, board_name):
+        return self.directory.child('boards').child(board_name)
+
+    def _getJobForBoard(self, board_name):
+        self.logger.debug('getting job for %s', board_name)
+        board_dir = self._board_dir(board_name)
+        if board_dir.listdir():
+            self.logger.debug('board %s busy', board_name)
+            return None
+        for json_data, json_file in self._jsons('incoming'):
+            self.logger.debug('considering %s for %s', json_file, board_name)
+            if json_data['target'] == board_name:
+                self.logger.debug('running %s on %s', json_file, board_name)
+                json_file.moveTo(board_dir.child(json_file.basename()))
+                return json_data
+        return None
+
+    def getJobForBoard(self, board_name):
+        return defer.maybeDeferred(self._getJobForBoard, board_name)
+
+    def _jobCompleted(self, board_name, log_file_path):
+        # Exactly one file is expected in the board's directory: the job
+        # that _getJobForBoard moved there.
+        [json_file] = self._board_dir(board_name).children()
+        completed = self.directory.child('completed')
+        counter = 0
+        while True:
+            fname = '%03d%s' % (counter, json_file.basename())
+            if not completed.child(fname).exists():
+                break
+            counter += 1
+        json_file.moveTo(completed.child(fname))
+
+    def jobCompleted(self, board_name, log_file_path):
+        return defer.maybeDeferred(
+            self._jobCompleted, board_name, log_file_path)
=== added file 'lava_scheduler_daemon/service.py'
@@ -0,0 +1,69 @@
+import logging
+
+from twisted.application.service import Service
+from twisted.internet import defer
+from twisted.internet.task import LoopingCall
+
+from lava_scheduler_daemon.board import Board
+
+
+logger = logging.getLogger(__name__)
+
+
+class BoardSet(Service):
+    """Service that keeps one Board per board name listed by the source.
+
+    The list of boards is refreshed from the source every 20 seconds.
+    """
+
+    logger = logger.getChild('BoardSet')
+
+    def __init__(self, source, dispatcher, reactor):
+        self.source = source
+        self.boards = {}
+        self.dispatcher = dispatcher
+        self.reactor = reactor
+        self._update_boards_call = LoopingCall(self._updateBoards)
+        self._update_boards_call.clock = reactor
+
+    def _updateBoards(self):
+        self.logger.info("Refreshing board list")
+        d = self.source.getBoardList()
+        d.addCallback(self._cbUpdateBoards)
+        # A failed refresh must not propagate: LoopingCall stops for good
+        # when the function it calls fails.
+        d.addErrback(self._ebUpdateBoards)
+        return d
+
+    def _cbUpdateBoards(self, board_names):
+        self.logger.info("New board list %s", board_names)
+        new_boards = {}
+        for board_name in board_names:
+            if board_name in self.boards:
+                new_boards[board_name] = self.boards.pop(board_name)
+            else:
+                new_boards[board_name] = Board(
+                    self.source, board_name, self.dispatcher, self.reactor)
+                new_boards[board_name].start()
+        for board in self.boards.values():
+            board.stop()
+        self.boards = new_boards
+
+    def _ebUpdateBoards(self, failure):
+        self.logger.error(
+            "refreshing board list failed:\n%s", failure.getTraceback())
+
+    def startService(self):
+        Service.startService(self)
+        self._update_boards_call.start(20)
+
+    def stopService(self):
+        Service.stopService(self)
+        self._update_boards_call.stop()
+        ds = []
+        dead_boards = []
+        for board in self.boards.values():
+            ds.append(board.stop().addCallback(dead_boards.append))
+        self.logger.info(
+            "waiting for %s boards", len(self.boards) - len(dead_boards))
+        return defer.gatherResults(ds)
=== added directory 'lava_scheduler_daemon/tests'
=== added file 'lava_scheduler_daemon/tests/__init__.py'
=== added file 'lava_scheduler_daemon/tests/test_board.py'
@@ -0,0 +1,195 @@
+from collections import defaultdict
+import logging
+
+from twisted.internet import defer
+from twisted.internet.task import Clock
+from twisted.trial.unittest import TestCase
+
+from lava_scheduler_daemon.board import Board
+
+
+def stub_method(method_name):
+    """Return a stub for the job source method called `method_name`.
+
+    The stub records its arguments in `_calls` and returns a Deferred that
+    the test fires later via `_completeCall`.
+    """
+    def method_impl(self, board_name, *args):
+        # Pending requests are stored as _requests[method_name][board_name].
+        assert board_name not in self._requests[method_name], (
+            'overlapping call to %s on %s' % (method_name, board_name))
+        d = self._requests[method_name][board_name] = defer.Deferred()
+        def _remove_request(result):
+            del self._requests[method_name][board_name]
+            return result
+        d.addBoth(_remove_request)
+        self._calls[board_name][method_name].append(args)
+        return d
+    return method_impl
+
+
+class TestJobSource(object):
+    """Job source whose calls only complete when the test says so."""
+
+    def __init__(self):
+        self._calls = defaultdict(lambda: defaultdict(list))
+        self._requests = defaultdict(dict)
+
+    jobCompleted = stub_method('jobCompleted')
+    getJobForBoard = stub_method('getJobForBoard')
+
+    def _completeCall(self, method_name, board_name, result):
+        self._requests[method_name][board_name].callback(result)
+
+
+class TestJob(object):
+    """Job whose run() returns a Deferred that the test fires by hand."""
+
+    def __init__(self, json_data, dispatcher, reactor):
+        self.json_data = json_data
+        self.dispatcher = dispatcher
+        self.reactor = reactor
+        self.deferred = defer.Deferred()
+
+    def run(self):
+        return self.deferred
+
+
+class AppendingHandler(logging.Handler):
+    """Logging handler that appends (level, message) to `target_list`."""
+
+    def __init__(self, target_list):
+        logging.Handler.__init__(self)
+        self.target_list = target_list
+
+    def emit(self, record):
+        self.target_list.append((record.levelno, self.format(record)))
+
+
+class TestBoard(TestCase):
+
+    def setUp(self):
+        TestCase.setUp(self)
+        self.clock = Clock()
+        self.source = TestJobSource()
+        self._log_messages = []
+        self._handler = AppendingHandler(self._log_messages)
+        self.addCleanup(self._checkNoLogs)
+
+    def _checkNoLogs(self):
+        warnings = [message for (level, message) in self._log_messages
+                    if level >= logging.WARNING]
+        if warnings:
+            self.fail("Logged warnings: %s" % warnings)
+
+    def make_board(self, board_name):
+        board = Board(self.source, board_name, 'script', self.clock, TestJob)
+        board.logger.addHandler(self._handler)
+        board.logger.setLevel(logging.DEBUG)
+        return board
+
+    def test_initial_state_is_stopped(self):
+        b = self.make_board('board')
+        self.assertEqual('S', b._state_name())
+
+    def test_start_checks(self):
+        b = self.make_board('board')
+        b.start()
+        self.assertEqual('C', b._state_name())
+
+    def test_no_job_waits(self):
+        b = self.make_board('board')
+        b.start()
+        self.source._completeCall('getJobForBoard', 'board', None)
+        self.assertEqual('W', b._state_name())
+
+    def test_actual_job_runs(self):
+        b = self.make_board('board')
+        b.start()
+        self.source._completeCall('getJobForBoard', 'board', {})
+        self.assertEqual('R', b._state_name())
+
+    def test_completion_calls_jobCompleted(self):
+        b = self.make_board('board')
+        b.start()
+        self.source._completeCall('getJobForBoard', 'board', {})
+        b.running_job.deferred.callback('path')
+        self.assertEqual(
+            1, len(self.source._calls['board']['jobCompleted']))
+
+    def test_still_running_during_jobCompleted(self):
+        b = self.make_board('board')
+        b.start()
+        self.source._completeCall('getJobForBoard', 'board', {})
+        b.running_job.deferred.callback('path')
+        self.assertEqual('R', b._state_name())
+
+    def test_check_again_on_completion(self):
+        b = self.make_board('board')
+        b.start()
+        self.source._completeCall('getJobForBoard', 'board', {})
+        b.running_job.deferred.callback('path')
+        self.source._completeCall('jobCompleted', 'board', None)
+        self.assertEqual('C', b._state_name())
+
+    def test_stop_while_checking_moves_to_check_plus_stop(self):
+        b = self.make_board('board')
+        b.start()
+        b.stop()
+        self.assertEqual('C+S', b._state_name())
+
+    def test_stop_while_checking_no_job_stops(self):
+        b = self.make_board('board')
+        b.start()
+        s = b.stop()
+        stop_results = []
+        s.addCallback(stop_results.append)
+        self.assertEqual(0, len(stop_results))
+        self.source._completeCall('getJobForBoard', 'board', None)
+        self.assertEqual(1, len(stop_results))
+        self.assertEqual('S', b._state_name())
+
+    def test_stop_while_checking_actual_job_runs(self):
+        b = self.make_board('board')
+        b.start()
+        s = b.stop()
+        stop_results = []
+        s.addCallback(stop_results.append)
+        self.assertEqual(0, len(stop_results))
+        self.source._completeCall('getJobForBoard', 'board', {})
+        self.assertEqual(0, len(stop_results))
+        self.assertEqual('R+S', b._state_name())
+
+    def test_stop_while_checking_actual_job_stops_on_complete(self):
+        b = self.make_board('board')
+        b.start()
+        s = b.stop()
+        stop_results = []
+        s.addCallback(stop_results.append)
+        self.assertEqual(0, len(stop_results))
+        self.source._completeCall('getJobForBoard', 'board', {})
+        b.running_job.deferred.callback(None)
+        self.source._completeCall('jobCompleted', 'board', None)
+        self.assertEqual(1, len(stop_results))
+        self.assertEqual('S', b._state_name())
+
+    def test_stop_while_running_job_stops_on_complete(self):
+        b = self.make_board('board')
+        b.start()
+        self.source._completeCall('getJobForBoard', 'board', {})
+        self.assertEqual('R', b._state_name())
+        s = b.stop()
+        stop_results = []
+        s.addCallback(stop_results.append)
+        self.assertEqual(0, len(stop_results))
+        b.running_job.deferred.callback(None)
+        self.source._completeCall('jobCompleted', 'board', None)
+        self.assertEqual(1, len(stop_results))
+        self.assertEqual('S', b._state_name())
+
+    def test_wait_expires_check_again(self):
+        b = self.make_board('board')
+        b.start()
+        self.source._completeCall('getJobForBoard', 'board', None)
+        self.clock.advance(10000) # hack: the delay should be config data
+        self.assertEqual('C', b._state_name())