diff mbox series

[v7,6/8] selftests: ublk: kublk: decouple ublk_queues from ublk server threads

Message ID 20250527-ublk_task_per_io-v7-6-cbdbaf283baa@purestorage.com
State Superseded
Headers show
Series ublk: decouple server threads from ublk_queues/hctxs | expand

Commit Message

Uday Shankar May 27, 2025, 11:01 p.m. UTC
Add support in kublk for decoupled ublk_queues and ublk server threads.
kublk now has two modes of operation:

- (preexisting mode) threads and queues are paired 1:1, and each thread
  services all the I/Os of one queue
- (new mode) thread and queue counts are independently configurable.
  threads service I/Os in a way that balances load across threads even
  if load is not balanced over queues.

The default is the preexisting mode. The new mode is activated by
passing the --per_io_tasks flag.

Signed-off-by: Uday Shankar <ushankar@purestorage.com>
---
 tools/testing/selftests/ublk/file_backed.c |   4 +-
 tools/testing/selftests/ublk/kublk.c       | 106 ++++++++++++++++++++++++-----
 tools/testing/selftests/ublk/kublk.h       |   5 ++
 tools/testing/selftests/ublk/null.c        |   6 +-
 tools/testing/selftests/ublk/stripe.c      |   4 +-
 5 files changed, 101 insertions(+), 24 deletions(-)
diff mbox series

Patch

diff --git a/tools/testing/selftests/ublk/file_backed.c b/tools/testing/selftests/ublk/file_backed.c
index 922a87108b9f7bae53098e74602c7b1f3e0246bc..cfa59b631693793465f0e6909a6fbe1a364f4523 100644
--- a/tools/testing/selftests/ublk/file_backed.c
+++ b/tools/testing/selftests/ublk/file_backed.c
@@ -54,7 +54,7 @@  static int loop_queue_tgt_rw_io(struct ublk_queue *q, const struct ublksrv_io_de
 
 	ublk_io_alloc_sqes(ublk_get_io(q, tag), sqe, 3);
 
-	io_uring_prep_buf_register(sqe[0], 0, tag, q->q_id, tag);
+	io_uring_prep_buf_register(sqe[0], 0, tag, q->q_id, ublk_get_io(q, tag)->buf_index);
 	sqe[0]->flags |= IOSQE_CQE_SKIP_SUCCESS | IOSQE_IO_HARDLINK;
 	sqe[0]->user_data = build_user_data(tag,
 			ublk_cmd_op_nr(sqe[0]->cmd_op), 0, q->q_id, 1);
@@ -66,7 +66,7 @@  static int loop_queue_tgt_rw_io(struct ublk_queue *q, const struct ublksrv_io_de
 	sqe[1]->flags |= IOSQE_FIXED_FILE | IOSQE_IO_HARDLINK;
 	sqe[1]->user_data = build_user_data(tag, ublk_op, 0, q->q_id, 1);
 
-	io_uring_prep_buf_unregister(sqe[2], 0, tag, q->q_id, tag);
+	io_uring_prep_buf_unregister(sqe[2], 0, tag, q->q_id, ublk_get_io(q, tag)->buf_index);
 	sqe[2]->user_data = build_user_data(tag, ublk_cmd_op_nr(sqe[2]->cmd_op), 0, q->q_id, 1);
 
 	return 2;
diff --git a/tools/testing/selftests/ublk/kublk.c b/tools/testing/selftests/ublk/kublk.c
index 40431a8357a8f74d7d62e271e9090c8708c3ecc5..c91c4b5aa0ff4a0a87c05bc1d9f404d105842e7f 100644
--- a/tools/testing/selftests/ublk/kublk.c
+++ b/tools/testing/selftests/ublk/kublk.c
@@ -505,8 +505,11 @@  static int ublk_thread_init(struct ublk_thread *t)
 	}
 
 	if (dev->dev_info.flags & (UBLK_F_SUPPORT_ZERO_COPY | UBLK_F_AUTO_BUF_REG)) {
+		unsigned nr_ios = dev->dev_info.queue_depth * dev->dev_info.nr_hw_queues;
+		unsigned max_nr_ios_per_thread = nr_ios / dev->nthreads;
+		max_nr_ios_per_thread += !!(nr_ios % dev->nthreads);
 		ret = io_uring_register_buffers_sparse(
-			&t->ring, dev->dev_info.queue_depth);
+			&t->ring, max_nr_ios_per_thread);
 		if (ret) {
 			ublk_err("ublk dev %d thread %d register spare buffers failed %d",
 					dev->dev_info.dev_id, t->idx, ret);
@@ -578,7 +581,7 @@  static void ublk_set_auto_buf_reg(const struct ublk_queue *q,
 	if (q->tgt_ops->buf_index)
 		buf.index = q->tgt_ops->buf_index(q, tag);
 	else
-		buf.index = tag;
+		buf.index = q->ios[tag].buf_index;
 
 	if (q->state & UBLKSRV_AUTO_BUF_REG_FALLBACK)
 		buf.flags = UBLK_AUTO_BUF_REG_FALLBACK;
@@ -660,18 +663,44 @@  int ublk_queue_io_cmd(struct ublk_io *io)
 
 static void ublk_submit_fetch_commands(struct ublk_thread *t)
 {
-	/*
-	 * Service exclusively the queue whose q_id matches our thread
-	 * index. This may change in the future.
-	 */
-	struct ublk_queue *q = &t->dev->q[t->idx];
+	struct ublk_queue *q;
 	struct ublk_io *io;
-	int i = 0;
+	int i = 0, j = 0;
 
-	for (i = 0; i < q->q_depth; i++) {
-		io = &q->ios[i];
-		io->t = t;
-		ublk_queue_io_cmd(io);
+	if (t->dev->per_io_tasks) {
+		/*
+		 * Lexicographically order all the (qid,tag) pairs, with
+		 * qid taking priority (so (1,0) > (0,1)). Then make
+		 * this thread the daemon for every Nth entry in this
+		 * list (N is the number of threads), starting at this
+		 * thread's index. This ensures that each queue is
+		 * handled by as many ublk server threads as possible,
+		 * so that load that is concentrated on one or a few
+		 * queues can make use of all ublk server threads.
+		 */
+		const struct ublksrv_ctrl_dev_info *dinfo = &t->dev->dev_info;
+		int nr_ios = dinfo->nr_hw_queues * dinfo->queue_depth;
+		for (i = t->idx; i < nr_ios; i += t->dev->nthreads) {
+			int q_id = i / dinfo->queue_depth;
+			int tag = i % dinfo->queue_depth;
+			q = &t->dev->q[q_id];
+			io = &q->ios[tag];
+			io->t = t;
+			io->buf_index = j++;
+			ublk_queue_io_cmd(io);
+		}
+	} else {
+		/*
+		 * Service exclusively the queue whose q_id matches our
+		 * thread index.
+		 */
+		struct ublk_queue *q = &t->dev->q[t->idx];
+		for (i = 0; i < q->q_depth; i++) {
+			io = &q->ios[i];
+			io->t = t;
+			io->buf_index = i;
+			ublk_queue_io_cmd(io);
+		}
 	}
 }
 
@@ -826,7 +855,8 @@  static void *ublk_io_handler_fn(void *data)
 		return NULL;
 	}
 	/* IO perf is sensitive with queue pthread affinity on NUMA machine*/
-	ublk_thread_set_sched_affinity(t, info->affinity);
+	if (info->affinity)
+		ublk_thread_set_sched_affinity(t, info->affinity);
 	sem_post(info->ready);
 
 	ublk_dbg(UBLK_DBG_THREAD, "tid %d: ublk dev %d thread %u started\n",
@@ -893,7 +923,7 @@  static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)
 
 	ublk_dbg(UBLK_DBG_DEV, "%s enter\n", __func__);
 
-	tinfo = calloc(sizeof(struct ublk_thread_info), dinfo->nr_hw_queues);
+	tinfo = calloc(sizeof(struct ublk_thread_info), dev->nthreads);
 	if (!tinfo)
 		return -ENOMEM;
 
@@ -919,17 +949,29 @@  static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)
 				 dinfo->dev_id, i);
 			goto fail;
 		}
+	}
 
+	for (i = 0; i < dev->nthreads; i++) {
 		tinfo[i].dev = dev;
 		tinfo[i].idx = i;
 		tinfo[i].ready = &ready;
-		tinfo[i].affinity = &affinity_buf[i];
+
+		/*
+		 * If threads are not tied 1:1 to queues, setting thread
+		 * affinity based on queue affinity makes little sense.
+		 * However, thread CPU affinity has significant impact
+		 * on performance, so to compare fairly, we'll still set
+		 * thread CPU affinity based on queue affinity where
+		 * possible.
+		 */
+		if (dev->nthreads == dinfo->nr_hw_queues)
+			tinfo[i].affinity = &affinity_buf[i];
 		pthread_create(&dev->threads[i].thread, NULL,
 				ublk_io_handler_fn,
 				&tinfo[i]);
 	}
 
-	for (i = 0; i < dinfo->nr_hw_queues; i++)
+	for (i = 0; i < dev->nthreads; i++)
 		sem_wait(&ready);
 	free(tinfo);
 	free(affinity_buf);
@@ -953,7 +995,7 @@  static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)
 		ublk_send_dev_event(ctx, dev, dev->dev_info.dev_id);
 
 	/* wait until we are terminated */
-	for (i = 0; i < dinfo->nr_hw_queues; i++)
+	for (i = 0; i < dev->nthreads; i++)
 		pthread_join(dev->threads[i].thread, &thread_ret);
  fail:
 	for (i = 0; i < dinfo->nr_hw_queues; i++)
@@ -1063,6 +1105,7 @@  static int ublk_stop_io_daemon(const struct ublk_dev *dev)
 
 static int __cmd_dev_add(const struct dev_ctx *ctx)
 {
+	unsigned nthreads = ctx->nthreads;
 	unsigned nr_queues = ctx->nr_hw_queues;
 	const char *tgt_type = ctx->tgt_type;
 	unsigned depth = ctx->queue_depth;
@@ -1086,6 +1129,23 @@  static int __cmd_dev_add(const struct dev_ctx *ctx)
 		return -EINVAL;
 	}
 
+	/* default to 1:1 threads:queues if nthreads is unspecified */
+	if (nthreads == -1)
+		nthreads = nr_queues;
+
+	if (nthreads > UBLK_MAX_THREADS) {
+		ublk_err("%s: %u is too many threads (max %u)\n",
+				__func__, nthreads, UBLK_MAX_THREADS);
+		return -EINVAL;
+	}
+
+	if (nthreads != nr_queues && !ctx->per_io_tasks) {
+		ublk_err("%s: threads %u must be same as queues %u if "
+			"not using per_io_tasks\n",
+			__func__, nthreads, nr_queues);
+		return -EINVAL;
+	}
+
 	dev = ublk_ctrl_init();
 	if (!dev) {
 		ublk_err("%s: can't alloc dev id %d, type %s\n",
@@ -1109,6 +1169,8 @@  static int __cmd_dev_add(const struct dev_ctx *ctx)
 	if ((features & UBLK_F_QUIESCE) &&
 			(info->flags & UBLK_F_USER_RECOVERY))
 		info->flags |= UBLK_F_QUIESCE;
+	dev->nthreads = nthreads;
+	dev->per_io_tasks = ctx->per_io_tasks;
 	dev->tgt.ops = ops;
 	dev->tgt.sq_depth = depth;
 	dev->tgt.cq_depth = depth;
@@ -1307,6 +1369,7 @@  static int cmd_dev_get_features(void)
 		[const_ilog2(UBLK_F_UPDATE_SIZE)] = "UPDATE_SIZE",
 		[const_ilog2(UBLK_F_AUTO_BUF_REG)] = "AUTO_BUF_REG",
 		[const_ilog2(UBLK_F_QUIESCE)] = "QUIESCE",
+		[const_ilog2(UBLK_F_PER_IO_DAEMON)] = "PER_IO_DAEMON",
 	};
 	struct ublk_dev *dev;
 	__u64 features = 0;
@@ -1401,8 +1464,10 @@  static void __cmd_create_help(char *exe, bool recovery)
 			exe, recovery ? "recover" : "add");
 	printf("\t[--foreground] [--quiet] [-z] [--auto_zc] [--auto_zc_fallback] [--debug_mask mask] [-r 0|1 ] [-g]\n");
 	printf("\t[-e 0|1 ] [-i 0|1]\n");
+	printf("\t[--nthreads threads] [--per_io_tasks]\n");
 	printf("\t[target options] [backfile1] [backfile2] ...\n");
 	printf("\tdefault: nr_queues=2(max 32), depth=128(max 1024), dev_id=-1(auto allocation)\n");
+	printf("\tdefault: nthreads=nr_queues");
 
 	for (i = 0; i < sizeof(tgt_ops_list) / sizeof(tgt_ops_list[0]); i++) {
 		const struct ublk_tgt_ops *ops = tgt_ops_list[i];
@@ -1459,6 +1524,8 @@  int main(int argc, char *argv[])
 		{ "auto_zc",		0,	NULL,  0 },
 		{ "auto_zc_fallback", 	0,	NULL,  0 },
 		{ "size",		1,	NULL, 's'},
+		{ "nthreads",		1,	NULL,  0 },
+		{ "per_io_tasks",	0,	NULL,  0 },
 		{ 0, 0, 0, 0 }
 	};
 	const struct ublk_tgt_ops *ops = NULL;
@@ -1467,6 +1534,7 @@  int main(int argc, char *argv[])
 	struct dev_ctx ctx = {
 		.queue_depth	=	128,
 		.nr_hw_queues	=	2,
+		.nthreads	=	-1,
 		.dev_id		=	-1,
 		.tgt_type	=	"unknown",
 	};
@@ -1534,6 +1602,10 @@  int main(int argc, char *argv[])
 				ctx.flags |= UBLK_F_AUTO_BUF_REG;
 			if (!strcmp(longopts[option_idx].name, "auto_zc_fallback"))
 				ctx.auto_zc_fallback = 1;
+			if (!strcmp(longopts[option_idx].name, "nthreads"))
+				ctx.nthreads = strtol(optarg, NULL, 10);
+			if (!strcmp(longopts[option_idx].name, "per_io_tasks"))
+				ctx.per_io_tasks = 1;
 			break;
 		case '?':
 			/*
diff --git a/tools/testing/selftests/ublk/kublk.h b/tools/testing/selftests/ublk/kublk.h
index 3a2ae095bee18633acd5a9c923cfab2d14fe3bff..4cc8103bc49a7a93bbf61986cde8f4e6e1be716d 100644
--- a/tools/testing/selftests/ublk/kublk.h
+++ b/tools/testing/selftests/ublk/kublk.h
@@ -80,6 +80,7 @@  struct dev_ctx {
 	char tgt_type[16];
 	unsigned long flags;
 	unsigned nr_hw_queues;
+	unsigned nthreads;
 	unsigned queue_depth;
 	int dev_id;
 	int nr_files;
@@ -89,6 +90,7 @@  struct dev_ctx {
 	unsigned int	fg:1;
 	unsigned int	recovery:1;
 	unsigned int	auto_zc_fallback:1;
+	unsigned int	per_io_tasks:1;
 
 	int _evtfd;
 	int _shmid;
@@ -128,6 +130,7 @@  struct ublk_io {
 	unsigned short refs;		/* used by target code only */
 
 	int tag;
+	int buf_index;
 
 	int result;
 
@@ -203,6 +206,8 @@  struct ublk_dev {
 	struct ublksrv_ctrl_dev_info  dev_info;
 	struct ublk_queue q[UBLK_MAX_QUEUES];
 	struct ublk_thread threads[UBLK_MAX_THREADS];
+	unsigned nthreads;
+	unsigned per_io_tasks;
 
 	int fds[MAX_BACK_FILES + 1];	/* fds[0] points to /dev/ublkcN */
 	int nr_fds;
diff --git a/tools/testing/selftests/ublk/null.c b/tools/testing/selftests/ublk/null.c
index 9acc7e0d271b5ae52d6d31587cc5bfb63b19778d..afe0b99d77eec74acae04952a9af5348252bc599 100644
--- a/tools/testing/selftests/ublk/null.c
+++ b/tools/testing/selftests/ublk/null.c
@@ -62,7 +62,7 @@  static int null_queue_zc_io(struct ublk_queue *q, int tag)
 
 	ublk_io_alloc_sqes(ublk_get_io(q, tag), sqe, 3);
 
-	io_uring_prep_buf_register(sqe[0], 0, tag, q->q_id, tag);
+	io_uring_prep_buf_register(sqe[0], 0, tag, q->q_id, ublk_get_io(q, tag)->buf_index);
 	sqe[0]->user_data = build_user_data(tag,
 			ublk_cmd_op_nr(sqe[0]->cmd_op), 0, q->q_id, 1);
 	sqe[0]->flags |= IOSQE_CQE_SKIP_SUCCESS | IOSQE_IO_HARDLINK;
@@ -70,7 +70,7 @@  static int null_queue_zc_io(struct ublk_queue *q, int tag)
 	__setup_nop_io(tag, iod, sqe[1], q->q_id);
 	sqe[1]->flags |= IOSQE_IO_HARDLINK;
 
-	io_uring_prep_buf_unregister(sqe[2], 0, tag, q->q_id, tag);
+	io_uring_prep_buf_unregister(sqe[2], 0, tag, q->q_id, ublk_get_io(q, tag)->buf_index);
 	sqe[2]->user_data = build_user_data(tag, ublk_cmd_op_nr(sqe[2]->cmd_op), 0, q->q_id, 1);
 
 	// buf register is marked as IOSQE_CQE_SKIP_SUCCESS
@@ -136,7 +136,7 @@  static unsigned short ublk_null_buf_index(const struct ublk_queue *q, int tag)
 {
 	if (q->state & UBLKSRV_AUTO_BUF_REG_FALLBACK)
 		return (unsigned short)-1;
-	return tag;
+	return q->ios[tag].buf_index;
 }
 
 const struct ublk_tgt_ops null_tgt_ops = {
diff --git a/tools/testing/selftests/ublk/stripe.c b/tools/testing/selftests/ublk/stripe.c
index 97079c3121ef8d4edc71891a289dd40658ce3f2a..37d50bbf5f5e86a520efedc9228510f8e1273625 100644
--- a/tools/testing/selftests/ublk/stripe.c
+++ b/tools/testing/selftests/ublk/stripe.c
@@ -141,7 +141,7 @@  static int stripe_queue_tgt_rw_io(struct ublk_queue *q, const struct ublksrv_io_
 	ublk_io_alloc_sqes(ublk_get_io(q, tag), sqe, s->nr + extra);
 
 	if (zc) {
-		io_uring_prep_buf_register(sqe[0], 0, tag, q->q_id, tag);
+		io_uring_prep_buf_register(sqe[0], 0, tag, q->q_id, io->buf_index);
 		sqe[0]->flags |= IOSQE_CQE_SKIP_SUCCESS | IOSQE_IO_HARDLINK;
 		sqe[0]->user_data = build_user_data(tag,
 			ublk_cmd_op_nr(sqe[0]->cmd_op), 0, q->q_id, 1);
@@ -167,7 +167,7 @@  static int stripe_queue_tgt_rw_io(struct ublk_queue *q, const struct ublksrv_io_
 	if (zc) {
 		struct io_uring_sqe *unreg = sqe[s->nr + 1];
 
-		io_uring_prep_buf_unregister(unreg, 0, tag, q->q_id, tag);
+		io_uring_prep_buf_unregister(unreg, 0, tag, q->q_id, io->buf_index);
 		unreg->user_data = build_user_data(
 			tag, ublk_cmd_op_nr(unreg->cmd_op), 0, q->q_id, 1);
 	}