Commit f3b0914f authored by Joanne Hugé's avatar Joanne Hugé

WIP: rewrite and clean up code

parent 67101268
......@@ -22,12 +22,17 @@
#include "send_packet.h"
#include "utilities.h"
#define MAX_KERNEL_LATENCY 1000
#define MAX_RTT_LATENCY 1000
// Structs
typedef struct thread_stat {
typedef struct egress_stat {
int nb_cycles;
uint64_t rtt;
packet_info_t packet_info;
uint64_t high_kernel_latency;
uint64_t invalid_parameter;
uint64_t missed_deadline;
char data[MAX_BUFFER_SIZE];
} thread_stat_t;
typedef struct thread_param {
......@@ -36,7 +41,6 @@ typedef struct thread_param {
int priority;
int etf_offset;
thread_stat_t stats;
} thread_param_t;
typedef struct main_param {
......@@ -44,13 +48,6 @@ typedef struct main_param {
int verbose;
} main_param_t;
typedef struct network_config {
size_t tx_buffer_len;
char ip_address[256];
char network_if[256];
int packet_priority;
} network_config_t;
// Static functions
static void process_options(int argc, char *argv[]);
......@@ -60,19 +57,21 @@ static void sighand(int sig_num);
// Static variables
static int64_t histograms[NB_HISTOGRAMS][MAX_HIST_VAL];
static int64_t kernel_latencies[MAX_KERNEL_LATENCY]++;
static int64_t rtt_latencies[MAX_RTT_LATENCY]++;
static main_param_t main_param;
static thread_param_t *param;
static network_config_t network_config;
static main_param_t main_params;
static thread_param_t thread_params;
static egress_stat * egress_stats;
static egress_param * egress_params;
static int enable_histograms;
static int enable_affinity;
static int enable_etf;
static int enable_timestamps;
enum TSNTask { SEND_PACKET_TASK,
RTT_TASK };
enum TSNTask { SEND_PACKET_TASK, RTT_TASK };
static enum TSNTask tsn_task;
struct timespec measures_start;
......@@ -105,8 +104,6 @@ static void *packet_sending_thread(void *p) {
struct timespec next;
uint64_t next_txtime;
struct sched_param priority;
thread_param_t *param = (thread_param_t *)p;
thread_stat_t *stats = &param->stats;
cpu_set_t mask;
// Set thread CPU affinity
......@@ -134,18 +131,18 @@ static void *packet_sending_thread(void *p) {
clock_gettime(CLOCK_MONOTONIC, &next);
clock_gettime(CLOCK_MONOTONIC, &measures_start);
// Packet sending loop
for (stats->nb_cycles = 0;; stats->nb_cycles++) {
if (param->max_cycles)
if (stats->nb_cycles >= param->max_cycles)
for (egress_stats->nb_cycles = 0;; egress_stats->nb_cycles++) {
if (thread_params.max_cycles)
if (egress_stats->nb_cycles >= thread_params.max_cycles)
break;
sprintf(send_data, "%d", stats->nb_cycles % 1000);
do_tsn_task(param, send_data, next_txtime);
sprintf(send_data, "%d", egress_stats->nb_cycles % 1000);
do_tsn_task(send_data, next_txtime);
add_ns(&next, param->interval);
add_ns(&next, thread_params.interval);
if (enable_etf)
next_txtime += param->interval;
next_txtime += thread_params.interval;
clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next, NULL);
}
......@@ -157,15 +154,14 @@ static void *packet_sending_thread(void *p) {
// Handles the IO and creates real time threads
int main(int argc, char *argv[]) {
pthread_t thread;
thread_stat_t *stats;
param = malloc(sizeof(thread_param_t));
stats = &param->stats;
// Default configuration values
param->interval = 100000 * 1000;
param->max_cycles = 0;
param->priority = 99;
thread_params.interval = 100000 * 1000;
thread_params.max_cycles = 0;
thread_params.priority = 99;
main_params.refresh_rate = 50000;
main_params.verbose = 0;
enable_affinity = 0;
enable_etf = 0;
......@@ -173,18 +169,19 @@ int main(int argc, char *argv[]) {
enable_histograms = 0;
tsn_task = SEND_PACKET_TASK;
network_config.packet_priority = 3;
network_config.tx_buffer_len = 1024;
main_param.refresh_rate = 50000;
main_param.verbose = 0;
egress_params->packet_priority = 3;
egress_params->tx_buffer_len = 1024;
// Process bash options
process_options(argc, argv);
egress_params->use_etf = enable_etf;
egress_params->use_timestamps = enable_timestamps;
if (enable_histograms) {
// Init histograms
memset((int64_t *)histograms, 0, NB_HISTOGRAMS * MAX_HIST_VAL);
memset(kernel_latency, 0, MAX_KERNEL_LATENCY);
memset(rtt_latency, 0, MAX_RTT_LATENCY);
}
// Catch breaks with sighand to print the histograms
......@@ -212,7 +209,7 @@ int main(int argc, char *argv[]) {
if (tsn_task == RTT_TASK) {
printf("%*d: RTT: %*" PRIu64 "\n", 10, stats->nb_cycles, 10, stats->rtt);
printf("\033[%dA", 1);
printf("\033[%dA", 1);
} else if (enable_timestamps) {
int64_t user_space_time = stats->packet_info.userspace_exit_ts - stats->packet_info.userspace_enter_ts;
......@@ -220,7 +217,7 @@ int main(int argc, char *argv[]) {
printf("%*d: U: %*" PRIi64 ", K: %*" PRIi64 "\n",
10, stats->nb_cycles, 10, user_space_time, 10, kernel_space_time);
printf("\033[%dA", 1);
printf("\033[%dA", 1);
}
}
......
......@@ -37,79 +37,63 @@
#include "send_packet.h"
#include "utilities.h"
static void process_timestamps(packet_info_t *packet_info, int64_t histograms[NB_HISTOGRAMS][MAX_HIST_VAL]);
static void init_tx_buffer(size_t _tx_buffer_len);
static void * poll_thread(void *p);
static void process_error_queue();
static int so_priority = 3;
static struct sock_txtime sk_txtime;
static char *tx_buffer;
static size_t tx_buffer_len;
static int sock_fd;
static int64_t tai_offset;
static void init_tx_buffer();
static int set_if();
static int so_timestamping_flags =
SOF_TIMESTAMPING_TX_SOFTWARE | SOF_TIMESTAMPING_SOFTWARE;
// Sets the interface
static int set_if(char *network_if) {
struct ifreq ifreq;
memset(&ifreq, 0, sizeof(ifreq));
strncpy(ifreq.ifr_name, network_if, sizeof(ifreq.ifr_name) - 1);
if (ioctl(sock_fd, SIOCGIFINDEX, &ifreq))
error(EXIT_FAILURE, errno, "ioctl SIOCGIFINDEX failed\n");
return ifreq.ifr_ifindex;
}
struct egress_param * params;
struct thread_param * thread_params;
uint64_t * kernel_latency_hist;
int use_histogram;
static void init_tx_buffer(size_t _tx_buffer_len) {
if (_tx_buffer_len < 1) {
fprintf(stderr, "tx buffer length should be greater than 1\n");
exit(EXIT_FAILURE);
}
static struct sock_txtime sk_txtime;
static char *tx_buffer;
static int sock_fd;
tx_buffer_len = _tx_buffer_len;
tx_buffer = malloc(tx_buffer_len);
}
static uint64_t timestamps_buffer[64];
static int ts_buf_read_index = 0;
static int ts_buf_write_index = 0;
/*
* Init UDP socket
*/
void init_udp_send(int use_etf, int use_timestamps, int packet_priority,
char *network_if, size_t _tx_buffer_len) {
int set_if_err;
struct timespec ts_mon;
struct timespec ts_tai;
init_udp_send(struct egress_param * _params,
struct thread_param * _thread_params,
int _use_histogram,
uint64_t * _kernel_latency) {
init_tx_buffer(_tx_buffer_len);
int set_if_err;
pthread_t thread;
clock_gettime(CLOCK_MONOTONIC, &ts_mon);
clock_gettime(CLOCK_TAI, &ts_tai);
tai_offset = (ts_mon.tv_sec - ts_tai.tv_sec) * NSEC_PER_SEC +
(ts_mon.tv_nsec - ts_tai.tv_nsec);
params = _params;
thread_params = _thread_params;
kernel_latency = _kernel_latency;
use_histogram = _use_histogram
so_priority = packet_priority;
init_tx_buffer();
sock_fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (sock_fd < 0)
error(EXIT_FAILURE, errno, "Socket creation failed\n");
set_if_err = set_if(network_if);
set_if_err = set_if();
if (set_if_err < 0)
error(EXIT_FAILURE, errno, "Couldn't set interface\n");
if (setsockopt(sock_fd, SOL_SOCKET, SO_PRIORITY, &so_priority,
sizeof(so_priority)))
if (setsockopt(sock_fd, SOL_SOCKET, SO_PRIORITY, &params->packet_priority,
sizeof(params->packet_priority)))
error(EXIT_FAILURE, errno, "Couldn't set socket priority\n");
if (setsockopt(sock_fd, SOL_SOCKET, SO_BINDTODEVICE, network_if,
strlen(network_if)))
if (setsockopt(sock_fd, SOL_SOCKET, SO_BINDTODEVICE, params->network_if,
strlen(params->network_if)))
error(EXIT_FAILURE, errno, "setsockopt SO_BINDTODEVICE failed\n");
if (use_etf) {
if (params->use_etf) {
sk_txtime.clockid = CLOCK_TAI;
sk_txtime.flags = 0;
......@@ -117,38 +101,34 @@ void init_udp_send(int use_etf, int use_timestamps, int packet_priority,
error(EXIT_FAILURE, errno, "setsockopt SO_TXTIME failed\n");
}
if (use_timestamps) {
if (params->use_timestamps) {
if (setsockopt(sock_fd, SOL_SOCKET, SO_TIMESTAMPING, &so_timestamping_flags,
sizeof(so_timestamping_flags)))
error(EXIT_FAILURE, errno, "setsockopt SO_TIMESTAMPING failed\n");
}
// Create poll thread
if (pthread_create(&thread, NULL, poll_thread, NULL))
error(EXIT_FAILURE, errno, "Couldn't create poll thread");
}
/*
* Sends udp packets
*/
packet_info_t send_udp_packet(int use_etf, int use_timestamps,
char *data,
uint64_t txtime,
const char *server_ip,
int64_t histograms[NB_HISTOGRAMS][MAX_HIST_VAL]) {
void send_udp_packet(char *data,
uint64_t txtime) {
struct msghdr msg; // Message hardware, sent to the socket
struct cmsghdr *cmsg; // Control message hardware, for txtime
char control[CMSG_SPACE(sizeof(txtime))] = {}; // Stores txtime
struct iovec iov; // The iovec structures stores the TX buffer
// Poll file descriptor, used to poll for timestamp messages
struct pollfd poll_fd = {sock_fd, POLLPRI, 0};
int sendmsgerr, pollerr;
// Server address
struct sockaddr_in sin;
packet_info_t packet_info;
struct timespec ts;
int sendmsgerr;
struct sockaddr_in sin; // Server address
struct timespec ts; // timestamp for userspace timestamping
if (use_timestamps) {
clock_gettime(CLOCK_REALTIME, &ts);
packet_info.userspace_enter_ts = ts_to_uint(ts);
timestamps_buffer[ts_buf_write_index++] = ts_to_uint(ts);
}
strcpy(tx_buffer, data);
......@@ -180,136 +160,110 @@ packet_info_t send_udp_packet(int use_etf, int use_timestamps,
msg.msg_controllen = cmsg->cmsg_len;
}
if (use_timestamps) {
clock_gettime(CLOCK_REALTIME, &ts);
packet_info.userspace_exit_ts = ts_to_uint(ts);
}
sendmsgerr = sendmsg(sock_fd, &msg, 0);
if (sendmsgerr < 0)
error(EXIT_FAILURE, errno, "sendmsg failed, ret value: %d\n", sendmsgerr);
}
if (use_timestamps) {
pollerr = poll(&poll_fd, 1, 1);
if (pollerr > 0)
process_timestamps(&packet_info, histograms);
else
fprintf(stderr, "select failed\n");
}
static void * poll_thread(void *p) {
return packet_info;
}
(void)p;
// Poll file descriptor
struct pollfd poll_fd = {.fd = sock_fd};
static void fill_histograms(packet_info_t *packet_info, int64_t histograms[NB_HISTOGRAMS][MAX_HIST_VAL]) {
while (1) {
int ret;
uint64_t user_space_time = packet_info->userspace_exit_ts - packet_info->userspace_enter_ts;
uint64_t kernel_space_time = packet_info->kernelspace_ts - packet_info->userspace_exit_ts;
ret = poll(&poll_fd, 1, -1);
if (ret == 1 && p_fd.revents & POLLERR) {
process_error_queue();
}
}
}
user_space_time /= 1000u;
kernel_space_time /= 1000u;
static void process_error_queue() {
// IO vector
unsigned char data_buffer[256]; // Buffer in io vector
struct iovec iov = {
.iov_base = data_buffer,
.iov_len = sizeof(data_buffer)};
if (user_space_time > MAX_HIST_VAL)
fprintf(stderr, "user_space_time value too high: %" PRIu64 "us\n", user_space_time);
else
histograms[0][user_space_time]++;
// Control data, will store error or timestamps
unsigned char msg_control[CMSG_SPACE(sizeof(struct sock_extended_err)) + CMSG_SPACE(sizeof(struct timespec))];
if (kernel_space_time > MAX_HIST_VAL)
fprintf(stderr, "kernel_space_time value too high: %" PRIu64 "us\n", kernel_space_time);
else
histograms[1][kernel_space_time]++;
}
// Message hardware structure, containts IO vector and control message hardware
struct msghdr msg = {
.msg_iov = &iov,
.msg_iovlen = 1,
.msg_control = msg_control,
.msg_controllen = sizeof(msg_control)};
static void process_timestamps(packet_info_t *packet_info, int64_t histograms[NB_HISTOGRAMS][MAX_HIST_VAL]) {
char data[256];
struct msghdr msg;
struct iovec entry;
struct sockaddr_in from_addr;
struct {
struct cmsghdr cm;
char control[512];
} control;
struct cmsghdr *cmsg;
memset(&msg, 0, sizeof(msg));
msg.msg_iov = &entry;
msg.msg_iovlen = 1;
entry.iov_base = data;
entry.iov_len = sizeof(data);
msg.msg_name = (caddr_t)&from_addr;
msg.msg_namelen = sizeof(from_addr);
msg.msg_control = &control;
msg.msg_controllen = sizeof(control);
if (recvmsg(sock_fd, &msg, MSG_ERRQUEUE | MSG_DONTWAIT) == -1) {
fprintf(stderr, "recvmsg failed\n");
// Timestamps and errors are received in the error queue
res = recvmsg(sock_fd, &msg, MSG_ERRQUEUE | MSG_DONTWAIT);
if (res == -1) {
fprintf(stderr, "recvmsg() failed\n");
return;
}
for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
// If a timestamp was received
if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SO_TIMESTAMPING) {
struct timespec *stamp = (struct timespec *)CMSG_DATA(cmsg);
packet_info->kernelspace_ts = ts_to_uint(*stamp);
fill_histograms(packet_info, histograms);
} else {
uint64_t kernel_latency = ts_to_uint(stamp) - timestamps_buffer[ts_buf_read_index++];
kernel_latency /= 1000u;
#ifdef DEBUG
fprintf(stderr, "process_timestamps: level %d type %d", cmsg->cmsg_level,
cmsg->cmsg_type);
#endif
}
}
}
#ifdef DEBUG
/*
* Code from scheduled_tx_tools
*/
static int process_socket_error_queue() {
uint8_t msg_control[CMSG_SPACE(sizeof(struct sock_extended_err))];
unsigned char err_buffer[256];
struct sock_extended_err *serr;
struct cmsghdr *cmsg;
__u64 tstamp = 0;
if (kernel_latency > MAX_HIST_VAL)
stats.high_kernel_latency++;
else
kernel_latencies[kernel_latency]++;
struct iovec iov = {.iov_base = err_buffer, .iov_len = sizeof(err_buffer)};
struct msghdr msg = {.msg_iov = &iov,
.msg_iovlen = 1,
.msg_control = msg_control,
.msg_controllen = sizeof(msg_control)};
}
// If an error was received
else {
if (recvmsg(sock_fd, &msg, MSG_ERRQUEUE) == -1) {
fprintf(stderr, "recvmsg failed");
return -1;
}
struct sock_extended_err *serr = (void *)CMSG_DATA(cmsg);
cmsg = CMSG_FIRSTHDR(&msg);
while (cmsg != NULL) {
serr = (void *)CMSG_DATA(cmsg);
if (serr->ee_origin == SO_EE_ORIGIN_TXTIME) {
tstamp = ((__u64)serr->ee_data << 32) + serr->ee_info;
if (serr->ee_origin != SO_EE_ORIGIN_TXTIME)
continue;
switch (serr->ee_code) {
case SO_EE_CODE_TXTIME_INVALID_PARAM:
fprintf(stderr,
"packet with tstamp %llu dropped due to invalid params\n",
tstamp);
return 0;
stats.invalid_parameter++;
break;
case SO_EE_CODE_TXTIME_MISSED:
fprintf(stderr,
"packet with tstamp %llu dropped due to missed deadline\n",
tstamp);
return 0;
stats.missed_deadline++;
break;
default:
return -1;
fprintf(stderr, "Uknown TxTime error\n");
}
}
}
}
cmsg = CMSG_NXTHDR(&msg, cmsg);
// Sets the interface
static int set_if() {
struct ifreq ifreq;
memset(&ifreq, 0, sizeof(ifreq));
strncpy(ifreq.ifr_name, params->network_if, sizeof(ifreq.ifr_name) - 1);
if (ioctl(sock_fd, SIOCGIFINDEX, &ifreq))
error(EXIT_FAILURE, errno, "ioctl SIOCGIFINDEX failed\n");
return ifreq.ifr_ifindex;
}
static void init_tx_buffer() {
if (params->tx_buffer_len < 1) {
fprintf(stderr, "tx buffer length should be greater than 1\n");
exit(EXIT_FAILURE);
}
return 0;
tx_buffer = malloc(params->tx_buffer_len);
}
#endif
......@@ -3,7 +3,23 @@
#include "utilities.h"
void init_udp_send(int use_etf, int use_timestamps, int so_priority, char *network_if, size_t tx_buffer_len);
packet_info_t send_udp_packet(int use_etf, int use_timestamps, char *data, uint64_t txtime, const char *server_ip, int64_t histograms[NB_HISTOGRAMS][MAX_HIST_VAL]);
init_udp_send(struct egress_param * _params,
struct thread_param * _thread_params,
uint64_t kernel_latency[MAX_KERNEL_LATENCY]);
void send_udp_packet(char *data, uint64_t txtime);
struct egress_param {
int packet_priority;
size_t tx_buffer_len;
char server_ip[45];
char network_if[16];
int use_etf;
int use_timestamps;
int min_kernel_latency;
int avg_kernel_latency;
int max_kernel_latency;
};
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment