@@ -48,6 +48,28 @@ typedef struct queue_table_t {
static queue_table_t *queue_tbl;
+static inline void get_qe_locks(queue_entry_t *qe1, queue_entry_t *qe2)
+{
+ /* Special case: enq to self */
+ if (qe1 == qe2) {
+ LOCK(&qe1->s.lock);
+ return;
+ }
+
+ /* Since any queue can be either a source or target, queues do not have
+ * a natural locking hierarchy. Create one by using the qentry address
+ * as the ordering mechanism.
+ */
+
+ if (qe1 < qe2) {
+ LOCK(&qe1->s.lock);
+ LOCK(&qe2->s.lock);
+ } else {
+ LOCK(&qe2->s.lock);
+ LOCK(&qe1->s.lock);
+ }
+}
+
queue_entry_t *get_qentry(uint32_t queue_id)
{
return &queue_tbl->queue[queue_id];
@@ -370,14 +392,11 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
/* Need two locks for enq operations from ordered queues */
if (origin_qe) {
- LOCK(&origin_qe->s.lock);
- while (!LOCK_TRY(&queue->s.lock)) {
- UNLOCK(&origin_qe->s.lock);
- LOCK(&origin_qe->s.lock);
- }
+ get_qe_locks(origin_qe, queue);
if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
UNLOCK(&queue->s.lock);
- UNLOCK(&origin_qe->s.lock);
+ if (origin_qe != queue)
+ UNLOCK(&origin_qe->s.lock);
ODP_ERR("Bad origin queue status\n");
ODP_ERR("queue = %s, origin q = %s, buf = %p\n",
queue->s.name, origin_qe->s.name, buf_hdr);
@@ -389,7 +408,7 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
UNLOCK(&queue->s.lock);
- if (origin_qe)
+ if (origin_qe && origin_qe != queue)
UNLOCK(&origin_qe->s.lock);
ODP_ERR("Bad queue status\n");
return -1;
@@ -405,7 +424,8 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
* we're done here.
*/
UNLOCK(&queue->s.lock);
- UNLOCK(&origin_qe->s.lock);
+ if (origin_qe != queue)
+ UNLOCK(&origin_qe->s.lock);
return 0;
}
@@ -477,7 +497,8 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
/* Now handle any unblocked complete buffers destined for
* other queues, appending placeholder bufs as needed.
*/
- UNLOCK(&queue->s.lock);
+ if (origin_qe != queue)
+ UNLOCK(&queue->s.lock);
reorder_complete(origin_qe, &reorder_buf, &placeholder_buf,
1, 0);
UNLOCK(&origin_qe->s.lock);
Enqueueing to ordered queues requires two locks (source and target queues). Since any queue can be either source or target, queues do not have a natural locking hierarchy. Create one by using the address of the qentry as the lock order. This addresses the aspect of Bug https://bugs.linaro.org/show_bug.cgi?id=1879 relating to deadlock in unicore systems. Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org> --- platform/linux-generic/odp_queue.c | 39 +++++++++++++++++++++++++++++--------- 1 file changed, 30 insertions(+), 9 deletions(-)