Commit c9232087 authored by Michael Kozono's avatar Michael Kozono

Spread out the work a little

parent ee4f7391
...@@ -18,7 +18,7 @@ class DeleteConflictingRedirectRoutes < ActiveRecord::Migration ...@@ -18,7 +18,7 @@ class DeleteConflictingRedirectRoutes < ActiveRecord::Migration
def up def up
say opening_message say opening_message
queue_background_migration_jobs_by_range(Route, MIGRATION) queue_background_migration_jobs_by_range_at_intervals(Route, MIGRATION, 1.minute)
end end
def down def down
......
...@@ -657,7 +657,9 @@ into similar problems in the future (e.g. when new tables are created). ...@@ -657,7 +657,9 @@ into similar problems in the future (e.g. when new tables are created).
end end
end end
# Queues background migration jobs for an entire table, batched by ID range. # Bulk queues background migration jobs for an entire table, batched by ID range.
# "Bulk" meaning many jobs will be pushed at a time for efficiency.
# If you need a delay interval per job, then use `queue_background_migration_jobs_by_range_at_intervals`.
# #
# model_class - The table being iterated over # model_class - The table being iterated over
# job_class_name - The background migration job class as a string # job_class_name - The background migration job class as a string
...@@ -670,7 +672,7 @@ into similar problems in the future (e.g. when new tables are created). ...@@ -670,7 +672,7 @@ into similar problems in the future (e.g. when new tables are created).
# self.table_name = 'routes' # self.table_name = 'routes'
# end # end
# #
# queue_background_migration_jobs_by_range(Route, 'ProcessRoutes') # bulk_queue_background_migration_jobs_by_range(Route, 'ProcessRoutes')
# #
# Where the model_class includes EachBatch, and the background migration exists: # Where the model_class includes EachBatch, and the background migration exists:
# #
...@@ -679,7 +681,7 @@ into similar problems in the future (e.g. when new tables are created). ...@@ -679,7 +681,7 @@ into similar problems in the future (e.g. when new tables are created).
# # do something # # do something
# end # end
# end # end
def queue_background_migration_jobs_by_range(model_class, job_class_name, batch_size = BACKGROUND_MIGRATION_BATCH_SIZE) def bulk_queue_background_migration_jobs_by_range(model_class, job_class_name, batch_size: BACKGROUND_MIGRATION_BATCH_SIZE)
raise "#{model_class} does not have an ID to use for batch ranges" unless model_class.column_names.include?('id') raise "#{model_class} does not have an ID to use for batch ranges" unless model_class.column_names.include?('id')
jobs = [] jobs = []
...@@ -701,6 +703,44 @@ into similar problems in the future (e.g. when new tables are created). ...@@ -701,6 +703,44 @@ into similar problems in the future (e.g. when new tables are created).
BackgroundMigrationWorker.perform_bulk(jobs) unless jobs.empty? BackgroundMigrationWorker.perform_bulk(jobs) unless jobs.empty?
end end
# Queues background migration jobs for an entire table, batched by ID range.
# Jobs are scheduled `delay_interval` apart: the first batch runs one
# `delay_interval` from now, the second after two intervals, and so on.
# If the interval is shorter than a job's run time, jobs may overlap.
#
# model_class - The table being iterated over
# job_class_name - The background migration job class as a string
# delay_interval - The duration between each job's scheduled time (must respond to `to_f`)
# batch_size - The maximum number of rows per job
#
# Example:
#
# class Route < ActiveRecord::Base
# include EachBatch
# self.table_name = 'routes'
# end
#
# queue_background_migration_jobs_by_range_at_intervals(Route, 'ProcessRoutes', 1.minute)
#
# Where the model_class includes EachBatch, and the background migration exists:
#
# class Gitlab::BackgroundMigration::ProcessRoutes
# def perform(start_id, end_id)
# # do something
# end
# end
def queue_background_migration_jobs_by_range_at_intervals(model_class, job_class_name, delay_interval, batch_size: BACKGROUND_MIGRATION_BATCH_SIZE)
  # Batches are expressed as ID ranges, so a table without an `id` column
  # cannot be processed by this helper.
  has_id_column = model_class.column_names.include?('id')
  raise "#{model_class} does not have an ID to use for batch ranges" unless has_id_column

  model_class.each_batch(of: batch_size) do |batch, batch_number|
    # A single aggregate query yields the lowest and highest ID of the batch.
    first_id, last_id = batch.pluck('MIN(id), MAX(id)').first

    # The delay grows with the batch number, so consecutive jobs end up
    # `delay_interval` apart.
    run_after = delay_interval * batch_number

    # Jobs are enqueued one at a time on purpose:
    # `BackgroundMigrationWorker.bulk_perform_in` would give every job the
    # same scheduled time, which defeats spreading the work out over time.
    BackgroundMigrationWorker.perform_in(run_after, job_class_name, [first_id, last_id])
  end
end
end end
end end
end end
...@@ -915,7 +915,7 @@ describe Gitlab::Database::MigrationHelpers do ...@@ -915,7 +915,7 @@ describe Gitlab::Database::MigrationHelpers do
end end
end end
describe '#queue_background_migration_jobs_by_range', :sidekiq do describe '#bulk_queue_background_migration_jobs_by_range', :sidekiq do
context 'when the model has an ID column' do context 'when the model has an ID column' do
let!(:id1) { create(:user).id } let!(:id1) { create(:user).id }
let!(:id2) { create(:user).id } let!(:id2) { create(:user).id }
...@@ -934,7 +934,7 @@ describe Gitlab::Database::MigrationHelpers do ...@@ -934,7 +934,7 @@ describe Gitlab::Database::MigrationHelpers do
it 'queues jobs correctly' do it 'queues jobs correctly' do
Sidekiq::Testing.fake! do Sidekiq::Testing.fake! do
model.queue_background_migration_jobs_by_range(User, 'FooJob', 2) model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id2]]) expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id2]])
expect(BackgroundMigrationWorker.jobs[1]['args']).to eq(['FooJob', [id3, id3]]) expect(BackgroundMigrationWorker.jobs[1]['args']).to eq(['FooJob', [id3, id3]])
...@@ -945,14 +945,14 @@ describe Gitlab::Database::MigrationHelpers do ...@@ -945,14 +945,14 @@ describe Gitlab::Database::MigrationHelpers do
expect(BackgroundMigrationWorker).to receive(:perform_bulk).with([['FooJob', [id1, id2]]]) expect(BackgroundMigrationWorker).to receive(:perform_bulk).with([['FooJob', [id1, id2]]])
expect(BackgroundMigrationWorker).to receive(:perform_bulk).with([['FooJob', [id3, id3]]]) expect(BackgroundMigrationWorker).to receive(:perform_bulk).with([['FooJob', [id3, id3]]])
model.queue_background_migration_jobs_by_range(User, 'FooJob', 2) model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
end end
end end
context 'with not enough rows to bulk queue jobs more than once' do context 'with not enough rows to bulk queue jobs more than once' do
it 'queues jobs correctly' do it 'queues jobs correctly' do
Sidekiq::Testing.fake! do Sidekiq::Testing.fake! do
model.queue_background_migration_jobs_by_range(User, 'FooJob', 2) model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id2]]) expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id2]])
expect(BackgroundMigrationWorker.jobs[1]['args']).to eq(['FooJob', [id3, id3]]) expect(BackgroundMigrationWorker.jobs[1]['args']).to eq(['FooJob', [id3, id3]])
...@@ -963,7 +963,67 @@ describe Gitlab::Database::MigrationHelpers do ...@@ -963,7 +963,67 @@ describe Gitlab::Database::MigrationHelpers do
expect(BackgroundMigrationWorker).to receive(:perform_bulk).with([['FooJob', [id1, id2]], expect(BackgroundMigrationWorker).to receive(:perform_bulk).with([['FooJob', [id1, id2]],
['FooJob', [id3, id3]]]) ['FooJob', [id3, id3]]])
model.queue_background_migration_jobs_by_range(User, 'FooJob', 2) model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
end
end
# Exercises the helper when `batch_size` is left at its default.
context 'without specifying batch_size' do
it 'queues jobs correctly' do
# Fake mode records enqueued jobs in `BackgroundMigrationWorker.jobs`
# so they can be inspected instead of executed.
Sidekiq::Testing.fake! do
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob')
# A single job spanning id1..id3 is expected, i.e. the default batch size
# is presumably large enough to hold all three users in one batch.
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id3]])
end
end
end
end
# The helper batches by ID range, so it is expected to refuse models that
# have no `id` column.
context "when the model doesn't have an ID column" do
it 'raises error (for now)' do
expect do
# NOTE(review): ProjectAuthorization is presumably a table without an `id`
# column — confirm against the schema.
model.bulk_queue_background_migration_jobs_by_range(ProjectAuthorization, 'FooJob')
end.to raise_error(StandardError, /does not have an ID/)
end
end
end
describe '#queue_background_migration_jobs_by_range_at_intervals', :sidekiq do
context 'when the model has an ID column' do
let!(:id1) { create(:user).id }
let!(:id2) { create(:user).id }
let!(:id3) { create(:user).id }
# Freeze the clock for each example so the scheduled `at` timestamps can be
# compared exactly against `from_now` values.
around do |example|
Timecop.freeze { example.run }
end
# The helper under test iterates with `each_batch`, so mix EachBatch into
# the User model that the examples pass in.
before do
User.class_eval do
include EachBatch
end
end
# With three users and a batch size of 2, two jobs are expected:
# one for [id1, id2] and one for the remaining [id3, id3].
context 'with batch_size option' do
it 'queues jobs correctly' do
# Fake mode records enqueued jobs in `BackgroundMigrationWorker.jobs`
# so they can be inspected instead of executed.
Sidekiq::Testing.fake! do
model.queue_background_migration_jobs_by_range_at_intervals(User, 'FooJob', 10.seconds, batch_size: 2)
# The first job is scheduled one interval (10s) from now ...
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id2]])
expect(BackgroundMigrationWorker.jobs[0]['at']).to eq(10.seconds.from_now.to_f)
# ... and the second job two intervals (20s) from now.
expect(BackgroundMigrationWorker.jobs[1]['args']).to eq(['FooJob', [id3, id3]])
expect(BackgroundMigrationWorker.jobs[1]['at']).to eq(20.seconds.from_now.to_f)
end
end
end
context 'without batch_size option' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.queue_background_migration_jobs_by_range_at_intervals(User, 'FooJob', 10.seconds)
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id3]])
expect(BackgroundMigrationWorker.jobs[0]['at']).to eq(10.seconds.from_now.to_f)
end
end end
end end
end end
...@@ -971,7 +1031,7 @@ describe Gitlab::Database::MigrationHelpers do ...@@ -971,7 +1031,7 @@ describe Gitlab::Database::MigrationHelpers do
context "when the model doesn't have an ID column" do context "when the model doesn't have an ID column" do
it 'raises error (for now)' do it 'raises error (for now)' do
expect do expect do
model.queue_background_migration_jobs_by_range(ProjectAuthorization, 'FooJob') model.queue_background_migration_jobs_by_range_at_intervals(ProjectAuthorization, 'FooJob', 10.seconds)
end.to raise_error(StandardError, /does not have an ID/) end.to raise_error(StandardError, /does not have an ID/)
end end
end end
......
...@@ -5,6 +5,10 @@ describe DeleteConflictingRedirectRoutes, :migration, :sidekiq do ...@@ -5,6 +5,10 @@ describe DeleteConflictingRedirectRoutes, :migration, :sidekiq do
let!(:redirect_routes) { table(:redirect_routes) } let!(:redirect_routes) { table(:redirect_routes) }
let!(:routes) { table(:routes) } let!(:routes) { table(:routes) }
# Freeze the clock for each example so the jobs' scheduled `at` timestamps
# can be compared exactly against `from_now` values.
around do |example|
Timecop.freeze { example.run }
end
before do before do
stub_const("Gitlab::Database::MigrationHelpers::BACKGROUND_MIGRATION_BATCH_SIZE", 2) stub_const("Gitlab::Database::MigrationHelpers::BACKGROUND_MIGRATION_BATCH_SIZE", 2)
stub_const("Gitlab::Database::MigrationHelpers::BACKGROUND_MIGRATION_JOB_BUFFER_SIZE", 2) stub_const("Gitlab::Database::MigrationHelpers::BACKGROUND_MIGRATION_JOB_BUFFER_SIZE", 2)
...@@ -34,8 +38,11 @@ describe DeleteConflictingRedirectRoutes, :migration, :sidekiq do ...@@ -34,8 +38,11 @@ describe DeleteConflictingRedirectRoutes, :migration, :sidekiq do
migrate! migrate!
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq([described_class::MIGRATION, [1, 2]]) expect(BackgroundMigrationWorker.jobs[0]['args']).to eq([described_class::MIGRATION, [1, 2]])
expect(BackgroundMigrationWorker.jobs[0]['at']).to eq(1.minute.from_now.to_f)
expect(BackgroundMigrationWorker.jobs[1]['args']).to eq([described_class::MIGRATION, [3, 4]]) expect(BackgroundMigrationWorker.jobs[1]['args']).to eq([described_class::MIGRATION, [3, 4]])
expect(BackgroundMigrationWorker.jobs[1]['at']).to eq(2.minutes.from_now.to_f)
expect(BackgroundMigrationWorker.jobs[2]['args']).to eq([described_class::MIGRATION, [5, 5]]) expect(BackgroundMigrationWorker.jobs[2]['args']).to eq([described_class::MIGRATION, [5, 5]])
expect(BackgroundMigrationWorker.jobs[2]['at']).to eq(3.minutes.from_now.to_f)
expect(BackgroundMigrationWorker.jobs.size).to eq 3 expect(BackgroundMigrationWorker.jobs.size).to eq 3
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment