Commit d649e1bb authored by Long Li's avatar Long Li Committed by Steve French

CIFS: SMBD: Implement function to send data via RDMA send

The transport doesn't maintain send buffers or send queue for transferring
payload via RDMA send. There is no data copy in the transport on send.
Signed-off-by: default avatarLong Li <longli@microsoft.com>
Signed-off-by: default avatarSteve French <smfrench@gmail.com>
Reviewed-by: default avatarPavel Shilovsky <pshilov@microsoft.com>
Reviewed-by: default avatarRonnie Sahlberg <lsahlber@redhat.com>
parent 2fef137a
......@@ -41,6 +41,12 @@ static int smbd_post_recv(
struct smbd_response *response);
static int smbd_post_send_empty(struct smbd_connection *info);
static int smbd_post_send_data(
struct smbd_connection *info,
struct kvec *iov, int n_vec, int remaining_data_length);
static int smbd_post_send_page(struct smbd_connection *info,
struct page *page, unsigned long offset,
size_t size, int remaining_data_length);
/* SMBD version number */
#define SMBD_V1 0x0100
......@@ -177,6 +183,10 @@ static void smbd_destroy_rdma_work(struct work_struct *work)
log_rdma_event(INFO, "cancelling send immediate work\n");
cancel_delayed_work_sync(&info->send_immediate_work);
log_rdma_event(INFO, "wait for all send to finish\n");
wait_event(info->wait_smbd_send_pending,
info->smbd_send_pending == 0);
log_rdma_event(INFO, "wait for all recv to finish\n");
wake_up_interruptible(&info->wait_reassembly_queue);
wait_event(info->wait_smbd_recv_pending,
......@@ -1077,6 +1087,24 @@ static int smbd_post_send_sgl(struct smbd_connection *info,
return rc;
}
/*
* Send a page
* page: the page to send
* offset: offset in the page to send
* size: length in the page to send
* remaining_data_length: remaining data to send in this payload
*/
static int smbd_post_send_page(struct smbd_connection *info, struct page *page,
unsigned long offset, size_t size, int remaining_data_length)
{
struct scatterlist sgl;
sg_init_table(&sgl, 1);
sg_set_page(&sgl, page, size, offset);
return smbd_post_send_sgl(info, &sgl, size, remaining_data_length);
}
/*
* Send an empty message
* Empty message is used to extend credits to peer to for keep live
......@@ -1088,6 +1116,35 @@ static int smbd_post_send_empty(struct smbd_connection *info)
return smbd_post_send_sgl(info, NULL, 0, 0);
}
/*
* Send a data buffer
* iov: the iov array describing the data buffers
* n_vec: number of iov array
* remaining_data_length: remaining data to send following this packet
* in segmented SMBD packet
*/
static int smbd_post_send_data(
struct smbd_connection *info, struct kvec *iov, int n_vec,
int remaining_data_length)
{
int i;
u32 data_length = 0;
struct scatterlist sgl[SMBDIRECT_MAX_SGE];
if (n_vec > SMBDIRECT_MAX_SGE) {
cifs_dbg(VFS, "Can't fit data to SGL, n_vec=%d\n", n_vec);
return -ENOMEM;
}
sg_init_table(sgl, n_vec);
for (i = 0; i < n_vec; i++) {
data_length += iov[i].iov_len;
sg_set_buf(&sgl[i], iov[i].iov_base, iov[i].iov_len);
}
return smbd_post_send_sgl(info, sgl, data_length, remaining_data_length);
}
/*
* Post a receive request to the transport
* The remote peer can only send data when a receive request is posted
......@@ -1652,6 +1709,9 @@ struct smbd_connection *_smbd_get_connection(
queue_delayed_work(info->workqueue, &info->idle_timer_work,
info->keep_alive_interval*HZ);
init_waitqueue_head(&info->wait_smbd_send_pending);
info->smbd_send_pending = 0;
init_waitqueue_head(&info->wait_smbd_recv_pending);
info->smbd_recv_pending = 0;
......@@ -1943,3 +2003,189 @@ int smbd_recv(struct smbd_connection *info, struct msghdr *msg)
msg->msg_iter.count = 0;
return rc;
}
/*
* Send data to transport
* Each rqst is transported as a SMBDirect payload
* rqst: the data to write
* return value: 0 if successfully write, otherwise error code
*/
int smbd_send(struct smbd_connection *info, struct smb_rqst *rqst)
{
struct kvec vec;
int nvecs;
int size;
int buflen = 0, remaining_data_length;
int start, i, j;
int max_iov_size =
info->max_send_size - sizeof(struct smbd_data_transfer);
struct kvec iov[SMBDIRECT_MAX_SGE];
int rc;
info->smbd_send_pending++;
if (info->transport_status != SMBD_CONNECTED) {
rc = -ENODEV;
goto done;
}
/*
* This usually means a configuration error
* We use RDMA read/write for packet size > rdma_readwrite_threshold
* as long as it's properly configured we should never get into this
* situation
*/
if (rqst->rq_nvec + rqst->rq_npages > SMBDIRECT_MAX_SGE) {
log_write(ERR, "maximum send segment %x exceeding %x\n",
rqst->rq_nvec + rqst->rq_npages, SMBDIRECT_MAX_SGE);
rc = -EINVAL;
goto done;
}
/*
* Remove the RFC1002 length defined in MS-SMB2 section 2.1
* It is used only for TCP transport
* In future we may want to add a transport layer under protocol
* layer so this will only be issued to TCP transport
*/
iov[0].iov_base = (char *)rqst->rq_iov[0].iov_base + 4;
iov[0].iov_len = rqst->rq_iov[0].iov_len - 4;
buflen += iov[0].iov_len;
/* total up iov array first */
for (i = 1; i < rqst->rq_nvec; i++) {
iov[i].iov_base = rqst->rq_iov[i].iov_base;
iov[i].iov_len = rqst->rq_iov[i].iov_len;
buflen += iov[i].iov_len;
}
/* add in the page array if there is one */
if (rqst->rq_npages) {
buflen += rqst->rq_pagesz * (rqst->rq_npages - 1);
buflen += rqst->rq_tailsz;
}
if (buflen + sizeof(struct smbd_data_transfer) >
info->max_fragmented_send_size) {
log_write(ERR, "payload size %d > max size %d\n",
buflen, info->max_fragmented_send_size);
rc = -EINVAL;
goto done;
}
remaining_data_length = buflen;
log_write(INFO, "rqst->rq_nvec=%d rqst->rq_npages=%d rq_pagesz=%d "
"rq_tailsz=%d buflen=%d\n",
rqst->rq_nvec, rqst->rq_npages, rqst->rq_pagesz,
rqst->rq_tailsz, buflen);
start = i = iov[0].iov_len ? 0 : 1;
buflen = 0;
while (true) {
buflen += iov[i].iov_len;
if (buflen > max_iov_size) {
if (i > start) {
remaining_data_length -=
(buflen-iov[i].iov_len);
log_write(INFO, "sending iov[] from start=%d "
"i=%d nvecs=%d "
"remaining_data_length=%d\n",
start, i, i-start,
remaining_data_length);
rc = smbd_post_send_data(
info, &iov[start], i-start,
remaining_data_length);
if (rc)
goto done;
} else {
/* iov[start] is too big, break it */
nvecs = (buflen+max_iov_size-1)/max_iov_size;
log_write(INFO, "iov[%d] iov_base=%p buflen=%d"
" break to %d vectors\n",
start, iov[start].iov_base,
buflen, nvecs);
for (j = 0; j < nvecs; j++) {
vec.iov_base =
(char *)iov[start].iov_base +
j*max_iov_size;
vec.iov_len = max_iov_size;
if (j == nvecs-1)
vec.iov_len =
buflen -
max_iov_size*(nvecs-1);
remaining_data_length -= vec.iov_len;
log_write(INFO,
"sending vec j=%d iov_base=%p"
" iov_len=%zu "
"remaining_data_length=%d\n",
j, vec.iov_base, vec.iov_len,
remaining_data_length);
rc = smbd_post_send_data(
info, &vec, 1,
remaining_data_length);
if (rc)
goto done;
}
i++;
}
start = i;
buflen = 0;
} else {
i++;
if (i == rqst->rq_nvec) {
/* send out all remaining vecs */
remaining_data_length -= buflen;
log_write(INFO,
"sending iov[] from start=%d i=%d "
"nvecs=%d remaining_data_length=%d\n",
start, i, i-start,
remaining_data_length);
rc = smbd_post_send_data(info, &iov[start],
i-start, remaining_data_length);
if (rc)
goto done;
break;
}
}
log_write(INFO, "looping i=%d buflen=%d\n", i, buflen);
}
/* now sending pages if there are any */
for (i = 0; i < rqst->rq_npages; i++) {
buflen = (i == rqst->rq_npages-1) ?
rqst->rq_tailsz : rqst->rq_pagesz;
nvecs = (buflen + max_iov_size - 1) / max_iov_size;
log_write(INFO, "sending pages buflen=%d nvecs=%d\n",
buflen, nvecs);
for (j = 0; j < nvecs; j++) {
size = max_iov_size;
if (j == nvecs-1)
size = buflen - j*max_iov_size;
remaining_data_length -= size;
log_write(INFO, "sending pages i=%d offset=%d size=%d"
" remaining_data_length=%d\n",
i, j*max_iov_size, size, remaining_data_length);
rc = smbd_post_send_page(
info, rqst->rq_pages[i], j*max_iov_size,
size, remaining_data_length);
if (rc)
goto done;
}
}
done:
/*
* As an optimization, we don't wait for individual I/O to finish
* before sending the next one.
* Send them all and wait for pending send count to get to 0
* that means all the I/Os have been out and we are good to return
*/
wait_event(info->wait_send_payload_pending,
atomic_read(&info->send_payload_pending) == 0);
info->smbd_send_pending--;
wake_up(&info->wait_smbd_send_pending);
return rc;
}
......@@ -92,6 +92,9 @@ struct smbd_connection {
/* Activity accoutning */
/* Pending reqeusts issued from upper layer */
int smbd_send_pending;
wait_queue_head_t wait_smbd_send_pending;
int smbd_recv_pending;
wait_queue_head_t wait_smbd_recv_pending;
......@@ -257,6 +260,7 @@ void smbd_destroy(struct smbd_connection *info);
/* Interface for carrying upper layer I/O through send/recv */
int smbd_recv(struct smbd_connection *info, struct msghdr *msg);
int smbd_send(struct smbd_connection *info, struct smb_rqst *rqst);
#else
#define cifs_rdma_enabled(server) 0
......@@ -266,6 +270,7 @@ static inline void *smbd_get_connection(
static inline int smbd_reconnect(struct TCP_Server_Info *server) {return -1; }
static inline void smbd_destroy(struct smbd_connection *info) {}
static inline int smbd_recv(struct smbd_connection *info, struct msghdr *msg) {return -1; }
static inline int smbd_send(struct smbd_connection *info, struct smb_rqst *rqst) {return -1; }
#endif
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment