Commit 211ae3ee authored by unknown's avatar unknown

Merge mskold@bk-internal.mysql.com:/home/bk/mysql-5.1-new

into  mysql.com:/usr/local/home/marty/MySQL/mysql-5.1-new-wl1892


sql/ha_ndbcluster_binlog.cc:
  Auto merged
storage/ndb/src/kernel/blocks/suma/Suma.cpp:
  Auto merged
parents 0b88ef9d 53f1a42d
......@@ -2723,7 +2723,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
{
// sometimes get TE_ALTER with invalid table
DBUG_ASSERT(pOp->getEventType() == NdbDictionary::Event::TE_ALTER ||
! IS_NDB_BLOB_PREFIX(pOp->getTable()->getName()));
! IS_NDB_BLOB_PREFIX(pOp->getEvent()->getTable()->getName()));
ndb->
setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
......@@ -2864,7 +2864,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
DBUG_PRINT("info",("removing all event operations"));
while ((op= ndb->getEventOperation()))
{
DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getTable()->getName()));
DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName()));
DBUG_PRINT("info",("removing event operation on %s",
op->getEvent()->getName()));
NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
......
......@@ -754,7 +754,7 @@ extern const GlobalSignalNumber NO_OF_SIGNAL_NAMES;
#define GSN_SUB_SYNC_REQ 582
#define GSN_SUB_SYNC_REF 583
#define GSN_SUB_SYNC_CONF 584
#define GSN_SUB_META_DATA 585
/* 585 unused */
#define GSN_SUB_TABLE_DATA 586
#define GSN_CREATE_TABLE_REQ 587
......
......@@ -33,6 +33,7 @@ class AlterTabReq {
friend class Dbdih;
friend class Dbtc;
friend class Dblqh;
friend class Suma;
/**
* For printing
......@@ -103,7 +104,6 @@ class AlterTabConf {
friend class Dbtc;
friend class Dblqh;
friend class Dbtup;
friend class Suma;
/**
* For printing
......
......@@ -36,6 +36,7 @@ class AlterTableReq {
* Sender(s) / Reciver(s)
*/
friend class NdbTableImpl;
friend class NdbEventOperationImpl;
friend class NdbDictInterface;
friend class Dbdict;
......
......@@ -277,22 +277,6 @@ struct SubSyncConf {
Uint32 senderData;
};
struct SubMetaData {
/**
* Sender(s)/Reciver(s)
*/
friend struct SumaParticipant;
friend struct Grep;
friend bool printSUB_META_DATA(FILE *, const Uint32 *, Uint32, Uint16);
STATIC_CONST( SignalLength = 3 );
SECTION( DICT_TAB_INFO = 0 );
Uint32 gci;
Uint32 senderData;
Uint32 tableId;
};
struct SubTableData {
/**
* Sender(s)/Reciver(s)
......@@ -301,7 +285,11 @@ struct SubTableData {
friend struct Grep;
friend bool printSUB_TABLE_DATA(FILE *, const Uint32 *, Uint32, Uint16);
STATIC_CONST( SignalLength = 5 );
STATIC_CONST( SignalLength = 7 );
SECTION( DICT_TAB_INFO = 0 );
SECTION( ATTR_INFO = 0 );
SECTION( AFTER_VALUES = 1 );
SECTION( BEFORE_VALUES = 2 );
enum LogType {
SCAN = 1,
......@@ -317,6 +305,8 @@ struct SubTableData {
Uint8 ndbd_nodeid;
Uint8 not_used3;
Uint32 logType;
Uint32 changeMask;
Uint32 totalLen;
};
struct SubSyncContinueReq {
......
......@@ -162,6 +162,7 @@ public:
class Table; // forward declaration
class Tablespace; // forward declaration
// class NdbEventOperation; // forward declaration
/**
* @class Column
......@@ -885,6 +886,7 @@ public:
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
friend class NdbDictionaryImpl;
friend class NdbTableImpl;
friend class NdbEventOperationImpl;
#endif
class NdbTableImpl & m_impl;
Table(NdbTableImpl&);
......@@ -1182,6 +1184,12 @@ public:
* Get unique identifier for the event
*/
const char *getName() const;
/**
* Get table that the event is defined on
*
* @return pointer to table or NULL if no table has been defined
*/
const NdbDictionary::Table * getTable() const;
/**
* Define table on which events should be detected
*
......
......@@ -176,6 +176,26 @@ public:
*/
NdbDictionary::Event::TableEvent getEventType() const;
/**
* Check if table name has changed, for event TE_ALTER
*/
const bool tableNameChanged() const;
/**
* Check if table frm has changed, for event TE_ALTER
*/
const bool tableFrmChanged() const;
/**
* Check if table fragmentation has changed, for event TE_ALTER
*/
const bool tableFragmentationChanged() const;
/**
* Check if table range partition list name has changed, for event TE_ALTER
*/
const bool tableRangeListChanged() const;
/**
* Retrieve the GCI of the latest retrieved event
*
......@@ -200,14 +220,13 @@ public:
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
/** these are subject to change at any time */
const NdbDictionary::Table *getTable() const;
const NdbDictionary::Event *getEvent() const;
const NdbRecAttr *getFirstPkAttr() const;
const NdbRecAttr *getFirstPkPreAttr() const;
const NdbRecAttr *getFirstDataAttr() const;
const NdbRecAttr *getFirstDataPreAttr() const;
bool validateTable(NdbDictionary::Table &table) const;
// bool validateTable(NdbDictionary::Table &table) const;
void setCustomData(void * data);
void * getCustomData() const;
......
......@@ -166,7 +166,6 @@ SignalDataPrintFunctions[] = {
{ GSN_SUB_SYNC_REQ, printSUB_SYNC_REQ },
{ GSN_SUB_SYNC_REF, printSUB_SYNC_REF },
{ GSN_SUB_SYNC_CONF, printSUB_SYNC_CONF },
{ GSN_SUB_META_DATA, printSUB_META_DATA },
{ GSN_SUB_TABLE_DATA, printSUB_TABLE_DATA },
{ GSN_SUB_SYNC_CONTINUE_REQ, printSUB_SYNC_CONTINUE_REQ },
{ GSN_SUB_SYNC_CONTINUE_REF, printSUB_SYNC_CONTINUE_REF },
......
......@@ -540,7 +540,6 @@ const GsnName SignalNames [] = {
,{ GSN_SUB_SYNC_REQ, "SUB_SYNC_REQ" }
,{ GSN_SUB_SYNC_REF, "SUB_SYNC_REF" }
,{ GSN_SUB_SYNC_CONF, "SUB_SYNC_CONF" }
,{ GSN_SUB_META_DATA, "SUB_META_DATA" }
,{ GSN_SUB_TABLE_DATA, "SUB_TABLE_DATA" }
,{ GSN_SUB_SYNC_CONTINUE_REQ, "SUB_SYNC_CONTINUE_REQ" }
,{ GSN_SUB_SYNC_CONTINUE_REF, "SUB_SYNC_CONTINUE_REF" }
......
......@@ -169,17 +169,6 @@ printSUB_SYNC_CONF(FILE * output, const Uint32 * theData,
return false;
}
bool
printSUB_META_DATA(FILE * output, const Uint32 * theData,
Uint32 len, Uint16 receiverBlockNo) {
const SubMetaData * const sig = (SubMetaData *)theData;
fprintf(output, " gci: %x\n", sig->gci);
fprintf(output, " senderData: %x\n", sig->senderData);
fprintf(output, " senderData: %x\n", sig->senderData);
fprintf(output, " tableId: %x\n", sig->tableId);
return false;
}
bool
printSUB_TABLE_DATA(FILE * output, const Uint32 * theData,
Uint32 len, Uint16 receiverBlockNo) {
......
......@@ -4666,12 +4666,7 @@ Dbdict::alterTab_writeSchemaConf(Signal* signal,
SegmentedSectionPtr tabInfoPtr;
getSection(tabInfoPtr, alterTabPtr.p->m_tabInfoPtrI);
writeTableFile(signal, tableId, tabInfoPtr, &callback);
alterTabPtr.p->m_tabInfoPtrI = RNIL;
signal->setSection(tabInfoPtr, 0);
releaseSections(signal);
}
void
......@@ -4685,8 +4680,32 @@ Dbdict::alterTab_writeTableConf(Signal* signal,
Uint32 coordinatorRef = alterTabPtr.p->m_coordinatorRef;
TableRecordPtr tabPtr;
c_tableRecordPool.getPtr(tabPtr, alterTabPtr.p->m_alterTableId);
// Alter table commit request handled successfully
// Inform Suma so it can send events to any subscribers of the table
AlterTabReq * req = (AlterTabReq*)signal->getDataPtrSend();
if (coordinatorRef == reference())
req->senderRef = alterTabPtr.p->m_senderRef;
else
req->senderRef = 0;
req->senderData = callbackData;
req->tableId = tabPtr.p->tableId;
req->tableVersion = tabPtr.p->tableVersion;
req->gci = tabPtr.p->gciTableCreated;
req->requestType = AlterTabReq::AlterTableCommit;
req->changeMask = alterTabPtr.p->m_changeMask;
SegmentedSectionPtr tabInfoPtr;
getSection(tabInfoPtr, alterTabPtr.p->m_tabInfoPtrI);
signal->setSection(tabInfoPtr, AlterTabReq::DICT_TAB_INFO);
#ifndef DBUG_OFF
ndbout_c("DICT_TAB_INFO in DICT");
SimplePropertiesSectionReader reader(tabInfoPtr, getSectionSegmentPool());
reader.printAll(ndbout);
#endif
EXECUTE_DIRECT(SUMA, GSN_ALTER_TAB_REQ, signal,
AlterTabReq::SignalLength);
releaseSections(signal);
alterTabPtr.p->m_tabInfoPtrI = RNIL;
jamEntry();
AlterTabConf * conf = (AlterTabConf*)signal->getDataPtrSend();
conf->senderRef = reference();
conf->senderData = callbackData;
......@@ -4694,17 +4713,7 @@ Dbdict::alterTab_writeTableConf(Signal* signal,
conf->tableVersion = tabPtr.p->tableVersion;
conf->gci = tabPtr.p->gciTableCreated;
conf->requestType = AlterTabReq::AlterTableCommit;
{
AlterTabConf tmp= *conf;
if (coordinatorRef == reference())
conf->senderRef = alterTabPtr.p->m_senderRef;
else
conf->senderRef = 0;
EXECUTE_DIRECT(SUMA, GSN_ALTER_TAB_CONF, signal,
AlterTabConf::SignalLength);
jamEntry();
*conf= tmp;
}
conf->changeMask = alterTabPtr.p->m_changeMask;
sendSignal(coordinatorRef, GSN_ALTER_TAB_CONF, signal,
AlterTabConf::SignalLength, JBB);
......
......@@ -2559,8 +2559,9 @@ Suma::reportAllSubscribers(Signal *signal,
data->gci = m_last_complete_gci + 1;
data->tableId = subPtr.p->m_tableId;
data->operation = table_event;
data->logType = 0;
data->ndbd_nodeid = refToNode(reference());
data->changeMask = 0;
data->totalLen = 0;
TablePtr tabPtr;
c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
......@@ -3177,7 +3178,9 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
LinearSectionPtr ptr[3];
const Uint32 nptr= reformat(signal, ptr,
f_buffer, sz, b_buffer, b_trigBufferSize);
Uint32 ptrLen= 0;
for(Uint32 i =0; i < nptr; i++)
ptrLen+= ptr[i].sz;
/**
* Signal to subscriber(s)
*/
......@@ -3188,6 +3191,8 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
data->tableId = tabPtr.p->m_tableId;
data->operation = event;
data->logType = 0;
data->changeMask = 0;
data->totalLen = ptrLen;
{
LocalDLList<Subscriber> list(c_subscriberPool,tabPtr.p->c_subscribers);
......@@ -3435,17 +3440,19 @@ Suma::execDROP_TAB_CONF(Signal *signal)
DBUG_VOID_RETURN;
}
static Uint32 b_dti_buf[10000];
void
Suma::execALTER_TAB_CONF(Signal *signal)
Suma::execALTER_TAB_REQ(Signal *signal)
{
jamEntry();
DBUG_ENTER("Suma::execALTER_TAB_CONF");
ndbassert(signal->getNoOfSections() == 0);
AlterTabConf * const conf = (AlterTabConf*)signal->getDataPtr();
Uint32 senderRef= conf->senderRef;
Uint32 tableId= conf->tableId;
DBUG_ENTER("Suma::execALTER_TAB_REQ");
ndbassert(signal->getNoOfSections() == 1);
AlterTabReq * const req = (AlterTabReq*)signal->getDataPtr();
Uint32 senderRef= req->senderRef;
Uint32 tableId= req->tableId;
Uint32 changeMask= req->changeMask;
TablePtr tabPtr;
if (!c_tables.find(tabPtr, tableId) ||
tabPtr.p->m_state == Table::DROPPED ||
......@@ -3465,12 +3472,29 @@ Suma::execALTER_TAB_CONF(Signal *signal)
}
// dict coordinator sends info to API
// Copy DICT_TAB_INFO to local buffer
SegmentedSectionPtr tabInfoPtr;
signal->getSection(tabInfoPtr, AlterTabReq::DICT_TAB_INFO);
#ifndef DBUG_OFF
ndbout_c("DICT_TAB_INFO in SUMA, tabInfoPtr.sz = %d", tabInfoPtr.sz);
SimplePropertiesSectionReader reader(tabInfoPtr, getSectionSegmentPool());
reader.printAll(ndbout);
#endif
copy(b_dti_buf, tabInfoPtr);
LinearSectionPtr ptr[3];
ptr[0].p = b_dti_buf;
ptr[0].sz = tabInfoPtr.sz;
releaseSections(signal);
SubTableData * data = (SubTableData*)signal->getDataPtrSend();
data->gci = m_last_complete_gci+1;
data->tableId = tableId;
data->operation = NdbDictionary::Event::_TE_ALTER;
data->req_nodeid = refToNode(senderRef);
data->logType = 0;
data->changeMask = changeMask;
data->totalLen = tabInfoPtr.sz;
{
LocalDLList<Subscriber> subbs(c_subscriberPool,tabPtr.p->c_subscribers);
SubscriberPtr subbPtr;
......@@ -3490,8 +3514,9 @@ Suma::execALTER_TAB_CONF(Signal *signal)
}
data->senderData= subbPtr.p->m_senderData;
sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
SubTableData::SignalLength, JBB);
Callback c = { 0, 0 };
sendFragmentedSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
SubTableData::SignalLength, JBB, ptr, 1, c);
DBUG_PRINT("info",("sent to subscriber %d", subbPtr.i));
}
}
......@@ -4668,7 +4693,9 @@ Suma::resend_bucket(Signal* signal, Uint32 buck, Uint32 min_gci,
const Uint32 nptr= reformat(signal, ptr,
src, sz_1,
src + sz_1, sz - 2 - sz_1);
Uint32 ptrLen= 0;
for(Uint32 i =0; i < nptr; i++)
ptrLen+= ptr[i].sz;
/**
* Signal to subscriber(s)
*/
......@@ -4680,6 +4707,8 @@ Suma::resend_bucket(Signal* signal, Uint32 buck, Uint32 min_gci,
data->tableId = tabPtr.p->m_tableId;
data->operation = event;
data->logType = 0;
data->changeMask = 0;
data->totalLen = ptrLen;
{
LocalDLList<Subscriber> list(c_subscriberPool,tabPtr.p->c_subscribers);
......
......@@ -70,7 +70,7 @@ public:
void execGET_TABLEID_REF(Signal* signal);
void execDROP_TAB_CONF(Signal* signal);
void execALTER_TAB_CONF(Signal* signal);
void execALTER_TAB_REQ(Signal* signal);
void execCREATE_TAB_CONF(Signal* signal);
/**
* Scan interface
......
......@@ -76,7 +76,7 @@ Suma::Suma(const Configuration & conf) :
* Dict interface
*/
addRecSignal(GSN_DROP_TAB_CONF, &Suma::execDROP_TAB_CONF);
addRecSignal(GSN_ALTER_TAB_CONF, &Suma::execALTER_TAB_CONF);
addRecSignal(GSN_ALTER_TAB_REQ, &Suma::execALTER_TAB_REQ);
addRecSignal(GSN_CREATE_TAB_CONF, &Suma::execCREATE_TAB_CONF);
#if 0
......
......@@ -827,6 +827,12 @@ NdbDictionary::Event::setTable(const Table& table)
m_impl.setTable(table);
}
const NdbDictionary::Table *
NdbDictionary::Event::getTable() const
{
return m_impl.getTable();
}
void
NdbDictionary::Event::setTable(const char * table)
{
......
......@@ -1069,7 +1069,6 @@ void NdbEventImpl::init()
{
m_eventId= RNIL;
m_eventKey= RNIL;
m_tableId= RNIL;
mi_type= 0;
m_dur= NdbDictionary::Event::ED_UNDEFINED;
m_mergeEvents = false;
......@@ -1081,6 +1080,8 @@ NdbEventImpl::~NdbEventImpl()
{
for (unsigned i = 0; i < m_columns.size(); i++)
delete m_columns[i];
if (m_tableImpl)
delete m_tableImpl;
}
void NdbEventImpl::setName(const char * name)
......@@ -1096,10 +1097,29 @@ const char *NdbEventImpl::getName() const
void
NdbEventImpl::setTable(const NdbDictionary::Table& table)
{
m_tableImpl= &NdbTableImpl::getImpl(table);
setTable(&NdbTableImpl::getImpl(table));
m_tableName.assign(m_tableImpl->getName());
}
void
NdbEventImpl::setTable(NdbTableImpl *tableImpl)
{
DBUG_ASSERT(tableImpl->m_status != NdbDictionary::Object::Invalid);
if (!m_tableImpl)
m_tableImpl = new NdbTableImpl();
// Copy table, since event might be accessed from different threads
m_tableImpl->assign(*tableImpl);
}
const NdbDictionary::Table *
NdbEventImpl::getTable() const
{
if (m_tableImpl)
return m_tableImpl->m_facade;
else
return NULL;
}
void
NdbEventImpl::setTable(const char * table)
{
......@@ -1248,6 +1268,21 @@ NdbDictionaryImpl::fetchGlobalTableImpl(const BaseString& internalTableName)
return info;
}
void
NdbDictionaryImpl::putTable(NdbTableImpl *impl)
{
m_globalHash->lock();
m_globalHash->put(impl->m_internalName.c_str(), impl);
m_globalHash->unlock();
Ndb_local_table_info *info=
Ndb_local_table_info::create(impl, m_local_table_data_size);
m_localHash.put(impl->m_internalName.c_str(), info);
m_ndb.theFirstTupleId[impl->getTableId()] = ~0;
m_ndb.theLastTupleId[impl->getTableId()] = ~0;
}
#if 0
bool
NdbDictionaryImpl::setTransporter(class TransporterFacade * tf)
......@@ -3075,13 +3110,11 @@ NdbDictionaryImpl::createEvent(NdbEventImpl & evnt)
evnt.getTableName()));
DBUG_RETURN(-1);
}
evnt.setTable(tab);
}
DBUG_PRINT("info",("Table: id: %d version: %d", tab->m_id, tab->m_version));
evnt.m_tableId = tab->m_id;
evnt.m_tableVersion = tab->m_version;
evnt.m_tableImpl = tab;
NdbTableImpl &table = *evnt.m_tableImpl;
int attributeList_sz = evnt.m_attrIds.size();
......@@ -3103,7 +3136,7 @@ NdbDictionaryImpl::createEvent(NdbEventImpl & evnt)
attributeList_sz = evnt.m_columns.size();
DBUG_PRINT("info",("Event on tableId=%d, tableVersion=%d, event name %s, no of columns %d",
evnt.m_tableId, evnt.m_tableVersion,
table.m_id, table.m_version,
evnt.m_name.c_str(),
evnt.m_columns.size()));
......@@ -3211,11 +3244,12 @@ NdbDictInterface::createEvent(class Ndb & ndb,
req->setRequestType(CreateEvntReq::RT_USER_GET);
} else {
DBUG_PRINT("info",("tableId: %u tableVersion: %u",
evnt.m_tableId, evnt.m_tableVersion));
evnt.m_tableImpl->m_id,
evnt.m_tableImpl->m_version));
// creating event in Dictionary
req->setRequestType(CreateEvntReq::RT_USER_CREATE);
req->setTableId(evnt.m_tableId);
req->setTableVersion(evnt.m_tableVersion);
req->setTableId(evnt.m_tableImpl->m_id);
req->setTableVersion(evnt.m_tableImpl->m_version);
req->setAttrListBitmask(evnt.m_attrListBitmask);
req->setEventType(evnt.mi_type);
req->clearFlags();
......@@ -3266,14 +3300,12 @@ NdbDictInterface::createEvent(class Ndb & ndb,
// NdbEventImpl *evntImpl = (NdbEventImpl *)evntConf->getUserData();
if (getFlag) {
evnt.m_tableId = evntConf->getTableId();
evnt.m_tableVersion = evntConf->getTableVersion();
evnt.m_attrListBitmask = evntConf->getAttrListBitmask();
evnt.mi_type = evntConf->getEventType();
evnt.setTable(dataPtr);
} else {
if (evnt.m_tableId != evntConf->getTableId() ||
evnt.m_tableVersion != evntConf->getTableVersion() ||
if (evnt.m_tableImpl->m_id != evntConf->getTableId() ||
evnt.m_tableImpl->m_version != evntConf->getTableVersion() ||
//evnt.m_attrListBitmask != evntConf->getAttrListBitmask() ||
evnt.mi_type != evntConf->getEventType()) {
ndbout_c("ERROR*************");
......@@ -3384,9 +3416,6 @@ NdbDictionaryImpl::getEvent(const char * eventName)
// We only have the table name with internal name
DBUG_PRINT("info",("table %s", ev->getTableName()));
Ndb_local_table_info *info;
int retry= 0;
while (1)
{
info= get_local_table_info(ev->getTableName(), true);
if (info == 0)
{
......@@ -3394,40 +3423,32 @@ NdbDictionaryImpl::getEvent(const char * eventName)
delete ev;
DBUG_RETURN(NULL);
}
if (ev->m_tableId == info->m_table_impl->m_id &&
ev->m_tableVersion == info->m_table_impl->m_version)
break;
DBUG_PRINT("error",("%s: retry=%d: "
"table version mismatch, event: [%u,%u] table: [%u,%u]",
ev->getTableName(), retry,
ev->m_tableId, ev->m_tableVersion,
info->m_table_impl->m_id, info->m_table_impl->m_version));
if (retry)
if (info->m_table_impl->m_status == NdbDictionary::Object::Invalid)
{
m_error.code= 241;
removeCachedObject(*info->m_table_impl);
info= get_local_table_info(ev->getTableName(), true);
if (info == 0)
{
DBUG_PRINT("error",("unable to find table %s", ev->getTableName()));
delete ev;
DBUG_RETURN(NULL);
}
invalidateObject(*info->m_table_impl);
retry++;
}
ev->m_tableImpl= info->m_table_impl;
ev->setTable(info->m_table_impl);
ev->setTable(m_ndb.externalizeTableName(ev->getTableName()));
// get the columns from the attrListBitmask
NdbTableImpl &table = *ev->m_tableImpl;
AttributeMask & mask = ev->m_attrListBitmask;
unsigned attributeList_sz = mask.count();
DBUG_PRINT("info",("Table: id: %d version: %d", table.m_id, table.m_version));
DBUG_PRINT("info",("Table: id: %d version: %d",
table.m_id, table.m_version));
#ifndef DBUG_OFF
char buf[128] = {0};
mask.getText(buf);
DBUG_PRINT("info",("attributeList_sz= %d, mask= %s", attributeList_sz, buf));
DBUG_PRINT("info",("attributeList_sz= %d, mask= %s",
attributeList_sz, buf));
#endif
......
......@@ -261,6 +261,12 @@ public:
};
class NdbEventImpl : public NdbDictionary::Event, public NdbDictObjectImpl {
friend class NdbDictInterface;
friend class NdbDictionaryImpl;
friend class NdbEventOperation;
friend class NdbEventOperationImpl;
friend class NdbEventBuffer;
friend class EventBufData_hash;
public:
NdbEventImpl();
NdbEventImpl(NdbDictionary::Event &);
......@@ -270,6 +276,7 @@ public:
void setName(const char * name);
const char * getName() const;
void setTable(const NdbDictionary::Table& table);
const NdbDictionary::Table * getTable() const;
void setTable(const char * table);
const char * getTableName() const;
void addTableEvent(const NdbDictionary::Event::TableEvent t);
......@@ -287,8 +294,6 @@ public:
Uint32 m_eventId;
Uint32 m_eventKey;
Uint32 m_tableId;
Uint32 m_tableVersion;
AttributeMask m_attrListBitmask;
BaseString m_name;
Uint32 mi_type;
......@@ -296,7 +301,6 @@ public:
NdbDictionary::Event::EventReport m_rep;
bool m_mergeEvents;
NdbTableImpl *m_tableImpl;
BaseString m_tableName;
Vector<NdbColumnImpl *> m_columns;
Vector<unsigned> m_attrIds;
......@@ -304,6 +308,9 @@ public:
static NdbEventImpl & getImpl(NdbDictionary::Event & t);
static NdbEventImpl & getImpl(const NdbDictionary::Event & t);
NdbDictionary::Event * m_facade;
private:
NdbTableImpl *m_tableImpl;
void setTable(NdbTableImpl *tableImpl);
};
struct NdbFilegroupImpl : public NdbDictObjectImpl {
......@@ -561,6 +568,7 @@ public:
int listIndexes(List& list, Uint32 indexId);
NdbTableImpl * getTable(const char * tableName, void **data= 0);
void putTable(NdbTableImpl *impl);
Ndb_local_table_info* get_local_table_info(
const BaseString& internalTableName, bool do_add_blob_tables);
NdbIndexImpl * getIndex(const char * indexName,
......
......@@ -97,6 +97,26 @@ NdbEventOperation::hasError() const
return m_impl.m_has_error;
}
const bool NdbEventOperation::tableNameChanged() const
{
return m_impl.tableNameChanged();
}
const bool NdbEventOperation::tableFrmChanged() const
{
return m_impl.tableFrmChanged();
}
const bool NdbEventOperation::tableFragmentationChanged() const
{
return m_impl.tableFragmentationChanged();
}
const bool NdbEventOperation::tableRangeListChanged() const
{
return m_impl.tableRangeListChanged();
}
Uint64
NdbEventOperation::getGCI() const
{
......@@ -124,10 +144,6 @@ NdbEventOperation::print()
/*
* Internal for the mysql server
*/
const NdbDictionary::Table *NdbEventOperation::getTable() const
{
return m_impl.m_eventImpl->m_tableImpl->m_facade;
}
const NdbDictionary::Event *NdbEventOperation::getEvent() const
{
return m_impl.m_eventImpl->m_facade;
......@@ -148,6 +164,7 @@ const NdbRecAttr* NdbEventOperation::getFirstDataPreAttr() const
{
return m_impl.theFirstDataAttrs[1];
}
/*
bool NdbEventOperation::validateTable(NdbDictionary::Table &table) const
{
DBUG_ENTER("NdbEventOperation::validateTable");
......@@ -159,7 +176,7 @@ bool NdbEventOperation::validateTable(NdbDictionary::Table &table) const
}
DBUG_RETURN(res);
}
*/
void NdbEventOperation::setCustomData(void * data)
{
m_impl.m_custom_data= data;
......
......@@ -41,6 +41,7 @@
#include <NdbBlob.hpp>
#include <NdbEventOperation.hpp>
#include "NdbEventOperationImpl.hpp"
#include <signaldata/AlterTable.hpp>
#include <EventLogger.hpp>
extern EventLogger g_eventLogger;
......@@ -92,6 +93,7 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
const char* eventName)
: NdbEventOperation(*this), m_facade(&N), m_magic_number(0),
m_ndb(theNdb), m_state(EO_ERROR), mi_type(0), m_oid(~(Uint32)0),
m_change_mask(0),
#ifdef VM_TRACE
m_data_done_count(0), m_data_count(0),
#endif
......@@ -553,6 +555,26 @@ NdbEventOperationImpl::stop()
DBUG_RETURN(r);
}
const bool NdbEventOperationImpl::tableNameChanged() const
{
return (bool)AlterTableReq::getNameFlag(m_change_mask);
}
const bool NdbEventOperationImpl::tableFrmChanged() const
{
return (bool)AlterTableReq::getFrmFlag(m_change_mask);
}
const bool NdbEventOperationImpl::tableFragmentationChanged() const
{
return (bool)AlterTableReq::getFragDataFlag(m_change_mask);
}
const bool NdbEventOperationImpl::tableRangeListChanged() const
{
return (bool)AlterTableReq::getRangeListFlag(m_change_mask);
}
Uint64
NdbEventOperationImpl::getGCI()
{
......@@ -565,6 +587,35 @@ NdbEventOperationImpl::getLatestGCI()
return m_ndb->theEventBuffer->getLatestGCI();
}
bool
NdbEventOperationImpl::execSUB_TABLE_DATA(NdbApiSignal * signal,
LinearSectionPtr ptr[3])
{
DBUG_ENTER("NdbEventOperationImpl::execSUB_TABLE_DATA");
const SubTableData * const sdata=
CAST_CONSTPTR(SubTableData, signal->getDataPtr());
if(signal->isFirstFragment()){
m_fragmentId = signal->getFragmentId();
m_buffer.grow(4 * sdata->totalLen);
} else {
if(m_fragmentId != signal->getFragmentId()){
abort();
}
}
const Uint32 i = SubTableData::DICT_TAB_INFO;
DBUG_PRINT("info", ("Accumulated %u bytes for fragment %u",
4 * ptr[i].sz, m_fragmentId));
m_buffer.append(ptr[i].p, 4 * ptr[i].sz);
if(!signal->isLastFragment()){
DBUG_RETURN(FALSE);
}
DBUG_RETURN(TRUE);
}
int
NdbEventOperationImpl::receive_event()
{
......@@ -573,6 +624,26 @@ NdbEventOperationImpl::receive_event()
Uint32 operation= (Uint32)m_data_item->sdata->operation;
DBUG_PRINT_EVENT("info",("sdata->operation %u",operation));
if (operation == NdbDictionary::Event::_TE_ALTER)
{
// Parse the new table definition and
// create a table object
NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
NdbDictionaryImpl *dict = & NdbDictionaryImpl::getImpl(*myDict);
NdbError error;
NdbDictInterface dif(error);
NdbTableImpl *at;
m_change_mask = m_data_item->sdata->changeMask;
error.code = dif.parseTableInfo(&at,
(Uint32*)m_buffer.get_data(),
m_buffer.length() / 4,
true);
m_buffer.clear();
if ( m_eventImpl->m_tableImpl)
delete m_eventImpl->m_tableImpl;
m_eventImpl->m_tableImpl = at;
}
if (unlikely(operation >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT))
{
DBUG_RETURN_EVENT(1);
......
......@@ -22,6 +22,7 @@
#include <transporter/TransporterDefinitions.hpp>
#include <NdbRecAttr.hpp>
#include <AttributeHeader.hpp>
#include <UtilBuffer.hpp>
#define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4
......@@ -200,8 +201,14 @@ public:
NdbBlob *getBlobHandle(const NdbColumnImpl *, int n);
int readBlobParts(char* buf, NdbBlob* blob, Uint32 part, Uint32 count);
int receive_event();
const bool tableNameChanged() const;
const bool tableFrmChanged() const;
const bool tableFragmentationChanged() const;
const bool tableRangeListChanged() const;
Uint64 getGCI();
Uint64 getLatestGCI();
bool execSUB_TABLE_DATA(NdbApiSignal * signal,
LinearSectionPtr ptr[3]);
NdbDictionary::Event::TableEvent getEventType();
......@@ -240,6 +247,12 @@ public:
void *m_custom_data;
int m_has_error;
Uint32 m_fragmentId;
UtilBuffer m_buffer;
// Bit mask for what has changed in a table (for TE_ALTER event)
Uint32 m_change_mask;
#ifdef VM_TRACE
Uint32 m_data_done_count;
Uint32 m_data_count;
......
......@@ -703,7 +703,6 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
aSignal, ptr);
return;
case GSN_SUB_META_DATA:
case GSN_SUB_REMOVE_CONF:
case GSN_SUB_REMOVE_REF:
return; // ignore these signals
......@@ -726,6 +725,16 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
const SubTableData * const sdata=
CAST_CONSTPTR(SubTableData, aSignal->getDataPtr());
const Uint32 oid = sdata->senderData;
NdbEventOperationImpl *op= (NdbEventOperationImpl*)int2void(oid);
if (op->m_magic_number != NDB_EVENT_OP_MAGIC_NUMBER)
g_eventLogger.error("dropped GSN_SUB_TABLE_DATA due to wrong magic "
"number");
// Accumulate DIC_TAB_INFO for TE_ALTER events
if (sdata->operation == NdbDictionary::Event::_TE_ALTER &&
!op->execSUB_TABLE_DATA(aSignal, ptr))
return;
for (int i= aSignal->m_noOfSections;i < 3; i++) {
ptr[i].p = NULL;
......@@ -736,12 +745,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
sdata->senderData, sdata->gci, sdata->operation,
sdata->tableId));
NdbEventOperationImpl *op= (NdbEventOperationImpl*)int2void(oid);
if (op->m_magic_number == NDB_EVENT_OP_MAGIC_NUMBER)
theEventBuffer->insertDataL(op,sdata, ptr);
else
g_eventLogger.error("dropped GSN_SUB_TABLE_DATA due to wrong magic "
"number");
return;
}
case GSN_DIHNDBTAMPER:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment