Commit ea922b77 authored by Andreas Brandl's avatar Andreas Brandl Committed by Alper Akgun

Refactor and integrate Grafana annotations

* Introduce observer-like pattern.
* Refactor ReindexAction.keep_track_of and split
parent c71d8433
...@@ -8,9 +8,9 @@ module Gitlab ...@@ -8,9 +8,9 @@ module Gitlab
# candidate_indexes: Array of Gitlab::Database::PostgresIndex # candidate_indexes: Array of Gitlab::Database::PostgresIndex
def self.perform(candidate_indexes, how_many: DEFAULT_INDEXES_PER_INVOCATION) def self.perform(candidate_indexes, how_many: DEFAULT_INDEXES_PER_INVOCATION)
indexes = IndexSelection.new(candidate_indexes).take(how_many) IndexSelection.new(candidate_indexes).take(how_many).each do |index|
Coordinator.new(index).perform
Coordinator.new(indexes).perform end
end end
def self.candidate_indexes def self.candidate_indexes
......
...@@ -12,26 +12,44 @@ module Gitlab ...@@ -12,26 +12,44 @@ module Gitlab
# statement timeouts). # statement timeouts).
TIMEOUT_PER_ACTION = 1.day TIMEOUT_PER_ACTION = 1.day
attr_reader :indexes attr_reader :index, :notifier
def initialize(indexes) def initialize(index, notifier = GrafanaNotifier.new)
@indexes = indexes @index = index
@notifier = notifier
end end
def perform def perform
indexes.each do |index|
# This obtains a global lease such that there's # This obtains a global lease such that there's
# only one live reindexing process at a time. # only one live reindexing process at a time.
try_obtain_lease do try_obtain_lease do
ReindexAction.keep_track_of(index) do action = ReindexAction.create_for(index)
ConcurrentReindex.new(index).perform
end with_notifications(action) do
perform_for(index, action)
end end
end end
end end
private private
def with_notifications(action)
notifier.notify_start(action)
yield
ensure
notifier.notify_end(action)
end
def perform_for(index, action)
ConcurrentReindex.new(index).perform
rescue
action.state = :failed
raise
ensure
action.finish
end
def lease_timeout def lease_timeout
TIMEOUT_PER_ACTION TIMEOUT_PER_ACTION
end end
......
# frozen_string_literal: true
module Gitlab
module Database
module Reindexing
# This can be used to send annotations for reindexing to a Grafana API
class GrafanaNotifier
def initialize(api_key = ENV['GITLAB_GRAFANA_API_KEY'], api_url = ENV['GITLAB_GRAFANA_API_URL'])
@api_key = api_key
@api_url = api_url
end
def notify_start(action)
return unless enabled?
payload = base_payload(action).merge(
text: "Started reindexing of #{action.index.name} on #{action.index.tablename}"
)
annotate(payload)
end
def notify_end(action)
return unless enabled?
payload = base_payload(action).merge(
text: "Finished reindexing of #{action.index.name} on #{action.index.tablename} (#{action.state})",
timeEnd: (action.action_end.utc.to_f * 1000).to_i,
isRegion: true
)
annotate(payload)
end
private
def base_payload(action)
{
time: (action.action_start.utc.to_f * 1000).to_i,
tags: ['reindex', action.index.tablename, action.index.name]
}
end
def annotate(payload)
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer #{@api_key}"
}
success = Gitlab::HTTP.post("#{@api_url}/api/annotations", body: payload.to_json, headers: headers, allow_local_requests: true).success?
log_error("Response code #{response.code}") unless success
success
rescue => err
log_error(err)
false
end
def log_error(err)
Gitlab::AppLogger.warn("Unable to notify Grafana from #{self.class}: #{err}")
end
def enabled?
!(@api_url.blank? || @api_key.blank?)
end
end
end
end
end
...@@ -14,27 +14,23 @@ module Gitlab ...@@ -14,27 +14,23 @@ module Gitlab
scope :recent, -> { where(state: :finished).where('action_end > ?', Time.zone.now - RECENT_THRESHOLD) } scope :recent, -> { where(state: :finished).where('action_end > ?', Time.zone.now - RECENT_THRESHOLD) }
def self.keep_track_of(index, &block) def self.create_for(index)
action = create!( create!(
index_identifier: index.identifier, index_identifier: index.identifier,
action_start: Time.zone.now, action_start: Time.zone.now,
ondisk_size_bytes_start: index.ondisk_size_bytes, ondisk_size_bytes_start: index.ondisk_size_bytes,
bloat_estimate_bytes_start: index.bloat_size bloat_estimate_bytes_start: index.bloat_size
) )
end
yield def finish
action.state = :finished
rescue
action.state = :failed
raise
ensure
index.reload # rubocop:disable Cop/ActiveRecordAssociationReload index.reload # rubocop:disable Cop/ActiveRecordAssociationReload
action.action_end = Time.zone.now self.state = :finished unless failed?
action.ondisk_size_bytes_end = index.ondisk_size_bytes self.action_end = Time.zone.now
self.ondisk_size_bytes_end = index.ondisk_size_bytes
action.save! save!
end end
end end
end end
......
...@@ -3,66 +3,80 @@ ...@@ -3,66 +3,80 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Gitlab::Database::Reindexing::Coordinator do RSpec.describe Gitlab::Database::Reindexing::Coordinator do
include Database::DatabaseHelpers
include ExclusiveLeaseHelpers include ExclusiveLeaseHelpers
describe '.perform' do describe '.perform' do
subject { described_class.new(indexes).perform } subject { described_class.new(index, notifier).perform }
let(:indexes) { [instance_double(Gitlab::Database::PostgresIndex), instance_double(Gitlab::Database::PostgresIndex)] } before do
let(:reindexers) { [instance_double(Gitlab::Database::Reindexing::ConcurrentReindex), instance_double(Gitlab::Database::Reindexing::ConcurrentReindex)] } swapout_view_for_table(:postgres_indexes)
allow(Gitlab::Database::Reindexing::ConcurrentReindex).to receive(:new).with(index).and_return(reindexer)
allow(Gitlab::Database::Reindexing::ReindexAction).to receive(:create_for).with(index).and_return(action)
end
let(:index) { create(:postgres_index) }
let(:notifier) { instance_double(Gitlab::Database::Reindexing::GrafanaNotifier, notify_start: nil, notify_end: nil) }
let(:reindexer) { instance_double(Gitlab::Database::Reindexing::ConcurrentReindex, perform: nil) }
let(:action) { create(:reindex_action, index: index) }
let!(:lease) { stub_exclusive_lease(lease_key, uuid, timeout: lease_timeout) } let!(:lease) { stub_exclusive_lease(lease_key, uuid, timeout: lease_timeout) }
let(:lease_key) { 'gitlab/database/reindexing/coordinator' } let(:lease_key) { 'gitlab/database/reindexing/coordinator' }
let(:lease_timeout) { 1.day } let(:lease_timeout) { 1.day }
let(:uuid) { 'uuid' } let(:uuid) { 'uuid' }
before do context 'locking' do
allow(Gitlab::Database::Reindexing::ReindexAction).to receive(:keep_track_of).and_yield it 'acquires a lock while reindexing' do
expect(lease).to receive(:try_obtain).ordered.and_return(uuid)
indexes.zip(reindexers).each do |index, reindexer| expect(reindexer).to receive(:perform).ordered
allow(Gitlab::Database::Reindexing::ConcurrentReindex).to receive(:new).with(index).and_return(reindexer)
allow(reindexer).to receive(:perform)
end
end
it 'performs concurrent reindexing for each index' do expect(Gitlab::ExclusiveLease).to receive(:cancel).ordered.with(lease_key, uuid)
indexes.zip(reindexers).each do |index, reindexer|
expect(Gitlab::Database::Reindexing::ConcurrentReindex).to receive(:new).with(index).ordered.and_return(reindexer)
expect(reindexer).to receive(:perform)
end
subject subject
end end
it 'keeps track of actions and creates ReindexAction records' do it 'does not perform reindexing actions if lease is not granted' do
indexes.each do |index| expect(lease).to receive(:try_obtain).ordered.and_return(false)
expect(Gitlab::Database::Reindexing::ReindexAction).to receive(:keep_track_of).with(index).and_yield expect(Gitlab::Database::Reindexing::ConcurrentReindex).not_to receive(:new)
end
subject subject
end end
context 'locking' do
it 'acquires a lock while reindexing' do
indexes.each do |index|
expect(lease).to receive(:try_obtain).ordered.and_return(uuid)
action = instance_double(Gitlab::Database::Reindexing::ConcurrentReindex)
expect(Gitlab::Database::Reindexing::ConcurrentReindex).to receive(:new).ordered.with(index).and_return(action)
expect(action).to receive(:perform).ordered
expect(Gitlab::ExclusiveLease).to receive(:cancel).ordered.with(lease_key, uuid)
end end
context 'notifications' do
it 'sends #notify_start before reindexing' do
expect(notifier).to receive(:notify_start).with(action).ordered
expect(reindexer).to receive(:perform).ordered
subject subject
end end
it 'does does not perform reindexing actions if lease is not granted' do it 'sends #notify_end after reindexing and updating the action is done' do
indexes.each do |index| expect(action).to receive(:finish).ordered
expect(lease).to receive(:try_obtain).ordered.and_return(false) expect(notifier).to receive(:notify_end).with(action).ordered
expect(Gitlab::Database::Reindexing::ConcurrentReindex).not_to receive(:new)
subject
end end
end
context 'action tracking' do
it 'calls #finish on the action' do
expect(reindexer).to receive(:perform).ordered
expect(action).to receive(:finish).ordered
subject subject
end end
it 'upon error, it still calls finish and raises the error' do
expect(reindexer).to receive(:perform).ordered.and_raise('something went wrong')
expect(action).to receive(:finish).ordered
expect { subject }.to raise_error(/something went wrong/)
expect(action).to be_failed
end
end end
end end
end end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::Database::Reindexing::GrafanaNotifier do
include Database::DatabaseHelpers
let(:api_key) { "foo" }
let(:api_url) { "http://bar"}
let(:action) { create(:reindex_action) }
before do
swapout_view_for_table(:postgres_indexes)
end
let(:headers) do
{
'Content-Type': 'application/json',
'Authorization': "Bearer #{api_key}"
}
end
let(:response) { double('response', success?: true) }
def expect_api_call(payload)
expect(Gitlab::HTTP).to receive(:post).with("#{api_url}/api/annotations", body: payload.to_json, headers: headers, allow_local_requests: true).and_return(response)
end
shared_examples_for 'interacting with Grafana annotations API' do
it 'POSTs a JSON payload' do
expect_api_call(payload)
expect(subject).to be_truthy
end
context 'on error' do
it 'does not raise the error and returns false' do
allow(Gitlab::HTTP).to receive(:post).and_raise('something went wrong')
expect(subject).to be_falsey
end
context 'when request was not successful' do
it 'returns false' do
expect_api_call(payload)
allow(response).to receive(:success?).and_return(false)
expect(subject).to be_falsey
end
end
end
context 'without api_key' do
let(:api_key) { '' }
it 'does not post anything' do
expect(Gitlab::HTTP).not_to receive(:post)
expect(subject).to be_falsey
end
end
context 'without api_url' do
let(:api_url) { '' }
it 'does not post anything' do
expect(Gitlab::HTTP).not_to receive(:post)
expect(subject).to be_falsey
end
end
end
describe '#notify_start' do
subject { described_class.new(api_key, api_url).notify_start(action) }
let(:payload) do
{
time: (action.action_start.utc.to_f * 1000).to_i,
tags: ['reindex', action.index.tablename, action.index.name],
text: "Started reindexing of #{action.index.name} on #{action.index.tablename}"
}
end
it_behaves_like 'interacting with Grafana annotations API'
end
describe '#notify_end' do
subject { described_class.new(api_key, api_url).notify_end(action) }
let(:payload) do
{
time: (action.action_start.utc.to_f * 1000).to_i,
tags: ['reindex', action.index.tablename, action.index.name],
text: "Finished reindexing of #{action.index.name} on #{action.index.tablename} (#{action.state})",
timeEnd: (action.action_end.utc.to_f * 1000).to_i,
isRegion: true
}
end
it_behaves_like 'interacting with Grafana annotations API'
end
end
...@@ -2,91 +2,83 @@ ...@@ -2,91 +2,83 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Gitlab::Database::Reindexing::ReindexAction, '.keep_track_of' do RSpec.describe Gitlab::Database::Reindexing::ReindexAction do
let(:index) { double('index', identifier: 'public.something', ondisk_size_bytes: 10240, reload: nil, bloat_size: 42) } include Database::DatabaseHelpers
let(:size_after) { 512 }
it 'yields to the caller' do let(:index) { create(:postgres_index) }
expect { |b| described_class.keep_track_of(index, &b) }.to yield_control
end
def find_record before_all do
described_class.find_by(index_identifier: index.identifier) swapout_view_for_table(:postgres_indexes)
end end
it 'creates the record with a start time and updates its end time' do describe '.create_for' do
subject { described_class.create_for(index) }
it 'creates a new record for the given index' do
freeze_time do freeze_time do
described_class.keep_track_of(index) do record = subject
expect(find_record.action_start).to be_within(1.second).of(Time.zone.now)
expect(record.index_identifier).to eq(index.identifier)
expect(record.action_start).to eq(Time.zone.now)
expect(record.ondisk_size_bytes_start).to eq(index.ondisk_size_bytes)
expect(subject.bloat_estimate_bytes_start).to eq(index.bloat_size)
travel(10.seconds) expect(record).to be_persisted
end
end
end end
duration = find_record.action_end - find_record.action_start describe '#finish' do
subject { action.finish }
expect(duration).to be_within(1.second).of(10.seconds) let(:action) { build(:reindex_action, index: index) }
it 'sets #action_end' do
freeze_time do
subject
expect(action.action_end).to eq(Time.zone.now)
end end
end end
it 'creates the record with its status set to :started and updates its state to :finished' do it 'sets #ondisk_size_bytes_end after reloading the index record' do
described_class.keep_track_of(index) do new_size = 4711
expect(find_record).to be_started expect(action.index).to receive(:reload).ordered
end expect(action.index).to receive(:ondisk_size_bytes).and_return(new_size).ordered
expect(find_record).to be_finished subject
expect(action.ondisk_size_bytes_end).to eq(new_size)
end end
it 'creates the record with the indexes start size and updates its end size' do context 'setting #state' do
described_class.keep_track_of(index) do it 'sets #state to finished if not given' do
expect(find_record.ondisk_size_bytes_start).to eq(index.ondisk_size_bytes) action.state = nil
expect(index).to receive(:reload).once subject
allow(index).to receive(:ondisk_size_bytes).and_return(size_after)
end
expect(find_record.ondisk_size_bytes_end).to eq(size_after) expect(action).to be_finished
end end
it 'creates the record with the indexes bloat estimate' do it 'sets #state to finished if not set to started' do
described_class.keep_track_of(index) do action.state = :started
expect(find_record.bloat_estimate_bytes_start).to eq(index.bloat_size)
end
end
context 'in case of errors' do subject
it 'sets the state to failed' do
expect do
described_class.keep_track_of(index) do
raise 'something went wrong'
end
end.to raise_error(/something went wrong/)
expect(find_record).to be_failed expect(action).to be_finished
end end
it 'records the end time' do it 'does not change state if set to failed' do
freeze_time do action.state = :failed
expect do
described_class.keep_track_of(index) do
raise 'something went wrong'
end
end.to raise_error(/something went wrong/)
expect(find_record.action_end).to be_within(1.second).of(Time.zone.now) expect { subject }.not_to change { action.state }
end end
end end
it 'records the resulting index size' do it 'saves the record' do
expect(index).to receive(:reload).once expect(action).to receive(:save!)
allow(index).to receive(:ondisk_size_bytes).and_return(size_after)
expect do
described_class.keep_track_of(index) do
raise 'something went wrong'
end
end.to raise_error(/something went wrong/)
expect(find_record.ondisk_size_bytes_end).to eq(size_after) subject
end end
end end
end end
...@@ -11,13 +11,16 @@ RSpec.describe Gitlab::Database::Reindexing do ...@@ -11,13 +11,16 @@ RSpec.describe Gitlab::Database::Reindexing do
let(:coordinator) { instance_double(Gitlab::Database::Reindexing::Coordinator) } let(:coordinator) { instance_double(Gitlab::Database::Reindexing::Coordinator) }
let(:index_selection) { instance_double(Gitlab::Database::Reindexing::IndexSelection) } let(:index_selection) { instance_double(Gitlab::Database::Reindexing::IndexSelection) }
let(:candidate_indexes) { double } let(:candidate_indexes) { double }
let(:indexes) { double } let(:indexes) { [double, double] }
it 'delegates to Coordinator' do it 'delegates to Coordinator' do
expect(Gitlab::Database::Reindexing::IndexSelection).to receive(:new).with(candidate_indexes).and_return(index_selection) expect(Gitlab::Database::Reindexing::IndexSelection).to receive(:new).with(candidate_indexes).and_return(index_selection)
expect(index_selection).to receive(:take).with(2).and_return(indexes) expect(index_selection).to receive(:take).with(2).and_return(indexes)
expect(Gitlab::Database::Reindexing::Coordinator).to receive(:new).with(indexes).and_return(coordinator)
indexes.each do |index|
expect(Gitlab::Database::Reindexing::Coordinator).to receive(:new).with(index).and_return(coordinator)
expect(coordinator).to receive(:perform) expect(coordinator).to receive(:perform)
end
subject subject
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment