Commit e013f74b authored by Linus Torvalds

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull Ceph update from Sage Weil:
 "There are a few fixes for snapshot behavior with CephFS and support
  for the new keepalive protocol from Zheng, a libceph fix that affects
  both RBD and CephFS, a few bug fixes and cleanups for RBD from Ilya,
  and several small fixes and cleanups from Jianpeng and others"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  ceph: improve readahead for file holes
  ceph: get inode size for each append write
  libceph: check data_len in ->alloc_msg()
  libceph: use keepalive2 to verify the mon session is alive
  rbd: plug rbd_dev->header.object_prefix memory leak
  rbd: fix double free on rbd_dev->header_name
  libceph: set 'exists' flag for newly up osd
  ceph: cleanup use of ceph_msg_get
  ceph: no need to get parent inode in ceph_open
  ceph: remove the useless judgement
  ceph: remove redundant test of head->safe and silence static analysis warnings
  ceph: fix queuing inode to mdsdir's snaprealm
  libceph: rename con_work() to ceph_con_workfn()
  libceph: Avoid holding the zero page on ceph_msgr_slab_init errors
  libceph: remove the unused macro AES_KEY_SIZE
  ceph: invalidate dirty pages after forced umount
  ceph: EIO all operations after forced umount
parents 01cab554 43838685
...@@ -4673,7 +4673,10 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev) ...@@ -4673,7 +4673,10 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
} }
ret = rbd_dev_v2_snap_context(rbd_dev); ret = rbd_dev_v2_snap_context(rbd_dev);
dout("rbd_dev_v2_snap_context returned %d\n", ret); if (ret && first_time) {
kfree(rbd_dev->header.object_prefix);
rbd_dev->header.object_prefix = NULL;
}
return ret; return ret;
} }
...@@ -5154,7 +5157,6 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev) ...@@ -5154,7 +5157,6 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
out_err: out_err:
if (parent) { if (parent) {
rbd_dev_unparent(rbd_dev); rbd_dev_unparent(rbd_dev);
kfree(rbd_dev->header_name);
rbd_dev_destroy(parent); rbd_dev_destroy(parent);
} else { } else {
rbd_put_client(rbdc); rbd_put_client(rbdc);
......
...@@ -276,7 +276,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) ...@@ -276,7 +276,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
for (i = 0; i < num_pages; i++) { for (i = 0; i < num_pages; i++) {
struct page *page = osd_data->pages[i]; struct page *page = osd_data->pages[i];
if (rc < 0) if (rc < 0 && rc != ENOENT)
goto unlock; goto unlock;
if (bytes < (int)PAGE_CACHE_SIZE) { if (bytes < (int)PAGE_CACHE_SIZE) {
/* zero (remainder of) page */ /* zero (remainder of) page */
...@@ -717,8 +717,10 @@ static int ceph_writepages_start(struct address_space *mapping, ...@@ -717,8 +717,10 @@ static int ceph_writepages_start(struct address_space *mapping,
wbc->sync_mode == WB_SYNC_NONE ? "NONE" : wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
(wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD")); (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
if (fsc->mount_state == CEPH_MOUNT_SHUTDOWN) { if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
pr_warn("writepage_start %p on forced umount\n", inode); pr_warn("writepage_start %p on forced umount\n", inode);
truncate_pagecache(inode, 0);
mapping_set_error(mapping, -EIO);
return -EIO; /* we're in a forced umount, don't write! */ return -EIO; /* we're in a forced umount, don't write! */
} }
if (fsc->mount_options->wsize && fsc->mount_options->wsize < wsize) if (fsc->mount_options->wsize && fsc->mount_options->wsize < wsize)
......
...@@ -2413,6 +2413,14 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, ...@@ -2413,6 +2413,14 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
goto out_unlock; goto out_unlock;
} }
if (!__ceph_is_any_caps(ci) &&
ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
dout("get_cap_refs %p forced umount\n", inode);
*err = -EIO;
ret = 1;
goto out_unlock;
}
dout("get_cap_refs %p have %s needed %s\n", inode, dout("get_cap_refs %p have %s needed %s\n", inode,
ceph_cap_string(have), ceph_cap_string(need)); ceph_cap_string(have), ceph_cap_string(need));
} }
......
...@@ -136,7 +136,6 @@ int ceph_open(struct inode *inode, struct file *file) ...@@ -136,7 +136,6 @@ int ceph_open(struct inode *inode, struct file *file)
struct ceph_mds_client *mdsc = fsc->mdsc; struct ceph_mds_client *mdsc = fsc->mdsc;
struct ceph_mds_request *req; struct ceph_mds_request *req;
struct ceph_file_info *cf = file->private_data; struct ceph_file_info *cf = file->private_data;
struct inode *parent_inode = NULL;
int err; int err;
int flags, fmode, wanted; int flags, fmode, wanted;
...@@ -210,10 +209,7 @@ int ceph_open(struct inode *inode, struct file *file) ...@@ -210,10 +209,7 @@ int ceph_open(struct inode *inode, struct file *file)
ihold(inode); ihold(inode);
req->r_num_caps = 1; req->r_num_caps = 1;
if (flags & O_CREAT) err = ceph_mdsc_do_request(mdsc, NULL, req);
parent_inode = ceph_get_dentry_parent_inode(file->f_path.dentry);
err = ceph_mdsc_do_request(mdsc, parent_inode, req);
iput(parent_inode);
if (!err) if (!err)
err = ceph_init_file(inode, file, req->r_fmode); err = ceph_init_file(inode, file, req->r_fmode);
ceph_mdsc_put_request(req); ceph_mdsc_put_request(req);
...@@ -279,7 +275,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, ...@@ -279,7 +275,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
if (err) if (err)
goto out_req; goto out_req;
if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry) if ((flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
err = ceph_handle_notrace_create(dir, dentry); err = ceph_handle_notrace_create(dir, dentry);
if (d_unhashed(dentry)) { if (d_unhashed(dentry)) {
...@@ -956,6 +952,12 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) ...@@ -956,6 +952,12 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
/* We can write back this queue in page reclaim */ /* We can write back this queue in page reclaim */
current->backing_dev_info = inode_to_bdi(inode); current->backing_dev_info = inode_to_bdi(inode);
if (iocb->ki_flags & IOCB_APPEND) {
err = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
if (err < 0)
goto out;
}
err = generic_write_checks(iocb, from); err = generic_write_checks(iocb, from);
if (err <= 0) if (err <= 0)
goto out; goto out;
......
...@@ -2107,7 +2107,6 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc, ...@@ -2107,7 +2107,6 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
msg = create_request_message(mdsc, req, mds, drop_cap_releases); msg = create_request_message(mdsc, req, mds, drop_cap_releases);
if (IS_ERR(msg)) { if (IS_ERR(msg)) {
req->r_err = PTR_ERR(msg); req->r_err = PTR_ERR(msg);
complete_request(mdsc, req);
return PTR_ERR(msg); return PTR_ERR(msg);
} }
req->r_request = msg; req->r_request = msg;
...@@ -2135,7 +2134,7 @@ static int __do_request(struct ceph_mds_client *mdsc, ...@@ -2135,7 +2134,7 @@ static int __do_request(struct ceph_mds_client *mdsc,
{ {
struct ceph_mds_session *session = NULL; struct ceph_mds_session *session = NULL;
int mds = -1; int mds = -1;
int err = -EAGAIN; int err = 0;
if (req->r_err || req->r_got_result) { if (req->r_err || req->r_got_result) {
if (req->r_aborted) if (req->r_aborted)
...@@ -2149,6 +2148,11 @@ static int __do_request(struct ceph_mds_client *mdsc, ...@@ -2149,6 +2148,11 @@ static int __do_request(struct ceph_mds_client *mdsc,
err = -EIO; err = -EIO;
goto finish; goto finish;
} }
if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
dout("do_request forced umount\n");
err = -EIO;
goto finish;
}
put_request_session(req); put_request_session(req);
...@@ -2196,13 +2200,15 @@ static int __do_request(struct ceph_mds_client *mdsc, ...@@ -2196,13 +2200,15 @@ static int __do_request(struct ceph_mds_client *mdsc,
out_session: out_session:
ceph_put_mds_session(session); ceph_put_mds_session(session);
finish:
if (err) {
dout("__do_request early error %d\n", err);
req->r_err = err;
complete_request(mdsc, req);
__unregister_request(mdsc, req);
}
out: out:
return err; return err;
finish:
req->r_err = err;
complete_request(mdsc, req);
goto out;
} }
/* /*
...@@ -2289,8 +2295,6 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, ...@@ -2289,8 +2295,6 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
if (req->r_err) { if (req->r_err) {
err = req->r_err; err = req->r_err;
__unregister_request(mdsc, req);
dout("do_request early error %d\n", err);
goto out; goto out;
} }
...@@ -2411,7 +2415,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) ...@@ -2411,7 +2415,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
mutex_unlock(&mdsc->mutex); mutex_unlock(&mdsc->mutex);
goto out; goto out;
} }
if (req->r_got_safe && !head->safe) { if (req->r_got_safe) {
pr_warn("got unsafe after safe on %llu from mds%d\n", pr_warn("got unsafe after safe on %llu from mds%d\n",
tid, mds); tid, mds);
mutex_unlock(&mdsc->mutex); mutex_unlock(&mdsc->mutex);
...@@ -2520,8 +2524,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) ...@@ -2520,8 +2524,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
if (err) { if (err) {
req->r_err = err; req->r_err = err;
} else { } else {
req->r_reply = msg; req->r_reply = ceph_msg_get(msg);
ceph_msg_get(msg);
req->r_got_result = true; req->r_got_result = true;
} }
} else { } else {
...@@ -3555,7 +3558,7 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc) ...@@ -3555,7 +3558,7 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
{ {
u64 want_tid, want_flush, want_snap; u64 want_tid, want_flush, want_snap;
if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN) if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
return; return;
dout("sync\n"); dout("sync\n");
...@@ -3584,7 +3587,7 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc) ...@@ -3584,7 +3587,7 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
*/ */
static bool done_closing_sessions(struct ceph_mds_client *mdsc) static bool done_closing_sessions(struct ceph_mds_client *mdsc)
{ {
if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN) if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
return true; return true;
return atomic_read(&mdsc->num_sessions) == 0; return atomic_read(&mdsc->num_sessions) == 0;
} }
...@@ -3643,6 +3646,34 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) ...@@ -3643,6 +3646,34 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
dout("stopped\n"); dout("stopped\n");
} }
/*
 * Forcibly tear down every MDS session of @mdsc.
 *
 * Walks session slots 0 .. max_sessions - 1. For each slot that has a
 * session, the session is closed and, if it ended up in the CLOSING
 * state, its requests are cleaned up and its caps removed; requests for
 * that MDS are then kicked. Finally, everything queued on
 * mdsc->waiting_for_map is woken.
 *
 * Invoked from ceph_umount_begin() right after mount_state has been set
 * to CEPH_MOUNT_SHUTDOWN.
 */
void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
{
struct ceph_mds_session *session;
int mds;
dout("force umount\n");
mutex_lock(&mdsc->mutex);
for (mds = 0; mds < mdsc->max_sessions; mds++) {
session = __ceph_lookup_mds_session(mdsc, mds);
if (!session)
continue;
/* mdsc->mutex is dropped before session->s_mutex is taken */
mutex_unlock(&mdsc->mutex);
mutex_lock(&session->s_mutex);
__close_session(mdsc, session);
/* only tear down requests and caps if the session is now CLOSING */
if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
cleanup_session_requests(mdsc, session);
remove_session_caps(session);
}
mutex_unlock(&session->s_mutex);
/* NOTE(review): presumably drops the ref taken by the lookup - confirm */
ceph_put_mds_session(session);
/* re-take mdsc->mutex: held for kick_requests() and the next iteration */
mutex_lock(&mdsc->mutex);
kick_requests(mdsc, mds);
}
__wake_requests(mdsc, &mdsc->waiting_for_map);
mutex_unlock(&mdsc->mutex);
}
static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
{ {
dout("stop\n"); dout("stop\n");
......
...@@ -366,6 +366,7 @@ extern int ceph_send_msg_mds(struct ceph_mds_client *mdsc, ...@@ -366,6 +366,7 @@ extern int ceph_send_msg_mds(struct ceph_mds_client *mdsc,
extern int ceph_mdsc_init(struct ceph_fs_client *fsc); extern int ceph_mdsc_init(struct ceph_fs_client *fsc);
extern void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc); extern void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc);
extern void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc);
extern void ceph_mdsc_destroy(struct ceph_fs_client *fsc); extern void ceph_mdsc_destroy(struct ceph_fs_client *fsc);
extern void ceph_mdsc_sync(struct ceph_mds_client *mdsc); extern void ceph_mdsc_sync(struct ceph_mds_client *mdsc);
......
...@@ -338,12 +338,6 @@ static int build_snap_context(struct ceph_snap_realm *realm) ...@@ -338,12 +338,6 @@ static int build_snap_context(struct ceph_snap_realm *realm)
return 0; return 0;
} }
if (num == 0 && realm->seq == ceph_empty_snapc->seq) {
ceph_get_snap_context(ceph_empty_snapc);
snapc = ceph_empty_snapc;
goto done;
}
/* alloc new snap context */ /* alloc new snap context */
err = -ENOMEM; err = -ENOMEM;
if (num > (SIZE_MAX - sizeof(*snapc)) / sizeof(u64)) if (num > (SIZE_MAX - sizeof(*snapc)) / sizeof(u64))
...@@ -381,7 +375,6 @@ static int build_snap_context(struct ceph_snap_realm *realm) ...@@ -381,7 +375,6 @@ static int build_snap_context(struct ceph_snap_realm *realm)
realm->ino, realm, snapc, snapc->seq, realm->ino, realm, snapc, snapc->seq,
(unsigned int) snapc->num_snaps); (unsigned int) snapc->num_snaps);
done:
ceph_put_snap_context(realm->cached_context); ceph_put_snap_context(realm->cached_context);
realm->cached_context = snapc; realm->cached_context = snapc;
return 0; return 0;
......
...@@ -708,6 +708,7 @@ static void ceph_umount_begin(struct super_block *sb) ...@@ -708,6 +708,7 @@ static void ceph_umount_begin(struct super_block *sb)
if (!fsc) if (!fsc)
return; return;
fsc->mount_state = CEPH_MOUNT_SHUTDOWN; fsc->mount_state = CEPH_MOUNT_SHUTDOWN;
ceph_mdsc_force_umount(fsc->mdsc);
return; return;
} }
......
...@@ -46,6 +46,7 @@ struct ceph_options { ...@@ -46,6 +46,7 @@ struct ceph_options {
unsigned long mount_timeout; /* jiffies */ unsigned long mount_timeout; /* jiffies */
unsigned long osd_idle_ttl; /* jiffies */ unsigned long osd_idle_ttl; /* jiffies */
unsigned long osd_keepalive_timeout; /* jiffies */ unsigned long osd_keepalive_timeout; /* jiffies */
unsigned long monc_ping_timeout; /* jiffies */
/* /*
* any type that can't be simply compared or doesn't need need * any type that can't be simply compared or doesn't need need
...@@ -66,6 +67,7 @@ struct ceph_options { ...@@ -66,6 +67,7 @@ struct ceph_options {
#define CEPH_MOUNT_TIMEOUT_DEFAULT msecs_to_jiffies(60 * 1000) #define CEPH_MOUNT_TIMEOUT_DEFAULT msecs_to_jiffies(60 * 1000)
#define CEPH_OSD_KEEPALIVE_DEFAULT msecs_to_jiffies(5 * 1000) #define CEPH_OSD_KEEPALIVE_DEFAULT msecs_to_jiffies(5 * 1000)
#define CEPH_OSD_IDLE_TTL_DEFAULT msecs_to_jiffies(60 * 1000) #define CEPH_OSD_IDLE_TTL_DEFAULT msecs_to_jiffies(60 * 1000)
#define CEPH_MONC_PING_TIMEOUT_DEFAULT msecs_to_jiffies(30 * 1000)
#define CEPH_MSG_MAX_FRONT_LEN (16*1024*1024) #define CEPH_MSG_MAX_FRONT_LEN (16*1024*1024)
#define CEPH_MSG_MAX_MIDDLE_LEN (16*1024*1024) #define CEPH_MSG_MAX_MIDDLE_LEN (16*1024*1024)
......
...@@ -248,6 +248,8 @@ struct ceph_connection { ...@@ -248,6 +248,8 @@ struct ceph_connection {
int in_base_pos; /* bytes read */ int in_base_pos; /* bytes read */
__le64 in_temp_ack; /* for reading an ack */ __le64 in_temp_ack; /* for reading an ack */
struct timespec last_keepalive_ack;
struct delayed_work work; /* send|recv work */ struct delayed_work work; /* send|recv work */
unsigned long delay; /* current delay interval */ unsigned long delay; /* current delay interval */
}; };
...@@ -285,6 +287,8 @@ extern void ceph_msg_revoke(struct ceph_msg *msg); ...@@ -285,6 +287,8 @@ extern void ceph_msg_revoke(struct ceph_msg *msg);
extern void ceph_msg_revoke_incoming(struct ceph_msg *msg); extern void ceph_msg_revoke_incoming(struct ceph_msg *msg);
extern void ceph_con_keepalive(struct ceph_connection *con); extern void ceph_con_keepalive(struct ceph_connection *con);
extern bool ceph_con_keepalive_expired(struct ceph_connection *con,
unsigned long interval);
extern void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages, extern void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
size_t length, size_t alignment); size_t length, size_t alignment);
......
...@@ -84,10 +84,12 @@ struct ceph_entity_inst { ...@@ -84,10 +84,12 @@ struct ceph_entity_inst {
#define CEPH_MSGR_TAG_MSG 7 /* message */ #define CEPH_MSGR_TAG_MSG 7 /* message */
#define CEPH_MSGR_TAG_ACK 8 /* message ack */ #define CEPH_MSGR_TAG_ACK 8 /* message ack */
#define CEPH_MSGR_TAG_KEEPALIVE 9 /* just a keepalive byte! */ #define CEPH_MSGR_TAG_KEEPALIVE 9 /* just a keepalive byte! */
#define CEPH_MSGR_TAG_BADPROTOVER 10 /* bad protocol version */ #define CEPH_MSGR_TAG_BADPROTOVER 10 /* bad protocol version */
#define CEPH_MSGR_TAG_BADAUTHORIZER 11 /* bad authorizer */ #define CEPH_MSGR_TAG_BADAUTHORIZER 11 /* bad authorizer */
#define CEPH_MSGR_TAG_FEATURES 12 /* insufficient features */ #define CEPH_MSGR_TAG_FEATURES 12 /* insufficient features */
#define CEPH_MSGR_TAG_SEQ 13 /* 64-bit int follows with seen seq number */ #define CEPH_MSGR_TAG_SEQ 13 /* 64-bit int follows with seen seq number */
#define CEPH_MSGR_TAG_KEEPALIVE2 14 /* keepalive2 byte + ceph_timespec */
#define CEPH_MSGR_TAG_KEEPALIVE2_ACK 15 /* keepalive2 reply */
/* /*
......
...@@ -357,6 +357,7 @@ ceph_parse_options(char *options, const char *dev_name, ...@@ -357,6 +357,7 @@ ceph_parse_options(char *options, const char *dev_name,
opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT; opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT;
opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT;
opt->monc_ping_timeout = CEPH_MONC_PING_TIMEOUT_DEFAULT;
/* get mon ip(s) */ /* get mon ip(s) */
/* ip1[:port1][,ip2[:port2]...] */ /* ip1[:port1][,ip2[:port2]...] */
......
...@@ -79,10 +79,6 @@ int ceph_crypto_key_unarmor(struct ceph_crypto_key *key, const char *inkey) ...@@ -79,10 +79,6 @@ int ceph_crypto_key_unarmor(struct ceph_crypto_key *key, const char *inkey)
return 0; return 0;
} }
#define AES_KEY_SIZE 16
static struct crypto_blkcipher *ceph_crypto_alloc_cipher(void) static struct crypto_blkcipher *ceph_crypto_alloc_cipher(void)
{ {
return crypto_alloc_blkcipher("cbc(aes)", 0, CRYPTO_ALG_ASYNC); return crypto_alloc_blkcipher("cbc(aes)", 0, CRYPTO_ALG_ASYNC);
......
...@@ -163,6 +163,7 @@ static struct kmem_cache *ceph_msg_data_cache; ...@@ -163,6 +163,7 @@ static struct kmem_cache *ceph_msg_data_cache;
static char tag_msg = CEPH_MSGR_TAG_MSG; static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK; static char tag_ack = CEPH_MSGR_TAG_ACK;
static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE; static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
#ifdef CONFIG_LOCKDEP #ifdef CONFIG_LOCKDEP
static struct lock_class_key socket_class; static struct lock_class_key socket_class;
...@@ -176,7 +177,7 @@ static struct lock_class_key socket_class; ...@@ -176,7 +177,7 @@ static struct lock_class_key socket_class;
static void queue_con(struct ceph_connection *con); static void queue_con(struct ceph_connection *con);
static void cancel_con(struct ceph_connection *con); static void cancel_con(struct ceph_connection *con);
static void con_work(struct work_struct *); static void ceph_con_workfn(struct work_struct *);
static void con_fault(struct ceph_connection *con); static void con_fault(struct ceph_connection *con);
/* /*
...@@ -276,22 +277,22 @@ static void _ceph_msgr_exit(void) ...@@ -276,22 +277,22 @@ static void _ceph_msgr_exit(void)
ceph_msgr_wq = NULL; ceph_msgr_wq = NULL;
} }
ceph_msgr_slab_exit();
BUG_ON(zero_page == NULL); BUG_ON(zero_page == NULL);
page_cache_release(zero_page); page_cache_release(zero_page);
zero_page = NULL; zero_page = NULL;
ceph_msgr_slab_exit();
} }
int ceph_msgr_init(void) int ceph_msgr_init(void)
{ {
if (ceph_msgr_slab_init())
return -ENOMEM;
BUG_ON(zero_page != NULL); BUG_ON(zero_page != NULL);
zero_page = ZERO_PAGE(0); zero_page = ZERO_PAGE(0);
page_cache_get(zero_page); page_cache_get(zero_page);
if (ceph_msgr_slab_init())
return -ENOMEM;
/* /*
* The number of active work items is limited by the number of * The number of active work items is limited by the number of
* connections, so leave @max_active at default. * connections, so leave @max_active at default.
...@@ -749,7 +750,7 @@ void ceph_con_init(struct ceph_connection *con, void *private, ...@@ -749,7 +750,7 @@ void ceph_con_init(struct ceph_connection *con, void *private,
mutex_init(&con->mutex); mutex_init(&con->mutex);
INIT_LIST_HEAD(&con->out_queue); INIT_LIST_HEAD(&con->out_queue);
INIT_LIST_HEAD(&con->out_sent); INIT_LIST_HEAD(&con->out_sent);
INIT_DELAYED_WORK(&con->work, con_work); INIT_DELAYED_WORK(&con->work, ceph_con_workfn);
con->state = CON_STATE_CLOSED; con->state = CON_STATE_CLOSED;
} }
...@@ -1351,7 +1352,15 @@ static void prepare_write_keepalive(struct ceph_connection *con) ...@@ -1351,7 +1352,15 @@ static void prepare_write_keepalive(struct ceph_connection *con)
{ {
dout("prepare_write_keepalive %p\n", con); dout("prepare_write_keepalive %p\n", con);
con_out_kvec_reset(con); con_out_kvec_reset(con);
con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) {
struct timespec ts = CURRENT_TIME;
struct ceph_timespec ceph_ts;
ceph_encode_timespec(&ceph_ts, &ts);
con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2);
con_out_kvec_add(con, sizeof(ceph_ts), &ceph_ts);
} else {
con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive);
}
con_flag_set(con, CON_FLAG_WRITE_PENDING); con_flag_set(con, CON_FLAG_WRITE_PENDING);
} }
...@@ -1625,6 +1634,12 @@ static void prepare_read_tag(struct ceph_connection *con) ...@@ -1625,6 +1634,12 @@ static void prepare_read_tag(struct ceph_connection *con)
con->in_tag = CEPH_MSGR_TAG_READY; con->in_tag = CEPH_MSGR_TAG_READY;
} }
/*
 * Prepare to read a keepalive2 ack: reset the connection's read offset
 * (in_base_pos) to zero. Called from try_read() when the incoming tag is
 * CEPH_MSGR_TAG_KEEPALIVE2_ACK.
 */
static void prepare_read_keepalive_ack(struct ceph_connection *con)
{
dout("prepare_read_keepalive_ack %p\n", con);
con->in_base_pos = 0;
}
/* /*
* Prepare to read a message. * Prepare to read a message.
*/ */
...@@ -2322,13 +2337,6 @@ static int read_partial_message(struct ceph_connection *con) ...@@ -2322,13 +2337,6 @@ static int read_partial_message(struct ceph_connection *con)
return ret; return ret;
BUG_ON(!con->in_msg ^ skip); BUG_ON(!con->in_msg ^ skip);
if (con->in_msg && data_len > con->in_msg->data_length) {
pr_warn("%s skipping long message (%u > %zd)\n",
__func__, data_len, con->in_msg->data_length);
ceph_msg_put(con->in_msg);
con->in_msg = NULL;
skip = 1;
}
if (skip) { if (skip) {
/* skip this message */ /* skip this message */
dout("alloc_msg said skip message\n"); dout("alloc_msg said skip message\n");
...@@ -2457,6 +2465,17 @@ static void process_message(struct ceph_connection *con) ...@@ -2457,6 +2465,17 @@ static void process_message(struct ceph_connection *con)
mutex_lock(&con->mutex); mutex_lock(&con->mutex);
} }
/*
 * Read the ceph_timespec payload of a keepalive2 ack and record it in
 * con->last_keepalive_ack.
 *
 * Returns the non-positive result of read_partial() unchanged if the
 * read did not complete; otherwise stores the decoded timestamp, switches
 * the connection back to reading a tag, and returns 1.
 */
static int read_keepalive_ack(struct ceph_connection *con)
{
struct ceph_timespec ceph_ts;
size_t size = sizeof(ceph_ts);
int ret = read_partial(con, size, size, &ceph_ts);
if (ret <= 0)
return ret;
ceph_decode_timespec(&con->last_keepalive_ack, &ceph_ts);
prepare_read_tag(con);
return 1;
}
/* /*
* Write something to the socket. Called in a worker thread when the * Write something to the socket. Called in a worker thread when the
...@@ -2526,6 +2545,10 @@ static int try_write(struct ceph_connection *con) ...@@ -2526,6 +2545,10 @@ static int try_write(struct ceph_connection *con)
do_next: do_next:
if (con->state == CON_STATE_OPEN) { if (con->state == CON_STATE_OPEN) {
if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) {
prepare_write_keepalive(con);
goto more;
}
/* is anything else pending? */ /* is anything else pending? */
if (!list_empty(&con->out_queue)) { if (!list_empty(&con->out_queue)) {
prepare_write_message(con); prepare_write_message(con);
...@@ -2535,10 +2558,6 @@ static int try_write(struct ceph_connection *con) ...@@ -2535,10 +2558,6 @@ static int try_write(struct ceph_connection *con)
prepare_write_ack(con); prepare_write_ack(con);
goto more; goto more;
} }
if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) {
prepare_write_keepalive(con);
goto more;
}
} }
/* Nothing to do! */ /* Nothing to do! */
...@@ -2641,6 +2660,9 @@ static int try_read(struct ceph_connection *con) ...@@ -2641,6 +2660,9 @@ static int try_read(struct ceph_connection *con)
case CEPH_MSGR_TAG_ACK: case CEPH_MSGR_TAG_ACK:
prepare_read_ack(con); prepare_read_ack(con);
break; break;
case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
prepare_read_keepalive_ack(con);
break;
case CEPH_MSGR_TAG_CLOSE: case CEPH_MSGR_TAG_CLOSE:
con_close_socket(con); con_close_socket(con);
con->state = CON_STATE_CLOSED; con->state = CON_STATE_CLOSED;
...@@ -2684,6 +2706,12 @@ static int try_read(struct ceph_connection *con) ...@@ -2684,6 +2706,12 @@ static int try_read(struct ceph_connection *con)
process_ack(con); process_ack(con);
goto more; goto more;
} }
if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
ret = read_keepalive_ack(con);
if (ret <= 0)
goto out;
goto more;
}
out: out:
dout("try_read done on %p ret %d\n", con, ret); dout("try_read done on %p ret %d\n", con, ret);
...@@ -2799,7 +2827,7 @@ static void con_fault_finish(struct ceph_connection *con) ...@@ -2799,7 +2827,7 @@ static void con_fault_finish(struct ceph_connection *con)
/* /*
* Do some work on a connection. Drop a connection ref when we're done. * Do some work on a connection. Drop a connection ref when we're done.
*/ */
static void con_work(struct work_struct *work) static void ceph_con_workfn(struct work_struct *work)
{ {
struct ceph_connection *con = container_of(work, struct ceph_connection, struct ceph_connection *con = container_of(work, struct ceph_connection,
work.work); work.work);
...@@ -3101,6 +3129,20 @@ void ceph_con_keepalive(struct ceph_connection *con) ...@@ -3101,6 +3129,20 @@ void ceph_con_keepalive(struct ceph_connection *con)
} }
EXPORT_SYMBOL(ceph_con_keepalive); EXPORT_SYMBOL(ceph_con_keepalive);
/*
 * Report whether the last keepalive2 ack on @con is older than @interval.
 *
 * @interval is in jiffies. The check is only made when @interval is
 * non-zero and the peer advertises CEPH_FEATURE_MSGR_KEEPALIVE2; in every
 * other case the function returns false.
 *
 * Returns true when the current time is at or past
 * con->last_keepalive_ack + @interval.
 */
bool ceph_con_keepalive_expired(struct ceph_connection *con,
unsigned long interval)
{
if (interval > 0 &&
(con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) {
struct timespec now = CURRENT_TIME;
struct timespec ts;
/* convert the jiffies interval and add it to the last ack time */
jiffies_to_timespec(interval, &ts);
ts = timespec_add(con->last_keepalive_ack, ts);
return timespec_compare(&now, &ts) >= 0;
}
return false;
}
static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type) static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
{ {
struct ceph_msg_data *data; struct ceph_msg_data *data;
......
...@@ -149,6 +149,10 @@ static int __open_session(struct ceph_mon_client *monc) ...@@ -149,6 +149,10 @@ static int __open_session(struct ceph_mon_client *monc)
CEPH_ENTITY_TYPE_MON, monc->cur_mon, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
&monc->monmap->mon_inst[monc->cur_mon].addr); &monc->monmap->mon_inst[monc->cur_mon].addr);
/* send an initial keepalive to ensure our timestamp is
* valid by the time we are in an OPENED state */
ceph_con_keepalive(&monc->con);
/* initiatiate authentication handshake */ /* initiatiate authentication handshake */
ret = ceph_auth_build_hello(monc->auth, ret = ceph_auth_build_hello(monc->auth,
monc->m_auth->front.iov_base, monc->m_auth->front.iov_base,
...@@ -170,14 +174,19 @@ static bool __sub_expired(struct ceph_mon_client *monc) ...@@ -170,14 +174,19 @@ static bool __sub_expired(struct ceph_mon_client *monc)
*/ */
static void __schedule_delayed(struct ceph_mon_client *monc) static void __schedule_delayed(struct ceph_mon_client *monc)
{ {
unsigned int delay; struct ceph_options *opt = monc->client->options;
unsigned long delay;
if (monc->cur_mon < 0 || __sub_expired(monc)) if (monc->cur_mon < 0 || __sub_expired(monc)) {
delay = 10 * HZ; delay = 10 * HZ;
else } else {
delay = 20 * HZ; delay = 20 * HZ;
dout("__schedule_delayed after %u\n", delay); if (opt->monc_ping_timeout > 0)
schedule_delayed_work(&monc->delayed_work, delay); delay = min(delay, opt->monc_ping_timeout / 3);
}
dout("__schedule_delayed after %lu\n", delay);
schedule_delayed_work(&monc->delayed_work,
round_jiffies_relative(delay));
} }
/* /*
...@@ -743,11 +752,23 @@ static void delayed_work(struct work_struct *work) ...@@ -743,11 +752,23 @@ static void delayed_work(struct work_struct *work)
__close_session(monc); __close_session(monc);
__open_session(monc); /* continue hunting */ __open_session(monc); /* continue hunting */
} else { } else {
ceph_con_keepalive(&monc->con); struct ceph_options *opt = monc->client->options;
int is_auth = ceph_auth_is_authenticated(monc->auth);
if (ceph_con_keepalive_expired(&monc->con,
opt->monc_ping_timeout)) {
dout("monc keepalive timeout\n");
is_auth = 0;
__close_session(monc);
monc->hunting = true;
__open_session(monc);
}
__validate_auth(monc); if (!monc->hunting) {
ceph_con_keepalive(&monc->con);
__validate_auth(monc);
}
if (ceph_auth_is_authenticated(monc->auth)) if (is_auth)
__send_subscribe(monc); __send_subscribe(monc);
} }
__schedule_delayed(monc); __schedule_delayed(monc);
......
...@@ -2817,8 +2817,9 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) ...@@ -2817,8 +2817,9 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
} }
/* /*
* lookup and return message for incoming reply. set up reply message * Lookup and return message for incoming reply. Don't try to do
* pages. * anything about a larger than preallocated data portion of the
* message at the moment - for now, just skip the message.
*/ */
static struct ceph_msg *get_reply(struct ceph_connection *con, static struct ceph_msg *get_reply(struct ceph_connection *con,
struct ceph_msg_header *hdr, struct ceph_msg_header *hdr,
...@@ -2836,10 +2837,10 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, ...@@ -2836,10 +2837,10 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
mutex_lock(&osdc->request_mutex); mutex_lock(&osdc->request_mutex);
req = __lookup_request(osdc, tid); req = __lookup_request(osdc, tid);
if (!req) { if (!req) {
*skip = 1; pr_warn("%s osd%d tid %llu unknown, skipping\n",
__func__, osd->o_osd, tid);
m = NULL; m = NULL;
dout("get_reply unknown tid %llu from osd%d\n", tid, *skip = 1;
osd->o_osd);
goto out; goto out;
} }
...@@ -2849,10 +2850,9 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, ...@@ -2849,10 +2850,9 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
ceph_msg_revoke_incoming(req->r_reply); ceph_msg_revoke_incoming(req->r_reply);
if (front_len > req->r_reply->front_alloc_len) { if (front_len > req->r_reply->front_alloc_len) {
pr_warn("get_reply front %d > preallocated %d (%u#%llu)\n", pr_warn("%s osd%d tid %llu front %d > preallocated %d\n",
front_len, req->r_reply->front_alloc_len, __func__, osd->o_osd, req->r_tid, front_len,
(unsigned int)con->peer_name.type, req->r_reply->front_alloc_len);
le64_to_cpu(con->peer_name.num));
m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS, m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
false); false);
if (!m) if (!m)
...@@ -2860,37 +2860,22 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, ...@@ -2860,37 +2860,22 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
ceph_msg_put(req->r_reply); ceph_msg_put(req->r_reply);
req->r_reply = m; req->r_reply = m;
} }
m = ceph_msg_get(req->r_reply);
if (data_len > 0) {
struct ceph_osd_data *osd_data;
/* if (data_len > req->r_reply->data_length) {
* XXX This is assuming there is only one op containing pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
* XXX page data. Probably OK for reads, but this __func__, osd->o_osd, req->r_tid, data_len,
* XXX ought to be done more generally. req->r_reply->data_length);
*/ m = NULL;
osd_data = osd_req_op_extent_osd_data(req, 0); *skip = 1;
if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { goto out;
if (osd_data->pages &&
unlikely(osd_data->length < data_len)) {
pr_warn("tid %lld reply has %d bytes we had only %llu bytes ready\n",
tid, data_len, osd_data->length);
*skip = 1;
ceph_msg_put(m);
m = NULL;
goto out;
}
}
} }
*skip = 0;
m = ceph_msg_get(req->r_reply);
dout("get_reply tid %lld %p\n", tid, m); dout("get_reply tid %lld %p\n", tid, m);
out: out:
mutex_unlock(&osdc->request_mutex); mutex_unlock(&osdc->request_mutex);
return m; return m;
} }
static struct ceph_msg *alloc_msg(struct ceph_connection *con, static struct ceph_msg *alloc_msg(struct ceph_connection *con,
......
...@@ -1300,7 +1300,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, ...@@ -1300,7 +1300,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
ceph_decode_addr(&addr); ceph_decode_addr(&addr);
pr_info("osd%d up\n", osd); pr_info("osd%d up\n", osd);
BUG_ON(osd >= map->max_osd); BUG_ON(osd >= map->max_osd);
map->osd_state[osd] |= CEPH_OSD_UP; map->osd_state[osd] |= CEPH_OSD_UP | CEPH_OSD_EXISTS;
map->osd_addr[osd] = addr; map->osd_addr[osd] = addr;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment