@@ -103,16 +103,23 @@ typedef union odp_buffer_bits_t {
/* forward declaration */
struct odp_buffer_hdr_t;
+union queue_entry_u;
+typedef union queue_entry_u queue_entry_t;
/* Common buffer header */
typedef struct odp_buffer_hdr_t {
- struct odp_buffer_hdr_t *next; /* next buf in a list */
+ struct odp_buffer_hdr_t *next; /* next buf in a list--keep 1st */
+ union { /* Multi-use secondary link */
+ struct odp_buffer_hdr_t *prev;
+ struct odp_buffer_hdr_t *link;
+ };
odp_buffer_bits_t handle; /* handle */
union {
uint32_t all;
struct {
uint32_t zeroized:1; /* Zeroize buf data on free */
uint32_t hdrdata:1; /* Data is in buffer hdr */
+ uint32_t sustain:1; /* Sustain order */
};
} flags;
int16_t allocator; /* allocating thread id */
@@ -131,6 +138,9 @@ typedef struct odp_buffer_hdr_t {
uint32_t segcount; /* segment count */
uint32_t segsize; /* segment size */
void *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
+ uint64_t order; /* sequence for ordered queues */
+ queue_entry_t *origin_qe; /* ordered queue origin */
+ queue_entry_t *target_qe; /* ordered queue target */
} odp_buffer_hdr_t;
/** @internal Compile time assert that the
@@ -27,10 +27,11 @@ extern "C" {
_ODP_STATIC_ASSERT(ODP_PKTIN_QUEUE_MAX_BURST >= QUEUE_MULTI_MAX,
"ODP_PKTIN_DEQ_MULTI_MAX_ERROR");
-int pktin_enqueue(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
+int pktin_enqueue(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain);
odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *queue);
-int pktin_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
+int pktin_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num,
+ int sustain);
int pktin_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
@@ -23,6 +23,7 @@ extern "C" {
#include <odp_align_internal.h>
#include <odp/packet_io.h>
#include <odp/align.h>
+#include <odp/hints.h>
#define USE_TICKETLOCK
@@ -45,11 +46,11 @@ extern "C" {
/* forward declaration */
union queue_entry_u;
-typedef int (*enq_func_t)(union queue_entry_u *, odp_buffer_hdr_t *);
+typedef int (*enq_func_t)(union queue_entry_u *, odp_buffer_hdr_t *, int);
typedef odp_buffer_hdr_t *(*deq_func_t)(union queue_entry_u *);
typedef int (*enq_multi_func_t)(union queue_entry_u *,
- odp_buffer_hdr_t **, int);
+ odp_buffer_hdr_t **, int, int);
typedef int (*deq_multi_func_t)(union queue_entry_u *,
odp_buffer_hdr_t **, int);
@@ -77,6 +78,10 @@ struct queue_entry_s {
odp_pktio_t pktin;
odp_pktio_t pktout;
char name[ODP_QUEUE_NAME_LEN];
+ uint64_t order_in;
+ uint64_t order_out;
+ odp_buffer_hdr_t *reorder_head;
+ odp_buffer_hdr_t *reorder_tail;
};
typedef union queue_entry_u {
@@ -87,12 +92,20 @@ typedef union queue_entry_u {
queue_entry_t *get_qentry(uint32_t queue_id);
-int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
+int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain);
odp_buffer_hdr_t *queue_deq(queue_entry_t *queue);
-int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
+int queue_enq_internal(odp_buffer_hdr_t *buf_hdr);
+
+int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num,
+ int sustain);
int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
+int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr,
+ int sustain);
+int queue_pktout_enq_multi(queue_entry_t *queue,
+ odp_buffer_hdr_t *buf_hdr[], int num, int sustain);
+
int queue_enq_dummy(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
int queue_enq_multi_dummy(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
int num);
@@ -104,6 +117,12 @@ void queue_unlock(queue_entry_t *queue);
int queue_sched_atomic(odp_queue_t handle);
+int release_order(queue_entry_t *origin_qe, uint64_t order,
+ odp_pool_t pool, int enq_called);
+void get_sched_order(queue_entry_t **origin_qe, uint64_t *order);
+void sched_enq_called(void);
+void sched_order_resolved(odp_buffer_hdr_t *buf_hdr);
+
static inline uint32_t queue_to_id(odp_queue_t handle)
{
return _odp_typeval(handle) - 1;
@@ -127,6 +146,11 @@ static inline int queue_is_atomic(queue_entry_t *qe)
return qe->s.param.sched.sync == ODP_SCHED_SYNC_ATOMIC;
}
+static inline int queue_is_ordered(queue_entry_t *qe)
+{
+ return qe->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED;
+}
+
static inline odp_queue_t queue_handle(queue_entry_t *qe)
{
return qe->s.handle;
@@ -137,6 +161,151 @@ static inline int queue_prio(queue_entry_t *qe)
return qe->s.param.sched.prio;
}
+static inline void reorder_enq(queue_entry_t *queue,
+ uint64_t order,
+ queue_entry_t *origin_qe,
+ odp_buffer_hdr_t *buf_hdr,
+ int sustain)
+{
+ odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
+ odp_buffer_hdr_t *reorder_prev =
+ (odp_buffer_hdr_t *)&origin_qe->s.reorder_head;
+
+ while (reorder_buf && order >= reorder_buf->order) {
+ reorder_prev = reorder_buf;
+ reorder_buf = reorder_buf->next;
+ }
+
+ buf_hdr->next = reorder_buf;
+ reorder_prev->next = buf_hdr;
+
+ if (!reorder_buf)
+ origin_qe->s.reorder_tail = buf_hdr;
+
+ buf_hdr->origin_qe = origin_qe;
+ buf_hdr->target_qe = queue;
+ buf_hdr->order = order;
+ buf_hdr->flags.sustain = sustain;
+}
+
+static inline void order_release(queue_entry_t *origin_qe, int count)
+{
+ origin_qe->s.order_out += count;
+ odp_atomic_fetch_add_u64(&origin_qe->s.sync_out, count);
+}
+
+static inline int reorder_deq(queue_entry_t *queue,
+ queue_entry_t *origin_qe,
+ odp_buffer_hdr_t **reorder_buf_return,
+ odp_buffer_hdr_t **reorder_prev_return,
+ odp_buffer_hdr_t **placeholder_buf_return,
+ int *release_count_return,
+ int *placeholder_count_return)
+{
+ odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
+ odp_buffer_hdr_t *reorder_prev = NULL;
+ odp_buffer_hdr_t *placeholder_buf = NULL;
+ odp_buffer_hdr_t *next_buf;
+ int deq_count = 0;
+ int release_count = 0;
+ int placeholder_count = 0;
+
+ while (reorder_buf &&
+ reorder_buf->order <= origin_qe->s.order_out +
+ release_count + placeholder_count) {
+ /*
+ * Elements on the reorder list fall into one of
+ * three categories:
+ *
+ * 1. Those destined for the same queue. These
+ * can be enq'd now if they were waiting to
+ * be unblocked by this enq.
+ *
+ * 2. Those representing placeholders for events
+ * whose ordering was released by a prior
+ * odp_schedule_release_ordered() call. These
+ * can now just be freed.
+ *
+ * 3. Those representing events destined for another
+ * queue. These cannot be consolidated with this
+ * enq since they have a different target.
+ *
+ * Detecting an element with an order sequence gap, an
+ * element in category 3, or running out of elements
+ * stops the scan.
+ */
+ next_buf = reorder_buf->next;
+
+ if (odp_likely(reorder_buf->target_qe == queue)) {
+ /* promote any chain */
+ odp_buffer_hdr_t *reorder_link =
+ reorder_buf->link;
+
+ if (reorder_link) {
+ reorder_buf->next = reorder_link;
+ reorder_buf->link = NULL;
+ while (reorder_link->next)
+ reorder_link = reorder_link->next;
+ reorder_link->next = next_buf;
+ reorder_prev = reorder_link;
+ } else {
+ reorder_prev = reorder_buf;
+ }
+
+ deq_count++;
+ if (!reorder_buf->flags.sustain)
+ release_count++;
+ reorder_buf = next_buf;
+ } else if (!reorder_buf->target_qe) {
+ if (reorder_prev)
+ reorder_prev->next = next_buf;
+ else
+ origin_qe->s.reorder_head = next_buf;
+
+ reorder_buf->next = placeholder_buf;
+ placeholder_buf = reorder_buf;
+
+ reorder_buf = next_buf;
+ placeholder_count++;
+ } else {
+ break;
+ }
+ }
+
+ *reorder_buf_return = reorder_buf;
+ *reorder_prev_return = reorder_prev;
+ *placeholder_buf_return = placeholder_buf;
+ *release_count_return = release_count;
+ *placeholder_count_return = placeholder_count;
+
+ return deq_count;
+}
+
+static inline int reorder_complete(odp_buffer_hdr_t *reorder_buf)
+{
+ odp_buffer_hdr_t *next_buf = reorder_buf->next;
+ uint64_t order = reorder_buf->order;
+
+ while (reorder_buf->flags.sustain &&
+ next_buf && next_buf->order == order) {
+ reorder_buf = next_buf;
+ next_buf = reorder_buf->next;
+ }
+
+ return !reorder_buf->flags.sustain;
+}
+
+static inline void get_queue_order(queue_entry_t **origin_qe, uint64_t *order,
+ odp_buffer_hdr_t *buf_hdr)
+{
+ if (buf_hdr && buf_hdr->origin_qe) {
+ *origin_qe = buf_hdr->origin_qe;
+ *order = buf_hdr->order;
+ } else {
+ get_sched_order(origin_qe, order);
+ }
+}
+
void queue_destroy_finalize(queue_entry_t *qe);
#ifdef __cplusplus
@@ -15,6 +15,7 @@ extern "C" {
#include <odp/buffer.h>
+#include <odp_buffer_internal.h>
#include <odp/queue.h>
#include <odp/packet_io.h>
#include <odp_queue_internal.h>
@@ -28,9 +29,8 @@ static inline int schedule_queue(const queue_entry_t *qe)
return odp_queue_enq(qe->s.pri_queue, qe->s.cmd_ev);
}
-
int schedule_pktio_start(odp_pktio_t pktio, int prio);
-
+void odp_schedule_release_context(void);
#ifdef __cplusplus
}
@@ -810,7 +810,7 @@ int packet_classifier(odp_pktio_t pktio, odp_packet_t pkt)
/* Enqueuing the Packet based on the CoS */
queue = cos->s.queue;
- return queue_enq(queue, odp_buf_to_hdr((odp_buffer_t)pkt));
+ return queue_enq(queue, odp_buf_to_hdr((odp_buffer_t)pkt), 0);
}
cos_t *pktio_select_cos(pktio_entry_t *entry, uint8_t *pkt_addr,
@@ -482,7 +482,7 @@ int pktout_deq_multi(queue_entry_t *qentry ODP_UNUSED,
}
int pktin_enqueue(queue_entry_t *qentry ODP_UNUSED,
- odp_buffer_hdr_t *buf_hdr ODP_UNUSED)
+ odp_buffer_hdr_t *buf_hdr ODP_UNUSED, int sustain ODP_UNUSED)
{
ODP_ABORT("attempted enqueue to a pktin queue");
return -1;
@@ -515,14 +515,14 @@ odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *qentry)
return NULL;
if (j > 1)
- queue_enq_multi(qentry, &tmp_hdr_tbl[1], j-1);
+ queue_enq_multi(qentry, &tmp_hdr_tbl[1], j - 1, 0);
buf_hdr = tmp_hdr_tbl[0];
return buf_hdr;
}
int pktin_enq_multi(queue_entry_t *qentry ODP_UNUSED,
odp_buffer_hdr_t *buf_hdr[] ODP_UNUSED,
- int num ODP_UNUSED)
+ int num ODP_UNUSED, int sustain ODP_UNUSED)
{
ODP_ABORT("attempted enqueue to a pktin queue");
return 0;
@@ -560,7 +560,7 @@ int pktin_deq_multi(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr[], int num)
}
if (j)
- queue_enq_multi(qentry, tmp_hdr_tbl, j);
+ queue_enq_multi(qentry, tmp_hdr_tbl, j, 0);
return nbr;
}
@@ -601,7 +601,7 @@ int pktin_poll(pktio_entry_t *entry)
if (num_enq) {
queue_entry_t *qentry;
qentry = queue_to_qentry(entry->s.inq_default);
- queue_enq_multi(qentry, hdr_tbl, num_enq);
+ queue_enq_multi(qentry, hdr_tbl, num_enq, 0);
}
return 0;
@@ -514,6 +514,9 @@ odp_buffer_t buffer_alloc(odp_pool_t pool_hdl, size_t size)
/* By default, buffers inherit their pool's zeroization setting */
buf->buf.flags.zeroized = pool->s.flags.zeroized;
+ /* By default, buffers are not associated with an ordered queue */
+ buf->buf.origin_qe = NULL;
+
if (buf->buf.type == ODP_EVENT_PACKET)
packet_init(pool, &buf->pkt, size);
@@ -14,6 +14,7 @@
#include <odp_buffer_inlines.h>
#include <odp_internal.h>
#include <odp/shared_memory.h>
+#include <odp/schedule.h>
#include <odp_schedule_internal.h>
#include <odp/config.h>
#include <odp_packet_io_internal.h>
@@ -21,17 +22,20 @@
#include <odp_debug_internal.h>
#include <odp/hints.h>
#include <odp/sync.h>
+#include <odp_spin_internal.h>
#ifdef USE_TICKETLOCK
#include <odp/ticketlock.h>
#define LOCK(a) odp_ticketlock_lock(a)
#define UNLOCK(a) odp_ticketlock_unlock(a)
#define LOCK_INIT(a) odp_ticketlock_init(a)
+#define LOCK_TRY(a) odp_ticketlock_trylock(a)
#else
#include <odp/spinlock.h>
#define LOCK(a) odp_spinlock_lock(a)
#define UNLOCK(a) odp_spinlock_unlock(a)
#define LOCK_INIT(a) odp_spinlock_init(a)
+#define LOCK_TRY(a) odp_spinlock_trylock(a)
#endif
#include <string.h>
@@ -73,9 +77,9 @@ static void queue_init(queue_entry_t *queue, const char *name,
queue->s.dequeue_multi = pktin_deq_multi;
break;
case ODP_QUEUE_TYPE_PKTOUT:
- queue->s.enqueue = pktout_enqueue;
+ queue->s.enqueue = queue_pktout_enq;
queue->s.dequeue = pktout_dequeue;
- queue->s.enqueue_multi = pktout_enq_multi;
+ queue->s.enqueue_multi = queue_pktout_enq_multi;
queue->s.dequeue_multi = pktout_deq_multi;
break;
default:
@@ -89,6 +93,9 @@ static void queue_init(queue_entry_t *queue, const char *name,
queue->s.head = NULL;
queue->s.tail = NULL;
+ queue->s.reorder_head = NULL;
+ queue->s.reorder_tail = NULL;
+
queue->s.pri_queue = ODP_QUEUE_INVALID;
queue->s.cmd_ev = ODP_EVENT_INVALID;
}
@@ -326,33 +333,146 @@ odp_queue_t odp_queue_lookup(const char *name)
}
-int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
+int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
{
int sched = 0;
+ queue_entry_t *origin_qe;
+ uint64_t order;
+ odp_buffer_hdr_t *buf_tail;
+
+ get_queue_order(&origin_qe, &order, buf_hdr);
+
+ /* Need two locks for enq operations from ordered queues */
+ if (origin_qe) {
+ LOCK(&origin_qe->s.lock);
+ while (!LOCK_TRY(&queue->s.lock)) {
+ UNLOCK(&origin_qe->s.lock);
+ LOCK(&origin_qe->s.lock);
+ }
+ if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+ UNLOCK(&queue->s.lock);
+ UNLOCK(&origin_qe->s.lock);
+ ODP_ERR("Bad origin queue status\n");
+ return -1;
+ }
+ } else {
+ LOCK(&queue->s.lock);
+ }
- LOCK(&queue->s.lock);
if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
UNLOCK(&queue->s.lock);
+ if (origin_qe)
+ UNLOCK(&origin_qe->s.lock);
ODP_ERR("Bad queue status\n");
return -1;
}
- if (queue->s.head == NULL) {
+ /* We can only complete the enq if we're in order */
+ if (origin_qe) {
+ sched_enq_called();
+ if (order > origin_qe->s.order_out) {
+ reorder_enq(queue, order, origin_qe, buf_hdr, sustain);
+
+ /* This enq can't complete until order is restored, so
+ * we're done here.
+ */
+ UNLOCK(&queue->s.lock);
+ UNLOCK(&origin_qe->s.lock);
+ return 0;
+ }
+
+ /* We're in order, so account for this and proceed with enq */
+ if (!sustain) {
+ order_release(origin_qe, 1);
+ sched_order_resolved(buf_hdr);
+ }
+
+ /* if this element is linked, restore the linked chain */
+ buf_tail = buf_hdr->link;
+
+ if (buf_tail) {
+ buf_hdr->next = buf_tail;
+ buf_hdr->link = NULL;
+
+ /* find end of the chain */
+ while (buf_tail->next)
+ buf_tail = buf_tail->next;
+ } else {
+ buf_tail = buf_hdr;
+ }
+ } else {
+ buf_tail = buf_hdr;
+ }
+
+ if (!queue->s.head) {
/* Empty queue */
queue->s.head = buf_hdr;
- queue->s.tail = buf_hdr;
- buf_hdr->next = NULL;
+ queue->s.tail = buf_tail;
+ buf_tail->next = NULL;
} else {
queue->s.tail->next = buf_hdr;
- queue->s.tail = buf_hdr;
- buf_hdr->next = NULL;
+ queue->s.tail = buf_tail;
+ buf_tail->next = NULL;
}
if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
queue->s.status = QUEUE_STATUS_SCHED;
sched = 1; /* retval: schedule queue */
}
- UNLOCK(&queue->s.lock);
+
+ /*
+ * If we came from an ordered queue, check to see if our successful
+ * enq has unblocked other buffers in the origin's reorder queue.
+ */
+ if (origin_qe) {
+ odp_buffer_hdr_t *reorder_buf;
+ odp_buffer_hdr_t *next_buf;
+ odp_buffer_hdr_t *reorder_prev;
+ odp_buffer_hdr_t *placeholder_buf;
+ int deq_count, release_count, placeholder_count;
+
+ deq_count = reorder_deq(queue, origin_qe, &reorder_buf,
+ &reorder_prev, &placeholder_buf,
+ &release_count, &placeholder_count);
+
+ /* Add released buffers to the queue as well */
+ if (deq_count > 0) {
+ queue->s.tail->next = origin_qe->s.reorder_head;
+ queue->s.tail = reorder_prev;
+ origin_qe->s.reorder_head = reorder_prev->next;
+ reorder_prev->next = NULL;
+ }
+
+ /* Reflect resolved orders in the output sequence */
+ order_release(origin_qe, release_count + placeholder_count);
+
+ /* Now handle any unblocked complete buffers destined for
+ * other queues. Note that these must be complete because
+ * otherwise another thread is working on it and is
+ * responsible for resolving order when it is complete.
+ */
+ UNLOCK(&queue->s.lock);
+
+ if (reorder_buf &&
+ reorder_buf->order <= origin_qe->s.order_out &&
+ reorder_complete(reorder_buf))
+ origin_qe->s.reorder_head = reorder_buf->next;
+ else
+ reorder_buf = NULL;
+ UNLOCK(&origin_qe->s.lock);
+
+ if (reorder_buf)
+ queue_enq_internal(reorder_buf);
+
+ /* Free all placeholder bufs that are now released */
+ while (placeholder_buf) {
+ next_buf = placeholder_buf->next;
+ odp_buffer_free(placeholder_buf->handle.handle);
+ placeholder_buf = next_buf;
+ }
+ } else {
+ UNLOCK(&queue->s.lock);
+ }
/* Add queue to scheduling */
if (sched && schedule_queue(queue))
@@ -361,18 +481,31 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
return 0;
}
-int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
+int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
+ int num, int sustain)
{
int sched = 0;
- int i;
+ int i, rc;
odp_buffer_hdr_t *tail;
+ queue_entry_t *origin_qe;
+ uint64_t order;
+ /* Chain input buffers together */
for (i = 0; i < num - 1; i++)
- buf_hdr[i]->next = buf_hdr[i+1];
+ buf_hdr[i]->next = buf_hdr[i + 1];
+
+ tail = buf_hdr[num - 1];
+ buf_hdr[num - 1]->next = NULL;
- tail = buf_hdr[num-1];
- buf_hdr[num-1]->next = NULL;
+ /* Handle ordered enqueues commonly via links */
+ get_queue_order(&origin_qe, &order, buf_hdr[0]);
+ if (origin_qe) {
+ buf_hdr[0]->link = buf_hdr[0]->next;
+ rc = queue_enq(queue, buf_hdr[0], sustain);
+ return rc == 0 ? num : rc;
+ }
+ /* Handle unordered enqueues */
LOCK(&queue->s.lock);
if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
UNLOCK(&queue->s.lock);
@@ -415,9 +548,26 @@ int odp_queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int num)
for (i = 0; i < num; i++)
buf_hdr[i] = odp_buf_to_hdr(odp_buffer_from_event(ev[i]));
- return queue->s.enqueue_multi(queue, buf_hdr, num);
+ return num == 0 ? 0 : queue->s.enqueue_multi(queue, buf_hdr, num, 0);
}
+int odp_queue_enq_multi_sustain(odp_queue_t handle, const odp_event_t ev[],
+ int num)
+{
+ odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX];
+ queue_entry_t *queue;
+ int i;
+
+ if (num > QUEUE_MULTI_MAX)
+ num = QUEUE_MULTI_MAX;
+
+ queue = queue_to_qentry(handle);
+
+ for (i = 0; i < num; i++)
+ buf_hdr[i] = odp_buf_to_hdr(odp_buffer_from_event(ev[i]));
+
+ return num == 0 ? 0 : queue->s.enqueue_multi(queue, buf_hdr, num, 1);
+}
int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
{
@@ -427,9 +577,31 @@ int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
queue = queue_to_qentry(handle);
buf_hdr = odp_buf_to_hdr(odp_buffer_from_event(ev));
- return queue->s.enqueue(queue, buf_hdr);
+ /* No chains via this entry */
+ buf_hdr->link = NULL;
+
+ return queue->s.enqueue(queue, buf_hdr, 0);
+}
+
+int odp_queue_enq_sustain(odp_queue_t handle, odp_event_t ev)
+{
+ odp_buffer_hdr_t *buf_hdr;
+ queue_entry_t *queue;
+
+ queue = queue_to_qentry(handle);
+ buf_hdr = odp_buf_to_hdr(odp_buffer_from_event(ev));
+
+ /* No chains via this entry */
+ buf_hdr->link = NULL;
+
+ return queue->s.enqueue(queue, buf_hdr, 1);
}
+int queue_enq_internal(odp_buffer_hdr_t *buf_hdr)
+{
+ return buf_hdr->origin_qe->s.enqueue(buf_hdr->target_qe, buf_hdr,
+ buf_hdr->flags.sustain);
+}
odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
{
@@ -450,6 +622,18 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
queue->s.head = buf_hdr->next;
buf_hdr->next = NULL;
+ /* Note that order should really be assigned on enq to an
+ * ordered queue rather than deq, however the logic is simpler
+ * to do it here and has the same effect.
+ */
+ if (queue_is_ordered(queue)) {
+ buf_hdr->origin_qe = queue;
+ buf_hdr->order = queue->s.order_in++;
+ buf_hdr->flags.sustain = 0;
+ } else {
+ buf_hdr->origin_qe = NULL;
+ }
+
if (queue->s.head == NULL) {
/* Queue is now empty */
queue->s.tail = NULL;
@@ -489,6 +673,13 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
buf_hdr[i] = hdr;
hdr = hdr->next;
buf_hdr[i]->next = NULL;
+ if (queue_is_ordered(queue)) {
+ buf_hdr[i]->origin_qe = queue;
+ buf_hdr[i]->order = queue->s.order_in++;
+ buf_hdr[i]->flags.sustain = 0;
+ } else {
+ buf_hdr[i]->origin_qe = NULL;
+ }
}
queue->s.head = hdr;
@@ -537,6 +728,170 @@ odp_event_t odp_queue_deq(odp_queue_t handle)
return ODP_EVENT_INVALID;
}
+int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr,
+ int sustain)
+{
+ queue_entry_t *origin_qe;
+ uint64_t order;
+ int rc;
+
+ /* Special processing needed only if we came from an ordered queue */
+ get_queue_order(&origin_qe, &order, buf_hdr);
+ if (!origin_qe)
+ return pktout_enqueue(queue, buf_hdr);
+
+ /* Must lock origin_qe for ordered processing */
+ LOCK(&origin_qe->s.lock);
+ if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+ UNLOCK(&origin_qe->s.lock);
+ ODP_ERR("Bad origin queue status\n");
+ return -1;
+ }
+
+ /* We can only complete the enq if we're in order */
+ sched_enq_called();
+ if (order > origin_qe->s.order_out) {
+ reorder_enq(queue, order, origin_qe, buf_hdr, sustain);
+
+ /* This enq can't complete until order is restored, so
+ * we're done here.
+ */
+ UNLOCK(&origin_qe->s.lock);
+ return 0;
+ }
+
+ /* Perform our enq since we're in order.
+ * Note: Don't hold the origin_qe lock across an I/O operation!
+ */
+ UNLOCK(&origin_qe->s.lock);
+
+ /* Handle any chained buffers (internal calls) */
+ if (buf_hdr->link) {
+ odp_buffer_hdr_t *buf_hdrs[QUEUE_MULTI_MAX];
+ odp_buffer_hdr_t *next_buf;
+ int num = 0;
+
+ next_buf = buf_hdr->link;
+ buf_hdr->link = NULL;
+
+ while (next_buf) {
+ buf_hdrs[num++] = next_buf;
+ next_buf = next_buf->next;
+ }
+
+ rc = pktout_enq_multi(queue, buf_hdrs, num);
+ if (rc < num)
+ return -1;
+ } else {
+ rc = pktout_enqueue(queue, buf_hdr);
+ if (!rc)
+ return rc;
+ }
+
+ /* Reacquire the lock following the I/O send. Note that we're still
+ * guaranteed to be in order here since we haven't released
+ * order yet.
+ */
+ LOCK(&origin_qe->s.lock);
+ if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+ UNLOCK(&origin_qe->s.lock);
+ ODP_ERR("Bad origin queue status\n");
+ return -1;
+ }
+
+ /* Account for this ordered enq */
+ if (!sustain) {
+ order_release(origin_qe, 1);
+ sched_order_resolved(NULL);
+ }
+
+ /* Now check to see if our successful enq has unblocked other buffers
+ * in the origin's reorder queue.
+ */
+ odp_buffer_hdr_t *reorder_buf;
+ odp_buffer_hdr_t *next_buf;
+ odp_buffer_hdr_t *reorder_prev;
+ odp_buffer_hdr_t *xmit_buf;
+ odp_buffer_hdr_t *placeholder_buf;
+ int deq_count, release_count, placeholder_count;
+
+ deq_count = reorder_deq(queue, origin_qe,
+ &reorder_buf, &reorder_prev, &placeholder_buf,
+ &release_count, &placeholder_count);
+
+ /* Send released buffers as well */
+ if (deq_count > 0) {
+ xmit_buf = origin_qe->s.reorder_head;
+ origin_qe->s.reorder_head = reorder_prev->next;
+ reorder_prev->next = NULL;
+ UNLOCK(&origin_qe->s.lock);
+
+ do {
+ next_buf = xmit_buf->next;
+ pktout_enqueue(queue, xmit_buf);
+ xmit_buf = next_buf;
+ } while (xmit_buf);
+
+ /* Reacquire the origin_qe lock to continue */
+ LOCK(&origin_qe->s.lock);
+ if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+ UNLOCK(&origin_qe->s.lock);
+ ODP_ERR("Bad origin queue status\n");
+ return -1;
+ }
+ }
+
+ /* Update the order sequence to reflect the deq'd elements */
+ order_release(origin_qe, release_count + placeholder_count);
+
+ /* Now handle sends to other queues that are ready to go */
+ if (reorder_buf &&
+ reorder_buf->order <= origin_qe->s.order_out &&
+ reorder_complete(reorder_buf))
+ origin_qe->s.reorder_head = reorder_buf->next;
+ else
+ reorder_buf = NULL;
+
+ /* We're fully done with the origin_qe at last */
+ UNLOCK(&origin_qe->s.lock);
+
+ /* Now send the next buffer to its target queue */
+ if (reorder_buf)
+ queue_enq_internal(reorder_buf);
+
+ /* Free all placeholder bufs that are now released */
+ while (placeholder_buf) {
+ next_buf = placeholder_buf->next;
+ odp_buffer_free(placeholder_buf->handle.handle);
+ placeholder_buf = next_buf;
+ }
+
+ return 0;
+}
+
+int queue_pktout_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
+ int num, int sustain)
+{
+ int i, rc;
+ queue_entry_t *origin_qe;
+ uint64_t order;
+
+ /* If we're not ordered, handle directly */
+ get_queue_order(&origin_qe, &order, buf_hdr[0]);
+ if (!origin_qe)
+ return pktout_enq_multi(queue, buf_hdr, num);
+
+ /* Chain input buffers together */
+ for (i = 0; i < num - 1; i++)
+ buf_hdr[i]->next = buf_hdr[i + 1];
+
+ buf_hdr[num - 1]->next = NULL;
+
+ /* Handle commonly via links */
+ buf_hdr[0]->link = buf_hdr[0]->next;
+ rc = queue_pktout_enq(queue, buf_hdr[0], sustain);
+ return rc == 0 ? num : rc;
+}
void queue_lock(queue_entry_t *queue)
{
@@ -553,3 +908,85 @@ void odp_queue_param_init(odp_queue_param_t *params)
{
memset(params, 0, sizeof(odp_queue_param_t));
}
+
+/* These routines exists here rather than in odp_schedule
+ * because they operate on queue interenal structures
+ */
+int release_order(queue_entry_t *origin_qe, uint64_t order,
+ odp_pool_t pool, int enq_called)
+{
+ odp_buffer_t placeholder_buf;
+ odp_buffer_hdr_t *placeholder_buf_hdr, *reorder_buf, *next_buf;
+
+ /* Must tlock the origin queue to process the release */
+ LOCK(&origin_qe->s.lock);
+
+ /* If we are in the order we can release immediately since there can
+ * be no confusion about intermediate elements
+ */
+ if (order <= origin_qe->s.order_out) {
+ order_release(origin_qe, 1);
+
+ /* Check if this release allows us to unblock waiters.
+ * Note that we can only process complete waiters since
+ * if the sustain bit is set for a buffer this means that
+ * some other thread is working on it and will be
+ * responsible for resolving order when it is complete.
+ */
+ reorder_buf = origin_qe->s.reorder_head;
+
+ if (reorder_buf &&
+ reorder_buf->order <= origin_qe->s.order_out &&
+ reorder_complete(reorder_buf))
+ origin_qe->s.reorder_head = reorder_buf->next;
+ else
+ reorder_buf = NULL;
+
+ UNLOCK(&origin_qe->s.lock);
+ if (reorder_buf)
+ queue_enq_internal(reorder_buf);
+ return 0;
+ }
+
+ /* If we are not in order we need a placeholder to represent our
+ * "place in line" unless we have issued enqs, in which case we
+ * already have a place in the reorder queue. If we need a
+ * placeholder, use an element from the same pool we were scheduled
+ * with is from, otherwise just ensure that the final element for our
+ * order is not marked sustain.
+ */
+ if (enq_called) {
+ reorder_buf = NULL;
+ next_buf = origin_qe->s.reorder_head;
+
+ while (next_buf && next_buf->order <= order) {
+ reorder_buf = next_buf;
+ next_buf = next_buf->next;
+ }
+
+ if (reorder_buf && reorder_buf->order == order) {
+ reorder_buf->flags.sustain = 0;
+ return 0;
+ }
+ }
+
+ placeholder_buf = odp_buffer_alloc(pool);
+
+ /* Can't release if no placeholder is available */
+ if (odp_unlikely(placeholder_buf == ODP_BUFFER_INVALID)) {
+ UNLOCK(&origin_qe->s.lock);
+ return -1;
+ }
+
+ placeholder_buf_hdr = odp_buf_to_hdr(placeholder_buf);
+
+ /* Copy info to placeholder and add it to the reorder queue */
+ placeholder_buf_hdr->origin_qe = origin_qe;
+ placeholder_buf_hdr->order = order;
+ placeholder_buf_hdr->flags.sustain = 0;
+
+ reorder_enq(NULL, order, origin_qe, placeholder_buf_hdr, 0);
+
+ UNLOCK(&origin_qe->s.lock);
+ return 0;
+}
@@ -82,6 +82,10 @@ typedef struct {
odp_buffer_hdr_t *buf_hdr[MAX_DEQ];
queue_entry_t *qe;
+ queue_entry_t *origin_qe;
+ uint64_t order;
+ odp_pool_t pool;
+ int enq_called;
int num;
int index;
int pause;
@@ -99,16 +103,10 @@ odp_thrmask_t *thread_sched_grp_mask(int index);
static void sched_local_init(void)
{
- int i;
-
memset(&sched_local, 0, sizeof(sched_local_t));
sched_local.pri_queue = ODP_QUEUE_INVALID;
sched_local.cmd_ev = ODP_EVENT_INVALID;
- sched_local.qe = NULL;
-
- for (i = 0; i < MAX_DEQ; i++)
- sched_local.buf_hdr[i] = NULL;
}
int odp_schedule_init_global(void)
@@ -260,7 +258,7 @@ int odp_schedule_term_local(void)
return -1;
}
- odp_schedule_release_atomic();
+ odp_schedule_release_context();
sched_local_init();
return 0;
@@ -392,6 +390,27 @@ void odp_schedule_release_atomic(void)
}
}
+void odp_schedule_release_ordered(void)
+{
+ if (sched_local.origin_qe) {
+ int rc = release_order(sched_local.origin_qe,
+ sched_local.order,
+ sched_local.pool,
+ sched_local.enq_called);
+ if (rc == 0)
+ sched_local.origin_qe = NULL;
+ }
+}
+
+void odp_schedule_release_context(void)
+{
+ if (sched_local.origin_qe) {
+ release_order(sched_local.origin_qe, sched_local.order,
+ sched_local.pool, sched_local.enq_called);
+ sched_local.origin_qe = NULL;
+ } else
+ odp_schedule_release_atomic();
+}
static inline int copy_events(odp_event_t out_ev[], unsigned int max)
{
@@ -412,8 +431,6 @@ static inline int copy_events(odp_event_t out_ev[], unsigned int max)
/*
* Schedule queues
- *
- * TODO: SYNC_ORDERED not implemented yet
*/
static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
unsigned int max_num, unsigned int max_deq)
@@ -431,7 +448,7 @@ static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
return ret;
}
- odp_schedule_release_atomic();
+ odp_schedule_release_context();
if (odp_unlikely(sched_local.pause))
return 0;
@@ -497,6 +514,13 @@ static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
ODP_ABORT("schedule failed\n");
continue;
}
+
+ /* For ordered queues we want consecutive events to
+ * be dispatched to separate threads, so do not cache
+ * them locally.
+ */
+ if (queue_is_ordered(qe))
+ max_deq = 1;
num = queue_deq_multi(qe, sched_local.buf_hdr, max_deq);
if (num < 0) {
@@ -515,7 +539,16 @@ static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
sched_local.qe = qe;
ret = copy_events(out_ev, max_num);
- if (queue_is_atomic(qe)) {
+ if (queue_is_ordered(qe)) {
+ sched_local.origin_qe = qe;
+ sched_local.order =
+ sched_local.buf_hdr[0]->order;
+ sched_local.sync =
+ sched_local.buf_hdr[0]->sync;
+ sched_local.enq_called = 0;
+ if (odp_queue_enq(pri_q, ev))
+ ODP_ABORT("schedule failed\n");
+ } else if (queue_is_atomic(qe)) {
/* Hold queue during atomic access */
sched_local.pri_queue = pri_q;
sched_local.cmd_ev = ev;
@@ -746,3 +779,21 @@ int odp_schedule_group_thrmask(odp_schedule_group_t group,
odp_spinlock_unlock(&sched->grp_lock);
return ret;
}
+
+void sched_enq_called(void)
+{
+ sched_local.enq_called = 1;
+}
+
+void get_sched_order(queue_entry_t **origin_qe, uint64_t *order)
+{
+ *origin_qe = sched_local.origin_qe;
+ *order = sched_local.order;
+}
+
+void sched_order_resolved(odp_buffer_hdr_t *buf_hdr)
+{
+ if (buf_hdr)
+ buf_hdr->origin_qe = NULL;
+ sched_local.origin_qe = NULL;
+}
@@ -76,7 +76,7 @@ static int loopback_send(pktio_entry_t *pktio_entry, odp_packet_t pkt_tbl[],
hdr_tbl[i] = odp_buf_to_hdr(_odp_packet_to_buffer(pkt_tbl[i]));
qentry = queue_to_qentry(pktio_entry->s.pkt_loop.loopq);
- return queue_enq_multi(qentry, hdr_tbl, len);
+ return queue_enq_multi(qentry, hdr_tbl, len, 0);
}
static int loopback_mtu_get(pktio_entry_t *pktio_entry ODP_UNUSED)
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org> --- .../linux-generic/include/odp_buffer_internal.h | 12 +- .../linux-generic/include/odp_packet_io_queue.h | 5 +- .../linux-generic/include/odp_queue_internal.h | 177 +++++++- .../linux-generic/include/odp_schedule_internal.h | 4 +- platform/linux-generic/odp_classification.c | 2 +- platform/linux-generic/odp_packet_io.c | 10 +- platform/linux-generic/odp_pool.c | 3 + platform/linux-generic/odp_queue.c | 471 ++++++++++++++++++++- platform/linux-generic/odp_schedule.c | 73 +++- platform/linux-generic/pktio/loop.c | 2 +- 10 files changed, 715 insertions(+), 44 deletions(-)