Commit 17b8a74f authored by David S. Miller's avatar David S. Miller
parents 0e15df49 dff10e9e
...@@ -157,39 +157,14 @@ u32 tipc_bclink_get_last_sent(void) ...@@ -157,39 +157,14 @@ u32 tipc_bclink_get_last_sent(void)
return bcl->fsm_msg_cnt; return bcl->fsm_msg_cnt;
} }
/** static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
* bclink_set_gap - set gap according to contents of current deferred pkt queue
*
* Called with 'node' locked, bc_lock unlocked
*/
static void bclink_set_gap(struct tipc_node *n_ptr)
{ {
struct sk_buff *buf = n_ptr->bclink.deferred_head; node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ?
seqno : node->bclink.last_sent;
n_ptr->bclink.gap_after = n_ptr->bclink.gap_to =
mod(n_ptr->bclink.last_in);
if (unlikely(buf != NULL))
n_ptr->bclink.gap_to = mod(buf_seqno(buf) - 1);
} }
/**
* bclink_ack_allowed - test if ACK or NACK message can be sent at this moment
*
* This mechanism endeavours to prevent all nodes in network from trying
* to ACK or NACK at the same time.
*
* Note: TIPC uses a different trigger to distribute ACKs than it does to
* distribute NACKs, but tries to use the same spacing (divide by 16).
*/
static int bclink_ack_allowed(u32 n) /*
{
return (n % TIPC_MIN_LINK_WIN) == tipc_own_tag;
}
/**
* tipc_bclink_retransmit_to - get most recent node to request retransmission * tipc_bclink_retransmit_to - get most recent node to request retransmission
* *
* Called with bc_lock locked * Called with bc_lock locked
...@@ -300,140 +275,94 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) ...@@ -300,140 +275,94 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
spin_unlock_bh(&bc_lock); spin_unlock_bh(&bc_lock);
} }
/** /*
* bclink_send_ack - unicast an ACK msg * tipc_bclink_update_link_state - update broadcast link state
* *
* tipc_net_lock and node lock set * tipc_net_lock and node lock set
*/ */
static void bclink_send_ack(struct tipc_node *n_ptr) void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
{ {
struct tipc_link *l_ptr = n_ptr->active_links[n_ptr->addr & 1]; struct sk_buff *buf;
if (l_ptr != NULL) /* Ignore "stale" link state info */
tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
}
/** if (less_eq(last_sent, n_ptr->bclink.last_in))
* bclink_send_nack- broadcast a NACK msg return;
*
* tipc_net_lock and node lock set
*/
static void bclink_send_nack(struct tipc_node *n_ptr) /* Update link synchronization state; quit if in sync */
{
struct sk_buff *buf; bclink_update_last_sent(n_ptr, last_sent);
struct tipc_msg *msg;
if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
return;
/* Update out-of-sync state; quit if loss is still unconfirmed */
if ((++n_ptr->bclink.oos_state) == 1) {
if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
return;
n_ptr->bclink.oos_state++;
}
if (!less(n_ptr->bclink.gap_after, n_ptr->bclink.gap_to)) /* Don't NACK if one has been recently sent (or seen) */
if (n_ptr->bclink.oos_state & 0x1)
return; return;
/* Send NACK */
buf = tipc_buf_acquire(INT_H_SIZE); buf = tipc_buf_acquire(INT_H_SIZE);
if (buf) { if (buf) {
msg = buf_msg(buf); struct tipc_msg *msg = buf_msg(buf);
tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG,
INT_H_SIZE, n_ptr->addr); INT_H_SIZE, n_ptr->addr);
msg_set_non_seq(msg, 1); msg_set_non_seq(msg, 1);
msg_set_mc_netid(msg, tipc_net_id); msg_set_mc_netid(msg, tipc_net_id);
msg_set_bcast_ack(msg, mod(n_ptr->bclink.last_in)); msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
msg_set_bcgap_after(msg, n_ptr->bclink.gap_after); msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
msg_set_bcgap_to(msg, n_ptr->bclink.gap_to); msg_set_bcgap_to(msg, n_ptr->bclink.deferred_head
msg_set_bcast_tag(msg, tipc_own_tag); ? buf_seqno(n_ptr->bclink.deferred_head) - 1
: n_ptr->bclink.last_sent);
spin_lock_bh(&bc_lock);
tipc_bearer_send(&bcbearer->bearer, buf, NULL); tipc_bearer_send(&bcbearer->bearer, buf, NULL);
bcl->stats.sent_nacks++; bcl->stats.sent_nacks++;
spin_unlock_bh(&bc_lock);
buf_discard(buf); buf_discard(buf);
/* n_ptr->bclink.oos_state++;
* Ensure we doesn't send another NACK msg to the node
* until 16 more deferred messages arrive from it
* (i.e. helps prevent all nodes from NACK'ing at same time)
*/
n_ptr->bclink.nack_sync = tipc_own_tag;
} }
} }
/** /*
* tipc_bclink_check_gap - send a NACK if a sequence gap exists * bclink_peek_nack - monitor retransmission requests sent by other nodes
* *
* tipc_net_lock and node lock set * Delay any upcoming NACK by this node if another node has already
*/ * requested the first message this node is going to ask for.
void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 last_sent)
{
if (!n_ptr->bclink.supported ||
less_eq(last_sent, mod(n_ptr->bclink.last_in)))
return;
bclink_set_gap(n_ptr);
if (n_ptr->bclink.gap_after == n_ptr->bclink.gap_to)
n_ptr->bclink.gap_to = last_sent;
bclink_send_nack(n_ptr);
}
/**
* tipc_bclink_peek_nack - process a NACK msg meant for another node
* *
* Only tipc_net_lock set. * Only tipc_net_lock set.
*/ */
static void tipc_bclink_peek_nack(u32 dest, u32 sender_tag, u32 gap_after, u32 gap_to) static void bclink_peek_nack(struct tipc_msg *msg)
{ {
struct tipc_node *n_ptr = tipc_node_find(dest); struct tipc_node *n_ptr = tipc_node_find(msg_destnode(msg));
u32 my_after, my_to;
if (unlikely(!n_ptr || !tipc_node_is_up(n_ptr))) if (unlikely(!n_ptr))
return; return;
tipc_node_lock(n_ptr); tipc_node_lock(n_ptr);
/*
* Modify gap to suppress unnecessary NACKs from this node
*/
my_after = n_ptr->bclink.gap_after;
my_to = n_ptr->bclink.gap_to;
if (less_eq(gap_after, my_after)) {
if (less(my_after, gap_to) && less(gap_to, my_to))
n_ptr->bclink.gap_after = gap_to;
else if (less_eq(my_to, gap_to))
n_ptr->bclink.gap_to = n_ptr->bclink.gap_after;
} else if (less_eq(gap_after, my_to)) {
if (less_eq(my_to, gap_to))
n_ptr->bclink.gap_to = gap_after;
} else {
/*
* Expand gap if missing bufs not in deferred queue:
*/
struct sk_buff *buf = n_ptr->bclink.deferred_head;
u32 prev = n_ptr->bclink.gap_to;
for (; buf; buf = buf->next) { if (n_ptr->bclink.supported &&
u32 seqno = buf_seqno(buf); (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
(n_ptr->bclink.last_in == msg_bcgap_after(msg)))
n_ptr->bclink.oos_state = 2;
if (mod(seqno - prev) != 1) {
buf = NULL;
break;
}
if (seqno == gap_after)
break;
prev = seqno;
}
if (buf == NULL)
n_ptr->bclink.gap_to = gap_after;
}
/*
* Some nodes may send a complementary NACK now:
*/
if (bclink_ack_allowed(sender_tag + 1)) {
if (n_ptr->bclink.gap_to != n_ptr->bclink.gap_after) {
bclink_send_nack(n_ptr);
bclink_set_gap(n_ptr);
}
}
tipc_node_unlock(n_ptr); tipc_node_unlock(n_ptr);
} }
/** /*
* tipc_bclink_send_msg - broadcast a packet to all nodes in cluster * tipc_bclink_send_msg - broadcast a packet to all nodes in cluster
*/ */
...@@ -460,7 +389,33 @@ int tipc_bclink_send_msg(struct sk_buff *buf) ...@@ -460,7 +389,33 @@ int tipc_bclink_send_msg(struct sk_buff *buf)
return res; return res;
} }
/** /*
* bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
*
* Called with both sending node's lock and bc_lock taken.
*/
static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
{
bclink_update_last_sent(node, seqno);
node->bclink.last_in = seqno;
node->bclink.oos_state = 0;
bcl->stats.recv_info++;
/*
* Unicast an ACK periodically, ensuring that
* all nodes in the cluster don't ACK at the same time
*/
if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) {
tipc_link_send_proto_msg(
node->active_links[node->addr & 1],
STATE_MSG, 0, 0, 0, 0, 0);
bcl->stats.sent_acks++;
}
}
/*
* tipc_bclink_recv_pkt - receive a broadcast packet, and deliver upwards * tipc_bclink_recv_pkt - receive a broadcast packet, and deliver upwards
* *
* tipc_net_lock is read_locked, no other locks set * tipc_net_lock is read_locked, no other locks set
...@@ -472,7 +427,7 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf) ...@@ -472,7 +427,7 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
struct tipc_node *node; struct tipc_node *node;
u32 next_in; u32 next_in;
u32 seqno; u32 seqno;
struct sk_buff *deferred; int deferred;
/* Screen out unwanted broadcast messages */ /* Screen out unwanted broadcast messages */
...@@ -487,6 +442,8 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf) ...@@ -487,6 +442,8 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
if (unlikely(!node->bclink.supported)) if (unlikely(!node->bclink.supported))
goto unlock; goto unlock;
/* Handle broadcast protocol message */
if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) { if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
if (msg_type(msg) != STATE_MSG) if (msg_type(msg) != STATE_MSG)
goto unlock; goto unlock;
...@@ -501,85 +458,114 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf) ...@@ -501,85 +458,114 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
spin_unlock_bh(&bc_lock); spin_unlock_bh(&bc_lock);
} else { } else {
tipc_node_unlock(node); tipc_node_unlock(node);
tipc_bclink_peek_nack(msg_destnode(msg), bclink_peek_nack(msg);
msg_bcast_tag(msg),
msg_bcgap_after(msg),
msg_bcgap_to(msg));
} }
goto exit; goto exit;
} }
/* Handle in-sequence broadcast message */ /* Handle in-sequence broadcast message */
receive:
next_in = mod(node->bclink.last_in + 1);
seqno = msg_seqno(msg); seqno = msg_seqno(msg);
next_in = mod(node->bclink.last_in + 1);
if (likely(seqno == next_in)) { if (likely(seqno == next_in)) {
bcl->stats.recv_info++; receive:
node->bclink.last_in++; /* Deliver message to destination */
bclink_set_gap(node);
if (unlikely(bclink_ack_allowed(seqno))) {
bclink_send_ack(node);
bcl->stats.sent_acks++;
}
if (likely(msg_isdata(msg))) { if (likely(msg_isdata(msg))) {
spin_lock_bh(&bc_lock);
bclink_accept_pkt(node, seqno);
spin_unlock_bh(&bc_lock);
tipc_node_unlock(node); tipc_node_unlock(node);
if (likely(msg_mcast(msg))) if (likely(msg_mcast(msg)))
tipc_port_recv_mcast(buf, NULL); tipc_port_recv_mcast(buf, NULL);
else else
buf_discard(buf); buf_discard(buf);
} else if (msg_user(msg) == MSG_BUNDLER) { } else if (msg_user(msg) == MSG_BUNDLER) {
spin_lock_bh(&bc_lock);
bclink_accept_pkt(node, seqno);
bcl->stats.recv_bundles++; bcl->stats.recv_bundles++;
bcl->stats.recv_bundled += msg_msgcnt(msg); bcl->stats.recv_bundled += msg_msgcnt(msg);
spin_unlock_bh(&bc_lock);
tipc_node_unlock(node); tipc_node_unlock(node);
tipc_link_recv_bundle(buf); tipc_link_recv_bundle(buf);
} else if (msg_user(msg) == MSG_FRAGMENTER) { } else if (msg_user(msg) == MSG_FRAGMENTER) {
int ret = tipc_link_recv_fragment(&node->bclink.defragm,
&buf, &msg);
if (ret < 0)
goto unlock;
spin_lock_bh(&bc_lock);
bclink_accept_pkt(node, seqno);
bcl->stats.recv_fragments++; bcl->stats.recv_fragments++;
if (tipc_link_recv_fragment(&node->bclink.defragm, if (ret > 0)
&buf, &msg))
bcl->stats.recv_fragmented++; bcl->stats.recv_fragmented++;
spin_unlock_bh(&bc_lock);
tipc_node_unlock(node); tipc_node_unlock(node);
tipc_net_route_msg(buf); tipc_net_route_msg(buf);
} else if (msg_user(msg) == NAME_DISTRIBUTOR) { } else if (msg_user(msg) == NAME_DISTRIBUTOR) {
spin_lock_bh(&bc_lock);
bclink_accept_pkt(node, seqno);
spin_unlock_bh(&bc_lock);
tipc_node_unlock(node); tipc_node_unlock(node);
tipc_named_recv(buf); tipc_named_recv(buf);
} else { } else {
spin_lock_bh(&bc_lock);
bclink_accept_pkt(node, seqno);
spin_unlock_bh(&bc_lock);
tipc_node_unlock(node); tipc_node_unlock(node);
buf_discard(buf); buf_discard(buf);
} }
buf = NULL; buf = NULL;
/* Determine new synchronization state */
tipc_node_lock(node); tipc_node_lock(node);
deferred = node->bclink.deferred_head; if (unlikely(!tipc_node_is_up(node)))
if (deferred && (buf_seqno(deferred) == mod(next_in + 1))) { goto unlock;
buf = deferred;
msg = buf_msg(buf); if (node->bclink.last_in == node->bclink.last_sent)
node->bclink.deferred_head = deferred->next; goto unlock;
if (!node->bclink.deferred_head) {
node->bclink.oos_state = 1;
goto unlock;
}
msg = buf_msg(node->bclink.deferred_head);
seqno = msg_seqno(msg);
next_in = mod(next_in + 1);
if (seqno != next_in)
goto unlock;
/* Take in-sequence message from deferred queue & deliver it */
buf = node->bclink.deferred_head;
node->bclink.deferred_head = buf->next;
node->bclink.deferred_size--;
goto receive; goto receive;
} }
} else if (less(next_in, seqno)) {
u32 gap_after = node->bclink.gap_after;
u32 gap_to = node->bclink.gap_to;
if (tipc_link_defer_pkt(&node->bclink.deferred_head, /* Handle out-of-sequence broadcast message */
if (less(next_in, seqno)) {
deferred = tipc_link_defer_pkt(&node->bclink.deferred_head,
&node->bclink.deferred_tail, &node->bclink.deferred_tail,
buf)) { buf);
node->bclink.nack_sync++; node->bclink.deferred_size += deferred;
bcl->stats.deferred_recv++; bclink_update_last_sent(node, seqno);
if (seqno == mod(gap_after + 1))
node->bclink.gap_after = seqno;
else if (less(gap_after, seqno) && less(seqno, gap_to))
node->bclink.gap_to = seqno;
}
buf = NULL; buf = NULL;
if (bclink_ack_allowed(node->bclink.nack_sync)) { } else
if (gap_to != gap_after) deferred = 0;
bclink_send_nack(node);
bclink_set_gap(node); spin_lock_bh(&bc_lock);
}
} else { if (deferred)
bcl->stats.deferred_recv++;
else
bcl->stats.duplicates++; bcl->stats.duplicates++;
}
spin_unlock_bh(&bc_lock);
unlock: unlock:
tipc_node_unlock(node); tipc_node_unlock(node);
exit: exit:
......
...@@ -96,7 +96,7 @@ int tipc_bclink_send_msg(struct sk_buff *buf); ...@@ -96,7 +96,7 @@ int tipc_bclink_send_msg(struct sk_buff *buf);
void tipc_bclink_recv_pkt(struct sk_buff *buf); void tipc_bclink_recv_pkt(struct sk_buff *buf);
u32 tipc_bclink_get_last_sent(void); u32 tipc_bclink_get_last_sent(void);
u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr); u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr);
void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 seqno); void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent);
int tipc_bclink_stats(char *stats_buf, const u32 buf_size); int tipc_bclink_stats(char *stats_buf, const u32 buf_size);
int tipc_bclink_reset_stats(void); int tipc_bclink_reset_stats(void);
int tipc_bclink_set_queue_limits(u32 limit); int tipc_bclink_set_queue_limits(u32 limit);
......
...@@ -1501,13 +1501,13 @@ static void link_retransmit_failure(struct tipc_link *l_ptr, ...@@ -1501,13 +1501,13 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
tipc_node_lock(n_ptr); tipc_node_lock(n_ptr);
tipc_addr_string_fill(addr_string, n_ptr->addr); tipc_addr_string_fill(addr_string, n_ptr->addr);
info("Multicast link info for %s\n", addr_string); info("Broadcast link info for %s\n", addr_string);
info("Supportable: %d, ", n_ptr->bclink.supportable);
info("Supported: %d, ", n_ptr->bclink.supported); info("Supported: %d, ", n_ptr->bclink.supported);
info("Acked: %u\n", n_ptr->bclink.acked); info("Acked: %u\n", n_ptr->bclink.acked);
info("Last in: %u, ", n_ptr->bclink.last_in); info("Last in: %u, ", n_ptr->bclink.last_in);
info("Gap after: %u, ", n_ptr->bclink.gap_after); info("Oos state: %u, ", n_ptr->bclink.oos_state);
info("Gap to: %u\n", n_ptr->bclink.gap_to); info("Last sent: %u\n", n_ptr->bclink.last_sent);
info("Nack sync: %u\n\n", n_ptr->bclink.nack_sync);
tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr); tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);
...@@ -1736,7 +1736,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr) ...@@ -1736,7 +1736,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
/* Release acked messages */ /* Release acked messages */
if (tipc_node_is_up(n_ptr) && n_ptr->bclink.supported) if (n_ptr->bclink.supported)
tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg)); tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
crs = l_ptr->first_out; crs = l_ptr->first_out;
...@@ -1774,6 +1774,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr) ...@@ -1774,6 +1774,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
head = link_insert_deferred_queue(l_ptr, head = link_insert_deferred_queue(l_ptr,
head); head);
if (likely(msg_is_dest(msg, tipc_own_addr))) { if (likely(msg_is_dest(msg, tipc_own_addr))) {
int ret;
deliver: deliver:
if (likely(msg_isdata(msg))) { if (likely(msg_isdata(msg))) {
tipc_node_unlock(n_ptr); tipc_node_unlock(n_ptr);
...@@ -1798,11 +1799,15 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr) ...@@ -1798,11 +1799,15 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
continue; continue;
case MSG_FRAGMENTER: case MSG_FRAGMENTER:
l_ptr->stats.recv_fragments++; l_ptr->stats.recv_fragments++;
if (tipc_link_recv_fragment(&l_ptr->defragm_buf, ret = tipc_link_recv_fragment(
&buf, &msg)) { &l_ptr->defragm_buf,
&buf, &msg);
if (ret == 1) {
l_ptr->stats.recv_fragmented++; l_ptr->stats.recv_fragmented++;
goto deliver; goto deliver;
} }
if (ret == -1)
l_ptr->next_in_no--;
break; break;
case CHANGEOVER_PROTOCOL: case CHANGEOVER_PROTOCOL:
type = msg_type(msg); type = msg_type(msg);
...@@ -1853,17 +1858,16 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr) ...@@ -1853,17 +1858,16 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
} }
/* /*
* link_defer_buf(): Sort a received out-of-sequence packet * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
* into the deferred reception queue. *
* Returns the increase of the queue length,i.e. 0 or 1 * Returns increase in queue length (i.e. 0 or 1)
*/ */
u32 tipc_link_defer_pkt(struct sk_buff **head, u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail,
struct sk_buff **tail,
struct sk_buff *buf) struct sk_buff *buf)
{ {
struct sk_buff *prev = NULL; struct sk_buff *queue_buf;
struct sk_buff *crs = *head; struct sk_buff **prev;
u32 seq_no = buf_seqno(buf); u32 seq_no = buf_seqno(buf);
buf->next = NULL; buf->next = NULL;
...@@ -1881,31 +1885,30 @@ u32 tipc_link_defer_pkt(struct sk_buff **head, ...@@ -1881,31 +1885,30 @@ u32 tipc_link_defer_pkt(struct sk_buff **head,
return 1; return 1;
} }
/* Scan through queue and sort it in */ /* Locate insertion point in queue, then insert; discard if duplicate */
do { prev = head;
struct tipc_msg *msg = buf_msg(crs); queue_buf = *head;
for (;;) {
u32 curr_seqno = buf_seqno(queue_buf);
if (less(seq_no, msg_seqno(msg))) { if (seq_no == curr_seqno) {
buf->next = crs; buf_discard(buf);
if (prev) return 0;
prev->next = buf;
else
*head = buf;
return 1;
} }
if (seq_no == msg_seqno(msg))
if (less(seq_no, curr_seqno))
break; break;
prev = crs;
crs = crs->next;
} while (crs);
/* Message is a duplicate of an existing message */ prev = &queue_buf->next;
queue_buf = queue_buf->next;
}
buf_discard(buf); buf->next = queue_buf;
return 0; *prev = buf;
return 1;
} }
/** /*
* link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
*/ */
...@@ -1956,6 +1959,13 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, ...@@ -1956,6 +1959,13 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
u32 msg_size = sizeof(l_ptr->proto_msg); u32 msg_size = sizeof(l_ptr->proto_msg);
int r_flag; int r_flag;
/* Discard any previous message that was deferred due to congestion */
if (l_ptr->proto_msg_queue) {
buf_discard(l_ptr->proto_msg_queue);
l_ptr->proto_msg_queue = NULL;
}
if (link_blocked(l_ptr)) if (link_blocked(l_ptr))
return; return;
...@@ -1964,9 +1974,11 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, ...@@ -1964,9 +1974,11 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
if ((l_ptr->owner->block_setup) && (msg_typ != RESET_MSG)) if ((l_ptr->owner->block_setup) && (msg_typ != RESET_MSG))
return; return;
/* Create protocol message with "out-of-sequence" sequence number */
msg_set_type(msg, msg_typ); msg_set_type(msg, msg_typ);
msg_set_net_plane(msg, l_ptr->b_ptr->net_plane); msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in)); msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
msg_set_last_bcast(msg, tipc_bclink_get_last_sent()); msg_set_last_bcast(msg, tipc_bclink_get_last_sent());
if (msg_typ == STATE_MSG) { if (msg_typ == STATE_MSG) {
...@@ -2020,44 +2032,36 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, ...@@ -2020,44 +2032,36 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr)); r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr));
msg_set_redundant_link(msg, r_flag); msg_set_redundant_link(msg, r_flag);
msg_set_linkprio(msg, l_ptr->priority); msg_set_linkprio(msg, l_ptr->priority);
msg_set_size(msg, msg_size);
/* Ensure sequence number will not fit : */
msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2))); msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));
/* Congestion? */
if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
if (!l_ptr->proto_msg_queue) {
l_ptr->proto_msg_queue =
tipc_buf_acquire(sizeof(l_ptr->proto_msg));
}
buf = l_ptr->proto_msg_queue;
if (!buf)
return;
skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
return;
}
/* Message can be sent */
buf = tipc_buf_acquire(msg_size); buf = tipc_buf_acquire(msg_size);
if (!buf) if (!buf)
return; return;
skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
msg_set_size(buf_msg(buf), msg_size);
if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { /* Defer message if bearer is already congested */
l_ptr->unacked_window = 0;
buf_discard(buf); if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
l_ptr->proto_msg_queue = buf;
return; return;
} }
/* New congestion */ /* Defer message if attempting to send results in bearer congestion */
if (!tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
l_ptr->proto_msg_queue = buf; l_ptr->proto_msg_queue = buf;
l_ptr->stats.bearer_congs++; l_ptr->stats.bearer_congs++;
return;
}
/* Discard message if it was sent successfully */
l_ptr->unacked_window = 0;
buf_discard(buf);
} }
/* /*
...@@ -2105,6 +2109,8 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) ...@@ -2105,6 +2109,8 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
l_ptr->owner->block_setup = WAIT_NODE_DOWN; l_ptr->owner->block_setup = WAIT_NODE_DOWN;
} }
link_state_event(l_ptr, RESET_MSG);
/* fall thru' */ /* fall thru' */
case ACTIVATE_MSG: case ACTIVATE_MSG:
/* Update link settings according other endpoint's values */ /* Update link settings according other endpoint's values */
...@@ -2127,16 +2133,22 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) ...@@ -2127,16 +2133,22 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
} else { } else {
l_ptr->max_pkt = l_ptr->max_pkt_target; l_ptr->max_pkt = l_ptr->max_pkt_target;
} }
l_ptr->owner->bclink.supported = (max_pkt_info != 0); l_ptr->owner->bclink.supportable = (max_pkt_info != 0);
link_state_event(l_ptr, msg_type(msg)); /* Synchronize broadcast link info, if not done previously */
if (!tipc_node_is_up(l_ptr->owner)) {
l_ptr->owner->bclink.last_sent =
l_ptr->owner->bclink.last_in =
msg_last_bcast(msg);
l_ptr->owner->bclink.oos_state = 0;
}
l_ptr->peer_session = msg_session(msg); l_ptr->peer_session = msg_session(msg);
l_ptr->peer_bearer_id = msg_bearer_id(msg); l_ptr->peer_bearer_id = msg_bearer_id(msg);
/* Synchronize broadcast sequence numbers */ if (msg_type(msg) == ACTIVATE_MSG)
if (!tipc_node_redundant_links(l_ptr->owner)) link_state_event(l_ptr, ACTIVATE_MSG);
l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg));
break; break;
case STATE_MSG: case STATE_MSG:
...@@ -2177,7 +2189,9 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) ...@@ -2177,7 +2189,9 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
/* Protocol message before retransmits, reduce loss risk */ /* Protocol message before retransmits, reduce loss risk */
tipc_bclink_check_gap(l_ptr->owner, msg_last_bcast(msg)); if (l_ptr->owner->bclink.supported)
tipc_bclink_update_link_state(l_ptr->owner,
msg_last_bcast(msg));
if (rec_gap || (msg_probe(msg))) { if (rec_gap || (msg_probe(msg))) {
tipc_link_send_proto_msg(l_ptr, STATE_MSG, tipc_link_send_proto_msg(l_ptr, STATE_MSG,
...@@ -2623,7 +2637,9 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb, ...@@ -2623,7 +2637,9 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
set_fragm_size(pbuf, fragm_sz); set_fragm_size(pbuf, fragm_sz);
set_expected_frags(pbuf, exp_fragm_cnt - 1); set_expected_frags(pbuf, exp_fragm_cnt - 1);
} else { } else {
warn("Link unable to reassemble fragmented message\n"); dbg("Link unable to reassemble fragmented message\n");
buf_discard(fbuf);
return -1;
} }
buf_discard(fbuf); buf_discard(fbuf);
return 0; return 0;
......
...@@ -239,9 +239,6 @@ void tipc_named_node_up(unsigned long nodearg) ...@@ -239,9 +239,6 @@ void tipc_named_node_up(unsigned long nodearg)
* *
* Invoked for each publication issued by a newly failed node. * Invoked for each publication issued by a newly failed node.
* Removes publication structure from name table & deletes it. * Removes publication structure from name table & deletes it.
* In rare cases the link may have come back up again when this
* function is called, and we have two items representing the same
* publication. Nudge this item's key to distinguish it from the other.
*/ */
static void named_purge_publ(struct publication *publ) static void named_purge_publ(struct publication *publ)
...@@ -249,7 +246,6 @@ static void named_purge_publ(struct publication *publ) ...@@ -249,7 +246,6 @@ static void named_purge_publ(struct publication *publ)
struct publication *p; struct publication *p;
write_lock_bh(&tipc_nametbl_lock); write_lock_bh(&tipc_nametbl_lock);
publ->key += 1222345;
p = tipc_nametbl_remove_publ(publ->type, publ->lower, p = tipc_nametbl_remove_publ(publ->type, publ->lower,
publ->node, publ->ref, publ->key); publ->node, publ->ref, publ->key);
if (p) if (p)
......
...@@ -49,9 +49,8 @@ LIST_HEAD(tipc_node_list); ...@@ -49,9 +49,8 @@ LIST_HEAD(tipc_node_list);
static u32 tipc_num_nodes; static u32 tipc_num_nodes;
static atomic_t tipc_num_links = ATOMIC_INIT(0); static atomic_t tipc_num_links = ATOMIC_INIT(0);
u32 tipc_own_tag;
/** /*
* tipc_node_find - locate specified node object, if it exists * tipc_node_find - locate specified node object, if it exists
*/ */
...@@ -306,10 +305,9 @@ static void node_established_contact(struct tipc_node *n_ptr) ...@@ -306,10 +305,9 @@ static void node_established_contact(struct tipc_node *n_ptr)
/* Syncronize broadcast acks */ /* Syncronize broadcast acks */
n_ptr->bclink.acked = tipc_bclink_get_last_sent(); n_ptr->bclink.acked = tipc_bclink_get_last_sent();
if (n_ptr->bclink.supported) { if (n_ptr->bclink.supportable) {
tipc_bclink_add_node(n_ptr->addr); tipc_bclink_add_node(n_ptr->addr);
if (n_ptr->addr < tipc_own_addr) n_ptr->bclink.supported = 1;
tipc_own_tag++;
} }
} }
...@@ -338,12 +336,12 @@ static void node_lost_contact(struct tipc_node *n_ptr) ...@@ -338,12 +336,12 @@ static void node_lost_contact(struct tipc_node *n_ptr)
/* Flush broadcast link info associated with lost node */ /* Flush broadcast link info associated with lost node */
if (n_ptr->bclink.supported) { if (n_ptr->bclink.supported) {
n_ptr->bclink.gap_after = n_ptr->bclink.gap_to = 0;
while (n_ptr->bclink.deferred_head) { while (n_ptr->bclink.deferred_head) {
struct sk_buff *buf = n_ptr->bclink.deferred_head; struct sk_buff *buf = n_ptr->bclink.deferred_head;
n_ptr->bclink.deferred_head = buf->next; n_ptr->bclink.deferred_head = buf->next;
buf_discard(buf); buf_discard(buf);
} }
n_ptr->bclink.deferred_size = 0;
if (n_ptr->bclink.defragm) { if (n_ptr->bclink.defragm) {
buf_discard(n_ptr->bclink.defragm); buf_discard(n_ptr->bclink.defragm);
...@@ -352,8 +350,6 @@ static void node_lost_contact(struct tipc_node *n_ptr) ...@@ -352,8 +350,6 @@ static void node_lost_contact(struct tipc_node *n_ptr)
tipc_bclink_remove_node(n_ptr->addr); tipc_bclink_remove_node(n_ptr->addr);
tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ); tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ);
if (n_ptr->addr < tipc_own_addr)
tipc_own_tag--;
n_ptr->bclink.supported = 0; n_ptr->bclink.supported = 0;
} }
...@@ -449,7 +445,7 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space) ...@@ -449,7 +445,7 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space)
read_lock_bh(&tipc_net_lock); read_lock_bh(&tipc_net_lock);
/* Get space for all unicast links + multicast link */ /* Get space for all unicast links + broadcast link */
payload_size = TLV_SPACE(sizeof(link_info)) * payload_size = TLV_SPACE(sizeof(link_info)) *
(atomic_read(&tipc_num_links) + 1); (atomic_read(&tipc_num_links) + 1);
......
...@@ -62,12 +62,13 @@ ...@@ -62,12 +62,13 @@
* @link_cnt: number of links to node * @link_cnt: number of links to node
* @permit_changeover: non-zero if node has redundant links to this system * @permit_changeover: non-zero if node has redundant links to this system
* @bclink: broadcast-related info * @bclink: broadcast-related info
* @supportable: non-zero if node supports TIPC b'cast link capability
* @supported: non-zero if node supports TIPC b'cast capability * @supported: non-zero if node supports TIPC b'cast capability
* @acked: sequence # of last outbound b'cast message acknowledged by node * @acked: sequence # of last outbound b'cast message acknowledged by node
* @last_in: sequence # of last in-sequence b'cast message received from node * @last_in: sequence # of last in-sequence b'cast message received from node
* @gap_after: sequence # of last message not requiring a NAK request * @last_sent: sequence # of last b'cast message sent by node
* @gap_to: sequence # of last message requiring a NAK request * @oos_state: state tracker for handling OOS b'cast messages
* @nack_sync: counter that determines when NAK requests should be sent * @deferred_size: number of OOS b'cast messages in deferred queue
* @deferred_head: oldest OOS b'cast message received from node * @deferred_head: oldest OOS b'cast message received from node
* @deferred_tail: newest OOS b'cast message received from node * @deferred_tail: newest OOS b'cast message received from node
* @defragm: list of partially reassembled b'cast message fragments from node * @defragm: list of partially reassembled b'cast message fragments from node
...@@ -86,12 +87,13 @@ struct tipc_node { ...@@ -86,12 +87,13 @@ struct tipc_node {
int block_setup; int block_setup;
int permit_changeover; int permit_changeover;
struct { struct {
int supported; u8 supportable;
u8 supported;
u32 acked; u32 acked;
u32 last_in; u32 last_in;
u32 gap_after; u32 last_sent;
u32 gap_to; u32 oos_state;
u32 nack_sync; u32 deferred_size;
struct sk_buff *deferred_head; struct sk_buff *deferred_head;
struct sk_buff *deferred_tail; struct sk_buff *deferred_tail;
struct sk_buff *defragm; struct sk_buff *defragm;
...@@ -112,8 +114,6 @@ static inline unsigned int tipc_hashfn(u32 addr) ...@@ -112,8 +114,6 @@ static inline unsigned int tipc_hashfn(u32 addr)
return addr & (NODE_HTABLE_SIZE - 1); return addr & (NODE_HTABLE_SIZE - 1);
} }
extern u32 tipc_own_tag;
struct tipc_node *tipc_node_find(u32 addr); struct tipc_node *tipc_node_find(u32 addr);
struct tipc_node *tipc_node_create(u32 addr); struct tipc_node *tipc_node_create(u32 addr);
void tipc_node_delete(struct tipc_node *n_ptr); void tipc_node_delete(struct tipc_node *n_ptr);
......
...@@ -400,16 +400,17 @@ int tipc_reject_msg(struct sk_buff *buf, u32 err) ...@@ -400,16 +400,17 @@ int tipc_reject_msg(struct sk_buff *buf, u32 err)
/* send self-abort message when rejecting on a connected port */ /* send self-abort message when rejecting on a connected port */
if (msg_connected(msg)) { if (msg_connected(msg)) {
struct sk_buff *abuf = NULL;
struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg)); struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg));
if (p_ptr) { if (p_ptr) {
struct sk_buff *abuf = NULL;
if (p_ptr->connected) if (p_ptr->connected)
abuf = port_build_self_abort_msg(p_ptr, err); abuf = port_build_self_abort_msg(p_ptr, err);
tipc_port_unlock(p_ptr); tipc_port_unlock(p_ptr);
}
tipc_net_route_msg(abuf); tipc_net_route_msg(abuf);
} }
}
/* send returned message & dispose of rejected message */ /* send returned message & dispose of rejected message */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment