ndb dd -

  Fix bug in tsman undo
parent a4d41932
...@@ -1479,12 +1479,16 @@ Dbtup::disk_restart_undo_page_bits(Signal* signal, Apply_undo* undo) ...@@ -1479,12 +1479,16 @@ Dbtup::disk_restart_undo_page_bits(Signal* signal, Apply_undo* undo)
Uint32 new_bits = alloc.calc_page_free_bits(free); Uint32 new_bits = alloc.calc_page_free_bits(free);
pageP->list_index = 0x8000 | new_bits; pageP->list_index = 0x8000 | new_bits;
Uint64 lsn = 0;
lsn += pageP->m_page_header.m_page_lsn_hi; lsn <<= 32;
lsn += pageP->m_page_header.m_page_lsn_lo;
Tablespace_client tsman(signal, c_tsman, Tablespace_client tsman(signal, c_tsman,
fragPtrP->fragTableId, fragPtrP->fragTableId,
fragPtrP->fragmentId, fragPtrP->fragmentId,
fragPtrP->m_tablespace_id); fragPtrP->m_tablespace_id);
tsman.restart_undo_page_free_bits(&undo->m_key, new_bits, undo->m_lsn); tsman.restart_undo_page_free_bits(&undo->m_key, new_bits, undo->m_lsn, lsn);
} }
int int
......
...@@ -20,9 +20,31 @@ ...@@ -20,9 +20,31 @@
/** /**
* Record types * Record types
*/ */
#define PGMAN_PAGE_REQUEST 1
#define LGMAN_LOG_BUFFER_WAITER 2
#define LGMAN_LOG_SYNC_WAITER 3
#define DBTUP_PAGE_REQUEST 4
#define DBTUP_EXTENT_INFO 5
/** /**
* Resource groups * Resource groups
*/ */
/**
* Operations for dd
* PGMAN_PAGE_REQUEST
* LGMAN_LOG_BUFFER_WAITER
* LGMAN_LOG_SYNC_WAITER
* DBTUP_PAGE_REQUEST
*/
#define RG_DISK_OPERATIONS 1
/**
* Records for dd
* DBTUP_EXTENT_INFO
*/
#define RG_DISK_RECORDS 2
#endif #endif
...@@ -1747,7 +1747,8 @@ Tsman::restart_undo_page_free_bits(Signal* signal, ...@@ -1747,7 +1747,8 @@ Tsman::restart_undo_page_free_bits(Signal* signal,
Uint32 fragId, Uint32 fragId,
Local_key *key, Local_key *key,
unsigned bits, unsigned bits,
Uint64 undo_lsn) Uint64 undo_lsn,
Uint64 page_lsn)
{ {
jamEntry(); jamEntry();
...@@ -1790,6 +1791,20 @@ Tsman::restart_undo_page_free_bits(Signal* signal, ...@@ -1790,6 +1791,20 @@ Tsman::restart_undo_page_free_bits(Signal* signal,
File_formats::Datafile::Extent_header* header = File_formats::Datafile::Extent_header* header =
page->get_header(extent_no, size); page->get_header(extent_no, size);
Uint64 lsn = 0;
lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
lsn += page->m_page_header.m_page_lsn_lo;
if (undo_lsn > lsn && undo_lsn > page_lsn)
{
if (DBG_UNDO)
ndbout << "tsman: ignore " << undo_lsn << "(" << lsn << ", "
<< page_lsn << ") "
<< *key << " "
<< " -> " << bits << endl;
return 0;
}
if (header->m_table == RNIL) if (header->m_table == RNIL)
{ {
if (DBG_UNDO) if (DBG_UNDO)
...@@ -1797,15 +1812,11 @@ Tsman::restart_undo_page_free_bits(Signal* signal, ...@@ -1797,15 +1812,11 @@ Tsman::restart_undo_page_free_bits(Signal* signal,
return 0; return 0;
} }
ndbrequire(header->m_table == tableId);
ndbrequire(header->m_fragment_id == fragId);
Uint32 page_no_in_extent = (key->m_page_no - data_off) % size; Uint32 page_no_in_extent = (key->m_page_no - data_off) % size;
Uint32 src = header->get_free_bits(page_no_in_extent); Uint32 src = header->get_free_bits(page_no_in_extent);
Uint64 lsn = 0; ndbrequire(header->m_table == tableId);
lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32; ndbrequire(header->m_fragment_id == fragId);
lsn += page->m_page_header.m_page_lsn_lo;
/** /**
* Toggle word * Toggle word
......
...@@ -200,7 +200,7 @@ private: ...@@ -200,7 +200,7 @@ private:
int get_page_free_bits(Signal*, Local_key*, unsigned*, unsigned*); int get_page_free_bits(Signal*, Local_key*, unsigned*, unsigned*);
int unmap_page(Signal*, Local_key*, unsigned uncommitted_bits); int unmap_page(Signal*, Local_key*, unsigned uncommitted_bits);
int restart_undo_page_free_bits(Signal*, Uint32, Uint32, Local_key*, int restart_undo_page_free_bits(Signal*, Uint32, Uint32, Local_key*,
unsigned committed_bits, Uint64 lsn); unsigned committed_bits, Uint64, Uint64);
int alloc_extent(Signal* signal, Uint32 tablespace, Local_key* key); int alloc_extent(Signal* signal, Uint32 tablespace, Local_key* key);
int alloc_page_from_extent(Signal*, Uint32, Local_key*, Uint32 bits); int alloc_page_from_extent(Signal*, Uint32, Local_key*, Uint32 bits);
...@@ -283,7 +283,8 @@ public: ...@@ -283,7 +283,8 @@ public:
/** /**
* Undo handling of page bits * Undo handling of page bits
*/ */
int restart_undo_page_free_bits(Local_key*, unsigned bits, Uint64 lsn); int restart_undo_page_free_bits(Local_key*, unsigned bits,
Uint64 lsn, Uint64 page_lsn);
/** /**
* Get tablespace info * Get tablespace info
...@@ -382,14 +383,16 @@ inline ...@@ -382,14 +383,16 @@ inline
int int
Tablespace_client::restart_undo_page_free_bits(Local_key* key, Tablespace_client::restart_undo_page_free_bits(Local_key* key,
unsigned committed_bits, unsigned committed_bits,
Uint64 lsn) Uint64 lsn,
Uint64 page_lsn)
{ {
return m_tsman->restart_undo_page_free_bits(m_signal, return m_tsman->restart_undo_page_free_bits(m_signal,
m_table_id, m_table_id,
m_fragment_id, m_fragment_id,
key, key,
committed_bits, committed_bits,
lsn); lsn,
page_lsn);
} }
......
...@@ -19,9 +19,7 @@ libkernel_a_SOURCES = \ ...@@ -19,9 +19,7 @@ libkernel_a_SOURCES = \
SectionReader.cpp \ SectionReader.cpp \
Mutex.cpp SafeCounter.cpp \ Mutex.cpp SafeCounter.cpp \
Rope.cpp \ Rope.cpp \
SuperPool.cpp \ ndbd_malloc.cpp ndbd_malloc_impl.cpp
ndbd_malloc.cpp ndbd_malloc_impl.cpp \
NdbdSuperPool.cpp
INCLUDES_LOC = -I$(top_srcdir)/storage/ndb/src/mgmapi INCLUDES_LOC = -I$(top_srcdir)/storage/ndb/src/mgmapi
......
...@@ -21,9 +21,10 @@ ...@@ -21,9 +21,10 @@
struct Record_info struct Record_info
{ {
Uint32 m_size; Uint16 m_size;
Uint32 m_offset_next_pool; Uint16 m_type_id;
Uint32 m_type_id; Uint16 m_offset_next_pool;
Uint16 m_offset_magic;
}; };
struct Resource_limit struct Resource_limit
...@@ -38,6 +39,98 @@ struct Pool_context ...@@ -38,6 +39,98 @@ struct Pool_context
{ {
class SimulatedBlock* m_block; class SimulatedBlock* m_block;
struct Resource_limit* m_resource_limit; struct Resource_limit* m_resource_limit;
/**
* Alloc consekutive pages
*
* @param i : out : i value of first page
* @return : pointer to first page (NULL if failed)
*
* Will handle resource limit
*/
void* alloc_page(Uint32 *i);
/**
* Release pages
*
* @param i : in : i value of first page
* @param p : in : pointer to first page
*/
void release_page(Uint32 i, void* p);
/**
* Alloc consekutive pages
*
* @param cnt : in/out : no of requested pages,
* return no of allocated (undefined return NULL)
* out will never be > in
* @param i : out : i value of first page
* @param min : in : will never allocate less than min
* @return : pointer to first page (NULL if failed)
*
* Will handle resource limit
*/
void* alloc_pages(Uint32 *i, Uint32 *cnt, Uint32 min = 1);
/**
* Release pages
*
* @param i : in : i value of first page
* @param p : in : pointer to first page
* @param cnt : in : no of pages to release
*/
void release_pages(Uint32 i, void* p, Uint32 cnt);
/**
* Pool abort
* Only know issue is getPtr with invalid i-value.
* If other emerges, we will add argument to this method
*/
struct AbortArg
{
Uint32 m_expected_magic;
Uint32 m_found_magic;
Uint32 i;
void * p;
};
void handle_abort(const AbortArg &);
};
template <typename T>
struct Ptr
{
T * p;
Uint32 i;
inline bool isNull() const { return i == RNIL; }
inline void setNull() { i = RNIL; }
};
template <typename T>
struct ConstPtr
{
const T * p;
Uint32 i;
inline bool isNull() const { return i == RNIL; }
inline void setNull() { i = RNIL; }
}; };
#ifdef XX_DOCUMENTATION_XX
/**
* Any pool should implement the following
*/
struct Pool
{
public:
Pool();
void init(const Record_info& ri, const Pool_context& pc);
bool seize(Uint32*, void**);
void* seize(Uint32*);
void release(Uint32 i, void* p);
void * getPtr(Uint32 i);
};
#endif
#endif #endif
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef RECORD_POOL_HPP
#define RECORD_POOL_HPP
#include <kernel_types.h>
#include "Pool.hpp"
template <typename T, typename P>
class RecordPool {
public:
RecordPool();
~RecordPool();
void init(Uint32 type_id, const Pool_context& pc);
/**
* Update p value for ptr according to i value
*/
void getPtr(Ptr<T> &);
void getPtr(ConstPtr<T> &) const;
/**
* Get pointer for i value
*/
T * getPtr(Uint32 i);
const T * getConstPtr(Uint32 i) const;
/**
* Update p & i value for ptr according to <b>i</b> value
*/
void getPtr(Ptr<T> &, Uint32 i);
void getPtr(ConstPtr<T> &, Uint32 i) const;
/**
* Allocate an object from pool - update Ptr
*
* Return i
*/
bool seize(Ptr<T> &);
/**
* Return an object to pool
*/
void release(Uint32 i);
/**
* Return an object to pool
*/
void release(Ptr<T> &);
private:
P m_pool;
};
template <typename T, typename P>
inline
RecordPool<T, P>::RecordPool()
{
}
template <typename T, typename P>
inline
void
RecordPool<T, P>::init(Uint32 type_id, const Pool_context& pc)
{
Record_info ri;
ri.m_size = sizeof(T);
ri.m_offset_next_pool = offsetof(T, nextPool);
ri.m_type_id = type_id;
m_pool.init(ri, pc);
}
template <typename T, typename P>
inline
RecordPool<T, P>::~RecordPool()
{
}
template <typename T, typename P>
inline
void
RecordPool<T, P>::getPtr(Ptr<T> & ptr)
{
ptr.p = static_cast<T*>(m_pool.getPtr(ptr.i));
}
template <typename T, typename P>
inline
void
RecordPool<T, P>::getPtr(ConstPtr<T> & ptr) const
{
ptr.p = static_cast<const T*>(m_pool.getPtr(ptr.i));
}
template <typename T, typename P>
inline
void
RecordPool<T, P>::getPtr(Ptr<T> & ptr, Uint32 i)
{
ptr.i = i;
ptr.p = static_cast<T*>(m_pool.getPtr(ptr.i));
}
template <typename T, typename P>
inline
void
RecordPool<T, P>::getPtr(ConstPtr<T> & ptr, Uint32 i) const
{
ptr.i = i;
ptr.p = static_cast<const T*>(m_pool.getPtr(ptr.i));
}
template <typename T, typename P>
inline
T *
RecordPool<T, P>::getPtr(Uint32 i)
{
return static_cast<T*>(m_pool.getPtr(i));
}
template <typename T, typename P>
inline
const T *
RecordPool<T, P>::getConstPtr(Uint32 i) const
{
return static_cast<const T*>(m_pool.getPtr(i));
}
template <typename T, typename P>
inline
bool
RecordPool<T, P>::seize(Ptr<T> & ptr)
{
return m_pool.seize(&ptr.i, (char**)&ptr.p);
}
template <typename T, typename P>
inline
void
RecordPool<T, P>::release(Uint32 i)
{
m_pool.release(i, m_pool.getPtr(i));
}
template <typename T, typename P>
inline
void
RecordPool<T, P>::release(Ptr<T> & ptr)
{
m_pool.release(ptr.i, (char*)ptr.p);
}
#endif
...@@ -239,20 +239,4 @@ ...@@ -239,20 +239,4 @@
#define MEMCOPY_NO_WORDS(to, from, no_of_words) \ #define MEMCOPY_NO_WORDS(to, from, no_of_words) \
memcpy((to), (void*)(from), (size_t)(no_of_words << 2)); memcpy((to), (void*)(from), (size_t)(no_of_words << 2));
template <class T>
struct Ptr {
T * p;
Uint32 i;
inline bool isNull() const { return i == RNIL; }
inline void setNull() { i = RNIL; }
};
template <class T>
struct ConstPtr {
const T * p;
Uint32 i;
inline bool isNull() const { return i == RNIL; }
inline void setNull() { i = RNIL; }
};
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment