Commit 48e4c8af authored by Vlad Lesin's avatar Vlad Lesin

MENT-344: Store Backups in S3

Mtr test is required, but there must be ability to remove backup from S3
to test it with MTR.
parent 3616175f
...@@ -54,7 +54,8 @@ ENDIF() ...@@ -54,7 +54,8 @@ ENDIF()
ADD_DEFINITIONS(-DPCRE_STATIC=1) ADD_DEFINITIONS(-DPCRE_STATIC=1)
ADD_DEFINITIONS(${SSL_DEFINES}) ADD_DEFINITIONS(${SSL_DEFINES})
MYSQL_ADD_EXECUTABLE(mariabackup
SET (MARIABACKUP_SOURCES
xtrabackup.cc xtrabackup.cc
innobackupex.cc innobackupex.cc
changed_page_bitmap.cc changed_page_bitmap.cc
...@@ -78,10 +79,34 @@ MYSQL_ADD_EXECUTABLE(mariabackup ...@@ -78,10 +79,34 @@ MYSQL_ADD_EXECUTABLE(mariabackup
${PROJECT_SOURCE_DIR}/sql/net_serv.cc ${PROJECT_SOURCE_DIR}/sql/net_serv.cc
${NT_SERVICE_SOURCE} ${NT_SERVICE_SOURCE}
${PROJECT_SOURCE_DIR}/libmysqld/libmysql.c ${PROJECT_SOURCE_DIR}/libmysqld/libmysql.c
COMPONENT backup )
SET (MBSTREAM_SOURCES
ds_buffer.cc
ds_local.cc
ds_stdout.cc
datasink.cc
xbstream.cc
xbstream_read.cc
xbstream_write.cc
)
SET (MARIABACKUP_LIBS sql sql_builtins crc)
SET (MBSTREAM_LIBS mysys crc)
IF(PLUGIN_S3 STREQUAL YES)
ADD_DEFINITIONS(-DWITH_S3_STORAGE_ENGINE)
INCLUDE_DIRECTORIES(
${CMAKE_SOURCE_DIR}/storage/maria
${CMAKE_SOURCE_DIR}/storage/maria/libmarias3
) )
LIST(APPEND MARIABACKUP_SOURCES ds_s3.cc)
LIST(APPEND MARIABACKUP_LIBS s3)
LIST(APPEND MBSTREAM_SOURCES
ds_s3.cc ${PROJECT_SOURCE_DIR}/libmysqld/libmysql.c)
LIST(APPEND MBSTREAM_LIBS s3 sql sql_builtins)
ENDIF()
MYSQL_ADD_EXECUTABLE(mariabackup ${MARIABACKUP_SOURCES} COMPONENT backup)
# Export all symbols on Unix, for better crash callstacks # Export all symbols on Unix, for better crash callstacks
...@@ -89,7 +114,7 @@ SET_TARGET_PROPERTIES(mariabackup PROPERTIES ENABLE_EXPORTS TRUE) ...@@ -89,7 +114,7 @@ SET_TARGET_PROPERTIES(mariabackup PROPERTIES ENABLE_EXPORTS TRUE)
ADD_SUBDIRECTORY(crc) ADD_SUBDIRECTORY(crc)
TARGET_LINK_LIBRARIES(mariabackup sql sql_builtins crc) TARGET_LINK_LIBRARIES(mariabackup ${MARIABACKUP_LIBS})
IF(NOT HAVE_SYSTEM_REGEX) IF(NOT HAVE_SYSTEM_REGEX)
TARGET_LINK_LIBRARIES(mariabackup pcreposix) TARGET_LINK_LIBRARIES(mariabackup pcreposix)
ENDIF() ENDIF()
...@@ -98,22 +123,9 @@ ENDIF() ...@@ -98,22 +123,9 @@ ENDIF()
######################################################################## ########################################################################
# xbstream binary # xbstream binary
######################################################################## ########################################################################
MYSQL_ADD_EXECUTABLE(mbstream MYSQL_ADD_EXECUTABLE(mbstream ${MBSTREAM_SOURCES} COMPONENT backup)
ds_buffer.cc
ds_local.cc
ds_stdout.cc
datasink.cc
xbstream.cc
xbstream_read.cc
xbstream_write.cc
COMPONENT backup
)
TARGET_LINK_LIBRARIES(mbstream ${MBSTREAM_LIBS})
TARGET_LINK_LIBRARIES(mbstream
mysys
crc
)
ADD_DEPENDENCIES(mbstream GenError) ADD_DEPENDENCIES(mbstream GenError)
IF(MSVC) IF(MSVC)
......
...@@ -29,11 +29,14 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA ...@@ -29,11 +29,14 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
#include "ds_stdout.h" #include "ds_stdout.h"
#include "ds_tmpfile.h" #include "ds_tmpfile.h"
#include "ds_buffer.h" #include "ds_buffer.h"
#ifdef WITH_S3_STORAGE_ENGINE
#include "ds_s3.h"
#endif // WITH_S3_STORAGE_ENGINE
/************************************************************************ /************************************************************************
Create a datasink of the specified type */ Create a datasink of the specified type */
ds_ctxt_t * ds_ctxt_t *
ds_create(const char *root, ds_type_t type) ds_create(const void *ds_data, ds_type_t type)
{ {
datasink_t *ds; datasink_t *ds;
ds_ctxt_t *ctxt; ds_ctxt_t *ctxt;
...@@ -69,13 +72,18 @@ ds_create(const char *root, ds_type_t type) ...@@ -69,13 +72,18 @@ ds_create(const char *root, ds_type_t type)
case DS_TYPE_BUFFER: case DS_TYPE_BUFFER:
ds = &datasink_buffer; ds = &datasink_buffer;
break; break;
#ifdef WITH_S3_STORAGE_ENGINE
case DS_TYPE_S3:
ds = &datasink_s3;
break;
#endif // WITH_S3_STORAGE_ENGINE
default: default:
msg("Unknown datasink type: %d", type); msg("Unknown datasink type: %d", type);
xb_ad(0); xb_ad(0);
return NULL; return NULL;
} }
ctxt = ds->init(root); ctxt = ds->init(ds_data);
if (ctxt != NULL) { if (ctxt != NULL) {
ctxt->datasink = ds; ctxt->datasink = ds;
} else { } else {
......
...@@ -46,7 +46,7 @@ typedef struct { ...@@ -46,7 +46,7 @@ typedef struct {
} ds_file_t; } ds_file_t;
struct datasink_struct { struct datasink_struct {
ds_ctxt_t *(*init)(const char *root); ds_ctxt_t *(*init)(const void *ds_data);
ds_file_t *(*open)(ds_ctxt_t *ctxt, const char *path, MY_STAT *stat); ds_file_t *(*open)(ds_ctxt_t *ctxt, const char *path, MY_STAT *stat);
int (*write)(ds_file_t *file, const unsigned char *buf, size_t len); int (*write)(ds_file_t *file, const unsigned char *buf, size_t len);
int (*close)(ds_file_t *file); int (*close)(ds_file_t *file);
...@@ -63,12 +63,15 @@ typedef enum { ...@@ -63,12 +63,15 @@ typedef enum {
DS_TYPE_ENCRYPT, DS_TYPE_ENCRYPT,
DS_TYPE_DECRYPT, DS_TYPE_DECRYPT,
DS_TYPE_TMPFILE, DS_TYPE_TMPFILE,
DS_TYPE_BUFFER DS_TYPE_BUFFER,
#ifdef WITH_S3_STORAGE_ENGINE
DS_TYPE_S3
#endif // WITH_S3_STORAGE_ENGINE
} ds_type_t; } ds_type_t;
/************************************************************************ /************************************************************************
Create a datasink of the specified type */ Create a datasink of the specified type */
ds_ctxt_t *ds_create(const char *root, ds_type_t type); ds_ctxt_t *ds_create(const void *ds_data, ds_type_t type);
/************************************************************************ /************************************************************************
Open a datasink file */ Open a datasink file */
......
...@@ -45,7 +45,7 @@ typedef struct { ...@@ -45,7 +45,7 @@ typedef struct {
/*********************************************************************** /***********************************************************************
General archive interface */ General archive interface */
static ds_ctxt_t *archive_init(const char *root); static ds_ctxt_t *archive_init(const void *ds_data);
static ds_file_t *archive_open(ds_ctxt_t *ctxt, const char *path, static ds_file_t *archive_open(ds_ctxt_t *ctxt, const char *path,
MY_STAT *mystat); MY_STAT *mystat);
static int archive_write(ds_file_t *file, const void *buf, size_t len); static int archive_write(ds_file_t *file, const void *buf, size_t len);
...@@ -96,7 +96,7 @@ my_archive_close_callback(struct archive *a __attribute__((unused)), ...@@ -96,7 +96,7 @@ my_archive_close_callback(struct archive *a __attribute__((unused)),
static static
ds_ctxt_t * ds_ctxt_t *
archive_init(const char *root __attribute__((unused))) archive_init(const void *ds_data __attribute__((unused)))
{ {
ds_ctxt_t *ctxt; ds_ctxt_t *ctxt;
ds_archive_ctxt_t *archive_ctxt; ds_archive_ctxt_t *archive_ctxt;
......
...@@ -42,7 +42,7 @@ typedef struct { ...@@ -42,7 +42,7 @@ typedef struct {
size_t buffer_size; size_t buffer_size;
} ds_buffer_ctxt_t; } ds_buffer_ctxt_t;
static ds_ctxt_t *buffer_init(const char *root); static ds_ctxt_t *buffer_init(const void *ds_data);
static ds_file_t *buffer_open(ds_ctxt_t *ctxt, const char *path, static ds_file_t *buffer_open(ds_ctxt_t *ctxt, const char *path,
MY_STAT *mystat); MY_STAT *mystat);
static int buffer_write(ds_file_t *file, const uchar *buf, size_t len); static int buffer_write(ds_file_t *file, const uchar *buf, size_t len);
...@@ -66,7 +66,7 @@ void ds_buffer_set_size(ds_ctxt_t *ctxt, size_t size) ...@@ -66,7 +66,7 @@ void ds_buffer_set_size(ds_ctxt_t *ctxt, size_t size)
} }
static ds_ctxt_t * static ds_ctxt_t *
buffer_init(const char *root) buffer_init(const void *ds_data)
{ {
ds_ctxt_t *ctxt; ds_ctxt_t *ctxt;
ds_buffer_ctxt_t *buffer_ctxt; ds_buffer_ctxt_t *buffer_ctxt;
...@@ -77,7 +77,7 @@ buffer_init(const char *root) ...@@ -77,7 +77,7 @@ buffer_init(const char *root)
buffer_ctxt->buffer_size = DS_DEFAULT_BUFFER_SIZE; buffer_ctxt->buffer_size = DS_DEFAULT_BUFFER_SIZE;
ctxt->ptr = buffer_ctxt; ctxt->ptr = buffer_ctxt;
ctxt->root = my_strdup(root, MYF(MY_FAE)); ctxt->root = my_strdup(static_cast<const char *>(ds_data), MYF(MY_FAE));
return ctxt; return ctxt;
} }
......
...@@ -63,7 +63,7 @@ extern char *xtrabackup_compress_alg; ...@@ -63,7 +63,7 @@ extern char *xtrabackup_compress_alg;
extern uint xtrabackup_compress_threads; extern uint xtrabackup_compress_threads;
extern ulonglong xtrabackup_compress_chunk_size; extern ulonglong xtrabackup_compress_chunk_size;
static ds_ctxt_t *compress_init(const char *root); static ds_ctxt_t *compress_init(const void *ds_data);
static ds_file_t *compress_open(ds_ctxt_t *ctxt, const char *path, static ds_file_t *compress_open(ds_ctxt_t *ctxt, const char *path,
MY_STAT *mystat); MY_STAT *mystat);
static int compress_write(ds_file_t *file, const uchar *buf, size_t len); static int compress_write(ds_file_t *file, const uchar *buf, size_t len);
...@@ -87,7 +87,7 @@ static void *compress_worker_thread_func(void *arg); ...@@ -87,7 +87,7 @@ static void *compress_worker_thread_func(void *arg);
static static
ds_ctxt_t * ds_ctxt_t *
compress_init(const char *root) compress_init(const void *ds_data)
{ {
ds_ctxt_t *ctxt; ds_ctxt_t *ctxt;
ds_compress_ctxt_t *compress_ctxt; ds_compress_ctxt_t *compress_ctxt;
...@@ -109,7 +109,7 @@ compress_init(const char *root) ...@@ -109,7 +109,7 @@ compress_init(const char *root)
compress_ctxt->nthreads = xtrabackup_compress_threads; compress_ctxt->nthreads = xtrabackup_compress_threads;
ctxt->ptr = compress_ctxt; ctxt->ptr = compress_ctxt;
ctxt->root = my_strdup(root, MYF(MY_FAE)); ctxt->root = my_strdup(static_cast<const char *>(ds_data), MYF(MY_FAE));
return ctxt; return ctxt;
} }
......
...@@ -36,7 +36,7 @@ typedef struct { ...@@ -36,7 +36,7 @@ typedef struct {
size_t pagesize; size_t pagesize;
} ds_local_file_t; } ds_local_file_t;
static ds_ctxt_t *local_init(const char *root); static ds_ctxt_t *local_init(const void *ds_data);
static ds_file_t *local_open(ds_ctxt_t *ctxt, const char *path, static ds_file_t *local_open(ds_ctxt_t *ctxt, const char *path,
MY_STAT *mystat); MY_STAT *mystat);
static int local_write(ds_file_t *file, const uchar *buf, size_t len); static int local_write(ds_file_t *file, const uchar *buf, size_t len);
...@@ -55,23 +55,23 @@ datasink_t datasink_local = { ...@@ -55,23 +55,23 @@ datasink_t datasink_local = {
static static
ds_ctxt_t * ds_ctxt_t *
local_init(const char *root) local_init(const void *ds_data)
{ {
ds_ctxt_t *ctxt; ds_ctxt_t *ctxt;
if (my_mkdir(root, 0777, MYF(0)) < 0 if (my_mkdir(static_cast<const char *>(ds_data), 0777, MYF(0)) < 0
&& my_errno != EEXIST && my_errno != EISDIR) && my_errno != EEXIST && my_errno != EISDIR)
{ {
char errbuf[MYSYS_STRERROR_SIZE]; char errbuf[MYSYS_STRERROR_SIZE];
my_strerror(errbuf, sizeof(errbuf),my_errno); my_strerror(errbuf, sizeof(errbuf),my_errno);
my_error(EE_CANT_MKDIR, MYF(ME_BELL), my_error(EE_CANT_MKDIR, MYF(ME_BELL),
root, my_errno,errbuf, my_errno); ds_data, my_errno,errbuf, my_errno);
return NULL; return NULL;
} }
ctxt = (ds_ctxt_t *)my_malloc(sizeof(ds_ctxt_t), MYF(MY_FAE)); ctxt = (ds_ctxt_t *)my_malloc(sizeof(ds_ctxt_t), MYF(MY_FAE));
ctxt->root = my_strdup(root, MYF(MY_FAE)); ctxt->root = my_strdup(static_cast<const char *>(ds_data), MYF(MY_FAE));
return ctxt; return ctxt;
} }
......
#include "common.h"
#include "ds_s3.h"
#include <string>
#include "maria_def.h"
#include "s3_func.h"
struct ds_s3_ctx_t {
ms3_st *client;
const char *bucket;
const char *path;
uint64_t seq_num;
};
static ds_ctxt_t *s3_init(const void *s3_client);
static ds_file_t *s3_open(ds_ctxt_t *ctxt, const char *path, MY_STAT *mystat);
static int s3_write(ds_file_t *file, const uchar *buf, size_t len);
static int s3_close(ds_file_t *file);
static void s3_deinit(ds_ctxt_t *ctxt);
datasink_t datasink_s3 = {
&s3_init,
&s3_open,
&s3_write,
&s3_close,
&s3_deinit
};
static
ds_ctxt_t *
s3_init(const void *args_void)
{
const ds_s3_args *args = static_cast<const ds_s3_args *>(args_void);
s3_init_library();
S3_INFO info;
info.protocol_version= (uint8_t) args->protocol_version;
lex_string_set(&info.host_name, args->host_name);
lex_string_set(&info.access_key, args->access_key);
lex_string_set(&info.secret_key, args->secret_key);
lex_string_set(&info.region, args->region);
lex_string_set(&info.bucket, args->bucket);
ms3_st *s3_client;
if (!(s3_client= s3_open_connection(&info)))
die("Can't open connection to S3, error: %d %s", errno, ms3_error(errno));
ms3_status_st status;
if (!ms3_status(s3_client, args->bucket, args->path, &status))
die("Can't stream to s3://%s%s as it already exists",
args->bucket, args->path);
ms3_list_st *list = nullptr;
if (!ms3_list_dir(s3_client, args->bucket, args->path, &list) && list) {
ms3_list_free(list);
die("Can't stream to s3://%s%s as it already exists",
args->bucket, args->path);
}
ds_ctxt_t *ctxt;
ctxt = (ds_ctxt_t *)my_malloc(sizeof(ds_ctxt_t) + sizeof(ds_s3_ctx_t),
MYF(MY_FAE));
ds_s3_ctx_t *s3_ctx = reinterpret_cast<ds_s3_ctx_t *>(ctxt + 1);
s3_ctx->client = s3_client;
s3_ctx->bucket = args->bucket;
s3_ctx->path = args->path;
s3_ctx->seq_num = 0;
ctxt->ptr = s3_ctx;
return ctxt;
}
static
ds_file_t *
s3_open(ds_ctxt_t *ctxt,
const char *path __attribute__((unused)),
MY_STAT *mystat __attribute__((unused)))
{
static char ds_s3_file_path[] = "s3";
ds_file_t *file;
file = (ds_file_t *) my_malloc(sizeof(ds_file_t), MYF(MY_FAE));
file->ptr = ctxt->ptr;
file->path = ds_s3_file_path;
return file;
}
static
int
s3_write(ds_file_t *file, const uchar *buf, size_t len)
{
ds_s3_ctx_t *ctx = static_cast<ds_s3_ctx_t *>(file->ptr);
std::string block_path(ctx->path);
block_path.append("/").append(std::to_string(ctx->seq_num++));
return s3_put_object(ctx->client, ctx->bucket, block_path.c_str(),
const_cast<uchar *>(buf), len, 0);
}
static
int
s3_close(ds_file_t *file)
{
my_free(file);
return 1;
}
static
void
s3_deinit(ds_ctxt_t *ctxt)
{
ms3_deinit(static_cast<ds_s3_ctx_t *>(ctxt->ptr)->client);
s3_deinit_library();
my_free(ctxt);
}
/******************************************************
Copyright (c) 2011-2013 Percona LLC and/or its affiliates.
Local datasink interface for XtraBackup.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
*******************************************************/
#ifndef DS_S3_H
#define DS_S3_H
#include "datasink.h"
extern datasink_t datasink_s3;
struct ds_s3_args {
const char *access_key;
const char *secret_key;
const char *region;
const char *host_name;
const char *bucket;
const char *path;
ulong protocol_version;
};
#endif
...@@ -28,7 +28,7 @@ typedef struct { ...@@ -28,7 +28,7 @@ typedef struct {
File fd; File fd;
} ds_stdout_file_t; } ds_stdout_file_t;
static ds_ctxt_t *stdout_init(const char *root); static ds_ctxt_t *stdout_init(const void *ds_data);
static ds_file_t *stdout_open(ds_ctxt_t *ctxt, const char *path, static ds_file_t *stdout_open(ds_ctxt_t *ctxt, const char *path,
MY_STAT *mystat); MY_STAT *mystat);
static int stdout_write(ds_file_t *file, const uchar *buf, size_t len); static int stdout_write(ds_file_t *file, const uchar *buf, size_t len);
...@@ -45,13 +45,13 @@ datasink_t datasink_stdout = { ...@@ -45,13 +45,13 @@ datasink_t datasink_stdout = {
static static
ds_ctxt_t * ds_ctxt_t *
stdout_init(const char *root) stdout_init(const void *ds_data)
{ {
ds_ctxt_t *ctxt; ds_ctxt_t *ctxt;
ctxt = (ds_ctxt_t *)my_malloc(sizeof(ds_ctxt_t), MYF(MY_FAE)); ctxt = (ds_ctxt_t *)my_malloc(sizeof(ds_ctxt_t), MYF(MY_FAE));
ctxt->root = my_strdup(root, MYF(MY_FAE)); ctxt->root = my_strdup(static_cast<const char *>(ds_data), MYF(MY_FAE));
return ctxt; return ctxt;
} }
......
...@@ -39,7 +39,7 @@ typedef struct { ...@@ -39,7 +39,7 @@ typedef struct {
ds_file_t *file; ds_file_t *file;
} ds_tmp_file_t; } ds_tmp_file_t;
static ds_ctxt_t *tmpfile_init(const char *root); static ds_ctxt_t *tmpfile_init(const void *ds_data);
static ds_file_t *tmpfile_open(ds_ctxt_t *ctxt, const char *path, static ds_file_t *tmpfile_open(ds_ctxt_t *ctxt, const char *path,
MY_STAT *mystat); MY_STAT *mystat);
static int tmpfile_write(ds_file_t *file, const uchar *buf, size_t len); static int tmpfile_write(ds_file_t *file, const uchar *buf, size_t len);
...@@ -56,7 +56,7 @@ datasink_t datasink_tmpfile = { ...@@ -56,7 +56,7 @@ datasink_t datasink_tmpfile = {
static ds_ctxt_t * static ds_ctxt_t *
tmpfile_init(const char *root) tmpfile_init(const void *ds_data)
{ {
ds_ctxt_t *ctxt; ds_ctxt_t *ctxt;
ds_tmpfile_ctxt_t *tmpfile_ctxt; ds_tmpfile_ctxt_t *tmpfile_ctxt;
...@@ -72,7 +72,7 @@ tmpfile_init(const char *root) ...@@ -72,7 +72,7 @@ tmpfile_init(const char *root)
} }
ctxt->ptr = tmpfile_ctxt; ctxt->ptr = tmpfile_ctxt;
ctxt->root = my_strdup(root, MYF(MY_FAE)); ctxt->root = my_strdup(static_cast<const char *>(ds_data), MYF(MY_FAE));
return ctxt; return ctxt;
} }
......
...@@ -38,7 +38,7 @@ typedef struct { ...@@ -38,7 +38,7 @@ typedef struct {
/*********************************************************************** /***********************************************************************
General streaming interface */ General streaming interface */
static ds_ctxt_t *xbstream_init(const char *root); static ds_ctxt_t *xbstream_init(const void *ds_data);
static ds_file_t *xbstream_open(ds_ctxt_t *ctxt, const char *path, static ds_file_t *xbstream_open(ds_ctxt_t *ctxt, const char *path,
MY_STAT *mystat); MY_STAT *mystat);
static int xbstream_write(ds_file_t *file, const uchar *buf, size_t len); static int xbstream_write(ds_file_t *file, const uchar *buf, size_t len);
...@@ -73,7 +73,7 @@ my_xbstream_write_callback(xb_wstream_file_t *f __attribute__((unused)), ...@@ -73,7 +73,7 @@ my_xbstream_write_callback(xb_wstream_file_t *f __attribute__((unused)),
static static
ds_ctxt_t * ds_ctxt_t *
xbstream_init(const char *root __attribute__((unused))) xbstream_init(const void *ds_data __attribute__((unused)))
{ {
ds_ctxt_t *ctxt; ds_ctxt_t *ctxt;
ds_stream_ctxt_t *stream_ctxt; ds_stream_ctxt_t *stream_ctxt;
......
...@@ -27,6 +27,10 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA ...@@ -27,6 +27,10 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
#include "xbstream.h" #include "xbstream.h"
#include "datasink.h" #include "datasink.h"
#include "crc_glue.h" #include "crc_glue.h"
#ifdef WITH_S3_STORAGE_ENGINE
#include "maria_def.h"
#include "s3_func.h"
#endif // WITH_S3_STORAGE_ENGINE
#define XBSTREAM_VERSION "1.0" #define XBSTREAM_VERSION "1.0"
#define XBSTREAM_BUFFER_SIZE (10 * 1024 * 1024UL) #define XBSTREAM_BUFFER_SIZE (10 * 1024 * 1024UL)
...@@ -51,6 +55,29 @@ static char * opt_directory = NULL; ...@@ -51,6 +55,29 @@ static char * opt_directory = NULL;
static my_bool opt_verbose = 0; static my_bool opt_verbose = 0;
static int opt_parallel = 1; static int opt_parallel = 1;
#ifdef WITH_S3_STORAGE_ENGINE
static const char *opt_s3_access_key;
static const char *opt_s3_secret_key;
static const char *opt_s3_region = "eu-north-1";
static const char *opt_s3_host_name = "s3.amazonaws.com";
static const char *opt_s3_bucket = "MariaDB";
static const char *opt_s3_path = "/backup.xbstream";
static ulong opt_s3_protocol_version;
#include <../../client/client_priv.h>
enum options_xtrabackup
{
OPT_S3_ACCESS_KEY = OPT_MAX_CLIENT_OPTION + 1000,
OPT_S3_SECRET_KEY,
OPT_S3_REGION,
OPT_S3_HOST_NAME,
OPT_S3_BUCKET,
OPT_S3_PATH,
OPT_S3_PROTOCOL_VERSION
};
#endif // WITH_S3_STORAGE_ENGINE
static struct my_option my_long_options[] = static struct my_option my_long_options[] =
{ {
{"help", '?', "Display this help and exit.", {"help", '?', "Display this help and exit.",
...@@ -69,12 +96,39 @@ static struct my_option my_long_options[] = ...@@ -69,12 +96,39 @@ static struct my_option my_long_options[] =
&opt_parallel, &opt_parallel, 0, GET_INT, REQUIRED_ARG, &opt_parallel, &opt_parallel, 0, GET_INT, REQUIRED_ARG,
1, 1, INT_MAX, 0, 0, 0}, 1, 1, INT_MAX, 0, 0, 0},
#ifdef WITH_S3_STORAGE_ENGINE
{"s3_access_key", OPT_S3_ACCESS_KEY, "AWS access key ID",
(char**) &opt_s3_access_key, (char**) &opt_s3_access_key, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"s3_region", OPT_S3_REGION, "AWS region",
(char**) &opt_s3_region, (char**) &opt_s3_region, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"s3_secret_key", OPT_S3_SECRET_KEY, "AWS secret access key ID",
(char**) &opt_s3_secret_key, (char**) &opt_s3_secret_key, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"s3_bucket", OPT_S3_BUCKET, "AWS prefix for backup",
(char**) &opt_s3_bucket, (char**) &opt_s3_bucket, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"s3_path", OPT_S3_PATH, "AWS path for backup",
(char**) &opt_s3_path, (char**) &opt_s3_path, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"s3_host_name", OPT_S3_HOST_NAME, "Host name to S3 provider",
(char**) &opt_s3_host_name, (char**) &opt_s3_host_name, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"S3 protocol version", OPT_S3_PROTOCOL_VERSION,
"Protocol used to communication with S3. One of "
"\"Auto\", \"Amazon\" or \"Original\".",
(uchar*) &opt_s3_protocol_version,
(uchar*) &opt_s3_protocol_version, &s3_protocol_typelib,
GET_ENUM, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
#endif // WITH_S3_STORAGE_ENGINE
{0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0} {0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
}; };
typedef struct { typedef struct {
HASH *filehash; HASH *filehash;
xb_rstream_t *stream; xb_rstream *stream;
ds_ctxt_t *ds_ctxt; ds_ctxt_t *ds_ctxt;
pthread_mutex_t *mutex; pthread_mutex_t *mutex;
} extract_ctxt_t; } extract_ctxt_t;
...@@ -87,6 +141,15 @@ typedef struct { ...@@ -87,6 +141,15 @@ typedef struct {
pthread_mutex_t mutex; pthread_mutex_t mutex;
} file_entry_t; } file_entry_t;
void * operator new(decltype(sizeof(0)) n) noexcept(false)
{
return my_malloc(n, MYF(MY_FAE));
}
void operator delete(void * p) throw()
{
my_free(p);
}
static int get_options(int *argc, char ***argv); static int get_options(int *argc, char ***argv);
static int mode_create(int argc, char **argv); static int mode_create(int argc, char **argv);
static int mode_extract(int n_threads, int argc, char **argv); static int mode_extract(int n_threads, int argc, char **argv);
...@@ -486,7 +549,7 @@ int ...@@ -486,7 +549,7 @@ int
mode_extract(int n_threads, int argc __attribute__((unused)), mode_extract(int n_threads, int argc __attribute__((unused)),
char **argv __attribute__((unused))) char **argv __attribute__((unused)))
{ {
xb_rstream_t *stream = NULL; std::unique_ptr<xb_rstream> stream;
HASH filehash; HASH filehash;
ds_ctxt_t *ds_ctxt = NULL; ds_ctxt_t *ds_ctxt = NULL;
extract_ctxt_t ctxt; extract_ctxt_t ctxt;
...@@ -516,16 +579,33 @@ mode_extract(int n_threads, int argc __attribute__((unused)), ...@@ -516,16 +579,33 @@ mode_extract(int n_threads, int argc __attribute__((unused)),
goto exit; goto exit;
} }
#ifdef WITH_S3_STORAGE_ENGINE
stream = xb_stream_read_new(); if (opt_s3_access_key)
if (stream == NULL) { s3_init_library();
#endif // WITH_S3_STORAGE_ENGINE
stream =
#ifdef WITH_S3_STORAGE_ENGINE
opt_s3_access_key ?
xb_stream_s3_new(
opt_s3_access_key,
opt_s3_secret_key,
opt_s3_region,
opt_s3_host_name,
opt_s3_bucket,
opt_s3_path,
opt_s3_protocol_version) :
#endif // WITH_S3_STORAGE_ENGINE
xb_stream_stdin_new();
if (!stream) {
msg("%s: xb_stream_read_new() failed.", my_progname); msg("%s: xb_stream_read_new() failed.", my_progname);
pthread_mutex_destroy(&mutex); pthread_mutex_destroy(&mutex);
ret = 1; ret = 1;
goto exit; goto exit;
} }
ctxt.stream = stream; ctxt.stream = stream.get();
ctxt.filehash = &filehash; ctxt.filehash = &filehash;
ctxt.ds_ctxt = ds_ctxt; ctxt.ds_ctxt = ds_ctxt;
ctxt.mutex = &mutex; ctxt.mutex = &mutex;
...@@ -557,7 +637,11 @@ mode_extract(int n_threads, int argc __attribute__((unused)), ...@@ -557,7 +637,11 @@ mode_extract(int n_threads, int argc __attribute__((unused)),
if (ds_ctxt != NULL) { if (ds_ctxt != NULL) {
ds_destroy(ds_ctxt); ds_destroy(ds_ctxt);
} }
xb_stream_read_done(stream);
#ifdef WITH_S3_STORAGE_ENGINE
if (opt_s3_access_key)
s3_deinit_library();
#endif // WITH_S3_STORAGE_ENGINE
return ret; return ret;
} }
...@@ -22,6 +22,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA ...@@ -22,6 +22,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
#define XBSTREAM_H #define XBSTREAM_H
#include <my_base.h> #include <my_base.h>
#include <memory>
/* Magic value in a chunk header */ /* Magic value in a chunk header */
#define XB_STREAM_CHUNK_MAGIC "XBSTCK01" #define XB_STREAM_CHUNK_MAGIC "XBSTCK01"
...@@ -79,7 +80,12 @@ typedef enum { ...@@ -79,7 +80,12 @@ typedef enum {
XB_CHUNK_TYPE_EOF = 'E' XB_CHUNK_TYPE_EOF = 'E'
} xb_chunk_type_t; } xb_chunk_type_t;
typedef struct xb_rstream_struct xb_rstream_t; class xb_rstream {
public:
virtual size_t read(uchar *buf, size_t len) = 0;
virtual my_off_t offset() = 0;
virtual ~xb_rstream() {};
};
typedef struct { typedef struct {
uchar flags; uchar flags;
...@@ -94,13 +100,20 @@ typedef struct { ...@@ -94,13 +100,20 @@ typedef struct {
size_t buflen; size_t buflen;
} xb_rstream_chunk_t; } xb_rstream_chunk_t;
xb_rstream_t *xb_stream_read_new(void); std::unique_ptr<xb_rstream> xb_stream_stdin_new(void);
#ifdef WITH_S3_STORAGE_ENGINE
xb_rstream_result_t xb_stream_read_chunk(xb_rstream_t *stream, std::unique_ptr<xb_rstream> xb_stream_s3_new(
const char *access_key,
const char *secret_key,
const char *region,
const char *host_name,
const char *bucket,
const char *path,
ulong protocol_version);
#endif // WITH_S3_STORAGE_ENGINE
xb_rstream_result_t xb_stream_read_chunk(xb_rstream *stream,
xb_rstream_chunk_t *chunk); xb_rstream_chunk_t *chunk);
int xb_stream_read_done(xb_rstream_t *stream);
xb_rstream_result_t xb_stream_validate_checksum(xb_rstream_chunk_t *chunk); xb_rstream_result_t xb_stream_validate_checksum(xb_rstream_chunk_t *chunk);
#endif #endif
...@@ -24,6 +24,11 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA ...@@ -24,6 +24,11 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
#include "common.h" #include "common.h"
#include "xbstream.h" #include "xbstream.h"
#include "crc_glue.h" #include "crc_glue.h"
#ifdef WITH_S3_STORAGE_ENGINE
#include "maria_def.h"
#include "s3_func.h"
#include <string>
#endif // WITH_S3_STORAGE_ENGINE
/* Allocate 1 MB for the payload buffer initially */ /* Allocate 1 MB for the payload buffer initially */
#define INIT_BUFFER_LEN (1024 * 1024) #define INIT_BUFFER_LEN (1024 * 1024)
...@@ -32,28 +37,124 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA ...@@ -32,28 +37,124 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
#define MY_OFF_T_MAX (~(my_off_t)0UL) #define MY_OFF_T_MAX (~(my_off_t)0UL)
#endif #endif
struct xb_rstream_struct { class xb_stdin_stream : public xb_rstream {
my_off_t offset; my_off_t m_offset;
File fd; File m_fd;
public:
xb_stdin_stream() : m_offset(0), m_fd(my_fileno(stdin)) {
#ifdef __WIN__
setmode(fileno(stdin), _O_BINARY);
#endif //__WIN__
}
~xb_stdin_stream() override {}
size_t read(uchar *buf, size_t len) override {
size_t result = xb_read_full(m_fd, (uchar *)buf, len);
m_offset += result;
return result;
}
my_off_t offset() override {
return m_offset;
}
}; };
xb_rstream_t * std::unique_ptr<xb_rstream> xb_stream_stdin_new(void) {
xb_stream_read_new(void) return std::unique_ptr<xb_rstream>(new xb_stdin_stream());
{ }
xb_rstream_t *stream;
stream = (xb_rstream_t *) my_malloc(sizeof(xb_rstream_t), MYF(MY_FAE)); #ifdef WITH_S3_STORAGE_ENGINE
class xb_s3_rstream : public xb_rstream {
ms3_st *m_client;
const char *m_bucket;
const char *m_path;
my_off_t m_offset;
uint64_t m_seq_num;
S3_BLOCK m_block;
my_off_t m_block_offset;
public:
xb_s3_rstream(ms3_st *client, const char *bucket, const char *path) :
m_client(client), m_bucket(bucket), m_path(path), m_offset(0),
m_seq_num(0), m_block{nullptr, nullptr, 0}, m_block_offset(0) {}
~xb_s3_rstream() override {
if (m_block.alloc_ptr)
s3_free(&m_block);
if (m_client)
ms3_deinit(m_client);
}
#ifdef __WIN__ size_t read(uchar *dst, size_t len) override;
setmode(fileno(stdin), _O_BINARY);
#endif my_off_t offset() override {
return m_offset;
}
stream->fd = my_fileno(stdin); };
stream->offset = 0;
size_t xb_s3_rstream::read(uchar *dst, size_t len) {
size_t copied = 0;
while (len) {
if (m_block_offset + len > m_block.length) {
size_t tail_size = m_block.length - m_block_offset;
if (tail_size) {
memcpy(dst, m_block.str + m_block_offset, tail_size);
dst += tail_size;
len -= tail_size;
m_offset += tail_size;
m_block_offset += tail_size;
copied += tail_size;
}
if (m_block.alloc_ptr)
s3_free(&m_block);
m_block_offset = 0;
std::string block_path(m_path);
block_path.append("/").append(std::to_string(m_seq_num++));
ms3_status_st status;
if (ms3_status(m_client, m_bucket, block_path.c_str(), &status))
return copied;
if (s3_get_object(m_client, m_bucket, block_path.c_str(), &m_block,
false, 1))
return copied;
continue;
}
return stream; memcpy(dst, m_block.str + m_block_offset, len);
m_offset += len;
m_block_offset += len;
copied += len;
break;
}
return copied;
} }
std::unique_ptr<xb_rstream> xb_stream_s3_new(
const char *access_key,
const char *secret_key,
const char *region,
const char *host_name,
const char *bucket,
const char *path,
ulong protocol_version) {
S3_INFO info;
info.protocol_version= (uint8_t) protocol_version;
lex_string_set(&info.host_name, host_name);
lex_string_set(&info.access_key, access_key);
lex_string_set(&info.secret_key, secret_key);
lex_string_set(&info.region, region);
lex_string_set(&info.bucket, bucket);
ms3_st *s3_client;
if (!(s3_client= s3_open_connection(&info)))
die("Can't open connection to S3, error: %d %s", errno, ms3_error(errno));
return std::unique_ptr<xb_rstream>(
new xb_s3_rstream(s3_client, bucket, path));
}
#endif // WITH_S3_STORAGE_ENGINE
static inline static inline
xb_chunk_type_t xb_chunk_type_t
validate_chunk_type(uchar code) validate_chunk_type(uchar code)
...@@ -84,34 +185,34 @@ xb_stream_validate_checksum(xb_rstream_chunk_t *chunk) ...@@ -84,34 +185,34 @@ xb_stream_validate_checksum(xb_rstream_chunk_t *chunk)
return XB_STREAM_READ_CHUNK; return XB_STREAM_READ_CHUNK;
} }
#define F_READ(buf,len) \ #define F_READ(buf,len) \
do { \ do { \
if (xb_read_full(fd, (uchar *)buf, len) < len) { \ if (stream->read((uchar *)buf, len) < len) { \
msg("xb_stream_read_chunk(): my_read() failed."); \ msg("xb_stream_read_chunk(): stream->read() failed."); \
goto err; \ goto err; \
} \ } \
} while (0) } while (0)
xb_rstream_result_t xb_rstream_result_t
xb_stream_read_chunk(xb_rstream_t *stream, xb_rstream_chunk_t *chunk) xb_stream_read_chunk(xb_rstream *stream, xb_rstream_chunk_t *chunk)
{ {
uchar tmpbuf[16]; uchar tmpbuf[16];
uchar *ptr = tmpbuf; uchar *ptr = tmpbuf;
uint pathlen; uint pathlen;
size_t tbytes; size_t tbytes;
ulonglong ullval; ulonglong ullval;
File fd = stream->fd; my_off_t offset = stream->offset();
xb_ad(sizeof(tmpbuf) >= CHUNK_HEADER_CONSTANT_LEN); xb_ad(sizeof(tmpbuf) >= CHUNK_HEADER_CONSTANT_LEN);
/* This is the only place where we expect EOF, so read with /* This is the only place where we expect EOF, so read with
xb_read_full() rather than F_READ() */ xb_read_full() rather than F_READ() */
tbytes = xb_read_full(fd, ptr, CHUNK_HEADER_CONSTANT_LEN); tbytes = stream->read(ptr, CHUNK_HEADER_CONSTANT_LEN);
if (tbytes == 0) { if (tbytes == 0) {
return XB_STREAM_READ_EOF; return XB_STREAM_READ_EOF;
} else if (tbytes < CHUNK_HEADER_CONSTANT_LEN) { } else if (tbytes < CHUNK_HEADER_CONSTANT_LEN) {
msg("xb_stream_read_chunk(): unexpected end of stream at " msg("xb_stream_read_chunk(): unexpected end of stream at "
"offset 0x%llx.", stream->offset); "offset 0x%llx.", offset);
goto err; goto err;
} }
...@@ -120,15 +221,15 @@ xb_stream_read_chunk(xb_rstream_t *stream, xb_rstream_chunk_t *chunk) ...@@ -120,15 +221,15 @@ xb_stream_read_chunk(xb_rstream_t *stream, xb_rstream_chunk_t *chunk)
/* Chunk magic value */ /* Chunk magic value */
if (memcmp(tmpbuf, XB_STREAM_CHUNK_MAGIC, 8)) { if (memcmp(tmpbuf, XB_STREAM_CHUNK_MAGIC, 8)) {
msg("xb_stream_read_chunk(): wrong chunk magic at offset " msg("xb_stream_read_chunk(): wrong chunk magic at offset "
"0x%llx.", (ulonglong) stream->offset); "0x%llx.", (ulonglong) offset);
goto err; goto err;
} }
ptr += 8; ptr += 8;
stream->offset += 8; offset += 8;
/* Chunk flags */ /* Chunk flags */
chunk->flags = *ptr++; chunk->flags = *ptr++;
stream->offset++; offset++;
/* Chunk type, ignore unknown ones if ignorable flag is set */ /* Chunk type, ignore unknown ones if ignorable flag is set */
chunk->type = validate_chunk_type(*ptr); chunk->type = validate_chunk_type(*ptr);
...@@ -136,28 +237,28 @@ xb_stream_read_chunk(xb_rstream_t *stream, xb_rstream_chunk_t *chunk) ...@@ -136,28 +237,28 @@ xb_stream_read_chunk(xb_rstream_t *stream, xb_rstream_chunk_t *chunk)
!(chunk->flags & XB_STREAM_FLAG_IGNORABLE)) { !(chunk->flags & XB_STREAM_FLAG_IGNORABLE)) {
msg("xb_stream_read_chunk(): unknown chunk type 0x%lu at " msg("xb_stream_read_chunk(): unknown chunk type 0x%lu at "
"offset 0x%llx.", (ulong) *ptr, "offset 0x%llx.", (ulong) *ptr,
(ulonglong) stream->offset); (ulonglong) offset);
goto err; goto err;
} }
ptr++; ptr++;
stream->offset++; offset++;
/* Path length */ /* Path length */
pathlen = uint4korr(ptr); pathlen = uint4korr(ptr);
if (pathlen >= FN_REFLEN) { if (pathlen >= FN_REFLEN) {
msg("xb_stream_read_chunk(): path length (%lu) is too large at " msg("xb_stream_read_chunk(): path length (%lu) is too large at "
"offset 0x%llx.", (ulong) pathlen, stream->offset); "offset 0x%llx.", (ulong) pathlen, offset);
goto err; goto err;
} }
chunk->pathlen = pathlen; chunk->pathlen = pathlen;
stream->offset +=4; offset +=4;
xb_ad((ptr + 4 - tmpbuf) == CHUNK_HEADER_CONSTANT_LEN); xb_ad((ptr + 4 - tmpbuf) == CHUNK_HEADER_CONSTANT_LEN);
/* Path */ /* Path */
if (chunk->pathlen > 0) { if (chunk->pathlen > 0) {
F_READ((uchar *) chunk->path, pathlen); F_READ((uchar *) chunk->path, pathlen);
stream->offset += pathlen; offset += pathlen;
} }
chunk->path[pathlen] = '\0'; chunk->path[pathlen] = '\0';
...@@ -170,23 +271,23 @@ xb_stream_read_chunk(xb_rstream_t *stream, xb_rstream_chunk_t *chunk) ...@@ -170,23 +271,23 @@ xb_stream_read_chunk(xb_rstream_t *stream, xb_rstream_chunk_t *chunk)
ullval = uint8korr(tmpbuf); ullval = uint8korr(tmpbuf);
if (ullval > (ulonglong) SIZE_T_MAX) { if (ullval > (ulonglong) SIZE_T_MAX) {
msg("xb_stream_read_chunk(): chunk length is too large at " msg("xb_stream_read_chunk(): chunk length is too large at "
"offset 0x%llx: 0x%llx.", (ulonglong) stream->offset, "offset 0x%llx: 0x%llx.", (ulonglong) offset,
ullval); ullval);
goto err; goto err;
} }
chunk->length = (size_t) ullval; chunk->length = (size_t) ullval;
stream->offset += 8; offset += 8;
/* Payload offset */ /* Payload offset */
ullval = uint8korr(tmpbuf + 8); ullval = uint8korr(tmpbuf + 8);
if (ullval > (ulonglong) MY_OFF_T_MAX) { if (ullval > (ulonglong) MY_OFF_T_MAX) {
msg("xb_stream_read_chunk(): chunk offset is too large at " msg("xb_stream_read_chunk(): chunk offset is too large at "
"offset 0x%llx: 0x%llx.", (ulonglong) stream->offset, "offset 0x%llx: 0x%llx.", (ulonglong) offset,
ullval); ullval);
goto err; goto err;
} }
chunk->offset = (my_off_t) ullval; chunk->offset = (my_off_t) ullval;
stream->offset += 8; offset += 8;
/* Reallocate the buffer if needed */ /* Reallocate the buffer if needed */
if (chunk->length > chunk->buflen) { if (chunk->length > chunk->buflen) {
...@@ -203,26 +304,18 @@ xb_stream_read_chunk(xb_rstream_t *stream, xb_rstream_chunk_t *chunk) ...@@ -203,26 +304,18 @@ xb_stream_read_chunk(xb_rstream_t *stream, xb_rstream_chunk_t *chunk)
/* Checksum */ /* Checksum */
F_READ(tmpbuf, 4); F_READ(tmpbuf, 4);
chunk->checksum = uint4korr(tmpbuf); chunk->checksum = uint4korr(tmpbuf);
chunk->checksum_offset = stream->offset; chunk->checksum_offset = offset;
/* Payload */ /* Payload */
if (chunk->length > 0) { if (chunk->length > 0) {
F_READ(chunk->data, chunk->length); F_READ(chunk->data, chunk->length);
stream->offset += chunk->length; offset += chunk->length;
} }
stream->offset += 4; offset += 4;
return XB_STREAM_READ_CHUNK; return XB_STREAM_READ_CHUNK;
err: err:
return XB_STREAM_READ_ERROR; return XB_STREAM_READ_ERROR;
} }
int
xb_stream_read_done(xb_rstream_t *stream)
{
my_free(stream);
return 0;
}
...@@ -102,6 +102,12 @@ Street, Fifth Floor, Boston, MA 02110-1335 USA ...@@ -102,6 +102,12 @@ Street, Fifth Floor, Boston, MA 02110-1335 USA
#include <srv0srv.h> #include <srv0srv.h>
#include <crc_glue.h> #include <crc_glue.h>
#include <log.h> #include <log.h>
#ifdef WITH_S3_STORAGE_ENGINE
#include "ds_s3.h"
#undef LSN_MAX
#include "maria_def.h"
#include "s3_func.h"
#endif //WITH_S3_STORAGE_ENGINE
int sys_var_init(); int sys_var_init();
...@@ -204,6 +210,17 @@ my_bool opt_ssl_verify_server_cert; ...@@ -204,6 +210,17 @@ my_bool opt_ssl_verify_server_cert;
my_bool opt_extended_validation; my_bool opt_extended_validation;
my_bool opt_encrypted_backup; my_bool opt_encrypted_backup;
#ifdef WITH_S3_STORAGE_ENGINE
static const char *opt_s3_access_key;
static const char *opt_s3_secret_key;
static const char *opt_s3_region = "eu-north-1";
static const char *opt_s3_host_name = "s3.amazonaws.com";
static const char *opt_s3_bucket = "MariaDB";
static const char *opt_s3_path = "/backup.xbstream";
static ulong opt_s3_block_size;
static ulong opt_s3_protocol_version;
#endif // WITH_S3_STORAGE_ENGINE
/* === metadata of backup === */ /* === metadata of backup === */
#define XTRABACKUP_METADATA_FILENAME "xtrabackup_checkpoints" #define XTRABACKUP_METADATA_FILENAME "xtrabackup_checkpoints"
char metadata_type[30] = ""; /*[full-backuped|log-applied|incremental]*/ char metadata_type[30] = ""; /*[full-backuped|log-applied|incremental]*/
...@@ -733,8 +750,8 @@ typedef struct { ...@@ -733,8 +750,8 @@ typedef struct {
enum options_xtrabackup enum options_xtrabackup
{ {
OPT_XTRA_TARGET_DIR = 1000, /* make sure it is larger /* make sure it is larger than OPT_MAX_CLIENT_OPTION */
than OPT_MAX_CLIENT_OPTION */ OPT_XTRA_TARGET_DIR = OPT_MAX_CLIENT_OPTION + 1000,
OPT_XTRA_BACKUP, OPT_XTRA_BACKUP,
OPT_XTRA_PREPARE, OPT_XTRA_PREPARE,
OPT_XTRA_EXPORT, OPT_XTRA_EXPORT,
...@@ -827,7 +844,17 @@ enum options_xtrabackup ...@@ -827,7 +844,17 @@ enum options_xtrabackup
OPT_LOCK_DDL_PER_TABLE, OPT_LOCK_DDL_PER_TABLE,
OPT_ROCKSDB_DATADIR, OPT_ROCKSDB_DATADIR,
OPT_BACKUP_ROCKSDB, OPT_BACKUP_ROCKSDB,
OPT_XTRA_CHECK_PRIVILEGES OPT_XTRA_CHECK_PRIVILEGES,
#ifdef WITH_S3_STORAGE_ENGINE
OPT_S3_ACCESS_KEY,
OPT_S3_SECRET_KEY,
OPT_S3_REGION,
OPT_S3_HOST_NAME,
OPT_S3_BUCKET,
OPT_S3_PATH,
OPT_S3_BLOCK_SIZE,
OPT_S3_PROTOCOL_VERSION
#endif //WITH_S3_STORAGE_ENGINE
}; };
...@@ -1393,6 +1420,37 @@ struct my_option xb_server_options[] = ...@@ -1393,6 +1420,37 @@ struct my_option xb_server_options[] =
&opt_check_privileges, &opt_check_privileges, &opt_check_privileges, &opt_check_privileges,
0, GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0 }, 0, GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0 },
#ifdef WITH_S3_STORAGE_ENGINE
{"s3_access_key", OPT_S3_ACCESS_KEY, "AWS access key ID",
(char**) &opt_s3_access_key, (char**) &opt_s3_access_key, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"s3_region", OPT_S3_REGION, "AWS region",
(char**) &opt_s3_region, (char**) &opt_s3_region, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"s3_secret_key", OPT_S3_SECRET_KEY, "AWS secret access key ID",
(char**) &opt_s3_secret_key, (char**) &opt_s3_secret_key, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"s3_bucket", OPT_S3_BUCKET, "AWS prefix for backup",
(char**) &opt_s3_bucket, (char**) &opt_s3_bucket, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"s3_path", OPT_S3_PATH, "AWS path for backup",
(char**) &opt_s3_path, (char**) &opt_s3_path, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"s3_host_name", OPT_S3_HOST_NAME, "Host name to S3 provider",
(char**) &opt_s3_host_name, (char**) &opt_s3_host_name, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"s3_block_size", OPT_S3_BLOCK_SIZE,
"S3 block size.",
(G_PTR*)&opt_s3_block_size, (G_PTR*)&opt_s3_block_size,
0, GET_ULONG, REQUIRED_ARG, 10*1024*1024, 1024*1024, 100*1024*1024, 0, 0, 0},
{"S3 protocol version", OPT_S3_PROTOCOL_VERSION,
"Protocol used to communication with S3. One of "
"\"Auto\", \"Amazon\" or \"Original\".",
(uchar*) &opt_s3_protocol_version,
(uchar*) &opt_s3_protocol_version, &s3_protocol_typelib,
GET_ENUM, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
#endif // WITH_S3_STORAGE_ENGINE
{ 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0} { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
}; };
...@@ -2931,7 +2989,29 @@ xtrabackup_init_datasinks(void) ...@@ -2931,7 +2989,29 @@ xtrabackup_init_datasinks(void)
/* All streaming goes to stdout */ /* All streaming goes to stdout */
ds_data = ds_meta = ds_redo = ds_create(xtrabackup_target_dir, ds_data = ds_meta = ds_redo = ds_create(xtrabackup_target_dir,
DS_TYPE_STDOUT); DS_TYPE_STDOUT);
} else { }
#ifdef WITH_S3_STORAGE_ENGINE
else if (opt_s3_access_key) {
xtrabackup_stream = TRUE;
xtrabackup_stream_fmt = XB_STREAM_FMT_XBSTREAM;
ds_s3_args s3_args = {
opt_s3_access_key,
opt_s3_secret_key,
opt_s3_region,
opt_s3_host_name,
opt_s3_bucket,
opt_s3_path,
opt_s3_protocol_version
};
ds_data = ds_create(&s3_args, DS_TYPE_S3);
xtrabackup_add_datasink(ds_data);
ds_ctxt_t *ds = ds_create(xtrabackup_target_dir, DS_TYPE_BUFFER);
ds_buffer_set_size(ds, opt_s3_block_size);
ds_set_pipe(ds, ds_data);
ds_data = ds;
}
#endif // WITH_S3_STORAGE_ENGINE
else {
/* Local filesystem */ /* Local filesystem */
ds_data = ds_meta = ds_redo = ds_create(xtrabackup_target_dir, ds_data = ds_meta = ds_redo = ds_create(xtrabackup_target_dir,
DS_TYPE_LOCAL); DS_TYPE_LOCAL);
......
...@@ -4920,119 +4920,3 @@ ulong STDCALL mysql_net_field_length(uchar **packet) ...@@ -4920,119 +4920,3 @@ ulong STDCALL mysql_net_field_length(uchar **packet)
{ {
return net_field_length(packet); return net_field_length(packet);
} }
/********************************************************************
Dummy functions to avoid linking with libmarias3 / libcurl
*********************************************************************/
#if defined(WITH_S3_STORAGE_ENGINE) || !defined(FIX_BEFORE_RELESE)
C_MODE_START
#include <stdint.h>
struct ms3_st;
typedef struct ms3_st ms3_st;
struct ms3_list_st;
typedef struct ms3_list_st ms3_list_st;
struct ms3_status_st;
typedef struct ms3_status_st ms3_status_st;
enum ms3_set_option_t
{
SOME_OPTIONS
};
typedef enum ms3_set_option_t ms3_set_option_t;
typedef void *(*ms3_malloc_callback)(size_t size);
typedef void (*ms3_free_callback)(void *ptr);
typedef void *(*ms3_realloc_callback)(void *ptr, size_t size);
typedef char *(*ms3_strdup_callback)(const char *str);
typedef void *(*ms3_calloc_callback)(size_t nmemb, size_t size);
uint8_t ms3_library_init_malloc(ms3_malloc_callback m,
ms3_free_callback f, ms3_realloc_callback r,
ms3_strdup_callback s, ms3_calloc_callback c)
{
return 1;
}
void ms3_library_deinit(void)
{
}
ms3_st *ms3_init(const char *s3key, const char *s3secret,
const char *region,
const char *base_domain)
{
return 0;
}
uint8_t ms3_set_option(ms3_st *ms3, ms3_set_option_t option, void *value)
{
return 0;
}
void ms3_deinit(ms3_st *ms3)
{}
const char *ms3_server_error(ms3_st *ms3)
{
return 0;
}
const char *ms3_error(uint8_t errcode)
{
return 0;
}
uint8_t ms3_list(ms3_st *ms3, const char *bucket, const char *prefix,
ms3_list_st **list)
{
return 0;
}
uint8_t ms3_list_dir(ms3_st *ms3, const char *bucket, const char *prefix,
ms3_list_st **list)
{
return 0;
}
void ms3_list_free(ms3_list_st *list)
{}
uint8_t ms3_put(ms3_st *ms3, const char *bucket, const char *key,
const uint8_t *data, size_t length)
{
return 1;
}
uint8_t ms3_get(ms3_st *ms3, const char *bucket, const char *key,
uint8_t **data, size_t *length)
{
return 1;
}
void ms3_free(uint8_t *data)
{}
uint8_t ms3_delete(ms3_st *ms3, const char *bucket, const char *key)
{
return 1;
}
uint8_t ms3_status(ms3_st *ms3, const char *bucket, const char *key,
ms3_status_st *status)
{
return 1;
}
uint8_t ms3_move(ms3_st *ms3, const char *source_bucket, const char *source_key,
const char *dest_bucket, const char *dest_key)
{
return 1;
}
void ms3_debug()
{
}
C_MODE_END
#endif /* WITH_S3_STORAGE_ENGINE */
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment