Commit b35a747e authored by unknown's avatar unknown

Merge pnousiainen@bk-internal.mysql.com:/home/bk/mysql-5.1-new

into  mysql.com:/space/pekka/ndb/version/my51
parents d2574ad4 5225ab6f
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t1, t2;
drop database if exists mysqltest;
CREATE TABLE t1 (
a INT NOT NULL,
......@@ -328,3 +328,24 @@ select 'no_copy' from ndb_show_tables where id = @t1_id and name like '%t1%';
no_copy
no_copy
DROP TABLE t1, ndb_show_tables;
create table t1 (a int primary key auto_increment, b int) engine=ndb;
insert into t1 (b) values (101),(102),(103);
select * from t1 where a = 3;
a b
3 103
alter table t1 rename t2;
insert into t2 (b) values (201),(202),(203);
select * from t2 where a = 6;
a b
6 203
alter table t2 add c int;
insert into t2 (b) values (301),(302),(303);
select * from t2 where a = 9;
a b c
9 303 NULL
alter table t2 rename t1;
insert into t1 (b) values (401),(402),(403);
select * from t1 where a = 12;
a b c
12 403 NULL
drop table t1;
......@@ -642,30 +642,30 @@ counter datavalue
6 newval
7 newval
8 newval
35 newval
36 newval
37 newval
38 newval
39 newval
40 newval
41 newval
42 newval
43 newval
44 newval
45 newval
46 newval
47 newval
48 newval
49 newval
50 newval
51 newval
52 newval
53 newval
54 newval
55 newval
56 newval
57 newval
58 newval
9 newval
10 newval
11 newval
12 newval
13 newval
14 newval
15 newval
16 newval
17 newval
18 newval
19 newval
20 newval
21 newval
22 newval
23 newval
24 newval
25 newval
26 newval
27 newval
28 newval
29 newval
30 newval
31 newval
32 newval
drop table t1;
CREATE TABLE t1 ( b INT ) PACK_KEYS = 0 ENGINE = ndb;
select * from t1;
......
......@@ -3,7 +3,7 @@
-- source include/not_embedded.inc
--disable_warnings
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t1, t2;
drop database if exists mysqltest;
--enable_warnings
......@@ -383,3 +383,18 @@ LOAD DATA INFILE 'tmp.dat' INTO TABLE ndb_show_tables;
select 'no_copy' from ndb_show_tables where id = @t1_id and name like '%t1%';
DROP TABLE t1, ndb_show_tables;
# simple test that auto incr is not lost at rename or alter
create table t1 (a int primary key auto_increment, b int) engine=ndb;
insert into t1 (b) values (101),(102),(103);
select * from t1 where a = 3;
alter table t1 rename t2;
insert into t2 (b) values (201),(202),(203);
select * from t2 where a = 6;
alter table t2 add c int;
insert into t2 (b) values (301),(302),(303);
select * from t2 where a = 9;
alter table t2 rename t1;
insert into t1 (b) values (401),(402),(403);
select * from t1 where a = 12;
drop table t1;
......@@ -106,7 +106,6 @@ static uint ndbcluster_alter_table_flags(uint flags)
}
#define NDB_FAILED_AUTO_INCREMENT ~(Uint64)0
#define NDB_AUTO_INCREMENT_RETRIES 10
#define ERR_PRINT(err) \
......@@ -2465,14 +2464,16 @@ int ha_ndbcluster::write_row(byte *record)
{
// Table has hidden primary key
Ndb *ndb= get_ndb();
Uint64 auto_value= NDB_FAILED_AUTO_INCREMENT;
int ret;
Uint64 auto_value;
uint retries= NDB_AUTO_INCREMENT_RETRIES;
do {
auto_value= ndb->getAutoIncrementValue(m_table);
} while (auto_value == NDB_FAILED_AUTO_INCREMENT &&
Ndb_tuple_id_range_guard g(m_share);
ret= ndb->getAutoIncrementValue(m_table, g.range, auto_value, 1);
} while (ret == -1 &&
--retries &&
ndb->getNdbError().status == NdbError::TemporaryError);
if (auto_value == NDB_FAILED_AUTO_INCREMENT)
if (ret == -1)
ERR_RETURN(ndb->getNdbError());
if (set_hidden_key(op, table_share->fields, (const byte*)&auto_value))
ERR_RETURN(op->getNdbError());
......@@ -2567,11 +2568,12 @@ int ha_ndbcluster::write_row(byte *record)
Ndb *ndb= get_ndb();
Uint64 next_val= (Uint64) table->next_number_field->val_int() + 1;
DBUG_PRINT("info",
("Trying to set next auto increment value to %lu",
(ulong) next_val));
if (ndb->setAutoIncrementValue(m_table, next_val, TRUE))
DBUG_PRINT("info",
("Setting next auto increment value to %u", next_val));
("Trying to set next auto increment value to %llu",
(ulonglong) next_val));
Ndb_tuple_id_range_guard g(m_share);
if (ndb->setAutoIncrementValue(m_table, g.range, next_val, TRUE)
== -1)
ERR_RETURN(ndb->getNdbError());
}
m_skip_auto_increment= TRUE;
......@@ -3542,9 +3544,16 @@ void ha_ndbcluster::info(uint flag)
if (m_table)
{
Ndb *ndb= get_ndb();
Ndb_tuple_id_range_guard g(m_share);
auto_increment_value=
ndb->readAutoIncrementValue(m_table);
if (ndb->readAutoIncrementValue(m_table, g.range,
auto_increment_value) == -1)
{
const NdbError err= ndb->getNdbError();
sql_print_error("Error %lu in readAutoIncrementValue(): %s",
(ulong) err.code, err.message);
auto_increment_value= ~(Uint64)0;
}
}
}
DBUG_VOID_RETURN;
......@@ -5236,17 +5245,18 @@ ulonglong ha_ndbcluster::get_auto_increment()
m_rows_to_insert - m_rows_inserted :
((m_rows_to_insert > m_autoincrement_prefetch) ?
m_rows_to_insert : m_autoincrement_prefetch));
auto_value= NDB_FAILED_AUTO_INCREMENT;
int ret;
uint retries= NDB_AUTO_INCREMENT_RETRIES;
do {
auto_value=
(m_skip_auto_increment) ?
ndb->readAutoIncrementValue(m_table)
: ndb->getAutoIncrementValue(m_table, cache_size);
} while (auto_value == NDB_FAILED_AUTO_INCREMENT &&
Ndb_tuple_id_range_guard g(m_share);
ret=
m_skip_auto_increment ?
ndb->readAutoIncrementValue(m_table, g.range, auto_value) :
ndb->getAutoIncrementValue(m_table, g.range, auto_value, cache_size);
} while (ret == -1 &&
--retries &&
ndb->getNdbError().status == NdbError::TemporaryError);
if (auto_value == NDB_FAILED_AUTO_INCREMENT)
if (ret == -1)
{
const NdbError err= ndb->getNdbError();
sql_print_error("Error %lu in ::get_auto_increment(): %s",
......
......@@ -106,6 +106,7 @@ typedef struct st_ndbcluster_share {
ulonglong commit_count;
char *db;
char *table_name;
Ndb::TupleIdRange tuple_id_range;
#ifdef HAVE_NDB_BINLOG
uint32 flags;
NdbEventOperation *op;
......@@ -139,6 +140,19 @@ set_ndb_share_state(NDB_SHARE *share, NDB_SHARE_STATE state)
pthread_mutex_unlock(&share->mutex);
}
struct Ndb_tuple_id_range_guard {
Ndb_tuple_id_range_guard(NDB_SHARE* _share) :
share(_share),
range(share->tuple_id_range) {
pthread_mutex_lock(&share->mutex);
}
~Ndb_tuple_id_range_guard() {
pthread_mutex_unlock(&share->mutex);
}
NDB_SHARE* share;
Ndb::TupleIdRange& range;
};
#ifdef HAVE_NDB_BINLOG
/* NDB_SHARE.flags */
#define NSF_HIDDEN_PK 1 /* table has hidden primary key */
......@@ -726,7 +740,6 @@ static void set_tabname(const char *pathname, char *tabname);
int drop_indexes(Ndb *ndb, TABLE *tab);
int add_index_handle(THD *thd, NdbDictionary::Dictionary *dict,
KEY *key_info, const char *index_name, uint index_no);
int initialize_autoincrement(const void *table);
int get_metadata(const char* path);
void release_metadata(THD *thd, Ndb *ndb);
NDB_INDEX_TYPE get_index_type(uint idx_no) const;
......
......@@ -986,6 +986,7 @@ class NdbBlob;
class NdbReceiver;
class TransporterFacade;
class PollGuard;
class Ndb_local_table_info;
template <class T> struct Ndb_free_list_t;
typedef void (* NdbEventCallback)(NdbEventOperation*, Ndb*, void*);
......@@ -1463,35 +1464,58 @@ public:
/**
* Return a unique tuple id for a table. The id sequence is
* ascending but may contain gaps.
* ascending but may contain gaps. Methods which have no
* TupleIdRange argument use NDB API dict cache. They may
* not be called from mysqld.
*
* @param aTableName table name
*
* @param cacheSize number of values to cache in this Ndb object
*
* @return tuple id or 0 on error
* @return 0 or -1 on error, and tupleId in out parameter
*/
struct TupleIdRange {
Uint64 m_first_tuple_id;
Uint64 m_last_tuple_id;
void reset() {
m_first_tuple_id = ~(Uint64)0;
m_last_tuple_id = ~(Uint64)0;
};
};
int initAutoIncrement();
Uint64 getAutoIncrementValue(const char* aTableName,
Uint32 cacheSize = 1);
Uint64 getAutoIncrementValue(const NdbDictionary::Table * aTable,
Uint32 cacheSize = 1);
Uint64 readAutoIncrementValue(const char* aTableName);
Uint64 readAutoIncrementValue(const NdbDictionary::Table * aTable);
bool setAutoIncrementValue(const char* aTableName, Uint64 val,
bool increase = false);
bool setAutoIncrementValue(const NdbDictionary::Table * aTable, Uint64 val,
bool increase = false);
Uint64 getTupleIdFromNdb(const char* aTableName,
Uint32 cacheSize = 1000);
Uint64 getTupleIdFromNdb(Uint32 aTableId,
Uint32 cacheSize = 1000);
Uint64 readTupleIdFromNdb(Uint32 aTableId);
bool setTupleIdInNdb(const char* aTableName, Uint64 val,
bool increase);
bool setTupleIdInNdb(Uint32 aTableId, Uint64 val, bool increase);
Uint64 opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op);
int getAutoIncrementValue(const char* aTableName,
Uint64 & tupleId, Uint32 cacheSize);
int getAutoIncrementValue(const NdbDictionary::Table * aTable,
Uint64 & tupleId, Uint32 cacheSize);
int getAutoIncrementValue(const NdbDictionary::Table * aTable,
TupleIdRange & range, Uint64 & tupleId,
Uint32 cacheSize);
int readAutoIncrementValue(const char* aTableName,
Uint64 & tupleId);
int readAutoIncrementValue(const NdbDictionary::Table * aTable,
Uint64 & tupleId);
int readAutoIncrementValue(const NdbDictionary::Table * aTable,
TupleIdRange & range, Uint64 & tupleId);
int setAutoIncrementValue(const char* aTableName,
Uint64 tupleId, bool increase);
int setAutoIncrementValue(const NdbDictionary::Table * aTable,
Uint64 tupleId, bool increase);
int setAutoIncrementValue(const NdbDictionary::Table * aTable,
TupleIdRange & range, Uint64 tupleId,
bool increase);
private:
int getTupleIdFromNdb(const NdbTableImpl* table,
TupleIdRange & range, Uint64 & tupleId,
Uint32 cacheSize);
int readTupleIdFromNdb(const NdbTableImpl* table,
TupleIdRange & range, Uint64 & tupleId);
int setTupleIdInNdb(const NdbTableImpl* table,
TupleIdRange & range, Uint64 tupleId, bool increase);
int opTupleIdOnNdb(const NdbTableImpl* table,
TupleIdRange & range, Uint64 & opValue, Uint32 op);
public:
/**
*/
......@@ -1694,12 +1718,8 @@ private:
Uint64 the_last_check_time;
Uint64 theFirstTransId;
// The tupleId is retreived from DB the
// tupleId is unique for each tableid.
// The tupleId is retrieved from DB
const NdbDictionary::Table *m_sys_tab_0;
Uint64 theFirstTupleId[2048];
Uint64 theLastTupleId[2048];
Uint32 theRestartGCI; // the Restart GCI used by DIHNDBTAMPER
......
......@@ -1645,10 +1645,9 @@ void Ndbcntr::systemErrorLab(Signal* signal, int line)
/* |-2048| # 1 00000001 | */
/* | : | : | */
/* | -1 | # 1 00000001 | */
/* | 0 | 0 | */
/* | 1 | 0 | */
/* | : | : | */
/* | 2047| 0 | */
/* | 1 | 0 | tupleid sequence now created on first use */
/* | : | : | v */
/* | 2048| 0 | v */
/*---------------------------------------------------------------------------*/
void Ndbcntr::createSystableLab(Signal* signal, unsigned index)
{
......@@ -1859,8 +1858,7 @@ void Ndbcntr::crSystab8Lab(Signal* signal)
jam();
ckey = 1;
ctransidPhase = ZFALSE;
crSystab7Lab(signal);
return;
// skip 2nd loop - tupleid sequence now created on first use
}//if
signal->theData[0] = ctcConnectionP;
signal->theData[1] = reference();
......
......@@ -47,6 +47,7 @@ Ndb_local_table_info::Ndb_local_table_info(NdbTableImpl *table_impl)
{
assert(! is_ndb_blob_table(table_impl));
m_table_impl= table_impl;
m_tuple_id_range.reset();
}
Ndb_local_table_info::~Ndb_local_table_info()
......
......@@ -33,6 +33,10 @@ public:
static Ndb_local_table_info *create(NdbTableImpl *table_impl, Uint32 sz=0);
static void destroy(Ndb_local_table_info *);
NdbTableImpl *m_table_impl;
// range of cached tuple ids per thread
Ndb::TupleIdRange m_tuple_id_range;
Uint64 m_local_data[1]; // Must be last member. Used to access extra space.
private:
Ndb_local_table_info(NdbTableImpl *table_impl);
......
......@@ -747,158 +747,271 @@ Remark: Returns a new TupleId to the application.
The TupleId comes from SYSTAB_0 where SYSKEY_0 = TableId.
It is initialized to (TableId << 48) + 1 in NdbcntrMain.cpp.
****************************************************************************/
Uint64
Ndb::getAutoIncrementValue(const char* aTableName, Uint32 cacheSize)
int
Ndb::getAutoIncrementValue(const char* aTableName,
Uint64 & tupleId, Uint32 cacheSize)
{
DBUG_ENTER("getAutoIncrementValue");
DBUG_ENTER("Ndb::getAutoIncrementValue");
ASSERT_NOT_MYSQLD;
BaseString internal_tabname(internalize_table_name(aTableName));
Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname);
if (info == 0)
DBUG_RETURN(~(Uint64)0);
const NdbTableImpl *table= info->m_table_impl;
Uint64 tupleId = getTupleIdFromNdb(table->m_id, cacheSize);
DBUG_PRINT("info", ("value %ul", (ulong) tupleId));
DBUG_RETURN(tupleId);
if (info == 0) {
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(-1);
}
const NdbTableImpl* table = info->m_table_impl;
TupleIdRange & range = info->m_tuple_id_range;
if (getTupleIdFromNdb(table, range, tupleId, cacheSize) == -1)
DBUG_RETURN(-1);
DBUG_PRINT("info", ("value %llu", (ulonglong)tupleId));
DBUG_RETURN(0);
}
Uint64
Ndb::getAutoIncrementValue(const NdbDictionary::Table * aTable, Uint32 cacheSize)
int
Ndb::getAutoIncrementValue(const NdbDictionary::Table * aTable,
Uint64 & tupleId, Uint32 cacheSize)
{
DBUG_ENTER("getAutoIncrementValue");
if (aTable == 0)
DBUG_RETURN(~(Uint64)0);
DBUG_ENTER("Ndb::getAutoIncrementValue");
ASSERT_NOT_MYSQLD;
assert(aTable != 0);
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
Uint64 tupleId = getTupleIdFromNdb(table->m_id, cacheSize);
DBUG_PRINT("info", ("value %ul", (ulong) tupleId));
DBUG_RETURN(tupleId);
const BaseString& internal_tabname = table->m_internalName;
Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname);
if (info == 0) {
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(-1);
}
TupleIdRange & range = info->m_tuple_id_range;
if (getTupleIdFromNdb(table, range, tupleId, cacheSize) == -1)
DBUG_RETURN(-1);
DBUG_PRINT("info", ("value %llu", (ulonglong)tupleId));
DBUG_RETURN(0);
}
Uint64
Ndb::getTupleIdFromNdb(const char* aTableName, Uint32 cacheSize)
int
Ndb::getAutoIncrementValue(const NdbDictionary::Table * aTable,
TupleIdRange & range, Uint64 & tupleId,
Uint32 cacheSize)
{
const NdbTableImpl* table = theDictionary->getTable(aTableName);
if (table == 0)
return ~(Uint64)0;
return getTupleIdFromNdb(table->m_id, cacheSize);
DBUG_ENTER("Ndb::getAutoIncrementValue");
assert(aTable != 0);
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
if (getTupleIdFromNdb(table, range, tupleId, cacheSize) == -1)
DBUG_RETURN(-1);
DBUG_PRINT("info", ("value %llu", (ulonglong)tupleId));
DBUG_RETURN(0);
}
Uint64
Ndb::getTupleIdFromNdb(Uint32 aTableId, Uint32 cacheSize)
int
Ndb::getTupleIdFromNdb(const NdbTableImpl* table,
TupleIdRange & range, Uint64 & tupleId, Uint32 cacheSize)
{
DBUG_ENTER("getTupleIdFromNdb");
if ( theFirstTupleId[aTableId] != theLastTupleId[aTableId] )
DBUG_ENTER("Ndb::getTupleIdFromNdb");
if (range.m_first_tuple_id != range.m_last_tuple_id)
{
theFirstTupleId[aTableId]++;
DBUG_PRINT("info", ("next cached value %ul",
(ulong) theFirstTupleId[aTableId]));
DBUG_RETURN(theFirstTupleId[aTableId]);
assert(range.m_first_tuple_id < range.m_last_tuple_id);
tupleId = ++range.m_first_tuple_id;
DBUG_PRINT("info", ("next cached value %llu", (ulonglong)tupleId));
}
else // theFirstTupleId == theLastTupleId
else
{
DBUG_PRINT("info",("reading %u values from database",
(cacheSize == 0) ? 1 : cacheSize));
DBUG_RETURN(opTupleIdOnNdb(aTableId, (cacheSize == 0) ? 1 : cacheSize, 0));
if (cacheSize == 0)
cacheSize = 1;
DBUG_PRINT("info", ("reading %u values from database", (uint)cacheSize));
/*
* reserve next cacheSize entries in db. adds cacheSize to NEXTID
* and returns first tupleId in the new range.
*/
Uint64 opValue = cacheSize;
if (opTupleIdOnNdb(table, range, opValue, 0) == -1)
DBUG_RETURN(-1);
tupleId = opValue;
}
DBUG_RETURN(0);
}
Uint64
Ndb::readAutoIncrementValue(const char* aTableName)
int
Ndb::readAutoIncrementValue(const char* aTableName,
Uint64 & tupleId)
{
DBUG_ENTER("readAutoIncrementValue");
const NdbTableImpl* table = theDictionary->getTable(aTableName);
if (table == 0) {
theError= theDictionary->getNdbError();
DBUG_RETURN(~(Uint64)0);
DBUG_ENTER("Ndb::readAutoIncrementValue");
ASSERT_NOT_MYSQLD;
BaseString internal_tabname(internalize_table_name(aTableName));
Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname);
if (info == 0) {
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(-1);
}
Uint64 tupleId = readTupleIdFromNdb(table->m_id);
DBUG_PRINT("info", ("value %ul", (ulong) tupleId));
DBUG_RETURN(tupleId);
const NdbTableImpl* table = info->m_table_impl;
TupleIdRange & range = info->m_tuple_id_range;
if (readTupleIdFromNdb(table, range, tupleId) == -1)
DBUG_RETURN(-1);
DBUG_PRINT("info", ("value %llu", (ulonglong)tupleId));
DBUG_RETURN(0);
}
Uint64
Ndb::readAutoIncrementValue(const NdbDictionary::Table * aTable)
int
Ndb::readAutoIncrementValue(const NdbDictionary::Table * aTable,
Uint64 & tupleId)
{
DBUG_ENTER("readAutoIncrementValue");
if (aTable == 0)
DBUG_RETURN(~(Uint64)0);
DBUG_ENTER("Ndb::readAutoIncrementValue");
ASSERT_NOT_MYSQLD;
assert(aTable != 0);
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
Uint64 tupleId = readTupleIdFromNdb(table->m_id);
DBUG_PRINT("info", ("value %ul", (ulong) tupleId));
DBUG_RETURN(tupleId);
const BaseString& internal_tabname = table->m_internalName;
Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname);
if (info == 0) {
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(-1);
}
TupleIdRange & range = info->m_tuple_id_range;
if (readTupleIdFromNdb(table, range, tupleId) == -1)
DBUG_RETURN(-1);
DBUG_PRINT("info", ("value %llu", (ulonglong)tupleId));
DBUG_RETURN(0);
}
Uint64
Ndb::readTupleIdFromNdb(Uint32 aTableId)
int
Ndb::readAutoIncrementValue(const NdbDictionary::Table * aTable,
TupleIdRange & range, Uint64 & tupleId)
{
if ( theFirstTupleId[aTableId] == theLastTupleId[aTableId] )
// Cache is empty, check next in database
return opTupleIdOnNdb(aTableId, 0, 3);
DBUG_ENTER("Ndb::readAutoIncrementValue");
assert(aTable != 0);
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
return theFirstTupleId[aTableId] + 1;
if (readTupleIdFromNdb(table, range, tupleId) == -1)
DBUG_RETURN(-1);
DBUG_PRINT("info", ("value %llu", (ulonglong)tupleId));
DBUG_RETURN(0);
}
bool
Ndb::setAutoIncrementValue(const char* aTableName, Uint64 val, bool increase)
int
Ndb::readTupleIdFromNdb(const NdbTableImpl* table,
TupleIdRange & range, Uint64 & tupleId)
{
DBUG_ENTER("setAutoIncrementValue");
DBUG_ENTER("Ndb::readTupleIdFromNdb");
if (range.m_first_tuple_id != range.m_last_tuple_id)
{
assert(range.m_first_tuple_id < range.m_last_tuple_id);
tupleId = range.m_first_tuple_id + 1;
}
else
{
/*
* peek at NEXTID. does not reserve it so the value is valid
* only if no other transactions are allowed.
*/
Uint64 opValue = 0;
if (opTupleIdOnNdb(table, range, opValue, 3) == -1)
DBUG_RETURN(-1);
tupleId = opValue;
}
DBUG_RETURN(0);
}
int
Ndb::setAutoIncrementValue(const char* aTableName,
Uint64 tupleId, bool increase)
{
DBUG_ENTER("Ndb::setAutoIncrementValue");
ASSERT_NOT_MYSQLD;
BaseString internal_tabname(internalize_table_name(aTableName));
Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname);
if (info == 0) {
theError= theDictionary->getNdbError();
DBUG_RETURN(false);
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(-1);
}
const NdbTableImpl* table= info->m_table_impl;
DBUG_RETURN(setTupleIdInNdb(table->m_id, val, increase));
const NdbTableImpl* table = info->m_table_impl;
TupleIdRange & range = info->m_tuple_id_range;
if (setTupleIdInNdb(table, range, tupleId, increase) == -1)
DBUG_RETURN(-1);
DBUG_RETURN(0);
}
bool
Ndb::setAutoIncrementValue(const NdbDictionary::Table * aTable, Uint64 val, bool increase)
int
Ndb::setAutoIncrementValue(const NdbDictionary::Table * aTable,
Uint64 tupleId, bool increase)
{
DBUG_ENTER("setAutoIncrementValue");
if (aTable == 0)
DBUG_RETURN(~(Uint64)0);
DBUG_ENTER("Ndb::setAutoIncrementValue");
ASSERT_NOT_MYSQLD;
assert(aTable != 0);
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
DBUG_RETURN(setTupleIdInNdb(table->m_id, val, increase));
const BaseString& internal_tabname = table->m_internalName;
Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname);
if (info == 0) {
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(-1);
}
TupleIdRange & range = info->m_tuple_id_range;
if (setTupleIdInNdb(table, range, tupleId, increase) == -1)
DBUG_RETURN(-1);
DBUG_RETURN(0);
}
bool
Ndb::setTupleIdInNdb(const char* aTableName, Uint64 val, bool increase )
int
Ndb::setAutoIncrementValue(const NdbDictionary::Table * aTable,
TupleIdRange & range, Uint64 tupleId,
bool increase)
{
DBUG_ENTER("setTupleIdInNdb(const char*, ...)");
const NdbTableImpl* table = theDictionary->getTable(aTableName);
if (table == 0) {
theError= theDictionary->getNdbError();
DBUG_RETURN(false);
}
DBUG_RETURN(setTupleIdInNdb(table->m_id, val, increase));
DBUG_ENTER("Ndb::setAutoIncrementValue");
assert(aTable != 0);
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
if (setTupleIdInNdb(table, range, tupleId, increase) == -1)
DBUG_RETURN(-1);
DBUG_RETURN(0);
}
bool
Ndb::setTupleIdInNdb(Uint32 aTableId, Uint64 val, bool increase )
int
Ndb::setTupleIdInNdb(const NdbTableImpl* table,
TupleIdRange & range, Uint64 tupleId, bool increase)
{
DBUG_ENTER("setTupleIdInNdb(Uint32, ...)");
DBUG_ENTER("Ndb::setTupleIdInNdb");
if (increase)
{
if (theFirstTupleId[aTableId] != theLastTupleId[aTableId])
if (range.m_first_tuple_id != range.m_last_tuple_id)
{
// We have a cache sequence
if (val <= theFirstTupleId[aTableId]+1)
DBUG_RETURN(false);
if (val <= theLastTupleId[aTableId])
assert(range.m_first_tuple_id < range.m_last_tuple_id);
if (tupleId <= range.m_first_tuple_id + 1)
DBUG_RETURN(0);
if (tupleId <= range.m_last_tuple_id)
{
theFirstTupleId[aTableId] = val - 1;
DBUG_RETURN(true);
range.m_first_tuple_id = tupleId - 1;
DBUG_PRINT("info",
("Setting next auto increment cached value to %llu",
(ulonglong)tupleId));
DBUG_RETURN(0);
}
// else continue;
}
DBUG_RETURN((opTupleIdOnNdb(aTableId, val, 2) == val));
/*
* if tupleId <= NEXTID, do nothing. otherwise update NEXTID to
* tupleId and set cached range to first = last = tupleId - 1.
*/
if (opTupleIdOnNdb(table, range, tupleId, 2) == -1)
DBUG_RETURN(-1);
}
else
DBUG_RETURN((opTupleIdOnNdb(aTableId, val, 1) == val));
{
/*
* update NEXTID to given value. reset cached range.
*/
if (opTupleIdOnNdb(table, range, tupleId, 1) == -1)
DBUG_RETURN(-1);
}
DBUG_RETURN(0);
}
int Ndb::initAutoIncrement()
......@@ -922,18 +1035,18 @@ int Ndb::initAutoIncrement()
return (m_sys_tab_0 == NULL);
}
Uint64
Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
int
Ndb::opTupleIdOnNdb(const NdbTableImpl* table,
TupleIdRange & range, Uint64 & opValue, Uint32 op)
{
DBUG_ENTER("Ndb::opTupleIdOnNdb");
Uint32 aTableId = table->m_id;
DBUG_PRINT("enter", ("table=%u value=%llu op=%u", aTableId, opValue, op));
NdbTransaction* tConnection;
NdbOperation* tOperation= 0; // Compiler warning if not initialized
Uint64 tValue;
NdbRecAttr* tRecAttrResult;
int result;
Uint64 ret;
CHECK_STATUS_MACRO_ZERO;
......@@ -961,42 +1074,44 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
tValue = tRecAttrResult->u_64_value();
theFirstTupleId[aTableId] = tValue - opValue;
theLastTupleId[aTableId] = tValue - 1;
ret = theFirstTupleId[aTableId];
range.m_first_tuple_id = tValue - opValue;
range.m_last_tuple_id = tValue - 1;
opValue = range.m_first_tuple_id; // out
break;
case 1:
tOperation->updateTuple();
// create on first use
tOperation->writeTuple();
tOperation->equal("SYSKEY_0", aTableId );
tOperation->setValue("NEXTID", opValue);
if (tConnection->execute( Commit ) == -1 )
goto error_handler;
theFirstTupleId[aTableId] = ~(Uint64)0;
theLastTupleId[aTableId] = ~(Uint64)0;
ret = opValue;
range.reset();
break;
case 2:
tOperation->interpretedUpdateTuple();
tOperation->equal("SYSKEY_0", aTableId );
tOperation->load_const_u64(1, opValue);
tOperation->read_attr("NEXTID", 2);
// compare NEXTID >= opValue
tOperation->branch_le(2, 1, 0);
tOperation->write_attr("NEXTID", 1);
tOperation->interpret_exit_ok();
tOperation->def_label(0);
tOperation->interpret_exit_nok(9999);
if ( (result = tConnection->execute( Commit )) == -1 )
goto error_handler;
if (result == 9999)
ret = ~(Uint64)0;
if (tConnection->execute( Commit ) == -1)
{
if (tConnection->theError.code != 9999)
goto error_handler;
}
else
{
theFirstTupleId[aTableId] = theLastTupleId[aTableId] = opValue - 1;
ret = opValue;
DBUG_PRINT("info",
("Setting next auto increment value (db) to %llu",
(ulonglong)opValue));
range.m_first_tuple_id = range.m_last_tuple_id = opValue - 1;
}
break;
case 3:
......@@ -1005,7 +1120,7 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
tRecAttrResult = tOperation->getValue("NEXTID");
if (tConnection->execute( Commit ) == -1 )
goto error_handler;
ret = tRecAttrResult->u_64_value();
opValue = tRecAttrResult->u_64_value(); // out
break;
default:
goto error_handler;
......@@ -1013,7 +1128,7 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
this->closeTransaction(tConnection);
DBUG_RETURN(ret);
DBUG_RETURN(0);
error_handler:
theError.code = tConnection->theError.code;
......@@ -1023,7 +1138,7 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
theError.code,
tConnection ? tConnection->theError.code : -1,
tOperation ? tOperation->theError.code : -1));
DBUG_RETURN(~(Uint64)0);
DBUG_RETURN(-1);
}
Uint32
......
......@@ -1387,9 +1387,6 @@ NdbDictionaryImpl::putTable(NdbTableImpl *impl)
Ndb_local_table_info::create(impl, m_local_table_data_size);
m_localHash.put(impl->m_internalName.c_str(), info);
m_ndb.theFirstTupleId[impl->getTableId()] = ~0;
m_ndb.theLastTupleId[impl->getTableId()] = ~0;
}
int
......@@ -2249,11 +2246,11 @@ NdbDictionaryImpl::createTable(NdbTableImpl &t)
}
if (autoIncrement) {
// XXX unlikely race condition - t.m_id may no longer be same table
if (! m_ndb.setTupleIdInNdb(t.m_id, initialValue, false)) {
if (m_ndb.theError.code)
m_error.code = m_ndb.theError.code;
else
m_error.code = 4336;
// the tuple id range is not used on input
Ndb::TupleIdRange range;
if (m_ndb.setTupleIdInNdb(&t, range, initialValue, false) == -1) {
assert(m_ndb.theError.code != 0);
m_error.code = m_ndb.theError.code;
delete t2;
DBUG_RETURN(-1);
}
......
......@@ -951,8 +951,6 @@ NdbDictionaryImpl::get_local_table_info(const BaseString& internalTableName)
if (info)
{
m_localHash.put(internalTableName.c_str(), info);
m_ndb.theFirstTupleId[tab->getTableId()] = ~0;
m_ndb.theLastTupleId[tab->getTableId()] = ~0;
}
}
}
......
......@@ -100,10 +100,6 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
theConnectionArray[i] = NULL;
}//forg
m_sys_tab_0 = NULL;
for (i = 0; i < 2048 ; i++) {
theFirstTupleId[i] = 0;
theLastTupleId[i] = 0;
}//for
theImpl->m_dbname.assign(aDataBase);
theImpl->m_schemaname.assign(aSchema);
......
......@@ -600,7 +600,6 @@ ErrorBundle ErrorCodes[] = {
{ 4269, DMEC, IE, "No connection to ndb management server" },
{ 4270, DMEC, IE, "Unknown blob error" },
{ 4335, DMEC, AE, "Only one autoincrement column allowed per table. Having a table without primary key uses an autoincremented hidden key, i.e. a table without a primary key can not have an autoincremented column" },
{ 4336, DMEC, AE, "Auto-increment value set below current value" },
{ 4271, DMEC, AE, "Invalid index object, not retrieved via getIndex()" },
{ 4272, DMEC, AE, "Table definition has undefined column" },
{ 4273, DMEC, IE, "No blob table in dict cache" },
......
......@@ -1139,9 +1139,13 @@ runCreateAutoincrementTable(NDBT_Context* ctx, NDBT_Step* step){
for (int i = 0; i < 16; i++) {
Uint64 value = myNdb->getAutoIncrementValue(tabname, 1);
if (value != (startvalue+i)) {
Uint64 value;
if (myNdb->getAutoIncrementValue(tabname, value, 1) == -1) {
g_err << "getAutoIncrementValue failed on " << tabname << endl;
APIERROR(myNdb->getNdbError());
return NDBT_FAILED;
}
else if (value != (startvalue+i)) {
g_err << "value = " << value << " expected " << startvalue+i << endl;;
APIERROR(myNdb->getNdbError());
// ret = NDBT_FAILED;
......
......@@ -151,9 +151,12 @@ BackupRestore::finalize_table(const TableS & table){
if (table.have_auto_inc())
{
Uint64 max_val= table.get_max_auto_val();
Uint64 auto_val= m_ndb->readAutoIncrementValue(get_table(table.m_dictTable));
if (max_val+1 > auto_val || auto_val == ~(Uint64)0)
ret= m_ndb->setAutoIncrementValue(get_table(table.m_dictTable), max_val+1, false);
Uint64 auto_val;
int r= m_ndb->readAutoIncrementValue(get_table(table.m_dictTable), auto_val);
if (r == -1 && m_ndb->getNdbError().code != 626)
ret= false;
else if (r == -1 || max_val+1 > auto_val)
ret= m_ndb->setAutoIncrementValue(get_table(table.m_dictTable), max_val+1, false) != -1;
}
return ret;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment