/* Copyright (C) 2000 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

/*
  These functions handle keyblock caching
  for NISAM, MISAM and PISAM databases.
  One cache can handle many files.
  It must contain buffers of the same blocksize.
  init_key_cache() should be used to init cache handler.
*/
unknown's avatar
unknown committed
24 25 26 27 28

#include "mysys_priv.h"
#include "my_static.h"
#include <m_string.h>
#include <errno.h>
29 30 31
#include <assert.h>
#include <stdarg.h>

/*
  Some compilation flags have been added specifically for this module
  to control the following:
  - to prevent a thread from yielding control while reading directly
    from the key cache, which might improve performance in many cases;
    to enable this add:
    #define SERIALIZED_READ_FROM_CACHE
  - to set an upper bound for the number of threads simultaneously
    using the key cache; this setting helps to determine an optimal
    size for the hash table and improves performance when the number of
    blocks in the key cache is much less than the number of threads
    accessing it;
    to set this number equal to <N> add
      #define MAX_THREADS <N>
  - to substitute calls of pthread_cond_wait with calls of
    pthread_cond_timedwait (wait with a timeout set up);
    this setting should be used only when you want to trap a deadlock
    situation, which theoretically should not happen;
    to set the timeout equal to <T> seconds add
      #define KEYCACHE_TIMEOUT <T>
  - to enable the module traps and to send debug information from
    the key cache module to a special debug log add:
      #define KEYCACHE_DEBUG
    the name of this debug log file <LOG NAME> can be set through:
      #define KEYCACHE_DEBUG_LOG  <LOG NAME>
    if the name is not defined, it's set by default;
    if the KEYCACHE_DEBUG flag is not set up and we are in a debug
    mode, i.e. when ! defined(DBUG_OFF), the debug information from the
    module is sent to the regular debug log.

  Example of the settings:
    #define SERIALIZED_READ_FROM_CACHE
    #define MAX_THREADS   100
    #define KEYCACHE_TIMEOUT  1
    #define KEYCACHE_DEBUG
    #define KEYCACHE_DEBUG_LOG  "my_key_cache_debug.log"
*/
unknown's avatar
unknown committed
69 70

#if defined(MSDOS) && !defined(M_IC80386)
/*
  We need a lot of memory on this platform: route the locked-memory
  allocation macros through halloc()/hfree(). The second macro argument
  (the flags) is ignored by both replacements.
*/
#undef my_malloc_lock
#undef my_free_lock
/* (A) is parenthesized so that an expression argument divides as a whole */
#define my_malloc_lock(A,B)  halloc((long) ((A)/IO_SIZE),IO_SIZE)
#define my_free_lock(A,B)    hfree(A)
#endif /* defined(MSDOS) && !defined(M_IC80386) */

/*
  Given a pointer 'a' to the member MEMBER embedded in an object of type
  TYPE, yield a pointer to the enclosing object.
  The whole expansion is parenthesized so that the result can be used
  directly in postfix expressions such as STRUCT_PTR(...)->field.
*/
#define STRUCT_PTR(TYPE, MEMBER, a)                                           \
          ((TYPE *) ((char *) (a) - offsetof(TYPE, MEMBER)))

/* types of condition variables */
#define  COND_FOR_REQUESTED 0
#define  COND_FOR_SAVED     1
#define  COND_FOR_READERS   2

86 87 88 89 90
/* condition variable type used by the key cache */
typedef pthread_cond_t KEYCACHE_CONDVAR;

/*
  Info about requests in a waiting queue.
  Only the last thread is referenced; the queue is circular, so the
  first waiting thread is reached through last_thread->next.
*/
typedef struct st_keycache_wqueue
{
  struct st_my_thread_var *last_thread;  /* circular list of waiting threads */
} KEYCACHE_WQUEUE;

94
/* descriptor of the page in the key cache block buffer */
95
typedef struct st_keycache_page
96
{
97 98 99 100
  int file;               /* file to which the page belongs to  */
  my_off_t filepos;       /* position of the page in the file   */
} KEYCACHE_PAGE;

101 102 103
/* element in the chain of a hash table bucket */
typedef struct st_hash_link
{
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
  struct st_hash_link *next, **prev; /* to connect links in the same bucket  */
  struct st_block_link *block;       /* reference to the block for the page: */
  File file;                         /* from such a file                     */
  my_off_t diskpos;                  /* with such an offset                  */
  uint requests;                     /* number of requests for the page      */
} HASH_LINK;            /* offset is always alighed for key_cache_block_size */

/* simple states of a block (bit flags combined in BLOCK_LINK.status) */
#define BLOCK_ERROR       1   /* an error occurred when performing disk i/o  */
#define BLOCK_READ        2   /* the page is in the block buffer             */
#define BLOCK_IN_SWITCH   4   /* block is preparing to read new page         */
#define BLOCK_REASSIGNED  8   /* block does not accept requests for old page */
#define BLOCK_IN_FLUSH   16   /* block is in flush operation                 */
#define BLOCK_CHANGED    32   /* block buffer contains a dirty page          */

/* page status, returned by find_key_block */
#define PAGE_READ               0
#define PAGE_TO_BE_READ         1
#define PAGE_WAIT_TO_BE_READ    2

124
/* key cache block */
125
typedef struct st_block_link
126
{
127 128 129 130 131 132 133 134 135 136 137 138 139
  struct st_block_link
    *next_used, **prev_used;   /* to connect links in the LRU chain (ring)   */
  struct st_block_link
    *next_changed, **prev_changed; /* for lists of file dirty/clean blocks   */
  struct st_hash_link *hash_link; /* backward ptr to referring hash_link     */
  KEYCACHE_WQUEUE wqueue[2]; /* queues on waiting requests for new/old pages */
  uint requests;          /* number of requests for the block                */
  byte *buffer;           /* buffer for the block page                       */
  uint offset;            /* beginning of modified data in the buffer        */
  uint length;            /* end of data in the buffer                       */
  uint status;            /* state of the block                              */
  KEYCACHE_CONDVAR *condvar; /* condition variable for 'no readers' event    */
} BLOCK_LINK;
unknown's avatar
unknown committed
140

unknown's avatar
unknown committed
141 142 143 144 145 146
/* File-scope key cache state and statistics counters */
void *dflt_keycache;
uint key_cache_block_size;     /* size of the page buffer of a cache block */
ulong  my_cache_w_requests, my_cache_write, /* counters                     */
       my_cache_r_requests, my_cache_read;  /* for statistics               */
ulong  my_blocks_used,             /* number of currently used blocks       */
       my_blocks_changed;          /* number of currently dirty blocks      */

#define CHANGED_BLOCKS_HASH 128             /* must be power of 2            */
#define FLUSH_CACHE         2000            /* sort this many blocks at once */

unknown's avatar
unknown committed
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
typedef struct st_key_cache
{
  my_bool key_cache_inited;
  uint key_cache_shift;
  uint key_cache_block_size;     /* size of the page buffer of a cache block */
  uint hash_entries;             /* max number of entries in the hash table  */
  int hash_links;                /* max number of hash links                 */
  int hash_links_used;           /* number of hash links currently used      */
  int disk_blocks;               /* max number of blocks in the cache        */
  ulong blocks_used;             /* number of currently used blocks          */
  ulong blocks_changed;          /* number of currently dirty blocks         */
  ulong cache_w_requests;
  ulong cache_write;
  ulong cache_r_requests;
  ulong cache_read;
166
#if defined(KEYCACHE_DEBUG)
unknown's avatar
unknown committed
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
  ulong_blocks_available;     /* number of blocks available in the LRU chain */
#endif 
  HASH_LINK **hash_root;         /* arr. of entries into hash table buckets  */
  HASH_LINK *hash_link_root;     /* memory for hash table links              */
  HASH_LINK *free_hash_list;     /* list of free hash links                  */
  BLOCK_LINK *block_root;        /* memory for block links                   */
  byte HUGE_PTR *block_mem;      /* memory for block buffers                 */
  BLOCK_LINK *used_last;         /* ptr to the last block of the LRU chain   */
  pthread_mutex_t thr_lock_keycache;
  KEYCACHE_WQUEUE waiting_for_hash_link; /* waiting for a free hash link     */
  KEYCACHE_WQUEUE waiting_for_block;    /* requests waiting for a free block */
  BLOCK_LINK *changed_blocks[CHANGED_BLOCKS_HASH]; /* hash for dirty file bl.*/
  BLOCK_LINK *file_blocks[CHANGED_BLOCKS_HASH];    /* hash for other file bl.*/
} KEY_CACHE;

/*
  Forward declarations of static helpers.
  NOTE(review): flush_all_key_blocks() is declared without a parameter
  list, so calls to it are not type-checked by the compiler; confirm
  against its definition whether it expects a KEY_CACHE pointer.
*/
static int flush_all_key_blocks();
static void test_key_cache(KEY_CACHE *keycache, 
                           const char *where, my_bool lock);

186
/*
  Hash bucket index for the page at position 'pos' of file 'f'.
  Expects a variable named 'keycache' (KEY_CACHE *) to be in scope where
  the macro is used. The mask is correct only while hash_entries is a
  power of 2.
*/
#define KEYCACHE_HASH(f, pos)                                                 \
(((ulong) ((pos) >> keycache->key_cache_shift)+                               \
                                     (ulong) (f)) & (keycache->hash_entries-1))
/* Index of file 'f' in the changed_blocks / file_blocks arrays */
#define FILE_HASH(f)                 ((uint) (f) & (CHANGED_BLOCKS_HASH-1))
unknown's avatar
unknown committed
190

191
/* Name of the debug log used when KEYCACHE_DEBUG_LOG is not given */
#define DEFAULT_KEYCACHE_DEBUG_LOG  "keycache_debug.log"

#if defined(KEYCACHE_DEBUG) && ! defined(KEYCACHE_DEBUG_LOG)
#define KEYCACHE_DEBUG_LOG  DEFAULT_KEYCACHE_DEBUG_LOG
#endif
unknown's avatar
unknown committed
196

197 198 199 200 201 202 203 204 205
#if defined(KEYCACHE_DEBUG_LOG)
static FILE *keycache_debug_log=NULL;
static void keycache_debug_print _VARARGS((const char *fmt,...));
/*
  Open the debug log unless it is already open, so that a repeated
  initialization neither truncates the log nor leaks the stream.
  Use as a statement: KEYCACHE_DEBUG_OPEN;
*/
#define KEYCACHE_DEBUG_OPEN                                                   \
          do {                                                                \
            if (! keycache_debug_log)                                         \
              keycache_debug_log=fopen(KEYCACHE_DEBUG_LOG, "w");              \
          } while (0)

/*
  Close the debug log if it is open and forget the stream, so that a
  second close is harmless and a later open starts afresh.
  Use as a statement: KEYCACHE_DEBUG_CLOSE;
*/
#define KEYCACHE_DEBUG_CLOSE                                                  \
          do {                                                                \
            if (keycache_debug_log)                                           \
            {                                                                 \
              fclose(keycache_debug_log);                                     \
              keycache_debug_log=NULL;                                        \
            }                                                                 \
          } while (0)
#else
#define KEYCACHE_DEBUG_OPEN
#define KEYCACHE_DEBUG_CLOSE
#endif /* defined(KEYCACHE_DEBUG_LOG) */

210
/*
  Debug print and assert for the key cache.
  With both KEYCACHE_DEBUG_LOG and KEYCACHE_DEBUG defined the output goes
  to the key cache debug log; otherwise the regular DBUG macros are used.
  The log-file variants expand to a brace-enclosed block, so they may be
  written with or without a trailing semicolon.
*/
#if defined(KEYCACHE_DEBUG_LOG) && defined(KEYCACHE_DEBUG)
#define KEYCACHE_DBUG_PRINT(l, m)                                             \
            { if (keycache_debug_log) fprintf(keycache_debug_log, "%s: ", l); \
              keycache_debug_print m; }

/* The log is closed before assert() fires on a failed condition */
#define KEYCACHE_DBUG_ASSERT(a)                                               \
            { if (! (a) && keycache_debug_log) fclose(keycache_debug_log);    \
              assert(a); }
#else
#define KEYCACHE_DBUG_PRINT(l, m)  DBUG_PRINT(l, m)
#define KEYCACHE_DBUG_ASSERT(a)    DBUG_ASSERT(a)
#endif /* defined(KEYCACHE_DEBUG_LOG) && defined(KEYCACHE_DEBUG) */

/*
  Thread tracing for debug output.
  keycache_thread_id is one static variable shared by every thread;
  KEYCACHE_THREAD_TRACE_BEGIN stores the id of the current thread in it
  and the other two macros print whatever id was stored last.
*/
#if defined(KEYCACHE_DEBUG) || !defined(DBUG_OFF)
static long keycache_thread_id;
#define KEYCACHE_THREAD_TRACE(l)                                              \
             KEYCACHE_DBUG_PRINT(l,("|thread %ld",keycache_thread_id))

/* my_thread_var is evaluated once and the result reused */
#define KEYCACHE_THREAD_TRACE_BEGIN(l)                                        \
            { struct st_my_thread_var *thread_var =my_thread_var;             \
              keycache_thread_id=thread_var->id;                              \
              KEYCACHE_DBUG_PRINT(l,("[thread %ld",keycache_thread_id)) }

#define KEYCACHE_THREAD_TRACE_END(l)                                          \
            KEYCACHE_DBUG_PRINT(l,("]thread %ld",keycache_thread_id))
#else
#define KEYCACHE_THREAD_TRACE_BEGIN(l)
#define KEYCACHE_THREAD_TRACE_END(l)
#define KEYCACHE_THREAD_TRACE(l)
#endif /* defined(KEYCACHE_DEBUG) || !defined(DBUG_OFF) */

/*
  Index of block 'b' in the array starting at keycache->block_root, and
  index of hash link 'h' in the array starting at
  keycache->hash_link_root. Both expect a variable named 'keycache' to be
  in scope where the macro is used.
*/
#define BLOCK_NUMBER(b)                                                       \
  ((uint) (((char*)(b)-(char *) keycache->block_root)/sizeof(BLOCK_LINK)))
#define HASH_LINK_NUMBER(h)                                                   \
  ((uint) (((char*)(h)-(char *) keycache->hash_link_root)/sizeof(HASH_LINK)))
245 246 247 248 249 250

/*
  Use a private wrapper for waiting on a condition when a timeout is
  requested (not on Windows) or when key cache debugging is enabled;
  otherwise call pthread_cond_wait directly.
*/
#if (defined(KEYCACHE_TIMEOUT) && !defined(__WIN__)) || defined(KEYCACHE_DEBUG)
static int keycache_pthread_cond_wait(pthread_cond_t *cond,
                                      pthread_mutex_t *mutex);
#else
#define  keycache_pthread_cond_wait pthread_cond_wait
#endif

253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
/*
  With KEYCACHE_DEBUG the mutex and condition calls go through private
  wrappers; otherwise the names map straight onto the pthread functions.
*/
#if defined(KEYCACHE_DEBUG)
static int keycache_pthread_mutex_lock(pthread_mutex_t *mutex);
static void keycache_pthread_mutex_unlock(pthread_mutex_t *mutex);
static int keycache_pthread_cond_signal(pthread_cond_t *cond);
static int keycache_pthread_cond_broadcast(pthread_cond_t *cond);
#else
#define keycache_pthread_mutex_lock pthread_mutex_lock
#define keycache_pthread_mutex_unlock pthread_mutex_unlock
#define keycache_pthread_cond_signal pthread_cond_signal
#define keycache_pthread_cond_broadcast pthread_cond_broadcast
#endif /* defined(KEYCACHE_DEBUG) */

/*
  Return twice the highest bit set in 'value', i.e. the smallest power
  of 2 strictly greater than 'value' (2 when 'value' is 0).
*/
static uint next_power(uint value)
{
  uint highest_bit= 1;
  /* Each step clears the lowest set bit; the last value seen is the top bit */
  for ( ; value != 0 ; value&= value-1)
    highest_bit= value;
  return highest_bit << 1;
}
unknown's avatar
unknown committed
275 276


277
/*
278 279
  Initialize the key cache,
  return number of blocks in it
280
*/
281

unknown's avatar
unknown committed
282 283
int init_key_cache(void **pkeycache, uint key_cache_block_size,
                   ulong use_mem)
unknown's avatar
unknown committed
284
{
285 286
  uint blocks, hash_links, length;
  int error;
unknown's avatar
unknown committed
287
  KEY_CACHE *keycache;
288

unknown's avatar
unknown committed
289
  DBUG_ENTER("init_key_cache");
290

unknown's avatar
unknown committed
291 292 293 294 295 296 297
  if (!*pkeycache)
  {
    if (!(*pkeycache= my_malloc(sizeof(KEY_CACHE), MYF(MY_ZEROFILL))))
      DBUG_RETURN(0);
  }
  keycache= (KEY_CACHE *) *pkeycache;

298
  KEYCACHE_DEBUG_OPEN;
unknown's avatar
unknown committed
299
  if (keycache->key_cache_inited && keycache->disk_blocks > 0)
unknown's avatar
unknown committed
300
  {
301 302
    DBUG_PRINT("warning",("key cache already in use"));
    DBUG_RETURN(0);
unknown's avatar
unknown committed
303
  }
unknown's avatar
unknown committed
304
  if (! keycache->key_cache_inited)
unknown's avatar
unknown committed
305
  {
unknown's avatar
unknown committed
306 307 308 309 310
    keycache->key_cache_inited= TRUE;
    keycache->disk_blocks= -1;
    pthread_mutex_init(&keycache->thr_lock_keycache, MY_MUTEX_INIT_FAST);
    keycache->key_cache_shift= my_bit_log2(key_cache_block_size);
    keycache->key_cache_block_size= key_cache_block_size;
311
    DBUG_PRINT("info",("key_cache_block_size: %u",
312
               key_cache_block_size));
unknown's avatar
unknown committed
313
  }
314

unknown's avatar
unknown committed
315 316
  keycache->cache_w_requests= keycache->cache_r_requests= 0;
  keycache->cache_read= keycache->cache_write=0;
317

unknown's avatar
unknown committed
318 319
  keycache->block_mem= NULL;
  keycache->block_root= NULL;
320

321 322 323
  blocks= (uint) (use_mem/(sizeof(BLOCK_LINK)+2*sizeof(HASH_LINK)+
                           sizeof(HASH_LINK*)*5/4+key_cache_block_size));
  /* It doesn't make sense to have too few blocks (less than 8) */
unknown's avatar
unknown committed
324
  if (blocks >= 8 && keycache->disk_blocks < 0)
unknown's avatar
unknown committed
325 326 327
  {
    for (;;)
    {
328
      /* Set my_hash_entries to the next bigger 2 power */
unknown's avatar
unknown committed
329 330 331
      if ((keycache->hash_entries= next_power(blocks)) < blocks*5/4)
        keycache->hash_entries<<= 1;
      hash_links= 2*blocks;
332 333 334 335
#if defined(MAX_THREADS)
      if (hash_links < MAX_THREADS + blocks - 1)
        hash_links=MAX_THREADS + blocks - 1;
#endif
336 337
      while ((length=(ALIGN_SIZE(blocks*sizeof(BLOCK_LINK))+
		      ALIGN_SIZE(hash_links*sizeof(HASH_LINK))+
unknown's avatar
unknown committed
338 339
		      ALIGN_SIZE(sizeof(HASH_LINK*)*keycache->hash_entries)))+
	     ((ulong) blocks << keycache->key_cache_shift) > use_mem)
340 341
        blocks--;
      /* Allocate memory for cache page buffers */
unknown's avatar
unknown committed
342 343 344
      if ((keycache->block_mem= 
             my_malloc_lock((ulong) blocks*keycache->key_cache_block_size,
			    MYF(0))))
345
      {
346
        /*
347
           Allocate memory for blocks, hash_links and hash entries;
348
           For each block 2 hash links are allocated
349
        */
unknown's avatar
unknown committed
350 351
        if ((keycache->block_root= (BLOCK_LINK*) my_malloc((uint) length,
                                                           MYF(0))))
352
          break;
unknown's avatar
unknown committed
353
        my_free_lock(keycache->block_mem, MYF(0));
354
      }
355
      if (blocks < 8)
unknown's avatar
unknown committed
356
      {
unknown's avatar
unknown committed
357
        my_errno= ENOMEM;
358
        goto err;
unknown's avatar
unknown committed
359
      }
unknown's avatar
unknown committed
360
      blocks= blocks/4*3;
unknown's avatar
unknown committed
361
    }
unknown's avatar
unknown committed
362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377
    keycache->disk_blocks= (int) blocks;
    keycache->hash_links= hash_links;
    keycache->hash_root= (HASH_LINK**) ((char*) keycache->block_root +
				        ALIGN_SIZE(blocks*sizeof(BLOCK_LINK)));
    keycache->hash_link_root= (HASH_LINK*) ((char*) keycache->hash_root +
				            ALIGN_SIZE((sizeof(HASH_LINK*) *
						    keycache->hash_entries)));
    bzero((byte*) keycache->block_root,
           keycache->disk_blocks*sizeof(BLOCK_LINK));
    bzero((byte*) keycache->hash_root,
          keycache->hash_entries*sizeof(HASH_LINK*));
    bzero((byte*) keycache->hash_link_root,
           keycache->hash_links*sizeof(HASH_LINK));
    keycache->hash_links_used= 0;
    keycache->free_hash_list= NULL;
    keycache->blocks_used= keycache->blocks_changed= 0;
378
#if defined(KEYCACHE_DEBUG)
unknown's avatar
unknown committed
379
    keycache->_blocks_available=0;
380 381
#endif
    /* The LRU chain is empty after initialization */
unknown's avatar
unknown committed
382
    keycache->used_last=NULL;
383

unknown's avatar
unknown committed
384 385
    keycache->waiting_for_hash_link.last_thread= NULL;
    keycache->waiting_for_block.last_thread= NULL;
386 387 388
    DBUG_PRINT("exit",
      ("disk_blocks: %d  block_root: %lx  hash_entries: %d  hash_root: %lx  \
       hash_links: %d hash_link_root %lx",
unknown's avatar
unknown committed
389 390 391
       keycache->disk_blocks, keycache->block_root,
       keycache->hash_entries, keycache->hash_root,
       keycache->hash_links, keycache->hash_link_root));
unknown's avatar
unknown committed
392
  }
unknown's avatar
unknown committed
393 394 395 396
  bzero((gptr) keycache->changed_blocks,
        sizeof(keycache->changed_blocks[0])*CHANGED_BLOCKS_HASH);
  bzero((gptr) keycache->file_blocks,
        sizeof(keycache->file_blocks[0])*CHANGED_BLOCKS_HASH);
397

unknown's avatar
unknown committed
398
  DBUG_RETURN((int) blocks);
399

unknown's avatar
unknown committed
400
err:
unknown's avatar
unknown committed
401 402 403 404 405 406
  error= my_errno;
  if (keycache->block_mem)
    my_free_lock((gptr) keycache->block_mem, MYF(0));
  if (keycache->block_mem)
    my_free((gptr) keycache->block_root,MYF(0));
  my_errno= error;
unknown's avatar
unknown committed
407
  DBUG_RETURN(0);
408
}
unknown's avatar
unknown committed
409 410


unknown's avatar
unknown committed
411
/*
412
  Resize the key cache
unknown's avatar
unknown committed
413
*/
unknown's avatar
unknown committed
414
int resize_key_cache(void **pkeycache, ulong use_mem)
unknown's avatar
unknown committed
415
{
416
  int blocks;
unknown's avatar
unknown committed
417 418 419 420
  KEY_CACHE *keycache= (KEY_CACHE *) *pkeycache;
  uint key_cache_block_size= keycache->key_cache_block_size;

  keycache_pthread_mutex_lock(&keycache->thr_lock_keycache);
unknown's avatar
unknown committed
421 422
  if (flush_all_key_blocks())
  {
423
    /* TODO: if this happens, we should write a warning in the log file ! */
unknown's avatar
unknown committed
424
    keycache_pthread_mutex_unlock(&keycache->thr_lock_keycache);
unknown's avatar
unknown committed
425 426
    return 0;
  }
unknown's avatar
unknown committed
427 428
  keycache_pthread_mutex_unlock(&keycache->thr_lock_keycache);
  end_key_cache(pkeycache, 0);
429
  /* the following will work even if memory is 0 */
unknown's avatar
unknown committed
430
  blocks=init_key_cache(pkeycache, key_cache_block_size, use_mem);
431
  return blocks;
unknown's avatar
unknown committed
432 433 434
}


435
/*
436
  Remove key_cache from memory
437
*/
438

unknown's avatar
unknown committed
439
void end_key_cache(void **pkeycache, my_bool cleanup)
unknown's avatar
unknown committed
440
{
unknown's avatar
unknown committed
441
  KEY_CACHE *keycache= (KEY_CACHE *) *pkeycache;
unknown's avatar
unknown committed
442
  DBUG_ENTER("end_key_cache");
unknown's avatar
unknown committed
443
  if (keycache->disk_blocks > 0)
unknown's avatar
unknown committed
444
  {
unknown's avatar
unknown committed
445
    if (keycache->block_mem)
unknown's avatar
unknown committed
446
    {
unknown's avatar
unknown committed
447 448
      my_free_lock((gptr) keycache->block_mem, MYF(0));
      my_free((gptr) keycache->block_root, MYF(0));
unknown's avatar
unknown committed
449
    }
unknown's avatar
unknown committed
450
    keycache->disk_blocks= -1;
unknown's avatar
unknown committed
451
  }
452
  KEYCACHE_DEBUG_CLOSE;
unknown's avatar
unknown committed
453
  keycache->key_cache_inited=0;
unknown's avatar
unknown committed
454
  DBUG_PRINT("status",
455 456
             ("used: %d  changed: %d  w_requests: %ld  \
              writes: %ld  r_requests: %ld  reads: %ld",
unknown's avatar
unknown committed
457 458 459 460 461 462 463 464 465
              keycache->blocks_used, keycache->blocks_changed, 
              keycache->cache_w_requests, keycache->cache_write,
              keycache->cache_r_requests, keycache->cache_read));
  if (cleanup)
  {
    pthread_mutex_destroy(&keycache->thr_lock_keycache);
    my_free(*pkeycache, MYF(0));
    *pkeycache= NULL;
  }
unknown's avatar
unknown committed
466 467 468 469
  DBUG_VOID_RETURN;
} /* end_key_cache */


470
/*
471
  Link a thread into double-linked queue of waiting threads
472
*/
473

474 475
static inline void link_into_queue(KEYCACHE_WQUEUE *wqueue,
                                   struct st_my_thread_var *thread)
476
{
477 478
  struct st_my_thread_var *last;
  if (! (last=wqueue->last_thread))
479 480
  {
    /* Queue is empty */
481 482 483 484
    thread->next=thread;
    thread->prev=&thread->next;
  }
  else
485
  {
486 487 488 489 490 491 492 493 494
    thread->prev=last->next->prev;
    last->next->prev=&thread->next;
    thread->next=last->next;
    last->next=thread;
  }
  wqueue->last_thread=thread;
}

/*
495
  Unlink a thread from double-linked queue of waiting threads
496
*/
497

498 499
static inline void unlink_from_queue(KEYCACHE_WQUEUE *wqueue,
                                     struct st_my_thread_var *thread)
500
{
501 502 503 504
  KEYCACHE_DBUG_PRINT("unlink_from_queue", ("thread %ld", thread->id));
  if (thread->next == thread)
    /* The queue contains only one member */
    wqueue->last_thread=NULL;
505
  else
506 507 508 509 510 511 512 513 514 515 516 517
  {
    thread->next->prev=thread->prev;
    *thread->prev=thread->next;
    if (wqueue->last_thread == thread)
      wqueue->last_thread=STRUCT_PTR(struct st_my_thread_var, next,
                                     thread->prev);
  }
  thread->next=NULL;
}


/*
518
  Add a thread to single-linked queue of waiting threads
519
*/
520

521 522
static inline void add_to_queue(KEYCACHE_WQUEUE *wqueue,
                                struct st_my_thread_var *thread)
523
{
524 525 526 527
  struct st_my_thread_var *last;
  if (! (last=wqueue->last_thread))
    thread->next=thread;
  else
528
  {
529 530 531 532 533 534 535 536
    thread->next=last->next;
    last->next=thread;
  }
  wqueue->last_thread=thread;
}


/*
537
  Remove all threads from queue signaling them to proceed
538
*/
539 540 541

static void release_queue(KEYCACHE_WQUEUE *wqueue)
{
542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558
  struct st_my_thread_var *last=wqueue->last_thread;
  struct st_my_thread_var *next=last->next;
  struct st_my_thread_var *thread;
  do
  {
    thread=next;
    keycache_pthread_cond_signal(&thread->suspend);
    KEYCACHE_DBUG_PRINT("release_queue: signal", ("thread %ld", thread->id));
    next=thread->next;
    thread->next=NULL;
  }
  while (thread != last);
  wqueue->last_thread=NULL;
}


/*
559
  Unlink a block from the chain of dirty/clean blocks
560
*/
561

562
static inline void unlink_changed(BLOCK_LINK *block)
unknown's avatar
unknown committed
563
{
564 565 566
  if (block->next_changed)
    block->next_changed->prev_changed=block->prev_changed;
  *block->prev_changed=block->next_changed;
unknown's avatar
unknown committed
567 568 569
}


570
/*
571
  Link a block into the chain of dirty/clean blocks
572
*/
573

574
static inline void link_changed(BLOCK_LINK *block, BLOCK_LINK **phead)
unknown's avatar
unknown committed
575
{
576 577 578 579
  block->prev_changed=phead;
  if ((block->next_changed=*phead))
    (*phead)->prev_changed= &block->next_changed;
  *phead=block;
unknown's avatar
unknown committed
580 581
}

582 583

/*
584 585
  Unlink a block from the chain of dirty/clean blocks, if it's asked for,
  and link it to the chain of clean blocks for the specified file
586
*/
587

unknown's avatar
unknown committed
588 589
static void link_to_file_list(KEY_CACHE *keycache,
                              BLOCK_LINK *block, int file, my_bool unlink)
unknown's avatar
unknown committed
590
{
591 592
  if (unlink)
    unlink_changed(block);
unknown's avatar
unknown committed
593
  link_changed(block,&keycache->file_blocks[FILE_HASH(file)]);
594 595 596
  if (block->status & BLOCK_CHANGED)
  {
    block->status&=~BLOCK_CHANGED;
unknown's avatar
unknown committed
597
    keycache->blocks_changed--;
598
  }
unknown's avatar
unknown committed
599 600
}

601

602 603 604
/*
  Unlink a block from the chain of clean blocks for the specified
  file and link it to the chain of dirty blocks for this file
605
*/
606

unknown's avatar
unknown committed
607 608
static inline void link_to_changed_list(KEY_CACHE *keycache,
                                        BLOCK_LINK *block)
unknown's avatar
unknown committed
609
{
610
  unlink_changed(block);
unknown's avatar
unknown committed
611 612
  link_changed(block,
               &keycache->changed_blocks[FILE_HASH(block->hash_link->file)]);
613
  block->status|=BLOCK_CHANGED;
unknown's avatar
unknown committed
614
  keycache->blocks_changed++;
unknown's avatar
unknown committed
615 616 617
}


618
/*
619
  Link a block to the LRU chain at the beginning or at the end
620
*/
621

unknown's avatar
unknown committed
622
static void link_block(KEY_CACHE *keycache, BLOCK_LINK *block, my_bool at_end)
623 624
{
  KEYCACHE_DBUG_ASSERT(! (block->hash_link && block->hash_link->requests));
unknown's avatar
unknown committed
625
  if (keycache->waiting_for_block.last_thread) {
626
    /* Signal that in the LRU chain an available block has appeared */
unknown's avatar
unknown committed
627 628 629 630
    struct st_my_thread_var *last_thread=
                               keycache->waiting_for_block.last_thread;
    struct st_my_thread_var *first_thread= last_thread->next;
    struct st_my_thread_var *next_thread= first_thread;
631 632 633 634 635 636
    HASH_LINK *hash_link= (HASH_LINK *) first_thread->opt_info;
    struct st_my_thread_var *thread;
    do
    {
      thread=next_thread;
      next_thread=thread->next;
637
      /*
638 639 640 641 642 643
         We notify about the event all threads that ask
         for the same page as the first thread in the queue
      */
      if ((HASH_LINK *) thread->opt_info == hash_link)
      {
        keycache_pthread_cond_signal(&thread->suspend);
unknown's avatar
unknown committed
644
        unlink_from_queue(&keycache->waiting_for_block, thread);
645 646
        block->requests++;
      }
647
    }
648
    while (thread != last_thread);
unknown's avatar
unknown committed
649
    hash_link->block= block;
650 651
    KEYCACHE_THREAD_TRACE("link_block: after signaling");
#if defined(KEYCACHE_DEBUG)
652
    KEYCACHE_DBUG_PRINT("link_block",
653
        ("linked,unlinked block %u  status=%x  #requests=%u  #available=%u",
unknown's avatar
unknown committed
654 655
         BLOCK_NUMBER(block), block->status,
         block->requests, blocks_available));
unknown's avatar
unknown committed
656
#endif
657 658
    return;
  }
unknown's avatar
unknown committed
659
  if (keycache->used_last)
660
  {
unknown's avatar
unknown committed
661 662 663 664
    keycache->used_last->next_used->prev_used= &block->next_used;
    block->next_used= keycache->used_last->next_used;
    block->prev_used= &keycache->used_last->next_used;
    keycache->used_last->next_used= block;
665
    if (at_end)
unknown's avatar
unknown committed
666
      keycache->used_last= block;
667 668 669 670
  }
  else
  {
    /* The LRU chain is empty */
unknown's avatar
unknown committed
671 672
    keycache->used_last=block->next_used= block;
    block->prev_used= &block->next_used;
673 674 675
  }
  KEYCACHE_THREAD_TRACE("link_block");
#if defined(KEYCACHE_DEBUG)
unknown's avatar
unknown committed
676
  keycache->blocks_available++;
677
  KEYCACHE_DBUG_PRINT("link_block",
678 679
      ("linked block %u:%1u  status=%x  #requests=%u  #available=%u",
       BLOCK_NUMBER(block),at_end,block->status,
unknown's avatar
unknown committed
680 681
       block->requests, keycache->blocks_available));
  KEYCACHE_DBUG_ASSERT(keycache->blocks_available <= keycache->blocks_used);
682 683
#endif
}
unknown's avatar
unknown committed
684

685 686

/*
687
  Unlink a block from the LRU chain
688
*/
689

unknown's avatar
unknown committed
690
static void unlink_block(KEY_CACHE *keycache, BLOCK_LINK *block)
691 692 693
{
  if (block->next_used == block)
    /* The list contains only one member */
unknown's avatar
unknown committed
694
    keycache->used_last= NULL;
695
  else
696
  {
unknown's avatar
unknown committed
697 698 699 700
    block->next_used->prev_used= block->prev_used;
    *block->prev_used= block->next_used;
    if (keycache->used_last == block)
      keycache->used_last= STRUCT_PTR(BLOCK_LINK, next_used, block->prev_used);
701
  }
unknown's avatar
unknown committed
702
  block->next_used= NULL;
703

704 705
  KEYCACHE_THREAD_TRACE("unlink_block");
#if defined(KEYCACHE_DEBUG)
unknown's avatar
unknown committed
706
  keycache->blocks_available--;
707
  KEYCACHE_DBUG_PRINT("unlink_block",
708 709
    ("unlinked block %u  status=%x   #requests=%u  #available=%u",
     BLOCK_NUMBER(block),block->status,
unknown's avatar
unknown committed
710 711
     block->requests, keycache->blocks_available));
  KEYCACHE_DBUG_ASSERT(keycache->blocks_available >= 0);
712 713 714 715 716
#endif
}


/*
717
  Register requests for a block
718
*/
unknown's avatar
unknown committed
719
static void reg_requests(KEY_CACHE *keycache, BLOCK_LINK *block, int count)
720 721 722
{
  if (! block->requests)
    /* First request for the block unlinks it */
unknown's avatar
unknown committed
723
    unlink_block(keycache, block);
724 725 726 727
  block->requests+=count;
}


728 729 730
/*
  Unregister request for a block
  linking it to the LRU chain if it's the last request
731
*/
732

unknown's avatar
unknown committed
733 734
static inline void unreg_request(KEY_CACHE *keycache,
                                 BLOCK_LINK *block, int at_end)
735 736
{
  if (! --block->requests)
unknown's avatar
unknown committed
737
    link_block(keycache, block, (my_bool)at_end);
738 739 740
}

/*
741
  Remove a reader of the page in block
742
*/
743

744
static inline void remove_reader(BLOCK_LINK *block)
745
{
746 747 748 749 750 751
  if (! --block->hash_link->requests && block->condvar)
    keycache_pthread_cond_signal(block->condvar);
}


/*
752 753
  Wait until the last reader of the page in block
  signals on its termination
754
*/
unknown's avatar
unknown committed
755
static inline void wait_for_readers(KEY_CACHE *keycache, BLOCK_LINK *block)
756 757 758 759 760
{
  struct st_my_thread_var *thread=my_thread_var;
  while (block->hash_link->requests)
  {
    block->condvar=&thread->suspend;
unknown's avatar
unknown committed
761
    keycache_pthread_cond_wait(&thread->suspend, &keycache->thr_lock_keycache);
762 763 764 765 766 767
    block->condvar=NULL;
  }
}


/*
768
  Add a hash link to a bucket in the hash_table
769
*/
770

771 772 773
static inline void link_hash(HASH_LINK **start, HASH_LINK *hash_link)
{
  if (*start)
unknown's avatar
unknown committed
774 775 776 777
    (*start)->prev= &hash_link->next;
  hash_link->next= *start;
  hash_link->prev= start;
  *start= hash_link;
778 779 780 781
}


/*
782
  Remove a hash link from the hash table
783
*/
784

unknown's avatar
unknown committed
785
static void unlink_hash(KEY_CACHE *keycache, HASH_LINK *hash_link)
786 787 788 789
{
  KEYCACHE_DBUG_PRINT("unlink_hash", ("file %u, filepos %lu #requests=%u",
      (uint) hash_link->file,(ulong) hash_link->diskpos, hash_link->requests));
  KEYCACHE_DBUG_ASSERT(hash_link->requests == 0);
unknown's avatar
unknown committed
790 791 792 793
  if ((*hash_link->prev= hash_link->next))
    hash_link->next->prev= hash_link->prev;
  hash_link->block= NULL;
  if (keycache->waiting_for_hash_link.last_thread)
794 795
  {
    /* Signal that A free hash link appeared */
unknown's avatar
unknown committed
796 797 798 799
    struct st_my_thread_var *last_thread=
                               keycache->waiting_for_hash_link.last_thread;
    struct st_my_thread_var *first_thread= last_thread->next;
    struct st_my_thread_var *next_thread= first_thread;
800 801 802
    KEYCACHE_PAGE *first_page= (KEYCACHE_PAGE *) (first_thread->opt_info);
    struct st_my_thread_var *thread;

unknown's avatar
unknown committed
803 804
    hash_link->file= first_page->file;
    hash_link->diskpos= first_page->filepos;
805 806 807
    do
    {
      KEYCACHE_PAGE *page;
unknown's avatar
unknown committed
808
      thread= next_thread;
809
      page= (KEYCACHE_PAGE *) thread->opt_info;
unknown's avatar
unknown committed
810
      next_thread= thread->next;
811
      /*
812 813 814 815 816 817
         We notify about the event all threads that ask
         for the same page as the first thread in the queue
      */
      if (page->file == hash_link->file && page->filepos == hash_link->diskpos)
      {
        keycache_pthread_cond_signal(&thread->suspend);
unknown's avatar
unknown committed
818
        unlink_from_queue(&keycache->waiting_for_hash_link, thread);
819 820 821
      }
    }
    while (thread != last_thread);
unknown's avatar
unknown committed
822 823 824
    link_hash(&keycache->hash_root[KEYCACHE_HASH(hash_link->file,
					         hash_link->diskpos)],
              hash_link);
825
    return;
826
  }
unknown's avatar
unknown committed
827 828
  hash_link->next= keycache->free_hash_list;
  keycache->free_hash_list= hash_link;
829 830
}

831

832
/*
833
  Get the hash link for a page
834
*/
835

unknown's avatar
unknown committed
836 837
static HASH_LINK *get_hash_link(KEY_CACHE *keycache, 
                                int file, my_off_t filepos)
838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853
{
  reg1 HASH_LINK *hash_link, **start;
  KEYCACHE_PAGE page;
#if defined(KEYCACHE_DEBUG)
  int cnt;
#endif

  KEYCACHE_DBUG_PRINT("get_hash_link", ("file %u, filepos %lu",
                      (uint) file,(ulong) filepos));

restart:
  /*
     Find the bucket in the hash table for the pair (file, filepos);
     start contains the head of the bucket list,
     hash_link points to the first member of the list
  */
unknown's avatar
unknown committed
854
  hash_link= *(start= &keycache->hash_root[KEYCACHE_HASH(file, filepos)]);
855
#if defined(KEYCACHE_DEBUG)
unknown's avatar
unknown committed
856
  cnt= 0;
857 858 859 860 861 862 863 864
#endif
  /* Look for an element for the pair (file, filepos) in the bucket chain */
  while (hash_link &&
         (hash_link->diskpos != filepos || hash_link->file != file))
  {
    hash_link= hash_link->next;
#if defined(KEYCACHE_DEBUG)
    cnt++;
865
    if (! (cnt <= my_hash_links_used))
866 867
    {
      int i;
unknown's avatar
unknown committed
868 869
      for (i=0, hash_link= *start ;
           i < cnt ; i++, hash_link= hash_link->next)
870 871 872 873 874
      {
        KEYCACHE_DBUG_PRINT("get_hash_link", ("file %u, filepos %lu",
            (uint) hash_link->file,(ulong) hash_link->diskpos));
      }
    }
unknown's avatar
unknown committed
875
    KEYCACHE_DBUG_ASSERT(cnt <= keycache->hash_links_used);
876 877 878
#endif
  }
  if (! hash_link)
879 880
  {
    /* There is no hash link in the hash table for the pair (file, filepos) */
unknown's avatar
unknown committed
881
    if (keycache->free_hash_list)
882
    {
unknown's avatar
unknown committed
883 884
      hash_link= keycache->free_hash_list;
      keycache->free_hash_list=hash_link->next;
885
    }
unknown's avatar
unknown committed
886
    else if (keycache->hash_links_used < keycache->hash_links)
887
    {
unknown's avatar
unknown committed
888
      hash_link= &keycache->hash_link_root[keycache->hash_links_used++];
889 890
    }
    else
891 892
    {
      /* Wait for a free hash link */
unknown's avatar
unknown committed
893
      struct st_my_thread_var *thread= my_thread_var;
894 895 896
      KEYCACHE_DBUG_PRINT("get_hash_link", ("waiting"));
      page.file=file; page.filepos=filepos;
      thread->opt_info= (void *) &page;
unknown's avatar
unknown committed
897 898 899 900
      link_into_queue(&keycache->waiting_for_hash_link, thread);
      keycache_pthread_cond_wait(&thread->suspend,
                                 &keycache->thr_lock_keycache);
      thread->opt_info= NULL;
901 902
      goto restart;
    }
unknown's avatar
unknown committed
903 904
    hash_link->file= file;
    hash_link->diskpos= filepos;
905 906 907 908
    link_hash(start, hash_link);
  }
  /* Register the request for the page */
  hash_link->requests++;
909

910 911 912 913 914
  return hash_link;
}


/*
915 916 917 918 919
  Get a block for the file page requested by a keycache read/write operation;
  If the page is not in the cache return a free block, if there is none
  return the lru block after saving its buffer if the page is dirty
*/

unknown's avatar
unknown committed
920 921
static BLOCK_LINK *find_key_block(KEY_CACHE *keycache,
                                  int file, my_off_t filepos,
922 923 924 925
                                  int wrmode, int *page_st)
{
  HASH_LINK *hash_link;
  BLOCK_LINK *block;
unknown's avatar
unknown committed
926
  int error= 0;
927
  int page_status;
928

929 930 931 932 933 934 935
  DBUG_ENTER("find_key_block");
  KEYCACHE_THREAD_TRACE("find_key_block:begin");
  DBUG_PRINT("enter", ("file %u, filepos %lu, wrmode %lu",
               (uint) file,(ulong) filepos,(uint) wrmode));
  KEYCACHE_DBUG_PRINT("find_key_block", ("file %u, filepos %lu, wrmode %lu",
                      (uint) file,(ulong) filepos,(uint) wrmode));
#if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
unknown's avatar
unknown committed
936 937
  DBUG_EXECUTE("check_keycache2",
               test_key_cache(keycache, "start of find_key_block", 0););
unknown's avatar
unknown committed
938
#endif
939

940 941
restart:
  /* Find the hash link for the requested page (file, filepos) */
unknown's avatar
unknown committed
942
  hash_link= get_hash_link(keycache, file, filepos);
943

unknown's avatar
unknown committed
944 945
  page_status= -1;
  if ((block= hash_link->block) &&
946
      block->hash_link == hash_link && (block->status & BLOCK_READ))
unknown's avatar
unknown committed
947
    page_status= PAGE_READ;
948 949 950 951 952

  if (page_status == PAGE_READ && (block->status & BLOCK_IN_SWITCH))
  {
    /* This is a request for a page to be removed from cache */
    KEYCACHE_DBUG_PRINT("find_key_block",
953
             ("request for old page in block %u",BLOCK_NUMBER(block)));
954
    /*
955 956 957 958
       Only reading requests can proceed until the old dirty page is flushed,
       all others are to be suspended, then resubmitted
    */
    if (!wrmode && !(block->status & BLOCK_REASSIGNED))
unknown's avatar
unknown committed
959
      reg_requests(keycache, block,1);
960 961 962
    else
    {
      hash_link->requests--;
963
      KEYCACHE_DBUG_PRINT("find_key_block",
964 965 966 967 968 969 970 971
                          ("request waiting for old page to be saved"));
      {
        struct st_my_thread_var *thread=my_thread_var;
        /* Put the request into the queue of those waiting for the old page */
        add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
        /* Wait until the request can be resubmitted */
        do
        {
unknown's avatar
unknown committed
972 973
          keycache_pthread_cond_wait(&thread->suspend,
                                     &keycache->thr_lock_keycache);
974 975 976
        }
        while(thread->next);
      }
977
      KEYCACHE_DBUG_PRINT("find_key_block",
978 979 980 981 982 983
                          ("request for old page resubmitted"));
      /* Resubmit the request */
      goto restart;
    }
  }
  else
984 985
  {
    /* This is a request for a new page or for a page not to be removed */
986
    if (! block)
987 988
    {
      /* No block is assigned for the page yet */
unknown's avatar
unknown committed
989
      if (keycache->blocks_used < (uint) keycache->disk_blocks)
990 991
      {
	/* There are some never used blocks, take first of them */
unknown's avatar
unknown committed
992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004
        hash_link->block= block= &keycache->block_root[keycache->blocks_used];
        block->buffer= ADD_TO_PTR(keycache->block_mem,
                                  ((ulong) keycache->blocks_used*
                                   keycache->key_cache_block_size),
                                  byte*);
        block->status= 0;
        block->length= 0;
        block->offset= keycache->key_cache_block_size;
        block->requests= 1;
        keycache->blocks_used++;
        link_to_file_list(keycache, block, file, 0);
        block->hash_link= hash_link;
        page_status= PAGE_TO_BE_READ;
1005
        KEYCACHE_DBUG_PRINT("find_key_block",
unknown's avatar
unknown committed
1006
                            ("got never used block %u", BLOCK_NUMBER(block)));
1007 1008
      }
      else
1009 1010
      {
	/* There are no never used blocks, use a block from the LRU chain */
1011
        /*
1012
           Wait until a new block is added to the LRU chain;
1013 1014 1015
           several threads might wait here for the same page,
           all of them must get the same block
        */
1016

unknown's avatar
unknown committed
1017
        if (! keycache->used_last)
1018
        {
unknown's avatar
unknown committed
1019 1020 1021
          struct st_my_thread_var *thread= my_thread_var;
          thread->opt_info= (void *) hash_link;
          link_into_queue(&keycache->waiting_for_block, thread);
1022
          do
1023
          {
unknown's avatar
unknown committed
1024 1025
            keycache_pthread_cond_wait(&thread->suspend,
                                       &keycache->thr_lock_keycache);
1026 1027
          }
          while (thread->next);
unknown's avatar
unknown committed
1028
          thread->opt_info= NULL;
1029
        }
unknown's avatar
unknown committed
1030
        block= hash_link->block;
1031 1032
        if (! block)
        {
1033 1034
          /*
             Take the first block from the LRU chain
1035 1036
             unlinking it from the chain
          */
unknown's avatar
unknown committed
1037 1038 1039
          block= keycache->used_last->next_used;
          reg_requests(keycache, block,1);
          hash_link->block= block;
1040
        }
1041 1042 1043 1044 1045

        if (block->hash_link != hash_link &&
	    ! (block->status & BLOCK_IN_SWITCH) )
        {
	  /* this is a primary request for a new page */
1046
          block->status|=BLOCK_IN_SWITCH;
1047 1048

          KEYCACHE_DBUG_PRINT("find_key_block",
unknown's avatar
unknown committed
1049
                        ("got block %u for new page", BLOCK_NUMBER(block)));
1050

1051
          if (block->status & BLOCK_CHANGED)
1052 1053 1054
          {
	    /* The block contains a dirty page - push it out of the cache */

unknown's avatar
unknown committed
1055
            KEYCACHE_DBUG_PRINT("find_key_block", ("block is dirty"));
1056

unknown's avatar
unknown committed
1057
            keycache_pthread_mutex_unlock(&keycache->thr_lock_keycache);
1058 1059 1060
            /*
	      The call is thread safe because only the current
	      thread might change the block->hash_link value
1061
            */
unknown's avatar
unknown committed
1062 1063
            error=my_pwrite(block->hash_link->file, block->buffer,
                            block->length, block->hash_link->diskpos,
1064
                            MYF(MY_NABP | MY_WAIT_IF_FULL));
unknown's avatar
unknown committed
1065 1066
            keycache_pthread_mutex_lock(&keycache->thr_lock_keycache);
            keycache->cache_write++;
1067
          }
1068

unknown's avatar
unknown committed
1069
          block->status|= BLOCK_REASSIGNED;
1070 1071
          if (block->hash_link)
          {
1072 1073 1074 1075 1076
            /*
	      Wait until all pending read requests
	      for this page are executed
	      (we could have avoided this waiting, if we had read
	      a page in the cache in a sweep, without yielding control)
1077
            */
unknown's avatar
unknown committed
1078
            wait_for_readers(keycache, block);
1079

1080
            /* Remove the hash link for this page from the hash table */
unknown's avatar
unknown committed
1081
            unlink_hash(keycache, block->hash_link);
1082 1083 1084 1085
            /* All pending requests for this page must be resubmitted */
            if (block->wqueue[COND_FOR_SAVED].last_thread)
              release_queue(&block->wqueue[COND_FOR_SAVED]);
          }
unknown's avatar
unknown committed
1086 1087 1088 1089 1090 1091 1092
          link_to_file_list(keycache, block, file,
                            (my_bool)(block->hash_link ? 1 : 0));
          block->status= error? BLOCK_ERROR : 0;
          block->length= 0;
          block->offset= keycache->key_cache_block_size;
          block->hash_link= hash_link;
          page_status= PAGE_TO_BE_READ;
1093

1094 1095 1096 1097 1098 1099
          KEYCACHE_DBUG_ASSERT(block->hash_link->block == block);
          KEYCACHE_DBUG_ASSERT(hash_link->block->hash_link == hash_link);
        }
        else
        {
          /* This is for secondary requests for a new page only */
unknown's avatar
unknown committed
1100 1101 1102
            page_status= block->hash_link == hash_link &&
                           (block->status & BLOCK_READ) ?
                              PAGE_READ : PAGE_WAIT_TO_BE_READ;
1103 1104
        }
      }
1105

unknown's avatar
unknown committed
1106
      keycache->cache_read++;
1107 1108 1109
    }
    else
    {
unknown's avatar
unknown committed
1110
      reg_requests(keycache, block, 1);
1111 1112 1113 1114 1115
      page_status = block->hash_link == hash_link &&
                    (block->status & BLOCK_READ) ?
                      PAGE_READ : PAGE_WAIT_TO_BE_READ;
    }
  }
1116

1117 1118
  KEYCACHE_DBUG_ASSERT(page_status != -1);
  *page_st=page_status;
unknown's avatar
unknown committed
1119 1120 1121
  KEYCACHE_DBUG_PRINT("find_key_block",
                      ("file %u, filepos %lu, page_status %lu",
                      (uint) file,(ulong) filepos,(uint) page_status));
1122

1123
#if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
unknown's avatar
unknown committed
1124 1125
  DBUG_EXECUTE("check_keycache2",
               test_key_cache(keycache, "end of find_key_block",0););
1126 1127 1128 1129
#endif
  KEYCACHE_THREAD_TRACE("find_key_block:end");
  DBUG_RETURN(block);
}
unknown's avatar
unknown committed
1130 1131


1132
/*
1133 1134 1135
  Read into a key cache block buffer from disk;
  do not to report error when the size of successfully read
  portion is less than read_length, but not less than min_length
1136
*/
1137

unknown's avatar
unknown committed
1138 1139
static void read_block(KEY_CACHE *keycache,
                       BLOCK_LINK *block, uint read_length,
1140 1141 1142
                       uint min_length, my_bool primary)
{
  uint got_length;
1143

1144
  /* On entry THR_LOCK_keycache is locked */
1145

1146 1147
  KEYCACHE_THREAD_TRACE("read_block");
  if (primary)
1148 1149 1150 1151
  {
    /*
      This code is executed only by threads
      that submitted primary requests
1152
    */
1153 1154

    KEYCACHE_DBUG_PRINT("read_block",
1155
                        ("page to be read by primary request"));
1156

1157
    /* Page is not in buffer yet, is to be read from disk */
unknown's avatar
unknown committed
1158 1159 1160 1161
    keycache_pthread_mutex_unlock(&keycache->thr_lock_keycache);
    got_length= my_pread(block->hash_link->file, block->buffer,
                         read_length, block->hash_link->diskpos, MYF(0));
    keycache_pthread_mutex_lock(&keycache->thr_lock_keycache);
1162
    if (got_length < min_length)
unknown's avatar
unknown committed
1163
      block->status|= BLOCK_ERROR;
1164 1165
    else
    {
unknown's avatar
unknown committed
1166 1167
      block->status= BLOCK_READ;
      block->length= got_length;
1168
    }
1169
    KEYCACHE_DBUG_PRINT("read_block",
1170 1171 1172 1173 1174
                        ("primary request: new page in cache"));
    /* Signal that all pending requests for this page now can be processed */
    if (block->wqueue[COND_FOR_REQUESTED].last_thread)
      release_queue(&block->wqueue[COND_FOR_REQUESTED]);
  }
1175 1176 1177 1178 1179
  else
  {
    /*
      This code is executed only by threads
      that submitted secondary requests
1180
    */
1181
    KEYCACHE_DBUG_PRINT("read_block",
1182 1183 1184
                      ("secondary request waiting for new page to be read"));
    {
      struct st_my_thread_var *thread=my_thread_var;
1185
      /* Put the request into a queue and wait until it can be processed */
unknown's avatar
unknown committed
1186
      add_to_queue(&block->wqueue[COND_FOR_REQUESTED], thread);
1187 1188
      do
      {
unknown's avatar
unknown committed
1189 1190
        keycache_pthread_cond_wait(&thread->suspend,
                                   &keycache->thr_lock_keycache);
1191 1192 1193
      }
      while (thread->next);
    }
1194
    KEYCACHE_DBUG_PRINT("read_block",
1195 1196 1197 1198 1199 1200
                        ("secondary request: new page in cache"));
  }
}


/*
1201 1202 1203 1204 1205 1206
  Read a block of data from a cached file into a buffer;
  if return_buffer is set then the cache buffer is returned if
  it can be used;
  filepos must be a multiple of 'block_length', but it doesn't
  have to be a multiple of key_cache_block_size;
  returns adress from where data is read
1207
*/
unknown's avatar
unknown committed
1208

unknown's avatar
unknown committed
1209 1210
byte *key_cache_read(void *pkeycache,
                     File file, my_off_t filepos, byte *buff, uint length,
1211 1212
		     uint block_length __attribute__((unused)),
		     int return_buffer __attribute__((unused)))
unknown's avatar
unknown committed
1213 1214
{
  int error=0;
unknown's avatar
unknown committed
1215
  KEY_CACHE *keycache= (KEY_CACHE *) pkeycache;
unknown's avatar
unknown committed
1216 1217
  DBUG_ENTER("key_cache_read");
  DBUG_PRINT("enter", ("file %u, filepos %lu, length %u",
1218
               (uint) file,(ulong) filepos,length));
1219

unknown's avatar
unknown committed
1220
  if (keycache->disk_blocks > 0)
1221 1222
  {
    /* Key cache is used */
1223
    reg1 BLOCK_LINK *block;
unknown's avatar
unknown committed
1224 1225
    uint offset= (uint) (filepos & (keycache->key_cache_block_size-1));
    byte *start= buff;
unknown's avatar
unknown committed
1226
    uint read_length;
1227 1228
    uint status;
    int page_st;
1229

1230
#ifndef THREAD
unknown's avatar
unknown committed
1231
    if (block_length > keycache->key_cache_block_size || offset)
1232 1233
      return_buffer=0;
#endif
1234

1235 1236
    /* Read data in key_cache_block_size increments */
    filepos-= offset;
unknown's avatar
unknown committed
1237 1238
    do
    {
unknown's avatar
unknown committed
1239 1240
      read_length= length > keycache->key_cache_block_size ?
                   keycache->key_cache_block_size : length;
1241
      KEYCACHE_DBUG_ASSERT(read_length > 0);
unknown's avatar
unknown committed
1242 1243 1244
      keycache_pthread_mutex_lock(&keycache->thr_lock_keycache);
      keycache->cache_r_requests++;
      block=find_key_block(keycache, file, filepos, 0, &page_st);
unknown's avatar
unknown committed
1245
      if (block->status != BLOCK_ERROR && page_st != PAGE_READ)
unknown's avatar
unknown committed
1246
      {
1247
        /* The requested page is to be read into the block buffer */
unknown's avatar
unknown committed
1248 1249
        read_block(keycache, block, 
                   keycache->key_cache_block_size, read_length+offset,
1250
                   (my_bool)(page_st == PAGE_TO_BE_READ));
unknown's avatar
unknown committed
1251
      }
1252 1253
      else if (! (block->status & BLOCK_ERROR) &&
               block->length < read_length + offset)
1254 1255 1256 1257 1258
      {
        /*
           Impossible if nothing goes wrong:
           this could only happen if we are using a file with
           small key blocks and are trying to read outside the file
1259
        */
unknown's avatar
unknown committed
1260 1261
        my_errno= -1;
        block->status|= BLOCK_ERROR;
unknown's avatar
unknown committed
1262
      }
1263

unknown's avatar
unknown committed
1264
      if (! ((status= block->status) & BLOCK_ERROR))
unknown's avatar
unknown committed
1265
      {
1266
#ifndef THREAD
1267
        if (! return_buffer)
1268 1269 1270
#endif
        {
#if !defined(SERIALIZED_READ_FROM_CACHE)
unknown's avatar
unknown committed
1271
          keycache_pthread_mutex_unlock(&keycache->thr_lock_keycache);
1272
#endif
1273

1274 1275
          /* Copy data from the cache buffer */
          if (!(read_length & 511))
unknown's avatar
unknown committed
1276
            bmove512(buff, block->buffer+offset, read_length);
1277
          else
unknown's avatar
unknown committed
1278
            memcpy(buff, block->buffer+offset, (size_t) read_length);
1279 1280

#if !defined(SERIALIZED_READ_FROM_CACHE)
unknown's avatar
unknown committed
1281
          keycache_pthread_mutex_lock(&keycache->thr_lock_keycache);
1282 1283
#endif
        }
unknown's avatar
unknown committed
1284
      }
1285

1286
      remove_reader(block);
1287 1288 1289
      /*
         Link the block into the LRU chain
         if it's the last submitted request for the block
1290
      */
unknown's avatar
unknown committed
1291
      unreg_request(keycache, block,1);
1292

unknown's avatar
unknown committed
1293
      keycache_pthread_mutex_unlock(&keycache->thr_lock_keycache);
1294

1295 1296
      if (status & BLOCK_ERROR)
        DBUG_RETURN((byte *) 0);
1297

1298 1299 1300
#ifndef THREAD
      if (return_buffer)
          return (block->buffer);
unknown's avatar
unknown committed
1301
#endif
1302

unknown's avatar
unknown committed
1303 1304 1305
      buff+= read_length;
      filepos+= read_length;
      offset= 0;
1306

unknown's avatar
unknown committed
1307
    } while ((length-= read_length));
unknown's avatar
unknown committed
1308
    DBUG_RETURN(start);
unknown's avatar
unknown committed
1309
  }
1310

1311
  /* Key cache is not used */
unknown's avatar
unknown committed
1312 1313 1314 1315 1316 1317
  statistic_increment(keycache->cache_r_requests,
                      &keycache->thr_lock_keycache);
  statistic_increment(keycache->cache_read,
                      &keycache->thr_lock_keycache);
  if (my_pread(file, (byte*) buff, length, filepos, MYF(MY_NABP)))
    error= 1;
1318 1319
  DBUG_RETURN(error? (byte*) 0 : buff);
}
unknown's avatar
unknown committed
1320 1321


unknown's avatar
unknown committed
1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335
/*
  Insert a block of file data from a buffer into key cache

  SYNOPSIS
    key_cache_insert()
      file      file descriptor
      filepos   file offset of the data from the buffer 
      buff      buffer with data to insert into key cache
      length    length of the data in the buffer

  RETURN VALUE
    0 if a success, 1 -otherwise.
*/

unknown's avatar
unknown committed
1336 1337
int key_cache_insert(void *pkeycache,
                     File file, my_off_t filepos, byte *buff, uint length)
unknown's avatar
unknown committed
1338
{
unknown's avatar
unknown committed
1339
  KEY_CACHE *keycache= (KEY_CACHE *) pkeycache;
unknown's avatar
unknown committed
1340 1341 1342 1343
  DBUG_ENTER("key_cache_insert");
  DBUG_PRINT("enter", ("file %u, filepos %lu, length %u",
               (uint) file,(ulong) filepos, length));

unknown's avatar
unknown committed
1344
  if (keycache->disk_blocks > 0)
unknown's avatar
unknown committed
1345 1346 1347
  {
    /* Key cache is used */
    reg1 BLOCK_LINK *block;
unknown's avatar
unknown committed
1348
    uint offset= (uint) (filepos & (keycache->key_cache_block_size-1));
unknown's avatar
unknown committed
1349 1350 1351 1352 1353 1354 1355
    uint read_length;
    int page_st;

    /* Read data into key cache from buff in key_cache_block_size increments */
    filepos-= offset;
    do
    {
unknown's avatar
unknown committed
1356 1357
      read_length= length > keycache->key_cache_block_size ?
                   keycache->key_cache_block_size : length;
unknown's avatar
unknown committed
1358
      KEYCACHE_DBUG_ASSERT(read_length > 0);
unknown's avatar
unknown committed
1359 1360 1361
      keycache_pthread_mutex_lock(&keycache->thr_lock_keycache);
      keycache->cache_r_requests++;
      block= find_key_block(keycache, file, filepos, 0, &page_st);
unknown's avatar
unknown committed
1362 1363 1364 1365
      if (block->status != BLOCK_ERROR && page_st != PAGE_READ)
      {
        /* The requested page is to be read into the block buffer */
#if !defined(SERIALIZED_READ_FROM_CACHE)
unknown's avatar
unknown committed
1366
        keycache_pthread_mutex_unlock(&keycache->thr_lock_keycache);
unknown's avatar
unknown committed
1367 1368 1369 1370 1371 1372 1373 1374 1375
#endif

        /* Copy data from buff */
        if (!(read_length & 511))
          bmove512(block->buffer+offset, buff, read_length);
        else
          memcpy(block->buffer+offset, buff, (size_t) read_length);

#if !defined(SERIALIZED_READ_FROM_CACHE)
unknown's avatar
unknown committed
1376
        keycache_pthread_mutex_lock(&keycache->thr_lock_keycache);
unknown's avatar
unknown committed
1377 1378 1379 1380 1381 1382 1383 1384 1385 1386
#endif
        block->status= BLOCK_READ;
        block->length= read_length+offset;
      }

      remove_reader(block);
      /*
         Link the block into the LRU chain
         if it's the last submitted request for the block
      */
unknown's avatar
unknown committed
1387
      unreg_request(keycache, block,1);
unknown's avatar
unknown committed
1388

unknown's avatar
unknown committed
1389
      keycache_pthread_mutex_unlock(&keycache->thr_lock_keycache);
unknown's avatar
unknown committed
1390 1391 1392 1393

      if (block->status & BLOCK_ERROR)
        DBUG_RETURN(1);

unknown's avatar
unknown committed
1394 1395
      buff+= read_length;
      filepos+= read_length;
unknown's avatar
unknown committed
1396 1397 1398 1399 1400 1401 1402 1403
      offset=0;

    } while ((length-= read_length));
  }
  DBUG_RETURN(0);
}


1404
/*
1405 1406 1407 1408 1409
  Write a buffer into disk;
  filepos must be a multiple of 'block_length', but it doesn't
  have to be a multiple of key cache block size;
  if !dont_write then all dirty pages involved in writing should
  have been flushed from key cache before the function starts
1410
*/
1411

unknown's avatar
unknown committed
1412 1413
int key_cache_write(void *pkeycache,
                    File file, my_off_t filepos, byte *buff, uint length,
1414 1415
                    uint block_length  __attribute__((unused)),
                    int dont_write)
unknown's avatar
unknown committed
1416
{
1417
  reg1 BLOCK_LINK *block;
unknown's avatar
unknown committed
1418
  int error=0;
unknown's avatar
unknown committed
1419
  KEY_CACHE *keycache= (KEY_CACHE *) pkeycache;
1420

unknown's avatar
unknown committed
1421
  DBUG_ENTER("key_cache_write");
1422
  DBUG_PRINT("enter", ("file %u, filepos %lu, length %u block_length %u",
unknown's avatar
unknown committed
1423
               (uint) file, (ulong) filepos, length, block_length));
unknown's avatar
unknown committed
1424 1425

  if (!dont_write)
1426 1427
  {
    /* Force writing from buff into disk */
unknown's avatar
unknown committed
1428 1429 1430
    statistic_increment(keycache->cache_write,
                        &keycache->thr_lock_keycache);
    if (my_pwrite(file, buff, length, filepos, MYF(MY_NABP | MY_WAIT_IF_FULL)))
unknown's avatar
unknown committed
1431
      DBUG_RETURN(1);
unknown's avatar
unknown committed
1432
  }
1433

unknown's avatar
unknown committed
1434
#if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
unknown's avatar
unknown committed
1435 1436
  DBUG_EXECUTE("check_keycache",
               test_key_cache(keycache, "start of key_cache_write", 1););
unknown's avatar
unknown committed
1437
#endif
1438

unknown's avatar
unknown committed
1439
  if (keycache->disk_blocks > 0)
1440 1441
  {
    /* Key cache is used */
unknown's avatar
unknown committed
1442
    uint read_length;
unknown's avatar
unknown committed
1443
    uint offset= (uint) (filepos & (keycache->key_cache_block_size-1));
1444
    int page_st;
1445

1446 1447
    /* Write data in key_cache_block_size increments */
    filepos-= offset;
unknown's avatar
unknown committed
1448 1449
    do
    {
unknown's avatar
unknown committed
1450 1451
      read_length= length > keycache->key_cache_block_size ?
                   keycache->key_cache_block_size : length;
1452
      KEYCACHE_DBUG_ASSERT(read_length > 0);
unknown's avatar
unknown committed
1453 1454 1455
      keycache_pthread_mutex_lock(&keycache->thr_lock_keycache);
      keycache->cache_w_requests++;
      block= find_key_block(keycache, file, filepos, 1, &page_st);
unknown's avatar
unknown committed
1456
      if (block->status != BLOCK_ERROR && page_st != PAGE_READ &&
unknown's avatar
unknown committed
1457 1458 1459 1460
          (offset || read_length < keycache->key_cache_block_size))
        read_block(keycache, block,
                   offset + read_length >= keycache->key_cache_block_size?
                   offset : keycache->key_cache_block_size,
1461
                   offset,(my_bool)(page_st == PAGE_TO_BE_READ));
1462

1463
      if (!dont_write)
1464 1465 1466
      {
	/* buff has been written to disk at start */
        if ((block->status & BLOCK_CHANGED) &&
unknown's avatar
unknown committed
1467 1468
            (!offset && read_length >= keycache->key_cache_block_size))
             link_to_file_list(keycache, block, block->hash_link->file, 1);
1469 1470
      }
      else if (! (block->status & BLOCK_CHANGED))
unknown's avatar
unknown committed
1471
        link_to_changed_list(keycache, block);
1472

unknown's avatar
unknown committed
1473 1474
      set_if_smaller(block->offset, offset)
      set_if_bigger(block->length, read_length+offset);
1475

1476
      if (! (block->status & BLOCK_ERROR))
unknown's avatar
unknown committed
1477
      {
1478
        if (!(read_length & 511))
unknown's avatar
unknown committed
1479
             bmove512(block->buffer+offset, buff, read_length);
1480
        else
unknown's avatar
unknown committed
1481
          memcpy(block->buffer+offset, buff, (size_t) read_length);
unknown's avatar
unknown committed
1482
      }
1483 1484 1485

      block->status|=BLOCK_READ;

1486 1487
      /* Unregister the request */
      block->hash_link->requests--;
unknown's avatar
unknown committed
1488
      unreg_request(keycache, block, 1);
1489

1490 1491
      if (block->status & BLOCK_ERROR)
      {
unknown's avatar
unknown committed
1492 1493
        keycache_pthread_mutex_unlock(&keycache->thr_lock_keycache);
        error= 1;
1494 1495
        break;
      }
1496

unknown's avatar
unknown committed
1497
      keycache_pthread_mutex_unlock(&keycache->thr_lock_keycache);
1498

unknown's avatar
unknown committed
1499 1500 1501
      buff+= read_length;
      filepos+= read_length;
      offset= 0;
1502

unknown's avatar
unknown committed
1503 1504 1505
    } while ((length-= read_length));
  }
  else
1506 1507 1508 1509
  {
    /* Key cache is not used */
    if (dont_write)
    {
unknown's avatar
unknown committed
1510 1511 1512 1513 1514 1515
      statistic_increment(keycache->cache_w_requests,
                          &keycache->thr_lock_keycache);
      statistic_increment(keycache->cache_write, 
                          &keycache->thr_lock_keycache);
      if (my_pwrite(file, (byte*) buff, length, filepos, 
                    MYF(MY_NABP | MY_WAIT_IF_FULL)))
1516
        error=1;
unknown's avatar
unknown committed
1517
    }
1518
  }
unknown's avatar
unknown committed
1519 1520

#if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
unknown's avatar
unknown committed
1521 1522
  DBUG_EXECUTE("exec",
               test_key_cache(keycache, "end of key_cache_write", 1););
unknown's avatar
unknown committed
1523
#endif
1524 1525
  DBUG_RETURN(error);
}
unknown's avatar
unknown committed
1526 1527


1528 1529 1530 1531 1532 1533
/*
  Free block: remove reference to it from hash table,
  remove it from the chain file of dirty/clean blocks
  and add it at the beginning of the LRU chain
*/

unknown's avatar
unknown committed
1534
static void free_block(KEY_CACHE *keycache, BLOCK_LINK *block)
unknown's avatar
unknown committed
1535
{
1536
  KEYCACHE_THREAD_TRACE("free block");
1537 1538 1539
  KEYCACHE_DBUG_PRINT("free_block",
                      ("block %u to be freed",BLOCK_NUMBER(block)));
  if (block->hash_link)
unknown's avatar
unknown committed
1540
  {
unknown's avatar
unknown committed
1541 1542 1543
    block->status|= BLOCK_REASSIGNED;
    wait_for_readers(keycache, block);
    unlink_hash(keycache, block->hash_link);
unknown's avatar
unknown committed
1544
  }
1545

1546
  unlink_changed(block);
unknown's avatar
unknown committed
1547 1548 1549
  block->status= 0;
  block->length= 0;
  block->offset= keycache->key_cache_block_size;
1550
  KEYCACHE_THREAD_TRACE("free block");
1551
  KEYCACHE_DBUG_PRINT("free_block",
1552
                      ("block is freed"));
unknown's avatar
unknown committed
1553 1554
  unreg_request(keycache, block, 0);
  block->hash_link= NULL;
unknown's avatar
unknown committed
1555 1556 1557
}


1558
/*
  Comparison callback for qsort(): orders two block pointers by the
  disk position stored in their hash links, ascending.
  Returns -1, 0 or 1.
*/

static int cmp_sec_link(BLOCK_LINK **a, BLOCK_LINK **b)
{
  if ((*a)->hash_link->diskpos < (*b)->hash_link->diskpos)
    return -1;
  if ((*a)->hash_link->diskpos > (*b)->hash_link->diskpos)
    return 1;
  return 0;
}

unknown's avatar
unknown committed
1564

1565 1566 1567
/*
  Flush a portion of changed blocks to disk,
  free used blocks if requested
1568
*/
1569

unknown's avatar
unknown committed
1570 1571
static int flush_cached_blocks(KEY_CACHE *keycache,
                               File file, BLOCK_LINK **cache,
1572 1573
                               BLOCK_LINK **end,
                               enum flush_type type)
unknown's avatar
unknown committed
1574
{
1575
  int error;
unknown's avatar
unknown committed
1576 1577
  int last_errno= 0;
  uint count= end-cache;
1578

1579
  /* Don't lock the cache during the flush */
unknown's avatar
unknown committed
1580
  keycache_pthread_mutex_unlock(&keycache->thr_lock_keycache);
1581 1582 1583
  /*
     As all blocks referred in 'cache' are marked by BLOCK_IN_FLUSH
     we are guarunteed no thread will change them
1584
  */
unknown's avatar
unknown committed
1585
  qsort((byte*) cache, count, sizeof(*cache), (qsort_cmp) cmp_sec_link);
1586

unknown's avatar
unknown committed
1587
  keycache_pthread_mutex_lock(&keycache->thr_lock_keycache);
1588
  for ( ; cache != end ; cache++)
unknown's avatar
unknown committed
1589
  {
1590
    BLOCK_LINK *block= *cache;
1591 1592

    KEYCACHE_DBUG_PRINT("flush_cached_blocks",
1593
                        ("block %u to be flushed", BLOCK_NUMBER(block)));
unknown's avatar
unknown committed
1594 1595 1596 1597 1598 1599
    keycache_pthread_mutex_unlock(&keycache->thr_lock_keycache);
    error= my_pwrite(file, block->buffer+block->offset, block->length,
                     block->hash_link->diskpos, 
                     MYF(MY_NABP | MY_WAIT_IF_FULL));
    keycache_pthread_mutex_lock(&keycache->thr_lock_keycache);
    keycache->cache_write++;
1600
    if (error)
unknown's avatar
unknown committed
1601
    {
1602
      block->status|= BLOCK_ERROR;
unknown's avatar
unknown committed
1603
      if (!last_errno)
unknown's avatar
unknown committed
1604
        last_errno= errno ? errno : -1;
1605 1606 1607 1608
    }
    /* type will never be FLUSH_IGNORE_CHANGED here */
    if (! (type == FLUSH_KEEP || type == FLUSH_FORCE_WRITE))
    {
unknown's avatar
unknown committed
1609 1610
      keycache->blocks_changed--;
      free_block(keycache, block);
1611
    }
1612
    else
1613
    {
unknown's avatar
unknown committed
1614 1615 1616
      block->status&= ~BLOCK_IN_FLUSH;
      link_to_file_list(keycache, block, file, 1);
      unreg_request(keycache, block, 1);
unknown's avatar
unknown committed
1617
    }
1618

unknown's avatar
unknown committed
1619 1620 1621 1622 1623
  }
  return last_errno;
}


1624
/*
1625
  Flush all blocks for a file to disk
1626
*/
1627

unknown's avatar
unknown committed
1628 1629
int flush_key_blocks(void *pkeycache,
                     File file, enum flush_type type)
unknown's avatar
unknown committed
1630
{
1631
  BLOCK_LINK *cache_buff[FLUSH_CACHE],**cache;
unknown's avatar
unknown committed
1632 1633
  int last_errno= 0;
  KEY_CACHE *keycache= (KEY_CACHE *) pkeycache;
1634
  DBUG_ENTER("flush_key_blocks");
unknown's avatar
unknown committed
1635
  DBUG_PRINT("enter",("file: %d  blocks_used: %d  blocks_changed: %d",
unknown's avatar
unknown committed
1636
              file, keycache->blocks_used, keycache->blocks_changed));
1637

unknown's avatar
unknown committed
1638
#if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
unknown's avatar
unknown committed
1639 1640
    DBUG_EXECUTE("check_keycache",
                 test_key_cache(keycache, "start of flush_key_blocks", 0););
unknown's avatar
unknown committed
1641
#endif
1642

unknown's avatar
unknown committed
1643
  keycache_pthread_mutex_lock(&keycache->thr_lock_keycache);
1644

unknown's avatar
unknown committed
1645 1646
  cache= cache_buff;
  if (keycache->disk_blocks > 0 &&
1647
      (!my_disable_flush_key_blocks || type != FLUSH_KEEP))
1648 1649
  {
    /* Key cache exists and flush is not disabled */
unknown's avatar
unknown committed
1650 1651
    int error= 0;
    uint count= 0;
1652
    BLOCK_LINK **pos,**end;
unknown's avatar
unknown committed
1653
    BLOCK_LINK *first_in_switch= NULL;
1654 1655 1656 1657
    BLOCK_LINK *block, *next;
#if defined(KEYCACHE_DEBUG)
    uint cnt=0;
#endif
1658

unknown's avatar
unknown committed
1659 1660
    if (type != FLUSH_IGNORE_CHANGED)
    {
1661
      /*
1662 1663 1664
         Count how many key blocks we have to cache to be able
         to flush all dirty pages with minimum seek moves
      */
unknown's avatar
unknown committed
1665
      for (block= keycache->changed_blocks[FILE_HASH(file)] ;
1666
           block ;
unknown's avatar
unknown committed
1667
           block= block->next_changed)
unknown's avatar
unknown committed
1668
      {
1669
        if (block->hash_link->file == file)
1670
        {
1671
          count++;
unknown's avatar
unknown committed
1672
          KEYCACHE_DBUG_ASSERT(count<= keycache->blocks_used);
1673
        }
unknown's avatar
unknown committed
1674
      }
1675
      /* Allocate a new buffer only if its bigger than the one we have */
1676
      if (count > FLUSH_CACHE &&
unknown's avatar
unknown committed
1677 1678
          !(cache= (BLOCK_LINK**) my_malloc(sizeof(BLOCK_LINK*)*count,
                                            MYF(0))))
1679
      {
unknown's avatar
unknown committed
1680 1681
        cache= cache_buff;
        count= FLUSH_CACHE;
1682
      }
unknown's avatar
unknown committed
1683
    }
1684

1685 1686
    /* Retrieve the blocks and write them to a buffer to be flushed */
restart:
unknown's avatar
unknown committed
1687 1688
    end= (pos= cache)+count;
    for (block= keycache->changed_blocks[FILE_HASH(file)] ;
1689
         block ;
unknown's avatar
unknown committed
1690
         block= next)
unknown's avatar
unknown committed
1691
    {
1692 1693
#if defined(KEYCACHE_DEBUG)
      cnt++;
unknown's avatar
unknown committed
1694
      KEYCACHE_DBUG_ASSERT(cnt <= keycache->blocks_used);
1695
#endif
unknown's avatar
unknown committed
1696
      next= block->next_changed;
1697
      if (block->hash_link->file == file)
unknown's avatar
unknown committed
1698
      {
1699
        /*
1700 1701 1702 1703 1704
           Mark the block with BLOCK_IN_FLUSH in order not to let
           other threads to use it for new pages and interfere with
           our sequence ot flushing dirty file pages
        */
        block->status|= BLOCK_IN_FLUSH;
1705

1706
        if (! (block->status & BLOCK_IN_SWITCH))
1707 1708 1709 1710 1711
        {
	  /*
	    We care only for the blocks for which flushing was not
	    initiated by other threads as a result of page swapping
          */
unknown's avatar
unknown committed
1712
          reg_requests(keycache, block, 1);
1713 1714 1715
          if (type != FLUSH_IGNORE_CHANGED)
          {
	    /* It's not a temporary file */
1716
            if (pos == end)
1717 1718 1719 1720
            {
	      /*
		This happens only if there is not enough
		memory for the big block
1721
              */
unknown's avatar
unknown committed
1722 1723
              if ((error= flush_cached_blocks(keycache, file, cache, 
                                              end,type)))
1724 1725 1726 1727 1728
                last_errno=error;
              /*
		Restart the scan as some other thread might have changed
		the changed blocks chain: the blocks that were in switch
		state before the flush started have to be excluded
1729 1730 1731
              */
              goto restart;
            }
unknown's avatar
unknown committed
1732
            *pos++= block;
1733 1734 1735 1736
          }
          else
          {
            /* It's a temporary file */
unknown's avatar
unknown committed
1737 1738
            keycache->blocks_changed--;
            free_block(keycache, block);
1739 1740 1741
          }
        }
        else
1742 1743
        {
	  /* Link the block into a list of blocks 'in switch' */
1744
          unlink_changed(block);
unknown's avatar
unknown committed
1745
          link_changed(block, &first_in_switch);
1746
        }
unknown's avatar
unknown committed
1747 1748 1749 1750
      }
    }
    if (pos != cache)
    {
unknown's avatar
unknown committed
1751 1752
      if ((error= flush_cached_blocks(keycache, file, cache, pos, type)))
        last_errno= error;
1753 1754 1755 1756 1757
    }
    /* Wait until list of blocks in switch is empty */
    while (first_in_switch)
    {
#if defined(KEYCACHE_DEBUG)
unknown's avatar
unknown committed
1758
      cnt= 0;
1759
#endif
unknown's avatar
unknown committed
1760
      block= first_in_switch;
1761
      {
unknown's avatar
unknown committed
1762
        struct st_my_thread_var *thread= my_thread_var;
1763 1764 1765
        add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
        do
        {
unknown's avatar
unknown committed
1766 1767
          keycache_pthread_cond_wait(&thread->suspend,
                                     &keycache->thr_lock_keycache);
1768 1769 1770 1771 1772
        }
        while (thread->next);
      }
#if defined(KEYCACHE_DEBUG)
      cnt++;
unknown's avatar
unknown committed
1773
      KEYCACHE_DBUG_ASSERT(cnt <= keycache->blocks_used);
1774
#endif
unknown's avatar
unknown committed
1775 1776
    }
    /* The following happens very seldom */
1777
    if (! (type == FLUSH_KEEP || type == FLUSH_FORCE_WRITE))
unknown's avatar
unknown committed
1778
    {
1779 1780 1781
#if defined(KEYCACHE_DEBUG)
      cnt=0;
#endif
unknown's avatar
unknown committed
1782
      for (block= keycache->file_blocks[FILE_HASH(file)] ;
1783
           block ;
unknown's avatar
unknown committed
1784
           block= next)
unknown's avatar
unknown committed
1785
      {
1786 1787
#if defined(KEYCACHE_DEBUG)
        cnt++;
unknown's avatar
unknown committed
1788
        KEYCACHE_DBUG_ASSERT(cnt <= keycache->blocks_used);
1789
#endif
unknown's avatar
unknown committed
1790
        next= block->next_changed;
1791 1792 1793 1794
        if (block->hash_link->file == file &&
            (! (block->status & BLOCK_CHANGED)
             || type == FLUSH_IGNORE_CHANGED))
        {
unknown's avatar
unknown committed
1795 1796
          reg_requests(keycache, block, 1);
          free_block(keycache, block);
1797
        }
unknown's avatar
unknown committed
1798 1799
      }
    }
1800
  }
1801

unknown's avatar
unknown committed
1802
  keycache_pthread_mutex_unlock(&keycache->thr_lock_keycache);
1803

unknown's avatar
unknown committed
1804
#ifndef DBUG_OFF
1805
    DBUG_EXECUTE("check_keycache",
unknown's avatar
unknown committed
1806
                 test_key_cache(keycache, "end of flush_key_blocks", 0););
unknown's avatar
unknown committed
1807 1808
#endif
  if (cache != cache_buff)
unknown's avatar
unknown committed
1809
    my_free((gptr) cache, MYF(0));
unknown's avatar
unknown committed
1810
  if (last_errno)
1811
    errno=last_errno;                /* Return first error */
unknown's avatar
unknown committed
1812
  DBUG_RETURN(last_errno != 0);
unknown's avatar
unknown committed
1813 1814 1815
}


1816 1817
/*
  Flush all blocks in the key cache to disk
1818
*/
1819

unknown's avatar
unknown committed
1820
static int flush_all_key_blocks(KEY_CACHE *keycache)
1821 1822 1823 1824
{
#if defined(KEYCACHE_DEBUG)
  uint cnt=0;
#endif
unknown's avatar
unknown committed
1825
  while (keycache->blocks_changed > 0)
1826 1827
  {
    BLOCK_LINK *block;
unknown's avatar
unknown committed
1828
    for (block= keycache->used_last->next_used ; ; block=block->next_used)
1829 1830 1831 1832 1833
    {
      if (block->hash_link)
      {
#if defined(KEYCACHE_DEBUG)
        cnt++;
unknown's avatar
unknown committed
1834
        KEYCACHE_DBUG_ASSERT(cnt <= keycache->blocks_used);
1835
#endif
unknown's avatar
unknown committed
1836
        if (flush_key_blocks(keycache, block->hash_link->file, FLUSH_RELEASE))
1837 1838 1839
          return 1;
        break;
      }
unknown's avatar
unknown committed
1840
      if (block == keycache->used_last)
1841 1842 1843 1844 1845
        break;
    }
  }
  return 0;
}
unknown's avatar
unknown committed
1846 1847


1848 1849
#ifndef DBUG_OFF
/*
1850
  Test if disk-cache is ok
unknown's avatar
unknown committed
1851
*/
unknown's avatar
unknown committed
1852 1853
static void test_key_cache(KEY_CACHE *keycache,
                           const char *where __attribute__((unused)),
1854 1855 1856
                           my_bool lock __attribute__((unused)))
{
  /* TODO */
1857
}
1858
#endif
unknown's avatar
unknown committed
1859

1860 1861 1862 1863 1864 1865
#if defined(KEYCACHE_TIMEOUT)

#define KEYCACHE_DUMP_FILE  "keycache_dump.txt"
#define MAX_QUEUE_LEN  100


unknown's avatar
unknown committed
1866
/*
  Write a textual snapshot of the key cache to KEYCACHE_DUMP_FILE:
  the calling thread, the two queues of waiting threads, every used
  block with its wait queues, and the LRU chain.

  At most MAX_QUEUE_LEN threads are printed per queue.
  Nothing is written if the dump file cannot be opened.
*/

static void keycache_dump(KEY_CACHE *keycache)
{
  FILE *keycache_dump_file=fopen(KEYCACHE_DUMP_FILE, "w");
  struct st_my_thread_var *thread_var= my_thread_var;
  struct st_my_thread_var *last;
  struct st_my_thread_var *thread;
  BLOCK_LINK *block;
  HASH_LINK *hash_link;
  KEYCACHE_PAGE *page;
  uint i;

  if (!keycache_dump_file)
    return;                             /* Cannot create the dump file */

  /* Identify the thread that produces the dump */
  fprintf(keycache_dump_file, "thread:%u\n", thread_var->id);

  /*
    NOTE(review): waiting_for_hash_link and waiting_for_block are
    referenced as file-level names here; verify that they are not
    meant to be members of 'keycache'.
  */
  i=0;
  thread=last=waiting_for_hash_link.last_thread;
  fprintf(keycache_dump_file, "queue of threads waiting for hash link\n");
  if (thread)
    do
    {
      thread=thread->next;
      page= (KEYCACHE_PAGE *) thread->opt_info;
      fprintf(keycache_dump_file,
              "thread:%u, (file,filepos)=(%u,%lu)\n",
              thread->id,(uint) page->file,(ulong) page->filepos);
      if (++i == MAX_QUEUE_LEN)
        break;
    }
    while (thread != last);

  i=0;
  thread=last=waiting_for_block.last_thread;
  fprintf(keycache_dump_file, "queue of threads waiting for block\n");
  if (thread)
    do
    {
      thread=thread->next;
      hash_link= (HASH_LINK *) thread->opt_info;
      fprintf(keycache_dump_file,
        "thread:%u hash_link:%u (file,filepos)=(%u,%lu)\n",
        thread->id, (uint) HASH_LINK_NUMBER(hash_link),
        (uint) hash_link->file,(ulong) hash_link->diskpos);
      if (++i == MAX_QUEUE_LEN)
        break;
    }
    while (thread != last);

  for (i=0 ; i< keycache->blocks_used ; i++)
  {
    int j;
    block= &keycache->block_root[i];
    hash_link= block->hash_link;
    fprintf(keycache_dump_file,
            "block:%u hash_link:%d status:%x #requests=%u waiting_for_readers:%d\n",
            i, (int) (hash_link ? HASH_LINK_NUMBER(hash_link) : -1),
            block->status, block->requests, block->condvar ? 1 : 0);
    for (j=0 ; j < 2; j++)
    {
      KEYCACHE_WQUEUE *wqueue=&block->wqueue[j];
      /* Separate counter: 'i' is the index of the current block */
      uint queue_len= 0;
      thread=last=wqueue->last_thread;
      fprintf(keycache_dump_file, "queue #%d\n", j);
      if (thread)
      {
        do
        {
          thread=thread->next;
          fprintf(keycache_dump_file,
                  "thread:%u\n", thread->id);
          if (++queue_len == MAX_QUEUE_LEN)
            break;
        }
        while (thread != last);
      }
    }
  }
  fprintf(keycache_dump_file, "LRU chain:");
  block= keycache->used_last;
  if (block)
  {
    /* The chain is circular: stop when used_last is reached again */
    do
    {
      block= block->next_used;
      fprintf(keycache_dump_file,
              "block:%u, ", BLOCK_NUMBER(block));
    }
    while (block != keycache->used_last);
  }
  fprintf(keycache_dump_file, "\n");

  fclose(keycache_dump_file);
}

1957
#endif /* defined(KEYCACHE_TIMEOUT) */
unknown's avatar
unknown committed
1958

1959
#if defined(KEYCACHE_TIMEOUT) && !defined(__WIN__)
unknown's avatar
unknown committed
1960 1961


1962
static int keycache_pthread_cond_wait(pthread_cond_t *cond,
1963 1964 1965 1966 1967 1968 1969 1970 1971
                                      pthread_mutex_t *mutex)
{
  int rc;
  struct timeval  now;            /* time when we started waiting        */
  struct timespec timeout;        /* timeout value for the wait function */
  struct timezone tz;
#if defined(KEYCACHE_DEBUG)
  int cnt=0;
#endif
1972 1973

  /* Get current time */
1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990
  gettimeofday(&now, &tz);
  /* Prepare timeout value */
  timeout.tv_sec = now.tv_sec + KEYCACHE_TIMEOUT;
  timeout.tv_nsec = now.tv_usec * 1000; /* timeval uses microseconds.         */
                                        /* timespec uses nanoseconds.         */
                                        /* 1 nanosecond = 1000 micro seconds. */
  KEYCACHE_THREAD_TRACE_END("started waiting");
#if defined(KEYCACHE_DEBUG)
  cnt++;
  if (cnt % 100 == 0)
    fprintf(keycache_debug_log, "waiting...\n");
    fflush(keycache_debug_log);
#endif
  rc = pthread_cond_timedwait(cond, mutex, &timeout);
  KEYCACHE_THREAD_TRACE_BEGIN("finished waiting");
#if defined(KEYCACHE_DEBUG)
  if (rc == ETIMEDOUT)
1991
  {
1992 1993 1994 1995 1996
    fprintf(keycache_debug_log,"aborted by keycache timeout\n");
    fclose(keycache_debug_log);
    abort();
  }
#endif
1997

1998 1999
  if (rc == ETIMEDOUT)
    keycache_dump();
2000

2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016 2017 2018 2019 2020
#if defined(KEYCACHE_DEBUG)
  KEYCACHE_DBUG_ASSERT(rc != ETIMEDOUT);
#else
  assert(rc != ETIMEDOUT);
#endif
  return rc;
}
#else
#if defined(KEYCACHE_DEBUG)
/*
  Debug wrapper around pthread_cond_wait(): emits a trace record
  before and after the wait and returns the result of the wait.
*/
static int keycache_pthread_cond_wait(pthread_cond_t *cond,
                                      pthread_mutex_t *mutex)
{
  int wait_result;

  KEYCACHE_THREAD_TRACE_END("started waiting");
  wait_result= pthread_cond_wait(cond, mutex);
  KEYCACHE_THREAD_TRACE_BEGIN("finished waiting");

  return wait_result;
}
#endif
#endif /* defined(KEYCACHE_TIMEOUT) && !defined(__WIN__) */
unknown's avatar
unknown committed
2021

2022
#if defined(KEYCACHE_DEBUG)
unknown's avatar
unknown committed
2023 2024


2025
/*
  Debug wrapper around pthread_mutex_lock(): the trace record is
  emitted after the lock call has returned.
  Returns the result of pthread_mutex_lock().
*/
static int keycache_pthread_mutex_lock(pthread_mutex_t *mutex)
{
  int lock_result= pthread_mutex_lock(mutex);

  KEYCACHE_THREAD_TRACE_BEGIN("");
  return lock_result;
}
unknown's avatar
unknown committed
2032 2033


2034 2035 2036 2037 2038
/*
  Debug wrapper around pthread_mutex_unlock(): the trace record is
  emitted before the mutex is released. The result of the unlock call
  is not reported to the caller.
*/
static void keycache_pthread_mutex_unlock(pthread_mutex_t *mutex)
{
  KEYCACHE_THREAD_TRACE_END("");

  pthread_mutex_unlock(mutex);
}
unknown's avatar
unknown committed
2039 2040


2041
/*
  Debug wrapper around pthread_cond_signal(): emits a trace record
  and then signals the condition.
  Returns the result of pthread_cond_signal().
*/
static int keycache_pthread_cond_signal(pthread_cond_t *cond)
{
  KEYCACHE_THREAD_TRACE("signal");

  return pthread_cond_signal(cond);
}
unknown's avatar
unknown committed
2048 2049


2050 2051 2052 2053 2054 2055 2056
/*
  Debug wrapper around pthread_cond_broadcast(): emits a trace record
  and then wakes up all waiters of the condition.
  Returns the result of pthread_cond_broadcast().
*/
static int keycache_pthread_cond_broadcast(pthread_cond_t *cond)
{
  int rc;
  /* Use a label of its own so that traces differ from cond_signal */
  KEYCACHE_THREAD_TRACE("broadcast");
  rc=pthread_cond_broadcast(cond);
  return rc;
}
unknown's avatar
unknown committed
2057

2058
#if defined(KEYCACHE_DEBUG_LOG)
unknown's avatar
unknown committed
2059 2060


2061 2062 2063 2064 2065
/*
  printf-style output to the key cache debug log; a newline is
  appended to every message. Does nothing when the log is not open.
*/
static void keycache_debug_print(const char * fmt,...)
{
  va_list ap;

  if (!keycache_debug_log)
    return;

  va_start(ap, fmt);
  VOID(vfprintf(keycache_debug_log, fmt, ap));
  VOID(fputc('\n',keycache_debug_log));
  va_end(ap);
}
#endif /* defined(KEYCACHE_DEBUG_LOG) */
unknown's avatar
unknown committed
2073

2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084
#if defined(KEYCACHE_DEBUG_LOG)


/*
  Close the key cache debug log if it is open.

  The handle is reset afterwards, so that a second call and the other
  users that test the handle before writing do not touch a closed
  stream.
*/
void keycache_debug_log_close(void)
{
  if (keycache_debug_log)
  {
    fclose(keycache_debug_log);
    keycache_debug_log= NULL;
  }
}
#endif /* defined(KEYCACHE_DEBUG_LOG) */

#endif /* defined(KEYCACHE_DEBUG) */