diff mbox series

[v1,2/4] linux-gen: queue: use mpmc ring in plain queues

Message ID 1535119206-23556-3-git-send-email-odpbot@yandex.ru
State New
Headers show
Series [v1,1/4] linux-gen: ring_mpmc: new multi-producer, multi-consumer ring | expand

Commit Message

Github ODP bot Aug. 24, 2018, 2 p.m. UTC
From: Petri Savolainen <petri.savolainen@linaro.org>


Change plain queue implementation to use ring_mpmc instead
of ticket lock and ring_st ring. Performance and scalability
improves especially on 64 bit ARM.

Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>

---
/** Email created from pull request 683 (psavol:master-queue-lockless-enqdeq-3)
 ** https://github.com/Linaro/odp/pull/683
 ** Patch: https://github.com/Linaro/odp/pull/683.patch
 ** Base sha: 989df5d2f97ab4711328b11282dcc743f5740e00
 ** Merge commit sha: 28073c54671148efdd01c9cf38c1a235d5a133f0
 **/
 .../include/odp_queue_basic_internal.h        | 30 +++++++-----
 platform/linux-generic/odp_queue_basic.c      | 48 +++++++------------
 2 files changed, 37 insertions(+), 41 deletions(-)
diff mbox series

Patch

diff --git a/platform/linux-generic/include/odp_queue_basic_internal.h b/platform/linux-generic/include/odp_queue_basic_internal.h
index 15e49772c..46b747955 100644
--- a/platform/linux-generic/include/odp_queue_basic_internal.h
+++ b/platform/linux-generic/include/odp_queue_basic_internal.h
@@ -22,6 +22,7 @@  extern "C" {
 #include <odp/api/hints.h>
 #include <odp/api/ticketlock.h>
 #include <odp_config_internal.h>
+#include <odp_ring_mpmc_internal.h>
 #include <odp_ring_st_internal.h>
 #include <odp_ring_spsc_internal.h>
 #include <odp_queue_lf.h>
@@ -33,22 +34,29 @@  extern "C" {
 #define QUEUE_STATUS_SCHED        4
 
 struct queue_entry_s {
-	odp_ticketlock_t  ODP_ALIGNED_CACHE lock;
-	union {
-		ring_st_t         ring_st;
-		ring_spsc_t       ring_spsc;
-	};
-	int               status;
-
+	/* The first cache line is read only */
 	queue_enq_fn_t       ODP_ALIGNED_CACHE enqueue;
 	queue_deq_fn_t       dequeue;
 	queue_enq_multi_fn_t enqueue_multi;
 	queue_deq_multi_fn_t dequeue_multi;
-	queue_deq_multi_fn_t orig_dequeue_multi;
+	uint32_t             *ring_data;
+	uint32_t             ring_mask;
+	uint32_t             index;
+	odp_queue_t          handle;
+	odp_queue_type_t     type;
+
+	/* MPMC ring (2 cache lines). */
+	ring_mpmc_t          ring_mpmc;
 
-	uint32_t          index;
-	odp_queue_t       handle;
-	odp_queue_type_t  type;
+	odp_ticketlock_t     lock;
+	union {
+		ring_st_t    ring_st;
+		ring_spsc_t  ring_spsc;
+	};
+
+	int                  status;
+
+	queue_deq_multi_fn_t orig_dequeue_multi;
 	odp_queue_param_t param;
 	odp_pktin_queue_t pktin;
 	odp_pktout_queue_t pktout;
diff --git a/platform/linux-generic/odp_queue_basic.c b/platform/linux-generic/odp_queue_basic.c
index e5d915643..8b9a70bbc 100644
--- a/platform/linux-generic/odp_queue_basic.c
+++ b/platform/linux-generic/odp_queue_basic.c
@@ -400,8 +400,10 @@  static int queue_destroy(odp_queue_t handle)
 
 	if (queue->s.spsc)
 		empty = ring_spsc_is_empty(&queue->s.ring_spsc);
-	else
+	else if (queue->s.type == ODP_QUEUE_TYPE_SCHED)
 		empty = ring_st_is_empty(&queue->s.ring_st);
+	else
+		empty = ring_mpmc_is_empty(&queue->s.ring_mpmc);
 
 	if (!empty) {
 		UNLOCK(queue);
@@ -490,28 +492,19 @@  static inline int _plain_queue_enq_multi(odp_queue_t handle,
 {
 	queue_entry_t *queue;
 	int ret, num_enq;
-	ring_st_t *ring_st;
+	ring_mpmc_t *ring_mpmc;
 	uint32_t buf_idx[num];
 
 	queue = qentry_from_handle(handle);
-	ring_st = &queue->s.ring_st;
+	ring_mpmc = &queue->s.ring_mpmc;
 
 	if (sched_fn->ord_enq_multi(handle, (void **)buf_hdr, num, &ret))
 		return ret;
 
 	buffer_index_from_buf(buf_idx, buf_hdr, num);
 
-	LOCK(queue);
-
-	if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
-		UNLOCK(queue);
-		ODP_ERR("Bad queue status\n");
-		return -1;
-	}
-
-	num_enq = ring_st_enq_multi(ring_st, buf_idx, num);
-
-	UNLOCK(queue);
+	num_enq = ring_mpmc_enq_multi(ring_mpmc, queue->s.ring_data,
+				      queue->s.ring_mask, buf_idx, num);
 
 	return num_enq;
 }
@@ -521,23 +514,14 @@  static inline int _plain_queue_deq_multi(odp_queue_t handle,
 {
 	int num_deq;
 	queue_entry_t *queue;
-	ring_st_t *ring_st;
+	ring_mpmc_t *ring_mpmc;
 	uint32_t buf_idx[num];
 
 	queue = qentry_from_handle(handle);
-	ring_st = &queue->s.ring_st;
+	ring_mpmc = &queue->s.ring_mpmc;
 
-	LOCK(queue);
-
-	if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
-		/* Bad queue, or queue has been destroyed. */
-		UNLOCK(queue);
-		return -1;
-	}
-
-	num_deq = ring_st_deq_multi(ring_st, buf_idx, num);
-
-	UNLOCK(queue);
+	num_deq = ring_mpmc_deq_multi(ring_mpmc, queue->s.ring_data,
+				      queue->s.ring_mask, buf_idx, num);
 
 	if (num_deq == 0)
 		return 0;
@@ -883,13 +867,17 @@  static int queue_init(queue_entry_t *queue, const char *name,
 			queue->s.dequeue            = plain_queue_deq;
 			queue->s.dequeue_multi      = plain_queue_deq_multi;
 			queue->s.orig_dequeue_multi = plain_queue_deq_multi;
+
+			queue->s.ring_data = &queue_glb->ring_data[offset];
+			queue->s.ring_mask = queue_size - 1;
+			ring_mpmc_init(&queue->s.ring_mpmc);
+
 		} else {
 			queue->s.enqueue            = sched_queue_enq;
 			queue->s.enqueue_multi      = sched_queue_enq_multi;
+			ring_st_init(&queue->s.ring_st,
+				     &queue_glb->ring_data[offset], queue_size);
 		}
-
-		ring_st_init(&queue->s.ring_st, &queue_glb->ring_data[offset],
-			     queue_size);
 	}
 
 	return 0;