Commit 682c30ed authored by Linus Torvalds's avatar Linus Torvalds

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (39 commits)
  ceph: generalize mon requests, add pool op support
  ceph: only queue async writeback on cap revocation if there is dirty data
  ceph: do not ignore osd_idle_ttl mount option
  ceph: constify dentry_operations
  ceph: whitespace cleanup
  ceph: add flock/fcntl lock support
  ceph: define on-wire types, constants for file locking support
  ceph: add CEPH_FEATURE_FLOCK to the supported feature bits
  ceph: support v2 reconnect encoding
  ceph: support v2 client_caps encoding
  ceph: move AES iv definition to shared header
  ceph: fix decoding of pool snap info
  ceph: make ->sync_fs not wait if wait==0
  ceph: warn on missing snap realm
  ceph: print useful error message when crush rule not found
  ceph: use %pU to print uuid (fsid)
  ceph: sync header defs with server code
  ceph: clean up header guards
  ceph: strip misleading/obsolete version, feature info
  ceph: specify supported features in super.h
  ...
parents 84479f3c e56fa10e
......@@ -6,7 +6,7 @@ ifneq ($(KERNELRELEASE),)
obj-$(CONFIG_CEPH_FS) += ceph.o
ceph-objs := super.o inode.o dir.o file.o addr.o ioctl.o \
ceph-objs := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \
export.o caps.o snap.o xattr.o \
messenger.o msgpool.o buffer.o pagelist.o \
mds_client.o mdsmap.o \
......
......@@ -309,7 +309,8 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
zero_user_segment(page, s, PAGE_CACHE_SIZE);
}
if (add_to_page_cache_lru(page, mapping, page->index, GFP_NOFS)) {
if (add_to_page_cache_lru(page, mapping, page->index,
GFP_NOFS)) {
page_cache_release(page);
dout("readpages %p add_to_page_cache failed %p\n",
inode, page);
......@@ -552,7 +553,7 @@ static void writepages_finish(struct ceph_osd_request *req,
* page truncation thread, possibly losing some data that
* raced its way in
*/
if ((issued & CEPH_CAP_FILE_CACHE) == 0)
if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0)
generic_error_remove_page(inode->i_mapping, page);
unlock_page(page);
......@@ -797,9 +798,12 @@ static int ceph_writepages_start(struct address_space *mapping,
dout("%p will write page %p idx %lu\n",
inode, page, page->index);
writeback_stat = atomic_long_inc_return(&client->writeback_count);
if (writeback_stat > CONGESTION_ON_THRESH(client->mount_args->congestion_kb)) {
set_bdi_congested(&client->backing_dev_info, BLK_RW_ASYNC);
writeback_stat =
atomic_long_inc_return(&client->writeback_count);
if (writeback_stat > CONGESTION_ON_THRESH(
client->mount_args->congestion_kb)) {
set_bdi_congested(&client->backing_dev_info,
BLK_RW_ASYNC);
}
set_page_writeback(page);
......@@ -1036,7 +1040,7 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
*pagep = page;
dout("write_begin file %p inode %p page %p %d~%d\n", file,
inode, page, (int)pos, (int)len);
inode, page, (int)pos, (int)len);
r = ceph_update_writeable_page(file, pos, len, page);
} while (r == -EAGAIN);
......
#include <linux/errno.h>
int ceph_armor(char *dst, const char *src, const char *end);
int ceph_unarmor(char *dst, const char *src, const char *end);
/*
* base64 encode/decode.
*/
const char *pem_key = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
static const char *pem_key =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
static int encode_bits(int c)
{
......
......@@ -20,7 +20,7 @@ static u32 supported_protocols[] = {
CEPH_AUTH_CEPHX
};
int ceph_auth_init_protocol(struct ceph_auth_client *ac, int protocol)
static int ceph_auth_init_protocol(struct ceph_auth_client *ac, int protocol)
{
switch (protocol) {
case CEPH_AUTH_NONE:
......@@ -133,8 +133,8 @@ int ceph_auth_build_hello(struct ceph_auth_client *ac, void *buf, size_t len)
return -ERANGE;
}
int ceph_build_auth_request(struct ceph_auth_client *ac,
void *msg_buf, size_t msg_len)
static int ceph_build_auth_request(struct ceph_auth_client *ac,
void *msg_buf, size_t msg_len)
{
struct ceph_mon_request_header *monhdr = msg_buf;
void *p = monhdr + 1;
......
......@@ -87,8 +87,8 @@ static int ceph_x_decrypt(struct ceph_crypto_key *secret,
/*
* get existing (or insert new) ticket handler
*/
struct ceph_x_ticket_handler *get_ticket_handler(struct ceph_auth_client *ac,
int service)
static struct ceph_x_ticket_handler *
get_ticket_handler(struct ceph_auth_client *ac, int service)
{
struct ceph_x_ticket_handler *th;
struct ceph_x_info *xi = ac->private;
......@@ -429,7 +429,7 @@ static int ceph_x_build_request(struct ceph_auth_client *ac,
auth->struct_v = 1;
auth->key = 0;
for (u = (u64 *)tmp_enc; u + 1 <= (u64 *)(tmp_enc + ret); u++)
auth->key ^= *u;
auth->key ^= *(__le64 *)u;
dout(" server_challenge %llx client_challenge %llx key %llx\n",
xi->server_challenge, le64_to_cpu(auth->client_challenge),
le64_to_cpu(auth->key));
......
......@@ -47,22 +47,6 @@ void ceph_buffer_release(struct kref *kref)
kfree(b);
}
int ceph_buffer_alloc(struct ceph_buffer *b, int len, gfp_t gfp)
{
b->vec.iov_base = kmalloc(len, gfp | __GFP_NOWARN);
if (b->vec.iov_base) {
b->is_vmalloc = false;
} else {
b->vec.iov_base = __vmalloc(len, gfp, PAGE_KERNEL);
b->is_vmalloc = true;
}
if (!b->vec.iov_base)
return -ENOMEM;
b->alloc_len = len;
b->vec.iov_len = len;
return 0;
}
int ceph_decode_buffer(struct ceph_buffer **b, void **p, void *end)
{
size_t len;
......
......@@ -113,58 +113,41 @@ const char *ceph_cap_string(int caps)
return cap_str[i];
}
/*
* Cap reservations
*
* Maintain a global pool of preallocated struct ceph_caps, referenced
* by struct ceph_caps_reservations. This ensures that we preallocate
* memory needed to successfully process an MDS response. (If an MDS
* sends us cap information and we fail to process it, we will have
* problems due to the client and MDS being out of sync.)
*
* Reservations are 'owned' by a ceph_cap_reservation context.
*/
static spinlock_t caps_list_lock;
static struct list_head caps_list; /* unused (reserved or unreserved) */
static int caps_total_count; /* total caps allocated */
static int caps_use_count; /* in use */
static int caps_reserve_count; /* unused, reserved */
static int caps_avail_count; /* unused, unreserved */
static int caps_min_count; /* keep at least this many (unreserved) */
void __init ceph_caps_init(void)
void ceph_caps_init(struct ceph_mds_client *mdsc)
{
INIT_LIST_HEAD(&caps_list);
spin_lock_init(&caps_list_lock);
INIT_LIST_HEAD(&mdsc->caps_list);
spin_lock_init(&mdsc->caps_list_lock);
}
void ceph_caps_finalize(void)
void ceph_caps_finalize(struct ceph_mds_client *mdsc)
{
struct ceph_cap *cap;
spin_lock(&caps_list_lock);
while (!list_empty(&caps_list)) {
cap = list_first_entry(&caps_list, struct ceph_cap, caps_item);
spin_lock(&mdsc->caps_list_lock);
while (!list_empty(&mdsc->caps_list)) {
cap = list_first_entry(&mdsc->caps_list,
struct ceph_cap, caps_item);
list_del(&cap->caps_item);
kmem_cache_free(ceph_cap_cachep, cap);
}
caps_total_count = 0;
caps_avail_count = 0;
caps_use_count = 0;
caps_reserve_count = 0;
caps_min_count = 0;
spin_unlock(&caps_list_lock);
mdsc->caps_total_count = 0;
mdsc->caps_avail_count = 0;
mdsc->caps_use_count = 0;
mdsc->caps_reserve_count = 0;
mdsc->caps_min_count = 0;
spin_unlock(&mdsc->caps_list_lock);
}
void ceph_adjust_min_caps(int delta)
void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
{
spin_lock(&caps_list_lock);
caps_min_count += delta;
BUG_ON(caps_min_count < 0);
spin_unlock(&caps_list_lock);
spin_lock(&mdsc->caps_list_lock);
mdsc->caps_min_count += delta;
BUG_ON(mdsc->caps_min_count < 0);
spin_unlock(&mdsc->caps_list_lock);
}
int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
int ceph_reserve_caps(struct ceph_mds_client *mdsc,
struct ceph_cap_reservation *ctx, int need)
{
int i;
struct ceph_cap *cap;
......@@ -176,16 +159,17 @@ int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
dout("reserve caps ctx=%p need=%d\n", ctx, need);
/* first reserve any caps that are already allocated */
spin_lock(&caps_list_lock);
if (caps_avail_count >= need)
spin_lock(&mdsc->caps_list_lock);
if (mdsc->caps_avail_count >= need)
have = need;
else
have = caps_avail_count;
caps_avail_count -= have;
caps_reserve_count += have;
BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
caps_avail_count);
spin_unlock(&caps_list_lock);
have = mdsc->caps_avail_count;
mdsc->caps_avail_count -= have;
mdsc->caps_reserve_count += have;
BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
mdsc->caps_reserve_count +
mdsc->caps_avail_count);
spin_unlock(&mdsc->caps_list_lock);
for (i = have; i < need; i++) {
cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
......@@ -198,19 +182,20 @@ int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
}
BUG_ON(have + alloc != need);
spin_lock(&caps_list_lock);
caps_total_count += alloc;
caps_reserve_count += alloc;
list_splice(&newcaps, &caps_list);
spin_lock(&mdsc->caps_list_lock);
mdsc->caps_total_count += alloc;
mdsc->caps_reserve_count += alloc;
list_splice(&newcaps, &mdsc->caps_list);
BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
caps_avail_count);
spin_unlock(&caps_list_lock);
BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
mdsc->caps_reserve_count +
mdsc->caps_avail_count);
spin_unlock(&mdsc->caps_list_lock);
ctx->count = need;
dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n",
ctx, caps_total_count, caps_use_count, caps_reserve_count,
caps_avail_count);
ctx, mdsc->caps_total_count, mdsc->caps_use_count,
mdsc->caps_reserve_count, mdsc->caps_avail_count);
return 0;
out_alloc_count:
......@@ -220,26 +205,29 @@ int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
return ret;
}
int ceph_unreserve_caps(struct ceph_cap_reservation *ctx)
int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
struct ceph_cap_reservation *ctx)
{
dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
if (ctx->count) {
spin_lock(&caps_list_lock);
BUG_ON(caps_reserve_count < ctx->count);
caps_reserve_count -= ctx->count;
caps_avail_count += ctx->count;
spin_lock(&mdsc->caps_list_lock);
BUG_ON(mdsc->caps_reserve_count < ctx->count);
mdsc->caps_reserve_count -= ctx->count;
mdsc->caps_avail_count += ctx->count;
ctx->count = 0;
dout("unreserve caps %d = %d used + %d resv + %d avail\n",
caps_total_count, caps_use_count, caps_reserve_count,
caps_avail_count);
BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
caps_avail_count);
spin_unlock(&caps_list_lock);
mdsc->caps_total_count, mdsc->caps_use_count,
mdsc->caps_reserve_count, mdsc->caps_avail_count);
BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
mdsc->caps_reserve_count +
mdsc->caps_avail_count);
spin_unlock(&mdsc->caps_list_lock);
}
return 0;
}
static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx)
static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc,
struct ceph_cap_reservation *ctx)
{
struct ceph_cap *cap = NULL;
......@@ -247,71 +235,74 @@ static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx)
if (!ctx) {
cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
if (cap) {
caps_use_count++;
caps_total_count++;
mdsc->caps_use_count++;
mdsc->caps_total_count++;
}
return cap;
}
spin_lock(&caps_list_lock);
spin_lock(&mdsc->caps_list_lock);
dout("get_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n",
ctx, ctx->count, caps_total_count, caps_use_count,
caps_reserve_count, caps_avail_count);
ctx, ctx->count, mdsc->caps_total_count, mdsc->caps_use_count,
mdsc->caps_reserve_count, mdsc->caps_avail_count);
BUG_ON(!ctx->count);
BUG_ON(ctx->count > caps_reserve_count);
BUG_ON(list_empty(&caps_list));
BUG_ON(ctx->count > mdsc->caps_reserve_count);
BUG_ON(list_empty(&mdsc->caps_list));
ctx->count--;
caps_reserve_count--;
caps_use_count++;
mdsc->caps_reserve_count--;
mdsc->caps_use_count++;
cap = list_first_entry(&caps_list, struct ceph_cap, caps_item);
cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item);
list_del(&cap->caps_item);
BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
caps_avail_count);
spin_unlock(&caps_list_lock);
BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
mdsc->caps_reserve_count + mdsc->caps_avail_count);
spin_unlock(&mdsc->caps_list_lock);
return cap;
}
void ceph_put_cap(struct ceph_cap *cap)
void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap)
{
spin_lock(&caps_list_lock);
spin_lock(&mdsc->caps_list_lock);
dout("put_cap %p %d = %d used + %d resv + %d avail\n",
cap, caps_total_count, caps_use_count,
caps_reserve_count, caps_avail_count);
caps_use_count--;
cap, mdsc->caps_total_count, mdsc->caps_use_count,
mdsc->caps_reserve_count, mdsc->caps_avail_count);
mdsc->caps_use_count--;
/*
* Keep some preallocated caps around (ceph_min_count), to
* avoid lots of free/alloc churn.
*/
if (caps_avail_count >= caps_reserve_count + caps_min_count) {
caps_total_count--;
if (mdsc->caps_avail_count >= mdsc->caps_reserve_count +
mdsc->caps_min_count) {
mdsc->caps_total_count--;
kmem_cache_free(ceph_cap_cachep, cap);
} else {
caps_avail_count++;
list_add(&cap->caps_item, &caps_list);
mdsc->caps_avail_count++;
list_add(&cap->caps_item, &mdsc->caps_list);
}
BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
caps_avail_count);
spin_unlock(&caps_list_lock);
BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
mdsc->caps_reserve_count + mdsc->caps_avail_count);
spin_unlock(&mdsc->caps_list_lock);
}
void ceph_reservation_status(struct ceph_client *client,
int *total, int *avail, int *used, int *reserved,
int *min)
{
struct ceph_mds_client *mdsc = &client->mdsc;
if (total)
*total = caps_total_count;
*total = mdsc->caps_total_count;
if (avail)
*avail = caps_avail_count;
*avail = mdsc->caps_avail_count;
if (used)
*used = caps_use_count;
*used = mdsc->caps_use_count;
if (reserved)
*reserved = caps_reserve_count;
*reserved = mdsc->caps_reserve_count;
if (min)
*min = caps_min_count;
*min = mdsc->caps_min_count;
}
/*
......@@ -336,22 +327,29 @@ static struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds)
return NULL;
}
struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, int mds)
{
struct ceph_cap *cap;
spin_lock(&ci->vfs_inode.i_lock);
cap = __get_cap_for_mds(ci, mds);
spin_unlock(&ci->vfs_inode.i_lock);
return cap;
}
/*
* Return id of any MDS with a cap, preferably FILE_WR|WRBUFFER|EXCL, else
* -1.
* Return id of any MDS with a cap, preferably FILE_WR|BUFFER|EXCL, else -1.
*/
static int __ceph_get_cap_mds(struct ceph_inode_info *ci, u32 *mseq)
static int __ceph_get_cap_mds(struct ceph_inode_info *ci)
{
struct ceph_cap *cap;
int mds = -1;
struct rb_node *p;
/* prefer mds with WR|WRBUFFER|EXCL caps */
/* prefer mds with WR|BUFFER|EXCL caps */
for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
cap = rb_entry(p, struct ceph_cap, ci_node);
mds = cap->mds;
if (mseq)
*mseq = cap->mseq;
if (cap->issued & (CEPH_CAP_FILE_WR |
CEPH_CAP_FILE_BUFFER |
CEPH_CAP_FILE_EXCL))
......@@ -364,7 +362,7 @@ int ceph_get_cap_mds(struct inode *inode)
{
int mds;
spin_lock(&inode->i_lock);
mds = __ceph_get_cap_mds(ceph_inode(inode), NULL);
mds = __ceph_get_cap_mds(ceph_inode(inode));
spin_unlock(&inode->i_lock);
return mds;
}
......@@ -483,8 +481,8 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
* Each time we receive FILE_CACHE anew, we increment
* i_rdcache_gen.
*/
if ((issued & CEPH_CAP_FILE_CACHE) &&
(had & CEPH_CAP_FILE_CACHE) == 0)
if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
(had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0)
ci->i_rdcache_gen++;
/*
......@@ -543,7 +541,7 @@ int ceph_add_cap(struct inode *inode,
new_cap = NULL;
} else {
spin_unlock(&inode->i_lock);
new_cap = get_cap(caps_reservation);
new_cap = get_cap(mdsc, caps_reservation);
if (new_cap == NULL)
return -ENOMEM;
goto retry;
......@@ -588,6 +586,7 @@ int ceph_add_cap(struct inode *inode,
} else {
pr_err("ceph_add_cap: couldn't find snap realm %llx\n",
realmino);
WARN_ON(!realm);
}
}
......@@ -831,7 +830,7 @@ int __ceph_caps_file_wanted(struct ceph_inode_info *ci)
{
int want = 0;
int mode;
for (mode = 0; mode < 4; mode++)
for (mode = 0; mode < CEPH_FILE_MODE_NUM; mode++)
if (ci->i_nr_by_mode[mode])
want |= ceph_caps_for_mode(mode);
return want;
......@@ -901,7 +900,7 @@ void __ceph_remove_cap(struct ceph_cap *cap)
ci->i_auth_cap = NULL;
if (removed)
ceph_put_cap(cap);
ceph_put_cap(mdsc, cap);
if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) {
struct ceph_snap_realm *realm = ci->i_snap_realm;
......@@ -1197,6 +1196,8 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
*/
void __ceph_flush_snaps(struct ceph_inode_info *ci,
struct ceph_mds_session **psession)
__releases(ci->vfs_inode->i_lock)
__acquires(ci->vfs_inode->i_lock)
{
struct inode *inode = &ci->vfs_inode;
int mds;
......@@ -1232,7 +1233,13 @@ void __ceph_flush_snaps(struct ceph_inode_info *ci,
BUG_ON(capsnap->dirty == 0);
/* pick mds, take s_mutex */
mds = __ceph_get_cap_mds(ci, &mseq);
if (ci->i_auth_cap == NULL) {
dout("no auth cap (migrating?), doing nothing\n");
goto out;
}
mds = ci->i_auth_cap->session->s_mds;
mseq = ci->i_auth_cap->mseq;
if (session && session->s_mds != mds) {
dout("oops, wrong session %p mutex\n", session);
mutex_unlock(&session->s_mutex);
......@@ -1251,8 +1258,8 @@ void __ceph_flush_snaps(struct ceph_inode_info *ci,
}
/*
* if session == NULL, we raced against a cap
* deletion. retry, and we'll get a better
* @mds value next time.
* deletion or migration. retry, and we'll
* get a better @mds value next time.
*/
spin_lock(&inode->i_lock);
goto retry;
......@@ -1290,6 +1297,7 @@ void __ceph_flush_snaps(struct ceph_inode_info *ci,
list_del_init(&ci->i_snap_flush_item);
spin_unlock(&mdsc->snap_flush_lock);
out:
if (psession)
*psession = session;
else if (session) {
......@@ -1435,7 +1443,6 @@ static int try_nonblocking_invalidate(struct inode *inode)
*/
void ceph_check_caps(struct ceph_inode_info *ci, int flags,
struct ceph_mds_session *session)
__releases(session->s_mutex)
{
struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode);
struct ceph_mds_client *mdsc = &client->mdsc;
......@@ -1510,11 +1517,13 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
ci->i_wrbuffer_ref == 0 && /* no dirty pages... */
ci->i_rdcache_gen && /* may have cached pages */
(file_wanted == 0 || /* no open files */
(revoking & CEPH_CAP_FILE_CACHE)) && /* or revoking cache */
(revoking & (CEPH_CAP_FILE_CACHE|
CEPH_CAP_FILE_LAZYIO))) && /* or revoking cache */
!tried_invalidate) {
dout("check_caps trying to invalidate on %p\n", inode);
if (try_nonblocking_invalidate(inode) < 0) {
if (revoking & CEPH_CAP_FILE_CACHE) {
if (revoking & (CEPH_CAP_FILE_CACHE|
CEPH_CAP_FILE_LAZYIO)) {
dout("check_caps queuing invalidate\n");
queue_invalidate = 1;
ci->i_rdcache_revoking = ci->i_rdcache_gen;
......@@ -2250,8 +2259,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
struct ceph_mds_session *session,
struct ceph_cap *cap,
struct ceph_buffer *xattr_buf)
__releases(inode->i_lock)
__releases(session->s_mutex)
__releases(inode->i_lock)
{
struct ceph_inode_info *ci = ceph_inode(inode);
int mds = session->s_mds;
......@@ -2278,6 +2286,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
* will invalidate _after_ writeback.)
*/
if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
(newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
!ci->i_wrbuffer_ref) {
if (try_nonblocking_invalidate(inode) == 0) {
revoked_rdcache = 1;
......@@ -2369,15 +2378,22 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
/* revocation, grant, or no-op? */
if (cap->issued & ~newcaps) {
dout("revocation: %s -> %s\n", ceph_cap_string(cap->issued),
ceph_cap_string(newcaps));
if ((used & ~newcaps) & CEPH_CAP_FILE_BUFFER)
writeback = 1; /* will delay ack */
else if (dirty & ~newcaps)
check_caps = 1; /* initiate writeback in check_caps */
else if (((used & ~newcaps) & CEPH_CAP_FILE_CACHE) == 0 ||
revoked_rdcache)
check_caps = 2; /* send revoke ack in check_caps */
int revoking = cap->issued & ~newcaps;
dout("revocation: %s -> %s (revoking %s)\n",
ceph_cap_string(cap->issued),
ceph_cap_string(newcaps),
ceph_cap_string(revoking));
if (revoking & used & CEPH_CAP_FILE_BUFFER)
writeback = 1; /* initiate writeback; will delay ack */
else if (revoking == CEPH_CAP_FILE_CACHE &&
(newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
queue_invalidate)
; /* do nothing yet, invalidation will be queued */
else if (cap == ci->i_auth_cap)
check_caps = 1; /* check auth cap only */
else
check_caps = 2; /* check all caps */
cap->issued = newcaps;
cap->implemented |= newcaps;
} else if (cap->issued == newcaps) {
......@@ -2568,7 +2584,8 @@ static void handle_cap_trunc(struct inode *inode,
* caller holds s_mutex
*/
static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
struct ceph_mds_session *session)
struct ceph_mds_session *session,
int *open_target_sessions)
{
struct ceph_inode_info *ci = ceph_inode(inode);
int mds = session->s_mds;
......@@ -2600,6 +2617,12 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
ci->i_cap_exporting_mds = mds;
ci->i_cap_exporting_mseq = mseq;
ci->i_cap_exporting_issued = cap->issued;
/*
* make sure we have open sessions with all possible
* export targets, so that we get the matching IMPORT
*/
*open_target_sessions = 1;
}
__ceph_remove_cap(cap);
}
......@@ -2675,6 +2698,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
u64 size, max_size;
u64 tid;
void *snaptrace;
size_t snaptrace_len;
void *flock;
u32 flock_len;
int open_target_sessions = 0;
dout("handle_caps from mds%d\n", mds);
......@@ -2683,7 +2710,6 @@ void ceph_handle_caps(struct ceph_mds_session *session,
if (msg->front.iov_len < sizeof(*h))
goto bad;
h = msg->front.iov_base;
snaptrace = h + 1;
op = le32_to_cpu(h->op);
vino.ino = le64_to_cpu(h->ino);
vino.snap = CEPH_NOSNAP;
......@@ -2693,6 +2719,21 @@ void ceph_handle_caps(struct ceph_mds_session *session,
size = le64_to_cpu(h->size);
max_size = le64_to_cpu(h->max_size);
snaptrace = h + 1;
snaptrace_len = le32_to_cpu(h->snap_trace_len);
if (le16_to_cpu(msg->hdr.version) >= 2) {
void *p, *end;
p = snaptrace + snaptrace_len;
end = msg->front.iov_base + msg->front.iov_len;
ceph_decode_32_safe(&p, end, flock_len, bad);
flock = p;
} else {
flock = NULL;
flock_len = 0;
}
mutex_lock(&session->s_mutex);
session->s_seq++;
dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
......@@ -2714,7 +2755,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
* along for the mds (who clearly thinks we still have this
* cap).
*/
ceph_add_cap_releases(mdsc, session, -1);
ceph_add_cap_releases(mdsc, session);
ceph_send_cap_releases(mdsc, session);
goto done;
}
......@@ -2726,12 +2767,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
goto done;
case CEPH_CAP_OP_EXPORT:
handle_cap_export(inode, h, session);
handle_cap_export(inode, h, session, &open_target_sessions);
goto done;
case CEPH_CAP_OP_IMPORT:
handle_cap_import(mdsc, inode, h, session,
snaptrace, le32_to_cpu(h->snap_trace_len));
snaptrace, snaptrace_len);
ceph_check_caps(ceph_inode(inode), CHECK_CAPS_NODELAY,
session);
goto done_unlocked;
......@@ -2773,6 +2814,8 @@ void ceph_handle_caps(struct ceph_mds_session *session,
done_unlocked:
if (inode)
iput(inode);
if (open_target_sessions)
ceph_mdsc_open_export_target_sessions(mdsc, session);
return;
bad:
......
#ifndef _FS_CEPH_FRAG_H
#define _FS_CEPH_FRAG_H
#ifndef FS_CEPH_FRAG_H
#define FS_CEPH_FRAG_H
/*
* "Frags" are a way to describe a subset of a 32-bit number space,
......
......@@ -29,46 +29,44 @@ int ceph_file_layout_is_valid(const struct ceph_file_layout *layout)
int ceph_flags_to_mode(int flags)
{
int mode;
#ifdef O_DIRECTORY /* fixme */
if ((flags & O_DIRECTORY) == O_DIRECTORY)
return CEPH_FILE_MODE_PIN;
#endif
if ((flags & O_APPEND) == O_APPEND)
flags |= O_WRONLY;
if ((flags & O_ACCMODE) == O_RDWR)
mode = CEPH_FILE_MODE_RDWR;
else if ((flags & O_ACCMODE) == O_WRONLY)
mode = CEPH_FILE_MODE_WR;
else
mode = CEPH_FILE_MODE_RD;
#ifdef O_LAZY
if (flags & O_LAZY)
return CEPH_FILE_MODE_LAZY;
mode |= CEPH_FILE_MODE_LAZY;
#endif
if ((flags & O_APPEND) == O_APPEND)
flags |= O_WRONLY;
flags &= O_ACCMODE;
if ((flags & O_RDWR) == O_RDWR)
return CEPH_FILE_MODE_RDWR;
if ((flags & O_WRONLY) == O_WRONLY)
return CEPH_FILE_MODE_WR;
return CEPH_FILE_MODE_RD;
return mode;
}
int ceph_caps_for_mode(int mode)
{
switch (mode) {
case CEPH_FILE_MODE_PIN:
return CEPH_CAP_PIN;
case CEPH_FILE_MODE_RD:
return CEPH_CAP_PIN | CEPH_CAP_FILE_SHARED |
int caps = CEPH_CAP_PIN;
if (mode & CEPH_FILE_MODE_RD)
caps |= CEPH_CAP_FILE_SHARED |
CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE;
case CEPH_FILE_MODE_RDWR:
return CEPH_CAP_PIN | CEPH_CAP_FILE_SHARED |
CEPH_CAP_FILE_EXCL |
CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE |
CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER |
CEPH_CAP_AUTH_SHARED | CEPH_CAP_AUTH_EXCL |
CEPH_CAP_XATTR_SHARED | CEPH_CAP_XATTR_EXCL;
case CEPH_FILE_MODE_WR:
return CEPH_CAP_PIN | CEPH_CAP_FILE_SHARED |
CEPH_CAP_FILE_EXCL |
if (mode & CEPH_FILE_MODE_WR)
caps |= CEPH_CAP_FILE_EXCL |
CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER |
CEPH_CAP_AUTH_SHARED | CEPH_CAP_AUTH_EXCL |
CEPH_CAP_XATTR_SHARED | CEPH_CAP_XATTR_EXCL;
}
return 0;
if (mode & CEPH_FILE_MODE_LAZY)
caps |= CEPH_CAP_FILE_LAZYIO;
return caps;
}
......@@ -9,26 +9,12 @@
* LGPL2
*/
#ifndef _FS_CEPH_CEPH_FS_H
#define _FS_CEPH_CEPH_FS_H
#ifndef CEPH_FS_H
#define CEPH_FS_H
#include "msgr.h"
#include "rados.h"
/*
* Ceph release version
*/
#define CEPH_VERSION_MAJOR 0
#define CEPH_VERSION_MINOR 20
#define CEPH_VERSION_PATCH 0
#define _CEPH_STRINGIFY(x) #x
#define CEPH_STRINGIFY(x) _CEPH_STRINGIFY(x)
#define CEPH_MAKE_VERSION(x, y, z) CEPH_STRINGIFY(x) "." CEPH_STRINGIFY(y) \
"." CEPH_STRINGIFY(z)
#define CEPH_VERSION CEPH_MAKE_VERSION(CEPH_VERSION_MAJOR, \
CEPH_VERSION_MINOR, CEPH_VERSION_PATCH)
/*
* subprotocol versions. when specific messages types or high-level
* protocols change, bump the affected components. we keep rev
......@@ -53,18 +39,10 @@
/*
* feature bits
*/
#define CEPH_FEATURE_UID 1
#define CEPH_FEATURE_NOSRCADDR 2
#define CEPH_FEATURE_FLOCK 4
#define CEPH_FEATURE_SUPPORTED_MON CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR
#define CEPH_FEATURE_REQUIRED_MON CEPH_FEATURE_UID
#define CEPH_FEATURE_SUPPORTED_MDS CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR|CEPH_FEATURE_FLOCK
#define CEPH_FEATURE_REQUIRED_MDS CEPH_FEATURE_UID
#define CEPH_FEATURE_SUPPORTED_OSD CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR
#define CEPH_FEATURE_REQUIRED_OSD CEPH_FEATURE_UID
#define CEPH_FEATURE_SUPPORTED_CLIENT CEPH_FEATURE_NOSRCADDR
#define CEPH_FEATURE_REQUIRED_CLIENT CEPH_FEATURE_NOSRCADDR
#define CEPH_FEATURE_UID (1<<0)
#define CEPH_FEATURE_NOSRCADDR (1<<1)
#define CEPH_FEATURE_MONCLOCKCHECK (1<<2)
#define CEPH_FEATURE_FLOCK (1<<3)
/*
......@@ -96,6 +74,8 @@ int ceph_file_layout_is_valid(const struct ceph_file_layout *layout);
#define CEPH_CRYPTO_NONE 0x0
#define CEPH_CRYPTO_AES 0x1
#define CEPH_AES_IV "cephsageyudagreg"
/* security/authentication protocols */
#define CEPH_AUTH_UNKNOWN 0x0
#define CEPH_AUTH_NONE 0x1
......@@ -275,6 +255,7 @@ extern const char *ceph_mds_state_name(int s);
#define CEPH_LOCK_IDFT 512 /* dir frag tree */
#define CEPH_LOCK_INEST 1024 /* mds internal */
#define CEPH_LOCK_IXATTR 2048
#define CEPH_LOCK_IFLOCK 4096 /* advisory file locks */
#define CEPH_LOCK_INO 8192 /* immutable inode bits; not a lock */
/* client_session ops */
......@@ -316,6 +297,8 @@ enum {
CEPH_MDS_OP_RMXATTR = 0x01106,
CEPH_MDS_OP_SETLAYOUT = 0x01107,
CEPH_MDS_OP_SETATTR = 0x01108,
CEPH_MDS_OP_SETFILELOCK= 0x01109,
CEPH_MDS_OP_GETFILELOCK= 0x00110,
CEPH_MDS_OP_MKNOD = 0x01201,
CEPH_MDS_OP_LINK = 0x01202,
......@@ -386,6 +369,15 @@ union ceph_mds_request_args {
struct {
struct ceph_file_layout layout;
} __attribute__ ((packed)) setlayout;
struct {
__u8 rule; /* currently fcntl or flock */
__u8 type; /* shared, exclusive, remove*/
__le64 pid; /* process id requesting the lock */
__le64 pid_namespace;
__le64 start; /* initial location to lock */
__le64 length; /* num bytes to lock from start */
__u8 wait; /* will caller wait for lock to become available? */
} __attribute__ ((packed)) filelock_change;
} __attribute__ ((packed));
#define CEPH_MDS_FLAG_REPLAY 1 /* this is a replayed op */
......@@ -480,6 +472,23 @@ struct ceph_mds_reply_dirfrag {
__le32 dist[];
} __attribute__ ((packed));
#define CEPH_LOCK_FCNTL 1
#define CEPH_LOCK_FLOCK 2
#define CEPH_LOCK_SHARED 1
#define CEPH_LOCK_EXCL 2
#define CEPH_LOCK_UNLOCK 4
struct ceph_filelock {
__le64 start;/* file offset to start lock at */
__le64 length; /* num bytes to lock; 0 for all following start */
__le64 client; /* which client holds the lock */
__le64 pid; /* process id holding the lock on the client */
__le64 pid_namespace;
__u8 type; /* shared lock, exclusive lock, or unlock */
} __attribute__ ((packed));
/* file access modes */
#define CEPH_FILE_MODE_PIN 0
#define CEPH_FILE_MODE_RD 1
......@@ -508,9 +517,10 @@ int ceph_flags_to_mode(int flags);
#define CEPH_CAP_SAUTH 2
#define CEPH_CAP_SLINK 4
#define CEPH_CAP_SXATTR 6
#define CEPH_CAP_SFILE 8 /* goes at the end (uses >2 cap bits) */
#define CEPH_CAP_SFILE 8
#define CEPH_CAP_SFLOCK 20
#define CEPH_CAP_BITS 16
#define CEPH_CAP_BITS 22
/* composed values */
#define CEPH_CAP_AUTH_SHARED (CEPH_CAP_GSHARED << CEPH_CAP_SAUTH)
......@@ -528,6 +538,9 @@ int ceph_flags_to_mode(int flags);
#define CEPH_CAP_FILE_BUFFER (CEPH_CAP_GBUFFER << CEPH_CAP_SFILE)
#define CEPH_CAP_FILE_WREXTEND (CEPH_CAP_GWREXTEND << CEPH_CAP_SFILE)
#define CEPH_CAP_FILE_LAZYIO (CEPH_CAP_GLAZYIO << CEPH_CAP_SFILE)
#define CEPH_CAP_FLOCK_SHARED (CEPH_CAP_GSHARED << CEPH_CAP_SFLOCK)
#define CEPH_CAP_FLOCK_EXCL (CEPH_CAP_GEXCL << CEPH_CAP_SFLOCK)
/* cap masks (for getattr) */
#define CEPH_STAT_CAP_INODE CEPH_CAP_PIN
......@@ -563,7 +576,8 @@ int ceph_flags_to_mode(int flags);
CEPH_CAP_FILE_EXCL)
#define CEPH_CAP_ANY_WR (CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_FILE_WR)
#define CEPH_CAP_ANY (CEPH_CAP_ANY_RD | CEPH_CAP_ANY_EXCL | \
CEPH_CAP_ANY_FILE_WR | CEPH_CAP_PIN)
CEPH_CAP_ANY_FILE_WR | CEPH_CAP_FILE_LAZYIO | \
CEPH_CAP_PIN)
#define CEPH_CAP_LOCKS (CEPH_LOCK_IFILE | CEPH_LOCK_IAUTH | CEPH_LOCK_ILINK | \
CEPH_LOCK_IXATTR)
......@@ -650,6 +664,16 @@ struct ceph_mds_lease {
/* client reconnect */
struct ceph_mds_cap_reconnect {
__le64 cap_id;
__le32 wanted;
__le32 issued;
__le64 snaprealm;
__le64 pathbase; /* base ino for our path to this ino */
__le32 flock_len; /* size of flock state blob, if any */
} __attribute__ ((packed));
/* followed by flock blob */
struct ceph_mds_cap_reconnect_v1 {
__le64 cap_id;
__le32 wanted;
__le32 issued;
......@@ -658,7 +682,6 @@ struct ceph_mds_cap_reconnect {
__le64 snaprealm;
__le64 pathbase; /* base ino for our path to this ino */
} __attribute__ ((packed));
/* followed by encoded string */
struct ceph_mds_snaprealm_reconnect {
__le64 ino; /* snap realm base */
......
#ifndef _FS_CEPH_HASH_H
#define _FS_CEPH_HASH_H
#ifndef FS_CEPH_HASH_H
#define FS_CEPH_HASH_H
#define CEPH_STR_HASH_LINUX 0x1 /* linux dcache hash */
#define CEPH_STR_HASH_RJENKINS 0x2 /* robert jenkins' */
......
......@@ -28,6 +28,7 @@ const char *ceph_osd_op_name(int op)
case CEPH_OSD_OP_TRUNCATE: return "truncate";
case CEPH_OSD_OP_ZERO: return "zero";
case CEPH_OSD_OP_WRITEFULL: return "writefull";
case CEPH_OSD_OP_ROLLBACK: return "rollback";
case CEPH_OSD_OP_APPEND: return "append";
case CEPH_OSD_OP_STARTSYNC: return "startsync";
......@@ -129,6 +130,8 @@ const char *ceph_mds_op_name(int op)
case CEPH_MDS_OP_LSSNAP: return "lssnap";
case CEPH_MDS_OP_MKSNAP: return "mksnap";
case CEPH_MDS_OP_RMSNAP: return "rmsnap";
case CEPH_MDS_OP_SETFILELOCK: return "setfilelock";
case CEPH_MDS_OP_GETFILELOCK: return "getfilelock";
}
return "???";
}
......
#ifndef _CRUSH_CRUSH_H
#define _CRUSH_CRUSH_H
#ifndef CEPH_CRUSH_CRUSH_H
#define CEPH_CRUSH_CRUSH_H
#include <linux/types.h>
......
#ifndef _CRUSH_HASH_H
#define _CRUSH_HASH_H
#ifndef CEPH_CRUSH_HASH_H
#define CEPH_CRUSH_HASH_H
#define CRUSH_HASH_RJENKINS1 0
......
#ifndef _CRUSH_MAPPER_H
#define _CRUSH_MAPPER_H
#ifndef CEPH_CRUSH_MAPPER_H
#define CEPH_CRUSH_MAPPER_H
/*
* CRUSH functions for find rules and then mapping an input to an
......
......@@ -75,10 +75,11 @@ static struct crypto_blkcipher *ceph_crypto_alloc_cipher(void)
return crypto_alloc_blkcipher("cbc(aes)", 0, CRYPTO_ALG_ASYNC);
}
const u8 *aes_iv = "cephsageyudagreg";
static const u8 *aes_iv = (u8 *)CEPH_AES_IV;
int ceph_aes_encrypt(const void *key, int key_len, void *dst, size_t *dst_len,
const void *src, size_t src_len)
static int ceph_aes_encrypt(const void *key, int key_len,
void *dst, size_t *dst_len,
const void *src, size_t src_len)
{
struct scatterlist sg_in[2], sg_out[1];
struct crypto_blkcipher *tfm = ceph_crypto_alloc_cipher();
......@@ -126,9 +127,10 @@ int ceph_aes_encrypt(const void *key, int key_len, void *dst, size_t *dst_len,
return 0;
}
int ceph_aes_encrypt2(const void *key, int key_len, void *dst, size_t *dst_len,
const void *src1, size_t src1_len,
const void *src2, size_t src2_len)
static int ceph_aes_encrypt2(const void *key, int key_len, void *dst,
size_t *dst_len,
const void *src1, size_t src1_len,
const void *src2, size_t src2_len)
{
struct scatterlist sg_in[3], sg_out[1];
struct crypto_blkcipher *tfm = ceph_crypto_alloc_cipher();
......@@ -179,8 +181,9 @@ int ceph_aes_encrypt2(const void *key, int key_len, void *dst, size_t *dst_len,
return 0;
}
int ceph_aes_decrypt(const void *key, int key_len, void *dst, size_t *dst_len,
const void *src, size_t src_len)
static int ceph_aes_decrypt(const void *key, int key_len,
void *dst, size_t *dst_len,
const void *src, size_t src_len)
{
struct scatterlist sg_in[1], sg_out[2];
struct crypto_blkcipher *tfm = ceph_crypto_alloc_cipher();
......@@ -238,10 +241,10 @@ int ceph_aes_decrypt(const void *key, int key_len, void *dst, size_t *dst_len,
return 0;
}
int ceph_aes_decrypt2(const void *key, int key_len,
void *dst1, size_t *dst1_len,
void *dst2, size_t *dst2_len,
const void *src, size_t src_len)
static int ceph_aes_decrypt2(const void *key, int key_len,
void *dst1, size_t *dst1_len,
void *dst2, size_t *dst2_len,
const void *src, size_t src_len)
{
struct scatterlist sg_in[1], sg_out[3];
struct crypto_blkcipher *tfm = ceph_crypto_alloc_cipher();
......
......@@ -42,7 +42,7 @@ extern int ceph_encrypt2(struct ceph_crypto_key *secret,
const void *src2, size_t src2_len);
/* armor.c */
extern int ceph_armor(char *dst, const void *src, const void *end);
extern int ceph_unarmor(void *dst, const char *src, const char *end);
extern int ceph_armor(char *dst, const char *src, const char *end);
extern int ceph_unarmor(char *dst, const char *src, const char *end);
#endif
......@@ -291,7 +291,7 @@ static int dentry_lru_show(struct seq_file *s, void *ptr)
return 0;
}
#define DEFINE_SHOW_FUNC(name) \
#define DEFINE_SHOW_FUNC(name) \
static int name##_open(struct inode *inode, struct file *file) \
{ \
struct seq_file *sf; \
......@@ -361,8 +361,8 @@ int ceph_debugfs_client_init(struct ceph_client *client)
int ret = 0;
char name[80];
snprintf(name, sizeof(name), FSID_FORMAT ".client%lld",
PR_FSID(&client->fsid), client->monc.auth->global_id);
snprintf(name, sizeof(name), "%pU.client%lld", &client->fsid,
client->monc.auth->global_id);
client->debugfs_dir = debugfs_create_dir(name, ceph_debugfs_dir);
if (!client->debugfs_dir)
......@@ -432,11 +432,12 @@ int ceph_debugfs_client_init(struct ceph_client *client)
if (!client->debugfs_caps)
goto out;
client->debugfs_congestion_kb = debugfs_create_file("writeback_congestion_kb",
0600,
client->debugfs_dir,
client,
&congestion_kb_fops);
client->debugfs_congestion_kb =
debugfs_create_file("writeback_congestion_kb",
0600,
client->debugfs_dir,
client,
&congestion_kb_fops);
if (!client->debugfs_congestion_kb)
goto out;
......@@ -466,7 +467,7 @@ void ceph_debugfs_client_cleanup(struct ceph_client *client)
debugfs_remove(client->debugfs_dir);
}
#else // CONFIG_DEBUG_FS
#else /* CONFIG_DEBUG_FS */
int __init ceph_debugfs_init(void)
{
......@@ -486,4 +487,4 @@ void ceph_debugfs_client_cleanup(struct ceph_client *client)
{
}
#endif // CONFIG_DEBUG_FS
#endif /* CONFIG_DEBUG_FS */
......@@ -99,11 +99,13 @@ static inline void ceph_encode_timespec(struct ceph_timespec *tv,
*/
static inline void ceph_encode_addr(struct ceph_entity_addr *a)
{
a->in_addr.ss_family = htons(a->in_addr.ss_family);
__be16 ss_family = htons(a->in_addr.ss_family);
a->in_addr.ss_family = *(__u16 *)&ss_family;
}
static inline void ceph_decode_addr(struct ceph_entity_addr *a)
{
a->in_addr.ss_family = ntohs(a->in_addr.ss_family);
__be16 ss_family = *(__be16 *)&a->in_addr.ss_family;
a->in_addr.ss_family = ntohs(ss_family);
WARN_ON(a->in_addr.ss_family == 512);
}
......
......@@ -27,7 +27,7 @@
const struct inode_operations ceph_dir_iops;
const struct file_operations ceph_dir_fops;
struct dentry_operations ceph_dentry_ops;
const struct dentry_operations ceph_dentry_ops;
/*
* Initialize ceph dentry state.
......@@ -94,6 +94,8 @@ static unsigned fpos_off(loff_t p)
*/
static int __dcache_readdir(struct file *filp,
void *dirent, filldir_t filldir)
__releases(inode->i_lock)
__acquires(inode->i_lock)
{
struct inode *inode = filp->f_dentry->d_inode;
struct ceph_file_info *fi = filp->private_data;
......@@ -1239,16 +1241,16 @@ const struct inode_operations ceph_dir_iops = {
.create = ceph_create,
};
struct dentry_operations ceph_dentry_ops = {
const struct dentry_operations ceph_dentry_ops = {
.d_revalidate = ceph_d_revalidate,
.d_release = ceph_dentry_release,
};
struct dentry_operations ceph_snapdir_dentry_ops = {
const struct dentry_operations ceph_snapdir_dentry_ops = {
.d_revalidate = ceph_snapdir_d_revalidate,
.d_release = ceph_dentry_release,
};
struct dentry_operations ceph_snap_dentry_ops = {
const struct dentry_operations ceph_snap_dentry_ops = {
.d_release = ceph_dentry_release,
};
......@@ -317,7 +317,7 @@ void ceph_release_page_vector(struct page **pages, int num_pages)
/*
* allocate a vector new pages
*/
struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags)
static struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags)
{
struct page **pages;
int i;
......@@ -665,7 +665,7 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
* throw out any page cache pages in this range. this
* may block.
*/
truncate_inode_pages_range(inode->i_mapping, pos,
truncate_inode_pages_range(inode->i_mapping, pos,
(pos+len) | (PAGE_CACHE_SIZE-1));
} else {
pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
......@@ -740,28 +740,32 @@ static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov,
unsigned long nr_segs, loff_t pos)
{
struct file *filp = iocb->ki_filp;
struct ceph_file_info *fi = filp->private_data;
loff_t *ppos = &iocb->ki_pos;
size_t len = iov->iov_len;
struct inode *inode = filp->f_dentry->d_inode;
struct ceph_inode_info *ci = ceph_inode(inode);
void *base = iov->iov_base;
void __user *base = iov->iov_base;
ssize_t ret;
int got = 0;
int want, got = 0;
int checkeof = 0, read = 0;
dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
inode, ceph_vinop(inode), pos, (unsigned)len, inode);
again:
__ceph_do_pending_vmtruncate(inode);
ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_CACHE,
&got, -1);
if (fi->fmode & CEPH_FILE_MODE_LAZY)
want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
else
want = CEPH_CAP_FILE_CACHE;
ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1);
if (ret < 0)
goto out;
dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
inode, ceph_vinop(inode), pos, (unsigned)len,
ceph_cap_string(got));
if ((got & CEPH_CAP_FILE_CACHE) == 0 ||
if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
(iocb->ki_filp->f_flags & O_DIRECT) ||
(inode->i_sb->s_flags & MS_SYNCHRONOUS))
/* hmm, this isn't really async... */
......@@ -807,11 +811,12 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
unsigned long nr_segs, loff_t pos)
{
struct file *file = iocb->ki_filp;
struct ceph_file_info *fi = file->private_data;
struct inode *inode = file->f_dentry->d_inode;
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_osd_client *osdc = &ceph_sb_to_client(inode->i_sb)->osdc;
loff_t endoff = pos + iov->iov_len;
int got = 0;
int want, got = 0;
int ret, err;
if (ceph_snap(inode) != CEPH_NOSNAP)
......@@ -824,8 +829,11 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
dout("aio_write %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
inode->i_size);
ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
&got, endoff);
if (fi->fmode & CEPH_FILE_MODE_LAZY)
want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
else
want = CEPH_CAP_FILE_BUFFER;
ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff);
if (ret < 0)
goto out;
......@@ -833,7 +841,7 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
ceph_cap_string(got));
if ((got & CEPH_CAP_FILE_BUFFER) == 0 ||
if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
(iocb->ki_filp->f_flags & O_DIRECT) ||
(inode->i_sb->s_flags & MS_SYNCHRONOUS)) {
ret = ceph_sync_write(file, iov->iov_base, iov->iov_len,
......@@ -930,6 +938,8 @@ const struct file_operations ceph_file_fops = {
.aio_write = ceph_aio_write,
.mmap = ceph_mmap,
.fsync = ceph_fsync,
.lock = ceph_lock,
.flock = ceph_flock,
.splice_read = generic_file_splice_read,
.splice_write = generic_file_splice_write,
.unlocked_ioctl = ceph_ioctl,
......
......@@ -442,8 +442,9 @@ int ceph_fill_file_size(struct inode *inode, int issued,
* the file is either opened or mmaped
*/
if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_RD|
CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER|
CEPH_CAP_FILE_EXCL)) ||
CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER|
CEPH_CAP_FILE_EXCL|
CEPH_CAP_FILE_LAZYIO)) ||
mapping_mapped(inode->i_mapping) ||
__ceph_caps_file_wanted(ci)) {
ci->i_truncate_pending++;
......
......@@ -143,6 +143,27 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
return 0;
}
static long ceph_ioctl_lazyio(struct file *file)
{
struct ceph_file_info *fi = file->private_data;
struct inode *inode = file->f_dentry->d_inode;
struct ceph_inode_info *ci = ceph_inode(inode);
if ((fi->fmode & CEPH_FILE_MODE_LAZY) == 0) {
spin_lock(&inode->i_lock);
ci->i_nr_by_mode[fi->fmode]--;
fi->fmode |= CEPH_FILE_MODE_LAZY;
ci->i_nr_by_mode[fi->fmode]++;
spin_unlock(&inode->i_lock);
dout("ioctl_layzio: file %p marked lazy\n", file);
ceph_check_caps(ci, 0, NULL);
} else {
dout("ioctl_layzio: file %p already lazy\n", file);
}
return 0;
}
long ceph_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
{
dout("ioctl file %p cmd %u arg %lu\n", file, cmd, arg);
......@@ -155,6 +176,9 @@ long ceph_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
case CEPH_IOC_GET_DATALOC:
return ceph_ioctl_get_dataloc(file, (void __user *)arg);
case CEPH_IOC_LAZYIO:
return ceph_ioctl_lazyio(file);
}
return -ENOTTY;
}
......@@ -37,4 +37,6 @@ struct ceph_ioctl_dataloc {
#define CEPH_IOC_GET_DATALOC _IOWR(CEPH_IOCTL_MAGIC, 3, \
struct ceph_ioctl_dataloc)
#define CEPH_IOC_LAZYIO _IO(CEPH_IOCTL_MAGIC, 4)
#endif
#include "ceph_debug.h"
#include <linux/file.h>
#include <linux/namei.h>
#include "super.h"
#include "mds_client.h"
#include "pagelist.h"
/**
* Implement fcntl and flock locking functions.
*/
static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
u64 pid, u64 pid_ns,
int cmd, u64 start, u64 length, u8 wait)
{
struct inode *inode = file->f_dentry->d_inode;
struct ceph_mds_client *mdsc =
&ceph_sb_to_client(inode->i_sb)->mdsc;
struct ceph_mds_request *req;
int err;
req = ceph_mdsc_create_request(mdsc, operation, USE_AUTH_MDS);
if (IS_ERR(req))
return PTR_ERR(req);
req->r_inode = igrab(inode);
dout("ceph_lock_message: rule: %d, op: %d, pid: %llu, start: %llu, "
"length: %llu, wait: %d, type`: %d", (int)lock_type,
(int)operation, pid, start, length, wait, cmd);
req->r_args.filelock_change.rule = lock_type;
req->r_args.filelock_change.type = cmd;
req->r_args.filelock_change.pid = cpu_to_le64(pid);
/* This should be adjusted, but I'm not sure if
namespaces actually get id numbers*/
req->r_args.filelock_change.pid_namespace =
cpu_to_le64((u64)pid_ns);
req->r_args.filelock_change.start = cpu_to_le64(start);
req->r_args.filelock_change.length = cpu_to_le64(length);
req->r_args.filelock_change.wait = wait;
err = ceph_mdsc_do_request(mdsc, inode, req);
ceph_mdsc_put_request(req);
dout("ceph_lock_message: rule: %d, op: %d, pid: %llu, start: %llu, "
"length: %llu, wait: %d, type`: %d err code %d", (int)lock_type,
(int)operation, pid, start, length, wait, cmd, err);
return err;
}
/**
* Attempt to set an fcntl lock.
* For now, this just goes away to the server. Later it may be more awesome.
*/
int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
{
u64 length;
u8 lock_cmd;
int err;
u8 wait = 0;
u16 op = CEPH_MDS_OP_SETFILELOCK;
fl->fl_nspid = get_pid(task_tgid(current));
dout("ceph_lock, fl_pid:%d", fl->fl_pid);
/* set wait bit as appropriate, then make command as Ceph expects it*/
if (F_SETLKW == cmd)
wait = 1;
if (F_GETLK == cmd)
op = CEPH_MDS_OP_GETFILELOCK;
if (F_RDLCK == fl->fl_type)
lock_cmd = CEPH_LOCK_SHARED;
else if (F_WRLCK == fl->fl_type)
lock_cmd = CEPH_LOCK_EXCL;
else
lock_cmd = CEPH_LOCK_UNLOCK;
if (LLONG_MAX == fl->fl_end)
length = 0;
else
length = fl->fl_end - fl->fl_start + 1;
err = ceph_lock_message(CEPH_LOCK_FCNTL, op, file,
(u64)fl->fl_pid, (u64)fl->fl_nspid,
lock_cmd, fl->fl_start,
length, wait);
if (!err) {
dout("mds locked, locking locally");
err = posix_lock_file(file, fl, NULL);
if (err && (CEPH_MDS_OP_SETFILELOCK == op)) {
/* undo! This should only happen if the kernel detects
* local deadlock. */
ceph_lock_message(CEPH_LOCK_FCNTL, op, file,
(u64)fl->fl_pid, (u64)fl->fl_nspid,
CEPH_LOCK_UNLOCK, fl->fl_start,
length, 0);
dout("got %d on posix_lock_file, undid lock", err);
}
} else {
dout("mds returned error code %d", err);
}
return err;
}
int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
{
u64 length;
u8 lock_cmd;
int err;
u8 wait = 1;
fl->fl_nspid = get_pid(task_tgid(current));
dout("ceph_flock, fl_pid:%d", fl->fl_pid);
/* set wait bit, then clear it out of cmd*/
if (cmd & LOCK_NB)
wait = 0;
cmd = cmd & (LOCK_SH | LOCK_EX | LOCK_UN);
/* set command sequence that Ceph wants to see:
shared lock, exclusive lock, or unlock */
if (LOCK_SH == cmd)
lock_cmd = CEPH_LOCK_SHARED;
else if (LOCK_EX == cmd)
lock_cmd = CEPH_LOCK_EXCL;
else
lock_cmd = CEPH_LOCK_UNLOCK;
/* mds requires start and length rather than start and end */
if (LLONG_MAX == fl->fl_end)
length = 0;
else
length = fl->fl_end - fl->fl_start + 1;
err = ceph_lock_message(CEPH_LOCK_FLOCK, CEPH_MDS_OP_SETFILELOCK,
file, (u64)fl->fl_pid, (u64)fl->fl_nspid,
lock_cmd, fl->fl_start,
length, wait);
if (!err) {
err = flock_lock_file_wait(file, fl);
if (err) {
ceph_lock_message(CEPH_LOCK_FLOCK,
CEPH_MDS_OP_SETFILELOCK,
file, (u64)fl->fl_pid,
(u64)fl->fl_nspid,
CEPH_LOCK_UNLOCK, fl->fl_start,
length, 0);
dout("got %d on flock_lock_file_wait, undid lock", err);
}
} else {
dout("mds error code %d", err);
}
return err;
}
/**
* Must be called with BKL already held. Fills in the passed
* counter variables, so you can prepare pagelist metadata before calling
* ceph_encode_locks.
*/
void ceph_count_locks(struct inode *inode, int *fcntl_count, int *flock_count)
{
struct file_lock *lock;
*fcntl_count = 0;
*flock_count = 0;
for (lock = inode->i_flock; lock != NULL; lock = lock->fl_next) {
if (lock->fl_flags & FL_POSIX)
++(*fcntl_count);
else if (lock->fl_flags & FL_FLOCK)
++(*flock_count);
}
dout("counted %d flock locks and %d fcntl locks",
*flock_count, *fcntl_count);
}
/**
* Encode the flock and fcntl locks for the given inode into the pagelist.
* Format is: #fcntl locks, sequential fcntl locks, #flock locks,
* sequential flock locks.
* Must be called with BLK already held, and the lock numbers should have
* been gathered under the same lock holding window.
*/
int ceph_encode_locks(struct inode *inode, struct ceph_pagelist *pagelist,
int num_fcntl_locks, int num_flock_locks)
{
struct file_lock *lock;
struct ceph_filelock cephlock;
int err = 0;
dout("encoding %d flock and %d fcntl locks", num_flock_locks,
num_fcntl_locks);
err = ceph_pagelist_append(pagelist, &num_fcntl_locks, sizeof(u32));
if (err)
goto fail;
for (lock = inode->i_flock; lock != NULL; lock = lock->fl_next) {
if (lock->fl_flags & FL_POSIX) {
err = lock_to_ceph_filelock(lock, &cephlock);
if (err)
goto fail;
err = ceph_pagelist_append(pagelist, &cephlock,
sizeof(struct ceph_filelock));
}
if (err)
goto fail;
}
err = ceph_pagelist_append(pagelist, &num_flock_locks, sizeof(u32));
if (err)
goto fail;
for (lock = inode->i_flock; lock != NULL; lock = lock->fl_next) {
if (lock->fl_flags & FL_FLOCK) {
err = lock_to_ceph_filelock(lock, &cephlock);
if (err)
goto fail;
err = ceph_pagelist_append(pagelist, &cephlock,
sizeof(struct ceph_filelock));
}
if (err)
goto fail;
}
fail:
return err;
}
/*
* Given a pointer to a lock, convert it to a ceph filelock
*/
int lock_to_ceph_filelock(struct file_lock *lock,
struct ceph_filelock *cephlock)
{
int err = 0;
cephlock->start = cpu_to_le64(lock->fl_start);
cephlock->length = cpu_to_le64(lock->fl_end - lock->fl_start + 1);
cephlock->client = cpu_to_le64(0);
cephlock->pid = cpu_to_le64(lock->fl_pid);
cephlock->pid_namespace = cpu_to_le64((u64)lock->fl_nspid);
switch (lock->fl_type) {
case F_RDLCK:
cephlock->type = CEPH_LOCK_SHARED;
break;
case F_WRLCK:
cephlock->type = CEPH_LOCK_EXCL;
break;
case F_UNLCK:
cephlock->type = CEPH_LOCK_UNLOCK;
break;
default:
dout("Have unknown lock type %d", lock->fl_type);
err = -EINVAL;
}
return err;
}
......@@ -3,6 +3,7 @@
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/sched.h>
#include <linux/smp_lock.h>
#include "mds_client.h"
#include "mon_client.h"
......@@ -37,6 +38,11 @@
* are no longer valid.
*/
struct ceph_reconnect_state {
struct ceph_pagelist *pagelist;
bool flock;
};
static void __wake_requests(struct ceph_mds_client *mdsc,
struct list_head *head);
......@@ -449,7 +455,7 @@ void ceph_mdsc_release_request(struct kref *kref)
kfree(req->r_path1);
kfree(req->r_path2);
put_request_session(req);
ceph_unreserve_caps(&req->r_caps_reservation);
ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
kfree(req);
}
......@@ -512,7 +518,8 @@ static void __register_request(struct ceph_mds_client *mdsc,
{
req->r_tid = ++mdsc->last_tid;
if (req->r_num_caps)
ceph_reserve_caps(&req->r_caps_reservation, req->r_num_caps);
ceph_reserve_caps(mdsc, &req->r_caps_reservation,
req->r_num_caps);
dout("__register_request %p tid %lld\n", req, req->r_tid);
ceph_mdsc_get_request(req);
__insert_request(mdsc, req);
......@@ -703,6 +710,51 @@ static int __open_session(struct ceph_mds_client *mdsc,
return 0;
}
/*
* open sessions for any export targets for the given mds
*
* called under mdsc->mutex
*/
static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session)
{
struct ceph_mds_info *mi;
struct ceph_mds_session *ts;
int i, mds = session->s_mds;
int target;
if (mds >= mdsc->mdsmap->m_max_mds)
return;
mi = &mdsc->mdsmap->m_info[mds];
dout("open_export_target_sessions for mds%d (%d targets)\n",
session->s_mds, mi->num_export_targets);
for (i = 0; i < mi->num_export_targets; i++) {
target = mi->export_targets[i];
ts = __ceph_lookup_mds_session(mdsc, target);
if (!ts) {
ts = register_session(mdsc, target);
if (IS_ERR(ts))
return;
}
if (session->s_state == CEPH_MDS_SESSION_NEW ||
session->s_state == CEPH_MDS_SESSION_CLOSING)
__open_session(mdsc, session);
else
dout(" mds%d target mds%d %p is %s\n", session->s_mds,
i, ts, session_state_name(ts->s_state));
ceph_put_mds_session(ts);
}
}
void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session)
{
mutex_lock(&mdsc->mutex);
__open_export_target_sessions(mdsc, session);
mutex_unlock(&mdsc->mutex);
}
/*
* session caps
*/
......@@ -764,7 +816,7 @@ static int iterate_session_caps(struct ceph_mds_session *session,
last_inode = NULL;
}
if (old_cap) {
ceph_put_cap(old_cap);
ceph_put_cap(session->s_mdsc, old_cap);
old_cap = NULL;
}
......@@ -793,7 +845,7 @@ static int iterate_session_caps(struct ceph_mds_session *session,
if (last_inode)
iput(last_inode);
if (old_cap)
ceph_put_cap(old_cap);
ceph_put_cap(session->s_mdsc, old_cap);
return ret;
}
......@@ -1067,15 +1119,16 @@ static int trim_caps(struct ceph_mds_client *mdsc,
* Called under s_mutex.
*/
int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session,
int extra)
struct ceph_mds_session *session)
{
struct ceph_msg *msg;
struct ceph_msg *msg, *partial = NULL;
struct ceph_mds_cap_release *head;
int err = -ENOMEM;
int extra = mdsc->client->mount_args->cap_release_safety;
int num;
if (extra < 0)
extra = mdsc->client->mount_args->cap_release_safety;
dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
extra);
spin_lock(&session->s_cap_lock);
......@@ -1084,9 +1137,14 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
struct ceph_msg,
list_head);
head = msg->front.iov_base;
extra += CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num);
num = le32_to_cpu(head->num);
if (num) {
dout(" partial %p with (%d/%d)\n", msg, num,
(int)CEPH_CAPS_PER_RELEASE);
extra += CEPH_CAPS_PER_RELEASE - num;
partial = msg;
}
}
while (session->s_num_cap_releases < session->s_nr_caps + extra) {
spin_unlock(&session->s_cap_lock);
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
......@@ -1103,19 +1161,14 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
}
if (!list_empty(&session->s_cap_releases)) {
msg = list_first_entry(&session->s_cap_releases,
struct ceph_msg,
list_head);
head = msg->front.iov_base;
if (head->num) {
dout(" queueing non-full %p (%d)\n", msg,
le32_to_cpu(head->num));
list_move_tail(&msg->list_head,
&session->s_cap_releases_done);
session->s_num_cap_releases -=
CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num);
}
if (partial) {
head = partial->front.iov_base;
num = le32_to_cpu(head->num);
dout(" queueing partial %p with %d/%d\n", partial, num,
(int)CEPH_CAPS_PER_RELEASE);
list_move_tail(&partial->list_head,
&session->s_cap_releases_done);
session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
}
err = 0;
spin_unlock(&session->s_cap_lock);
......@@ -1250,6 +1303,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
return ERR_PTR(-ENOMEM);
mutex_init(&req->r_fill_mutex);
req->r_mdsc = mdsc;
req->r_started = jiffies;
req->r_resend_mds = -1;
INIT_LIST_HEAD(&req->r_unsafe_dir_item);
......@@ -1580,6 +1634,15 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
req->r_mds = mds;
req->r_attempts++;
if (req->r_inode) {
struct ceph_cap *cap =
ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
if (cap)
req->r_sent_on_mseq = cap->mseq;
else
req->r_sent_on_mseq = -1;
}
dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
......@@ -1914,21 +1977,40 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
result = le32_to_cpu(head->result);
/*
* Tolerate 2 consecutive ESTALEs from the same mds.
* FIXME: we should be looking at the cap migrate_seq.
* Handle an ESTALE
* if we're not talking to the authority, send to them
* if the authority has changed while we weren't looking,
* send to new authority
* Otherwise we just have to return an ESTALE
*/
if (result == -ESTALE) {
req->r_direct_mode = USE_AUTH_MDS;
req->r_num_stale++;
if (req->r_num_stale <= 2) {
dout("got ESTALE on request %llu", req->r_tid);
if (!req->r_inode) {
/* do nothing; not an authority problem */
} else if (req->r_direct_mode != USE_AUTH_MDS) {
dout("not using auth, setting for that now");
req->r_direct_mode = USE_AUTH_MDS;
__do_request(mdsc, req);
mutex_unlock(&mdsc->mutex);
goto out;
} else {
struct ceph_inode_info *ci = ceph_inode(req->r_inode);
struct ceph_cap *cap =
ceph_get_cap_for_mds(ci, req->r_mds);;
dout("already using auth");
if ((!cap || cap != ci->i_auth_cap) ||
(cap->mseq != req->r_sent_on_mseq)) {
dout("but cap changed, so resending");
__do_request(mdsc, req);
mutex_unlock(&mdsc->mutex);
goto out;
}
}
} else {
req->r_num_stale = 0;
dout("have to return ESTALE on request %llu", req->r_tid);
}
if (head->safe) {
req->r_got_safe = true;
__unregister_request(mdsc, req);
......@@ -1985,7 +2067,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
if (err == 0) {
if (result == 0 && rinfo->dir_nr)
ceph_readdir_prepopulate(req, req->r_session);
ceph_unreserve_caps(&req->r_caps_reservation);
ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
}
mutex_unlock(&req->r_fill_mutex);
......@@ -2005,7 +2087,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
}
mutex_unlock(&mdsc->mutex);
ceph_add_cap_releases(mdsc, req->r_session, -1);
ceph_add_cap_releases(mdsc, req->r_session);
mutex_unlock(&session->s_mutex);
/* kick calling process */
......@@ -2193,9 +2275,14 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
void *arg)
{
struct ceph_mds_cap_reconnect rec;
union {
struct ceph_mds_cap_reconnect v2;
struct ceph_mds_cap_reconnect_v1 v1;
} rec;
size_t reclen;
struct ceph_inode_info *ci;
struct ceph_pagelist *pagelist = arg;
struct ceph_reconnect_state *recon_state = arg;
struct ceph_pagelist *pagelist = recon_state->pagelist;
char *path;
int pathlen, err;
u64 pathbase;
......@@ -2228,17 +2315,44 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
spin_lock(&inode->i_lock);
cap->seq = 0; /* reset cap seq */
cap->issue_seq = 0; /* and issue_seq */
rec.cap_id = cpu_to_le64(cap->cap_id);
rec.pathbase = cpu_to_le64(pathbase);
rec.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
rec.issued = cpu_to_le32(cap->issued);
rec.size = cpu_to_le64(inode->i_size);
ceph_encode_timespec(&rec.mtime, &inode->i_mtime);
ceph_encode_timespec(&rec.atime, &inode->i_atime);
rec.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
if (recon_state->flock) {
rec.v2.cap_id = cpu_to_le64(cap->cap_id);
rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
rec.v2.issued = cpu_to_le32(cap->issued);
rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
rec.v2.pathbase = cpu_to_le64(pathbase);
rec.v2.flock_len = 0;
reclen = sizeof(rec.v2);
} else {
rec.v1.cap_id = cpu_to_le64(cap->cap_id);
rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
rec.v1.issued = cpu_to_le32(cap->issued);
rec.v1.size = cpu_to_le64(inode->i_size);
ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
rec.v1.pathbase = cpu_to_le64(pathbase);
reclen = sizeof(rec.v1);
}
spin_unlock(&inode->i_lock);
err = ceph_pagelist_append(pagelist, &rec, sizeof(rec));
if (recon_state->flock) {
int num_fcntl_locks, num_flock_locks;
lock_kernel();
ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
rec.v2.flock_len = (2*sizeof(u32) +
(num_fcntl_locks+num_flock_locks) *
sizeof(struct ceph_filelock));
err = ceph_pagelist_append(pagelist, &rec, reclen);
if (!err)
err = ceph_encode_locks(inode, pagelist,
num_fcntl_locks,
num_flock_locks);
unlock_kernel();
}
out:
kfree(path);
......@@ -2267,6 +2381,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
int mds = session->s_mds;
int err = -ENOMEM;
struct ceph_pagelist *pagelist;
struct ceph_reconnect_state recon_state;
pr_info("mds%d reconnect start\n", mds);
......@@ -2301,7 +2416,10 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
if (err)
goto fail;
err = iterate_session_caps(session, encode_caps_cb, pagelist);
recon_state.pagelist = pagelist;
recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
err = iterate_session_caps(session, encode_caps_cb, &recon_state);
if (err < 0)
goto fail;
......@@ -2326,6 +2444,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
}
reply->pagelist = pagelist;
if (recon_state.flock)
reply->hdr.version = cpu_to_le16(2);
reply->hdr.data_len = cpu_to_le32(pagelist->length);
reply->nr_pages = calc_pages_for(0, pagelist->length);
ceph_con_send(&session->s_con, reply);
......@@ -2376,9 +2496,11 @@ static void check_new_map(struct ceph_mds_client *mdsc,
oldstate = ceph_mdsmap_get_state(oldmap, i);
newstate = ceph_mdsmap_get_state(newmap, i);
dout("check_new_map mds%d state %s -> %s (session %s)\n",
dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
i, ceph_mds_state_name(oldstate),
ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
ceph_mds_state_name(newstate),
ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
session_state_name(s->s_state));
if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
......@@ -2428,6 +2550,21 @@ static void check_new_map(struct ceph_mds_client *mdsc,
wake_up_session_caps(s, 1);
}
}
for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
s = mdsc->sessions[i];
if (!s)
continue;
if (!ceph_mdsmap_is_laggy(newmap, i))
continue;
if (s->s_state == CEPH_MDS_SESSION_OPEN ||
s->s_state == CEPH_MDS_SESSION_HUNG ||
s->s_state == CEPH_MDS_SESSION_CLOSING) {
dout(" connecting to export targets of laggy mds%d\n",
i);
__open_export_target_sessions(mdsc, s);
}
}
}
......@@ -2715,7 +2852,7 @@ static void delayed_work(struct work_struct *work)
send_renew_caps(mdsc, s);
else
ceph_con_keepalive(&s->s_con);
ceph_add_cap_releases(mdsc, s, -1);
ceph_add_cap_releases(mdsc, s);
if (s->s_state == CEPH_MDS_SESSION_OPEN ||
s->s_state == CEPH_MDS_SESSION_HUNG)
ceph_send_cap_releases(mdsc, s);
......@@ -2764,6 +2901,9 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
spin_lock_init(&mdsc->dentry_lru_lock);
INIT_LIST_HEAD(&mdsc->dentry_lru);
ceph_caps_init(mdsc);
ceph_adjust_min_caps(mdsc, client->min_caps);
return 0;
}
......@@ -2959,6 +3099,7 @@ void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
if (mdsc->mdsmap)
ceph_mdsmap_destroy(mdsc->mdsmap);
kfree(mdsc->sessions);
ceph_caps_finalize(mdsc);
}
......
......@@ -151,6 +151,7 @@ typedef void (*ceph_mds_request_callback_t) (struct ceph_mds_client *mdsc,
struct ceph_mds_request {
u64 r_tid; /* transaction id */
struct rb_node r_node;
struct ceph_mds_client *r_mdsc;
int r_op; /* mds op code */
int r_mds;
......@@ -207,8 +208,8 @@ struct ceph_mds_request {
int r_attempts; /* resend attempts */
int r_num_fwd; /* number of forward attempts */
int r_num_stale;
int r_resend_mds; /* mds to resend to next, if any*/
u32 r_sent_on_mseq; /* cap mseq request was sent at*/
struct kref r_kref;
struct list_head r_wait;
......@@ -267,6 +268,27 @@ struct ceph_mds_client {
spinlock_t cap_dirty_lock; /* protects above items */
wait_queue_head_t cap_flushing_wq;
/*
* Cap reservations
*
* Maintain a global pool of preallocated struct ceph_caps, referenced
* by struct ceph_caps_reservations. This ensures that we preallocate
* memory needed to successfully process an MDS response. (If an MDS
* sends us cap information and we fail to process it, we will have
* problems due to the client and MDS being out of sync.)
*
* Reservations are 'owned' by a ceph_cap_reservation context.
*/
spinlock_t caps_list_lock;
struct list_head caps_list; /* unused (reserved or
unreserved) */
int caps_total_count; /* total caps allocated */
int caps_use_count; /* in use */
int caps_reserve_count; /* unused, reserved */
int caps_avail_count; /* unused, unreserved */
int caps_min_count; /* keep at least this many
(unreserved) */
#ifdef CONFIG_DEBUG_FS
struct dentry *debugfs_file;
#endif
......@@ -324,8 +346,7 @@ static inline void ceph_mdsc_put_request(struct ceph_mds_request *req)
}
extern int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session,
int extra);
struct ceph_mds_session *session);
extern void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session);
......@@ -343,4 +364,7 @@ extern void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc,
struct ceph_msg *msg);
extern void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session);
#endif
......@@ -85,6 +85,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
struct ceph_entity_addr addr;
u32 num_export_targets;
void *pexport_targets = NULL;
struct ceph_timespec laggy_since;
ceph_decode_need(p, end, sizeof(u64)*2 + 1 + sizeof(u32), bad);
global_id = ceph_decode_64(p);
......@@ -103,7 +104,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
state_seq = ceph_decode_64(p);
ceph_decode_copy(p, &addr, sizeof(addr));
ceph_decode_addr(&addr);
*p += sizeof(struct ceph_timespec);
ceph_decode_copy(p, &laggy_since, sizeof(laggy_since));
*p += sizeof(u32);
ceph_decode_32_safe(p, end, namelen, bad);
*p += namelen;
......@@ -122,6 +123,9 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
m->m_info[mds].global_id = global_id;
m->m_info[mds].state = state;
m->m_info[mds].addr = addr;
m->m_info[mds].laggy =
(laggy_since.tv_sec != 0 ||
laggy_since.tv_nsec != 0);
m->m_info[mds].num_export_targets = num_export_targets;
if (num_export_targets) {
m->m_info[mds].export_targets =
......
......@@ -13,6 +13,7 @@ struct ceph_mds_info {
struct ceph_entity_addr addr;
s32 state;
int num_export_targets;
bool laggy;
u32 *export_targets;
};
......@@ -47,6 +48,13 @@ static inline int ceph_mdsmap_get_state(struct ceph_mdsmap *m, int w)
return m->m_info[w].state;
}
static inline bool ceph_mdsmap_is_laggy(struct ceph_mdsmap *m, int w)
{
if (w >= 0 && w < m->m_max_mds)
return m->m_info[w].laggy;
return false;
}
extern int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m);
extern struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end);
extern void ceph_mdsmap_destroy(struct ceph_mdsmap *m);
......
......@@ -108,7 +108,7 @@ void ceph_msgr_exit(void)
destroy_workqueue(ceph_msgr_wq);
}
void ceph_msgr_flush()
void ceph_msgr_flush(void)
{
flush_workqueue(ceph_msgr_wq);
}
......@@ -647,7 +647,7 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
con->connect_seq, global_seq, proto);
con->out_connect.features = cpu_to_le64(CEPH_FEATURE_SUPPORTED_CLIENT);
con->out_connect.features = cpu_to_le64(CEPH_FEATURE_SUPPORTED);
con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
con->out_connect.global_seq = cpu_to_le32(global_seq);
......@@ -1081,11 +1081,11 @@ static int process_banner(struct ceph_connection *con)
sizeof(con->peer_addr)) != 0 &&
!(addr_is_blank(&con->actual_peer_addr.in_addr) &&
con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
pr_warning("wrong peer, want %s/%lld, got %s/%lld\n",
pr_warning("wrong peer, want %s/%d, got %s/%d\n",
pr_addr(&con->peer_addr.in_addr),
le64_to_cpu(con->peer_addr.nonce),
(int)le32_to_cpu(con->peer_addr.nonce),
pr_addr(&con->actual_peer_addr.in_addr),
le64_to_cpu(con->actual_peer_addr.nonce));
(int)le32_to_cpu(con->actual_peer_addr.nonce));
con->error_msg = "wrong peer at address";
return -1;
}
......@@ -1123,8 +1123,8 @@ static void fail_protocol(struct ceph_connection *con)
static int process_connect(struct ceph_connection *con)
{
u64 sup_feat = CEPH_FEATURE_SUPPORTED_CLIENT;
u64 req_feat = CEPH_FEATURE_REQUIRED_CLIENT;
u64 sup_feat = CEPH_FEATURE_SUPPORTED;
u64 req_feat = CEPH_FEATURE_REQUIRED;
u64 server_feat = le64_to_cpu(con->in_reply.features);
dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
......@@ -1302,8 +1302,8 @@ static void process_ack(struct ceph_connection *con)
static int read_partial_message_section(struct ceph_connection *con,
struct kvec *section, unsigned int sec_len,
u32 *crc)
struct kvec *section,
unsigned int sec_len, u32 *crc)
{
int left;
int ret;
......@@ -1434,7 +1434,8 @@ static int read_partial_message(struct ceph_connection *con)
/* middle */
if (m->middle) {
ret = read_partial_message_section(con, &m->middle->vec, middle_len,
ret = read_partial_message_section(con, &m->middle->vec,
middle_len,
&con->in_middle_crc);
if (ret <= 0)
return ret;
......@@ -1920,7 +1921,7 @@ static void ceph_fault(struct ceph_connection *con)
/*
* in case we faulted due to authentication, invalidate our
* current tickets so that we can get new ones.
*/
*/
if (con->auth_retry && con->ops->invalidate_authorizer) {
dout("calling invalidate_authorizer()\n");
con->ops->invalidate_authorizer(con);
......
......@@ -349,7 +349,7 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc,
}
/*
* statfs
* generic requests (e.g., statfs, poolop)
*/
static struct ceph_mon_generic_request *__lookup_generic_req(
struct ceph_mon_client *monc, u64 tid)
......@@ -442,6 +442,35 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
return m;
}
static int do_generic_request(struct ceph_mon_client *monc,
struct ceph_mon_generic_request *req)
{
int err;
/* register request */
mutex_lock(&monc->mutex);
req->tid = ++monc->last_tid;
req->request->hdr.tid = cpu_to_le64(req->tid);
__insert_generic_request(monc, req);
monc->num_generic_requests++;
ceph_con_send(monc->con, ceph_msg_get(req->request));
mutex_unlock(&monc->mutex);
err = wait_for_completion_interruptible(&req->completion);
mutex_lock(&monc->mutex);
rb_erase(&req->node, &monc->generic_request_tree);
monc->num_generic_requests--;
mutex_unlock(&monc->mutex);
if (!err)
err = req->result;
return err;
}
/*
* statfs
*/
static void handle_statfs_reply(struct ceph_mon_client *monc,
struct ceph_msg *msg)
{
......@@ -468,7 +497,7 @@ static void handle_statfs_reply(struct ceph_mon_client *monc,
return;
bad:
pr_err("corrupt generic reply, no tid\n");
pr_err("corrupt generic reply, tid %llu\n", tid);
ceph_msg_dump(msg);
}
......@@ -487,6 +516,7 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
kref_init(&req->kref);
req->buf = buf;
req->buf_len = sizeof(*buf);
init_completion(&req->completion);
err = -ENOMEM;
......@@ -504,33 +534,134 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
h->monhdr.session_mon_tid = 0;
h->fsid = monc->monmap->fsid;
/* register request */
mutex_lock(&monc->mutex);
req->tid = ++monc->last_tid;
req->request->hdr.tid = cpu_to_le64(req->tid);
__insert_generic_request(monc, req);
monc->num_generic_requests++;
mutex_unlock(&monc->mutex);
err = do_generic_request(monc, req);
/* send request and wait */
ceph_con_send(monc->con, ceph_msg_get(req->request));
err = wait_for_completion_interruptible(&req->completion);
out:
kref_put(&req->kref, release_generic_request);
return err;
}
/*
* pool ops
*/
static int get_poolop_reply_buf(const char *src, size_t src_len,
char *dst, size_t dst_len)
{
u32 buf_len;
if (src_len != sizeof(u32) + dst_len)
return -EINVAL;
buf_len = le32_to_cpu(*(u32 *)src);
if (buf_len != dst_len)
return -EINVAL;
memcpy(dst, src + sizeof(u32), dst_len);
return 0;
}
static void handle_poolop_reply(struct ceph_mon_client *monc,
struct ceph_msg *msg)
{
struct ceph_mon_generic_request *req;
struct ceph_mon_poolop_reply *reply = msg->front.iov_base;
u64 tid = le64_to_cpu(msg->hdr.tid);
if (msg->front.iov_len < sizeof(*reply))
goto bad;
dout("handle_poolop_reply %p tid %llu\n", msg, tid);
mutex_lock(&monc->mutex);
rb_erase(&req->node, &monc->generic_request_tree);
monc->num_generic_requests--;
req = __lookup_generic_req(monc, tid);
if (req) {
if (req->buf_len &&
get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply),
msg->front.iov_len - sizeof(*reply),
req->buf, req->buf_len) < 0) {
mutex_unlock(&monc->mutex);
goto bad;
}
req->result = le32_to_cpu(reply->reply_code);
get_generic_request(req);
}
mutex_unlock(&monc->mutex);
if (req) {
complete(&req->completion);
put_generic_request(req);
}
return;
if (!err)
err = req->result;
bad:
pr_err("corrupt generic reply, tid %llu\n", tid);
ceph_msg_dump(msg);
}
/*
* Do a synchronous pool op.
*/
int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
u32 pool, u64 snapid,
char *buf, int len)
{
struct ceph_mon_generic_request *req;
struct ceph_mon_poolop *h;
int err;
req = kzalloc(sizeof(*req), GFP_NOFS);
if (!req)
return -ENOMEM;
kref_init(&req->kref);
req->buf = buf;
req->buf_len = len;
init_completion(&req->completion);
err = -ENOMEM;
req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS);
if (!req->request)
goto out;
req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS);
if (!req->reply)
goto out;
/* fill out request */
req->request->hdr.version = cpu_to_le16(2);
h = req->request->front.iov_base;
h->monhdr.have_version = 0;
h->monhdr.session_mon = cpu_to_le16(-1);
h->monhdr.session_mon_tid = 0;
h->fsid = monc->monmap->fsid;
h->pool = cpu_to_le32(pool);
h->op = cpu_to_le32(op);
h->auid = 0;
h->snapid = cpu_to_le64(snapid);
h->name_len = 0;
err = do_generic_request(monc, req);
out:
kref_put(&req->kref, release_generic_request);
return err;
}
int ceph_monc_create_snapid(struct ceph_mon_client *monc,
u32 pool, u64 *snapid)
{
return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
pool, 0, (char *)snapid, sizeof(*snapid));
}
int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
u32 pool, u64 snapid)
{
return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
pool, snapid, 0, 0);
}
/*
* Resend pending statfs requests.
* Resend pending generic requests.
*/
static void __resend_generic_request(struct ceph_mon_client *monc)
{
......@@ -783,6 +914,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
handle_statfs_reply(monc, msg);
break;
case CEPH_MSG_POOLOP_REPLY:
handle_poolop_reply(monc, msg);
break;
case CEPH_MSG_MON_MAP:
ceph_monc_handle_map(monc, msg);
break;
......@@ -820,6 +955,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
case CEPH_MSG_MON_SUBSCRIBE_ACK:
m = ceph_msg_get(monc->m_subscribe_ack);
break;
case CEPH_MSG_POOLOP_REPLY:
case CEPH_MSG_STATFS_REPLY:
return get_generic_reply(con, hdr, skip);
case CEPH_MSG_AUTH_REPLY:
......
......@@ -50,6 +50,7 @@ struct ceph_mon_generic_request {
struct rb_node node;
int result;
void *buf;
int buf_len;
struct completion completion;
struct ceph_msg *request; /* original request */
struct ceph_msg *reply; /* and reply */
......@@ -111,6 +112,10 @@ extern int ceph_monc_open_session(struct ceph_mon_client *monc);
extern int ceph_monc_validate_auth(struct ceph_mon_client *monc);
extern int ceph_monc_create_snapid(struct ceph_mon_client *monc,
u32 pool, u64 *snapid);
extern int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
u32 pool, u64 snapid);
#endif
#ifndef __MSGR_H
#define __MSGR_H
#ifndef CEPH_MSGR_H
#define CEPH_MSGR_H
/*
* Data types for message passing layer used by Ceph.
......
......@@ -1276,8 +1276,6 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
/* it may be a short read due to an object boundary */
req->r_pages = pages;
num_pages = calc_pages_for(off, *plen);
req->r_num_pages = num_pages;
dout("readpages final extent is %llu~%llu (%d pages)\n",
off, *plen, req->r_num_pages);
......@@ -1319,7 +1317,6 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
/* it may be a short write due to an object boundary */
req->r_pages = pages;
req->r_num_pages = calc_pages_for(off, len);
dout("writepages %llu~%llu (%d pages)\n", off, len,
req->r_num_pages);
......@@ -1476,8 +1473,8 @@ static void put_osd_con(struct ceph_connection *con)
* authentication
*/
static int get_authorizer(struct ceph_connection *con,
void **buf, int *len, int *proto,
void **reply_buf, int *reply_len, int force_new)
void **buf, int *len, int *proto,
void **reply_buf, int *reply_len, int force_new)
{
struct ceph_osd *o = con->private;
struct ceph_osd_client *osdc = o->o_osdc;
......@@ -1497,7 +1494,7 @@ static int get_authorizer(struct ceph_connection *con,
&o->o_authorizer_reply_buf,
&o->o_authorizer_reply_buf_len);
if (ret)
return ret;
return ret;
}
*proto = ac->protocol;
......
......@@ -424,12 +424,30 @@ static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
kfree(pi);
}
void __decode_pool(void **p, struct ceph_pg_pool_info *pi)
static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
{
unsigned n, m;
ceph_decode_copy(p, &pi->v, sizeof(pi->v));
calc_pg_masks(pi);
*p += le32_to_cpu(pi->v.num_snaps) * sizeof(u64);
/* num_snaps * snap_info_t */
n = le32_to_cpu(pi->v.num_snaps);
while (n--) {
ceph_decode_need(p, end, sizeof(u64) + 1 + sizeof(u64) +
sizeof(struct ceph_timespec), bad);
*p += sizeof(u64) + /* key */
1 + sizeof(u64) + /* u8, snapid */
sizeof(struct ceph_timespec);
m = ceph_decode_32(p); /* snap name */
*p += m;
}
*p += le32_to_cpu(pi->v.num_removed_snap_intervals) * sizeof(u64) * 2;
return 0;
bad:
return -EINVAL;
}
static int __decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
......@@ -571,7 +589,9 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
kfree(pi);
goto bad;
}
__decode_pool(p, pi);
err = __decode_pool(p, end, pi);
if (err < 0)
goto bad;
__insert_pg_pool(&map->pg_pools, pi);
}
......@@ -760,7 +780,9 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
pi->id = pool;
__insert_pg_pool(&map->pg_pools, pi);
}
__decode_pool(p, pi);
err = __decode_pool(p, end, pi);
if (err < 0)
goto bad;
}
if (version >= 5 && __decode_pool_names(p, end, map) < 0)
goto bad;
......@@ -833,7 +855,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
node)->pgid, pgid) <= 0) {
struct ceph_pg_mapping *cur =
rb_entry(rbp, struct ceph_pg_mapping, node);
rbp = rb_next(rbp);
dout(" removed pg_temp %llx\n", *(u64 *)&cur->pgid);
rb_erase(&cur->node, &map->pg_temp);
......@@ -1026,8 +1048,9 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
ruleno = crush_find_rule(osdmap->crush, pool->v.crush_ruleset,
pool->v.type, pool->v.size);
if (ruleno < 0) {
pr_err("no crush rule pool %d type %d size %d\n",
poolid, pool->v.type, pool->v.size);
pr_err("no crush rule pool %d ruleset %d type %d size %d\n",
poolid, pool->v.crush_ruleset, pool->v.type,
pool->v.size);
return NULL;
}
......
#ifndef __RADOS_H
#define __RADOS_H
#ifndef CEPH_RADOS_H
#define CEPH_RADOS_H
/*
* Data types for the Ceph distributed object storage layer RADOS
......@@ -203,6 +203,7 @@ enum {
CEPH_OSD_OP_TMAPGET = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 12,
CEPH_OSD_OP_CREATE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 13,
CEPH_OSD_OP_ROLLBACK= CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 14,
/** attrs **/
/* read */
......@@ -272,6 +273,10 @@ static inline int ceph_osd_op_mode_modify(int op)
return (op & CEPH_OSD_OP_MODE) == CEPH_OSD_OP_MODE_WR;
}
/*
* note that the following tmap stuff is also defined in the ceph librados.h
* any modification here needs to be updated there
*/
#define CEPH_OSD_TMAP_HDR 'h'
#define CEPH_OSD_TMAP_SET 's'
#define CEPH_OSD_TMAP_RM 'r'
......@@ -297,6 +302,7 @@ enum {
CEPH_OSD_FLAG_PARALLELEXEC = 512, /* execute op in parallel */
CEPH_OSD_FLAG_PGOP = 1024, /* pg op, no object */
CEPH_OSD_FLAG_EXEC = 2048, /* op may exec */
CEPH_OSD_FLAG_EXEC_PUBLIC = 4096, /* op may exec (public) */
};
enum {
......@@ -350,6 +356,9 @@ struct ceph_osd_op {
struct {
__le64 cookie, count;
} __attribute__ ((packed)) pgls;
struct {
__le64 snapid;
} __attribute__ ((packed)) snap;
};
__le32 payload_len;
} __attribute__ ((packed));
......
......@@ -2,6 +2,7 @@
#include "ceph_debug.h"
#include <linux/backing-dev.h>
#include <linux/ctype.h>
#include <linux/fs.h>
#include <linux/inet.h>
#include <linux/in6.h>
......@@ -101,12 +102,21 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
}
static int ceph_syncfs(struct super_block *sb, int wait)
static int ceph_sync_fs(struct super_block *sb, int wait)
{
dout("sync_fs %d\n", wait);
struct ceph_client *client = ceph_sb_to_client(sb);
if (!wait) {
dout("sync_fs (non-blocking)\n");
ceph_flush_dirty_caps(&client->mdsc);
dout("sync_fs (non-blocking) done\n");
return 0;
}
dout("sync_fs (blocking)\n");
ceph_osdc_sync(&ceph_sb_to_client(sb)->osdc);
ceph_mdsc_sync(&ceph_sb_to_client(sb)->mdsc);
dout("sync_fs %d done\n", wait);
dout("sync_fs (blocking) done\n");
return 0;
}
......@@ -150,9 +160,7 @@ static int ceph_show_options(struct seq_file *m, struct vfsmount *mnt)
struct ceph_mount_args *args = client->mount_args;
if (args->flags & CEPH_OPT_FSID)
seq_printf(m, ",fsidmajor=%llu,fsidminor%llu",
le64_to_cpu(*(__le64 *)&args->fsid.fsid[0]),
le64_to_cpu(*(__le64 *)&args->fsid.fsid[8]));
seq_printf(m, ",fsid=%pU", &args->fsid);
if (args->flags & CEPH_OPT_NOSHARE)
seq_puts(m, ",noshare");
if (args->flags & CEPH_OPT_DIRSTAT)
......@@ -279,7 +287,7 @@ static const struct super_operations ceph_super_ops = {
.alloc_inode = ceph_alloc_inode,
.destroy_inode = ceph_destroy_inode,
.write_inode = ceph_write_inode,
.sync_fs = ceph_syncfs,
.sync_fs = ceph_sync_fs,
.put_super = ceph_put_super,
.show_options = ceph_show_options,
.statfs = ceph_statfs,
......@@ -322,9 +330,6 @@ const char *ceph_msg_type_name(int type)
* mount options
*/
enum {
Opt_fsidmajor,
Opt_fsidminor,
Opt_monport,
Opt_wsize,
Opt_rsize,
Opt_osdtimeout,
......@@ -339,6 +344,7 @@ enum {
Opt_congestion_kb,
Opt_last_int,
/* int args above */
Opt_fsid,
Opt_snapdirname,
Opt_name,
Opt_secret,
......@@ -355,9 +361,6 @@ enum {
};
static match_table_t arg_tokens = {
{Opt_fsidmajor, "fsidmajor=%ld"},
{Opt_fsidminor, "fsidminor=%ld"},
{Opt_monport, "monport=%d"},
{Opt_wsize, "wsize=%d"},
{Opt_rsize, "rsize=%d"},
{Opt_osdtimeout, "osdtimeout=%d"},
......@@ -371,6 +374,7 @@ static match_table_t arg_tokens = {
{Opt_readdir_max_bytes, "readdir_max_bytes=%d"},
{Opt_congestion_kb, "write_congestion_kb=%d"},
/* int args above */
{Opt_fsid, "fsid=%s"},
{Opt_snapdirname, "snapdirname=%s"},
{Opt_name, "name=%s"},
{Opt_secret, "secret=%s"},
......@@ -386,6 +390,36 @@ static match_table_t arg_tokens = {
{-1, NULL}
};
static int parse_fsid(const char *str, struct ceph_fsid *fsid)
{
int i = 0;
char tmp[3];
int err = -EINVAL;
int d;
dout("parse_fsid '%s'\n", str);
tmp[2] = 0;
while (*str && i < 16) {
if (ispunct(*str)) {
str++;
continue;
}
if (!isxdigit(str[0]) || !isxdigit(str[1]))
break;
tmp[0] = str[0];
tmp[1] = str[1];
if (sscanf(tmp, "%x", &d) < 1)
break;
fsid->fsid[i] = d & 0xff;
i++;
str += 2;
}
if (i == 16)
err = 0;
dout("parse_fsid ret %d got fsid %pU", err, fsid);
return err;
}
static struct ceph_mount_args *parse_mount_args(int flags, char *options,
const char *dev_name,
......@@ -469,12 +503,6 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
dout("got token %d\n", token);
}
switch (token) {
case Opt_fsidmajor:
*(__le64 *)&args->fsid.fsid[0] = cpu_to_le64(intval);
break;
case Opt_fsidminor:
*(__le64 *)&args->fsid.fsid[8] = cpu_to_le64(intval);
break;
case Opt_ip:
err = ceph_parse_ips(argstr[0].from,
argstr[0].to,
......@@ -485,6 +513,11 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
args->flags |= CEPH_OPT_MYIP;
break;
case Opt_fsid:
err = parse_fsid(argstr[0].from, &args->fsid);
if (err == 0)
args->flags |= CEPH_OPT_FSID;
break;
case Opt_snapdirname:
kfree(args->snapdir_name);
args->snapdir_name = kstrndup(argstr[0].from,
......@@ -515,6 +548,9 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
case Opt_osdkeepalivetimeout:
args->osd_keepalive_timeout = intval;
break;
case Opt_osd_idle_ttl:
args->osd_idle_ttl = intval;
break;
case Opt_mount_timeout:
args->mount_timeout = intval;
break;
......@@ -630,7 +666,6 @@ static struct ceph_client *ceph_create_client(struct ceph_mount_args *args)
/* caps */
client->min_caps = args->max_readdir;
ceph_adjust_min_caps(client->min_caps);
/* subsystems */
err = ceph_monc_init(&client->monc, client);
......@@ -680,8 +715,6 @@ static void ceph_destroy_client(struct ceph_client *client)
ceph_monc_stop(&client->monc);
ceph_adjust_min_caps(-client->min_caps);
ceph_debugfs_client_cleanup(client);
destroy_workqueue(client->wb_wq);
destroy_workqueue(client->pg_inv_wq);
......@@ -706,13 +739,13 @@ int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid)
{
if (client->have_fsid) {
if (ceph_fsid_compare(&client->fsid, fsid)) {
pr_err("bad fsid, had " FSID_FORMAT " got " FSID_FORMAT,
PR_FSID(&client->fsid), PR_FSID(fsid));
pr_err("bad fsid, had %pU got %pU",
&client->fsid, fsid);
return -1;
}
} else {
pr_info("client%lld fsid " FSID_FORMAT "\n",
client->monc.auth->global_id, PR_FSID(fsid));
pr_info("client%lld fsid %pU\n", client->monc.auth->global_id,
fsid);
memcpy(&client->fsid, fsid, sizeof(*fsid));
ceph_debugfs_client_init(client);
client->have_fsid = true;
......@@ -1043,8 +1076,6 @@ static int __init init_ceph(void)
if (ret)
goto out_msgr;
ceph_caps_init();
ret = register_filesystem(&ceph_fs_type);
if (ret)
goto out_icache;
......@@ -1069,7 +1100,6 @@ static void __exit exit_ceph(void)
{
dout("exit_ceph\n");
unregister_filesystem(&ceph_fs_type);
ceph_caps_finalize();
destroy_caches();
ceph_msgr_exit();
ceph_debugfs_cleanup();
......
......@@ -30,6 +30,12 @@
#define CEPH_BLOCK_SHIFT 20 /* 1 MB */
#define CEPH_BLOCK (1 << CEPH_BLOCK_SHIFT)
/*
* Supported features
*/
#define CEPH_FEATURE_SUPPORTED CEPH_FEATURE_NOSRCADDR | CEPH_FEATURE_FLOCK
#define CEPH_FEATURE_REQUIRED CEPH_FEATURE_NOSRCADDR
/*
* mount options
*/
......@@ -560,11 +566,13 @@ static inline int __ceph_caps_wanted(struct ceph_inode_info *ci)
/* what the mds thinks we want */
extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci);
extern void ceph_caps_init(void);
extern void ceph_caps_finalize(void);
extern void ceph_adjust_min_caps(int delta);
extern int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need);
extern int ceph_unreserve_caps(struct ceph_cap_reservation *ctx);
extern void ceph_caps_init(struct ceph_mds_client *mdsc);
extern void ceph_caps_finalize(struct ceph_mds_client *mdsc);
extern void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta);
extern int ceph_reserve_caps(struct ceph_mds_client *mdsc,
struct ceph_cap_reservation *ctx, int need);
extern int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
struct ceph_cap_reservation *ctx);
extern void ceph_reservation_status(struct ceph_client *client,
int *total, int *avail, int *used,
int *reserved, int *min);
......@@ -738,13 +746,6 @@ extern struct kmem_cache *ceph_file_cachep;
extern const char *ceph_msg_type_name(int type);
extern int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid);
#define FSID_FORMAT "%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-" \
"%02x%02x%02x%02x%02x%02x"
#define PR_FSID(f) (f)->fsid[0], (f)->fsid[1], (f)->fsid[2], (f)->fsid[3], \
(f)->fsid[4], (f)->fsid[5], (f)->fsid[6], (f)->fsid[7], \
(f)->fsid[8], (f)->fsid[9], (f)->fsid[10], (f)->fsid[11], \
(f)->fsid[12], (f)->fsid[13], (f)->fsid[14], (f)->fsid[15]
/* inode.c */
extern const struct inode_operations ceph_file_iops;
......@@ -806,13 +807,16 @@ static inline void ceph_remove_cap(struct ceph_cap *cap)
__ceph_remove_cap(cap);
spin_unlock(&inode->i_lock);
}
extern void ceph_put_cap(struct ceph_cap *cap);
extern void ceph_put_cap(struct ceph_mds_client *mdsc,
struct ceph_cap *cap);
extern void ceph_queue_caps_release(struct inode *inode);
extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc);
extern int ceph_fsync(struct file *file, int datasync);
extern void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session);
extern struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci,
int mds);
extern int ceph_get_cap_mds(struct inode *inode);
extern void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps);
extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
......@@ -857,7 +861,7 @@ extern void ceph_release_page_vector(struct page **pages, int num_pages);
/* dir.c */
extern const struct file_operations ceph_dir_fops;
extern const struct inode_operations ceph_dir_iops;
extern struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops,
extern const struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops,
ceph_snapdir_dentry_ops;
extern int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry);
......@@ -888,6 +892,14 @@ extern void ceph_debugfs_cleanup(void);
extern int ceph_debugfs_client_init(struct ceph_client *client);
extern void ceph_debugfs_client_cleanup(struct ceph_client *client);
/* locks.c */
extern int ceph_lock(struct file *file, int cmd, struct file_lock *fl);
extern int ceph_flock(struct file *file, int cmd, struct file_lock *fl);
extern void ceph_count_locks(struct inode *inode, int *p_num, int *f_num);
extern int ceph_encode_locks(struct inode *i, struct ceph_pagelist *p,
int p_locks, int f_locks);
extern int lock_to_ceph_filelock(struct file_lock *fl, struct ceph_filelock *c);
static inline struct inode *get_dentry_parent_inode(struct dentry *dentry)
{
if (dentry && dentry->d_parent)
......
......@@ -337,6 +337,8 @@ void __ceph_destroy_xattrs(struct ceph_inode_info *ci)
}
static int __build_xattrs(struct inode *inode)
__releases(inode->i_lock)
__acquires(inode->i_lock)
{
u32 namelen;
u32 numattr = 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment