diff mbox

[API-NEXT,8/8] linux-generic: schedule: implement ordered locks

Message ID 1440389196-28814-9-git-send-email-bill.fischofer@linaro.org
State New
Headers show

Commit Message

Bill Fischofer Aug. 24, 2015, 4:06 a.m. UTC
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 .../include/odp/plat/schedule_types.h              |  2 --
 .../linux-generic/include/odp_buffer_internal.h    |  3 ++
 .../linux-generic/include/odp_queue_internal.h     |  2 ++
 platform/linux-generic/odp_queue.c                 | 39 ++++++++++++++++++++++
 platform/linux-generic/odp_schedule.c              |  7 ++++
 5 files changed, 51 insertions(+), 2 deletions(-)
diff mbox

Patch

diff --git a/platform/linux-generic/include/odp/plat/schedule_types.h b/platform/linux-generic/include/odp/plat/schedule_types.h
index f13bfab..3665fec 100644
--- a/platform/linux-generic/include/odp/plat/schedule_types.h
+++ b/platform/linux-generic/include/odp/plat/schedule_types.h
@@ -52,8 +52,6 @@  typedef int odp_schedule_group_t;
 
 #define ODP_SCHED_GROUP_NAME_LEN 32
 
-typedef int odp_schedule_olock_t;
-
 /**
  * @}
  */
diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
index ca4d314..6badeba 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -140,7 +140,10 @@  typedef struct odp_buffer_hdr_t {
 	void                    *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
 	uint64_t                 order;      /* sequence for ordered queues */
 	queue_entry_t           *origin_qe;  /* ordered queue origin */
+	union {
 		queue_entry_t   *target_qe;  /* ordered queue target */
+		uint64_t         sync;       /* for ordered synchronization */
+	};
 } odp_buffer_hdr_t;
 
 /** @internal Compile time assert that the
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index aab36be..4cee9b6 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -122,6 +122,7 @@  int queue_sched_atomic(odp_queue_t handle);
 int release_order(queue_entry_t *origin_qe, uint64_t order,
 		  odp_pool_t pool, int enq_called);
 void get_sched_order(queue_entry_t **origin_qe, uint64_t *order);
+void get_sched_sync(queue_entry_t **origin_qe, uint64_t **sync);
 void sched_enq_called(void);
 void sched_order_resolved(odp_buffer_hdr_t *buf_hdr);
 
@@ -193,6 +194,7 @@  static inline void reorder_enq(queue_entry_t *queue,
 static inline void order_release(queue_entry_t *origin_qe, int count)
 {
 	origin_qe->s.order_out += count;
+	odp_atomic_fetch_add_u64(&origin_qe->s.sync_out, count);
 }
 
 static inline int reorder_deq(queue_entry_t *queue,
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 6b5eee4..8a673d0 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -626,6 +626,7 @@  odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
 	if (queue_is_ordered(queue)) {
 		buf_hdr->origin_qe = queue;
 		buf_hdr->order = queue->s.order_in++;
+		buf_hdr->sync  = odp_atomic_fetch_inc_u64(&queue->s.sync_in);
 		buf_hdr->flags.sustain = 0;
 	} else {
 		buf_hdr->origin_qe = NULL;
@@ -673,6 +674,8 @@  int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
 		if (queue_is_ordered(queue)) {
 			buf_hdr[i]->origin_qe = queue;
 			buf_hdr[i]->order     = queue->s.order_in++;
+			buf_hdr[i]->sync =
+				odp_atomic_fetch_inc_u64(&queue->s.sync_in);
 			buf_hdr[i]->flags.sustain = 0;
 		} else {
 			buf_hdr[i]->origin_qe = NULL;
@@ -985,3 +988,39 @@  int release_order(queue_entry_t *origin_qe, uint64_t order,
 	UNLOCK(&origin_qe->s.lock);
 	return 0;
 }
+
+void odp_schedule_order_lock(void)
+{
+	queue_entry_t *origin_qe;
+	uint64_t *sync;
+
+	get_sched_sync(&origin_qe, &sync);
+	if (!origin_qe)
+		return;
+
+	/* Wait until we are in order. Note that sync_out will be incremented
+	 * both by unlocks as well as order resolution, so we're OK if only
+	 * some events in the ordered flow need to lock.
+	 */
+	while (*sync > odp_atomic_load_u64(&origin_qe->s.sync_out))
+		odp_spin();
+}
+
+void odp_schedule_order_unlock(void)
+{
+	queue_entry_t *origin_qe;
+	uint64_t *sync;
+
+	get_sched_sync(&origin_qe, &sync);
+	if (!origin_qe)
+		return;
+
+	/* Get a new sync order for reusability, and release the lock. Note
+	 * that this must be done in this sequence to prevent race conditions
+	 * where the next waiter could lock and unlock before we're able to
+	 * get a new sync order since that would cause order inversion on
+	 * subsequent locks we may perform in this ordered context.
+	 */
+	*sync = odp_atomic_fetch_inc_u64(&origin_qe->s.sync_in);
+	odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out);
+}
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index f816460..4087730 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -84,6 +84,7 @@  typedef struct {
 	queue_entry_t *qe;
 	queue_entry_t *origin_qe;
 	uint64_t order;
+	uint64_t sync;
 	odp_pool_t pool;
 	int enq_called;
 	int num;
@@ -795,6 +796,12 @@  void get_sched_order(queue_entry_t **origin_qe, uint64_t *order)
 	*order     = sched_local.order;
 }
 
+void get_sched_sync(queue_entry_t **origin_qe, uint64_t **sync)
+{
+	*origin_qe = sched_local.origin_qe;
+	*sync      = &sched_local.sync;
+}
+
 void sched_order_resolved(odp_buffer_hdr_t *buf_hdr)
 {
 	if (buf_hdr)