@@ -44,6 +44,7 @@ typedef struct {
int num_workers;
odp_barrier_t barrier;
int buf_count;
+ int buf_count_cpy;
odp_ticketlock_t lock;
odp_spinlock_t atomic_lock;
} test_globals_t;
@@ -63,10 +64,12 @@ typedef struct {
typedef struct {
uint64_t sequence;
uint64_t lock_sequence[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE];
+ uint64_t output_sequence;
} buf_contents;
typedef struct {
odp_buffer_t ctx_handle;
+ odp_queue_t pq_handle;
uint64_t sequence;
uint64_t lock_sequence[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE];
} queue_context;
@@ -384,17 +387,21 @@ static void *schedule_common_(void *arg)
odp_schedule_sync_t sync;
test_globals_t *globals;
queue_context *qctx;
- buf_contents *bctx;
+ buf_contents *bctx, *bctx_cpy;
+ odp_pool_t pool;
globals = args->globals;
sync = args->sync;
+ pool = odp_pool_lookup(MSG_POOL_NAME);
+ CU_ASSERT_FATAL(pool != ODP_POOL_INVALID);
+
if (args->num_workers > 1)
odp_barrier_wait(&globals->barrier);
while (1) {
odp_event_t ev;
- odp_buffer_t buf;
+ odp_buffer_t buf, buf_cpy;
odp_queue_t from = ODP_QUEUE_INVALID;
int num = 0;
int locked;
@@ -407,7 +414,9 @@ static void *schedule_common_(void *arg)
odp_ticketlock_unlock(&globals->lock);
if (args->enable_schd_multi) {
- odp_event_t events[BURST_BUF_SIZE];
+ odp_event_t events[BURST_BUF_SIZE],
+ ev_cpy[BURST_BUF_SIZE];
+ odp_buffer_t buf_cpy[BURST_BUF_SIZE];
int j;
num = odp_schedule_multi(&from, ODP_SCHED_NO_WAIT,
events, BURST_BUF_SIZE);
@@ -419,11 +428,33 @@ static void *schedule_common_(void *arg)
if (sync == ODP_SCHED_SYNC_ORDERED) {
uint32_t ndx;
uint32_t ndx_max = odp_queue_lock_count(from);
+ int rc;
qctx = odp_queue_context(from);
+
+ for (j = 0; j < num; j++) {
+ bctx = odp_buffer_addr(
+ odp_buffer_from_event
+ (events[j]));
+
+ buf_cpy[j] = odp_buffer_alloc(pool);
+ CU_ASSERT_FATAL(buf_cpy[j] !=
+ ODP_BUFFER_INVALID);
+ bctx_cpy = odp_buffer_addr(buf_cpy[j]);
+ memcpy(bctx_cpy, bctx,
+ sizeof(buf_contents));
+ bctx_cpy->output_sequence =
+ bctx_cpy->sequence;
+ ev_cpy[j] =
+ odp_buffer_to_event(buf_cpy[j]);
+ }
+
+ rc = odp_queue_enq_multi(qctx->pq_handle,
+ ev_cpy, num);
+ CU_ASSERT(rc == num);
+
bctx = odp_buffer_addr(
odp_buffer_from_event(events[0]));
-
for (ndx = 0; ndx < ndx_max; ndx++) {
odp_schedule_order_lock(ndx);
CU_ASSERT(bctx->sequence ==
@@ -444,9 +475,20 @@ static void *schedule_common_(void *arg)
if (sync == ODP_SCHED_SYNC_ORDERED) {
uint32_t ndx;
uint32_t ndx_max = odp_queue_lock_count(from);
+ int rc;
qctx = odp_queue_context(from);
bctx = odp_buffer_addr(buf);
+ buf_cpy = odp_buffer_alloc(pool);
+ CU_ASSERT_FATAL(buf_cpy != ODP_BUFFER_INVALID);
+ bctx_cpy = odp_buffer_addr(buf_cpy);
+ memcpy(bctx_cpy, bctx, sizeof(buf_contents));
+ bctx_cpy->output_sequence = bctx_cpy->sequence;
+
+ rc = odp_queue_enq(qctx->pq_handle,
+ odp_buffer_to_event
+ (buf_cpy));
+ CU_ASSERT(rc == 0);
for (ndx = 0; ndx < ndx_max; ndx++) {
odp_schedule_order_lock(ndx);
@@ -456,6 +498,7 @@ static void *schedule_common_(void *arg)
odp_schedule_order_unlock(ndx);
}
}
+
odp_buffer_free(buf);
}
@@ -491,6 +534,52 @@ static void *schedule_common_(void *arg)
odp_ticketlock_unlock(&globals->lock);
}
+ if (args->num_workers > 1)
+ odp_barrier_wait(&globals->barrier);
+
+ if (sync == ODP_SCHED_SYNC_ORDERED &&
+ odp_ticketlock_trylock(&globals->lock) &&
+ globals->buf_count_cpy > 0) {
+ odp_event_t ev;
+ odp_queue_t pq;
+ uint64_t seq;
+ uint64_t bcount = 0;
+ int i, j;
+ char name[32];
+ uint64_t num_bufs = args->num_bufs;
+ uint64_t buf_count = globals->buf_count_cpy;
+
+ for (i = 0; i < args->num_prio; i++) {
+ for (j = 0; j < args->num_queues; j++) {
+ snprintf(name, sizeof(name),
+ "poll_%d_%d_o", i, j);
+ pq = odp_queue_lookup(name);
+ CU_ASSERT_FATAL(pq != ODP_QUEUE_INVALID);
+
+ seq = 0;
+ while (1) {
+ ev = odp_queue_deq(pq);
+
+ if (ev == ODP_EVENT_INVALID) {
+ CU_ASSERT(seq == num_bufs);
+ break;
+ }
+
+ bctx = odp_buffer_addr(
+ odp_buffer_from_event(ev));
+
+ CU_ASSERT(bctx->sequence == seq);
+ seq++;
+ bcount++;
+ odp_event_free(ev);
+ }
+ }
+ }
+ CU_ASSERT(bcount == buf_count);
+ globals->buf_count_cpy = 0;
+ odp_ticketlock_unlock(&globals->lock);
+ }
+
return NULL;
}
@@ -559,6 +648,7 @@ static void fill_queues(thread_args_t *args)
}
globals->buf_count = buf_count;
+ globals->buf_count_cpy = buf_count;
}
static void reset_queues(thread_args_t *args)
@@ -915,13 +1005,13 @@ static int create_queues(void)
int i, j, prios, rc;
odp_pool_param_t params;
odp_buffer_t queue_ctx_buf;
- queue_context *qctx;
+ queue_context *qctx, *pqctx;
uint32_t ndx;
prios = odp_schedule_num_prio();
odp_pool_param_init(¶ms);
params.buf.size = sizeof(queue_context);
- params.buf.num = prios * QUEUES_PER_PRIO;
+ params.buf.num = prios * QUEUES_PER_PRIO * 2;
params.type = ODP_POOL_BUFFER;
queue_ctx_pool = odp_pool_create(QUEUE_CTX_POOL_NAME, ¶ms);
@@ -939,7 +1029,7 @@ static int create_queues(void)
for (j = 0; j < QUEUES_PER_PRIO; j++) {
/* Per sched sync type */
char name[32];
- odp_queue_t q;
+ odp_queue_t q, pq;
snprintf(name, sizeof(name), "sched_%d_%d_n", i, j);
p.sched.sync = ODP_SCHED_SYNC_NONE;
@@ -959,6 +1049,31 @@ static int create_queues(void)
return -1;
}
+ snprintf(name, sizeof(name), "poll_%d_%d_o", i, j);
+ pq = odp_queue_create(name, ODP_QUEUE_TYPE_POLL, NULL);
+ if (pq == ODP_QUEUE_INVALID) {
+ printf("Poll queue create failed.\n");
+ return -1;
+ }
+
+ queue_ctx_buf = odp_buffer_alloc(queue_ctx_pool);
+
+ if (queue_ctx_buf == ODP_BUFFER_INVALID) {
+ printf("Cannot allocate poll queue ctx buf\n");
+ return -1;
+ }
+
+ pqctx = odp_buffer_addr(queue_ctx_buf);
+ pqctx->ctx_handle = queue_ctx_buf;
+ pqctx->sequence = 0;
+
+ rc = odp_queue_context_set(pq, pqctx);
+
+ if (rc != 0) {
+ printf("Cannot set poll queue context\n");
+ return -1;
+ }
+
snprintf(name, sizeof(name), "sched_%d_%d_o", i, j);
p.sched.sync = ODP_SCHED_SYNC_ORDERED;
p.sched.lock_count =
@@ -988,6 +1103,7 @@ static int create_queues(void)
qctx = odp_buffer_addr(queue_ctx_buf);
qctx->ctx_handle = queue_ctx_buf;
+ qctx->pq_handle = pq;
qctx->sequence = 0;
for (ndx = 0;
@@ -1105,11 +1221,17 @@ static int destroy_queues(void)
snprintf(name, sizeof(name), "sched_%d_%d_o", i, j);
if (destroy_queue(name) != 0)
return -1;
+
+ snprintf(name, sizeof(name), "poll_%d_%d_o", i, j);
+ if (destroy_queue(name) != 0)
+ return -1;
}
}
- if (odp_pool_destroy(queue_ctx_pool) != 0)
+ if (odp_pool_destroy(queue_ctx_pool) != 0) {
+ fprintf(stderr, "error: failed to destroy queue ctx pool\n");
return -1;
+ }
return 0;
}
Extend scheduler CUnit test to add explicit tests that ordered queues perform reorder processing properly. This fixes Bug https://bugs.linaro.org/show_bug.cgi?id=1824 Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org> --- test/validation/scheduler/scheduler.c | 138 ++++++++++++++++++++++++++++++++-- 1 file changed, 130 insertions(+), 8 deletions(-)