Commit 7035cdf3 authored by Linus Torvalds's avatar Linus Torvalds

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull ceph updates from Sage Weil:
 "The bulk of this pull is a series from Alex that refactors and cleans
  up the RBD code to lay the groundwork for supporting the new image
  format and evolving feature set.  There are also some cleanups in
  libceph, and for ceph there's fixed validation of file striping
  layouts and a bugfix in the code handling a shrinking MDS cluster."

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (71 commits)
  ceph: avoid 32-bit page index overflow
  ceph: return EIO on invalid layout on GET_DATALOC ioctl
  rbd: BUG on invalid layout
  ceph: propagate layout error on osd request creation
  libceph: check for invalid mapping
  ceph: convert to use le32_add_cpu()
  ceph: Fix oops when handling mdsmap that decreases max_mds
  rbd: update remaining header fields for v2
  rbd: get snapshot name for a v2 image
  rbd: get the snapshot context for a v2 image
  rbd: get image features for a v2 image
  rbd: get the object prefix for a v2 rbd image
  rbd: add code to get the size of a v2 rbd image
  rbd: lay out header probe infrastructure
  rbd: encapsulate code that gets snapshot info
  rbd: add an rbd features field
  rbd: don't use index in __rbd_add_snap_dev()
  rbd: kill create_snap sysfs entry
  rbd: define rbd_dev_image_id()
  rbd: define some new format constants
  ...
parents 6432f212 6285bc23
......@@ -25,6 +25,10 @@ client_id
The ceph unique client id that was assigned for this specific session.
features
A hexadecimal encoding of the feature bits for this image.
major
The block device major number.
......@@ -33,6 +37,11 @@ name
The name of the rbd image.
image_id
The unique id for the rbd image. (For rbd image format 1
this is empty.)
pool
The name of the storage pool where this rbd image resides.
......@@ -57,12 +66,6 @@ current_snap
The current snapshot for which the device is mapped.
create_snap
Create a snapshot:
$ echo <snap-name> > /sys/bus/rbd/devices/<dev-id>/snap_create
snap_*
A directory per each snapshot
......@@ -79,4 +82,7 @@ snap_size
The size of the image when this snapshot was taken.
snap_features
A hexadecimal encoding of the feature bits for this snapshot.
This diff is collapsed.
......@@ -15,15 +15,30 @@
#include <linux/types.h>
/* For format version 2, rbd image 'foo' consists of objects
* rbd_id.foo - id of image
* rbd_header.<id> - image metadata
* rbd_data.<id>.0000000000000000
* rbd_data.<id>.0000000000000001
* ... - data
* Clients do not access header data directly in rbd format 2.
*/
#define RBD_HEADER_PREFIX "rbd_header."
#define RBD_DATA_PREFIX "rbd_data."
#define RBD_ID_PREFIX "rbd_id."
/*
* rbd image 'foo' consists of objects
* foo.rbd - image metadata
* foo.00000000
* foo.00000001
* ... - data
* For format version 1, rbd image 'foo' consists of objects
* foo.rbd - image metadata
* rb.<idhi>.<idlo>.00000000
* rb.<idhi>.<idlo>.00000001
* ... - data
* There is no notion of a persistent image id in rbd format 1.
*/
#define RBD_SUFFIX ".rbd"
#define RBD_DIRECTORY "rbd_directory"
#define RBD_INFO "rbd_info"
......@@ -47,7 +62,7 @@ struct rbd_image_snap_ondisk {
struct rbd_image_header_ondisk {
char text[40];
char block_name[24];
char object_prefix[24];
char signature[4];
char version[8];
struct {
......
......@@ -205,7 +205,7 @@ static int readpage_nounlock(struct file *filp, struct page *page)
dout("readpage inode %p file %p page %p index %lu\n",
inode, filp, page, page->index);
err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
page->index << PAGE_CACHE_SHIFT, &len,
(u64) page_offset(page), &len,
ci->i_truncate_seq, ci->i_truncate_size,
&page, 1, 0);
if (err == -ENOENT)
......@@ -286,7 +286,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
int nr_pages = 0;
int ret;
off = page->index << PAGE_CACHE_SHIFT;
off = (u64) page_offset(page);
/* count pages */
next_index = page->index;
......@@ -308,8 +308,8 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
NULL, 0,
ci->i_truncate_seq, ci->i_truncate_size,
NULL, false, 1, 0);
if (!req)
return -ENOMEM;
if (IS_ERR(req))
return PTR_ERR(req);
/* build page vector */
nr_pages = len >> PAGE_CACHE_SHIFT;
......@@ -426,7 +426,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
struct ceph_inode_info *ci;
struct ceph_fs_client *fsc;
struct ceph_osd_client *osdc;
loff_t page_off = page->index << PAGE_CACHE_SHIFT;
loff_t page_off = page_offset(page);
int len = PAGE_CACHE_SIZE;
loff_t i_size;
int err = 0;
......@@ -817,8 +817,7 @@ static int ceph_writepages_start(struct address_space *mapping,
/* ok */
if (locked_pages == 0) {
/* prepare async write request */
offset = (unsigned long long)page->index
<< PAGE_CACHE_SHIFT;
offset = (u64) page_offset(page);
len = wsize;
req = ceph_osdc_new_request(&fsc->client->osdc,
&ci->i_layout,
......@@ -832,8 +831,8 @@ static int ceph_writepages_start(struct address_space *mapping,
ci->i_truncate_size,
&inode->i_mtime, true, 1, 0);
if (!req) {
rc = -ENOMEM;
if (IS_ERR(req)) {
rc = PTR_ERR(req);
unlock_page(page);
break;
}
......@@ -1180,7 +1179,7 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
struct inode *inode = vma->vm_file->f_dentry->d_inode;
struct page *page = vmf->page;
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
loff_t off = page->index << PAGE_CACHE_SHIFT;
loff_t off = page_offset(page);
loff_t size, len;
int ret;
......
......@@ -1005,7 +1005,7 @@ static void __queue_cap_release(struct ceph_mds_session *session,
BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE);
head = msg->front.iov_base;
head->num = cpu_to_le32(le32_to_cpu(head->num) + 1);
le32_add_cpu(&head->num, 1);
item = msg->front.iov_base + msg->front.iov_len;
item->ino = cpu_to_le64(ino);
item->cap_id = cpu_to_le64(cap_id);
......
......@@ -536,8 +536,8 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
do_sync,
ci->i_truncate_seq, ci->i_truncate_size,
&mtime, false, 2, page_align);
if (!req)
return -ENOMEM;
if (IS_ERR(req))
return PTR_ERR(req);
if (file->f_flags & O_DIRECT) {
pages = ceph_get_direct_page_vector(data, num_pages, false);
......
......@@ -187,14 +187,18 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
u64 tmp;
struct ceph_object_layout ol;
struct ceph_pg pgid;
int r;
/* copy and validate */
if (copy_from_user(&dl, arg, sizeof(dl)))
return -EFAULT;
down_read(&osdc->map_sem);
ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, &len,
&dl.object_no, &dl.object_offset, &olen);
r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, &len,
&dl.object_no, &dl.object_offset,
&olen);
if (r < 0)
return -EIO;
dl.file_offset -= dl.object_offset;
dl.object_size = ceph_file_layout_object_size(ci->i_layout);
dl.block_size = ceph_file_layout_su(ci->i_layout);
......
......@@ -2625,7 +2625,8 @@ static void check_new_map(struct ceph_mds_client *mdsc,
ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
session_state_name(s->s_state));
if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
if (i >= newmap->m_max_mds ||
memcmp(ceph_mdsmap_get_addr(oldmap, i),
ceph_mdsmap_get_addr(newmap, i),
sizeof(struct ceph_entity_addr))) {
if (s->s_state == CEPH_MDS_SESSION_OPENING) {
......
......@@ -307,7 +307,10 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
{
struct ceph_mount_options *fsopt;
const char *dev_name_end;
int err = -ENOMEM;
int err;
if (!dev_name || !*dev_name)
return -EINVAL;
fsopt = kzalloc(sizeof(*fsopt), GFP_KERNEL);
if (!fsopt)
......@@ -328,21 +331,33 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;
fsopt->congestion_kb = default_congestion_kb();
/* ip1[:port1][,ip2[:port2]...]:/subdir/in/fs */
/*
* Distinguish the server list from the path in "dev_name".
* Internally we do not include the leading '/' in the path.
*
* "dev_name" will look like:
* <server_spec>[,<server_spec>...]:[<path>]
* where
* <server_spec> is <ip>[:<port>]
* <path> is optional, but if present must begin with '/'
*/
dev_name_end = strchr(dev_name, '/');
if (dev_name_end) {
/* skip over leading '/' for path */
*path = dev_name_end + 1;
} else {
/* path is empty */
dev_name_end = dev_name + strlen(dev_name);
*path = dev_name_end;
}
err = -EINVAL;
if (!dev_name)
goto out;
*path = strstr(dev_name, ":/");
if (*path == NULL) {
pr_err("device name is missing path (no :/ in %s)\n",
dev_name_end--; /* back up to ':' separator */
if (*dev_name_end != ':') {
pr_err("device name is missing path (no : separator in %s)\n",
dev_name);
goto out;
}
dev_name_end = *path;
dout("device name '%.*s'\n", (int)(dev_name_end - dev_name), dev_name);
/* path on server */
*path += 2;
dout("server path '%s'\n", *path);
*popt = ceph_parse_options(options, dev_name, dev_name_end,
......
......@@ -71,7 +71,6 @@ struct ceph_mon_client {
int cur_mon; /* last monitor i contacted */
unsigned long sub_sent, sub_renew_after;
struct ceph_connection con;
bool have_fsid;
/* pending generic requests */
struct rb_root generic_request_tree;
......
......@@ -207,7 +207,7 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc,
extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
struct ceph_msg *msg);
extern void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
extern int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
struct ceph_file_layout *layout,
u64 snapid,
u64 off, u64 *plen, u64 *bno,
......
......@@ -109,9 +109,9 @@ extern struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
extern void ceph_osdmap_destroy(struct ceph_osdmap *map);
/* calculate mapping of a file extent to an object */
extern void ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
u64 off, u64 *plen,
u64 *bno, u64 *oxoff, u64 *oxlen);
extern int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
u64 off, u64 *plen,
u64 *bno, u64 *oxoff, u64 *oxlen);
/* calculate mapping of object to a placement group */
extern int ceph_calc_object_layout(struct ceph_object_layout *ol,
......
......@@ -637,7 +637,7 @@ static void handle_poolop_reply(struct ceph_mon_client *monc,
/*
* Do a synchronous pool op.
*/
int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
static int do_poolop(struct ceph_mon_client *monc, u32 op,
u32 pool, u64 snapid,
char *buf, int len)
{
......@@ -687,7 +687,7 @@ int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
int ceph_monc_create_snapid(struct ceph_mon_client *monc,
u32 pool, u64 *snapid)
{
return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
return do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
pool, 0, (char *)snapid, sizeof(*snapid));
}
......@@ -696,7 +696,7 @@ EXPORT_SYMBOL(ceph_monc_create_snapid);
int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
u32 pool, u64 snapid)
{
return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
return do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
pool, snapid, 0, 0);
}
......@@ -769,7 +769,6 @@ static int build_initial_monmap(struct ceph_mon_client *monc)
monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
}
monc->monmap->num_mon = num_mon;
monc->have_fsid = false;
return 0;
}
......
......@@ -52,7 +52,7 @@ static int op_has_extent(int op)
op == CEPH_OSD_OP_WRITE);
}
void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
struct ceph_file_layout *layout,
u64 snapid,
u64 off, u64 *plen, u64 *bno,
......@@ -62,12 +62,15 @@ void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
u64 orig_len = *plen;
u64 objoff, objlen; /* extent in object */
int r;
reqhead->snapid = cpu_to_le64(snapid);
/* object extent? */
ceph_calc_file_object_mapping(layout, off, plen, bno,
&objoff, &objlen);
r = ceph_calc_file_object_mapping(layout, off, plen, bno,
&objoff, &objlen);
if (r < 0)
return r;
if (*plen < orig_len)
dout(" skipping last %llu, final file extent %llu~%llu\n",
orig_len - *plen, off, *plen);
......@@ -83,7 +86,7 @@ void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
*bno, objoff, objlen, req->r_num_pages);
return 0;
}
EXPORT_SYMBOL(ceph_calc_raw_layout);
......@@ -112,20 +115,25 @@ EXPORT_SYMBOL(ceph_calc_raw_layout);
*
* fill osd op in request message.
*/
static void calc_layout(struct ceph_osd_client *osdc,
struct ceph_vino vino,
struct ceph_file_layout *layout,
u64 off, u64 *plen,
struct ceph_osd_request *req,
struct ceph_osd_req_op *op)
static int calc_layout(struct ceph_osd_client *osdc,
struct ceph_vino vino,
struct ceph_file_layout *layout,
u64 off, u64 *plen,
struct ceph_osd_request *req,
struct ceph_osd_req_op *op)
{
u64 bno;
int r;
ceph_calc_raw_layout(osdc, layout, vino.snap, off,
plen, &bno, req, op);
r = ceph_calc_raw_layout(osdc, layout, vino.snap, off,
plen, &bno, req, op);
if (r < 0)
return r;
snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
req->r_oid_len = strlen(req->r_oid);
return r;
}
/*
......@@ -456,6 +464,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
{
struct ceph_osd_req_op ops[3];
struct ceph_osd_request *req;
int r;
ops[0].op = opcode;
ops[0].extent.truncate_seq = truncate_seq;
......@@ -474,10 +483,12 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
use_mempool,
GFP_NOFS, NULL, NULL);
if (!req)
return NULL;
return ERR_PTR(-ENOMEM);
/* calculate max write size */
calc_layout(osdc, vino, layout, off, plen, req, ops);
r = calc_layout(osdc, vino, layout, off, plen, req, ops);
if (r < 0)
return ERR_PTR(r);
req->r_file_layout = *layout; /* keep a copy */
/* in case it differs from natural (file) alignment that
......@@ -1920,8 +1931,8 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
NULL, 0, truncate_seq, truncate_size, NULL,
false, 1, page_align);
if (!req)
return -ENOMEM;
if (IS_ERR(req))
return PTR_ERR(req);
/* it may be a short read due to an object boundary */
req->r_pages = pages;
......@@ -1963,8 +1974,8 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
snapc, do_sync,
truncate_seq, truncate_size, mtime,
nofail, 1, page_align);
if (!req)
return -ENOMEM;
if (IS_ERR(req))
return PTR_ERR(req);
/* it may be a short write due to an object boundary */
req->r_pages = pages;
......
......@@ -984,7 +984,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
* for now, we write only a single su, until we can
* pass a stride back to the caller.
*/
void ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
u64 off, u64 *plen,
u64 *ono,
u64 *oxoff, u64 *oxlen)
......@@ -998,11 +998,17 @@ void ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
dout("mapping %llu~%llu osize %u fl_su %u\n", off, *plen,
osize, su);
if (su == 0 || sc == 0)
goto invalid;
su_per_object = osize / su;
if (su_per_object == 0)
goto invalid;
dout("osize %u / su %u = su_per_object %u\n", osize, su,
su_per_object);
BUG_ON((su & ~PAGE_MASK) != 0);
if ((su & ~PAGE_MASK) != 0)
goto invalid;
/* bl = *off / su; */
t = off;
do_div(t, su);
......@@ -1030,6 +1036,14 @@ void ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
*plen = *oxlen;
dout(" obj extent %llu~%llu\n", *oxoff, *oxlen);
return 0;
invalid:
dout(" invalid layout\n");
*ono = 0;
*oxoff = 0;
*oxlen = 0;
return -EINVAL;
}
EXPORT_SYMBOL(ceph_calc_file_object_mapping);
......
#include <linux/module.h>
#include <linux/gfp.h>
#include <linux/pagemap.h>
......@@ -134,8 +133,8 @@ int ceph_pagelist_truncate(struct ceph_pagelist *pl,
ceph_pagelist_unmap_tail(pl);
while (pl->head.prev != c->page_lru) {
page = list_entry(pl->head.prev, struct page, lru);
list_del(&page->lru); /* remove from pagelist */
list_add_tail(&page->lru, &pl->free_list); /* add to reserve */
/* move from pagelist to reserve */
list_move_tail(&page->lru, &pl->free_list);
++pl->num_pages_free;
}
pl->room = c->room;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment