Commit fe236125 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Merge in changes. Refs #2608, #2609, and #2610. [t:2608] [t:2609] [t:2610]

{{{
svn merge -r 20187:20196 https://svn.tokutek.com/tokudb/toku/tokudb.2571
}}}
.


git-svn-id: file:///svn/toku/tokudb@20197 c7de825b-a66e-492c-adef-691d508d4ae1
parent dbd2d1b4
......@@ -65,7 +65,6 @@ BRT_SOURCES = \
logcursor \
memarena \
mempool \
merger \
minicron \
omt \
pqueue \
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
/* The purpose of this file is to provide access to the brt_msg,
* which is the ephemeral version of the fifo_msg.
*/
......@@ -10,6 +7,10 @@
#ifndef TOKU_BRT_MSG_H
#define TOKU_BRT_MSG_H
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
......
......@@ -2,7 +2,9 @@
#ifndef TOKU_BRT_LOADER_H
#define TOKU_BRT_LOADER_H
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id: pqueue.c$"
#ident "$Id$"
#ident "Copyright (c) 2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
/* The purpose of this file is to provide access to the fifo_msg,
* which is the stored representation of the brt_msg.
*
......@@ -12,6 +9,10 @@
#ifndef TOKU_FIFO_MSG_H
#define TOKU_FIFO_MSG_H
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
......
/* merger test.
* Test 0: A super-simple merge of one file with a few records.
* Test 1: A fairly simple merge of a few hundred records in a few files. The data in the files is interleaved randomly (that is, the "pop" operation will tend to get data from a randomly chosen file).
* Test 2: A merge of a thousand files, (interleaved randomly), each file perhaps a megabyte. This test is intended to demonstrate performance improvements when we have a proper merge heap.
* Test 3: A merge of 10 files, (interleaved randomly) each file perhaps a gigabyte. This test is intended to demonstrate performance improvements when we pipeline the reads properly.
* Test 4: A merge of 100 files, presorted, each perhaps 10MB. All the records in the first file are less than all the records in the second, and so forth. This test is intended to show performance improvements when we prefer to refill the buffer that is the most empty.
*/
#include "toku_assert.h"
#include "merger.h"
#include <stdio.h>
#include <sys/types.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <arpa/inet.h>
static int default_compare (DB *db __attribute__((__unused__)), const DBT *keya, const DBT *keyb)
// Effect: Order two keys bytewise (memcmp order).  When one key is a prefix of the other,
//  the shorter key sorts first.
// Returns: negative if keya sorts before keyb, positive if it sorts after, 0 if they are identical.
{
    // Compare only the bytes both keys have.
    size_t common = (keya->size < keyb->size) ? keya->size : keyb->size;
    int c = memcmp(keya->data, keyb->data, common);
    if (c != 0) return c;
    // The common prefix is equal, so the length is the tie breaker.
    if (keya->size < keyb->size) return -1;
    if (keya->size > keyb->size) return +1;
    return 0;
}
static void write_ij (FILE *f, u_int32_t i, u_int32_t j)
// Effect: Append one row to f in the merger's input format:
//    <keylen (native order)> <key> <vallen (native order)> <val>
//  The key is i and the value is j, each stored as 4 bytes in network byte order.
{
    const u_int32_t field_len = sizeof(u_int32_t);
    const u_int32_t fields[2] = { htonl(i), htonl(j) };   // key first, then value
    for (int k = 0; k < 2; k++) {
        int n_len = fwrite(&field_len, sizeof(field_len), 1, f);
        assert(n_len == 1);
        int n_data = fwrite(&fields[k], sizeof(fields[k]), 1, f);
        assert(n_data == 1);
    }
}
static void check_ij (MERGER m, u_int32_t i, u_int32_t j)
// Effect: Pop the next row from m and verify that it is the row written by write_ij(f, i, j):
//  a 4-byte key holding i and a 4-byte value holding j, both in network byte order.
//  Any mismatch fails an assertion.
{
    DBT key,val;
    memset(&key, 0, sizeof(key));
    memset(&val, 0, sizeof(val));
    int r = merger_pop(m, &key, &val);
    assert(r==0);
    // These must be comparisons.  (An assignment here would always "pass" and would also overwrite the sizes.)
    assert(key.size == sizeof(u_int32_t));
    assert(val.size == sizeof(u_int32_t));
    // Copy the bytes out instead of dereferencing a cast pointer, so we do not depend on the
    // alignment of the merger's buffers.
    u_int32_t neti, netj;
    memcpy(&neti, key.data, sizeof(neti));
    memcpy(&netj, val.data, sizeof(netj));
    u_int32_t goti = ntohl(neti);
    u_int32_t gotj = ntohl(netj);
    //printf("Got %d,%d (expect %d,%d)\n", goti, gotj, i, j);
    assert(goti == i);
    assert(gotj == j);
}
static void check_nothing_is_left (MERGER m)
// Effect: Verify that m has been drained: one more pop must report failure (nonzero).
{
    DBT k, v;
    memset(&v, 0, sizeof(v));
    memset(&k, 0, sizeof(k));
    int pop_result = merger_pop(m, &k, &v);
    assert(pop_result != 0); // should be nothing left.
}
static void test0 (void)
// Test 0: merge a single file holding rows (i, N-i) for i in [0,N), and check that they come back in order.
{
    const u_int32_t n_rows = 100;
    char *path = "merger-test0.data";
    char *paths[] = {path};

    // Write the input file.
    FILE *out = fopen(path, "w");
    assert(out);
    u_int32_t w = 0;
    while (w < n_rows) {
        write_ij(out, w, n_rows-w);
        w++;
    }
    {
        int r = fclose(out);
        assert(r==0);
    }

    // Merge it and verify every row, then verify that the merger is empty.
    MERGER m = create_merger (1, paths, NULL, default_compare, NULL);
    u_int32_t c = 0;
    while (c < n_rows) {
        check_ij(m, c, n_rows-c);
        c++;
    }
    check_nothing_is_left(m);
    merger_close(m);

    // Clean up the data file.
    {
        int r = unlink(path);
        assert(r==0);
    }
}
static void test1 (void)
// Test 1: write rows (i, 2*i) for i in [0,NRECORDS), each to a randomly chosen one of NFILES files
//  (so every file is sorted but the rows are interleaved across files), then merge the files and
//  check that the rows come back in order.  The data files are removed at the end.
{
    const int NFILES = 10;
    const u_int32_t NRECORDS = NFILES*100;
    char *fnames[NFILES];
    {
        FILE *files[NFILES];
        for (int i=0; i<NFILES; i++) {
            char fname[] = "merger-test-XXX.data";
            // %03x zero-pads the file number.  (%3x pads with spaces, which puts blanks in the file name.)
            snprintf(fname, sizeof(fname), "merger-test-%03x.data", (unsigned)i);
            fnames[i] = strdup(fname);
            assert(fnames[i]);
            files [i] = fopen(fname, "w");
            assert(files[i]); // fail here rather than crashing inside write_ij.
        }
        for (u_int32_t i=0; i<NRECORDS; i++) {
            int fnum = random()%NFILES;
            write_ij(files[fnum], i, 2*i);
        }
        for (int i=0; i<NFILES; i++) {
            int r = fclose(files[i]); assert(r==0);
        }
    }
    MERGER m = create_merger(NFILES, fnames, NULL, default_compare, NULL);
    for (u_int32_t i=0; i<NRECORDS; i++) {
        check_ij(m, i, 2*i);
    }
    check_nothing_is_left(m);
    merger_close(m);
    for (int i=0; i<NFILES; i++) {
        // Remove the data file (as test0 does) before releasing its name.
        { int r = unlink(fnames[i]); assert(r==0); }
        free(fnames[i]);
    }
}
int main (int argc, char *const argv[] __attribute__((__unused__))) {
assert(argc==1);
test0();
test1();
return 0;
}
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2009-2010 Tokutek Inc. All rights reserved."
/* See merger.h for a description of this module. */
#include <toku_portability.h>
#include "brttypes.h"
#include "merger.h"
#include <memory.h>
#include <toku_assert.h>
#include <string.h>
// Merge state shared by create_merger, merge_fill_dbt, merger_pop and merger_close.
// files, present, keys and vals are parallel arrays with n_files entries each (allocated in create_merger).
struct merger {
int n_files; // number of input files, which is also the length of each per-file array below.
FILE **files; // files[i] is the open input stream for file i.  merge_fill_dbt closes it and stores NULL once the file has no more rows.
BOOL *present;// one for each file: present[i] is TRUE if and only if keys[i] and vals[i] hold a row that has been read but not yet handed out by merger_pop.
DBT *keys; // one for each file. ulen keeps track of how much memory is actually allocated for data; size is the length of the current key.
DBT *vals; // one for each file, with the same conventions as keys.
DB *db; // passed through unchanged as the first argument of cf.
COMPARISON_FUNCTION cf; // decides which of two keys (from different files) comes first.
MEMORY_ALLOCATION_UPDATER mup; // stored by create_merger.  NOTE(review): not called anywhere in the merger code visible here.
};
MERGER create_merger (int n_files, char *file_names[n_files], DB *db, COMPARISON_FUNCTION cf, MEMORY_ALLOCATION_UPDATER mup)
// Effect: Allocate a merger and open each of the n_files named files for reading.
//  Every per-file slot starts out with no buffered row (present[i]==FALSE) and zeroed key and val DBTs.
//  A file that cannot be opened fails an assertion.
// NOTE(review): the results of MALLOC/MALLOC_N are used without a check; confirm whether those macros can yield NULL.
{
    MERGER MALLOC(result);
    result->n_files = n_files;
    result->db      = db;
    result->cf      = cf;
    result->mup     = mup;
    MALLOC_N(n_files, result->files);
    MALLOC_N(n_files, result->present);
    MALLOC_N(n_files, result->keys);
    MALLOC_N(n_files, result->vals);
    for (int fnum = 0; fnum < n_files; fnum++) {
        FILE *in = fopen(file_names[fnum], "r");
        result->files[fnum] = in;
        assert(in);
        result->present[fnum] = FALSE;
        memset(&result->keys[fnum], 0, sizeof(result->keys[0]));
        memset(&result->vals[fnum], 0, sizeof(result->vals[0]));
    }
    return result;
}
void merger_close (MERGER m)
// Effect: Close every input file that is still open, free the per-file key and val buffers,
//  and then free the per-file arrays and the merger itself.
{
    const int n = m->n_files;
    for (int fnum = 0; fnum < n; fnum++) {
        FILE *in = m->files[fnum];
        if (in != NULL) {
            // The file was not drained (drained files are closed when they hit end of input).
            int r = fclose(in);
            assert(r==0);
        }
        void *keybuf = m->keys[fnum].data;
        if (keybuf) toku_free(keybuf);
        void *valbuf = m->vals[fnum].data;
        if (valbuf) toku_free(valbuf);
    }
    toku_free(m->files);
    toku_free(m->present);
    toku_free(m->keys);
    toku_free(m->vals);
    toku_free(m);
}
static int merge_fill_dbt (MERGER m, int i)
// Effect: Make sure that keys[i] and vals[i] hold the next unconsumed row of file i and return 0.
//  If file i has no more rows, close it (if it is still open), set files[i] to NULL, and return nonzero.
// Input format (one row): <keylen (4 bytes, native order)> <key> <vallen (4 bytes, native order)> <val>
// Errors: A read error, or a file that ends in the middle of a row, fails an assertion.
//  Only a clean end of file at a row boundary is reported as "no more rows".
{
    if (m->present[i]) return 0; // it's there, so we are OK.
    if (m->files[i]==NULL) return -1; // the file was previously empty, so no more.
    FILE *f = m->files[i];
    u_int32_t keylen;
    {
        // Read the length byte-wise so that "nothing left" (0 bytes) can be told apart from a truncated header.
        size_t n = fread(&keylen, 1, sizeof(keylen), f);
        if (n==0) {
            // Nothing was read.  That is the end of the file only if the stream has no error pending;
            // otherwise we would silently drop the remaining rows.
            assert(!ferror(f));
            int r = fclose(f);
            assert(r==0);
            m->files[i] = NULL;
            return -1;
        }
        assert(n==sizeof(keylen)); // a partial length header means the file is truncated.
    }
    // Got something, so we should be able to get the rest.
    if (m->keys[i].ulen < keylen) {
        // Grow the key buffer; ulen remembers the allocated size so the buffer is reused for later rows.
        m->keys[i].data = toku_xrealloc(m->keys[i].data, keylen);
        m->keys[i].ulen = keylen;
    }
    {
        size_t n = fread(m->keys[i].data, 1, keylen, f);
        assert(n==keylen);
    }
    u_int32_t vallen;
    {
        size_t n = fread(&vallen, sizeof(vallen), 1, f);
        assert(n==1);
    }
    if (m->vals[i].ulen < vallen) {
        m->vals[i].data = toku_xrealloc(m->vals[i].data, vallen);
        m->vals[i].ulen = vallen;
    }
    {
        size_t n = fread(m->vals[i].data, 1, vallen, f);
        assert(n==vallen);
    }
    m->keys[i].size = keylen;
    m->vals[i].size = vallen;
    m->present[i] = TRUE;
    return 0;
}
static int find_first_nonempty (MERGER m, int *besti)
// Effect: Scan the files in index order, filling each slot as we go, and store in *besti the
//  index of the first file that has a row available.
// Returns: 0 if such a file was found, and -1 (leaving *besti untouched) if every file is exhausted.
{
    int fnum = 0;
    while (fnum < m->n_files) {
        if (merge_fill_dbt(m, fnum) == 0) {
            *besti = fnum;
            return 0;
        }
        fnum++;
    }
    return -1;
}
int merger_pop (MERGER m,
                /*out*/ DBT *key,
                /*out*/ DBT *val)
// Effect: Store the smallest remaining row (according to m->cf) in *key and *val and return 0.
//  If no file has a row left, return -1.
//  When two rows compare equal, the one from the lower-numbered file is returned.
// Note: *key and *val are shallow copies that point into the merger's own buffers.
// This is the simplest version: a linear scan over all the files on every pop.
{
    int winner = -1;
    if (find_first_nonempty(m, &winner) != 0) {
        // every file is exhausted.
        return -1;
    }
    // winner is the first file with a row; see whether any later file has a smaller one.
    for (int cand = winner+1; cand < m->n_files; cand++) {
        if (merge_fill_dbt(m, cand) != 0) continue; // nothing left in that file.
        int cmp = m->cf(m->db, &m->keys[winner], &m->keys[cand]);
        if (cmp > 0) {
            winner = cand;
        }
    }
    // Hand out the winner and mark its slot consumed, so the next pop refills it.
    *key = m->keys[winner];
    *val = m->vals[winner];
    m->present[winner] = FALSE;
    return 0;
}
#ifndef TOKU_MERGER_H
#define TOKU_MERGER_H
/* This is a C header (no Cilk or C++ inside here) */
/* The merger abstraction:
*
* This module implements a multithreaded file merger, specialized for the temporary file format used by the loader.
* The input files have rows stored as follows
* <keylen (4 byte integer in native order)>
* <key (char[keylen])>
* <vallen (4 byte integer in native order)>
* <val (char[vallen])>
* The input files are sorted according to the comparison function.
*
* Given a bunch of input files each containing rows, the merger can produce the minimal row from all those files.
*
* The merger periodically asks for memory, and the allocated memory may go up or down. If the memory allocation increases, the merger may malloc() more memory.
* If the memory allocation decreases, the merger should free some memory.
*
* Implementation hints: The merger should double buffer its input.
* That is, for each file, the merger should use two buffers. It should fill the first buffer, and then in the background fill the other buffer.
* Whenever a buffer empties, we hope that the other buffer is full (if not we wait) and we swap buffers, and then have the background thread fill the other buffer.
* This strategy implies that there is a background thread filling those other buffers.
* The background thread may have several refillable buffers to choose from at any moment.
* There are two obvious approaches for choosing which buffer to refill next:
* 1) Refill the one that's been empty the longest.
* 2) Refill the one for which the "front" of the buffer is the most empty.
* The advantage of approach (1) is that it's simple, and less likely to have race conditions.
* The advantage of approach (2) is that if some buffer gets emptied quickly we start refilling it earlier, possibly avoiding a pipeline stall.
* This could be an issue if the data was already sorted, so that file[0] is always emptying first, then file[1], and so forth.
*/
#include "db.h"
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
// Opaque handle to a merger.  Create with create_merger() and destroy with merger_close().
typedef struct merger *MERGER;
// Callback through which the merger negotiates its memory budget:
//   currently_using      (in)  how much memory the merger is using now.
//   currently_requested  (in)  how much total memory the merger would like.
//   new_allocation       (out) how much memory the merger may have.
typedef void (*MEMORY_ALLOCATION_UPDATER) (/*in */ size_t currently_using,
/*in */ size_t currently_requested,
/*out*/size_t *new_allocation);
// Callback that orders two keys.  A result greater than zero means that keya sorts after keyb.
// db is the DB that was passed to create_merger.
typedef int (*COMPARISON_FUNCTION) (DB *db, const DBT *keya, const DBT *keyb);
MERGER create_merger (int n_files, char *file_names[n_files], DB *db, COMPARISON_FUNCTION f, MEMORY_ALLOCATION_UPDATER mup);
// Effect: Create a new merger, which will merge the n_files files named by file_names.
// The comparison function, f, decides which rows are smaller when they come from different files.  db is passed to f as its first argument.
// The merger calls mup, a memory allocation updater, periodically, with three arguments:
// currently_using how much memory the merger is currently using.
// currently_requested how much total memory the merger would like.
// new_allocation (out) how much memory the system says the merger may have. If new_allocation is more than currently_using, then the merger
// may allocate more memory (up to the new allocation). If new_allocation is less, then the merger must free some memory
// (and it should call the mup function again to indicate that the memory has been reduced).
// NOTE(review): this describes the intended protocol; confirm that the implementation actually invokes mup.
void merger_close (MERGER);
// Effect: Close the files and free the memory used by the merger.  The merger must not be used afterward.
int merger_pop (MERGER m,
/*out*/ DBT *key,
/*out*/ DBT *val);
// Effect: If there are any rows left then return the minimal row in *key and *val.
// Returns: 0 if a row was returned, and nonzero if there are no rows left.
// The data pointed to by *key and *val remains valid until the next call to merger_pop or merger_close. The merger owns that memory (the caller
// must not free it), which is why the DBT flags are required to be 0.
// Requires: The flags in the dbts must be zero.
// Rationale: We are trying to make this path as fast as possible, so we don't want to copy the data unnecessarily, and we don't want to mess around with DB_DBT_MALLOC and so forth.
// It is fairly straightforward to keep the key and val "live": In most cases, the buffer is still valid. In the case where the key and val are the last
// item in a buffer, we must take care not to reuse the buffer until the next merger_pop.
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id: pqueue.c$"
#ident "$Id$"
#ident "Copyright (c) 2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
......
#ifndef TOKU_PQUEUE_H
#define TOKU_PQUEUE_H
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
......
#ifndef TOKU_QUEUE_H
#define TOKU_QUEUE_H
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "brttypes.h"
#if defined(__cplusplus) || defined(__cilkplusplus)
......
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <toku_portability.h>
#include <stdio.h>
#include <string.h>
......
#ifndef TOKU_SUB_BLOCK_H
#define TOKU_SUB_BLOCK_H
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
......
// Include guard.  The #ifndef must test the same macro that the #define creates; otherwise the guard
// never takes effect and a second inclusion redefines everything in this header.
#ifndef TOKU_SUB_BLOCK_MAP_H
#define TOKU_SUB_BLOCK_MAP_H
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
// Map objects to a sequence of sub block
struct sub_block_map {
u_int32_t idx;
......
......@@ -3,6 +3,11 @@
#ifndef TOKUCONST_H
#define TOKUCONST_H
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
/* The number of transaction ids stored in the xids structure is
* represented by an 8-bit value. The value 255 is reserved.
* The constant MAX_NESTED_TRANSACTIONS is one less because
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
/* Purpose of this file is to provide the world with everything necessary
* to use the nested transaction logic and nothing else. No internal
......@@ -9,6 +8,10 @@
#ifndef TOKU_ULE_H
#define TOKU_ULE_H
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
......
#ifndef _TOKU_WORKSET_H
#define _TOKU_WORKSET_H
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <toku_list.h>
#include <toku_pthread.h>
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ifndef XIDS_INTERNAL_H
#define XIDS_INTERNAL_H
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
/* Purpose of this file is to implement xids list of nested transactions
* ids.
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
/* Purpose of this file is to provide the world with everything necessary
* to use the xids and nothing else.
......@@ -15,6 +14,10 @@
#ifndef XIDS_H
#define XIDS_H
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "x1764.h"
#include "rbuf.h"
#include "wbuf.h"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment