Commit 9642c3b6 authored by Douwe Maan's avatar Douwe Maan

Merge branch 'fix-geo-log-cursor' into 'master'

Fix Geo log cursor

Closes #2858

See merge request !2353
parents e9bc4c66 7dea6fa2
...@@ -82,7 +82,7 @@ module Gitlab ...@@ -82,7 +82,7 @@ module Gitlab
end end
def handle_repository_update(updated_event) def handle_repository_update(updated_event)
registry = ::Geo::ProjectRegistry.find_or_initialize_by(project_id: project_id) registry = ::Geo::ProjectRegistry.find_or_initialize_by(project_id: updated_event.project_id)
case updated_event.source case updated_event.source
when 'repository' when 'repository'
......
...@@ -28,10 +28,16 @@ module Gitlab ...@@ -28,10 +28,16 @@ module Gitlab
# @return [Integer] id of last replicated event # @return [Integer] id of last replicated event
def self.last_processed def self.last_processed
last = ::Geo::EventLogState.last_processed.try(:id) last = ::Geo::EventLogState.last_processed&.id
return last if last return last if last
::Geo::EventLog.any? ? ::Geo::EventLog.last.id : -1 if ::Geo::EventLog.any?
event_id = ::Geo::EventLog.last.id
save_processed(event_id)
event_id
else
-1
end
end end
# private methods # private methods
......
...@@ -2,10 +2,6 @@ require 'spec_helper' ...@@ -2,10 +2,6 @@ require 'spec_helper'
describe Gitlab::Geo::LogCursor::Daemon, lib: true do describe Gitlab::Geo::LogCursor::Daemon, lib: true do
describe '#run!' do describe '#run!' do
before do
allow(subject).to receive(:exit?) { true }
end
it 'traps signals' do it 'traps signals' do
allow(subject).to receive(:exit?) { true } allow(subject).to receive(:exit?) { true }
expect(subject).to receive(:trap_signals) expect(subject).to receive(:trap_signals)
...@@ -17,10 +13,44 @@ describe Gitlab::Geo::LogCursor::Daemon, lib: true do ...@@ -17,10 +13,44 @@ describe Gitlab::Geo::LogCursor::Daemon, lib: true do
subject { described_class.new(full_scan: true) } subject { described_class.new(full_scan: true) }
it 'executes a full-scan' do it 'executes a full-scan' do
allow(subject).to receive(:exit?) { true }
expect(subject).to receive(:full_scan!) expect(subject).to receive(:full_scan!)
subject.run! subject.run!
end end
end end
context 'when processing a repository updated event' do
let(:event_log) { create(:geo_event_log) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:repository_updated_event) { event_log.repository_updated_event }
before do
allow(subject).to receive(:exit?).and_return(false, true)
end
it 'creates a new project registry if it does not exist' do
expect { subject.run! }.to change(Geo::ProjectRegistry, :count).by(1)
end
it 'sets resync_repository to true if event source is repository' do
repository_updated_event.update_attribute(:source, Geo::RepositoryUpdatedEvent::REPOSITORY)
registry = create(:geo_project_registry, :synced, project: repository_updated_event.project)
subject.run!
expect(registry.reload.resync_repository).to be true
end
it 'sets resync_wiki to true if event source is wiki' do
repository_updated_event.update_attribute(:source, Geo::RepositoryUpdatedEvent::WIKI)
registry = create(:geo_project_registry, :synced, project: repository_updated_event.project)
subject.run!
expect(registry.reload.resync_wiki).to be true
end
end
end end
end end
...@@ -2,20 +2,27 @@ require 'spec_helper' ...@@ -2,20 +2,27 @@ require 'spec_helper'
describe Gitlab::Geo::LogCursor::Events, lib: true do describe Gitlab::Geo::LogCursor::Events, lib: true do
describe '.fetch_in_batches' do describe '.fetch_in_batches' do
let!(:event_log) { create(:geo_event_log) } let!(:event_log_1) { create(:geo_event_log) }
let!(:event_log_2) { create(:geo_event_log) }
before do context 'when no event_log_state exist' do
allow(described_class).to receive(:last_processed) { -1 } it 'does not yield a group of events' do
expect { |b| described_class.fetch_in_batches(&b) }.not_to yield_with_args([event_log_1, event_log_2])
end
end end
context 'when there is already an event_log_state' do
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log_1.id - 1) }
it 'yields a group of events' do it 'yields a group of events' do
expect { |b| described_class.fetch_in_batches(&b) }.to yield_with_args([event_log]) expect { |b| described_class.fetch_in_batches(&b) }.to yield_with_args([event_log_1, event_log_2])
end end
it 'saves processed files after yielding' do it 'saves last event as last processed after yielding' do
expect(described_class).to receive(:save_processed)
described_class.fetch_in_batches { |batch| batch } described_class.fetch_in_batches { |batch| batch }
expect(Geo::EventLogState.last.event_id).to eq(event_log_2.id)
end
end end
it 'skips execution if cannot achieve a lease' do it 'skips execution if cannot achieve a lease' do
...@@ -26,15 +33,15 @@ describe Gitlab::Geo::LogCursor::Events, lib: true do ...@@ -26,15 +33,15 @@ describe Gitlab::Geo::LogCursor::Events, lib: true do
end end
describe '.save_processed' do describe '.save_processed' do
it 'saves a new entry in geo_event_log_state' do it 'creates a new event_log_state when no event_log_state exist' do
expect { described_class.save_processed(1) }.to change(Geo::EventLogState, :count).by(1) expect { described_class.save_processed(1) }.to change(Geo::EventLogState, :count).by(1)
expect(Geo::EventLogState.last.event_id).to eq(1) expect(Geo::EventLogState.last.event_id).to eq(1)
end end
it 'removes older entries from geo_event_log_state' do it 'updates the event_id when there is already an event_log_state' do
create(:geo_event_log_state) create(:geo_event_log_state)
expect { described_class.save_processed(2) }.to change(Geo::EventLogState, :count).by(0) expect { described_class.save_processed(2) }.not_to change(Geo::EventLogState, :count)
expect(Geo::EventLogState.last.event_id).to eq(2) expect(Geo::EventLogState.last.event_id).to eq(2)
end end
end end
...@@ -52,6 +59,11 @@ describe Gitlab::Geo::LogCursor::Events, lib: true do ...@@ -52,6 +59,11 @@ describe Gitlab::Geo::LogCursor::Events, lib: true do
it 'returns last event id' do it 'returns last event id' do
expect(described_class.last_processed).to eq(event_log.id) expect(described_class.last_processed).to eq(event_log.id)
end end
it 'saves last event as the last processed' do
expect { described_class.last_processed }.to change(Geo::EventLogState, :count).by(1)
expect(Geo::EventLogState.last.event_id).to eq(event_log.id)
end
end end
context 'when there is already an event_log_state' do context 'when there is already an event_log_state' do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment