blob: d816c09c88a544a271ba192cf9f143d23577002f [file] [log] [blame]
// SPDX-License-Identifier: GPL-2.0
/*
* Shared application/kernel submission and completion ring pairs, for
* supporting fast/efficient IO.
*
* A note on the read/write ordering memory barriers that are matched between
* the application and kernel side.
*
* After the application reads the CQ ring tail, it must use an
* appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
* before writing the tail (using smp_load_acquire to read the tail will
* do). It also needs a smp_mb() before updating CQ head (ordering the
* entry load(s) with the head store), pairing with an implicit barrier
* through a control-dependency in io_get_cqe (smp_store_release to
* store head will do). Failure to do so could lead to reading invalid
* CQ entries.
*
* Likewise, the application must use an appropriate smp_wmb() before
* writing the SQ tail (ordering SQ entry stores with the tail store),
* which pairs with smp_load_acquire in io_get_sqring (smp_store_release
* to store the tail will do). And it needs a barrier ordering the SQ
* head load before writing new SQ entries (smp_load_acquire to read
* head will do).
*
* When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
* needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
* updating the SQ tail; a full memory barrier smp_mb() is needed
* between.
*
* Also see the examples in the liburing library:
*
* git://git.kernel.dk/liburing
*
* io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
* from data shared between the kernel and application. This is done both
* for ordering purposes, but also to ensure that once a value is loaded from
* data that the application could potentially modify, it remains stable.
*
* Copyright (C) 2018-2019 Jens Axboe
* Copyright (c) 2018-2019 Christoph Hellwig
*/
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/syscalls.h>
#include <linux/compat.h>
#include <net/compat.h>
#include <linux/refcount.h>
#include <linux/uio.h>
#include <linux/bits.h>
#include <linux/sched/signal.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/fdtable.h>
#include <linux/mm.h>
#include <linux/mman.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/blkdev.h>
#include <linux/bvec.h>
#include <linux/net.h>
#include <net/sock.h>
#include <net/af_unix.h>
#include <net/scm.h>
#include <linux/anon_inodes.h>
#include <linux/sched/mm.h>
#include <linux/uaccess.h>
#include <linux/nospec.h>
#include <linux/sizes.h>
#include <linux/hugetlb.h>
#include <linux/highmem.h>
#include <linux/namei.h>
#include <linux/fsnotify.h>
#include <linux/fadvise.h>
#include <linux/eventpoll.h>
#include <linux/splice.h>
#include <linux/task_work.h>
#include <linux/pagemap.h>
#include <linux/io_uring.h>
#include <linux/tracehook.h>
#define CREATE_TRACE_POINTS
#include <trace/events/io_uring.h>
#include <uapi/linux/io_uring.h>
#include "internal.h"
#include "io-wq.h"
/* Upper bounds on ring sizes; the CQ bound is twice the SQ bound. */
#define IORING_MAX_ENTRIES 32768
#define IORING_MAX_CQ_ENTRIES (2 * IORING_MAX_ENTRIES)
/* NOTE(review): presumably a cap applied to SQPOLL submission batches - confirm at the use site */
#define IORING_SQPOLL_CAP_ENTRIES_VALUE 8
/* only define max */
#define IORING_MAX_FIXED_FILES (1U << 15)
/* Sum of the restriction, register-opcode and SQE-opcode enumerator counts. */
#define IORING_MAX_RESTRICTIONS (IORING_RESTRICTION_LAST + \
IORING_REGISTER_LAST + IORING_OP_LAST)
/*
 * 1 << (PAGE_SHIFT - 3) is the number of 8-byte entries that fit in one
 * page; MASK extracts an index within such a table.
 * NOTE(review): presumably sized for the u64 tag tables in io_rsrc_data - confirm.
 */
#define IO_RSRC_TAG_TABLE_SHIFT (PAGE_SHIFT - 3)
#define IO_RSRC_TAG_TABLE_MAX (1U << IO_RSRC_TAG_TABLE_SHIFT)
#define IO_RSRC_TAG_TABLE_MASK (IO_RSRC_TAG_TABLE_MAX - 1)
#define IORING_MAX_REG_BUFFERS (1U << 14)
/* The set of IOSQE_* flags collected as valid SQE flags. */
#define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK| \
IOSQE_IO_HARDLINK | IOSQE_ASYNC | \
IOSQE_BUFFER_SELECT)
/* NOTE(review): presumably the request flags that force the slow cleanup path - confirm in io_clean_op() */
#define IO_REQ_CLEAN_FLAGS (REQ_F_BUFFER_SELECTED | REQ_F_NEED_CLEANUP | \
REQ_F_POLLED | REQ_F_INFLIGHT | REQ_F_CREDS)
/* NOTE(review): presumably the batch size for io_uring_task::cached_refs - confirm */
#define IO_TCTX_REFS_CACHE_NR (1U << 10)
/*
 * One head/tail index pair for a ring. Each index carries
 * ____cacheline_aligned_in_smp so that, on SMP builds, head and tail do
 * not share a cacheline.
 */
struct io_uring {
u32 head ____cacheline_aligned_in_smp;
u32 tail ____cacheline_aligned_in_smp;
};
/*
 * Ring state shared between the kernel and the application.
 *
 * This data is shared with the application through the mmap at offsets
 * IORING_OFF_SQ_RING and IORING_OFF_CQ_RING.
 *
 * The offsets to the member fields are published through struct
 * io_sqring_offsets when calling io_uring_setup.
 */
struct io_rings {
/*
 * Head and tail offsets into the ring; the offsets need to be
 * masked to get valid indices.
 *
 * The kernel controls head of the sq ring and the tail of the cq ring,
 * and the application controls tail of the sq ring and the head of the
 * cq ring.
 */
struct io_uring sq, cq;
/*
 * Bitmasks to apply to head and tail offsets (constant, equals
 * ring_entries - 1)
 */
u32 sq_ring_mask, cq_ring_mask;
/* Ring sizes (constant, power of 2) */
u32 sq_ring_entries, cq_ring_entries;
/*
 * Number of invalid entries dropped by the kernel due to
 * invalid index stored in array
 *
 * Written by the kernel, shouldn't be modified by the
 * application (i.e. get number of "new events" by comparing to
 * cached value).
 *
 * After a new SQ head value was read by the application this
 * counter includes all submissions that were dropped reaching
 * the new SQ head (and possibly more).
 */
u32 sq_dropped;
/*
 * Runtime SQ flags
 *
 * Written by the kernel, shouldn't be modified by the
 * application.
 *
 * The application needs a full memory barrier before checking
 * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
 */
u32 sq_flags;
/*
 * Runtime CQ flags
 *
 * Written by the application, shouldn't be modified by the
 * kernel.
 */
u32 cq_flags;
/*
 * Number of completion events lost because the queue was full;
 * this should be avoided by the application by making sure
 * there are not more requests pending than there is space in
 * the completion queue.
 *
 * Written by the kernel, shouldn't be modified by the
 * application (i.e. get number of "new events" by comparing to
 * cached value).
 *
 * As completion events come in out of order this counter is not
 * ordered with any other data.
 */
u32 cq_overflow;
/*
 * Ring buffer of completion events (flexible array member, so it
 * must remain the last field).
 *
 * The kernel writes completion events fresh every time they are
 * produced, so the application is allowed to modify pending
 * entries.
 */
struct io_uring_cqe cqes[] ____cacheline_aligned_in_smp;
};
/*
 * Bit flags (values 1 and 2, so they can be OR-ed together).
 * NOTE(review): presumably passed as the 'issue_flags' argument seen in
 * io_install_fixed_file()'s prototype - confirm at the call sites.
 */
enum io_uring_cmd_flags {
IO_URING_F_NONBLOCK = 1,
IO_URING_F_COMPLETE_DEFER = 2,
};
/*
 * Descriptor for a mapped user buffer; referenced from
 * io_ring_ctx::user_bufs, io_ring_ctx::dummy_ubuf and io_kiocb::imu.
 * NOTE(review): ubuf/ubuf_end look like the start and end of the user
 * address range, and nr_bvecs the length of bvec[] - confirm at the
 * registration code.
 */
struct io_mapped_ubuf {
u64 ubuf;
u64 ubuf_end;
unsigned int nr_bvecs;
unsigned long acct_pages;
/* flexible array member, must stay last */
struct bio_vec bvec[];
};
/* Forward declaration; the full definition follows further down. */
struct io_ring_ctx;
/*
 * A completion entry together with a list node.
 * NOTE(review): presumably queued on io_ring_ctx::cq_overflow_list when
 * the CQ ring has no room - confirm at the overflow path.
 */
struct io_overflow_cqe {
struct io_uring_cqe cqe;
struct list_head list;
};
/* One slot of the fixed file table (see struct io_file_table). */
struct io_fixed_file {
/* file * with additional FFS_* flags */
unsigned long file_ptr;
};
/*
 * A resource queued for release, handed to an rsrc_put_fn callback.
 * The union lets the same slot be viewed as a generic pointer, a file
 * or a mapped buffer.
 */
struct io_rsrc_put {
struct list_head list;
u64 tag;
union {
void *rsrc;
struct file *file;
struct io_mapped_ubuf *buf;
};
};
/* Holder for the array of fixed file slots (io_ring_ctx::file_table). */
struct io_file_table {
struct io_fixed_file *files;
};
/*
 * A reference-counted resource node. Requests pin the current node by
 * taking a reference on 'refs' (see io_req_set_rsrc_node()).
 * NOTE(review): rsrc_list presumably holds io_rsrc_put entries, and
 * 'llist' links the node into io_ring_ctx::rsrc_put_llist - confirm.
 */
struct io_rsrc_node {
struct percpu_ref refs;
struct list_head node;
struct list_head rsrc_list;
struct io_rsrc_data *rsrc_data;
struct llist_node llist;
bool done;
};
/* Callback type that releases one queued resource for a ring. */
typedef void (rsrc_put_fn)(struct io_ring_ctx *ctx, struct io_rsrc_put *prsrc);
/*
 * Per-resource-type bookkeeping (io_ring_ctx keeps one for files and
 * one for buffers: file_data and buf_data).
 */
struct io_rsrc_data {
struct io_ring_ctx *ctx;
/* NOTE(review): two-level table of u64 tags, presumably split using IO_RSRC_TAG_TABLE_* - confirm */
u64 **tags;
unsigned int nr;
/* release callback for this resource type */
rsrc_put_fn *do_put;
atomic_t refs;
struct completion done;
bool quiesce;
};
/*
 * One buffer record: an address, a length and a 16-bit buffer id, plus
 * a list node. Referenced from io_sr_msg::kbuf.
 * NOTE(review): presumably a provided buffer kept in
 * io_ring_ctx::io_buffers - confirm.
 */
struct io_buffer {
struct list_head list;
__u64 addr;
__u32 len;
__u16 bid;
};
/*
 * Restriction state for a ring (io_ring_ctx::restrictions): one bitmap
 * with a bit per register opcode, one with a bit per SQE opcode, and
 * two SQE flag bytes (allowed / required).
 */
struct io_restriction {
DECLARE_BITMAP(register_op, IORING_REGISTER_LAST);
DECLARE_BITMAP(sqe_op, IORING_OP_LAST);
u8 sqe_flags_allowed;
u8 sqe_flags_required;
bool registered;
};
/*
 * Bit numbers (0 and 1).
 * NOTE(review): presumably used with the bitops on io_sq_data::state -
 * confirm at the use sites.
 */
enum {
IO_SQ_THREAD_SHOULD_STOP = 0,
IO_SQ_THREAD_SHOULD_PARK,
};
/*
 * State of an SQ polling thread. It is reference counted and keeps a
 * list of the rings using it, so one instance can serve several rings.
 */
struct io_sq_data {
refcount_t refs;
atomic_t park_pending;
struct mutex lock;
/* ctx's that are using this sqd */
struct list_head ctx_list;
struct task_struct *thread;
struct wait_queue_head wait;
unsigned sq_thread_idle;
int sq_cpu;
pid_t task_pid;
pid_t task_tgid;
/* NOTE(review): presumably holds the IO_SQ_THREAD_SHOULD_* bits - confirm */
unsigned long state;
struct completion exited;
};
/* Capacity of io_submit_state::compl_reqs[] */
#define IO_COMPL_BATCH 32
/* Capacity of io_submit_state::reqs[] */
#define IO_REQ_CACHE_SIZE 32
/* NOTE(review): presumably the number of requests allocated per refill - confirm at the allocator */
#define IO_REQ_ALLOC_BATCH 8
/* First and last request of a link chain (io_submit_state::link). */
struct io_submit_link {
struct io_kiocb *head;
struct io_kiocb *last;
};
/*
 * Per-ring submission scratch state, embedded in io_ring_ctx as
 * 'submit_state'.
 */
struct io_submit_state {
struct blk_plug plug;
struct io_submit_link link;
/*
 * io_kiocb alloc cache: up to IO_REQ_CACHE_SIZE pointers, with
 * free_reqs counting the cached entries.
 */
void *reqs[IO_REQ_CACHE_SIZE];
unsigned int free_reqs;
bool plug_started;
/*
 * Batch completion logic: up to IO_COMPL_BATCH requests, with
 * compl_nr counting the queued entries.
 */
struct io_kiocb *compl_reqs[IO_COMPL_BATCH];
unsigned int compl_nr;
/* inline/task_work completion list, under ->uring_lock */
struct list_head free_list;
unsigned int ios_left;
};
/*
 * Per-ring context. Fields are grouped into anonymous structs; the hot
 * groups are marked ____cacheline_aligned_in_smp.
 */
struct io_ring_ctx {
/* const or read-mostly hot data */
struct {
struct percpu_ref refs;
struct io_rings *rings;
unsigned int flags;
unsigned int compat: 1;
unsigned int drain_next: 1;
unsigned int eventfd_async: 1;
unsigned int restricted: 1;
unsigned int off_timeout_used: 1;
unsigned int drain_active: 1;
} ____cacheline_aligned_in_smp;
/* submission data */
struct {
struct mutex uring_lock;
/*
 * Ring buffer of indices into array of io_uring_sqe, which is
 * mmapped by the application using the IORING_OFF_SQES offset.
 *
 * This indirection could e.g. be used to assign fixed
 * io_uring_sqe entries to operations and only submit them to
 * the queue when needed.
 *
 * The kernel modifies neither the indices array nor the entries
 * array.
 */
u32 *sq_array;
struct io_uring_sqe *sq_sqes;
unsigned cached_sq_head;
unsigned sq_entries;
struct list_head defer_list;
/*
 * Fixed resources fast path, should be accessed only under
 * uring_lock, and updated through io_uring_register(2)
 */
struct io_rsrc_node *rsrc_node;
struct io_file_table file_table;
unsigned nr_user_files;
unsigned nr_user_bufs;
struct io_mapped_ubuf **user_bufs;
struct io_submit_state submit_state;
struct list_head timeout_list;
struct list_head ltimeout_list;
struct list_head cq_overflow_list;
struct xarray io_buffers;
struct xarray personalities;
u32 pers_next;
unsigned sq_thread_idle;
} ____cacheline_aligned_in_smp;
/* IRQ completion list, under ->completion_lock */
struct list_head locked_free_list;
unsigned int locked_free_nr;
const struct cred *sq_creds; /* cred used for __io_sq_thread() */
struct io_sq_data *sq_data; /* if using sq thread polling */
struct wait_queue_head sqo_sq_wait;
struct list_head sqd_list;
unsigned long check_cq_overflow;
/* completion queue data */
struct {
unsigned cached_cq_tail;
unsigned cq_entries;
struct eventfd_ctx *cq_ev_fd;
struct wait_queue_head poll_wait;
struct wait_queue_head cq_wait;
unsigned cq_extra;
atomic_t cq_timeouts;
struct fasync_struct *cq_fasync;
unsigned cq_last_tm_flush;
} ____cacheline_aligned_in_smp;
struct {
spinlock_t completion_lock;
spinlock_t timeout_lock;
/*
 * ->iopoll_list is protected by the ctx->uring_lock for
 * io_uring instances that don't use IORING_SETUP_SQPOLL.
 * For SQPOLL, only the single threaded io_sq_thread() will
 * manipulate the list, hence no extra locking is needed there.
 */
struct list_head iopoll_list;
/* hash table with (1 << cancel_hash_bits) buckets, see io_ring_ctx_alloc() */
struct hlist_head *cancel_hash;
unsigned cancel_hash_bits;
bool poll_multi_queue;
} ____cacheline_aligned_in_smp;
struct io_restriction restrictions;
/* slow path rsrc auxiliary data, used by update/register */
struct {
struct io_rsrc_node *rsrc_backup_node;
/* placeholder buffer with an invalid address range, see io_ring_ctx_alloc() */
struct io_mapped_ubuf *dummy_ubuf;
struct io_rsrc_data *file_data;
struct io_rsrc_data *buf_data;
struct delayed_work rsrc_put_work;
struct llist_head rsrc_put_llist;
struct list_head rsrc_ref_list;
spinlock_t rsrc_ref_lock;
};
/* Keep this last, we don't need it for the fast path */
struct {
#if defined(CONFIG_UNIX)
struct socket *ring_sock;
#endif
/* hashed buffered write serialization */
struct io_wq_hash *hash_map;
/* Only used for accounting purposes */
struct user_struct *user;
struct mm_struct *mm_account;
/* ctx exit and cancelation */
struct llist_head fallback_llist;
struct delayed_work fallback_work;
struct work_struct exit_work;
struct list_head tctx_list;
/* completed by io_ring_ctx_ref_free() */
struct completion ref_comp;
};
};
/*
 * Per-task io_uring state, reached through task_struct::io_uring (see
 * io_req_track_inflight() and io_queue_async_work()).
 */
struct io_uring_task {
/* submission side */
int cached_refs;
struct xarray xa;
struct wait_queue_head wait;
const struct io_ring_ctx *last;
struct io_wq *io_wq;
struct percpu_counter inflight;
/* incremented by io_req_track_inflight() for each request marked REQ_F_INFLIGHT */
atomic_t inflight_tracked;
atomic_t in_idle;
/* NOTE(review): task_lock presumably protects task_list and task_running - confirm */
spinlock_t task_lock;
struct io_wq_work_list task_list;
struct callback_head task_work;
bool task_running;
};
/*
 * First field must be the file pointer in all the
 * iocb unions! See also 'struct kiocb' in <linux/fs.h>
 *
 * io_poll_iocb is the per-request poll state: the 'poll' member of the
 * io_kiocb command union, and also embedded in struct async_poll.
 */
struct io_poll_iocb {
struct file *file;
struct wait_queue_head *head;
__poll_t events;
bool done;
bool canceled;
struct wait_queue_entry wait;
};
/*
 * Command data stored in io_kiocb's 'poll_update' union member ('file'
 * stays first, as the union requires). The two booleans select which
 * of events / user_data is to be updated.
 */
struct io_poll_update {
struct file *file;
u64 old_user_data;
u64 new_user_data;
__poll_t events;
bool update_events;
bool update_user_data;
};
/* Command data stored in io_kiocb's 'close' union member; 'file' stays first. */
struct io_close {
struct file *file;
int fd;
};
/*
 * Timer state for timeout requests. Not part of the io_kiocb union:
 * io_op_defs[] uses sizeof(struct io_timeout_data) as the async_size
 * for IORING_OP_TIMEOUT and IORING_OP_LINK_TIMEOUT.
 */
struct io_timeout_data {
/* back-pointer to the owning request */
struct io_kiocb *req;
struct hrtimer timer;
struct timespec64 ts;
enum hrtimer_mode mode;
u32 flags;
};
/*
 * Command data stored in io_kiocb's 'accept' union member; 'file'
 * stays first. addr and addr_len are user-space pointers.
 */
struct io_accept {
struct file *file;
struct sockaddr __user *addr;
int __user *addr_len;
int flags;
u32 file_slot;
unsigned long nofile;
};
/* Command data stored in io_kiocb's 'sync' union member; 'file' stays first. */
struct io_sync {
struct file *file;
loff_t len;
loff_t off;
int flags;
int mode;
};
/* Command data stored in io_kiocb's 'cancel' union member; 'file' stays first. */
struct io_cancel {
struct file *file;
u64 addr;
};
/*
 * Command data stored in io_kiocb's 'timeout' union member; 'file'
 * stays first. An 'off' of zero is what io_is_timeout_noseq() tests for.
 */
struct io_timeout {
struct file *file;
u32 off;
u32 target_seq;
struct list_head list;
/* head of the link, used by linked timeouts only */
struct io_kiocb *head;
/* for linked completions */
struct io_kiocb *prev;
};
/* Command data stored in io_kiocb's 'timeout_rem' union member; 'file' stays first. */
struct io_timeout_rem {
struct file *file;
u64 addr;
/* timeout update */
struct timespec64 ts;
u32 flags;
bool ltimeout;
};
/*
 * Command data stored in io_kiocb's 'rw' union member. There is no
 * explicit 'file' field: the embedded kiocb supplies it.
 */
struct io_rw {
/* NOTE: kiocb has the file as the first member, so don't do it here */
struct kiocb kiocb;
u64 addr;
u64 len;
};
/* Command data stored in io_kiocb's 'connect' union member; 'file' stays first. */
struct io_connect {
struct file *file;
struct sockaddr __user *addr;
int addr_len;
};
/*
 * Command data stored in io_kiocb's 'sr_msg' union member; 'file'
 * stays first. The anonymous union holds one user pointer, viewed
 * either as a (compat) msghdr or as a plain buffer.
 */
struct io_sr_msg {
struct file *file;
union {
struct compat_msghdr __user *umsg_compat;
struct user_msghdr __user *umsg;
void __user *buf;
};
int msg_flags;
int bgid;
size_t len;
struct io_buffer *kbuf;
};
/* Command data stored in io_kiocb's 'open' union member; 'file' stays first. */
struct io_open {
struct file *file;
int dfd;
u32 file_slot;
struct filename *filename;
struct open_how how;
unsigned long nofile;
};
/* Command data stored in io_kiocb's 'rsrc_update' union member; 'file' stays first. */
struct io_rsrc_update {
struct file *file;
u64 arg;
u32 nr_args;
u32 offset;
};
/* Command data stored in io_kiocb's 'fadvise' union member; 'file' stays first. */
struct io_fadvise {
struct file *file;
u64 offset;
u32 len;
u32 advice;
};
/* Command data stored in io_kiocb's 'madvise' union member; 'file' stays first. */
struct io_madvise {
struct file *file;
u64 addr;
u32 len;
u32 advice;
};
/* Command data stored in io_kiocb's 'epoll' union member; 'file' stays first. */
struct io_epoll {
struct file *file;
int epfd;
int op;
int fd;
struct epoll_event event;
};
/*
 * Command data stored in io_kiocb's 'splice' union member. The leading
 * file pointer is named file_out here; file_in is the second file and
 * is the one io_prep_async_work() inspects.
 */
struct io_splice {
struct file *file_out;
struct file *file_in;
loff_t off_out;
loff_t off_in;
u64 len;
unsigned int flags;
};
/* Command data stored in io_kiocb's 'pbuf' union member; 'file' stays first. */
struct io_provide_buf {
struct file *file;
__u64 addr;
__u32 len;
__u32 bgid;
__u16 nbufs;
__u16 bid;
};
/*
 * Command data stored in io_kiocb's 'statx' union member; 'file' stays
 * first. filename and buffer are user-space pointers.
 */
struct io_statx {
struct file *file;
int dfd;
unsigned int mask;
unsigned int flags;
const char __user *filename;
struct statx __user *buffer;
};
/* Command data stored in io_kiocb's 'shutdown' union member; 'file' stays first. */
struct io_shutdown {
struct file *file;
int how;
};
/* Command data stored in io_kiocb's 'rename' union member; 'file' stays first. */
struct io_rename {
struct file *file;
int old_dfd;
int new_dfd;
struct filename *oldpath;
struct filename *newpath;
int flags;
};
/* Command data stored in io_kiocb's 'unlink' union member; 'file' stays first. */
struct io_unlink {
struct file *file;
int dfd;
int flags;
struct filename *filename;
};
/* Command data stored in io_kiocb's 'mkdir' union member; 'file' stays first. */
struct io_mkdir {
struct file *file;
int dfd;
umode_t mode;
struct filename *filename;
};
/* Command data stored in io_kiocb's 'symlink' union member; 'file' stays first. */
struct io_symlink {
struct file *file;
int new_dfd;
struct filename *oldpath;
struct filename *newpath;
};
/* Command data stored in io_kiocb's 'hardlink' union member; 'file' stays first. */
struct io_hardlink {
struct file *file;
int old_dfd;
int new_dfd;
struct filename *oldpath;
struct filename *newpath;
int flags;
};
/*
 * Completion data stored in io_kiocb's 'compl' union member. It
 * overlays the per-opcode command data, so (per the note in struct
 * io_kiocb) it may only be used after that data has been cleaned.
 */
struct io_completion {
struct file *file;
u32 cflags;
};
/* Async data for IORING_OP_CONNECT (its async_size in io_op_defs[]). */
struct io_async_connect {
struct sockaddr_storage address;
};
/*
 * Async data for IORING_OP_SENDMSG and IORING_OP_RECVMSG (their
 * async_size in io_op_defs[]).
 */
struct io_async_msghdr {
/* inline iovec storage, used when free_iov is NULL */
struct iovec fast_iov[UIO_FASTIOV];
/* points to an allocated iov, if NULL we use fast_iov instead */
struct iovec *free_iov;
struct sockaddr __user *uaddr;
struct msghdr msg;
struct sockaddr_storage addr;
};
/*
 * Async data for the read/write opcodes (the async_size of READV,
 * WRITEV, READ_FIXED, WRITE_FIXED, READ and WRITE in io_op_defs[]).
 * NOTE(review): free_iovec presumably mirrors io_async_msghdr::free_iov
 * (heap iovec, else fast_iov is used) - confirm.
 */
struct io_async_rw {
struct iovec fast_iov[UIO_FASTIOV];
const struct iovec *free_iovec;
struct iov_iter iter;
size_t bytes_done;
struct wait_page_queue wpq;
};
/*
 * Bit positions for io_kiocb::flags. The first six reuse the IOSQE_*
 * bit numbers, so user SQE flags map onto request flags unchanged;
 * kernel-internal bits start at 8.
 */
enum {
REQ_F_FIXED_FILE_BIT = IOSQE_FIXED_FILE_BIT,
REQ_F_IO_DRAIN_BIT = IOSQE_IO_DRAIN_BIT,
REQ_F_LINK_BIT = IOSQE_IO_LINK_BIT,
REQ_F_HARDLINK_BIT = IOSQE_IO_HARDLINK_BIT,
REQ_F_FORCE_ASYNC_BIT = IOSQE_ASYNC_BIT,
REQ_F_BUFFER_SELECT_BIT = IOSQE_BUFFER_SELECT_BIT,
/* first byte is taken by user flags, shift it to not overlap */
REQ_F_FAIL_BIT = 8,
REQ_F_INFLIGHT_BIT,
REQ_F_CUR_POS_BIT,
REQ_F_NOWAIT_BIT,
REQ_F_LINK_TIMEOUT_BIT,
REQ_F_NEED_CLEANUP_BIT,
REQ_F_POLLED_BIT,
REQ_F_BUFFER_SELECTED_BIT,
REQ_F_COMPLETE_INLINE_BIT,
REQ_F_REISSUE_BIT,
REQ_F_DONT_REISSUE_BIT,
REQ_F_CREDS_BIT,
REQ_F_REFCOUNT_BIT,
REQ_F_ARM_LTIMEOUT_BIT,
/*
 * keep async read/write and isreg together and in order
 * (NOTE(review): same order as FFS_ASYNC_READ / FFS_ASYNC_WRITE /
 * FFS_ISREG, presumably so the FFS bits can be shifted into place -
 * confirm at the use site)
 */
REQ_F_NOWAIT_READ_BIT,
REQ_F_NOWAIT_WRITE_BIT,
REQ_F_ISREG_BIT,
/* not a real bit, just to check we're not overflowing the space */
__REQ_F_LAST_BIT,
};
/*
 * Mask values for io_kiocb::flags, each BIT() of the matching
 * REQ_F_*_BIT position. The listing order here does not affect the
 * values.
 */
enum {
/* ctx owns file */
REQ_F_FIXED_FILE = BIT(REQ_F_FIXED_FILE_BIT),
/* drain existing IO first */
REQ_F_IO_DRAIN = BIT(REQ_F_IO_DRAIN_BIT),
/* linked sqes */
REQ_F_LINK = BIT(REQ_F_LINK_BIT),
/* doesn't sever on completion < 0 */
REQ_F_HARDLINK = BIT(REQ_F_HARDLINK_BIT),
/* IOSQE_ASYNC */
REQ_F_FORCE_ASYNC = BIT(REQ_F_FORCE_ASYNC_BIT),
/* IOSQE_BUFFER_SELECT */
REQ_F_BUFFER_SELECT = BIT(REQ_F_BUFFER_SELECT_BIT),
/* fail rest of links */
REQ_F_FAIL = BIT(REQ_F_FAIL_BIT),
/* on inflight list, should be cancelled and waited on exit reliably */
REQ_F_INFLIGHT = BIT(REQ_F_INFLIGHT_BIT),
/* read/write uses file position */
REQ_F_CUR_POS = BIT(REQ_F_CUR_POS_BIT),
/* must not punt to workers */
REQ_F_NOWAIT = BIT(REQ_F_NOWAIT_BIT),
/* has or had linked timeout */
REQ_F_LINK_TIMEOUT = BIT(REQ_F_LINK_TIMEOUT_BIT),
/* needs cleanup */
REQ_F_NEED_CLEANUP = BIT(REQ_F_NEED_CLEANUP_BIT),
/* already went through poll handler */
REQ_F_POLLED = BIT(REQ_F_POLLED_BIT),
/* buffer already selected */
REQ_F_BUFFER_SELECTED = BIT(REQ_F_BUFFER_SELECTED_BIT),
/*
 * completion is deferred through io_comp_state
 * (NOTE(review): no io_comp_state is defined in this part of the file;
 * the batch array lives in struct io_submit_state - confirm and update)
 */
REQ_F_COMPLETE_INLINE = BIT(REQ_F_COMPLETE_INLINE_BIT),
/* caller should reissue async */
REQ_F_REISSUE = BIT(REQ_F_REISSUE_BIT),
/* don't attempt request reissue, see io_rw_reissue() */
REQ_F_DONT_REISSUE = BIT(REQ_F_DONT_REISSUE_BIT),
/* supports async reads */
REQ_F_NOWAIT_READ = BIT(REQ_F_NOWAIT_READ_BIT),
/* supports async writes */
REQ_F_NOWAIT_WRITE = BIT(REQ_F_NOWAIT_WRITE_BIT),
/* regular file */
REQ_F_ISREG = BIT(REQ_F_ISREG_BIT),
/* has creds assigned */
REQ_F_CREDS = BIT(REQ_F_CREDS_BIT),
/* skip refcounting if not set */
REQ_F_REFCOUNT = BIT(REQ_F_REFCOUNT_BIT),
/* there is a linked timeout that has to be armed */
REQ_F_ARM_LTIMEOUT = BIT(REQ_F_ARM_LTIMEOUT_BIT),
};
/*
 * Poll state attached to a request through io_kiocb::apoll: one
 * embedded poll entry plus a pointer to an optional second one.
 */
struct async_poll {
struct io_poll_iocb poll;
struct io_poll_iocb *double_poll;
};
/*
 * Task-work callback. @locked tells the callback whether ctx->uring_lock
 * is currently held; callbacks may take it via io_tw_lock(), which sets
 * *locked.
 */
typedef void (*io_req_tw_func_t)(struct io_kiocb *req, bool *locked);
/*
 * Per-request task-work hook. The node is a union: 'fallback_node' is
 * the link used on io_ring_ctx::fallback_llist (see
 * io_fallback_req_func()); 'node' is the io_wq_work_node form.
 */
struct io_task_work {
union {
struct io_wq_work_node node;
struct llist_node fallback_node;
};
io_req_tw_func_t func;
};
/*
 * Resource type selectors.
 * NOTE(review): presumably the 'type' argument of
 * __io_register_rsrc_update() - confirm.
 */
enum {
IORING_RSRC_FILE = 0,
IORING_RSRC_BUFFER = 1,
};
/*
 * The io_uring request.
 *
 * NOTE! Each of the iocb union members has the file pointer
 * as the first entry in their struct definition. So you can
 * access the file pointer through any of the sub-structs,
 * or directly as just 'file' in this struct.
 */
struct io_kiocb {
union {
struct file *file;
struct io_rw rw;
struct io_poll_iocb poll;
struct io_poll_update poll_update;
struct io_accept accept;
struct io_sync sync;
struct io_cancel cancel;
struct io_timeout timeout;
struct io_timeout_rem timeout_rem;
struct io_connect connect;
struct io_sr_msg sr_msg;
struct io_open open;
struct io_close close;
struct io_rsrc_update rsrc_update;
struct io_fadvise fadvise;
struct io_madvise madvise;
struct io_epoll epoll;
struct io_splice splice;
struct io_provide_buf pbuf;
struct io_statx statx;
struct io_shutdown shutdown;
struct io_rename rename;
struct io_unlink unlink;
struct io_mkdir mkdir;
struct io_symlink symlink;
struct io_hardlink hardlink;
/* use only after cleaning per-op data, see io_clean_op() */
struct io_completion compl;
};
/* opcode allocated if it needs to store data for async defer */
void *async_data;
u8 opcode;
/* polled IO has completed */
u8 iopoll_completed;
u16 buf_index;
u32 result;
struct io_ring_ctx *ctx;
/* REQ_F_* mask bits */
unsigned int flags;
/* only meaningful once REQ_F_REFCOUNT is set, see __io_req_set_refcount() */
atomic_t refs;
struct task_struct *task;
u64 user_data;
/* next request in the link chain, walked by io_for_each_link() */
struct io_kiocb *link;
/* rsrc node reference pinned by io_req_set_rsrc_node(), NULL if none */
struct percpu_ref *fixed_rsrc_refs;
/* used with ctx->iopoll_list with reads/writes */
struct list_head inflight_entry;
struct io_task_work io_task_work;
/* for polled requests, i.e. IORING_OP_POLL_ADD and async armed poll */
struct hlist_node hash_node;
struct async_poll *apoll;
struct io_wq_work work;
/* valid when REQ_F_CREDS is set, see io_prep_async_work() */
const struct cred *creds;
/* store used ubuf, so we can prevent reloading */
struct io_mapped_ubuf *imu;
};
/*
 * Associates a task with a ring.
 * NOTE(review): ctx_node presumably links into io_ring_ctx::tctx_list -
 * confirm.
 */
struct io_tctx_node {
struct list_head ctx_node;
struct task_struct *task;
struct io_ring_ctx *ctx;
};
/*
 * A request together with a sequence number and a list node.
 * NOTE(review): presumably queued on io_ring_ctx::defer_list, with
 * 'seq' being the value later passed to req_need_defer() - confirm.
 */
struct io_defer_entry {
struct list_head list;
struct io_kiocb *req;
u32 seq;
};
/*
 * Static per-opcode properties; one entry per opcode in io_op_defs[].
 * All fields default to zero when an entry leaves them out.
 */
struct io_op_def {
/* needs req->file assigned */
unsigned needs_file : 1;
/* hash wq insertion if file is a regular file */
unsigned hash_reg_file : 1;
/* unbound wq insertion if file is a non-regular file */
unsigned unbound_nonreg_file : 1;
/* opcode is not supported by this kernel */
unsigned not_supported : 1;
/* set if opcode supports polled "wait" */
unsigned pollin : 1;
unsigned pollout : 1;
/* op supports buffer selection */
unsigned buffer_select : 1;
/* do prep async if is going to be punted */
unsigned needs_async_setup : 1;
/* should block plug */
unsigned plug : 1;
/* size of async data needed, if any */
unsigned short async_size;
};
/*
 * Opcode property table, indexed by IORING_OP_* through designated
 * initializers. Members not named in an entry are zero.
 */
static const struct io_op_def io_op_defs[] = {
[IORING_OP_NOP] = {},
[IORING_OP_READV] = {
.needs_file = 1,
.unbound_nonreg_file = 1,
.pollin = 1,
.buffer_select = 1,
.needs_async_setup = 1,
.plug = 1,
.async_size = sizeof(struct io_async_rw),
},
[IORING_OP_WRITEV] = {
.needs_file = 1,
.hash_reg_file = 1,
.unbound_nonreg_file = 1,
.pollout = 1,
.needs_async_setup = 1,
.plug = 1,
.async_size = sizeof(struct io_async_rw),
},
[IORING_OP_FSYNC] = {
.needs_file = 1,
},
[IORING_OP_READ_FIXED] = {
.needs_file = 1,
.unbound_nonreg_file = 1,
.pollin = 1,
.plug = 1,
.async_size = sizeof(struct io_async_rw),
},
[IORING_OP_WRITE_FIXED] = {
.needs_file = 1,
.hash_reg_file = 1,
.unbound_nonreg_file = 1,
.pollout = 1,
.plug = 1,
.async_size = sizeof(struct io_async_rw),
},
[IORING_OP_POLL_ADD] = {
.needs_file = 1,
.unbound_nonreg_file = 1,
},
[IORING_OP_POLL_REMOVE] = {},
[IORING_OP_SYNC_FILE_RANGE] = {
.needs_file = 1,
},
[IORING_OP_SENDMSG] = {
.needs_file = 1,
.unbound_nonreg_file = 1,
.pollout = 1,
.needs_async_setup = 1,
.async_size = sizeof(struct io_async_msghdr),
},
[IORING_OP_RECVMSG] = {
.needs_file = 1,
.unbound_nonreg_file = 1,
.pollin = 1,
.buffer_select = 1,
.needs_async_setup = 1,
.async_size = sizeof(struct io_async_msghdr),
},
[IORING_OP_TIMEOUT] = {
.async_size = sizeof(struct io_timeout_data),
},
[IORING_OP_TIMEOUT_REMOVE] = {
/* used by timeout updates' prep() */
},
[IORING_OP_ACCEPT] = {
.needs_file = 1,
.unbound_nonreg_file = 1,
.pollin = 1,
},
[IORING_OP_ASYNC_CANCEL] = {},
[IORING_OP_LINK_TIMEOUT] = {
.async_size = sizeof(struct io_timeout_data),
},
[IORING_OP_CONNECT] = {
.needs_file = 1,
.unbound_nonreg_file = 1,
.pollout = 1,
.needs_async_setup = 1,
.async_size = sizeof(struct io_async_connect),
},
[IORING_OP_FALLOCATE] = {
.needs_file = 1,
},
[IORING_OP_OPENAT] = {},
[IORING_OP_CLOSE] = {},
[IORING_OP_FILES_UPDATE] = {},
[IORING_OP_STATX] = {},
[IORING_OP_READ] = {
.needs_file = 1,
.unbound_nonreg_file = 1,
.pollin = 1,
.buffer_select = 1,
.plug = 1,
.async_size = sizeof(struct io_async_rw),
},
[IORING_OP_WRITE] = {
.needs_file = 1,
.hash_reg_file = 1,
.unbound_nonreg_file = 1,
.pollout = 1,
.plug = 1,
.async_size = sizeof(struct io_async_rw),
},
[IORING_OP_FADVISE] = {
.needs_file = 1,
},
[IORING_OP_MADVISE] = {},
[IORING_OP_SEND] = {
.needs_file = 1,
.unbound_nonreg_file = 1,
.pollout = 1,
},
[IORING_OP_RECV] = {
.needs_file = 1,
.unbound_nonreg_file = 1,
.pollin = 1,
.buffer_select = 1,
},
[IORING_OP_OPENAT2] = {
},
/* no needs_file here, yet still marked unbound for non-regular files */
[IORING_OP_EPOLL_CTL] = {
.unbound_nonreg_file = 1,
},
[IORING_OP_SPLICE] = {
.needs_file = 1,
.hash_reg_file = 1,
.unbound_nonreg_file = 1,
},
[IORING_OP_PROVIDE_BUFFERS] = {},
[IORING_OP_REMOVE_BUFFERS] = {},
[IORING_OP_TEE] = {
.needs_file = 1,
.hash_reg_file = 1,
.unbound_nonreg_file = 1,
},
[IORING_OP_SHUTDOWN] = {
.needs_file = 1,
},
[IORING_OP_RENAMEAT] = {},
[IORING_OP_UNLINKAT] = {},
[IORING_OP_MKDIRAT] = {},
[IORING_OP_SYMLINKAT] = {},
[IORING_OP_LINKAT] = {},
};
/* requests with any of those set should undergo io_disarm_next() */
#define IO_DISARM_MASK (REQ_F_ARM_LTIMEOUT | REQ_F_LINK_TIMEOUT | REQ_F_FAIL)
/*
 * Forward declarations for static functions that are used before their
 * definitions later in this file.
 */
static bool io_disarm_next(struct io_kiocb *req);
static void io_uring_del_tctx_node(unsigned long index);
static void io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
struct task_struct *task,
bool cancel_all);
static void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd);
static bool io_cqring_fill_event(struct io_ring_ctx *ctx, u64 user_data,
long res, unsigned int cflags);
static void io_put_req(struct io_kiocb *req);
static void io_put_req_deferred(struct io_kiocb *req);
static void io_dismantle_req(struct io_kiocb *req);
static void io_queue_linked_timeout(struct io_kiocb *req);
static int __io_register_rsrc_update(struct io_ring_ctx *ctx, unsigned type,
struct io_uring_rsrc_update2 *up,
unsigned nr_args);
static void io_clean_op(struct io_kiocb *req);
static struct file *io_file_get(struct io_ring_ctx *ctx,
struct io_kiocb *req, int fd, bool fixed);
static void __io_queue_sqe(struct io_kiocb *req);
static void io_rsrc_put_work(struct work_struct *work);
static void io_req_task_queue(struct io_kiocb *req);
static void io_submit_flush_completions(struct io_ring_ctx *ctx);
static int io_req_prep_async(struct io_kiocb *req);
static int io_install_fixed_file(struct io_kiocb *req, struct file *file,
unsigned int issue_flags, u32 slot_index);
static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer);
/* NOTE(review): presumably the slab cache that io_kiocb requests are allocated from - confirm */
static struct kmem_cache *req_cachep;
/* Tentative declaration; compared against file->f_op in io_uring_get_socket(). */
static const struct file_operations io_uring_fops;
/*
 * Return the sock behind an io_uring file.
 *
 * With CONFIG_UNIX, a file whose f_op is io_uring_fops yields the sk of
 * the ring's ring_sock. Any other file, or a build without CONFIG_UNIX,
 * yields NULL.
 */
struct sock *io_uring_get_socket(struct file *file)
{
#if defined(CONFIG_UNIX)
	struct io_ring_ctx *ctx;

	if (file->f_op != &io_uring_fops)
		return NULL;

	ctx = file->private_data;
	return ctx->ring_sock->sk;
#else
	return NULL;
#endif
}
EXPORT_SYMBOL(io_uring_get_socket);
/*
 * Ensure ctx->uring_lock is held during task-work processing. *locked
 * records whether the mutex is already owned; if it is not, take it and
 * set *locked so later callers skip the lock.
 */
static inline void io_tw_lock(struct io_ring_ctx *ctx, bool *locked)
{
	if (*locked)
		return;

	mutex_lock(&ctx->uring_lock);
	*locked = true;
}
/*
 * Iterate over a chain of requests: @pos starts at @head and follows
 * the ->link pointers until it becomes NULL.
 *
 * Every use of @pos is parenthesized so that a non-trivial lvalue
 * (e.g. *pp) expands correctly; unparenthesized, "*pp->link" would
 * parse as "*(pp->link)".
 */
#define io_for_each_link(pos, head) \
	for ((pos) = (head); (pos); (pos) = (pos)->link)
/*
 * Shamelessly stolen from the mm implementation of page reference checking,
 * see commit f958d7b528b1 for details.
 *
 * Evaluates to true when the refcount, viewed as unsigned, is 0 or lies
 * in the top 127 values (i.e. -127..-1 as a signed int): adding 127
 * wraps those values into the range 0..127.
 *
 * @req is parenthesized so that any pointer-valued expression may be
 * passed, not just a plain identifier.
 */
#define req_ref_zero_or_close_to_overflow(req) \
	((unsigned int) atomic_read(&(req)->refs) + 127u <= 127u)
/*
 * Take a reference unless the count is zero. Warns once if the request
 * is not refcounted (REQ_F_REFCOUNT clear). Returns the result of
 * atomic_inc_not_zero().
 */
static inline bool req_ref_inc_not_zero(struct io_kiocb *req)
{
	const bool refcounted = req->flags & REQ_F_REFCOUNT;

	WARN_ON_ONCE(!refcounted);
	return atomic_inc_not_zero(&req->refs);
}
/*
 * Drop one reference and report whether the request may be released.
 *
 * A request without REQ_F_REFCOUNT has no count to maintain and always
 * reports true. Otherwise the count is decremented (warning once if it
 * was zero or close to overflow) and true is returned when it hits zero.
 */
static inline bool req_ref_put_and_test(struct io_kiocb *req)
{
	if (unlikely(req->flags & REQ_F_REFCOUNT)) {
		WARN_ON_ONCE(req_ref_zero_or_close_to_overflow(req));
		return atomic_dec_and_test(&req->refs);
	}

	return true;
}
static inline void req_ref_put(struct io_kiocb *req)
{
WARN_ON_ONCE(!(req->flags & REQ_F_REFCOUNT));
WARN_ON_ONCE(req_ref_put_and_test(req));
}
static inline void req_ref_get(struct io_kiocb *req)
{
WARN_ON_ONCE(!(req->flags & REQ_F_REFCOUNT));
WARN_ON_ONCE(req_ref_zero_or_close_to_overflow(req));
atomic_inc(&req->refs);
}
/*
 * Switch a request to refcounted mode with an initial count of @nr.
 * Does nothing if the request is already refcounted.
 */
static inline void __io_req_set_refcount(struct io_kiocb *req, int nr)
{
	if (req->flags & REQ_F_REFCOUNT)
		return;

	req->flags |= REQ_F_REFCOUNT;
	atomic_set(&req->refs, nr);
}
/*
 * Enable refcounting on @req with an initial count of one. No effect if
 * the request is already refcounted (see __io_req_set_refcount()).
 */
static inline void io_req_set_refcount(struct io_kiocb *req)
{
__io_req_set_refcount(req, 1);
}
/*
 * Pin the ring's current rsrc node for @req: remember its percpu ref in
 * req->fixed_rsrc_refs and take a reference on it. Does nothing when
 * the request already has a node pinned.
 */
static inline void io_req_set_rsrc_node(struct io_kiocb *req)
{
	struct io_ring_ctx *ctx = req->ctx;
	struct percpu_ref *node_refs;

	if (req->fixed_rsrc_refs)
		return;

	node_refs = &ctx->rsrc_node->refs;
	req->fixed_rsrc_refs = node_refs;
	percpu_ref_get(node_refs);
}
/*
 * Bring a percpu ref back to life.
 *
 * If a temporary reference can still be taken, resurrect right away and
 * drop the temporary reference. Otherwise the ref has already reached
 * zero: wait on @compl for ->release() to finish before resurrecting.
 */
static void io_refs_resurrect(struct percpu_ref *ref, struct completion *compl)
{
	if (percpu_ref_tryget(ref)) {
		percpu_ref_resurrect(ref);
		percpu_ref_put(ref);
		return;
	}

	/* already at zero, wait for ->release() */
	wait_for_completion(compl);
	percpu_ref_resurrect(ref);
}
/*
 * Decide whether the request chain starting at @head matches.
 *
 * A non-NULL @task must own @head, otherwise there is no match. With
 * @cancel_all set, every remaining chain matches; without it, the chain
 * matches only if at least one linked request carries REQ_F_INFLIGHT.
 */
static bool io_match_task(struct io_kiocb *head, struct task_struct *task,
			  bool cancel_all)
{
	struct io_kiocb *cur;
	bool inflight = false;

	if (task && head->task != task)
		return false;
	if (cancel_all)
		return true;

	for (cur = head; cur && !inflight; cur = cur->link)
		inflight = cur->flags & REQ_F_INFLIGHT;

	return inflight;
}
/* Mark @req as failed by setting REQ_F_FAIL in its flags. */
static inline void req_set_fail(struct io_kiocb *req)
{
	req->flags = req->flags | REQ_F_FAIL;
}
/* Record @res as the result of @req and mark the request as failed. */
static inline void req_fail_link_node(struct io_kiocb *req, int res)
{
	req->result = res;
	req_set_fail(req);
}
/*
 * Release callback for ctx->refs (registered in io_ring_ctx_alloc()):
 * signal ref_comp on the ring context that embeds @ref.
 */
static void io_ring_ctx_ref_free(struct percpu_ref *ref)
{
	struct io_ring_ctx *ctx;

	ctx = container_of(ref, struct io_ring_ctx, refs);
	complete(&ctx->ref_comp);
}
/* True when the timeout request has a zero 'off' value. */
static inline bool io_is_timeout_noseq(struct io_kiocb *req)
{
	return req->timeout.off == 0;
}
static void io_fallback_req_func(struct work_struct *work)
{
struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
fallback_work.work);
struct llist_node *node = llist_del_all(&ctx->fallback_llist);
struct io_kiocb *req, *tmp;
bool locked = false;
percpu_ref_get(&ctx->refs);
llist_for_each_entry_safe(req, tmp, node, io_task_work.fallback_node)
req->io_task_work.func(req, &locked);
if (locked) {
if (ctx->submit_state.compl_nr)
io_submit_flush_completions(ctx);
mutex_unlock(&ctx->uring_lock);
}
percpu_ref_put(&ctx->refs);
}
/*
 * Allocate and initialise a ring context.
 *
 * @p: setup parameters; cq_entries sizes the cancel hash and flags is
 *     copied into the context.
 *
 * Returns the new context, or NULL if any allocation or the percpu ref
 * initialisation fails. On failure everything allocated so far is freed.
 */
static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
{
	struct io_ring_ctx *ctx;
	int hash_bits;

	ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
	if (!ctx)
		return NULL;

	/*
	 * Use 5 bits less than the max cq entries, that should give us around
	 * 32 entries per hash list if totally full and uniformly spread.
	 */
	hash_bits = ilog2(p->cq_entries);
	hash_bits -= 5;
	if (hash_bits <= 0)
		hash_bits = 1;
	ctx->cancel_hash_bits = hash_bits;
	/* kmalloc_array() checks the count * size multiplication for overflow */
	ctx->cancel_hash = kmalloc_array(1U << hash_bits,
					 sizeof(*ctx->cancel_hash),
					 GFP_KERNEL);
	if (!ctx->cancel_hash)
		goto err;
	__hash_init(ctx->cancel_hash, 1U << hash_bits);

	ctx->dummy_ubuf = kzalloc(sizeof(*ctx->dummy_ubuf), GFP_KERNEL);
	if (!ctx->dummy_ubuf)
		goto err;
	/* set invalid range, so io_import_fixed() fails meeting it */
	ctx->dummy_ubuf->ubuf = -1UL;

	if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
			    PERCPU_REF_ALLOW_REINIT, GFP_KERNEL))
		goto err;

	ctx->flags = p->flags;
	init_waitqueue_head(&ctx->sqo_sq_wait);
	INIT_LIST_HEAD(&ctx->sqd_list);
	init_waitqueue_head(&ctx->poll_wait);
	INIT_LIST_HEAD(&ctx->cq_overflow_list);
	init_completion(&ctx->ref_comp);
	xa_init_flags(&ctx->io_buffers, XA_FLAGS_ALLOC1);
	xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1);
	mutex_init(&ctx->uring_lock);
	init_waitqueue_head(&ctx->cq_wait);
	spin_lock_init(&ctx->completion_lock);
	spin_lock_init(&ctx->timeout_lock);
	INIT_LIST_HEAD(&ctx->iopoll_list);
	INIT_LIST_HEAD(&ctx->defer_list);
	INIT_LIST_HEAD(&ctx->timeout_list);
	INIT_LIST_HEAD(&ctx->ltimeout_list);
	spin_lock_init(&ctx->rsrc_ref_lock);
	INIT_LIST_HEAD(&ctx->rsrc_ref_list);
	INIT_DELAYED_WORK(&ctx->rsrc_put_work, io_rsrc_put_work);
	init_llist_head(&ctx->rsrc_put_llist);
	INIT_LIST_HEAD(&ctx->tctx_list);
	INIT_LIST_HEAD(&ctx->submit_state.free_list);
	INIT_LIST_HEAD(&ctx->locked_free_list);
	INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func);
	return ctx;
err:
	/* ctx was zero-allocated, so members not yet set are NULL and kfree(NULL) is a no-op */
	kfree(ctx->dummy_ubuf);
	kfree(ctx->cancel_hash);
	kfree(ctx);
	return NULL;
}
static void io_account_cq_overflow(struct io_ring_ctx *ctx)
{
struct io_rings *r = ctx->rings;
WRITE_ONCE(r->cq_overflow, READ_ONCE(r->cq_overflow) + 1);
ctx->cq_extra--;
}
/*
 * Tell whether a request has to be deferred.
 *
 * Only requests flagged REQ_F_IO_DRAIN can be deferred; for those the
 * answer is whether @seq plus ctx->cq_extra differs from the cached CQ
 * tail.
 */
static bool req_need_defer(struct io_kiocb *req, u32 seq)
{
	struct io_ring_ctx *ctx;

	if (likely(!(req->flags & REQ_F_IO_DRAIN)))
		return false;

	ctx = req->ctx;
	return seq + READ_ONCE(ctx->cq_extra) != ctx->cached_cq_tail;
}
/*
 * Flag bits stored in the low bits of io_fixed_file::file_ptr next to
 * the file pointer. FFS_ISREG is a real bit only on 64-bit builds; on
 * other builds it is 0, so it never sets anything.
 * NOTE(review): presumably because 32-bit pointer alignment leaves only
 * two spare low bits - confirm.
 */
#define FFS_ASYNC_READ 0x1UL
#define FFS_ASYNC_WRITE 0x2UL
#ifdef CONFIG_64BIT
#define FFS_ISREG 0x4UL
#else
#define FFS_ISREG 0x0UL
#endif
/* AND with this to strip the flag bits and recover the pointer value */
#define FFS_MASK ~(FFS_ASYNC_READ|FFS_ASYNC_WRITE|FFS_ISREG)
static inline bool io_req_ffs_set(struct io_kiocb *req)
{
return IS_ENABLED(CONFIG_64BIT) && (req->flags & REQ_F_FIXED_FILE);
}
/*
 * Mark @req as inflight-tracked. The first time this happens for a
 * request, REQ_F_INFLIGHT is set and the current task's
 * inflight_tracked counter is incremented; later calls are no-ops.
 */
static void io_req_track_inflight(struct io_kiocb *req)
{
	if (req->flags & REQ_F_INFLIGHT)
		return;

	req->flags |= REQ_F_INFLIGHT;
	atomic_inc(&current->io_uring->inflight_tracked);
}
/* Clear REQ_F_LINK_TIMEOUT on @req. */
static inline void io_unprep_linked_timeout(struct io_kiocb *req)
{
	req->flags = req->flags & ~REQ_F_LINK_TIMEOUT;
}
/*
 * Switch @req from "linked timeout needs arming" (REQ_F_ARM_LTIMEOUT) to
 * "linked timeout in place" (REQ_F_LINK_TIMEOUT) and set up refcounts on
 * both the request and its link.
 *
 * Returns the linked request, or NULL (with a one-time warning) if @req has
 * no link.
 */
static struct io_kiocb *__io_prep_linked_timeout(struct io_kiocb *req)
{
	struct io_kiocb *link;

	if (WARN_ON_ONCE(!req->link))
		return NULL;

	req->flags = (req->flags & ~REQ_F_ARM_LTIMEOUT) | REQ_F_LINK_TIMEOUT;

	/* linked timeouts should have two refs once prep'ed */
	io_req_set_refcount(req);
	link = req->link;
	__io_req_set_refcount(link, 2);
	return link;
}
/*
 * Fast-path wrapper: only requests flagged REQ_F_ARM_LTIMEOUT go through
 * __io_prep_linked_timeout(); everything else yields NULL.
 */
static inline struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
{
	if (unlikely(req->flags & REQ_F_ARM_LTIMEOUT))
		return __io_prep_linked_timeout(req);

	return NULL;
}
/*
 * Initialise req->work before the request is handed to io-wq: grab the
 * current credentials if the request has none yet, reset the work item and
 * derive the io-wq work flags from the request flags, the opcode definition
 * and the kind of file involved.
 */
static void io_prep_async_work(struct io_kiocb *req)
{
	const struct io_op_def *def = &io_op_defs[req->opcode];
	struct io_ring_ctx *ctx = req->ctx;

	if (!(req->flags & REQ_F_CREDS)) {
		req->flags |= REQ_F_CREDS;
		req->creds = get_current_cred();
	}

	req->work.list.next = NULL;
	if (req->flags & REQ_F_FORCE_ASYNC)
		req->work.flags = IO_WQ_WORK_CONCURRENT;
	else
		req->work.flags = 0;

	if (req->flags & REQ_F_ISREG) {
		/* regular file: hash the work on the inode when asked to */
		if (def->hash_reg_file || (ctx->flags & IORING_SETUP_IOPOLL))
			io_wq_hash_work(&req->work, file_inode(req->file));
	} else if (!req->file || !S_ISBLK(file_inode(req->file)->i_mode)) {
		/* no file, or neither regular nor block device */
		if (def->unbound_nonreg_file)
			req->work.flags |= IO_WQ_WORK_UNBOUND;
	}

	/* splice/tee are unbound unless the input file is a regular file */
	if (req->opcode == IORING_OP_SPLICE || req->opcode == IORING_OP_TEE) {
		if (!S_ISREG(file_inode(req->splice.file_in)->i_mode))
			req->work.flags |= IO_WQ_WORK_UNBOUND;
	}
}
static void io_prep_async_link(struct io_kiocb *req)
{
struct io_kiocb *cur;
if (req->flags & REQ_F_LINK_TIMEOUT) {
struct io_ring_ctx *ctx = req->ctx;
spin_lock(&ctx->completion_lock);
io_for_each_link(cur, req)
io_prep_async_work(cur);
spin_unlock(&ctx->completion_lock);
} else {
io_for_each_link(cur, req)
io_prep_async_work(cur);
}
}
/*
 * Hand @req (and the rest of its link chain) to the io-wq of the task that
 * owns the request. A linked timeout is prepared up front through
 * io_prep_linked_timeout() and passed to io_queue_linked_timeout() only
 * after the work item has been enqueued. @locked is never dereferenced.
 */
static void io_queue_async_work(struct io_kiocb *req, bool *locked)
{
struct io_ring_ctx *ctx = req->ctx;
struct io_kiocb *link = io_prep_linked_timeout(req);
struct io_uring_task *tctx = req->task->io_uring;
/* must not take the lock, NULL it as a precaution */
locked = NULL;
/* a request without a task context or io-wq cannot be punted */
BUG_ON(!tctx);
BUG_ON(!tctx->io_wq);
/* init ->work of the whole link before punting */
io_prep_async_link(req);
/*
* Not expected to happen, but if we do have a bug where this _can_
* happen, catch it here and ensure the request is marked as
* canceled. That will make io-wq go through the usual work cancel
* procedure rather than attempt to run this request (or create a new
* worker for it).
*/
if (WARN_ON_ONCE(!same_thread_group(req->task, current)))
req->work.flags |= IO_WQ_WORK_CANCEL;
trace_io_uring_queue_async_work(ctx, io_wq_is_hashed(&req->work), req,
&req->work, req->flags);
io_wq_enqueue(tctx->io_wq, &req->work);
/* only now hand over the linked timeout prepared at the top */
if (link)
io_queue_linked_timeout(link);
}
/*
 * Cancel a pending timeout request and post its completion.
 *
 * @req:    the timeout request
 * @status: result reported in the CQE; a non-zero value also marks the
 *          request as failed
 *
 * Nothing happens when hrtimer_try_to_cancel() returns -1; the request is
 * then left untouched. On success the request is removed from the timeout
 * list, ctx->cq_timeouts is bumped, a CQE is filled and the reference is
 * dropped through io_put_req_deferred().
 */
static void io_kill_timeout(struct io_kiocb *req, int status)
	__must_hold(&req->ctx->completion_lock)
	__must_hold(&req->ctx->timeout_lock)
{
	struct io_timeout_data *io = req->async_data;

	if (hrtimer_try_to_cancel(&io->timer) != -1) {
		/*
		 * A timeout completing with an error must carry REQ_F_FAIL:
		 * io_disarm_next() only fails the requests linked behind it
		 * when that flag is set, otherwise they would be queued for
		 * execution even though this request was cancelled.
		 */
		if (status)
			req_set_fail(req);
		/* plain read + set instead of an atomic increment */
		atomic_set(&req->ctx->cq_timeouts,
			atomic_read(&req->ctx->cq_timeouts) + 1);
		list_del_init(&req->timeout.list);
		io_cqring_fill_event(req->ctx, req->user_data, status, 0);
		io_put_req_deferred(req);
	}
}
/*
 * Queue deferred requests from the head of ctx->defer_list, stopping at the
 * first entry that req_need_defer() still holds back. Each released entry is
 * unlinked, its request queued via io_req_task_queue() and the entry freed.
 */
static void io_queue_deferred(struct io_ring_ctx *ctx)
{
	struct list_head *head = &ctx->defer_list;

	while (!list_empty(head)) {
		struct io_defer_entry *de;

		de = list_first_entry(head, struct io_defer_entry, list);
		if (req_need_defer(de->req, de->seq))
			return;

		list_del_init(&de->list);
		io_req_task_queue(de->req);
		kfree(de);
	}
}
/*
 * Complete the sequence-based timeouts whose target has been reached.
 *
 * The current sequence is the cached CQ tail minus the number of timeout
 * completions. ctx->timeout_list is walked from the front under
 * ctx->timeout_lock; the walk stops at the first entry for which
 * io_is_timeout_noseq() is true or whose target has not been reached yet.
 *
 * Entries are NOT unlinked here. io_kill_timeout() removes a request from
 * the list itself once the timer has been cancelled successfully; when the
 * cancel fails the request stays on the list, so a timeout that is still
 * live is never detached from it.
 */
static void io_flush_timeouts(struct io_ring_ctx *ctx)
	__must_hold(&ctx->completion_lock)
{
	u32 seq = ctx->cached_cq_tail - atomic_read(&ctx->cq_timeouts);
	struct io_kiocb *req, *tmp;

	spin_lock_irq(&ctx->timeout_lock);
	/* _safe variant: io_kill_timeout() may unlink the current entry */
	list_for_each_entry_safe(req, tmp, &ctx->timeout_list, timeout.list) {
		u32 events_needed, events_got;

		if (io_is_timeout_noseq(req))
			break;

		/*
		 * Since seq can easily wrap around over time, subtract
		 * the last seq at which timeouts were flushed before comparing.
		 * Assuming not more than 2^31-1 events have happened since,
		 * these subtractions won't have wrapped, so we can check if
		 * target is in [last_seq, current_seq] by comparing the two.
		 */
		events_needed = req->timeout.target_seq - ctx->cq_last_tm_flush;
		events_got = seq - ctx->cq_last_tm_flush;
		if (events_got < events_needed)
			break;

		io_kill_timeout(req, 0);
	}
	ctx->cq_last_tm_flush = seq;
	spin_unlock_irq(&ctx->timeout_lock);
}
/*
 * Slow path of io_commit_cqring(): flush expired timeouts if timeouts are in
 * use on this ring, then release deferred requests if draining is active.
 */
static void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
{
if (ctx->off_timeout_used)
io_flush_timeouts(ctx);
if (ctx->drain_active)
io_queue_deferred(ctx);
}
/*
 * Publish the cached CQ tail to the shared ring. The timeout/drain flush is
 * only entered when one of those features is in use on this ring.
 */
static inline void io_commit_cqring(struct io_ring_ctx *ctx)
{
if (unlikely(ctx->off_timeout_used || ctx->drain_active))
__io_commit_cqring_flush(ctx);
/* order cqe stores with ring update */
smp_store_release(&ctx->rings->cq.tail, ctx->cached_cq_tail);
}
/*
 * The SQ ring is full when the distance between the shared SQ tail and the
 * cached SQ head equals the number of SQ entries.
 */
static inline bool io_sqring_full(struct io_ring_ctx *ctx)
{
	struct io_rings *rings = ctx->rings;

	return (READ_ONCE(rings->sq.tail) - ctx->cached_sq_head) ==
		ctx->sq_entries;
}
static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx)
{
return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head);
}
/*
 * Claim the next free CQ entry.
 *
 * Returns NULL when the number of pending events equals cq_entries (ring
 * full). Otherwise advances ctx->cached_cq_tail and returns the slot at the
 * previous tail, indexed modulo the ring size via (cq_entries - 1).
 */
static inline struct io_uring_cqe *io_get_cqe(struct io_ring_ctx *ctx)
{
struct io_rings *rings = ctx->rings;
unsigned tail, mask = ctx->cq_entries - 1;
/*
* writes to the cq entry need to come after reading head; the
* control dependency is enough as we're using WRITE_ONCE to
* fill the cq entry
*/
if (__io_cqring_events(ctx) == ctx->cq_entries)
return NULL;
tail = ctx->cached_cq_tail++;
return &rings->cqes[tail & mask];
}
static inline bool io_should_trigger_evfd(struct io_ring_ctx *ctx)
{
if (likely(!ctx->cq_ev_fd))
return false;
if (READ_ONCE(ctx->rings->cq_flags) & IORING_CQ_EVENTFD_DISABLED)
return false;
return !ctx->eventfd_async || io_wq_current_is_worker();
}
/*
* This should only get called when at least one event has been posted.
* Some applications rely on the eventfd notification count only changing
* IFF a new CQE has been added to the CQ ring. There's no dependency on a
* 1:1 relationship between how many times this function is called (and
* hence the eventfd count) and number of CQEs posted to the CQ ring.
*/
static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
{
/*
* wake_up_all() may seem excessive, but io_wake_function() and
* io_should_wake() handle the termination of the loop and only
* wake as many waiters as we need to.
*/
if (wq_has_sleeper(&ctx->cq_wait))
wake_up_all(&ctx->cq_wait);
/* wake the waiters on sq_data->wait, if this ring has sq_data */
if (ctx->sq_data && waitqueue_active(&ctx->sq_data->wait))
wake_up(&ctx->sq_data->wait);
if (io_should_trigger_evfd(ctx))
eventfd_signal(ctx->cq_ev_fd, 1);
/* poll_wait waiters and fasync (SIGIO) listeners */
if (waitqueue_active(&ctx->poll_wait)) {
wake_up_interruptible(&ctx->poll_wait);
kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
}
}
/*
 * Notify interested parties after completions were posted from the IOPOLL
 * path: CQ waiters (SQPOLL rings only), the registered eventfd, and
 * poll_wait waiters together with fasync listeners.
 */
static void io_cqring_ev_posted_iopoll(struct io_ring_ctx *ctx)
{
	/*
	 * waitqueue_active() needs a full barrier between the preceding CQ
	 * ring update and the wait queue check, see the comment on
	 * waitqueue_active(). It has to be unconditional: the poll_wait
	 * check below runs for !SQPOLL rings as well, where no
	 * wq_has_sleeper() call would supply it.
	 */
	smp_mb();

	if (ctx->flags & IORING_SETUP_SQPOLL) {
		if (waitqueue_active(&ctx->cq_wait))
			wake_up_all(&ctx->cq_wait);
	}
	if (io_should_trigger_evfd(ctx))
		eventfd_signal(ctx->cq_ev_fd, 1);
	if (waitqueue_active(&ctx->poll_wait)) {
		wake_up_interruptible(&ctx->poll_wait);
		kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
	}
}
/*
 * Move backlogged completions from ctx->cq_overflow_list into the CQ ring.
 *
 * With @force set the list is always drained: entries that do not fit into
 * the ring are dropped and accounted via io_account_cq_overflow(). Without
 * @force the flush stops as soon as the ring is full.
 *
 * Returns true if there are no backlogged entries after the flush.
 */
static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
{
bool all_flushed, posted;
/* ring already full and not forced: nothing can be moved */
if (!force && __io_cqring_events(ctx) == ctx->cq_entries)
return false;
posted = false;
spin_lock(&ctx->completion_lock);
while (!list_empty(&ctx->cq_overflow_list)) {
struct io_uring_cqe *cqe = io_get_cqe(ctx);
struct io_overflow_cqe *ocqe;
if (!cqe && !force)
break;
ocqe = list_first_entry(&ctx->cq_overflow_list,
struct io_overflow_cqe, list);
/* no free slot is only reachable with @force: drop and account */
if (cqe)
memcpy(cqe, &ocqe->cqe, sizeof(*cqe));
else
io_account_cq_overflow(ctx);
posted = true;
list_del(&ocqe->list);
kfree(ocqe);
}
all_flushed = list_empty(&ctx->cq_overflow_list);
/* backlog gone: clear the overflow indication for kernel and userspace */
if (all_flushed) {
clear_bit(0, &ctx->check_cq_overflow);
WRITE_ONCE(ctx->rings->sq_flags,
ctx->rings->sq_flags & ~IORING_SQ_CQ_OVERFLOW);
}
if (posted)
io_commit_cqring(ctx);
spin_unlock(&ctx->completion_lock);
/* wakeups are issued after completion_lock has been dropped */
if (posted)
io_cqring_ev_posted(ctx);
return all_flushed;
}
/*
 * Flush the CQ overflow backlog (non-forced) if the overflow bit is set.
 *
 * Returns true when nothing is backlogged afterwards, including the case
 * where the overflow bit was not set to begin with.
 */
static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx)
{
	const bool iopoll = !!(ctx->flags & IORING_SETUP_IOPOLL);
	bool flushed;

	if (!test_bit(0, &ctx->check_cq_overflow))
		return true;

	/* iopoll syncs against uring_lock, not completion_lock */
	if (iopoll)
		mutex_lock(&ctx->uring_lock);
	flushed = __io_cqring_overflow_flush(ctx, false);
	if (iopoll)
		mutex_unlock(&ctx->uring_lock);

	return flushed;
}
/*
 * Return @nr task references. Must be called somewhat shortly after putting
 * a request.
 *
 * For the current task the references simply go back into the per-task
 * cache. For any other task they are subtracted from the inflight counter,
 * a waiter is woken if that task is idling, and the task_struct references
 * are dropped.
 */
static inline void io_put_task(struct task_struct *task, int nr)
{
	struct io_uring_task *tctx = task->io_uring;

	if (likely(task == current)) {
		tctx->cached_refs += nr;
		return;
	}

	percpu_counter_sub(&tctx->inflight, nr);
	if (unlikely(atomic_read(&tctx->in_idle)))
		wake_up(&tctx->wait);
	put_task_struct_many(task, nr);
}
/*
 * Top the per-task reference cache back up to IO_TCTX_REFS_CACHE_NR. The
 * shortfall is added to tctx->inflight and to the current task's usage
 * refcount before being credited to tctx->cached_refs.
 */
static void io_task_refs_refill(struct io_uring_task *tctx)
{
/* amount needed to bring cached_refs to IO_TCTX_REFS_CACHE_NR */
unsigned int refill = -tctx->cached_refs + IO_TCTX_REFS_CACHE_NR;
percpu_counter_add(&tctx->inflight, refill);
refcount_add(refill, &current->usage);
tctx->cached_refs += refill;
}
/*
 * Take @nr references out of the current task's cache, refilling the cache
 * when it drops below zero.
 */
static inline void io_get_task_refs(int nr)
{
	struct io_uring_task *tctx = current->io_uring;

	tctx->cached_refs -= nr;
	if (likely(tctx->cached_refs >= 0))
		return;

	io_task_refs_refill(tctx);
}
/*
 * Stash a completion on ctx->cq_overflow_list because the CQ ring had no
 * room for it.
 *
 * Returns true if the completion was queued on the overflow list, false if
 * it had to be dropped (and was only accounted as an overflow).
 */
static bool io_cqring_event_overflow(struct io_ring_ctx *ctx, u64 user_data,
				     long res, unsigned int cflags)
{
	struct io_overflow_cqe *ocqe;

	ocqe = kmalloc(sizeof(*ocqe), GFP_ATOMIC | __GFP_ACCOUNT);
	if (!ocqe) {
		/*
		 * No memory for an overflow entry: the completion is dropped
		 * on the floor and only shows up in the overflow counter.
		 */
		io_account_cq_overflow(ctx);
		return false;
	}

	ocqe->cqe.user_data = user_data;
	ocqe->cqe.res = res;
	ocqe->cqe.flags = cflags;

	/* first backlogged entry: raise the overflow indications */
	if (list_empty(&ctx->cq_overflow_list)) {
		set_bit(0, &ctx->check_cq_overflow);
		WRITE_ONCE(ctx->rings->sq_flags,
			   ctx->rings->sq_flags | IORING_SQ_CQ_OVERFLOW);
	}
	list_add_tail(&ocqe->list, &ctx->cq_overflow_list);
	return true;
}
/*
 * Post one completion: write it into the next free CQ entry, or divert it
 * to the overflow handling when the ring has no room.
 *
 * Returns true if the completion was stored (in the ring or on the overflow
 * list), false if it was dropped.
 */
static inline bool __io_cqring_fill_event(struct io_ring_ctx *ctx, u64 user_data,
					  long res, unsigned int cflags)
{
	struct io_uring_cqe *cqe;

	trace_io_uring_complete(ctx, user_data, res, cflags);

	cqe = io_get_cqe(ctx);
	if (unlikely(!cqe)) {
		/*
		 * If we can't get a cq entry, userspace overflowed the
		 * submission (by quite a lot); fall back to the overflow
		 * path.
		 */
		return io_cqring_event_overflow(ctx, user_data, res, cflags);
	}

	WRITE_ONCE(cqe->user_data, user_data);
	WRITE_ONCE(cqe->res, res);
	WRITE_ONCE(cqe->flags, cflags);
	return true;
}
/*
 * Out-of-line variant of __io_cqring_fill_event() for callers that are not
 * hot enough to justify inlining; same arguments and return value.
 */
static noinline bool io_cqring_fill_event(struct io_ring_ctx *ctx, u64 user_data,
long res, unsigned int cflags)
{
return __io_cqring_fill_event(ctx, user_data, res, cflags);
}
/*
 * Post the completion for @req under ctx->completion_lock and drop one
 * request reference. If that was the last reference the request is torn
 * down right here and parked on ctx->locked_free_list for reuse.
 */
static void io_req_complete_post(struct io_kiocb *req, long res,
unsigned int cflags)
{
struct io_ring_ctx *ctx = req->ctx;
spin_lock(&ctx->completion_lock);
__io_cqring_fill_event(ctx, req->user_data, res, cflags);
/*
* If we're the last reference to this request, add to our locked
* free_list cache.
*/
if (req_ref_put_and_test(req)) {
if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) {
if (req->flags & IO_DISARM_MASK)
io_disarm_next(req);
/* whatever is still linked gets queued to run next */
if (req->link) {
io_req_task_queue(req->link);
req->link = NULL;
}
}
io_dismantle_req(req);
io_put_task(req->task, 1);
list_add(&req->inflight_entry, &ctx->locked_free_list);
ctx->locked_free_nr++;
} else {
/*
* Request still referenced elsewhere: take a ctx reference for the
* wakeups below; if that fails, req is NULLed so they are skipped.
*/
if (!percpu_ref_tryget(&ctx->refs))
req = NULL;
}
io_commit_cqring(ctx);
spin_unlock(&ctx->completion_lock);
/* from here on req is only used as a "do the wakeups" marker */
if (req) {
io_cqring_ev_posted(ctx);
percpu_ref_put(&ctx->refs);
}
}
/* True if any of the IO_REQ_CLEAN_FLAGS bits is set on @req. */
static inline bool io_req_needs_clean(struct io_kiocb *req)
{
	return (req->flags & IO_REQ_CLEAN_FLAGS) != 0;
}
static void io_req_complete_state(struct io_kiocb *req, long res,
unsigned int cflags)
{
if (io_req_needs_clean(req))
io_clean_op(req);
req->result = res;
req->compl.cflags = cflags;
req->flags |= REQ_F_COMPLETE_INLINE;
}
/*
 * Complete @req either deferred (state recorded on the request, when
 * IO_URING_F_COMPLETE_DEFER is set in @issue_flags) or immediately by
 * posting the CQE.
 */
static inline void __io_req_complete(struct io_kiocb *req, unsigned issue_flags,
				     long res, unsigned cflags)
{
	if (issue_flags & IO_URING_F_COMPLETE_DEFER) {
		io_req_complete_state(req, res, cflags);
		return;
	}

	io_req_complete_post(req, res, cflags);
}
/*
 * Complete @req with result @res, no issue flags and no CQE flags; with
 * issue flags of 0 this always takes the io_req_complete_post() path.
 */
static inline void io_req_complete(struct io_kiocb *req, long res)
{
__io_req_complete(req, 0, res, 0);
}
/* Mark @req as failed, then post its completion with result @res. */
static void io_req_complete_failed(struct io_kiocb *req, long res)
{
req_set_fail(req);
io_req_complete_post(req, res, 0);
}
/*
 * Fail a request that is not going to be submitted, reporting req->result.
 */
static void io_req_complete_fail_submit(struct io_kiocb *req)
{
	/*
	 * We don't submit, fail them all, for that replace hardlinks with
	 * normal links. Extra REQ_F_LINK is tolerated.
	 */
	req->flags = (req->flags & ~REQ_F_HARDLINK) | REQ_F_LINK;
	io_req_complete_failed(req, req->result);
}
/*
 * One-time initialisation of a freshly allocated request. These fields are
 * not set up on every allocation; they are initialised in advance and kept
 * valid across allocations.
 */
static void io_preinit_req(struct io_kiocb *req, struct io_ring_ctx *ctx)
{
	/* not necessary, but safer to zero */
	req->result = 0;

	req->async_data = NULL;
	req->link = NULL;
	req->ctx = ctx;
}
/*
 * Move every request parked on ctx->locked_free_list over to the submission
 * side free list and reset the parked-request counter, all under
 * ctx->completion_lock.
 */
static void io_flush_cached_locked_reqs(struct io_ring_ctx *ctx,
					struct io_submit_state *state)
{
	spin_lock(&ctx->completion_lock);
	ctx->locked_free_nr = 0;
	list_splice_init(&ctx->locked_free_list, &state->free_list);
	spin_unlock(&ctx->completion_lock);
}
/*
 * Refill the state->reqs[] array from the free lists.
 *
 * Returns true IFF there are requests in the cache afterwards.
 */
static bool io_flush_cached_reqs(struct io_ring_ctx *ctx)
{
	struct io_submit_state *state = &ctx->submit_state;
	struct list_head *cache = &state->free_list;
	int nr;

	/*
	 * If we have more than a batch's worth of requests in our IRQ side
	 * locked cache, grab the lock and move them over to our submission
	 * side cache.
	 */
	if (READ_ONCE(ctx->locked_free_nr) > IO_COMPL_BATCH)
		io_flush_cached_locked_reqs(ctx, state);

	/* pull requests off the list until it is empty or the array is full */
	nr = state->free_reqs;
	while (!list_empty(cache)) {
		struct io_kiocb *req;

		req = list_first_entry(cache, struct io_kiocb, inflight_entry);
		list_del(&req->inflight_entry);
		state->reqs[nr] = req;
		if (++nr == ARRAY_SIZE(state->reqs))
			break;
	}

	state->free_reqs = nr;
	return nr != 0;
}
/*
* A request might get retired back into the request caches even before opcode
* handlers and io_issue_sqe() are done with it, e.g. inline completion path.
* Because of that, io_alloc_req() should be called only under ->uring_lock
* and with extra caution to not get a request that is still worked on.
*
* Returns a request taken from the per-ctx cache, refilling the cache from
* the free lists or the slab when it is empty, or NULL if allocation fails.
*/
static struct io_kiocb *io_alloc_req(struct io_ring_ctx *ctx)
__must_hold(&ctx->uring_lock)
{
struct io_submit_state *state = &ctx->submit_state;
gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
int ret, i;
BUILD_BUG_ON(ARRAY_SIZE(state->reqs) < IO_REQ_ALLOC_BATCH);
/* fast path: something cached already, or recyclable from free lists */
if (likely(state->free_reqs || io_flush_cached_reqs(ctx)))
goto got_req;
ret = kmem_cache_alloc_bulk(req_cachep, gfp, IO_REQ_ALLOC_BATCH,
state->reqs);
/*
* Bulk alloc is all-or-nothing. If we fail to get a batch,
* retry single alloc to be on the safe side.
*/
if (unlikely(ret <= 0)) {
state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
if (!state->reqs[0])
return NULL;
ret = 1;
}
/* only freshly allocated requests need the one-time init */
for (i = 0; i < ret; i++)
io_preinit_req(state->reqs[i], ctx);
state->free_reqs = ret;
got_req:
/* hand out the last cached request */
state->free_reqs--;
return state->reqs[state->free_reqs];
}
/* Drop a file reference; a NULL @file is ignored. */
static inline void io_put_file(struct file *file)
{
	if (!file)
		return;

	fput(file);
}
/*
 * Release the resources attached to @req: opcode-specific cleanup, the file
 * reference (unless it is a fixed file), the fixed resource reference and
 * any async data. The request itself is not freed.
 */
static void io_dismantle_req(struct io_kiocb *req)
{
/* snapshot taken before io_clean_op() runs; used for the file check below */
unsigned int flags = req->flags;
if (io_req_needs_clean(req))
io_clean_op(req);
if (!(flags & REQ_F_FIXED_FILE))
io_put_file(req->file);
if (req->fixed_rsrc_refs)
percpu_ref_put(req->fixed_rsrc_refs);
/* the pointer is reset so it is NULL again when the request is reused */
if (req->async_data) {
kfree(req->async_data);
req->async_data = NULL;
}
}
/*
 * Tear down @req, return its task reference, park it on
 * ctx->locked_free_list for reuse and drop the ctx reference it held.
 */
static void __io_free_req(struct io_kiocb *req)
{
/* cached up front: ctx is still needed after the request has been parked */
struct io_ring_ctx *ctx = req->ctx;
io_dismantle_req(req);
io_put_task(req->task, 1);
spin_lock(&ctx->completion_lock);
list_add(&req->inflight_entry, &ctx->locked_free_list);
ctx->locked_free_nr++;
spin_unlock(&ctx->completion_lock);
percpu_ref_put(&ctx->refs);
}
/*
 * Unlink the request directly behind @req from the chain; @req then points
 * at what used to follow it. The caller must ensure req->link is non-NULL.
 */
static inline void io_remove_next_linked(struct io_kiocb *req)
{
	struct io_kiocb *victim = req->link;

	req->link = victim->link;
	victim->link = NULL;
}
/*
 * If the request linked behind @req is a linked timeout, detach it and try
 * to cancel its timer. On successful cancel a -ECANCELED completion is
 * filled for the timeout and its reference dropped.
 *
 * Returns true if a completion was posted.
 */
static bool io_kill_linked_timeout(struct io_kiocb *req)
	__must_hold(&req->ctx->completion_lock)
	__must_hold(&req->ctx->timeout_lock)
{
	struct io_kiocb *link = req->link;
	struct io_timeout_data *io;

	if (!link || link->opcode != IORING_OP_LINK_TIMEOUT)
		return false;

	io = link->async_data;
	io_remove_next_linked(req);
	link->timeout.head = NULL;

	/* -1: the timer could not be cancelled, leave the timeout alone */
	if (hrtimer_try_to_cancel(&io->timer) == -1)
		return false;

	list_del(&link->timeout.list);
	io_cqring_fill_event(link->ctx, link->user_data, -ECANCELED, 0);
	io_put_req_deferred(link);
	return true;
}
/*
 * Fail every request linked behind @req. Each one is detached, completed
 * with its own result if it is already marked failed or with -ECANCELED
 * otherwise, and has its reference dropped.
 */
static void io_fail_links(struct io_kiocb *req)
	__must_hold(&req->ctx->completion_lock)
{
	struct io_kiocb *link = req->link;

	req->link = NULL;
	while (link) {
		struct io_kiocb *nxt = link->link;
		long res;

		if (link->flags & REQ_F_FAIL)
			res = link->result;
		else
			res = -ECANCELED;

		link->link = NULL;
		trace_io_uring_fail_link(req, link);
		io_cqring_fill_event(link->ctx, link->user_data, res, 0);
		io_put_req_deferred(link);
		link = nxt;
	}
}
/*
 * Disarm whatever hangs off @req: cancel a linked timeout (either not yet
 * armed or already armed) and, if @req failed and is not a hard link, fail
 * the remainder of the chain.
 *
 * Returns true if at least one completion was posted.
 */
static bool io_disarm_next(struct io_kiocb *req)
__must_hold(&req->ctx->completion_lock)
{
bool posted = false;
/* linked timeout still flagged REQ_F_ARM_LTIMEOUT: just complete it */
if (req->flags & REQ_F_ARM_LTIMEOUT) {
struct io_kiocb *link = req->link;
req->flags &= ~REQ_F_ARM_LTIMEOUT;
if (link && link->opcode == IORING_OP_LINK_TIMEOUT) {
io_remove_next_linked(req);
io_cqring_fill_event(link->ctx, link->user_data,
-ECANCELED, 0);
io_put_req_deferred(link);
posted = true;
}
} else if (req->flags & REQ_F_LINK_TIMEOUT) {
/* REQ_F_LINK_TIMEOUT: the timer must be cancelled under timeout_lock */
struct io_ring_ctx *ctx = req->ctx;
spin_lock_irq(&ctx->timeout_lock);
posted = io_kill_linked_timeout(req);
spin_unlock_irq(&ctx->timeout_lock);
}
/* a failed request that is not a hard link takes its chain down with it */
if (unlikely((req->flags & REQ_F_FAIL) &&
!(req->flags & REQ_F_HARDLINK))) {
posted |= (req->link != NULL);
io_fail_links(req);
}
return posted;
}
/*
 * Detach and return the request linked behind @req (NULL if there is none),
 * first disarming linked timeouts / failing the chain when any of the
 * IO_DISARM_MASK flags is set on @req.
 */
static struct io_kiocb *__io_req_find_next(struct io_kiocb *req)
{
struct io_kiocb *nxt;
/*
* If LINK is set, we have dependent requests in this chain. If we
* didn't fail this request, queue the first one up, moving any other
* dependencies to the next request. In case of failure, fail the rest
* of the chain.
*/
if (req->flags & IO_DISARM_MASK) {
struct io_ring_ctx *ctx = req->ctx;
bool posted;
spin_lock(&ctx->completion_lock);
posted = io_disarm_next(req);
if (posted)
io_commit_cqring(req->ctx);
spin_unlock(&ctx->completion_lock);
/* wakeups happen outside completion_lock */
if (posted)
io_cqring_ev_posted(ctx);
}
nxt = req->link;
req->link = NULL;
return nxt;
}
/*
 * Fast-path wrapper around __io_req_find_next(): requests that are neither
 * REQ_F_LINK nor REQ_F_HARDLINK have nothing behind them, so return NULL.
 */
static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
{
	if (unlikely(req->flags & (REQ_F_LINK | REQ_F_HARDLINK)))
		return __io_req_find_next(req);

	return NULL;
}
/*
 * Finish working on @ctx from task work: if uring_lock is held (*@locked),
 * flush batched completions and release it, then drop the ctx reference.
 * A NULL @ctx is a no-op.
 */
static void ctx_flush_and_put(struct io_ring_ctx *ctx, bool *locked)
{
if (!ctx)
return;
if (*locked) {
if (ctx->submit_state.compl_nr)
io_submit_flush_completions(ctx);
mutex_unlock(&ctx->uring_lock);
*locked = false;
}
percpu_ref_put(&ctx->refs);
}
/*
 * task_work callback: run every request queued on the task's task_list.
 *
 * The list is grabbed in batches under task_lock. While consecutive requests
 * belong to the same ring, its reference (and uring_lock, if it could be
 * taken without blocking) is kept across them.
 */
static void tctx_task_work(struct callback_head *cb)
{
bool locked = false;
struct io_ring_ctx *ctx = NULL;
struct io_uring_task *tctx = container_of(cb, struct io_uring_task,
task_work);
while (1) {
struct io_wq_work_node *node;
/* list looks empty: flush batched completions before checking for real */
if (!tctx->task_list.first && locked && ctx->submit_state.compl_nr)
io_submit_flush_completions(ctx);
spin_lock_irq(&tctx->task_lock);
node = tctx->task_list.first;
INIT_WQ_LIST(&tctx->task_list);
/* nothing left: clear task_running while still holding task_lock */
if (!node)
tctx->task_running = false;
spin_unlock_irq(&tctx->task_lock);
if (!node)
break;
do {
/* fetch ->next first; the handler is free to dispose of the request */
struct io_wq_work_node *next = node->next;
struct io_kiocb *req = container_of(node, struct io_kiocb,
io_task_work.node);
/* switching rings: release the previous one, pin the new one */
if (req->ctx != ctx) {
ctx_flush_and_put(ctx, &locked);
ctx = req->ctx;
/* if not contended, grab and improve batching */
locked = mutex_trylock(&ctx->uring_lock);
percpu_ref_get(&ctx->refs);
}
req->io_task_work.func(req, &locked);
node = next;
} while (node);
cond_resched();
}
ctx_flush_and_put(ctx, &locked);
}
/*
 * Queue @req on its task's task_list and make sure tctx_task_work is
 * scheduled to run it. If task_work cannot be added, everything on the list
 * is moved to the per-ring fallback list and run from delayed work instead.
 */
static void io_req_task_work_add(struct io_kiocb *req)
{
struct task_struct *tsk = req->task;
struct io_uring_task *tctx = tsk->io_uring;
enum task_work_notify_mode notify;
struct io_wq_work_node *node;
unsigned long flags;
bool running;
WARN_ON_ONCE(!tctx);
spin_lock_irqsave(&tctx->task_lock, flags);
wq_list_add_tail(&req->io_task_work.node, &tctx->task_list);
/* claim the "running" state if nobody holds it yet */
running = tctx->task_running;
if (!running)
tctx->task_running = true;
spin_unlock_irqrestore(&tctx->task_lock, flags);
/* task_work already pending, we're done */
if (running)
return;
/*
* SQPOLL kernel thread doesn't need notification, just a wakeup. For
* all other cases, use TWA_SIGNAL unconditionally to ensure we're
* processing task_work. There's no reliable way to tell if TWA_RESUME
* will do the job.
*/
notify = (req->ctx->flags & IORING_SETUP_SQPOLL) ? TWA_NONE : TWA_SIGNAL;
if (!task_work_add(tsk, &tctx->task_work, notify)) {
wake_up_process(tsk);
return;
}
/* task_work_add() failed: take the whole list back and use the fallback */
spin_lock_irqsave(&tctx->task_lock, flags);
tctx->task_running = false;
node = tctx->task_list.first;
INIT_WQ_LIST(&tctx->task_list);
spin_unlock_irqrestore(&tctx->task_lock, flags);
while (node) {
req = container_of(node, struct io_kiocb, io_task_work.node);
node = node->next;
/* the work is only scheduled when llist_add() returns true */
if (llist_add(&req->io_task_work.fallback_node,
&req->ctx->fallback_llist))
schedule_delayed_work(&req->ctx->fallback_work, 1);
}
}
/*
 * Task-work handler that fails @req with the result previously stored in
 * req->result.
 */
static void io_req_task_cancel(struct io_kiocb *req, bool *locked)
{
	/* not needed for normal modes, but SQPOLL depends on it */
	io_tw_lock(req->ctx, locked);

	io_req_complete_failed(req, req->result);
}
/*
 * Task-work handler that submits @req, or fails it with -EFAULT if the
 * owning task is exiting.
 */
static void io_req_task_submit(struct io_kiocb *req, bool *locked)
{
	io_tw_lock(req->ctx, locked);

	/* req->task == current here, checking PF_EXITING is safe */
	if (unlikely(req->task->flags & PF_EXITING))
		io_req_complete_failed(req, -EFAULT);
	else
		__io_queue_sqe(req);
}
/*
 * Schedule @req to be failed from task work; @ret is stored in req->result
 * and becomes the completion result reported by io_req_task_cancel().
 */
static void io_req_task_queue_fail(struct io_kiocb *req, int ret)
{
req->result = ret;
req->io_task_work.func = io_req_task_cancel;
io_req_task_work_add(req);
}
/* Schedule @req to be submitted from task work via io_req_task_submit(). */
static void io_req_task_queue(struct io_kiocb *req)
{
req->io_task_work.func = io_req_task_submit;
io_req_task_work_add(req);
}
/*
 * Schedule @req to be punted to io-wq from task work via
 * io_queue_async_work().
 */
static void io_req_task_queue_reissue(struct io_kiocb *req)
{
req->io_task_work.func = io_queue_async_work;
io_req_task_work_add(req);
}
/* Queue the request linked behind @req, if there is one, via task work. */
static inline void io_queue_next(struct io_kiocb *req)
{
	struct io_kiocb *nxt = io_req_find_next(req);

	if (!nxt)
		return;

	io_req_task_queue(nxt);
}
/* Queue whatever is linked behind @req, then free @req itself. */
static void io_free_req(struct io_kiocb *req)
{
io_queue_next(req);
__io_free_req(req);
}
/*
 * Task-work wrapper around io_free_req(); @locked is unused and only there
 * to match the task-work handler signature.
 */
static void io_free_req_work(struct io_kiocb *req, bool *locked)
{
io_free_req(req);
}
/*
 * Accumulates references that are dropped in bulk while freeing a batch of
 * requests; see io_req_free_batch() and io_req_free_batch_finish().
 */
struct req_batch {
/* task the pending task_refs belong to, NULL if none yet */
struct task_struct *task;
/* task references still to be returned through io_put_task() */
int task_refs;
/* ctx->refs references still to be dropped */
int ctx_refs;
};
/* Reset @rb to the empty state: no task and no pending references. */
static inline void io_init_req_batch(struct req_batch *rb)
{
	rb->task = NULL;
	rb->task_refs = 0;
	rb->ctx_refs = 0;
}
/*
 * Drop the ctx and task references accumulated in @rb. Nothing is done for
 * a kind of reference the batch never collected.
 */
static void io_req_free_batch_finish(struct io_ring_ctx *ctx,
struct req_batch *rb)
{
if (rb->ctx_refs)
percpu_ref_put_many(&ctx->refs, rb->ctx_refs);
if (rb->task)
io_put_task(rb->task, rb->task_refs);
}
/*
 * Free @req as part of a batch: queue its linked successor, tear it down,
 * account one task and one ctx reference in @rb, and recycle the request
 * into @state (the reqs[] array if there is room, the free list otherwise).
 */
static void io_req_free_batch(struct req_batch *rb, struct io_kiocb *req,
			      struct io_submit_state *state)
{
	io_queue_next(req);
	io_dismantle_req(req);

	/* task changed: return what was collected for the previous one */
	if (rb->task != req->task) {
		if (rb->task)
			io_put_task(rb->task, rb->task_refs);
		rb->task = req->task;
		rb->task_refs = 0;
	}
	rb->task_refs++;
	rb->ctx_refs++;

	if (state->free_reqs == ARRAY_SIZE(state->reqs)) {
		list_add(&req->inflight_entry, &state->free_list);
		return;
	}
	state->reqs[state->free_reqs++] = req;
}
/*
 * Post all completions batched in ctx->submit_state.compl_reqs[] in one go,
 * then drop a reference on each of those requests, freeing the ones that hit
 * zero as a batch.
 */
static void io_submit_flush_completions(struct io_ring_ctx *ctx)
__must_hold(&ctx->uring_lock)
{
struct io_submit_state *state = &ctx->submit_state;
int i, nr = state->compl_nr;
struct req_batch rb;
/* pass 1: fill all CQEs under a single completion_lock section */
spin_lock(&ctx->completion_lock);
for (i = 0; i < nr; i++) {
struct io_kiocb *req = state->compl_reqs[i];
__io_cqring_fill_event(ctx, req->user_data, req->result,
req->compl.cflags);
}
io_commit_cqring(ctx);
spin_unlock(&ctx->completion_lock);
io_cqring_ev_posted(ctx);
/* pass 2: drop the request references outside the lock */
io_init_req_batch(&rb);
for (i = 0; i < nr; i++) {
struct io_kiocb *req = state->compl_reqs[i];
if (req_ref_put_and_test(req))
io_req_free_batch(&rb, req, &ctx->submit_state);
}
io_req_free_batch_finish(ctx, &rb);
state->compl_nr = 0;
}
/*
 * Drop a reference to @req. If it was the last one, free the request and
 * return the next request in its chain (NULL if there is none); otherwise
 * return NULL.
 */
static inline struct io_kiocb *io_put_req_find_next(struct io_kiocb *req)
{
	struct io_kiocb *nxt;

	if (!req_ref_put_and_test(req))
		return NULL;

	nxt = io_req_find_next(req);
	__io_free_req(req);
	return nxt;
}
/* Drop a reference to @req and free it if that was the last one. */
static inline void io_put_req(struct io_kiocb *req)
{
	if (!req_ref_put_and_test(req))
		return;

	io_free_req(req);
}
/*
 * Drop a reference to @req; if it was the last one, the actual freeing is
 * deferred to task work (io_free_req_work) instead of being done here.
 */
static inline void io_put_req_deferred(struct io_kiocb *req)
{
	if (!req_ref_put_and_test(req))
		return;

	req->io_task_work.func = io_free_req_work;
	io_req_task_work_add(req);
}
/*
 * Number of pending CQ events, like __io_cqring_events() but preceded by a
 * read barrier.
 */
static unsigned io_cqring_events(struct io_ring_ctx *ctx)
{
/* See comment at the top of this file */
smp_rmb();
return __io_cqring_events(ctx);
}
/*
 * Number of SQ entries between the cached SQ head and the shared SQ tail;
 * the tail is read with acquire semantics.
 */
static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
{
struct io_rings *rings = ctx->rings;
/* make sure SQ entry isn't read before tail */
return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
}
/*
 * Release a selected buffer and build the CQE flags that report it: the
 * buffer id shifted by IORING_CQE_BUFFER_SHIFT plus IORING_CQE_F_BUFFER.
 * Clears REQ_F_BUFFER_SELECTED on @req and frees @kbuf.
 */
static unsigned int io_put_kbuf(struct io_kiocb *req, struct io_buffer *kbuf)
{
	unsigned int cflags;

	cflags = (kbuf->bid << IORING_CQE_BUFFER_SHIFT) | IORING_CQE_F_BUFFER;
	req->flags &= ~REQ_F_BUFFER_SELECTED;
	kfree(kbuf);

	return cflags;
}
/*
 * Release the selected buffer of a read/write request, if it has one, and
 * return the matching CQE flags; returns 0 when no buffer was selected.
 * The buffer pointer is recovered from req->rw.addr.
 */
static inline unsigned int io_put_rw_kbuf(struct io_kiocb *req)
{
	if (unlikely(req->flags & REQ_F_BUFFER_SELECTED)) {
		struct io_buffer *kbuf;

		kbuf = (struct io_buffer *) (unsigned long) req->rw.addr;
		return io_put_kbuf(req, kbuf);
	}

	return 0;
}
/*
 * Run pending task work for the current task, if TIF_NOTIFY_SIGNAL is set
 * or task works are queued. Returns true if the notification path was
 * taken, false if there was nothing to do.
 */
static inline bool io_run_task_work(void)
{
	if (!test_thread_flag(TIF_NOTIFY_SIGNAL) && !current->task_works)
		return false;

	__set_current_state(TASK_RUNNING);
	tracehook_notify_signal();
	return true;
}
/*
* Find and free completed poll iocbs
*
* Every request on @done is either re-queued for reissue (result -EAGAIN
* and reissue not forbidden) or gets a CQE posted and its reference dropped.
* *@nr_events is incremented once per posted CQE.
*/
static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
struct list_head *done)
{
struct req_batch rb;
struct io_kiocb *req;
/* order with ->result store in io_complete_rw_iopoll() */
smp_rmb();
io_init_req_batch(&rb);
while (!list_empty(done)) {
req = list_first_entry(done, struct io_kiocb, inflight_entry);
list_del(&req->inflight_entry);
/* retryable failure: reset the completion marker and reissue */
if (READ_ONCE(req->result) == -EAGAIN &&
!(req->flags & REQ_F_DONT_REISSUE)) {
req->iopoll_completed = 0;
io_req_task_queue_reissue(req);
continue;
}
__io_cqring_fill_event(ctx, req->user_data, req->result,
io_put_rw_kbuf(req));
(*nr_events)++;
if (req_ref_put_and_test(req))
io_req_free_batch(&rb, req, &ctx->submit_state);
}
io_commit_cqring(ctx);
io_cqring_ev_posted_iopoll(ctx);
io_req_free_batch_finish(ctx, &rb);
}
/*
 * One pass over ctx->iopoll_list: collect requests that already completed,
 * actively poll the ones that have not, and complete whatever was collected.
 *
 * @nr_events: incremented by the number of completions posted
 * @min:       number of events the caller is waiting for; only used to
 *             decide whether spinning is allowed
 *
 * Returns 0, or the negative value returned by a file's ->iopoll().
 */
static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
long min)
{
struct io_kiocb *req, *tmp;
LIST_HEAD(done);
bool spin;
/*
* Only spin for completions if we don't have multiple devices hanging
* off our complete list, and we're under the requested amount.
*/
spin = !ctx->poll_multi_queue && *nr_events < min;
list_for_each_entry_safe(req, tmp, &ctx->iopoll_list, inflight_entry) {
struct kiocb *kiocb = &req->rw.kiocb;
int ret;
/*
* Move completed and retryable entries to our local lists.
* If we find a request that requires polling, break out
* and complete those lists first, if we have entries there.
*/
if (READ_ONCE(req->iopoll_completed)) {
list_move_tail(&req->inflight_entry, &done);
continue;
}
if (!list_empty(&done))
break;
/* 'done' is empty whenever ->iopoll() is called, so the error return leaks nothing */
ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
if (unlikely(ret < 0))
return ret;
else if (ret)
spin = false;
/* iopoll may have completed current req */
if (READ_ONCE(req->iopoll_completed))
list_move_tail(&req->inflight_entry, &done);
}
if (!list_empty(&done))
io_iopoll_complete(ctx, nr_events, &done);
return 0;
}
/*
* We can't just wait for polled events to come to us, we have to actively
* find and complete them.
*
* No-op for rings without IORING_SETUP_IOPOLL. Otherwise keeps polling under
* uring_lock until the iopoll list is empty or a pass reaps nothing.
*/
static void io_iopoll_try_reap_events(struct io_ring_ctx *ctx)
{
if (!(ctx->flags & IORING_SETUP_IOPOLL))
return;
mutex_lock(&ctx->uring_lock);
while (!list_empty(&ctx->iopoll_list)) {
unsigned int nr_events = 0;
/* the return value is not checked here; only progress matters */
io_do_iopoll(ctx, &nr_events, 0);
/* let it sleep and repeat later if can't complete a request */
if (nr_events == 0)
break;
/*
* Ensure we allow local-to-the-cpu processing to take place,
* in this case we need to ensure that we reap all events.
* Also let task_work, etc. to progress by releasing the mutex
*/
if (need_resched()) {
mutex_unlock(&ctx->uring_lock);
cond_resched();
mutex_lock(&ctx->uring_lock);
}
}
mutex_unlock(&ctx->uring_lock);
}
/*
 * Poll for completions on an IOPOLL ring until at least @min events have
 * been reaped, an error occurs, or a reschedule is due.
 *
 * Returns 0 or the negative error from io_do_iopoll().
 */
static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
{
unsigned int nr_events = 0;
int ret = 0;
/*
* We disallow the app entering submit/complete with polling, but we
* still need to lock the ring to prevent racing with polled issue
* that got punted to a workqueue.
*/
mutex_lock(&ctx->uring_lock);
/*
* Don't enter poll loop if we already have events pending.
* If we do, we can potentially be spinning for commands that
* already triggered a CQE (eg in error).
*/
if (test_bit(0, &ctx->check_cq_overflow))
__io_cqring_overflow_flush(ctx, false);
if (io_cqring_events(ctx))
goto out;
do {
/*
* If a submit got punted to a workqueue, we can have the
* application entering polling for a command before it gets
* issued. That app will hold the uring_lock for the duration
* of the poll right here, so we need to take a breather every
* now and then to ensure that the issue has a chance to add
* the poll to the issued list. Otherwise we can spin here
* forever, while the workqueue is stuck trying to acquire the
* very same mutex.
*/
if (list_empty(&ctx->iopoll_list)) {
/* remember the tail to notice CQEs posted while the lock was dropped */
u32 tail = ctx->cached_cq_tail;
mutex_unlock(&ctx->uring_lock);
io_run_task_work();
mutex_lock(&ctx->uring_lock);
/* some requests don't go through iopoll_list */
if (tail != ctx->cached_cq_tail ||
list_empty(&ctx->iopoll_list))
break;
}
ret = io_do_iopoll(ctx, &nr_events, min);
} while (!ret && nr_events < min && !need_resched());
out:
mutex_unlock(&ctx->uring_lock);
return ret;
}
/*
 * End the superblock write protection for a completed write. Only done for
 * requests flagged REQ_F_ISREG.
 */
static void kiocb_end_write(struct io_kiocb *req)
{
	struct super_block *sb;

	if (!(req->flags & REQ_F_ISREG))
		return;

	/*
	 * Tell lockdep we inherited freeze protection from submission
	 * thread.
	 */
	sb = file_inode(req->file)->i_sb;
	__sb_writers_acquired(sb, SB_FREEZE_WRITE);
	sb_end_write(sb);
}
#ifdef CONFIG_BLOCK
/*
 * Prepare a read/write request for resubmission.
 *
 * With async data already present the iterator is rewound so that it covers
 * req->result bytes again; otherwise the async data is set up via
 * io_req_prep_async(). Returns true if the request can be resubmitted.
 */
static bool io_resubmit_prep(struct io_kiocb *req)
{
	struct io_async_rw *rw = req->async_data;

	if (rw) {
		/* may have left rw->iter inconsistent on -EIOCBQUEUED */
		iov_iter_revert(&rw->iter,
				req->result - iov_iter_count(&rw->iter));
		return true;
	}

	return !io_req_prep_async(req);
}
/*
 * Decide whether a read/write that came back with a retryable error may be
 * reissued. Only block devices and regular files qualify, and only when the
 * request allows waiting, we are not on an io-wq worker of a non-IOPOLL
 * ring, the ring is not being torn down, and we run in task context of the
 * submitter's thread group.
 */
static bool io_rw_should_reissue(struct io_kiocb *req)
{
	umode_t mode = file_inode(req->file)->i_mode;
	struct io_ring_ctx *ctx = req->ctx;

	if (!S_ISBLK(mode) && !S_ISREG(mode))
		return false;
	if (req->flags & REQ_F_NOWAIT)
		return false;
	if (io_wq_current_is_worker() && !(ctx->flags & IORING_SETUP_IOPOLL))
		return false;

	/*
	 * If ref is dying, we might be running poll reap from the exit work.
	 * Don't attempt to reissue from that path, just let it fail with
	 * -EAGAIN.
	 */
	if (percpu_ref_is_dying(&ctx->refs))
		return false;

	/*
	 * Play it safe and assume not safe to re-import and reissue if we're
	 * not in the original thread group (or in task context).
	 */
	return same_thread_group(req->task, current) && in_task();
}
#else
/* !CONFIG_BLOCK stub: resubmission is never possible. */
static bool io_resubmit_prep(struct io_kiocb *req)
{
return false;
}
/* !CONFIG_BLOCK stub: requests are never reissued. */
static bool io_rw_should_reissue(struct io_kiocb *req)
{
return false;
}
#endif
/*
 * Common read/write completion handling.
 *
 * Ends write protection for writes. If @res differs from the expected
 * req->result, the request is either flagged for reissue (retryable error
 * and reissue permitted) or marked failed with @res stored as its result.
 *
 * Returns true if the request was flagged REQ_F_REISSUE and must not be
 * completed by the caller, false otherwise.
 */
static bool __io_complete_rw_common(struct io_kiocb *req, long res)
{
	if (req->rw.kiocb.ki_flags & IOCB_WRITE)
		kiocb_end_write(req);

	/* got exactly what was expected: nothing more to do */
	if (res == req->result)
		return false;

	if ((res == -EAGAIN || res == -EOPNOTSUPP) &&
	    io_rw_should_reissue(req)) {
		req->flags |= REQ_F_REISSUE;
		return true;
	}

	req_set_fail(req);
	req->result = res;
	return false;
}
/*
 * Task-work handler that completes a read/write request with req->result.
 *
 * With uring_lock held (*@locked) the completion is batched in the submit
 * state and flushed once the batch array is full; otherwise the CQE is
 * posted right away.
 */
static void io_req_task_complete(struct io_kiocb *req, bool *locked)
{
	unsigned int cflags = io_put_rw_kbuf(req);
	long res = req->result;
	struct io_ring_ctx *ctx;
	struct io_submit_state *state;

	if (!*locked) {
		io_req_complete_post(req, res, cflags);
		return;
	}

	ctx = req->ctx;
	state = &ctx->submit_state;

	io_req_complete_state(req, res, cflags);
	state->compl_reqs[state->compl_nr++] = req;
	if (state->compl_nr == ARRAY_SIZE(state->compl_reqs))
		io_submit_flush_completions(ctx);
}
/*
 * Complete a read/write request inline, unless the common handling flagged
 * it for reissue. @res2 is unused.
 */
static void __io_complete_rw(struct io_kiocb *req, long res, long res2,
unsigned int issue_flags)
{
if (__io_complete_rw_common(req, res))
return;
__io_req_complete(req, issue_flags, req->result, io_put_rw_kbuf(req));
}
/*
 * ->ki_complete callback that io_prep_rw() installs for rings without
 * IORING_SETUP_IOPOLL.
 *
 * Runs the common completion stage; unless the request was flagged for
 * reissue, records @res and defers the final completion to task work via
 * io_req_task_complete(). @res2 is not used.
 */
static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
{
	struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);

	if (!__io_complete_rw_common(req, res)) {
		req->result = res;
		req->io_task_work.func = io_req_task_complete;
		io_req_task_work_add(req);
	}
}
/*
 * ->ki_complete callback that io_prep_rw() installs for IORING_SETUP_IOPOLL
 * rings.
 *
 * Records @res in req->result and then marks the request as completed so
 * the polling side can pick it up. @res2 is not used.
 */
static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
{
struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
if (kiocb->ki_flags & IOCB_WRITE)
kiocb_end_write(req);
if (unlikely(res != req->result)) {
/*
 * Unexpected result. Only -EAGAIN on a request that is both eligible
 * for reissue and successfully re-prepared is left alone; everything
 * else is marked failed and flagged so that it is not reissued.
 */
if (!(res == -EAGAIN && io_rw_should_reissue(req) &&
io_resubmit_prep(req))) {
req_set_fail(req);
req->flags |= REQ_F_DONT_REISSUE;
}
}
WRITE_ONCE(req->result, res);
/* order with io_iopoll_complete() checking ->result */
smp_wmb();
/* must be the last store: ->result is published before this flag */
WRITE_ONCE(req->iopoll_completed, 1);
}
/*
 * Add an issued request to ctx->iopoll_list.
 *
 * After the iocb has been issued, it's safe to be found on the poll list.
 * Adding the kiocb to the list AFTER submission ensures that we don't
 * find it from a io_do_iopoll() thread before the issuer is done
 * accessing the kiocb cookie.
 *
 * When called from an io-wq worker, ctx->uring_lock is taken for the
 * duration of the list update and released before returning.
 */
static void io_iopoll_req_issued(struct io_kiocb *req)
{
struct io_ring_ctx *ctx = req->ctx;
const bool in_async = io_wq_current_is_worker();
/* workqueue context doesn't hold uring_lock, grab it now */
if (unlikely(in_async))
mutex_lock(&ctx->uring_lock);
/*
 * Track whether we have multiple files in our lists. This will impact
 * how we do polling eventually, not spinning if we're on potentially
 * different devices.
 */
if (list_empty(&ctx->iopoll_list)) {
ctx->poll_multi_queue = false;
} else if (!ctx->poll_multi_queue) {
struct io_kiocb *list_req;
unsigned int queue_num0, queue_num1;
/* compare against the request currently at the head of the list */
list_req = list_first_entry(&ctx->iopoll_list, struct io_kiocb,
inflight_entry);
if (list_req->file != req->file) {
ctx->poll_multi_queue = true;
} else {
/* same file: compare the queue numbers encoded in the cookies */
queue_num0 = blk_qc_t_to_queue_num(list_req->rw.kiocb.ki_cookie);
queue_num1 = blk_qc_t_to_queue_num(req->rw.kiocb.ki_cookie);
if (queue_num0 != queue_num1)
ctx->poll_multi_queue = true;
}
}
/*
 * For fast devices, IO may have already completed. If it has, add
 * it to the front so we find it first.
 */
if (READ_ONCE(req->iopoll_completed))
list_add(&req->inflight_entry, &ctx->iopoll_list);
else
list_add_tail(&req->inflight_entry, &ctx->iopoll_list);
if (unlikely(in_async)) {
/*
 * If IORING_SETUP_SQPOLL is enabled, sqes are either handled
 * in sq thread task context or in io worker task context. If
 * current task context is sq thread, we don't need to check
 * whether should wake up sq thread.
 */
if ((ctx->flags & IORING_SETUP_SQPOLL) &&
wq_has_sleeper(&ctx->sq_data->wait))
wake_up(&ctx->sq_data->wait);
mutex_unlock(&ctx->uring_lock);
}
}
static bool io_bdev_nowait(struct block_device *bdev)
{
return !bdev || blk_queue_nowait(bdev_get_queue(bdev));
}
/*
 * If we tracked the file through the SCM inflight mechanism, we could support
 * any file. For now, just ensure that anything potentially problematic is done
 * inline.
 *
 * Returns true if @file can be used for nowait IO in direction @rw (READ or
 * anything else, which is treated as a write).
 */
static bool __io_file_supports_nowait(struct file *file, int rw)
{
	umode_t mode = file_inode(file)->i_mode;

	/* block device: depends entirely on the device's queue */
	if (S_ISBLK(mode))
		return IS_ENABLED(CONFIG_BLOCK) &&
		       io_bdev_nowait(I_BDEV(file->f_mapping->host));

	if (S_ISSOCK(mode))
		return true;

	/*
	 * regular file: depends on the backing device of its superblock, and
	 * io_uring's own files are excluded
	 */
	if (S_ISREG(mode))
		return IS_ENABLED(CONFIG_BLOCK) &&
		       io_bdev_nowait(file->f_inode->i_sb->s_bdev) &&
		       file->f_op != &io_uring_fops;

	/* any ->read/write should understand O_NONBLOCK */
	if (file->f_flags & O_NONBLOCK)
		return true;

	if (!(file->f_mode & FMODE_NOWAIT))
		return false;

	/* FMODE_NOWAIT only helps if the matching iter method exists */
	return rw == READ ? file->f_op->read_iter != NULL :
			    file->f_op->write_iter != NULL;
}
static bool io_file_supports_nowait(struct io_kiocb *req, int rw)
{
if (rw == READ && (req->flags & REQ_F_NOWAIT_READ))
return true;
else if (rw == WRITE && (req->flags & REQ_F_NOWAIT_WRITE))
return true;
return __io_file_supports_nowait(req->file, rw);
}
/*
 * Prepare the kiocb embedded in @req from the submission queue entry @sqe.
 *
 * Fills in file position, write hint, kiocb flags, IO priority and the
 * completion callback, then copies the buffer address, length and buffer
 * index out of the SQE. Every SQE field is read with READ_ONCE().
 *
 * Returns 0 on success, or a negative error:
 *   - whatever kiocb_set_rw_flags() or ioprio_check_cap() returns on failure
 *   - -EOPNOTSUPP on an IOPOLL ring if the file is not opened for direct IO
 *     or has no ->iopoll method
 *   - -EINVAL if IOCB_HIPRI is requested on a ring without IOPOLL
 */
static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
struct io_ring_ctx *ctx = req->ctx;
struct kiocb *kiocb = &req->rw.kiocb;
struct file *file = req->file;
unsigned ioprio;
int ret;
/* remember regular files, unless io_req_ffs_set() says flags are already set */
if (!io_req_ffs_set(req) && S_ISREG(file_inode(file)->i_mode))
req->flags |= REQ_F_ISREG;
kiocb->ki_pos = READ_ONCE(sqe->off);
/* offset -1 on a non-stream file means "use and update the file position" */
if (kiocb->ki_pos == -1 && !(file->f_mode & FMODE_STREAM)) {
req->flags |= REQ_F_CUR_POS;
kiocb->ki_pos = file->f_pos;
}
kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
if (unlikely(ret))
return ret;
/* don't allow async punt for O_NONBLOCK or RWF_NOWAIT */
if ((kiocb->ki_flags & IOCB_NOWAIT) || (file->f_flags & O_NONBLOCK))
req->flags |= REQ_F_NOWAIT;
/* an explicit priority must pass the capability check; 0 inherits the task's */
ioprio = READ_ONCE(sqe->ioprio);
if (ioprio) {
ret = ioprio_check_cap(ioprio);
if (ret)
return ret;
kiocb->ki_ioprio = ioprio;
} else
kiocb->ki_ioprio = get_current_ioprio();
if (ctx->flags & IORING_SETUP_IOPOLL) {
/* polled IO requires direct IO and a file that implements ->iopoll */
if (!(kiocb->ki_flags & IOCB_DIRECT) ||
!kiocb->ki_filp->f_op->iopoll)
return -EOPNOTSUPP;
kiocb->ki_flags |= IOCB_HIPRI | IOCB_ALLOC_CACHE;
kiocb->ki_complete = io_complete_rw_iopoll;
req->iopoll_completed = 0;
} else {
/* IOCB_HIPRI is only accepted on IOPOLL rings */
if (kiocb->ki_flags & IOCB_HIPRI)
return -EINVAL;
kiocb->ki_complete = io_complete_rw;
}
/* fixed-buffer opcodes: clear the cached buffer mapping and set the rsrc node */
if (req->opcode == IORING_OP_READ_FIXED ||
req->opcode == IORING_OP_WRITE_FIXED) {
req->imu = NULL;
io_req_set_rsrc_node(req);
}
req->rw.addr = READ_ONCE(sqe->addr);
req->rw.len = READ_ONCE(sqe->len);
req->buf_index = READ_ONCE(sqe->buf_index);
return 0;
}
/*
 * Hand the result of a read/write to the kiocb's completion callback.
 *
 * -EIOCBQUEUED means nothing is reported here. The restart error codes are
 * turned into -EINTR before ->ki_complete() is invoked.
 */
static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
{
	if (ret == -EIOCBQUEUED)
		return;

	/*
	 * We can't just restart the syscall, since previously
	 * submitted sqes may already be in progress. Just fail this
	 * IO with EINTR.
	 */
	if (ret == -ERESTARTSYS || ret == -ERESTARTNOINTR ||
	    ret == -ERESTARTNOHAND || ret == -ERESTART_RESTARTBLOCK)
		ret = -EINTR;

	kiocb->ki_complete(kiocb, ret, 0);
}
/*
 * Finish a read/write whose issue returned @ret.
 *
 * Folds in bytes already transferred by earlier attempts, writes the file
 * position back when the request used the current position, delivers the
 * result, and finally handles a reissue request raised during completion.
 */
static void kiocb_done(struct kiocb *kiocb, ssize_t ret,
unsigned int issue_flags)
{
struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
struct io_async_rw *io = req->async_data;
/* reissue handling only applies when io_complete_rw is the callback */
bool check_reissue = kiocb->ki_complete == io_complete_rw;
/* add previously done IO, if any */
if (io && io->bytes_done > 0) {
/* an error after partial progress is reported as the partial byte count */
if (ret < 0)
ret = io->bytes_done;
else
ret += io->bytes_done;
}
if (req->flags & REQ_F_CUR_POS)
req->file->f_pos = kiocb->ki_pos;
/* non-negative results on the io_complete_rw path complete inline */
if (ret >= 0 && check_reissue)
__io_complete_rw(req, ret, 0, issue_flags);
else
io_rw_done(kiocb, ret);
if (check_reissue && (req->flags & REQ_F_REISSUE)) {
req->flags &= ~REQ_F_REISSUE;
/* queue the reissue if it can be prepared, otherwise fail the request */
if (io_resubmit_prep(req)) {
io_req_task_queue_reissue(req);
} else {
req_set_fail(req);
__io_req_complete(req, issue_flags, ret,
io_put_rw_kbuf(req));
}
}
}
/*
 * Set up @iter as a bvec iterator over the part of the registered buffer
 * @imu described by req->rw.addr and req->rw.len.
 *
 * Returns 0 on success, or -EFAULT if address + length overflows or the
 * range does not lie entirely inside [imu->ubuf, imu->ubuf_end].
 */
static int __io_import_fixed(struct io_kiocb *req, int rw, struct iov_iter *iter,
struct io_mapped_ubuf *imu)
{
size_t len = req->rw.len;
u64 buf_end, buf_addr = req->rw.addr;
size_t offset;
if (unlikely(check_add_overflow(buf_addr, (u64)len, &buf_end)))
return -EFAULT;
/* not inside the mapped region */
if (unlikely(buf_addr < imu->ubuf || buf_end > imu->ubuf_end))
return -EFAULT;
/*
 * May not be a start of buffer, set size appropriately
 * and advance us to the beginning.
 */
offset = buf_addr - imu->ubuf;
iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
if (offset) {
/*
 * Don't use iov_iter_advance() here, as it's really slow for
 * using the latter parts of a big fixed buffer - it iterates
 * over each segment manually. We can cheat a bit here, because
 * we know that:
 *
 * 1) it's a BVEC iter, we set it up
 * 2) all bvecs are PAGE_SIZE in size, except potentially the
 * first and last bvec
 *
 * So just find our index, and adjust the iterator afterwards.
 * If the offset is within the first bvec (or the whole first
 * bvec), just use iov_iter_advance(). This makes it easier
 * since we can just skip the first segment, which may not
 * be PAGE_SIZE aligned.
 */
const struct bio_vec *bvec = imu->bvec;
if (offset <= bvec->bv_len) {
iov_iter_advance(iter, offset);
} else {
unsigned long seg_skip;
/* skip first vec */
offset -= bvec->bv_len;
/* the first vec plus every whole page covered by the remaining offset */
seg_skip = 1 + (offset >> PAGE_SHIFT);
iter->bvec = bvec + seg_skip;
iter->nr_segs -= seg_skip;
/* drop the bytes skipped: the first vec plus the remaining offset */
iter->count -= bvec->bv_len + offset;
/* leftover offset within the page-sized segment we landed on */
iter->iov_offset = offset & ~PAGE_MASK;
}
}
return 0;
}
/*
 * Import a registered ("fixed") buffer for @req into @iter.
 *
 * The buffer mapping is looked up from ctx->user_bufs by req->buf_index on
 * first use and cached in req->imu; later calls reuse the cached mapping.
 *
 * Returns -EFAULT if the buffer index is out of range, otherwise the result
 * of __io_import_fixed().
 */
static int io_import_fixed(struct io_kiocb *req, int rw, struct iov_iter *iter)
{
	struct io_mapped_ubuf *imu = req->imu;
	struct io_ring_ctx *ctx = req->ctx;
	u16 buf_index = req->buf_index;
	u16 index;

	/* a mapping is already cached on the request */
	if (unlikely(imu))
		return __io_import_fixed(req, rw, iter, imu);

	if (unlikely(buf_index >= ctx->nr_user_bufs))
		return -EFAULT;

	index = array_index_nospec(buf_index, ctx->nr_user_bufs);
	imu = READ_ONCE(ctx->user_bufs[index]);
	req->imu = imu;

	return __io_import_fixed(req, rw, iter, imu);
}
/*
 * Release ctx->uring_lock, but only when @needs_lock is set. Counterpart of
 * io_ring_submit_lock().
 */
static void io_ring_submit_unlock(struct io_ring_ctx *ctx, bool needs_lock)
{
	if (!needs_lock)
		return;

	mutex_unlock(&ctx->uring_lock);
}
/*
 * Take ctx->uring_lock, but only when @needs_lock is set.
 *
 * "Normal" inline submissions always hold the uring_lock, since we
 * grab it from the system call. Same is true for the SQPOLL offload.
 * The only exception is when we've detached the request and issue it
 * from an async worker thread, grab the lock for that case.
 */
static void io_ring_submit_lock(struct io_ring_ctx *ctx, bool needs_lock)
{
	if (!needs_lock)
		return;

	mutex_lock(&ctx->uring_lock);
}
/*
 * Pick a buffer for @req from the buffer group @bgid.
 *
 * If the request already carries REQ_F_BUFFER_SELECTED, @kbuf is returned
 * untouched. Otherwise the group is looked up in ctx->io_buffers: the last
 * entry on the group's list is removed and returned, or, when the list is
 * empty, the group head itself is returned and the group is erased. *@len is
 * clamped to the length of the chosen buffer.
 *
 * The uring_lock is taken around the lookup when @needs_lock is set.
 *
 * Returns the selected buffer, or ERR_PTR(-ENOBUFS) if the group does not
 * exist.
 */
static struct io_buffer *io_buffer_select(struct io_kiocb *req, size_t *len,
					  int bgid, struct io_buffer *kbuf,
					  bool needs_lock)
{
	struct io_buffer *head;

	if (req->flags & REQ_F_BUFFER_SELECTED)
		return kbuf;

	io_ring_submit_lock(req->ctx, needs_lock);
	lockdep_assert_held(&req->ctx->uring_lock);

	head = xa_load(&req->ctx->io_buffers, bgid);
	if (!head) {
		kbuf = ERR_PTR(-ENOBUFS);
		goto out_unlock;
	}

	if (list_empty(&head->list)) {
		/* the head is the only buffer left, so the group goes away */
		kbuf = head;
		xa_erase(&req->ctx->io_buffers, bgid);
	} else {
		kbuf = list_last_entry(&head->list, struct io_buffer, list);
		list_del(&kbuf->list);
	}

	if (*len > kbuf->len)
		*len = kbuf->len;

out_unlock:
	io_ring_submit_unlock(req->ctx, needs_lock);
	return kbuf;
}
/*
 * Select a provided buffer for a read/write request.
 *
 * req->rw.addr doubles as storage for the selected struct io_buffer pointer,
 * and req->buf_index names the buffer group. On success the chosen buffer is
 * stored back into req->rw.addr, REQ_F_BUFFER_SELECTED is set, and the
 * buffer's user address is returned. On failure the error pointer from
 * io_buffer_select() is returned as is.
 */
static void __user *io_rw_buffer_select(struct io_kiocb *req, size_t *len,
					bool needs_lock)
{
	struct io_buffer *kbuf = (struct io_buffer *) (unsigned long) req->rw.addr;
	u16 bgid = req->buf_index;

	kbuf = io_buffer_select(req, len, bgid, kbuf, needs_lock);
	if (!IS_ERR(kbuf)) {
		req->rw.addr = (u64) (unsigned long) kbuf;
		req->flags |= REQ_F_BUFFER_SELECTED;
		return u64_to_user_ptr(kbuf->addr);
	}

	return kbuf;
}
#ifdef CONFIG_COMPAT
/*
 * Compat flavour of provided-buffer selection for a single iovec.
 *
 * Reads only iov_len from the compat iovec at req->rw.addr, selects a
 * provided buffer no larger than that, and fills @iov[0] with the buffer's
 * user address and the resulting length.
 *
 * Returns 0 on success, -EFAULT if the user iovec cannot be read, -EINVAL
 * for a negative length, or the error from buffer selection.
 */
static ssize_t io_compat_import(struct io_kiocb *req, struct iovec *iov,
				bool needs_lock)
{
	struct compat_iovec __user *uiov = u64_to_user_ptr(req->rw.addr);
	compat_ssize_t clen;
	void __user *buf;
	ssize_t len;

	if (!access_ok(uiov, sizeof(*uiov)) ||
	    __get_user(clen, &uiov->iov_len))
		return -EFAULT;
	if (clen < 0)
		return -EINVAL;

	len = clen;
	buf = io_rw_buffer_select(req, &len, needs_lock);
	if (IS_ERR(buf))
		return PTR_ERR(buf);

	iov->iov_base = buf;
	iov->iov_len = (compat_size_t) len;
	return 0;
}
#endif
/*
 * Native flavour of provided-buffer selection for a single iovec.
 *
 * Copies the user iovec at req->rw.addr into @iov[0], uses its iov_len as
 * the upper bound for buffer selection, then overwrites @iov[0] with the
 * selected buffer's user address and the resulting length.
 *
 * Returns 0 on success, -EFAULT if the user iovec cannot be copied, -EINVAL
 * if the length is negative when viewed as ssize_t, or the error from buffer
 * selection.
 */
static ssize_t __io_iov_buffer_select(struct io_kiocb *req, struct iovec *iov,
				      bool needs_lock)
{
	struct iovec __user *uiov = u64_to_user_ptr(req->rw.addr);
	void __user *buf;
	ssize_t len;

	if (copy_from_user(iov, uiov, sizeof(*uiov)))
		return -EFAULT;

	len = iov->iov_len;
	if (len < 0)
		return -EINVAL;

	buf = io_rw_buffer_select(req, &len, needs_lock);
	if (IS_ERR(buf))
		return PTR_ERR(buf);

	iov->iov_base = buf;
	iov->iov_len = len;
	return 0;
}
static ssize_t io_iov_buffer_select(struct io_kiocb *req, struct iovec *iov,
bool needs_lock)
{
if (req->flags & REQ_F_BUFFER_SELECTED) {
struct io_buffer *kbuf;
kbuf = (struct io_buffer *) (unsigned long) req->rw.addr;
iov[0].iov_base = u64_to_user_ptr(kbuf->addr);
iov[0].iov_len = kbuf->len;
return 0;
}
if (req->rw.len != 1)
return -EINVAL;
#ifdef CONFIG_COMPAT
if (req->ctx->compat)
return io_compat_import(req, iov, needs_lock);
#endif