Commit 29042e19 authored by Jon Paul Maloy's avatar Jon Paul Maloy Committed by David S. Miller

tipc: let function tipc_msg_reverse() expand header when needed

The shortest TIPC message header, for cluster local CONNECTED messages,
is 24 bytes long. With this format, the fields "dest_node" and
"orig_node" are optimized away, since they in reality are redundant
in this particular case.

However, the absence of these fields leads to code inconsistencies
that are difficult to handle in some cases, especially when we need
to reverse or reject messages at the socket layer.

In this commit, we concentrate the handling of the absent fields
to one place, by letting the function tipc_msg_reverse() reallocate
the buffer and expand the header to 32 bytes when necessary. This
means that the socket code now can assume that the two previously
absent fields are present in the header when a message needs to be
rejected. This opens up for some further simplifications of the
socket code.
Reviewed-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent a69e5a0d
...@@ -463,43 +463,58 @@ bool tipc_msg_make_bundle(struct sk_buff **skb, struct tipc_msg *msg, ...@@ -463,43 +463,58 @@ bool tipc_msg_make_bundle(struct sk_buff **skb, struct tipc_msg *msg,
/** /**
* tipc_msg_reverse(): swap source and destination addresses and add error code * tipc_msg_reverse(): swap source and destination addresses and add error code
* @buf: buffer containing message to be reversed * @own_node: originating node id for reversed message
* @dnode: return value: node where to send message after reversal * @skb: buffer containing message to be reversed; may be replaced.
* @err: error code to be set in message * @err: error code to be set in message, if any
* Consumes buffer if failure * Consumes buffer at failure
* Returns true if success, otherwise false * Returns true if success, otherwise false
*/ */
bool tipc_msg_reverse(u32 own_addr, struct sk_buff *buf, u32 *dnode, bool tipc_msg_reverse(u32 own_node, struct sk_buff **skb, u32 *dnode, int err)
int err)
{ {
struct tipc_msg *msg = buf_msg(buf); struct sk_buff *_skb = *skb;
struct tipc_msg *hdr = buf_msg(_skb);
struct tipc_msg ohdr; struct tipc_msg ohdr;
uint rdsz = min_t(uint, msg_data_sz(msg), MAX_FORWARD_SIZE); int dlen = min_t(uint, msg_data_sz(hdr), MAX_FORWARD_SIZE);
if (skb_linearize(buf)) if (skb_linearize(_skb))
goto exit; goto exit;
msg = buf_msg(buf); hdr = buf_msg(_skb);
if (msg_dest_droppable(msg)) if (msg_dest_droppable(hdr))
goto exit; goto exit;
if (msg_errcode(msg)) if (msg_errcode(hdr))
goto exit; goto exit;
memcpy(&ohdr, msg, msg_hdr_sz(msg));
msg_set_errcode(msg, err); /* Take a copy of original header before altering message */
msg_set_origport(msg, msg_destport(&ohdr)); memcpy(&ohdr, hdr, msg_hdr_sz(hdr));
msg_set_destport(msg, msg_origport(&ohdr));
msg_set_prevnode(msg, own_addr); /* Never return SHORT header; expand by replacing buffer if necessary */
if (!msg_short(msg)) { if (msg_short(hdr)) {
msg_set_orignode(msg, msg_destnode(&ohdr)); *skb = tipc_buf_acquire(BASIC_H_SIZE + dlen);
msg_set_destnode(msg, msg_orignode(&ohdr)); if (!*skb)
goto exit;
memcpy((*skb)->data + BASIC_H_SIZE, msg_data(hdr), dlen);
kfree_skb(_skb);
_skb = *skb;
hdr = buf_msg(_skb);
memcpy(hdr, &ohdr, BASIC_H_SIZE);
msg_set_hdr_sz(hdr, BASIC_H_SIZE);
} }
msg_set_size(msg, msg_hdr_sz(msg) + rdsz);
skb_trim(buf, msg_size(msg)); /* Now reverse the concerned fields */
skb_orphan(buf); msg_set_errcode(hdr, err);
*dnode = msg_orignode(&ohdr); msg_set_origport(hdr, msg_destport(&ohdr));
msg_set_destport(hdr, msg_origport(&ohdr));
msg_set_destnode(hdr, msg_prevnode(&ohdr));
msg_set_prevnode(hdr, own_node);
msg_set_orignode(hdr, own_node);
msg_set_size(hdr, msg_hdr_sz(hdr) + dlen);
*dnode = msg_destnode(hdr);
skb_trim(_skb, msg_size(hdr));
skb_orphan(_skb);
return true; return true;
exit: exit:
kfree_skb(buf); kfree_skb(_skb);
*dnode = 0; *skb = NULL;
return false; return false;
} }
......
...@@ -785,8 +785,7 @@ static inline bool msg_peer_is_up(struct tipc_msg *m) ...@@ -785,8 +785,7 @@ static inline bool msg_peer_is_up(struct tipc_msg *m)
struct sk_buff *tipc_buf_acquire(u32 size); struct sk_buff *tipc_buf_acquire(u32 size);
bool tipc_msg_validate(struct sk_buff *skb); bool tipc_msg_validate(struct sk_buff *skb);
bool tipc_msg_reverse(u32 own_addr, struct sk_buff *buf, u32 *dnode, bool tipc_msg_reverse(u32 own_addr, struct sk_buff **skb, u32 *dnode, int err);
int err);
void tipc_msg_init(u32 own_addr, struct tipc_msg *m, u32 user, u32 type, void tipc_msg_init(u32 own_addr, struct tipc_msg *m, u32 user, u32 type,
u32 hsize, u32 destnode); u32 hsize, u32 destnode);
struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz, struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz,
......
...@@ -260,7 +260,7 @@ static void tsk_rej_rx_queue(struct sock *sk) ...@@ -260,7 +260,7 @@ static void tsk_rej_rx_queue(struct sock *sk)
u32 own_node = tsk_own_node(tipc_sk(sk)); u32 own_node = tsk_own_node(tipc_sk(sk));
while ((skb = __skb_dequeue(&sk->sk_receive_queue))) { while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_ERR_NO_PORT)) if (tipc_msg_reverse(own_node, &skb, &dnode, TIPC_ERR_NO_PORT))
tipc_node_xmit_skb(sock_net(sk), skb, dnode, 0); tipc_node_xmit_skb(sock_net(sk), skb, dnode, 0);
} }
} }
...@@ -441,7 +441,7 @@ static int tipc_release(struct socket *sock) ...@@ -441,7 +441,7 @@ static int tipc_release(struct socket *sock)
tsk->connected = 0; tsk->connected = 0;
tipc_node_remove_conn(net, dnode, tsk->portid); tipc_node_remove_conn(net, dnode, tsk->portid);
} }
if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, if (tipc_msg_reverse(tsk_own_node(tsk), &skb, &dnode,
TIPC_ERR_NO_PORT)) TIPC_ERR_NO_PORT))
tipc_node_xmit_skb(net, skb, dnode, 0); tipc_node_xmit_skb(net, skb, dnode, 0);
} }
...@@ -784,7 +784,7 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff **skb) ...@@ -784,7 +784,7 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff **skb)
if (conn_cong) if (conn_cong)
tsk->sk.sk_write_space(&tsk->sk); tsk->sk.sk_write_space(&tsk->sk);
} else if (msg_type(msg) == CONN_PROBE) { } else if (msg_type(msg) == CONN_PROBE) {
if (tipc_msg_reverse(own_node, *skb, &dnode, TIPC_OK)) { if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_OK)) {
msg_set_type(msg, CONN_PROBE_REPLY); msg_set_type(msg, CONN_PROBE_REPLY);
return; return;
} }
...@@ -1702,7 +1702,7 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb) ...@@ -1702,7 +1702,7 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
atomic_add(truesize, dcnt); atomic_add(truesize, dcnt);
return 0; return 0;
} }
if (!err || tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -err)) if (!err || tipc_msg_reverse(tsk_own_node(tsk), &skb, &dnode, -err))
tipc_node_xmit_skb(net, skb, dnode, tsk->portid); tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
return 0; return 0;
} }
...@@ -1796,7 +1796,7 @@ int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq) ...@@ -1796,7 +1796,7 @@ int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
goto xmit; goto xmit;
} }
tn = net_generic(net, tipc_net_id); tn = net_generic(net, tipc_net_id);
if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err)) if (!tipc_msg_reverse(tn->own_addr, &skb, &dnode, -err))
continue; continue;
xmit: xmit:
tipc_node_xmit_skb(net, skb, dnode, dport); tipc_node_xmit_skb(net, skb, dnode, dport);
...@@ -2090,7 +2090,7 @@ static int tipc_shutdown(struct socket *sock, int how) ...@@ -2090,7 +2090,7 @@ static int tipc_shutdown(struct socket *sock, int how)
kfree_skb(skb); kfree_skb(skb);
goto restart; goto restart;
} }
if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, if (tipc_msg_reverse(tsk_own_node(tsk), &skb, &dnode,
TIPC_CONN_SHUTDOWN)) TIPC_CONN_SHUTDOWN))
tipc_node_xmit_skb(net, skb, dnode, tipc_node_xmit_skb(net, skb, dnode,
tsk->portid); tsk->portid);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment