Commit b91c06c5 authored by David S. Miller

Merge branch 'mptcp-introduce-support-for-real-multipath-xmit'

Paolo Abeni says:

====================
mptcp: introduce support for real multipath xmit

This series enables the MPTCP socket to transmit data on multiple subflows
concurrently in a load-balancing scenario.

First, the receive code path is refactored to better deal with out-of-order
data (patches 1-7). An RB-tree is introduced to queue MPTCP-level out-of-order
data, closely resembling the TCP-level OoO handling.
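
As background for the queuing changes: MPTCP orders data by 64-bit data
sequence numbers, so the RB-tree is keyed with wraparound-safe comparisons.
A minimal stand-alone sketch of the helpers involved (they mirror the
before64()/after64() definitions visible in the protocol.h hunk below; the
userspace includes are ours):

  #include <stdbool.h>
  #include <stdint.h>

  /* Wraparound-safe ordering of 64-bit MPTCP data sequence numbers,
   * mirroring before64()/after64() from net/mptcp/protocol.h. The OoO
   * RB-tree is ordered by each skb's map_seq using this predicate.
   */
  static inline bool before64(uint64_t seq1, uint64_t seq2)
  {
      return (int64_t)(seq1 - seq2) < 0;
  }

  #define after64(seq2, seq1) before64(seq1, seq2)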

When data is sent on multiple subflows, the peer can easily observe
out-of-order - i.e. "future" - data at the MPTCP level, especially if the
subflows' speed, delay, or jitter are not symmetric.

The other major change regards the netlink PM, which is extended in patches
9-11 to allow creating non-backup subflows - e.g. what the new self-test does
with './pm_nl_ctl add 10.0.2.1 dev ns1eth2 flags subflow'.

There are a few smaller additions, like the introduction of OoO-related MIBs,
send-buffer autotuning, and better ack handling.
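
For reference, the send-buffer autotuning is a one-way ratchet applied at
transmit time: the MPTCP-level sk_sndbuf grows to the largest subflow send
window reported by the subflow scheduler, unless the user pinned the size
with SO_SNDBUF. A stand-alone sketch of that step, with simplified types
(the kernel does this inline in mptcp_sendmsg(), see the protocol.c hunk
below; the struct and function names here are ours):

  #include <stdint.h>

  #define SOCK_SNDBUF_LOCK 1u  /* same flag bit as include/net/sock.h */

  struct sk_sketch {
      unsigned int sk_userlocks;  /* SO_SNDBUF/SO_RCVBUF lock bits */
      uint32_t sk_sndbuf;         /* current send buffer size */
  };

  /* Grow the send buffer to the largest subflow snd_wnd observed,
   * never shrinking it and honouring a user-set SO_SNDBUF.
   */
  static void mptcp_autotune_sndbuf(struct sk_sketch *sk, uint32_t sndbuf)
  {
      if (!(sk->sk_userlocks & SOCK_SNDBUF_LOCK) && sndbuf > sk->sk_sndbuf)
          sk->sk_sndbuf = sndbuf;
  }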

Finally, a bunch of new self-tests are introduced. The new feature is
exercised by checking that the bandwidth used by an MPTCP socket spanning
multiple subflows matches the aggregated bandwidth of the underlying links.
Low-bandwidth virtual links are used, so that the tests are not CPU-bound.
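
To make the pass criterion concrete, the expected transfer time follows the
arithmetic used by simult_flows.sh below: payload bits divided by the sum of
the two link rates, plus a fixed allowance for the MP_JOIN handshake, with
10% of slack on top. A stand-alone sketch (the function name is ours; the
constants are taken from the script):

  #include <stdio.h>

  /* time_ms = size * 8 * 1000 / ((rate1 + rate2) * 1024 * 1024) + 1350,
   * matching the computation in run_test() of simult_flows.sh.
   */
  static long long expected_time_ms(long long size_bytes, int rate1_mbit,
                                    int rate2_mbit)
  {
      long long aggregated = (long long)(rate1_mbit + rate2_mbit) * 1024 * 1024;

      return size_bytes * 8 * 1000 / aggregated + 1350;
  }

  int main(void)
  {
      /* the script moves 2048 * 4096 bytes over two 10 Mbit links:
       * expected ~4550 ms, max allowed is 110% of that, ~5005 ms
       */
      long long t = expected_time_ms(2048LL * 4096, 10, 10);

      printf("expected %lld ms, max %lld ms\n", t, t * 11 / 10);
      return 0;
  }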

v1 -> v2:
  - fix 32 bit build breakage
  - fix a bunch of checkpatch issues
====================
Signed-off-by: David S. Miller <davem@davemloft.net>
parents 26cdb8f7 1a418cb8
@@ -1414,6 +1414,8 @@ static inline int tcp_full_space(const struct sock *sk)
     return tcp_win_from_space(sk, READ_ONCE(sk->sk_rcvbuf));
 }
 
+void tcp_cleanup_rbuf(struct sock *sk, int copied);
+
 /* We provision sk_rcvbuf around 200% of sk_rcvlowat.
  * If 87.5 % (7/8) of the space has been consumed, we want to override
  * SO_RCVLOWAT constraint, since we are receiving skbs with too small
...
@@ -1527,7 +1527,7 @@ static int tcp_peek_sndq(struct sock *sk, struct msghdr *msg, int len)
  * calculation of whether or not we must ACK for the sake of
  * a window update.
  */
-static void tcp_cleanup_rbuf(struct sock *sk, int copied)
+void tcp_cleanup_rbuf(struct sock *sk, int copied)
 {
     struct tcp_sock *tp = tcp_sk(sk);
     bool time_to_ack = false;
...
@@ -22,6 +22,11 @@ static const struct snmp_mib mptcp_snmp_list[] = {
     SNMP_MIB_ITEM("MPJoinAckHMacFailure", MPTCP_MIB_JOINACKMAC),
     SNMP_MIB_ITEM("DSSNotMatching", MPTCP_MIB_DSSNOMATCH),
     SNMP_MIB_ITEM("InfiniteMapRx", MPTCP_MIB_INFINITEMAPRX),
+    SNMP_MIB_ITEM("OFOQueueTail", MPTCP_MIB_OFOQUEUETAIL),
+    SNMP_MIB_ITEM("OFOQueue", MPTCP_MIB_OFOQUEUE),
+    SNMP_MIB_ITEM("OFOMerge", MPTCP_MIB_OFOMERGE),
+    SNMP_MIB_ITEM("NoDSSInWindow", MPTCP_MIB_NODSSWINDOW),
+    SNMP_MIB_ITEM("DuplicateData", MPTCP_MIB_DUPDATA),
     SNMP_MIB_SENTINEL
 };
...
@@ -15,6 +15,11 @@ enum linux_mptcp_mib_field {
     MPTCP_MIB_JOINACKMAC,    /* HMAC was wrong on ACK + MP_JOIN */
     MPTCP_MIB_DSSNOMATCH,    /* Received a new mapping that did not match the previous one */
     MPTCP_MIB_INFINITEMAPRX, /* Received an infinite mapping */
+    MPTCP_MIB_OFOQUEUETAIL,  /* Segments inserted into OoO queue tail */
+    MPTCP_MIB_OFOQUEUE,      /* Segments inserted into OoO queue */
+    MPTCP_MIB_OFOMERGE,      /* Segments merged in OoO queue */
+    MPTCP_MIB_NODSSWINDOW,   /* Segments not in MPTCP windows */
+    MPTCP_MIB_DUPDATA,       /* Segments discarded due to duplicate DSS */
     __MPTCP_MIB_MAX
 };
...
@@ -23,8 +23,6 @@ static int pm_nl_pernet_id;
 
 struct mptcp_pm_addr_entry {
     struct list_head       list;
-    unsigned int           flags;
-    int                    ifindex;
     struct mptcp_addr_info addr;
     struct rcu_head        rcu;
 };
@@ -119,7 +117,7 @@ select_local_address(const struct pm_nl_pernet *pernet,
     rcu_read_lock();
     spin_lock_bh(&msk->join_list_lock);
     list_for_each_entry_rcu(entry, &pernet->local_addr_list, list) {
-        if (!(entry->flags & MPTCP_PM_ADDR_FLAG_SUBFLOW))
+        if (!(entry->addr.flags & MPTCP_PM_ADDR_FLAG_SUBFLOW))
             continue;
 
         /* avoid any address already in use by subflows and
@@ -150,7 +148,7 @@ select_signal_address(struct pm_nl_pernet *pernet, unsigned int pos)
      * can lead to additional addresses not being announced.
      */
     list_for_each_entry_rcu(entry, &pernet->local_addr_list, list) {
-        if (!(entry->flags & MPTCP_PM_ADDR_FLAG_SIGNAL))
+        if (!(entry->addr.flags & MPTCP_PM_ADDR_FLAG_SIGNAL))
             continue;
         if (i++ == pos) {
             ret = entry;
@@ -210,8 +208,7 @@ static void mptcp_pm_create_subflow_or_signal_addr(struct mptcp_sock *msk)
         msk->pm.subflows++;
         check_work_pending(msk);
         spin_unlock_bh(&msk->pm.lock);
-        __mptcp_subflow_connect(sk, local->ifindex,
-                    &local->addr, &remote);
+        __mptcp_subflow_connect(sk, &local->addr, &remote);
         spin_lock_bh(&msk->pm.lock);
         return;
     }
@@ -257,13 +254,13 @@ void mptcp_pm_nl_add_addr_received(struct mptcp_sock *msk)
     local.family = remote.family;
 
     spin_unlock_bh(&msk->pm.lock);
-    __mptcp_subflow_connect((struct sock *)msk, 0, &local, &remote);
+    __mptcp_subflow_connect((struct sock *)msk, &local, &remote);
     spin_lock_bh(&msk->pm.lock);
 }
 
 static bool address_use_port(struct mptcp_pm_addr_entry *entry)
 {
-    return (entry->flags &
+    return (entry->addr.flags &
         (MPTCP_PM_ADDR_FLAG_SIGNAL | MPTCP_PM_ADDR_FLAG_SUBFLOW)) ==
         MPTCP_PM_ADDR_FLAG_SIGNAL;
 }
@@ -293,9 +290,9 @@ static int mptcp_pm_nl_append_new_local_addr(struct pm_nl_pernet *pernet,
             goto out;
     }
 
-    if (entry->flags & MPTCP_PM_ADDR_FLAG_SIGNAL)
+    if (entry->addr.flags & MPTCP_PM_ADDR_FLAG_SIGNAL)
         pernet->add_addr_signal_max++;
-    if (entry->flags & MPTCP_PM_ADDR_FLAG_SUBFLOW)
+    if (entry->addr.flags & MPTCP_PM_ADDR_FLAG_SUBFLOW)
         pernet->local_addr_max++;
 
     entry->addr.id = pernet->next_id++;
@@ -345,8 +342,9 @@ int mptcp_pm_nl_get_local_id(struct mptcp_sock *msk, struct sock_common *skc)
     if (!entry)
         return -ENOMEM;
 
-    entry->flags = 0;
     entry->addr = skc_local;
+    entry->addr.ifindex = 0;
+    entry->addr.flags = 0;
     ret = mptcp_pm_nl_append_new_local_addr(pernet, entry);
     if (ret < 0)
         kfree(entry);
@@ -460,14 +458,17 @@ static int mptcp_pm_parse_addr(struct nlattr *attr, struct genl_info *info,
         entry->addr.addr.s_addr = nla_get_in_addr(tb[addr_addr]);
 
 skip_family:
-    if (tb[MPTCP_PM_ADDR_ATTR_IF_IDX])
-        entry->ifindex = nla_get_s32(tb[MPTCP_PM_ADDR_ATTR_IF_IDX]);
+    if (tb[MPTCP_PM_ADDR_ATTR_IF_IDX]) {
+        u32 val = nla_get_s32(tb[MPTCP_PM_ADDR_ATTR_IF_IDX]);
+
+        entry->addr.ifindex = val;
+    }
 
     if (tb[MPTCP_PM_ADDR_ATTR_ID])
         entry->addr.id = nla_get_u8(tb[MPTCP_PM_ADDR_ATTR_ID]);
 
     if (tb[MPTCP_PM_ADDR_ATTR_FLAGS])
-        entry->flags = nla_get_u32(tb[MPTCP_PM_ADDR_ATTR_FLAGS]);
+        entry->addr.flags = nla_get_u32(tb[MPTCP_PM_ADDR_ATTR_FLAGS]);
 
     return 0;
 }
@@ -535,9 +536,9 @@ static int mptcp_nl_cmd_del_addr(struct sk_buff *skb, struct genl_info *info)
         ret = -EINVAL;
         goto out;
     }
-    if (entry->flags & MPTCP_PM_ADDR_FLAG_SIGNAL)
+    if (entry->addr.flags & MPTCP_PM_ADDR_FLAG_SIGNAL)
         pernet->add_addr_signal_max--;
-    if (entry->flags & MPTCP_PM_ADDR_FLAG_SUBFLOW)
+    if (entry->addr.flags & MPTCP_PM_ADDR_FLAG_SUBFLOW)
         pernet->local_addr_max--;
 
     pernet->addrs--;
@@ -593,10 +594,10 @@ static int mptcp_nl_fill_addr(struct sk_buff *skb,
         goto nla_put_failure;
     if (nla_put_u8(skb, MPTCP_PM_ADDR_ATTR_ID, addr->id))
         goto nla_put_failure;
-    if (nla_put_u32(skb, MPTCP_PM_ADDR_ATTR_FLAGS, entry->flags))
+    if (nla_put_u32(skb, MPTCP_PM_ADDR_ATTR_FLAGS, entry->addr.flags))
         goto nla_put_failure;
-    if (entry->ifindex &&
-        nla_put_s32(skb, MPTCP_PM_ADDR_ATTR_IF_IDX, entry->ifindex))
+    if (entry->addr.ifindex &&
+        nla_put_s32(skb, MPTCP_PM_ADDR_ATTR_IF_IDX, entry->addr.ifindex))
         goto nla_put_failure;
 
     if (addr->family == AF_INET &&
...
@@ -32,6 +32,8 @@ struct mptcp6_sock {
 #endif
 
 struct mptcp_skb_cb {
+    u64 map_seq;
+    u64 end_seq;
     u32 offset;
 };
@@ -110,64 +112,204 @@ static int __mptcp_socket_create(struct mptcp_sock *msk)
     return 0;
 }
 
-static void __mptcp_move_skb(struct mptcp_sock *msk, struct sock *ssk,
-                 struct sk_buff *skb,
-                 unsigned int offset, size_t copy_len)
+static void mptcp_drop(struct sock *sk, struct sk_buff *skb)
+{
+    sk_drops_add(sk, skb);
+    __kfree_skb(skb);
+}
+
+static bool mptcp_try_coalesce(struct sock *sk, struct sk_buff *to,
+                   struct sk_buff *from)
+{
+    bool fragstolen;
+    int delta;
+
+    if (MPTCP_SKB_CB(from)->offset ||
+        !skb_try_coalesce(to, from, &fragstolen, &delta))
+        return false;
+
+    pr_debug("coalesced seq %llx into %llx new len %d new end seq %llx",
+         MPTCP_SKB_CB(from)->map_seq, MPTCP_SKB_CB(to)->map_seq,
+         to->len, MPTCP_SKB_CB(from)->end_seq);
+    MPTCP_SKB_CB(to)->end_seq = MPTCP_SKB_CB(from)->end_seq;
+    kfree_skb_partial(from, fragstolen);
+    atomic_add(delta, &sk->sk_rmem_alloc);
+    sk_mem_charge(sk, delta);
+    return true;
+}
+
+static bool mptcp_ooo_try_coalesce(struct mptcp_sock *msk, struct sk_buff *to,
+                   struct sk_buff *from)
+{
+    if (MPTCP_SKB_CB(from)->map_seq != MPTCP_SKB_CB(to)->end_seq)
+        return false;
+
+    return mptcp_try_coalesce((struct sock *)msk, to, from);
+}
+
+/* "inspired" by tcp_data_queue_ofo(), main differences:
+ * - use mptcp seqs
+ * - don't cope with sacks
+ */
+static void mptcp_data_queue_ofo(struct mptcp_sock *msk, struct sk_buff *skb)
 {
     struct sock *sk = (struct sock *)msk;
-    struct sk_buff *tail;
+    struct rb_node **p, *parent;
+    u64 seq, end_seq, max_seq;
+    struct sk_buff *skb1;
+
+    seq = MPTCP_SKB_CB(skb)->map_seq;
+    end_seq = MPTCP_SKB_CB(skb)->end_seq;
+    max_seq = tcp_space(sk);
+    max_seq = max_seq > 0 ? max_seq + msk->ack_seq : msk->ack_seq;
 
-    __skb_unlink(skb, &ssk->sk_receive_queue);
+    pr_debug("msk=%p seq=%llx limit=%llx empty=%d", msk, seq, max_seq,
+         RB_EMPTY_ROOT(&msk->out_of_order_queue));
+    if (after64(seq, max_seq)) {
+        /* out of window */
+        mptcp_drop(sk, skb);
+        MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_NODSSWINDOW);
+        return;
+    }
 
-    skb_ext_reset(skb);
-    skb_orphan(skb);
-    msk->ack_seq += copy_len;
+    p = &msk->out_of_order_queue.rb_node;
+    MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_OFOQUEUE);
+    if (RB_EMPTY_ROOT(&msk->out_of_order_queue)) {
+        rb_link_node(&skb->rbnode, NULL, p);
+        rb_insert_color(&skb->rbnode, &msk->out_of_order_queue);
+        msk->ooo_last_skb = skb;
+        goto end;
+    }
 
-    tail = skb_peek_tail(&sk->sk_receive_queue);
-    if (offset == 0 && tail) {
-        bool fragstolen;
-        int delta;
+    /* with 2 subflows, adding at end of ooo queue is quite likely
+     * Use of ooo_last_skb avoids the O(Log(N)) rbtree lookup.
+     */
+    if (mptcp_ooo_try_coalesce(msk, msk->ooo_last_skb, skb)) {
+        MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_OFOMERGE);
+        MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_OFOQUEUETAIL);
+        return;
+    }
 
-        if (skb_try_coalesce(tail, skb, &fragstolen, &delta)) {
-            kfree_skb_partial(skb, fragstolen);
-            atomic_add(delta, &sk->sk_rmem_alloc);
-            sk_mem_charge(sk, delta);
+    /* Can avoid an rbtree lookup if we are adding skb after ooo_last_skb */
+    if (!before64(seq, MPTCP_SKB_CB(msk->ooo_last_skb)->end_seq)) {
+        MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_OFOQUEUETAIL);
+        parent = &msk->ooo_last_skb->rbnode;
+        p = &parent->rb_right;
+        goto insert;
+    }
+
+    /* Find place to insert this segment. Handle overlaps on the way. */
+    parent = NULL;
+    while (*p) {
+        parent = *p;
+        skb1 = rb_to_skb(parent);
+        if (before64(seq, MPTCP_SKB_CB(skb1)->map_seq)) {
+            p = &parent->rb_left;
+            continue;
+        }
+        if (before64(seq, MPTCP_SKB_CB(skb1)->end_seq)) {
+            if (!after64(end_seq, MPTCP_SKB_CB(skb1)->end_seq)) {
+                /* All the bits are present. Drop. */
+                mptcp_drop(sk, skb);
+                MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_DUPDATA);
+                return;
+            }
+            if (after64(seq, MPTCP_SKB_CB(skb1)->map_seq)) {
+                /* partial overlap:
+                 *     |     skb      |
+                 *  |     skb1    |
+                 * continue traversing
+                 */
+            } else {
+                /* skb's seq == skb1's seq and skb covers skb1.
+                 * Replace skb1 with skb.
+                 */
+                rb_replace_node(&skb1->rbnode, &skb->rbnode,
+                        &msk->out_of_order_queue);
+                mptcp_drop(sk, skb1);
+                MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_DUPDATA);
+                goto merge_right;
+            }
+        } else if (mptcp_ooo_try_coalesce(msk, skb1, skb)) {
+            MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_OFOMERGE);
             return;
         }
+        p = &parent->rb_right;
     }
 
-    skb_set_owner_r(skb, sk);
-    __skb_queue_tail(&sk->sk_receive_queue, skb);
-    MPTCP_SKB_CB(skb)->offset = offset;
-}
+insert:
+    /* Insert segment into RB tree. */
+    rb_link_node(&skb->rbnode, parent, p);
+    rb_insert_color(&skb->rbnode, &msk->out_of_order_queue);
 
-static void mptcp_stop_timer(struct sock *sk)
-{
-    struct inet_connection_sock *icsk = inet_csk(sk);
+merge_right:
+    /* Remove other segments covered by skb. */
+    while ((skb1 = skb_rb_next(skb)) != NULL) {
+        if (before64(end_seq, MPTCP_SKB_CB(skb1)->end_seq))
+            break;
+        rb_erase(&skb1->rbnode, &msk->out_of_order_queue);
+        mptcp_drop(sk, skb1);
+        MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_DUPDATA);
+    }
+    /* If there is no skb after us, we are the last_skb ! */
+    if (!skb1)
+        msk->ooo_last_skb = skb;
 
-    sk_stop_timer(sk, &icsk->icsk_retransmit_timer);
-    mptcp_sk(sk)->timer_ival = 0;
+end:
+    skb_condense(skb);
+    skb_set_owner_r(skb, sk);
 }
 
-/* both sockets must be locked */
-static bool mptcp_subflow_dsn_valid(const struct mptcp_sock *msk,
-                    struct sock *ssk)
+static bool __mptcp_move_skb(struct mptcp_sock *msk, struct sock *ssk,
+                 struct sk_buff *skb, unsigned int offset,
                 size_t copy_len)
 {
     struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(ssk);
-    u64 dsn = mptcp_subflow_get_mapped_dsn(subflow);
+    struct sock *sk = (struct sock *)msk;
+    struct sk_buff *tail;
 
-    /* revalidate data sequence number.
-     *
-     * mptcp_subflow_data_available() is usually called
-     * without msk lock.  Its unlikely (but possible)
-     * that msk->ack_seq has been advanced since the last
-     * call found in-sequence data.
-     */
-    if (likely(dsn == msk->ack_seq))
+    __skb_unlink(skb, &ssk->sk_receive_queue);
+
+    skb_ext_reset(skb);
+    skb_orphan(skb);
+
+    /* the skb map_seq accounts for the skb offset:
+     * mptcp_subflow_get_mapped_dsn() is based on the current tp->copied_seq
+     * value
+     */
+    MPTCP_SKB_CB(skb)->map_seq = mptcp_subflow_get_mapped_dsn(subflow);
+    MPTCP_SKB_CB(skb)->end_seq = MPTCP_SKB_CB(skb)->map_seq + copy_len;
+    MPTCP_SKB_CB(skb)->offset = offset;
+
+    if (MPTCP_SKB_CB(skb)->map_seq == msk->ack_seq) {
+        /* in sequence */
+        msk->ack_seq += copy_len;
+        tail = skb_peek_tail(&sk->sk_receive_queue);
+        if (tail && mptcp_try_coalesce(sk, tail, skb))
+            return true;
+
+        skb_set_owner_r(skb, sk);
+        __skb_queue_tail(&sk->sk_receive_queue, skb);
         return true;
+    } else if (after64(MPTCP_SKB_CB(skb)->map_seq, msk->ack_seq)) {
+        mptcp_data_queue_ofo(msk, skb);
+        return false;
+    }
 
-    subflow->data_avail = 0;
-    return mptcp_subflow_data_available(ssk);
+    /* old data, keep it simple and drop the whole pkt, sender
+     * will retransmit as needed, if needed.
+     */
+    MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_DUPDATA);
+    mptcp_drop(sk, skb);
+    return false;
+}
+
+static void mptcp_stop_timer(struct sock *sk)
+{
+    struct inet_connection_sock *icsk = inet_csk(sk);
+
+    sk_stop_timer(sk, &icsk->icsk_retransmit_timer);
+    mptcp_sk(sk)->timer_ival = 0;
 }
 
 static void mptcp_check_data_fin_ack(struct sock *sk)
@@ -313,11 +455,7 @@ static bool __mptcp_move_skbs_from_subflow(struct mptcp_sock *msk,
     struct tcp_sock *tp;
     bool done = false;
 
-    if (!mptcp_subflow_dsn_valid(msk, ssk)) {
-        *bytes = 0;
-        return false;
-    }
+    pr_debug("msk=%p ssk=%p", msk, ssk);
     tp = tcp_sk(ssk);
     do {
         u32 map_remaining, offset;
@@ -355,9 +493,9 @@ static bool __mptcp_move_skbs_from_subflow(struct mptcp_sock *msk,
         if (tp->urg_data)
             done = true;
 
-        __mptcp_move_skb(msk, ssk, skb, offset, len);
+        if (__mptcp_move_skb(msk, ssk, skb, offset, len))
+            moved += len;
         seq += len;
-        moved += len;
 
         if (WARN_ON_ONCE(map_remaining < len))
             break;
@@ -376,20 +514,56 @@ static bool __mptcp_move_skbs_from_subflow(struct mptcp_sock *msk,
         }
     } while (more_data_avail);
 
-    *bytes = moved;
-
-    /* If the moves have caught up with the DATA_FIN sequence number
-     * it's time to ack the DATA_FIN and change socket state, but
-     * this is not a good place to change state. Let the workqueue
-     * do it.
-     */
-    if (mptcp_pending_data_fin(sk, NULL) &&
-        schedule_work(&msk->work))
-        sock_hold(sk);
+    *bytes += moved;
+    if (moved)
+        tcp_cleanup_rbuf(ssk, moved);
 
     return done;
 }
 
+static bool mptcp_ofo_queue(struct mptcp_sock *msk)
+{
+    struct sock *sk = (struct sock *)msk;
+    struct sk_buff *skb, *tail;
+    bool moved = false;
+    struct rb_node *p;
+    u64 end_seq;
+
+    p = rb_first(&msk->out_of_order_queue);
+    pr_debug("msk=%p empty=%d", msk, RB_EMPTY_ROOT(&msk->out_of_order_queue));
+    while (p) {
+        skb = rb_to_skb(p);
+        if (after64(MPTCP_SKB_CB(skb)->map_seq, msk->ack_seq))
+            break;
+
+        p = rb_next(p);
+        rb_erase(&skb->rbnode, &msk->out_of_order_queue);
+
+        if (unlikely(!after64(MPTCP_SKB_CB(skb)->end_seq,
+                      msk->ack_seq))) {
+            mptcp_drop(sk, skb);
+            MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_DUPDATA);
+            continue;
+        }
+
+        end_seq = MPTCP_SKB_CB(skb)->end_seq;
+        tail = skb_peek_tail(&sk->sk_receive_queue);
+        if (!tail || !mptcp_ooo_try_coalesce(msk, tail, skb)) {
+            int delta = msk->ack_seq - MPTCP_SKB_CB(skb)->map_seq;
+
+            /* skip overlapping data, if any */
+            pr_debug("uncoalesced seq=%llx ack seq=%llx delta=%d",
+                 MPTCP_SKB_CB(skb)->map_seq, msk->ack_seq,
+                 delta);
+            MPTCP_SKB_CB(skb)->offset += delta;
+            __skb_queue_tail(&sk->sk_receive_queue, skb);
+        }
+        msk->ack_seq = end_seq;
+        moved = true;
+    }
+    return moved;
+}
+
 /* In most cases we will be able to lock the mptcp socket.  If its already
  * owned, we need to defer to the work queue to avoid ABBA deadlock.
  */
@@ -405,8 +579,19 @@ static bool move_skbs_to_msk(struct mptcp_sock *msk, struct sock *ssk)
         return false;
 
     /* must re-check after taking the lock */
-    if (!READ_ONCE(sk->sk_lock.owned))
+    if (!READ_ONCE(sk->sk_lock.owned)) {
         __mptcp_move_skbs_from_subflow(msk, ssk, &moved);
+        mptcp_ofo_queue(msk);
+
+        /* If the moves have caught up with the DATA_FIN sequence number
+         * it's time to ack the DATA_FIN and change socket state, but
+         * this is not a good place to change state. Let the workqueue
+         * do it.
+         */
+        if (mptcp_pending_data_fin(sk, NULL) &&
+            schedule_work(&msk->work))
+            sock_hold(sk);
+    }
 
     spin_unlock_bh(&sk->sk_lock.slock);
@@ -415,9 +600,17 @@ static bool move_skbs_to_msk(struct mptcp_sock *msk, struct sock *ssk)
 
 void mptcp_data_ready(struct sock *sk, struct sock *ssk)
 {
+    struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(ssk);
     struct mptcp_sock *msk = mptcp_sk(sk);
+    bool wake;
 
-    set_bit(MPTCP_DATA_READY, &msk->flags);
+    /* move_skbs_to_msk below can legitimately clear the data_avail flag,
+     * but we will need it later to properly wake the reader; cache its
+     * value
+     */
+    wake = subflow->data_avail == MPTCP_SUBFLOW_DATA_AVAIL;
+    if (wake)
+        set_bit(MPTCP_DATA_READY, &msk->flags);
 
     if (atomic_read(&sk->sk_rmem_alloc) < READ_ONCE(sk->sk_rcvbuf) &&
         move_skbs_to_msk(msk, ssk))
@@ -438,7 +631,8 @@ void mptcp_data_ready(struct sock *sk, struct sock *ssk)
         move_skbs_to_msk(msk, ssk);
     }
 wake:
-    sk->sk_data_ready(sk);
+    if (wake)
+        sk->sk_data_ready(sk);
 }
 
 static void __mptcp_flush_join_list(struct mptcp_sock *msk)
@@ -472,7 +666,7 @@ void mptcp_data_acked(struct sock *sk)
 {
     mptcp_reset_timer(sk);
 
-    if ((!sk_stream_is_writeable(sk) ||
+    if ((!test_bit(MPTCP_SEND_SPACE, &mptcp_sk(sk)->flags) ||
          (inet_sk_state_load(sk) != TCP_ESTABLISHED)) &&
         schedule_work(&mptcp_sk(sk)->work))
         sock_hold(sk);
@@ -567,6 +761,20 @@ static void dfrag_clear(struct sock *sk, struct mptcp_data_frag *dfrag)
     put_page(dfrag->page);
 }
 
+static bool mptcp_is_writeable(struct mptcp_sock *msk)
+{
+    struct mptcp_subflow_context *subflow;
+
+    if (!sk_stream_is_writeable((struct sock *)msk))
+        return false;
+
+    mptcp_for_each_subflow(msk, subflow) {
+        if (sk_stream_is_writeable(subflow->tcp_sock))
+            return true;
+    }
+    return false;
+}
+
 static void mptcp_clean_una(struct sock *sk)
 {
     struct mptcp_sock *msk = mptcp_sk(sk);
@@ -609,8 +817,15 @@ static void mptcp_clean_una(struct sock *sk)
         sk_mem_reclaim_partial(sk);
 
     /* Only wake up writers if a subflow is ready */
-    if (test_bit(MPTCP_SEND_SPACE, &msk->flags))
+    if (mptcp_is_writeable(msk)) {
+        set_bit(MPTCP_SEND_SPACE, &mptcp_sk(sk)->flags);
+        smp_mb__after_atomic();
+
+        /* set SEND_SPACE before sk_stream_write_space clears
+         * NOSPACE
+         */
         sk_stream_write_space(sk);
+    }
     }
 }
@@ -801,60 +1016,128 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
     return ret;
 }
 
-static void mptcp_nospace(struct mptcp_sock *msk, struct socket *sock)
+static void mptcp_nospace(struct mptcp_sock *msk)
 {
+    struct mptcp_subflow_context *subflow;
+
     clear_bit(MPTCP_SEND_SPACE, &msk->flags);
     smp_mb__after_atomic(); /* msk->flags is changed by write_space cb */
 
-    /* enables sk->write_space() callbacks */
-    set_bit(SOCK_NOSPACE, &sock->flags);
+    mptcp_for_each_subflow(msk, subflow) {
+        struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
+        struct socket *sock = READ_ONCE(ssk->sk_socket);
+
+        /* enables ssk->write_space() callbacks */
+        if (sock)
+            set_bit(SOCK_NOSPACE, &sock->flags);
+    }
 }
 
-static struct sock *mptcp_subflow_get_send(struct mptcp_sock *msk)
+static bool mptcp_subflow_active(struct mptcp_subflow_context *subflow)
 {
-    struct mptcp_subflow_context *subflow;
-    struct sock *backup = NULL;
-
-    sock_owned_by_me((const struct sock *)msk);
+    struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
 
-    if (!mptcp_ext_cache_refill(msk))
-        return NULL;
+    /* can't send if JOIN hasn't completed yet (i.e. is usable for mptcp) */
+    if (subflow->request_join && !subflow->fully_established)
+        return false;
 
-    mptcp_for_each_subflow(msk, subflow) {
-        struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
+    /* only send if our side has not closed yet */
+    return ((1 << ssk->sk_state) & (TCPF_ESTABLISHED | TCPF_CLOSE_WAIT));
+}
 
-        if (!sk_stream_memory_free(ssk)) {
-            struct socket *sock = ssk->sk_socket;
+#define MPTCP_SEND_BURST_SIZE    ((1 << 16) - \
+                 sizeof(struct tcphdr) - \
+                 MAX_TCP_OPTION_SPACE - \
+                 sizeof(struct ipv6hdr) - \
+                 sizeof(struct frag_hdr))
 
-            if (sock)
-                mptcp_nospace(msk, sock);
+struct subflow_send_info {
+    struct sock *ssk;
+    u64 ratio;
+};
 
-            return NULL;
-        }
+static struct sock *mptcp_subflow_get_send(struct mptcp_sock *msk,
+                       u32 *sndbuf)
+{
+    struct subflow_send_info send_info[2];
+    struct mptcp_subflow_context *subflow;
+    int i, nr_active = 0;
+    struct sock *ssk;
+    u64 ratio;
+    u32 pace;
 
-        if (subflow->backup) {
-            if (!backup)
-                backup = ssk;
+    sock_owned_by_me((struct sock *)msk);
 
-            continue;
-        }
+    *sndbuf = 0;
+    if (!mptcp_ext_cache_refill(msk))
+        return NULL;
+
+    if (__mptcp_check_fallback(msk)) {
+        if (!msk->first)
+            return NULL;
+        *sndbuf = msk->first->sk_sndbuf;
+        return sk_stream_memory_free(msk->first) ? msk->first : NULL;
+    }
+
+    /* re-use last subflow, if the burst allow that */
+    if (msk->last_snd && msk->snd_burst > 0 &&
+        sk_stream_memory_free(msk->last_snd) &&
+        mptcp_subflow_active(mptcp_subflow_ctx(msk->last_snd))) {
+        mptcp_for_each_subflow(msk, subflow) {
+            ssk = mptcp_subflow_tcp_sock(subflow);
+            *sndbuf = max(tcp_sk(ssk)->snd_wnd, *sndbuf);
+        }
+        return msk->last_snd;
+    }
+
+    /* pick the subflow with the lower wmem/wspace ratio */
+    for (i = 0; i < 2; ++i) {
+        send_info[i].ssk = NULL;
+        send_info[i].ratio = -1;
+    }
+    mptcp_for_each_subflow(msk, subflow) {
+        ssk = mptcp_subflow_tcp_sock(subflow);
+        if (!mptcp_subflow_active(subflow))
+            continue;
 
-        return ssk;
+        nr_active += !subflow->backup;
+        *sndbuf = max(tcp_sk(ssk)->snd_wnd, *sndbuf);
+        if (!sk_stream_memory_free(subflow->tcp_sock))
+            continue;
+
+        pace = READ_ONCE(ssk->sk_pacing_rate);
+        if (!pace)
+            continue;
+
+        ratio = div_u64((u64)READ_ONCE(ssk->sk_wmem_queued) << 32,
+                pace);
+        if (ratio < send_info[subflow->backup].ratio) {
+            send_info[subflow->backup].ssk = ssk;
+            send_info[subflow->backup].ratio = ratio;
+        }
     }
 
-    return backup;
+    pr_debug("msk=%p nr_active=%d ssk=%p:%lld backup=%p:%lld",
+         msk, nr_active, send_info[0].ssk, send_info[0].ratio,
+         send_info[1].ssk, send_info[1].ratio);
+
+    /* pick the best backup if no other subflow is active */
+    if (!nr_active)
+        send_info[0].ssk = send_info[1].ssk;
+
+    if (send_info[0].ssk) {
+        msk->last_snd = send_info[0].ssk;
+        msk->snd_burst = min_t(int, MPTCP_SEND_BURST_SIZE,
+                       sk_stream_wspace(msk->last_snd));
+        return msk->last_snd;
+    }
+    return NULL;
 }
 
-static void ssk_check_wmem(struct mptcp_sock *msk, struct sock *ssk)
+static void ssk_check_wmem(struct mptcp_sock *msk)
 {
-    struct socket *sock;
-
-    if (likely(sk_stream_is_writeable(ssk)))
-        return;
-
-    sock = READ_ONCE(ssk->sk_socket);
-    if (sock)
-        mptcp_nospace(msk, sock);
+    if (unlikely(!mptcp_is_writeable(msk)))
+        mptcp_nospace(msk);
 }
 
 static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
@@ -864,6 +1147,7 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
     struct page_frag *pfrag;
     size_t copied = 0;
     struct sock *ssk;
+    u32 sndbuf;
     bool tx_ok;
     long timeo;
@@ -890,7 +1174,7 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
     }
 
     __mptcp_flush_join_list(msk);
-    ssk = mptcp_subflow_get_send(msk);
+    ssk = mptcp_subflow_get_send(msk, &sndbuf);
     while (!sk_stream_memory_free(sk) ||
            !ssk ||
            !mptcp_page_frag_refill(ssk, pfrag)) {
@@ -907,19 +1191,25 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
             mptcp_reset_timer(sk);
         }
 
+        mptcp_nospace(msk);
         ret = sk_stream_wait_memory(sk, &timeo);
         if (ret)
             goto out;
 
         mptcp_clean_una(sk);
 
-        ssk = mptcp_subflow_get_send(msk);
+        ssk = mptcp_subflow_get_send(msk, &sndbuf);
         if (list_empty(&msk->conn_list)) {
             ret = -ENOTCONN;
             goto out;
         }
     }
 
+    /* do auto tuning */
+    if (!(sk->sk_userlocks & SOCK_SNDBUF_LOCK) &&
+        sndbuf > READ_ONCE(sk->sk_sndbuf))
+        WRITE_ONCE(sk->sk_sndbuf, sndbuf);
+
     pr_debug("conn_list->subflow=%p", ssk);
 
     lock_sock(ssk);
@@ -936,6 +1226,10 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
             break;
         }
 
+        /* burst can be negative, we will try to move to the next
+         * subflow at selection time, if possible.
+         */
+        msk->snd_burst -= ret;
         copied += ret;
 
         tx_ok = msg_data_left(msg);
@@ -945,7 +1239,6 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
         if (!sk_stream_memory_free(ssk) ||
             !mptcp_page_frag_refill(ssk, pfrag) ||
             !mptcp_ext_cache_refill(msk)) {
-            set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
             tcp_push(ssk, msg->msg_flags, mss_now,
                  tcp_sk(ssk)->nonagle, size_goal);
             mptcp_set_timeout(sk, ssk);
@@ -993,9 +1286,9 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
         mptcp_reset_timer(sk);
     }
 
-    ssk_check_wmem(msk, ssk);
     release_sock(ssk);
 out:
+    ssk_check_wmem(msk);
     release_sock(sk);
     return copied ? : ret;
 }
@@ -1133,10 +1426,14 @@ static void mptcp_rcv_space_adjust(struct mptcp_sock *msk, int copied)
          */
         mptcp_for_each_subflow(msk, subflow) {
             struct sock *ssk;
+            bool slow;
 
             ssk = mptcp_subflow_tcp_sock(subflow);
+            slow = lock_sock_fast(ssk);
             WRITE_ONCE(ssk->sk_rcvbuf, rcvbuf);
             tcp_sk(ssk)->window_clamp = window_clamp;
+            tcp_cleanup_rbuf(ssk, 1);
+            unlock_sock_fast(ssk, slow);
         }
     }
 }
@@ -1152,6 +1449,11 @@ static bool __mptcp_move_skbs(struct mptcp_sock *msk)
     unsigned int moved = 0;
     bool done;
 
+    /* avoid looping forever below on racing close */
+    if (((struct sock *)msk)->sk_state == TCP_CLOSE)
+        return false;
+
+    __mptcp_flush_join_list(msk);
     do {
         struct sock *ssk = mptcp_subflow_recv_lookup(msk);
 
@@ -1163,7 +1465,11 @@ static bool __mptcp_move_skbs(struct mptcp_sock *msk)
         release_sock(ssk);
     } while (!done);
 
-    return moved > 0;
+    if (mptcp_ofo_queue(msk) || moved > 0) {
+        mptcp_check_data_fin((struct sock *)msk);
+        return true;
+    }
+    return false;
 }
 
 static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
@@ -1259,6 +1565,9 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
             set_bit(MPTCP_DATA_READY, &msk->flags);
     }
 out_err:
+    pr_debug("msk=%p data_ready=%d rx queue empty=%d copied=%d",
+         msk, test_bit(MPTCP_DATA_READY, &msk->flags),
+         skb_queue_empty(&sk->sk_receive_queue), copied);
     mptcp_rcv_space_adjust(msk, copied);
 
     release_sock(sk);
@@ -1309,9 +1618,15 @@ static struct sock *mptcp_subflow_get_retrans(const struct mptcp_sock *msk)
 
     sock_owned_by_me((const struct sock *)msk);
 
+    if (__mptcp_check_fallback(msk))
+        return msk->first;
+
     mptcp_for_each_subflow(msk, subflow) {
         struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
 
+        if (!mptcp_subflow_active(subflow))
+            continue;
+
         /* still data outstanding at TCP level? Don't retransmit. */
         if (!tcp_write_queue_empty(ssk))
             return NULL;
@@ -1472,6 +1787,7 @@ static int __mptcp_init_sock(struct sock *sk)
     INIT_LIST_HEAD(&msk->rtx_queue);
     __set_bit(MPTCP_SEND_SPACE, &msk->flags);
     INIT_WORK(&msk->work, mptcp_worker);
+    msk->out_of_order_queue = RB_ROOT;
     msk->first = NULL;
     inet_csk(sk)->icsk_sync_mss = mptcp_sync_mss;
 
@@ -1505,7 +1821,7 @@ static int mptcp_init_sock(struct sock *sk)
     sk_sockets_allocated_inc(sk);
     sk->sk_rcvbuf = sock_net(sk)->ipv4.sysctl_tcp_rmem[1];
-    sk->sk_sndbuf = sock_net(sk)->ipv4.sysctl_tcp_wmem[2];
+    sk->sk_sndbuf = sock_net(sk)->ipv4.sysctl_tcp_wmem[1];
 
     return 0;
 }
@@ -1811,6 +2127,7 @@ static void mptcp_destroy(struct sock *sk)
 {
     struct mptcp_sock *msk = mptcp_sk(sk);
 
+    skb_rbtree_purge(&msk->out_of_order_queue);
     mptcp_token_destroy(msk);
     if (msk->cached_ext)
         __skb_ext_put(msk->cached_ext);
@@ -2286,13 +2603,13 @@ static __poll_t mptcp_poll(struct file *file, struct socket *sock,
     sock_poll_wait(file, sock, wait);
 
     state = inet_sk_state_load(sk);
+    pr_debug("msk=%p state=%d flags=%lx", msk, state, msk->flags);
     if (state == TCP_LISTEN)
         return mptcp_check_readable(msk);
 
     if (state != TCP_SYN_SENT && state != TCP_SYN_RECV) {
         mask |= mptcp_check_readable(msk);
-        if (sk_stream_is_writeable(sk) &&
-            test_bit(MPTCP_SEND_SPACE, &msk->flags))
+        if (test_bit(MPTCP_SEND_SPACE, &msk->flags))
             mask |= EPOLLOUT | EPOLLWRNORM;
     }
     if (sk->sk_shutdown & RCV_SHUTDOWN)
...
@@ -140,6 +140,8 @@ struct mptcp_addr_info {
     sa_family_t    family;
     __be16         port;
     u8             id;
+    u8             flags;
+    int            ifindex;
     union {
         struct in_addr addr;
 #if IS_ENABLED(CONFIG_MPTCP_IPV6)
@@ -194,6 +196,8 @@ struct mptcp_sock {
     u64         write_seq;
     u64         ack_seq;
     u64         rcv_data_fin_seq;
+    struct sock *last_snd;
+    int         snd_burst;
     atomic64_t  snd_una;
     unsigned long timer_ival;
     u32         token;
@@ -204,6 +208,8 @@ struct mptcp_sock {
     bool        snd_data_fin_enable;
     spinlock_t  join_list_lock;
     struct work_struct work;
+    struct sk_buff *ooo_last_skb;
+    struct rb_root out_of_order_queue;
     struct list_head conn_list;
     struct list_head rtx_queue;
     struct list_head join_list;
@@ -268,6 +274,12 @@ mptcp_subflow_rsk(const struct request_sock *rsk)
     return (struct mptcp_subflow_request_sock *)rsk;
 }
 
+enum mptcp_data_avail {
+    MPTCP_SUBFLOW_NODATA,
+    MPTCP_SUBFLOW_DATA_AVAIL,
+    MPTCP_SUBFLOW_OOO_DATA
+};
+
 /* MPTCP subflow context */
 struct mptcp_subflow_context {
     struct list_head node;/* conn_list of subflows */
@@ -292,10 +304,10 @@ struct mptcp_subflow_context {
         map_valid : 1,
         mpc_map : 1,
         backup : 1,
-        data_avail : 1,
         rx_eof : 1,
         use_64bit_ack : 1,  /* Set when we received a 64-bit DSN */
         can_ack : 1;        /* only after processing the remote a key */
+    enum mptcp_data_avail data_avail;
     u32 remote_nonce;
     u64 thmac;
     u32 local_nonce;
@@ -350,8 +362,7 @@ bool mptcp_subflow_data_available(struct sock *sk);
 void __init mptcp_subflow_init(void);
 
 /* called with sk socket lock held */
-int __mptcp_subflow_connect(struct sock *sk, int ifindex,
-                const struct mptcp_addr_info *loc,
+int __mptcp_subflow_connect(struct sock *sk, const struct mptcp_addr_info *loc,
                 const struct mptcp_addr_info *remote);
 int mptcp_subflow_create_socket(struct sock *sk, struct socket **new_sock);
 
@@ -464,12 +475,12 @@ static inline bool before64(__u64 seq1, __u64 seq2)
 
 void mptcp_diag_subflow_init(struct tcp_ulp_ops *ops);
 
-static inline bool __mptcp_check_fallback(struct mptcp_sock *msk)
+static inline bool __mptcp_check_fallback(const struct mptcp_sock *msk)
 {
     return test_bit(MPTCP_FALLBACK_DONE, &msk->flags);
 }
 
-static inline bool mptcp_check_fallback(struct sock *sk)
+static inline bool mptcp_check_fallback(const struct sock *sk)
 {
     struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(sk);
     struct mptcp_sock *msk = mptcp_sk(subflow->conn);
...
@@ -20,6 +20,7 @@
 #include <net/ip6_route.h>
 #endif
 #include <net/mptcp.h>
+#include <uapi/linux/mptcp.h>
 #include "protocol.h"
 #include "mib.h"
 
@@ -434,6 +435,7 @@ static void mptcp_sock_destruct(struct sock *sk)
         sock_orphan(sk);
     }
 
+    skb_rbtree_purge(&mptcp_sk(sk)->out_of_order_queue);
     mptcp_token_destroy(mptcp_sk(sk));
     inet_sock_destruct(sk);
 }
@@ -804,16 +806,25 @@ static enum mapping_status get_mapping_status(struct sock *ssk,
     return MAPPING_OK;
 }
 
-static int subflow_read_actor(read_descriptor_t *desc,
-                  struct sk_buff *skb,
-                  unsigned int offset, size_t len)
+static void mptcp_subflow_discard_data(struct sock *ssk, struct sk_buff *skb,
+                       unsigned int limit)
 {
-    size_t copy_len = min(desc->count, len);
-
-    desc->count -= copy_len;
-
-    pr_debug("flushed %zu bytes, %zu left", copy_len, desc->count);
-    return copy_len;
+    struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(ssk);
+    bool fin = TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN;
+    u32 incr;
+
+    incr = limit >= skb->len ? skb->len + fin : limit;
+
+    pr_debug("discarding=%d len=%d seq=%d", incr, skb->len,
+         subflow->map_subflow_seq);
+    MPTCP_INC_STATS(sock_net(ssk), MPTCP_MIB_DUPDATA);
+    tcp_sk(ssk)->copied_seq += incr;
+    if (!before(tcp_sk(ssk)->copied_seq, TCP_SKB_CB(skb)->end_seq))
+        sk_eat_skb(ssk, skb);
+    if (mptcp_subflow_get_map_offset(subflow) >= subflow->map_data_len)
+        subflow->map_valid = 0;
+    if (incr)
+        tcp_cleanup_rbuf(ssk, incr);
 }
 
 static bool subflow_check_data_avail(struct sock *ssk)
@@ -825,13 +836,13 @@ static bool subflow_check_data_avail(struct sock *ssk)
     pr_debug("msk=%p ssk=%p data_avail=%d skb=%p", subflow->conn, ssk,
          subflow->data_avail, skb_peek(&ssk->sk_receive_queue));
+    if (!skb_peek(&ssk->sk_receive_queue))
+        subflow->data_avail = 0;
     if (subflow->data_avail)
         return true;
 
     msk = mptcp_sk(subflow->conn);
     for (;;) {
-        u32 map_remaining;
-        size_t delta;
         u64 ack_seq;
         u64 old_ack;
 
@@ -849,6 +860,7 @@ static bool subflow_check_data_avail(struct sock *ssk)
             subflow->map_data_len = skb->len;
             subflow->map_subflow_seq = tcp_sk(ssk)->copied_seq -
                            subflow->ssn_offset;
+            subflow->data_avail = MPTCP_SUBFLOW_DATA_AVAIL;
             return true;
         }
 
@@ -876,42 +888,18 @@ static bool subflow_check_data_avail(struct sock *ssk)
         ack_seq = mptcp_subflow_get_mapped_dsn(subflow);
         pr_debug("msk ack_seq=%llx subflow ack_seq=%llx", old_ack,
              ack_seq);
-        if (ack_seq == old_ack)
+        if (ack_seq == old_ack) {
+            subflow->data_avail = MPTCP_SUBFLOW_DATA_AVAIL;
             break;
+        } else if (after64(ack_seq, old_ack)) {
+            subflow->data_avail = MPTCP_SUBFLOW_OOO_DATA;
+            break;
+        }
 
         /* only accept in-sequence mapping. Old values are spurious
-         * retransmission; we can hit "future" values on active backup
-         * subflow switch, we relay on retransmissions to get
-         * in-sequence data.
-         * Cuncurrent subflows support will require subflow data
-         * reordering
+         * retransmission
          */
-        map_remaining = subflow->map_data_len -
-                mptcp_subflow_get_map_offset(subflow);
-        if (before64(ack_seq, old_ack))
-            delta = min_t(size_t, old_ack - ack_seq, map_remaining);
-        else
-            delta = min_t(size_t, ack_seq - old_ack, map_remaining);
-
-        /* discard mapped data */
-        pr_debug("discarding %zu bytes, current map len=%d", delta,
-             map_remaining);
-        if (delta) {
-            read_descriptor_t desc = {
-                .count = delta,
-            };
-            int ret;
-
-            ret = tcp_read_sock(ssk, &desc, subflow_read_actor);
-            if (ret < 0) {
-                ssk->sk_err = -ret;
-                goto fatal;
-            }
-
-            if (ret < delta)
-                return false;
-
-            if (delta == map_remaining)
-                subflow->map_valid = 0;
-        }
+        mptcp_subflow_discard_data(ssk, skb, old_ack - ack_seq);
     }
     return true;
 
@@ -922,13 +910,13 @@ static bool subflow_check_data_avail(struct sock *ssk)
     ssk->sk_error_report(ssk);
     tcp_set_state(ssk, TCP_CLOSE);
     tcp_send_active_reset(ssk, GFP_ATOMIC);
+    subflow->data_avail = 0;
     return false;
 }
 
 bool mptcp_subflow_data_available(struct sock *sk)
 {
     struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(sk);
-    struct sk_buff *skb;
 
     /* check if current mapping is still valid */
     if (subflow->map_valid &&
@@ -941,15 +929,7 @@ bool mptcp_subflow_data_available(struct sock *sk)
              subflow->map_data_len);
     }
 
-    if (!subflow_check_data_avail(sk)) {
-        subflow->data_avail = 0;
-        return false;
-    }
-
-    skb = skb_peek(&sk->sk_receive_queue);
-    subflow->data_avail = skb &&
-        before(tcp_sk(sk)->copied_seq, TCP_SKB_CB(skb)->end_seq);
-    return subflow->data_avail;
+    return subflow_check_data_avail(sk);
 }
 
 /* If ssk has an mptcp parent socket, use the mptcp rcvbuf occupancy,
@@ -996,8 +976,10 @@ static void subflow_write_space(struct sock *sk)
     struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(sk);
     struct sock *parent = subflow->conn;
 
-    sk_stream_write_space(sk);
-    if (sk_stream_is_writeable(sk)) {
+    if (!sk_stream_is_writeable(sk))
+        return;
+
+    if (sk_stream_is_writeable(parent)) {
         set_bit(MPTCP_SEND_SPACE, &mptcp_sk(parent)->flags);
         smp_mb__after_atomic();
         /* set SEND_SPACE before sk_stream_write_space clears NOSPACE */
@@ -1056,8 +1038,7 @@ static void mptcp_info2sockaddr(const struct mptcp_addr_info *info,
 #endif
 }
 
-int __mptcp_subflow_connect(struct sock *sk, int ifindex,
-                const struct mptcp_addr_info *loc,
+int __mptcp_subflow_connect(struct sock *sk, const struct mptcp_addr_info *loc,
                 const struct mptcp_addr_info *remote)
 {
     struct mptcp_sock *msk = mptcp_sk(sk);
@@ -1101,7 +1082,7 @@ int __mptcp_subflow_connect(struct sock *sk, int ifindex,
     if (loc->family == AF_INET6)
         addrlen = sizeof(struct sockaddr_in6);
 #endif
-    ssk->sk_bound_dev_if = ifindex;
+    ssk->sk_bound_dev_if = loc->ifindex;
     err = kernel_bind(sf, (struct sockaddr *)&addr, addrlen);
     if (err)
         goto failed;
@@ -1112,7 +1093,7 @@ int __mptcp_subflow_connect(struct sock *sk, int ifindex,
     subflow->remote_token = remote_token;
     subflow->local_id = local_id;
     subflow->request_join = 1;
-    subflow->request_bkup = 1;
+    subflow->request_bkup = !!(loc->flags & MPTCP_PM_ADDR_FLAG_BACKUP);
     mptcp_info2sockaddr(remote, &addr);
 
     err = kernel_connect(sf, (struct sockaddr *)&addr, addrlen, O_NONBLOCK);
...
@@ -5,7 +5,8 @@ KSFT_KHDR_INSTALL := 1
 
 CFLAGS = -Wall -Wl,--no-as-needed -O2 -g -I$(top_srcdir)/usr/include
 
-TEST_PROGS := mptcp_connect.sh pm_netlink.sh mptcp_join.sh diag.sh
+TEST_PROGS := mptcp_connect.sh pm_netlink.sh mptcp_join.sh diag.sh \
+          simult_flows.sh
 
 TEST_GEN_FILES = mptcp_connect pm_nl_ctl
...
#!/bin/bash
# SPDX-License-Identifier: GPL-2.0
sec=$(date +%s)
rndh=$(printf %x $sec)-$(mktemp -u XXXXXX)
ns1="ns1-$rndh"
ns2="ns2-$rndh"
ns3="ns3-$rndh"
capture=false
ksft_skip=4
timeout=30
test_cnt=1
ret=0
bail=0
usage() {
echo "Usage: $0 [ -b ] [ -c ] [ -d ]"
echo -e "\t-b: bail out after first error, otherwise runs al testcases"
echo -e "\t-c: capture packets for each test using tcpdump (default: no capture)"
echo -e "\t-d: debug this script"
}
cleanup()
{
rm -f "$cin" "$cout"
rm -f "$sin" "$sout"
rm -f "$capout"
local netns
for netns in "$ns1" "$ns2" "$ns3";do
ip netns del $netns
done
}
ip -Version > /dev/null 2>&1
if [ $? -ne 0 ];then
echo "SKIP: Could not run test without ip tool"
exit $ksft_skip
fi
# "$ns1" ns2 ns3
# ns1eth1 ns2eth1 ns2eth3 ns3eth1
# netem
# ns1eth2 ns2eth2
# netem
setup()
{
large=$(mktemp)
small=$(mktemp)
sout=$(mktemp)
cout=$(mktemp)
capout=$(mktemp)
size=$((2048 * 4096))
dd if=/dev/zero of=$small bs=4096 count=20 >/dev/null 2>&1
dd if=/dev/zero of=$large bs=4096 count=$((size / 4096)) >/dev/null 2>&1
trap cleanup EXIT
for i in "$ns1" "$ns2" "$ns3";do
ip netns add $i || exit $ksft_skip
ip -net $i link set lo up
done
ip link add ns1eth1 netns "$ns1" type veth peer name ns2eth1 netns "$ns2"
ip link add ns1eth2 netns "$ns1" type veth peer name ns2eth2 netns "$ns2"
ip link add ns2eth3 netns "$ns2" type veth peer name ns3eth1 netns "$ns3"
ip -net "$ns1" addr add 10.0.1.1/24 dev ns1eth1
ip -net "$ns1" addr add dead:beef:1::1/64 dev ns1eth1 nodad
ip -net "$ns1" link set ns1eth1 up mtu 1500
ip -net "$ns1" route add default via 10.0.1.2
ip -net "$ns1" route add default via dead:beef:1::2
ip -net "$ns1" addr add 10.0.2.1/24 dev ns1eth2
ip -net "$ns1" addr add dead:beef:2::1/64 dev ns1eth2 nodad
ip -net "$ns1" link set ns1eth2 up mtu 1500
ip -net "$ns1" route add default via 10.0.2.2 metric 101
ip -net "$ns1" route add default via dead:beef:2::2 metric 101
ip netns exec "$ns1" ./pm_nl_ctl limits 1 1
ip netns exec "$ns1" ./pm_nl_ctl add 10.0.2.1 dev ns1eth2 flags subflow
ip netns exec "$ns1" sysctl -q net.ipv4.conf.all.rp_filter=0
ip -net "$ns2" addr add 10.0.1.2/24 dev ns2eth1
ip -net "$ns2" addr add dead:beef:1::2/64 dev ns2eth1 nodad
ip -net "$ns2" link set ns2eth1 up mtu 1500
ip -net "$ns2" addr add 10.0.2.2/24 dev ns2eth2
ip -net "$ns2" addr add dead:beef:2::2/64 dev ns2eth2 nodad
ip -net "$ns2" link set ns2eth2 up mtu 1500
ip -net "$ns2" addr add 10.0.3.2/24 dev ns2eth3
ip -net "$ns2" addr add dead:beef:3::2/64 dev ns2eth3 nodad
ip -net "$ns2" link set ns2eth3 up mtu 1500
ip netns exec "$ns2" sysctl -q net.ipv4.ip_forward=1
ip netns exec "$ns2" sysctl -q net.ipv6.conf.all.forwarding=1
ip -net "$ns3" addr add 10.0.3.3/24 dev ns3eth1
ip -net "$ns3" addr add dead:beef:3::3/64 dev ns3eth1 nodad
ip -net "$ns3" link set ns3eth1 up mtu 1500
ip -net "$ns3" route add default via 10.0.3.2
ip -net "$ns3" route add default via dead:beef:3::2
ip netns exec "$ns3" ./pm_nl_ctl limits 1 1
}
# $1: ns, $2: port
wait_local_port_listen()
{
local listener_ns="${1}"
local port="${2}"
local port_hex i
port_hex="$(printf "%04X" "${port}")"
for i in $(seq 10); do
ip netns exec "${listener_ns}" cat /proc/net/tcp* | \
awk "BEGIN {rc=1} {if (\$2 ~ /:${port_hex}\$/ && \$4 ~ /0A/) {rc=0; exit}} END {exit rc}" &&
break
sleep 0.1
done
}
do_transfer()
{
local cin=$1
local sin=$2
local max_time=$3
local port
port=$((10000+$test_cnt))
test_cnt=$((test_cnt+1))
:> "$cout"
:> "$sout"
:> "$capout"
local addr_port
addr_port=$(printf "%s:%d" ${connect_addr} ${port})
if $capture; then
local capuser
if [ -z $SUDO_USER ] ; then
capuser=""
else
capuser="-Z $SUDO_USER"
fi
local capfile="${rndh}-${port}"
local capopt="-i any -s 65535 -B 32768 ${capuser}"
ip netns exec ${ns3} tcpdump ${capopt} -w "${capfile}-listener.pcap" >> "${capout}" 2>&1 &
local cappid_listener=$!
ip netns exec ${ns1} tcpdump ${capopt} -w "${capfile}-connector.pcap" >> "${capout}" 2>&1 &
local cappid_connector=$!
sleep 1
fi
ip netns exec ${ns3} ./mptcp_connect -jt $timeout -l -p $port 0.0.0.0 < "$sin" > "$sout" &
local spid=$!
wait_local_port_listen "${ns3}" "${port}"
local start
start=$(date +%s%3N)
ip netns exec ${ns1} ./mptcp_connect -jt $timeout -p $port 10.0.3.3 < "$cin" > "$cout" &
local cpid=$!
wait $cpid
local retc=$?
wait $spid
local rets=$?
local stop
stop=$(date +%s%3N)
if $capture; then
sleep 1
kill ${cappid_listener}
kill ${cappid_connector}
fi
local duration
duration=$((stop-start))
cmp $sin $cout > /dev/null 2>&1
local cmps=$?
cmp $cin $sout > /dev/null 2>&1
local cmpc=$?
printf "%16s" "$duration max $max_time "
if [ $retc -eq 0 ] && [ $rets -eq 0 ] && \
[ $cmpc -eq 0 ] && [ $cmps -eq 0 ] && \
[ $duration -lt $max_time ]; then
echo "[ OK ]"
cat "$capout"
return 0
fi
echo " [ fail ]"
echo "client exit code $retc, server $rets" 1>&2
echo "\nnetns ${ns3} socket stat for $port:" 1>&2
ip netns exec ${ns3} ss -nita 1>&2 -o "sport = :$port"
echo "\nnetns ${ns1} socket stat for $port:" 1>&2
ip netns exec ${ns1} ss -nita 1>&2 -o "dport = :$port"
ls -l $sin $cout
ls -l $cin $sout
cat "$capout"
return 1
}
run_test()
{
local rate1=$1
local rate2=$2
local delay1=$3
local delay2=$4
local lret
local dev
shift 4
local msg=$*
[ $delay1 -gt 0 ] && delay1="delay $delay1" || delay1=""
[ $delay2 -gt 0 ] && delay2="delay $delay2" || delay2=""
for dev in ns1eth1 ns1eth2; do
tc -n $ns1 qdisc del dev $dev root >/dev/null 2>&1
done
for dev in ns2eth1 ns2eth2; do
tc -n $ns2 qdisc del dev $dev root >/dev/null 2>&1
done
tc -n $ns1 qdisc add dev ns1eth1 root netem rate ${rate1}mbit $delay1
tc -n $ns1 qdisc add dev ns1eth2 root netem rate ${rate2}mbit $delay2
tc -n $ns2 qdisc add dev ns2eth1 root netem rate ${rate1}mbit $delay1
tc -n $ns2 qdisc add dev ns2eth2 root netem rate ${rate2}mbit $delay2
# transfer time is measured in ms
local time=$((size * 8 * 1000 / ((rate1 + rate2) * 1024 * 1024)))
# mptcp_connect will do some sleeps to allow the mp_join handshake
# completion
time=$((time + 1350))
printf "%-50s" "$msg"
do_transfer $small $large $((time * 11 / 10))
lret=$?
if [ $lret -ne 0 ]; then
ret=$lret
[ $bail -eq 0 ] || exit $ret
fi
printf "%-50s" "$msg - reverse direction"
do_transfer $large $small $((time * 11 / 10))
lret=$?
if [ $lret -ne 0 ]; then
ret=$lret
[ $bail -eq 0 ] || exit $ret
fi
}
while getopts "bcdh" option;do
case "$option" in
"h")
usage $0
exit 0
;;
"b")
bail=1
;;
"c")
capture=true
;;
"d")
set -x
;;
"?")
usage $0
exit 1
;;
esac
done
setup
run_test 10 10 0 0 "balanced bwidth"
run_test 10 10 1 50 "balanced bwidth with unbalanced delay"
# we still need some additional infrastructure to pass the following test-cases
# run_test 30 10 0 0 "unbalanced bwidth"
# run_test 30 10 1 50 "unbalanced bwidth with unbalanced delay"
# run_test 30 10 50 1 "unbalanced bwidth with opposed, unbalanced delay"
exit $ret