Commit 71cd24bb authored by Nick Thomas's avatar Nick Thomas

Rework Geo::BaseSchedulerWorker to be clearer and log per-loop information

parent 945598aa
...@@ -8,7 +8,7 @@ module Geo ...@@ -8,7 +8,7 @@ module Geo
MAX_CAPACITY = 10 MAX_CAPACITY = 10
RUN_TIME = 60.minutes.to_i RUN_TIME = 60.minutes.to_i
attr_reader :pending_resources, :scheduled_jobs, :start_time attr_reader :pending_resources, :scheduled_jobs, :start_time, :loops
def initialize def initialize
@pending_resources = [] @pending_resources = []
...@@ -27,34 +27,43 @@ module Geo ...@@ -27,34 +27,43 @@ module Geo
return unless Gitlab::Geo.geo_database_configured? return unless Gitlab::Geo.geo_database_configured?
return unless Gitlab::Geo.secondary? return unless Gitlab::Geo.secondary?
log_info('Started scheduler') @start_time = Time.now.utc
@loops = 0
@start_time = Time.now
# Prevent multiple Sidekiq workers from attempting to schedule jobs # Prevent multiple Sidekiq workers from attempting to schedule jobs
try_obtain_lease do try_obtain_lease do
loop do log_info('Started scheduler')
break unless node_enabled? reason = :unknown
update_jobs_in_progress begin
@pending_resources = load_pending_resources if reload_queue? reason = loop do
break :node_disabled unless node_enabled?
# If we are still under the limit after refreshing our DB, we can end
# after scheduling the remaining transfers. update_jobs_in_progress
last_batch = reload_queue? update_pending_resources
break if over_time? break :over_time if over_time?
break unless resources_remain? break :complete unless resources_remain?
schedule_jobs # If we're still under the limit after refreshing from the DB, we
# can end after scheduling the remaining transfers.
break if last_batch last_batch = reload_queue?
break unless renew_lease! schedule_jobs
@loops += 1
sleep(1)
break :last_batch if last_batch
break :lease_lost unless renew_lease!
sleep(1)
end
rescue => err
reason = :error
log_error(err.message)
raise err
ensure
duration = Time.now.utc - start_time
log_info('Finished scheduler', total_loops: loops, duration: duration, reason: reason)
end end
log_info('Finished scheduler')
end end
end end
...@@ -89,7 +98,7 @@ module Geo ...@@ -89,7 +98,7 @@ module Geo
end end
def over_time? def over_time?
Time.now - start_time >= run_time (Time.now.utc - start_time) >= run_time
end end
def interleave(first, second) def interleave(first, second)
...@@ -103,21 +112,28 @@ module Geo ...@@ -103,21 +112,28 @@ module Geo
def update_jobs_in_progress def update_jobs_in_progress
status = Gitlab::SidekiqStatus.job_status(scheduled_job_ids) status = Gitlab::SidekiqStatus.job_status(scheduled_job_ids)
# SidekiqStatus returns an array of booleans: true if the job has completed, false otherwise. # SidekiqStatus returns an array of booleans: true if the job is still running, false otherwise.
# For each entry, first use `zip` to make { job_id: 123, id: 10 } -> [ { job_id: 123, id: 10 }, bool ] # For each entry, first use `zip` to make { job_id: 123, id: 10 } -> [ { job_id: 123, id: 10 }, bool ]
# Next, filter out the jobs that have completed. # Next, filter out the jobs that have completed.
@scheduled_jobs = @scheduled_jobs.zip(status).map { |(job, completed)| job if completed }.compact @scheduled_jobs = @scheduled_jobs.zip(status).map { |(job, running)| job if running }.compact
end
def update_pending_resources
@pending_resources = load_pending_resources if reload_queue?
end end
def schedule_jobs def schedule_jobs
num_to_schedule = [max_capacity - scheduled_job_ids.size, pending_resources.size].min num_to_schedule = [max_capacity - scheduled_job_ids.size, pending_resources.size].min
to_schedule = pending_resources.shift(num_to_schedule)
return unless resources_remain? scheduled = to_schedule.map do |args|
job = schedule_job(*args)
job if job&.fetch(:job_id, nil).present?
end.compact
num_to_schedule.times do scheduled_jobs.concat(scheduled)
job = schedule_job(*pending_resources.shift)
scheduled_jobs << job if job&.fetch(:job_id).present? log_info("Loop #{loops}", enqueued: scheduled.length, pending: pending_resources.length, scheduled: scheduled_jobs.length)
end
end end
def scheduled_job_ids def scheduled_job_ids
...@@ -165,12 +181,14 @@ module Geo ...@@ -165,12 +181,14 @@ module Geo
@current_node_enabled ||= Gitlab::Geo.current_node_enabled? @current_node_enabled ||= Gitlab::Geo.current_node_enabled?
end end
def log_info(message) def log_info(message, extra_args = {})
Gitlab::Geo::Logger.info(class: self.class.name, message: message) args = { class: self.class.name, message: message }.merge(extra_args)
Gitlab::Geo::Logger.info(args)
end end
def log_error(message) def log_error(message, extra_args = {})
Gitlab::Geo::Logger.error(class: self.class.name, message: message) args = { class: self.class.name, message: message }.merge(extra_args)
Gitlab::Geo::Logger.error(args)
end end
end end
end end
---
title: Improve logging output for several Geo background workers
merge_request: 2961
author:
type: other
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment