Message ID | 20190423043130.18153-2-honnappa.nagarahalli@arm.com |
---|---|
State | Superseded |
Headers | show |
Series | lib/rcu: add RCU library supporting QSBR mechanism | expand |
On Mon, Apr 22, 2019 at 11:31:28PM -0500, Honnappa Nagarahalli wrote: > Add RCU library supporting quiescent state based memory reclamation method. > This library helps identify the quiescent state of the reader threads so > that the writers can free the memory associated with the lock less data > structures. > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com> > Reviewed-by: Steve Capper <steve.capper@arm.com> > Reviewed-by: Gavin Hu <gavin.hu@arm.com> > Reviewed-by: Ola Liljedahl <ola.liljedahl@arm.com> > Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com> Much better! Acked-by: Paul E. McKenney <paulmck@linux.ibm.com> > --- > MAINTAINERS | 5 + > config/common_base | 6 + > lib/Makefile | 2 + > lib/librte_rcu/Makefile | 23 ++ > lib/librte_rcu/meson.build | 7 + > lib/librte_rcu/rte_rcu_qsbr.c | 268 ++++++++++++ > lib/librte_rcu/rte_rcu_qsbr.h | 639 +++++++++++++++++++++++++++++ > lib/librte_rcu/rte_rcu_version.map | 12 + > lib/meson.build | 2 +- > mk/rte.app.mk | 1 + > 10 files changed, 964 insertions(+), 1 deletion(-) > create mode 100644 lib/librte_rcu/Makefile > create mode 100644 lib/librte_rcu/meson.build > create mode 100644 lib/librte_rcu/rte_rcu_qsbr.c > create mode 100644 lib/librte_rcu/rte_rcu_qsbr.h > create mode 100644 lib/librte_rcu/rte_rcu_version.map > > diff --git a/MAINTAINERS b/MAINTAINERS > index a08583471..ae54f37db 100644 > --- a/MAINTAINERS > +++ b/MAINTAINERS > @@ -1274,6 +1274,11 @@ F: examples/bpf/ > F: app/test/test_bpf.c > F: doc/guides/prog_guide/bpf_lib.rst > > +RCU - EXPERIMENTAL > +M: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com> > +F: lib/librte_rcu/ > +F: doc/guides/prog_guide/rcu_lib.rst > + > > Test Applications > ----------------- > diff --git a/config/common_base b/config/common_base > index 7fb0dedb6..f50d26c30 100644 > --- a/config/common_base > +++ b/config/common_base > @@ -834,6 +834,12 @@ CONFIG_RTE_LIBRTE_LATENCY_STATS=y > # > CONFIG_RTE_LIBRTE_TELEMETRY=n > > +# > +# Compile librte_rcu > 
+# > +CONFIG_RTE_LIBRTE_RCU=y > +CONFIG_RTE_LIBRTE_RCU_DEBUG=n > + > # > # Compile librte_lpm > # > diff --git a/lib/Makefile b/lib/Makefile > index 26021d0c0..791e0d991 100644 > --- a/lib/Makefile > +++ b/lib/Makefile > @@ -111,6 +111,8 @@ DIRS-$(CONFIG_RTE_LIBRTE_IPSEC) += librte_ipsec > DEPDIRS-librte_ipsec := librte_eal librte_mbuf librte_cryptodev librte_security > DIRS-$(CONFIG_RTE_LIBRTE_TELEMETRY) += librte_telemetry > DEPDIRS-librte_telemetry := librte_eal librte_metrics librte_ethdev > +DIRS-$(CONFIG_RTE_LIBRTE_RCU) += librte_rcu > +DEPDIRS-librte_rcu := librte_eal > > ifeq ($(CONFIG_RTE_EXEC_ENV_LINUX),y) > DIRS-$(CONFIG_RTE_LIBRTE_KNI) += librte_kni > diff --git a/lib/librte_rcu/Makefile b/lib/librte_rcu/Makefile > new file mode 100644 > index 000000000..6aa677bd1 > --- /dev/null > +++ b/lib/librte_rcu/Makefile > @@ -0,0 +1,23 @@ > +# SPDX-License-Identifier: BSD-3-Clause > +# Copyright(c) 2018 Arm Limited > + > +include $(RTE_SDK)/mk/rte.vars.mk > + > +# library name > +LIB = librte_rcu.a > + > +CFLAGS += -DALLOW_EXPERIMENTAL_API > +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 > +LDLIBS += -lrte_eal > + > +EXPORT_MAP := rte_rcu_version.map > + > +LIBABIVER := 1 > + > +# all source are stored in SRCS-y > +SRCS-$(CONFIG_RTE_LIBRTE_RCU) := rte_rcu_qsbr.c > + > +# install includes > +SYMLINK-$(CONFIG_RTE_LIBRTE_RCU)-include := rte_rcu_qsbr.h > + > +include $(RTE_SDK)/mk/rte.lib.mk > diff --git a/lib/librte_rcu/meson.build b/lib/librte_rcu/meson.build > new file mode 100644 > index 000000000..0c2d5a2e0 > --- /dev/null > +++ b/lib/librte_rcu/meson.build > @@ -0,0 +1,7 @@ > +# SPDX-License-Identifier: BSD-3-Clause > +# Copyright(c) 2018 Arm Limited > + > +allow_experimental_apis = true > + > +sources = files('rte_rcu_qsbr.c') > +headers = files('rte_rcu_qsbr.h') > diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c > new file mode 100644 > index 000000000..50ef0ba9a > --- /dev/null > +++ b/lib/librte_rcu/rte_rcu_qsbr.c > @@ -0,0 +1,268 @@ 
> +/* SPDX-License-Identifier: BSD-3-Clause > + * > + * Copyright (c) 2018 Arm Limited > + */ > + > +#include <stdio.h> > +#include <string.h> > +#include <stdint.h> > +#include <errno.h> > + > +#include <rte_common.h> > +#include <rte_log.h> > +#include <rte_memory.h> > +#include <rte_malloc.h> > +#include <rte_eal.h> > +#include <rte_eal_memconfig.h> > +#include <rte_atomic.h> > +#include <rte_per_lcore.h> > +#include <rte_lcore.h> > +#include <rte_errno.h> > + > +#include "rte_rcu_qsbr.h" > + > +/* Get the memory size of QSBR variable */ > +size_t __rte_experimental > +rte_rcu_qsbr_get_memsize(uint32_t max_threads) > +{ > + size_t sz; > + > + if (max_threads == 0) { > + rte_log(RTE_LOG_ERR, rcu_log_type, > + "%s(): Invalid max_threads %u\n", > + __func__, max_threads); > + rte_errno = EINVAL; > + > + return 1; > + } > + > + sz = sizeof(struct rte_rcu_qsbr); > + > + /* Add the size of quiescent state counter array */ > + sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads; > + > + /* Add the size of the registered thread ID bitmap array */ > + sz += RTE_QSBR_THRID_ARRAY_SIZE(max_threads); > + > + return sz; > +} > + > +/* Initialize a quiescent state variable */ > +int __rte_experimental > +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads) > +{ > + size_t sz; > + > + if (v == NULL) { > + rte_log(RTE_LOG_ERR, rcu_log_type, > + "%s(): Invalid input parameter\n", __func__); > + rte_errno = EINVAL; > + > + return 1; > + } > + > + sz = rte_rcu_qsbr_get_memsize(max_threads); > + if (sz == 1) > + return 1; > + > + /* Set all the threads to offline */ > + memset(v, 0, sz); > + v->max_threads = max_threads; > + v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads, > + RTE_QSBR_THRID_ARRAY_ELM_SIZE) / > + RTE_QSBR_THRID_ARRAY_ELM_SIZE; > + v->token = RTE_QSBR_CNT_INIT; > + > + return 0; > +} > + > +/* Register a reader thread to report its quiescent state > + * on a QS variable. 
> + */ > +int __rte_experimental > +rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id) > +{ > + unsigned int i, id, success; > + uint64_t old_bmap, new_bmap; > + > + if (v == NULL || thread_id >= v->max_threads) { > + rte_log(RTE_LOG_ERR, rcu_log_type, > + "%s(): Invalid input parameter\n", __func__); > + rte_errno = EINVAL; > + > + return 1; > + } > + > + RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n", > + v->qsbr_cnt[thread_id].lock_cnt); > + > + id = thread_id & RTE_QSBR_THRID_MASK; > + i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT; > + > + /* Make sure that the counter for registered threads does not > + * go out of sync. Hence, additional checks are required. > + */ > + /* Check if the thread is already registered */ > + old_bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i), > + __ATOMIC_RELAXED); > + if (old_bmap & 1UL << id) > + return 0; > + > + do { > + new_bmap = old_bmap | (1UL << id); > + success = __atomic_compare_exchange( > + RTE_QSBR_THRID_ARRAY_ELM(v, i), > + &old_bmap, &new_bmap, 0, > + __ATOMIC_RELEASE, __ATOMIC_RELAXED); > + > + if (success) > + __atomic_fetch_add(&v->num_threads, > + 1, __ATOMIC_RELAXED); > + else if (old_bmap & (1UL << id)) > + /* Someone else registered this thread. > + * Counter should not be incremented. > + */ > + return 0; > + } while (success == 0); > + > + return 0; > +} > + > +/* Remove a reader thread, from the list of threads reporting their > + * quiescent state on a QS variable. 
> + */ > +int __rte_experimental > +rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id) > +{ > + unsigned int i, id, success; > + uint64_t old_bmap, new_bmap; > + > + if (v == NULL || thread_id >= v->max_threads) { > + rte_log(RTE_LOG_ERR, rcu_log_type, > + "%s(): Invalid input parameter\n", __func__); > + rte_errno = EINVAL; > + > + return 1; > + } > + > + RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n", > + v->qsbr_cnt[thread_id].lock_cnt); > + > + id = thread_id & RTE_QSBR_THRID_MASK; > + i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT; > + > + /* Make sure that the counter for registered threads does not > + * go out of sync. Hence, additional checks are required. > + */ > + /* Check if the thread is already unregistered */ > + old_bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i), > + __ATOMIC_RELAXED); > + if (old_bmap & ~(1UL << id)) > + return 0; > + > + do { > + new_bmap = old_bmap & ~(1UL << id); > + /* Make sure any loads of the shared data structure are > + * completed before removal of the thread from the list of > + * reporting threads. > + */ > + success = __atomic_compare_exchange( > + RTE_QSBR_THRID_ARRAY_ELM(v, i), > + &old_bmap, &new_bmap, 0, > + __ATOMIC_RELEASE, __ATOMIC_RELAXED); > + > + if (success) > + __atomic_fetch_sub(&v->num_threads, > + 1, __ATOMIC_RELAXED); > + else if (old_bmap & ~(1UL << id)) > + /* Someone else unregistered this thread. > + * Counter should not be incremented. > + */ > + return 0; > + } while (success == 0); > + > + return 0; > +} > + > +/* Wait till the reader threads have entered quiescent state. */ > +void __rte_experimental > +rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id) > +{ > + uint64_t t; > + > + RTE_ASSERT(v != NULL); > + > + t = rte_rcu_qsbr_start(v); > + > + /* If the current thread has readside critical section, > + * update its quiescent state status. 
> + */ > + if (thread_id != RTE_QSBR_THRID_INVALID) > + rte_rcu_qsbr_quiescent(v, thread_id); > + > + /* Wait for other readers to enter quiescent state */ > + rte_rcu_qsbr_check(v, t, true); > +} > + > +/* Dump the details of a single quiescent state variable to a file. */ > +int __rte_experimental > +rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v) > +{ > + uint64_t bmap; > + uint32_t i, t, id; > + > + if (v == NULL || f == NULL) { > + rte_log(RTE_LOG_ERR, rcu_log_type, > + "%s(): Invalid input parameter\n", __func__); > + rte_errno = EINVAL; > + > + return 1; > + } > + > + fprintf(f, "\nQuiescent State Variable @%p\n", v); > + > + fprintf(f, " QS variable memory size = %lu\n", > + rte_rcu_qsbr_get_memsize(v->max_threads)); > + fprintf(f, " Given # max threads = %u\n", v->max_threads); > + fprintf(f, " Current # threads = %u\n", v->num_threads); > + > + fprintf(f, " Registered thread ID mask = 0x"); > + for (i = 0; i < v->num_elems; i++) > + fprintf(f, "%lx", __atomic_load_n( > + RTE_QSBR_THRID_ARRAY_ELM(v, i), > + __ATOMIC_ACQUIRE)); > + fprintf(f, "\n"); > + > + fprintf(f, " Token = %lu\n", > + __atomic_load_n(&v->token, __ATOMIC_ACQUIRE)); > + > + fprintf(f, "Quiescent State Counts for readers:\n"); > + for (i = 0; i < v->num_elems; i++) { > + bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i), > + __ATOMIC_ACQUIRE); > + id = i << RTE_QSBR_THRID_INDEX_SHIFT; > + while (bmap) { > + t = __builtin_ctzl(bmap); > + fprintf(f, "thread ID = %d, count = %lu, lock count = %u\n", > + id + t, > + __atomic_load_n( > + &v->qsbr_cnt[id + t].cnt, > + __ATOMIC_RELAXED), > + __atomic_load_n( > + &v->qsbr_cnt[id + t].lock_cnt, > + __ATOMIC_RELAXED)); > + bmap &= ~(1UL << t); > + } > + } > + > + return 0; > +} > + > +int rcu_log_type; > + > +RTE_INIT(rte_rcu_register) > +{ > + rcu_log_type = rte_log_register("lib.rcu"); > + if (rcu_log_type >= 0) > + rte_log_set_level(rcu_log_type, RTE_LOG_ERR); > +} > diff --git a/lib/librte_rcu/rte_rcu_qsbr.h 
b/lib/librte_rcu/rte_rcu_qsbr.h > new file mode 100644 > index 000000000..58fb95ed0 > --- /dev/null > +++ b/lib/librte_rcu/rte_rcu_qsbr.h > @@ -0,0 +1,639 @@ > +/* SPDX-License-Identifier: BSD-3-Clause > + * Copyright (c) 2018 Arm Limited > + */ > + > +#ifndef _RTE_RCU_QSBR_H_ > +#define _RTE_RCU_QSBR_H_ > + > +/** > + * @file > + * RTE Quiescent State Based Reclamation (QSBR) > + * > + * Quiescent State (QS) is any point in the thread execution > + * where the thread does not hold a reference to a data structure > + * in shared memory. While using lock-less data structures, the writer > + * can safely free memory once all the reader threads have entered > + * quiescent state. > + * > + * This library provides the ability for the readers to report quiescent > + * state and for the writers to identify when all the readers have > + * entered quiescent state. > + */ > + > +#ifdef __cplusplus > +extern "C" { > +#endif > + > +#include <stdio.h> > +#include <stdint.h> > +#include <errno.h> > +#include <rte_common.h> > +#include <rte_memory.h> > +#include <rte_lcore.h> > +#include <rte_debug.h> > +#include <rte_atomic.h> > + > +extern int rcu_log_type; > + > +#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG > +#define RCU_DP_LOG(level, fmt, args...) \ > + rte_log(RTE_LOG_ ## level, rcu_log_type, \ > + "%s(): " fmt "\n", __func__, ## args) > +#else > +#define RCU_DP_LOG(level, fmt, args...) > +#endif > + > +#if defined(RTE_LIBRTE_RCU_DEBUG) > +#define RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do {\ > + if (v->qsbr_cnt[thread_id].lock_cnt) \ > + rte_log(RTE_LOG_ ## level, rcu_log_type, \ > + "%s(): " fmt "\n", __func__, ## args); \ > +} while (0) > +#else > +#define RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) > +#endif > + > +/* Registered thread IDs are stored as a bitmap of 64b element array. > + * Given thread id needs to be converted to index into the array and > + * the id within the array element. 
> + */ > +#define RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8) > +#define RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \ > + RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \ > + RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE) > +#define RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \ > + ((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i) > +#define RTE_QSBR_THRID_INDEX_SHIFT 6 > +#define RTE_QSBR_THRID_MASK 0x3f > +#define RTE_QSBR_THRID_INVALID 0xffffffff > + > +/* Worker thread counter */ > +struct rte_rcu_qsbr_cnt { > + uint64_t cnt; > + /**< Quiescent state counter. Value 0 indicates the thread is offline > + * 64b counter is used to avoid adding more code to address > + * counter overflow. Changing this to 32b would require additional > + * changes to various APIs. > + */ > + uint32_t lock_cnt; > + /**< Lock counter. Used when CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled */ > +} __rte_cache_aligned; > + > +#define RTE_QSBR_CNT_THR_OFFLINE 0 > +#define RTE_QSBR_CNT_INIT 1 > + > +/* RTE Quiescent State variable structure. > + * This structure has two elements that vary in size based on the > + * 'max_threads' parameter. > + * 1) Quiescent state counter array > + * 2) Register thread ID array > + */ > +struct rte_rcu_qsbr { > + uint64_t token __rte_cache_aligned; > + /**< Counter to allow for multiple concurrent quiescent state queries */ > + > + uint32_t num_elems __rte_cache_aligned; > + /**< Number of elements in the thread ID array */ > + uint32_t num_threads; > + /**< Number of threads currently using this QS variable */ > + uint32_t max_threads; > + /**< Maximum number of threads using this QS variable */ > + > + struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned; > + /**< Quiescent state counter array of 'max_threads' elements */ > + > + /**< Registered thread IDs are stored in a bitmap array, > + * after the quiescent state counter array. 
> + */ > +} __rte_cache_aligned; > + > +/** > + * @warning > + * @b EXPERIMENTAL: this API may change without prior notice > + * > + * Return the size of the memory occupied by a Quiescent State variable. > + * > + * @param max_threads > + * Maximum number of threads reporting quiescent state on this variable. > + * @return > + * On success - size of memory in bytes required for this QS variable. > + * On error - 1 with error code set in rte_errno. > + * Possible rte_errno codes are: > + * - EINVAL - max_threads is 0 > + */ > +size_t __rte_experimental > +rte_rcu_qsbr_get_memsize(uint32_t max_threads); > + > +/** > + * @warning > + * @b EXPERIMENTAL: this API may change without prior notice > + * > + * Initialize a Quiescent State (QS) variable. > + * > + * @param v > + * QS variable > + * @param max_threads > + * Maximum number of threads reporting quiescent state on this variable. > + * This should be the same value as passed to rte_rcu_qsbr_get_memsize. > + * @return > + * On success - 0 > + * On error - 1 with error code set in rte_errno. > + * Possible rte_errno codes are: > + * - EINVAL - max_threads is 0 or 'v' is NULL. > + * > + */ > +int __rte_experimental > +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads); > + > +/** > + * @warning > + * @b EXPERIMENTAL: this API may change without prior notice > + * > + * Register a reader thread to report its quiescent state > + * on a QS variable. > + * > + * This is implemented as a lock-free function. It is multi-thread > + * safe. > + * Any reader thread that wants to report its quiescent state must > + * call this API. This can be called during initialization or as part > + * of the packet processing loop. > + * > + * Note that rte_rcu_qsbr_thread_online must be called before the > + * thread updates its quiescent state using rte_rcu_qsbr_quiescent. 
> + * > + * @param v > + * QS variable > + * @param thread_id > + * Reader thread with this thread ID will report its quiescent state on > + * the QS variable. thread_id is a value between 0 and (max_threads - 1). > + * 'max_threads' is the parameter passed in 'rte_rcu_qsbr_init' API. > + */ > +int __rte_experimental > +rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id); > + > +/** > + * @warning > + * @b EXPERIMENTAL: this API may change without prior notice > + * > + * Remove a reader thread, from the list of threads reporting their > + * quiescent state on a QS variable. > + * > + * This is implemented as a lock-free function. It is multi-thread safe. > + * This API can be called from the reader threads during shutdown. > + * Ongoing quiescent state queries will stop waiting for the status from this > + * unregistered reader thread. > + * > + * @param v > + * QS variable > + * @param thread_id > + * Reader thread with this thread ID will stop reporting its quiescent > + * state on the QS variable. > + */ > +int __rte_experimental > +rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id); > + > +/** > + * @warning > + * @b EXPERIMENTAL: this API may change without prior notice > + * > + * Add a registered reader thread, to the list of threads reporting their > + * quiescent state on a QS variable. > + * > + * This is implemented as a lock-free function. It is multi-thread > + * safe. > + * > + * Any registered reader thread that wants to report its quiescent state must > + * call this API before calling rte_rcu_qsbr_quiescent. This can be called > + * during initialization or as part of the packet processing loop. > + * > + * The reader thread must call rte_rcu_qsbr_thread_offline API, before > + * calling any functions that block, to ensure that rte_rcu_qsbr_check > + * API does not wait indefinitely for the reader thread to update its QS. 
> + * > + * The reader thread must call rte_rcu_qsbr_thread_online API, after the blocking > + * function call returns, to ensure that rte_rcu_qsbr_check API > + * waits for the reader thread to update its quiescent state. > + * > + * @param v > + * QS variable > + * @param thread_id > + * Reader thread with this thread ID will report its quiescent state on > + * the QS variable. > + */ > +static __rte_always_inline void __rte_experimental > +rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id) > +{ > + uint64_t t; > + > + RTE_ASSERT(v != NULL && thread_id < v->max_threads); > + > + RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n", > + v->qsbr_cnt[thread_id].lock_cnt); > + > + /* Copy the current value of token. > + * The fence at the end of the function will ensure that > + * the following will not move down after the load of any shared > + * data structure. > + */ > + t = __atomic_load_n(&v->token, __ATOMIC_RELAXED); > + > + /* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure > + * 'cnt' (64b) is accessed atomically. > + */ > + __atomic_store_n(&v->qsbr_cnt[thread_id].cnt, > + t, __ATOMIC_RELAXED); > + > + /* The subsequent load of the data structure should not > + * move above the store. Hence a store-load barrier > + * is required. > + * If the load of the data structure moves above the store, > + * writer might not see that the reader is online, even though > + * the reader is referencing the shared data structure. > + */ > +#ifdef RTE_ARCH_X86_64 > + /* rte_smp_mb() for x86 is lighter */ > + rte_smp_mb(); > +#else > + __atomic_thread_fence(__ATOMIC_SEQ_CST); > +#endif > +} > + > +/** > + * @warning > + * @b EXPERIMENTAL: this API may change without prior notice > + * > + * Remove a registered reader thread from the list of threads reporting their > + * quiescent state on a QS variable. > + * > + * This is implemented as a lock-free function. It is multi-thread > + * safe. 
> + * > + * This can be called during initialization or as part of the packet > + * processing loop. > + * > + * The reader thread must call rte_rcu_qsbr_thread_offline API, before > + * calling any functions that block, to ensure that rte_rcu_qsbr_check > + * API does not wait indefinitely for the reader thread to update its QS. > + * > + * @param v > + * QS variable > + * @param thread_id > + * rte_rcu_qsbr_check API will not wait for the reader thread with > + * this thread ID to report its quiescent state on the QS variable. > + */ > +static __rte_always_inline void __rte_experimental > +rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id) > +{ > + RTE_ASSERT(v != NULL && thread_id < v->max_threads); > + > + RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n", > + v->qsbr_cnt[thread_id].lock_cnt); > + > + /* The reader can go offline only after the load of the > + * data structure is completed. i.e. any load of the > + * data structure can not move after this store. > + */ > + > + __atomic_store_n(&v->qsbr_cnt[thread_id].cnt, > + RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE); > +} > + > +/** > + * @warning > + * @b EXPERIMENTAL: this API may change without prior notice > + * > + * Acquire a lock for accessing a shared data structure. > + * > + * This is implemented as a lock-free function. It is multi-thread > + * safe. > + * > + * This API is provided to aid debugging. This should be called before > + * accessing a shared data structure. > + * > + * When CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled a lock counter is incremented. > + * Similarly rte_rcu_qsbr_unlock will decrement the counter. The > + * rte_rcu_qsbr_check API will verify that this counter is 0. > + * > + * When CONFIG_RTE_LIBRTE_RCU_DEBUG is disabled, this API will do nothing. 
> + * > + * @param v > + * QS variable > + * @param thread_id > + * Reader thread id > + */ > +static __rte_always_inline void __rte_experimental > +rte_rcu_qsbr_lock(struct rte_rcu_qsbr *v __rte_unused, > + unsigned int thread_id __rte_unused) > +{ > + RTE_ASSERT(v != NULL && thread_id < v->max_threads); > + > +#if defined(RTE_LIBRTE_RCU_DEBUG) > + /* Increment the lock counter */ > + __atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt, > + 1, __ATOMIC_ACQUIRE); > +#endif > +} > + > +/** > + * @warning > + * @b EXPERIMENTAL: this API may change without prior notice > + * > + * Release a lock after accessing a shared data structure. > + * > + * This is implemented as a lock-free function. It is multi-thread > + * safe. > + * > + * This API is provided to aid debugging. This should be called after > + * accessing a shared data structure. > + * > + * When CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled, rte_rcu_qsbr_unlock will > + * decrement a lock counter. rte_rcu_qsbr_check API will verify that this > + * counter is 0. > + * > + * When CONFIG_RTE_LIBRTE_RCU_DEBUG is disabled, this API will do nothing. > + * > + * @param v > + * QS variable > + * @param thread_id > + * Reader thread id > + */ > +static __rte_always_inline void __rte_experimental > +rte_rcu_qsbr_unlock(struct rte_rcu_qsbr *v __rte_unused, > + unsigned int thread_id __rte_unused) > +{ > + RTE_ASSERT(v != NULL && thread_id < v->max_threads); > + > +#if defined(RTE_LIBRTE_RCU_DEBUG) > + /* Decrement the lock counter */ > + __atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt, > + 1, __ATOMIC_RELEASE); > + > + RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING, > + "Lock counter %u. Nested locks?\n", > + v->qsbr_cnt[thread_id].lock_cnt); > +#endif > +} > + > +/** > + * @warning > + * @b EXPERIMENTAL: this API may change without prior notice > + * > + * Ask the reader threads to report the quiescent state > + * status. > + * > + * This is implemented as a lock-free function. 
It is multi-thread > + * safe and can be called from worker threads. > + * > + * @param v > + * QS variable > + * @return > + * - This is the token for this call of the API. This should be > + * passed to rte_rcu_qsbr_check API. > + */ > +static __rte_always_inline uint64_t __rte_experimental > +rte_rcu_qsbr_start(struct rte_rcu_qsbr *v) > +{ > + uint64_t t; > + > + RTE_ASSERT(v != NULL); > + > + /* Release the changes to the shared data structure. > + * This store release will ensure that changes to any data > + * structure are visible to the workers before the token > + * update is visible. > + */ > + t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE); > + > + return t; > +} > + > +/** > + * @warning > + * @b EXPERIMENTAL: this API may change without prior notice > + * > + * Update quiescent state for a reader thread. > + * > + * This is implemented as a lock-free function. It is multi-thread safe. > + * All the reader threads registered to report their quiescent state > + * on the QS variable must call this API. > + * > + * @param v > + * QS variable > + * @param thread_id > + * Update the quiescent state for the reader with this thread ID. > + */ > +static __rte_always_inline void __rte_experimental > +rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id) > +{ > + uint64_t t; > + > + RTE_ASSERT(v != NULL && thread_id < v->max_threads); > + > + RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n", > + v->qsbr_cnt[thread_id].lock_cnt); > + > + /* Acquire the changes to the shared data structure released > + * by rte_rcu_qsbr_start. > + * Later loads of the shared data structure should not move > + * above this load. Hence, use load-acquire. > + */ > + t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE); > + > + /* Inform the writer that updates are visible to this reader. > + * Prior loads of the shared data structure should not move > + * beyond this store. Hence use store-release. 
> + */ > + __atomic_store_n(&v->qsbr_cnt[thread_id].cnt, > + t, __ATOMIC_RELEASE); > + > + RCU_DP_LOG(DEBUG, "%s: update: token = %lu, Thread ID = %d", > + __func__, t, thread_id); > +} > + > +/* Check the quiescent state counter for registered threads only, assuming > + * that not all threads have registered. > + */ > +static __rte_always_inline int > +__rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait) > +{ > + uint32_t i, j, id; > + uint64_t bmap; > + uint64_t c; > + uint64_t *reg_thread_id; > + > + for (i = 0, reg_thread_id = RTE_QSBR_THRID_ARRAY_ELM(v, 0); > + i < v->num_elems; > + i++, reg_thread_id++) { > + /* Load the current registered thread bit map before > + * loading the reader thread quiescent state counters. > + */ > + bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE); > + id = i << RTE_QSBR_THRID_INDEX_SHIFT; > + > + while (bmap) { > + j = __builtin_ctzl(bmap); > + RCU_DP_LOG(DEBUG, > + "%s: check: token = %lu, wait = %d, Bit Map = 0x%lx, Thread ID = %d", > + __func__, t, wait, bmap, id + j); > + c = __atomic_load_n( > + &v->qsbr_cnt[id + j].cnt, > + __ATOMIC_ACQUIRE); > + RCU_DP_LOG(DEBUG, > + "%s: status: token = %lu, wait = %d, Thread QS cnt = %lu, Thread ID = %d", > + __func__, t, wait, c, id+j); > + /* Counter is not checked for wrap-around condition > + * as it is a 64b counter. > + */ > + if (unlikely(c != RTE_QSBR_CNT_THR_OFFLINE && c < t)) { > + /* This thread is not in quiescent state */ > + if (!wait) > + return 0; > + > + rte_pause(); > + /* This thread might have unregistered. > + * Re-read the bitmap. > + */ > + bmap = __atomic_load_n(reg_thread_id, > + __ATOMIC_ACQUIRE); > + > + continue; > + } > + > + bmap &= ~(1UL << j); > + } > + } > + > + return 1; > +} > + > +/* Check the quiescent state counter for all threads, assuming that > + * all the threads have registered. 
> + */ > +static __rte_always_inline int > +__rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait) > +{ > + uint32_t i; > + struct rte_rcu_qsbr_cnt *cnt; > + uint64_t c; > + > + for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) { > + RCU_DP_LOG(DEBUG, > + "%s: check: token = %lu, wait = %d, Thread ID = %d", > + __func__, t, wait, i); > + while (1) { > + c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE); > + RCU_DP_LOG(DEBUG, > + "%s: status: token = %lu, wait = %d, Thread QS cnt = %lu, Thread ID = %d", > + __func__, t, wait, c, i); > + /* Counter is not checked for wrap-around condition > + * as it is a 64b counter. > + */ > + if (likely(c == RTE_QSBR_CNT_THR_OFFLINE || c >= t)) > + break; > + > + /* This thread is not in quiescent state */ > + if (!wait) > + return 0; > + > + rte_pause(); > + } > + } > + > + return 1; > +} > + > +/** > + * @warning > + * @b EXPERIMENTAL: this API may change without prior notice > + * > + * Checks if all the reader threads have entered the quiescent state > + * referenced by token. > + * > + * This is implemented as a lock-free function. It is multi-thread > + * safe and can be called from the worker threads as well. > + * > + * If this API is called with 'wait' set to true, the following > + * factors must be considered: > + * > + * 1) If the calling thread is also reporting the status on the > + * same QS variable, it must update the quiescent state status, before > + * calling this API. > + * > + * 2) In addition, while calling from multiple threads, only > + * one of those threads can be reporting the quiescent state status > + * on a given QS variable. > + * > + * @param v > + * QS variable > + * @param t > + * Token returned by rte_rcu_qsbr_start API > + * @param wait > + * If true, block till all the reader threads have completed entering > + * the quiescent state referenced by token 't'. > + * @return > + * - 0 if all reader threads have NOT passed through specified number > + * of quiescent states. 
> + * - 1 if all reader threads have passed through specified number > + * of quiescent states. > + */ > +static __rte_always_inline int __rte_experimental > +rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait) > +{ > + RTE_ASSERT(v != NULL); > + > + if (likely(v->num_threads == v->max_threads)) > + return __rcu_qsbr_check_all(v, t, wait); > + else > + return __rcu_qsbr_check_selective(v, t, wait); > +} > + > +/** > + * @warning > + * @b EXPERIMENTAL: this API may change without prior notice > + * > + * Wait till the reader threads have entered quiescent state. > + * > + * This is implemented as a lock-free function. It is multi-thread safe. > + * This API can be thought of as a wrapper around rte_rcu_qsbr_start and > + * rte_rcu_qsbr_check APIs. > + * > + * If this API is called from multiple threads, only one of > + * those threads can be reporting the quiescent state status on a > + * given QS variable. > + * > + * @param v > + * QS variable > + * @param thread_id > + * Thread ID of the caller if it is registered to report quiescent state > + * on this QS variable (i.e. the calling thread is also part of the > + * readside critical section). If not, pass RTE_QSBR_THRID_INVALID. > + */ > +void __rte_experimental > +rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id); > + > +/** > + * @warning > + * @b EXPERIMENTAL: this API may change without prior notice > + * > + * Dump the details of a single QS variable to a file. > + * > + * It is NOT multi-thread safe. > + * > + * @param f > + * A pointer to a file for output > + * @param v > + * QS variable > + * @return > + * On success - 0 > + * On error - 1 with error code set in rte_errno. 
> + * Possible rte_errno codes are: > + * - EINVAL - NULL parameters are passed > + */ > +int __rte_experimental > +rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v); > + > +#ifdef __cplusplus > +} > +#endif > + > +#endif /* _RTE_RCU_QSBR_H_ */ > diff --git a/lib/librte_rcu/rte_rcu_version.map b/lib/librte_rcu/rte_rcu_version.map > new file mode 100644 > index 000000000..5ea8524db > --- /dev/null > +++ b/lib/librte_rcu/rte_rcu_version.map > @@ -0,0 +1,12 @@ > +EXPERIMENTAL { > + global: > + > + rte_rcu_qsbr_get_memsize; > + rte_rcu_qsbr_init; > + rte_rcu_qsbr_thread_register; > + rte_rcu_qsbr_thread_unregister; > + rte_rcu_qsbr_synchronize; > + rte_rcu_qsbr_dump; > + > + local: *; > +}; > diff --git a/lib/meson.build b/lib/meson.build > index 595314d7d..67be10659 100644 > --- a/lib/meson.build > +++ b/lib/meson.build > @@ -22,7 +22,7 @@ libraries = [ > 'gro', 'gso', 'ip_frag', 'jobstats', > 'kni', 'latencystats', 'lpm', 'member', > 'power', 'pdump', 'rawdev', > - 'reorder', 'sched', 'security', 'stack', 'vhost', > + 'reorder', 'sched', 'security', 'stack', 'vhost', 'rcu', > #ipsec lib depends on crypto and security > 'ipsec', > # add pkt framework libs which use other libs from above > diff --git a/mk/rte.app.mk b/mk/rte.app.mk > index abea16d48..ebe6d48a7 100644 > --- a/mk/rte.app.mk > +++ b/mk/rte.app.mk > @@ -97,6 +97,7 @@ _LDLIBS-$(CONFIG_RTE_LIBRTE_EAL) += -lrte_eal > _LDLIBS-$(CONFIG_RTE_LIBRTE_CMDLINE) += -lrte_cmdline > _LDLIBS-$(CONFIG_RTE_LIBRTE_REORDER) += -lrte_reorder > _LDLIBS-$(CONFIG_RTE_LIBRTE_SCHED) += -lrte_sched > +_LDLIBS-$(CONFIG_RTE_LIBRTE_RCU) += -lrte_rcu > > ifeq ($(CONFIG_RTE_EXEC_ENV_LINUX),y) > _LDLIBS-$(CONFIG_RTE_LIBRTE_KNI) += -lrte_kni > -- > 2.17.1 >
> > On Mon, Apr 22, 2019 at 11:31:28PM -0500, Honnappa Nagarahalli wrote: > > Add RCU library supporting quiescent state based memory reclamation > method. > > This library helps identify the quiescent state of the reader threads > > so that the writers can free the memory associated with the lock less > > data structures. > > > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com> > > Reviewed-by: Steve Capper <steve.capper@arm.com> > > Reviewed-by: Gavin Hu <gavin.hu@arm.com> > > Reviewed-by: Ola Liljedahl <ola.liljedahl@arm.com> > > Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com> > > Much better! > > Acked-by: Paul E. McKenney <paulmck@linux.ibm.com> > Thanks a lot, appreciate your feedback. Any views from maintainers on including this library into RC3? IMO, this library is independent and should not affect existing code.
Hi Honnappa, > -----Original Message----- > From: dev <dev-bounces@dpdk.org> On Behalf Of Honnappa Nagarahalli > Sent: Tuesday, April 23, 2019 12:31 > To: konstantin.ananyev@intel.com; stephen@networkplumber.org; > paulmck@linux.ibm.com; marko.kovacevic@intel.com; dev@dpdk.org > Cc: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>; Gavin Hu (Arm > Technology China) <Gavin.Hu@arm.com>; Dharmik Thakkar > <Dharmik.Thakkar@arm.com>; Malvika Gupta <Malvika.Gupta@arm.com> > Subject: [dpdk-dev] [PATCH v7 1/3] rcu: add RCU library supporting QSBR > mechanism > *snip* > +int __rte_experimental > +rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int > +thread_id) { > + unsigned int i, id, success; > + uint64_t old_bmap, new_bmap; > + > + if (v == NULL || thread_id >= v->max_threads) { > + rte_log(RTE_LOG_ERR, rcu_log_type, > + "%s(): Invalid input parameter\n", __func__); > + rte_errno = EINVAL; > + > + return 1; > + } > + > + RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n", > + v->qsbr_cnt[thread_id].lock_cnt); > + > + id = thread_id & RTE_QSBR_THRID_MASK; > + i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT; > + > + /* Make sure that the counter for registered threads does not > + * go out of sync. Hence, additional checks are required. > + */ > + /* Check if the thread is already unregistered */ > + old_bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i), > + __ATOMIC_RELAXED); > + if (old_bmap & ~(1UL << id)) If I understand correctly, here should be (!(old_bmap & 1UL << id)) Can you please check? > + return 0; > + > + do { > + new_bmap = old_bmap & ~(1UL << id); > + /* Make sure any loads of the shared data structure are > + * completed before removal of the thread from the list of > + * reporting threads. 
> + */ > + success = __atomic_compare_exchange( > + RTE_QSBR_THRID_ARRAY_ELM(v, i), > + &old_bmap, &new_bmap, 0, > + __ATOMIC_RELEASE, > __ATOMIC_RELAXED); > + > + if (success) > + __atomic_fetch_sub(&v->num_threads, > + 1, __ATOMIC_RELAXED); > + else if (old_bmap & ~(1UL << id)) Same comment as previous. > + /* Someone else unregistered this thread. > + * Counter should not be incremented. > + */ > + return 0; > + } while (success == 0); > + > + return 0; > +} *snip*
> -----Original Message----- > From: dev <dev-bounces@dpdk.org> On Behalf Of Honnappa Nagarahalli > Sent: Wednesday, April 24, 2019 2:53 AM > To: paulmck@linux.ibm.com > Cc: konstantin.ananyev@intel.com; stephen@networkplumber.org; > marko.kovacevic@intel.com; dev@dpdk.org; Gavin Hu (Arm Technology China) > <Gavin.Hu@arm.com>; Dharmik Thakkar <Dharmik.Thakkar@arm.com>; > Malvika Gupta <Malvika.Gupta@arm.com>; Honnappa Nagarahalli > <Honnappa.Nagarahalli@arm.com>; bruce.richardson@intel.com; nd > <nd@arm.com>; thomas@monjalon.net; nd <nd@arm.com> > Subject: Re: [dpdk-dev] [PATCH v7 1/3] rcu: add RCU library supporting QSBR > mechanism > > > > > On Mon, Apr 22, 2019 at 11:31:28PM -0500, Honnappa Nagarahalli wrote: > > > Add RCU library supporting quiescent state based memory reclamation > > method. > > > This library helps identify the quiescent state of the reader > > > threads so that the writers can free the memory associated with the > > > lock less data structures. > > > > > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com> > > > Reviewed-by: Steve Capper <steve.capper@arm.com> > > > Reviewed-by: Gavin Hu <gavin.hu@arm.com> > > > Reviewed-by: Ola Liljedahl <ola.liljedahl@arm.com> > > > Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com> > > > > Much better! > > > > Acked-by: Paul E. McKenney <paulmck@linux.ibm.com> > > > Thanks a lot, appreciate your feedback. > > Any views from maintainers on including this library into RC3? IMO, this library is > independent and should not affect existing code. Tested rcu_qsbr_autotest and rcu_qsbr_perf_autotest UT on an armv8.2 machine (octeontx2). Found rcu_qsbr_perf_autotest() runs successfully on 24 cores. There is some issue with rcu_qsbr_autotest on 24 cores. It works fine up to 20 cores. Please find below the success log, failure log and core dump. 
[master][dpdk.org] $ echo "rcu_qsbr_autotest" | sudo ./build/app/test -c 0xfffff0 EAL: Detected 24 lcore(s) EAL: Detected 1 NUMA nodes EAL: Multi-process socket /var/run/dpdk/rte/mp_socket EAL: No available hugepages reported in hugepages-2048kB EAL: Probing VFIO support... EAL: VFIO support initialized APP: HPET is not enabled, using TSC as default timer RTE>>rcu_qsbr_autotest Test rte_rcu_qsbr_thread_register() rte_rcu_qsbr_get_memsize(): Invalid max_threads 0 Test rte_rcu_qsbr_init() rte_rcu_qsbr_init(): Invalid input parameter Test rte_rcu_qsbr_thread_register() rte_rcu_qsbr_thread_register(): Invalid input parameter rte_rcu_qsbr_thread_register(): Invalid input parameter rte_rcu_qsbr_thread_register(): Invalid input parameter Test rte_rcu_qsbr_thread_unregister() rte_rcu_qsbr_thread_unregister(): Invalid input parameter rte_rcu_qsbr_thread_unregister(): Invalid input parameter rte_rcu_qsbr_thread_unregister(): Invalid input parameter Test rte_rcu_qsbr_start() Test rte_rcu_qsbr_check() Test rte_rcu_qsbr_synchronize() Test rte_rcu_qsbr_dump() rte_rcu_qsbr_dump(): Invalid input parameter rte_rcu_qsbr_dump(): Invalid input parameter rte_rcu_qsbr_dump(): Invalid input parameter Quiescent State Variable @0x13ff94100 QS variable memory size = 16768 Given # max threads = 128 Current # threads = 0 Registered thread ID mask = 0x00 Token = 1 Quiescent State Counts for readers: Quiescent State Variable @0x13ff94100 QS variable memory size = 16768 Given # max threads = 128 Current # threads = 1 Registered thread ID mask = 0x200 Token = 1 Quiescent State Counts for readers: thread ID = 5, count = 0, lock count = 0 Quiescent State Variable @0x13ff8ff00 QS variable memory size = 16768 Given # max threads = 128 Current # threads = 2 Registered thread ID mask = 0xc00 Token = 1 Quiescent State Counts for readers: thread ID = 6, count = 0, lock count = 0 thread ID = 7, count = 0, lock count = 0 Test rte_rcu_qsbr_thread_online() Test rte_rcu_qsbr_thread_offline() Functional tests 
Test: 1 writer, 1 QSBR variable, simultaneous QSBR queries Test: 8 writers, 4 QSBR variable, simultaneous QSBR queries Test OK [master] [dpdk.org] $ echo "rcu_qsbr_autotest" | sudo ./build/app/test -c 0xffffff EAL: Detected 24 lcore(s) EAL: Detected 1 NUMA nodes EAL: Multi-process socket /var/run/dpdk/rte/mp_socket EAL: No available hugepages reported in hugepages-2048kB EAL: Probing VFIO support... EAL: VFIO support initialized APP: HPET is not enabled, using TSC as default timer RTE>>rcu_qsbr_autotest Test rte_rcu_qsbr_thread_register() rte_rcu_qsbr_get_memsize(): Invalid max_threads 0 Test rte_rcu_qsbr_init() rte_rcu_qsbr_init(): Invalid input parameter Test rte_rcu_qsbr_thread_register() rte_rcu_qsbr_thread_register(): Invalid input parameter rte_rcu_qsbr_thread_register(): Invalid input parameter rte_rcu_qsbr_thread_register(): Invalid input parameter Test rte_rcu_qsbr_thread_unregister() rte_rcu_qsbr_thread_unregister(): Invalid input parameter rte_rcu_qsbr_thread_unregister(): Invalid input parameter rte_rcu_qsbr_thread_unregister(): Invalid input parameter Test rte_rcu_qsbr_start() Test rte_rcu_qsbr_check() Test rte_rcu_qsbr_synchronize() Test rte_rcu_qsbr_dump() rte_rcu_qsbr_dump(): Invalid input parameter rte_rcu_qsbr_dump(): Invalid input parameter rte_rcu_qsbr_dump(): Invalid input parameter Quiescent State Variable @0x13ff94100 QS variable memory size = 16768 Given # max threads = 128 Current # threads = 0 Registered thread ID mask = 0x00 Token = 1 Quiescent State Counts for readers: Quiescent State Variable @0x13ff94100 QS variable memory size = 16768 Given # max threads = 128 Current # threads = 1 Registered thread ID mask = 0x20 Token = 1 Quiescent State Counts for readers: thread ID = 1, count = 0, lock count = 0 Quiescent State Variable @0x13ff8ff00 QS variable memory size = 16768 Given # max threads = 128 Current # threads = 2 Registered thread ID mask = 0xc0 Token = 1 Quiescent State Counts for readers: thread ID = 2, count = 0, lock count = 0 
thread ID = 3, count = 0, lock count = 0 Test rte_rcu_qsbr_thread_online() Test rte_rcu_qsbr_thread_offline() Functional tests Test: 1 writer, 1 QSBR variable, simultaneous QSBR queries Test: 10 writers, 5 QSBR variable, simultaneous QSBR queries rte_rcu_qsbr_init(): Invalid input parameter rte_rcu_qsbr_thread_register(): Invalid input parameter rte_rcu_qsbr_thread_register(): Invalid input parameter Segmentation fault [Thread debugging using libthread_db enabled] Using host libthread_db library "/usr/lib/libthread_db.so.1". Core was generated by `./build/app/test -c 0xffffff'. Program terminated with signal SIGSEGV, Segmentation fault. #0 rte_rcu_qsbr_thread_online (thread_id=<optimized out>, v=0x0) at /home/jerin/dpdk.org/build/include/rte_rcu_qsbr.h:238 /home/jerin/dpdk.org/lib/librte_rcu/rte_rcu_qsbr.h:238:7712:beg:0x564df4 [Current thread is 1 (Thread 0xffff9c06d900 (LWP 1938))] (gdb) bt #0 rte_rcu_qsbr_thread_online (thread_id=<optimized out>, v=0x0) at /home/jerin/dpdk.org/build/include/rte_rcu_qsbr.h:238 #1 test_rcu_qsbr_reader (arg=<optimized out>) at /home/jerin/dpdk.org/app/test/test_rcu_qsbr.c:641 #2 0x0000000000652430 in eal_thread_loop (arg=<optimized out>) at /home/jerin/dpdk.org/lib/librte_eal/linux/eal/eal_thread.c:153 #3 0x0000ffffa13a756c in start_thread () from /usr/lib/libpthread.so.0 #4 0x0000ffffa11e301c in thread_start () from /usr/lib/libc.so.6
> > > > > > On Mon, Apr 22, 2019 at 11:31:28PM -0500, Honnappa Nagarahalli > wrote: > > > > Add RCU library supporting quiescent state based memory > > > > reclamation > > > method. > > > > This library helps identify the quiescent state of the reader > > > > threads so that the writers can free the memory associated with > > > > the lock less data structures. > > > > > > > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com> > > > > Reviewed-by: Steve Capper <steve.capper@arm.com> > > > > Reviewed-by: Gavin Hu <gavin.hu@arm.com> > > > > Reviewed-by: Ola Liljedahl <ola.liljedahl@arm.com> > > > > Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com> > > > > > > Much better! > > > > > > Acked-by: Paul E. McKenney <paulmck@linux.ibm.com> > > > > > Thanks a lot, appreciate your feedback. > > > > Any views from maintainers on including this library into RC3? IMO, > > this library is independent and should not affect existing code. > > Tested rcu_qsbr_autotest and rcu_qsbr_perf_autotest UT on an armv8.2 > machine (octeontx2). > Found rcu_qsbr_perf_autotest() runs successfully on 24 cores. > There is some issue with rcu_qsbr_autotest on 24 cores. It works fine up to 20 > cores. Thanks Jerin for running the test cases. I could reproduce the issue. It is a test case issue, will fix in next version.
diff --git a/MAINTAINERS b/MAINTAINERS index a08583471..ae54f37db 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -1274,6 +1274,11 @@ F: examples/bpf/ F: app/test/test_bpf.c F: doc/guides/prog_guide/bpf_lib.rst +RCU - EXPERIMENTAL +M: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com> +F: lib/librte_rcu/ +F: doc/guides/prog_guide/rcu_lib.rst + Test Applications ----------------- diff --git a/config/common_base b/config/common_base index 7fb0dedb6..f50d26c30 100644 --- a/config/common_base +++ b/config/common_base @@ -834,6 +834,12 @@ CONFIG_RTE_LIBRTE_LATENCY_STATS=y # CONFIG_RTE_LIBRTE_TELEMETRY=n +# +# Compile librte_rcu +# +CONFIG_RTE_LIBRTE_RCU=y +CONFIG_RTE_LIBRTE_RCU_DEBUG=n + # # Compile librte_lpm # diff --git a/lib/Makefile b/lib/Makefile index 26021d0c0..791e0d991 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -111,6 +111,8 @@ DIRS-$(CONFIG_RTE_LIBRTE_IPSEC) += librte_ipsec DEPDIRS-librte_ipsec := librte_eal librte_mbuf librte_cryptodev librte_security DIRS-$(CONFIG_RTE_LIBRTE_TELEMETRY) += librte_telemetry DEPDIRS-librte_telemetry := librte_eal librte_metrics librte_ethdev +DIRS-$(CONFIG_RTE_LIBRTE_RCU) += librte_rcu +DEPDIRS-librte_rcu := librte_eal ifeq ($(CONFIG_RTE_EXEC_ENV_LINUX),y) DIRS-$(CONFIG_RTE_LIBRTE_KNI) += librte_kni diff --git a/lib/librte_rcu/Makefile b/lib/librte_rcu/Makefile new file mode 100644 index 000000000..6aa677bd1 --- /dev/null +++ b/lib/librte_rcu/Makefile @@ -0,0 +1,23 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2018 Arm Limited + +include $(RTE_SDK)/mk/rte.vars.mk + +# library name +LIB = librte_rcu.a + +CFLAGS += -DALLOW_EXPERIMENTAL_API +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 +LDLIBS += -lrte_eal + +EXPORT_MAP := rte_rcu_version.map + +LIBABIVER := 1 + +# all source are stored in SRCS-y +SRCS-$(CONFIG_RTE_LIBRTE_RCU) := rte_rcu_qsbr.c + +# install includes +SYMLINK-$(CONFIG_RTE_LIBRTE_RCU)-include := rte_rcu_qsbr.h + +include $(RTE_SDK)/mk/rte.lib.mk diff --git a/lib/librte_rcu/meson.build 
b/lib/librte_rcu/meson.build new file mode 100644 index 000000000..0c2d5a2e0 --- /dev/null +++ b/lib/librte_rcu/meson.build @@ -0,0 +1,7 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2018 Arm Limited + +allow_experimental_apis = true + +sources = files('rte_rcu_qsbr.c') +headers = files('rte_rcu_qsbr.h') diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c new file mode 100644 index 000000000..50ef0ba9a --- /dev/null +++ b/lib/librte_rcu/rte_rcu_qsbr.c @@ -0,0 +1,268 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2018 Arm Limited + */ + +#include <stdio.h> +#include <string.h> +#include <stdint.h> +#include <errno.h> + +#include <rte_common.h> +#include <rte_log.h> +#include <rte_memory.h> +#include <rte_malloc.h> +#include <rte_eal.h> +#include <rte_eal_memconfig.h> +#include <rte_atomic.h> +#include <rte_per_lcore.h> +#include <rte_lcore.h> +#include <rte_errno.h> + +#include "rte_rcu_qsbr.h" + +/* Get the memory size of QSBR variable */ +size_t __rte_experimental +rte_rcu_qsbr_get_memsize(uint32_t max_threads) +{ + size_t sz; + + if (max_threads == 0) { + rte_log(RTE_LOG_ERR, rcu_log_type, + "%s(): Invalid max_threads %u\n", + __func__, max_threads); + rte_errno = EINVAL; + + return 1; + } + + sz = sizeof(struct rte_rcu_qsbr); + + /* Add the size of quiescent state counter array */ + sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads; + + /* Add the size of the registered thread ID bitmap array */ + sz += RTE_QSBR_THRID_ARRAY_SIZE(max_threads); + + return sz; +} + +/* Initialize a quiescent state variable */ +int __rte_experimental +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads) +{ + size_t sz; + + if (v == NULL) { + rte_log(RTE_LOG_ERR, rcu_log_type, + "%s(): Invalid input parameter\n", __func__); + rte_errno = EINVAL; + + return 1; + } + + sz = rte_rcu_qsbr_get_memsize(max_threads); + if (sz == 1) + return 1; + + /* Set all the threads to offline */ + memset(v, 0, sz); + v->max_threads = 
max_threads; + v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads, + RTE_QSBR_THRID_ARRAY_ELM_SIZE) / + RTE_QSBR_THRID_ARRAY_ELM_SIZE; + v->token = RTE_QSBR_CNT_INIT; + + return 0; +} + +/* Register a reader thread to report its quiescent state + * on a QS variable. + */ +int __rte_experimental +rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id) +{ + unsigned int i, id, success; + uint64_t old_bmap, new_bmap; + + if (v == NULL || thread_id >= v->max_threads) { + rte_log(RTE_LOG_ERR, rcu_log_type, + "%s(): Invalid input parameter\n", __func__); + rte_errno = EINVAL; + + return 1; + } + + RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n", + v->qsbr_cnt[thread_id].lock_cnt); + + id = thread_id & RTE_QSBR_THRID_MASK; + i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT; + + /* Make sure that the counter for registered threads does not + * go out of sync. Hence, additional checks are required. + */ + /* Check if the thread is already registered */ + old_bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i), + __ATOMIC_RELAXED); + if (old_bmap & 1UL << id) + return 0; + + do { + new_bmap = old_bmap | (1UL << id); + success = __atomic_compare_exchange( + RTE_QSBR_THRID_ARRAY_ELM(v, i), + &old_bmap, &new_bmap, 0, + __ATOMIC_RELEASE, __ATOMIC_RELAXED); + + if (success) + __atomic_fetch_add(&v->num_threads, + 1, __ATOMIC_RELAXED); + else if (old_bmap & (1UL << id)) + /* Someone else registered this thread. + * Counter should not be incremented. + */ + return 0; + } while (success == 0); + + return 0; +} + +/* Remove a reader thread, from the list of threads reporting their + * quiescent state on a QS variable. 
+ */ +int __rte_experimental +rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id) +{ + unsigned int i, id, success; + uint64_t old_bmap, new_bmap; + + if (v == NULL || thread_id >= v->max_threads) { + rte_log(RTE_LOG_ERR, rcu_log_type, + "%s(): Invalid input parameter\n", __func__); + rte_errno = EINVAL; + + return 1; + } + + RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n", + v->qsbr_cnt[thread_id].lock_cnt); + + id = thread_id & RTE_QSBR_THRID_MASK; + i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT; + + /* Make sure that the counter for registered threads does not + * go out of sync. Hence, additional checks are required. + */ + /* Check if the thread is already unregistered (its bit is clear) */ + old_bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i), + __ATOMIC_RELAXED); + if (!(old_bmap & (1UL << id))) + return 0; + + do { + new_bmap = old_bmap & ~(1UL << id); + /* Make sure any loads of the shared data structure are + * completed before removal of the thread from the list of + * reporting threads. + */ + success = __atomic_compare_exchange( + RTE_QSBR_THRID_ARRAY_ELM(v, i), + &old_bmap, &new_bmap, 0, + __ATOMIC_RELEASE, __ATOMIC_RELAXED); + + if (success) + __atomic_fetch_sub(&v->num_threads, + 1, __ATOMIC_RELAXED); + else if (!(old_bmap & (1UL << id))) + /* Someone else unregistered this thread. + * Counter should not be decremented. + */ + return 0; + } while (success == 0); + + return 0; +} + +/* Wait till the reader threads have entered quiescent state. */ +void __rte_experimental +rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id) +{ + uint64_t t; + + RTE_ASSERT(v != NULL); + + t = rte_rcu_qsbr_start(v); + + /* If the current thread has readside critical section, + * update its quiescent state status. 
+ */ + if (thread_id != RTE_QSBR_THRID_INVALID) + rte_rcu_qsbr_quiescent(v, thread_id); + + /* Wait for other readers to enter quiescent state */ + rte_rcu_qsbr_check(v, t, true); +} + +/* Dump the details of a single quiescent state variable to a file. */ +int __rte_experimental +rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v) +{ + uint64_t bmap; + uint32_t i, t, id; + + if (v == NULL || f == NULL) { + rte_log(RTE_LOG_ERR, rcu_log_type, + "%s(): Invalid input parameter\n", __func__); + rte_errno = EINVAL; + + return 1; + } + + fprintf(f, "\nQuiescent State Variable @%p\n", v); + + fprintf(f, " QS variable memory size = %lu\n", + rte_rcu_qsbr_get_memsize(v->max_threads)); + fprintf(f, " Given # max threads = %u\n", v->max_threads); + fprintf(f, " Current # threads = %u\n", v->num_threads); + + fprintf(f, " Registered thread ID mask = 0x"); + for (i = 0; i < v->num_elems; i++) + fprintf(f, "%lx", __atomic_load_n( + RTE_QSBR_THRID_ARRAY_ELM(v, i), + __ATOMIC_ACQUIRE)); + fprintf(f, "\n"); + + fprintf(f, " Token = %lu\n", + __atomic_load_n(&v->token, __ATOMIC_ACQUIRE)); + + fprintf(f, "Quiescent State Counts for readers:\n"); + for (i = 0; i < v->num_elems; i++) { + bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i), + __ATOMIC_ACQUIRE); + id = i << RTE_QSBR_THRID_INDEX_SHIFT; + while (bmap) { + t = __builtin_ctzl(bmap); + fprintf(f, "thread ID = %d, count = %lu, lock count = %u\n", + id + t, + __atomic_load_n( + &v->qsbr_cnt[id + t].cnt, + __ATOMIC_RELAXED), + __atomic_load_n( + &v->qsbr_cnt[id + t].lock_cnt, + __ATOMIC_RELAXED)); + bmap &= ~(1UL << t); + } + } + + return 0; +} + +int rcu_log_type; + +RTE_INIT(rte_rcu_register) +{ + rcu_log_type = rte_log_register("lib.rcu"); + if (rcu_log_type >= 0) + rte_log_set_level(rcu_log_type, RTE_LOG_ERR); +} diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h new file mode 100644 index 000000000..58fb95ed0 --- /dev/null +++ b/lib/librte_rcu/rte_rcu_qsbr.h @@ -0,0 +1,639 @@ +/* 
SPDX-License-Identifier: BSD-3-Clause + * Copyright (c) 2018 Arm Limited + */ + +#ifndef _RTE_RCU_QSBR_H_ +#define _RTE_RCU_QSBR_H_ + +/** + * @file + * RTE Quiescent State Based Reclamation (QSBR) + * + * Quiescent State (QS) is any point in the thread execution + * where the thread does not hold a reference to a data structure + * in shared memory. While using lock-less data structures, the writer + * can safely free memory once all the reader threads have entered + * quiescent state. + * + * This library provides the ability for the readers to report quiescent + * state and for the writers to identify when all the readers have + * entered quiescent state. + */ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <stdio.h> +#include <stdint.h> +#include <errno.h> +#include <rte_common.h> +#include <rte_memory.h> +#include <rte_lcore.h> +#include <rte_debug.h> +#include <rte_atomic.h> + +extern int rcu_log_type; + +#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG +#define RCU_DP_LOG(level, fmt, args...) \ + rte_log(RTE_LOG_ ## level, rcu_log_type, \ + "%s(): " fmt "\n", __func__, ## args) +#else +#define RCU_DP_LOG(level, fmt, args...) +#endif + +#if defined(RTE_LIBRTE_RCU_DEBUG) +#define RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do {\ + if (v->qsbr_cnt[thread_id].lock_cnt) \ + rte_log(RTE_LOG_ ## level, rcu_log_type, \ + "%s(): " fmt "\n", __func__, ## args); \ +} while (0) +#else +#define RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) +#endif + +/* Registered thread IDs are stored as a bitmap of 64b element array. + * Given thread id needs to be converted to index into the array and + * the id within the array element. 
+ */ +#define RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8) +#define RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \ + RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \ + RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE) +#define RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \ + ((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i) +#define RTE_QSBR_THRID_INDEX_SHIFT 6 +#define RTE_QSBR_THRID_MASK 0x3f +#define RTE_QSBR_THRID_INVALID 0xffffffff + +/* Worker thread counter */ +struct rte_rcu_qsbr_cnt { + uint64_t cnt; + /**< Quiescent state counter. Value 0 indicates the thread is offline + * 64b counter is used to avoid adding more code to address + * counter overflow. Changing this to 32b would require additional + * changes to various APIs. + */ + uint32_t lock_cnt; + /**< Lock counter. Used when CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled */ +} __rte_cache_aligned; + +#define RTE_QSBR_CNT_THR_OFFLINE 0 +#define RTE_QSBR_CNT_INIT 1 + +/* RTE Quiescent State variable structure. + * This structure has two elements that vary in size based on the + * 'max_threads' parameter. + * 1) Quiescent state counter array + * 2) Register thread ID array + */ +struct rte_rcu_qsbr { + uint64_t token __rte_cache_aligned; + /**< Counter to allow for multiple concurrent quiescent state queries */ + + uint32_t num_elems __rte_cache_aligned; + /**< Number of elements in the thread ID array */ + uint32_t num_threads; + /**< Number of threads currently using this QS variable */ + uint32_t max_threads; + /**< Maximum number of threads using this QS variable */ + + struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned; + /**< Quiescent state counter array of 'max_threads' elements */ + + /**< Registered thread IDs are stored in a bitmap array, + * after the quiescent state counter array. + */ +} __rte_cache_aligned; + +/** + * @warning + * @b EXPERIMENTAL: this API may change without prior notice + * + * Return the size of the memory occupied by a Quiescent State variable. 
+ * + * @param max_threads + * Maximum number of threads reporting quiescent state on this variable. + * @return + * On success - size of memory in bytes required for this QS variable. + * On error - 1 with error code set in rte_errno. + * Possible rte_errno codes are: + * - EINVAL - max_threads is 0 + */ +size_t __rte_experimental +rte_rcu_qsbr_get_memsize(uint32_t max_threads); + +/** + * @warning + * @b EXPERIMENTAL: this API may change without prior notice + * + * Initialize a Quiescent State (QS) variable. + * + * @param v + * QS variable + * @param max_threads + * Maximum number of threads reporting quiescent state on this variable. + * This should be the same value as passed to rte_rcu_qsbr_get_memsize. + * @return + * On success - 0 + * On error - 1 with error code set in rte_errno. + * Possible rte_errno codes are: + * - EINVAL - max_threads is 0 or 'v' is NULL. + * + */ +int __rte_experimental +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads); + +/** + * @warning + * @b EXPERIMENTAL: this API may change without prior notice + * + * Register a reader thread to report its quiescent state + * on a QS variable. + * + * This is implemented as a lock-free function. It is multi-thread + * safe. + * Any reader thread that wants to report its quiescent state must + * call this API. This can be called during initialization or as part + * of the packet processing loop. + * + * Note that rte_rcu_qsbr_thread_online must be called before the + * thread updates its quiescent state using rte_rcu_qsbr_quiescent. + * + * @param v + * QS variable + * @param thread_id + * Reader thread with this thread ID will report its quiescent state on + * the QS variable. thread_id is a value between 0 and (max_threads - 1). + * 'max_threads' is the parameter passed in 'rte_rcu_qsbr_init' API. 
+ */ +int __rte_experimental +rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id); + +/** + * @warning + * @b EXPERIMENTAL: this API may change without prior notice + * + * Remove a reader thread, from the list of threads reporting their + * quiescent state on a QS variable. + * + * This is implemented as a lock-free function. It is multi-thread safe. + * This API can be called from the reader threads during shutdown. + * Ongoing quiescent state queries will stop waiting for the status from this + * unregistered reader thread. + * + * @param v + * QS variable + * @param thread_id + * Reader thread with this thread ID will stop reporting its quiescent + * state on the QS variable. + */ +int __rte_experimental +rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id); + +/** + * @warning + * @b EXPERIMENTAL: this API may change without prior notice + * + * Add a registered reader thread, to the list of threads reporting their + * quiescent state on a QS variable. + * + * This is implemented as a lock-free function. It is multi-thread + * safe. + * + * Any registered reader thread that wants to report its quiescent state must + * call this API before calling rte_rcu_qsbr_quiescent. This can be called + * during initialization or as part of the packet processing loop. + * + * The reader thread must call rte_rcu_thread_offline API, before + * calling any functions that block, to ensure that rte_rcu_qsbr_check + * API does not wait indefinitely for the reader thread to update its QS. + * + * The reader thread must call rte_rcu_thread_online API, after the blocking + * function call returns, to ensure that rte_rcu_qsbr_check API + * waits for the reader thread to update its quiescent state. + * + * @param v + * QS variable + * @param thread_id + * Reader thread with this thread ID will report its quiescent state on + * the QS variable. 
+ */ +static __rte_always_inline void __rte_experimental +rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id) +{ + uint64_t t; + + RTE_ASSERT(v != NULL && thread_id < v->max_threads); + + RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n", + v->qsbr_cnt[thread_id].lock_cnt); + + /* Copy the current value of token. + * The fence at the end of the function will ensure that + * the following will not move down after the load of any shared + * data structure. + */ + t = __atomic_load_n(&v->token, __ATOMIC_RELAXED); + + /* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure + * 'cnt' (64b) is accessed atomically. + */ + __atomic_store_n(&v->qsbr_cnt[thread_id].cnt, + t, __ATOMIC_RELAXED); + + /* The subsequent load of the data structure should not + * move above the store. Hence a store-load barrier + * is required. + * If the load of the data structure moves above the store, + * writer might not see that the reader is online, even though + * the reader is referencing the shared data structure. + */ +#ifdef RTE_ARCH_X86_64 + /* rte_smp_mb() for x86 is lighter */ + rte_smp_mb(); +#else + __atomic_thread_fence(__ATOMIC_SEQ_CST); +#endif +} + +/** + * @warning + * @b EXPERIMENTAL: this API may change without prior notice + * + * Remove a registered reader thread from the list of threads reporting their + * quiescent state on a QS variable. + * + * This is implemented as a lock-free function. It is multi-thread + * safe. + * + * This can be called during initialization or as part of the packet + * processing loop. + * + * The reader thread must call rte_rcu_thread_offline API, before + * calling any functions that block, to ensure that rte_rcu_qsbr_check + * API does not wait indefinitely for the reader thread to update its QS. + * + * @param v + * QS variable + * @param thread_id + * rte_rcu_qsbr_check API will not wait for the reader thread with + * this thread ID to report its quiescent state on the QS variable. 
+ */
+static __rte_always_inline void __rte_experimental
+rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
+{
+	RTE_ASSERT(v != NULL && thread_id < v->max_threads);
+
+	RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
+				v->qsbr_cnt[thread_id].lock_cnt);
+
+	/* The reader can go offline only after the load of the
+	 * data structure is completed. i.e. any load of the
+	 * data structure can not move after this store.
+	 * Hence the offline marker is written with store-release.
+	 */
+
+	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
+		RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Acquire a lock for accessing a shared data structure.
+ *
+ * This is implemented as a lock-free function. It is multi-thread
+ * safe.
+ *
+ * This API is provided to aid debugging. This should be called before
+ * accessing a shared data structure.
+ *
+ * When CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled a lock counter is incremented.
+ * Similarly rte_rcu_qsbr_unlock will decrement the counter. The
+ * rte_rcu_qsbr_check API will verify that this counter is 0.
+ * NOTE(review): no lock counter check is visible in rte_rcu_qsbr_check in
+ * this header; the RCU_IS_LOCK_CNT_ZERO invocations are placed in
+ * rte_rcu_qsbr_thread_online, rte_rcu_qsbr_thread_offline,
+ * rte_rcu_qsbr_quiescent and rte_rcu_qsbr_unlock - confirm which API should
+ * be named here.
+ *
+ * When CONFIG_RTE_LIBRTE_RCU_DEBUG is disabled, this API will do nothing.
+ *
+ * @param v
+ *   QS variable
+ * @param thread_id
+ *   Reader thread id
+ */
+static __rte_always_inline void __rte_experimental
+rte_rcu_qsbr_lock(struct rte_rcu_qsbr *v __rte_unused,
+			unsigned int thread_id __rte_unused)
+{
+	RTE_ASSERT(v != NULL && thread_id < v->max_threads);
+
+#if defined(RTE_LIBRTE_RCU_DEBUG)
+	/* Increment the lock counter.
+	 * Acquire ordering keeps the accesses that follow in program
+	 * order from being reordered before the increment.
+	 */
+	__atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt,
+				1, __ATOMIC_ACQUIRE);
+#endif
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Release a lock after accessing a shared data structure.
+ *
+ * This is implemented as a lock-free function. It is multi-thread
+ * safe.
+ *
+ * This API is provided to aid debugging. This should be called after
+ * accessing a shared data structure.
+ *
+ * When CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled, rte_rcu_qsbr_unlock will
+ * decrement a lock counter. rte_rcu_qsbr_check API will verify that this
+ * counter is 0.
+ * NOTE(review): no lock counter check is visible in rte_rcu_qsbr_check in
+ * this header; the RCU_IS_LOCK_CNT_ZERO invocations are placed in
+ * rte_rcu_qsbr_thread_online, rte_rcu_qsbr_thread_offline,
+ * rte_rcu_qsbr_quiescent and in this function - confirm which API should
+ * be named here.
+ *
+ * When CONFIG_RTE_LIBRTE_RCU_DEBUG is disabled, this API will do nothing.
+ *
+ * @param v
+ *   QS variable
+ * @param thread_id
+ *   Reader thread id
+ */
+static __rte_always_inline void __rte_experimental
+rte_rcu_qsbr_unlock(struct rte_rcu_qsbr *v __rte_unused,
+			unsigned int thread_id __rte_unused)
+{
+	RTE_ASSERT(v != NULL && thread_id < v->max_threads);
+
+#if defined(RTE_LIBRTE_RCU_DEBUG)
+	/* Decrement the lock counter.
+	 * Release ordering keeps the accesses that precede the unlock
+	 * in program order from being reordered after the decrement.
+	 * The RCU_IS_LOCK_CNT_ZERO invocation below presumably logs the
+	 * WARNING message when the counter is still non-zero after the
+	 * decrement - confirm against the macro definition.
+	 */
+	__atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt,
+				1, __ATOMIC_RELEASE);
+
+	RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
+				"Lock counter %u. Nested locks?\n",
+				v->qsbr_cnt[thread_id].lock_cnt);
+#endif
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Ask the reader threads to report the quiescent state
+ * status.
+ *
+ * This is implemented as a lock-free function. It is multi-thread
+ * safe and can be called from worker threads.
+ *
+ * @param v
+ *   QS variable
+ * @return
+ *   - This is the token for this call of the API. This should be
+ *     passed to rte_rcu_qsbr_check API.
+ */
+static __rte_always_inline uint64_t __rte_experimental
+rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
+{
+	uint64_t t;
+
+	RTE_ASSERT(v != NULL);
+
+	/* Release the changes to the shared data structure.
+	 * This store release will ensure that changes to any data
+	 * structure are visible to the workers before the token
+	 * update is visible.
+	 * The incremented token value is what gets returned to the
+	 * caller.
+	 */
+	t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);
+
+	return t;
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Update quiescent state for a reader thread.
+ *
+ * This is implemented as a lock-free function. It is multi-thread safe.
+ * All the reader threads registered to report their quiescent state
+ * on the QS variable must call this API.
+ * + * @param v + * QS variable + * @param thread_id + * Update the quiescent state for the reader with this thread ID. + */ +static __rte_always_inline void __rte_experimental +rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id) +{ + uint64_t t; + + RTE_ASSERT(v != NULL && thread_id < v->max_threads); + + RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n", + v->qsbr_cnt[thread_id].lock_cnt); + + /* Acquire the changes to the shared data structure released + * by rte_rcu_qsbr_start. + * Later loads of the shared data structure should not move + * above this load. Hence, use load-acquire. + */ + t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE); + + /* Inform the writer that updates are visible to this reader. + * Prior loads of the shared data structure should not move + * beyond this store. Hence use store-release. + */ + __atomic_store_n(&v->qsbr_cnt[thread_id].cnt, + t, __ATOMIC_RELEASE); + + RCU_DP_LOG(DEBUG, "%s: update: token = %lu, Thread ID = %d", + __func__, t, thread_id); +} + +/* Check the quiescent state counter for registered threads only, assuming + * that not all threads have registered. + */ +static __rte_always_inline int +__rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait) +{ + uint32_t i, j, id; + uint64_t bmap; + uint64_t c; + uint64_t *reg_thread_id; + + for (i = 0, reg_thread_id = RTE_QSBR_THRID_ARRAY_ELM(v, 0); + i < v->num_elems; + i++, reg_thread_id++) { + /* Load the current registered thread bit map before + * loading the reader thread quiescent state counters. 
+ */ + bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE); + id = i << RTE_QSBR_THRID_INDEX_SHIFT; + + while (bmap) { + j = __builtin_ctzl(bmap); + RCU_DP_LOG(DEBUG, + "%s: check: token = %lu, wait = %d, Bit Map = 0x%lx, Thread ID = %d", + __func__, t, wait, bmap, id + j); + c = __atomic_load_n( + &v->qsbr_cnt[id + j].cnt, + __ATOMIC_ACQUIRE); + RCU_DP_LOG(DEBUG, + "%s: status: token = %lu, wait = %d, Thread QS cnt = %lu, Thread ID = %d", + __func__, t, wait, c, id+j); + /* Counter is not checked for wrap-around condition + * as it is a 64b counter. + */ + if (unlikely(c != RTE_QSBR_CNT_THR_OFFLINE && c < t)) { + /* This thread is not in quiescent state */ + if (!wait) + return 0; + + rte_pause(); + /* This thread might have unregistered. + * Re-read the bitmap. + */ + bmap = __atomic_load_n(reg_thread_id, + __ATOMIC_ACQUIRE); + + continue; + } + + bmap &= ~(1UL << j); + } + } + + return 1; +} + +/* Check the quiescent state counter for all threads, assuming that + * all the threads have registered. + */ +static __rte_always_inline int +__rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait) +{ + uint32_t i; + struct rte_rcu_qsbr_cnt *cnt; + uint64_t c; + + for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) { + RCU_DP_LOG(DEBUG, + "%s: check: token = %lu, wait = %d, Thread ID = %d", + __func__, t, wait, i); + while (1) { + c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE); + RCU_DP_LOG(DEBUG, + "%s: status: token = %lu, wait = %d, Thread QS cnt = %lu, Thread ID = %d", + __func__, t, wait, c, i); + /* Counter is not checked for wrap-around condition + * as it is a 64b counter. + */ + if (likely(c == RTE_QSBR_CNT_THR_OFFLINE || c >= t)) + break; + + /* This thread is not in quiescent state */ + if (!wait) + return 0; + + rte_pause(); + } + } + + return 1; +} + +/** + * @warning + * @b EXPERIMENTAL: this API may change without prior notice + * + * Checks if all the reader threads have entered the quiescent state + * referenced by token. 
+ * + * This is implemented as a lock-free function. It is multi-thread + * safe and can be called from the worker threads as well. + * + * If this API is called with 'wait' set to true, the following + * factors must be considered: + * + * 1) If the calling thread is also reporting the status on the + * same QS variable, it must update the quiescent state status, before + * calling this API. + * + * 2) In addition, while calling from multiple threads, only + * one of those threads can be reporting the quiescent state status + * on a given QS variable. + * + * @param v + * QS variable + * @param t + * Token returned by rte_rcu_qsbr_start API + * @param wait + * If true, block till all the reader threads have completed entering + * the quiescent state referenced by token 't'. + * @return + * - 0 if all reader threads have NOT passed through specified number + * of quiescent states. + * - 1 if all reader threads have passed through specified number + * of quiescent states. + */ +static __rte_always_inline int __rte_experimental +rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait) +{ + RTE_ASSERT(v != NULL); + + if (likely(v->num_threads == v->max_threads)) + return __rcu_qsbr_check_all(v, t, wait); + else + return __rcu_qsbr_check_selective(v, t, wait); +} + +/** + * @warning + * @b EXPERIMENTAL: this API may change without prior notice + * + * Wait till the reader threads have entered quiescent state. + * + * This is implemented as a lock-free function. It is multi-thread safe. + * This API can be thought of as a wrapper around rte_rcu_qsbr_start and + * rte_rcu_qsbr_check APIs. + * + * If this API is called from multiple threads, only one of + * those threads can be reporting the quiescent state status on a + * given QS variable. + * + * @param v + * QS variable + * @param thread_id + * Thread ID of the caller if it is registered to report quiescent state + * on this QS variable (i.e. 
the calling thread is also part of the + * readside critical section). If not, pass RTE_QSBR_THRID_INVALID. + */ +void __rte_experimental +rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id); + +/** + * @warning + * @b EXPERIMENTAL: this API may change without prior notice + * + * Dump the details of a single QS variable to a file. + * + * It is NOT multi-thread safe. + * + * @param f + * A pointer to a file for output + * @param v + * QS variable + * @return + * On success - 0 + * On error - 1 with error code set in rte_errno. + * Possible rte_errno codes are: + * - EINVAL - NULL parameters are passed + */ +int __rte_experimental +rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v); + +#ifdef __cplusplus +} +#endif + +#endif /* _RTE_RCU_QSBR_H_ */ diff --git a/lib/librte_rcu/rte_rcu_version.map b/lib/librte_rcu/rte_rcu_version.map new file mode 100644 index 000000000..5ea8524db --- /dev/null +++ b/lib/librte_rcu/rte_rcu_version.map @@ -0,0 +1,12 @@ +EXPERIMENTAL { + global: + + rte_rcu_qsbr_get_memsize; + rte_rcu_qsbr_init; + rte_rcu_qsbr_thread_register; + rte_rcu_qsbr_thread_unregister; + rte_rcu_qsbr_synchronize; + rte_rcu_qsbr_dump; + + local: *; +}; diff --git a/lib/meson.build b/lib/meson.build index 595314d7d..67be10659 100644 --- a/lib/meson.build +++ b/lib/meson.build @@ -22,7 +22,7 @@ libraries = [ 'gro', 'gso', 'ip_frag', 'jobstats', 'kni', 'latencystats', 'lpm', 'member', 'power', 'pdump', 'rawdev', - 'reorder', 'sched', 'security', 'stack', 'vhost', + 'reorder', 'sched', 'security', 'stack', 'vhost', 'rcu', #ipsec lib depends on crypto and security 'ipsec', # add pkt framework libs which use other libs from above diff --git a/mk/rte.app.mk b/mk/rte.app.mk index abea16d48..ebe6d48a7 100644 --- a/mk/rte.app.mk +++ b/mk/rte.app.mk @@ -97,6 +97,7 @@ _LDLIBS-$(CONFIG_RTE_LIBRTE_EAL) += -lrte_eal _LDLIBS-$(CONFIG_RTE_LIBRTE_CMDLINE) += -lrte_cmdline _LDLIBS-$(CONFIG_RTE_LIBRTE_REORDER) += -lrte_reorder _LDLIBS-$(CONFIG_RTE_LIBRTE_SCHED)
+= -lrte_sched +_LDLIBS-$(CONFIG_RTE_LIBRTE_RCU) += -lrte_rcu ifeq ($(CONFIG_RTE_EXEC_ENV_LINUX),y) _LDLIBS-$(CONFIG_RTE_LIBRTE_KNI) += -lrte_kni