Commit 9df57eba authored by Olivier Bertrand's avatar Olivier Bertrand

- Adding parallelism to the TBL table type

modified:
  storage/connect/tabcol.h
  storage/connect/tabtbl.cpp
  storage/connect/tabtbl.h
  storage/connect/value.cpp
parent 0a01953c
......@@ -17,6 +17,7 @@
/***********************************************************************/
class DllExport XTAB: public BLOCK { // Table Name-Owner-Srcdef block.
friend class TDBPRX;
friend class TDBTBM;
public:
// Constructors
XTAB(LPCSTR name, LPCSTR srcdef = NULL);
......
/************* TabTbl C++ Program Source Code File (.CPP) **************/
/* PROGRAM NAME: TABTBL */
/* ------------- */
/* Version 1.5 */
/* Version 1.6 */
/* */
/* COPYRIGHT: */
/* ---------- */
......@@ -77,6 +77,16 @@
#include "ha_connect.h"
#include "mycat.h" // For GetHandler
#if defined(WIN32)
#if defined(__BORLANDC__)
#define SYSEXIT void _USERENTRY
#else
#define SYSEXIT void
#endif
#else // !WIN32
#define SYSEXIT void *
#endif // !WIN32
extern "C" int trace;
/* ---------------------------- Class TBLDEF ---------------------------- */
......@@ -87,6 +97,9 @@ extern "C" int trace;
TBLDEF::TBLDEF(void)
{
//To_Tables = NULL;
Accept = false;
Thread = false;
Maxerr = 0;
Ntables = 0;
Pseudo = 3;
} // end of TBLDEF constructor
......@@ -143,8 +156,9 @@ bool TBLDEF::DefineAM(PGLOBAL g, LPCSTR am, int poff)
} // endfor pdb
Maxerr = Cat->GetIntCatInfo("Maxerr", 0);
Accept = (Cat->GetBoolCatInfo("Accept", 0) != 0);
} // endif fsec || tablist
Accept = Cat->GetBoolCatInfo("Accept", false);
Thread = Cat->GetBoolCatInfo("Thread", false);
} // endif tablist
return FALSE;
} // end of DefineAM
......@@ -156,6 +170,8 @@ PTDB TBLDEF::GetTable(PGLOBAL g, MODE m)
{
if (Catfunc == FNC_COL)
return new(g) TDBTBC(this);
else if (Thread)
return new(g) TDBTBM(this);
else
return new(g) TDBTBL(this);
......@@ -173,7 +189,7 @@ TDBTBL::TDBTBL(PTBLDEF tdp) : TDBPRX(tdp)
//Tdbp = NULL;
Accept = tdp->Accept;
Maxerr = tdp->Maxerr;
Nbf = 0;
Nbc = 0;
Rows = 0;
Crp = 0;
// NTables = 0;
......@@ -227,7 +243,7 @@ bool TDBTBL::InitTableList(PGLOBAL g)
// Get the table description block of this table
if (!(Tdbp = GetSubTable(g, tabp))) {
if (++Nbf > Maxerr)
if (++Nbc > Maxerr)
return TRUE; // Error return
else
continue; // Skip this table
......@@ -389,7 +405,7 @@ bool TDBTBL::OpenDB(PGLOBAL g)
/*********************************************************************/
if (To_Filter && Tablist) {
Tablist = NULL;
Nbf = 0;
Nbc = 0;
} // endif To_Filter
/*********************************************************************/
......@@ -497,4 +513,262 @@ void TBTBLK::ReadColumn(PGLOBAL g)
} // end of ReadColumn
/* ------------------------- Class TDBTBM ---------------------------- */
/***********************************************************************/
/* Thread routine that check and open one remote connection. */
/***********************************************************************/
pthread_handler_t ThreadOpen(void *p)
{
PTBMT cmp = (PTBMT)p;
if (!my_thread_init()) {
set_current_thd(cmp->Thd);
// Try to open the connection
if (!cmp->Tap->GetTo_Tdb()->OpenDB(cmp->G)) {
cmp->Ready = true;
} else
cmp->Rc = RC_FX;
my_thread_end();
} else
cmp->Rc = RC_FX;
return NULL;
} // end of ThreadOpen
/***********************************************************************/
/* TDBTBM constructors. */
/***********************************************************************/
TDBTBM::TDBTBM(PTBLDEF tdp) : TDBTBL(tdp)
{
Tmp = NULL; // To data table TBMT structures
Cmp = NULL; // Current data table TBMT
Bmp = NULL; // To bad (unconnected) TBMT structures
Done = false; // TRUE after first GetAllResults
Nrc = 0; // Number of remote connections
Nlc = 0; // Number of local connections
} // end of TDBTBL standard constructor
/***********************************************************************/
/* Reset read/write position values. */
/***********************************************************************/
void TDBTBM::ResetDB(void)
{
for (PCOL colp = Columns; colp; colp = colp->GetNext())
if (colp->GetAmType() == TYPE_AM_TABID)
colp->COLBLK::Reset();
for (PTABLE tabp = Tablist; tabp; tabp = tabp->GetNext())
((PTDBASE)tabp->GetTo_Tdb())->ResetDB();
Tdbp = (PTDBASE)Tablist->GetTo_Tdb();
Crp = 0;
} // end of ResetDB
/***********************************************************************/
/* Returns RowId if b is false or Rownum if b is true. */
/***********************************************************************/
int TDBTBM::RowNumber(PGLOBAL g, bool b)
{
return Tdbp->RowNumber(g) + ((b) ? 0 : Rows);
} // end of RowNumber
/***********************************************************************/
/* Initialyze table parallel processing. */
/***********************************************************************/
bool TDBTBM::OpenTables(PGLOBAL g)
{
int k;
THD *thd = current_thd;
PTABLE tabp, *ptabp = &Tablist;
PTBMT tp, *ptp = &Tmp;
// Allocates the TBMT blocks for the tables
for (tabp = Tablist; tabp; tabp = tabp->Next)
if (tabp->GetTo_Tdb()->GetAmType() == TYPE_AM_MYSQL) {
// Remove remote table from the local list
*ptabp = tabp->Next;
// Make the remote table block
tp = (PTBMT)PlugSubAlloc(g, NULL, sizeof(TBMT));
memset(tp, 0, sizeof(TBMT));
tp->G = g;
tp->Tap = tabp;
tp->Thd = thd;
// Create the thread that will do the table opening.
pthread_attr_init(&tp->attr);
// pthread_attr_setdetachstate(&tp->attr, PTHREAD_CREATE_JOINABLE);
if ((k = pthread_create(&tp->Tid, &tp->attr, ThreadOpen, tp))) {
sprintf(g->Message, "pthread_create error %d", k);
Nbc++;
continue;
} // endif k
// Add it to the remote list
*ptp = tp;
ptp = &tp->Next;
Nrc++; // Number of remote connections
} else {
ptabp = &tabp->Next;
Nlc++; // Number of local connections
} // endif Type
return false;
} // end of OpenTables
/***********************************************************************/
/* TBL Access Method opening routine. */
/* Open first file, other will be opened sequencially when reading. */
/***********************************************************************/
bool TDBTBM::OpenDB(PGLOBAL g)
{
if (trace)
htrc("TBM OpenDB: tdbp=%p tdb=R%d use=%d key=%p mode=%d\n",
this, Tdb_No, Use, To_Key_Col, Mode);
if (Use == USE_OPEN) {
/*******************************************************************/
/* Table already open, replace it at its beginning. */
/*******************************************************************/
ResetDB();
return Tdbp->OpenDB(g); // Re-open fist table
} // endif use
#if 0
/*********************************************************************/
/* When GetMaxsize was called, To_Filter was not set yet. */
/*********************************************************************/
if (To_Filter && Tablist) {
Tablist = NULL;
Nbc = 0;
} // endif To_Filter
#endif // 0
/*********************************************************************/
/* Make the table list. */
/*********************************************************************/
if (/*!Tablist &&*/ InitTableList(g))
return TRUE;
/*********************************************************************/
/* Open all remote tables of the list. */
/*********************************************************************/
if (OpenTables(g))
return TRUE;
/*********************************************************************/
/* Proceed with local tables. */
/*********************************************************************/
if ((CurTable = Tablist)) {
Tdbp = (PTDBASE)CurTable->GetTo_Tdb();
Tdbp->SetMode(Mode);
// Check and initialize the subtable columns
for (PCOL cp = Columns; cp; cp = cp->GetNext())
if (cp->GetAmType() == TYPE_AM_TABID)
cp->COLBLK::Reset();
else if (((PPRXCOL)cp)->Init(g) && !Accept)
return TRUE;
if (trace)
htrc("Opening subtable %s\n", Tdbp->GetName());
// Now we can safely open the table
if (Tdbp->OpenDB(g))
return TRUE;
} // endif *Tablist
Use = USE_OPEN;
return FALSE;
} // end of OpenDB
/***********************************************************************/
/* ReadDB: Data Base read routine for MUL access method. */
/***********************************************************************/
int TDBTBM::ReadDB(PGLOBAL g)
{
int rc;
if (!Done) {
// Get result from local tables
if ((rc = TDBTBL::ReadDB(g)) != RC_EF)
return rc;
else if ((rc = ReadNextRemote(g)) != RC_OK)
return rc;
Done = true;
} // endif Done
/*********************************************************************/
/* Now start the reading process of remote tables. */
/*********************************************************************/
retry:
rc = Tdbp->ReadDB(g);
if (rc == RC_EF) {
// Total number of rows met so far
Rows += Tdbp->RowNumber(g) - 1;
Crp += Tdbp->GetProgMax(g);
Cmp->Complete = true;
if ((rc = ReadNextRemote(g)) == RC_OK)
goto retry;
} else if (rc == RC_FX)
strcat(strcat(strcat(g->Message, " ("), Tdbp->GetName()), ")");
return rc;
} // end of ReadDB
/***********************************************************************/
/* ReadNext: Continue reading from next table. */
/***********************************************************************/
int TDBTBM::ReadNextRemote(PGLOBAL g)
{
bool b = false;
if (Tdbp)
Tdbp->CloseDB(g);
Cmp = NULL;
retry:
// Search for a remote table having its result set
for (PTBMT tp = Tmp; tp; tp = tp->Next)
if (tp->Ready) {
if (!tp->Complete)
Cmp = tp;
} else
b = true;
if (!Cmp) {
if (b) { // more result to come
// sleep(20);
goto retry;
} else
return RC_EF;
} // endif Curtable
Tdbp = (PTDBASE)Cmp->Tap->GetTo_Tdb();
// Check and initialize the subtable columns
for (PCOL cp = Columns; cp; cp = cp->GetNext())
if (cp->GetAmType() == TYPE_AM_TABID)
cp->COLBLK::Reset();
else if (((PPRXCOL)cp)->Init(g) && !Accept)
return RC_FX;
if (trace)
htrc("Reading subtable %s\n", Tdbp->GetName());
return RC_OK;
} // end of ReadNextRemote
/* ------------------------------------------------------------------- */
/*************** TabTbl H Declares Source Code File (.H) ***************/
/* Name: TABTBL.H Version 1.2 */
/* Name: TABTBL.H Version 1.3 */
/* */
/* (C) Copyright to the author Olivier BERTRAND 2008-2013 */
/* */
......@@ -11,7 +11,29 @@
typedef class TBLDEF *PTBLDEF;
typedef class TDBTBL *PTDBTBL;
typedef class TDBTBM *PTDBTBM;
typedef class MYSQLC *PMYC;
/***********************************************************************/
/* Defines the structures used for distributed TBM tables. */
/***********************************************************************/
typedef struct _TBMtable *PTBMT;
typedef struct _TBMtable {
PTBMT Next; // Points to next data table struct
PTABLE Tap; // Points to the sub table
PGLOBAL G; // Needed in thread routine
bool Complete; // TRUE when all results are read
bool Ready; // TRUE when results are there
int Rows; // Total number of rows read so far
int ProgCur; // Current pos
int ProgMax; // Max pos
int Rc; // Return code
THD *Thd;
pthread_attr_t attr; // ???
pthread_t Tid; // CheckOpen thread ID
} TBMT;
/***********************************************************************/
/* TBL table. */
/***********************************************************************/
......@@ -32,6 +54,7 @@ class DllExport TBLDEF : public PRXDEF { /* Logical table description */
protected:
// Members
bool Accept; /* TRUE if bad tables are accepted */
bool Thread; /* Use thread for remote tables */
int Maxerr; /* Maximum number of bad tables */
int Ntables; /* Number of tables */
}; // end of TBLDEF
......@@ -41,7 +64,6 @@ class DllExport TBLDEF : public PRXDEF { /* Logical table description */
/***********************************************************************/
class DllExport TDBTBL : public TDBPRX {
friend class TBTBLK;
friend class TDBPLG;
public:
// Constructor
TDBTBL(PTBLDEF tdp = NULL);
......@@ -51,8 +73,8 @@ class DllExport TDBTBL : public TDBPRX {
// Methods
virtual void ResetDB(void);
virtual int GetRecpos(void) {return Rows;}
virtual int GetBadLines(void) {return (int)Nbf;}
virtual int GetRecpos(void) {return Rows;}
virtual int GetBadLines(void) {return (int)Nbc;}
// Database routines
virtual PCOL MakeCol(PGLOBAL g, PCOLDEF cdp, PCOL cprec, int n);
......@@ -72,7 +94,7 @@ class DllExport TDBTBL : public TDBPRX {
PTABLE CurTable; // Points to the current table
bool Accept; // TRUE if bad tables are accepted
int Maxerr; // Maximum number of bad tables
int Nbf; // Number of bad connections
int Nbc; // Number of bad connections
int Rows; // Used for RowID
int Crp; // Used for CurPos
}; // end of class TDBTBL
......@@ -100,3 +122,44 @@ class TBTBLK : public TIDBLK {
protected:
// Must not have additional members
}; // end of class TBTBLK
/***********************************************************************/
/* This is the TBM Access Method class declaration. */
/***********************************************************************/
class DllExport TDBTBM : public TDBTBL {
friend class TBTBLK;
public:
// Constructor
TDBTBM(PTBLDEF tdp = NULL);
// Implementation
//virtual AMT GetAmType(void) {return TYPE_AM_TBL;}
// Methods
virtual void ResetDB(void);
//virtual int GetRecpos(void) {return Rows;}
//virtual int GetBadLines(void) {return (int)Nbc;}
// Database routines
//virtual PCOL MakeCol(PGLOBAL g, PCOLDEF cdp, PCOL cprec, int n);
virtual int GetMaxSize(PGLOBAL g) {return 10;} // Temporary
virtual int RowNumber(PGLOBAL g, bool b = FALSE);
//virtual PCOL InsertSpecialColumn(PGLOBAL g, PCOL scp);
virtual bool OpenDB(PGLOBAL g);
virtual int ReadDB(PGLOBAL g);
protected:
// Internal functions
//bool InitTableList(PGLOBAL g);
//bool TestFil(PGLOBAL g, PFIL filp, PTABLE tabp);
bool OpenTables(PGLOBAL g);
int ReadNextRemote(PGLOBAL g);
// Members
PTBMT Tmp; // To data table TBMT structures
PTBMT Cmp; // Current data table PLGF (to move to TDBTBL)
PTBMT Bmp; // To bad (unconnected) PLGF structures
bool Done; // TRUE after first GetAllResults
int Nrc; // Number of remote connections
int Nlc; // Number of local connections
}; // end of class TDBTBM
......@@ -566,7 +566,7 @@ void TYPVAL<TYPE>::SetValue_char(char *p, int n)
if (minus && Tval)
Tval = - Tval;
if (trace)
if (trace > 1)
htrc(strcat(strcat(strcpy(buf, " setting %s to: "), Fmt), "\n"),
GetTypeName(Type), Tval);
......@@ -585,7 +585,7 @@ void TYPVAL<double>::SetValue_char(char *p, int n)
buf[n] = '\0';
Tval = atof(buf);
if (trace)
if (trace > 1)
htrc(" setting double: '%s' -> %lf\n", buf, Tval);
Null = false;
......@@ -900,7 +900,7 @@ void TYPVAL<PSZ>::SetValue_char(char *p, int n)
*(++p) = '\0';
if (trace)
if (trace > 1)
htrc(" Setting string to: '%s'\n", Strp);
Null = false;
......@@ -1291,7 +1291,7 @@ bool DTVAL::MakeTime(struct tm *ptm)
int n, y = ptm->tm_year;
time_t t = mktime_mysql(ptm);
if (trace)
if (trace > 1)
htrc("MakeTime from (%d,%d,%d,%d,%d,%d)\n",
ptm->tm_year, ptm->tm_mon, ptm->tm_mday,
ptm->tm_hour, ptm->tm_min, ptm->tm_sec);
......@@ -1314,7 +1314,7 @@ bool DTVAL::MakeTime(struct tm *ptm)
}
Tval= (int) t;
if (trace)
if (trace > 1)
htrc("MakeTime Ival=%d\n", Tval);
return false;
......@@ -1334,7 +1334,7 @@ bool DTVAL::MakeDate(PGLOBAL g, int *val, int nval)
datm.tm_mon=0;
datm.tm_year=70;
if (trace)
if (trace > 1)
htrc("MakeDate from(%d,%d,%d,%d,%d,%d) nval=%d\n",
val[0], val[1], val[2], val[3], val[4], val[5], nval);
......@@ -1398,7 +1398,7 @@ bool DTVAL::MakeDate(PGLOBAL g, int *val, int nval)
} // endfor i
if (trace)
if (trace > 1)
htrc("MakeDate datm=(%d,%d,%d,%d,%d,%d)\n",
datm.tm_year, datm.tm_mon, datm.tm_mday,
datm.tm_hour, datm.tm_min, datm.tm_sec);
......@@ -1459,7 +1459,7 @@ void DTVAL::SetValue_char(char *p, int n)
ndv = ExtractDate(Sdate, Pdtp, DefYear, dval);
MakeDate(NULL, dval, ndv);
if (trace)
if (trace > 1)
htrc(" setting date: '%s' -> %d\n", Sdate, Tval);
Null = false;
......@@ -1483,7 +1483,7 @@ void DTVAL::SetValue_psz(PSZ p)
ndv = ExtractDate(Sdate, Pdtp, DefYear, dval);
MakeDate(NULL, dval, ndv);
if (trace)
if (trace > 1)
htrc(" setting date: '%s' -> %d\n", Sdate, Tval);
Null = false;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment