Commit 9cbf22b3 authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'dlm-4.3' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm

Pull dlm updates from David Teigland:
 "This set mainly includes a change to the way the dlm uses the SCTP API
  in the kernel, removing the direct dependency on the sctp module.
  Other odd SCTP-related fixes are also included.

  The other notable fix is for a long standing regression in the
  behavior of lock value blocks for user space locks"

* tag 'dlm-4.3' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm:
  dlm: print error from kernel_sendpage
  dlm: fix lvb copy for user locks
  dlm: sctp_accept_from_sock() can be static
  dlm: fix reconnecting but not sending data
  dlm: replace BUG_ON with a less severe handling
  dlm: use sctp 1-to-1 API
  dlm: fix not reconnecting on connecting error handling
  dlm: fix race while closing connections
  dlm: fix connection stealing if using SCTP
parents ea814ab9 b3a5bbfd
...@@ -120,12 +120,11 @@ struct connection { ...@@ -120,12 +120,11 @@ struct connection {
struct cbuf cb; struct cbuf cb;
int retries; int retries;
#define MAX_CONNECT_RETRIES 3 #define MAX_CONNECT_RETRIES 3
int sctp_assoc;
struct hlist_node list; struct hlist_node list;
struct connection *othercon; struct connection *othercon;
struct work_struct rwork; /* Receive workqueue */ struct work_struct rwork; /* Receive workqueue */
struct work_struct swork; /* Send workqueue */ struct work_struct swork; /* Send workqueue */
bool try_new_addr; void (*orig_error_report)(struct sock *sk);
}; };
#define sock2con(x) ((struct connection *)(x)->sk_user_data) #define sock2con(x) ((struct connection *)(x)->sk_user_data)
...@@ -252,26 +251,6 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation) ...@@ -252,26 +251,6 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation)
return con; return con;
} }
/* This is a bit drastic, but only called when things go wrong */
static struct connection *assoc2con(int assoc_id)
{
int i;
struct connection *con;
mutex_lock(&connections_lock);
for (i = 0 ; i < CONN_HASH_SIZE; i++) {
hlist_for_each_entry(con, &connection_hash[i], list) {
if (con->sctp_assoc == assoc_id) {
mutex_unlock(&connections_lock);
return con;
}
}
}
mutex_unlock(&connections_lock);
return NULL;
}
static struct dlm_node_addr *find_node_addr(int nodeid) static struct dlm_node_addr *find_node_addr(int nodeid)
{ {
struct dlm_node_addr *na; struct dlm_node_addr *na;
...@@ -322,14 +301,14 @@ static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out, ...@@ -322,14 +301,14 @@ static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
spin_lock(&dlm_node_addrs_spin); spin_lock(&dlm_node_addrs_spin);
na = find_node_addr(nodeid); na = find_node_addr(nodeid);
if (na && na->addr_count) { if (na && na->addr_count) {
memcpy(&sas, na->addr[na->curr_addr_index],
sizeof(struct sockaddr_storage));
if (try_new_addr) { if (try_new_addr) {
na->curr_addr_index++; na->curr_addr_index++;
if (na->curr_addr_index == na->addr_count) if (na->curr_addr_index == na->addr_count)
na->curr_addr_index = 0; na->curr_addr_index = 0;
} }
memcpy(&sas, na->addr[na->curr_addr_index ],
sizeof(struct sockaddr_storage));
} }
spin_unlock(&dlm_node_addrs_spin); spin_unlock(&dlm_node_addrs_spin);
...@@ -459,18 +438,23 @@ static inline void lowcomms_connect_sock(struct connection *con) ...@@ -459,18 +438,23 @@ static inline void lowcomms_connect_sock(struct connection *con)
static void lowcomms_state_change(struct sock *sk) static void lowcomms_state_change(struct sock *sk)
{ {
if (sk->sk_state == TCP_ESTABLISHED) /* SCTP layer is not calling sk_data_ready when the connection
* is done, so we catch the signal through here. Also, it
* doesn't switch socket state when entering shutdown, so we
* skip the write in that case.
*/
if (sk->sk_shutdown) {
if (sk->sk_shutdown == RCV_SHUTDOWN)
lowcomms_data_ready(sk);
} else if (sk->sk_state == TCP_ESTABLISHED) {
lowcomms_write_space(sk); lowcomms_write_space(sk);
}
} }
int dlm_lowcomms_connect_node(int nodeid) int dlm_lowcomms_connect_node(int nodeid)
{ {
struct connection *con; struct connection *con;
/* with sctp there's no connecting without sending */
if (dlm_config.ci_protocol != 0)
return 0;
if (nodeid == dlm_our_nodeid()) if (nodeid == dlm_our_nodeid())
return 0; return 0;
...@@ -481,6 +465,43 @@ int dlm_lowcomms_connect_node(int nodeid) ...@@ -481,6 +465,43 @@ int dlm_lowcomms_connect_node(int nodeid)
return 0; return 0;
} }
static void lowcomms_error_report(struct sock *sk)
{
struct connection *con = sock2con(sk);
struct sockaddr_storage saddr;
if (nodeid_to_addr(con->nodeid, &saddr, NULL, false)) {
printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
"sending to node %d, port %d, "
"sk_err=%d/%d\n", dlm_our_nodeid(),
con->nodeid, dlm_config.ci_tcp_port,
sk->sk_err, sk->sk_err_soft);
return;
} else if (saddr.ss_family == AF_INET) {
struct sockaddr_in *sin4 = (struct sockaddr_in *)&saddr;
printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
"sending to node %d at %pI4, port %d, "
"sk_err=%d/%d\n", dlm_our_nodeid(),
con->nodeid, &sin4->sin_addr.s_addr,
dlm_config.ci_tcp_port, sk->sk_err,
sk->sk_err_soft);
} else {
struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&saddr;
printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
"sending to node %d at %u.%u.%u.%u, "
"port %d, sk_err=%d/%d\n", dlm_our_nodeid(),
con->nodeid, sin6->sin6_addr.s6_addr32[0],
sin6->sin6_addr.s6_addr32[1],
sin6->sin6_addr.s6_addr32[2],
sin6->sin6_addr.s6_addr32[3],
dlm_config.ci_tcp_port, sk->sk_err,
sk->sk_err_soft);
}
con->orig_error_report(sk);
}
/* Make a socket active */ /* Make a socket active */
static void add_sock(struct socket *sock, struct connection *con) static void add_sock(struct socket *sock, struct connection *con)
{ {
...@@ -492,6 +513,8 @@ static void add_sock(struct socket *sock, struct connection *con) ...@@ -492,6 +513,8 @@ static void add_sock(struct socket *sock, struct connection *con)
con->sock->sk->sk_state_change = lowcomms_state_change; con->sock->sk->sk_state_change = lowcomms_state_change;
con->sock->sk->sk_user_data = con; con->sock->sk->sk_user_data = con;
con->sock->sk->sk_allocation = GFP_NOFS; con->sock->sk->sk_allocation = GFP_NOFS;
con->orig_error_report = con->sock->sk->sk_error_report;
con->sock->sk->sk_error_report = lowcomms_error_report;
} }
/* Add the port number to an IPv6 or 4 sockaddr and return the address /* Add the port number to an IPv6 or 4 sockaddr and return the address
...@@ -514,17 +537,24 @@ static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, ...@@ -514,17 +537,24 @@ static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
} }
/* Close a remote connection and tidy up */ /* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other) static void close_connection(struct connection *con, bool and_other,
bool tx, bool rx)
{ {
mutex_lock(&con->sock_mutex); clear_bit(CF_CONNECT_PENDING, &con->flags);
clear_bit(CF_WRITE_PENDING, &con->flags);
if (tx && cancel_work_sync(&con->swork))
log_print("canceled swork for node %d", con->nodeid);
if (rx && cancel_work_sync(&con->rwork))
log_print("canceled rwork for node %d", con->nodeid);
mutex_lock(&con->sock_mutex);
if (con->sock) { if (con->sock) {
sock_release(con->sock); sock_release(con->sock);
con->sock = NULL; con->sock = NULL;
} }
if (con->othercon && and_other) { if (con->othercon && and_other) {
/* Will only re-enter once. */ /* Will only re-enter once. */
close_connection(con->othercon, false); close_connection(con->othercon, false, true, true);
} }
if (con->rx_page) { if (con->rx_page) {
__free_page(con->rx_page); __free_page(con->rx_page);
...@@ -535,254 +565,6 @@ static void close_connection(struct connection *con, bool and_other) ...@@ -535,254 +565,6 @@ static void close_connection(struct connection *con, bool and_other)
mutex_unlock(&con->sock_mutex); mutex_unlock(&con->sock_mutex);
} }
/* We only send shutdown messages to nodes that are not part of the cluster */
static void sctp_send_shutdown(sctp_assoc_t associd)
{
static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
struct msghdr outmessage;
struct cmsghdr *cmsg;
struct sctp_sndrcvinfo *sinfo;
int ret;
struct connection *con;
con = nodeid2con(0,0);
BUG_ON(con == NULL);
outmessage.msg_name = NULL;
outmessage.msg_namelen = 0;
outmessage.msg_control = outcmsg;
outmessage.msg_controllen = sizeof(outcmsg);
outmessage.msg_flags = MSG_EOR;
cmsg = CMSG_FIRSTHDR(&outmessage);
cmsg->cmsg_level = IPPROTO_SCTP;
cmsg->cmsg_type = SCTP_SNDRCV;
cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
outmessage.msg_controllen = cmsg->cmsg_len;
sinfo = CMSG_DATA(cmsg);
memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
sinfo->sinfo_flags |= MSG_EOF;
sinfo->sinfo_assoc_id = associd;
ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0);
if (ret != 0)
log_print("send EOF to node failed: %d", ret);
}
static void sctp_init_failed_foreach(struct connection *con)
{
/*
* Don't try to recover base con and handle race where the
* other node's assoc init creates a assoc and we get that
* notification, then we get a notification that our attempt
* failed due. This happens when we are still trying the primary
* address, but the other node has already tried secondary addrs
* and found one that worked.
*/
if (!con->nodeid || con->sctp_assoc)
return;
log_print("Retrying SCTP association init for node %d\n", con->nodeid);
con->try_new_addr = true;
con->sctp_assoc = 0;
if (test_and_clear_bit(CF_INIT_PENDING, &con->flags)) {
if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
queue_work(send_workqueue, &con->swork);
}
}
/* INIT failed but we don't know which node...
restart INIT on all pending nodes */
static void sctp_init_failed(void)
{
mutex_lock(&connections_lock);
foreach_conn(sctp_init_failed_foreach);
mutex_unlock(&connections_lock);
}
static void retry_failed_sctp_send(struct connection *recv_con,
struct sctp_send_failed *sn_send_failed,
char *buf)
{
int len = sn_send_failed->ssf_length - sizeof(struct sctp_send_failed);
struct dlm_mhandle *mh;
struct connection *con;
char *retry_buf;
int nodeid = sn_send_failed->ssf_info.sinfo_ppid;
log_print("Retry sending %d bytes to node id %d", len, nodeid);
if (!nodeid) {
log_print("Shouldn't resend data via listening connection.");
return;
}
con = nodeid2con(nodeid, 0);
if (!con) {
log_print("Could not look up con for nodeid %d\n",
nodeid);
return;
}
mh = dlm_lowcomms_get_buffer(nodeid, len, GFP_NOFS, &retry_buf);
if (!mh) {
log_print("Could not allocate buf for retry.");
return;
}
memcpy(retry_buf, buf + sizeof(struct sctp_send_failed), len);
dlm_lowcomms_commit_buffer(mh);
/*
* If we got a assoc changed event before the send failed event then
* we only need to retry the send.
*/
if (con->sctp_assoc) {
if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
queue_work(send_workqueue, &con->swork);
} else
sctp_init_failed_foreach(con);
}
/* Something happened to an association */
static void process_sctp_notification(struct connection *con,
struct msghdr *msg, char *buf)
{
union sctp_notification *sn = (union sctp_notification *)buf;
struct linger linger;
switch (sn->sn_header.sn_type) {
case SCTP_SEND_FAILED:
retry_failed_sctp_send(con, &sn->sn_send_failed, buf);
break;
case SCTP_ASSOC_CHANGE:
switch (sn->sn_assoc_change.sac_state) {
case SCTP_COMM_UP:
case SCTP_RESTART:
{
/* Check that the new node is in the lockspace */
struct sctp_prim prim;
int nodeid;
int prim_len, ret;
int addr_len;
struct connection *new_con;
/*
* We get this before any data for an association.
* We verify that the node is in the cluster and
* then peel off a socket for it.
*/
if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
log_print("COMM_UP for invalid assoc ID %d",
(int)sn->sn_assoc_change.sac_assoc_id);
sctp_init_failed();
return;
}
memset(&prim, 0, sizeof(struct sctp_prim));
prim_len = sizeof(struct sctp_prim);
prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;
ret = kernel_getsockopt(con->sock,
IPPROTO_SCTP,
SCTP_PRIMARY_ADDR,
(char*)&prim,
&prim_len);
if (ret < 0) {
log_print("getsockopt/sctp_primary_addr on "
"new assoc %d failed : %d",
(int)sn->sn_assoc_change.sac_assoc_id,
ret);
/* Retry INIT later */
new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
if (new_con)
clear_bit(CF_CONNECT_PENDING, &con->flags);
return;
}
make_sockaddr(&prim.ssp_addr, 0, &addr_len);
if (addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
unsigned char *b=(unsigned char *)&prim.ssp_addr;
log_print("reject connect from unknown addr");
print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
b, sizeof(struct sockaddr_storage));
sctp_send_shutdown(prim.ssp_assoc_id);
return;
}
new_con = nodeid2con(nodeid, GFP_NOFS);
if (!new_con)
return;
/* Peel off a new sock */
lock_sock(con->sock->sk);
ret = sctp_do_peeloff(con->sock->sk,
sn->sn_assoc_change.sac_assoc_id,
&new_con->sock);
release_sock(con->sock->sk);
if (ret < 0) {
log_print("Can't peel off a socket for "
"connection %d to node %d: err=%d",
(int)sn->sn_assoc_change.sac_assoc_id,
nodeid, ret);
return;
}
add_sock(new_con->sock, new_con);
linger.l_onoff = 1;
linger.l_linger = 0;
ret = kernel_setsockopt(new_con->sock, SOL_SOCKET, SO_LINGER,
(char *)&linger, sizeof(linger));
if (ret < 0)
log_print("set socket option SO_LINGER failed");
log_print("connecting to %d sctp association %d",
nodeid, (int)sn->sn_assoc_change.sac_assoc_id);
new_con->sctp_assoc = sn->sn_assoc_change.sac_assoc_id;
new_con->try_new_addr = false;
/* Send any pending writes */
clear_bit(CF_CONNECT_PENDING, &new_con->flags);
clear_bit(CF_INIT_PENDING, &new_con->flags);
if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) {
queue_work(send_workqueue, &new_con->swork);
}
if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags))
queue_work(recv_workqueue, &new_con->rwork);
}
break;
case SCTP_COMM_LOST:
case SCTP_SHUTDOWN_COMP:
{
con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
if (con) {
con->sctp_assoc = 0;
}
}
break;
case SCTP_CANT_STR_ASSOC:
{
/* Will retry init when we get the send failed notification */
log_print("Can't start SCTP association - retrying");
}
break;
default:
log_print("unexpected SCTP assoc change id=%d state=%d",
(int)sn->sn_assoc_change.sac_assoc_id,
sn->sn_assoc_change.sac_state);
}
default:
; /* fall through */
}
}
/* Data received from remote end */ /* Data received from remote end */
static int receive_from_sock(struct connection *con) static int receive_from_sock(struct connection *con)
{ {
...@@ -793,7 +575,6 @@ static int receive_from_sock(struct connection *con) ...@@ -793,7 +575,6 @@ static int receive_from_sock(struct connection *con)
int r; int r;
int call_again_soon = 0; int call_again_soon = 0;
int nvec; int nvec;
char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
mutex_lock(&con->sock_mutex); mutex_lock(&con->sock_mutex);
...@@ -801,6 +582,10 @@ static int receive_from_sock(struct connection *con) ...@@ -801,6 +582,10 @@ static int receive_from_sock(struct connection *con)
ret = -EAGAIN; ret = -EAGAIN;
goto out_close; goto out_close;
} }
if (con->nodeid == 0) {
ret = -EINVAL;
goto out_close;
}
if (con->rx_page == NULL) { if (con->rx_page == NULL) {
/* /*
...@@ -813,11 +598,6 @@ static int receive_from_sock(struct connection *con) ...@@ -813,11 +598,6 @@ static int receive_from_sock(struct connection *con)
cbuf_init(&con->cb, PAGE_CACHE_SIZE); cbuf_init(&con->cb, PAGE_CACHE_SIZE);
} }
/* Only SCTP needs these really */
memset(&incmsg, 0, sizeof(incmsg));
msg.msg_control = incmsg;
msg.msg_controllen = sizeof(incmsg);
/* /*
* iov[0] is the bit of the circular buffer between the current end * iov[0] is the bit of the circular buffer between the current end
* point (cb.base + cb.len) and the end of the buffer. * point (cb.base + cb.len) and the end of the buffer.
...@@ -843,31 +623,18 @@ static int receive_from_sock(struct connection *con) ...@@ -843,31 +623,18 @@ static int receive_from_sock(struct connection *con)
MSG_DONTWAIT | MSG_NOSIGNAL); MSG_DONTWAIT | MSG_NOSIGNAL);
if (ret <= 0) if (ret <= 0)
goto out_close; goto out_close;
else if (ret == len)
/* Process SCTP notifications */
if (msg.msg_flags & MSG_NOTIFICATION) {
msg.msg_control = incmsg;
msg.msg_controllen = sizeof(incmsg);
process_sctp_notification(con, &msg,
page_address(con->rx_page) + con->cb.base);
mutex_unlock(&con->sock_mutex);
return 0;
}
BUG_ON(con->nodeid == 0);
if (ret == len)
call_again_soon = 1; call_again_soon = 1;
cbuf_add(&con->cb, ret); cbuf_add(&con->cb, ret);
ret = dlm_process_incoming_buffer(con->nodeid, ret = dlm_process_incoming_buffer(con->nodeid,
page_address(con->rx_page), page_address(con->rx_page),
con->cb.base, con->cb.len, con->cb.base, con->cb.len,
PAGE_CACHE_SIZE); PAGE_CACHE_SIZE);
if (ret == -EBADMSG) { if (ret == -EBADMSG) {
log_print("lowcomms: addr=%p, base=%u, len=%u, " log_print("lowcomms: addr=%p, base=%u, len=%u, read=%d",
"iov_len=%u, iov_base[0]=%p, read=%d", page_address(con->rx_page), con->cb.base,
page_address(con->rx_page), con->cb.base, con->cb.len, con->cb.len, r);
len, iov[0].iov_base, r);
} }
if (ret < 0) if (ret < 0)
goto out_close; goto out_close;
...@@ -892,7 +659,7 @@ static int receive_from_sock(struct connection *con) ...@@ -892,7 +659,7 @@ static int receive_from_sock(struct connection *con)
out_close: out_close:
mutex_unlock(&con->sock_mutex); mutex_unlock(&con->sock_mutex);
if (ret != -EAGAIN) { if (ret != -EAGAIN) {
close_connection(con, false); close_connection(con, false, true, false);
/* Reconnect when there is something to send */ /* Reconnect when there is something to send */
} }
/* Don't return success if we really got EOF */ /* Don't return success if we really got EOF */
...@@ -1033,6 +800,120 @@ static int tcp_accept_from_sock(struct connection *con) ...@@ -1033,6 +800,120 @@ static int tcp_accept_from_sock(struct connection *con)
return result; return result;
} }
static int sctp_accept_from_sock(struct connection *con)
{
/* Check that the new node is in the lockspace */
struct sctp_prim prim;
int nodeid;
int prim_len, ret;
int addr_len;
struct connection *newcon;
struct connection *addcon;
struct socket *newsock;
mutex_lock(&connections_lock);
if (!dlm_allow_conn) {
mutex_unlock(&connections_lock);
return -1;
}
mutex_unlock(&connections_lock);
mutex_lock_nested(&con->sock_mutex, 0);
ret = kernel_accept(con->sock, &newsock, O_NONBLOCK);
if (ret < 0)
goto accept_err;
memset(&prim, 0, sizeof(struct sctp_prim));
prim_len = sizeof(struct sctp_prim);
ret = kernel_getsockopt(newsock, IPPROTO_SCTP, SCTP_PRIMARY_ADDR,
(char *)&prim, &prim_len);
if (ret < 0) {
log_print("getsockopt/sctp_primary_addr failed: %d", ret);
goto accept_err;
}
make_sockaddr(&prim.ssp_addr, 0, &addr_len);
if (addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
unsigned char *b = (unsigned char *)&prim.ssp_addr;
log_print("reject connect from unknown addr");
print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
b, sizeof(struct sockaddr_storage));
goto accept_err;
}
newcon = nodeid2con(nodeid, GFP_NOFS);
if (!newcon) {
ret = -ENOMEM;
goto accept_err;
}
mutex_lock_nested(&newcon->sock_mutex, 1);
if (newcon->sock) {
struct connection *othercon = newcon->othercon;
if (!othercon) {
othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
if (!othercon) {
log_print("failed to allocate incoming socket");
mutex_unlock(&newcon->sock_mutex);
ret = -ENOMEM;
goto accept_err;
}
othercon->nodeid = nodeid;
othercon->rx_action = receive_from_sock;
mutex_init(&othercon->sock_mutex);
INIT_WORK(&othercon->swork, process_send_sockets);
INIT_WORK(&othercon->rwork, process_recv_sockets);
set_bit(CF_IS_OTHERCON, &othercon->flags);
}
if (!othercon->sock) {
newcon->othercon = othercon;
othercon->sock = newsock;
newsock->sk->sk_user_data = othercon;
add_sock(newsock, othercon);
addcon = othercon;
} else {
printk("Extra connection from node %d attempted\n", nodeid);
ret = -EAGAIN;
mutex_unlock(&newcon->sock_mutex);
goto accept_err;
}
} else {
newsock->sk->sk_user_data = newcon;
newcon->rx_action = receive_from_sock;
add_sock(newsock, newcon);
addcon = newcon;
}
log_print("connected to %d", nodeid);
mutex_unlock(&newcon->sock_mutex);
/*
* Add it to the active queue in case we got data
* between processing the accept adding the socket
* to the read_sockets list
*/
if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
queue_work(recv_workqueue, &addcon->rwork);
mutex_unlock(&con->sock_mutex);
return 0;
accept_err:
mutex_unlock(&con->sock_mutex);
if (newsock)
sock_release(newsock);
if (ret != -EAGAIN)
log_print("error accepting connection from node: %d", ret);
return ret;
}
static void free_entry(struct writequeue_entry *e) static void free_entry(struct writequeue_entry *e)
{ {
__free_page(e->page); __free_page(e->page);
...@@ -1057,97 +938,129 @@ static void writequeue_entry_complete(struct writequeue_entry *e, int completed) ...@@ -1057,97 +938,129 @@ static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
} }
} }
/*
* sctp_bind_addrs - bind a SCTP socket to all our addresses
*/
static int sctp_bind_addrs(struct connection *con, uint16_t port)
{
struct sockaddr_storage localaddr;
int i, addr_len, result = 0;
for (i = 0; i < dlm_local_count; i++) {
memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
make_sockaddr(&localaddr, port, &addr_len);
if (!i)
result = kernel_bind(con->sock,
(struct sockaddr *)&localaddr,
addr_len);
else
result = kernel_setsockopt(con->sock, SOL_SCTP,
SCTP_SOCKOPT_BINDX_ADD,
(char *)&localaddr, addr_len);
if (result < 0) {
log_print("Can't bind to %d addr number %d, %d.\n",
port, i + 1, result);
break;
}
}
return result;
}
/* Initiate an SCTP association. /* Initiate an SCTP association.
This is a special case of send_to_sock() in that we don't yet have a This is a special case of send_to_sock() in that we don't yet have a
peeled-off socket for this association, so we use the listening socket peeled-off socket for this association, so we use the listening socket
and add the primary IP address of the remote node. and add the primary IP address of the remote node.
*/ */
static void sctp_init_assoc(struct connection *con) static void sctp_connect_to_sock(struct connection *con)
{ {
struct sockaddr_storage rem_addr; struct sockaddr_storage daddr;
char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; int one = 1;
struct msghdr outmessage; int result;
struct cmsghdr *cmsg; int addr_len;
struct sctp_sndrcvinfo *sinfo; struct socket *sock;
struct connection *base_con;
struct writequeue_entry *e; if (con->nodeid == 0) {
int len, offset; log_print("attempt to connect sock 0 foiled");
int ret; return;
int addrlen; }
struct kvec iov[1];
mutex_lock(&con->sock_mutex); mutex_lock(&con->sock_mutex);
if (test_and_set_bit(CF_INIT_PENDING, &con->flags))
goto unlock;
if (nodeid_to_addr(con->nodeid, NULL, (struct sockaddr *)&rem_addr, /* Some odd races can cause double-connects, ignore them */
con->try_new_addr)) { if (con->retries++ > MAX_CONNECT_RETRIES)
goto out;
if (con->sock) {
log_print("node %d already connected.", con->nodeid);
goto out;
}
memset(&daddr, 0, sizeof(daddr));
result = nodeid_to_addr(con->nodeid, &daddr, NULL, true);
if (result < 0) {
log_print("no address for nodeid %d", con->nodeid); log_print("no address for nodeid %d", con->nodeid);
goto unlock; goto out;
} }
base_con = nodeid2con(0, 0);
BUG_ON(base_con == NULL);
make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen); /* Create a socket to communicate with */
result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
SOCK_STREAM, IPPROTO_SCTP, &sock);
if (result < 0)
goto socket_err;
outmessage.msg_name = &rem_addr; sock->sk->sk_user_data = con;
outmessage.msg_namelen = addrlen; con->rx_action = receive_from_sock;
outmessage.msg_control = outcmsg; con->connect_action = sctp_connect_to_sock;
outmessage.msg_controllen = sizeof(outcmsg); add_sock(sock, con);
outmessage.msg_flags = MSG_EOR;
spin_lock(&con->writequeue_lock); /* Bind to all addresses. */
if (sctp_bind_addrs(con, 0))
goto bind_err;
if (list_empty(&con->writequeue)) { make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len);
spin_unlock(&con->writequeue_lock);
log_print("writequeue empty for nodeid %d", con->nodeid);
goto unlock;
}
e = list_first_entry(&con->writequeue, struct writequeue_entry, list); log_print("connecting to %d", con->nodeid);
len = e->len;
offset = e->offset;
/* Send the first block off the write queue */ /* Turn off Nagle's algorithm */
iov[0].iov_base = page_address(e->page)+offset; kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
iov[0].iov_len = len; sizeof(one));
spin_unlock(&con->writequeue_lock);
if (rem_addr.ss_family == AF_INET) { result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len,
struct sockaddr_in *sin = (struct sockaddr_in *)&rem_addr; O_NONBLOCK);
log_print("Trying to connect to %pI4", &sin->sin_addr.s_addr); if (result == -EINPROGRESS)
} else { result = 0;
struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&rem_addr; if (result == 0)
log_print("Trying to connect to %pI6", &sin6->sin6_addr); goto out;
}
cmsg = CMSG_FIRSTHDR(&outmessage);
cmsg->cmsg_level = IPPROTO_SCTP;
cmsg->cmsg_type = SCTP_SNDRCV;
cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
sinfo = CMSG_DATA(cmsg);
memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
sinfo->sinfo_ppid = cpu_to_le32(con->nodeid);
outmessage.msg_controllen = cmsg->cmsg_len;
sinfo->sinfo_flags |= SCTP_ADDR_OVER;
ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len); bind_err:
if (ret < 0) { con->sock = NULL;
log_print("Send first packet to node %d failed: %d", sock_release(sock);
con->nodeid, ret);
/* Try again later */ socket_err:
/*
* Some errors are fatal and this list might need adjusting. For other
* errors we try again until the max number of retries is reached.
*/
if (result != -EHOSTUNREACH &&
result != -ENETUNREACH &&
result != -ENETDOWN &&
result != -EINVAL &&
result != -EPROTONOSUPPORT) {
log_print("connect %d try %d error %d", con->nodeid,
con->retries, result);
mutex_unlock(&con->sock_mutex);
msleep(1000);
clear_bit(CF_CONNECT_PENDING, &con->flags); clear_bit(CF_CONNECT_PENDING, &con->flags);
clear_bit(CF_INIT_PENDING, &con->flags); lowcomms_connect_sock(con);
} return;
else {
spin_lock(&con->writequeue_lock);
writequeue_entry_complete(e, ret);
spin_unlock(&con->writequeue_lock);
} }
unlock: out:
mutex_unlock(&con->sock_mutex); mutex_unlock(&con->sock_mutex);
set_bit(CF_WRITE_PENDING, &con->flags);
} }
/* Connect a new socket to its peer */ /* Connect a new socket to its peer */
...@@ -1236,11 +1149,13 @@ static void tcp_connect_to_sock(struct connection *con) ...@@ -1236,11 +1149,13 @@ static void tcp_connect_to_sock(struct connection *con)
con->retries, result); con->retries, result);
mutex_unlock(&con->sock_mutex); mutex_unlock(&con->sock_mutex);
msleep(1000); msleep(1000);
clear_bit(CF_CONNECT_PENDING, &con->flags);
lowcomms_connect_sock(con); lowcomms_connect_sock(con);
return; return;
} }
out: out:
mutex_unlock(&con->sock_mutex); mutex_unlock(&con->sock_mutex);
set_bit(CF_WRITE_PENDING, &con->flags);
return; return;
} }
...@@ -1325,37 +1240,11 @@ static void init_local(void) ...@@ -1325,37 +1240,11 @@ static void init_local(void)
} }
} }
/* Bind to an IP address. SCTP allows multiple address so it can do
multi-homing */
static int add_sctp_bind_addr(struct connection *sctp_con,
struct sockaddr_storage *addr,
int addr_len, int num)
{
int result = 0;
if (num == 1)
result = kernel_bind(sctp_con->sock,
(struct sockaddr *) addr,
addr_len);
else
result = kernel_setsockopt(sctp_con->sock, SOL_SCTP,
SCTP_SOCKOPT_BINDX_ADD,
(char *)addr, addr_len);
if (result < 0)
log_print("Can't bind to port %d addr number %d",
dlm_config.ci_tcp_port, num);
return result;
}
/* Initialise SCTP socket and bind to all interfaces */ /* Initialise SCTP socket and bind to all interfaces */
static int sctp_listen_for_all(void) static int sctp_listen_for_all(void)
{ {
struct socket *sock = NULL; struct socket *sock = NULL;
struct sockaddr_storage localaddr; int result = -EINVAL;
struct sctp_event_subscribe subscribe;
int result = -EINVAL, num = 1, i, addr_len;
struct connection *con = nodeid2con(0, GFP_NOFS); struct connection *con = nodeid2con(0, GFP_NOFS);
int bufsize = NEEDED_RMEM; int bufsize = NEEDED_RMEM;
int one = 1; int one = 1;
...@@ -1366,33 +1255,17 @@ static int sctp_listen_for_all(void) ...@@ -1366,33 +1255,17 @@ static int sctp_listen_for_all(void)
log_print("Using SCTP for communications"); log_print("Using SCTP for communications");
result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
SOCK_SEQPACKET, IPPROTO_SCTP, &sock); SOCK_STREAM, IPPROTO_SCTP, &sock);
if (result < 0) { if (result < 0) {
log_print("Can't create comms socket, check SCTP is loaded"); log_print("Can't create comms socket, check SCTP is loaded");
goto out; goto out;
} }
/* Listen for events */
memset(&subscribe, 0, sizeof(subscribe));
subscribe.sctp_data_io_event = 1;
subscribe.sctp_association_event = 1;
subscribe.sctp_send_failure_event = 1;
subscribe.sctp_shutdown_event = 1;
subscribe.sctp_partial_delivery_event = 1;
result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE, result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
(char *)&bufsize, sizeof(bufsize)); (char *)&bufsize, sizeof(bufsize));
if (result) if (result)
log_print("Error increasing buffer space on socket %d", result); log_print("Error increasing buffer space on socket %d", result);
result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
(char *)&subscribe, sizeof(subscribe));
if (result < 0) {
log_print("Failed to set SCTP_EVENTS on socket: result=%d",
result);
goto create_delsock;
}
result = kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one, result = kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one,
sizeof(one)); sizeof(one));
if (result < 0) if (result < 0)
...@@ -1402,19 +1275,12 @@ static int sctp_listen_for_all(void) ...@@ -1402,19 +1275,12 @@ static int sctp_listen_for_all(void)
sock->sk->sk_user_data = con; sock->sk->sk_user_data = con;
con->sock = sock; con->sock = sock;
con->sock->sk->sk_data_ready = lowcomms_data_ready; con->sock->sk->sk_data_ready = lowcomms_data_ready;
con->rx_action = receive_from_sock; con->rx_action = sctp_accept_from_sock;
con->connect_action = sctp_init_assoc; con->connect_action = sctp_connect_to_sock;
/* Bind to all interfaces. */ /* Bind to all addresses. */
for (i = 0; i < dlm_local_count; i++) { if (sctp_bind_addrs(con, dlm_config.ci_tcp_port))
memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr)); goto create_delsock;
make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);
result = add_sctp_bind_addr(con, &localaddr, addr_len, num);
if (result)
goto create_delsock;
++num;
}
result = sock->ops->listen(sock, 5); result = sock->ops->listen(sock, 5);
if (result < 0) { if (result < 0) {
...@@ -1612,14 +1478,13 @@ static void send_to_sock(struct connection *con) ...@@ -1612,14 +1478,13 @@ static void send_to_sock(struct connection *con)
send_error: send_error:
mutex_unlock(&con->sock_mutex); mutex_unlock(&con->sock_mutex);
close_connection(con, false); close_connection(con, false, false, true);
lowcomms_connect_sock(con); lowcomms_connect_sock(con);
return; return;
out_connect: out_connect:
mutex_unlock(&con->sock_mutex); mutex_unlock(&con->sock_mutex);
if (!test_bit(CF_INIT_PENDING, &con->flags)) lowcomms_connect_sock(con);
lowcomms_connect_sock(con);
} }
static void clean_one_writequeue(struct connection *con) static void clean_one_writequeue(struct connection *con)
...@@ -1644,15 +1509,9 @@ int dlm_lowcomms_close(int nodeid) ...@@ -1644,15 +1509,9 @@ int dlm_lowcomms_close(int nodeid)
log_print("closing connection to node %d", nodeid); log_print("closing connection to node %d", nodeid);
con = nodeid2con(nodeid, 0); con = nodeid2con(nodeid, 0);
if (con) { if (con) {
clear_bit(CF_CONNECT_PENDING, &con->flags);
clear_bit(CF_WRITE_PENDING, &con->flags);
set_bit(CF_CLOSE, &con->flags); set_bit(CF_CLOSE, &con->flags);
if (cancel_work_sync(&con->swork)) close_connection(con, true, true, true);
log_print("canceled swork for node %d", nodeid);
if (cancel_work_sync(&con->rwork))
log_print("canceled rwork for node %d", nodeid);
clean_one_writequeue(con); clean_one_writequeue(con);
close_connection(con, true);
} }
spin_lock(&dlm_node_addrs_spin); spin_lock(&dlm_node_addrs_spin);
...@@ -1685,10 +1544,8 @@ static void process_send_sockets(struct work_struct *work) ...@@ -1685,10 +1544,8 @@ static void process_send_sockets(struct work_struct *work)
{ {
struct connection *con = container_of(work, struct connection, swork); struct connection *con = container_of(work, struct connection, swork);
if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) { if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags))
con->connect_action(con); con->connect_action(con);
set_bit(CF_WRITE_PENDING, &con->flags);
}
if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags)) if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags))
send_to_sock(con); send_to_sock(con);
} }
...@@ -1735,7 +1592,7 @@ static void stop_conn(struct connection *con) ...@@ -1735,7 +1592,7 @@ static void stop_conn(struct connection *con)
static void free_conn(struct connection *con) static void free_conn(struct connection *con)
{ {
close_connection(con, true); close_connection(con, true, true, true);
if (con->othercon) if (con->othercon)
kmem_cache_free(con_cache, con->othercon); kmem_cache_free(con_cache, con->othercon);
hlist_del(&con->list); hlist_del(&con->list);
...@@ -1806,7 +1663,7 @@ int dlm_lowcomms_start(void) ...@@ -1806,7 +1663,7 @@ int dlm_lowcomms_start(void)
dlm_allow_conn = 0; dlm_allow_conn = 0;
con = nodeid2con(0,0); con = nodeid2con(0,0);
if (con) { if (con) {
close_connection(con, false); close_connection(con, false, true, true);
kmem_cache_free(con_cache, con); kmem_cache_free(con_cache, con);
} }
fail_destroy: fail_destroy:
......
...@@ -782,6 +782,7 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count, ...@@ -782,6 +782,7 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
DECLARE_WAITQUEUE(wait, current); DECLARE_WAITQUEUE(wait, current);
struct dlm_callback cb; struct dlm_callback cb;
int rv, resid, copy_lvb = 0; int rv, resid, copy_lvb = 0;
int old_mode, new_mode;
if (count == sizeof(struct dlm_device_version)) { if (count == sizeof(struct dlm_device_version)) {
rv = copy_version_to_user(buf, count); rv = copy_version_to_user(buf, count);
...@@ -838,6 +839,9 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count, ...@@ -838,6 +839,9 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
lkb = list_entry(proc->asts.next, struct dlm_lkb, lkb_cb_list); lkb = list_entry(proc->asts.next, struct dlm_lkb, lkb_cb_list);
/* rem_lkb_callback sets a new lkb_last_cast */
old_mode = lkb->lkb_last_cast.mode;
rv = dlm_rem_lkb_callback(lkb->lkb_resource->res_ls, lkb, &cb, &resid); rv = dlm_rem_lkb_callback(lkb->lkb_resource->res_ls, lkb, &cb, &resid);
if (rv < 0) { if (rv < 0) {
/* this shouldn't happen; lkb should have been removed from /* this shouldn't happen; lkb should have been removed from
...@@ -861,9 +865,6 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count, ...@@ -861,9 +865,6 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
} }
if (cb.flags & DLM_CB_CAST) { if (cb.flags & DLM_CB_CAST) {
int old_mode, new_mode;
old_mode = lkb->lkb_last_cast.mode;
new_mode = cb.mode; new_mode = cb.mode;
if (!cb.sb_status && lkb->lkb_lksb->sb_lvbptr && if (!cb.sb_status && lkb->lkb_lksb->sb_lvbptr &&
......
...@@ -26,7 +26,7 @@ ...@@ -26,7 +26,7 @@
/* Version of the device interface */ /* Version of the device interface */
#define DLM_DEVICE_VERSION_MAJOR 6 #define DLM_DEVICE_VERSION_MAJOR 6
#define DLM_DEVICE_VERSION_MINOR 0 #define DLM_DEVICE_VERSION_MINOR 0
#define DLM_DEVICE_VERSION_PATCH 1 #define DLM_DEVICE_VERSION_PATCH 2
/* struct passed to the lock write */ /* struct passed to the lock write */
struct dlm_lock_params { struct dlm_lock_params {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment