Commit 030df27e authored by Yoni Fogel's avatar Yoni Fogel

Addresses #2090 refs[t:2090] Logging for file create is done before actually creating file on disk.

We reserve a filenum before logging, and then use the reserved filenum when we actually open the cachefile.

git-svn-id: file:///svn/toku/tokudb.2037b@15645 c7de825b-a66e-492c-adef-691d508d4ae1
parent 97fa5e24
......@@ -2818,27 +2818,38 @@ static int setup_initial_brt_root_node (BRT t, BLOCKNUM blocknum) {
return 0;
}
// open a file for use by the brt. if the file does not exist, create it.
static int brt_open_file(BRT brt, const char *fname, int is_create, int *fdp, BOOL *did_create) {
// open a file for use by the brt
// Requires: File does not exist.
static int brt_create_file(BRT brt, const char *fname, int *fdp) {
brt = brt;
mode_t mode = S_IRWXU|S_IRWXG|S_IRWXO;
int r;
int fd;
*did_create = FALSE;
fd = open(fname, O_RDWR | O_BINARY, mode);
assert(fd==-1);
if (errno != ENOENT) {
r = errno;
return r;
}
fd = open(fname, O_RDWR | O_CREAT | O_BINARY, mode);
if (fd==-1) {
r = errno;
if (errno == ENOENT) {
if (!is_create) {
return r;
}
fd = open(fname, O_RDWR | O_CREAT | O_BINARY, mode);
if (fd == -1) {
r = errno; return r;
}
*did_create = TRUE;
} else
return r;
return r;
}
*fdp = fd;
return 0;
}
// open a file for use by the brt. if the file does not exist, error
static int brt_open_file(BRT brt, const char *fname, int *fdp) {
brt = brt;
mode_t mode = S_IRWXU|S_IRWXG|S_IRWXO;
int r;
int fd;
fd = open(fname, O_RDWR | O_BINARY, mode);
if (fd==-1) {
r = errno;
return r;
}
*fdp = fd;
return 0;
......@@ -3037,15 +3048,29 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, int is_cre
{
int fd = -1;
BOOL did_create = FALSE;
r = brt_open_file(t, fname, is_create, &fd, &did_create);
if (r != 0) goto died00;
r = brt_open_file(t, fname, &fd);
FILENUM reserved_filenum = t->filenum;
if (r==ENOENT && is_create) {
toku_cachetable_reserve_filenum(cachetable, &reserved_filenum, t->did_set_filenum, t->filenum);
if (0) {
died1:
if (did_create)
toku_cachetable_unreserve_filenum(cachetable, reserved_filenum);
goto died00;
}
if (t->did_set_filenum) assert(reserved_filenum.fileid == t->filenum.fileid);
did_create = TRUE;
mode_t mode = S_IRWXU|S_IRWXG|S_IRWXO;
r = toku_logger_log_fcreate(txn, fname_in_env, reserved_filenum, mode, t->flags, &(t->temp_descriptor));
if (r!=0) goto died1;
r = brt_create_file(t, fname, &fd);
}
if (r != 0) goto died1;
// TODO: #2090
r=toku_cachetable_openfd_with_filenum(&t->cf, cachetable, fd, fname_in_env, t->did_set_filenum, t->filenum);
if (r != 0) goto died00;
r=toku_cachetable_openfd_with_filenum(&t->cf,
cachetable, fd, fname_in_env, t->did_set_filenum||did_create, reserved_filenum, did_create);
if (r != 0) goto died1;
if (did_create) {
mode_t mode = S_IRWXU|S_IRWXG|S_IRWXO;
r = toku_logger_log_fcreate(txn, fname_in_env, toku_cachefile_filenum(t->cf), mode, t->flags, &(t->temp_descriptor));
if (r != 0) goto died_after_open;
if (txn) {
BYTESTRING bs = { .len=strlen(fname), .data = toku_strdup_in_rollback(txn, fname) };
r = toku_logger_save_rollback_fcreate(txn, toku_cachefile_filenum(t->cf), bs); // bs is a copy of the fname relative to the cwd
......@@ -3058,7 +3083,7 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, int is_cre
if (r!=0) {
died_after_open:
toku_cachefile_close(&t->cf, toku_txn_logger(txn), 0, FALSE, ZERO_LSN);
goto died00;
goto died1;
}
assert(t->nodesize>0);
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); toku_print_malloced_items();
......
......@@ -162,6 +162,7 @@ struct cachetable {
struct minicron checkpointer; // the periodic checkpointing thread
toku_pthread_mutex_t openfd_mutex; // make toku_cachetable_openfd() single-threaded
LEAFLOCK_POOL leaflock_pool;
OMT reserved_filenums;
};
......@@ -276,6 +277,7 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN UU(initial_l
r = toku_pthread_mutex_init(&ct->cachefiles_mutex, 0); assert(r == 0);
toku_minicron_setup(&ct->checkpointer, 0, checkpoint_thread, ct); // default is no checkpointing
r = toku_leaflock_create(&ct->leaflock_pool); assert(r==0);
r = toku_omt_create(&ct->reserved_filenums); assert(r==0);
*result = ct;
return 0;
}
......@@ -354,14 +356,129 @@ static void cachefile_init_filenum(CACHEFILE cf, int fd, const char *fname_relat
// If something goes wrong, close the fd. After this, the caller shouldn't close the fd, but instead should close the cachefile.
int toku_cachetable_openfd (CACHEFILE *cfptr, CACHETABLE ct, int fd, const char *fname_relative_to_env) {
return toku_cachetable_openfd_with_filenum(cfptr, ct, fd, fname_relative_to_env, FALSE, next_filenum_to_use);
return toku_cachetable_openfd_with_filenum(cfptr, ct, fd, fname_relative_to_env, FALSE, next_filenum_to_use, FALSE);
}
int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd, const char *fname_relative_to_env, BOOL with_filenum, FILENUM filenum) {
static int
find_by_filenum (OMTVALUE v, void *filenumv) {
FILENUM fnum = *(FILENUM*)v;
FILENUM fnumfind = *(FILENUM*)filenumv;
if (fnum.fileid<fnumfind.fileid) return -1;
if (fnum.fileid>fnumfind.fileid) return +1;
return 0;
}
static BOOL
is_filenum_reserved(CACHETABLE ct, FILENUM filenum) {
OMTVALUE v;
int r;
BOOL rval;
r = toku_omt_find_zero(ct->reserved_filenums, find_by_filenum, &filenum, &v, NULL, NULL);
if (r==0) {
FILENUM* found = v;
assert(found->fileid == filenum.fileid);
rval = TRUE;
}
else {
assert(r==DB_NOTFOUND);
rval = FALSE;
}
return rval;
}
static void
reserve_filenum(CACHETABLE ct, FILENUM filenum) {
int r;
uint32_t index;
r = toku_omt_find_zero(ct->reserved_filenums, find_by_filenum, &filenum, NULL, &index, NULL);
assert(r==DB_NOTFOUND);
FILENUM *XMALLOC(entry);
*entry = filenum;
r = toku_omt_insert_at(ct->reserved_filenums, entry, index);
assert(r==0);
}
static void
unreserve_filenum(CACHETABLE ct, FILENUM filenum) {
OMTVALUE v;
int r;
uint32_t index;
r = toku_omt_find_zero(ct->reserved_filenums, find_by_filenum, &filenum, &v, &index, NULL);
assert(r==0);
FILENUM* found = v;
assert(found->fileid == filenum.fileid);
toku_free(found);
r = toku_omt_delete_at(ct->reserved_filenums, index);
assert(r==0);
}
int
toku_cachetable_reserve_filenum (CACHETABLE ct, FILENUM *reserved_filenum, BOOL with_filenum, FILENUM filenum) {
int r;
CACHEFILE extant;
cachetable_lock(ct);
cachefiles_lock(ct);
if (with_filenum) {
// verify that filenum is not in use
for (extant = ct->cachefiles; extant; extant=extant->next) {
if (filenum.fileid == extant->filenum.fileid) {
r = EEXIST;
goto exit;
}
}
if (is_filenum_reserved(ct, next_filenum_to_use)) {
r = EEXIST;
goto exit;
}
} else {
// find an unused fileid and use it
try_again:
for (extant = ct->cachefiles; extant; extant=extant->next) {
if (next_filenum_to_use.fileid==extant->filenum.fileid) {
next_filenum_to_use.fileid++;
goto try_again;
}
}
if (is_filenum_reserved(ct, next_filenum_to_use)) {
next_filenum_to_use.fileid++;
goto try_again;
}
}
{
//Reserve a filenum.
FILENUM reserved;
reserved.fileid = next_filenum_to_use.fileid++;
reserve_filenum(ct, reserved);
*reserved_filenum = reserved;
r = 0;
}
exit:
cachefiles_unlock(ct);
cachetable_unlock(ct);
return r;
}
void
toku_cachetable_unreserve_filenum (CACHETABLE ct, FILENUM reserved_filenum) {
cachetable_lock(ct);
cachefiles_lock(ct);
unreserve_filenum(ct, reserved_filenum);
cachefiles_unlock(ct);
cachetable_unlock(ct);
}
int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd, const char *fname_relative_to_env, BOOL with_filenum, FILENUM filenum, BOOL reserved) {
int r;
CACHEFILE extant;
struct fileid fileid;
if (reserved) assert(with_filenum);
r = toku_os_get_unique_file_id(fd, &fileid);
if (r != 0) {
r=errno; close(fd);
......@@ -383,6 +500,7 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd
cachefiles_lock(ct);
goto try_again; // other thread has closed this file, go create a new cachefile
}
assert(!is_filenum_reserved(ct, extant->filenum));
r = close(fd);
assert(r == 0);
// re-use pre-existing cachefile
......@@ -402,6 +520,14 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd
goto exit;
}
}
if (is_filenum_reserved(ct, filenum)) {
if (reserved)
unreserve_filenum(ct, filenum);
else {
r = EEXIST;
goto exit;
}
}
} else {
// find an unused fileid and use it
try_again:
......@@ -411,6 +537,10 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd
goto try_again;
}
}
if (is_filenum_reserved(ct, next_filenum_to_use)) {
next_filenum_to_use.fileid++;
goto try_again;
}
}
{
// create a new cachefile entry in the cachetable
......@@ -1632,6 +1762,7 @@ toku_cachetable_close (CACHETABLE *ctp) {
cachetable_unlock(ct);
toku_destroy_workers(&ct->wq, &ct->threadpool);
r = toku_leaflock_destroy(&ct->leaflock_pool); assert(r==0);
toku_omt_destroy(&ct->reserved_filenums);
r = toku_pthread_mutex_destroy(&ct->cachefiles_mutex); assert(r == 0);
toku_free(ct->table);
toku_free(ct);
......
......@@ -71,7 +71,11 @@ int toku_cachetable_openf (CACHEFILE *,CACHETABLE, const char */*fname*/, const
// Bind a file to a new cachefile object.
int toku_cachetable_openfd (CACHEFILE *,CACHETABLE, int /*fd*/, const char *fname_relative_to_env /*(used for logging)*/);
int toku_cachetable_openfd_with_filenum (CACHEFILE *,CACHETABLE, int /*fd*/, const char *fname_relative_to_env, BOOL with_filenum, FILENUM filenum);
int toku_cachetable_openfd_with_filenum (CACHEFILE *,CACHETABLE, int /*fd*/, const char *fname_relative_to_env, BOOL with_filenum, FILENUM filenum, BOOL reserved);
int toku_cachetable_reserve_filenum (CACHETABLE ct, FILENUM *reserved_filenum, BOOL with_filenum, FILENUM filenum);
void toku_cachetable_unreserve_filenum (CACHETABLE ct, FILENUM reserved_filenum);
// Get access to the asynchronous work queue
// Returns: a pointer to the work queue
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment