ma_pagecache.c 148 KB
Newer Older
unknown's avatar
unknown committed
1
/* Copyright (C) 2000-2006 MySQL AB
unknown's avatar
unknown committed
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

/*
unknown's avatar
unknown committed
18
  These functions handle page caching for Maria tables.
unknown's avatar
unknown committed
19 20 21 22 23 24 25 26 27 28

  One cache can handle many files.
  It must contain buffers of the same blocksize.
  init_pagecache() should be used to init cache handler.

  The free list (free_block_list) is a stack like structure.
  When a block is freed by free_block(), it is pushed onto the stack.
  When a new block is required, we first try to pop one from the stack.
  If the stack is empty, we try to get a never-used block from the pool.
  If this is empty too, then a block is taken from the LRU ring, flushing it
29
  to disk, if necessary. This is handled in find_block().
unknown's avatar
unknown committed
30 31
  With the new free list, the blocks can have three temperatures:
  hot, warm and cold (which is free). This is remembered in the block header
unknown's avatar
unknown committed
32
  by the enum PCBLOCK_TEMPERATURE temperature variable. Remembering the
unknown's avatar
unknown committed
33
  temperature is necessary to correctly count the number of warm blocks,
unknown's avatar
unknown committed
34 35 36 37 38 39 40 41
  which is required to decide when blocks are allowed to become hot. Whenever
  a block is inserted to another (sub-)chain, we take the old and new
  temperature into account to decide if we got one more or less warm block.
  blocks_unused is the sum of never used blocks in the pool and of currently
  free blocks. blocks_used is the number of blocks fetched from the pool and
  as such gives the maximum number of in-use blocks at any time.
*/

unknown's avatar
unknown committed
42
#include "maria_def.h"
unknown's avatar
unknown committed
43
#include <m_string.h>
44
#include "ma_pagecache.h"
unknown's avatar
unknown committed
45
#include "ma_blockrec.h"
unknown's avatar
unknown committed
46
#include <my_bit.h>
unknown's avatar
unknown committed
47
#include <errno.h>
48

unknown's avatar
unknown committed
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
/*
  Some compilation flags have been added specifically for this module
  to control the following:
  - not to let a thread to yield the control when reading directly
    from page cache, which might improve performance in many cases;
    to enable this add:
    #define SERIALIZED_READ_FROM_CACHE
  - to set an upper bound for number of threads simultaneously
    using the page cache; this setting helps to determine an optimal
    size for hash table and improve performance when the number of
    blocks in the page cache much less than the number of threads
    accessing it;
    to set this number equal to <N> add
      #define MAX_THREADS <N>
  - to substitute calls of pthread_cond_wait for calls of
    pthread_cond_timedwait (wait with timeout set up);
    this setting should be used only when you want to trap a deadlock
    situation, which theoretically should not happen;
    to set timeout equal to <T> seconds add
      #define PAGECACHE_TIMEOUT <T>
  - to enable the module traps and to send debug information from
    page cache module to a special debug log add:
      #define PAGECACHE_DEBUG
    the name of this debug log file <LOG NAME> can be set through:
      #define PAGECACHE_DEBUG_LOG  <LOG NAME>
    if the name is not defined, it's set by default;
    if the PAGECACHE_DEBUG flag is not set up and we are in a debug
    mode, i.e. when ! defined(DBUG_OFF), the debug information from the
    module is sent to the regular debug log.

  Example of the settings:
    #define SERIALIZED_READ_FROM_CACHE
    #define MAX_THREADS   100
    #define PAGECACHE_TIMEOUT  1
    #define PAGECACHE_DEBUG
    #define PAGECACHE_DEBUG_LOG  "my_pagecache_debug.log"
*/

/*
  In the key cache we have external raw locking; here we use
  SERIALIZED_READ_FROM_CACHE to avoid the problem of reading
90 91 92
  not consistent data from the page.
  (keycache functions (key_cache_read(), key_cache_insert() and
  key_cache_write()) rely on external MyISAM lock, we don't)
unknown's avatar
unknown committed
93 94 95
*/
#define SERIALIZED_READ_FROM_CACHE yes

unknown's avatar
unknown committed
96
/*
  Print the state of page cache block B to the debug trace under the
  "info" keyword.

  B is a pointer to the block. It is expanded several times, so it must be
  an expression without side effects. When the block has no hash link, the
  file descriptor, the page number and the hash link request counter are
  printed as 0.

  Every field is read through the macro argument (B); the macro does not
  depend on a variable called 'block' being in scope at the call site.
*/
#define PCBLOCK_INFO(B) \
  DBUG_PRINT("info", \
             ("block: 0x%lx  fd: %lu  page: %lu  s: %0x  hshL: 0x%lx  req: %u/%u " \
              "wrlocks: %u  pins: %u", \
              (ulong)(B), \
              (ulong)((B)->hash_link ? \
                      (B)->hash_link->file.file : \
                      0), \
              (ulong)((B)->hash_link ? \
                      (B)->hash_link->pageno : \
                      0), \
              (B)->status, \
              (ulong)(B)->hash_link, \
              (uint) (B)->requests, \
              (uint)((B)->hash_link ? \
                     (B)->hash_link->requests : \
                       0), \
              (uint)(B)->wlocks, \
              (uint)(B)->pins))
unknown's avatar
unknown committed
115 116 117 118 119 120 121 122

/* TODO: put it to my_static.c */
my_bool my_disable_flush_pagecache_blocks= 0;

/*
  Given 'a', the address of member MEMBER inside an object of type TYPE,
  yield a pointer to the enclosing TYPE object.

  The whole expansion is parenthesized so that the result can be used
  inside a larger expression, e.g. STRUCT_PTR(T, m, a)->other_member.
*/
#define STRUCT_PTR(TYPE, MEMBER, a)                                           \
          ((TYPE *) ((char *) (a) - offsetof(TYPE, MEMBER)))

/* types of condition variables */
unknown's avatar
unknown committed
123 124 125 126
/* These values index the per-block wqueue[COND_SIZE] array of wait queues */
#define  COND_FOR_REQUESTED 0  /* queue of threads waiting for a read operation */
#define  COND_FOR_SAVED     1  /* queue of threads waiting for a flush */
#define  COND_FOR_WRLOCK    2  /* queue of threads waiting for the write lock */
#define  COND_SIZE          3  /* number of COND_* queues */
unknown's avatar
unknown committed
127 128 129 130 131 132 133

typedef pthread_cond_t KEYCACHE_CONDVAR;

/* descriptor of the page in the page cache block buffer */
struct st_pagecache_page
{
  PAGECACHE_FILE file;    /* file to which the page belongs to  */
134
  pgcache_page_no_t pageno; /* number of the page in the file   */
unknown's avatar
unknown committed
135 136 137 138 139 140 141 142 143 144
};

/* element in the chain of a hash table bucket */
struct st_pagecache_hash_link
{
  struct st_pagecache_hash_link
    *next, **prev;                   /* to connect links in the same bucket  */
  struct st_pagecache_block_link
    *block;                          /* reference to the block for the page: */
  PAGECACHE_FILE file;               /* from such a file                     */
145
  pgcache_page_no_t pageno;            /* this page                            */
unknown's avatar
unknown committed
146 147 148 149
  uint requests;                     /* number of requests for the page      */
};

/* simple states of a block */
unknown's avatar
unknown committed
150 151 152 153 154 155
/* Each value is a distinct bit, so several states can be set in one status */
#define PCBLOCK_ERROR       1 /* an error occurred when performing disk i/o  */
#define PCBLOCK_READ        2 /* the page is in the block buffer             */
#define PCBLOCK_IN_SWITCH   4 /* block is preparing to read new page         */
#define PCBLOCK_REASSIGNED  8 /* block does not accept requests for old page */
#define PCBLOCK_IN_FLUSH   16 /* block is in flush operation                 */
#define PCBLOCK_CHANGED    32 /* block buffer contains a dirty page          */
156
#define PCBLOCK_DIRECT_W   64 /* possible direct write to the block          */
unknown's avatar
unknown committed
157

158
/* page status, returned by find_block */
unknown's avatar
unknown committed
159 160 161 162 163
#define PAGE_READ               0
#define PAGE_TO_BE_READ         1
#define PAGE_WAIT_TO_BE_READ    2

/* block temperature determines in which (sub-)chain the block currently is */
unknown's avatar
unknown committed
164
enum PCBLOCK_TEMPERATURE { PCBLOCK_COLD /*free*/ , PCBLOCK_WARM , PCBLOCK_HOT };
unknown's avatar
unknown committed
165 166 167

/* debug info */
#ifndef DBUG_OFF
168
/*
  Human readable names used in debug traces. Each table is indexed by the
  numeric value of the corresponding mode, so the order of the entries
  must not be changed.
*/

/* page types; used only for control page type changing during debugging */
static const char *page_cache_page_type_str[]=
{
  "EMPTY",
  "PLAIN",
  "LSN",
  "READ_UNKNOWN"
};

/* write modes */
static const char *page_cache_page_write_mode_str[]=
{
  "DELAY",
  "DONE"
};

/* lock state transitions, printed as "state before -> state after" */
static const char *page_cache_page_lock_str[]=
{
  "free -> free",
  "read -> read",
  "write -> write",
  "free -> read",
  "free -> write",
  "read -> free",
  "write -> free",
  "write -> read"
};

/* pin state transitions, printed as "state before -> state after" */
static const char *page_cache_page_pin_str[]=
{
  "pinned -> pinned",
  "unpinned -> unpinned",
  "unpinned -> pinned",
  "pinned -> unpinned"
};
202 203


unknown's avatar
unknown committed
204 205 206 207 208
/*
  Debug-only list node recording that a thread has pinned a block.
  Nodes are chained through next/prev by info_link() and info_unlink().
*/
typedef struct st_pagecache_pin_info
{
  /* next node, and address of the pointer that points at this node */
  struct st_pagecache_pin_info *next, **prev;
  /* thread this node belongs to; info_find() searches on this field */
  struct st_my_thread_var *thread;
}  PAGECACHE_PIN_INFO;
209

unknown's avatar
unknown committed
210 211 212 213
/*
  The leading members of st_pagecache_lock_info (next, prev, thread) must
  stay layout-compatible with st_pagecache_pin_info, because the same list
  functions are used on both structures.
*/
214

unknown's avatar
unknown committed
215 216 217 218 219 220
/*
  Debug-only list node recording that a thread holds a lock on a block.
  info_check_lock() casts a list of these nodes to PAGECACHE_PIN_INFO
  to search it with info_find().
*/
typedef struct st_pagecache_lock_info
{
  /* next node, and address of the pointer that points at this node */
  struct st_pagecache_lock_info *next, **prev;
  /* thread this node belongs to */
  struct st_my_thread_var *thread;
  /* non-zero for a write lock, zero for a read lock */
  my_bool write_lock;
} PAGECACHE_LOCK_INFO;
unknown's avatar
unknown committed
221 222 223 224 225 226 227 228 229 230 231 232 233 234


/* service functions maintain debugging info about pin & lock */


/*
  Links information about thread pinned/locked the block to the list

  SYNOPSIS
    info_link()
    list                 the list to link in
    node                 the node which should be linked
*/

235
static void info_link(PAGECACHE_PIN_INFO **list, PAGECACHE_PIN_INFO *node)
{
  /* the old head (if any) becomes the successor of node */
  if ((node->next= *list))
    node->next->prev= &(node->next);
  /* node becomes the new head; its back link is the list head pointer */
  *list= node;
  node->prev= list;
}
unknown's avatar
unknown committed
242 243 244 245 246 247 248 249 250 251


/*
  Unlinks information about thread pinned/locked the block from the list

  SYNOPSIS
    info_unlink()
    node                 the node which should be unlinked
*/

252
static void info_unlink(PAGECACHE_PIN_INFO *node)
{
  /*
    Make whatever pointed at node point at its successor, then repair the
    successor's back link if there is a successor.
  */
  if ((*node->prev= node->next))
    node->next->prev= node->prev;
}
unknown's avatar
unknown committed
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273


/*
  Finds information about given thread in the list of threads which
  pinned/locked this block.

  SYNOPSIS
    info_find()
    list                 the list where to find the thread
    thread               thread ID (reference to the st_my_thread_var
                         of the thread)

  RETURN
    0 - the thread was not found
    pointer to the information node of the thread in the list
*/

274 275
static PAGECACHE_PIN_INFO *info_find(PAGECACHE_PIN_INFO *list,
                                     struct st_my_thread_var *thread)
{
  register PAGECACHE_PIN_INFO *i= list;
  /* linear scan along the 'next' chain; the first match wins */
  for(; i != 0; i= i->next)
    if (i->thread == thread)
      return i;
  return 0;
}
283 284

#endif /* !DBUG_OFF */
unknown's avatar
unknown committed
285 286 287 288 289 290 291 292 293 294

/* page cache block */
struct st_pagecache_block_link
{
  struct st_pagecache_block_link
    *next_used, **prev_used;   /* to connect links in the LRU chain (ring)   */
  struct st_pagecache_block_link
    *next_changed, **prev_changed; /* for lists of file dirty/clean blocks   */
  struct st_pagecache_hash_link
    *hash_link;           /* backward ptr to referring hash_link             */
295 296 297 298 299 300 301 302
#ifndef DBUG_OFF
  PAGECACHE_PIN_INFO *pin_list;
  PAGECACHE_LOCK_INFO *lock_list;
#endif
  KEYCACHE_CONDVAR *condvar; /* condition variable for 'no readers' event    */
  uchar *buffer;           /* buffer for the block page                      */
  PAGECACHE_FILE *write_locker;
  ulonglong last_hit_time; /* timestamp of the last hit                      */
303
  WQUEUE
unknown's avatar
unknown committed
304 305
    wqueue[COND_SIZE];    /* queues on waiting requests for new/old pages    */
  uint requests;          /* number of requests for the block                */
unknown's avatar
unknown committed
306 307
  uint status;            /* state of the block                              */
  uint pins;              /* pin counter                                     */
308
  uint wlocks;            /* write locks counter                             */
unknown's avatar
unknown committed
309
  enum PCBLOCK_TEMPERATURE temperature; /* block temperature: cold, warm, hot  */
unknown's avatar
unknown committed
310 311
  enum pagecache_page_type type; /* type of the block                        */
  uint hits_left;         /* number of hits left until promotion             */
unknown's avatar
unknown committed
312 313
  /** @brief LSN when first became dirty; LSN_MAX means "not yet set"        */
  LSN rec_lsn;
unknown's avatar
unknown committed
314 315
};

unknown's avatar
unknown committed
316 317 318
/** @brief information describing a run of flush_pagecache_blocks_int() */
struct st_file_in_flush
{
319
  File file;
unknown's avatar
unknown committed
320 321 322 323 324 325 326 327 328 329 330 331
  /**
     @brief threads waiting for the thread currently flushing this file to be
     done
  */
  WQUEUE flush_queue;
  /**
     @brief if the thread currently flushing the file has a non-empty
     first_in_switch list.
  */
  my_bool first_in_switch;
};

unknown's avatar
unknown committed
332
#ifndef DBUG_OFF
unknown's avatar
unknown committed
333
/* debug checks */
334 335

#ifdef NOT_USED
336
/*
  Debug function which checks the requested pin change against the pin
  state recorded for the current thread.

  SYNOPSIS
    info_check_pin()
    block                the block whose pin list is checked
    mode                 requested pin change

  RETURN
    0 - OK
    1 - Error: the change contradicts the recorded state (the thread is in
        the pin list but asks for LEFT_UNPINNED or PIN, or it is not in
        the list but asks for LEFT_PINNED or UNPIN)
*/

static my_bool info_check_pin(PAGECACHE_BLOCK_LINK *block,
                              enum pagecache_page_pin mode
                              __attribute__((unused)))
{
  struct st_my_thread_var *thread= my_thread_var;
  PAGECACHE_PIN_INFO *info= info_find(block->pin_list, thread);
  DBUG_ENTER("info_check_pin");
  DBUG_PRINT("enter", ("thread: 0x%lx  pin: %s",
                       (ulong) thread, page_cache_page_pin_str[mode]));
  if (info)
  {
    /* the thread already holds a pin on this block */
    if (mode == PAGECACHE_PIN_LEFT_UNPINNED)
    {
      DBUG_PRINT("info",
                 ("info_check_pin: thread: 0x%lx  block: 0x%lx  ; LEFT_UNPINNED!!!",
                  (ulong)thread, (ulong)block));
      DBUG_RETURN(1);
    }
    else if (mode == PAGECACHE_PIN)
    {
      DBUG_PRINT("info",
                 ("info_check_pin: thread: 0x%lx  block: 0x%lx  ; PIN!!!",
                  (ulong)thread, (ulong)block));
      DBUG_RETURN(1);
    }
  }
  else
  {
    /* the thread holds no pin on this block */
    if (mode == PAGECACHE_PIN_LEFT_PINNED)
    {
      DBUG_PRINT("info",
                 ("info_check_pin: thread: 0x%lx  block: 0x%lx  ; LEFT_PINNED!!!",
                  (ulong)thread, (ulong)block));
      DBUG_RETURN(1);
    }
    else if (mode == PAGECACHE_UNPIN)
    {
      DBUG_PRINT("info",
                 ("info_check_pin: thread: 0x%lx  block: 0x%lx  ; UNPIN!!!",
                  (ulong)thread, (ulong)block));
      DBUG_RETURN(1);
    }
  }
  DBUG_RETURN(0);
}
unknown's avatar
unknown committed
381

unknown's avatar
unknown committed
382 383 384 385 386 387 388 389 390 391 392 393 394 395

/*
  Debug function which checks current lock/pin state and requested changes

  SYNOPSIS
    info_check_lock()
    lock                 requested lock changes
    pin                  requested pin changes

  RETURN
    0 - OK
    1 - Error
*/

396 397 398
static my_bool info_check_lock(PAGECACHE_BLOCK_LINK *block,
                               enum pagecache_page_lock lock,
                               enum pagecache_page_pin pin)
{
  struct st_my_thread_var *thread= my_thread_var;
  /* lock entry of the current thread for this block, 0 if it has none */
  PAGECACHE_LOCK_INFO *info=
    (PAGECACHE_LOCK_INFO *) info_find((PAGECACHE_PIN_INFO *) block->lock_list,
                                      thread);
  DBUG_ENTER("info_check_lock");
  switch(lock) {
  case PAGECACHE_LOCK_LEFT_UNLOCKED:
    /* no lock may be held and the block must stay unpinned */
    if (pin != PAGECACHE_PIN_LEFT_UNPINNED ||
        info)
      goto error;
    break;
  case PAGECACHE_LOCK_LEFT_READLOCKED:
    /* a read lock must be held; the pin state must not change */
    if ((pin != PAGECACHE_PIN_LEFT_UNPINNED &&
         pin != PAGECACHE_PIN_LEFT_PINNED) ||
        info == 0 || info->write_lock)
      goto error;
    break;
  case PAGECACHE_LOCK_LEFT_WRITELOCKED:
    /* a write lock must be held and the block must stay pinned */
    if (pin != PAGECACHE_PIN_LEFT_PINNED ||
        info == 0 || !info->write_lock)
      goto error;
    break;
  case PAGECACHE_LOCK_READ:
    /* no lock may be held yet; the block stays unpinned or gets pinned */
    if ((pin != PAGECACHE_PIN_LEFT_UNPINNED &&
         pin != PAGECACHE_PIN) ||
        info != 0)
      goto error;
    break;
  case PAGECACHE_LOCK_WRITE:
    /* no lock may be held yet and the block must get pinned */
    if (pin != PAGECACHE_PIN ||
        info != 0)
      goto error;
    break;
  case PAGECACHE_LOCK_READ_UNLOCK:
    /* a read lock must be held; the block stays unpinned or gets unpinned */
    if ((pin != PAGECACHE_PIN_LEFT_UNPINNED &&
         pin != PAGECACHE_UNPIN) ||
        info == 0 || info->write_lock)
      goto error;
    break;
  case PAGECACHE_LOCK_WRITE_UNLOCK:
    /* a write lock must be held and the block must get unpinned */
    if (pin != PAGECACHE_UNPIN ||
        info == 0 || !info->write_lock)
      goto error;
    break;
  case PAGECACHE_LOCK_WRITE_TO_READ:
    /* a write lock must be held; the block stays pinned or gets unpinned */
    if ((pin != PAGECACHE_PIN_LEFT_PINNED &&
         pin != PAGECACHE_UNPIN) ||
        info == 0 || !info->write_lock)
      goto error;
    break;
  }
  DBUG_RETURN(0);
error:
  DBUG_PRINT("info",
             ("info_check_lock: thread: 0x%lx block 0x%lx: info: %d wrt: %d,"
              "to lock: %s, to pin: %s",
              (ulong)thread, (ulong)block, test(info),
              (info ? info->write_lock : 0),
              page_cache_page_lock_str[lock],
              page_cache_page_pin_str[pin]));
  DBUG_RETURN(1);
}
462
#endif /* NOT_USED */
463
#endif /* !DBUG_OFF */
unknown's avatar
unknown committed
464 465 466 467

#define FLUSH_CACHE         2000            /* sort this many blocks at once */

static void free_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block);
468
#ifndef DBUG_OFF
unknown's avatar
unknown committed
469 470
static void test_key_cache(PAGECACHE *pagecache,
                           const char *where, my_bool lock);
471
#endif
unknown's avatar
unknown committed
472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542

/*
  Hash bucket number for page 'pos' of file 'f' in page cache 'p'.
  The sum is masked with (hash_entries - 1), which selects a valid bucket
  only when hash_entries is a power of 2.
  'p' is parenthesized so any pointer expression (e.g. &cache) can be passed.
*/
#define PAGECACHE_HASH(p, f, pos) (((ulong) (pos) +                          \
                                    (ulong) (f).file) &                      \
                                   ((p)->hash_entries-1))
/* Bucket number of file 'f' among PAGECACHE_CHANGED_BLOCKS_HASH buckets */
#define FILE_HASH(f) ((uint) (f).file & (PAGECACHE_CHANGED_BLOCKS_HASH - 1))

#define DEFAULT_PAGECACHE_DEBUG_LOG  "pagecache_debug.log"

#if defined(PAGECACHE_DEBUG) && ! defined(PAGECACHE_DEBUG_LOG)
#define PAGECACHE_DEBUG_LOG  DEFAULT_PAGECACHE_DEBUG_LOG
#endif

#if defined(PAGECACHE_DEBUG_LOG)
/* stream of the dedicated page cache debug log; NULL while it is closed */
static FILE *pagecache_debug_log= NULL;
static void pagecache_debug_print _VARARGS((const char *fmt, ...));
/*
  Open the debug log for writing unless it is already open, and make the
  stream line buffered. If fopen() fails the pointer stays NULL and
  setvbuf() is not called on it.
*/
#define PAGECACHE_DEBUG_OPEN                                                  \
          if (!pagecache_debug_log)                                           \
          {                                                                   \
            pagecache_debug_log= fopen(PAGECACHE_DEBUG_LOG, "w");             \
            if (pagecache_debug_log)                                          \
              (void) setvbuf(pagecache_debug_log, NULL, _IOLBF, BUFSIZ);      \
          }

/* Close the debug log if it is open and mark it as closed */
#define PAGECACHE_DEBUG_CLOSE                                                 \
          if (pagecache_debug_log)                                            \
          {                                                                   \
            fclose(pagecache_debug_log);                                      \
            pagecache_debug_log= 0;                                           \
          }
#else
/* no dedicated debug log: both macros expand to nothing */
#define PAGECACHE_DEBUG_OPEN
#define PAGECACHE_DEBUG_CLOSE
#endif /* defined(PAGECACHE_DEBUG_LOG) */

#if defined(PAGECACHE_DEBUG_LOG) && defined(PAGECACHE_DEBUG)
/*
  Trace to the dedicated debug log: 'l' is the keyword printed as a prefix
  (only when the log is open), 'm' is a parenthesized printf-style argument
  list handed to pagecache_debug_print().
*/
#define KEYCACHE_DBUG_PRINT(l, m)                                             \
            { if (pagecache_debug_log)                                        \
                fprintf(pagecache_debug_log, "%s: ", l);                      \
              pagecache_debug_print m; }

/*
  Assert 'a'; if it does not hold and the debug log is open, the log is
  closed first. Note that 'a' is evaluated twice.
*/
#define KEYCACHE_DBUG_ASSERT(a)                                               \
            { if (! (a) && pagecache_debug_log)                               \
                fclose(pagecache_debug_log);                                  \
              assert(a); }
#else
/* without the dedicated log both macros map onto the regular DBUG macros */
#define KEYCACHE_DBUG_PRINT(l, m)  DBUG_PRINT(l, m)
#define KEYCACHE_DBUG_ASSERT(a)    DBUG_ASSERT(a)
#endif /* defined(PAGECACHE_DEBUG_LOG) && defined(PAGECACHE_DEBUG) */

/*
  Thread tracing helpers. 'l' is the trace keyword passed on to
  KEYCACHE_DBUG_PRINT. When neither PAGECACHE_DEBUG nor the regular debug
  mode is enabled all three macros expand to nothing.
*/
#if defined(PAGECACHE_DEBUG) || !defined(DBUG_OFF)
#ifdef THREAD
/*
  Id stored by KEYCACHE_THREAD_TRACE_BEGIN and printed by all three macros.
  NOTE(review): this is a plain file-scope static, not per-thread storage;
  confirm that callers serialize access to it.
*/
static long pagecache_thread_id;
/* print the stored thread id, prefixed with '|' */
#define KEYCACHE_THREAD_TRACE(l)                                              \
             KEYCACHE_DBUG_PRINT(l,("|thread %ld",pagecache_thread_id))

/* store the id of the current thread and print it, prefixed with '[' */
#define KEYCACHE_THREAD_TRACE_BEGIN(l)                                        \
            { struct st_my_thread_var *thread_var= my_thread_var;             \
              pagecache_thread_id= thread_var->id;                            \
              KEYCACHE_DBUG_PRINT(l,("[thread %ld",pagecache_thread_id)) }

/* print the stored thread id, prefixed with ']' */
#define KEYCACHE_THREAD_TRACE_END(l)                                          \
            KEYCACHE_DBUG_PRINT(l,("]thread %ld",pagecache_thread_id))
#else /* THREAD */
/* no thread support: print an empty message under keyword 'l' */
#define KEYCACHE_THREAD_TRACE(l)        KEYCACHE_DBUG_PRINT(l,(""))
#define KEYCACHE_THREAD_TRACE_BEGIN(l)  KEYCACHE_DBUG_PRINT(l,(""))
#define KEYCACHE_THREAD_TRACE_END(l)    KEYCACHE_DBUG_PRINT(l,(""))
#endif /* THREAD */
#else
#define KEYCACHE_THREAD_TRACE_BEGIN(l)
#define KEYCACHE_THREAD_TRACE_END(l)
#define KEYCACHE_THREAD_TRACE(l)
#endif /* defined(PAGECACHE_DEBUG) || !defined(DBUG_OFF) */

unknown's avatar
unknown committed
543
/*
  Index of block 'b' within the block array of page cache 'p', computed as
  the byte distance from p->block_root divided by the size of one block
  descriptor. 'p' is parenthesized so any pointer expression can be passed.
*/
#define PCBLOCK_NUMBER(p, b)                                                  \
  ((uint) (((char*)(b)-(char *) (p)->block_root)/sizeof(PAGECACHE_BLOCK_LINK)))
/*
  Index of hash link 'h' within the hash link array of page cache 'p',
  computed the same way from p->hash_link_root.
*/
#define PAGECACHE_HASH_LINK_NUMBER(p, h)                                      \
  ((uint) (((char*)(h)-(char *) (p)->hash_link_root)/                         \
           sizeof(PAGECACHE_HASH_LINK)))

/*
  With PAGECACHE_TIMEOUT (except on __WIN__) or with PAGECACHE_DEBUG the
  module uses its own condition wait function, declared here; otherwise
  the name is simply an alias for pthread_cond_wait.
*/
#if (defined(PAGECACHE_TIMEOUT) && !defined(__WIN__)) || defined(PAGECACHE_DEBUG)
static int pagecache_pthread_cond_wait(pthread_cond_t *cond,
                                      pthread_mutex_t *mutex);
#else
#define  pagecache_pthread_cond_wait pthread_cond_wait
#endif

/*
  With PAGECACHE_DEBUG every mutex lock/unlock and condition signal first
  prints the object address and the source line under the "lock" keyword
  and then calls the corresponding ___pagecache_* function. In this mode
  the macros expand to a brace-enclosed block, so they can only be used as
  statements. Otherwise the names are plain aliases for the pthread calls.
*/
#if defined(PAGECACHE_DEBUG)
static int ___pagecache_pthread_mutex_lock(pthread_mutex_t *mutex);
static void ___pagecache_pthread_mutex_unlock(pthread_mutex_t *mutex);
static int ___pagecache_pthread_cond_signal(pthread_cond_t *cond);
#define pagecache_pthread_mutex_lock(M) \
{ DBUG_PRINT("lock", ("mutex lock 0x%lx %u", (ulong)(M), __LINE__)); \
  ___pagecache_pthread_mutex_lock(M);}
#define pagecache_pthread_mutex_unlock(M) \
{ DBUG_PRINT("lock", ("mutex unlock 0x%lx %u", (ulong)(M), __LINE__)); \
  ___pagecache_pthread_mutex_unlock(M);}
#define pagecache_pthread_cond_signal(M) \
{ DBUG_PRINT("lock", ("signal 0x%lx %u", (ulong)(M), __LINE__)); \
  ___pagecache_pthread_cond_signal(M);}
#else
#define pagecache_pthread_mutex_lock pthread_mutex_lock
#define pagecache_pthread_mutex_unlock pthread_mutex_unlock
#define pagecache_pthread_cond_signal pthread_cond_signal
#endif /* defined(PAGECACHE_DEBUG) */

unknown's avatar
unknown committed
575
extern my_bool translog_flush(TRANSLOG_ADDRESS lsn);
unknown's avatar
unknown committed
576 577

/*
578
  Write page to the disk
unknown's avatar
unknown committed
579 580 581 582 583

  SYNOPSIS
    pagecache_fwrite()
    pagecache - page cache pointer
    filedesc  - pagecache file descriptor structure
584
    buffer    - buffer which we will write
unknown's avatar
unknown committed
585 586 587 588 589 590 591
    type      - page type (plain or with LSN)
    flags     - MYF() flags

  RETURN
    0   - OK
    !=0 - Error
*/
592 593 594

static uint pagecache_fwrite(PAGECACHE *pagecache,
                             PAGECACHE_FILE *filedesc,
unknown's avatar
unknown committed
595
                             uchar *buffer,
596 597 598
                             pgcache_page_no_t pageno,
                             enum pagecache_page_type type,
                             myf flags)
unknown's avatar
unknown committed
599
{
unknown's avatar
unknown committed
600 601 602
  TRANSLOG_ADDRESS (*addr_callback)
    (uchar *page, pgcache_page_no_t offset, uchar *data)=
    filedesc->get_log_address_callback;
unknown's avatar
unknown committed
603
  DBUG_ENTER("pagecache_fwrite");
604
  DBUG_ASSERT(type != PAGECACHE_READ_UNKNOWN_PAGE);
unknown's avatar
unknown committed
605
  if (addr_callback != NULL)
unknown's avatar
unknown committed
606
  {
unknown's avatar
unknown committed
607 608
    TRANSLOG_ADDRESS addr=
      (*addr_callback)(buffer, pageno, filedesc->callback_data);
unknown's avatar
unknown committed
609
    DBUG_PRINT("info", ("Log handler call"));
unknown's avatar
unknown committed
610 611
    DBUG_ASSERT(LSN_VALID(addr));
    if (translog_flush(addr))
unknown's avatar
unknown committed
612 613 614 615 616
      DBUG_RETURN(1);
  }
  DBUG_PRINT("info", ("write_callback: 0x%lx  data: 0x%lx",
                      (ulong) filedesc->write_callback,
                      (ulong) filedesc->callback_data));
unknown's avatar
unknown committed
617
  if ((*filedesc->write_callback)(buffer, pageno, filedesc->callback_data))
unknown's avatar
unknown committed
618 619 620
  {
    DBUG_PRINT("error", ("write callback problem"));
    DBUG_RETURN(1);
unknown's avatar
unknown committed
621
  }
unknown's avatar
unknown committed
622

unknown's avatar
unknown committed
623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643
  DBUG_RETURN(my_pwrite(filedesc->file, buffer, pagecache->block_size,
                        (pageno)<<(pagecache->shift), flags));
}


/*
  Read page from the disk

  SYNOPSIS
    pagecache_fread()
    pagecache - page cache pointer
    filedesc  - pagecache file descriptor structure
    buffer    - buffer in which we will read
    pageno    - page number
    flags     - MYF() flags

  NOTES
    Expands to a my_pread() call that reads pagecache->block_size bytes at
    byte offset (pageno << pagecache->shift) and yields its result.
    All macro arguments are parenthesized, so any expression can be passed;
    'pagecache' is expanded twice and must not have side effects.
*/
#define pagecache_fread(pagecache, filedesc, buffer, pageno, flags) \
  my_pread((filedesc)->file, (buffer), (pagecache)->block_size,     \
           (pageno)<<((pagecache)->shift), (flags))


644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661
/**
  @brief set rec_lsn of pagecache block (if it is needed)

  @param block                   block where to set rec_lsn
  @param first_REDO_LSN_for_page the LSN to set
*/

static inline void pagecache_set_block_rec_lsn(PAGECACHE_BLOCK_LINK *block,
                                               LSN first_REDO_LSN_for_page)
{
  if (block->rec_lsn != LSN_MAX)
  {
    /* already set: the stored LSN must not be after the proposed one */
    DBUG_ASSERT(cmp_translog_addr(block->rec_lsn,
                                  first_REDO_LSN_for_page) <= 0);
    return;
  }
  /* LSN_MAX means "not yet set": remember the LSN */
  block->rec_lsn= first_REDO_LSN_for_page;
}


662 663 664 665
/*
  next_power(value) is 2 at the power of (1+floor(log2(value)));
  e.g. next_power(2)=4, next_power(3)=4.
*/
unknown's avatar
unknown committed
666
static inline uint next_power(uint value)
unknown's avatar
unknown committed
667
{
unknown's avatar
unknown committed
668
  return (uint) my_round_up_to_next_power((uint32) value) << 1;
unknown's avatar
unknown committed
669 670 671 672 673 674 675 676 677 678 679 680 681 682
}


/*
  Initialize a page cache

  SYNOPSIS
    init_pagecache()
    pagecache			pointer to a page cache data structure
    key_cache_block_size	size of blocks to keep cached data
    use_mem                     total memory to use for the key cache
    division_limit		division limit (may be zero)
    age_threshold		age threshold (may be zero)
    block_size                  size of block (should be power of 2)
683 684
    my_read_flags		Flags used for all pread/pwrite calls
			        Usually MY_WME in case of recovery
unknown's avatar
unknown committed
685 686 687 688 689 690 691 692 693 694 695 696 697 698 699

  RETURN VALUE
    number of blocks in the key cache, if successful,
    0 - otherwise.

  NOTES.
    if pagecache->inited != 0 we assume that the key cache
    is already initialized.  This is for now used by myisamchk, but shouldn't
    be something that a program should rely on!

    It's assumed that no two threads call this function simultaneously
    referring to the same key cache handle.

*/

700 701
ulong init_pagecache(PAGECACHE *pagecache, size_t use_mem,
                     uint division_limit, uint age_threshold,
702
                     uint block_size, myf my_readwrite_flags)
unknown's avatar
unknown committed
703
{
704
  ulong blocks, hash_links, length;
unknown's avatar
unknown committed
705
  int error;
706
  DBUG_ENTER("init_pagecache");
unknown's avatar
unknown committed
707 708 709 710 711 712 713 714 715 716 717 718 719 720
  DBUG_ASSERT(block_size >= 512);

  PAGECACHE_DEBUG_OPEN;
  if (pagecache->inited && pagecache->disk_blocks > 0)
  {
    DBUG_PRINT("warning",("key cache already in use"));
    DBUG_RETURN(0);
  }

  pagecache->global_cache_w_requests= pagecache->global_cache_r_requests= 0;
  pagecache->global_cache_read= pagecache->global_cache_write= 0;
  pagecache->disk_blocks= -1;
  if (! pagecache->inited)
  {
unknown's avatar
unknown committed
721 722 723 724 725 726
    if (pthread_mutex_init(&pagecache->cache_lock, MY_MUTEX_INIT_FAST) ||
        hash_init(&pagecache->files_in_flush, &my_charset_bin, 32,
                  offsetof(struct st_file_in_flush, file),
                  sizeof(((struct st_file_in_flush *)NULL)->file),
                  NULL, NULL, 0))
      goto err;
unknown's avatar
unknown committed
727 728 729 730 731 732 733 734
    pagecache->inited= 1;
    pagecache->in_init= 0;
    pagecache->resize_queue.last_thread= NULL;
  }

  pagecache->mem_size= use_mem;
  pagecache->block_size= block_size;
  pagecache->shift= my_bit_log2(block_size);
735
  pagecache->readwrite_flags= my_readwrite_flags | MY_NABP | MY_WAIT_IF_FULL;
736
  pagecache->org_readwrite_flags= pagecache->readwrite_flags;
737
  DBUG_PRINT("info", ("block_size: %u", block_size));
738
  DBUG_ASSERT(((uint)(1 << pagecache->shift)) == block_size);
unknown's avatar
unknown committed
739

740 741 742 743
  blocks= (ulong) (use_mem / (sizeof(PAGECACHE_BLOCK_LINK) +
                              2 * sizeof(PAGECACHE_HASH_LINK) +
                              sizeof(PAGECACHE_HASH_LINK*) *
                              5/4 + block_size));
unknown's avatar
unknown committed
744 745 746 747
  /*
    We need to support page cache with just one block to be able to do
    scanning of rows-in-block files
  */
748
  for ( ; ; )
unknown's avatar
unknown committed
749
  {
750
    if (blocks < 8)
unknown's avatar
unknown committed
751
    {
752 753 754 755 756 757 758 759
      my_errno= ENOMEM;
      goto err;
    }
    /* Set my_hash_entries to the next bigger 2 power */
    if ((pagecache->hash_entries= next_power(blocks)) <
        (blocks) * 5/4)
      pagecache->hash_entries<<= 1;
    hash_links= 2 * blocks;
unknown's avatar
unknown committed
760
#if defined(MAX_THREADS)
761 762
    if (hash_links < MAX_THREADS + blocks - 1)
      hash_links= MAX_THREADS + blocks - 1;
unknown's avatar
unknown committed
763
#endif
764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783
    while ((length= (ALIGN_SIZE(blocks * sizeof(PAGECACHE_BLOCK_LINK)) +
                     ALIGN_SIZE(hash_links * sizeof(PAGECACHE_HASH_LINK)) +
                     ALIGN_SIZE(sizeof(PAGECACHE_HASH_LINK*) *
                                pagecache->hash_entries))) +
           (blocks << pagecache->shift) > use_mem)
      blocks--;
    /* Allocate memory for cache page buffers */
    if ((pagecache->block_mem=
         my_large_malloc((ulong) blocks * pagecache->block_size,
                         MYF(MY_WME))))
    {
      /*
        Allocate memory for blocks, hash_links and hash entries;
        For each block 2 hash links are allocated
      */
      if ((pagecache->block_root=
           (PAGECACHE_BLOCK_LINK*) my_malloc((size_t) length, MYF(0))))
        break;
      my_large_free(pagecache->block_mem, MYF(0));
      pagecache->block_mem= 0;
unknown's avatar
unknown committed
784
    }
785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817
    blocks= blocks / 4*3;
  }
  pagecache->blocks_unused= blocks;
  pagecache->disk_blocks= (long) blocks;
  pagecache->hash_links= hash_links;
  pagecache->hash_root=
    (PAGECACHE_HASH_LINK**) ((char*) pagecache->block_root +
                             ALIGN_SIZE(blocks*sizeof(PAGECACHE_BLOCK_LINK)));
  pagecache->hash_link_root=
    (PAGECACHE_HASH_LINK*) ((char*) pagecache->hash_root +
                            ALIGN_SIZE((sizeof(PAGECACHE_HASH_LINK*) *
                                        pagecache->hash_entries)));
  bzero((uchar*) pagecache->block_root,
        pagecache->disk_blocks * sizeof(PAGECACHE_BLOCK_LINK));
  bzero((uchar*) pagecache->hash_root,
        pagecache->hash_entries * sizeof(PAGECACHE_HASH_LINK*));
  bzero((uchar*) pagecache->hash_link_root,
        pagecache->hash_links * sizeof(PAGECACHE_HASH_LINK));
  pagecache->hash_links_used= 0;
  pagecache->free_hash_list= NULL;
  pagecache->blocks_used= pagecache->blocks_changed= 0;

  pagecache->global_blocks_changed= 0;
  pagecache->blocks_available=0;		/* For debugging */

  /* The LRU chain is empty after initialization */
  pagecache->used_last= NULL;
  pagecache->used_ins= NULL;
  pagecache->free_block_list= NULL;
  pagecache->time= 0;
  pagecache->warm_blocks= 0;
  pagecache->min_warm_blocks= (division_limit ?
                               blocks * division_limit / 100 + 1 :
818
                               blocks);
819 820 821
  pagecache->age_threshold= (age_threshold ?
                             blocks * age_threshold / 100 :
                             blocks);
unknown's avatar
unknown committed
822

823 824 825
  pagecache->cnt_for_resize_op= 0;
  pagecache->resize_in_flush= 0;
  pagecache->can_be_used= 1;
unknown's avatar
unknown committed
826

827 828 829 830
  pagecache->waiting_for_hash_link.last_thread= NULL;
  pagecache->waiting_for_block.last_thread= NULL;
  DBUG_PRINT("exit",
             ("disk_blocks: %ld  block_root: 0x%lx  hash_entries: %ld\
831
 hash_root: 0x%lx  hash_links: %ld  hash_link_root: 0x%lx",
832 833 834 835 836 837 838 839 840
              pagecache->disk_blocks, (long) pagecache->block_root,
              pagecache->hash_entries, (long) pagecache->hash_root,
              pagecache->hash_links, (long) pagecache->hash_link_root));
  bzero((uchar*) pagecache->changed_blocks,
        sizeof(pagecache->changed_blocks[0]) *
        PAGECACHE_CHANGED_BLOCKS_HASH);
  bzero((uchar*) pagecache->file_blocks,
        sizeof(pagecache->file_blocks[0]) *
        PAGECACHE_CHANGED_BLOCKS_HASH);
unknown's avatar
unknown committed
841 842

  pagecache->blocks= pagecache->disk_blocks > 0 ? pagecache->disk_blocks : 0;
843
  DBUG_RETURN((ulong) pagecache->disk_blocks);
unknown's avatar
unknown committed
844 845 846 847 848 849 850

err:
  error= my_errno;
  pagecache->disk_blocks= 0;
  pagecache->blocks=  0;
  if (pagecache->block_mem)
  {
unknown's avatar
unknown committed
851
    my_large_free((uchar*) pagecache->block_mem, MYF(0));
unknown's avatar
unknown committed
852 853 854 855
    pagecache->block_mem= NULL;
  }
  if (pagecache->block_root)
  {
unknown's avatar
unknown committed
856
    my_free((uchar*) pagecache->block_root, MYF(0));
unknown's avatar
unknown committed
857 858 859 860 861 862 863 864
    pagecache->block_root= NULL;
  }
  my_errno= error;
  pagecache->can_be_used= 0;
  DBUG_RETURN(0);
}


unknown's avatar
unknown committed
865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886
/*
  Flush all blocks in the key cache to disk

  SYNOPSIS
    flush_all_key_blocks()
    pagecache            pointer to a page cache data structure

  RETURN VALUE
    0   pagecache->blocks_changed has dropped to zero
    1   flush_pagecache_blocks_int() reported an error, or changed blocks
        remain but the LRU chain offers no block whose file could be flushed

  NOTES.
    Each pass walks the LRU chain starting at used_last->next_used, takes
    the first block that has a hash link and flushes the file of that block
    with FLUSH_RELEASE. Passes are repeated while blocks_changed is positive.

    The LRU chain can be empty (used_last == NULL, see unlink_block()) while
    blocks_changed is still positive; in that case, and when a complete pass
    finds no block with a hash link, the function gives up with an error
    instead of dereferencing NULL or looping forever.
*/

#ifdef NOT_USED
static int flush_all_key_blocks(PAGECACHE *pagecache)
{
#if defined(PAGECACHE_DEBUG)
  uint cnt=0;
#endif
  while (pagecache->blocks_changed > 0)
  {
    PAGECACHE_BLOCK_LINK *block= pagecache->used_last;
    my_bool flushed= 0;

    /* Nothing is linked into the LRU chain, so nothing can be picked */
    if (!block)
      return 1;

    do
    {
      block= block->next_used;
      if (block->hash_link)
      {
#if defined(PAGECACHE_DEBUG)
        cnt++;
        KEYCACHE_DBUG_ASSERT(cnt <= pagecache->blocks_used);
#endif
        if (flush_pagecache_blocks_int(pagecache, &block->hash_link->file,
                                       FLUSH_RELEASE, NULL, NULL))
          return 1;
        flushed= 1;
        break;
      }
    } while (block != pagecache->used_last);

    /* A whole pass without a flush would repeat identically: stop here */
    if (!flushed)
      return 1;
  }
  return 0;
}
#endif /* NOT_USED */

unknown's avatar
unknown committed
899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925
/*
  Resize a key cache

  SYNOPSIS
    resize_pagecache()
    pagecache                   pointer to a page cache data structure
    use_mem			total memory to use for the new key cache
    division_limit		new division limit (if not zero)
    age_threshold		new age threshold (if not zero)

  RETURN VALUE
    number of blocks in the key cache, if successful,
    0 - otherwise.

  NOTES.
    The function first compares the memory size parameter
    with the key cache value.

    If they differ the function frees the memory allocated for the
    old key cache blocks by calling the end_pagecache function and
    then rebuilds the key cache with new blocks by calling
    init_pagecache.

    The function starts the operation only when all other threads
    performing operations with the key cache allow it to proceed
    (when cnt_for_resize_op is 0).

unknown's avatar
unknown committed
926 927 928 929 930 931 932
     Before being usable, this function needs:
     - to receive fixes for BUG#17332 "changing key_buffer_size on a running
     server can crash under load" similar to those done to the key cache
     - to have us (Sanja) look at the additional constraints placed on
     resizing, due to the page locking specific to this page cache.
     So we disable it for now.
*/
unknown's avatar
unknown committed
933
#if NOT_USED /* keep disabled until code is fixed see above !! */
934 935 936
ulong resize_pagecache(PAGECACHE *pagecache,
                       size_t use_mem, uint division_limit,
                       uint age_threshold)
{
  ulong new_blocks;
  int flush_failed;
#ifdef THREAD
  struct st_my_thread_var *self;
  WQUEUE *resize_q;
#endif
  DBUG_ENTER("resize_pagecache");

  /* A cache that was never initialized is left alone */
  if (!pagecache->inited)
    DBUG_RETURN(pagecache->disk_blocks);

  /* Same memory size: only the midpoint insertion parameters are updated */
  if (use_mem == pagecache->mem_size)
  {
    change_pagecache_param(pagecache, division_limit, age_threshold);
    DBUG_RETURN(pagecache->disk_blocks);
  }

  pagecache_pthread_mutex_lock(&pagecache->cache_lock);

#ifdef THREAD
  /* Queue up behind resize requests that were registered earlier */
  resize_q= &pagecache->resize_queue;
  self= my_thread_var;
  wqueue_link_into_queue(resize_q, self);

  while (resize_q->last_thread->next != self)
    pagecache_pthread_cond_wait(&self->suspend, &pagecache->cache_lock);
#endif

  pagecache->resize_in_flush= 1;
  flush_failed= flush_all_key_blocks(pagecache);
  pagecache->resize_in_flush= 0;
  /* Whether or not the flush worked, the cache is marked unusable here */
  pagecache->can_be_used= 0;

  if (flush_failed)
  {
    /* TODO: if this happens, we should write a warning in the log file ! */
    new_blocks= 0;
  }
  else
  {
#ifdef THREAD
    /* Sleep until no operation counted in cnt_for_resize_op is left */
    while (pagecache->cnt_for_resize_op)
    {
      KEYCACHE_DBUG_PRINT("resize_pagecache: wait",
                          ("suspend thread %ld", self->id));
      pagecache_pthread_cond_wait(&self->suspend, &pagecache->cache_lock);
    }
#else
    KEYCACHE_DBUG_ASSERT(pagecache->cnt_for_resize_op == 0);
#endif

    end_pagecache(pagecache, 0);			/* Don't free mutex */
    /* The following will work even if use_mem is 0 */
    new_blocks= init_pagecache(pagecache, pagecache->block_size, use_mem,
                               division_limit, age_threshold,
                               pagecache->readwrite_flags);
  }

#ifdef THREAD
  wqueue_unlink_from_queue(resize_q, self);
  /* Signal for the next resize request to proceed if any */
  if (resize_q->last_thread)
  {
    KEYCACHE_DBUG_PRINT("resize_pagecache: signal",
                        ("thread %ld", resize_q->last_thread->next->id));
    pagecache_pthread_cond_signal(&resize_q->last_thread->next->suspend);
  }
#endif
  pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
  DBUG_RETURN(new_blocks);
}
unknown's avatar
unknown committed
1010
#endif /* 0 */
unknown's avatar
unknown committed
1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077


/*
  Register one more operation in pagecache->cnt_for_resize_op, the counter
  that resize_pagecache() waits on before it rebuilds the cache
*/
static inline void inc_counter_for_resize_op(PAGECACHE *pagecache)
{
  ++pagecache->cnt_for_resize_op;
}


/*
  Unregister one operation from pagecache->cnt_for_resize_op.
  With THREAD defined, when the counter reaches zero and the resize queue
  is not empty, the first thread of that queue is signalled.
*/
static inline void dec_counter_for_resize_op(PAGECACHE *pagecache)
{
  --pagecache->cnt_for_resize_op;
#ifdef THREAD
  if (pagecache->cnt_for_resize_op == 0)
  {
    struct st_my_thread_var *tail= pagecache->resize_queue.last_thread;
    if (tail)
    {
      /* The queue is circular: the first waiter follows the last one */
      KEYCACHE_DBUG_PRINT("dec_counter_for_resize_op: signal",
                          ("thread %ld", tail->next->id));
      pagecache_pthread_cond_signal(&tail->next->suspend);
    }
  }
#endif
}

/*
  Update the midpoint insertion parameters of a page cache

  SYNOPSIS
    change_pagecache_param()
    pagecache                   pointer to a page cache data structure
    division_limit              new division limit (if not zero)
    age_threshold               new age threshold (if not zero)

  RETURN VALUE
    none

  NOTES.
    Both values are percentages of pagecache->disk_blocks. A zero argument
    leaves the corresponding setting (min_warm_blocks or age_threshold)
    as it is. The update is done under pagecache->cache_lock.
*/

void change_pagecache_param(PAGECACHE *pagecache, uint division_limit,
                            uint age_threshold)
{
  DBUG_ENTER("change_pagecache_param");

  pagecache_pthread_mutex_lock(&pagecache->cache_lock);
  if (age_threshold != 0)
  {
    pagecache->age_threshold= (pagecache->disk_blocks *
                               age_threshold / 100);
  }
  if (division_limit != 0)
  {
    pagecache->min_warm_blocks= (pagecache->disk_blocks *
                                 division_limit / 100 + 1);
  }
  pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
  DBUG_VOID_RETURN;
}


/*
  Release the memory of a page cache. Pages are NOT flushed to disk.

  SYNOPSIS
    end_pagecache()
    pagecache           page cache handle
    cleanup             if set, also free the files_in_flush hash and the
                        cache mutex and mark the cache as not initialized

  RETURN VALUE
    none

  NOTES.
    Does nothing when the cache is not initialized. Block memory is freed
    only when disk_blocks is positive; disk_blocks is then set to -1.
*/

void end_pagecache(PAGECACHE *pagecache, my_bool cleanup)
{
  DBUG_ENTER("end_pagecache");
  DBUG_PRINT("enter", ("key_cache: 0x%lx", (long) pagecache));

  if (!pagecache->inited)
    DBUG_VOID_RETURN;

  if (pagecache->disk_blocks > 0)
  {
    if (pagecache->block_mem)
    {
      /* Page buffers first, then the block/hash bookkeeping area */
      my_large_free((uchar*) pagecache->block_mem, MYF(0));
      my_free((uchar*) pagecache->block_root, MYF(0));
      pagecache->block_mem= NULL;
      pagecache->block_root= NULL;
    }
    /* Reset blocks_changed to be safe if flush_all_key_blocks is called */
    pagecache->blocks_changed= 0;
    pagecache->disk_blocks= -1;
  }

  DBUG_PRINT("status", ("used: %lu  changed: %lu  w_requests: %lu  "
                        "writes: %lu  r_requests: %lu  reads: %lu",
                        pagecache->blocks_used,
                        pagecache->global_blocks_changed,
                        (ulong) pagecache->global_cache_w_requests,
                        (ulong) pagecache->global_cache_write,
                        (ulong) pagecache->global_cache_r_requests,
                        (ulong) pagecache->global_cache_read));

  if (!cleanup)
    DBUG_VOID_RETURN;

  hash_free(&pagecache->files_in_flush);
  pthread_mutex_destroy(&pagecache->cache_lock);
  pagecache->can_be_used= 0;
  pagecache->inited= 0;
  PAGECACHE_DEBUG_CLOSE;
  DBUG_VOID_RETURN;
} /* end_pagecache */


/*
  Take a block out of the dirty/clean chain it is currently linked into.
  The block's own next_changed/prev_changed fields are left untouched.
*/

static inline void unlink_changed(PAGECACHE_BLOCK_LINK *block)
{
  PAGECACHE_BLOCK_LINK *successor= block->next_changed;
  PAGECACHE_BLOCK_LINK **back_ref= block->prev_changed;

  if (successor != NULL)
    successor->prev_changed= back_ref;
  /* Whatever pointed at this block now points at its successor */
  *back_ref= successor;
}


/*
  Insert a block at the head of a dirty/clean chain.
  phead is the address of the chain's head pointer.
*/

static inline void link_changed(PAGECACHE_BLOCK_LINK *block,
                                PAGECACHE_BLOCK_LINK **phead)
{
  PAGECACHE_BLOCK_LINK *old_head= *phead;

  block->next_changed= old_head;
  block->prev_changed= phead;
  if (old_head != NULL)
    old_head->prev_changed= &block->next_changed;
  *phead= block;
}


/*
  Unlink a block from the chain of dirty/clean blocks, if it's asked for,
  and link it to the chain of clean blocks for the specified file
*/

static void link_to_file_list(PAGECACHE *pagecache,
                              PAGECACHE_BLOCK_LINK *block,
1164
                              PAGECACHE_FILE *file, my_bool unlink_flag)
unknown's avatar
unknown committed
1165
{
1166
  if (unlink_flag)
unknown's avatar
unknown committed
1167 1168
    unlink_changed(block);
  link_changed(block, &pagecache->file_blocks[FILE_HASH(*file)]);
unknown's avatar
unknown committed
1169
  if (block->status & PCBLOCK_CHANGED)
unknown's avatar
unknown committed
1170
  {
unknown's avatar
unknown committed
1171
    block->status&= ~PCBLOCK_CHANGED;
unknown's avatar
unknown committed
1172
    block->rec_lsn= LSN_MAX;
unknown's avatar
unknown committed
1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189
    pagecache->blocks_changed--;
    pagecache->global_blocks_changed--;
  }
}


/*
  Move a block from the chain it is currently in to the chain of dirty
  blocks selected by the file of its hash link, set PCBLOCK_CHANGED and
  add the block to both changed-block counters.
*/

static inline void link_to_changed_list(PAGECACHE *pagecache,
                                        PAGECACHE_BLOCK_LINK *block)
{
  PAGECACHE_BLOCK_LINK **dirty_chain;

  unlink_changed(block);
  dirty_chain=
    &pagecache->changed_blocks[FILE_HASH(block->hash_link->file)];
  link_changed(block, dirty_chain);

  block->status|= PCBLOCK_CHANGED;
  pagecache->global_blocks_changed++;
  pagecache->blocks_changed++;
}


/*
  Link a block to the LRU chain at the beginning or at the end of
  one of two parts.

  SYNOPSIS
    link_block()
      pagecache            pointer to a page cache data structure
      block               pointer to the block to link to the LRU chain
      hot                 <-> to link the block into the hot subchain
      at_end              <-> to link the block at the end of the subchain

  RETURN VALUE
    none

  NOTES.
1211
    The LRU chain is represented by a circular list of block structures.
unknown's avatar
unknown committed
1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235
    The list is double-linked of the type (**prev,*next) type.
    The LRU chain is divided into two parts - hot and warm.
    There are two pointers to access the last blocks of these two
    parts. The beginning of the warm part follows right after the
    end of the hot part.
    Only blocks of the warm part can be used for replacement.
    The first block from the beginning of this subchain is always
    taken for eviction (pagecache->used_last->next_used)

    LRU chain:       +------+   H O T    +------+
                +----| end  |----...<----| beg  |----+
                |    +------+last        +------+    |
                v<-link in latest hot (new end)      |
                |     link in latest warm (new end)->^
                |    +------+  W A R M   +------+    |
                +----| beg  |---->...----| end  |----+
                     +------+            +------+ins
                  first for eviction
*/

static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block,
                       my_bool hot, my_bool at_end)
{
  PAGECACHE_BLOCK_LINK *anchor;
  PAGECACHE_BLOCK_LINK **anchor_ref;

  PCBLOCK_INFO(block);
  KEYCACHE_DBUG_ASSERT(! (block->hash_link && block->hash_link->requests));
#ifdef THREAD
  if (!hot && pagecache->waiting_for_block.last_thread)
  {
    /*
      Threads are queued waiting for a block: instead of linking the block
      into the warm sub-chain, hand it over to them.
    */
    struct st_my_thread_var *tail= pagecache->waiting_for_block.last_thread;
    struct st_my_thread_var *waiter= tail->next;
    PAGECACHE_HASH_LINK *wanted= (PAGECACHE_HASH_LINK *) waiter->opt_info;

    for (;;)
    {
      /* Fetch the successor first: the waiter may get unlinked below */
      struct st_my_thread_var *following= waiter->next;

      /*
        Wake up every thread that asks for the same page as the first
        thread in the queue; each of them counts as a request
      */
      if ((PAGECACHE_HASH_LINK *) waiter->opt_info == wanted)
      {
        KEYCACHE_DBUG_PRINT("link_block: signal", ("thread: %ld", waiter->id));
        pagecache_pthread_cond_signal(&waiter->suspend);
        wqueue_unlink_from_queue(&pagecache->waiting_for_block, waiter);
        block->requests++;
      }
      if (waiter == tail)
        break;
      waiter= following;
    }
    wanted->block= block;
    KEYCACHE_THREAD_TRACE("link_block: after signaling");
#if defined(PAGECACHE_DEBUG)
    KEYCACHE_DBUG_PRINT("link_block",
        ("linked,unlinked block: %u  status: %x  #requests: %u  #available: %u",
         PCBLOCK_NUMBER(pagecache, block), block->status,
         block->requests, pagecache->blocks_available));
#endif
    return;
  }
#else /* THREAD */
  KEYCACHE_DBUG_ASSERT(! (!hot && pagecache->waiting_for_block.last_thread));
  /* Condition not transformed using DeMorgan, to keep the text identical */
#endif /* THREAD */

  /* Hot blocks go after used_ins, all others after used_last */
  if (hot)
    anchor_ref= &pagecache->used_ins;
  else
    anchor_ref= &pagecache->used_last;
  anchor= *anchor_ref;

  if (anchor == NULL)
  {
    /* The LRU chain is empty: the block becomes a ring of one */
    block->next_used= block;
    block->prev_used= &block->next_used;
    pagecache->used_ins= block;
    pagecache->used_last= block;
  }
  else
  {
    PAGECACHE_BLOCK_LINK *after= anchor->next_used;

    /* Splice the block in between anchor and its successor */
    after->prev_used= &block->next_used;
    block->next_used= after;
    block->prev_used= &anchor->next_used;
    anchor->next_used= block;
    if (at_end)
      *anchor_ref= block;
  }
  KEYCACHE_THREAD_TRACE("link_block");
#if defined(PAGECACHE_DEBUG)
  pagecache->blocks_available++;
  KEYCACHE_DBUG_PRINT("link_block",
                      ("linked block: %u:%1u  status: %x  #requests: %u  #available: %u",
                       PCBLOCK_NUMBER(pagecache, block), at_end, block->status,
                       block->requests, pagecache->blocks_available));
  KEYCACHE_DBUG_ASSERT((ulong) pagecache->blocks_available <=
                       pagecache->blocks_used);
#endif
}


/*
  Unlink a block from the LRU chain

  SYNOPSIS
    unlink_block()
      pagecache            pointer to a page cache data structure
      block               pointer to the block to unlink from the LRU chain

  RETURN VALUE
    none

  NOTES.
    See NOTES for link_block
*/

static void unlink_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block)
{
  DBUG_ENTER("unlink_block");
  DBUG_PRINT("unlink_block", ("unlink 0x%lx", (ulong)block));

  if (block->next_used != block)
  {
    PAGECACHE_BLOCK_LINK *predecessor;

    /* Close the ring around the block */
    block->next_used->prev_used= block->prev_used;
    *block->prev_used= block->next_used;

    /*
      prev_used addresses the next_used member of the preceding block;
      if the removed block was an insertion point, that block takes over
    */
    predecessor= STRUCT_PTR(PAGECACHE_BLOCK_LINK, next_used,
                            block->prev_used);
    if (pagecache->used_last == block)
      pagecache->used_last= predecessor;
    if (pagecache->used_ins == block)
      pagecache->used_ins= predecessor;
  }
  else
  {
    /* The block was the only member: the chain becomes empty */
    pagecache->used_ins= NULL;
    pagecache->used_last= NULL;
  }
  block->next_used= NULL;

  KEYCACHE_THREAD_TRACE("unlink_block");
#if defined(PAGECACHE_DEBUG)
  KEYCACHE_DBUG_ASSERT(pagecache->blocks_available != 0);
  pagecache->blocks_available--;
  KEYCACHE_DBUG_PRINT("unlink_block",
                      ("unlinked block: 0x%lx (%u)  status: %x   #requests: %u  #available: %u",
                       (ulong)block, PCBLOCK_NUMBER(pagecache, block),
                       block->status,
                       block->requests, pagecache->blocks_available));
  PCBLOCK_INFO(block);
#endif
  DBUG_VOID_RETURN;
}


/*
  Register requests for a block
unknown's avatar
unknown committed
1366 1367 1368 1369 1370 1371 1372 1373 1374 1375

  SYNOPSIS
    reg_requests()
    pagecache            this page cache reference
    block                the block we request reference
    count                how many requests we register (it is 1 everywhere)

  NOTE
  Registration of request means we are going to use this block so we exclude
  it from the LRU if it is first request
unknown's avatar
unknown committed
1376 1377 1378 1379 1380
*/
static void reg_requests(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block,
                         int count)
{
  DBUG_ENTER("reg_requests");
  DBUG_PRINT("enter", ("block: 0x%lx (%u)  status: %x  reqs: %u",
                       (ulong)block, PCBLOCK_NUMBER(pagecache, block),
                       block->status, block->requests));
  PCBLOCK_INFO(block);

  /* A block without requests sits in the LRU chain: take it out first */
  if (block->requests == 0)
  {
    unlink_block(pagecache, block);
  }
  block->requests= block->requests + count;
  DBUG_VOID_RETURN;
}


/*
  Unregister request for a block
  linking it to the LRU chain if it's the last request

  SYNOPSIS
    unreg_request()
    pagecache            pointer to a page cache data structure
    block               pointer to the block to link to the LRU chain
    at_end              <-> to link the block at the end of the LRU chain

  RETURN VALUE
    none

  NOTES.
    Every linking to the LRU chain decrements by one a special block
    counter (if it's positive). If the at_end parameter is TRUE the block is
    added either at the end of warm sub-chain or at the end of hot sub-chain.
    It is added to the hot subchain if its counter is zero and number of
    blocks in warm sub-chain is not less than some low limit (determined by
    the division_limit parameter). Otherwise the block is added to the warm
    sub-chain. If the at_end parameter is FALSE the block is always added
    at beginning of the warm sub-chain.
    Thus a warm block can be promoted to the hot sub-chain when its counter
    becomes zero for the first time.
    At the same time  the block at the very beginning of the hot subchain
    might be moved to the beginning of the warm subchain if it stays untouched
    for a too long time (this time is determined by parameter age_threshold).
*/

static void unreg_request(PAGECACHE *pagecache,
                          PAGECACHE_BLOCK_LINK *block, int at_end)
{
  my_bool promote;
  PAGECACHE_BLOCK_LINK *aged;
  DBUG_ENTER("unreg_request");
  DBUG_PRINT("enter", ("block 0x%lx (%u)  status: %x  reqs: %u",
                       (ulong)block, PCBLOCK_NUMBER(pagecache, block),
                       block->status, block->requests));
  PCBLOCK_INFO(block);
  DBUG_ASSERT(block->requests > 0);

  /* Other requests are still registered: the block stays out of the LRU */
  if (--block->requests)
    DBUG_VOID_RETURN;

  if (block->hits_left)
    block->hits_left--;

  /*
    The block goes to the hot sub-chain only if it has no hits left, is
    linked at the end, and there are more warm blocks than the minimum
  */
  promote= (block->hits_left == 0 && at_end &&
            pagecache->warm_blocks > pagecache->min_warm_blocks);
  if (promote)
  {
    if (block->temperature == PCBLOCK_WARM)
      pagecache->warm_blocks--;
    block->temperature= PCBLOCK_HOT;
    KEYCACHE_DBUG_PRINT("unreg_request", ("#warm_blocks: %lu",
                         pagecache->warm_blocks));
  }
  link_block(pagecache, block, promote, (my_bool)at_end);
  block->last_hit_time= pagecache->time;
  pagecache->time++;

  /* Check if we should link a hot block to the warm block */
  aged= pagecache->used_ins;
  if (aged &&
      pagecache->time - aged->last_hit_time > pagecache->age_threshold)
  {
    unlink_block(pagecache, aged);
    link_block(pagecache, aged, 0, 0);
    if (aged->temperature != PCBLOCK_WARM)
    {
      pagecache->warm_blocks++;
      aged->temperature= PCBLOCK_WARM;
    }
    KEYCACHE_DBUG_PRINT("unreg_request", ("#warm_blocks: %lu",
                         pagecache->warm_blocks));
  }
  DBUG_VOID_RETURN;
}

/*
  Drop one request from the hash link of the block's page.
  With THREAD defined, when this was the last request and the block has a
  condition variable registered (see wait_for_readers()), signal it.
*/

static inline void remove_reader(PAGECACHE_BLOCK_LINK *block)
{
  DBUG_ENTER("remove_reader");
  PCBLOCK_INFO(block);
  DBUG_ASSERT(block->hash_link->requests > 0);

  block->hash_link->requests--;
#ifdef THREAD
  if (block->hash_link->requests == 0 && block->condvar)
  {
    pagecache_pthread_cond_signal(block->condvar);
  }
#endif
  DBUG_VOID_RETURN;
}


/*
  Wait until the last reader of the page in block
  signals on its termination
*/

unknown's avatar
unknown committed
1493 1494
static inline void wait_for_readers(PAGECACHE *pagecache
                                    __attribute__((unused)),
unknown's avatar
unknown committed
1495 1496 1497 1498 1499 1500 1501
                                    PAGECACHE_BLOCK_LINK *block)
{
#ifdef THREAD
  struct st_my_thread_var *thread= my_thread_var;
  while (block->hash_link->requests)
  {
    KEYCACHE_DBUG_PRINT("wait_for_readers: wait",
1502
                        ("suspend thread: %ld  block: %u",
unknown's avatar
unknown committed
1503
                         thread->id, PCBLOCK_NUMBER(pagecache, block)));
unknown's avatar
unknown committed
1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569
    block->condvar= &thread->suspend;
    pagecache_pthread_cond_wait(&thread->suspend, &pagecache->cache_lock);
    block->condvar= NULL;
  }
#else
  KEYCACHE_DBUG_ASSERT(block->hash_link->requests == 0);
#endif
}


/*
  Insert a hash link at the front of a hash table bucket.
  start is the address of the bucket's head pointer.
*/

static inline void link_hash(PAGECACHE_HASH_LINK **start,
                             PAGECACHE_HASH_LINK *hash_link)
{
  PAGECACHE_HASH_LINK *first= *start;

  if (first != NULL)
    first->prev= &hash_link->next;
  hash_link->prev= start;
  hash_link->next= first;
  *start= hash_link;
}


/*
  Remove a hash link from the hash table.

  With THREAD defined, if threads are queued in waiting_for_hash_link the
  link is not freed: it is re-keyed to the page requested by the first
  waiting thread, all waiters for that same page are signalled and removed
  from the queue, and the link is put back into the hash table.
  Otherwise the link is pushed onto pagecache->free_hash_list.
*/

static void unlink_hash(PAGECACHE *pagecache, PAGECACHE_HASH_LINK *hash_link)
{
  KEYCACHE_DBUG_PRINT("unlink_hash", ("fd: %u  pos_ %lu  #requests=%u",
      (uint) hash_link->file.file, (ulong) hash_link->pageno,
      hash_link->requests));
  KEYCACHE_DBUG_ASSERT(hash_link->requests == 0);

  /* Detach the link from its bucket and from its block */
  *hash_link->prev= hash_link->next;
  if (hash_link->next)
    hash_link->next->prev= hash_link->prev;
  hash_link->block= NULL;
#ifdef THREAD
  if (pagecache->waiting_for_hash_link.last_thread)
  {
    /* Signal that a free hash link has appeared */
    struct st_my_thread_var *tail=
      pagecache->waiting_for_hash_link.last_thread;
    struct st_my_thread_var *waiter= tail->next;
    PAGECACHE_PAGE *requested= (PAGECACHE_PAGE *) (waiter->opt_info);

    hash_link->file= requested->file;
    hash_link->pageno= requested->pageno;
    for (;;)
    {
      /* Fetch the successor first: the waiter may get unlinked below */
      struct st_my_thread_var *following= waiter->next;
      PAGECACHE_PAGE *page= (PAGECACHE_PAGE *) waiter->opt_info;

      /*
        Wake up every thread that asks for the same page as the first
        thread in the queue
      */
      if (page->file.file == hash_link->file.file &&
          page->pageno == hash_link->pageno)
      {
        KEYCACHE_DBUG_PRINT("unlink_hash: signal", ("thread %ld", waiter->id));
        pagecache_pthread_cond_signal(&waiter->suspend);
        wqueue_unlink_from_queue(&pagecache->waiting_for_hash_link, waiter);
      }
      if (waiter == tail)
        break;
      waiter= following;
    }
    link_hash(&pagecache->hash_root[PAGECACHE_HASH(pagecache,
                                                   hash_link->file,
                                                   hash_link->pageno)],
              hash_link);
    return;
  }
#else /* THREAD */
  KEYCACHE_DBUG_ASSERT(! (pagecache->waiting_for_hash_link.last_thread));
#endif /* THREAD */
  /* Nobody is waiting: return the link to the free list */
  hash_link->next= pagecache->free_hash_list;
  pagecache->free_hash_list= hash_link;
}
unknown's avatar
unknown committed
1586 1587


unknown's avatar
unknown committed
1588
/*
unknown's avatar
unknown committed
1589 1590
  Get the hash link for the page if it is in the cache (do not put the
  page in the cache if it is absent there)
unknown's avatar
unknown committed
1591 1592 1593 1594 1595 1596

  SYNOPSIS
    get_present_hash_link()
    pagecache            Pagecache reference
    file                 file ID
    pageno               page number in the file
1597
    start                where to put pointer to found hash bucket (for
unknown's avatar
unknown committed
1598 1599 1600 1601
                         direct referring it)

  RETURN
    found hashlink pointer
unknown's avatar
unknown committed
1602 1603 1604 1605
*/

static PAGECACHE_HASH_LINK *get_present_hash_link(PAGECACHE *pagecache,
                                                  PAGECACHE_FILE *file,
1606
                                                  pgcache_page_no_t pageno,
unknown's avatar
unknown committed
1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648
                                                  PAGECACHE_HASH_LINK ***start)
{
  reg1 PAGECACHE_HASH_LINK *hash_link;
#if defined(PAGECACHE_DEBUG)
  int cnt;
#endif
  DBUG_ENTER("get_present_hash_link");

  KEYCACHE_DBUG_PRINT("get_present_hash_link", ("fd: %u  pos: %lu",
                      (uint) file->file, (ulong) pageno));

  /*
     Find the bucket in the hash table for the pair (file, pageno);
     start contains the head of the bucket list,
     hash_link points to the first member of the list
  */
  hash_link= *(*start= &pagecache->hash_root[PAGECACHE_HASH(pagecache,
                                                            *file, pageno)]);
#if defined(PAGECACHE_DEBUG)
  cnt= 0;
#endif
  /* Look for an element for the pair (file, pageno) in the bucket chain */
  while (hash_link &&
         (hash_link->pageno != pageno ||
          hash_link->file.file != file->file))
  {
    hash_link= hash_link->next;
#if defined(PAGECACHE_DEBUG)
    cnt++;
    if (! (cnt <= pagecache->hash_links_used))
    {
      int i;
      for (i=0, hash_link= **start ;
           i < cnt ; i++, hash_link= hash_link->next)
      {
        KEYCACHE_DBUG_PRINT("get_present_hash_link", ("fd: %u  pos: %lu",
            (uint) hash_link->file.file, (ulong) hash_link->pageno));
      }
    }
    KEYCACHE_DBUG_ASSERT(cnt <= pagecache->hash_links_used);
#endif
  }
1649 1650 1651 1652 1653
  if (hash_link)
  {
    /* Register the request for the page */
    hash_link->requests++;
  }
unknown's avatar
unknown committed
1654 1655 1656 1657
  /*
    As soon as the caller will release the page cache's lock, "hash_link"
    will be potentially obsolete (unusable) information.
  */
unknown's avatar
unknown committed
1658 1659 1660 1661 1662 1663 1664 1665 1666 1667
  DBUG_RETURN(hash_link);
}


/*
  Get the hash link for a page
*/

static PAGECACHE_HASH_LINK *get_hash_link(PAGECACHE *pagecache,
                                          PAGECACHE_FILE *file,
1668
                                          pgcache_page_no_t pageno)
unknown's avatar
unknown committed
1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679
{
  reg1 PAGECACHE_HASH_LINK *hash_link;
  PAGECACHE_HASH_LINK **start;

  KEYCACHE_DBUG_PRINT("get_hash_link", ("fd: %u  pos: %lu",
                      (uint) file->file, (ulong) pageno));

restart:
  /* try to find the page in the cache */
  hash_link= get_present_hash_link(pagecache, file, pageno,
                                   &start);
1680
  if (!hash_link)
unknown's avatar
unknown committed
1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696
  {
    /* There is no hash link in the hash table for the pair (file, pageno) */
    if (pagecache->free_hash_list)
    {
      hash_link= pagecache->free_hash_list;
      pagecache->free_hash_list= hash_link->next;
    }
    else if (pagecache->hash_links_used < pagecache->hash_links)
    {
      hash_link= &pagecache->hash_link_root[pagecache->hash_links_used++];
    }
    else
    {
#ifdef THREAD
      /* Wait for a free hash link */
      struct st_my_thread_var *thread= my_thread_var;
unknown's avatar
unknown committed
1697
      PAGECACHE_PAGE page;
unknown's avatar
unknown committed
1698 1699 1700 1701
      KEYCACHE_DBUG_PRINT("get_hash_link", ("waiting"));
      page.file= *file;
      page.pageno= pageno;
      thread->opt_info= (void *) &page;
1702
      wqueue_link_into_queue(&pagecache->waiting_for_hash_link, thread);
unknown's avatar
unknown committed
1703 1704 1705 1706 1707 1708 1709 1710
      KEYCACHE_DBUG_PRINT("get_hash_link: wait",
                        ("suspend thread %ld", thread->id));
      pagecache_pthread_cond_wait(&thread->suspend,
                                 &pagecache->cache_lock);
      thread->opt_info= NULL;
#else
      KEYCACHE_DBUG_ASSERT(0);
#endif
1711
      DBUG_PRINT("info", ("restarting..."));
unknown's avatar
unknown committed
1712 1713 1714 1715 1716
      goto restart;
    }
    hash_link->file= *file;
    hash_link->pageno= pageno;
    link_hash(start, hash_link);
1717 1718
    /* Register the request for the page */
    hash_link->requests++;
unknown's avatar
unknown committed
1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731
  }

  return hash_link;
}


/*
  Get a block for the file page requested by a pagecache read/write operation;
  If the page is not in the cache return a free block, if there is none
  return the lru block after saving its buffer if the page is dirty.

  SYNOPSIS

1732
    find_block()
unknown's avatar
unknown committed
1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755
      pagecache            pointer to a page cache data structure
      file                handler for the file to read page from
      pageno              number of the page in the file
      init_hits_left      how initialize the block counter for the page
      wrmode              <-> get for writing
      reg_req             Register request to thye page
      page_st        out  {PAGE_READ,PAGE_TO_BE_READ,PAGE_WAIT_TO_BE_READ}

  RETURN VALUE
    Pointer to the found block if successful, 0 - otherwise

  NOTES.
    For the page from file positioned at pageno the function checks whether
    the page is in the key cache specified by the first parameter.
    If this is the case it immediately returns the block.
    If not, the function first chooses  a block for this page. If there is
    no not used blocks in the key cache yet, the function takes the block
    at the very beginning of the warm sub-chain. It saves the page in that
    block if it's dirty before returning the pointer to it.
    The function returns in the page_st parameter the following values:
      PAGE_READ         - if page already in the block,
      PAGE_TO_BE_READ   - if it is to be read yet by the current thread
      WAIT_TO_BE_READ   - if it is to be read by another thread
unknown's avatar
unknown committed
1756
    If an error occurs THE PCBLOCK_ERROR bit is set in the block status.
unknown's avatar
unknown committed
1757 1758 1759 1760 1761
    It might happen that there are no blocks in LRU chain (in warm part) -
    all blocks  are unlinked for some read/write operations. Then the function
    waits until first of this operations links any block back.
*/

1762 1763 1764 1765 1766 1767 1768
static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache,
                                        PAGECACHE_FILE *file,
                                        pgcache_page_no_t pageno,
                                        int init_hits_left,
                                        my_bool wrmode,
                                        my_bool reg_req,
                                        int *page_st)
unknown's avatar
unknown committed
1769 1770 1771 1772 1773 1774
{
  PAGECACHE_HASH_LINK *hash_link;
  PAGECACHE_BLOCK_LINK *block;
  int error= 0;
  int page_status;

1775 1776
  DBUG_ENTER("find_block");
  KEYCACHE_THREAD_TRACE("find_block:begin");
1777 1778
  DBUG_PRINT("enter", ("fd: %d  pos: %lu  wrmode: %d",
                       file->file, (ulong) pageno, wrmode));
1779 1780 1781
  KEYCACHE_DBUG_PRINT("find_block", ("fd: %d  pos: %lu  wrmode: %d",
                                     file->file, (ulong) pageno,
                                     wrmode));
unknown's avatar
unknown committed
1782 1783
#if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
  DBUG_EXECUTE("check_pagecache",
1784
               test_key_cache(pagecache, "start of find_block", 0););
unknown's avatar
unknown committed
1785 1786 1787 1788 1789 1790 1791 1792
#endif

restart:
  /* Find the hash link for the requested page (file, pageno) */
  hash_link= get_hash_link(pagecache, file, pageno);

  page_status= -1;
  if ((block= hash_link->block) &&
unknown's avatar
unknown committed
1793
      block->hash_link == hash_link && (block->status & PCBLOCK_READ))
unknown's avatar
unknown committed
1794 1795 1796 1797 1798 1799 1800 1801 1802
    page_status= PAGE_READ;

  if (wrmode && pagecache->resize_in_flush)
  {
    /* This is a write request during the flush phase of a resize operation */

    if (page_status != PAGE_READ)
    {
      /* We don't need the page in the cache: we are going to write on disk */
1803
      DBUG_ASSERT(hash_link->requests > 0);
unknown's avatar
unknown committed
1804 1805 1806 1807
      hash_link->requests--;
      unlink_hash(pagecache, hash_link);
      return 0;
    }
unknown's avatar
unknown committed
1808
    if (!(block->status & PCBLOCK_IN_FLUSH))
unknown's avatar
unknown committed
1809
    {
1810
      DBUG_ASSERT(hash_link->requests > 0);
unknown's avatar
unknown committed
1811 1812 1813 1814
      hash_link->requests--;
      /*
        Remove block to invalidate the page in the block buffer
        as we are going to write directly on disk.
unknown's avatar
unknown committed
1815 1816
        Although we have an exclusive lock for the updated key part
        the control can be yielded by the current thread as we might
unknown's avatar
unknown committed
1817 1818 1819
        have unfinished readers of other key parts in the block
        buffer. Still we are guaranteed not to have any readers
        of the key part we are writing into until the block is
unknown's avatar
unknown committed
1820
        removed from the cache as we set the PCBLOCK_REASSIGNED
unknown's avatar
unknown committed
1821 1822 1823 1824 1825
        flag (see the code below that handles reading requests).
      */
      free_block(pagecache, block);
      return 0;
    }
unknown's avatar
unknown committed
1826
    /* Wait until the page is flushed on disk */
1827
    DBUG_ASSERT(hash_link->requests > 0);
unknown's avatar
unknown committed
1828 1829 1830 1831
    hash_link->requests--;
    {
#ifdef THREAD
      struct st_my_thread_var *thread= my_thread_var;
1832
      wqueue_add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
unknown's avatar
unknown committed
1833 1834
      do
      {
1835
        KEYCACHE_DBUG_PRINT("find_block: wait",
unknown's avatar
unknown committed
1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858
                            ("suspend thread %ld", thread->id));
        pagecache_pthread_cond_wait(&thread->suspend,
                                   &pagecache->cache_lock);
      }
      while(thread->next);
#else
      KEYCACHE_DBUG_ASSERT(0);
      /*
        Given the use of "resize_in_flush", it seems impossible
        that this whole branch is ever entered in single-threaded case
        because "(wrmode && pagecache->resize_in_flush)" cannot be true.
        TODO: Check this, and then put the whole branch into the
        "#ifdef THREAD" guard.
      */
#endif
    }
    /* Invalidate page in the block if it has not been done yet */
    if (block->status)
      free_block(pagecache, block);
    return 0;
  }

  if (page_status == PAGE_READ &&
unknown's avatar
unknown committed
1859
      (block->status & (PCBLOCK_IN_SWITCH | PCBLOCK_REASSIGNED)))
unknown's avatar
unknown committed
1860 1861 1862
  {
    /* This is a request for a page to be removed from cache */

1863
    KEYCACHE_DBUG_PRINT("find_block",
1864
                        ("request for old page in block: %u  "
unknown's avatar
unknown committed
1865
                         "wrmode: %d  block->status: %d",
unknown's avatar
unknown committed
1866
                         PCBLOCK_NUMBER(pagecache, block), wrmode,
unknown's avatar
unknown committed
1867 1868 1869 1870 1871
                         block->status));
    /*
       Only reading requests can proceed until the old dirty page is flushed,
       all others are to be suspended, then resubmitted
    */
unknown's avatar
unknown committed
1872
    if (!wrmode && !(block->status & PCBLOCK_REASSIGNED))
unknown's avatar
unknown committed
1873 1874 1875 1876 1877 1878
    {
      if (reg_req)
        reg_requests(pagecache, block, 1);
    }
    else
    {
1879
      DBUG_ASSERT(hash_link->requests > 0);
unknown's avatar
unknown committed
1880
      hash_link->requests--;
1881
      KEYCACHE_DBUG_PRINT("find_block",
unknown's avatar
unknown committed
1882 1883 1884 1885 1886
                          ("request waiting for old page to be saved"));
      {
#ifdef THREAD
        struct st_my_thread_var *thread= my_thread_var;
        /* Put the request into the queue of those waiting for the old page */
1887
        wqueue_add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
unknown's avatar
unknown committed
1888 1889 1890
        /* Wait until the request can be resubmitted */
        do
        {
1891
          KEYCACHE_DBUG_PRINT("find_block: wait",
unknown's avatar
unknown committed
1892 1893 1894 1895 1896 1897 1898 1899 1900 1901
                              ("suspend thread %ld", thread->id));
          pagecache_pthread_cond_wait(&thread->suspend,
                                     &pagecache->cache_lock);
        }
        while(thread->next);
#else
        KEYCACHE_DBUG_ASSERT(0);
          /* No parallel requests in single-threaded case */
#endif
      }
1902
      KEYCACHE_DBUG_PRINT("find_block",
unknown's avatar
unknown committed
1903
                          ("request for old page resubmitted"));
1904
      DBUG_PRINT("info", ("restarting..."));
unknown's avatar
unknown committed
1905 1906 1907
      /* Resubmit the request */
      goto restart;
    }
unknown's avatar
unknown committed
1908
    block->status&= ~PCBLOCK_IN_SWITCH;
unknown's avatar
unknown committed
1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931
  }
  else
  {
    /* This is a request for a new page or for a page not to be removed */
    if (! block)
    {
      /* No block is assigned for the page yet */
      if (pagecache->blocks_unused)
      {
        if (pagecache->free_block_list)
        {
          /* There is a block in the free list. */
          block= pagecache->free_block_list;
          pagecache->free_block_list= block->next_used;
          block->next_used= NULL;
        }
        else
        {
          /* There are some never used blocks, take first of them */
          block= &pagecache->block_root[pagecache->blocks_used];
          block->buffer= ADD_TO_PTR(pagecache->block_mem,
                                    ((ulong) pagecache->blocks_used*
                                     pagecache->block_size),
unknown's avatar
unknown committed
1932
                                    uchar*);
unknown's avatar
unknown committed
1933 1934 1935
          pagecache->blocks_used++;
        }
        pagecache->blocks_unused--;
1936
        DBUG_ASSERT(block->wlocks == 0);
1937
        DBUG_ASSERT(block->pins == 0);
unknown's avatar
unknown committed
1938 1939 1940 1941 1942
        block->status= 0;
#ifndef DBUG_OFF
        block->type= PAGECACHE_EMPTY_PAGE;
#endif
        block->requests= 1;
unknown's avatar
unknown committed
1943
        block->temperature= PCBLOCK_COLD;
unknown's avatar
unknown committed
1944 1945
        block->hits_left= init_hits_left;
        block->last_hit_time= 0;
unknown's avatar
unknown committed
1946
        block->rec_lsn= LSN_MAX;
unknown's avatar
unknown committed
1947 1948 1949 1950
        link_to_file_list(pagecache, block, file, 0);
        block->hash_link= hash_link;
        hash_link->block= block;
        page_status= PAGE_TO_BE_READ;
1951 1952 1953
        DBUG_PRINT("info", ("page to be read set for page 0x%lx",
                            (ulong)block));
        KEYCACHE_DBUG_PRINT("find_block",
unknown's avatar
unknown committed
1954
                            ("got free or never used block %u",
unknown's avatar
unknown committed
1955
                             PCBLOCK_NUMBER(pagecache, block)));
unknown's avatar
unknown committed
1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971
      }
      else
      {
	/* There are no never used blocks, use a block from the LRU chain */

        /*
          Wait until a new block is added to the LRU chain;
          several threads might wait here for the same page,
          all of them must get the same block
        */

#ifdef THREAD
        if (! pagecache->used_last)
        {
          struct st_my_thread_var *thread= my_thread_var;
          thread->opt_info= (void *) hash_link;
1972
          wqueue_link_into_queue(&pagecache->waiting_for_block, thread);
unknown's avatar
unknown committed
1973 1974
          do
          {
1975
            KEYCACHE_DBUG_PRINT("find_block: wait",
unknown's avatar
unknown committed
1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996
                                ("suspend thread %ld", thread->id));
            pagecache_pthread_cond_wait(&thread->suspend,
                                       &pagecache->cache_lock);
          }
          while (thread->next);
          thread->opt_info= NULL;
        }
#else
        KEYCACHE_DBUG_ASSERT(pagecache->used_last);
#endif
        block= hash_link->block;
        if (! block)
        {
          /*
             Take the first block from the LRU chain
             unlinking it from the chain
          */
          block= pagecache->used_last->next_used;
          block->hits_left= init_hits_left;
          block->last_hit_time= 0;
	  if (reg_req)
1997
            reg_requests(pagecache, block, 1);
unknown's avatar
unknown committed
1998 1999
          hash_link->block= block;
        }
unknown's avatar
unknown committed
2000
        PCBLOCK_INFO(block);
2001
        DBUG_ASSERT(block->wlocks == 0);
2002
        DBUG_ASSERT(block->pins == 0);
unknown's avatar
unknown committed
2003 2004

        if (block->hash_link != hash_link &&
unknown's avatar
unknown committed
2005
	    ! (block->status & PCBLOCK_IN_SWITCH) )
unknown's avatar
unknown committed
2006 2007
        {
	  /* this is a primary request for a new page */
2008
          DBUG_ASSERT(block->wlocks == 0);
2009
          DBUG_ASSERT(block->pins == 0);
2010
          block->status|= PCBLOCK_IN_SWITCH;
unknown's avatar
unknown committed
2011

2012
          KEYCACHE_DBUG_PRINT("find_block",
unknown's avatar
unknown committed
2013
                              ("got block %u for new page",
unknown's avatar
unknown committed
2014
                               PCBLOCK_NUMBER(pagecache, block)));
unknown's avatar
unknown committed
2015

unknown's avatar
unknown committed
2016
          if (block->status & PCBLOCK_CHANGED)
unknown's avatar
unknown committed
2017 2018 2019
          {
	    /* The block contains a dirty page - push it out of the cache */

2020
            KEYCACHE_DBUG_PRINT("find_block", ("block is dirty"));
unknown's avatar
unknown committed
2021 2022 2023 2024 2025 2026 2027 2028 2029 2030 2031 2032

            pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
            /*
	      The call is thread safe because only the current
	      thread might change the block->hash_link value
            */
            DBUG_ASSERT(block->pins == 0);
            error= pagecache_fwrite(pagecache,
                                    &block->hash_link->file,
                                    block->buffer,
                                    block->hash_link->pageno,
                                    block->type,
2033
                                    pagecache->readwrite_flags);
unknown's avatar
unknown committed
2034 2035 2036 2037
            pagecache_pthread_mutex_lock(&pagecache->cache_lock);
	    pagecache->global_cache_write++;
          }

unknown's avatar
unknown committed
2038
          block->status|= PCBLOCK_REASSIGNED;
unknown's avatar
unknown committed
2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 2050 2051
          if (block->hash_link)
          {
            /*
	      Wait until all pending read requests
	      for this page are executed
	      (we could have avoided this waiting, if we had read
	      a page in the cache in a sweep, without yielding control)
            */
            wait_for_readers(pagecache, block);

            /* Remove the hash link for this page from the hash table */
            unlink_hash(pagecache, block->hash_link);
            /* All pending requests for this page must be resubmitted */
unknown's avatar
unknown committed
2052
#ifdef THREAD
unknown's avatar
unknown committed
2053
            if (block->wqueue[COND_FOR_SAVED].last_thread)
2054
              wqueue_release_queue(&block->wqueue[COND_FOR_SAVED]);
unknown's avatar
unknown committed
2055
#endif
unknown's avatar
unknown committed
2056 2057 2058
          }
          link_to_file_list(pagecache, block, file,
                            (my_bool)(block->hash_link ? 1 : 0));
unknown's avatar
unknown committed
2059 2060
          PCBLOCK_INFO(block);
          block->status= error? PCBLOCK_ERROR : 0;
unknown's avatar
unknown committed
2061 2062 2063 2064 2065
#ifndef DBUG_OFF
          block->type= PAGECACHE_EMPTY_PAGE;
#endif
          block->hash_link= hash_link;
          page_status= PAGE_TO_BE_READ;
2066 2067
          DBUG_PRINT("info", ("page to be read set for page 0x%lx",
                              (ulong)block));
unknown's avatar
unknown committed
2068 2069 2070 2071 2072 2073 2074

          KEYCACHE_DBUG_ASSERT(block->hash_link->block == block);
          KEYCACHE_DBUG_ASSERT(hash_link->block->hash_link == hash_link);
        }
        else
        {
          /* This is for secondary requests for a new page only */
2075
          KEYCACHE_DBUG_PRINT("find_block",
unknown's avatar
unknown committed
2076 2077 2078 2079
                              ("block->hash_link: %p  hash_link: %p  "
                               "block->status: %u", block->hash_link,
                               hash_link, block->status ));
          page_status= (((block->hash_link == hash_link) &&
unknown's avatar
unknown committed
2080
                         (block->status & PCBLOCK_READ)) ?
unknown's avatar
unknown committed
2081 2082 2083 2084 2085 2086 2087 2088 2089
                        PAGE_READ : PAGE_WAIT_TO_BE_READ);
        }
      }
      pagecache->global_cache_read++;
    }
    else
    {
      if (reg_req)
	reg_requests(pagecache, block, 1);
2090
      KEYCACHE_DBUG_PRINT("find_block",
unknown's avatar
unknown committed
2091 2092 2093 2094
                          ("block->hash_link: %p  hash_link: %p  "
                           "block->status: %u", block->hash_link,
                           hash_link, block->status ));
      page_status= (((block->hash_link == hash_link) &&
unknown's avatar
unknown committed
2095
                     (block->status & PCBLOCK_READ)) ?
unknown's avatar
unknown committed
2096 2097 2098 2099 2100
                    PAGE_READ : PAGE_WAIT_TO_BE_READ);
    }
  }

  KEYCACHE_DBUG_ASSERT(page_status != -1);
2101
  *page_st= page_status;
unknown's avatar
unknown committed
2102
  DBUG_PRINT("info",
2103
             ("block: 0x%lx  fd: %u  pos: %lu  block->status: %u  page_status: %u",
unknown's avatar
unknown committed
2104 2105
              (ulong) block, (uint) file->file,
              (ulong) pageno, block->status, (uint) page_status));
2106
  KEYCACHE_DBUG_PRINT("find_block",
2107
                      ("block: 0x%lx  fd: %d  pos: %lu  block->status: %u  page_status: %d",
unknown's avatar
unknown committed
2108
                       (ulong) block,
2109 2110
                       file->file, (ulong) pageno, block->status,
                       page_status));
unknown's avatar
unknown committed
2111 2112 2113

#if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
  DBUG_EXECUTE("check_pagecache",
2114
               test_key_cache(pagecache, "end of find_block",0););
unknown's avatar
unknown committed
2115
#endif
2116
  KEYCACHE_THREAD_TRACE("find_block:end");
unknown's avatar
unknown committed
2117 2118 2119 2120
  DBUG_RETURN(block);
}


2121
/*
  Increment the pin counter of a block

  SYNOPSIS
    add_pin()
    block                the block to work with

  NOTES
    Unless DBUG_OFF is defined, a PAGECACHE_PIN_INFO record carrying the
    current thread is also allocated and linked into block->pin_list.
*/

static void add_pin(PAGECACHE_BLOCK_LINK *block)
{
  DBUG_ENTER("add_pin");
  DBUG_PRINT("enter", ("block: 0x%lx  pins: %u",
                       (ulong) block,
                       block->pins));
  PCBLOCK_INFO(block);
  block->pins++;
#ifndef DBUG_OFF
  {
    PAGECACHE_PIN_INFO *info=
      (PAGECACHE_PIN_INFO *)my_malloc(sizeof(PAGECACHE_PIN_INFO), MYF(0));
    /* Fail with a clear diagnostic instead of dereferencing NULL */
    DBUG_ASSERT(info != 0);
    info->thread= my_thread_var;
    info_link(&block->pin_list, info);
  }
#endif
  DBUG_VOID_RETURN;
}

2140
/*
  Decrement the pin counter of a block

  SYNOPSIS
    remove_pin()
    block                the block to work with (must have pins > 0)

  NOTES
    Unless DBUG_OFF is defined, the PAGECACHE_PIN_INFO record found in
    block->pin_list for the current thread is unlinked and freed.
*/

static void remove_pin(PAGECACHE_BLOCK_LINK *block)
{
  DBUG_ENTER("remove_pin");
  DBUG_PRINT("enter", ("block: 0x%lx  pins: %u",
                       (ulong) block,
                       block->pins));
  PCBLOCK_INFO(block);
  DBUG_ASSERT(block->pins > 0);
  block->pins--;
#ifndef DBUG_OFF
  {
    PAGECACHE_PIN_INFO *info= info_find(block->pin_list, my_thread_var);
    DBUG_ASSERT(info != 0);
    info_unlink(info);
    my_free((uchar*) info, MYF(0));
  }
#endif
  DBUG_VOID_RETURN;
}
unknown's avatar
unknown committed
2159
#ifndef DBUG_OFF
2160
/*
  Debug bookkeeping: record that the current thread took a lock on a block

  SYNOPSIS
    info_add_lock()
    block                the block to work with
    wl                   value stored in the write_lock member of the record

  NOTES
    Compiled only when DBUG_OFF is not defined.
    The record is linked into block->lock_list.
*/

static void info_add_lock(PAGECACHE_BLOCK_LINK *block, my_bool wl)
{
  PAGECACHE_LOCK_INFO *info=
    (PAGECACHE_LOCK_INFO *)my_malloc(sizeof(PAGECACHE_LOCK_INFO), MYF(0));
  /* Fail with a clear diagnostic instead of dereferencing NULL */
  DBUG_ASSERT(info != 0);
  info->thread= my_thread_var;
  info->write_lock= wl;
  info_link((PAGECACHE_PIN_INFO **)&block->lock_list,
            (PAGECACHE_PIN_INFO *)info);
}
2169
/*
  Debug bookkeeping: drop the lock record of the current thread for a block

  SYNOPSIS
    info_remove_lock()
    block                the block to work with

  NOTES
    Compiled only when DBUG_OFF is not defined.
    The record is looked up in block->lock_list by the current thread;
    it must exist.
*/

static void info_remove_lock(PAGECACHE_BLOCK_LINK *block)
{
  PAGECACHE_LOCK_INFO *info=
    (PAGECACHE_LOCK_INFO *)info_find((PAGECACHE_PIN_INFO *)block->lock_list,
                                     my_thread_var);
  DBUG_ASSERT(info != 0);
  info_unlink((PAGECACHE_PIN_INFO *)info);
  my_free((uchar*)info, MYF(0));
}
2178
/*
  Debug bookkeeping: change the kind of lock recorded for the current thread

  SYNOPSIS
    info_change_lock()
    block                the block to work with
    wl                   new value of the write_lock member; must differ
                         from the recorded one

  NOTES
    Compiled only when DBUG_OFF is not defined.
    The record is looked up in block->lock_list by the current thread;
    it must exist.
*/

static void info_change_lock(PAGECACHE_BLOCK_LINK *block, my_bool wl)
{
  PAGECACHE_LOCK_INFO *info=
    (PAGECACHE_LOCK_INFO *)info_find((PAGECACHE_PIN_INFO *)block->lock_list,
                                     my_thread_var);
  DBUG_ASSERT(info != 0);
  DBUG_ASSERT(info->write_lock != wl);
  info->write_lock= wl;
}
#else
2188 2189 2190
#define info_add_lock(B,W)
#define info_remove_lock(B)
#define info_change_lock(B,W)
unknown's avatar
unknown committed
2191 2192 2193
#endif

/*
2194
  Put on the block write lock
unknown's avatar
unknown committed
2195 2196

  SYNOPSIS
2197
    get_wrlock()
unknown's avatar
unknown committed
2198 2199
    pagecache            pointer to a page cache data structure
    block                the block to work with
2200 2201 2202
    user_file		 Unique handler per handler file. Used to check if
			 we request many write locks withing the same
                         statement
unknown's avatar
unknown committed
2203 2204 2205

  RETURN
    0 - OK
2206
    1 - Can't lock this block, need retry
unknown's avatar
unknown committed
2207 2208
*/

2209
static my_bool get_wrlock(PAGECACHE *pagecache,
2210 2211
                          PAGECACHE_BLOCK_LINK *block,
                          PAGECACHE_FILE *user_file)
2212 2213 2214 2215 2216 2217 2218 2219 2220
{
  PAGECACHE_FILE file= block->hash_link->file;
  pgcache_page_no_t pageno= block->hash_link->pageno;
  DBUG_ENTER("get_wrlock");
  DBUG_PRINT("info", ("the block 0x%lx "
                          "files %d(%d)  pages %d(%d)",
                          (ulong)block,
                          file.file, block->hash_link->file.file,
                          pageno, block->hash_link->pageno));
unknown's avatar
unknown committed
2221
  PCBLOCK_INFO(block);
2222
  while (block->wlocks && block->write_locker != user_file)
unknown's avatar
unknown committed
2223 2224 2225 2226
  {
    /* Lock failed we will wait */
#ifdef THREAD
    struct st_my_thread_var *thread= my_thread_var;
2227
    DBUG_PRINT("info", ("fail to lock, waiting... 0x%lx", (ulong)block));
2228
    wqueue_add_to_queue(&block->wqueue[COND_FOR_WRLOCK], thread);
unknown's avatar
unknown committed
2229 2230 2231
    dec_counter_for_resize_op(pagecache);
    do
    {
2232
      KEYCACHE_DBUG_PRINT("get_wrlock: wait",
unknown's avatar
unknown committed
2233 2234 2235 2236 2237 2238 2239 2240
                          ("suspend thread %ld", thread->id));
      pagecache_pthread_cond_wait(&thread->suspend,
                                  &pagecache->cache_lock);
    }
    while(thread->next);
#else
    DBUG_ASSERT(0);
#endif
unknown's avatar
unknown committed
2241 2242
    PCBLOCK_INFO(block);
    if ((block->status & (PCBLOCK_REASSIGNED | PCBLOCK_IN_SWITCH)) ||
2243 2244 2245 2246 2247 2248 2249 2250 2251 2252
        file.file != block->hash_link->file.file ||
        pageno != block->hash_link->pageno)
    {
      DBUG_PRINT("info", ("the block 0x%lx changed => need retry"
                          "status  %x files %d != %d or pages %d !=%d",
                          (ulong)block, block->status,
                          file.file, block->hash_link->file.file,
                          pageno, block->hash_link->pageno));
      DBUG_RETURN(1);
    }
unknown's avatar
unknown committed
2253
  }
2254
  /* we are doing it by global cache mutex protection, so it is OK */
2255
  block->wlocks++;
2256
  block->write_locker= user_file;
unknown's avatar
unknown committed
2257
  DBUG_PRINT("info", ("WR lock set, block 0x%lx", (ulong)block));
unknown's avatar
unknown committed
2258 2259 2260
  DBUG_RETURN(0);
}

2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274

/*
  Remove write lock from the block

  SYNOPSIS
    release_wrlock()
    block                the block to work with

  NOTES
    The write lock is counted (block->wlocks). Only when the counter drops
    to zero are the threads queued in block->wqueue[COND_FOR_WRLOCK]
    released.

  RETURN
    None
*/

static void release_wrlock(PAGECACHE_BLOCK_LINK *block)
{
  DBUG_ENTER("release_wrlock");
  PCBLOCK_INFO(block);
  DBUG_ASSERT(block->wlocks > 0);
  DBUG_ASSERT(block->pins > 0);
  block->wlocks--;
  if (block->wlocks > 0)
  {
    /* Multiple write locked: the block is still write locked */
    DBUG_VOID_RETURN;
  }
  DBUG_PRINT("info", ("WR lock reset, block 0x%lx", (ulong) block));
#ifdef THREAD
  /* release all threads waiting for write lock */
  if (block->wqueue[COND_FOR_WRLOCK].last_thread)
    wqueue_release_queue(&block->wqueue[COND_FOR_WRLOCK]);
#endif
  PCBLOCK_INFO(block);
  DBUG_VOID_RETURN;
}

2293

unknown's avatar
unknown committed
2294
/*
2295
  Try to lock/unlock and pin/unpin the block
unknown's avatar
unknown committed
2296 2297

  SYNOPSIS
2298
    make_lock_and_pin()
unknown's avatar
unknown committed
2299 2300 2301 2302
    pagecache            pointer to a page cache data structure
    block                the block to work with
    lock                 lock change mode
    pin                  pinchange mode
2303
    file		 File handler requesting pin
unknown's avatar
unknown committed
2304 2305 2306 2307 2308 2309

  RETURN
    0 - OK
    1 - Try to lock the block failed
*/

2310 2311 2312
static my_bool make_lock_and_pin(PAGECACHE *pagecache,
                                 PAGECACHE_BLOCK_LINK *block,
                                 enum pagecache_page_lock lock,
2313 2314
                                 enum pagecache_page_pin pin,
                                 PAGECACHE_FILE *file)
unknown's avatar
unknown committed
2315
{
2316
  DBUG_ENTER("make_lock_and_pin");
unknown's avatar
unknown committed
2317 2318 2319 2320 2321

  DBUG_PRINT("enter", ("block: 0x%lx", (ulong)block));
#ifndef DBUG_OFF
  if (block)
  {
2322
    DBUG_PRINT("enter", ("block: 0x%lx (%u)  wrlocks: %u  pins: %u  lock: %s  pin: %s",
2323
                         (ulong)block, PCBLOCK_NUMBER(pagecache, block),
2324
                         block->wlocks,
unknown's avatar
unknown committed
2325 2326 2327
                         block->pins,
                         page_cache_page_lock_str[lock],
                         page_cache_page_pin_str[pin]));
unknown's avatar
unknown committed
2328
    PCBLOCK_INFO(block);
unknown's avatar
unknown committed
2329 2330
  }
#endif
2331

2332
  switch (lock) {
unknown's avatar
unknown committed
2333 2334
  case PAGECACHE_LOCK_WRITE:               /* free  -> write */
    /* Writelock and pin the buffer */
2335
    if (get_wrlock(pagecache, block, file))
unknown's avatar
unknown committed
2336
    {
2337 2338
      /* can't lock => need retry */
      goto retry;
unknown's avatar
unknown committed
2339
    }
2340 2341 2342 2343

    /* The cache is locked so nothing afraid of */
    add_pin(block);
    info_add_lock(block, 1);
unknown's avatar
unknown committed
2344 2345 2346 2347
    break;
  case PAGECACHE_LOCK_WRITE_TO_READ:       /* write -> read  */
  case PAGECACHE_LOCK_WRITE_UNLOCK:        /* write -> free  */
    /*
2348
      Removes write lock and puts read lock (which is nothing in our
unknown's avatar
unknown committed
2349 2350
      implementation)
    */
2351
    release_wrlock(block);
2352
    /* fall through */
unknown's avatar
unknown committed
2353 2354 2355 2356
  case PAGECACHE_LOCK_READ_UNLOCK:         /* read  -> free  */
  case PAGECACHE_LOCK_LEFT_READLOCKED:     /* read  -> read  */
    if (pin == PAGECACHE_UNPIN)
    {
2357
      remove_pin(block);
unknown's avatar
unknown committed
2358 2359 2360
    }
    if (lock == PAGECACHE_LOCK_WRITE_TO_READ)
    {
2361
      info_change_lock(block, 0);
2362 2363 2364
    }
    else if (lock == PAGECACHE_LOCK_WRITE_UNLOCK ||
             lock == PAGECACHE_LOCK_READ_UNLOCK)
unknown's avatar
unknown committed
2365
    {
2366
      info_remove_lock(block);
unknown's avatar
unknown committed
2367 2368 2369 2370 2371 2372
    }
    break;
  case PAGECACHE_LOCK_READ:                /* free  -> read  */
    if (pin == PAGECACHE_PIN)
    {
      /* The cache is locked so nothing afraid off */
2373
      add_pin(block);
unknown's avatar
unknown committed
2374
    }
2375
    info_add_lock(block, 0);
unknown's avatar
unknown committed
2376 2377 2378 2379 2380 2381 2382 2383
    break;
  case PAGECACHE_LOCK_LEFT_UNLOCKED:       /* free  -> free  */
  case PAGECACHE_LOCK_LEFT_WRITELOCKED:    /* write -> write */
    break; /* do nothing */
  default:
    DBUG_ASSERT(0); /* Never should happened */
  }

unknown's avatar
unknown committed
2384 2385
#ifndef DBUG_OFF
  if (block)
2386
    PCBLOCK_INFO(block);
unknown's avatar
unknown committed
2387
#endif
unknown's avatar
unknown committed
2388
  DBUG_RETURN(0);
2389 2390
retry:
  DBUG_PRINT("INFO", ("Retry block 0x%lx", (ulong)block));
unknown's avatar
unknown committed
2391
  PCBLOCK_INFO(block);
2392
  DBUG_ASSERT(block->hash_link->requests > 0);
2393
  block->hash_link->requests--;
2394
  DBUG_ASSERT(block->requests > 0);
2395
  unreg_request(pagecache, block, 1);
unknown's avatar
unknown committed
2396
  PCBLOCK_INFO(block);
2397 2398
  DBUG_RETURN(1);

unknown's avatar
unknown committed
2399 2400 2401 2402 2403 2404 2405 2406 2407 2408 2409 2410 2411 2412 2413 2414 2415 2416 2417 2418
}


/*
  Read into a key cache block buffer from disk.

  SYNOPSIS

    read_block()
      pagecache           pointer to a page cache data structure
      block               block to which buffer the data is to be read
      primary             <-> the current thread will read the data

  RETURN VALUE
    None

  NOTES.
    The function either reads a page data from file to the block buffer,
    or waits until another thread reads it. What page to read is determined
    by a block parameter - reference to a hash link for this page.
unknown's avatar
unknown committed
2419
    If an error occurs THE PCBLOCK_ERROR bit is set in the block status.
2420 2421

    On entry cache_lock is locked
unknown's avatar
unknown committed
2422 2423 2424 2425
*/

static void read_block(PAGECACHE *pagecache,
                       PAGECACHE_BLOCK_LINK *block,
unknown's avatar
unknown committed
2426
                       my_bool primary)
unknown's avatar
unknown committed
2427 2428
{

2429
  DBUG_ENTER("read_block");
unknown's avatar
unknown committed
2430 2431
  if (primary)
  {
2432
    size_t error;
unknown's avatar
unknown committed
2433 2434 2435 2436 2437
    /*
      This code is executed only by threads
      that submitted primary requests
    */

2438 2439
    DBUG_PRINT("read_block",
               ("page to be read by primary request"));
unknown's avatar
unknown committed
2440 2441 2442 2443 2444 2445 2446

    /* Page is not in buffer yet, is to be read from disk */
    pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
    /*
      Here other threads may step in and register as secondary readers.
      They will register in block->wqueue[COND_FOR_REQUESTED].
    */
2447 2448 2449 2450
    error= pagecache_fread(pagecache, &block->hash_link->file,
                           block->buffer,
                           block->hash_link->pageno,
                           pagecache->readwrite_flags);
unknown's avatar
unknown committed
2451
    pagecache_pthread_mutex_lock(&pagecache->cache_lock);
2452
    if (error)
unknown's avatar
unknown committed
2453
      block->status|= PCBLOCK_ERROR;
unknown's avatar
unknown committed
2454
    else
unknown's avatar
unknown committed
2455
    {
2456 2457 2458 2459 2460 2461 2462 2463 2464
      block->status|= PCBLOCK_READ;
      if ((*block->hash_link->file.read_callback)(block->buffer,
                                                  block->hash_link->pageno,
                                                  block->hash_link->
                                                  file.callback_data))
      {
        DBUG_PRINT("error", ("read callback problem"));
        block->status|= PCBLOCK_ERROR;
      }
unknown's avatar
unknown committed
2465
    }
2466 2467
    DBUG_PRINT("read_block",
               ("primary request: new page in cache"));
unknown's avatar
unknown committed
2468
    /* Signal that all pending requests for this page now can be processed */
unknown's avatar
unknown committed
2469
#ifdef THREAD
unknown's avatar
unknown committed
2470
    if (block->wqueue[COND_FOR_REQUESTED].last_thread)
2471
      wqueue_release_queue(&block->wqueue[COND_FOR_REQUESTED]);
unknown's avatar
unknown committed
2472
#endif
unknown's avatar
unknown committed
2473 2474 2475 2476 2477 2478 2479
  }
  else
  {
    /*
      This code is executed only by threads
      that submitted secondary requests
    */
2480 2481
    DBUG_PRINT("read_block",
               ("secondary request waiting for new page to be read"));
unknown's avatar
unknown committed
2482 2483 2484 2485
    {
#ifdef THREAD
      struct st_my_thread_var *thread= my_thread_var;
      /* Put the request into a queue and wait until it can be processed */
2486
      wqueue_add_to_queue(&block->wqueue[COND_FOR_REQUESTED], thread);
unknown's avatar
unknown committed
2487 2488
      do
      {
2489 2490
        DBUG_PRINT("read_block: wait",
                  ("suspend thread %ld", thread->id));
unknown's avatar
unknown committed
2491 2492 2493 2494 2495 2496 2497 2498 2499
        pagecache_pthread_cond_wait(&thread->suspend,
                                   &pagecache->cache_lock);
      }
      while (thread->next);
#else
      KEYCACHE_DBUG_ASSERT(0);
      /* No parallel requests in single-threaded case */
#endif
    }
2500 2501
    DBUG_PRINT("read_block",
               ("secondary request: new page in cache"));
unknown's avatar
unknown committed
2502
  }
2503
  DBUG_VOID_RETURN;
unknown's avatar
unknown committed
2504 2505 2506
}


2507 2508
/**
   @brief Set LSN on the page to the given one if the given LSN is bigger
2509

2510 2511 2512
   @param  pagecache        pointer to a page cache data structure
   @param  lsn              LSN to set
   @param  block            block to check and set
2513 2514
*/

2515 2516
static void check_and_set_lsn(PAGECACHE *pagecache,
                              LSN lsn, PAGECACHE_BLOCK_LINK *block)
2517 2518 2519
{
  LSN old;
  DBUG_ENTER("check_and_set_lsn");
2520 2521 2522 2523 2524 2525
  /*
    In recovery, we can _ma_unpin_all_pages() to put a LSN on page, though
    page would be PAGECACHE_PLAIN_PAGE (transactionality temporarily disabled
    to not log REDOs).
  */
  DBUG_ASSERT((block->type == PAGECACHE_LSN_PAGE) || maria_in_recovery);
unknown's avatar
unknown committed
2526
  old= lsn_korr(block->buffer);
2527
  DBUG_PRINT("info", ("old lsn: (%lu, 0x%lx)  new lsn: (%lu, 0x%lx)",
unknown's avatar
unknown committed
2528
                      LSN_IN_PARTS(old), LSN_IN_PARTS(lsn)));
2529
  if (cmp_translog_addr(lsn, old) > 0)
2530 2531 2532
  {

    DBUG_ASSERT(block->type != PAGECACHE_READ_UNKNOWN_PAGE);
unknown's avatar
unknown committed
2533
    lsn_store(block->buffer, lsn);
2534 2535 2536
    /* we stored LSN in page so we dirtied it */
    if (!(block->status & PCBLOCK_CHANGED))
      link_to_changed_list(pagecache, block);
2537
  }
2538 2539 2540 2541
  DBUG_VOID_RETURN;
}


2542 2543 2544 2545 2546 2547 2548 2549 2550 2551
/**
  @brief Unlock/unpin page and put LSN stamp if it need

  @param pagecache      pointer to a page cache data structure
  @pagam file           handler for the file for the block of data to be read
  @param pageno         number of the block of data in the file
  @param lock           lock change
  @param pin            pin page
  @param first_REDO_LSN_for_page do not set it if it is zero
  @param lsn            if it is not LSN_IMPOSSIBLE (0) and it
2552 2553
                        is bigger then LSN on the page it will be written on
                        the page
2554 2555
  @param was_changed    should be true if the page was write locked with
                        direct link giving and the page was changed
2556

2557
  @note
2558 2559 2560 2561 2562 2563 2564 2565 2566 2567
    Pininig uses requests registration mechanism it works following way:
                                | beginnig    | ending        |
                                | of func.    | of func.      |
    ----------------------------+-------------+---------------+
    PAGECACHE_PIN_LEFT_PINNED   |      -      |       -       |
    PAGECACHE_PIN_LEFT_UNPINNED | reg request | unreg request |
    PAGECACHE_PIN               | reg request |       -       |
    PAGECACHE_UNPIN             |      -      | unreg request |


unknown's avatar
unknown committed
2568 2569
*/

unknown's avatar
unknown committed
2570 2571 2572 2573 2574
void pagecache_unlock(PAGECACHE *pagecache,
                      PAGECACHE_FILE *file,
                      pgcache_page_no_t pageno,
                      enum pagecache_page_lock lock,
                      enum pagecache_page_pin pin,
2575
                      LSN first_REDO_LSN_for_page,
2576
                      LSN lsn, my_bool was_changed)
unknown's avatar
unknown committed
2577 2578 2579
{
  PAGECACHE_BLOCK_LINK *block;
  int page_st;
unknown's avatar
unknown committed
2580
  DBUG_ENTER("pagecache_unlock");
2581
  DBUG_PRINT("enter", ("fd: %u  page: %lu  %s  %s",
unknown's avatar
unknown committed
2582 2583 2584 2585
                       (uint) file->file, (ulong) pageno,
                       page_cache_page_lock_str[lock],
                       page_cache_page_pin_str[pin]));
  /* we do not allow any lock/pin increasing here */
unknown's avatar
unknown committed
2586 2587 2588
  DBUG_ASSERT(pin != PAGECACHE_PIN);
  DBUG_ASSERT(lock != PAGECACHE_LOCK_READ);
  DBUG_ASSERT(lock != PAGECACHE_LOCK_WRITE);
unknown's avatar
unknown committed
2589 2590 2591

  pagecache_pthread_mutex_lock(&pagecache->cache_lock);
  /*
2592 2593
    As soon as we keep lock cache can be used, and we have lock because want
    to unlock.
unknown's avatar
unknown committed
2594 2595 2596 2597
  */
  DBUG_ASSERT(pagecache->can_be_used);

  inc_counter_for_resize_op(pagecache);
unknown's avatar
unknown committed
2598
  /* See NOTE for pagecache_unlock about registering requests */
2599 2600
  block= find_block(pagecache, file, pageno, 0, 0,
                    test(pin == PAGECACHE_PIN_LEFT_UNPINNED), &page_st);
unknown's avatar
unknown committed
2601
  PCBLOCK_INFO(block);
unknown's avatar
unknown committed
2602
  DBUG_ASSERT(block != 0 && page_st == PAGE_READ);
2603
  if (first_REDO_LSN_for_page)
unknown's avatar
unknown committed
2604
  {
unknown's avatar
unknown committed
2605 2606
    DBUG_ASSERT(lock == PAGECACHE_LOCK_WRITE_UNLOCK);
    DBUG_ASSERT(pin == PAGECACHE_UNPIN);
2607
    pagecache_set_block_rec_lsn(block, first_REDO_LSN_for_page);
unknown's avatar
unknown committed
2608
  }
2609 2610
  if (lsn != LSN_IMPOSSIBLE)
    check_and_set_lsn(pagecache, lsn, block);
unknown's avatar
unknown committed
2611

2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631
  /* if we lock for write we must link the block to changed blocks */
  DBUG_ASSERT((block->status & PCBLOCK_DIRECT_W) == 0 ||
              (lock == PAGECACHE_LOCK_WRITE_UNLOCK ||
               lock == PAGECACHE_LOCK_LEFT_WRITELOCKED));
  /*
    if was_changed then status should be PCBLOCK_DIRECT_W or marked
    as dirty
  */
  DBUG_ASSERT(!was_changed || (block->status & PCBLOCK_DIRECT_W) ||
              (block->status & PCBLOCK_CHANGED));
  if ((block->status & PCBLOCK_DIRECT_W) &&
      (lock == PAGECACHE_LOCK_WRITE_UNLOCK))
  {
    if (!(block->status & PCBLOCK_CHANGED) && was_changed)
      link_to_changed_list(pagecache, block);
    block->status&= ~PCBLOCK_DIRECT_W;
    DBUG_PRINT("info", ("Drop PCBLOCK_DIRECT_W for block: 0x%lx",
                        (ulong) block));
  }

2632
  if (make_lock_and_pin(pagecache, block, lock, pin, file))
unknown's avatar
unknown committed
2633 2634 2635 2636 2637 2638 2639
  {
    DBUG_ASSERT(0); /* should not happend */
  }

  remove_reader(block);
  /*
    Link the block into the LRU chain if it's the last submitted request
2640
    for the block and block will not be pinned.
unknown's avatar
unknown committed
2641
    See NOTE for pagecache_unlock about registering requests.
unknown's avatar
unknown committed
2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657
  */
  if (pin != PAGECACHE_PIN_LEFT_PINNED)
    unreg_request(pagecache, block, 1);

  dec_counter_for_resize_op(pagecache);

  pagecache_pthread_mutex_unlock(&pagecache->cache_lock);

  DBUG_VOID_RETURN;
}


/*
  Unpin page

  SYNOPSIS
unknown's avatar
unknown committed
2658
    pagecache_unpin()
unknown's avatar
unknown committed
2659 2660 2661
    pagecache           pointer to a page cache data structure
    file                handler for the file for the block of data to be read
    pageno              number of the block of data in the file
unknown's avatar
unknown committed
2662
    lsn                 if it is not LSN_IMPOSSIBLE (0) and it
2663 2664
                        is bigger then LSN on the page it will be written on
                        the page
unknown's avatar
unknown committed
2665 2666
*/

unknown's avatar
unknown committed
2667 2668
void pagecache_unpin(PAGECACHE *pagecache,
                     PAGECACHE_FILE *file,
2669 2670
                     pgcache_page_no_t pageno,
                     LSN lsn)
unknown's avatar
unknown committed
2671 2672 2673
{
  PAGECACHE_BLOCK_LINK *block;
  int page_st;
unknown's avatar
unknown committed
2674
  DBUG_ENTER("pagecache_unpin");
unknown's avatar
unknown committed
2675 2676 2677 2678 2679 2680 2681 2682 2683 2684
  DBUG_PRINT("enter", ("fd: %u  page: %lu",
                       (uint) file->file, (ulong) pageno));
  pagecache_pthread_mutex_lock(&pagecache->cache_lock);
  /*
    As soon as we keep lock cache can be used, and we have lock bacause want
    aunlock.
  */
  DBUG_ASSERT(pagecache->can_be_used);

  inc_counter_for_resize_op(pagecache);
unknown's avatar
unknown committed
2685
  /* See NOTE for pagecache_unlock about registering requests */
2686
  block= find_block(pagecache, file, pageno, 0, 0, 0, &page_st);
unknown's avatar
unknown committed
2687 2688
  DBUG_ASSERT(block != 0);
  DBUG_ASSERT(page_st == PAGE_READ);
2689 2690
  /* we can't unpin such page without unlock */
  DBUG_ASSERT((block->status & PCBLOCK_DIRECT_W) == 0);
unknown's avatar
unknown committed
2691

2692 2693
  if (lsn != LSN_IMPOSSIBLE)
    check_and_set_lsn(pagecache, lsn, block);
2694 2695 2696 2697 2698 2699 2700 2701

  /*
    we can just unpin only with keeping read lock because:
    a) we can't pin without any lock
    b) we can't unpin keeping write lock
  */
  if (make_lock_and_pin(pagecache, block,
                        PAGECACHE_LOCK_LEFT_READLOCKED,
2702
                        PAGECACHE_UNPIN, file))
2703
    DBUG_ASSERT(0);                           /* should not happend */
unknown's avatar
unknown committed
2704 2705 2706 2707

  remove_reader(block);
  /*
    Link the block into the LRU chain if it's the last submitted request
2708
    for the block and block will not be pinned.
unknown's avatar
unknown committed
2709
    See NOTE for pagecache_unlock about registering requests
unknown's avatar
unknown committed
2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720
  */
  unreg_request(pagecache, block, 1);

  dec_counter_for_resize_op(pagecache);

  pagecache_pthread_mutex_unlock(&pagecache->cache_lock);

  DBUG_VOID_RETURN;
}


2721 2722
/**
  @brief Unlock/unpin page and put LSN stamp if it need
unknown's avatar
unknown committed
2723 2724
  (uses direct block/page pointer)

2725 2726 2727 2728 2729 2730
  @param pagecache      pointer to a page cache data structure
  @param link           direct link to page (returned by read or write)
  @param lock           lock change
  @param pin            pin page
  @param first_REDO_LSN_for_page do not set it if it is LSN_IMPOSSIBLE (0)
  @param lsn            if it is not LSN_IMPOSSIBLE and it is bigger then
unknown's avatar
unknown committed
2731
                        LSN on the page it will be written on the page
2732 2733
  @param was_changed    should be true if the page was write locked with
                        direct link giving and the page was changed
unknown's avatar
unknown committed
2734 2735
*/

unknown's avatar
unknown committed
2736
void pagecache_unlock_by_link(PAGECACHE *pagecache,
2737
                              PAGECACHE_BLOCK_LINK *block,
unknown's avatar
unknown committed
2738 2739
                              enum pagecache_page_lock lock,
                              enum pagecache_page_pin pin,
2740
                              LSN first_REDO_LSN_for_page,
2741
                              LSN lsn, my_bool was_changed)
unknown's avatar
unknown committed
2742
{
unknown's avatar
unknown committed
2743
  DBUG_ENTER("pagecache_unlock_by_link");
2744
  DBUG_PRINT("enter", ("block: 0x%lx fd: %u  page: %lu  changed: %d  %s  %s",
unknown's avatar
unknown committed
2745 2746
                       (ulong) block,
                       (uint) block->hash_link->file.file,
2747
                       (ulong) block->hash_link->pageno, was_changed,
unknown's avatar
unknown committed
2748 2749
                       page_cache_page_lock_str[lock],
                       page_cache_page_pin_str[pin]));
2750 2751 2752 2753
  /*
    We do not allow any lock/pin increasing here and page can't be
    unpinned because we use direct link.
  */
unknown's avatar
unknown committed
2754 2755 2756 2757
  DBUG_ASSERT(pin != PAGECACHE_PIN);
  DBUG_ASSERT(pin != PAGECACHE_PIN_LEFT_UNPINNED);
  DBUG_ASSERT(lock != PAGECACHE_LOCK_READ);
  DBUG_ASSERT(lock != PAGECACHE_LOCK_WRITE);
unknown's avatar
unknown committed
2758 2759 2760
  if (pin == PAGECACHE_PIN_LEFT_UNPINNED &&
      lock == PAGECACHE_LOCK_READ_UNLOCK)
  {
2761
    /* block do not need here so we do not provide it */
2762
    if (make_lock_and_pin(pagecache, 0, lock, pin, 0))
2763
      DBUG_ASSERT(0);                         /* should not happend */
unknown's avatar
unknown committed
2764 2765 2766 2767 2768
    DBUG_VOID_RETURN;
  }

  pagecache_pthread_mutex_lock(&pagecache->cache_lock);
  /*
unknown's avatar
unknown committed
2769 2770
    As soon as we keep lock cache can be used, and we have lock because want
    unlock.
unknown's avatar
unknown committed
2771 2772 2773 2774
  */
  DBUG_ASSERT(pagecache->can_be_used);

  inc_counter_for_resize_op(pagecache);
2775
  if (was_changed)
unknown's avatar
unknown committed
2776
  {
2777 2778 2779 2780 2781 2782 2783 2784 2785 2786
    if (first_REDO_LSN_for_page != LSN_IMPOSSIBLE)
    {
      /*
        LOCK_READ_UNLOCK is ok here as the page may have first locked
        with WRITE lock that was temporarly converted to READ lock before
        it's unpinned
      */
      DBUG_ASSERT(lock == PAGECACHE_LOCK_WRITE_UNLOCK ||
                  lock == PAGECACHE_LOCK_READ_UNLOCK);
      DBUG_ASSERT(pin == PAGECACHE_UNPIN);
2787
      pagecache_set_block_rec_lsn(block, first_REDO_LSN_for_page);
2788 2789 2790
    }
    if (lsn != LSN_IMPOSSIBLE)
      check_and_set_lsn(pagecache, lsn, block);
2791
    block->status&= ~PCBLOCK_ERROR;
2792 2793 2794 2795 2796 2797 2798 2799 2800 2801 2802 2803 2804 2805 2806 2807 2808 2809 2810 2811
  }

  /* if we lock for write we must link the block to changed blocks */
  DBUG_ASSERT((block->status & PCBLOCK_DIRECT_W) == 0 ||
              (lock == PAGECACHE_LOCK_WRITE_UNLOCK ||
               lock == PAGECACHE_LOCK_LEFT_WRITELOCKED));
  /*
    If was_changed then status should be PCBLOCK_DIRECT_W or marked
    as dirty
  */
  DBUG_ASSERT(!was_changed || (block->status & PCBLOCK_DIRECT_W) ||
              (block->status & PCBLOCK_CHANGED));
  if ((block->status & PCBLOCK_DIRECT_W) &&
      (lock == PAGECACHE_LOCK_WRITE_UNLOCK))
  {
    if (!(block->status & PCBLOCK_CHANGED) && was_changed)
      link_to_changed_list(pagecache, block);
    block->status&= ~PCBLOCK_DIRECT_W;
    DBUG_PRINT("info", ("Drop PCBLOCK_DIRECT_W for block: 0x%lx",
                        (ulong) block));
unknown's avatar
unknown committed
2812
  }
2813

2814
  if (make_lock_and_pin(pagecache, block, lock, pin, 0))
2815
    DBUG_ASSERT(0);                           /* should not happend */
unknown's avatar
unknown committed
2816 2817 2818

  /*
    Link the block into the LRU chain if it's the last submitted request
2819
    for the block and block will not be pinned.
unknown's avatar
unknown committed
2820
    See NOTE for pagecache_unlock about registering requests.
unknown's avatar
unknown committed
2821 2822 2823 2824 2825 2826 2827 2828 2829 2830 2831 2832 2833 2834 2835 2836 2837
  */
  if (pin != PAGECACHE_PIN_LEFT_PINNED)
    unreg_request(pagecache, block, 1);

  dec_counter_for_resize_op(pagecache);

  pagecache_pthread_mutex_unlock(&pagecache->cache_lock);

  DBUG_VOID_RETURN;
}


/*
  Unpin page
  (uses direct block/page pointer)

  SYNOPSIS
unknown's avatar
unknown committed
2838
    pagecache_unpin_by_link()
unknown's avatar
unknown committed
2839 2840
    pagecache           pointer to a page cache data structure
    link                direct link to page (returned by read or write)
unknown's avatar
unknown committed
2841
    lsn                 if it is not LSN_IMPOSSIBLE (0) and it
2842 2843
                        is bigger then LSN on the page it will be written on
                        the page
unknown's avatar
unknown committed
2844 2845
*/

unknown's avatar
unknown committed
2846
void pagecache_unpin_by_link(PAGECACHE *pagecache,
unknown's avatar
unknown committed
2847
                             PAGECACHE_BLOCK_LINK *block,
2848
                             LSN lsn)
unknown's avatar
unknown committed
2849
{
unknown's avatar
unknown committed
2850
  DBUG_ENTER("pagecache_unpin_by_link");
unknown's avatar
unknown committed
2851 2852 2853 2854 2855 2856 2857
  DBUG_PRINT("enter", ("block: 0x%lx fd: %u page: %lu",
                       (ulong) block,
                       (uint) block->hash_link->file.file,
                       (ulong) block->hash_link->pageno));

  pagecache_pthread_mutex_lock(&pagecache->cache_lock);
  /*
unknown's avatar
unknown committed
2858 2859
    As soon as we keep lock cache can be used, and we have lock because want
    unlock.
unknown's avatar
unknown committed
2860 2861
  */
  DBUG_ASSERT(pagecache->can_be_used);
2862 2863
  /* we can't unpin such page without unlock */
  DBUG_ASSERT((block->status & PCBLOCK_DIRECT_W) == 0);
unknown's avatar
unknown committed
2864 2865 2866

  inc_counter_for_resize_op(pagecache);

2867 2868
  if (lsn != LSN_IMPOSSIBLE)
    check_and_set_lsn(pagecache, lsn, block);
2869 2870 2871 2872 2873 2874 2875 2876

  /*
    We can just unpin only with keeping read lock because:
    a) we can't pin without any lock
    b) we can't unpin keeping write lock
  */
  if (make_lock_and_pin(pagecache, block,
                        PAGECACHE_LOCK_LEFT_READLOCKED,
2877
                        PAGECACHE_UNPIN, 0))
2878
    DBUG_ASSERT(0); /* should not happend */
unknown's avatar
unknown committed
2879 2880 2881

  /*
    Link the block into the LRU chain if it's the last submitted request
2882
    for the block and block will not be pinned.
unknown's avatar
unknown committed
2883
    See NOTE for pagecache_unlock about registering requests.
unknown's avatar
unknown committed
2884 2885 2886 2887 2888 2889 2890 2891 2892 2893 2894 2895
  */
  unreg_request(pagecache, block, 1);

  dec_counter_for_resize_op(pagecache);

  pagecache_pthread_mutex_unlock(&pagecache->cache_lock);

  DBUG_VOID_RETURN;
}


/**
  @brief Read a block of data from a cached file into a buffer.

  @param pagecache      pointer to a page cache data structure
  @param file           handler for the file for the block of data to be read
  @param pageno         number of the block of data in the file
  @param level          determines the weight of the data
  @param buff           buffer to where the data must be placed
  @param type           type of the page
  @param lock           lock change
  @param page_link      link to the page if we pin it

  @return address from where the data is placed if successful, 0 - otherwise.

  @note Pin will be chosen according to lock parameter (see lock_to_pin)
*/
2911 2912 2913 2914 2915 2916 2917 2918 2919 2920 2921 2922 2923 2924 2925 2926 2927 2928 2929 2930 2931 2932
/*
  Pin action implied by a lock change.

  First index:  0 when the caller supplies its own buffer, 1 when the caller
                passes buff == 0 (pagecache_read() indexes it with
                test(buff==0)).
  Second index: the requested enum pagecache_page_lock value; the trailing
                comment of every entry names the lock value it stands for.
*/
static enum pagecache_page_pin lock_to_pin[2][8]=
{
  {
    PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_LEFT_UNLOCKED*/,
    PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_LEFT_READLOCKED*/,
    PAGECACHE_PIN_LEFT_PINNED   /*PAGECACHE_LOCK_LEFT_WRITELOCKED*/,
    PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_READ*/,
    PAGECACHE_PIN               /*PAGECACHE_LOCK_WRITE*/,
    PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_READ_UNLOCK*/,
    PAGECACHE_UNPIN             /*PAGECACHE_LOCK_WRITE_UNLOCK*/,
    PAGECACHE_UNPIN             /*PAGECACHE_LOCK_WRITE_TO_READ*/
  },
  {
    PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_LEFT_UNLOCKED*/,
    PAGECACHE_PIN_LEFT_PINNED   /*PAGECACHE_LOCK_LEFT_READLOCKED*/,
    PAGECACHE_PIN_LEFT_PINNED   /*PAGECACHE_LOCK_LEFT_WRITELOCKED*/,
    PAGECACHE_PIN               /*PAGECACHE_LOCK_READ*/,
    PAGECACHE_PIN               /*PAGECACHE_LOCK_WRITE*/,
    PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_READ_UNLOCK*/,
    PAGECACHE_UNPIN             /*PAGECACHE_LOCK_WRITE_UNLOCK*/,
    PAGECACHE_PIN_LEFT_PINNED   /*PAGECACHE_LOCK_WRITE_TO_READ*/
  }
};

unknown's avatar
unknown committed
2935 2936 2937 2938 2939 2940 2941 2942
/*
  Read a page through the page cache.

  With a usable cache the page is looked up (and read from disk if needed),
  locked/pinned as requested and either copied into 'buff' or, when
  buff == 0, exposed directly through the returned cache buffer address.
  When the cache cannot be used the page is read straight from the file.

  Returns the address of the data on success, 0 on error.
*/
uchar *pagecache_read(PAGECACHE *pagecache,
                      PAGECACHE_FILE *file,
                      pgcache_page_no_t pageno,
                      uint level,
                      uchar *buff,
                      enum pagecache_page_type type,
                      enum pagecache_page_lock lock,
                      PAGECACHE_BLOCK_LINK **page_link)
{
  int error= 0;
  /* The pin action is derived from the lock and from whether we copy */
  enum pagecache_page_pin pin= lock_to_pin[test(buff==0)][lock];
  PAGECACHE_BLOCK_LINK *fake_link;
  DBUG_ENTER("pagecache_read");
  DBUG_PRINT("enter", ("fd: %u  page: %lu  buffer: 0x%lx level: %u  "
                       "t:%s  %s  %s",
                       (uint) file->file, (ulong) pageno,
                       (ulong) buff, level,
                       page_cache_page_type_str[type],
                       page_cache_page_lock_str[lock],
                       page_cache_page_pin_str[pin]));
  /* A direct link to the cache buffer is only allowed on a pinned page */
  DBUG_ASSERT(buff != 0 || (buff == 0 && (pin == PAGECACHE_PIN ||
                                          pin == PAGECACHE_PIN_LEFT_PINNED)));

  /* Let the rest of the code store the link unconditionally */
  if (!page_link)
    page_link= &fake_link;
  *page_link= 0;                                 /* Catch errors */

restart:

  if (pagecache->can_be_used)
  {
    /* Key cache is used */
    PAGECACHE_BLOCK_LINK *block;
    uint status;
    int page_st;

    pagecache_pthread_mutex_lock(&pagecache->cache_lock);
    /* Re-check under the mutex: the cache may have been disabled meanwhile */
    if (!pagecache->can_be_used)
    {
      pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
      goto no_key_cache;
    }

    inc_counter_for_resize_op(pagecache);
    pagecache->global_cache_r_requests++;
    /* See NOTE for pagecache_unlock about registering requests. */
    block= find_block(pagecache, file, pageno, level,
                      test(lock == PAGECACHE_LOCK_WRITE),
                      test((pin == PAGECACHE_PIN_LEFT_UNPINNED) ||
                           (pin == PAGECACHE_PIN)),
                      &page_st);
    DBUG_ASSERT(block->type == PAGECACHE_EMPTY_PAGE ||
                block->type == type ||
                type == PAGECACHE_LSN_PAGE ||
                type == PAGECACHE_READ_UNKNOWN_PAGE ||
                block->type == PAGECACHE_READ_UNKNOWN_PAGE);
    /* An "unknown" read must not overwrite an already known page type */
    if (type != PAGECACHE_READ_UNKNOWN_PAGE ||
        block->type == PAGECACHE_EMPTY_PAGE)
      block->type= type;
    if (((block->status & PCBLOCK_ERROR) == 0) && (page_st != PAGE_READ))
    {
      DBUG_PRINT("info", ("read block 0x%lx", (ulong)block));
      /* The requested page is to be read into the block buffer */
      read_block(pagecache, block,
                 (my_bool)(page_st == PAGE_TO_BE_READ));
      DBUG_PRINT("info", ("read is done"));
    }

    if (make_lock_and_pin(pagecache, block, lock, pin, file))
    {
      /*
        We failed to write lock the block, cache is unlocked,
        we will try to get the block again.
        The resize counter is incremented again after the restart, so it
        has to be balanced here; otherwise it would never drop to zero.
      */
      dec_counter_for_resize_op(pagecache);
      pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
      DBUG_PRINT("info", ("restarting..."));
      goto restart;
    }

    /* Remember the status now; it is checked after the mutex is released */
    status= block->status;
    if (!buff)
    {
      /* Direct link: hand out the cache buffer itself */
      buff=  block->buffer;
      /* possibly we will write here (resolved on unlock) */
      if ((lock == PAGECACHE_LOCK_WRITE ||
           lock == PAGECACHE_LOCK_LEFT_WRITELOCKED) &&
          !(block->status & PCBLOCK_CHANGED))
      {
        block->status|= PCBLOCK_DIRECT_W;
        DBUG_PRINT("info", ("Set PCBLOCK_DIRECT_W for block: 0x%lx",
                            (ulong) block));
      }
    }
    else
    {
      if (!(status & PCBLOCK_ERROR))
      {
        /* Unless reads are serialized, copy without holding the mutex */
#if !defined(SERIALIZED_READ_FROM_CACHE)
        pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
#endif

        DBUG_ASSERT((pagecache->block_size & 511) == 0);
        /* Copy data from the cache buffer */
        bmove512(buff, block->buffer, pagecache->block_size);

#if !defined(SERIALIZED_READ_FROM_CACHE)
        pagecache_pthread_mutex_lock(&pagecache->cache_lock);
#endif
      }
    }

    remove_reader(block);
    /*
      Link the block into the LRU chain if it's the last submitted request
      for the block and block will not be pinned.
      See NOTE for pagecache_unlock about registering requests.
    */
    if (pin == PAGECACHE_PIN_LEFT_UNPINNED || pin == PAGECACHE_UNPIN)
      unreg_request(pagecache, block, 1);
    else
      *page_link= block;

    dec_counter_for_resize_op(pagecache);

    pagecache_pthread_mutex_unlock(&pagecache->cache_lock);

    if (status & PCBLOCK_ERROR)
    {
      DBUG_ASSERT(my_errno != 0);
      DBUG_PRINT("error", ("Got error %d when doing page read", my_errno));
      DBUG_RETURN((uchar *) 0);
    }

    DBUG_RETURN(buff);
  }

no_key_cache:					/* Key cache is not used */

  /* We can't use mutex here as the key cache may not be initialized */
  pagecache->global_cache_r_requests++;
  pagecache->global_cache_read++;
  if (pagecache_fread(pagecache, file, (uchar*) buff, pageno,
                      pagecache->readwrite_flags))
    error= 1;
  DBUG_RETURN(error ? (uchar*) 0 : buff);
}


/*
  Delete page from the buffer

  SYNOPSIS
unknown's avatar
unknown committed
3087
    pagecache_delete()
unknown's avatar
unknown committed
3088 3089 3090 3091 3092 3093 3094 3095 3096 3097 3098 3099 3100 3101
    pagecache           pointer to a page cache data structure
    file                handler for the file for the block of data to be read
    pageno              number of the block of data in the file
    lock                lock change
    flush               flush page if it is dirty

  RETURN VALUE
    0 - deleted or was not present at all
    1 - error

  NOTES.
  lock  can be only PAGECACHE_LOCK_LEFT_WRITELOCKED (page was write locked
  before) or PAGECACHE_LOCK_WRITE (delete will write lock page before delete)
*/
unknown's avatar
unknown committed
3102 3103 3104 3105 3106
my_bool pagecache_delete(PAGECACHE *pagecache,
                         PAGECACHE_FILE *file,
                         pgcache_page_no_t pageno,
                         enum pagecache_page_lock lock,
                         my_bool flush)
unknown's avatar
unknown committed
3107 3108
{
  int error= 0;
3109
  enum pagecache_page_pin pin= lock_to_pin[0][lock];
unknown's avatar
unknown committed
3110
  DBUG_ENTER("pagecache_delete");
3111
  DBUG_PRINT("enter", ("fd: %u  page: %lu  %s  %s",
unknown's avatar
unknown committed
3112 3113 3114 3115 3116
                       (uint) file->file, (ulong) pageno,
                       page_cache_page_lock_str[lock],
                       page_cache_page_pin_str[pin]));
  DBUG_ASSERT(lock == PAGECACHE_LOCK_WRITE ||
              lock == PAGECACHE_LOCK_LEFT_WRITELOCKED);
3117 3118
  DBUG_ASSERT(pin == PAGECACHE_PIN ||
              pin == PAGECACHE_PIN_LEFT_PINNED);
unknown's avatar
unknown committed
3119 3120 3121 3122 3123 3124
restart:

  if (pagecache->can_be_used)
  {
    /* Key cache is used */
    reg1 PAGECACHE_BLOCK_LINK *block;
3125
    PAGECACHE_HASH_LINK **unused_start, *page_link;
unknown's avatar
unknown committed
3126 3127 3128 3129 3130 3131

    pagecache_pthread_mutex_lock(&pagecache->cache_lock);
    if (!pagecache->can_be_used)
      goto end;

    inc_counter_for_resize_op(pagecache);
3132 3133
    page_link= get_present_hash_link(pagecache, file, pageno, &unused_start);
    if (!page_link)
unknown's avatar
unknown committed
3134
    {
3135
      DBUG_PRINT("info", ("There is no such page in the cache"));
unknown's avatar
unknown committed
3136
      pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
unknown's avatar
unknown committed
3137 3138
      DBUG_RETURN(0);
    }
3139
    block= page_link->block;
unknown's avatar
unknown committed
3140
    /* See NOTE for pagecache_unlock about registering requests. */
3141 3142
    if (pin == PAGECACHE_PIN)
      reg_requests(pagecache, block, 1);
unknown's avatar
unknown committed
3143
    DBUG_ASSERT(block != 0);
3144
    if (make_lock_and_pin(pagecache, block, lock, pin, file))
unknown's avatar
unknown committed
3145 3146 3147 3148 3149 3150
    {
      /*
        We failed to writelock the block, cache is unlocked, and last write
        lock is released, we will try to get the block again.
      */
      pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
3151
      DBUG_PRINT("info", ("restarting..."));
unknown's avatar
unknown committed
3152 3153 3154
      goto restart;
    }

3155 3156 3157
    /* we can't delete with opened direct link for write */
    DBUG_ASSERT((block->status & PCBLOCK_DIRECT_W) == 0);

unknown's avatar
unknown committed
3158
    if (block->status & PCBLOCK_CHANGED)
unknown's avatar
unknown committed
3159
    {
unknown's avatar
unknown committed
3160
      if (flush)
unknown's avatar
unknown committed
3161
      {
unknown's avatar
unknown committed
3162
        /* The block contains a dirty page - push it out of the cache */
unknown's avatar
unknown committed
3163

3164
        KEYCACHE_DBUG_PRINT("find_block", ("block is dirty"));
unknown's avatar
unknown committed
3165

unknown's avatar
unknown committed
3166 3167 3168 3169 3170 3171 3172 3173 3174 3175 3176
        pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
        /*
          The call is thread safe because only the current
          thread might change the block->hash_link value
        */
        DBUG_ASSERT(block->pins == 1);
        error= pagecache_fwrite(pagecache,
                                &block->hash_link->file,
                                block->buffer,
                                block->hash_link->pageno,
                                block->type,
3177
                                pagecache->readwrite_flags);
unknown's avatar
unknown committed
3178 3179
        pagecache_pthread_mutex_lock(&pagecache->cache_lock);
        pagecache->global_cache_write++;
unknown's avatar
unknown committed
3180

unknown's avatar
unknown committed
3181 3182
        if (error)
        {
unknown's avatar
unknown committed
3183
          block->status|= PCBLOCK_ERROR;
unknown's avatar
unknown committed
3184 3185
          goto err;
        }
unknown's avatar
unknown committed
3186 3187 3188
      }
      pagecache->blocks_changed--;
      pagecache->global_blocks_changed--;
unknown's avatar
unknown committed
3189
      /*
unknown's avatar
unknown committed
3190 3191
        free_block() will change the status and rec_lsn of the block so no
        need to change them here.
unknown's avatar
unknown committed
3192
      */
unknown's avatar
unknown committed
3193 3194
    }
    /* Cache is locked, so we can relese page before freeing it */
3195 3196
    make_lock_and_pin(pagecache, block,
                      PAGECACHE_LOCK_WRITE_UNLOCK,
3197
                      PAGECACHE_UNPIN, file);
3198 3199
    DBUG_ASSERT(page_link->requests > 0);
    page_link->requests--;
unknown's avatar
unknown committed
3200
    /* See NOTE for pagecache_unlock about registering requests. */
unknown's avatar
unknown committed
3201 3202 3203 3204 3205 3206 3207 3208 3209 3210 3211 3212
    free_block(pagecache, block);

err:
    dec_counter_for_resize_op(pagecache);
end:
    pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
  }

  DBUG_RETURN(error);
}


3213 3214 3215 3216 3217 3218 3219 3220 3221 3222 3223 3224 3225 3226
/*
  Delete a range of consecutive pages from the page cache.

  Calls pagecache_delete() for each of the 'page_count' pages starting at
  'pageno' (page_count must be > 0) and stops at the first failure.

  Returns 0 if all pages were deleted (or were not present), 1 on error.
*/
my_bool pagecache_delete_pages(PAGECACHE *pagecache,
                               PAGECACHE_FILE *file,
                               pgcache_page_no_t pageno,
                               uint page_count,
                               enum pagecache_page_lock lock,
                               my_bool flush)
{
  /*
    Same type as pageno: the loop below ends on equality, so the end marker
    must not be truncated if page numbers are wider than ulong.
  */
  pgcache_page_no_t page_end;
  DBUG_ENTER("pagecache_delete_pages");
  DBUG_ASSERT(page_count > 0);

  page_end= pageno + page_count;
  do
  {
    if (pagecache_delete(pagecache, file, pageno,
                         lock, flush))
      DBUG_RETURN(1);
  } while (++pageno != page_end);
  DBUG_RETURN(0);
}


3235 3236 3237 3238 3239 3240 3241 3242 3243 3244 3245 3246 3247 3248 3249 3250 3251 3252 3253 3254 3255
/**
  @brief Writes a buffer into a cached file.

  @param pagecache       pointer to a page cache data structure
  @param file            handler for the file to write data to
  @param pageno          number of the block of data in the file
  @param level           determines the weight of the data
  @param buff            buffer with the data
  @param type            type of the page
  @param lock            lock change
  @param pin             pin page
  @param write_mode      how to write page
  @param page_link       link to the page if we pin it
  @param first_REDO_LSN_for_page the lsn to set rec_lsn
  @param offset          offset in the page
  @param size            size of data
  @param validator       read page validator
  @param validator_data  the validator data

  @retval 0 if a success.
  @retval 1 Error.
*/

3258
/* description of how to change lock before and after write */
unknown's avatar
unknown committed
3259 3260
struct write_lock_change
{
3261 3262 3263
  int need_lock_change; /* need changing of lock at the end of write */
  enum pagecache_page_lock new_lock; /* lock at the beginning */
  enum pagecache_page_lock unlock_lock; /* lock at the end */
unknown's avatar
unknown committed
3264 3265 3266 3267 3268 3269 3270
};

static struct write_lock_change write_lock_change_table[]=
{
  {1,
   PAGECACHE_LOCK_WRITE,
   PAGECACHE_LOCK_WRITE_UNLOCK} /*PAGECACHE_LOCK_LEFT_UNLOCKED*/,
unknown's avatar
unknown committed
3271
  {0, /*unsupported (we can't write having the block read locked) */
unknown's avatar
unknown committed
3272 3273 3274 3275 3276 3277 3278
   PAGECACHE_LOCK_LEFT_UNLOCKED,
   PAGECACHE_LOCK_LEFT_UNLOCKED} /*PAGECACHE_LOCK_LEFT_READLOCKED*/,
  {0, PAGECACHE_LOCK_LEFT_WRITELOCKED, 0} /*PAGECACHE_LOCK_LEFT_WRITELOCKED*/,
  {1,
   PAGECACHE_LOCK_WRITE,
   PAGECACHE_LOCK_WRITE_TO_READ} /*PAGECACHE_LOCK_READ*/,
  {0, PAGECACHE_LOCK_WRITE, 0} /*PAGECACHE_LOCK_WRITE*/,
unknown's avatar
unknown committed
3279
  {0, /*unsupported (we can't write having the block read locked) */
unknown's avatar
unknown committed
3280 3281 3282 3283 3284 3285 3286
   PAGECACHE_LOCK_LEFT_UNLOCKED,
   PAGECACHE_LOCK_LEFT_UNLOCKED} /*PAGECACHE_LOCK_READ_UNLOCK*/,
  {1,
   PAGECACHE_LOCK_LEFT_WRITELOCKED,
   PAGECACHE_LOCK_WRITE_UNLOCK } /*PAGECACHE_LOCK_WRITE_UNLOCK*/,
  {1,
   PAGECACHE_LOCK_LEFT_WRITELOCKED,
unknown's avatar
unknown committed
3287
   PAGECACHE_LOCK_WRITE_TO_READ} /*PAGECACHE_LOCK_WRITE_TO_READ*/
unknown's avatar
unknown committed
3288 3289
};

3290
/* description of how to change pin before and after write */
unknown's avatar
unknown committed
3291 3292
struct write_pin_change
{
3293 3294
  enum pagecache_page_pin new_pin; /* pin status at the beginning */
  enum pagecache_page_pin unlock_pin; /* pin status at the end */
unknown's avatar
unknown committed
3295 3296 3297 3298 3299 3300 3301 3302 3303 3304 3305 3306 3307 3308
};

/*
  Pin transitions for pagecache_write_part(), indexed by the pin value
  requested by the caller (the trailing comment of each entry names it).
  The table is consulted only when the lock has to be changed around the
  write: 'new_pin' is used before the write and 'unlock_pin' after it.
*/
static struct write_pin_change write_pin_change_table[]=
{
  {PAGECACHE_PIN_LEFT_PINNED,
   PAGECACHE_PIN_LEFT_PINNED} /*PAGECACHE_PIN_LEFT_PINNED*/,
  {PAGECACHE_PIN,
   PAGECACHE_UNPIN} /*PAGECACHE_PIN_LEFT_UNPINNED*/,
  {PAGECACHE_PIN,
   PAGECACHE_PIN_LEFT_PINNED} /*PAGECACHE_PIN*/,
  {PAGECACHE_PIN_LEFT_PINNED,
   PAGECACHE_UNPIN} /*PAGECACHE_UNPIN*/
};

unknown's avatar
unknown committed
3309 3310 3311 3312
my_bool pagecache_write_part(PAGECACHE *pagecache,
                             PAGECACHE_FILE *file,
                             pgcache_page_no_t pageno,
                             uint level,
unknown's avatar
unknown committed
3313
                             uchar *buff,
unknown's avatar
unknown committed
3314 3315 3316 3317
                             enum pagecache_page_type type,
                             enum pagecache_page_lock lock,
                             enum pagecache_page_pin pin,
                             enum pagecache_write_mode write_mode,
3318
                             PAGECACHE_BLOCK_LINK **page_link,
3319
                             LSN first_REDO_LSN_for_page,
unknown's avatar
unknown committed
3320
                             uint offset, uint size)
unknown's avatar
unknown committed
3321 3322
{
  PAGECACHE_BLOCK_LINK *block= NULL;
unknown's avatar
unknown committed
3323
  PAGECACHE_BLOCK_LINK *fake_link;
unknown's avatar
unknown committed
3324 3325
  int error= 0;
  int need_lock_change= write_lock_change_table[lock].need_lock_change;
unknown's avatar
unknown committed
3326
  DBUG_ENTER("pagecache_write_part");
3327
  DBUG_PRINT("enter", ("fd: %u  page: %lu  level: %u  type: %s  lock: %s  "
unknown's avatar
unknown committed
3328
                       "pin: %s   mode: %s  offset: %u  size %u",
unknown's avatar
unknown committed
3329 3330 3331 3332
                       (uint) file->file, (ulong) pageno, level,
                       page_cache_page_type_str[type],
                       page_cache_page_lock_str[lock],
                       page_cache_page_pin_str[pin],
unknown's avatar
unknown committed
3333 3334
                       page_cache_page_write_mode_str[write_mode],
                       offset, size));
3335
  DBUG_ASSERT(type != PAGECACHE_READ_UNKNOWN_PAGE);
unknown's avatar
unknown committed
3336 3337
  DBUG_ASSERT(lock != PAGECACHE_LOCK_LEFT_READLOCKED);
  DBUG_ASSERT(lock != PAGECACHE_LOCK_READ_UNLOCK);
unknown's avatar
unknown committed
3338
  DBUG_ASSERT(offset + size <= pagecache->block_size);
unknown's avatar
unknown committed
3339

3340 3341 3342
  if (!page_link)
    page_link= &fake_link;
  *page_link= 0;
unknown's avatar
unknown committed
3343 3344 3345 3346 3347 3348 3349 3350 3351 3352 3353 3354 3355 3356 3357 3358 3359 3360 3361 3362 3363 3364

restart:

#if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
  DBUG_EXECUTE("check_pagecache",
               test_key_cache(pagecache, "start of key_cache_write", 1););
#endif

  if (pagecache->can_be_used)
  {
    /* Key cache is used */
    int page_st;

    pagecache_pthread_mutex_lock(&pagecache->cache_lock);
    if (!pagecache->can_be_used)
    {
      pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
      goto no_key_cache;
    }

    inc_counter_for_resize_op(pagecache);
    pagecache->global_cache_w_requests++;
unknown's avatar
unknown committed
3365
    /* See NOTE for pagecache_unlock about registering requests. */
3366 3367 3368 3369 3370 3371 3372 3373
    block= find_block(pagecache, file, pageno, level,
                      test(write_mode != PAGECACHE_WRITE_DONE &&
                           lock != PAGECACHE_LOCK_LEFT_WRITELOCKED &&
                           lock != PAGECACHE_LOCK_WRITE_UNLOCK &&
                           lock != PAGECACHE_LOCK_WRITE_TO_READ),
                      test((pin == PAGECACHE_PIN_LEFT_UNPINNED) ||
                           (pin == PAGECACHE_PIN)),
                      &page_st);
unknown's avatar
unknown committed
3374 3375 3376 3377 3378 3379 3380 3381 3382 3383 3384
    if (!block)
    {
      DBUG_ASSERT(write_mode != PAGECACHE_WRITE_DONE);
      /* It happens only for requests submitted during resize operation */
      dec_counter_for_resize_op(pagecache);
      pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
      /* Write to the disk key cache is in resize at the moment*/
      goto no_key_cache;
    }

    DBUG_ASSERT(block->type == PAGECACHE_EMPTY_PAGE ||
3385
                block->type == PAGECACHE_READ_UNKNOWN_PAGE ||
unknown's avatar
unknown committed
3386 3387 3388
                block->type == type ||
                (block->type == PAGECACHE_PLAIN_PAGE &&
                 type == PAGECACHE_LSN_PAGE));
unknown's avatar
unknown committed
3389
    block->type= type;
3390 3391 3392 3393
    /* we write to the page so it has no sense to keep the flag */
    block->status&= ~PCBLOCK_DIRECT_W;
    DBUG_PRINT("info", ("Drop PCBLOCK_DIRECT_W for block: 0x%lx",
                        (ulong) block));
unknown's avatar
unknown committed
3394

3395 3396 3397 3398
    if (make_lock_and_pin(pagecache, block,
                          write_lock_change_table[lock].new_lock,
                          (need_lock_change ?
                           write_pin_change_table[pin].new_pin :
3399
                           pin), file))
unknown's avatar
unknown committed
3400 3401 3402 3403 3404 3405
    {
      /*
        We failed to writelock the block, cache is unlocked, and last write
        lock is released, we will try to get the block again.
      */
      pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
3406
      DBUG_PRINT("info", ("restarting..."));
unknown's avatar
unknown committed
3407 3408 3409 3410 3411
      goto restart;
    }

    if (write_mode == PAGECACHE_WRITE_DONE)
    {
3412 3413 3414 3415 3416
      if (block->status & PCBLOCK_ERROR)
      {
        DBUG_PRINT("warning", ("Writing on page with error"));
      }
      else
unknown's avatar
unknown committed
3417 3418
      {
        /* Copy data from buff */
unknown's avatar
unknown committed
3419 3420 3421 3422
        if (!(size & 511))
          bmove512(block->buffer + offset, buff, size);
        else
          memcpy(block->buffer + offset, buff, size);
3423
        block->status= PCBLOCK_READ;
3424
        /*
unknown's avatar
unknown committed
3425
          The read_callback can change the page content (removing page
3426 3427
          protection) so it have to be called
        */
unknown's avatar
unknown committed
3428 3429 3430 3431 3432 3433 3434 3435 3436
        DBUG_PRINT("info", ("read_callback: 0x%lx  data: 0x%lx",
                            (ulong) block->hash_link->file.read_callback,
                            (ulong) block->hash_link->file.callback_data));
        if ((*block->hash_link->file.read_callback)(block->buffer,
                                                    block->hash_link->pageno,
                                                    block->hash_link->
                                                    file.callback_data))
        {
          DBUG_PRINT("error", ("read callback problem"));
3437
          block->status|= PCBLOCK_ERROR;
unknown's avatar
unknown committed
3438
        }
unknown's avatar
unknown committed
3439
        KEYCACHE_DBUG_PRINT("key_cache_insert",
3440
                            ("Page injection"));
unknown's avatar
unknown committed
3441
#ifdef THREAD
unknown's avatar
unknown committed
3442 3443
        /* Signal that all pending requests for this now can be processed. */
        if (block->wqueue[COND_FOR_REQUESTED].last_thread)
3444
          wqueue_release_queue(&block->wqueue[COND_FOR_REQUESTED]);
unknown's avatar
unknown committed
3445
#endif
unknown's avatar
unknown committed
3446 3447 3448 3449
      }
    }
    else
    {
unknown's avatar
unknown committed
3450
      if (! (block->status & PCBLOCK_CHANGED))
unknown's avatar
unknown committed
3451
          link_to_changed_list(pagecache, block);
unknown's avatar
unknown committed
3452

3453 3454 3455 3456 3457 3458 3459 3460
      if (!(size & 511))
        bmove512(block->buffer + offset, buff, size);
      else
        memcpy(block->buffer + offset, buff, size);
      block->status|= PCBLOCK_READ;
      /* Page is correct again if we made a full write in it */
      if (size == pagecache->block_size)
        block->status&= ~PCBLOCK_ERROR;
unknown's avatar
unknown committed
3461 3462
    }

3463 3464 3465 3466 3467 3468 3469 3470 3471
    if (first_REDO_LSN_for_page)
    {
      /* single write action of the last write action */
      DBUG_ASSERT(lock == PAGECACHE_LOCK_WRITE_UNLOCK ||
                  lock == PAGECACHE_LOCK_LEFT_UNLOCKED);
      DBUG_ASSERT(pin == PAGECACHE_UNPIN ||
                  pin == PAGECACHE_PIN_LEFT_UNPINNED);
      pagecache_set_block_rec_lsn(block, first_REDO_LSN_for_page);
    }
unknown's avatar
unknown committed
3472 3473 3474

    if (need_lock_change)
    {
3475
      /*
3476 3477
        We don't set rec_lsn of the block; this is ok as for the
        Maria-block-record's pages, we always keep pages pinned here.
3478 3479 3480
      */
      if (make_lock_and_pin(pagecache, block,
                            write_lock_change_table[lock].unlock_lock,
3481
                            write_pin_change_table[pin].unlock_pin, file))
3482
        DBUG_ASSERT(0);
unknown's avatar
unknown committed
3483 3484
    }

unknown's avatar
unknown committed
3485 3486 3487
    /* Unregister the request */
    DBUG_ASSERT(block->hash_link->requests > 0);
    block->hash_link->requests--;
unknown's avatar
unknown committed
3488
    /* See NOTE for pagecache_unlock about registering requests. */
3489
    if (pin == PAGECACHE_PIN_LEFT_UNPINNED || pin == PAGECACHE_UNPIN)
3490
      unreg_request(pagecache, block, 1);
unknown's avatar
unknown committed
3491
    else
3492
      *page_link= block;
unknown's avatar
unknown committed
3493

unknown's avatar
unknown committed
3494
    if (block->status & PCBLOCK_ERROR)
unknown's avatar
unknown committed
3495 3496 3497 3498 3499 3500 3501 3502 3503 3504 3505 3506 3507 3508 3509
      error= 1;

    dec_counter_for_resize_op(pagecache);

    pagecache_pthread_mutex_unlock(&pagecache->cache_lock);

    goto end;
  }

no_key_cache:
  /* Key cache is not used */
  if (write_mode == PAGECACHE_WRITE_DELAY)
  {
    pagecache->global_cache_w_requests++;
    pagecache->global_cache_write++;
unknown's avatar
unknown committed
3510
    if (pagecache_fwrite(pagecache, file, (uchar*) buff, pageno, type,
3511
                         pagecache->readwrite_flags))
unknown's avatar
unknown committed
3512 3513 3514 3515 3516 3517 3518 3519
      error=1;
  }

end:
#if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
  DBUG_EXECUTE("exec",
               test_key_cache(pagecache, "end of key_cache_write", 1););
#endif
unknown's avatar
unknown committed
3520 3521 3522 3523
  if (block)
    PCBLOCK_INFO(block);
  else
    DBUG_PRINT("info", ("No block"));
unknown's avatar
unknown committed
3524 3525 3526 3527 3528 3529 3530 3531 3532 3533 3534 3535 3536 3537
  DBUG_RETURN(error);
}


/*
  Free block: remove reference to it from hash table,
  remove it from the chain file of dirty/clean blocks
  and add it to the free list.

  The block must have no write locks and no pins left (asserted below).
  Threads waiting in block->wqueue[COND_FOR_SAVED] are released at the end
  so that they can resubmit their requests.
*/

static void free_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block)
{
  KEYCACHE_THREAD_TRACE("free block");
  KEYCACHE_DBUG_PRINT("free_block",
                      ("block: %u  hash_link 0x%lx",
                       PCBLOCK_NUMBER(pagecache, block),
                       (long) block->hash_link));
  if (block->hash_link)
  {
    /*
      While waiting for readers to finish, new readers might request the
      block. But since we set block->status|= PCBLOCK_REASSIGNED, they
      will wait on block->wqueue[COND_FOR_SAVED]. They must be signalled
      later.
    */
    block->status|= PCBLOCK_REASSIGNED;
    wait_for_readers(pagecache, block);
    unlink_hash(pagecache, block->hash_link);
  }

  unlink_changed(block);
  DBUG_ASSERT(block->wlocks == 0);
  DBUG_ASSERT(block->pins == 0);
  block->status= 0;
  /* The page type is reset in debug builds only */
#ifndef DBUG_OFF
  block->type= PAGECACHE_EMPTY_PAGE;
#endif
  block->rec_lsn= LSN_MAX;
  KEYCACHE_THREAD_TRACE("free block");
  KEYCACHE_DBUG_PRINT("free_block",
                      ("block is freed"));
  unreg_request(pagecache, block, 0);
  block->hash_link= NULL;

  /* Remove the free block from the LRU ring. */
  unlink_block(pagecache, block);
  if (block->temperature == PCBLOCK_WARM)
    pagecache->warm_blocks--;
  block->temperature= PCBLOCK_COLD;
  /* Insert the free block in the free list. */
  block->next_used= pagecache->free_block_list;
  pagecache->free_block_list= block;
  /* Keep track of the number of currently unused blocks. */
  pagecache->blocks_unused++;

#ifdef THREAD
  /* All pending requests for this page must be resubmitted. */
  if (block->wqueue[COND_FOR_SAVED].last_thread)
    wqueue_release_queue(&block->wqueue[COND_FOR_SAVED]);
#endif
}


/*
  qsort() comparison callback: orders block links by the page number of
  their hash link, ascending.

  Returns -1, 0 or 1 when the page number of *a is respectively lower than,
  equal to, or higher than the page number of *b.
*/
static int cmp_sec_link(PAGECACHE_BLOCK_LINK **a, PAGECACHE_BLOCK_LINK **b)
{
  if ((*a)->hash_link->pageno < (*b)->hash_link->pageno)
    return -1;
  if ((*a)->hash_link->pageno > (*b)->hash_link->pageno)
    return 1;
  return 0;
}


3594 3595 3596 3597 3598 3599 3600 3601 3602 3603 3604 3605 3606 3607 3608 3609 3610
/**
  @brief Flush a portion of changed blocks to disk, free used blocks
  if requested

  @param pagecache       This page cache reference.
  @param file            File which should be flushed
  @param cache           Beginning of the array of blocks to flush.
  @param end             Reference to the block after the last in the array.
  @param type            Type of the flush.
  @param first_errno     Where to store the first errno of the flush
                         (reset to 0 on entry).

  @note
    Must be entered with pagecache->cache_lock held. The lock is released
    while the array is sorted and around each page write, and is held again
    when the function returns.

  @return Operation status
  @retval PCFLUSH_OK OK
  @retval PCFLUSH_ERROR There were errors during the flush process.
  @retval PCFLUSH_PINNED Pinned blocks were met and skipped.
  @retval PCFLUSH_PINNED_AND_ERROR PCFLUSH_ERROR and PCFLUSH_PINNED.
*/

static int flush_cached_blocks(PAGECACHE *pagecache,
                               PAGECACHE_FILE *file,
                               PAGECACHE_BLOCK_LINK **cache,
                               PAGECACHE_BLOCK_LINK **end,
                               enum flush_type type,
                               int *first_errno)
{
  int rc= PCFLUSH_OK;
  int write_error;
  uint nblocks= (uint) (end - cache);
  PAGECACHE_BLOCK_LINK **cursor;
  DBUG_ENTER("flush_cached_blocks");
  *first_errno= 0;

  /*
    Sort the blocks by page number without holding the cache lock.
    All blocks referred to in 'cache' are marked PCBLOCK_IN_FLUSH, which
    guarantees that no other thread will change them meanwhile.
  */
  pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
  qsort((uchar*) cache, nblocks, sizeof(*cache), (qsort_cmp) cmp_sec_link);
  pagecache_pthread_mutex_lock(&pagecache->cache_lock);

  for (cursor= cache; cursor != end; cursor++)
  {
    PAGECACHE_BLOCK_LINK *block= *cursor;

    if (block->pins)
    {
      /* Somebody holds a pin on the page: skip it and report that */
      KEYCACHE_DBUG_PRINT("flush_cached_blocks",
                          ("block: %u (0x%lx)  pinned",
                           PCBLOCK_NUMBER(pagecache, block), (ulong)block));
      DBUG_PRINT("info", ("block: %u (0x%lx)  pinned",
                          PCBLOCK_NUMBER(pagecache, block), (ulong)block));
      PCBLOCK_INFO(block);
      /* undo the mark put by flush_pagecache_blocks_int(): */
      block->status&= ~PCBLOCK_IN_FLUSH;
      rc|= PCFLUSH_PINNED;
      DBUG_PRINT("warning", ("Page pinned"));
      unreg_request(pagecache, block, 1);
      if (!*first_errno)
        *first_errno= HA_ERR_INTERNAL_ERROR;
      continue;
    }

    /* A block which is not pinned cannot be write locked either */
    DBUG_ASSERT(block->wlocks == 0);
    DBUG_ASSERT(block->pins == 0);
    if (make_lock_and_pin(pagecache, block,
                          PAGECACHE_LOCK_WRITE, PAGECACHE_PIN, 0))
      DBUG_ASSERT(0);

    KEYCACHE_DBUG_PRINT("flush_cached_blocks",
                        ("block: %u (0x%lx)  to be flushed",
                         PCBLOCK_NUMBER(pagecache, block), (ulong)block));
    DBUG_PRINT("info", ("block: %u (0x%lx)  to be flushed",
                        PCBLOCK_NUMBER(pagecache, block), (ulong)block));
    PCBLOCK_INFO(block);

    /* The page write itself is done without the cache lock */
    pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
    DBUG_PRINT("info", ("block: %u (0x%lx)  pins: %u",
                        PCBLOCK_NUMBER(pagecache, block), (ulong)block,
                        block->pins));
    DBUG_ASSERT(block->pins == 1);
    /**
       @todo IO If page is contiguous with next page to flush, group flushes
       in one single my_pwrite().
    */
    write_error= pagecache_fwrite(pagecache, &block->hash_link->file,
                                  block->buffer,
                                  block->hash_link->pageno,
                                  block->type,
                                  pagecache->readwrite_flags);
    pagecache_pthread_mutex_lock(&pagecache->cache_lock);

    make_lock_and_pin(pagecache, block,
                      PAGECACHE_LOCK_WRITE_UNLOCK,
                      PAGECACHE_UNPIN, 0);

    pagecache->global_cache_write++;
    if (write_error)
    {
      block->status|= PCBLOCK_ERROR;
      if (!*first_errno)
        *first_errno= my_errno ? my_errno : -1;
      rc|= PCFLUSH_ERROR;
    }
#ifdef THREAD
    /*
      Let possible waiting requests to write to the block page proceed.
      It might happen only during an operation to resize the key cache.
    */
    if (block->wqueue[COND_FOR_SAVED].last_thread)
      wqueue_release_queue(&block->wqueue[COND_FOR_SAVED]);
#endif
    /* type will never be FLUSH_IGNORE_CHANGED here */
    if (type == FLUSH_KEEP || type == FLUSH_KEEP_LAZY ||
        type == FLUSH_FORCE_WRITE)
    {
      /* The block stays in the cache: move it to the file's block list */
      block->status&= ~PCBLOCK_IN_FLUSH;
      link_to_file_list(pagecache, block, file, 1);
      unreg_request(pagecache, block, 1);
    }
    else
    {
      /* The flush type asks to release the block */
      pagecache->blocks_changed--;
      pagecache->global_blocks_changed--;
      free_block(pagecache, block);
    }
  }
  DBUG_RETURN(rc);
}


3724
/**
   @brief flush all blocks for a file to disk but don't do any mutex locks

   @param  pagecache       pointer to a pagecache data structure
   @param  file            handler for the file to flush to
   @param  type            type of the flush
   @param  filter          optional function which tells what blocks to flush;
                           can be non-NULL only if FLUSH_KEEP, FLUSH_KEEP_LAZY
                           or FLUSH_FORCE_WRITE.
   @param  filter_arg      an argument to pass to 'filter'. Information about
                           the block will be passed too.

   @note
     This function doesn't do any mutex locks because it needs to be called
     both from flush_pagecache_blocks and flush_all_key_blocks (the latter one
     does the mutex lock in the resize_pagecache() function).
     The caller must hold pagecache->cache_lock; the lock may be released and
     re-acquired internally, but it is always held again on return.

   @note
     This function can cause problems if two threads call it
     concurrently on the same file (look for "PageCacheFlushConcurrencyBugs"
     in ma_checkpoint.c); to avoid them, it has internal logic to serialize in
     this situation.

   @note
     When several batches of blocks are flushed, my_errno is set from the
     first batch which reported PCFLUSH_ERROR; later batches do not overwrite
     it.

   @return Operation status
   @retval PCFLUSH_OK OK
   @retval PCFLUSH_ERROR There were errors during the flush process.
   @retval PCFLUSH_PINNED Pinned blocks were met and skipped.
   @retval PCFLUSH_PINNED_AND_ERROR PCFLUSH_ERROR and PCFLUSH_PINNED.
*/

static int flush_pagecache_blocks_int(PAGECACHE *pagecache,
                                      PAGECACHE_FILE *file,
                                      enum flush_type type,
                                      PAGECACHE_FLUSH_FILTER filter,
                                      void *filter_arg)
{
  PAGECACHE_BLOCK_LINK *cache_buff[FLUSH_CACHE],**cache;
  int last_errno= 0;
  int rc= PCFLUSH_OK;
  DBUG_ENTER("flush_pagecache_blocks_int");
  DBUG_PRINT("enter",
             ("fd: %d  blocks_used: %lu  blocks_changed: %lu  type: %d",
              file->file, pagecache->blocks_used, pagecache->blocks_changed,
              type));

#if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
    DBUG_EXECUTE("check_pagecache",
                 test_key_cache(pagecache,
                                "start of flush_pagecache_blocks", 0););
#endif

  cache= cache_buff;
  if (pagecache->disk_blocks > 0 &&
      (!my_disable_flush_pagecache_blocks ||
       (type != FLUSH_KEEP && type != FLUSH_KEEP_LAZY)))
  {
    /*
      Key cache exists. If my_disable_flush_pagecache_blocks is true it
      disables the operation but only FLUSH_KEEP[_LAZY]: other flushes still
      need to be allowed: FLUSH_RELEASE has to free blocks, and
      FLUSH_FORCE_WRITE is to overrule my_disable_flush_pagecache_blocks.
    */
    int error= 0;
    uint count= 0;
    PAGECACHE_BLOCK_LINK **pos, **end;
    PAGECACHE_BLOCK_LINK *first_in_switch= NULL;
    PAGECACHE_BLOCK_LINK *block, *next;
#if defined(PAGECACHE_DEBUG)
    uint cnt= 0;
#endif

#ifdef THREAD
    struct st_file_in_flush us_flusher, *other_flusher;
    us_flusher.file= file->file;
    us_flusher.flush_queue.last_thread= NULL;
    us_flusher.first_in_switch= FALSE;
    while ((other_flusher= (struct st_file_in_flush *)
            hash_search(&pagecache->files_in_flush, (uchar *)&file->file,
                        sizeof(file->file))))
    {
      /*
        File is in flush already: wait, unless FLUSH_KEEP_LAZY. "Flusher"
        means "who can mark PCBLOCK_IN_FLUSH", i.e. caller of
        flush_pagecache_blocks_int().
      */
      struct st_my_thread_var *thread;
      if (type == FLUSH_KEEP_LAZY)
      {
        DBUG_PRINT("info",("FLUSH_KEEP_LAZY skips"));
        DBUG_RETURN(0);
      }
      thread= my_thread_var;
      wqueue_add_to_queue(&other_flusher->flush_queue, thread);
      do
      {
        KEYCACHE_DBUG_PRINT("flush_pagecache_blocks_int: wait1",
                            ("suspend thread %ld", thread->id));
        pagecache_pthread_cond_wait(&thread->suspend,
                                    &pagecache->cache_lock);
      }
      while (thread->next);
    }
    /* we are the only flusher of this file now */
    while (my_hash_insert(&pagecache->files_in_flush, (uchar *)&us_flusher))
    {
      /*
        Out of memory, wait for flushers to empty the hash and retry; should
        rarely happen. Other threads are flushing the file; when done, they
        are going to remove themselves from the hash, and thus memory will
        appear again. However, this memory may be stolen by yet another thread
        (for a purpose unrelated to page cache), before we retry
        hash_insert(). So the loop may run for long. Only if the thread was
        killed do we abort the loop, returning 1 (error) which can cause the
        table to be marked as corrupted (cf maria_chk_size(), maria_close())
        and thus require a table check.
      */
      DBUG_ASSERT(0);
      /*
        Test the abort flag while cache_lock is still held: the caller
        expects the lock to be held when we return and will unlock it
        itself, so we must never return with the lock released.
      */
      if (my_thread_var->abort)
        DBUG_RETURN(1);                         /* End if aborted by user */
      pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
      sleep(10);
      pagecache_pthread_mutex_lock(&pagecache->cache_lock);
    }
#endif

    if (type != FLUSH_IGNORE_CHANGED)
    {
      /*
        Count how many key blocks we have to cache to be able
        to flush all dirty pages with minimum seek moves.
      */
      for (block= pagecache->changed_blocks[FILE_HASH(*file)] ;
           block;
           block= block->next_changed)
      {
        if (block->hash_link->file.file == file->file)
        {
          count++;
          KEYCACHE_DBUG_ASSERT(count<= pagecache->blocks_used);
        }
      }
      /* Allocate a new buffer only if its bigger than the one we have */
      if (count > FLUSH_CACHE &&
          !(cache=
            (PAGECACHE_BLOCK_LINK**)
            my_malloc(sizeof(PAGECACHE_BLOCK_LINK*)*count, MYF(0))))
      {
        /* Allocation failed: fall back to flushing in smaller batches */
        cache= cache_buff;
        count= FLUSH_CACHE;
      }
    }

    /* Retrieve the blocks and write them to a buffer to be flushed */
restart:
    end= (pos= cache)+count;
    for (block= pagecache->changed_blocks[FILE_HASH(*file)] ;
         block;
         block= next)
    {
#if defined(PAGECACHE_DEBUG)
      cnt++;
      KEYCACHE_DBUG_ASSERT(cnt <= pagecache->blocks_used);
#endif
      next= block->next_changed;
      if (block->hash_link->file.file != file->file)
        continue;
      if (filter != NULL)
      {
        int filter_res= (*filter)(block->type, block->hash_link->pageno,
                                  block->rec_lsn, filter_arg);
        DBUG_PRINT("info",("filter returned %d", filter_res));
        if (filter_res == FLUSH_FILTER_SKIP_TRY_NEXT)
          continue;
        if (filter_res == FLUSH_FILTER_SKIP_ALL)
          break;
        DBUG_ASSERT(filter_res == FLUSH_FILTER_OK);
      }

      /*
        Mark the block with BLOCK_IN_FLUSH in order not to let
        other threads to use it for new pages and interfere with
        our sequence of flushing dirty file pages
      */
      block->status|= PCBLOCK_IN_FLUSH;

      if (! (block->status & PCBLOCK_IN_SWITCH))
      {
        /*
          We care only for the blocks for which flushing was not
          initiated by other threads as a result of page swapping
        */
        reg_requests(pagecache, block, 1);
        if (type != FLUSH_IGNORE_CHANGED)
        {
          /* It's not a temporary file */
          if (pos == end)
          {
            /*
              This happens only if there is not enough
              memory for the big block
            */
            int batch_rc= flush_cached_blocks(pagecache, file, cache,
                                              end, type, &error);
            rc|= batch_rc;
            /* Remember only the errno of the first failing batch */
            if ((batch_rc & PCFLUSH_ERROR) && !last_errno)
              last_errno= error;
            DBUG_PRINT("info", ("restarting..."));
            /*
              Restart the scan as some other thread might have changed
              the changed blocks chain: the blocks that were in switch
              state before the flush started have to be excluded
            */
            goto restart;
          }
          *pos++= block;
        }
        else
        {
          /* It's a temporary file */
          pagecache->blocks_changed--;
          pagecache->global_blocks_changed--;
          free_block(pagecache, block);
        }
      }
      else if (type != FLUSH_KEEP_LAZY)
      {
        /*
          Link the block into a list of blocks 'in switch', and then we will
          wait for this list to be empty, which means they have been flushed
        */
        unlink_changed(block);
        link_changed(block, &first_in_switch);
#ifdef THREAD
        /* us_flusher exists only in threaded builds */
        us_flusher.first_in_switch= TRUE;
#endif
      }
    }
    if (pos != cache)
    {
      int batch_rc= flush_cached_blocks(pagecache, file, cache, pos, type,
                                        &error);
      rc|= batch_rc;
      /* Remember only the errno of the first failing batch */
      if ((batch_rc & PCFLUSH_ERROR) && !last_errno)
        last_errno= error;
    }
    /* Wait until list of blocks in switch is empty */
    while (first_in_switch)
    {
#if defined(PAGECACHE_DEBUG)
      cnt= 0;
#endif
      block= first_in_switch;
      {
#ifdef THREAD
        struct st_my_thread_var *thread= my_thread_var;
        wqueue_add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
        do
        {
          KEYCACHE_DBUG_PRINT("flush_pagecache_blocks_int: wait2",
                              ("suspend thread %ld", thread->id));
          pagecache_pthread_cond_wait(&thread->suspend,
                                     &pagecache->cache_lock);
        }
        while (thread->next);
#else
        KEYCACHE_DBUG_ASSERT(0);
        /* No parallel requests in single-threaded case */
#endif
      }
#if defined(PAGECACHE_DEBUG)
      cnt++;
      KEYCACHE_DBUG_ASSERT(cnt <= pagecache->blocks_used);
#endif
    }
#ifdef THREAD
    us_flusher.first_in_switch= FALSE;
#endif
    /* The following happens very seldom */
    if (! (type == FLUSH_KEEP || type == FLUSH_KEEP_LAZY ||
           type == FLUSH_FORCE_WRITE))
    {
      /*
        this code would free all blocks while filter maybe handled only a
        few, that is not possible.
      */
      DBUG_ASSERT(filter == NULL);
#if defined(PAGECACHE_DEBUG)
      cnt=0;
#endif
      for (block= pagecache->file_blocks[FILE_HASH(*file)] ;
           block;
           block= next)
      {
#if defined(PAGECACHE_DEBUG)
        cnt++;
        KEYCACHE_DBUG_ASSERT(cnt <= pagecache->blocks_used);
#endif
        next= block->next_changed;
        if (block->hash_link->file.file == file->file &&
            (! (block->status & PCBLOCK_CHANGED)
             || type == FLUSH_IGNORE_CHANGED))
        {
          reg_requests(pagecache, block, 1);
          free_block(pagecache, block);
        }
      }
    }
#ifdef THREAD
    /* wake up others waiting to flush this file */
    hash_delete(&pagecache->files_in_flush, (uchar *)&us_flusher);
    if (us_flusher.flush_queue.last_thread)
      wqueue_release_queue(&us_flusher.flush_queue);
#endif
  }

#ifndef DBUG_OFF
  DBUG_EXECUTE("check_pagecache",
               test_key_cache(pagecache, "end of flush_pagecache_blocks", 0););
#endif
  if (cache != cache_buff)
    my_free((uchar*) cache, MYF(0));
  if (rc != 0)
  {
    if (last_errno)
      my_errno= last_errno;                /* Return first error */
    DBUG_PRINT("error", ("Got error: %d", my_errno));
  }
  DBUG_RETURN(rc);
}


unknown's avatar
unknown committed
4049 4050
/**
   @brief flush all blocks for a file to disk
unknown's avatar
unknown committed
4051

unknown's avatar
unknown committed
4052 4053 4054 4055
   @param  pagecache       pointer to a pagecache data structure
   @param  file            handler for the file to flush to
   @param  flush_type      type of the flush
   @param  filter          optional function which tells what blocks to flush;
unknown's avatar
unknown committed
4056 4057
                           can be non-NULL only if FLUSH_KEEP, FLUSH_KEEP_LAZY
                           or FLUSH_FORCE_WRITE.
unknown's avatar
unknown committed
4058 4059
   @param  filter_arg      an argument to pass to 'filter'. Information about
                           the block will be passed too.
unknown's avatar
unknown committed
4060

unknown's avatar
unknown committed
4061
   @return Operation status
4062 4063 4064 4065
   @retval PCFLUSH_OK OK
   @retval PCFLUSH_ERROR There was errors during the flush process.
   @retval PCFLUSH_PINNED Pinned blocks was met and skipped.
   @retval PCFLUSH_PINNED_AND_ERROR PCFLUSH_ERROR and PCFLUSH_PINNED.
unknown's avatar
unknown committed
4066 4067
*/

unknown's avatar
unknown committed
4068 4069 4070 4071 4072
int flush_pagecache_blocks_with_filter(PAGECACHE *pagecache,
                                       PAGECACHE_FILE *file,
                                       enum flush_type type,
                                       PAGECACHE_FLUSH_FILTER filter,
                                       void *filter_arg)
unknown's avatar
unknown committed
4073 4074 4075
{
  int res;
  DBUG_ENTER("flush_pagecache_blocks");
4076
  DBUG_PRINT("enter", ("pagecache: 0x%lx", (long) pagecache));
unknown's avatar
unknown committed
4077 4078 4079 4080 4081

  if (pagecache->disk_blocks <= 0)
    DBUG_RETURN(0);
  pagecache_pthread_mutex_lock(&pagecache->cache_lock);
  inc_counter_for_resize_op(pagecache);
unknown's avatar
unknown committed
4082
  res= flush_pagecache_blocks_int(pagecache, file, type, filter, filter_arg);
unknown's avatar
unknown committed
4083 4084 4085 4086 4087 4088 4089 4090 4091 4092
  dec_counter_for_resize_op(pagecache);
  pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
  DBUG_RETURN(res);
}


/*
  Reset the counters of a key cache.

  SYNOPSIS
4093
    reset_pagecache_counters()
unknown's avatar
unknown committed
4094
    name       the name of a key cache
4095
    pagecache  pointer to the pagecache to be reset
unknown's avatar
unknown committed
4096 4097

  DESCRIPTION
4098 4099
    This procedure is used to reset the counters of all currently used key
    caches, both the default one and the named ones.
unknown's avatar
unknown committed
4100 4101 4102 4103 4104

  RETURN
    0 on success (always because it can't fail)
*/

4105 4106
int reset_pagecache_counters(const char *name __attribute__((unused)),
                             PAGECACHE *pagecache)
unknown's avatar
unknown committed
4107
{
4108 4109
  DBUG_ENTER("reset_pagecache_counters");
  if (!pagecache->inited)
unknown's avatar
unknown committed
4110 4111 4112 4113 4114 4115
  {
    DBUG_PRINT("info", ("Key cache %s not initialized.", name));
    DBUG_RETURN(0);
  }
  DBUG_PRINT("info", ("Resetting counters for key cache %s.", name));

4116 4117 4118 4119 4120
  pagecache->global_blocks_changed= 0;   /* Key_blocks_not_flushed */
  pagecache->global_cache_r_requests= 0; /* Key_read_requests */
  pagecache->global_cache_read= 0;       /* Key_reads */
  pagecache->global_cache_w_requests= 0; /* Key_write_requests */
  pagecache->global_cache_write= 0;      /* Key_writes */
unknown's avatar
unknown committed
4121 4122 4123 4124
  DBUG_RETURN(0);
}


4125 4126 4127 4128 4129 4130 4131 4132 4133 4134 4135 4136 4137 4138 4139 4140 4141 4142
/**
   @brief Allocates a buffer and stores in it some info about all dirty pages

   Does the allocation because the caller cannot know the size itself.
   Memory freeing is to be done by the caller (if the "str" member of the
   LEX_STRING is not NULL).
   Ignores all pages of another type than PAGECACHE_LSN_PAGE, because they
   are not interesting for a checkpoint record.
   The caller has the intention of doing checkpoints.

   @param       pagecache   pointer to the page cache
   @param[out]  str         pointer to where the allocated buffer, and
                            its size, will be put
   @param[out]  min_rec_lsn pointer to where the minimum rec_lsn of all
                            relevant dirty pages will be put
   @return Operation status
     @retval 0      OK
     @retval 1      Error
unknown's avatar
unknown committed
4143
*/
4144

4145
my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache,
unknown's avatar
unknown committed
4146
                                                  LEX_STRING *str,
unknown's avatar
unknown committed
4147
                                                  LSN *min_rec_lsn)
unknown's avatar
unknown committed
4148
{
unknown's avatar
unknown committed
4149
  my_bool error= 0;
4150
  ulong stored_list_size= 0;
unknown's avatar
unknown committed
4151 4152
  uint file_hash;
  char *ptr;
unknown's avatar
unknown committed
4153
  LSN minimum_rec_lsn= LSN_MAX;
unknown's avatar
unknown committed
4154 4155
  DBUG_ENTER("pagecache_collect_changed_blocks_with_LSN");

4156
  DBUG_ASSERT(NULL == str->str);
unknown's avatar
unknown committed
4157 4158 4159 4160 4161
  /*
    We lock the entire cache but will be quick, just reading/writing a few MBs
    of memory at most.
  */
  pagecache_pthread_mutex_lock(&pagecache->cache_lock);
unknown's avatar
unknown committed
4162 4163
#ifdef THREAD
  for (;;)
4164
  {
unknown's avatar
unknown committed
4165 4166 4167 4168 4169 4170 4171 4172 4173
    struct st_file_in_flush *other_flusher;
    for (file_hash= 0;
         (other_flusher= (struct st_file_in_flush *)
          hash_element(&pagecache->files_in_flush, file_hash)) != NULL &&
           !other_flusher->first_in_switch;
         file_hash++)
    {}
    if (other_flusher == NULL)
      break;
4174
    /*
unknown's avatar
unknown committed
4175 4176 4177 4178 4179 4180 4181 4182
      other_flusher.first_in_switch is true: some thread is flushing a file
      and has removed dirty blocks from changed_blocks[] while they were still
      dirty (they were being evicted (=>flushed) by yet another thread, which
      may not have flushed the block yet so it may still be dirty).
      If Checkpoint proceeds now, it will not see the page. If there is a
      crash right after writing the checkpoint record, before the page is
      flushed, at recovery the page will be wrongly ignored because it won't
      be in the dirty pages list in the checkpoint record. So wait.
4183
    */
unknown's avatar
unknown committed
4184 4185 4186 4187 4188
    {
      struct st_my_thread_var *thread= my_thread_var;
      wqueue_add_to_queue(&other_flusher->flush_queue, thread);
      do
      {
unknown's avatar
unknown committed
4189
        KEYCACHE_DBUG_PRINT("pagecache_collect_changed_blocks_with_lsn: wait",
unknown's avatar
unknown committed
4190 4191 4192 4193 4194 4195
                            ("suspend thread %ld", thread->id));
        pagecache_pthread_cond_wait(&thread->suspend,
                                    &pagecache->cache_lock);
      }
      while (thread->next);
    }
4196
  }
unknown's avatar
unknown committed
4197
#endif
unknown's avatar
unknown committed
4198 4199 4200 4201 4202 4203 4204 4205 4206 4207

  /* Count how many dirty pages are interesting */
  for (file_hash= 0; file_hash < PAGECACHE_CHANGED_BLOCKS_HASH; file_hash++)
  {
    PAGECACHE_BLOCK_LINK *block;
    for (block= pagecache->changed_blocks[file_hash] ;
         block;
         block= block->next_changed)
    {
      /*
unknown's avatar
unknown committed
4208
        Q: is there something subtle with block->hash_link: can it be NULL?
unknown's avatar
unknown committed
4209 4210 4211
        does it have to be == hash_link->block... ?
      */
      DBUG_ASSERT(block->hash_link != NULL);
unknown's avatar
unknown committed
4212
      DBUG_ASSERT(block->status & PCBLOCK_CHANGED);
unknown's avatar
unknown committed
4213
      /* Note that we don't store bitmap pages */
unknown's avatar
unknown committed
4214 4215
      if (block->type != PAGECACHE_LSN_PAGE)
        continue; /* no need to store it */
4216
      stored_list_size++;
unknown's avatar
unknown committed
4217 4218 4219
    }
  }

4220 4221
  compile_time_assert(sizeof(pagecache->blocks) <= 8);
  str->length= 8 + /* number of dirty pages */
unknown's avatar
unknown committed
4222 4223
    (2 + /* table id */
     1 + /* data or index file */
4224 4225 4226
     4 + /* pageno */
     LSN_STORE_SIZE /* rec_lsn */
     ) * stored_list_size;
unknown's avatar
unknown committed
4227 4228 4229
  if (NULL == (str->str= my_malloc(str->length, MYF(MY_WME))))
    goto err;
  ptr= str->str;
4230 4231
  int8store(ptr, (ulonglong)stored_list_size);
  ptr+= 8;
unknown's avatar
unknown committed
4232 4233
  DBUG_PRINT("info", ("found %lu dirty pages", stored_list_size));
  if (stored_list_size == 0)
unknown's avatar
unknown committed
4234 4235 4236 4237 4238 4239 4240 4241
    goto end;
  for (file_hash= 0; file_hash < PAGECACHE_CHANGED_BLOCKS_HASH; file_hash++)
  {
    PAGECACHE_BLOCK_LINK *block;
    for (block= pagecache->changed_blocks[file_hash] ;
         block;
         block= block->next_changed)
    {
unknown's avatar
unknown committed
4242 4243
      uint16 table_id;
      MARIA_SHARE *share;
unknown's avatar
unknown committed
4244 4245
      if (block->type != PAGECACHE_LSN_PAGE)
        continue; /* no need to store it in the checkpoint record */
unknown's avatar
unknown committed
4246
      compile_time_assert(sizeof(block->hash_link->pageno) <= 4);
unknown's avatar
unknown committed
4247 4248 4249 4250 4251 4252
      share= (MARIA_SHARE *)(block->hash_link->file.callback_data);
      table_id= share->id;
      int2store(ptr, table_id);
      ptr+= 2;
      ptr[0]= (share->kfile.file == block->hash_link->file.file);
      ptr++;
unknown's avatar
unknown committed
4253 4254
      int4store(ptr, block->hash_link->pageno);
      ptr+= 4;
4255 4256
      lsn_store(ptr, block->rec_lsn);
      ptr+= LSN_STORE_SIZE;
unknown's avatar
unknown committed
4257
      if (block->rec_lsn != LSN_MAX)
4258
      {
unknown's avatar
unknown committed
4259
        DBUG_ASSERT(LSN_VALID(block->rec_lsn));
4260 4261
        if (cmp_translog_addr(block->rec_lsn, minimum_rec_lsn) < 0)
          minimum_rec_lsn= block->rec_lsn;
unknown's avatar
unknown committed
4262
      } /* otherwise, some trn->rec_lsn should hold the correct info */
unknown's avatar
unknown committed
4263 4264 4265 4266
    }
  }
end:
  pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
4267
  *min_rec_lsn= minimum_rec_lsn;
unknown's avatar
unknown committed
4268
  DBUG_RETURN(error);
unknown's avatar
unknown committed
4269 4270 4271 4272

err:
  error= 1;
  goto end;
unknown's avatar
unknown committed
4273 4274 4275
}


unknown's avatar
unknown committed
4276 4277 4278 4279 4280 4281 4282 4283 4284 4285 4286 4287
#ifndef DBUG_OFF
/*
  Test if disk-cache is ok

  Placeholder only: the body is empty (still marked TODO), so no check is
  performed and every parameter is flagged as unused.
*/
static void test_key_cache(PAGECACHE *pagecache __attribute__((unused)),
                           const char *where __attribute__((unused)),
                           my_bool lock __attribute__((unused)))
{
  /* TODO */
}
#endif

4288 4289 4290 4291 4292
/*
  Return the buffer member of a page cache block

  SYNOPSIS
    pagecache_block_link_to_buffer()
    block               block link to read from

  RETURN
    The value of block->buffer, unchanged
*/
uchar *pagecache_block_link_to_buffer(PAGECACHE_BLOCK_LINK *block)
{
  uchar *page_buffer= block->buffer;
  return page_buffer;
}

unknown's avatar
unknown committed
4293 4294 4295 4296 4297 4298 4299 4300 4301 4302 4303 4304 4305 4306 4307 4308 4309 4310 4311 4312 4313 4314 4315 4316 4317 4318 4319 4320 4321 4322 4323 4324 4325 4326 4327 4328 4329 4330 4331 4332 4333 4334 4335 4336 4337 4338 4339 4340 4341 4342 4343 4344 4345 4346 4347 4348 4349 4350 4351 4352 4353 4354 4355 4356 4357 4358 4359 4360 4361 4362 4363 4364 4365 4366 4367 4368 4369 4370 4371 4372 4373 4374 4375 4376 4377 4378 4379 4380 4381
#if defined(PAGECACHE_TIMEOUT)

#define KEYCACHE_DUMP_FILE  "pagecache_dump.txt"
#define MAX_QUEUE_LEN  100


/*
  Write a textual snapshot of the page cache state to KEYCACHE_DUMP_FILE

  SYNOPSIS
    pagecache_dump()
    pagecache           page cache to describe

  DESCRIPTION
    The dump lists, in this order:
      - the threads queued on pagecache->waiting_for_hash_link
      - the threads queued on pagecache->waiting_for_block
      - every block in block_root[0 .. blocks_used-1], each followed by the
        threads found in its COND_SIZE wait queues
      - the LRU chain, walked from pagecache->used_last
    At most MAX_QUEUE_LEN threads are printed for any single wait queue.

  NOTES
    Nothing is written if the dump file cannot be opened.
    The id of the calling thread is not printed: no thread descriptor for
    the caller is in scope in this function.
    NOTE(review): presumably the caller holds the cache lock while the
    queues are walked - confirm against the call site.
*/
static void pagecache_dump(PAGECACHE *pagecache)
{
  FILE *pagecache_dump_file= fopen(KEYCACHE_DUMP_FILE, "w");
  struct st_my_thread_var *last;
  struct st_my_thread_var *thread;
  PAGECACHE_BLOCK_LINK *block;
  PAGECACHE_HASH_LINK *hash_link;
  PAGECACHE_PAGE *page;
  uint i;
  uint queue_len;                 /* threads printed for the current queue */

  if (!pagecache_dump_file)
    return;                       /* cannot dump without an output file */

  /*
    Each wait queue is a circular list addressed through its last element,
    so the walk starts at last->next and stops once it is back at last.
  */
  queue_len= 0;
  thread= last= pagecache->waiting_for_hash_link.last_thread;
  fprintf(pagecache_dump_file, "queue of threads waiting for hash link\n");
  if (thread)
  {
    do
    {
      thread= thread->next;
      page= (PAGECACHE_PAGE *) thread->opt_info;
      fprintf(pagecache_dump_file,
              "thread:%lu, (file,pageno)=(%u,%lu)\n",
              (ulong) thread->id, (uint) page->file.file,
              (ulong) page->pageno);
      if (++queue_len == MAX_QUEUE_LEN)
        break;
    }
    while (thread != last);
  }

  queue_len= 0;
  thread= last= pagecache->waiting_for_block.last_thread;
  fprintf(pagecache_dump_file, "queue of threads waiting for block\n");
  if (thread)
  {
    do
    {
      thread= thread->next;
      hash_link= (PAGECACHE_HASH_LINK *) thread->opt_info;
      fprintf(pagecache_dump_file,
              "thread:%lu hash_link:%u (file,pageno)=(%u,%lu)\n",
              (ulong) thread->id,
              (uint) PAGECACHE_HASH_LINK_NUMBER(pagecache, hash_link),
              (uint) hash_link->file.file, (ulong) hash_link->pageno);
      if (++queue_len == MAX_QUEUE_LEN)
        break;
    }
    while (thread != last);
  }

  for (i= 0 ; i < pagecache->blocks_used ; i++)
  {
    int j;
    block= &pagecache->block_root[i];
    hash_link= block->hash_link;
    fprintf(pagecache_dump_file,
            "block:%u hash_link:%d status:%x #requests=%u waiting_for_readers:%d\n",
            i, (int) (hash_link ?
                      PAGECACHE_HASH_LINK_NUMBER(pagecache, hash_link) :
                      -1),
            (uint) block->status, (uint) block->requests,
            block->condvar ? 1 : 0);
    for (j= 0 ; j < COND_SIZE; j++)
    {
      PAGECACHE_WQUEUE *wqueue= &block->wqueue[j];
      thread= last= wqueue->last_thread;
      fprintf(pagecache_dump_file, "queue #%d\n", j);
      if (thread)
      {
        /*
          The length limit uses its own counter: bumping the block index
          here would skip blocks in the enclosing loop.
        */
        queue_len= 0;
        do
        {
          thread= thread->next;
          fprintf(pagecache_dump_file,
                  "thread:%lu\n", (ulong) thread->id);
          if (++queue_len == MAX_QUEUE_LEN)
            break;
        }
        while (thread != last);
      }
    }
  }

  /* The LRU chain is circular as well and is entered through used_last */
  fprintf(pagecache_dump_file, "LRU chain:");
  block= pagecache->used_last;
  if (block)
  {
    do
    {
      block= block->next_used;
      fprintf(pagecache_dump_file,
              "block:%u, ", (uint) PCBLOCK_NUMBER(pagecache, block));
    }
    while (block != pagecache->used_last);
  }
  fprintf(pagecache_dump_file, "\n");

  fclose(pagecache_dump_file);
}

#endif /* defined(PAGECACHE_TIMEOUT) */

#if defined(PAGECACHE_TIMEOUT) && !defined(__WIN__)


/*
  Wait on a condition variable for at most PAGECACHE_TIMEOUT seconds

  SYNOPSIS
    pagecache_pthread_cond_wait()
    cond                condition variable to wait on
    mutex               mutex passed on to pthread_cond_timedwait()

  DESCRIPTION
    The absolute deadline is the current time plus PAGECACHE_TIMEOUT
    seconds. If the wait times out, a PAGECACHE_DEBUG build writes a
    message to the debug log, closes it and aborts; other builds go on to
    the dump call and then fail the assertion below.

  RETURN
    The return code of pthread_cond_timedwait()
*/
static int pagecache_pthread_cond_wait(pthread_cond_t *cond,
                                      pthread_mutex_t *mutex)
{
  int rc;
  struct timeval  now;            /* time when we started waiting        */
  struct timespec timeout;        /* timeout value for the wait function */
#if defined(PAGECACHE_DEBUG)
  /*
    NOTE(review): cnt is an automatic variable that is reset on every
    call, so it is always 1 below and "waiting..." is never printed.
  */
  int cnt=0;
#endif

  /* Get current time; the timezone argument is obsolete, so pass NULL */
  gettimeofday(&now, NULL);
  /* Prepare timeout value */
  timeout.tv_sec= now.tv_sec + PAGECACHE_TIMEOUT;
 /*
   timeval uses microseconds.
   timespec uses nanoseconds.
   1 microsecond = 1000 nanoseconds
 */
  timeout.tv_nsec= now.tv_usec * 1000;
  KEYCACHE_THREAD_TRACE_END("started waiting");
#if defined(PAGECACHE_DEBUG)
  cnt++;
  if (cnt % 100 == 0)
    fprintf(pagecache_debug_log, "waiting...\n");
  /* The flush is not part of the if above: it runs on every call */
  fflush(pagecache_debug_log);
#endif
  rc= pthread_cond_timedwait(cond, mutex, &timeout);
  KEYCACHE_THREAD_TRACE_BEGIN("finished waiting");
  if (rc == ETIMEDOUT || rc == ETIME)
  {
#if defined(PAGECACHE_DEBUG)
    fprintf(pagecache_debug_log,"aborted by pagecache timeout\n");
    fclose(pagecache_debug_log);
    abort();
#endif
    /*
      NOTE(review): pagecache_dump() is defined in this file with a
      PAGECACHE * parameter, but no cache pointer is available in this
      function; the call needs one before PAGECACHE_TIMEOUT builds work.
    */
    pagecache_dump();
  }

#if defined(PAGECACHE_DEBUG)
  KEYCACHE_DBUG_ASSERT(rc != ETIMEDOUT);
#else
  assert(rc != ETIMEDOUT);
#endif
  return rc;
}
#else
#if defined(PAGECACHE_DEBUG)
/*
  Wait on a condition variable, bracketing the wait with trace calls

  SYNOPSIS
    pagecache_pthread_cond_wait()
    cond                condition variable to wait on
    mutex               mutex passed on to pthread_cond_wait()

  RETURN
    The return code of pthread_cond_wait()
*/
static int pagecache_pthread_cond_wait(pthread_cond_t *cond,
                                      pthread_mutex_t *mutex)
{
  int wait_result;

  KEYCACHE_THREAD_TRACE_END("started waiting");
  wait_result= pthread_cond_wait(cond, mutex);
  KEYCACHE_THREAD_TRACE_BEGIN("finished waiting");

  return wait_result;
}
#endif
#endif /* defined(PAGECACHE_TIMEOUT) && !defined(__WIN__) */

4457
#if defined(PAGECACHE_DEBUG)
unknown's avatar
unknown committed
4458 4459 4460 4461 4462 4463 4464 4465 4466 4467 4468 4469 4470 4471 4472 4473 4474 4475 4476 4477 4478 4479 4480 4481 4482 4483 4484 4485 4486 4487 4488 4489 4490 4491 4492 4493 4494 4495 4496 4497 4498 4499 4500 4501 4502 4503 4504 4505 4506 4507 4508 4509
/*
  Lock a mutex, then emit the trace call that follows the lock

  SYNOPSIS
    ___pagecache_pthread_mutex_lock()
    mutex               mutex to lock

  RETURN
    The return code of pthread_mutex_lock()
*/
static int ___pagecache_pthread_mutex_lock(pthread_mutex_t *mutex)
{
  int lock_result= pthread_mutex_lock(mutex);

  KEYCACHE_THREAD_TRACE_BEGIN("");
  return lock_result;
}


/*
  Emit the trace call that precedes the unlock, then unlock a mutex

  SYNOPSIS
    ___pagecache_pthread_mutex_unlock()
    mutex               mutex to unlock

  NOTES
    The return code of pthread_mutex_unlock() is discarded.
*/
static void ___pagecache_pthread_mutex_unlock(pthread_mutex_t *mutex)
{
  KEYCACHE_THREAD_TRACE_END("");
  (void) pthread_mutex_unlock(mutex);
}


/*
  Emit a "signal" trace call, then signal a condition variable

  SYNOPSIS
    ___pagecache_pthread_cond_signal()
    cond                condition variable to signal

  RETURN
    The return code of pthread_cond_signal()
*/
static int ___pagecache_pthread_cond_signal(pthread_cond_t *cond)
{
  KEYCACHE_THREAD_TRACE("signal");
  return pthread_cond_signal(cond);
}


#if defined(PAGECACHE_DEBUG_LOG)


/*
  Write one printf-style formatted line to the page cache debug log

  SYNOPSIS
    pagecache_debug_print()
    fmt                 printf-style format string
    ...                 arguments consumed by fmt

  NOTES
    Does nothing when pagecache_debug_log is not set. A newline is
    appended after the formatted text.
*/
static void pagecache_debug_print(const char * fmt, ...)
{
  va_list arg_list;

  if (!pagecache_debug_log)
    return;                       /* no log to write to */

  va_start(arg_list, fmt);
  VOID(vfprintf(pagecache_debug_log, fmt, arg_list));
  VOID(fputc('\n',pagecache_debug_log));
  va_end(arg_list);
}
#endif /* defined(PAGECACHE_DEBUG_LOG) */

#if defined(PAGECACHE_DEBUG_LOG)


/*
  Close the page cache debug log if it is open

  SYNOPSIS
    pagecache_debug_log_close()

  NOTES
    The handle is reset to NULL after closing, so that later checks of
    pagecache_debug_log see the log as closed and calling this function
    twice is harmless.
*/
void pagecache_debug_log_close(void)
{
  if (pagecache_debug_log)
  {
    fclose(pagecache_debug_log);
    pagecache_debug_log= NULL;
  }
}
#endif /* defined(PAGECACHE_DEBUG_LOG) */

#endif /* defined(PAGECACHE_DEBUG) */