Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
MariaDB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
MariaDB
Commits
c4a1c72b
Commit
c4a1c72b
authored
Dec 02, 2006
by
brian@zim.(none)
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Formailized the row buffer structure, implemented new streaming format.
parent
16b79adf
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
251 additions
and
79 deletions
+251
-79
mysql-test/r/archive.result
mysql-test/r/archive.result
+1
-1
mysql-test/t/archive.test
mysql-test/t/archive.test
+1
-1
storage/archive/ha_archive.cc
storage/archive/ha_archive.cc
+225
-75
storage/archive/ha_archive.h
storage/archive/ha_archive.h
+24
-2
No files found.
mysql-test/r/archive.result
View file @
c4a1c72b
drop table if exists t1,t2,t3;
drop table if exists t1,t2,t3
,t4,t5
;
CREATE TABLE t1 (
CREATE TABLE t1 (
Period smallint(4) unsigned zerofill DEFAULT '0000' NOT NULL,
Period smallint(4) unsigned zerofill DEFAULT '0000' NOT NULL,
Varor_period smallint(4) unsigned DEFAULT '0' NOT NULL
Varor_period smallint(4) unsigned DEFAULT '0' NOT NULL
...
...
mysql-test/t/archive.test
View file @
c4a1c72b
...
@@ -6,7 +6,7 @@
...
@@ -6,7 +6,7 @@
--
source
include
/
have_binlog_format_mixed_or_statement
.
inc
--
source
include
/
have_binlog_format_mixed_or_statement
.
inc
--
disable_warnings
--
disable_warnings
drop
table
if
exists
t1
,
t2
,
t3
;
drop
table
if
exists
t1
,
t2
,
t3
,
t4
,
t5
;
--
enable_warnings
--
enable_warnings
CREATE
TABLE
t1
(
CREATE
TABLE
t1
(
...
...
storage/archive/ha_archive.cc
View file @
c4a1c72b
...
@@ -146,6 +146,11 @@ static handler *archive_create_handler(handlerton *hton,
...
@@ -146,6 +146,11 @@ static handler *archive_create_handler(handlerton *hton,
*/
*/
#define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2
#define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2
/*
Size of header used for row
*/
#define ARCHIVE_ROW_HEADER_SIZE 4
static
handler
*
archive_create_handler
(
handlerton
*
hton
,
static
handler
*
archive_create_handler
(
handlerton
*
hton
,
TABLE_SHARE
*
table
,
TABLE_SHARE
*
table
,
MEM_ROOT
*
mem_root
)
MEM_ROOT
*
mem_root
)
...
@@ -248,6 +253,8 @@ int ha_archive::read_data_header(azio_stream *file_to_read)
...
@@ -248,6 +253,8 @@ int ha_archive::read_data_header(azio_stream *file_to_read)
DBUG_PRINT
(
"ha_archive::read_data_header"
,
(
"Check %u"
,
data_buffer
[
0
]));
DBUG_PRINT
(
"ha_archive::read_data_header"
,
(
"Check %u"
,
data_buffer
[
0
]));
DBUG_PRINT
(
"ha_archive::read_data_header"
,
(
"Version %u"
,
data_buffer
[
1
]));
DBUG_PRINT
(
"ha_archive::read_data_header"
,
(
"Version %u"
,
data_buffer
[
1
]));
share
->
data_version
=
(
uchar
)
data_buffer
[
1
];
if
((
data_buffer
[
0
]
!=
(
uchar
)
ARCHIVE_CHECK_HEADER
)
&&
if
((
data_buffer
[
0
]
!=
(
uchar
)
ARCHIVE_CHECK_HEADER
)
&&
(
data_buffer
[
1
]
!=
(
uchar
)
ARCHIVE_VERSION
))
(
data_buffer
[
1
]
!=
(
uchar
)
ARCHIVE_VERSION
))
...
@@ -283,6 +290,7 @@ int ha_archive::write_data_header(azio_stream *file_to_write)
...
@@ -283,6 +290,7 @@ int ha_archive::write_data_header(azio_stream *file_to_write)
*rows will contain the current number of rows in the data file upon success.
*rows will contain the current number of rows in the data file upon success.
*/
*/
int
ha_archive
::
read_meta_file
(
File
meta_file
,
ha_rows
*
rows
,
int
ha_archive
::
read_meta_file
(
File
meta_file
,
ha_rows
*
rows
,
uint
*
meta_version
,
ulonglong
*
auto_increment
,
ulonglong
*
auto_increment
,
ulonglong
*
forced_flushes
,
ulonglong
*
forced_flushes
,
char
*
real_path
)
char
*
real_path
)
...
@@ -326,6 +334,8 @@ int ha_archive::read_meta_file(File meta_file, ha_rows *rows,
...
@@ -326,6 +334,8 @@ int ha_archive::read_meta_file(File meta_file, ha_rows *rows,
DBUG_PRINT
(
"ha_archive::read_meta_file"
,
(
"Real Path %s"
,
real_path
));
DBUG_PRINT
(
"ha_archive::read_meta_file"
,
(
"Real Path %s"
,
real_path
));
DBUG_PRINT
(
"ha_archive::read_meta_file"
,
(
"Dirty %d"
,
(
int
)(
*
ptr
)));
DBUG_PRINT
(
"ha_archive::read_meta_file"
,
(
"Dirty %d"
,
(
int
)(
*
ptr
)));
*
meta_version
=
(
uchar
)
meta_buffer
[
1
];
if
((
meta_buffer
[
0
]
!=
(
uchar
)
ARCHIVE_CHECK_HEADER
)
||
if
((
meta_buffer
[
0
]
!=
(
uchar
)
ARCHIVE_CHECK_HEADER
)
||
((
bool
)(
*
ptr
)
==
TRUE
))
((
bool
)(
*
ptr
)
==
TRUE
))
DBUG_RETURN
(
HA_ERR_CRASHED_ON_USAGE
);
DBUG_RETURN
(
HA_ERR_CRASHED_ON_USAGE
);
...
@@ -446,7 +456,7 @@ ARCHIVE_SHARE *ha_archive::get_share(const char *table_name,
...
@@ -446,7 +456,7 @@ ARCHIVE_SHARE *ha_archive::get_share(const char *table_name,
VOID
(
pthread_mutex_init
(
&
share
->
mutex
,
MY_MUTEX_INIT_FAST
));
VOID
(
pthread_mutex_init
(
&
share
->
mutex
,
MY_MUTEX_INIT_FAST
));
if
((
share
->
meta_file
=
my_open
(
meta_file_name
,
O_RDWR
,
MYF
(
0
)))
==
-
1
)
if
((
share
->
meta_file
=
my_open
(
meta_file_name
,
O_RDWR
,
MYF
(
0
)))
==
-
1
)
share
->
crashed
=
TRUE
;
share
->
crashed
=
TRUE
;
DBUG_PRINT
(
"
info
"
,
(
"archive opening (1) up write at %s"
,
DBUG_PRINT
(
"
ha_archive
"
,
(
"archive opening (1) up write at %s"
,
share
->
data_file_name
));
share
->
data_file_name
));
/*
/*
...
@@ -454,6 +464,7 @@ ARCHIVE_SHARE *ha_archive::get_share(const char *table_name,
...
@@ -454,6 +464,7 @@ ARCHIVE_SHARE *ha_archive::get_share(const char *table_name,
a write.
a write.
*/
*/
if
(
read_meta_file
(
share
->
meta_file
,
&
share
->
rows_recorded
,
if
(
read_meta_file
(
share
->
meta_file
,
&
share
->
rows_recorded
,
&
share
->
meta_version
,
&
share
->
auto_increment_value
,
&
share
->
auto_increment_value
,
&
share
->
forced_flushes
,
&
share
->
forced_flushes
,
share
->
real_path
))
share
->
real_path
))
...
@@ -468,7 +479,7 @@ ARCHIVE_SHARE *ha_archive::get_share(const char *table_name,
...
@@ -468,7 +479,7 @@ ARCHIVE_SHARE *ha_archive::get_share(const char *table_name,
thr_lock_init
(
&
share
->
lock
);
thr_lock_init
(
&
share
->
lock
);
}
}
share
->
use_count
++
;
share
->
use_count
++
;
DBUG_PRINT
(
"
info
"
,
(
"archive table %.*s has %d open handles now"
,
DBUG_PRINT
(
"
ha_archive
"
,
(
"archive table %.*s has %d open handles now"
,
share
->
table_name_length
,
share
->
table_name
,
share
->
table_name_length
,
share
->
table_name
,
share
->
use_count
));
share
->
use_count
));
if
(
share
->
crashed
)
if
(
share
->
crashed
)
...
@@ -487,7 +498,7 @@ int ha_archive::free_share(ARCHIVE_SHARE *share)
...
@@ -487,7 +498,7 @@ int ha_archive::free_share(ARCHIVE_SHARE *share)
{
{
int
rc
=
0
;
int
rc
=
0
;
DBUG_ENTER
(
"ha_archive::free_share"
);
DBUG_ENTER
(
"ha_archive::free_share"
);
DBUG_PRINT
(
"
info
"
,
(
"archive table %.*s has %d open handles on entrance"
,
DBUG_PRINT
(
"
ha_archive
"
,
(
"archive table %.*s has %d open handles on entrance"
,
share
->
table_name_length
,
share
->
table_name
,
share
->
table_name_length
,
share
->
table_name
,
share
->
use_count
));
share
->
use_count
));
...
@@ -539,7 +550,7 @@ int ha_archive::init_archive_writer()
...
@@ -539,7 +550,7 @@ int ha_archive::init_archive_writer()
if
(
!
(
azopen
(
&
(
share
->
archive_write
),
share
->
data_file_name
,
if
(
!
(
azopen
(
&
(
share
->
archive_write
),
share
->
data_file_name
,
O_WRONLY
|
O_APPEND
|
O_BINARY
)))
O_WRONLY
|
O_APPEND
|
O_BINARY
)))
{
{
DBUG_PRINT
(
"
info
"
,
(
"Could not open archive write file"
));
DBUG_PRINT
(
"
ha_archive
"
,
(
"Could not open archive write file"
));
share
->
crashed
=
TRUE
;
share
->
crashed
=
TRUE
;
DBUG_RETURN
(
1
);
DBUG_RETURN
(
1
);
}
}
...
@@ -575,7 +586,7 @@ int ha_archive::open(const char *name, int mode, uint open_options)
...
@@ -575,7 +586,7 @@ int ha_archive::open(const char *name, int mode, uint open_options)
int
rc
=
0
;
int
rc
=
0
;
DBUG_ENTER
(
"ha_archive::open"
);
DBUG_ENTER
(
"ha_archive::open"
);
DBUG_PRINT
(
"
info
"
,
(
"archive table was opened for crash: %s"
,
DBUG_PRINT
(
"
ha_archive
"
,
(
"archive table was opened for crash: %s"
,
(
open_options
&
HA_OPEN_FOR_REPAIR
)
?
"yes"
:
"no"
));
(
open_options
&
HA_OPEN_FOR_REPAIR
)
?
"yes"
:
"no"
));
share
=
get_share
(
name
,
table
,
&
rc
);
share
=
get_share
(
name
,
table
,
&
rc
);
...
@@ -589,9 +600,17 @@ int ha_archive::open(const char *name, int mode, uint open_options)
...
@@ -589,9 +600,17 @@ int ha_archive::open(const char *name, int mode, uint open_options)
DBUG_RETURN
(
rc
);
DBUG_RETURN
(
rc
);
}
}
thr_lock_data_init
(
&
share
->
lock
,
&
lock
,
NULL
);
record_buffer
=
create_record_buffer
(
table
->
s
->
reclength
);
if
(
!
record_buffer
)
{
free_share
(
share
);
DBUG_RETURN
(
HA_ERR_OUT_OF_MEM
);
}
thr_lock_data_init
(
&
share
->
lock
,
&
lock
,
NULL
);
DBUG_PRINT
(
"
info
"
,
(
"archive data_file_name %s"
,
share
->
data_file_name
));
DBUG_PRINT
(
"
ha_archive
"
,
(
"archive data_file_name %s"
,
share
->
data_file_name
));
if
(
!
(
azopen
(
&
archive
,
share
->
data_file_name
,
O_RDONLY
|
O_BINARY
)))
if
(
!
(
azopen
(
&
archive
,
share
->
data_file_name
,
O_RDONLY
|
O_BINARY
)))
{
{
if
(
errno
==
EROFS
||
errno
==
EACCES
)
if
(
errno
==
EROFS
||
errno
==
EACCES
)
...
@@ -599,7 +618,7 @@ int ha_archive::open(const char *name, int mode, uint open_options)
...
@@ -599,7 +618,7 @@ int ha_archive::open(const char *name, int mode, uint open_options)
DBUG_RETURN
(
HA_ERR_CRASHED_ON_USAGE
);
DBUG_RETURN
(
HA_ERR_CRASHED_ON_USAGE
);
}
}
DBUG_PRINT
(
"
info
"
,
(
"archive table was crashed %s"
,
DBUG_PRINT
(
"
ha_archive
"
,
(
"archive table was crashed %s"
,
rc
==
HA_ERR_CRASHED_ON_USAGE
?
"yes"
:
"no"
));
rc
==
HA_ERR_CRASHED_ON_USAGE
?
"yes"
:
"no"
));
if
(
rc
==
HA_ERR_CRASHED_ON_USAGE
&&
open_options
&
HA_OPEN_FOR_REPAIR
)
if
(
rc
==
HA_ERR_CRASHED_ON_USAGE
&&
open_options
&
HA_OPEN_FOR_REPAIR
)
{
{
...
@@ -632,6 +651,8 @@ int ha_archive::close(void)
...
@@ -632,6 +651,8 @@ int ha_archive::close(void)
int
rc
=
0
;
int
rc
=
0
;
DBUG_ENTER
(
"ha_archive::close"
);
DBUG_ENTER
(
"ha_archive::close"
);
destroy_record_buffer
(
record_buffer
);
/* First close stream */
/* First close stream */
if
(
azclose
(
&
archive
))
if
(
azclose
(
&
archive
))
rc
=
1
;
rc
=
1
;
...
@@ -676,7 +697,7 @@ int ha_archive::create(const char *name, TABLE *table_arg,
...
@@ -676,7 +697,7 @@ int ha_archive::create(const char *name, TABLE *table_arg,
if
(
!
(
field
->
flags
&
AUTO_INCREMENT_FLAG
))
if
(
!
(
field
->
flags
&
AUTO_INCREMENT_FLAG
))
{
{
error
=
-
1
;
error
=
-
1
;
DBUG_PRINT
(
"
info
"
,
(
"Index error in creating archive table"
));
DBUG_PRINT
(
"
ha_archive
"
,
(
"Index error in creating archive table"
));
goto
error
;
goto
error
;
}
}
}
}
...
@@ -701,7 +722,7 @@ int ha_archive::create(const char *name, TABLE *table_arg,
...
@@ -701,7 +722,7 @@ int ha_archive::create(const char *name, TABLE *table_arg,
if
(
create_info
->
data_file_name
)
if
(
create_info
->
data_file_name
)
{
{
char
linkname
[
FN_REFLEN
];
char
linkname
[
FN_REFLEN
];
DBUG_PRINT
(
"
info
"
,
(
"archive will create stream file %s"
,
DBUG_PRINT
(
"
ha_archive
"
,
(
"archive will create stream file %s"
,
create_info
->
data_file_name
));
create_info
->
data_file_name
));
fn_format
(
name_buff
,
create_info
->
data_file_name
,
""
,
ARZ
,
fn_format
(
name_buff
,
create_info
->
data_file_name
,
""
,
ARZ
,
...
@@ -762,37 +783,74 @@ int ha_archive::real_write_row(byte *buf, azio_stream *writer)
...
@@ -762,37 +783,74 @@ int ha_archive::real_write_row(byte *buf, azio_stream *writer)
{
{
my_off_t
written
;
my_off_t
written
;
uint
*
ptr
,
*
end
;
uint
*
ptr
,
*
end
;
int
r_pack_length
;
byte
size_buffer
[
ARCHIVE_ROW_HEADER_SIZE
];
// Longest possible row length with blobs
DBUG_ENTER
(
"ha_archive::real_write_row"
);
DBUG_ENTER
(
"ha_archive::real_write_row"
);
written
=
azwrite
(
writer
,
buf
,
table
->
s
->
reclength
);
// We pack the row for writing
r_pack_length
=
pack_row
(
buf
);
DBUG_PRINT
(
"ha_archive"
,(
"Pack row length %d"
,
r_pack_length
));
// Store the size of the row before the row
bzero
(
size_buffer
,
ARCHIVE_ROW_HEADER_SIZE
);
int4store
(
size_buffer
,
(
int
)
r_pack_length
);
DBUG_PRINT
(
"ha_archive"
,(
"Pack %d %d %d %d"
,
size_buffer
[
0
],
size_buffer
[
1
],
size_buffer
[
2
],
size_buffer
[
3
]));
azwrite
(
writer
,
size_buffer
,
ARCHIVE_ROW_HEADER_SIZE
);
written
=
azwrite
(
writer
,
record_buffer
->
buffer
,
r_pack_length
);
DBUG_PRINT
(
"ha_archive::real_write_row"
,
(
"Wrote %d bytes expected %d"
,
DBUG_PRINT
(
"ha_archive::real_write_row"
,
(
"Wrote %d bytes expected %d"
,
(
uint32
)
written
,
(
uint32
)
written
,
(
uint32
)
table
->
s
->
rec
length
));
(
uint32
)
r_pack_
length
));
if
(
!
delayed_insert
||
!
bulk_insert
)
if
(
!
delayed_insert
||
!
bulk_insert
)
share
->
dirty
=
TRUE
;
share
->
dirty
=
TRUE
;
if
(
written
!=
(
my_off_t
)
table
->
s
->
rec
length
)
if
(
written
!=
(
my_off_t
)
r_pack_
length
)
DBUG_RETURN
(
errno
?
errno
:
-
1
);
DBUG_RETURN
(
errno
?
errno
:
-
1
);
/*
We should probably mark the table as damagaged if the record is written
DBUG_RETURN
(
0
);
but the blob fails.
}
*/
for
(
ptr
=
table
->
s
->
blob_field
,
end
=
ptr
+
table
->
s
->
blob_fields
;
/* Calculate max length needed for row */
int
ha_archive
::
max_row_length
(
const
byte
*
buf
)
{
ulonglong
length
=
table
->
s
->
reclength
+
table
->
s
->
fields
*
2
;
uint
*
ptr
,
*
end
;
for
(
ptr
=
table
->
s
->
blob_field
,
end
=
ptr
+
table
->
s
->
blob_fields
;
ptr
!=
end
;
ptr
!=
end
;
ptr
++
)
ptr
++
)
{
{
char
*
data_ptr
;
Field_blob
*
blob
=
((
Field_blob
*
)
table
->
field
[
*
ptr
]);
uint32
size
=
((
Field_blob
*
)
table
->
field
[
*
ptr
])
->
get_length
();
length
+=
blob
->
get_length
((
char
*
)
buf
+
blob
->
offset
())
+
2
;
}
if
(
size
)
return
length
;
{
}
((
Field_blob
*
)
table
->
field
[
*
ptr
])
->
get_ptr
(
&
data_ptr
);
written
=
azwrite
(
writer
,
data_ptr
,
(
unsigned
)
size
);
if
(
written
!=
(
my_off_t
)
size
)
int
ha_archive
::
pack_row
(
const
byte
*
record
)
DBUG_RETURN
(
errno
?
errno
:
-
1
);
{
}
byte
*
ptr
;
DBUG_ENTER
(
"ha_archive::pack_row"
);
if
(
table
->
s
->
blob_fields
)
{
if
(
fix_rec_buff
(
max_row_length
(
record
)))
DBUG_RETURN
(
HA_ERR_OUT_OF_MEM
);
/* purecov: inspected */
}
}
DBUG_RETURN
(
0
);
/* Copy null bits */
memcpy
(
record_buffer
->
buffer
,
record
,
table
->
s
->
null_bytes
);
ptr
=
record_buffer
->
buffer
+
table
->
s
->
null_bytes
;
for
(
Field
**
field
=
table
->
field
;
*
field
;
field
++
)
ptr
=
(
byte
*
)
(
*
field
)
->
pack
((
char
*
)
ptr
,
(
char
*
)
record
+
(
*
field
)
->
offset
());
DBUG_RETURN
((
size_t
)
(
ptr
-
record_buffer
->
buffer
));
}
}
...
@@ -809,7 +867,9 @@ int ha_archive::write_row(byte *buf)
...
@@ -809,7 +867,9 @@ int ha_archive::write_row(byte *buf)
{
{
int
rc
;
int
rc
;
byte
*
read_buf
=
NULL
;
byte
*
read_buf
=
NULL
;
byte
*
ptr
;
ulonglong
temp_auto
;
ulonglong
temp_auto
;
DBUG_ENTER
(
"ha_archive::write_row"
);
DBUG_ENTER
(
"ha_archive::write_row"
);
if
(
share
->
crashed
)
if
(
share
->
crashed
)
...
@@ -866,12 +926,6 @@ int ha_archive::write_row(byte *buf)
...
@@ -866,12 +926,6 @@ int ha_archive::write_row(byte *buf)
goto
error
;
goto
error
;
}
}
/*
Now we read and check all of the rows.
if (!memcmp(table->next_number_field->ptr, mfield->ptr, mfield->max_length()))
if ((longlong)temp_auto ==
mfield->val_int((char*)(read_buf + mfield->offset())))
*/
Field
*
mfield
=
table
->
next_number_field
;
Field
*
mfield
=
table
->
next_number_field
;
while
(
!
(
get_row
(
&
archive
,
read_buf
)))
while
(
!
(
get_row
(
&
archive
,
read_buf
)))
...
@@ -899,37 +953,8 @@ int ha_archive::write_row(byte *buf)
...
@@ -899,37 +953,8 @@ int ha_archive::write_row(byte *buf)
if
(
init_archive_writer
())
if
(
init_archive_writer
())
DBUG_RETURN
(
HA_ERR_CRASHED_ON_USAGE
);
DBUG_RETURN
(
HA_ERR_CRASHED_ON_USAGE
);
/*
Varchar structures are constant in size but are not cleaned up request
to request. The following sets all unused space to null to improve
compression.
*/
for
(
Field
**
field
=
table
->
field
;
*
field
;
field
++
)
{
/*
Pack length will report 256 when you have 255 bytes
of data plus the single byte for length.
Probably could have added a method to say the number
of bytes taken up by field for the length data.
*/
uint32
actual_length
=
(
*
field
)
->
data_length
()
+
((
*
field
)
->
pack_length
()
>
256
?
2
:
1
);
if
((
*
field
)
->
real_type
()
==
MYSQL_TYPE_VARCHAR
)
{
char
*
ptr
=
(
*
field
)
->
ptr
+
actual_length
;
DBUG_ASSERT
(
actual_length
<=
(
*
field
)
->
pack_length
());
uint32
to_free
=
(
*
field
)
->
pack_length
()
-
actual_length
;
if
(
to_free
>
0
)
bzero
(
ptr
,
to_free
);
}
}
share
->
rows_recorded
++
;
share
->
rows_recorded
++
;
rc
=
real_write_row
(
buf
,
&
(
share
->
archive_write
));
rc
=
real_write_row
(
buf
,
&
(
share
->
archive_write
));
error:
error:
pthread_mutex_unlock
(
&
share
->
mutex
);
pthread_mutex_unlock
(
&
share
->
mutex
);
if
(
read_buf
)
if
(
read_buf
)
...
@@ -1054,7 +1079,7 @@ int ha_archive::rnd_init(bool scan)
...
@@ -1054,7 +1079,7 @@ int ha_archive::rnd_init(bool scan)
if
(
scan
)
if
(
scan
)
{
{
scan_rows
=
share
->
rows_recorded
;
scan_rows
=
share
->
rows_recorded
;
DBUG_PRINT
(
"
info
"
,
(
"archive will retrieve %llu rows"
,
DBUG_PRINT
(
"
ha_archive
"
,
(
"archive will retrieve %llu rows"
,
(
unsigned
long
long
)
scan_rows
));
(
unsigned
long
long
)
scan_rows
));
stats
.
records
=
0
;
stats
.
records
=
0
;
...
@@ -1067,7 +1092,7 @@ int ha_archive::rnd_init(bool scan)
...
@@ -1067,7 +1092,7 @@ int ha_archive::rnd_init(bool scan)
pthread_mutex_lock
(
&
share
->
mutex
);
pthread_mutex_lock
(
&
share
->
mutex
);
if
(
share
->
dirty
==
TRUE
)
if
(
share
->
dirty
==
TRUE
)
{
{
DBUG_PRINT
(
"
info
"
,
(
"archive flushing out rows for scan"
));
DBUG_PRINT
(
"
ha_archive
"
,
(
"archive flushing out rows for scan"
));
azflush
(
&
(
share
->
archive_write
),
Z_SYNC_FLUSH
);
azflush
(
&
(
share
->
archive_write
),
Z_SYNC_FLUSH
);
share
->
forced_flushes
++
;
share
->
forced_flushes
++
;
share
->
dirty
=
FALSE
;
share
->
dirty
=
FALSE
;
...
@@ -1088,16 +1113,90 @@ int ha_archive::rnd_init(bool scan)
...
@@ -1088,16 +1113,90 @@ int ha_archive::rnd_init(bool scan)
positioned where you want it.
positioned where you want it.
*/
*/
int
ha_archive
::
get_row
(
azio_stream
*
file_to_read
,
byte
*
buf
)
int
ha_archive
::
get_row
(
azio_stream
*
file_to_read
,
byte
*
buf
)
{
int
rc
;
DBUG_ENTER
(
"ha_archive::get_row"
);
if
(
share
->
data_version
==
ARCHIVE_VERSION
)
rc
=
get_row_version3
(
file_to_read
,
buf
);
else
rc
=
get_row_version2
(
file_to_read
,
buf
);
DBUG_PRINT
(
"ha_archive"
,
(
"Return %d
\n
"
,
rc
));
DBUG_RETURN
(
rc
);
}
/* Reallocate buffer if needed */
bool
ha_archive
::
fix_rec_buff
(
int
length
)
{
if
(
!
record_buffer
->
buffer
||
length
>
record_buffer
->
length
)
{
byte
*
newptr
;
if
(
!
(
newptr
=
(
byte
*
)
my_realloc
((
gptr
)
record_buffer
->
buffer
,
length
,
MYF
(
MY_ALLOW_ZERO_PTR
))))
return
1
;
/* purecov: inspected */
record_buffer
->
buffer
=
newptr
;
record_buffer
->
length
=
length
;
}
return
0
;
}
int
ha_archive
::
unpack_row
(
azio_stream
*
file_to_read
,
char
*
record
)
{
DBUG_ENTER
(
"ha_archive::unpack_row"
);
int
read
;
// Bytes read, azread() returns int
byte
size_buffer
[
ARCHIVE_ROW_HEADER_SIZE
];
int
row_len
;
/* First we grab the length stored */
read
=
azread
(
file_to_read
,
(
byte
*
)
size_buffer
,
ARCHIVE_ROW_HEADER_SIZE
);
if
(
read
==
Z_STREAM_ERROR
)
DBUG_RETURN
(
HA_ERR_CRASHED_ON_USAGE
);
/* If we read nothing we are at the end of the file */
if
(
read
==
0
||
read
!=
ARCHIVE_ROW_HEADER_SIZE
)
DBUG_RETURN
(
HA_ERR_END_OF_FILE
);
row_len
=
sint4korr
(
size_buffer
);
DBUG_PRINT
(
"ha_archive"
,(
"Unpack row length %d -> %llu"
,
row_len
,
(
unsigned
long
long
)
table
->
s
->
reclength
));
fix_rec_buff
(
row_len
);
if
(
azread
(
file_to_read
,
record_buffer
->
buffer
,
row_len
)
!=
row_len
)
DBUG_RETURN
(
-
1
);
/* Copy null bits */
const
char
*
ptr
=
(
const
char
*
)
record_buffer
->
buffer
;
memcpy
(
record
,
ptr
,
table
->
s
->
null_bytes
);
ptr
+=
table
->
s
->
null_bytes
;
for
(
Field
**
field
=
table
->
field
;
*
field
;
field
++
)
ptr
=
(
*
field
)
->
unpack
(
record
+
(
*
field
)
->
offset
(),
ptr
);
DBUG_RETURN
(
0
);
}
int
ha_archive
::
get_row_version3
(
azio_stream
*
file_to_read
,
byte
*
buf
)
{
DBUG_ENTER
(
"ha_archive::get_row_version3"
);
int
returnable
=
unpack_row
(
file_to_read
,
buf
);
DBUG_RETURN
(
returnable
);
}
int
ha_archive
::
get_row_version2
(
azio_stream
*
file_to_read
,
byte
*
buf
)
{
{
int
read
;
// Bytes read, azread() returns int
int
read
;
// Bytes read, azread() returns int
uint
*
ptr
,
*
end
;
uint
*
ptr
,
*
end
;
char
*
last
;
char
*
last
;
size_t
total_blob_length
=
0
;
size_t
total_blob_length
=
0
;
MY_BITMAP
*
read_set
=
table
->
read_set
;
MY_BITMAP
*
read_set
=
table
->
read_set
;
DBUG_ENTER
(
"ha_archive::get_row"
);
DBUG_ENTER
(
"ha_archive::get_row
_version2
"
);
read
=
azread
(
file_to_read
,
buf
,
table
->
s
->
reclength
);
read
=
azread
(
file_to_read
,
buf
,
table
->
s
->
reclength
);
DBUG_PRINT
(
"ha_archive::get_row"
,
(
"Read %d bytes expected %lu"
,
read
,
DBUG_PRINT
(
"ha_archive::get_row
_version2
"
,
(
"Read %d bytes expected %lu"
,
read
,
(
unsigned
long
)
table
->
s
->
reclength
));
(
unsigned
long
)
table
->
s
->
reclength
));
if
(
read
==
Z_STREAM_ERROR
)
if
(
read
==
Z_STREAM_ERROR
)
...
@@ -1266,8 +1365,11 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
...
@@ -1266,8 +1365,11 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
if
(
check_opt
->
flags
==
T_EXTEND
)
if
(
check_opt
->
flags
==
T_EXTEND
)
{
{
DBUG_PRINT
(
"
info
"
,
(
"archive extended rebuild"
));
DBUG_PRINT
(
"
ha_archive
"
,
(
"archive extended rebuild"
));
byte
*
buf
;
byte
*
buf
;
archive_record_buffer
*
write_buffer
,
*
read_buffer
,
*
original_buffer
;
original_buffer
=
record_buffer
;
/*
/*
First we create a buffer that we can use for reading rows, and can pass
First we create a buffer that we can use for reading rows, and can pass
...
@@ -1279,6 +1381,15 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
...
@@ -1279,6 +1381,15 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
goto
error
;
goto
error
;
}
}
read_buffer
=
create_record_buffer
(
record_buffer
->
length
);
write_buffer
=
create_record_buffer
(
record_buffer
->
length
);
if
(
!
write_buffer
||
!
read_buffer
)
{
rc
=
HA_ERR_OUT_OF_MEM
;
goto
error
;
}
/*
/*
Now we will rewind the archive file so that we are positioned at the
Now we will rewind the archive file so that we are positioned at the
start of the file.
start of the file.
...
@@ -1300,8 +1411,11 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
...
@@ -1300,8 +1411,11 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
{
{
share
->
rows_recorded
=
0
;
share
->
rows_recorded
=
0
;
stats
.
auto_increment_value
=
share
->
auto_increment_value
=
0
;
stats
.
auto_increment_value
=
share
->
auto_increment_value
=
0
;
record_buffer
=
read_buffer
;
while
(
!
(
rc
=
get_row
(
&
archive
,
buf
)))
while
(
!
(
rc
=
get_row
(
&
archive
,
buf
)))
{
{
record_buffer
=
write_buffer
;
real_write_row
(
buf
,
&
writer
);
real_write_row
(
buf
,
&
writer
);
if
(
table
->
found_next_number_field
)
if
(
table
->
found_next_number_field
)
{
{
...
@@ -1313,18 +1427,24 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
...
@@ -1313,18 +1427,24 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
auto_value
;
auto_value
;
}
}
share
->
rows_recorded
++
;
share
->
rows_recorded
++
;
record_buffer
=
read_buffer
;
}
}
}
}
DBUG_PRINT
(
"info"
,
(
"recovered %llu archive rows"
,
DBUG_PRINT
(
"ha_archive"
,
(
"recovered %llu archive rows"
,
(
unsigned
long
long
)
share
->
rows_recorded
));
(
unsigned
long
long
)
share
->
rows_recorded
));
record_buffer
=
original_buffer
;
destroy_record_buffer
(
read_buffer
);
destroy_record_buffer
(
write_buffer
);
my_free
((
char
*
)
buf
,
MYF
(
0
));
my_free
((
char
*
)
buf
,
MYF
(
0
));
if
(
rc
&&
rc
!=
HA_ERR_END_OF_FILE
)
if
(
rc
&&
rc
!=
HA_ERR_END_OF_FILE
)
goto
error
;
goto
error
;
}
}
else
else
{
{
DBUG_PRINT
(
"
info
"
,
(
"archive quick rebuild"
));
DBUG_PRINT
(
"
ha_archive
"
,
(
"archive quick rebuild"
));
/*
/*
The quick method is to just read the data raw, and then compress it directly.
The quick method is to just read the data raw, and then compress it directly.
*/
*/
...
@@ -1333,7 +1453,7 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
...
@@ -1333,7 +1453,7 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
if
(
azrewind
(
&
archive
)
==
-
1
)
if
(
azrewind
(
&
archive
)
==
-
1
)
{
{
rc
=
HA_ERR_CRASHED_ON_USAGE
;
rc
=
HA_ERR_CRASHED_ON_USAGE
;
DBUG_PRINT
(
"
info
"
,
(
"archive HA_ERR_CRASHED_ON_USAGE"
));
DBUG_PRINT
(
"
ha_archive
"
,
(
"archive HA_ERR_CRASHED_ON_USAGE"
));
goto
error
;
goto
error
;
}
}
...
@@ -1359,12 +1479,12 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
...
@@ -1359,12 +1479,12 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
know it failed.
know it failed.
We also need to reopen our read descriptor since it has changed.
We also need to reopen our read descriptor since it has changed.
*/
*/
DBUG_PRINT
(
"
info
"
,
(
"Reopening archive data file"
));
DBUG_PRINT
(
"
ha_archive
"
,
(
"Reopening archive data file"
));
if
(
!
azopen
(
&
(
share
->
archive_write
),
share
->
data_file_name
,
if
(
!
azopen
(
&
(
share
->
archive_write
),
share
->
data_file_name
,
O_WRONLY
|
O_APPEND
|
O_BINARY
)
||
O_WRONLY
|
O_APPEND
|
O_BINARY
)
||
!
azopen
(
&
archive
,
share
->
data_file_name
,
O_RDONLY
|
O_BINARY
))
!
azopen
(
&
archive
,
share
->
data_file_name
,
O_RDONLY
|
O_BINARY
))
{
{
DBUG_PRINT
(
"
info
"
,
(
"Could not open archive write file"
));
DBUG_PRINT
(
"
ha_archive
"
,
(
"Could not open archive write file"
));
rc
=
HA_ERR_CRASHED_ON_USAGE
;
rc
=
HA_ERR_CRASHED_ON_USAGE
;
}
}
...
@@ -1577,6 +1697,36 @@ bool ha_archive::check_and_repair(THD *thd)
...
@@ -1577,6 +1697,36 @@ bool ha_archive::check_and_repair(THD *thd)
DBUG_RETURN
(
repair
(
thd
,
&
check_opt
));
DBUG_RETURN
(
repair
(
thd
,
&
check_opt
));
}
}
archive_record_buffer
*
ha_archive
::
create_record_buffer
(
ulonglong
length
)
{
DBUG_ENTER
(
"ha_archive::create_record_buffer"
);
archive_record_buffer
*
r
;
if
(
!
(
r
=
(
archive_record_buffer
*
)
my_malloc
(
sizeof
(
archive_record_buffer
),
MYF
(
MY_WME
))))
{
DBUG_RETURN
(
NULL
);
/* purecov: inspected */
}
r
->
length
=
(
int
)
length
;
if
(
!
(
r
->
buffer
=
(
byte
*
)
my_malloc
(
r
->
length
,
MYF
(
MY_WME
))))
{
my_free
((
char
*
)
r
,
MYF
(
MY_ALLOW_ZERO_PTR
));
DBUG_RETURN
(
NULL
);
/* purecov: inspected */
}
DBUG_RETURN
(
r
);
}
void
ha_archive
::
destroy_record_buffer
(
archive_record_buffer
*
r
)
{
DBUG_ENTER
(
"ha_archive::destroy_record_buffer"
);
my_free
((
char
*
)
r
->
buffer
,
MYF
(
MY_ALLOW_ZERO_PTR
));
my_free
((
char
*
)
r
,
MYF
(
MY_ALLOW_ZERO_PTR
));
DBUG_VOID_RETURN
;
}
struct
st_mysql_storage_engine
archive_storage_engine
=
struct
st_mysql_storage_engine
archive_storage_engine
=
{
MYSQL_HANDLERTON_INTERFACE_VERSION
};
{
MYSQL_HANDLERTON_INTERFACE_VERSION
};
...
@@ -1590,7 +1740,7 @@ mysql_declare_plugin(archive)
...
@@ -1590,7 +1740,7 @@ mysql_declare_plugin(archive)
PLUGIN_LICENSE_GPL
,
PLUGIN_LICENSE_GPL
,
archive_db_init
,
/* Plugin Init */
archive_db_init
,
/* Plugin Init */
archive_db_done
,
/* Plugin Deinit */
archive_db_done
,
/* Plugin Deinit */
0x0
1
00
/* 1.0 */
,
0x0
3
00
/* 1.0 */
,
NULL
,
/* status variables */
NULL
,
/* status variables */
NULL
,
/* system variables */
NULL
,
/* system variables */
NULL
/* config options */
NULL
/* config options */
...
...
storage/archive/ha_archive.h
View file @
c4a1c72b
...
@@ -27,6 +27,12 @@
...
@@ -27,6 +27,12 @@
ha_example.h.
ha_example.h.
*/
*/
typedef
struct
st_archive_record_buffer
{
byte
*
buffer
;
int
length
;
}
archive_record_buffer
;
typedef
struct
st_archive_share
{
typedef
struct
st_archive_share
{
char
*
table_name
;
char
*
table_name
;
char
data_file_name
[
FN_REFLEN
];
char
data_file_name
[
FN_REFLEN
];
...
@@ -43,18 +49,23 @@ typedef struct st_archive_share {
...
@@ -43,18 +49,23 @@ typedef struct st_archive_share {
ulonglong
forced_flushes
;
ulonglong
forced_flushes
;
ulonglong
mean_rec_length
;
ulonglong
mean_rec_length
;
char
real_path
[
FN_REFLEN
];
char
real_path
[
FN_REFLEN
];
uint
meta_version
;
uint
data_version
;
}
ARCHIVE_SHARE
;
}
ARCHIVE_SHARE
;
/*
/*
Version for file format.
Version for file format.
1 - Initial Version
1 - Initial Version (Never Released)
2 - Stream Compression, seperate blobs, no packing
3 - One steam (row and blobs), with packing
*/
*/
#define ARCHIVE_VERSION
2
#define ARCHIVE_VERSION
3
class
ha_archive
:
public
handler
class
ha_archive
:
public
handler
{
{
THR_LOCK_DATA
lock
;
/* MySQL lock */
THR_LOCK_DATA
lock
;
/* MySQL lock */
ARCHIVE_SHARE
*
share
;
/* Shared lock info */
ARCHIVE_SHARE
*
share
;
/* Shared lock info */
azio_stream
archive
;
/* Archive file we are working with */
azio_stream
archive
;
/* Archive file we are working with */
my_off_t
current_position
;
/* The position of the row we just read */
my_off_t
current_position
;
/* The position of the row we just read */
byte
byte_buffer
[
IO_SIZE
];
/* Initial buffer for our string */
byte
byte_buffer
[
IO_SIZE
];
/* Initial buffer for our string */
...
@@ -65,6 +76,10 @@ class ha_archive: public handler
...
@@ -65,6 +76,10 @@ class ha_archive: public handler
const
byte
*
current_key
;
const
byte
*
current_key
;
uint
current_key_len
;
uint
current_key_len
;
uint
current_k_offset
;
uint
current_k_offset
;
archive_record_buffer
*
record_buffer
;
archive_record_buffer
*
create_record_buffer
(
ulonglong
length
);
void
destroy_record_buffer
(
archive_record_buffer
*
r
);
public:
public:
ha_archive
(
handlerton
*
hton
,
TABLE_SHARE
*
table_arg
);
ha_archive
(
handlerton
*
hton
,
TABLE_SHARE
*
table_arg
);
...
@@ -105,7 +120,10 @@ class ha_archive: public handler
...
@@ -105,7 +120,10 @@ class ha_archive: public handler
int
rnd_next
(
byte
*
buf
);
int
rnd_next
(
byte
*
buf
);
int
rnd_pos
(
byte
*
buf
,
byte
*
pos
);
int
rnd_pos
(
byte
*
buf
,
byte
*
pos
);
int
get_row
(
azio_stream
*
file_to_read
,
byte
*
buf
);
int
get_row
(
azio_stream
*
file_to_read
,
byte
*
buf
);
int
get_row_version2
(
azio_stream
*
file_to_read
,
byte
*
buf
);
int
get_row_version3
(
azio_stream
*
file_to_read
,
byte
*
buf
);
int
read_meta_file
(
File
meta_file
,
ha_rows
*
rows
,
int
read_meta_file
(
File
meta_file
,
ha_rows
*
rows
,
uint
*
meta_version
,
ulonglong
*
auto_increment
,
ulonglong
*
auto_increment
,
ulonglong
*
forced_flushes
,
ulonglong
*
forced_flushes
,
char
*
real_path
);
char
*
real_path
);
...
@@ -137,5 +155,9 @@ class ha_archive: public handler
...
@@ -137,5 +155,9 @@ class ha_archive: public handler
bool
is_crashed
()
const
;
bool
is_crashed
()
const
;
int
check
(
THD
*
thd
,
HA_CHECK_OPT
*
check_opt
);
int
check
(
THD
*
thd
,
HA_CHECK_OPT
*
check_opt
);
bool
check_and_repair
(
THD
*
thd
);
bool
check_and_repair
(
THD
*
thd
);
int
max_row_length
(
const
byte
*
buf
);
bool
fix_rec_buff
(
int
length
);
int
unpack_row
(
azio_stream
*
file_to_read
,
char
*
record
);
int
pack_row
(
const
byte
*
record
);
};
};
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment