Commit 086dcbfa authored by Filipe Manana's avatar Filipe Manana Committed by David Sterba

btrfs: insert items in batches when logging a directory when possible

When logging a directory, we scan its directory items from the subvolume
tree and then copy one by one into the log tree. This is not efficient
since we generally are able to insert several items in a batch, using a
single btree operation for adding several items at once. The reason we
copy items one by one is that we must check if each item was previously
logged in the current transaction, and if it was we either overwrite it
or skip it in case its content did not change in the subvolume tree (this
can happen only for dir item keys, but not for dir index keys), and doing
such check makes it a bit cumbersome to attempt batch insertions.

However the chances for doing batch insertions are very frequent and
always happen when:

1) Logging the directory for the first time in the current transaction,
   as none of the items exist in the log tree yet;

2) Logging new dir index keys, because the offset for new dir index keys
   comes from a monotonically increasing counter. This means if we keep
   adding dentries to a directory, through creation of new files and
   sub-directories or by adding new links or renaming from some other
   directory into the one we are logging, all the new dir index keys
   have a new offset that is greater than the offset of any previously
   logged index keys, so we can insert them in batches into the log tree.

For dir item keys, since their offset depends on the result of an hash
function against the dentry's name, unless the directory is being logged
for the first time in the current transaction, the chances being able to
insert the items in the log using batches is pretty much random and not
predictable, as it depends on the names of the dentries, but still happens
often enough.

So change directory logging to keep track of consecutive directory items
that don't exist yet in the log and batch insert them.

This patch is part of a patchset comprised of the following 5 patches:

  btrfs: remove root argument from btrfs_log_inode() and its callees
  btrfs: remove redundant log root assignment from log_dir_items()
  btrfs: factor out the copying loop of dir items from log_dir_items()
  btrfs: insert items in batches when logging a directory when possible
  btrfs: keep track of the last logged keys when logging a directory

This is patch 4/5. The change log of the last patch (5/5) has performance
results.
Signed-off-by: default avatarFilipe Manana <fdmanana@suse.com>
Signed-off-by: default avatarDavid Sterba <dsterba@suse.com>
parent eb10d85e
...@@ -368,25 +368,11 @@ static int process_one_buffer(struct btrfs_root *log, ...@@ -368,25 +368,11 @@ static int process_one_buffer(struct btrfs_root *log,
return ret; return ret;
} }
/* static int do_overwrite_item(struct btrfs_trans_handle *trans,
* Item overwrite used by replay and tree logging. eb, slot and key all refer struct btrfs_root *root,
* to the src data we are copying out. struct btrfs_path *path,
* struct extent_buffer *eb, int slot,
* root is the tree we are copying into, and path is a scratch struct btrfs_key *key)
* path for use in this function (it should be released on entry and
* will be released on exit).
*
* If the key is already in the destination tree the existing item is
* overwritten. If the existing item isn't big enough, it is extended.
* If it is too large, it is truncated.
*
* If the key isn't in the destination yet, a new item is inserted.
*/
static noinline int overwrite_item(struct btrfs_trans_handle *trans,
struct btrfs_root *root,
struct btrfs_path *path,
struct extent_buffer *eb, int slot,
struct btrfs_key *key)
{ {
int ret; int ret;
u32 item_size; u32 item_size;
...@@ -403,10 +389,22 @@ static noinline int overwrite_item(struct btrfs_trans_handle *trans, ...@@ -403,10 +389,22 @@ static noinline int overwrite_item(struct btrfs_trans_handle *trans,
item_size = btrfs_item_size_nr(eb, slot); item_size = btrfs_item_size_nr(eb, slot);
src_ptr = btrfs_item_ptr_offset(eb, slot); src_ptr = btrfs_item_ptr_offset(eb, slot);
/* look for the key in the destination tree */ /* Our caller must have done a search for the key for us. */
ret = btrfs_search_slot(NULL, root, key, path, 0, 0); ASSERT(path->nodes[0] != NULL);
if (ret < 0)
return ret; /*
* And the slot must point to the exact key or the slot where the key
* should be at (the first item with a key greater than 'key')
*/
if (path->slots[0] < btrfs_header_nritems(path->nodes[0])) {
struct btrfs_key found_key;
btrfs_item_key_to_cpu(path->nodes[0], &found_key, path->slots[0]);
ret = btrfs_comp_cpu_keys(&found_key, key);
ASSERT(ret >= 0);
} else {
ret = 1;
}
if (ret == 0) { if (ret == 0) {
char *src_copy; char *src_copy;
...@@ -584,6 +582,36 @@ static noinline int overwrite_item(struct btrfs_trans_handle *trans, ...@@ -584,6 +582,36 @@ static noinline int overwrite_item(struct btrfs_trans_handle *trans,
return 0; return 0;
} }
/*
* Item overwrite used by replay and tree logging. eb, slot and key all refer
* to the src data we are copying out.
*
* root is the tree we are copying into, and path is a scratch
* path for use in this function (it should be released on entry and
* will be released on exit).
*
* If the key is already in the destination tree the existing item is
* overwritten. If the existing item isn't big enough, it is extended.
* If it is too large, it is truncated.
*
* If the key isn't in the destination yet, a new item is inserted.
*/
static int overwrite_item(struct btrfs_trans_handle *trans,
struct btrfs_root *root,
struct btrfs_path *path,
struct extent_buffer *eb, int slot,
struct btrfs_key *key)
{
int ret;
/* Look for the key in the destination tree. */
ret = btrfs_search_slot(NULL, root, key, path, 0, 0);
if (ret < 0)
return ret;
return do_overwrite_item(trans, root, path, eb, slot, key);
}
/* /*
* simple helper to read an inode off the disk from a given root * simple helper to read an inode off the disk from a given root
* This can only be called for subvolume roots and not for the log * This can only be called for subvolume roots and not for the log
...@@ -3632,6 +3660,68 @@ static noinline int insert_dir_log_key(struct btrfs_trans_handle *trans, ...@@ -3632,6 +3660,68 @@ static noinline int insert_dir_log_key(struct btrfs_trans_handle *trans,
return 0; return 0;
} }
static int flush_dir_items_batch(struct btrfs_trans_handle *trans,
struct btrfs_root *log,
struct extent_buffer *src,
struct btrfs_path *dst_path,
int start_slot,
int count)
{
char *ins_data = NULL;
struct btrfs_key *ins_keys;
u32 *ins_sizes;
struct extent_buffer *dst;
struct btrfs_key key;
u32 item_size;
int ret;
int i;
ASSERT(count > 0);
if (count == 1) {
btrfs_item_key_to_cpu(src, &key, start_slot);
item_size = btrfs_item_size_nr(src, start_slot);
ins_keys = &key;
ins_sizes = &item_size;
} else {
ins_data = kmalloc(count * sizeof(u32) +
count * sizeof(struct btrfs_key), GFP_NOFS);
if (!ins_data)
return -ENOMEM;
ins_sizes = (u32 *)ins_data;
ins_keys = (struct btrfs_key *)(ins_data + count * sizeof(u32));
for (i = 0; i < count; i++) {
const int slot = start_slot + i;
btrfs_item_key_to_cpu(src, &ins_keys[i], slot);
ins_sizes[i] = btrfs_item_size_nr(src, slot);
}
}
ret = btrfs_insert_empty_items(trans, log, dst_path, ins_keys, ins_sizes,
count);
if (ret)
goto out;
dst = dst_path->nodes[0];
for (i = 0; i < count; i++) {
unsigned long src_offset;
unsigned long dst_offset;
dst_offset = btrfs_item_ptr_offset(dst, dst_path->slots[0]);
src_offset = btrfs_item_ptr_offset(src, start_slot + i);
copy_extent_buffer(dst, src, dst_offset, src_offset, ins_sizes[i]);
dst_path->slots[0]++;
}
btrfs_release_path(dst_path);
out:
kfree(ins_data);
return ret;
}
static int process_dir_items_leaf(struct btrfs_trans_handle *trans, static int process_dir_items_leaf(struct btrfs_trans_handle *trans,
struct btrfs_inode *inode, struct btrfs_inode *inode,
struct btrfs_path *path, struct btrfs_path *path,
...@@ -3643,21 +3733,22 @@ static int process_dir_items_leaf(struct btrfs_trans_handle *trans, ...@@ -3643,21 +3733,22 @@ static int process_dir_items_leaf(struct btrfs_trans_handle *trans,
struct extent_buffer *src = path->nodes[0]; struct extent_buffer *src = path->nodes[0];
const int nritems = btrfs_header_nritems(src); const int nritems = btrfs_header_nritems(src);
const u64 ino = btrfs_ino(inode); const u64 ino = btrfs_ino(inode);
const bool inode_logged_before = inode_logged(trans, inode);
bool last_found = false;
int batch_start = 0;
int batch_size = 0;
int i; int i;
for (i = path->slots[0]; i < nritems; i++) { for (i = path->slots[0]; i < nritems; i++) {
struct btrfs_key key; struct btrfs_key key;
struct btrfs_dir_item *di;
int ret; int ret;
btrfs_item_key_to_cpu(src, &key, i); btrfs_item_key_to_cpu(src, &key, i);
if (key.objectid != ino || key.type != key_type) if (key.objectid != ino || key.type != key_type) {
return 1; last_found = true;
break;
ret = overwrite_item(trans, log, dst_path, src, i, &key); }
if (ret < 0)
return ret;
/* /*
* We must make sure that when we log a directory entry, the * We must make sure that when we log a directory entry, the
...@@ -3681,15 +3772,67 @@ static int process_dir_items_leaf(struct btrfs_trans_handle *trans, ...@@ -3681,15 +3772,67 @@ static int process_dir_items_leaf(struct btrfs_trans_handle *trans,
* never be decremented to the value BTRFS_EMPTY_DIR_SIZE, * never be decremented to the value BTRFS_EMPTY_DIR_SIZE,
* resulting in -ENOTEMPTY errors. * resulting in -ENOTEMPTY errors.
*/ */
di = btrfs_item_ptr(src, i, struct btrfs_dir_item); if (!ctx->log_new_dentries) {
btrfs_dir_item_key_to_cpu(src, di, &key); struct btrfs_dir_item *di;
if ((btrfs_dir_transid(src, di) == trans->transid || struct btrfs_key di_key;
btrfs_dir_type(src, di) == BTRFS_FT_DIR) &&
key.type != BTRFS_ROOT_ITEM_KEY) di = btrfs_item_ptr(src, i, struct btrfs_dir_item);
ctx->log_new_dentries = true; btrfs_dir_item_key_to_cpu(src, di, &di_key);
if ((btrfs_dir_transid(src, di) == trans->transid ||
btrfs_dir_type(src, di) == BTRFS_FT_DIR) &&
di_key.type != BTRFS_ROOT_ITEM_KEY)
ctx->log_new_dentries = true;
}
if (!inode_logged_before)
goto add_to_batch;
/*
* Check if the key was already logged before. If not we can add
* it to a batch for bulk insertion.
*/
ret = btrfs_search_slot(NULL, log, &key, dst_path, 0, 0);
if (ret < 0) {
return ret;
} else if (ret > 0) {
btrfs_release_path(dst_path);
goto add_to_batch;
}
/*
* Item exists in the log. Overwrite the item in the log if it
* has different content or do nothing if it has exactly the same
* content. And then flush the current batch if any - do it after
* overwriting the current item, or we would deadlock otherwise,
* since we are holding a path for the existing item.
*/
ret = do_overwrite_item(trans, log, dst_path, src, i, &key);
if (ret < 0)
return ret;
if (batch_size > 0) {
ret = flush_dir_items_batch(trans, log, src, dst_path,
batch_start, batch_size);
if (ret < 0)
return ret;
batch_size = 0;
}
continue;
add_to_batch:
if (batch_size == 0)
batch_start = i;
batch_size++;
} }
return 0; if (batch_size > 0) {
int ret;
ret = flush_dir_items_batch(trans, log, src, dst_path,
batch_start, batch_size);
if (ret < 0)
return ret;
}
return last_found ? 1 : 0;
} }
/* /*
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment