ha_ndbcluster.cc 154 KB
Newer Older
unknown's avatar
unknown committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
/* Copyright (C) 2000-2003 MySQL AB

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; either version 2 of the License, or
  (at your option) any later version.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
*/

/*
  This file defines the NDB Cluster handler: the interface between MySQL and
  NDB Cluster
*/

#ifdef __GNUC__
#pragma implementation                          // gcc: Class implementation
#endif

#include "mysql_priv.h"

#ifdef HAVE_NDBCLUSTER_DB
#include <my_dir.h>
#include "ha_ndbcluster.h"
#include <ndbapi/NdbApi.hpp>
#include <ndbapi/NdbScanFilter.hpp>

// Default value for parallelism
36
static const int parallelism= 0;
unknown's avatar
unknown committed
37

38 39
// Default value for max number of transactions
// createable against NDB from this handler
40 41
static const int max_transactions= 256;

42
// connectstring to cluster if given by mysqld
unknown's avatar
unknown committed
43
const char *ndbcluster_connectstring= 0;
44

45 46
static const char *ha_ndb_ext=".ndb";

unknown's avatar
unknown committed
47
#define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8
48

unknown's avatar
unknown committed
49 50

#define ERR_PRINT(err) \
51
  DBUG_PRINT("error", ("%d  message: %s", err.code, err.message))
unknown's avatar
unknown committed
52 53 54

#define ERR_RETURN(err)		         \
{				         \
55 56 57
  const NdbError& tmp= err;              \
  ERR_PRINT(tmp);		         \
  DBUG_RETURN(ndb_to_mysql_error(&tmp)); \
unknown's avatar
unknown committed
58 59 60 61
}

// Typedefs for long names
typedef NdbDictionary::Column NDBCOL;
unknown's avatar
unknown committed
62
typedef NdbDictionary::Table NDBTAB;
unknown's avatar
unknown committed
63 64 65
typedef NdbDictionary::Index  NDBINDEX;
typedef NdbDictionary::Dictionary  NDBDICT;

unknown's avatar
unknown committed
66
bool ndbcluster_inited= FALSE;
unknown's avatar
unknown committed
67

68
static Ndb* g_ndb= NULL;
69
static Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
70

unknown's avatar
unknown committed
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
// Handler synchronization
pthread_mutex_t ndbcluster_mutex;

// Table lock handling
static HASH ndbcluster_open_tables;

static byte *ndbcluster_get_key(NDB_SHARE *share,uint *length,
                                my_bool not_used __attribute__((unused)));
static NDB_SHARE *get_share(const char *table_name);
static void free_share(NDB_SHARE *share);

static int packfrm(const void *data, uint len, const void **pack_data, uint *pack_len);
static int unpackfrm(const void **data, uint *len,
		     const void* pack_data);

unknown's avatar
unknown committed
86
static int ndb_get_table_statistics(Ndb*, const char *, 
87 88
				    Uint64* rows, Uint64* commits);

unknown's avatar
unknown committed
89

90 91 92 93
/*
  Dummy buffer to read zero pack_length fields
  which are mapped to 1 char
*/
unknown's avatar
unknown committed
94
static uint32 dummy_buf;
95

unknown's avatar
unknown committed
96 97 98 99 100 101 102 103
/*
  Error handling functions
*/

struct err_code_mapping
{
  int ndb_err;
  int my_err;
104
  int show_warning;
unknown's avatar
unknown committed
105 106 107 108
};

static const err_code_mapping err_map[]= 
{
109 110
  { 626, HA_ERR_KEY_NOT_FOUND, 0 },
  { 630, HA_ERR_FOUND_DUPP_KEY, 0 },
unknown's avatar
unknown committed
111
  { 893, HA_ERR_FOUND_DUPP_KEY, 0 },
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
  { 721, HA_ERR_TABLE_EXIST, 1 },
  { 4244, HA_ERR_TABLE_EXIST, 1 },

  { 709, HA_ERR_NO_SUCH_TABLE, 1 },
  { 284, HA_ERR_NO_SUCH_TABLE, 1 },

  { 266, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
  { 274, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
  { 296, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
  { 297, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
  { 237, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },

  { 623, HA_ERR_RECORD_FILE_FULL, 1 },
  { 624, HA_ERR_RECORD_FILE_FULL, 1 },
  { 625, HA_ERR_RECORD_FILE_FULL, 1 },
  { 826, HA_ERR_RECORD_FILE_FULL, 1 },
  { 827, HA_ERR_RECORD_FILE_FULL, 1 },
  { 832, HA_ERR_RECORD_FILE_FULL, 1 },

  { 0, 1, 0 },

  { -1, -1, 1 }
unknown's avatar
unknown committed
134 135 136 137 138 139
};


static int ndb_to_mysql_error(const NdbError *err)
{
  uint i;
140 141
  for (i=0; err_map[i].ndb_err != err->code && err_map[i].my_err != -1; i++);
  if (err_map[i].show_warning)
unknown's avatar
unknown committed
142
  {
143 144 145 146
    // Push the NDB error message as warning
    push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
			ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
			err->code, err->message, "NDB");
unknown's avatar
unknown committed
147
  }
148 149
  if (err_map[i].my_err == -1)
    return err->code;
unknown's avatar
unknown committed
150 151 152 153
  return err_map[i].my_err;
}


unknown's avatar
unknown committed
154 155 156 157 158

inline
int execute_no_commit(ha_ndbcluster *h, NdbConnection *trans)
{
  int m_batch_execute= 0;
unknown's avatar
unknown committed
159 160
#ifdef NOT_USED
  if (m_batch_execute)
unknown's avatar
unknown committed
161
    return 0;
unknown's avatar
unknown committed
162
#endif
163
  return trans->execute(NoCommit,AbortOnError,h->m_force_send);
unknown's avatar
unknown committed
164 165 166 167 168 169
}

inline
int execute_commit(ha_ndbcluster *h, NdbConnection *trans)
{
  int m_batch_execute= 0;
unknown's avatar
unknown committed
170 171
#ifdef NOT_USED
  if (m_batch_execute)
unknown's avatar
unknown committed
172
    return 0;
unknown's avatar
unknown committed
173
#endif
174 175 176 177 178 179 180 181 182 183 184 185
  return trans->execute(Commit,AbortOnError,h->m_force_send);
}

inline
int execute_commit(THD *thd, NdbConnection *trans)
{
  int m_batch_execute= 0;
#ifdef NOT_USED
  if (m_batch_execute)
    return 0;
#endif
  return trans->execute(Commit,AbortOnError,thd->variables.ndb_force_send);
unknown's avatar
unknown committed
186 187 188 189 190 191
}

inline
int execute_no_commit_ie(ha_ndbcluster *h, NdbConnection *trans)
{
  int m_batch_execute= 0;
unknown's avatar
unknown committed
192 193
#ifdef NOT_USED
  if (m_batch_execute)
unknown's avatar
unknown committed
194
    return 0;
unknown's avatar
unknown committed
195
#endif
unknown's avatar
merge  
unknown committed
196
  return trans->execute(NoCommit,AO_IgnoreError,h->m_force_send);
unknown's avatar
unknown committed
197 198
}

199
/*
200
  CPDH condition storage support
201
*/

Ndb_item::Ndb_item(NDB_ITEM_TYPE item_type, 
		   NDB_ITEM_QUALIFICATION item_qualification,
		   const Item *item_value) 
  : type(item_type), qualification(item_qualification)
{ 
  switch(item_type) {
  case(NDB_VALUE):
  {
    switch(item_qualification.value_type) {
    case(Item::STRING_ITEM): {
      Ndb_item_string_value *string_value = new Ndb_item_string_value();
      Item_string *string_item= (Item_string *)item_value;
      string_value->s= string_item->str_value;
      string_value->c= string_item->collation.collation;
      value.string_value= string_value;
      break;
    }
    case(Item::INT_ITEM): {
      value.int_value= ((Item_int *)item_value)->val_int();
      break;
    }
    case(Item::REAL_ITEM): {
      value.real_value= ((Item_real *)item_value)->val_real();
      break;
    }
    case(Item::NULL_ITEM):
      break;
    case(Item::VARBIN_ITEM): {
      Ndb_item_string_value *string_value = new Ndb_item_string_value();
      Item_varbinary *varbin_item= (Item_varbinary *)item_value;
      string_value->s= varbin_item->str_value;
      string_value->c= varbin_item->collation.collation;
      value.string_value= string_value;
      break;
    }
    default:
      break;
    }
  }
  break;
  case(NDB_FIELD): {
    NDB_ITEM_FIELD_VALUE *field_value= new NDB_ITEM_FIELD_VALUE();
    Item_field *field_item= (Item_field *) item_value;
    field_value->field= field_item->field;
    field_value->column_no= -1; // Will be fetched at scan filter generation
    value.field_value= field_value;
    break;
  }
  case(NDB_FUNCTION):
    break;
  }
}
 
Ndb_item::Ndb_item(longlong int_value) : type(NDB_VALUE)
{
  qualification.value_type= Item::INT_ITEM;
  value.int_value= int_value; 
}

Ndb_item::Ndb_item(double real_value) : type(NDB_VALUE)
{
  qualification.value_type= Item::REAL_ITEM;
  value.real_value= real_value; 
}

Ndb_item::Ndb_item(): type(NDB_VALUE)
{
  qualification.value_type= Item::NULL_ITEM;
}

Ndb_item::Ndb_item(Field *field, int column_no) : type(NDB_FIELD)
{
    NDB_ITEM_FIELD_VALUE *field_value= new NDB_ITEM_FIELD_VALUE();
    qualification.field_type= field->type();
    field_value->field= field;
    field_value->column_no= column_no;
    value.field_value= field_value;
}

Ndb_item::Ndb_item(Item_func::Functype func_type) : type(NDB_FUNCTION)
{
  qualification.function_type= func_type;
}

Ndb_item::~Ndb_item() 
{ 
  if (type == NDB_VALUE && 
      (qualification.value_type == Item::STRING_ITEM ||
       qualification.value_type == Item::VARBIN_ITEM))
  {
    delete value.string_value;
    value.string_value= NULL;
  }
  else if (type == NDB_FIELD)
  {
    delete value.field_value;
    value.field_value= NULL;
  }
}

void Ndb_item::print(String* str)
{
  switch(type) {
  case(NDB_VALUE):
    str->append("[#NDB_VALUE ");
    switch(qualification.value_type) {
    case (Item::INT_ITEM): {
      String tmp;
      tmp.set(value.int_value, &my_charset_bin);
      str->append(tmp);
      break;
    }
    case (Item::REAL_ITEM): {
      String tmp;
      tmp.set(value.real_value, 4 , &my_charset_bin);
      str->append(tmp);
      break;
    }
    case (Item::STRING_ITEM): {
      str->append(value.string_value->s.ptr());
      break;
    }
    case (Item::VARBIN_ITEM): {
      str->append(value.string_value->s.ptr());
      break;
    }
    case (Item::NULL_ITEM):
      str->append("NULL");
      break;
    default:
      str->append("ILLEGAL VALUE");
    }
    str->append("]");
    break;
  case(NDB_FIELD):  
    str->append("[#NDB_FIELD ");
    str->append(value.field_value->field->field_name);
    str->append("]");
    break;
  case(NDB_FUNCTION):
    str->append("[#NDB_FUNCTION ");
    switch(qualification.function_type) {
    case(Item_func::UNKNOWN_FUNC): {
      str->append("UNKNOWN]");
      break;
    }
    case(Item_func::EQ_FUNC): {
      str->append("=]");
      break;
    }
    case(Item_func::NE_FUNC): {
      str->append("!=]");
      break;
    }
    case(Item_func::LT_FUNC): {
      str->append("<]");
      break;
    }
    case(Item_func::LE_FUNC): {
      str->append("<=]");
      break;
    }
    case(Item_func::GE_FUNC): {
      str->append(">=]");
      break;
    }
    case(Item_func::GT_FUNC): {
      str->append(">]");
      break;
    }
    case(Item_func::LIKE_FUNC): {
      str->append("like]");
      break;
    }
    case(Item_func::NOTLIKE_FUNC): {
      str->append("notlike]");
      break;
    }
    case(Item_func::ISNULL_FUNC): {
      str->append("isnull]");
      break;
    }
    case(Item_func::ISNOTNULL_FUNC): {
      str->append("isnotnull]");
      break;
    }
    case(Item_func::COND_AND_FUNC): {
      str->append("and]");
      break;
    }
    case(Item_func::COND_OR_FUNC): {
      str->append("or]");
      break;
    }
    default:
      str->append("UNSUPPORTED]");
    }
  }
}
401

402 403 404
/*
  Place holder for ha_ndbcluster thread specific data
*/
405 406
Thd_ndb::Thd_ndb()
{
407
  ndb= new Ndb(g_ndb_cluster_connection, "");
408 409
  lock_count= 0;
  count= 0;
410
  error= 0;
411 412 413 414
}

Thd_ndb::~Thd_ndb()
{
415 416
  if (ndb)
    delete ndb;
417 418 419 420 421 422 423 424
}

/*
 * manage uncommitted insert/deletes during transactio to get records correct
 */

struct Ndb_table_local_info {
  int no_uncommitted_rows_count;
425
  ulong last_count;
426 427 428
  ha_rows records;
};

unknown's avatar
unknown committed
429 430 431 432 433 434 435 436 437 438
void ha_ndbcluster::set_rec_per_key()
{
  DBUG_ENTER("ha_ndbcluster::get_status_const");
  for (uint i=0 ; i < table->keys ; i++)
  {
    table->key_info[i].rec_per_key[table->key_info[i].key_parts-1]= 1;
  }
  DBUG_VOID_RETURN;
}

439 440
void ha_ndbcluster::records_update()
{
441 442
  if (m_ha_not_exact_count)
    return;
443 444 445 446 447
  DBUG_ENTER("ha_ndbcluster::records_update");
  struct Ndb_table_local_info *info= (struct Ndb_table_local_info *)m_table_info;
  DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
		      ((const NDBTAB *)m_table)->getTableId(),
		      info->no_uncommitted_rows_count));
448
  //  if (info->records == ~(ha_rows)0)
449 450 451 452 453 454
  {
    Uint64 rows;
    if(ndb_get_table_statistics(m_ndb, m_tabname, &rows, 0) == 0){
      info->records= rows;
    }
  }
455 456 457 458 459
  {
    THD *thd= current_thd;
    if (((Thd_ndb*)(thd->transaction.thd_ndb))->error)
      info->no_uncommitted_rows_count= 0;
  }
460 461 462 463
  records= info->records+ info->no_uncommitted_rows_count;
  DBUG_VOID_RETURN;
}

464 465
void ha_ndbcluster::no_uncommitted_rows_execute_failure()
{
466 467
  if (m_ha_not_exact_count)
    return;
468
  DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_execute_failure");
469 470
  THD *thd= current_thd;
  ((Thd_ndb*)(thd->transaction.thd_ndb))->error= 1;
471 472 473
  DBUG_VOID_RETURN;
}

474 475
void ha_ndbcluster::no_uncommitted_rows_init(THD *thd)
{
476 477
  if (m_ha_not_exact_count)
    return;
478 479 480
  DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_init");
  struct Ndb_table_local_info *info= (struct Ndb_table_local_info *)m_table_info;
  Thd_ndb *thd_ndb= (Thd_ndb *)thd->transaction.thd_ndb;
481
  if (info->last_count != thd_ndb->count)
482
  {
483
    info->last_count= thd_ndb->count;
484 485 486 487 488 489 490 491 492 493 494
    info->no_uncommitted_rows_count= 0;
    info->records= ~(ha_rows)0;
    DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
			((const NDBTAB *)m_table)->getTableId(),
			info->no_uncommitted_rows_count));
  }
  DBUG_VOID_RETURN;
}

void ha_ndbcluster::no_uncommitted_rows_update(int c)
{
495 496
  if (m_ha_not_exact_count)
    return;
497
  DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_update");
unknown's avatar
unknown committed
498 499
  struct Ndb_table_local_info *info=
    (struct Ndb_table_local_info *)m_table_info;
500 501 502 503 504 505 506 507 508
  info->no_uncommitted_rows_count+= c;
  DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
		      ((const NDBTAB *)m_table)->getTableId(),
		      info->no_uncommitted_rows_count));
  DBUG_VOID_RETURN;
}

void ha_ndbcluster::no_uncommitted_rows_reset(THD *thd)
{
509 510
  if (m_ha_not_exact_count)
    return;
511 512
  DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_reset");
  ((Thd_ndb*)(thd->transaction.thd_ndb))->count++;
513
  ((Thd_ndb*)(thd->transaction.thd_ndb))->error= 0;
514 515 516
  DBUG_VOID_RETURN;
}

unknown's avatar
unknown committed
517 518 519 520 521 522 523 524
/*
  Take care of the error that occured in NDB
  
  RETURN
    0	No error
    #   The mapped error code
*/

525

unknown's avatar
unknown committed
526 527
int ha_ndbcluster::ndb_err(NdbConnection *trans)
{
528
  int res;
unknown's avatar
unknown committed
529 530 531 532 533 534 535 536 537 538
  const NdbError err= trans->getNdbError();
  DBUG_ENTER("ndb_err");
  
  ERR_PRINT(err);
  switch (err.classification) {
  case NdbError::SchemaError:
  {
    NDBDICT *dict= m_ndb->getDictionary();
    DBUG_PRINT("info", ("invalidateTable %s", m_tabname));
    dict->invalidateTable(m_tabname);
539
    table->version=0L;			/* Free when thread is ready */
unknown's avatar
unknown committed
540 541 542 543 544
    break;
  }
  default:
    break;
  }
545 546 547 548
  res= ndb_to_mysql_error(&err);
  DBUG_PRINT("info", ("transformed ndbcluster error %d to mysql error %d", 
		      err.code, res));
  if (res == HA_ERR_FOUND_DUPP_KEY)
549
    m_dupkey= table->primary_key;
550 551
  
  DBUG_RETURN(res);
unknown's avatar
unknown committed
552 553 554
}


555
/*
556
  Override the default get_error_message in order to add the 
557 558 559
  error message of NDB 
 */

560 561
bool ha_ndbcluster::get_error_message(int error, 
				      String *buf)
562
{
563
  DBUG_ENTER("ha_ndbcluster::get_error_message");
564
  DBUG_PRINT("enter", ("error: %d", error));
565

unknown's avatar
unknown committed
566
  Ndb *ndb= ((Thd_ndb*)current_thd->transaction.thd_ndb)->ndb;
567
  if (!ndb)
unknown's avatar
unknown committed
568
    DBUG_RETURN(FALSE);
569

570
  const NdbError err= ndb->getNdbError(error);
571 572 573 574
  bool temporary= err.status==NdbError::TemporaryError;
  buf->set(err.message, strlen(err.message), &my_charset_bin);
  DBUG_PRINT("exit", ("message: %s, temporary: %d", buf->ptr(), temporary));
  DBUG_RETURN(temporary);
575 576 577
}


unknown's avatar
unknown committed
578 579
/*
  Check if type is supported by NDB.
580 581
  TODO Use this once in open(), not in every operation

unknown's avatar
unknown committed
582 583 584 585 586
*/

static inline bool ndb_supported_type(enum_field_types type)
{
  switch (type) {
unknown's avatar
unknown committed
587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608
  case MYSQL_TYPE_DECIMAL:    
  case MYSQL_TYPE_TINY:        
  case MYSQL_TYPE_SHORT:
  case MYSQL_TYPE_LONG:
  case MYSQL_TYPE_INT24:       
  case MYSQL_TYPE_LONGLONG:
  case MYSQL_TYPE_FLOAT:
  case MYSQL_TYPE_DOUBLE:
  case MYSQL_TYPE_TIMESTAMP:
  case MYSQL_TYPE_DATETIME:    
  case MYSQL_TYPE_DATE:
  case MYSQL_TYPE_NEWDATE:
  case MYSQL_TYPE_TIME:        
  case MYSQL_TYPE_YEAR:        
  case MYSQL_TYPE_STRING:      
  case MYSQL_TYPE_VAR_STRING:
  case MYSQL_TYPE_TINY_BLOB:
  case MYSQL_TYPE_BLOB:    
  case MYSQL_TYPE_MEDIUM_BLOB:   
  case MYSQL_TYPE_LONG_BLOB:  
  case MYSQL_TYPE_ENUM:
  case MYSQL_TYPE_SET:         
unknown's avatar
unknown committed
609
    return TRUE;
unknown's avatar
unknown committed
610 611
  case MYSQL_TYPE_NULL:   
  case MYSQL_TYPE_GEOMETRY:
612
  case MYSQL_TYPE_VARCHAR:
unknown's avatar
unknown committed
613
    break;
unknown's avatar
unknown committed
614
  }
unknown's avatar
unknown committed
615
  return FALSE;
unknown's avatar
unknown committed
616 617 618
}


unknown's avatar
unknown committed
619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645
/*
  Instruct NDB to set the value of the hidden primary key
*/

bool ha_ndbcluster::set_hidden_key(NdbOperation *ndb_op,
				   uint fieldnr, const byte *field_ptr)
{
  DBUG_ENTER("set_hidden_key");
  DBUG_RETURN(ndb_op->equal(fieldnr, (char*)field_ptr,
			    NDB_HIDDEN_PRIMARY_KEY_LENGTH) != 0);
}


/*
  Instruct NDB to set the value of one primary key attribute
*/

int ha_ndbcluster::set_ndb_key(NdbOperation *ndb_op, Field *field,
                               uint fieldnr, const byte *field_ptr)
{
  uint32 pack_len= field->pack_length();
  DBUG_ENTER("set_ndb_key");
  DBUG_PRINT("enter", ("%d: %s, ndb_type: %u, len=%d", 
                       fieldnr, field->field_name, field->type(),
                       pack_len));
  DBUG_DUMP("key", (char*)field_ptr, pack_len);
  
unknown's avatar
unknown committed
646 647 648 649 650
  if (ndb_supported_type(field->type()))
  {
    if (! (field->flags & BLOB_FLAG))
      // Common implementation for most field types
      DBUG_RETURN(ndb_op->equal(fieldnr, (char*) field_ptr, pack_len) != 0);
unknown's avatar
unknown committed
651
  }
unknown's avatar
unknown committed
652 653 654
  // Unhandled field types
  DBUG_PRINT("error", ("Field type %d not supported", field->type()));
  DBUG_RETURN(2);
unknown's avatar
unknown committed
655 656 657 658 659 660 661 662
}


/*
 Instruct NDB to set the value of one attribute
*/

int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field, 
663
                                 uint fieldnr, bool *set_blob_value)
unknown's avatar
unknown committed
664 665 666 667 668 669 670 671
{
  const byte* field_ptr= field->ptr;
  uint32 pack_len=  field->pack_length();
  DBUG_ENTER("set_ndb_value");
  DBUG_PRINT("enter", ("%d: %s, type: %u, len=%d, is_null=%s", 
                       fieldnr, field->field_name, field->type(), 
                       pack_len, field->is_null()?"Y":"N"));
  DBUG_DUMP("value", (char*) field_ptr, pack_len);
unknown's avatar
unknown committed
672 673

  if (ndb_supported_type(field->type()))
unknown's avatar
unknown committed
674
  {
675
    // ndb currently does not support size 0
unknown's avatar
unknown committed
676
    uint32 empty_field;
677 678
    if (pack_len == 0)
    {
unknown's avatar
unknown committed
679 680 681 682 683 684
      pack_len= sizeof(empty_field);
      field_ptr= (byte *)&empty_field;
      if (field->is_null())
	empty_field= 0;
      else
	empty_field= 1;
685
    }
unknown's avatar
unknown committed
686 687 688 689 690 691 692 693 694 695
    if (! (field->flags & BLOB_FLAG))
    {
      if (field->is_null())
        // Set value to NULL
        DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0));
      // Common implementation for most field types
      DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)field_ptr, pack_len) != 0);
    }

    // Blob type
696
    NdbBlob *ndb_blob= ndb_op->getBlobHandle(fieldnr);
unknown's avatar
unknown committed
697 698 699 700 701 702 703 704 705 706 707 708
    if (ndb_blob != NULL)
    {
      if (field->is_null())
        DBUG_RETURN(ndb_blob->setNull() != 0);

      Field_blob *field_blob= (Field_blob*)field;

      // Get length and pointer to data
      uint32 blob_len= field_blob->get_length(field_ptr);
      char* blob_ptr= NULL;
      field_blob->get_ptr(&blob_ptr);

709 710 711
      // Looks like NULL ptr signals length 0 blob
      if (blob_ptr == NULL) {
        DBUG_ASSERT(blob_len == 0);
712
        blob_ptr= (char*)"";
713
      }
unknown's avatar
unknown committed
714 715 716 717 718

      DBUG_PRINT("value", ("set blob ptr=%x len=%u",
                           (unsigned)blob_ptr, blob_len));
      DBUG_DUMP("value", (char*)blob_ptr, min(blob_len, 26));

719
      if (set_blob_value)
unknown's avatar
unknown committed
720
	*set_blob_value= TRUE;
unknown's avatar
unknown committed
721 722 723 724
      // No callback needed to write value
      DBUG_RETURN(ndb_blob->setValue(blob_ptr, blob_len) != 0);
    }
    DBUG_RETURN(1);
unknown's avatar
unknown committed
725
  }
unknown's avatar
unknown committed
726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743
  // Unhandled field types
  DBUG_PRINT("error", ("Field type %d not supported", field->type()));
  DBUG_RETURN(2);
}


/*
  Callback to read all blob values.
  - not done in unpack_record because unpack_record is valid
    after execute(Commit) but reading blobs is not
  - may only generate read operations; they have to be executed
    somewhere before the data is available
  - due to single buffer for all blobs, we let the last blob
    process all blobs (last so that all are active)
  - null bit is still set in unpack_record
  - TODO allocate blob part aligned buffers
*/

unknown's avatar
unknown committed
744
NdbBlob::ActiveHook g_get_ndb_blobs_value;
unknown's avatar
unknown committed
745

unknown's avatar
unknown committed
746
int g_get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg)
unknown's avatar
unknown committed
747
{
unknown's avatar
unknown committed
748
  DBUG_ENTER("g_get_ndb_blobs_value");
unknown's avatar
unknown committed
749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780
  if (ndb_blob->blobsNextBlob() != NULL)
    DBUG_RETURN(0);
  ha_ndbcluster *ha= (ha_ndbcluster *)arg;
  DBUG_RETURN(ha->get_ndb_blobs_value(ndb_blob));
}

int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob)
{
  DBUG_ENTER("get_ndb_blobs_value");

  // Field has no field number so cannot use TABLE blob_field
  // Loop twice, first only counting total buffer size
  for (int loop= 0; loop <= 1; loop++)
  {
    uint32 offset= 0;
    for (uint i= 0; i < table->fields; i++)
    {
      Field *field= table->field[i];
      NdbValue value= m_value[i];
      if (value.ptr != NULL && (field->flags & BLOB_FLAG))
      {
        Field_blob *field_blob= (Field_blob *)field;
        NdbBlob *ndb_blob= value.blob;
        Uint64 blob_len= 0;
        if (ndb_blob->getLength(blob_len) != 0)
          DBUG_RETURN(-1);
        // Align to Uint64
        uint32 blob_size= blob_len;
        if (blob_size % 8 != 0)
          blob_size+= 8 - blob_size % 8;
        if (loop == 1)
        {
781
          char *buf= m_blobs_buffer + offset;
unknown's avatar
unknown committed
782 783 784 785 786 787 788 789 790 791 792
          uint32 len= 0xffffffff;  // Max uint32
          DBUG_PRINT("value", ("read blob ptr=%x len=%u",
                               (uint)buf, (uint)blob_len));
          if (ndb_blob->readData(buf, len) != 0)
            DBUG_RETURN(-1);
          DBUG_ASSERT(len == blob_len);
          field_blob->set_ptr(len, buf);
        }
        offset+= blob_size;
      }
    }
793
    if (loop == 0 && offset > m_blobs_buffer_size)
unknown's avatar
unknown committed
794
    {
795 796
      my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
      m_blobs_buffer_size= 0;
unknown's avatar
unknown committed
797
      DBUG_PRINT("value", ("allocate blobs buffer size %u", offset));
798 799
      m_blobs_buffer= my_malloc(offset, MYF(MY_WME));
      if (m_blobs_buffer == NULL)
unknown's avatar
unknown committed
800
        DBUG_RETURN(-1);
801
      m_blobs_buffer_size= offset;
unknown's avatar
unknown committed
802
    }
unknown's avatar
unknown committed
803
  }
unknown's avatar
unknown committed
804
  DBUG_RETURN(0);
unknown's avatar
unknown committed
805 806 807 808 809
}


/*
  Instruct NDB to fetch one field
unknown's avatar
unknown committed
810 811
  - data is read directly into buffer provided by field
    if field is NULL, data is read into memory provided by NDBAPI
unknown's avatar
unknown committed
812 813
*/

unknown's avatar
unknown committed
814
int ha_ndbcluster::get_ndb_value(NdbOperation *ndb_op, Field *field,
unknown's avatar
unknown committed
815
                                 uint fieldnr, byte* buf)
unknown's avatar
unknown committed
816 817
{
  DBUG_ENTER("get_ndb_value");
unknown's avatar
unknown committed
818 819 820 821 822
  DBUG_PRINT("enter", ("fieldnr: %d flags: %o", fieldnr,
                       (int)(field != NULL ? field->flags : 0)));

  if (field != NULL)
  {
unknown's avatar
unknown committed
823
    DBUG_ASSERT(buf);
unknown's avatar
unknown committed
824 825 826 827
    if (ndb_supported_type(field->type()))
    {
      DBUG_ASSERT(field->ptr != NULL);
      if (! (field->flags & BLOB_FLAG))
unknown's avatar
unknown committed
828
      {	
829 830 831 832
	byte *field_buf;
	if (field->pack_length() != 0)
	  field_buf= buf + (field->ptr - table->record[0]);
	else
unknown's avatar
unknown committed
833
	  field_buf= (byte *)&dummy_buf;
unknown's avatar
unknown committed
834 835
        m_value[fieldnr].rec= ndb_op->getValue(fieldnr, 
					       field_buf);
unknown's avatar
unknown committed
836 837 838 839 840 841 842 843 844 845
        DBUG_RETURN(m_value[fieldnr].rec == NULL);
      }

      // Blob type
      NdbBlob *ndb_blob= ndb_op->getBlobHandle(fieldnr);
      m_value[fieldnr].blob= ndb_blob;
      if (ndb_blob != NULL)
      {
        // Set callback
        void *arg= (void *)this;
unknown's avatar
unknown committed
846
        DBUG_RETURN(ndb_blob->setActiveHook(g_get_ndb_blobs_value, arg) != 0);
unknown's avatar
unknown committed
847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866
      }
      DBUG_RETURN(1);
    }
    // Unhandled field types
    DBUG_PRINT("error", ("Field type %d not supported", field->type()));
    DBUG_RETURN(2);
  }

  // Used for hidden key only
  m_value[fieldnr].rec= ndb_op->getValue(fieldnr, NULL);
  DBUG_RETURN(m_value[fieldnr].rec == NULL);
}


/*
  Check if any set or get of blob value in current query.
*/
bool ha_ndbcluster::uses_blob_value(bool all_fields)
{
  if (table->blob_fields == 0)
unknown's avatar
unknown committed
867
    return FALSE;
unknown's avatar
unknown committed
868
  if (all_fields)
unknown's avatar
unknown committed
869
    return TRUE;
unknown's avatar
unknown committed
870 871 872
  {
    uint no_fields= table->fields;
    int i;
unknown's avatar
unknown committed
873
    THD *thd= table->in_use;
unknown's avatar
unknown committed
874 875 876 877 878 879
    // They always put blobs at the end..
    for (i= no_fields - 1; i >= 0; i--)
    {
      Field *field= table->field[i];
      if (thd->query_id == field->query_id)
      {
unknown's avatar
unknown committed
880
        return TRUE;
unknown's avatar
unknown committed
881 882 883
      }
    }
  }
unknown's avatar
unknown committed
884
  return FALSE;
unknown's avatar
unknown committed
885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902
}


/*
  Get metadata for this table from NDB 

  IMPLEMENTATION
    - save the NdbDictionary::Table for easy access
    - check that frm-file on disk is equal to frm-file
      of table accessed in NDB
    - build a list of the indexes for the table
*/

int ha_ndbcluster::get_metadata(const char *path)
{
  NDBDICT *dict= m_ndb->getDictionary();
  const NDBTAB *tab;
  int error;
unknown's avatar
unknown committed
903
  bool invalidating_ndb_table= FALSE;
904

unknown's avatar
unknown committed
905 906 907
  DBUG_ENTER("get_metadata");
  DBUG_PRINT("enter", ("m_tabname: %s, path: %s", m_tabname, path));

908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925
  do {
    const void *data, *pack_data;
    uint length, pack_length;

    if (!(tab= dict->getTable(m_tabname)))
      ERR_RETURN(dict->getNdbError());
    DBUG_PRINT("info", ("Table schema version: %d", tab->getObjectVersion()));
    /*
      Compare FrmData in NDB with frm file from disk.
    */
    error= 0;
    if (readfrm(path, &data, &length) ||
	packfrm(data, length, &pack_data, &pack_length))
    {
      my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
      my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
      DBUG_RETURN(1);
    }
unknown's avatar
unknown committed
926
    
927 928 929 930 931 932 933
    if ((pack_length != tab->getFrmLength()) || 
	(memcmp(pack_data, tab->getFrmData(), pack_length)))
    {
      if (!invalidating_ndb_table)
      {
	DBUG_PRINT("info", ("Invalidating table"));
	dict->invalidateTable(m_tabname);
unknown's avatar
unknown committed
934
	invalidating_ndb_table= TRUE;
935 936 937 938 939 940 941 942 943
      }
      else
      {
	DBUG_PRINT("error", 
		   ("metadata, pack_length: %d getFrmLength: %d memcmp: %d", 
		    pack_length, tab->getFrmLength(),
		    memcmp(pack_data, tab->getFrmData(), pack_length)));      
	DBUG_DUMP("pack_data", (char*)pack_data, pack_length);
	DBUG_DUMP("frm", (char*)tab->getFrmData(), tab->getFrmLength());
unknown's avatar
unknown committed
944
	error= 3;
unknown's avatar
unknown committed
945
	invalidating_ndb_table= FALSE;
946 947 948 949
      }
    }
    else
    {
unknown's avatar
unknown committed
950
      invalidating_ndb_table= FALSE;
951 952 953 954 955
    }
    my_free((char*)data, MYF(0));
    my_free((char*)pack_data, MYF(0));
  } while (invalidating_ndb_table);

unknown's avatar
unknown committed
956 957 958
  if (error)
    DBUG_RETURN(error);

unknown's avatar
unknown committed
959 960
  m_table= NULL;
  m_table_info= NULL;
unknown's avatar
unknown committed
961
  
unknown's avatar
unknown committed
962
  DBUG_RETURN(build_index_list(table, ILBP_OPEN));  
963
}
unknown's avatar
unknown committed
964

unknown's avatar
unknown committed
965

unknown's avatar
unknown committed
966
int ha_ndbcluster::build_index_list(TABLE *tab, enum ILBP phase)
967
{
968
  uint i;
unknown's avatar
unknown committed
969
  int error= 0;
970 971
  const char *name, *index_name;
  char unique_index_name[FN_LEN];
972
  static const char* unique_suffix= "$unique";
unknown's avatar
unknown committed
973 974 975 976
  KEY* key_info= tab->key_info;
  const char **key_name= tab->keynames.type_names;
  NdbDictionary::Dictionary *dict= m_ndb->getDictionary();
  DBUG_ENTER("build_index_list");
977
  
unknown's avatar
unknown committed
978
  // Save information about all known indexes
unknown's avatar
unknown committed
979
  for (i= 0; i < tab->keys; i++, key_info++, key_name++)
980
  {
unknown's avatar
unknown committed
981
    index_name= *key_name;
982
    NDB_INDEX_TYPE idx_type= get_index_type_from_table(i);
983
    m_index[i].type= idx_type;
984
    if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX)
985
    {
986 987 988
      strxnmov(unique_index_name, FN_LEN, index_name, unique_suffix, NullS);
      DBUG_PRINT("info", ("Created unique index name \'%s\' for index %d",
			  unique_index_name, i));
989
    }
unknown's avatar
unknown committed
990 991 992
    // Create secondary indexes if in create phase
    if (phase == ILBP_CREATE)
    {
993 994
      DBUG_PRINT("info", ("Creating index %u: %s", i, index_name));      
      switch (idx_type){
unknown's avatar
unknown committed
995 996 997 998 999 1000 1001 1002 1003
	
      case PRIMARY_KEY_INDEX:
	// Do nothing, already created
	break;
      case PRIMARY_KEY_ORDERED_INDEX:
	error= create_ordered_index(index_name, key_info);
	break;
      case UNIQUE_ORDERED_INDEX:
	if (!(error= create_ordered_index(index_name, key_info)))
1004
	  error= create_unique_index(unique_index_name, key_info);
unknown's avatar
unknown committed
1005 1006
	break;
      case UNIQUE_INDEX:
1007 1008
	if (!(error= check_index_fields_not_null(i)))
	  error= create_unique_index(unique_index_name, key_info);
unknown's avatar
unknown committed
1009 1010 1011 1012 1013
	break;
      case ORDERED_INDEX:
	error= create_ordered_index(index_name, key_info);
	break;
      default:
unknown's avatar
unknown committed
1014
	DBUG_ASSERT(FALSE);
unknown's avatar
unknown committed
1015 1016 1017 1018 1019 1020 1021 1022 1023 1024
	break;
      }
      if (error)
      {
	DBUG_PRINT("error", ("Failed to create index %u", i));
	drop_table();
	break;
      }
    }
    // Add handles to index objects
1025
    if (idx_type != PRIMARY_KEY_INDEX && idx_type != UNIQUE_INDEX)
1026
    {
1027
      DBUG_PRINT("info", ("Get handle to index %s", index_name));
unknown's avatar
unknown committed
1028
      const NDBINDEX *index= dict->getIndex(index_name, m_tabname);
1029
      if (!index) DBUG_RETURN(1);
unknown's avatar
unknown committed
1030
      m_index[i].index= (void *) index;
1031
    }
1032
    if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX)
1033
    {
1034 1035
      DBUG_PRINT("info", ("Get handle to unique_index %s", unique_index_name));
      const NDBINDEX *index= dict->getIndex(unique_index_name, m_tabname);
1036
      if (!index) DBUG_RETURN(1);
unknown's avatar
unknown committed
1037
      m_index[i].unique_index= (void *) index;
1038 1039
    }      
  }
unknown's avatar
unknown committed
1040 1041
  
  DBUG_RETURN(error);
1042 1043
}

1044

unknown's avatar
unknown committed
1045 1046 1047 1048
/*
  Decode the type of an index from information 
  provided in table object
*/
1049
NDB_INDEX_TYPE ha_ndbcluster::get_index_type_from_table(uint inx) const
unknown's avatar
unknown committed
1050
{
1051 1052 1053
  bool is_hash_index=  (table->key_info[inx].algorithm == HA_KEY_ALG_HASH);
  if (inx == table->primary_key)
    return is_hash_index ? PRIMARY_KEY_INDEX : PRIMARY_KEY_ORDERED_INDEX;
unknown's avatar
unknown committed
1054
  else
1055 1056
    return ((table->key_info[inx].flags & HA_NOSAME) ? 
	    (is_hash_index ? UNIQUE_INDEX : UNIQUE_ORDERED_INDEX) :
unknown's avatar
unknown committed
1057 1058
	    ORDERED_INDEX);
} 
1059

1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079
int ha_ndbcluster::check_index_fields_not_null(uint inx)
{
  KEY* key_info= table->key_info + inx;
  KEY_PART_INFO* key_part= key_info->key_part;
  KEY_PART_INFO* end= key_part+key_info->key_parts;
  DBUG_ENTER("check_index_fields_not_null");
  
  for (; key_part != end; key_part++) 
    {
      Field* field= key_part->field;
      if (field->maybe_null())
      {
	my_printf_error(ER_NULL_COLUMN_IN_INDEX,ER(ER_NULL_COLUMN_IN_INDEX),
			MYF(0),field->field_name);
	DBUG_RETURN(ER_NULL_COLUMN_IN_INDEX);
      }
    }
  
  DBUG_RETURN(0);
}
unknown's avatar
unknown committed
1080 1081 1082

void ha_ndbcluster::release_metadata()
{
1083
  uint i;
1084

unknown's avatar
unknown committed
1085 1086 1087 1088
  DBUG_ENTER("release_metadata");
  DBUG_PRINT("enter", ("m_tabname: %s", m_tabname));

  m_table= NULL;
unknown's avatar
unknown committed
1089
  m_table_info= NULL;
unknown's avatar
unknown committed
1090

1091
  // Release index list 
1092 1093
  for (i= 0; i < MAX_KEY; i++)
  {
1094 1095
    m_index[i].unique_index= NULL;      
    m_index[i].index= NULL;      
1096 1097
  }

unknown's avatar
unknown committed
1098 1099 1100
  DBUG_VOID_RETURN;
}

unknown's avatar
unknown committed
1101
int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type)
1102
{
1103
  if (type >= TL_WRITE_ALLOW_WRITE)
unknown's avatar
unknown committed
1104
    return NdbOperation::LM_Exclusive;
1105
  else if (uses_blob_value(m_retrieve_all_fields))
1106
    return NdbOperation::LM_Read;
unknown's avatar
unknown committed
1107
  else
unknown's avatar
unknown committed
1108
    return NdbOperation::LM_CommittedRead;
1109 1110
}

unknown's avatar
unknown committed
1111 1112 1113 1114 1115 1116
static const ulong index_type_flags[]=
{
  /* UNDEFINED_INDEX */
  0,                         

  /* PRIMARY_KEY_INDEX */
1117
  HA_ONLY_WHOLE_INDEX, 
1118 1119

  /* PRIMARY_KEY_ORDERED_INDEX */
1120
  /* 
unknown's avatar
unknown committed
1121
     Enable HA_KEYREAD_ONLY when "sorted" indexes are supported, 
1122 1123 1124
     thus ORDERD BY clauses can be optimized by reading directly 
     through the index.
  */
unknown's avatar
unknown committed
1125
  // HA_KEYREAD_ONLY | 
unknown's avatar
unknown committed
1126 1127 1128
  HA_READ_NEXT |
  HA_READ_RANGE |
  HA_READ_ORDER,
unknown's avatar
unknown committed
1129 1130

  /* UNIQUE_INDEX */
1131
  HA_ONLY_WHOLE_INDEX,
unknown's avatar
unknown committed
1132

1133
  /* UNIQUE_ORDERED_INDEX */
unknown's avatar
unknown committed
1134 1135 1136
  HA_READ_NEXT |
  HA_READ_RANGE |
  HA_READ_ORDER,
1137

unknown's avatar
unknown committed
1138
  /* ORDERED_INDEX */
unknown's avatar
unknown committed
1139 1140 1141
  HA_READ_NEXT |
  HA_READ_RANGE |
  HA_READ_ORDER
unknown's avatar
unknown committed
1142 1143 1144 1145 1146 1147 1148
};

static const int index_flags_size= sizeof(index_type_flags)/sizeof(ulong);

inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const
{
  DBUG_ASSERT(idx_no < MAX_KEY);
1149
  return m_index[idx_no].type;
unknown's avatar
unknown committed
1150 1151 1152 1153 1154 1155 1156 1157 1158 1159
}


/*
  Get the flags for an index

  RETURN
    flags depending on the type of the index.
*/

1160 1161
inline ulong ha_ndbcluster::index_flags(uint idx_no, uint part,
                                        bool all_parts) const 
unknown's avatar
unknown committed
1162 1163
{ 
  DBUG_ENTER("index_flags");
1164
  DBUG_PRINT("info", ("idx_no: %d", idx_no));
unknown's avatar
unknown committed
1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188
  DBUG_ASSERT(get_index_type_from_table(idx_no) < index_flags_size);
  DBUG_RETURN(index_type_flags[get_index_type_from_table(idx_no)]);
}


int ha_ndbcluster::set_primary_key(NdbOperation *op, const byte *key)
{
  KEY* key_info= table->key_info + table->primary_key;
  KEY_PART_INFO* key_part= key_info->key_part;
  KEY_PART_INFO* end= key_part+key_info->key_parts;
  DBUG_ENTER("set_primary_key");

  for (; key_part != end; key_part++) 
  {
    Field* field= key_part->field;
    if (set_ndb_key(op, field, 
		    key_part->fieldnr-1, key))
      ERR_RETURN(op->getNdbError());
    key += key_part->length;
  }
  DBUG_RETURN(0);
}


1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206
int ha_ndbcluster::set_primary_key_from_old_data(NdbOperation *op, const byte *old_data)
{
  KEY* key_info= table->key_info + table->primary_key;
  KEY_PART_INFO* key_part= key_info->key_part;
  KEY_PART_INFO* end= key_part+key_info->key_parts;
  DBUG_ENTER("set_primary_key_from_old_data");

  for (; key_part != end; key_part++) 
  {
    Field* field= key_part->field;
    if (set_ndb_key(op, field, 
		    key_part->fieldnr-1, old_data+key_part->offset))
      ERR_RETURN(op->getNdbError());
  }
  DBUG_RETURN(0);
}


unknown's avatar
unknown committed
1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223
int ha_ndbcluster::set_primary_key(NdbOperation *op)
{
  DBUG_ENTER("set_primary_key");
  KEY* key_info= table->key_info + table->primary_key;
  KEY_PART_INFO* key_part= key_info->key_part;
  KEY_PART_INFO* end= key_part+key_info->key_parts;

  for (; key_part != end; key_part++) 
  {
    Field* field= key_part->field;
    if (set_ndb_key(op, field, 
                    key_part->fieldnr-1, field->ptr))
      ERR_RETURN(op->getNdbError());
  }
  DBUG_RETURN(0);
}

1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242
int 
ha_ndbcluster::set_index_key(NdbOperation *op, 
			     const KEY *key_info, 
			     const byte * key_ptr)
{
  DBUG_ENTER("set_index_key");
  uint i;
  KEY_PART_INFO* key_part= key_info->key_part;
  KEY_PART_INFO* end= key_part+key_info->key_parts;
  
  for (i= 0; key_part != end; key_part++, i++) 
  {
    if (set_ndb_key(op, key_part->field, i, 
		    key_part->null_bit ? key_ptr + 1 : key_ptr))
      ERR_RETURN(m_active_trans->getNdbError());
    key_ptr+= key_part->store_length;
  }
  DBUG_RETURN(0);
}
unknown's avatar
unknown committed
1243 1244 1245 1246 1247

/*
  Read one record from NDB using primary key
*/

1248
int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) 
unknown's avatar
unknown committed
1249
{
1250 1251 1252 1253
  int res;
  DBUG_ENTER("pk_read");
  DBUG_PRINT("enter", ("key_len: %u", key_len));
  DBUG_DUMP("key", (char*)key, key_len);
unknown's avatar
unknown committed
1254 1255 1256 1257 1258
  uint no_fields= table->fields, i;
  NdbConnection *trans= m_active_trans;
  NdbOperation *op;
  THD *thd= current_thd;

1259 1260
  NdbOperation::LockMode lm=
    (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
unknown's avatar
unknown committed
1261
  if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) || 
1262
      op->readTuple(lm) != 0)
1263
    ERR_RETURN(trans->getNdbError());
1264
  
unknown's avatar
unknown committed
1265 1266 1267 1268 1269 1270
  if (table->primary_key == MAX_KEY) 
  {
    // This table has no primary key, use "hidden" primary key
    DBUG_PRINT("info", ("Using hidden key"));
    DBUG_DUMP("key", (char*)key, 8);    
    if (set_hidden_key(op, no_fields, key))
1271
      ERR_RETURN(trans->getNdbError());
1272
    
unknown's avatar
unknown committed
1273
    // Read key at the same time, for future reference
unknown's avatar
unknown committed
1274
    if (get_ndb_value(op, NULL, no_fields, NULL))
1275
      ERR_RETURN(trans->getNdbError());
unknown's avatar
unknown committed
1276 1277 1278 1279 1280 1281 1282
  } 
  else 
  {
    if ((res= set_primary_key(op, key)))
      return res;
  }
  
1283 1284
  if((res= define_read_attrs(buf, op)))
    DBUG_RETURN(res);
unknown's avatar
unknown committed
1285
  
unknown's avatar
unknown committed
1286
  if (execute_no_commit_ie(this,trans) != 0) 
unknown's avatar
unknown committed
1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297
  {
    table->status= STATUS_NOT_FOUND;
    DBUG_RETURN(ndb_err(trans));
  }

  // The value have now been fetched from NDB  
  unpack_record(buf);
  table->status= 0;     
  DBUG_RETURN(0);
}

1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309
/*
  Read one complementing record from NDB using primary key from old_data
*/

int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
{
  uint no_fields= table->fields, i;
  NdbConnection *trans= m_active_trans;
  NdbOperation *op;
  THD *thd= current_thd;
  DBUG_ENTER("complemented_pk_read");

1310
  if (m_retrieve_all_fields)
1311 1312 1313
    // We have allready retrieved all fields, nothing to complement
    DBUG_RETURN(0);

1314 1315
  NdbOperation::LockMode lm=
    (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
unknown's avatar
unknown committed
1316
  if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) || 
1317
      op->readTuple(lm) != 0)
1318
    ERR_RETURN(trans->getNdbError());
1319 1320 1321 1322 1323
  
  int res;
  if ((res= set_primary_key_from_old_data(op, old_data)))
    ERR_RETURN(trans->getNdbError());
  
1324 1325 1326 1327
  // Read all unreferenced non-key field(s)
  for (i= 0; i < no_fields; i++) 
  {
    Field *field= table->field[i];
1328 1329
    if (!((field->flags & PRI_KEY_FLAG) ||
	  (thd->query_id == field->query_id)))
1330
    {
unknown's avatar
unknown committed
1331
      if (get_ndb_value(op, field, i, new_data))
1332
	ERR_RETURN(trans->getNdbError());
1333 1334 1335
    }
  }
  
unknown's avatar
unknown committed
1336
  if (execute_no_commit(this,trans) != 0) 
1337 1338 1339 1340 1341 1342 1343 1344
  {
    table->status= STATUS_NOT_FOUND;
    DBUG_RETURN(ndb_err(trans));
  }

  // The value have now been fetched from NDB  
  unpack_record(new_data);
  table->status= 0;     
1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358

  /**
   * restore m_value
   */
  for (i= 0; i < no_fields; i++) 
  {
    Field *field= table->field[i];
    if (!((field->flags & PRI_KEY_FLAG) ||
	  (thd->query_id == field->query_id)))
    {
      m_value[i].ptr= NULL;
    }
  }
  
1359 1360 1361
  DBUG_RETURN(0);
}

1362 1363 1364 1365 1366 1367 1368 1369 1370 1371
/*
  Peek to check if a particular row already exists
*/

int ha_ndbcluster::peek_row()
{
  NdbConnection *trans= m_active_trans;
  NdbOperation *op;
  THD *thd= current_thd;
  DBUG_ENTER("peek_row");
unknown's avatar
unknown committed
1372

1373 1374 1375 1376 1377
  NdbOperation::LockMode lm=
    (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
  if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
      op->readTuple(lm) != 0)
    ERR_RETURN(trans->getNdbError());
unknown's avatar
unknown committed
1378

1379 1380 1381
  int res;
  if ((res= set_primary_key(op)))
    ERR_RETURN(trans->getNdbError());
unknown's avatar
unknown committed
1382

1383
  if (execute_no_commit_ie(this,trans) != 0)
unknown's avatar
unknown committed
1384 1385 1386 1387
  {
    table->status= STATUS_NOT_FOUND;
    DBUG_RETURN(ndb_err(trans));
  } 
1388 1389
  DBUG_RETURN(0);
}
1390

unknown's avatar
unknown committed
1391 1392 1393 1394 1395 1396 1397
/*
  Read one record from NDB using unique secondary index
*/

int ha_ndbcluster::unique_index_read(const byte *key,
				     uint key_len, byte *buf)
{
1398
  int res;
unknown's avatar
unknown committed
1399 1400 1401 1402 1403 1404
  NdbConnection *trans= m_active_trans;
  NdbIndexOperation *op;
  DBUG_ENTER("unique_index_read");
  DBUG_PRINT("enter", ("key_len: %u, index: %u", key_len, active_index));
  DBUG_DUMP("key", (char*)key, key_len);
  
1405 1406
  NdbOperation::LockMode lm=
    (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
1407 1408
  if (!(op= trans->getNdbIndexOperation((NDBINDEX *) 
					m_index[active_index].unique_index, 
unknown's avatar
unknown committed
1409
                                        (const NDBTAB *) m_table)) ||
1410
      op->readTuple(lm) != 0)
unknown's avatar
unknown committed
1411 1412 1413
    ERR_RETURN(trans->getNdbError());
  
  // Set secondary index key(s)
1414 1415 1416
  if((res= set_index_key(op, table->key_info + active_index, key)))
    DBUG_RETURN(res);
  
1417 1418
  if((res= define_read_attrs(buf, op)))
    DBUG_RETURN(res);
unknown's avatar
unknown committed
1419

unknown's avatar
unknown committed
1420
  if (execute_no_commit_ie(this,trans) != 0) 
unknown's avatar
unknown committed
1421 1422 1423 1424 1425 1426 1427 1428 1429 1430
  {
    table->status= STATUS_NOT_FOUND;
    DBUG_RETURN(ndb_err(trans));
  }
  // The value have now been fetched from NDB
  unpack_record(buf);
  table->status= 0;
  DBUG_RETURN(0);
}

unknown's avatar
unknown committed
1431
inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor)
1432 1433
{
  DBUG_ENTER("fetch_next");
1434
  int check;
unknown's avatar
unknown committed
1435
  NdbConnection *trans= m_active_trans;
1436
  
1437
  bool contact_ndb= m_lock.type < TL_WRITE_ALLOW_WRITE;
1438 1439
  do {
    DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb));
unknown's avatar
unknown committed
1440 1441 1442
    /*
      We can only handle one tuple with blobs at a time.
    */
1443
    if (m_ops_pending && m_blobs_pending)
unknown's avatar
unknown committed
1444
    {
unknown's avatar
unknown committed
1445
      if (execute_no_commit(this,trans) != 0)
1446
	DBUG_RETURN(ndb_err(trans));
1447 1448
      m_ops_pending= 0;
      m_blobs_pending= FALSE;
unknown's avatar
unknown committed
1449
    }
1450 1451
    
    if ((check= cursor->nextResult(contact_ndb, m_force_send)) == 0)
1452 1453 1454 1455 1456 1457 1458
    {
      DBUG_RETURN(0);
    } 
    else if (check == 1 || check == 2)
    {
      // 1: No more records
      // 2: No more cached records
1459
      
1460 1461 1462 1463 1464
      /*
	Before fetching more rows and releasing lock(s),
	all pending update or delete operations should 
	be sent to NDB
      */
1465 1466
      DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending));    
      if (m_ops_pending)
1467
      {
1468
	if (m_transaction_on)
1469 1470
	{
	  if (execute_no_commit(this,trans) != 0)
unknown's avatar
unknown committed
1471
	    DBUG_RETURN(-1);
1472 1473 1474 1475
	}
	else
	{
	  if  (execute_commit(this,trans) != 0)
unknown's avatar
unknown committed
1476
	    DBUG_RETURN(-1);
unknown's avatar
unknown committed
1477 1478
	  int res= trans->restart();
	  DBUG_ASSERT(res == 0);
1479
	}
1480
	m_ops_pending= 0;
1481
      }
1482 1483
      contact_ndb= (check == 2);
    }
unknown's avatar
unknown committed
1484 1485 1486 1487
    else
    {
      DBUG_RETURN(-1);
    }
1488
  } while (check == 2);
unknown's avatar
unknown committed
1489

1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500
  DBUG_RETURN(1);
}

/*
  Get the next record of a started scan. Try to fetch
  it locally from NdbApi cached records if possible, 
  otherwise ask NDB for more.

  NOTE
  If this is a update/delete make sure to not contact 
  NDB before any pending ops have been sent to NDB.
unknown's avatar
unknown committed
1501

1502 1503 1504 1505 1506 1507 1508
*/

inline int ha_ndbcluster::next_result(byte *buf)
{  
  int res;
  DBUG_ENTER("next_result");
    
1509 1510 1511
  if (!m_active_cursor)
    DBUG_RETURN(HA_ERR_END_OF_FILE);
  
unknown's avatar
unknown committed
1512
  if((res= fetch_next(m_active_cursor)) == 0)
1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531
  {
    DBUG_PRINT("info", ("One more record found"));    
    
    unpack_record(buf);
    table->status= 0;
    DBUG_RETURN(0);
  }
  else if(res == 1)
  {
    // No more records
    table->status= STATUS_NOT_FOUND;
    
    DBUG_PRINT("info", ("No more records"));
    DBUG_RETURN(HA_ERR_END_OF_FILE);
  }
  else
  {
    DBUG_RETURN(ndb_err(m_active_trans));
  }
unknown's avatar
unknown committed
1532 1533
}

1534
/*
1535
  Set bounds for ordered index scan.
1536 1537
*/

unknown's avatar
unknown committed
1538
int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op,
1539 1540
			      const key_range *keys[2],
			      uint range_no)
1541
{
1542 1543 1544 1545
  const KEY *const key_info= table->key_info + active_index;
  const uint key_parts= key_info->key_parts;
  uint key_tot_len[2];
  uint tot_len;
1546
  uint i, j;
1547 1548

  DBUG_ENTER("set_bounds");
1549
  DBUG_PRINT("info", ("key_parts=%d", key_parts));
1550

1551
  for (j= 0; j <= 1; j++)
1552
  {
1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565
    const key_range *key= keys[j];
    if (key != NULL)
    {
      // for key->flag see ha_rkey_function
      DBUG_PRINT("info", ("key %d length=%d flag=%d",
                          j, key->length, key->flag));
      key_tot_len[j]= key->length;
    }
    else
    {
      DBUG_PRINT("info", ("key %d not present", j));
      key_tot_len[j]= 0;
    }
1566 1567
  }
  tot_len= 0;
unknown's avatar
unknown committed
1568

1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587
  for (i= 0; i < key_parts; i++)
  {
    KEY_PART_INFO *key_part= &key_info->key_part[i];
    Field *field= key_part->field;
    uint part_len= key_part->length;
    uint part_store_len= key_part->store_length;
    // Info about each key part
    struct part_st {
      bool part_last;
      const key_range *key;
      const byte *part_ptr;
      bool part_null;
      int bound_type;
      const char* bound_ptr;
    };
    struct part_st part[2];

    for (j= 0; j <= 1; j++)
    {
1588
      struct part_st &p= part[j];
1589 1590 1591 1592 1593 1594 1595
      p.key= NULL;
      p.bound_type= -1;
      if (tot_len < key_tot_len[j])
      {
        p.part_last= (tot_len + part_store_len >= key_tot_len[j]);
        p.key= keys[j];
        p.part_ptr= &p.key->key[tot_len];
unknown's avatar
unknown committed
1596
        p.part_null= key_part->null_bit && *p.part_ptr;
1597
        p.bound_ptr= (const char *)
unknown's avatar
unknown committed
1598
          p.part_null ? 0 : key_part->null_bit ? p.part_ptr + 1 : p.part_ptr;
1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635

        if (j == 0)
        {
          switch (p.key->flag)
          {
            case HA_READ_KEY_EXACT:
              p.bound_type= NdbIndexScanOperation::BoundEQ;
              break;
            case HA_READ_KEY_OR_NEXT:
              p.bound_type= NdbIndexScanOperation::BoundLE;
              break;
            case HA_READ_AFTER_KEY:
              if (! p.part_last)
                p.bound_type= NdbIndexScanOperation::BoundLE;
              else
                p.bound_type= NdbIndexScanOperation::BoundLT;
              break;
            default:
              break;
          }
        }
        if (j == 1) {
          switch (p.key->flag)
          {
            case HA_READ_BEFORE_KEY:
              if (! p.part_last)
                p.bound_type= NdbIndexScanOperation::BoundGE;
              else
                p.bound_type= NdbIndexScanOperation::BoundGT;
              break;
            case HA_READ_AFTER_KEY:     // weird
              p.bound_type= NdbIndexScanOperation::BoundGE;
              break;
            default:
              break;
          }
        }
1636

1637 1638 1639 1640 1641
        if (p.bound_type == -1)
        {
          DBUG_PRINT("error", ("key %d unknown flag %d", j, p.key->flag));
          DBUG_ASSERT(false);
          // Stop setting bounds but continue with what we have
1642
	  op->end_of_bound(range_no);
1643 1644 1645 1646
          DBUG_RETURN(0);
        }
      }
    }
1647

1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664
    // Seen with e.g. b = 1 and c > 1
    if (part[0].bound_type == NdbIndexScanOperation::BoundLE &&
        part[1].bound_type == NdbIndexScanOperation::BoundGE &&
        memcmp(part[0].part_ptr, part[1].part_ptr, part_store_len) == 0)
    {
      DBUG_PRINT("info", ("replace LE/GE pair by EQ"));
      part[0].bound_type= NdbIndexScanOperation::BoundEQ;
      part[1].bound_type= -1;
    }
    // Not seen but was in previous version
    if (part[0].bound_type == NdbIndexScanOperation::BoundEQ &&
        part[1].bound_type == NdbIndexScanOperation::BoundGE &&
        memcmp(part[0].part_ptr, part[1].part_ptr, part_store_len) == 0)
    {
      DBUG_PRINT("info", ("remove GE from EQ/GE pair"));
      part[1].bound_type= -1;
    }
1665

1666 1667
    for (j= 0; j <= 1; j++)
    {
1668
      struct part_st &p= part[j];
1669 1670 1671 1672 1673 1674 1675 1676 1677
      // Set bound if not done with this key
      if (p.key != NULL)
      {
        DBUG_PRINT("info", ("key %d:%d offset=%d length=%d last=%d bound=%d",
                            j, i, tot_len, part_len, p.part_last, p.bound_type));
        DBUG_DUMP("info", (const char*)p.part_ptr, part_store_len);

        // Set bound if not cancelled via type -1
        if (p.bound_type != -1)
1678 1679 1680 1681 1682
	{
	  char truncated_field_name[NDB_MAX_ATTR_NAME_SIZE];
	  strnmov(truncated_field_name,field->field_name,sizeof(truncated_field_name));
	  truncated_field_name[sizeof(truncated_field_name)-1]= '\0';
          if (op->setBound(truncated_field_name, p.bound_type, p.bound_ptr))
1683
            ERR_RETURN(op->getNdbError());
1684
	}
1685 1686 1687 1688
      }
    }

    tot_len+= part_store_len;
1689
  }
1690
  op->end_of_bound(range_no);
1691 1692 1693
  DBUG_RETURN(0);
}

unknown's avatar
unknown committed
1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707
inline 
int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
{
  uint i;
  THD *thd= current_thd;
  NdbConnection *trans= m_active_trans;

  DBUG_ENTER("define_read_attrs");  

  // Define attributes to read
  for (i= 0; i < table->fields; i++) 
  {
    Field *field= table->field[i];
    if ((thd->query_id == field->query_id) ||
1708
	((field->flags & PRI_KEY_FLAG)) || 
1709
	m_retrieve_all_fields)
unknown's avatar
unknown committed
1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732
    {      
      if (get_ndb_value(op, field, i, buf))
	ERR_RETURN(op->getNdbError());
    } 
    else 
    {
      m_value[i].ptr= NULL;
    }
  }
    
  if (table->primary_key == MAX_KEY) 
  {
    DBUG_PRINT("info", ("Getting hidden key"));
    // Scanning table with no primary key
    int hidden_no= table->fields;      
#ifndef DBUG_OFF
    const NDBTAB *tab= (const NDBTAB *) m_table;    
    if (!tab->getColumn(hidden_no))
      DBUG_RETURN(1);
#endif
    if (get_ndb_value(op, NULL, hidden_no, NULL))
      ERR_RETURN(op->getNdbError());
  }
1733
  DBUG_RETURN(0);
unknown's avatar
unknown committed
1734
} 
1735

unknown's avatar
unknown committed
1736
/*
1737
  Start ordered index scan in NDB
unknown's avatar
unknown committed
1738 1739
*/

1740 1741 1742
int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
				      const key_range *end_key,
				      bool sorted, byte* buf)
unknown's avatar
unknown committed
1743
{  
1744
  int res;
unknown's avatar
unknown committed
1745
  bool restart;
unknown's avatar
unknown committed
1746
  NdbConnection *trans= m_active_trans;
unknown's avatar
unknown committed
1747
  NdbIndexScanOperation *op;
1748

unknown's avatar
unknown committed
1749
  DBUG_ENTER("ordered_index_scan");
1750
  DBUG_PRINT("enter", ("index: %u, sorted: %d", active_index, sorted));  
unknown's avatar
unknown committed
1751
  DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname));
unknown's avatar
unknown committed
1752

1753 1754
  // Check that sorted seems to be initialised
  DBUG_ASSERT(sorted == 0 || sorted == 1);
unknown's avatar
unknown committed
1755
  
1756
  if (m_active_cursor == 0)
unknown's avatar
unknown committed
1757 1758 1759 1760 1761 1762 1763
  {
    restart= false;
    NdbOperation::LockMode lm=
      (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
    if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *)
					      m_index[active_index].index, 
					      (const NDBTAB *) m_table)) ||
1764
	op->readTuples(lm, 0, parallelism, sorted))
unknown's avatar
unknown committed
1765
      ERR_RETURN(trans->getNdbError());
1766
    m_active_cursor= op;
unknown's avatar
unknown committed
1767 1768
  } else {
    restart= true;
1769
    op= (NdbIndexScanOperation*)m_active_cursor;
unknown's avatar
unknown committed
1770 1771 1772 1773
    
    DBUG_ASSERT(op->getSorted() == sorted);
    DBUG_ASSERT(op->getLockMode() == 
		(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type));
unknown's avatar
unknown committed
1774
    if(op->reset_bounds(m_force_send))
unknown's avatar
unknown committed
1775 1776
      DBUG_RETURN(ndb_err(m_active_trans));
  }
1777
  
1778
  {
1779
    const key_range *keys[2]= { start_key, end_key };
1780 1781 1782
    res= set_bounds(op, keys);
    if (res)
      DBUG_RETURN(res);
1783
  }
1784
  
1785
  if (!restart && (res= define_read_attrs(buf, op)))
1786
  {
1787
    DBUG_RETURN(res);
unknown's avatar
unknown committed
1788
  }
1789 1790 1791 1792 1793 1794

  if (execute_no_commit(this,trans) != 0)
    DBUG_RETURN(ndb_err(trans));
  
  DBUG_RETURN(next_result(buf));
}
unknown's avatar
unknown committed
1795 1796

/*
1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807
  Start a filtered scan in NDB.

  NOTE
  This function is here as an example of how to start a
  filtered scan. It should be possible to replace full_table_scan 
  with this function and make a best effort attempt 
  at filtering out the irrelevant data by converting the "items" 
  into interpreted instructions.
  This would speed up table scans where there is a limiting WHERE clause
  that doesn't match any index in the table.

unknown's avatar
unknown committed
1808 1809 1810 1811 1812 1813
 */

int ha_ndbcluster::filtered_scan(const byte *key, uint key_len, 
				 byte *buf,
				 enum ha_rkey_function find_flag)
{  
1814
  int res;
unknown's avatar
unknown committed
1815
  NdbConnection *trans= m_active_trans;
1816
  NdbScanOperation *op;
unknown's avatar
unknown committed
1817 1818 1819 1820 1821 1822 1823

  DBUG_ENTER("filtered_scan");
  DBUG_PRINT("enter", ("key_len: %u, index: %u", 
                       key_len, active_index));
  DBUG_DUMP("key", (char*)key, key_len);  
  DBUG_PRINT("info", ("Starting a new filtered scan on %s",
		      m_tabname));
1824

1825 1826 1827
  NdbOperation::LockMode lm=
    (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
  if (!(op= trans->getNdbScanOperation((const NDBTAB *) m_table)) ||
1828
      op->readTuples(lm, 0, parallelism))
unknown's avatar
unknown committed
1829
    ERR_RETURN(trans->getNdbError());
1830
  m_active_cursor= op;
unknown's avatar
unknown committed
1831
  
unknown's avatar
unknown committed
1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846
  {
    // Start scan filter
    NdbScanFilter sf(op);
    sf.begin();
      
    // Set filter using the supplied key data
    byte *key_ptr= (byte *) key;    
    uint tot_len= 0;
    KEY* key_info= table->key_info + active_index;
    for (uint k= 0; k < key_info->key_parts; k++) 
    {
      KEY_PART_INFO* key_part= key_info->key_part+k;
      Field* field= key_part->field;
      uint ndb_fieldnr= key_part->fieldnr-1;
      DBUG_PRINT("key_part", ("fieldnr: %d", ndb_fieldnr));
1847
      //const NDBCOL *col= ((const NDBTAB *) m_table)->getColumn(ndb_fieldnr);
unknown's avatar
unknown committed
1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876
      uint32 field_len=  field->pack_length();
      DBUG_DUMP("key", (char*)key, field_len);
	
      DBUG_PRINT("info", ("Column %s, type: %d, len: %d", 
			  field->field_name, field->real_type(), field_len));
	
      // Define scan filter
      if (field->real_type() == MYSQL_TYPE_STRING)
	sf.eq(ndb_fieldnr, key_ptr, field_len);
      else 
      {
	if (field_len == 8)
	  sf.eq(ndb_fieldnr, (Uint64)*key_ptr);
	else if (field_len <= 4)
	  sf.eq(ndb_fieldnr, (Uint32)*key_ptr);
	else 
	  DBUG_RETURN(1);
      }
	
      key_ptr += field_len;
      tot_len += field_len;
	
      if (tot_len >= key_len)
	break;
    }
    // End scan filter
    sf.end();
  }

1877 1878
  if((res= define_read_attrs(buf, op)))
    DBUG_RETURN(res);
unknown's avatar
unknown committed
1879

1880 1881 1882 1883 1884
  if (execute_no_commit(this,trans) != 0)
    DBUG_RETURN(ndb_err(trans));
  DBUG_PRINT("exit", ("Scan started successfully"));
  DBUG_RETURN(next_result(buf));
}
unknown's avatar
unknown committed
1885 1886

/*
1887
  Start full table scan in NDB
unknown's avatar
unknown committed
1888 1889 1890 1891 1892
 */

int ha_ndbcluster::full_table_scan(byte *buf)
{
  uint i;
1893
  int res;
unknown's avatar
unknown committed
1894
  NdbScanOperation *op;
1895
  NdbConnection *trans= m_active_trans;
unknown's avatar
unknown committed
1896 1897 1898 1899

  DBUG_ENTER("full_table_scan");  
  DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname));

1900 1901 1902
  NdbOperation::LockMode lm=
    (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
  if (!(op=trans->getNdbScanOperation((const NDBTAB *) m_table)) ||
1903
      op->readTuples(lm, 0, parallelism))
unknown's avatar
unknown committed
1904
    ERR_RETURN(trans->getNdbError());
1905
  m_active_cursor= op;
1906
  generate_scan_filter(m_cond_stack, op);
1907 1908 1909 1910 1911 1912 1913
  if((res= define_read_attrs(buf, op)))
    DBUG_RETURN(res);

  if (execute_no_commit(this,trans) != 0)
    DBUG_RETURN(ndb_err(trans));
  DBUG_PRINT("exit", ("Scan started successfully"));
  DBUG_RETURN(next_result(buf));
1914 1915
}

unknown's avatar
unknown committed
1916 1917 1918 1919 1920
/*
  Insert one record into NDB
*/
int ha_ndbcluster::write_row(byte *record)
{
unknown's avatar
unknown committed
1921
  bool has_auto_increment;
unknown's avatar
unknown committed
1922 1923 1924 1925
  uint i;
  NdbConnection *trans= m_active_trans;
  NdbOperation *op;
  int res;
unknown's avatar
unknown committed
1926 1927
  THD *thd= current_thd;

unknown's avatar
unknown committed
1928
  DBUG_ENTER("write_row");
1929

1930
  if(m_ignore_dup_key && table->primary_key != MAX_KEY)
1931
  {
1932 1933 1934 1935 1936 1937 1938 1939 1940
    int peek_res= peek_row();
    
    if (!peek_res) 
    {
      m_dupkey= table->primary_key;
      DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
    }
    if (peek_res != HA_ERR_KEY_NOT_FOUND)
      DBUG_RETURN(peek_res);
1941
  }
unknown's avatar
unknown committed
1942
  
unknown's avatar
unknown committed
1943
  statistic_increment(thd->status_var.ha_write_count, &LOCK_status);
unknown's avatar
unknown committed
1944 1945
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
    table->timestamp_field->set_time();
1946
  has_auto_increment= (table->next_number_field && record == table->record[0]);
unknown's avatar
unknown committed
1947

unknown's avatar
unknown committed
1948
  if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)))
unknown's avatar
unknown committed
1949 1950 1951 1952 1953 1954 1955 1956 1957
    ERR_RETURN(trans->getNdbError());

  res= (m_use_write) ? op->writeTuple() :op->insertTuple(); 
  if (res != 0)
    ERR_RETURN(trans->getNdbError());  
 
  if (table->primary_key == MAX_KEY) 
  {
    // Table has hidden primary key
unknown's avatar
unknown committed
1958
    Uint64 auto_value= m_ndb->getAutoIncrementValue((const NDBTAB *) m_table);
unknown's avatar
unknown committed
1959 1960 1961 1962 1963 1964
    if (set_hidden_key(op, table->fields, (const byte*)&auto_value))
      ERR_RETURN(op->getNdbError());
  } 
  else 
  {
    int res;
1965

1966 1967
    if (has_auto_increment) 
    {
1968
      m_skip_auto_increment= FALSE;
1969
      update_auto_increment();
1970
      m_skip_auto_increment= !auto_increment_column_changed;
1971
    }
1972

unknown's avatar
unknown committed
1973 1974 1975 1976 1977
    if ((res= set_primary_key(op)))
      return res;
  }

  // Set non-key attribute(s)
unknown's avatar
unknown committed
1978
  bool set_blob_value= FALSE;
unknown's avatar
unknown committed
1979 1980 1981 1982
  for (i= 0; i < table->fields; i++) 
  {
    Field *field= table->field[i];
    if (!(field->flags & PRI_KEY_FLAG) &&
1983
	set_ndb_value(op, field, i, &set_blob_value))
1984
    {
1985
      m_skip_auto_increment= TRUE;
unknown's avatar
unknown committed
1986
      ERR_RETURN(op->getNdbError());
1987
    }
unknown's avatar
unknown committed
1988 1989 1990 1991 1992 1993 1994 1995 1996
  }

  /*
    Execute write operation
    NOTE When doing inserts with many values in 
    each INSERT statement it should not be necessary
    to NoCommit the transaction between each row.
    Find out how this is detected!
  */
1997
  m_rows_inserted++;
1998
  no_uncommitted_rows_update(1);
1999 2000 2001
  m_bulk_insert_not_flushed= TRUE;
  if ((m_rows_to_insert == 1) || 
      ((m_rows_inserted % m_bulk_insert_rows) == 0) ||
2002
      set_blob_value)
2003
  {
2004
    THD *thd= current_thd;
2005 2006 2007
    // Send rows to NDB
    DBUG_PRINT("info", ("Sending inserts to NDB, "\
			"rows_inserted:%d, bulk_insert_rows: %d", 
2008
			(int)m_rows_inserted, (int)m_bulk_insert_rows));
2009

2010
    m_bulk_insert_not_flushed= FALSE;
2011 2012
    //    if (thd->transaction.on)
    if (m_transaction_on)
2013
    {
unknown's avatar
unknown committed
2014
      if (execute_no_commit(this,trans) != 0)
2015
      {
2016
	m_skip_auto_increment= TRUE;
2017
	no_uncommitted_rows_execute_failure();
2018 2019
	DBUG_RETURN(ndb_err(trans));
      }
2020 2021
    }
    else
2022
    {
unknown's avatar
unknown committed
2023
      if (execute_commit(this,trans) != 0)
2024
      {
2025
	m_skip_auto_increment= TRUE;
2026
	no_uncommitted_rows_execute_failure();
2027
	DBUG_RETURN(ndb_err(trans));
2028
      }
unknown's avatar
unknown committed
2029 2030
      int res= trans->restart();
      DBUG_ASSERT(res == 0);
2031
    }
2032
  }
2033
  if ((has_auto_increment) && (m_skip_auto_increment))
unknown's avatar
unknown committed
2034
  {
2035
    Uint64 next_val= (Uint64) table->next_number_field->val_int() + 1;
unknown's avatar
unknown committed
2036
    DBUG_PRINT("info", 
2037 2038
	       ("Trying to set next auto increment value to %lu",
                (ulong) next_val));
unknown's avatar
unknown committed
2039
    if (m_ndb->setAutoIncrementValue((const NDBTAB *) m_table, next_val, TRUE))
unknown's avatar
unknown committed
2040 2041
      DBUG_PRINT("info", 
		 ("Setting next auto increment value to %u", next_val));  
2042
  }
2043
  m_skip_auto_increment= TRUE;
2044

unknown's avatar
unknown committed
2045 2046 2047 2048 2049 2050 2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064
  DBUG_RETURN(0);
}


/* Compare if a key in a row has changed */

int ha_ndbcluster::key_cmp(uint keynr, const byte * old_row,
			   const byte * new_row)
{
  KEY_PART_INFO *key_part=table->key_info[keynr].key_part;
  KEY_PART_INFO *end=key_part+table->key_info[keynr].key_parts;

  for (; key_part != end ; key_part++)
  {
    if (key_part->null_bit)
    {
      if ((old_row[key_part->null_offset] & key_part->null_bit) !=
	  (new_row[key_part->null_offset] & key_part->null_bit))
	return 1;
    }
2065
    if (key_part->key_part_flag & (HA_BLOB_PART | HA_VAR_LENGTH_PART))
unknown's avatar
unknown committed
2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090
    {

      if (key_part->field->cmp_binary((char*) (old_row + key_part->offset),
				      (char*) (new_row + key_part->offset),
				      (ulong) key_part->length))
	return 1;
    }
    else
    {
      if (memcmp(old_row+key_part->offset, new_row+key_part->offset,
		 key_part->length))
	return 1;
    }
  }
  return 0;
}

/*
  Update one record in NDB using primary key
*/

int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
{
  THD *thd= current_thd;
  NdbConnection *trans= m_active_trans;
2091
  NdbScanOperation* cursor= m_active_cursor;
unknown's avatar
unknown committed
2092 2093 2094 2095
  NdbOperation *op;
  uint i;
  DBUG_ENTER("update_row");
  
unknown's avatar
unknown committed
2096
  statistic_increment(thd->status_var.ha_update_count, &LOCK_status);
unknown's avatar
unknown committed
2097 2098 2099
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
    table->timestamp_field->set_time();

2100
  /* Check for update of primary key for special handling */  
2101 2102
  if ((table->primary_key != MAX_KEY) &&
      (key_cmp(table->primary_key, old_data, new_data)))
2103
  {
2104
    int read_res, insert_res, delete_res;
2105

2106
    DBUG_PRINT("info", ("primary key update, doing pk read+insert+delete"));
2107
    // Get all old fields, since we optimize away fields not in query
2108
    read_res= complemented_pk_read(old_data, new_data);
2109 2110 2111 2112 2113 2114
    if (read_res)
    {
      DBUG_PRINT("info", ("pk read failed"));
      DBUG_RETURN(read_res);
    }
    // Insert new row
2115 2116
    insert_res= write_row(new_data);
    if (insert_res)
2117 2118 2119 2120
    {
      DBUG_PRINT("info", ("insert failed"));
      DBUG_RETURN(insert_res);
    }
2121 2122
    // Delete old row
    DBUG_PRINT("info", ("insert succeded"));
2123
    m_primary_key_update= TRUE;
2124
    delete_res= delete_row(old_data);
2125
    m_primary_key_update= FALSE;
2126 2127 2128 2129 2130 2131 2132 2133
    if (delete_res)
    {
      DBUG_PRINT("info", ("delete failed"));
      // Undo write_row(new_data)
      DBUG_RETURN(delete_row(new_data));
    }     
    DBUG_PRINT("info", ("insert+delete succeeded"));
    DBUG_RETURN(0);
2134
  }
2135

2136
  if (cursor)
unknown's avatar
unknown committed
2137
  {
2138 2139 2140 2141 2142 2143 2144 2145
    /*
      We are scanning records and want to update the record
      that was just found, call updateTuple on the cursor 
      to take over the lock to a new update operation
      And thus setting the primary key of the record from 
      the active record in cursor
    */
    DBUG_PRINT("info", ("Calling updateTuple on cursor"));
2146
    if (!(op= cursor->updateCurrentTuple()))
2147
      ERR_RETURN(trans->getNdbError());
2148
    m_ops_pending++;
unknown's avatar
unknown committed
2149
    if (uses_blob_value(FALSE))
2150
      m_blobs_pending= TRUE;
2151 2152 2153
  }
  else
  {  
unknown's avatar
unknown committed
2154
    if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
2155 2156 2157 2158 2159 2160 2161 2162 2163 2164 2165
	op->updateTuple() != 0)
      ERR_RETURN(trans->getNdbError());  
    
    if (table->primary_key == MAX_KEY) 
    {
      // This table has no primary key, use "hidden" primary key
      DBUG_PRINT("info", ("Using hidden key"));
      
      // Require that the PK for this record has previously been 
      // read into m_value
      uint no_fields= table->fields;
2166
      const NdbRecAttr* rec= m_value[no_fields].rec;
2167 2168 2169 2170 2171 2172 2173 2174 2175
      DBUG_ASSERT(rec);
      DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH);
      
      if (set_hidden_key(op, no_fields, rec->aRef()))
	ERR_RETURN(op->getNdbError());
    } 
    else 
    {
      int res;
2176
      if ((res= set_primary_key_from_old_data(op, old_data)))
2177 2178
	DBUG_RETURN(res);
    }
unknown's avatar
unknown committed
2179 2180 2181 2182 2183 2184
  }

  // Set non-key attribute(s)
  for (i= 0; i < table->fields; i++) 
  {
    Field *field= table->field[i];
2185
    if (((thd->query_id == field->query_id) || m_retrieve_all_fields) &&
unknown's avatar
unknown committed
2186 2187 2188 2189
        (!(field->flags & PRI_KEY_FLAG)) &&
	set_ndb_value(op, field, i))
      ERR_RETURN(op->getNdbError());
  }
2190

unknown's avatar
unknown committed
2191
  // Execute update operation
unknown's avatar
unknown committed
2192
  if (!cursor && execute_no_commit(this,trans) != 0) {
2193
    no_uncommitted_rows_execute_failure();
unknown's avatar
unknown committed
2194
    DBUG_RETURN(ndb_err(trans));
2195
  }
unknown's avatar
unknown committed
2196 2197 2198 2199 2200 2201 2202 2203 2204 2205 2206
  
  DBUG_RETURN(0);
}


/*
  Delete one record from NDB, using primary key 
*/

int ha_ndbcluster::delete_row(const byte *record)
{
unknown's avatar
unknown committed
2207
  THD *thd= current_thd;
unknown's avatar
unknown committed
2208
  NdbConnection *trans= m_active_trans;
2209
  NdbScanOperation* cursor= m_active_cursor;
unknown's avatar
unknown committed
2210 2211 2212
  NdbOperation *op;
  DBUG_ENTER("delete_row");

unknown's avatar
unknown committed
2213
  statistic_increment(thd->status_var.ha_delete_count,&LOCK_status);
unknown's avatar
unknown committed
2214

2215
  if (cursor)
unknown's avatar
unknown committed
2216
  {
2217
    /*
2218
      We are scanning records and want to delete the record
2219
      that was just found, call deleteTuple on the cursor 
2220
      to take over the lock to a new delete operation
2221 2222 2223 2224
      And thus setting the primary key of the record from 
      the active record in cursor
    */
    DBUG_PRINT("info", ("Calling deleteTuple on cursor"));
2225
    if (cursor->deleteCurrentTuple() != 0)
2226
      ERR_RETURN(trans->getNdbError());     
2227
    m_ops_pending++;
unknown's avatar
unknown committed
2228

2229 2230
    no_uncommitted_rows_update(-1);

2231 2232 2233 2234
    // If deleting from cursor, NoCommit will be handled in next_result
    DBUG_RETURN(0);
  }
  else
unknown's avatar
unknown committed
2235
  {
2236
    
unknown's avatar
unknown committed
2237
    if (!(op=trans->getNdbOperation((const NDBTAB *) m_table)) || 
2238 2239 2240
	op->deleteTuple() != 0)
      ERR_RETURN(trans->getNdbError());
    
2241 2242
    no_uncommitted_rows_update(-1);
    
2243 2244 2245 2246 2247
    if (table->primary_key == MAX_KEY) 
    {
      // This table has no primary key, use "hidden" primary key
      DBUG_PRINT("info", ("Using hidden key"));
      uint no_fields= table->fields;
2248
      const NdbRecAttr* rec= m_value[no_fields].rec;
2249 2250 2251 2252 2253 2254 2255 2256
      DBUG_ASSERT(rec != NULL);
      
      if (set_hidden_key(op, no_fields, rec->aRef()))
	ERR_RETURN(op->getNdbError());
    } 
    else 
    {
      int res;
2257 2258 2259 2260
      if ((res= (m_primary_key_update ?
		 set_primary_key_from_old_data(op, record)
		 : set_primary_key(op))))
	  return res;  
2261
    }
unknown's avatar
unknown committed
2262
  }
2263
  
unknown's avatar
unknown committed
2264
  // Execute delete operation
unknown's avatar
unknown committed
2265
  if (execute_no_commit(this,trans) != 0) {
2266
    no_uncommitted_rows_execute_failure();
unknown's avatar
unknown committed
2267
    DBUG_RETURN(ndb_err(trans));
2268
  }
unknown's avatar
unknown committed
2269 2270
  DBUG_RETURN(0);
}
2271
  
unknown's avatar
unknown committed
2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289
/*
  Unpack a record read from NDB 

  SYNOPSIS
    unpack_record()
    buf			Buffer to store read row

  NOTE
    The data for each row is read directly into the
    destination buffer. This function is primarily 
    called in order to check if any fields should be 
    set to null.
*/

void ha_ndbcluster::unpack_record(byte* buf)
{
  uint row_offset= (uint) (buf - table->record[0]);
  Field **field, **end;
unknown's avatar
unknown committed
2290
  NdbValue *value= m_value;
unknown's avatar
unknown committed
2291
  DBUG_ENTER("unpack_record");
2292

2293
  end= table->field + table->fields;
unknown's avatar
unknown committed
2294 2295 2296 2297 2298 2299 2300
  
  // Set null flag(s)
  bzero(buf, table->null_bytes);
  for (field= table->field, end= field+table->fields;
       field < end;
       field++, value++)
  {
unknown's avatar
unknown committed
2301 2302 2303 2304 2305 2306 2307 2308 2309 2310
    if ((*value).ptr)
    {
      if (! ((*field)->flags & BLOB_FLAG))
      {
        if ((*value).rec->isNULL())
         (*field)->set_null(row_offset);
      }
      else
      {
        NdbBlob* ndb_blob= (*value).blob;
unknown's avatar
unknown committed
2311
        bool isNull= TRUE;
unknown's avatar
unknown committed
2312 2313 2314 2315 2316 2317
        int ret= ndb_blob->getNull(isNull);
        DBUG_ASSERT(ret == 0);
        if (isNull)
         (*field)->set_null(row_offset);
      }
    }
unknown's avatar
unknown committed
2318
  }
2319
  
unknown's avatar
unknown committed
2320 2321 2322 2323 2324 2325
#ifndef DBUG_OFF
  // Read and print all values that was fetched
  if (table->primary_key == MAX_KEY)
  {
    // Table with hidden primary key
    int hidden_no= table->fields;
unknown's avatar
unknown committed
2326
    const NDBTAB *tab= (const NDBTAB *) m_table;
unknown's avatar
unknown committed
2327
    const NDBCOL *hidden_col= tab->getColumn(hidden_no);
2328
    const NdbRecAttr* rec= m_value[hidden_no].rec;
unknown's avatar
unknown committed
2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343
    DBUG_ASSERT(rec);
    DBUG_PRINT("hidden", ("%d: %s \"%llu\"", hidden_no, 
                          hidden_col->getName(), rec->u_64_value()));
  } 
  print_results();
#endif
  DBUG_VOID_RETURN;
}

/*
  Utility function to print/dump the fetched field
 */

void ha_ndbcluster::print_results()
{
unknown's avatar
unknown committed
2344
  const NDBTAB *tab= (const NDBTAB*) m_table;
unknown's avatar
unknown committed
2345 2346 2347
  DBUG_ENTER("print_results");

#ifndef DBUG_OFF
2348

unknown's avatar
unknown committed
2349 2350 2351 2352 2353
  if (!_db_on_)
    DBUG_VOID_RETURN;
  
  for (uint f=0; f<table->fields;f++)
  {
2354 2355
    // Use DBUG_PRINT since DBUG_FILE cannot be filtered out
    char buf[2000];
unknown's avatar
unknown committed
2356 2357
    Field *field;
    const NDBCOL *col;
unknown's avatar
unknown committed
2358
    NdbValue value;
2359
    NdbBlob *ndb_blob;
unknown's avatar
unknown committed
2360

2361
    buf[0] = 0;
unknown's avatar
unknown committed
2362
    if (!(value= m_value[f]).ptr)
unknown's avatar
unknown committed
2363
    {
2364 2365
      my_snprintf(buf, sizeof(buf), "not read");
      goto print_value;
unknown's avatar
unknown committed
2366 2367 2368 2369
    }
    field= table->field[f];
    DBUG_DUMP("field->ptr", (char*)field->ptr, field->pack_length());
    col= tab->getColumn(f);
unknown's avatar
unknown committed
2370 2371

    if (! (field->flags & BLOB_FLAG))
unknown's avatar
unknown committed
2372
    {
2373
      ndb_blob= NULL;
unknown's avatar
unknown committed
2374 2375
      if (value.rec->isNULL())
      {
2376 2377
        my_snprintf(buf, sizeof(buf), "NULL");
        goto print_value;
unknown's avatar
unknown committed
2378 2379 2380 2381 2382
      }
    }
    else
    {
      ndb_blob= value.blob;
unknown's avatar
unknown committed
2383
      bool isNull= TRUE;
unknown's avatar
unknown committed
2384 2385
      ndb_blob->getNull(isNull);
      if (isNull) {
2386 2387
        my_snprintf(buf, sizeof(buf), "NULL");
        goto print_value;
unknown's avatar
unknown committed
2388
      }
unknown's avatar
unknown committed
2389 2390 2391 2392 2393
    }

    switch (col->getType()) {
    case NdbDictionary::Column::Tinyint: {
      char value= *field->ptr;
2394
      my_snprintf(buf, sizeof(buf), "Tinyint %d", value);
unknown's avatar
unknown committed
2395 2396 2397 2398
      break;
    }
    case NdbDictionary::Column::Tinyunsigned: {
      unsigned char value= *field->ptr;
2399
      my_snprintf(buf, sizeof(buf), "Tinyunsigned %u", value);
unknown's avatar
unknown committed
2400 2401 2402 2403
      break;
    }
    case NdbDictionary::Column::Smallint: {
      short value= *field->ptr;
2404
      my_snprintf(buf, sizeof(buf), "Smallint %d", value);
unknown's avatar
unknown committed
2405 2406 2407 2408
      break;
    }
    case NdbDictionary::Column::Smallunsigned: {
      unsigned short value= *field->ptr;
2409
      my_snprintf(buf, sizeof(buf), "Smallunsigned %u", value);
unknown's avatar
unknown committed
2410 2411 2412 2413 2414
      break;
    }
    case NdbDictionary::Column::Mediumint: {
      byte value[3];
      memcpy(value, field->ptr, 3);
2415
      my_snprintf(buf, sizeof(buf), "Mediumint %d,%d,%d", value[0], value[1], value[2]);
unknown's avatar
unknown committed
2416 2417 2418 2419 2420
      break;
    }
    case NdbDictionary::Column::Mediumunsigned: {
      byte value[3];
      memcpy(value, field->ptr, 3);
2421
      my_snprintf(buf, sizeof(buf), "Mediumunsigned %u,%u,%u", value[0], value[1], value[2]);
unknown's avatar
unknown committed
2422 2423 2424
      break;
    }
    case NdbDictionary::Column::Int: {
2425
      my_snprintf(buf, sizeof(buf), "Int %d", value);
unknown's avatar
unknown committed
2426 2427 2428 2429
      break;
    }
    case NdbDictionary::Column::Unsigned: {
      Uint32 value= (Uint32) *field->ptr;
2430
      my_snprintf(buf, sizeof(buf), "Unsigned %u", value);
unknown's avatar
unknown committed
2431 2432 2433 2434
      break;
    }
    case NdbDictionary::Column::Bigint: {
      Int64 value= (Int64) *field->ptr;
2435
      my_snprintf(buf, sizeof(buf), "Bigint %lld", value);
unknown's avatar
unknown committed
2436 2437 2438 2439
      break;
    }
    case NdbDictionary::Column::Bigunsigned: {
      Uint64 value= (Uint64) *field->ptr;
2440
      my_snprintf(buf, sizeof(buf), "Bigunsigned %llu", value);
unknown's avatar
unknown committed
2441 2442 2443 2444
      break;
    }
    case NdbDictionary::Column::Float: {
      float value= (float) *field->ptr;
2445
      my_snprintf(buf, sizeof(buf), "Float %f", (double)value);
unknown's avatar
unknown committed
2446 2447 2448 2449
      break;
    }
    case NdbDictionary::Column::Double: {
      double value= (double) *field->ptr;
2450
      my_snprintf(buf, sizeof(buf), "Double %f", value);
unknown's avatar
unknown committed
2451 2452 2453 2454
      break;
    }
    case NdbDictionary::Column::Decimal: {
      char *value= field->ptr;
2455
      my_snprintf(buf, sizeof(buf), "Decimal '%-*s'", field->pack_length(), value);
unknown's avatar
unknown committed
2456 2457 2458
      break;
    }
    case NdbDictionary::Column::Char:{
2459
      const char *value= (char *) field->ptr;
2460
      my_snprintf(buf, sizeof(buf), "Char '%.*s'", field->pack_length(), value);
unknown's avatar
unknown committed
2461 2462 2463 2464 2465
      break;
    }
    case NdbDictionary::Column::Varchar:
    case NdbDictionary::Column::Binary:
    case NdbDictionary::Column::Varbinary: {
2466
      const char *value= (char *) field->ptr;
2467
      my_snprintf(buf, sizeof(buf), "Var '%.*s'", field->pack_length(), value);
unknown's avatar
unknown committed
2468 2469
      break;
    }
unknown's avatar
unknown committed
2470 2471
    case NdbDictionary::Column::Bit: {
      const char *value= (char *) field->ptr;
2472
      my_snprintf(buf, sizeof(buf), "Bit '%.*s'", field->pack_length(), value);
unknown's avatar
unknown committed
2473 2474
      break;
    }
unknown's avatar
unknown committed
2475 2476
    case NdbDictionary::Column::Datetime: {
      Uint64 value= (Uint64) *field->ptr;
2477
      my_snprintf(buf, sizeof(buf), "Datetime %llu", value);
unknown's avatar
unknown committed
2478 2479 2480 2481
      break;
    }
    case NdbDictionary::Column::Timespec: {
      Uint64 value= (Uint64) *field->ptr;
2482
      my_snprintf(buf, sizeof(buf), "Timespec %llu", value);
unknown's avatar
unknown committed
2483 2484
      break;
    }
unknown's avatar
unknown committed
2485 2486 2487
    case NdbDictionary::Column::Blob: {
      Uint64 len= 0;
      ndb_blob->getLength(len);
2488
      my_snprintf(buf, sizeof(buf), "Blob [len=%u]", (unsigned)len);
unknown's avatar
unknown committed
2489 2490 2491 2492 2493
      break;
    }
    case NdbDictionary::Column::Text: {
      Uint64 len= 0;
      ndb_blob->getLength(len);
2494
      my_snprintf(buf, sizeof(buf), "Text [len=%u]", (unsigned)len);
unknown's avatar
unknown committed
2495 2496 2497
      break;
    }
    case NdbDictionary::Column::Undefined:
2498
      my_snprintf(buf, sizeof(buf), "Unknown type: %d", col->getType());
unknown's avatar
unknown committed
2499
      break;
unknown's avatar
unknown committed
2500
    }
2501 2502 2503

print_value:
    DBUG_PRINT("value", ("%u,%s: %s", f, col->getName(), buf));
unknown's avatar
unknown committed
2504 2505 2506 2507 2508 2509 2510 2511 2512 2513 2514 2515 2516 2517 2518 2519 2520
  }
#endif
  DBUG_VOID_RETURN;
}


int ha_ndbcluster::index_init(uint index)
{
  DBUG_ENTER("index_init");
  DBUG_PRINT("enter", ("index: %u", index));
  DBUG_RETURN(handler::index_init(index));
}


int ha_ndbcluster::index_end()
{
  DBUG_ENTER("index_end");
2521
  DBUG_RETURN(close_scan());
unknown's avatar
unknown committed
2522 2523
}

2524 2525 2526 2527 2528 2529 2530 2531
/**
 * Check if key contains null
 */
static
int
check_null_in_key(const KEY* key_info, const byte *key, uint key_len)
{
  KEY_PART_INFO *curr_part, *end_part;
2532
  const byte* end_ptr= key + key_len;
2533 2534 2535 2536 2537 2538 2539 2540 2541 2542 2543 2544 2545
  curr_part= key_info->key_part;
  end_part= curr_part + key_info->key_parts;
  

  for (; curr_part != end_part && key < end_ptr; curr_part++)
  {
    if(curr_part->null_bit && *key)
      return 1;

    key += curr_part->store_length;
  }
  return 0;
}
unknown's avatar
unknown committed
2546 2547

int ha_ndbcluster::index_read(byte *buf,
2548 2549
			      const byte *key, uint key_len, 
			      enum ha_rkey_function find_flag)
unknown's avatar
unknown committed
2550 2551 2552 2553 2554
{
  DBUG_ENTER("index_read");
  DBUG_PRINT("enter", ("active_index: %u, key_len: %u, find_flag: %d", 
                       active_index, key_len, find_flag));

unknown's avatar
unknown committed
2555
  int error;
2556 2557
  ndb_index_type type= get_index_type(active_index);
  const KEY* key_info= table->key_info+active_index;
unknown's avatar
unknown committed
2558 2559 2560 2561 2562
  switch (type){
  case PRIMARY_KEY_ORDERED_INDEX:
  case PRIMARY_KEY_INDEX:
    if (find_flag == HA_READ_KEY_EXACT && key_info->key_length == key_len)
    {
2563 2564
      if(m_active_cursor && (error= close_scan()))
	DBUG_RETURN(error);
unknown's avatar
unknown committed
2565 2566 2567 2568 2569 2570 2571 2572 2573
      DBUG_RETURN(pk_read(key, key_len, buf));
    }
    else if (type == PRIMARY_KEY_INDEX)
    {
      DBUG_RETURN(1);
    }
    break;
  case UNIQUE_ORDERED_INDEX:
  case UNIQUE_INDEX:
2574 2575
    if (find_flag == HA_READ_KEY_EXACT && key_info->key_length == key_len &&
	!check_null_in_key(key_info, key, key_len))
unknown's avatar
unknown committed
2576
    {
2577 2578
      if(m_active_cursor && (error= close_scan()))
	DBUG_RETURN(error);
unknown's avatar
unknown committed
2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589
      DBUG_RETURN(unique_index_read(key, key_len, buf));
    }
    else if (type == UNIQUE_INDEX)
    {
      DBUG_RETURN(1);
    }
    break;
  case ORDERED_INDEX:
    break;
  default:
  case UNDEFINED_INDEX:
unknown's avatar
unknown committed
2590
    DBUG_ASSERT(FALSE);
2591
    DBUG_RETURN(1);
unknown's avatar
unknown committed
2592 2593 2594
    break;
  }
  
2595
  key_range start_key;
2596 2597 2598
  start_key.key= key;
  start_key.length= key_len;
  start_key.flag= find_flag;
unknown's avatar
unknown committed
2599
  error= ordered_index_scan(&start_key, 0, TRUE, buf);  
unknown's avatar
unknown committed
2600
  DBUG_RETURN(error == HA_ERR_END_OF_FILE ? HA_ERR_KEY_NOT_FOUND : error);
unknown's avatar
unknown committed
2601 2602 2603 2604 2605 2606 2607
}


int ha_ndbcluster::index_read_idx(byte *buf, uint index_no, 
			      const byte *key, uint key_len, 
			      enum ha_rkey_function find_flag)
{
unknown's avatar
unknown committed
2608
  statistic_increment(current_thd->status_var.ha_read_key_count, &LOCK_status);
unknown's avatar
unknown committed
2609 2610 2611 2612 2613 2614 2615 2616 2617 2618 2619
  DBUG_ENTER("index_read_idx");
  DBUG_PRINT("enter", ("index_no: %u, key_len: %u", index_no, key_len));  
  index_init(index_no);  
  DBUG_RETURN(index_read(buf, key, key_len, find_flag));
}


int ha_ndbcluster::index_next(byte *buf)
{
  DBUG_ENTER("index_next");

2620
  int error= 1;
unknown's avatar
unknown committed
2621 2622
  statistic_increment(current_thd->status_var.ha_read_next_count,
		      &LOCK_status);
2623
  DBUG_RETURN(next_result(buf));
unknown's avatar
unknown committed
2624 2625 2626 2627 2628 2629
}


int ha_ndbcluster::index_prev(byte *buf)
{
  DBUG_ENTER("index_prev");
unknown's avatar
unknown committed
2630 2631
  statistic_increment(current_thd->status_var.ha_read_prev_count,
		      &LOCK_status);
unknown's avatar
unknown committed
2632 2633 2634 2635 2636 2637 2638
  DBUG_RETURN(1);
}


int ha_ndbcluster::index_first(byte *buf)
{
  DBUG_ENTER("index_first");
unknown's avatar
unknown committed
2639 2640
  statistic_increment(current_thd->status_var.ha_read_first_count,
		      &LOCK_status);
unknown's avatar
unknown committed
2641 2642 2643
  // Start the ordered index scan and fetch the first row

  // Only HA_READ_ORDER indexes get called by index_first
unknown's avatar
unknown committed
2644
  DBUG_RETURN(ordered_index_scan(0, 0, TRUE, buf));
unknown's avatar
unknown committed
2645 2646 2647 2648 2649 2650
}


int ha_ndbcluster::index_last(byte *buf)
{
  DBUG_ENTER("index_last");
2651
  statistic_increment(current_thd->status_var.ha_read_last_count,&LOCK_status);
unknown's avatar
unknown committed
2652
  int res;
unknown's avatar
unknown committed
2653
  if((res= ordered_index_scan(0, 0, TRUE, buf)) == 0){
2654
    NdbScanOperation *cursor= m_active_cursor; 
unknown's avatar
unknown committed
2655
    while((res= cursor->nextResult(TRUE, m_force_send)) == 0);
unknown's avatar
unknown committed
2656 2657 2658 2659 2660 2661
    if(res == 1){
      unpack_record(buf);
      table->status= 0;     
      DBUG_RETURN(0);
    }
  }
unknown's avatar
unknown committed
2662
  DBUG_RETURN(res);
unknown's avatar
unknown committed
2663 2664 2665
}


2666 2667 2668 2669 2670
inline
int ha_ndbcluster::read_range_first_to_buf(const key_range *start_key,
					   const key_range *end_key,
					   bool eq_range, bool sorted,
					   byte* buf)
2671
{
2672
  KEY* key_info;
2673 2674
  int error= 1; 
  DBUG_ENTER("ha_ndbcluster::read_range_first_to_buf");
2675
  DBUG_PRINT("info", ("eq_range: %d, sorted: %d", eq_range, sorted));
2676

2677
  switch (get_index_type(active_index)){
2678
  case PRIMARY_KEY_ORDERED_INDEX:
2679
  case PRIMARY_KEY_INDEX:
2680 2681 2682 2683
    key_info= table->key_info + active_index;
    if (start_key && 
	start_key->length == key_info->key_length &&
	start_key->flag == HA_READ_KEY_EXACT)
2684
    {
2685 2686
      if(m_active_cursor && (error= close_scan()))
	DBUG_RETURN(error);
2687 2688 2689
      error= pk_read(start_key->key, start_key->length, buf);      
      DBUG_RETURN(error == HA_ERR_KEY_NOT_FOUND ? HA_ERR_END_OF_FILE : error);
    }
2690
    break;
2691
  case UNIQUE_ORDERED_INDEX:
2692
  case UNIQUE_INDEX:
2693
    key_info= table->key_info + active_index;
2694 2695 2696
    if (start_key && start_key->length == key_info->key_length &&
	start_key->flag == HA_READ_KEY_EXACT && 
	!check_null_in_key(key_info, start_key->key, start_key->length))
2697
    {
2698 2699
      if(m_active_cursor && (error= close_scan()))
	DBUG_RETURN(error);
2700 2701 2702
      error= unique_index_read(start_key->key, start_key->length, buf);
      DBUG_RETURN(error == HA_ERR_KEY_NOT_FOUND ? HA_ERR_END_OF_FILE : error);
    }
2703 2704 2705 2706
    break;
  default:
    break;
  }
2707 2708

  // Start the ordered index scan and fetch the first row
unknown's avatar
unknown committed
2709
  error= ordered_index_scan(start_key, end_key, sorted, buf);
2710 2711 2712
  DBUG_RETURN(error);
}

2713

unknown's avatar
unknown committed
2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726 2727
int ha_ndbcluster::read_range_first(const key_range *start_key,
				    const key_range *end_key,
				    bool eq_range, bool sorted)
{
  byte* buf= table->record[0];
  DBUG_ENTER("ha_ndbcluster::read_range_first");
  
  DBUG_RETURN(read_range_first_to_buf(start_key,
				      end_key,
				      eq_range, 
				      sorted,
				      buf));
}

2728
int ha_ndbcluster::read_range_next()
2729 2730 2731 2732 2733 2734
{
  DBUG_ENTER("ha_ndbcluster::read_range_next");
  DBUG_RETURN(next_result(table->record[0]));
}


unknown's avatar
unknown committed
2735 2736
int ha_ndbcluster::rnd_init(bool scan)
{
2737
  NdbScanOperation *cursor= m_active_cursor;
unknown's avatar
unknown committed
2738 2739
  DBUG_ENTER("rnd_init");
  DBUG_PRINT("enter", ("scan: %d", scan));
2740
  // Check if scan is to be restarted
unknown's avatar
unknown committed
2741 2742 2743 2744
  if (cursor)
  {
    if (!scan)
      DBUG_RETURN(1);
unknown's avatar
unknown committed
2745
    int res= cursor->restart(m_force_send);
unknown's avatar
unknown committed
2746
    DBUG_ASSERT(res == 0);
unknown's avatar
unknown committed
2747
  }
unknown's avatar
unknown committed
2748 2749 2750 2751
  index_init(table->primary_key);
  DBUG_RETURN(0);
}

2752 2753
int ha_ndbcluster::close_scan()
{
unknown's avatar
unknown committed
2754
  NdbConnection *trans= m_active_trans;
2755 2756
  DBUG_ENTER("close_scan");

unknown's avatar
unknown committed
2757 2758
  m_multi_cursor= 0;
  if (!m_active_cursor && !m_multi_cursor)
2759 2760
    DBUG_RETURN(1);

unknown's avatar
unknown committed
2761
  NdbScanOperation *cursor= m_active_cursor ? m_active_cursor : m_multi_cursor;
unknown's avatar
unknown committed
2762
  
2763
  if (m_ops_pending)
unknown's avatar
unknown committed
2764 2765 2766 2767 2768
  {
    /*
      Take over any pending transactions to the 
      deleteing/updating transaction before closing the scan    
    */
2769
    DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending));    
unknown's avatar
unknown committed
2770
    if (execute_no_commit(this,trans) != 0) {
2771
      no_uncommitted_rows_execute_failure();
unknown's avatar
unknown committed
2772
      DBUG_RETURN(ndb_err(trans));
2773
    }
2774
    m_ops_pending= 0;
unknown's avatar
unknown committed
2775 2776
  }
  
unknown's avatar
unknown committed
2777
  cursor->close(m_force_send);
unknown's avatar
unknown committed
2778
  m_active_cursor= m_multi_cursor= NULL;
unknown's avatar
unknown committed
2779
  DBUG_RETURN(0);
2780
}
unknown's avatar
unknown committed
2781 2782 2783 2784

int ha_ndbcluster::rnd_end()
{
  DBUG_ENTER("rnd_end");
2785
  DBUG_RETURN(close_scan());
unknown's avatar
unknown committed
2786 2787 2788 2789 2790 2791
}


int ha_ndbcluster::rnd_next(byte *buf)
{
  DBUG_ENTER("rnd_next");
unknown's avatar
unknown committed
2792 2793
  statistic_increment(current_thd->status_var.ha_read_rnd_next_count,
		      &LOCK_status);
2794

unknown's avatar
unknown committed
2795
  if (!m_active_cursor)
2796 2797
    DBUG_RETURN(full_table_scan(buf));
  DBUG_RETURN(next_result(buf));
unknown's avatar
unknown committed
2798 2799 2800 2801 2802 2803 2804 2805 2806 2807 2808 2809 2810
}


/*
  An "interesting" record has been found and it's pk 
  retrieved by calling position
  Now it's time to read the record from db once 
  again
*/

int ha_ndbcluster::rnd_pos(byte *buf, byte *pos)
{
  DBUG_ENTER("rnd_pos");
unknown's avatar
unknown committed
2811 2812
  statistic_increment(current_thd->status_var.ha_read_rnd_count,
		      &LOCK_status);
unknown's avatar
unknown committed
2813 2814 2815 2816 2817 2818 2819 2820 2821 2822 2823 2824 2825 2826 2827 2828 2829 2830 2831 2832 2833 2834 2835 2836 2837 2838 2839 2840 2841 2842 2843 2844 2845 2846 2847 2848 2849 2850 2851 2852 2853 2854 2855 2856 2857 2858 2859
  // The primary key for the record is stored in pos
  // Perform a pk_read using primary key "index"
  DBUG_RETURN(pk_read(pos, ref_length, buf));  
}


/*
  Store the primary key of this record in ref 
  variable, so that the row can be retrieved again later
  using "reference" in rnd_pos
*/

void ha_ndbcluster::position(const byte *record)
{
  KEY *key_info;
  KEY_PART_INFO *key_part;
  KEY_PART_INFO *end;
  byte *buff;
  DBUG_ENTER("position");

  if (table->primary_key != MAX_KEY) 
  {
    key_info= table->key_info + table->primary_key;
    key_part= key_info->key_part;
    end= key_part + key_info->key_parts;
    buff= ref;
    
    for (; key_part != end; key_part++) 
    {
      if (key_part->null_bit) {
        /* Store 0 if the key part is a NULL part */      
        if (record[key_part->null_offset]
            & key_part->null_bit) {
          *buff++= 1;
          continue;
        }      
        *buff++= 0;
      }
      memcpy(buff, record + key_part->offset, key_part->length);
      buff += key_part->length;
    }
  } 
  else 
  {
    // No primary key, get hidden key
    DBUG_PRINT("info", ("Getting hidden key"));
    int hidden_no= table->fields;
2860
    const NdbRecAttr* rec= m_value[hidden_no].rec;
unknown's avatar
unknown committed
2861
    const NDBTAB *tab= (const NDBTAB *) m_table;  
unknown's avatar
unknown committed
2862 2863 2864 2865 2866 2867 2868 2869 2870 2871 2872 2873 2874 2875 2876 2877 2878 2879 2880 2881 2882 2883 2884 2885 2886
    const NDBCOL *hidden_col= tab->getColumn(hidden_no);
    DBUG_ASSERT(hidden_col->getPrimaryKey() && 
                hidden_col->getAutoIncrement() &&
                rec != NULL && 
                ref_length == NDB_HIDDEN_PRIMARY_KEY_LENGTH);
    memcpy(ref, (const void*)rec->aRef(), ref_length);
  }
  
  DBUG_DUMP("ref", (char*)ref, ref_length);
  DBUG_VOID_RETURN;
}


void ha_ndbcluster::info(uint flag)
{
  DBUG_ENTER("info");
  DBUG_PRINT("enter", ("flag: %d", flag));
  
  if (flag & HA_STATUS_POS)
    DBUG_PRINT("info", ("HA_STATUS_POS"));
  if (flag & HA_STATUS_NO_LOCK)
    DBUG_PRINT("info", ("HA_STATUS_NO_LOCK"));
  if (flag & HA_STATUS_TIME)
    DBUG_PRINT("info", ("HA_STATUS_TIME"));
  if (flag & HA_STATUS_VARIABLE)
2887
  {
unknown's avatar
unknown committed
2888
    DBUG_PRINT("info", ("HA_STATUS_VARIABLE"));
2889 2890
    if (m_table_info)
    {
2891 2892 2893 2894
      if (m_ha_not_exact_count)
	records= 100;
      else
	records_update();
2895 2896 2897
    }
    else
    {
2898 2899 2900 2901
      Uint64 rows= 100;
      if (current_thd->variables.ndb_use_exact_count)
	ndb_get_table_statistics(m_ndb, m_tabname, &rows, 0);
      records= rows;
2902
    }
2903
  }
unknown's avatar
unknown committed
2904 2905 2906 2907 2908
  if (flag & HA_STATUS_CONST)
  {
    DBUG_PRINT("info", ("HA_STATUS_CONST"));
    set_rec_per_key();
  }
unknown's avatar
unknown committed
2909
  if (flag & HA_STATUS_ERRKEY)
2910
  {
unknown's avatar
unknown committed
2911
    DBUG_PRINT("info", ("HA_STATUS_ERRKEY"));
2912
    errkey= m_dupkey;
2913
  }
unknown's avatar
unknown committed
2914 2915 2916 2917 2918 2919 2920 2921 2922 2923 2924 2925 2926 2927 2928 2929 2930 2931
  if (flag & HA_STATUS_AUTO)
    DBUG_PRINT("info", ("HA_STATUS_AUTO"));
  DBUG_VOID_RETURN;
}


int ha_ndbcluster::extra(enum ha_extra_function operation)
{
  DBUG_ENTER("extra");
  switch (operation) {
  case HA_EXTRA_NORMAL:              /* Optimize for space (def) */
    DBUG_PRINT("info", ("HA_EXTRA_NORMAL"));
    break;
  case HA_EXTRA_QUICK:                 /* Optimize for speed */
    DBUG_PRINT("info", ("HA_EXTRA_QUICK"));
    break;
  case HA_EXTRA_RESET:                 /* Reset database to after open */
    DBUG_PRINT("info", ("HA_EXTRA_RESET"));
2932 2933
    DBUG_PRINT("info", ("Clearing condition stack"));
    cond_clear();
unknown's avatar
unknown committed
2934 2935 2936 2937 2938 2939 2940 2941 2942 2943 2944 2945 2946 2947 2948 2949 2950 2951 2952 2953 2954 2955 2956 2957 2958 2959 2960 2961 2962 2963 2964 2965 2966 2967 2968 2969 2970 2971 2972 2973 2974 2975 2976 2977 2978 2979 2980 2981 2982 2983 2984 2985 2986 2987 2988 2989 2990 2991 2992 2993 2994 2995 2996 2997 2998 2999 3000 3001 3002
    break;
  case HA_EXTRA_CACHE:                 /* Cash record in HA_rrnd() */
    DBUG_PRINT("info", ("HA_EXTRA_CACHE"));
    break;
  case HA_EXTRA_NO_CACHE:              /* End cacheing of records (def) */
    DBUG_PRINT("info", ("HA_EXTRA_NO_CACHE"));
    break;
  case HA_EXTRA_NO_READCHECK:          /* No readcheck on update */
    DBUG_PRINT("info", ("HA_EXTRA_NO_READCHECK"));
    break;
  case HA_EXTRA_READCHECK:             /* Use readcheck (def) */
    DBUG_PRINT("info", ("HA_EXTRA_READCHECK"));
    break;
  case HA_EXTRA_KEYREAD:               /* Read only key to database */
    DBUG_PRINT("info", ("HA_EXTRA_KEYREAD"));
    break;
  case HA_EXTRA_NO_KEYREAD:            /* Normal read of records (def) */
    DBUG_PRINT("info", ("HA_EXTRA_NO_KEYREAD"));
    break;
  case HA_EXTRA_NO_USER_CHANGE:        /* No user is allowed to write */
    DBUG_PRINT("info", ("HA_EXTRA_NO_USER_CHANGE"));
    break;
  case HA_EXTRA_KEY_CACHE:
    DBUG_PRINT("info", ("HA_EXTRA_KEY_CACHE"));
    break;
  case HA_EXTRA_NO_KEY_CACHE:
    DBUG_PRINT("info", ("HA_EXTRA_NO_KEY_CACHE"));
    break;
  case HA_EXTRA_WAIT_LOCK:            /* Wait until file is avalably (def) */
    DBUG_PRINT("info", ("HA_EXTRA_WAIT_LOCK"));
    break;
  case HA_EXTRA_NO_WAIT_LOCK:         /* If file is locked, return quickly */
    DBUG_PRINT("info", ("HA_EXTRA_NO_WAIT_LOCK"));
    break;
  case HA_EXTRA_WRITE_CACHE:           /* Use write cache in ha_write() */
    DBUG_PRINT("info", ("HA_EXTRA_WRITE_CACHE"));
    break;
  case HA_EXTRA_FLUSH_CACHE:           /* flush write_record_cache */
    DBUG_PRINT("info", ("HA_EXTRA_FLUSH_CACHE"));
    break;
  case HA_EXTRA_NO_KEYS:               /* Remove all update of keys */
    DBUG_PRINT("info", ("HA_EXTRA_NO_KEYS"));
    break;
  case HA_EXTRA_KEYREAD_CHANGE_POS:         /* Keyread, but change pos */
    DBUG_PRINT("info", ("HA_EXTRA_KEYREAD_CHANGE_POS")); /* xxxxchk -r must be used */
    break;                                  
  case HA_EXTRA_REMEMBER_POS:          /* Remember pos for next/prev */
    DBUG_PRINT("info", ("HA_EXTRA_REMEMBER_POS"));
    break;
  case HA_EXTRA_RESTORE_POS:
    DBUG_PRINT("info", ("HA_EXTRA_RESTORE_POS"));
    break;
  case HA_EXTRA_REINIT_CACHE:          /* init cache from current record */
    DBUG_PRINT("info", ("HA_EXTRA_REINIT_CACHE"));
    break;
  case HA_EXTRA_FORCE_REOPEN:          /* Datafile have changed on disk */
    DBUG_PRINT("info", ("HA_EXTRA_FORCE_REOPEN"));
    break;
  case HA_EXTRA_FLUSH:                 /* Flush tables to disk */
    DBUG_PRINT("info", ("HA_EXTRA_FLUSH"));
    break;
  case HA_EXTRA_NO_ROWS:               /* Don't write rows */
    DBUG_PRINT("info", ("HA_EXTRA_NO_ROWS"));
    break;
  case HA_EXTRA_RESET_STATE:           /* Reset positions */
    DBUG_PRINT("info", ("HA_EXTRA_RESET_STATE"));
    break;
  case HA_EXTRA_IGNORE_DUP_KEY:       /* Dup keys don't rollback everything*/
    DBUG_PRINT("info", ("HA_EXTRA_IGNORE_DUP_KEY"));
3003 3004 3005 3006 3007 3008
    if (current_thd->lex->sql_command == SQLCOM_REPLACE)
    {
      DBUG_PRINT("info", ("Turning ON use of write instead of insert"));
      m_use_write= TRUE;
    } else 
    {
3009 3010
      DBUG_PRINT("info", ("Ignoring duplicate key"));
      m_ignore_dup_key= TRUE;
3011
    }
unknown's avatar
unknown committed
3012 3013 3014 3015
    break;
  case HA_EXTRA_NO_IGNORE_DUP_KEY:
    DBUG_PRINT("info", ("HA_EXTRA_NO_IGNORE_DUP_KEY"));
    DBUG_PRINT("info", ("Turning OFF use of write instead of insert"));
unknown's avatar
unknown committed
3016
    m_use_write= FALSE;
3017
    m_ignore_dup_key= FALSE;
unknown's avatar
unknown committed
3018 3019 3020 3021 3022
    break;
  case HA_EXTRA_RETRIEVE_ALL_COLS:    /* Retrieve all columns, not just those
					 where field->query_id is the same as
					 the current query id */
    DBUG_PRINT("info", ("HA_EXTRA_RETRIEVE_ALL_COLS"));
3023
    m_retrieve_all_fields= TRUE;
unknown's avatar
unknown committed
3024 3025 3026 3027 3028 3029 3030 3031 3032 3033 3034 3035
    break;
  case HA_EXTRA_PREPARE_FOR_DELETE:
    DBUG_PRINT("info", ("HA_EXTRA_PREPARE_FOR_DELETE"));
    break;
  case HA_EXTRA_PREPARE_FOR_UPDATE:     /* Remove read cache if problems */
    DBUG_PRINT("info", ("HA_EXTRA_PREPARE_FOR_UPDATE"));
    break;
  case HA_EXTRA_PRELOAD_BUFFER_SIZE: 
    DBUG_PRINT("info", ("HA_EXTRA_PRELOAD_BUFFER_SIZE"));
    break;
  case HA_EXTRA_RETRIEVE_PRIMARY_KEY: 
    DBUG_PRINT("info", ("HA_EXTRA_RETRIEVE_PRIMARY_KEY"));
3036
    m_retrieve_primary_key= TRUE;
unknown's avatar
unknown committed
3037 3038 3039 3040 3041 3042
    break;
  case HA_EXTRA_CHANGE_KEY_TO_UNIQUE: 
    DBUG_PRINT("info", ("HA_EXTRA_CHANGE_KEY_TO_UNIQUE"));
    break;
  case HA_EXTRA_CHANGE_KEY_TO_DUP: 
    DBUG_PRINT("info", ("HA_EXTRA_CHANGE_KEY_TO_DUP"));
3043 3044
  case HA_EXTRA_KEYREAD_PRESERVE_FIELDS:
    DBUG_PRINT("info", ("HA_EXTRA_KEYREAD_PRESERVE_FIELDS"));
unknown's avatar
unknown committed
3045 3046 3047 3048 3049 3050 3051
    break;

  }
  
  DBUG_RETURN(0);
}

3052 3053 3054 3055 3056 3057 3058 3059 3060 3061 3062 3063 3064
/* 
   Start of an insert, remember number of rows to be inserted, it will
   be used in write_row and get_autoincrement to send an optimal number
   of rows in each roundtrip to the server

   SYNOPSIS
   rows     number of rows to insert, 0 if unknown

*/

void ha_ndbcluster::start_bulk_insert(ha_rows rows)
{
  int bytes, batch;
unknown's avatar
unknown committed
3065
  const NDBTAB *tab= (const NDBTAB *) m_table;    
3066 3067

  DBUG_ENTER("start_bulk_insert");
unknown's avatar
unknown committed
3068
  DBUG_PRINT("enter", ("rows: %d", (int)rows));
3069
  
3070 3071
  m_rows_inserted= 0;
  m_rows_to_insert= rows; 
3072 3073 3074 3075 3076 3077 3078 3079

  /* 
    Calculate how many rows that should be inserted
    per roundtrip to NDB. This is done in order to minimize the 
    number of roundtrips as much as possible. However performance will 
    degrade if too many bytes are inserted, thus it's limited by this 
    calculation.   
  */
3080
  const int bytesperbatch= 8192;
3081
  bytes= 12 + tab->getRowSizeInBytes() + 4 * tab->getNoOfColumns();
3082
  batch= bytesperbatch/bytes;
3083 3084
  batch= batch == 0 ? 1 : batch;
  DBUG_PRINT("info", ("batch: %d, bytes: %d", batch, bytes));
3085
  m_bulk_insert_rows= batch;
3086 3087 3088 3089 3090 3091 3092 3093 3094

  DBUG_VOID_RETURN;
}

/*
  End of an insert
 */
int ha_ndbcluster::end_bulk_insert()
{
3095 3096
  int error= 0;

3097
  DBUG_ENTER("end_bulk_insert");
3098
  // Check if last inserts need to be flushed
3099
  if (m_bulk_insert_not_flushed)
3100 3101 3102 3103 3104
  {
    NdbConnection *trans= m_active_trans;
    // Send rows to NDB
    DBUG_PRINT("info", ("Sending inserts to NDB, "\
                        "rows_inserted:%d, bulk_insert_rows: %d", 
3105 3106
                        m_rows_inserted, m_bulk_insert_rows)); 
    m_bulk_insert_not_flushed= FALSE;
unknown's avatar
unknown committed
3107
    if (execute_no_commit(this,trans) != 0) {
3108
      no_uncommitted_rows_execute_failure();
3109
      my_errno= error= ndb_err(trans);
3110
    }
3111 3112
  }

3113 3114
  m_rows_inserted= 0;
  m_rows_to_insert= 1;
3115
  DBUG_RETURN(error);
3116 3117
}

unknown's avatar
unknown committed
3118 3119 3120 3121

int ha_ndbcluster::extra_opt(enum ha_extra_function operation, ulong cache_size)
{
  DBUG_ENTER("extra_opt");
unknown's avatar
unknown committed
3122
  DBUG_PRINT("enter", ("cache_size: %lu", cache_size));
unknown's avatar
unknown committed
3123 3124 3125 3126 3127
  DBUG_RETURN(extra(operation));
}


const char **ha_ndbcluster::bas_ext() const
3128
{ static const char *ext[]= { ha_ndb_ext, NullS }; return ext; }
unknown's avatar
unknown committed
3129 3130 3131 3132 3133 3134 3135 3136 3137 3138


/*
  How many seeks it will take to read through the table
  This is to be comparable to the number returned by records_in_range so
  that we can decide if we should scan the table or use keys.
*/

double ha_ndbcluster::scan_time()
{
unknown's avatar
unknown committed
3139 3140 3141 3142 3143
  DBUG_ENTER("ha_ndbcluster::scan_time()");
  double res= rows2double(records*1000);
  DBUG_PRINT("exit", ("table: %s value: %f", 
		      m_tabname, res));
  DBUG_RETURN(res);
unknown's avatar
unknown committed
3144 3145 3146 3147 3148 3149 3150 3151 3152 3153
}


THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd,
                                          THR_LOCK_DATA **to,
                                          enum thr_lock_type lock_type)
{
  DBUG_ENTER("store_lock");
  if (lock_type != TL_IGNORE && m_lock.type == TL_UNLOCK) 
  {
unknown's avatar
unknown committed
3154

unknown's avatar
unknown committed
3155 3156 3157
    /* If we are not doing a LOCK TABLE, then allow multiple
       writers */
    
3158 3159 3160
    /* Since NDB does not currently have table locks
       this is treated as a ordinary lock */

unknown's avatar
unknown committed
3161
    if ((lock_type >= TL_WRITE_ALLOW_WRITE &&
unknown's avatar
unknown committed
3162 3163 3164 3165 3166 3167 3168 3169 3170 3171 3172 3173 3174 3175 3176
         lock_type <= TL_WRITE) && !thd->in_lock_tables)      
      lock_type= TL_WRITE_ALLOW_WRITE;
    
    /* In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
       MySQL would use the lock TL_READ_NO_INSERT on t2, and that
       would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
       to t2. Convert the lock to a normal read lock to allow
       concurrent inserts to t2. */
    
    if (lock_type == TL_READ_NO_INSERT && !thd->in_lock_tables)
      lock_type= TL_READ;
    
    m_lock.type=lock_type;
  }
  *to++= &m_lock;
3177 3178

  DBUG_PRINT("exit", ("lock_type: %d", lock_type));
unknown's avatar
unknown committed
3179 3180 3181 3182 3183 3184 3185 3186 3187 3188 3189 3190 3191 3192 3193 3194 3195 3196 3197 3198 3199 3200 3201 3202 3203 3204 3205 3206 3207 3208 3209 3210 3211 3212 3213 3214 3215 3216 3217 3218 3219 3220 3221
  
  DBUG_RETURN(to);
}

#ifndef DBUG_OFF
#define PRINT_OPTION_FLAGS(t) { \
      if (t->options & OPTION_NOT_AUTOCOMMIT) \
        DBUG_PRINT("thd->options", ("OPTION_NOT_AUTOCOMMIT")); \
      if (t->options & OPTION_BEGIN) \
        DBUG_PRINT("thd->options", ("OPTION_BEGIN")); \
      if (t->options & OPTION_TABLE_LOCK) \
        DBUG_PRINT("thd->options", ("OPTION_TABLE_LOCK")); \
}
#else
#define PRINT_OPTION_FLAGS(t)
#endif


/*
  As MySQL will execute an external lock for every new table it uses
  we can use this to start the transactions.
  If we are in auto_commit mode we just need to start a transaction
  for the statement, this will be stored in transaction.stmt.
  If not, we have to start a master transaction if there doesn't exist
  one from before, this will be stored in transaction.all
 
  When a table lock is held one transaction will be started which holds
  the table lock and for each statement a hupp transaction will be started  
 */

int ha_ndbcluster::external_lock(THD *thd, int lock_type)
{
  int error=0;
  NdbConnection* trans= NULL;

  DBUG_ENTER("external_lock");
  /*
    Check that this handler instance has a connection
    set up to the Ndb object of thd
   */
  if (check_ndb_connection())
    DBUG_RETURN(1);
 
3222 3223 3224 3225 3226
  Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;

  DBUG_PRINT("enter", ("transaction.thd_ndb->lock_count: %d", 
                       thd_ndb->lock_count));

unknown's avatar
unknown committed
3227 3228
  if (lock_type != F_UNLCK)
  {
3229
    DBUG_PRINT("info", ("lock_type != F_UNLCK"));
3230
    if (!thd_ndb->lock_count++)
unknown's avatar
unknown committed
3231 3232 3233 3234 3235 3236 3237 3238 3239 3240 3241 3242
    {
      PRINT_OPTION_FLAGS(thd);

      if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN | OPTION_TABLE_LOCK))) 
      {
        // Autocommit transaction
        DBUG_ASSERT(!thd->transaction.stmt.ndb_tid);
        DBUG_PRINT("trans",("Starting transaction stmt"));      

        trans= m_ndb->startTransaction();
        if (trans == NULL)
          ERR_RETURN(m_ndb->getNdbError());
3243
	no_uncommitted_rows_reset(thd);
unknown's avatar
unknown committed
3244 3245 3246 3247 3248 3249 3250 3251 3252 3253 3254 3255 3256
        thd->transaction.stmt.ndb_tid= trans;
      } 
      else 
      { 
        if (!thd->transaction.all.ndb_tid)
	{
          // Not autocommit transaction
          // A "master" transaction ha not been started yet
          DBUG_PRINT("trans",("starting transaction, all"));
          
          trans= m_ndb->startTransaction();
          if (trans == NULL)
            ERR_RETURN(m_ndb->getNdbError());
3257
	  no_uncommitted_rows_reset(thd);
unknown's avatar
unknown committed
3258 3259 3260 3261 3262 3263 3264 3265 3266 3267 3268 3269 3270 3271 3272 3273 3274 3275 3276 3277 3278 3279 3280 3281 3282 3283 3284 3285 3286

          /*
            If this is the start of a LOCK TABLE, a table look 
            should be taken on the table in NDB
           
            Check if it should be read or write lock
           */
          if (thd->options & (OPTION_TABLE_LOCK))
	  {
            //lockThisTable();
            DBUG_PRINT("info", ("Locking the table..." ));
          }

          thd->transaction.all.ndb_tid= trans; 
        }
      }
    }
    /*
      This is the place to make sure this handler instance
      has a started transaction.
     
      The transaction is started by the first handler on which 
      MySQL Server calls external lock
     
      Other handlers in the same stmt or transaction should use 
      the same NDB transaction. This is done by setting up the m_active_trans
      pointer to point to the NDB transaction. 
     */

3287 3288 3289 3290 3291 3292 3293 3294
    // store thread specific data first to set the right context
    m_force_send=          thd->variables.ndb_force_send;
    m_ha_not_exact_count= !thd->variables.ndb_use_exact_count;
    m_autoincrement_prefetch= thd->variables.ndb_autoincrement_prefetch_sz;
    if (!thd->transaction.on)
      m_transaction_on= FALSE;
    else
      m_transaction_on= thd->variables.ndb_use_transactions;
unknown's avatar
unknown committed
3295
    //     m_use_local_query_cache= thd->variables.ndb_use_local_query_cache;
3296

unknown's avatar
unknown committed
3297 3298 3299 3300
    m_active_trans= thd->transaction.all.ndb_tid ? 
      (NdbConnection*)thd->transaction.all.ndb_tid:
      (NdbConnection*)thd->transaction.stmt.ndb_tid;
    DBUG_ASSERT(m_active_trans);
3301
    // Start of transaction
3302
    m_retrieve_all_fields= FALSE;
3303
    m_retrieve_primary_key= FALSE;
3304
    m_ops_pending= 0;    
3305 3306 3307 3308 3309 3310 3311 3312 3313 3314
    {
      NDBDICT *dict= m_ndb->getDictionary();
      const NDBTAB *tab;
      void *tab_info;
      if (!(tab= dict->getTable(m_tabname, &tab_info)))
	ERR_RETURN(dict->getNdbError());
      DBUG_PRINT("info", ("Table schema version: %d", tab->getObjectVersion()));
      m_table= (void *)tab;
      m_table_info= tab_info;
    }
3315
    no_uncommitted_rows_init(thd);
unknown's avatar
unknown committed
3316 3317 3318
  } 
  else 
  {
3319
    DBUG_PRINT("info", ("lock_type == F_UNLCK"));
3320
    if (!--thd_ndb->lock_count)
unknown's avatar
unknown committed
3321 3322 3323 3324 3325 3326 3327 3328 3329 3330 3331 3332 3333 3334 3335 3336
    {
      DBUG_PRINT("trans", ("Last external_lock"));
      PRINT_OPTION_FLAGS(thd);

      if (thd->transaction.stmt.ndb_tid)
      {
        /*
          Unlock is done without a transaction commit / rollback.
          This happens if the thread didn't update any rows
          We must in this case close the transaction to release resources
        */
        DBUG_PRINT("trans",("ending non-updating transaction"));
        m_ndb->closeTransaction(m_active_trans);
        thd->transaction.stmt.ndb_tid= 0;
      }
    }
unknown's avatar
unknown committed
3337 3338
    m_table= NULL;
    m_table_info= NULL;
3339 3340 3341 3342 3343 3344 3345 3346 3347
    /*
      This is the place to make sure this handler instance
      no longer are connected to the active transaction.

      And since the handler is no longer part of the transaction 
      it can't have open cursors, ops or blobs pending.
    */
    m_active_trans= NULL;    

3348 3349
    if (m_active_cursor)
      DBUG_PRINT("warning", ("m_active_cursor != NULL"));
3350 3351
    m_active_cursor= NULL;

unknown's avatar
unknown committed
3352 3353 3354 3355
    if (m_multi_cursor)
      DBUG_PRINT("warning", ("m_multi_cursor != NULL"));
    m_multi_cursor= NULL;
    
3356
    if (m_blobs_pending)
3357
      DBUG_PRINT("warning", ("blobs_pending != 0"));
3358
    m_blobs_pending= 0;
3359
    
3360
    if (m_ops_pending)
3361
      DBUG_PRINT("warning", ("ops_pending != 0L"));
3362
    m_ops_pending= 0;
unknown's avatar
unknown committed
3363 3364 3365 3366 3367 3368 3369 3370
  }
  DBUG_RETURN(error);
}

/*
  When using LOCK TABLE's external_lock is only called when the actual
  TABLE LOCK is done.
  Under LOCK TABLES, each used tables will force a call to start_stmt.
unknown's avatar
unknown committed
3371 3372
  Ndb doesn't currently support table locks, and will do ordinary
  startTransaction for each transaction/statement.
unknown's avatar
unknown committed
3373 3374 3375 3376 3377 3378 3379 3380 3381 3382 3383 3384 3385 3386
*/

int ha_ndbcluster::start_stmt(THD *thd)
{
  int error=0;
  DBUG_ENTER("start_stmt");
  PRINT_OPTION_FLAGS(thd);

  NdbConnection *trans= (NdbConnection*)thd->transaction.stmt.ndb_tid;
  if (!trans){
    DBUG_PRINT("trans",("Starting transaction stmt"));  
    
    NdbConnection *tablock_trans= 
      (NdbConnection*)thd->transaction.all.ndb_tid;
unknown's avatar
unknown committed
3387
    DBUG_PRINT("info", ("tablock_trans: %x", (uint)tablock_trans));
unknown's avatar
unknown committed
3388 3389 3390
    DBUG_ASSERT(tablock_trans);
//    trans= m_ndb->hupp(tablock_trans);
    trans= m_ndb->startTransaction();
unknown's avatar
unknown committed
3391 3392
    if (trans == NULL)
      ERR_RETURN(m_ndb->getNdbError());
3393
    no_uncommitted_rows_reset(thd);
unknown's avatar
unknown committed
3394 3395 3396
    thd->transaction.stmt.ndb_tid= trans;
  }
  m_active_trans= trans;
3397

3398
  // Start of statement
3399
  m_retrieve_all_fields= FALSE;
3400
  m_retrieve_primary_key= FALSE;
3401
  m_ops_pending= 0;    
unknown's avatar
unknown committed
3402 3403 3404 3405 3406 3407 3408 3409 3410 3411 3412 3413
  
  DBUG_RETURN(error);
}


/*
  Commit a transaction started in NDB 
 */

int ndbcluster_commit(THD *thd, void *ndb_transaction)
{
  int res= 0;
3414
  Ndb *ndb= ((Thd_ndb*)thd->transaction.thd_ndb)->ndb;
unknown's avatar
unknown committed
3415 3416 3417 3418 3419 3420 3421 3422
  NdbConnection *trans= (NdbConnection*)ndb_transaction;

  DBUG_ENTER("ndbcluster_commit");
  DBUG_PRINT("transaction",("%s",
                            trans == thd->transaction.stmt.ndb_tid ? 
                            "stmt" : "all"));
  DBUG_ASSERT(ndb && trans);

3423
  if (execute_commit(thd,trans) != 0)
unknown's avatar
unknown committed
3424 3425
  {
    const NdbError err= trans->getNdbError();
3426
    const NdbOperation *error_op= trans->getNdbErrorOperation();
unknown's avatar
unknown committed
3427 3428
    ERR_PRINT(err);     
    res= ndb_to_mysql_error(&err);
3429
    if (res != -1) 
3430
      ndbcluster_print_error(res, error_op);
unknown's avatar
unknown committed
3431
  }
3432
  ndb->closeTransaction(trans);
unknown's avatar
unknown committed
3433 3434 3435 3436 3437 3438 3439 3440 3441 3442 3443
  DBUG_RETURN(res);
}


/*
  Rollback a transaction started in NDB
 */

int ndbcluster_rollback(THD *thd, void *ndb_transaction)
{
  int res= 0;
3444
  Ndb *ndb= ((Thd_ndb*)thd->transaction.thd_ndb)->ndb;
unknown's avatar
unknown committed
3445 3446 3447 3448 3449 3450 3451 3452 3453 3454 3455
  NdbConnection *trans= (NdbConnection*)ndb_transaction;

  DBUG_ENTER("ndbcluster_rollback");
  DBUG_PRINT("transaction",("%s",
                            trans == thd->transaction.stmt.ndb_tid ? 
                            "stmt" : "all"));
  DBUG_ASSERT(ndb && trans);

  if (trans->execute(Rollback) != 0)
  {
    const NdbError err= trans->getNdbError();
3456
    const NdbOperation *error_op= trans->getNdbErrorOperation();
unknown's avatar
unknown committed
3457 3458
    ERR_PRINT(err);     
    res= ndb_to_mysql_error(&err);
3459 3460
    if (res != -1) 
      ndbcluster_print_error(res, error_op);
unknown's avatar
unknown committed
3461 3462 3463 3464 3465 3466 3467
  }
  ndb->closeTransaction(trans);
  DBUG_RETURN(0);
}


/*
unknown's avatar
unknown committed
3468 3469 3470
  Define NDB column based on Field.
  Returns 0 or mysql error code.
  Not member of ha_ndbcluster because NDBCOL cannot be declared.
unknown's avatar
unknown committed
3471 3472
 */

unknown's avatar
unknown committed
3473 3474 3475
static int create_ndb_column(NDBCOL &col,
                             Field *field,
                             HA_CREATE_INFO *info)
unknown's avatar
unknown committed
3476
{
unknown's avatar
unknown committed
3477
  // Set name
3478 3479 3480 3481 3482 3483
  {
    char truncated_field_name[NDB_MAX_ATTR_NAME_SIZE];
    strnmov(truncated_field_name,field->field_name,sizeof(truncated_field_name));
    truncated_field_name[sizeof(truncated_field_name)-1]= '\0';
    col.setName(truncated_field_name);
  }
unknown's avatar
unknown committed
3484 3485
  // Get char set
  CHARSET_INFO *cs= field->charset();
unknown's avatar
unknown committed
3486 3487 3488 3489
  // Set type and sizes
  const enum enum_field_types mysql_type= field->real_type();
  switch (mysql_type) {
  // Numeric types
unknown's avatar
unknown committed
3490
  case MYSQL_TYPE_DECIMAL:    
unknown's avatar
unknown committed
3491 3492 3493
    col.setType(NDBCOL::Char);
    col.setLength(field->pack_length());
    break;
unknown's avatar
unknown committed
3494
  case MYSQL_TYPE_TINY:        
unknown's avatar
unknown committed
3495 3496 3497 3498 3499 3500
    if (field->flags & UNSIGNED_FLAG)
      col.setType(NDBCOL::Tinyunsigned);
    else
      col.setType(NDBCOL::Tinyint);
    col.setLength(1);
    break;
unknown's avatar
unknown committed
3501
  case MYSQL_TYPE_SHORT:
unknown's avatar
unknown committed
3502 3503 3504 3505 3506 3507
    if (field->flags & UNSIGNED_FLAG)
      col.setType(NDBCOL::Smallunsigned);
    else
      col.setType(NDBCOL::Smallint);
    col.setLength(1);
    break;
unknown's avatar
unknown committed
3508
  case MYSQL_TYPE_LONG:
unknown's avatar
unknown committed
3509 3510 3511 3512 3513 3514
    if (field->flags & UNSIGNED_FLAG)
      col.setType(NDBCOL::Unsigned);
    else
      col.setType(NDBCOL::Int);
    col.setLength(1);
    break;
unknown's avatar
unknown committed
3515
  case MYSQL_TYPE_INT24:       
unknown's avatar
unknown committed
3516 3517 3518 3519 3520 3521 3522 3523 3524 3525 3526 3527
    if (field->flags & UNSIGNED_FLAG)
      col.setType(NDBCOL::Mediumunsigned);
    else
      col.setType(NDBCOL::Mediumint);
    col.setLength(1);
    break;
  case MYSQL_TYPE_LONGLONG:
    if (field->flags & UNSIGNED_FLAG)
      col.setType(NDBCOL::Bigunsigned);
    else
      col.setType(NDBCOL::Bigint);
    col.setLength(1);
unknown's avatar
unknown committed
3528 3529
    break;
  case MYSQL_TYPE_FLOAT:
unknown's avatar
unknown committed
3530 3531 3532
    col.setType(NDBCOL::Float);
    col.setLength(1);
    break;
unknown's avatar
unknown committed
3533
  case MYSQL_TYPE_DOUBLE:
unknown's avatar
unknown committed
3534 3535 3536 3537 3538 3539 3540 3541 3542 3543 3544 3545 3546 3547 3548 3549 3550 3551 3552 3553 3554
    col.setType(NDBCOL::Double);
    col.setLength(1);
    break;
  // Date types
  case MYSQL_TYPE_TIMESTAMP:
    col.setType(NDBCOL::Unsigned);
    col.setLength(1);
    break;
  case MYSQL_TYPE_DATETIME:    
    col.setType(NDBCOL::Datetime);
    col.setLength(1);
    break;
  case MYSQL_TYPE_DATE:
  case MYSQL_TYPE_NEWDATE:
  case MYSQL_TYPE_TIME:        
  case MYSQL_TYPE_YEAR:        
    col.setType(NDBCOL::Char);
    col.setLength(field->pack_length());
    break;
  // Char types
  case MYSQL_TYPE_STRING:      
3555
    if (field->pack_length() == 0)
3556 3557 3558 3559
    {
      col.setType(NDBCOL::Bit);
      col.setLength(1);
    }
unknown's avatar
unknown committed
3560 3561
    else if (field->flags & BINARY_FLAG)
    {
unknown's avatar
unknown committed
3562
      col.setType(NDBCOL::Binary);
unknown's avatar
unknown committed
3563
      col.setLength(field->pack_length());
unknown's avatar
unknown committed
3564
    }
3565
    else
unknown's avatar
unknown committed
3566 3567 3568
    {
      col.setType(NDBCOL::Char);
      col.setCharset(cs);
3569
      col.setLength(field->pack_length());
unknown's avatar
unknown committed
3570
    }
unknown's avatar
unknown committed
3571 3572 3573 3574
    break;
  case MYSQL_TYPE_VAR_STRING:
    if (field->flags & BINARY_FLAG)
      col.setType(NDBCOL::Varbinary);
unknown's avatar
unknown committed
3575
    else {
unknown's avatar
unknown committed
3576
      col.setType(NDBCOL::Varchar);
unknown's avatar
unknown committed
3577 3578
      col.setCharset(cs);
    }
unknown's avatar
unknown committed
3579 3580 3581 3582 3583 3584 3585
    col.setLength(field->pack_length());
    break;
  // Blob types (all come in as MYSQL_TYPE_BLOB)
  mysql_type_tiny_blob:
  case MYSQL_TYPE_TINY_BLOB:
    if (field->flags & BINARY_FLAG)
      col.setType(NDBCOL::Blob);
unknown's avatar
unknown committed
3586
    else {
unknown's avatar
unknown committed
3587
      col.setType(NDBCOL::Text);
unknown's avatar
unknown committed
3588 3589
      col.setCharset(cs);
    }
unknown's avatar
unknown committed
3590 3591 3592 3593 3594
    col.setInlineSize(256);
    // No parts
    col.setPartSize(0);
    col.setStripeSize(0);
    break;
3595
  //mysql_type_blob:
unknown's avatar
unknown committed
3596 3597 3598
  case MYSQL_TYPE_BLOB:    
    if (field->flags & BINARY_FLAG)
      col.setType(NDBCOL::Blob);
unknown's avatar
unknown committed
3599
    else {
unknown's avatar
unknown committed
3600
      col.setType(NDBCOL::Text);
unknown's avatar
unknown committed
3601 3602
      col.setCharset(cs);
    }
unknown's avatar
unknown committed
3603 3604 3605 3606 3607 3608 3609 3610 3611 3612 3613 3614 3615 3616 3617 3618 3619 3620
    // Use "<=" even if "<" is the exact condition
    if (field->max_length() <= (1 << 8))
      goto mysql_type_tiny_blob;
    else if (field->max_length() <= (1 << 16))
    {
      col.setInlineSize(256);
      col.setPartSize(2000);
      col.setStripeSize(16);
    }
    else if (field->max_length() <= (1 << 24))
      goto mysql_type_medium_blob;
    else
      goto mysql_type_long_blob;
    break;
  mysql_type_medium_blob:
  case MYSQL_TYPE_MEDIUM_BLOB:   
    if (field->flags & BINARY_FLAG)
      col.setType(NDBCOL::Blob);
unknown's avatar
unknown committed
3621
    else {
unknown's avatar
unknown committed
3622
      col.setType(NDBCOL::Text);
unknown's avatar
unknown committed
3623 3624
      col.setCharset(cs);
    }
unknown's avatar
unknown committed
3625 3626 3627 3628 3629 3630 3631 3632
    col.setInlineSize(256);
    col.setPartSize(4000);
    col.setStripeSize(8);
    break;
  mysql_type_long_blob:
  case MYSQL_TYPE_LONG_BLOB:  
    if (field->flags & BINARY_FLAG)
      col.setType(NDBCOL::Blob);
unknown's avatar
unknown committed
3633
    else {
unknown's avatar
unknown committed
3634
      col.setType(NDBCOL::Text);
unknown's avatar
unknown committed
3635 3636
      col.setCharset(cs);
    }
unknown's avatar
unknown committed
3637 3638 3639 3640 3641 3642 3643 3644 3645 3646 3647 3648 3649 3650 3651 3652 3653 3654 3655
    col.setInlineSize(256);
    col.setPartSize(8000);
    col.setStripeSize(4);
    break;
  // Other types
  case MYSQL_TYPE_ENUM:
    col.setType(NDBCOL::Char);
    col.setLength(field->pack_length());
    break;
  case MYSQL_TYPE_SET:         
    col.setType(NDBCOL::Char);
    col.setLength(field->pack_length());
    break;
  case MYSQL_TYPE_NULL:        
  case MYSQL_TYPE_GEOMETRY:
    goto mysql_type_unsupported;
  mysql_type_unsupported:
  default:
    return HA_ERR_UNSUPPORTED;
unknown's avatar
unknown committed
3656
  }
unknown's avatar
unknown committed
3657 3658 3659 3660 3661 3662 3663 3664
  // Set nullable and pk
  col.setNullable(field->maybe_null());
  col.setPrimaryKey(field->flags & PRI_KEY_FLAG);
  // Set autoincrement
  if (field->flags & AUTO_INCREMENT_FLAG) 
  {
    col.setAutoIncrement(TRUE);
    ulonglong value= info->auto_increment_value ?
3665
      info->auto_increment_value : (ulonglong) 1;
unknown's avatar
unknown committed
3666 3667
    DBUG_PRINT("info", ("Autoincrement key, initial: %llu", value));
    col.setAutoIncrementInitialValue(value);
unknown's avatar
unknown committed
3668
  }
unknown's avatar
unknown committed
3669
  else
unknown's avatar
unknown committed
3670
    col.setAutoIncrement(FALSE);
unknown's avatar
unknown committed
3671
  return 0;
unknown's avatar
unknown committed
3672 3673 3674 3675 3676 3677 3678 3679 3680 3681 3682 3683
}

/*
  Create a table in NDB Cluster
 */

int ha_ndbcluster::create(const char *name, 
			  TABLE *form, 
			  HA_CREATE_INFO *info)
{
  NDBTAB tab;
  NDBCOL col;
unknown's avatar
unknown committed
3684
  uint pack_length, length, i, pk_length= 0;
unknown's avatar
unknown committed
3685
  const void *data, *pack_data;
3686
  const char **key_names= form->keynames.type_names;
unknown's avatar
unknown committed
3687
  char name2[FN_HEADLEN];
3688
  bool create_from_engine= (info->table_options & HA_CREATE_FROM_ENGINE);
unknown's avatar
unknown committed
3689 3690 3691 3692 3693
   
  DBUG_ENTER("create");
  DBUG_PRINT("enter", ("name: %s", name));
  fn_format(name2, name, "", "",2);       // Remove the .frm extension
  set_dbname(name2);
3694 3695 3696 3697 3698 3699 3700 3701 3702 3703 3704 3705
  set_tabname(name2);    

  if (create_from_engine)
  {
    /*
      Table alreay exists in NDB and frm file has been created by 
      caller.
      Do Ndb specific stuff, such as create a .ndb file
    */
    my_errno= write_ndb_file();
    DBUG_RETURN(my_errno);
  }
unknown's avatar
unknown committed
3706 3707 3708 3709 3710 3711 3712 3713 3714 3715 3716 3717 3718 3719 3720 3721 3722 3723 3724 3725 3726 3727

  DBUG_PRINT("table", ("name: %s", m_tabname));  
  tab.setName(m_tabname);
  tab.setLogging(!(info->options & HA_LEX_CREATE_TMP_TABLE));    
   
  // Save frm data for this table
  if (readfrm(name, &data, &length))
    DBUG_RETURN(1);
  if (packfrm(data, length, &pack_data, &pack_length))
    DBUG_RETURN(2);
  
  DBUG_PRINT("info", ("setFrm data=%x, len=%d", pack_data, pack_length));
  tab.setFrm(pack_data, pack_length);      
  my_free((char*)data, MYF(0));
  my_free((char*)pack_data, MYF(0));
  
  for (i= 0; i < form->fields; i++) 
  {
    Field *field= form->field[i];
    DBUG_PRINT("info", ("name: %s, type: %u, pack_length: %d", 
                        field->field_name, field->real_type(),
			field->pack_length()));
3728
    if ((my_errno= create_ndb_column(col, field, info)))
unknown's avatar
unknown committed
3729
      DBUG_RETURN(my_errno);
unknown's avatar
unknown committed
3730
    tab.addColumn(col);
unknown's avatar
unknown committed
3731 3732
    if(col.getPrimaryKey())
      pk_length += (field->pack_length() + 3) / 4;
unknown's avatar
unknown committed
3733 3734 3735 3736 3737 3738 3739 3740 3741
  }
  
  // No primary key, create shadow key as 64 bit, auto increment  
  if (form->primary_key == MAX_KEY) 
  {
    DBUG_PRINT("info", ("Generating shadow key"));
    col.setName("$PK");
    col.setType(NdbDictionary::Column::Bigunsigned);
    col.setLength(1);
unknown's avatar
unknown committed
3742
    col.setNullable(FALSE);
unknown's avatar
unknown committed
3743 3744 3745
    col.setPrimaryKey(TRUE);
    col.setAutoIncrement(TRUE);
    tab.addColumn(col);
unknown's avatar
unknown committed
3746 3747 3748 3749 3750 3751 3752 3753 3754 3755 3756 3757 3758 3759 3760 3761
    pk_length += 2;
  }
  
  // Make sure that blob tables don't have to big part size
  for (i= 0; i < form->fields; i++) 
  {
    /**
     * The extra +7 concists
     * 2 - words from pk in blob table
     * 5 - from extra words added by tup/dict??
     */
    switch (form->field[i]->real_type()) {
    case MYSQL_TYPE_BLOB:    
    case MYSQL_TYPE_MEDIUM_BLOB:   
    case MYSQL_TYPE_LONG_BLOB: 
    {
3762 3763
      NdbDictionary::Column * col= tab.getColumn(i);
      int size= pk_length + (col->getPartSize()+3)/4 + 7;
unknown's avatar
unknown committed
3764 3765 3766
      if(size > NDB_MAX_TUPLE_SIZE_IN_WORDS && 
	 (pk_length+7) < NDB_MAX_TUPLE_SIZE_IN_WORDS)
      {
3767
	size= NDB_MAX_TUPLE_SIZE_IN_WORDS - pk_length - 7;
unknown's avatar
unknown committed
3768 3769 3770 3771 3772 3773 3774 3775 3776 3777 3778
	col->setPartSize(4*size);
      }
      /**
       * If size > NDB_MAX and pk_length+7 >= NDB_MAX
       *   then the table can't be created anyway, so skip
       *   changing part size, and have error later
       */ 
    }
    default:
      break;
    }
unknown's avatar
unknown committed
3779 3780
  }
  
3781
  if ((my_errno= check_ndb_connection()))
unknown's avatar
unknown committed
3782 3783 3784 3785
    DBUG_RETURN(my_errno);
  
  // Create the table in NDB     
  NDBDICT *dict= m_ndb->getDictionary();
3786
  if (dict->createTable(tab) != 0) 
unknown's avatar
unknown committed
3787 3788 3789 3790 3791 3792 3793 3794
  {
    const NdbError err= dict->getNdbError();
    ERR_PRINT(err);
    my_errno= ndb_to_mysql_error(&err);
    DBUG_RETURN(my_errno);
  }
  DBUG_PRINT("info", ("Table %s/%s created successfully", 
                      m_dbname, m_tabname));
3795

unknown's avatar
unknown committed
3796 3797
  // Create secondary indexes
  my_errno= build_index_list(form, ILBP_CREATE);
3798

3799 3800 3801
  if (!my_errno)
    my_errno= write_ndb_file();

unknown's avatar
unknown committed
3802 3803 3804 3805
  DBUG_RETURN(my_errno);
}


3806 3807 3808 3809
int ha_ndbcluster::create_ordered_index(const char *name, 
					KEY *key_info)
{
  DBUG_ENTER("create_ordered_index");
unknown's avatar
unknown committed
3810
  DBUG_RETURN(create_index(name, key_info, FALSE));
3811 3812 3813 3814 3815 3816
}

int ha_ndbcluster::create_unique_index(const char *name, 
				       KEY *key_info)
{

3817
  DBUG_ENTER("create_unique_index");
unknown's avatar
unknown committed
3818
  DBUG_RETURN(create_index(name, key_info, TRUE));
3819 3820 3821
}


unknown's avatar
unknown committed
3822 3823 3824 3825 3826
/*
  Create an index in NDB Cluster
 */

int ha_ndbcluster::create_index(const char *name, 
3827 3828 3829
				KEY *key_info,
				bool unique)
{
unknown's avatar
unknown committed
3830 3831 3832 3833 3834 3835
  NdbDictionary::Dictionary *dict= m_ndb->getDictionary();
  KEY_PART_INFO *key_part= key_info->key_part;
  KEY_PART_INFO *end= key_part + key_info->key_parts;
  
  DBUG_ENTER("create_index");
  DBUG_PRINT("enter", ("name: %s ", name));
3836

unknown's avatar
unknown committed
3837
  NdbDictionary::Index ndb_index(name);
3838
  if (unique)
unknown's avatar
unknown committed
3839 3840 3841 3842 3843
    ndb_index.setType(NdbDictionary::Index::UniqueHashIndex);
  else 
  {
    ndb_index.setType(NdbDictionary::Index::OrderedIndex);
    // TODO Only temporary ordered indexes supported
unknown's avatar
unknown committed
3844
    ndb_index.setLogging(FALSE); 
unknown's avatar
unknown committed
3845 3846 3847 3848 3849 3850 3851
  }
  ndb_index.setTable(m_tabname);

  for (; key_part != end; key_part++) 
  {
    Field *field= key_part->field;
    DBUG_PRINT("info", ("attr: %s", field->field_name));
3852 3853 3854 3855 3856 3857
    {
      char truncated_field_name[NDB_MAX_ATTR_NAME_SIZE];
      strnmov(truncated_field_name,field->field_name,sizeof(truncated_field_name));
      truncated_field_name[sizeof(truncated_field_name)-1]= '\0';
      ndb_index.addColumnName(truncated_field_name);
    }
unknown's avatar
unknown committed
3858 3859 3860 3861 3862 3863 3864 3865 3866 3867 3868 3869 3870 3871 3872 3873 3874 3875 3876 3877 3878 3879 3880 3881
  }
  
  if (dict->createIndex(ndb_index))
    ERR_RETURN(dict->getNdbError());

  // Success
  DBUG_PRINT("info", ("Created index %s", name));
  DBUG_RETURN(0);  
}


/*
  Rename a table in NDB Cluster
*/

int ha_ndbcluster::rename_table(const char *from, const char *to)
{
  char new_tabname[FN_HEADLEN];

  DBUG_ENTER("ha_ndbcluster::rename_table");
  set_dbname(from);
  set_tabname(from);
  set_tabname(to, new_tabname);

3882 3883 3884
  if (check_ndb_connection())
    DBUG_RETURN(my_errno= HA_ERR_NO_CONNECTION);

unknown's avatar
unknown committed
3885 3886 3887

  int result= alter_table_name(m_tabname, new_tabname);
  if (result == 0)
3888
  {
unknown's avatar
unknown committed
3889
    set_tabname(to);
3890 3891
    handler::rename_table(from, to);
  }
unknown's avatar
unknown committed
3892 3893 3894 3895 3896 3897 3898 3899 3900 3901 3902 3903 3904 3905 3906 3907 3908 3909 3910 3911 3912 3913 3914 3915 3916
  
  DBUG_RETURN(result);
}


/*
  Rename a table in NDB Cluster using alter table
 */

int ha_ndbcluster::alter_table_name(const char *from, const char *to)
{
  NDBDICT *dict= m_ndb->getDictionary();
  const NDBTAB *orig_tab;
  DBUG_ENTER("alter_table_name_table");
  DBUG_PRINT("enter", ("Renaming %s to %s", from, to));

  if (!(orig_tab= dict->getTable(from)))
    ERR_RETURN(dict->getNdbError());
      
  NdbDictionary::Table copy_tab= dict->getTableForAlteration(from);
  copy_tab.setName(to);
  if (dict->alterTable(copy_tab) != 0)
    ERR_RETURN(dict->getNdbError());

  m_table= NULL;
unknown's avatar
unknown committed
3917
  m_table_info= NULL;
unknown's avatar
unknown committed
3918 3919 3920 3921 3922 3923 3924 3925 3926 3927 3928 3929 3930 3931 3932 3933 3934 3935
                                                                             
  DBUG_RETURN(0);
}


/*
  Delete a table from NDB Cluster
 */

int ha_ndbcluster::delete_table(const char *name)
{
  DBUG_ENTER("delete_table");
  DBUG_PRINT("enter", ("name: %s", name));
  set_dbname(name);
  set_tabname(name);
  
  if (check_ndb_connection())
    DBUG_RETURN(HA_ERR_NO_CONNECTION);
3936 3937

  handler::delete_table(name);
unknown's avatar
unknown committed
3938 3939 3940 3941 3942 3943 3944 3945 3946 3947 3948 3949 3950 3951 3952 3953 3954 3955 3956 3957 3958 3959 3960 3961 3962 3963 3964 3965 3966 3967 3968 3969 3970 3971 3972 3973 3974 3975 3976 3977
  DBUG_RETURN(drop_table());
}


/*
  Drop a table in NDB Cluster
 */

int ha_ndbcluster::drop_table()
{
  NdbDictionary::Dictionary *dict= m_ndb->getDictionary();

  DBUG_ENTER("drop_table");
  DBUG_PRINT("enter", ("Deleting %s", m_tabname));
  
  if (dict->dropTable(m_tabname)) 
  {
    const NdbError err= dict->getNdbError();
    if (err.code == 709)
      ; // 709: No such table existed
    else 
      ERR_RETURN(dict->getNdbError());
  }  
  release_metadata();
  DBUG_RETURN(0);
}


/*
  Drop a database in NDB Cluster
 */

int ndbcluster_drop_database(const char *path)
{
  DBUG_ENTER("ndbcluster_drop_database");
  // TODO drop all tables for this database
  DBUG_RETURN(1);
}


3978
ulonglong ha_ndbcluster::get_auto_increment()
3979
{  
3980 3981
  int cache_size;
  Uint64 auto_value;
unknown's avatar
unknown committed
3982 3983
  DBUG_ENTER("get_auto_increment");
  DBUG_PRINT("enter", ("m_tabname: %s", m_tabname));
unknown's avatar
unknown committed
3984
  cache_size= 
3985
    (m_rows_to_insert - m_rows_inserted < m_autoincrement_prefetch) ?
3986
    m_rows_to_insert - m_rows_inserted 
3987
    : (m_rows_to_insert > m_autoincrement_prefetch) ? 
3988
    m_rows_to_insert 
3989
    : m_autoincrement_prefetch;
unknown's avatar
unknown committed
3990
  auto_value= 
3991
    (m_skip_auto_increment) ? 
unknown's avatar
unknown committed
3992 3993
    m_ndb->readAutoIncrementValue((const NDBTAB *) m_table)
    : m_ndb->getAutoIncrementValue((const NDBTAB *) m_table, cache_size);
unknown's avatar
unknown committed
3994
  DBUG_RETURN((longlong)auto_value);
unknown's avatar
unknown committed
3995 3996 3997 3998 3999 4000 4001 4002 4003 4004 4005
}


/*
  Constructor for the NDB Cluster table handler 
 */

ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
  handler(table_arg),
  m_active_trans(NULL),
  m_active_cursor(NULL),
unknown's avatar
unknown committed
4006
  m_multi_cursor(NULL),
unknown's avatar
unknown committed
4007 4008
  m_ndb(NULL),
  m_table(NULL),
4009
  m_table_info(NULL),
unknown's avatar
unknown committed
4010
  m_table_flags(HA_REC_NOT_IN_SEQ |
unknown's avatar
unknown committed
4011
		HA_NULL_IN_KEY |
4012
		HA_AUTO_PART_KEY |
4013
                HA_NO_VARCHAR |
4014
		HA_NO_PREFIX_CHAR_KEYS),
4015
  m_share(0),
unknown's avatar
unknown committed
4016
  m_use_write(FALSE),
4017
  m_ignore_dup_key(FALSE),
4018 4019
  m_primary_key_update(FALSE),
  m_retrieve_all_fields(FALSE),
4020
  m_retrieve_primary_key(FALSE),
4021 4022 4023 4024 4025 4026 4027 4028 4029
  m_rows_to_insert(1),
  m_rows_inserted(0),
  m_bulk_insert_rows(1024),
  m_bulk_insert_not_flushed(FALSE),
  m_ops_pending(0),
  m_skip_auto_increment(TRUE),
  m_blobs_pending(0),
  m_blobs_buffer(0),
  m_blobs_buffer_size(0),
4030 4031 4032 4033
  m_dupkey((uint) -1),
  m_ha_not_exact_count(FALSE),
  m_force_send(TRUE),
  m_autoincrement_prefetch(32),
unknown's avatar
unknown committed
4034
  m_transaction_on(TRUE),
4035 4036
  m_use_local_query_cache(FALSE),
  m_cond_stack(NULL)
unknown's avatar
unknown committed
4037
{ 
4038 4039
  int i;
  
unknown's avatar
unknown committed
4040 4041 4042 4043 4044
  DBUG_ENTER("ha_ndbcluster");

  m_tabname[0]= '\0';
  m_dbname[0]= '\0';

4045
  records= ~(ha_rows)0; // uninitialized
unknown's avatar
unknown committed
4046 4047
  block_size= 1024;

4048 4049
  for (i= 0; i < MAX_KEY; i++)
  {
4050 4051 4052
    m_index[i].type= UNDEFINED_INDEX;   
    m_index[i].unique_index= NULL;      
    m_index[i].index= NULL;      
4053 4054
  }

unknown's avatar
unknown committed
4055 4056 4057 4058 4059 4060 4061 4062 4063 4064 4065 4066
  DBUG_VOID_RETURN;
}


/*
  Destructor for NDB Cluster table handler
 */

ha_ndbcluster::~ha_ndbcluster() 
{
  DBUG_ENTER("~ha_ndbcluster");

4067 4068
  if (m_share)
    free_share(m_share);
unknown's avatar
unknown committed
4069
  release_metadata();
4070 4071
  my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
  m_blobs_buffer= 0;
unknown's avatar
unknown committed
4072 4073

  // Check for open cursor/transaction
4074 4075
  if (m_active_cursor) {
  }
unknown's avatar
unknown committed
4076
  DBUG_ASSERT(m_active_cursor == NULL);
4077 4078
  if (m_active_trans) {
  }
unknown's avatar
unknown committed
4079 4080
  DBUG_ASSERT(m_active_trans == NULL);

4081 4082 4083 4084
  // Discard the condition stack
  DBUG_PRINT("info", ("Clearing condition stack"));
  cond_clear();

unknown's avatar
unknown committed
4085 4086 4087 4088 4089 4090 4091 4092 4093 4094 4095 4096
  DBUG_VOID_RETURN;
}


/*
  Open a table for further use
  - fetch metadata for this table from NDB
  - check that table exists
*/

int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
{
unknown's avatar
unknown committed
4097
  int res;
unknown's avatar
unknown committed
4098 4099 4100 4101 4102 4103 4104 4105 4106 4107 4108 4109 4110 4111 4112 4113 4114 4115 4116 4117 4118 4119
  KEY *key;
  DBUG_ENTER("open");
  DBUG_PRINT("enter", ("name: %s mode: %d test_if_locked: %d",
                       name, mode, test_if_locked));
  
  // Setup ref_length to make room for the whole 
  // primary key to be written in the ref variable
  
  if (table->primary_key != MAX_KEY) 
  {
    key= table->key_info+table->primary_key;
    ref_length= key->key_length;
    DBUG_PRINT("info", (" ref_length: %d", ref_length));
  }
  // Init table lock structure 
  if (!(m_share=get_share(name)))
    DBUG_RETURN(1);
  thr_lock_data_init(&m_share->lock,&m_lock,(void*) 0);
  
  set_dbname(name);
  set_tabname(name);
  
4120 4121
  if (check_ndb_connection()) {
    free_share(m_share); m_share= 0;
unknown's avatar
unknown committed
4122
    DBUG_RETURN(HA_ERR_NO_CONNECTION);
4123
  }
4124
  
unknown's avatar
unknown committed
4125 4126 4127
  res= get_metadata(name);
  if (!res)
    info(HA_STATUS_VARIABLE | HA_STATUS_CONST);
unknown's avatar
unknown committed
4128

unknown's avatar
unknown committed
4129
  DBUG_RETURN(res);
unknown's avatar
unknown committed
4130 4131 4132 4133 4134 4135 4136 4137 4138 4139 4140
}


/*
  Close the table
  - release resources setup by open()
 */

int ha_ndbcluster::close(void)
{
  DBUG_ENTER("close");  
4141
  free_share(m_share); m_share= 0;
unknown's avatar
unknown committed
4142 4143 4144 4145 4146 4147
  release_metadata();
  m_ndb= NULL;
  DBUG_RETURN(0);
}


4148
Thd_ndb* ha_ndbcluster::seize_thd_ndb()
unknown's avatar
unknown committed
4149
{
4150 4151
  Thd_ndb *thd_ndb;
  DBUG_ENTER("seize_thd_ndb");
unknown's avatar
unknown committed
4152

4153 4154 4155
  thd_ndb= new Thd_ndb();
  thd_ndb->ndb->getDictionary()->set_local_table_data_size(sizeof(Ndb_table_local_info));
  if (thd_ndb->ndb->init(max_transactions) != 0)
unknown's avatar
unknown committed
4156
  {
4157
    ERR_PRINT(thd_ndb->ndb->getNdbError());
unknown's avatar
unknown committed
4158 4159 4160 4161 4162 4163
    /*
      TODO 
      Alt.1 If init fails because to many allocated Ndb 
      wait on condition for a Ndb object to be released.
      Alt.2 Seize/release from pool, wait until next release 
    */
4164 4165
    delete thd_ndb;
    thd_ndb= NULL;
unknown's avatar
unknown committed
4166
  }
4167
  DBUG_RETURN(thd_ndb);
unknown's avatar
unknown committed
4168 4169 4170
}


4171
void ha_ndbcluster::release_thd_ndb(Thd_ndb* thd_ndb)
unknown's avatar
unknown committed
4172
{
4173 4174
  DBUG_ENTER("release_thd_ndb");
  delete thd_ndb;
unknown's avatar
unknown committed
4175 4176 4177 4178 4179
  DBUG_VOID_RETURN;
}


/*
unknown's avatar
unknown committed
4180
  If this thread already has a Thd_ndb object allocated
unknown's avatar
unknown committed
4181
  in current THD, reuse it. Otherwise
unknown's avatar
unknown committed
4182
  seize a Thd_ndb object, assign it to current THD and use it.
unknown's avatar
unknown committed
4183 4184 4185
 
*/

4186
Ndb* check_ndb_in_thd(THD* thd)
unknown's avatar
unknown committed
4187
{
4188
  DBUG_ENTER("check_ndb_in_thd");
4189
  Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
unknown's avatar
unknown committed
4190
  
4191
  if (!thd_ndb)
unknown's avatar
unknown committed
4192
  {
unknown's avatar
unknown committed
4193
    if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
4194
      DBUG_RETURN(NULL);
4195
    thd->transaction.thd_ndb= thd_ndb;
unknown's avatar
unknown committed
4196
  }
unknown's avatar
unknown committed
4197
  DBUG_RETURN(thd_ndb->ndb);
4198 4199
}

unknown's avatar
unknown committed
4200

unknown's avatar
unknown committed
4201 4202 4203 4204 4205
int ha_ndbcluster::check_ndb_connection()
{
  THD* thd= current_thd;
  DBUG_ENTER("check_ndb_connection");
  
unknown's avatar
unknown committed
4206
  if (!(m_ndb= check_ndb_in_thd(thd)))
4207
    DBUG_RETURN(HA_ERR_NO_CONNECTION);
unknown's avatar
unknown committed
4208 4209 4210 4211
  m_ndb->setDatabaseName(m_dbname);
  DBUG_RETURN(0);
}

unknown's avatar
unknown committed
4212

unknown's avatar
unknown committed
4213 4214
void ndbcluster_close_connection(THD *thd)
{
4215
  Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
unknown's avatar
unknown committed
4216
  DBUG_ENTER("ndbcluster_close_connection");
4217 4218
  if (thd_ndb)
  {
4219
    ha_ndbcluster::release_thd_ndb(thd_ndb);
4220 4221
    thd->transaction.thd_ndb= NULL;
  }
unknown's avatar
unknown committed
4222 4223 4224 4225 4226 4227 4228 4229
  DBUG_VOID_RETURN;
}


/*
  Try to discover one table from NDB
 */

4230
int ndbcluster_discover(THD* thd, const char *db, const char *name,
unknown's avatar
unknown committed
4231 4232 4233 4234 4235
			const void** frmblob, uint* frmlen)
{
  uint len;
  const void* data;
  const NDBTAB* tab;
4236
  Ndb* ndb;
unknown's avatar
unknown committed
4237
  DBUG_ENTER("ndbcluster_discover");
4238
  DBUG_PRINT("enter", ("db: %s, name: %s", db, name)); 
unknown's avatar
unknown committed
4239

4240 4241 4242
  if (!(ndb= check_ndb_in_thd(thd)))
    DBUG_RETURN(HA_ERR_NO_CONNECTION);  
  ndb->setDatabaseName(db);
4243

4244
  NDBDICT* dict= ndb->getDictionary();
unknown's avatar
unknown committed
4245
  dict->set_local_table_data_size(sizeof(Ndb_table_local_info));
4246 4247 4248 4249 4250 4251 4252
  dict->invalidateTable(name);
  if (!(tab= dict->getTable(name)))
  {    
    const NdbError err= dict->getNdbError();
    if (err.code == 709)
      DBUG_RETURN(1);
    ERR_RETURN(err);
unknown's avatar
unknown committed
4253 4254 4255 4256 4257 4258 4259 4260 4261 4262 4263 4264 4265 4266 4267 4268 4269 4270 4271 4272 4273 4274
  }
  
  DBUG_PRINT("info", ("Found table %s", tab->getName()));
  
  len= tab->getFrmLength();  
  if (len == 0 || tab->getFrmData() == NULL)
  {
    DBUG_PRINT("No frm data found",
               ("Table is probably created via NdbApi")); 
    DBUG_RETURN(2);
  }
  
  if (unpackfrm(&data, &len, tab->getFrmData()))
    DBUG_RETURN(3);

  *frmlen= len;
  *frmblob= data;
  
  DBUG_RETURN(0);
}

/*
4275 4276 4277
  Check if a table exists in NDB
   
 */
unknown's avatar
unknown committed
4278

4279 4280 4281 4282 4283 4284 4285 4286 4287 4288 4289 4290 4291 4292
int ndbcluster_table_exists(THD* thd, const char *db, const char *name)
{
  uint len;
  const void* data;
  const NDBTAB* tab;
  Ndb* ndb;
  DBUG_ENTER("ndbcluster_table_exists");
  DBUG_PRINT("enter", ("db: %s, name: %s", db, name)); 

  if (!(ndb= check_ndb_in_thd(thd)))
    DBUG_RETURN(HA_ERR_NO_CONNECTION);  
  ndb->setDatabaseName(db);

  NDBDICT* dict= ndb->getDictionary();
unknown's avatar
unknown committed
4293
  dict->set_local_table_data_size(sizeof(Ndb_table_local_info));
4294 4295 4296 4297 4298 4299 4300 4301 4302 4303 4304 4305 4306
  dict->invalidateTable(name);
  if (!(tab= dict->getTable(name)))
  {    
    const NdbError err= dict->getNdbError();
    if (err.code == 709)
      DBUG_RETURN(0);
    ERR_RETURN(err);
  }
  
  DBUG_PRINT("info", ("Found table %s", tab->getName()));
  DBUG_RETURN(1);
}

unknown's avatar
unknown committed
4307 4308


unknown's avatar
unknown committed
4309 4310
extern "C" byte* tables_get_key(const char *entry, uint *length,
				my_bool not_used __attribute__((unused)))
4311 4312 4313 4314 4315 4316 4317
{
  *length= strlen(entry);
  return (byte*) entry;
}


int ndbcluster_find_files(THD *thd,const char *db,const char *path,
unknown's avatar
unknown committed
4318
			  const char *wild, bool dir, List<char> *files)
unknown's avatar
unknown committed
4319
{
4320 4321 4322
  DBUG_ENTER("ndbcluster_find_files");
  DBUG_PRINT("enter", ("db: %s", db));
  { // extra bracket to avoid gcc 2.95.3 warning
unknown's avatar
unknown committed
4323
  uint i;
4324
  Ndb* ndb;
4325
  char name[FN_REFLEN];
unknown's avatar
unknown committed
4326
  HASH ndb_tables, ok_tables;
unknown's avatar
unknown committed
4327
  NdbDictionary::Dictionary::List list;
4328 4329 4330 4331

  if (!(ndb= check_ndb_in_thd(thd)))
    DBUG_RETURN(HA_ERR_NO_CONNECTION);

4332
  if (dir)
unknown's avatar
unknown committed
4333
    DBUG_RETURN(0); // Discover of databases not yet supported
4334

unknown's avatar
unknown committed
4335
  // List tables in NDB
4336
  NDBDICT *dict= ndb->getDictionary();
unknown's avatar
unknown committed
4337 4338
  if (dict->listObjects(list, 
			NdbDictionary::Object::UserTable) != 0)
unknown's avatar
unknown committed
4339
    ERR_RETURN(dict->getNdbError());
4340

unknown's avatar
unknown committed
4341 4342 4343 4344 4345 4346 4347 4348 4349 4350 4351 4352 4353 4354 4355
  if (hash_init(&ndb_tables, system_charset_info,list.count,0,0,
		(hash_get_key)tables_get_key,0,0))
  {
    DBUG_PRINT("error", ("Failed to init HASH ndb_tables"));
    DBUG_RETURN(-1);
  }

  if (hash_init(&ok_tables, system_charset_info,32,0,0,
		(hash_get_key)tables_get_key,0,0))
  {
    DBUG_PRINT("error", ("Failed to init HASH ok_tables"));
    hash_free(&ndb_tables);
    DBUG_RETURN(-1);
  }  

unknown's avatar
unknown committed
4356 4357 4358
  for (i= 0 ; i < list.count ; i++)
  {
    NdbDictionary::Dictionary::List::Element& t= list.elements[i];
unknown's avatar
unknown committed
4359
    DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name));     
unknown's avatar
unknown committed
4360

4361 4362 4363
    // Add only tables that belongs to db
    if (my_strcasecmp(system_charset_info, t.database, db))
      continue;
unknown's avatar
unknown committed
4364

unknown's avatar
unknown committed
4365
    // Apply wildcard to list of tables in NDB
4366
    if (wild)
4367
    {
4368 4369 4370 4371 4372 4373 4374 4375
      if (lower_case_table_names)
      {
	if (wild_case_compare(files_charset_info, t.name, wild))
	  continue;
      }
      else if (wild_compare(t.name,wild,0))
	continue;
    }
unknown's avatar
unknown committed
4376 4377
    DBUG_PRINT("info", ("Inserting %s into ndb_tables hash", t.name));     
    my_hash_insert(&ndb_tables, (byte*)thd->strdup(t.name));
unknown's avatar
unknown committed
4378 4379
  }

unknown's avatar
unknown committed
4380 4381 4382 4383 4384 4385 4386 4387 4388 4389 4390 4391 4392 4393 4394
  char *file_name;
  List_iterator<char> it(*files);
  List<char> delete_list;
  while ((file_name=it++))
  {
    DBUG_PRINT("info", ("%s", file_name));     
    if (hash_search(&ndb_tables, file_name, strlen(file_name)))
    {
      DBUG_PRINT("info", ("%s existed in NDB _and_ on disk ", file_name));
      // File existed in NDB and as frm file, put in ok_tables list
      my_hash_insert(&ok_tables, (byte*)file_name);
      continue;
    }
    
    // File is not in NDB, check for .ndb file with this name
4395
    (void)strxnmov(name, FN_REFLEN, 
unknown's avatar
unknown committed
4396 4397
		   mysql_data_home,"/",db,"/",file_name,ha_ndb_ext,NullS);
    DBUG_PRINT("info", ("Check access for %s", name));
4398
    if (access(name, F_OK))
unknown's avatar
unknown committed
4399 4400 4401 4402
    {
      DBUG_PRINT("info", ("%s did not exist on disk", name));     
      // .ndb file did not exist on disk, another table type
      continue;
4403
    }
4404

unknown's avatar
unknown committed
4405 4406 4407 4408 4409 4410 4411 4412 4413 4414 4415
    DBUG_PRINT("info", ("%s existed on disk", name));     
    // The .ndb file exists on disk, but it's not in list of tables in ndb
    // Verify that handler agrees table is gone.
    if (ndbcluster_table_exists(thd, db, file_name) == 0)    
    {
      DBUG_PRINT("info", ("NDB says %s does not exists", file_name));     
      it.remove();
      // Put in list of tables to remove from disk
      delete_list.push_back(thd->strdup(file_name));
    }
  }
4416

unknown's avatar
unknown committed
4417 4418 4419 4420
  // Check for new files to discover
  DBUG_PRINT("info", ("Checking for new files to discover"));       
  List<char> create_list;
  for (i= 0 ; i < ndb_tables.records ; i++)
4421
  {
unknown's avatar
unknown committed
4422 4423
    file_name= hash_element(&ndb_tables, i);
    if (!hash_search(&ok_tables, file_name, strlen(file_name)))
4424
    {
unknown's avatar
unknown committed
4425 4426 4427 4428 4429 4430
      DBUG_PRINT("info", ("%s must be discovered", file_name));       
      // File is in list of ndb tables and not in ok_tables
      // This table need to be created
      create_list.push_back(thd->strdup(file_name));
    }
  }
4431

unknown's avatar
unknown committed
4432 4433
  // Lock mutex before deleting and creating frm files
  pthread_mutex_lock(&LOCK_open);
4434

unknown's avatar
unknown committed
4435 4436 4437 4438 4439 4440 4441 4442 4443 4444 4445
  if (!global_read_lock)
  {
    // Delete old files
    List_iterator_fast<char> it3(delete_list);
    while ((file_name=it3++))
    {  
      DBUG_PRINT("info", ("Remove table %s/%s",db, file_name ));
      // Delete the table and all related files
      TABLE_LIST table_list;
      bzero((char*) &table_list,sizeof(table_list));
      table_list.db= (char*) db;
4446
      table_list.alias=table_list.real_name=(char*)file_name;
unknown's avatar
unknown committed
4447
      (void)mysql_rm_table_part2(thd, &table_list, 
unknown's avatar
unknown committed
4448 4449
				 /* if_exists */ TRUE, 
				 /* drop_temporary */ FALSE, 
unknown's avatar
unknown committed
4450
				 /* drop_view */ FALSE,
unknown's avatar
unknown committed
4451
				 /* dont_log_query*/ TRUE);
4452 4453 4454
    }
  }

unknown's avatar
unknown committed
4455 4456 4457 4458 4459
  // Create new files
  List_iterator_fast<char> it2(create_list);
  while ((file_name=it2++))
  {  
    DBUG_PRINT("info", ("Table %s need discovery", name));
unknown's avatar
unknown committed
4460
    if (ha_create_table_from_engine(thd, db, file_name, TRUE) == 0)
4461
      files->push_back(thd->strdup(file_name)); 
unknown's avatar
unknown committed
4462 4463 4464 4465 4466
  }

  pthread_mutex_unlock(&LOCK_open);      
  
  hash_free(&ok_tables);
4467
  hash_free(&ndb_tables);
4468
  } // extra bracket to avoid gcc 2.95.3 warning
4469
  DBUG_RETURN(0);    
unknown's avatar
unknown committed
4470 4471 4472 4473 4474 4475 4476 4477 4478 4479
}


/*
  Initialise all gloal variables before creating 
  a NDB Cluster table handler
 */

bool ndbcluster_init()
{
unknown's avatar
unknown committed
4480
  int res;
unknown's avatar
unknown committed
4481
  DBUG_ENTER("ndbcluster_init");
4482
  // Set connectstring if specified
unknown's avatar
unknown committed
4483
  if (ndbcluster_connectstring != 0)
4484
    DBUG_PRINT("connectstring", ("%s", ndbcluster_connectstring));     
4485 4486 4487 4488
  if ((g_ndb_cluster_connection=
       new Ndb_cluster_connection(ndbcluster_connectstring)) == 0)
  {
    DBUG_PRINT("error",("Ndb_cluster_connection(%s)",ndbcluster_connectstring));
unknown's avatar
unknown committed
4489
    goto ndbcluster_init_error;
4490
  }
unknown's avatar
unknown committed
4491

unknown's avatar
unknown committed
4492
  // Create a Ndb object to open the connection  to NDB
4493
  g_ndb= new Ndb(g_ndb_cluster_connection, "sys");
4494
  g_ndb->getDictionary()->set_local_table_data_size(sizeof(Ndb_table_local_info));
unknown's avatar
unknown committed
4495 4496 4497
  if (g_ndb->init() != 0)
  {
    ERR_PRINT (g_ndb->getNdbError());
unknown's avatar
unknown committed
4498
    goto ndbcluster_init_error;
unknown's avatar
unknown committed
4499
  }
unknown's avatar
unknown committed
4500

unknown's avatar
unknown committed
4501
  if ((res= g_ndb_cluster_connection->connect(0,0,0)) == 0)
unknown's avatar
unknown committed
4502
  {
unknown's avatar
unknown committed
4503 4504 4505
    DBUG_PRINT("info",("NDBCLUSTER storage engine at %s on port %d",
		       g_ndb_cluster_connection->get_connected_host(),
		       g_ndb_cluster_connection->get_connected_port()));
unknown's avatar
unknown committed
4506 4507
    g_ndb->waitUntilReady(10);
  } 
unknown's avatar
unknown committed
4508
  else if(res == 1)
unknown's avatar
unknown committed
4509
  {
unknown's avatar
unknown committed
4510 4511
    if (g_ndb_cluster_connection->start_connect_thread()) {
      DBUG_PRINT("error", ("g_ndb_cluster_connection->start_connect_thread()"));
unknown's avatar
unknown committed
4512 4513 4514 4515 4516 4517
      goto ndbcluster_init_error;
    }
    {
      char buf[1024];
      DBUG_PRINT("info",("NDBCLUSTER storage engine not started, will connect using %s",
			 g_ndb_cluster_connection->get_connectstring(buf,sizeof(buf))));
unknown's avatar
unknown committed
4518
    }
unknown's avatar
unknown committed
4519
  }
unknown's avatar
unknown committed
4520
  else
unknown's avatar
unknown committed
4521 4522 4523
  {
    DBUG_ASSERT(res == -1);
    DBUG_PRINT("error", ("permanent error"));
unknown's avatar
unknown committed
4524
    goto ndbcluster_init_error;
unknown's avatar
unknown committed
4525
  }
unknown's avatar
unknown committed
4526
  
unknown's avatar
unknown committed
4527 4528 4529
  (void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
                   (hash_get_key) ndbcluster_get_key,0,0);
  pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
4530

unknown's avatar
unknown committed
4531 4532 4533
  ndbcluster_inited= 1;
#ifdef USE_DISCOVER_ON_STARTUP
  if (ndb_discover_tables() != 0)
unknown's avatar
unknown committed
4534
    goto ndbcluster_init_error;    
unknown's avatar
unknown committed
4535
#endif
unknown's avatar
unknown committed
4536
  DBUG_RETURN(FALSE);
unknown's avatar
unknown committed
4537 4538 4539
 ndbcluster_init_error:
  ndbcluster_end();
  DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
4540 4541 4542 4543 4544 4545 4546 4547 4548 4549 4550 4551
}


/*
  End use of the NDB Cluster table handler
  - free all global variables allocated by 
    ndcluster_init()
*/

bool ndbcluster_end()
{
  DBUG_ENTER("ndbcluster_end");
unknown's avatar
unknown committed
4552 4553
  if(g_ndb)
    delete g_ndb;
unknown's avatar
unknown committed
4554
  g_ndb= NULL;
4555 4556 4557
  if (g_ndb_cluster_connection)
    delete g_ndb_cluster_connection;
  g_ndb_cluster_connection= NULL;
unknown's avatar
unknown committed
4558 4559 4560 4561 4562 4563 4564 4565
  if (!ndbcluster_inited)
    DBUG_RETURN(0);
  hash_free(&ndbcluster_open_tables);
  pthread_mutex_destroy(&ndbcluster_mutex);
  ndbcluster_inited= 0;
  DBUG_RETURN(0);
}

4566 4567 4568 4569 4570
/*
  Static error print function called from
  static handler method ndbcluster_commit
  and ndbcluster_rollback
*/
4571 4572

void ndbcluster_print_error(int error, const NdbOperation *error_op)
4573
{
4574 4575
  DBUG_ENTER("ndbcluster_print_error");
  TABLE tab;
4576 4577
  const char *tab_name= (error_op) ? error_op->getTableName() : "";
  tab.table_name= (char *) tab_name;
4578
  ha_ndbcluster error_handler(&tab);
4579
  tab.file= &error_handler;
4580
  error_handler.print_error(error, MYF(0));
unknown's avatar
unknown committed
4581
  DBUG_VOID_RETURN;
4582
}
unknown's avatar
unknown committed
4583 4584 4585 4586 4587 4588 4589 4590 4591 4592 4593 4594 4595 4596 4597 4598 4599 4600 4601 4602 4603 4604 4605

/*
  Set m_tabname from full pathname to table file 
 */

void ha_ndbcluster::set_tabname(const char *path_name)
{
  char *end, *ptr;
  
  /* Scan name from the end */
  end= strend(path_name)-1;
  ptr= end;
  while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
    ptr--;
  }
  uint name_len= end - ptr;
  memcpy(m_tabname, ptr + 1, end - ptr);
  m_tabname[name_len]= '\0';
#ifdef __WIN__
  /* Put to lower case */
  ptr= m_tabname;
  
  while (*ptr != '\0') {
4606
    *ptr= tolower(*ptr);
unknown's avatar
unknown committed
4607 4608 4609 4610 4611 4612 4613 4614 4615 4616 4617 4618 4619 4620 4621
    ptr++;
  }
#endif
}

/**
 * Set a given location from full pathname to table file
 *
 */
void
ha_ndbcluster::set_tabname(const char *path_name, char * tabname)
{
  char *end, *ptr;
  
  /* Scan name from the end */
4622 4623
  end= strend(path_name)-1;
  ptr= end;
unknown's avatar
unknown committed
4624 4625 4626
  while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
    ptr--;
  }
4627
  uint name_len= end - ptr;
unknown's avatar
unknown committed
4628
  memcpy(tabname, ptr + 1, end - ptr);
4629
  tabname[name_len]= '\0';
unknown's avatar
unknown committed
4630 4631
#ifdef __WIN__
  /* Put to lower case */
4632
  ptr= tabname;
unknown's avatar
unknown committed
4633 4634 4635 4636 4637 4638 4639 4640 4641 4642 4643 4644 4645 4646 4647 4648 4649 4650 4651 4652 4653 4654 4655 4656 4657 4658 4659 4660 4661 4662 4663 4664 4665 4666 4667 4668 4669 4670 4671 4672 4673 4674 4675 4676 4677
  
  while (*ptr != '\0') {
    *ptr= tolower(*ptr);
    ptr++;
  }
#endif
}


/*
  Set m_dbname from full pathname to table file
 
 */

void ha_ndbcluster::set_dbname(const char *path_name)
{
  char *end, *ptr;
  
  /* Scan name from the end */
  ptr= strend(path_name)-1;
  while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
    ptr--;
  }
  ptr--;
  end= ptr;
  while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
    ptr--;
  }
  uint name_len= end - ptr;
  memcpy(m_dbname, ptr + 1, name_len);
  m_dbname[name_len]= '\0';
#ifdef __WIN__
  /* Put to lower case */
  
  ptr= m_dbname;
  
  while (*ptr != '\0') {
    *ptr= tolower(*ptr);
    ptr++;
  }
#endif
}


ha_rows 
unknown's avatar
unknown committed
4678 4679 4680 4681
ha_ndbcluster::records_in_range(uint inx, key_range *min_key,
                                key_range *max_key)
{
  KEY *key_info= table->key_info + inx;
unknown's avatar
unknown committed
4682
  uint key_length= key_info->key_length;
4683
  NDB_INDEX_TYPE idx_type= get_index_type(inx);  
unknown's avatar
unknown committed
4684 4685

  DBUG_ENTER("records_in_range");
4686 4687 4688 4689 4690 4691 4692 4693 4694 4695 4696 4697 4698 4699
  // Prevent partial read of hash indexes by returning HA_POS_ERROR
  if ((idx_type == UNIQUE_INDEX || idx_type == PRIMARY_KEY_INDEX) &&
      ((min_key && min_key->length < key_length) ||
       (max_key && max_key->length < key_length)))
    DBUG_RETURN(HA_POS_ERROR);
  
  // Read from hash index with full key
  // This is a "const" table which returns only one record!      
  if ((idx_type != ORDERED_INDEX) &&
      ((min_key && min_key->length == key_length) || 
       (max_key && max_key->length == key_length)))
    DBUG_RETURN(1);
  
  DBUG_RETURN(10); /* Good guess when you don't know anything */
unknown's avatar
unknown committed
4700 4701
}

4702 4703 4704 4705 4706 4707 4708 4709 4710 4711 4712 4713 4714 4715 4716 4717 4718 4719 4720 4721 4722 4723 4724 4725 4726 4727 4728 4729 4730 4731 4732 4733 4734 4735 4736 4737 4738
ulong ha_ndbcluster::table_flags(void) const
{
  if (m_ha_not_exact_count)
    return m_table_flags | HA_NOT_EXACT_COUNT;
  else
    return m_table_flags;
}
const char * ha_ndbcluster::table_type() const 
{
  return("ndbcluster");
}
uint ha_ndbcluster::max_supported_record_length() const
{ 
  return NDB_MAX_TUPLE_SIZE;
}
uint ha_ndbcluster::max_supported_keys() const
{
  return MAX_KEY;
}
uint ha_ndbcluster::max_supported_key_parts() const 
{
  return NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY;
}
uint ha_ndbcluster::max_supported_key_length() const
{
  return NDB_MAX_KEY_SIZE;
}
bool ha_ndbcluster::low_byte_first() const
{ 
#ifdef WORDS_BIGENDIAN
  return FALSE;
#else
  return TRUE;
#endif
}
bool ha_ndbcluster::has_transactions()
{
unknown's avatar
unknown committed
4739
  return m_transaction_on;
4740 4741 4742 4743 4744 4745 4746 4747 4748 4749 4750 4751 4752 4753 4754 4755
}
const char* ha_ndbcluster::index_type(uint key_number)
{
  switch (get_index_type(key_number)) {
  case ORDERED_INDEX:
  case UNIQUE_ORDERED_INDEX:
  case PRIMARY_KEY_ORDERED_INDEX:
    return "BTREE";
  case UNIQUE_INDEX:
  case PRIMARY_KEY_INDEX:
  default:
    return "HASH";
  }
}
uint8 ha_ndbcluster::table_cache_type()
{
unknown's avatar
unknown committed
4756 4757 4758 4759
  if (m_use_local_query_cache)
    return HA_CACHE_TBL_TRANSACT;
  else
    return HA_CACHE_TBL_NOCACHE;
4760
}
unknown's avatar
unknown committed
4761 4762 4763 4764 4765 4766 4767 4768 4769 4770 4771 4772 4773 4774 4775 4776 4777 4778 4779 4780 4781 4782 4783 4784 4785 4786 4787 4788 4789 4790 4791 4792 4793 4794 4795 4796 4797 4798 4799 4800 4801 4802 4803 4804 4805 4806 4807 4808 4809 4810 4811 4812 4813 4814 4815 4816 4817 4818 4819 4820 4821 4822 4823 4824 4825 4826 4827 4828 4829 4830 4831 4832 4833 4834 4835 4836 4837 4838 4839 4840 4841 4842 4843 4844 4845 4846 4847 4848 4849 4850 4851

/*
  Handling the shared NDB_SHARE structure that is needed to 
  provide table locking.
  It's also used for sharing data with other NDB handlers
  in the same MySQL Server. There is currently not much
  data we want to or can share.
 */

static byte* ndbcluster_get_key(NDB_SHARE *share,uint *length,
				my_bool not_used __attribute__((unused)))
{
  *length=share->table_name_length;
  return (byte*) share->table_name;
}

static NDB_SHARE* get_share(const char *table_name)
{
  NDB_SHARE *share;
  pthread_mutex_lock(&ndbcluster_mutex);
  uint length=(uint) strlen(table_name);
  if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables,
                                       (byte*) table_name,
                                       length)))
  {
    if ((share=(NDB_SHARE *) my_malloc(sizeof(*share)+length+1,
                                       MYF(MY_WME | MY_ZEROFILL))))
    {
      share->table_name_length=length;
      share->table_name=(char*) (share+1);
      strmov(share->table_name,table_name);
      if (my_hash_insert(&ndbcluster_open_tables, (byte*) share))
      {
        pthread_mutex_unlock(&ndbcluster_mutex);
        my_free((gptr) share,0);
        return 0;
      }
      thr_lock_init(&share->lock);
      pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
    }
  }
  share->use_count++;
  pthread_mutex_unlock(&ndbcluster_mutex);
  return share;
}


static void free_share(NDB_SHARE *share)
{
  pthread_mutex_lock(&ndbcluster_mutex);
  if (!--share->use_count)
  {
    hash_delete(&ndbcluster_open_tables, (byte*) share);
    thr_lock_delete(&share->lock);
    pthread_mutex_destroy(&share->mutex);
    my_free((gptr) share, MYF(0));
  }
  pthread_mutex_unlock(&ndbcluster_mutex);
}



/*
  Internal representation of the frm blob
   
*/

struct frm_blob_struct 
{
  struct frm_blob_header 
  {
    uint ver;      // Version of header
    uint orglen;   // Original length of compressed data
    uint complen;  // Compressed length of data, 0=uncompressed
  } head;
  char data[1];  
};



static int packfrm(const void *data, uint len, 
		   const void **pack_data, uint *pack_len)
{
  int error;
  ulong org_len, comp_len;
  uint blob_len;
  frm_blob_struct* blob;
  DBUG_ENTER("packfrm");
  DBUG_PRINT("enter", ("data: %x, len: %d", data, len));
  
  error= 1;
4852
  org_len= len;
unknown's avatar
unknown committed
4853 4854 4855 4856 4857 4858 4859 4860 4861 4862 4863 4864 4865 4866 4867 4868 4869 4870 4871
  if (my_compress((byte*)data, &org_len, &comp_len))
    goto err;
  
  DBUG_PRINT("info", ("org_len: %d, comp_len: %d", org_len, comp_len));
  DBUG_DUMP("compressed", (char*)data, org_len);
  
  error= 2;
  blob_len= sizeof(frm_blob_struct::frm_blob_header)+org_len;
  if (!(blob= (frm_blob_struct*) my_malloc(blob_len,MYF(MY_WME))))
    goto err;
  
  // Store compressed blob in machine independent format
  int4store((char*)(&blob->head.ver), 1);
  int4store((char*)(&blob->head.orglen), comp_len);
  int4store((char*)(&blob->head.complen), org_len);
  
  // Copy frm data into blob, already in machine independent format
  memcpy(blob->data, data, org_len);  
  
4872 4873 4874
  *pack_data= blob;
  *pack_len= blob_len;
  error= 0;
unknown's avatar
unknown committed
4875 4876 4877 4878 4879 4880 4881 4882 4883 4884 4885
  
  DBUG_PRINT("exit", ("pack_data: %x, pack_len: %d", *pack_data, *pack_len));
err:
  DBUG_RETURN(error);
  
}


static int unpackfrm(const void **unpack_data, uint *unpack_len,
		    const void *pack_data)
{
4886
   const frm_blob_struct *blob= (frm_blob_struct*)pack_data;
unknown's avatar
unknown committed
4887 4888 4889 4890 4891 4892 4893 4894 4895 4896 4897 4898 4899 4900 4901
   byte *data;
   ulong complen, orglen, ver;
   DBUG_ENTER("unpackfrm");
   DBUG_PRINT("enter", ("pack_data: %x", pack_data));

   complen=	uint4korr((char*)&blob->head.complen);
   orglen=	uint4korr((char*)&blob->head.orglen);
   ver=		uint4korr((char*)&blob->head.ver);
 
   DBUG_PRINT("blob",("ver: %d complen: %d orglen: %d",
 		     ver,complen,orglen));
   DBUG_DUMP("blob->data", (char*) blob->data, complen);
 
   if (ver != 1)
     DBUG_RETURN(1);
4902
   if (!(data= my_malloc(max(orglen, complen), MYF(MY_WME))))
unknown's avatar
unknown committed
4903 4904 4905 4906 4907 4908 4909 4910 4911
     DBUG_RETURN(2);
   memcpy(data, blob->data, complen);
 
   if (my_uncompress(data, &complen, &orglen))
   {
     my_free((char*)data, MYF(0));
     DBUG_RETURN(3);
   }

4912 4913
   *unpack_data= data;
   *unpack_len= complen;
unknown's avatar
unknown committed
4914 4915 4916 4917 4918

   DBUG_PRINT("exit", ("frmdata: %x, len: %d", *unpack_data, *unpack_len));

   DBUG_RETURN(0);
}
unknown's avatar
unknown committed
4919 4920 4921 4922 4923 4924 4925 4926

static 
int
ndb_get_table_statistics(Ndb* ndb, const char * table, 
			 Uint64* row_count, Uint64* commit_count)
{
  DBUG_ENTER("ndb_get_table_statistics");
  DBUG_PRINT("enter", ("table: %s", table));
unknown's avatar
unknown committed
4927
  NdbConnection* pTrans= ndb->startTransaction();
unknown's avatar
unknown committed
4928 4929 4930 4931
  do 
  {
    if (pTrans == NULL)
      break;
unknown's avatar
unknown committed
4932
      
unknown's avatar
unknown committed
4933 4934 4935 4936
    NdbScanOperation* pOp= pTrans->getNdbScanOperation(table);
    if (pOp == NULL)
      break;
    
4937
    if (pOp->readTuples(NdbOperation::LM_CommittedRead))
unknown's avatar
unknown committed
4938 4939 4940 4941 4942 4943 4944 4945 4946 4947
      break;
    
    int check= pOp->interpret_exit_last_row();
    if (check == -1)
      break;
    
    Uint64 rows, commits;
    pOp->getValue(NdbDictionary::Column::ROW_COUNT, (char*)&rows);
    pOp->getValue(NdbDictionary::Column::COMMIT_COUNT, (char*)&commits);
    
unknown's avatar
unknown committed
4948
    check= pTrans->execute(NoCommit, AbortOnError, TRUE);
unknown's avatar
unknown committed
4949 4950 4951 4952 4953
    if (check == -1)
      break;
    
    Uint64 sum_rows= 0;
    Uint64 sum_commits= 0;
4954
    while((check= pOp->nextResult(TRUE, TRUE)) == 0)
unknown's avatar
unknown committed
4955 4956 4957 4958 4959 4960 4961 4962
    {
      sum_rows+= rows;
      sum_commits+= commits;
    }
    
    if (check == -1)
      break;

4963
    pOp->close(TRUE);
unknown's avatar
unknown committed
4964

unknown's avatar
unknown committed
4965 4966 4967 4968 4969 4970 4971 4972 4973
    ndb->closeTransaction(pTrans);
    if(row_count)
      * row_count= sum_rows;
    if(commit_count)
      * commit_count= sum_commits;
    DBUG_PRINT("exit", ("records: %u commits: %u", sum_rows, sum_commits));
    DBUG_RETURN(0);
  } while(0);

unknown's avatar
unknown committed
4974
  ndb->closeTransaction(pTrans);
unknown's avatar
unknown committed
4975 4976 4977 4978
  DBUG_PRINT("exit", ("failed"));
  DBUG_RETURN(-1);
}

4979 4980 4981 4982 4983 4984 4985 4986 4987 4988 4989 4990 4991 4992 4993 4994 4995 4996 4997 4998 4999 5000 5001 5002 5003 5004
/*
  Create a .ndb file to serve as a placeholder indicating 
  that the table with this name is a ndb table
*/

int ha_ndbcluster::write_ndb_file()
{
  File file;
  bool error=1;
  char path[FN_REFLEN];
  
  DBUG_ENTER("write_ndb_file");
  DBUG_PRINT("enter", ("db: %s, name: %s", m_dbname, m_tabname));

  (void)strxnmov(path, FN_REFLEN, 
		 mysql_data_home,"/",m_dbname,"/",m_tabname,ha_ndb_ext,NullS);

  if ((file=my_create(path, CREATE_MODE,O_RDWR | O_TRUNC,MYF(MY_WME))) >= 0)
  {
    // It's an empty file
    error=0;
    my_close(file,MYF(0));
  }
  DBUG_RETURN(error);
}

5005
#ifdef key_multi_range
5006 5007 5008 5009 5010 5011 5012 5013
int
ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
				      key_multi_range *ranges, 
				      uint range_count,
				      bool sorted, 
				      handler_buffer *buffer)
{
  DBUG_ENTER("ha_ndbcluster::read_multi_range_first");
unknown's avatar
unknown committed
5014
  
5015 5016 5017
  int res;
  uint i;
  KEY* key_info= table->key_info + active_index;
5018 5019
  NDB_INDEX_TYPE index_type= get_index_type(active_index);
  ulong reclength= table->reclength;
5020 5021
  NdbOperation* op;

5022
  if (uses_blob_value(m_retrieve_all_fields))
unknown's avatar
unknown committed
5023 5024 5025 5026 5027
  {
    /**
     * blobs can't be batched currently
     */
    m_disable_multi_read= true;
unknown's avatar
unknown committed
5028 5029 5030 5031 5032
    DBUG_RETURN(handler::read_multi_range_first(found_range_p, 
						ranges, 
						range_count,
						sorted, 
						buffer));
unknown's avatar
unknown committed
5033 5034
  }

5035
  m_disable_multi_read= false;
5036 5037 5038 5039

  /**
   * Copy arguments into member variables
   */
5040 5041 5042 5043 5044
  multi_ranges= ranges;
  multi_range_count= range_count;
  multi_range_sorted= sorted;
  multi_range_buffer= buffer;

5045 5046 5047 5048 5049 5050 5051 5052 5053 5054 5055 5056 5057 5058 5059
  /**
   * read multi range will read ranges as follows (if not ordered)
   *
   * input    read order
   * ======   ==========
   * pk-op 1  pk-op 1
   * pk-op 2  pk-op 2
   * range 3  range (3,5) NOTE result rows will be intermixed
   * pk-op 4  pk-op 4
   * range 5
   * pk-op 6  pk-ok 6
   
   /**
   * Variables for loop
   */
unknown's avatar
unknown committed
5060 5061
  byte *curr= (byte*)buffer->buffer;
  byte *end_of_buffer= (byte*)buffer->buffer_end;
5062 5063 5064 5065
  NdbOperation::LockMode lm= 
    (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
  const NDBTAB *tab= (const NDBTAB *) m_table;
  const NDBINDEX *unique_idx= (NDBINDEX *) m_index[active_index].unique_index;
5066
  const NDBINDEX *idx= (NDBINDEX *) m_index[active_index].index; 
5067 5068
  const NdbOperation* lastOp= m_active_trans->getLastDefinedOperation();
  NdbIndexScanOperation* scanOp= 0;
unknown's avatar
unknown committed
5069
  for(i= 0; i<range_count && curr+reclength <= end_of_buffer; i++)
5070 5071 5072 5073
  {
    switch(index_type){
    case PRIMARY_KEY_INDEX:
  pk:
5074
    {
5075
      ranges[i].range_flag |= UNIQUE_RANGE;
5076 5077 5078 5079 5080 5081 5082 5083
      if ((op= m_active_trans->getNdbOperation(tab)) && 
	  !op->readTuple(lm) && 
	  !set_primary_key(op, ranges[i].start_key.key) &&
	  !define_read_attrs(curr, op) &&
	  (op->setAbortOption(IgnoreError), true))
	curr += reclength;
      else
	ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError());
5084
      break;
5085 5086
    }
    break;
5087 5088
    case UNIQUE_INDEX:
  sk:
5089
    {
5090
      ranges[i].range_flag |= UNIQUE_RANGE;
5091 5092
      if ((op= m_active_trans->getNdbIndexOperation(unique_idx, tab)) && 
	  !op->readTuple(lm) && 
5093
	  !set_index_key(op, key_info, ranges[i].start_key.key) &&
5094 5095 5096 5097 5098
	  !define_read_attrs(curr, op) &&
	  (op->setAbortOption(IgnoreError), true))
	curr += reclength;
      else
	ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError());
5099 5100 5101 5102 5103 5104 5105 5106 5107
      break;
    }
    case PRIMARY_KEY_ORDERED_INDEX:
      if (ranges[i].start_key.length == key_info->key_length &&
	  ranges[i].start_key.flag == HA_READ_KEY_EXACT)
	goto pk;
      goto range;
    case UNIQUE_ORDERED_INDEX:
      if (ranges[i].start_key.length == key_info->key_length &&
5108 5109 5110
	  ranges[i].start_key.flag == HA_READ_KEY_EXACT &&
	  !check_null_in_key(key_info, ranges[i].start_key.key,
			     ranges[i].start_key.length))
5111 5112 5113 5114 5115 5116 5117
	goto sk;
      goto range;
    case ORDERED_INDEX:
  range:
      ranges[i].range_flag &= ~(uint)UNIQUE_RANGE;
      if (scanOp == 0)
      {
unknown's avatar
unknown committed
5118 5119 5120 5121 5122 5123 5124 5125 5126 5127 5128 5129
	if (m_multi_cursor)
	{
	  scanOp= m_multi_cursor;
	  DBUG_ASSERT(scanOp->getSorted() == sorted);
	  DBUG_ASSERT(scanOp->getLockMode() == 
		      (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type));
	  if(scanOp->reset_bounds(m_force_send))
	    DBUG_RETURN(ndb_err(m_active_trans));
	  
	  end_of_buffer -= reclength;
	}
	else if ((scanOp= m_active_trans->getNdbIndexScanOperation(idx, tab)) 
5130
		 && !scanOp->readTuples(lm, 0, parallelism, sorted, false, true) &&
unknown's avatar
unknown committed
5131 5132 5133 5134 5135
		 !define_read_attrs(end_of_buffer-reclength, scanOp))
	{
	  m_multi_cursor= scanOp;
	  m_multi_range_cursor_result_ptr= end_of_buffer-reclength;
	}
5136
	else
unknown's avatar
unknown committed
5137
	{
5138
	  ERR_RETURN(scanOp ? scanOp->getNdbError() : 
5139
		     m_active_trans->getNdbError());
unknown's avatar
unknown committed
5140
	}
5141 5142
      }
      const key_range *keys[2]= { &ranges[i].start_key, &ranges[i].end_key };
5143
      if ((res= set_bounds(scanOp, keys, i)))
5144 5145
	DBUG_RETURN(res);
      break;
5146 5147 5148
    }
  }
  
5149
  if (i != range_count)
5150
  {
5151 5152 5153 5154 5155 5156
    /**
     * Mark that we're using entire buffer (even if might not) as
     *   we haven't read all ranges for some reason
     * This as we don't want mysqld to reuse the buffer when we read
     *   the remaining ranges
     */
5157
    buffer->end_of_used_area= (byte*)buffer->buffer_end;
5158 5159 5160 5161 5162 5163 5164 5165 5166 5167 5168
  }
  else
  {
    buffer->end_of_used_area= curr;
  }
  
  /**
   * Set first operation in multi range
   */
  m_current_multi_operation= 
    lastOp ? lastOp->next() : m_active_trans->getFirstDefinedOperation();
5169
  if (!(res= execute_no_commit_ie(this, m_active_trans)))
5170 5171
  {
    multi_range_curr= 0;
5172 5173 5174
    m_multi_range_defined_count= i;
    m_multi_range_result_ptr= (byte*)buffer->buffer;
    DBUG_RETURN(read_multi_range_next(found_range_p));
5175 5176 5177 5178
  }
  ERR_RETURN(m_active_trans->getNdbError());
}

unknown's avatar
unknown committed
5179 5180 5181 5182 5183 5184
#if 0
#define DBUG_MULTI_RANGE(x) printf("read_multi_range_next: case %d\n", x);
#else
#define DBUG_MULTI_RANGE(x)
#endif

5185
int
5186
ha_ndbcluster::read_multi_range_next(key_multi_range ** multi_range_found_p)
5187 5188
{
  DBUG_ENTER("ha_ndbcluster::read_multi_range_next");
5189
  if (m_disable_multi_read)
5190
  {
5191
    DBUG_RETURN(handler::read_multi_range_next(multi_range_found_p));
5192
  }
5193
  
5194
  int res;
5195
  int range_no;
5196 5197
  ulong reclength= table->reclength;
  const NdbOperation* op= m_current_multi_operation;
5198
  for(;multi_range_curr < m_multi_range_defined_count; multi_range_curr++)
5199
  {
5200
    if (multi_ranges[multi_range_curr].range_flag & UNIQUE_RANGE)
5201
    {
5202
      if (op->getNdbError().code == 0)
5203 5204 5205 5206
	goto found_next;
      
      op= m_active_trans->getNextCompletedOperation(op);
      m_multi_range_result_ptr += reclength;
5207
      continue;
5208
    } 
5209
    else if (m_multi_cursor && !multi_range_sorted)
5210
    {
unknown's avatar
unknown committed
5211 5212
      DBUG_MULTI_RANGE(1);
      if ((res= fetch_next(m_multi_cursor)) == 0)
5213
      {
unknown's avatar
unknown committed
5214
	DBUG_MULTI_RANGE(2);
5215 5216 5217 5218 5219 5220 5221 5222
	range_no= m_multi_cursor->get_range_no();
	goto found;
      } 
      else
      {
	goto close_scan;
      }
    }
unknown's avatar
unknown committed
5223
    else if (m_multi_cursor && multi_range_sorted)
5224
    {
unknown's avatar
unknown committed
5225 5226 5227
      if (m_active_cursor && (res= fetch_next(m_multi_cursor)))
      {
	DBUG_MULTI_RANGE(3);
5228
	goto close_scan;
unknown's avatar
unknown committed
5229
      }
5230
      
5231 5232 5233
      range_no= m_multi_cursor->get_range_no();
      if (range_no == multi_range_curr)
      {
unknown's avatar
unknown committed
5234
	DBUG_MULTI_RANGE(4);
5235 5236 5237
        // return current row
	goto found;
      }
unknown's avatar
unknown committed
5238
      else if (range_no > (int)multi_range_curr)
5239
      {
unknown's avatar
unknown committed
5240
	DBUG_MULTI_RANGE(5);
5241 5242 5243 5244 5245 5246
	// wait with current row
	m_active_cursor= 0;
	continue;
      }
      else 
      {
unknown's avatar
unknown committed
5247
	DBUG_MULTI_RANGE(6);
5248 5249
	// First fetch from cursor
	DBUG_ASSERT(range_no == -1);
unknown's avatar
unknown committed
5250 5251 5252 5253
	if((res= m_multi_cursor->nextResult(true)))
	{
	  goto close_scan;
	}
5254 5255 5256
	multi_range_curr--; // Will be increased in for-loop
	continue;
      }
5257
    }
unknown's avatar
unknown committed
5258
    else /** m_multi_cursor == 0 */
5259
    {
unknown's avatar
unknown committed
5260
      DBUG_MULTI_RANGE(7);
5261 5262 5263 5264
      /**
       * Corresponds to range 5 in example in read_multi_range_first
       */
      (void)1;
5265
      continue;
5266
    }
5267 5268 5269 5270 5271
    
    DBUG_ASSERT(false); // Should only get here via goto's
close_scan:
    if (res == 1)
    {
unknown's avatar
unknown committed
5272
      m_multi_cursor->close();
5273
      m_active_cursor= m_multi_cursor= 0;
unknown's avatar
unknown committed
5274
      DBUG_MULTI_RANGE(8);
5275 5276 5277 5278 5279 5280 5281
      continue;
    } 
    else 
    {
      DBUG_RETURN(ndb_err(m_active_trans));
    }
  }
5282 5283
  
  if (multi_range_curr == multi_range_count)
5284
    DBUG_RETURN(HA_ERR_END_OF_FILE);
5285
  
5286 5287 5288
  /**
   * Read remaining ranges
   */
5289
  uint left= multi_range_count - multi_range_curr;
5290 5291 5292 5293 5294
  DBUG_RETURN(read_multi_range_first(multi_range_found_p, 
				     multi_ranges + multi_range_curr,
				     left, 
				     multi_range_sorted,
				     multi_range_buffer));
5295 5296
  
found:
5297 5298 5299
  /**
   * Found a record belonging to a scan
   */
5300
  m_active_cursor= m_multi_cursor;
5301
  * multi_range_found_p= multi_ranges + range_no;
5302 5303
  memcpy(table->record[0], m_multi_range_cursor_result_ptr, reclength);
  setup_recattr(m_active_cursor->getFirstRecAttr());
5304 5305 5306
  unpack_record(table->record[0]);
  table->status= 0;     
  DBUG_RETURN(0);
5307
  
5308
found_next:
5309 5310 5311 5312
  /**
   * Found a record belonging to a pk/index op,
   *   copy result and move to next to prepare for next call
   */
5313 5314
  * multi_range_found_p= multi_ranges + multi_range_curr;
  memcpy(table->record[0], m_multi_range_result_ptr, reclength);
5315
  setup_recattr(op->getFirstRecAttr());
5316
  unpack_record(table->record[0]);
5317 5318
  table->status= 0;
  
5319
  multi_range_curr++;
5320
  m_current_multi_operation= m_active_trans->getNextCompletedOperation(op);
5321 5322
  m_multi_range_result_ptr += reclength;
  DBUG_RETURN(0);
5323 5324
}

5325 5326 5327 5328 5329 5330 5331 5332
int
ha_ndbcluster::setup_recattr(const NdbRecAttr* curr)
{
  DBUG_ENTER("setup_recattr");

  Field **field, **end;
  NdbValue *value= m_value;
  
5333
  end= table->field + table->fields;
5334 5335 5336 5337 5338 5339
  
  for (field= table->field; field < end; field++, value++)
  {
    if ((* value).ptr)
    {
      DBUG_ASSERT(curr != 0);
5340 5341
      (* value).rec= curr;
      curr= curr->next();
5342 5343 5344
    }
  }
  
unknown's avatar
unknown committed
5345
  DBUG_RETURN(0);
5346
}
5347
#endif
5348

5349 5350 5351 5352 5353 5354 5355 5356 5357 5358 5359 5360 5361 5362 5363 5364 5365 5366 5367 5368 5369 5370 5371 5372 5373 5374 5375 5376 5377 5378 5379 5380 5381 5382 5383 5384 5385 5386 5387 5388 5389 5390 5391 5392 5393 5394 5395 5396 5397 5398 5399 5400 5401 5402 5403 5404 5405 5406 5407 5408 5409 5410 5411 5412 5413 5414 5415 5416 5417 5418 5419 5420 5421 5422 5423 5424 5425 5426 5427 5428 5429 5430 5431 5432 5433 5434 5435 5436 5437 5438 5439 5440 5441 5442
/*
  Condition pushdown
*/
const 
COND* 
ha_ndbcluster::cond_push(const COND *cond) 
{ 
  THD *thd= current_thd;
  Ndb_cond_stack *ndb_cond = new Ndb_cond_stack();
  DBUG_ENTER("cond_push");

  if (thd->variables.ndb_condition_pushdown)
  {
    DBUG_EXECUTE("where",print_where((COND *)cond, m_tabname););
    if (m_cond_stack)
      ndb_cond->next= m_cond_stack;
    else
      ndb_cond->next= NULL;
    m_cond_stack= ndb_cond;
    
    if (serialize_cond(cond, ndb_cond))
    {
      DBUG_RETURN(NULL);
    }
    else
    {
      cond_pop();
    }
  }
  DBUG_RETURN(cond); 
}

inline
void 
ha_ndbcluster::cond_pop() 
{ 
  Ndb_cond_stack *ndb_cond_stack= m_cond_stack;  
  if (ndb_cond_stack)
  {
    m_cond_stack= ndb_cond_stack->next;
    delete ndb_cond_stack;
  }
};

void
ha_ndbcluster::cond_clear()
{
  DBUG_ENTER("cond_clear");
  while (m_cond_stack)
    cond_pop();

  DBUG_VOID_RETURN;
}

void ndb_serialize_cond(const Item *item, void *arg)
{
  Ndb_cond_traverse_context *context= (Ndb_cond_traverse_context *) arg;
  DBUG_ENTER("ndb_serialize_cond");  

  if (*context->supported_ptr)
  {
    Ndb_cond_stack *ndb_stack= context->stack_ptr;
    Ndb_cond *prev_cond= context->cond_ptr;
    Ndb_cond *curr_cond= context->cond_ptr= new Ndb_cond();
    if (!ndb_stack->ndb_cond)
      ndb_stack->ndb_cond= curr_cond;
    curr_cond->prev= prev_cond;
    if (prev_cond) prev_cond->next= curr_cond;
    
    switch(item->type()) {
    case(Item::FIELD_ITEM): {
      Item_field *field_item= (Item_field *) item;
      Field *field= field_item->field;
      /*
	Check that the field is part of the table of the handler
	instance and that we expect a field with of this result type.
      */
      if (context->table == field->table)
      {	
	const NDBTAB *tab= (const NDBTAB *) context->ndb_table;
	DBUG_PRINT("info", ("FIELD_ITEM"));
	DBUG_PRINT("info", ("table %s", tab->getName()));
	DBUG_PRINT("info", ("column %s", field->field_name));
	
	if(context->expecting(Item::FIELD_ITEM) &&
	   context->expecting_field_result(field->result_type()))
	{
	  // Currently only support for unsigned int
	  if (field->result_type() == INT_RESULT  &&
	      (field->type() != MYSQL_TYPE_LONG ||
	       !(field->flags & UNSIGNED_FLAG)))
	    *context->supported_ptr= FALSE;  
	  else
	  {
5443 5444
	    const NDBCOL *col= tab->getColumn(field->field_name);
	    DBUG_ASSERT(col);
5445 5446 5447 5448 5449 5450 5451 5452 5453 5454 5455 5456 5457 5458 5459 5460 5461 5462 5463 5464 5465 5466 5467 5468 5469 5470 5471 5472 5473 5474 5475 5476 5477 5478 5479 5480 5481 5482 5483 5484 5485 5486 5487 5488 5489 5490 5491 5492 5493 5494 5495 5496 5497 5498 5499 5500 5501 5502 5503 5504 5505 5506 5507 5508 5509 5510 5511 5512 5513 5514 5515 5516 5517 5518 5519 5520 5521 5522 5523 5524 5525 5526 5527 5528 5529 5530 5531 5532 5533 5534 5535 5536 5537 5538 5539 5540 5541 5542 5543 5544 5545 5546 5547 5548 5549 5550 5551 5552 5553 5554 5555 5556 5557 5558 5559 5560 5561 5562 5563 5564 5565 5566 5567 5568 5569 5570 5571 5572 5573 5574 5575 5576 5577 5578 5579 5580 5581 5582 5583 5584 5585 5586 5587 5588 5589 5590 5591 5592 5593 5594 5595 5596 5597 5598 5599 5600 5601 5602 5603 5604 5605 5606 5607 5608 5609 5610 5611 5612 5613 5614 5615 5616 5617 5618 5619 5620 5621 5622 5623 5624 5625 5626 5627 5628 5629 5630 5631 5632 5633 5634 5635 5636 5637 5638 5639 5640 5641 5642 5643 5644 5645 5646 5647 5648 5649 5650 5651 5652 5653 5654 5655 5656 5657 5658 5659 5660 5661 5662 5663 5664 5665 5666 5667 5668 5669 5670 5671 5672 5673 5674 5675 5676 5677 5678 5679 5680 5681 5682 5683 5684 5685 5686 5687 5688 5689 5690 5691 5692 5693 5694 5695 5696 5697 5698 5699 5700 5701 5702 5703 5704 5705 5706 5707 5708 5709 5710 5711 5712 5713 5714 5715 5716 5717 5718 5719 5720 5721 5722 5723 5724 5725 5726 5727 5728 5729 5730 5731 5732 5733 5734 5735 5736 5737 5738 5739 5740 5741 5742 5743 5744 5745 5746 5747 5748 5749 5750 5751 5752 5753 5754 5755 5756 5757 5758 5759 5760 5761 5762 5763 5764 5765 5766 5767 5768 5769 5770 5771 5772 5773 5774 5775 5776 5777 5778 5779 5780 5781 5782 5783 5784 5785 5786 5787 5788 5789 5790 5791 5792 5793 5794 5795 5796 5797 5798 5799 5800 5801 5802 5803 5804 5805 5806 5807 5808 5809 5810 5811 5812 5813 5814 5815 5816 5817 5818 5819 5820 5821 5822 5823 5824 5825 5826 5827 5828 5829 5830 5831 5832 5833 5834 5835 5836 5837 5838 5839 5840 5841 5842 5843 5844 5845 5846 5847 5848 5849 5850 5851 5852 5853 5854 5855 5856 5857 5858 5859 5860 5861 5862 5863 5864 5865 5866 5867 5868 5869 5870 5871 5872 5873 5874 5875 5876 5877 5878 5879 5880 5881 5882 5883 5884 5885 5886 5887 5888 5889 5890 5891 5892 5893 5894 5895 5896 5897 5898 5899 5900 5901 5902 5903 5904 5905 5906 5907 5908 5909 5910 5911 5912 5913 5914 5915 5916 5917 5918 5919 5920 5921 5922 5923 5924 5925 5926 5927
	    curr_cond->ndb_item= new Ndb_item(field, col->getColumnNo());
	    context->dont_expect(Item::FIELD_ITEM);
	    context->expect_no_field_result();
	    break;
	  }
	}
      }
      *context->supported_ptr= FALSE;
      break;
    }
    case(Item::FUNC_ITEM): {
      Item_func *func_item= (Item_func *) item;

      context->expect_nothing();
      switch(func_item->functype()) {
      case(Item_func::UNKNOWN_FUNC): {
	DBUG_PRINT("info", ("UNKNOWN_FUNC"));      
	DBUG_PRINT("info", ("value %d", func_item->val_int()));
	break;
      }
      case(Item_func::EQ_FUNC): {
	DBUG_PRINT("info", ("EQ_FUNC"));      
	curr_cond->ndb_item= new Ndb_item(func_item->functype());      
	context->expect(Item::FIELD_ITEM);
	context->expect_field_result(INT_RESULT);
	context->expect(Item::INT_ITEM);
	break;
      }
      case(Item_func::NE_FUNC): {
	DBUG_PRINT("info", ("NE_FUNC"));      
	curr_cond->ndb_item= new Ndb_item(func_item->functype());      
	context->expect(Item::FIELD_ITEM);
	context->expect_field_result(INT_RESULT);
	context->expect(Item::INT_ITEM);
	break;
      }
      case(Item_func::LT_FUNC): {
	DBUG_PRINT("info", ("LT_FUNC"));      
	curr_cond->ndb_item= new Ndb_item(func_item->functype());      
	context->expect(Item::FIELD_ITEM);
	context->expect_field_result(INT_RESULT);
	context->expect(Item::INT_ITEM);
	break;
      }
      case(Item_func::LE_FUNC): {
	DBUG_PRINT("info", ("LE_FUNC"));      
	curr_cond->ndb_item= new Ndb_item(func_item->functype());      
	context->expect(Item::FIELD_ITEM);
	context->expect_field_result(INT_RESULT);
	context->expect(Item::INT_ITEM);
	break;
      }
      case(Item_func::GE_FUNC): {
	DBUG_PRINT("info", ("GE_FUNC"));      
	curr_cond->ndb_item= new Ndb_item(func_item->functype());      
	context->expect(Item::FIELD_ITEM);
	context->expect_field_result(INT_RESULT);
	context->expect(Item::INT_ITEM);
	break;
      }
      case(Item_func::GT_FUNC): {
	DBUG_PRINT("info", ("GT_FUNC"));      
	curr_cond->ndb_item= new Ndb_item(func_item->functype());      
	context->expect(Item::FIELD_ITEM);
	context->expect_field_result(INT_RESULT);
	context->expect(Item::INT_ITEM);
	break;
      }
      case(Item_func::LIKE_FUNC): {
	DBUG_PRINT("info", ("LIKE_FUNC"));      
	curr_cond->ndb_item= new Ndb_item(func_item->functype());      
	context->expect(Item::STRING_ITEM);
	*context->supported_ptr= FALSE; // Currently not supported
	break;
      }
      case(Item_func::NOTLIKE_FUNC): {
	DBUG_PRINT("info", ("NOTLIKE_FUNC"));      
	curr_cond->ndb_item= new Ndb_item(func_item->functype());      
	context->expect(Item::STRING_ITEM);
	*context->supported_ptr= FALSE; // Currently not supported
	break;
      }
      case(Item_func::ISNULL_FUNC): {
	DBUG_PRINT("info", ("ISNULL_FUNC"));      
	curr_cond->ndb_item= new Ndb_item(func_item->functype());      
	context->expect(Item::FIELD_ITEM);
	context->expect_field_result(STRING_RESULT);
	context->expect_field_result(REAL_RESULT);
	context->expect_field_result(INT_RESULT);
	break;
      }
      case(Item_func::ISNOTNULL_FUNC): {
	DBUG_PRINT("info", ("ISNOTNULL_FUNC"));      
	curr_cond->ndb_item= new Ndb_item(func_item->functype());     
	context->expect(Item::FIELD_ITEM);
	context->expect_field_result(STRING_RESULT);
	context->expect_field_result(REAL_RESULT);
	context->expect_field_result(INT_RESULT);
	break;
      }
      default: {
	DBUG_PRINT("info", ("Found func_item of type %d", 
			    func_item->functype()));
	*context->supported_ptr= FALSE;
      }
      }
      break;
    }
    case(Item::STRING_ITEM):
      if (context->expecting(Item::STRING_ITEM)) 
      {
	char buff[256];
	String str(buff,(uint32) sizeof(buff), system_charset_info);
	str.length(0);
	Item_string *string_item= (Item_string *) item;      
	DBUG_PRINT("info", ("STRING_ITEM")); 
	DBUG_PRINT("info", ("value \"%s\"", 
			    string_item->val_str(&str)->ptr()));
	NDB_ITEM_QUALIFICATION q;
	q.value_type= Item::STRING_ITEM;
	curr_cond->ndb_item= new Ndb_item(NDB_VALUE, q, item);      
	context->dont_expect(Item::STRING_ITEM);
      }
      else
	*context->supported_ptr= FALSE;
      break;
    case(Item::INT_ITEM): 
      if (context->expecting(Item::INT_ITEM)) 
      {
	Item_int *int_item= (Item_int *) item;      
	DBUG_PRINT("info", ("INT_ITEM"));
	DBUG_PRINT("info", ("value %d", int_item->value));
	curr_cond->ndb_item= new Ndb_item(int_item->value);
	context->dont_expect(Item::INT_ITEM);
      }
      else
	*context->supported_ptr= FALSE;
      break;
    case(Item::REAL_ITEM):
      if (context->expecting(Item::REAL_ITEM)) 
      {
	Item_real *real_item= (Item_real *) item;      
	DBUG_PRINT("info", ("REAL_ITEM %s"));
	DBUG_PRINT("info", ("value %f", real_item->value));
	curr_cond->ndb_item= new Ndb_item(real_item->value);
	context->dont_expect(Item::REAL_ITEM);
	*context->supported_ptr= FALSE; // Currently not supported
      }
      else
	*context->supported_ptr= FALSE;
      break;
    case(Item::COND_ITEM): {
      Item_cond *cond_item= (Item_cond *) item;
      switch(cond_item->functype()) {
      case(Item_func::COND_AND_FUNC):
	DBUG_PRINT("info", ("COND_AND_FUNC"));
	curr_cond->ndb_item= new Ndb_item(cond_item->functype());      
	break;
      case(Item_func::COND_OR_FUNC):
	DBUG_PRINT("info", ("COND_OR_FUNC"));
	curr_cond->ndb_item= new Ndb_item(cond_item->functype());      
	break;
      default:
	DBUG_PRINT("info", ("COND_ITEM %d", cond_item->functype()));
	*context->supported_ptr= FALSE;
	break;
      }
      break;
    }
    default: {
      DBUG_PRINT("info", ("Found item of type %d", item->type()));
      *context->supported_ptr= FALSE;
    }
    }
  }

  DBUG_VOID_RETURN;
}

bool
ha_ndbcluster::serialize_cond(const COND *cond, Ndb_cond_stack *ndb_cond)
{
  DBUG_ENTER("serialize_cond");
  Item *item= (Item *) cond;
  bool supported= TRUE;
  Ndb_cond_traverse_context context(table, (void *)m_table, 
				    &supported, ndb_cond);
  item->traverse_cond(&ndb_serialize_cond, (void *) &context, Item::PREFIX);
  DBUG_PRINT("info", ("The pushed condition is %ssupported", (supported)?"":"not "));

  DBUG_RETURN(supported);
}

Ndb_cond *
ha_ndbcluster::build_scan_filter_predicate(Ndb_cond *cond, 
					   NdbScanFilter *filter)
{
  DBUG_ENTER("build_scan_filter_predicate");  
  switch(cond->ndb_item->type) {
  case(NDB_FUNCTION): {
    if (!cond->next)
      break;
    Ndb_item *a= cond->next->ndb_item;
    switch(cond->ndb_item->qualification.function_type) {
    case(Item_func::EQ_FUNC): {
      if (!cond->next->next)
	break;
      Ndb_item *b= cond->next->next->ndb_item;
      Ndb_item *value= 
	(a->type == NDB_VALUE)? a
	: (b->type == NDB_VALUE)? b
	: NULL;
      Ndb_item *field= 
	(a->type == NDB_FIELD)? a
	: (b->type == NDB_FIELD)? b
	: NULL;
      if (!value || !field) break;
      DBUG_PRINT("info", ("Generating EQ filter"));
      filter->eq(field->getFieldNo(),
		 (Uint32) value->getIntValue());
      DBUG_RETURN(cond->next->next->next);
    }
    case(Item_func::NE_FUNC): {
      if (!cond->next->next)
	break;
      Ndb_item *b= cond->next->next->ndb_item;
      Ndb_item *value= 
	(a->type == NDB_VALUE)? a
	: (b->type == NDB_VALUE)? b
	: NULL;
      Ndb_item *field= 
	(a->type == NDB_FIELD)? a
	: (b->type == NDB_FIELD)? b
	: NULL;
      if (!value || !field) break;
      DBUG_PRINT("info", ("Generating NE filter"));
      filter->ne(field->getFieldNo(),
		 (Uint32) value->getIntValue());
      DBUG_RETURN(cond->next->next->next);
    }
    case(Item_func::LT_FUNC): {
      if (!cond->next->next)
	break;
      Ndb_item *b= cond->next->next->ndb_item;
      Ndb_item *value= 
	(a->type == NDB_VALUE)? a
	: (b->type == NDB_VALUE)? b
	: NULL;
      Ndb_item *field= 
	(a->type == NDB_FIELD)? a
	: (b->type == NDB_FIELD)? b
	: NULL;
      if (!value || !field) break;
      DBUG_PRINT("info", ("Generating LT filter"));
      if (a == field)
	filter->lt(field->getFieldNo(),
		   (Uint32) value->getIntValue());
      else
	filter->gt(field->getFieldNo(),
		   (Uint32) value->getIntValue());
      DBUG_RETURN(cond->next->next->next);
    }
    case(Item_func::LE_FUNC): {
      if (!cond->next->next)
	break;
      Ndb_item *b= cond->next->next->ndb_item;
      Ndb_item *value= 
	(a->type == NDB_VALUE)? a
	: (b->type == NDB_VALUE)? b
	: NULL;
      Ndb_item *field= 
	(a->type == NDB_FIELD)? a
	: (b->type == NDB_FIELD)? b
	: NULL;
      if (!value || !field) break;
      DBUG_PRINT("info", ("Generating LE filter"));
      if (a == field)
	filter->le(field->getFieldNo(),
		   (Uint32) value->getIntValue());
      else
	filter->ge(field->getFieldNo(),
		   (Uint32) value->getIntValue());
      DBUG_RETURN(cond->next->next->next);
    }
    case(Item_func::GE_FUNC): {
      if (!cond->next->next)
	break;
      Ndb_item *b= cond->next->next->ndb_item;
      Ndb_item *value= 
	(a->type == NDB_VALUE)? a
	: (b->type == NDB_VALUE)? b
	: NULL;
      Ndb_item *field= 
	(a->type == NDB_FIELD)? a
	: (b->type == NDB_FIELD)? b
	: NULL;
      if (!value || !field) break;
      DBUG_PRINT("info", ("Generating GE filter"));
      if (a == field)
	filter->ge(field->getFieldNo(),
		   (Uint32) value->getIntValue());
      else
	filter->le(field->getFieldNo(),
		   (Uint32) value->getIntValue());
      DBUG_RETURN(cond->next->next->next);
    }
    case(Item_func::GT_FUNC): {
      if (!cond->next->next)
	break;
      Ndb_item *b= cond->next->next->ndb_item;
      Ndb_item *value= 
	(a->type == NDB_VALUE)? a
	: (b->type == NDB_VALUE)? b
	: NULL;
      Ndb_item *field= 
	(a->type == NDB_FIELD)? a
	: (b->type == NDB_FIELD)? b
	: NULL;
      if (!value || !field) break;
      DBUG_PRINT("info", ("Generating GT filter"));
      if (a == field)
	filter->gt(field->getFieldNo(),
		   (Uint32) value->getIntValue());
      else
	filter->lt(field->getFieldNo(),
		   (Uint32) value->getIntValue());
      DBUG_RETURN(cond->next->next->next);
    }
    case(Item_func::LIKE_FUNC): {
      if (!cond->next->next)
	break;
      Ndb_item *b= cond->next->next->ndb_item;
      Ndb_item *value= 
	(a->type == NDB_VALUE)? a
	: (b->type == NDB_VALUE)? b
	: NULL;
      Ndb_item *field= 
	(a->type == NDB_FIELD)? a
	: (b->type == NDB_FIELD)? b
	: NULL;
      if (!value || !field) break;
      if (value->qualification.value_type != Item::STRING_ITEM) break;
      String *str= value->getStringValue();
      DBUG_PRINT("info", ("Generating LIKE filter: like(%d,%s,%d)", field->getFieldNo(), str->ptr(), str->length()));
      filter->like(field->getFieldNo(),
		   str->ptr(), str->length(), TRUE);
      DBUG_RETURN(cond->next->next->next);
    }
    case(Item_func::NOTLIKE_FUNC): {
      if (!cond->next->next)
	break;
      Ndb_item *b= cond->next->next->ndb_item;
      Ndb_item *value= 
	(a->type == NDB_VALUE)? a
	: (b->type == NDB_VALUE)? b
	: NULL;
      Ndb_item *field= 
	(a->type == NDB_FIELD)? a
	: (b->type == NDB_FIELD)? b
	: NULL;
      if (!value || !field) break;
      if (value->qualification.value_type != Item::STRING_ITEM) break;
      String *str= value->getStringValue();
      DBUG_PRINT("info", ("Generating NOTLIKE filter: notlike(%d,%s,%d)", field->getFieldNo(), str->ptr(), str->length()));
      filter->notlike(field->getFieldNo(),
		      str->ptr(), str->length());
      DBUG_RETURN(cond->next->next->next);
    }
    case(Item_func::ISNULL_FUNC):
      if (a->type == NDB_FIELD) {
	DBUG_PRINT("info", ("Generating ISNULL filter"));
	filter->isnull(a->getFieldNo());
      }
      DBUG_RETURN(cond->next->next);
    case(Item_func::ISNOTNULL_FUNC): {
      if (a->type == NDB_FIELD) {
	DBUG_PRINT("info", ("Generating ISNOTNULL filter"));
	filter->isnotnull(a->getFieldNo());
      }
      DBUG_RETURN(cond->next->next);
    }
    default:
      break;
    }
    break;
  }
  default:
    break;
  }
  DBUG_PRINT("info", ("Found illegal condition"));
  DBUG_RETURN(NULL);
}

Ndb_cond *
ha_ndbcluster::build_scan_filter_group(Ndb_cond *cond, NdbScanFilter *filter)
{
  DBUG_ENTER("build_scan_filter_group");
  switch(cond->ndb_item->type) {
  case(NDB_FUNCTION):
    switch(cond->ndb_item->qualification.function_type) {
    case(Item_func::COND_AND_FUNC): {
      DBUG_PRINT("info", ("Generating AND group"));
      filter->begin(NdbScanFilter::AND);
      cond= cond->next;
      cond= build_scan_filter_group(cond, filter);
      cond= build_scan_filter_group(cond, filter);
      filter->end();
      break;
    }
    case(Item_func::COND_OR_FUNC): {
      DBUG_PRINT("info", ("Generating OR group"));
      filter->begin(NdbScanFilter::OR);
      cond= cond->next;
      cond= build_scan_filter_group(cond, filter);
      cond= build_scan_filter_group(cond, filter);
      filter->end();
      break;
    }
    default:
      cond= build_scan_filter_predicate(cond, filter);     
    }
    break;
  default: {
    DBUG_PRINT("info", ("Illegal scan filter"));
  }
  }
  
  DBUG_RETURN(cond);
}

void 
ha_ndbcluster::build_scan_filter(Ndb_cond *cond, NdbScanFilter *filter)
{
  bool simple_cond= TRUE;
  DBUG_ENTER("build_scan_filter");  

  switch(cond->ndb_item->type) {
  case(Item_func::COND_AND_FUNC):
    simple_cond= FALSE;
    break;
  case(Item_func::COND_OR_FUNC):
    simple_cond= FALSE;
    break;
  default:
    break;
  }
  if (simple_cond) filter->begin();
  build_scan_filter_group(cond, filter);
  if (simple_cond) filter->end();

  DBUG_VOID_RETURN;
}

void
ha_ndbcluster::generate_scan_filter(Ndb_cond_stack *ndb_cond_stack,
				    NdbScanOperation *op)
{
  DBUG_ENTER("generate_scan_filter");
  if (ndb_cond_stack)
  {
    NdbScanFilter filter(op);
    bool multiple_cond= FALSE;
    // Wrap an AND group around multiple conditions
    if (ndb_cond_stack->next) {
      multiple_cond= TRUE;
      filter.begin();
    }
    for (Ndb_cond_stack *stack= ndb_cond_stack; 
	 (stack); 
	 stack= stack->next)
      {
	build_scan_filter(stack->ndb_cond, &filter);
      }
    if (multiple_cond) filter.end();
  }
  else
  {  
    DBUG_PRINT("info", ("Empty stack"));
  }

  DBUG_VOID_RETURN;
}

unknown's avatar
unknown committed
5928
#endif /* HAVE_NDBCLUSTER_DB */