diff mbox

[API-NEXT,PATCHv2,09/11] linux-generic: queue: implement ordered queues

Message ID 1440461669-18578-10-git-send-email-bill.fischofer@linaro.org
State New
Headers show

Commit Message

Bill Fischofer Aug. 25, 2015, 12:14 a.m. UTC
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 .../linux-generic/include/odp_buffer_internal.h    |  12 +-
 .../linux-generic/include/odp_packet_io_queue.h    |   5 +-
 .../linux-generic/include/odp_queue_internal.h     | 177 +++++++-
 .../linux-generic/include/odp_schedule_internal.h  |   4 +-
 platform/linux-generic/odp_classification.c        |   2 +-
 platform/linux-generic/odp_packet_io.c             |  10 +-
 platform/linux-generic/odp_pool.c                  |   3 +
 platform/linux-generic/odp_queue.c                 | 471 ++++++++++++++++++++-
 platform/linux-generic/odp_schedule.c              |  73 +++-
 platform/linux-generic/pktio/loop.c                |   2 +-
 10 files changed, 715 insertions(+), 44 deletions(-)
diff mbox

Patch

diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
index ae799dd..ca4d314 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -103,16 +103,23 @@  typedef union odp_buffer_bits_t {
 
 /* forward declaration */
 struct odp_buffer_hdr_t;
+union queue_entry_u;
+typedef union queue_entry_u queue_entry_t;
 
 /* Common buffer header */
 typedef struct odp_buffer_hdr_t {
-	struct odp_buffer_hdr_t *next;       /* next buf in a list */
+	struct odp_buffer_hdr_t *next;       /* next buf in a list--keep 1st */
+	union {                              /* Multi-use secondary link */
+		struct odp_buffer_hdr_t *prev;
+		struct odp_buffer_hdr_t *link;
+	};
 	odp_buffer_bits_t        handle;     /* handle */
 	union {
 		uint32_t all;
 		struct {
 			uint32_t zeroized:1; /* Zeroize buf data on free */
 			uint32_t hdrdata:1;  /* Data is in buffer hdr */
+			uint32_t sustain:1;  /* Sustain order */
 		};
 	} flags;
 	int16_t                  allocator;  /* allocating thread id */
@@ -131,6 +138,9 @@  typedef struct odp_buffer_hdr_t {
 	uint32_t                 segcount;   /* segment count */
 	uint32_t                 segsize;    /* segment size */
 	void                    *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
+	uint64_t                 order;      /* sequence for ordered queues */
+	queue_entry_t           *origin_qe;  /* ordered queue origin */
+		queue_entry_t   *target_qe;  /* ordered queue target */
 } odp_buffer_hdr_t;
 
 /** @internal Compile time assert that the
diff --git a/platform/linux-generic/include/odp_packet_io_queue.h b/platform/linux-generic/include/odp_packet_io_queue.h
index c3b8309..12e2b9f 100644
--- a/platform/linux-generic/include/odp_packet_io_queue.h
+++ b/platform/linux-generic/include/odp_packet_io_queue.h
@@ -27,10 +27,11 @@  extern "C" {
 _ODP_STATIC_ASSERT(ODP_PKTIN_QUEUE_MAX_BURST >= QUEUE_MULTI_MAX,
 		   "ODP_PKTIN_DEQ_MULTI_MAX_ERROR");
 
-int pktin_enqueue(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
+int pktin_enqueue(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain);
 odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *queue);
 
-int pktin_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
+int pktin_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num,
+		    int sustain);
 int pktin_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
 
 
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index 61d0c43..163172c 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -23,6 +23,7 @@  extern "C" {
 #include <odp_align_internal.h>
 #include <odp/packet_io.h>
 #include <odp/align.h>
+#include <odp/hints.h>
 
 
 #define USE_TICKETLOCK
@@ -45,11 +46,11 @@  extern "C" {
 /* forward declaration */
 union queue_entry_u;
 
-typedef int (*enq_func_t)(union queue_entry_u *, odp_buffer_hdr_t *);
+typedef int (*enq_func_t)(union queue_entry_u *, odp_buffer_hdr_t *, int);
 typedef	odp_buffer_hdr_t *(*deq_func_t)(union queue_entry_u *);
 
 typedef int (*enq_multi_func_t)(union queue_entry_u *,
-				odp_buffer_hdr_t **, int);
+				odp_buffer_hdr_t **, int, int);
 typedef	int (*deq_multi_func_t)(union queue_entry_u *,
 				odp_buffer_hdr_t **, int);
 
@@ -77,6 +78,10 @@  struct queue_entry_s {
 	odp_pktio_t       pktin;
 	odp_pktio_t       pktout;
 	char              name[ODP_QUEUE_NAME_LEN];
+	uint64_t          order_in;
+	uint64_t          order_out;
+	odp_buffer_hdr_t *reorder_head;
+	odp_buffer_hdr_t *reorder_tail;
 };
 
 typedef union queue_entry_u {
@@ -87,12 +92,20 @@  typedef union queue_entry_u {
 
 queue_entry_t *get_qentry(uint32_t queue_id);
 
-int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
+int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain);
 odp_buffer_hdr_t *queue_deq(queue_entry_t *queue);
 
-int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
+int queue_enq_internal(odp_buffer_hdr_t *buf_hdr);
+
+int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num,
+		    int sustain);
 int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
 
+int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr,
+		     int sustain);
+int queue_pktout_enq_multi(queue_entry_t *queue,
+			   odp_buffer_hdr_t *buf_hdr[], int num, int sustain);
+
 int queue_enq_dummy(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
 int queue_enq_multi_dummy(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
 			  int num);
@@ -104,6 +117,12 @@  void queue_unlock(queue_entry_t *queue);
 
 int queue_sched_atomic(odp_queue_t handle);
 
+int release_order(queue_entry_t *origin_qe, uint64_t order,
+		  odp_pool_t pool, int enq_called);
+void get_sched_order(queue_entry_t **origin_qe, uint64_t *order);
+void sched_enq_called(void);
+void sched_order_resolved(odp_buffer_hdr_t *buf_hdr);
+
 static inline uint32_t queue_to_id(odp_queue_t handle)
 {
 	return _odp_typeval(handle) - 1;
@@ -127,6 +146,11 @@  static inline int queue_is_atomic(queue_entry_t *qe)
 	return qe->s.param.sched.sync == ODP_SCHED_SYNC_ATOMIC;
 }
 
+static inline int queue_is_ordered(queue_entry_t *qe)
+{
+	return qe->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED;
+}
+
 static inline odp_queue_t queue_handle(queue_entry_t *qe)
 {
 	return qe->s.handle;
@@ -137,6 +161,151 @@  static inline int queue_prio(queue_entry_t *qe)
 	return qe->s.param.sched.prio;
 }
 
+static inline void reorder_enq(queue_entry_t *queue,
+			       uint64_t order,
+			       queue_entry_t *origin_qe,
+			       odp_buffer_hdr_t *buf_hdr,
+			       int sustain)
+{
+	odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
+	odp_buffer_hdr_t *reorder_prev =
+		(odp_buffer_hdr_t *)&origin_qe->s.reorder_head;
+
+	while (reorder_buf && order >= reorder_buf->order) {
+		reorder_prev = reorder_buf;
+		reorder_buf  = reorder_buf->next;
+	}
+
+	buf_hdr->next = reorder_buf;
+	reorder_prev->next = buf_hdr;
+
+	if (!reorder_buf)
+		origin_qe->s.reorder_tail = buf_hdr;
+
+	buf_hdr->origin_qe     = origin_qe;
+	buf_hdr->target_qe     = queue;
+	buf_hdr->order         = order;
+	buf_hdr->flags.sustain = sustain;
+}
+
+static inline void order_release(queue_entry_t *origin_qe, int count)
+{
+	origin_qe->s.order_out += count;
+	odp_atomic_fetch_add_u64(&origin_qe->s.sync_out, count);
+}
+
+static inline int reorder_deq(queue_entry_t *queue,
+			      queue_entry_t *origin_qe,
+			      odp_buffer_hdr_t **reorder_buf_return,
+			      odp_buffer_hdr_t **reorder_prev_return,
+			      odp_buffer_hdr_t **placeholder_buf_return,
+			      int *release_count_return,
+			      int *placeholder_count_return)
+{
+	odp_buffer_hdr_t *reorder_buf     = origin_qe->s.reorder_head;
+	odp_buffer_hdr_t *reorder_prev    = NULL;
+	odp_buffer_hdr_t *placeholder_buf = NULL;
+	odp_buffer_hdr_t *next_buf;
+	int               deq_count = 0;
+	int               release_count = 0;
+	int               placeholder_count = 0;
+
+	while (reorder_buf &&
+	       reorder_buf->order <= origin_qe->s.order_out +
+	       release_count + placeholder_count) {
+		/*
+		 * Elements on the reorder list fall into one of
+		 * three categories:
+		 *
+		 * 1. Those destined for the same queue.  These
+		 *    can be enq'd now if they were waiting to
+		 *    be unblocked by this enq.
+		 *
+		 * 2. Those representing placeholders for events
+		 *    whose ordering was released by a prior
+		 *    odp_schedule_release_ordered() call.  These
+		 *    can now just be freed.
+		 *
+		 * 3. Those representing events destined for another
+		 *    queue. These cannot be consolidated with this
+		 *    enq since they have a different target.
+		 *
+		 * Detecting an element with an order sequence gap, an
+		 * element in category 3, or running out of elements
+		 * stops the scan.
+		 */
+		next_buf = reorder_buf->next;
+
+		if (odp_likely(reorder_buf->target_qe == queue)) {
+			/* promote any chain */
+			odp_buffer_hdr_t *reorder_link =
+				reorder_buf->link;
+
+			if (reorder_link) {
+				reorder_buf->next = reorder_link;
+				reorder_buf->link = NULL;
+				while (reorder_link->next)
+					reorder_link = reorder_link->next;
+				reorder_link->next = next_buf;
+				reorder_prev = reorder_link;
+			} else {
+				reorder_prev = reorder_buf;
+			}
+
+			deq_count++;
+			if (!reorder_buf->flags.sustain)
+				release_count++;
+			reorder_buf = next_buf;
+		} else if (!reorder_buf->target_qe) {
+			if (reorder_prev)
+				reorder_prev->next = next_buf;
+			else
+				origin_qe->s.reorder_head = next_buf;
+
+			reorder_buf->next = placeholder_buf;
+			placeholder_buf = reorder_buf;
+
+			reorder_buf = next_buf;
+			placeholder_count++;
+		} else {
+			break;
+		}
+	}
+
+	*reorder_buf_return = reorder_buf;
+	*reorder_prev_return = reorder_prev;
+	*placeholder_buf_return = placeholder_buf;
+	*release_count_return = release_count;
+	*placeholder_count_return = placeholder_count;
+
+	return deq_count;
+}
+
+static inline int reorder_complete(odp_buffer_hdr_t *reorder_buf)
+{
+	odp_buffer_hdr_t *next_buf = reorder_buf->next;
+	uint64_t order = reorder_buf->order;
+
+	while (reorder_buf->flags.sustain &&
+	       next_buf && next_buf->order == order) {
+		reorder_buf = next_buf;
+		next_buf = reorder_buf->next;
+	}
+
+	return !reorder_buf->flags.sustain;
+}
+
+static inline void get_queue_order(queue_entry_t **origin_qe, uint64_t *order,
+				   odp_buffer_hdr_t *buf_hdr)
+{
+	if (buf_hdr && buf_hdr->origin_qe) {
+		*origin_qe = buf_hdr->origin_qe;
+		*order     = buf_hdr->order;
+	} else {
+		get_sched_order(origin_qe, order);
+	}
+}
+
 void queue_destroy_finalize(queue_entry_t *qe);
 
 #ifdef __cplusplus
diff --git a/platform/linux-generic/include/odp_schedule_internal.h b/platform/linux-generic/include/odp_schedule_internal.h
index 4c6577d..6f9cbdc 100644
--- a/platform/linux-generic/include/odp_schedule_internal.h
+++ b/platform/linux-generic/include/odp_schedule_internal.h
@@ -15,6 +15,7 @@  extern "C" {
 
 
 #include <odp/buffer.h>
+#include <odp_buffer_internal.h>
 #include <odp/queue.h>
 #include <odp/packet_io.h>
 #include <odp_queue_internal.h>
@@ -28,9 +29,8 @@  static inline int schedule_queue(const queue_entry_t *qe)
 	return odp_queue_enq(qe->s.pri_queue, qe->s.cmd_ev);
 }
 
-
 int schedule_pktio_start(odp_pktio_t pktio, int prio);
-
+void odp_schedule_release_context(void);
 
 #ifdef __cplusplus
 }
diff --git a/platform/linux-generic/odp_classification.c b/platform/linux-generic/odp_classification.c
index fdb544d..6c1aff4 100644
--- a/platform/linux-generic/odp_classification.c
+++ b/platform/linux-generic/odp_classification.c
@@ -810,7 +810,7 @@  int packet_classifier(odp_pktio_t pktio, odp_packet_t pkt)
 
 	/* Enqueuing the Packet based on the CoS */
 	queue = cos->s.queue;
-	return queue_enq(queue, odp_buf_to_hdr((odp_buffer_t)pkt));
+	return queue_enq(queue, odp_buf_to_hdr((odp_buffer_t)pkt), 0);
 }
 
 cos_t *pktio_select_cos(pktio_entry_t *entry, uint8_t *pkt_addr,
diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c
index 135e84f..866ae38 100644
--- a/platform/linux-generic/odp_packet_io.c
+++ b/platform/linux-generic/odp_packet_io.c
@@ -482,7 +482,7 @@  int pktout_deq_multi(queue_entry_t *qentry ODP_UNUSED,
 }
 
 int pktin_enqueue(queue_entry_t *qentry ODP_UNUSED,
-		  odp_buffer_hdr_t *buf_hdr ODP_UNUSED)
+		  odp_buffer_hdr_t *buf_hdr ODP_UNUSED, int sustain ODP_UNUSED)
 {
 	ODP_ABORT("attempted enqueue to a pktin queue");
 	return -1;
@@ -515,14 +515,14 @@  odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *qentry)
 		return NULL;
 
 	if (j > 1)
-		queue_enq_multi(qentry, &tmp_hdr_tbl[1], j-1);
+		queue_enq_multi(qentry, &tmp_hdr_tbl[1], j - 1, 0);
 	buf_hdr = tmp_hdr_tbl[0];
 	return buf_hdr;
 }
 
 int pktin_enq_multi(queue_entry_t *qentry ODP_UNUSED,
 		    odp_buffer_hdr_t *buf_hdr[] ODP_UNUSED,
-		    int num ODP_UNUSED)
+		    int num ODP_UNUSED, int sustain ODP_UNUSED)
 {
 	ODP_ABORT("attempted enqueue to a pktin queue");
 	return 0;
@@ -560,7 +560,7 @@  int pktin_deq_multi(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr[], int num)
 	}
 
 	if (j)
-		queue_enq_multi(qentry, tmp_hdr_tbl, j);
+		queue_enq_multi(qentry, tmp_hdr_tbl, j, 0);
 	return nbr;
 }
 
@@ -601,7 +601,7 @@  int pktin_poll(pktio_entry_t *entry)
 	if (num_enq) {
 		queue_entry_t *qentry;
 		qentry = queue_to_qentry(entry->s.inq_default);
-		queue_enq_multi(qentry, hdr_tbl, num_enq);
+		queue_enq_multi(qentry, hdr_tbl, num_enq, 0);
 	}
 
 	return 0;
diff --git a/platform/linux-generic/odp_pool.c b/platform/linux-generic/odp_pool.c
index 14221fd..30d4b2b 100644
--- a/platform/linux-generic/odp_pool.c
+++ b/platform/linux-generic/odp_pool.c
@@ -514,6 +514,9 @@  odp_buffer_t buffer_alloc(odp_pool_t pool_hdl, size_t size)
 	/* By default, buffers inherit their pool's zeroization setting */
 	buf->buf.flags.zeroized = pool->s.flags.zeroized;
 
+	/* By default, buffers are not associated with an ordered queue */
+	buf->buf.origin_qe = NULL;
+
 	if (buf->buf.type == ODP_EVENT_PACKET)
 		packet_init(pool, &buf->pkt, size);
 
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 4a0df11..0227236 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -14,6 +14,7 @@ 
 #include <odp_buffer_inlines.h>
 #include <odp_internal.h>
 #include <odp/shared_memory.h>
+#include <odp/schedule.h>
 #include <odp_schedule_internal.h>
 #include <odp/config.h>
 #include <odp_packet_io_internal.h>
@@ -21,17 +22,20 @@ 
 #include <odp_debug_internal.h>
 #include <odp/hints.h>
 #include <odp/sync.h>
+#include <odp_spin_internal.h>
 
 #ifdef USE_TICKETLOCK
 #include <odp/ticketlock.h>
 #define LOCK(a)      odp_ticketlock_lock(a)
 #define UNLOCK(a)    odp_ticketlock_unlock(a)
 #define LOCK_INIT(a) odp_ticketlock_init(a)
+#define LOCK_TRY(a)  odp_ticketlock_trylock(a)
 #else
 #include <odp/spinlock.h>
 #define LOCK(a)      odp_spinlock_lock(a)
 #define UNLOCK(a)    odp_spinlock_unlock(a)
 #define LOCK_INIT(a) odp_spinlock_init(a)
+#define LOCK_TRY(a)  odp_spinlock_trylock(a)
 #endif
 
 #include <string.h>
@@ -73,9 +77,9 @@  static void queue_init(queue_entry_t *queue, const char *name,
 		queue->s.dequeue_multi = pktin_deq_multi;
 		break;
 	case ODP_QUEUE_TYPE_PKTOUT:
-		queue->s.enqueue = pktout_enqueue;
+		queue->s.enqueue = queue_pktout_enq;
 		queue->s.dequeue = pktout_dequeue;
-		queue->s.enqueue_multi = pktout_enq_multi;
+		queue->s.enqueue_multi = queue_pktout_enq_multi;
 		queue->s.dequeue_multi = pktout_deq_multi;
 		break;
 	default:
@@ -89,6 +93,9 @@  static void queue_init(queue_entry_t *queue, const char *name,
 	queue->s.head = NULL;
 	queue->s.tail = NULL;
 
+	queue->s.reorder_head = NULL;
+	queue->s.reorder_tail = NULL;
+
 	queue->s.pri_queue = ODP_QUEUE_INVALID;
 	queue->s.cmd_ev    = ODP_EVENT_INVALID;
 }
@@ -326,33 +333,146 @@  odp_queue_t odp_queue_lookup(const char *name)
 }
 
 
-int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
+int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
 {
 	int sched = 0;
+	queue_entry_t *origin_qe;
+	uint64_t order;
+	odp_buffer_hdr_t *buf_tail;
+
+	get_queue_order(&origin_qe, &order, buf_hdr);
+
+	/* Need two locks for enq operations from ordered queues */
+	if (origin_qe) {
+		LOCK(&origin_qe->s.lock);
+		while (!LOCK_TRY(&queue->s.lock)) {
+			UNLOCK(&origin_qe->s.lock);
+			LOCK(&origin_qe->s.lock);
+		}
+		if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+			UNLOCK(&queue->s.lock);
+			UNLOCK(&origin_qe->s.lock);
+			ODP_ERR("Bad origin queue status\n");
+			return -1;
+		}
+	} else {
+		LOCK(&queue->s.lock);
+	}
 
-	LOCK(&queue->s.lock);
 	if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
 		UNLOCK(&queue->s.lock);
+		if (origin_qe)
+			UNLOCK(&origin_qe->s.lock);
 		ODP_ERR("Bad queue status\n");
 		return -1;
 	}
 
-	if (queue->s.head == NULL) {
+	/* We can only complete the enq if we're in order */
+	if (origin_qe) {
+		sched_enq_called();
+		if (order > origin_qe->s.order_out) {
+			reorder_enq(queue, order, origin_qe, buf_hdr, sustain);
+
+			/* This enq can't complete until order is restored, so
+			 * we're done here.
+			 */
+			UNLOCK(&queue->s.lock);
+			UNLOCK(&origin_qe->s.lock);
+			return 0;
+		}
+
+		/* We're in order, so account for this and proceed with enq */
+		if (!sustain) {
+			order_release(origin_qe, 1);
+			sched_order_resolved(buf_hdr);
+		}
+
+		/* if this element is linked, restore the linked chain */
+		buf_tail = buf_hdr->link;
+
+		if (buf_tail) {
+			buf_hdr->next = buf_tail;
+			buf_hdr->link = NULL;
+
+			/* find end of the chain */
+			while (buf_tail->next)
+				buf_tail = buf_tail->next;
+		} else {
+			buf_tail = buf_hdr;
+		}
+	} else {
+		buf_tail = buf_hdr;
+	}
+
+	if (!queue->s.head) {
 		/* Empty queue */
 		queue->s.head = buf_hdr;
-		queue->s.tail = buf_hdr;
-		buf_hdr->next = NULL;
+		queue->s.tail = buf_tail;
+		buf_tail->next = NULL;
 	} else {
 		queue->s.tail->next = buf_hdr;
-		queue->s.tail = buf_hdr;
-		buf_hdr->next = NULL;
+		queue->s.tail = buf_tail;
+		buf_tail->next = NULL;
 	}
 
 	if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
 		queue->s.status = QUEUE_STATUS_SCHED;
 		sched = 1; /* retval: schedule queue */
 	}
-	UNLOCK(&queue->s.lock);
+
+	/*
+	 * If we came from an ordered queue, check to see if our successful
+	 * enq has unblocked other buffers in the origin's reorder queue.
+	 */
+	if (origin_qe) {
+		odp_buffer_hdr_t *reorder_buf;
+		odp_buffer_hdr_t *next_buf;
+		odp_buffer_hdr_t *reorder_prev;
+		odp_buffer_hdr_t *placeholder_buf;
+		int               deq_count, release_count, placeholder_count;
+
+		deq_count = reorder_deq(queue, origin_qe, &reorder_buf,
+					&reorder_prev, &placeholder_buf,
+					&release_count, &placeholder_count);
+
+		/* Add released buffers to the queue as well */
+		if (deq_count > 0) {
+			queue->s.tail->next       = origin_qe->s.reorder_head;
+			queue->s.tail             = reorder_prev;
+			origin_qe->s.reorder_head = reorder_prev->next;
+			reorder_prev->next        = NULL;
+		}
+
+		/* Reflect resolved orders in the output sequence */
+		order_release(origin_qe, release_count + placeholder_count);
+
+		/* Now handle any unblocked complete buffers destined for
+		 * other queues.  Note that these must be complete because
+		 * otherwise another thread is working on it and is
+		 * responsible for resolving order when it is complete.
+		 */
+		UNLOCK(&queue->s.lock);
+
+		if (reorder_buf &&
+		    reorder_buf->order <= origin_qe->s.order_out &&
+		    reorder_complete(reorder_buf))
+			origin_qe->s.reorder_head = reorder_buf->next;
+		else
+			reorder_buf = NULL;
+		UNLOCK(&origin_qe->s.lock);
+
+		if (reorder_buf)
+			queue_enq_internal(reorder_buf);
+
+		/* Free all placeholder bufs that are now released */
+		while (placeholder_buf) {
+			next_buf = placeholder_buf->next;
+			odp_buffer_free(placeholder_buf->handle.handle);
+			placeholder_buf = next_buf;
+		}
+	} else {
+		UNLOCK(&queue->s.lock);
+	}
 
 	/* Add queue to scheduling */
 	if (sched && schedule_queue(queue))
@@ -361,18 +481,31 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 	return 0;
 }
 
-int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
+int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
+		    int num, int sustain)
 {
 	int sched = 0;
-	int i;
+	int i, rc;
 	odp_buffer_hdr_t *tail;
+	queue_entry_t *origin_qe;
+	uint64_t order;
 
+	/* Chain input buffers together */
 	for (i = 0; i < num - 1; i++)
-		buf_hdr[i]->next = buf_hdr[i+1];
+		buf_hdr[i]->next = buf_hdr[i + 1];
+
+	tail = buf_hdr[num - 1];
+	buf_hdr[num - 1]->next = NULL;
 
-	tail = buf_hdr[num-1];
-	buf_hdr[num-1]->next = NULL;
+	/* Handle ordered enqueues commonly via links */
+	get_queue_order(&origin_qe, &order, buf_hdr[0]);
+	if (origin_qe) {
+		buf_hdr[0]->link = buf_hdr[0]->next;
+		rc = queue_enq(queue, buf_hdr[0], sustain);
+		return rc == 0 ? num : rc;
+	}
 
+	/* Handle unordered enqueues */
 	LOCK(&queue->s.lock);
 	if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
 		UNLOCK(&queue->s.lock);
@@ -415,9 +548,26 @@  int odp_queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int num)
 	for (i = 0; i < num; i++)
 		buf_hdr[i] = odp_buf_to_hdr(odp_buffer_from_event(ev[i]));
 
-	return queue->s.enqueue_multi(queue, buf_hdr, num);
+	return num == 0 ? 0 : queue->s.enqueue_multi(queue, buf_hdr, num, 0);
 }
 
+int odp_queue_enq_multi_sustain(odp_queue_t handle, const odp_event_t ev[],
+				int num)
+{
+	odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX];
+	queue_entry_t *queue;
+	int i;
+
+	if (num > QUEUE_MULTI_MAX)
+		num = QUEUE_MULTI_MAX;
+
+	queue = queue_to_qentry(handle);
+
+	for (i = 0; i < num; i++)
+		buf_hdr[i] = odp_buf_to_hdr(odp_buffer_from_event(ev[i]));
+
+	return num == 0 ? 0 : queue->s.enqueue_multi(queue, buf_hdr, num, 1);
+}
 
 int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
 {
@@ -427,9 +577,31 @@  int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
 	queue   = queue_to_qentry(handle);
 	buf_hdr = odp_buf_to_hdr(odp_buffer_from_event(ev));
 
-	return queue->s.enqueue(queue, buf_hdr);
+	/* No chains via this entry */
+	buf_hdr->link = NULL;
+
+	return queue->s.enqueue(queue, buf_hdr, 0);
+}
+
+int odp_queue_enq_sustain(odp_queue_t handle, odp_event_t ev)
+{
+	odp_buffer_hdr_t *buf_hdr;
+	queue_entry_t *queue;
+
+	queue   = queue_to_qentry(handle);
+	buf_hdr = odp_buf_to_hdr(odp_buffer_from_event(ev));
+
+	/* No chains via this entry */
+	buf_hdr->link = NULL;
+
+	return queue->s.enqueue(queue, buf_hdr, 1);
 }
 
