@@ -29,31 +29,27 @@ extern "C" {
typedef struct {
odp_atomic_u32_t head;
odp_atomic_u32_t tail;
- uint32_t mask;
- uint32_t *data;
} ring_spsc_t;
/* Initialize ring. Ring size must be a power of two. */
-static inline void ring_spsc_init(ring_spsc_t *ring, uint32_t *data,
- uint32_t size)
+static inline void ring_spsc_init(ring_spsc_t *ring)
{
odp_atomic_init_u32(&ring->head, 0);
odp_atomic_init_u32(&ring->tail, 0);
- ring->mask = size - 1;
- ring->data = data;
}
/* Dequeue data from the ring head. Max_num is smaller than ring size.*/
-static inline uint32_t ring_spsc_deq_multi(ring_spsc_t *ring, uint32_t data[],
+static inline uint32_t ring_spsc_deq_multi(ring_spsc_t *ring,
+ uint32_t *ring_data,
+ uint32_t ring_mask, uint32_t data[],
uint32_t max_num)
{
- uint32_t head, tail, mask, idx;
+ uint32_t head, tail, idx;
uint32_t num, i;
tail = odp_atomic_load_acq_u32(&ring->tail);
head = odp_atomic_load_u32(&ring->head);
- mask = ring->mask;
num = tail - head;
/* Empty */
@@ -63,11 +59,11 @@ static inline uint32_t ring_spsc_deq_multi(ring_spsc_t *ring, uint32_t data[],
if (num > max_num)
num = max_num;
- idx = head & mask;
+ idx = head & ring_mask;
for (i = 0; i < num; i++) {
- data[i] = ring->data[idx];
- idx = (idx + 1) & mask;
+ data[i] = ring_data[idx];
+ idx = (idx + 1) & ring_mask;
}
odp_atomic_store_rel_u32(&ring->head, head + num);
@@ -77,16 +73,17 @@ static inline uint32_t ring_spsc_deq_multi(ring_spsc_t *ring, uint32_t data[],
/* Enqueue data into the ring tail. Num_data is smaller than ring size. */
static inline uint32_t ring_spsc_enq_multi(ring_spsc_t *ring,
+ uint32_t *ring_data,
+ uint32_t ring_mask,
const uint32_t data[],
uint32_t num_data)
{
- uint32_t head, tail, mask, size, idx;
+ uint32_t head, tail, size, idx;
uint32_t num, i;
head = odp_atomic_load_acq_u32(&ring->head);
tail = odp_atomic_load_u32(&ring->tail);
- mask = ring->mask;
- size = mask + 1;
+ size = ring_mask + 1;
num = size - (tail - head);
/* Full */
@@ -96,11 +93,11 @@ static inline uint32_t ring_spsc_enq_multi(ring_spsc_t *ring,
if (num > num_data)
num = num_data;
- idx = tail & mask;
+ idx = tail & ring_mask;
for (i = 0; i < num; i++) {
- ring->data[idx] = data[i];
- idx = (idx + 1) & mask;
+ ring_data[idx] = data[i];
+ idx = (idx + 1) & ring_mask;
}
odp_atomic_store_rel_u32(&ring->tail, tail + num);
@@ -49,7 +49,8 @@ static inline int spsc_enq_multi(odp_queue_t handle,
return -1;
}
- return ring_spsc_enq_multi(ring_spsc, buf_idx, num);
+ return ring_spsc_enq_multi(ring_spsc, queue->s.ring_data,
+ queue->s.ring_mask, buf_idx, num);
}
static inline int spsc_deq_multi(odp_queue_t handle,
@@ -68,7 +69,8 @@ static inline int spsc_deq_multi(odp_queue_t handle,
return -1;
}
- num_deq = ring_spsc_deq_multi(ring_spsc, buf_idx, num);
+ num_deq = ring_spsc_deq_multi(ring_spsc, queue->s.ring_data,
+ queue->s.ring_mask, buf_idx, num);
if (num_deq == 0)
return 0;
@@ -127,6 +129,7 @@ void queue_spsc_init(queue_entry_t *queue, uint32_t queue_size)
offset = queue->s.index * (uint64_t)queue_glb->config.max_queue_size;
- ring_spsc_init(&queue->s.ring_spsc, &queue_glb->ring_data[offset],
- queue_size);
+ queue->s.ring_data = &queue_glb->ring_data[offset];
+ queue->s.ring_mask = queue_size - 1;
+ ring_spsc_init(&queue->s.ring_spsc);
}