Commit f8909b24 authored by pbair's avatar pbair

Add migration helper to swap partitioned tables

Add a new migration helper that completes the last step of a
partitioning migration: swapping the partitioned copy with the original.
In addition to renaming the tables, it also renames the primary keys and
moves the primary key sequences appropriately.
parent d802e948
# frozen_string_literal: true
module Gitlab
module Database
module Partitioning
class ReplaceTable
DELIMITER = ";\n\n"
attr_reader :original_table, :replacement_table, :replaced_table, :primary_key_column,
:sequence, :original_primary_key, :replacement_primary_key, :replaced_primary_key
def initialize(original_table, replacement_table, replaced_table, primary_key_column)
@original_table = original_table
@replacement_table = replacement_table
@replaced_table = replaced_table
@primary_key_column = primary_key_column
@sequence = default_sequence(original_table, primary_key_column)
@original_primary_key = default_primary_key(original_table)
@replacement_primary_key = default_primary_key(replacement_table)
@replaced_primary_key = default_primary_key(replaced_table)
end
def perform
yield sql_to_replace_table if block_given?
execute(sql_to_replace_table)
end
private
delegate :execute, :quote_table_name, :quote_column_name, to: :connection
def connection
@connection ||= ActiveRecord::Base.connection
end
def default_sequence(table, column)
"#{table}_#{column}_seq"
end
def default_primary_key(table)
"#{table}_pkey"
end
def sql_to_replace_table
@sql_to_replace_table ||= [
drop_default_sql(original_table, primary_key_column),
set_default_sql(replacement_table, primary_key_column, "nextval('#{quote_table_name(sequence)}'::regclass)"),
change_sequence_owner_sql(sequence, replacement_table, primary_key_column),
rename_table_sql(original_table, replaced_table),
rename_constraint_sql(replaced_table, original_primary_key, replaced_primary_key),
rename_table_sql(replacement_table, original_table),
rename_constraint_sql(original_table, replacement_primary_key, original_primary_key)
].join(DELIMITER)
end
def drop_default_sql(table, column)
"ALTER TABLE #{quote_table_name(table)} ALTER COLUMN #{quote_column_name(column)} DROP DEFAULT"
end
def set_default_sql(table, column, expression)
"ALTER TABLE #{quote_table_name(table)} ALTER COLUMN #{quote_column_name(column)} SET DEFAULT #{expression}"
end
def change_sequence_owner_sql(sequence, table, column)
"ALTER SEQUENCE #{quote_table_name(sequence)} OWNED BY #{quote_table_name(table)}.#{quote_column_name(column)}"
end
def rename_table_sql(old_name, new_name)
"ALTER TABLE #{quote_table_name(old_name)} RENAME TO #{quote_table_name(new_name)}"
end
def rename_constraint_sql(table, old_name, new_name)
"ALTER TABLE #{quote_table_name(table)} RENAME CONSTRAINT #{quote_column_name(old_name)} TO #{quote_column_name(new_name)}"
end
end
end
end
end
......@@ -66,7 +66,10 @@ module Gitlab
create_range_partitioned_copy(table_name, partitioned_table_name, partition_column, primary_key)
create_daterange_partitions(partitioned_table_name, partition_column.name, min_date, max_date)
end
create_trigger_to_sync_tables(table_name, partitioned_table_name, primary_key)
with_lock_retries do
create_trigger_to_sync_tables(table_name, partitioned_table_name, primary_key)
end
end
# Clean up a partitioned copy of an existing table. First, deletes the database function and trigger that were
......@@ -81,13 +84,9 @@ module Gitlab
assert_not_in_transaction_block(scope: ERROR_SCOPE)
with_lock_retries do
trigger_name = make_sync_trigger_name(table_name)
drop_trigger(table_name, trigger_name)
drop_sync_trigger(table_name)
end
function_name = make_sync_function_name(table_name)
drop_function(function_name)
partitioned_table_name = make_partitioned_table_name(table_name)
drop_table(partitioned_table_name)
end
......@@ -177,6 +176,52 @@ module Gitlab
end
end
# Replaces a non-partitioned table with its partitioned copy. This is the final step in a partitioning
# migration, which makes the partitioned table ready for use by the application. The partitioned copy should be
# replaced with the original table in such a way that it appears seamless to any database clients. The replaced
# table will be renamed to "#{replaced_table}_archived"
#
# **NOTE** This method should only be used after all other migration steps have completed successfully.
# There are several limitations to this method that MUST be handled before, or during, the swap migration:
#
# - Secondary indexes and foreign keys are not automatically recreated on the partitioned table.
# - Some types of constraints (UNIQUE and EXCLUDE) which rely on indexes, will not automatically be recreated
# on the partitioned table, since the underlying index will not be present.
# - Foreign keys referencing the original non-partitioned table, would also need to be updated to reference the
# partitioned table, but unfortunately this is not supported in PG11.
# - Views referencing the original table will not be automatically updated to reference the partitioned table.
#
# Example:
#
# replace_with_partitioned_table :audit_events
#
def replace_with_partitioned_table(table_name)
assert_table_is_allowed(table_name)
partitioned_table_name = make_partitioned_table_name(table_name)
archived_table_name = make_archived_table_name(table_name)
primary_key_name = connection.primary_key(table_name)
replace_table(table_name, partitioned_table_name, archived_table_name, primary_key_name)
end
# Rolls back a migration that replaced a non-partitioned table with its partitioned copy. This can be used to
# restore the original non-partitioned table in the event of an unexpected issue.
#
# Example:
#
# rollback_replace_with_partitioned_table :audit_events
#
def rollback_replace_with_partitioned_table(table_name)
assert_table_is_allowed(table_name)
partitioned_table_name = make_partitioned_table_name(table_name)
archived_table_name = make_archived_table_name(table_name)
primary_key_name = connection.primary_key(archived_table_name)
replace_table(table_name, archived_table_name, partitioned_table_name, primary_key_name)
end
private
def assert_table_is_allowed(table_name)
......@@ -190,6 +235,10 @@ module Gitlab
tmp_table_name("#{table}_part")
end
def make_archived_table_name(table)
"#{table}_archived"
end
def make_sync_function_name(table)
object_name(table, 'table_sync_function')
end
......@@ -270,12 +319,18 @@ module Gitlab
function_name = make_sync_function_name(source_table_name)
trigger_name = make_sync_trigger_name(source_table_name)
with_lock_retries do
create_sync_function(function_name, partitioned_table_name, unique_key)
create_comment('FUNCTION', function_name, "Partitioning migration: table sync for #{source_table_name} table")
create_sync_function(function_name, partitioned_table_name, unique_key)
create_comment('FUNCTION', function_name, "Partitioning migration: table sync for #{source_table_name} table")
create_sync_trigger(source_table_name, trigger_name, function_name)
end
create_sync_trigger(source_table_name, trigger_name, function_name)
end
def drop_sync_trigger(source_table_name)
trigger_name = make_sync_trigger_name(source_table_name)
drop_trigger(source_table_name, trigger_name)
function_name = make_sync_function_name(source_table_name)
drop_function(function_name)
end
def create_sync_function(name, partitioned_table_name, unique_key)
......@@ -358,6 +413,21 @@ module Gitlab
end
end
end
def replace_table(original_table_name, replacement_table_name, replaced_table_name, primary_key_name)
replace_table = Gitlab::Database::Partitioning::ReplaceTable.new(original_table_name,
replacement_table_name, replaced_table_name, primary_key_name)
with_lock_retries do
drop_sync_trigger(original_table_name)
replace_table.perform do |sql|
say("replace_table(\"#{sql}\")")
end
create_trigger_to_sync_tables(original_table_name, replaced_table_name, primary_key_name)
end
end
end
end
end
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::Database::Partitioning::ReplaceTable, '#perform' do
include TableSchemaHelpers
subject { described_class.new(original_table, replacement_table, archived_table, 'id') }
let(:original_table) { '_test_original_table' }
let(:replacement_table) { '_test_replacement_table' }
let(:archived_table) { '_test_archived_table' }
let(:original_sequence) { "#{original_table}_id_seq" }
let(:original_primary_key) { "#{original_table}_pkey" }
let(:replacement_primary_key) { "#{replacement_table}_pkey" }
let(:archived_primary_key) { "#{archived_table}_pkey" }
before do
connection.execute(<<~SQL)
CREATE TABLE #{original_table} (
id serial NOT NULL PRIMARY KEY,
original_column text NOT NULL,
created_at timestamptz NOT NULL);
CREATE TABLE #{replacement_table} (
id int NOT NULL,
replacement_column text NOT NULL,
created_at timestamptz NOT NULL,
PRIMARY KEY (id, created_at))
PARTITION BY RANGE (created_at);
CREATE TABLE #{replacement_table}_202001 PARTITION OF #{replacement_table}
FOR VALUES FROM ('2020-01-01') TO ('2020-02-01');
SQL
end
it 'archives the old table and puts the replacement in its place' do
expect(table_type(original_table)).to eq('normal')
expect(table_type(replacement_table)).to eq('partitioned')
expect(table_type(archived_table)).to be_nil
subject.perform
expect(table_type(original_table)).to eq('partitioned')
expect(table_type(archived_table)).to eq('normal')
expect(table_type(replacement_table)).to be_nil
end
it 'transfers the primary key sequence to the replacement table' do
expect(sequence_owned_by(original_table, 'id')).to eq(original_sequence)
expect(default_expression_for(original_table, 'id')).to eq("nextval('#{original_sequence}'::regclass)")
expect(sequence_owned_by(replacement_table, 'id')).to be_nil
expect(default_expression_for(replacement_table, 'id')).to be_nil
subject.perform
expect(sequence_owned_by(original_table, 'id')).to eq(original_sequence)
expect(default_expression_for(original_table, 'id')).to eq("nextval('#{original_sequence}'::regclass)")
expect(sequence_owned_by(archived_table, 'id')).to be_nil
expect(default_expression_for(archived_table, 'id')).to be_nil
end
it 'renames the primary key constraints to match the new table names' do
expect(primary_key_constraint_name(original_table)).to eq(original_primary_key)
expect(primary_key_constraint_name(replacement_table)).to eq(replacement_primary_key)
subject.perform
expect(primary_key_constraint_name(original_table)).to eq(original_primary_key)
expect(primary_key_constraint_name(archived_table)).to eq(archived_primary_key)
end
def connection
ActiveRecord::Base.connection
end
end
......@@ -5,6 +5,7 @@ require 'spec_helper'
RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::TableManagementHelpers do
include PartitioningHelpers
include TriggerHelpers
include TableSchemaHelpers
let(:migration) do
ActiveRecord::Migration.new.extend(described_class)
......@@ -629,6 +630,68 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::TableManagementHe
end
end
describe '#replace_with_partitioned_table' do
let(:archived_table) { "#{source_table}_archived" }
before do
migration.partition_table_by_date source_table, partition_column, min_date: min_date, max_date: max_date
end
it 'replaces the original table with the partitioned table' do
expect(table_type(source_table)).to eq('normal')
expect(table_type(partitioned_table)).to eq('partitioned')
expect(table_type(archived_table)).to be_nil
migration.replace_with_partitioned_table source_table
expect(table_type(source_table)).to eq('partitioned')
expect(table_type(archived_table)).to eq('normal')
expect(table_type(partitioned_table)).to be_nil
end
it 'moves the trigger from the original table to the new table' do
expect_function_to_exist(function_name)
expect_valid_function_trigger(source_table, trigger_name, function_name, after: %w[delete insert update])
migration.replace_with_partitioned_table source_table
expect_function_to_exist(function_name)
expect_valid_function_trigger(source_table, trigger_name, function_name, after: %w[delete insert update])
end
end
describe '#rollback_replace_with_partitioned_table' do
let(:archived_table) { "#{source_table}_archived" }
before do
migration.partition_table_by_date source_table, partition_column, min_date: min_date, max_date: max_date
migration.replace_with_partitioned_table source_table
end
it 'replaces the partitioned table with the non-partitioned table' do
expect(table_type(source_table)).to eq('partitioned')
expect(table_type(archived_table)).to eq('normal')
expect(table_type(partitioned_table)).to be_nil
migration.rollback_replace_with_partitioned_table source_table
expect(table_type(source_table)).to eq('normal')
expect(table_type(partitioned_table)).to eq('partitioned')
expect(table_type(archived_table)).to be_nil
end
it 'moves the trigger from the partitioned table to the non-partitioned table' do
expect_function_to_exist(function_name)
expect_valid_function_trigger(source_table, trigger_name, function_name, after: %w[delete insert update])
migration.rollback_replace_with_partitioned_table source_table
expect_function_to_exist(function_name)
expect_valid_function_trigger(source_table, trigger_name, function_name, after: %w[delete insert update])
end
end
def filter_columns_by_name(columns, names)
columns.reject { |c| names.include?(c.name) }
end
......
# frozen_string_literal: true
module TableSchemaHelpers
def table_type(name)
connection.select_value(<<~SQL)
SELECT
CASE class.relkind
WHEN 'r' THEN 'normal'
WHEN 'p' THEN 'partitioned'
ELSE 'other'
END as table_type
FROM pg_catalog.pg_class class
WHERE class.relname = '#{name}'
SQL
end
def sequence_owned_by(table_name, column_name)
connection.select_value(<<~SQL)
SELECT
sequence.relname as name
FROM pg_catalog.pg_class as sequence
INNER JOIN pg_catalog.pg_depend depend
ON depend.objid = sequence.relfilenode
INNER JOIN pg_catalog.pg_class class
ON class.relfilenode = depend.refobjid
INNER JOIN pg_catalog.pg_attribute attribute
ON attribute.attnum = depend.refobjsubid
AND attribute.attrelid = depend.refobjid
WHERE class.relname = '#{table_name}'
AND attribute.attname = '#{column_name}'
SQL
end
def default_expression_for(table_name, column_name)
connection.select_value(<<~SQL)
SELECT
pg_get_expr(attrdef.adbin, attrdef.adrelid) AS default_value
FROM pg_catalog.pg_attribute attribute
INNER JOIN pg_catalog.pg_attrdef attrdef
ON attribute.attrelid = attrdef.adrelid
AND attribute.attnum = attrdef.adnum
WHERE attribute.attrelid = '#{table_name}'::regclass
AND attribute.attname = '#{column_name}'
SQL
end
def primary_key_constraint_name(table_name)
connection.select_value(<<~SQL)
SELECT
conname AS constraint_name
FROM pg_catalog.pg_constraint
WHERE conrelid = '#{table_name}'::regclass
AND contype = 'p'
SQL
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment