Commit b1e2d907 authored by Linus Torvalds's avatar Linus Torvalds

Merge branch 'upstream-linus' of git://oss.oracle.com/home/sourcebo/git/ocfs2

parents e0a5c578 81f2094a
...@@ -74,6 +74,7 @@ struct mlog_attribute { ...@@ -74,6 +74,7 @@ struct mlog_attribute {
#define define_mask(_name) { \ #define define_mask(_name) { \
.attr = { \ .attr = { \
.name = #_name, \ .name = #_name, \
.owner = THIS_MODULE, \
.mode = S_IRUGO | S_IWUSR, \ .mode = S_IRUGO | S_IWUSR, \
}, \ }, \
.mask = ML_##_name, \ .mask = ML_##_name, \
......
...@@ -256,7 +256,7 @@ extern struct mlog_bits mlog_and_bits, mlog_not_bits; ...@@ -256,7 +256,7 @@ extern struct mlog_bits mlog_and_bits, mlog_not_bits;
} \ } \
} while (0) } while (0)
#if (BITS_PER_LONG == 32) || defined(CONFIG_X86_64) #if (BITS_PER_LONG == 32) || defined(CONFIG_X86_64) || (defined(CONFIG_UML_X86) && defined(CONFIG_64BIT))
#define MLFi64 "lld" #define MLFi64 "lld"
#define MLFu64 "llu" #define MLFu64 "llu"
#define MLFx64 "llx" #define MLFx64 "llx"
......
...@@ -756,7 +756,7 @@ static int __init init_o2nm(void) ...@@ -756,7 +756,7 @@ static int __init init_o2nm(void)
if (!ocfs2_table_header) { if (!ocfs2_table_header) {
printk(KERN_ERR "nodemanager: unable to register sysctl\n"); printk(KERN_ERR "nodemanager: unable to register sysctl\n");
ret = -ENOMEM; /* or something. */ ret = -ENOMEM; /* or something. */
goto out; goto out_o2net;
} }
ret = o2net_register_hb_callbacks(); ret = o2net_register_hb_callbacks();
...@@ -780,6 +780,8 @@ static int __init init_o2nm(void) ...@@ -780,6 +780,8 @@ static int __init init_o2nm(void)
o2net_unregister_hb_callbacks(); o2net_unregister_hb_callbacks();
out_sysctl: out_sysctl:
unregister_sysctl_table(ocfs2_table_header); unregister_sysctl_table(ocfs2_table_header);
out_o2net:
o2net_exit();
out: out:
return ret; return ret;
} }
......
...@@ -1318,7 +1318,7 @@ static void o2net_start_connect(void *arg) ...@@ -1318,7 +1318,7 @@ static void o2net_start_connect(void *arg)
{ {
struct o2net_node *nn = arg; struct o2net_node *nn = arg;
struct o2net_sock_container *sc = NULL; struct o2net_sock_container *sc = NULL;
struct o2nm_node *node = NULL; struct o2nm_node *node = NULL, *mynode = NULL;
struct socket *sock = NULL; struct socket *sock = NULL;
struct sockaddr_in myaddr = {0, }, remoteaddr = {0, }; struct sockaddr_in myaddr = {0, }, remoteaddr = {0, };
int ret = 0; int ret = 0;
...@@ -1334,6 +1334,12 @@ static void o2net_start_connect(void *arg) ...@@ -1334,6 +1334,12 @@ static void o2net_start_connect(void *arg)
goto out; goto out;
} }
mynode = o2nm_get_node_by_num(o2nm_this_node());
if (mynode == NULL) {
ret = 0;
goto out;
}
spin_lock(&nn->nn_lock); spin_lock(&nn->nn_lock);
/* see if we already have one pending or have given up */ /* see if we already have one pending or have given up */
if (nn->nn_sc || nn->nn_persistent_error) if (nn->nn_sc || nn->nn_persistent_error)
...@@ -1361,12 +1367,14 @@ static void o2net_start_connect(void *arg) ...@@ -1361,12 +1367,14 @@ static void o2net_start_connect(void *arg)
sock->sk->sk_allocation = GFP_ATOMIC; sock->sk->sk_allocation = GFP_ATOMIC;
myaddr.sin_family = AF_INET; myaddr.sin_family = AF_INET;
myaddr.sin_addr.s_addr = (__force u32)mynode->nd_ipv4_address;
myaddr.sin_port = (__force u16)htons(0); /* any port */ myaddr.sin_port = (__force u16)htons(0); /* any port */
ret = sock->ops->bind(sock, (struct sockaddr *)&myaddr, ret = sock->ops->bind(sock, (struct sockaddr *)&myaddr,
sizeof(myaddr)); sizeof(myaddr));
if (ret) { if (ret) {
mlog(0, "bind failed: %d\n", ret); mlog(ML_ERROR, "bind failed with %d at address %u.%u.%u.%u\n",
ret, NIPQUAD(mynode->nd_ipv4_address));
goto out; goto out;
} }
...@@ -1407,6 +1415,8 @@ static void o2net_start_connect(void *arg) ...@@ -1407,6 +1415,8 @@ static void o2net_start_connect(void *arg)
sc_put(sc); sc_put(sc);
if (node) if (node)
o2nm_node_put(node); o2nm_node_put(node);
if (mynode)
o2nm_node_put(mynode);
return; return;
} }
......
...@@ -85,13 +85,10 @@ enum { ...@@ -85,13 +85,10 @@ enum {
O2NET_DRIVER_READY, O2NET_DRIVER_READY,
}; };
int o2net_init_tcp_sock(struct inode *inode);
int o2net_send_message(u32 msg_type, u32 key, void *data, u32 len, int o2net_send_message(u32 msg_type, u32 key, void *data, u32 len,
u8 target_node, int *status); u8 target_node, int *status);
int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *vec, int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *vec,
size_t veclen, u8 target_node, int *status); size_t veclen, u8 target_node, int *status);
int o2net_broadcast_message(u32 msg_type, u32 key, void *data, u32 len,
struct inode *group);
int o2net_register_handler(u32 msg_type, u32 key, u32 max_len, int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
o2net_msg_handler_func *func, void *data, o2net_msg_handler_func *func, void *data,
...@@ -107,7 +104,5 @@ void o2net_disconnect_node(struct o2nm_node *node); ...@@ -107,7 +104,5 @@ void o2net_disconnect_node(struct o2nm_node *node);
int o2net_init(void); int o2net_init(void);
void o2net_exit(void); void o2net_exit(void);
int o2net_proc_init(struct proc_dir_entry *parent);
void o2net_proc_exit(struct proc_dir_entry *parent);
#endif /* O2CLUSTER_TCP_H */ #endif /* O2CLUSTER_TCP_H */
...@@ -37,9 +37,7 @@ ...@@ -37,9 +37,7 @@
#define DLM_THREAD_SHUFFLE_INTERVAL 5 // flush everything every 5 passes #define DLM_THREAD_SHUFFLE_INTERVAL 5 // flush everything every 5 passes
#define DLM_THREAD_MS 200 // flush at least every 200 ms #define DLM_THREAD_MS 200 // flush at least every 200 ms
#define DLM_HASH_BITS 7 #define DLM_HASH_BUCKETS (PAGE_SIZE / sizeof(struct hlist_head))
#define DLM_HASH_SIZE (1 << DLM_HASH_BITS)
#define DLM_HASH_MASK (DLM_HASH_SIZE - 1)
enum dlm_ast_type { enum dlm_ast_type {
DLM_AST = 0, DLM_AST = 0,
...@@ -87,7 +85,7 @@ enum dlm_ctxt_state { ...@@ -87,7 +85,7 @@ enum dlm_ctxt_state {
struct dlm_ctxt struct dlm_ctxt
{ {
struct list_head list; struct list_head list;
struct list_head *resources; struct hlist_head *lockres_hash;
struct list_head dirty_list; struct list_head dirty_list;
struct list_head purge_list; struct list_head purge_list;
struct list_head pending_asts; struct list_head pending_asts;
...@@ -217,7 +215,7 @@ struct dlm_lock_resource ...@@ -217,7 +215,7 @@ struct dlm_lock_resource
{ {
/* WARNING: Please see the comment in dlm_init_lockres before /* WARNING: Please see the comment in dlm_init_lockres before
* adding fields here. */ * adding fields here. */
struct list_head list; struct hlist_node hash_node;
struct kref refs; struct kref refs;
/* please keep these next 3 in this order /* please keep these next 3 in this order
......
...@@ -117,8 +117,8 @@ EXPORT_SYMBOL_GPL(dlm_print_one_lock); ...@@ -117,8 +117,8 @@ EXPORT_SYMBOL_GPL(dlm_print_one_lock);
void dlm_dump_lock_resources(struct dlm_ctxt *dlm) void dlm_dump_lock_resources(struct dlm_ctxt *dlm)
{ {
struct dlm_lock_resource *res; struct dlm_lock_resource *res;
struct list_head *iter; struct hlist_node *iter;
struct list_head *bucket; struct hlist_head *bucket;
int i; int i;
mlog(ML_NOTICE, "struct dlm_ctxt: %s, node=%u, key=%u\n", mlog(ML_NOTICE, "struct dlm_ctxt: %s, node=%u, key=%u\n",
...@@ -129,13 +129,11 @@ void dlm_dump_lock_resources(struct dlm_ctxt *dlm) ...@@ -129,13 +129,11 @@ void dlm_dump_lock_resources(struct dlm_ctxt *dlm)
} }
spin_lock(&dlm->spinlock); spin_lock(&dlm->spinlock);
for (i=0; i<DLM_HASH_SIZE; i++) { for (i=0; i<DLM_HASH_BUCKETS; i++) {
bucket = &(dlm->resources[i]); bucket = &(dlm->lockres_hash[i]);
list_for_each(iter, bucket) { hlist_for_each_entry(res, iter, bucket, hash_node)
res = list_entry(iter, struct dlm_lock_resource, list);
dlm_print_one_lock_resource(res); dlm_print_one_lock_resource(res);
} }
}
spin_unlock(&dlm->spinlock); spin_unlock(&dlm->spinlock);
} }
......
...@@ -77,26 +77,26 @@ static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm); ...@@ -77,26 +77,26 @@ static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
void __dlm_unhash_lockres(struct dlm_lock_resource *lockres) void __dlm_unhash_lockres(struct dlm_lock_resource *lockres)
{ {
list_del_init(&lockres->list); hlist_del_init(&lockres->hash_node);
dlm_lockres_put(lockres); dlm_lockres_put(lockres);
} }
void __dlm_insert_lockres(struct dlm_ctxt *dlm, void __dlm_insert_lockres(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res) struct dlm_lock_resource *res)
{ {
struct list_head *bucket; struct hlist_head *bucket;
struct qstr *q; struct qstr *q;
assert_spin_locked(&dlm->spinlock); assert_spin_locked(&dlm->spinlock);
q = &res->lockname; q = &res->lockname;
q->hash = full_name_hash(q->name, q->len); q->hash = full_name_hash(q->name, q->len);
bucket = &(dlm->resources[q->hash & DLM_HASH_MASK]); bucket = &(dlm->lockres_hash[q->hash % DLM_HASH_BUCKETS]);
/* get a reference for our hashtable */ /* get a reference for our hashtable */
dlm_lockres_get(res); dlm_lockres_get(res);
list_add_tail(&res->list, bucket); hlist_add_head(&res->hash_node, bucket);
} }
struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
...@@ -104,9 +104,9 @@ struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, ...@@ -104,9 +104,9 @@ struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
unsigned int len) unsigned int len)
{ {
unsigned int hash; unsigned int hash;
struct list_head *iter; struct hlist_node *iter;
struct dlm_lock_resource *tmpres=NULL; struct dlm_lock_resource *tmpres=NULL;
struct list_head *bucket; struct hlist_head *bucket;
mlog_entry("%.*s\n", len, name); mlog_entry("%.*s\n", len, name);
...@@ -114,11 +114,11 @@ struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, ...@@ -114,11 +114,11 @@ struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
hash = full_name_hash(name, len); hash = full_name_hash(name, len);
bucket = &(dlm->resources[hash & DLM_HASH_MASK]); bucket = &(dlm->lockres_hash[hash % DLM_HASH_BUCKETS]);
/* check for pre-existing lock */ /* check for pre-existing lock */
list_for_each(iter, bucket) { hlist_for_each(iter, bucket) {
tmpres = list_entry(iter, struct dlm_lock_resource, list); tmpres = hlist_entry(iter, struct dlm_lock_resource, hash_node);
if (tmpres->lockname.len == len && if (tmpres->lockname.len == len &&
memcmp(tmpres->lockname.name, name, len) == 0) { memcmp(tmpres->lockname.name, name, len) == 0) {
dlm_lockres_get(tmpres); dlm_lockres_get(tmpres);
...@@ -193,8 +193,8 @@ static int dlm_wait_on_domain_helper(const char *domain) ...@@ -193,8 +193,8 @@ static int dlm_wait_on_domain_helper(const char *domain)
static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm) static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm)
{ {
if (dlm->resources) if (dlm->lockres_hash)
free_page((unsigned long) dlm->resources); free_page((unsigned long) dlm->lockres_hash);
if (dlm->name) if (dlm->name)
kfree(dlm->name); kfree(dlm->name);
...@@ -303,10 +303,10 @@ static void dlm_migrate_all_locks(struct dlm_ctxt *dlm) ...@@ -303,10 +303,10 @@ static void dlm_migrate_all_locks(struct dlm_ctxt *dlm)
mlog(0, "Migrating locks from domain %s\n", dlm->name); mlog(0, "Migrating locks from domain %s\n", dlm->name);
restart: restart:
spin_lock(&dlm->spinlock); spin_lock(&dlm->spinlock);
for (i=0; i<DLM_HASH_SIZE; i++) { for (i = 0; i < DLM_HASH_BUCKETS; i++) {
while (!list_empty(&dlm->resources[i])) { while (!hlist_empty(&dlm->lockres_hash[i])) {
res = list_entry(dlm->resources[i].next, res = hlist_entry(dlm->lockres_hash[i].first,
struct dlm_lock_resource, list); struct dlm_lock_resource, hash_node);
/* need reference when manually grabbing lockres */ /* need reference when manually grabbing lockres */
dlm_lockres_get(res); dlm_lockres_get(res);
/* this should unhash the lockres /* this should unhash the lockres
...@@ -1191,18 +1191,17 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain, ...@@ -1191,18 +1191,17 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
goto leave; goto leave;
} }
dlm->resources = (struct list_head *) __get_free_page(GFP_KERNEL); dlm->lockres_hash = (struct hlist_head *) __get_free_page(GFP_KERNEL);
if (!dlm->resources) { if (!dlm->lockres_hash) {
mlog_errno(-ENOMEM); mlog_errno(-ENOMEM);
kfree(dlm->name); kfree(dlm->name);
kfree(dlm); kfree(dlm);
dlm = NULL; dlm = NULL;
goto leave; goto leave;
} }
memset(dlm->resources, 0, PAGE_SIZE);
for (i=0; i<DLM_HASH_SIZE; i++) for (i=0; i<DLM_HASH_BUCKETS; i++)
INIT_LIST_HEAD(&dlm->resources[i]); INIT_HLIST_HEAD(&dlm->lockres_hash[i]);
strcpy(dlm->name, domain); strcpy(dlm->name, domain);
dlm->key = key; dlm->key = key;
......
...@@ -564,7 +564,7 @@ static void dlm_lockres_release(struct kref *kref) ...@@ -564,7 +564,7 @@ static void dlm_lockres_release(struct kref *kref)
/* By the time we're ready to blow this guy away, we shouldn't /* By the time we're ready to blow this guy away, we shouldn't
* be on any lists. */ * be on any lists. */
BUG_ON(!list_empty(&res->list)); BUG_ON(!hlist_unhashed(&res->hash_node));
BUG_ON(!list_empty(&res->granted)); BUG_ON(!list_empty(&res->granted));
BUG_ON(!list_empty(&res->converting)); BUG_ON(!list_empty(&res->converting));
BUG_ON(!list_empty(&res->blocked)); BUG_ON(!list_empty(&res->blocked));
...@@ -605,7 +605,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm, ...@@ -605,7 +605,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
init_waitqueue_head(&res->wq); init_waitqueue_head(&res->wq);
spin_lock_init(&res->spinlock); spin_lock_init(&res->spinlock);
INIT_LIST_HEAD(&res->list); INIT_HLIST_NODE(&res->hash_node);
INIT_LIST_HEAD(&res->granted); INIT_LIST_HEAD(&res->granted);
INIT_LIST_HEAD(&res->converting); INIT_LIST_HEAD(&res->converting);
INIT_LIST_HEAD(&res->blocked); INIT_LIST_HEAD(&res->blocked);
......
...@@ -1693,7 +1693,10 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, ...@@ -1693,7 +1693,10 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
u8 dead_node, u8 new_master) u8 dead_node, u8 new_master)
{ {
int i; int i;
struct list_head *iter, *iter2, *bucket; struct list_head *iter, *iter2;
struct hlist_node *hash_iter;
struct hlist_head *bucket;
struct dlm_lock_resource *res; struct dlm_lock_resource *res;
mlog_entry_void(); mlog_entry_void();
...@@ -1717,10 +1720,9 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, ...@@ -1717,10 +1720,9 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
* for now we need to run the whole hash, clear * for now we need to run the whole hash, clear
* the RECOVERING state and set the owner * the RECOVERING state and set the owner
* if necessary */ * if necessary */
for (i=0; i<DLM_HASH_SIZE; i++) { for (i = 0; i < DLM_HASH_BUCKETS; i++) {
bucket = &(dlm->resources[i]); bucket = &(dlm->lockres_hash[i]);
list_for_each(iter, bucket) { hlist_for_each_entry(res, hash_iter, bucket, hash_node) {
res = list_entry (iter, struct dlm_lock_resource, list);
if (res->state & DLM_LOCK_RES_RECOVERING) { if (res->state & DLM_LOCK_RES_RECOVERING) {
if (res->owner == dead_node) { if (res->owner == dead_node) {
mlog(0, "(this=%u) res %.*s owner=%u " mlog(0, "(this=%u) res %.*s owner=%u "
...@@ -1852,10 +1854,10 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm, ...@@ -1852,10 +1854,10 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node) static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
{ {
struct list_head *iter; struct hlist_node *iter;
struct dlm_lock_resource *res; struct dlm_lock_resource *res;
int i; int i;
struct list_head *bucket; struct hlist_head *bucket;
struct dlm_lock *lock; struct dlm_lock *lock;
...@@ -1876,10 +1878,9 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node) ...@@ -1876,10 +1878,9 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
* can be kicked again to see if any ASTs or BASTs * can be kicked again to see if any ASTs or BASTs
* need to be fired as a result. * need to be fired as a result.
*/ */
for (i=0; i<DLM_HASH_SIZE; i++) { for (i = 0; i < DLM_HASH_BUCKETS; i++) {
bucket = &(dlm->resources[i]); bucket = &(dlm->lockres_hash[i]);
list_for_each(iter, bucket) { hlist_for_each_entry(res, iter, bucket, hash_node) {
res = list_entry (iter, struct dlm_lock_resource, list);
/* always prune any $RECOVERY entries for dead nodes, /* always prune any $RECOVERY entries for dead nodes,
* otherwise hangs can occur during later recovery */ * otherwise hangs can occur during later recovery */
if (dlm_is_recovery_lock(res->lockname.name, if (dlm_is_recovery_lock(res->lockname.name,
......
...@@ -181,6 +181,12 @@ static int ocfs2_extent_map_find_leaf(struct inode *inode, ...@@ -181,6 +181,12 @@ static int ocfs2_extent_map_find_leaf(struct inode *inode,
ret = -EBADR; ret = -EBADR;
if (rec_end > OCFS2_I(inode)->ip_clusters) { if (rec_end > OCFS2_I(inode)->ip_clusters) {
mlog_errno(ret); mlog_errno(ret);
ocfs2_error(inode->i_sb,
"Extent %d at e_blkno %"MLFu64" of inode %"MLFu64" goes past ip_clusters of %u\n",
i,
le64_to_cpu(rec->e_blkno),
OCFS2_I(inode)->ip_blkno,
OCFS2_I(inode)->ip_clusters);
goto out_free; goto out_free;
} }
...@@ -226,6 +232,12 @@ static int ocfs2_extent_map_find_leaf(struct inode *inode, ...@@ -226,6 +232,12 @@ static int ocfs2_extent_map_find_leaf(struct inode *inode,
ret = -EBADR; ret = -EBADR;
if (blkno) { if (blkno) {
mlog_errno(ret); mlog_errno(ret);
ocfs2_error(inode->i_sb,
"Multiple extents for (cpos = %u, clusters = %u) on inode %"MLFu64"; e_blkno %"MLFu64" and rec %d at e_blkno %"MLFu64"\n",
cpos, clusters,
OCFS2_I(inode)->ip_blkno,
blkno, i,
le64_to_cpu(rec->e_blkno));
goto out_free; goto out_free;
} }
...@@ -238,6 +250,10 @@ static int ocfs2_extent_map_find_leaf(struct inode *inode, ...@@ -238,6 +250,10 @@ static int ocfs2_extent_map_find_leaf(struct inode *inode,
*/ */
ret = -EBADR; ret = -EBADR;
if (!blkno) { if (!blkno) {
ocfs2_error(inode->i_sb,
"No record found for (cpos = %u, clusters = %u) on inode %"MLFu64"\n",
cpos, clusters,
OCFS2_I(inode)->ip_blkno);
mlog_errno(ret); mlog_errno(ret);
goto out_free; goto out_free;
} }
...@@ -266,6 +282,20 @@ static int ocfs2_extent_map_find_leaf(struct inode *inode, ...@@ -266,6 +282,20 @@ static int ocfs2_extent_map_find_leaf(struct inode *inode,
for (i = 0; i < le16_to_cpu(el->l_next_free_rec); i++) { for (i = 0; i < le16_to_cpu(el->l_next_free_rec); i++) {
rec = &el->l_recs[i]; rec = &el->l_recs[i];
if ((le32_to_cpu(rec->e_cpos) + le32_to_cpu(rec->e_clusters)) >
OCFS2_I(inode)->ip_clusters) {
ret = -EBADR;
mlog_errno(ret);
ocfs2_error(inode->i_sb,
"Extent %d at e_blkno %"MLFu64" of inode %"MLFu64" goes past ip_clusters of %u\n",
i,
le64_to_cpu(rec->e_blkno),
OCFS2_I(inode)->ip_blkno,
OCFS2_I(inode)->ip_clusters);
return ret;
}
ret = ocfs2_extent_map_insert(inode, rec, ret = ocfs2_extent_map_insert(inode, rec,
le16_to_cpu(el->l_tree_depth)); le16_to_cpu(el->l_tree_depth));
if (ret) { if (ret) {
...@@ -526,6 +556,10 @@ static int ocfs2_extent_map_insert(struct inode *inode, ...@@ -526,6 +556,10 @@ static int ocfs2_extent_map_insert(struct inode *inode,
OCFS2_I(inode)->ip_map.em_clusters) { OCFS2_I(inode)->ip_map.em_clusters) {
ret = -EBADR; ret = -EBADR;
mlog_errno(ret); mlog_errno(ret);
ocfs2_error(inode->i_sb,
"Zero e_clusters on non-tail extent record at e_blkno %"MLFu64" on inode %"MLFu64"\n",
le64_to_cpu(rec->e_blkno),
OCFS2_I(inode)->ip_blkno);
return ret; return ret;
} }
......
...@@ -933,9 +933,6 @@ static ssize_t ocfs2_file_aio_write(struct kiocb *iocb, ...@@ -933,9 +933,6 @@ static ssize_t ocfs2_file_aio_write(struct kiocb *iocb,
struct file *filp = iocb->ki_filp; struct file *filp = iocb->ki_filp;
struct inode *inode = filp->f_dentry->d_inode; struct inode *inode = filp->f_dentry->d_inode;
loff_t newsize, saved_pos; loff_t newsize, saved_pos;
#ifdef OCFS2_ORACORE_WORKAROUNDS
struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
#endif
mlog_entry("(0x%p, 0x%p, %u, '%.*s')\n", filp, buf, mlog_entry("(0x%p, 0x%p, %u, '%.*s')\n", filp, buf,
(unsigned int)count, (unsigned int)count,
...@@ -951,14 +948,6 @@ static ssize_t ocfs2_file_aio_write(struct kiocb *iocb, ...@@ -951,14 +948,6 @@ static ssize_t ocfs2_file_aio_write(struct kiocb *iocb,
return -EIO; return -EIO;
} }
#ifdef OCFS2_ORACORE_WORKAROUNDS
/* ugh, work around some applications which open everything O_DIRECT +
* O_APPEND and really don't mean to use O_DIRECT. */
if (osb->s_mount_opt & OCFS2_MOUNT_COMPAT_OCFS &&
(filp->f_flags & O_APPEND) && (filp->f_flags & O_DIRECT))
filp->f_flags &= ~O_DIRECT;
#endif
mutex_lock(&inode->i_mutex); mutex_lock(&inode->i_mutex);
/* to match setattr's i_mutex -> i_alloc_sem -> rw_lock ordering */ /* to match setattr's i_mutex -> i_alloc_sem -> rw_lock ordering */
if (filp->f_flags & O_DIRECT) { if (filp->f_flags & O_DIRECT) {
...@@ -1079,27 +1068,7 @@ static ssize_t ocfs2_file_aio_write(struct kiocb *iocb, ...@@ -1079,27 +1068,7 @@ static ssize_t ocfs2_file_aio_write(struct kiocb *iocb,
/* communicate with ocfs2_dio_end_io */ /* communicate with ocfs2_dio_end_io */
ocfs2_iocb_set_rw_locked(iocb); ocfs2_iocb_set_rw_locked(iocb);
#ifdef OCFS2_ORACORE_WORKAROUNDS ret = generic_file_aio_write_nolock(iocb, &local_iov, 1, &iocb->ki_pos);
if (osb->s_mount_opt & OCFS2_MOUNT_COMPAT_OCFS &&
filp->f_flags & O_DIRECT) {
unsigned int saved_flags = filp->f_flags;
int sector_size = 1 << osb->s_sectsize_bits;
if ((saved_pos & (sector_size - 1)) ||
(count & (sector_size - 1)) ||
((unsigned long)buf & (sector_size - 1))) {
filp->f_flags |= O_SYNC;
filp->f_flags &= ~O_DIRECT;
}
ret = generic_file_aio_write_nolock(iocb, &local_iov, 1,
&iocb->ki_pos);
filp->f_flags = saved_flags;
} else
#endif
ret = generic_file_aio_write_nolock(iocb, &local_iov, 1,
&iocb->ki_pos);
/* buffered aio wouldn't have proper lock coverage today */ /* buffered aio wouldn't have proper lock coverage today */
BUG_ON(ret == -EIOCBQUEUED && !(filp->f_flags & O_DIRECT)); BUG_ON(ret == -EIOCBQUEUED && !(filp->f_flags & O_DIRECT));
...@@ -1140,9 +1109,6 @@ static ssize_t ocfs2_file_aio_read(struct kiocb *iocb, ...@@ -1140,9 +1109,6 @@ static ssize_t ocfs2_file_aio_read(struct kiocb *iocb,
int ret = 0, rw_level = -1, have_alloc_sem = 0; int ret = 0, rw_level = -1, have_alloc_sem = 0;
struct file *filp = iocb->ki_filp; struct file *filp = iocb->ki_filp;
struct inode *inode = filp->f_dentry->d_inode; struct inode *inode = filp->f_dentry->d_inode;
#ifdef OCFS2_ORACORE_WORKAROUNDS
struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
#endif
mlog_entry("(0x%p, 0x%p, %u, '%.*s')\n", filp, buf, mlog_entry("(0x%p, 0x%p, %u, '%.*s')\n", filp, buf,
(unsigned int)count, (unsigned int)count,
...@@ -1155,21 +1121,6 @@ static ssize_t ocfs2_file_aio_read(struct kiocb *iocb, ...@@ -1155,21 +1121,6 @@ static ssize_t ocfs2_file_aio_read(struct kiocb *iocb,
goto bail; goto bail;
} }
#ifdef OCFS2_ORACORE_WORKAROUNDS
if (osb->s_mount_opt & OCFS2_MOUNT_COMPAT_OCFS) {
if (filp->f_flags & O_DIRECT) {
int sector_size = 1 << osb->s_sectsize_bits;
if ((pos & (sector_size - 1)) ||
(count & (sector_size - 1)) ||
((unsigned long)buf & (sector_size - 1)) ||
(i_size_read(inode) & (sector_size -1))) {
filp->f_flags &= ~O_DIRECT;
}
}
}
#endif
/* /*
* buffered reads protect themselves in ->readpage(). O_DIRECT reads * buffered reads protect themselves in ->readpage(). O_DIRECT reads
* need locks to protect pending reads from racing with truncate. * need locks to protect pending reads from racing with truncate.
......
...@@ -67,6 +67,7 @@ void ocfs2_init_node_maps(struct ocfs2_super *osb) ...@@ -67,6 +67,7 @@ void ocfs2_init_node_maps(struct ocfs2_super *osb)
ocfs2_node_map_init(&osb->mounted_map); ocfs2_node_map_init(&osb->mounted_map);
ocfs2_node_map_init(&osb->recovery_map); ocfs2_node_map_init(&osb->recovery_map);
ocfs2_node_map_init(&osb->umount_map); ocfs2_node_map_init(&osb->umount_map);
ocfs2_node_map_init(&osb->osb_recovering_orphan_dirs);
} }
static void ocfs2_do_node_down(int node_num, static void ocfs2_do_node_down(int node_num,
......
...@@ -41,6 +41,7 @@ ...@@ -41,6 +41,7 @@
#include "dlmglue.h" #include "dlmglue.h"
#include "extent_map.h" #include "extent_map.h"
#include "file.h" #include "file.h"
#include "heartbeat.h"
#include "inode.h" #include "inode.h"
#include "journal.h" #include "journal.h"
#include "namei.h" #include "namei.h"
...@@ -544,6 +545,42 @@ static int ocfs2_remove_inode(struct inode *inode, ...@@ -544,6 +545,42 @@ static int ocfs2_remove_inode(struct inode *inode,
return status; return status;
} }
/*
* Serialize with orphan dir recovery. If the process doing
* recovery on this orphan dir does an iget() with the dir
* i_mutex held, we'll deadlock here. Instead we detect this
* and exit early - recovery will wipe this inode for us.
*/
static int ocfs2_check_orphan_recovery_state(struct ocfs2_super *osb,
int slot)
{
int ret = 0;
spin_lock(&osb->osb_lock);
if (ocfs2_node_map_test_bit(osb, &osb->osb_recovering_orphan_dirs, slot)) {
mlog(0, "Recovery is happening on orphan dir %d, will skip "
"this inode\n", slot);
ret = -EDEADLK;
goto out;
}
/* This signals to the orphan recovery process that it should
* wait for us to handle the wipe. */
osb->osb_orphan_wipes[slot]++;
out:
spin_unlock(&osb->osb_lock);
return ret;
}
static void ocfs2_signal_wipe_completion(struct ocfs2_super *osb,
int slot)
{
spin_lock(&osb->osb_lock);
osb->osb_orphan_wipes[slot]--;
spin_unlock(&osb->osb_lock);
wake_up(&osb->osb_wipe_event);
}
static int ocfs2_wipe_inode(struct inode *inode, static int ocfs2_wipe_inode(struct inode *inode,
struct buffer_head *di_bh) struct buffer_head *di_bh)
{ {
...@@ -555,6 +592,11 @@ static int ocfs2_wipe_inode(struct inode *inode, ...@@ -555,6 +592,11 @@ static int ocfs2_wipe_inode(struct inode *inode,
/* We've already voted on this so it should be readonly - no /* We've already voted on this so it should be readonly - no
* spinlock needed. */ * spinlock needed. */
orphaned_slot = OCFS2_I(inode)->ip_orphaned_slot; orphaned_slot = OCFS2_I(inode)->ip_orphaned_slot;
status = ocfs2_check_orphan_recovery_state(osb, orphaned_slot);
if (status)
return status;
orphan_dir_inode = ocfs2_get_system_file_inode(osb, orphan_dir_inode = ocfs2_get_system_file_inode(osb,
ORPHAN_DIR_SYSTEM_INODE, ORPHAN_DIR_SYSTEM_INODE,
orphaned_slot); orphaned_slot);
...@@ -597,6 +639,7 @@ static int ocfs2_wipe_inode(struct inode *inode, ...@@ -597,6 +639,7 @@ static int ocfs2_wipe_inode(struct inode *inode,
brelse(orphan_dir_bh); brelse(orphan_dir_bh);
bail: bail:
iput(orphan_dir_inode); iput(orphan_dir_inode);
ocfs2_signal_wipe_completion(osb, orphaned_slot);
return status; return status;
} }
...@@ -822,6 +865,7 @@ void ocfs2_delete_inode(struct inode *inode) ...@@ -822,6 +865,7 @@ void ocfs2_delete_inode(struct inode *inode)
status = ocfs2_wipe_inode(inode, di_bh); status = ocfs2_wipe_inode(inode, di_bh);
if (status < 0) { if (status < 0) {
if (status != -EDEADLK)
mlog_errno(status); mlog_errno(status);
goto bail_unlock_inode; goto bail_unlock_inode;
} }
......
...@@ -1408,21 +1408,17 @@ int ocfs2_mark_dead_nodes(struct ocfs2_super *osb) ...@@ -1408,21 +1408,17 @@ int ocfs2_mark_dead_nodes(struct ocfs2_super *osb)
return status; return status;
} }
static int ocfs2_recover_orphans(struct ocfs2_super *osb, static int ocfs2_queue_orphans(struct ocfs2_super *osb,
int slot) int slot,
struct inode **head)
{ {
int status = 0; int status;
int have_disk_lock = 0;
struct inode *inode = NULL;
struct inode *iter;
struct inode *orphan_dir_inode = NULL; struct inode *orphan_dir_inode = NULL;
struct inode *iter;
unsigned long offset, blk, local; unsigned long offset, blk, local;
struct buffer_head *bh = NULL; struct buffer_head *bh = NULL;
struct ocfs2_dir_entry *de; struct ocfs2_dir_entry *de;
struct super_block *sb = osb->sb; struct super_block *sb = osb->sb;
struct ocfs2_inode_info *oi;
mlog(0, "Recover inodes from orphan dir in slot %d\n", slot);
orphan_dir_inode = ocfs2_get_system_file_inode(osb, orphan_dir_inode = ocfs2_get_system_file_inode(osb,
ORPHAN_DIR_SYSTEM_INODE, ORPHAN_DIR_SYSTEM_INODE,
...@@ -1430,17 +1426,15 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb, ...@@ -1430,17 +1426,15 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
if (!orphan_dir_inode) { if (!orphan_dir_inode) {
status = -ENOENT; status = -ENOENT;
mlog_errno(status); mlog_errno(status);
goto out; return status;
} }
mutex_lock(&orphan_dir_inode->i_mutex); mutex_lock(&orphan_dir_inode->i_mutex);
status = ocfs2_meta_lock(orphan_dir_inode, NULL, NULL, 0); status = ocfs2_meta_lock(orphan_dir_inode, NULL, NULL, 0);
if (status < 0) { if (status < 0) {
mutex_unlock(&orphan_dir_inode->i_mutex);
mlog_errno(status); mlog_errno(status);
goto out; goto out;
} }
have_disk_lock = 1;
offset = 0; offset = 0;
iter = NULL; iter = NULL;
...@@ -1451,11 +1445,10 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb, ...@@ -1451,11 +1445,10 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
if (!bh) if (!bh)
status = -EINVAL; status = -EINVAL;
if (status < 0) { if (status < 0) {
mutex_unlock(&orphan_dir_inode->i_mutex);
if (bh) if (bh)
brelse(bh); brelse(bh);
mlog_errno(status); mlog_errno(status);
goto out; goto out_unlock;
} }
local = 0; local = 0;
...@@ -1465,11 +1458,10 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb, ...@@ -1465,11 +1458,10 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
if (!ocfs2_check_dir_entry(orphan_dir_inode, if (!ocfs2_check_dir_entry(orphan_dir_inode,
de, bh, local)) { de, bh, local)) {
mutex_unlock(&orphan_dir_inode->i_mutex);
status = -EINVAL; status = -EINVAL;
mlog_errno(status); mlog_errno(status);
brelse(bh); brelse(bh);
goto out; goto out_unlock;
} }
local += le16_to_cpu(de->rec_len); local += le16_to_cpu(de->rec_len);
...@@ -1504,18 +1496,95 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb, ...@@ -1504,18 +1496,95 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
mlog(0, "queue orphan %"MLFu64"\n", mlog(0, "queue orphan %"MLFu64"\n",
OCFS2_I(iter)->ip_blkno); OCFS2_I(iter)->ip_blkno);
OCFS2_I(iter)->ip_next_orphan = inode; /* No locking is required for the next_orphan
inode = iter; * queue as there is only ever a single
* process doing orphan recovery. */
OCFS2_I(iter)->ip_next_orphan = *head;
*head = iter;
} }
brelse(bh); brelse(bh);
} }
mutex_unlock(&orphan_dir_inode->i_mutex);
out_unlock:
ocfs2_meta_unlock(orphan_dir_inode, 0); ocfs2_meta_unlock(orphan_dir_inode, 0);
have_disk_lock = 0; out:
mutex_unlock(&orphan_dir_inode->i_mutex);
iput(orphan_dir_inode); iput(orphan_dir_inode);
orphan_dir_inode = NULL; return status;
}
static int ocfs2_orphan_recovery_can_continue(struct ocfs2_super *osb,
int slot)
{
int ret;
spin_lock(&osb->osb_lock);
ret = !osb->osb_orphan_wipes[slot];
spin_unlock(&osb->osb_lock);
return ret;
}
static void ocfs2_mark_recovering_orphan_dir(struct ocfs2_super *osb,
int slot)
{
spin_lock(&osb->osb_lock);
/* Mark ourselves such that new processes in delete_inode()
* know to quit early. */
ocfs2_node_map_set_bit(osb, &osb->osb_recovering_orphan_dirs, slot);
while (osb->osb_orphan_wipes[slot]) {
/* If any processes are already in the middle of an
* orphan wipe on this dir, then we need to wait for
* them. */
spin_unlock(&osb->osb_lock);
wait_event_interruptible(osb->osb_wipe_event,
ocfs2_orphan_recovery_can_continue(osb, slot));
spin_lock(&osb->osb_lock);
}
spin_unlock(&osb->osb_lock);
}
static void ocfs2_clear_recovering_orphan_dir(struct ocfs2_super *osb,
int slot)
{
ocfs2_node_map_clear_bit(osb, &osb->osb_recovering_orphan_dirs, slot);
}
/*
* Orphan recovery. Each mounted node has it's own orphan dir which we
* must run during recovery. Our strategy here is to build a list of
* the inodes in the orphan dir and iget/iput them. The VFS does
* (most) of the rest of the work.
*
* Orphan recovery can happen at any time, not just mount so we have a
* couple of extra considerations.
*
* - We grab as many inodes as we can under the orphan dir lock -
* doing iget() outside the orphan dir risks getting a reference on
* an invalid inode.
* - We must be sure not to deadlock with other processes on the
* system wanting to run delete_inode(). This can happen when they go
* to lock the orphan dir and the orphan recovery process attempts to
* iget() inside the orphan dir lock. This can be avoided by
* advertising our state to ocfs2_delete_inode().
*/
static int ocfs2_recover_orphans(struct ocfs2_super *osb,
int slot)
{
int ret = 0;
struct inode *inode = NULL;
struct inode *iter;
struct ocfs2_inode_info *oi;
mlog(0, "Recover inodes from orphan dir in slot %d\n", slot);
ocfs2_mark_recovering_orphan_dir(osb, slot);
ret = ocfs2_queue_orphans(osb, slot, &inode);
ocfs2_clear_recovering_orphan_dir(osb, slot);
/* Error here should be noted, but we want to continue with as
* many queued inodes as we've got. */
if (ret)
mlog_errno(ret);
while (inode) { while (inode) {
oi = OCFS2_I(inode); oi = OCFS2_I(inode);
...@@ -1541,14 +1610,7 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb, ...@@ -1541,14 +1610,7 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
inode = iter; inode = iter;
} }
out: return ret;
if (have_disk_lock)
ocfs2_meta_unlock(orphan_dir_inode, 0);
if (orphan_dir_inode)
iput(orphan_dir_inode);
return status;
} }
static int ocfs2_wait_on_mount(struct ocfs2_super *osb) static int ocfs2_wait_on_mount(struct ocfs2_super *osb)
......
...@@ -174,9 +174,6 @@ enum ocfs2_mount_options ...@@ -174,9 +174,6 @@ enum ocfs2_mount_options
OCFS2_MOUNT_NOINTR = 1 << 2, /* Don't catch signals */ OCFS2_MOUNT_NOINTR = 1 << 2, /* Don't catch signals */
OCFS2_MOUNT_ERRORS_PANIC = 1 << 3, /* Panic on errors */ OCFS2_MOUNT_ERRORS_PANIC = 1 << 3, /* Panic on errors */
OCFS2_MOUNT_DATA_WRITEBACK = 1 << 4, /* No data ordering */ OCFS2_MOUNT_DATA_WRITEBACK = 1 << 4, /* No data ordering */
#ifdef OCFS2_ORACORE_WORKAROUNDS
OCFS2_MOUNT_COMPAT_OCFS = 1 << 30, /* ocfs1 compatibility mode */
#endif
}; };
#define OCFS2_OSB_SOFT_RO 0x0001 #define OCFS2_OSB_SOFT_RO 0x0001
...@@ -290,6 +287,10 @@ struct ocfs2_super ...@@ -290,6 +287,10 @@ struct ocfs2_super
struct inode *osb_tl_inode; struct inode *osb_tl_inode;
struct buffer_head *osb_tl_bh; struct buffer_head *osb_tl_bh;
struct work_struct osb_truncate_log_wq; struct work_struct osb_truncate_log_wq;
struct ocfs2_node_map osb_recovering_orphan_dirs;
unsigned int *osb_orphan_wipes;
wait_queue_head_t osb_wipe_event;
}; };
#define OCFS2_SB(sb) ((struct ocfs2_super *)(sb)->s_fs_info) #define OCFS2_SB(sb) ((struct ocfs2_super *)(sb)->s_fs_info)
......
...@@ -138,7 +138,6 @@ ...@@ -138,7 +138,6 @@
/* Journal limits (in bytes) */ /* Journal limits (in bytes) */
#define OCFS2_MIN_JOURNAL_SIZE (4 * 1024 * 1024) #define OCFS2_MIN_JOURNAL_SIZE (4 * 1024 * 1024)
#define OCFS2_MAX_JOURNAL_SIZE (500 * 1024 * 1024)
struct ocfs2_system_inode_info { struct ocfs2_system_inode_info {
char *si_name; char *si_name;
......
...@@ -1325,6 +1325,16 @@ static int ocfs2_initialize_super(struct super_block *sb, ...@@ -1325,6 +1325,16 @@ static int ocfs2_initialize_super(struct super_block *sb,
} }
mlog(ML_NOTICE, "max_slots for this device: %u\n", osb->max_slots); mlog(ML_NOTICE, "max_slots for this device: %u\n", osb->max_slots);
init_waitqueue_head(&osb->osb_wipe_event);
osb->osb_orphan_wipes = kcalloc(osb->max_slots,
sizeof(*osb->osb_orphan_wipes),
GFP_KERNEL);
if (!osb->osb_orphan_wipes) {
status = -ENOMEM;
mlog_errno(status);
goto bail;
}
osb->s_feature_compat = osb->s_feature_compat =
le32_to_cpu(OCFS2_RAW_SB(di)->s_feature_compat); le32_to_cpu(OCFS2_RAW_SB(di)->s_feature_compat);
osb->s_feature_ro_compat = osb->s_feature_ro_compat =
...@@ -1638,6 +1648,7 @@ static void ocfs2_delete_osb(struct ocfs2_super *osb) ...@@ -1638,6 +1648,7 @@ static void ocfs2_delete_osb(struct ocfs2_super *osb)
if (osb->slot_info) if (osb->slot_info)
ocfs2_free_slot_info(osb->slot_info); ocfs2_free_slot_info(osb->slot_info);
kfree(osb->osb_orphan_wipes);
/* FIXME /* FIXME
* This belongs in journal shutdown, but because we have to * This belongs in journal shutdown, but because we have to
* allocate osb->journal at the start of ocfs2_initalize_osb(), * allocate osb->journal at the start of ocfs2_initalize_osb(),
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment