Commit 33372bc2 authored by Daniel Borkmann's avatar Daniel Borkmann

Merge branch 'xsk-batching'

Maciej Fijalkowski says:

====================
Unfortunately, similar scalability issues that were addressed for XDP
processing in ice, exist for XDP in the zero-copy driver used by AF_XDP.
Let's resolve them in mostly the same way as we did in [0] and utilize
the Tx batching API from XSK buffer pool.

Move the array of Tx descriptors that is used with batching approach to
the XSK buffer pool. This means that future users of this API will not
have to carry the array on their own side, they can simple refer to
pool's tx_desc array.

We also improve the Rx side where we extend ice_alloc_rx_buf_zc() to
handle the ring wrap and bump Rx tail more frequently. By doing so,
Rx side is adjusted to Tx and it was needed for l2fwd scenario.

Here are the improvements of performance numbers that this set brings
measured with xdpsock app in busy poll mode for 1 and 2 core modes.
Both Tx and Rx rings were sized to 1k length and busy poll budget was
256.

----------------------------------------------------------------
     |      txonly:      |      l2fwd      |      rxdrop
----------------------------------------------------------------
1C   |       149%        |       14%       |        3%
----------------------------------------------------------------
2C   |       134%        |       20%       |        5%
----------------------------------------------------------------

Next step will be to introduce batching onto Rx side.

v5:
* collect acks
* fix typos
* correct comments showing cache line boundaries in ice_tx_ring struct
v4 - address Alexandr's review:
* new patch (2) for making sure ring size is pow(2) when attaching
  xsk socket
* don't open code ALIGN_DOWN (patch 3)
* resign from storing tx_thresh in ice_tx_ring (patch 4)
* scope variables in a better way for Tx batching (patch 7)
v3:
* drop likely() that was wrapping napi_complete_done (patch 1)
* introduce configurable Tx threshold (patch 2)
* handle ring wrap on Rx side when allocating buffers (patch 3)
* respect NAPI budget when cleaning Tx descriptors in ZC (patch 6)
v2:
* introduce new patch that resets @next_dd and @next_rs fields
* use batching API for AF_XDP Tx on ice side

  [0]: https://lore.kernel.org/bpf/20211015162908.145341-8-anthony.l.nguyen@intel.com/
====================
Signed-off-by: default avatarDaniel Borkmann <daniel@iogearbox.net>
parents 8033c6c2 59e92bfe
......@@ -830,8 +830,6 @@ void i40e_free_tx_resources(struct i40e_ring *tx_ring)
i40e_clean_tx_ring(tx_ring);
kfree(tx_ring->tx_bi);
tx_ring->tx_bi = NULL;
kfree(tx_ring->xsk_descs);
tx_ring->xsk_descs = NULL;
if (tx_ring->desc) {
dma_free_coherent(tx_ring->dev, tx_ring->size,
......@@ -1433,13 +1431,6 @@ int i40e_setup_tx_descriptors(struct i40e_ring *tx_ring)
if (!tx_ring->tx_bi)
goto err;
if (ring_is_xdp(tx_ring)) {
tx_ring->xsk_descs = kcalloc(I40E_MAX_NUM_DESCRIPTORS, sizeof(*tx_ring->xsk_descs),
GFP_KERNEL);
if (!tx_ring->xsk_descs)
goto err;
}
u64_stats_init(&tx_ring->syncp);
/* round up to nearest 4K */
......@@ -1463,8 +1454,6 @@ int i40e_setup_tx_descriptors(struct i40e_ring *tx_ring)
return 0;
err:
kfree(tx_ring->xsk_descs);
tx_ring->xsk_descs = NULL;
kfree(tx_ring->tx_bi);
tx_ring->tx_bi = NULL;
return -ENOMEM;
......
......@@ -390,7 +390,6 @@ struct i40e_ring {
u16 rx_offset;
struct xdp_rxq_info xdp_rxq;
struct xsk_buff_pool *xsk_pool;
struct xdp_desc *xsk_descs; /* For storing descriptors in the AF_XDP ZC path */
} ____cacheline_internodealigned_in_smp;
static inline bool ring_uses_build_skb(struct i40e_ring *ring)
......
......@@ -467,11 +467,11 @@ static void i40e_set_rs_bit(struct i40e_ring *xdp_ring)
**/
static bool i40e_xmit_zc(struct i40e_ring *xdp_ring, unsigned int budget)
{
struct xdp_desc *descs = xdp_ring->xsk_descs;
struct xdp_desc *descs = xdp_ring->xsk_pool->tx_descs;
u32 nb_pkts, nb_processed = 0;
unsigned int total_bytes = 0;
nb_pkts = xsk_tx_peek_release_desc_batch(xdp_ring->xsk_pool, descs, budget);
nb_pkts = xsk_tx_peek_release_desc_batch(xdp_ring->xsk_pool, budget);
if (!nb_pkts)
return true;
......
......@@ -2803,6 +2803,8 @@ ice_set_ringparam(struct net_device *netdev, struct ethtool_ringparam *ring,
/* clone ring and setup updated count */
xdp_rings[i] = *vsi->xdp_rings[i];
xdp_rings[i].count = new_tx_cnt;
xdp_rings[i].next_dd = ICE_RING_QUARTER(&xdp_rings[i]) - 1;
xdp_rings[i].next_rs = ICE_RING_QUARTER(&xdp_rings[i]) - 1;
xdp_rings[i].desc = NULL;
xdp_rings[i].tx_buf = NULL;
err = ice_setup_tx_ring(&xdp_rings[i]);
......
......@@ -2495,10 +2495,10 @@ static int ice_xdp_alloc_setup_rings(struct ice_vsi *vsi)
xdp_ring->reg_idx = vsi->txq_map[xdp_q_idx];
xdp_ring->vsi = vsi;
xdp_ring->netdev = NULL;
xdp_ring->next_dd = ICE_TX_THRESH - 1;
xdp_ring->next_rs = ICE_TX_THRESH - 1;
xdp_ring->dev = dev;
xdp_ring->count = vsi->num_tx_desc;
xdp_ring->next_dd = ICE_RING_QUARTER(xdp_ring) - 1;
xdp_ring->next_rs = ICE_RING_QUARTER(xdp_ring) - 1;
WRITE_ONCE(vsi->xdp_rings[i], xdp_ring);
if (ice_setup_tx_ring(xdp_ring))
goto free_xdp_rings;
......
......@@ -173,6 +173,8 @@ void ice_clean_tx_ring(struct ice_tx_ring *tx_ring)
tx_ring->next_to_use = 0;
tx_ring->next_to_clean = 0;
tx_ring->next_dd = ICE_RING_QUARTER(tx_ring) - 1;
tx_ring->next_rs = ICE_RING_QUARTER(tx_ring) - 1;
if (!tx_ring->netdev)
return;
......@@ -1460,7 +1462,7 @@ int ice_napi_poll(struct napi_struct *napi, int budget)
bool wd;
if (tx_ring->xsk_pool)
wd = ice_clean_tx_irq_zc(tx_ring, budget);
wd = ice_xmit_zc(tx_ring, ICE_DESC_UNUSED(tx_ring), budget);
else if (ice_ring_is_xdp(tx_ring))
wd = true;
else
......@@ -1513,7 +1515,7 @@ int ice_napi_poll(struct napi_struct *napi, int budget)
/* Exit the polling mode, but don't re-enable interrupts if stack might
* poll us due to busy-polling
*/
if (likely(napi_complete_done(napi, work_done))) {
if (napi_complete_done(napi, work_done)) {
ice_net_dim(q_vector);
ice_enable_interrupt(q_vector);
} else {
......
......@@ -13,7 +13,6 @@
#define ICE_MAX_CHAINED_RX_BUFS 5
#define ICE_MAX_BUF_TXD 8
#define ICE_MIN_TX_LEN 17
#define ICE_TX_THRESH 32
/* The size limit for a transmit buffer in a descriptor is (16K - 1).
* In order to align with the read requests we will align the value to
......@@ -111,6 +110,8 @@ static inline int ice_skb_pad(void)
(u16)((((R)->next_to_clean > (R)->next_to_use) ? 0 : (R)->count) + \
(R)->next_to_clean - (R)->next_to_use - 1)
#define ICE_RING_QUARTER(R) ((R)->count >> 2)
#define ICE_TX_FLAGS_TSO BIT(0)
#define ICE_TX_FLAGS_HW_VLAN BIT(1)
#define ICE_TX_FLAGS_SW_VLAN BIT(2)
......@@ -321,17 +322,18 @@ struct ice_tx_ring {
u16 count; /* Number of descriptors */
u16 q_index; /* Queue number of ring */
/* stats structs */
struct ice_q_stats stats;
struct u64_stats_sync syncp;
struct ice_txq_stats tx_stats;
/* CL3 - 3rd cacheline starts here */
struct ice_q_stats stats;
struct u64_stats_sync syncp;
struct rcu_head rcu; /* to avoid race on free */
DECLARE_BITMAP(xps_state, ICE_TX_NBITS); /* XPS Config State */
struct ice_channel *ch;
struct ice_ptp_tx *tx_tstamps;
spinlock_t tx_lock;
u32 txq_teid; /* Added Tx queue TEID */
/* CL4 - 4th cacheline starts here */
u16 xdp_tx_active;
#define ICE_TX_FLAGS_RING_XDP BIT(0)
u8 flags;
u8 dcb_tc; /* Traffic class of ring */
......
......@@ -222,6 +222,7 @@ ice_receive_skb(struct ice_rx_ring *rx_ring, struct sk_buff *skb, u16 vlan_tag)
static void ice_clean_xdp_irq(struct ice_tx_ring *xdp_ring)
{
unsigned int total_bytes = 0, total_pkts = 0;
u16 tx_thresh = ICE_RING_QUARTER(xdp_ring);
u16 ntc = xdp_ring->next_to_clean;
struct ice_tx_desc *next_dd_desc;
u16 next_dd = xdp_ring->next_dd;
......@@ -233,7 +234,7 @@ static void ice_clean_xdp_irq(struct ice_tx_ring *xdp_ring)
cpu_to_le64(ICE_TX_DESC_DTYPE_DESC_DONE)))
return;
for (i = 0; i < ICE_TX_THRESH; i++) {
for (i = 0; i < tx_thresh; i++) {
tx_buf = &xdp_ring->tx_buf[ntc];
total_bytes += tx_buf->bytecount;
......@@ -254,9 +255,9 @@ static void ice_clean_xdp_irq(struct ice_tx_ring *xdp_ring)
}
next_dd_desc->cmd_type_offset_bsz = 0;
xdp_ring->next_dd = xdp_ring->next_dd + ICE_TX_THRESH;
xdp_ring->next_dd = xdp_ring->next_dd + tx_thresh;
if (xdp_ring->next_dd > xdp_ring->count)
xdp_ring->next_dd = ICE_TX_THRESH - 1;
xdp_ring->next_dd = tx_thresh - 1;
xdp_ring->next_to_clean = ntc;
ice_update_tx_ring_stats(xdp_ring, total_pkts, total_bytes);
}
......@@ -269,12 +270,13 @@ static void ice_clean_xdp_irq(struct ice_tx_ring *xdp_ring)
*/
int ice_xmit_xdp_ring(void *data, u16 size, struct ice_tx_ring *xdp_ring)
{
u16 tx_thresh = ICE_RING_QUARTER(xdp_ring);
u16 i = xdp_ring->next_to_use;
struct ice_tx_desc *tx_desc;
struct ice_tx_buf *tx_buf;
dma_addr_t dma;
if (ICE_DESC_UNUSED(xdp_ring) < ICE_TX_THRESH)
if (ICE_DESC_UNUSED(xdp_ring) < tx_thresh)
ice_clean_xdp_irq(xdp_ring);
if (!unlikely(ICE_DESC_UNUSED(xdp_ring))) {
......@@ -300,13 +302,14 @@ int ice_xmit_xdp_ring(void *data, u16 size, struct ice_tx_ring *xdp_ring)
tx_desc->cmd_type_offset_bsz = ice_build_ctob(ICE_TX_DESC_CMD_EOP, 0,
size, 0);
xdp_ring->xdp_tx_active++;
i++;
if (i == xdp_ring->count) {
i = 0;
tx_desc = ICE_TX_DESC(xdp_ring, xdp_ring->next_rs);
tx_desc->cmd_type_offset_bsz |=
cpu_to_le64(ICE_TX_DESC_CMD_RS << ICE_TXD_QW1_CMD_S);
xdp_ring->next_rs = ICE_TX_THRESH - 1;
xdp_ring->next_rs = tx_thresh - 1;
}
xdp_ring->next_to_use = i;
......@@ -314,7 +317,7 @@ int ice_xmit_xdp_ring(void *data, u16 size, struct ice_tx_ring *xdp_ring)
tx_desc = ICE_TX_DESC(xdp_ring, xdp_ring->next_rs);
tx_desc->cmd_type_offset_bsz |=
cpu_to_le64(ICE_TX_DESC_CMD_RS << ICE_TXD_QW1_CMD_S);
xdp_ring->next_rs += ICE_TX_THRESH;
xdp_ring->next_rs += tx_thresh;
}
return ICE_XDP_TX;
......
This diff is collapsed.
......@@ -6,19 +6,37 @@
#include "ice_txrx.h"
#include "ice.h"
#define PKTS_PER_BATCH 8
#ifdef __clang__
#define loop_unrolled_for _Pragma("clang loop unroll_count(8)") for
#elif __GNUC__ >= 4
#define loop_unrolled_for _Pragma("GCC unroll 8") for
#else
#define loop_unrolled_for for
#endif
struct ice_vsi;
#ifdef CONFIG_XDP_SOCKETS
int ice_xsk_pool_setup(struct ice_vsi *vsi, struct xsk_buff_pool *pool,
u16 qid);
int ice_clean_rx_irq_zc(struct ice_rx_ring *rx_ring, int budget);
bool ice_clean_tx_irq_zc(struct ice_tx_ring *xdp_ring, int budget);
int ice_xsk_wakeup(struct net_device *netdev, u32 queue_id, u32 flags);
bool ice_alloc_rx_bufs_zc(struct ice_rx_ring *rx_ring, u16 count);
bool ice_xsk_any_rx_ring_ena(struct ice_vsi *vsi);
void ice_xsk_clean_rx_ring(struct ice_rx_ring *rx_ring);
void ice_xsk_clean_xdp_ring(struct ice_tx_ring *xdp_ring);
bool ice_xmit_zc(struct ice_tx_ring *xdp_ring, u32 budget, int napi_budget);
#else
static inline bool
ice_xmit_zc(struct ice_tx_ring __always_unused *xdp_ring,
u32 __always_unused budget,
int __always_unused napi_budget)
{
return false;
}
static inline int
ice_xsk_pool_setup(struct ice_vsi __always_unused *vsi,
struct xsk_buff_pool __always_unused *pool,
......@@ -34,13 +52,6 @@ ice_clean_rx_irq_zc(struct ice_rx_ring __always_unused *rx_ring,
return 0;
}
static inline bool
ice_clean_tx_irq_zc(struct ice_tx_ring __always_unused *xdp_ring,
int __always_unused budget)
{
return false;
}
static inline bool
ice_alloc_rx_bufs_zc(struct ice_rx_ring __always_unused *rx_ring,
u16 __always_unused count)
......
......@@ -13,7 +13,7 @@
void xsk_tx_completed(struct xsk_buff_pool *pool, u32 nb_entries);
bool xsk_tx_peek_desc(struct xsk_buff_pool *pool, struct xdp_desc *desc);
u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, struct xdp_desc *desc, u32 max);
u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, u32 max);
void xsk_tx_release(struct xsk_buff_pool *pool);
struct xsk_buff_pool *xsk_get_pool_from_qid(struct net_device *dev,
u16 queue_id);
......@@ -142,8 +142,7 @@ static inline bool xsk_tx_peek_desc(struct xsk_buff_pool *pool,
return false;
}
static inline u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, struct xdp_desc *desc,
u32 max)
static inline u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, u32 max)
{
return 0;
}
......
......@@ -60,6 +60,7 @@ struct xsk_buff_pool {
*/
dma_addr_t *dma_pages;
struct xdp_buff_xsk *heads;
struct xdp_desc *tx_descs;
u64 chunk_mask;
u64 addrs_cnt;
u32 free_list_cnt;
......
......@@ -343,9 +343,9 @@ bool xsk_tx_peek_desc(struct xsk_buff_pool *pool, struct xdp_desc *desc)
}
EXPORT_SYMBOL(xsk_tx_peek_desc);
static u32 xsk_tx_peek_release_fallback(struct xsk_buff_pool *pool, struct xdp_desc *descs,
u32 max_entries)
static u32 xsk_tx_peek_release_fallback(struct xsk_buff_pool *pool, u32 max_entries)
{
struct xdp_desc *descs = pool->tx_descs;
u32 nb_pkts = 0;
while (nb_pkts < max_entries && xsk_tx_peek_desc(pool, &descs[nb_pkts]))
......@@ -355,8 +355,7 @@ static u32 xsk_tx_peek_release_fallback(struct xsk_buff_pool *pool, struct xdp_d
return nb_pkts;
}
u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, struct xdp_desc *descs,
u32 max_entries)
u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, u32 max_entries)
{
struct xdp_sock *xs;
u32 nb_pkts;
......@@ -365,7 +364,7 @@ u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, struct xdp_desc *
if (!list_is_singular(&pool->xsk_tx_list)) {
/* Fallback to the non-batched version */
rcu_read_unlock();
return xsk_tx_peek_release_fallback(pool, descs, max_entries);
return xsk_tx_peek_release_fallback(pool, max_entries);
}
xs = list_first_or_null_rcu(&pool->xsk_tx_list, struct xdp_sock, tx_list);
......@@ -374,7 +373,7 @@ u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, struct xdp_desc *
goto out;
}
nb_pkts = xskq_cons_peek_desc_batch(xs->tx, descs, pool, max_entries);
nb_pkts = xskq_cons_peek_desc_batch(xs->tx, pool, max_entries);
if (!nb_pkts) {
xs->tx->queue_empty_descs++;
goto out;
......@@ -386,7 +385,7 @@ u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, struct xdp_desc *
* packets. This avoids having to implement any buffering in
* the Tx path.
*/
nb_pkts = xskq_prod_reserve_addr_batch(pool->cq, descs, nb_pkts);
nb_pkts = xskq_prod_reserve_addr_batch(pool->cq, pool->tx_descs, nb_pkts);
if (!nb_pkts)
goto out;
......
......@@ -37,6 +37,7 @@ void xp_destroy(struct xsk_buff_pool *pool)
if (!pool)
return;
kvfree(pool->tx_descs);
kvfree(pool->heads);
kvfree(pool);
}
......@@ -58,6 +59,12 @@ struct xsk_buff_pool *xp_create_and_assign_umem(struct xdp_sock *xs,
if (!pool->heads)
goto out;
if (xs->tx) {
pool->tx_descs = kcalloc(xs->tx->nentries, sizeof(*pool->tx_descs), GFP_KERNEL);
if (!pool->tx_descs)
goto out;
}
pool->chunk_mask = ~((u64)umem->chunk_size - 1);
pool->addrs_cnt = umem->size;
pool->heads_cnt = umem->chunks;
......
......@@ -205,11 +205,11 @@ static inline bool xskq_cons_read_desc(struct xsk_queue *q,
return false;
}
static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q,
struct xdp_desc *descs,
struct xsk_buff_pool *pool, u32 max)
static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q, struct xsk_buff_pool *pool,
u32 max)
{
u32 cached_cons = q->cached_cons, nb_entries = 0;
struct xdp_desc *descs = pool->tx_descs;
while (cached_cons != q->cached_prod && nb_entries < max) {
struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
......@@ -282,12 +282,12 @@ static inline bool xskq_cons_peek_desc(struct xsk_queue *q,
return xskq_cons_read_desc(q, desc, pool);
}
static inline u32 xskq_cons_peek_desc_batch(struct xsk_queue *q, struct xdp_desc *descs,
struct xsk_buff_pool *pool, u32 max)
static inline u32 xskq_cons_peek_desc_batch(struct xsk_queue *q, struct xsk_buff_pool *pool,
u32 max)
{
u32 entries = xskq_cons_nb_entries(q, max);
return xskq_cons_read_desc_batch(q, descs, pool, entries);
return xskq_cons_read_desc_batch(q, pool, entries);
}
/* To improve performance in the xskq_cons_release functions, only update local state here.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment