@@ -48,6 +48,36 @@ typedef struct queue_table_t {
static queue_table_t *queue_tbl;
+static inline void get_qe_locks(queue_entry_t *qe1, queue_entry_t *qe2)
+{
+ int i;
+
+ /* Special case: enq to self */
+ if (qe1 == qe2) {
+ LOCK(&qe1->s.lock);
+ return;
+ }
+
+ /* Enqueue to another queue. Because any queue can be either the
+ * origin or the target of an enqueue, no static lock hierarchy is
+ * possible. The strategy is to take the first lock, then try to take
+ * the second; if the second attempt fails, release the first and
+ * retry. Note that in single-CPU mode the explicit yield is required:
+ * without it we might never make progress, unless the scheduler
+ * happens to timeslice exactly while we hold no lock.
+ */
+ while (1) {
+ for (i = 0; i < 10; i++) {
+ LOCK(&qe1->s.lock);
+ if (LOCK_TRY(&qe2->s.lock))
+ return;
+ UNLOCK(&qe1->s.lock);
+ odp_sync_stores();
+ }
+ sched_yield();
+ }
+}
+
queue_entry_t *get_qentry(uint32_t queue_id)
{
return &queue_tbl->queue[queue_id];
@@ -370,14 +400,11 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
/* Need two locks for enq operations from ordered queues */
if (origin_qe) {
- LOCK(&origin_qe->s.lock);
- while (!LOCK_TRY(&queue->s.lock)) {
- UNLOCK(&origin_qe->s.lock);
- LOCK(&origin_qe->s.lock);
- }
+ get_qe_locks(origin_qe, queue);
if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
UNLOCK(&queue->s.lock);
- UNLOCK(&origin_qe->s.lock);
+ if (origin_qe != queue)
+ UNLOCK(&origin_qe->s.lock);
ODP_ERR("Bad origin queue status\n");
ODP_ERR("queue = %s, origin q = %s, buf = %p\n",
queue->s.name, origin_qe->s.name, buf_hdr);
@@ -389,7 +416,7 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
UNLOCK(&queue->s.lock);
- if (origin_qe)
+ if (origin_qe && origin_qe != queue)
UNLOCK(&origin_qe->s.lock);
ODP_ERR("Bad queue status\n");
return -1;
@@ -405,7 +432,8 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
* we're done here.
*/
UNLOCK(&queue->s.lock);
- UNLOCK(&origin_qe->s.lock);
+ if (origin_qe != queue)
+ UNLOCK(&origin_qe->s.lock);
return 0;
}
@@ -477,7 +505,8 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
/* Now handle any unblocked complete buffers destined for
* other queues, appending placeholder bufs as needed.
*/
- UNLOCK(&queue->s.lock);
+ if (origin_qe != queue)
+ UNLOCK(&queue->s.lock);
reorder_complete(origin_qe, &reorder_buf, &placeholder_buf,
1, 0);
UNLOCK(&origin_qe->s.lock);