Commit a6394f15 authored by Yannis Roussos, committed by Mayra Cabrera

Initialize conversion of events.id to bigint

- Add new temporary column id_convert_to_bigint to
  events table
- Add triggers to keep events.id_convert_to_bigint
  in sync with events.id
- Create batched background migration with job
  CopyColumnUsingBackgroundMigrationJob to backfill
  events.id_convert_to_bigint
- Add new temporary column event_id_convert_to_bigint to
  push_event_payloads table
- Add triggers to keep push_event_payloads.event_id_convert_to_bigint
  in sync with push_event_payloads.event_id
- Create batched background migration with job
  CopyColumnUsingBackgroundMigrationJob to backfill
  push_event_payloads.event_id_convert_to_bigint
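For context, the backfill scheduled by the migrations in this commit is tracked as a row in the batched_background_migrations table and executed by a new cron worker. A hedged sketch of inspecting that row from a Rails console, reusing the filters the down migrations below apply (sketch only, not part of this commit):

# Sketch: locate the scheduled events.id backfill and check its state.
migration = Gitlab::Database::BackgroundMigration::BatchedMigration
  .where(job_class_name: 'CopyColumnUsingBackgroundMigrationJob')
  .where(table_name: 'events', column_name: 'id')
  .first
migration&.active?            # state predicate used by the worker below
migration&.interval_elapsed?  # true once the configured interval has passed since the last job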
parent 5f62cd3e
......@@ -187,6 +187,14 @@
:weight: 1
:idempotent:
:tags: []
- :name: cronjob:database_batched_background_migration
:feature_category: :database
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:environments_auto_stop_cron
:feature_category: :continuous_delivery
:has_external_dependencies:
......
# frozen_string_literal: true
module Database
class BatchedBackgroundMigrationWorker
include ApplicationWorker
include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
feature_category :database
idempotent!
def perform
return unless Feature.enabled?(:execute_batched_migrations_on_schedule, type: :ops) && active_migration
with_exclusive_lease(active_migration.interval) do
# Now that we have the exclusive lease, reload migration in case another process has changed it.
# This is a temporary solution until we have better concurrency handling around job execution
#
# We also have to disable this cop, because ApplicationRecord aliases reset to reload, but our database
# models don't inherit from ApplicationRecord
active_migration.reload # rubocop:disable Cop/ActiveRecordAssociationReload
run_active_migration if active_migration.active? && active_migration.interval_elapsed?
end
end
private
def active_migration
@active_migration ||= Gitlab::Database::BackgroundMigration::BatchedMigration.active_migration
end
def run_active_migration
Gitlab::Database::BackgroundMigration::BatchedMigrationRunner.new.run_migration_job(active_migration)
end
def with_exclusive_lease(timeout)
lease = Gitlab::ExclusiveLease.new(lease_key, timeout: timeout * 2)
yield if lease.try_obtain
ensure
lease&.cancel
end
def lease_key
self.class.name.demodulize.underscore
end
end
end
---
title: Initialize conversion of events.id to bigint, and add execute_batched_migrations_on_schedule feature flag to control scheduled background migrations
merge_request: 51332
author:
type: other
---
name: execute_batched_migrations_on_schedule
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/51332
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/326241
milestone: '13.11'
type: ops
group: group::database
default_enabled: false
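The BatchedBackgroundMigrationWorker above only runs when this execute_batched_migrations_on_schedule ops flag is enabled. A minimal sketch of toggling it from a Rails console, assuming GitLab's standard Feature API:

# Sketch: gate scheduled execution of batched background migrations.
Feature.enable(:execute_batched_migrations_on_schedule)   # worker will process the active migration
Feature.disable(:execute_batched_migrations_on_schedule)  # worker#perform returns early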
......@@ -565,6 +565,9 @@ Gitlab.com do
Settings.cron_jobs['namespaces_in_product_marketing_emails_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['namespaces_in_product_marketing_emails_worker']['cron'] ||= '0 9 * * *'
Settings.cron_jobs['namespaces_in_product_marketing_emails_worker']['job_class'] = 'Namespaces::InProductMarketingEmailsWorker'
Settings.cron_jobs['batched_background_migrations_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['batched_background_migrations_worker']['cron'] ||= '* * * * *'
Settings.cron_jobs['batched_background_migrations_worker']['job_class'] = 'Database::BatchedBackgroundMigrationWorker'
end
Gitlab.ee do
......
# frozen_string_literal: true
class AddMetricsToBatchedBackgroundMigrationJobs < ActiveRecord::Migration[6.0]
DOWNTIME = false
def change
add_column :batched_background_migration_jobs, :metrics, :jsonb, null: false, default: {}
end
end
# frozen_string_literal: true
class InitializeConversionOfEventsIdToBigint < ActiveRecord::Migration[6.0]
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
def up
# Initialize the conversion of events.id to bigint
# Primary Key of the Events table
initialize_conversion_of_integer_to_bigint :events, :id
end
def down
trigger_name = rename_trigger_name(:events, :id, :id_convert_to_bigint)
remove_rename_triggers_for_postgresql :events, trigger_name
remove_column :events, :id_convert_to_bigint
end
end
# frozen_string_literal: true
class InitializeConversionOfPushEventPayloadsEventIdToBigint < ActiveRecord::Migration[6.0]
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
def up
# Foreign key that references events.id
# Also Primary key of the push_event_payloads table
initialize_conversion_of_integer_to_bigint :push_event_payloads, :event_id, primary_key: :event_id
end
def down
trigger_name = rename_trigger_name(:push_event_payloads, :event_id, :event_id_convert_to_bigint)
remove_rename_triggers_for_postgresql :push_event_payloads, trigger_name
remove_column :push_event_payloads, :event_id_convert_to_bigint
end
end
# frozen_string_literal: true
class BackfillEventsIdForBigintConversion < ActiveRecord::Migration[6.0]
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
def up
return unless Gitlab.dev_env_or_com?
backfill_conversion_of_integer_to_bigint :events, :id, batch_size: 15000, sub_batch_size: 100
end
def down
return unless Gitlab.dev_env_or_com?
Gitlab::Database::BackgroundMigration::BatchedMigration
.where(job_class_name: 'CopyColumnUsingBackgroundMigrationJob')
.where(table_name: 'events', column_name: 'id')
.where('job_arguments = ?', %w[id id_convert_to_bigint].to_json)
.delete_all
end
end
# frozen_string_literal: true
class BackfillPushEventPayloadEventIdForBigintConversion < ActiveRecord::Migration[6.0]
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
def up
return unless Gitlab.dev_env_or_com?
backfill_conversion_of_integer_to_bigint :push_event_payloads, :event_id, primary_key: :event_id,
batch_size: 15000, sub_batch_size: 100
end
def down
return unless Gitlab.dev_env_or_com?
Gitlab::Database::BackgroundMigration::BatchedMigration
.where(job_class_name: 'CopyColumnUsingBackgroundMigrationJob')
.where(table_name: 'push_event_payloads', column_name: 'event_id')
.where('job_arguments = ?', %w[event_id event_id_convert_to_bigint].to_json)
.delete_all
end
end
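Each batch enqueued by backfill_conversion_of_integer_to_bigint is ultimately executed by CopyColumnUsingBackgroundMigrationJob, which receives the batch bounds plus the source/target column pair as job arguments (the spec near the end of this diff exercises the same signature). A hedged sketch with illustrative values for the events backfill:

# Sketch: argument shape for a single batch of the events backfill (values illustrative).
Gitlab::BackgroundMigration::CopyColumnUsingBackgroundMigrationJob.new.perform(
  1, 15_000,                    # start_id and end_id of this batch (batch_size: 15000 above)
  'events', 'id',               # batch table and batch column
  100,                          # sub_batch_size used for each UPDATE
  'id', 'id_convert_to_bigint'  # copy-from and copy-to columns (the job_arguments)
)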
2ad45eaf6589600d9aadd225b55451d9213a4d858ef2717b7151062f1db225c8
\ No newline at end of file
3486452547ffa5da3e12837d2f184e356c90fdd1f016f85144a1ba4865825e87
\ No newline at end of file
e169ea265b942f636b2386a432e04d9dfccdc95f04113400d44ce59e81537843
\ No newline at end of file
b7af086a68c530dd528c4ceaf4bca8d04951c0f234f75a09922aa392bb17a796
\ No newline at end of file
4715c46f5d76c8eb3a206ad3bbcc94a8c13d1d6a66a7824dba400b0aa49c8aa6
\ No newline at end of file
......@@ -150,6 +150,24 @@ $$;
COMMENT ON FUNCTION table_sync_function_2be879775d() IS 'Partitioning migration: table sync for audit_events table';
CREATE FUNCTION trigger_07c94931164e() RETURNS trigger
LANGUAGE plpgsql
AS $$
BEGIN
NEW."event_id_convert_to_bigint" := NEW."event_id";
RETURN NEW;
END;
$$;
CREATE FUNCTION trigger_69523443cc10() RETURNS trigger
LANGUAGE plpgsql
AS $$
BEGIN
NEW."id_convert_to_bigint" := NEW."id";
RETURN NEW;
END;
$$;
CREATE TABLE audit_events (
id bigint NOT NULL,
author_id integer NOT NULL,
......@@ -9854,7 +9872,8 @@ CREATE TABLE batched_background_migration_jobs (
batch_size integer NOT NULL,
sub_batch_size integer NOT NULL,
status smallint DEFAULT 0 NOT NULL,
attempts smallint DEFAULT 0 NOT NULL
attempts smallint DEFAULT 0 NOT NULL,
metrics jsonb DEFAULT '{}'::jsonb NOT NULL
);
CREATE SEQUENCE batched_background_migration_jobs_id_seq
......@@ -12507,6 +12526,7 @@ CREATE TABLE events (
target_type character varying,
group_id bigint,
fingerprint bytea,
id_convert_to_bigint bigint DEFAULT 0 NOT NULL,
CONSTRAINT check_97e06e05ad CHECK ((octet_length(fingerprint) <= 128))
);
......@@ -16914,7 +16934,8 @@ CREATE TABLE push_event_payloads (
commit_to bytea,
ref text,
commit_title character varying(70),
ref_count integer
ref_count integer,
event_id_convert_to_bigint bigint DEFAULT 0 NOT NULL
);
CREATE TABLE push_rules (
......@@ -24618,6 +24639,10 @@ CREATE TRIGGER table_sync_trigger_b99eb6998c AFTER INSERT OR DELETE OR UPDATE ON
CREATE TRIGGER table_sync_trigger_ee39a25f9d AFTER INSERT OR DELETE OR UPDATE ON audit_events FOR EACH ROW EXECUTE PROCEDURE table_sync_function_2be879775d();
CREATE TRIGGER trigger_07c94931164e BEFORE INSERT OR UPDATE ON push_event_payloads FOR EACH ROW EXECUTE PROCEDURE trigger_07c94931164e();
CREATE TRIGGER trigger_69523443cc10 BEFORE INSERT OR UPDATE ON events FOR EACH ROW EXECUTE PROCEDURE trigger_69523443cc10();
CREATE TRIGGER trigger_has_external_issue_tracker_on_delete AFTER DELETE ON services FOR EACH ROW WHEN ((((old.category)::text = 'issue_tracker'::text) AND (old.active = true) AND (old.project_id IS NOT NULL))) EXECUTE PROCEDURE set_has_external_issue_tracker();
CREATE TRIGGER trigger_has_external_issue_tracker_on_insert AFTER INSERT ON services FOR EACH ROW WHEN ((((new.category)::text = 'issue_tracker'::text) AND (new.active = true) AND (new.project_id IS NOT NULL))) EXECUTE PROCEDURE set_has_external_issue_tracker();
......@@ -34,12 +34,18 @@ module Gitlab
parent_batch_relation = relation_scoped_to_range(batch_table, batch_column, start_id, end_id)
parent_batch_relation.each_batch(column: batch_column, of: sub_batch_size) do |sub_batch|
batch_metrics.time_operation(:update_all) do
sub_batch.update_all("#{quoted_copy_to}=#{quoted_copy_from}")
end
sleep(PAUSE_SECONDS)
end
end
def batch_metrics
@batch_metrics ||= Gitlab::Database::BackgroundMigration::BatchMetrics.new
end
private
def connection
......
# frozen_string_literal: true
module Gitlab
module Database
module BackgroundMigration
class BatchMetrics
attr_reader :timings
def initialize
@timings = {}
end
def time_operation(label)
start_time = monotonic_time
yield
timings_for_label(label) << monotonic_time - start_time
end
private
def timings_for_label(label)
timings[label] ||= []
end
def monotonic_time
Gitlab::Metrics::System.monotonic_time
end
end
end
end
end
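A short usage sketch of the BatchMetrics helper above, mirroring how CopyColumnUsingBackgroundMigrationJob wraps its update_all calls earlier in this diff:

# Sketch: time an operation and read back the recorded durations.
metrics = Gitlab::Database::BackgroundMigration::BatchMetrics.new
metrics.time_operation(:update_all) do
  # the timed work goes here, e.g. a sub-batch UPDATE
end
metrics.timings # => { update_all: [<duration in seconds>] }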
......@@ -23,6 +23,10 @@ module Gitlab
finished: 3
}
def self.active_migration
active.queue_order.first
end
def interval_elapsed?
last_job.nil? || last_job.created_at <= Time.current - interval
end
......
......@@ -8,6 +8,16 @@ module Gitlab
@migration_wrapper = migration_wrapper
end
# Runs the next batched_job for a batched_background_migration.
#
# The batch bounds of the next job are calculated at runtime, based on the migration
# configuration and the bounds of the most recently created batched_job. Updating the
# migration configuration will cause future jobs to use the updated batch sizes.
#
# The job instance will automatically receive a set of arguments based on the migration
# configuration. For more details, see the BatchedMigrationWrapper class.
#
# Note that this method is primarily intended to be called by a scheduled worker.
def run_migration_job(active_migration)
if next_batched_job = create_next_batched_job!(active_migration)
migration_wrapper.perform(next_batched_job)
......@@ -16,7 +26,15 @@ module Gitlab
end
end
# Runs all remaining batched_jobs for a batched_background_migration.
#
# This method is intended to be used in a test/dev environment to execute the background
# migration inline. It should NOT be used in a real environment for any non-trivial migrations.
def run_entire_migration(migration)
unless Rails.env.development? || Rails.env.test?
raise 'this method is not intended for use in real environments'
end
while migration.active?
run_migration_job(migration)
......
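As the comments above note, run_entire_migration is only meant for development or test environments, where it drains all remaining batches inline. A hedged sketch of using it there:

# Sketch: run every remaining batch of the active migration inline (dev/test only;
# the method raises in any other environment, as shown above).
migration = Gitlab::Database::BackgroundMigration::BatchedMigration.active_migration
Gitlab::Database::BackgroundMigration::BatchedMigrationRunner.new.run_entire_migration(migration) if migration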
......@@ -4,6 +4,13 @@ module Gitlab
module Database
module BackgroundMigration
class BatchedMigrationWrapper
# Wraps the execution of a batched_background_migration.
#
# Updates the job's tracking record with the status of the migration
# when starting and finishing execution, and saves any batch_metrics
# the migration provides.
#
# The job's batch_metrics are serialized to JSON for storage.
def perform(batch_tracking_record)
start_tracking_execution(batch_tracking_record)
......@@ -34,6 +41,10 @@ module Gitlab
tracking_record.migration_column_name,
tracking_record.sub_batch_size,
*tracking_record.migration_job_arguments)
if job_instance.respond_to?(:batch_metrics)
tracking_record.metrics = job_instance.batch_metrics
end
end
def finish_tracking_execution(tracking_record)
......
......@@ -265,6 +265,7 @@ excluded_attributes:
- :issue_id
push_event_payload:
- :event_id
- :event_id_convert_to_bigint
project_badges:
- :group_id
resource_label_events:
......@@ -287,6 +288,7 @@ excluded_attributes:
- :label_id
events:
- :target_id
- :id_convert_to_bigint
timelogs:
- :issue_id
- :merge_request_id
......
......@@ -64,5 +64,13 @@ RSpec.describe Gitlab::BackgroundMigration::CopyColumnUsingBackgroundMigrationJo
expect(test_table.where('name is NULL and name_convert_to_text is NULL').pluck(:id)).to contain_exactly(15)
expect(test_table.where("name_convert_to_text = 'no name'").count).to eq(0)
end
it 'tracks timings of queries' do
expect(subject.batch_metrics.timings).to be_empty
subject.perform(10, 20, table_name, 'id', sub_batch_size, 'name', 'name_convert_to_text')
expect(subject.batch_metrics.timings[:update_all]).not_to be_empty
end
end
end
# frozen_string_literal: true
require 'fast_spec_helper'
RSpec.describe Gitlab::Database::BackgroundMigration::BatchMetrics do
let(:batch_metrics) { described_class.new }
describe '#time_operation' do
it 'tracks the duration of the operation using monotonic time' do
expect(batch_metrics.timings).to be_empty
expect(Gitlab::Metrics::System).to receive(:monotonic_time)
.exactly(6).times
.and_return(0.0, 111.0, 200.0, 290.0, 300.0, 410.0)
batch_metrics.time_operation(:my_label) do
# some operation
end
batch_metrics.time_operation(:my_other_label) do
# some operation
end
batch_metrics.time_operation(:my_label) do
# some operation
end
expect(batch_metrics.timings).to eq(my_label: [111.0, 110.0], my_other_label: [90.0])
end
end
end
......@@ -139,6 +139,19 @@ RSpec.describe Gitlab::Database::BackgroundMigration::BatchedMigrationRunner do
end
describe '#run_entire_migration' do
context 'when not in a development or test environment' do
it 'raises an error' do
environment = double('environment', development?: false, test?: false)
migration = build(:batched_background_migration, :finished)
allow(Rails).to receive(:env).and_return(environment)
expect do
runner.run_entire_migration(migration)
end.to raise_error('this method is not intended for use in real environments')
end
end
context 'when the given migration is not active' do
it 'does not create and run migration jobs' do
migration = build(:batched_background_migration, :finished)
......
......@@ -29,6 +29,16 @@ RSpec.describe Gitlab::Database::BackgroundMigration::BatchedMigration, type: :m
end
end
describe '.active_migration' do
let!(:migration1) { create(:batched_background_migration, :finished) }
let!(:migration2) { create(:batched_background_migration, :active) }
let!(:migration3) { create(:batched_background_migration, :active) }
it 'returns the first active migration according to queue order' do
expect(described_class.active_migration).to eq(migration2)
end
end
describe '#interval_elapsed?' do
context 'when the migration has no last_job' do
let(:batched_migration) { build(:batched_background_migration) }
......
......@@ -9,16 +9,24 @@ RSpec.describe Gitlab::Database::BackgroundMigration::BatchedMigrationWrapper, '
let_it_be(:active_migration) { create(:batched_background_migration, :active, job_arguments: [:id, :other_id]) }
let!(:job_record) { create(:batched_background_migration_job, batched_migration: active_migration) }
let(:job_instance) { double('job instance', batch_metrics: {}) }
before do
allow(job_class).to receive(:new).and_return(job_instance)
end
it 'runs the migration job' do
expect_next_instance_of(job_class) do |job_instance|
expect(job_instance).to receive(:perform).with(1, 10, 'events', 'id', 1, 'id', 'other_id')
end
migration_wrapper.perform(job_record)
end
it 'updates the tracking record in the database' do
test_metrics = { 'my_metrics' => 'some value' }
expect(job_instance).to receive(:perform)
expect(job_instance).to receive(:batch_metrics).and_return(test_metrics)
expect(job_record).to receive(:update!).with(hash_including(attempts: 1, status: :running)).and_call_original
freeze_time do
......@@ -29,14 +37,13 @@ RSpec.describe Gitlab::Database::BackgroundMigration::BatchedMigrationWrapper, '
expect(reloaded_job_record).not_to be_pending
expect(reloaded_job_record.attempts).to eq(1)
expect(reloaded_job_record.started_at).to eq(Time.current)
expect(reloaded_job_record.metrics).to eq(test_metrics)
end
end
context 'when the migration job does not raise an error' do
it 'marks the tracking record as succeeded' do
expect_next_instance_of(job_class) do |job_instance|
expect(job_instance).to receive(:perform).with(1, 10, 'events', 'id', 1, 'id', 'other_id')
end
freeze_time do
migration_wrapper.perform(job_record)
......@@ -51,11 +58,9 @@ RSpec.describe Gitlab::Database::BackgroundMigration::BatchedMigrationWrapper, '
context 'when the migration job raises an error' do
it 'marks the tracking record as failed before raising the error' do
expect_next_instance_of(job_class) do |job_instance|
expect(job_instance).to receive(:perform)
.with(1, 10, 'events', 'id', 1, 'id', 'other_id')
.and_raise(RuntimeError, 'Something broke!')
end
freeze_time do
expect { migration_wrapper.perform(job_record) }.to raise_error(RuntimeError, 'Something broke!')
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Database::BatchedBackgroundMigrationWorker, '#perform', :clean_gitlab_redis_shared_state do
include ExclusiveLeaseHelpers
let(:worker) { described_class.new }
context 'when the feature flag is disabled' do
before do
stub_feature_flags(execute_batched_migrations_on_schedule: false)
end
it 'does nothing' do
expect(worker).not_to receive(:active_migration)
expect(worker).not_to receive(:run_active_migration)
worker.perform
end
end
context 'when the feature flag is enabled' do
before do
stub_feature_flags(execute_batched_migrations_on_schedule: true)
allow(Gitlab::Database::BackgroundMigration::BatchedMigration).to receive(:active_migration).and_return(nil)
end
context 'when no active migrations exist' do
it 'does nothing' do
expect(worker).not_to receive(:run_active_migration)
worker.perform
end
end
context 'when active migrations exist' do
let(:lease_key) { 'batched_background_migration_worker' }
let(:migration) { build(:batched_background_migration, :active, interval: 2.minutes) }
before do
allow(Gitlab::Database::BackgroundMigration::BatchedMigration).to receive(:active_migration)
.and_return(migration)
allow(migration).to receive(:interval_elapsed?).and_return(true)
allow(migration).to receive(:reload)
end
context 'when the reloaded migration is no longer active' do
it 'does not run the migration' do
expect_to_obtain_exclusive_lease(lease_key, timeout: 4.minutes)
expect(migration).to receive(:reload)
expect(migration).to receive(:active?).and_return(false)
expect(worker).not_to receive(:run_active_migration)
worker.perform
end
end
context 'when the interval has not elapsed' do
it 'does not run the migration' do
expect_to_obtain_exclusive_lease(lease_key, timeout: 4.minutes)
expect(migration).to receive(:interval_elapsed?).and_return(false)
expect(worker).not_to receive(:run_active_migration)
worker.perform
end
end
context 'when the reloaded migration is still active and the interval has elapsed' do
it 'runs the migration' do
expect_to_obtain_exclusive_lease(lease_key, timeout: 4.minutes)
expect_next_instance_of(Gitlab::Database::BackgroundMigration::BatchedMigrationRunner) do |instance|
expect(instance).to receive(:run_migration_job).with(migration)
end
expect(worker).to receive(:run_active_migration).and_call_original
worker.perform
end
end
it 'always cleans up the exclusive lease' do
lease = stub_exclusive_lease_taken(lease_key, timeout: 4.minutes)
expect(lease).to receive(:try_obtain).and_return(true)
expect(worker).to receive(:run_active_migration).and_raise(RuntimeError, 'I broke')
expect(lease).to receive(:cancel)
expect { worker.perform }.to raise_error(RuntimeError, 'I broke')
end
end
end
end