netfs: Add some TCP receive queue helpers Add some helpers to splice buffers from a TCP receive queue to a private queue from where they can be processed without the need to hold the socket lock. This is particularly significant if a large amount of data is being copied, say for a read-data RPC call. Signed-off-by: David Howells <dhowells@redhat.com> cc: Steve French <sfrench@samba.org> cc: Paulo Alcantara <pc@manguebit.org> cc: Shyam Prasad N <sprasad@microsoft.com> cc: Tom Talpey <tom@talpey.com> cc: linux-cifs@vger.kernel.org cc: netfs@lists.linux.dev cc: linux-fsdevel@vger.kernel.org
diff --git a/fs/netfs/Makefile b/fs/netfs/Makefile index 9cfc3cc..66e6426 100644 --- a/fs/netfs/Makefile +++ b/fs/netfs/Makefile
@@ -20,7 +20,10 @@ netfs-$(CONFIG_NETFS_PGPRIV2) += read_pgpriv2.o netfs-$(CONFIG_NETFS_STATS) += stats.o -netfs-$(CONFIG_INET) += tcp_splice.o + +netfs-$(CONFIG_INET) += \ + rxqueue.o \ + tcp_splice.o netfs-$(CONFIG_FSCACHE) += \ fscache_cache.o \
diff --git a/fs/netfs/rxqueue.c b/fs/netfs/rxqueue.c new file mode 100644 index 0000000..e40c4e2 --- /dev/null +++ b/fs/netfs/rxqueue.c
@@ -0,0 +1,532 @@ +// SPDX-License-Identifier: GPL-2.0-only +/* Receive buffer handling. Move memory copy outside of the socket lock. + * + * Copyright (C) 2026 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + */ + +#include <linux/delay.h> +#include "internal.h" + +/* + * Get a ref on an Rx page. + */ +static void netfs_get_rx_page(struct bio_vec *bv) +{ + get_page(bv->bv_page); +} + +/* + * Drop a ref on an Rx page. + */ +static void netfs_put_rx_page(struct bio_vec *bv) +{ + put_page(bv->bv_page); +} + +/* + * netfs_alloc_rx_bvecq - Allocate a receive buffer. + * nr_bv: Number of slots to allocate + * + * Allocate a receive buffer with the specified number of slots. This function + * will not fail, though it has no upper bound on the time taken to succeed. + * + * Return: The new buffer. + */ +struct bvecq *netfs_alloc_rx_bvecq(unsigned int nr_bv) +{ + struct bvecq *bq; + + for (;;) { + bq = bvecq_alloc_one(nr_bv, GFP_NOFS); + if (bq) + return bq; + msleep(50); + } +} +EXPORT_SYMBOL(netfs_alloc_rx_bvecq); + +/** + * netfs_put_rx_bvecq - Put a receive buffer chain + * @bq: The head buffer to put + * + * Put a ref on the first buffer segment in a bvecq chain and if that reaches + * zero, destroy it, move on to the next buffer and repeat the put. + */ +void netfs_put_rx_bvecq(struct bvecq *bq) +{ + struct bvecq *next; + + for (; bq; bq = next) { + if (!refcount_dec_and_test(&bq->ref)) + break; + for (int seg = 0; seg < bq->nr_slots; seg++) + if (bq->bv[seg].bv_page) + netfs_put_rx_page(&bq->bv[seg]); + next = bq->next; + kfree(bq); + } +} +EXPORT_SYMBOL(netfs_put_rx_bvecq); + +/** + * netfs_rxqueue_read_iter - Read data from a queue to an iterator + * @rxq: The receive queue to read + * @dest: The buffer to fill + * @skip: The amount of data in the queue to skip over + * @amount: The amount of data to read + * + * Copy data from the receive queue to an iterator. 
The data doesn't have to + * lie at the beginning of the buffer, but the initial unwanted data can be + * skipped over. + * + * Note that this does not discard any data from the queue. + * + * Return: The amount of data copied. 0 is returned on failure or if @amount + * is 0. + */ +size_t netfs_rxqueue_read_iter(const struct netfs_rxqueue *rxq, + struct iov_iter *dest, size_t skip, size_t amount) +{ + const struct bvecq *from = rxq->take_from; + unsigned int slot = rxq->take_slot; + size_t qsize = rxq->qsize, copied = 0; + + if (WARN(amount > iov_iter_count(dest), + "MSG=%x %zx > %zx", + rxq->msg_id, amount, iov_iter_count(dest))) + amount = iov_iter_count(dest); + + if (skip > rxq->pdu_remain) { + pr_warn("Rx over-skip %zx > %x\n", skip, rxq->pdu_remain); + return 0; + } + if (amount > rxq->pdu_remain - skip) { + pr_warn("Rx over-read %zx+%zx > %x\n", + skip, amount, rxq->pdu_remain); + amount = rxq->pdu_remain - skip; + if (amount == 0) + return 0; + } + if (amount > qsize) + amount = qsize; + + skip += rxq->take_offset; + + while (copied < amount) { + if (slot >= from->nr_slots) { + slot = 0; + from = from->next; + if (!from) + break; + } + + const struct bio_vec *bv = &from->bv[slot]; + size_t blen = bv->bv_len; + + if (skip >= blen) { + skip -= blen; + slot++; + continue; + } + + size_t part = umin(blen - skip, amount - copied), got; + + trace_netfs_rxq_read(rxq, slot, skip, part, copied); + + got = copy_page_to_iter(bv->bv_page, + bv->bv_offset + skip, part, dest); + if (WARN_ON(got != part)) { + bvecq_dump(rxq->take_from); + return 0; + } + + copied += part; + skip = 0; + slot++; + } + + if (amount != copied) + pr_warn("Failed to fully read %zx/%zx %x %pSR\n", + copied, amount, rxq->qsize, __builtin_return_address(0)); + return copied; +} +EXPORT_SYMBOL(netfs_rxqueue_read_iter); + +/** + * netfs_rxqueue_read - Read data from a queue to a flat buffer + * @rxq: The receive queue to read + * @buffer: The buffer to fill + * @skip: The amount of data in the queue 
to skip over + * @amount: The amount of data to read + * + * Copy data from the receive queue to a flat buffer. The data doesn't have to + * lie at the beginning of the buffer, but the initial unwanted data can be + * skipped over. + * + * Note that this does not discard any data from the queue. + * + * Return: The amount of data copied. 0 is returned on failure or if @amount + * is 0. + */ +size_t netfs_rxqueue_read(const struct netfs_rxqueue *rxq, + void *buffer, size_t skip, size_t amount) +{ + struct iov_iter iter; + struct kvec kv = { .iov_base = buffer, .iov_len = amount }; + + iov_iter_kvec(&iter, ITER_DEST, &kv, 1, amount); + return netfs_rxqueue_read_iter(rxq, &iter, skip, amount); +} +EXPORT_SYMBOL(netfs_rxqueue_read); + +/** + * netfs_rxqueue_discard - Discard data from a queue + * @rxq: The receive queue to modify + * @amount: The amount of data to discard + * + * Discard the specified amount of data from @rxq. This may free pages and + * bvecq segments. In the event that the last bvecq is entirely cleared, it + * will be refurbished and prepared for future refilling. + */ +void netfs_rxqueue_discard(struct netfs_rxqueue *rxq, size_t amount) +{ + struct bvecq *from = rxq->take_from, *dead; + unsigned int offset = rxq->take_offset; + unsigned int slot = rxq->take_slot; + size_t qsize = rxq->qsize; + + if (amount > rxq->pdu_remain) { + pr_warn("Rx over discard %zx > %x\n", amount, rxq->pdu_remain); + amount = rxq->pdu_remain; + } + + rxq->pdu_remain -= amount; + + for (;;) { + if (slot >= from->nr_slots) { + slot = 0; + offset = 0; + if (!from->next) { + /* Refurbish the final bvecq. add_to is NULL + * for a queue excerpt. 
+ */ + if (!rxq->add_to) { + netfs_put_rx_bvecq(from); + from = NULL; + break; + } + + WARN_ON_ONCE(from != rxq->add_to); + from->nr_slots = 0; + rxq->add_to = from; + break; + } + dead = from; + from = dead->next; + from->prev = NULL; + dead->next = NULL; + netfs_put_rx_bvecq(dead); + } + + if (!amount) + break; + + struct bio_vec *bv = &from->bv[slot]; + + if (offset < bv->bv_len) { + size_t part = umin(umin(bv->bv_len - offset, amount), qsize); + offset += part; + amount -= part; + qsize -= part; + if (offset < bv->bv_len) + break; + } + + if (!WARN_ON_ONCE(!bv->bv_page)) + netfs_put_rx_page(bv); + bv->bv_page = NULL; + bv->bv_offset = 0; + bv->bv_len = 0; + slot++; + offset = 0; + } + + if (amount > 0) + pr_warn("Failed to fully discard %zx %zx %pSR\n", + amount, qsize, __builtin_return_address(0)); + + rxq->take_from = from; + rxq->take_slot = slot; + rxq->take_offset = offset; + rxq->qsize = qsize; +} +EXPORT_SYMBOL(netfs_rxqueue_discard); + +/** + * netfs_rxqueue_count - Count the number of segs holding the specified data + * @rxq: The receive queue to assess + * @amount: The amount of data desired + * + * Count the number of segments in the receive queue that the requested amount + * of data spans. + * + * Return: Segment count; 0 is returned if there is insufficient data.
+ */ +unsigned int netfs_rxqueue_count(const struct netfs_rxqueue *rxq, size_t amount) +{ + const struct bvecq *from = rxq->take_from; + unsigned int offset = rxq->take_offset; + unsigned int count = 0; + unsigned int slot = rxq->take_slot; + + while (amount > 0) { + if (slot >= from->nr_slots) { + if (WARN_ON_ONCE(!from->next)) + return 0; + from = from->next; + slot = 0; + offset = 0; + } + + const struct bio_vec *bv = &from->bv[slot]; + + if (offset < bv->bv_len) { + count++; + if (bv->bv_len - offset >= amount) + return count; + amount -= bv->bv_len - offset; + } + + slot++; + offset = 0; + } + + return count; +} +EXPORT_SYMBOL(netfs_rxqueue_count); + +/* + * Append (part of) a segment to a bvec queue and get/transfer a page count. + * If we use up to the end of the source segment, we steal the page ref and + * clear the segment. + */ +static void netfs_bvecq_append_seg(struct bvecq **pdestq, struct bio_vec *sv, + size_t offset, size_t len) +{ + struct bvecq *destq = *pdestq; + struct bio_vec *dv; + + if (bvecq_is_full(destq)) { + destq = destq->next; + *pdestq = destq; + } + dv = &destq->bv[destq->nr_slots++]; + *dv = *sv; + if (offset > 0) { + dv->bv_offset += offset; + dv->bv_len -= offset; + } + if (len < dv->bv_len) { + dv->bv_len = len; + netfs_get_rx_page(dv); + } else { + sv->bv_page = NULL; + sv->bv_offset = 0; + sv->bv_len = 0; + } +} + +/** + * netfs_rxqueue_decant - Decant data segs to a private queue + * @rxq: The receive queue to decant from + * @amount: The amount of data received + * + * Decant data from the receive queue to a private queue. This prevents the + * receive queue keeping the buffers pinned. + * + * Return: A bvecq chain, with ref, holding the decanted data or NULL if out of + * memory. 
+ */ +struct bvecq *netfs_rxqueue_decant(struct netfs_rxqueue *rxq, size_t amount) +{ + struct bvecq *head_bq = NULL, *pbq = NULL, *destq; + struct bvecq *from = rxq->take_from; + unsigned int need_segs; + unsigned int offset = rxq->take_offset; + unsigned int slot = rxq->take_slot; + size_t qsize = rxq->qsize; + + if (amount > rxq->pdu_remain) { + pr_warn("Rx over decant %zx > %x\n", + amount, rxq->pdu_remain); + amount = rxq->pdu_remain; + } + + /* Count the number of segments in the queue for this PDU and then + * allocate sufficient bvecq capacity to hold the whole PDU. + */ + need_segs = netfs_rxqueue_count(rxq, amount); + while (need_segs > 0) { + struct bvecq *b; + unsigned int max_bv = (PAGE_SIZE - sizeof(*b)) / sizeof(b->bv[0]); + unsigned int nr_bv = umin(need_segs, max_bv); + + need_segs -= nr_bv; + b = netfs_alloc_rx_bvecq(nr_bv); + if (!b) + goto nomem; + b->prev = pbq; + if (head_bq) + pbq->next = b; + else + head_bq = b; + pbq = b; + } + + rxq->pdu_remain -= amount; + destq = head_bq; + for (;;) { + if (slot >= from->nr_slots) { + struct bvecq *dead; + + slot = 0; + offset = 0; + if (!from->next) { + /* Refurbish the final bvecq. 
*/ + WARN_ON_ONCE(from != rxq->add_to); + from->nr_slots = 0; + rxq->add_to = from; + break; + } + dead = from; + from = dead->next; + from->prev = NULL; + dead->next = NULL; + netfs_put_rx_bvecq(dead); + } + + if (!amount) + break; + + struct bio_vec *bv = &from->bv[slot]; + + if (offset < bv->bv_len) { + size_t part = umin(umin(bv->bv_len - offset, amount), qsize); + netfs_bvecq_append_seg(&destq, bv, offset, part); + offset += part; + amount -= part; + qsize -= part; + if (offset < bv->bv_len) + break; + } + + if (WARN_ON_ONCE(bv->bv_page)) { + netfs_put_rx_page(bv); + bv->bv_page = NULL; + bv->bv_offset = 0; + bv->bv_len = 0; + } + slot++; + offset = 0; + } + + rxq->take_from = from; + rxq->take_slot = slot; + rxq->take_offset = offset; + rxq->qsize = qsize; + return head_bq; + +nomem: + netfs_put_rx_bvecq(head_bq); + return NULL; +} +EXPORT_SYMBOL(netfs_rxqueue_decant); + +/** + * netfs_rxqueue_tcp_refill - Refill receive queue by TCP splice + * @tcp_sock: The TCP socket to splice data from + * @rxq: The Rx queue to splice into + * @min_size: The amount of data we're interested in + * + * Refill the receive queue by splicing network receive buffer segments from a + * TCP socket, but don't wait for data. The caller must do any waiting + * required. + * + * Note that whilst the peer may send PDUs in separate TCP packets, it's + * possible that the local NIC may join them back together if doing receive + * offload. 
+ */ +int netfs_rxqueue_tcp_refill(struct socket *tcp_sock, struct netfs_rxqueue *rxq, + size_t min_size) +{ + struct bvecq *add_to = rxq->add_to; + size_t qsize = rxq->qsize; + int rc = 0; + + if (!rxq->refillable) { + WARN_ON(min_size > qsize); + return 0; + } + if (qsize >= min_size && min_size > 0) + return 0; + + do { + if (!add_to || bvecq_is_full(add_to)) { + struct bvecq *b; + unsigned int nr_bv = (2048 - sizeof(*add_to)) / sizeof(add_to->bv[0]); + + b = netfs_alloc_rx_bvecq(nr_bv); + b->prev = add_to; + if (!add_to) + rxq->take_from = b; + else + add_to->next = b; + add_to = b; + } + + rc = netfs_tcp_splice_to_bvecq(tcp_sock, add_to, INT_MAX); + //trace_smb3_tcp_splice(rc); + if (rc < 0) + break; + + qsize += rc; + rc = 0; + } while (qsize < min_size); + + rxq->add_to = add_to; + rxq->qsize = qsize; + return rc; +} +EXPORT_SYMBOL(netfs_rxqueue_tcp_refill); + +/** + * netfs_rxqueue_tcp_consume - Receive and discard data from a receive queue + * @tcp_sock: The TCP socket that's the data source + * @rxq: The Rx queue to discard from + * @amount: The amount of data to discard + * + * Consume received data by receiving it if it's not already queued and then + * discarding it. This function does no waiting, so the caller must do the + * waiting and repeatedly call it until the desired amount of data is consumed. + */ +int netfs_rxqueue_tcp_consume(struct socket *tcp_sock, struct netfs_rxqueue *rxq, + size_t amount) +{ + while (amount) { + size_t part = umin(amount, rxq->qsize); + int rc; + + amount -= part; + netfs_rxqueue_discard(rxq, part); + + if (!amount) + break; + + rc = netfs_rxqueue_tcp_refill(tcp_sock, rxq, 1); + if (rc < 0) + return rc; + } + return 0; +} +EXPORT_SYMBOL(netfs_rxqueue_tcp_consume);
diff --git a/include/linux/netfs.h b/include/linux/netfs.h index cdcb6ff..5a81ff6 100644 --- a/include/linux/netfs.h +++ b/include/linux/netfs.h
@@ -422,6 +422,22 @@ struct netfs_cache_ops { enum netfs_cache_collect block_type); }; +/* + * (An excerpt from) a receive queue. Data buffers can be spliced out of, say, + * a TCP socket into a sequence of these and then a variety of helpers used + * to manipulate them and extract data. + */ +struct netfs_rxqueue { + struct bvecq *add_to; /* Where to add data to the Rx queue */ + struct bvecq *take_from; /* Where to take data from the Rx queue */ + unsigned int take_offset; /* Current offset in rx_take_slot */ + unsigned short take_slot; /* Current slot in rx_take_from */ + bool refillable; /* T if refillable; F if excerpt */ + unsigned int qsize; /* Amount of data in rx_queue */ + unsigned int pdu_remain; /* Amount of current PDU left */ + unsigned int msg_id; /* ID to log in tracepoints as MSG=xx */ +}; + /* High-level read API. */ ssize_t netfs_unbuffered_read_iter_locked(struct kiocb *iocb, struct iov_iter *iter); ssize_t netfs_unbuffered_read_iter(struct kiocb *iocb, struct iov_iter *iter); @@ -483,6 +499,21 @@ void netfs_end_io_write(struct inode *inode); int netfs_start_io_direct(struct inode *inode); void netfs_end_io_direct(struct inode *inode); +/* Receive queue API. */ +struct bvecq *netfs_alloc_rx_bvecq(unsigned int nr_bv); +void netfs_put_rx_bvecq(struct bvecq *bq); +size_t netfs_rxqueue_read_iter(const struct netfs_rxqueue *rxq, + struct iov_iter *dest, size_t skip, size_t amount); +size_t netfs_rxqueue_read(const struct netfs_rxqueue *rxq, + void *buffer, size_t skip, size_t amount); +void netfs_rxqueue_discard(struct netfs_rxqueue *rxq, size_t amount); +unsigned int netfs_rxqueue_count(const struct netfs_rxqueue *rxq, size_t amount); +struct bvecq *netfs_rxqueue_decant(struct netfs_rxqueue *rxq, size_t amount); +int netfs_rxqueue_tcp_refill(struct socket *tcp_sock, struct netfs_rxqueue *rxq, + size_t min_size); +int netfs_rxqueue_tcp_consume(struct socket *tcp_sock, struct netfs_rxqueue *rxq, + size_t amount); + /* TCP transport helper API. 
*/ #ifdef CONFIG_INET ssize_t netfs_tcp_splice_to_bvecq(struct socket *sock, struct bvecq *bvecq, size_t len);
diff --git a/include/trace/events/netfs.h b/include/trace/events/netfs.h index fbd0003..96a2c1f 100644 --- a/include/trace/events/netfs.h +++ b/include/trace/events/netfs.h
@@ -855,6 +855,34 @@ TRACE_EVENT(netfs_bv_slot, __entry->pfn, __entry->offset, __entry->offset + __entry->len) ); +TRACE_EVENT(netfs_rxq_read, + TP_PROTO(const struct netfs_rxqueue *rxq, unsigned int slot, + size_t skip, size_t part, size_t copied), + + TP_ARGS(rxq, slot, skip, part, copied), + + TP_STRUCT__entry( + __field(unsigned int, msg_id) + __field(unsigned int, slot) + __field(unsigned int, skip) + __field(unsigned int, part) + __field(unsigned int, copied) + ), + + TP_fast_assign( + __entry->msg_id = rxq->msg_id; + __entry->slot = slot; + __entry->skip = skip; + __entry->part = part; + __entry->copied = copied; + ), + + TP_printk("MSG=%08x [%02x] b=%04x-%04x c=%04x", + __entry->msg_id, __entry->slot, + __entry->skip, __entry->skip + __entry->part, + __entry->copied) + ); + #undef EM #undef E_ #endif /* _TRACE_NETFS_H */