diff --git a/tools/testing/selftests/ublk/file_backed.c b/tools/testing/selftests/ublk/file_backed.c
--- a/tools/testing/selftests/ublk/file_backed.c
+++ b/tools/testing/selftests/ublk/file_backed.c
@@ -54,7 +54,7 @@ static int loop_queue_tgt_rw_io(struct ublk_queue *q, const struct ublksrv_io_de
ublk_io_alloc_sqes(ublk_get_io(q, tag), sqe, 3);
- io_uring_prep_buf_register(sqe[0], 0, tag, q->q_id, tag);
+ io_uring_prep_buf_register(sqe[0], 0, tag, q->q_id, ublk_get_io(q, tag)->buf_index);
sqe[0]->flags |= IOSQE_CQE_SKIP_SUCCESS | IOSQE_IO_HARDLINK;
sqe[0]->user_data = build_user_data(tag,
ublk_cmd_op_nr(sqe[0]->cmd_op), 0, q->q_id, 1);
@@ -66,7 +66,7 @@ static int loop_queue_tgt_rw_io(struct ublk_queue *q, const struct ublksrv_io_de
sqe[1]->flags |= IOSQE_FIXED_FILE | IOSQE_IO_HARDLINK;
sqe[1]->user_data = build_user_data(tag, ublk_op, 0, q->q_id, 1);
- io_uring_prep_buf_unregister(sqe[2], 0, tag, q->q_id, tag);
+ io_uring_prep_buf_unregister(sqe[2], 0, tag, q->q_id, ublk_get_io(q, tag)->buf_index);
sqe[2]->user_data = build_user_data(tag, ublk_cmd_op_nr(sqe[2]->cmd_op), 0, q->q_id, 1);
return 2;
diff --git a/tools/testing/selftests/ublk/kublk.c b/tools/testing/selftests/ublk/kublk.c
--- a/tools/testing/selftests/ublk/kublk.c
+++ b/tools/testing/selftests/ublk/kublk.c
@@ -505,8 +505,11 @@ static int ublk_thread_init(struct ublk_thread *t)
}
if (dev->dev_info.flags & (UBLK_F_SUPPORT_ZERO_COPY | UBLK_F_AUTO_BUF_REG)) {
+ unsigned nr_ios = dev->dev_info.queue_depth * dev->dev_info.nr_hw_queues;
+ unsigned max_nr_ios_per_thread = nr_ios / dev->nthreads;
+ max_nr_ios_per_thread += !!(nr_ios % dev->nthreads);
ret = io_uring_register_buffers_sparse(
- &t->ring, dev->dev_info.queue_depth);
+ &t->ring, max_nr_ios_per_thread);
if (ret) {
ublk_err("ublk dev %d thread %d register spare buffers failed %d",
dev->dev_info.dev_id, t->idx, ret);
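
Aside: the ceiling division above sizes each thread's sparse buffer
table for the worst case, since the I/Os need not divide evenly among
the threads. A standalone sketch of the same arithmetic (illustrative
names, not part of the patch):

    #include <stdio.h>

    /* worst-case number of I/Os owned by one of nthreads threads;
     * mirrors nr_ios / nthreads + !!(nr_ios % nthreads) above
     */
    static unsigned max_ios_per_thread(unsigned nr_ios, unsigned nthreads)
    {
            return nr_ios / nthreads + !!(nr_ios % nthreads);
    }

    int main(void)
    {
            /* 2 queues x depth 128 = 256 I/Os over 3 threads -> 86 */
            printf("%u\n", max_ios_per_thread(2 * 128, 3));
            return 0;
    }
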
@@ -578,7 +581,7 @@ static void ublk_set_auto_buf_reg(const struct ublk_queue *q,
if (q->tgt_ops->buf_index)
buf.index = q->tgt_ops->buf_index(q, tag);
else
- buf.index = tag;
+ buf.index = q->ios[tag].buf_index;
if (q->state & UBLKSRV_AUTO_BUF_REG_FALLBACK)
buf.flags = UBLK_AUTO_BUF_REG_FALLBACK;
@@ -660,18 +663,44 @@ int ublk_queue_io_cmd(struct ublk_io *io)
static void ublk_submit_fetch_commands(struct ublk_thread *t)
{
- /*
- * Service exclusively the queue whose q_id matches our thread
- * index. This may change in the future.
- */
- struct ublk_queue *q = &t->dev->q[t->idx];
+ struct ublk_queue *q;
struct ublk_io *io;
- int i = 0;
+ int i = 0, j = 0;
- for (i = 0; i < q->q_depth; i++) {
- io = &q->ios[i];
- io->t = t;
- ublk_queue_io_cmd(io);
+ if (t->dev->per_io_tasks) {
+ /*
+ * Lexicographically order all the (qid,tag) pairs, with
+ * qid taking priority (so (1,0) > (0,1)). Then make
+ * this thread the daemon for every Nth entry in this
+ * list (N is the number of threads), starting at this
+ * thread's index. This ensures that each queue is
+ * handled by as many ublk server threads as possible,
+ * so that load that is concentrated on one or a few
+ * queues can make use of all ublk server threads.
+ */
+ const struct ublksrv_ctrl_dev_info *dinfo = &t->dev->dev_info;
+ int nr_ios = dinfo->nr_hw_queues * dinfo->queue_depth;
+ for (i = t->idx; i < nr_ios; i += t->dev->nthreads) {
+ int q_id = i / dinfo->queue_depth;
+ int tag = i % dinfo->queue_depth;
+ q = &t->dev->q[q_id];
+ io = &q->ios[tag];
+ io->t = t;
+ io->buf_index = j++;
+ ublk_queue_io_cmd(io);
+ }
+ } else {
+ /*
+ * Service exclusively the queue whose q_id matches our
+ * thread index.
+ */
+ q = &t->dev->q[t->idx];
+ for (i = 0; i < q->q_depth; i++) {
+ io = &q->ios[i];
+ io->t = t;
+ io->buf_index = i;
+ ublk_queue_io_cmd(io);
+ }
}
}
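
To see the striding concretely: per_io_tasks flattens each (q_id, tag)
pair to i = q_id * depth + tag and deals the entries out round-robin,
giving each io a dense per-thread buf_index. A standalone sketch, with
arbitrary example sizes, that prints the assignment the loop above
would produce:

    #include <stdio.h>

    int main(void)
    {
            const int nr_queues = 2, depth = 4, nthreads = 3;
            const int nr_ios = nr_queues * depth;
            int t, i, j;

            for (t = 0; t < nthreads; t++) {
                    /* j is the per-thread buffer index (io->buf_index = j++) */
                    for (i = t, j = 0; i < nr_ios; i += nthreads)
                            printf("thread %d: q_id=%d tag=%d buf_index=%d\n",
                                   t, i / depth, i % depth, j++);
            }
            return 0;
    }

With 2 queues of depth 4 and 3 threads, every queue is serviced by all
three threads, which is the load-spreading property the comment
describes; the dense buf_index is also what lets ublk_thread_init()
register only max_nr_ios_per_thread sparse buffers per ring.
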
@@ -826,7 +855,8 @@ static void *ublk_io_handler_fn(void *data)
return NULL;
}
/* IO perf is sensitive to queue pthread affinity on NUMA machines */
- ublk_thread_set_sched_affinity(t, info->affinity);
+ if (info->affinity)
+ ublk_thread_set_sched_affinity(t, info->affinity);
sem_post(info->ready);
ublk_dbg(UBLK_DBG_THREAD, "tid %d: ublk dev %d thread %u started\n",
@@ -893,7 +923,7 @@ static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)
ublk_dbg(UBLK_DBG_DEV, "%s enter\n", __func__);
- tinfo = calloc(sizeof(struct ublk_thread_info), dinfo->nr_hw_queues);
+ tinfo = calloc(sizeof(struct ublk_thread_info), dev->nthreads);
if (!tinfo)
return -ENOMEM;
@@ -919,17 +949,29 @@ static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)
dinfo->dev_id, i);
goto fail;
}
+ }
+ for (i = 0; i < dev->nthreads; i++) {
tinfo[i].dev = dev;
tinfo[i].idx = i;
tinfo[i].ready = &ready;
- tinfo[i].affinity = &affinity_buf[i];
+
+ /*
+ * If threads are not tied 1:1 to queues, setting thread
+ * affinity based on queue affinity makes little sense.
+ * However, thread CPU affinity has significant impact
+ * on performance, so to compare fairly, we'll still set
+ * thread CPU affinity based on queue affinity where
+ * possible.
+ */
+ if (dev->nthreads == dinfo->nr_hw_queues)
+ tinfo[i].affinity = &affinity_buf[i];
pthread_create(&dev->threads[i].thread, NULL,
ublk_io_handler_fn,
&tinfo[i]);
}
- for (i = 0; i < dinfo->nr_hw_queues; i++)
+ for (i = 0; i < dev->nthreads; i++)
sem_wait(&ready);
free(tinfo);
free(affinity_buf);
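
For context, ublk_thread_set_sched_affinity() is an existing helper not
shown in this diff; the NULL check added in ublk_io_handler_fn() above
is needed because tinfo[i].affinity now stays unset when threads and
queues are not 1:1. A minimal sketch of this style of pinning, assuming
the helper amounts to sched_setaffinity() on the calling thread:

    #define _GNU_SOURCE
    #include <sched.h>
    #include <stdio.h>

    /* illustrative stand-in: pin the calling thread (pid 0) to cpuset */
    static void set_thread_affinity(const cpu_set_t *cpuset)
    {
            if (sched_setaffinity(0, sizeof(*cpuset), cpuset) < 0)
                    perror("sched_setaffinity");
    }
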
@@ -953,7 +995,7 @@ static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)
ublk_send_dev_event(ctx, dev, dev->dev_info.dev_id);
/* wait until we are terminated */
- for (i = 0; i < dinfo->nr_hw_queues; i++)
+ for (i = 0; i < dev->nthreads; i++)
pthread_join(dev->threads[i].thread, &thread_ret);
fail:
for (i = 0; i < dinfo->nr_hw_queues; i++)
@@ -1063,6 +1105,7 @@ static int ublk_stop_io_daemon(const struct ublk_dev *dev)
static int __cmd_dev_add(const struct dev_ctx *ctx)
{
+ unsigned nthreads = ctx->nthreads;
unsigned nr_queues = ctx->nr_hw_queues;
const char *tgt_type = ctx->tgt_type;
unsigned depth = ctx->queue_depth;
@@ -1086,6 +1129,23 @@ static int __cmd_dev_add(const struct dev_ctx *ctx)
return -EINVAL;
}
+ /* default to 1:1 threads:queues if nthreads is unspecified */
+ if (nthreads == -1)
+ nthreads = nr_queues;
+
+ if (nthreads > UBLK_MAX_THREADS) {
+ ublk_err("%s: %u is too many threads (max %u)\n",
+ __func__, nthreads, UBLK_MAX_THREADS);
+ return -EINVAL;
+ }
+
+ if (nthreads != nr_queues && !ctx->per_io_tasks) {
+ ublk_err("%s: threads %u must be same as queues %u if "
+ "not using per_io_tasks\n",
+ __func__, nthreads, nr_queues);
+ return -EINVAL;
+ }
+
dev = ublk_ctrl_init();
if (!dev) {
ublk_err("%s: can't alloc dev id %d, type %s\n",
@@ -1109,6 +1169,8 @@ static int __cmd_dev_add(const struct dev_ctx *ctx)
if ((features & UBLK_F_QUIESCE) &&
(info->flags & UBLK_F_USER_RECOVERY))
info->flags |= UBLK_F_QUIESCE;
+ dev->nthreads = nthreads;
+ dev->per_io_tasks = ctx->per_io_tasks;
dev->tgt.ops = ops;
dev->tgt.sq_depth = depth;
dev->tgt.cq_depth = depth;
@@ -1307,6 +1369,7 @@ static int cmd_dev_get_features(void)
[const_ilog2(UBLK_F_UPDATE_SIZE)] = "UPDATE_SIZE",
[const_ilog2(UBLK_F_AUTO_BUF_REG)] = "AUTO_BUF_REG",
[const_ilog2(UBLK_F_QUIESCE)] = "QUIESCE",
+ [const_ilog2(UBLK_F_PER_IO_DAEMON)] = "PER_IO_DAEMON",
};
struct ublk_dev *dev;
__u64 features = 0;
@@ -1401,8 +1464,10 @@ static void __cmd_create_help(char *exe, bool recovery)
exe, recovery ? "recover" : "add");
printf("\t[--foreground] [--quiet] [-z] [--auto_zc] [--auto_zc_fallback] [--debug_mask mask] [-r 0|1 ] [-g]\n");
printf("\t[-e 0|1 ] [-i 0|1]\n");
+ printf("\t[--nthreads threads] [--per_io_tasks]\n");
printf("\t[target options] [backfile1] [backfile2] ...\n");
printf("\tdefault: nr_queues=2(max 32), depth=128(max 1024), dev_id=-1(auto allocation)\n");
+ printf("\tdefault: nthreads=nr_queues");
for (i = 0; i < sizeof(tgt_ops_list) / sizeof(tgt_ops_list[0]); i++) {
const struct ublk_tgt_ops *ops = tgt_ops_list[i];
@@ -1459,6 +1524,8 @@ int main(int argc, char *argv[])
{ "auto_zc", 0, NULL, 0 },
{ "auto_zc_fallback", 0, NULL, 0 },
{ "size", 1, NULL, 's'},
+ { "nthreads", 1, NULL, 0 },
+ { "per_io_tasks", 0, NULL, 0 },
{ 0, 0, 0, 0 }
};
const struct ublk_tgt_ops *ops = NULL;
@@ -1467,6 +1534,7 @@ int main(int argc, char *argv[])
struct dev_ctx ctx = {
.queue_depth = 128,
.nr_hw_queues = 2,
+ .nthreads = -1,
.dev_id = -1,
.tgt_type = "unknown",
};
@@ -1534,6 +1602,10 @@ int main(int argc, char *argv[])
ctx.flags |= UBLK_F_AUTO_BUF_REG;
if (!strcmp(longopts[option_idx].name, "auto_zc_fallback"))
ctx.auto_zc_fallback = 1;
+ if (!strcmp(longopts[option_idx].name, "nthreads"))
+ ctx.nthreads = strtol(optarg, NULL, 10);
+ if (!strcmp(longopts[option_idx].name, "per_io_tasks"))
+ ctx.per_io_tasks = 1;
break;
case '?':
/*
diff --git a/tools/testing/selftests/ublk/kublk.h b/tools/testing/selftests/ublk/kublk.h
--- a/tools/testing/selftests/ublk/kublk.h
+++ b/tools/testing/selftests/ublk/kublk.h
@@ -80,6 +80,7 @@ struct dev_ctx {
char tgt_type[16];
unsigned long flags;
unsigned nr_hw_queues;
+ unsigned nthreads;
unsigned queue_depth;
int dev_id;
int nr_files;
@@ -89,6 +90,7 @@ struct dev_ctx {
unsigned int fg:1;
unsigned int recovery:1;
unsigned int auto_zc_fallback:1;
+ unsigned int per_io_tasks:1;
int _evtfd;
int _shmid;
@@ -128,6 +130,7 @@ struct ublk_io {
unsigned short refs; /* used by target code only */
int tag;
+ int buf_index;
int result;
@@ -203,6 +206,8 @@ struct ublk_dev {
struct ublksrv_ctrl_dev_info dev_info;
struct ublk_queue q[UBLK_MAX_QUEUES];
struct ublk_thread threads[UBLK_MAX_THREADS];
+ unsigned nthreads;
+ unsigned per_io_tasks;
int fds[MAX_BACK_FILES + 1]; /* fds[0] points to /dev/ublkcN */
int nr_fds;
diff --git a/tools/testing/selftests/ublk/null.c b/tools/testing/selftests/ublk/null.c
--- a/tools/testing/selftests/ublk/null.c
+++ b/tools/testing/selftests/ublk/null.c
@@ -62,7 +62,7 @@ static int null_queue_zc_io(struct ublk_queue *q, int tag)
ublk_io_alloc_sqes(ublk_get_io(q, tag), sqe, 3);
- io_uring_prep_buf_register(sqe[0], 0, tag, q->q_id, tag);
+ io_uring_prep_buf_register(sqe[0], 0, tag, q->q_id, ublk_get_io(q, tag)->buf_index);
sqe[0]->user_data = build_user_data(tag,
ublk_cmd_op_nr(sqe[0]->cmd_op), 0, q->q_id, 1);
sqe[0]->flags |= IOSQE_CQE_SKIP_SUCCESS | IOSQE_IO_HARDLINK;
@@ -70,7 +70,7 @@ static int null_queue_zc_io(struct ublk_queue *q, int tag)
__setup_nop_io(tag, iod, sqe[1], q->q_id);
sqe[1]->flags |= IOSQE_IO_HARDLINK;
- io_uring_prep_buf_unregister(sqe[2], 0, tag, q->q_id, tag);
+ io_uring_prep_buf_unregister(sqe[2], 0, tag, q->q_id, ublk_get_io(q, tag)->buf_index);
sqe[2]->user_data = build_user_data(tag, ublk_cmd_op_nr(sqe[2]->cmd_op), 0, q->q_id, 1);
// buf register is marked as IOSQE_CQE_SKIP_SUCCESS
@@ -136,7 +136,7 @@ static unsigned short ublk_null_buf_index(const struct ublk_queue *q, int tag)
{
if (q->state & UBLKSRV_AUTO_BUF_REG_FALLBACK)
return (unsigned short)-1;
- return tag;
+ return q->ios[tag].buf_index;
}
const struct ublk_tgt_ops null_tgt_ops = {
diff --git a/tools/testing/selftests/ublk/stripe.c b/tools/testing/selftests/ublk/stripe.c
--- a/tools/testing/selftests/ublk/stripe.c
+++ b/tools/testing/selftests/ublk/stripe.c
@@ -141,7 +141,7 @@ static int stripe_queue_tgt_rw_io(struct ublk_queue *q, const struct ublksrv_io_
ublk_io_alloc_sqes(ublk_get_io(q, tag), sqe, s->nr + extra);
if (zc) {
- io_uring_prep_buf_register(sqe[0], 0, tag, q->q_id, tag);
+ io_uring_prep_buf_register(sqe[0], 0, tag, q->q_id, io->buf_index);
sqe[0]->flags |= IOSQE_CQE_SKIP_SUCCESS | IOSQE_IO_HARDLINK;
sqe[0]->user_data = build_user_data(tag,
ublk_cmd_op_nr(sqe[0]->cmd_op), 0, q->q_id, 1);
@@ -167,7 +167,7 @@ static int stripe_queue_tgt_rw_io(struct ublk_queue *q, const struct ublksrv_io_
if (zc) {
struct io_uring_sqe *unreg = sqe[s->nr + 1];
- io_uring_prep_buf_unregister(unreg, 0, tag, q->q_id, tag);
+ io_uring_prep_buf_unregister(unreg, 0, tag, q->q_id, io->buf_index);
unreg->user_data = build_user_data(
tag, ublk_cmd_op_nr(unreg->cmd_op), 0, q->q_id, 1);
}
Add support in kublk for decoupled ublk_queues and ublk server threads.
kublk now has two modes of operation:

- (preexisting mode) threads and queues are paired 1:1, and each thread
  services all the I/Os of one queue
- (new mode) thread and queue counts are independently configurable.
  Threads service I/Os in a way that balances load across threads even
  if load is not balanced over queues.

The default is the preexisting mode. The new mode is activated by
passing the --per_io_tasks flag.

Signed-off-by: Uday Shankar <ushankar@purestorage.com>
---
 tools/testing/selftests/ublk/file_backed.c |   4 +-
 tools/testing/selftests/ublk/kublk.c       | 106 ++++++++++++++++++++++++-----
 tools/testing/selftests/ublk/kublk.h       |   5 ++
 tools/testing/selftests/ublk/null.c        |   6 +-
 tools/testing/selftests/ublk/stripe.c      |   4 +-
 5 files changed, 101 insertions(+), 24 deletions(-)
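
P.S.: the difference between the two modes boils down to how a
(q_id, tag) pair maps to its daemon thread. A compact sketch of both
mappings (helper names hypothetical, depth = queue_depth):

    /* preexisting mode: thread index == queue index */
    static int daemon_1to1(int q_id, int tag)
    {
            (void)tag;
            return q_id;
    }

    /* per_io_tasks mode: flatten (q_id, tag), deal round-robin */
    static int daemon_per_io(int q_id, int tag, int depth, int nthreads)
    {
            return (q_id * depth + tag) % nthreads;
    }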