@@ -140,7 +140,10 @@ typedef struct odp_buffer_hdr_t {
void *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
uint64_t order; /* sequence for ordered queues */
queue_entry_t *origin_qe; /* ordered queue origin */
+ union {
queue_entry_t *target_qe; /* ordered queue target */
+ uint64_t sync; /* for ordered synchronization */
+ };
} odp_buffer_hdr_t;
/** @internal Compile time assert that the
@@ -82,6 +82,8 @@ struct queue_entry_s {
uint64_t order_out;
odp_buffer_hdr_t *reorder_head;
odp_buffer_hdr_t *reorder_tail;
+ odp_atomic_u64_t sync_in;
+ odp_atomic_u64_t sync_out;
};
typedef union queue_entry_u {
@@ -120,6 +122,7 @@ int queue_sched_atomic(odp_queue_t handle);
int release_order(queue_entry_t *origin_qe, uint64_t order,
odp_pool_t pool, int enq_called);
void get_sched_order(queue_entry_t **origin_qe, uint64_t *order);
+void get_sched_sync(queue_entry_t **origin_qe, uint64_t **sync);
void sched_enq_called(void);
void sched_order_resolved(odp_buffer_hdr_t *buf_hdr);
@@ -123,6 +123,8 @@ int odp_queue_init_global(void)
/* init locks */
queue_entry_t *queue = get_qentry(i);
LOCK_INIT(&queue->s.lock);
+ odp_atomic_init_u64(&queue->s.sync_in, 0);
+ odp_atomic_init_u64(&queue->s.sync_out, 0);
queue->s.handle = queue_from_id(i);
}
@@ -629,6 +631,7 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
if (queue_is_ordered(queue)) {
buf_hdr->origin_qe = queue;
buf_hdr->order = queue->s.order_in++;
+ buf_hdr->sync = odp_atomic_fetch_inc_u64(&queue->s.sync_in);
buf_hdr->flags.sustain = 0;
} else {
buf_hdr->origin_qe = NULL;
@@ -676,6 +679,8 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
if (queue_is_ordered(queue)) {
buf_hdr[i]->origin_qe = queue;
buf_hdr[i]->order = queue->s.order_in++;
+ buf_hdr[i]->sync =
+ odp_atomic_fetch_inc_u64(&queue->s.sync_in);
buf_hdr[i]->flags.sustain = 0;
} else {
buf_hdr[i]->origin_qe = NULL;
@@ -990,3 +995,39 @@ int release_order(queue_entry_t *origin_qe, uint64_t order,
UNLOCK(&origin_qe->s.lock);
return 0;
}
+
+void odp_schedule_order_lock(void)
+{
+ queue_entry_t *origin_qe;
+ uint64_t *sync;
+
+ get_sched_sync(&origin_qe, &sync);
+ if (!origin_qe)
+ return;
+
+ /* Wait until we are in order. Note that sync_out will be incremented
+ * both by unlocks as well as order resolution, so we're OK if only
+ * some events in the ordered flow need to lock.
+ */
+ while (*sync > odp_atomic_load_u64(&origin_qe->s.sync_out))
+ odp_spin();
+}
+
+void odp_schedule_order_unlock(void)
+{
+ queue_entry_t *origin_qe;
+ uint64_t *sync;
+
+ get_sched_sync(&origin_qe, &sync);
+ if (!origin_qe)
+ return;
+
+ /* Get a new sync order for reusability, and release the lock. Note
+ * that this must be done in this sequence to prevent race conditions
+ * where the next waiter could lock and unlock before we're able to
+ * get a new sync order since that would cause order inversion on
+ * subsequent locks we may perform in this ordered context.
+ */
+ *sync = odp_atomic_fetch_inc_u64(&origin_qe->s.sync_in);
+ odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out);
+}
@@ -84,6 +84,7 @@ typedef struct {
queue_entry_t *qe;
queue_entry_t *origin_qe;
uint64_t order;
+ uint64_t sync;
odp_pool_t pool;
int enq_called;
int num;
@@ -796,6 +797,12 @@ void get_sched_order(queue_entry_t **origin_qe, uint64_t *order)
*order = sched_local.order;
}
+void get_sched_sync(queue_entry_t **origin_qe, uint64_t **sync)
+{
+ *origin_qe = sched_local.origin_qe;
+ *sync = &sched_local.sync;
+}
+
void sched_order_resolved(odp_buffer_hdr_t *buf_hdr)
{
if (buf_hdr)
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org> --- .../linux-generic/include/odp_buffer_internal.h | 3 ++ .../linux-generic/include/odp_queue_internal.h | 3 ++ platform/linux-generic/odp_queue.c | 41 ++++++++++++++++++++++ platform/linux-generic/odp_schedule.c | 7 ++++ 4 files changed, 54 insertions(+)