Commit 8e5f1d86 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

more merging of 1032 to 1032b. addresses #1032

git-svn-id: file:///svn/toku/tokudb.1032b@7779 c7de825b-a66e-492c-adef-691d508d4ae1
parent 3d2f1b62
......@@ -3,14 +3,14 @@
#include "includes.h"
struct bread {
off_t current_offset; // The current offset to be read (in the file). That offset includes anything that is unread in the buffer.
toku_off_t current_offset; // The current offset to be read (in the file). That offset includes anything that is unread in the buffer.
int fd;
size_t bufsize;
char *buf; // A buffer of size bufsize;
int bufoff; // The current offset buf.
};
BREAD create_bread_from_fd_initialize_at(int fd, off_t filesize, size_t bufsize) {
BREAD create_bread_from_fd_initialize_at(int fd, toku_off_t filesize, size_t bufsize) {
BREAD MALLOC(result);
result->current_offset=filesize;
result->fd=fd;
......@@ -31,7 +31,7 @@ ssize_t bread_backwards(BREAD br, void *vbuf, size_t nbytes) {
char *buf=vbuf;
ssize_t result=0;
while (nbytes > 0) {
assert(br->current_offset >= (off_t)nbytes);
assert(br->current_offset >= (toku_off_t)nbytes);
// read whatever we can out of the buffer.
{
size_t to_copy = ((size_t)br->bufoff >= nbytes) ? nbytes : (size_t)br->bufoff;
......
......@@ -9,7 +9,7 @@
#include <sys/types.h>
typedef struct bread *BREAD;
BREAD create_bread_from_fd_initialize_at(int fd, off_t filesize, size_t bufsize);
BREAD create_bread_from_fd_initialize_at(int fd, toku_off_t filesize, size_t bufsize);
// Effect: Given a file descriptor, fd, that points at a file of size filesize, create a BREAD.
// Requires: The filesize must be correct. The fd must be an open fd.
......
......@@ -191,7 +191,7 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h);
int toku_serialize_brt_header_to_wbuf (struct wbuf *, struct brt_header *h);
int toku_deserialize_brtheader_from (int fd, BLOCKNUM off, struct brt_header **brth);
int toku_serialize_fifo_at (int fd, off_t freeoff, FIFO fifo); // Write a fifo into a disk, without worrying about fitting it into a block. This write is done at the end of the file.
int toku_serialize_fifo_at (int fd, toku_off_t freeoff, FIFO fifo); // Write a fifo into a disk, without worrying about fitting it into a block. This write is done at the end of the file.
void toku_brtnode_free (BRTNODE *node);
......
......@@ -34,7 +34,7 @@ static void maybe_preallocate_in_file (int fd, u_int64_t size) {
const int N = umin64(size, 16<<20); // Double the size of the file, or add 16MB, whichever is less.
char *MALLOC_N(N, wbuf);
memset(wbuf, 0, N);
off_t start_write = alignup(sbuf.st_size, 4096);
toku_off_t start_write = alignup(sbuf.st_size, 4096);
assert(start_write >= sbuf.st_size);
ssize_t r = pwrite(fd, wbuf, N, start_write);
assert(r==N);
......@@ -62,7 +62,7 @@ unlock_for_pwrite (void) {
}
static ssize_t
toku_pwrite (int fd, const void *buf, size_t count, off_t offset)
toku_pwrite (int fd, const void *buf, size_t count, toku_off_t offset)
// requires that the pwrite has been locked
{
assert(pwrite_is_locked);
......@@ -84,7 +84,7 @@ static const int brtnode_header_overhead = (8+ // magic "tokunode" or "tokulea
4+ // localfingerprint
4); // crc32 at the end
static int deserialize_fifo_at (int fd, off_t at, FIFO *fifo);
static int deserialize_fifo_at (int fd, toku_off_t at, FIFO *fifo);
static int
addupsize (OMTVALUE lev, u_int32_t UU(idx), void *vp) {
......@@ -134,7 +134,7 @@ static unsigned int toku_serialize_brtnode_size_slow (BRTNODE node) {
// This is the size of the uncompressed data, including the uncompressed header, and including the 4 bytes for the information about how big is the compressed version, and how big is the uncompressed version.
unsigned int toku_serialize_brtnode_size (BRTNODE node) {
unsigned int result =brtnode_header_overhead;
assert(sizeof(off_t)==8);
assert(sizeof(toku_off_t)==8);
if (node->height>0) {
result+=4; /* subtree fingerpirnt */
result+=4; /* n_children */
......@@ -861,7 +861,7 @@ unsigned int toku_brtnode_pivot_key_len (BRTNODE node, struct kv_pair *pk) {
// For now, just do all the writes as separate system calls. This function is hardly ever called, and
// we might not be able to allocate a large enough buffer to hold everything,
// and it would be more complex to batch up several writes.
int toku_serialize_fifo_at (int fd, off_t freeoff, FIFO fifo) {
int toku_serialize_fifo_at (int fd, toku_off_t freeoff, FIFO fifo) {
//printf("%s:%d Serializing fifo at %" PRId64 " (count=%d)\n", __FILE__, __LINE__, freeoff, toku_fifo_n_entries(fifo));
lock_for_pwrite();
{
......@@ -902,7 +902,7 @@ int toku_serialize_fifo_at (int fd, off_t freeoff, FIFO fifo) {
}
static int
read_int (int fd, off_t *at, u_int32_t *result) {
read_int (int fd, toku_off_t *at, u_int32_t *result) {
int v;
ssize_t r = pread(fd, &v, 4, *at);
if (r<0) return errno;
......@@ -913,7 +913,7 @@ read_int (int fd, off_t *at, u_int32_t *result) {
}
static int
read_char (int fd, off_t *at, char *result) {
read_char (int fd, toku_off_t *at, char *result) {
ssize_t r = pread(fd, result, 1, *at);
if (r<0) return errno;
assert(r==1);
......@@ -922,7 +922,7 @@ read_char (int fd, off_t *at, char *result) {
}
static int
read_u_int64_t (int fd, off_t *at, u_int64_t *result) {
read_u_int64_t (int fd, toku_off_t *at, u_int64_t *result) {
u_int32_t v1=0,v2=0;
int r;
if ((r = read_int(fd, at, &v1))) return r;
......@@ -932,7 +932,7 @@ read_u_int64_t (int fd, off_t *at, u_int64_t *result) {
}
static int
read_nbytes (int fd, off_t *at, char **data, u_int32_t len) {
read_nbytes (int fd, toku_off_t *at, char **data, u_int32_t len) {
char *result = toku_malloc(len);
if (result==0) return errno;
ssize_t r = pread(fd, result, len, *at);
......@@ -944,7 +944,7 @@ read_nbytes (int fd, off_t *at, char **data, u_int32_t len) {
return 0;
}
static int deserialize_fifo_at (int fd, off_t at, FIFO *fifo) {
static int deserialize_fifo_at (int fd, toku_off_t at, FIFO *fifo) {
FIFO result;
int r = toku_fifo_create(&result);
if (r) return r;
......
......@@ -369,7 +369,7 @@ void toku_brtnode_flush_callback (CACHEFILE cachefile, BLOCKNUM nodename, void *
// toku_pma_verify_fingerprint(brtnode->u.l.buffer, brtnode->rand4fingerprint, brtnode->subtree_fingerprint);
// }
if (0) {
printf("%s:%d toku_brtnode_flush_callback %p thisnodename=%" PRId64 " keep_me=%u height=%d", __FILE__, __LINE__, brtnode, brtnode->thisnodename.b, keep_me, brtnode->height);
printf("%s:%d toku_brtnode_flush_callback %p thisnodename=%" PRId64 " keep_me=%u height=%d", __FILE__, __LINE__, brtnode, brtnode->thisnodename.b, (unsigned)keep_me, brtnode->height);
if (brtnode->height==0) printf(" buf=%p mempool-base=%p", brtnode->u.l.buffer, brtnode->u.l.buffer_mempool.base);
printf("\n");
}
......@@ -4136,7 +4136,7 @@ toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, bytevec lo
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xid,
{
data=data; datalen=datalen; keylen=keylen;
fprintf(file, "%*s xid=%"PRIu64" %u (type=%d)\n", depth+2, "", xid, ntohl(*(int*)key), type);
fprintf(file, "%*s xid=%"PRIu64" %u (type=%d)\n", depth+2, "", xid, (unsigned)ntohl(*(int*)key), type);
//assert(strlen((char*)key)+1==keylen);
//assert(strlen((char*)data)+1==datalen);
});
......@@ -4144,7 +4144,7 @@ toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, bytevec lo
for (i=0; i<node->u.n.n_children; i++) {
fprintf(file, "%*schild %d\n", depth, "", i);
if (i>0) {
fprintf(file, "%*spivot %d len=%u %u\n", depth+1, "", i-1, node->u.n.childkeys[i-1]->keylen, ntohl(*(int*)&node->u.n.childkeys[i-1]->key));
fprintf(file, "%*spivot %d len=%u %u\n", depth+1, "", i-1, node->u.n.childkeys[i-1]->keylen, (unsigned)ntohl(*(int*)&node->u.n.childkeys[i-1]->key));
}
toku_dump_brtnode(file, brt, BNC_BLOCKNUM(node, i), depth+4,
(i==0) ? lorange : node->u.n.childkeys[i-1]->key,
......
......@@ -22,6 +22,7 @@ typedef unsigned int ITEMLEN;
typedef const void *bytevec;
//typedef const void *bytevec;
typedef int64_t toku_off_t;
typedef int64_t DISKOFF; /* Offset in a disk. -1 is the NULL pointer. */
typedef u_int64_t TXNID;
typedef struct s_blocknum { int64_t b; } BLOCKNUM; // make a struct so that we will notice type problems.
......@@ -41,7 +42,9 @@ typedef struct __toku_lsn { u_int64_t lsn; } LSN;
/* Make the FILEID a struct for the same reason. */
typedef struct __toku_fileid { u_int32_t fileid; } FILENUM;
#ifndef TOKU_WINDOWS
typedef enum __toku_bool { FALSE=0, TRUE=1} BOOL;
#endif
typedef struct tokulogger *TOKULOGGER;
#define NULL_LOGGER ((TOKULOGGER)0)
......
......@@ -124,6 +124,7 @@ struct cachefile {
};
int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN initial_lsn, TOKULOGGER logger) {
#if defined __linux__
{
static int did_mallopt = 0;
if (!did_mallopt) {
......@@ -131,6 +132,7 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN initial_lsn,
did_mallopt = 1;
}
}
#endif
TAGMALLOC(CACHETABLE, t);
if (t == 0) return ENOMEM;
t->n_in_table = 0;
......@@ -154,7 +156,7 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN initial_lsn,
r = toku_pthread_mutex_init(&t->mutex, 0); assert(r == 0);
// set the max number of writeback threads to min(MAX_WRITER_THREADS,nprocs_online)
int nprocs = sysconf(_SC_NPROCESSORS_ONLN);
int nprocs = os_get_number_active_processors();
if (nprocs > MAX_WRITER_THREADS) nprocs = MAX_WRITER_THREADS;
r = threadpool_create(&t->threadpool, nprocs); assert(r == 0);
......@@ -565,34 +567,12 @@ static void flush_and_remove (CACHETABLE ct, PAIR p, int write_me) {
#endif
}
static unsigned long toku_maxrss=0;
unsigned long toku_get_maxrss(void) {
return toku_maxrss;
}
static unsigned long check_maxrss (void) __attribute__((__unused__));
static unsigned long check_maxrss (void) {
pid_t pid = getpid();
char fname[100];
snprintf(fname, sizeof(fname), "/proc/%d/statm", pid);
FILE *f = fopen(fname, "r");
unsigned long ignore = 0, rss = 0;
if (f) {
fscanf(f, "%lu %lu", &ignore, &rss);
fclose(f);
}
if (toku_maxrss<rss) toku_maxrss=rss;
return rss;
}
static int maybe_flush_some (CACHETABLE t, long size) {
int r = 0;
again:
if (size + t->size_current > t->size_limit + t->size_writing) {
{
unsigned long rss __attribute__((__unused__)) = check_maxrss();
//unsigned long rss __attribute__((__unused__)) = check_max_rss();
//printf("this-size=%.6fMB projected size = %.2fMB limit=%2.fMB rss=%2.fMB\n", size/(1024.0*1024.0), (size+t->size_current)/(1024.0*1024.0), t->size_limit/(1024.0*1024.0), rss/256.0);
//struct mallinfo m = mallinfo();
//printf(" arena=%d hblks=%d hblkhd=%d\n", m.arena, m.hblks, m.hblkhd);
......@@ -1035,13 +1015,13 @@ int cachetable_fsync (CACHETABLE t) {
#endif
#if 0
int cachefile_pwrite (CACHEFILE cf, const void *buf, size_t count, off_t offset) {
int cachefile_pwrite (CACHEFILE cf, const void *buf, size_t count, toku_off_t offset) {
ssize_t r = pwrite(cf->fd, buf, count, offset);
if (r==-1) return errno;
assert((size_t)r==count);
return 0;
}
int cachefile_pread (CACHEFILE cf, void *buf, size_t count, off_t offset) {
int cachefile_pread (CACHEFILE cf, void *buf, size_t count, toku_off_t offset) {
ssize_t r = pread(cf->fd, buf, count, offset);
if (r==-1) return errno;
if (r==0) return -1; /* No error for EOF ??? */
......
......@@ -136,8 +136,8 @@ int toku_cachefile_flush (CACHEFILE);
void toku_cachefile_refup (CACHEFILE cfp);
// Return on success (different from pread and pwrite)
//int cachefile_pwrite (CACHEFILE, const void *buf, size_t count, off_t offset);
//int cachefile_pread (CACHEFILE, void *buf, size_t count, off_t offset);
//int cachefile_pwrite (CACHEFILE, const void *buf, size_t count, toku_off_t offset);
//int cachefile_pread (CACHEFILE, void *buf, size_t count, toku_off_t offset);
// Get the file descriptor associated with the cachefile
// Return the file descriptor
......
......@@ -7,9 +7,15 @@
#define _FILE_OFFSET_BITS 64
// Portability first!
#include "stdint.h"
#include "portability.h"
#include "os.h"
#if !defined(TOKU_WINDOWS)
#if defined(TOKU_WINDOWS)
#include "zlib.h"
#include "toku_pthread.h"
#include <dirent.h>
#else
#include <dirent.h>
#include <inttypes.h>
#include <toku_pthread.h>
......
......@@ -100,7 +100,7 @@ struct tokutxn {
size_t rollentry_resident_bytecount; // How many bytes for the rollentries that are stored in main memory.
char *rollentry_filename;
int rollentry_fd; // If we spill the roll_entries, we write them into this fd.
off_t rollentry_filesize; // How many bytes are in the rollentry.
toku_off_t rollentry_filesize; // How many bytes are in the rollentry.
OMT open_brts; // a collection of the brts that we touched. Indexed by filenum.
};
......
......@@ -166,8 +166,8 @@ int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags);
int toku_maybe_spill_rollbacks (TOKUTXN txn);
struct roll_entry;
int toku_rollback_fileentries (int fd, off_t filesize, TOKUTXN txn);
int toku_commit_fileentries (int fd, off_t filesize, TOKUTXN txn);
int toku_rollback_fileentries (int fd, toku_off_t filesize, TOKUTXN txn);
int toku_commit_fileentries (int fd, toku_off_t filesize, TOKUTXN txn);
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item);
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item);
......
......@@ -115,7 +115,7 @@ int toku_rollback_cmddelete (TXNID xid, FILENUM filenum, BYTESTRING key,TOKUTXN
return do_insertion (BRT_ABORT_ANY, xid, filenum, key, 0, txn);
}
int toku_commit_fileentries (int fd, off_t filesize, TOKUTXN txn) {
int toku_commit_fileentries (int fd, toku_off_t filesize, TOKUTXN txn) {
BREAD f = create_bread_from_fd_initialize_at(fd, filesize, 1<<20);
int r=0;
MEMARENA ma = memarena_create();
......@@ -133,7 +133,7 @@ int toku_commit_fileentries (int fd, off_t filesize, TOKUTXN txn) {
return r;
}
int toku_rollback_fileentries (int fd, off_t filesize, TOKUTXN txn) {
int toku_rollback_fileentries (int fd, toku_off_t filesize, TOKUTXN txn) {
BREAD f = create_bread_from_fd_initialize_at(fd, filesize, 1<<20);
assert(f);
int r=0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment