Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
mariadb
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
mariadb
Commits
45aaaac8
Commit
45aaaac8
authored
Jan 24, 2008
by
unknown
Browse files
Options
Browse Files
Download
Plain Diff
Merge joreland@bk-internal.mysql.com:/home/bk/mysql-5.1-new-ndb
into perch.ndb.mysql.com:/home/jonas/src/51-ndb
parents
49b8a3fc
8f77ea23
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
148 additions
and
64 deletions
+148
-64
storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
+50
-8
storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp
+37
-2
storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
+61
-54
No files found.
storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
View file @
45aaaac8
...
@@ -106,8 +106,11 @@ void Dbtup::removeActiveOpList(Operationrec* const regOperPtr,
...
@@ -106,8 +106,11 @@ void Dbtup::removeActiveOpList(Operationrec* const regOperPtr,
* Release copy tuple
* Release copy tuple
*/
*/
if
(
!
regOperPtr
->
m_copy_tuple_location
.
isNull
())
if
(
!
regOperPtr
->
m_copy_tuple_location
.
isNull
())
{
jam
();
c_undo_buffer
.
free_copy_tuple
(
&
regOperPtr
->
m_copy_tuple_location
);
c_undo_buffer
.
free_copy_tuple
(
&
regOperPtr
->
m_copy_tuple_location
);
}
if
(
regOperPtr
->
op_struct
.
in_active_list
)
{
if
(
regOperPtr
->
op_struct
.
in_active_list
)
{
regOperPtr
->
op_struct
.
in_active_list
=
false
;
regOperPtr
->
op_struct
.
in_active_list
=
false
;
if
(
regOperPtr
->
nextActiveOp
!=
RNIL
)
{
if
(
regOperPtr
->
nextActiveOp
!=
RNIL
)
{
...
@@ -170,6 +173,7 @@ Dbtup::dealloc_tuple(Signal* signal,
...
@@ -170,6 +173,7 @@ Dbtup::dealloc_tuple(Signal* signal,
Uint32
extra_bits
=
Tuple_header
::
FREED
;
Uint32
extra_bits
=
Tuple_header
::
FREED
;
if
(
bits
&
Tuple_header
::
DISK_PART
)
if
(
bits
&
Tuple_header
::
DISK_PART
)
{
{
jam
();
Local_key
disk
;
Local_key
disk
;
memcpy
(
&
disk
,
ptr
->
get_disk_ref_ptr
(
regTabPtr
),
sizeof
(
disk
));
memcpy
(
&
disk
,
ptr
->
get_disk_ref_ptr
(
regTabPtr
),
sizeof
(
disk
));
PagePtr
tmpptr
;
PagePtr
tmpptr
;
...
@@ -182,6 +186,7 @@ Dbtup::dealloc_tuple(Signal* signal,
...
@@ -182,6 +186,7 @@ Dbtup::dealloc_tuple(Signal* signal,
if
(
!
(
bits
&
(
Tuple_header
::
LCP_SKIP
|
Tuple_header
::
ALLOC
))
&&
if
(
!
(
bits
&
(
Tuple_header
::
LCP_SKIP
|
Tuple_header
::
ALLOC
))
&&
lcpScan_ptr_i
!=
RNIL
)
lcpScan_ptr_i
!=
RNIL
)
{
{
jam
();
ScanOpPtr
scanOp
;
ScanOpPtr
scanOp
;
c_scanOpPool
.
getPtr
(
scanOp
,
lcpScan_ptr_i
);
c_scanOpPool
.
getPtr
(
scanOp
,
lcpScan_ptr_i
);
Local_key
rowid
=
regOperPtr
->
m_tuple_location
;
Local_key
rowid
=
regOperPtr
->
m_tuple_location
;
...
@@ -189,6 +194,7 @@ Dbtup::dealloc_tuple(Signal* signal,
...
@@ -189,6 +194,7 @@ Dbtup::dealloc_tuple(Signal* signal,
rowid
.
m_page_no
=
page
->
frag_page_id
;
rowid
.
m_page_no
=
page
->
frag_page_id
;
if
(
rowid
>
scanpos
)
if
(
rowid
>
scanpos
)
{
{
jam
();
extra_bits
=
Tuple_header
::
LCP_KEEP
;
// Note REMOVE FREE
extra_bits
=
Tuple_header
::
LCP_KEEP
;
// Note REMOVE FREE
ptr
->
m_operation_ptr_i
=
lcp_keep_list
;
ptr
->
m_operation_ptr_i
=
lcp_keep_list
;
regFragPtr
->
m_lcp_keep_list
=
rowid
.
ref
();
regFragPtr
->
m_lcp_keep_list
=
rowid
.
ref
();
...
@@ -229,11 +235,13 @@ Dbtup::commit_operation(Signal* signal,
...
@@ -229,11 +235,13 @@ Dbtup::commit_operation(Signal* signal,
Uint32
mm_vars
=
regTabPtr
->
m_attributes
[
MM
].
m_no_of_varsize
;
Uint32
mm_vars
=
regTabPtr
->
m_attributes
[
MM
].
m_no_of_varsize
;
if
(
mm_vars
==
0
)
if
(
mm_vars
==
0
)
{
{
jam
();
memcpy
(
tuple_ptr
,
copy
,
4
*
fixsize
);
memcpy
(
tuple_ptr
,
copy
,
4
*
fixsize
);
disk_ptr
=
(
Tuple_header
*
)(((
Uint32
*
)
copy
)
+
fixsize
);
disk_ptr
=
(
Tuple_header
*
)(((
Uint32
*
)
copy
)
+
fixsize
);
}
}
else
else
{
{
jam
();
/**
/**
* Var_part_ref is only stored in *allocated* tuple
* Var_part_ref is only stored in *allocated* tuple
* so memcpy from copy, will over write it...
* so memcpy from copy, will over write it...
...
@@ -258,6 +266,7 @@ Dbtup::commit_operation(Signal* signal,
...
@@ -258,6 +266,7 @@ Dbtup::commit_operation(Signal* signal,
if
(
copy_bits
&
Tuple_header
::
MM_SHRINK
)
if
(
copy_bits
&
Tuple_header
::
MM_SHRINK
)
{
{
jam
();
vpagePtrP
->
shrink_entry
(
tmp
.
m_page_idx
,
(
sz
+
3
)
>>
2
);
vpagePtrP
->
shrink_entry
(
tmp
.
m_page_idx
,
(
sz
+
3
)
>>
2
);
update_free_page_list
(
regFragPtr
,
vpagePtr
);
update_free_page_list
(
regFragPtr
,
vpagePtr
);
}
}
...
@@ -268,6 +277,7 @@ Dbtup::commit_operation(Signal* signal,
...
@@ -268,6 +277,7 @@ Dbtup::commit_operation(Signal* signal,
if
(
regTabPtr
->
m_no_of_disk_attributes
&&
if
(
regTabPtr
->
m_no_of_disk_attributes
&&
(
copy_bits
&
Tuple_header
::
DISK_INLINE
))
(
copy_bits
&
Tuple_header
::
DISK_INLINE
))
{
{
jam
();
Local_key
key
;
Local_key
key
;
memcpy
(
&
key
,
copy
->
get_disk_ref_ptr
(
regTabPtr
),
sizeof
(
Local_key
));
memcpy
(
&
key
,
copy
->
get_disk_ref_ptr
(
regTabPtr
),
sizeof
(
Local_key
));
Uint32
logfile_group_id
=
regFragPtr
->
m_logfile_group_id
;
Uint32
logfile_group_id
=
regFragPtr
->
m_logfile_group_id
;
...
@@ -278,22 +288,26 @@ Dbtup::commit_operation(Signal* signal,
...
@@ -278,22 +288,26 @@ Dbtup::commit_operation(Signal* signal,
Uint32
sz
,
*
dst
;
Uint32
sz
,
*
dst
;
if
(
copy_bits
&
Tuple_header
::
DISK_ALLOC
)
if
(
copy_bits
&
Tuple_header
::
DISK_ALLOC
)
{
{
jam
();
disk_page_alloc
(
signal
,
regTabPtr
,
regFragPtr
,
&
key
,
diskPagePtr
,
gci
);
disk_page_alloc
(
signal
,
regTabPtr
,
regFragPtr
,
&
key
,
diskPagePtr
,
gci
);
}
}
if
(
regTabPtr
->
m_attributes
[
DD
].
m_no_of_varsize
==
0
)
if
(
regTabPtr
->
m_attributes
[
DD
].
m_no_of_varsize
==
0
)
{
{
jam
();
sz
=
regTabPtr
->
m_offsets
[
DD
].
m_fix_header_size
;
sz
=
regTabPtr
->
m_offsets
[
DD
].
m_fix_header_size
;
dst
=
((
Fix_page
*
)
diskPagePtr
.
p
)
->
get_ptr
(
key
.
m_page_idx
,
sz
);
dst
=
((
Fix_page
*
)
diskPagePtr
.
p
)
->
get_ptr
(
key
.
m_page_idx
,
sz
);
}
}
else
else
{
{
jam
();
dst
=
((
Var_page
*
)
diskPagePtr
.
p
)
->
get_ptr
(
key
.
m_page_idx
);
dst
=
((
Var_page
*
)
diskPagePtr
.
p
)
->
get_ptr
(
key
.
m_page_idx
);
sz
=
((
Var_page
*
)
diskPagePtr
.
p
)
->
get_entry_len
(
key
.
m_page_idx
);
sz
=
((
Var_page
*
)
diskPagePtr
.
p
)
->
get_entry_len
(
key
.
m_page_idx
);
}
}
if
(
!
(
copy_bits
&
Tuple_header
::
DISK_ALLOC
))
if
(
!
(
copy_bits
&
Tuple_header
::
DISK_ALLOC
))
{
{
jam
();
disk_page_undo_update
(
diskPagePtr
.
p
,
disk_page_undo_update
(
diskPagePtr
.
p
,
&
key
,
dst
,
sz
,
gci
,
logfile_group_id
);
&
key
,
dst
,
sz
,
gci
,
logfile_group_id
);
}
}
...
@@ -307,6 +321,7 @@ Dbtup::commit_operation(Signal* signal,
...
@@ -307,6 +321,7 @@ Dbtup::commit_operation(Signal* signal,
if
(
lcpScan_ptr_i
!=
RNIL
&&
(
bits
&
Tuple_header
::
ALLOC
))
if
(
lcpScan_ptr_i
!=
RNIL
&&
(
bits
&
Tuple_header
::
ALLOC
))
{
{
jam
();
ScanOpPtr
scanOp
;
ScanOpPtr
scanOp
;
c_scanOpPool
.
getPtr
(
scanOp
,
lcpScan_ptr_i
);
c_scanOpPool
.
getPtr
(
scanOp
,
lcpScan_ptr_i
);
Local_key
rowid
=
regOperPtr
->
m_tuple_location
;
Local_key
rowid
=
regOperPtr
->
m_tuple_location
;
...
@@ -314,6 +329,7 @@ Dbtup::commit_operation(Signal* signal,
...
@@ -314,6 +329,7 @@ Dbtup::commit_operation(Signal* signal,
rowid
.
m_page_no
=
pagePtr
.
p
->
frag_page_id
;
rowid
.
m_page_no
=
pagePtr
.
p
->
frag_page_id
;
if
(
rowid
>
scanpos
)
if
(
rowid
>
scanpos
)
{
{
jam
();
copy_bits
|=
Tuple_header
::
LCP_SKIP
;
copy_bits
|=
Tuple_header
::
LCP_SKIP
;
}
}
}
}
...
@@ -372,7 +388,10 @@ Dbtup::disk_page_commit_callback(Signal* signal,
...
@@ -372,7 +388,10 @@ Dbtup::disk_page_commit_callback(Signal* signal,
execTUP_COMMITREQ
(
signal
);
execTUP_COMMITREQ
(
signal
);
if
(
signal
->
theData
[
0
]
==
0
)
if
(
signal
->
theData
[
0
]
==
0
)
{
jam
();
c_lqh
->
tupcommit_conf_callback
(
signal
,
regOperPtr
.
p
->
userpointer
);
c_lqh
->
tupcommit_conf_callback
(
signal
,
regOperPtr
.
p
->
userpointer
);
}
}
}
void
void
...
@@ -410,6 +429,7 @@ Dbtup::disk_page_log_buffer_callback(Signal* signal,
...
@@ -410,6 +429,7 @@ Dbtup::disk_page_log_buffer_callback(Signal* signal,
void
void
Dbtup
::
fix_commit_order
(
OperationrecPtr
opPtr
)
Dbtup
::
fix_commit_order
(
OperationrecPtr
opPtr
)
{
{
jam
();
ndbassert
(
!
opPtr
.
p
->
is_first_operation
());
ndbassert
(
!
opPtr
.
p
->
is_first_operation
());
OperationrecPtr
firstPtr
=
opPtr
;
OperationrecPtr
firstPtr
=
opPtr
;
while
(
firstPtr
.
p
->
prevActiveOp
!=
RNIL
)
while
(
firstPtr
.
p
->
prevActiveOp
!=
RNIL
)
...
@@ -435,7 +455,10 @@ Dbtup::fix_commit_order(OperationrecPtr opPtr)
...
@@ -435,7 +455,10 @@ Dbtup::fix_commit_order(OperationrecPtr opPtr)
c_operation_pool
.
getPtr
(
seco
)
->
prevActiveOp
=
opPtr
.
i
;
c_operation_pool
.
getPtr
(
seco
)
->
prevActiveOp
=
opPtr
.
i
;
c_operation_pool
.
getPtr
(
prev
)
->
nextActiveOp
=
firstPtr
.
i
;
c_operation_pool
.
getPtr
(
prev
)
->
nextActiveOp
=
firstPtr
.
i
;
if
(
next
!=
RNIL
)
if
(
next
!=
RNIL
)
{
jam
();
c_operation_pool
.
getPtr
(
next
)
->
prevActiveOp
=
firstPtr
.
i
;
c_operation_pool
.
getPtr
(
next
)
->
prevActiveOp
=
firstPtr
.
i
;
}
}
}
/* ----------------------------------------------------------------- */
/* ----------------------------------------------------------------- */
...
@@ -500,6 +523,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
...
@@ -500,6 +523,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
bool
get_page
=
false
;
bool
get_page
=
false
;
if
(
regOperPtr
.
p
->
op_struct
.
m_load_diskpage_on_commit
)
if
(
regOperPtr
.
p
->
op_struct
.
m_load_diskpage_on_commit
)
{
{
jam
();
Page_cache_client
::
Request
req
;
Page_cache_client
::
Request
req
;
ndbassert
(
regOperPtr
.
p
->
is_first_operation
()
&&
ndbassert
(
regOperPtr
.
p
->
is_first_operation
()
&&
regOperPtr
.
p
->
is_last_operation
());
regOperPtr
.
p
->
is_last_operation
());
...
@@ -509,6 +533,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
...
@@ -509,6 +533,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
*/
*/
if
(
!
regOperPtr
.
p
->
m_copy_tuple_location
.
isNull
())
if
(
!
regOperPtr
.
p
->
m_copy_tuple_location
.
isNull
())
{
{
jam
();
Tuple_header
*
tmp
=
(
Tuple_header
*
)
Tuple_header
*
tmp
=
(
Tuple_header
*
)
c_undo_buffer
.
get_ptr
(
&
regOperPtr
.
p
->
m_copy_tuple_location
);
c_undo_buffer
.
get_ptr
(
&
regOperPtr
.
p
->
m_copy_tuple_location
);
...
@@ -518,23 +543,26 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
...
@@ -518,23 +543,26 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
if
(
unlikely
(
regOperPtr
.
p
->
op_struct
.
op_type
==
ZDELETE
&&
if
(
unlikely
(
regOperPtr
.
p
->
op_struct
.
op_type
==
ZDELETE
&&
tmp
->
m_header_bits
&
Tuple_header
::
DISK_ALLOC
))
tmp
->
m_header_bits
&
Tuple_header
::
DISK_ALLOC
))
{
{
jam
();
jam
();
/**
/**
* Insert+Delete
* Insert+Delete
*/
*/
regOperPtr
.
p
->
op_struct
.
m_load_diskpage_on_commit
=
0
;
regOperPtr
.
p
->
op_struct
.
m_load_diskpage_on_commit
=
0
;
regOperPtr
.
p
->
op_struct
.
m_wait_log_buffer
=
0
;
regOperPtr
.
p
->
op_struct
.
m_wait_log_buffer
=
0
;
disk_page_abort_prealloc
(
signal
,
regFragPtr
.
p
,
disk_page_abort_prealloc
(
signal
,
regFragPtr
.
p
,
&
req
.
m_page
,
req
.
m_page
.
m_page_idx
);
&
req
.
m_page
,
req
.
m_page
.
m_page_idx
);
c_lgman
->
free_log_space
(
regFragPtr
.
p
->
m_logfile_group_id
,
c_lgman
->
free_log_space
(
regFragPtr
.
p
->
m_logfile_group_id
,
regOperPtr
.
p
->
m_undo_buffer_space
);
regOperPtr
.
p
->
m_undo_buffer_space
);
if
(
0
)
ndbout_c
(
"insert+delete"
);
goto
skip_disk
;
goto
skip_disk
;
if
(
0
)
ndbout_c
(
"insert+delete"
);
jamEntry
();
goto
skip_disk
;
}
}
}
}
else
else
{
{
jam
();
// initial delete
// initial delete
ndbassert
(
regOperPtr
.
p
->
op_struct
.
op_type
==
ZDELETE
);
ndbassert
(
regOperPtr
.
p
->
op_struct
.
op_type
==
ZDELETE
);
memcpy
(
&
req
.
m_page
,
memcpy
(
&
req
.
m_page
,
...
@@ -558,11 +586,14 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
...
@@ -558,11 +586,14 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
/**
/**
* Timeslice
* Timeslice
*/
*/
jam
();
signal
->
theData
[
0
]
=
1
;
signal
->
theData
[
0
]
=
1
;
return
;
return
;
case
-
1
:
case
-
1
:
ndbrequire
(
"NOT YET IMPLEMENTED"
==
0
);
ndbrequire
(
"NOT YET IMPLEMENTED"
==
0
);
break
;
break
;
default:
jam
();
}
}
get_page
=
true
;
get_page
=
true
;
...
@@ -579,6 +610,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
...
@@ -579,6 +610,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
if
(
regOperPtr
.
p
->
op_struct
.
m_wait_log_buffer
)
if
(
regOperPtr
.
p
->
op_struct
.
m_wait_log_buffer
)
{
{
jam
();
ndbassert
(
regOperPtr
.
p
->
is_first_operation
()
&&
ndbassert
(
regOperPtr
.
p
->
is_first_operation
()
&&
regOperPtr
.
p
->
is_last_operation
());
regOperPtr
.
p
->
is_last_operation
());
...
@@ -590,18 +622,23 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
...
@@ -590,18 +622,23 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
Logfile_client
lgman
(
this
,
c_lgman
,
regFragPtr
.
p
->
m_logfile_group_id
);
Logfile_client
lgman
(
this
,
c_lgman
,
regFragPtr
.
p
->
m_logfile_group_id
);
int
res
=
lgman
.
get_log_buffer
(
signal
,
sz
,
&
cb
);
int
res
=
lgman
.
get_log_buffer
(
signal
,
sz
,
&
cb
);
jamEntry
();
switch
(
res
){
switch
(
res
){
case
0
:
case
0
:
jam
();
signal
->
theData
[
0
]
=
1
;
signal
->
theData
[
0
]
=
1
;
return
;
return
;
case
-
1
:
case
-
1
:
ndbrequire
(
"NOT YET IMPLEMENTED"
==
0
);
ndbrequire
(
"NOT YET IMPLEMENTED"
==
0
);
break
;
break
;
default:
jam
();
}
}
}
}
if
(
!
tuple_ptr
)
if
(
!
tuple_ptr
)
{
{
jam
();
tuple_ptr
=
(
Tuple_header
*
)
tuple_ptr
=
(
Tuple_header
*
)
get_ptr
(
&
page
,
&
regOperPtr
.
p
->
m_tuple_location
,
regTabPtr
.
p
);
get_ptr
(
&
page
,
&
regOperPtr
.
p
->
m_tuple_location
,
regTabPtr
.
p
);
}
}
...
@@ -610,6 +647,7 @@ skip_disk:
...
@@ -610,6 +647,7 @@ skip_disk:
if
(
get_tuple_state
(
regOperPtr
.
p
)
==
TUPLE_PREPARED
)
if
(
get_tuple_state
(
regOperPtr
.
p
)
==
TUPLE_PREPARED
)
{
{
jam
();
/**
/**
* Execute all tux triggers at first commit
* Execute all tux triggers at first commit
* since previous tuple is otherwise removed...
* since previous tuple is otherwise removed...
...
@@ -635,6 +673,7 @@ skip_disk:
...
@@ -635,6 +673,7 @@ skip_disk:
if
(
regOperPtr
.
p
->
is_last_operation
())
if
(
regOperPtr
.
p
->
is_last_operation
())
{
{
jam
();
/**
/**
* Perform "real" commit
* Perform "real" commit
*/
*/
...
@@ -645,12 +684,14 @@ skip_disk:
...
@@ -645,12 +684,14 @@ skip_disk:
if
(
regOperPtr
.
p
->
op_struct
.
op_type
!=
ZDELETE
)
if
(
regOperPtr
.
p
->
op_struct
.
op_type
!=
ZDELETE
)
{
{
jam
();
commit_operation
(
signal
,
gci
,
tuple_ptr
,
page
,
commit_operation
(
signal
,
gci
,
tuple_ptr
,
page
,
regOperPtr
.
p
,
regFragPtr
.
p
,
regTabPtr
.
p
);
regOperPtr
.
p
,
regFragPtr
.
p
,
regTabPtr
.
p
);
removeActiveOpList
(
regOperPtr
.
p
,
tuple_ptr
);
removeActiveOpList
(
regOperPtr
.
p
,
tuple_ptr
);
}
}
else
else
{
{
jam
();
removeActiveOpList
(
regOperPtr
.
p
,
tuple_ptr
);
removeActiveOpList
(
regOperPtr
.
p
,
tuple_ptr
);
if
(
get_page
)
if
(
get_page
)
ndbassert
(
tuple_ptr
->
m_header_bits
&
Tuple_header
::
DISK_PART
);
ndbassert
(
tuple_ptr
->
m_header_bits
&
Tuple_header
::
DISK_PART
);
...
@@ -660,6 +701,7 @@ skip_disk:
...
@@ -660,6 +701,7 @@ skip_disk:
}
}
else
else
{
{
jam
();
removeActiveOpList
(
regOperPtr
.
p
,
tuple_ptr
);
removeActiveOpList
(
regOperPtr
.
p
,
tuple_ptr
);
}
}
...
...
storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp
View file @
45aaaac8
...
@@ -282,6 +282,7 @@ Dbtup::update_extent_pos(Disk_alloc_info& alloc,
...
@@ -282,6 +282,7 @@ Dbtup::update_extent_pos(Disk_alloc_info& alloc,
void
void
Dbtup
::
restart_setup_page
(
Disk_alloc_info
&
alloc
,
PagePtr
pagePtr
)
Dbtup
::
restart_setup_page
(
Disk_alloc_info
&
alloc
,
PagePtr
pagePtr
)
{
{
jam
();
/**
/**
* Link to extent, clear uncommitted_used_space
* Link to extent, clear uncommitted_used_space
*/
*/
...
@@ -302,6 +303,7 @@ Dbtup::restart_setup_page(Disk_alloc_info& alloc, PagePtr pagePtr)
...
@@ -302,6 +303,7 @@ Dbtup::restart_setup_page(Disk_alloc_info& alloc, PagePtr pagePtr)
ddassert
(
real_free
>=
estimated
);
ddassert
(
real_free
>=
estimated
);
if
(
real_free
!=
estimated
)
if
(
real_free
!=
estimated
)
{
{
jam
();
extentPtr
.
p
->
m_free_space
+=
(
real_free
-
estimated
);
extentPtr
.
p
->
m_free_space
+=
(
real_free
-
estimated
);
update_extent_pos
(
alloc
,
extentPtr
);
update_extent_pos
(
alloc
,
extentPtr
);
}
}
...
@@ -374,6 +376,7 @@ Dbtup::disk_page_prealloc(Signal* signal,
...
@@ -374,6 +376,7 @@ Dbtup::disk_page_prealloc(Signal* signal,
key
->
m_file_no
=
tmp
.
p
->
m_file_no
;
key
->
m_file_no
=
tmp
.
p
->
m_file_no
;
if
(
DBG_DISK
)
if
(
DBG_DISK
)
ndbout
<<
" found dirty page "
<<
*
key
<<
endl
;
ndbout
<<
" found dirty page "
<<
*
key
<<
endl
;
jam
();
return
0
;
// Page in memory
return
0
;
// Page in memory
}
}
}
}
...
@@ -395,6 +398,7 @@ Dbtup::disk_page_prealloc(Signal* signal,
...
@@ -395,6 +398,7 @@ Dbtup::disk_page_prealloc(Signal* signal,
*
key
=
req
.
p
->
m_key
;
*
key
=
req
.
p
->
m_key
;
if
(
DBG_DISK
)
if
(
DBG_DISK
)
ndbout
<<
" found transit page "
<<
*
key
<<
endl
;
ndbout
<<
" found transit page "
<<
*
key
<<
endl
;
jam
();
return
0
;
return
0
;
}
}
}
}
...
@@ -404,6 +408,7 @@ Dbtup::disk_page_prealloc(Signal* signal,
...
@@ -404,6 +408,7 @@ Dbtup::disk_page_prealloc(Signal* signal,
*/
*/
if
(
!
c_page_request_pool
.
seize
(
req
))
if
(
!
c_page_request_pool
.
seize
(
req
))
{
{
jam
();
err
=
1
;
err
=
1
;
//XXX set error code
//XXX set error code
ndbout_c
(
"no free request"
);
ndbout_c
(
"no free request"
);
...
@@ -468,6 +473,7 @@ Dbtup::disk_page_prealloc(Signal* signal,
...
@@ -468,6 +473,7 @@ Dbtup::disk_page_prealloc(Signal* signal,
err
=
c_lgman
->
alloc_log_space
(
logfile_group_id
,
err
=
c_lgman
->
alloc_log_space
(
logfile_group_id
,
sizeof
(
Disk_undo
::
AllocExtent
)
>>
2
);
sizeof
(
Disk_undo
::
AllocExtent
)
>>
2
);
jamEntry
();
if
(
unlikely
(
err
))
if
(
unlikely
(
err
))
{
{
return
-
err
;
return
-
err
;
...
@@ -568,6 +574,7 @@ Dbtup::disk_page_prealloc(Signal* signal,
...
@@ -568,6 +574,7 @@ Dbtup::disk_page_prealloc(Signal* signal,
Uint32
newPageBits
=
alloc
.
calc_page_free_bits
(
new_size
);
Uint32
newPageBits
=
alloc
.
calc_page_free_bits
(
new_size
);
if
(
newPageBits
!=
(
Uint32
)
pageBits
)
if
(
newPageBits
!=
(
Uint32
)
pageBits
)
{
{
jam
();
ddassert
(
ext
.
p
->
m_free_page_count
[
pageBits
]
>
0
);
ddassert
(
ext
.
p
->
m_free_page_count
[
pageBits
]
>
0
);
ext
.
p
->
m_free_page_count
[
pageBits
]
--
;
ext
.
p
->
m_free_page_count
[
pageBits
]
--
;
ext
.
p
->
m_free_page_count
[
newPageBits
]
++
;
ext
.
p
->
m_free_page_count
[
newPageBits
]
++
;
...
@@ -595,6 +602,7 @@ Dbtup::disk_page_prealloc(Signal* signal,
...
@@ -595,6 +602,7 @@ Dbtup::disk_page_prealloc(Signal* signal,
int
flags
=
Page_cache_client
::
ALLOC_REQ
;
int
flags
=
Page_cache_client
::
ALLOC_REQ
;
if
(
pageBits
==
0
)
if
(
pageBits
==
0
)
{
{
jam
();
//XXX empty page -> fast to map
//XXX empty page -> fast to map
flags
|=
Page_cache_client
::
EMPTY_PAGE
;
flags
|=
Page_cache_client
::
EMPTY_PAGE
;
preq
.
m_callback
.
m_callbackFunction
=
preq
.
m_callback
.
m_callbackFunction
=
...
@@ -606,11 +614,13 @@ Dbtup::disk_page_prealloc(Signal* signal,
...
@@ -606,11 +614,13 @@ Dbtup::disk_page_prealloc(Signal* signal,
switch
(
res
)
switch
(
res
)
{
{
case
0
:
case
0
:
jam
();
break
;
break
;
case
-
1
:
case
-
1
:
ndbassert
(
false
);
ndbassert
(
false
);
break
;
break
;
default:
default:
jam
();
execute
(
signal
,
preq
.
m_callback
,
res
);
// run callback
execute
(
signal
,
preq
.
m_callback
,
res
);
// run callback
}
}
...
@@ -622,6 +632,7 @@ Dbtup::disk_page_prealloc_dirty_page(Disk_alloc_info & alloc,
...
@@ -622,6 +632,7 @@ Dbtup::disk_page_prealloc_dirty_page(Disk_alloc_info & alloc,
PagePtr
pagePtr
,
PagePtr
pagePtr
,
Uint32
old_idx
,
Uint32
sz
)
Uint32
old_idx
,
Uint32
sz
)
{
{
jam
();
ddassert
(
pagePtr
.
p
->
list_index
==
old_idx
);
ddassert
(
pagePtr
.
p
->
list_index
==
old_idx
);
Uint32
free
=
pagePtr
.
p
->
free_space
;
Uint32
free
=
pagePtr
.
p
->
free_space
;
...
@@ -637,6 +648,7 @@ Dbtup::disk_page_prealloc_dirty_page(Disk_alloc_info & alloc,
...
@@ -637,6 +648,7 @@ Dbtup::disk_page_prealloc_dirty_page(Disk_alloc_info & alloc,
if
(
old_idx
!=
new_idx
)
if
(
old_idx
!=
new_idx
)
{
{
jam
();
LocalDLList
<
Page
>
old_list
(
*
pool
,
alloc
.
m_dirty_pages
[
old_idx
]);
LocalDLList
<
Page
>
old_list
(
*
pool
,
alloc
.
m_dirty_pages
[
old_idx
]);
LocalDLList
<
Page
>
new_list
(
*
pool
,
alloc
.
m_dirty_pages
[
new_idx
]);
LocalDLList
<
Page
>
new_list
(
*
pool
,
alloc
.
m_dirty_pages
[
new_idx
]);
old_list
.
remove
(
pagePtr
);
old_list
.
remove
(
pagePtr
);
...
@@ -660,6 +672,7 @@ Dbtup::disk_page_prealloc_transit_page(Disk_alloc_info& alloc,
...
@@ -660,6 +672,7 @@ Dbtup::disk_page_prealloc_transit_page(Disk_alloc_info& alloc,
Ptr
<
Page_request
>
req
,
Ptr
<
Page_request
>
req
,
Uint32
old_idx
,
Uint32
sz
)
Uint32
old_idx
,
Uint32
sz
)
{
{
jam
();
ddassert
(
req
.
p
->
m_list_index
==
old_idx
);
ddassert
(
req
.
p
->
m_list_index
==
old_idx
);
Uint32
free
=
req
.
p
->
m_estimated_free_space
;
Uint32
free
=
req
.
p
->
m_estimated_free_space
;
...
@@ -674,6 +687,7 @@ Dbtup::disk_page_prealloc_transit_page(Disk_alloc_info& alloc,
...
@@ -674,6 +687,7 @@ Dbtup::disk_page_prealloc_transit_page(Disk_alloc_info& alloc,
if
(
old_idx
!=
new_idx
)
if
(
old_idx
!=
new_idx
)
{
{
jam
();
Page_request_list
::
Head
*
lists
=
alloc
.
m_page_requests
;
Page_request_list
::
Head
*
lists
=
alloc
.
m_page_requests
;
Local_page_request_list
old_list
(
c_page_request_pool
,
lists
[
old_idx
]);
Local_page_request_list
old_list
(
c_page_request_pool
,
lists
[
old_idx
]);
Local_page_request_list
new_list
(
c_page_request_pool
,
lists
[
new_idx
]);
Local_page_request_list
new_list
(
c_page_request_pool
,
lists
[
new_idx
]);
...
@@ -698,6 +712,7 @@ void
...
@@ -698,6 +712,7 @@ void
Dbtup
::
disk_page_prealloc_callback
(
Signal
*
signal
,
Dbtup
::
disk_page_prealloc_callback
(
Signal
*
signal
,
Uint32
page_request
,
Uint32
page_id
)
Uint32
page_request
,
Uint32
page_id
)
{
{
jamEntry
();
//ndbout_c("disk_alloc_page_callback id: %d", page_id);
//ndbout_c("disk_alloc_page_callback id: %d", page_id);
Ptr
<
Page_request
>
req
;
Ptr
<
Page_request
>
req
;
...
@@ -727,6 +742,7 @@ Dbtup::disk_page_prealloc_initial_callback(Signal*signal,
...
@@ -727,6 +742,7 @@ Dbtup::disk_page_prealloc_initial_callback(Signal*signal,
Uint32
page_request
,
Uint32
page_request
,
Uint32
page_id
)
Uint32
page_id
)
{
{
jamEntry
();
//ndbout_c("disk_alloc_page_callback_initial id: %d", page_id);
//ndbout_c("disk_alloc_page_callback_initial id: %d", page_id);
/**
/**
* 1) lookup page request
* 1) lookup page request
...
@@ -818,6 +834,7 @@ Dbtup::disk_page_prealloc_callback_common(Signal* signal,
...
@@ -818,6 +834,7 @@ Dbtup::disk_page_prealloc_callback_common(Signal* signal,
if
(
old_idx
!=
new_idx
||
free
!=
real_free
)
if
(
old_idx
!=
new_idx
||
free
!=
real_free
)
{
{
jam
();
Ptr
<
Extent_info
>
extentPtr
;
Ptr
<
Extent_info
>
extentPtr
;
c_extent_pool
.
getPtr
(
extentPtr
,
ext
);
c_extent_pool
.
getPtr
(
extentPtr
,
ext
);
...
@@ -825,6 +842,7 @@ Dbtup::disk_page_prealloc_callback_common(Signal* signal,
...
@@ -825,6 +842,7 @@ Dbtup::disk_page_prealloc_callback_common(Signal* signal,
if
(
old_idx
!=
new_idx
)
if
(
old_idx
!=
new_idx
)
{
{
jam
();
ddassert
(
extentPtr
.
p
->
m_free_page_count
[
old_idx
]);
ddassert
(
extentPtr
.
p
->
m_free_page_count
[
old_idx
]);
extentPtr
.
p
->
m_free_page_count
[
old_idx
]
--
;
extentPtr
.
p
->
m_free_page_count
[
old_idx
]
--
;
extentPtr
.
p
->
m_free_page_count
[
new_idx
]
++
;
extentPtr
.
p
->
m_free_page_count
[
new_idx
]
++
;
...
@@ -843,9 +861,11 @@ Dbtup::disk_page_prealloc_callback_common(Signal* signal,
...
@@ -843,9 +861,11 @@ Dbtup::disk_page_prealloc_callback_common(Signal* signal,
void
void
Dbtup
::
disk_page_set_dirty
(
PagePtr
pagePtr
)
Dbtup
::
disk_page_set_dirty
(
PagePtr
pagePtr
)
{
{
jam
();
Uint32
idx
=
pagePtr
.
p
->
list_index
;
Uint32
idx
=
pagePtr
.
p
->
list_index
;
if
((
idx
&
0x8000
)
==
0
)
if
((
idx
&
0x8000
)
==
0
)
{
{
jam
();
/**
/**
* Already in dirty list
* Already in dirty list
*/
*/
...
@@ -874,7 +894,6 @@ Dbtup::disk_page_set_dirty(PagePtr pagePtr)
...
@@ -874,7 +894,6 @@ Dbtup::disk_page_set_dirty(PagePtr pagePtr)
Uint32
used
=
pagePtr
.
p
->
uncommitted_used_space
;
Uint32
used
=
pagePtr
.
p
->
uncommitted_used_space
;
if
(
unlikely
(
pagePtr
.
p
->
m_restart_seq
!=
globalData
.
m_restart_seq
))
if
(
unlikely
(
pagePtr
.
p
->
m_restart_seq
!=
globalData
.
m_restart_seq
))
{
{
jam
();
restart_setup_page
(
alloc
,
pagePtr
);
restart_setup_page
(
alloc
,
pagePtr
);
idx
=
alloc
.
calc_page_free_bits
(
free
);
idx
=
alloc
.
calc_page_free_bits
(
free
);
used
=
0
;
used
=
0
;
...
@@ -918,6 +937,7 @@ Dbtup::disk_page_unmap_callback(Uint32 when,
...
@@ -918,6 +937,7 @@ Dbtup::disk_page_unmap_callback(Uint32 when,
type
!=
File_formats
::
PT_Tup_varsize_page
)
||
type
!=
File_formats
::
PT_Tup_varsize_page
)
||
f_undo_done
==
false
))
f_undo_done
==
false
))
{
{
jam
();
return
;
return
;
}
}
...
@@ -1014,6 +1034,7 @@ Dbtup::disk_page_unmap_callback(Uint32 when,
...
@@ -1014,6 +1034,7 @@ Dbtup::disk_page_unmap_callback(Uint32 when,
<<
endl
;
<<
endl
;
}
}
tsman
.
update_page_free_bits
(
&
key
,
alloc
.
calc_page_free_bits
(
real_free
));
tsman
.
update_page_free_bits
(
&
key
,
alloc
.
calc_page_free_bits
(
real_free
));
jamEntry
();
}
}
}
}
...
@@ -1022,6 +1043,7 @@ Dbtup::disk_page_alloc(Signal* signal,
...
@@ -1022,6 +1043,7 @@ Dbtup::disk_page_alloc(Signal* signal,
Tablerec
*
tabPtrP
,
Fragrecord
*
fragPtrP
,
Tablerec
*
tabPtrP
,
Fragrecord
*
fragPtrP
,
Local_key
*
key
,
PagePtr
pagePtr
,
Uint32
gci
)
Local_key
*
key
,
PagePtr
pagePtr
,
Uint32
gci
)
{
{
jam
();
Uint32
logfile_group_id
=
fragPtrP
->
m_logfile_group_id
;
Uint32
logfile_group_id
=
fragPtrP
->
m_logfile_group_id
;
Disk_alloc_info
&
alloc
=
fragPtrP
->
m_disk_alloc_info
;
Disk_alloc_info
&
alloc
=
fragPtrP
->
m_disk_alloc_info
;
...
@@ -1050,6 +1072,7 @@ Dbtup::disk_page_free(Signal *signal,
...
@@ -1050,6 +1072,7 @@ Dbtup::disk_page_free(Signal *signal,
Tablerec
*
tabPtrP
,
Fragrecord
*
fragPtrP
,
Tablerec
*
tabPtrP
,
Fragrecord
*
fragPtrP
,
Local_key
*
key
,
PagePtr
pagePtr
,
Uint32
gci
)
Local_key
*
key
,
PagePtr
pagePtr
,
Uint32
gci
)
{
{
jam
();
if
(
DBG_DISK
)
if
(
DBG_DISK
)
ndbout
<<
" disk_page_free "
<<
*
key
<<
endl
;
ndbout
<<
" disk_page_free "
<<
*
key
<<
endl
;
...
@@ -1100,6 +1123,7 @@ Dbtup::disk_page_free(Signal *signal,
...
@@ -1100,6 +1123,7 @@ Dbtup::disk_page_free(Signal *signal,
if
(
old_idx
!=
new_idx
)
if
(
old_idx
!=
new_idx
)
{
{
jam
();
ddassert
(
extentPtr
.
p
->
m_free_page_count
[
old_idx
]);
ddassert
(
extentPtr
.
p
->
m_free_page_count
[
old_idx
]);
extentPtr
.
p
->
m_free_page_count
[
old_idx
]
--
;
extentPtr
.
p
->
m_free_page_count
[
old_idx
]
--
;
extentPtr
.
p
->
m_free_page_count
[
new_idx
]
++
;
extentPtr
.
p
->
m_free_page_count
[
new_idx
]
++
;
...
@@ -1126,6 +1150,7 @@ void
...
@@ -1126,6 +1150,7 @@ void
Dbtup
::
disk_page_abort_prealloc
(
Signal
*
signal
,
Fragrecord
*
fragPtrP
,
Dbtup
::
disk_page_abort_prealloc
(
Signal
*
signal
,
Fragrecord
*
fragPtrP
,
Local_key
*
key
,
Uint32
sz
)
Local_key
*
key
,
Uint32
sz
)
{
{
jam
();
Page_cache_client
::
Request
req
;
Page_cache_client
::
Request
req
;
req
.
m_callback
.
m_callbackData
=
sz
;
req
.
m_callback
.
m_callbackData
=
sz
;
req
.
m_callback
.
m_callbackFunction
=
req
.
m_callback
.
m_callbackFunction
=
...
@@ -1139,9 +1164,13 @@ Dbtup::disk_page_abort_prealloc(Signal *signal, Fragrecord* fragPtrP,
...
@@ -1139,9 +1164,13 @@ Dbtup::disk_page_abort_prealloc(Signal *signal, Fragrecord* fragPtrP,
switch
(
res
)
switch
(
res
)
{
{
case
0
:
case
0
:
jam
();
break
;
case
-
1
:
case
-
1
:
ndbrequire
(
false
);
break
;
break
;
default:
default:
jam
();
Ptr
<
GlobalPage
>
gpage
;
Ptr
<
GlobalPage
>
gpage
;
m_global_page_pool
.
getPtr
(
gpage
,
(
Uint32
)
res
);
m_global_page_pool
.
getPtr
(
gpage
,
(
Uint32
)
res
);
PagePtr
pagePtr
;
PagePtr
pagePtr
;
...
@@ -1157,7 +1186,7 @@ Dbtup::disk_page_abort_prealloc_callback(Signal* signal,
...
@@ -1157,7 +1186,7 @@ Dbtup::disk_page_abort_prealloc_callback(Signal* signal,
Uint32
sz
,
Uint32
page_id
)
Uint32
sz
,
Uint32
page_id
)
{
{
//ndbout_c("disk_alloc_page_callback id: %d", page_id);
//ndbout_c("disk_alloc_page_callback id: %d", page_id);
jamEntry
();
Ptr
<
GlobalPage
>
gpage
;
Ptr
<
GlobalPage
>
gpage
;
m_global_page_pool
.
getPtr
(
gpage
,
page_id
);
m_global_page_pool
.
getPtr
(
gpage
,
page_id
);
...
@@ -1200,12 +1229,14 @@ Dbtup::disk_page_abort_prealloc_callback_1(Signal* signal,
...
@@ -1200,12 +1229,14 @@ Dbtup::disk_page_abort_prealloc_callback_1(Signal* signal,
c_extent_pool
.
getPtr
(
extentPtr
,
ext
);
c_extent_pool
.
getPtr
(
extentPtr
,
ext
);
if
(
old_idx
!=
new_idx
)
if
(
old_idx
!=
new_idx
)
{
{
jam
();
ddassert
(
extentPtr
.
p
->
m_free_page_count
[
old_idx
]);
ddassert
(
extentPtr
.
p
->
m_free_page_count
[
old_idx
]);
extentPtr
.
p
->
m_free_page_count
[
old_idx
]
--
;
extentPtr
.
p
->
m_free_page_count
[
old_idx
]
--
;
extentPtr
.
p
->
m_free_page_count
[
new_idx
]
++
;
extentPtr
.
p
->
m_free_page_count
[
new_idx
]
++
;
if
(
old_idx
==
page_idx
)
if
(
old_idx
==
page_idx
)
{
{
jam
();
ArrayPool
<
Page
>
*
pool
=
(
ArrayPool
<
Page
>*
)
&
m_global_page_pool
;
ArrayPool
<
Page
>
*
pool
=
(
ArrayPool
<
Page
>*
)
&
m_global_page_pool
;
LocalDLList
<
Page
>
old_list
(
*
pool
,
alloc
.
m_dirty_pages
[
old_idx
]);
LocalDLList
<
Page
>
old_list
(
*
pool
,
alloc
.
m_dirty_pages
[
old_idx
]);
LocalDLList
<
Page
>
new_list
(
*
pool
,
alloc
.
m_dirty_pages
[
new_idx
]);
LocalDLList
<
Page
>
new_list
(
*
pool
,
alloc
.
m_dirty_pages
[
new_idx
]);
...
@@ -1215,6 +1246,7 @@ Dbtup::disk_page_abort_prealloc_callback_1(Signal* signal,
...
@@ -1215,6 +1246,7 @@ Dbtup::disk_page_abort_prealloc_callback_1(Signal* signal,
}
}
else
else
{
{
jam
();
pagePtr
.
p
->
list_index
=
new_idx
|
0x8000
;
pagePtr
.
p
->
list_index
=
new_idx
|
0x8000
;
}
}
}
}
...
@@ -1272,6 +1304,7 @@ Uint64
...
@@ -1272,6 +1304,7 @@ Uint64
Dbtup
::
disk_page_undo_alloc
(
Page
*
page
,
const
Local_key
*
key
,
Dbtup
::
disk_page_undo_alloc
(
Page
*
page
,
const
Local_key
*
key
,
Uint32
sz
,
Uint32
gci
,
Uint32
logfile_group_id
)
Uint32
sz
,
Uint32
gci
,
Uint32
logfile_group_id
)
{
{
jam
();
Logfile_client
lgman
(
this
,
c_lgman
,
logfile_group_id
);
Logfile_client
lgman
(
this
,
c_lgman
,
logfile_group_id
);
Disk_undo
::
Alloc
alloc
;
Disk_undo
::
Alloc
alloc
;
...
@@ -1293,6 +1326,7 @@ Dbtup::disk_page_undo_update(Page* page, const Local_key* key,
...
@@ -1293,6 +1326,7 @@ Dbtup::disk_page_undo_update(Page* page, const Local_key* key,
const
Uint32
*
src
,
Uint32
sz
,
const
Uint32
*
src
,
Uint32
sz
,
Uint32
gci
,
Uint32
logfile_group_id
)
Uint32
gci
,
Uint32
logfile_group_id
)
{
{
jam
();
Logfile_client
lgman
(
this
,
c_lgman
,
logfile_group_id
);
Logfile_client
lgman
(
this
,
c_lgman
,
logfile_group_id
);
Disk_undo
::
Update
update
;
Disk_undo
::
Update
update
;
...
@@ -1323,6 +1357,7 @@ Dbtup::disk_page_undo_free(Page* page, const Local_key* key,
...
@@ -1323,6 +1357,7 @@ Dbtup::disk_page_undo_free(Page* page, const Local_key* key,
const
Uint32
*
src
,
Uint32
sz
,
const
Uint32
*
src
,
Uint32
sz
,
Uint32
gci
,
Uint32
logfile_group_id
)
Uint32
gci
,
Uint32
logfile_group_id
)
{
{
jam
();
Logfile_client
lgman
(
this
,
c_lgman
,
logfile_group_id
);
Logfile_client
lgman
(
this
,
c_lgman
,
logfile_group_id
);
Disk_undo
::
Free
free
;
Disk_undo
::
Free
free
;
...
...
storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
View file @
45aaaac8
...
@@ -54,7 +54,14 @@ Dbtup::execACC_SCANREQ(Signal* signal)
...
@@ -54,7 +54,14 @@ Dbtup::execACC_SCANREQ(Signal* signal)
// flags
// flags
Uint32
bits
=
0
;
Uint32
bits
=
0
;
if
(
!
AccScanReq
::
getLcpScanFlag
(
req
->
requestInfo
))
if
(
AccScanReq
::
getLcpScanFlag
(
req
->
requestInfo
))
{
jam
();
bits
|=
ScanOp
::
SCAN_LCP
;
c_scanOpPool
.
getPtr
(
scanPtr
,
c_lcp_scan_op
);
}
else
{
{
// seize from pool and link to per-fragment list
// seize from pool and link to per-fragment list
LocalDLList
<
ScanOp
>
list
(
c_scanOpPool
,
frag
.
m_scanList
);
LocalDLList
<
ScanOp
>
list
(
c_scanOpPool
,
frag
.
m_scanList
);
...
@@ -62,37 +69,26 @@ Dbtup::execACC_SCANREQ(Signal* signal)
...
@@ -62,37 +69,26 @@ Dbtup::execACC_SCANREQ(Signal* signal)
jam
();
jam
();
break
;
break
;
}
}
}
if
(
!
AccScanReq
::
getNoDiskScanFlag
(
req
->
requestInfo
)
&&
tablePtr
.
p
->
m_no_of_disk_attributes
)
if
(
!
AccScanReq
::
getNoDiskScanFlag
(
req
->
requestInfo
)
{
&&
tablePtr
.
p
->
m_no_of_disk_attributes
)
bits
|=
ScanOp
::
SCAN_DD
;
{
}
bits
|=
ScanOp
::
SCAN_DD
;
}
bool
mm
=
(
bits
&
ScanOp
::
SCAN_DD
);
if
(
tablePtr
.
p
->
m_attributes
[
mm
].
m_no_of_varsize
>
0
)
{
bool
mm
=
(
bits
&
ScanOp
::
SCAN_DD
);
bits
|=
ScanOp
::
SCAN_VS
;
if
(
tablePtr
.
p
->
m_attributes
[
mm
].
m_no_of_varsize
>
0
)
{
bits
|=
ScanOp
::
SCAN_VS
;
// disk pages have fixed page format
ndbrequire
(
!
(
bits
&
ScanOp
::
SCAN_DD
));
}
if
(
!
AccScanReq
::
getReadCommittedFlag
(
req
->
requestInfo
))
{
if
(
AccScanReq
::
getLockMode
(
req
->
requestInfo
)
==
0
)
bits
|=
ScanOp
::
SCAN_LOCK_SH
;
else
bits
|=
ScanOp
::
SCAN_LOCK_EX
;
}
}
else
{
jam
();
// LCP scan and disk
ndbrequire
(
frag
.
m_lcp_scan_op
==
c_lcp_scan_op
);
// disk pages have fixed page format
c_scanOpPool
.
getPtr
(
scanPtr
,
frag
.
m_lcp_scan_op
);
ndbrequire
(
!
(
bits
&
ScanOp
::
SCAN_DD
));
ndbrequire
(
scanPtr
.
p
->
m_fragPtrI
==
fragPtr
.
i
);
}
bits
|=
ScanOp
::
SCAN_LCP
;
if
(
!
AccScanReq
::
getReadCommittedFlag
(
req
->
requestInfo
))
{
if
(
tablePtr
.
p
->
m_attributes
[
MM
].
m_no_of_varsize
>
0
)
{
if
(
AccScanReq
::
getLockMode
(
req
->
requestInfo
)
==
0
)
bits
|=
ScanOp
::
SCAN_VS
;
bits
|=
ScanOp
::
SCAN_LOCK_SH
;
}
else
bits
|=
ScanOp
::
SCAN_LOCK_EX
;
}
}
if
(
AccScanReq
::
getNRScanFlag
(
req
->
requestInfo
))
if
(
AccScanReq
::
getNRScanFlag
(
req
->
requestInfo
))
...
@@ -112,6 +108,13 @@ Dbtup::execACC_SCANREQ(Signal* signal)
...
@@ -112,6 +108,13 @@ Dbtup::execACC_SCANREQ(Signal* signal)
jam
();
jam
();
scanPtr
.
p
->
m_endPage
=
RNIL
;
scanPtr
.
p
->
m_endPage
=
RNIL
;
}
}
if
(
AccScanReq
::
getLcpScanFlag
(
req
->
requestInfo
))
{
jam
();
ndbrequire
((
bits
&
ScanOp
::
SCAN_DD
)
==
0
);
ndbrequire
((
bits
&
ScanOp
::
SCAN_LOCK
)
==
0
);
}
// set up scan op
// set up scan op
new
(
scanPtr
.
p
)
ScanOp
();
new
(
scanPtr
.
p
)
ScanOp
();
...
@@ -1159,16 +1162,17 @@ Dbtup::releaseScanOp(ScanOpPtr& scanPtr)
...
@@ -1159,16 +1162,17 @@ Dbtup::releaseScanOp(ScanOpPtr& scanPtr)
fragPtr
.
i
=
scanPtr
.
p
->
m_fragPtrI
;
fragPtr
.
i
=
scanPtr
.
p
->
m_fragPtrI
;
ptrCheckGuard
(
fragPtr
,
cnoOfFragrec
,
fragrecord
);
ptrCheckGuard
(
fragPtr
,
cnoOfFragrec
,
fragrecord
);
if
(
!
(
scanPtr
.
p
->
m_bits
&
ScanOp
::
SCAN_LCP
)
)
if
(
scanPtr
.
p
->
m_bits
&
ScanOp
::
SCAN_LCP
)
{
{
LocalDLList
<
ScanOp
>
list
(
c_scanOpPool
,
fragPtr
.
p
->
m_scanList
);
jam
();
list
.
release
(
scanPtr
);
fragPtr
.
p
->
m_lcp_scan_op
=
RNIL
;
scanPtr
.
p
->
m_fragPtrI
=
RNIL
;
}
}
else
else
{
{
ndbrequire
(
fragPtr
.
p
->
m_lcp_scan_op
==
scanPtr
.
i
);
jam
(
);
fragPtr
.
p
->
m_lcp_scan_op
=
RNIL
;
LocalDLList
<
ScanOp
>
list
(
c_scanOpPool
,
fragPtr
.
p
->
m_scanList
);
scanPtr
.
p
->
m_fragPtrI
=
RNIL
;
list
.
release
(
scanPtr
)
;
}
}
}
}
...
@@ -1181,21 +1185,24 @@ Dbtup::execLCP_FRAG_ORD(Signal* signal)
...
@@ -1181,21 +1185,24 @@ Dbtup::execLCP_FRAG_ORD(Signal* signal)
tablePtr
.
i
=
req
->
tableId
;
tablePtr
.
i
=
req
->
tableId
;
ptrCheckGuard
(
tablePtr
,
cnoOfTablerec
,
tablerec
);
ptrCheckGuard
(
tablePtr
,
cnoOfTablerec
,
tablerec
);
jam
();
if
(
tablePtr
.
p
->
m_no_of_disk_attributes
)
FragrecordPtr
fragPtr
;
{
Uint32
fragId
=
req
->
fragmentId
;
jam
();
fragPtr
.
i
=
RNIL
;
FragrecordPtr
fragPtr
;
getFragmentrec
(
fragPtr
,
fragId
,
tablePtr
.
p
);
Uint32
fragId
=
req
->
fragmentId
;
ndbrequire
(
fragPtr
.
i
!=
RNIL
);
fragPtr
.
i
=
RNIL
;
Fragrecord
&
frag
=
*
fragPtr
.
p
;
getFragmentrec
(
fragPtr
,
fragId
,
tablePtr
.
p
);
ndbrequire
(
fragPtr
.
i
!=
RNIL
);
ndbrequire
(
frag
.
m_lcp_scan_op
==
RNIL
&&
c_lcp_scan_op
!=
RNIL
);
Fragrecord
&
frag
=
*
fragPtr
.
p
;
frag
.
m_lcp_scan_op
=
c_lcp_scan_op
;
ScanOpPtr
scanPtr
;
ndbrequire
(
frag
.
m_lcp_scan_op
==
RNIL
&&
c_lcp_scan_op
!=
RNIL
);
c_scanOpPool
.
getPtr
(
scanPtr
,
frag
.
m_lcp_scan_op
);
frag
.
m_lcp_scan_op
=
c_lcp_scan_op
;
ndbrequire
(
scanPtr
.
p
->
m_fragPtrI
==
RNIL
);
ScanOpPtr
scanPtr
;
scanPtr
.
p
->
m_fragPtrI
=
fragPtr
.
i
;
c_scanOpPool
.
getPtr
(
scanPtr
,
frag
.
m_lcp_scan_op
);
ndbrequire
(
scanPtr
.
p
->
m_fragPtrI
==
RNIL
);
scanFirst
(
signal
,
scanPtr
);
scanPtr
.
p
->
m_fragPtrI
=
fragPtr
.
i
;
scanPtr
.
p
->
m_state
=
ScanOp
::
First
;
scanFirst
(
signal
,
scanPtr
);
scanPtr
.
p
->
m_state
=
ScanOp
::
First
;
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment