Commit 271137c9 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

test_log5 recovers correctly now. Addresses #27.

git-svn-id: file:///svn/tokudb@1826 c7de825b-a66e-492c-adef-691d508d4ae1
parent 2a2304f3
...@@ -395,7 +395,7 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int fl ...@@ -395,7 +395,7 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int fl
} }
if (n_in_buf > 0) { if (n_in_buf > 0) {
u_int32_t actual_sum = 0; u_int32_t actual_sum = 0;
r = toku_pma_bulk_insert((TOKUTXN)0, (FILENUM){0}, (DISKOFF)0, result->u.l.buffer, keys, vals, n_in_buf, result->rand4fingerprint, &actual_sum); r = toku_pma_bulk_insert((TOKUTXN)0, (FILENUM){0}, (DISKOFF)0, result->u.l.buffer, keys, vals, n_in_buf, result->rand4fingerprint, &actual_sum, 0);
if (r!=0) goto died_21; if (r!=0) goto died_21;
if (actual_sum!=result->local_fingerprint) { if (actual_sum!=result->local_fingerprint) {
//fprintf(stderr, "%s:%d Corrupted checksum stored=%08x rand=%08x actual=%08x height=%d n_keys=%d\n", __FILE__, __LINE__, result->rand4fingerprint, result->local_fingerprint, actual_sum, result->height, n_in_buf); //fprintf(stderr, "%s:%d Corrupted checksum stored=%08x rand=%08x actual=%08x height=%d n_keys=%d\n", __FILE__, __LINE__, result->rand4fingerprint, result->local_fingerprint, actual_sum, result->height, n_in_buf);
......
...@@ -79,6 +79,13 @@ static void brt_update_cursors_leaf_split(BRT t, BRTNODE oldnode, BRTNODE newnod ...@@ -79,6 +79,13 @@ static void brt_update_cursors_leaf_split(BRT t, BRTNODE oldnode, BRTNODE newnod
static void brt_update_cursors_nonleaf_expand(BRT t, BRTNODE oldnode, int childnum, BRTNODE left, BRTNODE right, struct kv_pair *splitk); static void brt_update_cursors_nonleaf_expand(BRT t, BRTNODE oldnode, int childnum, BRTNODE left, BRTNODE right, struct kv_pair *splitk);
static void brt_update_cursors_nonleaf_split(BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right); static void brt_update_cursors_nonleaf_split(BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right);
static void toku_update_brtnode_lsn(BRTNODE node, TOKUTXN txn) {
if (txn) {
node->log_lsn = toku_txn_get_last_lsn(txn);
}
}
static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE child, BRT brt, TOKUTXN txn) { static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE child, BRT brt, TOKUTXN txn) {
u_int32_t old_fingerprint = BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node,childnum_of_node); u_int32_t old_fingerprint = BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node,childnum_of_node);
u_int32_t sum = child->local_fingerprint; u_int32_t sum = child->local_fingerprint;
...@@ -93,6 +100,7 @@ static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE ...@@ -93,6 +100,7 @@ static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE
BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node,childnum_of_node)=sum; BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node,childnum_of_node)=sum;
node->dirty=1; node->dirty=1;
toku_log_changechildfingerprint(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), node->thisnodename, childnum_of_node, old_fingerprint, sum); toku_log_changechildfingerprint(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), node->thisnodename, childnum_of_node, old_fingerprint, sum);
toku_update_brtnode_lsn(node, txn);
} }
static int brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck) { static int brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck) {
...@@ -191,12 +199,12 @@ int toku_unpin_brt_header (BRT brt) { ...@@ -191,12 +199,12 @@ int toku_unpin_brt_header (BRT brt) {
brt->h=0; brt->h=0;
return r; return r;
} }
static int unpin_brtnode (BRT brt, BRTNODE node, TOKUTXN txn) { static int unpin_brtnode (BRT brt, BRTNODE node) {
if (node->dirty && txn) { // if (node->dirty && txn) {
// For now just update the log_lsn. Later we'll have to deal with the checksums. // // For now just update the log_lsn. Later we'll have to deal with the checksums.
node->log_lsn = toku_txn_get_last_lsn(txn); // node->log_lsn = toku_txn_get_last_lsn(txn);
//if (node->log_lsn.lsn>33320) printf("%s:%d node%lld lsn=%lld\n", __FILE__, __LINE__, node->thisnodename, node->log_lsn.lsn); // //if (node->log_lsn.lsn>33320) printf("%s:%d node%lld lsn=%lld\n", __FILE__, __LINE__, node->thisnodename, node->log_lsn.lsn);
} // }
return toku_cachetable_unpin(brt->cf, node->thisnodename, node->dirty, brtnode_size(node)); return toku_cachetable_unpin(brt->cf, node->thisnodename, node->dirty, brtnode_size(node));
} }
...@@ -302,6 +310,7 @@ static void create_new_brtnode (BRT t, BRTNODE *result, int height, TOKUTXN txn) ...@@ -302,6 +310,7 @@ static void create_new_brtnode (BRT t, BRTNODE *result, int height, TOKUTXN txn)
assert(r==0); assert(r==0);
r=toku_log_newbrtnode(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), n->thisnodename, height, n->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, n->rand4fingerprint); r=toku_log_newbrtnode(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), n->thisnodename, height, n->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, n->rand4fingerprint);
assert(r==0); assert(r==0);
toku_update_brtnode_lsn(n, txn);
} }
static void delete_node (BRT t, BRTNODE node) { static void delete_node (BRT t, BRTNODE node) {
...@@ -356,9 +365,9 @@ static int brtleaf_split (TOKUTXN txn, FILENUM filenum, BRT t, BRTNODE node, BRT ...@@ -356,9 +365,9 @@ static int brtleaf_split (TOKUTXN txn, FILENUM filenum, BRT t, BRTNODE node, BRT
assert(node->height>0 || node->u.l.buffer!=0); assert(node->height>0 || node->u.l.buffer!=0);
int r; int r;
r = toku_pma_split(txn, filenum, r = toku_pma_split(txn, filenum,
node->thisnodename, node->u.l.buffer, &node->u.l.n_bytes_in_buffer, node->rand4fingerprint, &node->local_fingerprint, node->thisnodename, node->u.l.buffer, &node->u.l.n_bytes_in_buffer, node->rand4fingerprint, &node->local_fingerprint, &node->log_lsn,
splitk, splitk,
B->thisnodename, B->u.l.buffer, &B->u.l.n_bytes_in_buffer, B->rand4fingerprint, &B->local_fingerprint); B->thisnodename, B->u.l.buffer, &B->u.l.n_bytes_in_buffer, B->rand4fingerprint, &B->local_fingerprint, &B->log_lsn);
assert(r == 0); assert(r == 0);
assert(node->height>0 || node->u.l.buffer!=0); assert(node->height>0 || node->u.l.buffer!=0);
/* Remove it from the cache table, and free its storage. */ /* Remove it from the cache table, and free its storage. */
...@@ -684,9 +693,9 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -684,9 +693,9 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
toku_verify_counts(childa); toku_verify_counts(childa);
toku_verify_counts(childb); toku_verify_counts(childb);
r=unpin_brtnode(t, childa, txn); r=unpin_brtnode(t, childa);
assert(r==0); assert(r==0);
r=unpin_brtnode(t, childb, txn); r=unpin_brtnode(t, childb);
assert(r==0); assert(r==0);
if (node->u.n.n_children>TREE_FANOUT) { if (node->u.n.n_children>TREE_FANOUT) {
...@@ -804,7 +813,7 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum, ...@@ -804,7 +813,7 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum,
if (debug) printf("%s:%d %*sdone push_some_brt_cmds_down, unpinning %lld\n", __FILE__, __LINE__, debug, "", targetchild); if (debug) printf("%s:%d %*sdone push_some_brt_cmds_down, unpinning %lld\n", __FILE__, __LINE__, debug, "", targetchild);
assert(toku_serialize_brtnode_size(node)<=node->nodesize); assert(toku_serialize_brtnode_size(node)<=node->nodesize);
//verify_local_fingerprint_nonleaf(node); //verify_local_fingerprint_nonleaf(node);
r=unpin_brtnode(t, child, txn); r=unpin_brtnode(t, child);
if (r!=0) return r; if (r!=0) return r;
*did_split=0; *did_split=0;
return 0; return 0;
...@@ -870,7 +879,7 @@ static int brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd, ...@@ -870,7 +879,7 @@ static int brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
DBT *k = cmd->u.id.key; DBT *k = cmd->u.id.key;
DBT *v = cmd->u.id.val; DBT *v = cmd->u.id.val;
int replaced_v_size; int replaced_v_size;
enum pma_errors pma_status = toku_pma_insert_or_replace(node->u.l.buffer, k, v, &replaced_v_size, txn, filenum, node->thisnodename, node->rand4fingerprint, &node->local_fingerprint); enum pma_errors pma_status = toku_pma_insert_or_replace(node->u.l.buffer, k, v, &replaced_v_size, txn, filenum, node->thisnodename, node->rand4fingerprint, &node->local_fingerprint, &node->log_lsn);
assert(pma_status==BRT_OK); assert(pma_status==BRT_OK);
//printf("replaced_v_size=%d\n", replaced_v_size); //printf("replaced_v_size=%d\n", replaced_v_size);
if (replaced_v_size>=0) { if (replaced_v_size>=0) {
...@@ -970,7 +979,7 @@ static int brt_nonleaf_put_cmd_child_node (BRT t, BRTNODE node, BRT_CMD *cmd, ...@@ -970,7 +979,7 @@ static int brt_nonleaf_put_cmd_child_node (BRT t, BRTNODE node, BRT_CMD *cmd,
&child_did_split, &childa, &childb, &childsplitk, debug, txn); &child_did_split, &childa, &childb, &childsplitk, debug, txn);
if (r != 0) { if (r != 0) {
/* putting to the child failed for some reason, so unpin the child and return the error code */ /* putting to the child failed for some reason, so unpin the child and return the error code */
int rr = unpin_brtnode(t, child, txn); int rr = unpin_brtnode(t, child);
assert(rr == 0); assert(rr == 0);
return r; return r;
} }
...@@ -985,7 +994,7 @@ static int brt_nonleaf_put_cmd_child_node (BRT t, BRTNODE node, BRT_CMD *cmd, ...@@ -985,7 +994,7 @@ static int brt_nonleaf_put_cmd_child_node (BRT t, BRTNODE node, BRT_CMD *cmd,
} else { } else {
//verify_local_fingerprint_nonleaf(child); //verify_local_fingerprint_nonleaf(child);
fixup_child_fingerprint(node, childnum, child, t, txn); fixup_child_fingerprint(node, childnum, child, t, txn);
int rr = unpin_brtnode(t, child, txn); int rr = unpin_brtnode(t, child);
assert(rr == 0); assert(rr == 0);
} }
return r; return r;
...@@ -1253,11 +1262,8 @@ static int setup_brt_root_node (BRT t, DISKOFF offset, TOKUTXN txn) { ...@@ -1253,11 +1262,8 @@ static int setup_brt_root_node (BRT t, DISKOFF offset, TOKUTXN txn) {
toku_verify_counts(node); toku_verify_counts(node);
// verify_local_fingerprint_nonleaf(node); // verify_local_fingerprint_nonleaf(node);
toku_log_newbrtnode(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), offset, 0, t->h->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, node->rand4fingerprint); toku_log_newbrtnode(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), offset, 0, t->h->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, node->rand4fingerprint);
if (txn) { toku_update_brtnode_lsn(node, txn);
node->log_lsn = toku_txn_get_last_lsn(txn); r=unpin_brtnode(t, node);
//fprintf(stderr, "%s:%d last lsn=%" PRId64 "\n", __FILE__, __LINE__, node->log_lsn.lsn);
}
r=unpin_brtnode(t, node, txn);
if (r!=0) { if (r!=0) {
toku_free(node); toku_free(node);
return r; return r;
...@@ -1618,10 +1624,11 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, ...@@ -1618,10 +1624,11 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk,
.data = kv_pair_key(newroot->u.n.childkeys[0]) }; .data = kv_pair_key(newroot->u.n.childkeys[0]) };
r=toku_log_setpivot(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, bs); r=toku_log_setpivot(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, bs);
if (r!=0) return r; if (r!=0) return r;
toku_update_brtnode_lsn(newroot, txn);
} }
r=unpin_brtnode(brt, nodea, txn); r=unpin_brtnode(brt, nodea);
if (r!=0) return r; if (r!=0) return r;
r=unpin_brtnode(brt, nodeb, txn); r=unpin_brtnode(brt, nodeb);
if (r!=0) return r; if (r!=0) return r;
//printf("%s:%d put %lld\n", __FILE__, __LINE__, brt->root); //printf("%s:%d put %lld\n", __FILE__, __LINE__, brt->root);
toku_cachetable_put(brt->cf, newroot_diskoff, newroot, brtnode_size(newroot), toku_cachetable_put(brt->cf, newroot_diskoff, newroot, brtnode_size(newroot),
...@@ -1673,7 +1680,7 @@ static int brt_root_put_cmd(BRT brt, BRT_CMD *cmd, TOKUTXN txn) { ...@@ -1673,7 +1680,7 @@ static int brt_root_put_cmd(BRT brt, BRT_CMD *cmd, TOKUTXN txn) {
if (node->height>0) if (node->height>0)
assert(node->u.n.n_children<=TREE_FANOUT); assert(node->u.n.n_children<=TREE_FANOUT);
} }
r = unpin_brtnode(brt, node, txn); r = unpin_brtnode(brt, node);
assert(r==0); assert(r==0);
r = toku_unpin_brt_header(brt); r = toku_unpin_brt_header(brt);
assert(r == 0); assert(r == 0);
...@@ -1883,7 +1890,7 @@ static void brt_flush_child(BRT t, BRTNODE node, int childnum, BRT_CURSOR cursor ...@@ -1883,7 +1890,7 @@ static void brt_flush_child(BRT t, BRTNODE node, int childnum, BRT_CURSOR cursor
BRTNODE newnode; BRTNODE newnode;
r = brt_init_new_root(t, childa, childb, child_splitk, rootp, txn, &newnode); r = brt_init_new_root(t, childa, childb, child_splitk, rootp, txn, &newnode);
assert(r == 0); assert(r == 0);
r = unpin_brtnode(t, newnode, txn); r = unpin_brtnode(t, newnode);
assert(r == 0); assert(r == 0);
} else { } else {
BRTNODE upnode; BRTNODE upnode;
......
...@@ -36,9 +36,9 @@ struct pma { ...@@ -36,9 +36,9 @@ struct pma {
int toku_pmainternal_count_region (struct kv_pair *pairs[], int lo, int hi); int toku_pmainternal_count_region (struct kv_pair *pairs[], int lo, int hi);
void toku_pmainternal_calculate_parameters (PMA pma); void toku_pmainternal_calculate_parameters (PMA pma);
int toku_pmainternal_smooth_region (TOKUTXN, FILENUM, DISKOFF, struct kv_pair */*pairs*/[], int /*n*/, int /*idx*/, int /*base*/, PMA /*pma*/, int */*new_idx*/); int toku_pmainternal_smooth_region (TOKUTXN, FILENUM, DISKOFF, struct kv_pair */*pairs*/[], int /*n*/, int /*idx*/, int /*base*/, PMA /*pma*/, int */*new_idx*/, LSN */*node_lsn*/);
int toku_pmainternal_printpairs (struct kv_pair *pairs[], int N); int toku_pmainternal_printpairs (struct kv_pair *pairs[], int N);
int toku_pmainternal_make_space_at (TOKUTXN, FILENUM, DISKOFF, PMA pma, int idx, unsigned int *new_index); int toku_pmainternal_make_space_at (TOKUTXN, FILENUM, DISKOFF, PMA pma, int idx, unsigned int *new_index, LSN *node_lsn);
int toku_pmainternal_find (PMA pma, DBT *); // The DB is so the comparison fuction can be called. int toku_pmainternal_find (PMA pma, DBT *); // The DB is so the comparison fuction can be called.
void toku_print_pma (PMA pma); /* useful for debugging, so keep the name short. I.e., not pmainternal_print_pma() */ void toku_print_pma (PMA pma); /* useful for debugging, so keep the name short. I.e., not pmainternal_print_pma() */
......
...@@ -44,7 +44,7 @@ static void test_make_space_at (void) { ...@@ -44,7 +44,7 @@ static void test_make_space_at (void) {
r=toku_pma_create(&pma, toku_default_compare_fun, null_db, null_filenum, 0); r=toku_pma_create(&pma, toku_default_compare_fun, null_db, null_filenum, 0);
assert(r==0); assert(r==0);
assert(toku_pma_n_entries(pma)==0); assert(toku_pma_n_entries(pma)==0);
r=toku_pmainternal_make_space_at(null_txn, null_filenum, null_diskoff, pma, 2, &newi); r=toku_pmainternal_make_space_at(null_txn, null_filenum, null_diskoff, pma, 2, &newi, (LSN*)0);
assert(r==0); assert(r==0);
assert(toku_pma_index_limit(pma)==4); assert(toku_pma_index_limit(pma)==4);
assert((unsigned long)pma->pairs[toku_pma_index_limit(pma)]==0xdeadbeefL); assert((unsigned long)pma->pairs[toku_pma_index_limit(pma)]==0xdeadbeefL);
...@@ -52,7 +52,7 @@ static void test_make_space_at (void) { ...@@ -52,7 +52,7 @@ static void test_make_space_at (void) {
pma->pairs[2] = key_A; pma->pairs[2] = key_A;
pma->n_pairs_present++; pma->n_pairs_present++;
r=toku_pmainternal_make_space_at(null_txn, null_filenum, null_diskoff, pma, 2, &newi); r=toku_pmainternal_make_space_at(null_txn, null_filenum, null_diskoff, pma, 2, &newi, (LSN*)0);
assert(r==0); assert(r==0);
if (verbose) printf("Requested space at 2, got space at %d\n", newi); if (verbose) printf("Requested space at 2, got space at %d\n", newi);
if (verbose) toku_print_pma(pma); if (verbose) toku_print_pma(pma);
...@@ -66,7 +66,7 @@ static void test_make_space_at (void) { ...@@ -66,7 +66,7 @@ static void test_make_space_at (void) {
pma->pairs[3] = 0; pma->pairs[3] = 0;
pma->n_pairs_present=2; pma->n_pairs_present=2;
if (verbose) toku_print_pma(pma); if (verbose) toku_print_pma(pma);
toku_pmainternal_make_space_at(null_txn, null_filenum, null_diskoff, pma, 0, &newi); toku_pmainternal_make_space_at(null_txn, null_filenum, null_diskoff, pma, 0, &newi, (LSN*)0);
assert(r==0); assert(r==0);
if (verbose) printf("Requested space at 0, got space at %d\n", newi); if (verbose) printf("Requested space at 0, got space at %d\n", newi);
if (verbose) toku_print_pma(pma); if (verbose) toku_print_pma(pma);
...@@ -83,7 +83,7 @@ static void test_make_space_at (void) { ...@@ -83,7 +83,7 @@ static void test_make_space_at (void) {
pma->pairs[7] = 0; pma->pairs[7] = 0;
pma->n_pairs_present=2; pma->n_pairs_present=2;
if (verbose) toku_print_pma(pma); if (verbose) toku_print_pma(pma);
r=toku_pmainternal_make_space_at(null_txn, null_filenum, null_diskoff, pma, 5, &newi); r=toku_pmainternal_make_space_at(null_txn, null_filenum, null_diskoff, pma, 5, &newi, (LSN*)0);
assert(r==0); assert(r==0);
if (verbose) toku_print_pma(pma); if (verbose) toku_print_pma(pma);
if (verbose) printf("r=%d\n", newi); if (verbose) printf("r=%d\n", newi);
...@@ -177,7 +177,7 @@ static void test_smooth_region_N (int N) { ...@@ -177,7 +177,7 @@ static void test_smooth_region_N (int N) {
} }
} }
if (verbose) { toku_pmainternal_printpairs(pairs, N); printf(" at %d becomes f", insertat); } if (verbose) { toku_pmainternal_printpairs(pairs, N); printf(" at %d becomes f", insertat); }
toku_pmainternal_smooth_region(null_txn, null_filenum, null_diskoff, pairs, N, insertat, 0, 0, &r); toku_pmainternal_smooth_region(null_txn, null_filenum, null_diskoff, pairs, N, insertat, 0, 0, &r, (LSN*)0);
if (verbose) { toku_pmainternal_printpairs(pairs, N); printf(" at %d\n", r); } if (verbose) { toku_pmainternal_printpairs(pairs, N); printf(" at %d\n", r); }
assert(0<=r); assert(r<N); assert(0<=r); assert(r<N);
assert(pairs[r]==0); assert(pairs[r]==0);
...@@ -219,7 +219,7 @@ static void test_smooth_region6 (void) { ...@@ -219,7 +219,7 @@ static void test_smooth_region6 (void) {
pairs[1] = kv_pair_malloc(key, strlen(key)+1, 0, 0); pairs[1] = kv_pair_malloc(key, strlen(key)+1, 0, 0);
int r; int r;
toku_pmainternal_smooth_region(null_txn, null_filenum, null_diskoff, pairs, N, 2, 0, 0, &r); toku_pmainternal_smooth_region(null_txn, null_filenum, null_diskoff, pairs, N, 2, 0, 0, &r, (LSN*)0);
if (verbose) { if (verbose) {
printf("{ "); printf("{ ");
for (i=0; i<N; i++) for (i=0; i<N; i++)
...@@ -284,7 +284,7 @@ static void add_fingerprint_and_check(u_int32_t rand4fingerprint, u_int32_t actu ...@@ -284,7 +284,7 @@ static void add_fingerprint_and_check(u_int32_t rand4fingerprint, u_int32_t actu
static void do_insert (PMA pma, const void *key, int keylen, const void *data, int datalen, u_int32_t rand4fingerprint, u_int32_t *sum, u_int32_t *expect_fingerprint) { static void do_insert (PMA pma, const void *key, int keylen, const void *data, int datalen, u_int32_t rand4fingerprint, u_int32_t *sum, u_int32_t *expect_fingerprint) {
DBT k,v; DBT k,v;
assert(*sum==*expect_fingerprint); assert(*sum==*expect_fingerprint);
int r = toku_pma_insert(pma, toku_fill_dbt(&k, key, keylen), toku_fill_dbt(&v, data, datalen), NULL_ARGS, rand4fingerprint, sum); int r = toku_pma_insert(pma, toku_fill_dbt(&k, key, keylen), toku_fill_dbt(&v, data, datalen), NULL_ARGS, rand4fingerprint, sum, (LSN*)0);
assert(r==BRT_OK); assert(r==BRT_OK);
add_fingerprint_and_check(rand4fingerprint, *sum, expect_fingerprint, key, keylen, data, datalen); add_fingerprint_and_check(rand4fingerprint, *sum, expect_fingerprint, key, keylen, data, datalen);
toku_pma_verify_fingerprint(pma, rand4fingerprint, *sum); toku_pma_verify_fingerprint(pma, rand4fingerprint, *sum);
...@@ -880,9 +880,9 @@ static void test_pma_split_n(int n) { ...@@ -880,9 +880,9 @@ static void test_pma_split_n(int n) {
if (verbose) { printf("a:"); toku_print_pma(pmaa); } if (verbose) { printf("a:"); toku_print_pma(pmaa); }
error = toku_pma_split(null_txn, null_filenum, error = toku_pma_split(null_txn, null_filenum,
null_diskoff, pmaa, 0, arand, &asum, null_diskoff, pmaa, 0, arand, &asum, (LSN*)0,
0, 0,
null_diskoff, pmac, 0, crand, &csum); null_diskoff, pmac, 0, crand, &csum, (LSN*)0);
assert(error == 0); assert(error == 0);
toku_pma_verify(pmaa); toku_pma_verify(pmaa);
toku_pma_verify(pmac); toku_pma_verify(pmac);
...@@ -940,9 +940,9 @@ static void test_pma_dup_split_n(int n, int dup_mode) { ...@@ -940,9 +940,9 @@ static void test_pma_dup_split_n(int n, int dup_mode) {
DBT splitk; DBT splitk;
error = toku_pma_split(null_txn, null_filenum, error = toku_pma_split(null_txn, null_filenum,
null_diskoff, pmaa, 0, arand, &asum, null_diskoff, pmaa, 0, arand, &asum, (LSN*)0,
&splitk, &splitk,
null_diskoff, pmac, 0, crand, &csum); null_diskoff, pmac, 0, crand, &csum, (LSN*)0);
assert(error == 0); assert(error == 0);
toku_pma_verify(pmaa); toku_pma_verify(pmaa);
toku_pma_verify(pmac); toku_pma_verify(pmac);
...@@ -1006,9 +1006,9 @@ static void test_pma_split_varkey(void) { ...@@ -1006,9 +1006,9 @@ static void test_pma_split_varkey(void) {
if (verbose) { printf("a:"); toku_print_pma(pmaa); } if (verbose) { printf("a:"); toku_print_pma(pmaa); }
error = toku_pma_split(null_txn, null_filenum, error = toku_pma_split(null_txn, null_filenum,
null_diskoff, pmaa, 0, arand, &asum, null_diskoff, pmaa, 0, arand, &asum, (LSN*)0,
0, 0,
null_diskoff, pmac, 0, crand, &csum); null_diskoff, pmac, 0, crand, &csum, (LSN*)0);
assert(error == 0); assert(error == 0);
toku_pma_verify(pmaa); toku_pma_verify(pmaa);
toku_pma_verify(pmac); toku_pma_verify(pmac);
...@@ -1145,9 +1145,9 @@ static void test_pma_split_cursor(void) { ...@@ -1145,9 +1145,9 @@ static void test_pma_split_cursor(void) {
assert_cursor_val(cursorc, 16); assert_cursor_val(cursorc, 16);
error = toku_pma_split(null_txn, null_filenum, error = toku_pma_split(null_txn, null_filenum,
null_diskoff, pmaa, 0, arand, &asum, null_diskoff, pmaa, 0, arand, &asum, (LSN*)0,
0, 0,
null_diskoff, pmac, 0, crand, &csum); null_diskoff, pmac, 0, crand, &csum, (LSN*)0);
assert(error == 0); assert(error == 0);
toku_pma_verify_fingerprint(pmaa, arand, asum); toku_pma_verify_fingerprint(pmaa, arand, asum);
...@@ -1252,7 +1252,7 @@ static void test_pma_bulk_insert_n(int n) { ...@@ -1252,7 +1252,7 @@ static void test_pma_bulk_insert_n(int n) {
} }
/* bulk insert n kv pairs */ /* bulk insert n kv pairs */
error = toku_pma_bulk_insert(null_txn, null_filenum, (DISKOFF)0, pma, keys, vals, n, rand4fingerprint, &sum); error = toku_pma_bulk_insert(null_txn, null_filenum, (DISKOFF)0, pma, keys, vals, n, rand4fingerprint, &sum, 0);
assert(error == 0); assert(error == 0);
assert(sum==expect_fingerprint); assert(sum==expect_fingerprint);
toku_pma_verify(pma); toku_pma_verify(pma);
...@@ -1305,14 +1305,14 @@ static void test_pma_insert_or_replace(void) { ...@@ -1305,14 +1305,14 @@ static void test_pma_insert_or_replace(void) {
u_int32_t expect_fingerprint = 0; u_int32_t expect_fingerprint = 0;
r = toku_pma_create(&pma, toku_default_compare_fun, null_db, null_filenum, 0); r = toku_pma_create(&pma, toku_default_compare_fun, null_db, null_filenum, 0);
assert(r==0); assert(r==0);
r = toku_pma_insert_or_replace(pma, toku_fill_dbt(&dbtk, "aaa", 4), toku_fill_dbt(&dbtv, "zzz", 4), &n_diff, NULL_ARGS, rand4fingerprint, &sum); r = toku_pma_insert_or_replace(pma, toku_fill_dbt(&dbtk, "aaa", 4), toku_fill_dbt(&dbtv, "zzz", 4), &n_diff, NULL_ARGS, rand4fingerprint, &sum, (LSN*)0);
assert(r==0); assert(n_diff==-1); assert(r==0); assert(n_diff==-1);
add_fingerprint_and_check(rand4fingerprint, sum, &expect_fingerprint, "aaa", 4, "zzz", 4); add_fingerprint_and_check(rand4fingerprint, sum, &expect_fingerprint, "aaa", 4, "zzz", 4);
r = toku_pma_lookup(pma, toku_fill_dbt(&dbtk, "aaa", 4), toku_init_dbt(&dbtv)); r = toku_pma_lookup(pma, toku_fill_dbt(&dbtk, "aaa", 4), toku_init_dbt(&dbtv));
assert(r==0); assert(dbtv.size==4); assert(memcmp(dbtv.data, "zzz", 4)==0); assert(r==0); assert(dbtv.size==4); assert(memcmp(dbtv.data, "zzz", 4)==0);
r = toku_pma_insert_or_replace(pma, toku_fill_dbt(&dbtk, "bbbb", 5), toku_fill_dbt(&dbtv, "ww", 3), &n_diff, NULL_ARGS, rand4fingerprint, &sum); r = toku_pma_insert_or_replace(pma, toku_fill_dbt(&dbtk, "bbbb", 5), toku_fill_dbt(&dbtv, "ww", 3), &n_diff, NULL_ARGS, rand4fingerprint, &sum, (LSN*)0);
assert(r==0); assert(n_diff==-1); assert(r==0); assert(n_diff==-1);
add_fingerprint_and_check(rand4fingerprint, sum, &expect_fingerprint, "bbbb", 5, "ww", 3); add_fingerprint_and_check(rand4fingerprint, sum, &expect_fingerprint, "bbbb", 5, "ww", 3);
...@@ -1323,7 +1323,7 @@ static void test_pma_insert_or_replace(void) { ...@@ -1323,7 +1323,7 @@ static void test_pma_insert_or_replace(void) {
assert(r==0); assert(dbtv.size==3); assert(memcmp(dbtv.data, "ww", 3)==0); assert(r==0); assert(dbtv.size==3); assert(memcmp(dbtv.data, "ww", 3)==0);
// replae bbbb // replae bbbb
r = toku_pma_insert_or_replace(pma, toku_fill_dbt(&dbtk, "bbbb", 5), toku_fill_dbt(&dbtv, "xxxx", 5), &n_diff, NULL_ARGS, rand4fingerprint, &sum); r = toku_pma_insert_or_replace(pma, toku_fill_dbt(&dbtk, "bbbb", 5), toku_fill_dbt(&dbtv, "xxxx", 5), &n_diff, NULL_ARGS, rand4fingerprint, &sum, (LSN*)0);
assert(r==0); assert(n_diff==3); assert(r==0); assert(n_diff==3);
expect_fingerprint -= rand4fingerprint*toku_calccrc32_kvpair("bbbb", 5, "ww", 3); expect_fingerprint -= rand4fingerprint*toku_calccrc32_kvpair("bbbb", 5, "ww", 3);
add_fingerprint_and_check(rand4fingerprint, sum, &expect_fingerprint, "bbbb", 5, "xxxx", 5); add_fingerprint_and_check(rand4fingerprint, sum, &expect_fingerprint, "bbbb", 5, "xxxx", 5);
...@@ -1817,10 +1817,10 @@ static void test_pma_already_there() { ...@@ -1817,10 +1817,10 @@ static void test_pma_already_there() {
k = 1; v = 1; k = 1; v = 1;
toku_fill_dbt(&key, &k, sizeof k); toku_fill_dbt(&key, &k, sizeof k);
toku_fill_dbt(&val, &v, sizeof v); toku_fill_dbt(&val, &v, sizeof v);
error = toku_pma_insert(pma, &key, &val, NULL_ARGS, rand4fingerprint, &sum); error = toku_pma_insert(pma, &key, &val, NULL_ARGS, rand4fingerprint, &sum, (LSN*)0);
assert(error == 0); assert(error == 0);
u_int32_t savesum = sum; u_int32_t savesum = sum;
error = toku_pma_insert(pma, &key, &val, NULL_ARGS, rand4fingerprint, &sum); error = toku_pma_insert(pma, &key, &val, NULL_ARGS, rand4fingerprint, &sum, (LSN*)0);
assert(error == BRT_ALREADY_THERE); assert(error == BRT_ALREADY_THERE);
assert(sum==savesum); assert(sum==savesum);
...@@ -1847,7 +1847,7 @@ static void test_pma_cursor_first(int n) { ...@@ -1847,7 +1847,7 @@ static void test_pma_cursor_first(int n) {
int i; int i;
for (i=0; i<n; i++) { for (i=0; i<n; i++) {
k = htonl(i); v = htonl(i); k = htonl(i); v = htonl(i);
r = toku_pma_insert(pma, toku_fill_dbt(&key, &k, sizeof k), toku_fill_dbt(&val, &v, sizeof v), NULL_ARGS, rand4fingerprint, &sum); r = toku_pma_insert(pma, toku_fill_dbt(&key, &k, sizeof k), toku_fill_dbt(&val, &v, sizeof v), NULL_ARGS, rand4fingerprint, &sum, (LSN*)0);
assert(r == 0); assert(r == 0);
} }
for (i=0; ; i++) { for (i=0; ; i++) {
...@@ -2217,7 +2217,7 @@ static void test_nodup_key_insert(int n) { ...@@ -2217,7 +2217,7 @@ static void test_nodup_key_insert(int n) {
v = i; v = i;
toku_fill_dbt(&key, &k, sizeof k); toku_fill_dbt(&key, &k, sizeof k);
toku_fill_dbt(&val, &v, sizeof v); toku_fill_dbt(&val, &v, sizeof v);
r = toku_pma_insert(pma, &key, &val, NULL_ARGS, rand4fingerprint, &sum); r = toku_pma_insert(pma, &key, &val, NULL_ARGS, rand4fingerprint, &sum, (LSN*)0);
if (i == 0) { if (i == 0) {
assert(r == 0); assert(r == 0);
add_fingerprint_and_check(rand4fingerprint, sum, &expect_fingerprint, &k, sizeof k, &v, sizeof v); add_fingerprint_and_check(rand4fingerprint, sum, &expect_fingerprint, &k, sizeof k, &v, sizeof v);
......
...@@ -33,9 +33,9 @@ static void __pma_delete_finish(PMA pma, int here); ...@@ -33,9 +33,9 @@ static void __pma_delete_finish(PMA pma, int here);
/* /*
* resize the pma array to asksize. zero all array entries starting from startx. * resize the pma array to asksize. zero all array entries starting from startx.
*/ */
static int pma_resize_array(TOKUTXN, FILENUM, DISKOFF, PMA pma, int asksize, int startx); static int pma_resize_array(TOKUTXN, FILENUM, DISKOFF, PMA pma, int asksize, int startx, LSN *node_lsn);
static int old_pma_resize_array(PMA pma, int asksize, int startx) { static int old_pma_resize_array(PMA pma, int asksize, int startx) {
return pma_resize_array((TOKUTXN)0, (FILENUM){0}, (DISKOFF)0, pma, asksize, startx); return pma_resize_array((TOKUTXN)0, (FILENUM){0}, (DISKOFF)0, pma, asksize, startx, (LSN*)0);
} }
/* /*
...@@ -363,7 +363,7 @@ static int distribute_data (struct kv_pair *destpairs[], int dcount, ...@@ -363,7 +363,7 @@ static int distribute_data (struct kv_pair *destpairs[], int dcount,
} }
} }
static int pma_log_distribute (TOKUTXN txn, FILENUM filenum, DISKOFF old_diskoff, DISKOFF new_diskoff, int n_pairs, struct kv_pair_tag *pairs) { static int pma_log_distribute (TOKUTXN txn, FILENUM filenum, DISKOFF old_diskoff, DISKOFF new_diskoff, int n_pairs, struct kv_pair_tag *pairs, LSN *oldnode_lsn, LSN*newnode_lsn) {
INTPAIRARRAY ipa; INTPAIRARRAY ipa;
MALLOC_N(n_pairs, ipa.array); MALLOC_N(n_pairs, ipa.array);
if (ipa.array==0) return errno; if (ipa.array==0) return errno;
...@@ -378,6 +378,8 @@ static int pma_log_distribute (TOKUTXN txn, FILENUM filenum, DISKOFF old_diskoff ...@@ -378,6 +378,8 @@ static int pma_log_distribute (TOKUTXN txn, FILENUM filenum, DISKOFF old_diskoff
} }
ipa.size=j; ipa.size=j;
int r=toku_log_pmadistribute(txn, toku_txn_get_txnid(txn), filenum, old_diskoff, new_diskoff, ipa); int r=toku_log_pmadistribute(txn, toku_txn_get_txnid(txn), filenum, old_diskoff, new_diskoff, ipa);
if (txn && oldnode_lsn) *oldnode_lsn = toku_txn_get_last_lsn(txn);
if (txn && newnode_lsn) *newnode_lsn = toku_txn_get_last_lsn(txn);
// if (0 && pma) { // if (0 && pma) {
// printf("Pma state:\n"); // printf("Pma state:\n");
// PMA_ITERATE_IDX (pma, pidx, key, keylen, data, datalen, // PMA_ITERATE_IDX (pma, pidx, key, keylen, data, datalen,
...@@ -389,7 +391,7 @@ static int pma_log_distribute (TOKUTXN txn, FILENUM filenum, DISKOFF old_diskoff ...@@ -389,7 +391,7 @@ static int pma_log_distribute (TOKUTXN txn, FILENUM filenum, DISKOFF old_diskoff
/* spread the non-empty pairs around. There are n of them. Create an empty slot just before the IDXth /* spread the non-empty pairs around. There are n of them. Create an empty slot just before the IDXth
element, and return that slot's index in the smoothed array. */ element, and return that slot's index in the smoothed array. */
int toku_pmainternal_smooth_region (TOKUTXN txn, FILENUM filenum, DISKOFF diskoff, struct kv_pair *pairs[], int n, int idx, int base, PMA pma, int *new_idx) { int toku_pmainternal_smooth_region (TOKUTXN txn, FILENUM filenum, DISKOFF diskoff, struct kv_pair *pairs[], int n, int idx, int base, PMA pma, int *new_idx, LSN *node_lsn) {
int i; int i;
int n_present=0; int n_present=0;
for (i=0; i<n; i++) { for (i=0; i<n; i++) {
...@@ -426,7 +428,8 @@ int toku_pmainternal_smooth_region (TOKUTXN txn, FILENUM filenum, DISKOFF diskof ...@@ -426,7 +428,8 @@ int toku_pmainternal_smooth_region (TOKUTXN txn, FILENUM filenum, DISKOFF diskof
tmppairs, n_saved, pma); tmppairs, n_saved, pma);
int r = pma_log_distribute(txn, filenum, diskoff, diskoff, int r = pma_log_distribute(txn, filenum, diskoff, diskoff,
n_saved, n_saved,
tmppairs); tmppairs,
node_lsn, node_lsn);
if (r!=0) goto cleanup; if (r!=0) goto cleanup;
if (pma && !list_empty(&pma->cursors)) if (pma && !list_empty(&pma->cursors))
__pma_update_my_cursors(pma, tmppairs, n_present); __pma_update_my_cursors(pma, tmppairs, n_present);
...@@ -537,13 +540,14 @@ int toku_resize_pma_exactly (PMA pma, int oldsize, int newsize) { ...@@ -537,13 +540,14 @@ int toku_resize_pma_exactly (PMA pma, int oldsize, int newsize) {
return 0; return 0;
} }
static int pma_resize_array(TOKUTXN txn, FILENUM filenum, DISKOFF offset, PMA pma, int asksize, int startz) { static int pma_resize_array(TOKUTXN txn, FILENUM filenum, DISKOFF offset, PMA pma, int asksize, int startz, LSN *node_lsn) {
unsigned int oldN = pma->N; unsigned int oldN = pma->N;
unsigned int n = pma_array_size(pma, asksize); unsigned int n = pma_array_size(pma, asksize);
int r = toku_resize_pma_exactly(pma, startz, n); int r = toku_resize_pma_exactly(pma, startz, n);
if (r!=0) return r; if (r!=0) return r;
toku_pmainternal_calculate_parameters(pma); toku_pmainternal_calculate_parameters(pma);
toku_log_resizepma (txn, toku_txn_get_txnid(txn), filenum, offset, oldN, n); toku_log_resizepma (txn, toku_txn_get_txnid(txn), filenum, offset, oldN, n);
if (txn && node_lsn) *node_lsn = toku_txn_get_last_lsn(txn);
return 0; return 0;
} }
...@@ -769,7 +773,7 @@ int toku_pma_cursor_free (PMA_CURSOR *cursp) { ...@@ -769,7 +773,7 @@ int toku_pma_cursor_free (PMA_CURSOR *cursp) {
/* Make some space for a key to go at idx (the thing currently at idx should end up at to the right.) */ /* Make some space for a key to go at idx (the thing currently at idx should end up at to the right.) */
/* (Making space may involve moving things around, including the hole at index.) */ /* (Making space may involve moving things around, including the hole at index.) */
int toku_pmainternal_make_space_at (TOKUTXN txn, FILENUM filenum, DISKOFF offset, PMA pma, int idx, unsigned int *new_index) { int toku_pmainternal_make_space_at (TOKUTXN txn, FILENUM filenum, DISKOFF offset, PMA pma, int idx, unsigned int *new_index, LSN *node_lsn) {
/* Within a range LO to HI we have a limit of how much packing we will tolerate. /* Within a range LO to HI we have a limit of how much packing we will tolerate.
* We allow the entire array to be 50% full. * We allow the entire array to be 50% full.
* We allow a region of size lgN to be full. * We allow a region of size lgN to be full.
...@@ -805,7 +809,7 @@ int toku_pmainternal_make_space_at (TOKUTXN txn, FILENUM filenum, DISKOFF offset ...@@ -805,7 +809,7 @@ int toku_pmainternal_make_space_at (TOKUTXN txn, FILENUM filenum, DISKOFF offset
size*=2; size*=2;
// printf("pma_make_space_realloc %d to %d hi %d\n", pma->N, size, hi); // printf("pma_make_space_realloc %d to %d hi %d\n", pma->N, size, hi);
pma_resize_array(txn, filenum, offset, pma, size, hi); pma_resize_array(txn, filenum, offset, pma, size, hi, node_lsn);
hi=size; hi=size;
//printf("doubled N\n"); //printf("doubled N\n");
...@@ -818,7 +822,7 @@ int toku_pmainternal_make_space_at (TOKUTXN txn, FILENUM filenum, DISKOFF offset ...@@ -818,7 +822,7 @@ int toku_pmainternal_make_space_at (TOKUTXN txn, FILENUM filenum, DISKOFF offset
//printf("%s:%d Smoothing from %d to %d to density %f\n", __FILE__, __LINE__, lo, hi, density); //printf("%s:%d Smoothing from %d to %d to density %f\n", __FILE__, __LINE__, lo, hi, density);
{ {
int sub_new_index; int sub_new_index;
int r = toku_pmainternal_smooth_region(txn, filenum, offset, pma->pairs+lo, hi-lo, idx-lo, lo, pma, &sub_new_index); int r = toku_pmainternal_smooth_region(txn, filenum, offset, pma->pairs+lo, hi-lo, idx-lo, lo, pma, &sub_new_index, node_lsn);
if (r!=0) return r; if (r!=0) return r;
*new_index=sub_new_index+lo; *new_index=sub_new_index+lo;
return 0; return 0;
...@@ -870,7 +874,7 @@ int toku_pma_free (PMA *pmap) { ...@@ -870,7 +874,7 @@ int toku_pma_free (PMA *pmap) {
/* Copies keylen and datalen */ /* Copies keylen and datalen */
/* returns an error if the key is already present. */ /* returns an error if the key is already present. */
int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKUTXN txn, FILENUM filenum, DISKOFF diskoff, u_int32_t rand4fingerprint, u_int32_t *fingerprint) { int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKUTXN txn, FILENUM filenum, DISKOFF diskoff, u_int32_t rand4fingerprint, u_int32_t *fingerprint, LSN *node_lsn) {
int found; int found;
unsigned int idx; unsigned int idx;
...@@ -885,13 +889,14 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKUTXN txn, FILENUM filenum, DISK ...@@ -885,13 +889,14 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKUTXN txn, FILENUM filenum, DISK
assert(pma->pairs[idx]); assert(pma->pairs[idx]);
*fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size); *fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size);
int r = toku_logger_log_phys_add_or_delete_in_leaf(pma->db, txn, diskoff, 0, pma->pairs[idx]); int r = toku_logger_log_phys_add_or_delete_in_leaf(pma->db, txn, diskoff, 0, pma->pairs[idx]);
if (txn && node_lsn && r==0) *node_lsn = toku_txn_get_last_lsn(txn);
return r; return r;
} else } else
return BRT_ALREADY_THERE; /* It is already here. Return an error. */ return BRT_ALREADY_THERE; /* It is already here. Return an error. */
} }
if (kv_pair_inuse(pma->pairs[idx])) { if (kv_pair_inuse(pma->pairs[idx])) {
unsigned int newidx; unsigned int newidx;
int r = toku_pmainternal_make_space_at (txn, filenum, diskoff, pma, idx, &newidx); /* returns the new idx. */ int r = toku_pmainternal_make_space_at (txn, filenum, diskoff, pma, idx, &newidx, (LSN*)0); /* returns the new idx. */
if (r!=0) return r; if (r!=0) return r;
idx = newidx; idx = newidx;
} }
...@@ -905,7 +910,9 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKUTXN txn, FILENUM filenum, DISK ...@@ -905,7 +910,9 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKUTXN txn, FILENUM filenum, DISK
const struct kv_pair *pair = pma->pairs[idx]; const struct kv_pair *pair = pma->pairs[idx];
const BYTESTRING key = { pair->keylen, (char*)kv_pair_key_const(pair) }; const BYTESTRING key = { pair->keylen, (char*)kv_pair_key_const(pair) };
const BYTESTRING data = { pair->vallen, (char*)kv_pair_val_const(pair) }; const BYTESTRING data = { pair->vallen, (char*)kv_pair_val_const(pair) };
return toku_log_insertinleaf (txn, toku_txn_get_txnid(txn), pma->filenum, diskoff, idx, key, data); int r = toku_log_insertinleaf (txn, toku_txn_get_txnid(txn), pma->filenum, diskoff, idx, key, data);
if (txn && node_lsn) *node_lsn = toku_txn_get_last_lsn(txn);
return r;
} }
} }
...@@ -1050,7 +1057,8 @@ static void __pma_delete_at(PMA pma, int here) { ...@@ -1050,7 +1057,8 @@ static void __pma_delete_at(PMA pma, int here) {
int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v, int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
int *replaced_v_size, /* If it is a replacement, set to the size of the old value, otherwise set to -1. */ int *replaced_v_size, /* If it is a replacement, set to the size of the old value, otherwise set to -1. */
TOKUTXN txn, FILENUM filenum, DISKOFF diskoff, TOKUTXN txn, FILENUM filenum, DISKOFF diskoff,
u_int32_t rand4fingerprint, u_int32_t *fingerprint) { u_int32_t rand4fingerprint, u_int32_t *fingerprint,
LSN *node_lsn) {
//printf("%s:%d v->size=%d\n", __FILE__, __LINE__, v->size); //printf("%s:%d v->size=%d\n", __FILE__, __LINE__, v->size);
int r; int r;
unsigned int idx; unsigned int idx;
...@@ -1069,6 +1077,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v, ...@@ -1069,6 +1077,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
*fingerprint -= rand4fingerprint*toku_calccrc32_kvpair(kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv)); *fingerprint -= rand4fingerprint*toku_calccrc32_kvpair(kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv));
r=toku_logger_log_phys_add_or_delete_in_leaf(pma->db, txn, diskoff, 0, kv); r=toku_logger_log_phys_add_or_delete_in_leaf(pma->db, txn, diskoff, 0, kv);
if (r!=0) return r; if (r!=0) return r;
if (txn && node_lsn) *node_lsn = toku_txn_get_last_lsn(txn);
} }
if (v->size == (unsigned int) kv_pair_vallen(kv)) { if (v->size == (unsigned int) kv_pair_vallen(kv)) {
memcpy(kv_pair_val(kv), v->data, v->size); memcpy(kv_pair_val(kv), v->data, v->size);
...@@ -1078,12 +1087,13 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v, ...@@ -1078,12 +1087,13 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
assert(pma->pairs[idx]); assert(pma->pairs[idx]);
} }
r = toku_logger_log_phys_add_or_delete_in_leaf(pma->db, txn, diskoff, 0, pma->pairs[idx]); r = toku_logger_log_phys_add_or_delete_in_leaf(pma->db, txn, diskoff, 0, pma->pairs[idx]);
if (txn && node_lsn && r==0) *node_lsn = toku_txn_get_last_lsn(txn);
*fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size); *fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size);
return r; return r;
} }
if (kv_pair_inuse(pma->pairs[idx])) { if (kv_pair_inuse(pma->pairs[idx])) {
unsigned int newidx; unsigned int newidx;
r = toku_pmainternal_make_space_at (txn, filenum, diskoff, pma, idx, &newidx); /* returns the new idx. */ r = toku_pmainternal_make_space_at (txn, filenum, diskoff, pma, idx, &newidx, node_lsn); /* returns the new idx. */
if (r!=0) return r; if (r!=0) return r;
idx=newidx; idx=newidx;
} }
...@@ -1099,6 +1109,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v, ...@@ -1099,6 +1109,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
const BYTESTRING key = { pair->keylen, (char*)kv_pair_key_const(pair) }; const BYTESTRING key = { pair->keylen, (char*)kv_pair_key_const(pair) };
const BYTESTRING data = { pair->vallen, (char*)kv_pair_val_const(pair) }; const BYTESTRING data = { pair->vallen, (char*)kv_pair_val_const(pair) };
r = toku_log_insertinleaf (txn, toku_txn_get_txnid(txn), pma->filenum, diskoff, idx, key, data); r = toku_log_insertinleaf (txn, toku_txn_get_txnid(txn), pma->filenum, diskoff, idx, key, data);
if (txn && node_lsn) *node_lsn = toku_txn_get_last_lsn(txn);
} }
*fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size); *fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size);
return r; return r;
...@@ -1218,9 +1229,9 @@ static void __pma_relocate_kvpairs(PMA pma) { ...@@ -1218,9 +1229,9 @@ static void __pma_relocate_kvpairs(PMA pma) {
int toku_pma_split(TOKUTXN txn, FILENUM filenum, int toku_pma_split(TOKUTXN txn, FILENUM filenum,
DISKOFF diskoff, PMA pma, unsigned int *pma_size_p, u_int32_t rand4fp, u_int32_t *fingerprint_p, DISKOFF diskoff, PMA pma, unsigned int *pma_size_p, u_int32_t rand4fp, u_int32_t *fingerprint_p, LSN *lsn,
DBT *splitk, DBT *splitk,
DISKOFF newdiskoff, PMA newpma, unsigned int *newpma_size_p, u_int32_t newrand4fp, u_int32_t *newfingerprint_p) { DISKOFF newdiskoff, PMA newpma, unsigned int *newpma_size_p, u_int32_t newrand4fp, u_int32_t *newfingerprint_p, LSN *newlsn) {
int error; int error;
int npairs; int npairs;
struct kv_pair_tag *pairs; struct kv_pair_tag *pairs;
...@@ -1294,11 +1305,11 @@ int toku_pma_split(TOKUTXN txn, FILENUM filenum, ...@@ -1294,11 +1305,11 @@ int toku_pma_split(TOKUTXN txn, FILENUM filenum,
/* put the second half of pairs into the right pma */ /* put the second half of pairs into the right pma */
/* Do this first, so that the logging will move the stuff out of the left pma first, and then later when we redistribute in the left PMA, we won't overwrite something. */ /* Do this first, so that the logging will move the stuff out of the left pma first, and then later when we redistribute in the left PMA, we won't overwrite something. */
n = npairs - spliti; n = npairs - spliti;
error = pma_resize_array(txn, filenum, newdiskoff, newpma, n + n/4, 0); error = pma_resize_array(txn, filenum, newdiskoff, newpma, n + n/4, 0, newlsn);
assert(error == 0); assert(error == 0);
distribute_data(newpma->pairs, toku_pma_index_limit(newpma), &pairs[spliti], n, newpma); distribute_data(newpma->pairs, toku_pma_index_limit(newpma), &pairs[spliti], n, newpma);
{ {
int r = pma_log_distribute(txn, filenum, diskoff, newdiskoff, n, &pairs[spliti]); int r = pma_log_distribute(txn, filenum, diskoff, newdiskoff, n, &pairs[spliti], lsn, newlsn);
if (r!=0) { toku_free(pairs); return r; } if (r!=0) { toku_free(pairs); return r; }
} }
#if PMA_USE_MEMPOOL #if PMA_USE_MEMPOOL
...@@ -1313,11 +1324,11 @@ int toku_pma_split(TOKUTXN txn, FILENUM filenum, ...@@ -1313,11 +1324,11 @@ int toku_pma_split(TOKUTXN txn, FILENUM filenum,
/* put the first half of pairs into the left pma */ /* put the first half of pairs into the left pma */
n = spliti; n = spliti;
error = pma_resize_array(txn, filenum, diskoff, pma, n + n/4, 0); // zeros the elements error = pma_resize_array(txn, filenum, diskoff, pma, n + n/4, 0, lsn); // zeros the elements
assert(error == 0); assert(error == 0);
distribute_data(pma->pairs, toku_pma_index_limit(pma), &pairs[0], n, pma); distribute_data(pma->pairs, toku_pma_index_limit(pma), &pairs[0], n, pma);
{ {
int r = pma_log_distribute(txn, filenum, diskoff, diskoff, spliti, &pairs[0]); int r = pma_log_distribute(txn, filenum, diskoff, diskoff, spliti, &pairs[0], lsn, lsn);
if (r!=0) { toku_free(pairs); return r; } if (r!=0) { toku_free(pairs); return r; }
} }
// Don't have to relocate kvpairs, because these ones are still there. // Don't have to relocate kvpairs, because these ones are still there.
...@@ -1344,7 +1355,7 @@ static void __pma_bulk_cleanup(struct pma *pma, struct kv_pair_tag *pairs, int n ...@@ -1344,7 +1355,7 @@ static void __pma_bulk_cleanup(struct pma *pma, struct kv_pair_tag *pairs, int n
pma_mfree_kv_pair(pma, pairs[i].pair); pma_mfree_kv_pair(pma, pairs[i].pair);
} }
int toku_pma_bulk_insert(TOKUTXN txn, FILENUM filenum, DISKOFF diskoff, PMA pma, DBT *keys, DBT *vals, int n_newpairs, u_int32_t rand4fp, u_int32_t *sum) { int toku_pma_bulk_insert(TOKUTXN txn, FILENUM filenum, DISKOFF diskoff, PMA pma, DBT *keys, DBT *vals, int n_newpairs, u_int32_t rand4fp, u_int32_t *sum, LSN *node_lsn) {
struct kv_pair_tag *newpairs; struct kv_pair_tag *newpairs;
int i; int i;
int error; int error;
...@@ -1378,7 +1389,7 @@ int toku_pma_bulk_insert(TOKUTXN txn, FILENUM filenum, DISKOFF diskoff, PMA pma, ...@@ -1378,7 +1389,7 @@ int toku_pma_bulk_insert(TOKUTXN txn, FILENUM filenum, DISKOFF diskoff, PMA pma,
} }
} }
error = pma_resize_array(txn, filenum, diskoff, pma, n_newpairs + n_newpairs/4, 0); error = pma_resize_array(txn, filenum, diskoff, pma, n_newpairs + n_newpairs/4, 0, node_lsn);
if (error) { if (error) {
__pma_bulk_cleanup(pma, newpairs, n_newpairs); __pma_bulk_cleanup(pma, newpairs, n_newpairs);
toku_free(newpairs); toku_free(newpairs);
......
...@@ -47,7 +47,7 @@ int toku_pma_n_entries (PMA); ...@@ -47,7 +47,7 @@ int toku_pma_n_entries (PMA);
//enum pma_errors toku_pma_insert (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen); //enum pma_errors toku_pma_insert (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen);
// The DB pointer is there so that the comparison function can be called. // The DB pointer is there so that the comparison function can be called.
enum pma_errors toku_pma_insert (PMA, DBT*, DBT*, TOKUTXN, FILENUM, DISKOFF, u_int32_t /*random for fingerprint */, u_int32_t */*fingerprint*/); enum pma_errors toku_pma_insert (PMA, DBT*, DBT*, TOKUTXN, FILENUM, DISKOFF, u_int32_t /*random for fingerprint */, u_int32_t */*fingerprint*/, LSN *node_lsn);
/* This returns an error if the key is NOT present. */ /* This returns an error if the key is NOT present. */
int pma_replace (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen); int pma_replace (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen);
...@@ -61,7 +61,8 @@ int toku_pma_delete (PMA, DBT */*key*/, DBT */*val*/, u_int32_t /*random for fin ...@@ -61,7 +61,8 @@ int toku_pma_delete (PMA, DBT */*key*/, DBT */*val*/, u_int32_t /*random for fin
int toku_pma_insert_or_replace (PMA /*pma*/, DBT */*k*/, DBT */*v*/, int toku_pma_insert_or_replace (PMA /*pma*/, DBT */*k*/, DBT */*v*/,
int */*replaced_v_size*/, /* If it is a replacement, set to the size of the old value, otherwise set to -1. */ int */*replaced_v_size*/, /* If it is a replacement, set to the size of the old value, otherwise set to -1. */
TOKUTXN /*txn*/, FILENUM, DISKOFF, TOKUTXN /*txn*/, FILENUM, DISKOFF,
u_int32_t /*random for fingerprint*/, u_int32_t */*fingerprint*/); u_int32_t /*random for fingerprint*/, u_int32_t */*fingerprint*/,
LSN */*node_lsn*/);
/* Exposes internals of the PMA by returning a pointer to the guts. /* Exposes internals of the PMA by returning a pointer to the guts.
...@@ -87,9 +88,9 @@ enum pma_errors toku_pma_lookup (PMA, DBT*, DBT*); ...@@ -87,9 +88,9 @@ enum pma_errors toku_pma_lookup (PMA, DBT*, DBT*);
* The NEWPMA gets keys > pivot key * The NEWPMA gets keys > pivot key
*/ */
int toku_pma_split(TOKUTXN, FILENUM, int toku_pma_split(TOKUTXN, FILENUM,
DISKOFF /*diskoff*/, PMA /*pma*/, unsigned int */*pma_size*/, u_int32_t /*rand4sum*/, u_int32_t */*fingerprint*/, DISKOFF /*diskoff*/, PMA /*pma*/, unsigned int */*pma_size*/, u_int32_t /*rand4sum*/, u_int32_t */*fingerprint*/, LSN* /*lsn*/,
DBT */*splitk*/, DBT */*splitk*/,
DISKOFF /*newdiskoff*/, PMA /*newpma*/, unsigned int */*newpma_size*/, u_int32_t /*newrand4sum*/, u_int32_t */*newfingerprint*/); DISKOFF /*newdiskoff*/, PMA /*newpma*/, unsigned int */*newpma_size*/, u_int32_t /*newrand4sum*/, u_int32_t */*newfingerprint*/, LSN* /*newlsn*/);
/* /*
* Insert several key value pairs into an empty pma. * Insert several key value pairs into an empty pma.
...@@ -102,7 +103,7 @@ int toku_pma_split(TOKUTXN, FILENUM, ...@@ -102,7 +103,7 @@ int toku_pma_split(TOKUTXN, FILENUM,
* vals - an array of values * vals - an array of values
* n_newpairs - the number of key value pairs * n_newpairs - the number of key value pairs
*/ */
int toku_pma_bulk_insert(TOKUTXN, FILENUM, DISKOFF, PMA pma, DBT *keys, DBT *vals, int n_newpairs, u_int32_t rand4sem, u_int32_t *fingerprint); int toku_pma_bulk_insert(TOKUTXN, FILENUM, DISKOFF, PMA pma, DBT *keys, DBT *vals, int n_newpairs, u_int32_t rand4sem, u_int32_t *fingerprint, LSN */*node_lsn*/);
/* Move the cursor to the beginning or the end or to a key */ /* Move the cursor to the beginning or the end or to a key */
int toku_pma_cursor (PMA, PMA_CURSOR *, void** /*sskey*/, void ** /*ssval*/); // the sskey and ssval point to variables that hold blocks that can be used to return values for zero'd DBTS. int toku_pma_cursor (PMA, PMA_CURSOR *, void** /*sskey*/, void ** /*ssval*/); // the sskey and ssval point to variables that hold blocks that can be used to return values for zero'd DBTS.
......
...@@ -194,6 +194,7 @@ void toku_recover_newbrtnode (struct logtype_newbrtnode *c) { ...@@ -194,6 +194,7 @@ void toku_recover_newbrtnode (struct logtype_newbrtnode *c) {
VERIFY_COUNTS(n); VERIFY_COUNTS(n);
n->log_lsn = c->lsn;
r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(n)); r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(n));
assert(r==0); assert(r==0);
} }
...@@ -246,6 +247,7 @@ void toku_recover_addchild (struct logtype_addchild *le) { ...@@ -246,6 +247,7 @@ void toku_recover_addchild (struct logtype_addchild *le) {
node->u.n.n_bytes_in_buffer[le->childnum] = 0; node->u.n.n_bytes_in_buffer[le->childnum] = 0;
node->u.n.n_cursors[le->childnum] = 0; node->u.n.n_cursors[le->childnum] = 0;
node->u.n.n_children++; node->u.n.n_children++;
node->log_lsn = le->lsn;
r = toku_cachetable_unpin(cf, le->diskoff, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(cf, le->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
} }
...@@ -263,6 +265,7 @@ void toku_recover_setchild (struct logtype_setchild *le) { ...@@ -263,6 +265,7 @@ void toku_recover_setchild (struct logtype_setchild *le) {
assert(node->height>0); assert(node->height>0);
assert(le->childnum < (unsigned)node->u.n.n_children); assert(le->childnum < (unsigned)node->u.n.n_children);
node->u.n.children[le->childnum] = le->child; node->u.n.children[le->childnum] = le->child;
node->log_lsn = le->lsn;
r = toku_cachetable_unpin(pair->cf, le->diskoff, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(pair->cf, le->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
} }
...@@ -280,6 +283,7 @@ void toku_recover_setpivot (struct logtype_setpivot *le) { ...@@ -280,6 +283,7 @@ void toku_recover_setpivot (struct logtype_setpivot *le) {
node->u.n.childkeys[le->childnum] = kv_pair_malloc(le->pivotkey.data, le->pivotkey.len, 0, 0); node->u.n.childkeys[le->childnum] = kv_pair_malloc(le->pivotkey.data, le->pivotkey.len, 0, 0);
node->u.n.totalchildkeylens += toku_brt_pivot_key_len(pair->brt, node->u.n.childkeys[le->childnum]); node->u.n.totalchildkeylens += toku_brt_pivot_key_len(pair->brt, node->u.n.childkeys[le->childnum]);
node->log_lsn = le->lsn;
r = toku_cachetable_unpin(pair->cf, le->diskoff, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(pair->cf, le->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
...@@ -297,6 +301,7 @@ void toku_recover_changechildfingerprint (struct logtype_changechildfingerprint ...@@ -297,6 +301,7 @@ void toku_recover_changechildfingerprint (struct logtype_changechildfingerprint
BRTNODE node = node_v; BRTNODE node = node_v;
assert(node->height>0); assert(node->height>0);
BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node, le->childnum) = le->newfingerprint; BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node, le->childnum) = le->newfingerprint;
node->log_lsn = le->lsn;
r = toku_cachetable_unpin(pair->cf, le->diskoff, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(pair->cf, le->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
...@@ -339,6 +344,7 @@ void toku_recover_insertinleaf (struct logtype_insertinleaf *c) { ...@@ -339,6 +344,7 @@ void toku_recover_insertinleaf (struct logtype_insertinleaf *c) {
VERIFY_COUNTS(node); VERIFY_COUNTS(node);
node->log_lsn = c->lsn;
r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
toku_free(c->key.data); toku_free(c->key.data);
...@@ -359,6 +365,7 @@ int toku_rollback_insertinleaf (struct logtype_insertinleaf *c, TOKUTXN txn) { ...@@ -359,6 +365,7 @@ int toku_rollback_insertinleaf (struct logtype_insertinleaf *c, TOKUTXN txn) {
node->local_fingerprint -= node->rand4fingerprint*toku_calccrc32_kvpair(c->key.data, c->key.len,c->data.data, c->data.len); node->local_fingerprint -= node->rand4fingerprint*toku_calccrc32_kvpair(c->key.data, c->key.len,c->data.data, c->data.len);
node->u.l.n_bytes_in_buffer -= PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + c->key.len + c->data.len; node->u.l.n_bytes_in_buffer -= PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + c->key.len + c->data.len;
VERIFY_COUNTS(node); VERIFY_COUNTS(node);
node->log_lsn = c->lsn;
r = toku_cachetable_unpin(cf, c->diskoff, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
return r; return r;
} }
...@@ -380,6 +387,7 @@ void toku_recover_resizepma (struct logtype_resizepma *c) { ...@@ -380,6 +387,7 @@ void toku_recover_resizepma (struct logtype_resizepma *c) {
VERIFY_COUNTS(node); VERIFY_COUNTS(node);
node->log_lsn = c->lsn;
r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
} }
...@@ -413,6 +421,8 @@ void toku_recover_pmadistribute (struct logtype_pmadistribute *c) { ...@@ -413,6 +421,8 @@ void toku_recover_pmadistribute (struct logtype_pmadistribute *c) {
VERIFY_COUNTS(nodea); VERIFY_COUNTS(nodea);
VERIFY_COUNTS(nodeb); VERIFY_COUNTS(nodeb);
nodea->log_lsn = c->lsn;
nodeb->log_lsn = c->lsn;
r = toku_cachetable_unpin(pair->cf, c->old_diskoff, 1, toku_serialize_brtnode_size(nodea)); r = toku_cachetable_unpin(pair->cf, c->old_diskoff, 1, toku_serialize_brtnode_size(nodea));
assert(r==0); assert(r==0);
r = toku_cachetable_unpin(pair->cf, c->new_diskoff, 1, toku_serialize_brtnode_size(nodeb)); r = toku_cachetable_unpin(pair->cf, c->new_diskoff, 1, toku_serialize_brtnode_size(nodeb));
...@@ -449,6 +459,8 @@ int toku_rollback_pmadistribute (struct logtype_pmadistribute *le, TOKUTXN txn) ...@@ -449,6 +459,8 @@ int toku_rollback_pmadistribute (struct logtype_pmadistribute *le, TOKUTXN txn)
&nodeb->u.l.n_bytes_in_buffer, &nodea->u.l.n_bytes_in_buffer &nodeb->u.l.n_bytes_in_buffer, &nodea->u.l.n_bytes_in_buffer
); );
if (r!=0) goto died1; if (r!=0) goto died1;
nodea->log_lsn = le->lsn;
nodeb->log_lsn = le->lsn;
r = toku_cachetable_unpin(cf, le->old_diskoff, 1, toku_serialize_brtnode_size(nodea)); r = toku_cachetable_unpin(cf, le->old_diskoff, 1, toku_serialize_brtnode_size(nodea));
r = toku_cachetable_unpin(cf, le->new_diskoff, 1, toku_serialize_brtnode_size(nodeb)); r = toku_cachetable_unpin(cf, le->new_diskoff, 1, toku_serialize_brtnode_size(nodeb));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment