@@ -22,6 +22,7 @@ extern "C" {
#include <odp/api/hints.h>
#include <odp/api/ticketlock.h>
#include <odp_config_internal.h>
+#include <odp_ring_mpmc_internal.h>
#include <odp_ring_st_internal.h>
#include <odp_ring_spsc_internal.h>
#include <odp_queue_lf.h>
@@ -33,22 +34,29 @@ extern "C" {
#define QUEUE_STATUS_SCHED 4
struct queue_entry_s {
- odp_ticketlock_t ODP_ALIGNED_CACHE lock;
- union {
- ring_st_t ring_st;
- ring_spsc_t ring_spsc;
- };
- int status;
-
+ /* The first cache line is read only */
queue_enq_fn_t ODP_ALIGNED_CACHE enqueue;
queue_deq_fn_t dequeue;
queue_enq_multi_fn_t enqueue_multi;
queue_deq_multi_fn_t dequeue_multi;
- queue_deq_multi_fn_t orig_dequeue_multi;
+ uint32_t *ring_data;
+ uint32_t ring_mask;
+ uint32_t index;
+ odp_queue_t handle;
+ odp_queue_type_t type;
+
+ /* MPMC ring (2 cache lines). */
+ ring_mpmc_t ring_mpmc;
- uint32_t index;
- odp_queue_t handle;
- odp_queue_type_t type;
+ odp_ticketlock_t lock;
+ union {
+ ring_st_t ring_st;
+ ring_spsc_t ring_spsc;
+ };
+
+ int status;
+
+ queue_deq_multi_fn_t orig_dequeue_multi;
odp_queue_param_t param;
odp_pktin_queue_t pktin;
odp_pktout_queue_t pktout;
@@ -400,8 +400,10 @@ static int queue_destroy(odp_queue_t handle)
if (queue->s.spsc)
empty = ring_spsc_is_empty(&queue->s.ring_spsc);
- else
+ else if (queue->s.type == ODP_QUEUE_TYPE_SCHED)
empty = ring_st_is_empty(&queue->s.ring_st);
+ else
+ empty = ring_mpmc_is_empty(&queue->s.ring_mpmc);
if (!empty) {
UNLOCK(queue);
@@ -490,28 +492,19 @@ static inline int _plain_queue_enq_multi(odp_queue_t handle,
{
queue_entry_t *queue;
int ret, num_enq;
- ring_st_t *ring_st;
+ ring_mpmc_t *ring_mpmc;
uint32_t buf_idx[num];
queue = qentry_from_handle(handle);
- ring_st = &queue->s.ring_st;
+ ring_mpmc = &queue->s.ring_mpmc;
if (sched_fn->ord_enq_multi(handle, (void **)buf_hdr, num, &ret))
return ret;
buffer_index_from_buf(buf_idx, buf_hdr, num);
- LOCK(queue);
-
- if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
- UNLOCK(queue);
- ODP_ERR("Bad queue status\n");
- return -1;
- }
-
- num_enq = ring_st_enq_multi(ring_st, buf_idx, num);
-
- UNLOCK(queue);
+ num_enq = ring_mpmc_enq_multi(ring_mpmc, queue->s.ring_data,
+ queue->s.ring_mask, buf_idx, num);
return num_enq;
}
@@ -521,23 +514,14 @@ static inline int _plain_queue_deq_multi(odp_queue_t handle,
{
int num_deq;
queue_entry_t *queue;
- ring_st_t *ring_st;
+ ring_mpmc_t *ring_mpmc;
uint32_t buf_idx[num];
queue = qentry_from_handle(handle);
- ring_st = &queue->s.ring_st;
+ ring_mpmc = &queue->s.ring_mpmc;
- LOCK(queue);
-
- if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
- /* Bad queue, or queue has been destroyed. */
- UNLOCK(queue);
- return -1;
- }
-
- num_deq = ring_st_deq_multi(ring_st, buf_idx, num);
-
- UNLOCK(queue);
+ num_deq = ring_mpmc_deq_multi(ring_mpmc, queue->s.ring_data,
+ queue->s.ring_mask, buf_idx, num);
if (num_deq == 0)
return 0;
@@ -883,13 +867,17 @@ static int queue_init(queue_entry_t *queue, const char *name,
queue->s.dequeue = plain_queue_deq;
queue->s.dequeue_multi = plain_queue_deq_multi;
queue->s.orig_dequeue_multi = plain_queue_deq_multi;
+
+ queue->s.ring_data = &queue_glb->ring_data[offset];
+ queue->s.ring_mask = queue_size - 1;
+ ring_mpmc_init(&queue->s.ring_mpmc);
+
} else {
queue->s.enqueue = sched_queue_enq;
queue->s.enqueue_multi = sched_queue_enq_multi;
+ ring_st_init(&queue->s.ring_st,
+ &queue_glb->ring_data[offset], queue_size);
}
-
- ring_st_init(&queue->s.ring_st, &queue_glb->ring_data[offset],
- queue_size);
}
return 0;