Commit 6976e07d authored by Joanne Hugé's avatar Joanne Hugé

WIP: START_RECEIVING

parent 1ab0e325
...@@ -57,8 +57,8 @@ ...@@ -57,8 +57,8 @@
#include "utils.c" #include "utils.c"
//#define EFREQ 38400 #define EFREQ 38400
#define EFREQ 100 //#define EFREQ 100
#define STAT_FRAME_INTERVAL INT64_C(EFREQ * 150) #define STAT_FRAME_INTERVAL INT64_C(EFREQ * 150)
//#define START_SENDING //#define START_SENDING
#define START_RECEIVING #define START_RECEIVING
...@@ -147,6 +147,7 @@ static volatile counter_stat_t rx_drop_counter; // frames sent to eRE ...@@ -147,6 +147,7 @@ static volatile counter_stat_t rx_drop_counter; // frames sent to eRE
static volatile counter_stat_t tx_drop_counter; // frames sent to eRE static volatile counter_stat_t tx_drop_counter; // frames sent to eRE
static volatile int sync_complete = 0; static volatile int sync_complete = 0;
static volatile int received_pkts = 0;
static volatile int sync_happened = 0; static volatile int sync_happened = 0;
static int first_trx_write = 1; static int first_trx_write = 1;
...@@ -469,6 +470,8 @@ static void *recv_thread(void *p) { ...@@ -469,6 +470,8 @@ static void *recv_thread(void *p) {
nb_rx = 1024; nb_rx = 1024;
#endif #endif
received_pkts = 1;
n = rbuf_write_amount(&rx_rbuf); n = rbuf_write_amount(&rx_rbuf);
drop_packet = nb_rx > n; drop_packet = nb_rx > n;
...@@ -595,11 +598,33 @@ static void *encode_thread(void *p) { ...@@ -595,11 +598,33 @@ static void *encode_thread(void *p) {
#endif #endif
} }
// If we have frames to encode (is there space in TX buffer)
n = rbuf_write_amount(&tx_rbuf); n = rbuf_write_amount(&tx_rbuf);
if(n) {
// Send empty frames until we receive something
#ifdef START_SENDING
if(!sync_complete && !n) {
if(i == 0)
clock_gettime(CLOCK_TAI, &next);
// Limit packets sent
if(encode_counter.counter > target_counter) {
int k = (encode_counter.counter - target_counter + EFREQ - 1) / EFREQ;
add_ns(&next, k * 1000 * 1000 * 10); // 10ms to send 38400 packets
clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL);
target_counter += k * EFREQ;
}
n = (n > TX_SYNC_BURST_SIZE) ? n : TX_SYNC_BURST_SIZE;
n = (n < EFREQ) ? n : EFREQ;
for(int j = 0; j < n; j++) {
*((uint16_t *) (RBUF_WRITE0(tx_rbuf, uint8_t) + 6)) = htons(seq_id++);
rbuf_update_write_index(&tx_rbuf);
}
update_counter(&encode_counter, n);
}
#endif
// If we have frames to encode (is there space in TX buffer)
// If there are frames from trx_write callback to encode // If there are frames from trx_write callback to encode
if(rbuf_read_amount(&trxw_rbuf[0]) && rbuf_read_amount(&trxw_group_rbuf)) { if(n && rbuf_read_amount(&trxw_rbuf[0]) && rbuf_read_amount(&trxw_group_rbuf)) {
sample_group_t * g; int nb_frames; sample_group_t * g; int nb_frames;
g = RBUF_READ0(trxw_group_rbuf, sample_group_t); g = RBUF_READ0(trxw_group_rbuf, sample_group_t);
...@@ -656,30 +681,6 @@ static void *encode_thread(void *p) { ...@@ -656,30 +681,6 @@ static void *encode_thread(void *p) {
rbuf_update_read_index(&trxw_group_rbuf); rbuf_update_read_index(&trxw_group_rbuf);
} }
} }
else {
// Send empty frames until we receive something
#ifdef START_SENDING
if(!sync_complete) {
if(i == 0)
clock_gettime(CLOCK_TAI, &next);
// Limit packets sent
if(encode_counter.counter > target_counter) {
int k = (encode_counter.counter - target_counter + EFREQ - 1) / EFREQ;
add_ns(&next, k * 1000 * 1000 * 10); // 10ms to send 38400 packets
clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL);
target_counter += k * EFREQ;
}
n = (n > TX_SYNC_BURST_SIZE) ? n : TX_SYNC_BURST_SIZE;
n = (n < EFREQ) ? n : EFREQ;
for(int j = 0; j < n; j++) {
*((uint16_t *) (RBUF_WRITE0(tx_rbuf, uint8_t) + 6)) = htons(seq_id++);
rbuf_update_write_index(&tx_rbuf);
}
update_counter(&encode_counter, n);
}
#endif
}
}
} }
pthread_exit(EXIT_SUCCESS); pthread_exit(EXIT_SUCCESS);
} }
...@@ -687,6 +688,8 @@ static void *decode_thread(void *p) { ...@@ -687,6 +688,8 @@ static void *decode_thread(void *p) {
cpu_set_t mask; cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p; TRXEcpriState * s = (TRXEcpriState *) p;
struct timespec next;
int64_t target_counter = 0;
log_info("DECODE_THREAD", "Thread init"); log_info("DECODE_THREAD", "Thread init");
// Set thread CPU affinity // Set thread CPU affinity
...@@ -695,12 +698,30 @@ static void *decode_thread(void *p) { ...@@ -695,12 +698,30 @@ static void *decode_thread(void *p) {
if (sched_setaffinity(0, sizeof(mask), &mask)) if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->decode_affinity); error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->decode_affinity);
for(;;) { for(int64_t i = 0;; i++) {
int n; int n, nc;
#ifdef START_RECEIVING
if(!received_pkts && !rbuf_read_amount(&rx_rbuf)) {
if(i == 0)
clock_gettime(CLOCK_TAI, &next);
// Limit packets sent
if(decode_counter.counter > target_counter) {
int k = (decode_counter.counter - target_counter + EFREQ - 1) / EFREQ;
add_ns(&next, k * 1000 * 1000 * 10); // 10ms to send 38400 packets
clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL);
target_counter += k * EFREQ;
}
n = EFREQ;
for(int j = 0; j < n; j++)
rbuf_update_write_index(&trxr_rbuf);
update_counter(&decode_counter, n);
continue;
}
#endif
while(!(n = rbuf_read_amount(&rx_rbuf))); while(!(n = rbuf_read_amount(&rx_rbuf)));
while(rbuf_write_amount(&trxr_rbuf[0]) < n); while(rbuf_write_amount(&trxr_rbuf[0]) < n);
int nc;
while((nc = rbuf_contiguous_copy(&rx_rbuf, &trxr_rbuf[0], n))) { while((nc = rbuf_contiguous_copy(&rx_rbuf, &trxr_rbuf[0], n))) {
uint8_t * buf = ((uint8_t *) rx_rbuf.buffer) + (rx_rbuf.read_index * rx_rbuf.len) + 22; uint8_t * buf = ((uint8_t *) rx_rbuf.buffer) + (rx_rbuf.read_index * rx_rbuf.len) + 22;
...@@ -1020,7 +1041,6 @@ static void trx_ecpri_end(TRXState *s1) ...@@ -1020,7 +1041,6 @@ static void trx_ecpri_end(TRXState *s1)
static int64_t prev_ts = 0; static int64_t prev_ts = 0;
static int64_t prev_count = 0; static int64_t prev_count = 0;
#define M 32 #define M 32
static volatile int64_t trxr_counter = 0;
static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void **__samples, int count, int tx_port_index, TRXWriteMetadata *md) static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void **__samples, int count, int tx_port_index, TRXWriteMetadata *md)
{ {
...@@ -1075,10 +1095,6 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void ...@@ -1075,10 +1095,6 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void
update_counter(&write_counter, count / M); update_counter(&write_counter, count / M);
} }
static int64_t trxr_target_counter = 0;
static int trxr_i = 0;
static struct timespec trxr_next;
static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__samples, int count, int rx_port_index, TRXReadMetadata *md) static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__samples, int count, int rx_port_index, TRXReadMetadata *md)
{ {
(void) s1; (void) s1;
...@@ -1087,24 +1103,6 @@ static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__sa ...@@ -1087,24 +1103,6 @@ static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__sa
int read_count = (count / M); int read_count = (count / M);
int offset = 0; int offset = 0;
#ifdef START_RECEIVING
if(!sync_complete && rbuf_read_amount(&trxr_rbuf[0]) < read_count) {
if(trxr_i == 0)
clock_gettime(CLOCK_TAI, &trxr_next);
// Limit packets sent
if(trxr_counter > trxr_target_counter) {
int k = (trxr_counter - trxr_target_counter + EFREQ - 1) / EFREQ;
add_ns(&trxr_next, k * 1000 * 1000 * 10); // 10ms to send 38400 packets
clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &trxr_next, NULL);
trxr_target_counter += k * EFREQ;
}
*ptimestamp = trxr_counter * M;
trxr_counter += read_count;
update_counter(&read_counter, read_count);
return count;
}
#endif
while(rbuf_read_amount(&trxr_rbuf[0]) < read_count); while(rbuf_read_amount(&trxr_rbuf[0]) < read_count);
log_debug("TRX_ECPRI_READ", "count = %ld (%li)", read_count, read_counter.counter); log_debug("TRX_ECPRI_READ", "count = %ld (%li)", read_count, read_counter.counter);
...@@ -1123,7 +1121,7 @@ static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__sa ...@@ -1123,7 +1121,7 @@ static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__sa
offset += len; offset += len;
} }
*ptimestamp = (recv_counter.counter) * M; *ptimestamp = (read_counter.counter) * M;
update_counter(&read_counter, read_count); update_counter(&read_counter, read_count);
return count; return count;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment