Commit 649b3b46 authored by unknown's avatar unknown

WL#3071 - Maria checkpoint:

a function to store information about transactions into buffers,
is added to the transaction manager, and called by the Checkpoint module.


storage/maria/ma_checkpoint.c:
  "collecting info about transactions" moves to trnman.c
storage/maria/trnman.c:
  a function to store information about the active transactions list
  and committed transactions list, into buffers, for use by the
  Checkpoint module.
  This function needs to know how many trns there are in the committed
  list, so we introduce a counter, trnman_committed_transactions.
  m_string.h is needed for LEX_STRING.
storage/maria/trnman.h:
  A function to store information about the active transactions list
  and committed transactions list, into buffers, for use by the
  Checkpoint module.
storage/maria/unittest/trnman-t.c:
  trnman.h needs LEX_STRING so m_string.h
parent 714f3b73
...@@ -176,7 +176,8 @@ my_bool execute_checkpoint_indirect() ...@@ -176,7 +176,8 @@ my_bool execute_checkpoint_indirect()
/* checkpoint record data: */ /* checkpoint record data: */
LSN checkpoint_start_lsn; LSN checkpoint_start_lsn;
char checkpoint_start_lsn_char[8]; char checkpoint_start_lsn_char[8];
LEX_STRING strings[5]={ {&checkpoint_start_lsn_str, 8}, {0,0}, {0,0}, {0,0}, {0,0} }; LEX_STRING strings[6]=
{checkpoint_start_lsn_char, 8}, {0,0}, {0,0}, {0,0}, {0,0}, {0,0} };
char *ptr; char *ptr;
LSN checkpoint_lsn; LSN checkpoint_lsn;
LSN candidate_max_rec_lsn_at_last_checkpoint; LSN candidate_max_rec_lsn_at_last_checkpoint;
...@@ -203,69 +204,16 @@ my_bool execute_checkpoint_indirect() ...@@ -203,69 +204,16 @@ my_bool execute_checkpoint_indirect()
goto err; goto err;
/* STEP 3: fetch information about transactions */ /* STEP 3: fetch information about transactions */
/* note: this piece will move into trnman.c */ if (trnman_collect_transactions(&strings[2], &strings[3]))
/*
Transactions are in the "active list" (protected by a mutex) and in a
lock-free hash of "committed" (insertion protected by the same mutex,
deletion lock-free).
*/
{
TRN *trn;
ulong stored_trn_size= 0;
/* First, the active transactions */
pthread_mutex_lock(LOCK_trn_list);
string2.length= 8+(7+2+8+8+8)*trnman_active_transactions;
if (NULL == (string2.str= my_malloc(string2.length)))
goto err; goto err;
ptr= string2.str;
ptr+= 8;
for (trn= active_list_min.next; trn != &active_list_max; trn= trn->next)
{
/* we would latch trn.rwlock if it existed */
if (0 == trn->short_trid) /* trn is not inited, skip */
continue;
/* state is not needed for now (only when we have prepared trx) */
/* int7store does not exist but mi_int7store does */
int7store(ptr, trn->trid);
ptr+= 7;
int2store(ptr, trn->short_trid);
ptr+= 2;
int8store(ptr, trn->undo_lsn); /* is an LSN 7 or 8 bytes really? */
ptr+= 8;
int8store(ptr, trn->undo_purge_lsn);
ptr+= 8;
int8store(ptr, read_non_atomic(&trn->first_undo_lsn));
ptr+= 8;
/* possibly unlatch el.rwlock */
stored_trn_size++;
}
pthread_mutex_unlock(LOCK_trn_list);
/*
Now the committed ones.
We need a function which scans the hash's list of elements in a
lock-free manner (a bit like lfind(), starting from bucket 0), and for
each node (committed transaction) stores the transaction's
information (trid, undo_purge_lsn, first_undo_lsn) into a buffer.
This big buffer is malloc'ed at the start, so the number of elements (or
an upper bound of it) found in the hash needs to be known in advance
(one solution is to keep LOCK_trn_list locked, ensuring that nodes are
only deleted).
*/
/*
TODO: if we see there exists no transaction (active and committed) we can
tell the lock-free structures to do some freeing (my_free()).
*/
int8store(string1.str, stored_trn_size);
string2.length= 8+(7+2+8+8+8)*stored_trn_size;
}
/* STEP 4: fetch information about table files */ /* STEP 4: fetch information about table files */
{ {
/* This global mutex is in fact THR_LOCK_maria (see ma_open()) */ /* This global mutex is in fact THR_LOCK_maria (see ma_open()) */
lock(global_share_list_mutex); lock(global_share_list_mutex);
string3.length= 8+(8+8)*share_list->count; strings[4].length= 8+(8+8)*share_list->count;
if (NULL == (string3.str= my_malloc(string3.length))) if (NULL == (strings[4].str= my_malloc(strings[4].length)))
goto err; goto err;
ptr= string3.str; ptr= string3.str;
/* possibly latch each MARIA_SHARE, one by one, like this: */ /* possibly latch each MARIA_SHARE, one by one, like this: */
...@@ -327,8 +275,8 @@ err: ...@@ -327,8 +275,8 @@ err:
end: end:
for (i= 1; i<5; i++) for (i= 1; i<6; i++)
my_free(strings[i], MYF(MY_ALLOW_ZERO_PTR)); my_free(strings[i].str, MYF(MY_ALLOW_ZERO_PTR));
/* /*
this portion cannot be done as a hook in write_log_record() for the this portion cannot be done as a hook in write_log_record() for the
......
...@@ -18,10 +18,16 @@ ...@@ -18,10 +18,16 @@
#include <my_global.h> #include <my_global.h>
#include <my_sys.h> #include <my_sys.h>
#include <lf.h> #include <lf.h>
#include <m_string.h>
#include "trnman.h" #include "trnman.h"
/* status variables */ /*
uint trnman_active_transactions, trnman_allocated_transactions; status variables:
how many trns in the active list currently,
in the committed list currently, allocated since startup.
*/
uint trnman_active_transactions, trnman_committed_transactions,
trnman_allocated_transactions;
/* list of active transactions in the trid order */ /* list of active transactions in the trid order */
static TRN active_list_min, active_list_max; static TRN active_list_min, active_list_max;
...@@ -100,6 +106,7 @@ int trnman_init() ...@@ -100,6 +106,7 @@ int trnman_init()
committed_list_min.next= &committed_list_max; committed_list_min.next= &committed_list_max;
trnman_active_transactions= 0; trnman_active_transactions= 0;
trnman_committed_transactions= 0;
trnman_allocated_transactions= 0; trnman_allocated_transactions= 0;
pool= 0; pool= 0;
...@@ -129,6 +136,7 @@ void trnman_destroy() ...@@ -129,6 +136,7 @@ void trnman_destroy()
{ {
DBUG_ASSERT(trid_to_committed_trn.count == 0); DBUG_ASSERT(trid_to_committed_trn.count == 0);
DBUG_ASSERT(trnman_active_transactions == 0); DBUG_ASSERT(trnman_active_transactions == 0);
DBUG_ASSERT(trnman_committed_transactions == 0);
DBUG_ASSERT(active_list_max.prev == &active_list_min); DBUG_ASSERT(active_list_max.prev == &active_list_min);
DBUG_ASSERT(active_list_min.next == &active_list_max); DBUG_ASSERT(active_list_min.next == &active_list_max);
DBUG_ASSERT(committed_list_max.prev == &committed_list_min); DBUG_ASSERT(committed_list_max.prev == &committed_list_min);
...@@ -285,11 +293,14 @@ void trnman_end_trn(TRN *trn, my_bool commit) ...@@ -285,11 +293,14 @@ void trnman_end_trn(TRN *trn, my_bool commit)
*/ */
if (trn->prev == &active_list_min) if (trn->prev == &active_list_min)
{ {
uint free_me_count;
TRN *t; TRN *t;
for (t= committed_list_min.next; for (t= committed_list_min.next, free_me_count= 0;
t->commit_trid < active_list_min.next->min_read_from; t->commit_trid < active_list_min.next->min_read_from;
t= t->next) /* no-op */; t= t->next, free_me_count++) /* no-op */;
DBUG_ASSERT((t != committed_list_min.next && free_me_count > 0) ||
(t == committed_list_min.next && free_me_count == 0));
/* found transactions committed before the oldest active one */ /* found transactions committed before the oldest active one */
if (t != committed_list_min.next) if (t != committed_list_min.next)
{ {
...@@ -297,6 +308,7 @@ void trnman_end_trn(TRN *trn, my_bool commit) ...@@ -297,6 +308,7 @@ void trnman_end_trn(TRN *trn, my_bool commit)
committed_list_min.next= t; committed_list_min.next= t;
t->prev->next= 0; t->prev->next= 0;
t->prev= &committed_list_min; t->prev= &committed_list_min;
trnman_committed_transactions-= free_me_count;
} }
} }
...@@ -312,6 +324,7 @@ void trnman_end_trn(TRN *trn, my_bool commit) ...@@ -312,6 +324,7 @@ void trnman_end_trn(TRN *trn, my_bool commit)
trn->next= &committed_list_max; trn->next= &committed_list_max;
trn->prev= committed_list_max.prev; trn->prev= committed_list_max.prev;
committed_list_max.prev= trn->prev->next= trn; committed_list_max.prev= trn->prev->next= trn;
trnman_committed_transactions++;
res= lf_hash_insert(&trid_to_committed_trn, pins, &trn); res= lf_hash_insert(&trid_to_committed_trn, pins, &trn);
DBUG_ASSERT(res == 0); DBUG_ASSERT(res == 0);
...@@ -413,3 +426,94 @@ my_bool trnman_can_read_from(TRN *trn, TrID trid) ...@@ -413,3 +426,94 @@ my_bool trnman_can_read_from(TRN *trn, TrID trid)
return can; return can;
} }
/*
Allocates two buffers and stores in them some information about transactions
of the active list (into the first buffer) and of the committed list (into
the second buffer).
SYNOPSIS
trnman_collect_transactions()
str_act (OUT) pointer to a LEX_STRING where the allocated buffer, and
its size, will be put
str_com (OUT) pointer to a LEX_STRING where the allocated buffer, and
its size, will be put
DESCRIPTION
Does the allocation because the caller cannot know the size itself.
Memory freeing is to be done by the caller (if the "str" member of the
LEX_STRING is not NULL).
The caller has the intention of doing checkpoints.
RETURN
0 on success
1 on error
*/
my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com)
{
my_bool error;
TRN *trn;
char *ptr;
DBUG_ENTER("trnman_collect_transactions");
DBUG_ASSERT((NULL == str_act->str) && (NULL == str_com->str));
pthread_mutex_lock(&LOCK_trn_list);
str_act->length= 8+(6+2+7+7+7)*trnman_active_transactions;
str_com->length= 8+(6+7+7)*trnman_committed_transactions;
if ((NULL == (str_act->str= my_malloc(str_act->length, MYF(MY_WME)))) ||
(NULL == (str_com->str= my_malloc(str_com->length, MYF(MY_WME)))))
goto err;
/* First, the active transactions */
ptr= str_act->str;
int8store(ptr, (ulonglong)trnman_active_transactions);
ptr+= 8;
for (trn= active_list_min.next; trn != &active_list_max; trn= trn->next)
{
/*
trns with a short trid of 0 are not initialized; Recovery will recognize
this and ignore them.
State is not needed for now (only when we supported prepared trns).
For LSNs, Sanja will soon push lsn7store.
*/
int6store(ptr, trn->trid);
ptr+= 6;
int2store(ptr, trn->short_id);
ptr+= 2;
/* needed for rollback */
/* lsn7store(ptr, trn->undo_lsn); */
ptr+= 7;
/* needed for purge */
/* lsn7store(ptr, trn->undo_purge_lsn); */
ptr+= 7;
/* needed for low-water mark calculation */
/* lsn7store(ptr, read_non_atomic(&trn->first_undo_lsn)); */
ptr+= 7;
}
/* do the same for committed ones */
ptr= str_com->str;
int8store(ptr, (ulonglong)trnman_committed_transactions);
ptr+= 8;
for (trn= committed_list_min.next; trn != &committed_list_max;
trn= trn->next)
{
int6store(ptr, trn->trid);
ptr+= 6;
/* mi_int7store(ptr, trn->undo_purge_lsn); */
ptr+= 7;
/* mi_int7store(ptr, read_non_atomic(&trn->first_undo_lsn)); */
ptr+= 7;
}
/*
TODO: if we see there exists no transaction (active and committed) we can
tell the lock-free structures to do some freeing (my_free()).
*/
error= 0;
goto end;
err:
error= 1;
end:
pthread_mutex_unlock(&LOCK_trn_list);
DBUG_RETURN(error);
}
...@@ -51,6 +51,7 @@ void trnman_end_trn(TRN *trn, my_bool commit); ...@@ -51,6 +51,7 @@ void trnman_end_trn(TRN *trn, my_bool commit);
#define trnman_abort_trn(T) trnman_end_trn(T, FALSE) #define trnman_abort_trn(T) trnman_end_trn(T, FALSE)
void trnman_free_trn(TRN *trn); void trnman_free_trn(TRN *trn);
my_bool trnman_can_read_from(TRN *trn, TrID trid); my_bool trnman_can_read_from(TRN *trn, TrID trid);
my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com);
#endif #endif
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include <my_sys.h> #include <my_sys.h>
#include <my_atomic.h> #include <my_atomic.h>
#include <lf.h> #include <lf.h>
#include <m_string.h>
#include "../trnman.h" #include "../trnman.h"
pthread_mutex_t rt_mutex; pthread_mutex_t rt_mutex;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment