Commit d33980af authored by Bradley C. Kuszmaul, committed by Yoni Fogel

Manage the header not in the cachetable. This will help with #1054. ...

Manage the header not in the cachetable.  This will help with #1054.   Addresses #1000, #1054, #1080, #1131.

git-svn-id: file:///svn/tokudb.1131b+1080a@6128 c7de825b-a66e-492c-adef-691d508d4ae1
parent 4103a85c
......@@ -119,9 +119,11 @@ struct block_translation_pair {
DISKOFF size;
};
// The brt_header is not managed by the cachetable. Instead, it hangs off the cachefile as userdata.
struct brt_header {
int refcount;
int dirty;
u_int32_t fullhash;
int layout_version;
unsigned int nodesize;
int n_named_roots; /* -1 if the only one is unnamed */
......@@ -172,6 +174,7 @@ struct brt {
OMT txns; // transactions that are using this OMT (note that the transaction checks the cf also)
u_int64_t txn_that_created; // which txn created it. Use 0 if no such txn.
u_int64_t root_put_counter;
};
/* serialization code */
......@@ -185,7 +188,7 @@ void toku_verify_counts(BRTNODE);
int toku_serialize_brt_header_size (struct brt_header *h);
int toku_serialize_brt_header_to (int fd, struct brt_header *h);
int toku_serialize_brt_header_to_wbuf (struct wbuf *, struct brt_header *h);
int toku_deserialize_brtheader_from (int fd, BLOCKNUM off, u_int32_t fullhash, struct brt_header **brth);
int toku_deserialize_brtheader_from (int fd, BLOCKNUM off, struct brt_header **brth);
int toku_serialize_fifo_at (int fd, off_t freeoff, FIFO fifo); // Write a fifo into a disk, without worrying about fitting it into a block. This write is done at the end of the file.
......@@ -204,10 +207,9 @@ struct brtenv {
// SPINLOCK checkpointing;
};
extern void toku_brtnode_flush_callback(), toku_brtheader_flush_callback();
extern int toku_brtnode_fetch_callback(), toku_brtheader_fetch_callback();
extern int toku_read_and_pin_brt_header (CACHEFILE cf, struct brt_header **header);
extern int toku_unpin_brt_header (BRT brt);
extern void toku_brtnode_flush_callback (CACHEFILE cachefile, BLOCKNUM nodename, void *brtnode_v, void *extraargs, long size, BOOL write_me, BOOL keep_me, LSN modified_lsn, BOOL rename_p);
extern int toku_brtnode_fetch_callback (CACHEFILE cachefile, BLOCKNUM nodename, u_int32_t fullhash, void **brtnode_pv, long *sizep, void*extraargs, LSN *written_lsn);
extern int toku_read_brt_header_and_store_in_cachefile (CACHEFILE cf, struct brt_header **header);
extern CACHEKEY* toku_calculate_root_offset_pointer (BRT brt, u_int32_t *root_hash);
static const BRTNODE null_brtnode=0;
......@@ -277,5 +279,6 @@ enum brt_layout_version_e {
};
void toku_brtheader_free (struct brt_header *h);
int toku_brtheader_close (CACHEFILE cachefile, void *header_v);
#endif
......@@ -255,18 +255,26 @@ void toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, BRT brt
{
// If the node has never been written, then write the whole buffer, including the zeros
assert(blocknum.b>=0);
printf("%s:%d trans=%lu\n", __FILE__, __LINE__, brt->h->translated_blocknum_limit);
if (brt->h->translated_blocknum_limit > (u_int64_t)blocknum.b) {
printf("%s:%d brt=%p\n", __FILE__, __LINE__, brt);
printf("%s:%d translated_blocknum_limit=%lu blocknum.b=%lu\n", __FILE__, __LINE__, brt->h->translated_blocknum_limit, blocknum.b);
printf("%s:%d allocator=%p\n", __FILE__, __LINE__, brt->h->block_allocator);
printf("%s:%d bt=%p\n", __FILE__, __LINE__, brt->h->block_translation);
if (brt->h->translated_blocknum_limit <= (u_int64_t)blocknum.b) {
if (brt->h->block_translation == 0) assert(brt->h->translated_blocknum_limit==0);
u_int64_t new_limit = blocknum.b + 1;
u_int64_t old_limit = brt->h->translated_blocknum_limit;
u_int64_t j;
XREALLOC_N(new_limit, brt->h->block_translation);
while (++old_limit < new_limit) {
brt->h->block_translation[old_limit].diskoff = 0;
brt->h->block_translation[old_limit].size = 0;
for (j=old_limit; j<new_limit; j++) {
brt->h->block_translation[j].diskoff = 0;
brt->h->block_translation[j].size = 0;
}
brt->h->translated_blocknum_limit = new_limit;
} else {
}
if (brt->h->block_translation[blocknum.b].size > 0) {
block_allocator_free_block(brt->h->block_allocator, brt->h->block_translation[blocknum.b].diskoff);
brt->h->block_translation[blocknum.b].diskoff = 0;
brt->h->block_translation[blocknum.b].size = 0;
}
size_t n_to_write = uncompressed_magic_len + compression_header_len + compressed_len;
u_int64_t offset;
......@@ -646,16 +654,17 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) {
}
{
struct wbuf w;
u_int64_t size = 4 + h->translated_blocknum_limit * 8; // 4 for the checksum
u_int64_t size = 4 + h->translated_blocknum_limit * 16; // 4 for the checksum
printf("%s:%d writing translation table of size %ld\n", __FILE__, __LINE__, size);
wbuf_init(&w, toku_malloc(size), size);
u_int64_t i;
for (i=0; i<h->translated_blocknum_limit; i++) {
printf("%s:%d %ld,%ld\n", __FILE__, __LINE__, h->block_translation[i].diskoff, h->block_translation[i].size);
wbuf_ulonglong(&w, h->block_translation[i].diskoff);
wbuf_ulonglong(&w, h->block_translation[i].size);
}
u_int32_t checksum = x1764_finish(&w.checksum);
printf("%s:%d writing to %ld, checksum=%d offset=%d size=%ld\n", __FILE__, __LINE__, h->block_translation_address_on_disk, checksum, w.ndone, size);
printf("%s:%d writing to %d\n", __FILE__, __LINE__, checksum);
wbuf_int(&w, checksum);
ssize_t nwrote = pwrite(fd, w.buf, size, h->block_translation_address_on_disk);
assert(nwrote==(ssize_t)size);
......@@ -664,12 +673,12 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) {
return 0;
}
int deserialize_brtheader (u_int32_t size, int fd, DISKOFF off, struct brt_header **brth, u_int32_t fullhash) {
// We only deserialize brt header once and then share everything with all the brts.
int deserialize_brtheader (u_int32_t size, int fd, DISKOFF off, struct brt_header **brth) {
// We already know the first 8 bytes are "tokudata", and we read in the size.
struct brt_header *MALLOC(h);
if (h==0) return errno;
int ret=-1;
h->fullhash = fullhash;
if (0) { died0: toku_free(h); return ret; }
struct rbuf rc;
rc.buf = toku_malloc(size-12); // we can skip the first 12 bytes.
......@@ -689,10 +698,11 @@ int deserialize_brtheader (u_int32_t size, int fd, DISKOFF off, struct brt_heade
h->unused_blocks = rbuf_blocknum(&rc);
h->n_named_roots = rbuf_int(&rc);
h->translated_blocknum_limit = rbuf_diskoff(&rc);
h->block_translation_size_on_disk = 4 + 8 * h->translated_blocknum_limit;
h->block_translation_size_on_disk = 4 + 16 * h->translated_blocknum_limit;
h->block_translation_address_on_disk = rbuf_diskoff(&rc);
// Set up the block translation buffer.
create_block_allocator(&h->block_allocator, h->nodesize);
printf("%s:%d translated_blocknum_limit=%ld, block_translation_address_on_disk=%ld\n", __FILE__, __LINE__, h->translated_blocknum_limit, h->block_translation_address_on_disk);
if (h->block_translation_address_on_disk == 0) {
h->block_translation = 0;
} else {
......@@ -763,7 +773,7 @@ int deserialize_brtheader (u_int32_t size, int fd, DISKOFF off, struct brt_heade
return 0;
}
int toku_deserialize_brtheader_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash, struct brt_header **brth) {
int toku_deserialize_brtheader_from (int fd, BLOCKNUM blocknum, struct brt_header **brth) {
//printf("%s:%d calling MALLOC\n", __FILE__, __LINE__);
assert(blocknum.b==0);
DISKOFF offset = 0;
......@@ -776,7 +786,7 @@ int toku_deserialize_brtheader_from (int fd, BLOCKNUM blocknum, u_int32_t fullha
if (r!=12) return EINVAL;
assert(memcmp(magic,"tokudata",8)==0);
// It's version 7 or later, and the magic looks OK
return deserialize_brtheader(ntohl(*(int*)(&magic[8])), fd, offset, brth, fullhash);
return deserialize_brtheader(ntohl(*(int*)(&magic[8])), fd, offset, brth);
}
unsigned int toku_brt_pivot_key_len (BRT brt, struct kv_pair *pk) {
......
......@@ -3,15 +3,13 @@
// Test-setup helper: make sure brt->h is populated, create a new brtnode at height 0,
// report its node name through *blocknum, and unpin the node.
// Returns 0 on success, or the nonzero code from whichever step failed.
// NOTE(review): this span is a flattened diff view. It shows both the header call being
// replaced (toku_read_and_pin_brt_header, with its matching toku_unpin_brt_header) and the
// replacement (toku_read_brt_header_and_store_in_cachefile), so `int r` appears twice here.
// Confirm against the real file which lines are live.
int toku_testsetup_leaf(BRT brt, BLOCKNUM *blocknum) {
BRTNODE node;
int r = toku_read_and_pin_brt_header(brt->cf, &brt->h);
int r = toku_read_brt_header_and_store_in_cachefile(brt->cf, &brt->h);
if (r!=0) return r;
// Height 0 is passed for the new node; no logger is supplied.
toku_create_new_brtnode(brt, &node, 0, (TOKULOGGER)0);
// Hand the new node's name back to the caller before unpinning it.
*blocknum = node->thisnodename;
r = toku_unpin_brtnode(brt, node);
if (r!=0) return r;
r = toku_unpin_brt_header(brt);
if (r!=0) return r;
return 0;
}
......@@ -19,7 +17,7 @@ int toku_testsetup_leaf(BRT brt, BLOCKNUM *blocknum) {
int toku_testsetup_nonleaf (BRT brt, int height, BLOCKNUM *blocknum, int n_children, BLOCKNUM *children, u_int32_t *subtree_fingerprints, char **keys, int *keylens) {
BRTNODE node;
assert(n_children<=BRT_FANOUT);
int r = toku_read_and_pin_brt_header(brt->cf, &brt->h);
int r = toku_read_brt_header_and_store_in_cachefile(brt->cf, &brt->h);
if (r!=0) return r;
toku_create_new_brtnode(brt, &node, height, (TOKULOGGER)0);
node->u.n.n_children=n_children;
......@@ -40,20 +38,15 @@ int toku_testsetup_nonleaf (BRT brt, int height, BLOCKNUM *blocknum, int n_child
node->u.n.totalchildkeylens += keylens[i];
}
*blocknum = node->thisnodename;
r = toku_unpin_brtnode(brt, node);
if (r!=0) return r;
r = toku_unpin_brt_header(brt);
if (r!=0) return r;
return 0;
return toku_unpin_brtnode(brt, node);
}
// Test-setup helper: point the first root slot of the brt header at `blocknum`.
// Returns 0 on success, or the nonzero code from reading the header.
// NOTE(review): this span is a flattened diff view. Both the pin/unpin header calls and
// the toku_read_brt_header_and_store_in_cachefile replacement are shown, which is why
// `int r` appears twice and two return statements follow each other. Confirm against the
// real file which lines are live.
int toku_testsetup_root(BRT brt, BLOCKNUM blocknum) {
int r = toku_read_and_pin_brt_header(brt->cf, &brt->h);
int r = toku_read_brt_header_and_store_in_cachefile(brt->cf, &brt->h);
if (r!=0) return r;
brt->h->roots[0] = blocknum;
// Mark the stored hash for roots[0] as not valid now that the root has changed.
brt->h->root_hashes[0].valid = FALSE;
r = toku_unpin_brt_header(brt);
return r;
return 0;
}
int toku_testsetup_get_sersize(BRT brt, BLOCKNUM diskoff) // Return the size on disk
......
......@@ -154,15 +154,9 @@ int toku_verify_brtnode (BRT brt, BLOCKNUM blocknum, bytevec lorange, ITEMLEN lo
}
// Verify a brt starting from its root: obtain the root block via
// toku_calculate_root_offset_pointer and run toku_verify_brtnode on it.
// Returns the result of toku_verify_brtnode (0 is treated as success by the code below).
// NOTE(review): this span is a flattened diff view. It shows a variant that pins the header
// first and unpins it on every exit (the died0 label), alongside a variant that only asserts
// brt->h is set and returns the verify result directly. Confirm against the real file which
// lines are live.
int toku_verify_brt (BRT brt) {
int r;
CACHEKEY *rootp;
if ((r = toku_read_and_pin_brt_header(brt->cf, &brt->h))) {
// died0 is only reachable via goto; it unpins the header before returning the error in r.
if (0) { died0: toku_unpin_brt_header(brt); }
return r;
}
// The header must already be attached to the brt at this point.
assert(brt->h);
u_int32_t root_hash;
rootp = toku_calculate_root_offset_pointer(brt, &root_hash);
if ((r=toku_verify_brtnode(brt, *rootp, 0, 0, 0, 0, 1))) goto died0;
if ((r = toku_unpin_brt_header(brt))!=0) return r;
return 0;
return toku_verify_brtnode(brt, *rootp, 0, 0, 0, 0, 1);
}
This diff is collapsed.
......@@ -26,7 +26,7 @@ void print_item (bytevec val, ITEMLEN len) {
void dump_header (int f, struct brt_header **header) {
struct brt_header *h;
int r;
r = toku_deserialize_brtheader_from (f, header_blocknum, 0/*pass 0 for hash. It doesn't matter.*/, &h); assert(r==0);
r = toku_deserialize_brtheader_from (f, header_blocknum, &h); assert(r==0);
printf("brtheader:\n");
if (h->layout_version==BRT_LAYOUT_VERSION_6) printf(" layout_version<=6\n");
else printf(" layout_version=%d\n", h->layout_version);
......
......@@ -124,7 +124,6 @@ struct fileid {
struct cachefile {
CACHEFILE next;
u_int32_t header_fullhash;
u_int64_t refcount; /* CACHEFILEs are shared. Use a refcount to decide when to really close it.
* The reference count is one for every open DB.
* Plus one for every commit/rollback record. (It would be harder to keep a count for every open transaction,
......@@ -136,6 +135,9 @@ struct cachefile {
struct fileid fileid;
FILENUM filenum;
char *fname;
void *userdata;
int (*close_userdata)(CACHEFILE cf, void *userdata); // when closing the last reference to a cachefile, first call this function.
};
int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN initial_lsn, TOKULOGGER logger) {
......@@ -163,6 +165,7 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN initial_lsn,
t->size_writing = 0;
t->lsn_of_checkpoint = initial_lsn;
t->logger = logger;
int r;
writequeue_init(&t->wq);
r = pthread_mutex_init(&t->mutex, 0); assert(r == 0);
......@@ -233,9 +236,12 @@ int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd, const char *fna
newcf->filenum.fileid = next_filenum_to_use.fileid++;
cachefile_init_filenum(newcf, fd, fname, fileid);
newcf->refcount = 1;
newcf->header_fullhash = toku_cachetable_hash(newcf, header_blocknum);
newcf->next = t->cachefiles;
t->cachefiles = newcf;
newcf->userdata = 0;
newcf->close_userdata = 0;
*cf = newcf;
return 0;
}
......@@ -301,6 +307,12 @@ int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger) {
cachetable_unlock(ct);
return r;
}
if (cf->close_userdata && (r = cf->close_userdata(cf, cf->userdata))) {
cachetable_unlock(ct);
return r;
}
cf->close_userdata = NULL;
cf->userdata = NULL;
cf->cachetable->cachefiles = remove_cf_from_list(cf, cf->cachetable->cachefiles);
cachetable_unlock(ct);
r = close(cf->fd);
......@@ -1117,11 +1129,6 @@ FILENUM toku_cachefile_filenum (CACHEFILE cf) {
return cf->filenum;
}
// Accessor: returns the header_fullhash value stored in the given cachefile.
// NOTE(review): in this change view the header_fullhash field and this accessor appear to
// be on the removed side of the diff -- confirm against the real file before relying on it.
u_int32_t toku_cachefile_fullhash_of_header (CACHEFILE cachefile) {
return cachefile->header_fullhash;
}
#if DO_WRITER_THREAD
// The writer thread waits for work in the write queue and writes the pair
......@@ -1205,3 +1212,11 @@ int toku_cachetable_get_key_state (CACHETABLE ct, CACHEKEY key, CACHEFILE cf, vo
note_hash_count(count);
return r;
}
// Attach caller-supplied data to a cachefile together with the hook to run on it at close.
//   cf             - cachefile to annotate.
//   userdata       - opaque pointer stored on the cachefile.
//   close_userdata - callback stored next to userdata; may be 0 for "no hook".
// Whatever userdata/hook was stored before is simply overwritten; the old hook is not called.
void toku_cachefile_set_userdata (CACHEFILE cf, void *userdata, int (*close_userdata)(CACHEFILE, void*)) {
    cf->close_userdata = close_userdata;
    cf->userdata       = userdata;
}
// Return the opaque userdata pointer currently stored on the cachefile
// (whatever was last stored via toku_cachefile_set_userdata).
void *toku_cachefile_get_userdata(CACHEFILE cf) {
    void *stored = cf->userdata;
    return stored;
}
......@@ -46,14 +46,18 @@ typedef void (*CACHETABLE_FLUSH_CALLBACK)(CACHEFILE, CACHEKEY key, void *value,
// the fetch callback
typedef int (*CACHETABLE_FETCH_CALLBACK)(CACHEFILE, CACHEKEY key, u_int32_t fullhash, void **value, long *sizep, void *extraargs, LSN *written_lsn);
// Put a key and value pair into the cachetable
// effects: if the key,cachefile is not in the cachetable, then insert the pair and pin it.
// returns: 0 if success, otherwise an error
void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata, int (*close_userdata)(CACHEFILE, void*));
// Effect: Store some cachefile-specific data. When the last reference to a cachefile is closed, we call close_userdata.
// If userdata is already non-NULL, then we simply overwrite it.
void *toku_cachefile_get_userdata(CACHEFILE);
int toku_cachetable_put(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
void *value, long size,
CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback, void *extraargs);
// Effect: Put a key and value pair into the cachetable
// If the key,cachefile is not in the cachetable, then insert the pair and pin it.
// returns: 0 if success, otherwise an error
int toku_cachetable_get_and_pin(CACHEFILE, CACHEKEY, u_int32_t /*fullhash*/,
void **/*value*/, long *sizep,
......
......@@ -149,9 +149,7 @@ static void toku_recover_fheader (LSN UU(lsn), TXNID UU(txnid),FILENUM filenum,L
} else {
assert(0);
}
u_int32_t fullhash = toku_cachetable_hash(pair->cf, header_blocknum);
h->fullhash = fullhash;
toku_cachetable_put(pair->cf, header_blocknum, fullhash, h, 0, toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0);
//toku_cachetable_put(pair->cf, header_blocknum, fullhash, h, 0, toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0);
if (pair->brt) {
toku_free(pair->brt->h);
} else {
......@@ -168,8 +166,7 @@ static void toku_recover_fheader (LSN UU(lsn), TXNID UU(txnid),FILENUM filenum,L
pair->brt->h = h;
pair->brt->nodesize = h->nodesize;
pair->brt->flags = h->nodesize;
r = toku_unpin_brt_header(pair->brt);
assert(r==0);
toku_cachefile_set_userdata(pair->cf, pair->brt->h, toku_brtheader_close);
}
void toku_recover_newbrtnode (LSN lsn, FILENUM filenum, BLOCKNUM blocknum,u_int32_t height,u_int32_t nodesize,u_int8_t is_dup_sort,u_int32_t rand4fingerprint) {
......@@ -238,12 +235,10 @@ static void toku_recover_deqrootentry (LSN lsn __attribute__((__unused__)), FILE
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
assert(r==0);
void *h_v;
u_int32_t fullhash = toku_cachetable_hash(pair->cf, header_blocknum);
r = toku_cachetable_get_and_pin(pair->cf, header_blocknum, fullhash,
&h_v, NULL, toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0);
assert(r==0);
struct brt_header *h=h_v;
//void *h_v;
//r = toku_cachetable_get_and_pin(pair->cf, header_blocknum, fullhash,
// &h_v, NULL, toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0);
struct brt_header *h=0;
bytevec storedkey,storeddata;
ITEMLEN storedkeylen, storeddatalen;
TXNID storedxid;
......@@ -252,8 +247,8 @@ static void toku_recover_deqrootentry (LSN lsn __attribute__((__unused__)), FILE
assert(r==0);
r = toku_fifo_deq(h->fifo);
assert(r==0);
r = toku_cachetable_unpin(pair->cf, header_blocknum, fullhash, 1, 0);
assert(r==0);
//r = toku_cachetable_unpin(pair->cf, header_blocknum, fullhash, 1, 0);
//assert(r==0);
}
void toku_recover_enqrootentry (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, u_int32_t typ, BYTESTRING key, BYTESTRING val) {
......@@ -262,7 +257,12 @@ void toku_recover_enqrootentry (LSN lsn __attribute__((__unused__)), FILENUM fil
assert(r==0);
void *h_v;
u_int32_t fullhash = toku_cachetable_hash(pair->cf, header_blocknum);
r = toku_cachetable_get_and_pin(pair->cf, header_blocknum, fullhash, &h_v, NULL, toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0);
if (0) {
//r = toku_cachetable_get_and_pin(pair->cf, header_blocknum, fullhash, &h_v, NULL, toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0);
} else {
h_v=0;
assert(0);
}
assert(r==0);
struct brt_header *h=h_v;
r = toku_fifo_enq(h->fifo, key.data, key.len, val.data, val.len, typ, xid);
......@@ -655,11 +655,9 @@ void toku_recover_changeunnamedroot (LSN UU(lsn), FILENUM filenum, BLOCKNUM UU(o
int r = find_cachefile(filenum, &pair);
assert(r==0);
assert(pair->brt);
r = toku_read_and_pin_brt_header(pair->cf, &pair->brt->h);
assert(r==0);
assert(pair->brt->h);
pair->brt->h->roots[0] = newroot;
pair->brt->h->root_hashes[0].valid = FALSE;
r = toku_unpin_brt_header(pair->brt);
}
void toku_recover_changenamedroot (LSN UU(lsn), FILENUM UU(filenum), BYTESTRING UU(name), BLOCKNUM UU(oldroot), BLOCKNUM UU(newroot)) { assert(0); }
......@@ -668,10 +666,8 @@ void toku_recover_changeunusedmemory (LSN UU(lsn), FILENUM filenum, BLOCKNUM UU(
int r = find_cachefile(filenum, &pair);
assert(r==0);
assert(pair->brt);
r = toku_read_and_pin_brt_header(pair->cf, &pair->brt->h);
assert(r==0);
assert(pair->brt->h);
pair->brt->h->unused_blocks = newunused;
r = toku_unpin_brt_header(pair->brt);
}
static int toku_recover_checkpoint (LSN UU(lsn)) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment