diff mbox

[API-NEXT,PATCHv4,6/8] linux-generic: queue: streamline and correct release_order() routine

Message ID 1447166821-24585-7-git-send-email-bill.fischofer@linaro.org
State Accepted
Commit e29fbafe209d62f21129d7a3de53df26cd6d0377
Headers show

Commit Message

Bill Fischofer Nov. 10, 2015, 2:46 p.m. UTC
Resolve the corner case of releasing order for an order that still has
events on the reorder queue. This also allows the reorder_complete()
routine to be streamlined.

This patch resolves Bug https://bugs.linaro.org/show_bug.cgi?id=1879

Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 .../linux-generic/include/odp_queue_internal.h     |  5 +-
 platform/linux-generic/odp_queue.c                 | 57 +++++++++++++++++-----
 2 files changed, 48 insertions(+), 14 deletions(-)
diff mbox

Patch

diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index 6120740..a70044b 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -335,8 +335,7 @@  static inline int reorder_deq(queue_entry_t *queue,
 static inline void reorder_complete(queue_entry_t *origin_qe,
 				    odp_buffer_hdr_t **reorder_buf_return,
 				    odp_buffer_hdr_t **placeholder_buf,
-				    int placeholder_append,
-				    int order_released)
+				    int placeholder_append)
 {
 	odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
 	odp_buffer_hdr_t *next_buf;
@@ -356,7 +355,7 @@  static inline void reorder_complete(queue_entry_t *origin_qe,
 
 			reorder_buf = next_buf;
 			order_release(origin_qe, 1);
-		} else if (!order_released && reorder_buf->flags.sustain) {
+		} else if (reorder_buf->flags.sustain) {
 			reorder_buf = next_buf;
 		} else {
 			*reorder_buf_return = origin_qe->s.reorder_head;
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 9cab9b2..a5e60d7 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -39,6 +39,11 @@ 
 
 #include <string.h>
 
+#define RESOLVE_ORDER 0
+#define SUSTAIN_ORDER 1
+
+#define NOAPPEND 0
+#define APPEND   1
 
 typedef struct queue_table_t {
 	queue_entry_t  queue[ODP_CONFIG_QUEUES];
@@ -521,8 +526,7 @@  int ordered_queue_enq(queue_entry_t *queue,
 	if (sched && schedule_queue(queue))
 		ODP_ABORT("schedule_queue failed\n");
 
-	reorder_complete(origin_qe, &reorder_buf, &placeholder_buf,
-			 1, 0);
+	reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, APPEND);
 	UNLOCK(&origin_qe->s.lock);
 
 	if (reorder_buf)
@@ -606,7 +610,8 @@  int odp_queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int num)
 	for (i = 0; i < num; i++)
 		buf_hdr[i] = odp_buf_to_hdr(odp_buffer_from_event(ev[i]));
 
-	return num == 0 ? 0 : queue->s.enqueue_multi(queue, buf_hdr, num, 1);
+	return num == 0 ? 0 : queue->s.enqueue_multi(queue, buf_hdr,
+						     num, SUSTAIN_ORDER);
 }
 
 int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
@@ -620,7 +625,7 @@  int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
 	/* No chains via this entry */
 	buf_hdr->link = NULL;
 
-	return queue->s.enqueue(queue, buf_hdr, 1);
+	return queue->s.enqueue(queue, buf_hdr, SUSTAIN_ORDER);
 }
 
 int queue_enq_internal(odp_buffer_hdr_t *buf_hdr)
@@ -660,7 +665,7 @@  odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
 			buf_hdr->sync[i] =
 				odp_atomic_fetch_inc_u64(&queue->s.sync_in[i]);
 		}
-		buf_hdr->flags.sustain = 0;
+		buf_hdr->flags.sustain = SUSTAIN_ORDER;
 	} else {
 		buf_hdr->origin_qe = NULL;
 	}
@@ -713,7 +718,7 @@  int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
 					odp_atomic_fetch_inc_u64
 					(&queue->s.sync_in[j]);
 			}
-			buf_hdr[i]->flags.sustain = 0;
+			buf_hdr[i]->flags.sustain = SUSTAIN_ORDER;
 		} else {
 			buf_hdr[i]->origin_qe = NULL;
 		}
@@ -879,7 +884,7 @@  int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr,
 	order_release(origin_qe, release_count + placeholder_count);
 
 	/* Now handle sends to other queues that are ready to go */
-	reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, 1, 0);
+	reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, APPEND);
 
 	/* We're fully done with the origin_qe at last */
 	UNLOCK(&origin_qe->s.lock);
@@ -947,13 +952,43 @@  int release_order(queue_entry_t *origin_qe, uint64_t order,
 	odp_buffer_t placeholder_buf;
 	odp_buffer_hdr_t *placeholder_buf_hdr, *reorder_buf, *next_buf;
 
-	/* Must tlock the origin queue to process the release */
+	/* Must lock the origin queue to process the release */
 	LOCK(&origin_qe->s.lock);
 
-	/* If we are in the order we can release immediately since there can
-	 * be no confusion about intermediate elements
+	/* If we are in order we can release immediately since there can be no
+	 * confusion about intermediate elements
 	 */
 	if (order <= origin_qe->s.order_out) {
+		reorder_buf = origin_qe->s.reorder_head;
+
+		/* We're in order, however there may be one or more events on
+		 * the reorder queue that are part of this order. If that is
+		 * the case, remove them and let ordered_queue_enq() handle
+		 * them and resolve the order for us.
+		 */
+		if (reorder_buf && reorder_buf->order == order) {
+			odp_buffer_hdr_t *reorder_head = reorder_buf;
+
+			next_buf = reorder_buf->next;
+
+			while (next_buf && next_buf->order == order) {
+				reorder_buf = next_buf;
+				next_buf    = next_buf->next;
+			}
+
+			origin_qe->s.reorder_head = reorder_buf->next;
+			reorder_buf->next = NULL;
+
+			UNLOCK(&origin_qe->s.lock);
+			reorder_head->link = reorder_buf->next;
+			return ordered_queue_enq(reorder_head->target_qe,
+						 reorder_head, RESOLVE_ORDER,
+						 origin_qe, order);
+		}
+
+		/* Reorder queue has no elements for this order, so it's safe
+		 * to resolve order here
+		 */
 		order_release(origin_qe, 1);
 
 		/* Check if this release allows us to unblock waiters.  At the
@@ -965,7 +1000,7 @@  int release_order(queue_entry_t *origin_qe, uint64_t order,
 		 * element(s) on the reorder queue
 		 */
 		reorder_complete(origin_qe, &reorder_buf,
-				 &placeholder_buf_hdr, 0, 1);
+				 &placeholder_buf_hdr, NOAPPEND);
 
 		/* Now safe to unlock */
 		UNLOCK(&origin_qe->s.lock);