@@ -138,6 +138,11 @@ static inline int queue_is_atomic(queue_entry_t *qe)
return qe->s.param.sched.sync == ODP_SCHED_SYNC_ATOMIC;
}
+static inline int queue_is_ordered(queue_entry_t *qe)
+{
+ return qe->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED;
+}
+
static inline odp_queue_t queue_handle(queue_entry_t *qe)
{
return qe->s.handle;
@@ -514,6 +514,9 @@ odp_buffer_t buffer_alloc(odp_pool_t pool_hdl, size_t size)
/* By default, buffers inherit their pool's zeroization setting */
buf->buf.flags.zeroized = pool->s.flags.zeroized;
+ /* By default, buffers are not associated with an ordered queue */
+ buf->buf.origin_qe = NULL;
+
if (buf->buf.type == ODP_EVENT_PACKET)
packet_init(pool, &buf->pkt, size);
@@ -14,6 +14,7 @@
#include <odp_buffer_inlines.h>
#include <odp_internal.h>
#include <odp/shared_memory.h>
+#include <odp/schedule.h>
#include <odp_schedule_internal.h>
#include <odp/config.h>
#include <odp_packet_io_internal.h>
@@ -21,17 +22,20 @@
#include <odp_debug_internal.h>
#include <odp/hints.h>
#include <odp/sync.h>
+#include <odp_spin_internal.h>
#ifdef USE_TICKETLOCK
#include <odp/ticketlock.h>
#define LOCK(a) odp_ticketlock_lock(a)
#define UNLOCK(a) odp_ticketlock_unlock(a)
#define LOCK_INIT(a) odp_ticketlock_init(a)
+#define LOCK_TRY(a) odp_ticketlock_trylock(a)
#else
#include <odp/spinlock.h>
#define LOCK(a) odp_spinlock_lock(a)
#define UNLOCK(a) odp_spinlock_unlock(a)
#define LOCK_INIT(a) odp_spinlock_init(a)
+#define LOCK_TRY(a) odp_spinlock_trylock(a)
#endif
#include <string.h>
@@ -73,9 +77,9 @@ static void queue_init(queue_entry_t *queue, const char *name,
queue->s.dequeue_multi = pktin_deq_multi;
break;
case ODP_QUEUE_TYPE_PKTOUT:
- queue->s.enqueue = pktout_enqueue;
+ queue->s.enqueue = queue_pktout_enq;
queue->s.dequeue = pktout_dequeue;
- queue->s.enqueue_multi = pktout_enq_multi;
+ queue->s.enqueue_multi = queue_pktout_enq_multi;
queue->s.dequeue_multi = pktout_deq_multi;
break;
default:
@@ -89,6 +93,9 @@ static void queue_init(queue_entry_t *queue, const char *name,
queue->s.head = NULL;
queue->s.tail = NULL;
+ queue->s.reorder_head = NULL;
+ queue->s.reorder_tail = NULL;
+
queue->s.pri_queue = ODP_QUEUE_INVALID;
queue->s.cmd_ev = ODP_EVENT_INVALID;
}
@@ -329,30 +336,134 @@ odp_queue_t odp_queue_lookup(const char *name)
int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
{
int sched = 0;
+ queue_entry_t *origin_qe = buf_hdr->origin_qe;
+ odp_buffer_hdr_t *buf_tail;
+
+ /* Need two locks for enq operations from ordered queues */
+ if (origin_qe) {
+ LOCK(&origin_qe->s.lock);
+ while (!LOCK_TRY(&queue->s.lock)) {
+ UNLOCK(&origin_qe->s.lock);
+ LOCK(&origin_qe->s.lock);
+ }
+ if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+ UNLOCK(&queue->s.lock);
+ UNLOCK(&origin_qe->s.lock);
+ ODP_ERR("Bad origin queue status\n");
+ return -1;
+ }
+ } else {
+ LOCK(&queue->s.lock);
+ }
- LOCK(&queue->s.lock);
if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
UNLOCK(&queue->s.lock);
+ if (origin_qe)
+ UNLOCK(&origin_qe->s.lock);
ODP_ERR("Bad queue status\n");
return -1;
}
- if (queue->s.head == NULL) {
+ /* We can only complete the enq if we're in order */
+ if (origin_qe) {
+ if (buf_hdr->order > origin_qe->s.order_out) {
+ reorder_enq(queue, origin_qe, buf_hdr);
+
+ /* This enq can't complete until order is restored, so
+ * we're done here.
+ */
+ UNLOCK(&queue->s.lock);
+ UNLOCK(&origin_qe->s.lock);
+ return 0;
+ }
+
+ /* We're in order, so account for this and proceed with enq */
+ if (!buf_hdr->flags.sustain)
+ order_release(origin_qe, 1);
+
+ /* if this element is linked, restore the linked chain */
+ buf_tail = buf_hdr->link;
+
+ if (buf_tail) {
+ buf_hdr->next = buf_tail;
+ buf_hdr->link = NULL;
+
+ /* find end of the chain */
+ while (buf_tail->next)
+ buf_tail = buf_tail->next;
+ } else {
+ buf_tail = buf_hdr;
+ }
+ } else {
+ buf_tail = buf_hdr;
+ }
+
+ if (!queue->s.head) {
/* Empty queue */
queue->s.head = buf_hdr;
- queue->s.tail = buf_hdr;
- buf_hdr->next = NULL;
+ queue->s.tail = buf_tail;
+ buf_tail->next = NULL;
} else {
queue->s.tail->next = buf_hdr;
- queue->s.tail = buf_hdr;
- buf_hdr->next = NULL;
+ queue->s.tail = buf_tail;
+ buf_tail->next = NULL;
}
if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
queue->s.status = QUEUE_STATUS_SCHED;
sched = 1; /* retval: schedule queue */
}
- UNLOCK(&queue->s.lock);
+
+ /*
+ * If we came from an ordered queue, check to see if our successful
+ * enq has unblocked other buffers in the origin's reorder queue.
+ */
+ if (origin_qe) {
+ odp_buffer_hdr_t *reorder_buf;
+ odp_buffer_hdr_t *next_buf;
+ odp_buffer_hdr_t *reorder_prev;
+ odp_buffer_hdr_t *placeholder_buf;
+ uint32_t release_count;
+ uint32_t placeholder_count;
+
+ reorder_deq(queue, origin_qe,
+ &reorder_buf, &reorder_prev, &placeholder_buf,
+ &release_count, &placeholder_count);
+
+ /* Add released buffers to the queue as well */
+ if (release_count > 0) {
+ queue->s.tail->next = origin_qe->s.reorder_head;
+ queue->s.tail = reorder_prev;
+ origin_qe->s.reorder_head = reorder_prev->next;
+ reorder_prev->next = NULL;
+ }
+
+ /* Reflect the above two in the output sequence */
+ order_release(origin_qe, release_count + placeholder_count);
+
+ /* Now handle any unblocked buffers destined for other queues */
+ UNLOCK(&queue->s.lock);
+
+ if (reorder_buf &&
+ reorder_buf->order <= origin_qe->s.order_out)
+ origin_qe->s.reorder_head = reorder_buf->next;
+ else
+ reorder_buf = NULL;
+ UNLOCK(&origin_qe->s.lock);
+
+ if (reorder_buf)
+ odp_queue_enq(reorder_buf->target_qe->s.handle,
+ (odp_event_t)reorder_buf->handle.handle);
+
+ /* Free all placeholder bufs that are now released */
+ while (placeholder_buf) {
+ next_buf = placeholder_buf->next;
+ odp_buffer_free(placeholder_buf->handle.handle);
+ placeholder_buf = next_buf;
+ }
+ } else {
+ UNLOCK(&queue->s.lock);
+ }
/* Add queue to scheduling */
if (sched && schedule_queue(queue))
@@ -364,41 +475,83 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
{
int sched = 0;
- int i;
+ int i, rc, ret_count = 0;
+ int ordered_head[num];
+ int ordered_count = 0;
odp_buffer_hdr_t *tail;
- for (i = 0; i < num - 1; i++)
- buf_hdr[i]->next = buf_hdr[i+1];
+ /* Identify ordered chains in the input buffer list */
+ for (i = 0; i < num; i++) {
+ if (buf_hdr[i]->origin_qe)
+ ordered_head[ordered_count++] = i;
- tail = buf_hdr[num-1];
- buf_hdr[num-1]->next = NULL;
+ buf_hdr[i]->next = i < num - 1 ? buf_hdr[i + 1] : NULL;
+ }
- LOCK(&queue->s.lock);
- if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
- UNLOCK(&queue->s.lock);
- ODP_ERR("Bad queue status\n");
- return -1;
+ if (ordered_count) {
+ if (ordered_head[0] > 0) {
+ tail = buf_hdr[ordered_head[0] - 1];
+ tail->next = NULL;
+ ret_count = ordered_head[0];
+ } else {
+ tail = NULL;
+ ret_count = 0;
+ }
+ } else {
+ tail = buf_hdr[num - 1];
+ ret_count = num;
}
- /* Empty queue */
- if (queue->s.head == NULL)
- queue->s.head = buf_hdr[0];
- else
- queue->s.tail->next = buf_hdr[0];
+ /* Handle regular enq's at start of list */
+ if (tail) {
+ LOCK(&queue->s.lock);
+ if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
+ UNLOCK(&queue->s.lock);
+ ODP_ERR("Bad queue status\n");
+ return -1;
+ }
- queue->s.tail = tail;
+ /* Handle empty queue */
+ if (queue->s.head)
+ queue->s.tail->next = buf_hdr[0];
+ else
+ queue->s.head = buf_hdr[0];
- if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
- queue->s.status = QUEUE_STATUS_SCHED;
- sched = 1; /* retval: schedule queue */
+ queue->s.tail = tail;
+
+ if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
+ queue->s.status = QUEUE_STATUS_SCHED;
+ sched = 1; /* retval: schedule queue */
+ }
+ UNLOCK(&queue->s.lock);
+
+ /* Add queue to scheduling */
+ if (sched && schedule_queue(queue))
+ ODP_ABORT("schedule_queue failed\n");
}
- UNLOCK(&queue->s.lock);
- /* Add queue to scheduling */
- if (sched && schedule_queue(queue))
- ODP_ABORT("schedule_queue failed\n");
+ /* Handle ordered chains in the list */
+ for (i = 0; i < ordered_count; i++) {
+ int eol = i < ordered_count - 1 ? ordered_head[i + 1] : num;
+ int list_count = eol - i;
+
+ if (i < ordered_count - 1)
+ buf_hdr[eol - 1]->next = NULL;
- return num; /* All events enqueued */
+ buf_hdr[ordered_head[i]]->link =
+ list_count > 1 ? buf_hdr[ordered_head[i] + 1] : NULL;
+
+ rc = queue_enq(queue, buf_hdr[ordered_head[i]]);
+ if (rc < 0)
+ return ret_count;
+
+ if (rc < list_count)
+ return ret_count + rc;
+
+ ret_count += rc;
+ }
+
+ return ret_count;
}
int odp_queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int num)
@@ -427,6 +580,9 @@ int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
queue = queue_to_qentry(handle);
buf_hdr = odp_buf_to_hdr(odp_buffer_from_event(ev));
+ /* No chains via this entry */
+ buf_hdr->link = NULL;
+
return queue->s.enqueue(queue, buf_hdr);
}
@@ -450,6 +606,18 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
queue->s.head = buf_hdr->next;
buf_hdr->next = NULL;
+ /* Note that order should really be assigned on enq to an
+ * ordered queue rather than deq, however the logic is simpler
+ * to do it here and has the same effect.
+ */
+ if (queue_is_ordered(queue)) {
+ buf_hdr->origin_qe = queue;
+ buf_hdr->order = queue->s.order_in++;
+ buf_hdr->flags.sustain = 0;
+ } else {
+ buf_hdr->origin_qe = NULL;
+ }
+
if (queue->s.head == NULL) {
/* Queue is now empty */
queue->s.tail = NULL;
@@ -489,6 +657,13 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
buf_hdr[i] = hdr;
hdr = hdr->next;
buf_hdr[i]->next = NULL;
+ if (queue_is_ordered(queue)) {
+ buf_hdr[i]->origin_qe = queue;
+ buf_hdr[i]->order = queue->s.order_in++;
+ buf_hdr[i]->flags.sustain = 0;
+ } else {
+ buf_hdr[i]->origin_qe = NULL;
+ }
}
queue->s.head = hdr;
@@ -537,6 +712,191 @@ odp_event_t odp_queue_deq(odp_queue_t handle)
return ODP_EVENT_INVALID;
}
+int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
+{
+ queue_entry_t *origin_qe = buf_hdr->origin_qe;
+ int rc, sustain;
+
+ /* Special processing needed only if we came from an ordered queue */
+ if (!origin_qe)
+ return pktout_enqueue(queue, buf_hdr);
+
+ /* Must lock origin_qe for ordered processing */
+ LOCK(&origin_qe->s.lock);
+ if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+ UNLOCK(&origin_qe->s.lock);
+ ODP_ERR("Bad origin queue status\n");
+ return -1;
+ }
+
+ /* We can only complete the enq if we're in order */
+ if (buf_hdr->order > origin_qe->s.order_out) {
+ reorder_enq(queue, origin_qe, buf_hdr);
+
+ /* This enq can't complete until order is restored, so
+ * we're done here.
+ */
+ UNLOCK(&origin_qe->s.lock);
+ return 0;
+ }
+
+ /* Perform our enq since we're in order.
+ * Note: Don't hold the origin_qe lock across an I/O operation!
+ * Note that we also cache the sustain flag since the buffer may
+ * be freed by the I/O operation so we can't reference it afterwards.
+ */
+ UNLOCK(&origin_qe->s.lock);
+ sustain = buf_hdr->flags.sustain;
+
+ /* Handle any chained buffers (internal calls) */
+ if (buf_hdr->link) {
+ odp_buffer_hdr_t *buf_hdrs[QUEUE_MULTI_MAX];
+ odp_buffer_hdr_t *next_buf;
+ int num = 0;
+
+ next_buf = buf_hdr->link;
+ buf_hdr->link = NULL;
+
+ while (next_buf) {
+ buf_hdrs[num++] = next_buf;
+ next_buf = next_buf->next;
+ }
+
+ rc = pktout_enq_multi(queue, buf_hdrs, num);
+ if (rc < num)
+ return -1;
+ } else {
+ rc = pktout_enqueue(queue, buf_hdr);
+ if (!rc)
+ return rc;
+ }
+
+ /* Reacquire the lock following the I/O send. Note that we're still
+ * guaranteed to be in order here since we haven't released
+ * order yet.
+ */
+ LOCK(&origin_qe->s.lock);
+ if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+ UNLOCK(&origin_qe->s.lock);
+ ODP_ERR("Bad origin queue status\n");
+ return -1;
+ }
+
+ /* Account for this ordered enq */
+ if (!sustain)
+ order_release(origin_qe, 1);
+
+ /* Now check to see if our successful enq has unblocked other buffers
+ * in the origin's reorder queue.
+ */
+ odp_buffer_hdr_t *reorder_buf;
+ odp_buffer_hdr_t *next_buf;
+ odp_buffer_hdr_t *reorder_prev;
+ odp_buffer_hdr_t *xmit_buf;
+ odp_buffer_hdr_t *placeholder_buf;
+ uint32_t release_count;
+ uint32_t placeholder_count;
+
+ reorder_deq(queue, origin_qe,
+ &reorder_buf, &reorder_prev, &placeholder_buf,
+ &release_count, &placeholder_count);
+
+ /* Send released buffers as well */
+ if (release_count > 0) {
+ xmit_buf = origin_qe->s.reorder_head;
+ origin_qe->s.reorder_head = reorder_prev->next;
+ reorder_prev->next = NULL;
+ UNLOCK(&origin_qe->s.lock);
+
+ do {
+ next_buf = xmit_buf->next;
+ pktout_enqueue(queue, xmit_buf);
+ xmit_buf = next_buf;
+ } while (xmit_buf);
+
+ /* Reacquire the origin_qe lock to continue */
+ LOCK(&origin_qe->s.lock);
+ if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+ UNLOCK(&origin_qe->s.lock);
+ ODP_ERR("Bad origin queue status\n");
+ return -1;
+ }
+ }
+
+ /* Update the order sequence to reflect the deq'd elements */
+ order_release(origin_qe, release_count + placeholder_count);
+
+ /* Now handle sends to other queues that are ready to go */
+ if (reorder_buf && reorder_buf->order <= origin_qe->s.order_out)
+ origin_qe->s.reorder_head = reorder_buf->next;
+ else
+ reorder_buf = NULL;
+
+ /* We're fully done with the origin_qe at last */
+ UNLOCK(&origin_qe->s.lock);
+
+ /* Now send the next buffer to its target queue */
+ if (reorder_buf)
+ odp_queue_enq(reorder_buf->target_qe->s.handle,
+ (odp_event_t)reorder_buf->handle.handle);
+
+ /* Free all placeholder bufs that are now released */
+ while (placeholder_buf) {
+ next_buf = placeholder_buf->next;
+ odp_buffer_free(placeholder_buf->handle.handle);
+ placeholder_buf = next_buf;
+ }
+
+ return 0;
+}
+
+int queue_pktout_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
+ int num)
+{
+ int i, rc, ret_count = 0;
+ int ordered_head[num];
+ int ordered_count = 0;
+
+ /* Identify ordered chains in the input buffer list */
+ for (i = 0; i < num; i++) {
+ if (buf_hdr[i]->origin_qe)
+ ordered_head[ordered_count++] = i;
+
+ buf_hdr[i]->next = i < num - 1 ? buf_hdr[i + 1] : NULL;
+ }
+
+ ret_count = ordered_count ? ordered_head[0] : num;
+
+ /* Handle regular enq's at start of list */
+ if (ret_count) {
+ rc = pktout_enq_multi(queue, buf_hdr, ret_count);
+ if (rc < ret_count)
+ return rc;
+ }
+
+ /* Handle ordered chains in the list */
+ for (i = 0; i < ordered_count; i++) {
+ int eol = i < ordered_count - 1 ? ordered_head[i + 1] : num;
+ int list_count = eol - i;
+
+ if (i < ordered_count - 1)
+ buf_hdr[eol - 1]->next = NULL;
+
+ buf_hdr[ordered_head[i]]->link =
+ list_count > 1 ? buf_hdr[ordered_head[i] + 1] : NULL;
+
+ rc = queue_pktout_enq(queue, buf_hdr[ordered_head[i]]);
+ if (rc < 0)
+ return ret_count;
+
+ if (rc < list_count)
+ return ret_count + rc;
+
+ ret_count += rc;
+ }
+
+ return ret_count;
+}
void queue_lock(queue_entry_t *queue)
{
@@ -411,8 +411,6 @@ static inline int copy_events(odp_event_t out_ev[], unsigned int max)
/*
* Schedule queues
- *
- * TODO: SYNC_ORDERED not implemented yet
*/
static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
unsigned int max_num, unsigned int max_deq)
@@ -497,6 +495,12 @@ static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
continue;
}
+ /* For ordered queues we want consecutive events to
+ * be dispatched to separate threads, so do not cache
+ * them locally.
+ */
+ if (queue_is_ordered(qe))
+ max_deq = 1;
num = queue_deq_multi(qe, sched_local.buf_hdr, max_deq);
if (num < 0) {
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org> --- .../linux-generic/include/odp_queue_internal.h | 5 + platform/linux-generic/odp_pool.c | 3 + platform/linux-generic/odp_queue.c | 426 +++++++++++++++++++-- platform/linux-generic/odp_schedule.c | 8 +- 4 files changed, 407 insertions(+), 35 deletions(-)