Commit d9f910bf authored by Alexey Yurchenko's avatar Alexey Yurchenko Committed by Julius Goryavsky

MDEV-31809 Make SST script interface read-write

Add two-way communication between parent and child in wsp::proc class.
Refactor wsp::thd class to call my_thread_init() conditionally.
Signed-off-by: default avatarJulius Goryavsky <julius.goryavsky@mariadb.com>
parent 203d337a
......@@ -107,8 +107,7 @@ static void* wsrep_sst_donor_monitor_thread(void *arg __attribute__((unused)))
WSREP_INFO("Donor monitor thread started to monitor");
wsp::thd thd(FALSE); // we turn off wsrep_on for this THD so that it can
// operate with wsrep_ready == OFF
wsp::thd thd;
while (!sst_donor_completed)
{
......@@ -144,8 +143,7 @@ static void* wsrep_sst_joiner_monitor_thread(void *arg __attribute__((unused)))
WSREP_INFO("Joiner monitor thread started to monitor");
wsp::thd thd(FALSE); // we turn off wsrep_on for this THD so that it can
// operate with wsrep_ready == OFF
wsp::thd thd;
while (!sst_joiner_completed)
{
......@@ -642,9 +640,9 @@ static void* sst_joiner_thread (void* a)
wsp::process proc (arg->cmd, "r", arg->env);
if (proc.pipe() && !proc.error())
if (proc.from() && !proc.error())
{
const char* tmp= my_fgets (out, out_len, proc.pipe());
const char* tmp= my_fgets (out, out_len, proc.from());
if (!tmp || strlen(tmp) < (magic_len + 2) ||
strncasecmp (tmp, magic, magic_len))
......@@ -698,7 +696,7 @@ static void* sst_joiner_thread (void* a)
err= EINVAL;
wait_signal:
tmp= my_fgets (out, out_len, proc.pipe());
tmp= my_fgets (out, out_len, proc.from());
if (tmp)
{
......@@ -1534,7 +1532,7 @@ static int sst_run_shell (const char* cmd_str, char** env, int max_tries)
{
wsp::process proc (cmd_str, "r", env);
if (NULL != proc.pipe())
if (NULL != proc.from())
{
proc.wait();
}
......@@ -1847,18 +1845,24 @@ static void* sst_donor_thread (void* a)
// We turn off wsrep_on for this THD so that it can
// operate with wsrep_ready == OFF
// We also set this SST thread THD as system thread
wsp::thd thd(FALSE, true);
wsp::thd thd(true, true);
wsp::process proc(arg->cmd, "r", arg->env);
err= -proc.error();
if (proc.to() && !err)
{
// Close the pipe, so that the SST process gets an EOF
proc.close_to();
}
/* Inform server about SST script startup and release TO isolation */
mysql_mutex_lock (&arg->lock);
arg->err= -err;
mysql_cond_signal (&arg->cond);
mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that.
if (proc.pipe() && !err)
if (proc.from() && !err)
{
long long total= 0;
long long complete= 0;
......@@ -1866,7 +1870,7 @@ static void* sst_donor_thread (void* a)
long long total_prev= 0;
wait_signal:
out= my_fgets (out_buf, out_len, proc.pipe());
out= my_fgets (out_buf, out_len, proc.from());
if (out)
{
......
......@@ -180,8 +180,8 @@ env::append(const char* val)
}
#define PIPE_READ 0
#define PIPE_WRITE 1
#define READ_END 0
#define WRITE_END 1
#define STDIN_FD 0
#define STDOUT_FD 1
......@@ -189,8 +189,88 @@ env::append(const char* val)
# define POSIX_SPAWN_USEVFORK 0
#endif
static int
add_file_actions(posix_spawn_file_actions_t *fact,
int close_fd, int dup_fd, int unused_fd)
{
// close child's stdout|stdin fd
int err= posix_spawn_file_actions_addclose(fact, close_fd);
if (err)
{
WSREP_ERROR ("posix_spawn_file_actions_addclose() failed: %d (%s)",
err, strerror(err));
return err;
}
// substitute our pipe descriptor in place of the closed one
err= posix_spawn_file_actions_adddup2(fact, dup_fd, close_fd);
if (err)
{
WSREP_ERROR ("posix_spawn_file_actions_addup2() failed: %d (%s)",
err, strerror(err));
return err;
}
// close unused end of the pipe
err= posix_spawn_file_actions_addclose(fact, unused_fd);
if (err)
{
WSREP_ERROR ("posix_spawn_file_actions_addclose(2) failed: %d (%s)",
err, strerror(err));
return err;
}
return 0;
}
void
process::setup_parent_pipe_end(io_direction direction,
int pipe_fds[],
int const pipe_end,
const char* const mode)
{
io_[direction] = fdopen(pipe_fds[pipe_end], mode);
if (io_[direction])
{
pipe_fds[pipe_end]= -1; // skip close on cleanup
}
else
{
err_= errno;
WSREP_ERROR("fdopen() failed on '%s' pipe: %d (%s)",
mode, err_, strerror(err_));
}
}
void
process::close_io(io_direction const direction, bool const warn)
{
if (io_[direction])
{
if (warn)
{
WSREP_WARN("Closing pipe to child process: %s, PID(%ld) "
"which might still be running.", str_, (long)pid_);
}
if (fclose(io_[direction]) == -1)
{
err_= errno;
WSREP_ERROR("fclose(%d) failed: %d (%s)",
direction, err_, strerror(err_));
}
io_[direction]= NULL;
}
}
process::process (const char* cmd, const char* type, char** env)
: str_(cmd ? strdup(cmd) : strdup("")), io_(NULL), err_(EINVAL), pid_(0)
:
str_(cmd ? strdup(cmd) : strdup("")),
io_{ NULL, NULL },
err_(EINVAL),
pid_(0)
{
if (0 == str_)
{
......@@ -205,33 +285,41 @@ process::process (const char* cmd, const char* type, char** env)
return;
}
if (NULL == type || (strcmp (type, "w") && strcmp(type, "r")))
if (NULL == type ||
(strncmp(type, "w", 1) && strncmp(type, "r", 1) && strncmp(type, "rw", 2)))
{
WSREP_ERROR ("type argument should be either \"r\" or \"w\".");
WSREP_ERROR ("type argument should be either \"r\", \"w\" or \"rw\".");
return;
}
if (NULL == env) { env= environ; } // default to global environment
int pipe_fds[2]= { -1, };
if (::pipe(pipe_fds))
{
err_= errno;
WSREP_ERROR ("pipe() failed: %d (%s)", err_, strerror(err_));
return;
}
bool const read_from_child= strchr(type, 'r');
bool const write_to_child= strchr(type, 'w');
// which end of pipe will be returned to parent
int const parent_end (strcmp(type,"w") ? PIPE_READ : PIPE_WRITE);
int const child_end (parent_end == PIPE_READ ? PIPE_WRITE : PIPE_READ);
int const close_fd (parent_end == PIPE_READ ? STDOUT_FD : STDIN_FD);
int read_pipe[2]= { -1, -1 };
int write_pipe[2]= { -1, -1 };
char* const pargv[4]= { strdup("sh"), strdup("-c"), strdup(str_), NULL };
if (!(pargv[0] && pargv[1] && pargv[2]))
{
err_= ENOMEM;
WSREP_ERROR ("Failed to allocate pargv[] array.");
goto cleanup_pipe;
goto cleanup_pargv;
}
if (read_from_child && ::pipe(read_pipe))
{
err_= errno;
WSREP_ERROR ("pipe(read_pipe) failed: %d (%s)", err_, strerror(err_));
goto cleanup_pargv;
}
if (write_to_child && ::pipe(write_pipe))
{
err_= errno;
WSREP_ERROR ("pipe(write_pipe) failed: %d (%s)", err_, strerror(err_));
goto cleanup_pipes;
}
posix_spawnattr_t attr;
......@@ -240,7 +328,7 @@ process::process (const char* cmd, const char* type, char** env)
{
WSREP_ERROR ("posix_spawnattr_init() failed: %d (%s)",
err_, strerror(err_));
goto cleanup_pipe;
goto cleanup_pipes;
}
/* make sure that no signlas are masked in child process */
......@@ -288,23 +376,19 @@ process::process (const char* cmd, const char* type, char** env)
goto cleanup_attr;
}
// close child's stdout|stdin depending on what we returning
err_= posix_spawn_file_actions_addclose (&fact, close_fd);
if (err_)
/* Add file actions for the child (fd substitution, close unused fds) */
if (read_from_child)
{
WSREP_ERROR ("posix_spawn_file_actions_addclose() failed: %d (%s)",
err_, strerror(err_));
goto cleanup_fact;
err_= add_file_actions(&fact, STDOUT_FD, read_pipe[WRITE_END],
read_pipe[READ_END]);
if (err_) goto cleanup_fact;
}
// substitute our pipe descriptor in place of the closed one
err_= posix_spawn_file_actions_adddup2 (&fact,
pipe_fds[child_end], close_fd);
if (err_)
if (write_to_child)
{
WSREP_ERROR ("posix_spawn_file_actions_addup2() failed: %d (%s)",
err_, strerror(err_));
goto cleanup_fact;
err_= add_file_actions(&fact, STDIN_FD, write_pipe[READ_END],
write_pipe[WRITE_END]);
if (err_) goto cleanup_fact;
}
err_= posix_spawnp (&pid_, pargv[0], &fact, &attr, pargv, env);
......@@ -316,16 +400,14 @@ process::process (const char* cmd, const char* type, char** env)
goto cleanup_fact;
}
io_= fdopen (pipe_fds[parent_end], type);
if (io_)
if (read_from_child)
{
pipe_fds[parent_end]= -1; // skip close on cleanup
setup_parent_pipe_end(READ, read_pipe, READ_END, "r");
}
else
if (write_to_child)
{
err_= errno;
WSREP_ERROR ("fdopen() failed: %d (%s)", err_, strerror(err_));
setup_parent_pipe_end(WRITE, write_pipe, WRITE_END, "w");
}
cleanup_fact:
......@@ -345,10 +427,13 @@ process::process (const char* cmd, const char* type, char** env)
err, strerror(err));
}
cleanup_pipe:
if (pipe_fds[0] >= 0) close (pipe_fds[0]);
if (pipe_fds[1] >= 0) close (pipe_fds[1]);
cleanup_pipes:
if (read_pipe[0] >= 0) close (read_pipe[0]);
if (read_pipe[1] >= 0) close (read_pipe[1]);
if (write_pipe[0] >= 0) close (write_pipe[0]);
if (write_pipe[1] >= 0) close (write_pipe[1]);
cleanup_pargv:
free (pargv[0]);
free (pargv[1]);
free (pargv[2]);
......@@ -356,20 +441,8 @@ process::process (const char* cmd, const char* type, char** env)
process::~process ()
{
if (io_)
{
assert (pid_);
assert (str_);
WSREP_WARN("Closing pipe to child process: %s, PID(%ld) "
"which might still be running.", str_, (long)pid_);
if (fclose (io_) == -1)
{
err_= errno;
WSREP_ERROR("fclose() failed: %d (%s)", err_, strerror(err_));
}
}
close_io(READ, true);
close_io(WRITE, true);
if (str_) free (const_cast<char*>(str_));
}
......@@ -408,29 +481,42 @@ process::wait ()
}
pid_= 0;
if (io_) fclose (io_);
io_= NULL;
close_io(READ, false);
close_io(WRITE, false);
}
}
else {
assert (NULL == io_);
assert (NULL == io_[READ]);
assert (NULL == io_[WRITE]);
WSREP_ERROR("Command did not run: %s", str_);
}
return err_;
}
thd::thd (my_bool won, bool system_thread) : init(), ptr(new THD(0))
thd::thd (my_bool ini, bool system_thread)
:
init(ini),
ptr(init.err_ ? nullptr : new THD(0))
{
if (ptr)
{
ptr->thread_stack= (char*) &ptr;
ptr->thread_stack= (char*)&ptr;
ptr->real_id= pthread_self();
wsrep_assign_from_threadvars(ptr);
wsrep_store_threadvars(ptr);
ptr->variables.option_bits&= ~OPTION_BIN_LOG; // disable binlog
ptr->variables.wsrep_on= won;
ptr->variables.tx_isolation= ISO_READ_COMMITTED;
ptr->variables.sql_log_bin = 0;
ptr->variables.option_bits &= ~OPTION_BIN_LOG; // disable binlog
ptr->variables.option_bits |= OPTION_LOG_OFF; // disable general log
ptr->variables.wsrep_on = false;
ptr->security_context()->skip_grants();
if (system_thread)
{
ptr->system_thread= SYSTEM_THREAD_GENERIC;
}
ptr->security_ctx->master_access= ALL_KNOWN_ACL;
lex_start(ptr);
}
......
......@@ -275,19 +275,30 @@ class process
{
private:
const char* const str_;
FILE* io_;
FILE* io_[2];
int err_;
pid_t pid_;
enum io_direction { READ, WRITE };
void setup_parent_pipe_end(io_direction direction,
int pipe_fds[],
int const pipe_end,
const char* const mode);
void close_io(io_direction direction, bool warn = false);
public:
/*! @arg type is a pointer to a null-terminated string which must contain
either the letter 'r' for reading or the letter 'w' for writing.
/*! @arg type is a pointer to a null-terminated string which must be
either "r", "w" or "rw"
@arg env optional null-terminated vector of environment variables
*/
process (const char* cmd, const char* type, char** env);
~process ();
FILE* pipe () { return io_; }
FILE* from () { return io_[READ]; }
FILE* to () { return io_[WRITE]; }
void close_to() { close_io(WRITE, false); }
int error() { return err_; }
int wait ();
const char* cmd() { return str_; }
......@@ -295,11 +306,19 @@ class process
class thd
{
class thd_init
/* Helper class to init/deinit current thread for use with THD */
class my_init
{
public:
thd_init() { my_thread_init(); }
~thd_init() { my_thread_end(); }
my_bool const init_;
int const err_;
my_init(my_bool const init) :
init_(init),
err_(init_ ? my_thread_init() : 0)
{}
~my_init() {
if (init_ && !err_) my_thread_end();
}
}
init;
......@@ -307,9 +326,17 @@ class thd
thd& operator= (const thd&);
public:
thd(my_bool wsrep_on, bool system_thread=false);
/*
@param[in] init Should be set to true if called in a freshly forked
thread to initialize MySQL-specific thread context
and likewise deinitialize on object destruction.
Should be set to false if the thread already has
initialized the context, but original THD* is not
available.
*/
explicit thd(my_bool init=true, bool system_thread=false);
~thd();
int err() const { return init.err_; }
THD* const ptr;
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment