diff --git a/BitKeeper/etc/logging_ok b/BitKeeper/etc/logging_ok
index 84f6b60f4d964b11badca7d7ccc20a060a20316c..e294d969c2f50fbbf17d29d389d573327ef58a45 100644
--- a/BitKeeper/etc/logging_ok
+++ b/BitKeeper/etc/logging_ok
@@ -104,6 +104,7 @@ lenz@mysql.com
 magnus@neptunus.(none)
 magnus@shellback.(none)
 marko@hundin.mysql.fi
+mats@mysql.com
 matt@mysql.com
 matthias@three.local.lan
 miguel@hegel.(none)
diff --git a/innobase/buf/buf0buf.c b/innobase/buf/buf0buf.c
index d686b559528a1654b30067ca2cc42e189ae3c334..2f8ce7507baf19fde6d56fcebc5db3145625260d 100644
--- a/innobase/buf/buf0buf.c
+++ b/innobase/buf/buf0buf.c
@@ -548,8 +548,9 @@ buf_pool_init(
 		}
 		/*----------------------------------------*/
 	} else {
-		buf_pool->frame_mem = ut_malloc(
-					UNIV_PAGE_SIZE * (n_frames + 1));
+		buf_pool->frame_mem = ut_malloc_low(
+					UNIV_PAGE_SIZE * (n_frames + 1),
+					TRUE, FALSE);
 	}
 
 	if (buf_pool->frame_mem == NULL) {
diff --git a/innobase/buf/buf0lru.c b/innobase/buf/buf0lru.c
index 42e3b363ced02666f685d55a3bd16d0e175d41ff..985426a9e2b608081bf978c9ad2182ed75bbfa38 100644
--- a/innobase/buf/buf0lru.c
+++ b/innobase/buf/buf0lru.c
@@ -42,6 +42,10 @@ initial segment in buf_LRU_get_recent_limit */
 
 #define BUF_LRU_INITIAL_RATIO	8
 
+/* If we switch on the InnoDB monitor because there are too few available
+frames in the buffer pool, we set this to TRUE */
+ibool	buf_lru_switched_on_innodb_mon	= FALSE;
+
 /**********************************************************************
 Takes a block out of the LRU list and page hash table and sets the block
 state to BUF_BLOCK_REMOVE_HASH. */
@@ -287,6 +291,32 @@ buf_LRU_try_free_flushed_blocks(void)
 	mutex_exit(&(buf_pool->mutex));
 }	
 
+/**********************************************************************
+Returns TRUE if less than 15 % of the buffer pool is available. This can be
+used in heuristics to prevent huge transactions eating up the whole buffer
+pool for their locks. */
+
+ibool
+buf_LRU_buf_pool_running_out(void)
+/*==============================*/
+				/* out: TRUE if less than 15 % of buffer pool
+				left */
+{
+	ibool	ret	= FALSE;
+
+	mutex_enter(&(buf_pool->mutex));
+
+	if (!recv_recovery_on && UT_LIST_GET_LEN(buf_pool->free)
+	   + UT_LIST_GET_LEN(buf_pool->LRU) < buf_pool->max_size / 7) {
+		
+		ret = TRUE;
+	}
+
+	mutex_exit(&(buf_pool->mutex));
+
+	return(ret);
+}
+
 /**********************************************************************
 Returns a free block from buf_pool. The block is taken off the free list.
 If it is empty, blocks are moved from the end of the LRU list to the free
@@ -325,7 +355,8 @@ buf_LRU_get_free_block(void)
 	   
 	} else if (!recv_recovery_on && UT_LIST_GET_LEN(buf_pool->free)
 	   + UT_LIST_GET_LEN(buf_pool->LRU) < buf_pool->max_size / 5) {
-		if (!srv_print_innodb_monitor) {
+
+		if (!buf_lru_switched_on_innodb_mon) {
 
 	   		/* Over 80 % of the buffer pool is occupied by lock
 			heaps or the adaptive hash index. This may be a memory
@@ -342,16 +373,18 @@ buf_LRU_get_free_block(void)
 "InnoDB: lock heap and hash index sizes.\n",
 			(ulong) (buf_pool->curr_size / (1024 * 1024 / UNIV_PAGE_SIZE)));
 
+			buf_lru_switched_on_innodb_mon = TRUE;
 			srv_print_innodb_monitor = TRUE;
 			os_event_set(srv_lock_timeout_thread_event);
 		}
-	} else if (!recv_recovery_on && UT_LIST_GET_LEN(buf_pool->free)
-	   + UT_LIST_GET_LEN(buf_pool->LRU) < buf_pool->max_size / 4) {
+	} else if (buf_lru_switched_on_innodb_mon) {
 
 		/* Switch off the InnoDB Monitor; this is a simple way
 		to stop the monitor if the situation becomes less urgent,
-		but may also surprise users! */
+		but may also surprise users if the user also switched on the
+		monitor! */
 
+		buf_lru_switched_on_innodb_mon = FALSE;
 		srv_print_innodb_monitor = FALSE;
 	}
 	
diff --git a/innobase/include/buf0lru.h b/innobase/include/buf0lru.h
index 69a376f8caba67da9fe62a99b00fa8817bdc8950..45164dd561e06ef3915b201ccabe1aa98543a121 100644
--- a/innobase/include/buf0lru.h
+++ b/innobase/include/buf0lru.h
@@ -25,6 +25,16 @@ wasted. */
 void
 buf_LRU_try_free_flushed_blocks(void);
 /*==================================*/
+/**********************************************************************
+Returns TRUE if less than 15 % of the buffer pool is available. This can be
+used in heuristics to prevent huge transactions eating up the whole buffer
+pool for their locks. */
+
+ibool
+buf_LRU_buf_pool_running_out(void);
+/*==============================*/
+				/* out: TRUE if less than 15 % of buffer pool
+				left */
 
 /*#######################################################################
 These are low-level functions
diff --git a/innobase/include/db0err.h b/innobase/include/db0err.h
index be7667bfd0c7b2a1d83df17698f8afcf1456a6f6..de5ac44e73f4a3e332439c898fe509086f54fdf0 100644
--- a/innobase/include/db0err.h
+++ b/innobase/include/db0err.h
@@ -53,7 +53,11 @@ Created 5/24/1996 Heikki Tuuri
 					name already exists */
 #define DB_TABLESPACE_DELETED	44	/* tablespace does not exist or is
 					being dropped right now */
-					
+#define	DB_LOCK_TABLE_FULL	45	/* lock structs have exhausted the
+					buffer pool (for big transactions,
+					InnoDB stores the lock structs in the
+					buffer pool) */
+
 /* The following are partial failure codes */
 #define DB_FAIL 		1000
 #define DB_OVERFLOW 		1001
diff --git a/innobase/include/row0sel.h b/innobase/include/row0sel.h
index bb6fb70ca86602667b9347ea529792ad2137e3f1..8d5187bfc1c3d9e303f1840593b5a205e19f0422 100644
--- a/innobase/include/row0sel.h
+++ b/innobase/include/row0sel.h
@@ -120,6 +120,7 @@ row_search_for_mysql(
 					/* out: DB_SUCCESS,
 					DB_RECORD_NOT_FOUND, 
 					DB_END_OF_INDEX, DB_DEADLOCK,
+					DB_LOCK_TABLE_FULL,
 					or DB_TOO_BIG_RECORD */
 	byte*		buf,		/* in/out: buffer for the fetched
 					row in the MySQL format */
diff --git a/innobase/include/ut0mem.h b/innobase/include/ut0mem.h
index 73ecb25101a3423a1cfcf65fabf39b2bf7b0ec56..74357f6bf1301a52ab31e43f851898cbb2a74c76 100644
--- a/innobase/include/ut0mem.h
+++ b/innobase/include/ut0mem.h
@@ -38,8 +38,10 @@ ut_malloc_low(
 /*==========*/
 	                     /* out, own: allocated memory */
         ulint   n,           /* in: number of bytes to allocate */
-	ibool   set_to_zero); /* in: TRUE if allocated memory should be set
+	ibool   set_to_zero, /* in: TRUE if allocated memory should be set
 			     to zero if UNIV_SET_MEM_TO_ZERO is defined */
+	ibool	assert_on_error); /* in: if TRUE, we crash mysqld if the memory
+				cannot be allocated */
 /**************************************************************************
 Allocates memory. Sets it also to zero if UNIV_SET_MEM_TO_ZERO is
 defined. */
diff --git a/innobase/mem/mem0pool.c b/innobase/mem/mem0pool.c
index 023369e8ec51a4c3b6abbe7c39864bc080111f12..cb891a030924b3072d1a4b3bb8541d5dede37ab7 100644
--- a/innobase/mem/mem0pool.c
+++ b/innobase/mem/mem0pool.c
@@ -199,7 +199,7 @@ mem_pool_create(
 	but only when allocated at a higher level in mem0mem.c.
 	This is to avoid masking useful Purify warnings. */
 
-	pool->buf = ut_malloc_low(size, FALSE);
+	pool->buf = ut_malloc_low(size, FALSE, TRUE);
 	pool->size = size;
 
 	mutex_create(&(pool->mutex));
diff --git a/innobase/row/row0mysql.c b/innobase/row/row0mysql.c
index 6ac8c943dc678ed2bffdaa2081a33b829df786fe..0de4b189493e7276a1ba4e57da1d0c42a3f09e25 100644
--- a/innobase/row/row0mysql.c
+++ b/innobase/row/row0mysql.c
@@ -308,7 +308,8 @@ row_mysql_handle_errors(
 
 		return(TRUE);
 
-	} else if (err == DB_DEADLOCK || err == DB_LOCK_WAIT_TIMEOUT) {
+	} else if (err == DB_DEADLOCK || err == DB_LOCK_WAIT_TIMEOUT
+		   || err == DB_LOCK_TABLE_FULL) {
 		/* Roll back the whole transaction; this resolution was added
 		to version 3.23.43 */
 
diff --git a/innobase/row/row0sel.c b/innobase/row/row0sel.c
index 26d26ca323c7cec3a27591fc8651ed5f17b3f882..27470df81c5e073d989d792d95e9fccea3f6d7ae 100644
--- a/innobase/row/row0sel.c
+++ b/innobase/row/row0sel.c
@@ -730,8 +730,18 @@ sel_set_rec_lock(
 	ulint		type, 	/* in: LOCK_ORDINARY, LOCK_GAP, or LOC_REC_NOT_GAP */
 	que_thr_t*	thr)	/* in: query thread */	
 {
+	trx_t*	trx;
 	ulint	err;
 
+	trx = thr_get_trx(thr);	
+
+	if (UT_LIST_GET_LEN(trx->trx_locks) > 10000) {
+		if (buf_LRU_buf_pool_running_out()) {
+			
+			return(DB_LOCK_TABLE_FULL);
+		}
+	}
+
 	if (index->type & DICT_CLUSTERED) {
 		err = lock_clust_rec_read_check_and_lock(0, rec, index, mode,
 							type, thr);
@@ -2790,6 +2800,7 @@ row_search_for_mysql(
 					/* out: DB_SUCCESS,
 					DB_RECORD_NOT_FOUND, 
 					DB_END_OF_INDEX, DB_DEADLOCK,
+					DB_LOCK_TABLE_FULL,
 					or DB_TOO_BIG_RECORD */
 	byte*		buf,		/* in/out: buffer for the fetched
 					row in the MySQL format */
diff --git a/innobase/srv/srv0start.c b/innobase/srv/srv0start.c
index 9709f5235de071148fcfcad5c6536540c0c27938..69341a1d7d182ec158305b902ca6176f41e96f32 100644
--- a/innobase/srv/srv0start.c
+++ b/innobase/srv/srv0start.c
@@ -1172,6 +1172,9 @@ NetWare. */
 	}
 
 	if (ret == NULL) {
+		fprintf(stderr,
+"InnoDB: Fatal error: cannot allocate the memory for the buffer pool\n");
+
 		return(DB_ERROR);
 	}
 
diff --git a/innobase/ut/ut0mem.c b/innobase/ut/ut0mem.c
index a6002d7fd83ac61f6b57b7943d2ad00d943dd97e..6ed61b0b5deb1f5395d66e22b9ddd7e09edab8e8 100644
--- a/innobase/ut/ut0mem.c
+++ b/innobase/ut/ut0mem.c
@@ -61,8 +61,10 @@ ut_malloc_low(
 /*==========*/
 	                     /* out, own: allocated memory */
         ulint   n,           /* in: number of bytes to allocate */
-	ibool   set_to_zero) /* in: TRUE if allocated memory should be set
+	ibool   set_to_zero, /* in: TRUE if allocated memory should be set
 			     to zero if UNIV_SET_MEM_TO_ZERO is defined */
+	ibool	assert_on_error) /* in: if TRUE, we crash mysqld if the memory
+				cannot be allocated */
 {
 	void*	ret;
 
@@ -86,9 +88,7 @@ ut_malloc_low(
 		"InnoDB: Check if you should increase the swap file or\n"
 		"InnoDB: ulimits of your operating system.\n"
 		"InnoDB: On FreeBSD check you have compiled the OS with\n"
-		"InnoDB: a big enough maximum process size.\n"
-		"InnoDB: We now intentionally generate a seg fault so that\n"
-		"InnoDB: on Linux we get a stack trace.\n",
+		"InnoDB: a big enough maximum process size.\n",
 		                  (ulong) n, (ulong) ut_total_allocated_memory,
 #ifdef __WIN__
 			(ulong) GetLastError()
@@ -110,7 +110,15 @@ ut_malloc_low(
 		/* Intentional segfault on NetWare causes an abend. Avoid this 
 		by graceful exit handling in ut_a(). */
 #if (!defined __NETWARE__) 
-		if (*ut_mem_null_ptr) ut_mem_null_ptr = 0;
+		if (assert_on_error) {
+			fprintf(stderr,
+		"InnoDB: We now intentionally generate a seg fault so that\n"
+		"InnoDB: on Linux we get a stack trace.\n");
+
+			if (*ut_mem_null_ptr) ut_mem_null_ptr = 0;
+		} else {
+			return(NULL);
+		}
 #else
 		ut_a(0);
 #endif
@@ -144,7 +152,7 @@ ut_malloc(
 	                /* out, own: allocated memory */
         ulint   n)      /* in: number of bytes to allocate */
 {
-        return(ut_malloc_low(n, TRUE));
+        return(ut_malloc_low(n, TRUE, TRUE));
 }
 
 /**************************************************************************
diff --git a/mysql-test/r/rpl_start_stop_slave.result b/mysql-test/r/rpl_start_stop_slave.result
new file mode 100644
index 0000000000000000000000000000000000000000..1b4d87124d18b28ed5639d18c8920b8e04c9cea6
--- /dev/null
+++ b/mysql-test/r/rpl_start_stop_slave.result
@@ -0,0 +1,12 @@
+slave stop;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+reset master;
+reset slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+slave start;
+stop slave;
+create table t1(n int);
+start slave;
+stop slave io_thread;
+start slave io_thread;
+drop table t1;
diff --git a/mysql-test/t/rpl_start_stop_slave.test b/mysql-test/t/rpl_start_stop_slave.test
new file mode 100644
index 0000000000000000000000000000000000000000..903ff204194b43ee1670bc6194434392b2b7f687
--- /dev/null
+++ b/mysql-test/t/rpl_start_stop_slave.test
@@ -0,0 +1,34 @@
+source include/master-slave.inc;
+
+#
+# Bug#6148 ()
+#
+connection slave;
+stop slave;
+
+# Let the master do lots of insertions
+connection master;
+create table t1(n int);
+let $1=5000;
+disable_query_log;
+while ($1)
+{
+ eval insert into t1 values($1);
+ dec $1;
+}
+enable_query_log;
+save_master_pos;
+
+connection slave;
+start slave;
+sleep 1;
+stop slave io_thread;
+start slave io_thread;
+sync_with_master;
+
+connection master;
+drop table t1;
+save_master_pos;
+
+connection slave;
+sync_with_master;
diff --git a/ndb/src/mgmapi/mgmapi.cpp b/ndb/src/mgmapi/mgmapi.cpp
index ca3a2a2186da324567f26dfa99b29c125cfab3ea..831d14eac525625b43f20ce2d08753a55032f054 100644
--- a/ndb/src/mgmapi/mgmapi.cpp
+++ b/ndb/src/mgmapi/mgmapi.cpp
@@ -379,18 +379,30 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
       setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
 	       "Unable to connect with connect string: %s",
 	       cfg.makeConnectString(buf,sizeof(buf)));
+      if (verbose == -2)
+	ndbout << ", failed." << endl;
       return -1;
     }
     if (verbose == -1) {
-      ndbout << "retrying every " << retry_delay_in_seconds << " seconds:";
+      ndbout << "Retrying every " << retry_delay_in_seconds << " seconds";
+      if (no_retries > 0)
+	ndbout << ". Attempts left:";
+      else
+	ndbout << ", until connected.";;	
+      ndbout << flush;
       verbose= -2;
     }
-    NdbSleep_SecSleep(retry_delay_in_seconds);
-    if (verbose == -2) {
-      ndbout << " " << no_retries;
+    if (no_retries > 0) {
+      if (verbose == -2) {
+	ndbout << " " << no_retries;
+	ndbout << flush;
+      }
+      no_retries--;
     }
-    no_retries--;
+    NdbSleep_SecSleep(retry_delay_in_seconds);
   }
+  if (verbose == -2)
+    ndbout << endl;
 
   handle->cfg_i = i;
 
diff --git a/ndb/src/mgmclient/CommandInterpreter.cpp b/ndb/src/mgmclient/CommandInterpreter.cpp
index 54beaa49d3fed0185338adbb019ec385a4609a02..bfe8b6786b43f992535a830a8e1a476da58e5523 100644
--- a/ndb/src/mgmclient/CommandInterpreter.cpp
+++ b/ndb/src/mgmclient/CommandInterpreter.cpp
@@ -44,7 +44,7 @@ class CommandInterpreter {
    *   Constructor
    *   @param mgmtSrvr: Management server to use when executing commands
    */
-  CommandInterpreter(const char *);
+  CommandInterpreter(const char *, int verbose);
   ~CommandInterpreter();
   
   /**
@@ -94,6 +94,7 @@ class CommandInterpreter {
    */
   void executeHelp(char* parameters);
   void executeShow(char* parameters);
+  void executeConnect(char* parameters);
   void executePurge(char* parameters);
   void executeShutdown(char* parameters);
   void executeRun(char* parameters);
@@ -153,6 +154,7 @@ class CommandInterpreter {
 
   NdbMgmHandle m_mgmsrv;
   bool connected;
+  int m_verbose;
   int try_reconnect;
 #ifdef HAVE_GLOBAL_REPLICATION  
   NdbRepHandle m_repserver;
@@ -169,9 +171,9 @@ class CommandInterpreter {
 #include "ndb_mgmclient.hpp"
 #include "ndb_mgmclient.h"
 
-Ndb_mgmclient::Ndb_mgmclient(const char *host)
+Ndb_mgmclient::Ndb_mgmclient(const char *host,int verbose)
 {
-  m_cmd= new CommandInterpreter(host);
+  m_cmd= new CommandInterpreter(host,verbose);
 }
 Ndb_mgmclient::~Ndb_mgmclient()
 {
@@ -275,6 +277,7 @@ static const char* helpText =
 "REP CONNECT <host:port>                Connect to REP server on host:port\n"
 #endif
 "PURGE STALE SESSIONS                   Reset reserved nodeid's in the mgmt server\n"
+"CONNECT                                Connect to management server (reconnect if already connected)\n"
 "QUIT                                   Quit management client\n"
 ;
 
@@ -373,7 +376,8 @@ convert(const char* s, int& val) {
 /*
  * Constructor
  */
-CommandInterpreter::CommandInterpreter(const char *_host) 
+CommandInterpreter::CommandInterpreter(const char *_host,int verbose) 
+  : m_verbose(verbose)
 {
   m_mgmsrv = ndb_mgm_create_handle();
   if(m_mgmsrv == NULL) {
@@ -437,7 +441,15 @@ CommandInterpreter::connect()
 {
   if(!connected) {
     if(!ndb_mgm_connect(m_mgmsrv, try_reconnect-1, 5, 1))
+    {
       connected = true;
+      if (m_verbose)
+      {
+	printf("Connected to Management Server at: %s:%d\n",
+	       ndb_mgm_get_connected_host(m_mgmsrv),
+	       ndb_mgm_get_connected_port(m_mgmsrv));
+      }
+    }
   }
   return connected;
 }
@@ -445,7 +457,7 @@ CommandInterpreter::connect()
 bool 
 CommandInterpreter::disconnect() 
 {
-  if (ndb_mgm_disconnect(m_mgmsrv) == -1) {
+  if (connected && (ndb_mgm_disconnect(m_mgmsrv) == -1)) {
     ndbout_c("Could not disconnect from management server");
     printError();
   }
@@ -459,18 +471,21 @@ CommandInterpreter::disconnect()
 int 
 CommandInterpreter::execute(const char *_line, int _try_reconnect) 
 {
+  DBUG_ENTER("CommandInterpreter::execute");
+  DBUG_PRINT("info",("line=\"%s\"",_line));
+
   if (_try_reconnect >= 0)
     try_reconnect=_try_reconnect;
   char * line;
   if(_line == NULL) {
     //   ndbout << endl;
-    return false;
+    DBUG_RETURN(false);
   }
   line = my_strdup(_line,MYF(MY_WME));
   My_auto_ptr<char> ptr(line);
   
   if (emptyString(line)) {
-    return true;
+    DBUG_RETURN(true);
   }
   
   for (unsigned int i = 0; i < strlen(line); ++i) {
@@ -484,41 +499,49 @@ CommandInterpreter::execute(const char *_line, int _try_reconnect)
   if (strcmp(firstToken, "HELP") == 0 ||
       strcmp(firstToken, "?") == 0) {
     executeHelp(allAfterFirstToken);
-    return true;
+    DBUG_RETURN(true);
+  }
+  else if (strcmp(firstToken, "CONNECT") == 0) {
+    executeConnect(allAfterFirstToken);
+    DBUG_RETURN(true);
   }
-  else if (strcmp(firstToken, "SHOW") == 0) {
+
+  if (!connect())
+    DBUG_RETURN(true);
+
+  if (strcmp(firstToken, "SHOW") == 0) {
     executeShow(allAfterFirstToken);
-    return true;
+    DBUG_RETURN(true);
   }
   else if (strcmp(firstToken, "SHUTDOWN") == 0) {
     executeShutdown(allAfterFirstToken);
-    return true;
+    DBUG_RETURN(true);
   }
   else if (strcmp(firstToken, "CLUSTERLOG") == 0){
     executeClusterLog(allAfterFirstToken);
-    return true;
+    DBUG_RETURN(true);
   }
   else if(strcmp(firstToken, "START") == 0 &&
 	  allAfterFirstToken != NULL &&
 	  strncmp(allAfterFirstToken, "BACKUP", sizeof("BACKUP") - 1) == 0){
     executeStartBackup(allAfterFirstToken);
-    return true;
+    DBUG_RETURN(true);
   }
   else if(strcmp(firstToken, "ABORT") == 0 &&
 	  allAfterFirstToken != NULL &&
 	  strncmp(allAfterFirstToken, "BACKUP", sizeof("BACKUP") - 1) == 0){
     executeAbortBackup(allAfterFirstToken);
-    return true;
+    DBUG_RETURN(true);
   }
   else if (strcmp(firstToken, "PURGE") == 0) {
     executePurge(allAfterFirstToken);
-    return true;
+    DBUG_RETURN(true);
   } 
 #ifdef HAVE_GLOBAL_REPLICATION
   else if(strcmp(firstToken, "REPLICATION") == 0 ||
 	  strcmp(firstToken, "REP") == 0) {
     executeRep(allAfterFirstToken);
-    return true;
+    DBUG_RETURN(true);
   }
 #endif // HAVE_GLOBAL_REPLICATION
   else if(strcmp(firstToken, "ENTER") == 0 &&
@@ -526,14 +549,14 @@ CommandInterpreter::execute(const char *_line, int _try_reconnect)
 	  strncmp(allAfterFirstToken, "SINGLE USER MODE ", 
 		  sizeof("SINGLE USER MODE") - 1) == 0){
     executeEnterSingleUser(allAfterFirstToken);
-    return true;
+    DBUG_RETURN(true);
   }
   else if(strcmp(firstToken, "EXIT") == 0 &&
 	  allAfterFirstToken != NULL &&
 	  strncmp(allAfterFirstToken, "SINGLE USER MODE ", 
 		  sizeof("SINGLE USER MODE") - 1) == 0){
     executeExitSingleUser(allAfterFirstToken);
-    return true;
+    DBUG_RETURN(true);
   }
   else if (strcmp(firstToken, "ALL") == 0) {
     analyseAfterFirstToken(-1, allAfterFirstToken);
@@ -542,7 +565,7 @@ CommandInterpreter::execute(const char *_line, int _try_reconnect)
 	  strcmp(firstToken, "EXIT") == 0 ||
 	  strcmp(firstToken, "BYE") == 0) && 
 	  allAfterFirstToken == NULL){
-    return false;
+    DBUG_RETURN(false);
   } else {
     /**
      * First token should be a digit, node ID
@@ -552,18 +575,18 @@ CommandInterpreter::execute(const char *_line, int _try_reconnect)
     if (! convert(firstToken, nodeId)) {
       ndbout << "Invalid command: " << line << endl;
       ndbout << "Type HELP for help." << endl << endl;
-      return true;
+      DBUG_RETURN(true);
     }
 
     if (nodeId < 0) {
       ndbout << "Invalid node ID: " << firstToken << "." << endl;
-      return true;
+      DBUG_RETURN(true);
     }
     
     analyseAfterFirstToken(nodeId, allAfterFirstToken);
     
   }
-  return true;
+  DBUG_RETURN(true);
 }
 
 
@@ -692,7 +715,6 @@ CommandInterpreter::executeForAll(const char * cmd, ExecuteFunction fun,
     ndbout_c("Trying to start all nodes of system.");
     ndbout_c("Use ALL STATUS to see the system start-up phases.");
   } else {
-    connect();
     struct ndb_mgm_cluster_state *cl= ndb_mgm_get_status(m_mgmsrv);
     if(cl == 0){
       ndbout_c("Unable get status from management server");
@@ -826,8 +848,6 @@ CommandInterpreter::executeHelp(char* parameters)
 void
 CommandInterpreter::executeShutdown(char* parameters) 
 { 
-  connect();
-
   ndb_mgm_cluster_state *state = ndb_mgm_get_status(m_mgmsrv);
   if(state == NULL) {
     ndbout_c("Could not get status");
@@ -979,7 +999,6 @@ CommandInterpreter::executePurge(char* parameters)
 
   int i;
   char *str;
-  connect();
   
   if (ndb_mgm_purge_stale_sessions(m_mgmsrv, &str)) {
     ndbout_c("Command failed");
@@ -999,7 +1018,6 @@ void
 CommandInterpreter::executeShow(char* parameters) 
 { 
   int i;
-  connect();
   if (emptyString(parameters)) {
     ndbout << "Cluster Configuration" << endl
 	   << "---------------------" << endl;
@@ -1087,6 +1105,12 @@ CommandInterpreter::executeShow(char* parameters)
   }
 }
 
+void
+CommandInterpreter::executeConnect(char* parameters) 
+{
+  disconnect();
+  connect();
+}
 
 //*****************************************************************************
 //*****************************************************************************
@@ -1094,7 +1118,6 @@ void
 CommandInterpreter::executeClusterLog(char* parameters) 
 {
   int i;
-  connect();
   if (parameters != 0 && strlen(parameters) != 0) {
     enum ndb_mgm_clusterlog_level severity = NDB_MGM_CLUSTERLOG_ALL;
     int isOk = true;
@@ -1240,7 +1263,6 @@ CommandInterpreter::executeClusterLog(char* parameters)
 void
 CommandInterpreter::executeStop(int processId, const char *, bool all) 
 {
-  connect();
   int result = 0;
   if(all) {
     result = ndb_mgm_stop(m_mgmsrv, 0, 0);
@@ -1262,7 +1284,6 @@ CommandInterpreter::executeStop(int processId, const char *, bool all)
 void
 CommandInterpreter::executeEnterSingleUser(char* parameters) 
 {
-  connect();
   strtok(parameters, " ");
   struct ndb_mgm_reply reply;
   char* id = strtok(NULL, " ");
@@ -1289,7 +1310,6 @@ CommandInterpreter::executeEnterSingleUser(char* parameters)
 void 
 CommandInterpreter::executeExitSingleUser(char* parameters) 
 {
-  connect();
   int result = ndb_mgm_exit_single_user(m_mgmsrv, 0);
   if (result != 0) {
     ndbout_c("Exiting single user mode failed.");
@@ -1304,7 +1324,6 @@ void
 CommandInterpreter::executeStart(int processId, const char* parameters,
 				 bool all) 
 {
-  connect();
   int result;
   if(all) {
     result = ndb_mgm_start(m_mgmsrv, 0, 0);
@@ -1328,7 +1347,6 @@ void
 CommandInterpreter::executeRestart(int processId, const char* parameters,
 				   bool all) 
 {
-  connect();
   int result;
   int nostart = 0;
   int initialstart = 0;
@@ -1378,7 +1396,6 @@ CommandInterpreter::executeDumpState(int processId, const char* parameters,
     ndbout << "Expected argument" << endl;
     return;
   }
-  connect();
 
   Uint32 no = 0;
   int pars[25];
@@ -1418,7 +1435,6 @@ CommandInterpreter::executeStatus(int processId,
     return;
   }
 
-  connect();
   ndb_mgm_node_status status;
   Uint32 startPhase, version;
   bool system;
@@ -1469,7 +1485,6 @@ void
 CommandInterpreter::executeLogLevel(int processId, const char* parameters, 
 				    bool all) 
 {
-  connect();
   (void) all;
   
   BaseString tmp(parameters);
@@ -1525,7 +1540,6 @@ void CommandInterpreter::executeError(int processId,
     return;
   }
 
-  connect();
   // Copy parameters since strtok will modify it
   char* newpar = my_strdup(parameters,MYF(MY_WME)); 
   My_auto_ptr<char> ap1(newpar);
@@ -1589,7 +1603,6 @@ void
 CommandInterpreter::executeLog(int processId,
 			       const char* parameters, bool all) 
 {
-  connect();
   struct ndb_mgm_reply reply;
   Vector<const char *> blocks;
   if (! parseBlockSpecification(parameters, blocks)) {
@@ -1657,7 +1670,6 @@ CommandInterpreter::executeTestOn(int processId,
     ndbout << "No parameters expected to this command." << endl;
     return;
   }
-  connect();
   struct ndb_mgm_reply reply;
   int result = ndb_mgm_start_signallog(m_mgmsrv, processId, &reply);
   if (result != 0) {
@@ -1676,7 +1688,6 @@ CommandInterpreter::executeTestOff(int processId,
     ndbout << "No parameters expected to this command." << endl;
     return;
   }
-  connect();
   struct ndb_mgm_reply reply;
   int result = ndb_mgm_stop_signallog(m_mgmsrv, processId, &reply);
   if (result != 0) {
@@ -1798,8 +1809,6 @@ CommandInterpreter::executeEventReporting(int processId,
     ndbout << "Expected argument" << endl;
     return;
   }
-  connect();
-
   BaseString tmp(parameters);
   Vector<BaseString> spec;
   tmp.split(spec, "=");
@@ -1850,7 +1859,6 @@ CommandInterpreter::executeEventReporting(int processId,
 void
 CommandInterpreter::executeStartBackup(char* /*parameters*/) 
 {
-  connect();
   struct ndb_mgm_reply reply;
   unsigned int backupId;
 
@@ -1897,8 +1905,6 @@ CommandInterpreter::executeStartBackup(char* /*parameters*/)
 void
 CommandInterpreter::executeAbortBackup(char* parameters) 
 {
-  connect();
-
   strtok(parameters, " ");
   struct ndb_mgm_reply reply;
   char* id = strtok(NULL, "\0");
@@ -1952,7 +1958,6 @@ CommandInterpreter::executeRep(char* parameters)
     return;
   }
 
-  connect();
   char * line = my_strdup(parameters,MYF(MY_WME));
   My_auto_ptr<char> ap1((char*)line);
   char * firstToken = strtok(line, " ");
diff --git a/ndb/src/mgmclient/main.cpp b/ndb/src/mgmclient/main.cpp
index f32cc683296813adb1baf4d932bfa9996df32274..08d5d60cfab546c2d999e80d8f86e94864144d65 100644
--- a/ndb/src/mgmclient/main.cpp
+++ b/ndb/src/mgmclient/main.cpp
@@ -56,17 +56,18 @@ handler(int sig){
   }
 }
 
-
+static const char default_prompt[]= "ndb_mgm> ";
 static unsigned _try_reconnect;
 static char *opt_connect_str= 0;
+static const char *prompt= default_prompt;
 
 static struct my_option my_long_options[] =
 {
   NDB_STD_OPTS("ndb_mgm"),
   { "try-reconnect", 't',
-    "Specify number of retries for connecting to ndb_mgmd, default infinite", 
+    "Specify number of tries for connecting to ndb_mgmd (0 = infinite)", 
     (gptr*) &_try_reconnect, (gptr*) &_try_reconnect, 0,
-    GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
+    GET_UINT, REQUIRED_ARG, 3, 0, 0, 0, 0, 0 },
   { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
 };
 static void short_usage_sub(void)
@@ -116,13 +117,13 @@ read_and_execute(int _try_reconnect)
   }
 #ifdef HAVE_READLINE
   /* Get a line from the user. */
-  line_read = readline ("ndb_mgm> ");    
+  line_read = readline (prompt);    
   /* If the line has any text in it, save it on the history. */
   if (line_read && *line_read)
     add_history (line_read);
 #else
   static char linebuffer[254];
-  fputs("ndb_mgm> ", stdout);
+  fputs(prompt, stdout);
   linebuffer[sizeof(linebuffer)-1]=0;
   line_read = fgets(linebuffer, sizeof(linebuffer)-1, stdin);
   if (line_read == linebuffer) {
@@ -155,12 +156,16 @@ int main(int argc, char** argv){
     opt_connect_str= buf;
   }
 
+  if (!isatty(0))
+  {
+    prompt= 0;
+  }
+
   ndbout << "-- NDB Cluster -- Management Client --" << endl;
-  printf("Connecting to Management Server: %s\n", opt_connect_str ? opt_connect_str : "default");
 
   signal(SIGPIPE, handler);
 
-  com = new Ndb_mgmclient(opt_connect_str);
+  com = new Ndb_mgmclient(opt_connect_str,1);
   while(read_and_execute(_try_reconnect));
   delete com;
   
diff --git a/ndb/src/mgmclient/ndb_mgmclient.hpp b/ndb/src/mgmclient/ndb_mgmclient.hpp
index f6bcebc38965db09aef481cf339165eb691f8dd5..ea592dfdf4ed0f02d5d1744ecbdfb4ad0cbeedd2 100644
--- a/ndb/src/mgmclient/ndb_mgmclient.hpp
+++ b/ndb/src/mgmclient/ndb_mgmclient.hpp
@@ -21,7 +21,7 @@ class CommandInterpreter;
 class Ndb_mgmclient
 {
 public:
-  Ndb_mgmclient(const char*);
+  Ndb_mgmclient(const char*,int verbose=0);
   ~Ndb_mgmclient();
   int execute(const char *_line, int _try_reconnect=-1);
   int execute(int argc, char** argv, int _try_reconnect=-1);
diff --git a/ndb/src/mgmsrv/MgmtSrvr.cpp b/ndb/src/mgmsrv/MgmtSrvr.cpp
index 81b5eb9dfb3111c40e53dc005276bba07acd7eb5..986da71a8e81c25eff83946d80f443232acf8c87 100644
--- a/ndb/src/mgmsrv/MgmtSrvr.cpp
+++ b/ndb/src/mgmsrv/MgmtSrvr.cpp
@@ -432,15 +432,20 @@ MgmtSrvr::MgmtSrvr(SocketServer *socket_server,
   theFacade = 0;
 
   m_newConfig = NULL;
-  m_configFilename.assign(config_filename);
+  if (config_filename)
+    m_configFilename.assign(config_filename);
+  else
+    m_configFilename.assign("config.ini");
 
   m_nextConfigGenerationNumber = 0;
 
   m_config_retriever= new ConfigRetriever(connect_string,
 					  NDB_VERSION, NDB_MGM_NODE_TYPE_MGM);
-
+  // if connect_string explicitly given or
+  // no config filename is given then
   // first try to allocate nodeid from another management server
-  if(m_config_retriever->do_connect(0,0,0) == 0)
+  if ((connect_string || config_filename == NULL) &&
+      (m_config_retriever->do_connect(0,0,0) == 0))
   {
     int tmp_nodeid= 0;
     tmp_nodeid= m_config_retriever->allocNodeId(0 /*retry*/,0 /*delay*/);
diff --git a/ndb/src/mgmsrv/main.cpp b/ndb/src/mgmsrv/main.cpp
index 84ff98626e51cd125d36b4272ff228bf5b2f3553..64a2cb35c1f4edace7214cbdc0c9f561ea36ec38 100644
--- a/ndb/src/mgmsrv/main.cpp
+++ b/ndb/src/mgmsrv/main.cpp
@@ -176,7 +176,6 @@ int main(int argc, char** argv)
 #endif
 
   global_mgmt_server_check = 1;
-  glob.config_filename= "config.ini";
 
   const char *load_default_groups[]= { "mysql_cluster","ndb_mgmd",0 };
   load_defaults("my",load_default_groups,&argc,&argv);
diff --git a/ndb/src/ndbapi/ndberror.c b/ndb/src/ndbapi/ndberror.c
index bc49358cc63fb3bdfa2ab6a6b141cb7d902eb3fb..6744f4c1640326f8cd833543959c14a155aaa81f 100644
--- a/ndb/src/ndbapi/ndberror.c
+++ b/ndb/src/ndbapi/ndberror.c
@@ -241,11 +241,12 @@ ErrorBundle ErrorCodes[] = {
   { 877,  AE, "877" },
   { 878,  AE, "878" },
   { 879,  AE, "879" },
+  { 880,  AE, "Tried to read too much - too many getValue calls" },
   { 884,  AE, "Stack overflow in interpreter" },
   { 885,  AE, "Stack underflow in interpreter" },
   { 886,  AE, "More than 65535 instructions executed in interpreter" },
+  { 897,  AE, "Update attempt of primary key via ndbcluster internal api (if this occurs via the MySQL server it is a bug, please report)" },
   { 4256, AE, "Must call Ndb::init() before this function" },
-  { 880,  AE, "Tried to read too much - too many getValue calls" },
   { 4257, AE, "Tried to read too much - too many getValue calls" },
 
   /** 
diff --git a/sql/ha_innodb.cc b/sql/ha_innodb.cc
index 091b2f46e06a071bb63af2377478ef5e22374ca2..c7384857d79c7afdd8b3427e56e6f4051d175caa 100644
--- a/sql/ha_innodb.cc
+++ b/sql/ha_innodb.cc
@@ -410,6 +410,9 @@ convert_error_code_to_mysql(
   	} else if (error == (int) DB_NO_SAVEPOINT) {
 
     		return(HA_ERR_NO_SAVEPOINT);
+  	} else if (error == (int) DB_LOCK_TABLE_FULL) {
+
+    		return(HA_ERR_LOCK_TABLE_FULL);
     	} else {
     		return(-1);			// Unknown error
     	}
diff --git a/sql/repl_failsafe.cc b/sql/repl_failsafe.cc
index 5f67143065b12a730c675d019711518bcfd18e58..f759be59ffb8aab6d14dea454e5e68992cbb97ce 100644
--- a/sql/repl_failsafe.cc
+++ b/sql/repl_failsafe.cc
@@ -917,8 +917,8 @@ bool load_master_data(THD* thd)
         */
         int error;
 
-        if (init_master_info(active_mi, master_info_file, relay_log_info_file,
-			     0))
+        if (init_master_info(active_mi, master_info_file, relay_log_info_file, 
+			     0, (SLAVE_IO | SLAVE_SQL)))
           my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
 	strmake(active_mi->master_log_name, row[0],
 		sizeof(active_mi->master_log_name));
diff --git a/sql/slave.cc b/sql/slave.cc
index 04ab4830312a36d35ca6cce972a2764ffd21dadc..dbe58275b9c088d5f09180fcb305d08234a1f8bc 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -160,7 +160,7 @@ int init_slave()
   }
     
   if (init_master_info(active_mi,master_info_file,relay_log_info_file,
-		       !master_host))
+		       !master_host, (SLAVE_IO | SLAVE_SQL)))
   {
     sql_print_error("Failed to initialize the master info structure");
     goto err;
@@ -1981,7 +1981,8 @@ void clear_until_condition(RELAY_LOG_INFO* rli)
 
 int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
 		     const char* slave_info_fname,
-		     bool abort_if_no_master_info_file)
+		     bool abort_if_no_master_info_file,
+		     int thread_mask)
 {
   int fd,error;
   char fname[FN_REFLEN+128];
@@ -1995,8 +1996,15 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
       last time. If this case pos_in_file would be set and we would
       get a crash when trying to read the signature for the binary
       relay log.
+      
+      We only rewind the read position if we are starting the SQL
+      thread. The handle_slave_sql thread assumes that the read
+      position is at the beginning of the file, and will read the
+      "signature" and then fast-forward to the last position read.
     */
-    my_b_seek(mi->rli.cur_log, (my_off_t) 0);
+    if (thread_mask & SLAVE_SQL) {
+      my_b_seek(mi->rli.cur_log, (my_off_t) 0);
+    }
     DBUG_RETURN(0);
   }
 
diff --git a/sql/slave.h b/sql/slave.h
index e73c81a1e6f386d00bc23df4f2ea9d9ae9b6d152..1abe166c944d5e9cfd0adc8ae198149dff6d1be4 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -520,7 +520,8 @@ void clear_until_condition(RELAY_LOG_INFO* rli);
 void clear_slave_error_timestamp(RELAY_LOG_INFO* rli);
 int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
 		     const char* slave_info_fname,
-		     bool abort_if_no_master_info_file);
+		     bool abort_if_no_master_info_file,
+		     int thread_mask);
 void end_master_info(MASTER_INFO* mi);
 int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname);
 void end_relay_log_info(RELAY_LOG_INFO* rli);
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index d2e3e72618da845e800a909f06234198689a7685..0d1a7f3890d2d94895231731f3352dfc1c53b437 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -779,7 +779,8 @@ int start_slave(THD* thd , MASTER_INFO* mi,  bool net_report)
     thread_mask&= thd->lex->slave_thd_opt;
   if (thread_mask) //some threads are stopped, start them
   {
-    if (init_master_info(mi,master_info_file,relay_log_info_file, 0))
+    if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
+			 thread_mask))
       slave_errno=ER_MASTER_INFO;
     else if (server_id_supplied && *mi->host)
     {
@@ -1074,7 +1075,8 @@ bool change_master(THD* thd, MASTER_INFO* mi)
   thd->proc_info = "Changing master";
   LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
   // TODO: see if needs re-write
-  if (init_master_info(mi, master_info_file, relay_log_info_file, 0))
+  if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
+		       thread_mask))
   {
     my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
     unlock_slave_threads(mi);