Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
mariadb
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
mariadb
Commits
1d981111
Commit
1d981111
authored
Nov 22, 2004
by
tomas@poseidon.ndb.mysql.com
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
added force send interface to scan
prepared for using query cache in ndb
parent
fee21dd3
Changes
7
Show whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
74 additions
and
46 deletions
+74
-46
ndb/include/ndbapi/NdbIndexScanOperation.hpp
ndb/include/ndbapi/NdbIndexScanOperation.hpp
+3
-3
ndb/include/ndbapi/NdbResultSet.hpp
ndb/include/ndbapi/NdbResultSet.hpp
+3
-3
ndb/include/ndbapi/NdbScanOperation.hpp
ndb/include/ndbapi/NdbScanOperation.hpp
+6
-5
ndb/src/ndbapi/NdbResultSet.cpp
ndb/src/ndbapi/NdbResultSet.cpp
+6
-6
ndb/src/ndbapi/NdbScanOperation.cpp
ndb/src/ndbapi/NdbScanOperation.cpp
+34
-16
sql/ha_ndbcluster.cc
sql/ha_ndbcluster.cc
+20
-13
sql/ha_ndbcluster.h
sql/ha_ndbcluster.h
+2
-0
No files found.
ndb/include/ndbapi/NdbIndexScanOperation.hpp
View file @
1d981111
...
...
@@ -113,7 +113,7 @@ public:
* Reset bounds and put operation in list that will be
* sent on next execute
*/
int
reset_bounds
();
int
reset_bounds
(
bool
forceSend
=
false
);
bool
getSorted
()
const
{
return
m_ordered
;
}
private:
...
...
@@ -127,8 +127,8 @@ private:
virtual
NdbRecAttr
*
getValue_impl
(
const
NdbColumnImpl
*
,
char
*
);
void
fix_get_values
();
int
next_result_ordered
(
bool
fetchAllowed
);
int
send_next_scan_ordered
(
Uint32
idx
);
int
next_result_ordered
(
bool
fetchAllowed
,
bool
forceSend
=
false
);
int
send_next_scan_ordered
(
Uint32
idx
,
bool
forceSend
=
false
);
int
compare
(
Uint32
key
,
Uint32
cols
,
const
NdbReceiver
*
,
const
NdbReceiver
*
);
Uint32
m_sort_columns
;
...
...
ndb/include/ndbapi/NdbResultSet.hpp
View file @
1d981111
...
...
@@ -89,17 +89,17 @@ public:
* - 1: if there are no more tuples to scan.
* - 2: if there are no more cached records in NdbApi
*/
int
nextResult
(
bool
fetchAllowed
=
true
);
int
nextResult
(
bool
fetchAllowed
=
true
,
bool
forceSend
=
false
);
/**
* Close result set (scan)
*/
void
close
();
void
close
(
bool
forceSend
=
false
);
/**
* Restart
*/
int
restart
();
int
restart
(
bool
forceSend
=
false
);
/**
* Transfer scan operation to an updating transaction. Use this function
...
...
ndb/include/ndbapi/NdbScanOperation.hpp
View file @
1d981111
...
...
@@ -90,11 +90,11 @@ protected:
NdbScanOperation
(
Ndb
*
aNdb
);
virtual
~
NdbScanOperation
();
int
nextResult
(
bool
fetchAllowed
=
true
);
int
nextResult
(
bool
fetchAllowed
=
true
,
bool
forceSend
=
false
);
virtual
void
release
();
void
closeScan
();
int
close_impl
(
class
TransporterFacade
*
);
void
closeScan
(
bool
forceSend
=
false
);
int
close_impl
(
class
TransporterFacade
*
,
bool
forceSend
=
false
);
// Overloaded methods from NdbCursorOperation
int
executeCursor
(
int
ProcessorId
);
...
...
@@ -103,6 +103,7 @@ protected:
int
init
(
const
NdbTableImpl
*
tab
,
NdbConnection
*
myConnection
);
int
prepareSend
(
Uint32
TC_ConnectPtr
,
Uint64
TransactionId
);
int
doSend
(
int
ProcessorId
);
void
checkForceSend
(
bool
forceSend
);
virtual
void
setErrorCode
(
int
aErrorCode
);
virtual
void
setErrorCodeAbort
(
int
aErrorCode
);
...
...
@@ -138,7 +139,7 @@ protected:
Uint32
m_sent_receivers_count
;
// NOTE needs mutex to access
NdbReceiver
**
m_sent_receivers
;
// receive thread puts them here
int
send_next_scan
(
Uint32
cnt
,
bool
close
);
int
send_next_scan
(
Uint32
cnt
,
bool
close
,
bool
forceSend
=
false
);
void
receiver_delivered
(
NdbReceiver
*
);
void
receiver_completed
(
NdbReceiver
*
);
void
execCLOSE_SCAN_REP
();
...
...
@@ -148,7 +149,7 @@ protected:
Uint32
m_ordered
;
int
restart
();
int
restart
(
bool
forceSend
=
false
);
};
inline
...
...
ndb/src/ndbapi/NdbResultSet.cpp
View file @
1d981111
...
...
@@ -44,10 +44,10 @@ void NdbResultSet::init()
{
}
int
NdbResultSet
::
nextResult
(
bool
fetchAllowed
)
int
NdbResultSet
::
nextResult
(
bool
fetchAllowed
,
bool
forceSend
)
{
int
res
;
if
((
res
=
m_operation
->
nextResult
(
fetchAllowed
))
==
0
)
{
if
((
res
=
m_operation
->
nextResult
(
fetchAllowed
,
forceSend
))
==
0
)
{
// handle blobs
NdbBlob
*
tBlob
=
m_operation
->
theBlobList
;
while
(
tBlob
!=
0
)
{
...
...
@@ -67,9 +67,9 @@ int NdbResultSet::nextResult(bool fetchAllowed)
return
res
;
}
void
NdbResultSet
::
close
()
void
NdbResultSet
::
close
(
bool
forceSend
)
{
m_operation
->
closeScan
();
m_operation
->
closeScan
(
forceSend
);
}
NdbOperation
*
...
...
@@ -98,6 +98,6 @@ NdbResultSet::deleteTuple(NdbConnection * takeOverTrans){
}
int
NdbResultSet
::
restart
(){
return
m_operation
->
restart
();
NdbResultSet
::
restart
(
bool
forceSend
){
return
m_operation
->
restart
(
forceSend
);
}
ndb/src/ndbapi/NdbScanOperation.cpp
View file @
1d981111
...
...
@@ -447,10 +447,11 @@ NdbScanOperation::executeCursor(int nodeId){
#define DEBUG_NEXT_RESULT 0
int
NdbScanOperation
::
nextResult
(
bool
fetchAllowed
)
int
NdbScanOperation
::
nextResult
(
bool
fetchAllowed
,
bool
forceSend
)
{
if
(
m_ordered
)
return
((
NdbIndexScanOperation
*
)
this
)
->
next_result_ordered
(
fetchAllowed
);
return
((
NdbIndexScanOperation
*
)
this
)
->
next_result_ordered
(
fetchAllowed
,
forceSend
);
/**
* Check current receiver
...
...
@@ -487,7 +488,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
TransporterFacade
*
tp
=
TransporterFacade
::
instance
();
Guard
guard
(
tp
->
theMutexPtr
);
Uint32
seq
=
theNdbCon
->
theNodeSequence
;
if
(
seq
==
tp
->
getNodeSequence
(
nodeId
)
&&
send_next_scan
(
idx
,
false
)
==
0
){
if
(
seq
==
tp
->
getNodeSequence
(
nodeId
)
&&
send_next_scan
(
idx
,
false
,
forceSend
)
==
0
){
idx
=
m_current_api_receiver
;
last
=
m_api_receivers_count
;
...
...
@@ -578,7 +580,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
}
int
NdbScanOperation
::
send_next_scan
(
Uint32
cnt
,
bool
stopScanFlag
){
NdbScanOperation
::
send_next_scan
(
Uint32
cnt
,
bool
stopScanFlag
,
bool
forceSend
){
if
(
cnt
>
0
||
stopScanFlag
){
NdbApiSignal
tSignal
(
theNdb
->
theMyRef
);
tSignal
.
setSignal
(
GSN_SCAN_NEXTREQ
);
...
...
@@ -618,6 +621,8 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
ret
=
tp
->
sendSignal
(
&
tSignal
,
nodeId
);
}
if
(
!
ret
)
checkForceSend
(
forceSend
);
m_sent_receivers_count
=
last
+
cnt
+
stopScanFlag
;
m_api_receivers_count
-=
cnt
;
m_current_api_receiver
=
0
;
...
...
@@ -627,6 +632,15 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
return
0
;
}
void
NdbScanOperation
::
checkForceSend
(
bool
forceSend
)
{
if
(
forceSend
)
{
TransporterFacade
::
instance
()
->
forceSend
(
theNdb
->
theNdbBlockNumber
);
}
else
{
TransporterFacade
::
instance
()
->
checkForceSend
(
theNdb
->
theNdbBlockNumber
);
}
//if
}
int
NdbScanOperation
::
prepareSend
(
Uint32
TC_ConnectPtr
,
Uint64
TransactionId
)
{
...
...
@@ -642,7 +656,7 @@ NdbScanOperation::doSend(int ProcessorId)
return
0
;
}
void
NdbScanOperation
::
closeScan
()
void
NdbScanOperation
::
closeScan
(
bool
forceSend
)
{
if
(
m_transConnection
){
if
(
DEBUG_NEXT_RESULT
)
...
...
@@ -657,7 +671,7 @@ void NdbScanOperation::closeScan()
TransporterFacade
*
tp
=
TransporterFacade
::
instance
();
Guard
guard
(
tp
->
theMutexPtr
);
close_impl
(
tp
);
close_impl
(
tp
,
forceSend
);
}
while
(
0
);
...
...
@@ -1293,7 +1307,8 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
}
int
NdbIndexScanOperation
::
next_result_ordered
(
bool
fetchAllowed
){
NdbIndexScanOperation
::
next_result_ordered
(
bool
fetchAllowed
,
bool
forceSend
){
Uint32
u_idx
=
0
,
u_last
=
0
;
Uint32
s_idx
=
m_current_api_receiver
;
// first sorted
...
...
@@ -1319,7 +1334,8 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
Guard
guard
(
tp
->
theMutexPtr
);
Uint32
seq
=
theNdbCon
->
theNodeSequence
;
Uint32
nodeId
=
theNdbCon
->
theDBnode
;
if
(
seq
==
tp
->
getNodeSequence
(
nodeId
)
&&
!
send_next_scan_ordered
(
s_idx
)){
if
(
seq
==
tp
->
getNodeSequence
(
nodeId
)
&&
!
send_next_scan_ordered
(
s_idx
,
forceSend
)){
Uint32
tmp
=
m_sent_receivers_count
;
s_idx
=
m_current_api_receiver
;
while
(
m_sent_receivers_count
>
0
&&
!
theError
.
code
){
...
...
@@ -1408,7 +1424,7 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
}
int
NdbIndexScanOperation
::
send_next_scan_ordered
(
Uint32
idx
){
NdbIndexScanOperation
::
send_next_scan_ordered
(
Uint32
idx
,
bool
forceSend
){
if
(
idx
==
theParallelism
)
return
0
;
...
...
@@ -1440,11 +1456,13 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
Uint32
nodeId
=
theNdbCon
->
theDBnode
;
TransporterFacade
*
tp
=
TransporterFacade
::
instance
();
tSignal
.
setLength
(
4
+
1
);
return
tp
->
sendSignal
(
&
tSignal
,
nodeId
);
int
ret
=
tp
->
sendSignal
(
&
tSignal
,
nodeId
);
if
(
!
ret
)
checkForceSend
(
forceSend
);
return
ret
;
}
int
NdbScanOperation
::
close_impl
(
TransporterFacade
*
tp
){
NdbScanOperation
::
close_impl
(
TransporterFacade
*
tp
,
bool
forceSend
){
Uint32
seq
=
theNdbCon
->
theNodeSequence
;
Uint32
nodeId
=
theNdbCon
->
theDBnode
;
...
...
@@ -1473,7 +1491,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp){
if
(
m_api_receivers_count
+
m_conf_receivers_count
){
// Send close scan
if
(
send_next_scan
(
0
,
true
)
==
-
1
){
// Close scan
if
(
send_next_scan
(
0
,
true
,
forceSend
)
==
-
1
){
// Close scan
theNdbCon
->
theReleaseOnClose
=
true
;
return
-
1
;
}
...
...
@@ -1520,7 +1538,7 @@ NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){
}
int
NdbScanOperation
::
restart
()
NdbScanOperation
::
restart
(
bool
forceSend
)
{
TransporterFacade
*
tp
=
TransporterFacade
::
instance
();
...
...
@@ -1529,7 +1547,7 @@ NdbScanOperation::restart()
{
int
res
;
if
((
res
=
close_impl
(
tp
)))
if
((
res
=
close_impl
(
tp
,
forceSend
)))
{
return
res
;
}
...
...
@@ -1548,13 +1566,13 @@ NdbScanOperation::restart()
}
int
NdbIndexScanOperation
::
reset_bounds
(){
NdbIndexScanOperation
::
reset_bounds
(
bool
forceSend
){
int
res
;
{
TransporterFacade
*
tp
=
TransporterFacade
::
instance
();
Guard
guard
(
tp
->
theMutexPtr
);
res
=
close_impl
(
tp
);
res
=
close_impl
(
tp
,
forceSend
);
}
if
(
!
res
)
...
...
sql/ha_ndbcluster.cc
View file @
1d981111
...
...
@@ -1247,7 +1247,7 @@ inline int ha_ndbcluster::next_result(byte *buf)
m_ops_pending
=
0
;
m_blobs_pending
=
FALSE
;
}
check
=
cursor
->
nextResult
(
contact_ndb
);
check
=
cursor
->
nextResult
(
contact_ndb
,
m_force_send
);
if
(
check
==
0
)
{
// One more record found
...
...
@@ -1540,7 +1540,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
DBUG_ASSERT
(
op
->
getSorted
()
==
sorted
);
DBUG_ASSERT
(
op
->
getLockMode
()
==
(
NdbOperation
::
LockMode
)
get_ndb_lock_type
(
m_lock
.
type
));
if
(
op
->
reset_bounds
())
if
(
op
->
reset_bounds
(
m_force_send
))
DBUG_RETURN
(
ndb_err
(
m_active_trans
));
}
...
...
@@ -2367,7 +2367,7 @@ int ha_ndbcluster::index_last(byte *buf)
int
res
;
if
((
res
=
ordered_index_scan
(
0
,
0
,
TRUE
,
buf
))
==
0
){
NdbResultSet
*
cursor
=
m_active_cursor
;
while
((
res
=
cursor
->
nextResult
(
TRUE
))
==
0
);
while
((
res
=
cursor
->
nextResult
(
TRUE
,
m_force_send
))
==
0
);
if
(
res
==
1
){
unpack_record
(
buf
);
table
->
status
=
0
;
...
...
@@ -2453,7 +2453,7 @@ int ha_ndbcluster::rnd_init(bool scan)
{
if
(
!
scan
)
DBUG_RETURN
(
1
);
int
res
=
cursor
->
restart
();
int
res
=
cursor
->
restart
(
m_force_send
);
DBUG_ASSERT
(
res
==
0
);
}
index_init
(
table
->
primary_key
);
...
...
@@ -2484,7 +2484,7 @@ int ha_ndbcluster::close_scan()
m_ops_pending
=
0
;
}
cursor
->
close
();
cursor
->
close
(
m_force_send
);
m_active_cursor
=
NULL
;
DBUG_RETURN
(
0
);
}
...
...
@@ -3004,6 +3004,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
m_transaction_on
=
FALSE
;
else
m_transaction_on
=
thd
->
variables
.
ndb_use_transactions
;
// m_use_local_query_cache= thd->variables.ndb_use_local_query_cache;
m_active_trans
=
thd
->
transaction
.
all
.
ndb_tid
?
(
NdbConnection
*
)
thd
->
transaction
.
all
.
ndb_tid
:
...
...
@@ -3728,7 +3729,8 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
m_ha_not_exact_count
(
FALSE
),
m_force_send
(
TRUE
),
m_autoincrement_prefetch
(
32
),
m_transaction_on
(
TRUE
)
m_transaction_on
(
TRUE
),
m_use_local_query_cache
(
FALSE
)
{
int
i
;
...
...
@@ -4415,7 +4417,7 @@ bool ha_ndbcluster::low_byte_first() const
}
bool
ha_ndbcluster
::
has_transactions
()
{
return
TRUE
;
return
m_transaction_on
;
}
const
char
*
ha_ndbcluster
::
index_type
(
uint
key_number
)
{
...
...
@@ -4432,6 +4434,9 @@ const char* ha_ndbcluster::index_type(uint key_number)
}
uint8
ha_ndbcluster
::
table_cache_type
()
{
if
(
m_use_local_query_cache
)
return
HA_CACHE_TBL_TRANSACT
;
else
return
HA_CACHE_TBL_NOCACHE
;
}
...
...
@@ -4600,10 +4605,9 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
{
DBUG_ENTER
(
"ndb_get_table_statistics"
);
DBUG_PRINT
(
"enter"
,
(
"table: %s"
,
table
));
NdbConnection
*
pTrans
=
ndb
->
startTransaction
();
do
{
NdbConnection
*
pTrans
=
ndb
->
startTransaction
();
if
(
pTrans
==
NULL
)
break
;
...
...
@@ -4623,13 +4627,13 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
pOp
->
getValue
(
NdbDictionary
::
Column
::
ROW_COUNT
,
(
char
*
)
&
rows
);
pOp
->
getValue
(
NdbDictionary
::
Column
::
COMMIT_COUNT
,
(
char
*
)
&
commits
);
check
=
pTrans
->
execute
(
NoCommit
);
check
=
pTrans
->
execute
(
NoCommit
,
AbortOnError
,
TRUE
);
if
(
check
==
-
1
)
break
;
Uint64
sum_rows
=
0
;
Uint64
sum_commits
=
0
;
while
((
check
=
rs
->
nextResult
(
TRUE
))
==
0
)
while
((
check
=
rs
->
nextResult
(
TRUE
,
TRUE
))
==
0
)
{
sum_rows
+=
rows
;
sum_commits
+=
commits
;
...
...
@@ -4638,6 +4642,8 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
if
(
check
==
-
1
)
break
;
rs
->
close
(
TRUE
);
ndb
->
closeTransaction
(
pTrans
);
if
(
row_count
)
*
row_count
=
sum_rows
;
...
...
@@ -4647,6 +4653,7 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
DBUG_RETURN
(
0
);
}
while
(
0
);
ndb
->
closeTransaction
(
pTrans
);
DBUG_PRINT
(
"exit"
,
(
"failed"
));
DBUG_RETURN
(
-
1
);
}
...
...
sql/ha_ndbcluster.h
View file @
1d981111
...
...
@@ -238,10 +238,12 @@ class ha_ndbcluster: public handler
char
*
m_blobs_buffer
;
uint32
m_blobs_buffer_size
;
uint
m_dupkey
;
// set from thread variables at external lock
bool
m_ha_not_exact_count
;
bool
m_force_send
;
ha_rows
m_autoincrement_prefetch
;
bool
m_transaction_on
;
bool
m_use_local_query_cache
;
void
set_rec_per_key
();
void
records_update
();
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment