Limit the number of concurrent projects that can be synchronized

parent 5c0b6314
......@@ -2,33 +2,73 @@ class GeoRepositorySyncWorker
include Sidekiq::Worker
include CronjobQueue
RUN_TIME = 5.minutes.to_i
BATCH_SIZE = 100
LEASE_KEY = 'geo_repository_sync_worker'.freeze
LEASE_TIMEOUT = 8.hours.freeze
BATCH_SIZE = 1000
BACKOFF_DELAY = 5.minutes
MAX_CAPACITY = 25
RUN_TIME = 60.minutes.to_i
def initialize
@pending_projects = []
@scheduled_jobs = []
end
def perform
return unless Gitlab::Geo.secondary_role_enabled?
return unless Gitlab::Geo.primary_node.present?
start_time = Time.now
project_ids_not_synced = find_project_ids_not_synced
project_ids_updated_recently = find_project_ids_updated_recently
project_ids = interleave(project_ids_not_synced, project_ids_updated_recently)
logger.info "Started Geo repository sync scheduler"
logger.info "Started Geo repository syncing for #{project_ids.length} project(s)"
@start_time = Time.now
project_ids.each do |project_id|
break if over_time?(start_time)
break unless node_enabled?
# Prevent multiple Sidekiq workers from attempting to schedule projects synchronization
try_obtain_lease do
loop do
break unless node_enabled?
Geo::ProjectSyncWorker.perform_in(BACKOFF_DELAY, project_id, Time.now)
end
update_jobs_in_progress
load_pending_projects if reload_queue?
# If we are still under the limit after refreshing our DB, we can end
# after scheduling the remaining transfers.
last_batch = reload_queue?
break if over_time?
break unless projects_remain?
schedule_jobs
break if last_batch
sleep(1)
end
logger.info "Finished Geo repository syncing for #{project_ids.length} project(s)"
logger.info "Finished Geo repository sync scheduler"
end
end
private
def reload_queue?
@pending_projects.size < MAX_CAPACITY
end
def projects_remain?
@pending_projects.size
end
def over_time?
Time.now - @start_time >= RUN_TIME
end
def load_pending_projects
project_ids_not_synced = find_project_ids_not_synced
project_ids_updated_recently = find_project_ids_updated_recently
@pending_projects = interleave(project_ids_not_synced, project_ids_updated_recently)
end
def find_project_ids_not_synced
Project.where.not(id: Geo::ProjectRegistry.synced.pluck(:project_id))
.order(last_repository_updated_at: :desc)
......@@ -51,8 +91,45 @@ class GeoRepositorySyncWorker
end.flatten(1).uniq.compact.take(BATCH_SIZE)
end
def over_time?(start_time)
Time.now - start_time >= RUN_TIME
def schedule_jobs
num_to_schedule = [MAX_CAPACITY - scheduled_job_ids.size, @pending_projects.size].min
return unless projects_remain?
num_to_schedule.times do
project_id = @pending_projects.shift
job_id = Geo::ProjectSyncWorker.perform_in(BACKOFF_DELAY, project_id, Time.now)
if job_id
@scheduled_jobs << { id: project_id, job_id: job_id }
end
end
end
def scheduled_job_ids
@scheduled_jobs.map { |data| data[:job_id] }
end
def update_jobs_in_progress
status = Gitlab::SidekiqStatus.job_status(scheduled_job_ids)
# SidekiqStatus returns an array of booleans: true if the job has completed, false otherwise.
# For each entry, first use `zip` to make { job_id: 123, id: 10 } -> [ { job_id: 123, id: 10 }, bool ]
# Next, filter out the jobs that have completed.
@scheduled_jobs = @scheduled_jobs.zip(status).map { |(job, completed)| job if completed }.compact
end
def try_obtain_lease
uuid = Gitlab::ExclusiveLease.new(LEASE_KEY, timeout: LEASE_TIMEOUT).try_obtain
return unless uuid
yield
release_lease(uuid)
end
def release_lease(uuid)
Gitlab::ExclusiveLease.cancel(LEASE_KEY, uuid)
end
def node_enabled?
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment