Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
T
trx-ecpri
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
trx-ecpri
Commits
431bebe4
Commit
431bebe4
authored
May 04, 2022
by
Joanne Hugé
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Use parameters from rf_driver config instead of macros
parent
bdfdf896
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
181 additions
and
189 deletions
+181
-189
trx_ecpri_dpdk.c
trx_ecpri_dpdk.c
+181
-189
No files found.
trx_ecpri_dpdk.c
View file @
431bebe4
...
...
@@ -48,40 +48,19 @@
#include "private/trx_driver.h"
#include "utils.c"
#define EFREQ 10
//#define DEBUG // Enables / deactivates log_debug
#define TRACE 0
//#define TRACE_TX
#define TRACE_RX
//#define MONITOR
//#define MONITOR_EXIT
#define RECV_PPS_THRESHOLD (EFREQ * 100 * 9 / 10)
#define RECV_STOP_THRESHOLD 2
#define PPS_UPDATE_PERIOD INT64_C(1000000000)
//#define START_SENDING
//#define START_RECEIVING
#define PPS_UPDATE_PERIOD INT64_C(1000000000)
#define STAT_FRAME_INTERVAL INT64_C(EFREQ * 500)
//#define DST_ADDR_SYNTAX // Depends on DPDK version
#define RX_N_CHANNEL 1
#define TX_N_CHANNEL 4
#define FRAME_FREQ INT64_C(3840000) // Basic frame frequency
#define TX_PACKET_SIZE 262
#define RX_MAX_PACKET_SIZE 262
#define TX_ECPRI_PACKET_SIZE (TX_PACKET_SIZE - 14)
#define N_SAMPLES (32)
#define TRX_MAX_GROUP 1500
//#define TRX_BUF_MAX_SIZE 500000
//#define TXRX_BUF_MAX_SIZE 500000
#define TRX_BUF_MAX_SIZE 4000
#define TXRX_BUF_MAX_SIZE 4000
#define STATISTIC_REFRESH_RATE INT64_C(500 * 1000 * 1000)
#define STAT_INT_LEN "9"
typedef
struct
{
...
...
@@ -102,6 +81,19 @@ typedef struct {
int
statistic_affinity
;
int
ecpri_period
;
int
flow_id
;
int
frame_frequency
;
int
trx_buf_size
;
int
txrx_buf_size
;
int
trace_rx
;
int
trace_tx
;
int
trace_offset
;
int
monitor_pps
;
int
monitor_trigger_duration
;
int
start_sending
;
int
start_receiving
;
int
rx_n_channel
;
int
tx_n_channel
;
int
statistics_refresh_rate_ns
;
int
sample_rate
;
}
TRXEcpriState
;
...
...
@@ -137,9 +129,9 @@ typedef struct {
// Buffers
static
ring_buffer_t
rx_rbuf
;
// Received packets
static
ring_buffer_t
trxr_rbuf
[
RX_N_CHANNEL
];
// Decoded IQ samples
static
ring_buffer_t
trxr_rbuf
[
4
];
// Decoded IQ samples
static
ring_buffer_t
tx_rbuf
;
// Packets to send
static
ring_buffer_t
trxw_rbuf
[
TX_N_CHANNEL
];
// Uncompressed IQ samples
static
ring_buffer_t
trxw_rbuf
[
4
];
// Uncompressed IQ samples
static
ring_buffer_t
trxw_group_rbuf
;
// Group of IQ samples
// Counters
...
...
@@ -161,12 +153,12 @@ static int first_trx_write = 1;
static
uint8_t
pkt_frame_full
[
1024
];
#endif
#ifdef TRACE
static
volatile
int
rx_trace_ready
=
0
;
static
volatile
int
tx_trace_ready
=
1
;
static
int64_t
encode_counter_prev
=
0
;
static
int64_t
decode_counter_prev
=
0
;
#endif
static
int
recv_pps_threshold
;
// Network
static
volatile
int
seq_id
;
...
...
@@ -383,22 +375,18 @@ static void update_counter_pps(volatile counter_stat_t * c) {
static
void
update_counter
(
volatile
counter_stat_t
*
c
,
int64_t
v
)
{
c
->
counter
+=
v
;
}
#ifdef TRACE
static
void
trace_handler
(
struct
timespec
initial
,
TRXEcpriState
*
s
)
{
struct
timespec
next
;
int
ready
=
1
;
#ifdef TRACE_TX
ready
&=
tx_trace_ready
;
#endif
#ifdef TRACE_RX
ready
&=
rx_trace_ready
;
#endif
if
(
ready
)
{
int64_t
d
;
FILE
*
f
;
char
n
[
256
];
uint8_t
ones
[
14
];
int
start
;
uint8_t
ones
[
14
];
for
(
int
i
=
0
;
i
<
14
;
i
++
)
ones
[
i
]
=
0xff
;
...
...
@@ -406,15 +394,13 @@ static void trace_handler(struct timespec initial, TRXEcpriState * s) {
d
=
calcdiff_ns
(
next
,
initial
);
log_info
(
"TRACE"
,
"Packets sent: %"
PRIi64
,
sent_counter
.
counter
);
log_info
(
"TRACE"
,
"Duration: %"
PRIi64
,
d
);
log_info
(
"TRACE"
,
"FRAME_FREQ: %"
PRIi64
,
FRAME_FREQ
);
usleep
(
1000
*
200
);
#ifdef TRACE_TX
memset
(
n
,
'\0'
,
256
);
sprintf
(
n
,
"%s/tx.trace"
,
s
->
log_directory
);
f
=
fopen
(
n
,
"wb+"
);
start
=
(
TRACE
+
encode_counter_prev
)
%
tx_rbuf
.
buf_len
;
start
=
(
s
->
trace_offset
+
encode_counter_prev
)
%
tx_rbuf
.
buf_len
;
log_info
(
"TRACE"
,
"Writing %d frames to tx.trace"
,
(
tx_rbuf
.
write_index
+
tx_rbuf
.
buf_len
-
start
)
%
tx_rbuf
.
buf_len
);
for
(
int
i
=
start
;
i
!=
tx_rbuf
.
write_index
;
i
=
(
i
+
1
)
%
tx_rbuf
.
buf_len
)
{
fwrite
(
ones
,
14
,
1
,
f
);
...
...
@@ -425,20 +411,18 @@ static void trace_handler(struct timespec initial, TRXEcpriState * s) {
memset
(
n
,
'\0'
,
256
);
sprintf
(
n
,
"%s/trxw.trace"
,
s
->
log_directory
);
f
=
fopen
(
n
,
"wb+"
);
start
=
(
TRACE
)
%
trxw_rbuf
[
0
].
buf_len
;
start
=
(
s
->
trace_offset
)
%
trxw_rbuf
[
0
].
buf_len
;
log_info
(
"TRACE"
,
"Writing %d frames to trxw.trace"
,
(
trxw_rbuf
[
0
].
write_index
+
trxw_rbuf
[
0
].
buf_len
-
start
)
%
trxw_rbuf
[
0
].
buf_len
);
for
(
int
i
=
start
;
i
!=
trxw_rbuf
[
0
].
write_index
;
i
=
(
i
+
1
)
%
trxw_rbuf
[
0
].
buf_len
)
{
for
(
int
j
=
0
;
j
<
TX_N_CHANNEL
;
j
++
)
for
(
int
j
=
0
;
j
<
s
->
tx_n_channel
;
j
++
)
fwrite
((
uint8_t
*
)
(((
Complex
*
)
trxw_rbuf
[
j
].
buffer
)
+
i
*
trxw_rbuf
[
0
].
len
),
trxw_rbuf
[
0
].
len
*
sizeof
(
Complex
),
1
,
f
);
}
fclose
(
f
);
#endif
#ifdef TRACE_RX
memset
(
n
,
'\0'
,
256
);
sprintf
(
n
,
"%s/rx.trace"
,
s
->
log_directory
);
f
=
fopen
(
n
,
"wb+"
);
start
=
TRACE
%
rx_rbuf
.
buf_len
;
start
=
s
->
trace_offset
%
rx_rbuf
.
buf_len
;
log_info
(
"TRACE"
,
"Writing %d frames to rx.trace"
,
(
rx_rbuf
.
write_index
+
rx_rbuf
.
buf_len
-
start
)
%
rx_rbuf
.
buf_len
);
for
(
int
i
=
start
;
i
!=
rx_rbuf
.
write_index
;
i
=
(
i
+
1
)
%
rx_rbuf
.
buf_len
)
{
fwrite
(((
uint8_t
*
)
rx_rbuf
.
buffer
)
+
i
*
rx_rbuf
.
len
,
rx_rbuf
.
len
,
1
,
f
);
...
...
@@ -448,18 +432,16 @@ static void trace_handler(struct timespec initial, TRXEcpriState * s) {
memset
(
n
,
'\0'
,
256
);
sprintf
(
n
,
"%s/trxr.trace"
,
s
->
log_directory
);
f
=
fopen
(
n
,
"wb+"
);
start
=
(
TRACE
+
decode_counter_prev
)
%
trxr_rbuf
[
0
].
buf_len
;
start
=
(
s
->
trace_offset
+
decode_counter_prev
)
%
trxr_rbuf
[
0
].
buf_len
;
log_info
(
"TRACE"
,
"Writing %d frames to trxr.trace"
,
(
trxr_rbuf
[
0
].
write_index
+
trxr_rbuf
[
0
].
buf_len
-
start
)
%
trxr_rbuf
[
0
].
buf_len
);
for
(
int
i
=
start
;
i
!=
trxr_rbuf
[
0
].
write_index
;
i
=
(
i
+
1
)
%
trxr_rbuf
[
0
].
buf_len
)
{
for
(
int
j
=
0
;
j
<
RX_N_CHANNEL
;
j
++
)
for
(
int
j
=
0
;
j
<
s
->
rx_n_channel
;
j
++
)
fwrite
((
uint8_t
*
)
(((
Complex
*
)
trxr_rbuf
[
j
].
buffer
)
+
i
*
trxr_rbuf
[
0
].
len
),
trxr_rbuf
[
0
].
len
*
sizeof
(
Complex
),
1
,
f
);
}
fclose
(
f
);
#endif
log_exit
(
""
,
"Finished tracing"
);
}
}
#endif
static
void
*
recv_thread
(
void
*
p
)
{
...
...
@@ -502,20 +484,13 @@ static void *recv_thread(void *p) {
// Limit packets sent
if
(
recv_counter
.
counter
>=
target_counter
)
{
clock_gettime
(
CLOCK_TAI
,
&
current
);
if
(
i
&&
calcdiff_ns
(
current
,
previous
)
<
(
1000
*
1000
*
10
))
{
//for(int i = 0; i < 10000; i++)
// asm("NOP");
//usleep(10);
}
else
{
target_counter
+=
EFREQ
;
if
(
!
i
||
calcdiff_ns
(
current
,
previous
)
>=
(
1000
*
1000
*
10
))
{
target_counter
+=
s
->
frame_frequency
/
100
;
previous
=
current
;
}
}
if
(
recv_counter
.
counter
<
target_counter
)
{
nb_rx
=
1024
;
//for(int i = 0; i < 700000; i++)
// asm("NOP");
usleep
(
200
);
}
else
...
...
@@ -542,17 +517,15 @@ static void *recv_thread(void *p) {
nr
=
nb_rx
;
while
((
nc
=
rbuf_contiguous_copy
(
NULL
,
&
rx_rbuf
,
nr
)))
{
#ifdef TRACE
#ifdef TRACE_RX
if
((
recv_counter
.
counter
+
nc
)
>=
(
rx_rbuf
.
buf_len
+
TRACE
))
{
rx_trace_ready
=
1
;
log_info
(
"RECV_THREAD"
,
"RX Trace ready"
);
pthread_exit
(
EXIT_SUCCESS
);
}
else
if
(
rx_trace_ready
)
{
pthread_exit
(
EXIT_SUCCESS
);
if
(
s
->
trace_rx
)
{
if
((
recv_counter
.
counter
+
nc
)
>=
(
rx_rbuf
.
buf_len
+
s
->
trace_offset
))
{
rx_trace_ready
=
1
;
log_info
(
"RECV_THREAD"
,
"RX Trace ready"
)
;
pthread_exit
(
EXIT_SUCCESS
);
}
else
if
(
rx_trace_ready
)
{
pthread_exit
(
EXIT_SUCCESS
);
}
}
#endif
#endif
buf
=
((
uint8_t
*
)
rx_rbuf
.
buffer
)
+
(
rx_rbuf
.
write_index
*
rx_rbuf
.
len
);
for
(
int
i
=
0
;
i
<
nc
;
i
++
)
{
...
...
@@ -637,11 +610,9 @@ static void *encode_thread(void *p) {
cpu_set_t
mask
;
TRXEcpriState
*
s
=
(
TRXEcpriState
*
)
p
;
#ifdef START_SENDING
int64_t
target_counter
=
0
;
struct
timespec
next
;
int
reset_encode_counter
=
1
;
#endif
// Set thread CPU affinity
CPU_ZERO
(
&
mask
);
...
...
@@ -655,32 +626,33 @@ static void *encode_thread(void *p) {
n
=
rbuf_write_amount
(
&
tx_rbuf
);
// Send empty frames until we receive something
#ifdef START_SENDING
if
(
!
sync_complete
)
{
if
(
i
==
0
)
clock_gettime
(
CLOCK_TAI
,
&
next
);
// Limit packets sent
if
(
encode_counter
.
counter
>
target_counter
)
{
int
k
=
(
encode_counter
.
counter
-
target_counter
+
EFREQ
-
1
)
/
EFREQ
;
add_ns
(
&
next
,
k
*
1000
*
1000
*
10
);
// 10ms to send 38400 packets
clock_nanosleep
(
CLOCK_TAI
,
TIMER_ABSTIME
,
&
next
,
NULL
);
target_counter
+=
k
*
EFREQ
;
if
(
s
->
start_sending
)
{
if
(
!
sync_complete
)
{
if
(
i
==
0
)
clock_gettime
(
CLOCK_TAI
,
&
next
);
// Limit packets sent
if
(
encode_counter
.
counter
>
target_counter
)
{
int
k
=
(
encode_counter
.
counter
-
target_counter
+
(
s
->
frame_frequency
/
100
)
-
1
)
/
(
s
->
frame_frequency
/
100
);
add_ns
(
&
next
,
k
*
1000
*
1000
*
10
);
// 10ms to send 38400 packets
clock_nanosleep
(
CLOCK_TAI
,
TIMER_ABSTIME
,
&
next
,
NULL
);
target_counter
+=
k
*
s
->
frame_frequency
/
100
;
}
n
=
(
n
>
TX_SYNC_BURST_SIZE
)
?
n
:
TX_SYNC_BURST_SIZE
;
n
=
(
n
<
(
s
->
frame_frequency
/
100
))
?
n
:
(
s
->
frame_frequency
/
100
);
for
(
int
j
=
0
;
j
<
n
;
j
++
)
{
*
((
uint16_t
*
)
(
RBUF_WRITE0
(
tx_rbuf
,
uint8_t
)
+
6
))
=
htons
(
seq_id
++
);
rbuf_update_write_index
(
&
tx_rbuf
);
}
update_counter
(
&
encode_counter
,
n
);
}
n
=
(
n
>
TX_SYNC_BURST_SIZE
)
?
n
:
TX_SYNC_BURST_SIZE
;
n
=
(
n
<
EFREQ
)
?
n
:
EFREQ
;
for
(
int
j
=
0
;
j
<
n
;
j
++
)
{
*
((
uint16_t
*
)
(
RBUF_WRITE0
(
tx_rbuf
,
uint8_t
)
+
6
))
=
htons
(
seq_id
++
);
rbuf_update_write_index
(
&
tx_rbuf
);
else
if
(
reset_encode_counter
)
{
if
(
s
->
trace_tx
)
encode_counter_prev
=
encode_counter
.
counter
;
encode_counter
.
counter
=
0
;
reset_encode_counter
=
0
;
seq_id
=
0
;
}
update_counter
(
&
encode_counter
,
n
);
}
else
if
(
reset_encode_counter
)
{
encode_counter_prev
=
encode_counter
.
counter
;
encode_counter
.
counter
=
0
;
reset_encode_counter
=
0
;
seq_id
=
0
;
}
#endif
// If we have frames to encode (is there space in TX buffer)
// If there are frames from trx_write callback to encode
...
...
@@ -695,17 +667,15 @@ static void *encode_thread(void *p) {
}
nb_frames
=
g
->
count
>
n
?
n
:
g
->
count
;
g
->
count
-=
nb_frames
;
#ifdef TRACE
#ifdef TRACE_TX
if
(
sync_complete
&&
(
encode_counter
.
counter
+
nb_frames
)
>=
(
tx_rbuf
.
buf_len
+
TRACE
))
{
tx_trace_ready
=
1
;
log_info
(
"ENCODE_THREAD"
,
"TX Trace ready"
);
pthread_exit
(
EXIT_SUCCESS
);
}
else
if
(
tx_trace_ready
)
{
pthread_exit
(
EXIT_SUCCESS
);
if
(
s
->
trace_tx
)
{
if
(
sync_complete
&&
(
encode_counter
.
counter
+
nb_frames
)
>=
(
tx_rbuf
.
buf_len
+
s
->
trace_offset
))
{
tx_trace_ready
=
1
;
log_info
(
"ENCODE_THREAD"
,
"TX Trace ready"
)
;
pthread_exit
(
EXIT_SUCCESS
);
}
else
if
(
tx_trace_ready
)
{
pthread_exit
(
EXIT_SUCCESS
);
}
}
#endif
#endif
if
(
g
->
zeroes
)
{
for
(
int
j
=
0
;
j
<
nb_frames
;
j
++
)
{
memset
(
RBUF_WRITE0
(
tx_rbuf
,
uint8_t
)
+
8
,
0x00
,
240
);
...
...
@@ -720,14 +690,14 @@ static void *encode_thread(void *p) {
while
((
nc
=
rbuf_contiguous_copy
(
&
trxw_rbuf
[
0
],
&
tx_rbuf
,
nf
)))
{
Complex
*
iq_samples
[
4
];
uint8_t
*
buf
=
RBUF_WRITE0
(
tx_rbuf
,
uint8_t
)
+
8
;
for
(
int
j
=
0
;
j
<
TX_N_CHANNEL
;
j
++
)
for
(
int
j
=
0
;
j
<
s
->
tx_n_channel
;
j
++
)
iq_samples
[
j
]
=
((
Complex
*
)
trxw_rbuf
[
j
].
buffer
)
+
(
trxw_rbuf
[
0
].
read_index
*
trxw_rbuf
[
0
].
len
);
for
(
int
i
=
0
;
i
<
nc
;
i
++
)
{
for
(
int
i
=
0
;
i
<
TX_N_CHANNEL
;
i
++
)
for
(
int
i
=
0
;
i
<
s
->
tx_n_channel
;
i
++
)
encode_s64_b60_2
(
buf
+
i
*
60
,
(
float
*
)
iq_samples
[
i
]);
*
((
uint16_t
*
)(
buf
-
2
))
=
htons
(
seq_id
++
);
for
(
int
j
=
0
;
j
<
TX_N_CHANNEL
;
j
++
)
for
(
int
j
=
0
;
j
<
s
->
tx_n_channel
;
j
++
)
iq_samples
[
j
]
+=
trxw_rbuf
[
0
].
len
;
buf
+=
tx_rbuf
.
len
;
}
...
...
@@ -752,11 +722,9 @@ static void *decode_thread(void *p) {
cpu_set_t
mask
;
TRXEcpriState
*
s
=
(
TRXEcpriState
*
)
p
;
#ifdef START_RECEIVING
struct
timespec
next
;
int64_t
target_counter
=
0
;
int
reset_decode_counter
=
1
;
#endif
log_info
(
"DECODE_THREAD"
,
"Thread init"
);
// Set thread CPU affinity
...
...
@@ -767,29 +735,30 @@ static void *decode_thread(void *p) {
for
(
int64_t
i
=
0
;;
i
++
)
{
int
n
,
nc
;
#ifdef START_RECEIVING
if
(
!
received_pkts
)
{
if
(
i
==
0
)
clock_gettime
(
CLOCK_TAI
,
&
next
);
// Limit packets sent
if
(
decode_counter
.
counter
>
target_counter
)
{
int
k
=
(
decode_counter
.
counter
-
target_counter
+
EFREQ
-
1
)
/
EFREQ
;
add_ns
(
&
next
,
k
*
1000
*
1000
*
10
);
// 10ms to send 38400 packets
clock_nanosleep
(
CLOCK_TAI
,
TIMER_ABSTIME
,
&
next
,
NULL
);
target_counter
+=
k
*
EFREQ
;
if
(
s
->
start_receiving
)
{
if
(
!
received_pkts
)
{
if
(
i
==
0
)
clock_gettime
(
CLOCK_TAI
,
&
next
);
// Limit packets sent
if
(
decode_counter
.
counter
>
target_counter
)
{
int
k
=
(
decode_counter
.
counter
-
target_counter
+
(
s
->
frame_frequency
/
100
)
-
1
)
/
(
s
->
frame_frequency
/
100
);
add_ns
(
&
next
,
k
*
1000
*
1000
*
10
);
// 10ms to send 38400 packets
clock_nanosleep
(
CLOCK_TAI
,
TIMER_ABSTIME
,
&
next
,
NULL
);
target_counter
+=
k
*
(
s
->
frame_frequency
/
100
);
}
n
=
(
s
->
frame_frequency
/
100
);
for
(
int
j
=
0
;
j
<
n
;
j
++
)
rbuf_update_write_index
(
&
trxr_rbuf
[
0
]);
update_counter
(
&
decode_counter
,
n
);
continue
;
}
else
if
(
reset_decode_counter
)
{
if
(
s
->
trace_rx
)
decode_counter_prev
=
decode_counter
.
counter
;
decode_counter
.
counter
=
0
;
reset_decode_counter
=
0
;
}
n
=
EFREQ
;
for
(
int
j
=
0
;
j
<
n
;
j
++
)
rbuf_update_write_index
(
&
trxr_rbuf
[
0
]);
update_counter
(
&
decode_counter
,
n
);
continue
;
}
else
if
(
reset_decode_counter
)
{
decode_counter_prev
=
decode_counter
.
counter
;
decode_counter
.
counter
=
0
;
reset_decode_counter
=
0
;
}
#endif
while
(
!
(
n
=
rbuf_read_amount
(
&
rx_rbuf
)))
{};
while
(
rbuf_write_amount
(
&
trxr_rbuf
[
0
])
<
n
)
{};
...
...
@@ -798,24 +767,22 @@ static void *decode_thread(void *p) {
uint8_t
*
buf
=
((
uint8_t
*
)
rx_rbuf
.
buffer
)
+
(
rx_rbuf
.
read_index
*
rx_rbuf
.
len
)
+
22
;
#ifdef TRACE
#ifdef TRACE_RX
if
(
received_pkts
&&
((
decode_counter
.
counter
+
nc
)
>=
(
trxr_rbuf
[
0
].
buf_len
+
TRACE
)))
{
rx_trace_ready
=
1
;
log_info
(
"DECODE_THREAD"
,
"RX Trace ready"
);
pthread_exit
(
EXIT_SUCCESS
);
}
else
if
(
rx_trace_ready
)
{
pthread_exit
(
EXIT_SUCCESS
);
if
(
s
->
trace_rx
)
{
if
(
received_pkts
&&
((
decode_counter
.
counter
+
nc
)
>=
(
trxr_rbuf
[
0
].
buf_len
+
s
->
trace_offset
)))
{
rx_trace_ready
=
1
;
log_info
(
"DECODE_THREAD"
,
"RX Trace ready"
)
;
pthread_exit
(
EXIT_SUCCESS
);
}
else
if
(
rx_trace_ready
)
{
pthread_exit
(
EXIT_SUCCESS
);
}
}
#endif
#endif
Complex
*
iq_samples
[
4
];
for
(
int
i
=
0
;
i
<
RX_N_CHANNEL
;
i
++
)
for
(
int
i
=
0
;
i
<
s
->
rx_n_channel
;
i
++
)
iq_samples
[
i
]
=
(((
Complex
*
)
trxr_rbuf
[
i
].
buffer
)
+
(
trxr_rbuf
[
0
].
write_index
*
trxr_rbuf
[
0
].
len
));
for
(
int
i
=
0
;
i
<
nc
;
i
++
)
{
for
(
int
j
=
0
;
j
<
RX_N_CHANNEL
;
j
++
)
{
for
(
int
j
=
0
;
j
<
s
->
rx_n_channel
;
j
++
)
{
decode_s64_b60_2
((
float
*
)
(
iq_samples
[
j
]
+
i
*
32
),
buf
+
j
*
60
+
i
*
rx_rbuf
.
len
);
}
}
...
...
@@ -831,9 +798,7 @@ static void *decode_thread(void *p) {
}
static
void
*
statistic_thread
(
void
*
p
)
{
struct
timespec
next
,
initial
;
#ifdef MONITOR
int64_t
recv_stop
=
0
;
#endif
cpu_set_t
mask
;
TRXEcpriState
*
s
=
(
TRXEcpriState
*
)
p
;
FILE
*
stats_file_desc
;
...
...
@@ -857,10 +822,9 @@ static void *statistic_thread(void *p) {
clock_gettime
(
CLOCK_TAI
,
&
initial
);
next
=
initial
;
for
(
int64_t
i
=
0
;;
i
++
)
{
add_ns
(
&
next
,
STATISTIC_REFRESH_RATE
);
#ifdef TRACE
trace_handler
(
initial
,
s
);
#endif
add_ns
(
&
next
,
s
->
statistics_refresh_rate_ns
);
if
(
s
->
trace_rx
)
trace_handler
(
initial
,
s
);
print_stats
(
stats_file_desc
,
(
i
%
50
)
==
0
);
#ifdef DEBUG
fprintf
(
stats_file_desc
,
...
...
@@ -885,27 +849,26 @@ static void *statistic_thread(void *p) {
update_counter_pps
(
&
write_counter
);
update_counter_pps
(
&
encode_counter
);
update_counter_pps
(
&
sent_counter
);
#ifdef MONITOR
if
(
recv_counter
.
pps
>
RECV_PPS_THRESHOLD
)
{
recv_pps_threshold_hit
=
1
;
}
if
(
recv_pps_threshold_hit
&&
recv_counter
.
pps
<
RECV_PPS_THRESHOLD
)
{
struct
timespec
_ts
;
int64_t
ts
;
clock_gettime
(
CLOCK_MONOTONIC
,
&
_ts
);
ts
=
ts_to_int
(
_ts
);
if
((
recv_stop
&&
((
ts
-
recv_stop
)
>
RECV_STOP_THRESHOLD
*
INT64_C
(
1000000000
))))
{
#ifdef MONITOR_EXIT
log_exit
(
"MONITOR"
,
"Stopped recieving packets, restarting..."
);
#endif
log_info
(
"MONITOR"
,
"Stopped recieving packets, sending again..."
);
sync_complete
=
0
;
recv_stop
=
0
;
if
(
s
->
monitor_pps
)
{
if
(
recv_counter
.
pps
>
recv_pps_threshold
)
{
recv_pps_threshold_hit
=
1
;
}
if
(
recv_pps_threshold_hit
&&
recv_counter
.
pps
<
recv_pps_threshold
)
{
struct
timespec
_ts
;
int64_t
ts
;
clock_gettime
(
CLOCK_MONOTONIC
,
&
_ts
);
ts
=
ts_to_int
(
_ts
);
if
((
recv_stop
&&
((
ts
-
recv_stop
)
>
(
s
->
monitor_trigger_duration
)
*
INT64_C
(
1000000000
))))
{
if
(
s
->
monitor_pps
==
1
)
log_exit
(
"MONITOR"
,
"Stopped recieving packets, restarting..."
);
log_info
(
"MONITOR"
,
"Stopped recieving packets, sending again..."
);
sync_complete
=
0
;
recv_stop
=
0
;
}
if
(
!
recv_stop
)
recv_stop
=
ts
;
}
if
(
!
recv_stop
)
recv_stop
=
ts
;
}
#endif
clock_nanosleep
(
CLOCK_TAI
,
TIMER_ABSTIME
,
&
next
,
NULL
);
}
...
...
@@ -1065,17 +1028,17 @@ int startdpdk(TRXEcpriState * s) {
init_counter
(
&
encode_counter
);
init_counter
(
&
sent_counter
);
RBUF_INIT
(
rx_rbuf
,
"RX ring buffer"
,
TXRX_BUF_MAX_SIZE
,
RX_MAX_PACKET_SIZE
,
uint8_t
);
RBUF_INIT
(
tx_rbuf
,
"TX ring buffer"
,
TXRX_BUF_MAX_SIZE
,
TX_ECPRI_PACKET_SIZE
,
uint8_t
);
for
(
int
i
=
0
;
i
<
TX_N_CHANNEL
;
i
++
)
{
char
s
[
256
];
sprintf
(
s
,
"TRXWrite Ring Buffer %d"
,
i
);
RBUF_INIT
(
trxw_rbuf
[
i
],
s
,
TRX_BUF_MAX_SIZE
,
N_SAMPLES
,
Complex
);
RBUF_INIT
(
rx_rbuf
,
"RX ring buffer"
,
s
->
txrx_buf_size
,
RX_MAX_PACKET_SIZE
,
uint8_t
);
RBUF_INIT
(
tx_rbuf
,
"TX ring buffer"
,
s
->
txrx_buf_size
,
TX_ECPRI_PACKET_SIZE
,
uint8_t
);
for
(
int
i
=
0
;
i
<
s
->
tx_n_channel
;
i
++
)
{
char
name
[
256
];
sprintf
(
name
,
"TRXWrite Ring Buffer %d"
,
i
);
RBUF_INIT
(
trxw_rbuf
[
i
],
name
,
s
->
trx_buf_size
,
N_SAMPLES
,
Complex
);
}
for
(
int
i
=
0
;
i
<
RX_N_CHANNEL
;
i
++
)
{
char
s
[
256
];
sprintf
(
s
,
"TRXRead Ring Buffer %d"
,
i
);
RBUF_INIT
(
trxr_rbuf
[
i
],
s
,
TRX_BUF_MAX_SIZE
,
N_SAMPLES
,
Complex
);
for
(
int
i
=
0
;
i
<
s
->
rx_n_channel
;
i
++
)
{
char
name
[
256
];
sprintf
(
name
,
"TRXRead Ring Buffer %d"
,
i
);
RBUF_INIT
(
trxr_rbuf
[
i
],
name
,
s
->
trx_buf_size
,
N_SAMPLES
,
Complex
);
}
RBUF_INIT
(
trxw_group_rbuf
,
"TRXGroupWrite ring buffer"
,
TRX_MAX_GROUP
,
1
,
sample_group_t
);
...
...
@@ -1128,9 +1091,9 @@ static int64_t prev_count = 0;
static
void
trx_ecpri_write
(
TRXState
*
s1
,
trx_timestamp_t
timestamp
,
const
void
**
__samples
,
int
count
,
int
tx_port_index
,
TRXWriteMetadata
*
md
)
{
(
void
)
s1
;
int
write_count
;
int64_t
ts
;
sample_group_t
*
g
;
int
nc
;
int
nk
=
0
;
float
**
_samples
=
(
float
**
)
__samples
;
TRXEcpriState
*
s
=
s1
->
opaque
;
write_count
=
count
/
M
;
ts
=
timestamp
/
M
;
...
...
@@ -1147,17 +1110,15 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void
return
;
}
#ifdef TRACE
#ifdef TRACE_TX
if
((
write_counter
.
counter
+
write_count
)
>=
(
trxw_rbuf
[
0
].
buf_len
+
TRACE
))
{
tx_trace_ready
=
1
;
log_info
(
"TRX_ECPRI_WRITE"
,
"TX Trace ready"
);
pthread_exit
(
EXIT_SUCCESS
);
}
else
if
(
tx_trace_ready
)
{
pthread_exit
(
EXIT_SUCCESS
);
if
(
s
->
trace_tx
)
{
if
((
write_counter
.
counter
+
write_count
)
>=
(
trxw_rbuf
[
0
].
buf_len
+
s
->
trace_offset
))
{
tx_trace_ready
=
1
;
log_info
(
"TRX_ECPRI_WRITE"
,
"TX Trace ready"
)
;
pthread_exit
(
EXIT_SUCCESS
);
}
else
if
(
tx_trace_ready
)
{
pthread_exit
(
EXIT_SUCCESS
);
}
}
#endif
#endif
if
(
first_trx_write
)
{
sample_group_t
*
g2
=
RBUF_WRITE0
(
trxw_group_rbuf
,
sample_group_t
);
...
...
@@ -1174,7 +1135,7 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void
while
((
nc
=
rbuf_contiguous_copy
(
NULL
,
&
trxw_rbuf
[
0
],
write_count
)))
{
int
len
=
nc
*
trxr_rbuf
[
0
].
len
*
sizeof
(
Complex
);
if
(
__samples
)
for
(
int
i
=
0
;
i
<
TX_N_CHANNEL
;
i
++
)
{
for
(
int
i
=
0
;
i
<
s
->
tx_n_channel
;
i
++
)
{
uint8_t
*
src
=
((
uint8_t
*
)
_samples
[
i
])
+
len
;
uint8_t
*
dst
=
((
uint8_t
*
)
trxw_rbuf
[
i
].
buffer
)
+
trxw_rbuf
[
0
].
write_index
*
trxw_rbuf
[
0
].
len
*
sizeof
(
Complex
);
memcpy
(
dst
,
src
,
len
);
...
...
@@ -1190,11 +1151,11 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void
static
int
trx_ecpri_read
(
TRXState
*
s1
,
trx_timestamp_t
*
ptimestamp
,
void
**
__samples
,
int
count
,
int
rx_port_index
,
TRXReadMetadata
*
md
)
{
(
void
)
s1
;
int
nc
;
int
n
;
float
**
_samples
=
(
float
**
)
__samples
;
int
read_count
=
(
count
/
M
);
int
offset
=
0
;
TRXEcpriState
*
s
=
s1
->
opaque
;
log_debug
(
"TRX_ECPRI_READ"
,
"count = %ld (%li)"
,
read_count
,
read_counter
.
counter
);
...
...
@@ -1205,7 +1166,7 @@ static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__sa
n
=
read_count
;
while
((
nc
=
rbuf_contiguous_copy
(
&
trxr_rbuf
[
0
],
NULL
,
n
)))
{
int
len
=
nc
*
trxr_rbuf
[
0
].
len
*
sizeof
(
Complex
);
for
(
int
i
=
0
;
i
<
RX_N_CHANNEL
;
i
++
)
{
for
(
int
i
=
0
;
i
<
s
->
rx_n_channel
;
i
++
)
{
uint8_t
*
dst
=
(
uint8_t
*
)
(
_samples
[
i
]
+
offset
);
uint8_t
*
src
=
((
uint8_t
*
)
trxr_rbuf
[
i
].
buffer
)
+
trxr_rbuf
[
0
].
read_index
*
trxr_rbuf
[
0
].
len
*
sizeof
(
Complex
);
memcpy
(
dst
,
src
,
len
);
...
...
@@ -1458,19 +1419,50 @@ int trx_driver_init(TRXState *s1)
s
->
statistic_affinity
=
(
int
)
val
;
trx_get_param_double
(
s1
,
&
val
,
"flow_id"
);
s
->
flow_id
=
(
int
)
val
;
trx_get_param_double
(
s1
,
&
val
,
"frame_frequency"
);
s
->
frame_frequency
=
(
int
)
val
;
trx_get_param_double
(
s1
,
&
val
,
"trx_buf_size"
);
s
->
trx_buf_size
=
(
int
)
val
;
trx_get_param_double
(
s1
,
&
val
,
"txrx_buf_size"
);
s
->
txrx_buf_size
=
(
int
)
val
;
trx_get_param_double
(
s1
,
&
val
,
"trace_rx"
);
s
->
trace_rx
=
(
int
)
val
;
trx_get_param_double
(
s1
,
&
val
,
"trace_tx"
);
s
->
trace_tx
=
(
int
)
val
;
trx_get_param_double
(
s1
,
&
val
,
"trace_offset"
);
s
->
trace_offset
=
(
int
)
val
;
trx_get_param_double
(
s1
,
&
val
,
"monitor_pps"
);
s
->
monitor_pps
=
(
int
)
val
;
trx_get_param_double
(
s1
,
&
val
,
"monitor_trigger_duration"
);
s
->
monitor_trigger_duration
=
(
int
)
val
;
trx_get_param_double
(
s1
,
&
val
,
"start_sending"
);
s
->
start_sending
=
(
int
)
val
;
trx_get_param_double
(
s1
,
&
val
,
"start_receiving"
);
s
->
start_receiving
=
(
int
)
val
;
trx_get_param_double
(
s1
,
&
val
,
"rx_n_channel"
);
s
->
rx_n_channel
=
(
int
)
val
;
trx_get_param_double
(
s1
,
&
val
,
"tx_n_channel"
);
s
->
tx_n_channel
=
(
int
)
val
;
trx_get_param_double
(
s1
,
&
val
,
"statistics_refresh_rate_ns"
);
s
->
statistics_refresh_rate_ns
=
(
int
)
val
;
trx_get_param_double
(
s1
,
&
val
,
"ecpri_period"
);
if
(((
int
)
val
)
==
0
)
{
fprintf
(
stderr
,
"ecpri_period parameter can't be null
\n
"
);
return
-
1
;
}
s
->
ecpri_period
=
(
int
)
val
;
if
(
s
->
ecpri_period
==
0
)
log_exit
(
"TRX_ECPRI"
,
"ecpri_period parameter can't be null
\n
"
);
if
(
s
->
rx_n_channel
==
0
)
log_exit
(
"TRX_ECPRI"
,
"rx_n_channel parameter can't be null
\n
"
);
if
(
s
->
tx_n_channel
==
0
)
log_exit
(
"TRX_ECPRI"
,
"tx_n_channel parameter can't be null
\n
"
);
s
->
re_mac
=
(
uint8_t
*
)
trx_get_param_string
(
s1
,
"re_mac"
);
s
->
rec_mac
=
(
uint8_t
*
)
trx_get_param_string
(
s1
,
"rec_mac"
);
s
->
rec_if
=
(
uint8_t
*
)
trx_get_param_string
(
s1
,
"rec_if"
);
s
->
dpdk_options
=
trx_get_param_string
(
s1
,
"dpdk_options"
);
s
->
log_directory
=
(
uint8_t
*
)
trx_get_param_string
(
s1
,
"log_directory"
);
recv_pps_threshold
=
(
s
->
frame_frequency
*
9
/
10
);
s1
->
opaque
=
s
;
s1
->
trx_end_func
=
trx_ecpri_end
;
s1
->
trx_write_func2
=
trx_ecpri_write
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment