Commit 1cbc087f authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Improve logging. Addresses #27.

git-svn-id: file:///svn/tokudb@1690 c7de825b-a66e-492c-adef-691d508d4ae1
parent a2803507
......@@ -97,6 +97,7 @@ void toku_serialize_brtnode_to(int fd, DISKOFF off, DISKOFF size, BRTNODE node)
struct wbuf w;
int i;
unsigned int calculated_size = toku_serialize_brtnode_size(node);
assert(calculated_size<=size);
//char buf[size];
char *MALLOC_N(size,buf);
toku_verify_counts(node);
......
......@@ -79,7 +79,8 @@ const struct logtype logtypes[] = {
NULLFIELD}},
{"pmadistribute", 'M', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"DISKOFF", "old_diskoff", 0},
{"DISKOFF", "new_diskoff", 0},
{"INTPAIRARRAY", "fromto", 0},
NULLFIELD}},
{0,0,FA{NULLFIELD}}
......
......@@ -460,6 +460,30 @@ static int distribute_data (struct kv_pair *destpairs[], int dcount,
}
}
static int pma_log_distribute (TOKUTXN txn, FILENUM filenum, DISKOFF old_diskoff, DISKOFF new_diskoff, int n_pairs, struct kv_pair_tag *pairs) {
INTPAIRARRAY ipa;
MALLOC_N(n_pairs, ipa.array);
if (ipa.array==0) return errno;
int j=0;
int i;
for (i=0; i<n_pairs; i++) {
if (pairs[i].pair!=0) {
ipa.array[j].a = pairs[i].oldtag;
ipa.array[j].b = pairs[i].newtag;
j++;
}
}
ipa.size=j;
int r=toku_log_pmadistribute(txn, toku_txn_get_txnid(txn), filenum, old_diskoff, new_diskoff, ipa);
// if (0 && pma) {
// printf("Pma state:\n");
// PMA_ITERATE_IDX (pma, pidx, key, keylen, data, datalen,
// printf(" %d:(%d:%s) (%d:%s)\n", pidx, keylen, (char*)key, datalen, (char*)data));
// }
toku_free(ipa.array);
return r;
}
/* spread the non-empty pairs around. There are n of them. Create an empty slot just before the IDXth
element, and return that slot's index in the smoothed array. */
int toku_pmainternal_smooth_region (TOKUTXN txn, FILENUM filenum, DISKOFF diskoff, struct kv_pair *pairs[], int n, int idx, int base, PMA pma, int *new_idx) {
......@@ -497,35 +521,18 @@ int toku_pmainternal_smooth_region (TOKUTXN txn, FILENUM filenum, DISKOFF diskof
/* Now the tricky part. Distribute the data. */
newidx=distribute_data (pairs, n,
tmppairs, n_saved, pma);
{
INTPAIRARRAY ipa;
ipa.size=n_saved-1; /* Don't move the blank spot. */
MALLOC_N(n_saved, ipa.array);
if (ipa.array==0) return errno;
int j=0;
for (i=0; i<n_saved; i++) {
if (tmppairs[i].pair!=0) {
ipa.array[j].a = tmppairs[i].oldtag;
ipa.array[j].b = tmppairs[i].newtag;
j++;
}
}
int r=toku_log_pmadistribute(txn, toku_txn_get_txnid(txn), filenum, diskoff, ipa);
if (0 && pma) {
printf("Pma state:\n");
PMA_ITERATE_IDX (pma, pidx, key, keylen, data, datalen,
printf(" %d:(%d:%s) (%d:%s)\n", pidx, keylen, (char*)key, datalen, (char*)data));
}
toku_free(ipa.array);
if (r!=0) return r;
}
int r = pma_log_distribute(txn, filenum, diskoff, diskoff,
n_saved,
tmppairs);
if (r!=0) goto cleanup;
if (pma && !list_empty(&pma->cursors))
__pma_update_my_cursors(pma, tmppairs, n_present);
*new_idx = newidx;
cleanup:
#ifdef USE_MALLOC_IN_SMOOTH
toku_free(tmppairs);
#endif
*new_idx = newidx;
return 0;
}
}
......@@ -1590,7 +1597,8 @@ int toku_pma_clear_at_index (PMA pma, unsigned int idx) {
}
// assume no cursors
int toku_pma_move_indices (PMA pma, INTPAIRARRAY fromto) {
// Move stuff from pmaa to pmab
int toku_pma_move_indices (PMA pma_from, PMA pma_to, INTPAIRARRAY fromto) {
u_int32_t i;
for (i=0; i<fromto.size; i++) {
// First handle the case for sliding something left. We can simply move it.
......@@ -1599,9 +1607,9 @@ int toku_pma_move_indices (PMA pma, INTPAIRARRAY fromto) {
int b=fromto.array[i].b;
if (b==a) continue;
if (b<a) {
assert(pma->pairs[b]==0);
pma->pairs[b] = pma->pairs[a];
pma->pairs[a] = 0;
assert(pma_to->pairs[b]==0);
pma_to->pairs[b] = pma_from->pairs[a];
pma_from->pairs[a] = 0;
continue;
}
}
......@@ -1619,9 +1627,9 @@ int toku_pma_move_indices (PMA pma, INTPAIRARRAY fromto) {
int a=fromto.array[jdown].a;
int b=fromto.array[jdown].b;
if (a!=b) {
assert(pma->pairs[b]==0);
pma->pairs[b] = pma->pairs[a];
pma->pairs[a] = 0;
assert(pma_to->pairs[b]==0);
pma_to->pairs[b] = pma_from->pairs[a];
pma_from->pairs[a] = 0;
}
if (i==jdown) break; // Do it this way so everything can be unsigned and we won't try to go negative.
}
......@@ -1640,10 +1648,10 @@ static void reverse_fromto (INTPAIRARRAY fromto) {
}
}
int toku_pma_move_indices_back (PMA pma, INTPAIRARRAY fromto) {
int toku_pma_move_indices_back (PMA pma_backto, PMA pma_backfrom, INTPAIRARRAY fromto) {
int r;
reverse_fromto(fromto);
r = toku_pma_move_indices(pma, fromto);
r = toku_pma_move_indices(pma_backfrom, pma_backto, fromto);
reverse_fromto(fromto);
return r;
}
......@@ -155,9 +155,9 @@ int toku_pma_clear_at_index (PMA, unsigned int /*index*/); // If the index is wr
// Requires: No open cursors on the pma.
int toku_pma_move_indices (PMA pma, INTPAIRARRAY fromto); // Return nonzero if the indices are somehow wrong.
int toku_pma_move_indices (PMA pma_from, PMA pma_to, INTPAIRARRAY fromto); // Return nonzero if the indices are somehow wrong.
// Move things backwards according to fromto.
int toku_pma_move_indices_back (PMA pma, INTPAIRARRAY fromto);
int toku_pma_move_indices_back (PMA pma_backto, PMA pma_backfrom, INTPAIRARRAY fromto);
void toku_pma_show_stats (void);
......
......@@ -287,26 +287,32 @@ void toku_recover_pmadistribute (struct logtype_pmadistribute *c) {
struct cf_pair *pair;
int r = find_cachefile(c->filenum, &pair);
assert(r==0);
void *node_v;
void *node_va, *node_vb;
assert(pair->brt);
r = toku_cachetable_get_and_pin(pair->cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
r = toku_cachetable_get_and_pin(pair->cf, c->old_diskoff, &node_va, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0);
BRTNODE node = node_v;
assert(node->height==0);
{
r = toku_cachetable_get_and_pin(pair->cf, c->new_diskoff, &node_vb, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0);
BRTNODE nodea = node_va; assert(nodea->height==0);
BRTNODE nodeb = node_vb; assert(nodeb->height==0);
{
unsigned int i;
for (i=0; i<c->fromto.size; i++) {
assert(c->fromto.array[i].a < toku_pma_index_limit(node->u.l.buffer));
assert(c->fromto.array[i].b < toku_pma_index_limit(node->u.l.buffer));
assert(c->fromto.array[i].a < toku_pma_index_limit(nodea->u.l.buffer));
assert(c->fromto.array[i].b < toku_pma_index_limit(nodeb->u.l.buffer));
}
}
r = toku_pma_move_indices (node->u.l.buffer, c->fromto);
r = toku_pma_move_indices (nodea->u.l.buffer, nodeb->u.l.buffer, c->fromto);
// The bytes in bufer and fingerprint shouldn't change
VERIFY_COUNTS(node);
VERIFY_COUNTS(nodea);
VERIFY_COUNTS(nodeb);
r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
r = toku_cachetable_unpin(pair->cf, c->new_diskoff, 1, toku_serialize_brtnode_size(nodea));
assert(r==0);
r = toku_cachetable_unpin(pair->cf, c->new_diskoff, 1, toku_serialize_brtnode_size(nodeb));
assert(r==0);
toku_free(c->fromto.array);
}
......@@ -316,13 +322,27 @@ int toku_rollback_pmadistribute (struct logtype_pmadistribute *le, TOKUTXN txn)
BRT brt;
int r = toku_cachefile_of_filenum(txn->logger->ct, le->filenum, &cf, &brt);
if (r!=0) return r;
void *node_v;
r = toku_cachetable_get_and_pin(cf, le->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (r!=0) return r;
BRTNODE node = node_v;
r = toku_pma_move_indices_back(node->u.l.buffer, le->fromto);
void *node_va, *node_vb;
r = toku_cachetable_get_and_pin(cf, le->old_diskoff, &node_va, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (r!=0) return r;
r = toku_cachetable_unpin(cf, le->diskoff, 1, toku_serialize_brtnode_size(node));
if (0) {
died0: toku_cachetable_unpin(cf, le->old_diskoff, 1, 0);
return r;
}
r = toku_cachetable_get_and_pin(cf, le->new_diskoff, &node_vb, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (r!=0) goto died0;
if (0) {
died1:
toku_cachetable_unpin(cf, le->new_diskoff, 1, 0);
goto died0;
}
BRTNODE nodea = node_va;
BRTNODE nodeb = node_vb;
r = toku_pma_move_indices_back(nodea->u.l.buffer, nodeb->u.l.buffer, le->fromto);
if (r!=0) goto died1;
r = toku_cachetable_unpin(cf, le->old_diskoff, 1, toku_serialize_brtnode_size(nodea));
r = toku_cachetable_unpin(cf, le->new_diskoff, 1, toku_serialize_brtnode_size(nodeb));
return r;
}
......
......@@ -135,9 +135,12 @@ NO_VGRIND = \
$(patsubst %,test_%.bdbrun,$(NO_VGRIND)): VGRIND=
$(patsubst %,test_%.bdbrun,$(NO_VGRIND)): BDB_SUPPRESSIONS=
libs:
cd ..;make
%.bdb: %.c
$(UNSETTOKUENV) cc -DDIR=\"dir.$<.bdb\" $(BDB_CPPFLAGS) -DUSE_BDB -DIS_TDB=0 $(CFLAGS) $< $(BDB_LDFLAGS) -ldb -o $@
%.tdb: %.c
%.tdb: %.c libs
$(SETTOKUENV) cc -DDIR=\"dir.$<.tdb\" -DUSE_TDB -DIS_TDB=1 $(CFLAGS) $(TDB_CPPFLAGS) $(TDB_LOADLIBES) $< -o $@
.PHONY: %.recover
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment