=== modified file 'lava_scheduler_app/api.py'
@@ -2,10 +2,12 @@
from simplejson import JSONDecodeError
from django.db.models import Count
from linaro_django_xmlrpc.models import ExposedAPI
+from lava_scheduler_app import utils
from lava_scheduler_app.models import (
Device,
DeviceType,
JSONDataError,
+ DevicesUnavailableException,
TestJob,
)
from lava_scheduler_app.views import (
@@ -35,14 +37,22 @@
raise xmlrpclib.Fault(404, "Specified device not found.")
except DeviceType.DoesNotExist:
raise xmlrpclib.Fault(404, "Specified device type not found.")
- return job.id
+ except DevicesUnavailableException as e:
+ raise xmlrpclib.Fault(400, str(e))
+        if isinstance(job, list):
+            return job
+        else:
+            return job.id
def resubmit_job(self, job_id):
try:
job = TestJob.objects.accessible_by_principal(self.user).get(pk=job_id)
except TestJob.DoesNotExist:
raise xmlrpclib.Fault(404, "Specified job not found.")
- return self.submit_job(job.definition)
+ if job.is_multinode:
+ return self.submit_job(job.multinode_definition)
+ else:
+ return self.submit_job(job.definition)
def cancel_job(self, job_id):
if not self.user:
@@ -50,7 +60,13 @@
job = TestJob.objects.get(pk=job_id)
if not job.can_cancel(self.user):
raise xmlrpclib.Fault(403, "Permission denied.")
- job.cancel()
+ if job.is_multinode:
+            multinode_jobs = TestJob.objects.filter(
+                target_group=job.target_group)
+ for multinode_job in multinode_jobs:
+ multinode_job.cancel()
+ else:
+ job.cancel()
return True
def job_output(self, job_id):
=== modified file 'lava_scheduler_app/management/commands/scheduler.py'
@@ -43,7 +43,7 @@
from twisted.internet import reactor
- from lava_scheduler_daemon.service import BoardSet
+ from lava_scheduler_daemon.service import JobQueue
from lava_scheduler_daemon.dbjobsource import DatabaseJobSource
daemon_options = self._configure(options)
@@ -58,7 +58,7 @@
'fake-dispatcher')
else:
dispatcher = options['dispatcher']
- service = BoardSet(
+ service = JobQueue(
source, dispatcher, reactor, daemon_options=daemon_options)
reactor.callWhenRunning(service.startService)
reactor.run()
=== modified file 'lava_scheduler_app/management/commands/schedulermonitor.py'
@@ -31,7 +31,7 @@
def handle(self, *args, **options):
from twisted.internet import reactor
- from lava_scheduler_daemon.board import Job
+ from lava_scheduler_daemon.job import Job
daemon_options = self._configure(options)
source = DatabaseJobSource()
dispatcher, board_name, json_file = args
=== added file 'lava_scheduler_app/migrations/0030_auto__add_field_testjob_sub_id__add_field_testjob_target_group.py'
@@ -0,0 +1,165 @@
+# -*- coding: utf-8 -*-
+from south.db import db
+from south.v2 import SchemaMigration
+
+
+class Migration(SchemaMigration):
+
+ def forwards(self, orm):
+ # Adding field 'TestJob.sub_id'
+ db.add_column('lava_scheduler_app_testjob', 'sub_id',
+ self.gf('django.db.models.fields.CharField')(default='', max_length=200, blank=True),
+ keep_default=False)
+
+ # Adding field 'TestJob.target_group'
+ db.add_column('lava_scheduler_app_testjob', 'target_group',
+ self.gf('django.db.models.fields.CharField')(default=None, max_length=64, null=True, blank=True),
+ keep_default=False)
+
+ def backwards(self, orm):
+ # Deleting field 'TestJob.sub_id'
+ db.delete_column('lava_scheduler_app_testjob', 'sub_id')
+
+ # Deleting field 'TestJob.target_group'
+ db.delete_column('lava_scheduler_app_testjob', 'target_group')
+
+ models = {
+ 'auth.group': {
+ 'Meta': {'object_name': 'Group'},
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '80'}),
+ 'permissions': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['auth.Permission']", 'symmetrical': 'False', 'blank': 'True'})
+ },
+ 'auth.permission': {
+ 'Meta': {'ordering': "('content_type__app_label', 'content_type__model', 'codename')", 'unique_together': "(('content_type', 'codename'),)", 'object_name': 'Permission'},
+ 'codename': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
+ 'content_type': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['contenttypes.ContentType']"}),
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'name': ('django.db.models.fields.CharField', [], {'max_length': '50'})
+ },
+ 'auth.user': {
+ 'Meta': {'object_name': 'User'},
+ 'date_joined': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime.now'}),
+ 'email': ('django.db.models.fields.EmailField', [], {'max_length': '75', 'blank': 'True'}),
+ 'first_name': ('django.db.models.fields.CharField', [], {'max_length': '30', 'blank': 'True'}),
+ 'groups': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['auth.Group']", 'symmetrical': 'False', 'blank': 'True'}),
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'is_active': ('django.db.models.fields.BooleanField', [], {'default': 'True'}),
+ 'is_staff': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
+ 'is_superuser': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
+ 'last_login': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime.now'}),
+ 'last_name': ('django.db.models.fields.CharField', [], {'max_length': '30', 'blank': 'True'}),
+ 'password': ('django.db.models.fields.CharField', [], {'max_length': '128'}),
+ 'user_permissions': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['auth.Permission']", 'symmetrical': 'False', 'blank': 'True'}),
+ 'username': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '30'})
+ },
+ 'contenttypes.contenttype': {
+ 'Meta': {'ordering': "('name',)", 'unique_together': "(('app_label', 'model'),)", 'object_name': 'ContentType', 'db_table': "'django_content_type'"},
+ 'app_label': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'model': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
+ 'name': ('django.db.models.fields.CharField', [], {'max_length': '100'})
+ },
+ 'dashboard_app.bundle': {
+ 'Meta': {'ordering': "['-uploaded_on']", 'object_name': 'Bundle'},
+ '_gz_content': ('django.db.models.fields.files.FileField', [], {'max_length': '100', 'null': 'True', 'db_column': "'gz_content'"}),
+ '_raw_content': ('django.db.models.fields.files.FileField', [], {'max_length': '100', 'null': 'True', 'db_column': "'content'"}),
+ 'bundle_stream': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'bundles'", 'to': "orm['dashboard_app.BundleStream']"}),
+ 'content_filename': ('django.db.models.fields.CharField', [], {'max_length': '256'}),
+ 'content_sha1': ('django.db.models.fields.CharField', [], {'max_length': '40', 'unique': 'True', 'null': 'True'}),
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'is_deserialized': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
+ 'uploaded_by': ('django.db.models.fields.related.ForeignKey', [], {'blank': 'True', 'related_name': "'uploaded_bundles'", 'null': 'True', 'to': "orm['auth.User']"}),
+ 'uploaded_on': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime.utcnow'})
+ },
+ 'dashboard_app.bundlestream': {
+ 'Meta': {'object_name': 'BundleStream'},
+ 'group': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.Group']", 'null': 'True', 'blank': 'True'}),
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'is_anonymous': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
+ 'is_public': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
+ 'name': ('django.db.models.fields.CharField', [], {'max_length': '64', 'blank': 'True'}),
+ 'pathname': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '128'}),
+ 'slug': ('django.db.models.fields.CharField', [], {'max_length': '64', 'blank': 'True'}),
+ 'user': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.User']", 'null': 'True', 'blank': 'True'})
+ },
+ 'lava_scheduler_app.device': {
+ 'Meta': {'object_name': 'Device'},
+ 'current_job': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'+'", 'null': 'True', 'on_delete': 'models.SET_NULL', 'to': "orm['lava_scheduler_app.TestJob']", 'blank': 'True', 'unique': 'True'}),
+ 'device_type': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['lava_scheduler_app.DeviceType']"}),
+ 'device_version': ('django.db.models.fields.CharField', [], {'default': 'None', 'max_length': '200', 'null': 'True', 'blank': 'True'}),
+ 'health_status': ('django.db.models.fields.IntegerField', [], {'default': '0'}),
+ 'hostname': ('django.db.models.fields.CharField', [], {'max_length': '200', 'primary_key': 'True'}),
+ 'last_health_report_job': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'+'", 'null': 'True', 'on_delete': 'models.SET_NULL', 'to': "orm['lava_scheduler_app.TestJob']", 'blank': 'True', 'unique': 'True'}),
+ 'status': ('django.db.models.fields.IntegerField', [], {'default': '1'}),
+ 'tags': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['lava_scheduler_app.Tag']", 'symmetrical': 'False', 'blank': 'True'})
+ },
+ 'lava_scheduler_app.devicestatetransition': {
+ 'Meta': {'object_name': 'DeviceStateTransition'},
+ 'created_by': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.User']", 'null': 'True', 'on_delete': 'models.SET_NULL', 'blank': 'True'}),
+ 'created_on': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}),
+ 'device': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'transitions'", 'to': "orm['lava_scheduler_app.Device']"}),
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'job': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['lava_scheduler_app.TestJob']", 'null': 'True', 'on_delete': 'models.SET_NULL', 'blank': 'True'}),
+ 'message': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
+ 'new_state': ('django.db.models.fields.IntegerField', [], {}),
+ 'old_state': ('django.db.models.fields.IntegerField', [], {})
+ },
+ 'lava_scheduler_app.devicetype': {
+ 'Meta': {'object_name': 'DeviceType'},
+ 'display': ('django.db.models.fields.BooleanField', [], {'default': 'True'}),
+ 'health_check_job': ('django.db.models.fields.TextField', [], {'default': 'None', 'null': 'True', 'blank': 'True'}),
+ 'name': ('django.db.models.fields.SlugField', [], {'max_length': '50', 'primary_key': 'True'})
+ },
+ 'lava_scheduler_app.jobfailuretag': {
+ 'Meta': {'object_name': 'JobFailureTag'},
+ 'description': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '256'})
+ },
+ 'lava_scheduler_app.tag': {
+ 'Meta': {'object_name': 'Tag'},
+ 'description': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'name': ('django.db.models.fields.SlugField', [], {'unique': 'True', 'max_length': '50'})
+ },
+ 'lava_scheduler_app.testjob': {
+ 'Meta': {'object_name': 'TestJob'},
+ '_results_bundle': ('django.db.models.fields.related.OneToOneField', [], {'null': 'True', 'db_column': "'results_bundle_id'", 'on_delete': 'models.SET_NULL', 'to': "orm['dashboard_app.Bundle']", 'blank': 'True', 'unique': 'True'}),
+ '_results_link': ('django.db.models.fields.CharField', [], {'default': 'None', 'max_length': '400', 'null': 'True', 'db_column': "'results_link'", 'blank': 'True'}),
+ 'actual_device': ('django.db.models.fields.related.ForeignKey', [], {'default': 'None', 'related_name': "'+'", 'null': 'True', 'blank': 'True', 'to': "orm['lava_scheduler_app.Device']"}),
+ 'definition': ('django.db.models.fields.TextField', [], {}),
+ 'description': ('django.db.models.fields.CharField', [], {'default': 'None', 'max_length': '200', 'null': 'True', 'blank': 'True'}),
+ 'end_time': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}),
+ 'failure_comment': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
+ 'failure_tags': ('django.db.models.fields.related.ManyToManyField', [], {'symmetrical': 'False', 'related_name': "'failure_tags'", 'blank': 'True', 'to': "orm['lava_scheduler_app.JobFailureTag']"}),
+ 'group': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.Group']", 'null': 'True', 'blank': 'True'}),
+ 'health_check': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'is_public': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
+ 'log_file': ('django.db.models.fields.files.FileField', [], {'default': 'None', 'max_length': '100', 'null': 'True', 'blank': 'True'}),
+ 'priority': ('django.db.models.fields.IntegerField', [], {'default': '50'}),
+ 'requested_device': ('django.db.models.fields.related.ForeignKey', [], {'default': 'None', 'related_name': "'+'", 'null': 'True', 'blank': 'True', 'to': "orm['lava_scheduler_app.Device']"}),
+ 'requested_device_type': ('django.db.models.fields.related.ForeignKey', [], {'default': 'None', 'related_name': "'+'", 'null': 'True', 'blank': 'True', 'to': "orm['lava_scheduler_app.DeviceType']"}),
+ 'start_time': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}),
+ 'status': ('django.db.models.fields.IntegerField', [], {'default': '0'}),
+ 'sub_id': ('django.db.models.fields.CharField', [], {'max_length': '200', 'blank': 'True'}),
+ 'submit_time': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}),
+ 'submit_token': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['linaro_django_xmlrpc.AuthToken']", 'null': 'True', 'on_delete': 'models.SET_NULL', 'blank': 'True'}),
+ 'submitter': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'+'", 'to': "orm['auth.User']"}),
+ 'tags': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['lava_scheduler_app.Tag']", 'symmetrical': 'False', 'blank': 'True'}),
+ 'target_group': ('django.db.models.fields.CharField', [], {'default': 'None', 'max_length': '64', 'null': 'True', 'blank': 'True'}),
+ 'user': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.User']", 'null': 'True', 'blank': 'True'})
+ },
+ 'linaro_django_xmlrpc.authtoken': {
+ 'Meta': {'object_name': 'AuthToken'},
+ 'created_on': ('django.db.models.fields.DateTimeField', [], {'auto_now': 'True', 'blank': 'True'}),
+ 'description': ('django.db.models.fields.TextField', [], {'default': "''", 'blank': 'True'}),
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'last_used_on': ('django.db.models.fields.DateTimeField', [], {'null': 'True'}),
+ 'secret': ('django.db.models.fields.CharField', [], {'default': "'7rf4239t35kqjrcixn4srgw00r61ncuq51jna0d6xbwpg2ur2annw5y1gkr9yt6ys9gh06b3wtcum4j0f2pdn5crul72mu1e1tw4at9jfgwk18asogkgoqcbc20ftylx'", 'unique': 'True', 'max_length': '128'}),
+ 'user': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'auth_tokens'", 'to': "orm['auth.User']"})
+ }
+ }
+
+ complete_apps = ['lava_scheduler_app']
=== added file 'lava_scheduler_app/migrations/0031_auto__add_field_testjob_multinode_definition.py'
@@ -0,0 +1,162 @@
+# -*- coding: utf-8 -*-
+import datetime
+from south.db import db
+from south.v2 import SchemaMigration
+from django.db import models
+
+
+class Migration(SchemaMigration):
+
+ def forwards(self, orm):
+ # Adding field 'TestJob.multinode_definition'
+ db.add_column('lava_scheduler_app_testjob', 'multinode_definition',
+ self.gf('django.db.models.fields.TextField')(default='', blank=True),
+ keep_default=False)
+
+
+ def backwards(self, orm):
+ # Deleting field 'TestJob.multinode_definition'
+ db.delete_column('lava_scheduler_app_testjob', 'multinode_definition')
+
+
+ models = {
+ 'auth.group': {
+ 'Meta': {'object_name': 'Group'},
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '80'}),
+ 'permissions': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['auth.Permission']", 'symmetrical': 'False', 'blank': 'True'})
+ },
+ 'auth.permission': {
+ 'Meta': {'ordering': "('content_type__app_label', 'content_type__model', 'codename')", 'unique_together': "(('content_type', 'codename'),)", 'object_name': 'Permission'},
+ 'codename': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
+ 'content_type': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['contenttypes.ContentType']"}),
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'name': ('django.db.models.fields.CharField', [], {'max_length': '50'})
+ },
+ 'auth.user': {
+ 'Meta': {'object_name': 'User'},
+ 'date_joined': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime.now'}),
+ 'email': ('django.db.models.fields.EmailField', [], {'max_length': '75', 'blank': 'True'}),
+ 'first_name': ('django.db.models.fields.CharField', [], {'max_length': '30', 'blank': 'True'}),
+ 'groups': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['auth.Group']", 'symmetrical': 'False', 'blank': 'True'}),
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'is_active': ('django.db.models.fields.BooleanField', [], {'default': 'True'}),
+ 'is_staff': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
+ 'is_superuser': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
+ 'last_login': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime.now'}),
+ 'last_name': ('django.db.models.fields.CharField', [], {'max_length': '30', 'blank': 'True'}),
+ 'password': ('django.db.models.fields.CharField', [], {'max_length': '128'}),
+ 'user_permissions': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['auth.Permission']", 'symmetrical': 'False', 'blank': 'True'}),
+ 'username': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '30'})
+ },
+ 'contenttypes.contenttype': {
+ 'Meta': {'ordering': "('name',)", 'unique_together': "(('app_label', 'model'),)", 'object_name': 'ContentType', 'db_table': "'django_content_type'"},
+ 'app_label': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'model': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
+ 'name': ('django.db.models.fields.CharField', [], {'max_length': '100'})
+ },
+ 'dashboard_app.bundle': {
+ 'Meta': {'ordering': "['-uploaded_on']", 'object_name': 'Bundle'},
+ '_gz_content': ('django.db.models.fields.files.FileField', [], {'max_length': '100', 'null': 'True', 'db_column': "'gz_content'"}),
+ '_raw_content': ('django.db.models.fields.files.FileField', [], {'max_length': '100', 'null': 'True', 'db_column': "'content'"}),
+ 'bundle_stream': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'bundles'", 'to': "orm['dashboard_app.BundleStream']"}),
+ 'content_filename': ('django.db.models.fields.CharField', [], {'max_length': '256'}),
+ 'content_sha1': ('django.db.models.fields.CharField', [], {'max_length': '40', 'unique': 'True', 'null': 'True'}),
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'is_deserialized': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
+ 'uploaded_by': ('django.db.models.fields.related.ForeignKey', [], {'blank': 'True', 'related_name': "'uploaded_bundles'", 'null': 'True', 'to': "orm['auth.User']"}),
+ 'uploaded_on': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime.utcnow'})
+ },
+ 'dashboard_app.bundlestream': {
+ 'Meta': {'object_name': 'BundleStream'},
+ 'group': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.Group']", 'null': 'True', 'blank': 'True'}),
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'is_anonymous': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
+ 'is_public': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
+ 'name': ('django.db.models.fields.CharField', [], {'max_length': '64', 'blank': 'True'}),
+ 'pathname': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '128'}),
+ 'slug': ('django.db.models.fields.CharField', [], {'max_length': '64', 'blank': 'True'}),
+ 'user': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.User']", 'null': 'True', 'blank': 'True'})
+ },
+ 'lava_scheduler_app.device': {
+ 'Meta': {'object_name': 'Device'},
+ 'current_job': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'+'", 'null': 'True', 'on_delete': 'models.SET_NULL', 'to': "orm['lava_scheduler_app.TestJob']", 'blank': 'True', 'unique': 'True'}),
+ 'device_type': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['lava_scheduler_app.DeviceType']"}),
+ 'device_version': ('django.db.models.fields.CharField', [], {'default': 'None', 'max_length': '200', 'null': 'True', 'blank': 'True'}),
+ 'health_status': ('django.db.models.fields.IntegerField', [], {'default': '0'}),
+ 'hostname': ('django.db.models.fields.CharField', [], {'max_length': '200', 'primary_key': 'True'}),
+ 'last_health_report_job': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'+'", 'null': 'True', 'on_delete': 'models.SET_NULL', 'to': "orm['lava_scheduler_app.TestJob']", 'blank': 'True', 'unique': 'True'}),
+ 'status': ('django.db.models.fields.IntegerField', [], {'default': '1'}),
+ 'tags': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['lava_scheduler_app.Tag']", 'symmetrical': 'False', 'blank': 'True'})
+ },
+ 'lava_scheduler_app.devicestatetransition': {
+ 'Meta': {'object_name': 'DeviceStateTransition'},
+ 'created_by': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.User']", 'null': 'True', 'on_delete': 'models.SET_NULL', 'blank': 'True'}),
+ 'created_on': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}),
+ 'device': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'transitions'", 'to': "orm['lava_scheduler_app.Device']"}),
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'job': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['lava_scheduler_app.TestJob']", 'null': 'True', 'on_delete': 'models.SET_NULL', 'blank': 'True'}),
+ 'message': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
+ 'new_state': ('django.db.models.fields.IntegerField', [], {}),
+ 'old_state': ('django.db.models.fields.IntegerField', [], {})
+ },
+ 'lava_scheduler_app.devicetype': {
+ 'Meta': {'object_name': 'DeviceType'},
+ 'display': ('django.db.models.fields.BooleanField', [], {'default': 'True'}),
+ 'health_check_job': ('django.db.models.fields.TextField', [], {'default': 'None', 'null': 'True', 'blank': 'True'}),
+ 'name': ('django.db.models.fields.SlugField', [], {'max_length': '50', 'primary_key': 'True'})
+ },
+ 'lava_scheduler_app.jobfailuretag': {
+ 'Meta': {'object_name': 'JobFailureTag'},
+ 'description': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '256'})
+ },
+ 'lava_scheduler_app.tag': {
+ 'Meta': {'object_name': 'Tag'},
+ 'description': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'name': ('django.db.models.fields.SlugField', [], {'unique': 'True', 'max_length': '50'})
+ },
+ 'lava_scheduler_app.testjob': {
+ 'Meta': {'object_name': 'TestJob'},
+ '_results_bundle': ('django.db.models.fields.related.OneToOneField', [], {'null': 'True', 'db_column': "'results_bundle_id'", 'on_delete': 'models.SET_NULL', 'to': "orm['dashboard_app.Bundle']", 'blank': 'True', 'unique': 'True'}),
+ '_results_link': ('django.db.models.fields.CharField', [], {'default': 'None', 'max_length': '400', 'null': 'True', 'db_column': "'results_link'", 'blank': 'True'}),
+ 'actual_device': ('django.db.models.fields.related.ForeignKey', [], {'default': 'None', 'related_name': "'+'", 'null': 'True', 'blank': 'True', 'to': "orm['lava_scheduler_app.Device']"}),
+ 'definition': ('django.db.models.fields.TextField', [], {}),
+ 'description': ('django.db.models.fields.CharField', [], {'default': 'None', 'max_length': '200', 'null': 'True', 'blank': 'True'}),
+ 'end_time': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}),
+ 'failure_comment': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
+ 'failure_tags': ('django.db.models.fields.related.ManyToManyField', [], {'symmetrical': 'False', 'related_name': "'failure_tags'", 'blank': 'True', 'to': "orm['lava_scheduler_app.JobFailureTag']"}),
+ 'group': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.Group']", 'null': 'True', 'blank': 'True'}),
+ 'health_check': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'is_public': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
+ 'log_file': ('django.db.models.fields.files.FileField', [], {'default': 'None', 'max_length': '100', 'null': 'True', 'blank': 'True'}),
+ 'multinode_definition': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
+ 'priority': ('django.db.models.fields.IntegerField', [], {'default': '50'}),
+ 'requested_device': ('django.db.models.fields.related.ForeignKey', [], {'default': 'None', 'related_name': "'+'", 'null': 'True', 'blank': 'True', 'to': "orm['lava_scheduler_app.Device']"}),
+ 'requested_device_type': ('django.db.models.fields.related.ForeignKey', [], {'default': 'None', 'related_name': "'+'", 'null': 'True', 'blank': 'True', 'to': "orm['lava_scheduler_app.DeviceType']"}),
+ 'start_time': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}),
+ 'status': ('django.db.models.fields.IntegerField', [], {'default': '0'}),
+ 'sub_id': ('django.db.models.fields.CharField', [], {'max_length': '200', 'blank': 'True'}),
+ 'submit_time': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}),
+ 'submit_token': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['linaro_django_xmlrpc.AuthToken']", 'null': 'True', 'on_delete': 'models.SET_NULL', 'blank': 'True'}),
+ 'submitter': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'+'", 'to': "orm['auth.User']"}),
+ 'tags': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['lava_scheduler_app.Tag']", 'symmetrical': 'False', 'blank': 'True'}),
+ 'target_group': ('django.db.models.fields.CharField', [], {'default': 'None', 'max_length': '64', 'null': 'True', 'blank': 'True'}),
+ 'user': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.User']", 'null': 'True', 'blank': 'True'})
+ },
+ 'linaro_django_xmlrpc.authtoken': {
+ 'Meta': {'object_name': 'AuthToken'},
+ 'created_on': ('django.db.models.fields.DateTimeField', [], {'auto_now': 'True', 'blank': 'True'}),
+ 'description': ('django.db.models.fields.TextField', [], {'default': "''", 'blank': 'True'}),
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'last_used_on': ('django.db.models.fields.DateTimeField', [], {'null': 'True'}),
+ 'secret': ('django.db.models.fields.CharField', [], {'default': "'g4fgt7t5qdghq3qo3t3h5dhbj6fes2zh8n6lkncc0u0rcqxy0kaez7aacw05nc0oxjc3060pj0f1fsunjpo1btk6urfpt8xfmgefcatgmh1e7kj0ams90ikni05sd5qk'", 'unique': 'True', 'max_length': '128'}),
+ 'user': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'auth_tokens'", 'to': "orm['auth.User']"})
+ }
+ }
+
+ complete_apps = ['lava_scheduler_app']
\ No newline at end of file
=== modified file 'lava_scheduler_app/models.py'
@@ -1,5 +1,6 @@
import logging
import os
+import uuid
import simplejson
import urlparse
@@ -18,6 +19,7 @@
from dashboard_app.models import Bundle, BundleStream
from lava_dispatcher.job import validate_job_data
+from lava_scheduler_app import utils
from linaro_django_xmlrpc.models import AuthToken
@@ -26,6 +28,10 @@
"""Error raised when JSON is syntactically valid but ill-formed."""
+class DevicesUnavailableException(UserWarning):
+ """Error raised when required number of devices are unavailable."""
+
+
class Tag(models.Model):
name = models.SlugField(unique=True)
@@ -44,6 +50,38 @@
raise ValidationError(e)
+def check_device_availability(requested_devices):
+ """Checks whether the number of devices requested is available.
+
+ See utils.requested_device_count() for details of REQUESTED_DEVICES
+ dictionary format.
+
+ Returns True if the requested number of devices are available, else
+ raises DevicesUnavailableException.
+ """
+ device_types = DeviceType.objects.values_list('name').filter(
+ models.Q(device__status=Device.IDLE) | \
+ models.Q(device__status=Device.RUNNING)
+ ).annotate(
+ num_count=models.Count('name')
+ ).order_by('name')
+
+ if requested_devices:
+ all_devices = {}
+ for dt in device_types:
+ # dt[0] -> device type name
+ # dt[1] -> device type count
+ all_devices[dt[0]] = dt[1]
+
+ for board, count in requested_devices.iteritems():
+            if board in all_devices and count <= all_devices[board]:
+ continue
+ else:
+ raise DevicesUnavailableException(
+ "Required number of device(s) unavailable.")
+ return True
+
+
class DeviceType(models.Model):
"""
A class of device, for example a pandaboard or a snowball.
@@ -245,6 +283,20 @@
id = models.AutoField(primary_key=True)
+ sub_id = models.CharField(
+ verbose_name=_(u"Sub ID"),
+ blank=True,
+ max_length=200
+ )
+
+ target_group = models.CharField(
+ verbose_name=_(u"Target Group"),
+ blank=True,
+ max_length=64,
+ null=True,
+ default=None
+ )
+
submitter = models.ForeignKey(
User,
verbose_name=_(u"Submitter"),
@@ -320,6 +372,11 @@
editable=False,
)
+ multinode_definition = models.TextField(
+ editable=False,
+ blank=True
+ )
+
log_file = models.FileField(
upload_to='lava-logs', default=None, null=True, blank=True)
@@ -386,17 +443,34 @@
@classmethod
def from_json_and_user(cls, json_data, user, health_check=False):
+ requested_devices = utils.requested_device_count(json_data)
+ check_device_availability(requested_devices)
job_data = simplejson.loads(json_data)
validate_job_data(job_data)
+
+ # Validate job, for parameters, specific to multinode that has been
+ # input by the user. These parameters are reserved by LAVA and
+ # generated during job submissions.
+ reserved_job_params = ["group_size", "role", "sub_id", "target_group"]
+ reserved_params_found = set(reserved_job_params).intersection(
+ set(job_data.keys()))
+ if reserved_params_found:
+ raise JSONDataError("Reserved parameters found in job data %s" %
+ str([x for x in reserved_params_found]))
+
if 'target' in job_data:
target = Device.objects.get(hostname=job_data['target'])
device_type = None
elif 'device_type' in job_data:
target = None
device_type = DeviceType.objects.get(name=job_data['device_type'])
+ elif 'device_group' in job_data:
+ target = None
+ device_type = None
else:
raise JSONDataError(
- "Neither 'target' nor 'device_type' found in job data.")
+ "No 'target' or 'device_type' or 'device_group' are found "
+ "in job data.")
priorities = dict([(j.upper(), i) for i, j in cls.PRIORITY_CHOICES])
priority = cls.MEDIUM
@@ -449,6 +523,7 @@
bundle_stream.is_public)
server = action['parameters']['server']
parsed_server = urlparse.urlsplit(server)
+ action["parameters"]["server"] = utils.rewrite_hostname(server)
if parsed_server.hostname is None:
raise ValueError("invalid server: %s" % server)
@@ -458,15 +533,49 @@
tags.append(Tag.objects.get(name=tag_name))
except Tag.DoesNotExist:
raise JSONDataError("tag %r does not exist" % tag_name)
- job = TestJob(
- definition=json_data, submitter=submitter,
- requested_device=target, requested_device_type=device_type,
- description=job_name, health_check=health_check, user=user,
- group=group, is_public=is_public, priority=priority)
- job.save()
- for tag in tags:
- job.tags.add(tag)
- return job
+
+ if 'device_group' in job_data:
+ target_group = str(uuid.uuid4())
+ node_json = utils.split_multi_job(job_data, target_group)
+ job_list = []
+ try:
+ parent_id = (TestJob.objects.latest('id')).id + 1
+ except:
+ parent_id = 1
+ child_id = 0
+
+ for role in node_json:
+ role_count = len(node_json[role])
+ for c in range(0, role_count):
+ device_type = DeviceType.objects.get(
+ name=node_json[role][c]["device_type"])
+ sub_id = '.'.join([str(parent_id), str(child_id)])
+
+ # Add sub_id to the generated job dictionary.
+ node_json[role][c]["sub_id"] = sub_id
+
+ job = TestJob(
+ sub_id=sub_id, submitter=submitter,
+ requested_device=target, description=job_name,
+ requested_device_type=device_type,
+ definition=simplejson.dumps(node_json[role][c]),
+ multinode_definition=json_data,
+ health_check=health_check, user=user, group=group,
+ is_public=is_public, priority=priority,
+ target_group=target_group)
+ job.save()
+ job_list.append(sub_id)
+ child_id += 1
+ return job_list
+
+ else:
+ job = TestJob(
+ definition=simplejson.dumps(job_data), submitter=submitter,
+ requested_device=target, requested_device_type=device_type,
+ description=job_name, health_check=health_check, user=user,
+ group=group, is_public=is_public, priority=priority)
+ job.save()
+ return job
def _can_admin(self, user):
""" used to check for things like if the user can cancel or annotate
@@ -529,6 +638,22 @@
"LAVA job notification: " + description, mail,
settings.SERVER_EMAIL, recipients)
+ @property
+ def sub_jobs_list(self):
+ if self.is_multinode:
+ jobs = TestJob.objects.filter(
+ target_group=self.target_group).order_by('id')
+ return jobs
+ else:
+ return None
+
+ @property
+ def is_multinode(self):
+ if self.target_group:
+ return True
+ else:
+ return False
+
class DeviceStateTransition(models.Model):
created_on = models.DateTimeField(auto_now_add=True)
=== modified file 'lava_scheduler_app/templates/lava_scheduler_app/job_definition.html'
@@ -10,7 +10,7 @@
{% endblock %}
{% block content %}
-<h2>Job defintion file - {{ job.id }} </h2>
+<h2>Job definition file - {{ job.id }} </h2>
<a href="{% url lava.scheduler.job.definition.plain job.pk %}">Download as text file</a>
<pre class="brush: js">{{ job.definition }}</pre>
=== modified file 'lava_scheduler_app/templates/lava_scheduler_app/job_sidebar.html'
@@ -62,6 +62,17 @@
<dt>Finished at:</dt>
<dd>{{ job.end_time|default:"not finished" }}</dd>
+
+ {% if job.is_multinode %}
+ <dt>Sub Jobs:</dt>
+ {% for subjob in job.sub_jobs_list %}
+ <dd>
+ <a href="{% url lava.scheduler.job.detail subjob.pk %}">
+ {{ subjob.sub_id }}</a>
+ </dd>
+ {% endfor %}
+ {% endif %}
+
</dl>
<h2>Views</h2>
<ul>
@@ -76,6 +87,11 @@
<li>
<a href="{% url lava.scheduler.job.definition job.pk %}">Definition</a>
</li>
+ {% if job.is_multinode %}
+ <li>
+ <a href="{% url lava.scheduler.job.multinode_definition job.pk %}"> Multinode Definition</a>
+ </li>
+ {% endif %}
{% if job.results_link %}
<li>
<a href="{{ job.results_link }}">Results Bundle</a>
=== modified file 'lava_scheduler_app/templates/lava_scheduler_app/job_submit.html'
@@ -31,6 +31,16 @@
To view the full job list click <a href="{{ list_url }}">here</a>.
</div>
+{% elif job_list %}
+{% url lava.scheduler.job.list as list_url %}
+<div id="job-success">Multinode Job submission successful!
+<br>
+<br>
+Jobs with ID {{ job_list }} have been created.
+<br>
+To view the full job list click <a href="{{ list_url }}">here</a>.
+</div>
+
{% else %}
{% if error %}
=== added file 'lava_scheduler_app/templates/lava_scheduler_app/multinode_job_definition.html'
@@ -0,0 +1,21 @@
+{% extends "lava_scheduler_app/job_sidebar.html" %}
+
+{% block extrahead %}
+{{ block.super }}
+<script type="text/javascript" src="{{ STATIC_URL }}lava_scheduler_app/js/shCore.js"></script>
+<script type="text/javascript" src="{{ STATIC_URL }}lava_scheduler_app/js/shBrushJScript.js"></script>
+
+<link href="{{ STATIC_URL }}lava_scheduler_app/css/shCore.css" rel="stylesheet" type="text/css" />
+<link href="{{ STATIC_URL }}lava_scheduler_app/css/shThemeDefault.css" rel="stylesheet" type="text/css" />
+{% endblock %}
+
+{% block content %}
+<h2>Multinode Job definition file - {{ job.sub_id }} </h2>
+<a href="{% url lava.scheduler.job.multinode_definition.plain job.pk %}">Download as text file</a>
+<pre class="brush: js">{{ job.multinode_definition }}</pre>
+
+<script type="text/javascript">
+ SyntaxHighlighter.all()
+</script>
+
+{% endblock %}
=== modified file 'lava_scheduler_app/urls.py'
@@ -81,6 +81,12 @@
url(r'^job/(?P<pk>[0-9]+)/definition/plain$',
'job_definition_plain',
name='lava.scheduler.job.definition.plain'),
+ url(r'^job/(?P<pk>[0-9]+)/multinode_definition$',
+ 'multinode_job_definition',
+ name='lava.scheduler.job.multinode_definition'),
+ url(r'^job/(?P<pk>[0-9]+)/multinode_definition/plain$',
+ 'multinode_job_definition_plain',
+ name='lava.scheduler.job.multinode_definition.plain'),
url(r'^job/(?P<pk>[0-9]+)/log_file$',
'job_log_file',
name='lava.scheduler.job.log_file'),
=== added file 'lava_scheduler_app/utils.py'
@@ -0,0 +1,117 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Neil Williams <neil.williams@linaro.org>
+# Senthil Kumaran <senthil.kumaran@linaro.org>
+#
+# This file is part of LAVA Scheduler.
+#
+# LAVA Scheduler is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License version 3 as
+# published by the Free Software Foundation
+#
+# LAVA Scheduler is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with LAVA Scheduler. If not, see <http://www.gnu.org/licenses/>.
+
+import re
+import copy
+import socket
+import urlparse
+import simplejson
+
+
+def rewrite_hostname(result_url):
+ """If URL has hostname value as localhost/127.0.0.*, change it to the
+ actual server FQDN.
+
+ Returns the RESULT_URL (string) re-written with hostname.
+
+ See https://cards.linaro.org/browse/LAVA-611
+ """
+ host = urlparse.urlparse(result_url).netloc
+ if host == "localhost":
+ result_url = result_url.replace("localhost", socket.getfqdn())
+ elif host.startswith("127.0.0"):
+ ip_pat = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}'
+ result_url = re.sub(ip_pat, socket.getfqdn(), result_url)
+ return result_url
+
+
+def split_multi_job(json_jobdata, target_group):
+ node_json = {}
+ all_nodes = {}
+ node_actions = {}
+
+ # Check if we are operating on multinode job data. Else return the job
+ # data as it is.
+ if "device_group" in json_jobdata and target_group:
+ pass
+ else:
+ return json_jobdata
+
+ # get all the roles and create node action list for each role.
+ for group in json_jobdata["device_group"]:
+ node_actions[group["role"]] = []
+
+ # Take each action and assign it to proper roles. If roles are not
+ # specified for a specific action, then assign it to all the roles.
+ all_actions = json_jobdata["actions"]
+ for role in node_actions.keys():
+ for action in all_actions:
+ new_action = copy.deepcopy(action)
+ if 'parameters' in new_action \
+ and 'role' in new_action["parameters"]:
+ if new_action["parameters"]["role"] == role:
+ new_action["parameters"].pop('role', None)
+ node_actions[role].append(new_action)
+ else:
+ node_actions[role].append(new_action)
+
+ group_count = 0
+ for clients in json_jobdata["device_group"]:
+ group_count += int(clients["count"])
+ for clients in json_jobdata["device_group"]:
+ role = str(clients["role"])
+ count = int(clients["count"])
+ node_json[role] = []
+ for c in range(0, count):
+ node_json[role].append({})
+ node_json[role][c]["timeout"] = json_jobdata["timeout"]
+ node_json[role][c]["job_name"] = json_jobdata["job_name"]
+ node_json[role][c]["tags"] = clients["tags"]
+ node_json[role][c]["group_size"] = group_count
+ node_json[role][c]["target_group"] = target_group
+ node_json[role][c]["actions"] = node_actions[role]
+
+ node_json[role][c]["role"] = role
+ # multinode node stage 2
+ node_json[role][c]["logging_level"] = json_jobdata["logging_level"]
+ node_json[role][c]["device_type"] = clients["device_type"]
+
+ return node_json
+
+
+def requested_device_count(json_data):
+ """Utility function check the requested number of devices for each
+ device_type in a multinode job.
+
+ JSON_DATA is the job definition string.
+
+ Returns requested_device which is a dictionary of the following format:
+
+ {'kvm': 1, 'qemu': 3, 'panda': 1}
+
+ If the job is not a multinode job, then return an empty dictionary.
+ """
+ job_data = simplejson.loads(json_data)
+ requested_devices = {}
+ if 'device_group' in job_data:
+ for device_group in job_data['device_group']:
+ device_type = device_group['device_type']
+ count = device_group['count']
+ requested_devices[device_type] = count
+ return requested_devices
=== modified file 'lava_scheduler_app/views.py'
@@ -51,6 +51,7 @@
DeviceType,
DeviceStateTransition,
TestJob,
+ JSONDataError,
validate_job_json,
)
@@ -74,10 +75,16 @@
def pklink(record):
+ job_id = record.pk
+ try:
+ if record.sub_id:
+ job_id = record.sub_id
+ except:
+ pass
return mark_safe(
'<a href="%s">%s</a>' % (
record.get_absolute_url(),
- escape(record.pk)))
+ escape(job_id)))
class IDLinkColumn(Column):
@@ -100,11 +107,11 @@
def all_jobs_with_device_sort():
- return TestJob.objects.select_related(
- "actual_device", "requested_device", "requested_device_type",
- "submitter", "user", "group")\
- .extra(select={'device_sort': 'coalesce(actual_device_id, requested_device_id, '
- 'requested_device_type_id)'}).all()
+ jobs = TestJob.objects.select_related("actual_device", "requested_device",
+ "requested_device_type", "submitter", "user", "group")\
+ .extra(select={'device_sort': 'coalesce(actual_device_id, '
+ 'requested_device_id, requested_device_type_id)'}).all()
+ return jobs.order_by('submit_time')
class JobTable(DataTablesTable):
@@ -124,7 +131,7 @@
else:
return ''
- id = RestrictedIDLinkColumn()
+ sub_id = RestrictedIDLinkColumn()
status = Column()
priority = Column()
device = Column(accessor='device_sort')
@@ -135,7 +142,7 @@
duration = Column()
datatable_opts = {
- 'aaSorting': [[0, 'desc']],
+ 'aaSorting': [[6, 'desc']],
}
searchable_columns = ['description']
@@ -296,6 +303,10 @@
class Meta:
exclude = ('status', 'submitter', 'end_time', 'priority', 'description')
+ datatable_opts = {
+ 'aaSorting': [[2, 'desc']],
+ }
+
def failed_jobs_json(request):
return FailedJobTable.json(request, params=(request,))
@@ -499,6 +510,10 @@
class Meta:
exclude = ('description', 'device')
+ datatable_opts = {
+ 'aaSorting': [[4, 'desc']],
+ }
+
def health_jobs_json(request, pk):
device = get_object_or_404(Device, pk=pk)
@@ -582,12 +597,15 @@
job = TestJob.from_json_and_user(
request.POST.get("json-input"), request.user)
- response_data["job_id"] = job.id
+ if isinstance(job, type(list())):
+ response_data["job_list"] = job
+ else:
+ response_data["job_id"] = job.id
return render_to_response(
"lava_scheduler_app/job_submit.html",
response_data, RequestContext(request))
- except Exception as e:
+ except (JSONDataError, ValueError) as e:
response_data["error"] = str(e)
response_data["json_input"] = request.POST.get("json-input")
return render_to_response(
@@ -664,6 +682,25 @@
return response
+def multinode_job_definition(request, pk):
+ job = get_restricted_job(request.user, pk)
+ log_file = job.output_file()
+ return render_to_response(
+ "lava_scheduler_app/multinode_job_definition.html",
+ {
+ 'job': job,
+ 'job_file_present': bool(log_file),
+ },
+ RequestContext(request))
+
+
+def multinode_job_definition_plain(request, pk):
+ job = get_restricted_job(request.user, pk)
+ response = HttpResponse(job.multinode_definition, mimetype='text/plain')
+ response['Content-Disposition'] = "attachment; filename=multinode_job_%d.json" % job.id
+ return response
+
+
@BreadCrumb("Complete log", parent=job_detail, needs=['pk'])
def job_log_file(request, pk):
job = get_restricted_job(request.user, pk)
@@ -764,7 +801,13 @@
def job_cancel(request, pk):
job = get_restricted_job(request.user, pk)
if job.can_cancel(request.user):
- job.cancel()
+ if job.is_multinode:
+ multinode_jobs = TestJob.objects.all().filter(
+ target_group=job.target_group)
+ for multinode_job in multinode_jobs:
+ multinode_job.cancel()
+ else:
+ job.cancel()
return redirect(job)
else:
return HttpResponseForbidden(
@@ -773,11 +816,38 @@
@post_only
def job_resubmit(request, pk):
+
+ response_data = {
+ 'is_authorized': False,
+ 'bread_crumb_trail': BreadCrumbTrail.leading_to(job_list),
+ }
+
job = get_restricted_job(request.user, pk)
if job.can_resubmit(request.user):
- definition = job.definition
- job = TestJob.from_json_and_user(definition, request.user)
- return redirect(job)
+ response_data["is_authorized"] = True
+
+ if job.is_multinode:
+ definition = job.multinode_definition
+ else:
+ definition = job.definition
+
+ try:
+ job = TestJob.from_json_and_user(definition, request.user)
+
+ if isinstance(job, type(list())):
+ response_data["job_list"] = job
+ return render_to_response(
+ "lava_scheduler_app/job_submit.html",
+ response_data, RequestContext(request))
+ else:
+ return redirect(job)
+ except Exception as e:
+ response_data["error"] = str(e)
+ response_data["json_input"] = definition
+ return render_to_response(
+ "lava_scheduler_app/job_submit.html",
+ response_data, RequestContext(request))
+
else:
return HttpResponseForbidden(
"you cannot re-submit this job", content_type="text/plain")
=== removed file 'lava_scheduler_daemon/board.py'
@@ -1,355 +0,0 @@
-import json
-import os
-import signal
-import tempfile
-import logging
-
-from twisted.internet.error import ProcessDone, ProcessExitedAlready
-from twisted.internet.protocol import ProcessProtocol
-from twisted.internet import defer, task
-
-
-def catchall_errback(logger):
- def eb(failure):
- logger.error(
- '%s: %s\n%s', failure.type.__name__, failure.value,
- failure.getTraceback())
- return eb
-
-
-class DispatcherProcessProtocol(ProcessProtocol):
-
- def __init__(self, deferred, job):
- self.logger = logging.getLogger(__name__ + '.DispatcherProcessProtocol')
- self.deferred = deferred
- self.log_size = 0
- self.job = job
-
- def childDataReceived(self, childFD, data):
- self.log_size += len(data)
- if self.log_size > self.job.daemon_options['LOG_FILE_SIZE_LIMIT']:
- if not self.job._killing:
- self.job.cancel("exceeded log size limit")
-
- def childConnectionLost(self, childFD):
- self.logger.info("childConnectionLost for %s: %s",
- self.job.board_name, childFD)
-
- def processExited(self, reason):
- self.logger.info("processExited for %s: %s",
- self.job.board_name, reason.value)
-
- def processEnded(self, reason):
- self.logger.info("processEnded for %s: %s",
- self.job.board_name, reason.value)
- self.deferred.callback(reason.value.exitCode)
-
-
-class Job(object):
-
- def __init__(self, job_data, dispatcher, source, board_name, reactor,
- daemon_options):
- self.job_data = job_data
- self.dispatcher = dispatcher
- self.source = source
- self.board_name = board_name
- self.logger = logging.getLogger(__name__ + '.Job.' + board_name)
- self.reactor = reactor
- self.daemon_options = daemon_options
- self._json_file = None
- self._source_lock = defer.DeferredLock()
- self._checkCancel_call = task.LoopingCall(self._checkCancel)
- self._signals = ['SIGINT', 'SIGINT', 'SIGTERM', 'SIGTERM', 'SIGKILL']
- self._time_limit_call = None
- self._killing = False
- self._kill_reason = ''
-
- def _checkCancel(self):
- if self._killing:
- self.cancel()
- else:
- return self._source_lock.run(
- self.source.jobCheckForCancellation,
- self.board_name).addCallback(self._maybeCancel)
-
- def cancel(self, reason=None):
- if not self._killing:
- if reason is None:
- reason = "killing job for unknown reason"
- self._kill_reason = reason
- self.logger.info(reason)
- self._killing = True
- if self._signals:
- signame = self._signals.pop(0)
- else:
- self.logger.warning("self._signals is empty!")
- signame = 'SIGKILL'
- self.logger.info(
- 'attempting to kill job with signal %s' % signame)
- try:
- self._protocol.transport.signalProcess(getattr(signal, signame))
- except ProcessExitedAlready:
- pass
-
- def _maybeCancel(self, cancel):
- if cancel:
- self.cancel("killing job by user request")
- else:
- logging.debug('not cancelling')
-
- def _time_limit_exceeded(self):
- self._time_limit_call = None
- self.cancel("killing job for exceeding timeout")
-
- def run(self):
- d = self.source.getOutputDirForJobOnBoard(self.board_name)
- return d.addCallback(self._run).addErrback(
- catchall_errback(self.logger))
-
- def _run(self, output_dir):
- d = defer.Deferred()
- json_data = self.job_data
- fd, self._json_file = tempfile.mkstemp()
- with os.fdopen(fd, 'wb') as f:
- json.dump(json_data, f)
- self._protocol = DispatcherProcessProtocol(d, self)
- self.reactor.spawnProcess(
- self._protocol, self.dispatcher, args=[
- self.dispatcher, self._json_file, '--output-dir', output_dir],
- childFDs={0: 0, 1: 'r', 2: 'r'}, env=None)
- self._checkCancel_call.start(10)
- timeout = max(
- json_data['timeout'], self.daemon_options['MIN_JOB_TIMEOUT'])
- self._time_limit_call = self.reactor.callLater(
- timeout, self._time_limit_exceeded)
- d.addBoth(self._exited)
- return d
-
- def _exited(self, exit_code):
- self.logger.info("job finished on %s", self.job_data['target'])
- if self._json_file is not None:
- os.unlink(self._json_file)
- self.logger.info("reporting job completed")
- if self._time_limit_call is not None:
- self._time_limit_call.cancel()
- self._checkCancel_call.stop()
- return self._source_lock.run(
- self.source.jobCompleted,
- self.board_name,
- exit_code,
- self._killing).addCallback(
- lambda r: exit_code)
-
-
-class SchedulerMonitorPP(ProcessProtocol):
-
- def __init__(self, d, board_name):
- self.d = d
- self.board_name = board_name
- self.logger = logging.getLogger(__name__ + '.SchedulerMonitorPP')
-
- def childDataReceived(self, childFD, data):
- self.logger.warning(
- "scheduler monitor for %s produced output: %r on fd %s",
- self.board_name, data, childFD)
-
- def processEnded(self, reason):
- if not reason.check(ProcessDone):
- self.logger.error(
- "scheduler monitor for %s crashed: %s",
- self.board_name, reason)
- self.d.callback(None)
-
-
-class MonitorJob(object):
-
- def __init__(self, job_data, dispatcher, source, board_name, reactor,
- daemon_options):
- self.logger = logging.getLogger(__name__ + '.MonitorJob')
- self.job_data = job_data
- self.dispatcher = dispatcher
- self.source = source
- self.board_name = board_name
- self.reactor = reactor
- self.daemon_options = daemon_options
- self._json_file = None
-
- def run(self):
- d = defer.Deferred()
- json_data = self.job_data
- fd, self._json_file = tempfile.mkstemp()
- with os.fdopen(fd, 'wb') as f:
- json.dump(json_data, f)
-
- childFDs = {0: 0, 1: 1, 2: 2}
- args = [
- 'setsid', 'lava-server', 'manage', 'schedulermonitor',
- self.dispatcher, str(self.board_name), self._json_file,
- '-l', self.daemon_options['LOG_LEVEL']]
- if self.daemon_options['LOG_FILE_PATH']:
- args.extend(['-f', self.daemon_options['LOG_FILE_PATH']])
- childFDs = None
- self.logger.info('executing "%s"', ' '.join(args))
- self.reactor.spawnProcess(
- SchedulerMonitorPP(d, self.board_name), 'setsid',
- childFDs=childFDs, env=None, args=args)
- d.addBoth(self._exited)
- return d
-
- def _exited(self, result):
- if self._json_file is not None:
- os.unlink(self._json_file)
- return result
-
-
-class Board(object):
- """
- A board runs jobs. A board can be in four main states:
-
- * stopped (S)
- * the board is not looking for or processing jobs
- * checking (C)
- * a call to check for a new job is in progress
- * waiting (W)
- * no job was found by the last call to getJobForBoard and so the board
- is waiting for a while before calling again.
- * running (R)
- * a job is running (or a job has completed but the call to jobCompleted
- on the job source has not)
-
- In addition, because we can't stop a job instantly nor abort a check for a
- new job safely (because a if getJobForBoard returns a job, it has already
- been marked as started), there are variations on the 'checking' and
- 'running' states -- 'checking with stop requested' (C+S) and 'running with
- stop requested' (R+S). Even this is a little simplistic as there is the
- possibility of .start() being called before the process of stopping
- completes, but we deal with this by deferring any actions taken by
- .start() until the board is really stopped.
-
- Events that cause state transitions are:
-
- * start() is called. We cheat and pretend that this can only happen in
- the stopped state by stopping first, and then move into the C state.
-
- * stop() is called. If we in the C or R state we move to C+S or R+S
- resepectively. If we are in S, C+S or R+S, we stay there. If we are
- in W, we just move straight to S.
-
- * getJobForBoard() returns a job. We can only be in C or C+S here, and
- move into R or R+S respectively.
-
- * getJobForBoard() indicates that there is no job to perform. Again we
- can only be in C or C+S and move into W or S respectively.
-
- * a job completes (i.e. the call to jobCompleted() on the source
- returns). We can only be in R or R+S and move to C or S respectively.
-
- * the timer that being in state W implies expires. We move into C.
-
- The cheating around start means that interleaving start and stop calls may
- not always do what you expect. So don't mess around in that way please.
- """
-
- job_cls = MonitorJob
-
- def __init__(self, source, board_name, dispatcher, reactor, daemon_options,
- job_cls=None):
- self.source = source
- self.board_name = board_name
- self.dispatcher = dispatcher
- self.reactor = reactor
- self.daemon_options = daemon_options
- if job_cls is not None:
- self.job_cls = job_cls
- self.running_job = None
- self._check_call = None
- self._stopping_deferreds = []
- self.logger = logging.getLogger(__name__ + '.Board.' + board_name)
- self.checking = False
-
- def _state_name(self):
- if self.running_job:
- state = "R"
- elif self._check_call:
- assert not self._stopping_deferreds
- state = "W"
- elif self.checking:
- state = "C"
- else:
- assert not self._stopping_deferreds
- state = "S"
- if self._stopping_deferreds:
- state += "+S"
- return state
-
- def start(self):
- self.logger.debug("start requested")
- self.stop().addCallback(self._start)
-
- def _start(self, ignored):
- self.logger.debug("starting")
- self._stopping_deferreds = []
- self._checkForJob()
-
- def stop(self):
- self.logger.debug("stopping")
- if self._check_call is not None:
- self._check_call.cancel()
- self._check_call = None
-
- if self.running_job is not None or self.checking:
- self.logger.debug("job running; deferring stop")
- self._stopping_deferreds.append(defer.Deferred())
- return self._stopping_deferreds[-1]
- else:
- self.logger.debug("stopping immediately")
- return defer.succeed(None)
-
- def _checkForJob(self):
- self.logger.debug("checking for job")
- self._check_call = None
- self.checking = True
- self.source.getJobForBoard(self.board_name).addCallbacks(
- self._maybeStartJob, self._ebCheckForJob)
-
- def _ebCheckForJob(self, result):
- self.logger.error(
- '%s: %s\n%s', result.type.__name__, result.value,
- result.getTraceback())
- self._maybeStartJob(None)
-
- def _finish_stop(self):
- self.logger.debug(
- "calling %s deferreds returned from stop()",
- len(self._stopping_deferreds))
- for d in self._stopping_deferreds:
- d.callback(None)
- self._stopping_deferreds = []
-
- def _maybeStartJob(self, job_data):
- self.checking = False
- if job_data is None:
- self.logger.debug("no job found")
- if self._stopping_deferreds:
- self._finish_stop()
- else:
- self._check_call = self.reactor.callLater(
- 10, self._checkForJob)
- return
- self.logger.info("starting job %r", job_data)
- self.running_job = self.job_cls(
- job_data, self.dispatcher, self.source, self.board_name,
- self.reactor, self.daemon_options)
- d = self.running_job.run()
- d.addCallbacks(self._cbJobFinished, self._ebJobFinished)
-
- def _ebJobFinished(self, result):
- self.logger.exception(result.value)
- self._checkForJob()
-
- def _cbJobFinished(self, result):
- self.running_job = None
- if self._stopping_deferreds:
- self._finish_stop()
- else:
- self._checkForJob()
=== modified file 'lava_scheduler_daemon/dbjobsource.py'
@@ -3,6 +3,7 @@
import os
import shutil
import urlparse
+import copy
from dashboard_app.models import Bundle
@@ -92,21 +93,135 @@
transaction.leave_transaction_management()
return self.deferToThread(wrapper, *args, **kw)
- def getBoardList_impl(self):
+ def _get_health_check_jobs(self):
+ """Gets the list of configured boards and checks which are the boards
+ that require health check.
+
+ Returns JOB_LIST which is a set of health check jobs. If no health
+ check jobs are available returns an empty set.
+ """
+ job_list = set()
configured_boards = [
x.hostname for x in dispatcher_config.get_devices()]
boards = []
for d in Device.objects.all():
if d.hostname in configured_boards:
- boards.append({'hostname': d.hostname})
- return boards
-
- def getBoardList(self):
- return self.deferForDB(self.getBoardList_impl)
+ boards.append(d)
+
+ for device in boards:
+ if device.status != Device.IDLE:
+ continue
+ if not device.device_type.health_check_job:
+ run_health_check = False
+ elif device.health_status == Device.HEALTH_UNKNOWN:
+ run_health_check = True
+ elif device.health_status == Device.HEALTH_LOOPING:
+ run_health_check = True
+ elif not device.last_health_report_job:
+ run_health_check = True
+ else:
+ run_health_check = device.last_health_report_job.end_time < \
+ datetime.datetime.now() - datetime.timedelta(days=1)
+ if run_health_check:
+ job_list.add(self._getHealthCheckJobForBoard(device))
+ return job_list
+
+ def _fix_device(self, device, job):
+ """Associate an available/idle DEVICE to the given JOB.
+
+ Returns the job with actual_device set to DEVICE.
+
+ If we are unable to grab the DEVICE then we return None.
+ """
+ DeviceStateTransition.objects.create(
+ created_by=None, device=device, old_state=device.status,
+ new_state=Device.RUNNING, message=None, job=job).save()
+ device.status = Device.RUNNING
+ device.current_job = job
+ try:
+ # The unique constraint on current_job may cause this to
+ # fail in the case of concurrent requests for different
+ # boards grabbing the same job. If there are concurrent
+ # requests for the *same* board they may both return the
+ # same job -- this is an application level bug though.
+ device.save()
+ except IntegrityError:
+ self.logger.info(
+ "job %s has been assigned to another board -- rolling back",
+ job.id)
+ transaction.rollback()
+ return None
+ else:
+ job.actual_device = device
+ job.log_file.save(
+ 'job-%s.log' % job.id, ContentFile(''), save=False)
+ job.submit_token = AuthToken.objects.create(user=job.submitter)
+ job.definition = simplejson.dumps(self._get_json_data(job),
+ sort_keys=True,
+ indent=4 * ' ')
+ job.save()
+ transaction.commit()
+ return job
+
+ def getJobList_impl(self):
+ jobs = TestJob.objects.all().filter(
+ status=TestJob.SUBMITTED).order_by('-priority', 'submit_time')
+ job_list = self._get_health_check_jobs()
+ devices = None
+ configured_boards = [
+ x.hostname for x in dispatcher_config.get_devices()]
+ self.logger.debug("Number of configured_devices: %d" % len(configured_boards))
+ for job in jobs:
+ if job.actual_device:
+ job_list.add(job)
+ elif job.requested_device:
+ self.logger.debug("Checking Requested Device")
+ devices = Device.objects.all().filter(
+ hostname=job.requested_device.hostname,
+ status=Device.IDLE)
+ elif job.requested_device_type:
+ self.logger.debug("Checking Requested Device Type")
+ devices = Device.objects.all().filter(
+ device_type=job.requested_device_type,
+ status=Device.IDLE)
+ else:
+ continue
+ if devices:
+ for d in devices:
+ self.logger.debug("Checking %s" % d.hostname)
+ if d.hostname in configured_boards:
+ if job:
+ job = self._fix_device(d, job)
+ if job:
+ job_list.add(job)
+
+ # Remove scheduling multinode jobs until all the jobs in the
+ # target_group are assigned devices.
+ final_job_list = copy.deepcopy(job_list)
+ for job in job_list:
+ if job.is_multinode:
+ multinode_jobs = TestJob.objects.all().filter(
+ target_group=job.target_group)
+
+ jobs_with_device = 0
+ for multinode_job in multinode_jobs:
+ if multinode_job.actual_device:
+ jobs_with_device += 1
+
+ if len(multinode_jobs) != jobs_with_device:
+ final_job_list.difference_update(set(multinode_jobs))
+
+ return final_job_list
+
+ def getJobList(self):
+ return self.deferForDB(self.getJobList_impl)
def _get_json_data(self, job):
json_data = simplejson.loads(job.definition)
- json_data['target'] = job.actual_device.hostname
+ if job.actual_device:
+ json_data['target'] = job.actual_device.hostname
+ elif job.requested_device:
+ json_data['target'] = job.requested_device.hostname
for action in json_data['actions']:
if not action['command'].startswith('submit_results'):
continue
@@ -171,64 +286,19 @@
else:
return None
- def getJobForBoard_impl(self, board_name):
- while True:
- device = Device.objects.get(hostname=board_name)
- if device.status != Device.IDLE:
- return None
- if not device.device_type.health_check_job:
- run_health_check = False
- elif device.health_status == Device.HEALTH_UNKNOWN:
- run_health_check = True
- elif device.health_status == Device.HEALTH_LOOPING:
- run_health_check = True
- elif not device.last_health_report_job:
- run_health_check = True
- else:
- run_health_check = device.last_health_report_job.end_time < datetime.datetime.now() - datetime.timedelta(days=1)
- if run_health_check:
- job = self._getHealthCheckJobForBoard(device)
- else:
- job = self._getJobFromQueue(device)
- if job:
- DeviceStateTransition.objects.create(
- created_by=None, device=device, old_state=device.status,
- new_state=Device.RUNNING, message=None, job=job).save()
- job.status = TestJob.RUNNING
- job.start_time = datetime.datetime.utcnow()
- job.actual_device = device
- device.status = Device.RUNNING
- shutil.rmtree(job.output_dir, ignore_errors=True)
- device.current_job = job
- try:
- # The unique constraint on current_job may cause this to
- # fail in the case of concurrent requests for different
- # boards grabbing the same job. If there are concurrent
- # requests for the *same* board they may both return the
- # same job -- this is an application level bug though.
- device.save()
- except IntegrityError:
- self.logger.info(
- "job %s has been assigned to another board -- "
- "rolling back", job.id)
- transaction.rollback()
- continue
- else:
- job.log_file.save(
- 'job-%s.log' % job.id, ContentFile(''), save=False)
- job.submit_token = AuthToken.objects.create(user=job.submitter)
- job.save()
- json_data = self._get_json_data(job)
- transaction.commit()
- return json_data
- else:
- # _getHealthCheckJobForBoard can offline the board, so commit
- # in this branch too.
- transaction.commit()
- return None
+ def getJobDetails_impl(self, job):
+ job.status = TestJob.RUNNING
+ job.start_time = datetime.datetime.utcnow()
+ shutil.rmtree(job.output_dir, ignore_errors=True)
+ job.log_file.save('job-%s.log' % job.id, ContentFile(''), save=False)
+ job.submit_token = AuthToken.objects.create(user=job.submitter)
+ job.save()
+ json_data = self._get_json_data(job)
+ transaction.commit()
+ return json_data
- def getJobForBoard(self, board_name):
- return self.deferForDB(self.getJobForBoard_impl, board_name)
+ def getJobDetails(self, job):
+ return self.deferForDB(self.getJobDetails_impl, job)
def getOutputDirForJobOnBoard_impl(self, board_name):
device = Device.objects.get(hostname=board_name)
=== added file 'lava_scheduler_daemon/job.py'
@@ -0,0 +1,281 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Senthil Kumaran <senthil.kumaran@linaro.org>
+#
+# This file is part of LAVA Scheduler.
+#
+# LAVA Scheduler is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License version 3 as
+# published by the Free Software Foundation
+#
+# LAVA Scheduler is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with LAVA Scheduler. If not, see <http://www.gnu.org/licenses/>.
+
+import json
+import os
+import signal
+import tempfile
+import logging
+
+from twisted.internet.error import ProcessDone, ProcessExitedAlready
+from twisted.internet.protocol import ProcessProtocol
+from twisted.internet import defer, task
+
+
+def catchall_errback(logger):
+ # Build a Twisted errback that logs any Failure (type, value and full
+ # traceback) to *logger*. The failure is not re-raised, so attaching
+ # this errback terminates the error chain.
+ def eb(failure):
+ logger.error(
+ '%s: %s\n%s', failure.type.__name__, failure.value,
+ failure.getTraceback())
+ return eb
+
+
+class DispatcherProcessProtocol(ProcessProtocol):
+ # Watches a spawned dispatcher process: polices how much output it
+ # produces (cancelling the job past LOG_FILE_SIZE_LIMIT) and fires
+ # *deferred* with the process exit code when it ends.
+
+ def __init__(self, deferred, job):
+ self.logger = logging.getLogger(__name__ + '.DispatcherProcessProtocol')
+ self.deferred = deferred
+ # Cumulative byte count of everything received on all child FDs.
+ self.log_size = 0
+ self.job = job
+
+ def childDataReceived(self, childFD, data):
+ # The output itself is discarded here; only its size is tracked.
+ self.log_size += len(data)
+ if self.log_size > self.job.daemon_options['LOG_FILE_SIZE_LIMIT']:
+ if not self.job._killing:
+ self.job.cancel("exceeded log size limit")
+
+ def childConnectionLost(self, childFD):
+ self.logger.info("childConnectionLost for %s: %s",
+ self.job.board_name, childFD)
+
+ def processExited(self, reason):
+ self.logger.info("processExited for %s: %s",
+ self.job.board_name, reason.value)
+
+ def processEnded(self, reason):
+ # processEnded is the final lifecycle event: report the exit code
+ # to whoever is waiting on the job's Deferred.
+ self.logger.info("processEnded for %s: %s",
+ self.job.board_name, reason.value)
+ self.deferred.callback(reason.value.exitCode)
+
+
+class Job(object):
+ # Runs a single dispatcher process for one job on one board, and
+ # polices it: polls the source for user cancellation, enforces a time
+ # limit and (via DispatcherProcessProtocol) a log-size limit, then
+ # reports completion back to the job source.
+
+ def __init__(self, job_data, dispatcher, source, board_name, reactor,
+ daemon_options):
+ self.job_data = job_data
+ self.dispatcher = dispatcher
+ self.source = source
+ self.board_name = board_name
+ self.logger = logging.getLogger(__name__ + '.Job.' + board_name)
+ self.reactor = reactor
+ self.daemon_options = daemon_options
+ self._json_file = None
+ # Serialise all calls into the job source so DB work never overlaps.
+ self._source_lock = defer.DeferredLock()
+ self._checkCancel_call = task.LoopingCall(self._checkCancel)
+ # Escalating kill sequence: each cancel() attempt pops the next
+ # entry, so the duplicates give two polite SIGINT tries, then two
+ # SIGTERMs, before resorting to SIGKILL.
+ self._signals = ['SIGINT', 'SIGINT', 'SIGTERM', 'SIGTERM', 'SIGKILL']
+ self._time_limit_call = None
+ self._killing = False
+ self._kill_reason = ''
+
+ def _checkCancel(self):
+ # Runs every 10 seconds (started in _run). Once killing has begun,
+ # keep calling cancel() to escalate through the signal list.
+ if self._killing:
+ self.cancel()
+ else:
+ return self._source_lock.run(
+ self.source.jobCheckForCancellation,
+ self.board_name).addCallback(self._maybeCancel)
+
+ def cancel(self, reason=None):
+ # Safe to call repeatedly; each call sends the next signal from
+ # self._signals to the dispatcher process.
+ if not self._killing:
+ if reason is None:
+ reason = "killing job for unknown reason"
+ self._kill_reason = reason
+ self.logger.info(reason)
+ self._killing = True
+ if self._signals:
+ signame = self._signals.pop(0)
+ else:
+ self.logger.warning("self._signals is empty!")
+ signame = 'SIGKILL'
+ self.logger.info(
+ 'attempting to kill job with signal %s' % signame)
+ try:
+ # NOTE(review): self._protocol only exists once _run() has
+ # spawned the process; cancel() before run() would raise
+ # AttributeError -- confirm callers never do that.
+ self._protocol.transport.signalProcess(getattr(signal, signame))
+ except ProcessExitedAlready:
+ pass
+
+ def _maybeCancel(self, cancel):
+ if cancel:
+ self.cancel("killing job by user request")
+ else:
+ # NOTE(review): logs via the root logger rather than
+ # self.logger -- presumably unintentional; verify.
+ logging.debug('not cancelling')
+
+ def _time_limit_exceeded(self):
+ self._time_limit_call = None
+ self.cancel("killing job for exceeding timeout")
+
+ def run(self):
+ # Entry point: look up the job's output directory, then spawn the
+ # dispatcher (_run). Returns a Deferred firing with the exit code.
+ d = self.source.getOutputDirForJobOnBoard(self.board_name)
+ return d.addCallback(self._run).addErrback(
+ catchall_errback(self.logger))
+
+ def _run(self, output_dir):
+ d = defer.Deferred()
+ json_data = self.job_data
+ # The dispatcher reads the job definition from a temp file path.
+ fd, self._json_file = tempfile.mkstemp()
+ with os.fdopen(fd, 'wb') as f:
+ json.dump(json_data, f)
+ self._protocol = DispatcherProcessProtocol(d, self)
+ # stdout/stderr are piped to the protocol so output volume can be
+ # policed (see DispatcherProcessProtocol.childDataReceived).
+ self.reactor.spawnProcess(
+ self._protocol, self.dispatcher, args=[
+ self.dispatcher, self._json_file, '--output-dir', output_dir],
+ childFDs={0: 0, 1: 'r', 2: 'r'}, env=None)
+ self._checkCancel_call.start(10)
+ # Enforce at least the configured minimum job timeout.
+ timeout = max(
+ json_data['timeout'], self.daemon_options['MIN_JOB_TIMEOUT'])
+ self._time_limit_call = self.reactor.callLater(
+ timeout, self._time_limit_exceeded)
+ d.addBoth(self._exited)
+ return d
+
+ def _exited(self, exit_code):
+ self.logger.info("job finished on %s", self.job_data['target'])
+ if self._json_file is not None:
+ os.unlink(self._json_file)
+ self.logger.info("reporting job completed")
+ if self._time_limit_call is not None:
+ self._time_limit_call.cancel()
+ self._checkCancel_call.stop()
+ # Tell the source the job is done, then pass the exit code through
+ # to whoever is waiting on run()'s Deferred.
+ return self._source_lock.run(
+ self.source.jobCompleted,
+ self.board_name,
+ exit_code,
+ self._killing).addCallback(
+ lambda r: exit_code)
+
+
+class SchedulerMonitorPP(ProcessProtocol):
+ # Minimal protocol for the schedulermonitor child process: any output
+ # it produces is unexpected (logged as a warning), and *d* fires with
+ # None when the process ends.
+
+ def __init__(self, d, board_name):
+ self.d = d
+ self.board_name = board_name
+ self.logger = logging.getLogger(__name__ + '.SchedulerMonitorPP')
+
+ def childDataReceived(self, childFD, data):
+ self.logger.warning(
+ "scheduler monitor for %s produced output: %r on fd %s",
+ self.board_name, data, childFD)
+
+ def processEnded(self, reason):
+ # Anything other than ProcessDone means the monitor crashed; log
+ # it, but still fire the Deferred so callers are not left hanging.
+ if not reason.check(ProcessDone):
+ self.logger.error(
+ "scheduler monitor for %s crashed: %s",
+ self.board_name, reason)
+ self.d.callback(None)
+
+
+class MonitorJob(object):
+ # Supervises one job by spawning "lava-server manage schedulermonitor"
+ # in its own session (via setsid), rather than running the dispatcher
+ # directly as Job does. Default job_cls for JobRunner.
+
+ def __init__(self, job_data, dispatcher, source, board_name, reactor,
+ daemon_options):
+ self.logger = logging.getLogger(__name__ + '.MonitorJob')
+ self.job_data = job_data
+ self.dispatcher = dispatcher
+ self.source = source
+ self.board_name = board_name
+ self.reactor = reactor
+ self.daemon_options = daemon_options
+ self._json_file = None
+
+ def run(self):
+ # Returns a Deferred fired (with None) when the monitor process
+ # ends -- see SchedulerMonitorPP.processEnded.
+ d = defer.Deferred()
+ json_data = self.job_data
+ # The monitor reads the job definition from a temp file path.
+ fd, self._json_file = tempfile.mkstemp()
+ with os.fdopen(fd, 'wb') as f:
+ json.dump(json_data, f)
+
+ # By default the child shares this daemon's stdin/stdout/stderr...
+ childFDs = {0: 0, 1: 1, 2: 2}
+ args = [
+ 'setsid', 'lava-server', 'manage', 'schedulermonitor',
+ self.dispatcher, str(self.board_name), self._json_file,
+ '-l', self.daemon_options['LOG_LEVEL']]
+ if self.daemon_options['LOG_FILE_PATH']:
+ args.extend(['-f', self.daemon_options['LOG_FILE_PATH']])
+ # ...but when logging to a file, childFDs=None lets Twisted create
+ # pipes so stray output is captured by SchedulerMonitorPP instead.
+ childFDs = None
+ self.logger.info('executing "%s"', ' '.join(args))
+ self.reactor.spawnProcess(
+ SchedulerMonitorPP(d, self.board_name), 'setsid',
+ childFDs=childFDs, env=None, args=args)
+ d.addBoth(self._exited)
+ return d
+
+ def _exited(self, result):
+ # Always remove the temp JSON file, on success or failure, and pass
+ # the result through unchanged.
+ if self._json_file is not None:
+ os.unlink(self._json_file)
+ return result
+
+
+class JobRunner(object):
+ job_cls = MonitorJob
+
+ def __init__(self, source, job, dispatcher, reactor, daemon_options,
+ job_cls=None):
+ self.source = source
+ self.dispatcher = dispatcher
+ self.reactor = reactor
+ self.daemon_options = daemon_options
+ self.job = job
+ if job.actual_device:
+ self.board_name = job.actual_device.hostname
+ elif job.requested_device:
+ self.board_name = job.requested_device.hostname
+ if job_cls is not None:
+ self.job_cls = job_cls
+ self.running_job = None
+ self.logger = logging.getLogger(__name__ + '.JobRunner.' + str(job.id))
+
+ def start(self):
+ self.logger.debug("processing job")
+ if self.job is None:
+ self.logger.debug("no job found for processing")
+ return
+ self.source.getJobDetails(self.job).addCallbacks(
+ self._startJob, self._ebStartJob)
+
+ def _startJob(self, job_data):
+ if job_data is None:
+ self.logger.debug("no job found")
+ return
+ self.logger.info("starting job %r", job_data)
+
+ self.running_job = self.job_cls(
+ job_data, self.dispatcher, self.source, self.board_name,
+ self.reactor, self.daemon_options)
+ d = self.running_job.run()
+ d.addCallbacks(self._cbJobFinished, self._ebJobFinished)
+
+ def _ebStartJob(self, result):
+ self.logger.error(
+ '%s: %s\n%s', result.type.__name__, result.value,
+ result.getTraceback())
+ return
+
+ def stop(self):
+ self.logger.debug("stopping")
+
+ if self.running_job is not None:
+ self.logger.debug("job running; deferring stop")
+ else:
+ self.logger.debug("stopping immediately")
+ return defer.succeed(None)
+
+ def _ebJobFinished(self, result):
+ self.logger.exception(result.value)
+
+ def _cbJobFinished(self, result):
+ self.running_job = None
=== modified file 'lava_scheduler_daemon/service.py'
@@ -1,58 +1,56 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Senthil Kumaran <senthil.kumaran@linaro.org>
+#
+# This file is part of LAVA Scheduler.
+#
+# LAVA Scheduler is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License version 3 as
+# published by the Free Software Foundation
+#
+# LAVA Scheduler is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with LAVA Scheduler. If not, see <http://www.gnu.org/licenses/>.
+
import logging
from twisted.application.service import Service
from twisted.internet import defer
from twisted.internet.task import LoopingCall
-from lava_scheduler_daemon.board import Board, catchall_errback
-
-
-class BoardSet(Service):
+from lava_scheduler_daemon.job import JobRunner, catchall_errback
+
+
+class JobQueue(Service):
def __init__(self, source, dispatcher, reactor, daemon_options):
- self.logger = logging.getLogger(__name__ + '.BoardSet')
+ self.logger = logging.getLogger(__name__ + '.JobQueue')
self.source = source
- self.boards = {}
self.dispatcher = dispatcher
self.reactor = reactor
self.daemon_options = daemon_options
- self._update_boards_call = LoopingCall(self._updateBoards)
- self._update_boards_call.clock = reactor
-
- def _updateBoards(self):
- self.logger.debug("Refreshing board list")
- return self.source.getBoardList().addCallback(
- self._cbUpdateBoards).addErrback(catchall_errback(self.logger))
-
- def _cbUpdateBoards(self, board_cfgs):
- '''board_cfgs is an array of dicts {hostname=name} '''
- new_boards = {}
- for board_cfg in board_cfgs:
- board_name = board_cfg['hostname']
-
- if board_cfg['hostname'] in self.boards:
- board = self.boards.pop(board_name)
- new_boards[board_name] = board
- else:
- self.logger.info("Adding board: %s" % board_name)
- new_boards[board_name] = Board(
- self.source, board_name, self.dispatcher, self.reactor,
- self.daemon_options)
- new_boards[board_name].start()
- for board in self.boards.values():
- self.logger.info("Removing board: %s" % board.board_name)
- board.stop()
- self.boards = new_boards
+ self._check_job_call = LoopingCall(self._checkJobs)
+ self._check_job_call.clock = reactor
+
+ def _checkJobs(self):
+ # Poll the DB (every 20 seconds, see startService) for jobs that
+ # are ready to run and hand the list to _cbCheckJobs.
+ self.logger.debug("Refreshing jobs")
+ return self.source.getJobList().addCallback(
+ self._cbCheckJobs).addErrback(catchall_errback(self.logger))
+
+ def _cbCheckJobs(self, job_list):
+ # NOTE(review): a fresh JobRunner is created for every job returned
+ # on every poll -- verify getJobList excludes already-started jobs,
+ # otherwise the same job could be spawned twice.
+ for job in job_list:
+ new_job = JobRunner(self.source, job, self.dispatcher,
+ self.reactor, self.daemon_options)
+ self.logger.info("Starting Job: %d " % job.id)
+ new_job.start()
def startService(self):
- self._update_boards_call.start(20)
+ self._check_job_call.start(20)
def stopService(self):
- self._update_boards_call.stop()
- ds = []
- dead_boards = []
- for board in self.boards.itervalues():
- ds.append(board.stop().addCallback(dead_boards.append))
- self.logger.info(
- "waiting for %s boards", len(self.boards) - len(dead_boards))
- return defer.gatherResults(ds)
+ self._check_job_call.stop()
+ return None
=== modified file 'lava_scheduler_daemon/tests/test_board.py'
@@ -38,7 +38,7 @@
class TestJob(object):
- def __init__(self, job_data, dispatcher, source, board_name, reactor, options, use_celery):
+ def __init__(self, job_data, dispatcher, source, board_name, reactor, options):
self.json_data = job_data
self.dispatcher = dispatcher
self.reactor = reactor