+int queue_enq_internal(odp_buffer_hdr_t *buf_hdr)
+{
+	return buf_hdr->origin_qe->s.enqueue(buf_hdr->target_qe, buf_hdr,
+					     buf_hdr->flags.sustain);
+}
 
 odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
 {
@@ -450,6 +622,18 @@  odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
 	queue->s.head = buf_hdr->next;
 	buf_hdr->next = NULL;
 
+	/* Note that order should really be assigned on enq to an
+	 * ordered queue rather than deq, however the logic is simpler
+	 * to do it here and has the same effect.
+	 */
+	if (queue_is_ordered(queue)) {
+		buf_hdr->origin_qe = queue;
+		buf_hdr->order = queue->s.order_in++;
+		buf_hdr->flags.sustain = 0;
+	} else {
+		buf_hdr->origin_qe = NULL;
+	}
+
 	if (queue->s.head == NULL) {
 		/* Queue is now empty */
 		queue->s.tail = NULL;
@@ -489,6 +673,13 @@  int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
 		buf_hdr[i]       = hdr;
 		hdr              = hdr->next;
 		buf_hdr[i]->next = NULL;
+		if (queue_is_ordered(queue)) {
+			buf_hdr[i]->origin_qe = queue;
+			buf_hdr[i]->order     = queue->s.order_in++;
+			buf_hdr[i]->flags.sustain = 0;
+		} else {
+			buf_hdr[i]->origin_qe = NULL;
+		}
 	}
 
 	queue->s.head = hdr;
@@ -537,6 +728,170 @@  odp_event_t odp_queue_deq(odp_queue_t handle)
 	return ODP_EVENT_INVALID;
 }
 
+int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr,
+		     int sustain)
+{
+	queue_entry_t *origin_qe;
+	uint64_t order;
+	int rc;
+
+	/* Special processing needed only if we came from an ordered queue */
+	get_queue_order(&origin_qe, &order, buf_hdr);
+	if (!origin_qe)
+		return pktout_enqueue(queue, buf_hdr);
+
+	/* Must lock origin_qe for ordered processing */
+	LOCK(&origin_qe->s.lock);
+	if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+		UNLOCK(&origin_qe->s.lock);
+		ODP_ERR("Bad origin queue status\n");
+		return -1;
+	}
+
+	/* We can only complete the enq if we're in order */
+	sched_enq_called();
+	if (order > origin_qe->s.order_out) {
+		reorder_enq(queue, order, origin_qe, buf_hdr, sustain);
+
+		/* This enq can't complete until order is restored, so
+		 * we're done here.
+		 */
+		UNLOCK(&origin_qe->s.lock);
+		return 0;
+	}
+
+	/* Perform our enq since we're in order.
+	 * Note: Don't hold the origin_qe lock across an I/O operation!
+	 */
+	UNLOCK(&origin_qe->s.lock);
+
+	/* Handle any chained buffers (internal calls) */
+	if (buf_hdr->link) {
+		odp_buffer_hdr_t *buf_hdrs[QUEUE_MULTI_MAX];
+		odp_buffer_hdr_t *next_buf;
+		int num = 0;
+
+		next_buf = buf_hdr->link;
+		buf_hdr->link = NULL;
+
+		while (next_buf) {
+			buf_hdrs[num++] = next_buf;
+			next_buf = next_buf->next;
+		}
+
+		rc = pktout_enq_multi(queue, buf_hdrs, num);
+		if (rc < num)
+			return -1;
+	} else {
+		rc = pktout_enqueue(queue, buf_hdr);
+		if (!rc)
+			return rc;
+	}
+
+	/* Reacquire the lock following the I/O send. Note that we're still
+	 * guaranteed to be in order here since we haven't released
+	 * order yet.
+	 */
+	LOCK(&origin_qe->s.lock);
+	if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+		UNLOCK(&origin_qe->s.lock);
+		ODP_ERR("Bad origin queue status\n");
+		return -1;
+	}
+
+	/* Account for this ordered enq */
+	if (!sustain) {
+		order_release(origin_qe, 1);
+		sched_order_resolved(NULL);
+	}
+
+	/* Now check to see if our successful enq has unblocked other buffers
+	 * in the origin's reorder queue.
+	 */
+	odp_buffer_hdr_t *reorder_buf;
+	odp_buffer_hdr_t *next_buf;
+	odp_buffer_hdr_t *reorder_prev;
+	odp_buffer_hdr_t *xmit_buf;
+	odp_buffer_hdr_t *placeholder_buf;
+	int               deq_count, release_count, placeholder_count;
+
+	deq_count = reorder_deq(queue, origin_qe,
+				&reorder_buf, &reorder_prev, &placeholder_buf,
+				&release_count, &placeholder_count);
+
+	/* Send released buffers as well */
+	if (deq_count > 0) {
+		xmit_buf = origin_qe->s.reorder_head;
+		origin_qe->s.reorder_head = reorder_prev->next;
+		reorder_prev->next = NULL;
+		UNLOCK(&origin_qe->s.lock);
+
+		do {
+			next_buf = xmit_buf->next;
+			pktout_enqueue(queue, xmit_buf);
+			xmit_buf = next_buf;
+		} while (xmit_buf);
+
+		/* Reacquire the origin_qe lock to continue */
+		LOCK(&origin_qe->s.lock);
+		if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+			UNLOCK(&origin_qe->s.lock);
+			ODP_ERR("Bad origin queue status\n");
+			return -1;
+		}
+	}
+
+	/* Update the order sequence to reflect the deq'd elements */
+	order_release(origin_qe, release_count + placeholder_count);
+
+	/* Now handle sends to other queues that are ready to go */
+	if (reorder_buf &&
+	    reorder_buf->order <= origin_qe->s.order_out &&
+	    reorder_complete(reorder_buf))
+		origin_qe->s.reorder_head = reorder_buf->next;
+	else
+		reorder_buf = NULL;
+
+	/* We're fully done with the origin_qe at last */
+	UNLOCK(&origin_qe->s.lock);
+
+	/* Now send the next buffer to its target queue */
+	if (reorder_buf)
+		queue_enq_internal(reorder_buf);
+
+	/* Free all placeholder bufs that are now released */
+	while (placeholder_buf) {
+		next_buf = placeholder_buf->next;
+		odp_buffer_free(placeholder_buf->handle.handle);
+		placeholder_buf = next_buf;
+	}
+
+	return 0;
+}
+
+int queue_pktout_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
+			   int num, int sustain)
+{
+	int i, rc;
+	queue_entry_t *origin_qe;
+	uint64_t order;
+
+	/* If we're not ordered, handle directly */
+	get_queue_order(&origin_qe, &order, buf_hdr[0]);
+	if (!origin_qe)
+		return pktout_enq_multi(queue, buf_hdr, num);
+
+	/* Chain input buffers together */
+	for (i = 0; i < num - 1; i++)
+		buf_hdr[i]->next = buf_hdr[i + 1];
+
+	buf_hdr[num - 1]->next = NULL;
+
+	/* Handle commonly via links */
+	buf_hdr[0]->link = buf_hdr[0]->next;
+	rc = queue_pktout_enq(queue, buf_hdr[0], sustain);
+	return rc == 0 ? num : rc;
+}
 
 void queue_lock(queue_entry_t *queue)
 {
@@ -553,3 +908,85 @@  void odp_queue_param_init(odp_queue_param_t *params)
 {
 	memset(params, 0, sizeof(odp_queue_param_t));
 }
+
+/* These routines exists here rather than in odp_schedule
+ * because they operate on queue interenal structures
+ */
+int release_order(queue_entry_t *origin_qe, uint64_t order,
+		  odp_pool_t pool, int enq_called)
+{
+	odp_buffer_t placeholder_buf;
+	odp_buffer_hdr_t *placeholder_buf_hdr, *reorder_buf, *next_buf;
+
+	/* Must tlock the origin queue to process the release */
+	LOCK(&origin_qe->s.lock);
+
+	/* If we are in the order we can release immediately since there can
+	 * be no confusion about intermediate elements
+	 */
+	if (order <= origin_qe->s.order_out) {
+		order_release(origin_qe, 1);
+
+		/* Check if this release allows us to unblock waiters.
+		 * Note that we can only process complete waiters since
+		 * if the sustain bit is set for a buffer this means that
+		 * some other thread is working on it and will be
+		 * responsible for resolving order when it is complete.
+		 */
+		reorder_buf = origin_qe->s.reorder_head;
+
+		if (reorder_buf &&
+		    reorder_buf->order <= origin_qe->s.order_out &&
+		    reorder_complete(reorder_buf))
+			origin_qe->s.reorder_head = reorder_buf->next;
+		else
+			reorder_buf = NULL;
+
+		UNLOCK(&origin_qe->s.lock);
+		if (reorder_buf)
+			queue_enq_internal(reorder_buf);
+		return 0;
+	}
+
+	/* If we are not in order we need a placeholder to represent our
+	 * "place in line" unless we have issued enqs, in which case we
+	 * already have a place in the reorder queue. If we need a
+	 * placeholder, use an element from the same pool we were scheduled
+	 * with is from, otherwise just ensure that the final element for our
+	 * order is not marked sustain.
+	 */
+	if (enq_called) {
+		reorder_buf = NULL;
+		next_buf    = origin_qe->s.reorder_head;
+
+		while (next_buf && next_buf->order <= order) {
+			reorder_buf = next_buf;
+			next_buf = next_buf->next;
+		}
+
+		if (reorder_buf && reorder_buf->order == order) {
+			reorder_buf->flags.sustain = 0;
+			return 0;
+		}
+	}
+
+	placeholder_buf = odp_buffer_alloc(pool);
+
+	/* Can't release if no placeholder is available */
+	if (odp_unlikely(placeholder_buf == ODP_BUFFER_INVALID)) {
+		UNLOCK(&origin_qe->s.lock);
+		return -1;
+	}
+
+	placeholder_buf_hdr = odp_buf_to_hdr(placeholder_buf);
+
+	/* Copy info to placeholder and add it to the reorder queue */
+	placeholder_buf_hdr->origin_qe     = origin_qe;
+	placeholder_buf_hdr->order         = order;
+	placeholder_buf_hdr->flags.sustain = 0;
+
+	reorder_enq(NULL, order, origin_qe, placeholder_buf_hdr, 0);
+
+	UNLOCK(&origin_qe->s.lock);
+	return 0;
+}
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index 65b7b3c..3ad5f39 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -82,6 +82,10 @@  typedef struct {
 
 	odp_buffer_hdr_t *buf_hdr[MAX_DEQ];
 	queue_entry_t *qe;
+	queue_entry_t *origin_qe;
+	uint64_t order;
+	odp_pool_t pool;
+	int enq_called;
 	int num;
 	int index;
 	int pause;
@@ -99,16 +103,10 @@  odp_thrmask_t *thread_sched_grp_mask(int index);
 
 static void sched_local_init(void)
 {
-	int i;
-
 	memset(&sched_local, 0, sizeof(sched_local_t));
 
 	sched_local.pri_queue = ODP_QUEUE_INVALID;
 	sched_local.cmd_ev    = ODP_EVENT_INVALID;
-	sched_local.qe        = NULL;
-
-	for (i = 0; i < MAX_DEQ; i++)
-		sched_local.buf_hdr[i] = NULL;
 }
 
 int odp_schedule_init_global(void)
@@ -260,7 +258,7 @@  int odp_schedule_term_local(void)
 		return -1;
 	}
 
-	odp_schedule_release_atomic();
+	odp_schedule_release_context();
 
 	sched_local_init();
 	return 0;
@@ -392,6 +390,27 @@  void odp_schedule_release_atomic(void)
 	}
 }
 
