/* Copyright (C) 2003 MySQL AB

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; version 2 of the License.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#ifdef USE_PRAGMA_IMPLEMENTATION
#pragma implementation        // gcc: Class implementation
#endif

#include "mysql_priv.h"
#include <myisam.h>

#include "ha_archive.h"
#include <my_dir.h>

#include <mysql/plugin.h>

/*
  First, if you want to understand storage engines you should look at 
  ha_example.cc and ha_example.h. 

  This example was written as a test case for a customer who needed
  a storage engine without indexes that could compress data very well.
  So, welcome to a completely compressed storage engine. This storage
  engine only does inserts. No replaces, deletes, or updates. All reads are 
  complete table scans. Compression is done through a combination of packing
  and making use of the zlib library.
  
  We keep a file pointer open for each instance of ha_archive for each read
  but for writes we keep one open file handle just for that. We flush it
  only if we have a read occur. azio handles compressing lots of records
  at once much better than doing lots of little records between writes.
  It is possible to not lock on writes but this would then mean we couldn't
  handle bulk inserts as well (that is if someone was trying to read at
  the same time since we would want to flush).

  A "meta" file is kept alongside the data file. This file serves two purposes.
  The first purpose is to track the number of rows in the table. The second 
  purpose is to determine if the table was closed properly or not. When the 
  meta file is first opened it is marked as dirty. It is opened when the table 
  itself is opened for writing. When the table is closed the new count for rows 
  is written to the meta file and the file is marked as clean. If the meta file 
  is opened and it is marked as dirty, it is assumed that a crash occurred. At 
  this point an error occurs and the user is told to rebuild the file.
  A rebuild scans the rows and rewrites the meta file. If corruption is found
  in the data file then the meta file is not repaired.

  At some point a recovery method for such a drastic case needs to be devised.

  Locks are row level, and you will get a consistent read. 

  For performance as far as table scans go it is quite fast. I don't have
  good numbers but locally it has outperformed both InnoDB and MyISAM. For
  InnoDB the question will be if the table can fit into the buffer
  pool. For MyISAM it's a question of how much the file system caches the
  MyISAM file. With enough free memory MyISAM is faster. It's only when the OS
  doesn't have enough memory to cache the entire table that archive turns out 
  to be any faster. 

  Size comparisons between MyISAM (packed) and Archive.

  Table with 76695844 identical rows:
  29680807 a_archive.ARZ
  920350317 a.MYD


  Table with 8991478 rows (all of Slashdot's comments):
  1922964506 comment_archive.ARZ
  2944970297 comment_text.MYD


  TODO:
   Allow users to set compression level.
   Allow adjustable block size.
   Implement versioning, should be easy.
   Allow for errors, find a way to mark bad rows.
   Add optional feature so that rows can be flushed at interval (which will cause less
     compression but may speed up ordered searches).
   Checkpoint the meta file to allow for faster rebuilds.
   Option to allow for dirty reads, this would lower the sync calls, which would make
     inserts a lot faster, but would mean highly arbitrary reads.

    -Brian
*/
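
/*
  Illustrative usage (not from the original source; the table and column
  names here are made up):

    CREATE TABLE t1 (a INT NOT NULL AUTO_INCREMENT PRIMARY KEY, b BLOB)
      ENGINE=ARCHIVE;
    INSERT INTO t1 (b) VALUES ('some row data');
    SELECT * FROM t1;      -- always a full table scan
    OPTIMIZE TABLE t1;     -- recompresses the data file

  UPDATE and DELETE are not supported; the only way to remove data is to
  drop the table.
*/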

/* Variables for archive share methods */
pthread_mutex_t archive_mutex;
static HASH archive_open_tables;

/* The file extension */
#define ARZ ".ARZ"               // The data file
#define ARN ".ARN"               // Files used during an optimize call
#define ARM ".ARM"               // Meta file (deprecated)

/*
  uchar + uchar
*/
#define DATA_BUFFER_SIZE 2       // Size of the data used in the data file
#define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption

/* Static declarations for handlerton */
static handler *archive_create_handler(handlerton *hton, 
                                       TABLE_SHARE *table, 
                                       MEM_ROOT *mem_root);
int archive_discover(handlerton *hton, THD* thd, const char *db, 
                     const char *name,
                     uchar **frmblob, 
                     size_t *frmlen);

/*
  Number of rows that will force a bulk insert.
*/
#define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2

/*
  Size of header used for row
*/
#define ARCHIVE_ROW_HEADER_SIZE 4
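
/*
  On-disk row layout (see pack_row()/unpack_row() below):
    [4-byte packed length][null bytes][packed field values]
  The length stored in the header counts only the bytes that follow the
  header.
*/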

static handler *archive_create_handler(handlerton *hton,
                                       TABLE_SHARE *table, 
                                       MEM_ROOT *mem_root)
{
  return new (mem_root) ha_archive(hton, table);
}

/*
  Used for hash table that tracks open tables.
*/
static uchar* archive_get_key(ARCHIVE_SHARE *share, size_t *length,
                             my_bool not_used __attribute__((unused)))
{
  *length=share->table_name_length;
  return (uchar*) share->table_name;
}


/*
  Initialize the archive handler.

  SYNOPSIS
    archive_db_init()
    void *

  RETURN
    FALSE       OK
    TRUE        Error
*/

int archive_db_init(void *p)
{
  DBUG_ENTER("archive_db_init");
  handlerton *archive_hton;

  archive_hton= (handlerton *)p;
  archive_hton->state= SHOW_OPTION_YES;
  archive_hton->db_type= DB_TYPE_ARCHIVE_DB;
  archive_hton->create= archive_create_handler;
  archive_hton->flags= HTON_NO_FLAGS;
  archive_hton->discover= archive_discover;

  if (pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST))
    goto error;
  if (hash_init(&archive_open_tables, system_charset_info, 32, 0, 0,
                (hash_get_key) archive_get_key, 0, 0))
  {
    VOID(pthread_mutex_destroy(&archive_mutex));
  }
  else
  {
    DBUG_RETURN(FALSE);
  }
error:
  DBUG_RETURN(TRUE);
}

/*
  Release the archive handler.

  SYNOPSIS
    archive_db_done()
    void

  RETURN
    FALSE       OK
*/

int archive_db_done(void *p)
{
  hash_free(&archive_open_tables);
  VOID(pthread_mutex_destroy(&archive_mutex));

  return 0;
}


ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg)
  :handler(hton, table_arg), delayed_insert(0), bulk_insert(0)
{
  /* Set our original buffer from pre-allocated memory */
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);

  /* The size of the offset value we will use for position() */
  ref_length= sizeof(my_off_t);
  archive_reader_open= FALSE;
}

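/*
  Discovery support: a copy of the .frm is stored inside the ARZ file at
  create time, so we can hand it back to the server if the .frm itself has
  gone missing.
*/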
int archive_discover(handlerton *hton, THD* thd, const char *db, 
                     const char *name,
                     uchar **frmblob, 
                     size_t *frmlen)
{
  DBUG_ENTER("archive_discover");
  DBUG_PRINT("archive_discover", ("db: %s, name: %s", db, name)); 
  azio_stream frm_stream;
  char az_file[FN_REFLEN];
  char *frm_ptr;
  MY_STAT file_stat; 

  fn_format(az_file, name, db, ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);

  if (!(my_stat(az_file, &file_stat, MYF(0))))
    goto err;

  if (!(azopen(&frm_stream, az_file, O_RDONLY|O_BINARY)))
  {
    if (errno == EROFS || errno == EACCES)
      DBUG_RETURN(my_errno= errno);
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
  }

  if (frm_stream.frm_length == 0)
    goto err;

  frm_ptr= (char *)my_malloc(sizeof(char) * frm_stream.frm_length, MYF(0));
  azread_frm(&frm_stream, frm_ptr);
  azclose(&frm_stream);

  *frmlen= frm_stream.frm_length;
  *frmblob= (uchar*) frm_ptr;

  DBUG_RETURN(0);
err:
  my_errno= 0;
  DBUG_RETURN(1);
}

/*
  This method reads the header of a data file and returns whether or not it
  was successful.
*/
int ha_archive::read_data_header(azio_stream *file_to_read)
{
  int error;
  unsigned long ret;
  uchar data_buffer[DATA_BUFFER_SIZE];
  DBUG_ENTER("ha_archive::read_data_header");

  if (azrewind(file_to_read) == -1)
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  if (file_to_read->version >= 3)
    DBUG_RETURN(0);
  /* Everything below this is just legacy for version 2 and earlier */

  DBUG_PRINT("ha_archive", ("Reading legacy data header"));

  ret= azread(file_to_read, data_buffer, DATA_BUFFER_SIZE, &error);

  if (ret != DATA_BUFFER_SIZE)
  {
    DBUG_PRINT("ha_archive", ("Reading, expected %d got %lu", 
                              DATA_BUFFER_SIZE, ret));
    DBUG_RETURN(1);
  }

  if (error)
  {
    DBUG_PRINT("ha_archive", ("Compression error (%d)", error));
    DBUG_RETURN(1);
  }
  
  DBUG_PRINT("ha_archive", ("Check %u", data_buffer[0]));
  DBUG_PRINT("ha_archive", ("Version %u", data_buffer[1]));

  if ((data_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) &&  
      (data_buffer[1] != (uchar)ARCHIVE_VERSION))
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  DBUG_RETURN(0);
}


/*
  We create the shared memory space that we will use for the open table. 
  No matter what, we try to get or create a share. This is done so that a
  repair table operation can occur. 

  See ha_example.cc for a longer description.
*/
ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, int *rc)
{
  uint length;
  DBUG_ENTER("ha_archive::get_share");

  pthread_mutex_lock(&archive_mutex);
  length=(uint) strlen(table_name);

  if (!(share=(ARCHIVE_SHARE*) hash_search(&archive_open_tables,
                                           (uchar*) table_name,
                                           length)))
  {
    char *tmp_name;
    azio_stream archive_tmp;

    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
                          &share, sizeof(*share),
                          &tmp_name, length+1,
                          NullS)) 
    {
      pthread_mutex_unlock(&archive_mutex);
      *rc= HA_ERR_OUT_OF_MEM;
      DBUG_RETURN(NULL);
    }

    share->use_count= 0;
    share->table_name_length= length;
    share->table_name= tmp_name;
    share->crashed= FALSE;
    share->archive_write_open= FALSE;
    fn_format(share->data_file_name, table_name, "",
              ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
    strmov(share->table_name, table_name);
    DBUG_PRINT("ha_archive", ("Data File %s", 
                        share->data_file_name));
    /*
      We will use this lock for rows.
    */
    VOID(pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST));
    
    /*
      We read the meta file, but do not mark it dirty. Since we are not
      doing a write we won't mark it dirty (and we won't open it for
      anything but reading... open it for write and we will generate null
      compression writes).
    */
    if (!(azopen(&archive_tmp, share->data_file_name, O_RDONLY|O_BINARY)))
    {
      DBUG_RETURN(NULL);
    }
    stats.auto_increment_value= archive_tmp.auto_increment;
    share->rows_recorded= (ha_rows)archive_tmp.rows;
    share->crashed= archive_tmp.dirty;
    azclose(&archive_tmp);

    VOID(my_hash_insert(&archive_open_tables, (uchar*) share));
    thr_lock_init(&share->lock);
  }
  share->use_count++;
  DBUG_PRINT("ha_archive", ("archive table %.*s has %d open handles now", 
                      share->table_name_length, share->table_name,
                      share->use_count));
  if (share->crashed)
    *rc= HA_ERR_CRASHED_ON_USAGE;
  pthread_mutex_unlock(&archive_mutex);

  DBUG_RETURN(share);
}


/* 
  Free the share.
  See ha_example.cc for a description.
*/
int ha_archive::free_share()
{
  int rc= 0;
  DBUG_ENTER("ha_archive::free_share");
  DBUG_PRINT("ha_archive",
             ("archive table %.*s has %d open handles on entrance", 
              share->table_name_length, share->table_name,
              share->use_count));

  pthread_mutex_lock(&archive_mutex);
  if (!--share->use_count)
  {
    hash_delete(&archive_open_tables, (uchar*) share);
    thr_lock_delete(&share->lock);
    VOID(pthread_mutex_destroy(&share->mutex));
    /* 
      We need to make sure we don't reset the crashed state.
      If we open a crashed file, we need to close it as crashed unless
      it has been repaired.
      Since we will close the data down after this, we go on and count
      the flush on close.
    */
    if (share->archive_write_open)
    {
      if (azclose(&(share->archive_write)))
        rc= 1;
    }
    my_free((uchar*) share, MYF(0));
  }
  pthread_mutex_unlock(&archive_mutex);

  DBUG_RETURN(rc);
}

int ha_archive::init_archive_writer()
{
  DBUG_ENTER("ha_archive::init_archive_writer");
  /* 
    It is expensive to open and close the data files, and since you can't have
    a gzip file that can be both read and written, we keep a writer open
    that is shared among all open tables.
  */
  if (!(azopen(&(share->archive_write), share->data_file_name, 
               O_RDWR|O_BINARY)))
  {
    DBUG_PRINT("ha_archive", ("Could not open archive write file"));
    share->crashed= TRUE;
    DBUG_RETURN(1);
  }
  share->archive_write_open= TRUE;

  DBUG_RETURN(0);
}


int ha_archive::init_archive_reader()
{
  DBUG_ENTER("ha_archive::init_archive_reader");
  /* 
    It is expensive to open and close the data files, and since you can't have
    a gzip file that can be both read and written, each handler keeps its own
    reader open (the writer is shared among all open tables).
  */
  if (!archive_reader_open)
  {
    if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY)))
    {
      DBUG_PRINT("ha_archive", ("Could not open archive read file"));
      share->crashed= TRUE;
      DBUG_RETURN(1);
    }
    archive_reader_open= TRUE;
  }

  DBUG_RETURN(0);
}


/*
  We just implement one additional file extension.
*/
static const char *ha_archive_exts[] = {
  ARZ,
  NullS
};

const char **ha_archive::bas_ext() const
{
  return ha_archive_exts;
}


/* 
  When opening a file we:
  Create/get our shared structure.
  Init our lock.
  We open the file we will read from.
*/
int ha_archive::open(const char *name, int mode, uint open_options)
{
  int rc= 0;
  DBUG_ENTER("ha_archive::open");

  DBUG_PRINT("ha_archive", ("archive table was opened for crash: %s", 
                      (open_options & HA_OPEN_FOR_REPAIR) ? "yes" : "no"));
  share= get_share(name, &rc);

  if (rc == HA_ERR_CRASHED_ON_USAGE && !(open_options & HA_OPEN_FOR_REPAIR))
  {
    /* purecov: begin inspected */
    free_share();
    DBUG_RETURN(rc);
    /* purecov: end */    
  }
  else if (rc == HA_ERR_OUT_OF_MEM)
  {
    DBUG_RETURN(rc);
  }

  DBUG_ASSERT(share);

  record_buffer= create_record_buffer(table->s->reclength + 
                                      ARCHIVE_ROW_HEADER_SIZE);

  if (!record_buffer)
  {
    free_share();
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
  }

  thr_lock_data_init(&share->lock, &lock, NULL);

  DBUG_PRINT("ha_archive", ("archive table was crashed %s", 
                      rc == HA_ERR_CRASHED_ON_USAGE ? "yes" : "no"));
  if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR)
  {
    DBUG_RETURN(0);
  }
  else
    DBUG_RETURN(rc);
}


/*
  Closes the file.

  SYNOPSIS
    close();
  
  IMPLEMENTATION:

  We first close this storage engine's file handle to the archive and
  then remove our reference count to the table (and possibly free it
  as well).

  RETURN
    0  ok
    1  Error
*/

int ha_archive::close(void)
{
  int rc= 0;
  DBUG_ENTER("ha_archive::close");

  destroy_record_buffer(record_buffer);

  /* First close stream */
  if (archive_reader_open)
  {
    if (azclose(&archive))
      rc= 1;
  }
  /* then also close share */
  rc|= free_share();

  DBUG_RETURN(rc);
}


/*
  We create our data file here. The format is pretty simple. 
  You can read about the format of the data file above.
  Unlike other storage engines we do not "pack" our data. Since we 
  are about to do a general compression, packing would just be a waste of 
  CPU time. If the table has blobs they are written after the row in the order 
  of creation.
*/

int ha_archive::create(const char *name, TABLE *table_arg,
                       HA_CREATE_INFO *create_info)
{
  char name_buff[FN_REFLEN];
  char linkname[FN_REFLEN];
  int error;
  azio_stream create_stream;            /* Archive file we are working with */
  File frm_file;                   /* File handler for readers */
  MY_STAT file_stat;  // Stat information for the data file
  uchar *frm_ptr;

  DBUG_ENTER("ha_archive::create");

  stats.auto_increment_value= (create_info->auto_increment_value ?
                               create_info->auto_increment_value -1 :
                               (ulonglong) 0);

  for (uint key= 0; key < table_arg->s->keys; key++)
  {
    KEY *pos= table_arg->key_info+key;
    KEY_PART_INFO *key_part=     pos->key_part;
    KEY_PART_INFO *key_part_end= key_part + pos->key_parts;

    for (; key_part != key_part_end; key_part++)
    {
      Field *field= key_part->field;

      if (!(field->flags & AUTO_INCREMENT_FLAG))
      {
        error= -1;
        DBUG_PRINT("ha_archive", ("Index error in creating archive table"));
        goto error;
      }
    }
  }

  /* 
    We reuse name_buff since it is available.
  */
  if (create_info->data_file_name && create_info->data_file_name[0] != '#')
  {
    DBUG_PRINT("ha_archive", ("archive will create stream file %s", 
                        create_info->data_file_name));
                        
    fn_format(name_buff, create_info->data_file_name, "", ARZ,
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
    fn_format(linkname, name, "", ARZ,
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
  }
  else
  {
    fn_format(name_buff, name, "", ARZ,
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
    linkname[0]= 0;
  }

  /*
    There is a chance that the file was "discovered". In this case
    just use whatever file is there.
  */
  if (!(my_stat(name_buff, &file_stat, MYF(0))))
  {
    my_errno= 0;
    if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR|O_BINARY)))
    {
      error= errno;
      goto error2;
    }

    if (linkname[0])
      my_symlink(name_buff, linkname, MYF(0));
    fn_format(name_buff, name, "", ".frm",
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);

    /*
      Here is where we open up the frm and pass it to archive to store 
    */
    if ((frm_file= my_open(name_buff, O_RDONLY, MYF(0))) > 0)
    {
      if (!my_fstat(frm_file, &file_stat, MYF(MY_WME)))
      {
        frm_ptr= (uchar *)my_malloc(sizeof(uchar) * file_stat.st_size, MYF(0));
        if (frm_ptr)
        {
          my_read(frm_file, frm_ptr, file_stat.st_size, MYF(0));
          azwrite_frm(&create_stream, (char *)frm_ptr, file_stat.st_size);
          my_free((uchar*)frm_ptr, MYF(0));
        }
      }
      my_close(frm_file, MYF(0));
    }

    if (create_info->comment.str)
      azwrite_comment(&create_stream, create_info->comment.str, 
                      create_info->comment.length);

    /* 
      Yes you need to do this, because the starting value 
      for the autoincrement may not be zero.
    */
    create_stream.auto_increment= stats.auto_increment_value;
    if (azclose(&create_stream))
    {
      error= errno;
      goto error2;
    }
  }
  else
    my_errno= 0;

  DBUG_PRINT("ha_archive", ("Creating File %s", name_buff));
  DBUG_PRINT("ha_archive", ("Creating Link %s", linkname));


  DBUG_RETURN(0);

error2:
  delete_table(name);
error:
  /* Return error number, if we got one */
  DBUG_RETURN(error ? error : -1);
}

/*
  This is where the actual row is written out.
*/
int ha_archive::real_write_row(uchar *buf, azio_stream *writer)
{
  my_off_t written;
  unsigned int r_pack_length;
  DBUG_ENTER("ha_archive::real_write_row");

  /* We pack the row for writing */
  r_pack_length= pack_row(buf);

  written= azwrite(writer, record_buffer->buffer, r_pack_length);
  if (written != r_pack_length)
  {
    DBUG_PRINT("ha_archive", ("Wrote %d bytes expected %d", 
                                              (uint32) written, 
                                              (uint32)r_pack_length));
    DBUG_RETURN(-1);
  }

  if (!delayed_insert || !bulk_insert)
    share->dirty= TRUE;

  DBUG_RETURN(0);
}


/* 
  Calculate max length needed for row. This includes
  the bytes required for the length in the header.
*/

uint32 ha_archive::max_row_length(const uchar *buf)
{
  uint32 length= (uint32)(table->s->reclength + table->s->fields*2);
  length+= ARCHIVE_ROW_HEADER_SIZE;

  uint *ptr, *end;
  for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
       ptr != end ;
       ptr++)
  {
      length += 2 + ((Field_blob*)table->field[*ptr])->get_length();
  }

  return length;
}


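/*
  Pack a row into record_buffer: grow the buffer if needed, copy the null
  bytes, append each non-NULL field with Field::pack(), and finally store
  the packed length (everything after the header) in the first four bytes.
  Returns the total number of bytes used, including the header.
*/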
unsigned int ha_archive::pack_row(uchar *record)
{
  uchar *ptr;

  DBUG_ENTER("ha_archive::pack_row");


  if (fix_rec_buff(max_row_length(record)))
    DBUG_RETURN(HA_ERR_OUT_OF_MEM); /* purecov: inspected */

  /* Copy null bits */
  memcpy(record_buffer->buffer+ARCHIVE_ROW_HEADER_SIZE, 
         record, table->s->null_bytes);
  ptr= record_buffer->buffer + table->s->null_bytes + ARCHIVE_ROW_HEADER_SIZE;

  for (Field **field=table->field ; *field ; field++)
  {
    if (!((*field)->is_null()))
      ptr= (*field)->pack(ptr, record + (*field)->offset(record));
  }

  int4store(record_buffer->buffer, (int)(ptr - record_buffer->buffer -
                                         ARCHIVE_ROW_HEADER_SIZE)); 
  DBUG_PRINT("ha_archive",("Pack row length %u", (unsigned int)
                           (ptr - record_buffer->buffer - 
                             ARCHIVE_ROW_HEADER_SIZE)));

  DBUG_RETURN((unsigned int) (ptr - record_buffer->buffer));
}


/* 
  Look at ha_archive::open() for an explanation of the row format.
  Here we just write out the row.

  Wondering about start_bulk_insert()? Archive already optimizes for lots of
  writes, so the only gain from implementing start_bulk_insert() is that we
  can skip setting dirty to true for each row.
*/
int ha_archive::write_row(uchar *buf)
{
  int rc;
  uchar *read_buf= NULL;
  ulonglong temp_auto;
  uchar *record=  table->record[0];
  DBUG_ENTER("ha_archive::write_row");

  if (share->crashed)
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  if (!share->archive_write_open)
    if (init_archive_writer())
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  ha_statistic_increment(&SSV::ha_write_count);
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
    table->timestamp_field->set_time();
  pthread_mutex_lock(&share->mutex);

  if (table->next_number_field && record == table->record[0])
  {
    KEY *mkey= &table->s->key_info[0]; // We only support one key right now
    update_auto_increment();
    temp_auto= table->next_number_field->val_int();

    /*
      We don't support decrementing auto_increment. It makes the performance
      just cry.
    */
    if (temp_auto <= share->archive_write.auto_increment && 
        mkey->flags & HA_NOSAME)
    {
      rc= HA_ERR_FOUND_DUPP_KEY;
      goto error;
    }
#ifdef DEAD_CODE
    /*
      Bad news, this will cause a search for the unique value which is very 
      expensive since we will have to do a table scan which will lock up 
      all other writers during this period. This could perhaps be optimized 
      in the future.
    */
    {
      /* 
        First we create a buffer that we can use for reading rows, and can pass
        to get_row().
      */
      if (!(read_buf= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME))))
      {
        rc= HA_ERR_OUT_OF_MEM;
        goto error;
      }
       /* 
         All of the buffer must be written out or we won't see all of the
         data 
       */
      azflush(&(share->archive_write), Z_SYNC_FLUSH);
      /*
        Set the position of the local read thread to the beginning position.
      */
      if (read_data_header(&archive))
      {
        rc= HA_ERR_CRASHED_ON_USAGE;
        goto error;
      }

      Field *mfield= table->next_number_field;

      while (!(get_row(&archive, read_buf)))
      {
        if (!memcmp(read_buf + mfield->offset(record),
                    table->next_number_field->ptr,
                    mfield->max_display_length()))
        {
          rc= HA_ERR_FOUND_DUPP_KEY;
          goto error;
        }
      }
    }
#endif
    else
    {
      if (temp_auto > share->archive_write.auto_increment)
        stats.auto_increment_value= share->archive_write.auto_increment= 
          temp_auto;
    }
  }

  /*
    Notice that the global auto_increment has been increased.
    In case of a failed row write, we will never try to reuse the value.
  */
  share->rows_recorded++;
  rc= real_write_row(buf,  &(share->archive_write));
error:
  pthread_mutex_unlock(&share->mutex);
  if (read_buf)
    my_free((uchar*) read_buf, MYF(0));

  DBUG_RETURN(rc);
}


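/*
  Archive only tracks the last auto_increment value used, so we report that
  value plus one and reserve a single value; write_row() advances
  share->archive_write.auto_increment under share->mutex.
*/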
void ha_archive::get_auto_increment(ulonglong offset, ulonglong increment,
                                    ulonglong nb_desired_values,
                                    ulonglong *first_value,
                                    ulonglong *nb_reserved_values)
{
  *nb_reserved_values= 1;
  *first_value= share->archive_write.auto_increment + 1;
}

/* Initialized at each key walk (called multiple times unlike rnd_init()) */
int ha_archive::index_init(uint keynr, bool sorted)
{
  DBUG_ENTER("ha_archive::index_init");
  active_index= keynr;
  DBUG_RETURN(0);
}


/*
  No real indexes, so if we get a request for an index search (we tell
  the optimizer that we have unique indexes), we simply scan.
*/
int ha_archive::index_read(uchar *buf, const uchar *key,
                             uint key_len, enum ha_rkey_function find_flag)
{
  int rc;
  DBUG_ENTER("ha_archive::index_read");
  rc= index_read_idx(buf, active_index, key, key_len, find_flag);
  DBUG_RETURN(rc);
}


int ha_archive::index_read_idx(uchar *buf, uint index, const uchar *key,
                                 uint key_len, enum ha_rkey_function find_flag)
{
  int rc;
  bool found= 0;
  KEY *mkey= &table->s->key_info[index];
  current_k_offset= mkey->key_part->offset;
  current_key= key;
  current_key_len= key_len;


  DBUG_ENTER("ha_archive::index_read_idx");

  rc= rnd_init(TRUE);

  if (rc)
    goto error;

  while (!(get_row(&archive, buf)))
  {
    if (!memcmp(current_key, buf + current_k_offset, current_key_len))
    {
      found= 1;
      break;
    }
  }

  if (found)
    DBUG_RETURN(0);

error:
  DBUG_RETURN(rc ? rc : HA_ERR_END_OF_FILE);
}


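/*
  Continue the scan started by index_read_idx(): keep reading rows until one
  matches the remembered key value or we reach the end of the file.
*/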
int ha_archive::index_next(uchar * buf) 
{ 
  bool found= 0;

  DBUG_ENTER("ha_archive::index_next");

  while (!(get_row(&archive, buf)))
  {
    if (!memcmp(current_key, buf+current_k_offset, current_key_len))
    {
      found= 1;
      break;
    }
  }

  DBUG_RETURN(found ? 0 : HA_ERR_END_OF_FILE); 
}

/*
  All calls that need to scan the table start with this method. If we are told
  that it is a table scan we rewind the file to the beginning, otherwise
  we assume the position will be set.
*/

int ha_archive::rnd_init(bool scan)
{
  DBUG_ENTER("ha_archive::rnd_init");
  
  if (share->crashed)
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  init_archive_reader();

  /* We rewind the file so that we can read from the beginning if scan */
  if (scan)
  {
    DBUG_PRINT("info", ("archive will retrieve %llu rows", 
                        (unsigned long long) scan_rows));
    stats.records= 0;

    /* 
      If dirty, we lock, and then reset/flush the data.
      I found that just calling azflush() doesn't always work.
    */
    pthread_mutex_lock(&share->mutex);
    scan_rows= share->rows_recorded;
    if (share->dirty == TRUE)
    {
      if (share->dirty == TRUE)
      {
        DBUG_PRINT("ha_archive", ("archive flushing out rows for scan"));
        azflush(&(share->archive_write), Z_SYNC_FLUSH);
        share->dirty= FALSE;
      }
    }
    pthread_mutex_unlock(&share->mutex);

    if (read_data_header(&archive))
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
  }

  DBUG_RETURN(0);
}


/*
  This is the method that is used to read a row. It assumes that the row is 
  positioned where you want it.
*/
int ha_archive::get_row(azio_stream *file_to_read, uchar *buf)
{
  int rc;
  DBUG_ENTER("ha_archive::get_row");
  DBUG_PRINT("ha_archive", ("Picking version for get_row() %d -> %d", 
                            (uchar)file_to_read->version, 
                            ARCHIVE_VERSION));
  if (file_to_read->version == ARCHIVE_VERSION)
    rc= get_row_version3(file_to_read, buf);
  else
    rc= get_row_version2(file_to_read, buf);

  DBUG_PRINT("ha_archive", ("Return %d\n", rc));

  DBUG_RETURN(rc);
}

/* Reallocate buffer if needed */
bool ha_archive::fix_rec_buff(unsigned int length)
{
  DBUG_ENTER("ha_archive::fix_rec_buff");
  DBUG_PRINT("ha_archive", ("Fixing %u for %u", 
                            length, record_buffer->length));
  DBUG_ASSERT(record_buffer->buffer);

  if (length > record_buffer->length)
  {
    uchar *newptr;
    if (!(newptr=(uchar*) my_realloc((uchar*) record_buffer->buffer, 
                                    length,
				    MYF(MY_ALLOW_ZERO_PTR))))
      DBUG_RETURN(1);
    record_buffer->buffer= newptr;
    record_buffer->length= length;
  }

  DBUG_ASSERT(length <= record_buffer->length);

  DBUG_RETURN(0);
}

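/*
  Read one packed row: first the ARCHIVE_ROW_HEADER_SIZE length header, then
  the row body into record_buffer; the null bytes are copied into the record
  and each non-NULL field is restored with Field::unpack().
*/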
int ha_archive::unpack_row(azio_stream *file_to_read, uchar *record)
{
  DBUG_ENTER("ha_archive::unpack_row");

  unsigned int read;
  int error;
  uchar size_buffer[ARCHIVE_ROW_HEADER_SIZE];
  unsigned int row_len;

  /* First we grab the length stored */
  read= azread(file_to_read, size_buffer, ARCHIVE_ROW_HEADER_SIZE, &error);

  if (error == Z_STREAM_ERROR ||  (read && read < ARCHIVE_ROW_HEADER_SIZE))
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  /* If we read nothing we are at the end of the file */
  if (read == 0 || read != ARCHIVE_ROW_HEADER_SIZE)
    DBUG_RETURN(HA_ERR_END_OF_FILE);

  row_len=  uint4korr(size_buffer);
  DBUG_PRINT("ha_archive",("Unpack row length %u -> %u", row_len, 
                           (unsigned int)table->s->reclength));
  fix_rec_buff(row_len);
  DBUG_ASSERT(row_len <= record_buffer->length);

  read= azread(file_to_read, record_buffer->buffer, row_len, &error);

  DBUG_ASSERT(row_len == read);

  if (read != row_len || error)
  {
    DBUG_RETURN(-1);
  }

  /* Copy null bits */
  const uchar *ptr= record_buffer->buffer;
  memcpy(record, ptr, table->s->null_bytes);
  ptr+= table->s->null_bytes;
  for (Field **field=table->field ; *field ; field++)
  {
    if (!((*field)->is_null()))
    {
      ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);
    }
  }
  DBUG_RETURN(0);
}


int ha_archive::get_row_version3(azio_stream *file_to_read, uchar *buf)
{
  DBUG_ENTER("ha_archive::get_row_version3");

  int returnable= unpack_row(file_to_read, buf);

  DBUG_RETURN(returnable);
}


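/*
  Read a row written in the old (version 2) format: a fixed reclength record
  followed by the blob bodies; blobs that are not in the read_set are skipped
  with azseek().
*/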
int ha_archive::get_row_version2(azio_stream *file_to_read, uchar *buf)
{
  unsigned int read;
  int error;
  uint *ptr, *end;
  char *last;
  size_t total_blob_length= 0;
  MY_BITMAP *read_set= table->read_set;
  DBUG_ENTER("ha_archive::get_row_version2");

  read= azread(file_to_read, (voidp)buf, table->s->reclength, &error);

  /* If we read nothing we are at the end of the file */
  if (read == 0)
    DBUG_RETURN(HA_ERR_END_OF_FILE);

  if (read != table->s->reclength)
  {
    DBUG_PRINT("ha_archive::get_row_version2", ("Read %u bytes expected %u", 
                                                read, 
                                                (unsigned int)table->s->reclength));
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
  }

  if (error == Z_STREAM_ERROR || error == Z_DATA_ERROR )
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  /* 
    If the record is the wrong size, the file is probably damaged, unless 
    we are dealing with a delayed insert or a bulk insert.
  */
  if ((ulong) read != table->s->reclength)
    DBUG_RETURN(HA_ERR_END_OF_FILE);

  /* Calculate blob length, we use this for our buffer */
  for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
       ptr != end ;
       ptr++)
  {
    if (bitmap_is_set(read_set,
                      (((Field_blob*) table->field[*ptr])->field_index)))
        total_blob_length += ((Field_blob*) table->field[*ptr])->get_length();
  }

  /* Adjust our row buffer if need be */
  buffer.alloc(total_blob_length);
  last= (char *)buffer.ptr();

  /* Loop through our blobs and read them */
  for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
       ptr != end ;
       ptr++)
  {
    size_t size= ((Field_blob*) table->field[*ptr])->get_length();
    if (size)
    {
      if (bitmap_is_set(read_set,
                        ((Field_blob*) table->field[*ptr])->field_index))
      {
        read= azread(file_to_read, last, size, &error);

        if (error)
          DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

        if ((size_t) read != size)
          DBUG_RETURN(HA_ERR_END_OF_FILE);
        ((Field_blob*) table->field[*ptr])->set_ptr(size, (uchar*) last);
        last += size;
      }
      else
      {
        (void)azseek(file_to_read, size, SEEK_CUR);
      }
    }
  }
  DBUG_RETURN(0);
}


/* 
  Called during ORDER BY. Its position comes either from being called
  sequentially or from ha_archive::rnd_pos() having been called before it.
*/

int ha_archive::rnd_next(uchar *buf)
{
  int rc;
  DBUG_ENTER("ha_archive::rnd_next");

  if (share->crashed)
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  if (!scan_rows)
    DBUG_RETURN(HA_ERR_END_OF_FILE);
  scan_rows--;

  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
  current_position= aztell(&archive);
  rc= get_row(&archive, buf);


  if (rc != HA_ERR_END_OF_FILE)
    stats.records++;

  DBUG_RETURN(rc);
}


/*
  Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
  each call to ha_archive::rnd_next() if an ordering of the rows is
  needed.
*/

void ha_archive::position(const uchar *record)
{
  DBUG_ENTER("ha_archive::position");
  my_store_ptr(ref, ref_length, current_position);
  DBUG_VOID_RETURN;
}


/*
  This is called after a table scan for each row if the results of the
  scan need to be ordered. It will take *pos and use it to move the
  cursor in the file so that the next row that is called is the
  correctly ordered row.
*/

int ha_archive::rnd_pos(uchar * buf, uchar *pos)
{
  DBUG_ENTER("ha_archive::rnd_pos");
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
  current_position= (my_off_t)my_get_ptr(pos, ref_length);
  (void)azseek(&archive, current_position, SEEK_SET);

  DBUG_RETURN(get_row(&archive, buf));
}

/*
  This method repairs the meta file. It does this by walking the data file and 
  rewriting the meta file. Currently it does this by calling optimize with
  the extended flag.
*/
int ha_archive::repair(THD* thd, HA_CHECK_OPT* check_opt)
{
  DBUG_ENTER("ha_archive::repair");
  check_opt->flags= T_EXTEND;
  int rc= optimize(thd, check_opt);

  if (rc)
    DBUG_RETURN(HA_ERR_CRASHED_ON_REPAIR);

  share->crashed= FALSE;
  DBUG_RETURN(0);
}

/*
  The table can become fragmented if data was inserted, read, and then
  inserted again. What we do is open up the file and recompress it completely. 
*/
int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
{
  DBUG_ENTER("ha_archive::optimize");
  int rc= 0;
  azio_stream writer;
  char writer_filename[FN_REFLEN];

  init_archive_reader();

  // now we close both our writer and our reader for the rename
  if (share->archive_write_open)
  {
    azclose(&(share->archive_write));
    share->archive_write_open= FALSE;
  }

  /* Let's create a file to contain the new data */
  fn_format(writer_filename, share->table_name, "", ARN, 
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);

  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR|O_BINARY)))
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); 

  /* 
    An extended rebuild is a lot more effort. We open up each row and re-record it. 
    Any dead rows are removed (aka rows that may have been partially recorded). 

    As of Archive format 3, this is the only type that is performed; before
    this version it was only done on T_EXTEND.
  */
  if (1)
  {
    DBUG_PRINT("ha_archive", ("archive extended rebuild"));

    /*
      Now we will rewind the archive file so that we are positioned at the 
      start of the file.
    */
    rc= read_data_header(&archive);

    /* 
      On success of writing out the new header, we now fetch each row and
      insert it into the new archive file. 
    */
    if (!rc)
    {
      share->rows_recorded= 0;
      stats.auto_increment_value= share->archive_write.auto_increment= 0;
      my_bitmap_map *org_bitmap= dbug_tmp_use_all_columns(table, table->read_set);

      while (!(rc= get_row(&archive, table->record[0])))
      {
        real_write_row(table->record[0], &writer);
        /*
          Long term it should be possible to optimize this so that
          it is not called on each row.
        */
        if (table->found_next_number_field)
        {
          Field *field= table->found_next_number_field;
          ulonglong auto_value=
            (ulonglong) field->val_int(table->record[0] +
                                       field->offset(table->record[0]));
          if (share->archive_write.auto_increment < auto_value)
            stats.auto_increment_value= share->archive_write.auto_increment=
              auto_value;
        }
      }

      dbug_tmp_restore_column_map(table->read_set, org_bitmap);
      share->rows_recorded= (ha_rows)writer.rows;
    }

    DBUG_PRINT("info", ("recovered %llu archive rows", 
                        (unsigned long long)share->rows_recorded));

    DBUG_PRINT("ha_archive", ("recovered %llu archive rows", 
                        (unsigned long long)share->rows_recorded));

    if (rc && rc != HA_ERR_END_OF_FILE)
      goto error;
  } 

  azclose(&writer);
  share->dirty= FALSE;
  
  azclose(&archive);

  // make the file we just wrote be our data file
  rc = my_rename(writer_filename,share->data_file_name,MYF(0));


  DBUG_RETURN(rc);
error:
  DBUG_PRINT("ha_archive", ("Failed to recover, error was %d", rc));
  azclose(&writer);

  DBUG_RETURN(rc); 
}

/* 
  Below is an example of how to set up row level locking.
*/
THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
                                       THR_LOCK_DATA **to,
                                       enum thr_lock_type lock_type)
{
  if (lock_type == TL_WRITE_DELAYED)
    delayed_insert= TRUE;
  else
    delayed_insert= FALSE;

  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) 
  {
    /* 
      Here is where we get into the guts of a row level lock.
      If TL_UNLOCK is set and we are not doing a LOCK TABLE or
      DISCARD/IMPORT TABLESPACE, then allow multiple writers.
    */

    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
         lock_type <= TL_WRITE) && !thd_in_lock_tables(thd)
        && !thd_tablespace_op(thd))
      lock_type = TL_WRITE_ALLOW_WRITE;

    /* 
      In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
      MySQL would use the lock TL_READ_NO_INSERT on t2, and that
      would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
      to t2. Convert the lock to a normal read lock to allow
      concurrent inserts to t2. 
    */

    if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd)) 
      lock_type = TL_READ;

    lock.type=lock_type;
  }

  *to++= &lock;

  return to;
}

void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
{
  DBUG_ENTER("ha_archive::update_create_info");

  ha_archive::info(HA_STATUS_AUTO);
  if (create_info->used_fields & HA_CREATE_USED_AUTO)
  {
    /* 
      Internally Archive keeps track of last used, not next used.
      To make the output look like MyISAM we add 1 here.

      This is not completely compatible with MyISAM though, since
      MyISAM will record on "SHOW CREATE TABLE" the last position,
      whereas we will report the original position the table was
      created with.
    */
    create_info->auto_increment_value= stats.auto_increment_value + 1;
  }

  if (!(my_readlink(share->real_path, share->data_file_name, MYF(0))))
    create_info->data_file_name= share->real_path;

  DBUG_VOID_RETURN;
}


/*
  Hints for optimizer, see ha_tina for more information
*/
int ha_archive::info(uint flag)
{
  DBUG_ENTER("ha_archive::info");
  /* 
    This should be an accurate number now, though bulk and delayed inserts can
    cause the number to be inaccurate.
  */
  stats.records= share->rows_recorded;
  stats.deleted= 0;
  /* Costs quite a bit more to get all information */
  if (flag & HA_STATUS_TIME)
  {
    MY_STAT file_stat;  // Stat information for the data file

    VOID(my_stat(share->data_file_name, &file_stat, MYF(MY_WME)));

    stats.mean_rec_length= table->s->reclength + buffer.alloced_length();
    stats.data_file_length= file_stat.st_size;
    stats.create_time= file_stat.st_ctime;
    stats.update_time= file_stat.st_mtime;
    stats.max_data_file_length= share->rows_recorded * stats.mean_rec_length;
  }
  stats.delete_length= 0;
  stats.index_file_length=0;

  if (flag & HA_STATUS_AUTO)
  {
    init_archive_reader();
    azflush(&archive, Z_SYNC_FLUSH);
    stats.auto_increment_value= archive.auto_increment;
  }

  DBUG_RETURN(0);
}


/*
  This method tells us that a bulk insert operation is about to occur. We set
  a flag which will keep write_row from saying that its data is dirty. This in
  turn will keep selects from causing a sync to occur.
  Basically, yet another optimization to keep compression working well.
*/
void ha_archive::start_bulk_insert(ha_rows rows)
{
  DBUG_ENTER("ha_archive::start_bulk_insert");
  if (!rows || rows >= ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT)
    bulk_insert= TRUE;
  DBUG_VOID_RETURN;
}


/* 
  The other side of start_bulk_insert() is end_bulk_insert(). Here we turn off
  the bulk insert flag, and set the share dirty so that the next select will
  call sync for us.
*/
int ha_archive::end_bulk_insert()
{
  DBUG_ENTER("ha_archive::end_bulk_insert");
  bulk_insert= FALSE;
  share->dirty= TRUE;
  DBUG_RETURN(0);
}

/*
  We cancel a truncate command. The only way to delete an archive table is to drop it.
  This is done for security reasons. In a later version we will enable this by 
  allowing the user to select a different row format.
*/
int ha_archive::delete_all_rows()
{
  DBUG_ENTER("ha_archive::delete_all_rows");
  DBUG_RETURN(HA_ERR_WRONG_COMMAND);
}

/*
  We just return state if asked.
*/
bool ha_archive::is_crashed() const 
{
  DBUG_ENTER("ha_archive::is_crashed");
  DBUG_RETURN(share->crashed); 
}

/*
  Simple scan of the tables to make sure everything is ok.
*/

int ha_archive::check(THD* thd, HA_CHECK_OPT* check_opt)
{
  int rc= 0;
  const char *old_proc_info;
  ha_rows count= share->rows_recorded;
  DBUG_ENTER("ha_archive::check");

  old_proc_info= thd_proc_info(thd, "Checking table");
  /* Flush any waiting data */
  azflush(&(share->archive_write), Z_SYNC_FLUSH);

  /*
    Now we will rewind the archive file so that we are positioned at the 
    start of the file.
  */
  init_archive_reader();
  read_data_header(&archive);
  while (!(rc= get_row(&archive, table->record[0])))
    count--;

  thd_proc_info(thd, old_proc_info);

  if ((rc && rc != HA_ERR_END_OF_FILE) || count)  
  {
    share->crashed= FALSE;
    DBUG_RETURN(HA_ADMIN_CORRUPT);
  }
  else
  {
    DBUG_RETURN(HA_ADMIN_OK);
  }
}

/*
  Check and repair the table if needed.
*/
bool ha_archive::check_and_repair(THD *thd) 
{
  HA_CHECK_OPT check_opt;
  DBUG_ENTER("ha_archive::check_and_repair");

  check_opt.init();

  DBUG_RETURN(repair(thd, &check_opt));
}

archive_record_buffer *ha_archive::create_record_buffer(unsigned int length) 
{
  DBUG_ENTER("ha_archive::create_record_buffer");
  archive_record_buffer *r;
  if (!(r= 
        (archive_record_buffer*) my_malloc(sizeof(archive_record_buffer),
                                           MYF(MY_WME))))
  {
    DBUG_RETURN(NULL); /* purecov: inspected */
  }
  r->length= (int)length;

  if (!(r->buffer= (uchar*) my_malloc(r->length,
                                    MYF(MY_WME))))
  {
    my_free((char*) r, MYF(MY_ALLOW_ZERO_PTR));
    DBUG_RETURN(NULL); /* purecov: inspected */
  }

  DBUG_RETURN(r);
}

void ha_archive::destroy_record_buffer(archive_record_buffer *r) 
{
  DBUG_ENTER("ha_archive::destroy_record_buffer");
  my_free((char*) r->buffer, MYF(MY_ALLOW_ZERO_PTR));
  my_free((char*) r, MYF(MY_ALLOW_ZERO_PTR));
  DBUG_VOID_RETURN;
}

struct st_mysql_storage_engine archive_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };

mysql_declare_plugin(archive)
{
  MYSQL_STORAGE_ENGINE_PLUGIN,
  &archive_storage_engine,
  "ARCHIVE",
  "Brian Aker, MySQL AB",
  "Archive storage engine",
  PLUGIN_LICENSE_GPL,
  archive_db_init, /* Plugin Init */
  archive_db_done, /* Plugin Deinit */
  0x0300 /* 3.0 */,
  NULL,                       /* status variables                */
  NULL,                       /* system variables                */
  NULL                        /* config options                  */
}
mysql_declare_plugin_end;