Commit ee4f7391 authored by Michael Kozono's avatar Michael Kozono

Extract helper for queuing background jobs

parent dbf924c5
......@@ -5,8 +5,6 @@ class DeleteConflictingRedirectRoutes < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
BATCH_SIZE = 1000 # Number of rows to process per job
JOB_BUFFER_SIZE = 1000 # Number of jobs to bulk queue at a time
MIGRATION = 'DeleteConflictingRedirectRoutesRange'.freeze
disable_ddl_transaction!
......@@ -18,11 +16,9 @@ class DeleteConflictingRedirectRoutes < ActiveRecord::Migration
end
def up
jobs = []
say opening_message
queue_background_migration_jobs(Route, MIGRATION)
queue_background_migration_jobs_by_range(Route, MIGRATION)
end
def down
......@@ -36,31 +32,4 @@ class DeleteConflictingRedirectRoutes < ActiveRecord::Migration
https://gitlab.com/gitlab-org/gitlab-ce/merge_requests/13357
MSG
end
def queue_background_migration_jobs(model_class, job_class_name, batch_size = BATCH_SIZE)
jobs = []
model_class.each_batch(of: batch_size) do |relation|
start_id, end_id = relation.pluck('MIN(id), MAX(id)').first
# Note: This conditional will only be true if JOB_BUFFER_SIZE * batch_size < (total number of rows)
if jobs.length >= JOB_BUFFER_SIZE
# We push multiple jobs at a time to reduce the time spent in
# Sidekiq/Redis operations. We're using this buffer based approach so we
# don't need to run additional queries for every range.
bulk_queue_jobs(jobs)
jobs.clear
end
jobs << [job_class_name, [start_id, end_id]]
end
bulk_queue_jobs(jobs) unless jobs.empty?
end
def bulk_queue_jobs(jobs)
say "Queuing #{jobs.size} BackgroundMigrationWorker jobs..."
BackgroundMigrationWorker.perform_bulk(jobs)
end
end
module Gitlab
module Database
module MigrationHelpers
BACKGROUND_MIGRATION_BATCH_SIZE = 1000 # Number of rows to process per job
BACKGROUND_MIGRATION_JOB_BUFFER_SIZE = 1000 # Number of jobs to bulk queue at a time
# Adds `created_at` and `updated_at` columns with timezone information.
#
# This method is an improved version of Rails' built-in method `add_timestamps`.
......@@ -653,6 +656,51 @@ into similar problems in the future (e.g. when new tables are created).
EOF
end
end
# Queues background migration jobs for an entire table, batched by ID range.
#
# model_class - The table being iterated over
# job_class_name - The background migration job class as a string
# batch_size - The maximum number of rows per job
#
# Example:
#
# class Route < ActiveRecord::Base
# include EachBatch
# self.table_name = 'routes'
# end
#
# queue_background_migration_jobs_by_range(Route, 'ProcessRoutes')
#
# Where the model_class includes EachBatch, and the background migration exists:
#
# class Gitlab::BackgroundMigration::ProcessRoutes
# def perform(start_id, end_id)
# # do something
# end
# end
def queue_background_migration_jobs_by_range(model_class, job_class_name, batch_size = BACKGROUND_MIGRATION_BATCH_SIZE)
raise "#{model_class} does not have an ID to use for batch ranges" unless model_class.column_names.include?('id')
jobs = []
model_class.each_batch(of: batch_size) do |relation|
start_id, end_id = relation.pluck('MIN(id), MAX(id)').first
if jobs.length >= BACKGROUND_MIGRATION_JOB_BUFFER_SIZE
# Note: This code path generally only helps with many millions of rows
# We push multiple jobs at a time to reduce the time spent in
# Sidekiq/Redis operations. We're using this buffer based approach so we
# don't need to run additional queries for every range.
BackgroundMigrationWorker.perform_bulk(jobs)
jobs.clear
end
jobs << [job_class_name, [start_id, end_id]]
end
BackgroundMigrationWorker.perform_bulk(jobs) unless jobs.empty?
end
end
end
end
......@@ -914,4 +914,66 @@ describe Gitlab::Database::MigrationHelpers do
.to raise_error(RuntimeError, /Your database user is not allowed/)
end
end
describe '#queue_background_migration_jobs_by_range', :sidekiq do
context 'when the model has an ID column' do
let!(:id1) { create(:user).id }
let!(:id2) { create(:user).id }
let!(:id3) { create(:user).id }
before do
User.class_eval do
include EachBatch
end
end
context 'with enough rows to bulk queue jobs more than once' do
before do
stub_const('Gitlab::Database::MigrationHelpers::BACKGROUND_MIGRATION_JOB_BUFFER_SIZE', 1)
end
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.queue_background_migration_jobs_by_range(User, 'FooJob', 2)
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id2]])
expect(BackgroundMigrationWorker.jobs[1]['args']).to eq(['FooJob', [id3, id3]])
end
end
it 'queues jobs in groups of buffer size 1' do
expect(BackgroundMigrationWorker).to receive(:perform_bulk).with([['FooJob', [id1, id2]]])
expect(BackgroundMigrationWorker).to receive(:perform_bulk).with([['FooJob', [id3, id3]]])
model.queue_background_migration_jobs_by_range(User, 'FooJob', 2)
end
end
context 'with not enough rows to bulk queue jobs more than once' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.queue_background_migration_jobs_by_range(User, 'FooJob', 2)
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id2]])
expect(BackgroundMigrationWorker.jobs[1]['args']).to eq(['FooJob', [id3, id3]])
end
end
it 'queues jobs in bulk all at once (big buffer size)' do
expect(BackgroundMigrationWorker).to receive(:perform_bulk).with([['FooJob', [id1, id2]],
['FooJob', [id3, id3]]])
model.queue_background_migration_jobs_by_range(User, 'FooJob', 2)
end
end
end
context "when the model doesn't have an ID column" do
it 'raises error (for now)' do
expect do
model.queue_background_migration_jobs_by_range(ProjectAuthorization, 'FooJob')
end.to raise_error(StandardError, /does not have an ID/)
end
end
end
end
......@@ -6,8 +6,8 @@ describe DeleteConflictingRedirectRoutes, :migration, :sidekiq do
let!(:routes) { table(:routes) }
before do
stub_const("#{described_class.name}::BATCH_SIZE", 2)
stub_const("#{described_class.name}::JOB_BUFFER_SIZE", 2)
stub_const("Gitlab::Database::MigrationHelpers::BACKGROUND_MIGRATION_BATCH_SIZE", 2)
stub_const("Gitlab::Database::MigrationHelpers::BACKGROUND_MIGRATION_JOB_BUFFER_SIZE", 2)
routes.create!(id: 1, source_id: 1, source_type: 'Namespace', path: 'foo1')
routes.create!(id: 2, source_id: 2, source_type: 'Namespace', path: 'foo2')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment