// Update operation codes.  These codes get stuffed into update messages, so they can not change.
// The operations are currently stored in a single byte in the update message, so only 256 operations
// are supported.  When we need more, we can use the last (255) code to indicate that the operation code
// is expanded beyond 1 byte.
enum {
    UPDATE_OP_COL_ADD_OR_DROP = 0,

    UPDATE_OP_EXPAND_VARIABLE_OFFSETS = 1,
    UPDATE_OP_EXPAND_INT = 2,
    UPDATE_OP_EXPAND_UINT = 3,
    UPDATE_OP_EXPAND_CHAR = 4,
    UPDATE_OP_EXPAND_BINARY = 5,

    UPDATE_OP_SIMPLE_UPDATE = 10,
    UPDATE_OP_SIMPLE_UPSERT = 11,
};

// Field types used in the update messages
enum {
    UPDATE_TYPE_UNKNOWN = 0,
    UPDATE_TYPE_INT = 1,
    UPDATE_TYPE_UINT = 2,
    UPDATE_TYPE_CHAR = 3,
    UPDATE_TYPE_BINARY = 4,
    UPDATE_TYPE_VARCHAR = 5,
    UPDATE_TYPE_VARBINARY = 6,
};

#define UP_COL_ADD_OR_DROP UPDATE_OP_COL_ADD_OR_DROP

// add or drop column sub-operations
#define COL_DROP 0xaa
#define COL_ADD 0xbb

// add or drop column types
#define COL_FIXED 0xcc
#define COL_VAR 0xdd
#define COL_BLOB 0xee

#define STATIC_ROW_MUTATOR_SIZE 1+8+2+8+8+8

// how much space do I need for the mutators?
// static stuff first:
// operation 1 == UP_COL_ADD_OR_DROP
// 8 - old null, new null
// 2 - old num_offset, new num_offset
// 8 - old fixed_field size, new fixed_field_size
// 8 - old and new length of offsets
// 8 - old and new starting null bit position
// TOTAL: 27

// dynamic stuff:
// 4 - number of columns
// for each column:
// 1 - add or drop
// 1 - is nullable
// 4 - if nullable, position
// 1 - if add, whether default is null or not
// 1 - if fixed, var, or not
//  for fixed, entire default
//  for var, 4 bytes length, then entire default
//  for blob, nothing
// So, an upperbound is 4 + num_fields(12) + all default stuff

// static blob stuff:
// 4 - num blobs
// 1 byte for each num blobs in old table
// So, an upperbound is 4 + kc_info->num_blobs

// dynamic blob stuff:
// for each blob added:
// 1 - state if we are adding or dropping
// 4 - blob index
// if add, 1 len bytes
//  at most, 4 0's
// So, upperbound is num_blobs(1+4+1+4) = num_columns*10

// The expand varchar offsets message is used to expand the size of an offset from 1 to 2 bytes.
//     operation          1 == UPDATE_OP_EXPAND_VARIABLE_OFFSETS
//     n_offsets          4 number of offsets
//     offset_start       4 starting offset of the variable length field offsets 

// These expand messages are used to expand the size of a fixed length field.  
// The field type is encoded in the operation code.
//     operation          1 == UPDATE_OP_EXPAND_INT/UINT/CHAR/BINARY
//     offset             4 offset of the field
//     old length         4 the old length of the field's value
//     new length         4 the new length of the field's value

//     operation          1 == UPDATE_OP_EXPAND_CHAR/BINARY
//     offset             4 offset of the field
//     old length         4 the old length of the field's value
//     new length         4 the new length of the field's value
//     pad char           1

// Simple row descriptor:
//     fixed field offset 4 offset of the beginning of the fixed fields
//     var field offset   4 offset of the variable length offsets
//     var_offset_bytes   1 size of each variable length offset
//     num_var_fields     4 number of variable length offsets

// Field descriptor:
//     field type         4 see field types above
//     field num          4 unused for fixed length fields
//     field null num     4 bit 31 is 1 if the field is nullible and the remaining bits contain the null bit number
//     field offset       4 for fixed fields, this is the offset from begining of the row of the field
//     field length       4 for fixed fields, this is the length of the field

// Simple update operation:
//     update operation   4 == { '=', '+', '-' }
//         x = k
//         x = x + k
//         x = x - k
//     field descriptor
//     optional value:
//         value length   4 == N, length of the value
//         value          N value to add or subtract

// Simple update message:
//     Operation          1 == UPDATE_OP_UPDATE_FIELD
//     Simple row descriptor
//     Number of update ops 4 == N
//     Uupdate ops [N]

// Simple upsert message:
//     Operation          1 == UPDATE_OP_UPSERT
//     Insert row:
//         length         4 == N
//         data           N
//     Simple row descriptor
//     Number of update ops 4 == N
//     Update ops [N] 

#include "tokudb_buffer.h"
#include "tokudb_math.h"

//
// checks whether the bit at index pos in data is set or not
//
static inline bool is_overall_null_position_set(uchar* data, uint32_t pos) {
    uint32_t offset = pos/8;
    uchar remainder = pos%8; 
    uchar null_bit = 1<<remainder;
    return ((data[offset] & null_bit) != 0);
}

//
// sets the bit at index pos in data to 1 if is_null, 0 otherwise
// 
static inline void set_overall_null_position(uchar* data, uint32_t pos, bool is_null) {
    uint32_t offset = pos/8;
    uchar remainder = pos%8;
    uchar null_bit = 1<<remainder;
    if (is_null) {
        data[offset] |= null_bit;
    }
    else {
        data[offset] &= ~null_bit;
    }
}

static inline void copy_null_bits(
    uint32_t start_old_pos,
    uint32_t start_new_pos,
    uint32_t num_bits,
    uchar* old_null_bytes,
    uchar* new_null_bytes
    ) 
{
    for (uint32_t i = 0; i < num_bits; i++) {
        uint32_t curr_old_pos = i + start_old_pos;
        uint32_t curr_new_pos = i + start_new_pos;
        // copy over old null bytes
        if (is_overall_null_position_set(old_null_bytes,curr_old_pos)) {
            set_overall_null_position(new_null_bytes,curr_new_pos,true);
        }
        else {
            set_overall_null_position(new_null_bytes,curr_new_pos,false);
        }
    }
}

static inline void copy_var_fields(
    uint32_t start_old_num_var_field, //index of var fields that we should start writing
    uint32_t num_var_fields, // number of var fields to copy
    uchar* old_var_field_offset_ptr, //static ptr to where offset bytes begin in old row
    uchar old_num_offset_bytes, //number of offset bytes used in old row
    uchar* start_new_var_field_data_ptr, // where the new var data should be written
    uchar* start_new_var_field_offset_ptr, // where the new var offsets should be written
    uchar* new_var_field_data_ptr, // pointer to beginning of var fields in new row
    uchar* old_var_field_data_ptr, // pointer to beginning of var fields in old row
    uint32_t new_num_offset_bytes, // number of offset bytes used in new row
    uint32_t* num_data_bytes_written,
    uint32_t* num_offset_bytes_written
    ) 
{
    uchar* curr_new_var_field_data_ptr = start_new_var_field_data_ptr;
    uchar* curr_new_var_field_offset_ptr = start_new_var_field_offset_ptr;
    for (uint32_t i = 0; i < num_var_fields; i++) {
        uint32_t field_len;
        uint32_t start_read_offset;
        uint32_t curr_old = i + start_old_num_var_field;
        uchar* data_to_copy = NULL;
        // get the length and pointer to data that needs to be copied
        get_var_field_info(
            &field_len, 
            &start_read_offset, 
            curr_old, 
            old_var_field_offset_ptr, 
            old_num_offset_bytes
            );
        data_to_copy = old_var_field_data_ptr + start_read_offset;
        // now need to copy field_len bytes starting from data_to_copy
        curr_new_var_field_data_ptr = write_var_field(
            curr_new_var_field_offset_ptr,
            curr_new_var_field_data_ptr,
            new_var_field_data_ptr,
            data_to_copy,
            field_len,
            new_num_offset_bytes
            );
        curr_new_var_field_offset_ptr += new_num_offset_bytes;
    }
    *num_data_bytes_written = (uint32_t)(curr_new_var_field_data_ptr - start_new_var_field_data_ptr);
    *num_offset_bytes_written = (uint32_t)(curr_new_var_field_offset_ptr - start_new_var_field_offset_ptr);
}

static inline uint32_t copy_toku_blob(uchar* to_ptr, uchar* from_ptr, uint32_t len_bytes, bool skip) {
    uint32_t length = 0;
    if (!skip) {
        memcpy(to_ptr, from_ptr, len_bytes);
    }
    length = get_blob_field_len(from_ptr,len_bytes);
    if (!skip) {
        memcpy(to_ptr + len_bytes, from_ptr + len_bytes, length);
    }
    return (length + len_bytes);
}

static int tokudb_hcad_update_fun(
    DB* db,
    const DBT *key,
    const DBT *old_val, 
    const DBT *extra,
    void (*set_val)(const DBT *new_val, void *set_extra),
    void *set_extra
    ) 
{
    uint32_t max_num_bytes;
    uint32_t num_columns;
    DBT new_val;
    uint32_t num_bytes_left;
    uint32_t num_var_fields_to_copy;
    uint32_t num_data_bytes_written = 0;
    uint32_t num_offset_bytes_written = 0;
    int error;
    memset(&new_val, 0, sizeof(DBT));
    uchar operation;
    uchar* new_val_data = NULL;
    uchar* extra_pos = NULL;
    uchar* extra_pos_start = NULL;
    //
    // info for pointers into rows
    //
    uint32_t old_num_null_bytes;
    uint32_t new_num_null_bytes;
    uchar old_num_offset_bytes;
    uchar new_num_offset_bytes;
    uint32_t old_fixed_field_size;
    uint32_t new_fixed_field_size;
    uint32_t old_len_of_offsets;
    uint32_t new_len_of_offsets;

    uchar* old_fixed_field_ptr = NULL;
    uchar* new_fixed_field_ptr = NULL;
    uint32_t curr_old_fixed_offset;
    uint32_t curr_new_fixed_offset;

    uchar* old_null_bytes = NULL;
    uchar* new_null_bytes = NULL;
    uint32_t curr_old_null_pos;
    uint32_t curr_new_null_pos;    
    uint32_t old_null_bits_left;
    uint32_t new_null_bits_left;
    uint32_t overall_null_bits_left;

    uint32_t old_num_var_fields;
    // uint32_t new_num_var_fields;
    uint32_t curr_old_num_var_field;
    uint32_t curr_new_num_var_field;
    uchar* old_var_field_offset_ptr = NULL;
    uchar* new_var_field_offset_ptr = NULL;
    uchar* curr_new_var_field_offset_ptr = NULL;
    uchar* old_var_field_data_ptr = NULL;
    uchar* new_var_field_data_ptr = NULL;
    uchar* curr_new_var_field_data_ptr = NULL;

    uint32_t start_blob_offset;
    uchar* start_blob_ptr;
    uint32_t num_blob_bytes;

    // came across a delete, nothing to update
    if (old_val == NULL) {
        error = 0;
        goto cleanup;
    }

    extra_pos_start = (uchar *)extra->data;
    extra_pos = (uchar *)extra->data;

    operation = extra_pos[0];
    extra_pos++;
    assert(operation == UP_COL_ADD_OR_DROP);

    memcpy(&old_num_null_bytes, extra_pos, sizeof(uint32_t));
    extra_pos += sizeof(uint32_t);
    memcpy(&new_num_null_bytes, extra_pos, sizeof(uint32_t));
    extra_pos += sizeof(uint32_t);

    old_num_offset_bytes = extra_pos[0];
    extra_pos++;
    new_num_offset_bytes = extra_pos[0];
    extra_pos++;

    memcpy(&old_fixed_field_size, extra_pos, sizeof(uint32_t));
    extra_pos += sizeof(uint32_t);
    memcpy(&new_fixed_field_size, extra_pos, sizeof(uint32_t));
    extra_pos += sizeof(uint32_t);

    memcpy(&old_len_of_offsets, extra_pos, sizeof(uint32_t));
    extra_pos += sizeof(uint32_t);
    memcpy(&new_len_of_offsets, extra_pos, sizeof(uint32_t));
    extra_pos += sizeof(uint32_t);

    max_num_bytes = old_val->size + extra->size + new_len_of_offsets + new_fixed_field_size;
    new_val_data = (uchar *)my_malloc(
        max_num_bytes, 
        MYF(MY_FAE)
        );
    if (new_val_data == NULL) { goto cleanup; }

    old_fixed_field_ptr = (uchar *) old_val->data;
    old_fixed_field_ptr += old_num_null_bytes;
    new_fixed_field_ptr = new_val_data + new_num_null_bytes;
    curr_old_fixed_offset = 0;
    curr_new_fixed_offset = 0;

    old_num_var_fields = old_len_of_offsets/old_num_offset_bytes;
    // new_num_var_fields = new_len_of_offsets/new_num_offset_bytes;
    // following fields will change as we write the variable data
    old_var_field_offset_ptr = old_fixed_field_ptr + old_fixed_field_size;
    new_var_field_offset_ptr = new_fixed_field_ptr + new_fixed_field_size;
    old_var_field_data_ptr = old_var_field_offset_ptr + old_len_of_offsets;
    new_var_field_data_ptr = new_var_field_offset_ptr + new_len_of_offsets;
    curr_new_var_field_offset_ptr = new_var_field_offset_ptr;
    curr_new_var_field_data_ptr = new_var_field_data_ptr;
    curr_old_num_var_field = 0;
    curr_new_num_var_field = 0;

    old_null_bytes = (uchar *)old_val->data;
    new_null_bytes = new_val_data;

    
    memcpy(&curr_old_null_pos, extra_pos, sizeof(uint32_t));
    extra_pos += sizeof(uint32_t);
    memcpy(&curr_new_null_pos, extra_pos, sizeof(uint32_t));
    extra_pos += sizeof(uint32_t);

    memcpy(&num_columns, extra_pos, sizeof(num_columns));
    extra_pos += sizeof(num_columns);
    
    //
    // now go through and apply the change into new_val_data
    //
    for (uint32_t i = 0; i < num_columns; i++) {
        uchar op_type = extra_pos[0];
        bool is_null_default = false;
        extra_pos++;

        assert(op_type == COL_DROP || op_type == COL_ADD);
        bool nullable = (extra_pos[0] != 0);
        extra_pos++;
        if (nullable) {
            uint32_t null_bit_position;
            memcpy(&null_bit_position, extra_pos, sizeof(uint32_t));
            extra_pos += sizeof(uint32_t);
            uint32_t num_bits;
            if (op_type == COL_DROP) {
                assert(curr_old_null_pos <= null_bit_position);
                num_bits = null_bit_position - curr_old_null_pos;
            }
            else {
                assert(curr_new_null_pos <= null_bit_position);
                num_bits = null_bit_position - curr_new_null_pos;
            }
            copy_null_bits(
                curr_old_null_pos,
                curr_new_null_pos,
                num_bits,
                old_null_bytes,
                new_null_bytes
                );
            // update the positions
            curr_new_null_pos += num_bits;
            curr_old_null_pos += num_bits;
            if (op_type == COL_DROP) {
                curr_old_null_pos++; // account for dropped column
            }
            else {
                is_null_default = (extra_pos[0] != 0);
                extra_pos++;
                set_overall_null_position(
                    new_null_bytes,
                    null_bit_position,
                    is_null_default
                    );
                curr_new_null_pos++; //account for added column
            }
        }
        uchar col_type = extra_pos[0];
        extra_pos++;
        if (col_type == COL_FIXED) {
            uint32_t col_offset;
            uint32_t col_size;
            uint32_t num_bytes_to_copy;
            memcpy(&col_offset, extra_pos, sizeof(uint32_t));
            extra_pos += sizeof(uint32_t);
            memcpy(&col_size, extra_pos, sizeof(uint32_t));
            extra_pos += sizeof(uint32_t);

            if (op_type == COL_DROP) {
                num_bytes_to_copy = col_offset - curr_old_fixed_offset;
            }
            else {
                num_bytes_to_copy = col_offset - curr_new_fixed_offset;
            }
            memcpy(
                new_fixed_field_ptr + curr_new_fixed_offset,
                old_fixed_field_ptr + curr_old_fixed_offset, 
                num_bytes_to_copy
                );
            curr_old_fixed_offset += num_bytes_to_copy;
            curr_new_fixed_offset += num_bytes_to_copy;
            if (op_type == COL_DROP) {
                // move old_fixed_offset val to skip OVER column that is being dropped
                curr_old_fixed_offset += col_size;
            }
            else {
                if (is_null_default) {
                    // copy zeroes
                    memset(new_fixed_field_ptr + curr_new_fixed_offset, 0, col_size);
                }
                else {
                    // copy data from extra_pos into new row
                    memcpy(
                        new_fixed_field_ptr + curr_new_fixed_offset,
                        extra_pos,
                        col_size
                        );
                    extra_pos += col_size;
                }
                curr_new_fixed_offset += col_size;
            }
            
        }
        else if (col_type == COL_VAR) {
            uint32_t var_col_index;
            memcpy(&var_col_index, extra_pos, sizeof(uint32_t));
            extra_pos += sizeof(uint32_t);
            if (op_type == COL_DROP) {
                num_var_fields_to_copy = var_col_index - curr_old_num_var_field;
            }
            else {
                num_var_fields_to_copy = var_col_index - curr_new_num_var_field;
            }
            copy_var_fields(
                curr_old_num_var_field,
                num_var_fields_to_copy,
                old_var_field_offset_ptr,
                old_num_offset_bytes,
                curr_new_var_field_data_ptr,
                curr_new_var_field_offset_ptr,
                new_var_field_data_ptr, // pointer to beginning of var fields in new row
                old_var_field_data_ptr, // pointer to beginning of var fields in old row
                new_num_offset_bytes, // number of offset bytes used in new row
                &num_data_bytes_written,
                &num_offset_bytes_written
                );
            curr_new_var_field_data_ptr += num_data_bytes_written;
            curr_new_var_field_offset_ptr += num_offset_bytes_written;
            curr_new_num_var_field += num_var_fields_to_copy;
            curr_old_num_var_field += num_var_fields_to_copy;
            if (op_type == COL_DROP) {
                curr_old_num_var_field++; // skip over dropped field
            }
            else {
                if (is_null_default) {
                    curr_new_var_field_data_ptr = write_var_field(
                        curr_new_var_field_offset_ptr,
                        curr_new_var_field_data_ptr,
                        new_var_field_data_ptr,
                        NULL, //copying no data
                        0, //copying 0 bytes
                        new_num_offset_bytes
                        );
                    curr_new_var_field_offset_ptr += new_num_offset_bytes;
                }
                else {
                    uint32_t data_length;
                    memcpy(&data_length, extra_pos, sizeof(data_length));
                    extra_pos += sizeof(data_length);
                    curr_new_var_field_data_ptr = write_var_field(
                        curr_new_var_field_offset_ptr,
                        curr_new_var_field_data_ptr,
                        new_var_field_data_ptr,
                        extra_pos, //copying data from mutator
                        data_length, //copying data_length bytes
                        new_num_offset_bytes
                        );
                    extra_pos += data_length;
                    curr_new_var_field_offset_ptr += new_num_offset_bytes;
                }
                curr_new_num_var_field++; //account for added column
            }
        }
        else if (col_type == COL_BLOB) {
            // handle blob data later
            continue;
        }
        else {
            assert(false);
        }
    }
    // finish copying the null stuff
    old_null_bits_left = 8*old_num_null_bytes - curr_old_null_pos;
    new_null_bits_left = 8*new_num_null_bytes - curr_new_null_pos;
    overall_null_bits_left = old_null_bits_left;
    set_if_smaller(overall_null_bits_left, new_null_bits_left);
    copy_null_bits(
        curr_old_null_pos,
        curr_new_null_pos,
        overall_null_bits_left,
        old_null_bytes,
        new_null_bytes
        );
    // finish copying fixed field stuff
    num_bytes_left = old_fixed_field_size - curr_old_fixed_offset;
    memcpy(
        new_fixed_field_ptr + curr_new_fixed_offset,
        old_fixed_field_ptr + curr_old_fixed_offset, 
        num_bytes_left
        );
    curr_old_fixed_offset += num_bytes_left;
    curr_new_fixed_offset += num_bytes_left;
    // sanity check
    assert(curr_new_fixed_offset == new_fixed_field_size);

    // finish copying var field stuff
    num_var_fields_to_copy = old_num_var_fields - curr_old_num_var_field;
    copy_var_fields(
        curr_old_num_var_field,
        num_var_fields_to_copy,
        old_var_field_offset_ptr,
        old_num_offset_bytes,
        curr_new_var_field_data_ptr,
        curr_new_var_field_offset_ptr,
        new_var_field_data_ptr, // pointer to beginning of var fields in new row
        old_var_field_data_ptr, // pointer to beginning of var fields in old row
        new_num_offset_bytes, // number of offset bytes used in new row
        &num_data_bytes_written,
        &num_offset_bytes_written
        );
    curr_new_var_field_offset_ptr += num_offset_bytes_written;
    curr_new_var_field_data_ptr += num_data_bytes_written;
    // sanity check
    assert(curr_new_var_field_offset_ptr == new_var_field_data_ptr);

    // start handling blobs
    get_blob_field_info(
        &start_blob_offset, 
        old_len_of_offsets,
        old_var_field_data_ptr,
        old_num_offset_bytes
        );
    start_blob_ptr = old_var_field_data_ptr + start_blob_offset;
    // if nothing else in extra, then there are no blobs to add or drop, so can copy blobs straight
    if ((extra_pos - extra_pos_start) == extra->size) {
        num_blob_bytes = old_val->size - (start_blob_ptr - old_null_bytes);
        memcpy(curr_new_var_field_data_ptr, start_blob_ptr, num_blob_bytes);
        curr_new_var_field_data_ptr += num_blob_bytes;
    }
    // else, there is blob information to process
    else {
        uchar* len_bytes = NULL;
        uint32_t curr_old_blob = 0;
        uint32_t curr_new_blob = 0;
        uint32_t num_old_blobs = 0;
        uchar* curr_old_blob_ptr = start_blob_ptr;
        memcpy(&num_old_blobs, extra_pos, sizeof(num_old_blobs));
        extra_pos += sizeof(num_old_blobs);
        len_bytes = extra_pos;
        extra_pos += num_old_blobs;
        // copy over blob fields one by one
        while ((extra_pos - extra_pos_start) < extra->size) {
            uchar op_type = extra_pos[0];
            extra_pos++;
            uint32_t num_blobs_to_copy = 0;
            uint32_t blob_index;
            memcpy(&blob_index, extra_pos, sizeof(blob_index));
            extra_pos += sizeof(blob_index);
            assert (op_type == COL_DROP || op_type == COL_ADD);
            if (op_type == COL_DROP) {
                num_blobs_to_copy = blob_index - curr_old_blob;
            }
            else {
                num_blobs_to_copy = blob_index - curr_new_blob;
            }
            for (uint32_t i = 0; i < num_blobs_to_copy; i++) {
                uint32_t num_bytes_written = copy_toku_blob(
                    curr_new_var_field_data_ptr,
                    curr_old_blob_ptr,
                    len_bytes[curr_old_blob + i],
                    false
                    );
                curr_old_blob_ptr += num_bytes_written;
                curr_new_var_field_data_ptr += num_bytes_written;
            }
            curr_old_blob += num_blobs_to_copy;
            curr_new_blob += num_blobs_to_copy;
            if (op_type == COL_DROP) {
                // skip over blob in row
                uint32_t num_bytes = copy_toku_blob(
                    NULL,
                    curr_old_blob_ptr,
                    len_bytes[curr_old_blob],
                    true
                    );
                curr_old_blob++;
                curr_old_blob_ptr += num_bytes;
            }
            else {
                // copy new data
                uint32_t new_len_bytes = extra_pos[0];
                extra_pos++;
                uint32_t num_bytes = copy_toku_blob(
                    curr_new_var_field_data_ptr,
                    extra_pos,
                    new_len_bytes,
                    false
                    );
                curr_new_blob++;
                curr_new_var_field_data_ptr += num_bytes;
                extra_pos += num_bytes;
            }                
        }
        num_blob_bytes = old_val->size - (curr_old_blob_ptr - old_null_bytes);
        memcpy(curr_new_var_field_data_ptr, curr_old_blob_ptr, num_blob_bytes);
        curr_new_var_field_data_ptr += num_blob_bytes;
    }
    new_val.data = new_val_data;
    new_val.size = curr_new_var_field_data_ptr - new_val_data;
    set_val(&new_val, set_extra);
    
    error = 0;
cleanup:
    my_free(new_val_data, MYF(MY_ALLOW_ZERO_PTR));
    return error;    
}

// Expand the variable offset array in the old row given the update mesage in the extra.
static int tokudb_expand_variable_offsets(
    DB* db,
    const DBT *key,
    const DBT *old_val, 
    const DBT *extra,
    void (*set_val)(const DBT *new_val, void *set_extra),
    void *set_extra
    ) 
{
    int error = 0;
    tokudb::buffer extra_val(extra->data, 0, extra->size);

    // decode the operation
    uchar operation;
    extra_val.consume(&operation, sizeof operation);
    assert(operation == UPDATE_OP_EXPAND_VARIABLE_OFFSETS);

    // decode number of offsets
    uint32_t number_of_offsets;
    extra_val.consume(&number_of_offsets, sizeof number_of_offsets);

    // decode the offset start
    uint32_t offset_start;
    extra_val.consume(&offset_start, sizeof offset_start);

    assert(extra_val.size() == extra_val.limit());

    DBT new_val; memset(&new_val, 0, sizeof new_val);

    if (old_val != NULL) {
        assert(offset_start + number_of_offsets < old_val->size);
    
        // compute the new val from the old val
        uchar *old_val_ptr = (uchar *)old_val->data;

        // allocate space for the new val's data
        uchar *new_val_ptr = (uchar *)my_malloc(number_of_offsets + old_val->size, MYF(MY_FAE));
        if (!new_val_ptr) {
            error = ENOMEM;
            goto cleanup;
        }
        new_val.data = new_val_ptr;
        
        // copy up to the start of the varchar offset
        memcpy(new_val_ptr, old_val_ptr, offset_start);
        new_val_ptr += offset_start;
        old_val_ptr += offset_start;
        
        // expand each offset from 1 to 2 bytes
        for (uint32_t i = 0; i < number_of_offsets; i++) {
            uint16_t new_offset = *old_val_ptr;
            int2store(new_val_ptr, new_offset);
            new_val_ptr += 2;
            old_val_ptr += 1;
        }
        
        // copy the rest of the row
        size_t n = old_val->size - (old_val_ptr - (uchar *)old_val->data);
        memcpy(new_val_ptr, old_val_ptr, n);
        new_val_ptr += n;
        old_val_ptr += n;
        new_val.size = new_val_ptr - (uchar *)new_val.data;

        assert(new_val_ptr == (uchar *)new_val.data + new_val.size);
        assert(old_val_ptr == (uchar *)old_val->data + old_val->size);

        // set the new val
        set_val(&new_val, set_extra);
    }

    error = 0;

cleanup:
    my_free(new_val.data, MYF(MY_ALLOW_ZERO_PTR));        

    return error;
}

// Expand an int field in a old row given the expand message in the extra.
static int tokudb_expand_int_field(
    DB* db,
    const DBT *key,
    const DBT *old_val, 
    const DBT *extra,
    void (*set_val)(const DBT *new_val, void *set_extra),
    void *set_extra
    ) 
{
    int error = 0;
    tokudb::buffer extra_val(extra->data, 0, extra->size);

    uchar operation;
    extra_val.consume(&operation, sizeof operation);
    assert(operation == UPDATE_OP_EXPAND_INT || operation == UPDATE_OP_EXPAND_UINT);
    uint32_t the_offset;
    extra_val.consume(&the_offset, sizeof the_offset);
    uint32_t old_length;
    extra_val.consume(&old_length, sizeof old_length);
    uint32_t new_length;
    extra_val.consume(&new_length, sizeof new_length);
    assert(extra_val.size() == extra_val.limit());

    assert(new_length >= old_length); // expand only

    DBT new_val; memset(&new_val, 0, sizeof new_val);

    if (old_val != NULL) {
        assert(the_offset + old_length <= old_val->size); // old field within the old val

        // compute the new val from the old val
        uchar *old_val_ptr = (uchar *)old_val->data;

        // allocate space for the new val's data
        uchar *new_val_ptr = (uchar *)my_malloc(old_val->size + (new_length - old_length), MYF(MY_FAE));
        if (!new_val_ptr) {
            error = ENOMEM;
            goto cleanup;
        }
        new_val.data = new_val_ptr;
        
        // copy up to the old offset
        memcpy(new_val_ptr, old_val_ptr, the_offset);
        new_val_ptr += the_offset;
        old_val_ptr += the_offset;
        
        switch (operation) {
        case UPDATE_OP_EXPAND_INT:
            // fill the entire new value with ones or zeros depending on the sign bit
            // the encoding is little endian
            memset(new_val_ptr, (old_val_ptr[old_length-1] & 0x80) ? 0xff : 0x00, new_length);
            memcpy(new_val_ptr, old_val_ptr, old_length);  // overlay the low bytes of the new value with the old value
            new_val_ptr += new_length;
            old_val_ptr += old_length;
            break;
        case UPDATE_OP_EXPAND_UINT:
            memset(new_val_ptr, 0, new_length);            // fill the entire new value with zeros
            memcpy(new_val_ptr, old_val_ptr, old_length);  // overlay the low bytes of the new value with the old value
            new_val_ptr += new_length;
            old_val_ptr += old_length;
            break;
        default:
            assert(0);
        }
        
        // copy the rest
        size_t n = old_val->size - (old_val_ptr - (uchar *)old_val->data);
        memcpy(new_val_ptr, old_val_ptr, n);
        new_val_ptr += n;
        old_val_ptr += n;
        new_val.size = new_val_ptr - (uchar *)new_val.data;

        assert(new_val_ptr == (uchar *)new_val.data + new_val.size);
        assert(old_val_ptr == (uchar *)old_val->data + old_val->size);

        // set the new val
        set_val(&new_val, set_extra);
    }

    error = 0;

cleanup:
    my_free(new_val.data, MYF(MY_ALLOW_ZERO_PTR));        

    return error;
}

// Expand a char field in a old row given the expand message in the extra.
static int tokudb_expand_char_field(
    DB* db,
    const DBT *key,
    const DBT *old_val, 
    const DBT *extra,
    void (*set_val)(const DBT *new_val, void *set_extra),
    void *set_extra
    ) 
{
    int error = 0;
    tokudb::buffer extra_val(extra->data, 0, extra->size);

    uchar operation;
    extra_val.consume(&operation, sizeof operation);
    assert(operation == UPDATE_OP_EXPAND_CHAR || operation == UPDATE_OP_EXPAND_BINARY);
    uint32_t the_offset;
    extra_val.consume(&the_offset, sizeof the_offset);
    uint32_t old_length;
    extra_val.consume(&old_length, sizeof old_length);
    uint32_t new_length;
    extra_val.consume(&new_length, sizeof new_length);
    uchar pad_char;
    extra_val.consume(&pad_char, sizeof pad_char);
    assert(extra_val.size() == extra_val.limit());

    assert(new_length >= old_length); // expand only

    DBT new_val; memset(&new_val, 0, sizeof new_val);

    if (old_val != NULL) {
        assert(the_offset + old_length <= old_val->size); // old field within the old val

        // compute the new val from the old val
        uchar *old_val_ptr = (uchar *)old_val->data;

        // allocate space for the new val's data
        uchar *new_val_ptr = (uchar *)my_malloc(old_val->size + (new_length - old_length), MYF(MY_FAE));
        if (!new_val_ptr) {
            error = ENOMEM;
            goto cleanup;
        }
        new_val.data = new_val_ptr;
        
        // copy up to the old offset
        memcpy(new_val_ptr, old_val_ptr, the_offset);
        new_val_ptr += the_offset;
        old_val_ptr += the_offset;
        
        switch (operation) {
        case UPDATE_OP_EXPAND_CHAR:
        case UPDATE_OP_EXPAND_BINARY:
            memset(new_val_ptr, pad_char, new_length);     // fill the entire new value with the pad char
            memcpy(new_val_ptr, old_val_ptr, old_length);  // overlay the low bytes of the new value with the old value
            new_val_ptr += new_length;
            old_val_ptr += old_length;
            break;
        default:
            assert(0);
        }
        
        // copy the rest
        size_t n = old_val->size - (old_val_ptr - (uchar *)old_val->data);
        memcpy(new_val_ptr, old_val_ptr, n);
        new_val_ptr += n;
        old_val_ptr += n;
        new_val.size = new_val_ptr - (uchar *)new_val.data;

        assert(new_val_ptr == (uchar *)new_val.data + new_val.size);
        assert(old_val_ptr == (uchar *)old_val->data + old_val->size);

        // set the new val
        set_val(&new_val, set_extra);
    }

    error = 0;

cleanup:
    my_free(new_val.data, MYF(MY_ALLOW_ZERO_PTR));        

    return error;
}

class Simple_row_descriptor {
public:
    Simple_row_descriptor() : m_fixed_field_offset(0), m_var_field_offset(0), m_var_offset_bytes(0), m_num_var_fields(0) {
    }
    ~Simple_row_descriptor() {
    }
    void consume(tokudb::buffer &b) {
        b.consume(&m_fixed_field_offset, sizeof m_fixed_field_offset);
        b.consume(&m_var_field_offset, sizeof m_var_field_offset);
        b.consume(&m_var_offset_bytes, sizeof m_var_offset_bytes);
        b.consume(&m_num_var_fields, sizeof m_num_var_fields);
    }
    void append(tokudb::buffer &b) {
        b.append(&m_fixed_field_offset, sizeof m_fixed_field_offset);
        b.append(&m_var_field_offset, sizeof m_var_field_offset);
        b.append(&m_var_offset_bytes, sizeof m_var_offset_bytes);
        b.append(&m_num_var_fields, sizeof m_num_var_fields);
    }
public:
    uint32_t m_fixed_field_offset;
    uint32_t m_var_field_offset;
    uint8_t  m_var_offset_bytes;
    uint32_t m_num_var_fields;
};

// Update a fixed field: new_val@offset = extra_val
static void set_fixed_field(uint32_t the_offset, uint32_t length, uint32_t field_num, uint32_t field_null_num,
                            tokudb::buffer &new_val, void *extra_val) {
    assert(the_offset + length <= new_val.size());
    new_val.replace(the_offset, length, extra_val, length);
    if (field_null_num)
        set_overall_null_position((uchar *) new_val.data(), field_null_num & ~(1<<31), false);
}

// Update an int field: signed newval@offset = old_val@offset OP extra_val
static void int_op(uint32_t operation, uint32_t the_offset, uint32_t length, uint32_t field_num, uint32_t field_null_num,
                   tokudb::buffer &new_val, tokudb::buffer &old_val, void *extra_val) {
    assert(the_offset + length <= new_val.size());
    assert(the_offset + length <= old_val.size());
    assert(length == 1 || length == 2 || length == 3 || length == 4 || length == 8);

    uchar *old_val_ptr = (uchar *) old_val.data();
    bool field_is_null = false;
    if (field_null_num)
        field_is_null = is_overall_null_position_set(old_val_ptr, field_null_num & ~(1<<31));
    int64_t v = 0;
    memcpy(&v, old_val_ptr + the_offset, length);
    v = tokudb::int_sign_extend(v, 8*length);
    int64_t extra_v = 0;
    memcpy(&extra_v, extra_val, length);
    extra_v = tokudb::int_sign_extend(extra_v, 8*length);
    switch (operation) {
    case '+':
        if (!field_is_null) {
            bool over;
            v = tokudb::int_add(v, extra_v, 8*length, &over);
            if (over) {
                if (extra_v > 0)
                    v = tokudb::int_high_endpoint(8*length);
                else
                    v = tokudb::int_low_endpoint(8*length);
            }
            new_val.replace(the_offset, length, &v, length);
        }
        break;
    case '-':
        if (!field_is_null) {
            bool over;
            v = tokudb::int_sub(v, extra_v, 8*length, &over);
            if (over) {
                if (extra_v > 0)
                    v = tokudb::int_low_endpoint(8*length);
                else
                    v = tokudb::int_high_endpoint(8*length);
            }
            new_val.replace(the_offset, length, &v, length);
        }
        break;
    default:
        assert(0);
    }
}

// Update an unsigned field: unsigned newval@offset = old_val@offset OP extra_val
static void uint_op(uint32_t operation, uint32_t the_offset, uint32_t length, uint32_t field_num, uint32_t field_null_num,
                    tokudb::buffer &new_val, tokudb::buffer &old_val, void *extra_val) {
    assert(the_offset + length <= new_val.size());
    assert(the_offset + length <= old_val.size());
    assert(length == 1 || length == 2 || length == 3 || length == 4 || length == 8);

    uchar *old_val_ptr = (uchar *) old_val.data();
    bool field_is_null = false;
    if (field_null_num)
        field_is_null = is_overall_null_position_set(old_val_ptr, field_null_num & ~(1<<31));
    uint64_t v = 0;
    memcpy(&v, old_val_ptr + the_offset, length);
    uint64_t extra_v = 0;
    memcpy(&extra_v, extra_val, length);
    switch (operation) {
    case '+':
        if (!field_is_null) {
            bool over;
            v = tokudb::uint_add(v, extra_v, 8*length, &over);
            if (over) {
                v = tokudb::uint_high_endpoint(8*length);
            }
            new_val.replace(the_offset, length, &v, length);
        }
        break;
    case '-':
        if (!field_is_null) {
            bool over;
            v = tokudb::uint_sub(v, extra_v, 8*length, &over);
            if (over) {
                v = tokudb::uint_low_endpoint(8*length);
            }
            new_val.replace(the_offset, length, &v, length);
        }
        break;
    default:
        assert(0);
    }
}

// Decode and apply a sequence of update operations defined in the extra to the old value and put the result
// in the new value.
static void apply_updates(tokudb::buffer &new_val, tokudb::buffer &old_val, tokudb::buffer &extra_val, const Simple_row_descriptor &sd) {
    uint32_t num_updates;
    extra_val.consume(&num_updates, sizeof num_updates);
    for ( ; num_updates > 0; num_updates--) {
        // get the update operation
        uint32_t update_operation;
        extra_val.consume(&update_operation, sizeof update_operation);
        uint32_t field_type;
        extra_val.consume(&field_type, sizeof field_type);
        uint32_t field_num;
        extra_val.consume(&field_num, sizeof field_num);
        uint32_t field_null_num;
        extra_val.consume(&field_null_num, sizeof field_null_num);
        uint32_t the_offset;
        extra_val.consume(&the_offset, sizeof the_offset);
        uint32_t length;
        extra_val.consume(&length, sizeof length);
        void *extra_val_ptr = extra_val.consume_ptr(length);

        // apply the update
        switch (field_type) {
        case UPDATE_TYPE_INT:
            if (update_operation == '=')
                set_fixed_field(the_offset, length, field_num, field_null_num, new_val, extra_val_ptr);
            else
                int_op(update_operation, the_offset, length, field_num, field_null_num, new_val, old_val, extra_val_ptr);
            break;
        case UPDATE_TYPE_UINT:
            if (update_operation == '=')
                set_fixed_field(the_offset, length, field_num, field_null_num, new_val, extra_val_ptr);
            else
                uint_op(update_operation, the_offset, length, field_num, field_null_num, new_val, old_val, extra_val_ptr);
            break;
        case UPDATE_TYPE_CHAR:
        case UPDATE_TYPE_BINARY:
            if (update_operation == '=')
                set_fixed_field(the_offset, length, field_num, field_null_num, new_val, extra_val_ptr);
            else 
                assert(0);
            break;
        default:
            assert(0);
            break;
        }
    }
    assert(extra_val.size() == extra_val.limit());
}

// Simple update handler. Decode the update message, apply the update operations to the old value, and set
// the new value.
static int tokudb_simple_update_fun(
    DB* db,
    const DBT *key_dbt,
    const DBT *old_val_dbt,
    const DBT *extra,
    void (*set_val)(const DBT *new_val_dbt, void *set_extra),
    void *set_extra
    )
{
    tokudb::buffer extra_val(extra->data, 0, extra->size);
    
    uchar operation;
    extra_val.consume(&operation, sizeof operation);
    assert(operation == UPDATE_OP_SIMPLE_UPDATE);

    if (old_val_dbt != NULL) {
        // get the simple descriptor
        Simple_row_descriptor sd;
        sd.consume(extra_val);

        tokudb::buffer old_val(old_val_dbt->data, old_val_dbt->size, old_val_dbt->size);

        // new val = old val
        tokudb::buffer new_val;
        new_val.append(old_val_dbt->data, old_val_dbt->size);

        // apply updates to new val
        apply_updates(new_val, old_val, extra_val, sd);

        // set the new val
        DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
        new_val_dbt.data = new_val.data();
        new_val_dbt.size = new_val.size();
        set_val(&new_val_dbt, set_extra);
    }

    return 0;
}

// Simple upsert handler. Decode the upsert message. If the key does not exist, then insert a new value from the extra.
// Otherwise, apply the update operations to the old value, and then set the new value.
static int tokudb_simple_upsert_fun(
    DB* db,
    const DBT *key_dbt,
    const DBT *old_val_dbt, 
    const DBT *extra,
    void (*set_val)(const DBT *new_val_dbt, void *set_extra),
    void *set_extra
    )
{
    tokudb::buffer extra_val(extra->data, 0, extra->size);

    uchar operation;
    extra_val.consume(&operation, sizeof operation);
    assert(operation == UPDATE_OP_SIMPLE_UPSERT);

    uint32_t insert_length;
    extra_val.consume(&insert_length, sizeof insert_length);
    void *insert_row = extra_val.consume_ptr(insert_length);

    if (old_val_dbt == NULL) {
        // insert a new row
        DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
        new_val_dbt.size = insert_length;
        new_val_dbt.data = insert_row;
        set_val(&new_val_dbt, set_extra);
    } else {
        // decode the simple descriptor
        Simple_row_descriptor sd;
        sd.consume(extra_val);

        tokudb::buffer old_val(old_val_dbt->data, old_val_dbt->size, old_val_dbt->size);

        // new val = old val
        tokudb::buffer new_val;
        new_val.append(old_val_dbt->data, old_val_dbt->size);

        // apply updates to new val
        apply_updates(new_val, old_val, extra_val, sd);

        // set the new val
        DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
        new_val_dbt.data = new_val.data();
        new_val_dbt.size = new_val.size();
        set_val(&new_val_dbt, set_extra);
    }

    return 0;
}

// This function is the update callback function that is registered with the YDB environment.
// It uses the first byte in the update message to identify the update message type and call
// the handler for that message.
int tokudb_update_fun(
    DB* db,
    const DBT *key,
    const DBT *old_val, 
    const DBT *extra,
    void (*set_val)(const DBT *new_val, void *set_extra),
    void *set_extra
    ) 
{
    uchar *extra_pos = (uchar *)extra->data;
    uchar operation = extra_pos[0];
    int error;
    switch (operation) {
    case UPDATE_OP_COL_ADD_OR_DROP:
        error = tokudb_hcad_update_fun(db, key, old_val, extra, set_val, set_extra);
        break;
    case UPDATE_OP_EXPAND_VARIABLE_OFFSETS:
        error = tokudb_expand_variable_offsets(db, key, old_val, extra, set_val, set_extra);
        break;
    case UPDATE_OP_EXPAND_INT:
    case UPDATE_OP_EXPAND_UINT:
        error = tokudb_expand_int_field(db, key, old_val, extra, set_val, set_extra);
        break;
    case UPDATE_OP_EXPAND_CHAR:
    case UPDATE_OP_EXPAND_BINARY:
        error = tokudb_expand_char_field(db, key, old_val, extra, set_val, set_extra);
        break;
    case UPDATE_OP_SIMPLE_UPDATE:
        error = tokudb_simple_update_fun(db, key, old_val, extra, set_val, set_extra);
        break;
    case UPDATE_OP_SIMPLE_UPSERT:
        error = tokudb_simple_upsert_fun(db, key, old_val, extra, set_val, set_extra);
        break;
    default:
        error = EINVAL;
        break;
    }
    return error;
}