diff mbox

[Branch,~linaro-validation/lava-scheduler/trunk] Rev 66: The scheduler now runs a subprocess which runs the dispatcher and reports

Message ID 20110818044413.19141.77917.launchpad@ackee.canonical.com
State Accepted
Headers show

Commit Message

Michael-Doyle Hudson Aug. 18, 2011, 4:44 a.m. UTC
Merge authors:
  Michael Hudson-Doyle (mwhudson)
------------------------------------------------------------
revno: 66 [merge]
committer: Michael-Doyle Hudson <michael.hudson@linaro.org>
branch nick: trunk
timestamp: Thu 2011-08-18 16:41:46 +1200
message:
  The scheduler now runs a subprocess which runs the dispatcher and reports
  completion status to the db; this means that the scheduler can be killed and
  restarted while jobs are still running.
added:
  lava-scheduler-monitor
modified:
  lava-scheduler
  lava_scheduler_app/tests.py
  lava_scheduler_daemon/board.py
  lava_scheduler_daemon/dbjobsource.py
  lava_scheduler_daemon/main.py
  lava_scheduler_daemon/tests/test_board.py
  setup.py


--
lp:lava-scheduler
https://code.launchpad.net/~linaro-validation/lava-scheduler/trunk

You are subscribed to branch lp:lava-scheduler.
To unsubscribe from this branch go to https://code.launchpad.net/~linaro-validation/lava-scheduler/trunk/+edit-subscription
diff mbox

Patch

=== modified file 'lava-scheduler'
--- lava-scheduler	2011-07-27 07:02:12 +0000
+++ lava-scheduler	2011-08-18 02:44:06 +0000
@@ -6,6 +6,6 @@ 
 # at package build time.
 os.environ['DJANGO_SETTINGS_MODULE'] = 'lava_server.settings.development'
 
-from lava_scheduler_daemon.main import main
+from lava_scheduler_daemon.main import daemon_main
 
-main()
+daemon_main()

=== added file 'lava-scheduler-monitor'
--- lava-scheduler-monitor	1970-01-01 00:00:00 +0000
+++ lava-scheduler-monitor	2011-08-18 02:44:06 +0000
@@ -0,0 +1,11 @@ 
+#!/usr/bin/python
+
+import os
+
+# The following line is mangled to point at the debian settings file
+# at package build time.
+os.environ['DJANGO_SETTINGS_MODULE'] = 'lava_server.settings.development'
+
+from lava_scheduler_daemon.main import monitor_main
+
+monitor_main()

=== modified file 'lava_scheduler_app/tests.py'
--- lava_scheduler_app/tests.py	2011-07-27 10:10:10 +0000
+++ lava_scheduler_app/tests.py	2011-08-18 03:24:09 +0000
@@ -195,17 +195,7 @@ 
             requested_device=device, definition=json.dumps(definition))
         transaction.commit()
         self.assertEqual(
-            definition, DatabaseJobSource().getJobForBoard_impl('panda01')[0])
-
-    def test_getJobForBoard_returns_writable_file(self):
-        device = self.factory.make_device(hostname='panda01')
-        definition = {'foo': 'bar'}
-        self.factory.make_testjob(
-            requested_device=device, definition=json.dumps(definition))
-        transaction.commit()
-        log_file = DatabaseJobSource().getJobForBoard_impl('panda01')[1]
-        log_file.write('a')
-        log_file.close()
+            definition, DatabaseJobSource().getJobForBoard_impl('panda01'))
 
     def test_getJobForBoard_returns_None_if_no_job(self):
         self.factory.make_device(hostname='panda01')
@@ -223,7 +213,7 @@ 
         transaction.commit()
         definition['target'] = 'panda01'
         self.assertEqual(
-            definition, DatabaseJobSource().getJobForBoard_impl('panda01')[0])
+            definition, DatabaseJobSource().getJobForBoard_impl('panda01'))
 
     def test_getJobForBoard_prefers_older(self):
         panda_type = self.factory.ensure_device_type(name='panda')
@@ -240,7 +230,7 @@ 
         transaction.commit()
         self.assertEqual(
             first_definition,
-            DatabaseJobSource().getJobForBoard_impl('panda01')[0])
+            DatabaseJobSource().getJobForBoard_impl('panda01'))
 
     def test_getJobForBoard_prefers_directly_targeted(self):
         panda_type = self.factory.ensure_device_type(name='panda')
@@ -258,7 +248,7 @@ 
         transaction.commit()
         self.assertEqual(
             device_definition,
-            DatabaseJobSource().getJobForBoard_impl('panda01')[0])
+            DatabaseJobSource().getJobForBoard_impl('panda01'))
 
     def test_getJobForBoard_avoids_targeted_to_other_board_of_same_type(self):
         panda_type = self.factory.ensure_device_type(name='panda')
@@ -352,3 +342,13 @@ 
         DatabaseJobSource().jobCompleted_impl('panda01')
         device = Device.objects.get(pk=device.pk)
         self.assertEquals(None, device.current_job)
+
+    def test_getLogFileForJobOnBoard_returns_writable_file(self):
+        device, job = self.get_device_and_running_job()
+        definition = {'foo': 'bar'}
+        self.factory.make_testjob(
+            requested_device=device, definition=json.dumps(definition))
+        transaction.commit()
+        log_file = DatabaseJobSource().getLogFileForJobOnBoard_impl('panda01')
+        log_file.write('a')
+        log_file.close()

=== modified file 'lava_scheduler_daemon/board.py'
--- lava_scheduler_daemon/board.py	2011-08-18 02:25:09 +0000
+++ lava_scheduler_daemon/board.py	2011-08-18 04:02:33 +0000
@@ -71,8 +71,13 @@ 
         self._json_file = None
 
     def run(self):
+        d = self.source.getLogFileForJobOnBoard(self.board_name)
+        return d.addCallback(self._run).addErrback(
+            catchall_errback(self.logger))
+
+    def _run(self, log_file):
         d = defer.Deferred()
-        json_data, log_file = self.job_data
+        json_data = self.job_data
         fd, self._json_file = tempfile.mkstemp()
         with os.fdopen(fd, 'wb') as f:
             json.dump(json_data, f)
@@ -86,7 +91,48 @@ 
         return d
 
     def _exited(self, result):
-        self.logger.info("job finished on %s", self.job_data[0]['target'])
+        self.logger.info("job finished on %s", self.job_data['target'])
+        if self._json_file is not None:
+            os.unlink(self._json_file)
+        self.logger.info("reporting job completed")
+        return self.source.jobCompleted(self.board_name).addCallback(
+            lambda r:result)
+
+
+class SimplePP(ProcessProtocol):
+    def __init__(self, d):
+        self.d = d
+    def processEnded(self, reason):
+        self.d.callback(None)
+
+
+class MonitorJob(object):
+
+    logger = logging.getLogger(__name__ + '.MonitorJob')
+
+    def __init__(self, job_data, dispatcher, source, board_name, reactor):
+        self.job_data = job_data
+        self.dispatcher = dispatcher
+        self.source = source
+        self.board_name = board_name
+        self.reactor = reactor
+        self._json_file = None
+
+    def run(self):
+        d = defer.Deferred()
+        json_data = self.job_data
+        fd, self._json_file = tempfile.mkstemp()
+        with os.fdopen(fd, 'wb') as f:
+            json.dump(json_data, f)
+        self.reactor.spawnProcess(
+            SimplePP(d), 'lava-scheduler-monitor', childFDs={0:0, 1:1, 2:2},
+            env=None, args=[
+                'lava-scheduler-monitor', self.dispatcher,
+                self.board_name, self._json_file])
+        d.addBoth(self._exited)
+        return d
+
+    def _exited(self, result):
         if self._json_file is not None:
             os.unlink(self._json_file)
         return result
