Commit 32298c75 authored by pekka@mysql.com's avatar pekka@mysql.com

ndb - rbr blobs etc: set db/schema in injector_ndb before calling NDB

parent 5756c30c
......@@ -2065,7 +2065,19 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
DBUG_RETURN(-1);
}
NdbEventOperation *op= ndb->createEventOperation(event_name);
NdbEventOperation* op;
if (do_schema_share)
op= ndb->createEventOperation(event_name);
else
{
// set injector_ndb database/schema from table internal name
int ret= ndb->setDatabaseAndSchemaName(ndbtab);
assert(ret == 0);
op= ndb->createEventOperation(event_name);
// reset to catch errors
ndb->setDatabaseName("");
ndb->setDatabaseSchemaName("");
}
if (!op)
{
pthread_mutex_unlock(&injector_mutex);
......@@ -2632,7 +2644,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
goto err;
}
if (!(ndb= new Ndb(g_ndb_cluster_connection, "")) ||
// empty database and schema
if (!(ndb= new Ndb(g_ndb_cluster_connection, "", "")) ||
ndb->init())
{
sql_print_error("NDB Binlog: Getting Ndb object failed");
......@@ -2885,11 +2898,17 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
DBUG_ASSERT(share != 0);
}
#endif
// set injector_ndb database/schema from table internal name
int ret= ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable());
assert(ret == 0);
if ((unsigned) pOp->getEventType() <
(unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
ndb_binlog_thread_handle_data_event(ndb, pOp, row, trans);
else
ndb_binlog_thread_handle_non_data_event(ndb, pOp, row);
// reset to catch errors
ndb->setDatabaseName("");
ndb->setDatabaseSchemaName("");
pOp= ndb->nextEvent();
} while (pOp && pOp->getGCI() == gci);
......
......@@ -1147,6 +1147,15 @@ public:
*/
void setDatabaseSchemaName(const char * aDatabaseSchemaName);
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
/** Set database and schema name to match previously retrieved table
*
* Returns non-zero if table internal name does not contain
* non-empty database and schema names
*/
int setDatabaseAndSchemaName(const NdbDictionary::Table* t);
#endif
/**
* Initializes the Ndb object
*
......
......@@ -884,6 +884,7 @@ public:
private:
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
friend class Ndb;
friend class NdbDictionaryImpl;
friend class NdbTableImpl;
friend class NdbEventOperationImpl;
......
......@@ -1035,39 +1035,37 @@ convertEndian(Uint32 Data)
return Data;
#endif
}
// <internal>
const char * Ndb::getCatalogName() const
{
return theImpl->m_dbname.c_str();
}
void Ndb::setCatalogName(const char * a_catalog_name)
{
if (a_catalog_name)
{
// TODO can table_name_separator be escaped?
if (a_catalog_name && ! strchr(a_catalog_name, table_name_separator)) {
theImpl->m_dbname.assign(a_catalog_name);
theImpl->update_prefix();
}
}
const char * Ndb::getSchemaName() const
{
return theImpl->m_schemaname.c_str();
}
void Ndb::setSchemaName(const char * a_schema_name)
{
if (a_schema_name) {
// TODO can table_name_separator be escaped?
if (a_schema_name && ! strchr(a_schema_name, table_name_separator)) {
theImpl->m_schemaname.assign(a_schema_name);
theImpl->update_prefix();
}
}
// </internal>
/*
Deprecated functions
*/
const char * Ndb::getDatabaseName() const
{
return getCatalogName();
......@@ -1088,6 +1086,24 @@ void Ndb::setDatabaseSchemaName(const char * a_schema_name)
setSchemaName(a_schema_name);
}
int Ndb::setDatabaseAndSchemaName(const NdbDictionary::Table* t)
{
const char* s0 = t->m_impl.m_internalName.c_str();
const char* s1 = strchr(s0, table_name_separator);
if (s1 && s1 != s0) {
const char* s2 = strchr(s1 + 1, table_name_separator);
if (s2 && s2 != s1 + 1) {
char buf[200];
sprintf(buf, "%.*s", s1 - s0, s0);
setDatabaseName(buf);
sprintf(buf, "%.*s", s2 - (s1 + 1), s1 + 1);
setDatabaseSchemaName(buf);
return 0;
}
}
return -1;
}
bool Ndb::usingFullyQualifiedNames()
{
return fullyQualifiedNames;
......@@ -1149,9 +1165,16 @@ Ndb::internalize_table_name(const char *external_name) const
if (fullyQualifiedNames)
{
/* Internal table name format <db>/<schema>/<table>
<db>/<schema> is already available in m_prefix
<db>/<schema>/ is already available in m_prefix
so just concat the two strings
*/
#ifdef VM_TRACE
// verify that m_prefix looks like abc/def/
const char* s0 = theImpl->m_prefix.c_str();
const char* s1 = s0 ? strchr(s0, table_name_separator) : 0;
const char* s2 = s1 ? strchr(s1 + 1, table_name_separator) : 0;
assert(s1 && s1 != s0 && s2 && s2 != s1 + 1 && *(s2 + 1) == 0);
#endif
ret.assfmt("%s%s",
theImpl->m_prefix.c_str(),
external_name);
......
......@@ -61,22 +61,26 @@ NdbBlob::setState(State newState)
int
NdbBlob::getBlobTableName(char* btname, Ndb* anNdb, const char* tableName, const char* columnName)
{
DBUG_ENTER("NdbBlob::getBlobTableName");
NdbTableImpl* t = anNdb->theDictionary->m_impl.getTable(tableName);
if (t == NULL)
return -1;
DBUG_RETURN(-1);
NdbColumnImpl* c = t->getColumn(columnName);
if (c == NULL)
return -1;
DBUG_RETURN(-1);
getBlobTableName(btname, t, c);
return 0;
DBUG_RETURN(0);
}
void
NdbBlob::getBlobTableName(char* btname, const NdbTableImpl* t, const NdbColumnImpl* c)
{
assert(t != 0 && c != 0 && c->getBlobType());
DBUG_ENTER("NdbBlob::getBlobTableName");
assert(t != 0 && c != 0 && c->getBlobType() && c->getPartSize() != 0);
memset(btname, 0, NdbBlobImpl::BlobTableNameSize);
sprintf(btname, "NDB$BLOB_%d_%d", (int)t->m_id, (int)c->m_column_no);
DBUG_PRINT("info", ("blob table name: %s", btname));
DBUG_VOID_RETURN;
}
void
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment