Commit 3591ec8e authored by Titouan Soulard's avatar Titouan Soulard

Launch server in a separate thread

parent e8bc0796
CFLAGS = -Wall -O3
INCLUDE_FLAGS = -I./include
LIB_FLAGS =-lc -ldl -libverbs
LIB_FLAGS =-lc -ldl -libverbs -lpthread
# Libraries after `--no-as-needed` are forcefully loaded: they are required by Amarisoft libraries
FORCE_LIB_FLAGS = -Wl,--no-as-needed -lm -lpthread
......
......@@ -31,8 +31,7 @@ make install
rdma_standalone -c 192.168.16.10 < Makefile
```
The client should immediately display the content of `README.md`. The content of the `Makefile`
got written to the server, but currently not displayed because the program is mono-threaded.
The client should (almost) immediately display the content of `README.md`, and the server the content of `Makefile`.
## TRX driver
......
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
......@@ -9,6 +10,17 @@
#include "libcapulet/net_udp.h"
#include "libcapulet/rdma_ib.h"
struct ServeMrThreadContext {
struct CommonHashtableTable *mr_mgr;
int server_socket;
};
void *serve_mr_td_fn(void *raw_ctx) {
struct ServeMrThreadContext *ctx = (struct ServeMrThreadContext *) raw_ctx;
capulet_net_udp_serve_mr(ctx->mr_mgr, ctx->server_socket);
return raw_ctx;
}
int main(int argc, char *argv[]) {
struct CapuletRdmaIbContext rdma_ctx;
struct CapuletNetUdpContext *udp_ctx;
......@@ -23,8 +35,12 @@ int main(int argc, char *argv[]) {
struct addrinfo server_hints;
struct addrinfo *server_infos;
pthread_t serve_mr_td;
struct ServeMrThreadContext serve_mr_td_ctx;
int server_socket;
char remote_host[16];
char in_mr_fc;
bool result;
int allocated_size = 16384 * sizeof(char);
......@@ -154,48 +170,63 @@ int main(int argc, char *argv[]) {
}
if(!is_client) {
capulet_net_udp_serve_mr(&mr_mgr, server_socket);
// Reached only on error: serve_mr is looping forever
return -1;
}
// XXX: check ownership
serve_mr_td_ctx.mr_mgr = &mr_mgr;
serve_mr_td_ctx.server_socket = server_socket;
result = capulet_net_udp_query_mr(server_socket, server_infos, "in", &in_info_packet);
if(!result || (strcmp(in_info_packet.name, "in") != 0)) {
fprintf(stderr, "Querying MR (in) from server failed\n");
return -1;
}
pthread_create(&serve_mr_td, NULL, *serve_mr_td_fn, (void *) &serve_mr_td_ctx);
} else {
result = capulet_net_udp_query_mr(server_socket, server_infos, "in", &in_info_packet);
if(!result || (strcmp(in_info_packet.name, "in") != 0)) {
fprintf(stderr, "Querying MR (in) from server failed\n");
return -1;
}
result = capulet_net_udp_query_mr(server_socket, server_infos, "out", &out_info_packet);
if(!result || (strcmp(out_info_packet.name, "out") != 0)) {
fprintf(stderr, "Querying MR (out) from server failed\n");
return -1;
}
result = capulet_net_udp_query_mr(server_socket, server_infos, "out", &out_info_packet);
if(!result || (strcmp(out_info_packet.name, "out") != 0)) {
fprintf(stderr, "Querying MR (out) from server failed\n");
return -1;
}
result = capulet_rdma_ib_send_read(&rdma_ctx, &out_info_packet, out_mr, allocated_size);
if(!result) {
fprintf(stderr, "Sending Read failed\n");
return -1;
}
result = capulet_rdma_ib_send_read(&rdma_ctx, &out_info_packet, out_mr, allocated_size);
if(!result) {
fprintf(stderr, "Sending Read failed\n");
return -1;
}
result = capulet_rdma_ib_send_write(&rdma_ctx, &in_info_packet, in_mr, allocated_size);
if(!result) {
fprintf(stderr, "Sending Write failed\n");
return -1;
result = capulet_rdma_ib_send_write(&rdma_ctx, &in_info_packet, in_mr, allocated_size);
if(!result) {
fprintf(stderr, "Sending Write failed\n");
return -1;
}
}
/******************************
****** Poll completion *******
******************************/
do {
result = ibv_poll_cq(rdma_ctx.cq, 1, &poll_wc);
} while(result == 0);
if(result > 0 && poll_wc.status == IBV_WC_SUCCESS) {
if(is_client) {
printf("%s\n", (char *) out_mr->addr);
if(is_client) {
do {
result = ibv_poll_cq(rdma_ctx.cq, 1, &poll_wc);
usleep(100);
} while(result == 0);
if(result > 0 && poll_wc.status == IBV_WC_SUCCESS) {
if(is_client) {
printf("%s\n", (char *) out_mr->addr);
}
} else {
printf("Failed: %s (WR %lu)\n", ibv_wc_status_str(poll_wc.status), poll_wc.wr_id);
}
} else {
printf("Failed: %s (WR %lu)\n", ibv_wc_status_str(poll_wc.status), poll_wc.wr_id);
// RDMA Read and Write do not give any information on completion
// on the server side, so we wait for the first char of the string
// to be non-zero.
do {
in_mr_fc = *((char *) in_mr->addr);
usleep(100);
} while(in_mr_fc == 0);
printf("%s\n", (char *) in_mr->addr);
}
/******************************
......
#pragma once
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
......@@ -9,6 +10,11 @@
#include "libcapulet/rdma_ib.h"
#include "libcapulet/net_udp.h"
struct SDRServeMrThreadContext {
struct CommonHashtableTable *mr_mgr;
int server_socket;
};
struct SDRMetadata {
uint8_t version;
uint16_t mr_length;
......
#include "libtrx/trx_rdma.h"
void *serve_mr_td_fn(void *raw_ctx) {
struct SDRServeMrThreadContext *ctx = (struct SDRServeMrThreadContext *) raw_ctx;
capulet_net_udp_serve_mr(ctx->mr_mgr, ctx->server_socket);
return raw_ctx;
}
static int trx_rdma_start(TRXState *s, const TRXDriverParams *p) {
struct SDRContext *sdr_context;
struct SDRServeMrThreadContext serve_mr_td_ctx;
struct ibv_mr *in_mr;
struct ibv_mr *out_mr;
struct addrinfo server_hints;
struct addrinfo *server_infos;
pthread_t serve_mr_td;
int mr_size;
int server_socket;
bool result;
......@@ -72,16 +82,20 @@ static int trx_rdma_start(TRXState *s, const TRXDriverParams *p) {
result = capulet_rdma_ib_set_peer_from_udp(&sdr_context->ib_ctx, sdr_context->udp_ctx);
if(!result) return -1;
// XXX: will likely require to have a thread serving MRs and a thread answering to LTEENB requests
if(sdr_context->server_addr) {
// XXX: check ownership
serve_mr_td_ctx.mr_mgr = &sdr_context->mr_mgr;
serve_mr_td_ctx.server_socket = server_socket;
pthread_create(&serve_mr_td, NULL, *serve_mr_td_fn, (void *) &serve_mr_td_ctx);
capulet_net_udp_serve_mr(&sdr_context->mr_mgr, server_socket);
} else {
capulet_net_udp_query_mr(server_socket, server_infos, "in", &sdr_context->in_remote);
if(!result || (strcmp(sdr_context->in_remote.name, "in") != 0)) return -1;
capulet_net_udp_query_mr(server_socket, server_infos, "out", &sdr_context->out_remote);
if(!result || (strcmp(sdr_context->out_remote.name, "out") != 0)) return -1;
}
capulet_net_udp_query_mr(server_socket, server_infos, "in", &sdr_context->in_remote);
if(!result || (strcmp(sdr_context->in_remote.name, "in") != 0)) return -1;
capulet_net_udp_query_mr(server_socket, server_infos, "out", &sdr_context->out_remote);
if(!result || (strcmp(sdr_context->out_remote.name, "out") != 0)) return -1;
freeaddrinfo(server_infos);
return 0;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment