Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
MariaDB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
MariaDB
Commits
653e99de
Commit
653e99de
authored
Aug 23, 2005
by
tomas@poseidon.ndb.mysql.com
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
removed ndb grep from configure .in
parent
28e3fe87
Changes
9
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
0 additions
and
3432 deletions
+0
-3432
configure.in
configure.in
+0
-1
ndb/src/kernel/blocks/dblqh/redoLogReader/Makefile
ndb/src/kernel/blocks/dblqh/redoLogReader/Makefile
+0
-9
ndb/src/kernel/blocks/grep/Grep.cpp
ndb/src/kernel/blocks/grep/Grep.cpp
+0
-2010
ndb/src/kernel/blocks/grep/Grep.hpp
ndb/src/kernel/blocks/grep/Grep.hpp
+0
-535
ndb/src/kernel/blocks/grep/GrepInit.cpp
ndb/src/kernel/blocks/grep/GrepInit.cpp
+0
-164
ndb/src/kernel/blocks/grep/Makefile.am
ndb/src/kernel/blocks/grep/Makefile.am
+0
-23
ndb/src/kernel/blocks/grep/systab_test/Makefile
ndb/src/kernel/blocks/grep/systab_test/Makefile
+0
-12
ndb/src/kernel/blocks/grep/systab_test/grep_systab_test.cpp
ndb/src/kernel/blocks/grep/systab_test/grep_systab_test.cpp
+0
-138
ndb/test/ndbapi/testGrep.cpp
ndb/test/ndbapi/testGrep.cpp
+0
-540
No files found.
configure.in
View file @
653e99de
...
...
@@ -3281,7 +3281,6 @@ AC_CONFIG_FILES(ndb/Makefile ndb/include/Makefile dnl
ndb/src/kernel/blocks/backup/Makefile dnl
ndb/src/kernel/blocks/dbutil/Makefile dnl
ndb/src/kernel/blocks/suma/Makefile dnl
ndb/src/kernel/blocks/grep/Makefile dnl
ndb/src/kernel/blocks/dbtux/Makefile dnl
ndb/src/kernel/vm/Makefile dnl
ndb/src/mgmapi/Makefile dnl
...
...
ndb/src/kernel/blocks/dblqh/redoLogReader/Makefile
deleted
100644 → 0
View file @
28e3fe87
include
.defs.mk
BIN_TARGET
:=
redoLogFileReader
SOURCES
:=
records.cpp redoLogFileReader.cpp
TYPE
:=
util
include
$(NDB_TOP)/Epilogue.mk
ndb/src/kernel/blocks/grep/Grep.cpp
deleted
100644 → 0
View file @
28e3fe87
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "Grep.hpp"
#include <ndb_version.h>
#include <NdbTCP.h>
#include <Bitmask.hpp>
#include <signaldata/NodeFailRep.hpp>
#include <signaldata/ReadNodesConf.hpp>
#include <signaldata/CheckNodeGroups.hpp>
#include <signaldata/GrepImpl.hpp>
#include <signaldata/RepImpl.hpp>
#include <signaldata/EventReport.hpp>
#include <signaldata/DictTabInfo.hpp>
#include <signaldata/GetTabInfo.hpp>
#include <signaldata/WaitGCP.hpp>
#include <GrepEvent.hpp>
#include <AttributeHeader.hpp>
#define CONTINUEB_DELAY 500
#define SSREPBLOCKNO 2
#define PSREPBLOCKNO 2
//#define DEBUG_GREP
//#define DEBUG_GREP_SUBSCRIPTION
//#define DEBUG_GREP_TRANSFER
//#define DEBUG_GREP_APPLY
//#define DEBUG_GREP_DELETE
/**************************************************************************
* ------------------------------------------------------------------------
* MODULE: STARTUP of GREP Block, etc
* ------------------------------------------------------------------------
**************************************************************************/
static
Uint32
g_TypeOfStart
=
NodeState
::
ST_ILLEGAL_TYPE
;
void
Grep
::
getNodeGroupMembers
(
Signal
*
signal
)
{
jam
();
/**
* Ask DIH for nodeGroupMembers
*/
CheckNodeGroups
*
sd
=
(
CheckNodeGroups
*
)
signal
->
getDataPtrSend
();
sd
->
blockRef
=
reference
();
sd
->
requestType
=
CheckNodeGroups
::
Direct
|
CheckNodeGroups
::
GetNodeGroupMembers
;
sd
->
nodeId
=
getOwnNodeId
();
EXECUTE_DIRECT
(
DBDIH
,
GSN_CHECKNODEGROUPSREQ
,
signal
,
CheckNodeGroups
::
SignalLength
);
jamEntry
();
c_nodeGroup
=
sd
->
output
;
c_noNodesInGroup
=
0
;
for
(
int
i
=
0
;
i
<
MAX_NDB_NODES
;
i
++
)
{
if
(
sd
->
mask
.
get
(
i
))
{
if
(
i
==
getOwnNodeId
())
c_idInNodeGroup
=
c_noNodesInGroup
;
c_nodesInGroup
[
c_noNodesInGroup
]
=
i
;
c_noNodesInGroup
++
;
}
}
ndbrequire
(
c_noNodesInGroup
>
0
);
// at least 1 node in the nodegroup
#ifdef NODEFAIL_DEBUG
for
(
Uint32
i
=
0
;
i
<
c_noNodesInGroup
;
i
++
)
{
ndbout_c
(
"Grep: NodeGroup %u, me %u, me in group %u, member[%u] %u"
,
c_nodeGroup
,
getOwnNodeId
(),
c_idInNodeGroup
,
i
,
c_nodesInGroup
[
i
]);
}
#endif
}
void
Grep
::
execSTTOR
(
Signal
*
signal
)
{
jamEntry
();
const
Uint32
startphase
=
signal
->
theData
[
1
];
const
Uint32
typeOfStart
=
signal
->
theData
[
7
];
if
(
startphase
==
3
)
{
jam
();
signal
->
theData
[
0
]
=
reference
();
g_TypeOfStart
=
typeOfStart
;
sendSignal
(
NDBCNTR_REF
,
GSN_READ_NODESREQ
,
signal
,
1
,
JBB
);
return
;
}
if
(
startphase
==
5
)
{
jam
();
/**
* we don't want any log/meta records comming to use
* until we are done with the recovery.
*/
if
(
g_TypeOfStart
==
NodeState
::
ST_NODE_RESTART
)
{
jam
();
pspart
.
m_recoveryMode
=
true
;
getNodeGroupMembers
(
signal
);
for
(
Uint32
i
=
0
;
i
<
c_noNodesInGroup
;
i
++
)
{
Uint32
ref
=
numberToRef
(
GREP
,
c_nodesInGroup
[
i
]);
if
(
ref
!=
reference
())
sendSignal
(
ref
,
GSN_GREP_START_ME
,
signal
,
1
/*SumaStartMe::SignalLength*/
,
JBB
);
}
}
else
pspart
.
m_recoveryMode
=
false
;
}
if
(
startphase
==
7
)
{
jam
();
if
(
g_TypeOfStart
==
NodeState
::
ST_NODE_RESTART
)
{
pspart
.
m_recoveryMode
=
false
;
}
}
sendSTTORRY
(
signal
);
}
void
Grep
::
PSPart
::
execSTART_ME
(
Signal
*
signal
)
{
jamEntry
();
GrepStartMe
*
me
=
(
GrepStartMe
*
)
signal
->
getDataPtr
();
BlockReference
ref
=
me
->
senderRef
;
GrepAddSubReq
*
const
addReq
=
(
GrepAddSubReq
*
)
signal
->
getDataPtr
();
SubscriptionPtr
subPtr
;
c_subscriptions
.
first
(
c_subPtr
);
for
(;
!
c_subPtr
.
isNull
();
c_subscriptions
.
next
(
c_subPtr
))
{
jam
();
subPtr
.
i
=
c_subPtr
.
curr
.
i
;
subPtr
.
p
=
c_subscriptions
.
getPtr
(
subPtr
.
i
);
addReq
->
subscriptionId
=
subPtr
.
p
->
m_subscriptionId
;
addReq
->
subscriptionKey
=
subPtr
.
p
->
m_subscriptionKey
;
addReq
->
subscriberData
=
subPtr
.
p
->
m_subscriberData
;
addReq
->
subscriptionType
=
subPtr
.
p
->
m_subscriptionType
;
addReq
->
senderRef
=
subPtr
.
p
->
m_coordinatorRef
;
addReq
->
subscriberRef
=
subPtr
.
p
->
m_subscriberRef
;
sendSignal
(
ref
,
GSN_GREP_ADD_SUB_REQ
,
signal
,
GrepAddSubReq
::
SignalLength
,
JBB
);
}
addReq
->
subscriptionId
=
0
;
addReq
->
subscriptionKey
=
0
;
addReq
->
subscriberData
=
0
;
addReq
->
subscriptionType
=
0
;
addReq
->
senderRef
=
0
;
addReq
->
subscriberRef
=
0
;
sendSignal
(
ref
,
GSN_GREP_ADD_SUB_REQ
,
signal
,
GrepAddSubReq
::
SignalLength
,
JBB
);
}
void
Grep
::
PSPart
::
execGREP_ADD_SUB_REQ
(
Signal
*
signal
)
{
jamEntry
();
GrepAddSubReq
*
const
grepReq
=
(
GrepAddSubReq
*
)
signal
->
getDataPtr
();
const
Uint32
subId
=
grepReq
->
subscriptionId
;
const
Uint32
subKey
=
grepReq
->
subscriptionKey
;
const
Uint32
subData
=
grepReq
->
subscriberData
;
const
Uint32
subType
=
grepReq
->
subscriptionType
;
const
Uint32
coordinatorRef
=
grepReq
->
senderRef
;
/**
* this is ref to the REP node for this subscription.
*/
const
Uint32
subRef
=
grepReq
->
subscriberRef
;
if
(
subId
!=
0
&&
subKey
!=
0
)
{
jam
();
SubscriptionPtr
subPtr
;
ndbrequire
(
c_subscriptionPool
.
seize
(
subPtr
));
subPtr
.
p
->
m_coordinatorRef
=
coordinatorRef
;
subPtr
.
p
->
m_subscriptionId
=
subId
;
subPtr
.
p
->
m_subscriptionKey
=
subKey
;
subPtr
.
p
->
m_subscriberRef
=
subRef
;
subPtr
.
p
->
m_subscriberData
=
subData
;
subPtr
.
p
->
m_subscriptionType
=
subType
;
c_subscriptions
.
add
(
subPtr
);
}
else
{
jam
();
GrepAddSubConf
*
conf
=
(
GrepAddSubConf
*
)
grepReq
;
conf
->
noOfSub
=
c_subscriptionPool
.
getSize
()
-
c_subscriptionPool
.
getNoOfFree
();
sendSignal
(
signal
->
getSendersBlockRef
(),
GSN_GREP_ADD_SUB_CONF
,
signal
,
GrepAddSubConf
::
SignalLength
,
JBB
);
}
}
void
Grep
::
PSPart
::
execGREP_ADD_SUB_REF
(
Signal
*
signal
)
{
/**
* @todo fix error stuff
*/
}
void
Grep
::
PSPart
::
execGREP_ADD_SUB_CONF
(
Signal
*
signal
)
{
jamEntry
();
GrepAddSubConf
*
const
conf
=
(
GrepAddSubConf
*
)
signal
->
getDataPtr
();
Uint32
noOfSubscriptions
=
conf
->
noOfSub
;
Uint32
noOfRestoredSubscriptions
=
c_subscriptionPool
.
getSize
()
-
c_subscriptionPool
.
getNoOfFree
();
if
(
noOfSubscriptions
!=
noOfRestoredSubscriptions
)
{
jam
();
/**
*@todo send ref signal
*/
ndbrequire
(
false
);
}
}
void
Grep
::
execREAD_NODESCONF
(
Signal
*
signal
)
{
jamEntry
();
ReadNodesConf
*
conf
=
(
ReadNodesConf
*
)
signal
->
getDataPtr
();
#if 0
ndbout_c("Grep: Recd READ_NODESCONF");
#endif
/******************************
* Check which REP nodes exist
******************************/
Uint32
i
;
for
(
i
=
1
;
i
<
MAX_NODES
;
i
++
)
{
jam
();
#if 0
ndbout_c("Grep: Found node %d of type %d", i, getNodeInfo(i).getType());
#endif
if
(
getNodeInfo
(
i
).
getType
()
==
NodeInfo
::
REP
)
{
jam
();
/**
* @todo This should work for more than ONE rep node!
*/
pscoord
.
m_repRef
=
numberToRef
(
PSREPBLOCKNO
,
i
);
pspart
.
m_repRef
=
numberToRef
(
PSREPBLOCKNO
,
i
);
#if 0
ndbout_c("Grep: REP node %d detected", i);
#endif
}
}
/*****************************
* Check which DB nodes exist
*****************************/
m_aliveNodes
.
clear
();
Uint32
count
=
0
;
for
(
i
=
0
;
i
<
MAX_NDB_NODES
;
i
++
)
{
if
(
NodeBitmask
::
get
(
conf
->
allNodes
,
i
))
{
jam
();
count
++
;
NodePtr
node
;
ndbrequire
(
m_nodes
.
seize
(
node
));
node
.
p
->
nodeId
=
i
;
if
(
NodeBitmask
::
get
(
conf
->
inactiveNodes
,
i
))
{
node
.
p
->
alive
=
0
;
}
else
{
node
.
p
->
alive
=
1
;
m_aliveNodes
.
set
(
i
);
}
}
}
m_masterNodeId
=
conf
->
masterNodeId
;
ndbrequire
(
count
==
conf
->
noOfNodes
);
sendSTTORRY
(
signal
);
}
void
Grep
::
sendSTTORRY
(
Signal
*
signal
)
{
signal
->
theData
[
0
]
=
0
;
signal
->
theData
[
3
]
=
1
;
signal
->
theData
[
4
]
=
3
;
signal
->
theData
[
5
]
=
5
;
signal
->
theData
[
6
]
=
7
;
signal
->
theData
[
7
]
=
255
;
// No more start phases from missra
sendSignal
(
NDBCNTR_REF
,
GSN_STTORRY
,
signal
,
8
,
JBB
);
}
void
Grep
::
execNDB_STTOR
(
Signal
*
signal
)
{
jamEntry
();
}
void
Grep
::
execDUMP_STATE_ORD
(
Signal
*
signal
)
{
jamEntry
();
//Uint32 tCase = signal->theData[0];
#if 0
if(sscoord.m_repRef == 0)
{
ndbout << "Grep: Recd DUMP signal but has no connection with REP node"
<< endl;
return;
}
#endif
/*
switch (tCase)
{
case 8100: sscoord.grepReq(signal, GrepReq::START_SUBSCR); break;
case 8102: sscoord.grepReq(signal, GrepReq::START_METALOG); break;
case 8104: sscoord.grepReq(signal, GrepReq::START_METASCAN); break;
case 8106: sscoord.grepReq(signal, GrepReq::START_DATALOG); break;
case 8108: sscoord.grepReq(signal, GrepReq::START_DATASCAN); break;
case 8110: sscoord.grepReq(signal, GrepReq::STOP_SUBSCR); break;
case 8500: sscoord.grepReq(signal, GrepReq::REMOVE_BUFFERS); break;
case 8300: sscoord.grepReq(signal, GrepReq::SLOWSTOP); break;
case 8400: sscoord.grepReq(signal, GrepReq::FASTSTOP); break;
case 8600: sscoord.grepReq(signal, GrepReq::CREATE_SUBSCR); break;
case 8700: sscoord.dropTable(signal,(Uint32)signal->theData[1]);break;
default: break;
}
*/
}
/**
* Signal received when REP node has failed
*/
void
Grep
::
execAPI_FAILREQ
(
Signal
*
signal
)
{
jamEntry
();
//Uint32 failedApiNode = signal->theData[0];
//BlockReference retRef = signal->theData[1];
/**
* @todo We should probably do something smart if the
* PS REP node fails???? /Lars
*/
#if 0
ndbout_c("Grep: API_FAILREQ received for API node %d.", failedApiNode);
#endif
/**
* @note This signal received is NOT allowed to send any CONF
* signal, since this would screw up TC/DICT to API
* "connections".
*/
}
/**************************************************************************
* ------------------------------------------------------------------------
* MODULE: GREP Control
* ------------------------------------------------------------------------
**************************************************************************/
void
Grep
::
execGREP_REQ
(
Signal
*
signal
)
{
jamEntry
();
//GrepReq * req = (GrepReq *)signal->getDataPtr();
/**
* @todo Fix so that request is redirected to REP Server
* Obsolete?
* Was: sscoord.grepReq(signal, req->request);
*/
ndbout_c
(
"Warning! REP commands can only be executed at REP SERVER prompt!"
);
}
/**************************************************************************
* ------------------------------------------------------------------------
* MODULE: NODE STATE HANDLING
* ------------------------------------------------------------------------
**************************************************************************/
void
Grep
::
execNODE_FAILREP
(
Signal
*
signal
)
{
jamEntry
();
NodeFailRep
*
rep
=
(
NodeFailRep
*
)
signal
->
getDataPtr
();
bool
changed
=
false
;
NodePtr
nodePtr
;
for
(
m_nodes
.
first
(
nodePtr
);
nodePtr
.
i
!=
RNIL
;
m_nodes
.
next
(
nodePtr
))
{
jam
();
if
(
NodeBitmask
::
get
(
rep
->
theNodes
,
nodePtr
.
p
->
nodeId
))
{
jam
();
if
(
nodePtr
.
p
->
alive
)
{
jam
();
ndbassert
(
m_aliveNodes
.
get
(
nodePtr
.
p
->
nodeId
));
changed
=
true
;
}
else
{
ndbassert
(
!
m_aliveNodes
.
get
(
nodePtr
.
p
->
nodeId
));
}
nodePtr
.
p
->
alive
=
0
;
m_aliveNodes
.
clear
(
nodePtr
.
p
->
nodeId
);
}
}
/**
* Problem: Fix a node failure running a protocol
*
* 1. Coordinator node of a protocol dies
* - Elect a new coordinator
* - send ref to user
*
* 2. Non-coordinator dies.
* - make coordinator aware of this
* so that coordinator does not wait for
* conf from faulty node
* - node recovery will restore the non-coordinator.
*
*/
}
void
Grep
::
execINCL_NODEREQ
(
Signal
*
signal
)
{
jamEntry
();
//const Uint32 senderRef = signal->theData[0];
const
Uint32
inclNode
=
signal
->
theData
[
1
];
NodePtr
node
;
for
(
m_nodes
.
first
(
node
);
node
.
i
!=
RNIL
;
m_nodes
.
next
(
node
))
{
jam
();
const
Uint32
nodeId
=
node
.
p
->
nodeId
;
if
(
inclNode
==
nodeId
)
{
jam
();
ndbrequire
(
node
.
p
->
alive
==
0
);
ndbassert
(
!
m_aliveNodes
.
get
(
nodeId
));
node
.
p
->
alive
=
1
;
m_aliveNodes
.
set
(
nodeId
);
break
;
}
}
/**
* @todo: if we include this DIH's got to be prepared, later if needed...
*/
#if 0
signal->theData[0] = reference();
sendSignal(senderRef, GSN_INCL_NODECONF, signal, 1, JBB);
#endif
}
/**
* Helper methods
*/
void
Grep
::
PSCoord
::
prepareOperationRec
(
SubCoordinatorPtr
subPtr
,
BlockReference
subscriber
,
Uint32
subId
,
Uint32
subKey
,
Uint32
request
)
{
subPtr
.
p
->
m_coordinatorRef
=
reference
();
subPtr
.
p
->
m_subscriberRef
=
subscriber
;
subPtr
.
p
->
m_subscriberData
=
subPtr
.
i
;
subPtr
.
p
->
m_subscriptionId
=
subId
;
subPtr
.
p
->
m_subscriptionKey
=
subKey
;
subPtr
.
p
->
m_outstandingRequest
=
request
;
}
/**************************************************************************
* ------------------------------------------------------------------------
* MODULE: CREATE SUBSCRIPTION ID
* ------------------------------------------------------------------------
*
* Requests SUMA to create a unique subscription id
**************************************************************************/
void
Grep
::
PSCoord
::
execGREP_CREATE_SUBID_REQ
(
Signal
*
signal
)
{
jamEntry
();
CreateSubscriptionIdReq
*
req
=
(
CreateSubscriptionIdReq
*
)
signal
->
getDataPtr
();
BlockReference
ref
=
signal
->
getSendersBlockRef
();
SubCoordinatorPtr
subPtr
;
if
(
!
c_subCoordinatorPool
.
seize
(
subPtr
))
{
jam
();
SubCoordinator
sub
;
sub
.
m_subscriberRef
=
ref
;
sub
.
m_subscriptionId
=
0
;
sub
.
m_subscriptionKey
=
0
;
sendRefToSS
(
signal
,
sub
,
GrepError
::
SUBSCRIPTION_ID_NOMEM
);
return
;
}
prepareOperationRec
(
subPtr
,
ref
,
0
,
0
,
GSN_CREATE_SUBID_REQ
);
ndbout_c
(
"SUBID_REQ Ref %d"
,
ref
);
req
->
senderData
=
subPtr
.
p
->
m_subscriberData
;
sendSignal
(
SUMA_REF
,
GSN_CREATE_SUBID_REQ
,
signal
,
SubCreateReq
::
SignalLength
,
JBB
);
#if 1 //def DEBUG_GREP_SUBSCRIPTION
ndbout_c
(
"Grep::PSCoord: Sent CREATE_SUBID_REQ to SUMA"
);
#endif
}
void
Grep
::
PSCoord
::
execCREATE_SUBID_CONF
(
Signal
*
signal
)
{
jamEntry
();
CreateSubscriptionIdConf
const
*
conf
=
(
CreateSubscriptionIdConf
*
)
signal
->
getDataPtr
();
Uint32
subId
=
conf
->
subscriptionId
;
Uint32
subKey
=
conf
->
subscriptionKey
;
Uint32
subData
=
conf
->
subscriberData
;
#if 1 //def DEBUG_GREP_SUBSCRIPTION
ndbout_c
(
"Grep::PSCoord: Recd GREP_SUBID_CONF (subId:%d, subKey:%d)"
,
subId
,
subKey
);
#endif
SubCoordinatorPtr
subPtr
;
c_subCoordinatorPool
.
getPtr
(
subPtr
,
subData
);
BlockReference
repRef
=
subPtr
.
p
->
m_subscriberRef
;
{
// Check that id/key is unique
SubCoordinator
key
;
SubCoordinatorPtr
tmp
;
key
.
m_subscriptionId
=
subId
;
key
.
m_subscriptionKey
=
subKey
;
if
(
c_runningSubscriptions
.
find
(
tmp
,
key
)){
jam
();
SubCoordinator
sub
;
sub
.
m_subscriberRef
=
repRef
;
sub
.
m_subscriptionId
=
subId
;
sub
.
m_subscriptionKey
=
subKey
;
sendRefToSS
(
signal
,
sub
,
GrepError
::
SUBSCRIPTION_ID_NOT_UNIQUE
);
return
;
}
}
sendSignal
(
subPtr
.
p
->
m_subscriberRef
,
GSN_GREP_CREATE_SUBID_CONF
,
signal
,
CreateSubscriptionIdConf
::
SignalLength
,
JBB
);
c_subCoordinatorPool
.
release
(
subData
);
m_grep
->
sendEventRep
(
signal
,
EventReport
::
GrepSubscriptionInfo
,
GrepEvent
::
GrepPS_CreateSubIdConf
,
subId
,
subKey
,
(
Uint32
)
GrepError
::
GE_NO_ERROR
);
}
void
Grep
::
PSCoord
::
execCREATE_SUBID_REF
(
Signal
*
signal
)
{
jamEntry
();
CreateSubscriptionIdRef
const
*
ref
=
(
CreateSubscriptionIdRef
*
)
signal
->
getDataPtr
();
Uint32
subData
=
ref
->
subscriberData
;
GrepError
::
GE_Code
err
;
Uint32
sendersBlockRef
=
signal
->
getSendersBlockRef
();
if
(
sendersBlockRef
==
SUMA_REF
)
{
jam
();
err
=
GrepError
::
SUBSCRIPTION_ID_SUMA_FAILED_CREATE
;
}
else
{
jam
();
ndbrequire
(
false
);
/* Added since errorcode err unhandled
* TODO: fix correct errorcode
*/
err
=
GrepError
::
GE_NO_ERROR
;
// remove compiler warning
}
SubCoordinatorPtr
subPtr
;
c_runningSubscriptions
.
getPtr
(
subPtr
,
subData
);
BlockReference
repref
=
subPtr
.
p
->
m_subscriberRef
;
SubCoordinator
sub
;
sub
.
m_subscriberRef
=
repref
;
sub
.
m_subscriptionId
=
0
;
sub
.
m_subscriptionKey
=
0
;
sendRefToSS
(
signal
,
sub
,
err
);
}
/**************************************************************************
* ------------------------------------------------------------------------
* MODULE: CREATE SUBSCRIPTION
* ------------------------------------------------------------------------
*
* Creates a subscription for every GREP to its local SUMA.
* GREP node that executes createSubscription becomes the GREP Coord.
**************************************************************************/
/**
* Request to create a subscription (sent from SS)
*/
void
Grep
::
PSCoord
::
execGREP_SUB_CREATE_REQ
(
Signal
*
signal
)
{
jamEntry
();
GrepSubCreateReq
const
*
grepReq
=
(
GrepSubCreateReq
*
)
signal
->
getDataPtr
();
Uint32
subId
=
grepReq
->
subscriptionId
;
Uint32
subKey
=
grepReq
->
subscriptionKey
;
Uint32
subType
=
grepReq
->
subscriptionType
;
BlockReference
rep
=
signal
->
getSendersBlockRef
();
GrepCreateReq
*
req
=
(
GrepCreateReq
*
)
grepReq
;
SubCoordinatorPtr
subPtr
;
if
(
!
c_subCoordinatorPool
.
seize
(
subPtr
))
{
jam
();
SubCoordinator
sub
;
sub
.
m_subscriberRef
=
rep
;
sub
.
m_subscriptionId
=
0
;
sub
.
m_subscriptionKey
=
0
;
sub
.
m_outstandingRequest
=
GSN_GREP_CREATE_REQ
;
sendRefToSS
(
signal
,
sub
,
GrepError
::
NOSPACE_IN_POOL
);
return
;
}
prepareOperationRec
(
subPtr
,
numberToRef
(
PSREPBLOCKNO
,
refToNode
(
rep
)),
subId
,
subKey
,
GSN_GREP_CREATE_REQ
);
/* Get the payload of the signal.
*/
SegmentedSectionPtr
selectedTablesPtr
;
if
(
subType
==
SubCreateReq
::
SelectiveTableSnapshot
)
{
jam
();
ndbrequire
(
signal
->
getNoOfSections
()
==
1
);
signal
->
getSection
(
selectedTablesPtr
,
0
);
signal
->
header
.
m_noOfSections
=
0
;
}
/**
* Prepare the signal to be sent to Grep participatns
*/
subPtr
.
p
->
m_subscriptionType
=
subType
;
req
->
senderRef
=
reference
();
req
->
subscriberRef
=
numberToRef
(
PSREPBLOCKNO
,
refToNode
(
rep
));
req
->
subscriberData
=
subPtr
.
p
->
m_subscriberData
;
req
->
subscriptionId
=
subId
;
req
->
subscriptionKey
=
subKey
;
req
->
subscriptionType
=
subType
;
/*add payload if it is a selectivetablesnap*/
if
(
subType
==
SubCreateReq
::
SelectiveTableSnapshot
)
{
jam
();
signal
->
setSection
(
selectedTablesPtr
,
0
);
}
/******************************
* Send to all PS participants
******************************/
NodeReceiverGroup
rg
(
GREP
,
m_grep
->
m_aliveNodes
);
subPtr
.
p
->
m_outstandingParticipants
=
rg
;
sendSignal
(
rg
,
GSN_GREP_CREATE_REQ
,
signal
,
GrepCreateReq
::
SignalLength
,
JBB
);
#ifdef DEBUG_GREP_SUBSCRIPTION
ndbout_c
(
"Grep::PSCoord: Sent GREP_CREATE_REQ "
"(subId:%d, subKey:%d, subData:%d, subType:%d) to parts"
,
subId
,
subKey
,
subPtr
.
p
->
m_subscriberData
,
subType
);
#endif
}
void
Grep
::
PSPart
::
execGREP_CREATE_REQ
(
Signal
*
signal
)
{
jamEntry
();
GrepCreateReq
*
const
grepReq
=
(
GrepCreateReq
*
)
signal
->
getDataPtr
();
const
Uint32
subId
=
grepReq
->
subscriptionId
;
const
Uint32
subKey
=
grepReq
->
subscriptionKey
;
const
Uint32
subData
=
grepReq
->
subscriberData
;
const
Uint32
subType
=
grepReq
->
subscriptionType
;
const
Uint32
coordinatorRef
=
grepReq
->
senderRef
;
const
Uint32
subRef
=
grepReq
->
subscriberRef
;
//this is ref to the
//REP node for this
//subscription.
SubscriptionPtr
subPtr
;
ndbrequire
(
c_subscriptionPool
.
seize
(
subPtr
));
subPtr
.
p
->
m_coordinatorRef
=
coordinatorRef
;
subPtr
.
p
->
m_subscriptionId
=
subId
;
subPtr
.
p
->
m_subscriptionKey
=
subKey
;
subPtr
.
p
->
m_subscriberRef
=
subRef
;
subPtr
.
p
->
m_subscriberData
=
subPtr
.
i
;
subPtr
.
p
->
m_subscriptionType
=
subType
;
subPtr
.
p
->
m_outstandingRequest
=
GSN_GREP_CREATE_REQ
;
subPtr
.
p
->
m_operationPtrI
=
subData
;
c_subscriptions
.
add
(
subPtr
);
SegmentedSectionPtr
selectedTablesPtr
;
if
(
subType
==
SubCreateReq
::
SelectiveTableSnapshot
)
{
jam
();
ndbrequire
(
signal
->
getNoOfSections
()
==
1
);
signal
->
getSection
(
selectedTablesPtr
,
0
);
// SubCreateReq::TABLE_LIST);
signal
->
header
.
m_noOfSections
=
0
;
}
/**
* Prepare signal to be sent to SUMA
*/
SubCreateReq
*
sumaReq
=
(
SubCreateReq
*
)
grepReq
;
sumaReq
->
subscriberRef
=
GREP_REF
;
sumaReq
->
subscriberData
=
subPtr
.
p
->
m_subscriberData
;
sumaReq
->
subscriptionId
=
subPtr
.
p
->
m_subscriptionId
;
sumaReq
->
subscriptionKey
=
subPtr
.
p
->
m_subscriptionKey
;
sumaReq
->
subscriptionType
=
subPtr
.
p
->
m_subscriptionType
;
/*add payload if it is a selectivetablesnap*/
if
(
subType
==
SubCreateReq
::
SelectiveTableSnapshot
)
{
jam
();
signal
->
setSection
(
selectedTablesPtr
,
0
);
}
sendSignal
(
SUMA_REF
,
GSN_SUB_CREATE_REQ
,
signal
,
SubCreateReq
::
SignalLength
,
JBB
);
}
void
Grep
::
PSPart
::
execSUB_CREATE_CONF
(
Signal
*
signal
)
{
jamEntry
();
SubCreateConf
*
const
conf
=
(
SubCreateConf
*
)
signal
->
getDataPtr
();
Uint32
subData
=
conf
->
subscriberData
;
SubscriptionPtr
subPtr
;
c_subscriptions
.
getPtr
(
subPtr
,
subData
);
/**
@todo check why this can fuck up -johan
ndbrequire(subPtr.p->m_subscriptionId == conf->subscriptionId);
ndbrequire(subPtr.p->m_subscriptionKey == conf->subscriptionKey);
*/
#ifdef DEBUG_GREP_SUBSCRIPTION
ndbout_c
(
"Grep::PSPart: Recd SUB_CREATE_CONF "
"(subId:%d, subKey:%d) from SUMA"
,
conf
->
subscriptionId
,
conf
->
subscriptionKey
);
#endif
/*********************
* Send conf to coord
*********************/
GrepCreateConf
*
grepConf
=
(
GrepCreateConf
*
)
conf
;
grepConf
->
senderNodeId
=
getOwnNodeId
();
grepConf
->
senderData
=
subPtr
.
p
->
m_operationPtrI
;
sendSignal
(
subPtr
.
p
->
m_coordinatorRef
,
GSN_GREP_CREATE_CONF
,
signal
,
GrepCreateConf
::
SignalLength
,
JBB
);
subPtr
.
p
->
m_outstandingRequest
=
0
;
}
/**
* Handle errors that either occured in:
* 1) PSPart
* or
* 2) propagated from local SUMA
*/
void
Grep
::
PSPart
::
execSUB_CREATE_REF
(
Signal
*
signal
)
{
jamEntry
();
SubCreateRef
*
const
ref
=
(
SubCreateRef
*
)
signal
->
getDataPtr
();
Uint32
subData
=
ref
->
subscriberData
;
GrepError
::
GE_Code
err
=
(
GrepError
::
GE_Code
)
ref
->
err
;
SubscriptionPtr
subPtr
;
c_subscriptions
.
getPtr
(
subPtr
,
subData
);
sendRefToPSCoord
(
signal
,
*
subPtr
.
p
,
err
/*error*/
);
subPtr
.
p
->
m_outstandingRequest
=
0
;
}
void
Grep
::
PSCoord
::
execGREP_CREATE_CONF
(
Signal
*
signal
)
{
jamEntry
();
GrepCreateConf
const
*
conf
=
(
GrepCreateConf
*
)
signal
->
getDataPtr
();
Uint32
subData
=
conf
->
senderData
;
Uint32
nodeId
=
conf
->
senderNodeId
;
SubCoordinatorPtr
subPtr
;
c_subCoordinatorPool
.
getPtr
(
subPtr
,
subData
);
ndbrequire
(
subPtr
.
p
->
m_outstandingRequest
==
GSN_GREP_CREATE_REQ
);
subPtr
.
p
->
m_outstandingParticipants
.
clearWaitingFor
(
nodeId
);
if
(
!
subPtr
.
p
->
m_outstandingParticipants
.
done
())
return
;
/********************************
* All participants have CONF:ed
********************************/
Uint32
subId
=
subPtr
.
p
->
m_subscriptionId
;
Uint32
subKey
=
subPtr
.
p
->
m_subscriptionKey
;
GrepSubCreateConf
*
grepConf
=
(
GrepSubCreateConf
*
)
signal
->
getDataPtr
();
grepConf
->
subscriptionId
=
subId
;
grepConf
->
subscriptionKey
=
subKey
;
sendSignal
(
subPtr
.
p
->
m_subscriberRef
,
GSN_GREP_SUB_CREATE_CONF
,
signal
,
GrepSubCreateConf
::
SignalLength
,
JBB
);
/**
* Send event report
*/
m_grep
->
sendEventRep
(
signal
,
EventReport
::
GrepSubscriptionInfo
,
GrepEvent
::
GrepPS_SubCreateConf
,
subId
,
subKey
,
(
Uint32
)
GrepError
::
GE_NO_ERROR
);
c_subCoordinatorPool
.
release
(
subPtr
);
}
/**
* Handle errors that either occured in:
* 1) PSCoord
* or
* 2) propagated from PSPart
*/
void
Grep
::
PSCoord
::
execGREP_CREATE_REF
(
Signal
*
signal
)
{
jamEntry
();
GrepCreateRef
*
const
ref
=
(
GrepCreateRef
*
)
signal
->
getDataPtr
();
Uint32
subData
=
ref
->
senderData
;
Uint32
err
=
ref
->
err
;
SubCoordinatorPtr
subPtr
;
c_runningSubscriptions
.
getPtr
(
subPtr
,
subData
);
sendRefToSS
(
signal
,
*
subPtr
.
p
,
(
GrepError
::
GE_Code
)
err
/*error*/
);
}
/**************************************************************************
* ------------------------------------------------------------------------
* MODULE: START SUBSCRIPTION
* ------------------------------------------------------------------------
*
* Starts a subscription at SUMA.
* Each participant starts its own subscription.
**************************************************************************/
/**
* Request to start subscription (Sent from SS)
*/
void
Grep
::
PSCoord
::
execGREP_SUB_START_REQ
(
Signal
*
signal
)
{
jamEntry
();
GrepSubStartReq
*
const
subReq
=
(
GrepSubStartReq
*
)
signal
->
getDataPtr
();
SubscriptionData
::
Part
part
=
(
SubscriptionData
::
Part
)
subReq
->
part
;
Uint32
subId
=
subReq
->
subscriptionId
;
Uint32
subKey
=
subReq
->
subscriptionKey
;
BlockReference
rep
=
signal
->
getSendersBlockRef
();
SubCoordinatorPtr
subPtr
;
if
(
!
c_subCoordinatorPool
.
seize
(
subPtr
))
{
jam
();
SubCoordinator
sub
;
sub
.
m_subscriberRef
=
rep
;
sub
.
m_subscriptionId
=
0
;
sub
.
m_subscriptionKey
=
0
;
sub
.
m_outstandingRequest
=
GSN_GREP_START_REQ
;
sendRefToSS
(
signal
,
sub
,
GrepError
::
NOSPACE_IN_POOL
);
return
;
}
prepareOperationRec
(
subPtr
,
numberToRef
(
PSREPBLOCKNO
,
refToNode
(
rep
)),
subId
,
subKey
,
GSN_GREP_START_REQ
);
GrepStartReq
*
const
req
=
(
GrepStartReq
*
)
subReq
;
req
->
part
=
(
Uint32
)
part
;
req
->
subscriptionId
=
subPtr
.
p
->
m_subscriptionId
;
req
->
subscriptionKey
=
subPtr
.
p
->
m_subscriptionKey
;
req
->
senderData
=
subPtr
.
p
->
m_subscriberData
;
/***************************
* Send to all participants
***************************/
NodeReceiverGroup
rg
(
GREP
,
m_grep
->
m_aliveNodes
);
subPtr
.
p
->
m_outstandingParticipants
=
rg
;
sendSignal
(
rg
,
GSN_GREP_START_REQ
,
signal
,
GrepStartReq
::
SignalLength
,
JBB
);
#ifdef DEBUG_GREP_SUBSCRIPTION
ndbout_c
(
"Grep::PSCoord: Sent GREP_START_REQ "
"(subId:%d, subKey:%d, senderData:%d, part:%d) to all participants"
,
req
->
subscriptionId
,
req
->
subscriptionKey
,
req
->
senderData
,
part
);
#endif
}
void
Grep
::
PSPart
::
execGREP_START_REQ
(
Signal
*
signal
)
{
jamEntry
();
GrepStartReq
*
const
grepReq
=
(
GrepStartReq
*
)
signal
->
getDataPtr
();
SubscriptionData
::
Part
part
=
(
SubscriptionData
::
Part
)
grepReq
->
part
;
Uint32
subId
=
grepReq
->
subscriptionId
;
Uint32
subKey
=
grepReq
->
subscriptionKey
;
Uint32
operationPtrI
=
grepReq
->
senderData
;
Subscription
key
;
key
.
m_subscriptionId
=
subId
;
key
.
m_subscriptionKey
=
subKey
;
SubscriptionPtr
subPtr
;
ndbrequire
(
c_subscriptions
.
find
(
subPtr
,
key
));;
subPtr
.
p
->
m_outstandingRequest
=
GSN_GREP_START_REQ
;
subPtr
.
p
->
m_operationPtrI
=
operationPtrI
;
/**
* send SUB_START_REQ to local SUMA
*/
SubStartReq
*
sumaReq
=
(
SubStartReq
*
)
grepReq
;
sumaReq
->
subscriptionId
=
subId
;
sumaReq
->
subscriptionKey
=
subKey
;
sumaReq
->
subscriberData
=
subPtr
.
i
;
sumaReq
->
part
=
(
Uint32
)
part
;
sendSignal
(
SUMA_REF
,
GSN_SUB_START_REQ
,
signal
,
SubStartReq
::
SignalLength
,
JBB
);
#ifdef DEBUG_GREP_SUBSCRIPTION
ndbout_c
(
"Grep::PSPart: Sent SUB_START_REQ (subId:%d, subKey:%d, part:%d)"
,
subId
,
subKey
,
(
Uint32
)
part
);
#endif
}
void
Grep
::
PSPart
::
execSUB_START_CONF
(
Signal
*
signal
)
{
jamEntry
();
SubStartConf
*
const
conf
=
(
SubStartConf
*
)
signal
->
getDataPtr
();
SubscriptionData
::
Part
part
=
(
SubscriptionData
::
Part
)
conf
->
part
;
Uint32
subId
=
conf
->
subscriptionId
;
Uint32
subKey
=
conf
->
subscriptionKey
;
Uint32
subData
=
conf
->
subscriberData
;
Uint32
firstGCI
=
conf
->
firstGCI
;
#ifdef DEBUG_GREP_SUBSCRIPTION
ndbout_c
(
"Grep::PSPart: Recd SUB_START_CONF "
"(subId:%d, subKey:%d, subData:%d)"
,
subId
,
subKey
,
subData
);
#endif
SubscriptionPtr
subPtr
;
c_subscriptions
.
getPtr
(
subPtr
,
subData
);
ndbrequire
(
subPtr
.
p
->
m_subscriptionId
==
subId
);
ndbrequire
(
subPtr
.
p
->
m_subscriptionKey
==
subKey
);
GrepStartConf
*
grepConf
=
(
GrepStartConf
*
)
conf
;
grepConf
->
senderData
=
subPtr
.
p
->
m_operationPtrI
;
grepConf
->
part
=
(
Uint32
)
part
;
grepConf
->
subscriptionKey
=
subKey
;
grepConf
->
subscriptionId
=
subId
;
grepConf
->
firstGCI
=
firstGCI
;
grepConf
->
senderNodeId
=
getOwnNodeId
();
sendSignal
(
subPtr
.
p
->
m_coordinatorRef
,
GSN_GREP_START_CONF
,
signal
,
GrepStartConf
::
SignalLength
,
JBB
);
subPtr
.
p
->
m_outstandingRequest
=
0
;
#ifdef DEBUG_GREP_SUBSCRIPTION
ndbout_c
(
"Grep::PSPart: Sent GREP_START_CONF "
"(subId:%d, subKey:%d, subData:%d, part:%d)"
,
subId
,
subKey
,
subData
,
part
);
#endif
}
/**
* Handle errors that either occured in:
* 1) PSPart
* or
* 2) propagated from local SUMA
*
* Propagates REF signal to PSCoord
*/
void
Grep
::
PSPart
::
execSUB_START_REF
(
Signal
*
signal
)
{
SubStartRef
*
const
ref
=
(
SubStartRef
*
)
signal
->
getDataPtr
();
Uint32
subData
=
ref
->
subscriberData
;
GrepError
::
GE_Code
err
=
(
GrepError
::
GE_Code
)
ref
->
err
;
SubscriptionData
::
Part
part
=
(
SubscriptionData
::
Part
)
ref
->
part
;
SubscriptionPtr
subPtr
;
c_subscriptions
.
getPtr
(
subPtr
,
subData
);
sendRefToPSCoord
(
signal
,
*
subPtr
.
p
,
err
/*error*/
,
part
);
subPtr
.
p
->
m_outstandingRequest
=
0
;
}
/**
* Logging has started... (says PS Participant)
*/
void
Grep
::
PSCoord
::
execGREP_START_CONF
(
Signal
*
signal
)
{
jamEntry
();
GrepStartConf
*
const
conf
=
(
GrepStartConf
*
)
signal
->
getDataPtr
();
Uint32
subData
=
conf
->
senderData
;
SubscriptionData
::
Part
part
=
(
SubscriptionData
::
Part
)
conf
->
part
;
Uint32
subId
=
conf
->
subscriptionId
;
Uint32
subKey
=
conf
->
subscriptionKey
;
Uint32
firstGCI
=
conf
->
firstGCI
;
SubCoordinatorPtr
subPtr
;
c_subCoordinatorPool
.
getPtr
(
subPtr
,
subData
);
ndbrequire
(
subPtr
.
p
->
m_outstandingRequest
==
GSN_GREP_START_REQ
);
subPtr
.
p
->
m_outstandingParticipants
.
clearWaitingFor
(
conf
->
senderNodeId
);
if
(
!
subPtr
.
p
->
m_outstandingParticipants
.
done
())
return
;
jam
();
/*************************
* All participants ready
*************************/
GrepSubStartConf
*
grepConf
=
(
GrepSubStartConf
*
)
conf
;
grepConf
->
part
=
part
;
grepConf
->
subscriptionId
=
subId
;
grepConf
->
subscriptionKey
=
subKey
;
grepConf
->
firstGCI
=
firstGCI
;
bool
ok
=
false
;
switch
(
part
)
{
case
SubscriptionData
:
:
MetaData
:
ok
=
true
;
sendSignal
(
subPtr
.
p
->
m_subscriberRef
,
GSN_GREP_SUB_START_CONF
,
signal
,
GrepSubStartConf
::
SignalLength
,
JBB
);
/**
* Send event report
*/
m_grep
->
sendEventRep
(
signal
,
EventReport
::
GrepSubscriptionInfo
,
GrepEvent
::
GrepPS_SubStartMetaConf
,
subId
,
subKey
,
(
Uint32
)
GrepError
::
GE_NO_ERROR
);
c_subCoordinatorPool
.
release
(
subPtr
);
break
;
case
SubscriptionData
:
:
TableData
:
ok
=
true
;
sendSignal
(
subPtr
.
p
->
m_subscriberRef
,
GSN_GREP_SUB_START_CONF
,
signal
,
GrepSubStartConf
::
SignalLength
,
JBB
);
/**
* Send event report
*/
m_grep
->
sendEventRep
(
signal
,
EventReport
::
GrepSubscriptionInfo
,
GrepEvent
::
GrepPS_SubStartDataConf
,
subId
,
subKey
,
(
Uint32
)
GrepError
::
GE_NO_ERROR
);
c_subCoordinatorPool
.
release
(
subPtr
);
break
;
}
ndbrequire
(
ok
);
#ifdef DEBUG_GREP_SUBSCRIPTION
ndbout_c
(
"Grep::PSCoord: Recd SUB_START_CONF (subId:%d, subKey:%d, part:%d) "
"from all slaves"
,
subId
,
subKey
,
(
Uint32
)
part
);
#endif
}
/**
* Handle errors that either occured in:
* 1) PSCoord
* or
* 2) propagated from PSPart
*/
void
Grep
::
PSCoord
::
execGREP_START_REF
(
Signal
*
signal
)
{
jamEntry
();
GrepStartRef
*
const
ref
=
(
GrepStartRef
*
)
signal
->
getDataPtr
();
Uint32
subData
=
ref
->
senderData
;
GrepError
::
GE_Code
err
=
(
GrepError
::
GE_Code
)
ref
->
err
;
SubscriptionData
::
Part
part
=
(
SubscriptionData
::
Part
)
ref
->
part
;
SubCoordinatorPtr
subPtr
;
c_runningSubscriptions
.
getPtr
(
subPtr
,
subData
);
sendRefToSS
(
signal
,
*
subPtr
.
p
,
err
/*error*/
,
part
);
}
/**************************************************************************
* ------------------------------------------------------------------------
* MODULE: REMOVE SUBSCRIPTION
* ------------------------------------------------------------------------
*
* Remove a subscription at SUMA.
* Each participant removes its own subscription.
* We start by deleting the subscription inside the requestor
* since, we don't know if nodes (REP nodes or DB nodes)
* have disconnected after we sent out this and
* if we dont delete the sub in the requestor now,
* we won't be able to create a new subscription
**************************************************************************/
/**
* Request to abort subscription (Sent from SS)
*/
void
Grep
::
PSCoord
::
execGREP_SUB_REMOVE_REQ
(
Signal
*
signal
)
{
jamEntry
();
GrepSubRemoveReq
*
const
subReq
=
(
GrepSubRemoveReq
*
)
signal
->
getDataPtr
();
Uint32
subId
=
subReq
->
subscriptionId
;
Uint32
subKey
=
subReq
->
subscriptionKey
;
BlockReference
rep
=
signal
->
getSendersBlockRef
();
SubCoordinatorPtr
subPtr
;
if
(
!
c_subCoordinatorPool
.
seize
(
subPtr
))
{
jam
();
SubCoordinator
sub
;
sub
.
m_subscriberRef
=
rep
;
sub
.
m_subscriptionId
=
0
;
sub
.
m_subscriptionKey
=
0
;
sub
.
m_outstandingRequest
=
GSN_GREP_REMOVE_REQ
;
sendRefToSS
(
signal
,
sub
,
GrepError
::
NOSPACE_IN_POOL
);
return
;
}
prepareOperationRec
(
subPtr
,
numberToRef
(
PSREPBLOCKNO
,
refToNode
(
rep
)),
subId
,
subKey
,
GSN_GREP_REMOVE_REQ
);
c_runningSubscriptions
.
add
(
subPtr
);
GrepRemoveReq
*
req
=
(
GrepRemoveReq
*
)
subReq
;
req
->
subscriptionId
=
subPtr
.
p
->
m_subscriptionId
;
req
->
subscriptionKey
=
subPtr
.
p
->
m_subscriptionKey
;
req
->
senderData
=
subPtr
.
p
->
m_subscriberData
;
req
->
senderRef
=
subPtr
.
p
->
m_coordinatorRef
;
/***************************
* Send to all participants
***************************/
NodeReceiverGroup
rg
(
GREP
,
m_grep
->
m_aliveNodes
);
subPtr
.
p
->
m_outstandingParticipants
=
rg
;
sendSignal
(
rg
,
GSN_GREP_REMOVE_REQ
,
signal
,
GrepRemoveReq
::
SignalLength
,
JBB
);
}
void
Grep
::
PSPart
::
execGREP_REMOVE_REQ
(
Signal
*
signal
)
{
jamEntry
();
GrepRemoveReq
*
const
grepReq
=
(
GrepRemoveReq
*
)
signal
->
getDataPtr
();
Uint32
subId
=
grepReq
->
subscriptionId
;
Uint32
subKey
=
grepReq
->
subscriptionKey
;
Uint32
subData
=
grepReq
->
senderData
;
Uint32
coordinator
=
grepReq
->
senderRef
;
Subscription
key
;
key
.
m_subscriptionId
=
subId
;
key
.
m_subscriptionKey
=
subKey
;
SubscriptionPtr
subPtr
;
if
(
!
c_subscriptions
.
find
(
subPtr
,
key
))
{
/**
* The subscription was not found, so it must be deleted.
* Send CONF back, since it does not exist (thus, it is removed)
*/
GrepRemoveConf
*
grepConf
=
(
GrepRemoveConf
*
)
grepReq
;
grepConf
->
subscriptionKey
=
subKey
;
grepConf
->
subscriptionId
=
subId
;
grepConf
->
senderData
=
subData
;
grepConf
->
senderNodeId
=
getOwnNodeId
();
sendSignal
(
coordinator
,
GSN_GREP_REMOVE_CONF
,
signal
,
GrepRemoveConf
::
SignalLength
,
JBB
);
return
;
}
subPtr
.
p
->
m_operationPtrI
=
subData
;
subPtr
.
p
->
m_coordinatorRef
=
coordinator
;
subPtr
.
p
->
m_outstandingRequest
=
GSN_GREP_REMOVE_REQ
;
/**
* send SUB_REMOVE_REQ to local SUMA
*/
SubRemoveReq
*
sumaReq
=
(
SubRemoveReq
*
)
grepReq
;
sumaReq
->
subscriptionId
=
subId
;
sumaReq
->
subscriptionKey
=
subKey
;
sumaReq
->
senderData
=
subPtr
.
i
;
sendSignal
(
SUMA_REF
,
GSN_SUB_REMOVE_REQ
,
signal
,
SubStartReq
::
SignalLength
,
JBB
);
}
/**
* SUB_REMOVE_CONF (from local SUMA)
*/
void
Grep
::
PSPart
::
execSUB_REMOVE_CONF
(
Signal
*
signal
)
{
jamEntry
();
SubRemoveConf
*
const
conf
=
(
SubRemoveConf
*
)
signal
->
getDataPtr
();
Uint32
subId
=
conf
->
subscriptionId
;
Uint32
subKey
=
conf
->
subscriptionKey
;
Uint32
subData
=
conf
->
subscriberData
;
SubscriptionPtr
subPtr
;
c_subscriptions
.
getPtr
(
subPtr
,
subData
);
ndbrequire
(
subPtr
.
p
->
m_subscriptionId
==
subId
);
ndbrequire
(
subPtr
.
p
->
m_subscriptionKey
==
subKey
);
subPtr
.
p
->
m_outstandingRequest
=
0
;
GrepRemoveConf
*
grepConf
=
(
GrepRemoveConf
*
)
conf
;
grepConf
->
subscriptionKey
=
subKey
;
grepConf
->
subscriptionId
=
subId
;
grepConf
->
senderData
=
subPtr
.
p
->
m_operationPtrI
;
grepConf
->
senderNodeId
=
getOwnNodeId
();
sendSignal
(
subPtr
.
p
->
m_coordinatorRef
,
GSN_GREP_REMOVE_CONF
,
signal
,
GrepRemoveConf
::
SignalLength
,
JBB
);
c_subscriptions
.
release
(
subPtr
);
}
/**
* SUB_REMOVE_CONF (from local SUMA)
*/
void
Grep
::
PSPart
::
execSUB_REMOVE_REF
(
Signal
*
signal
)
{
jamEntry
();
SubRemoveRef
*
const
ref
=
(
SubRemoveRef
*
)
signal
->
getDataPtr
();
Uint32
subData
=
ref
->
subscriberData
;
/* GrepError::GE_Code err = (GrepError::GE_Code)ref->err;*/
SubscriptionPtr
subPtr
;
c_subscriptions
.
getPtr
(
subPtr
,
subData
);
//sendSubRemoveRef_PSCoord(signal, *subPtr.p, err /*error*/);
}
/**
* Aborting has been carried out (says Participants)
*/
void
Grep
::
PSCoord
::
execGREP_REMOVE_CONF
(
Signal
*
signal
)
{
jamEntry
();
GrepRemoveConf
*
const
conf
=
(
GrepRemoveConf
*
)
signal
->
getDataPtr
();
Uint32
subId
=
conf
->
subscriptionId
;
Uint32
subKey
=
conf
->
subscriptionKey
;
Uint32
senderNodeId
=
conf
->
senderNodeId
;
Uint32
subData
=
conf
->
senderData
;
SubCoordinatorPtr
subPtr
;
c_subCoordinatorPool
.
getPtr
(
subPtr
,
subData
);
ndbrequire
(
subPtr
.
p
->
m_outstandingRequest
==
GSN_GREP_REMOVE_REQ
);
subPtr
.
p
->
m_outstandingParticipants
.
clearWaitingFor
(
senderNodeId
);
if
(
!
subPtr
.
p
->
m_outstandingParticipants
.
done
())
{
jam
();
return
;
}
jam
();
/*************************
* All participants ready
*************************/
m_grep
->
sendEventRep
(
signal
,
EventReport
::
GrepSubscriptionInfo
,
GrepEvent
::
GrepPS_SubRemoveConf
,
subId
,
subKey
,
GrepError
::
GE_NO_ERROR
);
GrepSubRemoveConf
*
grepConf
=
(
GrepSubRemoveConf
*
)
conf
;
grepConf
->
subscriptionId
=
subId
;
grepConf
->
subscriptionKey
=
subKey
;
sendSignal
(
subPtr
.
p
->
m_subscriberRef
,
GSN_GREP_SUB_REMOVE_CONF
,
signal
,
GrepSubRemoveConf
::
SignalLength
,
JBB
);
c_subCoordinatorPool
.
release
(
subPtr
);
}
void
Grep
::
PSCoord
::
execGREP_REMOVE_REF
(
Signal
*
signal
)
{
jamEntry
();
GrepRemoveRef
*
const
ref
=
(
GrepRemoveRef
*
)
signal
->
getDataPtr
();
Uint32
subData
=
ref
->
senderData
;
Uint32
err
=
ref
->
err
;
SubCoordinatorPtr
subPtr
;
/**
* Get the operationrecord matching subdata and remove it. Subsequent
* execGREP_REMOVE_REF will simply be ignored at this stage.
*/
for
(
c_runningSubscriptions
.
first
(
c_subPtr
);
!
c_subPtr
.
isNull
();
c_runningSubscriptions
.
next
(
c_subPtr
))
{
jam
();
subPtr
.
i
=
c_subPtr
.
curr
.
i
;
subPtr
.
p
=
c_runningSubscriptions
.
getPtr
(
subPtr
.
i
);
if
(
subData
==
subPtr
.
i
)
{
sendRefToSS
(
signal
,
*
subPtr
.
p
,
(
GrepError
::
GE_Code
)
err
/*error*/
);
c_runningSubscriptions
.
release
(
subPtr
);
return
;
}
}
return
;
}
/**************************************************************************
* ------------------------------------------------------------------------
* MODULE: LOG RECORDS (COMING IN FROM LOCAL SUMA)
* ------------------------------------------------------------------------
*
* After the subscription is started, we get log records from SUMA.
* Both table data and meta data log records are received.
*
* TODO:
* @todo Changes in meta data is currently not
* allowed during global replication
**************************************************************************/
void
Grep
::
PSPart
::
execSUB_META_DATA
(
Signal
*
signal
)
{
jamEntry
();
if
(
m_recoveryMode
)
{
jam
();
return
;
}
/**
* METASCAN and METALOG
*/
SubMetaData
*
data
=
(
SubMetaData
*
)
signal
->
getDataPtrSend
();
SubscriptionPtr
subPtr
;
c_subscriptions
.
getPtr
(
subPtr
,
data
->
subscriberData
);
/***************************
* Forward data to REP node
***************************/
sendSignal
(
subPtr
.
p
->
m_subscriberRef
,
GSN_SUB_META_DATA
,
signal
,
SubMetaData
::
SignalLength
,
JBB
);
#ifdef DEBUG_GREP_SUBSCRIPTION
ndbout_c
(
"Grep::PSPart: Sent SUB_META_DATA to REP "
"(TableId: %d, SenderData: %d, GCI: %d)"
,
data
->
tableId
,
data
->
senderData
,
data
->
gci
);
#endif
}
/**
* Receive table data from SUMA and dispatches it to REP node.
*/
void
Grep
::
PSPart
::
execSUB_TABLE_DATA
(
Signal
*
signal
)
{
jamEntry
();
if
(
m_recoveryMode
)
{
jam
();
return
;
}
ndbrequire
(
m_repRef
!=
0
);
if
(
!
assembleFragments
(
signal
))
{
jam
();
return
;
}
/**
* Check if it is SCAN or LOG data that has arrived
*/
if
(
signal
->
getNoOfSections
()
==
2
)
{
jam
();
/**
* DATASCAN - Not marked with GCI, so mark with latest seen GCI
*/
if
(
m_firstScanGCI
==
1
&&
m_lastScanGCI
==
0
)
{
m_firstScanGCI
=
m_latestSeenGCI
;
m_lastScanGCI
=
m_latestSeenGCI
;
}
SubTableData
*
data
=
(
SubTableData
*
)
signal
->
getDataPtrSend
();
Uint32
subData
=
data
->
senderData
;
data
->
gci
=
m_latestSeenGCI
;
data
->
logType
=
SubTableData
::
SCAN
;
SubscriptionPtr
subPtr
;
c_subscriptions
.
getPtr
(
subPtr
,
subData
);
sendSignal
(
subPtr
.
p
->
m_subscriberRef
,
GSN_SUB_TABLE_DATA
,
signal
,
SubTableData
::
SignalLength
,
JBB
);
#ifdef DEBUG_GREP
ndbout_c
(
"Grep::PSPart: Sent SUB_TABLE_DATA (Scan, GCI: %d)"
,
data
->
gci
);
#endif
}
else
{
jam
();
/**
* DATALOG (TRIGGER) - Already marked with GCI
*/
SubTableData
*
data
=
(
SubTableData
*
)
signal
->
getDataPtrSend
();
data
->
logType
=
SubTableData
::
LOG
;
Uint32
subData
=
data
->
senderData
;
if
(
data
->
gci
>
m_latestSeenGCI
)
m_latestSeenGCI
=
data
->
gci
;
// Reformat to sections and send to replication node.
LinearSectionPtr
ptr
[
3
];
ptr
[
0
].
p
=
signal
->
theData
+
25
;
ptr
[
0
].
sz
=
data
->
noOfAttributes
;
ptr
[
1
].
p
=
signal
->
theData
+
25
+
MAX_ATTRIBUTES_IN_TABLE
;
ptr
[
1
].
sz
=
data
->
dataSize
;
SubscriptionPtr
subPtr
;
c_subscriptions
.
getPtr
(
subPtr
,
subData
);
sendSignal
(
subPtr
.
p
->
m_subscriberRef
,
GSN_SUB_TABLE_DATA
,
signal
,
SubTableData
::
SignalLength
,
JBB
,
ptr
,
2
);
#ifdef DEBUG_GREP
ndbout_c
(
"Grep::PSPart: Sent SUB_TABLE_DATA (Log, GCI: %d)"
,
data
->
gci
);
#endif
}
}
/**************************************************************************
* ------------------------------------------------------------------------
* MODULE: START SYNCHRONIZATION
* ------------------------------------------------------------------------
*
*
**************************************************************************/
/**
* Request to start sync (from Rep SS)
*/
void
Grep
::
PSCoord
::
execGREP_SUB_SYNC_REQ
(
Signal
*
signal
)
{
jamEntry
();
GrepSubSyncReq
*
const
subReq
=
(
GrepSubSyncReq
*
)
signal
->
getDataPtr
();
SubscriptionData
::
Part
part
=
(
SubscriptionData
::
Part
)
subReq
->
part
;
Uint32
subId
=
subReq
->
subscriptionId
;
Uint32
subKey
=
subReq
->
subscriptionKey
;
BlockReference
rep
=
signal
->
getSendersBlockRef
();
SubCoordinatorPtr
subPtr
;
if
(
!
c_subCoordinatorPool
.
seize
(
subPtr
))
{
jam
();
SubCoordinator
sub
;
sub
.
m_subscriberRef
=
rep
;
sub
.
m_subscriptionId
=
0
;
sub
.
m_subscriptionKey
=
0
;
sub
.
m_outstandingRequest
=
GSN_GREP_SYNC_REQ
;
sendRefToSS
(
signal
,
sub
,
GrepError
::
NOSPACE_IN_POOL
);
return
;
}
prepareOperationRec
(
subPtr
,
numberToRef
(
PSREPBLOCKNO
,
refToNode
(
rep
)),
subId
,
subKey
,
GSN_GREP_SYNC_REQ
);
GrepSyncReq
*
req
=
(
GrepSyncReq
*
)
subReq
;
req
->
subscriptionId
=
subPtr
.
p
->
m_subscriptionId
;
req
->
subscriptionKey
=
subPtr
.
p
->
m_subscriptionKey
;
req
->
senderData
=
subPtr
.
p
->
m_subscriberData
;
req
->
part
=
(
Uint32
)
part
;
/***************************
* Send to all participants
***************************/
NodeReceiverGroup
rg
(
GREP
,
m_grep
->
m_aliveNodes
);
subPtr
.
p
->
m_outstandingParticipants
=
rg
;
sendSignal
(
rg
,
GSN_GREP_SYNC_REQ
,
signal
,
GrepSyncReq
::
SignalLength
,
JBB
);
}
/**
* Sync req from Grep::PSCoord to PS particpant
*/
void
Grep
::
PSPart
::
execGREP_SYNC_REQ
(
Signal
*
signal
)
{
jamEntry
();
GrepSyncReq
*
const
grepReq
=
(
GrepSyncReq
*
)
signal
->
getDataPtr
();
Uint32
part
=
grepReq
->
part
;
Uint32
subId
=
grepReq
->
subscriptionId
;
Uint32
subKey
=
grepReq
->
subscriptionKey
;
Uint32
subData
=
grepReq
->
senderData
;
Subscription
key
;
key
.
m_subscriptionId
=
subId
;
key
.
m_subscriptionKey
=
subKey
;
SubscriptionPtr
subPtr
;
ndbrequire
(
c_subscriptions
.
find
(
subPtr
,
key
));
subPtr
.
p
->
m_operationPtrI
=
subData
;
subPtr
.
p
->
m_outstandingRequest
=
GSN_GREP_SYNC_REQ
;
/**********************************
* Send SUB_SYNC_REQ to local SUMA
**********************************/
SubSyncReq
*
sumaReq
=
(
SubSyncReq
*
)
grepReq
;
sumaReq
->
subscriptionId
=
subId
;
sumaReq
->
subscriptionKey
=
subKey
;
sumaReq
->
subscriberData
=
subPtr
.
i
;
sumaReq
->
part
=
part
;
sendSignal
(
SUMA_REF
,
GSN_SUB_SYNC_REQ
,
signal
,
SubSyncReq
::
SignalLength
,
JBB
);
}
/**
* SYNC conf from SUMA
*/
void
Grep
::
PSPart
::
execSUB_SYNC_CONF
(
Signal
*
signal
)
{
jamEntry
();
SubSyncConf
*
const
conf
=
(
SubSyncConf
*
)
signal
->
getDataPtr
();
Uint32
part
=
conf
->
part
;
Uint32
subId
=
conf
->
subscriptionId
;
Uint32
subKey
=
conf
->
subscriptionKey
;
Uint32
subData
=
conf
->
subscriberData
;
SubscriptionPtr
subPtr
;
c_subscriptions
.
getPtr
(
subPtr
,
subData
);
ndbrequire
(
subPtr
.
p
->
m_subscriptionId
==
subId
);
ndbrequire
(
subPtr
.
p
->
m_subscriptionKey
==
subKey
);
GrepSyncConf
*
grepConf
=
(
GrepSyncConf
*
)
conf
;
grepConf
->
senderNodeId
=
getOwnNodeId
();
grepConf
->
part
=
part
;
grepConf
->
firstGCI
=
m_firstScanGCI
;
grepConf
->
lastGCI
=
m_lastScanGCI
;
grepConf
->
subscriptionId
=
subId
;
grepConf
->
subscriptionKey
=
subKey
;
grepConf
->
senderData
=
subPtr
.
p
->
m_operationPtrI
;
sendSignal
(
subPtr
.
p
->
m_coordinatorRef
,
GSN_GREP_SYNC_CONF
,
signal
,
GrepSyncConf
::
SignalLength
,
JBB
);
m_firstScanGCI
=
1
;
m_lastScanGCI
=
0
;
subPtr
.
p
->
m_outstandingRequest
=
0
;
}
/**
* Handle errors that either occured in:
* 1) PSPart
* or
* 2) propagated from local SUMA
*
* Propagates REF signal to PSCoord
*/
void
Grep
::
PSPart
::
execSUB_SYNC_REF
(
Signal
*
signal
)
{
jamEntry
();
SubSyncRef
*
const
ref
=
(
SubSyncRef
*
)
signal
->
getDataPtr
();
Uint32
subData
=
ref
->
subscriberData
;
GrepError
::
GE_Code
err
=
(
GrepError
::
GE_Code
)
ref
->
err
;
SubscriptionData
::
Part
part
=
(
SubscriptionData
::
Part
)
ref
->
part
;
SubscriptionPtr
subPtr
;
c_subscriptions
.
getPtr
(
subPtr
,
subData
);
sendRefToPSCoord
(
signal
,
*
subPtr
.
p
,
err
/*error*/
,
part
);
subPtr
.
p
->
m_outstandingRequest
=
0
;
}
/**
* Syncing has started... (says PS Participant)
*/
void
Grep
::
PSCoord
::
execGREP_SYNC_CONF
(
Signal
*
signal
)
{
jamEntry
();
GrepSyncConf
const
*
conf
=
(
GrepSyncConf
*
)
signal
->
getDataPtr
();
Uint32
part
=
conf
->
part
;
Uint32
firstGCI
=
conf
->
firstGCI
;
Uint32
lastGCI
=
conf
->
lastGCI
;
Uint32
subId
=
conf
->
subscriptionId
;
Uint32
subKey
=
conf
->
subscriptionKey
;
Uint32
subData
=
conf
->
senderData
;
SubCoordinatorPtr
subPtr
;
c_subCoordinatorPool
.
getPtr
(
subPtr
,
subData
);
ndbrequire
(
subPtr
.
p
->
m_outstandingRequest
==
GSN_GREP_SYNC_REQ
);
subPtr
.
p
->
m_outstandingParticipants
.
clearWaitingFor
(
conf
->
senderNodeId
);
if
(
!
subPtr
.
p
->
m_outstandingParticipants
.
done
())
return
;
/**
* Send event
*/
GrepEvent
::
Subscription
event
;
if
(
part
==
SubscriptionData
::
MetaData
)
event
=
GrepEvent
::
GrepPS_SubSyncMetaConf
;
else
event
=
GrepEvent
::
GrepPS_SubSyncDataConf
;
/* @todo Johan: Add firstGCI here. /Lars */
m_grep
->
sendEventRep
(
signal
,
EventReport
::
GrepSubscriptionInfo
,
event
,
subId
,
subKey
,
(
Uint32
)
GrepError
::
GE_NO_ERROR
,
lastGCI
);
/*************************
* All participants ready
*************************/
GrepSubSyncConf
*
grepConf
=
(
GrepSubSyncConf
*
)
conf
;
grepConf
->
part
=
part
;
grepConf
->
firstGCI
=
firstGCI
;
grepConf
->
lastGCI
=
lastGCI
;
grepConf
->
subscriptionId
=
subId
;
grepConf
->
subscriptionKey
=
subKey
;
sendSignal
(
subPtr
.
p
->
m_subscriberRef
,
GSN_GREP_SUB_SYNC_CONF
,
signal
,
GrepSubSyncConf
::
SignalLength
,
JBB
);
c_subCoordinatorPool
.
release
(
subPtr
);
}
/**
* Handle errors that either occured in:
* 1) PSCoord
* or
* 2) propagated from PSPart
*/
void
Grep
::
PSCoord
::
execGREP_SYNC_REF
(
Signal
*
signal
)
{
jamEntry
();
GrepSyncRef
*
const
ref
=
(
GrepSyncRef
*
)
signal
->
getDataPtr
();
Uint32
subData
=
ref
->
senderData
;
SubscriptionData
::
Part
part
=
(
SubscriptionData
::
Part
)
ref
->
part
;
GrepError
::
GE_Code
err
=
(
GrepError
::
GE_Code
)
ref
->
err
;
SubCoordinatorPtr
subPtr
;
c_runningSubscriptions
.
getPtr
(
subPtr
,
subData
);
sendRefToSS
(
signal
,
*
subPtr
.
p
,
err
/*error*/
,
part
);
}
void
Grep
::
PSCoord
::
sendRefToSS
(
Signal
*
signal
,
SubCoordinator
sub
,
GrepError
::
GE_Code
err
,
SubscriptionData
::
Part
part
)
{
/**
GrepCreateRef * ref = (GrepCreateRef*)signal->getDataPtrSend();
ref->senderData = sub.m_subscriberData;
ref->subscriptionId = sub.m_subscriptionId;
ref->subscriptionKey = sub.m_subscriptionKey;
ref->err = err;
sendSignal(sub.m_coordinatorRef, GSN_GREP_CREATE_REF, signal,
GrepCreateRef::SignalLength, JBB);
*/
jam
();
GrepEvent
::
Subscription
event
;
switch
(
sub
.
m_outstandingRequest
)
{
case
GSN_GREP_CREATE_SUBID_REQ
:
{
jam
();
CreateSubscriptionIdRef
*
ref
=
(
CreateSubscriptionIdRef
*
)
signal
->
getDataPtrSend
();
ref
->
err
=
(
Uint32
)
err
;
ref
->
subscriptionId
=
sub
.
m_subscriptionId
;
ref
->
subscriptionKey
=
sub
.
m_subscriptionKey
;
sendSignal
(
sub
.
m_subscriberRef
,
GSN_GREP_CREATE_SUBID_REF
,
signal
,
CreateSubscriptionIdRef
::
SignalLength
,
JBB
);
event
=
GrepEvent
::
GrepPS_CreateSubIdRef
;
}
break
;
case
GSN_GREP_CREATE_REQ
:
{
jam
();
GrepSubCreateRef
*
ref
=
(
GrepSubCreateRef
*
)
signal
->
getDataPtrSend
();
ref
->
err
=
(
Uint32
)
err
;
ref
->
subscriptionId
=
sub
.
m_subscriptionId
;
ref
->
subscriptionKey
=
sub
.
m_subscriptionKey
;
sendSignal
(
sub
.
m_subscriberRef
,
GSN_GREP_SUB_CREATE_REF
,
signal
,
GrepSubCreateRef
::
SignalLength
,
JBB
);
event
=
GrepEvent
::
GrepPS_SubCreateRef
;
}
break
;
case
GSN_GREP_SYNC_REQ
:
{
jam
();
GrepSubSyncRef
*
ref
=
(
GrepSubSyncRef
*
)
signal
->
getDataPtrSend
();
ref
->
err
=
(
Uint32
)
err
;
ref
->
subscriptionId
=
sub
.
m_subscriptionId
;
ref
->
subscriptionKey
=
sub
.
m_subscriptionKey
;
ref
->
part
=
(
SubscriptionData
::
Part
)
part
;
sendSignal
(
sub
.
m_subscriberRef
,
GSN_GREP_SUB_SYNC_REF
,
signal
,
GrepSubSyncRef
::
SignalLength
,
JBB
);
if
(
part
==
SubscriptionData
::
MetaData
)
event
=
GrepEvent
::
GrepPS_SubSyncMetaRef
;
else
event
=
GrepEvent
::
GrepPS_SubSyncDataRef
;
}
break
;
case
GSN_GREP_START_REQ
:
{
jam
();
GrepSubStartRef
*
ref
=
(
GrepSubStartRef
*
)
signal
->
getDataPtrSend
();
ref
->
err
=
(
Uint32
)
err
;
ref
->
subscriptionId
=
sub
.
m_subscriptionId
;
ref
->
subscriptionKey
=
sub
.
m_subscriptionKey
;
sendSignal
(
sub
.
m_subscriberRef
,
GSN_GREP_SUB_START_REF
,
signal
,
GrepSubStartRef
::
SignalLength
,
JBB
);
if
(
part
==
SubscriptionData
::
MetaData
)
event
=
GrepEvent
::
GrepPS_SubStartMetaRef
;
else
event
=
GrepEvent
::
GrepPS_SubStartDataRef
;
/**
* Send event report
*/
m_grep
->
sendEventRep
(
signal
,
EventReport
::
GrepSubscriptionAlert
,
event
,
sub
.
m_subscriptionId
,
sub
.
m_subscriptionKey
,
(
Uint32
)
err
);
}
break
;
case
GSN_GREP_REMOVE_REQ
:
{
jam
();
GrepSubRemoveRef
*
ref
=
(
GrepSubRemoveRef
*
)
signal
->
getDataPtrSend
();
ref
->
subscriptionId
=
sub
.
m_subscriptionId
;
ref
->
subscriptionKey
=
sub
.
m_subscriptionKey
;
ref
->
err
=
(
Uint32
)
err
;
sendSignal
(
sub
.
m_subscriberRef
,
GSN_GREP_SUB_REMOVE_REF
,
signal
,
GrepSubRemoveRef
::
SignalLength
,
JBB
);
event
=
GrepEvent
::
GrepPS_SubRemoveRef
;
}
break
;
default:
ndbrequire
(
false
);
event
=
GrepEvent
::
Rep_Disconnect
;
// remove compiler warning
}
/**
* Finally, send an event.
*/
m_grep
->
sendEventRep
(
signal
,
EventReport
::
GrepSubscriptionAlert
,
event
,
sub
.
m_subscriptionId
,
sub
.
m_subscriptionKey
,
err
);
}
void
Grep
::
PSPart
::
sendRefToPSCoord
(
Signal
*
signal
,
Subscription
sub
,
GrepError
::
GE_Code
err
,
SubscriptionData
::
Part
part
)
{
jam
();
GrepEvent
::
Subscription
event
;
switch
(
sub
.
m_outstandingRequest
)
{
case
GSN_GREP_CREATE_REQ
:
{
GrepCreateRef
*
ref
=
(
GrepCreateRef
*
)
signal
->
getDataPtrSend
();
ref
->
senderData
=
sub
.
m_subscriberData
;
ref
->
subscriptionId
=
sub
.
m_subscriptionId
;
ref
->
subscriptionKey
=
sub
.
m_subscriptionKey
;
ref
->
err
=
err
;
sendSignal
(
sub
.
m_coordinatorRef
,
GSN_GREP_CREATE_REF
,
signal
,
GrepCreateRef
::
SignalLength
,
JBB
);
event
=
GrepEvent
::
GrepPS_SubCreateRef
;
}
break
;
case
GSN_GREP_SYNC_REQ
:
{
GrepSyncRef
*
ref
=
(
GrepSyncRef
*
)
signal
->
getDataPtrSend
();
ref
->
senderData
=
sub
.
m_subscriberData
;
ref
->
subscriptionId
=
sub
.
m_subscriptionId
;
ref
->
subscriptionKey
=
sub
.
m_subscriptionKey
;
ref
->
part
=
part
;
ref
->
err
=
err
;
sendSignal
(
sub
.
m_coordinatorRef
,
GSN_GREP_SYNC_REF
,
signal
,
GrepSyncRef
::
SignalLength
,
JBB
);
if
(
part
==
SubscriptionData
::
MetaData
)
event
=
GrepEvent
::
GrepPS_SubSyncMetaRef
;
else
event
=
GrepEvent
::
GrepPS_SubSyncDataRef
;
}
break
;
case
GSN_GREP_START_REQ
:
{
jam
();
GrepStartRef
*
ref
=
(
GrepStartRef
*
)
signal
->
getDataPtrSend
();
ref
->
senderData
=
sub
.
m_subscriberData
;
ref
->
subscriptionId
=
sub
.
m_subscriptionId
;
ref
->
subscriptionKey
=
sub
.
m_subscriptionKey
;
ref
->
part
=
(
Uint32
)
part
;
ref
->
err
=
err
;
sendSignal
(
sub
.
m_coordinatorRef
,
GSN_GREP_START_REF
,
signal
,
GrepStartRef
::
SignalLength
,
JBB
);
if
(
part
==
SubscriptionData
::
MetaData
)
event
=
GrepEvent
::
GrepPS_SubStartMetaRef
;
else
event
=
GrepEvent
::
GrepPS_SubStartDataRef
;
}
break
;
case
GSN_GREP_REMOVE_REQ
:
{
jamEntry
();
GrepRemoveRef
*
ref
=
(
GrepRemoveRef
*
)
signal
->
getDataPtrSend
();
ref
->
senderData
=
sub
.
m_operationPtrI
;
ref
->
subscriptionId
=
sub
.
m_subscriptionId
;
ref
->
subscriptionKey
=
sub
.
m_subscriptionKey
;
ref
->
err
=
err
;
sendSignal
(
sub
.
m_coordinatorRef
,
GSN_GREP_REMOVE_REF
,
signal
,
GrepCreateRef
::
SignalLength
,
JBB
);
}
break
;
default:
ndbrequire
(
false
);
event
=
GrepEvent
::
Rep_Disconnect
;
// remove compiler warning
}
/**
* Finally, send an event.
*/
m_grep
->
sendEventRep
(
signal
,
EventReport
::
GrepSubscriptionAlert
,
event
,
sub
.
m_subscriptionId
,
sub
.
m_subscriptionKey
,
err
);
}
/**************************************************************************
* ------------------------------------------------------------------------
* MODULE: GREP PS Coordinator GCP
* ------------------------------------------------------------------------
*
*
**************************************************************************/
void
Grep
::
PSPart
::
execSUB_GCP_COMPLETE_REP
(
Signal
*
signal
)
{
jamEntry
();
if
(
m_recoveryMode
)
{
jam
();
return
;
}
SubGcpCompleteRep
*
rep
=
(
SubGcpCompleteRep
*
)
signal
->
getDataPtrSend
();
rep
->
senderRef
=
reference
();
if
(
rep
->
gci
>
m_latestSeenGCI
)
m_latestSeenGCI
=
rep
->
gci
;
SubscriptionPtr
subPtr
;
c_subscriptions
.
first
(
c_subPtr
);
for
(;
!
c_subPtr
.
isNull
();
c_subscriptions
.
next
(
c_subPtr
))
{
subPtr
.
i
=
c_subPtr
.
curr
.
i
;
subPtr
.
p
=
c_subscriptions
.
getPtr
(
subPtr
.
i
);
sendSignal
(
subPtr
.
p
->
m_subscriberRef
,
GSN_SUB_GCP_COMPLETE_REP
,
signal
,
SubGcpCompleteRep
::
SignalLength
,
JBB
);
}
#ifdef DEBUG_GREP
ndbout_c
(
"Grep::PSPart: Recd SUB_GCP_COMPLETE_REP "
"(GCI: %d, nodeId: %d) from SUMA"
,
rep
->
gci
,
refToNode
(
rep
->
senderRef
));
#endif
}
void
Grep
::
PSPart
::
execSUB_SYNC_CONTINUE_REQ
(
Signal
*
signal
)
{
jamEntry
();
SubSyncContinueReq
*
const
req
=
(
SubSyncContinueReq
*
)
signal
->
getDataPtr
();
Uint32
subData
=
req
->
subscriberData
;
SubscriptionPtr
subPtr
;
c_subscriptions
.
getPtr
(
subPtr
,
subData
);
/**
* @todo Figure out how to control how much data we can receive?
*/
SubSyncContinueConf
*
conf
=
(
SubSyncContinueConf
*
)
req
;
conf
->
subscriptionId
=
subPtr
.
p
->
m_subscriptionId
;
conf
->
subscriptionKey
=
subPtr
.
p
->
m_subscriptionKey
;
sendSignal
(
SUMA_REF
,
GSN_SUB_SYNC_CONTINUE_CONF
,
signal
,
SubSyncContinueConf
::
SignalLength
,
JBB
);
}
void
Grep
::
sendEventRep
(
Signal
*
signal
,
EventReport
::
EventType
type
,
GrepEvent
::
Subscription
event
,
Uint32
subId
,
Uint32
subKey
,
Uint32
err
,
Uint32
other
)
{
jam
();
signal
->
theData
[
0
]
=
type
;
signal
->
theData
[
1
]
=
event
;
signal
->
theData
[
2
]
=
subId
;
signal
->
theData
[
3
]
=
subKey
;
signal
->
theData
[
4
]
=
err
;
if
(
other
==
0
)
sendSignal
(
CMVMI_REF
,
GSN_EVENT_REP
,
signal
,
5
,
JBB
);
else
{
signal
->
theData
[
5
]
=
other
;
sendSignal
(
CMVMI_REF
,
GSN_EVENT_REP
,
signal
,
6
,
JBB
);
}
}
ndb/src/kernel/blocks/grep/Grep.hpp
deleted
100644 → 0
View file @
28e3fe87
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef GREP_HPP
#define GREP_HPP
#include <ndb_limits.h>
#include <SimulatedBlock.hpp>
#include <NodeBitmask.hpp>
#include <SignalCounter.hpp>
#include <SLList.hpp>
#include <DLList.hpp>
#include <GrepError.hpp>
#include <GrepEvent.hpp>
#include <signaldata/EventReport.hpp>
#include <signaldata/SumaImpl.hpp>
/**
* Module in block (Should be placed elsewhere)
*/
class
BlockComponent
{
public:
BlockComponent
(
SimulatedBlock
*
);
BlockReference
reference
()
{
return
m_sb
->
reference
();
};
BlockNumber
number
()
{
return
m_sb
->
number
();
};
void
sendSignal
(
NodeReceiverGroup
rg
,
GlobalSignalNumber
gsn
,
Signal
*
signal
,
Uint32
length
,
JobBufferLevel
jbuf
)
const
{
m_sb
->
sendSignal
(
rg
,
gsn
,
signal
,
length
,
jbuf
);
}
void
sendSignal
(
BlockReference
ref
,
GlobalSignalNumber
gsn
,
Signal
*
signal
,
Uint32
length
,
JobBufferLevel
jbuf
)
const
{
m_sb
->
sendSignal
(
ref
,
gsn
,
signal
,
length
,
jbuf
);
}
void
sendSignal
(
BlockReference
ref
,
GlobalSignalNumber
gsn
,
Signal
*
signal
,
Uint32
length
,
JobBufferLevel
jbuf
,
LinearSectionPtr
ptr
[
3
],
Uint32
noOfSections
)
const
{
m_sb
->
sendSignal
(
ref
,
gsn
,
signal
,
length
,
jbuf
,
ptr
,
noOfSections
);
}
void
sendSignalWithDelay
(
BlockReference
ref
,
GlobalSignalNumber
gsn
,
Signal
*
signal
,
Uint32
delayInMilliSeconds
,
Uint32
length
)
const
{
m_sb
->
sendSignalWithDelay
(
ref
,
gsn
,
signal
,
delayInMilliSeconds
,
length
);
}
NodeId
getOwnNodeId
()
const
{
return
m_sb
->
getOwnNodeId
();
}
bool
assembleFragments
(
Signal
*
signal
)
{
return
m_sb
->
assembleFragments
(
signal
);
}
void
progError
(
int
line
,
int
err_code
,
const
char
*
extra
)
{
m_sb
->
progError
(
line
,
err_code
,
extra
);
}
private:
SimulatedBlock
*
m_sb
;
};
/**
* Participant of GREP Protocols (not necessarily a protocol coordinator)
*
* This object is only used on primary system
*/
#if 0
class GrepParticipant : public SimulatedBlock
{
protected:
GrepParticipant(const Configuration & conf);
virtual ~GrepParticipant();
BLOCK_DEFINES(GrepParticipant);
protected:
/***************************************************************************
* SUMA Signal Interface
***************************************************************************/
void execSUB_CREATE_CONF(Signal*);
void execSUB_STARTCONF(Signal*);
void execSUB_REMOVE_CONF(Signal*);
void execSUB_META_DATA(Signal*);
void execSUB_TABLE_DATA(Signal*);
void execSUB_SYNC_CONF(Signal*);
void execSUB_GCP_COMPLETE_REP(Signal*);
void execSUB_SYNC_CONTINUE_REQ(Signal*);
/***************************************************************************
* GREP Coordinator Signal Interface
***************************************************************************/
void execGREP_CREATE_REQ(Signal*);
void execGREP_START_REQ(Signal*);
void execGREP_SYNC_REQ(Signal*);
void execGREP_REMOVE_REQ(Signal*);
protected:
BlockReference m_repRef; ///< Replication node (only one rep node per grep)
private:
BlockReference m_coordinator;
Uint32 m_latestSeenGCI;
};
#endif
/**
* GREP Coordinator
*/
class
Grep
:
public
SimulatedBlock
//GrepParticipant
{
BLOCK_DEFINES
(
Grep
);
public:
Grep
(
const
Configuration
&
conf
);
virtual
~
Grep
();
private:
/***************************************************************************
* General Signal Recivers
***************************************************************************/
void
execSTTOR
(
Signal
*
);
void
sendSTTORRY
(
Signal
*
);
void
execNDB_STTOR
(
Signal
*
);
void
execDUMP_STATE_ORD
(
Signal
*
);
void
execREAD_NODESCONF
(
Signal
*
);
void
execNODE_FAILREP
(
Signal
*
);
void
execINCL_NODEREQ
(
Signal
*
);
void
execGREP_REQ
(
Signal
*
);
void
execAPI_FAILREQ
(
Signal
*
);
/**
* Forwarded to PSCoord
*/
//CONF
void
fwdGREP_CREATE_CONF
(
Signal
*
s
)
{
pscoord
.
execGREP_CREATE_CONF
(
s
);
};
void
fwdGREP_START_CONF
(
Signal
*
s
)
{
pscoord
.
execGREP_START_CONF
(
s
);
};
void
fwdGREP_SYNC_CONF
(
Signal
*
s
)
{
pscoord
.
execGREP_SYNC_CONF
(
s
);
};
void
fwdGREP_REMOVE_CONF
(
Signal
*
s
)
{
pscoord
.
execGREP_REMOVE_CONF
(
s
);
};
void
fwdCREATE_SUBID_CONF
(
Signal
*
s
)
{
pscoord
.
execCREATE_SUBID_CONF
(
s
);
};
//REF
void
fwdGREP_CREATE_REF
(
Signal
*
s
)
{
pscoord
.
execGREP_CREATE_REF
(
s
);
};
void
fwdGREP_START_REF
(
Signal
*
s
)
{
pscoord
.
execGREP_START_REF
(
s
);
};
void
fwdGREP_SYNC_REF
(
Signal
*
s
)
{
pscoord
.
execGREP_SYNC_REF
(
s
);
};
void
fwdGREP_REMOVE_REF
(
Signal
*
s
)
{
pscoord
.
execGREP_REMOVE_REF
(
s
);
};
void
fwdCREATE_SUBID_REF
(
Signal
*
s
)
{
pscoord
.
execCREATE_SUBID_REF
(
s
);
};
//REQ
void
fwdGREP_SUB_CREATE_REQ
(
Signal
*
s
)
{
pscoord
.
execGREP_SUB_CREATE_REQ
(
s
);
};
void
fwdGREP_SUB_START_REQ
(
Signal
*
s
)
{
pscoord
.
execGREP_SUB_START_REQ
(
s
);
};
void
fwdGREP_SUB_SYNC_REQ
(
Signal
*
s
)
{
pscoord
.
execGREP_SUB_SYNC_REQ
(
s
);
};
void
fwdGREP_SUB_REMOVE_REQ
(
Signal
*
s
)
{
pscoord
.
execGREP_SUB_REMOVE_REQ
(
s
);
};
void
fwdGREP_CREATE_SUBID_REQ
(
Signal
*
s
)
{
pscoord
.
execGREP_CREATE_SUBID_REQ
(
s
);
};
/**
* Forwarded to PSPart
*/
void
fwdSTART_ME
(
Signal
*
s
){
pspart
.
execSTART_ME
(
s
);
};
void
fwdGREP_ADD_SUB_REQ
(
Signal
*
s
){
pspart
.
execGREP_ADD_SUB_REQ
(
s
);
};
void
fwdGREP_ADD_SUB_REF
(
Signal
*
s
){
pspart
.
execGREP_ADD_SUB_REF
(
s
);
};
void
fwdGREP_ADD_SUB_CONF
(
Signal
*
s
){
pspart
.
execGREP_ADD_SUB_CONF
(
s
);
};
//CONF
void
fwdSUB_CREATE_CONF
(
Signal
*
s
)
{
pspart
.
execSUB_CREATE_CONF
(
s
);
};
void
fwdSUB_START_CONF
(
Signal
*
s
)
{
pspart
.
execSUB_START_CONF
(
s
);
};
void
fwdSUB_REMOVE_CONF
(
Signal
*
s
)
{
pspart
.
execSUB_REMOVE_CONF
(
s
);
};
void
fwdSUB_SYNC_CONF
(
Signal
*
s
)
{
pspart
.
execSUB_SYNC_CONF
(
s
);
};
//REF
void
fwdSUB_CREATE_REF
(
Signal
*
s
)
{
pspart
.
execSUB_CREATE_REF
(
s
);
};
void
fwdSUB_START_REF
(
Signal
*
s
)
{
pspart
.
execSUB_START_REF
(
s
);
};
void
fwdSUB_REMOVE_REF
(
Signal
*
s
)
{
pspart
.
execSUB_REMOVE_REF
(
s
);
};
void
fwdSUB_SYNC_REF
(
Signal
*
s
)
{
pspart
.
execSUB_SYNC_REF
(
s
);
};
//REQ
void
fwdSUB_SYNC_CONTINUE_REQ
(
Signal
*
s
)
{
pspart
.
execSUB_SYNC_CONTINUE_REQ
(
s
);
};
void
fwdGREP_CREATE_REQ
(
Signal
*
s
)
{
pspart
.
execGREP_CREATE_REQ
(
s
);
};
void
fwdGREP_START_REQ
(
Signal
*
s
)
{
pspart
.
execGREP_START_REQ
(
s
);
};
void
fwdGREP_SYNC_REQ
(
Signal
*
s
)
{
pspart
.
execGREP_SYNC_REQ
(
s
);
};
void
fwdGREP_REMOVE_REQ
(
Signal
*
s
)
{
pspart
.
execGREP_REMOVE_REQ
(
s
);
};
void
fwdSUB_META_DATA
(
Signal
*
s
)
{
pspart
.
execSUB_META_DATA
(
s
);
};
void
fwdSUB_TABLE_DATA
(
Signal
*
s
)
{
pspart
.
execSUB_TABLE_DATA
(
s
);
};
void
fwdSUB_GCP_COMPLETE_REP
(
Signal
*
s
)
{
pspart
.
execSUB_GCP_COMPLETE_REP
(
s
);
};
void
sendEventRep
(
Signal
*
signal
,
EventReport
::
EventType
type
,
GrepEvent
::
Subscription
event
,
Uint32
subId
,
Uint32
subKey
,
Uint32
err
,
Uint32
gci
=
0
);
void
getNodeGroupMembers
(
Signal
*
signal
);
/***************************************************************************
* Block Data
***************************************************************************/
struct
Node
{
Uint32
nodeId
;
Uint32
alive
;
Uint32
nextList
;
union
{
Uint32
prevList
;
Uint32
nextPool
;
};
};
typedef
Ptr
<
Node
>
NodePtr
;
NodeId
m_masterNodeId
;
SLList
<
Node
>
m_nodes
;
NdbNodeBitmask
m_aliveNodes
;
ArrayPool
<
Node
>
m_nodePool
;
/**
* for all Suma's to keep track of other Suma's in Node group
*/
Uint32
c_nodeGroup
;
Uint32
c_noNodesInGroup
;
Uint32
c_idInNodeGroup
;
NodeId
c_nodesInGroup
[
4
];
public:
/***************************************************************************
* GREP PS Coordinator
***************************************************************************/
class
PSCoord
:
public
BlockComponent
{
private:
struct
SubCoordinator
{
Uint32
m_subscriberRef
;
Uint32
m_subscriberData
;
Uint32
m_coordinatorRef
;
Uint32
m_subscriptionId
;
Uint32
m_subscriptionKey
;
Uint32
m_subscriptionType
;
NdbNodeBitmask
m_participants
;
Uint32
m_outstandingRequest
;
SignalCounter
m_outstandingParticipants
;
Uint32
nextHash
;
union
{
Uint32
prevHash
;
Uint32
nextPool
;
};
Uint32
hashValue
()
const
{
return
m_subscriptionId
+
m_subscriptionKey
;
}
bool
equal
(
const
SubCoordinator
&
s
)
const
{
return
m_subscriptionId
==
s
.
m_subscriptionId
&&
m_subscriptionKey
==
s
.
m_subscriptionKey
;
}
};
typedef
Ptr
<
SubCoordinator
>
SubCoordinatorPtr
;
ArrayPool
<
SubCoordinator
>
c_subCoordinatorPool
;
DLHashTable
<
SubCoordinator
>::
Iterator
c_subPtr
;
DLHashTable
<
SubCoordinator
>
c_runningSubscriptions
;
void
prepareOperationRec
(
SubCoordinatorPtr
ptr
,
BlockReference
subscriber
,
Uint32
subId
,
Uint32
subKey
,
Uint32
request
);
public:
PSCoord
(
class
Grep
*
);
void
execGREP_CREATE_CONF
(
Signal
*
);
void
execGREP_START_CONF
(
Signal
*
);
void
execGREP_SYNC_CONF
(
Signal
*
);
void
execGREP_REMOVE_CONF
(
Signal
*
);
void
execGREP_CREATE_REF
(
Signal
*
);
void
execGREP_START_REF
(
Signal
*
);
void
execGREP_SYNC_REF
(
Signal
*
);
void
execGREP_REMOVE_REF
(
Signal
*
);
void
execCREATE_SUBID_CONF
(
Signal
*
);
//comes from SUMA
void
execGREP_CREATE_SUBID_REQ
(
Signal
*
);
void
execGREP_SUB_CREATE_REQ
(
Signal
*
);
void
execGREP_SUB_START_REQ
(
Signal
*
);
void
execGREP_SUB_SYNC_REQ
(
Signal
*
);
void
execGREP_SUB_REMOVE_REQ
(
Signal
*
);
void
execCREATE_SUBID_REF
(
Signal
*
);
void
sendCreateSubIdRef_SS
(
Signal
*
signal
,
Uint32
subId
,
Uint32
subKey
,
BlockReference
to
,
GrepError
::
GE_Code
err
);
void
sendSubRemoveRef_SS
(
Signal
*
signal
,
SubCoordinator
sub
,
GrepError
::
GE_Code
err
);
void
sendRefToSS
(
Signal
*
signal
,
SubCoordinator
sub
,
GrepError
::
GE_Code
err
,
SubscriptionData
::
Part
part
=
(
SubscriptionData
::
Part
)
0
);
void
setRepRef
(
BlockReference
rr
)
{
m_repRef
=
rr
;
};
//void setAliveNodes(NdbNodeBitmask an) { m_aliveNodes = an; };
BlockReference
m_repRef
;
///< Rep node (only one rep node per grep)
// NdbNodeBitmask m_aliveNodes;
Uint32
m_outstandingRequest
;
SignalCounter
m_outstandingParticipants
;
Grep
*
m_grep
;
}
pscoord
;
friend
class
PSCoord
;
/***************************************************************************
* GREP PS Participant
***************************************************************************
* Participant of GREP Protocols (not necessarily a protocol coordinator)
*
* This object is only used on primary system
***************************************************************************/
class
PSPart
:
public
BlockComponent
{
//protected:
//GrepParticipant(const Configuration & conf);
//virtual ~GrepParticipant();
//BLOCK_DEFINES(GrepParticipant);
struct
Subscription
{
Uint32
m_subscriberRef
;
Uint32
m_subscriberData
;
Uint32
m_subscriptionId
;
Uint32
m_subscriptionKey
;
Uint32
m_subscriptionType
;
Uint32
m_coordinatorRef
;
Uint32
m_outstandingRequest
;
Uint32
m_operationPtrI
;
Uint32
nextHash
;
union
{
Uint32
prevHash
;
Uint32
nextPool
;
};
Uint32
hashValue
()
const
{
return
m_subscriptionId
+
m_subscriptionKey
;
}
bool
equal
(
const
Subscription
&
s
)
const
{
return
m_subscriptionId
==
s
.
m_subscriptionId
&&
m_subscriptionKey
==
s
.
m_subscriptionKey
;
}
};
typedef
Ptr
<
Subscription
>
SubscriptionPtr
;
DLHashTable
<
Subscription
>
c_subscriptions
;
DLHashTable
<
Subscription
>::
Iterator
c_subPtr
;
ArrayPool
<
Subscription
>
c_subscriptionPool
;
public:
PSPart
(
class
Grep
*
);
//protected:
/*************************************************************************
* SUMA Signal Interface
*************************************************************************/
void
execSUB_CREATE_CONF
(
Signal
*
);
void
execSUB_START_CONF
(
Signal
*
);
void
execSUB_SYNC_CONF
(
Signal
*
);
void
execSUB_REMOVE_CONF
(
Signal
*
);
void
execSUB_CREATE_REF
(
Signal
*
);
void
execSUB_START_REF
(
Signal
*
);
void
execSUB_SYNC_REF
(
Signal
*
);
void
execSUB_REMOVE_REF
(
Signal
*
);
void
execSUB_META_DATA
(
Signal
*
);
void
execSUB_TABLE_DATA
(
Signal
*
);
void
execSUB_GCP_COMPLETE_REP
(
Signal
*
);
void
execSUB_SYNC_CONTINUE_REQ
(
Signal
*
);
/*************************************************************************
* GREP Coordinator Signal Interface
*************************************************************************/
void
execGREP_CREATE_REQ
(
Signal
*
);
void
execGREP_START_REQ
(
Signal
*
);
void
execGREP_SYNC_REQ
(
Signal
*
);
void
execGREP_REMOVE_REQ
(
Signal
*
);
/**
* NR/NF signals
*/
void
execSTART_ME
(
Signal
*
);
void
execGREP_ADD_SUB_REQ
(
Signal
*
);
void
execGREP_ADD_SUB_REF
(
Signal
*
);
void
execGREP_ADD_SUB_CONF
(
Signal
*
);
/*************************************************************************
* GREP Coordinator error handling interface
*************************************************************************/
void
sendRefToPSCoord
(
Signal
*
signal
,
Subscription
sub
,
GrepError
::
GE_Code
err
,
SubscriptionData
::
Part
part
=
(
SubscriptionData
::
Part
)
0
);
//protected:
BlockReference
m_repRef
;
///< Replication node
///< (only one rep node per grep)
bool
m_recoveryMode
;
private:
BlockReference
m_coordinator
;
Uint32
m_firstScanGCI
;
Uint32
m_lastScanGCI
;
Uint32
m_latestSeenGCI
;
Grep
*
m_grep
;
}
pspart
;
friend
class
PSPart
;
/***************************************************************************
* AddRecSignal Stuff (should maybe be gerneralized)
***************************************************************************/
typedef
void
(
Grep
::*
ExecSignalLocal1
)
(
Signal
*
signal
);
typedef
void
(
Grep
::
PSCoord
::*
ExecSignalLocal2
)
(
Signal
*
signal
);
typedef
void
(
Grep
::
PSPart
::*
ExecSignalLocal4
)
(
Signal
*
signal
);
};
/*************************************************************************
* Requestor
*
* The following methods are callbacks (registered functions)
* for the Requestor. The Requestor calls these when it needs
* something to be done.
*************************************************************************/
void
startSubscription
(
void
*
cbObj
,
Signal
*
,
int
type
);
void
scanSubscription
(
void
*
cbObj
,
Signal
*
,
int
type
);
#endif
ndb/src/kernel/blocks/grep/GrepInit.cpp
deleted
100644 → 0
View file @
28e3fe87
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "Grep.hpp"
#include <Properties.hpp>
#include <Configuration.hpp>
/*****************************************************************************
* Grep Participant
*****************************************************************************/
#if 0
GrepParticipant::GrepParticipant(const Configuration & conf) :
SimulatedBlock(GREP, conf)
{
BLOCK_CONSTRUCTOR(Grep);
//m_repRef = 0;
m_latestSeenGCI = 0;
}
GrepParticipant::~GrepParticipant()
{
}
BLOCK_FUNCTIONS(GrepParticipant);
#endif
/*****************************************************************************
* Grep Coordinator
*****************************************************************************/
Grep
::
Grep
(
const
Configuration
&
conf
)
:
// GrepParticipant(conf),
SimulatedBlock
(
GREP
,
conf
),
m_nodes
(
m_nodePool
),
pscoord
(
this
),
pspart
(
this
)
{
m_nodePool
.
setSize
(
MAX_NDB_NODES
);
m_masterNodeId
=
getOwnNodeId
();
/***************************************************************************
* General Signals
***************************************************************************/
addRecSignal
(
GSN_STTOR
,
&
Grep
::
execSTTOR
);
addRecSignal
(
GSN_NDB_STTOR
,
&
Grep
::
execNDB_STTOR
);
addRecSignal
(
GSN_DUMP_STATE_ORD
,
&
Grep
::
execDUMP_STATE_ORD
);
addRecSignal
(
GSN_READ_NODESCONF
,
&
Grep
::
execREAD_NODESCONF
);
addRecSignal
(
GSN_NODE_FAILREP
,
&
Grep
::
execNODE_FAILREP
);
addRecSignal
(
GSN_INCL_NODEREQ
,
&
Grep
::
execINCL_NODEREQ
);
addRecSignal
(
GSN_GREP_REQ
,
&
Grep
::
execGREP_REQ
);
addRecSignal
(
GSN_API_FAILREQ
,
&
Grep
::
execAPI_FAILREQ
);
/***************************************************************************
* Grep::PSCoord Signal Interface
***************************************************************************/
/**
* From Grep::PSPart
*/
addRecSignal
(
GSN_GREP_CREATE_CONF
,
&
Grep
::
fwdGREP_CREATE_CONF
);
addRecSignal
(
GSN_GREP_START_CONF
,
&
Grep
::
fwdGREP_START_CONF
);
addRecSignal
(
GSN_GREP_SYNC_CONF
,
&
Grep
::
fwdGREP_SYNC_CONF
);
addRecSignal
(
GSN_GREP_REMOVE_CONF
,
&
Grep
::
fwdGREP_REMOVE_CONF
);
addRecSignal
(
GSN_GREP_CREATE_REF
,
&
Grep
::
fwdGREP_CREATE_REF
);
addRecSignal
(
GSN_GREP_START_REF
,
&
Grep
::
fwdGREP_START_REF
);
addRecSignal
(
GSN_GREP_REMOVE_REF
,
&
Grep
::
fwdGREP_REMOVE_REF
);
/**
* From Grep::SSCoord to Grep::PSCoord
*/
addRecSignal
(
GSN_GREP_SUB_START_REQ
,
&
Grep
::
fwdGREP_SUB_START_REQ
);
addRecSignal
(
GSN_GREP_SUB_CREATE_REQ
,
&
Grep
::
fwdGREP_SUB_CREATE_REQ
);
addRecSignal
(
GSN_GREP_SUB_SYNC_REQ
,
&
Grep
::
fwdGREP_SUB_SYNC_REQ
);
addRecSignal
(
GSN_GREP_SUB_REMOVE_REQ
,
&
Grep
::
fwdGREP_SUB_REMOVE_REQ
);
addRecSignal
(
GSN_GREP_CREATE_SUBID_REQ
,
&
Grep
::
fwdGREP_CREATE_SUBID_REQ
);
/****************************************************************************
* PSPart
***************************************************************************/
/**
* From SUMA to GREP PS Participant. If suma is not a coodinator
*/
addRecSignal
(
GSN_SUB_START_CONF
,
&
Grep
::
fwdSUB_START_CONF
);
addRecSignal
(
GSN_SUB_CREATE_CONF
,
&
Grep
::
fwdSUB_CREATE_CONF
);
addRecSignal
(
GSN_SUB_SYNC_CONF
,
&
Grep
::
fwdSUB_SYNC_CONF
);
addRecSignal
(
GSN_SUB_REMOVE_CONF
,
&
Grep
::
fwdSUB_REMOVE_CONF
);
addRecSignal
(
GSN_SUB_CREATE_REF
,
&
Grep
::
fwdSUB_CREATE_REF
);
addRecSignal
(
GSN_SUB_START_REF
,
&
Grep
::
fwdSUB_START_REF
);
addRecSignal
(
GSN_SUB_SYNC_REF
,
&
Grep
::
fwdSUB_SYNC_REF
);
addRecSignal
(
GSN_SUB_REMOVE_REF
,
&
Grep
::
fwdSUB_REMOVE_REF
);
addRecSignal
(
GSN_SUB_SYNC_CONTINUE_REQ
,
&
Grep
::
fwdSUB_SYNC_CONTINUE_REQ
);
/**
* From Suma to Grep::PSPart. Data signals.
*/
addRecSignal
(
GSN_SUB_META_DATA
,
&
Grep
::
fwdSUB_META_DATA
);
addRecSignal
(
GSN_SUB_TABLE_DATA
,
&
Grep
::
fwdSUB_TABLE_DATA
);
addRecSignal
(
GSN_SUB_GCP_COMPLETE_REP
,
&
Grep
::
fwdSUB_GCP_COMPLETE_REP
);
/**
* From Grep::PSCoord to Grep::PSPart
*/
addRecSignal
(
GSN_GREP_CREATE_REQ
,
&
Grep
::
fwdGREP_CREATE_REQ
);
addRecSignal
(
GSN_GREP_START_REQ
,
&
Grep
::
fwdGREP_START_REQ
);
addRecSignal
(
GSN_GREP_REMOVE_REQ
,
&
Grep
::
fwdGREP_REMOVE_REQ
);
addRecSignal
(
GSN_GREP_SYNC_REQ
,
&
Grep
::
fwdGREP_SYNC_REQ
);
addRecSignal
(
GSN_CREATE_SUBID_CONF
,
&
Grep
::
fwdCREATE_SUBID_CONF
);
addRecSignal
(
GSN_GREP_START_ME
,
&
Grep
::
fwdSTART_ME
);
addRecSignal
(
GSN_GREP_ADD_SUB_REQ
,
&
Grep
::
fwdGREP_ADD_SUB_REQ
);
addRecSignal
(
GSN_GREP_ADD_SUB_REF
,
&
Grep
::
fwdGREP_ADD_SUB_REF
);
addRecSignal
(
GSN_GREP_ADD_SUB_CONF
,
&
Grep
::
fwdGREP_ADD_SUB_CONF
);
}
Grep
::~
Grep
()
{
}
BLOCK_FUNCTIONS
(
Grep
)
Grep
::
PSPart
::
PSPart
(
Grep
*
sb
)
:
BlockComponent
(
sb
),
c_subscriptions
(
c_subscriptionPool
)
{
m_grep
=
sb
;
m_firstScanGCI
=
1
;
// Empty interval = [1,0]
m_lastScanGCI
=
0
;
m_latestSeenGCI
=
0
;
c_subscriptions
.
setSize
(
10
);
c_subscriptionPool
.
setSize
(
10
);
}
Grep
::
PSCoord
::
PSCoord
(
Grep
*
sb
)
:
BlockComponent
(
sb
),
c_runningSubscriptions
(
c_subCoordinatorPool
)
{
m_grep
=
sb
;
c_runningSubscriptions
.
setSize
(
10
);
c_subCoordinatorPool
.
setSize
(
2
);
}
//BLOCK_FUNCTIONS(Grep::PSCoord);
BlockComponent
::
BlockComponent
(
SimulatedBlock
*
sb
)
{
m_sb
=
sb
;
}
ndb/src/kernel/blocks/grep/Makefile.am
deleted
100644 → 0
View file @
28e3fe87
noinst_LIBRARIES
=
libgrep.a
libgrep_a_SOURCES
=
Grep.cpp GrepInit.cpp
include
$(top_srcdir)/ndb/config/common.mk.am
include
$(top_srcdir)/ndb/config/type_kernel.mk.am
# Don't update the files from bitkeeper
%
::
SCCS/s.%
windoze-dsp
:
libgrep.dsp
libgrep.dsp
:
Makefile
\
$(top_srcdir)/ndb/config/win-lib.am
\
$(top_srcdir)/ndb/config/win-name
\
$(top_srcdir)/ndb/config/win-includes
\
$(top_srcdir)/ndb/config/win-sources
\
$(top_srcdir)/ndb/config/win-libraries
cat
$(top_srcdir)
/ndb/config/win-lib.am
>
$@
@
$(top_srcdir)
/ndb/config/win-name
$@
$(noinst_LIBRARIES)
@
$(top_srcdir)
/ndb/config/win-includes
$@
$(INCLUDES)
@
$(top_srcdir)
/ndb/config/win-sources
$@
$(libgrep_a_SOURCES)
@
$(top_srcdir)
/ndb/config/win-libraries
$@
LIB
$(LDADD)
ndb/src/kernel/blocks/grep/systab_test/Makefile
deleted
100644 → 0
View file @
28e3fe87
include
.defs.mk
TYPE
:=
kernel
BIN_TARGET
:=
grep_systab_test
BIN_TARGET_ARCHIVES
:=
portlib general
CCFLAGS_LOC
+=
-I
..
SOURCES
=
../GrepSystemTable.cpp grep_systab_test.cpp
include
$(NDB_TOP)/Epilogue.mk
ndb/src/kernel/blocks/grep/systab_test/grep_systab_test.cpp
deleted
100644 → 0
View file @
28e3fe87
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/**
* Unit Test for GrepSystemTable
*/
#include "../GrepSystemTable.hpp"
#include <SimulatedBlock.hpp>
#define EXEC(X) ( ndbout << endl, ndbout_c(#X), X )
int
main
()
{
GrepSystemTable
st
;
Uint32
f
,
l
;
ndbout_c
(
"*************************************"
);
ndbout_c
(
"* GrepSystemTable Unit Test Program *"
);
ndbout_c
(
"*************************************"
);
ndbout_c
(
"--------------------------------------------------------"
);
ndbout_c
(
"Test 1: Clear"
);
ndbout_c
(
"--------------------------------------------------------"
);
EXEC
(
st
.
set
(
GrepSystemTable
::
PS
,
22
,
26
));
st
.
print
();
st
.
require
(
GrepSystemTable
::
PS
,
22
,
26
);
EXEC
(
st
.
clear
(
GrepSystemTable
::
PS
,
20
,
24
));
st
.
print
();
st
.
require
(
GrepSystemTable
::
PS
,
25
,
26
);
EXEC
(
st
.
clear
(
GrepSystemTable
::
PS
,
0
,
100
));
st
.
print
();
st
.
require
(
GrepSystemTable
::
PS
,
1
,
0
);
EXEC
(
st
.
set
(
GrepSystemTable
::
PS
,
22
,
26
));
st
.
print
();
st
.
require
(
GrepSystemTable
::
PS
,
22
,
26
);
EXEC
(
st
.
clear
(
GrepSystemTable
::
PS
,
24
,
28
));
st
.
print
();
st
.
require
(
GrepSystemTable
::
PS
,
22
,
23
);
EXEC
(
st
.
clear
(
GrepSystemTable
::
PS
,
0
,
100
));
st
.
print
();
st
.
require
(
GrepSystemTable
::
PS
,
1
,
0
);
EXEC
(
st
.
set
(
GrepSystemTable
::
PS
,
22
,
26
));
st
.
print
();
st
.
require
(
GrepSystemTable
::
PS
,
22
,
26
);
EXEC
(
st
.
clear
(
GrepSystemTable
::
PS
,
24
,
26
));
st
.
print
();
st
.
require
(
GrepSystemTable
::
PS
,
22
,
23
);
EXEC
(
st
.
clear
(
GrepSystemTable
::
PS
,
0
,
100
));
st
.
print
();
st
.
require
(
GrepSystemTable
::
PS
,
1
,
0
);
EXEC
(
st
.
set
(
GrepSystemTable
::
PS
,
22
,
26
));
st
.
print
();
st
.
require
(
GrepSystemTable
::
PS
,
22
,
26
);
EXEC
(
st
.
clear
(
GrepSystemTable
::
PS
,
22
,
24
));
st
.
print
();
st
.
require
(
GrepSystemTable
::
PS
,
25
,
26
);
ndbout_c
(
"--------------------------------------------------------"
);
ndbout_c
(
"Test 2: PS --> SSreq"
);
ndbout_c
(
"--------------------------------------------------------"
);
EXEC
(
st
.
set
(
GrepSystemTable
::
PS
,
22
,
26
));
st
.
print
();
st
.
require
(
GrepSystemTable
::
PS
,
22
,
26
);
st
.
require
(
GrepSystemTable
::
SSReq
,
1
,
0
);
if
(
!
EXEC
(
st
.
copy
(
GrepSystemTable
::
PS
,
GrepSystemTable
::
SSReq
,
3
,
&
f
,
&
l
)))
ndbout_c
(
"%s:%d: Illegal copy!"
,
__FILE__
,
__FILE__
);
ndbout_c
(
"f=%d, l=%d"
,
f
,
l
);
st
.
print
();
st
.
require
(
GrepSystemTable
::
PS
,
22
,
26
);
st
.
require
(
GrepSystemTable
::
SSReq
,
22
,
24
);
EXEC
(
st
.
clear
(
GrepSystemTable
::
PS
,
22
,
22
));
st
.
print
();
st
.
require
(
GrepSystemTable
::
PS
,
23
,
26
);
st
.
require
(
GrepSystemTable
::
SSReq
,
22
,
24
);
if
(
!
EXEC
(
st
.
copy
(
GrepSystemTable
::
PS
,
GrepSystemTable
::
SSReq
,
2
,
&
f
,
&
l
)))
ndbout_c
(
"%s:%d: Illegal copy!"
,
__FILE__
,
__LINE__
);
ndbout_c
(
"f=%d, l=%d"
,
f
,
l
);
st
.
print
();
st
.
require
(
GrepSystemTable
::
PS
,
23
,
26
);
st
.
require
(
GrepSystemTable
::
SSReq
,
22
,
26
);
st
.
set
(
GrepSystemTable
::
SS
,
7
,
9
);
st
.
set
(
GrepSystemTable
::
InsReq
,
7
,
9
);
if
(
EXEC
(
st
.
movable
(
GrepSystemTable
::
SS
,
GrepSystemTable
::
InsReq
)))
ndbout_c
(
"%s:%d: Illegal move!"
,
__FILE__
,
__LINE__
);
st
.
print
();
st
.
require
(
GrepSystemTable
::
SS
,
7
,
9
);
st
.
require
(
GrepSystemTable
::
InsReq
,
7
,
9
);
EXEC
(
st
.
intervalMinus
(
7
,
9
,
7
,
7
,
&
f
,
&
l
));
ndbout_c
(
"f=%d, l=%d"
,
f
,
l
);
st
.
clear
(
GrepSystemTable
::
InsReq
,
8
,
9
);
st
.
require
(
GrepSystemTable
::
SS
,
7
,
9
);
st
.
require
(
GrepSystemTable
::
InsReq
,
7
,
7
);
if
(
EXEC
(
st
.
movable
(
GrepSystemTable
::
SS
,
GrepSystemTable
::
InsReq
))
!=
2
)
ndbout_c
(
"%s:%d: Illegal move!"
,
__FILE__
,
__LINE__
);
st
.
print
();
EXEC
(
st
.
copy
(
GrepSystemTable
::
SS
,
GrepSystemTable
::
InsReq
,
&
f
));
st
.
print
();
st
.
require
(
GrepSystemTable
::
SS
,
7
,
9
);
st
.
require
(
GrepSystemTable
::
InsReq
,
7
,
8
);
ndbout_c
(
"--------------------------------------------------------"
);
ndbout_c
(
"Test completed"
);
ndbout_c
(
"--------------------------------------------------------"
);
}
ndb/test/ndbapi/testGrep.cpp
deleted
100644 → 0
View file @
28e3fe87
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <NDBT.hpp>
#include <NDBT_Test.hpp>
#include <HugoTransactions.hpp>
#include <UtilTransactions.hpp>
#include <NdbGrep.hpp>
#define CHECK(b) if (!(b)) { \
g_err << "ERR: "<< step->getName() \
<< " failed on line " << __LINE__ << endl; \
result = NDBT_FAILED; \
continue; }
int
runLoadTable
(
NDBT_Context
*
ctx
,
NDBT_Step
*
step
){
int
records
=
ctx
->
getNumRecords
();
HugoTransactions
hugoTrans
(
*
ctx
->
getTab
());
if
(
hugoTrans
.
loadTable
(
GETNDB
(
step
),
records
)
!=
0
){
return
NDBT_FAILED
;
}
return
NDBT_OK
;
}
int
runPkUpdate
(
NDBT_Context
*
ctx
,
NDBT_Step
*
step
){
int
loops
=
ctx
->
getNumLoops
();
int
records
=
ctx
->
getNumRecords
();
int
batchSize
=
ctx
->
getProperty
(
"BatchSize"
,
1
);
int
i
=
0
;
HugoTransactions
hugoTrans
(
*
ctx
->
getTab
());
while
(
i
<
loops
)
{
g_info
<<
"|- "
<<
i
<<
": "
;
if
(
hugoTrans
.
pkUpdateRecords
(
GETNDB
(
step
),
records
,
batchSize
)
!=
0
){
g_info
<<
endl
;
return
NDBT_FAILED
;
}
i
++
;
}
g_info
<<
endl
;
return
NDBT_OK
;
}
int
runRestartInitial
(
NDBT_Context
*
ctx
,
NDBT_Step
*
step
){
NdbRestarter
restarter
;
Ndb
*
pNdb
=
GETNDB
(
step
);
const
NdbDictionary
::
Table
*
tab
=
ctx
->
getTab
();
pNdb
->
getDictionary
()
->
dropTable
(
tab
->
getName
());
if
(
restarter
.
restartAll
(
true
)
!=
0
)
return
NDBT_FAILED
;
return
NDBT_OK
;
}
int
runRestarter
(
NDBT_Context
*
ctx
,
NDBT_Step
*
step
){
int
result
=
NDBT_OK
;
int
loops
=
ctx
->
getNumLoops
();
NdbRestarter
restarter
;
int
i
=
0
;
int
lastId
=
0
;
if
(
restarter
.
getNumDbNodes
()
<
2
){
ctx
->
stopTest
();
return
NDBT_OK
;
}
if
(
restarter
.
waitClusterStarted
(
60
)
!=
0
){
g_err
<<
"Cluster failed to start"
<<
endl
;
return
NDBT_FAILED
;
}
loops
*=
restarter
.
getNumDbNodes
();
while
(
i
<
loops
&&
result
!=
NDBT_FAILED
&&
!
ctx
->
isTestStopped
()){
int
id
=
lastId
%
restarter
.
getNumDbNodes
();
int
nodeId
=
restarter
.
getDbNodeId
(
id
);
ndbout
<<
"Restart node "
<<
nodeId
<<
endl
;
if
(
restarter
.
restartOneDbNode
(
nodeId
)
!=
0
){
g_err
<<
"Failed to restartNextDbNode"
<<
endl
;
result
=
NDBT_FAILED
;
break
;
}
if
(
restarter
.
waitClusterStarted
(
60
)
!=
0
){
g_err
<<
"Cluster failed to start"
<<
endl
;
result
=
NDBT_FAILED
;
break
;
}
NdbSleep_SecSleep
(
1
);
lastId
++
;
i
++
;
}
ctx
->
stopTest
();
return
result
;
}
int
runCheckAllNodesStarted
(
NDBT_Context
*
ctx
,
NDBT_Step
*
step
){
NdbRestarter
restarter
;
if
(
restarter
.
waitClusterStarted
(
1
)
!=
0
){
g_err
<<
"All nodes was not started "
<<
endl
;
return
NDBT_FAILED
;
}
return
NDBT_OK
;
}
bool
testMaster
=
true
;
bool
testSlave
=
false
;
int
setMaster
(
NDBT_Context
*
ctx
,
NDBT_Step
*
step
){
testMaster
=
true
;
testSlave
=
false
;
return
NDBT_OK
;
}
int
setMasterAsSlave
(
NDBT_Context
*
ctx
,
NDBT_Step
*
step
){
testMaster
=
true
;
testSlave
=
true
;
return
NDBT_OK
;
}
int
setSlave
(
NDBT_Context
*
ctx
,
NDBT_Step
*
step
){
testMaster
=
false
;
testSlave
=
true
;
return
NDBT_OK
;
}
int
runAbort
(
NDBT_Context
*
ctx
,
NDBT_Step
*
step
){
NdbGrep
grep
(
GETNDB
(
step
)
->
getNodeId
()
+
1
);
NdbRestarter
restarter
;
if
(
restarter
.
getNumDbNodes
()
<
2
){
ctx
->
stopTest
();
return
NDBT_OK
;
}
if
(
restarter
.
waitClusterStarted
(
60
)
!=
0
){
g_err
<<
"Cluster failed to start"
<<
endl
;
return
NDBT_FAILED
;
}
if
(
testMaster
)
{
if
(
testSlave
)
{
if
(
grep
.
NFMasterAsSlave
(
restarter
)
==
-
1
){
return
NDBT_FAILED
;
}
}
else
{
if
(
grep
.
NFMaster
(
restarter
)
==
-
1
){
return
NDBT_FAILED
;
}
}
}
else
{
if
(
grep
.
NFSlave
(
restarter
)
==
-
1
){
return
NDBT_FAILED
;
}
}
return
NDBT_OK
;
}
int
runFail
(
NDBT_Context
*
ctx
,
NDBT_Step
*
step
){
NdbGrep
grep
(
GETNDB
(
step
)
->
getNodeId
()
+
1
);
NdbRestarter
restarter
;
if
(
restarter
.
getNumDbNodes
()
<
2
){
ctx
->
stopTest
();
return
NDBT_OK
;
}
if
(
restarter
.
waitClusterStarted
(
60
)
!=
0
){
g_err
<<
"Cluster failed to start"
<<
endl
;
return
NDBT_FAILED
;
}
if
(
testMaster
)
{
if
(
testSlave
)
{
if
(
grep
.
FailMasterAsSlave
(
restarter
)
==
-
1
){
return
NDBT_FAILED
;
}
}
else
{
if
(
grep
.
FailMaster
(
restarter
)
==
-
1
){
return
NDBT_FAILED
;
}
}
}
else
{
if
(
grep
.
FailSlave
(
restarter
)
==
-
1
){
return
NDBT_FAILED
;
}
}
return
NDBT_OK
;
}
int
runGrepBasic
(
NDBT_Context
*
ctx
,
NDBT_Step
*
step
){
NdbGrep
grep
(
GETNDB
(
step
)
->
getNodeId
()
+
1
);
unsigned
grepId
=
0
;
if
(
grep
.
start
()
==
-
1
){
return
NDBT_FAILED
;
}
ndbout
<<
"Started grep "
<<
grepId
<<
endl
;
ctx
->
setProperty
(
"GrepId"
,
grepId
);
return
NDBT_OK
;
}
int
runVerifyBasic
(
NDBT_Context
*
ctx
,
NDBT_Step
*
step
){
NdbGrep
grep
(
GETNDB
(
step
)
->
getNodeId
()
+
1
,
ctx
->
getRemoteMgm
());
ndbout_c
(
"no of nodes %d"
,
grep
.
getNumDbNodes
());
int
result
;
if
((
result
=
grep
.
verify
(
ctx
))
==
-
1
){
return
NDBT_FAILED
;
}
return
result
;
}
int
runClearTable
(
NDBT_Context
*
ctx
,
NDBT_Step
*
step
){
int
records
=
ctx
->
getNumRecords
();
UtilTransactions
utilTrans
(
*
ctx
->
getTab
());
if
(
utilTrans
.
clearTable2
(
GETNDB
(
step
),
records
)
!=
0
){
return
NDBT_FAILED
;
}
return
NDBT_OK
;
}
#include "bank/Bank.hpp"
int
runCreateBank
(
NDBT_Context
*
ctx
,
NDBT_Step
*
step
){
Bank
bank
;
int
overWriteExisting
=
true
;
if
(
bank
.
createAndLoadBank
(
overWriteExisting
)
!=
NDBT_OK
)
return
NDBT_FAILED
;
return
NDBT_OK
;
}
int
runBankTimer
(
NDBT_Context
*
ctx
,
NDBT_Step
*
step
){
Bank
bank
;
int
wait
=
30
;
// Max seconds between each "day"
int
yield
=
1
;
// Loops before bank returns
while
(
ctx
->
isTestStopped
()
==
false
)
{
bank
.
performIncreaseTime
(
wait
,
yield
);
}
return
NDBT_OK
;
}
int
runBankTransactions
(
NDBT_Context
*
ctx
,
NDBT_Step
*
step
){
Bank
bank
;
int
wait
=
10
;
// Max ms between each transaction
int
yield
=
100
;
// Loops before bank returns
while
(
ctx
->
isTestStopped
()
==
false
)
{
bank
.
performTransactions
(
wait
,
yield
);
}
return
NDBT_OK
;
}
int
runBankGL
(
NDBT_Context
*
ctx
,
NDBT_Step
*
step
){
Bank
bank
;
int
yield
=
20
;
// Loops before bank returns
int
result
=
NDBT_OK
;
while
(
ctx
->
isTestStopped
()
==
false
)
{
if
(
bank
.
performMakeGLs
(
yield
)
!=
NDBT_OK
){
ndbout
<<
"bank.performMakeGLs FAILED"
<<
endl
;
result
=
NDBT_FAILED
;
}
}
return
NDBT_OK
;
}
int
runBankSum
(
NDBT_Context
*
ctx
,
NDBT_Step
*
step
){
Bank
bank
;
int
wait
=
2000
;
// Max ms between each sum of accounts
int
yield
=
1
;
// Loops before bank returns
int
result
=
NDBT_OK
;
while
(
ctx
->
isTestStopped
()
==
false
)
{
if
(
bank
.
performSumAccounts
(
wait
,
yield
)
!=
NDBT_OK
){
ndbout
<<
"bank.performSumAccounts FAILED"
<<
endl
;
result
=
NDBT_FAILED
;
}
}
return
result
;
}
int
runDropBank
(
NDBT_Context
*
ctx
,
NDBT_Step
*
step
){
Bank
bank
;
if
(
bank
.
dropBank
()
!=
NDBT_OK
)
return
NDBT_FAILED
;
return
NDBT_OK
;
}
int
runGrepBank
(
NDBT_Context
*
ctx
,
NDBT_Step
*
step
){
int
loops
=
ctx
->
getNumLoops
();
int
l
=
0
;
int
maxSleep
=
30
;
// Max seconds between each grep
Ndb
*
pNdb
=
GETNDB
(
step
);
NdbGrep
grep
(
GETNDB
(
step
)
->
getNodeId
()
+
1
);
unsigned
minGrepId
=
~
0
;
unsigned
maxGrepId
=
0
;
unsigned
grepId
=
0
;
int
result
=
NDBT_OK
;
while
(
l
<
loops
&&
result
!=
NDBT_FAILED
){
if
(
pNdb
->
waitUntilReady
()
!=
0
){
result
=
NDBT_FAILED
;
continue
;
}
// Sleep for a while
NdbSleep_SecSleep
(
maxSleep
);
// Perform grep
if
(
grep
.
start
()
!=
0
){
ndbout
<<
"grep.start failed"
<<
endl
;
result
=
NDBT_FAILED
;
continue
;
}
ndbout
<<
"Started grep "
<<
grepId
<<
endl
;
// Remember min and max grepid
if
(
grepId
<
minGrepId
)
minGrepId
=
grepId
;
if
(
grepId
>
maxGrepId
)
maxGrepId
=
grepId
;
ndbout
<<
" maxGrepId = "
<<
maxGrepId
<<
", minGrepId = "
<<
minGrepId
<<
endl
;
ctx
->
setProperty
(
"MinGrepId"
,
minGrepId
);
ctx
->
setProperty
(
"MaxGrepId"
,
maxGrepId
);
l
++
;
}
ctx
->
stopTest
();
return
result
;
}
/*
int runRestoreBankAndVerify(NDBT_Context* ctx, NDBT_Step* step){
NdbRestarter restarter;
NdbGrep grep(GETNDB(step)->getNodeId()+1);
unsigned minGrepId = ctx->getProperty("MinGrepId");
unsigned maxGrepId = ctx->getProperty("MaxGrepId");
unsigned grepId = minGrepId;
int result = NDBT_OK;
int errSumAccounts = 0;
int errValidateGL = 0;
ndbout << " maxGrepId = " << maxGrepId << endl;
ndbout << " minGrepId = " << minGrepId << endl;
while (grepId <= maxGrepId){
// TEMPORARY FIX
// To erase all tables from cache(s)
// To be removed, maybe replaced by ndb.invalidate();
{
Bank bank;
if (bank.dropBank() != NDBT_OK){
result = NDBT_FAILED;
break;
}
}
// END TEMPORARY FIX
ndbout << "Performing initial restart" << endl;
if (restarter.restartAll(true) != 0)
return NDBT_FAILED;
if (restarter.waitClusterStarted() != 0)
return NDBT_FAILED;
ndbout << "Restoring grep " << grepId << endl;
if (grep.restore(grepId) == -1){
return NDBT_FAILED;
}
ndbout << "Grep " << grepId << " restored" << endl;
// Let bank verify
Bank bank;
int wait = 0;
int yield = 1;
if (bank.performSumAccounts(wait, yield) != 0){
ndbout << "bank.performSumAccounts FAILED" << endl;
ndbout << " grepId = " << grepId << endl << endl;
result = NDBT_FAILED;
errSumAccounts++;
}
if (bank.performValidateAllGLs() != 0){
ndbout << "bank.performValidateAllGLs FAILED" << endl;
ndbout << " grepId = " << grepId << endl << endl;
result = NDBT_FAILED;
errValidateGL++;
}
grepId++;
}
if (result != NDBT_OK){
ndbout << "Verification of grep failed" << endl
<< " errValidateGL="<<errValidateGL<<endl
<< " errSumAccounts="<<errSumAccounts<<endl << endl;
}
return result;
}
*/
NDBT_TESTSUITE
(
testGrep
);
TESTCASE
(
"GrepBasic"
,
"Test that Global Replication works on one table
\n
"
"1. Load table
\n
"
"2. Grep
\n
"
"3. Restart -i
\n
"
"4. Restore
\n
"
"5. Verify count and content of table
\n
"
){
INITIALIZER
(
runLoadTable
);
VERIFIER
(
runVerifyBasic
);
FINALIZER
(
runClearTable
);
}
TESTCASE
(
"GrepNodeRestart"
,
"Test that Global Replication works on one table
\n
"
"1. Load table
\n
"
"2. Grep
\n
"
"3. Restart -i
\n
"
"4. Restore
\n
"
"5. Verify count and content of table
\n
"
){
INITIALIZER
(
runLoadTable
);
STEP
(
runPkUpdate
);
STEP
(
runRestarter
);
VERIFIER
(
runVerifyBasic
);
FINALIZER
(
runClearTable
);
}
TESTCASE
(
"GrepBank"
,
"Test that grep and restore works during transaction load
\n
"
" by backing up the bank"
"1. Create bank
\n
"
"2a. Start bank and let it run
\n
"
"2b. Perform loop number of greps of the bank
\n
"
" when greps are finished tell bank to close
\n
"
"3. Restart ndb -i and reload each grep
\n
"
" let bank verify that the grep is consistent
\n
"
"4. Drop bank
\n
"
){
INITIALIZER
(
runCreateBank
);
STEP
(
runBankTimer
);
STEP
(
runBankTransactions
);
STEP
(
runBankGL
);
// TODO STEP(runBankSum);
STEP
(
runGrepBank
);
// VERIFIER(runRestoreBankAndVerify);
// FINALIZER(runDropBank);
}
TESTCASE
(
"NFMaster"
,
"Test that grep behaves during node failiure
\n
"
){
INITIALIZER
(
setMaster
);
STEP
(
runAbort
);
}
TESTCASE
(
"NFMasterAsSlave"
,
"Test that grep behaves during node failiure
\n
"
){
INITIALIZER
(
setMasterAsSlave
);
STEP
(
runAbort
);
}
TESTCASE
(
"NFSlave"
,
"Test that grep behaves during node failiure
\n
"
){
INITIALIZER
(
setSlave
);
STEP
(
runAbort
);
}
TESTCASE
(
"FailMaster"
,
"Test that grep behaves during node failiure
\n
"
){
INITIALIZER
(
setMaster
);
STEP
(
runFail
);
}
TESTCASE
(
"FailMasterAsSlave"
,
"Test that grep behaves during node failiure
\n
"
){
INITIALIZER
(
setMasterAsSlave
);
STEP
(
runFail
);
}
TESTCASE
(
"FailSlave"
,
"Test that grep behaves during node failiure
\n
"
){
INITIALIZER
(
setSlave
);
STEP
(
runFail
);
}
NDBT_TESTSUITE_END
(
testGrep
);
int
main
(
int
argc
,
const
char
**
argv
){
ndb_init
();
return
testGrep
.
execute
(
argc
,
argv
);
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment