Commit b73e6b4c authored by Stan Hu's avatar Stan Hu

Add additional logging for Geo Log Cursor

1. Log the batch of IDs taken
2. Log when a gap in the event sequence is detected
parent e4fa8edb
---
title: Add additional logging for Geo Log Cursor
merge_request: 6513
author:
type: other
...@@ -37,14 +37,18 @@ module Gitlab ...@@ -37,14 +37,18 @@ module Gitlab
# Wrap this with the connection to make it possible to reconnect if # Wrap this with the connection to make it possible to reconnect if
# PGbouncer dies: https://github.com/rails/rails/issues/29189 # PGbouncer dies: https://github.com/rails/rails/issues/29189
ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.connection_pool.with_connection do
LogCursor::EventLogs.new.fetch_in_batches { |batch| handle_events(batch) } LogCursor::EventLogs.new.fetch_in_batches { |batch, last_id| handle_events(batch, last_id) }
end end
end end
private private
def handle_events(batch) def handle_events(batch, last_id)
batch.each do |event_log| logger.info("Handling events", first_id: batch.first.id, last_id: batch.last.id)
last_event_id = last_id
batch.each_with_index do |event_log, index|
event = event_log.event event = event_log.event
# If a project is deleted, the event log and its associated event data # If a project is deleted, the event log and its associated event data
...@@ -54,6 +58,9 @@ module Gitlab ...@@ -54,6 +58,9 @@ module Gitlab
next next
end end
check_event_id(last_event_id, event_log.id) if last_event_id > 0
last_event_id = event_log.id
unless can_replay?(event_log) unless can_replay?(event_log)
logger.event_info(event_log.created_at, 'Skipped event', event_data(event_log)) logger.event_info(event_log.created_at, 'Skipped event', event_data(event_log))
next next
...@@ -68,6 +75,12 @@ module Gitlab ...@@ -68,6 +75,12 @@ module Gitlab
end end
end end
def check_event_id(last_event_id, current_log_id)
if last_event_id + 1 != current_log_id
logger.info("Event log gap", previous_event_log_id: last_event_id, event_log_id: current_log_id)
end
end
def event_klass_for(event) def event_klass_for(event)
event_klass_name = event.class.name.demodulize event_klass_name = event.class.name.demodulize
current_namespace = self.class.name.deconstantize current_namespace = self.class.name.deconstantize
......
...@@ -7,9 +7,14 @@ module Gitlab ...@@ -7,9 +7,14 @@ module Gitlab
# fetches up to BATCH_SIZE next events and keep track of batches # fetches up to BATCH_SIZE next events and keep track of batches
def fetch_in_batches(batch_size: BATCH_SIZE) def fetch_in_batches(batch_size: BATCH_SIZE)
::Geo::EventLog.where('id > ?', last_processed).find_in_batches(batch_size: batch_size) do |batch| last_id = last_processed_id
yield batch
save_processed(batch.last.id) ::Geo::EventLog.where('id > ?', last_id).find_in_batches(batch_size: batch_size) do |batch|
yield(batch, last_id)
last_id = batch.last.id
save_processed(last_id)
break unless Lease.renew! break unless Lease.renew!
end end
end end
...@@ -23,7 +28,7 @@ module Gitlab ...@@ -23,7 +28,7 @@ module Gitlab
end end
# @return [Integer] id of last replicated event # @return [Integer] id of last replicated event
def last_processed def last_processed_id
last = ::Geo::EventLogState.last_processed&.id last = ::Geo::EventLogState.last_processed&.id
return last if last return last if last
......
...@@ -80,7 +80,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared ...@@ -80,7 +80,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) } let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
it 'handles events' do it 'handles events' do
expect(daemon).to receive(:handle_events).with(batch) expect(daemon).to receive(:handle_events).with(batch, anything)
daemon.run_once! daemon.run_once!
end end
...@@ -97,6 +97,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared ...@@ -97,6 +97,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
before do before do
allow(Gitlab::ShardHealthCache).to receive(:healthy_shard?).with('default').and_return(true) allow(Gitlab::ShardHealthCache).to receive(:healthy_shard?).with('default').and_return(true)
allow(Gitlab::Geo::Logger).to receive(:info).and_call_original
end end
it 'replays events for projects that belong to selected namespaces to replicate' do it 'replays events for projects that belong to selected namespaces to replicate' do
...@@ -115,6 +116,34 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared ...@@ -115,6 +116,34 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
daemon.run_once! daemon.run_once!
end end
it "logs a message if an event was skipped" do
updated_event = create(:geo_repository_updated_event, project: project)
new_event = create(:geo_event_log, id: event_log.id + 2, repository_updated_event: updated_event)
expect(Gitlab::Geo::Logger).to receive(:info)
.with(hash_including(
class: 'Gitlab::Geo::LogCursor::Daemon',
message: 'Event log gap',
previous_event_log_id: event_log.id,
event_log_id: new_event.id))
daemon.run_once!
expect(::Geo::EventLogState.last_processed.id).to eq(new_event.id)
# Test that the cursor picks up from the last stored ID
third_event = create(:geo_event_log, id: new_event.id + 3, repository_updated_event: updated_event)
expect(Gitlab::Geo::Logger).to receive(:info)
.with(hash_including(
class: 'Gitlab::Geo::LogCursor::Daemon',
message: 'Event log gap',
previous_event_log_id: new_event.id,
event_log_id: third_event.id))
daemon.run_once!
end
it "logs a message if an associated event can't be found" do it "logs a message if an associated event can't be found" do
new_event = create(:geo_event_log) new_event = create(:geo_event_log)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment