Commit b3663583 authored by Joanne Hugé's avatar Joanne Hugé

Free DPDK RX packets correctly to fix segfault

parent 6976e07d
...@@ -46,23 +46,24 @@ ...@@ -46,23 +46,24 @@
#endif #endif
#include "private/trx_driver.h" #include "private/trx_driver.h"
#include "utils.c"
#define EFREQ 38400
//#define DEBUG // Enables / deactivates log_debug //#define DEBUG // Enables / deactivates log_debug
//#define DST_ADDR_SYNTAX // Depends on DPDK version
//#define TRACE //#define TRACE
//#define MONITOR
#define RECV_STOP_THRESHOLD 3 #define MONITOR
#define MONITOR_EXIT
#define RECV_PPS_THRESHOLD (EFREQ * 100 * 9 / 10)
#define RECV_STOP_THRESHOLD 2
#define PPS_UPDATE_PERIOD INT64_C(1000000000) #define PPS_UPDATE_PERIOD INT64_C(1000000000)
#include "utils.c"
#define EFREQ 38400
//#define EFREQ 100
#define STAT_FRAME_INTERVAL INT64_C(EFREQ * 150)
//#define START_SENDING //#define START_SENDING
#define START_RECEIVING #define START_RECEIVING
#define STAT_FRAME_INTERVAL INT64_C(EFREQ * 500)
//#define DST_ADDR_SYNTAX // Depends on DPDK version
#define RX_N_CHANNEL 1 #define RX_N_CHANNEL 1
#define TX_N_CHANNEL 4 #define TX_N_CHANNEL 4
#define FRAME_FREQ INT64_C(3840000) // Basic frame frequency #define FRAME_FREQ INT64_C(3840000) // Basic frame frequency
...@@ -127,7 +128,8 @@ typedef struct { ...@@ -127,7 +128,8 @@ typedef struct {
- compression / decompression of IQ samples - compression / decompression of IQ samples
- fast conversion between int16_t and float - fast conversion between int16_t and float
*/ */
#include "private/bf1_avx2.c" //#include "private/bf1_avx2.c"
#include "private/bf1_avx2_nop.c"
// Buffers // Buffers
static ring_buffer_t rx_rbuf; // Received packets static ring_buffer_t rx_rbuf; // Received packets
...@@ -148,7 +150,7 @@ static volatile counter_stat_t tx_drop_counter; // frames sent to eRE ...@@ -148,7 +150,7 @@ static volatile counter_stat_t tx_drop_counter; // frames sent to eRE
static volatile int sync_complete = 0; static volatile int sync_complete = 0;
static volatile int received_pkts = 0; static volatile int received_pkts = 0;
static volatile int sync_happened = 0; static volatile int recv_pps_threshold_hit = 0;
static int first_trx_write = 1; static int first_trx_write = 1;
#ifndef DPDK #ifndef DPDK
...@@ -350,8 +352,8 @@ struct rte_mbuf { ...@@ -350,8 +352,8 @@ struct rte_mbuf {
void rte_pktmbuf_free(void * pkt) { void rte_pktmbuf_free(void * pkt) {
(void) pkt; (void) pkt;
for(int i = 0; i < 1000; i ++) //for(int i = 0; i < 1000; i ++)
asm("NOP"); // asm("NOP");
} }
#endif #endif
...@@ -440,7 +442,12 @@ static void *recv_thread(void *p) { ...@@ -440,7 +442,12 @@ static void *recv_thread(void *p) {
cpu_set_t mask; cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p; TRXEcpriState * s = (TRXEcpriState *) p;
#ifdef DPDK
int first_seq_id = 1; int first_seq_id = 1;
#else
int64_t target_counter = 0;
struct timespec current, previous;
#endif
log_info("RECV_THREAD", "Thread init"); log_info("RECV_THREAD", "Thread init");
// Set thread CPU affinity // Set thread CPU affinity
...@@ -452,11 +459,14 @@ static void *recv_thread(void *p) { ...@@ -452,11 +459,14 @@ static void *recv_thread(void *p) {
#define RTE_MBUF_SIZE 20000 #define RTE_MBUF_SIZE 20000
#define MIN_RX 10000 #define MIN_RX 10000
for(;;) { for(int64_t i = 0;; i++) {
struct rte_mbuf * pkt[RTE_MBUF_SIZE]; struct rte_mbuf * pkt[RTE_MBUF_SIZE];
uint8_t * buf, * rtebuf; uint8_t * buf;
#ifdef DPDK
uint8_t * rtebuf;
int port = 0; int port = 0;
#endif
int nb_rx = 0; int nb_rx = 0;
int n; int n;
int drop_packet = 0; int drop_packet = 0;
...@@ -465,11 +475,33 @@ static void *recv_thread(void *p) { ...@@ -465,11 +475,33 @@ static void *recv_thread(void *p) {
while(!nb_rx) while(!nb_rx)
nb_rx = rte_eth_rx_burst(port, 0, pkt + nb_rx, 1024); nb_rx = rte_eth_rx_burst(port, 0, pkt + nb_rx, 1024);
#else #else
for(int i = 0; i < 150000; i++) // Limit packets sent
asm("NOP"); if(recv_counter.counter >= target_counter) {
nb_rx = 1024; clock_gettime(CLOCK_TAI, &current);
#endif if(i && calcdiff_ns(current, previous) < (1000 * 1000 * 10)) {
//for(int i = 0; i < 10000; i++)
// asm("NOP");
//usleep(10);
}
else {
target_counter += EFREQ;
previous = current;
}
}
if(recv_counter.counter < target_counter) {
nb_rx = 1024;
//for(int i = 0; i < 700000; i++)
// asm("NOP");
usleep(200);
}
else
continue;
#endif
if(nb_rx > RTE_MBUF_SIZE)
log_exit("RECV_THREAD", "nb_rx (%d) > RTE_MBUF_SIZE (%d)", nb_rx, RTE_MBUF_SIZE);
received_pkts = 1; received_pkts = 1;
n = rbuf_write_amount(&rx_rbuf); n = rbuf_write_amount(&rx_rbuf);
...@@ -482,7 +514,7 @@ static void *recv_thread(void *p) { ...@@ -482,7 +514,7 @@ static void *recv_thread(void *p) {
update_counter(&rx_drop_counter, nb_rx); update_counter(&rx_drop_counter, nb_rx);
} }
else { else {
int nc; int nr; int nc; int nr; int k = 0;
nr = nb_rx; nr = nb_rx;
while((nc = rbuf_contiguous_copy(NULL, &rx_rbuf, nr))) { while((nc = rbuf_contiguous_copy(NULL, &rx_rbuf, nr))) {
...@@ -497,7 +529,7 @@ static void *recv_thread(void *p) { ...@@ -497,7 +529,7 @@ static void *recv_thread(void *p) {
buf = ((uint8_t *) rx_rbuf.buffer) + (rx_rbuf.write_index * rx_rbuf.len); buf = ((uint8_t *) rx_rbuf.buffer) + (rx_rbuf.write_index * rx_rbuf.len);
for(int i = 0; i < nc; i++) { for(int i = 0; i < nc; i++) {
#ifdef DPDK #ifdef DPDK
rtebuf = (uint8_t *) (pkt[i])->buf_addr + (pkt[i])->data_off; rtebuf = (uint8_t *) (pkt[i + k])->buf_addr + (pkt[i + k])->data_off;
if(first_seq_id) { if(first_seq_id) {
uint16_t seq_id = htons(((uint16_t *) (rtebuf + 20))[0]); uint16_t seq_id = htons(((uint16_t *) (rtebuf + 20))[0]);
printf("seq_id = %d\n", seq_id); printf("seq_id = %d\n", seq_id);
...@@ -506,13 +538,14 @@ static void *recv_thread(void *p) { ...@@ -506,13 +538,14 @@ static void *recv_thread(void *p) {
memcpy(buf + i * rx_rbuf.len, rtebuf, rx_rbuf.len); memcpy(buf + i * rx_rbuf.len, rtebuf, rx_rbuf.len);
#else #else
memcpy(buf + i * rx_rbuf.len, pkt_frame_full, rx_rbuf.len); //memcpy(buf + i * rx_rbuf.len, pkt_frame_full, rx_rbuf.len);
#endif #endif
} }
rx_rbuf.write_index = (rx_rbuf.write_index + nc) % rx_rbuf.buf_len; rx_rbuf.write_index = (rx_rbuf.write_index + nc) % rx_rbuf.buf_len;
for(int i = 0; i < nc; i++) for(int i = 0; i < nc; i++)
rte_pktmbuf_free(pkt[i]); rte_pktmbuf_free(pkt[i + k]);
nr -= nc; nr -= nc;
k += nc;
} }
} }
update_counter(&recv_counter, nb_rx); update_counter(&recv_counter, nb_rx);
...@@ -576,8 +609,10 @@ static void *encode_thread(void *p) { ...@@ -576,8 +609,10 @@ static void *encode_thread(void *p) {
cpu_set_t mask; cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p; TRXEcpriState * s = (TRXEcpriState *) p;
struct timespec next; #ifdef START_SENDING
int64_t target_counter = 0; int64_t target_counter = 0;
struct timespec next;
#endif
int reset_encode_counter = 1; int reset_encode_counter = 1;
// Set thread CPU affinity // Set thread CPU affinity
...@@ -602,7 +637,7 @@ static void *encode_thread(void *p) { ...@@ -602,7 +637,7 @@ static void *encode_thread(void *p) {
// Send empty frames until we receive something // Send empty frames until we receive something
#ifdef START_SENDING #ifdef START_SENDING
if(!sync_complete && !n) { if(!sync_complete) {
if(i == 0) if(i == 0)
clock_gettime(CLOCK_TAI, &next); clock_gettime(CLOCK_TAI, &next);
// Limit packets sent // Limit packets sent
...@@ -688,8 +723,10 @@ static void *decode_thread(void *p) { ...@@ -688,8 +723,10 @@ static void *decode_thread(void *p) {
cpu_set_t mask; cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p; TRXEcpriState * s = (TRXEcpriState *) p;
#ifdef START_RECEIVING
struct timespec next; struct timespec next;
int64_t target_counter = 0; int64_t target_counter = 0;
#endif
log_info("DECODE_THREAD", "Thread init"); log_info("DECODE_THREAD", "Thread init");
// Set thread CPU affinity // Set thread CPU affinity
...@@ -755,7 +792,9 @@ static void *decode_thread(void *p) { ...@@ -755,7 +792,9 @@ static void *decode_thread(void *p) {
} }
static void *statistic_thread(void *p) { static void *statistic_thread(void *p) {
struct timespec next, initial; struct timespec next, initial;
#ifdef MONITOR
int64_t recv_stop = 0; int64_t recv_stop = 0;
#endif
cpu_set_t mask; cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p; TRXEcpriState * s = (TRXEcpriState *) p;
FILE * stats_file_desc; FILE * stats_file_desc;
...@@ -808,12 +847,18 @@ static void *statistic_thread(void *p) { ...@@ -808,12 +847,18 @@ static void *statistic_thread(void *p) {
update_counter_pps(&encode_counter); update_counter_pps(&encode_counter);
update_counter_pps(&sent_counter); update_counter_pps(&sent_counter);
#ifdef MONITOR #ifdef MONITOR
if(recv_counter.pps < 3000000) { if(recv_counter.pps > RECV_PPS_THRESHOLD) {
recv_pps_threshold_hit = 1;
}
if(recv_pps_threshold_hit && recv_counter.pps < RECV_PPS_THRESHOLD) {
struct timespec _ts; struct timespec _ts;
int64_t ts; int64_t ts;
clock_gettime(CLOCK_MONOTONIC, &_ts); clock_gettime(CLOCK_MONOTONIC, &_ts);
ts = ts_to_int(_ts); ts = ts_to_int(_ts);
if(sync_happened && (recv_stop && ((ts - recv_stop) > RECV_STOP_THRESHOLD * INT64_C(1000000000)))) { if((recv_stop && ((ts - recv_stop) > RECV_STOP_THRESHOLD * INT64_C(1000000000)))) {
#ifdef MONITOR_EXIT
log_exit("MONITOR", "Stopped recieving packets, restarting...");
#endif
log_info("MONITOR", "Stopped recieving packets, sending again..."); log_info("MONITOR", "Stopped recieving packets, sending again...");
sync_complete = 0; sync_complete = 0;
recv_stop = 0; recv_stop = 0;
...@@ -1103,18 +1148,19 @@ static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__sa ...@@ -1103,18 +1148,19 @@ static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__sa
int read_count = (count / M); int read_count = (count / M);
int offset = 0; int offset = 0;
while(rbuf_read_amount(&trxr_rbuf[0]) < read_count);
log_debug("TRX_ECPRI_READ", "count = %ld (%li)", read_count, read_counter.counter); log_debug("TRX_ECPRI_READ", "count = %ld (%li)", read_count, read_counter.counter);
while(rbuf_read_amount(&trxr_rbuf[0]) < read_count);
sync_complete = 1; sync_complete = 1;
sync_happened = 1;
n = read_count; n = read_count;
while((nc = rbuf_contiguous_copy(&trxr_rbuf[0], NULL, n))) { while((nc = rbuf_contiguous_copy(&trxr_rbuf[0], NULL, n))) {
int len = nc * trxr_rbuf[0].len * sizeof(Complex); int len = nc * trxr_rbuf[0].len * sizeof(Complex);
for(int i = 0; i < RX_N_CHANNEL; i++ ) { for(int i = 0; i < RX_N_CHANNEL; i++ ) {
memcpy((uint8_t*) (_samples[i] + offset), ((uint8_t *) trxr_rbuf[i].buffer) + trxr_rbuf[0].read_index * trxr_rbuf[0].len * sizeof(Complex), len); uint8_t * dst = (uint8_t*) (_samples[i] + offset);
uint8_t * src = ((uint8_t *) trxr_rbuf[i].buffer) + trxr_rbuf[0].read_index * trxr_rbuf[0].len * sizeof(Complex);
memcpy(dst, src, len);
} }
trxr_rbuf[0].read_index = (trxr_rbuf[0].read_index + nc) % trxr_rbuf[0].buf_len; trxr_rbuf[0].read_index = (trxr_rbuf[0].read_index + nc) % trxr_rbuf[0].buf_len;
n -= nc; n -= nc;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment