Commit 0a9168e0 authored by Dmitry Gruzd's avatar Dmitry Gruzd Committed by Terri Chu

Refactor the current_migration method

parent 708ffb4e
...@@ -38,7 +38,7 @@ module Elastic ...@@ -38,7 +38,7 @@ module Elastic
def load_from_index def load_from_index
client.get(index: index_name, id: version) client.get(index: index_name, id: version)
rescue StandardError => e rescue StandardError => e
logger.error("[Elastic::MigrationRecord]: #{e.class}: #{e.message}") logger.error("[#{self.class.name}]: #{e.class}: #{e.message}")
nil nil
end end
...@@ -59,6 +59,14 @@ module Elastic ...@@ -59,6 +59,14 @@ module Elastic
name.underscore name.underscore
end end
def running?
started? && !stopped?
end
def stopped?
halted? || completed?
end
def self.load_versions(completed:) def self.load_versions(completed:)
helper = Gitlab::Elastic::Helper.default helper = Gitlab::Elastic::Helper.default
helper.client helper.client
...@@ -67,8 +75,15 @@ module Elastic ...@@ -67,8 +75,15 @@ module Elastic
.map { |v| v['_id'].to_i } .map { |v| v['_id'].to_i }
end end
def running? def self.completed_versions
started? && !halted? && !completed? load_versions(completed: true)
end
def self.current_migration
completed_migrations = completed_versions
# use exclude to support new migrations which do not exist in the index yet
Elastic::DataMigrationService.migrations.find { |migration| completed_migrations.exclude?(migration.version) } # rubocop: disable CodeReuse/ServiceClass
end end
private private
......
...@@ -28,7 +28,7 @@ module Elastic ...@@ -28,7 +28,7 @@ module Elastic
helper.create_migrations_index helper.create_migrations_index
end end
migration = current_migration migration = Elastic::MigrationRecord.current_migration
unless migration unless migration
logger.info 'MigrationWorker: no migration available' logger.info 'MigrationWorker: no migration available'
...@@ -66,6 +66,8 @@ module Elastic ...@@ -66,6 +66,8 @@ module Elastic
Elastic::DataMigrationService.drop_migration_has_finished_cache!(migration) Elastic::DataMigrationService.drop_migration_has_finished_cache!(migration)
end end
rescue StandardError => e
logger.error("#{self.class.name}: #{e.class} #{e.message}")
end end
private private
...@@ -88,17 +90,6 @@ module Elastic ...@@ -88,17 +90,6 @@ module Elastic
end end
end end
def current_migration
completed_migrations = Elastic::MigrationRecord.load_versions(completed: true)
# use a negative condition to support new migrations which do not exist in the index yet
Elastic::DataMigrationService.migrations.find { |migration| !completed_migrations.include?(migration.version) }
rescue StandardError => e
# do not return a migration if there is an issue communicating with the Elasticsearch instance
logger.error("MigrationWorker: #{e.class}: #{e.message}")
nil
end
def pause_indexing!(migration) def pause_indexing!(migration)
return unless migration.pause_indexing? return unless migration.pause_indexing?
return if migration.load_state[:pause_indexing].present? return if migration.load_state[:pause_indexing].present?
......
...@@ -3,6 +3,8 @@ ...@@ -3,6 +3,8 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Elastic::MigrationRecord, :elastic do RSpec.describe Elastic::MigrationRecord, :elastic do
using RSpec::Parameterized::TableSyntax
let(:record) { described_class.new(version: Time.now.to_i, name: 'ExampleMigration', filename: nil) } let(:record) { described_class.new(version: Time.now.to_i, name: 'ExampleMigration', filename: nil) }
describe '#save!' do describe '#save!' do
...@@ -110,9 +112,30 @@ RSpec.describe Elastic::MigrationRecord, :elastic do ...@@ -110,9 +112,30 @@ RSpec.describe Elastic::MigrationRecord, :elastic do
end end
end end
describe '#running?' do describe '#current_migration' do
using RSpec::Parameterized::TableSyntax before do
allow(Elastic::DataMigrationService).to receive(:migrations).and_return([record])
allow(described_class).to receive(:completed_versions).and_return(completed_migrations.map(&:version))
end
context 'when there is an unexecuted migration' do
let(:completed_migrations) { [] }
it 'returns the correct migration' do
expect(described_class.current_migration).to eq record
end
end
context 'when there are no uncompleted migrations' do
let(:completed_migrations) { [record] }
it 'returns nil' do
expect(described_class.current_migration).to be_nil
end
end
end
describe '#running?' do
before do before do
allow(record).to receive(:halted?).and_return(halted) allow(record).to receive(:halted?).and_return(halted)
allow(record).to receive(:started?).and_return(started) allow(record).to receive(:started?).and_return(started)
...@@ -133,4 +156,24 @@ RSpec.describe Elastic::MigrationRecord, :elastic do ...@@ -133,4 +156,24 @@ RSpec.describe Elastic::MigrationRecord, :elastic do
end end
end end
end end
describe '#stopped?' do
before do
allow(record).to receive(:halted?).and_return(halted)
allow(record).to receive(:completed?).and_return(completed)
end
where(:halted, :completed, :expected) do
false | false | false
false | true | true
true | false | true
true | true | true
end
with_them do
it 'returns the expected result' do
expect(record.stopped?).to eq(expected)
end
end
end
end end
...@@ -26,7 +26,7 @@ RSpec.describe Elastic::MigrationWorker, :elastic do ...@@ -26,7 +26,7 @@ RSpec.describe Elastic::MigrationWorker, :elastic do
context 'an unexecuted migration present' do context 'an unexecuted migration present' do
before do before do
allow(subject).to receive(:current_migration).and_return(migration) allow(Elastic::MigrationRecord).to receive(:current_migration).and_return(migration)
end end
it 'creates an index if it does not exist' do it 'creates an index if it does not exist' do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment