Commit d95d9be0 authored by Rich Prohaska's avatar Rich Prohaska

add cursor next_dup, next_nodup, prev_dup, and prev_nodup. addresses #259

git-svn-id: file:///svn/tokudb@1585 c7de825b-a66e-492c-adef-691d508d4ae1
parent dc6da6a7
...@@ -2583,50 +2583,6 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, TOKUT ...@@ -2583,50 +2583,6 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, TOKUT
r=toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt, 0); if (r!=0) goto died0; r=toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt, 0); if (r!=0) goto died0;
if (r == 0) assert_cursor_path(cursor); if (r == 0) assert_cursor_path(cursor);
break; break;
case DB_NEXT_DUP:
if (cursor->path_len<=0) {
r = EINVAL; goto died0;
}
/* get the current key, move the cursor, get the new key and compare them
if the keys are the same then return the duplicate key and data */
DBT k1; memset(&k1, 0, sizeof k1);
r = toku_pma_cursor_get_current(cursor->pmacurs, &k1, 0, 1); if (r != 0) goto died0;
r = toku_pma_cursor_set_position_next(cursor->pmacurs);
if (r == 0) {
DBT k2; memset(&k2, 0, sizeof k2); k2.flags = DB_DBT_MALLOC;
r = toku_pma_cursor_get_current(cursor->pmacurs, &k2, 0, 0); if (r != 0) goto died0;
int cmp = cursor->brt->compare_fun(cursor->brt->db, &k1, &k2);
toku_free(k2.data);
if (cmp == 0) {
r = toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt, 0);
if (r != 0) {
toku_pma_cursor_set_position_prev(cursor->pmacurs); goto died0;
}
} else {
toku_pma_cursor_set_position_prev(cursor->pmacurs); r = DB_NOTFOUND; goto died0;
}
} else if (r == DB_NOTFOUND) {
/* we are at the end of the pma. move to the next tuple in the tree and search there */
r = brtcurs_set_position_next(cursor, 0, txn);
if (r != 0) {
unpin_cursor(cursor);
brtcurs_set_position_last(cursor, *rootp, kbt, txn);
goto died0;
}
DBT k2; memset(&k2, 0, sizeof k2); k2.flags = DB_DBT_MALLOC;
r = toku_pma_cursor_get_current(cursor->pmacurs, &k2, 0, 0); assert(r == 0);
int cmp = cursor->brt->compare_fun(cursor->brt->db, &k1, &k2);
toku_free(k2.data);
if (cmp != 0) {
brtcurs_set_position_prev(cursor, 0, txn);
r = DB_NOTFOUND;
} else
r = toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt, 0);
if (r != 0) goto died0;
} else
goto died0;
break;
case DB_PREV: case DB_PREV:
if (cursor->path_len<= 0) if (cursor->path_len<= 0)
goto do_db_last; goto do_db_last;
...@@ -2635,10 +2591,11 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, TOKUT ...@@ -2635,10 +2591,11 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, TOKUT
if (r == 0) assert_cursor_path(cursor); if (r == 0) assert_cursor_path(cursor);
break; break;
case DB_CURRENT: case DB_CURRENT:
case DB_CURRENT+256:
if (cursor->path_len<=0) { if (cursor->path_len<=0) {
r = EINVAL; goto died0; r = EINVAL; goto died0;
} }
r=toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt, 0); if (r!=0) goto died0; r=toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt, (flags&256)!=0); if (r!=0) goto died0;
if (r == 0) assert_cursor_path(cursor); if (r == 0) assert_cursor_path(cursor);
break; break;
case DB_SET: case DB_SET:
......
...@@ -845,9 +845,133 @@ static int verify_secondary_key(DB *secondary, DBT *pkey, DBT *data, DBT *skey) ...@@ -845,9 +845,133 @@ static int verify_secondary_key(DB *secondary, DBT *pkey, DBT *data, DBT *skey)
return r; return r;
} }
static inline int keyeq(DBC *c, DBT *a, DBT *b) {
DB *db = c->dbp;
return db->i->brt->compare_fun(db, a, b) == 0;
}
static int toku_c_get_next_dup(DBC *c, DBT *key, DBT *data) {
int r;
DBT currentkey; memset(&currentkey, 0, sizeof currentkey); currentkey.flags = DB_DBT_REALLOC;
DBT currentval; memset(&currentval, 0, sizeof currentval); currentval.flags = DB_DBT_REALLOC;
DBT nkey; memset(&nkey, 0, sizeof nkey); nkey.flags = DB_DBT_REALLOC;
DBT nval; memset(&nval, 0, sizeof nval); nval.flags = DB_DBT_REALLOC;
r = c->c_get(c, &currentkey, &currentval, DB_CURRENT+256);
if (r != 0) goto finish;
r = c->c_get(c, &nkey, &nval, DB_NEXT);
if (r != 0) {
int rr = c->c_get(c, &nkey, &nval, DB_LAST); assert(rr == 0); /* sticky */
goto finish;
}
if (!keyeq(c, &currentkey, &nkey)) {
int rr = c->c_get(c, &nkey, &nval, DB_PREV); assert(rr == 0); /* sticky */
r = DB_NOTFOUND;
goto finish;
}
r = c->c_get(c, key, data, DB_CURRENT);
finish:
if (nkey.data) toku_free(nkey.data);
if (nval.data) toku_free(nval.data);
if (currentkey.data) toku_free(currentkey.data);
if (currentval.data) toku_free(currentval.data);
return r;
}
#ifdef DB_PREV_DUP
static int toku_c_get_prev_dup(DBC *c, DBT *key, DBT *data) {
int r;
DBT currentkey; memset(&currentkey, 0, sizeof currentkey); currentkey.flags = DB_DBT_REALLOC;
DBT currentval; memset(&currentval, 0, sizeof currentval); currentval.flags = DB_DBT_REALLOC;
DBT nkey; memset(&nkey, 0, sizeof nkey); nkey.flags = DB_DBT_REALLOC;
DBT nval; memset(&nval, 0, sizeof nval); nval.flags = DB_DBT_REALLOC;
r = c->c_get(c, &currentkey, &currentval, DB_CURRENT+256);
if (r != 0) goto finish;
r = c->c_get(c, &nkey, &nval, DB_PREV);
if (r != 0) {
int rr = c->c_get(c, &nkey, &nval, DB_FIRST); assert(rr == 0); /* sticky */
goto finish;
}
if (!keyeq(c, &currentkey, &nkey)) {
int rr = c->c_get(c, &nkey, &nval, DB_NEXT); assert(rr == 0); /* sticky */
r = DB_NOTFOUND;
goto finish;
}
r = c->c_get(c, key, data, DB_CURRENT);
finish:
if (nkey.data) toku_free(nkey.data);
if (nval.data) toku_free(nval.data);
if (currentkey.data) toku_free(currentkey.data);
if (currentval.data) toku_free(currentval.data);
return r;
}
#endif
static int toku_c_get_next_nodup(DBC *c, DBT *key, DBT *data) {
int r;
DBT currentkey; memset(&currentkey, 0, sizeof currentkey); currentkey.flags = DB_DBT_REALLOC;
DBT currentval; memset(&currentval, 0, sizeof currentval); currentval.flags = DB_DBT_REALLOC;
DBT nkey; memset(&nkey, 0, sizeof nkey); nkey.flags = DB_DBT_REALLOC;
DBT nval; memset(&nval, 0, sizeof nval); nval.flags = DB_DBT_REALLOC;
r = c->c_get(c, &currentkey, &currentval, DB_CURRENT+256);
if (r != 0)
r = c->c_get(c, key, data, DB_FIRST);
else {
while (r == 0) {
r = c->c_get(c, &nkey, &nval, DB_NEXT);
if (r != 0) break;
if (!keyeq(c, &currentkey, &nkey)) break;
}
if (r == 0) r = c->c_get(c, key, data, DB_CURRENT);
}
if (nkey.data) toku_free(nkey.data);
if (nval.data) toku_free(nval.data);
if (currentkey.data) toku_free(currentkey.data);
if (currentval.data) toku_free(currentval.data);
return r;
}
static int toku_c_get_prev_nodup(DBC *c, DBT *key, DBT *data) {
int r;
DBT currentkey; memset(&currentkey, 0, sizeof currentkey); currentkey.flags = DB_DBT_REALLOC;
DBT currentval; memset(&currentval, 0, sizeof currentval); currentval.flags = DB_DBT_REALLOC;
DBT nkey; memset(&nkey, 0, sizeof nkey); nkey.flags = DB_DBT_REALLOC;
DBT nval; memset(&nval, 0, sizeof nval); nval.flags = DB_DBT_REALLOC;
r = c->c_get(c, &currentkey, &currentval, DB_CURRENT+256);
if (r != 0)
r = c->c_get(c, key, data, DB_LAST);
else {
while (r == 0) {
r = c->c_get(c, &nkey, &nval, DB_PREV);
if (r != 0) break;
if (!keyeq(c, &currentkey, &nkey)) break;
}
if (r == 0) r = c->c_get(c, key, data, DB_CURRENT);
}
if (nkey.data) toku_free(nkey.data);
if (nval.data) toku_free(nval.data);
if (currentkey.data) toku_free(currentkey.data);
if (currentval.data) toku_free(currentval.data);
return r;
}
static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * data, u_int32_t flag) { static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * data, u_int32_t flag) {
HANDLE_PANICKED_DB(c->dbp); HANDLE_PANICKED_DB(c->dbp);
int r = toku_brt_cursor_get(c->i->c, key, data, flag, c->i->txn ? c->i->txn->i->tokutxn : 0); int r;
u_int32_t op = flag & DB_OPFLAGS_MASK;
if (op == DB_NEXT_DUP)
r = toku_c_get_next_dup(c, key, data);
else if (op == DB_NEXT_NODUP)
r = toku_c_get_next_nodup(c, key, data);
else if (op == DB_PREV_NODUP)
r = toku_c_get_prev_nodup(c, key, data);
#ifdef DB_PREV_DUP
else if (op == DB_PREV_DUP)
r = toku_c_get_prev_dup(c, key, data);
#endif
else
r = toku_brt_cursor_get(c->i->c, key, data, flag, c->i->txn ? c->i->txn->i->tokutxn : 0);
return r; return r;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment