Commit 96fd26b9 authored by David S. Miller

Merge branch 'rds-assorted-bug-fixes'

Santosh Shilimkar says:

====================
RDS: Assorted bug fixes

We would like to improve RDS upstream support, and in that context I
started playing with it. But I ran into a number of issues, as basic as
RDS IB RDMA not working. As part of the debugging I ended up creating
the $subject series, which has a bunch of assorted fixes. At least with
this series I can run RDS IB RDMA and other tests successfully.

Some of these fixes have been done by Chris Mason, Andy Grover and
Zach Brown while at Oracle. There are still more kinks with FMR and
error handling, and I plan to address them in a follow-up series.

The series was generated against Linus's master (v4.2-rc7) but also
applies cleanly against net-next. It has been tested on Oracle hardware
with an IB fabric, in both bcopy and RDMA modes. I don't have access
to iWARP hardware, so any testing help on iWARP hardware is appreciated.
====================
Signed-off-by: David S. Miller <davem@davemloft.net>
parents 43e122b0 ae05368a
@@ -438,6 +438,14 @@ static const struct proto_ops rds_proto_ops = {
 	.sendpage = sock_no_sendpage,
 };
 
+static void rds_sock_destruct(struct sock *sk)
+{
+	struct rds_sock *rs = rds_sk_to_rs(sk);
+
+	WARN_ON((&rs->rs_item != rs->rs_item.next ||
+		 &rs->rs_item != rs->rs_item.prev));
+}
+
 static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
 {
 	struct rds_sock *rs;
@@ -445,6 +453,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
 	sock_init_data(sock, sk);
 	sock->ops = &rds_proto_ops;
 	sk->sk_protocol = protocol;
+	sk->sk_destruct = rds_sock_destruct;
 
 	rs = rds_sk_to_rs(sk);
 	spin_lock_init(&rs->rs_lock);
...
@@ -301,6 +301,8 @@ void rds_conn_shutdown(struct rds_connection *conn)
 		wait_event(conn->c_waitq,
 			   !test_bit(RDS_IN_XMIT, &conn->c_flags));
+		wait_event(conn->c_waitq,
+			   !test_bit(RDS_RECV_REFILL, &conn->c_flags));
 
 		conn->c_trans->conn_shutdown(conn);
 		rds_conn_reset(conn);
...
@@ -320,7 +320,7 @@ void rds_ib_recv_exit(void);
 int rds_ib_recv(struct rds_connection *conn);
 int rds_ib_recv_alloc_caches(struct rds_ib_connection *ic);
 void rds_ib_recv_free_caches(struct rds_ib_connection *ic);
-void rds_ib_recv_refill(struct rds_connection *conn, int prefill);
+void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp);
 void rds_ib_inc_free(struct rds_incoming *inc);
 int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iov_iter *to);
 void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context);
...
@@ -135,7 +135,7 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_even
 	rds_ib_recv_init_ring(ic);
 	/* Post receive buffers - as a side effect, this will update
 	 * the posted credit count. */
-	rds_ib_recv_refill(conn, 1);
+	rds_ib_recv_refill(conn, 1, GFP_KERNEL);
 
 	/* Tune RNR behavior */
 	rds_ib_tune_rnr(ic, &qp_attr);
@@ -640,6 +640,15 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
 			   (atomic_read(&ic->i_signaled_sends) == 0));
 		tasklet_kill(&ic->i_recv_tasklet);
 
+		/* first destroy the ib state that generates callbacks */
+		if (ic->i_cm_id->qp)
+			rdma_destroy_qp(ic->i_cm_id);
+		if (ic->i_send_cq)
+			ib_destroy_cq(ic->i_send_cq);
+		if (ic->i_recv_cq)
+			ib_destroy_cq(ic->i_recv_cq);
+
+		/* then free the resources that ib callbacks use */
 		if (ic->i_send_hdrs)
 			ib_dma_free_coherent(dev,
 					     ic->i_send_ring.w_nr *
@@ -663,12 +672,6 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
 		if (ic->i_recvs)
 			rds_ib_recv_clear_ring(ic);
 
-		if (ic->i_cm_id->qp)
-			rdma_destroy_qp(ic->i_cm_id);
-		if (ic->i_send_cq)
-			ib_destroy_cq(ic->i_send_cq);
-		if (ic->i_recv_cq)
-			ib_destroy_cq(ic->i_recv_cq);
 		rdma_destroy_id(ic->i_cm_id);
 
 		/*
...
@@ -151,12 +151,17 @@ int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
 	struct rds_ib_device *rds_ibdev_old;
 
 	rds_ibdev_old = rds_ib_get_device(ipaddr);
-	if (rds_ibdev_old) {
+	if (!rds_ibdev_old)
+		return rds_ib_add_ipaddr(rds_ibdev, ipaddr);
+
+	if (rds_ibdev_old != rds_ibdev) {
 		rds_ib_remove_ipaddr(rds_ibdev_old, ipaddr);
 		rds_ib_dev_put(rds_ibdev_old);
+		return rds_ib_add_ipaddr(rds_ibdev, ipaddr);
 	}
 
-	return rds_ib_add_ipaddr(rds_ibdev, ipaddr);
+	rds_ib_dev_put(rds_ibdev_old);
+	return 0;
 }
 
 void rds_ib_add_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn)
@@ -485,7 +490,7 @@ static void __rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
 
 			/* FIXME we need a way to tell a r/w MR
			 * from a r/o MR */
-			BUG_ON(irqs_disabled());
+			WARN_ON(!page->mapping && irqs_disabled());
 			set_page_dirty(page);
 			put_page(page);
 		}
...
@@ -297,7 +297,7 @@ static struct rds_page_frag *rds_ib_refill_one_frag(struct rds_ib_connection *ic
 }
 
 static int rds_ib_recv_refill_one(struct rds_connection *conn,
-				  struct rds_ib_recv_work *recv, int prefill)
+				  struct rds_ib_recv_work *recv, gfp_t gfp)
 {
 	struct rds_ib_connection *ic = conn->c_transport_data;
 	struct ib_sge *sge;
@@ -305,7 +305,7 @@ static int rds_ib_recv_refill_one(struct rds_connection *conn,
 	gfp_t slab_mask = GFP_NOWAIT;
 	gfp_t page_mask = GFP_NOWAIT;
 
-	if (prefill) {
+	if (gfp & __GFP_WAIT) {
 		slab_mask = GFP_KERNEL;
 		page_mask = GFP_HIGHUSER;
 	}
@@ -347,6 +347,24 @@ static int rds_ib_recv_refill_one(struct rds_connection *conn,
 	return ret;
 }
 
+static int acquire_refill(struct rds_connection *conn)
+{
+	return test_and_set_bit(RDS_RECV_REFILL, &conn->c_flags) == 0;
+}
+
+static void release_refill(struct rds_connection *conn)
+{
+	clear_bit(RDS_RECV_REFILL, &conn->c_flags);
+
+	/* We don't use wait_on_bit()/wake_up_bit() because our waking is in a
+	 * hot path and finding waiters is very rare. We don't want to walk
+	 * the system-wide hashed waitqueue buckets in the fast path only to
+	 * almost never find waiters.
+	 */
+	if (waitqueue_active(&conn->c_waitq))
+		wake_up_all(&conn->c_waitq);
+}
+
 /*
  * This tries to allocate and post unused work requests after making sure that
  * they have all the allocations they need to queue received fragments into
@@ -354,15 +372,23 @@ static int rds_ib_recv_refill_one(struct rds_connection *conn,
  *
  * -1 is returned if posting fails due to temporary resource exhaustion.
  */
-void rds_ib_recv_refill(struct rds_connection *conn, int prefill)
+void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp)
 {
 	struct rds_ib_connection *ic = conn->c_transport_data;
 	struct rds_ib_recv_work *recv;
 	struct ib_recv_wr *failed_wr;
 	unsigned int posted = 0;
 	int ret = 0;
+	int can_wait = gfp & __GFP_WAIT;
 	u32 pos;
 
+	/* the goal here is to just make sure that someone, somewhere
+	 * is posting buffers. If we can't get the refill lock,
+	 * let them do their thing
+	 */
+	if (!acquire_refill(conn))
+		return;
+
 	while ((prefill || rds_conn_up(conn)) &&
 	       rds_ib_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
 		if (pos >= ic->i_recv_ring.w_nr) {
@@ -372,7 +398,7 @@ void rds_ib_recv_refill(struct rds_connection *conn, int prefill)
 		}
 
 		recv = &ic->i_recvs[pos];
-		ret = rds_ib_recv_refill_one(conn, recv, prefill);
+		ret = rds_ib_recv_refill_one(conn, recv, gfp);
 		if (ret) {
 			break;
 		}
@@ -402,6 +428,24 @@ void rds_ib_recv_refill(struct rds_connection *conn, int prefill)
 	if (ret)
 		rds_ib_ring_unalloc(&ic->i_recv_ring, 1);
 
+	release_refill(conn);
+
+	/* if we're called from the softirq handler, we'll be GFP_NOWAIT.
+	 * in this case the ring being low is going to lead to more interrupts
+	 * and we can safely let the softirq code take care of it unless the
+	 * ring is completely empty.
+	 *
+	 * if we're called from krdsd, we'll be GFP_KERNEL. In this case
+	 * we might have raced with the softirq code while we had the refill
+	 * lock held. Use rds_ib_ring_low() instead of ring_empty to decide
+	 * if we should requeue.
+	 */
+	if (rds_conn_up(conn) &&
+	    ((can_wait && rds_ib_ring_low(&ic->i_recv_ring)) ||
+	    rds_ib_ring_empty(&ic->i_recv_ring))) {
+		queue_delayed_work(rds_wq, &conn->c_recv_w, 1);
+	}
 }
 
 /*
@@ -982,10 +1026,17 @@ static inline void rds_poll_cq(struct rds_ib_connection *ic,
 		}
 
 		/*
-		 * It's very important that we only free this ring entry if we've truly
-		 * freed the resources allocated to the entry. The refilling path can
-		 * leak if we don't.
+		 * rds_ib_process_recv() doesn't always consume the frag, and
+		 * we might not have called it at all if the wc didn't indicate
+		 * success. We already unmapped the frag's pages, though, and
+		 * the following rds_ib_ring_free() call tells the refill path
+		 * that it will not find an allocated frag here. Make sure we
+		 * keep that promise by freeing a frag that's still on the ring.
 		 */
+		if (recv->r_frag) {
+			rds_ib_frag_free(ic, recv->r_frag);
+			recv->r_frag = NULL;
+		}
 		rds_ib_ring_free(&ic->i_recv_ring, 1);
 	}
 }
@@ -1016,7 +1067,7 @@ void rds_ib_recv_tasklet_fn(unsigned long data)
 		rds_ib_stats_inc(s_ib_rx_ring_empty);
 
 	if (rds_ib_ring_low(&ic->i_recv_ring))
-		rds_ib_recv_refill(conn, 0);
+		rds_ib_recv_refill(conn, 0, GFP_NOWAIT);
 }
 
 int rds_ib_recv(struct rds_connection *conn)
@@ -1025,8 +1076,10 @@ int rds_ib_recv(struct rds_connection *conn)
 	int ret = 0;
 
 	rdsdebug("conn %p\n", conn);
-	if (rds_conn_up(conn))
+	if (rds_conn_up(conn)) {
 		rds_ib_attempt_ack(ic);
+		rds_ib_recv_refill(conn, 0, GFP_KERNEL);
+	}
 
 	return ret;
 }
...
@@ -709,6 +709,11 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 	if (scat == &rm->data.op_sg[rm->data.op_count]) {
 		prev->s_op = ic->i_data_op;
 		prev->s_wr.send_flags |= IB_SEND_SOLICITED;
+		if (!(prev->s_wr.send_flags & IB_SEND_SIGNALED)) {
+			ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
+			prev->s_wr.send_flags |= IB_SEND_SIGNALED;
+			nr_sig++;
+		}
 		ic->i_data_op = NULL;
 	}
 
...
@@ -451,7 +451,7 @@ void rds_rdma_free_op(struct rm_rdma_op *ro)
 		 * is the case for a RDMA_READ which copies from remote
 		 * to local memory */
 		if (!ro->op_write) {
-			BUG_ON(irqs_disabled());
+			WARN_ON(!page->mapping && irqs_disabled());
 			set_page_dirty(page);
 		}
 		put_page(page);
@@ -658,6 +658,8 @@ int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm,
 		ret = rds_pin_pages(iov->addr, nr, pages, !op->op_write);
 		if (ret < 0)
 			goto out;
+		else
+			ret = 0;
 
 		rdsdebug("RDS: nr_bytes %u nr %u iov->bytes %llu iov->addr %llx\n",
 			 nr_bytes, nr, iov->bytes, iov->addr);
...
@@ -34,6 +34,7 @@
 #include <rdma/rdma_cm.h>
 
 #include "rdma_transport.h"
+#include "ib.h"
 
 static struct rdma_cm_id *rds_rdma_listen_id;
@@ -82,8 +83,18 @@ int rds_rdma_cm_event_handler(struct rdma_cm_id *cm_id,
 		break;
 
 	case RDMA_CM_EVENT_ROUTE_RESOLVED:
-		/* XXX worry about racing with listen acceptance */
-		ret = trans->cm_initiate_connect(cm_id);
+		/* Connection could have been dropped so make sure the
+		 * cm_id is valid before proceeding
+		 */
+		if (conn) {
+			struct rds_ib_connection *ibic;
+
+			ibic = conn->c_transport_data;
+			if (ibic && ibic->i_cm_id == cm_id)
+				ret = trans->cm_initiate_connect(cm_id);
+			else
+				rds_conn_drop(conn);
+		}
 		break;
 
 	case RDMA_CM_EVENT_ESTABLISHED:
...
@@ -80,6 +80,7 @@ enum {
 #define RDS_LL_SEND_FULL	0
 #define RDS_RECONNECT_PENDING	1
 #define RDS_IN_XMIT		2
+#define RDS_RECV_REFILL		3
 
 struct rds_connection {
 	struct hlist_node	c_hash_node;
...
@@ -282,26 +282,34 @@ int rds_send_xmit(struct rds_connection *conn)
 		/* The transport either sends the whole rdma or none of it */
 		if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
 			rm->m_final_op = &rm->rdma;
+			/* The transport owns the mapped memory for now.
+			 * You can't unmap it while it's on the send queue
+			 */
+			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
 			ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
-			if (ret)
+			if (ret) {
+				clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
+				wake_up_interruptible(&rm->m_flush_wait);
 				break;
+			}
 			conn->c_xmit_rdma_sent = 1;
 
-			/* The transport owns the mapped memory for now.
-			 * You can't unmap it while it's on the send queue */
-			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
 		}
 
 		if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
 			rm->m_final_op = &rm->atomic;
+			/* The transport owns the mapped memory for now.
+			 * You can't unmap it while it's on the send queue
+			 */
+			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
 			ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
-			if (ret)
+			if (ret) {
+				clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
+				wake_up_interruptible(&rm->m_flush_wait);
 				break;
+			}
 			conn->c_xmit_atomic_sent = 1;
 
-			/* The transport owns the mapped memory for now.
-			 * You can't unmap it while it's on the send queue */
-			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
 		}
 
 		/*
@@ -411,7 +419,8 @@ int rds_send_xmit(struct rds_connection *conn)
 	 */
 	if (ret == 0) {
 		smp_mb();
-		if (!list_empty(&conn->c_send_queue) &&
+		if ((test_bit(0, &conn->c_map_queued) ||
+		     !list_empty(&conn->c_send_queue)) &&
 		    send_gen == conn->c_send_gen) {
 			rds_stats_inc(s_send_lock_queue_raced);
 			goto restart;
@@ -769,8 +778,22 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
 	while (!list_empty(&list)) {
 		rm = list_entry(list.next, struct rds_message, m_sock_item);
 		list_del_init(&rm->m_sock_item);
 		rds_message_wait(rm);
+
+		/* just in case the code above skipped this message
+		 * because RDS_MSG_ON_CONN wasn't set, run it again here
+		 * taking m_rs_lock is the only thing that keeps us
+		 * from racing with ack processing.
+		 */
+		spin_lock_irqsave(&rm->m_rs_lock, flags);
+
+		spin_lock(&rs->rs_lock);
+		__rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
+		spin_unlock(&rs->rs_lock);
+
+		rm->m_rs = NULL;
+		spin_unlock_irqrestore(&rm->m_rs_lock, flags);
+
 		rds_message_put(rm);
 	}
 }
@@ -992,6 +1015,11 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 		goto out;
 	}
 
+	if (payload_len > rds_sk_sndbuf(rs)) {
+		ret = -EMSGSIZE;
+		goto out;
+	}
+
 	/* size of rm including all sgs */
 	ret = rds_rm_size(msg, payload_len);
 	if (ret < 0)
@@ -1064,11 +1092,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 	while (!rds_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
 				  dport, &queued)) {
 		rds_stats_inc(s_send_queue_full);
-		/* XXX make sure this is reasonable */
-		if (payload_len > rds_sk_sndbuf(rs)) {
-			ret = -EMSGSIZE;
-			goto out;
-		}
+
 		if (nonblock) {
 			ret = -EAGAIN;
 			goto out;
...