diff mbox

[API-NEXT,PATCHv12,09/13] linux-generic: queue: implement ordered queues

Message ID 1439126035-13656-10-git-send-email-bill.fischofer@linaro.org
State New
Headers show

Commit Message

Bill Fischofer Aug. 9, 2015, 1:13 p.m. UTC
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 platform/linux-generic/odp_pool.c     |   3 +
 platform/linux-generic/odp_queue.c    | 421 +++++++++++++++++++++++++++++++---
 platform/linux-generic/odp_schedule.c |   2 -
 3 files changed, 391 insertions(+), 35 deletions(-)
diff mbox

Patch

diff --git a/platform/linux-generic/odp_pool.c b/platform/linux-generic/odp_pool.c
index 14221fd..30d4b2b 100644
--- a/platform/linux-generic/odp_pool.c
+++ b/platform/linux-generic/odp_pool.c
@@ -514,6 +514,9 @@  odp_buffer_t buffer_alloc(odp_pool_t pool_hdl, size_t size)
 	/* By default, buffers inherit their pool's zeroization setting */
 	buf->buf.flags.zeroized = pool->s.flags.zeroized;
 
+	/* By default, buffers are not associated with an ordered queue */
+	buf->buf.origin_qe = NULL;
+
 	if (buf->buf.type == ODP_EVENT_PACKET)
 		packet_init(pool, &buf->pkt, size);
 
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 4a0df11..4d3c548 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -14,6 +14,7 @@ 
 #include <odp_buffer_inlines.h>
 #include <odp_internal.h>
 #include <odp/shared_memory.h>
+#include <odp/schedule.h>
 #include <odp_schedule_internal.h>
 #include <odp/config.h>
 #include <odp_packet_io_internal.h>
@@ -21,17 +22,20 @@ 
 #include <odp_debug_internal.h>
 #include <odp/hints.h>
 #include <odp/sync.h>
+#include <odp_spin_internal.h>
 
 #ifdef USE_TICKETLOCK
 #include <odp/ticketlock.h>
 #define LOCK(a)      odp_ticketlock_lock(a)
 #define UNLOCK(a)    odp_ticketlock_unlock(a)
 #define LOCK_INIT(a) odp_ticketlock_init(a)
+#define LOCK_TRY(a)  odp_ticketlock_trylock(a)
 #else
 #include <odp/spinlock.h>
 #define LOCK(a)      odp_spinlock_lock(a)
 #define UNLOCK(a)    odp_spinlock_unlock(a)
 #define LOCK_INIT(a) odp_spinlock_init(a)
+#define LOCK_TRY(a)  odp_spinlock_trylock(a)
 #endif
 
 #include <string.h>
@@ -73,9 +77,9 @@  static void queue_init(queue_entry_t *queue, const char *name,
 		queue->s.dequeue_multi = pktin_deq_multi;
 		break;
 	case ODP_QUEUE_TYPE_PKTOUT:
-		queue->s.enqueue = pktout_enqueue;
+		queue->s.enqueue = queue_pktout_enq;
 		queue->s.dequeue = pktout_dequeue;
-		queue->s.enqueue_multi = pktout_enq_multi;
+		queue->s.enqueue_multi = queue_pktout_enq_multi;
 		queue->s.dequeue_multi = pktout_deq_multi;
 		break;
 	default:
@@ -89,6 +93,9 @@  static void queue_init(queue_entry_t *queue, const char *name,
 	queue->s.head = NULL;
 	queue->s.tail = NULL;
 
+	queue->s.reorder_head = NULL;
+	queue->s.reorder_tail = NULL;
+
 	queue->s.pri_queue = ODP_QUEUE_INVALID;
 	queue->s.cmd_ev    = ODP_EVENT_INVALID;
 }
@@ -329,30 +336,134 @@  odp_queue_t odp_queue_lookup(const char *name)
 int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 {
 	int sched = 0;
+	queue_entry_t *origin_qe = buf_hdr->origin_qe;
+	odp_buffer_hdr_t *buf_tail;
+
+	/* Need two locks for enq operations from ordered queues */
+	if (origin_qe) {
+		LOCK(&origin_qe->s.lock);
+		while (!LOCK_TRY(&queue->s.lock)) {
+			UNLOCK(&origin_qe->s.lock);
+			LOCK(&origin_qe->s.lock);
+		}
+		if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+			UNLOCK(&queue->s.lock);
+			UNLOCK(&origin_qe->s.lock);
+			ODP_ERR("Bad origin queue status\n");
+			return -1;
+		}
+	} else {
+		LOCK(&queue->s.lock);
+	}
 
-	LOCK(&queue->s.lock);
 	if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
 		UNLOCK(&queue->s.lock);
+		if (origin_qe)
+			UNLOCK(&origin_qe->s.lock);
 		ODP_ERR("Bad queue status\n");
 		return -1;
 	}
 
-	if (queue->s.head == NULL) {
+	/* We can only complete the enq if we're in order */
+	if (origin_qe) {
+		if (buf_hdr->order > origin_qe->s.order_out) {
+			reorder_enq(queue, origin_qe, buf_hdr);
+
+			/* This enq can't complete until order is restored, so
+			 * we're done here.
+			 */
+			UNLOCK(&queue->s.lock);
+			UNLOCK(&origin_qe->s.lock);
+			return 0;
+		}
+
+		/* We're in order, so account for this and proceed with enq */
+		if (!buf_hdr->flags.sustain)
+			order_release(origin_qe, 1);
+
+		/* if this element is linked, restore the linked chain */
+		buf_tail = buf_hdr->link;
+
+		if (buf_tail) {
+			buf_hdr->next = buf_tail;
+			buf_hdr->link = NULL;
+
+			/* find end of the chain */
+			while (buf_tail->next)
+				buf_tail = buf_tail->next;
+		} else {
+			buf_tail = buf_hdr;
+		}
+	} else {
+		buf_tail = buf_hdr;
+	}
+
+	if (!queue->s.head) {
 		/* Empty queue */
 		queue->s.head = buf_hdr;
-		queue->s.tail = buf_hdr;
-		buf_hdr->next = NULL;
+		queue->s.tail = buf_tail;
+		buf_tail->next = NULL;
 	} else {
 		queue->s.tail->next = buf_hdr;
-		queue->s.tail = buf_hdr;
-		buf_hdr->next = NULL;
+		queue->s.tail = buf_tail;
+		buf_tail->next = NULL;
 	}
 
 	if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
 		queue->s.status = QUEUE_STATUS_SCHED;
 		sched = 1; /* retval: schedule queue */
 	}
-	UNLOCK(&queue->s.lock);
+
+	/*
+	 * If we came from an ordered queue, check to see if our successful
+	 * enq has unblocked other buffers in the origin's reorder queue.
+	 */
+	if (origin_qe) {
+		odp_buffer_hdr_t *reorder_buf;
+		odp_buffer_hdr_t *next_buf;
+		odp_buffer_hdr_t *reorder_prev;
+		odp_buffer_hdr_t *placeholder_buf;
+		uint32_t          release_count;
+		uint32_t          placeholder_count;
+
+		reorder_deq(queue, origin_qe,
+			    &reorder_buf, &reorder_prev, &placeholder_buf,
+			    &release_count, &placeholder_count);
+
+		/* Add released buffers to the queue as well */
+		if (release_count > 0) {
+			queue->s.tail->next       = origin_qe->s.reorder_head;
+			queue->s.tail             = reorder_prev;
+			origin_qe->s.reorder_head = reorder_prev->next;
+			reorder_prev->next        = NULL;
+		}
+
+		/* Reflect the above two in the output sequence */
+		order_release(origin_qe, release_count + placeholder_count);
+
+		/* Now handle any unblocked buffers destined for other queues */
+		UNLOCK(&queue->s.lock);
+
+		if (reorder_buf &&
+		    reorder_buf->order <= origin_qe->s.order_out)
+			origin_qe->s.reorder_head = reorder_buf->next;
+		else
+			reorder_buf = NULL;
+		UNLOCK(&origin_qe->s.lock);
+
+		if (reorder_buf)
+			odp_queue_enq(reorder_buf->target_qe->s.handle,
+				      (odp_event_t)reorder_buf->handle.handle);
+
+		/* Free all placeholder bufs that are now released */
+		while (placeholder_buf) {
+			next_buf = placeholder_buf->next;
+			odp_buffer_free(placeholder_buf->handle.handle);
+			placeholder_buf = next_buf;
+		}
+	} else {
+		UNLOCK(&queue->s.lock);
+	}
 
 	/* Add queue to scheduling */
 	if (sched && schedule_queue(queue))
@@ -364,41 +475,83 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
 {
 	int sched = 0;
-	int i;
+	int i, rc, ret_count = 0;
+	int ordered_head[num];
+	int ordered_count = 0;
 	odp_buffer_hdr_t *tail;
 
-	for (i = 0; i < num - 1; i++)
-		buf_hdr[i]->next = buf_hdr[i+1];
+	/* Identify ordered chains in the input buffer list */
+	for (i = 0; i < num; i++) {
+		if (buf_hdr[i]->origin_qe)
+			ordered_head[ordered_count++] = i;
 
-	tail = buf_hdr[num-1];
-	buf_hdr[num-1]->next = NULL;
+		buf_hdr[i]->next = i < num - 1 ? buf_hdr[i + 1] : NULL;
+	}
 
-	LOCK(&queue->s.lock);
-	if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
-		UNLOCK(&queue->s.lock);
-		ODP_ERR("Bad queue status\n");
-		return -1;
+	if (ordered_count) {
+		if (ordered_head[0] > 0) {
+			tail = buf_hdr[ordered_head[0] - 1];
+			tail->next = NULL;
+			ret_count = ordered_head[0];
+		} else {
+			tail = NULL;
+			ret_count = 0;
+		}
+	} else {
+		tail = buf_hdr[num - 1];
+		ret_count = num;
 	}
 
-	/* Empty queue */
-	if (queue->s.head == NULL)
-		queue->s.head = buf_hdr[0];
-	else
-		queue->s.tail->next = buf_hdr[0];
+	/* Handle regular enq's at start of list */
+	if (tail) {
+		LOCK(&queue->s.lock);
+		if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
+			UNLOCK(&queue->s.lock);
+			ODP_ERR("Bad queue status\n");
+			return -1;
+		}
 
-	queue->s.tail = tail;
+		/* Handle empty queue */
+		if (queue->s.head)
+			queue->s.tail->next = buf_hdr[0];
+		else
+			queue->s.head = buf_hdr[0];
 
-	if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
-		queue->s.status = QUEUE_STATUS_SCHED;
-		sched = 1; /* retval: schedule queue */
+		queue->s.tail = tail;
+
+		if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
+			queue->s.status = QUEUE_STATUS_SCHED;
+			sched = 1; /* retval: schedule queue */
+		}
+		UNLOCK(&queue->s.lock);
+
+		/* Add queue to scheduling */
+		if (sched && schedule_queue(queue))
+			ODP_ABORT("schedule_queue failed\n");
 	}
-	UNLOCK(&queue->s.lock);
 
-	/* Add queue to scheduling */
-	if (sched && schedule_queue(queue))
-		ODP_ABORT("schedule_queue failed\n");
+	/* Handle ordered chains in the list */
+	for (i = 0; i < ordered_count; i++) {
+		int eol = i < ordered_count - 1 ? ordered_head[i + 1] : num;
+		int list_count = eol - i;
 
-	return num; /* All events enqueued */
+		if (i < ordered_count - 1)
+			buf_hdr[eol - 1]->next = NULL;
+
+		buf_hdr[ordered_head[i]]->link =
+			list_count > 1 ? buf_hdr[ordered_head[i] + 1] : NULL;
+
+		rc = queue_enq(queue, buf_hdr[ordered_head[i]]);
+		if (rc < 0)
+			return ret_count;
+
+		if (rc < list_count)
+			return ret_count + rc;
+
+		ret_count += rc;
+	}
+
+	return ret_count;
 }
 
 int odp_queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int num)
@@ -427,6 +580,9 @@  int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
 	queue   = queue_to_qentry(handle);
 	buf_hdr = odp_buf_to_hdr(odp_buffer_from_event(ev));
 
+	/* No chains via this entry */
+	buf_hdr->link = NULL;
+
 	return queue->s.enqueue(queue, buf_hdr);
 }
 
@@ -449,6 +605,13 @@  odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
 	buf_hdr       = queue->s.head;
 	queue->s.head = buf_hdr->next;
 	buf_hdr->next = NULL;
+	if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) {
+		buf_hdr->origin_qe = queue;
+		buf_hdr->order = queue->s.order_in++;
+		buf_hdr->flags.sustain = 0;
+	} else {
+		buf_hdr->origin_qe = NULL;
+	}
 
 	if (queue->s.head == NULL) {
 		/* Queue is now empty */
@@ -489,6 +652,13 @@  int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
 		buf_hdr[i]       = hdr;
 		hdr              = hdr->next;
 		buf_hdr[i]->next = NULL;
+		if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) {
+			buf_hdr[i]->origin_qe = queue;
+			buf_hdr[i]->order     = queue->s.order_in++;
+			buf_hdr[i]->flags.sustain = 0;
+		} else {
+			buf_hdr[i]->origin_qe = NULL;
+		}
 	}
 
 	queue->s.head = hdr;
@@ -537,6 +707,191 @@  odp_event_t odp_queue_deq(odp_queue_t handle)
 	return ODP_EVENT_INVALID;
 }
 
+int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
+{
+	queue_entry_t *origin_qe = buf_hdr->origin_qe;
+	int rc, sustain;
+
+	/* Special processing needed only if we came from an ordered queue */
+	if (!origin_qe)
+		return pktout_enqueue(queue, buf_hdr);
+
+	/* Must lock origin_qe for ordered processing */
+	LOCK(&origin_qe->s.lock);
+	if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+		UNLOCK(&origin_qe->s.lock);
+		ODP_ERR("Bad origin queue status\n");
+		return -1;
+	}
+
+	/* We can only complete the enq if we're in order */
+	if (buf_hdr->order > origin_qe->s.order_out) {
+		reorder_enq(queue, origin_qe, buf_hdr);
+
+		/* This enq can't complete until order is restored, so
+		 * we're done here.
+		 */
+		UNLOCK(&origin_qe->s.lock);
+		return 0;
+	}
+
+	/* Perform our enq since we're in order.
+	 * Note: Don't hold the origin_qe lock across an I/O operation!
+	 * Note that we also cache the sustain flag since the buffer may
+	 * be freed by the I/O operation so we can't reference it afterwards.
+	 */
+	UNLOCK(&origin_qe->s.lock);
+	sustain = buf_hdr->flags.sustain;
+
+	/* Handle any chained buffers (internal calls) */
+	if (buf_hdr->link) {
+		odp_buffer_hdr_t *buf_hdrs[QUEUE_MULTI_MAX];
+		odp_buffer_hdr_t *next_buf;
+		int num = 0;
+
+		next_buf = buf_hdr->link;
+		buf_hdr->link = NULL;
+
+		while (next_buf) {
+			buf_hdrs[num++] = next_buf;
+			next_buf = next_buf->next;
+		}
+
+		rc = pktout_enq_multi(queue, buf_hdrs, num);
+		if (rc < num)
+			return -1;
+	} else {
+		rc = pktout_enqueue(queue, buf_hdr);
+		if (!rc)
+			return rc;
+	}
+
+	/* Reacquire the lock following the I/O send. Note that we're still
+	 * guaranteed to be in order here since we haven't released
+	 * order yet.
+	 */
+	LOCK(&origin_qe->s.lock);
+	if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+		UNLOCK(&origin_qe->s.lock);
+		ODP_ERR("Bad origin queue status\n");
+		return -1;
+	}
+
+	/* Account for this ordered enq */
+	if (!sustain)
+		order_release(origin_qe, 1);
+
+	/* Now check to see if our successful enq has unblocked other buffers
+	 * in the origin's reorder queue.
+	 */
+	odp_buffer_hdr_t *reorder_buf;
+	odp_buffer_hdr_t *next_buf;
+	odp_buffer_hdr_t *reorder_prev;
+	odp_buffer_hdr_t *xmit_buf;
+	odp_buffer_hdr_t *placeholder_buf;
+	uint32_t          release_count;
+	uint32_t          placeholder_count;
+
+	reorder_deq(queue, origin_qe,
+		    &reorder_buf, &reorder_prev, &placeholder_buf,
+		    &release_count, &placeholder_count);
+
+	/* Send released buffers as well */
+	if (release_count > 0) {
+		xmit_buf = origin_qe->s.reorder_head;
+		origin_qe->s.reorder_head = reorder_prev->next;
+		reorder_prev->next = NULL;
+		UNLOCK(&origin_qe->s.lock);
+
+		do {
+			next_buf = xmit_buf->next;
+			pktout_enqueue(queue, xmit_buf);
+			xmit_buf = next_buf;
+		} while (xmit_buf);
+
+		/* Reacquire the origin_qe lock to continue */
+		LOCK(&origin_qe->s.lock);
+		if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+			UNLOCK(&origin_qe->s.lock);
+			ODP_ERR("Bad origin queue status\n");
+			return -1;
+		}
+	}
+
+	/* Update the order sequence to reflect the deq'd elements */
+	order_release(origin_qe, release_count + placeholder_count);
+
+	/* Now handle sends to other queues that are ready to go */
+	if (reorder_buf && reorder_buf->order <= origin_qe->s.order_out)
+		origin_qe->s.reorder_head = reorder_buf->next;
+	else
+		reorder_buf = NULL;
+
+	/* We're fully done with the origin_qe at last */
+	UNLOCK(&origin_qe->s.lock);
+
+	/* Now send the next buffer to its target queue */
+	if (reorder_buf)
+		odp_queue_enq(reorder_buf->target_qe->s.handle,
+			      (odp_event_t)reorder_buf->handle.handle);
+
+	/* Free all placeholder bufs that are now released */
+	while (placeholder_buf) {
+		next_buf = placeholder_buf->next;
+		odp_buffer_free(placeholder_buf->handle.handle);
+		placeholder_buf = next_buf;
+	}
+
+	return 0;
+}
+
+int queue_pktout_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
+			   int num)
+{
+	int i, rc, ret_count = 0;
+	int ordered_head[num];
+	int ordered_count = 0;
+
+	/* Identify ordered chains in the input buffer list */
+	for (i = 0; i < num; i++) {
+		if (buf_hdr[i]->origin_qe)
+			ordered_head[ordered_count++] = i;
+
+		buf_hdr[i]->next = i < num - 1 ? buf_hdr[i + 1] : NULL;
+	}
+
+	ret_count = ordered_count ? ordered_head[0] : num;
+
+	/* Handle regular enq's at start of list */
+	if (ret_count) {
+		rc = pktout_enq_multi(queue, buf_hdr, ret_count);
+		if (rc < ret_count)
+			return rc;
+	}
+
+	/* Handle ordered chains in the list */
+	for (i = 0; i < ordered_count; i++) {
+		int eol = i < ordered_count - 1 ? ordered_head[i + 1] : num;
+		int list_count = eol - i;
+
+		if (i < ordered_count - 1)
+			buf_hdr[eol - 1]->next = NULL;
+
+		buf_hdr[ordered_head[i]]->link =
+			list_count > 1 ? buf_hdr[ordered_head[i] + 1] : NULL;
+
+		rc = queue_pktout_enq(queue, buf_hdr[ordered_head[i]]);
+		if (rc < 0)
+			return ret_count;
+
+		if (rc < list_count)
+			return ret_count + rc;
+
+		ret_count += rc;
+	}
+
+	return ret_count;
+}
 
 void queue_lock(queue_entry_t *queue)
 {
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index 2a2cc1d..d595375 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -411,8 +411,6 @@  static inline int copy_events(odp_event_t out_ev[], unsigned int max)
 
 /*
  * Schedule queues
- *
- * TODO: SYNC_ORDERED not implemented yet
  */
 static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 		    unsigned int max_num, unsigned int max_deq)