@@ -173,6 +173,7 @@ static inline void reorder_enq(queue_entry_t *queue,
static inline void order_release(queue_entry_t *origin_qe, int count)
{
origin_qe->s.order_out += count;
+ odp_atomic_fetch_add_u64(&origin_qe->s.sync_out, count);
}
static inline void reorder_deq(queue_entry_t *queue,
@@ -123,6 +123,8 @@ int odp_queue_init_global(void)
/* init locks */
queue_entry_t *queue = get_qentry(i);
LOCK_INIT(&queue->s.lock);
+ odp_atomic_init_u64(&queue->s.sync_in, 0);
+ odp_atomic_init_u64(&queue->s.sync_out, 0);
queue->s.handle = queue_from_id(i);
}
@@ -608,6 +610,7 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) {
buf_hdr->origin_qe = queue;
buf_hdr->order = queue->s.order_in++;
+ buf_hdr->sync = odp_atomic_fetch_inc_u64(&queue->s.sync_in);
buf_hdr->flags.sustain = 0;
} else {
buf_hdr->origin_qe = NULL;
@@ -655,6 +658,8 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) {
buf_hdr[i]->origin_qe = queue;
buf_hdr[i]->order = queue->s.order_in++;
+ buf_hdr[i]->sync =
+ odp_atomic_fetch_inc_u64(&queue->s.sync_in);
buf_hdr[i]->flags.sustain = 0;
} else {
buf_hdr[i]->origin_qe = NULL;
@@ -1007,3 +1012,32 @@ int odp_schedule_order_copy(odp_event_t src_event, odp_event_t dst_event)
UNLOCK(&origin_qe->s.lock);
return 0;
}
+
+void odp_schedule_order_lock(odp_event_t ev)
+{
+ odp_buffer_hdr_t *buf_hdr = odp_buf_to_hdr(odp_buffer_from_event(ev));
+ queue_entry_t *origin_qe = buf_hdr->origin_qe;
+
+ /* Wait until we are in order. Note that sync_out will be incremented
+ * both by unlocks as well as order resolution, so we're OK if only
+ * some events in the ordered flow need to lock.
+ */
+ while (buf_hdr->sync > odp_atomic_load_u64(&origin_qe->s.sync_out))
+ odp_spin();
+}
+
+void odp_schedule_order_unlock(odp_event_t ev)
+{
+ odp_buffer_hdr_t *buf_hdr = odp_buf_to_hdr(odp_buffer_from_event(ev));
+ queue_entry_t *origin_qe = buf_hdr->origin_qe;
+
+ /* Get a new sync order for reusability, and release the lock. Note
+ * that this must be done in this sequence to prevent race conditions
+ * where the next waiter could lock and unlock before we're able to
+ * get a new sync order since that would cause order inversion on
+ * subsequent locks we may perform on this event in this ordered
+ * context.
+ */
+ buf_hdr->sync = odp_atomic_fetch_inc_u64(&origin_qe->s.sync_in);
+ odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out);
+}
Implement the odp_schedule_order_lock() and odp_schedule_order_unlock() routines to enable ordered synchronization within parallel processing of ordered flows. Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org> --- .../linux-generic/include/odp_queue_internal.h | 1 + platform/linux-generic/odp_queue.c | 34 ++++++++++++++++++++++ 2 files changed, 35 insertions(+)