Commit 7bfa86a2 authored by Robert Speicher

Merge branch 'geo-base-scheduler-worker' into 'master'

Geo - Extract a base scheduler worker

See merge request !2425
parents 5a97763e 372510c9
module Geo
class BaseSchedulerWorker
include Sidekiq::Worker
include CronjobQueue
DB_RETRIEVE_BATCH_SIZE = 1000
LEASE_TIMEOUT = 60.minutes
MAX_CAPACITY = 10
RUN_TIME = 60.minutes.to_i
attr_reader :pending_resources, :scheduled_jobs, :start_time
def initialize
@pending_resources = []
@scheduled_jobs = []
end
# The scheduling works as follows:
#
# 1. Load a batch of IDs that we need to schedule (DB_RETRIEVE_BATCH_SIZE) into a pending list.
# 2. Schedule them so that at most MAX_CAPACITY are running at once.
# 3. When a slot frees, schedule another job.
# 4. When we have drained the pending list, load another batch into memory, and schedule the
# remaining jobs, excluding ones in progress.
# 5. Quit when we have scheduled all jobs or exceeded RUN_TIME.
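#
# An illustrative pass, assuming DB_RETRIEVE_BATCH_SIZE = 5 and MAX_CAPACITY = 2
# (the values the specs further down stub in):
#   - pending (0) < capacity, so load 5 IDs; the refreshed queue is not under
#     capacity, so this is not the last batch; schedule 2 jobs, leaving 3 pending.
#   - completed jobs free slots on later ticks, draining the 3 pending IDs.
#   - once pending falls below capacity again, reload (excluding in-flight IDs);
#     if the refreshed queue is still under capacity it is the last batch:
#     schedule the remainder and exit the loop.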
def perform
return unless Gitlab::Geo.geo_database_configured?
return unless Gitlab::Geo.secondary?
log_info('Started scheduler')
@start_time = Time.now
# Prevent multiple Sidekiq workers from attempting to schedule jobs
try_obtain_lease do
loop do
break unless node_enabled?
update_jobs_in_progress
@pending_resources = load_pending_resources if reload_queue?
# If we are still under the limit after refreshing our DB, we can end
# after scheduling the remaining transfers.
last_batch = reload_queue?
break if over_time?
break unless resources_remain?
schedule_jobs
break if last_batch
break unless renew_lease!
sleep(1)
end
log_info('Finished scheduler')
end
end
private
def db_retrieve_batch_size
DB_RETRIEVE_BATCH_SIZE
end
def lease_key
@lease_key ||= self.class.name.underscore
end
def lease_timeout
LEASE_TIMEOUT
end
def max_capacity
MAX_CAPACITY
end
def run_time
RUN_TIME
end
def reload_queue?
pending_resources.size < max_capacity
end
def resources_remain?
pending_resources.size > 0
end
def over_time?
Time.now - start_time >= run_time
end
def interleave(first, second)
if first.length >= second.length
first.zip(second)
else
second.zip(first).map(&:reverse)
end.flatten(1).uniq.compact.take(db_retrieve_batch_size)
end
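# Illustrative behavior (values invented for the example):
#   interleave([1, 2, 3], [10, 20]) # => [1, 10, 2, 20, 3]
#   interleave([1], [10, 20, 30])   # => [1, 10, 20, 30]
# Zipping the longer list over the shorter one (reversing the pairs when the
# second is longer) keeps a fair alternation between the two sources, while
# `uniq`/`compact` drop duplicates and the nil padding that `zip` introduces.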
def update_jobs_in_progress
status = Gitlab::SidekiqStatus.job_status(scheduled_job_ids)
# SidekiqStatus returns an array of booleans: true if the job is still running, false otherwise.
# For each entry, first use `zip` to pair { job_id: 123, id: 10 } with its status -> [ { job_id: 123, id: 10 }, bool ],
# then filter out the jobs that have completed.
@scheduled_jobs = @scheduled_jobs.zip(status).map { |(job, running)| job if running }.compact
end
def schedule_jobs
num_to_schedule = [max_capacity - scheduled_job_ids.size, pending_resources.size].min
return unless resources_remain?
num_to_schedule.times do
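# Entries may be scalar IDs or [id, type] tuples; the splat below expands a
# tuple into schedule_job's positional arguments.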
job = schedule_job(*pending_resources.shift)
scheduled_jobs << job if job&.fetch(:job_id).present?
end
end
def scheduled_job_ids
scheduled_jobs.map { |data| data[:job_id] }
end
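# Take an exclusive lease so that only one scheduler per worker class runs
# at a time; the loop in #perform renews it on every tick and it is released
# when the block exits.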
def try_obtain_lease
lease = exclusive_lease.try_obtain
unless lease
log_error('Cannot obtain an exclusive lease. There must be another worker already in execution.')
return
end
begin
yield lease
ensure
release_lease(lease)
end
end
def exclusive_lease
@lease ||= Gitlab::ExclusiveLease.new(lease_key, timeout: lease_timeout)
end
def renew_lease!
exclusive_lease.renew
end
def release_lease(uuid)
Gitlab::ExclusiveLease.cancel(lease_key, uuid)
end
def node_enabled?
# Only check every minute to avoid polling the DB excessively
unless @last_enabled_check.present? && @last_enabled_check > 1.minute.ago
@last_enabled_check = Time.now
@current_node_enabled = nil
end
@current_node_enabled ||= Gitlab::Geo.current_node_enabled?
end
def log_info(message)
Gitlab::Geo::Logger.info(class: self.class.name, message: message)
end
def log_error(message)
Gitlab::Geo::Logger.error(class: self.class.name, message: message)
end
end
end
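For reference, a concrete worker only has to implement `schedule_job` and `load_pending_resources`, and may override hooks such as `max_capacity` or `db_retrieve_batch_size`. A minimal sketch, using a hypothetical `Geo::WidgetSyncWorker` and `WidgetWorker` (both names invented for illustration):

module Geo
  class WidgetSyncWorker < Geo::BaseSchedulerWorker
    private

    # Receives one entry shifted off pending_resources (splatted by the base
    # class). Returns a hash containing :job_id so the base class can track
    # the job, or nil when nothing was enqueued.
    def schedule_job(widget_id)
      job_id = WidgetWorker.perform_async(widget_id) # hypothetical Sidekiq worker
      { id: widget_id, job_id: job_id } if job_id
    end

    # Returns up to db_retrieve_batch_size identifiers that still need work.
    def load_pending_resources
      Widget.pending.limit(db_retrieve_batch_size).pluck(:id) # hypothetical model scope
    end
  end
end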
module Geo
class FileDownloadDispatchWorker < Geo::BaseSchedulerWorker
private
def schedule_job(object_db_id, object_type)
job_id = GeoFileDownloadWorker.perform_async(object_type, object_db_id)
{ id: object_db_id, type: object_type, job_id: job_id } if job_id
end
def load_pending_resources
lfs_object_ids = find_lfs_object_ids(db_retrieve_batch_size)
objects_ids = find_object_ids(db_retrieve_batch_size)
interleave(lfs_object_ids, objects_ids)
end
def find_object_ids(limit)
downloaded_ids = find_downloaded_ids([:attachment, :avatar, :file])
Upload.where.not(id: downloaded_ids)
.order(created_at: :desc)
.limit(limit)
.pluck(:id, :uploader)
.map { |id, uploader| [id, uploader.sub(/Uploader\z/, '').downcase] }
end
def find_lfs_object_ids(limit)
downloaded_ids = find_downloaded_ids([:lfs])
LfsObject.where.not(id: downloaded_ids)
.order(created_at: :desc)
.limit(limit)
.pluck(:id)
.map { |id| [id, :lfs] }
end
def find_downloaded_ids(file_types)
downloaded_ids = Geo::FileRegistry.where(file_type: file_types).pluck(:file_id)
(downloaded_ids + scheduled_file_ids(file_types)).uniq
end
def scheduled_file_ids(types)
scheduled_jobs.select { |data| types.include?(data[:type]) }.map { |data| data[:id] }
end
end
end
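Concretely (with illustrative values), `load_pending_resources` here yields interleaved tuples such as [[14, :lfs], [9, "avatar"], [15, :lfs]]; each tuple that enqueues successfully becomes an entry like { id: 14, type: :lfs, job_id: "0123abcd" } in `scheduled_jobs`, which `scheduled_file_ids` consults so that later batches skip files whose downloads are still in flight.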
@@ -16,8 +16,13 @@ module Geo
Geo::RepositorySyncService.new(project).execute if sync_repository?(registry, scheduled_time)
Geo::WikiSyncService.new(project).execute if sync_wiki?(registry, scheduled_time)
-rescue ActiveRecord::RecordNotFound
-  logger.error("Couldn't find project with ID=#{project_id}, skipping syncing")
+rescue ActiveRecord::RecordNotFound => e
+  Gitlab::Geo::Logger.error(
+    class: self.class.name,
+    message: "Couldn't find project, skipping syncing",
+    project_id: project_id,
+    error: e
+  )
end
private
...
module Geo
class RepositorySyncWorker < Geo::BaseSchedulerWorker
BACKOFF_DELAY = 5.minutes
MAX_CAPACITY = 25
private
def max_capacity
MAX_CAPACITY
end
def schedule_job(project_id)
job_id = Geo::ProjectSyncWorker.perform_in(BACKOFF_DELAY, project_id, Time.now)
{ id: project_id, job_id: job_id } if job_id
end
def load_pending_resources
project_ids_not_synced = find_project_ids_not_synced
project_ids_updated_recently = find_project_ids_updated_recently
interleave(project_ids_not_synced, project_ids_updated_recently)
end
def find_project_ids_not_synced
Project.where.not(id: Geo::ProjectRegistry.synced.pluck(:project_id))
.order(last_repository_updated_at: :desc)
.limit(db_retrieve_batch_size)
.pluck(:id)
end
def find_project_ids_updated_recently
Geo::ProjectRegistry.dirty
.order(Gitlab::Database.nulls_first_order(:last_repository_synced_at, :desc))
.limit(db_retrieve_batch_size)
.pluck(:project_id)
end
end
end
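Here `interleave` alternates never-synced project IDs with recently-updated ones, so a backlog of brand-new projects cannot starve re-syncs of dirty repositories, and vice versa.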
class GeoFileDownloadDispatchWorker
include Sidekiq::Worker
include CronjobQueue
LEASE_KEY = 'geo_file_download_dispatch_worker'.freeze
LEASE_TIMEOUT = 8.hours.freeze
RUN_TIME = 60.minutes.to_i.freeze
DB_RETRIEVE_BATCH = 1000.freeze
MAX_CONCURRENT_DOWNLOADS = 10.freeze
def initialize
@pending_downloads = []
@scheduled_jobs = []
end
# The scheduling works as follows:
#
# 1. Load a batch of IDs that we need to download from the primary (DB_RETRIEVE_BATCH) into a pending list.
# 2. Schedule them so that at most MAX_CONCURRENT_DOWNLOADS are running at once.
# 3. When a slot frees, schedule another download.
# 4. When we have drained the pending list, load another batch into memory, and schedule the remaining
# files, excluding ones in progress.
# 5. Quit when we have scheduled all downloads or exceeded an hour.
def perform
return unless Gitlab::Geo.geo_database_configured?
return unless Gitlab::Geo.secondary?
@start_time = Time.now
# Prevent multiple Sidekiq workers from attempting to schedule downloads
try_obtain_lease do
loop do
break unless node_enabled?
update_jobs_in_progress
load_pending_downloads if reload_queue?
# If we are still under the limit after refreshing our DB, we can end
# after scheduling the remaining transfers.
last_batch = reload_queue?
break if over_time?
break unless downloads_remain?
schedule_downloads
break if last_batch
sleep(1)
end
end
end
private
def reload_queue?
@pending_downloads.size < MAX_CONCURRENT_DOWNLOADS
end
def over_time?
Time.now - @start_time >= RUN_TIME
end
def load_pending_downloads
lfs_object_ids = find_lfs_object_ids(DB_RETRIEVE_BATCH)
objects_ids = find_object_ids(DB_RETRIEVE_BATCH)
@pending_downloads = interleave(lfs_object_ids, objects_ids)
end
def interleave(first, second)
if first.length >= second.length
first.zip(second)
else
second.zip(first).map(&:reverse)
end.flatten(1).compact.take(DB_RETRIEVE_BATCH)
end
def downloads_remain?
@pending_downloads.size
end
def schedule_downloads
num_to_schedule = [MAX_CONCURRENT_DOWNLOADS - job_ids.size, @pending_downloads.size].min
return unless downloads_remain?
num_to_schedule.times do
object_db_id, object_type = @pending_downloads.shift
job_id = GeoFileDownloadWorker.perform_async(object_type, object_db_id)
if job_id
@scheduled_jobs << { id: object_db_id, type: object_type, job_id: job_id }
end
end
end
def find_object_ids(limit)
downloaded_ids = find_downloaded_ids([:attachment, :avatar, :file])
Upload.where.not(id: downloaded_ids)
.order(created_at: :desc)
.limit(limit)
.pluck(:id, :uploader)
.map { |id, uploader| [id, uploader.sub(/Uploader\z/, '').downcase] }
end
def find_lfs_object_ids(limit)
downloaded_ids = find_downloaded_ids([:lfs])
LfsObject.where.not(id: downloaded_ids)
.order(created_at: :desc)
.limit(limit)
.pluck(:id)
.map { |id| [id, :lfs] }
end
def find_downloaded_ids(file_types)
downloaded_ids = Geo::FileRegistry.where(file_type: file_types).pluck(:file_id)
(downloaded_ids + scheduled_ids(file_types)).uniq
end
def update_jobs_in_progress
status = Gitlab::SidekiqStatus.job_status(job_ids)
# SidekiqStatus returns an array of booleans: true if the job is still running, false otherwise.
# For each entry, first use `zip` to pair { job_id: 123, id: 10 } with its status -> [ { job_id: 123, id: 10 }, bool ],
# then filter out the jobs that have completed.
@scheduled_jobs = @scheduled_jobs.zip(status).map { |(job, running)| job if running }.compact
end
def job_ids
@scheduled_jobs.map { |data| data[:job_id] }
end
def scheduled_ids(types)
@scheduled_jobs.select { |data| types.include?(data[:type]) }.map { |data| data[:id] }
end
def try_obtain_lease
uuid = Gitlab::ExclusiveLease.new(LEASE_KEY, timeout: LEASE_TIMEOUT).try_obtain
return unless uuid
yield
release_lease(uuid)
end
def release_lease(uuid)
Gitlab::ExclusiveLease.cancel(LEASE_KEY, uuid)
end
def node_enabled?
# Only check every minute to avoid polling the DB excessively
unless @last_enabled_check.present? && @last_enabled_check > 1.minute.ago
@last_enabled_check = Time.now
@current_node_enabled = nil
end
@current_node_enabled ||= Gitlab::Geo.current_node_enabled?
end
end
class GeoRepositorySyncWorker
include Sidekiq::Worker
include CronjobQueue
LEASE_KEY = 'geo_repository_sync_worker'.freeze
LEASE_TIMEOUT = 10.minutes
BATCH_SIZE = 1000
BACKOFF_DELAY = 5.minutes
MAX_CAPACITY = 25
RUN_TIME = 60.minutes.to_i
def initialize
@pending_projects = []
@scheduled_jobs = []
end
def perform
return unless Gitlab::Geo.geo_database_configured?
return unless Gitlab::Geo.primary_node.present?
logger.info "Started Geo repository sync scheduler"
@start_time = Time.now
# Prevent multiple Sidekiq workers from attempting to schedule projects synchronization
try_obtain_lease do
loop do
break unless node_enabled?
update_jobs_in_progress
load_pending_projects if reload_queue?
# If we are still under the limit after refreshing our DB, we can end
# after scheduling the remaining transfers.
last_batch = reload_queue?
break if over_time?
break unless projects_remain?
schedule_jobs
break if last_batch
break unless renew_lease!
sleep(1)
end
logger.info "Finished Geo repository sync scheduler"
end
end
private
def reload_queue?
@pending_projects.size < MAX_CAPACITY
end
def projects_remain?
@pending_projects.size > 0
end
def over_time?
Time.now - @start_time >= RUN_TIME
end
def load_pending_projects
project_ids_not_synced = find_project_ids_not_synced
project_ids_updated_recently = find_project_ids_updated_recently
@pending_projects = interleave(project_ids_not_synced, project_ids_updated_recently)
end
def find_project_ids_not_synced
Project.where.not(id: Geo::ProjectRegistry.synced.pluck(:project_id))
.order(last_repository_updated_at: :desc)
.limit(BATCH_SIZE)
.pluck(:id)
end
def find_project_ids_updated_recently
Geo::ProjectRegistry.dirty
.order(Gitlab::Database.nulls_first_order(:last_repository_synced_at, :desc))
.limit(BATCH_SIZE)
.pluck(:project_id)
end
def interleave(first, second)
if first.length >= second.length
first.zip(second)
else
second.zip(first).map(&:reverse)
end.flatten(1).uniq.compact.take(BATCH_SIZE)
end
def schedule_jobs
num_to_schedule = [MAX_CAPACITY - scheduled_job_ids.size, @pending_projects.size].min
return unless projects_remain?
num_to_schedule.times do
project_id = @pending_projects.shift
job_id = Geo::ProjectSyncWorker.perform_in(BACKOFF_DELAY, project_id, Time.now)
@scheduled_jobs << { id: project_id, job_id: job_id } if job_id
end
end
def scheduled_job_ids
@scheduled_jobs.map { |data| data[:job_id] }
end
def update_jobs_in_progress
status = Gitlab::SidekiqStatus.job_status(scheduled_job_ids)
# SidekiqStatus returns an array of booleans: true if the job is still running, false otherwise.
# For each entry, first use `zip` to pair { job_id: 123, id: 10 } with its status -> [ { job_id: 123, id: 10 }, bool ],
# then filter out the jobs that have completed.
@scheduled_jobs = @scheduled_jobs.zip(status).map { |(job, running)| job if running }.compact
end
def try_obtain_lease
lease = exclusive_lease.try_obtain
unless lease
logger.info "Cannot obtain an exclusive lease. There must be another worker already in execution."
return
end
begin
yield lease
ensure
release_lease(lease)
end
end
def exclusive_lease
@lease ||= Gitlab::ExclusiveLease.new(LEASE_KEY, timeout: LEASE_TIMEOUT)
end
def renew_lease!
exclusive_lease.renew
end
def release_lease(uuid)
Gitlab::ExclusiveLease.cancel(LEASE_KEY, uuid)
end
def node_enabled?
# Only check every minute to avoid polling the DB excessively
unless @last_enabled_check.present? && @last_enabled_check > 1.minute.ago
@last_enabled_check = Time.now
@current_node_enabled = nil
end
@current_node_enabled ||= Gitlab::Geo.current_node_enabled?
end
end
@@ -409,10 +409,10 @@ Settings.cron_jobs['geo_bulk_notify_worker']['cron'] ||= '*/10 * * * * *'
Settings.cron_jobs['geo_bulk_notify_worker']['job_class'] ||= 'GeoBulkNotifyWorker'
Settings.cron_jobs['geo_repository_sync_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_repository_sync_worker']['cron'] ||= '*/5 * * * *'
-Settings.cron_jobs['geo_repository_sync_worker']['job_class'] ||= 'GeoRepositorySyncWorker'
+Settings.cron_jobs['geo_repository_sync_worker']['job_class'] ||= 'Geo::RepositorySyncWorker'
Settings.cron_jobs['geo_file_download_dispatch_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_file_download_dispatch_worker']['cron'] ||= '5 * * * *'
-Settings.cron_jobs['geo_file_download_dispatch_worker']['job_class'] ||= 'GeoFileDownloadDispatchWorker'
+Settings.cron_jobs['geo_file_download_dispatch_worker']['job_class'] ||= 'Geo::FileDownloadDispatchWorker'
Settings.cron_jobs['import_export_project_cleanup_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['import_export_project_cleanup_worker']['cron'] ||= '0 * * * *'
Settings.cron_jobs['import_export_project_cleanup_worker']['job_class'] = 'ImportExportProjectCleanupWorker'
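(Cron reminder: `*/5 * * * *` fires every five minutes, while `5 * * * *` fires once an hour at minute five.)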
...
@@ -41,7 +41,7 @@ module Gitlab
def self.current_node_enabled?
# No caching of the enabled! If we cache it and an admin disables
-# this node, an active GeoRepositorySyncWorker would keep going for up
+# this node, an active Geo::RepositorySyncWorker would keep going for up
# to max run time after the node was disabled.
Gitlab::Geo.current_node.reload.enabled?
end
...
require 'spec_helper'
-describe GeoFileDownloadDispatchWorker do
+describe Geo::FileDownloadDispatchWorker do
before do
@primary = create(:geo_node, :primary, host: 'primary-geo-node')
@secondary = create(:geo_node, :current)
allow(Gitlab::Geo).to receive(:secondary?).and_return(true)
allow_any_instance_of(Gitlab::ExclusiveLease)
.to receive(:try_obtain).and_return(true)
allow_any_instance_of(Gitlab::ExclusiveLease)
.to receive(:renew).and_return(true)
WebMock.stub_request(:get, /primary-geo-node/).to_return(status: 200, body: "", headers: {})
end
@@ -48,8 +50,8 @@ describe GeoFileDownloadDispatchWorker do
# 1. A total of 8 files in the queue, and we can load a maximum of 5 and send 2 at a time.
# 2. We send 2, wait for 1 to finish, and then send again.
it 'attempts to load a new batch without pending downloads' do
-stub_const('GeoFileDownloadDispatchWorker::DB_RETRIEVE_BATCH', 5)
-stub_const('GeoFileDownloadDispatchWorker::MAX_CONCURRENT_DOWNLOADS', 2)
+stub_const('Geo::BaseSchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 5)
+stub_const('Geo::BaseSchedulerWorker::MAX_CAPACITY', 2)
avatar = fixture_file_upload(Rails.root.join('spec/fixtures/dk.png'))
create_list(:lfs_object, 2, :with_file)
@@ -65,7 +67,7 @@ describe GeoFileDownloadDispatchWorker do
# 2. 4 get sent out, 1 remains. This triggers another reload, which loads in the remaining 4.
# 3. Since the second reload filled the pipe with 4, we need to do a final reload to ensure
# zero are left.
-expect(subject).to receive(:load_pending_downloads).exactly(3).times.and_call_original
+expect(subject).to receive(:load_pending_resources).exactly(3).times.and_call_original
Sidekiq::Testing.inline! do
subject.perform
...
require 'spec_helper'
-describe GeoRepositorySyncWorker do
+describe Geo::RepositorySyncWorker do
let!(:primary) { create(:geo_node, :primary, host: 'primary-geo-node') }
let!(:secondary) { create(:geo_node, :current) }
let!(:project_1) { create(:empty_project) }
@@ -11,6 +11,7 @@ describe GeoRepositorySyncWorker do
describe '#perform' do
before do
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain) { true }
+allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:renew) { true }
end
it 'performs Geo::ProjectSyncWorker for each project' do
@@ -46,8 +47,8 @@ describe GeoRepositorySyncWorker do
subject.perform
end
-it 'does not perform Geo::ProjectSyncWorker when primary node does not exists' do
-  allow(Gitlab::Geo).to receive(:primary_node) { nil }
+it 'does not perform Geo::ProjectSyncWorker when not running on a secondary' do
+  allow(Gitlab::Geo).to receive(:secondary?) { false }
expect(Geo::ProjectSyncWorker).not_to receive(:perform_in)
...