Commit 8b1b4f8f authored by unknown's avatar unknown

Merge pnousiainen@bk-internal.mysql.com:/home/bk/mysql-5.1-new

into  mysql.com:/space/pekka/ndb/version/my51
parents b39aff79 673f6288
......@@ -197,6 +197,10 @@ public:
Undofile = 23 ///< Undofile
};
// used 1) until type BlobTable added 2) in upgrade code
static bool
isBlobTableName(const char* name, Uint32* ptab_id = 0, Uint32* pcol_no = 0);
static inline bool
isTable(int tableType) {
return
......
......@@ -1602,6 +1602,12 @@ public:
const Table * getTable(const char * name) const;
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
/**
* Given main table, get blob table.
*/
const Table * getBlobTable(const Table *, const char * col_name);
const Table * getBlobTable(const Table *, Uint32 col_no);
/*
* Save a table definition in dictionary cache
* @param table Object to put into cache
......
......@@ -260,3 +260,33 @@ DictFilegroupInfo::File::init(){
FileSizeLo = 0;
FileFreeExtents = 0;
}
// blob table name hack
bool
DictTabInfo::isBlobTableName(const char* name, Uint32* ptab_id, Uint32* pcol_no)
{
const char* const prefix = "NDB$BLOB_";
const char* s = strrchr(name, table_name_separator);
s = (s == NULL ? name : s + 1);
if (strncmp(s, prefix, strlen(prefix)) != 0)
return false;
s += strlen(prefix);
uint i, n;
for (i = 0, n = 0; '0' <= s[i] && s[i] <= '9'; i++)
n = 10 * n + (s[i] - '0');
if (i == 0 || s[i] != '_')
return false;
const uint tab_id = n;
s = &s[i + 1];
for (i = 0, n = 0; '0' <= s[i] && s[i] <= '9'; i++)
n = 10 * n + (s[i] - '0');
if (i == 0 || s[i] != 0)
return false;
const uint col_no = n;
if (ptab_id)
*ptab_id = tab_id;
if (pcol_no)
*pcol_no = col_no;
return true;
}
......@@ -27,6 +27,7 @@ static NdbTableImpl f_altered_table;
Ndb_local_table_info *
Ndb_local_table_info::create(NdbTableImpl *table_impl, Uint32 sz)
{
assert(! is_ndb_blob_table(table_impl));
Uint32 tot_size= sizeof(Ndb_local_table_info) - sizeof(Uint64)
+ ((sz+7) & ~7); // round to Uint64
void *data= malloc(tot_size);
......@@ -44,6 +45,7 @@ void Ndb_local_table_info::destroy(Ndb_local_table_info *info)
Ndb_local_table_info::Ndb_local_table_info(NdbTableImpl *table_impl)
{
assert(! is_ndb_blob_table(table_impl));
m_table_impl= table_impl;
}
......@@ -61,18 +63,21 @@ LocalDictCache::~LocalDictCache(){
Ndb_local_table_info *
LocalDictCache::get(const char * name){
assert(! is_ndb_blob_table(name));
const Uint32 len = strlen(name);
return m_tableHash.getData(name, len);
}
void
LocalDictCache::put(const char * name, Ndb_local_table_info * tab_info){
assert(! is_ndb_blob_table(name));
const Uint32 id = tab_info->m_table_impl->m_id;
m_tableHash.insertKey(name, strlen(name), id, tab_info);
}
void
LocalDictCache::drop(const char * name){
assert(! is_ndb_blob_table(name));
Ndb_local_table_info *info= m_tableHash.deleteKey(name, strlen(name));
DBUG_ASSERT(info != 0);
Ndb_local_table_info::destroy(info);
......@@ -142,6 +147,7 @@ GlobalDictCache::get(const char * name)
{
DBUG_ENTER("GlobalDictCache::get");
DBUG_PRINT("enter", ("name: %s", name));
assert(! is_ndb_blob_table(name));
const Uint32 len = strlen(name);
Vector<TableVersion> * versions = 0;
......@@ -196,6 +202,7 @@ GlobalDictCache::put(const char * name, NdbTableImpl * tab)
tab ? tab->m_internalName.c_str() : "tab NULL",
tab ? tab->m_version & 0xFFFFFF : 0,
tab ? tab->m_version >> 24 : 0));
assert(! is_ndb_blob_table(name));
const Uint32 len = strlen(name);
Vector<TableVersion> * vers = m_tableHash.getData(name, len);
......@@ -261,6 +268,7 @@ GlobalDictCache::drop(NdbTableImpl * tab)
{
DBUG_ENTER("GlobalDictCache::drop");
DBUG_PRINT("enter", ("internal_name: %s", tab->m_internalName.c_str()));
assert(! is_ndb_blob_table(tab));
unsigned i;
const Uint32 len = strlen(tab->m_internalName.c_str());
......@@ -316,6 +324,7 @@ GlobalDictCache::release(NdbTableImpl * tab)
{
DBUG_ENTER("GlobalDictCache::release");
DBUG_PRINT("enter", ("internal_name: %s", tab->m_internalName.c_str()));
assert(! is_ndb_blob_table(tab));
unsigned i;
const Uint32 len = strlen(tab->m_internalName.c_str());
......@@ -365,6 +374,7 @@ GlobalDictCache::alter_table_rep(const char * name,
Uint32 tableVersion,
bool altered)
{
assert(! is_ndb_blob_table(name));
const Uint32 len = strlen(name);
Vector<TableVersion> * vers =
m_tableHash.getData(name, len);
......
......@@ -754,7 +754,7 @@ Ndb::getAutoIncrementValue(const char* aTableName, Uint32 cacheSize)
BaseString internal_tabname(internalize_table_name(aTableName));
Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname, false);
theDictionary->get_local_table_info(internal_tabname);
if (info == 0)
DBUG_RETURN(~(Uint64)0);
const NdbTableImpl *table= info->m_table_impl;
......@@ -846,7 +846,7 @@ Ndb::setAutoIncrementValue(const char* aTableName, Uint64 val, bool increase)
BaseString internal_tabname(internalize_table_name(aTableName));
Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname, false);
theDictionary->get_local_table_info(internal_tabname);
if (info == 0) {
theError= theDictionary->getNdbError();
DBUG_RETURN(false);
......
......@@ -1367,6 +1367,25 @@ NdbDictionary::Dictionary::getTable(const char * name) const
return getTable(name, 0);
}
const NdbDictionary::Table *
NdbDictionary::Dictionary::getBlobTable(const NdbDictionary::Table* table,
const char* col_name)
{
const NdbDictionary::Column* col = table->getColumn(col_name);
if (col == NULL) {
m_impl.m_error.code = 4318;
return NULL;
}
return getBlobTable(table, col->getColumnNo());
}
const NdbDictionary::Table *
NdbDictionary::Dictionary::getBlobTable(const NdbDictionary::Table* table,
Uint32 col_no)
{
return m_impl.getBlobTable(NdbTableImpl::getImpl(*table), col_no);
}
void
NdbDictionary::Dictionary::invalidateTable(const char * name){
DBUG_ENTER("NdbDictionaryImpl::invalidateTable");
......
......@@ -49,6 +49,18 @@
extern Uint64 g_latest_trans_gci;
bool
is_ndb_blob_table(const char* name, Uint32* ptab_id, Uint32* pcol_no)
{
return DictTabInfo::isBlobTableName(name, ptab_id, pcol_no);
}
bool
is_ndb_blob_table(const NdbTableImpl* t)
{
return is_ndb_blob_table(t->m_internalName.c_str());
}
//#define EVENT_DEBUG
/**
......@@ -239,6 +251,9 @@ NdbColumnImpl::init(Type t)
NdbColumnImpl::~NdbColumnImpl()
{
if (m_blobTable != NULL)
delete m_blobTable;
m_blobTable = NULL;
}
bool
......@@ -1282,6 +1297,14 @@ NdbDictionaryImpl::fetchGlobalTableImpl(const BaseString& internalTableName)
if (impl == 0){
impl = m_receiver.getTable(internalTableName,
m_ndb.usingFullyQualifiedNames());
if (impl != 0) {
int ret = getBlobTables(*impl);
if (ret != 0) {
delete impl;
return 0;
}
}
m_globalHash->lock();
m_globalHash->put(internalTableName.c_str(), impl);
m_globalHash->unlock();
......@@ -1307,6 +1330,9 @@ NdbDictionaryImpl::putTable(NdbTableImpl *impl)
{
NdbTableImpl *old;
int ret = getBlobTables(*impl);
assert(ret == 0);
m_globalHash->lock();
if ((old= m_globalHash->get(impl->m_internalName.c_str())))
{
......@@ -1322,12 +1348,78 @@ NdbDictionaryImpl::putTable(NdbTableImpl *impl)
m_localHash.put(impl->m_internalName.c_str(), info);
addBlobTables(*impl);
m_ndb.theFirstTupleId[impl->getTableId()] = ~0;
m_ndb.theLastTupleId[impl->getTableId()] = ~0;
}
int
NdbDictionaryImpl::getBlobTables(NdbTableImpl &t)
{
unsigned n= t.m_noOfBlobs;
DBUG_ENTER("NdbDictionaryImpl::addBlobTables");
// optimized for blob column being the last one
// and not looking for more than one if not neccessary
for (unsigned i = t.m_columns.size(); i > 0 && n > 0;) {
i--;
NdbColumnImpl & c = *t.m_columns[i];
if (! c.getBlobType() || c.getPartSize() == 0)
continue;
n--;
// retrieve blob table def from DICT - by-pass cache
char btname[NdbBlobImpl::BlobTableNameSize];
NdbBlob::getBlobTableName(btname, &t, &c);
BaseString btname_internal = m_ndb.internalize_table_name(btname);
NdbTableImpl* bt =
m_receiver.getTable(btname_internal, m_ndb.usingFullyQualifiedNames());
if (bt == NULL)
DBUG_RETURN(-1);
// TODO check primary id/version when returned by DICT
// the blob column owns the blob table
assert(c.m_blobTable == NULL);
c.m_blobTable = bt;
}
DBUG_RETURN(0);
}
NdbTableImpl*
NdbDictionaryImpl::getBlobTable(const NdbTableImpl& tab, uint col_no)
{
if (col_no < tab.m_columns.size()) {
NdbColumnImpl* col = tab.m_columns[col_no];
if (col != NULL) {
NdbTableImpl* bt = col->m_blobTable;
if (bt != NULL)
return bt;
else
m_error.code = 4273; // No blob table..
} else
m_error.code = 4249; // Invalid table..
} else
m_error.code = 4318; // Invalid attribute..
return NULL;
}
NdbTableImpl*
NdbDictionaryImpl::getBlobTable(uint tab_id, uint col_no)
{
DBUG_ENTER("NdbDictionaryImpl::getBlobTable");
DBUG_PRINT("enter", ("tab_id: %u col_no %u", tab_id, col_no));
NdbTableImpl* tab = m_receiver.getTable(tab_id,
m_ndb.usingFullyQualifiedNames());
if (tab == NULL)
DBUG_RETURN(NULL);
Ndb_local_table_info* info =
get_local_table_info(tab->m_internalName);
delete tab;
if (info == NULL)
DBUG_RETURN(NULL);
NdbTableImpl* bt = getBlobTable(*info->m_table_impl, col_no);
DBUG_RETURN(bt);
}
#if 0
bool
NdbDictionaryImpl::setTransporter(class TransporterFacade * tf)
......@@ -1642,7 +1734,7 @@ NdbDictInterface::dictSignal(NdbApiSignal* sig,
}
DBUG_RETURN(-1);
}
#if 0
/*
Get dictionary information for a table using table id as reference
......@@ -1666,8 +1758,6 @@ NdbDictInterface::getTable(int tableId, bool fullyQualifiedNames)
return getTable(&tSignal, 0, 0, fullyQualifiedNames);
}
#endif
/*
Get dictionary information for a table using table name as the reference
......@@ -2148,31 +2238,6 @@ NdbDictionaryImpl::createBlobTables(NdbTableImpl &t)
DBUG_RETURN(0);
}
int
NdbDictionaryImpl::addBlobTables(NdbTableImpl &t)
{
unsigned n= t.m_noOfBlobs;
DBUG_ENTER("NdbDictionaryImpl::addBlobTables");
// optimized for blob column being the last one
// and not looking for more than one if not neccessary
for (unsigned i = t.m_columns.size(); i > 0 && n > 0;) {
i--;
NdbColumnImpl & c = *t.m_columns[i];
if (! c.getBlobType() || c.getPartSize() == 0)
continue;
n--;
char btname[NdbBlobImpl::BlobTableNameSize];
NdbBlob::getBlobTableName(btname, &t, &c);
// Save BLOB table handle
NdbTableImpl * cachedBlobTable = getTable(btname);
if (cachedBlobTable == 0) {
DBUG_RETURN(-1);
}
c.m_blobTable = cachedBlobTable;
}
DBUG_RETURN(0);
}
int
NdbDictInterface::createTable(Ndb & ndb,
NdbTableImpl & impl)
......@@ -2188,7 +2253,7 @@ int NdbDictionaryImpl::alterTable(NdbTableImpl &impl)
DBUG_ENTER("NdbDictionaryImpl::alterTable");
Ndb_local_table_info * local = 0;
if((local= get_local_table_info(originalInternalName, false)) == 0)
if((local= get_local_table_info(originalInternalName)) == 0)
{
m_error.code = 709;
DBUG_RETURN(-1);
......@@ -2727,15 +2792,21 @@ NdbDictionaryImpl::dropBlobTables(NdbTableImpl & t)
NdbColumnImpl & c = *t.m_columns[i];
if (! c.getBlobType() || c.getPartSize() == 0)
continue;
char btname[NdbBlobImpl::BlobTableNameSize];
NdbBlob::getBlobTableName(btname, &t, &c);
if (dropTable(btname) != 0) {
if (m_error.code != 709 && m_error.code != 723){
DBUG_PRINT("exit",("error %u - exiting",m_error.code));
NdbTableImpl* bt = c.m_blobTable;
if (bt == NULL) {
DBUG_PRINT("info", ("col %s: blob table pointer is NULL",
c.m_name.c_str()));
continue; // "force" mode on
}
// drop directly - by-pass cache
int ret = m_receiver.dropTable(*c.m_blobTable);
if (ret != 0) {
DBUG_PRINT("info", ("col %s: blob table %s: error %d",
c.m_name.c_str(), bt->m_internalName.c_str(), m_error.code));
if (! (ret == 709 || ret == 723)) // "force" mode on
DBUG_RETURN(-1);
}
DBUG_PRINT("info",("error %u - continuing",m_error.code));
}
// leave c.m_blobTable defined
}
DBUG_RETURN(0);
}
......@@ -2794,52 +2865,30 @@ NdbDictInterface::execDROP_TABLE_REF(NdbApiSignal * signal,
}
int
NdbDictionaryImpl::invalidateObject(NdbTableImpl & impl, bool lock)
NdbDictionaryImpl::invalidateObject(NdbTableImpl & impl)
{
const char * internalTableName = impl.m_internalName.c_str();
DBUG_ENTER("NdbDictionaryImpl::invalidateObject");
DBUG_PRINT("enter", ("internal_name: %s", internalTableName));
if (lock)
m_globalHash->lock();
if (impl.m_noOfBlobs != 0) {
for (uint i = 0; i < impl.m_columns.size(); i++) {
NdbColumnImpl& c = *impl.m_columns[i];
if (! c.getBlobType() || c.getPartSize() == 0)
continue;
assert(c.m_blobTable != NULL);
invalidateObject(*c.m_blobTable, false);
}
}
m_localHash.drop(internalTableName);
m_globalHash->lock();
impl.m_status = NdbDictionary::Object::Invalid;
m_globalHash->drop(&impl);
if (lock)
m_globalHash->unlock();
DBUG_RETURN(0);
}
int
NdbDictionaryImpl::removeCachedObject(NdbTableImpl & impl, bool lock)
NdbDictionaryImpl::removeCachedObject(NdbTableImpl & impl)
{
const char * internalTableName = impl.m_internalName.c_str();
DBUG_ENTER("NdbDictionaryImpl::removeCachedObject");
DBUG_PRINT("enter", ("internal_name: %s", internalTableName));
if (lock)
m_globalHash->lock();
if (impl.m_noOfBlobs != 0) {
for (uint i = 0; i < impl.m_columns.size(); i++) {
NdbColumnImpl& c = *impl.m_columns[i];
if (! c.getBlobType() || c.getPartSize() == 0)
continue;
assert(c.m_blobTable != NULL);
removeCachedObject(*c.m_blobTable, false);
}
}
m_localHash.drop(internalTableName);
m_globalHash->lock();
m_globalHash->release(&impl);
if (lock)
m_globalHash->unlock();
DBUG_RETURN(0);
}
......@@ -2851,8 +2900,7 @@ NdbIndexImpl*
NdbDictionaryImpl::getIndexImpl(const char * externalName,
const BaseString& internalName)
{
Ndb_local_table_info * info = get_local_table_info(internalName,
false);
Ndb_local_table_info * info = get_local_table_info(internalName);
if(info == 0){
m_error.code = 4243;
return 0;
......@@ -3503,7 +3551,7 @@ NdbDictionaryImpl::getEvent(const char * eventName, NdbTableImpl* tab)
// We only have the table name with internal name
DBUG_PRINT("info",("table %s", ev->getTableName()));
Ndb_local_table_info *info;
info= get_local_table_info(ev->getTableName(), true);
info= get_local_table_info(ev->getTableName());
if (info == 0)
{
DBUG_PRINT("error",("unable to find table %s", ev->getTableName()));
......@@ -3513,7 +3561,7 @@ NdbDictionaryImpl::getEvent(const char * eventName, NdbTableImpl* tab)
if (info->m_table_impl->m_status != NdbDictionary::Object::Retrieved)
{
removeCachedObject(*info->m_table_impl);
info= get_local_table_info(ev->getTableName(), true);
info= get_local_table_info(ev->getTableName());
if (info == 0)
{
DBUG_PRINT("error",("unable to find table %s", ev->getTableName()));
......
......@@ -30,6 +30,11 @@
#include "NdbWaiter.hpp"
#include "DictCache.hpp"
bool
is_ndb_blob_table(const char* name, Uint32* ptab_id = 0, Uint32* pcol_no = 0);
bool
is_ndb_blob_table(const class NdbTableImpl* t);
class NdbDictObjectImpl {
public:
int m_id;
......@@ -437,7 +442,7 @@ public:
int listObjects(NdbDictionary::Dictionary::List& list, Uint32 requestData, bool fullyQualifiedNames);
int listObjects(NdbApiSignal* signal);
/* NdbTableImpl * getTable(int tableId, bool fullyQualifiedNames); */
NdbTableImpl * getTable(int tableId, bool fullyQualifiedNames);
NdbTableImpl * getTable(const BaseString& name, bool fullyQualifiedNames);
NdbTableImpl * getTable(class NdbApiSignal * signal,
LinearSectionPtr ptr[3],
......@@ -542,13 +547,12 @@ public:
int createTable(NdbTableImpl &t);
int createBlobTables(NdbTableImpl& t);
int addBlobTables(NdbTableImpl &);
int alterTable(NdbTableImpl &t);
int dropTable(const char * name);
int dropTable(NdbTableImpl &);
int dropBlobTables(NdbTableImpl &);
int invalidateObject(NdbTableImpl &, bool lock = true);
int removeCachedObject(NdbTableImpl &, bool lock = true);
int invalidateObject(NdbTableImpl &);
int removeCachedObject(NdbTableImpl &);
int createIndex(NdbIndexImpl &ix);
int dropIndex(const char * indexName,
......@@ -572,9 +576,12 @@ public:
int listIndexes(List& list, Uint32 indexId);
NdbTableImpl * getTable(const char * tableName, void **data= 0);
NdbTableImpl * getBlobTable(const NdbTableImpl&, uint col_no);
NdbTableImpl * getBlobTable(uint tab_id, uint col_no);
void putTable(NdbTableImpl *impl);
Ndb_local_table_info* get_local_table_info(
const BaseString& internalTableName, bool do_add_blob_tables);
int getBlobTables(NdbTableImpl &);
Ndb_local_table_info*
get_local_table_info(const BaseString& internalTableName);
NdbIndexImpl * getIndex(const char * indexName,
const char * tableName);
NdbEventImpl * getEvent(const char * eventName, NdbTableImpl* = NULL);
......@@ -846,32 +853,42 @@ inline
NdbTableImpl *
NdbDictionaryImpl::getTable(const char * table_name, void **data)
{
DBUG_ENTER("NdbDictionaryImpl::getTable");
DBUG_PRINT("enter", ("table: %s", table_name));
if (unlikely(strchr(table_name, '$') != 0)) {
Uint32 tab_id, col_no;
if (is_ndb_blob_table(table_name, &tab_id, &col_no)) {
NdbTableImpl* t = getBlobTable(tab_id, col_no);
DBUG_RETURN(t);
}
}
const BaseString internal_tabname(m_ndb.internalize_table_name(table_name));
Ndb_local_table_info *info=
get_local_table_info(internal_tabname, true);
get_local_table_info(internal_tabname);
if (info == 0)
return 0;
DBUG_RETURN(0);
if (data)
*data= info->m_local_data;
return info->m_table_impl;
DBUG_RETURN(info->m_table_impl);
}
inline
Ndb_local_table_info *
NdbDictionaryImpl::get_local_table_info(const BaseString& internalTableName,
bool do_add_blob_tables)
NdbDictionaryImpl::get_local_table_info(const BaseString& internalTableName)
{
DBUG_ENTER("NdbDictionaryImpl::get_local_table_info");
DBUG_PRINT("enter", ("table: %s", internalTableName.c_str()));
Ndb_local_table_info *info= m_localHash.get(internalTableName.c_str());
if (info == 0) {
info= fetchGlobalTableImpl(internalTableName);
if (info == 0) {
return 0;
DBUG_RETURN(0);
}
}
if (do_add_blob_tables && info->m_table_impl->m_noOfBlobs)
addBlobTables(*(info->m_table_impl));
return info; // autoincrement already initialized
DBUG_RETURN(info); // autoincrement already initialized
}
inline
......@@ -891,7 +908,7 @@ NdbDictionaryImpl::getIndex(const char * index_name,
if (internal_indexname.length())
{
Ndb_local_table_info * info=
get_local_table_info(internal_indexname, false);
get_local_table_info(internal_indexname);
if (info)
{
NdbTableImpl * tab= info->m_table_impl;
......@@ -956,6 +973,4 @@ NdbUndofileImpl::getImpl(const NdbDictionary::Undofile & t){
return t.m_impl;
}
#endif
......@@ -506,7 +506,7 @@ ErrorBundle ErrorCodes[] = {
{ 4315, DMEC, AE, "No more key attributes allowed after defining variable length key attribute" },
{ 4316, DMEC, AE, "Key attributes are not allowed to be NULL attributes" },
{ 4317, DMEC, AE, "Too many primary keys defined in table" },
{ 4318, DMEC, AE, "Invalid attribute name" },
{ 4318, DMEC, AE, "Invalid attribute name or number" },
{ 4319, DMEC, AE, "createAttribute called at erroneus place" },
{ 4322, DMEC, AE, "Attempt to define distribution key when not prepared to" },
{ 4323, DMEC, AE, "Distribution Key set on table but not defined on first attribute" },
......@@ -598,7 +598,8 @@ ErrorBundle ErrorCodes[] = {
{ 4335, DMEC, AE, "Only one autoincrement column allowed per table. Having a table without primary key uses an autoincremented hidden key, i.e. a table without a primary key can not have an autoincremented column" },
{ 4336, DMEC, AE, "Auto-increment value set below current value" },
{ 4271, DMEC, AE, "Invalid index object, not retrieved via getIndex()" },
{ 4272, DMEC, AE, "Table definition has undefined column" }
{ 4272, DMEC, AE, "Table definition has undefined column" },
{ 4273, DMEC, IE, "No blob table in dict cache" }
};
static
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment