Commit b97e9257 authored by Goldwyn Rodrigues's avatar Goldwyn Rodrigues

Use separate bitmaps for each nodes in the cluster

On-disk format:

0                    4k                     8k                    12k
-------------------------------------------------------------------
| idle                | md super            | bm super [0] + bits |
| bm bits[0, contd]   | bm super[1] + bits  | bm bits[1, contd]   |
| bm super[2] + bits  | bm bits [2, contd]  | bm super[3] + bits  |
| bm bits [3, contd]  |                     |                     |

Bitmap super has a field nodes, which defines the maximum number
of nodes the device can use. While reading the bitmap super, if
the cluster finds out that the number of nodes is > 0:
1. Requests the md-cluster module.
2. Calls md_cluster_ops->join(), which sets up clustering such as
   joining DLM lockspace.

Since the first time, the first bitmap is read. After the call
to the cluster_setup, the bitmap offset is adjusted and the
superblock is re-read. This also ensures the bitmap is read
the bitmap lock (when bitmap lock is introduced in later patches)

Questions:
1. cluster name is repeated in all bitmap supers. Is that okay?
Signed-off-by: default avatarGoldwyn Rodrigues <rgoldwyn@suse.com>
parent cf921cc1
...@@ -205,6 +205,10 @@ static int write_sb_page(struct bitmap *bitmap, struct page *page, int wait) ...@@ -205,6 +205,10 @@ static int write_sb_page(struct bitmap *bitmap, struct page *page, int wait)
struct block_device *bdev; struct block_device *bdev;
struct mddev *mddev = bitmap->mddev; struct mddev *mddev = bitmap->mddev;
struct bitmap_storage *store = &bitmap->storage; struct bitmap_storage *store = &bitmap->storage;
int node_offset = 0;
if (mddev_is_clustered(bitmap->mddev))
node_offset = bitmap->cluster_slot * store->file_pages;
while ((rdev = next_active_rdev(rdev, mddev)) != NULL) { while ((rdev = next_active_rdev(rdev, mddev)) != NULL) {
int size = PAGE_SIZE; int size = PAGE_SIZE;
...@@ -549,6 +553,7 @@ static int bitmap_read_sb(struct bitmap *bitmap) ...@@ -549,6 +553,7 @@ static int bitmap_read_sb(struct bitmap *bitmap)
unsigned long sectors_reserved = 0; unsigned long sectors_reserved = 0;
int err = -EINVAL; int err = -EINVAL;
struct page *sb_page; struct page *sb_page;
int cluster_setup_done = 0;
if (!bitmap->storage.file && !bitmap->mddev->bitmap_info.offset) { if (!bitmap->storage.file && !bitmap->mddev->bitmap_info.offset) {
chunksize = 128 * 1024 * 1024; chunksize = 128 * 1024 * 1024;
...@@ -564,6 +569,7 @@ static int bitmap_read_sb(struct bitmap *bitmap) ...@@ -564,6 +569,7 @@ static int bitmap_read_sb(struct bitmap *bitmap)
return -ENOMEM; return -ENOMEM;
bitmap->storage.sb_page = sb_page; bitmap->storage.sb_page = sb_page;
re_read:
if (bitmap->storage.file) { if (bitmap->storage.file) {
loff_t isize = i_size_read(bitmap->storage.file->f_mapping->host); loff_t isize = i_size_read(bitmap->storage.file->f_mapping->host);
int bytes = isize > PAGE_SIZE ? PAGE_SIZE : isize; int bytes = isize > PAGE_SIZE ? PAGE_SIZE : isize;
...@@ -579,6 +585,7 @@ static int bitmap_read_sb(struct bitmap *bitmap) ...@@ -579,6 +585,7 @@ static int bitmap_read_sb(struct bitmap *bitmap)
if (err) if (err)
return err; return err;
err = -EINVAL;
sb = kmap_atomic(sb_page); sb = kmap_atomic(sb_page);
chunksize = le32_to_cpu(sb->chunksize); chunksize = le32_to_cpu(sb->chunksize);
...@@ -586,6 +593,7 @@ static int bitmap_read_sb(struct bitmap *bitmap) ...@@ -586,6 +593,7 @@ static int bitmap_read_sb(struct bitmap *bitmap)
write_behind = le32_to_cpu(sb->write_behind); write_behind = le32_to_cpu(sb->write_behind);
sectors_reserved = le32_to_cpu(sb->sectors_reserved); sectors_reserved = le32_to_cpu(sb->sectors_reserved);
nodes = le32_to_cpu(sb->nodes); nodes = le32_to_cpu(sb->nodes);
strlcpy(bitmap->mddev->bitmap_info.cluster_name, sb->cluster_name, 64);
/* verify that the bitmap-specific fields are valid */ /* verify that the bitmap-specific fields are valid */
if (sb->magic != cpu_to_le32(BITMAP_MAGIC)) if (sb->magic != cpu_to_le32(BITMAP_MAGIC))
...@@ -622,7 +630,7 @@ static int bitmap_read_sb(struct bitmap *bitmap) ...@@ -622,7 +630,7 @@ static int bitmap_read_sb(struct bitmap *bitmap)
goto out; goto out;
} }
events = le64_to_cpu(sb->events); events = le64_to_cpu(sb->events);
if (events < bitmap->mddev->events) { if (!nodes && (events < bitmap->mddev->events)) {
printk(KERN_INFO printk(KERN_INFO
"%s: bitmap file is out of date (%llu < %llu) " "%s: bitmap file is out of date (%llu < %llu) "
"-- forcing full recovery\n", "-- forcing full recovery\n",
...@@ -639,8 +647,34 @@ static int bitmap_read_sb(struct bitmap *bitmap) ...@@ -639,8 +647,34 @@ static int bitmap_read_sb(struct bitmap *bitmap)
bitmap->events_cleared = le64_to_cpu(sb->events_cleared); bitmap->events_cleared = le64_to_cpu(sb->events_cleared);
strlcpy(bitmap->mddev->bitmap_info.cluster_name, sb->cluster_name, 64); strlcpy(bitmap->mddev->bitmap_info.cluster_name, sb->cluster_name, 64);
err = 0; err = 0;
out: out:
kunmap_atomic(sb); kunmap_atomic(sb);
if (nodes && !cluster_setup_done) {
sector_t bm_blocks;
bm_blocks = sector_div(bitmap->mddev->resync_max_sectors, (chunksize >> 9));
bm_blocks = bm_blocks << 3;
/* We have bitmap supers at 4k boundaries, hence this
* is hardcoded */
bm_blocks = DIV_ROUND_UP(bm_blocks, 4096);
err = md_setup_cluster(bitmap->mddev, nodes);
if (err) {
pr_err("%s: Could not setup cluster service (%d)\n",
bmname(bitmap), err);
goto out_no_sb;
}
bitmap->cluster_slot = md_cluster_ops->slot_number(bitmap->mddev);
bitmap->mddev->bitmap_info.offset +=
bitmap->cluster_slot * (bm_blocks << 3);
pr_info("%s:%d bm slot: %d offset: %llu\n", __func__, __LINE__,
bitmap->cluster_slot,
(unsigned long long)bitmap->mddev->bitmap_info.offset);
cluster_setup_done = 1;
goto re_read;
}
out_no_sb: out_no_sb:
if (test_bit(BITMAP_STALE, &bitmap->flags)) if (test_bit(BITMAP_STALE, &bitmap->flags))
bitmap->events_cleared = bitmap->mddev->events; bitmap->events_cleared = bitmap->mddev->events;
...@@ -651,8 +685,11 @@ static int bitmap_read_sb(struct bitmap *bitmap) ...@@ -651,8 +685,11 @@ static int bitmap_read_sb(struct bitmap *bitmap)
if (bitmap->mddev->bitmap_info.space == 0 || if (bitmap->mddev->bitmap_info.space == 0 ||
bitmap->mddev->bitmap_info.space > sectors_reserved) bitmap->mddev->bitmap_info.space > sectors_reserved)
bitmap->mddev->bitmap_info.space = sectors_reserved; bitmap->mddev->bitmap_info.space = sectors_reserved;
if (err) if (err) {
bitmap_print_sb(bitmap); bitmap_print_sb(bitmap);
if (cluster_setup_done)
md_cluster_stop(bitmap->mddev);
}
return err; return err;
} }
...@@ -697,9 +734,10 @@ static inline struct page *filemap_get_page(struct bitmap_storage *store, ...@@ -697,9 +734,10 @@ static inline struct page *filemap_get_page(struct bitmap_storage *store,
} }
static int bitmap_storage_alloc(struct bitmap_storage *store, static int bitmap_storage_alloc(struct bitmap_storage *store,
unsigned long chunks, int with_super) unsigned long chunks, int with_super,
int slot_number)
{ {
int pnum; int pnum, offset = 0;
unsigned long num_pages; unsigned long num_pages;
unsigned long bytes; unsigned long bytes;
...@@ -708,6 +746,7 @@ static int bitmap_storage_alloc(struct bitmap_storage *store, ...@@ -708,6 +746,7 @@ static int bitmap_storage_alloc(struct bitmap_storage *store,
bytes += sizeof(bitmap_super_t); bytes += sizeof(bitmap_super_t);
num_pages = DIV_ROUND_UP(bytes, PAGE_SIZE); num_pages = DIV_ROUND_UP(bytes, PAGE_SIZE);
offset = slot_number * (num_pages - 1);
store->filemap = kmalloc(sizeof(struct page *) store->filemap = kmalloc(sizeof(struct page *)
* num_pages, GFP_KERNEL); * num_pages, GFP_KERNEL);
...@@ -718,20 +757,22 @@ static int bitmap_storage_alloc(struct bitmap_storage *store, ...@@ -718,20 +757,22 @@ static int bitmap_storage_alloc(struct bitmap_storage *store,
store->sb_page = alloc_page(GFP_KERNEL|__GFP_ZERO); store->sb_page = alloc_page(GFP_KERNEL|__GFP_ZERO);
if (store->sb_page == NULL) if (store->sb_page == NULL)
return -ENOMEM; return -ENOMEM;
store->sb_page->index = 0;
} }
pnum = 0; pnum = 0;
if (store->sb_page) { if (store->sb_page) {
store->filemap[0] = store->sb_page; store->filemap[0] = store->sb_page;
pnum = 1; pnum = 1;
store->sb_page->index = offset;
} }
for ( ; pnum < num_pages; pnum++) { for ( ; pnum < num_pages; pnum++) {
store->filemap[pnum] = alloc_page(GFP_KERNEL|__GFP_ZERO); store->filemap[pnum] = alloc_page(GFP_KERNEL|__GFP_ZERO);
if (!store->filemap[pnum]) { if (!store->filemap[pnum]) {
store->file_pages = pnum; store->file_pages = pnum;
return -ENOMEM; return -ENOMEM;
} }
store->filemap[pnum]->index = pnum; store->filemap[pnum]->index = pnum + offset;
} }
store->file_pages = pnum; store->file_pages = pnum;
...@@ -940,7 +981,7 @@ static void bitmap_set_memory_bits(struct bitmap *bitmap, sector_t offset, int n ...@@ -940,7 +981,7 @@ static void bitmap_set_memory_bits(struct bitmap *bitmap, sector_t offset, int n
*/ */
static int bitmap_init_from_disk(struct bitmap *bitmap, sector_t start) static int bitmap_init_from_disk(struct bitmap *bitmap, sector_t start)
{ {
unsigned long i, chunks, index, oldindex, bit; unsigned long i, chunks, index, oldindex, bit, node_offset = 0;
struct page *page = NULL; struct page *page = NULL;
unsigned long bit_cnt = 0; unsigned long bit_cnt = 0;
struct file *file; struct file *file;
...@@ -986,6 +1027,9 @@ static int bitmap_init_from_disk(struct bitmap *bitmap, sector_t start) ...@@ -986,6 +1027,9 @@ static int bitmap_init_from_disk(struct bitmap *bitmap, sector_t start)
if (!bitmap->mddev->bitmap_info.external) if (!bitmap->mddev->bitmap_info.external)
offset = sizeof(bitmap_super_t); offset = sizeof(bitmap_super_t);
if (mddev_is_clustered(bitmap->mddev))
node_offset = bitmap->cluster_slot * (DIV_ROUND_UP(store->bytes, PAGE_SIZE));
for (i = 0; i < chunks; i++) { for (i = 0; i < chunks; i++) {
int b; int b;
index = file_page_index(&bitmap->storage, i); index = file_page_index(&bitmap->storage, i);
...@@ -1006,7 +1050,7 @@ static int bitmap_init_from_disk(struct bitmap *bitmap, sector_t start) ...@@ -1006,7 +1050,7 @@ static int bitmap_init_from_disk(struct bitmap *bitmap, sector_t start)
bitmap->mddev, bitmap->mddev,
bitmap->mddev->bitmap_info.offset, bitmap->mddev->bitmap_info.offset,
page, page,
index, count); index + node_offset, count);
if (ret) if (ret)
goto err; goto err;
...@@ -1212,7 +1256,6 @@ void bitmap_daemon_work(struct mddev *mddev) ...@@ -1212,7 +1256,6 @@ void bitmap_daemon_work(struct mddev *mddev)
j < bitmap->storage.file_pages j < bitmap->storage.file_pages
&& !test_bit(BITMAP_STALE, &bitmap->flags); && !test_bit(BITMAP_STALE, &bitmap->flags);
j++) { j++) {
if (test_page_attr(bitmap, j, if (test_page_attr(bitmap, j,
BITMAP_PAGE_DIRTY)) BITMAP_PAGE_DIRTY))
/* bitmap_unplug will handle the rest */ /* bitmap_unplug will handle the rest */
...@@ -1596,6 +1639,9 @@ static void bitmap_free(struct bitmap *bitmap) ...@@ -1596,6 +1639,9 @@ static void bitmap_free(struct bitmap *bitmap)
if (!bitmap) /* there was no bitmap */ if (!bitmap) /* there was no bitmap */
return; return;
if (mddev_is_clustered(bitmap->mddev) && bitmap->mddev->cluster_info)
md_cluster_stop(bitmap->mddev);
/* Shouldn't be needed - but just in case.... */ /* Shouldn't be needed - but just in case.... */
wait_event(bitmap->write_wait, wait_event(bitmap->write_wait,
atomic_read(&bitmap->pending_writes) == 0); atomic_read(&bitmap->pending_writes) == 0);
...@@ -1854,7 +1900,8 @@ int bitmap_resize(struct bitmap *bitmap, sector_t blocks, ...@@ -1854,7 +1900,8 @@ int bitmap_resize(struct bitmap *bitmap, sector_t blocks,
memset(&store, 0, sizeof(store)); memset(&store, 0, sizeof(store));
if (bitmap->mddev->bitmap_info.offset || bitmap->mddev->bitmap_info.file) if (bitmap->mddev->bitmap_info.offset || bitmap->mddev->bitmap_info.file)
ret = bitmap_storage_alloc(&store, chunks, ret = bitmap_storage_alloc(&store, chunks,
!bitmap->mddev->bitmap_info.external); !bitmap->mddev->bitmap_info.external,
bitmap->cluster_slot);
if (ret) if (ret)
goto err; goto err;
......
...@@ -227,6 +227,7 @@ struct bitmap { ...@@ -227,6 +227,7 @@ struct bitmap {
wait_queue_head_t behind_wait; wait_queue_head_t behind_wait;
struct kernfs_node *sysfs_can_clear; struct kernfs_node *sysfs_can_clear;
int cluster_slot; /* Slot offset for clustered env */
}; };
/* the bitmap API */ /* the bitmap API */
......
...@@ -196,6 +196,12 @@ static int join(struct mddev *mddev, int nodes) ...@@ -196,6 +196,12 @@ static int join(struct mddev *mddev, int nodes)
if (ret) if (ret)
goto err; goto err;
wait_for_completion(&cinfo->completion); wait_for_completion(&cinfo->completion);
if (nodes <= cinfo->slot_number) {
pr_err("md-cluster: Slot allotted(%d) greater than available slots(%d)", cinfo->slot_number - 1,
nodes);
ret = -ERANGE;
goto err;
}
cinfo->sb_lock = lockres_init(mddev, "cmd-super", cinfo->sb_lock = lockres_init(mddev, "cmd-super",
NULL, 0); NULL, 0);
if (!cinfo->sb_lock) { if (!cinfo->sb_lock) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment