Commit 36206acc authored by Olivier Bertrand's avatar Olivier Bertrand

Work on new MONGO table type

Handle discovery, insert, update and delete
Add support for Pipeline
  modified:   storage/connect/tabmgo.cpp
  modified:   storage/connect/tabmgo.h

Handle double call to CondPush
Cond moved to TDB
  modified:   storage/connect/ha_connect.cc
  modified:   storage/connect/tabext.cpp
  modified:   storage/connect/tabext.h
  modified:   storage/connect/tabjdbc.cpp
  modified:   storage/connect/table.cpp
  modified:   storage/connect/tabmysql.cpp
  modified:   storage/connect/tabodbc.cpp
  modified:   storage/connect/xtable.h

Add building Mongo selector to FILTER
  modified:   storage/connect/filter.cpp
  modified:   storage/connect/filter.h

Change Print function of values (needed by FILTER)
  modified:   storage/connect/value.cpp
  modified:   storage/connect/value.h

Fix crash when dbname is null forJSON MGO tables
  modified:   storage/connect/tabjson.cpp
  modified:   storage/connect/tabjson.h

Fix MDEV-12520: Decimal values can be truncated for JDBC tables
  modified:   storage/connect/jdbconn.cpp

Fix bug. Date value was null when retrieved from a json expanded array.
  modified:   storage/connect/tabjson.cpp
parent 0149f9c2
......@@ -38,6 +38,10 @@
//#include "token.h"
//#include "select.h"
#include "xindex.h"
#if defined(MONGO_SUPPORT)
#include "tabext.h"
#include "tabmgo.h"
#endif // MONGO_SUPPORT
/***********************************************************************/
/* Utility routines. */
......@@ -1405,6 +1409,88 @@ PFIL FILTER::Copy(PTABS t)
} // end of Copy
#endif // 0
/***********************************************************************/
/* Make selector json representation for Mongo tables. */
/***********************************************************************/
#if defined(MONGO_SUPPORT)
bool FILTER::MakeSelector(PGLOBAL g, PSTRG s)
{
s->Append('{');
if (Opc == OP_AND || Opc == OP_OR) {
if (GetArgType(0) != TYPE_FILTER || GetArgType(1) != TYPE_FILTER)
return true;
s->Append("\"$");
s->Append(Opc == OP_AND ? "and" : "or");
s->Append("\":[");
if (((PFIL)Arg(0))->MakeSelector(g, s))
return true;
s->Append(',');
if (((PFIL)Arg(1))->MakeSelector(g, s))
return true;
s->Append(']');
} else {
char buf[501];
if (GetArgType(0) != TYPE_COLBLK)
return true;
s->Append('"');
s->Append(((PMGOCOL)Arg(0))->Jpath);
s->Append("\":{\"$");
switch (Opc) {
case OP_EQ:
s->Append("eq");
break;
case OP_NE:
s->Append("ne");
break;
case OP_GT:
s->Append("gt");
break;
case OP_GE:
s->Append("gte");
break;
case OP_LT:
s->Append("lt");
break;
case OP_LE:
s->Append("lte");
break;
//case OP_NULL:
// s->Append("ne");
// break;
//case OP_LIKE:
// s->Append("ne");
// break;
//case OP_EXIST:
// s->Append("ne");
// break;
default:
return true;
} // endswitch Opc
s->Append("\":");
if (GetArgType(1) == TYPE_COLBLK)
return true;
Arg(1)->Print(g, buf, 500);
s->Append(buf);
s->Append('}');
} // endif Opc
s->Append('}');
return false;
} // end of MakeSelector
#endif // MONGO_SUPPORT
/*********************************************************************/
/* Make file output of FILTER contents. */
/*********************************************************************/
......
......@@ -61,7 +61,10 @@ class DllExport FILTER : public XOBJECT { /* Filter description block */
//virtual PXOB CheckSubQuery(PGLOBAL, PSQL);
//virtual bool CheckLocal(PTDB);
//virtual int CheckSpcCol(PTDB tdbp, int n);
virtual void Print(PGLOBAL g, FILE *f, uint n);
#if defined(MONGO_SUPPORT)
bool MakeSelector(PGLOBAL g, PSTRG s);
#endif // MONGO_SUPPORT
virtual void Print(PGLOBAL g, FILE *f, uint n);
virtual void Print(PGLOBAL g, char *ps, uint z);
// PFIL Linearize(bool nosep);
// PFIL Link(PGLOBAL g, PFIL fil2);
......
......@@ -211,6 +211,9 @@ PQRYRES OEMColumns(PGLOBAL g, PTOS topt, char *tab, char *db, bool info);
PQRYRES VirColumns(PGLOBAL g, bool info);
PQRYRES JSONColumns(PGLOBAL g, char *db, char *dsn, PTOS topt, bool info);
PQRYRES XMLColumns(PGLOBAL g, char *db, char *tab, PTOS topt, bool info);
#if defined(MONGO_SUPPORT)
PQRYRES MGOColumns(PGLOBAL g, char *db, char *dsn, PTOS topt, bool info);
#endif // MONGO_SUPPORT
int TranslateJDBCType(int stp, char *tn, int prec, int& len, char& v);
void PushWarning(PGLOBAL g, THD *thd, int level);
bool CheckSelf(PGLOBAL g, TABLE_SHARE *s, const char *host,
......@@ -3072,11 +3075,11 @@ const COND *ha_connect::cond_push(const COND *cond)
PCFIL filp;
int rc;
if ((filp = tdbp->GetCondFil()) && filp->Cond == cond &&
if ((filp = tdbp->GetCondFil()) && tdbp->GetCond() == cond &&
filp->Idx == active_index && filp->Type == tty)
goto fin;
filp = new(g) CONDFIL(cond, active_index, tty);
filp = new(g) CONDFIL(active_index, tty);
rc = filp->Init(g, this);
if (rc == RC_INFO) {
......@@ -3095,6 +3098,8 @@ const COND *ha_connect::cond_push(const COND *cond)
if (trace)
htrc("cond_push: %s\n", filp->Body);
tdbp->SetCond(cond);
if (!x)
PlugSubAlloc(g, NULL, strlen(filp->Body) + 1);
else
......@@ -3104,8 +3109,16 @@ const COND *ha_connect::cond_push(const COND *cond)
} else if (x && cond)
tdbp->SetCondFil(filp); // Wrong filter
} else if (tty != TYPE_AM_JSN && tty != TYPE_AM_JSON)
tdbp->SetFilter(CondFilter(g, (Item *)cond));
} else if (tty != TYPE_AM_JSN && tty != TYPE_AM_JSON) {
if (!tdbp->GetCond() || tdbp->GetCond() != cond) {
tdbp->SetFilter(CondFilter(g, (Item *)cond));
if (tdbp->GetFilter())
tdbp->SetCond(cond);
} // endif cond
} // endif tty
} catch (int n) {
if (trace)
......@@ -5545,6 +5558,17 @@ static int connect_assisted_discovery(handlerton *, THD* thd,
ok = true;
break;
#if defined(MONGO_SUPPORT)
case TAB_MONGO:
dsn = strz(g, create_info->connect_string);
if (!dsn)
strcpy(g->Message, "Missing URI");
else
ok = true;
break;
#endif // MONGO_SUPPORT
case TAB_VIR:
ok = true;
break;
......@@ -5688,6 +5712,11 @@ static int connect_assisted_discovery(handlerton *, THD* thd,
case TAB_JSON:
qrp = JSONColumns(g, (char*)db, dsn, topt, fnc == FNC_COL);
break;
#if defined(MONGO_SUPPORT)
case TAB_MONGO:
qrp = MGOColumns(g, (char*)db, dsn, topt, fnc == FNC_COL);
break;
#endif // MONGO_SUPPORT
#if defined(LIBXML2_SUPPORT) || defined(DOMDOC_SUPPORT)
case TAB_XML:
qrp = XMLColumns(g, (char*)db, tab, topt, fnc == FNC_COL);
......
......@@ -1227,7 +1227,8 @@ void JDBConn::SetColumnValue(int rank, PSZ name, PVAL val)
case 12: // VARCHAR
case -1: // LONGVARCHAR
case 1: // CHAR
if (jb)
case 3: // DECIMAL
if (jb && ctyp != 3)
cn = (jstring)jb;
else if (!gmID(g, chrfldid, "StringField", "(ILjava/lang/String;)Ljava/lang/String;"))
cn = (jstring)env->CallObjectMethod(job, chrfldid, (jint)rank, jn);
......@@ -1253,7 +1254,7 @@ void JDBConn::SetColumnValue(int rank, PSZ name, PVAL val)
break;
case 8: // DOUBLE
case 2: // NUMERIC
case 3: // DECIMAL
//case 3: // DECIMAL
if (!gmID(g, dblfldid, "DoubleField", "(ILjava/lang/String;)D"))
val->SetValue((double)env->CallDoubleMethod(job, dblfldid, rank, jn));
else
......
......@@ -35,9 +35,9 @@
/***********************************************************************/
/* CONDFIL Constructor. */
/***********************************************************************/
CONDFIL::CONDFIL(const Item *cond, uint idx, AMT type)
CONDFIL::CONDFIL(uint idx, AMT type)
{
Cond = cond;
//Cond = cond;
Idx = idx;
Type = type;
Op = OP_XX;
......
......@@ -28,14 +28,14 @@ class ALIAS : public BLOCK {
class CONDFIL : public BLOCK {
public:
// Constructor
CONDFIL(const Item *cond, uint idx, AMT type);
CONDFIL(uint idx, AMT type);
// Functions
int Init(PGLOBAL g, PHC hc);
const char *Chk(const char *cln, bool *h);
// Members
const Item *Cond;
//const Item *Cond;
AMT Type;
uint Idx;
OPVAL Op;
......
......@@ -750,7 +750,7 @@ bool TDBJDBC::ReadKey(PGLOBAL g, OPVAL op, const key_range *kr)
To_CondFil->Body= (char*)PlugSubAlloc(g, NULL, 0);
*To_CondFil->Body= 0;
if ((To_CondFil = hc->CheckCond(g, To_CondFil, To_CondFil->Cond)))
if ((To_CondFil = hc->CheckCond(g, To_CondFil, Cond)))
PlugSubAlloc(g, NULL, strlen(To_CondFil->Body) + 1);
} // endif active_index
......
......@@ -47,6 +47,7 @@ TDB::TDB(PTABDEF tdp) : Tdb_No(++Tnum)
To_Orig = NULL;
To_Filter = NULL;
To_CondFil = NULL;
Cond = NULL;
Next = NULL;
Name = (tdp) ? tdp->GetName() : NULL;
To_Table = NULL;
......@@ -68,6 +69,7 @@ TDB::TDB(PTDB tdbp) : Tdb_No(++Tnum)
To_Orig = tdbp;
To_Filter = NULL;
To_CondFil = NULL;
Cond = NULL;
Next = NULL;
Name = tdbp->Name;
To_Table = tdbp->To_Table;
......
......@@ -27,6 +27,7 @@
#include "checklvl.h"
#include "resource.h"
#include "mycat.h" // for FNC_COL
#include "filter.h"
/***********************************************************************/
/* This should be an option. */
......@@ -34,46 +35,23 @@
#define MAXCOL 200 /* Default max column nb in result */
#define TYPE_UNKNOWN 12 /* Must be greater than other types */
typedef struct _jncol {
struct _jncol *Next;
char *Name;
char *Fmt;
int Type;
int Len;
int Scale;
bool Cbn;
bool Found;
} JCOL, *PJCL;
#if 0
/***********************************************************************/
/* JSONColumns: construct the result blocks containing the description */
/* of all the columns of a table contained inside a JSON file. */
/* MGOColumns: construct the result blocks containing the description */
/* of all the columns of a document contained inside MongoDB. */
/***********************************************************************/
PQRYRES JSONColumns(PGLOBAL g, char *db, char *dsn, PTOS topt, bool info)
PQRYRES MGOColumns(PGLOBAL g, char *db, char *dsn, PTOS topt, bool info)
{
static int buftyp[] = {TYPE_STRING, TYPE_SHORT, TYPE_STRING, TYPE_INT,
TYPE_INT, TYPE_SHORT, TYPE_SHORT, TYPE_STRING};
TYPE_INT, TYPE_SHORT, TYPE_SHORT, TYPE_STRING};
static XFLD fldtyp[] = {FLD_NAME, FLD_TYPE, FLD_TYPENAME, FLD_PREC,
FLD_LENGTH, FLD_SCALE, FLD_NULL, FLD_FORMAT};
static unsigned int length[] = {0, 6, 8, 10, 10, 6, 6, 0};
char colname[65], fmt[129];
int i, j, lvl, n = 0;
int ncol = sizeof(buftyp) / sizeof(int);
PVAL valp;
JCOL jcol;
PJCL jcp, fjcp = NULL, pjcp = NULL;
PJPR *jrp, jpp;
PJSON jsp;
PJVAL jvp;
PJOB row;
PMGODEF tdp;
TDBMGO *tjnp = NULL;
PJTDB tjsp = NULL;
PQRYRES qrp;
PCOLRES crp;
jcol.Name = jcol.Fmt = NULL;
FLD_LENGTH, FLD_SCALE, FLD_NULL, FLD_FORMAT};
unsigned int length[] = {0, 6, 8, 10, 10, 6, 6, 0};
int ncol = sizeof(buftyp) / sizeof(int);
int i, n = 0;
PBCOL bcp;
MGODISC *mgd;
PQRYRES qrp;
PCOLRES crp;
if (info) {
length[0] = 128;
......@@ -81,265 +59,13 @@ PQRYRES JSONColumns(PGLOBAL g, char *db, char *dsn, PTOS topt, bool info)
goto skipit;
} // endif info
if (GetIntegerTableOption(g, topt, "Multiple", 0)) {
strcpy(g->Message, "Cannot find column definition for multiple table");
return NULL;
} // endif Multiple
/*********************************************************************/
/* Open the input file. */
/*********************************************************************/
lvl = GetIntegerTableOption(g, topt, "Level", 0);
lvl = (lvl < 0) ? 0 : (lvl > 16) ? 16 : lvl;
tdp = new(g) MGODEF;
#if defined(ZIP_SUPPORT)
tdp->Entry = GetStringTableOption(g, topt, "Entry", NULL);
tdp->Zipped = GetBooleanTableOption(g, topt, "Zipped", false);
#endif // ZIP_SUPPORT
tdp->Fn = GetStringTableOption(g, topt, "Filename", NULL);
if (!tdp->Fn && !dsn) {
strcpy(g->Message, MSG(MISSING_FNAME));
return NULL;
} // endif Fn
tdp->Database = SetPath(g, db);
tdp->Objname = GetStringTableOption(g, topt, "Object", NULL);
tdp->Base = GetIntegerTableOption(g, topt, "Base", 0) ? 1 : 0;
tdp->Pretty = GetIntegerTableOption(g, topt, "Pretty", 2);
if (trace)
htrc("File %s objname=%s pretty=%d lvl=%d\n",
tdp->Fn, tdp->Objname, tdp->Pretty, lvl);
if (tdp->Uri = dsn) {
#if defined(MONGO_SUPPORT)
tdp->Collname = GetStringTableOption(g, topt, "Name", NULL);
tdp->Collname = GetStringTableOption(g, topt, "Tabname", tdp->Collname);
tdp->Schema = GetStringTableOption(g, topt, "Dbname", "test");
tdp->Options = GetStringTableOption(g, topt, "Colist", NULL);
tdp->Pretty = 0;
#else // !MONGO_SUPPORT
sprintf(g->Message, MSG(NO_FEAT_SUPPORT), "MONGO");
return NULL;
#endif // !MONGO_SUPPORT
} // endif Uri
if (tdp->Pretty == 2) {
if (tdp->Zipped) {
#if defined(ZIP_SUPPORT)
tjsp = new(g) TDBJSON(tdp, new(g) UNZFAM(tdp));
#else // !ZIP_SUPPORT
sprintf(g->Message, MSG(NO_FEAT_SUPPORT), "ZIP");
return NULL;
#endif // !ZIP_SUPPORT
} else
tjsp = new(g) TDBJSON(tdp, new(g) MAPFAM(tdp));
if (tjsp->MakeDocument(g))
return NULL;
jsp = (tjsp->GetDoc()) ? tjsp->GetDoc()->GetValue(0) : NULL;
} else {
if (!(tdp->Lrecl = GetIntegerTableOption(g, topt, "Lrecl", 0))) {
sprintf(g->Message, "LRECL must be specified for pretty=%d", tdp->Pretty);
return NULL;
} // endif lrecl
tdp->Ending = GetIntegerTableOption(g, topt, "Ending", CRLF);
if (tdp->Zipped) {
#if defined(ZIP_SUPPORT)
tjnp = new(g)TDBMGO(tdp, new(g) UNZFAM(tdp));
#else // !ZIP_SUPPORT
sprintf(g->Message, MSG(NO_FEAT_SUPPORT), "ZIP");
return NULL;
#endif // !ZIP_SUPPORT
} else if (tdp->Uri) {
#if defined(MONGO_SUPPORT)
tjnp = new(g) TDBMGO(tdp, new(g) MGOFAM(tdp));
#else // !MONGO_SUPPORT
sprintf(g->Message, MSG(NO_FEAT_SUPPORT), "MONGO");
return NULL;
#endif // !MONGO_SUPPORT
} else
tjnp = new(g) TDBMGO(tdp, new(g) DOSFAM(tdp));
tjnp->SetMode(MODE_READ);
#if USE_G
// Allocate the parse work memory
PGLOBAL G = (PGLOBAL)PlugSubAlloc(g, NULL, sizeof(GLOBAL));
memset(G, 0, sizeof(GLOBAL));
G->Sarea_Size = tdp->Lrecl * 10;
G->Sarea = PlugSubAlloc(g, NULL, G->Sarea_Size);
PlugSubSet(G, G->Sarea, G->Sarea_Size);
G->jump_level = 0;
tjnp->SetG(G);
#else
tjnp->SetG(g);
#endif
if (tjnp->OpenDB(g))
return NULL;
switch (tjnp->ReadDB(g)) {
case RC_EF:
strcpy(g->Message, "Void json table");
case RC_FX:
goto err;
default:
jsp = tjnp->GetRow();
} // endswitch ReadDB
} // endif pretty
if (!(row = (jsp) ? jsp->GetObject() : NULL)) {
strcpy(g->Message, "Can only retrieve columns from object rows");
goto err;
} // endif row
jcol.Next = NULL;
jcol.Found = true;
colname[64] = 0;
fmt[128] = 0;
jrp = (PJPR*)PlugSubAlloc(g, NULL, sizeof(PJPR) * lvl);
/*********************************************************************/
/* Analyse the JSON tree and define columns. */
/* Open MongoDB. */
/*********************************************************************/
for (i = 1; ; i++) {
for (jpp = row->GetFirst(); jpp; jpp = jpp->GetNext()) {
for (j = 0; j < lvl; j++)
jrp[j] = NULL;
more:
strncpy(colname, jpp->GetKey(), 64);
*fmt = 0;
j = 0;
jvp = jpp->GetVal();
retry:
if ((valp = jvp ? jvp->GetValue() : NULL)) {
jcol.Type = valp->GetType();
jcol.Len = valp->GetValLen();
jcol.Scale = valp->GetValPrec();
jcol.Cbn = valp->IsNull();
} else if (!jvp || jvp->IsNull()) {
jcol.Type = TYPE_UNKNOWN;
jcol.Len = jcol.Scale = 0;
jcol.Cbn = true;
} else if (j < lvl) {
if (!*fmt)
strcpy(fmt, colname);
jsp = jvp->GetJson();
switch (jsp->GetType()) {
case TYPE_JOB:
if (!jrp[j])
jrp[j] = jsp->GetFirst();
strncat(strncat(fmt, ":", 128), jrp[j]->GetKey(), 128);
strncat(strncat(colname, "_", 64), jrp[j]->GetKey(), 64);
jvp = jrp[j]->GetVal();
j++;
break;
case TYPE_JAR:
strncat(fmt, ":", 128);
jvp = jsp->GetValue(0);
break;
default:
sprintf(g->Message, "Logical error after %s", fmt);
goto err;
} // endswitch jsp
mgd = new(g) MGODISC(g, (int*)length);
goto retry;
} else {
jcol.Type = TYPE_STRING;
jcol.Len = 256;
jcol.Scale = 0;
jcol.Cbn = true;
} // endif's
// Check whether this column was already found
for (jcp = fjcp; jcp; jcp = jcp->Next)
if (!strcmp(colname, jcp->Name))
break;
if (jcp) {
if (jcp->Type != jcol.Type)
jcp->Type = TYPE_STRING;
if (*fmt && (!jcp->Fmt || strlen(jcp->Fmt) < strlen(fmt))) {
jcp->Fmt = PlugDup(g, fmt);
length[7] = MY_MAX(length[7], strlen(fmt));
} // endif *fmt
jcp->Len = MY_MAX(jcp->Len, jcol.Len);
jcp->Scale = MY_MAX(jcp->Scale, jcol.Scale);
jcp->Cbn |= jcol.Cbn;
jcp->Found = true;
} else {
// New column
jcp = (PJCL)PlugSubAlloc(g, NULL, sizeof(JCOL));
*jcp = jcol;
jcp->Cbn |= (i > 1);
jcp->Name = PlugDup(g, colname);
length[0] = MY_MAX(length[0], strlen(colname));
if (*fmt) {
jcp->Fmt = PlugDup(g, fmt);
length[7] = MY_MAX(length[7], strlen(fmt));
} else
jcp->Fmt = NULL;
if (pjcp) {
jcp->Next = pjcp->Next;
pjcp->Next = jcp;
} else
fjcp = jcp;
n++;
} // endif jcp
pjcp = jcp;
for (j = lvl - 1; j >= 0; j--)
if (jrp[j] && (jrp[j] = jrp[j]->GetNext()))
goto more;
} // endfor jpp
// Missing column can be null
for (jcp = fjcp; jcp; jcp = jcp->Next) {
jcp->Cbn |= !jcp->Found;
jcp->Found = false;
} // endfor jcp
if (tdp->Pretty != 2) {
// Read next record
switch (tjnp->ReadDB(g)) {
case RC_EF:
jsp = NULL;
break;
case RC_FX:
goto err;
default:
jsp = tjnp->GetRow();
} // endswitch ReadDB
} else
jsp = tjsp->GetDoc()->GetValue(i);
if (!(row = (jsp) ? jsp->GetObject() : NULL))
break;
} // endor i
if (tdp->Pretty != 2)
tjnp->CloseDB(g);
if ((n = mgd->GetColumns(g, db, dsn, topt)) < 0)
goto err;
skipit:
if (trace)
......@@ -353,7 +79,7 @@ PQRYRES JSONColumns(PGLOBAL g, char *db, char *dsn, PTOS topt, bool info)
crp = qrp->Colresp->Next->Next->Next->Next->Next->Next;
crp->Name = "Nullable";
crp->Next->Name = "Jpath";
crp->Next->Name = "Bpath";
if (info || !qrp)
return qrp;
......@@ -363,43 +89,291 @@ PQRYRES JSONColumns(PGLOBAL g, char *db, char *dsn, PTOS topt, bool info)
/*********************************************************************/
/* Now get the results into blocks. */
/*********************************************************************/
for (i = 0, jcp = fjcp; jcp; i++, jcp = jcp->Next) {
if (jcp->Type == TYPE_UNKNOWN) // Void column
jcp->Type = TYPE_STRING;
for (i = 0, bcp = mgd->fbcp; bcp; i++, bcp = bcp->Next) {
if (bcp->Type == TYPE_UNKNOWN) // Void column
bcp->Type = TYPE_STRING;
crp = qrp->Colresp; // Column Name
crp->Kdata->SetValue(jcp->Name, i);
crp->Kdata->SetValue(bcp->Name, i);
crp = crp->Next; // Data Type
crp->Kdata->SetValue(jcp->Type, i);
crp->Kdata->SetValue(bcp->Type, i);
crp = crp->Next; // Type Name
crp->Kdata->SetValue(GetTypeName(jcp->Type), i);
crp->Kdata->SetValue(GetTypeName(bcp->Type), i);
crp = crp->Next; // Precision
crp->Kdata->SetValue(jcp->Len, i);
crp->Kdata->SetValue(bcp->Len, i);
crp = crp->Next; // Length
crp->Kdata->SetValue(jcp->Len, i);
crp->Kdata->SetValue(bcp->Len, i);
crp = crp->Next; // Scale (precision)
crp->Kdata->SetValue(jcp->Scale, i);
crp->Kdata->SetValue(bcp->Scale, i);
crp = crp->Next; // Nullable
crp->Kdata->SetValue(jcp->Cbn ? 1 : 0, i);
crp->Kdata->SetValue(bcp->Cbn ? 1 : 0, i);
crp = crp->Next; // Field format
if (crp->Kdata)
crp->Kdata->SetValue(jcp->Fmt, i);
crp->Kdata->SetValue(bcp->Fmt, i);
} // endfor i
/*********************************************************************/
/* Return the result pointer. */
/*********************************************************************/
/*********************************************************************/
/* Return the result pointer. */
/*********************************************************************/
return qrp;
err:
if (tdp->Pretty != 2)
tjnp->CloseDB(g);
if (mgd->tmgp)
mgd->tmgp->CloseDB(g);
return NULL;
} // end of JSONColumns
#endif // 0
} // end of MGOColumns
/***********************************************************************/
/* Class used to get the columns of a mongo collection. */
/***********************************************************************/
// Constructor
MGODISC::MGODISC(PGLOBAL g, int *lg) {
length = lg;
fbcp = NULL;
pbcp = NULL;
tmgp = NULL;
n = k = lvl = 0;
all = false;
} // end of MGODISC constructor
/***********************************************************************/
/* Class used to get the columns of a mongo collection. */
/***********************************************************************/
int MGODISC::GetColumns(PGLOBAL g, char *db, char *dsn, PTOS topt)
{
bson_iter_t iter;
const bson_t *doc;
PMGODEF tdp;
TDBMGO *tmgp = NULL;
lvl = GetIntegerTableOption(g, topt, "Level", 0);
lvl = (lvl > 16) ? 16 : lvl;
all = GetBooleanTableOption(g, topt, "Fullarray", false);
if (!dsn) {
strcpy(g->Message, "Missing URI");
return -1;
} // endif dsn
/*********************************************************************/
/* Open the MongoDB collection. */
/*********************************************************************/
tdp = new(g) MGODEF;
tdp->Uri = dsn;
tdp->Tabname = GetStringTableOption(g, topt, "Name", NULL);
tdp->Tabname = GetStringTableOption(g, topt, "Tabname", tdp->Tabname);
tdp->Tabschema = GetStringTableOption(g, topt, "Dbname", db);
tdp->Base = GetIntegerTableOption(g, topt, "Base", 0) ? 1 : 0;
tdp->Colist = GetStringTableOption(g, topt, "Colist", NULL);
tdp->Pipe = GetBooleanTableOption(g, topt, "Pipeline", false);
if (tdp->Colist) {
char *p = (char*)strchr(tdp->Colist, ';');
if (p) {
*p++ = 0;
tdp->Filter = *p ? p : NULL;
} // endif p
} // endif Colist
if (trace)
htrc("Uri %s coll=%s db=%s colist=%s filter=%s lvl=%d\n",
tdp->Uri, tdp->Tabname, tdp->Tabschema, tdp->Colist, tdp->Filter, lvl);
tmgp = new(g) TDBMGO(tdp);
tmgp->SetMode(MODE_READ);
if (tmgp->OpenDB(g))
return -1;
bcol.Next = NULL;
bcol.Name = bcol.Fmt = NULL;
bcol.Type = TYPE_UNKNOWN;
bcol.Len = bcol.Scale = 0;
bcol.Found = true;
bcol.Cbn = false;
/*********************************************************************/
/* Analyse the BSON tree and define columns. */
/*********************************************************************/
for (int i = 1; ; i++) {
switch (tmgp->ReadDB(g)) {
case RC_EF:
return n;
case RC_FX:
return -1;
default:
doc = tmgp->Document;
} // endswitch ReadDB
if (FindInDoc(g, &iter, doc, NULL, NULL, i, k, false))
return -1;
// Missing columns can be null
for (bcp = fbcp; bcp; bcp = bcp->Next) {
bcp->Cbn |= !bcp->Found;
bcp->Found = false;
} // endfor bcp
} // endfor i
return n;
} // end of GetColumns
/*********************************************************************/
/* Analyse passed document. */
/*********************************************************************/
bool MGODISC::FindInDoc(PGLOBAL g, bson_iter_t *iter, const bson_t *doc,
char *pcn, char *pfmt, int i, int k, bool b)
{
if (!doc || bson_iter_init(iter, doc)) {
const char *key;
char colname[65];
char fmt[129];
bool newcol;
while (bson_iter_next(iter)) {
key = bson_iter_key(iter);
newcol = true;
if (pcn) {
strncpy(colname, pcn, 64);
colname[64] = 0;
strncat(strncat(colname, "_", 65), key, 65);
} else
strcpy(colname, key);
if (pfmt) {
strncpy(fmt, pfmt, 128);
fmt[128] = 0;
strncat(strncat(fmt, ".", 129), key, 129);
} else
strcpy(fmt, key);
bcol.Cbn = false;
if (BSON_ITER_HOLDS_UTF8(iter)) {
bcol.Type = TYPE_STRING;
bcol.Len = strlen(bson_iter_utf8(iter, NULL));
} else if (BSON_ITER_HOLDS_INT32(iter)) {
bcol.Type = TYPE_INT;
bcol.Len = 11; // bson_iter_int32(iter)
} else if (BSON_ITER_HOLDS_INT64(iter)) {
bcol.Type = TYPE_BIGINT;
bcol.Len = 22; // bson_iter_int64(iter)
} else if (BSON_ITER_HOLDS_DOUBLE(iter)) {
bcol.Type = TYPE_DOUBLE;
bcol.Len = 12;
bcol.Scale = 6; // bson_iter_double(iter)
} else if (BSON_ITER_HOLDS_DATE_TIME(iter)) {
bcol.Type = TYPE_DATE;
bcol.Len = 19; // bson_iter_date_time(iter)
} else if (BSON_ITER_HOLDS_BOOL(iter)) {
bcol.Type = TYPE_TINY;
bcol.Len = 1;
} else if (BSON_ITER_HOLDS_OID(iter)) {
bcol.Type = TYPE_STRING;
bcol.Len = 24; // bson_iter_oid(iter)
} else if (BSON_ITER_HOLDS_DECIMAL128(iter)) {
bcol.Type = TYPE_DECIM;
bcol.Len = 32; // bson_iter_decimal128(iter, &dec)
} else if (BSON_ITER_HOLDS_DOCUMENT(iter)) {
if (lvl < 0)
continue;
else if (lvl <= k) {
bcol.Type = TYPE_STRING;
bcol.Len = 512;
} else {
bson_iter_t child;
if (bson_iter_recurse(iter, &child))
if (FindInDoc(g, &child, NULL, colname, fmt, i, k + 1, false))
return true;
newcol = false;
} // endif lvl
} else if (BSON_ITER_HOLDS_ARRAY(iter)) {
if (lvl < 0)
continue;
else if (lvl <= k) {
bcol.Type = TYPE_STRING;
bcol.Len = 512;
} else {
bson_t *arr;
bson_iter_t itar;
const uint8_t *data = NULL;
uint32_t len = 0;
bson_iter_array(iter, &len, &data);
arr = bson_new_from_data(data, len);
if (FindInDoc(g, &itar, arr, colname, fmt, i, k + 1, !all))
return true;
newcol = false;
} // endif lvl
} // endif's
if (newcol) {
// Check whether this column was already found
for (bcp = fbcp; bcp; bcp = bcp->Next)
if (!strcmp(colname, bcp->Name))
break;
if (bcp) {
if (bcp->Type != bcol.Type)
bcp->Type = TYPE_STRING;
if (k && *fmt && (!bcp->Fmt || strlen(bcp->Fmt) < strlen(fmt))) {
bcp->Fmt = PlugDup(g, fmt);
length[7] = MY_MAX(length[7], strlen(fmt));
} // endif *fmt
bcp->Len = MY_MAX(bcp->Len, bcol.Len);
bcp->Scale = MY_MAX(bcp->Scale, bcol.Scale);
bcp->Cbn |= bcol.Cbn;
bcp->Found = true;
} else {
// New column
bcp = (PBCOL)PlugSubAlloc(g, NULL, sizeof(BCOL));
*bcp = bcol;
bcp->Cbn |= (i > 1);
bcp->Name = PlugDup(g, colname);
length[0] = MY_MAX(length[0], strlen(colname));
if (k) {
bcp->Fmt = PlugDup(g, fmt);
length[7] = MY_MAX(length[7], strlen(fmt));
} else
bcp->Fmt = NULL;
if (pbcp) {
bcp->Next = pbcp->Next;
pbcp->Next = bcp;
} else
fbcp = bcp;
n++;
} // endif jcp
pbcp = bcp;
} // endif newcol
if (b)
break; // Test only first element of arrays
} // endwhile iter
} // endif doc
return false;
} // end of FindInDoc
/* -------------------------- Class MGODEF --------------------------- */
......@@ -410,6 +384,7 @@ MGODEF::MGODEF(void)
Filter = NULL;
Level = 0;
Base = 0;
Pipe = false;
} // end of MGODEF constructor
/***********************************************************************/
......@@ -426,6 +401,7 @@ bool MGODEF::DefineAM(PGLOBAL g, LPCSTR, int poff)
Colist = GetStringCatInfo(g, "Colist", NULL);
Filter = GetStringCatInfo(g, "Filter", NULL);
Base = GetIntCatInfo("Base", 0) ? 1 : 0;
Pipe = GetBoolCatInfo("Pipeline", false);
return false;
} // end of DefineAM
......@@ -440,6 +416,64 @@ PTDB MGODEF::GetTable(PGLOBAL g, MODE m)
return new(g) TDBMGO(this);
} // end of GetTable
/* --------------------------- Class INCOL --------------------------- */
/***********************************************************************/
/* Add a column in the column list. */
/***********************************************************************/
void INCOL::AddCol(PGLOBAL g, PCOL colp, char *jp)
{
char *p;
PKC kp, kcp;
if ((p = strchr(jp, '.'))) {
PINCOL icp;
*p = 0;
for (kp = Klist; kp; kp = kp->Next)
if (kp->Incolp && !strcmp(jp, kp->Key))
break;
if (!kp) {
icp = new(g) INCOL;
kcp = (PKC)PlugSubAlloc(g, NULL, sizeof(KEYCOL));
kcp->Next = NULL;
kcp->Incolp = icp;
kcp->Colp = NULL;
kcp->Key = PlugDup(g, jp);
if (Klist) {
for (kp = Klist; kp->Next; kp = kp->Next);
kp->Next = kcp;
} else
Klist = kcp;
} else
icp = kp->Incolp;
*p = '.';
icp->AddCol(g, colp, p + 1);
} else {
kcp = (PKC)PlugSubAlloc(g, NULL, sizeof(KEYCOL));
kcp->Next = NULL;
kcp->Incolp = NULL;
kcp->Colp = colp;
kcp->Key = jp;
if (Klist) {
for (kp = Klist; kp->Next; kp = kp->Next);
kp->Next = kcp;
} else
Klist = kcp;
} // endif jp
} // end of AddCol
/* --------------------------- Class TDBMGO -------------------------- */
/***********************************************************************/
......@@ -447,12 +481,15 @@ PTDB MGODEF::GetTable(PGLOBAL g, MODE m)
/***********************************************************************/
TDBMGO::TDBMGO(PMGODEF tdp) : TDBEXT(tdp)
{
Uri = NULL;
Pool = NULL;
Client = NULL;
Database = NULL;
Collection = NULL;
Cursor = NULL;
Query = NULL;
Opts = NULL;
Fpc = NULL;
if (tdp) {
Uristr = tdp->Uri;
......@@ -461,6 +498,7 @@ TDBMGO::TDBMGO(PMGODEF tdp) : TDBEXT(tdp)
Options = tdp->Colist;
Filter = tdp->Filter;
B = tdp->Base ? 1 : 0;
Pipe = tdp->Pipe && Options != NULL;
} else {
Uristr = NULL;
Db_name = NULL;
......@@ -468,6 +506,7 @@ TDBMGO::TDBMGO(PMGODEF tdp) : TDBEXT(tdp)
Options = NULL;
Filter = NULL;
B = 0;
Pipe = false;
} // endif tdp
Fpos = -1;
......@@ -477,21 +516,28 @@ TDBMGO::TDBMGO(PMGODEF tdp) : TDBEXT(tdp)
TDBMGO::TDBMGO(TDBMGO *tdbp) : TDBEXT(tdbp)
{
Uri = tdbp->Uri;
Pool = tdbp->Pool;
Client = tdbp->Client;
Database = NULL;
Collection = tdbp->Collection;
Cursor = tdbp->Cursor;
Query = tdbp->Query;
Opts = tdbp->Opts;
Fpc = tdbp->Fpc;
Uristr = tdbp->Uristr;
Db_name = tdbp->Db_name;;
Coll_name = tdbp->Coll_name;
Options = tdbp->Options;
Filter = tdbp->Filter;
B = tdbp->B;
Fpos = tdbp->Fpos;
N = tdbp->N;
B = tdbp->B;
Done = tdbp->Done;
Pipe = tdbp->Pipe;
} // end of TDBMGO copy constructor
// Used for update
// Used for update
PTDB TDBMGO::Clone(PTABS t)
{
PTDB tp;
......@@ -500,10 +546,11 @@ PTDB TDBMGO::Clone(PTABS t)
tp = new(g) TDBMGO(this);
for (cp1 = (PMGOCOL)Columns; cp1; cp1 = (PMGOCOL)cp1->GetNext()) {
cp2 = new(g) MGOCOL(cp1, tp); // Make a copy
NewPointer(t, cp1, cp2);
} // endfor cp1
for (cp1 = (PMGOCOL)Columns; cp1; cp1 = (PMGOCOL)cp1->GetNext())
if (!cp1->IsSpecial()) {
cp2 = new(g) MGOCOL(cp1, tp); // Make a copy
NewPointer(t, cp1, cp2);
} // endif cp1
return tp;
} // end of Clone
......@@ -515,8 +562,9 @@ PCOL TDBMGO::MakeCol(PGLOBAL g, PCOLDEF cdp, PCOL cprec, int n)
{
PMGOCOL colp = new(g) MGOCOL(g, cdp, this, cprec, n);
//return (colp->ParseJpath(g)) ? NULL : colp;
colp->Mbuf = (char*)PlugSubAlloc(g, NULL, colp->Long + 1);
return colp;
//return (colp->ParseJpath(g)) ? NULL : colp;
} // end of MakeCol
/***********************************************************************/
......@@ -564,32 +612,114 @@ bool TDBMGO::Init(PGLOBAL g)
if (Done)
return false;
if (Filter && *Filter) {
if (Options && !Pipe) {
char *p = (char*)strchr(Options, ';');
if (p) {
*p++ = 0;
if (p)
Filter = p;
} // endif p
if (*Options) {
if (trace)
htrc("options=%s\n", Options);
Opts = bson_new_from_json((const uint8_t *)Options, -1, &Error);
if (!Opts) {
sprintf(g->Message, "Wrong options: %s", Error.message);
return true;
} // endif Opts
} // endif *Options
} // endif Options
if (Pipe) {
const char *p;
if (trace)
htrc("filter=%s\n", Filter);
htrc("Pipeline: %s\n", Options);
if (To_Filter) {
p = strrchr(Options, ']');
if (!p) {
strcpy(g->Message, "Missing ] in pipeline");
return true;
} else
*(char*)p = 0;
PSTRG s = new(g) STRING(g, 1023, (PSZ)Options);
s->Append(",{\"$match\":");
Query = bson_new_from_json((const uint8_t *)Filter, -1, &Error);
if (To_Filter->MakeSelector(g, s)) {
strcpy(g->Message, "Failed making selector");
return true;
} // endif Selector
s->Append("}]}");
s->Resize(s->GetLength() + 1);
p = s->GetStr();
if (trace)
htrc("New Pipeline: %s\n", p);
To_Filter = NULL; // Not needed anymore
} else
p = Options;
Query = bson_new_from_json((const uint8_t *)p, -1, &Error);
if (!Query) {
sprintf(g->Message, "Wrong filter: %s", Error.message);
sprintf(g->Message, "Wrong pipeline: %s", Error.message);
return true;
} // endif Query
} else
Query = bson_new();
} else if (Filter || To_Filter) {
if (trace) {
if (Filter)
htrc("Filter: %s\n", Filter);
if (To_Filter) {
char buf[512];
To_Filter->Print(g, buf, 511);
htrc("To_Filter: %s\n", buf);
} // endif To_Filter
} // endif trace
PSTRG s = new(g) STRING(g, 1023, (PSZ)Filter);
if (To_Filter) {
if (Filter)
s->Append(',');
if (To_Filter->MakeSelector(g, s)) {
strcpy(g->Message, "Failed making selector");
return true;
} // endif Selector
s->Resize(s->GetLength() + 1);
To_Filter = NULL; // Not needed anymore
} // endif To_Filter
if (Options && *Options) {
if (trace)
htrc("options=%s\n", Options);
htrc("selector: %s\n", s->GetStr());
Opts = bson_new_from_json((const uint8_t *)Options, -1, &Error);
Query = bson_new_from_json((const uint8_t *)s->GetStr(), -1, &Error);
if (!Opts) {
sprintf(g->Message, "Wrong options: %s", Error.message);
if (!Query) {
sprintf(g->Message, "Wrong filter: %s", Error.message);
return true;
} // endif Opts
} // endif Query
} // endif options
} else
Query = bson_new();
Uri = mongoc_uri_new(Uristr);
......@@ -649,9 +779,34 @@ bool TDBMGO::OpenDB(PGLOBAL g)
/*******************************************************************/
/* First opening. */
/*******************************************************************/
if (Pipe && Mode != MODE_READ) {
strcpy(g->Message, "Pipeline tables are read only");
return true;
} // endif Pipe
if (Init(g))
return true;
else if (Mode != MODE_INSERT)
if (Mode == MODE_DELETE && !Next) {
// Delete all documents
if (!mongoc_collection_remove(Collection, MONGOC_REMOVE_NONE,
Query, NULL, &Error)) {
sprintf(g->Message, "Mongo remove all: %s", Error.message);
return true;
} // endif remove
} else if (Mode == MODE_INSERT)
MakeColumnGroups(g);
else if (Pipe) {
Cursor = mongoc_collection_aggregate(Collection, MONGOC_QUERY_NONE,
Query, NULL, NULL);
if (mongoc_cursor_error(Cursor, &Error)) {
sprintf(g->Message, "Mongo aggregate Failure: %s", Error.message);
return true;
} // endif cursor
} else
Cursor = mongoc_collection_find_with_opts(Collection, Query, Opts, NULL);
} // endif Use
......@@ -754,13 +909,120 @@ void TDBMGO::ShowDocument(bson_iter_t *iter, const bson_t *doc, const char *k)
} // end of ShowDocument
/***********************************************************************/
/* Group columns for inserting or updating. */
/***********************************************************************/
void TDBMGO::MakeColumnGroups(PGLOBAL g)
{
Fpc = new(g) INCOL;
for (PCOL colp = Columns; colp; colp = colp->GetNext())
if (!colp->IsSpecial())
Fpc->AddCol(g, colp, ((PMGOCOL)colp)->Jpath);
} // end of MakeColumnGroups
/***********************************************************************/
/* DocWrite. */
/***********************************************************************/
bool TDBMGO::DocWrite(PGLOBAL g, PINCOL icp)
{
for (PKC kp = icp->Klist; kp; kp = kp->Next)
if (kp->Incolp) {
BSON_APPEND_DOCUMENT_BEGIN(&icp->Child, kp->Key, &kp->Incolp->Child);
if (DocWrite(g, kp->Incolp))
return true;
bson_append_document_end(&icp->Child, &kp->Incolp->Child);
} else if (((PMGOCOL)kp->Colp)->AddValue(g, &icp->Child, kp->Key, false))
return true;
return false;
} // end of DocWrite
/***********************************************************************/
/* WriteDB: Data Base write routine for DOS access method. */
/***********************************************************************/
int TDBMGO::WriteDB(PGLOBAL g)
{
strcpy(g->Message, "MONGO tables are read only");
return RC_FX;
int rc = RC_OK;
if (Mode == MODE_INSERT) {
bson_init(&Fpc->Child);
if (DocWrite(g, Fpc))
return RC_FX;
if (trace) {
char *str = bson_as_json(&Fpc->Child, NULL);
htrc("Inserting: %s\n", str);
bson_free(str);
} // endif trace
if (!mongoc_collection_insert(Collection, MONGOC_INSERT_NONE,
&Fpc->Child, NULL, &Error)) {
sprintf(g->Message, "Mongo insert: %s", Error.message);
rc = RC_FX;
} // endif insert
} else {
bool b = false;
bson_iter_t iter;
bson_t *query = bson_new();
bson_iter_init(&iter, Document);
if (bson_iter_find(&iter, "_id")) {
if (BSON_ITER_HOLDS_OID(&iter))
b = BSON_APPEND_OID(query, "_id", bson_iter_oid(&iter));
else if (BSON_ITER_HOLDS_INT32(&iter))
b = BSON_APPEND_INT32(query, "_id", bson_iter_int32(&iter));
else if (BSON_ITER_HOLDS_INT64(&iter))
b = BSON_APPEND_INT64(query, "_id", bson_iter_int64(&iter));
else if (BSON_ITER_HOLDS_DOUBLE(&iter))
b = BSON_APPEND_DOUBLE(query, "_id", bson_iter_double(&iter));
else if (BSON_ITER_HOLDS_UTF8(&iter))
b = BSON_APPEND_UTF8(query, "_id", bson_iter_utf8(&iter, NULL));
} // endif iter
if (b) {
if (Mode == MODE_UPDATE) {
bson_t child;
bson_t *update = bson_new();
BSON_APPEND_DOCUMENT_BEGIN(update, "$set", &child);
for (PCOL colp = To_SetCols; colp; colp = colp->GetNext())
if (((PMGOCOL)colp)->AddValue(g, &child, ((PMGOCOL)colp)->Jpath, true))
rc = RC_FX;
bson_append_document_end(update, &child);
if (rc == RC_OK)
if (!mongoc_collection_update(Collection, MONGOC_UPDATE_NONE,
query, update, NULL, &Error)) {
sprintf(g->Message, "Mongo update: %s", Error.message);
rc = RC_FX;
} // endif update
bson_destroy(update);
} else if (!mongoc_collection_remove(Collection,
MONGOC_REMOVE_SINGLE_REMOVE, query, NULL, &Error)) {
sprintf(g->Message, "Mongo delete: %s", Error.message);
rc = RC_FX;
} // endif remove
} else {
strcpy(g->Message, "Mongo update: cannot find _id");
rc = RC_FX;
} // endif b
bson_destroy(query);
} // endif Mode
return rc;
} // end of WriteDB
/***********************************************************************/
......@@ -768,8 +1030,7 @@ int TDBMGO::WriteDB(PGLOBAL g)
/***********************************************************************/
int TDBMGO::DeleteDB(PGLOBAL g, int irc)
{
strcpy(g->Message, "MONGO tables are read only");
return RC_FX;
return (irc == RC_OK) ? WriteDB(g) : RC_OK;
} // end of DeleteDB
/***********************************************************************/
......@@ -799,6 +1060,7 @@ MGOCOL::MGOCOL(PGLOBAL g, PCOLDEF cdp, PTDB tdbp, PCOL cprec, int i)
{
Tmgp = (PTDBMGO)(tdbp->GetOrig() ? tdbp->GetOrig() : tdbp);
Jpath = cdp->GetFmt() ? cdp->GetFmt() : cdp->GetName();
Mbuf = NULL;
} // end of MGOCOL constructor
/***********************************************************************/
......@@ -809,15 +1071,46 @@ MGOCOL::MGOCOL(MGOCOL *col1, PTDB tdbp) : EXTCOL(col1, tdbp)
{
Tmgp = col1->Tmgp;
Jpath = col1->Jpath;
Mbuf = col1->Mbuf;
} // end of MGOCOL copy constructor
/***********************************************************************/
/* SetBuffer: prepare a column block for write operation. */
/* Mini: used to suppress blanks to json strings. */
/***********************************************************************/
bool MGOCOL::SetBuffer(PGLOBAL g, PVAL value, bool ok, bool check)
char *MGOCOL::Mini(PGLOBAL g, const bson_t *bson, bool b)
{
return false;
} // end of SetBuffer
char *s, *str = NULL;
int i, k = 0;
bool ok = true;
if (b)
s = str = bson_array_as_json(bson, NULL);
else
s = str = bson_as_json(bson, NULL);
for (i = 0; i < Long && s[i]; i++) {
switch (s[i]) {
case ' ':
if (ok) continue;
case '"':
ok = !ok;
default:
break;
} // endswitch s[i]
Mbuf[k++] = s[i];
} // endfor i
bson_free(str);
if (i >= Long) {
sprintf(g->Message, "Value too long for column %s", Name);
throw(TYPE_AM_MGO);
} // endif i
Mbuf[k] = 0;
return Mbuf;
} // end of Mini
/***********************************************************************/
/* ReadColumn: */
......@@ -825,7 +1118,9 @@ bool MGOCOL::SetBuffer(PGLOBAL g, PVAL value, bool ok, bool check)
void MGOCOL::ReadColumn(PGLOBAL g)
{
if (bson_iter_init(&Iter, Tmgp->Document) &&
if (!strcmp(Jpath, "*")) {
Value->SetValue_psz(Mini(g, Tmgp->Document, false));
} else if (bson_iter_init(&Iter, Tmgp->Document) &&
bson_iter_find_descendant(&Iter, Jpath, &Desc)) {
if (BSON_ITER_HOLDS_UTF8(&Desc))
Value->SetValue_psz((PSZ)bson_iter_utf8(&Desc, NULL));
......@@ -837,7 +1132,15 @@ void MGOCOL::ReadColumn(PGLOBAL g)
Value->SetValue(bson_iter_double(&Desc));
else if (BSON_ITER_HOLDS_DATE_TIME(&Desc))
Value->SetValue(bson_iter_date_time(&Desc) / 1000);
else if (BSON_ITER_HOLDS_OID(&Desc)) {
else if (BSON_ITER_HOLDS_BOOL(&Desc)) {
bool b = bson_iter_bool(&Desc);
if (Value->IsTypeNum())
Value->SetValue(b ? 1 : 0);
else
Value->SetValue_psz(b ? "true" : "false");
} else if (BSON_ITER_HOLDS_OID(&Desc)) {
char str[25];
bson_oid_to_string(bson_iter_oid(&Desc), str);
......@@ -851,29 +1154,49 @@ void MGOCOL::ReadColumn(PGLOBAL g)
Value->SetValue_psz(str);
bson_free(str);
} else if (BSON_ITER_HOLDS_DOCUMENT(&Iter)) {
char *str = NULL;
bson_t *doc;
const uint8_t *data = NULL;
uint32_t len = 0;
bson_iter_document(&Desc, &len, &data);
doc = bson_new_from_data(data, len);
str = bson_as_json(doc, NULL);
Value->SetValue_psz(str);
bson_free(str);
bson_destroy(doc);
if (data) {
doc = bson_new_from_data(data, len);
Value->SetValue_psz(Mini(g, doc, false));
bson_destroy(doc);
} else
Value->Reset();
} else if (BSON_ITER_HOLDS_ARRAY(&Iter)) {
char *str = NULL;
bson_t *arr;
const uint8_t *data = NULL;
uint32_t len = 0;
bson_iter_array(&Desc, &len, &data);
arr = bson_new_from_data(data, len);
str = bson_as_json(arr, NULL);
Value->SetValue_psz(str);
bson_free(str);
bson_destroy(arr);
if (data) {
arr = bson_new_from_data(data, len);
Value->SetValue_psz(Mini(g, arr, true));
bson_destroy(arr);
} else {
// This is a bug in returning the wrong type
// This fix is only for document items
bson_t *doc;
bson_iter_document(&Desc, &len, &data);
if (data) {
doc = bson_new_from_data(data, len);
Value->SetValue_psz(Mini(g, doc, false));
bson_destroy(doc);
} else {
//strcpy(g->Message, "bson_iter_array failed (data is null)");
//throw(TYPE_AM_MGO);
Value->Reset();
} // endif data
} // endif data
} else
Value->Reset();
......@@ -891,10 +1214,65 @@ void MGOCOL::ReadColumn(PGLOBAL g)
/***********************************************************************/
void MGOCOL::WriteColumn(PGLOBAL g)
{
strcpy(g->Message, "Write MONGO columns not implemented yet");
throw 666;
// Check whether this node must be written
if (Value != To_Val)
Value->SetValue_pval(To_Val, FALSE); // Convert the updated value
} // end of WriteColumn
/***********************************************************************/
/* AddValue: Add column value to the document to insert or update. */
/***********************************************************************/
bool MGOCOL::AddValue(PGLOBAL g, bson_t *doc, char *key, bool upd)
{
bool rc = false;
if (Value->IsNull()) {
if (upd)
rc = BSON_APPEND_NULL(doc, key);
else
return false;
} else switch (Buf_Type) {
case TYPE_STRING:
rc = BSON_APPEND_UTF8(doc, key, Value->GetCharValue());
break;
case TYPE_INT:
case TYPE_SHORT:
rc = BSON_APPEND_INT32(doc, key, Value->GetIntValue());
break;
case TYPE_TINY:
rc = BSON_APPEND_BOOL(doc, key, Value->GetIntValue());
break;
case TYPE_BIGINT:
rc = BSON_APPEND_INT64(doc, key, Value->GetBigintValue());
break;
case TYPE_DOUBLE:
rc = BSON_APPEND_DOUBLE(doc, key, Value->GetFloatValue());
break;
case TYPE_DECIM:
{bson_decimal128_t dec;
if (bson_decimal128_from_string(Value->GetCharValue(), &dec))
rc = BSON_APPEND_DECIMAL128(doc, key, &dec);
} break;
case TYPE_DATE:
rc = BSON_APPEND_DATE_TIME(doc, key, Value->GetBigintValue() * 1000);
break;
default:
sprintf(g->Message, "Type %d not supported yet", Buf_Type);
return true;
} // endswitch Buf_Type
if (!rc) {
strcpy(g->Message, "Adding value failed");
return true;
} else
return false;
} // end of AddValue
#if 0
/* ---------------------------TDBGOL class --------------------------- */
......
......@@ -19,6 +19,48 @@
typedef class MGODEF *PMGODEF;
typedef class TDBMGO *PTDBMGO;
typedef class MGOCOL *PMGOCOL;
typedef class INCOL *PINCOL;
typedef struct _bncol {
struct _bncol *Next;
char *Name;
char *Fmt;
int Type;
int Len;
int Scale;
bool Cbn;
bool Found;
} BCOL, *PBCOL;
typedef struct KEYCOL {
KEYCOL *Next;
PINCOL Incolp;
PCOL Colp;
char *Key;
} *PKC;
/***********************************************************************/
/* Class used to get the columns of a mongo collection. */
/***********************************************************************/
class MGODISC : public BLOCK {
public:
// Constructor
MGODISC(PGLOBAL g, int *lg);
// Functions
int GetColumns(PGLOBAL g, char *db, char *dsn, PTOS topt);
bool FindInDoc(PGLOBAL g, bson_iter_t *iter, const bson_t *doc,
char *pcn, char *pfmt, int i, int k, bool b);
// Members
BCOL bcol;
PBCOL bcp, fbcp, pbcp;
PMGODEF tdp;
TDBMGO *tmgp;
int *length;
int n, k, lvl;
bool all;
}; // end of MGODISC
/***********************************************************************/
/* MongoDB table. */
......@@ -26,6 +68,8 @@ typedef class MGOCOL *PMGOCOL;
class DllExport MGODEF : public EXTDEF { /* Table description */
friend class TDBMGO;
friend class MGOFAM;
friend class MGODISC;
friend PQRYRES MGOColumns(PGLOBAL, char *, char *, PTOS, bool);
public:
// Constructor
MGODEF(void);
......@@ -44,9 +88,26 @@ class DllExport MGODEF : public EXTDEF { /* Table description */
char *Filter; /* Filtering query */
int Level; /* Used for catalog table */
int Base; /* The array index base */
bool Pipe; /* True is Colist is a pipeline */
}; // end of MGODEF
/* -------------------------- TDBMGO class --------------------------- */
/* ------------------------- TDBMGO classes -------------------------- */
/***********************************************************************/
/* Used when inserting values in a MongoDB collection. */
/***********************************************************************/
class INCOL : public BLOCK {
public:
// Constructor
INCOL(void) { Klist = NULL; }
// Methods
void AddCol(PGLOBAL g, PCOL colp, char *jp);
//Members
bson_t Child;
PKC Klist;
}; // end of INCOL;
/***********************************************************************/
/* This is the MongoDB Access Method class declaration. */
......@@ -55,6 +116,8 @@ class DllExport MGODEF : public EXTDEF { /* Table description */
class DllExport TDBMGO : public TDBEXT {
friend class MGOCOL;
friend class MGODEF;
friend class MGODISC;
friend PQRYRES MGOColumns(PGLOBAL, char *, char *, PTOS, bool);
public:
// Constructor
TDBMGO(PMGODEF tdp);
......@@ -83,6 +146,8 @@ class DllExport TDBMGO : public TDBEXT {
protected:
bool Init(PGLOBAL g);
void ShowDocument(bson_iter_t *i, const bson_t *b, const char *k);
void MakeColumnGroups(PGLOBAL g);
bool DocWrite(PGLOBAL g, PINCOL icp);
// Members
mongoc_uri_t *Uri;
......@@ -95,6 +160,7 @@ class DllExport TDBMGO : public TDBEXT {
bson_t *Query; // MongoDB cursor filter
bson_t *Opts; // MongoDB cursor options
bson_error_t Error;
PINCOL Fpc; // To insert INCOL classes
const char *Uristr;
const char *Db_name;
const char *Coll_name;
......@@ -104,6 +170,7 @@ class DllExport TDBMGO : public TDBEXT {
int N; // The current Rownum
int B; // Array index base
bool Done; // Init done
bool Pipe; // True for pipeline
}; // end of class TDBMGO
/* --------------------------- MGOCOL class -------------------------- */
......@@ -113,6 +180,7 @@ class DllExport TDBMGO : public TDBEXT {
/***********************************************************************/
class DllExport MGOCOL : public EXTCOL {
friend class TDBMGO;
friend class FILTER;
public:
// Constructors
MGOCOL(PGLOBAL g, PCOLDEF cdp, PTDB tdbp, PCOL cprec, int i);
......@@ -122,19 +190,22 @@ class DllExport MGOCOL : public EXTCOL {
virtual int GetAmType(void) { return Tmgp->GetAmType(); }
// Methods
virtual bool SetBuffer(PGLOBAL g, PVAL value, bool ok, bool check);
//virtual bool SetBuffer(PGLOBAL g, PVAL value, bool ok, bool check);
virtual void ReadColumn(PGLOBAL g);
virtual void WriteColumn(PGLOBAL g);
bool AddValue(PGLOBAL g, bson_t *doc, char *key, bool upd);
protected:
// Default constructor not to be used
MGOCOL(void) {}
char *Mini(PGLOBAL g, const bson_t *bson, bool b);
// Members
TDBMGO *Tmgp; // To the MGO table block
bson_iter_t Iter; // Used to retrieve column value
bson_iter_t Desc; // Descendant iter
char *Jpath; // The json path
char *Mbuf; // The Mini buffer
}; // end of class MGOCOL
#if 0
......
......@@ -1096,7 +1096,7 @@ bool TDBMYSQL::ReadKey(PGLOBAL g, OPVAL op, const key_range *kr)
To_CondFil->Body= (char*)PlugSubAlloc(g, NULL, 0);
*To_CondFil->Body= 0;
if ((To_CondFil = hc->CheckCond(g, To_CondFil, To_CondFil->Cond)))
if ((To_CondFil = hc->CheckCond(g, To_CondFil, Cond)))
PlugSubAlloc(g, NULL, strlen(To_CondFil->Body) + 1);
} // endif active_index
......
......@@ -733,7 +733,7 @@ bool TDBODBC::ReadKey(PGLOBAL g, OPVAL op, const key_range *kr)
To_CondFil->Body= (char*)PlugSubAlloc(g, NULL, 0);
*To_CondFil->Body= 0;
if ((To_CondFil = hc->CheckCond(g, To_CondFil, To_CondFil->Cond)))
if ((To_CondFil = hc->CheckCond(g, To_CondFil, Cond)))
PlugSubAlloc(g, NULL, strlen(To_CondFil->Body) + 1);
} // endif active_index
......
......@@ -558,6 +558,38 @@ bool VALUE::Compute(PGLOBAL g, PVAL *, int, OPVAL)
return true;
} // end of Compute
/***********************************************************************/
/* Make file output of an object value. */
/***********************************************************************/
void VALUE::Print(PGLOBAL g, FILE *f, uint n)
{
char m[64], buf[64];
memset(m, ' ', n); /* Make margin string */
m[n] = '\0';
if (Null)
fprintf(f, "%s<null>\n", m);
else
fprintf(f, strcat(strcat(GetCharString(buf), "\n"), m));
} /* end of Print */
/***********************************************************************/
/* Make string output of an object value. */
/***********************************************************************/
void VALUE::Print(PGLOBAL g, char *ps, uint z)
{
char *p, buf[64];
if (Null)
p = strcpy(buf, "<null>");
else
p = GetCharString(buf);
strncpy(ps, p, z);
} // end of Print
/* -------------------------- Class TYPVAL ---------------------------- */
/***********************************************************************/
......@@ -1192,37 +1224,6 @@ bool TYPVAL<TYPE>::SetConstFormat(PGLOBAL g, FORMAT& fmt)
return false;
} // end of SetConstFormat
/***********************************************************************/
/* Make file output of a typed object. */
/***********************************************************************/
template <class TYPE>
void TYPVAL<TYPE>::Print(PGLOBAL g, FILE *f, uint n)
{
char m[64], buf[12];
memset(m, ' ', n); /* Make margin string */
m[n] = '\0';
if (Null)
fprintf(f, "%s<null>\n", m);
else
fprintf(f, strcat(strcat(strcpy(buf, "%s"), Fmt), "\n"), m, Tval);
} /* end of Print */
/***********************************************************************/
/* Make string output of a int object. */
/***********************************************************************/
template <class TYPE>
void TYPVAL<TYPE>::Print(PGLOBAL g, char *ps, uint z)
{
if (Null)
strcpy(ps, "<null>");
else
sprintf(ps, Fmt, Tval);
} /* end of Print */
/* -------------------------- Class STRING --------------------------- */
/***********************************************************************/
......@@ -1708,6 +1709,18 @@ bool TYPVAL<PSZ>::SetConstFormat(PGLOBAL, FORMAT& fmt)
return false;
} // end of SetConstFormat
/***********************************************************************/
/* Make string output of an object value. */
/***********************************************************************/
void TYPVAL<PSZ>::Print(PGLOBAL g, char *ps, uint z)
{
if (Null)
strncpy(ps, "null", z);
else
strcat(strncat(strncpy(ps, "\"", z), Strp, z-2), "\"");
} // end of Print
/* -------------------------- Class DECIMAL -------------------------- */
/***********************************************************************/
......
......@@ -122,6 +122,8 @@ class DllExport VALUE : public BLOCK {
virtual bool IsEqual(PVAL vp, bool chktype) = 0;
virtual bool Compute(PGLOBAL g, PVAL *vp, int np, OPVAL op);
virtual bool FormatValue(PVAL vp, char *fmt) = 0;
virtual void Print(PGLOBAL g, FILE *, uint);
virtual void Print(PGLOBAL g, char *ps, uint z);
/**
Set value from a non-aligned in-memory value in the machine byte order.
......@@ -233,8 +235,6 @@ class DllExport TYPVAL : public VALUE {
virtual bool Compute(PGLOBAL g, PVAL *vp, int np, OPVAL op);
virtual bool SetConstFormat(PGLOBAL, FORMAT&);
virtual bool FormatValue(PVAL vp, char *fmt);
virtual void Print(PGLOBAL g, FILE *, uint);
virtual void Print(PGLOBAL g, char *, uint);
protected:
static TYPE MinMaxVal(bool b);
......@@ -308,6 +308,7 @@ class DllExport TYPVAL<PSZ>: public VALUE {
virtual bool Compute(PGLOBAL g, PVAL *vp, int np, OPVAL op);
virtual bool FormatValue(PVAL vp, char *fmt);
virtual bool SetConstFormat(PGLOBAL, FORMAT&);
virtual void Print(PGLOBAL g, char *ps, uint z);
protected:
// Members
......
......@@ -33,29 +33,6 @@ class CMD : public BLOCK {
char *Cmd;
}; // end of class CMD
#if 0
// Condition filter structure
class CONDFIL : public BLOCK {
public:
// Constructor
CONDFIL(const Item *cond, uint idx, AMT type)
{
Cond = cond; Idx = idx; Type = type; Op = OP_XX;
Cmds = NULL; All = true; Body = NULL, Having = NULL;
}
// Members
const Item *Cond;
AMT Type;
uint Idx;
OPVAL Op;
PCMD Cmds;
bool All;
char *Body;
char *Having;
}; // end of class CONDFIL
#endif // 0
typedef class EXTCOL *PEXTCOL;
typedef class CONDFIL *PCFIL;
typedef class TDBCAT *PTDBCAT;
......@@ -94,6 +71,8 @@ class DllExport TDB: public BLOCK { // Table Descriptor Block.
inline void SetColumns(PCOL colp) {Columns = colp;}
inline void SetDegree(int degree) {Degree = degree;}
inline void SetMode(MODE mode) {Mode = mode;}
inline const Item *GetCond(void) {return Cond;}
inline void SetCond(const Item *cond) {Cond = cond;}
// Properties
virtual AMT GetAmType(void) {return TYPE_AM_ERROR;}
......@@ -157,6 +136,7 @@ class DllExport TDB: public BLOCK { // Table Descriptor Block.
TUSE Use;
PFIL To_Filter;
PCFIL To_CondFil; // To condition filter structure
const Item *Cond; // The condition used to make filters
static int Tnum; // Used to generate Tdb_no's
const int Tdb_No; // GetTdb_No() is always 0 for OPJOIN
PTDB Next; // Next in linearized queries
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment