Commit 60117481 authored by Michael Kozono's avatar Michael Kozono

Refactor Geo LogCursor Daemon loop

- Rename run_once! to find_and_handle_events!
- Extract loop contents into run_once!
- Reuse sleep_break in arbitrary_sleep
- Remove now-redundant exit? check
- Use exit! to set @exit due to error
- Now we can avoid calling run! in specs to avoid excessive stubbing
parent da77dc56
...@@ -21,28 +21,28 @@ module Gitlab ...@@ -21,28 +21,28 @@ module Gitlab
logger.debug('#run!: start') logger.debug('#run!: start')
trap_signals trap_signals
until exit? run_once! until exit?
# Prevent the node from processing events unless it's a secondary
unless Geo.secondary?
logger.debug("#run!: not a secondary, sleeping for #{SECONDARY_CHECK_INTERVAL} secs")
sleep_break(SECONDARY_CHECK_INTERVAL)
next
end
lease = Lease.try_obtain_with_ttl { run_once! } logger.debug('#run!: finish')
end
handle_error(lease[:error]) def run_once!
# Prevent the node from processing events unless it's a secondary
unless Geo.secondary?
logger.debug("#run!: not a secondary, sleeping for #{SECONDARY_CHECK_INTERVAL} secs")
sleep_break(SECONDARY_CHECK_INTERVAL)
return
end
return if exit? lease = Lease.try_obtain_with_ttl { find_and_handle_events! }
# When no new event is found sleep for a few moments handle_error(lease[:error])
arbitrary_sleep(lease[:ttl])
end
logger.debug('#run!: finish') # When no new event is found sleep for a few moments
arbitrary_sleep(lease[:ttl])
end end
def run_once! def find_and_handle_events!
gap_tracking.fill_gaps { |event_log| handle_single_event(event_log) } gap_tracking.fill_gaps { |event_log| handle_single_event(event_log) }
# Wrap this with the connection to make it possible to reconnect if # Wrap this with the connection to make it possible to reconnect if
...@@ -58,8 +58,7 @@ module Gitlab ...@@ -58,8 +58,7 @@ module Gitlab
track_failing_since(did_error) track_failing_since(did_error)
if excessive_errors? if excessive_errors?
logger.error("#run!: Exiting due to consecutive errors for over #{MAX_ERROR_DURATION} seconds") exit!("Consecutive errors for over #{MAX_ERROR_DURATION} seconds")
@exit = true
end end
end end
...@@ -77,14 +76,6 @@ module Gitlab ...@@ -77,14 +76,6 @@ module Gitlab
MAX_ERROR_DURATION < (Time.now - @failing_since) MAX_ERROR_DURATION < (Time.now - @failing_since)
end end
def sleep_break(seconds)
while seconds > 0
sleep(1)
seconds -= 1
break if exit?
end
end
def handle_events(batch, previous_batch_last_id) def handle_events(batch, previous_batch_last_id)
logger.info("#handle_events:", first_id: batch.first.id, last_id: batch.last.id) logger.info("#handle_events:", first_id: batch.first.id, last_id: batch.last.id)
...@@ -140,6 +131,12 @@ module Gitlab ...@@ -140,6 +131,12 @@ module Gitlab
@exit = true @exit = true
end end
def exit!(error_message)
logger.error("Exiting due to: #{error_message}") if error_message
@exit = true
end
def exit? def exit?
@exit @exit
end end
...@@ -160,7 +157,18 @@ module Gitlab ...@@ -160,7 +157,18 @@ module Gitlab
# This allows multiple GeoLogCursors to randomly process a batch of events, # This allows multiple GeoLogCursors to randomly process a batch of events,
# without favouring the shortest path (or latency). # without favouring the shortest path (or latency).
def arbitrary_sleep(delay) def arbitrary_sleep(delay)
sleep(delay + rand(1..20) * 0.1) jitter = rand(1..20) * 0.1
sleep_break(delay + jitter)
end
def sleep_break(seconds)
while seconds > 0.0
to_sleep = seconds > 1.0 ? 1.0 : seconds
seconds -= to_sleep
sleep(to_sleep)
break if exit?
end
end end
def gap_tracking def gap_tracking
......
...@@ -22,16 +22,9 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do ...@@ -22,16 +22,9 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do
allow(daemon).to receive(:arbitrary_sleep).and_return(0.1) allow(daemon).to receive(:arbitrary_sleep).and_return(0.1)
end end
# WARNINGS # Warning: Ensure an exit condition for the main run! loop, or RSpec will not
# # stop without an interrupt. You can use `ensure_exit_on` to specify the exact
# 1. Ensure an exit condition for the main run! loop, or RSpec will not stop # number of calls to `exit?`, with the last call returning `true`.
# without an interrupt.
#
# I recommend using `ensure_exit_on`.
#
# 2. run! occasionally spawns git processes that run forever at 100% CPU.
#
# I don't know why this happens.
describe '#run!' do describe '#run!' do
it 'traps signals' do it 'traps signals' do
ensure_exit_on(1) ensure_exit_on(1)
...@@ -41,12 +34,14 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do ...@@ -41,12 +34,14 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do
end end
it 'delegates to #run_once! in a loop' do it 'delegates to #run_once! in a loop' do
ensure_exit_on(4) ensure_exit_on(3)
is_expected.to receive(:run_once!).twice is_expected.to receive(:run_once!).twice
daemon.run! daemon.run!
end end
end
describe '#run_once!' do
it 'skips execution if cannot achieve a lease' do it 'skips execution if cannot achieve a lease' do
lease = stub_exclusive_lease_taken('geo_log_cursor_processed') lease = stub_exclusive_lease_taken('geo_log_cursor_processed')
...@@ -54,83 +49,68 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do ...@@ -54,83 +49,68 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do
allow(lease).to receive(:same_uuid?).and_return(false) allow(lease).to receive(:same_uuid?).and_return(false)
allow(Gitlab::Geo::LogCursor::Lease).to receive(:exclusive_lease).and_return(lease) allow(Gitlab::Geo::LogCursor::Lease).to receive(:exclusive_lease).and_return(lease)
ensure_exit_on(2) is_expected.not_to receive(:find_and_handle_events!)
is_expected.not_to receive(:run_once!)
daemon.run! daemon.run_once!
end end
it 'skips execution if not a Geo node' do it 'skips execution if not a Geo node' do
stub_current_geo_node(nil) stub_current_geo_node(nil)
ensure_exit_on(2)
is_expected.to receive(:sleep_break).with(1.minute) is_expected.to receive(:sleep_break).with(1.minute)
is_expected.not_to receive(:run_once!) is_expected.not_to receive(:find_and_handle_events!)
daemon.run! daemon.run_once!
end end
it 'skips execution if the current node is a primary' do it 'skips execution if the current node is a primary' do
stub_current_geo_node(primary) stub_current_geo_node(primary)
ensure_exit_on(2)
is_expected.to receive(:sleep_break).with(1.minute) is_expected.to receive(:sleep_break).with(1.minute)
is_expected.not_to receive(:run_once!) is_expected.not_to receive(:find_and_handle_events!)
daemon.run! daemon.run_once!
end end
context 'when run! has handled an error every call for over the allowed duration' do context 'when the lease block rescues an error' do
it 'exits' do context 'when this error is the final straw' do
# Can't use ensure_exit_on here since the logic we are testing depends on `exit?` behavior. it 'calls `#exit!`' do
# If `exit?` is called a third time, then the "exit if failing for too long" behavior is broken. is_expected.to receive(:exit!)
expect(daemon).to receive(:exit?).and_call_original.twice
Timecop.freeze do
daemon.send(:handle_error, true)
Timecop.travel(described_class::MAX_ERROR_DURATION + 1.second) do is_expected.to receive(:find_and_handle_events!).and_raise('any error').twice
expect(daemon).to receive(:gap_tracking).and_raise('boom')
is_expected.to receive(:run_once!).and_call_original.once Timecop.freeze do
daemon.run_once!
daemon.run! Timecop.travel(described_class::MAX_ERROR_DURATION + 1.second) do
daemon.run_once!
end
end end
end end
end end
end
context 'when run_once! has returned one call without raising an error before the allowed duration' do
it 'does not exit' do
# Can't use ensure_exit_on here since the logic we are testing depends on `exit?` behavior
expect(daemon).to receive(:exit?).and_call_original.exactly(5).times
# Force exit on 6th call context 'when this error is not the final straw' do
# If `exit?` is not called 6 times, then the daemon stopped too early. it 'does not call `#exit!`' do
expect(daemon).to receive(:exit?).and_return(true) is_expected.not_to receive(:exit!)
Timecop.freeze do Timecop.freeze do
# As if an error occurred is_expected.to receive(:find_and_handle_events!).and_raise('any error')
daemon.send(:handle_error, true) daemon.run_once!
Timecop.travel(described_class::MAX_ERROR_DURATION + 1.second) do Timecop.travel(described_class::MAX_ERROR_DURATION + 1.second) do
# First, a successful run to reset the timer is_expected.to receive(:find_and_handle_events!) # successful
expect(daemon).to receive(:gap_tracking).and_call_original daemon.run_once!
# Then more errors is_expected.to receive(:find_and_handle_events!).and_raise('any error')
expect(daemon).to receive(:gap_tracking).and_raise('boom').twice daemon.run_once!
end
# It should continue running until we force it to exit
is_expected.to receive(:run_once!).and_call_original.exactly(3).times
daemon.run!
end end
end end
end end
end end
end end
describe '#run_once!' do describe '#find_and_handle_events!' do
context 'with some event logs' do context 'with some event logs' do
let(:project) { create(:project) } let(:project) { create(:project) }
let(:repository_updated_event) { create(:geo_repository_updated_event, project: project) } let(:repository_updated_event) { create(:geo_repository_updated_event, project: project) }
...@@ -141,7 +121,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do ...@@ -141,7 +121,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do
it 'handles events' do it 'handles events' do
expect(daemon).to receive(:handle_events).with(batch, anything) expect(daemon).to receive(:handle_events).with(batch, anything)
daemon.run_once! daemon.find_and_handle_events!
end end
it 'calls #handle_gap_event for each gap the gap tracking finds' do it 'calls #handle_gap_event for each gap the gap tracking finds' do
...@@ -153,7 +133,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do ...@@ -153,7 +133,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do
expect(daemon).to receive(:handle_single_event).with(event_log) expect(daemon).to receive(:handle_single_event).with(event_log)
expect(daemon).to receive(:handle_single_event).with(second_event_log) expect(daemon).to receive(:handle_single_event).with(second_event_log)
daemon.run_once! daemon.find_and_handle_events!
end end
end end
...@@ -176,7 +156,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do ...@@ -176,7 +156,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(project.id, anything).once expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(project.id, anything).once
daemon.run_once! daemon.find_and_handle_events!
end end
it 'does not replay events for projects that do not belong to selected namespaces to replicate' do it 'does not replay events for projects that do not belong to selected namespaces to replicate' do
...@@ -184,14 +164,14 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do ...@@ -184,14 +164,14 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(project.id, anything) expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(project.id, anything)
daemon.run_once! daemon.find_and_handle_events!
end end
it 'detects when an event was skipped' do it 'detects when an event was skipped' do
updated_event = create(:geo_repository_updated_event, project: project) updated_event = create(:geo_repository_updated_event, project: project)
new_event = create(:geo_event_log, id: event_log.id + 2, repository_updated_event: updated_event) new_event = create(:geo_event_log, id: event_log.id + 2, repository_updated_event: updated_event)
daemon.run_once! daemon.find_and_handle_events!
create(:geo_event_log, id: event_log.id + 1) create(:geo_event_log, id: event_log.id + 1)
...@@ -204,11 +184,11 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do ...@@ -204,11 +184,11 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do
updated_event = create(:geo_repository_updated_event, project: project) updated_event = create(:geo_repository_updated_event, project: project)
new_event = create(:geo_event_log, repository_updated_event: updated_event) new_event = create(:geo_event_log, repository_updated_event: updated_event)
daemon.run_once! daemon.find_and_handle_events!
create(:geo_event_log, id: new_event.id + 3, repository_updated_event: updated_event) create(:geo_event_log, id: new_event.id + 3, repository_updated_event: updated_event)
daemon.run_once! daemon.find_and_handle_events!
create(:geo_event_log, id: new_event.id + 1, repository_updated_event: updated_event) create(:geo_event_log, id: new_event.id + 1, repository_updated_event: updated_event)
create(:geo_event_log, id: new_event.id + 2, repository_updated_event: updated_event) create(:geo_event_log, id: new_event.id + 2, repository_updated_event: updated_event)
...@@ -225,7 +205,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do ...@@ -225,7 +205,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do
message: '#handle_single_event: unknown event', message: '#handle_single_event: unknown event',
event_log_id: new_event.id)) event_log_id: new_event.id))
daemon.run_once! daemon.find_and_handle_events!
expect(::Geo::EventLogState.last_processed.id).to eq(new_event.id) expect(::Geo::EventLogState.last_processed.id).to eq(new_event.id)
end end
...@@ -243,7 +223,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do ...@@ -243,7 +223,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do
event_type: 'Geo::RepositoryUpdatedEvent', event_type: 'Geo::RepositoryUpdatedEvent',
project_id: project.id)) project_id: project.id))
daemon.run_once! daemon.find_and_handle_events!
end end
it 'does not replay events for projects that do not belong to selected shards to replicate' do it 'does not replay events for projects that do not belong to selected shards to replicate' do
...@@ -251,7 +231,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do ...@@ -251,7 +231,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(project.id, anything) expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(project.id, anything)
daemon.run_once! daemon.find_and_handle_events!
end end
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment