Commit d67ad26b authored by Olivier Bertrand's avatar Olivier Bertrand

- Adding files needed for block indexing

added:
  storage/connect/array.cpp
  storage/connect/array.h
  storage/connect/blkfil.cpp
  storage/connect/blkfil.h
  storage/connect/filter.cpp
  storage/connect/filter.h
parent 85e8aee4
/************* Array C++ Functions Source Code File (.CPP) *************/
/* Name: ARRAY.CPP Version 2.3 */
/* */
/* (C) Copyright to the author Olivier BERTRAND 2005-2014 */
/* */
/* This file contains the XOBJECT derived class ARRAY functions. */
/* ARRAY is used for elaborate type of processing, such as sorting */
/* and dichotomic search (Find). This new version does not use sub */
/* classes anymore for the different types but relies entirely on the */
/* functionalities provided by the VALUE and VALBLK classes. */
/* Currently the only supported types are STRING, SHORT, int, DATE, */
/* TOKEN, DOUBLE, and Compressed Strings. */
/***********************************************************************/
/***********************************************************************/
/* Include relevant MariaDB header file. */
/***********************************************************************/
#include "my_global.h"
#include "sql_class.h"
//#include "sql_time.h"
#if defined(WIN32)
//#include <windows.h>
#else // !WIN32
#include <string.h>
#endif // !WIN32
/***********************************************************************/
/* Include required application header files */
/* global.h is header containing all global Plug declarations. */
/* plgdbsem.h is header containing the DB applic. declarations. */
/* xobject.h is header containing XOBJECT derived classes declares. */
/***********************************************************************/
#include "global.h"
#include "plgdbsem.h"
#include "xtable.h"
#include "array.h"
//#include "select.h"
//#include "query.h"
//#include "token.h"
/***********************************************************************/
/* Macro definitions. */
/***********************************************************************/
#if defined(_DEBUG)
#define ASSERT(B) assert(B);
#else
#define ASSERT(B)
#endif
/***********************************************************************/
/* Static variables. */
/***********************************************************************/
extern "C" int trace;
/***********************************************************************/
/* DB static external variables. */
/***********************************************************************/
extern MBLOCK Nmblk; /* Used to initialize MBLOCK's */
/***********************************************************************/
/* External functions. */
/***********************************************************************/
BYTE OpBmp(PGLOBAL g, OPVAL opc);
void EncodeValue(int *lp, char *strp, int n);
/***********************************************************************/
/* MakeValueArray: Makes a value array from a value list. */
/***********************************************************************/
PARRAY MakeValueArray(PGLOBAL g, PPARM pp)
{
int n, valtyp = 0;
size_t len = 0;
PARRAY par;
PPARM parmp;
if (!pp)
return NULL;
/*********************************************************************/
/* New version with values coming in a list. */
/*********************************************************************/
if ((valtyp = pp->Type) != TYPE_STRING)
len = 1;
if (trace)
htrc("valtyp=%d len=%d\n", valtyp, len);
/*********************************************************************/
/* Firstly check the list and count the number of values in it. */
/*********************************************************************/
for (n = 0, parmp = pp; parmp; n++, parmp = parmp->Next)
if (parmp->Type != valtyp) {
sprintf(g->Message, MSG(BAD_PARAM_TYPE), "MakeValueArray", parmp->Type);
return NULL;
} else if (valtyp == TYPE_STRING)
len = max(len, strlen((char*)parmp->Value));
/*********************************************************************/
/* Make an array object with one block of the proper size. */
/*********************************************************************/
par = new(g) ARRAY(g, valtyp, n, (int)len);
if (par->GetResultType() == TYPE_ERROR)
return NULL; // Memory allocation error in ARRAY
/*********************************************************************/
/* All is right now, fill the array block. */
/*********************************************************************/
for (parmp = pp; parmp; parmp = parmp->Next)
switch (valtyp) {
case TYPE_STRING:
par->AddValue(g, (PSZ)parmp->Value);
break;
case TYPE_SHORT:
par->AddValue(g, *(SHORT*)parmp->Value);
break;
case TYPE_INT:
par->AddValue(g, *(int*)parmp->Value);
break;
case TYPE_DOUBLE:
par->AddValue(g, *(double*)parmp->Value);
break;
} // endswitch valtyp
/*********************************************************************/
/* Send back resulting array. */
/*********************************************************************/
return par;
} // end of MakeValueArray
/* -------------------------- Class ARRAY ---------------------------- */
/***********************************************************************/
/* ARRAY public constructor. */
/***********************************************************************/
ARRAY::ARRAY(PGLOBAL g, int type, int size, int length, int prec)
: CSORT(FALSE)
{
Nval = 0;
Ndif = 0;
Bot = 0;
Top = 0;
Size = size;
Type = type;
Xsize = -1;
Len = 1;
switch ((Type = type)) {
case TYPE_STRING:
Len = length;
break;
case TYPE_SHORT:
case TYPE_INT:
case TYPE_DOUBLE:
break;
#if 0
case TYPE_TOKEN:
break;
case TYPE_LIST:
Len = 0;
prec = length;
break;
#endif // 0
default: // This is illegal an causes an ill formed array building
sprintf(g->Message, MSG(BAD_ARRAY_TYPE), type);
Type = TYPE_ERROR;
return;
} // endswitch type
Valblk = new(g) MBVALS;
Vblp = Valblk->Allocate(g, Type, Len, prec, Size);
if (!Valblk->GetMemp() && Type != TYPE_LIST)
// The error message was built by PlgDBalloc
Type = TYPE_ERROR;
else
Value = AllocateValue(g, type, Len, prec, NULL);
Constant = TRUE;
} // end of ARRAY constructor
#if 0
/***********************************************************************/
/* ARRAY public constructor from a QUERY. */
/***********************************************************************/
ARRAY::ARRAY(PGLOBAL g, PQUERY qryp) : CSORT(FALSE)
{
Type = qryp->GetColType(0);
Nval = qryp->GetNblin();
Ndif = 0;
Bot = 0;
Top = 0;
Size = Nval;
Xsize = -1;
Len = qryp->GetColLength(0);
X = Inf = Sup = 0;
Correlated = FALSE;
switch (Type) {
case TYPE_STRING:
case TYPE_SHORT:
case TYPE_INT:
case TYPE_DATE:
case TYPE_DOUBLE:
// case TYPE_TOKEN:
// case TYPE_LIST:
// Valblk = qryp->GetCol(0)->Result;
// Vblp = qryp->GetColBlk(0);
// Value = qryp->GetColValue(0);
// break;
default: // This is illegal an causes an ill formed array building
sprintf(g->Message, MSG(BAD_ARRAY_TYPE), Type);
Type = TYPE_ERROR;
} // endswitch type
if (!Valblk || (!Valblk->GetMemp() && Type != TYPE_LIST))
// The error message was built by ???
Type = TYPE_ERROR;
Constant = TRUE;
} // end of ARRAY constructor
/***********************************************************************/
/* ARRAY constructor from a TYPE_LIST subarray. */
/***********************************************************************/
ARRAY::ARRAY(PGLOBAL g, PARRAY par, int k) : CSORT(FALSE)
{
int prec;
LSTBLK *lp;
if (par->Type != TYPE_LIST) {
Type = TYPE_ERROR;
return;
} // endif Type
lp = (LSTBLK*)par->Vblp;
Nval = par->Nval;
Ndif = 0;
Bot = 0;
Top = 0;
Size = par->Size;
Xsize = -1;
Valblk = lp->Mbvk[k];
Vblp = Valblk->Vblk;
Type = Vblp->GetType();
Len = (Type == TYPE_STRING) ? Vblp->GetVlen() : 0;
prec = (Type == TYPE_FLOAT) ? 2 : 0;
Value = AllocateValue(g, Type, Len, prec, NULL);
Constant = TRUE;
} // end of ARRAY constructor
#endif // 0
/***********************************************************************/
/* Empty: reset the array for a new use (correlated queries). */
/* Note: this is temporary as correlated queries will not use arrays */
/* anymore with future optimized algorithms. */
/***********************************************************************/
void ARRAY::Empty(void)
{
assert(Correlated);
Nval = Ndif = 0;
Bot = Top = X = Inf = Sup = 0;
} // end of Empty
/***********************************************************************/
/* Add a string element to an array. */
/***********************************************************************/
bool ARRAY::AddValue(PGLOBAL g, PSZ strp)
{
if (Type != TYPE_STRING) {
sprintf(g->Message, MSG(ADD_BAD_TYPE), GetTypeName(Type), "CHAR");
return TRUE;
} // endif Type
if (trace)
htrc(" adding string(%d): '%s'\n", Nval, strp);
//Value->SetValue_psz(strp);
//Vblp->SetValue(valp, Nval++);
Vblp->SetValue(strp, Nval++);
return FALSE;
} // end of AddValue
/***********************************************************************/
/* Add a SHORT integer element to an array. */
/***********************************************************************/
bool ARRAY::AddValue(PGLOBAL g, SHORT n)
{
if (Type != TYPE_SHORT) {
sprintf(g->Message, MSG(ADD_BAD_TYPE), GetTypeName(Type), "SHORT");
return TRUE;
} // endif Type
if (trace)
htrc(" adding SHORT(%d): %hd\n", Nval, n);
//Value->SetValue(n);
//Vblp->SetValue(valp, Nval++);
Vblp->SetValue(n, Nval++);
return FALSE;
} // end of AddValue
/***********************************************************************/
/* Add a int integer element to an array. */
/***********************************************************************/
bool ARRAY::AddValue(PGLOBAL g, int n)
{
if (Type != TYPE_INT) {
sprintf(g->Message, MSG(ADD_BAD_TYPE), GetTypeName(Type), "INTEGER");
return TRUE;
} // endif Type
if (trace)
htrc(" adding int(%d): %d\n", Nval, n);
//Value->SetValue(n);
//Vblp->SetValue(valp, Nval++);
Vblp->SetValue(n, Nval++);
return FALSE;
} // end of AddValue
/***********************************************************************/
/* Add a double float element to an array. */
/***********************************************************************/
bool ARRAY::AddValue(PGLOBAL g, double d)
{
if (Type != TYPE_DOUBLE) {
sprintf(g->Message, MSG(ADD_BAD_TYPE), GetTypeName(Type), "DOUBLE");
return TRUE;
} // endif Type
if (trace)
htrc(" adding float(%d): %lf\n", Nval, d);
Value->SetValue(d);
Vblp->SetValue(Value, Nval++);
return FALSE;
} // end of AddValue
/***********************************************************************/
/* Add the value of a XOBJECT block to an array. */
/***********************************************************************/
bool ARRAY::AddValue(PGLOBAL g, PXOB xp)
{
if (Type != xp->GetResultType()) {
sprintf(g->Message, MSG(ADD_BAD_TYPE),
GetTypeName(xp->GetResultType()), GetTypeName(Type));
return TRUE;
} // endif Type
if (trace)
htrc(" adding (%d) from xp=%p\n", Nval, xp);
//AddValue(xp->GetValue());
Vblp->SetValue(xp->GetValue(), Nval++);
return FALSE;
} // end of AddValue
/***********************************************************************/
/* Add a value to an array. */
/***********************************************************************/
bool ARRAY::AddValue(PGLOBAL g, PVAL vp)
{
if (Type != vp->GetType()) {
sprintf(g->Message, MSG(ADD_BAD_TYPE),
GetTypeName(vp->GetType()), GetTypeName(Type));
return TRUE;
} // endif Type
if (trace)
htrc(" adding (%d) from vp=%p\n", Nval, vp);
Vblp->SetValue(vp, Nval++);
return FALSE;
} // end of AddValue
/***********************************************************************/
/* Retrieve the nth value of the array. */
/***********************************************************************/
void ARRAY::GetNthValue(PVAL valp, int n)
{
valp->SetValue_pvblk(Vblp, n);
} // end of GetNthValue
#if 0
/***********************************************************************/
/* Retrieve the nth subvalue of a list array. */
/***********************************************************************/
bool ARRAY::GetSubValue(PGLOBAL g, PVAL valp, int *kp)
{
PVBLK vblp;
if (Type != TYPE_LIST) {
sprintf(g->Message, MSG(NO_SUB_VAL), Type);
return TRUE;
} // endif Type
vblp = ((LSTBLK*)Vblp)->Mbvk[kp[0]]->Vblk;
valp->SetValue_pvblk(vblp, kp[1]);
return FALSE;
} // end of GetNthValue
#endif // 0
/***********************************************************************/
/* Return the nth value of a STRING array. */
/***********************************************************************/
char *ARRAY::GetStringValue(int n)
{
assert (Type == TYPE_STRING);
return Vblp->GetCharValue(n);
} // end of GetStringValue
/***********************************************************************/
/* Find whether a value is in an array. */
/* Provide a conversion limited to the Value limitation. */
/***********************************************************************/
bool ARRAY::Find(PVAL valp)
{
register int n;
PVAL vp;
if (Type != valp->GetType()) {
Value->SetValue_pval(valp);
vp = Value;
} else
vp = valp;
Inf = Bot, Sup = Top;
while (Sup - Inf > 1) {
X = (Inf + Sup) >> 1;
n = Vblp->CompVal(vp, X);
if (n < 0)
Sup = X;
else if (n > 0)
Inf = X;
else
return TRUE;
} // endwhile
return FALSE;
} // end of Find
/***********************************************************************/
/* ARRAY: Compare routine for a list of values. */
/***********************************************************************/
BYTE ARRAY::Vcompare(PVAL vp, int n)
{
Value->SetValue_pvblk(Vblp, n);
return vp->TestValue(Value);
} // end of Vcompare
/***********************************************************************/
/* Test a filter condition on an array depending on operator and mod. */
/* Modificator values are 1: ANY (or SOME) and 2: ALL. */
/***********************************************************************/
bool ARRAY::FilTest(PGLOBAL g, PVAL valp, OPVAL opc, int opm)
{
int i;
PVAL vp;
BYTE bt = OpBmp(g, opc);
int top = Nval - 1;
if (top < 0) // Array is empty
// Return TRUE for ALL because it means that there are no item that
// does not verify the condition, which is true indeed.
// Return FALSE for ANY because TRUE means that there is at least
// one item that verifies the condition, which is false.
return opm == 2;
if (valp) {
if (Type != valp->GetType()) {
Value->SetValue_pval(valp);
vp = Value;
} else
vp = valp;
} else if (opc != OP_EXIST) {
sprintf(g->Message, MSG(MISSING_ARG), opc);
longjmp(g->jumper[g->jump_level], TYPE_ARRAY);
} else // OP_EXIST
return Nval > 0;
if (opc == OP_IN || (opc == OP_EQ && opm == 1))
return Find(vp);
else if (opc == OP_NE && opm == 2)
return !Find(vp);
else if (opc == OP_EQ && opm == 2)
return (Ndif == 1) ? !(Vcompare(vp, 0) & bt) : FALSE;
else if (opc == OP_NE && opm == 1)
return (Ndif == 1) ? !(Vcompare(vp, 0) & bt) : TRUE;
if (Type != TYPE_LIST) {
if (opc == OP_GT || opc == OP_GE)
return !(Vcompare(vp, (opm == 1) ? 0 : top) & bt);
else
return !(Vcompare(vp, (opm == 2) ? 0 : top) & bt);
} // endif Type
// Case of TYPE_LIST
if (opm == 2) {
for (i = 0; i < Nval; i++)
if (Vcompare(vp, i) & bt)
return FALSE;
return TRUE;
} else { // opm == 1
for (i = 0; i < Nval; i++)
if (!(Vcompare(vp, i) & bt))
return TRUE;
return FALSE;
} // endif opm
} // end of FilTest
/***********************************************************************/
/* Test whether this array can be converted to TYPE_SHORT. */
/* Must be called after the array is sorted. */
/***********************************************************************/
bool ARRAY::CanBeShort(void)
{
int* To_Val = (int*)Valblk->GetMemp();
if (Type != TYPE_INT || !Ndif)
return FALSE;
// Because the array is sorted, this is true if all the array
// int values are in the range of SHORT values
return (To_Val[0] >= -32768 && To_Val[Nval-1] < 32768);
} // end of CanBeShort
/***********************************************************************/
/* Convert an array to new numeric type k. */
/* Note: conversion is always made in ascending order from STRING to */
/* SHORT to int to double so no precision is lost in the conversion. */
/* One exception is converting from int to SHORT compatible arrays. */
/***********************************************************************/
int ARRAY::Convert(PGLOBAL g, int k, PVAL vp)
{
int i, prec = 0;
bool b = FALSE;
PMBV ovblk = Valblk;
PVBLK ovblp = Vblp;
Type = k; // k is the new type
Valblk = new(g) MBVALS;
switch (Type) {
case TYPE_DOUBLE:
prec = 2;
case TYPE_SHORT:
case TYPE_INT:
case TYPE_DATE:
Len = 1;
break;
default:
sprintf(g->Message, MSG(BAD_CONV_TYPE), Type);
return TYPE_ERROR;
} // endswitch k
Size = Nval;
Nval = 0;
Vblp = Valblk->Allocate(g, Type, Len, 0, Size);
if (!Valblk->GetMemp())
// The error message was built by PlgDBalloc
return TYPE_ERROR;
else
Value = AllocateValue(g, Type, Len, 0, NULL);
/*********************************************************************/
/* Converting STRING to DATE can be done according to date format. */
/*********************************************************************/
if (Type == TYPE_DATE && ovblp->GetType() == TYPE_STRING && vp)
if (((DTVAL*)Value)->SetFormat(g, vp))
return TYPE_ERROR;
else
b = TRUE; // Sort the new array on date internal values
/*********************************************************************/
/* Do the actual conversion. */
/*********************************************************************/
for (i = 0; i < Size; i++) {
Value->SetValue_pvblk(ovblp, i);
if (AddValue(g, Value))
return TYPE_ERROR;
} // endfor i
/*********************************************************************/
/* For sorted arrays, get the initial find values. */
/*********************************************************************/
if (b)
Sort(g);
ovblk->Free();
return Type;
} // end of Convert
/***********************************************************************/
/* ARRAY Save: save value at i (used while rordering). */
/***********************************************************************/
void ARRAY::Save(int i)
{
Value->SetValue_pvblk(Vblp, i);
} // end of Save
/***********************************************************************/
/* ARRAY Restore: restore value to j (used while rordering). */
/***********************************************************************/
void ARRAY::Restore(int j)
{
Vblp->SetValue(Value, j);
} // end of Restore
/***********************************************************************/
/* ARRAY Move: move value from k to j (used while rordering). */
/***********************************************************************/
void ARRAY::Move(int j, int k)
{
Vblp->Move(k, j); // VALBLK does the opposite !!!
} // end of Move
/***********************************************************************/
/* ARRAY: Compare routine for one LIST value (ascending only). */
/***********************************************************************/
int ARRAY::Qcompare(int *i1, int *i2)
{
return Vblp->CompVal(*i1, *i2);
} // end of Qcompare
/***********************************************************************/
/* Mainly meant to set the character arrays case sensitiveness. */
/***********************************************************************/
void ARRAY::SetPrecision(PGLOBAL g, int p)
{
if (Vblp == NULL) {
strcpy(g->Message, MSG(PREC_VBLP_NULL));
longjmp(g->jumper[g->jump_level], TYPE_ARRAY);
} // endif Vblp
bool was = Vblp->IsCi();
if (was && !p) {
strcpy(g->Message, MSG(BAD_SET_CASE));
longjmp(g->jumper[g->jump_level], TYPE_ARRAY);
} // endif Vblp
if (was || !p)
return;
else
Vblp->SetPrec(p);
if (!was && Type == TYPE_STRING)
// Must be resorted to eliminate duplicate strings
if (Sort(g))
longjmp(g->jumper[g->jump_level], TYPE_ARRAY);
} // end of SetPrecision
/***********************************************************************/
/* Sort and eliminate distinct values from an array. */
/* Note: this is done by making a sorted index on distinct values. */
/* Returns FALSE if Ok or TRUE in case of error. */
/***********************************************************************/
bool ARRAY::Sort(PGLOBAL g)
{
int i, j, k;
// This is to avoid multiply allocating for correlated subqueries
if (Nval > Xsize) {
if (Xsize >= 0) {
// Was already allocated
PlgDBfree(Index);
PlgDBfree(Offset);
} // endif Xsize
// Prepare non conservative sort with offet values
Index.Size = Nval * sizeof(int);
if (!PlgDBalloc(g, NULL, Index))
goto error;
Offset.Size = (Nval + 1) * sizeof(int);
if (!PlgDBalloc(g, NULL, Offset))
goto error;
Xsize = Nval;
} // endif Nval
// Call the sort program, it returns the number of distinct values
Ndif = Qsort(g, Nval);
if (Ndif < 0)
goto error;
// Use the sort index to reorder the data in storage so it will
// be physically sorted and Index can be removed.
for (i = 0; i < Nval; i++) {
if (Pex[i] == i || Pex[i] == Nval)
// Already placed or already moved
continue;
Save(i);
for (j = i;; j = k) {
k = Pex[j];
Pex[j] = Nval; // Mark position as set
if (k == i) {
Restore(j);
break; // end of loop
} else
Move(j, k);
} // endfor j
} // endfor i
// Reduce the size of the To_Val array if Ndif < Nval
if (Ndif < Nval) {
for (i = 1; i < Ndif; i++)
if (i != Pof[i])
break;
for (; i < Ndif; i++)
Move(i, Pof[i]);
Nval = Ndif;
} // endif ndif
if (!Correlated) {
if (Size > Nval) {
Size = Nval;
Valblk->ReAllocate(g, Size);
} // endif Size
// Index and Offset are not used anymore
PlgDBfree(Index);
PlgDBfree(Offset);
Xsize = -1;
} // endif Correlated
Bot = -1; // For non optimized search
Top = Ndif; // Find searches the whole array.
return FALSE;
error:
Nval = Ndif = 0;
Valblk->Free();
PlgDBfree(Index);
PlgDBfree(Offset);
return TRUE;
} // end of Sort
/***********************************************************************/
/* Block filter testing for IN operator on Column/Array operands. */
/* Here we call Find that returns TRUE if the value is in the array */
/* with X equal to the index of the found value in the array, or */
/* FALSE if the value is not in the array with Inf and Sup being the */
/* indexes of the array values that are immediately below and over */
/* the not found value. This enables to restrict the array to the */
/* values that are between the min and max block values and to return */
/* the indication of whether the Find will be always true, always not */
/* true or other. */
/***********************************************************************/
int ARRAY::BlockTest(PGLOBAL g, int opc, int opm,
void *minp, void *maxp, bool s)
{
bool bin, bax, pin, pax, veq, all = (opm == 2);
if (Ndif == 0) // Array is empty
// Return TRUE for ALL because it means that there are no item that
// does not verify the condition, which is true indeed.
// Return FALSE for ANY because TRUE means that there is at least
// one item that verifies the condition, which is false.
return (all) ? 2 : -2;
else if (opc == OP_EQ && all && Ndif > 1)
return -2;
else if (opc == OP_NE && !all && Ndif > 1)
return 2;
// else if (Ndif == 1)
// all = FALSE;
// veq is true when all values in the block are equal
switch (Type) {
case TYPE_STRING: veq = (Vblp->IsCi())
? !stricmp((char*)minp, (char*)maxp)
: !strcmp((char*)minp, (char*)maxp); break;
case TYPE_SHORT: veq = *(SHORT*)minp == *(SHORT*)maxp; break;
case TYPE_INT: veq = *(PINT)minp == *(PINT)maxp; break;
case TYPE_DOUBLE: veq = *(double*)minp == *(double*)maxp; break;
default: veq = FALSE; // Error ?
} // endswitch type
if (!s)
Bot = -1;
Top = Ndif; // Reset Top at top of list
Value->SetBinValue(maxp);
Top = (bax = Find(Value)) ? X + 1 : Sup;
if (bax) {
if (opc == OP_EQ)
return (veq) ? 1 : 0;
else if (opc == OP_NE)
return (veq) ? -1 : 0;
if (X == 0) switch (opc) {
// Max value is equal to min list value
case OP_LE: return 1; break;
case OP_LT: return (veq) ? -1 : 0; break;
case OP_GE: return (veq) ? 1 : 0; break;
case OP_GT: return -1; break;
} // endswitch opc
pax = (opc == OP_GE) ? (X < Ndif - 1) : TRUE;
} else if (Inf == Bot) {
// Max value is smaller than min list value
return (opc == OP_LT || opc == OP_LE || opc == OP_NE) ? 1 : -1;
} else
pax = (Sup < Ndif); // True if max value is inside the list value
if (!veq) {
Value->SetBinValue(minp);
bin = Find(Value);
} else
bin = bax;
Bot = (bin) ? X - 1 : Inf;
if (bin) {
if (opc == OP_EQ || opc == OP_NE)
return 0;
if (X == Ndif - 1) switch (opc) {
case OP_GE: return (s) ? 2 : 1; break;
case OP_GT: return (veq) ? -1 : 0; break;
case OP_LE: return (veq) ? 1 : 0; break;
case OP_LT: return (s) ? -2 : -1; break;
} // endswitch opc
pin = (opc == OP_LE) ? (X > 0) : TRUE;
} else if (Sup == Ndif) {
// Min value is greater than max list value
if (opc == OP_GT || opc == OP_GE || opc == OP_NE)
return (s) ? 2 : 1;
else
return (s) ? -2 : -1;
} else
pin = (Inf >= 0); // True if min value is inside the list value
if (Top - Bot <= 1) {
// No list item between min and max value
#if defined(_DEBUG)
assert (!bin && !bax);
#endif
switch (opc) {
case OP_EQ: return -1; break;
case OP_NE: return 1; break;
default: return (all) ? -1 : 1; break;
} // endswitch opc
} // endif
#if defined(_DEBUG)
assert (Ndif > 1); // if Ndif = 1 we should have returned already
#endif
// At this point, if there are no logical errors in the algorithm,
// the only possible overlaps between the array and the block are:
// Array: +-------+ +-------+ +-------+ +-----+
// Block: +-----+ +---+ +------+ +--------+
// TRUE: pax pin pax pin
if (all) switch (opc) {
case OP_GT:
case OP_GE: return (pax) ? -1 : 0; break;
case OP_LT:
case OP_LE: return (pin) ? -1 : 0; break;
} // endswitch opc
return 0;
} // end of BlockTest
/***********************************************************************/
/* MakeArrayList: Makes a value list from an SQL IN array (in work). */
/***********************************************************************/
PSZ ARRAY::MakeArrayList(PGLOBAL g)
{
char *p, *tp;
int i;
size_t z, len = 2;
if (Type == TYPE_LIST)
return "(???)"; // To be implemented
z = max(24, GetTypeSize(Type, Len) + 4);
tp = (char*)PlugSubAlloc(g, NULL, z);
for (i = 0; i < Nval; i++) {
Value->SetValue_pvblk(Vblp, i);
Value->Print(g, tp, z);
len += strlen(tp);
} // enfor i
if (trace)
htrc("Arraylist: len=%d\n", len);
p = (char *)PlugSubAlloc(g, NULL, len);
strcpy(p, "(");
for (i = 0; i < Nval;) {
Value->SetValue_pvblk(Vblp, i);
Value->Print(g, tp, z);
strcat(p, tp);
strcat(p, (++i == Nval) ? ")" : ",");
} // enfor i
if (trace)
htrc("Arraylist: newlen=%d\n", strlen(p));
return p;
} // end of MakeArrayList
/***********************************************************************/
/* Make file output of ARRAY contents. */
/***********************************************************************/
void ARRAY::Print(PGLOBAL g, FILE *f, UINT n)
{
char m[64];
int lim = min(Nval,10);
memset(m, ' ', n); // Make margin string
m[n] = '\0';
fprintf(f, "%sARRAY: type=%d\n", m, Type);
memset(m, ' ', n + 2); // Make margin string
m[n] = '\0';
if (Type != TYPE_LIST) {
fprintf(f, "%sblock=%p numval=%d\n", m, Valblk->GetMemp(), Nval);
if (Vblp)
for (int i = 0; i < lim; i++) {
Value->SetValue_pvblk(Vblp, i);
Value->Print(g, f, n+4);
} // endfor i
} else
fprintf(f, "%sVALLST: numval=%d\n", m, Nval);
} // end of Print
/***********************************************************************/
/* Make string output of ARRAY contents. */
/***********************************************************************/
void ARRAY::Print(PGLOBAL g, char *ps, UINT z)
{
if (z < 16)
return;
sprintf(ps, "ARRAY: type=%d\n", Type);
// More to be implemented later
} // end of Print
/* -------------------------- Class MULAR ---------------------------- */
/***********************************************************************/
/* MULAR public constructor. */
/***********************************************************************/
MULAR::MULAR(PGLOBAL g, int n) : CSORT(FALSE)
{
Narray = n;
Pars = (PARRAY*)PlugSubAlloc(g, NULL, n * sizeof(PARRAY));
} // end of MULAR constructor
/***********************************************************************/
/* MULAR: Compare routine multiple arrays. */
/***********************************************************************/
int MULAR::Qcompare(int *i1, int *i2)
{
register int i, n = 0;
for (i = 0; i < Narray; i++)
if ((n = Pars[i]->Qcompare(i1, i2)))
break;
return n;
} // end of Qcompare
/***********************************************************************/
/* Sort and eliminate distinct values from multiple arrays. */
/* Note: this is done by making a sorted index on distinct values. */
/* Returns FALSE if Ok or TRUE in case of error. */
/***********************************************************************/
bool MULAR::Sort(PGLOBAL g)
{
int i, j, k, n, nval, ndif;
// All arrays must have the same number of values
nval = Pars[0]->Nval;
for (n = 1; n < Narray; n++)
if (Pars[n]->Nval != nval) {
strcpy(g->Message, MSG(BAD_ARRAY_VAL));
return TRUE;
} // endif nval
// Prepare non conservative sort with offet values
Index.Size = nval * sizeof(int);
if (!PlgDBalloc(g, NULL, Index))
goto error;
Offset.Size = (nval + 1) * sizeof(int);
if (!PlgDBalloc(g, NULL, Offset))
goto error;
// Call the sort program, it returns the number of distinct values
ndif = Qsort(g, nval);
if (ndif < 0)
goto error;
// Use the sort index to reorder the data in storage so it will
// be physically sorted and Index can be removed.
for (i = 0; i < nval; i++) {
if (Pex[i] == i || Pex[i] == nval)
// Already placed or already moved
continue;
for (n = 0; n < Narray; n++)
Pars[n]->Save(i);
for (j = i;; j = k) {
k = Pex[j];
Pex[j] = nval; // Mark position as set
if (k == i) {
for (n = 0; n < Narray; n++)
Pars[n]->Restore(j);
break; // end of loop
} else
for (n = 0; n < Narray; n++)
Pars[n]->Move(j, k);
} // endfor j
} // endfor i
// Reduce the size of the To_Val array if ndif < nval
if (ndif < nval) {
for (i = 1; i < ndif; i++)
if (i != Pof[i])
break;
for (; i < ndif; i++)
for (n = 0; n < Narray; n++)
Pars[n]->Move(i, Pof[i]);
for (n = 0; n < Narray; n++) {
Pars[n]->Nval = ndif;
Pars[n]->Size = ndif;
Pars[n]->Valblk->ReAllocate(g, ndif);
} // endfor n
} // endif ndif
// Index and Offset are not used anymore
PlgDBfree(Index);
PlgDBfree(Offset);
for (n = 0; n < Narray; n++) {
Pars[n]->Bot = -1; // For non optimized search
Pars[n]->Top = ndif; // Find searches the whole array.
} // endfor n
return FALSE;
error:
PlgDBfree(Index);
PlgDBfree(Offset);
return TRUE;
} // end of Sort
/**************** Array H Declares Source Code File (.H) ***************/
/* Name: ARRAY.H Version 3.1 */
/* */
/* (C) Copyright to the author Olivier BERTRAND 2005-2014 */
/* */
/* This file contains the ARRAY and VALBASE derived classes declares. */
/***********************************************************************/
/***********************************************************************/
/* Include required application header files */
/***********************************************************************/
#include "xobject.h"
#include "valblk.h"
#include "csort.h"
typedef class ARRAY *PARRAY;
/***********************************************************************/
/* Definition of class ARRAY with all its method functions. */
/* Note: This is not a general array class that could be defined as */
/* a template class, but rather a specific object containing a list */
/* of values to be processed by the filter IN operator. */
/* In addition it must act as a metaclass by being able to give back */
/* the type of values it contains. */
/* It must also be able to convert itself from some type to another. */
/***********************************************************************/
class DllExport ARRAY : public XOBJECT, public CSORT { // Array descblock
friend class MULAR;
//friend class VALLST;
//friend class SFROW;
public:
// Constructors
ARRAY(PGLOBAL g, int type, int size, int len = 1, int prec = 0);
//ARRAY(PGLOBAL g, PQUERY qryp);
//ARRAY(PGLOBAL g, PARRAY par, int k);
// Implementation
virtual int GetType(void) {return TYPE_ARRAY;}
virtual int GetResultType(void) {return Type;}
virtual int GetLength(void) {return Len;}
virtual int GetLengthEx(void) {return Len;}
virtual int GetScale() {return 0;}
int GetNval(void) {return Nval;}
int GetSize(void) {return Size;}
// PVAL GetValp(void) {return Valp;}
void SetType(int atype) {Type = atype;}
void SetCorrel(bool b) {Correlated = b;}
// Methods
virtual void Reset(void) {Bot = -1;}
virtual int Qcompare(int *, int *);
virtual bool Compare(PXOB) {assert(FALSE); return FALSE;}
virtual bool SetFormat(PGLOBAL, FORMAT&) {assert(FALSE); return FALSE;}
virtual int CheckSpcCol(PTDB, int) {return 0;}
virtual void Print(PGLOBAL g, FILE *f, UINT n);
virtual void Print(PGLOBAL g, char *ps, UINT z);
void Empty(void);
void SetPrecision(PGLOBAL g, int p);
bool AddValue(PGLOBAL g, PSZ sp);
bool AddValue(PGLOBAL g, SHORT n);
bool AddValue(PGLOBAL g, int n);
bool AddValue(PGLOBAL g, double f);
bool AddValue(PGLOBAL g, PXOB xp);
bool AddValue(PGLOBAL g, PVAL vp);
void GetNthValue(PVAL valp, int n);
char *GetStringValue(int n);
BYTE Vcompare(PVAL vp, int n);
void Save(int);
void Restore(int);
void Move(int, int);
bool Sort(PGLOBAL g);
bool Find(PVAL valp);
bool FilTest(PGLOBAL g, PVAL valp, OPVAL opc, int opm);
int Convert(PGLOBAL g, int k, PVAL vp = NULL);
int BlockTest(PGLOBAL g, int opc, int opm,
void *minp, void *maxp, bool s);
PSZ MakeArrayList(PGLOBAL g);
bool CanBeShort(void);
bool GetSubValue(PGLOBAL g, PVAL valp, int *kp);
protected:
// Members
PMBV Valblk; // To the MBVALS class
PVBLK Vblp; // To Valblock of the data array
//PVAL Valp; // The value used for Save and Restore is Value
int Size; // Size of value array
int Nval; // Total number of items in array
int Ndif; // Total number of distinct items in array
int Xsize; // Size of Index (used for correlated arrays)
int Type; // Type of individual values in the array
int Len; // Length of character string
int Bot; // Bottom of research index
int Top; // Top of research index
int X, Inf, Sup; // Used for block optimization
bool Correlated; // -----------> Temporary
}; // end of class ARRAY
/***********************************************************************/
/* Definition of class MULAR with all its method functions. */
/* This class is used when constructing the arrays of constants used */
/* for indexing. Its only purpose is to provide a way to sort, reduce */
/* and reorder the arrays of multicolumn indexes as one block. Indeed */
/* sorting the arrays independantly would break the correspondance of */
/* column values. */
/***********************************************************************/
class MULAR : public CSORT, public BLOCK { // No need to be an XOBJECT
public:
// Constructor
MULAR(PGLOBAL g, int n);
// Implementation
void SetPars(PARRAY par, int i) {Pars[i] = par;}
// Methods
virtual int Qcompare(int *i1, int *i2); // Sort compare routine
bool Sort(PGLOBAL g);
protected:
// Members
int Narray; // The number of sub-arrays
PARRAY *Pars; // To the block of real arrays
}; // end of class ARRAY
/************* BlkFil C++ Program Source Code File (.CPP) **************/
/* PROGRAM NAME: BLKFIL */
/* ------------- */
/* Version 2.5 */
/* */
/* COPYRIGHT: */
/* ---------- */
/* (C) Copyright to the author Olivier BERTRAND 2004-2014 */
/* */
/* WHAT THIS PROGRAM DOES: */
/* ----------------------- */
/* This program is the implementation of block indexing classes. */
/* */
/***********************************************************************/
/***********************************************************************/
/* Include relevant MariaDB header file. */
/***********************************************************************/
#include "my_global.h"
#include "sql_class.h"
//#include "sql_time.h"
#if defined(WIN32)
//#include <windows.h>
#else // !WIN32
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#endif // !WIN32
/***********************************************************************/
/* Include application header files: */
/***********************************************************************/
#include "global.h" // global declarations
#include "plgdbsem.h" // DB application declarations
#include "xindex.h" // Key Index class declarations
#include "filamtxt.h" // File access method dcls
#include "tabdos.h" // TDBDOS and DOSCOL class dcls
#include "array.h" // ARRAY classes dcls
#include "blkfil.h" // Block Filter classes dcls
/***********************************************************************/
/* Static variables. */
/***********************************************************************/
extern "C" int trace;
/* ------------------------ Class BLOCKFILTER ------------------------ */
/***********************************************************************/
/* BLOCKFILTER constructor. */
/***********************************************************************/
BLOCKFILTER::BLOCKFILTER(PTDBDOS tdbp, int op)
{
Tdbp = tdbp;
Correl = FALSE;
Opc = op;
Opm = 0;
Result = 0;
} // end of BLOCKFILTER constructor
/***********************************************************************/
/* Make file output of BLOCKFILTER contents. */
/***********************************************************************/
void BLOCKFILTER::Print(PGLOBAL g, FILE *f, UINT n)
{
char m[64];
memset(m, ' ', n); // Make margin string
m[n] = '\0';
fprintf(f, "%sBLOCKFILTER: at %p opc=%d opm=%d result=%d\n",
m, this, Opc, Opm, Result);
} // end of Print
/***********************************************************************/
/* Make string output of BLOCKFILTER contents. */
/***********************************************************************/
void BLOCKFILTER::Print(PGLOBAL g, char *ps, UINT z)
{
strncat(ps, "BlockFilter(s)", z);
} // end of Print
/* ---------------------- Class BLKFILLOG ---------------------------- */
/***********************************************************************/
/* BLKFILLOG constructor. */
/***********************************************************************/
BLKFILLOG::BLKFILLOG(PTDBDOS tdbp, int op, PBF *bfp, int n)
: BLOCKFILTER(tdbp, op)
{
N = n;
Fil = bfp;
for (int i = 0; i < N; i++)
if (Fil[i])
Correl |= Fil[i]->Correl;
} // end of BLKFILLOG constructor
/***********************************************************************/
/* Reset: this function is used only to check the existence of a */
/* BLKFILIN block and have it reset its Bot value for sorted columns. */
/***********************************************************************/
void BLKFILLOG::Reset(PGLOBAL g)
{
for (int i = 0; i < N; i++)
if (Fil[i])
Fil[i]->Reset(g);
} // end of Reset
/***********************************************************************/
/* This function is used for block filter evaluation. We use here a */
/* fuzzy logic between the values returned by evaluation blocks: */
/* -2: the condition will be always false for the rest of the file. */
/* -1: the condition will be false for the whole group. */
/* 0: the condition may be true for some of the group values. */
/* 1: the condition will be true for the whole group. */
/* 2: the condition will be always true for the rest of the file. */
/***********************************************************************/
int BLKFILLOG::BlockEval(PGLOBAL g)
{
int i, rc;
for (i = 0; i < N; i++) {
// 0: Means some block filter value may be True
rc = (Fil[i]) ? Fil[i]->BlockEval(g) : 0;
if (!i)
Result = (Opc == OP_NOT) ? -rc : rc;
else switch (Opc) {
case OP_AND:
Result = min(Result, rc);
break;
case OP_OR:
Result = max(Result, rc);
break;
default:
// Should never happen
Result = 0;
return Result;
} // endswitch Opc
} // endfor i
return Result;
} // end of BlockEval
/* ---------------------- Class BLKFILARI----------------------------- */
/***********************************************************************/
/* BLKFILARI constructor. */
/***********************************************************************/
BLKFILARI::BLKFILARI(PGLOBAL g, PTDBDOS tdbp, int op, PXOB *xp)
: BLOCKFILTER(tdbp, op)
{
Colp = (PDOSCOL)xp[0];
if (xp[1]->GetType() == TYPE_COLBLK) {
Cpx = (PCOL)xp[1]; // Subquery pseudo constant column
Correl = TRUE;
} else
Cpx = NULL;
Sorted = Colp->IsSorted() > 0;
// Don't remember why this was changed. Anyway it is no good for
// correlated subqueries because the Value must reflect changes
if (Cpx)
Valp = xp[1]->GetValue();
else
Valp = AllocateValue(g, xp[1]->GetValue());
} // end of BLKFILARI constructor
/***********************************************************************/
/* Reset: re-eval the constant value in the case of pseudo constant */
/* column use in a correlated subquery. */
/***********************************************************************/
void BLKFILARI::Reset(PGLOBAL g)
{
if (Cpx) {
Cpx->Reset();
Cpx->Eval(g);
MakeValueBitmap(); // Does nothing for class BLKFILARI
} // endif Cpx
} // end of Reset
/***********************************************************************/
/* Evaluate block filter for arithmetic operators. */
/***********************************************************************/
int BLKFILARI::BlockEval(PGLOBAL g)
{
int mincmp, maxcmp, n;
#if defined(_DEBUG)
assert (Colp->IsClustered());
#endif
n = ((PTDBDOS)Colp->GetTo_Tdb())->GetCurBlk();
mincmp = Colp->GetMin()->CompVal(Valp, n);
maxcmp = Colp->GetMax()->CompVal(Valp, n);
switch (Opc) {
case OP_EQ:
case OP_NE:
if (mincmp < 0) // Means minval > Val
Result = (Sorted) ? -2 : -1;
else if (maxcmp > 0) // Means maxval < Val
Result = -1;
else if (!mincmp && !maxcmp) // minval = maxval = val
Result = 1;
else
Result = 0;
break;
case OP_GT:
case OP_LE:
if (mincmp < 0) // minval > Val
Result = (Sorted) ? 2 : 1;
else if (maxcmp < 0) // maxval > Val
Result = 0;
else // maxval <= Val
Result = -1;
break;
case OP_GE:
case OP_LT:
if (mincmp <= 0) // minval >= Val
Result = (Sorted) ? 2 : 1;
else if (maxcmp <= 0) // Maxval >= Val
Result = 0;
else // Maxval < Val
Result = -1;
break;
} // endswitch Opc
switch (Opc) {
case OP_NE:
case OP_LE:
case OP_LT:
Result = -Result;
break;
} // endswitch Opc
if (trace)
htrc("BlockEval: op=%d n=%d rc=%d\n", Opc, n, Result);
return Result;
} // end of BlockEval
/* ---------------------- Class BLKFILAR2----------------------------- */
/***********************************************************************/
/* BLKFILAR2 constructor. */
/***********************************************************************/
BLKFILAR2::BLKFILAR2(PGLOBAL g, PTDBDOS tdbp, int op, PXOB *xp)
: BLKFILARI(g, tdbp, op, xp)
{
MakeValueBitmap();
} // end of BLKFILAR2 constructor
/***********************************************************************/
/* MakeValueBitmap: Set the constant value bit map. It can be void */
/* if the constant value is not in the column distinct values list. */
/***********************************************************************/
void BLKFILAR2::MakeValueBitmap(void)
{
int i; // ndv = Colp->GetNdv();
bool found = FALSE;
PVBLK dval = Colp->GetDval();
assert(dval);
/*********************************************************************/
/* Here we cannot use Find because we must get the index */
/* of where to put the value if it is not found in the array. */
/* This is needed by operators other than OP_EQ or OP_NE. */
/*********************************************************************/
found = dval->Locate(Valp, i);
/*********************************************************************/
/* Set the constant value bitmap. The bitmaps are really matching */
/* the OP_EQ, OP_LE, and OP_LT operator but are also used for the */
/* other operators for which the Result will be inverted. */
/* The reason the bitmaps are not directly complemented for them is */
/* to be able to test easily the cases of sorted columns with Bxp, */
/* and the case of a void bitmap, which happens if the constant */
/* value is not in the column distinct values list. */
/*********************************************************************/
if (found) {
Bmp = 1 << i; // Bit of the found value
Bxp = Bmp - 1; // All smaller values
if (Opc != OP_LT && Opc != OP_GE)
Bxp |= Bmp; // Found value must be included
} else {
Bmp = 0;
Bxp = (1 << i) - 1;
} // endif found
if (!(Opc == OP_EQ || Opc == OP_NE))
Bmp = Bxp;
} // end of MakeValueBitmap
/***********************************************************************/
/* Evaluate XDB2 block filter for arithmetic operators. */
/***********************************************************************/
int BLKFILAR2::BlockEval(PGLOBAL g)
{
#if defined(_DEBUG)
assert (Colp->IsClustered());
#endif
int n = ((PTDBDOS)Colp->GetTo_Tdb())->GetCurBlk();
ULONG bkmp = *(PULONG)Colp->GetBmap()->GetValPtr(n);
ULONG bres = Bmp & bkmp;
// Set result as if Opc were OP_EQ, OP_LT, or OP_LE
if (!bres) {
if (!Bmp)
Result = -2; // No good block in the table file
else if (!Sorted)
Result = -1; // No good values in this block
else // Sorted column, test for no more good blocks in file
Result = (Bxp & bkmp) ? -1 : -2;
} else
// Test whether all block values are good or only some ones
Result = (bres == bkmp) ? 1 : 0;
// For OP_NE, OP_GE, and OP_GT the result must be inverted.
switch (Opc) {
case OP_NE:
case OP_GE:
case OP_GT:
Result = -Result;
break;
} // endswitch Opc
if (trace)
htrc("BlockEval2: op=%d n=%d rc=%d\n", Opc, n, Result);
return Result;
} // end of BlockEval
/* ---------------------- Class BLKFILMR2----------------------------- */
/***********************************************************************/
/* BLKFILMR2 constructor. */
/***********************************************************************/
BLKFILMR2::BLKFILMR2(PGLOBAL g, PTDBDOS tdbp, int op, PXOB *xp)
: BLKFILARI(g, tdbp, op, xp)
{
Nbm = Colp->GetNbm();
Bmp = (PULONG)PlugSubAlloc(g, NULL, Nbm * sizeof(ULONG));
Bxp = (PULONG)PlugSubAlloc(g, NULL, Nbm * sizeof(ULONG));
MakeValueBitmap();
} // end of BLKFILMR2 constructor
/***********************************************************************/
/* MakeValueBitmap: Set the constant value bit map. It can be void */
/* if the constant value is not in the column distinct values list. */
/***********************************************************************/
void BLKFILMR2::MakeValueBitmap(void)
{
int i; // ndv = Colp->GetNdv();
bool found = FALSE, noteq = !(Opc == OP_EQ || Opc == OP_NE);
PVBLK dval = Colp->GetDval();
assert(dval);
for (i = 0; i < Nbm; i++)
Bmp[i] = Bxp[i] = 0;
/*********************************************************************/
/* Here we cannot use Find because we must get the index */
/* of where to put the value if it is not found in the array. */
/* This is needed by operators other than OP_EQ or OP_NE. */
/*********************************************************************/
found = dval->Locate(Valp, i);
/*********************************************************************/
/* For bitmaps larger than a ULONG, we must know where Bmp and Bxp */
/* are positioned in the ULONG bit map block array. */
/*********************************************************************/
N = i / MAXBMP;
i %= MAXBMP;
/*********************************************************************/
/* Set the constant value bitmaps. The bitmaps are really matching */
/* the OP_EQ, OP_LE, and OP_LT operator but are also used for the */
/* other operators for which the Result will be inverted. */
/* The reason the bitmaps are not directly complemented for them is */
/* to be able to easily test the cases of sorted columns with Bxp, */
/* and the case of a void bitmap, which happens if the constant */
/* value is not in the column distinct values list. */
/*********************************************************************/
if (found) {
Bmp[N] = 1 << i;
Bxp[N] = Bmp[N] - 1;
if (Opc != OP_LT && Opc != OP_GE)
Bxp[N] |= Bmp[N]; // Found value must be included
} else
Bxp[N] = (1 << i) - 1;
if (noteq)
Bmp[N] = Bxp[N];
Void = !Bmp[N]; // There are no good values in the file
for (i = 0; i < N; i++) {
Bxp[i] = ~0;
if (noteq)
Bmp[i] = Bxp[i];
Void = Void && !Bmp[i];
} // endfor i
if (!Bmp[N] && !Bxp[N])
N--;
} // end of MakeValueBitmap
/***********************************************************************/
/* Evaluate XDB2 block filter for arithmetic operators. */
/***********************************************************************/
int BLKFILMR2::BlockEval(PGLOBAL g)
{
#if defined(_DEBUG)
assert (Colp->IsClustered());
#endif
int i, n = ((PTDBDOS)Colp->GetTo_Tdb())->GetCurBlk();
bool fnd = FALSE, all = TRUE, gt = TRUE;
ULONG bres;
PULONG bkmp = (PULONG)Colp->GetBmap()->GetValPtr(n * Nbm);
// Set result as if Opc were OP_EQ, OP_LT, or OP_LE
for (i = 0; i < Nbm; i++)
if (i <= N) {
if ((bres = Bmp[i] & bkmp[i]))
fnd = TRUE; // Some good value(s) found in the block
if (bres != bkmp[i])
all = FALSE; // Not all block values are good
if (Bxp[i] & bkmp[i])
gt = FALSE; // Not all block values are > good value(s)
} else if (bkmp[i]) {
all = FALSE;
break;
} // endif's
if (!fnd) {
if (Void || (gt && Sorted))
Result = -2; // No (more) good block in file
else
Result = -1; // No good values in this block
} else
Result = (all) ? 1 : 0; // All block values are good
// For OP_NE, OP_GE, and OP_GT the result must be inverted.
switch (Opc) {
case OP_NE:
case OP_GE:
case OP_GT:
Result = -Result;
break;
} // endswitch Opc
if (trace)
htrc("BlockEval2: op=%d n=%d rc=%d\n", Opc, n, Result);
return Result;
} // end of BlockEval
/***********************************************************************/
/* BLKSPCARI constructor. */
/***********************************************************************/
BLKSPCARI::BLKSPCARI(PTDBDOS tdbp, int op, PXOB *xp, int bsize)
: BLOCKFILTER(tdbp, op)
{
if (xp[1]->GetType() == TYPE_COLBLK) {
Cpx = (PCOL)xp[1]; // Subquery pseudo constant column
Correl = TRUE;
} else
Cpx = NULL;
Valp = xp[1]->GetValue();
Val = (int)xp[1]->GetValue()->GetIntValue();
Bsize = bsize;
} // end of BLKFILARI constructor
/***********************************************************************/
/* Reset: re-eval the constant value in the case of pseudo constant */
/* column use in a correlated subquery. */
/***********************************************************************/
void BLKSPCARI::Reset(PGLOBAL g)
{
if (Cpx) {
Cpx->Reset();
Cpx->Eval(g);
Val = (int)Valp->GetIntValue();
} // endif Cpx
} // end of Reset
/***********************************************************************/
/* Evaluate block filter for arithmetic operators (ROWID) */
/***********************************************************************/
int BLKSPCARI::BlockEval(PGLOBAL g)
{
int mincmp, maxcmp, n, m;
n = Tdbp->GetCurBlk();
m = n * Bsize + 1; // Minimum Rowid value for this block
mincmp = (Val > m) ? 1 : (Val < m) ? (-1) : 0;
m = (n + 1) * Bsize; // Maximum Rowid value for this block
maxcmp = (Val > m) ? 1 : (Val < m) ? (-1) : 0;
switch (Opc) {
case OP_EQ:
case OP_NE:
if (mincmp < 0) // Means minval > Val
Result = -2; // Always sorted
else if (maxcmp > 0) // Means maxval < Val
Result = -1;
else if (!mincmp && !maxcmp) // minval = maxval = val
Result = 1;
else
Result = 0;
break;
case OP_GT:
case OP_LE:
if (mincmp < 0) // minval > Val
Result = 2; // Always sorted
else if (maxcmp < 0) // maxval > Val
Result = 0;
else // maxval <= Val
Result = -1;
break;
case OP_GE:
case OP_LT:
if (mincmp <= 0) // minval >= Val
Result = 2; // Always sorted
else if (maxcmp <= 0) // Maxval >= Val
Result = 0;
else // Maxval < Val
Result = -1;
break;
} // endswitch Opc
switch (Opc) {
case OP_NE:
case OP_LE:
case OP_LT:
Result = -Result;
break;
} // endswitch Opc
if (trace)
htrc("BlockEval: op=%d n=%d rc=%d\n", Opc, n, Result);
return Result;
} // end of BlockEval
/* ------------------------ Class BLKFILIN --------------------------- */
/***********************************************************************/
/* BLKFILIN constructor. */
/***********************************************************************/
BLKFILIN::BLKFILIN(PGLOBAL g, PTDBDOS tdbp, int op, int opm, PXOB *xp)
: BLOCKFILTER(tdbp, op)
{
if (op == OP_IN) {
Opc = OP_EQ;
Opm = 1;
} else {
Opc = op;
Opm = opm;
} // endif op
Colp = (PDOSCOL)xp[0];
Arap = (PARRAY)xp[1];
Type = Arap->GetResultType();
if (Colp->GetResultType() != Type) {
sprintf(g->Message, "BLKFILIN: %s", MSG(VALTYPE_NOMATCH));
longjmp(g->jumper[g->jump_level], 99);
} else if (Colp->GetValue()->IsCi())
Arap->SetPrecision(g, 1); // Case insensitive
Sorted = Colp->IsSorted() > 0;
} // end of BLKFILIN constructor
/***********************************************************************/
/* Reset: have the sorted array reset its Bot value to -1 (bottom). */
/***********************************************************************/
void BLKFILIN::Reset(PGLOBAL g)
{
Arap->Reset();
// MakeValueBitmap(); // Does nothing for class BLKFILIN
} // end of Reset
/***********************************************************************/
/* Evaluate block filter for a IN operator on a constant array. */
/* Note: here we need to use the GetValPtrEx function to get a zero */
/* ended string in case of string argument. This is because the ARRAY */
/* can have a different width than the char column. */
/***********************************************************************/
int BLKFILIN::BlockEval(PGLOBAL g)
{
int n = ((PTDBDOS)Colp->GetTo_Tdb())->GetCurBlk();
void *minp = Colp->GetMin()->GetValPtrEx(n);
void *maxp = Colp->GetMax()->GetValPtrEx(n);
Result = Arap->BlockTest(g, Opc, Opm, minp, maxp, Sorted);
return Result;
} // end of BlockEval
/* ------------------------ Class BLKFILIN2 -------------------------- */
/***********************************************************************/
/* BLKFILIN2 constructor. */
/* New version that takes care of all operators and modificators. */
/* It is also ready to handle the case of correlated sub-selects. */
/***********************************************************************/
BLKFILIN2::BLKFILIN2(PGLOBAL g, PTDBDOS tdbp, int op, int opm, PXOB *xp)
: BLKFILIN(g, tdbp, op, opm, xp)
{
Nbm = Colp->GetNbm();
Valp = AllocateValue(g, Colp->GetValue());
Invert = (Opc == OP_NE || Opc == OP_GE || Opc ==OP_GT);
Bmp = (PULONG)PlugSubAlloc(g, NULL, Nbm * sizeof(ULONG));
Bxp = (PULONG)PlugSubAlloc(g, NULL, Nbm * sizeof(ULONG));
MakeValueBitmap();
} // end of BLKFILIN2 constructor
/***********************************************************************/
/* MakeValueBitmap: Set the constant values bit map. It can be void */
/* if the constant values are not in the column distinct values list. */
/* The bitmaps are prepared for the EQ, LE, and LT operators and */
/* takes care of the ALL and ANY modificators. If the operators are */
/* NE, GE, or GT the modificator is inverted and the result will be. */
/***********************************************************************/
void BLKFILIN2::MakeValueBitmap(void)
{
int i, k, n, ndv = Colp->GetNdv();
bool found, noteq = !(Opc == OP_EQ || Opc == OP_NE);
bool all = (!Invert) ? (Opm == 2) : (Opm != 2);
ULONG btp;
PVBLK dval = Colp->GetDval();
N = -1;
// Take care of special cases
if (!(n = Arap->GetNval())) {
// Return TRUE for ALL because it means that there are no item that
// does not verify the condition, which is true indeed.
// Return FALSE for ANY because TRUE means that there is at least
// one item that verifies the condition, which is false.
Result = (Opm == 2) ? 2 : -2;
return;
} else if (!noteq && all && n > 1) {
// An item cannot be equal to all different values
// or an item is always unequal to any different values
Result = (Opc == OP_EQ) ? -2 : 2;
return;
} // endif's
for (i = 0; i < Nbm; i++)
Bmp[i] = Bxp[i] = 0;
for (k = 0; k < n; k++) {
Arap->GetNthValue(Valp, k);
found = dval->Locate(Valp, i);
N = i / MAXBMP;
btp = 1 << (i % MAXBMP);
if (found)
Bmp[N] |= btp;
// For LT and LE if ALL the condition applies to the smallest item
// if ANY it applies to the largest item. In the case of EQ we come
// here only if ANY or if n == 1, so it does applies to the largest.
if ((!k && all) || (k == n - 1 && !all)) {
Bxp[N] = btp - 1;
if (found && Opc != OP_LT && Opc != OP_GE)
Bxp[N] |= btp; // Found value must be included
} // endif k, opm
} // endfor k
if (noteq)
Bmp[N] = Bxp[N];
Void = !Bmp[N]; // There are no good values in the file
for (i = 0; i < N; i++) {
Bxp[i] = ~0;
if (noteq) {
Bmp[i] = Bxp[i];
Void = FALSE;
} // endif noteq
} // endfor i
if (!Bmp[N] && !Bxp[N]) {
if (--N < 0)
// All array values are smaller than block values
Result = (Invert) ? 2 : -2;
} else if (N == Nbm - 1 && (signed)Bmp[N] == (1 << (ndv % MAXBMP)) - 1) {
// Condition will be always TRUE or FALSE for the whole file
Result = (Invert) ? -2 : 2;
N = -1;
} // endif's
} // end of MakeValueBitmap
/***********************************************************************/
/* Evaluate block filter for set operators on a constant array. */
/* Note: here we need to use the GetValPtrEx function to get a zero */
/* ended string in case of string argument. This is because the ARRAY */
/* can have a different width than the char column. */
/***********************************************************************/
int BLKFILIN2::BlockEval(PGLOBAL g)
{
if (N < 0)
return Result; // Was set in MakeValueBitmap
int i, n = ((PTDBDOS)Colp->GetTo_Tdb())->GetCurBlk();
bool fnd = FALSE, all = TRUE, gt = TRUE;
ULONG bres;
PULONG bkmp = (PULONG)Colp->GetBmap()->GetValPtr(n * Nbm);
// Set result as if Opc were OP_EQ, OP_LT, or OP_LE
// The difference between ALL or ANY was handled in MakeValueBitmap
for (i = 0; i < Nbm; i++)
if (i <= N) {
if ((bres = Bmp[i] & bkmp[i]))
fnd = TRUE;
if (bres != bkmp[i])
all = FALSE;
if (Bxp[i] & bkmp[i])
gt = FALSE;
} else if (bkmp[i]) {
all = FALSE;
break;
} // endif's
if (!fnd) {
if (Void || (Sorted && gt))
Result = -2; // No more good block in file
else
Result = -1; // No good values in this block
} else if (all)
Result = 1; // All block values are good
else
Result = 0; // Block contains some good values
// For OP_NE, OP_GE, and OP_GT the result must be inverted.
switch (Opc) {
case OP_NE:
case OP_GE:
case OP_GT:
Result = -Result;
break;
} // endswitch Opc
return Result;
} // end of BlockEval
#if 0
/***********************************************************************/
/* BLKFILIN2 constructor. */
/***********************************************************************/
BLKFILIN2::BLKFILIN2(PGLOBAL g, PTDBDOS tdbp, int op, int opm, PXOB *xp)
: BLKFILIN(g, tdbp, op, opm, xp)
{
// Currently, bitmap matching is only implemented for the IN operator
if (!(Bitmap = (op == OP_IN || (op == OP_EQ && opm != 2)))) {
Nbm = Colp->GetNbm();
N = 0;
return; // Revert to standard minmax method
} // endif minmax
int i, n;
ULONG btp;
PVAL valp = AllocateValue(g, Colp->GetValue());
PVBLK dval = Colp->GetDval();
Nbm = Colp->GetNbm();
N = -1;
Bmp = (PULONG)PlugSubAlloc(g, NULL, Nbm * sizeof(ULONG));
Bxp = (PULONG)PlugSubAlloc(g, NULL, Nbm * sizeof(ULONG));
for (i = 0; i < Nbm; i++)
Bmp[i] = Bxp[i] = 0;
for (n = 0; n < Arap->GetNval(); n++) {
Arap->GetNthValue(valp, n);
if ((i = dval->Find(valp)) >= 0)
Bmp[i / MAXBMP] |= 1 << (i % MAXBMP);
} // endfor n
for (i = Nbm - 1; i >= 0; i--)
if (Bmp[i]) {
for (btp = Bmp[i]; btp; btp >>= 1)
Bxp[i] |= btp;
for (N = i--; i >= 0; i--)
Bxp[i] = ~0;
break;
} // endif Bmp
} // end of BLKFILIN2 constructor
/***********************************************************************/
/* Evaluate block filter for a IN operator on a constant array. */
/* Note: here we need to use the GetValPtrEx function to get a zero */
/* ended string in case of string argument. This is because the ARRAY */
/* can have a different width than the char column. */
/***********************************************************************/
int BLKFILIN2::BlockEval(PGLOBAL g)
{
if (N < 0)
return -2; // IN list contains no good values
int i, n = ((PTDBDOS)Colp->GetTo_Tdb())->GetCurBlk();
bool fnd = FALSE, all = TRUE, gt = TRUE;
ULONG bres;
PULONG bkmp = (PULONG)Colp->GetBmap()->GetValPtr(n * Nbm);
if (Bitmap) {
// For IN operator use the bitmap method
for (i = 0; i < Nbm; i++)
if (i <= N) {
if ((bres = Bmp[i] & bkmp[i]))
fnd = TRUE;
if (bres != bkmp[i])
all = FALSE;
if (Bxp[i] & bkmp[i])
gt = FALSE;
} else if (bkmp[i]) {
all = FALSE;
break;
} // endif's
if (!fnd) {
if (Sorted && gt)
Result = -2; // No more good block in file
else
Result = -1; // No good values in this block
} else if (all)
Result = 1; // All block values are good
else
Result = 0; // Block contains some good values
} else {
// For other than IN operators, revert to standard minmax method
int n = 0, ndv = Colp->GetNdv();
void *minp = NULL;
void *maxp = NULL;
ULONG btp;
PVBLK dval = Colp->GetDval();
for (i = 0; i < Nbm; i++)
for (btp = 1; btp && n < ndv; btp <<= 1, n++)
if (btp & bkmp[i]) {
if (!minp)
minp = dval->GetValPtrEx(n);
maxp = dval->GetValPtrEx(n);
} // endif btp
Result = Arap->BlockTest(g, Opc, Opm, minp, maxp, Colp->IsSorted());
} // endif Bitmap
return Result;
} // end of BlockEval
#endif // 0
/* ------------------------ Class BLKSPCIN --------------------------- */
/***********************************************************************/
/* BLKSPCIN constructor. */
/***********************************************************************/
BLKSPCIN::BLKSPCIN(PGLOBAL g, PTDBDOS tdbp, int op, int opm,
PXOB *xp, int bsize)
: BLOCKFILTER(tdbp, op)
{
if (op == OP_IN) {
Opc = OP_EQ;
Opm = 1;
} else
Opm = opm;
Arap = (PARRAY)xp[1];
#if defined(_DEBUG)
assert (Opm);
assert (Arap->GetResultType() == TYPE_INT);
#endif
Bsize = bsize;
} // end of BLKSPCIN constructor
/***********************************************************************/
/* Reset: have the sorted array reset its Bot value to -1 (bottom). */
/***********************************************************************/
void BLKSPCIN::Reset(PGLOBAL g)
{
Arap->Reset();
} // end of Reset
/***********************************************************************/
/* Evaluate block filter for a IN operator on a constant array. */
/***********************************************************************/
int BLKSPCIN::BlockEval(PGLOBAL g)
{
int n = Tdbp->GetCurBlk();
int minrow = n * Bsize + 1; // Minimum Rowid value for this block
int maxrow = (n + 1) * Bsize; // Maximum Rowid value for this block
Result = Arap->BlockTest(g, Opc, Opm, &minrow, &maxrow, TRUE);
return Result;
} // end of BlockEval
/* ------------------------------------------------------------------- */
#if 0
/***********************************************************************/
/* Implementation of the BLOCKINDEX class. */
/***********************************************************************/
BLOCKINDEX::BLOCKINDEX(PBX nx, PDOSCOL cp, PKXBASE kp)
{
Next = nx;
Tdbp = (cp) ? (PTDBDOS)cp->GetTo_Tdb() : NULL;
Colp = cp;
Kxp = kp;
Type = (cp) ? cp->GetResultType() : TYPE_ERROR;
Sorted = (cp) ? cp->IsSorted() > 0 : FALSE;
Result = 0;
} // end of BLOCKINDEX constructor
/***********************************************************************/
/* Reset Bot and Top values of optimized Kindex blocks. */
/***********************************************************************/
void BLOCKINDEX::Reset(void)
{
if (Next)
Next->Reset();
Kxp->Reset();
} // end of Reset
/***********************************************************************/
/* Evaluate block indexing test. */
/***********************************************************************/
int BLOCKINDEX::BlockEval(PGLOBAL g)
{
#if defined(_DEBUG)
assert (Tdbp && Colp);
#endif
int n = Tdbp->GetCurBlk();
void *minp = Colp->GetMin()->GetValPtr(n);
void *maxp = Colp->GetMax()->GetValPtr(n);
Result = Kxp->BlockTest(g, minp, maxp, Type, Sorted);
return Result;
} // end of BlockEval
/***********************************************************************/
/* Make file output of BLOCKINDEX contents. */
/***********************************************************************/
void BLOCKINDEX::Print(PGLOBAL g, FILE *f, UINT n)
{
char m[64];
memset(m, ' ', n); // Make margin string
m[n] = '\0';
fprintf(f, "%sBLOCKINDEX: at %p next=%p col=%s kxp=%p result=%d\n",
m, this, Next, (Colp) ? Colp->GetName() : "Rowid", Kxp, Result);
if (Next)
Next->Print(g, f, n);
} // end of Print
/***********************************************************************/
/* Make string output of BLOCKINDEX contents. */
/***********************************************************************/
void BLOCKINDEX::Print(PGLOBAL g, char *ps, UINT z)
{
strncat(ps, "BlockIndex(es)", z);
} // end of Print
/* ------------------------------------------------------------------- */
/***********************************************************************/
/* Implementation of the BLOCKINDX2 class. */
/***********************************************************************/
BLOCKINDX2::BLOCKINDX2(PBX nx, PDOSCOL cp, PKXBASE kp)
: BLOCKINDEX(nx, cp, kp)
{
Nbm = Colp->GetNbm();
Dval = Colp->GetDval();
Bmap = Colp->GetBmap();
#if defined(_DEBUG)
assert(Dval && Bmap);
#endif
} // end of BLOCKINDX2 constructor
/***********************************************************************/
/* Evaluate block indexing test. */
/***********************************************************************/
int BLOCKINDX2::BlockEval(PGLOBAL g)
{
int n = Tdbp->GetCurBlk();
PUINT bmp = (PUINT)Bmap->GetValPtr(n * Nbm);
Result = Kxp->BlockTst2(g, Dval, bmp, Nbm, Type, Sorted);
return Result;
} // end of BlockEval
/* ------------------------------------------------------------------- */
/***********************************************************************/
/* Implementation of the BLKSPCINDX class. */
/***********************************************************************/
BLKSPCINDX::BLKSPCINDX(PBX nx, PTDBDOS tp, PKXBASE kp, int bsize)
: BLOCKINDEX(nx, NULL, kp)
{
Tdbp = tp;
Bsize = bsize;
Type = TYPE_INT;
Sorted = TRUE;
} // end of BLKSPCINDX constructor
/***********************************************************************/
/* Evaluate block indexing test. */
/***********************************************************************/
int BLKSPCINDX::BlockEval(PGLOBAL g)
{
int n = Tdbp->GetCurBlk();
int minrow = n * Bsize + 1; // Minimum Rowid value for this block
int maxrow = (n + 1) * Bsize; // Maximum Rowid value for this block
Result = Kxp->BlockTest(g, &minrow, &maxrow, TYPE_INT, TRUE);
return Result;
} // end of BlockEval
#endif // 0
/*************** BlkFil H Declares Source Code File (.H) ***************/
/* Name: BLKFIL.H Version 2.1 */
/* */
/* (C) Copyright to the author Olivier BERTRAND 2004-2010 */
/* */
/* This file contains the block optimization related classes declares */
/***********************************************************************/
#ifndef __BLKFIL__
#define __BLKFIL__
typedef class BLOCKFILTER *PBF;
typedef class BLOCKINDEX *PBX;
/***********************************************************************/
/* Definition of class BLOCKFILTER. */
/***********************************************************************/
class DllExport BLOCKFILTER : public BLOCK { /* Block Filter */
friend class BLKFILLOG;
public:
// Constructors
BLOCKFILTER(PTDBDOS tdbp, int op);
// Implementation
int GetResult(void) {return Result;}
bool Correlated(void) {return Correl;}
// Methods
virtual void Reset(PGLOBAL) = 0;
virtual int BlockEval(PGLOBAL) = 0;
virtual void Print(PGLOBAL g, FILE *f, UINT n);
virtual void Print(PGLOBAL g, char *ps, UINT z);
protected:
BLOCKFILTER(void) {} // Standard constructor not to be used
// Members
PTDBDOS Tdbp; // Owner TDB
bool Correl; // TRUE for correlated subqueries
int Opc; // Comparison operator
int Opm; // Operator modificator
int Result; // Result from evaluation
}; // end of class BLOCKFILTER
/***********************************************************************/
/* Definition of class BLKFILLOG (with Op=OP_AND,OP_OR, or OP_NOT) */
/***********************************************************************/
class DllExport BLKFILLOG : public BLOCKFILTER { /* Logical Op Block Filter */
public:
// Constructors
BLKFILLOG(PTDBDOS tdbp, int op, PBF *bfp, int n);
// Methods
virtual void Reset(PGLOBAL g);
virtual int BlockEval(PGLOBAL g);
protected:
BLKFILLOG(void) {} // Standard constructor not to be used
// Members
PBF *Fil; // Points to Block filter args
int N;
}; // end of class BLKFILLOG
/***********************************************************************/
/* Definition of class BLKFILARI (with Op=OP_EQ,NE,GT,GE,LT, or LE) */
/***********************************************************************/
class DllExport BLKFILARI : public BLOCKFILTER { /* Arithm. Op Block Filter */
public:
// Constructors
BLKFILARI(PGLOBAL g, PTDBDOS tdbp, int op, PXOB *xp);
// Methods
virtual void Reset(PGLOBAL g);
virtual int BlockEval(PGLOBAL g);
virtual void MakeValueBitmap(void) {}
protected:
BLKFILARI(void) {} // Standard constructor not to be used
// Members
PDOSCOL Colp; // Points to column argument
PCOL Cpx; // Point to subquery "constant" column
PVAL Valp; // Points to constant argument Value
bool Sorted; // True if the column is sorted
}; // end of class BLKFILARI
/***********************************************************************/
/* Definition of class BLKFILAR2 (with Op=OP_EQ,NE,GT,GE,LT, or LE) */
/***********************************************************************/
class DllExport BLKFILAR2 : public BLKFILARI { /* Arithm. Op Block Filter */
public:
// Constructors
BLKFILAR2(PGLOBAL g, PTDBDOS tdbp, int op, PXOB *xp);
// Methods
virtual int BlockEval(PGLOBAL g);
virtual void MakeValueBitmap(void);
protected:
BLKFILAR2(void) {} // Standard constructor not to be used
// Members
ULONG Bmp; // The value bitmap used to test blocks
ULONG Bxp; // Bitmap used when Opc = OP_EQ
}; // end of class BLKFILAR2
/***********************************************************************/
/* Definition of class BLKFILAR2 (with Op=OP_EQ,NE,GT,GE,LT, or LE) */
/* To be used when the bitmap is an array of ULONG bitmaps; */
/***********************************************************************/
class DllExport BLKFILMR2 : public BLKFILARI { /* Arithm. Op Block Filter */
public:
// Constructors
BLKFILMR2(PGLOBAL g, PTDBDOS tdbp, int op, PXOB *xp);
// Methods
virtual int BlockEval(PGLOBAL g);
virtual void MakeValueBitmap(void);
protected:
BLKFILMR2(void) {} // Standard constructor not to be used
// Members
int Nbm; // The number of ULONG bitmaps
int N; // The position of the leftmost ULONG
bool Void; // True if all file blocks can be skipped
PULONG Bmp; // The values bitmaps used to test blocks
PULONG Bxp; // Bit of values <= max value
}; // end of class BLKFILMR2
/***********************************************************************/
/* Definition of class BLKSPCARI (with Op=OP_EQ,NE,GT,GE,LT, or LE) */
/***********************************************************************/
class DllExport BLKSPCARI : public BLOCKFILTER { /* Arithm. Op Block Filter */
public:
// Constructors
BLKSPCARI(PTDBDOS tdbp, int op, PXOB *xp, int bsize);
// Methods
virtual void Reset(PGLOBAL g);
virtual int BlockEval(PGLOBAL g);
protected:
BLKSPCARI(void) {} // Standard constructor not to be used
// Members
PCOL Cpx; // Point to subquery "constant" column
PVAL Valp; // Points to constant argument Value
int Val; // Constant argument Value
int Bsize; // Table block size
}; // end of class BLKSPCARI
/***********************************************************************/
/* Definition of class BLKFILIN (with Op=OP_IN) */
/***********************************************************************/
class DllExport BLKFILIN : public BLOCKFILTER { // With array arguments.
public:
// Constructors
BLKFILIN(PGLOBAL g, PTDBDOS tdbp, int op, int opm, PXOB *xp);
// Methods
virtual void Reset(PGLOBAL g);
virtual int BlockEval(PGLOBAL g);
virtual void MakeValueBitmap(void) {}
protected:
// Member
PDOSCOL Colp; // Points to column argument
PARRAY Arap; // Points to array argument
bool Sorted; // True if the column is sorted
int Type; // Type of array elements
}; // end of class BLKFILIN
/***********************************************************************/
/* Definition of class BLKFILIN2 (with Op=OP_IN) */
/***********************************************************************/
class DllExport BLKFILIN2 : public BLKFILIN { // With array arguments.
public:
// Constructors
BLKFILIN2(PGLOBAL g, PTDBDOS tdbp, int op, int opm, PXOB *xp);
// Methods
//virtual void Reset(PGLOBAL g);
virtual int BlockEval(PGLOBAL g);
virtual void MakeValueBitmap(void);
protected:
// Member
int Nbm; // The number of ULONG bitmaps
int N; // The position of the leftmost ULONG
//bool Bitmap; // True for IN operator (temporary)
bool Void; // True if all file blocks can be skipped
bool Invert; // True when Result must be inverted
PULONG Bmp; // The values bitmaps used to test blocks
PULONG Bxp; // Bit of values <= max value
PVAL Valp; // Used while building the bitmaps
}; // end of class BLKFILIN2
/***********************************************************************/
/* Definition of class BLKSPCIN (with Op=OP_IN) Special column */
/***********************************************************************/
class DllExport BLKSPCIN : public BLOCKFILTER { // With array arguments.
public:
// Constructors
BLKSPCIN(PGLOBAL g, PTDBDOS tdbp, int op, int opm, PXOB *xp, int bsize);
// Methods
virtual void Reset(PGLOBAL g);
virtual int BlockEval(PGLOBAL g);
protected:
// Member
PARRAY Arap; // Points to array argument
int Bsize; // Table block size
}; // end of class BLKSPCIN
// ---------------- Class used in block indexing testing ----------------
#if 0
/***********************************************************************/
/* Definition of class BLOCKINDEX. */
/* Used to test the indexing to joined tables when the foreign key is */
/* a clustered or sorted column. If the table is joined to several */
/* tables, blocks will be chained together. */
/***********************************************************************/
class DllExport BLOCKINDEX : public BLOCK { /* Indexing Test Block */
public:
// Constructors
BLOCKINDEX(PBX nx, PDOSCOL cp, PKXBASE kp);
// Implementation
PBX GetNext(void) {return Next;}
// Methods
void Reset(void);
virtual int BlockEval(PGLOBAL);
virtual void Print(PGLOBAL g, FILE *f, UINT n);
virtual void Print(PGLOBAL g, char *ps, UINT z);
protected:
BLOCKINDEX(void) {} // Standard constructor not to be used
// Members
PBX Next; // To next Index Block
PTDBDOS Tdbp; // To table description block
PDOSCOL Colp; // Clustered foreign key
PKXBASE Kxp; // To Kindex of joined table
bool Sorted; // TRUE if column is sorted
int Type; // Col/Index type
int Result; // Result from evaluation
}; // end of class BLOCKINDEX
/***********************************************************************/
/* Definition of class BLOCKINDX2. (XDB2) */
/***********************************************************************/
class DllExport BLOCKINDX2 : public BLOCKINDEX { /* Indexing Test Block */
public:
// Constructors
BLOCKINDX2(PBX nx, PDOSCOL cp, PKXBASE kp);
// Methods
virtual int BlockEval(PGLOBAL);
protected:
BLOCKINDX2(void) {} // Standard constructor not to be used
// Members
int Nbm; // The number of ULONG bitmaps
PVBLK Dval; // Array of column distinct values
PVBLK Bmap; // Array of block bitmap values
}; // end of class BLOCKINDX2
/***********************************************************************/
/* Definition of class BLKSPCINDX. */
/* Used to test the indexing to joined tables when the foreign key is */
/* the ROWID special column. If the table is joined to several */
/* tables, blocks will be chained together. */
/***********************************************************************/
class DllExport BLKSPCINDX : public BLOCKINDEX { /* Indexing Test Block */
public:
// Constructors
BLKSPCINDX(PBX nx, PTDBDOS tp, PKXBASE kp, int bsize);
// Methods
virtual int BlockEval(PGLOBAL);
protected:
BLKSPCINDX(void) {} // Standard constructor not to be used
// Members
int Bsize; // Table block size
}; // end of class BLOCKINDEX
#endif // 0
#endif // __BLKFIL__
/***************** Filter C++ Class Filter Code (.CPP) *****************/
/* Name: FILTER.CPP Version 3.9 */
/* */
/* (C) Copyright to the author Olivier BERTRAND 1998-2014 */
/* */
/* This file contains the class FILTER function code. */
/***********************************************************************/
/***********************************************************************/
/* Include relevant MariaDB header file. */
/***********************************************************************/
#include "my_global.h"
#include "sql_class.h"
//#include "sql_time.h"
#if defined(WIN32)
//#include <windows.h>
#else // !WIN32
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#endif // !WIN32
/***********************************************************************/
/* Include required application header files */
/* global.h is header containing all global Plug declarations. */
/* plgdbsem.h is header containing the DB applic. declarations. */
/* xobject.h is header containing the XOBJECT derived classes dcls. */
/***********************************************************************/
#include "global.h"
#include "plgdbsem.h"
#include "tabcol.h"
#include "xtable.h"
#include "array.h"
//#include "subquery.h"
#include "filter.h"
//#include "token.h"
//#include "select.h"
#include "xindex.h"
/***********************************************************************/
/* Static variables. */
/***********************************************************************/
extern "C" int trace;
/***********************************************************************/
/* Utility routines. */
/***********************************************************************/
void PlugConvertConstant(PGLOBAL, PVOID&, SHORT&);
PVOID PlugCopyDB(PTABS, PVOID, INT);
void NewPointer(PTABS, PVOID, PVOID);
void AddPointer(PTABS, PVOID);
PPARM MakeParm(PGLOBAL g, PXOB xp)
{
PPARM pp = (PPARM)PlugSubAlloc(g, NULL, sizeof(PARM));
pp->Type = TYPE_XOBJECT;
pp->Value = xp;
pp->Domain = 0;
pp->Next = NULL;
return pp;
} // end of MakeParm
/***********************************************************************/
/* Routines called externally by FILTER function. */
/***********************************************************************/
bool PlugEvalLike(PGLOBAL, LPCSTR, LPCSTR, bool);
//bool ReadSubQuery(PGLOBAL, PSUBQ);
//PSUBQ OpenSubQuery(PGLOBAL, PSQL);
void PlugCloseDB(PGLOBAL, PSQL);
BYTE OpBmp(PGLOBAL g, OPVAL opc);
PARRAY MakeValueArray(PGLOBAL g, PPARM pp);
/***********************************************************************/
/* Routines called externally by CondFilter. */
/***********************************************************************/
PFIL MakeFilter(PGLOBAL g, PFIL fp1, OPVAL vop, PFIL fp2)
{
PFIL filp = new(g) FILTER(g, vop);
filp->Arg(0) = fp1;
filp->Arg(1) = fp2;
if (filp->Convert(g, false))
return NULL;
return filp;
} // end of MakeFilter
PFIL MakeFilter(PGLOBAL g, PCOL *colp, POPER pop, PPARM pfirst, bool neg)
{
PPARM parmp, pp[2];
PFIL fp1, fp2, filp = NULL;
if (pop->Val == OP_IN) {
PARRAY par = MakeValueArray(g, pfirst);
if (par) {
pp[0] = MakeParm(g, colp[0]);
pp[1] = MakeParm(g, par);
fp1 = new(g) FILTER(g, pop, pp);
if (fp1->Convert(g, false))
return NULL;
filp = (neg) ? MakeFilter(g, fp1, OP_NOT, NULL) : fp1;
} // endif par
} else if (pop->Val == OP_XX) { // BETWEEN
if (pfirst && pfirst->Next) {
pp[0] = MakeParm(g, colp[0]);
pp[1] = pfirst;
fp1 = new(g) FILTER(g, neg ? OP_LT : OP_GE, pp);
if (fp1->Convert(g, false))
return NULL;
pp[1] = pfirst->Next;
fp2 = new(g) FILTER(g, neg ? OP_GT : OP_LE, pp);
if (fp2->Convert(g, false))
return NULL;
filp = MakeFilter(g, fp1, neg ? OP_OR : OP_AND, fp2);
} // endif parmp
} else {
parmp = pfirst;
for (int i = 0; i < 2; i++)
if (colp[i]) {
pp[i] = MakeParm(g, colp[i]);
} else {
if (!parmp || parmp->Domain != i)
return NULL; // Logical error, should never happen
pp[i] = parmp;
parmp = parmp->Next;
} // endif colp
filp = new(g) FILTER(g, pop, pp);
if (filp->Convert(g, false))
return NULL;
} // endif's Val
return filp;
} // end of MakeFilter
/* --------------------------- Class FILTER -------------------------- */
/***********************************************************************/
/* FILTER public constructors. */
/***********************************************************************/
FILTER::FILTER(PGLOBAL g, POPER pop, PPARM *tp)
{
Constr(g, pop->Val, pop->Mod, tp);
} // end of FILTER constructor
FILTER::FILTER(PGLOBAL g, OPVAL opc, PPARM *tp)
{
Constr(g, opc, 0, tp);
} // end of FILTER constructor
void FILTER::Constr(PGLOBAL g, OPVAL opc, int opm, PPARM *tp)
{
Next = NULL;
Opc = opc;
Opm = opm;
Bt = 0x00;
for (int i = 0; i < 2; i++) {
Test[i].B_T = TYPE_VOID;
if (tp && tp[i]) {
PlugConvertConstant(g, tp[i]->Value, tp[i]->Type);
#if defined(_DEBUG)
assert(tp[i]->Type == TYPE_XOBJECT);
#endif
Arg(i) = (PXOB)tp[i]->Value;
} else
Arg(i) = pXVOID;
Val(i) = NULL;
Test[i].Conv = FALSE;
} // endfor i
} // end of Constr
/***********************************************************************/
/* FILTER copy constructor. */
/***********************************************************************/
FILTER::FILTER(PFIL fil1)
{
Next = NULL;
Opc = fil1->Opc;
Opm = fil1->Opm;
Test[0] = fil1->Test[0];
Test[1] = fil1->Test[1];
} // end of FILTER copy constructor
/***********************************************************************/
/* Linearize: Does the linearization of the filter tree: */
/* Independent filters (not implied in OR/NOT) will be separated */
/* from others and filtering operations will be automated by */
/* making a list of filter operations in polish operation style. */
/* Returned value points to the first filter of the list, which ends */
/* with the filter that was pointed by the first call argument, */
/* except for separators, in which case a loop is needed to find it. */
/* Note: a loop is used now in all cases (was not for OP_NOT) to be */
/* able to handle the case of filters whose arguments are already */
/* linearized, as it is done in LNA semantic routines. Indeed for */
/* already linearized chains, the first filter is never an OP_AND, */
/* OP_OR or OP_NOT filter, so this function just returns 'this'. */
/***********************************************************************/
PFIL FILTER::Linearize(bool nosep)
{
int i;
PFIL lfp[2], ffp[2] = {NULL,NULL};
switch (Opc) {
case OP_NOT:
if (GetArgType(0) == TYPE_FILTER) {
lfp[0] = (PFIL)Arg(0);
ffp[0] = lfp[0]->Linearize(TRUE);
} /* endif */
if (!ffp[0])
return NULL;
while (lfp[0]->Next) // See Note above
lfp[0] = lfp[0]->Next;
Arg(0) = lfp[0];
lfp[0]->Next = this;
break;
case OP_OR:
nosep = TRUE;
case OP_AND:
for (i = 0; i < 2; i++) {
if (GetArgType(i) == TYPE_FILTER) {
lfp[i] = (PFIL)Arg(i);
ffp[i] = lfp[i]->Linearize(nosep);
} /* endif */
if (!ffp[i])
return NULL;
while (lfp[i]->Next)
lfp[i] = lfp[i]->Next;
Arg(i) = lfp[i];
} /* endfor i */
if (nosep) {
lfp[0]->Next = ffp[1];
lfp[1]->Next = this;
} else {
lfp[0]->Next = this;
Opc = OP_SEP;
Arg(1) = pXVOID;
Next = ffp[1];
} /* endif */
break;
default:
ffp[0] = this;
} /* endswitch */
return (ffp[0]);
} // end of Linearize
/***********************************************************************/
/* Link the fil2 filter chain to the fil1(this) filter chain. */
/***********************************************************************/
PFIL FILTER::Link(PGLOBAL g, PFIL fil2)
{
PFIL fil1;
if (trace)
htrc("Linking filter %p with op=%d... to filter %p with op=%d\n",
this, Opc, fil2, (fil2) ? fil2->Opc : 0);
for (fil1 = this; fil1->Next; fil1 = fil1->Next) ;
if (fil1->Opc == OP_SEP)
fil1->Next = fil2; // Separator already exists
else {
// Create a filter separator and insert it between the chains
PFIL filp = new(g) FILTER(g, OP_SEP);
filp->Arg(0) = fil1;
filp->Next = fil2;
fil1->Next = filp;
} // endelse
return (this);
} // end of Link
/***********************************************************************/
/* Remove eventual last separator from a filter chain. */
/***********************************************************************/
PFIL FILTER::RemoveLastSep(void)
{
PFIL filp, gfp = NULL;
// Find last filter block (filp) and previous one (gfp).
for (filp = this; filp->Next; filp = filp->Next)
gfp = filp;
// If last filter is a separator, remove it
if (filp->Opc == OP_SEP)
if (gfp)
gfp->Next = NULL;
else
return NULL; // chain is now empty
return this;
} // end of RemoveLastSep
/***********************************************************************/
/* CheckColumn: Checks references to Columns in the filter and change */
/* them into references to Col Blocks. */
/* Returns the number of column references or -1 in case of column */
/* not found and -2 in case of unrecoverable error. */
/* WHERE filters are called with *aggreg == AGG_NO. */
/* HAVING filters are called with *aggreg == AGG_ANY. */
/***********************************************************************/
int FILTER::CheckColumn(PGLOBAL g, PSQL sqlp, PXOB &p, int &ag)
{
char errmsg[MAX_STR] = "";
int agg, k, n = 0;
if (trace)
htrc("FILTER CheckColumn: sqlp=%p ag=%d\n", sqlp, ag);
switch (Opc) {
case OP_SEP:
case OP_AND:
case OP_OR:
case OP_NOT:
return 0; // This because we are called for a linearized filter
default:
break;
} // endswitch Opc
// Check all arguments even in case of error for when we are called
// from CheckHaving, where references to an alias raise an error but
// we must have all other arguments to be set.
for (int i = 0; i < 2; i++) {
if (GetArgType(i) == TYPE_FILTER) // Should never happen in
return 0; // current implementation
agg = ag;
if ((k = Arg(i)->CheckColumn(g, sqlp, Arg(i), agg)) < -1) {
return k;
} else if (k < 0) {
if (!*errmsg) // Keep first error message
strcpy(errmsg, g->Message);
} else
n += k;
} // endfor i
if (*errmsg) {
strcpy(g->Message, errmsg);
return -1;
} else
return n;
} // end of CheckColumn
/***********************************************************************/
/* RefNum: Find the number of references correlated sub-queries make */
/* to the columns of the outer query (pointed by sqlp). */
/***********************************************************************/
int FILTER::RefNum(PSQL sqlp)
{
int n = 0;
for (int i = 0; i < 2; i++)
n += Arg(i)->RefNum(sqlp);
return n;
} // end of RefNum
#if 0
/***********************************************************************/
/* CheckSubQuery: see SUBQUERY::CheckSubQuery for comment. */
/***********************************************************************/
PXOB FILTER::CheckSubQuery(PGLOBAL g, PSQL sqlp)
{
switch (Opc) {
case OP_SEP:
case OP_AND:
case OP_OR:
case OP_NOT:
break;
default:
for (int i = 0; i < 2; i++)
if (!(Arg(i) = (PXOB)Arg(i)->CheckSubQuery(g, sqlp)))
return NULL;
break;
} // endswitch Opc
return this;
} // end of CheckSubQuery
/***********************************************************************/
/* SortJoin: function that places ahead of the list the 'good' groups */
/* for join filtering. These are groups with only one filter that */
/* specify equality between two different table columns, at least */
/* one is a table key column. Doing so the join filter will be in */
/* general compatible with linearization of the joined table tree. */
/* This function has been added a further sorting on column indexing. */
/***********************************************************************/
PFIL FILTER::SortJoin(PGLOBAL g)
{
int k;
PCOL cp1, cp2;
PTDBASE tp1, tp2;
PFIL fp, filp, gfp, filstart = this, filjoin = NULL, lfp = NULL;
bool join = TRUE, key = TRUE;
// This routine requires that the chain ends with a separator
// So check for it and eventually add one if necessary
for (filp = this; filp->Next; filp = filp->Next) ;
if (filp->Opc != OP_SEP)
filp->Next = new(g) FILTER(g, OP_SEP);
again:
for (k = (key) ? 0 : MAX_MULT_KEY; k <= MAX_MULT_KEY; k++)
for (gfp = NULL, fp = filp = filstart; filp; filp = filp->Next)
switch (filp->Opc) {
case OP_SEP:
if (join) {
// Put this filter group into the join filter group list.
if (!lfp)
filjoin = fp;
else
lfp->Next = fp;
if (!gfp)
filstart = filp->Next;
else
gfp->Next = filp->Next;
lfp = filp; // last block of join filter list
} else
gfp = filp; // last block of bad filter list
join = TRUE;
fp = filp->Next;
break;
case OP_LOJ:
case OP_ROJ:
case OP_DTJ:
join &= TRUE;
break;
case OP_EQ:
if (join && k > 0 // So specific join operators come first
&& filp->GetArgType(0) == TYPE_COLBLK
&& filp->GetArgType(1) == TYPE_COLBLK) {
cp1 = (PCOL)filp->Arg(0);
cp2 = (PCOL)filp->Arg(1);
tp1 = (PTDBASE)cp1->GetTo_Tdb();
tp2 = (PTDBASE)cp2->GetTo_Tdb();
if (tp1->GetTdb_No() != tp2->GetTdb_No()) {
if (key)
join &= (cp1->GetKey() == k || cp2->GetKey() == k);
else
join &= (tp1->GetColIndex(cp1) || tp2->GetColIndex(cp2));
} else
join = FALSE;
} else
join = FALSE;
break;
default:
join = FALSE;
} // endswitch filp->Opc
if (key) {
key = FALSE;
goto again;
} // endif key
if (filjoin) {
lfp->Next = filstart;
filstart = filjoin;
} // endif filjoin
// Removing last separator is perhaps unuseful, but it was so
return filstart->RemoveLastSep();
} // end of SortJoin
/***********************************************************************/
/* Check that this filter is a good join filter. */
/* If so the opj block will be set accordingly. */
/* opj points to the join block, fprec to the filter block to which */
/* the rest of the chain must be linked in case of success. */
/* teq, tek and tk2 indicates the severity of the tests: */
/* tk2 == TRUE means both columns must be primary keys. */
/* tc2 == TRUE means both args must be columns (not expression). */
/* tek == TRUE means at least one column must be a primary key. */
/* teq == TRUE means the filter operator must be OP_EQ. */
/* tix == TRUE means at least one column must be a simple index key. */
/* thx == TRUE means at least one column must be a leading index key. */
/***********************************************************************/
bool FILTER::FindJoinFilter(POPJOIN opj, PFIL fprec, bool teq, bool tek,
bool tk2, bool tc2, bool tix, bool thx)
{
if (trace)
htrc("FindJoinFilter: opj=%p fprec=%p tests=(%d,%d,%d,%d)\n",
opj, fprec, teq, tek, tk2, tc2);
// Firstly check that this filter is an independent filter
// meaning that it is the only one in its own group.
if (Next && Next->Opc != OP_SEP)
return (Opc < 0);
// Keep only equi-joins and specific joins (Outer and Distinct)
// Normally specific join operators comme first because they have
// been placed first by SortJoin.
if (teq && Opc > OP_EQ)
return FALSE;
// We have a candidate for join filter, now check that it
// fulfil the requirement about its operands, to point to
// columns of respectively the two TDB's of that join.
int col1 = 0, col2 = 0;
bool key = tk2;
bool idx = FALSE, ihx = FALSE;
PIXDEF pdx;
for (int i = 0; i < 2; i++)
if (GetArgType(i) == TYPE_COLBLK) {
PCOL colp = (PCOL)Arg(i);
if (tk2)
key &= (colp->IsKey());
else
key |= (colp->IsKey());
pdx = ((PTDBASE)colp->GetTo_Tdb())->GetColIndex(colp);
idx |= (pdx && pdx->GetNparts() == 1);
ihx |= (pdx != NULL);
if (colp->VerifyColumn(opj->GetTbx1()))
col1 = i + 1;
else if (colp->VerifyColumn(opj->GetTbx2()))
col2 = i + 1;
} else if (!tc2 && GetArgType(i) != TYPE_CONST) {
PXOB xp = Arg(i);
if (xp->VerifyColumn(opj->GetTbx1()))
col1 = i + 1;
else if (xp->VerifyColumn(opj->GetTbx2()))
col2 = i + 1;
} else
return (Opc < 0);
if (col1 == 0 || col2 == 0)
return (Opc < 0);
if (((tek && !key) || (tix && !idx) || (thx && !ihx)) && Opc != OP_DTJ)
return FALSE;
// This is the join filter, set the join block.
if (col1 == 1) {
opj->SetCol1(Arg(0));
opj->SetCol2(Arg(1));
} else {
opj->SetCol1(Arg(1));
opj->SetCol2(Arg(0));
switch (Opc) {
// case OP_GT: Opc = OP_LT; break;
// case OP_LT: Opc = OP_GT; break;
// case OP_GE: Opc = OP_LE; break;
// case OP_LE: Opc = OP_GE; break;
case OP_LOJ:
case OP_ROJ:
case OP_DTJ:
// For expended join operators, the filter must indicate
// the way the join should be done, and not the order of
// appearance of tables in the table list (which is kept
// because tables are sorted in AddTdb). Therefore the
// join is inversed, not the filter.
opj->InverseJoin();
default: break;
} // endswitch Opc
} // endif col1
if (Opc < 0) {
// For join operators, special processing is needed
int knum = 0;
PFIL fp;
switch (Opc) {
case OP_LOJ:
opj->SetJtype(JT_LEFT);
knum = opj->GetCol2()->GetKey();
break;
case OP_ROJ:
opj->SetJtype(JT_RIGHT);
knum = opj->GetCol1()->GetKey();
break;
case OP_DTJ:
for (knum = 1, fp = this->Next; fp; fp = fp->Next)
if (fp->Opc == OP_DTJ)
knum++;
else if (fp->Opc != OP_SEP)
break;
opj->SetJtype(JT_DISTINCT);
opj->GetCol2()->SetKey(knum);
break;
default:
break;
} // endswitch Opc
if (knum > 1) {
// Lets take care of a multiple key join
// We do a minimum of checking here as it will done later
int k = 1;
OPVAL op;
BYTE tmp[sizeof(Test[0])];
for (fp = this->Next; k < knum && fp; fp = fp->Next) {
switch (op = fp->Opc) {
case OP_SEP:
continue;
case OP_LOJ:
if (Opc == OP_ROJ) {
op = Opc;
memcpy(tmp, &fp->Test[0], sizeof(Test[0]));
fp->Test[0] = fp->Test[1];
memcpy(&fp->Test[1], tmp, sizeof(Test[0]));
} // endif Opc
k++;
break;
case OP_ROJ:
if (Opc == OP_LOJ) {
op = Opc;
memcpy(tmp, &fp->Test[0], sizeof(Test[0]));
fp->Test[0] = fp->Test[1];
memcpy(&fp->Test[1], tmp, sizeof(Test[0]));
} // endif Opc
k++;
break;
case OP_DTJ:
if (op == Opc && fp->GetArgType(1) == TYPE_COLBLK)
((PCOL)fp->Arg(1))->SetKey(knum);
k++;
break;
default:
break;
} // endswitch op
if (op != Opc)
return TRUE;
fp->Opc = OP_EQ;
} // endfor fp
} // endif k
Opc = OP_EQ;
} // endif Opc
// Set the join filter operator
opj->SetOpc(Opc);
// Now mark the columns involved in the join filter because
// this information will be used by the linearize program.
// Note: this should be replaced in the future by something
// enabling to mark tables as Parent or Child.
opj->GetCol1()->MarkCol(U_J_EXT);
opj->GetCol2()->MarkCol(U_J_EXT);
// Remove the filter from the filter chain. If the filter is
// not last in the chain, also remove the SEP filter after it.
if (Next) // Next->Opc == OP_SEP
Next = Next->Next;
if (!fprec)
opj->SetFilter(Next);
else
fprec->Next = Next;
return FALSE;
} // end of FindJoinFilter
/***********************************************************************/
/* CheckHaving: check and process a filter of an HAVING clause. */
/* Check references to Columns and Functions in the filter. */
/* All these references can correspond to items existing in the */
/* SELECT list, else if it is a function, allocate a SELECT block */
/* to be added to the To_Sel list (non projected blocks). */
/***********************************************************************/
bool FILTER::CheckHaving(PGLOBAL g, PSQL sqlp)
{
int agg = AGG_ANY;
PXOB xp;
//sqlp->SetOk(TRUE); // Ok to look into outer queries for filters
switch (Opc) {
case OP_SEP:
case OP_AND:
case OP_OR:
case OP_NOT:
return FALSE;
default:
if (CheckColumn(g, sqlp, xp, agg) < -1)
return TRUE; // Unrecovable error
break;
} // endswitch Opc
sqlp->SetOk(TRUE); // Ok to look into outer queries for filters
for (int i = 0; i < 2; i++)
if (!(xp = Arg(i)->SetSelect(g, sqlp, TRUE)))
return TRUE;
else if (xp != Arg(i)) {
Arg(i) = xp;
Val(i) = Arg(i)->GetValue();
} // endif
sqlp->SetOk(FALSE);
return FALSE;
} // end of CheckHaving
#endif // 0
/***********************************************************************/
/* Used while building a table index. This function split the filter */
/* attached to the tdbp table into the local and not local part. */
/* The local filter is used to restrict the size of the index and the */
/* not local part remains to be executed later. This has been added */
/* recently and not only to improve the performance but chiefly to */
/* avoid loosing rows when processing distinct joins. */
/* Returns: */
/* 0: the whole filter is local (both arguments are) */
/* 1: the whole filter is not local */
/* 2: the filter was split in local (attached to fp[0]) and */
/* not local (attached to fp[1]). */
/***********************************************************************/
int FILTER::SplitFilter(PFIL *fp)
{
int i, rc[2];
if (Opc == OP_AND) {
for (i = 0; i < 2; i++)
rc[i] = ((PFIL)Arg(i))->SplitFilter(fp);
// Filter first argument should never be split because of the
// algorithm used to de-linearize the filter.
assert(rc[0] != 2);
if (rc[0] != rc[1]) {
// Splitting to be done
if (rc[1] == 2) {
// 2nd argument already split, add 1st to the proper filter
assert(fp[*rc]);
Arg(1) = fp[*rc];
Val(1) = fp[*rc]->GetValue();
fp[*rc] = this;
} else for (i = 0; i < 2; i++) {
// Split the filter arguments
assert(!fp[rc[i]]);
fp[rc[i]] = (PFIL)Arg(i);
} // endfor i
*rc = 2;
} // endif rc
} else
*rc = (CheckLocal(NULL)) ? 0 : 1;
return *rc;
} // end of SplitFilter
/***********************************************************************/
/* This function is called when making a Kindex after the filter was */
/* split in local and nolocal part in the case of many to many joins. */
/* Indeed the whole filter must be reconstructed to take care of next */
/* same values when doing the explosive join. In addition, the link */
/* must be done respecting the way filters are de-linearized, no AND */
/* filter in the first argument of an AND filter, because this is */
/* expected to be true if SplitFilter is used again on this filter. */
/***********************************************************************/
PFIL FILTER::LinkFilter(PGLOBAL g, PFIL fp2)
{
PFIL fp1, filp, filand = NULL;
assert(fp2); // Test must be made by caller
// Find where the new AND filter must be attached
for (fp1 = this; fp1->Opc == OP_AND; fp1 = (PFIL)fp1->Arg(1))
filand = fp1;
filp = new(g) FILTER(g, OP_AND);
filp->Arg(0) = fp1;
filp->Val(0) = fp1->GetValue();
filp->Test[0].B_T = TYPE_INT;
filp->Test[0].Conv = FALSE;
filp->Arg(1) = fp2;
filp->Val(1) = fp2->GetValue();
filp->Test[1].B_T = TYPE_INT;
filp->Test[1].Conv = FALSE;
filp->Value = AllocateValue(g, TYPE_INT);
if (filand) {
// filp must be inserted here
filand->Arg(1) = filp;
filand->Val(1) = filp->GetValue();
filp = this;
} // endif filand
return filp;
} // end of LinkFilter
/***********************************************************************/
/* Checks whether filter contains reference to a previous table that */
/* is not logically joined to the currently openned table, or whether */
/* it is a Sub-Select filter. In any case, local is set to FALSE. */
/* Note: This function is now applied to de-linearized filters. */
/***********************************************************************/
bool FILTER::CheckLocal(PTDB tdbp)
{
bool local = TRUE;
if (trace) {
if (tdbp)
htrc("CheckLocal: filp=%p R%d\n", this, tdbp->GetTdb_No());
else
htrc("CheckLocal: filp=%p\n", this);
} // endif trace
for (int i = 0; local && i < 2; i++)
local = Arg(i)->CheckLocal(tdbp);
if (trace)
htrc("FCL: returning %d\n", local);
return (local);
} // end of CheckLocal
/***********************************************************************/
/* This routine is used to split the filter attached to the tdbp */
/* table into the local and not local part where "local" means that */
/* it applies "locally" to the FILEID special column with crit = 2 */
/* and to the SERVID and/or TABID special columns with crit = 3. */
/* Returns: */
/* 0: the whole filter is local (both arguments are) */
/* 1: the whole filter is not local */
/* 2: the filter was split in local (attached to fp[0]) and */
/* not local (attached to fp[1]). */
/* Note: "Locally" means that the "local" filter can be evaluated */
/* before opening the table. This implies that the special column be */
/* compared only with constants and that this filter not to be or'ed */
/* with a non "local" filter. */
/***********************************************************************/
int FILTER::SplitFilter(PFIL *fp, PTDB tp, int crit)
{
int i, rc[2];
if (Opc == OP_AND) {
for (i = 0; i < 2; i++)
rc[i] = ((PFIL)Arg(i))->SplitFilter(fp, tp, crit);
// Filter first argument should never be split because of the
// algorithm used to de-linearize the filter.
assert(rc[0] != 2);
if (rc[0] != rc[1]) {
// Splitting to be done
if (rc[1] == 2) {
// 2nd argument already split, add 1st to the proper filter
assert(fp[*rc]);
Arg(1) = fp[*rc];
Val(1) = fp[*rc]->GetValue();
fp[*rc] = this;
} else for (i = 0; i < 2; i++) {
// Split the filter arguments
assert(!fp[rc[i]]);
fp[rc[i]] = (PFIL)Arg(i);
} // endfor i
*rc = 2;
} // endif rc
} else
*rc = (CheckSpcCol(tp, crit) == 1) ? 0 : 1;
return *rc;
} // end of SplitFilter
/***********************************************************************/
/* Checks whether filter contains only references to FILEID, SERVID, */
/* or TABID with constants or pseudo constants. */
/***********************************************************************/
int FILTER::CheckSpcCol(PTDB tdbp, int n)
{
int n1 = Arg(0)->CheckSpcCol(tdbp, n);
int n2 = Arg(1)->CheckSpcCol(tdbp, n);
return max(n1, n2);
} // end of CheckSpcCol
/***********************************************************************/
/* Reset the filter arguments to non evaluated yet. */
/***********************************************************************/
void FILTER::Reset(void)
{
for (int i = 0; i < 2; i++)
Arg(i)->Reset();
} // end of Reset
/***********************************************************************/
/* Init: called when reinitializing a query (Correlated subqueries) */
/***********************************************************************/
bool FILTER::Init(PGLOBAL g)
{
for (int i = 0; i < 2; i++)
Arg(i)->Init(g);
return FALSE;
} // end of Init
/***********************************************************************/
/* Convert: does all filter setting and conversions. */
/* (having = TRUE for Having Clauses, FALSE for Where Clauses) */
/* Note: hierarchy of types is implied by the ConvertType */
/* function, currently FLOAT, int, STRING and TOKEN. */
/* Returns FALSE if successful or TRUE in case of error. */
/* Note on result type for filters: */
/* Currently the result type is of TYPE_INT (should be TYPE_BOOL). */
/* This avoids to introduce a new type and perhaps will permit */
/* conversions. However the boolean operators will result in a */
/* boolean int result, meaning that result shall be only 0 or 1 . */
/***********************************************************************/
bool FILTER::Convert(PGLOBAL g, bool having)
{
int i, comtype = TYPE_ERROR;
if (trace)
htrc("converting(?) %s %p opc=%d\n",
(having) ? "having" : "filter", this, Opc);
for (i = 0; i < 2; i++) {
switch (GetArgType(i)) {
case TYPE_COLBLK:
if (((PCOL)Arg(i))->InitValue(g))
return TRUE;
break;
case TYPE_ARRAY:
if ((Opc != OP_IN && !Opm) || i == 0) {
strcpy(g->Message, MSG(BAD_ARRAY_OPER));
return TRUE;
} // endif
if (((PARRAY)Arg(i))->Sort(g)) // Sort the array
return TRUE; // Error
break;
case TYPE_VOID:
if (i == 1) {
Val(0) = Arg(0)->GetValue();
goto TEST; // Filter has only one argument
} // endif i
strcpy(g->Message, MSG(VOID_FIRST_ARG));
return TRUE;
} // endswitch
if (trace)
htrc("Filter(%d): Arg type=%d\n", i, GetArgType(i));
// Set default values
Test[i].B_T = Arg(i)->GetResultType();
Test[i].Conv = FALSE;
// Special case of the LIKE operator.
if (Opc == OP_LIKE) {
if (!IsTypeChar((int)Test[i].B_T)) {
sprintf(g->Message, MSG(BAD_TYPE_LIKE), i, Test[i].B_T);
return TRUE;
} // endif
comtype = TYPE_STRING;
} else {
// Set the common type for both (eventually converted) arguments
int argtyp = Test[i].B_T;
if (GetArgType(i) == TYPE_CONST && argtyp == TYPE_INT) {
// If possible, downcast the type to smaller types to avoid
// convertion as much as possible.
int n = Arg(i)->GetValue()->GetIntValue();
if (n >= INT_MIN8 && n <= INT_MAX8)
argtyp = TYPE_TINY;
else if (n >= INT_MIN16 && n <= INT_MAX16)
argtyp = TYPE_SHORT;
} else if (GetArgType(i) == TYPE_ARRAY) {
// If possible, downcast int arrays target type to TYPE_SHORT
// to take care of filters written like shortcol in (34,35,36).
if (((PARRAY)Arg(i))->CanBeShort())
argtyp = TYPE_SHORT;
} // endif TYPE_CONST
comtype = ConvertType(comtype, argtyp, CNV_ANY);
} // endif Opc
if (comtype == TYPE_ERROR) {
strcpy(g->Message, MSG(ILL_FILTER_CONV));
return TRUE;
} // endif
if (trace)
htrc(" comtype=%d, B_T(%d)=%d Val(%d)=%p\n",
comtype, i, Test[i].B_T, i, Val(i));
} // endfor i
// Set or allocate the filter argument values and buffers
for (i = 0; i < 2; i++) {
if (trace)
htrc(" conv type %d ? i=%d B_T=%d comtype=%d\n",
GetArgType(i), i, Test[i].B_T, comtype);
if (Test[i].B_T == comtype) {
// No conversion, set Value to argument Value
Val(i) = Arg(i)->GetValue();
#if defined(_DEBUG)
assert (Val(i) && Val(i)->GetType() == Test[i].B_T);
#endif
} else {
// Conversion between filter arguments to be done.
// Note that the argument must be converted, not only the
// buffer and buffer type, so GetArgType() returns the new type.
switch (GetArgType(i)) {
case TYPE_CONST:
if (comtype == TYPE_DATE && Test[i].B_T == TYPE_STRING) {
// Convert according to the format of the other argument
Val(i) = AllocateValue(g, comtype, Arg(i)->GetLength());
if (((DTVAL*)Val(i))->SetFormat(g, Val(1-i)))
return TRUE;
Val(i)->SetValue_psz(Arg(i)->GetValue()->GetCharValue());
} else {
((PCONST)Arg(i))->Convert(g, comtype);
Val(i) = Arg(i)->GetValue();
} // endif comtype
break;
case TYPE_ARRAY:
// Conversion PSZ or int array to int or double FLOAT.
if (((PARRAY)Arg(i))->Convert(g, comtype, Val(i-1)) == TYPE_ERROR)
return TRUE;
break;
case TYPE_FILTER:
strcpy(g->Message, MSG(UNMATCH_FIL_ARG));
return TRUE;
default:
// Conversion from Column, Select/Func, Expr, Scalfnc...
// The argument requires conversion during Eval
// A separate Value block must be allocated.
// Note: the test on comtype is to prevent unnecessary
// domain initialization and get the correct length in
// case of Token -> numeric conversion.
Val(i) = AllocateValue(g, comtype, (comtype == TYPE_STRING)
? Arg(i)->GetLengthEx() : Arg(i)->GetLength());
if (comtype == TYPE_DATE && Test[i].B_T == TYPE_STRING)
// Convert according to the format of the other argument
if (((DTVAL*)Val(i))->SetFormat(g, Val(1 - i)))
return TRUE;
Test[i].Conv = TRUE;
break;
} // endswitch GetType
Test[i].B_T = comtype;
} // endif comtype
} // endfor i
// Last check to be sure all is correct.
if (Test[0].B_T != Test[1].B_T) {
sprintf(g->Message, MSG(BAD_FILTER_CONV), Test[0].B_T, Test[1].B_T);
return TRUE;
//} else if (Test[0].B_T == TYPE_LIST &&
// ((LSTVAL*)Val(0))->GetN() != ((LSTVAL*)Val(1))->GetN()) {
// sprintf(g->Message, MSG(ROW_ARGNB_ERR),
// ((LSTVAL*)Val(0))->GetN(), ((LSTVAL*)Val(1))->GetN());
// return TRUE;
} // endif's B_T
TEST: // Test for possible Eval optimization
if (trace)
htrc("Filp %p op=%d argtypes=(%d,%d)\n",
this, Opc, GetArgType(0), GetArgType(1));
// Check whether we have a "simple" filter and in that case
// change its class so an optimized Eval function will be used
if (!Test[0].Conv && !Test[1].Conv) {
if (Opm) switch (Opc) {
case OP_EQ:
case OP_NE:
case OP_GT:
case OP_GE:
case OP_LT:
case OP_LE:
if (GetArgType(1) != TYPE_ARRAY)
break; // On subquery, do standard processing
// Change the FILTER class to FILTERIN
new(this) FILTERIN;
break;
default:
break;
} // endswitch Opc
else switch (Opc) {
#if 0
case OP_EQ: new(this) FILTEREQ; break;
case OP_NE: new(this) FILTERNE; break;
case OP_GT: new(this) FILTERGT; break;
case OP_GE: new(this) FILTERGE; break;
case OP_LT: new(this) FILTERLT; break;
case OP_LE: new(this) FILTERLE; break;
#endif // 0
case OP_EQ:
case OP_NE:
case OP_GT:
case OP_GE:
case OP_LT:
case OP_LE: new(this) FILTERCMP(g); break;
case OP_AND: new(this) FILTERAND; break;
case OP_OR: new(this) FILTEROR; break;
case OP_NOT: new(this) FILTERNOT; break;
case OP_EXIST:
if (GetArgType(1) == TYPE_VOID) {
// For EXISTS it is the first argument that should be null
Arg(1) = Arg(0);
Arg(0) = pXVOID;
} // endif void
// pass thru
case OP_IN:
// For IN operator do optimize if operand is an array
if (GetArgType(1) != TYPE_ARRAY)
break; // IN on subquery, do standard processing
// Change the FILTER class to FILTERIN
new(this) FILTERIN;
break;
default:
break;
} // endswitch Opc
} // endif Conv
// The result value (should be TYPE_BOOL ???)
Value = AllocateValue(g, TYPE_INT);
return FALSE;
} // end of Convert
/***********************************************************************/
/* Eval: Compute filter result value. */
/* New algorithm: evaluation is now done from the root for each group */
/* so Eval is now a recursive process for FILTER operands. */
/***********************************************************************/
bool FILTER::Eval(PGLOBAL g)
{
int i; // n = 0;
//PSUBQ subp = NULL;
PARRAY ap = NULL;
PDBUSER dup = PlgGetUser(g);
if (Opc <= OP_XX)
for (i = 0; i < 2; i++)
// Evaluate the object and eventually convert it.
if (Arg(i)->Eval(g))
return TRUE;
else if (Test[i].Conv)
Val(i)->SetValue_pval(Arg(i)->GetValue());
if (trace)
htrc(" Filter: op=%d type=%d %d B_T=%d %d val=%p %p\n",
Opc, GetArgType(0), GetArgType(1), Test[0].B_T, Test[1].B_T,
Val(0), Val(1));
// Main switch on filtering according to operator type.
switch (Opc) {
case OP_EQ:
case OP_NE:
case OP_GT:
case OP_GE:
case OP_LT:
case OP_LE:
if (!Opm) {
// Comparison boolean operators.
#if defined(_DEBUG)
if (Val(0)->GetType() != Val(1)->GetType())
goto FilterError;
#endif
// Compare the two arguments
// New algorithm to take care of TYPE_LIST
Bt = OpBmp(g, Opc);
Value->SetValue_bool(!(Val(0)->TestValue(Val(1)) & Bt));
break;
} // endif Opm
// For modified operators, pass thru
case OP_IN:
case OP_EXIST:
// For IN operations, special processing is done here
switch (GetArgType(1)) {
case TYPE_ARRAY:
ap = (PARRAY)Arg(1);
break;
default:
strcpy(g->Message, MSG(IN_WITHOUT_SUB));
goto FilterError;
} // endswitch Type
if (trace) {
htrc(" IN filtering: ap=%p\n", ap);
if (ap)
htrc(" Array: type=%d size=%d other_type=%d\n",
ap->GetType(), ap->GetSize(), Test[0].B_T);
} // endif trace
/*****************************************************************/
/* Implementation note: The Find function is now able to do a */
/* conversion but limited to SHORT, int, and FLOAT arrays. */
/*****************************************************************/
// Value->SetValue_bool(ap->Find(g, Val(0)));
if (ap)
Value->SetValue_bool(ap->FilTest(g, Val(0), Opc, Opm));
break;
case OP_LIKE:
#if defined(_DEBUG)
if (!IsTypeChar((int)Test[0].B_T) || !IsTypeChar((int)Test[1].B_T))
goto FilterError;
#endif
if (Arg(0)->Eval(g))
return TRUE;
Value->SetValue_bool(PlugEvalLike(g, Val(0)->GetCharValue(),
Val(1)->GetCharValue(),
Val(0)->IsCi()));
break;
case OP_AND:
#if defined(_DEBUG)
if (Test[0].B_T != TYPE_INT || Test[1].B_T != TYPE_INT)
goto FilterError;
#endif
if (Arg(0)->Eval(g))
return TRUE;
Value->SetValue(Val(0)->GetIntValue());
if (!Value->GetIntValue())
return FALSE; // No need to evaluate 2nd argument
if (Arg(1)->Eval(g))
return TRUE;
Value->SetValue(Val(1)->GetIntValue());
break;
case OP_OR:
#if defined(_DEBUG)
if (Test[0].B_T != TYPE_INT || Test[1].B_T != TYPE_INT)
goto FilterError;
#endif
if (Arg(0)->Eval(g))
return TRUE;
Value->SetValue(Val(0)->GetIntValue());
if (Value->GetIntValue())
return FALSE; // No need to evaluate 2nd argument
if (Arg(1)->Eval(g))
return TRUE;
Value->SetValue(Val(1)->GetIntValue());
break;
case OP_NOT:
#if defined(_DEBUG)
if (Test[0].B_T != TYPE_INT) // Should be type bool ???
goto FilterError;
#endif
if (Arg(0)->Eval(g))
return TRUE;
Value->SetValue_bool(!Val(0)->GetIntValue());
break;
case OP_SEP: // No more used while evaluating
default:
goto FilterError;
} // endswitch Opc
if (trace)
htrc("Eval: filter %p Opc=%d result=%d\n",
this, Opc, Value->GetIntValue());
return FALSE;
FilterError:
sprintf(g->Message, MSG(BAD_FILTER),
Opc, Test[0].B_T, Test[1].B_T, GetArgType(0), GetArgType(1));
return TRUE;
} // end of Eval
#if 0
/***********************************************************************/
/* Called by PlugCopyDB to make a copy of a (linearized) filter chain.*/
/***********************************************************************/
PFIL FILTER::Copy(PTABS t)
{
int i;
PFIL fil1, fil2, newfilchain = NULL, fprec = NULL;
for (fil1 = this; fil1; fil1 = fil1->Next) {
fil2 = new(t->G) FILTER(fil1);
if (!fprec)
newfilchain = fil2;
else
fprec->Next = fil2;
NewPointer(t, fil1, fil2);
for (i = 0; i < 2; i++)
if (fil1->GetArgType(i) == TYPE_COLBLK ||
fil1->GetArgType(i) == TYPE_FILTER)
AddPointer(t, &fil2->Arg(i));
fprec = fil2;
} /* endfor fil1 */
return newfilchain;
} // end of Copy
#endif // 0
/*********************************************************************/
/* Make file output of FILTER contents. */
/*********************************************************************/
void FILTER::Print(PGLOBAL g, FILE *f, UINT n)
{
char m[64];
memset(m, ' ', n); // Make margin string
m[n] = '\0';
bool lin = (Next != NULL); // lin == TRUE if linearized
for (PFIL fp = this; fp; fp = fp->Next) {
fprintf(f, "%sFILTER: at %p opc=%d lin=%d result=%d\n",
m, fp, fp->Opc, lin,
(Value) ? Value->GetIntValue() : 0);
for (int i = 0; i < 2; i++) {
fprintf(f, "%s Arg(%d) type=%d value=%p B_T=%d val=%p\n",
m, i, fp->GetArgType(i), fp->Arg(i),
fp->Test[i].B_T, fp->Val(i));
if (lin && fp->GetArgType(i) == TYPE_FILTER)
fprintf(f, "%s Filter at %p\n", m, fp->Arg(i));
else
fp->Arg(i)->Print(g, f, n + 2);
} // endfor i
} // endfor fp
} // end of Print
/***********************************************************************/
/* Make string output of TABLE contents (z should be checked). */
/***********************************************************************/
void FILTER::Print(PGLOBAL g, char *ps, UINT z)
{
#define FLEN 100
typedef struct _bc {
struct _bc *Next;
char Cold[FLEN+1];
} BC, *PBC;
char *p;
int n;
PFIL fp;
PBC bxp, bcp = NULL;
*ps = '\0';
for (fp = this; fp && z > 0; fp = fp->Next) {
if (fp->Opc < OP_CNC || fp->Opc == OP_IN || fp->Opc == OP_NULL
|| fp->Opc == OP_LIKE || fp->Opc == OP_EXIST) {
if (!(bxp = new BC)) {
strncat(ps, "Filter(s)", z);
return;
} /* endif */
bxp->Next = bcp;
bcp = bxp;
p = bcp->Cold;
n = FLEN;
fp->Arg(0)->Print(g, p, n);
n = FLEN - strlen(p);
switch (fp->Opc) {
case OP_EQ:
strncat(bcp->Cold, "=", n);
break;
case OP_NE:
strncat(bcp->Cold, "!=", n);
break;
case OP_GT:
strncat(bcp->Cold, ">", n);
break;
case OP_GE:
strncat(bcp->Cold, ">=", n);
break;
case OP_LT:
strncat(bcp->Cold, "<", n);
break;
case OP_LE:
strncat(bcp->Cold, "<=", n);
break;
case OP_IN:
strncat(bcp->Cold, " in ", n);
break;
case OP_NULL:
strncat(bcp->Cold, " is null", n);
break;
case OP_LIKE:
strncat(bcp->Cold, " like ", n);
break;
case OP_EXIST:
strncat(bcp->Cold, " exists ", n);
break;
case OP_AND:
strncat(bcp->Cold, " and ", n);
break;
case OP_OR:
strncat(bcp->Cold, " or ", n);
break;
default:
strncat(bcp->Cold, "?", n);
} // endswitch Opc
n = FLEN - strlen(p);
p += strlen(p);
fp->Arg(1)->Print(g, p, n);
} else
if (!bcp) {
strncat(ps, "???", z);
z -= 3;
} else
switch (fp->Opc) {
case OP_SEP: // Filter list separator
strncat(ps, bcp->Cold, z);
z -= strlen(bcp->Cold);
strncat(ps, ";", z--);
bxp = bcp->Next;
delete bcp;
bcp = bxp;
break;
case OP_NOT: // Filter NOT operator
for (n = min((int)strlen(bcp->Cold), FLEN-3); n >= 0; n--)
bcp->Cold[n+2] = bcp->Cold[n];
bcp->Cold[0] = '^';
bcp->Cold[1] = '(';
strcat(bcp->Cold, ")");
break;
default:
for (n = min((int)strlen(bcp->Cold), FLEN-4); n >= 0; n--)
bcp->Cold[n+3] = bcp->Cold[n];
bcp->Cold[0] = ')';
switch (fp->Opc) {
case OP_AND: bcp->Cold[1] = '&'; break;
case OP_OR: bcp->Cold[1] = '|'; break;
default: bcp->Cold[1] = '?';
} // endswitch
bcp->Cold[2] = '(';
strcat(bcp->Cold, ")");
bxp = bcp->Next;
for (n = min((int)strlen(bxp->Cold), FLEN-1); n >= 0; n--)
bxp->Cold[n+1] = bxp->Cold[n];
bxp->Cold[0] = '(';
strncat(bxp->Cold, bcp->Cold, FLEN-strlen(bxp->Cold));
delete bcp;
bcp = bxp;
} // endswitch
} // endfor fp
n = 0;
if (!bcp)
strncat(ps, "Null-Filter", z);
else do {
if (z > 0) {
if (n++ > 0) {
strncat(ps, "*?*", z);
z = max(0, (int)z-3);
} // endif
strncat(ps, bcp->Cold, z);
z -= strlen(bcp->Cold);
} // endif
bxp = bcp->Next;
delete bcp;
bcp = bxp;
} while (bcp); // enddo
} // end of Print
/* -------------------- Derived Classes Functions -------------------- */
/***********************************************************************/
/* FILTERCMP constructor. */
/***********************************************************************/
FILTERCMP::FILTERCMP(PGLOBAL g)
{
Bt = OpBmp(g, Opc);
} // end of FILTERCMP constructor
/***********************************************************************/
/* Eval: Compute result value for comparison operators. */
/***********************************************************************/
bool FILTERCMP::Eval(PGLOBAL g)
{
if (Arg(0)->Eval(g) || Arg(1)->Eval(g))
return TRUE;
Value->SetValue_bool(!(Val(0)->TestValue(Val(1)) & Bt));
return FALSE;
} // end of Eval
/***********************************************************************/
/* Eval: Compute result value for AND filters. */
/***********************************************************************/
bool FILTERAND::Eval(PGLOBAL g)
{
if (Arg(0)->Eval(g))
return TRUE;
Value->SetValue(Val(0)->GetIntValue());
if (!Value->GetIntValue())
return FALSE; // No need to evaluate 2nd argument
if (Arg(1)->Eval(g))
return TRUE;
Value->SetValue(Val(1)->GetIntValue());
return FALSE;
} // end of Eval
/***********************************************************************/
/* Eval: Compute result value for OR filters. */
/***********************************************************************/
bool FILTEROR::Eval(PGLOBAL g)
{
if (Arg(0)->Eval(g))
return TRUE;
Value->SetValue(Val(0)->GetIntValue());
if (Value->GetIntValue())
return FALSE; // No need to evaluate 2nd argument
if (Arg(1)->Eval(g))
return TRUE;
Value->SetValue(Val(1)->GetIntValue());
return FALSE;
} // end of Eval
/***********************************************************************/
/* Eval: Compute result value for NOT filters. */
/***********************************************************************/
bool FILTERNOT::Eval(PGLOBAL g)
{
if (Arg(0)->Eval(g))
return TRUE;
Value->SetValue_bool(!Val(0)->GetIntValue());
return FALSE;
} // end of Eval
/***********************************************************************/
/* Eval: Compute result value for IN filters. */
/***********************************************************************/
bool FILTERIN::Eval(PGLOBAL g)
{
if (Arg(0)->Eval(g))
return TRUE;
Value->SetValue_bool(((PARRAY)Arg(1))->FilTest(g, Val(0), Opc, Opm));
return FALSE;
} // end of Eval
/***********************************************************************/
/* FILTERTRUE does nothing and returns TRUE. */
/***********************************************************************/
void FILTERTRUE::Reset(void)
{
} // end of Reset
bool FILTERTRUE::Eval(PGLOBAL)
{
return FALSE;
} // end of Eval
/* ------------------------- Friend Functions ------------------------ */
#if 0
/***********************************************************************/
/* Prepare: prepare a filter for execution. This implies two things: */
/* 1) de-linearize the filter to be able to evaluate it recursively. */
/* This permit to conditionally evaluate only the first argument */
/* of OP_OR and OP_AND filters without having to pass by an */
/* intermediate Apply function (as this has a performance cost). */
/* 2) do all the necessary conversion for all filter block arguments. */
/***********************************************************************/
PFIL PrepareFilter(PGLOBAL g, PFIL fp, bool having)
{
PFIL filp = NULL;
if (trace)
htrc("PrepareFilter: fp=%p having=%d\n", fp, having);
//if (fp)
// fp->Print(g, debug, 0);
while (fp) {
if (fp->Opc == OP_SEP)
// If separator is not last transform it into an AND filter
if (fp->Next) {
filp = PrepareFilter(g, fp->Next, having);
fp->Arg(1) = filp;
fp->Opc = OP_AND;
fp->Next = NULL; // This will end the loop
} else
break; // Remove eventual ending separator(s)
// if (fp->Convert(g, having))
// longjmp(g->jumper[g->jump_level], TYPE_FILTER);
filp = fp;
fp = fp->Next;
filp->Next = NULL;
} // endwhile
if (trace)
htrc(" returning filp=%p\n", filp);
//if (filp)
// filp->Print(g, debug, 0);
return filp;
} // end of PrepareFilter
#endif // 0
/***********************************************************************/
/* ApplyFilter: Apply filtering for a table (where or having clause). */
/* New algorithm: evaluate from the root a de-linearized filter so */
/* AND/OR clauses can be optimized throughout the whole tree. */
/***********************************************************************/
DllExport bool ApplyFilter(PGLOBAL g, PFIL filp, PTDB tdbp)
{
if (!filp)
return TRUE;
// Must be done for null tables
filp->Reset();
//if (tdbp && tdbp->IsNull())
// return TRUE;
if (filp->Eval(g))
longjmp(g->jumper[g->jump_level], TYPE_FILTER);
if (trace)
htrc("PlugFilter filp=%p result=%d\n",
filp, filp->GetResult());
return filp->GetResult();
} // end of ApplyFilter
/*************** Filter H Declares Source Code File (.H) ***************/
/* Name: FILTER.H Version 1.2 */
/* */
/* (C) Copyright to the author Olivier BERTRAND 2010-2012 */
/* */
/* This file contains the FILTER and derived classes declares. */
/***********************************************************************/
/***********************************************************************/
/* Include required application header files */
/***********************************************************************/
#include "xobject.h"
/***********************************************************************/
/* Utilities for WHERE condition building. */
/***********************************************************************/
PFIL MakeFilter(PGLOBAL g, PFIL filp, OPVAL vop, PFIL fp);
PFIL MakeFilter(PGLOBAL g, PCOL *colp, POPER pop, PPARM pfirst, bool neg);
/***********************************************************************/
/* Definition of class FILTER with all its method functions. */
/* Note: Most virtual implementation functions are not in use yet */
/* but could be in future system evolution. */
/***********************************************************************/
class DllExport FILTER : public XOBJECT { /* Filter description block */
//friend PFIL PrepareFilter(PGLOBAL, PFIL, bool);
friend DllExport bool ApplyFilter(PGLOBAL, PFIL, PTDB = NULL);
public:
// Constructors
FILTER(PGLOBAL g, POPER pop, PPARM *tp = NULL);
FILTER(PGLOBAL g, OPVAL opc, PPARM *tp = NULL);
FILTER(PFIL fil1);
// Implementation
virtual int GetType(void) {return TYPE_FILTER;}
virtual int GetResultType(void) {return TYPE_INT;}
virtual int GetLength(void) {return 1;}
virtual int GetLengthEx(void) {assert(FALSE); return 0;}
virtual int GetScale() {return 0;};
PFIL GetNext(void) {return Next;}
OPVAL GetOpc(void) {return Opc;}
int GetOpm(void) {return Opm;}
int GetArgType(int i) {return Arg(i)->GetType();}
bool GetResult(void) {return Value->GetIntValue() != 0;}
PXOB &Arg(int i) {return Test[i].Arg;}
PVAL &Val(int i) {return Test[i].Value;}
bool &Conv(int i) {return Test[i].Conv;}
void SetNext(PFIL filp) {Next = filp;}
// Methods
virtual void Reset(void);
virtual bool Compare(PXOB) {return FALSE;} // Not used yet
virtual bool Init(PGLOBAL);
virtual bool Eval(PGLOBAL);
virtual bool SetFormat(PGLOBAL, FORMAT&) {return TRUE;} // NUY
virtual int CheckColumn(PGLOBAL g, PSQL sqlp, PXOB &xp, int &ag);
virtual int RefNum(PSQL);
virtual PXOB SetSelect(PGLOBAL, PSQL, bool) {return NULL;} // NUY
//virtual PXOB CheckSubQuery(PGLOBAL, PSQL);
virtual bool CheckLocal(PTDB);
virtual int CheckSpcCol(PTDB tdbp, int n);
virtual void Print(PGLOBAL g, FILE *f, UINT n);
virtual void Print(PGLOBAL g, char *ps, UINT z);
PFIL Linearize(bool nosep);
PFIL Link(PGLOBAL g, PFIL fil2);
PFIL RemoveLastSep(void);
// PFIL SortJoin(PGLOBAL g);
// bool FindJoinFilter(POPJOIN opj, PFIL fprec, bool teq,
// bool tek, bool tk2, bool tc2, bool tix, bool thx);
// bool CheckHaving(PGLOBAL g, PSQL sqlp);
bool Convert(PGLOBAL g, bool having);
int SplitFilter(PFIL *fp);
int SplitFilter(PFIL *fp, PTDB tp, int n);
PFIL LinkFilter(PGLOBAL g, PFIL fp2);
// PFIL Copy(PTABS t);
protected:
FILTER(void) {} // Standard constructor not to be used
void Constr(PGLOBAL g, OPVAL opc, int opm, PPARM *tp);
// Members
PFIL Next; // Used for linearization
OPVAL Opc; // Comparison operator
int Opm; // Modificator
BYTE Bt; // Operator bitmap
struct {
int B_T; // Buffer type
PXOB Arg; // Points to argument
PVAL Value; // Points to argument value
bool Conv; // TRUE if argument must be converted
} Test[2];
}; // end of class FILTER
/***********************************************************************/
/* Derived class FILTERX: used to replace a filter by a derived class */
/* using an Eval method optimizing the filtering evaluation. */
/* Note: this works only if the members of the derived class are the */
/* same than the ones of the original class (NO added members). */
/***********************************************************************/
class FILTERX : public FILTER {
public:
// Methods
virtual bool Eval(PGLOBAL) = 0; // just to prevent direct FILTERX use
// Fake operator new used to change a filter into a derived filter
void * operator new(size_t size, PFIL filp) {return filp;}
#if !defined(__BORLANDC__)
// Avoid warning C4291 by defining a matching dummy delete operator
void operator delete(void *, PFIL) {}
#endif
}; // end of class FILTERX
/***********************************************************************/
/* Derived class FILTEREQ: OP_EQ, no conversion and Xobject args. */
/***********************************************************************/
class FILTERCMP : public FILTERX {
public:
// Constructor
FILTERCMP(PGLOBAL g);
// Methods
virtual bool Eval(PGLOBAL);
}; // end of class FILTEREQ
/***********************************************************************/
/* Derived class FILTERAND: OP_AND, no conversion and Xobject args. */
/***********************************************************************/
class FILTERAND : public FILTERX {
public:
// Methods
virtual bool Eval(PGLOBAL);
}; // end of class FILTERAND
/***********************************************************************/
/* Derived class FILTEROR: OP_OR, no conversion and Xobject args. */
/***********************************************************************/
class FILTEROR : public FILTERX {
public:
// Methods
virtual bool Eval(PGLOBAL);
}; // end of class FILTEROR
/***********************************************************************/
/* Derived class FILTERNOT: OP_NOT, no conversion and Xobject args. */
/***********************************************************************/
class FILTERNOT : public FILTERX {
public:
// Methods
virtual bool Eval(PGLOBAL);
}; // end of class FILTERNOT
/***********************************************************************/
/* Derived class FILTERIN: OP_IN, no conversion and Array 2nd arg. */
/***********************************************************************/
class FILTERIN : public FILTERX {
public:
// Methods
virtual bool Eval(PGLOBAL);
}; // end of class FILTERIN
/***********************************************************************/
/* Derived class FILTERTRUE: Always returns TRUE. */
/***********************************************************************/
class FILTERTRUE : public FILTERX {
public:
// Constructor
FILTERTRUE(PVAL valp) {Value = valp; Value->SetValue_bool(TRUE);}
// Methods
virtual void Reset(void);
virtual bool Eval(PGLOBAL);
}; // end of class FILTERTRUE
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment