/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include "NdbImpl.hpp"
#include <NdbReceiver.hpp>
#include "NdbDictionaryImpl.hpp"
#include <NdbRecAttr.hpp>
#include <AttributeHeader.hpp>
#include <NdbConnection.hpp>

/**
 * Construct a receiver bound to aNdb.
 *
 * The object starts with no id in the object id map (m_id is InvalidId),
 * type NDB_UNINITIALIZED, no owner, a zero magic number and an empty
 * NdbRecAttr list.  init() must be called before the receiver is used.
 */
NdbReceiver::NdbReceiver(Ndb *aNdb) :
  theMagicNumber(0),
  m_ndb(aNdb),
  m_id(NdbObjectIdMap::InvalidId),
  m_type(NDB_UNINITIALIZED),
  m_owner(0)
{
  // Empty attribute list: neither a head nor a current element.
  theFirstRecAttr = 0;
  theCurrentRecAttr = 0;
}
 
/**
 * (Re)initialise the receiver for use.
 *
 * @param type    receiver type stored in m_type
 * @param owner   opaque owner pointer stored in m_owner
 * @param keyInfo when true m_key_info is set to 1, otherwise to 0
 *
 * Sets the magic number to 0x11223344, registers this object in the
 * Ndb object id map if it has no id yet (and m_ndb is non-null), and
 * resets the NdbRecAttr list pointers and the defined row count.
 */
void
NdbReceiver::init(ReceiverType type, void* owner, bool keyInfo)
{
  theMagicNumber = 0x11223344;
  m_type = type;
  m_owner = owner;

  // Only allocate an id once, and only when there is an Ndb to ask.
  if (m_id == NdbObjectIdMap::InvalidId && m_ndb) {
    m_id = m_ndb->theNdbObjectIdMap->map(this);
  }

  theCurrentRecAttr = NULL;
  theFirstRecAttr = NULL;
  m_defined_rows = 0;
  if (keyInfo) {
    m_key_info = 1;
  } else {
    m_key_info = 0;
  }
}

void
NdbReceiver::release(){
  NdbRecAttr* tRecAttr = theFirstRecAttr;
  while (tRecAttr != NULL)
  {
    NdbRecAttr* tSaveRecAttr = tRecAttr;
    tRecAttr = tRecAttr->next();
    m_ndb->releaseRecAttr(tSaveRecAttr);
  }
  theFirstRecAttr = NULL;
  theCurrentRecAttr = NULL;
}
  
/**
 * Remove this receiver from the Ndb object id map, if it was ever
 * registered there.
 *
 * NOTE(review): the m_rows array allocated in do_get_value() is not
 * freed here - confirm it is released elsewhere.
 */
NdbReceiver::~NdbReceiver()
{
  if (m_id == NdbObjectIdMap::InvalidId) {
    return; // never mapped, nothing to undo
  }
  m_ndb->theNdbObjectIdMap->unmap(m_id, this);
}

/**
 * Obtain an NdbRecAttr from the Ndb object, set it up for the given
 * column and append it to the end of this receiver's attribute list.
 *
 * @param tAttrInfo    column passed on to NdbRecAttr::setup()
 * @param user_dst_ptr destination pointer passed on to NdbRecAttr::setup()
 * @return the appended NdbRecAttr, or 0 if none could be obtained or
 *         setup() returned non-zero (the NdbRecAttr is then released
 *         back to the Ndb object)
 */
NdbRecAttr *
NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr){
  NdbRecAttr* const recAttr = m_ndb->getRecAttr();
  if (recAttr == NULL) {
    return 0;
  }

  if (recAttr->setup(tAttrInfo, user_dst_ptr)) {
    // Non-zero from setup() means failure: give the object back.
    m_ndb->releaseRecAttr(recAttr);
    return 0;
  }

  // Link the new element in as the tail of the list.
  if (theFirstRecAttr != NULL) {
    theCurrentRecAttr->next(recAttr);
  } else {
    theFirstRecAttr = recAttr;
  }
  theCurrentRecAttr = recAttr;
  recAttr->next(NULL);
  return recAttr;
}

/**
 * Attribute id assigned to the synthetic key column that do_get_value()
 * puts first on each row when key_size is non-zero; execKEYINFO20()
 * asserts that the first NdbRecAttr of the row carries this id.
 */
#define KEY_ATTR_ID (~0)

/**
 * Define `rows` result rows in this receiver, each one mirroring the
 * attribute list of `org`.
 *
 * @param org      receiver whose NdbRecAttr list is used as the template
 * @param rows     number of rows to define
 * @param key_size when non-zero, each row is prefixed with a key
 *                 NdbRecAttr (id KEY_ATTR_ID) of key_size + 1 4-byte
 *                 elements
 *
 * m_rows[i] is set to the first NdbRecAttr of row i and m_rows[rows] is
 * a 0 terminator.  Any getValue() failure calls abort().  Finishes by
 * calling prepareSend().
 *
 * NOTE(review): a previously allocated m_rows array is not freed before
 * being replaced here - confirm whether repeated calls leak.
 * NOTE(review): &key refers to a local object; if NdbRecAttr::setup()
 * retains that pointer it dangles once this function returns - confirm.
 */
void
NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){
  m_defined_rows = rows;
  m_rows = new NdbRecAttr*[rows + 1];
  m_rows[rows] = 0;

  const bool withKey = (key_size != 0);
  NdbColumnImpl key;
  if (withKey) {
    key.m_attrId = KEY_ATTR_ID;
    key.m_arraySize = key_size+1;
    key.m_attrSize = 4;
    key.m_nullable = true; // So that receive works w.r.t KEYINFO20
  }

  for (Uint32 row = 0; row < rows; row++) {
    // Tail of the list before this row is appended (0 for the very first).
    NdbRecAttr* const tail = theCurrentRecAttr;

    // Put the key recAttr first on each row
    if (withKey && getValue(&key, (char*)0) == 0) {
      abort();
      return ; // -1
    }

    // One recAttr per attribute requested on the template receiver
    for (NdbRecAttr* src = org->theFirstRecAttr; src != 0; src = src->next()) {
      if (getValue(&NdbColumnImpl::getImpl(*src->m_column), (char*)0) == 0) {
        abort();
        return ; // -1
      }
    }

    // Remember where this row starts
    m_rows[row] = (tail != 0) ? tail->next() : theFirstRecAttr;
  }

  prepareSend();
}

/**
 * Copy the row at m_current_row into the NdbRecAttr list of dstRec and
 * advance m_current_row.
 *
 * When m_key_info is non-zero the first NdbRecAttr of the row is skipped.
 * The source row is walked in step with the destination list; an
 * attribute for which isNULL() is non-zero is delivered with length 0.
 *
 * @param dstRec receiver whose attribute list receives the data
 */
void
NdbReceiver::copyout(NdbReceiver & dstRec){
  NdbRecAttr* from = m_rows[m_current_row++];

  const Uint32 hasKey = m_key_info;
  if (hasKey != 0) {
    from = from->next(); // step past the leading key recAttr
  }

  for (NdbRecAttr* to = dstRec.theFirstRecAttr; to;
       from = from->next(), to = to->next()) {
    // Attribute size in 32-bit words, rounded up
    Uint32 words = ((from->theAttrSize * from->theArraySize)+3)/4;
    to->receive_data((Uint32*)from->aRef(), from->isNULL() ? 0 : words);
  }
}

/**
 * Consume the attribute data of one TRANSID_AI signal.
 *
 * The data is a sequence of AttributeHeader words, each followed by
 * getDataSize() words of data.  Starting at theCurrentRecAttr, list
 * entries whose id does not match the incoming header are set to NULL
 * and skipped; the matching entry receives the data.
 *
 * @param aDataPtr first word of attribute data
 * @param aLength  number of words available at aDataPtr
 * @return 1 when the accumulated received length equals
 *         m_expected_result_length, 0 when it does not; on an
 *         inconsistency the function logs and calls abort() (the
 *         following return -1 is only reached if abort() returns)
 *
 * NOTE(review): tAttrSize is not checked against the words remaining in
 * the signal - confirm that the sender guarantees this.
 */
int
NdbReceiver::execTRANSID_AI(const Uint32* aDataPtr, Uint32 aLength)
{
  bool ok = true;
  NdbRecAttr* currRecAttr = theCurrentRecAttr;

  // The loop increment accounts for the header word; the payload words
  // are added explicitly below.
  for (Uint32 used = 0; used < aLength ; used++){
    AttributeHeader ah(* aDataPtr++);
    const Uint32 tAttrId = ah.getAttributeId();
    const Uint32 tAttrSize = ah.getDataSize();

    /**
     * Set all results to NULL if not found...
     */
    while(currRecAttr && currRecAttr->attrId() != tAttrId){
      ok &= currRecAttr->setNULL();
      currRecAttr = currRecAttr->next();
    }

    const bool received =
      ok && currRecAttr && currRecAttr->receive_data(aDataPtr, tAttrSize);
    if(!received){
      ndbout_c("%p: ok: %d tAttrId: %d currRecAttr: %p", 
               this,ok, tAttrId, currRecAttr);
      abort();
      return -1;
    }

    used += tAttrSize;
    aDataPtr += tAttrSize;
    currRecAttr = currRecAttr->next();
  }

  // Remember where to continue when the next signal arrives.
  theCurrentRecAttr = currRecAttr;

  /**
   * Update m_received_result_length
   */
  Uint32 tmp = m_received_result_length + aLength;
  m_received_result_length = tmp;

  return (tmp == m_expected_result_length ? 1 : 0);
}

/**
 * Consume the key data of one KEYINFO20 signal for the row at
 * m_current_row and advance m_current_row.
 *
 * The first NdbRecAttr of the row is expected (asserted) to carry id
 * KEY_ATTR_ID.  It receives aLength + 1 words, and the last of those
 * words is then overwritten with `info`.
 *
 * @param info     word stored after the key data
 * @param aDataPtr first word of key data
 * @param aLength  number of key words
 * @return 1 when the accumulated received length equals
 *         m_expected_result_length, otherwise 0
 *
 * NOTE(review): receive_data() is asked for one word more than aLength;
 * presumably this matches the key column size set in do_get_value(),
 * but it may read one word past the supplied data - confirm.
 */
int
NdbReceiver::execKEYINFO20(Uint32 info, const Uint32* aDataPtr, Uint32 aLength)
{
  NdbRecAttr* const keyRecAttr = m_rows[m_current_row];
  m_current_row++;
  assert(keyRecAttr->attrId() == KEY_ATTR_ID);
  keyRecAttr->receive_data(aDataPtr, aLength + 1);

  /**
   * Save scanInfo in the end of keyinfo
   */
  Uint32* const stored = (Uint32*)keyRecAttr->aRef();
  stored[aLength] = info;

  const Uint32 total = m_received_result_length + aLength;
  m_received_result_length = total;

  if (total == m_expected_result_length) {
    return 1;
  }
  return 0;
}