Commit c2f72d60 authored by Kamil Trzciński's avatar Kamil Trzciński

Merge branch '322001-poc-for-migrating-pages-to-zip-storage-in-the-background' into 'master'

Migrate GitLab Pages to zip storage in the background

See merge request gitlab-org/gitlab!54578
parents 68a4fcce 08cec96c
...@@ -2,10 +2,8 @@ ...@@ -2,10 +2,8 @@
module Pages module Pages
class MigrateFromLegacyStorageService class MigrateFromLegacyStorageService
def initialize(logger, migration_threads:, batch_size:, ignore_invalid_entries:, mark_projects_as_not_deployed:) def initialize(logger, ignore_invalid_entries:, mark_projects_as_not_deployed:)
@logger = logger @logger = logger
@migration_threads = migration_threads
@batch_size = batch_size
@ignore_invalid_entries = ignore_invalid_entries @ignore_invalid_entries = ignore_invalid_entries
@mark_projects_as_not_deployed = mark_projects_as_not_deployed @mark_projects_as_not_deployed = mark_projects_as_not_deployed
...@@ -14,25 +12,35 @@ module Pages ...@@ -14,25 +12,35 @@ module Pages
@counters_lock = Mutex.new @counters_lock = Mutex.new
end end
def execute def execute_with_threads(threads:, batch_size:)
@queue = SizedQueue.new(1) @queue = SizedQueue.new(1)
threads = start_migration_threads migration_threads = start_migration_threads(threads)
ProjectPagesMetadatum.only_on_legacy_storage.each_batch(of: @batch_size) do |batch| ProjectPagesMetadatum.only_on_legacy_storage.each_batch(of: batch_size) do |batch|
@queue.push(batch) @queue.push(batch)
end end
@queue.close @queue.close
@logger.info("Waiting for threads to finish...") @logger.info(message: "Pages legacy storage migration: Waiting for threads to finish...")
threads.each(&:join) migration_threads.each(&:join)
{ migrated: @migrated, errored: @errored } { migrated: @migrated, errored: @errored }
end end
def start_migration_threads def execute_for_batch(project_ids)
Array.new(@migration_threads) do batch = ProjectPagesMetadatum.only_on_legacy_storage.where(project_id: project_ids) # rubocop: disable CodeReuse/ActiveRecord
process_batch(batch)
{ migrated: @migrated, errored: @errored }
end
private
def start_migration_threads(count)
Array.new(count) do
Thread.new do Thread.new do
while batch = @queue.pop while batch = @queue.pop
Rails.application.executor.wrap do Rails.application.executor.wrap do
...@@ -50,12 +58,12 @@ module Pages ...@@ -50,12 +58,12 @@ module Pages
migrate_project(project) migrate_project(project)
end end
@logger.info("#{@migrated} projects are migrated successfully, #{@errored} projects failed to be migrated") @logger.info(message: "Pages legacy storage migration: batch processed", migrated: @migrated, errored: @errored)
rescue => e rescue => e
# This method should never raise exception otherwise all threads might be killed # This method should never raise exception otherwise all threads might be killed
# and this will result in queue starving (and deadlock) # and this will result in queue starving (and deadlock)
Gitlab::ErrorTracking.track_exception(e) Gitlab::ErrorTracking.track_exception(e)
@logger.error("failed processing a batch: #{e.message}") @logger.error(message: "Pages legacy storage migration: failed processing a batch: #{e.message}")
end end
def migrate_project(project) def migrate_project(project)
...@@ -67,15 +75,15 @@ module Pages ...@@ -67,15 +75,15 @@ module Pages
end end
if result[:status] == :success if result[:status] == :success
@logger.info("project_id: #{project.id} #{project.pages_path} has been migrated in #{time.round(2)} seconds: #{result[:message]}") @logger.info(message: "Pages legacy storage migration: project migrated: #{result[:message]}", project_id: project.id, pages_path: project.pages_path, duration: time.round(2))
@counters_lock.synchronize { @migrated += 1 } @counters_lock.synchronize { @migrated += 1 }
else else
@logger.error("project_id: #{project.id} #{project.pages_path} failed to be migrated in #{time.round(2)} seconds: #{result[:message]}") @logger.error(message: "Pages legacy storage migration: project failed to be migrated: #{result[:message]}", project_id: project.id, pages_path: project.pages_path, duration: time.round(2))
@counters_lock.synchronize { @errored += 1 } @counters_lock.synchronize { @errored += 1 }
end end
rescue => e rescue => e
@counters_lock.synchronize { @errored += 1 } @counters_lock.synchronize { @errored += 1 }
@logger.error("project_id: #{project&.id} #{project&.pages_path} failed to be migrated: #{e.message}") @logger.error(message: "Pages legacy storage migration: project failed to be migrated: #{result[:message]}", project_id: project&.id, pages_path: project&.pages_path)
Gitlab::ErrorTracking.track_exception(e, project_id: project&.id) Gitlab::ErrorTracking.track_exception(e, project_id: project&.id)
end end
end end
......
---
title: Automatically try to migrate gitlab pages to zip storage
merge_request: 54578
author:
type: added
# frozen_string_literal: true
class ScheduleMigratePagesToZipStorage < ActiveRecord::Migration[6.0]
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
MIGRATION = 'MigratePagesToZipStorage'
BATCH_SIZE = 10
BATCH_TIME = 5.minutes
disable_ddl_transaction!
class ProjectPagesMetadatum < ActiveRecord::Base
extend SuppressCompositePrimaryKeyWarning
include EachBatch
self.primary_key = :project_id
self.table_name = 'project_pages_metadata'
self.inheritance_column = :_type_disabled
scope :deployed, -> { where(deployed: true) }
scope :only_on_legacy_storage, -> { deployed.where(pages_deployment_id: nil) }
end
def up
queue_background_migration_jobs_by_range_at_intervals(
ProjectPagesMetadatum.only_on_legacy_storage,
MIGRATION,
BATCH_TIME,
batch_size: BATCH_SIZE,
primary_column_name: :project_id
)
end
end
7c562d43801c18af48dc526dc6574aebd11689b62bad864b107580d341ba64a1
\ No newline at end of file
# frozen_string_literal: true
module Gitlab
module BackgroundMigration
# migrates pages from legacy storage to zip format
# we intentionally use application code here because
# it has a lot of dependencies including models, carrierwave uploaders and service objects
# and copying all or part of this code in the background migration doesn't add much value
# see https://gitlab.com/gitlab-org/gitlab/-/merge_requests/54578 for discussion
class MigratePagesToZipStorage
def perform(start_id, stop_id)
::Pages::MigrateFromLegacyStorageService.new(Gitlab::AppLogger,
ignore_invalid_entries: false,
mark_projects_as_not_deployed: false)
.execute_for_batch(start_id..stop_id)
end
end
end
end
...@@ -9,10 +9,9 @@ namespace :gitlab do ...@@ -9,10 +9,9 @@ namespace :gitlab do
logger.info('Starting to migrate legacy pages storage to zip deployments') logger.info('Starting to migrate legacy pages storage to zip deployments')
result = ::Pages::MigrateFromLegacyStorageService.new(logger, result = ::Pages::MigrateFromLegacyStorageService.new(logger,
migration_threads: migration_threads,
batch_size: batch_size,
ignore_invalid_entries: ignore_invalid_entries, ignore_invalid_entries: ignore_invalid_entries,
mark_projects_as_not_deployed: mark_projects_as_not_deployed).execute mark_projects_as_not_deployed: mark_projects_as_not_deployed)
.execute_with_threads(threads: migration_threads, batch_size: batch_size)
logger.info("A total of #{result[:migrated] + result[:errored]} projects were processed.") logger.info("A total of #{result[:migrated] + result[:errored]} projects were processed.")
logger.info("- The #{result[:migrated]} projects migrated successfully") logger.info("- The #{result[:migrated]} projects migrated successfully")
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::BackgroundMigration::MigratePagesToZipStorage do
let(:namespace) { create(:group) } # rubocop: disable RSpec/FactoriesInMigrationSpecs
let(:migration) { described_class.new }
describe '#perform' do
context 'when there is project to migrate' do
let!(:project) { create_project('project') }
after do
FileUtils.rm_rf(project.pages_path)
end
it 'migrates project to zip storage' do
expect_next_instance_of(::Pages::MigrateFromLegacyStorageService,
anything,
ignore_invalid_entries: false,
mark_projects_as_not_deployed: false) do |service|
expect(service).to receive(:execute_for_batch).with(project.id..project.id).and_call_original
end
migration.perform(project.id, project.id)
expect(project.reload.pages_metadatum.pages_deployment.file.filename).to eq("_migrated.zip")
end
end
end
def create_project(path)
project = create(:project) # rubocop: disable RSpec/FactoriesInMigrationSpecs
project.mark_pages_as_deployed
FileUtils.mkdir_p File.join(project.pages_path, "public")
File.open(File.join(project.pages_path, "public/index.html"), "w") do |f|
f.write("Hello!")
end
project
end
end
# frozen_string_literal: true
require 'spec_helper'
require Rails.root.join('db', 'post_migrate', '20210302150310_schedule_migrate_pages_to_zip_storage.rb')
RSpec.describe ScheduleMigratePagesToZipStorage, :sidekiq_might_not_need_inline, schema: 20201231133921 do
let(:migration_class) { described_class::MIGRATION }
let(:migration_name) { migration_class.to_s.demodulize }
let(:namespaces_table) { table(:namespaces) }
let(:projects_table) { table(:projects) }
let(:metadata_table) { table(:project_pages_metadata) }
let(:deployments_table) { table(:pages_deployments) }
let(:namespace) { namespaces_table.create!(path: "group", name: "group") }
def create_project_metadata(path, deployed, with_deployment)
project = projects_table.create!(path: path, namespace_id: namespace.id)
deployment_id = nil
if with_deployment
deployment_id = deployments_table.create!(project_id: project.id, file_store: 1, file: '1', file_count: 1, file_sha256: '123', size: 1).id
end
metadata_table.create!(project_id: project.id, deployed: deployed, pages_deployment_id: deployment_id)
end
it 'correctly schedules background migrations' do
Sidekiq::Testing.fake! do
freeze_time do
create_project_metadata("not-deployed-project", false, false)
first_id = create_project_metadata("project1", true, false).id
last_id = create_project_metadata("project2", true, false).id
create_project_metadata("project-with-deployment", true, true)
migrate!
expect(migration_name).to be_scheduled_delayed_migration(5.minutes, first_id, last_id)
expect(BackgroundMigrationWorker.jobs.size).to eq(1)
end
end
end
end
...@@ -5,104 +5,133 @@ require 'spec_helper' ...@@ -5,104 +5,133 @@ require 'spec_helper'
RSpec.describe Pages::MigrateFromLegacyStorageService do RSpec.describe Pages::MigrateFromLegacyStorageService do
let(:batch_size) { 10 } let(:batch_size) { 10 }
let(:mark_projects_as_not_deployed) { false } let(:mark_projects_as_not_deployed) { false }
let(:service) { described_class.new(Rails.logger, migration_threads: 3, batch_size: batch_size, ignore_invalid_entries: false, mark_projects_as_not_deployed: mark_projects_as_not_deployed) } let(:service) { described_class.new(Rails.logger, ignore_invalid_entries: false, mark_projects_as_not_deployed: mark_projects_as_not_deployed) }
it 'does not try to migrate pages if pages are not deployed' do shared_examples "migrates projects properly" do
expect(::Pages::MigrateLegacyStorageToDeploymentService).not_to receive(:new) it 'does not try to migrate pages if pages are not deployed' do
expect(::Pages::MigrateLegacyStorageToDeploymentService).not_to receive(:new)
expect(service.execute).to eq(migrated: 0, errored: 0) is_expected.to eq(migrated: 0, errored: 0)
end end
context 'when there is work for multiple threads' do context 'when pages are marked as deployed' do
let(:batch_size) { 2 } # override to force usage of multiple threads let(:project) { create(:project) }
it 'uses multiple threads' do before do
projects = create_list(:project, 20)
projects.each do |project|
project.mark_pages_as_deployed project.mark_pages_as_deployed
end
context 'when pages directory does not exist' do
context 'when mark_projects_as_not_deployed is set' do
let(:mark_projects_as_not_deployed) { true }
it 'counts project as migrated' do
expect_next_instance_of(::Pages::MigrateLegacyStorageToDeploymentService, project, ignore_invalid_entries: false, mark_projects_as_not_deployed: true) do |service|
expect(service).to receive(:execute).and_call_original
end
is_expected.to eq(migrated: 1, errored: 0)
end
end
it 'counts project as errored' do
expect_next_instance_of(::Pages::MigrateLegacyStorageToDeploymentService, project, ignore_invalid_entries: false, mark_projects_as_not_deployed: false) do |service|
expect(service).to receive(:execute).and_call_original
end
FileUtils.mkdir_p File.join(project.pages_path, "public") is_expected.to eq(migrated: 0, errored: 1)
File.open(File.join(project.pages_path, "public/index.html"), "w") do |f|
f.write("Hello!")
end end
end end
threads = Concurrent::Set.new context 'when pages directory exists on disk' do
before do
FileUtils.mkdir_p File.join(project.pages_path, "public")
File.open(File.join(project.pages_path, "public/index.html"), "w") do |f|
f.write("Hello!")
end
end
it 'migrates pages projects without deployments' do
expect_next_instance_of(::Pages::MigrateLegacyStorageToDeploymentService, project, ignore_invalid_entries: false, mark_projects_as_not_deployed: false) do |service|
expect(service).to receive(:execute).and_call_original
end
expect(service).to receive(:migrate_project).exactly(20).times.and_wrap_original do |m, *args| expect(project.pages_metadatum.reload.pages_deployment).to eq(nil)
threads.add(Thread.current) expect(subject).to eq(migrated: 1, errored: 0)
expect(project.pages_metadatum.reload.pages_deployment).to be
end
# sleep to be 100% certain that once thread can't consume all the queue context 'when deployed already exists for the project' do
# it works without it, but I want to avoid making this test flaky before do
sleep(0.01) deployment = create(:pages_deployment, project: project)
project.set_first_pages_deployment!(deployment)
end
m.call(*args) it 'does not try to migrate project' do
end expect(::Pages::MigrateLegacyStorageToDeploymentService).not_to receive(:new)
expect(service.execute).to eq(migrated: 20, errored: 0) is_expected.to eq(migrated: 0, errored: 0)
expect(threads.length).to eq(3) end
end
end
end end
end end
context 'when pages are marked as deployed' do describe '#execute_with_threads' do
let(:project) { create(:project) } subject { service.execute_with_threads(threads: 3, batch_size: batch_size) }
before do include_examples "migrates projects properly"
project.mark_pages_as_deployed
end
context 'when pages directory does not exist' do context 'when there is work for multiple threads' do
context 'when mark_projects_as_not_deployed is set' do let(:batch_size) { 2 } # override to force usage of multiple threads
let(:mark_projects_as_not_deployed) { true }
it 'counts project as migrated' do it 'uses multiple threads' do
expect_next_instance_of(::Pages::MigrateLegacyStorageToDeploymentService, project, ignore_invalid_entries: false, mark_projects_as_not_deployed: true) do |service| projects = create_list(:project, 20)
expect(service).to receive(:execute).and_call_original projects.each do |project|
end project.mark_pages_as_deployed
expect(service.execute).to eq(migrated: 1, errored: 0) FileUtils.mkdir_p File.join(project.pages_path, "public")
File.open(File.join(project.pages_path, "public/index.html"), "w") do |f|
f.write("Hello!")
end
end end
end
it 'counts project as errored' do threads = Concurrent::Set.new
expect_next_instance_of(::Pages::MigrateLegacyStorageToDeploymentService, project, ignore_invalid_entries: false, mark_projects_as_not_deployed: false) do |service|
expect(service).to receive(:execute).and_call_original
end
expect(service.execute).to eq(migrated: 0, errored: 1) expect(service).to receive(:migrate_project).exactly(20).times.and_wrap_original do |m, *args|
end threads.add(Thread.current)
end
context 'when pages directory exists on disk' do # sleep to be 100% certain that once thread can't consume all the queue
before do # it works without it, but I want to avoid making this test flaky
FileUtils.mkdir_p File.join(project.pages_path, "public") sleep(0.01)
File.open(File.join(project.pages_path, "public/index.html"), "w") do |f|
f.write("Hello!")
end
end
it 'migrates pages projects without deployments' do m.call(*args)
expect_next_instance_of(::Pages::MigrateLegacyStorageToDeploymentService, project, ignore_invalid_entries: false, mark_projects_as_not_deployed: false) do |service|
expect(service).to receive(:execute).and_call_original
end end
expect do is_expected.to eq(migrated: 20, errored: 0)
expect(service.execute).to eq(migrated: 1, errored: 0) expect(threads.length).to eq(3)
end.to change { project.pages_metadatum.reload.pages_deployment }.from(nil)
end end
end
end
context 'when deployed already exists for the project' do describe "#execute_for_batch" do
before do subject { service.execute_for_batch(Project.ids) }
deployment = create(:pages_deployment, project: project)
project.set_first_pages_deployment!(deployment) include_examples "migrates projects properly"
end
it 'only tries to migrate projects with passed ids' do
projects = create_list(:project, 5)
it 'does not try to migrate project' do projects.each(&:mark_pages_as_deployed)
expect(::Pages::MigrateLegacyStorageToDeploymentService).not_to receive(:new) projects_to_migrate = projects.first(3)
expect(service.execute).to eq(migrated: 0, errored: 0) projects_to_migrate.each do |project|
expect_next_instance_of(::Pages::MigrateLegacyStorageToDeploymentService, project, ignore_invalid_entries: false, mark_projects_as_not_deployed: false) do |service|
expect(service).to receive(:execute).and_call_original
end end
end end
expect(service.execute_for_batch(projects_to_migrate.pluck(:id))).to eq(migrated: 0, errored: 3)
end end
end end
end end
...@@ -12,11 +12,9 @@ RSpec.describe 'gitlab:pages' do ...@@ -12,11 +12,9 @@ RSpec.describe 'gitlab:pages' do
it 'calls migration service' do it 'calls migration service' do
expect_next_instance_of(::Pages::MigrateFromLegacyStorageService, anything, expect_next_instance_of(::Pages::MigrateFromLegacyStorageService, anything,
migration_threads: 3,
batch_size: 10,
ignore_invalid_entries: false, ignore_invalid_entries: false,
mark_projects_as_not_deployed: false) do |service| mark_projects_as_not_deployed: false) do |service|
expect(service).to receive(:execute).and_call_original expect(service).to receive(:execute_with_threads).with(threads: 3, batch_size: 10).and_call_original
end end
subject subject
...@@ -26,11 +24,9 @@ RSpec.describe 'gitlab:pages' do ...@@ -26,11 +24,9 @@ RSpec.describe 'gitlab:pages' do
stub_env('PAGES_MIGRATION_THREADS', '5') stub_env('PAGES_MIGRATION_THREADS', '5')
expect_next_instance_of(::Pages::MigrateFromLegacyStorageService, anything, expect_next_instance_of(::Pages::MigrateFromLegacyStorageService, anything,
migration_threads: 5,
batch_size: 10,
ignore_invalid_entries: false, ignore_invalid_entries: false,
mark_projects_as_not_deployed: false) do |service| mark_projects_as_not_deployed: false) do |service|
expect(service).to receive(:execute).and_call_original expect(service).to receive(:execute_with_threads).with(threads: 5, batch_size: 10).and_call_original
end end
subject subject
...@@ -40,11 +36,9 @@ RSpec.describe 'gitlab:pages' do ...@@ -40,11 +36,9 @@ RSpec.describe 'gitlab:pages' do
stub_env('PAGES_MIGRATION_BATCH_SIZE', '100') stub_env('PAGES_MIGRATION_BATCH_SIZE', '100')
expect_next_instance_of(::Pages::MigrateFromLegacyStorageService, anything, expect_next_instance_of(::Pages::MigrateFromLegacyStorageService, anything,
migration_threads: 3,
batch_size: 100,
ignore_invalid_entries: false, ignore_invalid_entries: false,
mark_projects_as_not_deployed: false) do |service| mark_projects_as_not_deployed: false) do |service|
expect(service).to receive(:execute).and_call_original expect(service).to receive(:execute_with_threads).with(threads: 3, batch_size: 100).and_call_original
end end
subject subject
...@@ -54,11 +48,9 @@ RSpec.describe 'gitlab:pages' do ...@@ -54,11 +48,9 @@ RSpec.describe 'gitlab:pages' do
stub_env('PAGES_MIGRATION_IGNORE_INVALID_ENTRIES', 'true') stub_env('PAGES_MIGRATION_IGNORE_INVALID_ENTRIES', 'true')
expect_next_instance_of(::Pages::MigrateFromLegacyStorageService, anything, expect_next_instance_of(::Pages::MigrateFromLegacyStorageService, anything,
migration_threads: 3,
batch_size: 10,
ignore_invalid_entries: true, ignore_invalid_entries: true,
mark_projects_as_not_deployed: false) do |service| mark_projects_as_not_deployed: false) do |service|
expect(service).to receive(:execute).and_call_original expect(service).to receive(:execute_with_threads).with(threads: 3, batch_size: 10).and_call_original
end end
subject subject
...@@ -68,11 +60,9 @@ RSpec.describe 'gitlab:pages' do ...@@ -68,11 +60,9 @@ RSpec.describe 'gitlab:pages' do
stub_env('PAGES_MIGRATION_MARK_PROJECTS_AS_NOT_DEPLOYED', 'true') stub_env('PAGES_MIGRATION_MARK_PROJECTS_AS_NOT_DEPLOYED', 'true')
expect_next_instance_of(::Pages::MigrateFromLegacyStorageService, anything, expect_next_instance_of(::Pages::MigrateFromLegacyStorageService, anything,
migration_threads: 3,
batch_size: 10,
ignore_invalid_entries: false, ignore_invalid_entries: false,
mark_projects_as_not_deployed: true) do |service| mark_projects_as_not_deployed: true) do |service|
expect(service).to receive(:execute).and_call_original expect(service).to receive(:execute_with_threads).with(threads: 3, batch_size: 10).and_call_original
end end
subject subject
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment