diff mbox

[API-NEXT,PATCHv13,13/13] linux-generic: schedule: implement ordered locks

Message ID 1439126687-19932-14-git-send-email-bill.fischofer@linaro.org
State New
Headers show

Commit Message

Bill Fischofer Aug. 9, 2015, 1:24 p.m. UTC
Implement the odp_schedule_order_lock() and odp_schedule_order_unlock()
routines to enable ordered synchronization within parallel processing
of ordered flows.

Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 .../linux-generic/include/odp_queue_internal.h     |  1 +
 platform/linux-generic/odp_queue.c                 | 34 ++++++++++++++++++++++
 2 files changed, 35 insertions(+)
diff mbox

Patch

diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index 11bc837..66aa887 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -173,6 +173,7 @@  static inline void reorder_enq(queue_entry_t *queue,
 static inline void order_release(queue_entry_t *origin_qe, int count)
 {
 	origin_qe->s.order_out += count;
+	odp_atomic_fetch_add_u64(&origin_qe->s.sync_out, count);
 }
 
 static inline void reorder_deq(queue_entry_t *queue,
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 99829b9..122146c 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -123,6 +123,8 @@  int odp_queue_init_global(void)
 		/* init locks */
 		queue_entry_t *queue = get_qentry(i);
 		LOCK_INIT(&queue->s.lock);
+		odp_atomic_init_u64(&queue->s.sync_in, 0);
+		odp_atomic_init_u64(&queue->s.sync_out, 0);
 		queue->s.handle = queue_from_id(i);
 	}
 
@@ -608,6 +610,7 @@  odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
 	if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) {
 		buf_hdr->origin_qe = queue;
 		buf_hdr->order = queue->s.order_in++;
+		buf_hdr->sync  = odp_atomic_fetch_inc_u64(&queue->s.sync_in);
 		buf_hdr->flags.sustain = 0;
 	} else {
 		buf_hdr->origin_qe = NULL;
@@ -655,6 +658,8 @@  int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
 		if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) {
 			buf_hdr[i]->origin_qe = queue;
 			buf_hdr[i]->order     = queue->s.order_in++;
+			buf_hdr[i]->sync =
+				odp_atomic_fetch_inc_u64(&queue->s.sync_in);
 			buf_hdr[i]->flags.sustain = 0;
 		} else {
 			buf_hdr[i]->origin_qe = NULL;
@@ -1007,3 +1012,32 @@  int odp_schedule_order_copy(odp_event_t src_event, odp_event_t dst_event)
 	UNLOCK(&origin_qe->s.lock);
 	return 0;
 }
+
+void odp_schedule_order_lock(odp_event_t ev)
+{
+	odp_buffer_hdr_t *buf_hdr = odp_buf_to_hdr(odp_buffer_from_event(ev));
+	queue_entry_t *origin_qe = buf_hdr->origin_qe;
+
+	/* Wait until we are in order. Note that sync_out will be incremented
+	 * both by unlocks as well as order resolution, so we're OK if only
+	 * some events in the ordered flow need to lock.
+	 */
+	while (buf_hdr->sync > odp_atomic_load_u64(&origin_qe->s.sync_out))
+		odp_spin();
+}
+
+void odp_schedule_order_unlock(odp_event_t ev)
+{
+	odp_buffer_hdr_t *buf_hdr = odp_buf_to_hdr(odp_buffer_from_event(ev));
+	queue_entry_t *origin_qe = buf_hdr->origin_qe;
+
+	/* Get a new sync order for reusability, and release the lock. Note
+	 * that this must be done in this sequence to prevent race conditions
+	 * where the next waiter could lock and unlock before we're able to
+	 * get a new sync order since that would cause order inversion on
+	 * subsequent locks we may perform on this event in this ordered
+	 * context.
+	 */
+	buf_hdr->sync = odp_atomic_fetch_inc_u64(&origin_qe->s.sync_in);
+	odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out);
+}