Commit 761c545d authored by Toon Claes's avatar Toon Claes

Further improve Geo event log gap tracking and add more tests

parent 3abe09f4
---
title: Track the Geo event log gaps in redis and handle them later
merge_request: 6640
author:
type: changed
......@@ -34,40 +34,27 @@ module Gitlab
end
def run_once!
gap_tracking.fill_gaps.each { |event_id| handle_gap_event(event_id) }
gap_tracking.fill_gaps { |event_id| handle_gap_event(event_id) }
# Wrap this with the connection to make it possible to reconnect if
# PGbouncer dies: https://github.com/rails/rails/issues/29189
ActiveRecord::Base.connection_pool.with_connection do
LogCursor::EventLogs.new.fetch_in_batches { |batch, last_id| handle_events(batch, last_id) }
end
gap_tracking.fill!
end
private
def handle_events(batch, previous_batch_last_id)
logger.info("Handling events", first_id: batch.first.id, last_id: batch.last.id)
gap_tracking.previous_id = previous_batch_last_id
batch.each_with_index do |event_log, index|
batch.each do |event_log|
gap_tracking.check!(event_log.id)
handle_signle_event(event_log)
handle_single_event(event_log)
end
end
def handle_gap_event(event_id)
event_log = ::Geo::EventLog.find_by(id: event_id)
return false unless event_log
handle_single_event(event_log)
true
end
def handle_single_event(event_log)
event = event_log.event
......@@ -93,6 +80,15 @@ module Gitlab
raise e
end
def handle_gap_event(event_id)
event_log = ::Geo::EventLog.find_by(id: event_id)
return false unless event_log
handle_single_event(event_log)
true
end
def event_klass_for(event)
event_klass_name = event.class.name.demodulize
current_namespace = self.class.name.deconstantize
......
......@@ -47,7 +47,7 @@ module Gitlab
with_redis do |redis|
expire_time = Time.now.to_i
(previous_id+1 .. current_id-1).each do |gap_id|
((previous_id + 1)..(current_id - 1)).each do |gap_id|
redis.zadd(GEO_LOG_CURSOR_GAPS, expire_time, gap_id)
end
end
......
......@@ -84,6 +84,15 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
daemon.run_once!
end
it 'calls #handle_gap_event for each gap the gap tracking finds' do
allow(daemon.gap_tracking).to receive(:fill_gaps).and_yield(1).and_yield(5)
expect(daemon).to receive(:handle_gap_event).with(1)
expect(daemon).to receive(:handle_gap_event).with(5)
daemon.run_once!
end
end
context 'when node has namespace restrictions' do
......@@ -116,32 +125,28 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
daemon.run_once!
end
it "logs a message if an event was skipped" do
it 'detects when an event was skipped' do
updated_event = create(:geo_repository_updated_event, project: project)
new_event = create(:geo_event_log, id: event_log.id + 2, repository_updated_event: updated_event)
expect(Gitlab::Geo::Logger).to receive(:info)
.with(hash_including(
class: 'Gitlab::Geo::LogCursor::Daemon',
message: 'Event log gap',
previous_event_log_id: event_log.id,
event_log_id: new_event.id))
daemon.run_once!
expect(read_gaps).to eq([event_log.id + 1])
expect(::Geo::EventLogState.last_processed.id).to eq(new_event.id)
end
it 'detects when an event was skipped between batches' do
updated_event = create(:geo_repository_updated_event, project: project)
new_event = create(:geo_event_log, repository_updated_event: updated_event)
# Test that the cursor picks up from the last stored ID
third_event = create(:geo_event_log, id: new_event.id + 3, repository_updated_event: updated_event)
daemon.run_once!
expect(Gitlab::Geo::Logger).to receive(:info)
.with(hash_including(
class: 'Gitlab::Geo::LogCursor::Daemon',
message: 'Event log gap',
previous_event_log_id: new_event.id,
event_log_id: third_event.id))
create(:geo_event_log, id: new_event.id + 3, repository_updated_event: updated_event)
daemon.run_once!
expect(read_gaps).to eq([new_event.id + 1, new_event.id + 2])
end
it "logs a message if an associated event can't be found" do
......@@ -183,4 +188,67 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
end
end
end
describe '#handle_events' do
let(:batch) { create_list(:geo_event_log, 2) }
it 'passes the previous batch id on to gap tracking' do
expect(daemon.gap_tracking).to receive(:previous_id=).with(55).ordered
batch.each do |event_log|
expect(daemon.gap_tracking).to receive(:previous_id=).with(event_log.id).ordered
end
daemon.handle_events(batch, 55)
end
it 'checks for gaps for each id in batch' do
batch.each do |event_log|
expect(daemon.gap_tracking).to receive(:check!).with(event_log.id)
end
daemon.handle_events(batch, 55)
end
it 'handles every single event' do
batch.each do |event_log|
expect(daemon).to receive(:handle_single_event).with(event_log)
end
daemon.handle_events(batch, 55)
end
end
describe '#handle_single_event' do
set(:event_log) { create(:geo_event_log, :updated_event) }
it 'skips execution when no event data is found' do
event_log = build(:geo_event_log)
expect(daemon).not_to receive(:can_replay?)
daemon.handle_single_event(event_log)
end
it 'checks if it can replay the event' do
expect(daemon).to receive(:can_replay?)
daemon.handle_single_event(event_log)
end
it 'processes event when it is replayable' do
allow(daemon).to receive(:can_replay?).and_return(true)
expect(daemon).to receive(:process_event).with(event_log.event, event_log)
daemon.handle_single_event(event_log)
end
end
def read_gaps
gaps = []
Timecop.travel(12.minutes) do
daemon.gap_tracking.fill_gaps { |id| gaps << id }
end
gaps
end
end
......@@ -14,7 +14,7 @@ describe Gitlab::Geo::LogCursor::EventGapTracking, :clean_gitlab_redis_cache do
it 'does nothing when previous id not valid' do
gap_tracking.previous_id = 0
expect(gap_tracking).to_not receive(:gap?)
expect(gap_tracking).not_to receive(:gap?)
gap_tracking.check!(event_id_with_gap)
......@@ -22,7 +22,7 @@ describe Gitlab::Geo::LogCursor::EventGapTracking, :clean_gitlab_redis_cache do
end
it 'does nothing when there is no gap' do
expect(gap_tracking).to_not receive(:track_gap)
expect(gap_tracking).not_to receive(:track_gap)
gap_tracking.check!(previous_event_id + 1)
......@@ -88,7 +88,7 @@ describe Gitlab::Geo::LogCursor::EventGapTracking, :clean_gitlab_redis_cache do
Timecop.freeze do
gap_tracking.track_gap(event_id_with_gap + 3)
expected_gaps = (previous_event_id+1..event_id_with_gap+2).collect { |id| [id.to_s, Time.now.to_i] }
expected_gaps = ((previous_event_id + 1)..(event_id_with_gap + 2)).collect { |id| [id.to_s, Time.now.to_i] }
expect(read_gaps).to match_array(expected_gaps)
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment