Commit 413a1f89 authored by Titouan Soulard's avatar Titouan Soulard

libtrx: complete support

- Support Read/Write
- Returns correct values to higher-level interface
- Warning: still untested
parent 77715527
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include <stdio.h> #include <stdio.h>
#include "rdma_ib.h" #include "rdma_ib.h"
#include "rdma_mr_mgr.h"
#include "trx_driver.h" #include "trx_driver.h"
#include "net_udp.h" #include "net_udp.h"
...@@ -23,7 +24,7 @@ struct SDRContext { ...@@ -23,7 +24,7 @@ struct SDRContext {
struct CapuletRdmaIbContext ib_ctx; struct CapuletRdmaIbContext ib_ctx;
struct CapuletNetUdpContext *udp_ctx; struct CapuletNetUdpContext *udp_ctx;
struct SDRMemoryRegion *in; struct CapuletNetUdpMrInfoPacket in_remote;
struct SDRMemoryRegion *out; struct CapuletNetUdpMrInfoPacket out_remote;
}; };
#include "trx_rdma.h" #include "trx_rdma.h"
static int trx_rdma_start(TRXState *s, const TRXDriverParams *p) { static int trx_rdma_start(TRXState *s, const TRXDriverParams *p) {
struct SDRContext *sdr_context = s->opaque; struct SDRContext *sdr_context;
struct CapuletRdmaMrMgrElement *in_mr_el;
struct CapuletRdmaMrMgrElement *out_mr_el;
struct addrinfo server_hints;
struct addrinfo *server_infos;
int mr_size;
int server_socket;
bool result;
sdr_context = s->opaque;
mr_size = sizeof(struct SDRMemoryRegion);
// Find a server address to connect to
// XXX: server address is hardcoded
memset(&server_hints, 0, sizeof(struct addrinfo));
server_hints.ai_family = AF_INET;
server_hints.ai_socktype = SOCK_DGRAM;
if(getaddrinfo("192.168.16.20", "7362", &server_hints, &server_infos) != 0) return -1;
// Get IBV context for device
// XXX: device name is hardcoded
result = capulet_rdma_ib_initialize_device(&sdr_context->ib_ctx, "rxe0");
if(!result) return -1;
// Register two memory regions: one is used for client write (in), the other for client read (out)
// MRs with the same name are registered on the server side, so the operations will involve 4 MRs in total
in_mr_el = capulet_rdma_mr_mgr_register(&sdr_context->ib_ctx, "in", mr_size, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
out_mr_el = capulet_rdma_mr_mgr_register(&sdr_context->ib_ctx, "out", mr_size, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ);
if(!in_mr_el || !out_mr_el) return -1;
// Setup three queues: Recv, Send and Completion
// The Recv queue must be filled with Recv requests to allow for Completion polling
// Access control is already handled by MRs, so full remote access is given (R/W)
result = capulet_rdma_ib_initialize_qp(&sdr_context->ib_ctx, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE);
if(!result) return -1;
result = capulet_rdma_ib_post_recv(&sdr_context->ib_ctx, in_mr_el->mr, mr_size);
if(!result) return -1;
result = capulet_rdma_ib_post_recv(&sdr_context->ib_ctx, out_mr_el->mr, mr_size);
if(!result) return -1;
// Create UDP context and exchange informations with peer
// Sends three packets: one QP informations and two MR queries
sdr_context->udp_ctx = capulet_net_udp_initialize();
if(!sdr_context->udp_ctx) return -1;
result = capulet_rdma_ib_fill_base_udp(&sdr_context->ib_ctx, sdr_context->udp_ctx->local);
if(!result) return -1;
server_socket = capulet_net_udp_send_qp(sdr_context->udp_ctx, server_infos);
if(!server_socket) return -1;
result = capulet_rdma_ib_set_peer_from_udp(&sdr_context->ib_ctx, sdr_context->udp_ctx);
if(!result) return -1;
capulet_net_udp_query_mr(server_socket, server_infos, "in", &sdr_context->in_remote);
if(!result || (strcmp(sdr_context->in_remote.name, "in") != 0)) return -1;
capulet_net_udp_query_mr(server_socket, server_infos, "out", &sdr_context->out_remote);
if(!result || (strcmp(sdr_context->out_remote.name, "out") != 0)) return -1;
freeaddrinfo(server_infos);
return 0;
} }
static void trx_rdma_write(TRXState *s, trx_timestamp_t timestamp, const void **samples, int count, int flags, int rf_port_index) { static void trx_rdma_write(TRXState *s, trx_timestamp_t timestamp, const void **samples, int count, int flags, int rf_port_index) {
struct SDRContext *sdr_context;
struct SDRMemoryRegion *in;
struct CapuletRdmaMrMgrElement *in_mr_el;
int byte_count;
sdr_context = s->opaque;
byte_count = count * sizeof(float);
in_mr_el = capulet_rdma_mr_mgr_find("in");
in = (struct SDRMemoryRegion *) in_mr_el->mr->addr;
// First copy data from user buffer to local RDMA buffer
in->meta.mr_length = count;
memcpy((void *) &in->iq, *samples, byte_count);
// Then Send local RDMA buffer to server
capulet_rdma_ib_send_write(&sdr_context->ib_ctx, &sdr_context->in_remote, in_mr_el->mr, byte_count + sizeof(struct SDRMetadata));
} }
static int trx_rdma_read(TRXState *s, trx_timestamp_t *ptimestamp, void **psamples, int count, int rf_port) { static int trx_rdma_read(TRXState *s, trx_timestamp_t *ptimestamp, void **psamples, int count, int rf_port) {
struct SDRContext *sdr_context;
struct SDRMemoryRegion *out;
struct CapuletRdmaMrMgrElement *out_mr_el;
struct ibv_wc poll_wc;
int byte_count;
int result;
sdr_context = s->opaque;
byte_count = count * sizeof(float);
out_mr_el = capulet_rdma_mr_mgr_find("out");
out = (struct SDRMemoryRegion *) out_mr_el->mr->addr;
// Send Read request and poll for completion
capulet_rdma_ib_send_read(&sdr_context->ib_ctx, &sdr_context->out_remote, out_mr_el->mr, byte_count + sizeof(struct SDRMetadata));
do {
result = ibv_poll_cq(sdr_context->ib_ctx.cq, 1, &poll_wc);
} while(result == 0);
// Copy data from RDMA buffer to local buffer
memcpy(*psamples, &out->iq, byte_count);
return out->meta.mr_length;
} }
static int trx_rdma_get_sample_rate(TRXState *s, TRXFraction *psample_rate, int *psample_rate_num, int sample_rate_min) { static int trx_rdma_get_sample_rate(TRXState *s, TRXFraction *psample_rate, int *psample_rate_num, int sample_rate_min) {
...@@ -19,12 +118,26 @@ static int trx_rdma_get_sample_rate(TRXState *s, TRXFraction *psample_rate, int ...@@ -19,12 +118,26 @@ static int trx_rdma_get_sample_rate(TRXState *s, TRXFraction *psample_rate, int
} }
static void trx_rdma_end(TRXState *s) { static void trx_rdma_end(TRXState *s) {
struct SDRContext *sdr_context;
struct CapuletRdmaMrMgrElement *in_mr_el;
struct CapuletRdmaMrMgrElement *out_mr_el;
sdr_context = s->opaque;
in_mr_el = capulet_rdma_mr_mgr_find("in");
out_mr_el = capulet_rdma_mr_mgr_find("out");
if(in_mr_el) capulet_rdma_mr_mgr_free(in_mr_el);
if(out_mr_el) capulet_rdma_mr_mgr_free(out_mr_el);
capulet_net_udp_free(sdr_context->udp_ctx);
capulet_rdma_ib_free(&sdr_context->ib_ctx);
} }
int trx_driver_init(TRXState *s) { int trx_driver_init(TRXState *s) {
struct SDRContext *sdr_context; struct SDRContext *sdr_context;
srand48(getpid() * time(NULL));
sdr_context = calloc(1, sizeof(struct SDRContext)); sdr_context = calloc(1, sizeof(struct SDRContext));
if(s->trx_api_version != TRX_API_VERSION) { if(s->trx_api_version != TRX_API_VERSION) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment