diff mbox series

[v3] eventpoll: Fix priority inversion problem

Message ID 20250527090836.1290532-1-namcao@linutronix.de
State New
Headers show
Series [v3] eventpoll: Fix priority inversion problem | expand

Commit Message

Nam Cao May 27, 2025, 9:08 a.m. UTC
The ready event list of an epoll object is protected by read-write
semaphore:

  - The consumer (waiter) acquires the write lock and takes items.
  - the producer (waker) takes the read lock and adds items.

The point of this design is enabling epoll to scale well with large number
of producers, as multiple producers can hold the read lock at the same
time.

Unfortunately, this implementation may cause scheduling priority inversion
problem. Suppose the consumer has higher scheduling priority than the
producer. The consumer needs to acquire the write lock, but may be blocked
by the producer holding the read lock. Since read-write semaphore does not
support priority-boosting for the readers (even with CONFIG_PREEMPT_RT=y),
we have a case of priority inversion: a higher priority consumer is blocked
by a lower priority producer. This problem was reported in [1].

Furthermore, this could also cause stall problem, as described in [2].

To fix this problem, make the event list half-lockless:

  - The consumer acquires a mutex (ep->mtx) and takes items.
  - The producer locklessly adds items to the list.

Performance is not the main goal of this patch, but as the producer now can
add items without waiting for consumer to release the lock, performance
improvement is observed using the stress test from
https://github.com/rouming/test-tools/blob/master/stress-epoll.c. This is
the same test that justified using read-write semaphore in the past.

Testing using 12 x86_64 CPUs:

          Before     After        Diff
threads  events/ms  events/ms
      8       6932      19753    +185%
     16       7820      27923    +257%
     32       7648      35164    +360%
     64       9677      37780    +290%
    128      11166      38174    +242%

Testing using 1 riscv64 CPU (averaged over 10 runs, as the numbers are
noisy):

          Before     After        Diff
threads  events/ms  events/ms
      1         73        129     +77%
      2        151        216     +43%
      4        216        364     +69%
      8        234        382     +63%
     16        251        392     +56%

Reported-by: Frederic Weisbecker <frederic@kernel.org>
Closes: https://lore.kernel.org/linux-rt-users/20210825132754.GA895675@lothringen/ [1]
Reported-by: Valentin Schneider <vschneid@redhat.com>
Closes: https://lore.kernel.org/linux-rt-users/xhsmhttqvnall.mognet@vschneid.remote.csb/ [2]
Signed-off-by: Nam Cao <namcao@linutronix.de>
---
v3:
  - get rid of the "link_used" and "ready" flags. They are hard to
    understand and unnecessary
  - get rid of the obsolete lockdep_assert_irqs_enabled()
  - Add lockdep_assert_held(&ep->mtx)
  - rewrite some comments
v2:
  - rename link_locked -> link_used
  - replace xchg() with smp_store_release() when applicable
  - make sure llist_node is in clean state when not on a list
  - remove now-unused list_add_tail_lockless()
---
 fs/eventpoll.c | 458 +++++++++++++++----------------------------------
 1 file changed, 134 insertions(+), 324 deletions(-)
diff mbox series

Patch

diff --git a/fs/eventpoll.c b/fs/eventpoll.c
index d4dbffdedd08e..a97a771a459c9 100644
--- a/fs/eventpoll.c
+++ b/fs/eventpoll.c
@@ -137,13 +137,7 @@  struct epitem {
 	};
 
 	/* List header used to link this structure to the eventpoll ready list */
-	struct list_head rdllink;
-
-	/*
-	 * Works together "struct eventpoll"->ovflist in keeping the
-	 * single linked chain of items.
-	 */
-	struct epitem *next;
+	struct llist_node rdllink;
 
 	/* The file descriptor information this item refers to */
 	struct epoll_filefd ffd;
@@ -191,22 +185,15 @@  struct eventpoll {
 	/* Wait queue used by file->poll() */
 	wait_queue_head_t poll_wait;
 
-	/* List of ready file descriptors */
-	struct list_head rdllist;
-
-	/* Lock which protects rdllist and ovflist */
-	rwlock_t lock;
+	/*
+	 * List of ready file descriptors. Adding to this list is lockless. Items can be removed
+	 * only with eventpoll::mtx
+	 */
+	struct llist_head rdllist;
 
 	/* RB tree root used to store monitored fd structs */
 	struct rb_root_cached rbr;
 
-	/*
-	 * This is a single linked list that chains all the "struct epitem" that
-	 * happened while transferring ready events to userspace w/out
-	 * holding ->lock.
-	 */
-	struct epitem *ovflist;
-
 	/* wakeup_source used when ep_send_events or __ep_eventpoll_poll is running */
 	struct wakeup_source *ws;
 
@@ -361,10 +348,14 @@  static inline int ep_cmp_ffd(struct epoll_filefd *p1,
 	        (p1->file < p2->file ? -1 : p1->fd - p2->fd));
 }
 
-/* Tells us if the item is currently linked */
-static inline int ep_is_linked(struct epitem *epi)
+/*
+ * Add the item to its container eventpoll's rdllist; do nothing if the item is already on rdllist.
+ */
+static void epitem_ready(struct epitem *epi)
 {
-	return !list_empty(&epi->rdllink);
+	if (&epi->rdllink == cmpxchg(&epi->rdllink.next, &epi->rdllink, NULL))
+		llist_add(&epi->rdllink, &epi->ep->rdllist);
+
 }
 
 static inline struct eppoll_entry *ep_pwq_from_wait(wait_queue_entry_t *p)
@@ -383,13 +374,26 @@  static inline struct epitem *ep_item_from_wait(wait_queue_entry_t *p)
  *
  * @ep: Pointer to the eventpoll context.
  *
- * Return: a value different than %zero if ready events are available,
- *          or %zero otherwise.
+ * Return: true if ready events might be available, false otherwise.
  */
-static inline int ep_events_available(struct eventpoll *ep)
+static inline bool ep_events_available(struct eventpoll *ep)
 {
-	return !list_empty_careful(&ep->rdllist) ||
-		READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR;
+	bool available;
+	int locked;
+
+	locked = mutex_trylock(&ep->mtx);
+	if (!locked) {
+		/*
+		 * The lock held and someone might have removed all items while inspecting it. The
+		 * llist_empty() check in this case is futile. Assume that something is enqueued and
+		 * let ep_try_send_events() figure it out.
+		 */
+		return true;
+	}
+
+	available = !llist_empty(&ep->rdllist);
+	mutex_unlock(&ep->mtx);
+	return available;
 }
 
 #ifdef CONFIG_NET_RX_BUSY_POLL
@@ -724,77 +728,6 @@  static inline void ep_pm_stay_awake_rcu(struct epitem *epi)
 	rcu_read_unlock();
 }
 
-
-/*
- * ep->mutex needs to be held because we could be hit by
- * eventpoll_release_file() and epoll_ctl().
- */
-static void ep_start_scan(struct eventpoll *ep, struct list_head *txlist)
-{
-	/*
-	 * Steal the ready list, and re-init the original one to the
-	 * empty list. Also, set ep->ovflist to NULL so that events
-	 * happening while looping w/out locks, are not lost. We cannot
-	 * have the poll callback to queue directly on ep->rdllist,
-	 * because we want the "sproc" callback to be able to do it
-	 * in a lockless way.
-	 */
-	lockdep_assert_irqs_enabled();
-	write_lock_irq(&ep->lock);
-	list_splice_init(&ep->rdllist, txlist);
-	WRITE_ONCE(ep->ovflist, NULL);
-	write_unlock_irq(&ep->lock);
-}
-
-static void ep_done_scan(struct eventpoll *ep,
-			 struct list_head *txlist)
-{
-	struct epitem *epi, *nepi;
-
-	write_lock_irq(&ep->lock);
-	/*
-	 * During the time we spent inside the "sproc" callback, some
-	 * other events might have been queued by the poll callback.
-	 * We re-insert them inside the main ready-list here.
-	 */
-	for (nepi = READ_ONCE(ep->ovflist); (epi = nepi) != NULL;
-	     nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
-		/*
-		 * We need to check if the item is already in the list.
-		 * During the "sproc" callback execution time, items are
-		 * queued into ->ovflist but the "txlist" might already
-		 * contain them, and the list_splice() below takes care of them.
-		 */
-		if (!ep_is_linked(epi)) {
-			/*
-			 * ->ovflist is LIFO, so we have to reverse it in order
-			 * to keep in FIFO.
-			 */
-			list_add(&epi->rdllink, &ep->rdllist);
-			ep_pm_stay_awake(epi);
-		}
-	}
-	/*
-	 * We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
-	 * releasing the lock, events will be queued in the normal way inside
-	 * ep->rdllist.
-	 */
-	WRITE_ONCE(ep->ovflist, EP_UNACTIVE_PTR);
-
-	/*
-	 * Quickly re-inject items left on "txlist".
-	 */
-	list_splice(txlist, &ep->rdllist);
-	__pm_relax(ep->ws);
-
-	if (!list_empty(&ep->rdllist)) {
-		if (waitqueue_active(&ep->wq))
-			wake_up(&ep->wq);
-	}
-
-	write_unlock_irq(&ep->lock);
-}
-
 static void ep_get(struct eventpoll *ep)
 {
 	refcount_inc(&ep->refcount);
@@ -832,10 +765,12 @@  static void ep_free(struct eventpoll *ep)
 static bool __ep_remove(struct eventpoll *ep, struct epitem *epi, bool force)
 {
 	struct file *file = epi->ffd.file;
+	struct llist_node *put_back_last;
 	struct epitems_head *to_free;
 	struct hlist_head *head;
+	LLIST_HEAD(put_back);
 
-	lockdep_assert_irqs_enabled();
+	lockdep_assert_held(&ep->mtx);
 
 	/*
 	 * Removes poll wait queue hooks.
@@ -867,10 +802,20 @@  static bool __ep_remove(struct eventpoll *ep, struct epitem *epi, bool force)
 
 	rb_erase_cached(&epi->rbn, &ep->rbr);
 
-	write_lock_irq(&ep->lock);
-	if (ep_is_linked(epi))
-		list_del_init(&epi->rdllink);
-	write_unlock_irq(&ep->lock);
+	if (llist_on_list(&epi->rdllink)) {
+		put_back_last = NULL;
+		while (true) {
+			struct llist_node *n = llist_del_first(&ep->rdllist);
+
+			if (&epi->rdllink == n || WARN_ON(!n))
+				break;
+			if (!put_back_last)
+				put_back_last = n;
+			__llist_add(n, &put_back);
+		}
+		if (put_back_last)
+			llist_add_batch(put_back.first, put_back_last, &ep->rdllist);
+	}
 
 	wakeup_source_unregister(ep_wakeup_source(epi));
 	/*
@@ -974,8 +919,9 @@  static __poll_t ep_item_poll(const struct epitem *epi, poll_table *pt, int depth
 static __poll_t __ep_eventpoll_poll(struct file *file, poll_table *wait, int depth)
 {
 	struct eventpoll *ep = file->private_data;
-	LIST_HEAD(txlist);
-	struct epitem *epi, *tmp;
+	struct wakeup_source *ws;
+	struct llist_node *n;
+	struct epitem *epi;
 	poll_table pt;
 	__poll_t res = 0;
 
@@ -989,22 +935,39 @@  static __poll_t __ep_eventpoll_poll(struct file *file, poll_table *wait, int dep
 	 * the ready list.
 	 */
 	mutex_lock_nested(&ep->mtx, depth);
-	ep_start_scan(ep, &txlist);
-	list_for_each_entry_safe(epi, tmp, &txlist, rdllink) {
+	while (true) {
+		n = llist_del_first_init(&ep->rdllist);
+		if (!n)
+			break;
+
+		epi = llist_entry(n, struct epitem, rdllink);
+
 		if (ep_item_poll(epi, &pt, depth + 1)) {
 			res = EPOLLIN | EPOLLRDNORM;
+			epitem_ready(epi);
 			break;
 		} else {
 			/*
-			 * Item has been dropped into the ready list by the poll
-			 * callback, but it's not actually ready, as far as
-			 * caller requested events goes. We can remove it here.
+			 * We need to activate ep before deactivating epi, to prevent autosuspend
+			 * just in case epi becomes active after ep_item_poll() above.
+			 *
+			 * This is similar to ep_send_events().
 			 */
+			ws = ep_wakeup_source(epi);
+			if (ws) {
+				if (ws->active)
+					__pm_stay_awake(ep->ws);
+				__pm_relax(ws);
+			}
 			__pm_relax(ep_wakeup_source(epi));
-			list_del_init(&epi->rdllink);
+
+			/* Just in case epi becomes active right before __pm_relax() */
+			if (unlikely(ep_item_poll(epi, &pt, depth + 1)))
+				ep_pm_stay_awake(epi);
+
+			__pm_relax(ep->ws);
 		}
 	}
-	ep_done_scan(ep, &txlist);
 	mutex_unlock(&ep->mtx);
 	return res;
 }
@@ -1153,12 +1116,10 @@  static int ep_alloc(struct eventpoll **pep)
 		return -ENOMEM;
 
 	mutex_init(&ep->mtx);
-	rwlock_init(&ep->lock);
 	init_waitqueue_head(&ep->wq);
 	init_waitqueue_head(&ep->poll_wait);
-	INIT_LIST_HEAD(&ep->rdllist);
+	init_llist_head(&ep->rdllist);
 	ep->rbr = RB_ROOT_CACHED;
-	ep->ovflist = EP_UNACTIVE_PTR;
 	ep->user = get_current_user();
 	refcount_set(&ep->refcount, 1);
 
@@ -1240,94 +1201,11 @@  struct file *get_epoll_tfile_raw_ptr(struct file *file, int tfd,
 }
 #endif /* CONFIG_KCMP */
 
-/*
- * Adds a new entry to the tail of the list in a lockless way, i.e.
- * multiple CPUs are allowed to call this function concurrently.
- *
- * Beware: it is necessary to prevent any other modifications of the
- *         existing list until all changes are completed, in other words
- *         concurrent list_add_tail_lockless() calls should be protected
- *         with a read lock, where write lock acts as a barrier which
- *         makes sure all list_add_tail_lockless() calls are fully
- *         completed.
- *
- *        Also an element can be locklessly added to the list only in one
- *        direction i.e. either to the tail or to the head, otherwise
- *        concurrent access will corrupt the list.
- *
- * Return: %false if element has been already added to the list, %true
- * otherwise.
- */
-static inline bool list_add_tail_lockless(struct list_head *new,
-					  struct list_head *head)
-{
-	struct list_head *prev;
-
-	/*
-	 * This is simple 'new->next = head' operation, but cmpxchg()
-	 * is used in order to detect that same element has been just
-	 * added to the list from another CPU: the winner observes
-	 * new->next == new.
-	 */
-	if (!try_cmpxchg(&new->next, &new, head))
-		return false;
-
-	/*
-	 * Initially ->next of a new element must be updated with the head
-	 * (we are inserting to the tail) and only then pointers are atomically
-	 * exchanged.  XCHG guarantees memory ordering, thus ->next should be
-	 * updated before pointers are actually swapped and pointers are
-	 * swapped before prev->next is updated.
-	 */
-
-	prev = xchg(&head->prev, new);
-
-	/*
-	 * It is safe to modify prev->next and new->prev, because a new element
-	 * is added only to the tail and new->next is updated before XCHG.
-	 */
-
-	prev->next = new;
-	new->prev = prev;
-
-	return true;
-}
-
-/*
- * Chains a new epi entry to the tail of the ep->ovflist in a lockless way,
- * i.e. multiple CPUs are allowed to call this function concurrently.
- *
- * Return: %false if epi element has been already chained, %true otherwise.
- */
-static inline bool chain_epi_lockless(struct epitem *epi)
-{
-	struct eventpoll *ep = epi->ep;
-
-	/* Fast preliminary check */
-	if (epi->next != EP_UNACTIVE_PTR)
-		return false;
-
-	/* Check that the same epi has not been just chained from another CPU */
-	if (cmpxchg(&epi->next, EP_UNACTIVE_PTR, NULL) != EP_UNACTIVE_PTR)
-		return false;
-
-	/* Atomically exchange tail */
-	epi->next = xchg(&ep->ovflist, epi);
-
-	return true;
-}
-
 /*
  * This is the callback that is passed to the wait queue wakeup
  * mechanism. It is called by the stored file descriptors when they
  * have events to report.
  *
- * This callback takes a read lock in order not to contend with concurrent
- * events from another file descriptor, thus all modifications to ->rdllist
- * or ->ovflist are lockless.  Read lock is paired with the write lock from
- * ep_start/done_scan(), which stops all list modifications and guarantees
- * that lists state is seen correctly.
- *
  * Another thing worth to mention is that ep_poll_callback() can be called
  * concurrently for the same @epi from different CPUs if poll table was inited
  * with several wait queues entries.  Plural wakeup from different CPUs of a
@@ -1337,15 +1215,11 @@  static inline bool chain_epi_lockless(struct epitem *epi)
  */
 static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
 {
-	int pwake = 0;
 	struct epitem *epi = ep_item_from_wait(wait);
 	struct eventpoll *ep = epi->ep;
 	__poll_t pollflags = key_to_poll(key);
-	unsigned long flags;
 	int ewake = 0;
 
-	read_lock_irqsave(&ep->lock, flags);
-
 	ep_set_busy_poll_napi_id(epi);
 
 	/*
@@ -1355,7 +1229,7 @@  static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, v
 	 * until the next EPOLL_CTL_MOD will be issued.
 	 */
 	if (!(epi->event.events & ~EP_PRIVATE_BITS))
-		goto out_unlock;
+		goto out;
 
 	/*
 	 * Check the events coming with the callback. At this stage, not
@@ -1364,22 +1238,10 @@  static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, v
 	 * test for "key" != NULL before the event match test.
 	 */
 	if (pollflags && !(pollflags & epi->event.events))
-		goto out_unlock;
+		goto out;
 
-	/*
-	 * If we are transferring events to userspace, we can hold no locks
-	 * (because we're accessing user memory, and because of linux f_op->poll()
-	 * semantics). All the events that happen during that period of time are
-	 * chained in ep->ovflist and requeued later on.
-	 */
-	if (READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR) {
-		if (chain_epi_lockless(epi))
-			ep_pm_stay_awake_rcu(epi);
-	} else if (!ep_is_linked(epi)) {
-		/* In the usual case, add event to ready list. */
-		if (list_add_tail_lockless(&epi->rdllink, &ep->rdllist))
-			ep_pm_stay_awake_rcu(epi);
-	}
+	ep_pm_stay_awake_rcu(epi);
+	epitem_ready(epi);
 
 	/*
 	 * Wake up ( if active ) both the eventpoll wait list and the ->poll()
@@ -1408,15 +1270,9 @@  static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, v
 			wake_up(&ep->wq);
 	}
 	if (waitqueue_active(&ep->poll_wait))
-		pwake++;
-
-out_unlock:
-	read_unlock_irqrestore(&ep->lock, flags);
-
-	/* We have to call this outside the lock */
-	if (pwake)
 		ep_poll_safewake(ep, epi, pollflags & EPOLL_URING_WAKE);
 
+out:
 	if (!(epi->event.events & EPOLLEXCLUSIVE))
 		ewake = 1;
 
@@ -1661,8 +1517,6 @@  static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
 	if (is_file_epoll(tfile))
 		tep = tfile->private_data;
 
-	lockdep_assert_irqs_enabled();
-
 	if (unlikely(percpu_counter_compare(&ep->user->epoll_watches,
 					    max_user_watches) >= 0))
 		return -ENOSPC;
@@ -1674,11 +1528,10 @@  static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
 	}
 
 	/* Item initialization follow here ... */
-	INIT_LIST_HEAD(&epi->rdllink);
+	init_llist_node(&epi->rdllink);
 	epi->ep = ep;
 	ep_set_ffd(&epi->ffd, tfile, fd);
 	epi->event = *event;
-	epi->next = EP_UNACTIVE_PTR;
 
 	if (tep)
 		mutex_lock_nested(&tep->mtx, 1);
@@ -1745,16 +1598,13 @@  static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
 		return -ENOMEM;
 	}
 
-	/* We have to drop the new item inside our item list to keep track of it */
-	write_lock_irq(&ep->lock);
-
 	/* record NAPI ID of new item if present */
 	ep_set_busy_poll_napi_id(epi);
 
 	/* If the file is already "ready" we drop it inside the ready list */
-	if (revents && !ep_is_linked(epi)) {
-		list_add_tail(&epi->rdllink, &ep->rdllist);
+	if (revents) {
 		ep_pm_stay_awake(epi);
+		epitem_ready(epi);
 
 		/* Notify waiting tasks that events are available */
 		if (waitqueue_active(&ep->wq))
@@ -1763,8 +1613,6 @@  static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
 			pwake++;
 	}
 
-	write_unlock_irq(&ep->lock);
-
 	/* We have to call this outside the lock */
 	if (pwake)
 		ep_poll_safewake(ep, NULL, 0);
@@ -1779,11 +1627,8 @@  static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
 static int ep_modify(struct eventpoll *ep, struct epitem *epi,
 		     const struct epoll_event *event)
 {
-	int pwake = 0;
 	poll_table pt;
 
-	lockdep_assert_irqs_enabled();
-
 	init_poll_funcptr(&pt, NULL);
 
 	/*
@@ -1827,24 +1672,16 @@  static int ep_modify(struct eventpoll *ep, struct epitem *epi,
 	 * list, push it inside.
 	 */
 	if (ep_item_poll(epi, &pt, 1)) {
-		write_lock_irq(&ep->lock);
-		if (!ep_is_linked(epi)) {
-			list_add_tail(&epi->rdllink, &ep->rdllist);
-			ep_pm_stay_awake(epi);
+		ep_pm_stay_awake(epi);
+		epitem_ready(epi);
 
-			/* Notify waiting tasks that events are available */
-			if (waitqueue_active(&ep->wq))
-				wake_up(&ep->wq);
-			if (waitqueue_active(&ep->poll_wait))
-				pwake++;
-		}
-		write_unlock_irq(&ep->lock);
+		/* Notify waiting tasks that events are available */
+		if (waitqueue_active(&ep->wq))
+			wake_up(&ep->wq);
+		if (waitqueue_active(&ep->poll_wait))
+			ep_poll_safewake(ep, NULL, 0);
 	}
 
-	/* We have to call this outside the lock */
-	if (pwake)
-		ep_poll_safewake(ep, NULL, 0);
-
 	return 0;
 }
 
@@ -1852,7 +1689,7 @@  static int ep_send_events(struct eventpoll *ep,
 			  struct epoll_event __user *events, int maxevents)
 {
 	struct epitem *epi, *tmp;
-	LIST_HEAD(txlist);
+	LLIST_HEAD(txlist);
 	poll_table pt;
 	int res = 0;
 
@@ -1867,19 +1704,18 @@  static int ep_send_events(struct eventpoll *ep,
 	init_poll_funcptr(&pt, NULL);
 
 	mutex_lock(&ep->mtx);
-	ep_start_scan(ep, &txlist);
 
-	/*
-	 * We can loop without lock because we are passed a task private list.
-	 * Items cannot vanish during the loop we are holding ep->mtx.
-	 */
-	list_for_each_entry_safe(epi, tmp, &txlist, rdllink) {
+	while (res < maxevents) {
 		struct wakeup_source *ws;
+		struct llist_node *n;
 		__poll_t revents;
 
-		if (res >= maxevents)
+		n = llist_del_first(&ep->rdllist);
+		if (!n)
 			break;
 
+		epi = llist_entry(n, struct epitem, rdllink);
+
 		/*
 		 * Activate ep->ws before deactivating epi->ws to prevent
 		 * triggering auto-suspend here (in case we reactive epi->ws
@@ -1896,21 +1732,30 @@  static int ep_send_events(struct eventpoll *ep,
 			__pm_relax(ws);
 		}
 
-		list_del_init(&epi->rdllink);
-
 		/*
 		 * If the event mask intersect the caller-requested one,
 		 * deliver the event to userspace. Again, we are holding ep->mtx,
 		 * so no operations coming from userspace can change the item.
 		 */
 		revents = ep_item_poll(epi, &pt, 1);
-		if (!revents)
+		if (!revents) {
+			init_llist_node(n);
+
+			/*
+			 * Just in case epi becomes ready after ep_item_poll() above, but before
+			 * init_llist_node(). Make sure to add it to the ready list, otherwise an
+			 * event may be lost.
+			 */
+			if (unlikely(ep_item_poll(epi, &pt, 1))) {
+				ep_pm_stay_awake(epi);
+				epitem_ready(epi);
+			}
 			continue;
+		}
 
 		events = epoll_put_uevent(revents, epi->event.data, events);
 		if (!events) {
-			list_add(&epi->rdllink, &txlist);
-			ep_pm_stay_awake(epi);
+			llist_add(&epi->rdllink, &ep->rdllist);
 			if (!res)
 				res = -EFAULT;
 			break;
@@ -1918,25 +1763,31 @@  static int ep_send_events(struct eventpoll *ep,
 		res++;
 		if (epi->event.events & EPOLLONESHOT)
 			epi->event.events &= EP_PRIVATE_BITS;
-		else if (!(epi->event.events & EPOLLET)) {
+		__llist_add(n, &txlist);
+	}
+
+	llist_for_each_entry_safe(epi, tmp, txlist.first, rdllink) {
+		init_llist_node(&epi->rdllink);
+
+		if (!(epi->event.events & EPOLLET)) {
 			/*
-			 * If this file has been added with Level
-			 * Trigger mode, we need to insert back inside
-			 * the ready list, so that the next call to
-			 * epoll_wait() will check again the events
-			 * availability. At this point, no one can insert
-			 * into ep->rdllist besides us. The epoll_ctl()
-			 * callers are locked out by
-			 * ep_send_events() holding "mtx" and the
-			 * poll callback will queue them in ep->ovflist.
+			 * If this file has been added with Level Trigger mode, we need to insert
+			 * back inside the ready list, so that the next call to epoll_wait() will
+			 * check again the events availability.
 			 */
-			list_add_tail(&epi->rdllink, &ep->rdllist);
 			ep_pm_stay_awake(epi);
+			epitem_ready(epi);
 		}
 	}
-	ep_done_scan(ep, &txlist);
+
+	__pm_relax(ep->ws);
 	mutex_unlock(&ep->mtx);
 
+	if (!llist_empty(&ep->rdllist)) {
+		if (waitqueue_active(&ep->wq))
+			wake_up(&ep->wq);
+	}
+
 	return res;
 }
 
@@ -2029,8 +1880,6 @@  static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
 	wait_queue_entry_t wait;
 	ktime_t expires, *to = NULL;
 
-	lockdep_assert_irqs_enabled();
-
 	if (timeout && (timeout->tv_sec | timeout->tv_nsec)) {
 		slack = select_estimate_accuracy(timeout);
 		to = &expires;
@@ -2090,54 +1939,15 @@  static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
 		init_wait(&wait);
 		wait.func = ep_autoremove_wake_function;
 
-		write_lock_irq(&ep->lock);
-		/*
-		 * Barrierless variant, waitqueue_active() is called under
-		 * the same lock on wakeup ep_poll_callback() side, so it
-		 * is safe to avoid an explicit barrier.
-		 */
-		__set_current_state(TASK_INTERRUPTIBLE);
+		prepare_to_wait_exclusive(&ep->wq, &wait, TASK_INTERRUPTIBLE);
 
-		/*
-		 * Do the final check under the lock. ep_start/done_scan()
-		 * plays with two lists (->rdllist and ->ovflist) and there
-		 * is always a race when both lists are empty for short
-		 * period of time although events are pending, so lock is
-		 * important.
-		 */
-		eavail = ep_events_available(ep);
-		if (!eavail)
-			__add_wait_queue_exclusive(&ep->wq, &wait);
-
-		write_unlock_irq(&ep->lock);
-
-		if (!eavail)
+		if (!ep_events_available(ep))
 			timed_out = !ep_schedule_timeout(to) ||
 				!schedule_hrtimeout_range(to, slack,
 							  HRTIMER_MODE_ABS);
-		__set_current_state(TASK_RUNNING);
-
-		/*
-		 * We were woken up, thus go and try to harvest some events.
-		 * If timed out and still on the wait queue, recheck eavail
-		 * carefully under lock, below.
-		 */
-		eavail = 1;
 
-		if (!list_empty_careful(&wait.entry)) {
-			write_lock_irq(&ep->lock);
-			/*
-			 * If the thread timed out and is not on the wait queue,
-			 * it means that the thread was woken up after its
-			 * timeout expired before it could reacquire the lock.
-			 * Thus, when wait.entry is empty, it needs to harvest
-			 * events.
-			 */
-			if (timed_out)
-				eavail = list_empty(&wait.entry);
-			__remove_wait_queue(&ep->wq, &wait);
-			write_unlock_irq(&ep->lock);
-		}
+		finish_wait(&ep->wq, &wait);
+		eavail = ep_events_available(ep);
 	}
 }