diff mbox

[API-NEXT,PATCHv13,08/13] linux-generic: queue: modify structs for ordered queue support

Message ID 1439126687-19932-9-git-send-email-bill.fischofer@linaro.org
State New
Headers show

Commit Message

Bill Fischofer Aug. 9, 2015, 1:24 p.m. UTC
Add additional fields to internal buffer and queue structs to enable
support for ordered queues.

Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 .../linux-generic/include/odp_buffer_internal.h    |  15 ++-
 .../linux-generic/include/odp_queue_internal.h     | 121 +++++++++++++++++++++
 .../linux-generic/include/odp_schedule_internal.h  |   2 +-
 3 files changed, 136 insertions(+), 2 deletions(-)
diff mbox

Patch

diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
index ae799dd..6badeba 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -103,16 +103,23 @@  typedef union odp_buffer_bits_t {
 
 /* forward declaration */
 struct odp_buffer_hdr_t;
+union queue_entry_u;
+typedef union queue_entry_u queue_entry_t;
 
 /* Common buffer header */
 typedef struct odp_buffer_hdr_t {
-	struct odp_buffer_hdr_t *next;       /* next buf in a list */
+	struct odp_buffer_hdr_t *next;       /* next buf in a list--keep 1st */
+	union {                              /* Multi-use secondary link */
+		struct odp_buffer_hdr_t *prev;
+		struct odp_buffer_hdr_t *link;
+	};
 	odp_buffer_bits_t        handle;     /* handle */
 	union {
 		uint32_t all;
 		struct {
 			uint32_t zeroized:1; /* Zeroize buf data on free */
 			uint32_t hdrdata:1;  /* Data is in buffer hdr */
+			uint32_t sustain:1;  /* Sustain order */
 		};
 	} flags;
 	int16_t                  allocator;  /* allocating thread id */
@@ -131,6 +138,12 @@  typedef struct odp_buffer_hdr_t {
 	uint32_t                 segcount;   /* segment count */
 	uint32_t                 segsize;    /* segment size */
 	void                    *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
+	uint64_t                 order;      /* sequence for ordered queues */
+	queue_entry_t           *origin_qe;  /* ordered queue origin */
+	union {
+		queue_entry_t   *target_qe;  /* ordered queue target */
+		uint64_t         sync;       /* for ordered synchronization */
+	};
 } odp_buffer_hdr_t;
 
 /** @internal Compile time assert that the
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index 61d0c43..11bc837 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -23,6 +23,7 @@  extern "C" {
 #include <odp_align_internal.h>
 #include <odp/packet_io.h>
 #include <odp/align.h>
+#include <odp/hints.h>
 
 
 #define USE_TICKETLOCK
@@ -77,6 +78,12 @@  struct queue_entry_s {
 	odp_pktio_t       pktin;
 	odp_pktio_t       pktout;
 	char              name[ODP_QUEUE_NAME_LEN];
+	uint64_t          order_in;
+	uint64_t          order_out;
+	odp_buffer_hdr_t *reorder_head;
+	odp_buffer_hdr_t *reorder_tail;
+	odp_atomic_u64_t  sync_in;
+	odp_atomic_u64_t  sync_out;
 };
 
 typedef union queue_entry_u {
@@ -93,6 +100,10 @@  odp_buffer_hdr_t *queue_deq(queue_entry_t *queue);
 int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
 int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
 
+int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
+int queue_pktout_enq_multi(queue_entry_t *queue,
+			   odp_buffer_hdr_t *buf_hdr[], int num);
+
 int queue_enq_dummy(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
 int queue_enq_multi_dummy(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
 			  int num);
@@ -137,6 +148,116 @@  static inline int queue_prio(queue_entry_t *qe)
 	return qe->s.param.sched.prio;
 }
 
+static inline void reorder_enq(queue_entry_t *queue,
+			       queue_entry_t *origin_qe,
+			       odp_buffer_hdr_t *buf_hdr)
+{
+	odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
+	odp_buffer_hdr_t *reorder_prev =
+		(odp_buffer_hdr_t *)&origin_qe->s.reorder_head;
+
+	while (reorder_buf && buf_hdr->order >= reorder_buf->order) {
+		reorder_prev = reorder_buf;
+		reorder_buf  = reorder_buf->next;
+	}
+
+	buf_hdr->next = reorder_buf;
+	reorder_prev->next = buf_hdr;
+
+	if (!reorder_buf)
+		origin_qe->s.reorder_tail = buf_hdr;
+
+	buf_hdr->target_qe = queue;
+}
+
+static inline void order_release(queue_entry_t *origin_qe, int count)
+{
+	origin_qe->s.order_out += count;
+}
+
+static inline void reorder_deq(queue_entry_t *queue,
+			       queue_entry_t *origin_qe,
+			       odp_buffer_hdr_t **reorder_buf_return,
+			       odp_buffer_hdr_t **reorder_prev_return,
+			       odp_buffer_hdr_t **placeholder_buf_return,
+			       uint32_t *release_count_return,
+			       uint32_t *placeholder_count_return)
+{
+	odp_buffer_hdr_t *reorder_buf     = origin_qe->s.reorder_head;
+	odp_buffer_hdr_t *reorder_prev    = NULL;
+	odp_buffer_hdr_t *placeholder_buf = NULL;
+	odp_buffer_hdr_t *next_buf;
+	uint32_t          release_count = 0;
+	uint32_t          placeholder_count = 0;
+
+	while (reorder_buf &&
+	       reorder_buf->order <= origin_qe->s.order_out +
+	       release_count + placeholder_count) {
+		/*
+		 * Elements on the reorder list fall into one of
+		 * three categories:
+		 *
+		 * 1. Those destined for the same queue.  These
+		 *    can be enq'd now if they were waiting to
+		 *    be unblocked by this enq.
+		 *
+		 * 2. Those representing placeholders for events
+		 *    whose ordering was released by a prior
+		 *    odp_schedule_release_ordered() call.  These
+		 *    can now just be freed.
+		 *
+		 * 3. Those representing events destined for another
+		 *    queue. These cannot be consolidated with this
+		 *    enq since they have a different target.
+		 *
+		 * Detecting an element with an order sequence gap, an
+		 * element in category 3, or running out of elements
+		 * stops the scan.
+		 */
+		next_buf = reorder_buf->next;
+
+		if (odp_likely(reorder_buf->target_qe == queue)) {
+			/* promote any chain */
+			odp_buffer_hdr_t *reorder_link =
+				reorder_buf->link;
+
+			if (reorder_link) {
+				reorder_buf->next = reorder_link;
+				reorder_buf->link = NULL;
+				while (reorder_link->next)
+					reorder_link = reorder_link->next;
+				reorder_link->next = next_buf;
+				reorder_prev = reorder_link;
+			} else {
+				reorder_prev = reorder_buf;
+			}
+
+			if (!reorder_buf->flags.sustain)
+				release_count++;
+			reorder_buf = next_buf;
+		} else if (!reorder_buf->target_qe) {
+			if (reorder_prev)
+				reorder_prev->next = next_buf;
+			else
+				origin_qe->s.reorder_head = next_buf;
+
+			reorder_buf->next = placeholder_buf;
+			placeholder_buf = reorder_buf;
+
+			reorder_buf = next_buf;
+			placeholder_count++;
+		} else {
+			break;
+		}
+	}
+
+	*reorder_buf_return = reorder_buf;
+	*reorder_prev_return = reorder_prev;
+	*placeholder_buf_return = placeholder_buf;
+	*release_count_return = release_count;
+	*placeholder_count_return = placeholder_count;
+}
+
 void queue_destroy_finalize(queue_entry_t *qe);
 
 #ifdef __cplusplus
diff --git a/platform/linux-generic/include/odp_schedule_internal.h b/platform/linux-generic/include/odp_schedule_internal.h
index 4c6577d..6ea90fb 100644
--- a/platform/linux-generic/include/odp_schedule_internal.h
+++ b/platform/linux-generic/include/odp_schedule_internal.h
@@ -15,6 +15,7 @@  extern "C" {
 
 
 #include <odp/buffer.h>
+#include <odp_buffer_internal.h>
 #include <odp/queue.h>
 #include <odp/packet_io.h>
 #include <odp_queue_internal.h>
@@ -28,7 +29,6 @@  static inline int schedule_queue(const queue_entry_t *qe)
 	return odp_queue_enq(qe->s.pri_queue, qe->s.cmd_ev);
 }
 
-
 int schedule_pktio_start(odp_pktio_t pktio, int prio);