set correct lockmode in all reads...

    move lockmode from scan operation to operation
    added read tuple with lock mode
parent 00180412
...@@ -49,6 +49,15 @@ public: ...@@ -49,6 +49,15 @@ public:
* @{ * @{
*/ */
/**
* Define the NdbIndexOperation to be a standard operation of type readTuple.
* When calling NdbConnection::execute, this operation
* reads a tuple.
*
* @return 0 if successful otherwise -1.
*/
int readTuple(LockMode);
/** /**
* Define the NdbIndexOperation to be a standard operation of type readTuple. * Define the NdbIndexOperation to be a standard operation of type readTuple.
* When calling NdbConnection::execute, this operation * When calling NdbConnection::execute, this operation
......
...@@ -51,6 +51,19 @@ public: ...@@ -51,6 +51,19 @@ public:
* @{ * @{
*/ */
/**
* Lock when performing read
*/
enum LockMode {
LM_Read = 0,
LM_Exclusive = 1,
LM_CommittedRead = 2,
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
LM_Dirty = 2
#endif
};
/** /**
* Define the NdbOperation to be a standard operation of type insertTuple. * Define the NdbOperation to be a standard operation of type insertTuple.
* When calling NdbConnection::execute, this operation * When calling NdbConnection::execute, this operation
...@@ -88,6 +101,15 @@ public: ...@@ -88,6 +101,15 @@ public:
*/ */
virtual int deleteTuple(); virtual int deleteTuple();
/**
* Define the NdbOperation to be a standard operation of type readTuple.
* When calling NdbConnection::execute, this operation
* reads a tuple.
*
* @return 0 if successful otherwise -1.
*/
virtual int readTuple(LockMode);
/** /**
* Define the NdbOperation to be a standard operation of type readTuple. * Define the NdbOperation to be a standard operation of type readTuple.
* When calling NdbConnection::execute, this operation * When calling NdbConnection::execute, this operation
......
...@@ -53,18 +53,6 @@ public: ...@@ -53,18 +53,6 @@ public:
IndexCursor = 2 IndexCursor = 2
}; };
/**
* Lock when performing scan
*/
enum LockMode {
LM_Read = 0,
LM_Exclusive = 1,
LM_CommittedRead = 2,
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
LM_Dirty = 2
#endif
};
/** /**
* Type of cursor * Type of cursor
*/ */
......
...@@ -85,6 +85,11 @@ NdbIndexOperation::indxInit(const NdbIndexImpl * anIndex, ...@@ -85,6 +85,11 @@ NdbIndexOperation::indxInit(const NdbIndexImpl * anIndex,
return 0; return 0;
} }
int NdbIndexOperation::readTuple(NdbOperation::LockMode lm)
{
return NdbOperation::readTuple(lm);
}
int NdbIndexOperation::readTuple() int NdbIndexOperation::readTuple()
{ {
// First check that index is unique // First check that index is unique
......
...@@ -103,6 +103,24 @@ NdbOperation::writeTuple() ...@@ -103,6 +103,24 @@ NdbOperation::writeTuple()
* int readTuple(); * int readTuple();
*****************************************************************************/ *****************************************************************************/
int int
NdbOperation::readTuple(NdbOperation::LockMode lm)
{
switch(lm) {
case LM_Read:
return readTuple();
break;
case LM_Exclusive:
return readTupleExclusive();
break;
case LM_CommittedRead:
return readTuple();
break;
};
}
/******************************************************************************
* int readTuple();
*****************************************************************************/
int
NdbOperation::readTuple() NdbOperation::readTuple()
{ {
NdbConnection* tNdbCon = theNdbCon; NdbConnection* tNdbCon = theNdbCon;
......
...@@ -803,14 +803,14 @@ int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type) ...@@ -803,14 +803,14 @@ int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type)
{ {
int lm; int lm;
if (type == TL_WRITE_ALLOW_WRITE) if (type == TL_WRITE_ALLOW_WRITE)
lm= NdbScanOperation::LM_Exclusive; lm= NdbOperation::LM_Exclusive;
else if (uses_blob_value(retrieve_all_fields)) else if (uses_blob_value(retrieve_all_fields))
/* /*
TODO use a new scan mode to read + lock + keyinfo TODO use a new scan mode to read + lock + keyinfo
*/ */
lm= NdbScanOperation::LM_Exclusive; lm= NdbOperation::LM_Exclusive;
else else
lm= NdbScanOperation::LM_CommittedRead; lm= NdbOperation::LM_CommittedRead;
return lm; return lm;
} }
...@@ -952,8 +952,10 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) ...@@ -952,8 +952,10 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
DBUG_PRINT("enter", ("key_len: %u", key_len)); DBUG_PRINT("enter", ("key_len: %u", key_len));
DBUG_DUMP("key", (char*)key, key_len); DBUG_DUMP("key", (char*)key, key_len);
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) || if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
op->readTuple() != 0) op->readTuple(lm) != 0)
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
if (table->primary_key == MAX_KEY) if (table->primary_key == MAX_KEY)
...@@ -1021,8 +1023,10 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data) ...@@ -1021,8 +1023,10 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
// We have allready retrieved all fields, nothing to complement // We have allready retrieved all fields, nothing to complement
DBUG_RETURN(0); DBUG_RETURN(0);
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) || if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
op->readTuple() != 0) op->readTuple(lm) != 0)
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
int res; int res;
...@@ -1073,10 +1077,12 @@ int ha_ndbcluster::unique_index_read(const byte *key, ...@@ -1073,10 +1077,12 @@ int ha_ndbcluster::unique_index_read(const byte *key,
DBUG_DUMP("key", (char*)key, key_len); DBUG_DUMP("key", (char*)key, key_len);
DBUG_PRINT("enter", ("name: %s", get_unique_index_name(active_index))); DBUG_PRINT("enter", ("name: %s", get_unique_index_name(active_index)));
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
if (!(op= trans->getNdbIndexOperation((NDBINDEX *) if (!(op= trans->getNdbIndexOperation((NDBINDEX *)
m_index[active_index].unique_index, m_index[active_index].unique_index,
(const NDBTAB *) m_table)) || (const NDBTAB *) m_table)) ||
op->readTuple() != 0) op->readTuple(lm) != 0)
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
// Set secondary index key(s) // Set secondary index key(s)
...@@ -1335,14 +1341,13 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, ...@@ -1335,14 +1341,13 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
DBUG_EXECUTE("enter", print_key(end_key, "end_key");); DBUG_EXECUTE("enter", print_key(end_key, "end_key"););
index_name= get_index_name(active_index); index_name= get_index_name(active_index);
if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *)
m_index[active_index].index,
(const NDBTAB *) m_table)))
ERR_RETURN(trans->getNdbError());
NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode) NdbOperation::LockMode lm=
get_ndb_lock_type(m_lock.type); (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
if (!(cursor= op->readTuples(lm, 0, parallelism, sorted))) if ((op= trans->getNdbIndexScanOperation((NDBINDEX *)
m_index[active_index].index,
(const NDBTAB *) m_table)) ||
!(cursor= op->readTuples(lm, 0, parallelism, sorted)))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor; m_active_cursor= cursor;
...@@ -1400,11 +1405,10 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len, ...@@ -1400,11 +1405,10 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
DBUG_PRINT("info", ("Starting a new filtered scan on %s", DBUG_PRINT("info", ("Starting a new filtered scan on %s",
m_tabname)); m_tabname));
if (!(op= trans->getNdbScanOperation((const NDBTAB *) m_table))) NdbOperation::LockMode lm=
ERR_RETURN(trans->getNdbError()); (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode) if (!(op= trans->getNdbScanOperation((const NDBTAB *) m_table)) ||
get_ndb_lock_type(m_lock.type); !(cursor= op->readTuples(lm, 0, parallelism)))
if (!(cursor= op->readTuples(lm, 0, parallelism)))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor; m_active_cursor= cursor;
...@@ -1471,11 +1475,10 @@ int ha_ndbcluster::full_table_scan(byte *buf) ...@@ -1471,11 +1475,10 @@ int ha_ndbcluster::full_table_scan(byte *buf)
DBUG_ENTER("full_table_scan"); DBUG_ENTER("full_table_scan");
DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname)); DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname));
if (!(op=trans->getNdbScanOperation((const NDBTAB *) m_table))) NdbOperation::LockMode lm=
ERR_RETURN(trans->getNdbError()); (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode) if (!(op=trans->getNdbScanOperation((const NDBTAB *) m_table)) ||
get_ndb_lock_type(m_lock.type); !(cursor= op->readTuples(lm, 0, parallelism)))
if (!(cursor= op->readTuples(lm, 0, parallelism)))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor; m_active_cursor= cursor;
DBUG_RETURN(define_read_attrs(buf, op)); DBUG_RETURN(define_read_attrs(buf, op));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment