diff mbox series

[RFC,34/48] dlm_ckv: add KV get/set async API

Message ID 20220803162857.27770-35-d.bogdanov@yadro.com
State New
Headers show
Series Target cluster implementation over DLM | expand

Commit Message

Dmitry Bogdanov Dec. 22, 2021, 12:38 p.m. UTC
The sync versions of KV set/get functions may take too long time if user
want to store/read a number of keys.
Add async versions of KV get/set functions to allow the user to group
the requests and to wait for completion of all requests together.

Signed-off-by: Dmitry Bogdanov <d.bogdanov@yadro.com>
---
 drivers/target/dlm_ckv.c | 110 ++++++++++++++++++++++++++++++++++++++-
 drivers/target/dlm_ckv.h |   6 +++
 2 files changed, 115 insertions(+), 1 deletion(-)
diff mbox series

Patch

diff --git a/drivers/target/dlm_ckv.c b/drivers/target/dlm_ckv.c
index 22c5f0827595..417159f18fc6 100644
--- a/drivers/target/dlm_ckv.c
+++ b/drivers/target/dlm_ckv.c
@@ -14,15 +14,27 @@ 
 #include <target/target_core_base.h>
 #include "dlm_ckv.h"
 
+enum dlm_lksb_state {
+	ONE_STAGE_SYNC,
+	READ_FIRST_STAGE,
+	READ_SECOND_STAGE,
+	WRITE_FIRST_STAGE,
+	WRITE_SECOND_STAGE,
+};
 struct dlm_ckv_lksb {
 	struct dlm_lksb lksb;
 	struct completion compl;
+	enum dlm_lksb_state state;
 };
 
 struct dlm_ckv_lock {
 	struct dlm_ckv_bucket *bucket;
 	struct dlm_ckv_lksb lksb;
 	char name[DLM_RESNAME_MAXLEN];
+	dlm_ckv_async_cb async_cb;
+	void *cb_arg;
+	void *value;
+	size_t len;
 };
 
 struct dlm_ckv_kv {
@@ -97,8 +109,48 @@  static const struct dlm_lockspace_ops dlm_ckv_lockspace_ops = {
 static void dlm_ast(void *astarg)
 {
 	struct dlm_ckv_lksb *dlm_ckv_lksb = astarg;
+	struct dlm_ckv_lock *ckv_lock;
+	struct dlm_ckv_bucket *bucket;
+	int res;
+
+	ckv_lock = container_of(dlm_ckv_lksb, struct dlm_ckv_lock, lksb);
+	bucket = ckv_lock->bucket;
 
-	complete(&dlm_ckv_lksb->compl);
+	switch (dlm_ckv_lksb->state) {
+	case ONE_STAGE_SYNC:
+		complete(&dlm_ckv_lksb->compl);
+		break;
+	case READ_FIRST_STAGE:
+		if (dlm_ckv_lksb->lksb.sb_flags & DLM_SBF_VALNOTVALID) {
+			pr_debug("%s LVB was invalid\n", ckv_lock->name);
+			memset(ckv_lock->value, 0, ckv_lock->len);
+		} else
+			memcpy(ckv_lock->value, dlm_ckv_lksb->lksb.sb_lvbptr,
+			       ckv_lock->len);
+
+		dlm_ckv_lksb->state = READ_SECOND_STAGE;
+		res = dlm_lock(ckv_lock->bucket->ls, DLM_LOCK_NL,
+			       &dlm_ckv_lksb->lksb, DLM_LKF_CONVERT, 0, 0, 0,
+			       dlm_ast, dlm_ckv_lksb, NULL);
+		if (res)
+			ckv_lock->async_cb(ckv_lock->cb_arg, res);
+		break;
+	case WRITE_FIRST_STAGE:
+
+		dlm_ckv_lksb->state = WRITE_SECOND_STAGE;
+		res = dlm_lock(ckv_lock->bucket->ls, DLM_LOCK_NL,
+			       &dlm_ckv_lksb->lksb,
+			       DLM_LKF_VALBLK | DLM_LKF_CONVERT, 0, 0, 0,
+			       dlm_ast, dlm_ckv_lksb, NULL);
+		if (res)
+			ckv_lock->async_cb(ckv_lock->cb_arg, res);
+		break;
+	case READ_SECOND_STAGE:
+		fallthrough;
+	case WRITE_SECOND_STAGE:
+		ckv_lock->async_cb(ckv_lock->cb_arg, dlm_ckv_lksb->lksb.sb_status);
+		break;
+	}
 }
 
 /*
@@ -135,6 +187,7 @@  static int dlm_ckv_lock_wait(dlm_lockspace_t *ls, int mode,
 {
 	int res;
 
+	lksb->state = ONE_STAGE_SYNC;
 	res = dlm_lock(ls, mode, &lksb->lksb, flags,
 		       (void *)name, name ? strlen(name) : 0, 0,
 		       dlm_ast, lksb, bast);
@@ -164,6 +217,7 @@  static int dlm_ckv_unlock_wait(dlm_lockspace_t *ls, struct dlm_ckv_lksb *lksb)
 {
 	int res;
 
+	lksb->state = ONE_STAGE_SYNC;
 	res = dlm_unlock(ls, lksb->lksb.sb_lkid, 0, &lksb->lksb, lksb);
 	if (res < 0)
 		goto out;
@@ -333,6 +387,32 @@  dlm_ckv_get(struct dlm_ckv_kv *kv, char *value, size_t len)
 }
 EXPORT_SYMBOL(dlm_ckv_get);
 
+int
+dlm_ckv_get_async(struct dlm_ckv_kv *kv, char *value, size_t len,
+		  dlm_ckv_async_cb cb, void *cb_arg)
+{
+	struct dlm_ckv_lock *ckv_lock = &kv->lock;
+	struct dlm_ckv_bucket *bucket;
+	int res;
+
+	BUG_ON(!ckv_lock);
+	bucket = ckv_lock->bucket;
+
+	ckv_lock->lksb.state = READ_FIRST_STAGE;
+	ckv_lock->len = len;
+	ckv_lock->value = value;
+	ckv_lock->async_cb = cb;
+	ckv_lock->cb_arg = cb_arg;
+	res = dlm_lock(ckv_lock->bucket->ls, DLM_LOCK_CR, &ckv_lock->lksb.lksb,
+		       DLM_LKF_VALBLK | DLM_LKF_CONVERT, NULL, 0, 0,
+		       dlm_ast, &ckv_lock->lksb, NULL);
+	if (res)
+		pr_info("Can not get lock %s, rc=%d\n", ckv_lock->name, res);
+
+	return res;
+}
+EXPORT_SYMBOL(dlm_ckv_get_async);
+
 int
 dlm_ckv_set(struct dlm_ckv_kv *kv, const char *value, size_t len)
 {
@@ -368,6 +448,34 @@  dlm_ckv_set(struct dlm_ckv_kv *kv, const char *value, size_t len)
 }
 EXPORT_SYMBOL(dlm_ckv_set);
 
+int
+dlm_ckv_set_async(struct dlm_ckv_kv *kv, char *value, size_t len,
+		  dlm_ckv_async_cb cb, void *cb_arg)
+{
+	struct dlm_ckv_lock *ckv_lock = &kv->lock;
+	struct dlm_ckv_bucket *bucket;
+	int res;
+
+	BUG_ON(!ckv_lock);
+	bucket = ckv_lock->bucket;
+
+	ckv_lock->lksb.state = WRITE_FIRST_STAGE;
+	ckv_lock->len = len;
+	ckv_lock->value = value;
+	ckv_lock->async_cb = cb;
+	ckv_lock->cb_arg = cb_arg;
+	memcpy(ckv_lock->lksb.lksb.sb_lvbptr, ckv_lock->value, ckv_lock->len);
+
+	res = dlm_lock(ckv_lock->bucket->ls, DLM_LOCK_EX, &ckv_lock->lksb.lksb,
+		       DLM_LKF_CONVERT, NULL, 0, 0,
+		       dlm_ast, &ckv_lock->lksb, NULL);
+	if (res)
+		pr_info("Can not get lock %s\n", ckv_lock->name);
+
+	return res;
+}
+EXPORT_SYMBOL(dlm_ckv_set_async);
+
 static void dlm_cvk_pre_n_bast(void *astarg, int mode)
 {
 	struct dlm_ckv_lksb *lksb = astarg;
diff --git a/drivers/target/dlm_ckv.h b/drivers/target/dlm_ckv.h
index c01904313f1e..e8045917067e 100644
--- a/drivers/target/dlm_ckv.h
+++ b/drivers/target/dlm_ckv.h
@@ -9,6 +9,8 @@  struct dlm_ckv_kv;
 #define DLM_CKV_VALUE_MAX_SIZE	255
 
 typedef void (*dlm_ckv_notify_cb)(void *userarg);
+typedef void (*dlm_ckv_async_cb)(void *userarg, int res);
+typedef void (*dlm_ckv_nodeleft_cb)(void *arg, int nodeid);
 
 struct dlm_ckv_bucket *dlm_ckv_open_bucket(const char *name,
 					   const char *cluster_name,
@@ -26,6 +28,10 @@  dlm_ckv_create_kv(struct dlm_ckv_bucket *bucket, const char *key);
 void dlm_ckv_free_kv(struct dlm_ckv_kv *kv);
 int dlm_ckv_get(struct dlm_ckv_kv *kv, char *value, size_t len);
 int dlm_ckv_set(struct dlm_ckv_kv *kv, const char *value, size_t len);
+int dlm_ckv_get_async(struct dlm_ckv_kv *kv, char *value, size_t len,
+		      dlm_ckv_async_cb cb, void *cb_arg);
+int dlm_ckv_set_async(struct dlm_ckv_kv *kv, char *value, size_t len,
+		      dlm_ckv_async_cb cb, void *cb_arg);
 
 struct dlm_ckv_notify *
 dlm_ckv_create_notification(struct dlm_ckv_bucket *bucket, const char *name,