@@ -140,7 +186,7 @@ 
     not always do what you expect.  So don't mess around in that way please.
     """
 
-    job_cls = Job
+    job_cls = MonitorJob
 
     def __init__(self, source, board_name, dispatcher, reactor, job_cls=None):
         self.source = source
@@ -231,16 +277,11 @@ 
         d = self.running_job.run()
         d.addCallbacks(self._cbJobFinished, self._ebJobFinished)
 
-    def _cbJobFinished(self, result):
-        self.logger.info("reporting job completed")
-        self.source.jobCompleted(
-            self.board_name).addCallback(self._cbJobCompleted)
-
     def _ebJobFinished(self, result):
         self.logger.exception(result.value)
         self._checkForJob()
 
-    def _cbJobCompleted(self, result):
+    def _cbJobFinished(self, result):
         self.running_job = None
         if self._stopping_deferreds:
             self._finish_stop()

=== modified file 'lava_scheduler_daemon/dbjobsource.py'
--- lava_scheduler_daemon/dbjobsource.py	2011-08-18 02:19:55 +0000
+++ lava_scheduler_daemon/dbjobsource.py	2011-08-18 03:24:09 +0000
@@ -94,10 +94,7 @@ 
                     json_data = json.loads(job.definition)
                     json_data['target'] = device.hostname
                     transaction.commit()
-                    log_file = job.log_file
-                    log_file.file.close()
-                    log_file.open('wb')
-                    return json_data, log_file
+                    return json_data
             else:
                 # We don't really need to rollback here, as no modifying
                 # operations have been made to the database.  But Django is
@@ -111,6 +108,19 @@ 
         return self.deferForDB(self.getJobForBoard_impl, board_name)
 
     @transaction.commit_on_success()
+    def getLogFileForJobOnBoard_impl(self, board_name):
+        device = Device.objects.get(hostname=board_name)
+        device.status = Device.IDLE
+        job = device.current_job
+        log_file = job.log_file
+        log_file.file.close()
+        log_file.open('wb')
+        return log_file
+
+    def getLogFileForJobOnBoard(self, board_name):
+        return self.deferForDB(self.getLogFileForJobOnBoard_impl, board_name)
+
+    @transaction.commit_on_success()
     def jobCompleted_impl(self, board_name):
         self.logger.debug('marking job as complete on %s', board_name)
         device = Device.objects.get(hostname=board_name)

=== modified file 'lava_scheduler_daemon/main.py'
--- lava_scheduler_daemon/main.py	2011-08-17 03:18:13 +0000
+++ lava_scheduler_daemon/main.py	2011-08-18 03:34:27 +0000
@@ -2,14 +2,29 @@ 
 import os
 import sys
 
-from twisted.internet import reactor
+from twisted.internet import defer, reactor
 
 from lava_scheduler_daemon.service import BoardSet
 from lava_scheduler_daemon.config import get_config
 
 from lava_scheduler_daemon.dbjobsource import DatabaseJobSource
 
-def main():
+def _configure_logging():
+    logger = logging.getLogger('')
+    config = get_config('logging')
+    level = config.get("logging", "level")
+    destination = config.get("logging", 'destination', None)
+    if destination == '-':
+        handler = logging.StreamHandler(sys.stdout)
+    else:
+        handler = logging.FileHandler(destination)
+    handler.setFormatter(
+        logging.Formatter("[%(levelname)s] [%(name)s] %(message)s"))
+    logger.addHandler(handler)
+    logger.setLevel(getattr(logging, level))
+
+
+def daemon_main():
     source = DatabaseJobSource()
 
     if sys.argv[1:] == ['--use-fake']:
@@ -27,17 +42,20 @@ 
     service = BoardSet(source, dispatcher, reactor)
     reactor.callWhenRunning(service.startService)
 
-    logger = logging.getLogger('')
-    config = get_config('logging')
-    level = config.get("logging", "level")
-    destination = config.get("logging", 'destination', None)
-    if destination == '-':
-        handler = logging.StreamHandler(sys.stdout)
-    else:
-        handler = logging.FileHandler(destination)
-    handler.setFormatter(
-        logging.Formatter("[%(levelname)s] [%(name)s] %(message)s"))
-    logger.addHandler(handler)
-    logger.setLevel(getattr(logging, level))
-
+    _configure_logging()
+
+    reactor.run()
+
+
+def monitor_main():
+    import json
+    from lava_scheduler_daemon.board import Job
+    source = DatabaseJobSource()
+    dispatcher, board_name, json_file = sys.argv[1:]
+    job = Job(
+        json.load(open(json_file)), dispatcher, source, board_name, reactor)
+    def run():
+        job.run().addCallback(lambda result: reactor.stop())
+    reactor.callWhenRunning(run)
+    _configure_logging()
     reactor.run()

=== modified file 'lava_scheduler_daemon/tests/test_board.py'
--- lava_scheduler_daemon/tests/test_board.py	2011-08-05 10:50:10 +0000
+++ lava_scheduler_daemon/tests/test_board.py	2011-08-18 04:29:11 +0000
@@ -101,27 +101,11 @@ 
         self.source._completeCall('getJobForBoard', 'board', ({}, None))
         self.assertEqual('R', b._state_name())
 
-    def test_completion_calls_jobCompleted(self):
-        b = self.make_board('board')
-        b.start()
-        self.source._completeCall('getJobForBoard', 'board', ({}, None))
-        b.running_job.deferred.callback('path')
-        self.assertEqual(
-            1, len(self.source._calls['board']['jobCompleted']))
-
-    def test_still_running_during_jobCompleted(self):
-        b = self.make_board('board')
-        b.start()
-        self.source._completeCall('getJobForBoard', 'board', ({}, None))
-        b.running_job.deferred.callback('path')
-        self.assertEqual('R', b._state_name())
-
     def test_check_again_on_completion(self):
         b = self.make_board('board')
         b.start()
         self.source._completeCall('getJobForBoard', 'board', ({}, None))
         b.running_job.deferred.callback('path')
-        self.source._completeCall('jobCompleted', 'board', None)
         self.assertEqual('C', b._state_name())
 
     def test_stop_while_checking_moves_to_check_plus_stop(self):
@@ -161,7 +145,6 @@ 
         self.assertEqual(0, len(stop_results))
         self.source._completeCall('getJobForBoard', 'board', ({}, None))
         b.running_job.deferred.callback(None)
-        self.source._completeCall('jobCompleted', 'board', None)
         self.assertEqual(1, len(stop_results))
         self.assertEqual('S', b._state_name())
 
@@ -175,7 +158,6 @@ 
         s.addCallback(stop_results.append)
         self.assertEqual(0, len(stop_results))
         b.running_job.deferred.callback(None)
-        self.source._completeCall('jobCompleted', 'board', None)
         self.assertEqual(1, len(stop_results))
         self.assertEqual('S', b._state_name())
 

=== modified file 'setup.py'
--- setup.py	2011-07-27 10:24:50 +0000
+++ setup.py	2011-08-18 02:44:06 +0000
@@ -32,7 +32,7 @@ 
     [lava_server.extensions]
     scheduler = lava_scheduler_app.extension:SchedulerExtension
     """,
-    scripts=["lava-scheduler"],
+    scripts=["lava-scheduler", "lava-scheduler-monitor"],
     install_requires=[
         "lava-server >= 0.4a1",
         "twisted",