+void odp_schedule_release_ordered(void)
+{
+	if (sched_local.origin_qe) {
+		int rc = release_order(sched_local.origin_qe,
+				       sched_local.order,
+				       sched_local.pool,
+				       sched_local.enq_called);
+		if (rc == 0)
+			sched_local.origin_qe = NULL;
+	}
+}
+
+void odp_schedule_release_context(void)
+{
+	if (sched_local.origin_qe) {
+		release_order(sched_local.origin_qe, sched_local.order,
+			      sched_local.pool, sched_local.enq_called);
+		sched_local.origin_qe = NULL;
+	} else
+		odp_schedule_release_atomic();
+}
 
 static inline int copy_events(odp_event_t out_ev[], unsigned int max)
 {
@@ -412,8 +431,6 @@  static inline int copy_events(odp_event_t out_ev[], unsigned int max)
 
 /*
  * Schedule queues
- *
- * TODO: SYNC_ORDERED not implemented yet
  */
 static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 		    unsigned int max_num, unsigned int max_deq)
@@ -431,7 +448,7 @@  static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 		return ret;
 	}
 
-	odp_schedule_release_atomic();
+	odp_schedule_release_context();
 
 	if (odp_unlikely(sched_local.pause))
 		return 0;
@@ -497,6 +514,13 @@  static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 					ODP_ABORT("schedule failed\n");
 				continue;
 			}
+
+			/* For ordered queues we want consecutive events to
+			 * be dispatched to separate threads, so do not cache
+			 * them locally.
+			 */
+			if (queue_is_ordered(qe))
+				max_deq = 1;
 			num = queue_deq_multi(qe, sched_local.buf_hdr, max_deq);
 
 			if (num < 0) {
@@ -515,7 +539,16 @@  static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 			sched_local.qe    = qe;
 			ret = copy_events(out_ev, max_num);
 
-			if (queue_is_atomic(qe)) {
+			if (queue_is_ordered(qe)) {
+				sched_local.origin_qe = qe;
+				sched_local.order =
+					sched_local.buf_hdr[0]->order;
+				sched_local.sync =
+					sched_local.buf_hdr[0]->sync;
+				sched_local.enq_called = 0;
+				if (odp_queue_enq(pri_q, ev))
+					ODP_ABORT("schedule failed\n");
+			} else if (queue_is_atomic(qe)) {
 				/* Hold queue during atomic access */
 				sched_local.pri_queue = pri_q;
 				sched_local.cmd_ev    = ev;
@@ -746,3 +779,21 @@  int odp_schedule_group_thrmask(odp_schedule_group_t group,
 	odp_spinlock_unlock(&sched->grp_lock);
 	return ret;
 }
+
+void sched_enq_called(void)
+{
+	sched_local.enq_called = 1;
+}
+
+void get_sched_order(queue_entry_t **origin_qe, uint64_t *order)
+{
+	*origin_qe = sched_local.origin_qe;
+	*order     = sched_local.order;
+}
+
+void sched_order_resolved(odp_buffer_hdr_t *buf_hdr)
+{
+	if (buf_hdr)
+		buf_hdr->origin_qe = NULL;
+	sched_local.origin_qe = NULL;
+}
diff --git a/platform/linux-generic/pktio/loop.c b/platform/linux-generic/pktio/loop.c
index 188a9ee..8cf4034 100644
--- a/platform/linux-generic/pktio/loop.c
+++ b/platform/linux-generic/pktio/loop.c
@@ -76,7 +76,7 @@  static int loopback_send(pktio_entry_t *pktio_entry, odp_packet_t pkt_tbl[],
 		hdr_tbl[i] = odp_buf_to_hdr(_odp_packet_to_buffer(pkt_tbl[i]));
 
 	qentry = queue_to_qentry(pktio_entry->s.pkt_loop.loopq);
-	return queue_enq_multi(qentry, hdr_tbl, len);
+	return queue_enq_multi(qentry, hdr_tbl, len, 0);
 }
 
 static int loopback_mtu_get(pktio_entry_t *pktio_entry ODP_UNUSED)