Commit 51c78a4d authored by Jakub Kicinski's avatar Jakub Kicinski

Merge branch 'splice-net-replace-sendpage-with-sendmsg-msg_splice_pages-part-1'

David Howells says:

====================
splice, net: Replace sendpage with sendmsg(MSG_SPLICE_PAGES), part 1

Here's the first tranche of patches towards providing a MSG_SPLICE_PAGES
internal sendmsg flag that is intended to replace the ->sendpage() op with
calls to sendmsg().  MSG_SPLICE_PAGES is a hint that tells the protocol
that it should splice the pages supplied if it can and copy them if not.

This will allow splice to pass multiple pages in a single call and allow
certain parts of higher protocols (e.g. sunrpc, iwarp) to pass an entire
message in one go rather than having to send them piecemeal.  This should
also make it easier to handle the splicing of multipage folios.

A helper, skb_splice_from_iter() is provided to do the work of splicing or
copying data from an iterator.  If a page is determined to be unspliceable
(such as being in the slab), then the helper will give an error.

Note that this facility is not made available to userspace and does not
provide any sort of callback.

This set consists of the following parts:

 (1) Define the MSG_SPLICE_PAGES flag and prevent sys_sendmsg() from being
     able to set it.

 (2) Add an extra argument to skb_append_pagefrags() so that something
     other than MAX_SKB_FRAGS can be used (sysctl_max_skb_frags for
     example).

 (3) Add the skb_splice_from_iter() helper to handle splicing pages into
     skbuffs for MSG_SPLICE_PAGES that can be shared by TCP, IP/UDP and
     AF_UNIX.

 (4) Implement MSG_SPLICE_PAGES support in TCP.

 (5) Make do_tcp_sendpages() just wrap sendmsg() and then fold it in to its
     various callers.

 (6) Implement MSG_SPLICE_PAGES support in IP and make udp_sendpage() just
     a wrapper around sendmsg().

 (7) Implement MSG_SPLICE_PAGES support in IP6/UDP6.

 (8) Implement MSG_SPLICE_PAGES support in AF_UNIX.

 (9) Make AF_UNIX copy unspliceable pages.

Link: https://lore.kernel.org/r/20230316152618.711970-1-dhowells@redhat.com/ # v1
Link: https://lore.kernel.org/r/20230329141354.516864-1-dhowells@redhat.com/ # v2
Link: https://lore.kernel.org/r/20230331160914.1608208-1-dhowells@redhat.com/ # v3
Link: https://lore.kernel.org/r/20230405165339.3468808-1-dhowells@redhat.com/ # v4
Link: https://lore.kernel.org/r/20230406094245.3633290-1-dhowells@redhat.com/ # v5
Link: https://lore.kernel.org/r/20230411160902.4134381-1-dhowells@redhat.com/ # v6
Link: https://lore.kernel.org/r/20230515093345.396978-1-dhowells@redhat.com/ # v7
Link: https://lore.kernel.org/r/20230518113453.1350757-1-dhowells@redhat.com/ # v8
====================

Link: https://lore.kernel.org/r/20230522121125.2595254-1-dhowells@redhat.comSigned-off-by: default avatarJakub Kicinski <kuba@kernel.org>
parents 57910a47 57d44a35
......@@ -312,7 +312,7 @@ static int siw_tx_ctrl(struct siw_iwarp_tx *c_tx, struct socket *s,
}
/*
* 0copy TCP transmit interface: Use do_tcp_sendpages.
* 0copy TCP transmit interface: Use MSG_SPLICE_PAGES.
*
* Using sendpage to push page by page appears to be less efficient
* than using sendmsg, even if data are copied.
......@@ -323,20 +323,27 @@ static int siw_tx_ctrl(struct siw_iwarp_tx *c_tx, struct socket *s,
static int siw_tcp_sendpages(struct socket *s, struct page **page, int offset,
size_t size)
{
struct bio_vec bvec;
struct msghdr msg = {
.msg_flags = (MSG_MORE | MSG_DONTWAIT | MSG_SENDPAGE_NOTLAST |
MSG_SPLICE_PAGES),
};
struct sock *sk = s->sk;
int i = 0, rv = 0, sent = 0,
flags = MSG_MORE | MSG_DONTWAIT | MSG_SENDPAGE_NOTLAST;
int i = 0, rv = 0, sent = 0;
while (size) {
size_t bytes = min_t(size_t, PAGE_SIZE - offset, size);
if (size + offset <= PAGE_SIZE)
flags = MSG_MORE | MSG_DONTWAIT;
msg.msg_flags &= ~MSG_SENDPAGE_NOTLAST;
tcp_rate_check_app_limited(sk);
bvec_set_page(&bvec, page[i], bytes, offset);
iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, size);
try_page_again:
lock_sock(sk);
rv = do_tcp_sendpages(sk, page[i], offset, bytes, flags);
rv = tcp_sendmsg_locked(sk, &msg, size);
release_sock(sk);
if (rv > 0) {
......
......@@ -1383,7 +1383,7 @@ static inline int skb_pad(struct sk_buff *skb, int pad)
#define dev_kfree_skb(a) consume_skb(a)
int skb_append_pagefrags(struct sk_buff *skb, struct page *page,
int offset, size_t size);
int offset, size_t size, size_t max_frags);
struct skb_seq_state {
__u32 lower_offset;
......@@ -5097,5 +5097,8 @@ static inline void skb_mark_for_recycle(struct sk_buff *skb)
#endif
}
ssize_t skb_splice_from_iter(struct sk_buff *skb, struct iov_iter *iter,
ssize_t maxsize, gfp_t gfp);
#endif /* __KERNEL__ */
#endif /* _LINUX_SKBUFF_H */
......@@ -327,6 +327,7 @@ struct ucred {
*/
#define MSG_ZEROCOPY 0x4000000 /* Use user data in kernel path */
#define MSG_SPLICE_PAGES 0x8000000 /* Splice the pages from the iterator in sendmsg() */
#define MSG_FASTOPEN 0x20000000 /* Send data in TCP SYN */
#define MSG_CMSG_CLOEXEC 0x40000000 /* Set close_on_exec for file
descriptor received through
......@@ -337,6 +338,8 @@ struct ucred {
#define MSG_CMSG_COMPAT 0 /* We never have 32 bit fixups */
#endif
/* Flags to be cleared on entry by sendmsg and sendmmsg syscalls */
#define MSG_INTERNAL_SENDMSG_FLAGS (MSG_SPLICE_PAGES)
/* Setsockoptions(2) level. Thanks to BSD these must match IPPROTO_xxx */
#define SOL_IP 0
......
......@@ -220,8 +220,6 @@ int ip_append_data(struct sock *sk, struct flowi4 *fl4,
unsigned int flags);
int ip_generic_getfrag(void *from, char *to, int offset, int len, int odd,
struct sk_buff *skb);
ssize_t ip_append_page(struct sock *sk, struct flowi4 *fl4, struct page *page,
int offset, size_t size, int flags);
struct sk_buff *__ip_make_skb(struct sock *sk, struct flowi4 *fl4,
struct sk_buff_head *queue,
struct inet_cork *cork);
......
......@@ -333,8 +333,6 @@ int tcp_sendpage(struct sock *sk, struct page *page, int offset, size_t size,
int flags);
int tcp_sendpage_locked(struct sock *sk, struct page *page, int offset,
size_t size, int flags);
ssize_t do_tcp_sendpages(struct sock *sk, struct page *page, int offset,
size_t size, int flags);
int tcp_send_mss(struct sock *sk, int *size_goal, int flags);
void tcp_push(struct sock *sk, int flags, int mss_now, int nonagle,
int size_goal);
......
......@@ -258,7 +258,7 @@ struct tls_context {
struct scatterlist *partially_sent_record;
u16 partially_sent_offset;
bool in_tcp_sendpages;
bool splicing_pages;
bool pending_open_record_frags;
struct mutex tx_lock; /* protects partially_sent_* fields and
......
......@@ -389,6 +389,7 @@ int io_send(struct io_kiocb *req, unsigned int issue_flags)
if (flags & MSG_WAITALL)
min_ret = iov_iter_count(&msg.msg_iter);
flags &= ~MSG_INTERNAL_SENDMSG_FLAGS;
msg.msg_flags = flags;
ret = sock_sendmsg(sock, &msg);
if (ret < min_ret) {
......@@ -1136,6 +1137,7 @@ int io_send_zc(struct io_kiocb *req, unsigned int issue_flags)
msg_flags |= MSG_DONTWAIT;
if (msg_flags & MSG_WAITALL)
min_ret = iov_iter_count(&msg.msg_iter);
msg_flags &= ~MSG_INTERNAL_SENDMSG_FLAGS;
msg.msg_flags = msg_flags;
msg.msg_ubuf = &io_notif_to_data(zc->notif)->uarg;
......
......@@ -4188,13 +4188,13 @@ unsigned int skb_find_text(struct sk_buff *skb, unsigned int from,
EXPORT_SYMBOL(skb_find_text);
int skb_append_pagefrags(struct sk_buff *skb, struct page *page,
int offset, size_t size)
int offset, size_t size, size_t max_frags)
{
int i = skb_shinfo(skb)->nr_frags;
if (skb_can_coalesce(skb, i, page, offset)) {
skb_frag_size_add(&skb_shinfo(skb)->frags[i - 1], size);
} else if (i < MAX_SKB_FRAGS) {
} else if (i < max_frags) {
skb_zcopy_downgrade_managed(skb);
get_page(page);
skb_fill_page_desc_noacc(skb, i, page, offset, size);
......@@ -6892,3 +6892,91 @@ nodefer: __kfree_skb(skb);
if (unlikely(kick) && !cmpxchg(&sd->defer_ipi_scheduled, 0, 1))
smp_call_function_single_async(cpu, &sd->defer_csd);
}
static void skb_splice_csum_page(struct sk_buff *skb, struct page *page,
size_t offset, size_t len)
{
const char *kaddr;
__wsum csum;
kaddr = kmap_local_page(page);
csum = csum_partial(kaddr + offset, len, 0);
kunmap_local(kaddr);
skb->csum = csum_block_add(skb->csum, csum, skb->len);
}
/**
* skb_splice_from_iter - Splice (or copy) pages to skbuff
* @skb: The buffer to add pages to
* @iter: Iterator representing the pages to be added
* @maxsize: Maximum amount of pages to be added
* @gfp: Allocation flags
*
* This is a common helper function for supporting MSG_SPLICE_PAGES. It
* extracts pages from an iterator and adds them to the socket buffer if
* possible, copying them to fragments if not possible (such as if they're slab
* pages).
*
* Returns the amount of data spliced/copied or -EMSGSIZE if there's
* insufficient space in the buffer to transfer anything.
*/
ssize_t skb_splice_from_iter(struct sk_buff *skb, struct iov_iter *iter,
ssize_t maxsize, gfp_t gfp)
{
size_t frag_limit = READ_ONCE(sysctl_max_skb_frags);
struct page *pages[8], **ppages = pages;
ssize_t spliced = 0, ret = 0;
unsigned int i;
while (iter->count > 0) {
ssize_t space, nr;
size_t off, len;
ret = -EMSGSIZE;
space = frag_limit - skb_shinfo(skb)->nr_frags;
if (space < 0)
break;
/* We might be able to coalesce without increasing nr_frags */
nr = clamp_t(size_t, space, 1, ARRAY_SIZE(pages));
len = iov_iter_extract_pages(iter, &ppages, maxsize, nr, 0, &off);
if (len <= 0) {
ret = len ?: -EIO;
break;
}
i = 0;
do {
struct page *page = pages[i++];
size_t part = min_t(size_t, PAGE_SIZE - off, len);
ret = -EIO;
if (WARN_ON_ONCE(!sendpage_ok(page)))
goto out;
ret = skb_append_pagefrags(skb, page, off, part,
frag_limit);
if (ret < 0) {
iov_iter_revert(iter, len);
goto out;
}
if (skb->ip_summed == CHECKSUM_NONE)
skb_splice_csum_page(skb, page, off, part);
off = 0;
spliced += part;
maxsize -= part;
len -= part;
} while (len > 0);
if (maxsize <= 0)
break;
}
out:
skb_len_add(skb, spliced);
return spliced ?: ret;
}
EXPORT_SYMBOL(skb_splice_from_iter);
......@@ -946,17 +946,6 @@ ip_generic_getfrag(void *from, char *to, int offset, int len, int odd, struct sk
}
EXPORT_SYMBOL(ip_generic_getfrag);
static inline __wsum
csum_page(struct page *page, int offset, int copy)
{
char *kaddr;
__wsum csum;
kaddr = kmap(page);
csum = csum_partial(kaddr + offset, copy, 0);
kunmap(page);
return csum;
}
static int __ip_append_data(struct sock *sk,
struct flowi4 *fl4,
struct sk_buff_head *queue,
......@@ -1048,6 +1037,14 @@ static int __ip_append_data(struct sock *sk,
skb_zcopy_set(skb, uarg, &extra_uref);
}
}
} else if ((flags & MSG_SPLICE_PAGES) && length) {
if (inet->hdrincl)
return -EPERM;
if (rt->dst.dev->features & NETIF_F_SG)
/* We need an empty buffer to attach stuff to */
paged = true;
else
flags &= ~MSG_SPLICE_PAGES;
}
cork->length += length;
......@@ -1207,6 +1204,15 @@ static int __ip_append_data(struct sock *sk,
err = -EFAULT;
goto error;
}
} else if (flags & MSG_SPLICE_PAGES) {
struct msghdr *msg = from;
err = skb_splice_from_iter(skb, &msg->msg_iter, copy,
sk->sk_allocation);
if (err < 0)
goto error;
copy = err;
wmem_alloc_delta += copy;
} else if (!zc) {
int i = skb_shinfo(skb)->nr_frags;
......@@ -1310,10 +1316,10 @@ static int ip_setup_cork(struct sock *sk, struct inet_cork *cork,
}
/*
* ip_append_data() and ip_append_page() can make one large IP datagram
* from many pieces of data. Each pieces will be holded on the socket
* until ip_push_pending_frames() is called. Each piece can be a page
* or non-page data.
* ip_append_data() can make one large IP datagram from many pieces of
* data. Each piece will be held on the socket until
* ip_push_pending_frames() is called. Each piece can be a page or
* non-page data.
*
* Not only UDP, other transport protocols - e.g. raw sockets - can use
* this interface potentially.
......@@ -1346,134 +1352,6 @@ int ip_append_data(struct sock *sk, struct flowi4 *fl4,
from, length, transhdrlen, flags);
}
ssize_t ip_append_page(struct sock *sk, struct flowi4 *fl4, struct page *page,
int offset, size_t size, int flags)
{
struct inet_sock *inet = inet_sk(sk);
struct sk_buff *skb;
struct rtable *rt;
struct ip_options *opt = NULL;
struct inet_cork *cork;
int hh_len;
int mtu;
int len;
int err;
unsigned int maxfraglen, fragheaderlen, fraggap, maxnonfragsize;
if (inet->hdrincl)
return -EPERM;
if (flags&MSG_PROBE)
return 0;
if (skb_queue_empty(&sk->sk_write_queue))
return -EINVAL;
cork = &inet->cork.base;
rt = (struct rtable *)cork->dst;
if (cork->flags & IPCORK_OPT)
opt = cork->opt;
if (!(rt->dst.dev->features & NETIF_F_SG))
return -EOPNOTSUPP;
hh_len = LL_RESERVED_SPACE(rt->dst.dev);
mtu = cork->gso_size ? IP_MAX_MTU : cork->fragsize;
fragheaderlen = sizeof(struct iphdr) + (opt ? opt->optlen : 0);
maxfraglen = ((mtu - fragheaderlen) & ~7) + fragheaderlen;
maxnonfragsize = ip_sk_ignore_df(sk) ? 0xFFFF : mtu;
if (cork->length + size > maxnonfragsize - fragheaderlen) {
ip_local_error(sk, EMSGSIZE, fl4->daddr, inet->inet_dport,
mtu - (opt ? opt->optlen : 0));
return -EMSGSIZE;
}
skb = skb_peek_tail(&sk->sk_write_queue);
if (!skb)
return -EINVAL;
cork->length += size;
while (size > 0) {
/* Check if the remaining data fits into current packet. */
len = mtu - skb->len;
if (len < size)
len = maxfraglen - skb->len;
if (len <= 0) {
struct sk_buff *skb_prev;
int alloclen;
skb_prev = skb;
fraggap = skb_prev->len - maxfraglen;
alloclen = fragheaderlen + hh_len + fraggap + 15;
skb = sock_wmalloc(sk, alloclen, 1, sk->sk_allocation);
if (unlikely(!skb)) {
err = -ENOBUFS;
goto error;
}
/*
* Fill in the control structures
*/
skb->ip_summed = CHECKSUM_NONE;
skb->csum = 0;
skb_reserve(skb, hh_len);
/*
* Find where to start putting bytes.
*/
skb_put(skb, fragheaderlen + fraggap);
skb_reset_network_header(skb);
skb->transport_header = (skb->network_header +
fragheaderlen);
if (fraggap) {
skb->csum = skb_copy_and_csum_bits(skb_prev,
maxfraglen,
skb_transport_header(skb),
fraggap);
skb_prev->csum = csum_sub(skb_prev->csum,
skb->csum);
pskb_trim_unique(skb_prev, maxfraglen);
}
/*
* Put the packet on the pending queue.
*/
__skb_queue_tail(&sk->sk_write_queue, skb);
continue;
}
if (len > size)
len = size;
if (skb_append_pagefrags(skb, page, offset, len)) {
err = -EMSGSIZE;
goto error;
}
if (skb->ip_summed == CHECKSUM_NONE) {
__wsum csum;
csum = csum_page(page, offset, len);
skb->csum = csum_block_add(skb->csum, csum, skb->len);
}
skb_len_add(skb, len);
refcount_add(len, &sk->sk_wmem_alloc);
offset += len;
size -= len;
}
return 0;
error:
cork->length -= size;
IP_INC_STATS(sock_net(sk), IPSTATS_MIB_OUTDISCARDS);
return err;
}
static void ip_cork_release(struct inet_cork *cork)
{
cork->flags &= ~IPCORK_OPT;
......
......@@ -974,175 +974,24 @@ static int tcp_wmem_schedule(struct sock *sk, int copy)
return min(copy, sk->sk_forward_alloc);
}
static struct sk_buff *tcp_build_frag(struct sock *sk, int size_goal, int flags,
struct page *page, int offset, size_t *size)
{
struct sk_buff *skb = tcp_write_queue_tail(sk);
struct tcp_sock *tp = tcp_sk(sk);
bool can_coalesce;
int copy, i;
if (!skb || (copy = size_goal - skb->len) <= 0 ||
!tcp_skb_can_collapse_to(skb)) {
new_segment:
if (!sk_stream_memory_free(sk))
return NULL;
skb = tcp_stream_alloc_skb(sk, 0, sk->sk_allocation,
tcp_rtx_and_write_queues_empty(sk));
if (!skb)
return NULL;
#ifdef CONFIG_TLS_DEVICE
skb->decrypted = !!(flags & MSG_SENDPAGE_DECRYPTED);
#endif
tcp_skb_entail(sk, skb);
copy = size_goal;
}
if (copy > *size)
copy = *size;
i = skb_shinfo(skb)->nr_frags;
can_coalesce = skb_can_coalesce(skb, i, page, offset);
if (!can_coalesce && i >= READ_ONCE(sysctl_max_skb_frags)) {
tcp_mark_push(tp, skb);
goto new_segment;
}
if (tcp_downgrade_zcopy_pure(sk, skb))
return NULL;
copy = tcp_wmem_schedule(sk, copy);
if (!copy)
return NULL;
if (can_coalesce) {
skb_frag_size_add(&skb_shinfo(skb)->frags[i - 1], copy);
} else {
get_page(page);
skb_fill_page_desc_noacc(skb, i, page, offset, copy);
}
if (!(flags & MSG_NO_SHARED_FRAGS))
skb_shinfo(skb)->flags |= SKBFL_SHARED_FRAG;
skb->len += copy;
skb->data_len += copy;
skb->truesize += copy;
sk_wmem_queued_add(sk, copy);
sk_mem_charge(sk, copy);
WRITE_ONCE(tp->write_seq, tp->write_seq + copy);
TCP_SKB_CB(skb)->end_seq += copy;
tcp_skb_pcount_set(skb, 0);
*size = copy;
return skb;
}
ssize_t do_tcp_sendpages(struct sock *sk, struct page *page, int offset,
size_t size, int flags)
{
struct tcp_sock *tp = tcp_sk(sk);
int mss_now, size_goal;
int err;
ssize_t copied;
long timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
if (IS_ENABLED(CONFIG_DEBUG_VM) &&
WARN_ONCE(!sendpage_ok(page),
"page must not be a Slab one and have page_count > 0"))
return -EINVAL;
/* Wait for a connection to finish. One exception is TCP Fast Open
* (passive side) where data is allowed to be sent before a connection
* is fully established.
*/
if (((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT)) &&
!tcp_passive_fastopen(sk)) {
err = sk_stream_wait_connect(sk, &timeo);
if (err != 0)
goto out_err;
}
sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
mss_now = tcp_send_mss(sk, &size_goal, flags);
copied = 0;
err = -EPIPE;
if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN))
goto out_err;
while (size > 0) {
struct sk_buff *skb;
size_t copy = size;
skb = tcp_build_frag(sk, size_goal, flags, page, offset, &copy);
if (!skb)
goto wait_for_space;
if (!copied)
TCP_SKB_CB(skb)->tcp_flags &= ~TCPHDR_PSH;
copied += copy;
offset += copy;
size -= copy;
if (!size)
goto out;
if (skb->len < size_goal || (flags & MSG_OOB))
continue;
if (forced_push(tp)) {
tcp_mark_push(tp, skb);
__tcp_push_pending_frames(sk, mss_now, TCP_NAGLE_PUSH);
} else if (skb == tcp_send_head(sk))
tcp_push_one(sk, mss_now);
continue;
wait_for_space:
set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
tcp_push(sk, flags & ~MSG_MORE, mss_now,
TCP_NAGLE_PUSH, size_goal);
err = sk_stream_wait_memory(sk, &timeo);
if (err != 0)
goto do_error;
mss_now = tcp_send_mss(sk, &size_goal, flags);
}
out:
if (copied) {
tcp_tx_timestamp(sk, sk->sk_tsflags);
if (!(flags & MSG_SENDPAGE_NOTLAST))
tcp_push(sk, flags, mss_now, tp->nonagle, size_goal);
}
return copied;
do_error:
tcp_remove_empty_skb(sk);
if (copied)
goto out;
out_err:
/* make sure we wake any epoll edge trigger waiter */
if (unlikely(tcp_rtx_and_write_queues_empty(sk) && err == -EAGAIN)) {
sk->sk_write_space(sk);
tcp_chrono_stop(sk, TCP_CHRONO_SNDBUF_LIMITED);
}
return sk_stream_error(sk, flags, err);
}
EXPORT_SYMBOL_GPL(do_tcp_sendpages);
int tcp_sendpage_locked(struct sock *sk, struct page *page, int offset,
size_t size, int flags)
{
struct bio_vec bvec;
struct msghdr msg = { .msg_flags = flags | MSG_SPLICE_PAGES, };
if (!(sk->sk_route_caps & NETIF_F_SG))
return sock_no_sendpage_locked(sk, page, offset, size, flags);
tcp_rate_check_app_limited(sk); /* is sending application-limited? */
return do_tcp_sendpages(sk, page, offset, size, flags);
bvec_set_page(&bvec, page, size, offset);
iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, size);
if (flags & MSG_SENDPAGE_NOTLAST)
msg.msg_flags |= MSG_MORE;
return tcp_sendmsg_locked(sk, &msg, size);
}
EXPORT_SYMBOL_GPL(tcp_sendpage_locked);
......@@ -1223,7 +1072,7 @@ int tcp_sendmsg_locked(struct sock *sk, struct msghdr *msg, size_t size)
int flags, err, copied = 0;
int mss_now = 0, size_goal, copied_syn = 0;
int process_backlog = 0;
bool zc = false;
int zc = 0;
long timeo;
flags = msg->msg_flags;
......@@ -1231,7 +1080,8 @@ int tcp_sendmsg_locked(struct sock *sk, struct msghdr *msg, size_t size)
if ((flags & MSG_ZEROCOPY) && size) {
if (msg->msg_ubuf) {
uarg = msg->msg_ubuf;
zc = sk->sk_route_caps & NETIF_F_SG;
if (sk->sk_route_caps & NETIF_F_SG)
zc = MSG_ZEROCOPY;
} else if (sock_flag(sk, SOCK_ZEROCOPY)) {
skb = tcp_write_queue_tail(sk);
uarg = msg_zerocopy_realloc(sk, size, skb_zcopy(skb));
......@@ -1239,10 +1089,14 @@ int tcp_sendmsg_locked(struct sock *sk, struct msghdr *msg, size_t size)
err = -ENOBUFS;
goto out_err;
}
zc = sk->sk_route_caps & NETIF_F_SG;
if (!zc)
if (sk->sk_route_caps & NETIF_F_SG)
zc = MSG_ZEROCOPY;
else
uarg_to_msgzc(uarg)->zerocopy = 0;
}
} else if (unlikely(msg->msg_flags & MSG_SPLICE_PAGES) && size) {
if (sk->sk_route_caps & NETIF_F_SG)
zc = MSG_SPLICE_PAGES;
}
if (unlikely(flags & MSG_FASTOPEN || inet_sk(sk)->defer_connect) &&
......@@ -1305,7 +1159,7 @@ int tcp_sendmsg_locked(struct sock *sk, struct msghdr *msg, size_t size)
goto do_error;
while (msg_data_left(msg)) {
int copy = 0;
ssize_t copy = 0;
skb = tcp_write_queue_tail(sk);
if (skb)
......@@ -1346,7 +1200,7 @@ int tcp_sendmsg_locked(struct sock *sk, struct msghdr *msg, size_t size)
if (copy > msg_data_left(msg))
copy = msg_data_left(msg);
if (!zc) {
if (zc == 0) {
bool merge = true;
int i = skb_shinfo(skb)->nr_frags;
struct page_frag *pfrag = sk_page_frag(sk);
......@@ -1391,7 +1245,7 @@ int tcp_sendmsg_locked(struct sock *sk, struct msghdr *msg, size_t size)
page_ref_inc(pfrag->page);
}
pfrag->offset += copy;
} else {
} else if (zc == MSG_ZEROCOPY) {
/* First append to a fragless skb builds initial
* pure zerocopy skb
*/
......@@ -1412,6 +1266,30 @@ int tcp_sendmsg_locked(struct sock *sk, struct msghdr *msg, size_t size)
if (err < 0)
goto do_error;
copy = err;
} else if (zc == MSG_SPLICE_PAGES) {
/* Splice in data if we can; copy if we can't. */
if (tcp_downgrade_zcopy_pure(sk, skb))
goto wait_for_space;
copy = tcp_wmem_schedule(sk, copy);
if (!copy)
goto wait_for_space;
err = skb_splice_from_iter(skb, &msg->msg_iter, copy,
sk->sk_allocation);
if (err < 0) {
if (err == -EMSGSIZE) {
tcp_mark_push(tp, skb);
goto new_segment;
}
goto do_error;
}
copy = err;
if (!(flags & MSG_NO_SHARED_FRAGS))
skb_shinfo(skb)->flags |= SKBFL_SHARED_FRAG;
sk_wmem_queued_add(sk, copy);
sk_mem_charge(sk, copy);
}
if (!copied)
......
......@@ -72,11 +72,13 @@ static int tcp_bpf_push(struct sock *sk, struct sk_msg *msg, u32 apply_bytes,
{
bool apply = apply_bytes;
struct scatterlist *sge;
struct msghdr msghdr = { .msg_flags = flags | MSG_SPLICE_PAGES, };
struct page *page;
int size, ret = 0;
u32 off;
while (1) {
struct bio_vec bvec;
bool has_tx_ulp;
sge = sk_msg_elem(msg, msg->sg.start);
......@@ -88,16 +90,18 @@ static int tcp_bpf_push(struct sock *sk, struct sk_msg *msg, u32 apply_bytes,
tcp_rate_check_app_limited(sk);
retry:
has_tx_ulp = tls_sw_has_ctx_tx(sk);
if (has_tx_ulp) {
flags |= MSG_SENDPAGE_NOPOLICY;
ret = kernel_sendpage_locked(sk,
page, off, size, flags);
} else {
ret = do_tcp_sendpages(sk, page, off, size, flags);
}
if (has_tx_ulp)
msghdr.msg_flags |= MSG_SENDPAGE_NOPOLICY;
if (flags & MSG_SENDPAGE_NOTLAST)
msghdr.msg_flags |= MSG_MORE;
bvec_set_page(&bvec, page, size, off);
iov_iter_bvec(&msghdr.msg_iter, ITER_SOURCE, &bvec, 1, size);
ret = tcp_sendmsg_locked(sk, &msghdr, size);
if (ret <= 0)
return ret;
if (apply)
apply_bytes -= ret;
msg->sg.size -= ret;
......@@ -404,7 +408,7 @@ static int tcp_bpf_sendmsg(struct sock *sk, struct msghdr *msg, size_t size)
long timeo;
int flags;
/* Don't let internal do_tcp_sendpages() flags through */
/* Don't let internal sendpage flags through */
flags = (msg->msg_flags & ~MSG_SENDPAGE_DECRYPTED);
flags |= MSG_NO_SHARED_FRAGS;
......
......@@ -1332,54 +1332,15 @@ EXPORT_SYMBOL(udp_sendmsg);
int udp_sendpage(struct sock *sk, struct page *page, int offset,
size_t size, int flags)
{
struct inet_sock *inet = inet_sk(sk);
struct udp_sock *up = udp_sk(sk);
int ret;
struct bio_vec bvec;
struct msghdr msg = { .msg_flags = flags | MSG_SPLICE_PAGES };
if (flags & MSG_SENDPAGE_NOTLAST)
flags |= MSG_MORE;
if (!up->pending) {
struct msghdr msg = { .msg_flags = flags|MSG_MORE };
/* Call udp_sendmsg to specify destination address which
* sendpage interface can't pass.
* This will succeed only when the socket is connected.
*/
ret = udp_sendmsg(sk, &msg, 0);
if (ret < 0)
return ret;
}
lock_sock(sk);
msg.msg_flags |= MSG_MORE;
if (unlikely(!up->pending)) {
release_sock(sk);
net_dbg_ratelimited("cork failed\n");
return -EINVAL;
}
ret = ip_append_page(sk, &inet->cork.fl.u.ip4,
page, offset, size, flags);
if (ret == -EOPNOTSUPP) {
release_sock(sk);
return sock_no_sendpage(sk->sk_socket, page, offset,
size, flags);
}
if (ret < 0) {
udp_flush_pending_frames(sk);
goto out;
}
up->len += size;
if (!(READ_ONCE(up->corkflag) || (flags&MSG_MORE)))
ret = udp_push_pending_frames(sk);
if (!ret)
ret = size;
out:
release_sock(sk);
return ret;
bvec_set_page(&bvec, page, size, offset);
iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, size);
return udp_sendmsg(sk, &msg, size);
}
#define UDP_SKB_IS_STATELESS 0x80000000
......
......@@ -1589,6 +1589,14 @@ static int __ip6_append_data(struct sock *sk,
skb_zcopy_set(skb, uarg, &extra_uref);
}
}
} else if ((flags & MSG_SPLICE_PAGES) && length) {
if (inet_sk(sk)->hdrincl)
return -EPERM;
if (rt->dst.dev->features & NETIF_F_SG)
/* We need an empty buffer to attach stuff to */
paged = true;
else
flags &= ~MSG_SPLICE_PAGES;
}
/*
......@@ -1778,6 +1786,15 @@ static int __ip6_append_data(struct sock *sk,
err = -EFAULT;
goto error;
}
} else if (flags & MSG_SPLICE_PAGES) {
struct msghdr *msg = from;
err = skb_splice_from_iter(skb, &msg->msg_iter, copy,
sk->sk_allocation);
if (err < 0)
goto error;
copy = err;
wmem_alloc_delta += copy;
} else if (!zc) {
int i = skb_shinfo(skb)->nr_frags;
......
......@@ -2138,6 +2138,7 @@ int __sys_sendto(int fd, void __user *buff, size_t len, unsigned int flags,
msg.msg_name = (struct sockaddr *)&address;
msg.msg_namelen = addr_len;
}
flags &= ~MSG_INTERNAL_SENDMSG_FLAGS;
if (sock->file->f_flags & O_NONBLOCK)
flags |= MSG_DONTWAIT;
msg.msg_flags = flags;
......@@ -2483,6 +2484,7 @@ static int ____sys_sendmsg(struct socket *sock, struct msghdr *msg_sys,
msg_sys->msg_control = ctl_buf;
msg_sys->msg_control_is_user = false;
}
flags &= ~MSG_INTERNAL_SENDMSG_FLAGS;
msg_sys->msg_flags = flags;
if (sock->file->f_flags & O_NONBLOCK)
......
......@@ -125,7 +125,10 @@ int tls_push_sg(struct sock *sk,
u16 first_offset,
int flags)
{
int sendpage_flags = flags | MSG_SENDPAGE_NOTLAST;
struct bio_vec bvec;
struct msghdr msg = {
.msg_flags = MSG_SENDPAGE_NOTLAST | MSG_SPLICE_PAGES | flags,
};
int ret = 0;
struct page *p;
size_t size;
......@@ -134,16 +137,19 @@ int tls_push_sg(struct sock *sk,
size = sg->length - offset;
offset += sg->offset;
ctx->in_tcp_sendpages = true;
ctx->splicing_pages = true;
while (1) {
if (sg_is_last(sg))
sendpage_flags = flags;
msg.msg_flags = flags;
/* is sending application-limited? */
tcp_rate_check_app_limited(sk);
p = sg_page(sg);
retry:
ret = do_tcp_sendpages(sk, p, offset, size, sendpage_flags);
bvec_set_page(&bvec, p, size, offset);
iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, size);
ret = tcp_sendmsg_locked(sk, &msg, size);
if (ret != size) {
if (ret > 0) {
......@@ -155,7 +161,7 @@ int tls_push_sg(struct sock *sk,
offset -= sg->offset;
ctx->partially_sent_offset = offset;
ctx->partially_sent_record = (void *)sg;
ctx->in_tcp_sendpages = false;
ctx->splicing_pages = false;
return ret;
}
......@@ -169,7 +175,7 @@ int tls_push_sg(struct sock *sk,
size = sg->length;
}
ctx->in_tcp_sendpages = false;
ctx->splicing_pages = false;
return 0;
}
......@@ -247,11 +253,11 @@ static void tls_write_space(struct sock *sk)
{
struct tls_context *ctx = tls_get_ctx(sk);
/* If in_tcp_sendpages call lower protocol write space handler
/* If splicing_pages call lower protocol write space handler
* to ensure we wake up any waiting operations there. For example
* if do_tcp_sendpages where to call sk_wait_event.
* if splicing pages where to call sk_wait_event.
*/
if (ctx->in_tcp_sendpages) {
if (ctx->splicing_pages) {
ctx->sk_write_space(sk);
return;
}
......
......@@ -1839,24 +1839,6 @@ static void maybe_add_creds(struct sk_buff *skb, const struct socket *sock,
}
}
static int maybe_init_creds(struct scm_cookie *scm,
struct socket *socket,
const struct sock *other)
{
int err;
struct msghdr msg = { .msg_controllen = 0 };
err = scm_send(socket, &msg, scm, false);
if (err)
return err;
if (unix_passcred_enabled(socket, other)) {
scm->pid = get_pid(task_tgid(current));
current_uid_gid(&scm->creds.uid, &scm->creds.gid);
}
return err;
}
static bool unix_skb_scm_eq(struct sk_buff *skb,
struct scm_cookie *scm)
{
......@@ -2200,19 +2182,25 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
while (sent < len) {
size = len - sent;
/* Keep two messages in the pipe so it schedules better */
size = min_t(int, size, (sk->sk_sndbuf >> 1) - 64);
if (unlikely(msg->msg_flags & MSG_SPLICE_PAGES)) {
skb = sock_alloc_send_pskb(sk, 0, 0,
msg->msg_flags & MSG_DONTWAIT,
&err, 0);
} else {
/* Keep two messages in the pipe so it schedules better */
size = min_t(int, size, (sk->sk_sndbuf >> 1) - 64);
/* allow fallback to order-0 allocations */
size = min_t(int, size, SKB_MAX_HEAD(0) + UNIX_SKB_FRAGS_SZ);
/* allow fallback to order-0 allocations */
size = min_t(int, size, SKB_MAX_HEAD(0) + UNIX_SKB_FRAGS_SZ);
data_len = max_t(int, 0, size - SKB_MAX_HEAD(0));
data_len = max_t(int, 0, size - SKB_MAX_HEAD(0));
data_len = min_t(size_t, size, PAGE_ALIGN(data_len));
data_len = min_t(size_t, size, PAGE_ALIGN(data_len));
skb = sock_alloc_send_pskb(sk, size - data_len, data_len,
msg->msg_flags & MSG_DONTWAIT, &err,
get_order(UNIX_SKB_FRAGS_SZ));
skb = sock_alloc_send_pskb(sk, size - data_len, data_len,
msg->msg_flags & MSG_DONTWAIT, &err,
get_order(UNIX_SKB_FRAGS_SZ));
}
if (!skb)
goto out_err;
......@@ -2224,13 +2212,24 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
}
fds_sent = true;
skb_put(skb, size - data_len);
skb->data_len = data_len;
skb->len = size;
err = skb_copy_datagram_from_iter(skb, 0, &msg->msg_iter, size);
if (err) {
kfree_skb(skb);
goto out_err;
if (unlikely(msg->msg_flags & MSG_SPLICE_PAGES)) {
err = skb_splice_from_iter(skb, &msg->msg_iter, size,
sk->sk_allocation);
if (err < 0) {
kfree_skb(skb);
goto out_err;
}
size = err;
refcount_add(size, &sk->sk_wmem_alloc);
} else {
skb_put(skb, size - data_len);
skb->data_len = data_len;
skb->len = size;
err = skb_copy_datagram_from_iter(skb, 0, &msg->msg_iter, size);
if (err) {
kfree_skb(skb);
goto out_err;
}
}
unix_state_lock(other);
......@@ -2275,117 +2274,15 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
static ssize_t unix_stream_sendpage(struct socket *socket, struct page *page,
int offset, size_t size, int flags)
{
int err;
bool send_sigpipe = false;
bool init_scm = true;
struct scm_cookie scm;
struct sock *other, *sk = socket->sk;
struct sk_buff *skb, *newskb = NULL, *tail = NULL;
if (flags & MSG_OOB)
return -EOPNOTSUPP;
other = unix_peer(sk);
if (!other || sk->sk_state != TCP_ESTABLISHED)
return -ENOTCONN;
if (false) {
alloc_skb:
unix_state_unlock(other);
mutex_unlock(&unix_sk(other)->iolock);
newskb = sock_alloc_send_pskb(sk, 0, 0, flags & MSG_DONTWAIT,
&err, 0);
if (!newskb)
goto err;
}
/* we must acquire iolock as we modify already present
* skbs in the sk_receive_queue and mess with skb->len
*/
err = mutex_lock_interruptible(&unix_sk(other)->iolock);
if (err) {
err = flags & MSG_DONTWAIT ? -EAGAIN : -ERESTARTSYS;
goto err;
}
if (sk->sk_shutdown & SEND_SHUTDOWN) {
err = -EPIPE;
send_sigpipe = true;
goto err_unlock;
}
unix_state_lock(other);
if (sock_flag(other, SOCK_DEAD) ||
other->sk_shutdown & RCV_SHUTDOWN) {
err = -EPIPE;
send_sigpipe = true;
goto err_state_unlock;
}
struct bio_vec bvec;
struct msghdr msg = { .msg_flags = flags | MSG_SPLICE_PAGES };
if (init_scm) {
err = maybe_init_creds(&scm, socket, other);
if (err)
goto err_state_unlock;
init_scm = false;
}
skb = skb_peek_tail(&other->sk_receive_queue);
if (tail && tail == skb) {
skb = newskb;
} else if (!skb || !unix_skb_scm_eq(skb, &scm)) {
if (newskb) {
skb = newskb;
} else {
tail = skb;
goto alloc_skb;
}
} else if (newskb) {
/* this is fast path, we don't necessarily need to
* call to kfree_skb even though with newskb == NULL
* this - does no harm
*/
consume_skb(newskb);
newskb = NULL;
}
if (flags & MSG_SENDPAGE_NOTLAST)
msg.msg_flags |= MSG_MORE;
if (skb_append_pagefrags(skb, page, offset, size)) {
tail = skb;
goto alloc_skb;
}
skb->len += size;
skb->data_len += size;
skb->truesize += size;
refcount_add(size, &sk->sk_wmem_alloc);
if (newskb) {
err = unix_scm_to_skb(&scm, skb, false);
if (err)
goto err_state_unlock;
spin_lock(&other->sk_receive_queue.lock);
__skb_queue_tail(&other->sk_receive_queue, newskb);
spin_unlock(&other->sk_receive_queue.lock);
}
unix_state_unlock(other);
mutex_unlock(&unix_sk(other)->iolock);
other->sk_data_ready(other);
scm_destroy(&scm);
return size;
err_state_unlock:
unix_state_unlock(other);
err_unlock:
mutex_unlock(&unix_sk(other)->iolock);
err:
kfree_skb(newskb);
if (send_sigpipe && !(flags & MSG_NOSIGNAL))
send_sig(SIGPIPE, current, 0);
if (!init_scm)
scm_destroy(&scm);
return err;
bvec_set_page(&bvec, page, size, offset);
iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, size);
return unix_stream_sendmsg(socket, &msg, size);
}
static int unix_seqpacket_sendmsg(struct socket *sock, struct msghdr *msg,
......
......@@ -205,14 +205,16 @@ static int espintcp_sendskb_locked(struct sock *sk, struct espintcp_msg *emsg,
static int espintcp_sendskmsg_locked(struct sock *sk,
struct espintcp_msg *emsg, int flags)
{
struct msghdr msghdr = { .msg_flags = flags | MSG_SPLICE_PAGES, };
struct sk_msg *skmsg = &emsg->skmsg;
struct scatterlist *sg;
int done = 0;
int ret;
flags |= MSG_SENDPAGE_NOTLAST;
msghdr.msg_flags |= MSG_SENDPAGE_NOTLAST;
sg = &skmsg->sg.data[skmsg->sg.start];
do {
struct bio_vec bvec;
size_t size = sg->length - emsg->offset;
int offset = sg->offset + emsg->offset;
struct page *p;
......@@ -220,11 +222,13 @@ static int espintcp_sendskmsg_locked(struct sock *sk,
emsg->offset = 0;
if (sg_is_last(sg))
flags &= ~MSG_SENDPAGE_NOTLAST;
msghdr.msg_flags &= ~MSG_SENDPAGE_NOTLAST;
p = sg_page(sg);
retry:
ret = do_tcp_sendpages(sk, p, offset, size, flags);
bvec_set_page(&bvec, p, size, offset);
iov_iter_bvec(&msghdr.msg_iter, ITER_SOURCE, &bvec, 1, size);
ret = tcp_sendmsg_locked(sk, &msghdr, size);
if (ret < 0) {
emsg->offset = offset - sg->offset;
skmsg->sg.start += done;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment