Commit fbb32c56 authored by Joanne Hugé's avatar Joanne Hugé

512kPPS TX and RX

parent 0ea04a2b
......@@ -41,7 +41,6 @@ IQ samples: {}'''.format(
parse_binary(iq_samples))[1:]
return (avg == 0, s)
# Parse RX/TX frame
def read_trace(name, n):
log_directory = '/root/ecpri-logs'
file_name = '{}/{}'.format(log_directory, name)
......@@ -124,7 +123,6 @@ def print_iq_list(data, start, end, tx=False):
prev_x = x
print(h + " " + " ".join(map(lambda x: "{}*{}".format(*x), iq_packed)))
# Parse RX/TX frame
def read_trx_trace(name, n, channels):
log_directory = '/root/ecpri-logs'
file_name = '{}/{}'.format(log_directory, name)
......@@ -160,6 +158,8 @@ def analyze_trx_tdd(data, channels):
avg_iq_sample += iq_samples_abs_avg
empty = iq_samples_abs_avg < 0.1
if i == 0:
first_tdd_period = empty
if i > 0 and empty != prev_empty:
tdd_switch_list.append(i-prev_i)
prev_i = i
......@@ -176,6 +176,7 @@ def analyze_trx_tdd(data, channels):
ratio = total_frames / null_frames if null_frames > 0 else 'inf'
print('TDD ratio: {}\n'.format(ratio))
print('TDD switch list: ' + ', '.join(map(str, tdd_switch_list)) + '\n')
print('First TDD period: ' + "Not emitting" if first_tdd_period else "emitting")
BF1_PATH="../bf1/bf1"
......
......@@ -7,7 +7,7 @@ $DIR/stop-ecpri.sh;
cd $DIR/..;
#make clean;
make;
make &&
#$DIR/launch-ptp > $LOG_DIRECTORY/ptp.log 2> $LOG_DIRECTORY/ptp.error &
#$DIR/launch-phc2sys > $LOG_DIRECTORY/phc2sys.log 2> $LOG_DIRECTORY/phc2sys.error &
......
......@@ -47,8 +47,8 @@
#define SSE4 // define if CPU supports SSE4.1
#define DST_ADDR_SYNTAX // Depends on DPDK version
//#define SEND_LIMIT (100000)
//#define TRACE
#define SEND_LIMIT (100000)
#define TRACE
/* Proprietary code:
- compression / decompression of IQ samples
......@@ -68,8 +68,9 @@
#define TX_N_CHANNEL 4
#define FRAME_FREQ INT64_C(3840000) // Basic frame frequency
#define TRX_WB_MAX_PARTS 1000
#define TRX_BUF_MAX_SIZE 20000
#define TRX_MAX_GROUP 1000
#define TRX_BUF_MAX_SIZE 100000
#define TXRX_BUF_MAX_SIZE 100000
#define STATISTIC_REFRESH_RATE INT64_C(500 * 1000 * 1000)
#define TRACE_BUFFER_SIZE_MB 200
......@@ -129,29 +130,31 @@ static void write_buffer(buffer_t * buffer, int i, uint8_t * source, int64_t len
}
#endif
typedef struct {
int64_t count;
uint8_t wait;
uint8_t zeroes;
} sample_group_t;
// Buffers
static ring_buffer_t rx_rbuf; // Received packets
static ring_buffer_t trx_read_rbuf; // Decoded IQ samples
static ring_buffer_t trxr_rbuf; // Decoded IQ samples
static ring_buffer_t tx_rbuf; // Packets to send
static ring_buffer_t trx_write_rbuf; // Uncompressed IQ samples
// List of timestamps at which data should be sent
static volatile int64_t trx_wb_ts[TRX_WB_MAX_PARTS];
// List of corresponding indexes in trx_write_rbuf
static volatile int trx_wb_part[TRX_WB_MAX_PARTS];
static int trx_wb_part_read_index;
static int trx_wb_part_write_index;
static ring_buffer_t trxw_rbuf; // Uncompressed IQ samples
static ring_buffer_t trxw_group_rbuf; // Group of IQ samples
// Locks
pthread_mutex_t tx_mutex;
pthread_cond_t tx_cond;
pthread_mutex_t encode_mutex;
pthread_cond_t encode_cond;
pthread_mutex_t rx_mutex;
pthread_cond_t rx_cond;
pthread_mutex_t encode_mutex;
pthread_cond_t encode_cond;
pthread_mutex_t decode_mutex;
pthread_cond_t decode_cond;
pthread_mutex_t trx_write_mutex;
pthread_cond_t trx_write_cond;
sem_t trx_read_sem;
pthread_mutex_t trxw_mutex;
pthread_cond_t trxw_cond;
static volatile int64_t rx_cond_counter = 0;
static volatile int64_t decode_cond_counter = 0;
// Counters
static volatile counter_stat_t recv_counter; // frames received from eRE
static volatile counter_stat_t decode_counter; // decoded frames
......@@ -159,17 +162,17 @@ static volatile counter_stat_t read_counter; // frames passed to amarisoft stack
static volatile counter_stat_t write_counter; // samples to write from TRX
static volatile counter_stat_t encode_counter; // compressed samples
static volatile counter_stat_t sent_counter; // frames sent to eRE
static volatile counter_stat_t rx_drop_counter; // frames sent to eRE
static volatile counter_stat_t tx_drop_counter; // frames sent to eRE
#define STAT_FRAME_INTERVAL INT64_C(3800000)
#define STAT_FRAME_INTERVAL INT64_C(380000)
static volatile int sync_complete = 0;
static int first_trx_write = 1;
static volatile uint8_t iq_frame_full[1024];
static volatile uint8_t iq_frame_empty[1024];
// Computed values
static int rxtx_buf_size;
static int ecpri_period_mult;
// Network
static volatile int seq_id;
......@@ -211,6 +214,66 @@ static int rbuf_write_amount(ring_buffer_t * rbuf) {
pthread_mutex_init(&rbuf.ahead_mutex, NULL);\
} while(0)
static void log_exit(const char * section, const char * msg, ...) {
time_t t;
struct tm ts;
char line[256];
va_list arglist;
time(&t);
ts = *localtime(&t);
strftime(line, 80, "%m-%d %H:%M:%S", &ts);
sprintf(line + strlen(line), " EXIT [%s] ", section);
va_start(arglist, msg);
vsprintf(line + strlen(line), msg, arglist);
va_end(arglist);
fprintf(stderr, "%s\n", line);
// Dump useful information
fprintf(stderr,
"%13s %13s %13s %13s %13s %13s %13s %13s %13s %13s %13s %13s %13s %13s \n",
"rx dropped",
"tx dropped",
"received",
"decode",
"read",
"write",
"encode",
"sent",
"received pps",
"decode pps",
"read pps",
"write pps",
"encode pps",
"sent pps");
fprintf(stderr,
"%13" PRIi64 " %13" PRIi64 " %13" PRIi64 " %13" PRIi64 " %13" PRIi64 " %13" PRIi64 " %13" PRIi64 " %13" PRIi64 "pps %13" PRIi64 "pps %13" PRIi64 "pps %13" PRIi64 "pps %13" PRIi64 "pps %13" PRIi64 "pps\n",
rx_drop_counter.counter,
tx_drop_counter.counter,
recv_counter.counter,
decode_counter.counter,
read_counter.counter,
write_counter.counter,
encode_counter.counter,
sent_counter.counter,
recv_counter.pps,
decode_counter.pps,
read_counter.pps,
write_counter.pps,
encode_counter.pps,
sent_counter.pps);
fprintf(stderr, "TX RBUF: ri %li wi %li ra %li wa %li\n", tx_rbuf.read_index, tx_rbuf.write_index, rbuf_read_amount(&tx_rbuf), rbuf_write_amount(&tx_rbuf));
fprintf(stderr, "RX RBUF: ri %li wi %li ra %li wa %li\n", rx_rbuf.read_index, rx_rbuf.write_index, rbuf_read_amount(&rx_rbuf), rbuf_write_amount(&rx_rbuf));
fprintf(stderr, "TRXW RBUF: ri %li wi %li ra %li wa %li\n", trxw_rbuf.read_index, trxw_rbuf.write_index, rbuf_read_amount(&trxw_rbuf), rbuf_write_amount(&trxw_rbuf));
fprintf(stderr, "TRXR RBUF: ri %li wi %li ra %li wa %li\n", trxr_rbuf.read_index, trxr_rbuf.write_index, rbuf_read_amount(&trxr_rbuf), rbuf_write_amount(&trxr_rbuf));
fprintf(stderr, "TRXW GROUP RBUF: ri %li wi %li ra %li wa %li\n", trxw_group_rbuf.read_index, trxw_group_rbuf.write_index, rbuf_read_amount(&trxw_group_rbuf), rbuf_write_amount(&trxw_group_rbuf));
fflush(stdout);
fflush(stderr);
exit(EXIT_FAILURE);
}
#include "dpdk.c"
static void send_packets(int port) {
......@@ -292,7 +355,6 @@ static void send_limit_handler(struct timespec initial, TRXEcpriState * s) {
d = calcdiff_ns(next, initial);
log_info("SEND_THREAD", "Packets sent: %" PRIi64, sent_counter.counter);
log_info("SEND_THREAD", "Duration: %" PRIi64, d);
log_info("SEND_THREAD", "ecpri_period_mult: %" PRIi64, ecpri_period_mult);
log_info("SEND_THREAD", "FRAME_FREQ: %" PRIi64, FRAME_FREQ);
#ifdef TRACE
FILE * f;
......@@ -349,21 +411,30 @@ static void *recv_thread(void *p) {
while(1) {
int n = rbuf_write_amount(&rx_rbuf);
int drop_packet = 0;
usleep(1000);
nb_rx = rte_eth_rx_burst(port, 0, pkt, 1024);
if(nb_rx > n)
log_exit("RECV_THREAD", "%lip available to write in rx_rbuf, but %lip received", n, nb_rx);
drop_packet = nb_rx > n;
for(int i = 0; i < nb_rx; i++) {
buf = ((uint8_t *) rx_rbuf.buffer) + (rx_rbuf.write_index * rx_rbuf.len);
rtebuf = (uint8_t *) (pkt[i])->buf_addr + (pkt[i])->data_off;
if((pkt[i])->pkt_len > RX_MAX_PACKET_SIZE)
log_exit("RECV_THREAD", "Received packet of length %u, but RX_MAX_PACKET_SIZE = %u", (pkt[i])->pkt_len, RX_MAX_PACKET_SIZE);
memcpy(buf, rtebuf, (pkt[i])->pkt_len);
#ifdef TRACE
write_buffer(&rx_trace_buffer, 0, (uint8_t*) rtebuf, RX_MAX_PACKET_SIZE);
#endif
rbuf_update_write_index(&rx_rbuf);
rte_pktmbuf_free(pkt[i]);
if(drop_packet) {
for(int i = 0; i < nb_rx; i++)
rte_pktmbuf_free(pkt[i]);
if(nb_rx)
update_counter(&rx_drop_counter, nb_rx);
}
else {
for(int i = 0; i < nb_rx; i++) {
buf = ((uint8_t *) rx_rbuf.buffer) + (rx_rbuf.write_index * rx_rbuf.len);
rtebuf = (uint8_t *) (pkt[i])->buf_addr + (pkt[i])->data_off;
if((pkt[i])->pkt_len > RX_MAX_PACKET_SIZE)
log_exit("RECV_THREAD", "Received packet of length %u, but RX_MAX_PACKET_SIZE = %u", (pkt[i])->pkt_len, RX_MAX_PACKET_SIZE);
memcpy(buf, rtebuf, (pkt[i])->pkt_len);
#ifdef TRACE
write_buffer(&rx_trace_buffer, 0, (uint8_t*) rtebuf, RX_MAX_PACKET_SIZE);
#endif
rbuf_update_write_index(&rx_rbuf);
rte_pktmbuf_free(pkt[i]);
}
}
if(nb_rx)
......@@ -372,7 +443,10 @@ static void *recv_thread(void *p) {
update_counter(&recv_counter, nb_rx);
pthread_mutex_lock(&rx_mutex); pthread_cond_signal(&rx_cond); pthread_mutex_unlock(&rx_mutex);
if((recv_counter.counter - rx_cond_counter) >= 1000) {
pthread_mutex_lock(&rx_mutex); pthread_cond_signal(&rx_cond); pthread_mutex_unlock(&rx_mutex);
rx_cond_counter = recv_counter.counter;
}
}
pthread_exit(EXIT_SUCCESS);
}
......@@ -461,47 +535,32 @@ static void *encode_thread(void *p) {
pthread_mutex_unlock(&tx_mutex);
// If there are frames from trx_write callback to encode
pthread_mutex_lock(&trx_write_mutex);
if(rbuf_read_amount(&trx_write_rbuf)) {
int64_t ts, frames_until_ts;
pthread_mutex_unlock(&trx_write_mutex);
pthread_mutex_lock(&trxw_mutex);
if(rbuf_read_amount(&trxw_rbuf) && rbuf_read_amount(&trxw_group_rbuf)) {
int64_t ts, frames_until_ts; sample_group_t * g; int nb_frames;
pthread_mutex_unlock(&trxw_mutex);
trx_started = 1;
// Get the next timestamp at which we should write
ts = trx_wb_ts[trx_wb_part_read_index];
frames_until_ts = ts - encode_counter.counter;
if(frames_until_ts > 0) {
if(!first_ts) {
log_exit("ENCODE_THREAD", "Gap between TRX timestamps: %li", frames_until_ts);
}
else {
int nb_frames = frames_until_ts > n ? n : frames_until_ts;
for(int j = 0; j < nb_frames; j++) {
memset(RBUF_WRITE0(tx_rbuf, uint8_t) + 8, 0x00, 240);
*((uint16_t *) (RBUF_WRITE0(tx_rbuf, uint8_t) + 6)) = htons(seq_id++);
#ifdef TRACE
write_buffer(&tx_trace_buffer, 14, (uint8_t*) RBUF_WRITE0(tx_rbuf, uint8_t), 248);
#endif
rbuf_update_write_index(&tx_rbuf);
}
update_counter(&encode_counter, nb_frames);
pthread_mutex_lock(&encode_mutex); pthread_cond_signal(&encode_cond); pthread_mutex_unlock(&encode_mutex);
if(nb_frames == frames_until_ts)
first_ts = 0;
}
g = RBUF_READ0(trxw_group_rbuf, sample_group_t);
if(g->wait) {
g->wait = 0;
g->count -= encode_counter.counter;
g->zeroes = 1;
}
if (frames_until_ts == 0) {
int next_trx_index = trx_wb_part[(trx_wb_part_read_index + 1) % TRX_WB_MAX_PARTS];
// TRX frames to encode
int nb_frames = (trx_write_rbuf.buf_len + next_trx_index - trx_write_rbuf.read_index) % trx_write_rbuf.buf_len;
int left_frames = nb_frames;
nb_frames = nb_frames < n ? nb_frames : n;
left_frames -= nb_frames;
nb_frames = g->count > n ? n : g->count;
g->count -= nb_frames;
if(g->zeroes) {
for(int j = 0; j < nb_frames; j++) {
float * const trx_samples = RBUF_READ0(trx_write_rbuf, float);
memset(RBUF_WRITE0(tx_rbuf, uint8_t) + 8, 0x00, 240);
*((uint16_t *) (RBUF_WRITE0(tx_rbuf, uint8_t) + 6)) = htons(seq_id++);
#ifdef TRACE
write_buffer(&tx_trace_buffer, 14, (uint8_t*) RBUF_WRITE0(tx_rbuf, uint8_t), 248);
#endif
rbuf_update_write_index(&tx_rbuf);
}
} else {
for(int j = 0; j < nb_frames; j++) {
float * const trx_samples = RBUF_READ0(trxw_rbuf, float);
uint8_t * const tx_frame = RBUF_WRITE0(tx_rbuf, uint8_t);
memset(samples_int, 0, 512);
......@@ -516,27 +575,21 @@ static void *encode_thread(void *p) {
#ifdef TRACE
write_buffer(&tx_trace_buffer, 14, (uint8_t*) RBUF_WRITE0(tx_rbuf, uint8_t), 248);
#endif
rbuf_update_write_index(&tx_rbuf);
rbuf_update_read_index(&trx_write_rbuf);
update_counter(&encode_counter, 1);
}
if(left_frames == 0) {
trx_wb_part_read_index = (trx_wb_part_read_index + 1) % TRX_WB_MAX_PARTS;
rbuf_update_write_index(&tx_rbuf); // TODO update multiple indexes at once
rbuf_update_read_index(&trxw_rbuf);
}
else {
trx_wb_ts[trx_wb_part_read_index] += nb_frames;
}
pthread_mutex_lock(&encode_mutex); pthread_cond_signal(&encode_cond); pthread_mutex_unlock(&encode_mutex);
}
// We have sent too much empty frames and missed a timestamp
else if(frames_until_ts < 0) {
log_error("TRX_ECPRI_SEND", "Missed trx_write timestamp: p %015li ts %015li r %015li n %015li e %015li", encode_counter.counter, ts, rbuf_read_amount(&trx_write_rbuf), n, frames_until_ts);
update_counter(&encode_counter, nb_frames);
if(!g->count) {
rbuf_update_read_index(&trxw_group_rbuf);
}
pthread_mutex_lock(&encode_mutex); pthread_cond_signal(&encode_cond); pthread_mutex_unlock(&encode_mutex);
}
else {
// Send empty frames until we receive something
if(!trx_started && !sync_complete) {
pthread_mutex_unlock(&trx_write_mutex);
pthread_mutex_unlock(&trxw_mutex);
if(i == 0)
clock_gettime(CLOCK_TAI, &next);
// Limit packets sent
......@@ -553,10 +606,10 @@ static void *encode_thread(void *p) {
update_counter(&encode_counter, n);
pthread_mutex_lock(&encode_mutex); pthread_cond_signal(&encode_cond); pthread_mutex_unlock(&encode_mutex);
}
// Wait for TRX
// Wait for TRX TODO: maybe poll here ?
else {
pthread_cond_wait(&trx_write_cond, &trx_write_mutex);
pthread_mutex_unlock(&trx_write_mutex);
pthread_cond_wait(&trxw_cond, &trxw_mutex);
pthread_mutex_unlock(&trxw_mutex);
}
}
}
......@@ -594,51 +647,37 @@ static void *decode_thread(void *p) {
if(!n)
pthread_cond_wait(&rx_cond, &rx_mutex);
pthread_mutex_unlock(&rx_mutex);
int n_rbuf = rbuf_write_amount(&trxr_rbuf);
while(n_rbuf < n) {
usleep(100);
n_rbuf = rbuf_write_amount(&trxr_rbuf);
}
for(int j = 0; j < n; j++) {
int16_t samples_int[RX_N_SAMPLES];
const uint8_t * dst_mac = RBUF_READ0(rx_rbuf, uint8_t);
const uint8_t * src_mac = RBUF_READ0(rx_rbuf, uint8_t) + 6;
const uint16_t ether_type = htons(*((uint16_t*) (RBUF_READ0(rx_rbuf, uint8_t) + 12)));
const uint8_t ecpri_protocol_rev = *(RBUF_READ0(rx_rbuf, uint8_t) + 14);
const uint8_t ecpri_message_type = *(RBUF_READ0(rx_rbuf, uint8_t) + 15);
const uint16_t ecpri_payload_size = htons(*((uint16_t*) (RBUF_READ0(rx_rbuf, uint8_t) + 16)));
const uint16_t pc_id = htons(*((uint16_t*) (RBUF_READ0(rx_rbuf, uint8_t) + 18)));
const uint16_t seq_id = htons(*((uint16_t*) (RBUF_READ0(rx_rbuf, uint8_t) + 20)));
//const uint8_t * dst_mac = RBUF_READ0(rx_rbuf, uint8_t);
//const uint8_t * src_mac = RBUF_READ0(rx_rbuf, uint8_t) + 6;
//const uint16_t ether_type = htons(*((uint16_t*) (RBUF_READ0(rx_rbuf, uint8_t) + 12)));
//const uint8_t ecpri_protocol_rev = *(RBUF_READ0(rx_rbuf, uint8_t) + 14);
//const uint8_t ecpri_message_type = *(RBUF_READ0(rx_rbuf, uint8_t) + 15);
//const uint16_t ecpri_payload_size = htons(*((uint16_t*) (RBUF_READ0(rx_rbuf, uint8_t) + 16)));
//const uint16_t pc_id = htons(*((uint16_t*) (RBUF_READ0(rx_rbuf, uint8_t) + 18)));
//const uint16_t seq_id = htons(*((uint16_t*) (RBUF_READ0(rx_rbuf, uint8_t) + 20)));
const uint8_t * rx_samples = RBUF_READ0(rx_rbuf, uint8_t) + 22;
if(s->trace_period && !(k % s->trace_period)) {
fprintf(trace_file_desc,
"%010" PRIu64 " %x:%x:%x:%x:%x:%x %x:%x:%x:%x:%x:%x %x"
" %x %x %x"
" %x %x\n",
k,
dst_mac[0], dst_mac[1], dst_mac[2], dst_mac[3], dst_mac[4], dst_mac[5],
src_mac[0], src_mac[1], src_mac[2], src_mac[3], src_mac[4], src_mac[5],
ether_type,
ecpri_protocol_rev, ecpri_message_type, ecpri_payload_size,
pc_id, seq_id);
fprintf(trace_file_desc, "\n");
fflush(trace_file_desc);
}
k++;
rbuf_update_read_index(&rx_rbuf);
if((ecpri_payload_size - 4) % 60) {
fprintf(stderr, "received eCPRI payload of size %u, not a multiple of 60\n", (ecpri_payload_size));
exit(EXIT_FAILURE);
}
int n_rbuf = rbuf_write_amount(&trx_read_rbuf);
if(n_rbuf < ((ecpri_payload_size - 4) / 60))
log_exit("DECODE_THREAD", "Not enough space to write in trx_read_rbuf (%li < %li)", n_rbuf, ((ecpri_payload_size - 4) / 60));
for(int i = 0; i < (ecpri_payload_size - 4) / 60; i++) {
memset((uint8_t * ) samples_int, 0, sizeof(int16_t) * RX_N_SAMPLES);
decode_bf1(samples_int, rx_samples + i * 60, 16);
int16_to_float(RBUF_WRITE0(trx_read_rbuf, float), samples_int, RX_N_SAMPLES, mult);
rbuf_update_write_index(&trx_read_rbuf);
}
memset((uint8_t * ) samples_int, 0, sizeof(int16_t) * RX_N_SAMPLES);
decode_bf1(samples_int, rx_samples, 16);
int16_to_float(RBUF_WRITE0(trxr_rbuf, float), samples_int, RX_N_SAMPLES, mult);
rbuf_update_write_index(&trxr_rbuf);
update_counter(&decode_counter, 1);
}
if((decode_counter.counter - decode_cond_counter) >= 9216) {
pthread_mutex_lock(&decode_mutex); pthread_cond_signal(&decode_cond); pthread_mutex_unlock(&decode_mutex);
decode_cond_counter = decode_counter.counter;
}
}
pthread_exit(EXIT_SUCCESS);
......@@ -672,7 +711,9 @@ static void *statistic_thread(void *p) {
add_ns(&next, STATISTIC_REFRESH_RATE);
if((i % 50) == 0)
fprintf(stats_file_desc,
"%14s %14s %14s %14s %14s %14s %14s %14s %14s %14s %14s %14s \n",
"%13s %13s %13s %13s %13s %13s %13s %13s %13s %13s %13s %13s %13s %13s \n",
"rx dropped",
"tx dropped",
"received",
"decode",
"read",
......@@ -686,7 +727,9 @@ static void *statistic_thread(void *p) {
"encode pps",
"sent pps");
fprintf(stats_file_desc,
"%14" PRIi64 " %14" PRIi64 " %14" PRIi64 " %14" PRIi64 " %14" PRIi64 " %14" PRIi64 " %14" PRIi64 "pps %14" PRIi64 "pps %14" PRIi64 "pps %14" PRIi64 "pps %14" PRIi64 "pps %14" PRIi64 "pps\n",
"%13" PRIi64 " %13" PRIi64 " %13" PRIi64 " %13" PRIi64 " %13" PRIi64 " %13" PRIi64 " %13" PRIi64 " %13" PRIi64 " %13" PRIi64 "pps %13" PRIi64 "pps %13" PRIi64 "pps %13" PRIi64 "pps %13" PRIi64 "pps %13" PRIi64 "pps\n",
rx_drop_counter.counter,
tx_drop_counter.counter,
recv_counter.counter,
decode_counter.counter,
read_counter.counter,
......@@ -842,9 +885,11 @@ int startdpdk(TRXEcpriState * s) {
init_dpdk(argc, argv);
log_info("TRX_ECPRI", "Start");
//set_latency_target();
set_latency_target();
seq_id = 0;
init_counter(&rx_drop_counter);
init_counter(&tx_drop_counter);
init_counter(&recv_counter);
init_counter(&decode_counter);
init_counter(&read_counter);
......@@ -852,16 +897,11 @@ int startdpdk(TRXEcpriState * s) {
init_counter(&encode_counter);
init_counter(&sent_counter);
ecpri_period_mult = (s->ecpri_period * FRAME_FREQ) / 1000000;
rxtx_buf_size = (3 * ecpri_period_mult);
RBUF_INIT(rx_rbuf, "RX ring buffer", rxtx_buf_size, RX_MAX_PACKET_SIZE, uint8_t);
RBUF_INIT(tx_rbuf, "TX ring buffer", rxtx_buf_size, TX_ECPRI_PACKET_SIZE, uint8_t);
RBUF_INIT(trx_read_rbuf, "TRXRead ring buffer", TRX_BUF_MAX_SIZE, RX_N_SAMPLES, float);
RBUF_INIT(trx_write_rbuf, "TRXWrite ring buffer", TRX_BUF_MAX_SIZE, TX_N_SAMPLES, float);
trx_wb_part_read_index = 0;
trx_wb_part_write_index = 0;
RBUF_INIT(rx_rbuf, "RX ring buffer", TXRX_BUF_MAX_SIZE, RX_MAX_PACKET_SIZE, uint8_t);
RBUF_INIT(tx_rbuf, "TX ring buffer", TXRX_BUF_MAX_SIZE, TX_ECPRI_PACKET_SIZE, uint8_t);
RBUF_INIT(trxr_rbuf, "TRXRead ring buffer", TRX_BUF_MAX_SIZE, RX_N_SAMPLES, float);
RBUF_INIT(trxw_rbuf, "TRXWrite ring buffer", TRX_BUF_MAX_SIZE, TX_N_SAMPLES, float);
RBUF_INIT(trxw_group_rbuf, "TRXGroupWrite ring buffer", TRX_MAX_GROUP, 1, sample_group_t);
pthread_mutex_init(&tx_mutex, NULL);
pthread_mutex_init(&encode_mutex, NULL);
......@@ -871,7 +911,6 @@ int startdpdk(TRXEcpriState * s) {
pthread_cond_init(&encode_cond, NULL);
pthread_cond_init(&rx_cond, NULL);
pthread_cond_init(&decode_cond, NULL);
sem_init(&trx_read_sem, 0, 0);
memset((uint8_t *) ecpri_message, 0, TX_ECPRI_PACKET_SIZE);
......@@ -900,7 +939,7 @@ int startdpdk(TRXEcpriState * s) {
*((uint16_t *) (ecpri_message + 2)) = htons(244);
*((uint16_t *) (ecpri_message + 4)) = htons(s->flow_id);
for(int i = 0; i < rxtx_buf_size; i++)
for(int i = 0; i < TXRX_BUF_MAX_SIZE; i++)
memcpy(((uint8_t *) tx_rbuf.buffer) + (i * tx_rbuf.len), ecpri_message, tx_rbuf.len);
start_threads(s);
......@@ -921,48 +960,55 @@ int64_t prev_count = 0;
static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void **__samples, int count, int tx_port_index, TRXWriteMetadata *md)
{
(void) s1;
float ** _samples = (float **) __samples;
int write_count = count / M;
int64_t ts = timestamp / M;
int n_rbuf = rbuf_write_amount(&trx_write_rbuf);
if(write_count > n_rbuf)
log_exit("TRX_ECPRI_WRITE", "Not enough space in trx_write_rbuf (%li > %li)", write_count, n_rbuf);
if(prev_count && (ts - prev_ts) != prev_count)
log_exit("TRX_ECPRI_WRITE", "Gap between timestamps");
prev_ts = ts;
prev_count = write_count;
trx_wb_part[trx_wb_part_write_index] = trx_write_rbuf.write_index;
trx_wb_ts[trx_wb_part_write_index] = ts;
float ** _samples; int write_count; int64_t ts; sample_group_t * g;
_samples = (float **) __samples;
write_count = count / M;
ts = timestamp / M;
if(prev_count && (ts - prev_ts) != prev_count) {
log_exit("TRX_ECPRI_WRITE", "Gap between timestamps: prev_ts %li ts %li prev_count %li count %li diff_ts %li", prev_ts, ts, prev_count, count, (ts - prev_ts));
}
prev_ts = ts; prev_count = write_count;
int n_rbuf = rbuf_write_amount(&trxw_rbuf);
if(write_count > n_rbuf) {
update_counter(&tx_drop_counter, write_count);
return;
}
if(first_trx_write) {
sample_group_t * g2 = RBUF_WRITE0(trxw_group_rbuf, sample_group_t);
g2->count = ts;
g2->wait = 1;
g2->zeroes = 1;
rbuf_update_write_index(&trxw_group_rbuf);
}
g = RBUF_WRITE0(trxw_group_rbuf, sample_group_t);
g->zeroes = __samples ? 0 : 1;
g->wait = 0;
g->count = write_count;
for(int k = 0; k < write_count; k++) {
if(__samples) {
for(int i = 0; i < 4; i++) {
for(int j = 0; j < 64; j++) {
RBUF_WRITE(trx_write_rbuf, k, float)[i * 64 + j] = _samples[i][j + (k * 64)];
RBUF_WRITE(trxw_rbuf, k, float)[i * 64 + j] = _samples[i][j + (k * 64)];
}
}
}
else {
memcpy((uint8_t *) RBUF_WRITE(trx_write_rbuf, k, float), iq_frame_empty, sizeof(float) * 4 * 64);
}
#ifdef TRACE
if(__samples)
write_buffer(&trxw_trace_buffer, 0, iq_frame_full, 1024);
else
write_buffer(&trxw_trace_buffer, 0, iq_frame_empty, 1024);
write_buffer(&trxw_trace_buffer, 0, __samples ? iq_frame_full : iq_frame_empty, 1024);
#endif
}
trx_wb_part_write_index = (trx_wb_part_write_index + 1) % TRX_WB_MAX_PARTS;
trx_wb_part[trx_wb_part_write_index] = trx_write_rbuf.write_index + write_count;
// Update write index at the end so that everything stays consistent
for(int k = 0; k < write_count; k++)
rbuf_update_write_index(&trx_write_rbuf);
update_counter(&write_counter, write_count);
pthread_mutex_lock(&trx_write_mutex);
pthread_cond_signal(&trx_write_cond);
pthread_mutex_unlock(&trx_write_mutex);
if(__samples) {
// Update write index at the end so that everything stays consistent
for(int k = 0; k < write_count; k++)
rbuf_update_write_index(&trxw_rbuf);
}
rbuf_update_write_index(&trxw_group_rbuf);
update_counter(&write_counter, count / M);
pthread_mutex_lock(&trxw_mutex); pthread_cond_signal(&trxw_cond); pthread_mutex_unlock(&trxw_mutex);
}
static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__samples, int count, int rx_port_index, TRXReadMetadata *md)
......@@ -973,25 +1019,25 @@ static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__sa
log_limit("TRX_ECPRI_READ", "count = %ld", count);
pthread_mutex_lock(&decode_mutex);
while(rbuf_read_amount(&trx_read_rbuf) < read_count) {
pthread_cond_wait(&decode_cond, &decode_mutex);
int nr = rbuf_read_amount(&trxr_rbuf);
while(nr < read_count) {
usleep(50);
nr = rbuf_read_amount(&trxr_rbuf);
}
pthread_mutex_unlock(&decode_mutex);
sync_complete = 1;
for(int k = 0; k < read_count; k++) {
float * trx_samples;
trx_samples = RBUF_READ0(trx_read_rbuf, float);
trx_samples = RBUF_READ0(trxr_rbuf, float);
for(int i = 0; i < 64; i++)
_samples[0][i] = trx_samples[i];
#ifdef TRACE
write_buffer(&trxr_trace_buffer, 0, (uint8_t *) trx_samples, 64 * sizeof(float));
#endif
rbuf_update_read_index(&trx_read_rbuf);
rbuf_update_read_index(&trxr_rbuf);
}
*ptimestamp = read_counter.counter * M;
*ptimestamp = recv_counter.counter * M;
update_counter(&read_counter, read_count);
return count;
......
......@@ -52,25 +52,6 @@ static void log_info(const char * section, const char * msg, ...) {
puts(line);
}
static void log_exit(const char * section, const char * msg, ...) {
time_t t;
struct tm ts;
char line[256];
va_list arglist;
time(&t);
ts = *localtime(&t);
strftime(line, 80, "%m-%d %H:%M:%S", &ts);
sprintf(line + strlen(line), " EXIT [%s] ", section);
va_start(arglist, msg);
vsprintf(line + strlen(line), msg, arglist);
va_end(arglist);
fprintf(stderr, "%s\n", line);
fflush(stdout);
fflush(stderr);
exit(EXIT_FAILURE);
}
#ifdef DEBUG
static void log_debug(const char * section, const char * msg, ...) {
time_t t;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment