sort.c 24.2 KB
Newer Older
unknown's avatar
unknown committed
1
/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
2

unknown's avatar
unknown committed
3 4 5 6
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.
7

unknown's avatar
unknown committed
8 9 10 11
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
12

unknown's avatar
unknown committed
13 14 15 16 17 18 19 20 21
   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

/*
  Creates an index for a database by reading keys, sorting them and outputting
  them in sorted order through SORT_INFO functions.
*/

22
#include "fulltext.h"
unknown's avatar
unknown committed
23 24 25 26 27 28 29
#if defined(MSDOS) || defined(__WIN__)
#include <fcntl.h>
#else
#include <stddef.h>
#endif
#include <queues.h>

unknown's avatar
unknown committed
30 31
/* static variables */

unknown's avatar
unknown committed
32 33 34
/*
  Tunables for the merge sort done in this file. Earlier definitions of
  MIN_SORT_MEMORY, MYF_RW and DISK_BUFFER_SIZE are dropped first so that
  the values below are the ones used from here on.
*/
#undef MIN_SORT_MEMORY
#undef MYF_RW
#undef DISK_BUFFER_SIZE

/* Number of sorted runs merged into one per step in merge_many_buff() */
#define MERGEBUFF 15
/* merge_many_buff() keeps merging while the last run index is >= this */
#define MERGEBUFF2 31
/* Smallest sort buffer size that is tried before giving up */
#define MIN_SORT_MEMORY (4096-MALLOC_OVERHEAD)
/* Flags passed to my_pread() in read_to_buffer() */
#define MYF_RW  MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL)
/* Cache size passed to open_cached_file() for the temporary files */
#define DISK_BUFFER_SIZE (IO_SIZE*16)
unknown's avatar
unknown committed
41 42

typedef struct st_buffpek {
unknown's avatar
unknown committed
43 44 45 46 47
  my_off_t file_pos;                    /* Where we are in the sort file */
  uchar *base,*key;                     /* Key pointers */
  ha_rows count;                        /* Number of rows in table */
  ulong mem_count;                      /* numbers of keys in memory */
  ulong max_keys;                       /* Max keys in buffert */
unknown's avatar
unknown committed
48 49 50 51
} BUFFPEK;

extern void print_error _VARARGS((const char *fmt,...));

unknown's avatar
unknown committed
52
/* Functions defined in this file */
unknown's avatar
unknown committed
53 54

static ha_rows NEAR_F find_all_keys(MI_SORT_PARAM *info,uint keys,
unknown's avatar
unknown committed
55 56 57 58
                                    uchar **sort_keys,
                                    DYNAMIC_ARRAY *buffpek,int *maxbuffer,
                                    IO_CACHE *tempfile,
                                    IO_CACHE *tempfile_for_exceptions);
unknown's avatar
unknown committed
59
static int NEAR_F write_keys(MI_SORT_PARAM *info,uchar * *sort_keys,
unknown's avatar
unknown committed
60
                             uint count, BUFFPEK *buffpek,IO_CACHE *tempfile);
unknown's avatar
unknown committed
61 62
static int NEAR_F write_key(MI_SORT_PARAM *info, uchar *key,
			    IO_CACHE *tempfile);
unknown's avatar
unknown committed
63
static int NEAR_F write_index(MI_SORT_PARAM *info,uchar * *sort_keys,
unknown's avatar
unknown committed
64
                              uint count);
unknown's avatar
unknown committed
65
static int NEAR_F merge_many_buff(MI_SORT_PARAM *info,uint keys,
unknown's avatar
unknown committed
66 67 68
                                  uchar * *sort_keys,
                                  BUFFPEK *buffpek,int *maxbuffer,
                                  IO_CACHE *t_file);
69
static uint NEAR_F read_to_buffer(IO_CACHE *fromfile,BUFFPEK *buffpek,
unknown's avatar
unknown committed
70
                                  uint sort_length);
71
static int NEAR_F merge_buffers(MI_SORT_PARAM *info,uint keys,
unknown's avatar
unknown committed
72 73 74
                                IO_CACHE *from_file, IO_CACHE *to_file,
                                uchar * *sort_keys, BUFFPEK *lastbuff,
                                BUFFPEK *Fb, BUFFPEK *Tb);
unknown's avatar
unknown committed
75
static int NEAR_F merge_index(MI_SORT_PARAM *,uint,uchar **,BUFFPEK *, int,
unknown's avatar
unknown committed
76
                              IO_CACHE *);
unknown's avatar
unknown committed
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91


/*
  Creates a index of sorted keys

  SYNOPSIS
    _create_index_by_sort()
    info		Sort parameters
    no_messages		Set to 1 if no output
    sortbuff_size	Size if sortbuffer to allocate

  RESULT
    0	ok
   <> 0 Error
*/
unknown's avatar
unknown committed
92 93 94 95 96 97

int _create_index_by_sort(MI_SORT_PARAM *info,my_bool no_messages,
			  ulong sortbuff_size)
{
  int error,maxbuffer,skr;
  uint memavl,old_memavl,keys,sort_length;
98
  DYNAMIC_ARRAY buffpek;
unknown's avatar
unknown committed
99 100
  ha_rows records;
  uchar **sort_keys;
101
  IO_CACHE tempfile, tempfile_for_exceptions;
unknown's avatar
unknown committed
102 103 104
  DBUG_ENTER("_create_index_by_sort");
  DBUG_PRINT("enter",("sort_length: %d", info->key_length));

105
  my_b_clear(&tempfile);
106 107 108
  my_b_clear(&tempfile_for_exceptions);
  bzero((char*) &buffpek,sizeof(buffpek));
  sort_keys= (uchar **) NULL; error= 1;
unknown's avatar
unknown committed
109 110 111
  maxbuffer=1;

  memavl=max(sortbuff_size,MIN_SORT_MEMORY);
unknown's avatar
unknown committed
112
  records=	info->sort_info->max_records;
unknown's avatar
unknown committed
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
  sort_length=	info->key_length;
  LINT_INIT(keys);

  while (memavl >= MIN_SORT_MEMORY)
  {
    if ((my_off_t) (records+1)*(sort_length+sizeof(char*)) <=
	(my_off_t) memavl)
      keys= records+1;
    else
      do
      {
	skr=maxbuffer;
	if (memavl < sizeof(BUFFPEK)*(uint) maxbuffer ||
	    (keys=(memavl-sizeof(BUFFPEK)*(uint) maxbuffer)/
	     (sort_length+sizeof(char*))) <= 1)
	{
	  mi_check_print_error(info->sort_info->param,
			       "sort_buffer_size is to small");
	  goto err;
	}
      }
      while ((maxbuffer= (int) (records/(keys-1)+1)) != skr);

136 137
    if ((sort_keys=(uchar **)my_malloc(keys*(sort_length+sizeof(char*))+
				       HA_FT_MAXLEN, MYF(0))))
unknown's avatar
unknown committed
138
    {
139
      if (my_init_dynamic_array(&buffpek, sizeof(BUFFPEK), maxbuffer,
140
			     maxbuffer/2))
unknown's avatar
unknown committed
141
	my_free((gptr) sort_keys,MYF(0));
142 143
      else
	break;
unknown's avatar
unknown committed
144 145 146 147 148 149 150
    }
    old_memavl=memavl;
    if ((memavl=memavl/4*3) < MIN_SORT_MEMORY && old_memavl > MIN_SORT_MEMORY)
      memavl=MIN_SORT_MEMORY;
  }
  if (memavl < MIN_SORT_MEMORY)
  {
151 152
    mi_check_print_error(info->sort_info->param,"Sort buffer to small"); /* purecov: tested */
    goto err; /* purecov: tested */
unknown's avatar
unknown committed
153 154 155 156 157 158
  }
  (*info->lock_in_memory)(info->sort_info->param);/* Everything is allocated */

  if (!no_messages)
    printf("  - Searching for keys, allocating buffer for %d keys\n",keys);

159 160
  if ((records=find_all_keys(info,keys,sort_keys,&buffpek,&maxbuffer,
                                  &tempfile,&tempfile_for_exceptions))
unknown's avatar
unknown committed
161
      == HA_POS_ERROR)
162
    goto err; /* purecov: tested */
unknown's avatar
unknown committed
163 164 165 166 167
  if (maxbuffer == 0)
  {
    if (!no_messages)
      printf("  - Dumping %lu keys\n",records);
    if (write_index(info,sort_keys,(uint) records))
168
      goto err; /* purecov: inspected */
unknown's avatar
unknown committed
169 170 171 172 173 174 175
  }
  else
  {
    keys=(keys*(sort_length+sizeof(char*)))/sort_length;
    if (maxbuffer >= MERGEBUFF2)
    {
      if (!no_messages)
176
	printf("  - Merging %lu keys\n",records); /* purecov: tested */
177 178
      if (merge_many_buff(info,keys,sort_keys,
                  dynamic_element(&buffpek,0,BUFFPEK *),&maxbuffer,&tempfile))
unknown's avatar
unknown committed
179
	goto err;				/* purecov: inspected */
unknown's avatar
unknown committed
180
    }
181 182
    if (flush_io_cache(&tempfile) ||
	reinit_io_cache(&tempfile,READ_CACHE,0L,0,0))
unknown's avatar
unknown committed
183
      goto err;					/* purecov: inspected */
unknown's avatar
unknown committed
184
    if (!no_messages)
185
      puts("  - Last merge and dumping keys"); /* purecov: tested */
186 187
    if (merge_index(info,keys,sort_keys,dynamic_element(&buffpek,0,BUFFPEK *),
                    maxbuffer,&tempfile))
unknown's avatar
unknown committed
188
      goto err;					/* purecov: inspected */
unknown's avatar
unknown committed
189
  }
190

unknown's avatar
unknown committed
191
  if (flush_pending_blocks(info))
192 193 194 195 196
    goto err;

  if (my_b_inited(&tempfile_for_exceptions))
  {
    MI_INFO *index=info->sort_info->info;
unknown's avatar
unknown committed
197
    uint     keyno=info->key;
198 199 200 201 202 203
    uint     key_length, ref_length=index->s->rec_reflength;

    if (flush_io_cache(&tempfile_for_exceptions) ||
	reinit_io_cache(&tempfile_for_exceptions,READ_CACHE,0L,0,0))
      goto err;

unknown's avatar
unknown committed
204 205 206 207
    while (!my_b_read(&tempfile_for_exceptions,(byte*)&key_length,
		      sizeof(key_length))
        && !my_b_read(&tempfile_for_exceptions,(byte*)sort_keys,
		      (uint) key_length))
208
    {
unknown's avatar
unknown committed
209 210
	if (_mi_ck_write(index,keyno,(uchar*) sort_keys,key_length-ref_length))
	  goto err;
211 212 213
    }
  }

unknown's avatar
unknown committed
214 215 216 217 218
  error =0;

err:
  if (sort_keys)
    my_free((gptr) sort_keys,MYF(0));
219
  delete_dynamic(&buffpek);
220
  close_cached_file(&tempfile);
221
  close_cached_file(&tempfile_for_exceptions);
unknown's avatar
unknown committed
222 223 224 225 226

  DBUG_RETURN(error ? -1 : 0);
} /* _create_index_by_sort */


unknown's avatar
unknown committed
227
/* Search after all keys and place them in a temp. file */
unknown's avatar
unknown committed
228 229

static ha_rows NEAR_F find_all_keys(MI_SORT_PARAM *info, uint keys,
230 231 232
				    uchar **sort_keys, DYNAMIC_ARRAY *buffpek,
				    int *maxbuffer, IO_CACHE *tempfile,
				    IO_CACHE *tempfile_for_exceptions)
unknown's avatar
unknown committed
233 234
{
  int error;
235
  uint idx;
unknown's avatar
unknown committed
236 237
  DBUG_ENTER("find_all_keys");

238
  idx=error=0;
unknown's avatar
unknown committed
239
  sort_keys[0]=(uchar*) (sort_keys+keys);
unknown's avatar
unknown committed
240

unknown's avatar
unknown committed
241
  while (!(error=(*info->key_read)(info,sort_keys[idx])))
unknown's avatar
unknown committed
242
  {
unknown's avatar
unknown committed
243
    if (info->real_key_length > info->key_length)
unknown's avatar
unknown committed
244
    {
245
      if (write_key(info,sort_keys[idx],tempfile_for_exceptions))
unknown's avatar
unknown committed
246
        DBUG_RETURN(HA_POS_ERROR);		/* purecov: inspected */
247 248 249 250 251
      continue;
    }

    if (++idx == keys)
    {
unknown's avatar
unknown committed
252 253 254
      if (write_keys(info,sort_keys,idx-1,(BUFFPEK *)alloc_dynamic(buffpek),
		     tempfile))
      DBUG_RETURN(HA_POS_ERROR);		/* purecov: inspected */
255

unknown's avatar
unknown committed
256
      sort_keys[0]=(uchar*) (sort_keys+keys);
unknown's avatar
unknown committed
257
      memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length);
258
      idx=1;
unknown's avatar
unknown committed
259
    }
260
    sort_keys[idx]=sort_keys[idx-1]+info->key_length;
unknown's avatar
unknown committed
261 262
  }
  if (error > 0)
263
    DBUG_RETURN(HA_POS_ERROR);		/* Aborted by get_key */ /* purecov: inspected */
264 265
  if (buffpek->elements)
  {
unknown's avatar
unknown committed
266 267
    if (write_keys(info,sort_keys,idx,(BUFFPEK *)alloc_dynamic(buffpek),
		   tempfile))
unknown's avatar
unknown committed
268
      DBUG_RETURN(HA_POS_ERROR);		/* purecov: inspected */
269 270 271 272
    *maxbuffer=buffpek->elements-1;
  }
  else
    *maxbuffer=0;
unknown's avatar
unknown committed
273

274 275
  DBUG_RETURN((*maxbuffer)*(keys-1)+idx);
} /* find_all_keys */
unknown's avatar
unknown committed
276

unknown's avatar
unknown committed
277 278

/* Search after all keys and place them in a temp. file */
unknown's avatar
unknown committed
279

280
pthread_handler_decl(thr_find_all_keys,arg)
unknown's avatar
unknown committed
281
{
282
  MI_SORT_PARAM *info= (MI_SORT_PARAM*) arg;
unknown's avatar
unknown committed
283
  int error;
unknown's avatar
unknown committed
284 285 286 287 288 289 290 291
  uint memavl,old_memavl,keys,sort_length;
  uint idx, maxbuffer;
  uchar **sort_keys;

  my_b_clear(&info->tempfile);
  my_b_clear(&info->tempfile_for_exceptions);
  bzero((char*) &info->buffpek,sizeof(info->buffpek));
  bzero((char*) &info->unique, sizeof(info->unique));
unknown's avatar
unknown committed
292 293
  sort_keys= (uchar **) NULL;
  error= 1;
unknown's avatar
unknown committed
294 295 296 297 298 299 300
  if (info->sort_info->got_error)
    goto err;

  memavl=max(info->sortbuff_size, MIN_SORT_MEMORY);
  idx=      info->sort_info->max_records;
  sort_length=  info->key_length;

unknown's avatar
unknown committed
301
  maxbuffer=1;
unknown's avatar
unknown committed
302 303 304 305 306 307
  while (memavl >= MIN_SORT_MEMORY)
  {
    if ((my_off_t) (idx+1)*(sort_length+sizeof(char*)) <=
        (my_off_t) memavl)
      keys= idx+1;
    else
unknown's avatar
unknown committed
308 309
    {
      uint skr;
unknown's avatar
unknown committed
310 311 312 313 314 315 316 317 318 319 320 321 322
      do
      {
        skr=maxbuffer;
        if (memavl < sizeof(BUFFPEK)*maxbuffer ||
            (keys=(memavl-sizeof(BUFFPEK)*maxbuffer)/
             (sort_length+sizeof(char*))) <= 1)
        {
          mi_check_print_error(info->sort_info->param,
                               "sort_buffer_size is to small");
          goto err;
        }
      }
      while ((maxbuffer= (int) (idx/(keys-1)+1)) != skr);
unknown's avatar
unknown committed
323
    }
unknown's avatar
unknown committed
324
    if ((sort_keys=(uchar **)my_malloc(keys*(sort_length+sizeof(char*))+
unknown's avatar
unknown committed
325 326
				       ((info->keyinfo->flag & HA_FULLTEXT) ?
					HA_FT_MAXLEN : 0), MYF(0))))
unknown's avatar
unknown committed
327 328
    {
      if (my_init_dynamic_array(&info->buffpek, sizeof(BUFFPEK),
unknown's avatar
unknown committed
329
				maxbuffer, maxbuffer/2))
unknown's avatar
unknown committed
330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350
        my_free((gptr) sort_keys,MYF(0));
      else
        break;
    }
    old_memavl=memavl;
    if ((memavl=memavl/4*3) < MIN_SORT_MEMORY && old_memavl > MIN_SORT_MEMORY)
      memavl=MIN_SORT_MEMORY;
  }
  if (memavl < MIN_SORT_MEMORY)
  {
    mi_check_print_error(info->sort_info->param,"Sort buffer to small"); /* purecov: tested */
    goto err; /* purecov: tested */
  }

  if (info->sort_info->param->testflag & T_VERBOSE)
    printf("Key %d - Allocating buffer for %d keys\n",info->key+1,keys);
  info->sort_keys=sort_keys;

  idx=error=0;
  sort_keys[0]=(uchar*) (sort_keys+keys);

unknown's avatar
unknown committed
351 352
  while (!(error=info->sort_info->got_error) ||
	 !(error=(*info->key_read)(info,sort_keys[idx])))
unknown's avatar
unknown committed
353 354 355
  {
    if (info->real_key_length > info->key_length)
    {
unknown's avatar
unknown committed
356
      if (write_key(info,sort_keys[idx], &info->tempfile_for_exceptions))
unknown's avatar
unknown committed
357 358 359 360 361 362 363
        goto err;
      continue;
    }

    if (++idx == keys)
    {
      if (write_keys(info,sort_keys,idx-1,
unknown's avatar
unknown committed
364 365
		     (BUFFPEK *)alloc_dynamic(&info->buffpek),
		     &info->tempfile))
unknown's avatar
unknown committed
366 367 368 369 370 371 372 373 374 375 376 377
        goto err;

      sort_keys[0]=(uchar*) (sort_keys+keys);
      memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length);
      idx=1;
    }
    sort_keys[idx]=sort_keys[idx-1]+info->key_length;
  }
  if (error > 0)
    goto err;
  if (info->buffpek.elements)
  {
unknown's avatar
unknown committed
378 379
    if (write_keys(info,sort_keys, idx,
		   (BUFFPEK *) alloc_dynamic(&info->buffpek), &info->tempfile))
unknown's avatar
unknown committed
380 381 382 383 384 385 386 387 388 389 390 391 392 393 394
      goto err;
    info->keys=(info->buffpek.elements-1)*(keys-1)+idx;
  }
  else
    info->keys=idx;

  info->sort_keys_length=keys;
  goto ok;

err:
  info->sort_info->got_error=1; /* no need to protect this with a mutex */
  if (sort_keys)
    my_free((gptr) sort_keys,MYF(0));
  info->sort_keys=0;
  delete_dynamic(& info->buffpek);
unknown's avatar
unknown committed
395 396 397
  close_cached_file(&info->tempfile);
  close_cached_file(&info->tempfile_for_exceptions);

unknown's avatar
unknown committed
398
ok:
unknown's avatar
unknown committed
399 400
  remove_io_thread(&info->read_cache);
  pthread_mutex_lock(&info->sort_info->mutex);
unknown's avatar
unknown committed
401
  info->sort_info->threads_running--;
unknown's avatar
unknown committed
402 403
  pthread_cond_signal(&info->sort_info->cond);
  pthread_mutex_unlock(&info->sort_info->mutex);
unknown's avatar
unknown committed
404
  return NULL;
405
}
unknown's avatar
unknown committed
406

unknown's avatar
unknown committed
407

408
int thr_write_keys(MI_SORT_PARAM *sort_param)
unknown's avatar
unknown committed
409 410 411 412 413
{
  SORT_INFO *sort_info=sort_param->sort_info;
  MI_CHECK *param=sort_info->param;
  ulong length, keys;
  ulong *rec_per_key_part=param->rec_per_key_part;
unknown's avatar
unknown committed
414 415
  int got_error=sort_info->got_error;
  uint i;
unknown's avatar
unknown committed
416 417 418 419
  MI_INFO *info=sort_info->info;
  MYISAM_SHARE *share=info->s;  
  MI_SORT_PARAM *sinfo;
  byte *mergebuf=0;
unknown's avatar
unknown committed
420
  LINT_INIT(length);
unknown's avatar
unknown committed
421

unknown's avatar
unknown committed
422 423 424
  for (i=0, sinfo=sort_param ;
       i < sort_info->total_keys ;
       i++, sinfo++, rec_per_key_part+=sinfo->keyinfo->keysegs)
unknown's avatar
unknown committed
425 426 427 428 429 430 431 432 433
  {
    if (!sinfo->sort_keys)
    {
      got_error=1;
      continue;
    }
    share->state.key_map|=(ulonglong) 1 << sinfo->key;
    if (param->testflag & T_STATISTICS)
      update_key_parts(sinfo->keyinfo, rec_per_key_part,
unknown's avatar
unknown committed
434
		       sinfo->unique, (ulonglong) info->state->records);
unknown's avatar
unknown committed
435 436 437
    if (!sinfo->buffpek.elements)
    {
      if (param->testflag & T_VERBOSE)
unknown's avatar
unknown committed
438 439 440 441 442 443
      {
        printf("Key %d  - Dumping %u keys\n",sinfo->key+1, sinfo->keys);
	fflush(stdout);
      }
      if (write_index(sinfo, sinfo->sort_keys, sinfo->keys) ||
	  flush_pending_blocks(sinfo))
unknown's avatar
unknown committed
444 445 446 447 448 449
        got_error=1;
    }
    my_free((gptr) sinfo->sort_keys,MYF(0));
    sinfo->sort_keys=0;
  }

unknown's avatar
unknown committed
450 451 452 453 454
  for (i=0, sinfo=sort_param ;
       i < sort_info->total_keys ;
       i++, sinfo++, delete_dynamic(&sinfo->buffpek),
	 close_cached_file(&sinfo->tempfile),
	 close_cached_file(&sinfo->tempfile_for_exceptions))
unknown's avatar
unknown committed
455
  {
unknown's avatar
unknown committed
456 457
    if (got_error)
      continue;
unknown's avatar
unknown committed
458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478
    if (sinfo->buffpek.elements)
    {
      uint maxbuffer=sinfo->buffpek.elements-1;
      if (!mergebuf)
      {
        length=param->sort_buffer_length;
        while (length >= MIN_SORT_MEMORY && !mergebuf)
        {
          mergebuf=my_malloc(length, MYF(0));
          length=length*3/4;
        }
        if (!mergebuf)
        {
          got_error=1;
          continue;
        }
      }
      keys=length/sinfo->key_length;
      if (maxbuffer >= MERGEBUFF2)
      {
        if (param->testflag & T_VERBOSE)
unknown's avatar
unknown committed
479
          printf("Key %d  - Merging %u keys\n",sinfo->key+1, sinfo->keys);
unknown's avatar
unknown committed
480
        if (merge_many_buff(sinfo, keys, (uchar **)mergebuf,
unknown's avatar
unknown committed
481 482
			    dynamic_element(&sinfo->buffpek, 0, BUFFPEK *),
			    &maxbuffer, &sinfo->tempfile))
unknown's avatar
unknown committed
483 484 485 486 487 488 489 490 491 492 493 494 495 496 497
        {
          got_error=1;
          continue;
        }
      }
      if (flush_io_cache(&sinfo->tempfile) ||
          reinit_io_cache(&sinfo->tempfile,READ_CACHE,0L,0,0))
      {
        got_error=1;
        continue;
      }
      if (param->testflag & T_VERBOSE)
        printf("Key %d  - Last merge and dumping keys", sinfo->key+1);
      if (merge_index(sinfo, keys, (uchar **)mergebuf,
                      dynamic_element(&sinfo->buffpek,0,BUFFPEK *),
unknown's avatar
unknown committed
498 499
                      maxbuffer,&sinfo->tempfile) ||
	  flush_pending_blocks(sinfo))
unknown's avatar
unknown committed
500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518
      {
        got_error=1;
        continue;
      }
    }
    if (my_b_inited(&sinfo->tempfile_for_exceptions))
    {
      uint key_length;

      if (param->testflag & T_VERBOSE)
        printf("Key %d  - Dumping 'long' keys", sinfo->key+1);

      if (flush_io_cache(&sinfo->tempfile_for_exceptions) ||
          reinit_io_cache(&sinfo->tempfile_for_exceptions,READ_CACHE,0L,0,0))
      {
        got_error=1;
        continue;
      }

unknown's avatar
unknown committed
519 520 521 522 523
      while (!got_error &&
	     !my_b_read(&sinfo->tempfile_for_exceptions,(byte*)&key_length,
			sizeof(key_length)) &&
	     !my_b_read(&sinfo->tempfile_for_exceptions,(byte*)mergebuf,
			(uint) key_length))
unknown's avatar
unknown committed
524
      {
unknown's avatar
unknown committed
525 526 527
	if (_mi_ck_write(info,sinfo->key,(uchar*) mergebuf,
			 key_length - info->s->rec_reflength))
	  got_error=1;
unknown's avatar
unknown committed
528 529 530 531 532 533 534 535
      }
    }
  }
  my_free((gptr) mergebuf,MYF(MY_ALLOW_ZERO_PTR));
  return got_error;
}

        /* Write all keys in memory to file for later merge */
unknown's avatar
unknown committed
536 537

/*
  Sort the keys in memory and append them as one run to the temp. file

  SYNOPSIS
    write_keys()
    info	Sort parameters (key_length, key_cmp, tmpdir)
    sort_keys	Array of 'count' key pointers; sorted in place
    count	Number of keys to write
    buffpek	Gets file position and key count of the written run
    tempfile	File to append to; opened here if not yet initialised

  RESULT
    0	ok
    1	Error
*/

static int NEAR_F write_keys(MI_SORT_PARAM *info, register uchar **sort_keys,
                             uint count, BUFFPEK *buffpek, IO_CACHE *tempfile)
{
  uchar **end;
  uint sort_length=info->key_length;
  DBUG_ENTER("write_keys");

  /* Callers pass the result of alloc_dynamic() unchecked; it may be 0 */
  if (!buffpek)
    DBUG_RETURN(1);

  qsort2((byte*) sort_keys,count,sizeof(byte*),(qsort2_cmp) info->key_cmp,
         info);
  if (!my_b_inited(tempfile) &&
      open_cached_file(tempfile, info->tmpdir, "ST", DISK_BUFFER_SIZE,
                       info->sort_info->param->myf_rw))
    DBUG_RETURN(1); /* purecov: inspected */

  buffpek->file_pos=my_b_tell(tempfile);
  buffpek->count=count;

  for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
    if (my_b_write(tempfile,(byte*) *sort_keys,(uint) sort_length))
      DBUG_RETURN(1); /* purecov: inspected */
  DBUG_RETURN(0);
} /* write_keys */


unknown's avatar
unknown committed
561 562
/*
  Append one key of length info->real_key_length to a temp. file

  The key is stored as its length (uint) followed by the key itself.
  The file is opened here if it is not yet initialised.

  RESULT
    0	ok
    1	Error
*/

static int NEAR_F write_key(MI_SORT_PARAM *info, uchar *key,
			    IO_CACHE *tempfile)
{
  uint rec_length;
  DBUG_ENTER("write_key");

  rec_length= info->real_key_length;

  if (!my_b_inited(tempfile))
  {
    if (open_cached_file(tempfile, info->tmpdir, "ST", DISK_BUFFER_SIZE,
                         info->sort_info->param->myf_rw))
      DBUG_RETURN(1);
  }

  if (my_b_write(tempfile,(byte*)&rec_length,sizeof(rec_length)))
    DBUG_RETURN(1);
  if (my_b_write(tempfile,(byte*)key,(uint) rec_length))
    DBUG_RETURN(1);

  DBUG_RETURN(0);
} /* write_key */

unknown's avatar
unknown committed
578

unknown's avatar
unknown committed
579
        /* Write index */
unknown's avatar
unknown committed
580

581
/*
  Sort the keys in memory and hand them to info->key_write in order

  RESULT
    0	ok
    -1	key_write failed
*/

static int NEAR_F write_index(MI_SORT_PARAM *info, register uchar **sort_keys,
                              register uint count)
{
  uchar **cur, **last;
  DBUG_ENTER("write_index");

  qsort2((gptr) sort_keys,(size_t) count,sizeof(byte*),
        (qsort2_cmp) info->key_cmp,info);

  last= sort_keys+count;
  for (cur= sort_keys ; cur != last ; cur++)
  {
    if ((*info->key_write)(info,*cur))
      DBUG_RETURN(-1); /* purecov: inspected */
  }
  DBUG_RETURN(0);
} /* write_index */


unknown's avatar
unknown committed
597
        /* Merge buffers to make < MERGEBUFF2 buffers */
unknown's avatar
unknown committed
598 599

/*
  Merge runs until fewer than MERGEBUFF2 + 1 remain

  SYNOPSIS
    merge_many_buff()
    info	Sort parameters
    keys	Number of keys that fit in sort_keys
    sort_keys	Memory used as merge buffer
    buffpek	Array of runs; overwritten with the merged runs
    maxbuffer	In/out: index of the last run in buffpek
    t_file	File holding the runs; on return it holds the result

  DESCRIPTION
    Each pass merges MERGEBUFF runs at a time from one file into the
    other and then swaps the two files. Any failure aborts the whole
    operation at once so that a partly merged pass is never reported as
    a success.

  RESULT
    0	ok
    1	Error / interrupted
*/

static int NEAR_F merge_many_buff(MI_SORT_PARAM *info, uint keys,
                                  uchar **sort_keys, BUFFPEK *buffpek,
                                  int *maxbuffer, IO_CACHE *t_file)
{
  register int i;
  IO_CACHE t_file2, *from_file, *to_file, *temp;
  BUFFPEK *lastbuff;
  DBUG_ENTER("merge_many_buff");

  if (*maxbuffer < MERGEBUFF2)
    DBUG_RETURN(0);                             /* purecov: inspected */
  if (flush_io_cache(t_file) ||
      open_cached_file(&t_file2,info->tmpdir,"ST",DISK_BUFFER_SIZE,
                       info->sort_info->param->myf_rw))
    DBUG_RETURN(1);                             /* purecov: inspected */

  from_file= t_file ; to_file= &t_file2;
  while (*maxbuffer >= MERGEBUFF2)
  {
    if (reinit_io_cache(from_file,READ_CACHE,0L,0,0) ||
        reinit_io_cache(to_file,WRITE_CACHE,0L,0,0))
      goto cleanup;                             /* purecov: inspected */
    lastbuff=buffpek;
    for (i=0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF)
    {
      /* Leave both loops on error; *maxbuffer stays unchanged */
      if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
                        buffpek+i,buffpek+i+MERGEBUFF-1))
        goto cleanup;                           /* purecov: inspected */
    }
    /* Merge what is left of this pass into one last run */
    if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
                      buffpek+i,buffpek+ *maxbuffer))
      break; /* purecov: inspected */
    if (flush_io_cache(to_file))
      break;                                    /* purecov: inspected */
    temp=from_file; from_file=to_file; to_file=temp;
    *maxbuffer= (int) (lastbuff-buffpek)-1;
  }
cleanup:
  close_cached_file(to_file);                   /* This holds old result */
  if (to_file == t_file)
    *t_file=t_file2;                            /* Copy result file */

  DBUG_RETURN(*maxbuffer >= MERGEBUFF2);        /* Return 1 if interrupted */
} /* merge_many_buff */


unknown's avatar
unknown committed
643 644 645 646 647 648 649 650 651 652 653 654
/*
   Read data to buffer

  SYNOPSIS
    read_to_buffer()
    fromfile		File to read from
    buffpek		Where to read from
    sort_length		max length to read
  RESULT
    > 0	Ammount of bytes read
    -1	Error
*/
unknown's avatar
unknown committed
655

656
static uint NEAR_F read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
unknown's avatar
unknown committed
657
                                  uint sort_length)
unknown's avatar
unknown committed
658 659 660 661
{
  register uint count;
  uint length;

662
  if ((count=(uint) min((ha_rows) buffpek->max_keys,buffpek->count)))
unknown's avatar
unknown committed
663
  {
664
    if (my_pread(fromfile->file,(byte*) buffpek->base,
unknown's avatar
unknown committed
665 666
                 (length= sort_length*count),buffpek->file_pos,MYF_RW))
      return((uint) -1);                        /* purecov: inspected */
unknown's avatar
unknown committed
667
    buffpek->key=buffpek->base;
unknown's avatar
unknown committed
668 669
    buffpek->file_pos+= length;                 /* New filepos */
    buffpek->count-=    count;
unknown's avatar
unknown committed
670 671 672 673 674 675
    buffpek->mem_count= count;
  }
  return (count*sort_length);
} /* read_to_buffer */


unknown's avatar
unknown committed
676 677 678 679
/*
  Merge buffers to one buffer
  If to_file == 0 then use info->key_write
*/
unknown's avatar
unknown committed
680 681

static int NEAR_F
682
merge_buffers(MI_SORT_PARAM *info, uint keys, IO_CACHE *from_file,
unknown's avatar
unknown committed
683 684
              IO_CACHE *to_file, uchar **sort_keys, BUFFPEK *lastbuff,
              BUFFPEK *Fb, BUFFPEK *Tb)
unknown's avatar
unknown committed
685 686 687 688 689 690 691 692 693 694 695 696 697 698
{
  int error;
  uint sort_length,maxcount;
  ha_rows count;
  my_off_t to_start_filepos;
  uchar *strpos;
  BUFFPEK *buffpek,**refpek;
  QUEUE queue;
  DBUG_ENTER("merge_buffers");

  count=error=0;
  maxcount=keys/((uint) (Tb-Fb) +1);
  LINT_INIT(to_start_filepos);
  if (to_file)
699
    to_start_filepos=my_b_tell(to_file);
unknown's avatar
unknown committed
700
  strpos=(uchar*) sort_keys;
701
  sort_length=info->key_length;
unknown's avatar
unknown committed
702 703

  if (init_queue(&queue,(uint) (Tb-Fb)+1,offsetof(BUFFPEK,key),0,
unknown's avatar
unknown committed
704
                 (int (*)(void*, byte *,byte*)) info->key_cmp,
unknown's avatar
unknown committed
705
                 (void*) info))
706
    DBUG_RETURN(1); /* purecov: inspected */
unknown's avatar
unknown committed
707

708
  for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
unknown's avatar
unknown committed
709 710 711 712 713
  {
    count+= buffpek->count;
    buffpek->base= strpos;
    buffpek->max_keys=maxcount;
    strpos+= (uint) (error=(int) read_to_buffer(from_file,buffpek,
unknown's avatar
unknown committed
714
                                                sort_length));
715 716
    if (error == -1)
      goto err; /* purecov: inspected */
unknown's avatar
unknown committed
717
    queue_insert(&queue,(char*) buffpek);
unknown's avatar
unknown committed
718 719 720 721 722 723 724 725 726
  }

  while (queue.elements > 1)
  {
    for (;;)
    {
      buffpek=(BUFFPEK*) queue_top(&queue);
      if (to_file)
      {
unknown's avatar
unknown committed
727 728 729 730
        if (my_b_write(to_file,(byte*) buffpek->key,(uint) sort_length))
        {
          error=1; goto err; /* purecov: inspected */
        }
unknown's avatar
unknown committed
731 732 733
      }
      else
      {
unknown's avatar
unknown committed
734 735 736 737
        if ((*info->key_write)(info,(void*) buffpek->key))
        {
          error=1; goto err; /* purecov: inspected */
        }
unknown's avatar
unknown committed
738 739 740 741
      }
      buffpek->key+=sort_length;
      if (! --buffpek->mem_count)
      {
unknown's avatar
unknown committed
742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768
        if (!(error=(int) read_to_buffer(from_file,buffpek,sort_length)))
        {
          uchar *base=buffpek->base;
          uint max_keys=buffpek->max_keys;

          VOID(queue_remove(&queue,0));

          /* Put room used by buffer to use in other buffer */
          for (refpek= (BUFFPEK**) &queue_top(&queue);
               refpek <= (BUFFPEK**) &queue_end(&queue);
               refpek++)
          {
            buffpek= *refpek;
            if (buffpek->base+buffpek->max_keys*sort_length == base)
            {
              buffpek->max_keys+=max_keys;
              break;
            }
            else if (base+max_keys*sort_length == buffpek->base)
            {
              buffpek->base=base;
              buffpek->max_keys+=max_keys;
              break;
            }
          }
          break;                /* One buffer have been removed */
        }
unknown's avatar
unknown committed
769
      }
770
      else if (error == -1)
unknown's avatar
unknown committed
771 772
        goto err;               /* purecov: inspected */
      queue_replaced(&queue);   /* Top element has been replaced */
unknown's avatar
unknown committed
773 774 775 776 777 778 779 780 781
    }
  }
  buffpek=(BUFFPEK*) queue_top(&queue);
  buffpek->base=(uchar *) sort_keys;
  buffpek->max_keys=keys;
  do
  {
    if (to_file)
    {
782
      if (my_b_write(to_file,(byte*) buffpek->key,
unknown's avatar
unknown committed
783
                     (sort_length*buffpek->mem_count)))
unknown's avatar
unknown committed
784
      {
unknown's avatar
unknown committed
785
        error=1; goto err; /* purecov: inspected */
unknown's avatar
unknown committed
786 787 788 789 790 791 792
      }
    }
    else
    {
      register uchar *end;
      strpos= buffpek->key;
      for (end=strpos+buffpek->mem_count*sort_length;
unknown's avatar
unknown committed
793 794
           strpos != end ;
           strpos+=sort_length)
unknown's avatar
unknown committed
795
      {
unknown's avatar
unknown committed
796 797 798 799
        if ((*info->key_write)(info,(void*) strpos))
        {
          error=1; goto err; /* purecov: inspected */
        }
unknown's avatar
unknown committed
800 801 802 803
      }
    }
  }
  while ((error=(int) read_to_buffer(from_file,buffpek,sort_length)) != -1 &&
unknown's avatar
unknown committed
804
         error != 0);
unknown's avatar
unknown committed
805 806 807

  lastbuff->count=count;
  if (to_file)
808
    lastbuff->file_pos=to_start_filepos;
unknown's avatar
unknown committed
809 810 811 812 813 814
err:
  delete_queue(&queue);
  DBUG_RETURN(error);
} /* merge_buffers */


unknown's avatar
unknown committed
815
        /* Do a merge to output-file (save only positions) */
unknown's avatar
unknown committed
816 817 818

/*
  Do the final merge of runs 0..maxbuffer. The merged keys are not
  written to a file but passed to info->key_write (to_file == 0).

  RESULT
    0	ok
    1	Error
*/

static int NEAR_F
merge_index(MI_SORT_PARAM *info, uint keys, uchar **sort_keys,
            BUFFPEK *buffpek, int maxbuffer, IO_CACHE *tempfile)
{
  int merge_res;
  DBUG_ENTER("merge_index");

  merge_res= merge_buffers(info,keys,tempfile,(IO_CACHE*) 0,sort_keys,
                           buffpek,buffpek,buffpek+maxbuffer);
  if (merge_res)
    DBUG_RETURN(1); /* purecov: inspected */
  DBUG_RETURN(0);
} /* merge_index */