Commit 9a8fd72c authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

associate implemented up to a point: put works, get doesn't. Addresses #48.

git-svn-id: file:///svn/tokudb@848 c7de825b-a66e-492c-adef-691d508d4ae1
parent 4d38445d
...@@ -63,10 +63,13 @@ static inline void list_move(struct list *newhead, struct list *oldhead) { ...@@ -63,10 +63,13 @@ static inline void list_move(struct list *newhead, struct list *oldhead) {
list_init(oldhead); list_init(oldhead);
} }
// Note: Need an extra level of parens in these macros so that
// list_struct(h, foo, b)->zot
// will work right. Otherwise the type cast will try to include ->zot, and it will be all messed up.
#if defined(__GNUC__) && __GNUC__ >= 4 #if defined(__GNUC__) && __GNUC__ >= 4
#define list_struct(p, t, f) (t*)((char*)(p) - __builtin_offsetof(t, f)) #define list_struct(p, t, f) ((t*)((char*)(p) - __builtin_offsetof(t, f)))
#else #else
#define list_struct(p, t, f) (t*)((char*)(p) - ((char*)&((t*)0)->f)) #define list_struct(p, t, f) ((t*)((char*)(p) - ((char*)&((t*)0)->f)))
#endif #endif
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
LIBNAME=libdb LIBNAME=libdb
OPTFLAGS = -O2 # OPTFLAGS = -O2
CFLAGS = -W -Wall -Werror -g -fPIC $(OPTFLAGS) CFLAGS = -W -Wall -Werror -g -fPIC $(OPTFLAGS)
CPPFLAGS = -I../include -I../newbrt CPPFLAGS = -I../include -I../newbrt
CPPFLAGS += -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE CPPFLAGS += -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE
......
...@@ -15,6 +15,7 @@ struct db_header { ...@@ -15,6 +15,7 @@ struct db_header {
}; };
struct __toku_db_internal { struct __toku_db_internal {
DB *db; // A pointer back to the DB.
int freed; int freed;
struct db_header *header; struct db_header *header;
int database_number; // -1 if it is the single unnamed database. Nonnengative number otherwise. int database_number; // -1 if it is the single unnamed database. Nonnengative number otherwise.
...@@ -28,5 +29,6 @@ struct __toku_db_internal { ...@@ -28,5 +29,6 @@ struct __toku_db_internal {
struct list associated; // All the associated databases. The primary is the head of the list. struct list associated; // All the associated databases. The primary is the head of the list.
DB *primary; // For secondary (associated) databases, what is the primary? NULL if not a secondary. DB *primary; // For secondary (associated) databases, what is the primary? NULL if not a secondary.
int(*associate_callback)(DB*, const DBT*, const DBT*, DBT*); // For secondary, the callback function for associate. NULL if not secondary int(*associate_callback)(DB*, const DBT*, const DBT*, DBT*); // For secondary, the callback function for associate. NULL if not secondary
int associate_is_immutable; // If this DB is a secondary then this field indicates that the index never changes due to updates.
}; };
#endif #endif
...@@ -522,8 +522,27 @@ int log_compare(const DB_LSN * a, const DB_LSN * b) { ...@@ -522,8 +522,27 @@ int log_compare(const DB_LSN * a, const DB_LSN * b) {
static int toku_db_associate (DB *primary, DB_TXN *txn, DB *secondary, static int toku_db_associate (DB *primary, DB_TXN *txn, DB *secondary,
int (*callback)(DB *secondary, const DBT *key, const DBT *data, DBT *result), int (*callback)(DB *secondary, const DBT *key, const DBT *data, DBT *result),
u_int32_t flags) { u_int32_t flags) {
primary=primary; txn=txn; secondary=secondary; callback=callback; flags=flags; if (secondary->i->primary) return EINVAL; // The secondary already has a primary
return EINVAL; if (primary->i->primary) return EINVAL; // The primary already has a primary
if (!list_empty(&secondary->i->associated)) return EINVAL; // The secondary is in some list (or it is a primary)
assert(secondary->i->associate_callback==0); // Something's wrong if this isn't null we made it this far.
secondary->i->associate_callback = callback;
#ifdef DB_IMMUTABLE_KEY
secondary->i->associate_is_immutable = (DB_IMMUTABLE_KEY&flags)!=0;
flags &= ~DB_IMMUTABLE_KEY;
#else
secondary->i->associate_is_immutable = 0;
#endif
if (flags!=0 && flags!=DB_CREATE) return EINVAL; // after removing DB_IMMUTABLE_KEY the flags better be 0 or DB_CREATE
if (flags==DB_CREATE) {
txn=txn;
// To do this: Open a cursor on the primary. Step through it all, doing the callbacks.
// Then insert each callback result into the secondary.
return -1; // We aren't ready for this case.
}
list_push(&primary->i->associated, &secondary->i->associated);
secondary->i->primary = primary;
return 0;
} }
static int toku_db_close(DB * db, u_int32_t flags) { static int toku_db_close(DB * db, u_int32_t flags) {
...@@ -733,6 +752,23 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db ...@@ -733,6 +752,23 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db
return r; return r;
} }
static int do_associated_inserts (DB_TXN *txn, DBT *key, DBT *data, DB *secondary) {
DBT idx;
memset(&idx, 0, sizeof(idx));
int r = secondary->i->associate_callback(secondary, key, data, &idx);
if (r==DB_DONOTINDEX) return 0;
#ifdef DB_DBT_MULTIPLE
if (idx.flags & DB_DBT_MULTIPLE) {
return EINVAL; // We aren't ready for this
}
#endif
r = secondary->put(secondary, txn, &idx, key, 0);
if (idx.flags & DB_DBT_APPMALLOC) {
free(idx.data);
}
return r;
}
static int toku_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) { static int toku_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) {
int r; int r;
...@@ -756,7 +792,18 @@ static int toku_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t f ...@@ -756,7 +792,18 @@ static int toku_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t f
r = toku_brt_insert(db->i->brt, key, data, txn ? txn->i->tokutxn : 0); r = toku_brt_insert(db->i->brt, key, data, txn ? txn->i->tokutxn : 0);
//printf("%s:%d %d=__toku_db_put(...)\n", __FILE__, __LINE__, r); //printf("%s:%d %d=__toku_db_put(...)\n", __FILE__, __LINE__, r);
return r; if (r!=0) return r;
// For each secondary add the relevant records.
if (db->i->associate_callback) {
struct list *h;
for (h=list_head(&db->i->associated); h!=&db->i->associated; h=h->next) {
struct __toku_db_internal *dbi=list_struct(h, struct __toku_db_internal, associated);
r=do_associated_inserts(txn, key, data, dbi->db);
if (r!=0) return r;
}
}
return 0;
} }
static int toku_db_remove(DB * db, const char *fname, const char *dbname, u_int32_t flags) { static int toku_db_remove(DB * db, const char *fname, const char *dbname, u_int32_t flags) {
...@@ -883,6 +930,7 @@ int db_create(DB ** db, DB_ENV * env, u_int32_t flags) { ...@@ -883,6 +930,7 @@ int db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
return ENOMEM; return ENOMEM;
} }
memset(result->i, 0, sizeof *result->i); memset(result->i, 0, sizeof *result->i);
result->i->db = result;
result->i->freed = 0; result->i->freed = 0;
result->i->header = 0; result->i->header = 0;
result->i->database_number = 0; result->i->database_number = 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment