@@ -20,6 +20,7 @@
#define GLOBALS_SHM_NAME "test_globals"
#define MSG_POOL_NAME "msg_pool"
+#define QUEUE_CTX_POOL_NAME "queue_ctx_pool"
#define SHM_MSG_POOL_NAME "shm_msg_pool"
#define SHM_THR_ARGS_NAME "shm_thr_args"
@@ -59,7 +60,21 @@ typedef struct {
int enable_excl_atomic;
} thread_args_t;
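+/* Each buffer records the sequence in which it was enqueued */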
+typedef struct {
+ uint64_t sequence;
+} buf_contents;
+
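+/* Per-queue context: its own buffer handle, sequence counters and the
+ * queue's ordered lock
+ */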
+typedef struct {
+ odp_buffer_t ctx_handle;
+ uint64_t sequence;
+ uint64_t lock_sequence;
+ odp_schedule_order_lock_t order_lock;
+} queue_context;
+
odp_pool_t pool;
+odp_pool_t queue_ctx_pool;
static int exit_schedule_loop(void)
{
@@ -327,6 +340,12 @@ void scheduler_test_groups(void)
rc = odp_schedule_group_join(mygrp1, &mymask);
CU_ASSERT_FATAL(rc == 0);
+ /* Tell the scheduler we're about to request an event.
+ * Not needed, but a convenient place to test this API.
+ */
+ odp_schedule_prefetch(1);
+
+ /* Now get the event from queue_grp1 */
ev = odp_schedule(&from, ODP_SCHED_WAIT);
CU_ASSERT_FATAL(ev != ODP_EVENT_INVALID);
CU_ASSERT_FATAL(from == queue_grp1);
@@ -350,6 +369,8 @@ void scheduler_test_groups(void)
CU_ASSERT_FATAL(odp_queue_destroy(queue_grp2) == 0);
}
+ CU_ASSERT_FATAL(odp_schedule_group_destroy(mygrp1) == 0);
+ CU_ASSERT_FATAL(odp_schedule_group_destroy(mygrp2) == 0);
CU_ASSERT_FATAL(odp_pool_destroy(p) == 0);
}
@@ -358,6 +379,8 @@ static void *schedule_common_(void *arg)
thread_args_t *args = (thread_args_t *)arg;
odp_schedule_sync_t sync;
test_globals_t *globals;
+ queue_context *qctx;
+ buf_contents *bctx;
globals = args->globals;
sync = args->sync;
@@ -389,6 +412,22 @@ static void *schedule_common_(void *arg)
if (num == 0)
continue;
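+ /* The ordered lock is granted in the order events were
+ * enqueued; verify the sequence stamped on the first event
+ */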
+ if (sync == ODP_SCHED_SYNC_ORDERED) {
+ qctx = odp_queue_context(from);
+ bctx = odp_buffer_addr(
+ odp_buffer_from_event(events[0]));
+ odp_schedule_order_lock(&qctx->order_lock);
+ CU_ASSERT(bctx->sequence ==
+ qctx->lock_sequence);
+ qctx->lock_sequence += num;
+ odp_schedule_order_unlock(&qctx->order_lock);
+ }
+
for (j = 0; j < num; j++)
odp_event_free(events[j]);
} else {
@@ -397,6 +431,16 @@ static void *schedule_common_(void *arg)
if (buf == ODP_BUFFER_INVALID)
continue;
num = 1;
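+ /* Same ordered sequence check for the single-event path */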
+ if (sync == ODP_SCHED_SYNC_ORDERED) {
+ qctx = odp_queue_context(from);
+ bctx = odp_buffer_addr(buf);
+ odp_schedule_order_lock(&qctx->order_lock);
+ CU_ASSERT(bctx->sequence ==
+ qctx->lock_sequence);
+ qctx->lock_sequence += num;
+ odp_schedule_order_unlock(&qctx->order_lock);
+ }
odp_buffer_free(buf);
}
@@ -484,6 +527,14 @@ static void fill_queues(thread_args_t *args)
buf = odp_buffer_alloc(pool);
CU_ASSERT_FATAL(buf != ODP_BUFFER_INVALID);
ev = odp_buffer_to_event(buf);
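+ /* Stamp each buffer with its queue's next enqueue sequence */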
+ if (sync == ODP_SCHED_SYNC_ORDERED) {
+ queue_context *qctx =
+ odp_queue_context(queue);
+ buf_contents *bctx =
+ odp_buffer_addr(buf);
+ bctx->sequence = qctx->sequence++;
+ }
if (!(CU_ASSERT(odp_queue_enq(queue, ev) == 0)))
odp_buffer_free(buf);
else
@@ -495,6 +545,33 @@ static void fill_queues(thread_args_t *args)
globals->buf_count = buf_count;
}
+static void reset_queues(thread_args_t *args)
+{
+ int i, j;
+ int num_prio = args->num_prio;
+ int num_queues = args->num_queues;
+ char name[32];
+
+ for (i = 0; i < num_prio; i++) {
+ for (j = 0; j < num_queues; j++) {
+ odp_queue_t queue;
+ queue_context *qctx;
+
+ snprintf(name, sizeof(name),
+ "sched_%d_%d_o", i, j);
+ queue = odp_queue_lookup(name);
+ CU_ASSERT_FATAL(queue != ODP_QUEUE_INVALID);
+
+ /* The context is shared by all buffers on the queue,
+ * so reset its counters once per queue
+ */
+ qctx = odp_queue_context(queue);
+ qctx->sequence = 0;
+ qctx->lock_sequence = 0;
+ }
+ }
+}
+
static void schedule_common(odp_schedule_sync_t sync, int num_queues,
int num_prio, int enable_schd_multi)
{
@@ -519,6 +595,8 @@ static void schedule_common(odp_schedule_sync_t sync, int num_queues,
fill_queues(&args);
schedule_common_(&args);
+ if (sync == ODP_SCHED_SYNC_ORDERED)
+ reset_queues(&args);
}
static void parallel_execute(odp_schedule_sync_t sync, int num_queues,
@@ -559,6 +637,10 @@ static void parallel_execute(odp_schedule_sync_t sync, int num_queues,
/* Wait for worker threads to terminate */
odp_cunit_thread_exit(&args->cu_thr);
+
+ /* Cleanup ordered queues for next pass */
+ if (sync == ODP_SCHED_SYNC_ORDERED)
+ reset_queues(args);
}
/* 1 queue 1 thread ODP_SCHED_SYNC_NONE */
@@ -810,9 +892,24 @@ void scheduler_test_pause_resume(void)
static int create_queues(void)
{
- int i, j, prios;
+ int i, j, prios, rc;
+ odp_pool_param_t params;
+ odp_buffer_t queue_ctx_buf;
+ queue_context *qctx;
prios = odp_schedule_num_prio();
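+ /* The ctx pool holds one context buffer per ordered queue */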
+ odp_pool_param_init(&params);
+ params.buf.size = sizeof(queue_context);
+ params.buf.num = prios * QUEUES_PER_PRIO;
+ params.type = ODP_POOL_BUFFER;
+
+ queue_ctx_pool = odp_pool_create(QUEUE_CTX_POOL_NAME, &params);
+
+ if (queue_ctx_pool == ODP_POOL_INVALID) {
+ printf("Pool creation failed (queue ctx).\n");
+ return -1;
+ }
for (i = 0; i < prios; i++) {
odp_queue_param_t p;
@@ -850,6 +946,32 @@ static int create_queues(void)
printf("Schedule queue create failed.\n");
return -1;
}
+
+ queue_ctx_buf = odp_buffer_alloc(queue_ctx_pool);
+
+ if (queue_ctx_buf == ODP_BUFFER_INVALID) {
+ printf("Cannot allocate queue ctx buf\n");
+ return -1;
+ }
+
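+ /* Save the handle so destroy_queue() can free this buffer */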
+ qctx = odp_buffer_addr(queue_ctx_buf);
+ qctx->ctx_handle = queue_ctx_buf;
+ qctx->sequence = 0;
+ qctx->lock_sequence = 0;
+ rc = odp_schedule_order_lock_init(&qctx->order_lock, q);
+
+ if (rc != 0) {
+ printf("Ordered lock init failed\n");
+ return -1;
+ }
+
+ rc = odp_queue_context_set(q, qctx);
+
+ if (rc != 0) {
+ printf("Cannot set queue context\n");
+ return -1;
+ }
}
}
@@ -919,11 +1040,16 @@
static int destroy_queue(const char *name)
{
odp_queue_t q;
+ queue_context *qctx;
q = odp_queue_lookup(name);
if (q == ODP_QUEUE_INVALID)
return -1;
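+ /* Free the context buffer allocated by create_queues() */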
+ qctx = odp_queue_context(q);
+ if (qctx)
+ odp_buffer_free(qctx->ctx_handle);
return odp_queue_destroy(q);
}
@@ -952,6 +1077,9 @@ static int destroy_queues(void)
}
}
+ if (odp_pool_destroy(queue_ctx_pool) != 0)
+ return -1;
+
return 0;
}
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 test/validation/scheduler/scheduler.c | 141 +++++++++++++++++++++++++++++++++-
 1 file changed, 140 insertions(+), 1 deletion(-)