From 3742297b1b41525f36f7e93cc76cc2aa28bcaa6a Mon Sep 17 00:00:00 2001
From: unknown <mikron@c-4d08e253.1238-1-64736c10.cust.bredbandsbolaget.se>
Date: Wed, 1 Feb 2006 10:06:07 +0100
Subject: [PATCH] WL 2826: First step in error handling of ALTER TABLE for
 partitioning

BUILD/SETUP.sh:
  Add possibility for BUILD scripts to add error inject flag
BUILD/compile-pentium-debug-max:
  Add error inject flag to this script
configure.in:
  Add handling of --with-error-inject in configure script
sql/ha_ndbcluster.cc:
  Add possibility to rename handler file
sql/ha_ndbcluster.h:
  Add possibility to rename handler file
sql/ha_partition.cc:
  Add possibility to rename handler file
sql/ha_partition.h:
  Add possibility to rename handler file
sql/handler.h:
  Add possibility to rename handler file
sql/mysql_priv.h:
  Add error inject macros
sql/mysqld.cc:
  Add error inject system variables
sql/set_var.cc:
  Add error inject system variables
sql/sql_class.h:
  Add error inject system variables
sql/sql_table.cc:
  Start modifying code for introducing table log, Step 1
sql/unireg.cc:
  Add rename flag to handler file call
sql/sql_partition.cc:
  Changes to ADD/DROP/CHANGE partitions
---
 BUILD/SETUP.sh                  |   1 +
 BUILD/compile-pentium-debug-max |   2 +-
 configure.in                    |  13 +++
 sql/ha_ndbcluster.cc            |   8 +-
 sql/ha_ndbcluster.h             |   3 +-
 sql/ha_partition.cc             |  27 +++++-
 sql/ha_partition.h              |   3 +-
 sql/handler.h                   |   6 +-
 sql/mysql_priv.h                |  30 +++++-
 sql/mysqld.cc                   |   6 ++
 sql/set_var.cc                  |  10 ++
 sql/sql_class.h                 |   4 +
 sql/sql_partition.cc            | 163 ++++++++++++++++++++-----------
 sql/sql_table.cc                | 167 ++++++++++++++++++--------------
 sql/unireg.cc                   |   2 +-
 15 files changed, 307 insertions(+), 138 deletions(-)

diff --git a/BUILD/SETUP.sh b/BUILD/SETUP.sh
index eece41d72e6..e76507b1a77 100755
--- a/BUILD/SETUP.sh
+++ b/BUILD/SETUP.sh
@@ -71,6 +71,7 @@ pentium_cflags="$check_cpu_cflags"
 pentium64_cflags="$check_cpu_cflags -m64"
 ppc_cflags="$check_cpu_cflags"
 sparc_cflags=""
+error_inject_flag="--with-error-inject "
 
 # be as fast as we can be without losing our ability to backtrace
 fast_cflags="-O3 -fno-omit-frame-pointer"
diff --git a/BUILD/compile-pentium-debug-max b/BUILD/compile-pentium-debug-max
index 7a11ad24c44..d065ed2bf71 100755
--- a/BUILD/compile-pentium-debug-max
+++ b/BUILD/compile-pentium-debug-max
@@ -3,7 +3,7 @@
 path=`dirname $0`
 . "$path/SETUP.sh" $@ --with-debug=full
 
-extra_flags="$pentium_cflags $debug_cflags $max_cflags"
+extra_flags="$pentium_cflags $debug_cflags $max_cflags $error_inject_flag"
 c_warnings="$c_warnings $debug_extra_warnings"
 cxx_warnings="$cxx_warnings $debug_extra_warnings"
 extra_configs="$pentium_configs $debug_configs $max_configs"
diff --git a/configure.in b/configure.in
index ffc311a7857..ad69365a1b9 100644
--- a/configure.in
+++ b/configure.in
@@ -654,6 +654,7 @@ else
   AC_MSG_RESULT([no])
 fi
 
+  
 MYSQL_SYS_LARGEFILE
 
 # Types that must be checked AFTER large file support is checked
@@ -1554,6 +1555,18 @@ then
   DEBUG_OPTIMIZE_CXX=""
 fi
 
+# If we should allow error injection tests
+AC_ARG_WITH(error-inject,
+    [ --with-error-inject    Enable error injection in MySQL Server],
+    [ with_error_inject=$withval ],
+    [ with_error_inject=no ])
+
+if test "$with_error_inject" = "yes"
+then
+  CFLAGS="-DERROR_INJECT_SUPPORT $CFLAGS"
+  CXXFLAGS="-DERROR_INJECT_SUPPORT $CXXFLAGS"
+fi
+
 AC_ARG_WITH(debug,
     [  --without-debug         Build a production version without debugging code],
     [with_debug=$withval],
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index 85f43ba2757..deea59b8074 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -4399,7 +4399,9 @@ int ha_ndbcluster::create(const char *name,
   DBUG_RETURN(my_errno);
 }
 
-int ha_ndbcluster::create_handler_files(const char *file) 
+int ha_ndbcluster::create_handler_files(const char *file,
+                                        const char *old_name,
+                                        bool rename_flag) 
 { 
   const char *name;
   Ndb* ndb;
@@ -4410,6 +4412,10 @@ int ha_ndbcluster::create_handler_files(const char *file)
 
   DBUG_ENTER("create_handler_files");
 
+  if (rename_flag)
+  {
+    DBUG_RETURN(FALSE);
+  }
   if (!(ndb= get_ndb()))
     DBUG_RETURN(HA_ERR_NO_CONNECTION);
 
diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h
index a62356d41ab..3ec6ada9a98 100644
--- a/sql/ha_ndbcluster.h
+++ b/sql/ha_ndbcluster.h
@@ -588,7 +588,8 @@ class ha_ndbcluster: public handler
   int rename_table(const char *from, const char *to);
   int delete_table(const char *name);
   int create(const char *name, TABLE *form, HA_CREATE_INFO *info);
-  int create_handler_files(const char *file);
+  int create_handler_files(const char *file, const char *old_name,
+                           bool rename_flag);
   int get_default_no_partitions(ulonglong max_rows);
   bool get_no_parts(const char *name, uint *no_parts);
   void set_auto_partitions(partition_info *part_info);
diff --git a/sql/ha_partition.cc b/sql/ha_partition.cc
index 50299dffd85..d318c4ab348 100644
--- a/sql/ha_partition.cc
+++ b/sql/ha_partition.cc
@@ -563,7 +563,9 @@ int ha_partition::rename_table(const char *from, const char *to)
     and types of engines in the partitions.
 */
 
-int ha_partition::create_handler_files(const char *name)
+int ha_partition::create_handler_files(const char *path,
+                                       const char *old_path,
+                                       bool rename_flag)
 {
   DBUG_ENTER("ha_partition::create_handler_files()");
 
@@ -571,10 +573,27 @@ int ha_partition::create_handler_files(const char *name)
     We need to update total number of parts since we might write the handler
     file as part of a partition management command
   */
-  if (create_handler_file(name))
+  if (rename_flag)
   {
-    my_error(ER_CANT_CREATE_HANDLER_FILE, MYF(0));
-    DBUG_RETURN(1);
+    char name[FN_REFLEN];
+    char old_name[FN_REFLEN];
+    char *par_str= ".par";
+
+    strxmov(name, path, par_str, NullS);
+    strxmov(old_name, old_path, par_str, NullS);
+    if (my_delete(name, MYF(MY_WME)) ||
+        my_rename(old_name, name, MYF(MY_WME)))
+    {
+      DBUG_RETURN(TRUE);
+    }
+  }
+  else
+  {
+    if (create_handler_file(path))
+    {
+      my_error(ER_CANT_CREATE_HANDLER_FILE, MYF(0));
+      DBUG_RETURN(1);
+    }
   }
   DBUG_RETURN(0);
 }
diff --git a/sql/ha_partition.h b/sql/ha_partition.h
index 35435844064..f62231bb11e 100644
--- a/sql/ha_partition.h
+++ b/sql/ha_partition.h
@@ -179,7 +179,8 @@ class ha_partition :public handler
   virtual int rename_table(const char *from, const char *to);
   virtual int create(const char *name, TABLE *form,
 		     HA_CREATE_INFO *create_info);
-  virtual int create_handler_files(const char *name);
+  virtual int create_handler_files(const char *name,
+                                   const char *old_name, bool rename_flag);
   virtual void update_create_info(HA_CREATE_INFO *create_info);
   virtual char *update_table_comment(const char *comment);
   virtual int change_partitions(HA_CREATE_INFO *create_info,
diff --git a/sql/handler.h b/sql/handler.h
index d29c499f954..b6a925695b8 100644
--- a/sql/handler.h
+++ b/sql/handler.h
@@ -1779,7 +1779,11 @@ class handler :public Sql_alloc
   virtual void drop_table(const char *name);
   
   virtual int create(const char *name, TABLE *form, HA_CREATE_INFO *info)=0;
-  virtual int create_handler_files(const char *name) { return FALSE;}
+  virtual int create_handler_files(const char *name, const char *old_name,
+                                   bool rename_flag)
+  {
+    return FALSE;
+  }
 
   virtual int change_partitions(HA_CREATE_INFO *create_info,
                                 const char *path,
diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h
index 589ca1349c1..b501e4c7663 100644
--- a/sql/mysql_priv.h
+++ b/sql/mysql_priv.h
@@ -603,6 +603,29 @@ struct Query_cache_query_flags
 #define query_cache_invalidate_by_MyISAM_filename_ref NULL
 #endif /*HAVE_QUERY_CACHE*/
 
+/*
+  Error injector Macros to enable easy testing of recovery after failures
+  in various error cases.
+*/
+#ifndef ERROR_INJECT_SUPPORT
+#define ERROR_INJECTOR(x)
+#define ERROR_INJECTOR_ACTION(x)
+#define ERROR_INJECTOR_CRASH(x)
+#else
+
+inline bool
+my_error_inject(int error)
+{
+  return (current_thd->variables.error_inject_code == error) ? 1 : 0;
+}
+
+#define ERROR_INJECTOR_CRASH(code) \
+  (my_error_inject((code)) ? ((DBUG_ASSERT(0)), 0) : 0
+#define ERROR_INJECTOR_ACTION(code, action) \
+  (my_error_inject((code)) ? ((action), 0) : 0
+#define ERROR_INJECT(code) \
+  (my_error_inject((code)) ? 1 : 0)
+#endif
 uint build_table_path(char *buff, size_t bufflen, const char *db,
                       const char *table, const char *ext);
 void write_bin_log(THD *thd, bool clear_error,
@@ -1099,9 +1122,10 @@ typedef struct st_lock_param_type
 } ALTER_PARTITION_PARAM_TYPE;
 
 void mem_alloc_error(size_t size);
-#define WFRM_INITIAL_WRITE 1
-#define WFRM_CREATE_HANDLER_FILES 2
-#define WFRM_PACK_FRM 4
+bool write_table_log(ALTER_PARTITION_PARAM_TYPE *lpt);
+#define WFRM_WRITE_SHADOW 1
+#define WFRM_INSTALL_SHADOW 2
+#define WFRM_PACK_FRM
 bool mysql_write_frm(ALTER_PARTITION_PARAM_TYPE *lpt, uint flags);
 bool abort_and_upgrade_lock(ALTER_PARTITION_PARAM_TYPE *lpt);
 void close_open_tables_and_downgrade(ALTER_PARTITION_PARAM_TYPE *lpt);
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index f5b93e6a5e5..07e2abc1de5 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -7055,6 +7055,12 @@ static void mysql_init_variables(void)
   max_system_variables.max_join_size=   (ulonglong) HA_POS_ERROR;
   global_system_variables.old_passwords= 0;
   global_system_variables.old_alter_table= 0;
+#ifdef ERROR_INJECT_SUPPORT
+  global_system_variables.error_inject_code= 0;
+  global_system_variables.error_inject_value= 0;
+  max_system_variables.error_inject_code= ~0;
+  max_system_variables.error_inject_value= ~0;
+#endif
   
   /*
     Default behavior for 4.1 and 5.0 is to treat NULL values as unequal
diff --git a/sql/set_var.cc b/sql/set_var.cc
index b85b2576b83..bc6c68e87f6 100644
--- a/sql/set_var.cc
+++ b/sql/set_var.cc
@@ -215,6 +215,12 @@ sys_var_long_ptr	sys_delayed_insert_timeout("delayed_insert_timeout",
 						   &delayed_insert_timeout);
 sys_var_long_ptr	sys_delayed_queue_size("delayed_queue_size",
 					       &delayed_queue_size);
+#ifdef ERROR_INJECT_SUPPORT
+sys_var_long_ptr	sys_error_inject_code("error_inject_code",
+					      &error_inject_code);
+sys_var_long_ptr	sys_error_inject_value("error_inject_value",
+					       &error_inject_value);
+#endif
 sys_var_event_executor        sys_event_executor("event_scheduler",
                                                &event_executor_running_global_var);
 sys_var_long_ptr	sys_expire_logs_days("expire_logs_days",
@@ -729,6 +735,10 @@ SHOW_VAR init_vars[]= {
   {sys_div_precincrement.name,(char*) &sys_div_precincrement,SHOW_SYS},
   {sys_engine_condition_pushdown.name, 
    (char*) &sys_engine_condition_pushdown,                          SHOW_SYS},
+#ifdef ERROR_INJECT_SUPPORT
+  {sys_error_inject_code.name,(char*) &sys_error_inject_code,       SHOW_SYS},
+  {sys_error_inject_value.name,(char*)&sys_error_inject_value,      SHOW_SYS},
+#endif
   {sys_event_executor.name,   (char*) &sys_event_executor,          SHOW_SYS},
   {sys_expire_logs_days.name, (char*) &sys_expire_logs_days,        SHOW_SYS},
   {sys_flush.name,             (char*) &sys_flush,                  SHOW_SYS},
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 00440449be8..bf6633f98ed 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -186,6 +186,10 @@ struct system_variables
   ha_rows max_join_size;
   ulong auto_increment_increment, auto_increment_offset;
   ulong bulk_insert_buff_size;
+#ifdef ERROR_INJECT_SUPPORT
+  ulong error_inject_code;
+  ulong error_inject_value;
+#endif
   ulong join_buff_size;
   ulong long_query_time;
   ulong max_allowed_packet;
diff --git a/sql/sql_partition.cc b/sql/sql_partition.cc
index c29f5ef5650..114ac273d52 100644
--- a/sql/sql_partition.cc
+++ b/sql/sql_partition.cc
@@ -5244,7 +5244,7 @@ uint fast_alter_partition_table(THD *thd, TABLE *table,
       1) Write the new frm, pack it and then delete it
       2) Perform the change within the handler
     */
-    if ((mysql_write_frm(lpt, WFRM_INITIAL_WRITE | WFRM_PACK_FRM)) ||
+    if ((mysql_write_frm(lpt, WFRM_WRITE_SHADOW | WFRM_PACK_FRM)) ||
         (mysql_change_partitions(lpt)))
     {
       fast_alter_partition_error_handler(lpt);
@@ -5275,29 +5275,56 @@ uint fast_alter_partition_table(THD *thd, TABLE *table,
       after a DROP PARTITION) if one ensured that failed accesses to the
       dropped partitions was aborted for sure (thus only possible for
       transactional engines).
-      
-      1) Lock the table in TL_WRITE_ONLY to ensure all other accesses to
+
+      0) Write an entry that removes the shadow frm file if crash occurs 
+      1) Write the new frm file as a shadow frm
+      2) Write the table log to ensure that the operation is completed
+         even in the presence of a MySQL Server crash
+      3) Lock the table in TL_WRITE_ONLY to ensure all other accesses to
          the table have completed
-      2) Write the new frm file where the partitions have changed but are
-         still remaining with the state PART_TO_BE_DROPPED
-      3) Write the bin log
-      4) Prepare MyISAM handlers for drop of partitions
-      5) Ensure that any users that has opened the table but not yet
+      4) Write the bin log
+         Unfortunately the writing of the binlog is not synchronised with
+         other logging activities. So no matter in which order the binlog
+         is written compared to other activities there will always be cases
+         where crashes make strange things occur. In this placement it can
+         happen that the ALTER TABLE DROP PARTITION gets performed in the
+         master but not in the slaves if we have a crash, after writing the
+         table log but before writing the binlog. A solution to this would
+         require writing the statement first in the table log and then
+         when recovering from the crash read the binlog and insert it into
+         the binlog if not written already.
+      5) Install the previously written shadow frm file
+      6) Ensure that any users that has opened the table but not yet
          reached the abort lock do that before downgrading the lock.
-      6) Drop the partitions
-      7) Write the frm file that the partition has been dropped
-      8) Wait until all accesses using the old frm file has completed
-      9) Complete query
+      7) Prepare MyISAM handlers for drop of partitions
+      8) Drop the partitions
+      9) Remove entries from table log
+      10) Wait until all accesses using the old frm file has completed
+      11) Complete query
+
+      We insert Error injections at all places where it could be interesting
+      to test if recovery is properly done.
     */
-    if ((abort_and_upgrade_lock(lpt)) ||
-        (mysql_write_frm(lpt, WFRM_INITIAL_WRITE)) ||
+    if (write_log_shadow_frm(lpt) ||
+        ERROR_INJECTOR_CRASH(1000) ||
+        mysql_write_frm(lpt, WFRM_WRITE_SHADOW) ||
+        ERROR_INJECTOR_CRASH(1001) ||
+        write_log_drop_partition(lpt) ||
+        ERROR_INJECTOR_CRASH(1002) ||
+        abort_and_upgrade_lock(lpt) ||
         ((!thd->lex->no_write_to_binlog) &&
-         (write_bin_log(thd, FALSE,
-                       thd->query, thd->query_length), FALSE)) ||
-        (table->file->extra(HA_EXTRA_PREPARE_FOR_DELETE)) ||
+         write_bin_log(thd, FALSE,
+                       thd->query, thd->query_length), FALSE) ||
+        ERROR_INJECTOR_CRASH(1003) ||
+        mysql_write_frm(lpt, WFRM_INSTALL_SHADOW) ||
         (close_open_tables_and_downgrade(lpt), FALSE) || 
-        (mysql_drop_partitions(lpt)) ||
-        (mysql_write_frm(lpt, WFRM_CREATE_HANDLER_FILES)) ||
+        ERROR_INJECTOR_CRASH(1004) ||
+        table->file->extra(HA_EXTRA_PREPARE_FOR_DELETE) ||
+        ERROR_INJECTOR_CRASH(1005) ||
+        mysql_drop_partitions(lpt) ||
+        ERROR_INJECTOR_CRASH(1006) ||
+        write_log_completed(lpt) ||
+        ERROR_INJECTOR_CRASH(1007) ||
         (mysql_wait_completed_table(lpt, table), FALSE))
     {
       fast_alter_partition_error_handler(lpt);
@@ -5317,26 +5344,38 @@ uint fast_alter_partition_table(THD *thd, TABLE *table,
       miss updates made by a transaction serialised before it that are
       inserted into the new partition.
 
-      1) Write the new frm file where state of added partitions is
-         changed to PART_TO_BE_ADDED
+      0) Write an entry that removes the shadow frm file if crash occurs 
+      1) Write the new frm file as a shadow frm file
+      2) Log the changes to happen in table log
       2) Add the new partitions
       3) Lock all partitions in TL_WRITE_ONLY to ensure that no users
          are still using the old partitioning scheme. Wait until all
          ongoing users have completed before progressing.
-      4) Write a new frm file of the table where the partitions are added
-         to the table.
-      5) Write binlog
+      4) Write binlog
+      5) Install the new frm file of the table where the partitions are
+         added to the table.
       6) Wait until all accesses using the old frm file has completed
-      7) Complete query
+      7) Remove entries from table log
+      8) Complete query
     */
-    if ((mysql_write_frm(lpt, WFRM_INITIAL_WRITE)) ||
-        (mysql_change_partitions(lpt)) ||
-        (abort_and_upgrade_lock(lpt)) ||
-        (mysql_write_frm(lpt, WFRM_CREATE_HANDLER_FILES)) ||
+    if (write_log_shadow_frm(lpt) ||
+        ERROR_INJECTED_CRASH(1010) ||
+        mysql_write_frm(lpt, WFRM_WRITE_SHADOW) ||
+        ERROR_INJECTED_CRASH(1011) ||
+        write_log_add_partition(lpt) ||
+        ERROR_INJECTED_CRASH(1012) ||
+        mysql_change_partitions(lpt) ||
+        ERROR_INJECTED_CRASH(1013) ||
+        abort_and_upgrade_lock(lpt) ||
         ((!thd->lex->no_write_to_binlog) &&
          (write_bin_log(thd, FALSE,
                         thd->query, thd->query_length), FALSE)) ||
-        (close_open_tables_and_downgrade(lpt), FALSE))
+        ERROR_INJECTED_CRASH(1014) ||
+        mysql_write_frm(lpt, WFRM_INSTALL_SHADOW) ||
+        ERROR_INJECTED_CRASH(1015) ||
+        (close_open_tables_and_downgrade(lpt), FALSE) ||
+        write_log_completed(lpt) ||
+        ERROR_INJECTED_CRASH(1016)) 
     {
       fast_alter_partition_error_handler(lpt);
       DBUG_RETURN(TRUE);
@@ -5375,40 +5414,56 @@ uint fast_alter_partition_table(THD *thd, TABLE *table,
       use a lower lock level. This can be handled inside store_lock in the
       respective handler.
 
-      1) Write the new frm file where state of added partitions is
-         changed to PART_TO_BE_ADDED and the reorganised partitions
-         are set in state PART_TO_BE_REORGED.
-      2) Add the new partitions
+      0) Write an entry that removes the shadow frm file if crash occurs 
+      1) Write the shadow frm file of new partitioning
+      2) Log such that temporary partitions added in change phase are
+         removed in a crash situation
+      3) Add the new partitions
          Copy from the reorganised partitions to the new partitions
-      3) Lock all partitions in TL_WRITE_ONLY to ensure that no users
+      4) Log that operation is completed and log all complete actions
+         needed to complete operation from here
+      5) Lock all partitions in TL_WRITE_ONLY to ensure that no users
          are still using the old partitioning scheme. Wait until all
          ongoing users have completed before progressing.
-      4) Prepare MyISAM handlers for rename and delete of partitions
-      5) Write a new frm file of the table where the partitions are
-         reorganised.
-      6) Rename the reorged partitions such that they are no longer
+      6) Prepare MyISAM handlers for rename and delete of partitions
+      7) Rename the reorged partitions such that they are no longer
          used and rename those added to their real new names.
-      7) Write bin log
-      8) Wait until all accesses using the old frm file has completed
-      9) Drop the reorganised partitions
-      10)Write a new frm file of the table where the partitions are
-         reorganised.
-      11)Wait until all accesses using the old frm file has completed
-      12)Complete query
+      8) Write bin log
+      9) Install the shadow frm file
+      10) Wait until all accesses using the old frm file has completed
+      11) Drop the reorganised partitions
+      12) Remove log entry
+      13)Wait until all accesses using the old frm file has completed
+      14)Complete query
     */
 
-    if ((mysql_write_frm(lpt, WFRM_INITIAL_WRITE)) ||
-        (mysql_change_partitions(lpt)) ||
-        (abort_and_upgrade_lock(lpt)) ||
-        (mysql_write_frm(lpt, WFRM_CREATE_HANDLER_FILES)) ||
-        (table->file->extra(HA_EXTRA_PREPARE_FOR_DELETE)) ||
-        (mysql_rename_partitions(lpt)) ||
+    if (write_log_shadow_frm(lpt) ||
+        ERROR_INJECT_CRASH(1020) ||
+        mysql_write_frm(lpt, WFRM_WRITE_SHADOW) ||
+        ERROR_INJECT_CRASH(1021) ||
+        write_log_ph1_change_partition(lpt) ||
+        ERROR_INJECT_CRASH(1022) ||
+        mysql_change_partitions(lpt) ||
+        ERROR_INJECT_CRASH(1023) ||
+        write_log_ph2_change_partition(lpt) ||
+        ERROR_INJECT_CRASH(1024) ||
+        abort_and_upgrade_lock(lpt) ||
+        table->file->extra(HA_EXTRA_PREPARE_FOR_DELETE) ||
+        ERROR_INJECT_CRASH(1025) ||
+        mysql_rename_partitions(lpt) ||
+        ERROR_INJECT_CRASH(1026) ||
         ((!thd->lex->no_write_to_binlog) &&
          (write_bin_log(thd, FALSE,
                         thd->query, thd->query_length), FALSE)) ||
+        ERROR_INJECT_CRASH(1027) ||
+        mysql_write_frm(lpt, WFRM_INSTALL_SHADOW) ||
+        ERROR_INJECT_CRASH(1028) ||
         (close_open_tables_and_downgrade(lpt), FALSE) ||
-        (mysql_drop_partitions(lpt)) ||
-        (mysql_write_frm(lpt, 0UL)) ||
+        ERROR_INJECT_CRASH(1029) ||
+        mysql_drop_partitions(lpt) ||
+        ERROR_INJECT_CRASH(1030) ||
+        write_log_completed(lpt) ||
+        ERROR_INJECT_CRASH(1031) ||
         (mysql_wait_completed_table(lpt, table), FALSE))
     {
         fast_alter_partition_error_handler(lpt);
diff --git a/sql/sql_table.cc b/sql/sql_table.cc
index 2938f27c58d..809a70a0d45 100644
--- a/sql/sql_table.cc
+++ b/sql/sql_table.cc
@@ -245,6 +245,28 @@ static int mysql_copy_key_list(List<Key> *orig_key,
 }
 
 
+/*
+  SYNOPSIS
+    write_table_log()
+    lpt                       Struct carrying parameters to the function
+
+  RETURN VALUES
+    TRUE                      Failure in writing the log
+    FALSE                     Success
+
+  DESCRIPTION
+    A careful write of the table log is performed to ensure that we can
+    handle crashes occurring during CREATE and ALTER TABLE processing.
+*/
+
+bool
+write_table_log(ALTER_PARTITION_PARAM_TYPE *lpt)
+{
+  DBUG_ENTER("write_table_log");
+  DBUG_RETURN(FALSE);
+}
+
+
 /*
   SYNOPSIS
     mysql_write_frm()
@@ -277,83 +299,66 @@ bool mysql_write_frm(ALTER_PARTITION_PARAM_TYPE *lpt, uint flags)
   */
   int error= 0;
   char path[FN_REFLEN+1];
+  char shadow_path[FN_REFLEN+1];
+  char shadow_frm_name[FN_REFLEN+1];
   char frm_name[FN_REFLEN+1];
   DBUG_ENTER("mysql_write_frm");
 
-  if (flags & WFRM_INITIAL_WRITE)
+  /*
+    Build shadow frm file name
+  */
+  build_table_filename(shadow_path, sizeof(path), lpt->db,
+                       lpt->table_name, "#");
+  strxmov(shadow_frm_name, path, reg_ext, NullS);
+  if (flags & WFRM_WRITE_SHADOW)
   {
-    error= mysql_copy_create_list(lpt->create_list,
-                                  &lpt->new_create_list);
-    error+= mysql_copy_key_list(lpt->key_list,
-                                &lpt->new_key_list);
-    if (error)
+    if (mysql_copy_create_list(lpt->create_list,
+                               &lpt->new_create_list) ||
+        mysql_copy_key_list(lpt->key_list,
+                            &lpt->new_key_list) ||
+        mysql_prepare_table(lpt->thd, lpt->create_info,
+                            &lpt->new_create_list,
+                            &lpt->new_key_list,
+                            /*tmp_table*/ 1,
+                            &lpt->db_options,
+                            lpt->table->file,
+                            &lpt->key_info_buffer,
+                            &lpt->key_count,
+                            /*select_field_count*/ 0)))
     {
       DBUG_RETURN(TRUE);
     }
-  }
-  build_table_filename(path, sizeof(path), lpt->db, lpt->table_name, "");
-  strxmov(frm_name, path, reg_ext, NullS);
-  if ((flags & WFRM_INITIAL_WRITE) &&
-      (mysql_prepare_table(lpt->thd, lpt->create_info, &lpt->new_create_list,
-                           &lpt->new_key_list,/*tmp_table*/ 1, &lpt->db_options,
-                           lpt->table->file, &lpt->key_info_buffer,
-                           &lpt->key_count, /*select_field_count*/ 0)))
-  {
-    DBUG_RETURN(TRUE);
-  }
 #ifdef WITH_PARTITION_STORAGE_ENGINE
-  {
-    partition_info *part_info= lpt->table->part_info;
-    char *part_syntax_buf;
-    uint syntax_len, i;
-    bool any_unnormal_state= FALSE;
-
-    if (part_info)
     {
-      uint max_part_state_len= part_info->partitions.elements +
-                           part_info->temp_partitions.elements;
-      if (!(part_info->part_state= (uchar*)sql_alloc(max_part_state_len)))
-      {
-        DBUG_RETURN(TRUE);
-      }
-      part_info->part_state_len= 0;
-      if (!(part_syntax_buf= generate_partition_syntax(part_info,
-                                                       &syntax_len,
-                                                       TRUE, FALSE)))
-      {
-        DBUG_RETURN(TRUE);
-      }
-      for (i= 0; i < part_info->part_state_len; i++)
-      {
-        enum partition_state part_state=
-                        (enum partition_state)part_info->part_state[i];
-        if (part_state != PART_NORMAL && part_state != PART_IS_ADDED)
-          any_unnormal_state= TRUE;
-      }
-      if (!any_unnormal_state)
+      partition_info *part_info= lpt->table->part_info;
+      char *part_syntax_buf;
+      uint syntax_len;
+
+      if (part_info)
       {
-        part_info->part_state= NULL;
-        part_info->part_state_len= 0;
+        if (!(part_syntax_buf= generate_partition_syntax(part_info,
+                                                         &syntax_len,
+                                                         TRUE, FALSE)))
+        {
+          DBUG_RETURN(TRUE);
+        }
+        part_info->part_info_string= part_syntax_buf;
+        part_info->part_info_len= syntax_len;
       }
-      part_info->part_info_string= part_syntax_buf;
-      part_info->part_info_len= syntax_len;
     }
-  }
 #endif
-  /*
-    We write the frm file with the LOCK_open mutex since otherwise we could
-    overwrite the frm file as another is reading it in open_table.
-  */
-  lpt->create_info->table_options= lpt->db_options;
-  VOID(pthread_mutex_lock(&LOCK_open));
-  if ((mysql_create_frm(lpt->thd, frm_name, lpt->db, lpt->table_name,
-                        lpt->create_info, lpt->new_create_list, lpt->key_count,
-                        lpt->key_info_buffer, lpt->table->file)) ||
-      ((flags & WFRM_CREATE_HANDLER_FILES) &&
-       lpt->table->file->create_handler_files(path)))
-  {
-    error= 1;
-    goto end;
+    /* Write shadow frm file */
+    lpt->create_info->table_options= lpt->db_options;
+    if ((mysql_create_frm(lpt->thd, shadow_frm_name, lpt->db,
+                          lpt->table_name, lpt->create_info,
+                          lpt->new_create_list, lpt->key_count,
+                          lpt->key_info_buffer, lpt->table->file)) ||
+         lpt->table->file->create_handler_files(shadow_path, NULL, FALSE))
+    {
+      my_delete(shadow_frm_name, MYF(0));
+      error= 1;
+      goto end;
+    }
   }
   if (flags & WFRM_PACK_FRM)
   {
@@ -365,7 +370,7 @@ bool mysql_write_frm(ALTER_PARTITION_PARAM_TYPE *lpt, uint flags)
     */
     const void *data= 0;
     uint length= 0;
-    if (readfrm(path, &data, &length) ||
+    if (readfrm(shadow_path, &data, &length) ||
         packfrm(data, length, &lpt->pack_frm_data, &lpt->pack_frm_len))
     {
       my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
@@ -374,11 +379,31 @@ bool mysql_write_frm(ALTER_PARTITION_PARAM_TYPE *lpt, uint flags)
       error= 1;
       goto end;
     }
-    error= my_delete(frm_name, MYF(MY_WME));
+    error= my_delete(shadow_frm_name, MYF(MY_WME));
   }
-  /* Frm file have been updated to reflect the change about to happen.  */
+  if (flags & WFRM_INSTALL_SHADOW)
+  {
+    /*
+      Build frm file name
+    */
+    build_table_filename(path, sizeof(path), lpt->db,
+                         lpt->table_name, "");
+    strxmov(frm_name, path, reg_ext, NullS);
+    /*
+      When we are changing to use new frm file we need to ensure that we
+      don't collide with another thread in process to open the frm file.
+    */
+    VOID(pthread_mutex_lock(&LOCK_open));
+    if (my_delete(frm_name, MYF(MY_WME)) ||
+        my_rename(shadow_frm_name, frm_name, MYF(MY_WME)) ||
+        lpt->table->file->create_handler_files(path, shadow_path, TRUE))
+    {
+      error= 1;
+    }
+    VOID(pthread_mutex_unlock(&LOCK_open));
+  }
+
 end:
-  VOID(pthread_mutex_unlock(&LOCK_open));
   DBUG_RETURN(error);
 }
 
@@ -4631,7 +4656,7 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
       error= (mysql_create_frm(thd, reg_path, db, table_name,
                                create_info, prepared_create_list, key_count,
                                key_info_buffer, table->file) ||
-              table->file->create_handler_files(reg_path));
+              table->file->create_handler_files(reg_path, NULL, FALSE));
       VOID(pthread_mutex_unlock(&LOCK_open));
       if (error)
         goto err;
@@ -4677,7 +4702,7 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
       error= (mysql_create_frm(thd, reg_path, db, table_name,
                                create_info, prepared_create_list, key_count,
                                key_info_buffer, table->file) ||
-              table->file->create_handler_files(reg_path));
+              table->file->create_handler_files(reg_path, NULL, FALSE));
       VOID(pthread_mutex_unlock(&LOCK_open));
       if (error)
         goto err;
@@ -4900,7 +4925,7 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
       VOID(pthread_mutex_lock(&LOCK_open));
     }
     /* Tell the handler that a new frm file is in place. */
-    if (table->file->create_handler_files(reg_path))
+    if (table->file->create_handler_files(reg_path, NULL, FALSE))
     {
       VOID(pthread_mutex_unlock(&LOCK_open));
       goto err;
diff --git a/sql/unireg.cc b/sql/unireg.cc
index 2c5f4b34091..ab362b130b0 100644
--- a/sql/unireg.cc
+++ b/sql/unireg.cc
@@ -330,7 +330,7 @@ int rea_create_table(THD *thd, const char *path,
 
   // Make sure mysql_create_frm din't remove extension
   DBUG_ASSERT(*fn_rext(frm_name));
-  if (file->create_handler_files(path))
+  if (file->create_handler_files(path, NULL, FALSE))
     goto err_handler;
   if (!create_info->frm_only && ha_create_table(thd, path, db, table_name,
                                                 create_info,0))
-- 
2.30.9