Commit 566050e1 authored by Ilya Dryomov's avatar Ilya Dryomov

libceph: separate msgr1 protocol implementation

In preparation for msgr2, define internal messenger <-> protocol
interface (as opposed to external messenger <-> client interface, which
is struct ceph_connection_operations) consisting of try_read(),
try_write(), revoke(), revoke_incoming(), opened(), reset_session() and
reset_protocol() ops.  The semantics are exactly the same as they are
now.
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent 6503e0b6
......@@ -382,6 +382,14 @@ int ceph_con_in_msg_alloc(struct ceph_connection *con,
struct ceph_msg_header *hdr, int *skip);
void ceph_con_get_out_msg(struct ceph_connection *con);
int ceph_con_v1_try_read(struct ceph_connection *con);
int ceph_con_v1_try_write(struct ceph_connection *con);
void ceph_con_v1_revoke(struct ceph_connection *con);
void ceph_con_v1_revoke_incoming(struct ceph_connection *con);
bool ceph_con_v1_opened(struct ceph_connection *con);
void ceph_con_v1_reset_session(struct ceph_connection *con);
void ceph_con_v1_reset_protocol(struct ceph_connection *con);
extern const char *ceph_pr_addr(const struct ceph_entity_addr *addr);
......
......@@ -593,6 +593,11 @@ int ceph_con_close_socket(struct ceph_connection *con)
return rc;
}
void ceph_con_v1_reset_protocol(struct ceph_connection *con)
{
con->out_skip = 0;
}
static void ceph_con_reset_protocol(struct ceph_connection *con)
{
dout("%s con %p\n", __func__, con);
......@@ -609,7 +614,7 @@ static void ceph_con_reset_protocol(struct ceph_connection *con)
con->out_msg = NULL;
}
con->out_skip = 0;
ceph_con_v1_reset_protocol(con);
}
/*
......@@ -631,6 +636,12 @@ static void ceph_msg_remove_list(struct list_head *head)
}
}
void ceph_con_v1_reset_session(struct ceph_connection *con)
{
con->connect_seq = 0;
con->peer_global_seq = 0;
}
void ceph_con_reset_session(struct ceph_connection *con)
{
dout("%s con %p\n", __func__, con);
......@@ -643,8 +654,7 @@ void ceph_con_reset_session(struct ceph_connection *con)
con->in_seq = 0;
con->in_seq_acked = 0;
con->connect_seq = 0;
con->peer_global_seq = 0;
ceph_con_v1_reset_session(con);
}
/*
......@@ -692,12 +702,17 @@ void ceph_con_open(struct ceph_connection *con,
}
EXPORT_SYMBOL(ceph_con_open);
bool ceph_con_v1_opened(struct ceph_connection *con)
{
return con->connect_seq;
}
/*
* return true if this connection ever successfully opened
*/
bool ceph_con_opened(struct ceph_connection *con)
{
return con->connect_seq > 0;
return ceph_con_v1_opened(con);
}
/*
......@@ -2552,7 +2567,7 @@ static int read_keepalive_ack(struct ceph_connection *con)
* Write something to the socket. Called in a worker thread when the
* socket appears to be writeable and we have something ready to send.
*/
static int try_write(struct ceph_connection *con)
int ceph_con_v1_try_write(struct ceph_connection *con)
{
int ret = 1;
......@@ -2649,7 +2664,7 @@ static int try_write(struct ceph_connection *con)
/*
* Read what we can from the socket.
*/
static int try_read(struct ceph_connection *con)
int ceph_con_v1_try_read(struct ceph_connection *con)
{
int ret = -1;
......@@ -2930,7 +2945,7 @@ static void ceph_con_workfn(struct work_struct *work)
BUG_ON(con->sock);
}
ret = try_read(con);
ret = ceph_con_v1_try_read(con);
if (ret < 0) {
if (ret == -EAGAIN)
continue;
......@@ -2940,7 +2955,7 @@ static void ceph_con_workfn(struct work_struct *work)
break;
}
ret = try_write(con);
ret = ceph_con_v1_try_write(con);
if (ret < 0) {
if (ret == -EAGAIN)
continue;
......@@ -3116,6 +3131,29 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
}
EXPORT_SYMBOL(ceph_con_send);
void ceph_con_v1_revoke(struct ceph_connection *con)
{
struct ceph_msg *msg = con->out_msg;
WARN_ON(con->out_skip);
/* footer */
if (con->out_msg_done) {
con->out_skip += con_out_kvec_skip(con);
} else {
WARN_ON(!msg->data_length);
con->out_skip += sizeof_footer(con);
}
/* data, middle, front */
if (msg->data_length)
con->out_skip += msg->cursor.total_resid;
if (msg->middle)
con->out_skip += con_out_kvec_skip(con);
con->out_skip += con_out_kvec_skip(con);
dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con,
con->out_kvec_bytes, con->out_skip);
}
/*
* Revoke a message that was previously queued for send
*/
......@@ -3129,39 +3167,50 @@ void ceph_msg_revoke(struct ceph_msg *msg)
}
mutex_lock(&con->mutex);
if (!list_empty(&msg->list_head)) {
dout("%s %p msg %p - was on queue\n", __func__, con, msg);
list_del_init(&msg->list_head);
msg->hdr.seq = 0;
ceph_msg_put(msg);
if (list_empty(&msg->list_head)) {
WARN_ON(con->out_msg == msg);
dout("%s con %p msg %p not linked\n", __func__, con, msg);
mutex_unlock(&con->mutex);
return;
}
if (con->out_msg == msg) {
BUG_ON(con->out_skip);
/* footer */
if (con->out_msg_done) {
con->out_skip += con_out_kvec_skip(con);
} else {
BUG_ON(!msg->data_length);
con->out_skip += sizeof_footer(con);
}
/* data, middle, front */
if (msg->data_length)
con->out_skip += msg->cursor.total_resid;
if (msg->middle)
con->out_skip += con_out_kvec_skip(con);
con->out_skip += con_out_kvec_skip(con);
dout("%s %p msg %p - was sending, will write %d skip %d\n",
__func__, con, msg, con->out_kvec_bytes, con->out_skip);
msg->hdr.seq = 0;
dout("%s con %p msg %p was linked\n", __func__, con, msg);
msg->hdr.seq = 0;
ceph_msg_remove(msg);
if (con->out_msg == msg) {
WARN_ON(con->state != CEPH_CON_S_OPEN);
dout("%s con %p msg %p was sending\n", __func__, con, msg);
ceph_con_v1_revoke(con);
ceph_msg_put(con->out_msg);
con->out_msg = NULL;
ceph_msg_put(msg);
} else {
dout("%s con %p msg %p not current, out_msg %p\n", __func__,
con, msg, con->out_msg);
}
mutex_unlock(&con->mutex);
}
void ceph_con_v1_revoke_incoming(struct ceph_connection *con)
{
unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
/* skip rest of message */
con->in_base_pos = con->in_base_pos -
sizeof(struct ceph_msg_header) -
front_len -
middle_len -
data_len -
sizeof(struct ceph_msg_footer);
con->in_tag = CEPH_MSGR_TAG_READY;
con->in_seq++;
dout("%s con %p in_base_pos %d\n", __func__, con, con->in_base_pos);
}
/*
* Revoke a message that we may be reading data into
*/
......@@ -3176,25 +3225,14 @@ void ceph_msg_revoke_incoming(struct ceph_msg *msg)
mutex_lock(&con->mutex);
if (con->in_msg == msg) {
unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
/* skip rest of message */
dout("%s %p msg %p revoked\n", __func__, con, msg);
con->in_base_pos = con->in_base_pos -
sizeof(struct ceph_msg_header) -
front_len -
middle_len -
data_len -
sizeof(struct ceph_msg_footer);
WARN_ON(con->state != CEPH_CON_S_OPEN);
dout("%s con %p msg %p was recving\n", __func__, con, msg);
ceph_con_v1_revoke_incoming(con);
ceph_msg_put(con->in_msg);
con->in_msg = NULL;
con->in_tag = CEPH_MSGR_TAG_READY;
con->in_seq++;
} else {
dout("%s %p in_msg %p msg %p no-op\n",
__func__, con, con->in_msg, msg);
dout("%s con %p msg %p not current, in_msg %p\n", __func__,
con, msg, con->in_msg);
}
mutex_unlock(&con->mutex);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment