Commit 382f4581 authored by Martin Brandenburg's avatar Martin Brandenburg Committed by Mike Marshall

orangefs: rewrite readdir to fix several bugs

In the past, readdir assumed that the user buffer will be large enough
that all entries from the server will fit.  If this was not true,
entries would be skipped.

Since it works now, request 512 entries rather than 96 per server
operation.
Signed-off-by: default avatarMartin Brandenburg <martin@omnibond.com>
Signed-off-by: default avatarMike Marshall <hubcap@omnibond.com>
parent 17930b25
/* /*
* (C) 2001 Clemson University and The University of Chicago * Copyright 2017 Omnibond Systems, L.L.C.
*
* See COPYING in top-level directory.
*/ */
#include "protocol.h" #include "protocol.h"
...@@ -9,388 +7,244 @@ ...@@ -9,388 +7,244 @@
#include "orangefs-bufmap.h" #include "orangefs-bufmap.h"
/* /*
* decode routine used by kmod to deal with the blob sent from * There can be up to 512 directory entries. Each entry is encoded as
* userspace for readdirs. The blob contains zero or more of these * follows:
* sub-blobs: * 4 bytes: string size (n)
* __u32 - represents length of the character string that follows. * n bytes: string
* string - between 1 and ORANGEFS_NAME_MAX bytes long. * 1 byte: trailing zero
* padding - (if needed) to cause the __u32 plus the string to be * padding to 8 bytes
* eight byte aligned. * 16 bytes: khandle
* khandle - sizeof(khandle) bytes. * padding to 8 bytes
*/ */
static long decode_dirents(char *ptr, size_t size, #define MAX_DIRECTORY ((4 + 257 + 3 + 16)*512)
struct orangefs_readdir_response_s *readdir)
{
int i;
struct orangefs_readdir_response_s *rd =
(struct orangefs_readdir_response_s *) ptr;
char *buf = ptr;
int khandle_size = sizeof(struct orangefs_khandle);
size_t offset = offsetof(struct orangefs_readdir_response_s,
dirent_array);
/* 8 reflects eight byte alignment */
int smallest_blob = khandle_size + 8;
__u32 len;
int aligned_len;
int sizeof_u32 = sizeof(__u32);
long ret;
gossip_debug(GOSSIP_DIR_DEBUG, "%s: size:%zu:\n", __func__, size);
/* size is = offset on empty dirs, > offset on non-empty dirs... */ struct orangefs_dir {
if (size < offset) { __u64 token;
gossip_err("%s: size:%zu: offset:%zu:\n", void *directory;
__func__, size_t i, len;
size, int error;
offset); };
ret = -EINVAL;
goto out;
}
if ((size == offset) && (readdir->orangefs_dirent_outcount != 0)) {
gossip_err("%s: size:%zu: dirent_outcount:%d:\n",
__func__,
size,
readdir->orangefs_dirent_outcount);
ret = -EINVAL;
goto out;
}
readdir->token = rd->token;
readdir->orangefs_dirent_outcount = rd->orangefs_dirent_outcount;
readdir->dirent_array = kcalloc(readdir->orangefs_dirent_outcount,
sizeof(*readdir->dirent_array),
GFP_KERNEL);
if (readdir->dirent_array == NULL) {
gossip_err("%s: kcalloc failed.\n", __func__);
ret = -ENOMEM;
goto out;
}
buf += offset;
size -= offset;
for (i = 0; i < readdir->orangefs_dirent_outcount; i++) { /*
if (size < smallest_blob) { * The userspace component sends several directory entries of the
gossip_err("%s: size:%zu: smallest_blob:%d:\n", * following format. The first four bytes are the string length not
__func__, * including a trailing zero byte. This is followed by the string and a
size, * trailing zero padded to the next four byte boundry. This is followed
smallest_blob); * by the sixteen byte khandle padded to the next eight byte boundry.
ret = -EINVAL; *
goto free; * The trailer_buf starts with a struct orangefs_readdir_response_s
} * which must be skipped to get to the directory data.
*/
len = *(__u32 *)buf; static int orangefs_dir_more(struct orangefs_inode_s *oi,
if ((len < 1) || (len > ORANGEFS_NAME_MAX)) { struct orangefs_dir *od, struct dentry *dentry)
gossip_err("%s: len:%d:\n", __func__, len); {
ret = -EINVAL; const size_t offset =
goto free; sizeof(struct orangefs_readdir_response_s);
struct orangefs_readdir_response_s *resp;
struct orangefs_kernel_op_s *op;
int bufi, r;
op = op_alloc(ORANGEFS_VFS_OP_READDIR);
if (!op) {
od->error = -ENOMEM;
return -ENOMEM;
} }
gossip_debug(GOSSIP_DIR_DEBUG,
"%s: size:%zu: len:%d:\n",
__func__,
size,
len);
readdir->dirent_array[i].d_name = buf + sizeof_u32;
readdir->dirent_array[i].d_length = len;
/* /*
* Calculate "aligned" length of this string and its * Despite the badly named field, readdir does not use shared
* associated __u32 descriptor. * memory. However, there are a limited number of readdir
* slots, which must be allocated here. This flag simply tells
* the op scheduler to return the op here for retry.
*/ */
aligned_len = ((sizeof_u32 + len + 1) + 7) & ~7; op->uses_shared_memory = 1;
gossip_debug(GOSSIP_DIR_DEBUG, op->upcall.req.readdir.refn = oi->refn;
"%s: aligned_len:%d:\n", op->upcall.req.readdir.token = od->token;
__func__, op->upcall.req.readdir.max_dirent_count =
aligned_len); ORANGEFS_MAX_DIRENT_COUNT_READDIR;
/* again:
* The end of the blob should coincide with the end bufi = orangefs_readdir_index_get();
* of the last sub-blob. if (bufi < 0) {
*/ op_release(op);
if (size < aligned_len + khandle_size) { od->error = bufi;
gossip_err("%s: ran off the end of the blob.\n", return bufi;
__func__);
ret = -EINVAL;
goto free;
} }
size -= aligned_len + khandle_size;
buf += aligned_len; op->upcall.req.readdir.buf_index = bufi;
readdir->dirent_array[i].khandle = r = service_operation(op, "orangefs_readdir",
*(struct orangefs_khandle *) buf; get_interruptible_flag(dentry->d_inode));
buf += khandle_size;
}
ret = buf - ptr;
gossip_debug(GOSSIP_DIR_DEBUG, "%s: returning:%ld:\n", __func__, ret);
goto out;
free:
kfree(readdir->dirent_array);
readdir->dirent_array = NULL;
out: orangefs_readdir_index_put(bufi);
return ret;
if (op_state_purged(op)) {
if (r == -EAGAIN) {
vfree(op->downcall.trailer_buf);
goto again;
} else if (r == -EIO) {
vfree(op->downcall.trailer_buf);
op_release(op);
od->error = r;
return r;
}
}
if (r < 0) {
vfree(op->downcall.trailer_buf);
op_release(op);
od->error = r;
return r;
} else if (op->downcall.status) {
vfree(op->downcall.trailer_buf);
op_release(op);
od->error = op->downcall.status;
return op->downcall.status;
}
resp = (struct orangefs_readdir_response_s *)
op->downcall.trailer_buf;
od->token = resp->token;
if (od->len + op->downcall.trailer_size - offset <=
MAX_DIRECTORY) {
memcpy(od->directory + od->len,
op->downcall.trailer_buf + offset,
op->downcall.trailer_size - offset);
od->len += op->downcall.trailer_size - offset;
} else {
/* This limit was chosen based on protocol limits. */
gossip_err("orangefs_dir_more: userspace sent too much data\n");
vfree(op->downcall.trailer_buf);
op_release(op);
od->error = -EIO;
return -EIO;
}
vfree(op->downcall.trailer_buf);
op_release(op);
return 0;
} }
/* static int orangefs_dir_fill(struct orangefs_inode_s *oi,
* Read directory entries from an instance of an open directory. struct orangefs_dir *od, struct dentry *dentry,
*/ struct dir_context *ctx)
static int orangefs_readdir(struct file *file, struct dir_context *ctx)
{ {
int ret = 0; struct orangefs_khandle *khandle;
int buffer_index; __u32 *len, padlen;
char *s;
while (od->i < od->len) {
if (od->len < od->i + sizeof *len)
goto eio;
len = od->directory + od->i;
/* /*
* ptoken supports Orangefs' distributed directory logic, added * len is the size of the string itself. padlen is the
* in 2.9.2. * total size of the encoded string.
*/ */
__u64 *ptoken = file->private_data; padlen = (sizeof *len + *len + 1) +
__u64 pos = 0; (4 - (sizeof *len + *len + 1)%8)%8;
ino_t ino = 0; if (od->len < od->i + padlen + sizeof *khandle)
struct dentry *dentry = file->f_path.dentry; goto eio;
struct orangefs_kernel_op_s *new_op = NULL; s = od->directory + od->i + sizeof *len;
struct orangefs_inode_s *orangefs_inode = ORANGEFS_I(dentry->d_inode); if (s[*len] != 0)
struct orangefs_readdir_response_s readdir_response; goto eio;
void *dents_buf; khandle = od->directory + od->i + padlen;
int i = 0;
int len = 0; if (!dir_emit(ctx, s, *len,
ino_t current_ino = 0; orangefs_khandle_to_ino(khandle), DT_UNKNOWN))
char *current_entry = NULL;
long bytes_decoded;
gossip_debug(GOSSIP_DIR_DEBUG,
"%s: ctx->pos:%lld, ptoken = %llu\n",
__func__,
lld(ctx->pos),
llu(*ptoken));
pos = (__u64) ctx->pos;
/* are we done? */
if (pos == ORANGEFS_READDIR_END) {
gossip_debug(GOSSIP_DIR_DEBUG,
"Skipping to termination path\n");
return 0; return 0;
od->i += padlen + sizeof *khandle;
od->i = od->i + (8 - od->i%8)%8;
ctx->pos = 2 + od->i;
} }
BUG_ON(od->i > od->len);
return 0;
eio:
gossip_err("orangefs_dir_fill: userspace returns corrupt data\n");
od->error = -EIO;
return -EIO;
}
gossip_debug(GOSSIP_DIR_DEBUG, static int orangefs_dir_iterate(struct file *file,
"orangefs_readdir called on %pd (pos=%llu)\n", struct dir_context *ctx)
dentry, llu(pos)); {
struct orangefs_inode_s *oi;
memset(&readdir_response, 0, sizeof(readdir_response)); struct orangefs_dir *od;
struct dentry *dentry;
new_op = op_alloc(ORANGEFS_VFS_OP_READDIR); int r;
if (!new_op)
return -ENOMEM;
/*
* Only the indices are shared. No memory is actually shared, but the
* mechanism is used.
*/
new_op->uses_shared_memory = 1;
new_op->upcall.req.readdir.refn = orangefs_inode->refn;
new_op->upcall.req.readdir.max_dirent_count =
ORANGEFS_MAX_DIRENT_COUNT_READDIR;
gossip_debug(GOSSIP_DIR_DEBUG,
"%s: upcall.req.readdir.refn.khandle: %pU\n",
__func__,
&new_op->upcall.req.readdir.refn.khandle);
new_op->upcall.req.readdir.token = *ptoken;
get_new_buffer_index:
buffer_index = orangefs_readdir_index_get();
if (buffer_index < 0) {
ret = buffer_index;
gossip_lerr("orangefs_readdir: orangefs_readdir_index_get() failure (%d)\n",
ret);
goto out_free_op;
}
new_op->upcall.req.readdir.buf_index = buffer_index;
ret = service_operation(new_op,
"orangefs_readdir",
get_interruptible_flag(dentry->d_inode));
gossip_debug(GOSSIP_DIR_DEBUG,
"Readdir downcall status is %d. ret:%d\n",
new_op->downcall.status,
ret);
orangefs_readdir_index_put(buffer_index);
if (ret == -EAGAIN && op_state_purged(new_op)) {
/* Client-core indices are invalid after it restarted. */
gossip_debug(GOSSIP_DIR_DEBUG,
"%s: Getting new buffer_index for retry of readdir..\n",
__func__);
goto get_new_buffer_index;
}
if (ret == -EIO && op_state_purged(new_op)) {
gossip_err("%s: Client is down. Aborting readdir call.\n",
__func__);
goto out_free_op;
}
if (ret < 0 || new_op->downcall.status != 0) {
gossip_debug(GOSSIP_DIR_DEBUG,
"Readdir request failed. Status:%d\n",
new_op->downcall.status);
if (ret >= 0)
ret = new_op->downcall.status;
goto out_free_op;
}
dents_buf = new_op->downcall.trailer_buf;
if (dents_buf == NULL) {
gossip_err("Invalid NULL buffer in readdir response\n");
ret = -ENOMEM;
goto out_free_op;
}
bytes_decoded = decode_dirents(dents_buf, new_op->downcall.trailer_size,
&readdir_response);
if (bytes_decoded < 0) {
ret = bytes_decoded;
gossip_err("Could not decode readdir from buffer %d\n", ret);
goto out_vfree;
}
if (bytes_decoded != new_op->downcall.trailer_size) { dentry = file->f_path.dentry;
gossip_err("orangefs_readdir: # bytes decoded (%ld) " oi = ORANGEFS_I(dentry->d_inode);
"!= trailer size (%ld)\n", od = file->private_data;
bytes_decoded,
(long)new_op->downcall.trailer_size);
ret = -EINVAL;
goto out_destroy_handle;
}
/* if (od->error)
* orangefs doesn't actually store dot and dot-dot, but return od->error;
* we need to have them represented.
*/
if (pos == 0) {
ino = get_ino_from_khandle(dentry->d_inode);
gossip_debug(GOSSIP_DIR_DEBUG,
"%s: calling dir_emit of \".\" with pos = %llu\n",
__func__,
llu(pos));
ret = dir_emit(ctx, ".", 1, ino, DT_DIR);
pos += 1;
}
if (pos == 1) { if (ctx->pos == 0) {
ino = get_parent_ino_from_dentry(dentry); if (!dir_emit_dot(file, ctx))
gossip_debug(GOSSIP_DIR_DEBUG, return 0;
"%s: calling dir_emit of \"..\" with pos = %llu\n", ctx->pos++;
__func__,
llu(pos));
ret = dir_emit(ctx, "..", 2, ino, DT_DIR);
pos += 1;
} }
if (ctx->pos == 1) {
/* if (!dir_emit_dotdot(file, ctx))
* we stored ORANGEFS_ITERATE_NEXT in ctx->pos last time around return 0;
* to prevent "finding" dot and dot-dot on any iteration
* other than the first.
*/
if (ctx->pos == ORANGEFS_ITERATE_NEXT)
ctx->pos = 0;
gossip_debug(GOSSIP_DIR_DEBUG,
"%s: dirent_outcount:%d:\n",
__func__,
readdir_response.orangefs_dirent_outcount);
for (i = ctx->pos;
i < readdir_response.orangefs_dirent_outcount;
i++) {
len = readdir_response.dirent_array[i].d_length;
current_entry = readdir_response.dirent_array[i].d_name;
current_ino = orangefs_khandle_to_ino(
&readdir_response.dirent_array[i].khandle);
gossip_debug(GOSSIP_DIR_DEBUG,
"calling dir_emit for %s with len %d"
", ctx->pos %ld\n",
current_entry,
len,
(unsigned long)ctx->pos);
/*
* type is unknown. We don't return object type
* in the dirent_array. This leaves getdents
* clueless about type.
*/
ret =
dir_emit(ctx, current_entry, len, current_ino, DT_UNKNOWN);
if (!ret)
break;
ctx->pos++; ctx->pos++;
gossip_debug(GOSSIP_DIR_DEBUG,
"%s: ctx->pos:%lld\n",
__func__,
lld(ctx->pos));
} }
/* r = 0;
* we ran all the way through the last batch, set up for
* getting another batch... if (od->i < od->len) {
*/ r = orangefs_dir_fill(oi, od, dentry, ctx);
if (ret) { if (r)
*ptoken = readdir_response.token; return r;
ctx->pos = ORANGEFS_ITERATE_NEXT;
} }
/* if (od->token != ORANGEFS_READDIR_END) {
* Did we hit the end of the directory? r = orangefs_dir_more(oi, od, dentry);
*/ if (r)
if (readdir_response.token == ORANGEFS_READDIR_END) { return r;
gossip_debug(GOSSIP_DIR_DEBUG, r = orangefs_dir_fill(oi, od, dentry, ctx);
"End of dir detected; setting ctx->pos to ORANGEFS_READDIR_END.\n");
ctx->pos = ORANGEFS_READDIR_END;
} }
out_destroy_handle: return r;
/* kfree(NULL) is safe */
kfree(readdir_response.dirent_array);
out_vfree:
gossip_debug(GOSSIP_DIR_DEBUG, "vfree %p\n", dents_buf);
vfree(dents_buf);
out_free_op:
op_release(new_op);
gossip_debug(GOSSIP_DIR_DEBUG, "orangefs_readdir returning %d\n", ret);
return ret;
} }
static int orangefs_dir_open(struct inode *inode, struct file *file) static int orangefs_dir_open(struct inode *inode, struct file *file)
{ {
__u64 *ptoken; struct orangefs_dir *od;
file->private_data = kmalloc(sizeof(struct orangefs_dir),
file->private_data = kmalloc(sizeof(__u64), GFP_KERNEL); GFP_KERNEL);
if (!file->private_data) if (!file->private_data)
return -ENOMEM; return -ENOMEM;
od = file->private_data;
ptoken = file->private_data; od->token = ORANGEFS_READDIR_START;
*ptoken = ORANGEFS_READDIR_START; /*
* XXX: It seems wasteful to allocate such a large buffer for
* each request. Most will be much smaller.
*/
od->directory = alloc_pages_exact(MAX_DIRECTORY, GFP_KERNEL);
if (!od->directory) {
kfree(file->private_data);
return -ENOMEM;
}
od->i = 0;
od->len = 0;
od->error = 0;
return 0; return 0;
} }
static int orangefs_dir_release(struct inode *inode, struct file *file) static int orangefs_dir_release(struct inode *inode, struct file *file)
{ {
struct orangefs_dir *od = file->private_data;
orangefs_flush_inode(inode); orangefs_flush_inode(inode);
kfree(file->private_data); free_pages_exact(od->directory, MAX_DIRECTORY);
kfree(od);
return 0; return 0;
} }
/** ORANGEFS implementation of VFS directory operations */
const struct file_operations orangefs_dir_operations = { const struct file_operations orangefs_dir_operations = {
.read = generic_read_dir, .read = generic_read_dir,
.iterate = orangefs_readdir, .iterate = orangefs_dir_iterate,
.open = orangefs_dir_open, .open = orangefs_dir_open,
.release = orangefs_dir_release, .release = orangefs_dir_release
}; };
...@@ -40,16 +40,6 @@ struct orangefs_mkdir_response { ...@@ -40,16 +40,6 @@ struct orangefs_mkdir_response {
struct orangefs_object_kref refn; struct orangefs_object_kref refn;
}; };
/*
* duplication of some system interface structures so that I don't have
* to allocate extra memory
*/
struct orangefs_dirent {
char *d_name;
int d_length;
struct orangefs_khandle khandle;
};
struct orangefs_statfs_response { struct orangefs_statfs_response {
__s64 block_size; __s64 block_size;
__s64 blocks_total; __s64 blocks_total;
...@@ -131,12 +121,16 @@ struct orangefs_downcall_s { ...@@ -131,12 +121,16 @@ struct orangefs_downcall_s {
} resp; } resp;
}; };
/*
* The readdir response comes in the trailer. It is followed by the
* directory entries as described in dir.c.
*/
struct orangefs_readdir_response_s { struct orangefs_readdir_response_s {
__u64 token; __u64 token;
__u64 directory_version; __u64 directory_version;
__u32 __pad2; __u32 __pad2;
__u32 orangefs_dirent_outcount; __u32 orangefs_dirent_outcount;
struct orangefs_dirent *dirent_array;
}; };
#endif /* __DOWNCALL_H */ #endif /* __DOWNCALL_H */
...@@ -52,12 +52,7 @@ ...@@ -52,12 +52,7 @@
*/ */
#define ORANGEFS_MAX_DEBUG_STRING_LEN 0x00000800 #define ORANGEFS_MAX_DEBUG_STRING_LEN 0x00000800
/* #define ORANGEFS_MAX_DIRENT_COUNT_READDIR 512
* The maximum number of directory entries in a single request is 96.
* XXX: Why can this not be higher. The client-side code can handle up to 512.
* XXX: What happens if we expect more than the client can return?
*/
#define ORANGEFS_MAX_DIRENT_COUNT_READDIR 96
#include "upcall.h" #include "upcall.h"
#include "downcall.h" #include "downcall.h"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment