@@ -19,34 +19,36 @@
#include "qapi/error.h"
#include "standard-headers/linux/virtio_blk.h"
+/* A kick fd that we monitor on behalf of libvhost-user */
typedef struct VuFdWatch {
VuDev *vu_dev;
int fd; /*kick fd*/
void *pvt;
vu_watch_cb cb;
- bool processing;
QTAILQ_ENTRY(VuFdWatch) next;
} VuFdWatch;
-typedef struct VuServer VuServer;
-
-struct VuServer {
+/**
+ * VuServer:
+ * A vhost-user server instance with user-defined VuDevIface callbacks.
+ * Vhost-user device backends can be implemented using VuServer. VuDevIface
+ * callbacks and virtqueue kicks run in the given AioContext.
+ */
+typedef struct {
QIONetListener *listener;
+ QEMUBH *restart_listener_bh;
AioContext *ctx;
int max_queues;
const VuDevIface *vu_iface;
+
+ /* Protected by ctx lock */
VuDev vu_dev;
QIOChannel *ioc; /* The I/O channel with the client */
QIOChannelSocket *sioc; /* The underlying data channel with the client */
- /* IOChannel for fd provided via VHOST_USER_SET_SLAVE_REQ_FD */
- QIOChannel *ioc_slave;
- QIOChannelSocket *sioc_slave;
- Coroutine *co_trip; /* coroutine for processing VhostUserMsg */
QTAILQ_HEAD(, VuFdWatch) vu_fd_watches;
- /* restart coroutine co_trip if AIOContext is changed */
- bool aio_context_changed;
- bool processing_msg;
-};
+
+ Coroutine *co_trip; /* coroutine for processing VhostUserMsg */
+} VuServer;
bool vhost_user_server_start(VuServer *server,
SocketAddress *unix_socket,
@@ -57,6 +59,7 @@ bool vhost_user_server_start(VuServer *server,
void vhost_user_server_stop(VuServer *server);
-void vhost_user_server_set_aio_context(VuServer *server, AioContext *ctx);
+void vhost_user_server_attach_aio_context(VuServer *server, AioContext *ctx);
+void vhost_user_server_detach_aio_context(VuServer *server);
#endif /* VHOST_USER_SERVER_H */
@@ -313,18 +313,13 @@ static const VuDevIface vu_block_iface = {
static void blk_aio_attached(AioContext *ctx, void *opaque)
{
VuBlockDev *vub_dev = opaque;
- aio_context_acquire(ctx);
- vhost_user_server_set_aio_context(&vub_dev->vu_server, ctx);
- aio_context_release(ctx);
+ vhost_user_server_attach_aio_context(&vub_dev->vu_server, ctx);
}
static void blk_aio_detach(void *opaque)
{
VuBlockDev *vub_dev = opaque;
- AioContext *ctx = vub_dev->vu_server.ctx;
- aio_context_acquire(ctx);
- vhost_user_server_set_aio_context(&vub_dev->vu_server, NULL);
- aio_context_release(ctx);
+ vhost_user_server_detach_aio_context(&vub_dev->vu_server);
}
static void
@@ -9,8 +9,50 @@
*/
#include "qemu/osdep.h"
#include "qemu/main-loop.h"
+#include "block/aio-wait.h"
#include "vhost-user-server.h"
+/*
+ * Theory of operation:
+ *
+ * VuServer is started and stopped by vhost_user_server_start() and
+ * vhost_user_server_stop() from the main loop thread. Starting the server
+ * opens a vhost-user UNIX domain socket and listens for incoming connections.
+ * Only one connection is allowed at a time.
+ *
+ * The connection is handled by the vu_client_trip() coroutine in the
+ * VuServer->ctx AioContext. The coroutine consists of a vu_dispatch() loop
+ * where libvhost-user calls vu_message_read() to receive the next vhost-user
+ * protocol messages over the UNIX domain socket.
+ *
+ * When virtqueues are set up libvhost-user calls set_watch() to monitor kick
+ * fds. These fds are also handled in the VuServer->ctx AioContext.
+ *
+ * Both vu_client_trip() and kick fd monitoring can be stopped by shutting down
+ * the socket connection. Shutting down the socket connection causes
+ * vu_message_read() to fail since no more data can be received from the socket.
+ * After vu_dispatch() fails, vu_client_trip() calls vu_deinit() to stop
+ * libvhost-user before terminating the coroutine. vu_deinit() calls
+ * remove_watch() to stop monitoring kick fds and this stops virtqueue
+ * processing.
+ *
+ * When vu_client_trip() has finished cleaning up it schedules a BH in the main
+ * loop thread to accept the next client connection.
+ *
+ * When libvhost-user detects an error it calls panic_cb() and sets the
+ * dev->broken flag. Both vu_client_trip() and kick fd processing stop when
+ * the dev->broken flag is set.
+ *
+ * It is possible to switch AioContexts using
+ * vhost_user_server_detach_aio_context() and
+ * vhost_user_server_attach_aio_context(). They stop monitoring fds in the old
+ * AioContext and resume monitoring in the new AioContext. The vu_client_trip()
+ * coroutine remains in a yielded state during the switch. This is made
+ * possible by QIOChannel's support for spurious coroutine re-entry in
+ * qio_channel_yield(). The coroutine will restart I/O when re-entered from the
+ * new AioContext.
+ */
+
static void vmsg_close_fds(VhostUserMsg *vmsg)
{
int i;
@@ -27,68 +69,9 @@ static void vmsg_unblock_fds(VhostUserMsg *vmsg)
}
}
-static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc,
- gpointer opaque);
-
-static void close_client(VuServer *server)
-{
- /*
- * Before closing the client
- *
- * 1. Let vu_client_trip stop processing new vhost-user msg
- *
- * 2. remove kick_handler
- *
- * 3. wait for the kick handler to be finished
- *
- * 4. wait for the current vhost-user msg to be finished processing
- */
-
- QIOChannelSocket *sioc = server->sioc;
- /* When this is set vu_client_trip will stop new processing vhost-user message */
- server->sioc = NULL;
-
- while (server->processing_msg) {
- if (server->ioc->read_coroutine) {
- server->ioc->read_coroutine = NULL;
- qio_channel_set_aio_fd_handler(server->ioc, server->ioc->ctx, NULL,
- NULL, server->ioc);
- server->processing_msg = false;
- }
- }
-
- vu_deinit(&server->vu_dev);
-
- /* vu_deinit() should have called remove_watch() */
- assert(QTAILQ_EMPTY(&server->vu_fd_watches));
-
- object_unref(OBJECT(sioc));
- object_unref(OBJECT(server->ioc));
-}
-
static void panic_cb(VuDev *vu_dev, const char *buf)
{
- VuServer *server = container_of(vu_dev, VuServer, vu_dev);
-
- /* avoid while loop in close_client */
- server->processing_msg = false;
-
- if (buf) {
- error_report("vu_panic: %s", buf);
- }
-
- if (server->sioc) {
- close_client(server);
- }
-
- /*
- * Set the callback function for network listener so another
- * vhost-user client can connect to this server
- */
- qio_net_listener_set_client_func(server->listener,
- vu_accept,
- server,
- NULL);
+ error_report("vu_panic: %s", buf);
}
static bool coroutine_fn
@@ -185,28 +168,31 @@ fail:
return false;
}
-
-static void vu_client_start(VuServer *server);
static coroutine_fn void vu_client_trip(void *opaque)
{
VuServer *server = opaque;
+ VuDev *vu_dev = &server->vu_dev;
- while (!server->aio_context_changed && server->sioc) {
- server->processing_msg = true;
- vu_dispatch(&server->vu_dev);
- server->processing_msg = false;
+ while (!vu_dev->broken && vu_dispatch(vu_dev)) {
+ /* Keep running */
}
- if (server->aio_context_changed && server->sioc) {
- server->aio_context_changed = false;
- vu_client_start(server);
- }
-}
+ vu_deinit(vu_dev);
+
+ /* vu_deinit() should have called remove_watch() */
+ assert(QTAILQ_EMPTY(&server->vu_fd_watches));
+
+ object_unref(OBJECT(server->sioc));
+ server->sioc = NULL;
-static void vu_client_start(VuServer *server)
-{
- server->co_trip = qemu_coroutine_create(vu_client_trip, server);
- aio_co_enter(server->ctx, server->co_trip);
+ object_unref(OBJECT(server->ioc));
+ server->ioc = NULL;
+
+ server->co_trip = NULL;
+ if (server->restart_listener_bh) {
+ qemu_bh_schedule(server->restart_listener_bh);
+ }
+ aio_wait_kick();
}
/*
@@ -219,12 +205,18 @@ static void vu_client_start(VuServer *server)
static void kick_handler(void *opaque)
{
VuFdWatch *vu_fd_watch = opaque;
- vu_fd_watch->processing = true;
- vu_fd_watch->cb(vu_fd_watch->vu_dev, 0, vu_fd_watch->pvt);
- vu_fd_watch->processing = false;
+ VuDev *vu_dev = vu_fd_watch->vu_dev;
+
+ vu_fd_watch->cb(vu_dev, 0, vu_fd_watch->pvt);
+
+ /* Stop vu_client_trip() if an error occurred in vu_fd_watch->cb() */
+ if (vu_dev->broken) {
+ VuServer *server = container_of(vu_dev, VuServer, vu_dev);
+
+ qio_channel_shutdown(server->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+ }
}
-
static VuFdWatch *find_vu_fd_watch(VuServer *server, int fd)
{
@@ -319,62 +311,95 @@ static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc,
qio_channel_set_name(QIO_CHANNEL(sioc), "vhost-user client");
server->ioc = QIO_CHANNEL(sioc);
object_ref(OBJECT(server->ioc));
- qio_channel_attach_aio_context(server->ioc, server->ctx);
+
+ /* TODO vu_message_write() spins if non-blocking! */
qio_channel_set_blocking(server->ioc, false, NULL);
- vu_client_start(server);
+
+ server->co_trip = qemu_coroutine_create(vu_client_trip, server);
+
+ aio_context_acquire(server->ctx);
+ vhost_user_server_attach_aio_context(server, server->ctx);
+ aio_context_release(server->ctx);
}
-
void vhost_user_server_stop(VuServer *server)
{
+ aio_context_acquire(server->ctx);
+
+ qemu_bh_delete(server->restart_listener_bh);
+ server->restart_listener_bh = NULL;
+
if (server->sioc) {
- close_client(server);
+ VuFdWatch *vu_fd_watch;
+
+ QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
+ aio_set_fd_handler(server->ctx, vu_fd_watch->fd, true,
+ NULL, NULL, NULL, vu_fd_watch);
+ }
+
+ qio_channel_shutdown(server->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+
+ AIO_WAIT_WHILE(server->ctx, server->co_trip);
}
+ aio_context_release(server->ctx);
+
if (server->listener) {
qio_net_listener_disconnect(server->listener);
object_unref(OBJECT(server->listener));
}
+}
+
+/*
+ * Allow the next client to connect to the server. Called from a BH in the main
+ * loop.
+ */
+static void restart_listener_bh(void *opaque)
+{
+ VuServer *server = opaque;
+ qio_net_listener_set_client_func(server->listener, vu_accept, server,
+ NULL);
}
-void vhost_user_server_set_aio_context(VuServer *server, AioContext *ctx)
+/* Called with ctx acquired */
+void vhost_user_server_attach_aio_context(VuServer *server, AioContext *ctx)
{
- VuFdWatch *vu_fd_watch, *next;
- void *opaque = NULL;
- IOHandler *io_read = NULL;
- bool attach;
+ VuFdWatch *vu_fd_watch;
- server->ctx = ctx ? ctx : qemu_get_aio_context();
+ server->ctx = ctx;
if (!server->sioc) {
- /* not yet serving any client*/
return;
}
- if (ctx) {
- qio_channel_attach_aio_context(server->ioc, ctx);
- server->aio_context_changed = true;
- io_read = kick_handler;
- attach = true;
- } else {
+ qio_channel_attach_aio_context(server->ioc, ctx);
+
+ QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
+ aio_set_fd_handler(ctx, vu_fd_watch->fd, true, kick_handler, NULL,
+ NULL, vu_fd_watch);
+ }
+
+ aio_co_schedule(ctx, server->co_trip);
+}
+
+/* Called with server->ctx acquired */
+void vhost_user_server_detach_aio_context(VuServer *server)
+{
+ if (server->sioc) {
+ VuFdWatch *vu_fd_watch;
+
+ QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
+ aio_set_fd_handler(server->ctx, vu_fd_watch->fd, true,
+ NULL, NULL, NULL, vu_fd_watch);
+ }
+
qio_channel_detach_aio_context(server->ioc);
- /* server->ioc->ctx keeps the old AioConext */
- ctx = server->ioc->ctx;
- attach = false;
}
- QTAILQ_FOREACH_SAFE(vu_fd_watch, &server->vu_fd_watches, next, next) {
- if (vu_fd_watch->cb) {
- opaque = attach ? vu_fd_watch : NULL;
- aio_set_fd_handler(ctx, vu_fd_watch->fd, true,
- io_read, NULL, NULL,
- opaque);
- }
- }
+ server->ctx = NULL;
}
-
bool vhost_user_server_start(VuServer *server,
SocketAddress *socket_addr,
AioContext *ctx,
@@ -382,6 +407,7 @@ bool vhost_user_server_start(VuServer *server,
const VuDevIface *vu_iface,
Error **errp)
{
+ QEMUBH *bh;
QIONetListener *listener = qio_net_listener_new();
if (qio_net_listener_open_sync(listener, socket_addr, 1,
errp) < 0) {
@@ -389,9 +415,12 @@ bool vhost_user_server_start(VuServer *server,
return false;
}
+ bh = qemu_bh_new(restart_listener_bh, server);
+
/* zero out unspecified fields */
*server = (VuServer) {
.listener = listener,
+ .restart_listener_bh = bh,
.vu_iface = vu_iface,
.max_queues = max_queues,
.ctx = ctx,
The vu_client_trip() coroutine is leaked during AioContext switching. It is also unsafe to destroy the vu_dev in panic_cb() since its callers still access it in some cases. Rework the lifecycle to solve these safety issues. Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com> --- util/vhost-user-server.h | 29 ++-- block/export/vhost-user-blk-server.c | 9 +- util/vhost-user-server.c | 245 +++++++++++++++------------ 3 files changed, 155 insertions(+), 128 deletions(-)