Commit 58d8b240 authored by unknown's avatar unknown

Fixed merge problems, optimized bulk insert

parent 56c14b30
...@@ -41,7 +41,10 @@ static const int parallelism= 240; ...@@ -41,7 +41,10 @@ static const int parallelism= 240;
// Default value for max number of transactions // Default value for max number of transactions
// createable against NDB from this handler // createable against NDB from this handler
static const int max_transactions = 256; static const int max_transactions= 256;
// Default value for prefetch of autoincrement values
static const ha_rows autoincrement_prefetch= 32;
#define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8 #define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8
...@@ -286,7 +289,7 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field, ...@@ -286,7 +289,7 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
} }
// Blob type // Blob type
NdbBlob *ndb_blob = ndb_op->getBlobHandle(fieldnr); NdbBlob *ndb_blob= ndb_op->getBlobHandle(fieldnr);
if (ndb_blob != NULL) if (ndb_blob != NULL)
{ {
if (field->is_null()) if (field->is_null())
...@@ -832,7 +835,7 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data) ...@@ -832,7 +835,7 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
int res; int res;
if (res= set_primary_key_from_old_data(op, old_data)) if ((res= set_primary_key_from_old_data(op, old_data)))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
// Read all unreferenced non-key field(s) // Read all unreferenced non-key field(s)
...@@ -950,7 +953,7 @@ inline int ha_ndbcluster::next_result(byte *buf) ...@@ -950,7 +953,7 @@ inline int ha_ndbcluster::next_result(byte *buf)
If this an update or delete, call nextResult with false If this an update or delete, call nextResult with false
to process any records already cached in NdbApi to process any records already cached in NdbApi
*/ */
bool contact_ndb = m_lock.type != TL_WRITE_ALLOW_WRITE; bool contact_ndb= m_lock.type != TL_WRITE_ALLOW_WRITE;
do { do {
DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb)); DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb));
/* /*
...@@ -1328,7 +1331,8 @@ int ha_ndbcluster::write_row(byte *record) ...@@ -1328,7 +1331,8 @@ int ha_ndbcluster::write_row(byte *record)
Find out how this is detected! Find out how this is detected!
*/ */
rows_inserted++; rows_inserted++;
if ((rows_inserted == rows_to_insert) || bulk_insert_not_flushed= true;
if ((rows_to_insert == 1) ||
((rows_inserted % bulk_insert_rows) == 0) || ((rows_inserted % bulk_insert_rows) == 0) ||
uses_blob_value(false) != 0) uses_blob_value(false) != 0)
{ {
...@@ -1336,6 +1340,7 @@ int ha_ndbcluster::write_row(byte *record) ...@@ -1336,6 +1340,7 @@ int ha_ndbcluster::write_row(byte *record)
DBUG_PRINT("info", ("Sending inserts to NDB, "\ DBUG_PRINT("info", ("Sending inserts to NDB, "\
"rows_inserted:%d, bulk_insert_rows: %d", "rows_inserted:%d, bulk_insert_rows: %d",
(int)rows_inserted, (int)bulk_insert_rows)); (int)rows_inserted, (int)bulk_insert_rows));
bulk_insert_not_flushed= false;
if (trans->execute(NoCommit) != 0) if (trans->execute(NoCommit) != 0)
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
} }
...@@ -1398,38 +1403,34 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) ...@@ -1398,38 +1403,34 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
if ((table->primary_key != MAX_KEY) && if ((table->primary_key != MAX_KEY) &&
(key_cmp(table->primary_key, old_data, new_data))) (key_cmp(table->primary_key, old_data, new_data)))
{ {
DBUG_PRINT("info", ("primary key update, doing pk read+insert+delete")); int read_res, insert_res, delete_res;
DBUG_PRINT("info", ("primary key update, doing pk read+insert+delete"));
// Get all old fields, since we optimize away fields not in query // Get all old fields, since we optimize away fields not in query
int read_res= complemented_pk_read(old_data, new_data); read_res= complemented_pk_read(old_data, new_data);
if (read_res) if (read_res)
{ {
DBUG_PRINT("info", ("pk read failed")); DBUG_PRINT("info", ("pk read failed"));
DBUG_RETURN(read_res); DBUG_RETURN(read_res);
} }
// Insert new row // Insert new row
int insert_res= write_row(new_data); insert_res= write_row(new_data);
if (!insert_res) if (insert_res)
{
// Delete old row
DBUG_PRINT("info", ("insert succeded"));
int delete_res= delete_row(old_data);
if (!delete_res)
{
DBUG_PRINT("info", ("insert+delete succeeded"));
DBUG_RETURN(0);
}
else
{
DBUG_PRINT("info", ("delete failed"));
DBUG_RETURN(delete_row(new_data));
}
}
else
{ {
DBUG_PRINT("info", ("insert failed")); DBUG_PRINT("info", ("insert failed"));
DBUG_RETURN(insert_res); DBUG_RETURN(insert_res);
} }
// Delete old row
DBUG_PRINT("info", ("insert succeded"));
delete_res= delete_row(old_data);
if (delete_res)
{
DBUG_PRINT("info", ("delete failed"));
// Undo write_row(new_data)
DBUG_RETURN(delete_row(new_data));
}
DBUG_PRINT("info", ("insert+delete succeeded"));
DBUG_RETURN(0);
} }
if (cursor) if (cursor)
...@@ -1833,7 +1834,7 @@ int ha_ndbcluster::index_next(byte *buf) ...@@ -1833,7 +1834,7 @@ int ha_ndbcluster::index_next(byte *buf)
{ {
DBUG_ENTER("index_next"); DBUG_ENTER("index_next");
int error = 1; int error= 1;
statistic_increment(ha_read_next_count,&LOCK_status); statistic_increment(ha_read_next_count,&LOCK_status);
DBUG_RETURN(next_result(buf)); DBUG_RETURN(next_result(buf));
} }
...@@ -2208,7 +2209,7 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows) ...@@ -2208,7 +2209,7 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows)
degrade if too many bytes are inserted, thus it's limited by this degrade if too many bytes are inserted, thus it's limited by this
calculation. calculation.
*/ */
const int bytesperbatch = 8192; const int bytesperbatch= 8192;
bytes= 12 + tab->getRowSizeInBytes() + 4 * tab->getNoOfColumns(); bytes= 12 + tab->getRowSizeInBytes() + 4 * tab->getNoOfColumns();
batch= bytesperbatch/bytes; batch= bytesperbatch/bytes;
batch= batch == 0 ? 1 : batch; batch= batch == 0 ? 1 : batch;
...@@ -2223,10 +2224,25 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows) ...@@ -2223,10 +2224,25 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows)
*/ */
int ha_ndbcluster::end_bulk_insert() int ha_ndbcluster::end_bulk_insert()
{ {
int error= 0;
DBUG_ENTER("end_bulk_insert"); DBUG_ENTER("end_bulk_insert");
// Check if last inserts need to be flushed
if (bulk_insert_not_flushed)
{
NdbConnection *trans= m_active_trans;
// Send rows to NDB
DBUG_PRINT("info", ("Sending inserts to NDB, "\
"rows_inserted:%d, bulk_insert_rows: %d",
rows_inserted, bulk_insert_rows));
bulk_insert_not_flushed= false;
if (trans->execute(NoCommit) != 0)
error= ndb_err(trans);
}
rows_inserted= 0; rows_inserted= 0;
rows_to_insert= 1; rows_to_insert= 1;
DBUG_RETURN(0); DBUG_RETURN(error);
} }
...@@ -2247,7 +2263,7 @@ int ha_ndbcluster::reset() ...@@ -2247,7 +2263,7 @@ int ha_ndbcluster::reset()
const char **ha_ndbcluster::bas_ext() const const char **ha_ndbcluster::bas_ext() const
{ static const char *ext[1] = { NullS }; return ext; } { static const char *ext[1]= { NullS }; return ext; }
/* /*
...@@ -2751,7 +2767,7 @@ int ha_ndbcluster::create(const char *name, ...@@ -2751,7 +2767,7 @@ int ha_ndbcluster::create(const char *name,
DBUG_PRINT("info", ("name: %s, type: %u, pack_length: %d", DBUG_PRINT("info", ("name: %s, type: %u, pack_length: %d",
field->field_name, field->real_type(), field->field_name, field->real_type(),
field->pack_length())); field->pack_length()));
if (my_errno= create_ndb_column(col, field, info)) if ((my_errno= create_ndb_column(col, field, info)))
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
tab.addColumn(col); tab.addColumn(col);
} }
...@@ -3001,7 +3017,10 @@ longlong ha_ndbcluster::get_auto_increment() ...@@ -3001,7 +3017,10 @@ longlong ha_ndbcluster::get_auto_increment()
{ {
DBUG_ENTER("get_auto_increment"); DBUG_ENTER("get_auto_increment");
DBUG_PRINT("enter", ("m_tabname: %s", m_tabname)); DBUG_PRINT("enter", ("m_tabname: %s", m_tabname));
int cache_size = rows_to_insert ? rows_to_insert : 32; int cache_size=
(rows_to_insert > autoincrement_prefetch) ?
rows_to_insert
: autoincrement_prefetch;
Uint64 auto_value= Uint64 auto_value=
m_ndb->getAutoIncrementValue(m_tabname, cache_size); m_ndb->getAutoIncrementValue(m_tabname, cache_size);
DBUG_RETURN((longlong)auto_value); DBUG_RETURN((longlong)auto_value);
...@@ -3026,6 +3045,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): ...@@ -3026,6 +3045,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
rows_to_insert(1), rows_to_insert(1),
rows_inserted(0), rows_inserted(0),
bulk_insert_rows(1024), bulk_insert_rows(1024),
bulk_insert_not_flushed(false),
ops_pending(0), ops_pending(0),
blobs_buffer(0), blobs_buffer(0),
blobs_buffer_size(0) blobs_buffer_size(0)
...@@ -3378,7 +3398,7 @@ void ha_ndbcluster::set_tabname(const char *path_name) ...@@ -3378,7 +3398,7 @@ void ha_ndbcluster::set_tabname(const char *path_name)
ptr= m_tabname; ptr= m_tabname;
while (*ptr != '\0') { while (*ptr != '\0') {
*ptr = tolower(*ptr); *ptr= tolower(*ptr);
ptr++; ptr++;
} }
#endif #endif
...@@ -3394,17 +3414,17 @@ ha_ndbcluster::set_tabname(const char *path_name, char * tabname) ...@@ -3394,17 +3414,17 @@ ha_ndbcluster::set_tabname(const char *path_name, char * tabname)
char *end, *ptr; char *end, *ptr;
/* Scan name from the end */ /* Scan name from the end */
end = strend(path_name)-1; end= strend(path_name)-1;
ptr = end; ptr= end;
while (ptr >= path_name && *ptr != '\\' && *ptr != '/') { while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
ptr--; ptr--;
} }
uint name_len = end - ptr; uint name_len= end - ptr;
memcpy(tabname, ptr + 1, end - ptr); memcpy(tabname, ptr + 1, end - ptr);
tabname[name_len] = '\0'; tabname[name_len]= '\0';
#ifdef __WIN__ #ifdef __WIN__
/* Put to lower case */ /* Put to lower case */
ptr = tabname; ptr= tabname;
while (*ptr != '\0') { while (*ptr != '\0') {
*ptr= tolower(*ptr); *ptr= tolower(*ptr);
...@@ -3567,7 +3587,7 @@ static int packfrm(const void *data, uint len, ...@@ -3567,7 +3587,7 @@ static int packfrm(const void *data, uint len,
DBUG_PRINT("enter", ("data: %x, len: %d", data, len)); DBUG_PRINT("enter", ("data: %x, len: %d", data, len));
error= 1; error= 1;
org_len = len; org_len= len;
if (my_compress((byte*)data, &org_len, &comp_len)) if (my_compress((byte*)data, &org_len, &comp_len))
goto err; goto err;
...@@ -3587,9 +3607,9 @@ static int packfrm(const void *data, uint len, ...@@ -3587,9 +3607,9 @@ static int packfrm(const void *data, uint len,
// Copy frm data into blob, already in machine independent format // Copy frm data into blob, already in machine independent format
memcpy(blob->data, data, org_len); memcpy(blob->data, data, org_len);
*pack_data = blob; *pack_data= blob;
*pack_len = blob_len; *pack_len= blob_len;
error = 0; error= 0;
DBUG_PRINT("exit", ("pack_data: %x, pack_len: %d", *pack_data, *pack_len)); DBUG_PRINT("exit", ("pack_data: %x, pack_len: %d", *pack_data, *pack_len));
err: err:
...@@ -3601,7 +3621,7 @@ static int packfrm(const void *data, uint len, ...@@ -3601,7 +3621,7 @@ static int packfrm(const void *data, uint len,
static int unpackfrm(const void **unpack_data, uint *unpack_len, static int unpackfrm(const void **unpack_data, uint *unpack_len,
const void *pack_data) const void *pack_data)
{ {
const frm_blob_struct *blob = (frm_blob_struct*)pack_data; const frm_blob_struct *blob= (frm_blob_struct*)pack_data;
byte *data; byte *data;
ulong complen, orglen, ver; ulong complen, orglen, ver;
DBUG_ENTER("unpackfrm"); DBUG_ENTER("unpackfrm");
...@@ -3617,7 +3637,7 @@ static int unpackfrm(const void **unpack_data, uint *unpack_len, ...@@ -3617,7 +3637,7 @@ static int unpackfrm(const void **unpack_data, uint *unpack_len,
if (ver != 1) if (ver != 1)
DBUG_RETURN(1); DBUG_RETURN(1);
if (!(data = my_malloc(max(orglen, complen), MYF(MY_WME)))) if (!(data= my_malloc(max(orglen, complen), MYF(MY_WME))))
DBUG_RETURN(2); DBUG_RETURN(2);
memcpy(data, blob->data, complen); memcpy(data, blob->data, complen);
...@@ -3627,8 +3647,8 @@ static int unpackfrm(const void **unpack_data, uint *unpack_len, ...@@ -3627,8 +3647,8 @@ static int unpackfrm(const void **unpack_data, uint *unpack_len,
DBUG_RETURN(3); DBUG_RETURN(3);
} }
*unpack_data = data; *unpack_data= data;
*unpack_len = complen; *unpack_len= complen;
DBUG_PRINT("exit", ("frmdata: %x, len: %d", *unpack_data, *unpack_len)); DBUG_PRINT("exit", ("frmdata: %x, len: %d", *unpack_data, *unpack_len));
......
...@@ -221,6 +221,7 @@ class ha_ndbcluster: public handler ...@@ -221,6 +221,7 @@ class ha_ndbcluster: public handler
ha_rows rows_to_insert; ha_rows rows_to_insert;
ha_rows rows_inserted; ha_rows rows_inserted;
ha_rows bulk_insert_rows; ha_rows bulk_insert_rows;
bool bulk_insert_not_flushed;
ha_rows ops_pending; ha_rows ops_pending;
bool blobs_pending; bool blobs_pending;
// memory for blobs in one tuple // memory for blobs in one tuple
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment