Commit ffb0bd6d authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

some basic stuff with cursor_next works

git-svn-id: file:///svn/tokudb@24 c7de825b-a66e-492c-adef-691d508d4ae1
parent 2d089f2f
...@@ -650,13 +650,16 @@ void test_cursor_next (void) { ...@@ -650,13 +650,16 @@ void test_cursor_next (void) {
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); print_malloced_items(); //printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); print_malloced_items();
r = brt_insert(brt, "hello", 6, "there", 6); r = brt_insert(brt, "hello", 6, "there", 6);
r = brt_insert(brt, "byebye", 7, "byenow", 7); r = brt_insert(brt, "byebye", 7, "byenow", 7);
printf("%s:%d calling brt_cursor(...)\n", __FILE__, __LINE__);
r = brt_cursor(brt, &cursor); assert(r==0); r = brt_cursor(brt, &cursor); assert(r==0);
r = ybt_init(&kbt); assert(r==0); r = ybt_init(&kbt); assert(r==0);
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); print_malloced_items(); //printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); print_malloced_items();
r = ybt_init(&vbt); assert(r==0); r = ybt_init(&vbt); assert(r==0);
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); print_malloced_items(); //printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); print_malloced_items();
printf("%s:%d calling brt_c_get(...)\n", __FILE__, __LINE__);
r = brt_c_get(cursor, &kbt, &vbt, DB_NEXT); r = brt_c_get(cursor, &kbt, &vbt, DB_NEXT);
printf("%s:%d called brt_c_get(...)\n", __FILE__, __LINE__);
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); print_malloced_items(); //printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); print_malloced_items();
assert(r==0); assert(r==0);
assert(kbt.size==7); assert(kbt.size==7);
......
...@@ -110,6 +110,7 @@ int brtheader_fetch_callback (CACHEFILE cachefile, diskoff nodename, void **head ...@@ -110,6 +110,7 @@ int brtheader_fetch_callback (CACHEFILE cachefile, diskoff nodename, void **head
int read_and_pin_brt_header (CACHEFILE cf, struct brt_header **header) { int read_and_pin_brt_header (CACHEFILE cf, struct brt_header **header) {
void *header_p; void *header_p;
//fprintf(stderr, "%s:%d read_and_pin_brt_header(...)\n", __FILE__, __LINE__);
int r = cachetable_get_and_pin(cf, 0, &header_p, int r = cachetable_get_and_pin(cf, 0, &header_p,
brtheader_flush_callback, brtheader_fetch_callback, 0); brtheader_flush_callback, brtheader_fetch_callback, 0);
if (r!=0) return r; if (r!=0) return r;
...@@ -881,7 +882,7 @@ static int setup_brt_root_node (BRT t, diskoff offset) { ...@@ -881,7 +882,7 @@ static int setup_brt_root_node (BRT t, diskoff offset) {
return 0; return 0;
} }
#define BRT_TRACE //#define BRT_TRACE
#ifdef BRT_TRACE #ifdef BRT_TRACE
#define WHEN_BRTTRACE(x) x #define WHEN_BRTTRACE(x) x
#else #else
...@@ -894,8 +895,8 @@ int open_brt (const char *fname, const char *dbname, int is_create, BRT *newbrt, ...@@ -894,8 +895,8 @@ int open_brt (const char *fname, const char *dbname, int is_create, BRT *newbrt,
BRT t; BRT t;
char *malloced_name=0; char *malloced_name=0;
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); print_malloced_items(); //printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); print_malloced_items();
WHEN_BRTTRACE(fprintf(stderr, "BRTTRACE: open_brt(%s, \"%s\", %d, %p, %d, %p)\n", WHEN_BRTTRACE(fprintf(stderr, "BRTTRACE: %s:%d open_brt(%s, \"%s\", %d, %p, %d, %p)\n",
fname, dbname, is_create, newbrt, nodesize, cachetable)); __FILE__, __LINE__, fname, dbname, is_create, newbrt, nodesize, cachetable));
if ((MALLOC(t))==0) { if ((MALLOC(t))==0) {
assert(errno==ENOMEM); assert(errno==ENOMEM);
r = ENOMEM; r = ENOMEM;
...@@ -998,7 +999,7 @@ int close_brt (BRT brt) { ...@@ -998,7 +999,7 @@ int close_brt (BRT brt) {
r=brt_cursor_close(c); r=brt_cursor_close(c);
if (r!=0) return r; if (r!=0) return r;
} }
assert(0==cachefile_assert_all_unpinned(brt->cf)); assert(0==cachefile_count_pinned(brt->cf, 1));
//printf("%s:%d closing cachetable\n", __FILE__, __LINE__); //printf("%s:%d closing cachetable\n", __FILE__, __LINE__);
if ((r = cachefile_close(brt->cf))!=0) return r; if ((r = cachefile_close(brt->cf))!=0) return r;
if (brt->database_name) my_free(brt->database_name); if (brt->database_name) my_free(brt->database_name);
...@@ -1146,12 +1147,12 @@ int brt_lookup_node (BRT brt, diskoff off, bytevec key, ITEMLEN keylen, bytevec ...@@ -1146,12 +1147,12 @@ int brt_lookup_node (BRT brt, diskoff off, bytevec key, ITEMLEN keylen, bytevec
int brt_lookup (BRT brt, bytevec key, unsigned int keylen, bytevec*val, unsigned int *vallen) { int brt_lookup (BRT brt, bytevec key, unsigned int keylen, bytevec*val, unsigned int *vallen) {
int r; int r;
CACHEKEY *rootp; CACHEKEY *rootp;
assert(0==cachefile_assert_all_unpinned(brt->cf)); assert(0==cachefile_count_pinned(brt->cf, 1));
if ((r = read_and_pin_brt_header(brt->cf, &brt->h))) { if ((r = read_and_pin_brt_header(brt->cf, &brt->h))) {
printf("%s:%d\n", __FILE__, __LINE__); printf("%s:%d\n", __FILE__, __LINE__);
if (0) { died0: unpin_brt_header(brt); } if (0) { died0: unpin_brt_header(brt); }
printf("%s:%d returning %d\n", __FILE__, __LINE__, r); printf("%s:%d returning %d\n", __FILE__, __LINE__, r);
assert(0==cachefile_assert_all_unpinned(brt->cf)); assert(0==cachefile_count_pinned(brt->cf, 1));
return r; return r;
} }
rootp = calculate_root_offset_pointer(brt); rootp = calculate_root_offset_pointer(brt);
...@@ -1161,7 +1162,7 @@ int brt_lookup (BRT brt, bytevec key, unsigned int keylen, bytevec*val, unsigned ...@@ -1161,7 +1162,7 @@ int brt_lookup (BRT brt, bytevec key, unsigned int keylen, bytevec*val, unsigned
} }
//printf("%s:%d r=%d", __FILE__, __LINE__, r); if (r==0) printf(" vallen=%d", *vallen); printf("\n"); //printf("%s:%d r=%d", __FILE__, __LINE__, r); if (r==0) printf(" vallen=%d", *vallen); printf("\n");
if ((r = unpin_brt_header(brt))!=0) return r; if ((r = unpin_brt_header(brt))!=0) return r;
assert(0==cachefile_assert_all_unpinned(brt->cf)); assert(0==cachefile_count_pinned(brt->cf, 1));
return 0; return 0;
} }
...@@ -1496,12 +1497,14 @@ int brtcurs_set_position_first (BRT_CURSOR cursor, diskoff off) { ...@@ -1496,12 +1497,14 @@ int brtcurs_set_position_first (BRT_CURSOR cursor, diskoff off) {
} }
} }
/* reuqires that the cursor is initialized. */ /* requires that the cursor is initialized. */
int brtcurs_set_position_next (BRT_CURSOR cursor) { int brtcurs_set_position_next (BRT_CURSOR cursor) {
int r = pma_cursor_set_position_next(cursor->pmacurs); int r = pma_cursor_set_position_next(cursor->pmacurs);
if (r==DB_NOTFOUND) { if (r==DB_NOTFOUND) {
/* We fell off the end of the pma. */ /* We fell off the end of the pma. */
if (cursor->path_len==1) return DB_NOTFOUND;
fprintf(stderr, "Need to deal with falling off the end of the pma in a cursor\n"); fprintf(stderr, "Need to deal with falling off the end of the pma in a cursor\n");
/* Part of the trickyness is we need to leave the cursor pointing at the current (possibly deleted) value if there is no next value. */
abort(); abort();
} }
return 0; return 0;
...@@ -1525,6 +1528,7 @@ int brt_c_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags) { ...@@ -1525,6 +1528,7 @@ int brt_c_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags) {
CACHEKEY *rootp; CACHEKEY *rootp;
dump_brt(cursor->brt); dump_brt(cursor->brt);
fprintf(stderr, "%s:%d in brt_c_get(...)\n", __FILE__, __LINE__);
if ((r = read_and_pin_brt_header(cursor->brt->cf, &cursor->brt->h))) { if ((r = read_and_pin_brt_header(cursor->brt->cf, &cursor->brt->h))) {
if (0) { died0: unpin_brt_header(cursor->brt); } if (0) { died0: unpin_brt_header(cursor->brt); }
return r; return r;
...@@ -1541,20 +1545,24 @@ int brt_c_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags) { ...@@ -1541,20 +1545,24 @@ int brt_c_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags) {
r=pma_cget_current(cursor->pmacurs, kbt, vbt); r=pma_cget_current(cursor->pmacurs, kbt, vbt);
break; break;
case DB_FIRST: case DB_FIRST:
do_db_first:
r=unpin_cursor(cursor); if (r!=0) goto died0; r=unpin_cursor(cursor); if (r!=0) goto died0;
r=brtcurs_set_position_first(cursor, *rootp); if (r!=0) goto died0; r=brtcurs_set_position_first(cursor, *rootp); if (r!=0) goto died0;
r=pma_cget_current(cursor->pmacurs, kbt, vbt); r=pma_cget_current(cursor->pmacurs, kbt, vbt);
break; break;
case DB_NEXT: case DB_NEXT:
if (cursor->path_len<=0) return brt_c_get(cursor, kbt, vbt, (flags&(~DB_NEXT))|DB_FIRST); if (cursor->path_len<=0) {
goto do_db_first;
}
assert(cursor->path_len>0); assert(cursor->path_len>0);
r=brtcurs_set_position_next(cursor); if (r!=0) goto died0; r=brtcurs_set_position_next(cursor); if (r!=0) goto died0;
r=pma_cget_current(cursor->pmacurs, kbt, vbt); r=pma_cget_current(cursor->pmacurs, kbt, vbt); if (r!=0) goto died0;
break; break;
default: default:
fprintf(stderr, "%s:%d c_get(...,%d) not ready\n", __FILE__, __LINE__, flags); fprintf(stderr, "%s:%d c_get(...,%d) not ready\n", __FILE__, __LINE__, flags);
abort(); abort();
} }
printf("%s:%d unpinning header\n", __FILE__, __LINE__);
if ((r = unpin_brt_header(cursor->brt))!=0) return r; if ((r = unpin_brt_header(cursor->brt))!=0) return r;
return 0; return 0;
} }
...@@ -143,21 +143,21 @@ int cachetable_assert_all_unpinned (CACHETABLE t) { ...@@ -143,21 +143,21 @@ int cachetable_assert_all_unpinned (CACHETABLE t) {
return some_pinned; return some_pinned;
} }
int cachefile_assert_all_unpinned (CACHEFILE cf) { int cachefile_count_pinned (CACHEFILE cf, int print_them) {
int i; int i;
int some_pinned=0; int n_pinned=0;
CACHETABLE t = cf->cachetable; CACHETABLE t = cf->cachetable;
for (i=0; i<t->table_size; i++) { for (i=0; i<t->table_size; i++) {
PAIR p; PAIR p;
for (p=t->table[i]; p; p=p->hash_chain) { for (p=t->table[i]; p; p=p->hash_chain) {
assert(p->pinned>=0); assert(p->pinned>=0);
if (p->pinned && p->cachefile==cf) { if (p->pinned && p->cachefile==cf) {
printf("%s:%d pinned: %lld (%p)\n", __FILE__, __LINE__, p->key, p->value); if (print_them) printf("%s:%d pinned: %lld (%p)\n", __FILE__, __LINE__, p->key, p->value);
some_pinned=1; n_pinned++;
} }
} }
} }
return some_pinned; return n_pinned;
} }
static unsigned int hash_key (const char *key, int keylen) { static unsigned int hash_key (const char *key, int keylen) {
...@@ -223,7 +223,7 @@ static void flush_and_remove (CACHETABLE t, PAIR remove_me, int write_me) { ...@@ -223,7 +223,7 @@ static void flush_and_remove (CACHETABLE t, PAIR remove_me, int write_me) {
unsigned int h = hashit(t, remove_me->key); unsigned int h = hashit(t, remove_me->key);
lru_remove(t, remove_me); lru_remove(t, remove_me);
//printf("flush_callback(%lld,%p)\n", remove_me->key, remove_me->value); //printf("flush_callback(%lld,%p)\n", remove_me->key, remove_me->value);
WHEN_TRACE_CT(printf("%s:%d CT flush_callback(%lld, %p, %p, dirty=%d, 0)\n", __FILE__, __LINE__, remove_me->key, remove_me->value, remove_me->otherargs, remove_me->dirty && write_me)); WHEN_TRACE_CT(printf("%s:%d CT flush_callback(%lld, %p, dirty=%d, 0)\n", __FILE__, __LINE__, remove_me->key, remove_me->value, remove_me->dirty && write_me));
remove_me->flush_callback(remove_me->cachefile, remove_me->key, remove_me->value, remove_me->dirty && write_me, 0); remove_me->flush_callback(remove_me->cachefile, remove_me->key, remove_me->value, remove_me->dirty && write_me, 0);
t->n_in_table--; t->n_in_table--;
// Remove it from the hash chain. // Remove it from the hash chain.
...@@ -233,7 +233,7 @@ static void flush_and_remove (CACHETABLE t, PAIR remove_me, int write_me) { ...@@ -233,7 +233,7 @@ static void flush_and_remove (CACHETABLE t, PAIR remove_me, int write_me) {
static void flush_and_keep (PAIR flush_me) { static void flush_and_keep (PAIR flush_me) {
if (flush_me->dirty) { if (flush_me->dirty) {
WHEN_TRACE_CT(printf("%s:%d CT flush_callback(%lld, %p, %p, dirty=1, 0)\n", __FILE__, __LINE__, flush_me->key, flush_me->value, flush_me->otherargs)); WHEN_TRACE_CT(printf("%s:%d CT flush_callback(%lld, %p, dirty=1, 0)\n", __FILE__, __LINE__, flush_me->key, flush_me->value));
flush_me->flush_callback(flush_me->cachefile, flush_me->key, flush_me->value, 1, 1); flush_me->flush_callback(flush_me->cachefile, flush_me->key, flush_me->value, 1, 1);
flush_me->dirty=0; flush_me->dirty=0;
} }
...@@ -332,7 +332,7 @@ int cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, void**value ...@@ -332,7 +332,7 @@ int cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, void**value
*value = p->value; *value = p->value;
p->pinned++; p->pinned++;
lru_touch(t,p); lru_touch(t,p);
printf("%s:%d cachtable_maybe_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value); printf("%s:%d cachetable_maybe_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value);
return 0; return 0;
} }
} }
...@@ -344,15 +344,17 @@ int cachetable_unpin (CACHEFILE cachefile, CACHEKEY key, int dirty) { ...@@ -344,15 +344,17 @@ int cachetable_unpin (CACHEFILE cachefile, CACHEKEY key, int dirty) {
CACHETABLE t = cachefile->cachetable; CACHETABLE t = cachefile->cachetable;
int h = hashit(t,key); int h = hashit(t,key);
PAIR p; PAIR p;
WHEN_TRACE_CT(printf("%s:%d unpin(%lld)\n", __FILE__, __LINE__, key)); WHEN_TRACE_CT(printf("%s:%d unpin(%lld)", __FILE__, __LINE__, key));
for (p=t->table[h]; p; p=p->hash_chain) { for (p=t->table[h]; p; p=p->hash_chain) {
if (p->key==key && p->cachefile==cachefile) { if (p->key==key && p->cachefile==cachefile) {
assert(p->pinned>0); assert(p->pinned>0);
p->pinned--; p->pinned--;
p->dirty |= dirty; p->dirty |= dirty;
WHEN_TRACE_CT(printf("[count=%lld]\n", p->pinned));
return 0; return 0;
} }
} }
printf("\n");
return 0; return 0;
} }
......
...@@ -42,7 +42,7 @@ int cachetable_maybe_get_and_pin (CACHEFILE, CACHEKEY, void**); ...@@ -42,7 +42,7 @@ int cachetable_maybe_get_and_pin (CACHEFILE, CACHEKEY, void**);
int cachetable_unpin (CACHEFILE, CACHEKEY, int dirty); /* Note whether it is dirty when we unpin it. */ int cachetable_unpin (CACHEFILE, CACHEKEY, int dirty); /* Note whether it is dirty when we unpin it. */
int cachetable_remove (CACHEFILE, CACHEKEY, int /*write_me*/); /* Removing something already present is OK. */ int cachetable_remove (CACHEFILE, CACHEKEY, int /*write_me*/); /* Removing something already present is OK. */
int cachetable_assert_all_unpinned (CACHETABLE); int cachetable_assert_all_unpinned (CACHETABLE);
int cachefile_assert_all_unpinned (CACHEFILE); int cachefile_count_pinned (CACHEFILE, int /*printthem*/ );
//int cachetable_fsync_all (CACHETABLE); /* Flush everything to disk, but keep it in cache. */ //int cachetable_fsync_all (CACHETABLE); /* Flush everything to disk, but keep it in cache. */
int cachetable_close (CACHETABLE); /* Flushes everything to disk, and destroys the cachetable. */ int cachetable_close (CACHETABLE); /* Flushes everything to disk, and destroys the cachetable. */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment