/* Copyright (c) 2005 PrimeBase Technologies GmbH * * PrimeBase XT * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * * 2008-01-24 Paul McCullagh * * Row lock functions. * * H&G2JCtL */ #include "xt_config.h" #ifdef DRIZZLED #include <bitset> #endif #include <stdio.h> #include "lock_xt.h" #include "thread_xt.h" #include "table_xt.h" #include "xaction_xt.h" #include "database_xt.h" #include "trace_xt.h" #ifdef DEBUG //#define XT_TRACE_LOCKS //#define CHECK_ROWLOCK_GROUP_CONSISTENCY #endif /* * This function should never be called. It indicates a link * error! */ xtPublic void xt_log_atomic_error_and_abort(c_char *func, c_char *file, u_int line) { xt_logf(NULL, func, file, line, XT_LOG_ERROR, "%s", "Atomic operations not supported\n"); abort(); } /* * ----------------------------------------------------------------------- * ROW LOCKS, LIST BASED */ #ifdef XT_USE_LIST_BASED_ROW_LOCKS #ifdef CHECK_ROWLOCK_GROUP_CONSISTENCY /* * Requires a spin-lock on group->lg_lock! */ static void check_rowlock_group(XTLockGroupPtr group) { XTThreadPtr self = xt_get_self(); char *crash = NULL; if (group->lg_lock.spl_locker != self) *crash = 1; if (group->lg_list_in_use > group->lg_list_size) *crash = 1; xtRowID prev_row = 0; XTLockItemPtr item = group->lg_list; for (int i = 0; i < group->lg_list_in_use; i++, item++) { if (!item->li_thread_id) *crash = 1; if(!xt_thr_array[item->li_thread_id]->st_xact_data) *crash = 1; if(item->li_count > XT_TEMP_LOCK_BYTES) *crash = 1; // rows per thread must obey the row_id > prev_row_id + prev_count*group_size rule if (prev_row >= item->li_row_id) *crash = 1; // calculate the new prev. row if (item->li_count < XT_TEMP_LOCK_BYTES) prev_row = item->li_row_id + (item->li_count - 1) * XT_ROW_LOCK_GROUP_COUNT; else prev_row = item->li_row_id; } } #endif static int xlock_cmp_row_ids(XTThreadPtr XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b) { xtRowID row_id = *((xtTableID *) a); XTLockItemPtr item = (XTLockItemPtr) b; if (row_id < item->li_row_id) return -1; if (row_id > item->li_row_id) return 1; return 0; } void XTRowLockList::xt_remove_all_locks(struct XTDatabase *, XTThreadPtr thread) { #ifdef XT_TRACE_LOCKS xt_ttracef(xt_get_self(), "remove all locks\n"); #endif if (!bl_count) return; xtThreadID thd_id; XTPermRowLockPtr plock; #ifndef XT_USE_TABLE_REF XTOpenTablePtr pot = NULL; #endif thd_id = thread->t_id; plock = (XTPermRowLockPtr) bl_data; for (u_int i=0; i<bl_count; i++) { #ifdef XT_USE_TABLE_REF XTTableHPtr tab = plock->pr_table; #else if (!xt_db_open_pool_table_ns(&pot, db, plock->pr_tab_id)) { /* Should not happen, but just in case, we just don't * remove the lock. We will probably end up with a deadlock * somewhere. */ xt_log_and_clear_exception_ns(); } else { #endif for (int j=0; j<XT_ROW_LOCK_GROUP_COUNT; j++) { if (plock->pr_group[j]) { /* Go through group j and compact. */ #ifndef XT_USE_TABLE_REF XTTableHPtr tab = pot->ot_table; #endif XTLockGroupPtr group; XTLockItemPtr copy; XTLockItemPtr item; int new_count; group = &tab->tab_locks.rl_groups[j]; xt_spinlock_lock(&group->lg_lock); copy = group->lg_list; item = group->lg_list; new_count = 0; for (size_t k=0; k<group->lg_list_in_use; k++) { if (item->li_thread_id != thd_id) { if (copy != item) { copy->li_row_id = item->li_row_id; copy->li_count = item->li_count; copy->li_thread_id = item->li_thread_id; } new_count++; copy++; } #ifdef XT_TRACE_LOCKS else { if (item->li_count == XT_TEMP_LOCK_BYTES) xt_ttracef(xt_get_self(), "remove group %d lock row_id=%d TEMP\n", j, (int) item->li_row_id); else xt_ttracef(xt_get_self(), "remove group %d locks row_id=%d (%d)\n", j, (int) item->li_row_id, (int) item->li_count); } #endif item++; } group->lg_list_in_use = new_count; #ifdef CHECK_ROWLOCK_GROUP_CONSISTENCY check_rowlock_group(group); #endif if (group->lg_wait_queue) tab->tab_locks.rl_grant_locks(group, thread); xt_spinlock_unlock(&group->lg_lock); xt_xn_wakeup_thread_list(thread); } } #ifdef XT_USE_TABLE_REF xt_heap_release(NULL, plock->pr_table); #else xt_db_return_table_to_pool_ns(pot); } #endif plock++; } bl_count = 0; } #ifdef DEBUG_LOCK_QUEUE int *dummy_ptr = 0; void XTRowLocks::rl_check(XTLockWaitPtr no_lw) { XTLockGroupPtr group; XTLockWaitPtr lw, lw_prev; for (int i=0; i<XT_ROW_LOCK_GROUP_COUNT; i++) { group = &rl_groups[i]; xt_spinlock_lock(&group->lg_lock); lw = group->lg_wait_queue; lw_prev = NULL; while (lw) { if (lw == no_lw) *dummy_ptr = 1; if (lw->lw_prev != lw_prev) *dummy_ptr = 2; lw_prev = lw; lw = lw->lw_next; } xt_spinlock_unlock(&group->lg_lock); } } #endif xtBool XTRowLocks::rl_lock_row(XTLockGroupPtr group, XTLockWaitPtr lw, XTRowLockListPtr, int *result) { XTLockItemPtr item; size_t index; xtRowID row_id = lw->lw_row_id; #ifdef CHECK_ROWLOCK_GROUP_CONSISTENCY check_rowlock_group(group); #endif if (group->lg_list_size == group->lg_list_in_use) { if (!xt_realloc_ns((void **) &group->lg_list, (group->lg_list_size + 2) * sizeof(XTLockItemRec))) return FAILED; group->lg_list_size += 2; } item = (XTLockItemPtr) xt_bsearch(NULL, &row_id, group->lg_list, group->lg_list_in_use, sizeof(XTLockItemRec), &index, NULL, xlock_cmp_row_ids); /* There's no item with this ID, but there could be an item with a range that covers this row */ if (!item && group->lg_list_in_use) { if (index > 0) { int count; item = group->lg_list + index - 1; count = item->li_count; if (item->li_count == XT_TEMP_LOCK_BYTES) count = 1; if (row_id >= item->li_row_id + count * XT_ROW_LOCK_GROUP_COUNT) item = NULL; } } if (item) { /* Item already exists. */ if (item->li_thread_id == lw->lw_thread->t_id) { /* Already have a permanent lock: */ *result = XT_NO_LOCK; lw->lw_curr_lock = XT_NO_LOCK; return OK; } /* {REMOVE-LOCKS} * This must be valid because a thread must remove * the locks before it frees its st_xact_data structure, * xt_thr_array entry must also be valid, because * transaction must be ended before the thread is * killed. */ *result = item->li_count == XT_TEMP_LOCK_BYTES ? XT_TEMP_LOCK : XT_PERM_LOCK; lw->lw_xn_id = xt_thr_array[item->li_thread_id]->st_xact_data->xd_start_xn_id; lw->lw_curr_lock = *result; return OK; } /* Add the lock: */ XT_MEMMOVE(group->lg_list, &group->lg_list[index+1], &group->lg_list[index], (group->lg_list_in_use - index) * sizeof(XTLockItemRec)); group->lg_list[index].li_row_id = row_id; group->lg_list[index].li_count = XT_TEMP_LOCK_BYTES; group->lg_list[index].li_thread_id = lw->lw_thread->t_id; group->lg_list_in_use++; #ifdef XT_TRACE_LOCKS xt_ttracef(ot->ot_thread, "set temp lock row=%d setby=%s\n", (int) row_id, xt_get_self()->t_name); #endif #ifdef CHECK_ROWLOCK_GROUP_CONSISTENCY check_rowlock_group(group); #endif *result = XT_NO_LOCK; lw->lw_ot->ot_temp_row_lock = row_id; lw->lw_curr_lock = XT_NO_LOCK; return OK; } void XTRowLocks::rl_grant_locks(XTLockGroupPtr group, XTThreadPtr thread) { XTLockWaitPtr lw, lw_next, lw_prev; int result; xtThreadID lw_thd_id; thread->st_thread_list_count = 0; lw = group->lg_wait_queue; while (lw) { lw_next = lw->lw_next; lw_prev = lw->lw_prev; lw_thd_id = lw->lw_thread->t_id; /* NOTE: after lw_curr_lock is changed, lw may no longer be referenced * by this function!!! */ if (!rl_lock_row(group, lw, &lw->lw_thread->st_lock_list, &result)) { /* We transfer the error to the other thread! */ XTThreadPtr self = xt_get_self(); result = XT_LOCK_ERR; memcpy(&lw->lw_thread->t_exception, &self->t_exception, sizeof(XTExceptionRec)); lw->lw_curr_lock = XT_LOCK_ERR; } if (result == XT_NO_LOCK || result == XT_LOCK_ERR) { /* Remove from the wait queue: */ if (lw_next) lw_next->lw_prev = lw_prev; if (lw_prev) lw_prev->lw_next = lw_next; if (group->lg_wait_queue == lw) group->lg_wait_queue = lw_next; if (group->lg_wait_queue_end == lw) group->lg_wait_queue_end = lw_prev; if (result == XT_NO_LOCK) { /* Add to the thread list: */ if (thread->st_thread_list_count == thread->st_thread_list_size) { if (!xt_realloc_ns((void **) &thread->st_thread_list, (thread->st_thread_list_size+1) * sizeof(xtThreadID))) { xt_xn_wakeup_thread(lw_thd_id); goto done; } thread->st_thread_list_size++; } thread->st_thread_list[thread->st_thread_list_count] = lw_thd_id; thread->st_thread_list_count++; done:; } } lw = lw_next; } } void XTRowLocks::xt_cancel_temp_lock(XTLockWaitPtr lw) { XTLockGroupPtr group; group = &rl_groups[lw->lw_row_id % XT_ROW_LOCK_GROUP_COUNT]; xt_spinlock_lock(&group->lg_lock); if (lw->lw_curr_lock == XT_TEMP_LOCK || lw->lw_curr_lock == XT_PERM_LOCK) { /* In case of XT_LOCK_ERR or XT_NO_LOCK, the lw structure will * no longer be on the wait queue. */ XTLockWaitPtr lw_next, lw_prev; lw_next = lw->lw_next; lw_prev = lw->lw_prev; /* Remove from the wait queue: */ if (lw_next) lw_next->lw_prev = lw_prev; if (lw_prev) lw_prev->lw_next = lw_next; if (group->lg_wait_queue == lw) group->lg_wait_queue = lw_next; if (group->lg_wait_queue_end == lw) group->lg_wait_queue_end = lw_prev; } xt_spinlock_unlock(&group->lg_lock); } //#define QUEUE_ORDER_FIFO /* Try to lock a row. * This function returns: * XT_NO_LOCK on success. * XT_TEMP_LOCK if there is a temporary lock on the row. * XT_PERM_LOCK if there is a permanent lock in the row. * XT_FAILED an error occured. * * If there is a lock on this row, the transaction ID of the * locker is also returned. * * The caller must wait if the row is locked. If the lock is * permanent, then the caller must wait for the transaction to * terminate. If the lock is temporary, then the caller must * wait for the transaction to signal that the lock has been * released. */ xtBool XTRowLocks::xt_set_temp_lock(XTOpenTablePtr ot, XTLockWaitPtr lw, XTRowLockListPtr lock_list) { XTLockGroupPtr group; int result; if (ot->ot_temp_row_lock) { /* Check if we don't already have this temp lock: */ if (ot->ot_temp_row_lock == lw->lw_row_id) { lw->lw_curr_lock = XT_NO_LOCK; return OK; } xt_make_lock_permanent(ot, lock_list); } /* Add a temporary lock. */ group = &rl_groups[lw->lw_row_id % XT_ROW_LOCK_GROUP_COUNT]; xt_spinlock_lock(&group->lg_lock); if (!rl_lock_row(group, lw, lock_list, &result)) { xt_spinlock_unlock(&group->lg_lock); return FAILED; } if (result != XT_NO_LOCK) { /* Add the thread to the end of the thread queue: */ #ifdef QUEUE_ORDER_FIFO if (group->lg_wait_queue_end) { group->lg_wait_queue_end->lw_next = lw; lw->lw_prev = group->lg_wait_queue_end; } else { group->lg_wait_queue = lw; lw->lw_prev = NULL; } lw->lw_next = NULL; group->lg_wait_queue_end = lw; #else XTLockWaitPtr pos = group->lg_wait_queue_end; xtXactID xn_id = ot->ot_thread->st_xact_data->xd_start_xn_id; while (pos) { if (pos->lw_thread->st_xact_data->xd_start_xn_id < xn_id) break; pos = pos->lw_prev; } if (pos) { lw->lw_prev = pos; lw->lw_next = pos->lw_next; if (pos->lw_next) pos->lw_next->lw_prev = lw; else group->lg_wait_queue_end = lw; pos->lw_next = lw; } else { /* Front of the queue: */ lw->lw_prev = NULL; lw->lw_next = group->lg_wait_queue; if (group->lg_wait_queue) group->lg_wait_queue->lw_prev = lw; else group->lg_wait_queue_end = lw; group->lg_wait_queue = lw; } #endif } xt_spinlock_unlock(&group->lg_lock); return OK; } /* * Remove a temporary lock. * * If updated is set to TRUE this means that the row was update. * This means that any thread waiting on the temporary lock will * also have to wait for the transaction to quit before * continuing. * * If the thread were to continue it would just hang again because * it will discover that the transaction has updated the row. * * So the 'updated' flag is an optimisation which prevents the * thread from making an unncessary retry. */ void XTRowLocks::xt_remove_temp_lock(XTOpenTablePtr ot, xtBool updated) { xtRowID row_id; XTLockGroupPtr group; XTLockItemPtr item; size_t index; xtBool lock_granted = FALSE; xtThreadID locking_thread_id = 0; if (!(row_id = ot->ot_temp_row_lock)) return; group = &rl_groups[row_id % XT_ROW_LOCK_GROUP_COUNT]; xt_spinlock_lock(&group->lg_lock); #ifdef CHECK_ROWLOCK_GROUP_CONSISTENCY check_rowlock_group(group); #endif #ifdef XT_TRACE_LOCKS xt_ttracef(xt_get_self(), "remove temp lock %d\n", (int) row_id); #endif item = (XTLockItemPtr) xt_bsearch(NULL, &row_id, group->lg_list, group->lg_list_in_use, sizeof(XTLockItemRec), &index, NULL, xlock_cmp_row_ids); if (item) { /* Item exists. */ if (item->li_thread_id == ot->ot_thread->t_id && item->li_count == XT_TEMP_LOCK_BYTES) { XTLockWaitPtr lw; /* First check if there is some thread waiting to take over this lock: */ lw = group->lg_wait_queue; while (lw) { if (lw->lw_row_id == row_id) { lock_granted = TRUE; break; } lw = lw->lw_next; } if (lock_granted) { /* Grant the lock just released... */ XTLockWaitPtr lw_next, lw_prev; xtXactID locking_xact_id; /* Store this info, lw will soon be untouchable! */ lw_next = lw->lw_next; lw_prev = lw->lw_prev; locking_xact_id = lw->lw_thread->st_xact_data->xd_start_xn_id; locking_thread_id = lw->lw_thread->t_id; /* Lock has moved from one thread to the next. * change the thread holding this lock: */ item->li_thread_id = locking_thread_id; /* Remove from the wait queue: */ if (lw_next) lw_next->lw_prev = lw_prev; if (lw_prev) lw_prev->lw_next = lw_next; if (group->lg_wait_queue == lw) group->lg_wait_queue = lw_next; if (group->lg_wait_queue_end == lw) group->lg_wait_queue_end = lw_prev; /* If the thread that release the lock updated the * row then we will have to wait for the transaction * to terminate: */ if (updated) { lw->lw_row_updated = TRUE; lw->lw_updating_xn_id = ot->ot_thread->st_xact_data->xd_start_xn_id; } /* The thread has the lock now: */ lw->lw_ot->ot_temp_row_lock = row_id; lw->lw_curr_lock = XT_NO_LOCK; /* Everyone after this that is waiting for the same lock is * now waiting for a different transaction: */ lw = lw_next; while (lw) { if (lw->lw_row_id == row_id) { lw->lw_xn_id = locking_xact_id; lw->lw_curr_lock = XT_TEMP_LOCK; } lw = lw->lw_next; } } else { /* Remove the lock: */ XT_MEMMOVE(group->lg_list, &group->lg_list[index], &group->lg_list[index+1], (group->lg_list_in_use - index - 1) * sizeof(XTLockItemRec)); group->lg_list_in_use--; } } } #ifdef CHECK_ROWLOCK_GROUP_CONSISTENCY check_rowlock_group(group); #endif xt_spinlock_unlock(&group->lg_lock); ot->ot_temp_row_lock = 0; if (lock_granted) xt_xn_wakeup_thread(locking_thread_id); } xtBool XTRowLocks::xt_make_lock_permanent(XTOpenTablePtr ot, XTRowLockListPtr lock_list) { xtRowID row_id; XTLockGroupPtr group; XTLockItemPtr item; size_t index; if (!(row_id = ot->ot_temp_row_lock)) return OK; #ifdef XT_TRACE_LOCKS xt_ttracef(xt_get_self(), "make lock perm %d\n", (int) ot->ot_temp_row_lock); #endif /* Add to the lock list: */ XTPermRowLockPtr locks = (XTPermRowLockPtr) lock_list->bl_data; for (unsigned i=0; i<lock_list->bl_count; i++) { #ifdef XT_USE_TABLE_REF if (locks->pr_table == ot->ot_table) { #else if (locks->pr_tab_id == ot->ot_table->tab_id) { #endif locks->pr_group[row_id % XT_ROW_LOCK_GROUP_COUNT] = 1; goto done; } locks++; } /* Add new to lock list: */ { XTPermRowLockRec perm_lock; #ifdef XT_USE_TABLE_REF perm_lock.pr_table = ot->ot_table; xt_heap_reference(NULL, perm_lock.pr_table); #else perm_lock.pr_tab_id = ot->ot_table->tab_id; #endif memset(perm_lock.pr_group, 0, XT_ROW_LOCK_GROUP_COUNT); perm_lock.pr_group[row_id % XT_ROW_LOCK_GROUP_COUNT] = 1; if (!xt_bl_append(NULL, lock_list, &perm_lock)) { xt_remove_temp_lock(ot, FALSE); return FAILED; } } done: group = &rl_groups[row_id % XT_ROW_LOCK_GROUP_COUNT]; xt_spinlock_lock(&group->lg_lock); item = (XTLockItemPtr) xt_bsearch(NULL, &row_id, group->lg_list, group->lg_list_in_use, sizeof(XTLockItemRec), &index, NULL, xlock_cmp_row_ids); ASSERT_NS(item); #ifdef CHECK_ROWLOCK_GROUP_CONSISTENCY check_rowlock_group(group); #endif if (item) { /* Lock exists (it should!). */ if (item->li_thread_id == ot->ot_thread->t_id && item->li_count == XT_TEMP_LOCK_BYTES) { if (index > 0 && group->lg_list[index-1].li_thread_id == ot->ot_thread->t_id && group->lg_list[index-1].li_count < XT_TEMP_LOCK_BYTES-2 && group->lg_list[index-1].li_row_id == row_id - (XT_ROW_LOCK_GROUP_COUNT * group->lg_list[index-1].li_count)) { group->lg_list[index-1].li_count++; /* Combine with the left: */ if (index + 1 < group->lg_list_in_use && group->lg_list[index+1].li_thread_id == ot->ot_thread->t_id && group->lg_list[index+1].li_count != XT_TEMP_LOCK_BYTES && group->lg_list[index+1].li_row_id == row_id + XT_ROW_LOCK_GROUP_COUNT) { /* And combine with the right */ u_int left = group->lg_list[index-1].li_count + group->lg_list[index+1].li_count; u_int right; if (left > XT_TEMP_LOCK_BYTES-1) { right = left - (XT_TEMP_LOCK_BYTES-1); left = XT_TEMP_LOCK_BYTES-1; } else right = 0; group->lg_list[index-1].li_count = left; if (right) { /* There is something left over on the right: */ group->lg_list[index+1].li_count = right; group->lg_list[index+1].li_row_id = group->lg_list[index-1].li_row_id + left * XT_ROW_LOCK_GROUP_COUNT; XT_MEMMOVE(group->lg_list, &group->lg_list[index], &group->lg_list[index+1], (group->lg_list_in_use - index - 1) * sizeof(XTLockItemRec)); group->lg_list_in_use--; } else { XT_MEMMOVE(group->lg_list, &group->lg_list[index], &group->lg_list[index+2], (group->lg_list_in_use - index - 2) * sizeof(XTLockItemRec)); group->lg_list_in_use -= 2; } } else { XT_MEMMOVE(group->lg_list, &group->lg_list[index], &group->lg_list[index+1], (group->lg_list_in_use - index - 1) * sizeof(XTLockItemRec)); group->lg_list_in_use--; } } else if (index + 1 < group->lg_list_in_use && group->lg_list[index+1].li_thread_id == ot->ot_thread->t_id && group->lg_list[index+1].li_count < XT_TEMP_LOCK_BYTES-2 && group->lg_list[index+1].li_row_id == row_id + XT_ROW_LOCK_GROUP_COUNT) { /* Combine with the right: */ group->lg_list[index+1].li_count++; group->lg_list[index+1].li_row_id = row_id; XT_MEMMOVE(group->lg_list, &group->lg_list[index], &group->lg_list[index+1], (group->lg_list_in_use - index - 1) * sizeof(XTLockItemRec)); group->lg_list_in_use--; } else group->lg_list[index].li_count = 1; } } #ifdef CHECK_ROWLOCK_GROUP_CONSISTENCY check_rowlock_group(group); #endif xt_spinlock_unlock(&group->lg_lock); ot->ot_temp_row_lock = 0; return OK; } xtBool xt_init_row_locks(XTRowLocksPtr rl) { for (int i=0; i<XT_ROW_LOCK_GROUP_COUNT; i++) { xt_spinlock_init_with_autoname(NULL, &rl->rl_groups[i].lg_lock); rl->rl_groups[i].lg_wait_queue = NULL; rl->rl_groups[i].lg_list_size = 0; rl->rl_groups[i].lg_list_in_use = 0; rl->rl_groups[i].lg_list = NULL; } return OK; } void xt_exit_row_locks(XTRowLocksPtr rl) { for (int i=0; i<XT_ROW_LOCK_GROUP_COUNT; i++) { xt_spinlock_free(NULL, &rl->rl_groups[i].lg_lock); rl->rl_groups[i].lg_wait_queue = NULL; rl->rl_groups[i].lg_list_size = 0; rl->rl_groups[i].lg_list_in_use = 0; if (rl->rl_groups[i].lg_list) { xt_free_ns(rl->rl_groups[i].lg_list); rl->rl_groups[i].lg_list = NULL; } } } /* * ----------------------------------------------------------------------- * ROW LOCKS, HASH BASED */ #else // XT_USE_LIST_BASED_ROW_LOCKS void XTRowLockList::old_xt_remove_all_locks(struct XTDatabase *db, xtThreadID thd_id) { #ifdef XT_TRACE_LOCKS xt_ttracef(xt_get_self(), "remove all locks\n"); #endif if (!bl_count) return; int pgroup; xtTableID ptab_id; XTPermRowLockPtr plock; XTOpenTablePtr pot = NULL; plock = (XTPermRowLockPtr) &bl_data[bl_count * bl_item_size]; for (u_int i=0; i<bl_count; i++) { plock--; pgroup = plock->pr_group; ptab_id = plock->pr_tab_id; if (pot) { if (pot->ot_table->tab_id == ptab_id) goto remove_lock; xt_db_return_table_to_pool_ns(pot); pot = NULL; } if (!xt_db_open_pool_table_ns(&pot, db, ptab_id)) { /* Should not happen, but just in case, we just don't * remove the lock. We will probably end up with a deadlock * somewhere. */ xt_log_and_clear_exception_ns(); goto skip_remove_lock; } if (!pot) /* Can happen of the table has been dropped: */ goto skip_remove_lock; remove_lock: #ifdef XT_TRACE_LOCKS xt_ttracef(xt_get_self(), "remove lock group=%d\n", pgroup); #endif pot->ot_table->tab_locks.tab_row_locks[pgroup] = NULL; pot->ot_table->tab_locks.tab_lock_perm[pgroup] = 0; skip_remove_lock:; } bl_count = 0; if (pot) xt_db_return_table_to_pool_ns(pot); } /* Try to lock a row. * This function returns: * XT_NO_LOCK on success. * XT_TEMP_LOCK if there is a temporary lock on the row. * XT_PERM_LOCK if there is a permanent lock in the row. * * If there is a lock on this row, the transaction ID of the * locker is also returned. * * The caller must wait if the row is locked. If the lock is * permanent, then the caller must wait for the transaction to * terminate. If the lock is temporary, then the caller must * wait for the transaction to signal that the lock has been * released. */ int XTRowLocks::old_xt_set_temp_lock(XTOpenTablePtr ot, xtRowID row, xtXactID *xn_id, XTRowLockListPtr lock_list) { int group; XTXactDataPtr xact, my_xact; if (ot->ot_temp_row_lock) { /* Check if we don't already have this temp lock: */ if (ot->ot_temp_row_lock == row) { gl->lw_curr_lock = XT_NO_LOCK; return XT_NO_LOCK; } xt_make_lock_permanent(ot, lock_list); } my_xact = ot->ot_thread->st_xact_data; group = row % XT_ROW_LOCK_COUNT; if ((xact = tab_row_locks[group])) { if (xact == my_xact) return XT_NO_LOCK; *xn_id = xact->xd_start_xn_id; return tab_lock_perm[group] ? XT_PERM_LOCK : XT_TEMP_LOCK; } tab_row_locks[row % XT_ROW_LOCK_COUNT] = my_xact; #ifdef XT_TRACE_LOCKS xt_ttracef(xt_get_self(), "set temp lock %d group=%d for %s\n", (int) row, (int) row % XT_ROW_LOCK_COUNT, ot->ot_thread->t_name); #endif ot->ot_temp_row_lock = row; return XT_NO_LOCK; } /* Just check if there is a lock on the row. * This function returns: * XT_NO_LOCK if there is no lock. * XT_TEMP_LOCK if there is a temporary lock on the row. * XT_PERM_LOCK if a lock is a permanent lock in the row. */ int XTRowLocks::old_xt_is_locked(struct XTOpenTable *ot, xtRowID row, xtXactID *xn_id) { int group; XTXactDataPtr xact; group = row % XT_ROW_LOCK_COUNT; if ((xact = tab_row_locks[group])) { if (xact == ot->ot_thread->st_xact_data) return XT_NO_LOCK; *xn_id = xact->xd_start_xn_id; if (tab_lock_perm[group]) return XT_PERM_LOCK; return XT_TEMP_LOCK; } return XT_NO_LOCK; } void XTRowLocks::old_xt_remove_temp_lock(XTOpenTablePtr ot) { int group; XTXactDataPtr xact, my_xact; if (!ot->ot_temp_row_lock) return; my_xact = ot->ot_thread->st_xact_data; group = ot->ot_temp_row_lock % XT_ROW_LOCK_COUNT; #ifdef XT_TRACE_LOCKS xt_ttracef(xt_get_self(), "remove temp lock %d group=%d\n", (int) ot->ot_temp_row_lock, (int) ot->ot_temp_row_lock % XT_ROW_LOCK_COUNT); #endif ot->ot_temp_row_lock = 0; if ((xact = tab_row_locks[group])) { if (xact == my_xact) tab_row_locks[group] = NULL; } if (ot->ot_table->tab_db->db_xn_wait_count) xt_xn_wakeup_transactions(ot->ot_table->tab_db, ot->ot_thread); } xtBool XTRowLocks::old_xt_make_lock_permanent(XTOpenTablePtr ot, XTRowLockListPtr lock_list) { int group; if (!ot->ot_temp_row_lock) return OK; #ifdef XT_TRACE_LOCKS xt_ttracef(xt_get_self(), "make lock perm %d group=%d\n", (int) ot->ot_temp_row_lock, (int) ot->ot_temp_row_lock % XT_ROW_LOCK_COUNT); #endif /* Check if the lock is already permanent: */ group = ot->ot_temp_row_lock % XT_ROW_LOCK_COUNT; if (!tab_lock_perm[group]) { XTPermRowLockRec plock; plock.pr_tab_id = ot->ot_table->tab_id; plock.pr_group = group; if (!xt_bl_append(NULL, lock_list, &plock)) { xt_remove_temp_lock(ot); return FAILED; } tab_lock_perm[group] = 1; } ot->ot_temp_row_lock = 0; return OK; } /* Release this lock, and all locks gained after this lock * on this table. * * The locks are only released temporarily. The will be regained * below using regain locks. * * Returns: * XT_NO_LOCK if no lock is released. * XT_PERM_LOCK if a lock is released. * * Note that only permanent locks can be released in this way. * So if the thread has a temporary lock, it will first be made * permanent. * * {RELEASING-LOCKS} * The idea of the releasing locks comes from the fact that each * lock, locks a group of records. * So if T1 has a lock (e.g. when doing SELECT FOR UPDATE), * and then encounters an updated record x * from T2, and it must wait for T2, it firsts releases the * lock, just in case T2 tries to gain a lock on another * record y in the same group, which will cause it to * wait on T1. * * However, there are several problems with releasing * locks. * - It can cause a "live-lock", where another transation * keeps getting in before. * - It may not solve the problem in all cases because * the SELECT FOR UPDATE has locked other record groups * before it encountered record x. * - Further problems occur when locks are granted by * callback: * T1 waits for T2, because it has a lock on record x * T2 releases the lock because it must wait for T3 * T1 is granted the lock (but does not know about this yet) * T2 tries to regain lock (after T3 quits) and * must wait for T1 - DEADLOCK * * In general, it does not make sense to release locks * when it can be granted again by a callback. * * TODO: 2 possible solutions: * - Do not lock groups, lock rows. * UPDATE INTENSION ROW LOCK * - Use multiple lock types: * UPDATE INTENSION LOCK (required first) * SHARED UPDATE LOCK (used by INSERT or DELETE) * EXCLUSIVE UPDATE LOCK (used by SELECT FOR UPDATE) * * Temporary solution. Do not release any locks. int XTRowLocks::xt_release_locks(struct XTOpenTable *ot, xtRowID row, XTRowLockListPtr lock_list) */ /* * Regain a lock previously held. This function regains locks * released by xt_release_locks(). * * It will return lock_type and xn_id if the row is locked, and therefore * regain cannot continue. In this case, the caller must wait. * It returns XT_NO_LOCK if there are no more locks to be regained. * * Locks are always regained in the order in which they were originally * taken. xtBool XTRowLocks::xt_regain_locks(struct XTOpenTable *ot, int *lock_type, xtXactID *xn_id, XTRowLockListPtr lock_list) */ xtBool old_xt_init_row_locks(XTRowLocksPtr rl) { memset(rl->tab_lock_perm, 0, XT_ROW_LOCK_COUNT); memset(rl->tab_row_locks, 0, XT_ROW_LOCK_COUNT * sizeof(XTXactDataPtr)); return OK; } void old_xt_exit_row_locks(XTRowLocksPtr XT_UNUSED(rl)) { } #endif // XT_USE_LIST_BASED_ROW_LOCKS xtPublic xtBool xt_init_row_lock_list(XTRowLockListPtr lock_list) { lock_list->bl_item_size = sizeof(XTPermRowLockRec); lock_list->bl_size = 0; lock_list->bl_count = 0; lock_list->bl_data = NULL; return OK; } xtPublic void xt_exit_row_lock_list(XTRowLockListPtr lock_list) { xt_bl_set_size(NULL, lock_list, 0); } /* * ----------------------------------------------------------------------- * SPECIAL EXCLUSIVE/SHARED (XS) LOCK */ #ifdef XT_THREAD_LOCK_INFO xtPublic void xt_rwmutex_init(struct XTThread *self, XTRWMutexPtr xsl, const char *n) #else xtPublic void xt_rwmutex_init(XTThreadPtr self, XTRWMutexPtr xsl) #endif { #ifdef DEBUG xsl->xs_lock_thread = 0; xsl->xs_inited = 12345; #endif xt_init_mutex_with_autoname(self, &xsl->xs_lock); xt_init_cond(self, &xsl->xs_cond); xt_atomic_set4(&xsl->xs_state, 0); xsl->xs_xlocker = 0; /* Must be aligned! */ ASSERT(xt_thr_maximum_threads == xt_align_size(xt_thr_maximum_threads, XT_XS_LOCK_ALIGN)); xsl->x.xs_rlock = (xtWord1 *) xt_calloc(self, xt_thr_maximum_threads); #ifdef XT_THREAD_LOCK_INFO xsl->xs_name = n; xt_thread_lock_info_init(&xsl->xs_lock_info, xsl); #endif } xtPublic void xt_rwmutex_free(XTThreadPtr self, XTRWMutexPtr xsl) { #ifdef DEBUG ASSERT(!xsl->xs_lock_thread); ASSERT(xsl->xs_inited == 12345); xsl->xs_inited = 0; #endif if (xsl->x.xs_rlock) xt_free(self, (void *) xsl->x.xs_rlock); xt_free_mutex(&xsl->xs_lock); xt_free_cond(&xsl->xs_cond); #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_free(&xsl->xs_lock_info); #endif } xtPublic xtBool xt_rwmutex_xlock(XTRWMutexPtr xsl, xtThreadID thd_id) { #ifdef DEBUG ASSERT_NS(xsl->xs_inited == 12345); #endif ASSERT_NS(xt_get_self()->t_id == thd_id); xt_lock_mutex_ns(&xsl->xs_lock); ASSERT_NS(xsl->x.xs_rlock[thd_id] == XT_NO_LOCK); /* Wait for exclusive locker: */ while (xsl->xs_xlocker) { if (!xt_timed_wait_cond_ns(&xsl->xs_cond, &xsl->xs_lock, 10000)) { xt_unlock_mutex_ns(&xsl->xs_lock); return FAILED; } } /* I am the locker (set state before locker!): */ xt_atomic_set4(&xsl->xs_state, 0); xsl->xs_xlocker = thd_id; /* Wait for all the read lockers: */ while (xsl->xs_state < xt_thr_current_max_threads) { while (xsl->x.xs_rlock[xsl->xs_state]) { /* {RACE-WR_MUTEX} * Just in case of this, we keep the wait time down! */ if (!xt_timed_wait_cond_ns(&xsl->xs_cond, &xsl->xs_lock, 10)) { xt_atomic_set4(&xsl->xs_state, 0); xsl->xs_xlocker = 0; xt_unlock_mutex_ns(&xsl->xs_lock); return FAILED; } } /* State can be incremented in parallel by a reader * thread! */ xt_atomic_set4(&xsl->xs_state, xsl->xs_state + 1); } /* I have waited for all: */ xt_atomic_set4(&xsl->xs_state, xt_thr_maximum_threads); #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_add_owner(&xsl->xs_lock_info); #endif return OK; } xtPublic xtBool xt_rwmutex_slock(XTRWMutexPtr xsl, xtThreadID thd_id) { #ifdef DEBUG ASSERT_NS(xsl->xs_inited == 12345); #endif ASSERT_NS(xt_get_self()->t_id == thd_id); xt_atomic_inc1(&xsl->x.xs_rlock[thd_id]); if (xsl->x.xs_rlock[thd_id] > 1) return OK; /* Check if there could be an X locker: */ if (xsl->xs_xlocker) { /* There is an X locker. * If xs_state < thd_id then the X locker will wait for me. * So I should not wait! */ if (xsl->xs_state >= thd_id) { /* If xsl->xs_state >= thd_id, then the locker has already * checked me, and I will have to wait. * * Otherwise, xs_state <= thd_id, which means the * X locker has not checked me, and will still wait for me (or * is already waiting for me). In this case, I will have to * take the mutex to make sure exactly how far he * is with the checking. */ xt_lock_mutex_ns(&xsl->xs_lock); while (xsl->xs_state > thd_id && xsl->xs_xlocker) { if (!xt_timed_wait_cond_ns(&xsl->xs_cond, &xsl->xs_lock, 10000)) { xt_unlock_mutex_ns(&xsl->xs_lock); xsl->x.xs_rlock[thd_id]--; return FAILED; } } xt_unlock_mutex_ns(&xsl->xs_lock); } } /* There is no exclusive locker, so we have the read lock: */ ASSERT_NS(xsl->xs_state != xt_thr_maximum_threads); #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_add_owner(&xsl->xs_lock_info); #endif return OK; } xtPublic xtBool xt_rwmutex_unlock(XTRWMutexPtr xsl, xtThreadID thd_id) { #ifdef DEBUG ASSERT_NS(xsl->xs_inited == 12345); #endif ASSERT_NS(xt_get_self()->t_id == thd_id); if (xsl->xs_xlocker == thd_id) { /* I have an X lock. */ ASSERT_NS(xsl->x.xs_rlock[thd_id] == XT_NO_LOCK); ASSERT_NS(xsl->xs_state == xt_thr_maximum_threads); xt_atomic_set4(&xsl->xs_state, 0); xsl->xs_xlocker = 0; xt_unlock_mutex_ns(&xsl->xs_lock); /* Wake up any other X or shared lockers: */ if (!xt_broadcast_cond_ns(&xsl->xs_cond)) return FAILED; } else { /* I have a shared lock: */ ASSERT_NS(xsl->x.xs_rlock[thd_id] > 0); ASSERT_NS(xsl->xs_state != xt_thr_maximum_threads); /* TODO: PMC - HOW can this fail?! - but it does? */ if (xsl->x.xs_rlock[thd_id] > 1) xsl->x.xs_rlock[thd_id]--; else { /* {RACE-WR_MUTEX}. * A BUG FIX: * * Previously I was checking "xsl->xs_xlocker" after, * descrementing the READ lock. * * This resulted in a race condition that could cause the * unlocking reader to hang in xt_lock_mutex_ns(). * This was because the X locker, grabbed the mutex (xs_lock) * but did not wait for the reader. * * The result was that the reader had to wait in UNLOCK * until the X locker did an unlock! * * This only became obvious when it caused a deadlock (because * the reader was waiting for the locker, which it should not * have been, of course). */ if (xsl->xs_xlocker) { xt_lock_mutex_ns(&xsl->xs_lock); if (xsl->xs_xlocker && xsl->xs_state == thd_id) { /* If the X locker is waiting for me, * then allow him to continue. */ if (!xt_broadcast_cond_ns(&xsl->xs_cond)) { xt_unlock_mutex_ns(&xsl->xs_lock); return FAILED; } } xt_atomic_dec1(&xsl->x.xs_rlock[thd_id]); xt_unlock_mutex_ns(&xsl->xs_lock); } else /* {RACE-WR_MUTEX} * There is a race condition between the check above, and the * the decrement here. * * However, if I check xsl->xs_xlocker afterwards, and then * try to get the lock xs_lock, I could hand for the duration * of the X lock. */ xt_atomic_dec1(&xsl->x.xs_rlock[thd_id]); } } #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_release_owner(&xsl->xs_lock_info); #endif return OK; } /* * ----------------------------------------------------------------------- * SPIN LOCK */ #ifdef XT_THREAD_LOCK_INFO xtPublic void xt_spinlock_init(XTThreadPtr self, XTSpinLockPtr spl, const char *n) #else xtPublic void xt_spinlock_init(XTThreadPtr self, XTSpinLockPtr spl) #endif { (void) self; spl->spl_lock = 0; #ifdef XT_NO_ATOMICS xt_init_mutex_with_autoname(self, &spl->spl_mutex); #endif #ifdef DEBUG spl->spl_locker = 0; #endif #ifdef XT_THREAD_LOCK_INFO spl->spl_name = n; xt_thread_lock_info_init(&spl->spl_lock_info, spl); #endif } xtPublic void xt_spinlock_free(XTThreadPtr XT_UNUSED(self), XTSpinLockPtr spl) { (void) spl; #ifdef XT_NO_ATOMICS xt_free_mutex(&spl->spl_mutex); #endif #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_free(&spl->spl_lock_info); #endif } xtPublic xtBool xt_spinlock_spin(XTSpinLockPtr spl) { volatile xtWord4 *lck = &spl->spl_lock; for (;;) { for (int i=0; i<10; i++) { /* Check the lock variable: */ if (!*lck) { /* Try to get the lock: */ if (!xt_spinlock_set(spl)) goto done_ok; } } /* Go to "sleep" */ xt_critical_wait(); } done_ok: return OK; } #ifdef DEBUG xtPublic void xt_spinlock_set_thread(XTSpinLockPtr spl) { spl->spl_locker = xt_get_self(); } #endif /* * ----------------------------------------------------------------------- * FAST LOCK */ #ifdef XT_THREAD_LOCK_INFO xtPublic void xt_fastlock_init(XTThreadPtr self, XTFastLockPtr fal, const char *n) #else xtPublic void xt_fastlock_init(XTThreadPtr self, XTFastLockPtr fal) #endif { xt_spinlock_init_with_autoname(self, &fal->fal_spinlock); xt_spinlock_init_with_autoname(self, &fal->fal_wait_lock); for (u_int i=0; i<XT_FAST_LOCK_MAX_WAIT; i++) fal->fal_wait_list[i] = NULL; fal->fal_wait_count = 0; fal->fal_wait_wakeup = 0; fal->fal_wait_alloc = 0; #ifdef XT_THREAD_LOCK_INFO fal->fal_name = n; xt_thread_lock_info_init(&fal->fal_lock_info, fal); #endif } xtPublic void xt_fastlock_free(XTThreadPtr self, XTFastLockPtr fal) { xt_spinlock_free(self, &fal->fal_spinlock); xt_spinlock_free(self, &fal->fal_wait_lock); #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_free(&fal->fal_lock_info); #endif } xtPublic xtBool xt_fastlock_spin(XTFastLockPtr fal, XTThreadPtr thread) { volatile xtWord4 *lck = &fal->fal_spinlock.spl_lock; do { for (int i=0; i<10; i++) { /* Check the lock variable: */ if (!*lck) { /* Try to get the lock: */ if (!xt_spinlock_set(&fal->fal_spinlock)) { fal->fal_locker = thread; return OK; } } } for (int i=0; i<10; i++) { xt_critical_wait(); if (!*lck) { /* Try to get the lock: */ if (!xt_spinlock_set(&fal->fal_spinlock)) { fal->fal_locker = thread; return OK; } } } /* Wait for a wakeup */ xt_spinlock_lock(&fal->fal_wait_lock); if (fal->fal_wait_count == XT_FAST_LOCK_MAX_WAIT) { xt_register_ulxterr(XT_REG_CONTEXT, XT_ERR_TOO_MANY_WAITERS, (u_long) XT_FAST_LOCK_MAX_WAIT+1); xt_spinlock_unlock(&fal->fal_wait_lock); return FAILED; } while (fal->fal_wait_list[fal->fal_wait_alloc]) fal->fal_wait_alloc = (fal->fal_wait_alloc + 1) % XT_FAST_LOCK_MAX_WAIT; fal->fal_wait_list[fal->fal_wait_alloc] = thread; fal->fal_wait_alloc = (fal->fal_wait_alloc + 1) % XT_FAST_LOCK_MAX_WAIT; fal->fal_wait_count++; xt_lock_thread(thread); xt_spinlock_unlock(&fal->fal_wait_lock); if (!xt_wait_thread(thread)) { xt_unlock_thread(thread); if (fal->fal_locker == thread) xt_fastlock_unlock(fal, thread); return FAILED; } xt_unlock_thread(thread); } while (fal->fal_locker != thread); return OK; } /* Wake up one of the waiters. */ xtPublic void xt_fastlock_wakeup(XTFastLockPtr fal) { xt_spinlock_lock(&fal->fal_wait_lock); if (fal->fal_wait_count) { XTThreadPtr thread; /* Find a waiting thread, and give it the exclusive lock: */ while (!fal->fal_wait_list[fal->fal_wait_wakeup]) fal->fal_wait_wakeup = (fal->fal_wait_wakeup + 1) % XT_FAST_LOCK_MAX_WAIT; thread = fal->fal_wait_list[fal->fal_wait_wakeup]; fal->fal_wait_list[fal->fal_wait_wakeup] = NULL; fal->fal_wait_wakeup = (fal->fal_wait_wakeup + 1) % XT_FAST_LOCK_MAX_WAIT; fal->fal_wait_count--; fal->fal_locker = thread; xt_lock_thread(thread); xt_spinlock_unlock(&fal->fal_wait_lock); xt_signal_thread(thread); xt_unlock_thread(thread); } else { xt_spinlock_unlock(&fal->fal_wait_lock); fal->fal_locker = NULL; xt_spinlock_reset(&fal->fal_spinlock); } } /* * ----------------------------------------------------------------------- * READ/WRITE SPIN LOCK * * An extremely genius very fast read/write lock based on atomics! */ #ifdef XT_THREAD_LOCK_INFO xtPublic void xt_spinxslock_init(struct XTThread *XT_UNUSED(self), XTSpinXSLockPtr sxs, const char *name) #else xtPublic void xt_spinxslock_init(struct XTThread *XT_UNUSED(self), XTSpinXSLockPtr sxs) #endif { sxs->sxs_xlocked = 0; sxs->sxs_xwaiter = 0; sxs->sxs_rlock_count = 0; sxs->sxs_wait_count = 0; #ifdef DEBUG sxs->sxs_locker = 0; #endif #ifdef XT_THREAD_LOCK_INFO sxs->sxs_name = name; xt_thread_lock_info_init(&sxs->sxs_lock_info, sxs); #endif } xtPublic void xt_spinxslock_free(struct XTThread *XT_UNUSED(self), XTSpinXSLockPtr sxs) { #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_free(&sxs->sxs_lock_info); #else (void) sxs; #endif } xtPublic xtBool xt_spinxslock_xlock(XTSpinXSLockPtr sxs, xtBool try_lock, xtThreadID XT_NDEBUG_UNUSED(thd_id)) { register xtWord2 set; /* Wait for exclusive locker: */ for (;;) { set = xt_atomic_tas2(&sxs->sxs_xlocked, 1); if (!set) break; if (try_lock) return FALSE; xt_yield(); } #ifdef DEBUG sxs->sxs_locker = thd_id; #endif /* Wait for all the readers to wait! */ while (sxs->sxs_wait_count < sxs->sxs_rlock_count) { sxs->sxs_xwaiter = 1; xt_yield(); //* /* This should not be required, because there is only one thread * accessing this value. However, the lock fails if this * is not done with an atomic op. * * This is because threads on other processors have the * value in processor cache. So they do not * notice that the value has been set to zero. * They think it is still 1 and march through * the barrier (sxs->sxs_xwaiter < sxs->sxs_xlocked) below. * * In the meantime, this X locker has gone on thinking * all is OK. */ xt_atomic_tas2(&sxs->sxs_xwaiter, 0); } #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_add_owner(&sxs->sxs_lock_info); #endif return OK; } xtPublic xtBool xt_spinxslock_slock(XTSpinXSLockPtr sxs) { xt_atomic_inc2(&sxs->sxs_rlock_count); /* Wait as long as the locker is not waiting: */ while (sxs->sxs_xwaiter < sxs->sxs_xlocked) { xt_atomic_inc2(&sxs->sxs_wait_count); while (sxs->sxs_xwaiter < sxs->sxs_xlocked) { xt_yield(); } xt_atomic_dec2(&sxs->sxs_wait_count); } #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_add_owner(&sxs->sxs_lock_info); #endif return OK; } xtPublic xtBool xt_spinxslock_unlock(XTSpinXSLockPtr sxs, xtBool xlocked) { if (xlocked) { #ifdef DEBUG ASSERT_NS(sxs->sxs_locker && sxs->sxs_xlocked); sxs->sxs_locker = 0; #endif sxs->sxs_xlocked = 0; } else { #ifdef DEBUG ASSERT_NS(sxs->sxs_rlock_count > 0); #endif xt_atomic_dec2(&sxs->sxs_rlock_count); } #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_release_owner(&sxs->sxs_lock_info); #endif return OK; } /* * ----------------------------------------------------------------------- * FAST READ/WRITE LOCK (BASED ON FAST MUTEX) */ #ifdef XT_THREAD_LOCK_INFO xtPublic void xt_xsmutex_init(struct XTThread *self, XTXSMutexLockPtr xsm, const char *name) #else xtPublic void xt_xsmutex_init(struct XTThread *self, XTXSMutexLockPtr xsm) #endif { xt_init_mutex_with_autoname(self, &xsm->xsm_lock); xt_init_cond(self, &xsm->xsm_cond); xt_init_cond(self, &xsm->xsm_cond_2); xsm->xsm_xlocker = 0; xsm->xsm_rlock_count = 0; xsm->xsm_wait_count = 0; #ifdef DEBUG xsm->xsm_locker = 0; #endif #ifdef XT_THREAD_LOCK_INFO xsm->xsm_name = name; xt_thread_lock_info_init(&xsm->xsm_lock_info, xsm); #endif } xtPublic void xt_xsmutex_free(struct XTThread *XT_UNUSED(self), XTXSMutexLockPtr xsm) { xt_free_mutex(&xsm->xsm_lock); xt_free_cond(&xsm->xsm_cond); xt_free_cond(&xsm->xsm_cond_2); #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_free(&xsm->xsm_lock_info); #endif } xtPublic xtBool xt_xsmutex_xlock(XTXSMutexLockPtr xsm, xtThreadID thd_id) { xt_lock_mutex_ns(&xsm->xsm_lock); /* Wait for exclusive locker: */ while (xsm->xsm_xlocker) { if (!xt_timed_wait_cond_ns(&xsm->xsm_cond, &xsm->xsm_lock, 10000)) { xt_unlock_mutex_ns(&xsm->xsm_lock); return FAILED; } } /* GOTCHA: You would think this is not necessary... * But is does not always work, if a normal insert is used. * The reason is, I guess, on MMP the assignment is not * always immediately visible to other processors, because they * have old versions of this variable in there cache. * * But this is required, because the locking mechanism is based * on: * Locker: sets xlocker, tests rlock_count * Reader: incs rlock_count, tests xlocker * * The test, in both cases, may not read stale values. * volatile does not help, because this just turns compiler * optimisations off. */ xt_atomic_set4(&xsm->xsm_xlocker, thd_id); /* Wait for all the reader to wait! */ while (xsm->xsm_wait_count < xsm->xsm_rlock_count) { /* {RACE-WR_MUTEX} Here as well: */ if (!xt_timed_wait_cond_ns(&xsm->xsm_cond, &xsm->xsm_lock, 100)) { xsm->xsm_xlocker = 0; xt_unlock_mutex_ns(&xsm->xsm_lock); return FAILED; } } #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_add_owner(&xsm->xsm_lock_info); #endif return OK; } xtPublic xtBool xt_xsmutex_slock(XTXSMutexLockPtr xsm, xtThreadID XT_UNUSED(thd_id)) { xt_atomic_inc2(&xsm->xsm_rlock_count); /* Check if there could be an X locker: */ if (xsm->xsm_xlocker) { /* I am waiting... */ xt_lock_mutex_ns(&xsm->xsm_lock); xsm->xsm_wait_count++; /* Wake up the xlocker: */ if (xsm->xsm_xlocker && xsm->xsm_wait_count == xsm->xsm_rlock_count) { if (!xt_broadcast_cond_ns(&xsm->xsm_cond)) { xsm->xsm_wait_count--; xt_unlock_mutex_ns(&xsm->xsm_lock); return FAILED; } } while (xsm->xsm_xlocker) { if (!xt_timed_wait_cond_ns(&xsm->xsm_cond_2, &xsm->xsm_lock, 10000)) { xsm->xsm_wait_count--; xt_unlock_mutex_ns(&xsm->xsm_lock); return FAILED; } } xsm->xsm_wait_count--; xt_unlock_mutex_ns(&xsm->xsm_lock); } #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_add_owner(&xsm->xsm_lock_info); #endif return OK; } xtPublic xtBool xt_xsmutex_unlock(XTXSMutexLockPtr xsm, xtThreadID thd_id) { if (xsm->xsm_xlocker == thd_id) { xsm->xsm_xlocker = 0; if (xsm->xsm_wait_count) { if (!xt_broadcast_cond_ns(&xsm->xsm_cond_2)) { xt_unlock_mutex_ns(&xsm->xsm_lock); return FAILED; } } else { /* Wake up any other X or shared lockers: */ if (!xt_broadcast_cond_ns(&xsm->xsm_cond)) { xt_unlock_mutex_ns(&xsm->xsm_lock); return FAILED; } } xt_unlock_mutex_ns(&xsm->xsm_lock); } else { /* Taking the advice from {RACE-WR_MUTEX} I do the decrement * after I have a lock! */ if (xsm->xsm_xlocker) { xt_lock_mutex_ns(&xsm->xsm_lock); xt_atomic_dec2(&xsm->xsm_rlock_count); if (xsm->xsm_xlocker && xsm->xsm_wait_count == xsm->xsm_rlock_count) { /* If the X locker is waiting for me, * then allow him to continue. */ if (!xt_broadcast_cond_ns(&xsm->xsm_cond)) { xt_unlock_mutex_ns(&xsm->xsm_lock); return FAILED; } } xt_unlock_mutex_ns(&xsm->xsm_lock); } else xt_atomic_dec2(&xsm->xsm_rlock_count); } #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_release_owner(&xsm->xsm_lock_info); #endif return OK; } /* * ----------------------------------------------------------------------- * ATOMIC READ/WRITE LOCK (BASED ON ATOMIC OPERATIONS) */ #ifdef XT_THREAD_LOCK_INFO xtPublic void xt_atomicrwlock_init(struct XTThread *XT_UNUSED(self), XTAtomicRWLockPtr arw, const char *n) #else xtPublic void xt_atomicrwlock_init(struct XTThread *XT_UNUSED(self), XTAtomicRWLockPtr arw) #endif { arw->arw_reader_count = 0; arw->arw_xlock_set = 0; #ifdef XT_THREAD_LOCK_INFO arw->arw_name = n; xt_thread_lock_info_init(&arw->arw_lock_info, arw); #endif } #ifdef XT_THREAD_LOCK_INFO xtPublic void xt_atomicrwlock_free(struct XTThread *, XTAtomicRWLockPtr arw) #else xtPublic void xt_atomicrwlock_free(struct XTThread *, XTAtomicRWLockPtr XT_UNUSED(arw)) #endif { #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_free(&arw->arw_lock_info); #endif } xtPublic xtBool xt_atomicrwlock_xlock(XTAtomicRWLockPtr arw, xtBool try_lock, xtThreadID XT_NDEBUG_UNUSED(thr_id)) { register xtWord2 set; /* First get an exclusive lock: */ for (;;) { set = xt_atomic_tas2(&arw->arw_xlock_set, 1); if (!set) break; if (try_lock) return FALSE; xt_yield(); } /* Wait for the remaining readers: */ while (arw->arw_reader_count) xt_yield(); #ifdef DEBUG arw->arw_locker = thr_id; #endif #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_add_owner(&arw->arw_lock_info); #endif return TRUE; } xtPublic xtBool xt_atomicrwlock_slock(XTAtomicRWLockPtr arw) { register xtWord2 set; /* First get an exclusive lock: */ for (;;) { set = xt_atomic_tas2(&arw->arw_xlock_set, 1); if (!set) break; xt_yield(); } /* Add a reader: */ xt_atomic_inc2(&arw->arw_reader_count); /* Release the xlock: */ arw->arw_xlock_set = 0; #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_add_owner(&arw->arw_lock_info); #endif return OK; } xtPublic xtBool xt_atomicrwlock_unlock(XTAtomicRWLockPtr arw, xtBool xlocked) { if (xlocked) { #ifdef DEBUG arw->arw_locker = 0; #endif arw->arw_xlock_set = 0; } else xt_atomic_dec2(&arw->arw_reader_count); #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_release_owner(&arw->arw_lock_info); #endif return OK; } /* * ----------------------------------------------------------------------- * "SKEW" ATOMITC READ/WRITE LOCK (BASED ON ATOMIC OPERATIONS) * * This lock type favors writers. It only works if the proportion of readers * to writer is high. */ #ifdef XT_THREAD_LOCK_INFO xtPublic void xt_skewrwlock_init(struct XTThread *XT_UNUSED(self), XTSkewRWLockPtr srw, const char *n) #else xtPublic void xt_skewrwlock_init(struct XTThread *XT_UNUSED(self), XTSkewRWLockPtr srw) #endif { srw->srw_reader_count = 0; srw->srw_xlock_set = 0; #ifdef XT_THREAD_LOCK_INFO srw->srw_name = n; xt_thread_lock_info_init(&srw->srw_lock_info, srw); #endif } #ifdef XT_THREAD_LOCK_INFO xtPublic void xt_skewrwlock_free(struct XTThread *, XTSkewRWLockPtr srw) #else xtPublic void xt_skewrwlock_free(struct XTThread *, XTSkewRWLockPtr XT_UNUSED(srw)) #endif { #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_free(&srw->srw_lock_info); #endif } xtPublic xtBool xt_skewrwlock_xlock(XTSkewRWLockPtr srw, xtBool try_lock, xtThreadID XT_NDEBUG_UNUSED(thr_id)) { register xtWord2 set; /* First get an exclusive lock: */ for (;;) { set = xt_atomic_tas2(&srw->srw_xlock_set, 1); if (!set) break; if (try_lock) return FALSE; xt_yield(); } /* Wait for the remaining readers: */ while (srw->srw_reader_count) xt_yield(); #ifdef DEBUG srw->srw_locker = thr_id; #endif #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_add_owner(&srw->srw_lock_info); #endif return TRUE; } xtPublic xtBool xt_skewrwlock_slock(XTSkewRWLockPtr srw) { /* Wait for an exclusive lock: */ retry: for (;;) { if (!srw->srw_xlock_set) break; xt_yield(); } /* Add a reader: */ xt_atomic_inc2(&srw->srw_reader_count); /* Check for xlock again: */ if (srw->srw_xlock_set) { xt_atomic_dec2(&srw->srw_reader_count); goto retry; } #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_add_owner(&srw->srw_lock_info); #endif return OK; } xtPublic xtBool xt_skewrwlock_unlock(XTSkewRWLockPtr srw, xtBool xlocked) { if (xlocked) srw->srw_xlock_set = 0; else xt_atomic_dec2(&srw->srw_reader_count); #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_release_owner(&srw->srw_lock_info); #endif #ifdef DEBUG srw->srw_locker = 0; #endif return OK; } /* * ----------------------------------------------------------------------- * RECURSIVE R/W LOCK (allows X lockers to lock again) */ #ifdef XT_THREAD_LOCK_INFO void xt_recursivemutex_init(XTThreadPtr self, XTRecursiveMutexPtr rm, const char *name) { rm->rm_locker = NULL; rm->rm_lock_count = 0; xt_init_mutex(self, &rm->rm_mutex, name); } #else xtPublic void xt_recursivemutex_init(XTThreadPtr self, XTRecursiveMutexPtr rm) { rm->rm_locker = NULL; rm->rm_lock_count = 0; xt_init_mutex(self, &rm->rm_mutex); } #endif xtPublic void xt_recursivemutex_free(XTRecursiveMutexPtr rm) { xt_free_mutex(&rm->rm_mutex); #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_free(&rm->rm_lock_info); #endif } xtPublic void xt_recursivemutex_lock(XTThreadPtr self, XTRecursiveMutexPtr rm) { if (self != rm->rm_locker) { xt_lock_mutex(self, &rm->rm_mutex); rm->rm_locker = self; } rm->rm_lock_count++; } xtPublic void xt_recursivemutex_unlock(XTThreadPtr self, XTRecursiveMutexPtr rm) { ASSERT(self == rm->rm_locker); ASSERT(rm->rm_lock_count > 0); rm->rm_lock_count--; if (!rm->rm_lock_count) { rm->rm_locker = NULL; xt_unlock_mutex(self, &rm->rm_mutex); } } /* * ----------------------------------------------------------------------- * RECURSIVE MUTEX (allows lockers to lock again) */ #ifdef XT_THREAD_LOCK_INFO void xt_recurrwlock_init(struct XTThread *self, XTRecurRWLockPtr rrw, const char *name) { rrw->rrw_locker = NULL; rrw->rrw_lock_count = 0; xt_init_rwlock(self, &rrw->rrw_lock, name); } #else void xt_recurrwlock_init(struct XTThread *self, XTRecurRWLockPtr rrw) { rrw->rrw_locker = NULL; rrw->rrw_lock_count = 0; xt_init_rwlock(self, &rrw->rrw_lock); } #endif void xt_recurrwlock_free(XTRecurRWLockPtr rrw) { xt_free_rwlock(&rrw->rrw_lock); #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_free(&rrw->rrw_lock_info); #endif } void xt_recurrwlock_xlock(struct XTThread *self, XTRecurRWLockPtr rrw) { if (self != rrw->rrw_locker) { xt_xlock_rwlock(self, &rrw->rrw_lock); rrw->rrw_locker = self; } rrw->rrw_lock_count++; } void xt_recurrwlock_slock(struct XTThread *self, XTRecurRWLockPtr rrw) { xt_slock_rwlock(self, &rrw->rrw_lock); } void xt_recurrwlock_slock_ns(XTRecurRWLockPtr rrw) { xt_slock_rwlock_ns(&rrw->rrw_lock); } void xt_recurrwlock_unxlock(struct XTThread *self, XTRecurRWLockPtr rrw) { ASSERT(self == rrw->rrw_locker); ASSERT(rrw->rrw_lock_count > 0); rrw->rrw_lock_count--; if (!rrw->rrw_lock_count) { rrw->rrw_locker = NULL; xt_unlock_rwlock(self, &rrw->rrw_lock); } } void xt_recurrwlock_unslock(struct XTThread *self, XTRecurRWLockPtr rrw) { xt_unlock_rwlock(self, &rrw->rrw_lock); } void xt_recurrwlock_unslock_ns(XTRecurRWLockPtr rrw) { xt_unlock_rwlock_ns(&rrw->rrw_lock); } /* * ----------------------------------------------------------------------- * UNIT TESTS */ #define JOB_MEMCPY 1 #define JOB_SLEEP 2 #define JOB_PRINT 3 #define JOB_INCREMENT 4 #define JOB_SNOOZE 5 #define JOB_DOUBLE_INC 6 #define LOCK_PTHREAD_RW 1 #define LOCK_PTHREAD_MUTEX 2 #define LOCK_RWMUTEX 3 #define LOCK_SPINLOCK 4 #define LOCK_FASTLOCK 5 #define LOCK_SPINXSLOCK 6 #define LOCK_XSMUTEX 7 #define LOCK_ATOMICRWLOCK 8 #define LOCK_SKEWRWLOCK 9 typedef struct XSLockTest { u_int xs_interations; xtBool xs_which_lock; xtBool xs_which_job; xtBool xs_debug_print; XTRWMutexRec xs_lock; xt_rwlock_type xs_plock; XTSpinLockRec xs_spinlock; xt_mutex_type xs_mutex; XTFastLockRec xs_fastlock; XTSpinXSLockRec xs_spinrwlock; XTXSMutexRec xs_fastrwlock; XTAtomicRWLockRec xs_atomicrwlock; XTSkewRWLockRec xs_skewrwlock; int xs_progress; xtWord4 xs_inc; } XSLockTestRec, *XSLockTestPtr; static void lck_free_thread_data(XTThreadPtr XT_UNUSED(self), void *XT_UNUSED(data)) { } static void lck_do_job(XTThreadPtr self, int job, XSLockTestPtr data, xtBool reader) { char b1[1024], b2[1024]; switch (job) { case JOB_MEMCPY: memset(b1, 0, sizeof(b1)); memset(b2, 1, sizeof(b2)); data->xs_inc++; break; case JOB_SLEEP: xt_sleep_milli_second(1); data->xs_inc++; break; case JOB_PRINT: printf("- %s got lock\n", self->t_name); xt_sleep_milli_second(10); data->xs_inc++; break; case JOB_INCREMENT: data->xs_inc++; break; case JOB_SNOOZE: xt_sleep_milli_second(10); data->xs_inc++; break; case JOB_DOUBLE_INC: if (reader) { if ((data->xs_inc & 1) != 0) printf("Noooo!\n"); } else { data->xs_inc++; data->xs_inc++; } break; } } #if 0 static void *lck_run_dumper(XTThreadPtr self) { int state = 0; while (state != 1) { sleep(1); if (state == 2) { xt_dump_trace(); state = 0; } } } #endif static void *lck_run_reader(XTThreadPtr self) { XSLockTestRec *data = (XSLockTestRec *) self->t_data; if (data->xs_debug_print) printf("- %s start\n", self->t_name); for (u_int i=0; i<data->xs_interations; i++) { if (data->xs_progress && ((i+1) % data->xs_progress) == 0) printf("- %s %d\n", self->t_name, i+1); if (data->xs_which_lock == LOCK_PTHREAD_RW) { xt_slock_rwlock_ns(&data->xs_plock); lck_do_job(self, data->xs_which_job, data, TRUE); xt_unlock_rwlock_ns(&data->xs_plock); } else if (data->xs_which_lock == LOCK_RWMUTEX) { xt_rwmutex_slock(&data->xs_lock, self->t_id); lck_do_job(self, data->xs_which_job, data, TRUE); xt_rwmutex_unlock(&data->xs_lock, self->t_id); } else if (data->xs_which_lock == LOCK_SPINXSLOCK) { xt_spinxslock_slock(&data->xs_spinrwlock); lck_do_job(self, data->xs_which_job, data, TRUE); xt_spinxslock_unlock(&data->xs_spinrwlock, FALSE); } else if (data->xs_which_lock == LOCK_XSMUTEX) { xt_xsmutex_slock(&data->xs_fastrwlock, self->t_id); lck_do_job(self, data->xs_which_job, data, TRUE); xt_xsmutex_unlock(&data->xs_fastrwlock, self->t_id); } else if (data->xs_which_lock == LOCK_ATOMICRWLOCK) { xt_atomicrwlock_slock(&data->xs_atomicrwlock); lck_do_job(self, data->xs_which_job, data, TRUE); xt_atomicrwlock_unlock(&data->xs_atomicrwlock, FALSE); } else if (data->xs_which_lock == LOCK_SKEWRWLOCK) { xt_skewrwlock_slock(&data->xs_skewrwlock); lck_do_job(self, data->xs_which_job, data, TRUE); xt_skewrwlock_unlock(&data->xs_skewrwlock, FALSE); } else ASSERT(FALSE); } if (data->xs_debug_print) printf("- %s stop\n", self->t_name); return NULL; } static void *lck_run_writer(XTThreadPtr self) { XSLockTestRec *data = (XSLockTestRec *) self->t_data; if (data->xs_debug_print) printf("- %s start\n", self->t_name); for (u_int i=0; i<data->xs_interations; i++) { if (data->xs_progress && ((i+1) % data->xs_progress) == 0) printf("- %s %d\n", self->t_name, i+1); if (data->xs_which_lock == LOCK_PTHREAD_RW) { xt_xlock_rwlock_ns(&data->xs_plock); lck_do_job(self, data->xs_which_job, data, FALSE); xt_unlock_rwlock_ns(&data->xs_plock); } else if (data->xs_which_lock == LOCK_RWMUTEX) { xt_rwmutex_xlock(&data->xs_lock, self->t_id); lck_do_job(self, data->xs_which_job, data, FALSE); xt_rwmutex_unlock(&data->xs_lock, self->t_id); } else if (data->xs_which_lock == LOCK_SPINXSLOCK) { xt_spinxslock_xlock(&data->xs_spinrwlock, FALSE, self->t_id); lck_do_job(self, data->xs_which_job, data, FALSE); xt_spinxslock_unlock(&data->xs_spinrwlock, TRUE); } else if (data->xs_which_lock == LOCK_XSMUTEX) { xt_xsmutex_xlock(&data->xs_fastrwlock, self->t_id); lck_do_job(self, data->xs_which_job, data, FALSE); xt_xsmutex_unlock(&data->xs_fastrwlock, self->t_id); } else if (data->xs_which_lock == LOCK_ATOMICRWLOCK) { xt_atomicrwlock_xlock(&data->xs_atomicrwlock, FALSE, self->t_id); lck_do_job(self, data->xs_which_job, data, FALSE); xt_atomicrwlock_unlock(&data->xs_atomicrwlock, TRUE); } else if (data->xs_which_lock == LOCK_SKEWRWLOCK) { xt_skewrwlock_xlock(&data->xs_skewrwlock, FALSE, self->t_id); lck_do_job(self, data->xs_which_job, data, FALSE); xt_skewrwlock_unlock(&data->xs_skewrwlock, TRUE); } else ASSERT(FALSE); } if (data->xs_debug_print) printf("- %s stop\n", self->t_name); return NULL; } static void lck_print_test(XSLockTestRec *data) { switch (data->xs_which_lock) { case LOCK_PTHREAD_RW: printf("pthread read/write"); break; case LOCK_PTHREAD_MUTEX: printf("pthread mutex"); break; case LOCK_RWMUTEX: printf("fast read/write mutex"); break; case LOCK_SPINLOCK: printf("spin mutex"); break; case LOCK_FASTLOCK: printf("fast mutex"); break; case LOCK_SPINXSLOCK: printf("spin read/write lock"); break; case LOCK_XSMUTEX: printf("fast x/s mutex"); break; case LOCK_ATOMICRWLOCK: printf("atomic read/write lock"); break; case LOCK_SKEWRWLOCK: printf("skew read/write lock"); break; } switch (data->xs_which_job) { case JOB_MEMCPY: printf(" MEMCPY 2K"); break; case JOB_SLEEP: printf(" SLEEP 1/1000s"); break; case JOB_PRINT: printf(" PRINT DEBUG"); break; case JOB_INCREMENT: printf(" INCREMENT"); break; case JOB_SNOOZE: printf(" SLEEP 1/100s"); break; } printf(" %d interations", data->xs_interations); } static void *lck_run_mutex_locker(XTThreadPtr self) { XSLockTestRec *data = (XSLockTestRec *) self->t_data; if (data->xs_debug_print) printf("- %s start\n", self->t_name); for (u_int i=0; i<data->xs_interations; i++) { if (data->xs_progress && ((i+1) % data->xs_progress) == 0) printf("- %s %d\n", self->t_name, i+1); if (data->xs_which_lock == LOCK_PTHREAD_MUTEX) { xt_lock_mutex_ns(&data->xs_mutex); lck_do_job(self, data->xs_which_job, data, FALSE); xt_unlock_mutex_ns(&data->xs_mutex); } else if (data->xs_which_lock == LOCK_SPINLOCK) { xt_spinlock_lock(&data->xs_spinlock); lck_do_job(self, data->xs_which_job, data, FALSE); xt_spinlock_unlock(&data->xs_spinlock); } else if (data->xs_which_lock == LOCK_FASTLOCK) { xt_fastlock_lock(&data->xs_fastlock, self); lck_do_job(self, data->xs_which_job, data, FALSE); xt_fastlock_unlock(&data->xs_fastlock, self); } else ASSERT(FALSE); } if (data->xs_debug_print) printf("- %s stop\n", self->t_name); return NULL; } typedef struct LockThread { xtThreadID id; XTThreadPtr ptr; } LockThreadRec, *LockThreadPtr; static void lck_reader_writer_test(XTThreadPtr self, XSLockTestRec *data, int reader_cnt, int writer_cnt) { xtWord8 start; LockThreadPtr threads; int thread_cnt = reader_cnt + writer_cnt; char buffer[40]; //XTThreadPtr dumper = xt_create_daemon(self, "DUMPER"); //xt_run_thread(self, dumper, lck_run_dumper); printf("READ/WRITE TEST: "); lck_print_test(data); printf(", %d readers, %d writers\n", reader_cnt, writer_cnt); threads = (LockThreadPtr) xt_malloc(self, thread_cnt * sizeof(LockThreadRec)); for (int i=0; i<thread_cnt; i++) { sprintf(buffer, "%s%d", i < reader_cnt ? "READER-" : "WRITER-", i+1); threads[i].ptr = xt_create_daemon(self, buffer); threads[i].id = threads[i].ptr->t_id; xt_set_thread_data(threads[i].ptr, data, lck_free_thread_data); } start = xt_trace_clock(); for (int i=0; i<reader_cnt; i++) xt_run_thread(self, threads[i].ptr, lck_run_reader); for (int i=reader_cnt; i<thread_cnt; i++) xt_run_thread(self, threads[i].ptr, lck_run_writer); for (int i=0; i<thread_cnt; i++) xt_wait_for_thread(threads[i].id, TRUE); printf("----- %d reader, %d writer time=%s\n", reader_cnt, writer_cnt, xt_trace_clock_diff(buffer, start)); xt_free(self, threads); printf("TEST RESULT = %d\n", data->xs_inc); //xt_wait_for_thread(dumper, TRUE); } static void lck_mutex_lock_test(XTThreadPtr self, XSLockTestRec *data, int thread_cnt) { xtWord8 start; LockThreadPtr threads; char buffer[40]; printf("LOCK MUTEX TEST: "); lck_print_test(data); printf(", %d threads\n", thread_cnt); threads = (LockThreadPtr) xt_malloc(self, thread_cnt * sizeof(LockThreadRec)); for (int i=0; i<thread_cnt; i++) { sprintf(buffer, "THREAD%d", i+1); threads[i].ptr = xt_create_daemon(self, buffer); threads[i].id = threads[i].ptr->t_id; xt_set_thread_data(threads[i].ptr, data, lck_free_thread_data); } start = xt_trace_clock(); for (int i=0; i<thread_cnt; i++) xt_run_thread(self, threads[i].ptr, lck_run_mutex_locker); for (int i=0; i<thread_cnt; i++) xt_wait_for_thread(threads[i].id, TRUE); printf("----- %d threads time=%s\n", thread_cnt, xt_trace_clock_diff(buffer, start)); xt_free(self, threads); printf("TEST RESULT = %d\n", data->xs_inc); } xtPublic void xt_unit_test_read_write_locks(XTThreadPtr self) { XSLockTestRec data; memset(&data, 0, sizeof(data)); printf("TEST: xt_unit_test_read_write_locks\n"); printf("size of XTXSMutexRec = %d\n", (int) sizeof(XTXSMutexRec)); printf("size of pthread_cond_t = %d\n", (int) sizeof(pthread_cond_t)); printf("size of pthread_mutex_t = %d\n", (int) sizeof(pthread_mutex_t)); xt_rwmutex_init_with_autoname(self, &data.xs_lock); xt_init_rwlock_with_autoname(self, &data.xs_plock); xt_spinxslock_init_with_autoname(self, &data.xs_spinrwlock); xt_xsmutex_init_with_autoname(self, &data.xs_fastrwlock); xt_atomicrwlock_init_with_autoname(self, &data.xs_atomicrwlock); xt_skewrwlock_init_with_autoname(self, &data.xs_skewrwlock); /** data.xs_interations = 10; data.xs_which_lock = LOCK_RWMUTEX; // LOCK_PTHREAD_RW, LOCK_RWMUTEX, LOCK_SPINXSLOCK, LOCK_XSMUTEX data.xs_which_job = JOB_PRINT; data.xs_debug_print = TRUE; data.xs_progress = 0; lck_reader_writer_test(self, &data, 4, 0); lck_reader_writer_test(self, &data, 0, 2); lck_reader_writer_test(self, &data, 1, 1); lck_reader_writer_test(self, &data, 4, 2); **/ /** data.xs_interations = 4000; data.xs_which_lock = LOCK_RWMUTEX; // LOCK_PTHREAD_RW, LOCK_RWMUTEX, LOCK_SPINXSLOCK, LOCK_XSMUTEX data.xs_which_job = JOB_SLEEP; data.xs_debug_print = TRUE; data.xs_progress = 200; lck_reader_writer_test(self, &data, 4, 0); lck_reader_writer_test(self, &data, 0, 2); lck_reader_writer_test(self, &data, 1, 1); lck_reader_writer_test(self, &data, 4, 2); **/ // LOCK_PTHREAD_RW, LOCK_RWMUTEX, LOCK_SPINXSLOCK, LOCK_XSMUTEX, LOCK_ATOMICRWLOCK, LOCK_SKEWRWLOCK /**/ data.xs_interations = 100000; data.xs_which_lock = LOCK_XSMUTEX; data.xs_which_job = JOB_DOUBLE_INC; // JOB_INCREMENT, JOB_DOUBLE_INC data.xs_debug_print = FALSE; data.xs_progress = 0; lck_reader_writer_test(self, &data, 10, 0); data.xs_which_lock = LOCK_XSMUTEX; lck_reader_writer_test(self, &data, 10, 0); //lck_reader_writer_test(self, &data, 0, 5); //lck_reader_writer_test(self, &data, 10, 0); //lck_reader_writer_test(self, &data, 10, 5); /**/ /**/ data.xs_interations = 10000; data.xs_which_lock = LOCK_XSMUTEX; data.xs_which_job = JOB_MEMCPY; data.xs_debug_print = FALSE; data.xs_progress = 0; lck_reader_writer_test(self, &data, 10, 0); data.xs_which_lock = LOCK_XSMUTEX; lck_reader_writer_test(self, &data, 10, 0); //lck_reader_writer_test(self, &data, 0, 5); //lck_reader_writer_test(self, &data, 10, 0); //lck_reader_writer_test(self, &data, 10, 5); /**/ /**/ data.xs_interations = 1000; data.xs_which_lock = LOCK_XSMUTEX; data.xs_which_job = JOB_SLEEP; // JOB_SLEEP, JOB_SNOOZE data.xs_debug_print = FALSE; data.xs_progress = 0; lck_reader_writer_test(self, &data, 10, 0); data.xs_which_lock = LOCK_XSMUTEX; lck_reader_writer_test(self, &data, 10, 0); /**/ xt_rwmutex_free(self, &data.xs_lock); xt_free_rwlock(&data.xs_plock); xt_spinxslock_free(self, &data.xs_spinrwlock); xt_xsmutex_free(self, &data.xs_fastrwlock); xt_atomicrwlock_free(self, &data.xs_atomicrwlock); xt_skewrwlock_free(self, &data.xs_skewrwlock); } xtPublic void xt_unit_test_mutex_locks(XTThreadPtr self) { XSLockTestRec data; memset(&data, 0, sizeof(data)); printf("TEST: xt_unit_test_mutex_locks\n"); xt_spinlock_init_with_autoname(self, &data.xs_spinlock); xt_fastlock_init_with_autoname(self, &data.xs_fastlock); xt_init_mutex_with_autoname(self, &data.xs_mutex); /**/ data.xs_interations = 10; data.xs_which_lock = LOCK_SPINLOCK; // LOCK_SPINLOCK, LOCK_PTHREAD_MUTEX, LOCK_FASTLOCK data.xs_which_job = JOB_PRINT; data.xs_debug_print = TRUE; data.xs_progress = 0; data.xs_inc = 0; lck_mutex_lock_test(self, &data, 2); /**/ /**/ data.xs_interations = 100000; data.xs_which_lock = LOCK_SPINLOCK; // LOCK_SPINLOCK, LOCK_PTHREAD_MUTEX, LOCK_FASTLOCK data.xs_which_job = JOB_INCREMENT; data.xs_debug_print = FALSE; data.xs_progress = 0; data.xs_inc = 0; lck_mutex_lock_test(self, &data, 10); /**/ /**/ data.xs_interations = 10000; data.xs_which_lock = LOCK_SPINLOCK; // LOCK_SPINLOCK, LOCK_PTHREAD_MUTEX, LOCK_FASTLOCK data.xs_which_job = JOB_MEMCPY; data.xs_debug_print = FALSE; data.xs_progress = 0; data.xs_inc = 0; lck_mutex_lock_test(self, &data, 10); /**/ /**/ data.xs_interations = 1000; data.xs_which_lock = LOCK_FASTLOCK; // LOCK_SPINLOCK, LOCK_PTHREAD_MUTEX, LOCK_FASTLOCK data.xs_which_job = JOB_SLEEP; data.xs_debug_print = FALSE; data.xs_progress = 0; data.xs_inc = 0; lck_mutex_lock_test(self, &data, 10); /**/ /**/ data.xs_interations = 100; data.xs_which_lock = LOCK_FASTLOCK; // LOCK_SPINLOCK, LOCK_PTHREAD_MUTEX, LOCK_FASTLOCK data.xs_which_job = JOB_SNOOZE; data.xs_debug_print = FALSE; data.xs_progress = 0; data.xs_inc = 0; lck_mutex_lock_test(self, &data, 10); /**/ xt_spinlock_free(self, &data.xs_spinlock); xt_fastlock_free(self, &data.xs_fastlock); xt_free_mutex(&data.xs_mutex); } xtPublic void xt_unit_test_create_threads(XTThreadPtr self) { XTThreadPtr threads[10]; printf("TEST: xt_unit_test_create_threads\n"); printf("current max threads = %d, in use = %d\n", xt_thr_current_max_threads, xt_thr_current_thread_count); /* Create some threads: */ threads[0] = xt_create_daemon(self, "test0"); printf("thread = %d\n", threads[0]->t_id); threads[1] = xt_create_daemon(self, "test1"); printf("thread = %d\n", threads[1]->t_id); threads[2] = xt_create_daemon(self, "test2"); printf("thread = %d\n", threads[2]->t_id); threads[3] = xt_create_daemon(self, "test3"); printf("thread = %d\n", threads[3]->t_id); threads[4] = xt_create_daemon(self, "test4"); printf("thread = %d\n", threads[4]->t_id); printf("current max threads = %d, in use = %d\n", xt_thr_current_max_threads, xt_thr_current_thread_count); /* Max stays the same: */ xt_free_thread(threads[3]); xt_free_thread(threads[2]); xt_free_thread(threads[1]); printf("current max threads = %d, in use = %d\n", xt_thr_current_max_threads, xt_thr_current_thread_count); /* Fill in the gaps: */ threads[1] = xt_create_daemon(self, "test1"); printf("thread = %d\n", threads[1]->t_id); threads[2] = xt_create_daemon(self, "test2"); printf("thread = %d\n", threads[2]->t_id); threads[3] = xt_create_daemon(self, "test3"); printf("thread = %d\n", threads[3]->t_id); printf("current max threads = %d, in use = %d\n", xt_thr_current_max_threads, xt_thr_current_thread_count); /* And add one: */ threads[5] = xt_create_daemon(self, "test5"); printf("thread = %d\n", threads[5]->t_id); printf("current max threads = %d, in use = %d\n", xt_thr_current_max_threads, xt_thr_current_thread_count); /* Max stays the same: */ xt_free_thread(threads[3]); xt_free_thread(threads[2]); xt_free_thread(threads[1]); xt_free_thread(threads[4]); printf("current max threads = %d, in use = %d\n", xt_thr_current_max_threads, xt_thr_current_thread_count); /* Recalculate the max: */ xt_free_thread(threads[5]); printf("current max threads = %d, in use = %d\n", xt_thr_current_max_threads, xt_thr_current_thread_count); /* Fill in the gaps: */ threads[1] = xt_create_daemon(self, "test1"); printf("thread = %d\n", threads[1]->t_id); threads[2] = xt_create_daemon(self, "test2"); printf("thread = %d\n", threads[2]->t_id); threads[3] = xt_create_daemon(self, "test3"); printf("thread = %d\n", threads[3]->t_id); printf("current max threads = %d, in use = %d\n", xt_thr_current_max_threads, xt_thr_current_thread_count); xt_free_thread(threads[3]); xt_free_thread(threads[2]); xt_free_thread(threads[1]); xt_free_thread(threads[0]); printf("current max threads = %d, in use = %d\n", xt_thr_current_max_threads, xt_thr_current_thread_count); } #ifdef UNUSED_CODE int XTRowLocks::xt_release_locks(struct XTOpenTable *ot, xtRowID row, XTRowLockListPtr lock_list) { if (ot->ot_temp_row_lock) xt_make_lock_permanent(ot, lock_list); if (!lock_list->bl_count) return XT_NO_LOCK; int group, pgroup; XTXactDataPtr xact; xtTableID tab_id, ptab_id; XTPermRowLockPtr plock; XTOpenTablePtr pot = NULL; XTRowLocksPtr row_locks; /* Do I have the lock? */ group = row % XT_ROW_LOCK_COUNT; if (!(xact = tab_row_locks[group])) /* There is no lock: */ return XT_NO_LOCK; if (xact != ot->ot_thread->st_xact_data) /* There is a lock but it does not belong to me! */ return XT_NO_LOCK; tab_id = ot->ot_table->tab_id; plock = (XTPermRowLockPtr) &lock_list->bl_data[lock_list->bl_count * lock_list->bl_item_size]; lock_list->rll_release_point = lock_list->bl_count; for (u_int i=0; i<lock_list->bl_count; i++) { plock--; pgroup = plock->pr_group; ptab_id = plock->pr_tab_id; if (ptab_id == tab_id) row_locks = this; else { if (pot) { if (pot->ot_table->tab_id == ptab_id) goto remove_lock; xt_db_return_table_to_pool_ns(pot); pot = NULL; } if (!xt_db_open_pool_table_ns(&pot, ot->ot_table->tab_db, tab_id)) { /* Should not happen, but just in case, we just don't * remove the lock. We will probably end up with a deadlock * somewhere. */ xt_log_and_clear_exception_ns(); goto skip_remove_lock; } if (!pot) /* Can happen of the table has been dropped: */ goto skip_remove_lock; remove_lock: row_locks = &pot->ot_table->tab_locks; } #ifdef XT_TRACE_LOCKS xt_ttracef(xt_get_self(), "release lock group=%d\n", pgroup); #endif row_locks->tab_row_locks[pgroup] = NULL; row_locks->tab_lock_perm[pgroup] = 0; skip_remove_lock:; lock_list->rll_release_point--; if (tab_id == ptab_id && group == pgroup) break; } if (pot) xt_db_return_table_to_pool_ns(pot); return XT_PERM_LOCK; } xtBool XTRowLocks::xt_regain_locks(struct XTOpenTable *ot, int *lock_type, xtXactID *xn_id, XTRowLockListPtr lock_list) { int group; XTXactDataPtr xact, my_xact; XTPermRowLockPtr plock; xtTableID tab_id; XTOpenTablePtr pot = NULL; XTRowLocksPtr row_locks = NULL; XTTableHPtr tab = NULL; for (u_int i=lock_list->rll_release_point; i<lock_list->bl_count; i++) { plock = (XTPermRowLockPtr) &lock_list->bl_data[i * lock_list->bl_item_size]; my_xact = ot->ot_thread->st_xact_data; group = plock->pr_group; tab_id = plock->pr_tab_id; if (tab_id == ot->ot_table->tab_id) { row_locks = this; tab = ot->ot_table; } else { if (pot) { if (tab_id == pot->ot_table->tab_id) goto gain_lock; xt_db_return_table_to_pool_ns(pot); pot = NULL; } if (!xt_db_open_pool_table_ns(&pot, ot->ot_table->tab_db, tab_id)) return FAILED; if (!pot) goto no_gain_lock; gain_lock: tab = pot->ot_table; row_locks = &tab->tab_locks; no_gain_lock:; } #ifdef XT_TRACE_LOCKS xt_ttracef(xt_get_self(), "regain lock group=%d\n", group); #endif XT_TAB_ROW_WRITE_LOCK(&tab->tab_row_rwlock[group % XT_ROW_RWLOCKS], ot->ot_thread); if ((xact = row_locks->tab_row_locks[group])) { if (xact != my_xact) { *xn_id = xact->xd_start_xn_id; *lock_type = row_locks->tab_lock_perm[group] ? XT_PERM_LOCK : XT_TEMP_LOCK; goto done; } } else row_locks->tab_row_locks[group] = my_xact; row_locks->tab_lock_perm[group] = 1; XT_TAB_ROW_UNLOCK(&tab->tab_row_rwlock[group % XT_ROW_RWLOCKS], ot->ot_thread); lock_list->rll_release_point++; } *lock_type = XT_NO_LOCK; return OK; done: XT_TAB_ROW_UNLOCK(&tab->tab_row_rwlock[group % XT_ROW_RWLOCKS], ot->ot_thread); return OK; } #endif