Commit b817afaa authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-28689, MDEV-28690: Remove ctrl_mutex

This reverts the revert 4f62dfe6
and fixes the hang that was introduced when ctrl_mutex was removed.

The test mariabackup.compress_qpress covers this code, but the
test is skipped if a stand-alone qpress executable is not available.
It is not available in many software repositories, possibly because
the code base has not been updated since 2010.

This was tested with an executable that was compile from the source
code at http://www.quicklz.com/qpress-11-source.zip (after adding
a missing #include <unistd.h> for the definition of isatty()).

Compared to the grandparent commit (before the revert), the changes
are as follows:

comp_thread_ctxt_t::done_cond: A separate condition for completed
compression, signaling that thd->to_len has been updated.

compress_write(): Replace some threads[i] with thd.
Reset thd->to_len = 0 after consuming the compressed data.

compress_worker_thread_func(): After consuming the uncompressed
data, set thd->data_avail = FALSE. After compressing, signal
thd->done_cond.
parent 4f62dfe6
/******************************************************
Copyright (c) 2011-2013 Percona LLC and/or its affiliates.
Copyright (c) 2022, MariaDB Corporation.
Compressing datasink implementation for XtraBackup.
......@@ -32,11 +33,9 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
typedef struct {
pthread_t id;
uint num;
pthread_mutex_t ctrl_mutex;
pthread_cond_t ctrl_cond;
pthread_mutex_t data_mutex;
pthread_cond_t data_cond;
my_bool started;
pthread_cond_t done_cond;
my_bool data_avail;
my_bool cancelled;
const char *from;
......@@ -208,14 +207,13 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
thd = threads + i;
pthread_mutex_lock(&thd->ctrl_mutex);
pthread_mutex_lock(&thd->data_mutex);
chunk_len = (len > COMPRESS_CHUNK_SIZE) ?
COMPRESS_CHUNK_SIZE : len;
thd->from = ptr;
thd->from_len = chunk_len;
pthread_mutex_lock(&thd->data_mutex);
thd->data_avail = TRUE;
pthread_cond_signal(&thd->data_cond);
pthread_mutex_unlock(&thd->data_mutex);
......@@ -234,32 +232,30 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
thd = threads + i;
pthread_mutex_lock(&thd->data_mutex);
while (thd->data_avail == TRUE) {
pthread_cond_wait(&thd->data_cond,
while (!thd->to_len) {
pthread_cond_wait(&thd->done_cond,
&thd->data_mutex);
}
xb_a(threads[i].to_len > 0);
bool fail = ds_write(dest_file, "NEWBNEWB", 8) ||
write_uint64_le(dest_file,
comp_file->bytes_processed);
comp_file->bytes_processed += threads[i].from_len;
comp_file->bytes_processed += thd->from_len;
if (!fail) {
fail = write_uint32_le(dest_file, threads[i].adler) ||
ds_write(dest_file, threads[i].to,
threads[i].to_len);
fail = write_uint32_le(dest_file, thd->adler) ||
ds_write(dest_file, thd->to,
thd->to_len);
}
pthread_mutex_unlock(&threads[i].data_mutex);
thd->to_len = 0;
pthread_mutex_unlock(&thd->data_mutex);
if (fail) {
msg("compress: write to the destination stream "
"failed.");
return 1;
}
pthread_mutex_unlock(&threads[i].ctrl_mutex);
}
}
......@@ -329,6 +325,24 @@ write_uint64_le(ds_file_t *file, ulonglong n)
return ds_write(file, tmp, sizeof(tmp));
}
static
void
destroy_worker_thread(comp_thread_ctxt_t *thd)
{
pthread_mutex_lock(&thd->data_mutex);
thd->cancelled = TRUE;
pthread_cond_signal(&thd->data_cond);
pthread_mutex_unlock(&thd->data_mutex);
pthread_join(thd->id, NULL);
pthread_cond_destroy(&thd->data_cond);
pthread_cond_destroy(&thd->done_cond);
pthread_mutex_destroy(&thd->data_mutex);
my_free(thd->to);
}
static
comp_thread_ctxt_t *
create_worker_threads(uint n)
......@@ -337,60 +351,36 @@ create_worker_threads(uint n)
uint i;
threads = (comp_thread_ctxt_t *)
my_malloc(sizeof(comp_thread_ctxt_t) * n, MYF(MY_FAE));
my_malloc(n * sizeof *threads, MYF(MY_ZEROFILL|MY_FAE));
for (i = 0; i < n; i++) {
comp_thread_ctxt_t *thd = threads + i;
thd->num = i + 1;
thd->started = FALSE;
thd->cancelled = FALSE;
thd->data_avail = FALSE;
thd->to = (char *) my_malloc(COMPRESS_CHUNK_SIZE +
MY_QLZ_COMPRESS_OVERHEAD,
MYF(MY_FAE));
/* Initialize the control mutex and condition var */
if (pthread_mutex_init(&thd->ctrl_mutex, NULL) ||
pthread_cond_init(&thd->ctrl_cond, NULL)) {
goto err;
}
/* Initialize and data mutex and condition var */
if (pthread_mutex_init(&thd->data_mutex, NULL) ||
pthread_cond_init(&thd->data_cond, NULL)) {
pthread_cond_init(&thd->data_cond, NULL) ||
pthread_cond_init(&thd->done_cond, NULL)) {
goto err;
}
pthread_mutex_lock(&thd->ctrl_mutex);
if (pthread_create(&thd->id, NULL, compress_worker_thread_func,
thd)) {
msg("compress: pthread_create() failed: "
"errno = %d", errno);
pthread_mutex_unlock(&thd->ctrl_mutex);
goto err;
}
}
/* Wait for the threads to start */
for (i = 0; i < n; i++) {
comp_thread_ctxt_t *thd = threads + i;
while (thd->started == FALSE)
pthread_cond_wait(&thd->ctrl_cond, &thd->ctrl_mutex);
pthread_mutex_unlock(&thd->ctrl_mutex);
}
return threads;
err:
while (i > 0) {
comp_thread_ctxt_t *thd;
i--;
thd = threads + i;
pthread_mutex_unlock(&thd->ctrl_mutex);
for (; i; i--) {
destroy_worker_thread(threads + i);
}
my_free(threads);
......@@ -404,21 +394,7 @@ destroy_worker_threads(comp_thread_ctxt_t *threads, uint n)
uint i;
for (i = 0; i < n; i++) {
comp_thread_ctxt_t *thd = threads + i;
pthread_mutex_lock(&thd->data_mutex);
threads[i].cancelled = TRUE;
pthread_cond_signal(&thd->data_cond);
pthread_mutex_unlock(&thd->data_mutex);
pthread_join(thd->id, NULL);
pthread_cond_destroy(&thd->data_cond);
pthread_mutex_destroy(&thd->data_mutex);
pthread_cond_destroy(&thd->ctrl_cond);
pthread_mutex_destroy(&thd->ctrl_mutex);
my_free(thd->to);
destroy_worker_thread(threads + i);
}
my_free(threads);
......@@ -430,26 +406,16 @@ compress_worker_thread_func(void *arg)
{
comp_thread_ctxt_t *thd = (comp_thread_ctxt_t *) arg;
pthread_mutex_lock(&thd->ctrl_mutex);
pthread_mutex_lock(&thd->data_mutex);
thd->started = TRUE;
pthread_cond_signal(&thd->ctrl_cond);
pthread_mutex_unlock(&thd->ctrl_mutex);
while (1) {
thd->data_avail = FALSE;
pthread_cond_signal(&thd->data_cond);
while (!thd->data_avail && !thd->cancelled) {
pthread_cond_wait(&thd->data_cond, &thd->data_mutex);
}
if (thd->cancelled)
break;
thd->data_avail = FALSE;
thd->to_len = qlz_compress(thd->from, thd->to, thd->from_len,
&thd->state);
......@@ -464,6 +430,7 @@ compress_worker_thread_func(void *arg)
thd->adler = adler32(0x00000001, (uchar *) thd->to,
(uInt)thd->to_len);
pthread_cond_signal(&thd->done_cond);
}
pthread_mutex_unlock(&thd->data_mutex);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment