[v2,2/2] lib/hash: load pData after full key compare

Message ID 20190702211634.37940-3-honnappa.nagarahalli@arm.com
State New
Series lib/hash: perf improvements for lock-free

Commit Message

Honnappa Nagarahalli July 2, 2019, 9:16 p.m. UTC
When a hash entry is added, there are two sets of stores:

1) The application writes its data to the memory location whose address
is passed to the rte_hash_add_key_with_hash_data API (or NULL).
2) The rte_hash library writes to its own internal data structures:
the key store entry and the hash table.

The only ordering requirement between these two is that the store
to the application data must complete before the store to key_index.
There are no ordering requirements between the stores to the
key/signature and the store to the application data. The synchronization
point for the application data can be any point between the 'store to
application data' and the 'store to key_index'. So, 'pdata' should not
be a guard variable for the data in the hash table; it should be a guard
variable only for the application data written to the memory location
pointed to by 'pdata'. Hence, in the lookup functions, 'pdata' can be
loaded after the full key comparison succeeds.
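
The release/acquire pairing can be sketched as below. This is a minimal,
self-contained illustration and not the actual rte_cuckoo_hash code; the
'entry', 'writer_add' and 'reader_lookup' names are hypothetical, and
visibility of the key itself is guarded by key_idx in the real library.

  #include <string.h>

  struct entry {
  	char key[16];   /* key bytes; guarded by key_idx in the real code */
  	void *pdata;    /* application data pointer; guard for *pdata only */
  };

  /* Writer: the application data pointed to by app_data is assumed to be
   * fully written before this call. The store-release to pdata publishes
   * that data; the order of the key copy relative to pdata does not
   * matter for this purpose.
   */
  static void
  writer_add(struct entry *e, const char *key, void *app_data)
  {
  	__atomic_store_n(&e->pdata, app_data, __ATOMIC_RELEASE);
  	memcpy(e->key, key, sizeof(e->key));
  }

  /* Reader: compare the full key first; only on a match load pdata with
   * acquire, which pairs with the writer's release and makes the
   * application data behind the pointer visible.
   */
  static void *
  reader_lookup(struct entry *e, const char *key)
  {
  	if (memcmp(e->key, key, sizeof(e->key)) == 0)
  		return __atomic_load_n(&e->pdata, __ATOMIC_ACQUIRE);
  	return NULL;
  }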

The synchronization point for the application data (the store-release
to 'pdata' in the key store) is changed to be consistent with the order
of the loads in the lookup functions. However, this change is cosmetic
and does not affect functionality.

Fixes: e605a1d36 ("hash: add lock-free r/w concurrency")
Cc: stable@dpdk.org

Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

Reviewed-by: Gavin Hu <gavin.hu@arm.com>

Tested-by: Ruifeng Wang <ruifeng.wang@arm.com>

---
 lib/librte_hash/rte_cuckoo_hash.c | 67 +++++++++++++++----------------
 1 file changed, 32 insertions(+), 35 deletions(-)

-- 
2.17.1

Patch

diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c
index 0e042d924..55c5c1b8a 100644
--- a/lib/librte_hash/rte_cuckoo_hash.c
+++ b/lib/librte_hash/rte_cuckoo_hash.c
@@ -649,9 +649,11 @@  search_and_update(const struct rte_hash *h, void *data, const void *key,
 			k = (struct rte_hash_key *) ((char *)keys +
 					bkt->key_idx[i] * h->key_entry_size);
 			if (rte_hash_cmp_eq(key, k->key, h) == 0) {
-				/* 'pdata' acts as the synchronization point
-				 * when an existing hash entry is updated.
-				 * Key is not updated in this case.
+				/* The store to application data at *data
+				 * should not leak after the store to pdata
+				 * in the key store. i.e. pdata is the guard
+				 * variable. Release the application data
+				 * to the readers.
 				 */
 				__atomic_store_n(&k->pdata,
 					data,
@@ -711,11 +713,10 @@  rte_hash_cuckoo_insert_mw(const struct rte_hash *h,
 		/* Check if slot is available */
 		if (likely(prim_bkt->key_idx[i] == EMPTY_SLOT)) {
 			prim_bkt->sig_current[i] = sig;
-			/* Key can be of arbitrary length, so it is
-			 * not possible to store it atomically.
-			 * Hence the new key element's memory stores
-			 * (key as well as data) should be complete
-			 * before it is referenced.
+			/* Store to signature and key should not
+			 * leak after the store to key_idx. i.e.
+			 * key_idx is the guard variable for signature
+			 * and key.
 			 */
 			__atomic_store_n(&prim_bkt->key_idx[i],
 					 new_idx,
@@ -990,17 +991,15 @@  __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 
 	new_k = RTE_PTR_ADD(keys, (uintptr_t)slot_id * h->key_entry_size);
 	new_idx = (uint32_t)((uintptr_t) slot_id);
-	/* Copy key */
-	memcpy(new_k->key, key, h->key_len);
-	/* Key can be of arbitrary length, so it is not possible to store
-	 * it atomically. Hence the new key element's memory stores
-	 * (key as well as data) should be complete before it is referenced.
-	 * 'pdata' acts as the synchronization point when an existing hash
-	 * entry is updated.
+	/* The store to application data (by the application) at *data should
+	 * not leak after the store of pdata in the key store. i.e. pdata is
+	 * the guard variable. Release the application data to the readers.
 	 */
 	__atomic_store_n(&new_k->pdata,
 		data,
 		__ATOMIC_RELEASE);
+	/* Copy key */
+	memcpy(new_k->key, key, h->key_len);
 
 	/* Find an empty slot and insert */
 	ret = rte_hash_cuckoo_insert_mw(h, prim_bkt, sec_bkt, key, data,
@@ -1064,8 +1063,10 @@  __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 			/* Check if slot is available */
 			if (likely(cur_bkt->key_idx[i] == EMPTY_SLOT)) {
 				cur_bkt->sig_current[i] = short_sig;
-				/* Store to signature should not leak after
-				 * the store to key_idx
+				/* Store to signature and key should not
+				 * leak after the store to key_idx. i.e.
+				 * key_idx is the guard variable for signature
+				 * and key.
 				 */
 				__atomic_store_n(&cur_bkt->key_idx[i],
 						 new_idx,
@@ -1087,8 +1088,9 @@  __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 	bkt_id = (uint32_t)((uintptr_t)ext_bkt_id) - 1;
 	/* Use the first location of the new bucket */
 	(h->buckets_ext[bkt_id]).sig_current[0] = short_sig;
-	/* Store to signature should not leak after
-	 * the store to key_idx
+	/* Store to signature and key should not leak after
+	 * the store to key_idx. i.e. key_idx is the guard variable
+	 * for signature and key.
 	 */
 	__atomic_store_n(&(h->buckets_ext[bkt_id]).key_idx[0],
 			 new_idx,
@@ -1184,7 +1186,6 @@  search_one_bucket_lf(const struct rte_hash *h, const void *key, uint16_t sig,
 {
 	int i;
 	uint32_t key_idx;
-	void *pdata;
 	struct rte_hash_key *k, *keys = h->key_store;
 
 	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
@@ -1201,12 +1202,13 @@  search_one_bucket_lf(const struct rte_hash *h, const void *key, uint16_t sig,
 			if (key_idx != EMPTY_SLOT) {
 				k = (struct rte_hash_key *) ((char *)keys +
 						key_idx * h->key_entry_size);
-				pdata = __atomic_load_n(&k->pdata,
-						__ATOMIC_ACQUIRE);
 
 				if (rte_hash_cmp_eq(key, k->key, h) == 0) {
-					if (data != NULL)
-						*data = pdata;
+					if (data != NULL) {
+						*data = __atomic_load_n(
+							&k->pdata,
+							__ATOMIC_ACQUIRE);
+					}
 					/*
 					 * Return index where key is stored,
 					 * subtracting the first dummy index
@@ -1904,7 +1906,6 @@  __rte_hash_lookup_bulk_lf(const struct rte_hash *h, const void **keys,
 	uint32_t prim_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0};
 	uint32_t sec_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0};
 	struct rte_hash_bucket *cur_bkt, *next_bkt;
-	void *pdata[RTE_HASH_LOOKUP_BULK_MAX];
 	uint32_t cnt_b, cnt_a;
 
 	/* Prefetch first keys */
@@ -2006,10 +2007,6 @@  __rte_hash_lookup_bulk_lf(const struct rte_hash *h, const void **keys,
 					(const char *)h->key_store +
 					key_idx * h->key_entry_size);
 
-				if (key_idx != EMPTY_SLOT)
-					pdata[i] = __atomic_load_n(
-							&key_slot->pdata,
-							__ATOMIC_ACQUIRE);
 				/*
 				 * If key index is 0, do not compare key,
 				 * as it is checking the dummy slot
@@ -2018,7 +2015,9 @@  __rte_hash_lookup_bulk_lf(const struct rte_hash *h, const void **keys,
 					!rte_hash_cmp_eq(
 						key_slot->key, keys[i], h)) {
 					if (data != NULL)
-						data[i] = pdata[i];
+						data[i] = __atomic_load_n(
+							&key_slot->pdata,
+							__ATOMIC_ACQUIRE);
 
 					hits |= 1ULL << i;
 					positions[i] = key_idx - 1;
@@ -2040,10 +2039,6 @@  __rte_hash_lookup_bulk_lf(const struct rte_hash *h, const void **keys,
 					(const char *)h->key_store +
 					key_idx * h->key_entry_size);
 
-				if (key_idx != EMPTY_SLOT)
-					pdata[i] = __atomic_load_n(
-							&key_slot->pdata,
-							__ATOMIC_ACQUIRE);
 				/*
 				 * If key index is 0, do not compare key,
 				 * as it is checking the dummy slot
@@ -2053,7 +2048,9 @@  __rte_hash_lookup_bulk_lf(const struct rte_hash *h, const void **keys,
 					!rte_hash_cmp_eq(
 						key_slot->key, keys[i], h)) {
 					if (data != NULL)
-						data[i] = pdata[i];
+						data[i] = __atomic_load_n(
+							&key_slot->pdata,
+							__ATOMIC_ACQUIRE);
 
 					hits |= 1ULL << i;
 					positions[i] = key_idx - 1;