Merge tag 'io_uring-5.14-2021-07-09' of git://git.kernel.dk/linux-block

Pull io_uring fixes from Jens Axboe:
 "A few fixes that should go into this merge.

  One fixes a regression introduced in this release; the others are
  generic fixes, mostly related to handling fallback task_work"

* tag 'io_uring-5.14-2021-07-09' of git://git.kernel.dk/linux-block:
  io_uring: remove dead non-zero 'poll' check
  io_uring: mitigate unlikely iopoll lag
  io_uring: fix drain alloc fail return code
  io_uring: fix exiting io_req_task_work_add leaks
  io_uring: simplify task_work func
  io_uring: fix stuck fallback reqs
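
The fallback task_work rework in the diff below replaces the old per-ctx
callback_head list (exit_task_work) with a lockless llist that is drained
from a delayed_work. A minimal sketch of that pattern, mirroring the hunks
below (illustrative only, not part of the patch; the io_kiocb and
io_ring_ctx fields are the ones added in the diff):

	/* producer: task_work_add() failed, punt req to the ctx fallback list */
	if (llist_add(&req->io_task_work.fallback_node,
		      &req->ctx->fallback_llist))
		/* list was empty before this add, so kick the worker once */
		schedule_delayed_work(&req->ctx->fallback_work, 1);

	/* consumer: drain the list and run each request's tw callback */
	static void io_fallback_req_func(struct work_struct *work)
	{
		struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
						       fallback_work.work);
		struct llist_node *node = llist_del_all(&ctx->fallback_llist);
		struct io_kiocb *req, *tmp;

		llist_for_each_entry_safe(req, tmp, node, io_task_work.fallback_node)
			req->io_task_work.func(req);
	}
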
diff --git a/fs/io_uring.c b/fs/io_uring.c
index e55b21f..d94fb58 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -465,7 +465,8 @@
 		struct mm_struct		*mm_account;
 
 		/* ctx exit and cancelation */
-		struct callback_head		*exit_task_work;
+		struct llist_head		fallback_llist;
+		struct delayed_work		fallback_work;
 		struct work_struct		exit_work;
 		struct list_head		tctx_list;
 		struct completion		ref_comp;
@@ -784,9 +785,14 @@
 	struct io_poll_iocb	*double_poll;
 };
 
+typedef void (*io_req_tw_func_t)(struct io_kiocb *req);
+
 struct io_task_work {
-	struct io_wq_work_node	node;
-	task_work_func_t	func;
+	union {
+		struct io_wq_work_node	node;
+		struct llist_node	fallback_node;
+	};
+	io_req_tw_func_t		func;
 };
 
 enum {
@@ -849,10 +855,7 @@
 
 	/* used with ctx->iopoll_list with reads/writes */
 	struct list_head		inflight_entry;
-	union {
-		struct io_task_work	io_task_work;
-		struct callback_head	task_work;
-	};
+	struct io_task_work		io_task_work;
 	/* for polled requests, i.e. IORING_OP_POLL_ADD and async armed poll */
 	struct hlist_node		hash_node;
 	struct async_poll		*apoll;
@@ -1071,6 +1074,8 @@
 static bool io_poll_remove_waitqs(struct io_kiocb *req);
 static int io_req_prep_async(struct io_kiocb *req);
 
+static void io_fallback_req_func(struct work_struct *unused);
+
 static struct kmem_cache *req_cachep;
 
 static const struct file_operations io_uring_fops;
@@ -1202,6 +1207,7 @@
 	INIT_LIST_HEAD(&ctx->tctx_list);
 	INIT_LIST_HEAD(&ctx->submit_state.comp.free_list);
 	INIT_LIST_HEAD(&ctx->locked_free_list);
+	INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func);
 	return ctx;
 err:
 	kfree(ctx->dummy_ubuf);
@@ -1929,7 +1935,7 @@
 				ctx = req->ctx;
 				percpu_ref_get(&ctx->refs);
 			}
-			req->task_work.func(&req->task_work);
+			req->io_task_work.func(req);
 			node = next;
 		}
 		if (wq_list_empty(&tctx->task_list)) {
@@ -1946,17 +1952,13 @@
 	ctx_flush_and_put(ctx);
 }
 
-static int io_req_task_work_add(struct io_kiocb *req)
+static void io_req_task_work_add(struct io_kiocb *req)
 {
 	struct task_struct *tsk = req->task;
 	struct io_uring_task *tctx = tsk->io_uring;
 	enum task_work_notify_mode notify;
-	struct io_wq_work_node *node, *prev;
+	struct io_wq_work_node *node;
 	unsigned long flags;
-	int ret = 0;
-
-	if (unlikely(tsk->flags & PF_EXITING))
-		return -ESRCH;
 
 	WARN_ON_ONCE(!tctx);
 
@@ -1967,7 +1969,9 @@
 	/* task_work already pending, we're done */
 	if (test_bit(0, &tctx->task_state) ||
 	    test_and_set_bit(0, &tctx->task_state))
-		return 0;
+		return;
+	if (unlikely(tsk->flags & PF_EXITING))
+		goto fail;
 
 	/*
 	 * SQPOLL kernel thread doesn't need notification, just a wakeup. For
@@ -1976,72 +1980,28 @@
 	 * will do the job.
 	 */
 	notify = (req->ctx->flags & IORING_SETUP_SQPOLL) ? TWA_NONE : TWA_SIGNAL;
-
 	if (!task_work_add(tsk, &tctx->task_work, notify)) {
 		wake_up_process(tsk);
-		return 0;
+		return;
 	}
-
-	/*
-	 * Slow path - we failed, find and delete work. if the work is not
-	 * in the list, it got run and we're fine.
-	 */
-	spin_lock_irqsave(&tctx->task_lock, flags);
-	wq_list_for_each(node, prev, &tctx->task_list) {
-		if (&req->io_task_work.node == node) {
-			wq_list_del(&tctx->task_list, node, prev);
-			ret = 1;
-			break;
-		}
-	}
-	spin_unlock_irqrestore(&tctx->task_lock, flags);
+fail:
 	clear_bit(0, &tctx->task_state);
-	return ret;
+	spin_lock_irqsave(&tctx->task_lock, flags);
+	node = tctx->task_list.first;
+	INIT_WQ_LIST(&tctx->task_list);
+	spin_unlock_irqrestore(&tctx->task_lock, flags);
+
+	while (node) {
+		req = container_of(node, struct io_kiocb, io_task_work.node);
+		node = node->next;
+		if (llist_add(&req->io_task_work.fallback_node,
+			      &req->ctx->fallback_llist))
+			schedule_delayed_work(&req->ctx->fallback_work, 1);
+	}
 }
 
-static bool io_run_task_work_head(struct callback_head **work_head)
+static void io_req_task_cancel(struct io_kiocb *req)
 {
-	struct callback_head *work, *next;
-	bool executed = false;
-
-	do {
-		work = xchg(work_head, NULL);
-		if (!work)
-			break;
-
-		do {
-			next = work->next;
-			work->func(work);
-			work = next;
-			cond_resched();
-		} while (work);
-		executed = true;
-	} while (1);
-
-	return executed;
-}
-
-static void io_task_work_add_head(struct callback_head **work_head,
-				  struct callback_head *task_work)
-{
-	struct callback_head *head;
-
-	do {
-		head = READ_ONCE(*work_head);
-		task_work->next = head;
-	} while (cmpxchg(work_head, head, task_work) != head);
-}
-
-static void io_req_task_work_add_fallback(struct io_kiocb *req,
-					  task_work_func_t cb)
-{
-	init_task_work(&req->task_work, cb);
-	io_task_work_add_head(&req->ctx->exit_task_work, &req->task_work);
-}
-
-static void io_req_task_cancel(struct callback_head *cb)
-{
-	struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work);
 	struct io_ring_ctx *ctx = req->ctx;
 
 	/* ctx is guaranteed to stay alive while we hold uring_lock */
@@ -2050,7 +2010,7 @@
 	mutex_unlock(&ctx->uring_lock);
 }
 
-static void __io_req_task_submit(struct io_kiocb *req)
+static void io_req_task_submit(struct io_kiocb *req)
 {
 	struct io_ring_ctx *ctx = req->ctx;
 
@@ -2063,28 +2023,17 @@
 	mutex_unlock(&ctx->uring_lock);
 }
 
-static void io_req_task_submit(struct callback_head *cb)
-{
-	struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work);
-
-	__io_req_task_submit(req);
-}
-
 static void io_req_task_queue_fail(struct io_kiocb *req, int ret)
 {
 	req->result = ret;
-	req->task_work.func = io_req_task_cancel;
-
-	if (unlikely(io_req_task_work_add(req)))
-		io_req_task_work_add_fallback(req, io_req_task_cancel);
+	req->io_task_work.func = io_req_task_cancel;
+	io_req_task_work_add(req);
 }
 
 static void io_req_task_queue(struct io_kiocb *req)
 {
-	req->task_work.func = io_req_task_submit;
-
-	if (unlikely(io_req_task_work_add(req)))
-		io_req_task_queue_fail(req, -ECANCELED);
+	req->io_task_work.func = io_req_task_submit;
+	io_req_task_work_add(req);
 }
 
 static inline void io_queue_next(struct io_kiocb *req)
@@ -2195,18 +2144,10 @@
 		io_free_req(req);
 }
 
-static void io_put_req_deferred_cb(struct callback_head *cb)
-{
-	struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work);
-
-	io_free_req(req);
-}
-
 static void io_free_req_deferred(struct io_kiocb *req)
 {
-	req->task_work.func = io_put_req_deferred_cb;
-	if (unlikely(io_req_task_work_add(req)))
-		io_req_task_work_add_fallback(req, io_put_req_deferred_cb);
+	req->io_task_work.func = io_free_req;
+	io_req_task_work_add(req);
 }
 
 static inline void io_put_req_deferred(struct io_kiocb *req, int refs)
@@ -2415,11 +2356,15 @@
 		 * very same mutex.
 		 */
 		if (list_empty(&ctx->iopoll_list)) {
+			u32 tail = ctx->cached_cq_tail;
+
 			mutex_unlock(&ctx->uring_lock);
 			io_run_task_work();
 			mutex_lock(&ctx->uring_lock);
 
-			if (list_empty(&ctx->iopoll_list))
+			/* some requests don't go through iopoll_list */
+			if (tail != ctx->cached_cq_tail ||
+			    list_empty(&ctx->iopoll_list))
 				break;
 		}
 		ret = io_do_iopoll(ctx, &nr_events, min);
@@ -2485,6 +2430,17 @@
 }
 #endif
 
+static void io_fallback_req_func(struct work_struct *work)
+{
+	struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
+						fallback_work.work);
+	struct llist_node *node = llist_del_all(&ctx->fallback_llist);
+	struct io_kiocb *req, *tmp;
+
+	llist_for_each_entry_safe(req, tmp, node, io_task_work.fallback_node)
+		req->io_task_work.func(req);
+}
+
 static void __io_complete_rw(struct io_kiocb *req, long res, long res2,
 			     unsigned int issue_flags)
 {
@@ -4850,10 +4806,8 @@
 };
 
 static int __io_async_wake(struct io_kiocb *req, struct io_poll_iocb *poll,
-			   __poll_t mask, task_work_func_t func)
+			   __poll_t mask, io_req_tw_func_t func)
 {
-	int ret;
-
 	/* for instances that support it check for an event match first: */
 	if (mask && !(mask & poll->events))
 		return 0;
@@ -4863,7 +4817,7 @@
 	list_del_init(&poll->wait.entry);
 
 	req->result = mask;
-	req->task_work.func = func;
+	req->io_task_work.func = func;
 
 	/*
 	 * If this fails, then the task is exiting. When a task exits, the
@@ -4871,11 +4825,7 @@
 	 * of executing it. We can't safely execute it anyway, as we may not
 	 * have the needed state needed for it anyway.
 	 */
-	ret = io_req_task_work_add(req);
-	if (unlikely(ret)) {
-		WRITE_ONCE(poll->canceled, true);
-		io_req_task_work_add_fallback(req, func);
-	}
+	io_req_task_work_add(req);
 	return 1;
 }
 
@@ -4884,6 +4834,9 @@
 {
 	struct io_ring_ctx *ctx = req->ctx;
 
+	if (unlikely(req->task->flags & PF_EXITING))
+		WRITE_ONCE(poll->canceled, true);
+
 	if (!req->result && !READ_ONCE(poll->canceled)) {
 		struct poll_table_struct pt = { ._key = poll->events };
 
@@ -4960,9 +4913,8 @@
 	return !(flags & IORING_CQE_F_MORE);
 }
 
-static void io_poll_task_func(struct callback_head *cb)
+static void io_poll_task_func(struct io_kiocb *req)
 {
-	struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work);
 	struct io_ring_ctx *ctx = req->ctx;
 	struct io_kiocb *nxt;
 
@@ -4984,7 +4936,7 @@
 		if (done) {
 			nxt = io_put_req_find_next(req);
 			if (nxt)
-				__io_req_task_submit(nxt);
+				io_req_task_submit(nxt);
 		}
 	}
 }
@@ -5004,7 +4956,7 @@
 
 	list_del_init(&wait->entry);
 
-	if (poll && poll->head) {
+	if (poll->head) {
 		bool done;
 
 		spin_lock(&poll->head->lock);
@@ -5093,9 +5045,8 @@
 	__io_queue_proc(&apoll->poll, pt, head, &apoll->double_poll);
 }
 
-static void io_async_task_func(struct callback_head *cb)
+static void io_async_task_func(struct io_kiocb *req)
 {
-	struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work);
 	struct async_poll *apoll = req->apoll;
 	struct io_ring_ctx *ctx = req->ctx;
 
@@ -5111,7 +5062,7 @@
 	spin_unlock_irq(&ctx->completion_lock);
 
 	if (!READ_ONCE(apoll->poll.canceled))
-		__io_req_task_submit(req);
+		io_req_task_submit(req);
 	else
 		io_req_complete_failed(req, -ECANCELED);
 }
@@ -6072,7 +6023,7 @@
 	io_prep_async_link(req);
 	de = kmalloc(sizeof(*de), GFP_KERNEL);
 	if (!de) {
-		io_req_complete_failed(req, ret);
+		io_req_complete_failed(req, -ENOMEM);
 		return true;
 	}
 
@@ -8767,11 +8718,6 @@
 	return -EINVAL;
 }
 
-static inline bool io_run_ctx_fallback(struct io_ring_ctx *ctx)
-{
-	return io_run_task_work_head(&ctx->exit_task_work);
-}
-
 struct io_tctx_exit {
 	struct callback_head		task_work;
 	struct completion		completion;
@@ -8837,7 +8783,7 @@
 	/*
 	 * Some may use context even when all refs and requests have been put,
 	 * and they are free to do so while still holding uring_lock or
-	 * completion_lock, see __io_req_task_submit(). Apart from other work,
+	 * completion_lock, see io_req_task_submit(). Apart from other work,
 	 * this lock/unlock section also waits them to finish.
 	 */
 	mutex_lock(&ctx->uring_lock);
@@ -9036,7 +8982,6 @@
 		ret |= io_kill_timeouts(ctx, task, cancel_all);
 		if (task)
 			ret |= io_run_task_work();
-		ret |= io_run_ctx_fallback(ctx);
 		if (!ret)
 			break;
 		cond_resched();