Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
MariaDB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
MariaDB
Commits
0c722c90
Commit
0c722c90
authored
Jun 11, 2004
by
unknown
Browse files
Options
Browse Files
Download
Plain Diff
Merge mysql.com:/home/jonas/src/mysql-4.1
into mysql.com:/home/jonas/src/mysql-4.1-ndb
parents
2e8e6bd6
0f464561
Changes
15
Hide whitespace changes
Inline
Side-by-side
Showing
15 changed files
with
1907 additions
and
1432 deletions
+1907
-1432
ndb/include/kernel/signaldata/TcKeyConf.hpp
ndb/include/kernel/signaldata/TcKeyConf.hpp
+1
-1
ndb/include/util/Parser.hpp
ndb/include/util/Parser.hpp
+1
-0
ndb/src/kernel/blocks/backup/restore/Makefile.am
ndb/src/kernel/blocks/backup/restore/Makefile.am
+1
-1
ndb/src/kernel/blocks/backup/restore/Makefile_old
ndb/src/kernel/blocks/backup/restore/Makefile_old
+0
-20
ndb/src/kernel/blocks/backup/restore/Restore.cpp
ndb/src/kernel/blocks/backup/restore/Restore.cpp
+278
-146
ndb/src/kernel/blocks/backup/restore/Restore.hpp
ndb/src/kernel/blocks/backup/restore/Restore.hpp
+82
-40
ndb/src/kernel/blocks/backup/restore/consumer.cpp
ndb/src/kernel/blocks/backup/restore/consumer.cpp
+107
-0
ndb/src/kernel/blocks/backup/restore/consumer.hpp
ndb/src/kernel/blocks/backup/restore/consumer.hpp
+34
-0
ndb/src/kernel/blocks/backup/restore/consumer_printer.cpp
ndb/src/kernel/blocks/backup/restore/consumer_printer.cpp
+55
-0
ndb/src/kernel/blocks/backup/restore/consumer_printer.hpp
ndb/src/kernel/blocks/backup/restore/consumer_printer.hpp
+50
-0
ndb/src/kernel/blocks/backup/restore/consumer_restore.cpp
ndb/src/kernel/blocks/backup/restore/consumer_restore.cpp
+516
-0
ndb/src/kernel/blocks/backup/restore/consumer_restore.hpp
ndb/src/kernel/blocks/backup/restore/consumer_restore.hpp
+75
-0
ndb/src/kernel/blocks/backup/restore/consumer_restorem.cpp
ndb/src/kernel/blocks/backup/restore/consumer_restorem.cpp
+652
-0
ndb/src/kernel/blocks/backup/restore/main.cpp
ndb/src/kernel/blocks/backup/restore/main.cpp
+55
-1096
ndb/src/kernel/blocks/backup/restore/myVector.hpp
ndb/src/kernel/blocks/backup/restore/myVector.hpp
+0
-128
No files found.
ndb/include/kernel/signaldata/TcKeyConf.hpp
View file @
0c722c90
...
...
@@ -111,7 +111,7 @@ inline
void
TcKeyConf
::
setNoOfOperations
(
Uint32
&
confInfo
,
Uint32
noOfOps
){
ASSERT_MAX
(
noOfOps
,
65535
,
"TcKeyConf::setNoOfOperations"
);
confInfo
|=
noOfOps
;
confInfo
=
(
confInfo
&
0xFFFF0000
)
|
noOfOps
;
}
inline
...
...
ndb/include/util/Parser.hpp
View file @
0c722c90
...
...
@@ -165,6 +165,7 @@ Parser<T>::Parser(const ParserRow<T> rows[], class InputStream & in,
template
<
class
T
>
inline
Parser
<
T
>::~
Parser
(){
delete
impl
;
}
template
<
class
T
>
...
...
ndb/src/kernel/blocks/backup/restore/Makefile.am
View file @
0c722c90
ndbtools_PROGRAMS
=
ndb_restore
ndb_restore_SOURCES
=
main.cpp Restore.cpp
ndb_restore_SOURCES
=
main.cpp
consumer.cpp consumer_restore.cpp consumer_printer.cpp
Restore.cpp
LDADD_LOC
=
$(top_builddir)
/ndb/src/libndbclient.la
...
...
ndb/src/kernel/blocks/backup/restore/Makefile_old
deleted
100644 → 0
View file @
2e8e6bd6
include .defs.mk
TYPE := *
BIN_TARGET := restore
BIN_TARGET_LIBS :=
BIN_TARGET_ARCHIVES := NDB_API
CCFLAGS_LOC = -I.. -I$(NDB_TOP)/src/ndbapi -I$(NDB_TOP)/include/ndbapi -I$(NDB_TOP)/include/util -I$(NDB_TOP)/include/portlib -I$(NDB_TOP)/include/kernel
#ifneq ($(MYSQLCLUSTER_TOP),)
#CCFLAGS_LOC +=-I$(MYSQLCLUSTER_TOP)/include -D USE_MYSQL
#LDFLAGS_LOC += -L$(MYSQLCLUSTER_TOP)/libmysql_r/ -lmysqlclient_r
#endif
SOURCES = main.cpp Restore.cpp
include $(NDB_TOP)/Epilogue.mk
ndb/src/kernel/blocks/backup/restore/Restore.cpp
View file @
0c722c90
...
...
@@ -33,32 +33,32 @@ Uint32 Twiddle32(Uint32 in); // Byte shift 32-bit data
Uint64
Twiddle64
(
Uint64
in
);
// Byte shift 64-bit data
bool
BackupFile
::
Twiddle
(
AttributeS
*
attr
,
Uint32
arraySize
){
BackupFile
::
Twiddle
(
const
AttributeDesc
*
attr_desc
,
AttributeData
*
attr_data
,
Uint32
arraySize
){
if
(
m_hostByteOrder
)
return
true
;
if
(
arraySize
==
0
){
arraySize
=
attr
->
D
esc
->
arraySize
;
arraySize
=
attr
_d
esc
->
arraySize
;
}
switch
(
attr
->
D
esc
->
size
){
switch
(
attr
_d
esc
->
size
){
case
8
:
return
true
;
case
16
:
for
(
unsigned
i
=
0
;
i
<
arraySize
;
i
++
){
attr
->
Data
.
u_int16_value
[
i
]
=
Twiddle16
(
attr
->
Data
.
u_int16_value
[
i
]);
attr
_data
->
u_int16_value
[
i
]
=
Twiddle16
(
attr_data
->
u_int16_value
[
i
]);
}
return
true
;
case
32
:
for
(
unsigned
i
=
0
;
i
<
arraySize
;
i
++
){
attr
->
Data
.
u_int32_value
[
i
]
=
Twiddle32
(
attr
->
Data
.
u_int32_value
[
i
]);
attr
_data
->
u_int32_value
[
i
]
=
Twiddle32
(
attr_data
->
u_int32_value
[
i
]);
}
return
true
;
case
64
:
for
(
unsigned
i
=
0
;
i
<
arraySize
;
i
++
){
attr
->
Data
.
u_int64_value
[
i
]
=
Twiddle64
(
attr
->
Data
.
u_int64_value
[
i
]);
attr
_data
->
u_int64_value
[
i
]
=
Twiddle64
(
attr_data
->
u_int64_value
[
i
]);
}
return
true
;
default:
...
...
@@ -82,14 +82,14 @@ RestoreMetaData::RestoreMetaData(const char* path, Uint32 nodeId, Uint32 bNo) {
}
RestoreMetaData
::~
RestoreMetaData
(){
for
(
int
i
=
0
;
i
<
allTables
.
size
();
i
++
)
for
(
Uint32
i
=
0
;
i
<
allTables
.
size
();
i
++
)
delete
allTables
[
i
];
allTables
.
clear
();
}
const
TableS
*
RestoreMetaData
::
getTable
(
Uint32
tableId
)
const
{
for
(
int
i
=
0
;
i
<
allTables
.
size
();
i
++
)
for
(
Uint32
i
=
0
;
i
<
allTables
.
size
();
i
++
)
if
(
allTables
[
i
]
->
getTableId
()
==
tableId
)
return
allTables
[
i
];
return
NULL
;
...
...
@@ -122,7 +122,8 @@ RestoreMetaData::readMetaTableList() {
Uint32
sectionInfo
[
2
];
if
(
fread
(
&
sectionInfo
,
sizeof
(
sectionInfo
),
1
,
m_file
)
!=
1
){
if
(
buffer_read
(
&
sectionInfo
,
sizeof
(
sectionInfo
),
1
)
!=
1
){
err
<<
"readMetaTableList read header error"
<<
endl
;
return
0
;
}
sectionInfo
[
0
]
=
ntohl
(
sectionInfo
[
0
]);
...
...
@@ -130,11 +131,9 @@ RestoreMetaData::readMetaTableList() {
const
Uint32
tabCount
=
sectionInfo
[
1
]
-
2
;
const
Uint32
len
=
4
*
tabCount
;
if
(
createBuffer
(
len
)
==
0
)
abort
();
if
(
fread
(
m_buffer
,
1
,
len
,
m_file
)
!=
len
){
void
*
tmp
;
if
(
buffer_get_ptr
(
&
tmp
,
4
,
tabCount
)
!=
tabCount
){
err
<<
"readMetaTableList read tabCount error"
<<
endl
;
return
0
;
}
...
...
@@ -147,7 +146,7 @@ RestoreMetaData::readMetaTableDesc() {
Uint32
sectionInfo
[
2
];
// Read section header
if
(
fread
(
&
sectionInfo
,
sizeof
(
sectionInfo
),
1
,
m_file
)
!=
1
){
if
(
buffer_read
(
&
sectionInfo
,
sizeof
(
sectionInfo
),
1
)
!=
1
){
err
<<
"readMetaTableDesc read header error"
<<
endl
;
return
false
;
}
// if
...
...
@@ -156,20 +155,15 @@ RestoreMetaData::readMetaTableDesc() {
assert
(
sectionInfo
[
0
]
==
BackupFormat
::
TABLE_DESCRIPTION
);
// Allocate temporary storage for dictTabInfo buffer
const
Uint32
len
=
(
sectionInfo
[
1
]
-
2
);
if
(
createBuffer
(
4
*
(
len
+
1
))
==
NULL
)
{
err
<<
"readMetaTableDesc allocation error"
<<
endl
;
return
false
;
}
// if
// Read dictTabInfo buffer
if
(
fread
(
m_buffer
,
4
,
len
,
m_file
)
!=
len
){
const
Uint32
len
=
(
sectionInfo
[
1
]
-
2
);
void
*
ptr
;
if
(
buffer_get_ptr
(
&
ptr
,
4
,
len
)
!=
len
){
err
<<
"readMetaTableDesc read error"
<<
endl
;
return
false
;
}
// if
return
parseTableDescriptor
(
m_buffe
r
,
len
);
return
parseTableDescriptor
(
(
Uint32
*
)
pt
r
,
len
);
}
bool
...
...
@@ -177,11 +171,10 @@ RestoreMetaData::readGCPEntry() {
Uint32
data
[
4
];
BackupFormat
::
CtlFile
::
GCPEntry
*
dst
=
(
BackupFormat
::
CtlFile
::
GCPEntry
*
)
&
data
[
0
];
if
(
fread
(
dst
,
4
,
4
,
m_file
)
!=
4
){
if
(
buffer_read
(
dst
,
4
,
4
)
!=
4
){
err
<<
"readGCPEntry read error"
<<
endl
;
return
false
;
}
...
...
@@ -208,10 +201,16 @@ TableS::TableS(NdbTableImpl* tableImpl)
m_dictTable
=
tableImpl
;
m_noOfNullable
=
m_nullBitmaskSize
=
0
;
for
(
Uint32
i
=
0
;
i
<
tableImpl
->
getNoOfColumns
();
i
++
)
for
(
int
i
=
0
;
i
<
tableImpl
->
getNoOfColumns
();
i
++
)
createAttr
(
tableImpl
->
getColumn
(
i
));
}
TableS
::~
TableS
()
{
for
(
Uint32
i
=
0
;
i
<
allAttributesDesc
.
size
();
i
++
)
delete
allAttributesDesc
[
i
];
}
// Parse dictTabInfo buffer and pushback to to vector storage
bool
RestoreMetaData
::
parseTableDescriptor
(
const
Uint32
*
data
,
Uint32
len
)
...
...
@@ -246,56 +245,68 @@ RestoreMetaData::parseTableDescriptor(const Uint32 * data, Uint32 len)
}
// Constructor
RestoreDataIterator
::
RestoreDataIterator
(
const
RestoreMetaData
&
md
)
:
m_metaData
(
md
)
RestoreDataIterator
::
RestoreDataIterator
(
const
RestoreMetaData
&
md
,
void
(
*
_free_data_callback
)()
)
:
BackupFile
(
_free_data_callback
),
m_metaData
(
md
)
{
debug
<<
"RestoreDataIterator constructor"
<<
endl
;
setDataFile
(
md
,
0
);
}
RestoreDataIterator
::~
RestoreDataIterator
(){
TupleS
&
TupleS
::
operator
=
(
const
TupleS
&
tuple
)
{
prepareRecord
(
*
tuple
.
m_currentTable
);
if
(
allAttrData
)
memcpy
(
allAttrData
,
tuple
.
allAttrData
,
getNoOfAttributes
()
*
sizeof
(
AttributeData
));
return
*
this
;
};
int
TupleS
::
getNoOfAttributes
()
const
{
if
(
m_currentTable
==
0
)
return
0
;
return
m_currentTable
->
getNoOfAttributes
();
};
const
TableS
*
TupleS
::
getTable
()
const
{
return
m_currentTable
;
};
const
AttributeDesc
*
TupleS
::
getDesc
(
int
i
)
const
{
return
m_currentTable
->
allAttributesDesc
[
i
];
}
AttributeData
*
TupleS
::
getData
(
int
i
)
const
{
return
&
(
allAttrData
[
i
]);
};
bool
TupleS
::
prepareRecord
(
const
TableS
&
tab
){
m_currentTable
=
&
tab
;
for
(
int
i
=
0
;
i
<
allAttributes
.
size
();
i
++
)
{
if
(
allAttributes
[
i
]
!=
NULL
)
delete
allAttributes
[
i
];
}
allAttributes
.
clear
();
AttributeS
*
a
;
for
(
int
i
=
0
;
i
<
tab
.
getNoOfAttributes
();
i
++
){
a
=
new
AttributeS
;
if
(
a
==
NULL
)
{
ndbout_c
(
"Restore: Failed to allocate memory"
);
return
false
;
if
(
allAttrData
)
{
if
(
getNoOfAttributes
()
==
tab
.
getNoOfAttributes
())
{
m_currentTable
=
&
tab
;
return
true
;
}
a
->
Desc
=
tab
[
i
]
;
allAttributes
.
push_back
(
a
)
;
delete
[]
allAttrData
;
m_currentTable
=
0
;
}
allAttrData
=
new
AttributeData
[
tab
.
getNoOfAttributes
()];
if
(
allAttrData
==
0
)
return
false
;
m_currentTable
=
&
tab
;
return
true
;
}
const
TupleS
*
RestoreDataIterator
::
getNextTuple
(
int
&
res
)
{
TupleS
*
tup
=
new
TupleS
();
if
(
tup
==
NULL
)
{
ndbout_c
(
"Restore: Failed to allocate memory"
);
res
=
-
1
;
return
NULL
;
}
if
(
!
tup
->
prepareRecord
(
*
m_currentTable
))
{
res
=-
1
;
return
NULL
;
}
RestoreDataIterator
::
getNextTuple
(
int
&
res
)
{
Uint32
dataLength
=
0
;
// Read record length
if
(
fread
(
&
dataLength
,
sizeof
(
dataLength
),
1
,
m_file
)
!=
1
){
if
(
buffer_read
(
&
dataLength
,
sizeof
(
dataLength
),
1
)
!=
1
){
err
<<
"getNextTuple:Error reading length of data part"
<<
endl
;
delete
tup
;
res
=
-
1
;
return
NULL
;
}
// if
...
...
@@ -309,34 +320,34 @@ RestoreDataIterator::getNextTuple(int & res) {
// End of this data fragment
debug
<<
"End of fragment"
<<
endl
;
res
=
0
;
delete
tup
;
return
NULL
;
}
// if
tup
->
createDataRecord
(
dataLenBytes
);
// Read tuple data
if
(
fread
(
tup
->
getDataRecord
(),
1
,
dataLenBytes
,
m_file
)
!=
dataLenBytes
)
{
void
*
_buf_ptr
;
if
(
buffer_get_ptr
(
&
_buf_ptr
,
1
,
dataLenBytes
)
!=
dataLenBytes
)
{
err
<<
"getNextTuple:Read error: "
<<
endl
;
delete
tup
;
res
=
-
1
;
return
NULL
;
}
Uint32
*
ptr
=
tup
->
getDataRecord
()
;
Uint32
*
buf_ptr
=
(
Uint32
*
)
_buf_ptr
,
*
ptr
=
buf_ptr
;
ptr
+=
m_currentTable
->
m_nullBitmaskSize
;
for
(
int
i
=
0
;
i
<
m_currentTable
->
m_fixedKeys
.
size
();
i
++
){
assert
(
ptr
<
tup
->
getDataRecord
()
+
dataLength
);
for
(
Uint32
i
=
0
;
i
<
m_currentTable
->
m_fixedKeys
.
size
();
i
++
){
assert
(
ptr
<
buf_ptr
+
dataLength
);
const
Uint32
attrId
=
m_currentTable
->
m_fixedKeys
[
i
]
->
attrId
;
AttributeS
*
attr
=
tup
->
allAttributes
[
attrId
];
const
Uint32
sz
=
attr
->
Desc
->
getSizeInWords
();
AttributeData
*
attr_data
=
m_tuple
.
getData
(
attrId
);
const
AttributeDesc
*
attr_desc
=
m_tuple
.
getDesc
(
attrId
);
attr
->
Data
.
null
=
false
;
attr
->
Data
.
void_value
=
ptr
;
const
Uint32
sz
=
attr_desc
->
getSizeInWords
();
if
(
!
Twiddle
(
attr
))
attr_data
->
null
=
false
;
attr_data
->
void_value
=
ptr
;
if
(
!
Twiddle
(
attr_desc
,
attr_data
))
{
res
=
-
1
;
return
NULL
;
...
...
@@ -344,18 +355,20 @@ RestoreDataIterator::getNextTuple(int & res) {
ptr
+=
sz
;
}
for
(
int
i
=
0
;
i
<
m_currentTable
->
m_fixedAttribs
.
size
();
i
++
){
assert
(
ptr
<
tup
->
getDataRecord
()
+
dataLength
);
for
(
Uint32
i
=
0
;
i
<
m_currentTable
->
m_fixedAttribs
.
size
();
i
++
){
assert
(
ptr
<
buf_ptr
+
dataLength
);
const
Uint32
attrId
=
m_currentTable
->
m_fixedAttribs
[
i
]
->
attrId
;
AttributeS
*
attr
=
tup
->
allAttributes
[
attrId
];
const
Uint32
sz
=
attr
->
Desc
->
getSizeInWords
();
AttributeData
*
attr_data
=
m_tuple
.
getData
(
attrId
);
const
AttributeDesc
*
attr_desc
=
m_tuple
.
getDesc
(
attrId
);
const
Uint32
sz
=
attr_desc
->
getSizeInWords
();
attr
->
Data
.
null
=
false
;
attr
->
Data
.
void_value
=
ptr
;
attr
_data
->
null
=
false
;
attr
_data
->
void_value
=
ptr
;
if
(
!
Twiddle
(
attr
))
if
(
!
Twiddle
(
attr
_desc
,
attr_data
))
{
res
=
-
1
;
return
NULL
;
...
...
@@ -364,21 +377,23 @@ RestoreDataIterator::getNextTuple(int & res) {
ptr
+=
sz
;
}
for
(
int
i
=
0
;
i
<
m_currentTable
->
m_variableAttribs
.
size
();
i
++
){
for
(
Uint32
i
=
0
;
i
<
m_currentTable
->
m_variableAttribs
.
size
();
i
++
){
const
Uint32
attrId
=
m_currentTable
->
m_variableAttribs
[
i
]
->
attrId
;
AttributeS
*
attr
=
tup
->
allAttributes
[
attrId
];
AttributeData
*
attr_data
=
m_tuple
.
getData
(
attrId
);
const
AttributeDesc
*
attr_desc
=
m_tuple
.
getDesc
(
attrId
);
if
(
attr
->
D
esc
->
m_column
->
getNullable
()){
const
Uint32
ind
=
attr
->
D
esc
->
m_nullBitIndex
;
if
(
attr
_d
esc
->
m_column
->
getNullable
()){
const
Uint32
ind
=
attr
_d
esc
->
m_nullBitIndex
;
if
(
BitmaskImpl
::
get
(
m_currentTable
->
m_nullBitmaskSize
,
tup
->
getDataRecord
()
,
ind
)){
attr
->
Data
.
null
=
true
;
attr
->
Data
.
void_value
=
NULL
;
buf_ptr
,
ind
)){
attr
_data
->
null
=
true
;
attr
_data
->
void_value
=
NULL
;
continue
;
}
}
assert
(
ptr
<
tup
->
getDataRecord
()
+
dataLength
);
assert
(
ptr
<
buf_ptr
+
dataLength
);
typedef
BackupFormat
::
DataFile
::
VariableData
VarData
;
VarData
*
data
=
(
VarData
*
)
ptr
;
...
...
@@ -386,15 +401,15 @@ RestoreDataIterator::getNextTuple(int & res) {
Uint32
id
=
ntohl
(
data
->
Id
);
assert
(
id
==
attrId
);
attr
->
Data
.
null
=
false
;
attr
->
Data
.
void_value
=
&
data
->
Data
[
0
];
attr
_data
->
null
=
false
;
attr
_data
->
void_value
=
&
data
->
Data
[
0
];
/**
* Compute array size
*/
const
Uint32
arraySize
=
(
4
*
sz
)
/
(
attr
->
D
esc
->
size
/
8
);
assert
(
arraySize
>=
attr
->
D
esc
->
arraySize
);
if
(
!
Twiddle
(
attr
,
attr
->
D
esc
->
arraySize
))
const
Uint32
arraySize
=
(
4
*
sz
)
/
(
attr
_d
esc
->
size
/
8
);
assert
(
arraySize
>=
attr
_d
esc
->
arraySize
);
if
(
!
Twiddle
(
attr
_desc
,
attr_data
,
attr_d
esc
->
arraySize
))
{
res
=
-
1
;
return
NULL
;
...
...
@@ -405,15 +420,20 @@ RestoreDataIterator::getNextTuple(int & res) {
m_count
++
;
res
=
0
;
return
tup
;
return
&
m_tuple
;
}
// RestoreDataIterator::getNextTuple
BackupFile
::
BackupFile
(){
BackupFile
::
BackupFile
(
void
(
*
_free_data_callback
)())
:
free_data_callback
(
_free_data_callback
)
{
m_file
=
0
;
m_path
[
0
]
=
0
;
m_fileName
[
0
]
=
0
;
m_buffer
=
0
;
m_bufferSize
=
0
;
m_buffer_sz
=
64
*
1024
;
m_buffer
=
malloc
(
m_buffer_sz
);
m_buffer_ptr
=
m_buffer
;
m_buffer_data_left
=
0
;
}
BackupFile
::~
BackupFile
(){
...
...
@@ -434,15 +454,54 @@ BackupFile::openFile(){
return
m_file
!=
0
;
}
Uint32
*
BackupFile
::
createBuffer
(
Uint32
bytes
){
if
(
bytes
>
m_bufferSize
){
if
(
m_buffer
!=
0
)
free
(
m_buffer
);
m_bufferSize
=
m_bufferSize
+
2
*
bytes
;
m_buffer
=
(
Uint32
*
)
malloc
(
m_bufferSize
);
Uint32
BackupFile
::
buffer_get_ptr_ahead
(
void
**
p_buf_ptr
,
Uint32
size
,
Uint32
nmemb
)
{
Uint32
sz
=
size
*
nmemb
;
if
(
sz
>
m_buffer_data_left
)
{
if
(
free_data_callback
)
(
*
free_data_callback
)();
memcpy
(
m_buffer
,
m_buffer_ptr
,
m_buffer_data_left
);
size_t
r
=
fread
(((
char
*
)
m_buffer
)
+
m_buffer_data_left
,
1
,
m_buffer_sz
-
m_buffer_data_left
,
m_file
);
m_buffer_data_left
+=
r
;
m_buffer_ptr
=
m_buffer
;
if
(
sz
>
m_buffer_data_left
)
sz
=
size
*
(
m_buffer_data_left
/
size
);
}
return
m_buffer
;
*
p_buf_ptr
=
m_buffer_ptr
;
return
sz
/
size
;
}
Uint32
BackupFile
::
buffer_get_ptr
(
void
**
p_buf_ptr
,
Uint32
size
,
Uint32
nmemb
)
{
Uint32
r
=
buffer_get_ptr_ahead
(
p_buf_ptr
,
size
,
nmemb
);
m_buffer_ptr
=
((
char
*
)
m_buffer_ptr
)
+
(
r
*
size
);
m_buffer_data_left
-=
(
r
*
size
);
return
r
;
}
Uint32
BackupFile
::
buffer_read_ahead
(
void
*
ptr
,
Uint32
size
,
Uint32
nmemb
)
{
void
*
buf_ptr
;
Uint32
r
=
buffer_get_ptr_ahead
(
&
buf_ptr
,
size
,
nmemb
);
memcpy
(
ptr
,
buf_ptr
,
r
*
size
);
return
r
;
}
Uint32
BackupFile
::
buffer_read
(
void
*
ptr
,
Uint32
size
,
Uint32
nmemb
)
{
void
*
buf_ptr
;
Uint32
r
=
buffer_get_ptr
(
&
buf_ptr
,
size
,
nmemb
);
memcpy
(
ptr
,
buf_ptr
,
r
*
size
);
return
r
;
}
void
...
...
@@ -503,7 +562,7 @@ BackupFile::readHeader(){
return
false
;
}
if
(
fread
(
&
m_fileHeader
,
sizeof
(
m_fileHeader
),
1
,
m_file
)
!=
1
){
if
(
buffer_read
(
&
m_fileHeader
,
sizeof
(
m_fileHeader
),
1
)
!=
1
){
err
<<
"readDataFileHeader: Error reading header"
<<
endl
;
return
false
;
}
...
...
@@ -551,14 +610,13 @@ BackupFile::validateFooter(){
return
true
;
}
bool
RestoreDataIterator
::
readFragmentHeader
(
int
&
ret
)
bool
RestoreDataIterator
::
readFragmentHeader
(
int
&
ret
)
{
BackupFormat
::
DataFile
::
FragmentHeader
Header
;
debug
<<
"RestoreDataIterator::getNextFragment"
<<
endl
;
if
(
fread
(
&
Header
,
sizeof
(
Header
),
1
,
m_file
)
!=
1
){
if
(
buffer_read
(
&
Header
,
sizeof
(
Header
),
1
)
!=
1
){
ret
=
0
;
return
false
;
}
// if
...
...
@@ -581,6 +639,12 @@ RestoreDataIterator::readFragmentHeader(int & ret)
return
false
;
}
if
(
!
m_tuple
.
prepareRecord
(
*
m_currentTable
))
{
ret
=-
1
;
return
false
;
}
info
<<
"_____________________________________________________"
<<
endl
<<
"Restoring data in table: "
<<
m_currentTable
->
getTableName
()
<<
"("
<<
Header
.
TableId
<<
") fragment "
...
...
@@ -588,6 +652,7 @@ RestoreDataIterator::readFragmentHeader(int & ret)
m_count
=
0
;
ret
=
0
;
return
true
;
}
// RestoreDataIterator::getNextFragment
...
...
@@ -596,7 +661,7 @@ bool
RestoreDataIterator
::
validateFragmentFooter
()
{
BackupFormat
::
DataFile
::
FragmentFooter
footer
;
if
(
fread
(
&
footer
,
sizeof
(
footer
),
1
,
m_file
)
!=
1
){
if
(
buffer_read
(
&
footer
,
sizeof
(
footer
),
1
)
!=
1
){
err
<<
"getFragmentFooter:Error reading fragment footer"
<<
endl
;
return
false
;
}
...
...
@@ -704,45 +769,32 @@ RestoreLogIterator::getNextLogEntry(int & res) {
// Read record length
typedef
BackupFormat
::
LogFile
::
LogEntry
LogE
;
Uint32
gcp
=
0
;
LogE
*
logE
=
0
;
Uint32
len
=
~
0
;
Uint32
gcp
=
0
;
LogE
*
logE
=
0
;
Uint32
len
=
~
0
;
const
Uint32
stopGCP
=
m_metaData
.
getStopGCP
();
do
{
if
(
createBuffer
(
4
)
==
0
)
{
res
=
-
1
;
return
NULL
;
if
(
buffer_read_ahead
(
&
len
,
sizeof
(
Uint32
),
1
)
!=
1
){
res
=
-
1
;
return
0
;
}
len
=
ntohl
(
len
);
if
(
fread
(
m_buffer
,
sizeof
(
Uint32
),
1
,
m_file
)
!=
1
){
res
=
-
1
;
return
NULL
;
Uint32
data_len
=
sizeof
(
Uint32
)
+
len
*
4
;
if
(
buffer_get_ptr
((
void
**
)(
&
logE
),
1
,
data_len
)
!=
data_len
)
{
res
=
-
2
;
return
0
;
}
m_buffer
[
0
]
=
ntohl
(
m_buffer
[
0
]);
len
=
m_buffer
[
0
];
if
(
len
==
0
){
res
=
0
;
res
=
0
;
return
0
;
}
if
(
createBuffer
(
4
*
(
len
+
1
))
==
0
){
res
=
-
1
;
return
NULL
;
}
if
(
fread
(
&
m_buffer
[
1
],
4
,
len
,
m_file
)
!=
len
)
{
res
=
-
1
;
return
NULL
;
}
logE
->
TableId
=
ntohl
(
logE
->
TableId
);
logE
->
TriggerEvent
=
ntohl
(
logE
->
TriggerEvent
);
logE
=
(
LogE
*
)
&
m_buffer
[
0
];
logE
->
TableId
=
ntohl
(
logE
->
TableId
);
logE
->
TriggerEvent
=
ntohl
(
logE
->
TriggerEvent
);
const
bool
hasGcp
=
(
logE
->
TriggerEvent
&
0x10000
)
!=
0
;
const
bool
hasGcp
=
(
logE
->
TriggerEvent
&
0x10000
)
!=
0
;
logE
->
TriggerEvent
&=
0xFFFF
;
if
(
hasGcp
){
...
...
@@ -751,9 +803,6 @@ RestoreLogIterator::getNextLogEntry(int & res) {
}
}
while
(
gcp
>
stopGCP
+
1
);
for
(
int
i
=
0
;
i
<
m_logEntry
.
m_values
.
size
();
i
++
)
delete
m_logEntry
.
m_values
[
i
];
m_logEntry
.
m_values
.
clear
();
m_logEntry
.
m_table
=
m_metaData
.
getTable
(
logE
->
TableId
);
switch
(
logE
->
TriggerEvent
){
case
TriggerEvent
:
:
TE_INSERT
:
...
...
@@ -771,17 +820,19 @@ RestoreLogIterator::getNextLogEntry(int & res) {
}
const
TableS
*
tab
=
m_logEntry
.
m_table
;
m_logEntry
.
clear
();
AttributeHeader
*
ah
=
(
AttributeHeader
*
)
&
logE
->
Data
[
0
];
AttributeHeader
*
end
=
(
AttributeHeader
*
)
&
logE
->
Data
[
len
-
2
];
AttributeS
*
attr
;
while
(
ah
<
end
){
attr
=
new
AttributeS
;
attr
=
m_logEntry
.
add_attr
()
;
if
(
attr
==
NULL
)
{
ndbout_c
(
"Restore: Failed to allocate memory"
);
res
=
-
1
;
return
NULL
;
return
0
;
}
attr
->
Desc
=
(
*
tab
)[
ah
->
getAttributeId
()];
assert
(
attr
->
Desc
!=
0
);
...
...
@@ -794,13 +845,94 @@ RestoreLogIterator::getNextLogEntry(int & res) {
attr
->
Data
.
void_value
=
ah
->
getDataPtr
();
}
Twiddle
(
attr
);
m_logEntry
.
m_values
.
push_back
(
attr
);
Twiddle
(
attr
->
Desc
,
&
(
attr
->
Data
));
ah
=
ah
->
getNext
();
}
m_count
++
;
res
=
0
;
return
&
m_logEntry
;
}
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
AttributeS
&
attr
){
const
AttributeData
&
data
=
attr
.
Data
;
const
AttributeDesc
&
desc
=
*
(
attr
.
Desc
);
if
(
data
.
null
)
{
ndbout
<<
"<NULL>"
;
return
ndbout
;
}
NdbRecAttr
tmprec
;
tmprec
.
setup
(
desc
.
m_column
,
(
char
*
)
data
.
void_value
);
ndbout
<<
tmprec
;
return
ndbout
;
}
// Print tuple data
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
TupleS
&
tuple
)
{
ndbout
<<
tuple
.
getTable
()
->
getTableName
()
<<
"; "
;
for
(
int
i
=
0
;
i
<
tuple
.
getNoOfAttributes
();
i
++
)
{
AttributeData
*
attr_data
=
tuple
.
getData
(
i
);
const
AttributeDesc
*
attr_desc
=
tuple
.
getDesc
(
i
);
const
AttributeS
attr
=
{
attr_desc
,
*
attr_data
};
debug
<<
i
<<
" "
<<
attr_desc
->
m_column
->
getName
();
ndbout
<<
attr
;
if
(
i
!=
(
tuple
.
getNoOfAttributes
()
-
1
))
ndbout
<<
delimiter
<<
" "
;
}
// for
return
ndbout
;
}
// Print tuple data
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
LogEntry
&
logE
)
{
switch
(
logE
.
m_type
)
{
case
LogEntry
:
:
LE_INSERT
:
ndbout
<<
"INSERT "
<<
logE
.
m_table
->
getTableName
()
<<
" "
;
break
;
case
LogEntry
:
:
LE_DELETE
:
ndbout
<<
"DELETE "
<<
logE
.
m_table
->
getTableName
()
<<
" "
;
break
;
case
LogEntry
:
:
LE_UPDATE
:
ndbout
<<
"UPDATE "
<<
logE
.
m_table
->
getTableName
()
<<
" "
;
break
;
default:
ndbout
<<
"Unknown log entry type (not insert, delete or update)"
;
}
for
(
Uint32
i
=
0
;
i
<
logE
.
size
();
i
++
)
{
const
AttributeS
*
attr
=
logE
[
i
];
ndbout
<<
attr
->
Desc
->
m_column
->
getName
()
<<
"="
;
ndbout
<<
(
*
attr
);
if
(
i
<
(
logE
.
size
()
-
1
))
ndbout
<<
", "
;
}
return
ndbout
;
}
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
TableS
&
table
){
ndbout
<<
endl
<<
"Table: "
<<
table
.
getTableName
()
<<
endl
;
for
(
int
j
=
0
;
j
<
table
.
getNoOfAttributes
();
j
++
)
{
const
AttributeDesc
*
desc
=
table
[
j
];
ndbout
<<
desc
->
m_column
->
getName
()
<<
": "
<<
desc
->
m_column
->
getType
();
ndbout
<<
" key: "
<<
desc
->
m_column
->
getPrimaryKey
();
ndbout
<<
" array: "
<<
desc
->
arraySize
;
ndbout
<<
" size: "
<<
desc
->
size
<<
endl
;
}
// for
return
ndbout
;
}
ndb/src/kernel/blocks/backup/restore/Restore.hpp
View file @
0c722c90
...
...
@@ -18,13 +18,15 @@
#define RESTORE_H
#include <ndb_global.h>
#include <NdbOut.hpp>
#include <BackupFormat.hpp>
#include <NdbApi.hpp>
#include "myVector.hpp"
#include <ndb_version.h>
#include <version.h>
static
const
char
*
delimiter
=
";"
;
// Delimiter in file dump
const
int
FileNameLenC
=
256
;
const
int
TableNameLenC
=
256
;
const
int
AttrNameLenC
=
256
;
...
...
@@ -89,19 +91,26 @@ class TupleS {
private:
friend
class
RestoreDataIterator
;
const
TableS
*
m_currentTable
;
myVector
<
AttributeS
*>
allAttributes
;
Uint32
*
dataRecord
;
const
TableS
*
m_currentTable
;
AttributeData
*
allAttrData
;
bool
prepareRecord
(
const
TableS
&
);
public:
TupleS
()
{
dataRecord
=
NULL
;};
~
TupleS
()
{
if
(
dataRecord
!=
NULL
)
delete
[]
dataRecord
;};
int
getNoOfAttributes
()
const
{
return
allAttributes
.
size
();
};
const
TableS
*
getTable
()
const
{
return
m_currentTable
;};
const
AttributeS
*
operator
[](
int
i
)
const
{
return
allAttributes
[
i
];};
Uint32
*
getDataRecord
()
{
return
dataRecord
;};
void
createDataRecord
(
Uint32
bytes
)
{
dataRecord
=
new
Uint32
[
bytes
];};
TupleS
()
{
m_currentTable
=
0
;
allAttrData
=
0
;
};
~
TupleS
()
{
if
(
allAttrData
)
delete
[]
allAttrData
;
};
TupleS
(
const
TupleS
&
tuple
);
// disable copy constructor
TupleS
&
operator
=
(
const
TupleS
&
tuple
);
int
getNoOfAttributes
()
const
;
const
TableS
*
getTable
()
const
;
const
AttributeDesc
*
getDesc
(
int
i
)
const
;
AttributeData
*
getData
(
int
i
)
const
;
};
// class TupleS
class
TableS
{
...
...
@@ -112,27 +121,23 @@ class TableS {
Uint32
schemaVersion
;
Uint32
backupVersion
;
my
Vector
<
AttributeDesc
*>
allAttributesDesc
;
my
Vector
<
AttributeDesc
*>
m_fixedKeys
;
//
my
Vector<AttributeDesc *> m_variableKey;
my
Vector
<
AttributeDesc
*>
m_fixedAttribs
;
my
Vector
<
AttributeDesc
*>
m_variableAttribs
;
Vector
<
AttributeDesc
*>
allAttributesDesc
;
Vector
<
AttributeDesc
*>
m_fixedKeys
;
//Vector<AttributeDesc *> m_variableKey;
Vector
<
AttributeDesc
*>
m_fixedAttribs
;
Vector
<
AttributeDesc
*>
m_variableAttribs
;
Uint32
m_noOfNullable
;
Uint32
m_nullBitmaskSize
;
int
pos
;
char
create_string
[
2048
];
/*
char mysqlTableName[1024];
char mysqlDatabaseName[1024];
*/
void
createAttr
(
NdbDictionary
::
Column
*
column
);
public:
class
NdbDictionary
::
Table
*
m_dictTable
;
TableS
(
class
NdbTableImpl
*
dictTable
);
~
TableS
();
Uint32
getTableId
()
const
{
return
m_dictTable
->
getTableId
();
...
...
@@ -185,18 +190,26 @@ protected:
BackupFormat
::
FileHeader
m_expectedFileHeader
;
Uint32
m_nodeId
;
Uint32
*
m_buffer
;
Uint32
m_bufferSize
;
Uint32
*
createBuffer
(
Uint32
bytes
);
void
*
m_buffer
;
void
*
m_buffer_ptr
;
Uint32
m_buffer_sz
;
Uint32
m_buffer_data_left
;
void
(
*
free_data_callback
)();
bool
openFile
();
void
setCtlFile
(
Uint32
nodeId
,
Uint32
backupId
,
const
char
*
path
);
void
setDataFile
(
const
BackupFile
&
bf
,
Uint32
no
);
void
setLogFile
(
const
BackupFile
&
bf
,
Uint32
no
);
Uint32
buffer_get_ptr
(
void
**
p_buf_ptr
,
Uint32
size
,
Uint32
nmemb
);
Uint32
buffer_read
(
void
*
ptr
,
Uint32
size
,
Uint32
nmemb
);
Uint32
buffer_get_ptr_ahead
(
void
**
p_buf_ptr
,
Uint32
size
,
Uint32
nmemb
);
Uint32
buffer_read_ahead
(
void
*
ptr
,
Uint32
size
,
Uint32
nmemb
);
void
setName
(
const
char
*
path
,
const
char
*
name
);
BackupFile
();
BackupFile
(
void
(
*
free_data_callback
)()
=
0
);
~
BackupFile
();
public:
bool
readHeader
();
...
...
@@ -206,12 +219,12 @@ public:
const
char
*
getFilename
()
const
{
return
m_fileName
;}
Uint32
getNodeId
()
const
{
return
m_nodeId
;}
const
BackupFormat
::
FileHeader
&
getFileHeader
()
const
{
return
m_fileHeader
;}
bool
Twiddle
(
AttributeS
*
attr
,
Uint32
arraySize
=
0
);
bool
Twiddle
(
const
AttributeDesc
*
attr_desc
,
AttributeData
*
attr_data
,
Uint32
arraySize
=
0
);
};
class
RestoreMetaData
:
public
BackupFile
{
my
Vector
<
TableS
*>
allTables
;
Vector
<
TableS
*>
allTables
;
bool
readMetaFileHeader
();
bool
readMetaTableDesc
();
...
...
@@ -224,14 +237,11 @@ class RestoreMetaData : public BackupFile {
bool
parseTableDescriptor
(
const
Uint32
*
data
,
Uint32
len
);
public:
RestoreMetaData
(
const
char
*
path
,
Uint32
nodeId
,
Uint32
bNo
);
~
RestoreMetaData
();
virtual
~
RestoreMetaData
();
int
loadContent
();
Uint32
getNoOfTables
()
const
{
return
allTables
.
size
();}
const
TableS
*
operator
[](
int
i
)
const
{
return
allTables
[
i
];}
...
...
@@ -243,20 +253,20 @@ public:
class
RestoreDataIterator
:
public
BackupFile
{
const
RestoreMetaData
&
m_metaData
;
Uint32
m_count
;
TupleS
m_tuple
;
const
TableS
*
m_currentTable
;
TupleS
m_tuple
;
public:
// Constructor
RestoreDataIterator
(
const
RestoreMetaData
&
);
~
RestoreDataIterator
();
RestoreDataIterator
(
const
RestoreMetaData
&
,
void
(
*
free_data_callback
)()
);
~
RestoreDataIterator
()
{}
;
// Read data file fragment header
bool
readFragmentHeader
(
int
&
res
);
bool
validateFragmentFooter
();
const
TupleS
*
getNextTuple
(
int
&
res
);
};
...
...
@@ -269,9 +279,35 @@ public:
};
EntryType
m_type
;
const
TableS
*
m_table
;
myVector
<
AttributeS
*>
m_values
;
Vector
<
AttributeS
*>
m_values
;
Vector
<
AttributeS
*>
m_values_e
;
AttributeS
*
add_attr
()
{
AttributeS
*
attr
;
if
(
m_values_e
.
size
()
>
0
)
{
attr
=
m_values_e
[
m_values_e
.
size
()
-
1
];
m_values_e
.
erase
(
m_values_e
.
size
()
-
1
);
}
else
{
attr
=
new
AttributeS
;
}
m_values
.
push_back
(
attr
);
return
attr
;
}
void
clear
()
{
for
(
Uint32
i
=
0
;
i
<
m_values
.
size
();
i
++
)
m_values_e
.
push_back
(
m_values
[
i
]);
m_values
.
clear
();
}
~
LogEntry
()
{
for
(
Uint32
i
=
0
;
i
<
m_values
.
size
();
i
++
)
delete
m_values
[
i
];
for
(
Uint32
i
=
0
;
i
<
m_values_e
.
size
();
i
++
)
delete
m_values_e
[
i
];
}
Uint32
size
()
const
{
return
m_values
.
size
();
}
const
AttributeS
*
operator
[](
int
i
)
const
{
return
m_values
[
i
];}
};
class
RestoreLogIterator
:
public
BackupFile
{
...
...
@@ -282,10 +318,16 @@ private:
LogEntry
m_logEntry
;
public:
RestoreLogIterator
(
const
RestoreMetaData
&
);
virtual
~
RestoreLogIterator
()
{};
const
LogEntry
*
getNextLogEntry
(
int
&
res
);
};
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
TableS
&
);
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
TupleS
&
);
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
LogEntry
&
);
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
RestoreMetaData
&
);
#endif
ndb/src/kernel/blocks/backup/restore/consumer.cpp
0 → 100644
View file @
0c722c90
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "consumer.hpp"
#ifdef USE_MYSQL
int
BackupConsumer
::
create_table_string
(
const
TableS
&
table
,
char
*
tableName
,
char
*
buf
){
int
pos
=
0
;
int
pos2
=
0
;
char
buf2
[
2048
];
pos
+=
sprintf
(
buf
+
pos
,
"%s%s"
,
"CREATE TABLE "
,
tableName
);
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"("
);
pos2
+=
sprintf
(
buf2
+
pos2
,
"%s"
,
" primary key("
);
for
(
int
j
=
0
;
j
<
table
.
getNoOfAttributes
();
j
++
)
{
const
AttributeDesc
*
desc
=
table
[
j
];
// ndbout << desc->name << ": ";
pos
+=
sprintf
(
buf
+
pos
,
"%s%s"
,
desc
->
m_column
->
getName
(),
" "
);
switch
(
desc
->
m_column
->
getType
()){
case
NdbDictionary
:
:
Column
::
Int
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"int"
);
break
;
case
NdbDictionary
:
:
Column
::
Unsigned
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"int unsigned"
);
break
;
case
NdbDictionary
:
:
Column
::
Float
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"float"
);
break
;
case
NdbDictionary
:
:
Column
::
Decimal
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"decimal"
);
break
;
case
NdbDictionary
:
:
Column
::
Char
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"char"
);
break
;
case
NdbDictionary
:
:
Column
::
Varchar
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"varchar"
);
break
;
case
NdbDictionary
:
:
Column
::
Binary
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"binary"
);
break
;
case
NdbDictionary
:
:
Column
::
Varbinary
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"varchar binary"
);
break
;
case
NdbDictionary
:
:
Column
::
Bigint
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"bigint"
);
break
;
case
NdbDictionary
:
:
Column
::
Bigunsigned
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"bigint unsigned"
);
break
;
case
NdbDictionary
:
:
Column
::
Double
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"double"
);
break
;
case
NdbDictionary
:
:
Column
::
Datetime
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"datetime"
);
break
;
case
NdbDictionary
:
:
Column
::
Timespec
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"time"
);
break
;
case
NdbDictionary
:
:
Column
::
Undefined
:
// pos += sprintf(buf+pos, "%s", "varchar binary");
return
-
1
;
break
;
default:
//pos += sprintf(buf+pos, "%s", "varchar binary");
return
-
1
;
}
if
(
desc
->
arraySize
>
1
)
{
int
attrSize
=
desc
->
arraySize
;
pos
+=
sprintf
(
buf
+
pos
,
"%s%u%s"
,
"("
,
attrSize
,
")"
);
}
if
(
desc
->
m_column
->
getPrimaryKey
())
{
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
" not null"
);
pos2
+=
sprintf
(
buf2
+
pos2
,
"%s%s"
,
desc
->
m_column
->
getName
(),
","
);
}
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
","
);
}
// for
pos2
--
;
// remove trailing comma
pos2
+=
sprintf
(
buf2
+
pos2
,
"%s"
,
")"
);
// pos--; // remove trailing comma
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
buf2
);
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
") type=ndbcluster"
);
return
0
;
}
#endif // USE_MYSQL
ndb/src/kernel/blocks/backup/restore/consumer.hpp
0 → 100644
View file @
0c722c90
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef CONSUMER_HPP
#define CONSUMER_HPP
#include "Restore.hpp"
class
BackupConsumer
{
public:
virtual
~
BackupConsumer
()
{
}
virtual
bool
init
()
{
return
true
;}
virtual
bool
table
(
const
TableS
&
){
return
true
;}
virtual
void
tuple
(
const
TupleS
&
){}
virtual
void
tuple_free
(){}
virtual
void
endOfTuples
(){}
virtual
void
logEntry
(
const
LogEntry
&
){}
virtual
void
endOfLogEntrys
(){}
};
#endif
ndb/src/kernel/blocks/backup/restore/consumer_printer.cpp
0 → 100644
View file @
0c722c90
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "consumer_printer.hpp"
bool
BackupPrinter
::
table
(
const
TableS
&
tab
)
{
if
(
m_print
||
m_print_meta
)
{
m_ndbout
<<
tab
;
ndbout_c
(
"Successfully printed table: %s"
,
tab
.
m_dictTable
->
getName
());
}
return
true
;
}
void
BackupPrinter
::
tuple
(
const
TupleS
&
tup
)
{
m_dataCount
++
;
if
(
m_print
||
m_print_data
)
m_ndbout
<<
tup
<<
endl
;
}
void
BackupPrinter
::
logEntry
(
const
LogEntry
&
logE
)
{
if
(
m_print
||
m_print_log
)
m_ndbout
<<
logE
<<
endl
;
m_logCount
++
;
}
void
BackupPrinter
::
endOfLogEntrys
()
{
if
(
m_print
||
m_print_log
)
{
ndbout
<<
"Printed "
<<
m_dataCount
<<
" tuples and "
<<
m_logCount
<<
" log entries"
<<
" to stdout."
<<
endl
;
}
}
ndb/src/kernel/blocks/backup/restore/consumer_printer.hpp
0 → 100644
View file @
0c722c90
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef CONSUMER_PRINTER_HPP
#define CONSUMER_PRINTER_HPP
#include "consumer.hpp"
class
BackupPrinter
:
public
BackupConsumer
{
NdbOut
&
m_ndbout
;
public:
BackupPrinter
(
NdbOut
&
out
=
ndbout
)
:
m_ndbout
(
out
)
{
m_print
=
false
;
m_print_log
=
false
;
m_print_data
=
false
;
m_print_meta
=
false
;
}
virtual
bool
table
(
const
TableS
&
);
#ifdef USE_MYSQL
virtual
bool
table
(
const
TableS
&
,
MYSQL
*
mysqlp
);
#endif
virtual
void
tuple
(
const
TupleS
&
);
virtual
void
logEntry
(
const
LogEntry
&
);
virtual
void
endOfTuples
()
{};
virtual
void
endOfLogEntrys
();
bool
m_print
;
bool
m_print_log
;
bool
m_print_data
;
bool
m_print_meta
;
Uint32
m_logCount
;
Uint32
m_dataCount
;
};
#endif
ndb/src/kernel/blocks/backup/restore/consumer_restore.cpp
0 → 100644
View file @
0c722c90
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "consumer_restore.hpp"
#include <NdbSleep.h>
extern
FilteredNdbOut
err
;
extern
FilteredNdbOut
info
;
extern
FilteredNdbOut
debug
;
static
void
callback
(
int
,
NdbConnection
*
,
void
*
);
bool
BackupRestore
::
init
()
{
release
();
if
(
!
m_restore
&&
!
m_restore_meta
)
return
true
;
m_ndb
=
new
Ndb
();
if
(
m_ndb
==
NULL
)
return
false
;
// Turn off table name completion
m_ndb
->
useFullyQualifiedNames
(
false
);
m_ndb
->
init
(
1024
);
if
(
m_ndb
->
waitUntilReady
(
30
)
!=
0
)
{
err
<<
"Failed to connect to ndb!!"
<<
endl
;
return
false
;
}
info
<<
"Connected to ndb!!"
<<
endl
;
m_callback
=
new
restore_callback_t
[
m_parallelism
];
if
(
m_callback
==
0
)
{
err
<<
"Failed to allocate callback structs"
<<
endl
;
return
false
;
}
m_tuples
=
new
TupleS
[
m_parallelism
];
if
(
m_tuples
==
0
)
{
err
<<
"Failed to allocate tuples"
<<
endl
;
return
false
;
}
m_free_callback
=
m_callback
;
for
(
Uint32
i
=
0
;
i
<
m_parallelism
;
i
++
)
{
m_callback
[
i
].
restore
=
this
;
m_callback
[
i
].
connection
=
0
;
m_callback
[
i
].
tup
=
&
m_tuples
[
i
];
if
(
i
>
0
)
m_callback
[
i
-
1
].
next
=
&
(
m_callback
[
i
]);
}
m_callback
[
m_parallelism
-
1
].
next
=
0
;
return
true
;
}
void
BackupRestore
::
release
()
{
if
(
m_ndb
)
{
delete
m_ndb
;
m_ndb
=
0
;
}
if
(
m_callback
)
{
delete
[]
m_callback
;
m_callback
=
0
;
}
if
(
m_tuples
)
{
delete
[]
m_tuples
;
m_tuples
=
0
;
}
}
BackupRestore
::~
BackupRestore
()
{
release
();
}
bool
BackupRestore
::
table
(
const
TableS
&
table
){
if
(
!
m_restore_meta
)
return
true
;
NdbDictionary
::
Dictionary
*
dict
=
m_ndb
->
getDictionary
();
if
(
dict
->
createTable
(
*
table
.
m_dictTable
)
==
-
1
)
{
err
<<
"Create table "
<<
table
.
getTableName
()
<<
" failed: "
<<
dict
->
getNdbError
()
<<
endl
;
return
false
;
}
info
<<
"Successfully restored table "
<<
table
.
getTableName
()
<<
endl
;
return
true
;
}
void
BackupRestore
::
tuple
(
const
TupleS
&
tup
)
{
if
(
!
m_restore
)
return
;
restore_callback_t
*
cb
=
m_free_callback
;
if
(
cb
==
0
)
assert
(
false
);
m_free_callback
=
cb
->
next
;
cb
->
retries
=
0
;
*
(
cb
->
tup
)
=
tup
;
// must do copy!
tuple_a
(
cb
);
if
(
m_free_callback
==
0
)
{
// send-poll all transactions
// close transaction is done in callback
m_ndb
->
sendPollNdb
(
3000
,
1
);
}
}
void
BackupRestore
::
tuple_a
(
restore_callback_t
*
cb
)
{
while
(
cb
->
retries
<
10
)
{
/**
* start transactions
*/
cb
->
connection
=
m_ndb
->
startTransaction
();
if
(
cb
->
connection
==
NULL
)
{
/*
if (errorHandler(cb))
{
continue;
}
*/
exitHandler
();
}
// if
const
TupleS
&
tup
=
*
(
cb
->
tup
);
const
TableS
*
table
=
tup
.
getTable
();
NdbOperation
*
op
=
cb
->
connection
->
getNdbOperation
(
table
->
getTableName
());
if
(
op
==
NULL
)
{
if
(
errorHandler
(
cb
))
continue
;
exitHandler
();
}
// if
if
(
op
->
writeTuple
()
==
-
1
)
{
if
(
errorHandler
(
cb
))
continue
;
exitHandler
();
}
// if
int
ret
=
0
;
for
(
int
j
=
0
;
j
<
2
;
j
++
)
{
for
(
int
i
=
0
;
i
<
tup
.
getNoOfAttributes
();
i
++
)
{
const
AttributeDesc
*
attr_desc
=
tup
.
getDesc
(
i
);
const
AttributeData
*
attr_data
=
tup
.
getData
(
i
);
int
size
=
attr_desc
->
size
;
int
arraySize
=
attr_desc
->
arraySize
;
char
*
dataPtr
=
attr_data
->
string_value
;
Uint32
length
=
(
size
*
arraySize
)
/
8
;
if
(
attr_desc
->
m_column
->
getPrimaryKey
())
{
if
(
j
==
1
)
continue
;
ret
=
op
->
equal
(
i
,
dataPtr
,
length
);
}
else
{
if
(
j
==
0
)
continue
;
if
(
attr_data
->
null
)
ret
=
op
->
setValue
(
i
,
NULL
,
0
);
else
ret
=
op
->
setValue
(
i
,
dataPtr
,
length
);
}
if
(
ret
<
0
)
{
ndbout_c
(
"Column: %d type %d"
,
i
,
attr_desc
->
m_column
->
getType
());
break
;
}
}
if
(
ret
<
0
)
break
;
}
if
(
ret
<
0
)
{
if
(
errorHandler
(
cb
))
continue
;
exitHandler
();
}
// Prepare transaction (the transaction is NOT yet sent to NDB)
cb
->
connection
->
executeAsynchPrepare
(
Commit
,
&
callback
,
cb
);
m_transactions
++
;
return
;
}
err
<<
"Unable to recover from errors. Exiting..."
<<
endl
;
exitHandler
();
}
void
BackupRestore
::
cback
(
int
result
,
restore_callback_t
*
cb
)
{
m_transactions
--
;
if
(
result
<
0
)
{
/**
* Error. temporary or permanent?
*/
if
(
errorHandler
(
cb
))
tuple_a
(
cb
);
// retry
else
{
err
<<
"Restore: Failed to restore data due to a unrecoverable error. Exiting..."
<<
endl
;
exitHandler
();
}
}
else
{
/**
* OK! close transaction
*/
m_ndb
->
closeTransaction
(
cb
->
connection
);
cb
->
connection
=
0
;
cb
->
next
=
m_free_callback
;
m_free_callback
=
cb
;
m_dataCount
++
;
}
}
/**
* returns true if is recoverable,
* Error handling based on hugo
* false if it is an error that generates an abort.
*/
bool
BackupRestore
::
errorHandler
(
restore_callback_t
*
cb
)
{
NdbError
error
=
cb
->
connection
->
getNdbError
();
m_ndb
->
closeTransaction
(
cb
->
connection
);
cb
->
connection
=
0
;
cb
->
retries
++
;
switch
(
error
.
status
)
{
case
NdbError
:
:
Success
:
return
false
;
// ERROR!
break
;
case
NdbError
:
:
TemporaryError
:
NdbSleep_MilliSleep
(
10
);
return
true
;
// RETRY
break
;
case
NdbError
:
:
UnknownResult
:
err
<<
error
<<
endl
;
return
false
;
// ERROR!
break
;
default:
case
NdbError
:
:
PermanentError
:
switch
(
error
.
code
)
{
case
499
:
case
250
:
NdbSleep_MilliSleep
(
10
);
return
true
;
//temp errors?
default:
break
;
}
//ERROR
err
<<
error
<<
endl
;
return
false
;
break
;
}
return
false
;
}
void
BackupRestore
::
exitHandler
()
{
release
();
exit
(
-
1
);
}
void
BackupRestore
::
tuple_free
()
{
if
(
!
m_restore
)
return
;
if
(
m_transactions
>
0
)
{
// Send all transactions to NDB
m_ndb
->
sendPreparedTransactions
(
0
);
// Poll all transactions
while
(
m_transactions
>
0
)
m_ndb
->
pollNdb
(
3000
,
m_transactions
);
}
}
void
BackupRestore
::
endOfTuples
()
{
tuple_free
();
}
void
BackupRestore
::
logEntry
(
const
LogEntry
&
tup
)
{
if
(
!
m_restore
)
return
;
NdbConnection
*
trans
=
m_ndb
->
startTransaction
();
if
(
trans
==
NULL
)
{
// Deep shit, TODO: handle the error
err
<<
"Cannot start transaction"
<<
endl
;
exit
(
-
1
);
}
// if
const
TableS
*
table
=
tup
.
m_table
;
NdbOperation
*
op
=
trans
->
getNdbOperation
(
table
->
getTableName
());
if
(
op
==
NULL
)
{
err
<<
"Cannot get operation: "
<<
trans
->
getNdbError
()
<<
endl
;
exit
(
-
1
);
}
// if
int
check
=
0
;
switch
(
tup
.
m_type
)
{
case
LogEntry
:
:
LE_INSERT
:
check
=
op
->
insertTuple
();
break
;
case
LogEntry
:
:
LE_UPDATE
:
check
=
op
->
updateTuple
();
break
;
case
LogEntry
:
:
LE_DELETE
:
check
=
op
->
deleteTuple
();
break
;
default:
err
<<
"Log entry has wrong operation type."
<<
" Exiting..."
;
exit
(
-
1
);
}
for
(
Uint32
i
=
0
;
i
<
tup
.
size
();
i
++
)
{
const
AttributeS
*
attr
=
tup
[
i
];
int
size
=
attr
->
Desc
->
size
;
int
arraySize
=
attr
->
Desc
->
arraySize
;
const
char
*
dataPtr
=
attr
->
Data
.
string_value
;
const
Uint32
length
=
(
size
/
8
)
*
arraySize
;
if
(
attr
->
Desc
->
m_column
->
getPrimaryKey
())
op
->
equal
(
attr
->
Desc
->
attrId
,
dataPtr
,
length
);
else
op
->
setValue
(
attr
->
Desc
->
attrId
,
dataPtr
,
length
);
}
const
int
ret
=
trans
->
execute
(
Commit
);
if
(
ret
!=
0
)
{
// Both insert update and delete can fail during log running
// and it's ok
// TODO: check that the error is either tuple exists or tuple does not exist?
switch
(
tup
.
m_type
)
{
case
LogEntry
:
:
LE_INSERT
:
break
;
case
LogEntry
:
:
LE_UPDATE
:
break
;
case
LogEntry
:
:
LE_DELETE
:
break
;
}
if
(
false
)
{
err
<<
"execute failed: "
<<
trans
->
getNdbError
()
<<
endl
;
exit
(
-
1
);
}
}
m_ndb
->
closeTransaction
(
trans
);
m_logCount
++
;
}
void
BackupRestore
::
endOfLogEntrys
()
{
if
(
!
m_restore
)
return
;
info
<<
"Restored "
<<
m_dataCount
<<
" tuples and "
<<
m_logCount
<<
" log entries"
<<
endl
;
}
/*
* callback : This is called when the transaction is polled
*
* (This function must have three arguments:
* - The result of the transaction,
* - The NdbConnection object, and
* - A pointer to an arbitrary object.)
*/
static
void
callback
(
int
result
,
NdbConnection
*
trans
,
void
*
aObject
)
{
restore_callback_t
*
cb
=
(
restore_callback_t
*
)
aObject
;
(
cb
->
restore
)
->
cback
(
result
,
cb
);
}
#if 0 // old tuple impl
void
BackupRestore::tuple(const TupleS & tup)
{
if (!m_restore)
return;
while (1)
{
NdbConnection * trans = m_ndb->startTransaction();
if (trans == NULL)
{
// Deep shit, TODO: handle the error
ndbout << "Cannot start transaction" << endl;
exit(-1);
} // if
const TableS * table = tup.getTable();
NdbOperation * op = trans->getNdbOperation(table->getTableName());
if (op == NULL)
{
ndbout << "Cannot get operation: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
} // if
// TODO: check return value and handle error
if (op->writeTuple() == -1)
{
ndbout << "writeTuple call failed: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
} // if
for (int i = 0; i < tup.getNoOfAttributes(); i++)
{
const AttributeS * attr = tup[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
const char * dataPtr = attr->Data.string_value;
const Uint32 length = (size * arraySize) / 8;
if (attr->Desc->m_column->getPrimaryKey())
op->equal(i, dataPtr, length);
}
for (int i = 0; i < tup.getNoOfAttributes(); i++)
{
const AttributeS * attr = tup[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
const char * dataPtr = attr->Data.string_value;
const Uint32 length = (size * arraySize) / 8;
if (!attr->Desc->m_column->getPrimaryKey())
if (attr->Data.null)
op->setValue(i, NULL, 0);
else
op->setValue(i, dataPtr, length);
}
int ret = trans->execute(Commit);
if (ret != 0)
{
ndbout << "execute failed: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
}
m_ndb->closeTransaction(trans);
if (ret == 0)
break;
}
m_dataCount++;
}
#endif
ndb/src/kernel/blocks/backup/restore/consumer_restore.hpp
0 → 100644
View file @
0c722c90
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef CONSUMER_RESTORE_HPP
#define CONSUMER_RESTORE_HPP
#include "consumer.hpp"
struct
restore_callback_t
{
class
BackupRestore
*
restore
;
class
TupleS
*
tup
;
class
NdbConnection
*
connection
;
int
retries
;
restore_callback_t
*
next
;
};
class
BackupRestore
:
public
BackupConsumer
{
public:
BackupRestore
(
Uint32
parallelism
=
1
)
{
m_ndb
=
0
;
m_logCount
=
m_dataCount
=
0
;
m_restore
=
false
;
m_restore_meta
=
false
;
m_parallelism
=
parallelism
;
m_callback
=
0
;
m_tuples
=
0
;
m_free_callback
=
0
;
m_transactions
=
0
;
}
virtual
~
BackupRestore
();
virtual
bool
init
();
virtual
void
release
();
virtual
bool
table
(
const
TableS
&
);
virtual
void
tuple
(
const
TupleS
&
);
virtual
void
tuple_free
();
virtual
void
tuple_a
(
restore_callback_t
*
cb
);
virtual
void
cback
(
int
result
,
restore_callback_t
*
cb
);
virtual
bool
errorHandler
(
restore_callback_t
*
cb
);
virtual
void
exitHandler
();
virtual
void
endOfTuples
();
virtual
void
logEntry
(
const
LogEntry
&
);
virtual
void
endOfLogEntrys
();
void
connectToMysql
();
Ndb
*
m_ndb
;
bool
m_restore
;
bool
m_restore_meta
;
Uint32
m_logCount
;
Uint32
m_dataCount
;
Uint32
m_parallelism
;
Uint32
m_transactions
;
TupleS
*
m_tuples
;
restore_callback_t
*
m_callback
;
restore_callback_t
*
m_free_callback
;
};
#endif
ndb/src/kernel/blocks/backup/restore/consumer_restorem.cpp
0 → 100644
View file @
0c722c90
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "consumer_restore.hpp"
#include <NdbSleep.h>
extern
FilteredNdbOut
err
;
extern
FilteredNdbOut
info
;
extern
FilteredNdbOut
debug
;
static
bool
asynchErrorHandler
(
NdbConnection
*
trans
,
Ndb
*
ndb
);
static
void
callback
(
int
result
,
NdbConnection
*
trans
,
void
*
aObject
);
bool
BackupRestore
::
init
()
{
if
(
!
m_restore
&&
!
m_restore_meta
)
return
true
;
m_ndb
=
new
Ndb
();
if
(
m_ndb
==
NULL
)
return
false
;
// Turn off table name completion
m_ndb
->
useFullyQualifiedNames
(
false
);
m_ndb
->
init
(
1024
);
if
(
m_ndb
->
waitUntilReady
(
30
)
!=
0
)
{
ndbout
<<
"Failed to connect to ndb!!"
<<
endl
;
return
false
;
}
ndbout
<<
"Connected to ndb!!"
<<
endl
;
#if USE_MYSQL
if
(
use_mysql
)
{
if
(
mysql_thread_safe
()
==
0
)
{
ndbout
<<
"Not thread safe mysql library..."
<<
endl
;
exit
(
-
1
);
}
ndbout
<<
"Connecting to MySQL..."
<<
endl
;
/**
* nwe param:
* port
* host
* user
*/
bool
returnValue
=
true
;
mysql_init
(
&
mysql
);
{
int
portNo
=
3306
;
if
(
mysql_real_connect
(
&
mysql
,
ga_host
,
ga_user
,
ga_password
,
ga_database
,
ga_port
,
::
ga_socket
,
0
)
==
NULL
)
{
ndbout_c
(
"Connect failed: %s"
,
mysql_error
(
&
mysql
));
returnValue
=
false
;
}
ndbout
<<
"Connected to MySQL!!!"
<<
endl
;
}
/* if(returnValue){
mysql_set_server_option(&mysql, MYSQL_OPTION_MULTI_STATEMENTS_ON);
}
*/
return
returnValue
;
}
#endif
if
(
m_callback
)
{
delete
[]
m_callback
;
m_callback
=
0
;
}
m_callback
=
new
restore_callback_t
[
m_parallelism
];
if
(
m_callback
==
0
)
{
ndbout
<<
"Failed to allocate callback structs"
<<
endl
;
return
false
;
}
m_free_callback
=
m_callback
;
for
(
int
i
=
0
;
i
<
m_parallelism
;
i
++
)
{
m_callback
[
i
].
restore
=
this
;
m_callback
[
i
].
connection
=
0
;
m_callback
[
i
].
retries
=
0
;
if
(
i
>
0
)
m_callback
[
i
-
1
].
next
=
&
(
m_callback
[
i
]);
}
m_callback
[
m_parallelism
-
1
].
next
=
0
;
return
true
;
}
BackupRestore
::~
BackupRestore
()
{
if
(
m_ndb
!=
0
)
delete
m_ndb
;
if
(
m_callback
)
delete
[]
m_callback
;
}
#ifdef USE_MYSQL
bool
BackupRestore
::
table
(
const
TableS
&
table
,
MYSQL
*
mysqlp
){
if
(
!
m_restore_meta
)
{
return
true
;
}
char
tmpTabName
[
MAX_TAB_NAME_SIZE
*
2
];
sprintf
(
tmpTabName
,
"%s"
,
table
.
getTableName
());
char
*
database
=
strtok
(
tmpTabName
,
"/"
);
char
*
schema
=
strtok
(
NULL
,
"/"
);
char
*
tableName
=
strtok
(
NULL
,
"/"
);
/**
* this means that the user did not specify schema
* and it is a v2x backup
*/
if
(
database
==
NULL
)
return
false
;
if
(
schema
==
NULL
)
return
false
;
if
(
tableName
==
NULL
)
tableName
=
schema
;
char
stmtCreateDB
[
255
];
sprintf
(
stmtCreateDB
,
"CREATE DATABASE %s"
,
database
);
/*ignore return value. mysql_select_db will trap errors anyways*/
if
(
mysql_query
(
mysqlp
,
stmtCreateDB
)
==
0
)
{
//ndbout_c("%s", stmtCreateDB);
}
if
(
mysql_select_db
(
&
mysql
,
database
)
!=
0
)
{
ndbout_c
(
"Error: %s"
,
mysql_error
(
&
mysql
));
return
false
;
}
char
buf
[
2048
];
/**
* create table ddl
*/
if
(
create_table_string
(
table
,
tableName
,
buf
))
{
ndbout_c
(
"Unable to create a table definition since the "
"backup contains undefined types"
);
return
false
;
}
//ndbout_c("%s", buf);
if
(
mysql_query
(
mysqlp
,
buf
)
!=
0
)
{
ndbout_c
(
"Error: %s"
,
mysql_error
(
&
mysql
));
return
false
;
}
else
{
ndbout_c
(
"Successfully restored table %s into database %s"
,
tableName
,
database
);
}
return
true
;
}
#endif
bool
BackupRestore
::
table
(
const
TableS
&
table
){
if
(
!
m_restore_meta
)
{
return
true
;
}
NdbDictionary
::
Dictionary
*
dict
=
m_ndb
->
getDictionary
();
if
(
dict
->
createTable
(
*
table
.
m_dictTable
)
==
-
1
)
{
err
<<
"Create table "
<<
table
.
getTableName
()
<<
" failed: "
<<
dict
->
getNdbError
()
<<
endl
;
return
false
;
}
info
<<
"Successfully restored table "
<<
table
.
getTableName
()
<<
endl
;
return
true
;
}
void
BackupRestore
::
tuple
(
const
TupleS
&
tup
)
{
if
(
!
m_restore
)
{
delete
&
tup
;
return
;
}
restore_callback_t
*
cb
=
m_free_callback
;
if
(
cb
)
{
m_free_callback
=
cb
->
next
;
cb
->
retries
=
0
;
cb
->
tup
=
&
tup
;
tuple_a
(
cb
);
}
if
(
m_free_callback
==
0
)
{
// send-poll all transactions
// close transaction is done in callback
m_ndb
->
sendPollNdb
(
3000
,
1
);
}
}
void
BackupRestore
::
tuple_a
(
restore_callback_t
*
cb
)
{
while
(
cb
->
retries
<
10
)
{
/**
* start transactions
*/
cb
->
connection
=
m_ndb
->
startTransaction
();
if
(
cb
->
connection
==
NULL
)
{
/*
if (asynchErrorHandler(cb->connection, m_ndb))
{
cb->retries++;
continue;
}
*/
asynchExitHandler
();
}
// if
const
TupleS
&
tup
=
*
(
cb
->
tup
);
const
TableS
*
table
=
tup
.
getTable
();
NdbOperation
*
op
=
cb
->
connection
->
getNdbOperation
(
table
->
getTableName
());
if
(
op
==
NULL
)
{
if
(
asynchErrorHandler
(
cb
->
connection
,
m_ndb
))
{
cb
->
retries
++
;
continue
;
}
asynchExitHandler
();
}
// if
if
(
op
->
writeTuple
()
==
-
1
)
{
if
(
asynchErrorHandler
(
cb
->
connection
,
m_ndb
))
{
cb
->
retries
++
;
continue
;
}
asynchExitHandler
();
}
// if
Uint32
ret
=
0
;
for
(
int
i
=
0
;
i
<
tup
.
getNoOfAttributes
();
i
++
)
{
const
AttributeS
*
attr
=
tup
[
i
];
int
size
=
attr
->
Desc
->
size
;
int
arraySize
=
attr
->
Desc
->
arraySize
;
char
*
dataPtr
=
attr
->
Data
.
string_value
;
Uint32
length
=
(
size
*
arraySize
)
/
8
;
if
(
attr
->
Desc
->
m_column
->
getPrimaryKey
())
{
ret
=
op
->
equal
(
i
,
dataPtr
,
length
);
}
else
{
if
(
attr
->
Data
.
null
)
ret
=
op
->
setValue
(
i
,
NULL
,
0
);
else
ret
=
op
->
setValue
(
i
,
dataPtr
,
length
);
}
if
(
ret
<
0
)
{
ndbout_c
(
"Column: %d type %d"
,
i
,
tup
.
getTable
()
->
m_dictTable
->
getColumn
(
i
)
->
getType
());
if
(
asynchErrorHandler
(
cb
->
connection
,
m_ndb
))
{
cb
->
retries
++
;
break
;
}
asynchExitHandler
();
}
}
if
(
ret
<
0
)
continue
;
// Prepare transaction (the transaction is NOT yet sent to NDB)
cb
->
connection
->
executeAsynchPrepare
(
Commit
,
&
callback
,
cb
);
m_transactions
++
;
}
ndbout_c
(
"Unable to recover from errors. Exiting..."
);
asynchExitHandler
();
}
void
BackupRestore
::
cback
(
int
result
,
restore_callback_t
*
cb
)
{
if
(
result
<
0
)
{
/**
* Error. temporary or permanent?
*/
if
(
asynchErrorHandler
(
cb
->
connection
,
m_ndb
))
{
cb
->
retries
++
;
tuple_a
(
cb
);
}
else
{
ndbout_c
(
"Restore: Failed to restore data "
"due to a unrecoverable error. Exiting..."
);
delete
m_ndb
;
delete
cb
->
tup
;
exit
(
-
1
);
}
}
else
{
/**
* OK! close transaction
*/
m_ndb
->
closeTransaction
(
cb
->
connection
);
delete
cb
->
tup
;
m_transactions
--
;
}
}
void
BackupRestore
::
asynchExitHandler
()
{
if
(
m_ndb
!=
NULL
)
delete
m_ndb
;
exit
(
-
1
);
}
#if 0 // old tuple impl
void
BackupRestore::tuple(const TupleS & tup)
{
if (!m_restore)
return;
while (1)
{
NdbConnection * trans = m_ndb->startTransaction();
if (trans == NULL)
{
// Deep shit, TODO: handle the error
ndbout << "Cannot start transaction" << endl;
exit(-1);
} // if
const TableS * table = tup.getTable();
NdbOperation * op = trans->getNdbOperation(table->getTableName());
if (op == NULL)
{
ndbout << "Cannot get operation: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
} // if
// TODO: check return value and handle error
if (op->writeTuple() == -1)
{
ndbout << "writeTuple call failed: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
} // if
for (int i = 0; i < tup.getNoOfAttributes(); i++)
{
const AttributeS * attr = tup[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
const char * dataPtr = attr->Data.string_value;
const Uint32 length = (size * arraySize) / 8;
if (attr->Desc->m_column->getPrimaryKey())
op->equal(i, dataPtr, length);
}
for (int i = 0; i < tup.getNoOfAttributes(); i++)
{
const AttributeS * attr = tup[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
const char * dataPtr = attr->Data.string_value;
const Uint32 length = (size * arraySize) / 8;
if (!attr->Desc->m_column->getPrimaryKey())
if (attr->Data.null)
op->setValue(i, NULL, 0);
else
op->setValue(i, dataPtr, length);
}
int ret = trans->execute(Commit);
if (ret != 0)
{
ndbout << "execute failed: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
}
m_ndb->closeTransaction(trans);
if (ret == 0)
break;
}
m_dataCount++;
}
#endif
void
BackupRestore
::
endOfTuples
()
{
if
(
!
m_restore
)
return
;
// Send all transactions to NDB
m_ndb
->
sendPreparedTransactions
(
0
);
// Poll all transactions
m_ndb
->
pollNdb
(
3000
,
m_transactions
);
// Close all transactions
// for (int i = 0; i < nPreparedTransactions; i++)
// m_ndb->closeTransaction(asynchTrans[i]);
}
void
BackupRestore
::
logEntry
(
const
LogEntry
&
tup
)
{
if
(
!
m_restore
)
return
;
NdbConnection
*
trans
=
m_ndb
->
startTransaction
();
if
(
trans
==
NULL
)
{
// Deep shit, TODO: handle the error
ndbout
<<
"Cannot start transaction"
<<
endl
;
exit
(
-
1
);
}
// if
const
TableS
*
table
=
tup
.
m_table
;
NdbOperation
*
op
=
trans
->
getNdbOperation
(
table
->
getTableName
());
if
(
op
==
NULL
)
{
ndbout
<<
"Cannot get operation: "
;
ndbout
<<
trans
->
getNdbError
()
<<
endl
;
exit
(
-
1
);
}
// if
int
check
=
0
;
switch
(
tup
.
m_type
)
{
case
LogEntry
:
:
LE_INSERT
:
check
=
op
->
insertTuple
();
break
;
case
LogEntry
:
:
LE_UPDATE
:
check
=
op
->
updateTuple
();
break
;
case
LogEntry
:
:
LE_DELETE
:
check
=
op
->
deleteTuple
();
break
;
default:
ndbout
<<
"Log entry has wrong operation type."
<<
" Exiting..."
;
exit
(
-
1
);
}
for
(
int
i
=
0
;
i
<
tup
.
m_values
.
size
();
i
++
)
{
const
AttributeS
*
attr
=
tup
.
m_values
[
i
];
int
size
=
attr
->
Desc
->
size
;
int
arraySize
=
attr
->
Desc
->
arraySize
;
const
char
*
dataPtr
=
attr
->
Data
.
string_value
;
const
Uint32
length
=
(
size
/
8
)
*
arraySize
;
if
(
attr
->
Desc
->
m_column
->
getPrimaryKey
())
op
->
equal
(
attr
->
Desc
->
attrId
,
dataPtr
,
length
);
else
op
->
setValue
(
attr
->
Desc
->
attrId
,
dataPtr
,
length
);
}
#if 1
trans
->
execute
(
Commit
);
#else
const
int
ret
=
trans
->
execute
(
Commit
);
// Both insert update and delete can fail during log running
// and it's ok
if
(
ret
!=
0
)
{
ndbout
<<
"execute failed: "
;
ndbout
<<
trans
->
getNdbError
()
<<
endl
;
exit
(
-
1
);
}
#endif
m_ndb
->
closeTransaction
(
trans
);
m_logCount
++
;
}
void
BackupRestore
::
endOfLogEntrys
()
{
if
(
m_restore
)
{
ndbout
<<
"Restored "
<<
m_dataCount
<<
" tuples and "
<<
m_logCount
<<
" log entries"
<<
endl
;
}
}
#if 0
/*****************************************
*
* Callback function for asynchronous transactions
*
* Idea for error handling: Transaction objects have to be stored globally when
* they are prepared.
* In the callback function if the transaction:
* succeeded: delete the object from global storage
* failed but can be retried: execute the object that is in global storage
* failed but fatal: delete the object from global storage
*
******************************************/
static void restoreCallback(int result, // Result for transaction
NdbConnection *object, // Transaction object
void *anything) // Not used
{
static Uint32 counter = 0;
debug << "restoreCallback function called " << counter << " time(s)" << endl;
++counter;
if (result == -1)
{
ndbout << " restoreCallback (" << counter;
if ((counter % 10) == 1)
{
ndbout << "st";
} // if
else if ((counter % 10) == 2)
{
ndbout << "nd";
} // else if
else if ((counter % 10 ) ==3)
{
ndbout << "rd";
} // else if
else
{
ndbout << "th";
} // else
err << " time: error detected " << object->getNdbError() << endl;
} // if
} // restoreCallback
#endif
/*
* callback : This is called when the transaction is polled
*
* (This function must have three arguments:
* - The result of the transaction,
* - The NdbConnection object, and
* - A pointer to an arbitrary object.)
*/
static
void
callback
(
int
result
,
NdbConnection
*
trans
,
void
*
aObject
)
{
restore_callback_t
*
cb
=
(
restore_callback_t
*
)
aObject
;
(
cb
->
restore
)
->
cback
(
result
,
cb
);
}
/**
* returns true if is recoverable,
* Error handling based on hugo
* false if it is an error that generates an abort.
*/
static
bool
asynchErrorHandler
(
NdbConnection
*
trans
,
Ndb
*
ndb
)
{
NdbError
error
=
trans
->
getNdbError
();
ndb
->
closeTransaction
(
trans
);
switch
(
error
.
status
)
{
case
NdbError
:
:
Success
:
return
false
;
// ERROR!
break
;
case
NdbError
:
:
TemporaryError
:
NdbSleep_MilliSleep
(
10
);
return
true
;
// RETRY
break
;
case
NdbError
:
:
UnknownResult
:
ndbout
<<
error
<<
endl
;
return
false
;
// ERROR!
break
;
default:
case
NdbError
:
:
PermanentError
:
switch
(
error
.
code
)
{
case
499
:
case
250
:
NdbSleep_MilliSleep
(
10
);
return
true
;
//temp errors?
default:
break
;
}
//ERROR
ndbout
<<
error
<<
endl
;
return
false
;
break
;
}
return
false
;
}
ndb/src/kernel/blocks/backup/restore/main.cpp
View file @
0c722c90
...
...
@@ -14,62 +14,26 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "Restore.hpp"
#include <getarg.h>
#include <NdbSleep.h>
#include <Vector.hpp>
#include <ndb_limits.h>
#include <NdbTCP.h>
#ifdef USE_MYSQL
#include <mysql.h>
#endif
#include <NdbOut.hpp>
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
TupleS
&
tuple
);
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
LogEntry
&
logEntry
);
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
RestoreMetaData
&
);
#include "consumer_restore.hpp"
#include "consumer_printer.hpp"
extern
FilteredNdbOut
err
;
extern
FilteredNdbOut
info
;
extern
FilteredNdbOut
debug
;
static
const
char
*
delimiter
=
";"
;
// Delimiter in file dump
static
int
ga_nodeId
=
0
;
static
int
ga_nParallelism
=
1
;
static
int
ga_nParallelism
=
1
28
;
static
int
ga_backupId
=
0
;
static
bool
ga_dont_ignore_systab_0
=
false
;
static
myVector
<
class
BackupConsumer
*>
g_consumers
;
#ifdef USE_MYSQL
/**
* mysql specific stuff:
*/
static
const
char
*
ga_user
=
"root"
;
static
const
char
*
ga_host
=
"localhost"
;
static
const
char
*
ga_socket
=
"/tmp/mysql.sock"
;
static
const
char
*
ga_password
=
""
;
static
const
char
*
ga_database
=
""
;
static
int
ga_port
=
3306
;
static
bool
use_mysql
=
false
;
static
MYSQL
mysql
;
#endif
#ifdef NDB_WIN32
static
const
char
*
ga_backupPath
=
".
\\
"
;
#else
static
const
char
*
ga_backupPath
=
"./"
;
#endif
static
Vector
<
class
BackupConsumer
*>
g_consumers
;
typedef
struct
{
void
*
ndb
;
void
*
restore
;
TupleS
*
tup
;
int
transaction
;
int
retries
;
}
restore_callback_t
;
static
const
char
*
ga_backupPath
=
"."
DIR_SEPARATOR
;
static
const
char
*
ga_connect_NDB
=
NULL
;
...
...
@@ -78,102 +42,9 @@ static const char* ga_connect_NDB = NULL;
*/
static
bool
ga_restore
=
false
;
static
bool
ga_print
=
false
;
class
BackupConsumer
{
public:
virtual
bool
init
()
{
return
true
;}
virtual
bool
table
(
const
TableS
&
){
return
true
;}
#ifdef USE_MYSQL
virtual
bool
table
(
const
TableS
&
,
MYSQL
*
mysqlp
)
{
return
true
;};
#endif
virtual
void
tuple
(
const
TupleS
&
){}
virtual
void
tupleAsynch
(
const
TupleS
&
,
restore_callback_t
*
callback
)
{};
// virtual bool asynchErrorHandler(NdbConnection * trans){return true;};
virtual
void
asynchExitHandler
(){};
virtual
void
endOfTuples
(){}
virtual
void
logEntry
(
const
LogEntry
&
){}
virtual
void
endOfLogEntrys
(){}
protected:
#ifdef USE_MYSQL
int
create_table_string
(
const
TableS
&
table
,
char
*
,
char
*
);
#endif
};
class
BackupPrinter
:
public
BackupConsumer
{
NdbOut
&
m_ndbout
;
public:
BackupPrinter
(
NdbOut
&
out
=
ndbout
)
:
m_ndbout
(
out
)
{
m_print
=
false
;
m_print_log
=
false
;
m_print_data
=
false
;
m_print_meta
=
false
;
}
virtual
bool
table
(
const
TableS
&
);
#ifdef USE_MYSQL
virtual
bool
table
(
const
TableS
&
,
MYSQL
*
mysqlp
);
#endif
virtual
void
tuple
(
const
TupleS
&
);
virtual
void
logEntry
(
const
LogEntry
&
);
virtual
void
endOfTuples
()
{};
virtual
void
endOfLogEntrys
();
virtual
void
tupleAsynch
(
const
TupleS
&
,
restore_callback_t
*
callback
);
bool
m_print
;
bool
m_print_log
;
bool
m_print_data
;
bool
m_print_meta
;
Uint32
m_logCount
;
Uint32
m_dataCount
;
};
class
BackupRestore
:
public
BackupConsumer
{
public:
BackupRestore
()
{
m_ndb
=
0
;
m_logCount
=
m_dataCount
=
0
;
m_restore
=
false
;
m_restore_meta
=
false
;
}
virtual
~
BackupRestore
();
virtual
bool
init
();
virtual
bool
table
(
const
TableS
&
);
#ifdef USE_MYSQL
virtual
bool
table
(
const
TableS
&
,
MYSQL
*
mysqlp
);
#endif
virtual
void
tuple
(
const
TupleS
&
);
virtual
void
tupleAsynch
(
const
TupleS
&
,
restore_callback_t
*
callback
);
virtual
void
asynchExitHandler
();
virtual
void
endOfTuples
();
virtual
void
logEntry
(
const
LogEntry
&
);
virtual
void
endOfLogEntrys
();
void
connectToMysql
();
Ndb
*
m_ndb
;
bool
m_restore
;
bool
m_restore_meta
;
Uint32
m_logCount
;
Uint32
m_dataCount
;
};
bool
readArguments
(
const
int
argc
,
const
char
**
argv
)
{
BackupPrinter
*
printer
=
new
BackupPrinter
();
if
(
printer
==
NULL
)
return
false
;
BackupRestore
*
restore
=
new
BackupRestore
();
if
(
restore
==
NULL
)
{
delete
printer
;
return
false
;
}
int
_print
=
0
;
int
_print_meta
=
0
;
...
...
@@ -236,10 +107,18 @@ readArguments(const int argc, const char** argv)
ga_nParallelism
<
1
||
ga_nParallelism
>
1024
)
{
arg_printusage
(
args
,
num_args
,
argv
[
0
],
"<path to backup files>
\n
"
);
return
false
;
}
BackupPrinter
*
printer
=
new
BackupPrinter
();
if
(
printer
==
NULL
)
return
false
;
BackupRestore
*
restore
=
new
BackupRestore
(
ga_nParallelism
);
if
(
restore
==
NULL
)
{
delete
printer
;
delete
restore
;
return
false
;
}
...
...
@@ -282,11 +161,11 @@ readArguments(const int argc, const char** argv)
}
{
BackupConsumer
*
c
=
printer
;
BackupConsumer
*
c
=
printer
;
g_consumers
.
push_back
(
c
);
}
{
BackupConsumer
*
c
=
restore
;
BackupConsumer
*
c
=
restore
;
g_consumers
.
push_back
(
c
);
}
// Set backup file path
...
...
@@ -294,20 +173,6 @@ readArguments(const int argc, const char** argv)
{
ga_backupPath
=
argv
[
optind
];
}
#ifdef USE_MYSQL
if
(
use_mysql
)
{
ga_dont_ignore_systab_0
=
false
;
ga_database
=
""
;
//not used yet. pethaps later if we want to
// restore meta data in an existing mysql database,
// and not just restore it to the same database
// as when the backup was taken.
// If implementing this, then the
// tupleAsynch must also be changed so that the
// table data is restored to the correct table.
// also, mysql_select_db must be set properly (ie.,
// ignored in codw below)
}
#endif
return
true
;
}
...
...
@@ -316,15 +181,12 @@ readArguments(const int argc, const char** argv)
void
clearConsumers
()
{
for
(
int
i
=
0
;
i
<
g_consumers
.
size
();
i
++
)
for
(
Uint32
i
=
0
;
i
<
g_consumers
.
size
();
i
++
)
delete
g_consumers
[
i
];
g_consumers
.
clear
();
}
static
bool
asynchErrorHandler
(
NdbConnection
*
trans
,
Ndb
*
ndb
);
static
NdbConnection
*
asynchTrans
[
1024
];
bool
static
bool
checkSysTable
(
const
char
*
tableName
)
{
return
ga_dont_ignore_systab_0
||
...
...
@@ -334,6 +196,12 @@ checkSysTable(const char *tableName)
strcmp
(
tableName
,
"sys/def/NDB$EVENTS_0"
)
!=
0
);
}
static
void
free_data_callback
()
{
for
(
Uint32
i
=
0
;
i
<
g_consumers
.
size
();
i
++
)
g_consumers
[
i
]
->
tuple_free
();
}
int
main
(
int
argc
,
const
char
**
argv
)
...
...
@@ -343,6 +211,12 @@ main(int argc, const char** argv)
return
-
1
;
}
if
(
ga_connect_NDB
!=
NULL
)
{
// Use connection string
Ndb
::
setConnectString
(
ga_connect_NDB
);
}
/**
* we must always load meta data, even if we will only print it to stdout
*/
...
...
@@ -377,7 +251,7 @@ main(int argc, const char** argv)
}
for
(
int
i
=
0
;
i
<
g_consumers
.
size
();
i
++
)
for
(
Uint32
i
=
0
;
i
<
g_consumers
.
size
();
i
++
)
{
if
(
!
g_consumers
[
i
]
->
init
())
{
...
...
@@ -391,36 +265,22 @@ main(int argc, const char** argv)
{
if
(
checkSysTable
(
metaData
[
i
]
->
getTableName
()))
{
for
(
int
j
=
0
;
j
<
g_consumers
.
size
();
j
++
)
#ifdef USE_MYSQL
if
(
use_mysql
)
{
if
(
!
g_consumers
[
j
]
->
table
(
*
metaData
[
i
],
&
mysql
))
{
ndbout_c
(
"Restore: Failed to restore table: %s. "
"Exiting..."
,
metaData
[
i
]
->
getTableName
());
return
-
11
;
}
}
else
#endif
if
(
!
g_consumers
[
j
]
->
table
(
*
metaData
[
i
]))
{
ndbout_c
(
"Restore: Failed to restore table: %s. "
"Exiting..."
,
metaData
[
i
]
->
getTableName
());
return
-
11
;
}
for
(
Uint32
j
=
0
;
j
<
g_consumers
.
size
();
j
++
)
if
(
!
g_consumers
[
j
]
->
table
(
*
metaData
[
i
]))
{
ndbout_c
(
"Restore: Failed to restore table: %s. "
"Exiting..."
,
metaData
[
i
]
->
getTableName
());
return
-
11
;
}
}
}
if
(
ga_restore
||
ga_print
)
{
if
(
ga_restore
)
{
RestoreDataIterator
dataIter
(
metaData
);
RestoreDataIterator
dataIter
(
metaData
,
&
free_data_callback
);
// Read data file header
if
(
!
dataIter
.
readHeader
())
...
...
@@ -430,19 +290,15 @@ main(int argc, const char** argv)
}
while
(
dataIter
.
readFragmentHeader
(
res
))
while
(
dataIter
.
readFragmentHeader
(
res
=
0
))
{
const
TupleS
*
tuple
=
0
;
while
((
tuple
=
dataIter
.
getNextTuple
(
res
))
!=
NULL
)
const
TupleS
*
tuple
;
while
((
tuple
=
dataIter
.
getNextTuple
(
res
=
1
))
!=
0
)
{
if
(
checkSysTable
(
tuple
->
getTable
()
->
getTableName
()))
{
for
(
int
i
=
0
;
i
<
g_consumers
.
size
();
i
++
)
{
g_consumers
[
i
]
->
tupleAsynch
(
*
tuple
,
0
);
}
}
}
while
(
tuple
!=
NULL
);
for
(
Uint32
i
=
0
;
i
<
g_consumers
.
size
();
i
++
)
g_consumers
[
i
]
->
tuple
(
*
tuple
);
}
// while (tuple != NULL);
if
(
res
<
0
)
{
...
...
@@ -459,44 +315,37 @@ main(int argc, const char** argv)
if
(
res
<
0
)
{
ndbout_c
(
"Restore: An error occured while restoring data. "
"Exiting..."
);
err
<<
"Restore: An error occured while restoring data. Exiting... res="
<<
res
<<
endl
;
return
-
1
;
}
dataIter
.
validateFooter
();
//not implemented
for
(
int
i
=
0
;
i
<
g_consumers
.
size
();
i
++
)
for
(
Uint32
i
=
0
;
i
<
g_consumers
.
size
();
i
++
)
g_consumers
[
i
]
->
endOfTuples
();
RestoreLogIterator
logIter
(
metaData
);
if
(
!
logIter
.
readHeader
())
{
ndbout
<<
"Failed to read header of data file. Exiting..."
;
err
<<
"Failed to read header of data file. Exiting..."
<<
endl
;
return
-
1
;
}
/**
* I have not touched the part below : -johan 040218
* except fixing return values.
*/
const
LogEntry
*
logEntry
=
0
;
while
((
logEntry
=
logIter
.
getNextLogEntry
(
res
))
)
while
((
logEntry
=
logIter
.
getNextLogEntry
(
res
=
0
))
!=
0
)
{
if
(
checkSysTable
(
logEntry
->
m_table
->
getTableName
()))
{
for
(
int
i
=
0
;
i
<
g_consumers
.
size
();
i
++
)
for
(
Uint32
i
=
0
;
i
<
g_consumers
.
size
();
i
++
)
g_consumers
[
i
]
->
logEntry
(
*
logEntry
);
}
}
if
(
res
<
0
)
{
ndbout_c
(
"Restore: An restoring the data log"
"Exiting..."
);
err
<<
"Restore: An restoring the data log. Exiting... res="
<<
res
<<
endl
;
return
-
1
;
}
logIter
.
validateFooter
();
//not implemented
for
(
int
i
=
0
;
i
<
g_consumers
.
size
();
i
++
)
for
(
Uint32
i
=
0
;
i
<
g_consumers
.
size
();
i
++
)
g_consumers
[
i
]
->
endOfLogEntrys
();
}
}
...
...
@@ -504,893 +353,3 @@ main(int argc, const char** argv)
return
1
;
}
// main
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
AttributeS
&
attr
){
const
AttributeData
&
data
=
attr
.
Data
;
const
AttributeDesc
&
desc
=
*
attr
.
Desc
;
if
(
data
.
null
)
{
ndbout
<<
"<NULL>"
;
return
ndbout
;
}
NdbRecAttr
tmprec
;
tmprec
.
setup
(
desc
.
m_column
,
(
char
*
)
data
.
void_value
);
ndbout
<<
tmprec
;
return
ndbout
;
}
// Print tuple data
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
TupleS
&
tuple
)
{
ndbout
<<
tuple
.
getTable
()
->
getTableName
()
<<
"; "
;
for
(
int
i
=
0
;
i
<
tuple
.
getNoOfAttributes
();
i
++
)
{
const
AttributeS
*
attr
=
tuple
[
i
];
debug
<<
i
<<
" "
<<
attr
->
Desc
->
m_column
->
getName
();
ndbout
<<
(
*
attr
);
if
(
i
!=
(
tuple
.
getNoOfAttributes
()
-
1
))
ndbout
<<
delimiter
<<
" "
;
}
// for
return
ndbout
;
}
// Print tuple data
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
LogEntry
&
logE
)
{
switch
(
logE
.
m_type
)
{
case
LogEntry
:
:
LE_INSERT
:
ndbout
<<
"INSERT "
<<
logE
.
m_table
->
getTableName
()
<<
" "
;
break
;
case
LogEntry
:
:
LE_DELETE
:
ndbout
<<
"DELETE "
<<
logE
.
m_table
->
getTableName
()
<<
" "
;
break
;
case
LogEntry
:
:
LE_UPDATE
:
ndbout
<<
"UPDATE "
<<
logE
.
m_table
->
getTableName
()
<<
" "
;
break
;
default:
ndbout
<<
"Unknown log entry type (not insert, delete or update)"
;
}
for
(
int
i
=
0
;
i
<
logE
.
m_values
.
size
();
i
++
)
{
const
AttributeS
*
attr
=
logE
.
m_values
[
i
];
ndbout
<<
attr
->
Desc
->
m_column
->
getName
()
<<
"="
;
ndbout
<<
(
*
attr
);
if
(
i
<
(
logE
.
m_values
.
size
()
-
1
))
ndbout
<<
", "
;
}
return
ndbout
;
}
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
TableS
&
table
){
ndbout
<<
endl
<<
"Table: "
<<
table
.
getTableName
()
<<
endl
;
for
(
int
j
=
0
;
j
<
table
.
getNoOfAttributes
();
j
++
)
{
const
AttributeDesc
*
desc
=
table
[
j
];
ndbout
<<
desc
->
m_column
->
getName
()
<<
": "
<<
desc
->
m_column
->
getType
();
ndbout
<<
" key: "
<<
desc
->
m_column
->
getPrimaryKey
();
ndbout
<<
" array: "
<<
desc
->
arraySize
;
ndbout
<<
" size: "
<<
desc
->
size
<<
endl
;
}
// for
return
ndbout
;
}
#if 0
/*****************************************
*
* Callback function for asynchronous transactions
*
* Idea for error handling: Transaction objects have to be stored globally when
* they are prepared.
* In the callback function if the transaction:
* succeeded: delete the object from global storage
* failed but can be retried: execute the object that is in global storage
* failed but fatal: delete the object from global storage
*
******************************************/
static void restoreCallback(int result, // Result for transaction
NdbConnection *object, // Transaction object
void *anything) // Not used
{
static Uint32 counter = 0;
debug << "restoreCallback function called " << counter << " time(s)" << endl;
++counter;
if (result == -1)
{
ndbout << " restoreCallback (" << counter;
if ((counter % 10) == 1)
{
ndbout << "st";
} // if
else if ((counter % 10) == 2)
{
ndbout << "nd";
} // else if
else if ((counter % 10 ) ==3)
{
ndbout << "rd";
} // else if
else
{
ndbout << "th";
} // else
err << " time: error detected " << object->getNdbError() << endl;
} // if
} // restoreCallback
#endif
bool
BackupPrinter
::
table
(
const
TableS
&
tab
)
{
if
(
m_print
||
m_print_meta
)
{
m_ndbout
<<
tab
;
ndbout_c
(
"Successfully printed table: %s"
,
tab
.
m_dictTable
->
getName
());
}
return
true
;
}
#ifdef USE_MYSQL
bool
BackupPrinter
::
table
(
const
TableS
&
tab
,
MYSQL
*
mysql
)
{
if
(
m_print
||
m_print_meta
)
{
char
tmpTabName
[
MAX_TAB_NAME_SIZE
*
2
];
sprintf
(
tmpTabName
,
"%s"
,
tab
.
getTableName
());
char
*
database
=
strtok
(
tmpTabName
,
"/"
);
char
*
schema
=
strtok
(
NULL
,
"/"
);
char
*
tableName
=
strtok
(
NULL
,
"/"
);
/**
* this means that the user did not specify schema
* and it is a v2x backup
*/
if
(
database
==
NULL
)
return
false
;
if
(
schema
==
NULL
)
return
false
;
if
(
tableName
==
NULL
)
tableName
=
schema
;
char
stmtCreateDB
[
255
];
sprintf
(
stmtCreateDB
,
"CREATE DATABASE %s"
,
database
);
ndbout_c
(
"%s"
,
stmtCreateDB
);
char
buf
[
2048
];
create_table_string
(
tab
,
tableName
,
buf
);
ndbout_c
(
"%s"
,
buf
);
ndbout_c
(
"Successfully printed table: %s"
,
tab
.
m_dictTable
->
getName
());
}
return
true
;
}
#endif
void
BackupPrinter
::
tuple
(
const
TupleS
&
tup
)
{
if
(
m_print
||
m_print_data
)
m_ndbout
<<
tup
<<
endl
;
}
void
BackupPrinter
::
logEntry
(
const
LogEntry
&
logE
)
{
if
(
m_print
||
m_print_log
)
m_ndbout
<<
logE
<<
endl
;
m_logCount
++
;
}
bool
BackupRestore
::
init
()
{
if
(
!
m_restore
&&
!
m_restore_meta
)
return
true
;
if
(
ga_connect_NDB
!=
NULL
)
{
// Use connection string
Ndb
::
setConnectString
(
ga_connect_NDB
);
}
m_ndb
=
new
Ndb
();
if
(
m_ndb
==
NULL
)
return
false
;
// Turn off table name completion
m_ndb
->
useFullyQualifiedNames
(
false
);
m_ndb
->
init
(
1024
);
if
(
m_ndb
->
waitUntilReady
(
30
)
!=
0
)
{
ndbout
<<
"Failed to connect to ndb!!"
<<
endl
;
delete
m_ndb
;
return
false
;
}
ndbout
<<
"Connected to ndb!!"
<<
endl
;
#if USE_MYSQL
if
(
use_mysql
)
{
if
(
mysql_thread_safe
()
==
0
)
{
ndbout
<<
"Not thread safe mysql library..."
<<
endl
;
exit
(
-
1
);
}
ndbout
<<
"Connecting to MySQL..."
<<
endl
;
/**
* nwe param:
* port
* host
* user
*/
bool
returnValue
=
true
;
mysql_init
(
&
mysql
);
{
int
portNo
=
3306
;
if
(
mysql_real_connect
(
&
mysql
,
ga_host
,
ga_user
,
ga_password
,
ga_database
,
ga_port
,
ga_socket
,
0
)
==
NULL
)
{
ndbout_c
(
"Connect failed: %s"
,
mysql_error
(
&
mysql
));
returnValue
=
false
;
}
ndbout
<<
"Connected to MySQL!!!"
<<
endl
;
}
/* if(returnValue){
mysql_set_server_option(&mysql, MYSQL_OPTION_MULTI_STATEMENTS_ON);
}
*/
return
returnValue
;
}
#endif
return
true
;
}
BackupRestore
::~
BackupRestore
()
{
if
(
m_ndb
!=
0
)
delete
m_ndb
;
}
#ifdef USE_MYSQL
bool
BackupRestore
::
table
(
const
TableS
&
table
,
MYSQL
*
mysqlp
){
if
(
!
m_restore_meta
)
{
return
true
;
}
char
tmpTabName
[
MAX_TAB_NAME_SIZE
*
2
];
sprintf
(
tmpTabName
,
"%s"
,
table
.
getTableName
());
char
*
database
=
strtok
(
tmpTabName
,
"/"
);
char
*
schema
=
strtok
(
NULL
,
"/"
);
char
*
tableName
=
strtok
(
NULL
,
"/"
);
/**
* this means that the user did not specify schema
* and it is a v2x backup
*/
if
(
database
==
NULL
)
return
false
;
if
(
schema
==
NULL
)
return
false
;
if
(
tableName
==
NULL
)
tableName
=
schema
;
char
stmtCreateDB
[
255
];
sprintf
(
stmtCreateDB
,
"CREATE DATABASE %s"
,
database
);
/*ignore return value. mysql_select_db will trap errors anyways*/
if
(
mysql_query
(
mysqlp
,
stmtCreateDB
)
==
0
)
{
//ndbout_c("%s", stmtCreateDB);
}
if
(
mysql_select_db
(
&
mysql
,
database
)
!=
0
)
{
ndbout_c
(
"Error: %s"
,
mysql_error
(
&
mysql
));
return
false
;
}
char
buf
[
2048
];
/**
* create table ddl
*/
if
(
create_table_string
(
table
,
tableName
,
buf
))
{
ndbout_c
(
"Unable to create a table definition since the "
"backup contains undefined types"
);
return
false
;
}
//ndbout_c("%s", buf);
if
(
mysql_query
(
mysqlp
,
buf
)
!=
0
)
{
ndbout_c
(
"Error: %s"
,
mysql_error
(
&
mysql
));
return
false
;
}
else
{
ndbout_c
(
"Successfully restored table %s into database %s"
,
tableName
,
database
);
}
return
true
;
}
int
BackupConsumer
::
create_table_string
(
const
TableS
&
table
,
char
*
tableName
,
char
*
buf
){
int
pos
=
0
;
int
pos2
=
0
;
char
buf2
[
2048
];
pos
+=
sprintf
(
buf
+
pos
,
"%s%s"
,
"CREATE TABLE "
,
tableName
);
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"("
);
pos2
+=
sprintf
(
buf2
+
pos2
,
"%s"
,
" primary key("
);
for
(
int
j
=
0
;
j
<
table
.
getNoOfAttributes
();
j
++
)
{
const
AttributeDesc
*
desc
=
table
[
j
];
// ndbout << desc->name << ": ";
pos
+=
sprintf
(
buf
+
pos
,
"%s%s"
,
desc
->
m_column
->
getName
(),
" "
);
switch
(
desc
->
m_column
->
getType
()){
case
NdbDictionary
:
:
Column
::
Int
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"int"
);
break
;
case
NdbDictionary
:
:
Column
::
Unsigned
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"int unsigned"
);
break
;
case
NdbDictionary
:
:
Column
::
Float
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"float"
);
break
;
case
NdbDictionary
:
:
Column
::
Decimal
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"decimal"
);
break
;
case
NdbDictionary
:
:
Column
::
Char
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"char"
);
break
;
case
NdbDictionary
:
:
Column
::
Varchar
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"varchar"
);
break
;
case
NdbDictionary
:
:
Column
::
Binary
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"binary"
);
break
;
case
NdbDictionary
:
:
Column
::
Varbinary
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"varchar binary"
);
break
;
case
NdbDictionary
:
:
Column
::
Bigint
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"bigint"
);
break
;
case
NdbDictionary
:
:
Column
::
Bigunsigned
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"bigint unsigned"
);
break
;
case
NdbDictionary
:
:
Column
::
Double
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"double"
);
break
;
case
NdbDictionary
:
:
Column
::
Datetime
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"datetime"
);
break
;
case
NdbDictionary
:
:
Column
::
Timespec
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"time"
);
break
;
case
NdbDictionary
:
:
Column
::
Undefined
:
// pos += sprintf(buf+pos, "%s", "varchar binary");
return
-
1
;
break
;
default:
//pos += sprintf(buf+pos, "%s", "varchar binary");
return
-
1
;
}
if
(
desc
->
arraySize
>
1
)
{
int
attrSize
=
desc
->
arraySize
;
pos
+=
sprintf
(
buf
+
pos
,
"%s%u%s"
,
"("
,
attrSize
,
")"
);
}
if
(
desc
->
m_column
->
getPrimaryKey
())
{
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
" not null"
);
pos2
+=
sprintf
(
buf2
+
pos2
,
"%s%s"
,
desc
->
m_column
->
getName
(),
","
);
}
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
","
);
}
// for
pos2
--
;
// remove trailing comma
pos2
+=
sprintf
(
buf2
+
pos2
,
"%s"
,
")"
);
// pos--; // remove trailing comma
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
buf2
);
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
") type=ndbcluster"
);
return
0
;
}
#endif // USE_MYSQL
bool
BackupRestore
::
table
(
const
TableS
&
table
){
if
(
!
m_restore_meta
)
{
return
true
;
}
NdbDictionary
::
Dictionary
*
dict
=
m_ndb
->
getDictionary
();
if
(
dict
->
createTable
(
*
table
.
m_dictTable
)
==
-
1
)
{
err
<<
"Create table "
<<
table
.
getTableName
()
<<
" failed: "
<<
dict
->
getNdbError
()
<<
endl
;
return
false
;
}
info
<<
"Successfully restored table "
<<
table
.
getTableName
()
<<
endl
;
return
true
;
}
/*
* callback : This is called when the transaction is polled
*
* (This function must have three arguments:
* - The result of the transaction,
* - The NdbConnection object, and
* - A pointer to an arbitrary object.)
*/
static
void
callback
(
int
result
,
NdbConnection
*
trans
,
void
*
aObject
)
{
restore_callback_t
*
cbData
=
(
restore_callback_t
*
)
aObject
;
if
(
result
<
0
)
{
/**
* Error. temporary or permanent?
*/
if
(
asynchErrorHandler
(
trans
,
(
Ndb
*
)
cbData
->
ndb
))
{
((
Ndb
*
)
cbData
->
ndb
)
->
closeTransaction
(
asynchTrans
[
cbData
->
transaction
]);
cbData
->
retries
++
;
((
BackupRestore
*
)
cbData
)
->
tupleAsynch
(
*
(
TupleS
*
)(
cbData
->
tup
),
cbData
);
}
else
{
ndbout_c
(
"Restore: Failed to restore data "
"due to a unrecoverable error. Exiting..."
);
delete
(
Ndb
*
)
cbData
->
ndb
;
delete
cbData
->
tup
;
delete
cbData
;
exit
(
-
1
);
}
}
else
{
/**
* OK! close transaction
*/
((
Ndb
*
)
cbData
->
ndb
)
->
closeTransaction
(
asynchTrans
[
cbData
->
transaction
]);
delete
cbData
->
tup
;
delete
cbData
;
}
}
static
int
nPreparedTransactions
=
0
;
void
BackupPrinter
::
tupleAsynch
(
const
TupleS
&
tup
,
restore_callback_t
*
callback
)
{
m_dataCount
++
;
if
(
m_print
||
m_print_data
)
m_ndbout
<<
tup
<<
endl
;
}
void
BackupRestore
::
tupleAsynch
(
const
TupleS
&
tup
,
restore_callback_t
*
cbData
)
{
if
(
!
m_restore
)
{
delete
&
tup
;
return
;
}
Uint32
retries
;
if
(
cbData
!=
0
)
retries
=
cbData
->
retries
;
else
retries
=
0
;
while
(
retries
<
10
)
{
/**
* start transactions
*/
asynchTrans
[
nPreparedTransactions
]
=
m_ndb
->
startTransaction
();
if
(
asynchTrans
[
nPreparedTransactions
]
==
NULL
)
{
if
(
asynchErrorHandler
(
asynchTrans
[
nPreparedTransactions
],
m_ndb
))
{
retries
++
;
continue
;
}
asynchExitHandler
();
}
// if
const
TableS
*
table
=
tup
.
getTable
();
NdbOperation
*
op
=
asynchTrans
[
nPreparedTransactions
]
->
getNdbOperation
(
table
->
getTableName
());
if
(
op
==
NULL
)
{
if
(
asynchErrorHandler
(
asynchTrans
[
nPreparedTransactions
],
m_ndb
))
{
retries
++
;
continue
;
}
asynchExitHandler
();
}
// if
if
(
op
->
writeTuple
()
==
-
1
)
{
if
(
asynchErrorHandler
(
asynchTrans
[
nPreparedTransactions
],
m_ndb
))
{
retries
++
;
continue
;
}
asynchExitHandler
();
}
// if
Uint32
ret
=
0
;
for
(
int
i
=
0
;
i
<
tup
.
getNoOfAttributes
();
i
++
)
{
const
AttributeS
*
attr
=
tup
[
i
];
int
size
=
attr
->
Desc
->
size
;
int
arraySize
=
attr
->
Desc
->
arraySize
;
char
*
dataPtr
=
attr
->
Data
.
string_value
;
Uint32
length
=
(
size
*
arraySize
)
/
8
;
if
(
attr
->
Desc
->
m_column
->
getPrimaryKey
())
{
ret
=
op
->
equal
(
i
,
dataPtr
,
length
);
if
(
ret
<
0
)
{
ndbout_c
(
"Column: %d type %d"
,
i
,
tup
.
getTable
()
->
m_dictTable
->
getColumn
(
i
)
->
getType
());
if
(
asynchErrorHandler
(
asynchTrans
[
nPreparedTransactions
],
m_ndb
))
{
retries
++
;
continue
;
}
asynchExitHandler
();
}
}
}
for
(
int
i
=
0
;
i
<
tup
.
getNoOfAttributes
();
i
++
)
{
const
AttributeS
*
attr
=
tup
[
i
];
int
size
=
attr
->
Desc
->
size
;
int
arraySize
=
attr
->
Desc
->
arraySize
;
char
*
dataPtr
=
attr
->
Data
.
string_value
;
Uint32
length
=
(
size
*
arraySize
)
/
8
;
if
(
!
attr
->
Desc
->
m_column
->
getPrimaryKey
())
if
(
attr
->
Data
.
null
)
ret
=
op
->
setValue
(
i
,
NULL
,
0
);
else
ret
=
op
->
setValue
(
i
,
dataPtr
,
length
);
if
(
ret
<
0
)
{
ndbout_c
(
"Column: %d type %d"
,
i
,
tup
.
getTable
()
->
m_dictTable
->
getColumn
(
i
)
->
getType
());
if
(
asynchErrorHandler
(
asynchTrans
[
nPreparedTransactions
],
m_ndb
))
{
retries
++
;
continue
;
}
asynchExitHandler
();
}
}
restore_callback_t
*
cb
;
if
(
cbData
==
0
)
{
cb
=
new
restore_callback_t
;
cb
->
retries
=
0
;
}
else
cb
=
cbData
;
cb
->
ndb
=
m_ndb
;
cb
->
restore
=
this
;
cb
->
tup
=
(
TupleS
*
)
&
tup
;
cb
->
transaction
=
nPreparedTransactions
;
// Prepare transaction (the transaction is NOT yet sent to NDB)
asynchTrans
[
nPreparedTransactions
]
->
executeAsynchPrepare
(
Commit
,
&
callback
,
cb
);
if
(
nPreparedTransactions
==
ga_nParallelism
-
1
)
{
// send-poll all transactions
// close transaction is done in callback
m_ndb
->
sendPollNdb
(
3000
,
ga_nParallelism
);
nPreparedTransactions
=
0
;
}
else
nPreparedTransactions
++
;
m_dataCount
++
;
return
;
}
ndbout_c
(
"Unable to recover from errors. Exiting..."
);
asynchExitHandler
();
}
void
BackupRestore
::
asynchExitHandler
()
{
if
(
m_ndb
!=
NULL
)
delete
m_ndb
;
exit
(
-
1
);
}
/**
* returns true if is recoverable,
* Error handling based on hugo
* false if it is an error that generates an abort.
*/
static
bool
asynchErrorHandler
(
NdbConnection
*
trans
,
Ndb
*
ndb
)
{
NdbError
error
=
trans
->
getNdbError
();
ndb
->
closeTransaction
(
trans
);
switch
(
error
.
status
)
{
case
NdbError
:
:
Success
:
return
false
;
// ERROR!
break
;
case
NdbError
:
:
TemporaryError
:
NdbSleep_MilliSleep
(
10
);
return
true
;
// RETRY
break
;
case
NdbError
:
:
UnknownResult
:
ndbout
<<
error
<<
endl
;
return
false
;
// ERROR!
break
;
default:
case
NdbError
:
:
PermanentError
:
switch
(
error
.
code
)
{
case
499
:
case
250
:
NdbSleep_MilliSleep
(
10
);
return
true
;
//temp errors?
default:
break
;
}
//ERROR
ndbout
<<
error
<<
endl
;
return
false
;
break
;
}
return
false
;
}
void
BackupRestore
::
tuple
(
const
TupleS
&
tup
)
{
if
(
!
m_restore
)
return
;
while
(
1
)
{
NdbConnection
*
trans
=
m_ndb
->
startTransaction
();
if
(
trans
==
NULL
)
{
// Deep shit, TODO: handle the error
ndbout
<<
"Cannot start transaction"
<<
endl
;
exit
(
-
1
);
}
// if
const
TableS
*
table
=
tup
.
getTable
();
NdbOperation
*
op
=
trans
->
getNdbOperation
(
table
->
getTableName
());
if
(
op
==
NULL
)
{
ndbout
<<
"Cannot get operation: "
;
ndbout
<<
trans
->
getNdbError
()
<<
endl
;
exit
(
-
1
);
}
// if
// TODO: check return value and handle error
if
(
op
->
writeTuple
()
==
-
1
)
{
ndbout
<<
"writeTuple call failed: "
;
ndbout
<<
trans
->
getNdbError
()
<<
endl
;
exit
(
-
1
);
}
// if
for
(
int
i
=
0
;
i
<
tup
.
getNoOfAttributes
();
i
++
)
{
const
AttributeS
*
attr
=
tup
[
i
];
int
size
=
attr
->
Desc
->
size
;
int
arraySize
=
attr
->
Desc
->
arraySize
;
const
char
*
dataPtr
=
attr
->
Data
.
string_value
;
const
Uint32
length
=
(
size
*
arraySize
)
/
8
;
if
(
attr
->
Desc
->
m_column
->
getPrimaryKey
())
op
->
equal
(
i
,
dataPtr
,
length
);
}
for
(
int
i
=
0
;
i
<
tup
.
getNoOfAttributes
();
i
++
)
{
const
AttributeS
*
attr
=
tup
[
i
];
int
size
=
attr
->
Desc
->
size
;
int
arraySize
=
attr
->
Desc
->
arraySize
;
const
char
*
dataPtr
=
attr
->
Data
.
string_value
;
const
Uint32
length
=
(
size
*
arraySize
)
/
8
;
if
(
!
attr
->
Desc
->
m_column
->
getPrimaryKey
())
if
(
attr
->
Data
.
null
)
op
->
setValue
(
i
,
NULL
,
0
);
else
op
->
setValue
(
i
,
dataPtr
,
length
);
}
int
ret
=
trans
->
execute
(
Commit
);
if
(
ret
!=
0
)
{
ndbout
<<
"execute failed: "
;
ndbout
<<
trans
->
getNdbError
()
<<
endl
;
exit
(
-
1
);
}
m_ndb
->
closeTransaction
(
trans
);
if
(
ret
==
0
)
break
;
}
m_dataCount
++
;
}
void
BackupRestore
::
endOfTuples
()
{
if
(
!
m_restore
)
return
;
// Send all transactions to NDB
m_ndb
->
sendPreparedTransactions
(
0
);
// Poll all transactions
m_ndb
->
pollNdb
(
3000
,
nPreparedTransactions
);
// Close all transactions
// for (int i = 0; i < nPreparedTransactions; i++)
// m_ndb->closeTransaction(asynchTrans[i]);
nPreparedTransactions
=
0
;
}
void
BackupRestore
::
logEntry
(
const
LogEntry
&
tup
)
{
if
(
!
m_restore
)
return
;
NdbConnection
*
trans
=
m_ndb
->
startTransaction
();
if
(
trans
==
NULL
)
{
// Deep shit, TODO: handle the error
ndbout
<<
"Cannot start transaction"
<<
endl
;
exit
(
-
1
);
}
// if
const
TableS
*
table
=
tup
.
m_table
;
NdbOperation
*
op
=
trans
->
getNdbOperation
(
table
->
getTableName
());
if
(
op
==
NULL
)
{
ndbout
<<
"Cannot get operation: "
;
ndbout
<<
trans
->
getNdbError
()
<<
endl
;
exit
(
-
1
);
}
// if
int
check
=
0
;
switch
(
tup
.
m_type
)
{
case
LogEntry
:
:
LE_INSERT
:
check
=
op
->
insertTuple
();
break
;
case
LogEntry
:
:
LE_UPDATE
:
check
=
op
->
updateTuple
();
break
;
case
LogEntry
:
:
LE_DELETE
:
check
=
op
->
deleteTuple
();
break
;
default:
ndbout
<<
"Log entry has wrong operation type."
<<
" Exiting..."
;
exit
(
-
1
);
}
for
(
int
i
=
0
;
i
<
tup
.
m_values
.
size
();
i
++
)
{
const
AttributeS
*
attr
=
tup
.
m_values
[
i
];
int
size
=
attr
->
Desc
->
size
;
int
arraySize
=
attr
->
Desc
->
arraySize
;
const
char
*
dataPtr
=
attr
->
Data
.
string_value
;
const
Uint32
length
=
(
size
/
8
)
*
arraySize
;
if
(
attr
->
Desc
->
m_column
->
getPrimaryKey
())
op
->
equal
(
attr
->
Desc
->
attrId
,
dataPtr
,
length
);
else
op
->
setValue
(
attr
->
Desc
->
attrId
,
dataPtr
,
length
);
}
#if 1
trans
->
execute
(
Commit
);
#else
const
int
ret
=
trans
->
execute
(
Commit
);
// Both insert update and delete can fail during log running
// and it's ok
if
(
ret
!=
0
)
{
ndbout
<<
"execute failed: "
;
ndbout
<<
trans
->
getNdbError
()
<<
endl
;
exit
(
-
1
);
}
#endif
m_ndb
->
closeTransaction
(
trans
);
m_logCount
++
;
}
void
BackupRestore
::
endOfLogEntrys
()
{
if
(
ga_restore
)
{
ndbout
<<
"Restored "
<<
m_dataCount
<<
" tuples and "
<<
m_logCount
<<
" log entries"
<<
endl
;
}
}
void
BackupPrinter
::
endOfLogEntrys
()
{
if
(
m_print
||
m_print_log
)
{
ndbout
<<
"Printed "
<<
m_dataCount
<<
" tuples and "
<<
m_logCount
<<
" log entries"
<<
" to stdout."
<<
endl
;
}
}
ndb/src/kernel/blocks/backup/restore/myVector.hpp
deleted
100644 → 0
View file @
2e8e6bd6
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef MY_VECTOR_HPP
#define MY_VECTOR_HPP
// Template class for std::vector-like class (hopefully works in OSE)
template
<
class
T
>
class
myVector
{
// Note that last element in array is used for end() and is always empty
int
sizeIncrement
;
int
thisSize
;
int
used
;
T
*
storage
;
public:
// Assignment of whole vector
myVector
<
T
>
&
operator
=
(
myVector
<
T
>
&
org
)
{
// Don't copy if they point to the same address
if
(
!
(
this
==
&
org
))
{
// Check memory space
if
(
thisSize
<
org
.
thisSize
)
{
// We have to increase memory for destination
T
*
tmpStorage
=
new
T
[
org
.
thisSize
];
delete
[]
storage
;
storage
=
tmpStorage
;
}
// if
thisSize
=
org
.
thisSize
;
sizeIncrement
=
org
.
sizeIncrement
;
used
=
org
.
used
;
for
(
int
i
=
0
;
i
<
thisSize
;
i
++
)
{
storage
[
i
]
=
org
.
storage
[
i
];
}
// for
}
// if
return
*
this
;
}
// operator=
// Construct with size s+1
myVector
(
int
s
=
1
)
:
sizeIncrement
(
5
),
// sizeIncrement(s),
thisSize
(
s
+
1
),
used
(
0
),
storage
(
new
T
[
s
+
1
])
{
}
~
myVector
()
{
delete
[]
storage
;
}
// Destructor: deallocate memory
T
&
operator
[](
int
i
)
{
// Return by index
if
((
i
<
0
)
||
(
i
>=
used
))
{
// Index error
ndbout
<<
"vector index out of range"
<<
endl
;
abort
();
return
storage
[
used
-
1
];
}
// if
else
{
return
storage
[
i
];
}
// else
}
// operator[]
const
T
&
operator
[](
int
i
)
const
{
// Return by index
if
((
i
<
0
)
||
(
i
>=
used
))
{
// Index error
ndbout
<<
"vector index out of range"
<<
endl
;
abort
();
return
storage
[
used
-
1
];
}
// if
else
{
return
storage
[
i
];
}
// else
}
// operator[]
int
getSize
()
const
{
return
used
;
}
void
push_back
(
T
&
item
)
{
if
(
used
>=
thisSize
-
1
)
{
// We have to allocate new storage
int
newSize
=
thisSize
+
sizeIncrement
;
T
*
tmpStorage
=
new
T
[
newSize
];
if
(
tmpStorage
==
NULL
)
{
// Memory allocation error! break
ndbout
<<
"PANIC: Memory allocation error in vector"
<<
endl
;
return
;
}
// if
thisSize
=
newSize
;
for
(
int
i
=
0
;
i
<
used
;
i
++
)
{
tmpStorage
[
i
]
=
storage
[
i
];
}
// for
delete
[]
storage
;
storage
=
tmpStorage
;
}
// if
// Now push
storage
[
used
]
=
item
;
used
++
;
};
// myVector<> push_back()
// Remove item at back
void
pop_back
()
{
if
(
used
>
0
)
{
used
--
;
}
// if
}
// pop_back()
int
size
()
const
{
return
used
;
};
bool
empty
()
const
{
return
(
used
==
0
);
}
void
clear
()
{
used
=
0
;
}
};
#endif
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment