Commit 40ba3cdf authored by Erik Hugne's avatar Erik Hugne Committed by David S. Miller

tipc: message reassembly using fragment chain

When the first fragment of a long data data message is received on a link, a
reassembly buffer large enough to hold the data from this and all subsequent
fragments of the message is allocated. The payload of each new fragment is
copied into this buffer upon arrival. When the last fragment is received, the
reassembled message is delivered upwards to the port/socket layer.

Not only is this an inefficient approach, but it may also cause bursts of
reassembly failures in low memory situations. since we may fail to allocate
the necessary large buffer in the first place. Furthermore, after 100 subsequent
such failures the link will be reset, something that in reality aggravates the
situation.

To remedy this problem, this patch introduces a different approach. Instead of
allocating a big reassembly buffer, we now append the arriving fragments
to a reassembly chain on the link, and deliver the whole chain up to the
socket layer once the last fragment has been received. This is safe because
the retransmission layer of a TIPC link always delivers packets in strict
uninterrupted order, to the reassembly layer as to all other upper layers.
Hence there can never be more than one fragment chain pending reassembly at
any given time in a link, and we can trust (but still verify) that the
fragments will be chained up in the correct order.
Signed-off-by: default avatarErik Hugne <erik.hugne@ericsson.com>
Reviewed-by: default avatarPaul Gortmaker <paul.gortmaker@windriver.com>
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 528f6f4b
...@@ -480,15 +480,19 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf) ...@@ -480,15 +480,19 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
tipc_node_unlock(node); tipc_node_unlock(node);
tipc_link_recv_bundle(buf); tipc_link_recv_bundle(buf);
} else if (msg_user(msg) == MSG_FRAGMENTER) { } else if (msg_user(msg) == MSG_FRAGMENTER) {
int ret = tipc_link_recv_fragment(&node->bclink.defragm, int ret;
&buf, &msg); ret = tipc_link_recv_fragment(&node->bclink.reasm_head,
if (ret < 0) &node->bclink.reasm_tail,
&buf);
if (ret == LINK_REASM_ERROR)
goto unlock; goto unlock;
spin_lock_bh(&bc_lock); spin_lock_bh(&bc_lock);
bclink_accept_pkt(node, seqno); bclink_accept_pkt(node, seqno);
bcl->stats.recv_fragments++; bcl->stats.recv_fragments++;
if (ret > 0) { if (ret == LINK_REASM_COMPLETE) {
bcl->stats.recv_fragmented++; bcl->stats.recv_fragmented++;
/* Point msg to inner header */
msg = buf_msg(buf);
spin_unlock_bh(&bc_lock); spin_unlock_bh(&bc_lock);
goto receive; goto receive;
} }
......
...@@ -404,15 +404,9 @@ static void link_release_outqueue(struct tipc_link *l_ptr) ...@@ -404,15 +404,9 @@ static void link_release_outqueue(struct tipc_link *l_ptr)
*/ */
void tipc_link_reset_fragments(struct tipc_link *l_ptr) void tipc_link_reset_fragments(struct tipc_link *l_ptr)
{ {
struct sk_buff *buf = l_ptr->defragm_buf; kfree_skb(l_ptr->reasm_head);
struct sk_buff *next; l_ptr->reasm_head = NULL;
l_ptr->reasm_tail = NULL;
while (buf) {
next = buf->next;
kfree_skb(buf);
buf = next;
}
l_ptr->defragm_buf = NULL;
} }
/** /**
...@@ -1649,13 +1643,15 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr) ...@@ -1649,13 +1643,15 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
continue; continue;
case MSG_FRAGMENTER: case MSG_FRAGMENTER:
l_ptr->stats.recv_fragments++; l_ptr->stats.recv_fragments++;
ret = tipc_link_recv_fragment(&l_ptr->defragm_buf, ret = tipc_link_recv_fragment(&l_ptr->reasm_head,
&buf, &msg); &l_ptr->reasm_tail,
if (ret == 1) { &buf);
if (ret == LINK_REASM_COMPLETE) {
l_ptr->stats.recv_fragmented++; l_ptr->stats.recv_fragmented++;
msg = buf_msg(buf);
goto deliver; goto deliver;
} }
if (ret == -1) if (ret == LINK_REASM_ERROR)
l_ptr->next_in_no--; l_ptr->next_in_no--;
tipc_node_unlock(n_ptr); tipc_node_unlock(n_ptr);
continue; continue;
...@@ -2342,115 +2338,48 @@ static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf) ...@@ -2342,115 +2338,48 @@ static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
return dsz; return dsz;
} }
/*
* A pending message being re-assembled must store certain values
* to handle subsequent fragments correctly. The following functions
* help storing these values in unused, available fields in the
* pending message. This makes dynamic memory allocation unnecessary.
*/
static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno)
{
msg_set_seqno(buf_msg(buf), seqno);
}
static u32 get_fragm_size(struct sk_buff *buf)
{
return msg_ack(buf_msg(buf));
}
static void set_fragm_size(struct sk_buff *buf, u32 sz)
{
msg_set_ack(buf_msg(buf), sz);
}
static u32 get_expected_frags(struct sk_buff *buf)
{
return msg_bcast_ack(buf_msg(buf));
}
static void set_expected_frags(struct sk_buff *buf, u32 exp)
{
msg_set_bcast_ack(buf_msg(buf), exp);
}
/* /*
* tipc_link_recv_fragment(): Called with node lock on. Returns * tipc_link_recv_fragment(): Called with node lock on. Returns
* the reassembled buffer if message is complete. * the reassembled buffer if message is complete.
*/ */
int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb, int tipc_link_recv_fragment(struct sk_buff **head, struct sk_buff **tail,
struct tipc_msg **m) struct sk_buff **fbuf)
{ {
struct sk_buff *prev = NULL; struct sk_buff *frag = *fbuf;
struct sk_buff *fbuf = *fb; struct tipc_msg *msg = buf_msg(frag);
struct tipc_msg *fragm = buf_msg(fbuf); u32 fragid = msg_type(msg);
struct sk_buff *pbuf = *pending; bool headstolen;
u32 long_msg_seq_no = msg_long_msgno(fragm); int delta;
*fb = NULL; skb_pull(frag, msg_hdr_sz(msg));
if (fragid == FIRST_FRAGMENT) {
/* Is there an incomplete message waiting for this fragment? */ if (*head || skb_unclone(frag, GFP_ATOMIC))
while (pbuf && ((buf_seqno(pbuf) != long_msg_seq_no) || goto out_free;
(msg_orignode(fragm) != msg_orignode(buf_msg(pbuf))))) { *head = frag;
prev = pbuf; skb_frag_list_init(*head);
pbuf = pbuf->next;
}
if (!pbuf && (msg_type(fragm) == FIRST_FRAGMENT)) {
struct tipc_msg *imsg = (struct tipc_msg *)msg_data(fragm);
u32 msg_sz = msg_size(imsg);
u32 fragm_sz = msg_data_sz(fragm);
u32 exp_fragm_cnt;
u32 max = TIPC_MAX_USER_MSG_SIZE + NAMED_H_SIZE;
if (msg_type(imsg) == TIPC_MCAST_MSG)
max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE;
if (fragm_sz == 0 || msg_size(imsg) > max) {
kfree_skb(fbuf);
return 0; return 0;
} } else if (skb_try_coalesce(*head, frag, &headstolen, &delta)) {
exp_fragm_cnt = msg_sz / fragm_sz + !!(msg_sz % fragm_sz); kfree_skb_partial(frag, headstolen);
pbuf = tipc_buf_acquire(msg_size(imsg));
if (pbuf != NULL) {
pbuf->next = *pending;
*pending = pbuf;
skb_copy_to_linear_data(pbuf, imsg,
msg_data_sz(fragm));
/* Prepare buffer for subsequent fragments. */
set_long_msg_seqno(pbuf, long_msg_seq_no);
set_fragm_size(pbuf, fragm_sz);
set_expected_frags(pbuf, exp_fragm_cnt - 1);
} else { } else {
pr_debug("Link unable to reassemble fragmented message\n"); if (!*head)
kfree_skb(fbuf); goto out_free;
return -1; if (!skb_has_frag_list(*head))
} skb_shinfo(*head)->frag_list = frag;
kfree_skb(fbuf);
return 0;
} else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) {
u32 dsz = msg_data_sz(fragm);
u32 fsz = get_fragm_size(pbuf);
u32 crs = ((msg_fragm_no(fragm) - 1) * fsz);
u32 exp_frags = get_expected_frags(pbuf) - 1;
skb_copy_to_linear_data_offset(pbuf, crs,
msg_data(fragm), dsz);
kfree_skb(fbuf);
/* Is message complete? */
if (exp_frags == 0) {
if (prev)
prev->next = pbuf->next;
else else
*pending = pbuf->next; (*tail)->next = frag;
msg_reset_reroute_cnt(buf_msg(pbuf)); *tail = frag;
*fb = pbuf; (*head)->truesize += frag->truesize;
*m = buf_msg(pbuf);
return 1;
} }
set_expected_frags(pbuf, exp_frags); if (fragid == LAST_FRAGMENT) {
return 0; *fbuf = *head;
*tail = *head = NULL;
return LINK_REASM_COMPLETE;
} }
kfree_skb(fbuf);
return 0; return 0;
out_free:
pr_warn_ratelimited("Link unable to reassemble fragmented message\n");
kfree_skb(*fbuf);
return LINK_REASM_ERROR;
} }
static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance) static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)
......
...@@ -40,6 +40,12 @@ ...@@ -40,6 +40,12 @@
#include "msg.h" #include "msg.h"
#include "node.h" #include "node.h"
/*
* Link reassembly status codes
*/
#define LINK_REASM_ERROR -1
#define LINK_REASM_COMPLETE 1
/* /*
* Out-of-range value for link sequence numbers * Out-of-range value for link sequence numbers
*/ */
...@@ -134,7 +140,8 @@ struct tipc_stats { ...@@ -134,7 +140,8 @@ struct tipc_stats {
* @next_out: ptr to first unsent outbound message in queue * @next_out: ptr to first unsent outbound message in queue
* @waiting_ports: linked list of ports waiting for link congestion to abate * @waiting_ports: linked list of ports waiting for link congestion to abate
* @long_msg_seq_no: next identifier to use for outbound fragmented messages * @long_msg_seq_no: next identifier to use for outbound fragmented messages
* @defragm_buf: list of partially reassembled inbound message fragments * @reasm_head: list head of partially reassembled inbound message fragments
* @reasm_tail: last fragment received
* @stats: collects statistics regarding link activity * @stats: collects statistics regarding link activity
*/ */
struct tipc_link { struct tipc_link {
...@@ -196,9 +203,10 @@ struct tipc_link { ...@@ -196,9 +203,10 @@ struct tipc_link {
struct sk_buff *next_out; struct sk_buff *next_out;
struct list_head waiting_ports; struct list_head waiting_ports;
/* Fragmentation/defragmentation */ /* Fragmentation/reassembly */
u32 long_msg_seq_no; u32 long_msg_seq_no;
struct sk_buff *defragm_buf; struct sk_buff *reasm_head;
struct sk_buff *reasm_tail;
/* Statistics */ /* Statistics */
struct tipc_stats stats; struct tipc_stats stats;
...@@ -229,9 +237,9 @@ int tipc_link_send_sections_fast(struct tipc_port *sender, ...@@ -229,9 +237,9 @@ int tipc_link_send_sections_fast(struct tipc_port *sender,
struct iovec const *msg_sect, struct iovec const *msg_sect,
unsigned int len, u32 destnode); unsigned int len, u32 destnode);
void tipc_link_recv_bundle(struct sk_buff *buf); void tipc_link_recv_bundle(struct sk_buff *buf);
int tipc_link_recv_fragment(struct sk_buff **pending, int tipc_link_recv_fragment(struct sk_buff **reasm_head,
struct sk_buff **fb, struct sk_buff **reasm_tail,
struct tipc_msg **msg); struct sk_buff **fbuf);
void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, int prob, void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, int prob,
u32 gap, u32 tolerance, u32 priority, u32 gap, u32 tolerance, u32 priority,
u32 acked_mtu); u32 acked_mtu);
......
...@@ -554,12 +554,6 @@ static inline void msg_set_last_bcast(struct tipc_msg *m, u32 n) ...@@ -554,12 +554,6 @@ static inline void msg_set_last_bcast(struct tipc_msg *m, u32 n)
msg_set_bits(m, 4, 16, 0xffff, n); msg_set_bits(m, 4, 16, 0xffff, n);
} }
static inline u32 msg_fragm_no(struct tipc_msg *m)
{
return msg_bits(m, 4, 16, 0xffff);
}
static inline void msg_set_fragm_no(struct tipc_msg *m, u32 n) static inline void msg_set_fragm_no(struct tipc_msg *m, u32 n)
{ {
msg_set_bits(m, 4, 16, 0xffff, n); msg_set_bits(m, 4, 16, 0xffff, n);
...@@ -576,12 +570,6 @@ static inline void msg_set_next_sent(struct tipc_msg *m, u32 n) ...@@ -576,12 +570,6 @@ static inline void msg_set_next_sent(struct tipc_msg *m, u32 n)
msg_set_bits(m, 4, 0, 0xffff, n); msg_set_bits(m, 4, 0, 0xffff, n);
} }
static inline u32 msg_long_msgno(struct tipc_msg *m)
{
return msg_bits(m, 4, 0, 0xffff);
}
static inline void msg_set_long_msgno(struct tipc_msg *m, u32 n) static inline void msg_set_long_msgno(struct tipc_msg *m, u32 n)
{ {
msg_set_bits(m, 4, 0, 0xffff, n); msg_set_bits(m, 4, 0, 0xffff, n);
......
...@@ -298,9 +298,10 @@ static void node_lost_contact(struct tipc_node *n_ptr) ...@@ -298,9 +298,10 @@ static void node_lost_contact(struct tipc_node *n_ptr)
} }
n_ptr->bclink.deferred_size = 0; n_ptr->bclink.deferred_size = 0;
if (n_ptr->bclink.defragm) { if (n_ptr->bclink.reasm_head) {
kfree_skb(n_ptr->bclink.defragm); kfree_skb(n_ptr->bclink.reasm_head);
n_ptr->bclink.defragm = NULL; n_ptr->bclink.reasm_head = NULL;
n_ptr->bclink.reasm_tail = NULL;
} }
tipc_bclink_remove_node(n_ptr->addr); tipc_bclink_remove_node(n_ptr->addr);
......
...@@ -74,7 +74,8 @@ ...@@ -74,7 +74,8 @@
* @deferred_size: number of OOS b'cast messages in deferred queue * @deferred_size: number of OOS b'cast messages in deferred queue
* @deferred_head: oldest OOS b'cast message received from node * @deferred_head: oldest OOS b'cast message received from node
* @deferred_tail: newest OOS b'cast message received from node * @deferred_tail: newest OOS b'cast message received from node
* @defragm: list of partially reassembled b'cast message fragments from node * @reasm_head: broadcast reassembly queue head from node
* @reasm_tail: last broadcast fragment received from node
* @recv_permitted: true if node is allowed to receive b'cast messages * @recv_permitted: true if node is allowed to receive b'cast messages
*/ */
struct tipc_node { struct tipc_node {
...@@ -98,7 +99,8 @@ struct tipc_node { ...@@ -98,7 +99,8 @@ struct tipc_node {
u32 deferred_size; u32 deferred_size;
struct sk_buff *deferred_head; struct sk_buff *deferred_head;
struct sk_buff *deferred_tail; struct sk_buff *deferred_tail;
struct sk_buff *defragm; struct sk_buff *reasm_head;
struct sk_buff *reasm_tail;
bool recv_permitted; bool recv_permitted;
} bclink; } bclink;
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment