Commit bf98a0f1 authored by Gabriel Mazetto's avatar Gabriel Mazetto

Use an ExclusiveLease to prevent multiple instances of LogCursor to run

parent d08b3f75
...@@ -82,7 +82,7 @@ module Gitlab ...@@ -82,7 +82,7 @@ module Gitlab
end end
def handle_repository_update(updated_event) def handle_repository_update(updated_event)
registry = ::Geo::ProjectRegistry.find_or_create_by(project_id: project_id) registry = ::Geo::ProjectRegistry.find_or_initialize_by(project_id: project_id)
case updated_event.source case updated_event.source
when 'repository' when 'repository'
......
...@@ -5,19 +5,25 @@ module Gitlab ...@@ -5,19 +5,25 @@ module Gitlab
class Events class Events
BATCH_SIZE = 50 BATCH_SIZE = 50
NAMESPACE = 'geo:gitlab'.freeze NAMESPACE = 'geo:gitlab'.freeze
LEASE_TIMEOUT = 5.minutes.freeze
LEASE_KEY = 'geo_log_cursor_processed'.freeze
# fetches up to BATCH_SIZE next events and keep track of batches # fetches up to BATCH_SIZE next events and keep track of batches
def self.fetch_in_batches def self.fetch_in_batches
::Geo::EventLog.where('id > ?', last_processed).find_in_batches(batch_size: BATCH_SIZE) do |batch| try_obtain_lease do
yield batch ::Geo::EventLog.where('id > ?', last_processed).find_in_batches(batch_size: BATCH_SIZE) do |batch|
save_processed(batch.last.id) yield batch
save_processed(batch.last.id)
renew_lease!
end
end end
end end
# saves last replicated event # saves last replicated event
def self.save_processed(event_id) def self.save_processed(event_id)
::Geo::EventLogState.create!(event_id: event_id) event_state = ::Geo::EventLogState.last || ::Geo::EventLogState.new
::Geo::EventLogState.where('event_id < ?', event_id).delete_all event_state.update!(event_id: event_id)
end end
# @return [Integer] id of last replicated event # @return [Integer] id of last replicated event
...@@ -27,6 +33,33 @@ module Gitlab ...@@ -27,6 +33,33 @@ module Gitlab
::Geo::EventLog.any? ? ::Geo::EventLog.last.id : -1 ::Geo::EventLog.any? ? ::Geo::EventLog.last.id : -1
end end
# private methods
def self.try_obtain_lease
lease = exclusive_lease.try_obtain
unless lease
$stdout.puts 'Cannot obtain an exclusive lease. There must be another process already in execution.'
return
end
begin
yield lease
ensure
Gitlab::ExclusiveLease.cancel(LEASE_KEY, lease)
end
end
def self.renew_lease!
exclusive_lease.renew
end
def self.exclusive_lease
@lease ||= Gitlab::ExclusiveLease.new(LEASE_KEY, timeout: LEASE_TIMEOUT)
end
private_class_method :try_obtain_lease, :exclusive_lease
end end
end end
end end
......
...@@ -17,6 +17,12 @@ describe Gitlab::Geo::LogCursor::Events, lib: true do ...@@ -17,6 +17,12 @@ describe Gitlab::Geo::LogCursor::Events, lib: true do
described_class.fetch_in_batches { |batch| batch } described_class.fetch_in_batches { |batch| batch }
end end
it 'skips execution if cannot achieve a lease' do
expect_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain) { }
expect { |b| described_class.fetch_in_batches(&b) }.not_to yield_control
end
end end
describe '.save_processed' do describe '.save_processed' do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment