Commit d6ab5e88 authored by Douglas Barbosa Alexandre

Merge branch 'geo-spike-self-service-framework' into 'master'

Geo: Spike Self-Service Framework

Closes #197319

See merge request gitlab-org/gitlab!23447
parents de5869d5 f97df573
---
title: 'Geo: Add tables to prepare to replicate package files'
merge_request: 23447
author:
type: added
...@@ -55,6 +55,8 @@ module Gitlab ...@@ -55,6 +55,8 @@ module Gitlab
memo << ee_path.to_s memo << ee_path.to_s
end end
ee_paths << "ee/app/replicators"
# Eager load should load CE first # Eager load should load CE first
config.eager_load_paths.push(*ee_paths) config.eager_load_paths.push(*ee_paths)
config.helpers_paths.push "#{config.root}/ee/app/helpers" config.helpers_paths.push "#{config.root}/ee/app/helpers"
......
...@@ -19,6 +19,7 @@ ActiveSupport::Inflector.inflections do |inflect| ...@@ -19,6 +19,7 @@ ActiveSupport::Inflector.inflections do |inflect|
group_view group_view
job_artifact_registry job_artifact_registry
lfs_object_registry lfs_object_registry
package_file_registry
project_auto_devops project_auto_devops
project_registry project_registry
project_statistics project_statistics
......
# frozen_string_literal: true
# Creates the `geo_events` table: one row per event, identified by the
# replicable it belongs to and the event name, with a JSONB payload and a
# creation timestamp.
class CreateGeoEvents < ActiveRecord::Migration[5.2]
DOWNTIME = false
# Written as a reversible `change`; only a single `create_table` is issued.
def change
create_table :geo_events do |t|
t.string :replicable_name, limit: 255, null: false
t.string :event_name, limit: 255, null: false
# The payload defaults to an empty JSON object and can never be NULL.
t.jsonb :payload, default: {}, null: false
t.datetime_with_timezone :created_at, null: false
end
end
end
# frozen_string_literal: true
# Adds a nullable `geo_event_id` column to `geo_event_log`.
#
# NOTE(review): the column is declared as :integer. If `geo_events.id` is a
# bigint primary key (the other *_event_id columns on this table appear to be
# bigint), the two types will not match — confirm the intended type.
class AddGeoEventIdToGeoEventLog < ActiveRecord::Migration[5.2]
DOWNTIME = false
def change
add_column :geo_event_log, :geo_event_id, :integer
end
end
# frozen_string_literal: true
# Adds a partial btree index on `geo_event_log.geo_event_id` that only covers
# rows where the column is set.
class AddGeoEventIdIndexToGeoEventLog < ActiveRecord::Migration[5.2]
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
# The index is created with `add_concurrent_index`, and the migration runs
# outside of the wrapping DDL transaction.
disable_ddl_transaction!
def up
add_concurrent_index :geo_event_log, :geo_event_id,
where: "(geo_event_id IS NOT NULL)",
using: :btree,
name: 'index_geo_event_log_on_geo_event_id'
end
# Removes the index by the same explicit name that `up` gave it.
def down
remove_concurrent_index :geo_event_log, :geo_event_id, name: 'index_geo_event_log_on_geo_event_id'
end
end
# frozen_string_literal: true
# Adds a foreign key from `geo_event_log.geo_event_id` to `geo_events`, with
# ON DELETE CASCADE so log rows are removed together with their event.
class AddGeoEventsForeignKey < ActiveRecord::Migration[5.2]
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
# The key is added with `add_concurrent_foreign_key`, and the migration runs
# outside of the wrapping DDL transaction.
disable_ddl_transaction!
def up
add_concurrent_foreign_key :geo_event_log, :geo_events,
column: :geo_event_id,
name: 'fk_geo_event_log_on_geo_event_id',
on_delete: :cascade
end
# Drops the key by the explicit name used in `up`.
def down
remove_foreign_key_without_error :geo_event_log, column: :geo_event_id, name: 'fk_geo_event_log_on_geo_event_id'
end
end
...@@ -1665,8 +1665,10 @@ ActiveRecord::Schema.define(version: 2020_02_03_025821) do ...@@ -1665,8 +1665,10 @@ ActiveRecord::Schema.define(version: 2020_02_03_025821) do
t.bigint "reset_checksum_event_id" t.bigint "reset_checksum_event_id"
t.bigint "cache_invalidation_event_id" t.bigint "cache_invalidation_event_id"
t.bigint "container_repository_updated_event_id" t.bigint "container_repository_updated_event_id"
t.integer "geo_event_id"
t.index ["cache_invalidation_event_id"], name: "index_geo_event_log_on_cache_invalidation_event_id", where: "(cache_invalidation_event_id IS NOT NULL)" t.index ["cache_invalidation_event_id"], name: "index_geo_event_log_on_cache_invalidation_event_id", where: "(cache_invalidation_event_id IS NOT NULL)"
t.index ["container_repository_updated_event_id"], name: "index_geo_event_log_on_container_repository_updated_event_id" t.index ["container_repository_updated_event_id"], name: "index_geo_event_log_on_container_repository_updated_event_id"
t.index ["geo_event_id"], name: "index_geo_event_log_on_geo_event_id", where: "(geo_event_id IS NOT NULL)"
t.index ["hashed_storage_attachments_event_id"], name: "index_geo_event_log_on_hashed_storage_attachments_event_id", where: "(hashed_storage_attachments_event_id IS NOT NULL)" t.index ["hashed_storage_attachments_event_id"], name: "index_geo_event_log_on_hashed_storage_attachments_event_id", where: "(hashed_storage_attachments_event_id IS NOT NULL)"
t.index ["hashed_storage_migrated_event_id"], name: "index_geo_event_log_on_hashed_storage_migrated_event_id", where: "(hashed_storage_migrated_event_id IS NOT NULL)" t.index ["hashed_storage_migrated_event_id"], name: "index_geo_event_log_on_hashed_storage_migrated_event_id", where: "(hashed_storage_migrated_event_id IS NOT NULL)"
t.index ["job_artifact_deleted_event_id"], name: "index_geo_event_log_on_job_artifact_deleted_event_id", where: "(job_artifact_deleted_event_id IS NOT NULL)" t.index ["job_artifact_deleted_event_id"], name: "index_geo_event_log_on_job_artifact_deleted_event_id", where: "(job_artifact_deleted_event_id IS NOT NULL)"
...@@ -1680,6 +1682,13 @@ ActiveRecord::Schema.define(version: 2020_02_03_025821) do ...@@ -1680,6 +1682,13 @@ ActiveRecord::Schema.define(version: 2020_02_03_025821) do
t.index ["upload_deleted_event_id"], name: "index_geo_event_log_on_upload_deleted_event_id", where: "(upload_deleted_event_id IS NOT NULL)" t.index ["upload_deleted_event_id"], name: "index_geo_event_log_on_upload_deleted_event_id", where: "(upload_deleted_event_id IS NOT NULL)"
end end
create_table "geo_events", force: :cascade do |t|
t.string "replicable_name", limit: 255, null: false
t.string "event_name", limit: 255, null: false
t.jsonb "payload", default: {}, null: false
t.datetime_with_timezone "created_at", null: false
end
create_table "geo_hashed_storage_attachments_events", force: :cascade do |t| create_table "geo_hashed_storage_attachments_events", force: :cascade do |t|
t.integer "project_id", null: false t.integer "project_id", null: false
t.text "old_attachments_path", null: false t.text "old_attachments_path", null: false
...@@ -4620,6 +4629,7 @@ ActiveRecord::Schema.define(version: 2020_02_03_025821) do ...@@ -4620,6 +4629,7 @@ ActiveRecord::Schema.define(version: 2020_02_03_025821) do
add_foreign_key "geo_container_repository_updated_events", "container_repositories", name: "fk_212c89c706", on_delete: :cascade add_foreign_key "geo_container_repository_updated_events", "container_repositories", name: "fk_212c89c706", on_delete: :cascade
add_foreign_key "geo_event_log", "geo_cache_invalidation_events", column: "cache_invalidation_event_id", name: "fk_42c3b54bed", on_delete: :cascade add_foreign_key "geo_event_log", "geo_cache_invalidation_events", column: "cache_invalidation_event_id", name: "fk_42c3b54bed", on_delete: :cascade
add_foreign_key "geo_event_log", "geo_container_repository_updated_events", column: "container_repository_updated_event_id", name: "fk_6ada82d42a", on_delete: :cascade add_foreign_key "geo_event_log", "geo_container_repository_updated_events", column: "container_repository_updated_event_id", name: "fk_6ada82d42a", on_delete: :cascade
add_foreign_key "geo_event_log", "geo_events", name: "fk_geo_event_log_on_geo_event_id", on_delete: :cascade
add_foreign_key "geo_event_log", "geo_hashed_storage_migrated_events", column: "hashed_storage_migrated_event_id", name: "fk_27548c6db3", on_delete: :cascade add_foreign_key "geo_event_log", "geo_hashed_storage_migrated_events", column: "hashed_storage_migrated_event_id", name: "fk_27548c6db3", on_delete: :cascade
add_foreign_key "geo_event_log", "geo_job_artifact_deleted_events", column: "job_artifact_deleted_event_id", name: "fk_176d3fbb5d", on_delete: :cascade add_foreign_key "geo_event_log", "geo_job_artifact_deleted_events", column: "job_artifact_deleted_event_id", name: "fk_176d3fbb5d", on_delete: :cascade
add_foreign_key "geo_event_log", "geo_lfs_object_deleted_events", column: "lfs_object_deleted_event_id", name: "fk_d5af95fcd9", on_delete: :cascade add_foreign_key "geo_event_log", "geo_lfs_object_deleted_events", column: "lfs_object_deleted_event_id", name: "fk_d5af95fcd9", on_delete: :cascade
......
# frozen_string_literal: true
module Geo
  # Replication strategy for replicables backed by a single CarrierWave file.
  #
  # Including this concern declares the `:created` event on the including
  # replicator and provides the matching publish/consume methods. The
  # including class must implement `#carrierwave_uploader`.
  module BlobReplicatorStrategy
    extend ActiveSupport::Concern

    included do
      event :created
    end

    # Intentionally empty for now; kept so the `ClassMethods` module exists.
    class_methods do
    end

    # Called by Packages::PackageFile on create
    def publish_created_event
      publish(:created, **created_params)
    end

    # Called by Gitlab::Geo::Replicator#consume
    #
    # The replicator forwards the published event data (plus `model_record`)
    # as keyword arguments, so they are accepted here even though the download
    # only needs the replicator itself.
    #
    # NOTE(review): Replicator#consume builds the handler name as
    # "consume_<event_name>", which does not match this method's name —
    # confirm which naming convention is intended.
    def consume_created_event(**_params)
      download
    end

    # @return [Object] the uploader holding the replicated file
    # @raise [NotImplementedError] unless overridden by the including class
    def carrierwave_uploader
      raise NotImplementedError
    end

    private

    # Example:
    #
    # package_file.replicator.download
    def download
      ::Geo::BlobDownloadService.new(replicator: self).execute
    end

    # Data published with the `:created` event.
    def created_params
      { model_record_id: model_record.id }
    end
  end
end
# frozen_string_literal: true
module Geo
# Generic Geo event record (table `geo_events` per the accompanying
# migration), carrying `replicable_name`, `event_name` and `payload`.
class Event < ApplicationRecord
include Geo::Model
include Geo::Eventable
# The log entry pointing at this event through `geo_event_log.geo_event_id`.
has_one :geo_event_log, class_name: 'Geo::EventLog', foreign_key: :geo_event_id
end
end
...@@ -17,7 +17,8 @@ module Geo ...@@ -17,7 +17,8 @@ module Geo
Geo::LfsObjectDeletedEvent Geo::LfsObjectDeletedEvent
Geo::JobArtifactDeletedEvent Geo::JobArtifactDeletedEvent
Geo::UploadDeletedEvent Geo::UploadDeletedEvent
Geo::ContainerRepositoryUpdatedEvent].freeze Geo::ContainerRepositoryUpdatedEvent
Geo::Event].freeze
belongs_to :cache_invalidation_event, belongs_to :cache_invalidation_event,
class_name: 'Geo::CacheInvalidationEvent', class_name: 'Geo::CacheInvalidationEvent',
...@@ -71,6 +72,10 @@ module Geo ...@@ -71,6 +72,10 @@ module Geo
class_name: 'Geo::ContainerRepositoryUpdatedEvent', class_name: 'Geo::ContainerRepositoryUpdatedEvent',
foreign_key: :container_repository_updated_event_id foreign_key: :container_repository_updated_event_id
belongs_to :event,
class_name: 'Geo::Event',
foreign_key: :geo_event_id
def self.latest_event def self.latest_event
order(id: :desc).first order(id: :desc).first
end end
......
# frozen_string_literal: true
# Registry record tracking the sync state of one Packages::PackageFile,
# together with its retry bookkeeping (`retry_count`, `retry_at`) and the last
# recorded failure message.
class Geo::PackageFileRegistry < Geo::BaseRegistry
  include ::Delay

  # Integer stored in the `state` column for each state machine state.
  STATE_VALUES = {
    pending: 0,
    started: 1,
    synced: 2,
    failed: 3
  }.freeze

  # `last_sync_failure` is a string column limited to 255 characters.
  LAST_SYNC_FAILURE_LIMIT = 255

  belongs_to :package_file, class_name: 'Packages::PackageFile'

  scope :package_file_id_not_in, -> (ids) { where.not(package_file_id: ids) }
  scope :never, -> { where(last_synced_at: nil) }
  scope :failed, -> { with_state(:failed) }
  scope :synced, -> { with_state(:synced) }
  # Rows with no retry time scheduled, or whose retry time has already passed.
  scope :retry_due, -> { where(arel_table[:retry_at].eq(nil).or(arel_table[:retry_at].lt(Time.now))) }

  state_machine :state, initial: :pending do
    state :pending, value: STATE_VALUES[:pending]
    state :started, value: STATE_VALUES[:started]
    state :synced, value: STATE_VALUES[:synced]
    state :failed, value: STATE_VALUES[:failed]

    before_transition any => :started do |registry, _|
      registry.last_synced_at = Time.now
    end

    before_transition any => :pending do |registry, _|
      # `retry_at` is a timestamp column: clear it with nil (as the :synced
      # transition does) so the row is picked up by `retry_due`.
      registry.retry_at = nil
      registry.retry_count = 0
    end

    before_transition any => :failed do |registry, _|
      registry.retry_count += 1
      # NOTE(review): `next_retry_time` presumably comes from ::Delay — confirm.
      registry.retry_at = registry.next_retry_time(registry.retry_count)
    end

    before_transition any => :synced do |registry, _|
      registry.retry_count = 0
      registry.last_sync_failure = nil
      registry.retry_at = nil
    end

    event :start do
      transition [:pending, :synced, :failed] => :started
    end

    event :synced do
      transition [:started] => :synced
    end

    event :failed do
      transition [:started] => :failed
    end

    event :resync do
      transition [:synced, :failed] => :pending
    end
  end

  # @param [Integer] id ID of the Packages::PackageFile
  # @return [Geo::PackageFileRegistry] an instance of this class
  def self.for_model_record_id(id)
    find_or_initialize_by(package_file_id: id)
  end

  # Looks up the integer stored for a state.
  #
  # @param [Symbol, String] state_string state name, e.g. :synced or "synced"
  # @return [Integer, nil] the stored value, or nil for an unknown state
  def self.state_value(state_string)
    STATE_VALUES[state_string.to_s.to_sym]
  end

  # Override state machine failed! event method to record a failure message at
  # the same time.
  #
  # The message is built without mutating the caller's string, tolerates a nil
  # message, and is truncated to the column limit so the save cannot fail on
  # an over-long error.
  #
  # @param [String] message error information
  # @param [StandardError] error exception
  def failed!(message, error = nil)
    failure = message
    failure = "#{message}: #{error.message}" if error.respond_to?(:message)

    self.last_sync_failure = failure&.to_s&.slice(0, LAST_SYNC_FAILURE_LIMIT)

    super()
  end
end
# frozen_string_literal: true # frozen_string_literal: true
class Packages::PackageFile < ApplicationRecord class Packages::PackageFile < ApplicationRecord
include UpdateProjectStatistics include UpdateProjectStatistics
include ::Gitlab::Geo::ReplicableModel
delegate :project, :project_id, to: :package delegate :project, :project_id, to: :package
delegate :conan_file_type, to: :conan_file_metadatum delegate :conan_file_type, to: :conan_file_metadatum
...@@ -28,7 +29,10 @@ class Packages::PackageFile < ApplicationRecord ...@@ -28,7 +29,10 @@ class Packages::PackageFile < ApplicationRecord
mount_uploader :file, Packages::PackageFileUploader mount_uploader :file, Packages::PackageFileUploader
with_replicator Geo::PackageFileReplicator
after_save :update_file_store, if: :saved_change_to_file? after_save :update_file_store, if: :saved_change_to_file?
after_create_commit -> { replicator.publish_created_event }
def update_file_store def update_file_store
# The file.object_store is set during `uploader.store!` # The file.object_store is set during `uploader.store!`
......
# frozen_string_literal: true
module Geo
# Replicator for Packages::PackageFile records, using the blob strategy.
class PackageFileReplicator < Gitlab::Geo::Replicator
include ::Geo::BlobReplicatorStrategy
# @return the uploader mounted as `file` on the package file record
def carrierwave_uploader
model_record.file
end
private
# Model class used by the base replicator to load `model_record` by ID.
def model
::Packages::PackageFile
end
end
end
# frozen_string_literal: true
module Geo
  # Downloads the blob of a single replicable under an exclusive lease and
  # records the outcome in the replicable's registry.
  class BlobDownloadService
    include Gitlab::Geo::LogHelpers

    # Imagine a multi-gigabyte LFS object file and an instance on the other side
    # of the earth
    LEASE_TIMEOUT = 8.hours.freeze

    include ExclusiveLeaseGuard

    # Initialize a new blob downloader service
    #
    # @param [Gitlab::Geo::Replicator] replicator instance
    def initialize(replicator:)
      @replicator = replicator
    end

    # Downloads a blob from the primary and places it where it should be. And
    # records sync status in Registry.
    #
    # Exits early if another instance is running for the same replicable model.
    #
    # @return [Boolean] true if synced, false if not
    def execute
      try_obtain_lease do
        start_time = Time.now

        registry.start!

        download_result = ::Gitlab::Geo::Replication::BlobDownloader.new(replicator: @replicator).execute

        # A file that is missing on the primary is treated as synced.
        mark_as_synced = download_result.success || download_result.primary_missing_file

        if mark_as_synced
          registry.synced!
        else
          message = download_result.reason
          error = download_result.extra_details&.fetch(:error, nil)

          registry.failed!(message, error)
        end

        log_download(mark_as_synced, download_result, start_time)

        !!mark_as_synced
      end
    end

    private

    def registry
      @registry ||= @replicator.registry
    end

    # Logs the outcome of a download, including any extra details reported by
    # the downloader.
    def log_download(mark_as_synced, download_result, start_time)
      metadata = {
        mark_as_synced: mark_as_synced,
        download_success: download_result.success,
        bytes_downloaded: download_result.bytes_downloaded,
        primary_missing_file: download_result.primary_missing_file,
        download_time_s: (Time.now - start_time).to_f.round(3),
        reason: download_result.reason
      }

      # `merge!` (not `merge`): the extra details must end up in the hash that
      # is actually logged.
      metadata.merge!(download_result.extra_details) if download_result.extra_details

      log_info("Blob download", metadata)
    end

    # One lease per replicable type and record.
    def lease_key
      @lease_key ||= "#{self.class.name.underscore}:#{@replicator.replicable_name}:#{@replicator.model_record.id}"
    end

    def lease_timeout
      LEASE_TIMEOUT
    end
  end
end
# frozen_string_literal: true
module Geo
  # Consumes one Geo event on behalf of Geo::EventWorker by handing it to the
  # replicator class registered for the event's replicable name.
  class EventService
    include ::Gitlab::Geo::LogHelpers
    include ::Gitlab::Utils::StrongMemoize

    attr_reader :replicable_name, :event_name, :payload

    # @param [String] replicable_name name used to look up the replicator class
    # @param [String] event_name name of the event to consume
    # @param [Hash] payload event data; read with the 'model_record_id' key
    def initialize(replicable_name, event_name, payload)
      @payload = payload
      @event_name = event_name
      @replicable_name = replicable_name
    end

    # Forwards the event name and payload to the replicator.
    def execute
      replicator.consume(event_name, payload)
    end

    private

    # Replicator instance for the record referenced by the payload (memoized).
    def replicator
      strong_memoize(:replicator) do
        record_id = payload['model_record_id']

        ::Gitlab::Geo::Replicator
          .for_replicable_name(replicable_name)
          .new(model_record_id: record_id)
      end
    end

    # Event attributes exposed as a hash for logging.
    def extra_log_data
      { replicable_name: replicable_name, event_name: event_name, payload: payload }
    end
  end
end
...@@ -189,6 +189,12 @@ ...@@ -189,6 +189,12 @@
:latency_sensitive: :latency_sensitive:
:resource_boundary: :unknown :resource_boundary: :unknown
:weight: 1 :weight: 1
- :name: geo:geo_event
:feature_category: :geo_replication
:has_external_dependencies:
:latency_sensitive:
:resource_boundary: :unknown
:weight: 1
- :name: geo:geo_file_download - :name: geo:geo_file_download
:feature_category: :geo_replication :feature_category: :geo_replication
:has_external_dependencies: :has_external_dependencies:
......
# frozen_string_literal: true
module Geo
# Sidekiq worker that consumes a single Geo event through Geo::EventService.
class EventWorker
include ApplicationWorker
include GeoQueue
# Retry up to three times; `dead: false` is passed through to Sidekiq.
sidekiq_options retry: 3, dead: false
# @param [String] replicable_name name of the replicable the event belongs to
# @param [String] event_name name of the event
# @param [Hash] payload event data passed through to the service
def perform(replicable_name, event_name, payload)
Geo::EventService.new(replicable_name, event_name, payload).execute
end
end
end
# frozen_string_literal: true
# Creates the `package_file_registry` table holding per-package-file sync
# state, retry bookkeeping and timestamps.
class CreatePackageFileRegistry < ActiveRecord::Migration[5.2]
DOWNTIME = false
def change
create_table :package_file_registry, id: :serial, force: :cascade do |t|
t.integer :package_file_id, null: false
t.integer :state, default: 0, null: false
# NOTE(review): defaults to 0 but is nullable — confirm NULL is intended.
t.integer :retry_count, default: 0
t.string :last_sync_failure, limit: 255
t.datetime_with_timezone :retry_at
t.datetime_with_timezone :last_synced_at
t.datetime_with_timezone :created_at, null: false
# NOTE(review): the index is on package_file_id but its name ends in
# `_on_repository_id` — looks like a copy/paste leftover; confirm.
t.index :package_file_id, name: :index_package_file_registry_on_repository_id, using: :btree
t.index :retry_at, name: :index_package_file_registry_on_retry_at, using: :btree
t.index :state, name: :index_package_file_registry_on_state, using: :btree
end
end
end
...@@ -2,15 +2,15 @@ ...@@ -2,15 +2,15 @@
# of editing this file, please use the migrations feature of Active Record to # of editing this file, please use the migrations feature of Active Record to
# incrementally modify your database, and then regenerate this schema definition. # incrementally modify your database, and then regenerate this schema definition.
# #
# Note that this schema.rb definition is the authoritative source for your # This file is the source Rails uses to define your schema when running `rails
# database schema. If you need to create the application database on another # db:schema:load`. When creating a new database, `rails db:schema:load` tends to
# system, you should be using db:schema:load, not running all the migrations # be faster and is potentially less error prone than running all of your
# from scratch. The latter is a flawed and unsustainable approach (the more migrations # migrations from scratch. Old migrations may fail to apply correctly if those
# you'll amass, the slower it'll run and the greater likelihood for issues). # migrations use external dependencies or application code.
# #
# It's strongly recommended that you check this file into your version control system. # It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 2019_10_25_194337) do ActiveRecord::Schema.define(version: 2020_01_21_194300) do
# These are extensions that must be enabled in order to support this database # These are extensions that must be enabled in order to support this database
enable_extension "plpgsql" enable_extension "plpgsql"
...@@ -91,6 +91,19 @@ ActiveRecord::Schema.define(version: 2019_10_25_194337) do ...@@ -91,6 +91,19 @@ ActiveRecord::Schema.define(version: 2019_10_25_194337) do
t.index ["success"], name: "index_lfs_object_registry_on_success" t.index ["success"], name: "index_lfs_object_registry_on_success"
end end
create_table "package_file_registry", id: :serial, force: :cascade do |t|
t.integer "package_file_id", null: false
t.integer "state", default: 0, null: false
t.integer "retry_count", default: 0
t.string "last_sync_failure", limit: 255
t.datetime_with_timezone "retry_at"
t.datetime_with_timezone "last_synced_at"
t.datetime_with_timezone "created_at", null: false
t.index ["package_file_id"], name: "index_package_file_registry_on_repository_id"
t.index ["retry_at"], name: "index_package_file_registry_on_retry_at"
t.index ["state"], name: "index_package_file_registry_on_state"
end
create_table "project_registry", id: :serial, force: :cascade do |t| create_table "project_registry", id: :serial, force: :cascade do |t|
t.integer "project_id", null: false t.integer "project_id", null: false
t.datetime "last_repository_synced_at" t.datetime "last_repository_synced_at"
......
# frozen_string_literal: true
module Gitlab
module Geo
module LogCursor
module Events
# Log cursor handler for generic Geo events: processing is deferred to
# Geo::EventWorker.
class Event
include BaseEvent
# Enqueues a Geo::EventWorker job with the event's replicable name, event
# name and payload.
def process
::Geo::EventWorker.perform_async(event.replicable_name, event.event_name, event.payload)
end
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
  module Geo
    # Mixin for models that are replicated by a Gitlab::Geo::Replicator.
    #
    # Including it adds the `with_replicator` class macro; until the macro is
    # used, `#replicator` raises NotImplementedError.
    module ReplicableModel
      def self.included(klass)
        klass.extend(ClassMethods)
      end

      module ClassMethods
        # Declares the replicator class for the model and defines a memoized
        # `#replicator` instance method returning an instance bound to the
        # record.
        #
        # @param [Class] klass a class inheriting from Gitlab::Geo::Replicator
        # @raise [ArgumentError] if klass is not such a class
        def with_replicator(klass)
          unless klass.is_a?(Class) && klass < ::Gitlab::Geo::Replicator
            raise ArgumentError, 'Must be a class inheriting from Gitlab::Geo::Replicator'
          end

          # A plain closure over `klass`; no string evaluation is needed.
          define_method(:replicator) do
            @_replicator ||= klass.new(model_record: self)
          end
        end
      end

      # Geo Replicator
      #
      # @return [Gitlab::Geo::Replicator]
      # @raise [NotImplementedError] when the model did not call `with_replicator`
      def replicator
        raise NotImplementedError, 'There is no Replicator defined for this model'
      end
    end
  end
end
# frozen_string_literal: true
module Gitlab
  module Geo
    module Replication
      # Downloads the file of a replicable from the primary node into a
      # temporary file and hands it to the replicable's CarrierWave uploader.
      #
      # Every outcome, including precondition failures and exceptions raised
      # while downloading, is reported as a Result object.
      class BlobDownloader
        # Provides `log_error`, used when the destination directory cannot be
        # created.
        include ::Gitlab::Geo::LogHelpers

        TEMP_PREFIX = 'tmp_'.freeze

        attr_reader :replicator

        delegate :replicable_name, :model_record, :primary_checksum, :carrierwave_uploader, to: :replicator
        delegate :file_storage?, to: :carrierwave_uploader

        # Outcome of a download attempt.
        class Result
          attr_reader :success, :bytes_downloaded, :primary_missing_file, :reason, :extra_details

          def initialize(success:, bytes_downloaded:, primary_missing_file: false, reason: nil, extra_details: nil)
            @success = success
            @bytes_downloaded = bytes_downloaded
            @primary_missing_file = primary_missing_file
            @reason = reason
            @extra_details = extra_details
          end
        end

        # @param [Gitlab::Geo::Replicator] replicator instance
        def initialize(replicator:)
          @replicator = replicator
        end

        # Download the file to a tempfile, then put it where it belongs.
        #
        # @return [Result] a result object containing all relevant information
        def execute
          check_result = check_preconditions
          return check_result if check_result

          temp_file = open_temp_file
          return temp_file if temp_file.is_a?(Result)

          begin
            result = download_file(resource_url, request_headers, temp_file)
          ensure
            temp_file.close
            temp_file.unlink
          end

          result
        end

        # @return [String] URL to download the resource from
        def resource_url
          # TODO change to Generalized API endpoint after that is implemented
          Gitlab::Geo.primary_node.geo_transfers_url(replicable_name, model_record.id.to_s)
        end

        private

        # Encodes data about the requested resource in the authorization header.
        # The primary will decode it and compare the decoded data to the
        # requested resource. If decoding works and the data makes sense, then
        # this proves to the primary that the secondary knows its GeoNode's
        # secret_access_key.
        #
        # @return [Hash] HTTP request headers
        def request_headers
          request_data = {
            replicable_name: replicable_name,
            model_record_id: model_record.id
          }

          TransferRequest.new(request_data).headers
        end

        # Returns nil if passed preconditions, otherwise returns a Result object
        #
        # @return [Result] a result object with skipped reason
        def check_preconditions
          unless Gitlab::Geo.secondary?
            return failure_result(reason: 'Skipping transfer as this is not a Secondary node')
          end

          unless Gitlab::Geo.primary_node
            return failure_result(reason: 'Skipping transfer as there is no Primary node to download from')
          end

          if file_storage?
            if File.directory?(absolute_path)
              return failure_result(reason: 'Skipping transfer as destination exist and is a directory')
            end

            unless ensure_destination_path_exists
              return failure_result(reason: 'Skipping transfer as we cannot create the destination directory')
            end
          end

          nil
        end

        # @return [String] path reported by the uploader for the file
        def absolute_path
          carrierwave_uploader.path
        end

        # Builds an unsuccessful Result.
        def failure_result(bytes_downloaded: 0, primary_missing_file: false, reason: nil, extra_details: nil)
          Result.new(
            success: false,
            bytes_downloaded: bytes_downloaded,
            primary_missing_file: primary_missing_file,
            reason: reason,
            extra_details: extra_details
          )
        end

        # Ensure entire destination path exist or try to create when not available
        #
        # @return [Boolean] whether destination path exists or could be created
        def ensure_destination_path_exists
          path = Pathname.new(absolute_path)
          dir = path.dirname

          return true if File.directory?(dir)

          begin
            FileUtils.mkdir_p(dir)
          rescue => e
            log_error("Unable to create directory #{dir}: #{e}")

            return false
          end

          true
        end

        # Download file from informed URL using HTTP.rb
        #
        # @return [Result] Object with transfer status and information
        def download_file(url, req_headers, temp_file)
          file_size = -1

          # Make the request
          response = ::HTTP.follow.get(url, headers: req_headers)

          # Check for failures
          unless response.status.success?
            return failure_result(
              primary_missing_file: primary_missing_file?(response),
              reason: "Non-success HTTP response status code #{response.status.code}",
              extra_details: { status_code: response.status.code, reason: response.status.reason, url: url }
            )
          end

          # Stream to temporary file on disk
          response.body.each do |chunk|
            temp_file.write(chunk)
          end

          file_size = temp_file.size

          # Check for checksum mismatch
          if checksum_mismatch?(temp_file.path)
            return failure_result(
              bytes_downloaded: file_size,
              reason: "Downloaded file checksum mismatch",
              extra_details: { primary_checksum: primary_checksum, actual_checksum: @actual_checksum }
            )
          end

          carrierwave_uploader.replace_file_without_saving!(CarrierWave::SanitizedFile.new(temp_file))

          Result.new(success: true, bytes_downloaded: [file_size, 0].max)
        rescue StandardError => e
          # `file_size` is still -1 when the failure happened before anything
          # was written; never report a negative byte count.
          failure_result(
            bytes_downloaded: [file_size, 0].max,
            reason: "Error downloading file",
            extra_details: { error: e, url: url }
          )
        end

        # Whether a 404 JSON response carries the "file not found" Geo code.
        def primary_missing_file?(response)
          return false unless response.status.not_found?
          return false unless response.content_type.mime_type == 'application/json'

          json_response = response.parse

          code_file_not_found?(json_response['geo_code'])
        rescue JSON::ParserError
          false
        end

        def code_file_not_found?(geo_code)
          geo_code == Gitlab::Geo::Replication::FILE_NOT_FOUND_GEO_CODE
        end

        # File mode for the temp file: 0666 with the process umask bits
        # cleared. This must be a bit mask, not a subtraction, which yields
        # wrong modes for umasks such as 077 or 027.
        def default_permissions
          0o666 & ~File.umask
        end

        # Opens the temp file the download is streamed into.
        #
        # @return [Tempfile, Result] the open temp file, or a failure Result
        def open_temp_file
          if file_storage?
            # Make sure the file is in the same directory to prevent moves across filesystems
            pathname = Pathname.new(absolute_path)
            temp = Tempfile.new(TEMP_PREFIX, pathname.dirname.to_s)
          else
            temp = Tempfile.new("#{TEMP_PREFIX}-#{replicable_name}-#{model_record.id}")
          end

          temp.chmod(default_permissions)
          temp.binmode
          temp
        rescue => e
          details = { error: e }

          # Assign in place so the path is really part of the reported details.
          path = absolute_path
          details[:absolute_path] = path if path

          failure_result(reason: "Error creating temporary file", extra_details: details)
        end

        # @param [String] file_path disk location to compare checksum mismatch
        def checksum_mismatch?(file_path)
          # Skip checksum check if primary didn't generate one because, for
          # example, large attachments are checksummed asynchronously, and most
          # types of artifacts are not checksummed at all at the moment.
          return false if primary_checksum.blank?
          return false unless Feature.enabled?(:geo_file_transfer_validation, default_enabled: true)

          primary_checksum != actual_checksum(file_path)
        end

        # Computes the SHA256 of the file and remembers it for failure details.
        def actual_checksum(file_path)
          @actual_checksum = Digest::SHA256.file(file_path).hexdigest
        end
      end
    end
  end
end
# frozen_string_literal: true
module Gitlab
  module Geo
    # Base class for Geo replicators.
    #
    # A replicator declares the events it supports with `event`, publishes
    # them with `#publish` and handles them with `#consume`, which dispatches
    # to a per-event handler method. It is bound to one model record, given
    # either directly or by ID.
    class Replicator
      include ::Gitlab::Geo::LogHelpers

      # ID given at construction time, used to load the record lazily
      attr_reader :model_record_id

      # Declare supported event
      #
      # @example Declaring support for :update and :delete events
      #   class MyReplicator < Gitlab::Geo::Replicator
      #     event :update
      #     event :delete
      #   end
      #
      # @param [Symbol] event_name
      def self.event(event_name)
        @events ||= []
        @events << event_name.to_sym
      end
      private_class_method :event

      # List supported events
      #
      # @return [Array<Symbol>] with list of events (empty when none declared)
      def self.supported_events
        (@events || []).dup
      end

      # Check if the replicator supports a specific event
      #
      # @param [Symbol, String] event_name
      # @return [Boolean] false when the event, or no event at all, was declared
      def self.event_supported?(event_name)
        (@events || []).include?(event_name.to_sym)
      end

      # Return the name of the replicator
      #
      # @return [String] name
      def self.replicable_name
        self.name.demodulize.sub('Replicator', '').underscore
      end

      # @return [Class] the ::Geo::<Name>Registry class for this replicator
      def self.registry_class
        const_get("::Geo::#{replicable_name.camelize}Registry", false)
      end

      # @param [String] replicable_name e.g. "package_file"
      # @return [Class] the ::Geo::<Name>Replicator class
      def self.for_replicable_name(replicable_name)
        replicator_class_name = "::Geo::#{replicable_name.camelize}Replicator"

        const_get(replicator_class_name, false)
      end

      # @param [Object] model_record the record to replicate
      # @param [Integer] model_record_id ID used to load the record on demand
      def initialize(model_record: nil, model_record_id: nil)
        @model_record = model_record
        @model_record_id = model_record_id
      end

      # The record this replicator is bound to, loaded through `model` on first
      # use when only an ID was given.
      #
      # @return [Object, nil] the record, or nil when neither record nor ID is set
      def model_record
        return @model_record if @model_record
        return unless @model_record_id

        @model_record = model.find(@model_record_id)
      end

      # Publish an event with its contextual data. Does nothing unless the
      # :geo_self_service_framework feature is enabled.
      #
      # @param [Symbol] event_name
      # @param [Hash] event_data contextual data stored as the event payload
      # @raise [ArgumentError] when the event was not declared
      def publish(event_name, **event_data)
        return unless Feature.enabled?(:geo_self_service_framework)

        raise ArgumentError, "Unsupported event: '#{event_name}'" unless self.class.event_supported?(event_name)

        create_event_with(
          class_name: ::Geo::Event,
          replicable_name: self.class.replicable_name,
          event_name: event_name,
          payload: event_data
        )
      end

      # Consume an event, using the published contextual data
      #
      # This method is called by the GeoLogCursor when reading the event from the queue
      #
      # The handler is `consume_<event_name>`, or `consume_<event_name>_event`
      # when the former does not exist. Handlers may come from the class itself
      # or from an included strategy module.
      #
      # @param [Symbol] event_name
      # @param [Hash] params contextual data published with the event
      # @raise [ArgumentError] when the event was not declared
      # @raise [NotImplementedError] when no handler method exists
      def consume(event_name, **params)
        raise ArgumentError, "Unsupported event: '#{event_name}'" unless self.class.event_supported?(event_name)

        consume_method = "consume_#{event_name}".to_sym

        unless instance_method_defined?(consume_method)
          fallback_method = "#{consume_method}_event".to_sym
          raise NotImplementedError, "Consume method not implemented: '#{consume_method}'" unless instance_method_defined?(fallback_method)

          consume_method = fallback_method
        end

        # Inject model_record based on included class
        if model_record
          params[:model_record] = model_record
        end

        handler = method(consume_method)

        # A handler declared without parameters cannot receive the event data;
        # call it bare instead of failing with an ArgumentError.
        return handler.call if handler.arity.zero?

        handler.call(**params)
      end

      def replicable_name
        self.class.replicable_name
      end

      def registry_class
        self.class.registry_class
      end

      # @return [Object] registry for the bound record (see `for_model_record_id`)
      def registry
        registry_class.for_model_record_id(model_record.id)
      end

      # Checksum of the file on the primary; the base implementation has none.
      def primary_checksum
        nil
      end

      protected

      # Store an event on the database
      #
      # @example Create an event
      #   create_event_with(class_name: Geo::CacheInvalidationEvent, key: key)
      #
      # @param [Class] class_name a ActiveRecord class that's used to store an event for Geo
      # @param [Hash] **params context information that will be stored in the event table
      # @return [ApplicationRecord] event instance that was just persisted
      def create_event_with(class_name:, **params)
        return unless Gitlab::Geo.primary?
        return unless Gitlab::Geo.secondary_nodes.any?

        event = class_name.create!(**params)

        # Only works with the new geo_events at the moment because we need to
        # know which foreign key to use
        #
        # The foreign key column takes the event's ID, not the record itself.
        ::Geo::EventLog.create!(geo_event_id: event.id)

        event
      rescue ActiveRecord::RecordInvalid, NoMethodError => e
        log_error("#{class_name} could not be created", e, params)
      end

      private

      # Model class used to load `model_record` by ID; subclasses override it.
      #
      # @raise [NotImplementedError] unless overridden
      def model
        raise NotImplementedError
      end

      # Checks if a method is available on this replicator, whether defined by
      # its class, a superclass or an included module (private ones included).
      #
      # @param [Symbol] method_name
      # @return [Boolean] whether method is implemented
      def instance_method_defined?(method_name)
        respond_to?(method_name, true)
      end
    end
  end
end
# frozen_string_literal: true
# Factory for Geo::Event; by default a 'created' event for the 'package_file'
# replicable.
FactoryBot.define do
factory :geo_event, class: 'Geo::Event' do
replicable_name { 'package_file' }
event_name { 'created' }
# Sets a payload pointing at a freshly created package file (:pom trait).
trait :package_file do
payload do
{ model_record_id: create(:package_file, :pom).id }
end
end
end
end
...@@ -53,6 +53,10 @@ FactoryBot.define do ...@@ -53,6 +53,10 @@ FactoryBot.define do
trait :design_repository_updated_event do trait :design_repository_updated_event do
repository_updated_event factory: :geo_design_repository_updated_event repository_updated_event factory: :geo_design_repository_updated_event
end end
trait :event do
event factory: :geo_event
end
end end
factory :geo_repository_created_event, class: 'Geo::RepositoryCreatedEvent' do factory :geo_repository_created_event, class: 'Geo::RepositoryCreatedEvent' do
......
# frozen_string_literal: true
# Factory for Geo::PackageFileRegistry; pending and never synced by default,
# with one trait per other state.
FactoryBot.define do
factory :package_file_registry, class: 'Geo::PackageFileRegistry' do
association :package_file, factory: [:package_file, :npm]
last_sync_failure { nil }
last_synced_at { nil }
state { Geo::PackageFileRegistry.state_value(:pending) }
trait :synced do
state { Geo::PackageFileRegistry.state_value(:synced) }
last_synced_at { 5.days.ago }
end
# Failed after two retries, with a failure message recorded.
trait :failed do
state { Geo::PackageFileRegistry.state_value(:failed) }
last_synced_at { 1.day.ago }
retry_count { 2 }
last_sync_failure { 'Random error' }
end
trait :started do
state { Geo::PackageFileRegistry.state_value(:started) }
last_synced_at { 1.day.ago }
retry_count { 0 }
end
end
end
# frozen_string_literal: true
require "spec_helper"
# Verifies that processing a generic Geo event hands it over to
# Geo::EventWorker and, once the job runs, to the replicator's #consume.
describe Gitlab::Geo::LogCursor::Events::Event, :clean_gitlab_redis_shared_state do
  let(:logger) { Gitlab::Geo::LogCursor::Logger.new(described_class, Logger::INFO) }
  let(:event) { create(:geo_event, :package_file, event_name: "created") }
  let(:event_log) { create(:geo_event_log, event: event) }
  let(:replicable) { Packages::PackageFile.find(event.payload["model_record_id"]) }

  # Payload expected to travel along with the event, using string keys.
  let(:event_payload) { { "model_record_id" => replicable.id } }

  # Eagerly created so the stored cursor state points just before event_log.
  let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }

  subject { described_class.new(event, Time.now, logger) }

  describe "#process" do
    it "enqueues Geo::EventWorker" do
      expect(::Geo::EventWorker)
        .to receive(:perform_async)
        .with("package_file", "created", event_payload)

      subject.process
    end

    it "eventually calls Replicator#consume", :sidekiq_inline do
      expect_next_instance_of(::Geo::PackageFileReplicator) do |replicator|
        expect(replicator).to receive(:consume).with("created", event_payload)
      end

      subject.process
    end
  end
end
# frozen_string_literal: true
require 'spec_helper'
# Specs for Gitlab::Geo::Replication::BlobDownloader#execute, exercised through
# the replicator of an npm package file while the current node is stubbed as a
# Geo secondary.
describe Gitlab::Geo::Replication::BlobDownloader do
  include ::EE::GeoHelpers

  let_it_be(:primary) { create(:geo_node, :primary) }
  let_it_be(:secondary) { create(:geo_node) }

  let(:model_record) { create(:package_file, :npm) }
  let(:replicator) { model_record.replicator }

  subject { described_class.new(replicator: replicator) }

  describe '#execute' do
    before do
      stub_current_geo_node(secondary)
    end

    context 'precondition failures' do
      context 'not a Geo secondary' do
        it 'returns failure' do
          stub_current_geo_node(primary)

          result = subject.execute

          expect(result.success).to be_falsey
        end
      end

      context 'no Geo primary exists' do
        it 'returns failure' do
          primary.update!(primary: false)

          result = subject.execute

          expect(result.success).to be_falsey
        end
      end

      context 'when the file is locally stored' do
        context 'when the file destination is already taken by a directory' do
          it 'returns failure' do
            path = replicator.carrierwave_uploader.path
            expect(File).to receive(:directory?).with(path).and_return(true)

            result = subject.execute

            expect(result.success).to be_falsey
          end
        end

        xit 'ensures the file destination directory exists' # Not worth testing here as-is. Extract the functionality first.
      end
    end

    context 'when an error occurs while getting a Tempfile' do
      it 'returns failure' do
        # Build the downloader (and its lazily-created dependencies) before
        # Tempfile.new starts raising.
        # NOTE(review): presumably needed because the setup itself relies on
        # Tempfile -- confirm.
        subject

        expect(Tempfile).to receive(:new).and_raise('boom')

        result = subject.execute

        expect(result.success).to be_falsey
        expect(result.extra_details).to have_key(:error)
      end
    end

    context 'when the HTTP response is unsuccessful' do
      context 'when the HTTP response indicates a missing file on the primary' do
        it 'returns a failed result indicating primary_missing_file' do
          stub_request(:get, subject.resource_url)
            .to_return(status: 404,
                       headers: { content_type: 'application/json' },
                       body: { geo_code: Gitlab::Geo::Replication::FILE_NOT_FOUND_GEO_CODE }.to_json)

          result = subject.execute

          expect_blob_downloader_result(result, success: false, bytes_downloaded: 0, primary_missing_file: true)
        end
      end

      context 'when the HTTP response does not indicate a missing file on the primary' do
        it 'returns a failed result' do
          stub_request(:get, subject.resource_url)
            .to_return(status: 404,
                       headers: { content_type: 'application/json' },
                       body: 'Not found')

          result = subject.execute

          expect_blob_downloader_result(result, success: false, bytes_downloaded: 0, primary_missing_file: false)
        end
      end
    end

    context 'when the HTTP response is successful' do
      it 'returns success' do
        path = replicator.carrierwave_uploader.path
        content = replicator.carrierwave_uploader.file.read
        size = content.bytesize
        stub_request(:get, subject.resource_url).to_return(status: 200, body: content)

        result = subject.execute
        stat = File.stat(path)

        expect_blob_downloader_result(result, success: true, bytes_downloaded: size, primary_missing_file: false)
        expect(stat.size).to eq(size)
        # The umask clears permission bits; it is not subtracted. Masking keeps
        # the expectation correct for umasks with bits outside 0666 (e.g. 0077).
        expect(stat.mode & 0777).to eq(0666 & ~File.umask)
        expect(File.binread(path)).to eq(content)
      end

      context 'when the checksum of the downloaded file does not match' do
        it 'returns a failed result' do
          allow(replicator).to receive(:primary_checksum).and_return('something')
          bad_content = 'corrupted!!!'
          stub_request(:get, subject.resource_url)
            .to_return(status: 200, body: bad_content)

          result = subject.execute

          expect_blob_downloader_result(result, success: false, bytes_downloaded: bad_content.bytesize, primary_missing_file: false)
        end
      end

      context 'when the primary has not stored a checksum for the file' do
        it 'returns a successful result' do
          expect(replicator).to receive(:primary_checksum).and_return(nil)
          content = 'foo'
          stub_request(:get, subject.resource_url)
            .to_return(status: 200, body: content)

          result = subject.execute

          expect_blob_downloader_result(result, success: true, bytes_downloaded: content.bytesize, primary_missing_file: false)
        end
      end
    end
  end

  # Asserts the main fields of a downloader result in one call.
  #
  # @param result [Object] value returned by #execute
  # @param success [Boolean] expected result.success
  # @param bytes_downloaded [Integer] expected result.bytes_downloaded
  # @param primary_missing_file [Boolean] expected result.primary_missing_file
  # @param extra_details [Object, nil] accepted but currently not asserted
  def expect_blob_downloader_result(result, success:, bytes_downloaded:, primary_missing_file:, extra_details: nil)
    expect(result.success).to eq(success)
    expect(result.bytes_downloaded).to eq(bytes_downloaded)
    expect(result.primary_missing_file).to eq(primary_missing_file)

    # Sanity check to help ensure a valid test
    expect(success).not_to be_nil
    expect(primary_missing_file).not_to be_nil
  end
end
# frozen_string_literal: true
require 'spec_helper'
# Specs for the Gitlab::Geo::Replicator DSL: event declaration, the model-side
# `with_replicator` hook, and #publish.
describe Gitlab::Geo::Replicator do
  context 'with defined events' do
    # Replicator with two declared events, only one of which (:test) has a
    # publish_* implementation.
    class DummyReplicator < Gitlab::Geo::Replicator
      event :test
      event :another_test

      protected

      def publish_test(other:)
        true
      end
    end

    context 'event DSL' do
      subject { DummyReplicator }

      describe '.supported_events' do
        it 'expects :test event to be supported' do
          expect(subject.supported_events).to match_array([:test, :another_test])
        end
      end

      describe '.event_supported?' do
        it 'expects a supported event to return true' do
          expect(subject.event_supported?(:test)).to be_truthy
        end

        it 'expect an unsupported event to return false' do
          expect(subject.event_supported?(:something_else)).to be_falsey
        end
      end
    end

    context 'model DSL' do
      # Minimal model wired to DummyReplicator through the ReplicableModel mixin.
      class DummyModel
        include ActiveModel::Model
        include Gitlab::Geo::ReplicableModel

        with_replicator DummyReplicator
      end

      subject { DummyModel.new }

      it 'adds replicator method to the model' do
        # Must go through `.to respond_to(...)`: calling `respond_to?` on the
        # value returned by `expect` asserts nothing about the model.
        expect(subject).to respond_to(:replicator)
      end

      it 'instantiates a replicator into the model' do
        expect(subject.replicator).to be_a(DummyReplicator)
      end
    end

    describe '#publish' do
      subject { DummyReplicator.new }

      context 'when geo_self_service_framework feature is disabled' do
        before do
          stub_feature_flags(geo_self_service_framework: false)
        end

        it 'returns nil' do
          expect(subject.publish(:test, other: true)).to be_nil
        end

        it 'does not call create_event' do
          expect(subject).not_to receive(:create_event_with)

          subject.publish(:test, other: true)
        end
      end

      context 'when publishing a supported events with required params' do
        it 'does not raise errors' do
          expect { subject.publish(:test, other: true) }.not_to raise_error
        end
      end

      context 'when publishing unsupported event' do
        it 'raises an argument error' do
          expect { subject.publish(:unsupported) }.to raise_error(ArgumentError)
        end
      end
    end
  end
end
...@@ -18,7 +18,7 @@ describe 'Every Geo event' do ...@@ -18,7 +18,7 @@ describe 'Every Geo event' do
geo = root.join('geo') geo = root.join('geo')
events = Dir[geo.join('**', '*.rb')] events = Dir[geo.join('**', '*.rb')]
.select { |path| path.end_with?('_event.rb') } .select { |path| path.end_with?('event.rb') }
events.map! do |path| events.map! do |path|
ns = Pathname.new(path).relative_path_from(root).to_s.gsub('.rb', '') ns = Pathname.new(path).relative_path_from(root).to_s.gsub('.rb', '')
......
# frozen_string_literal: true
require 'spec_helper'
# Smoke test: the :package_file_registry factory yields a valid record.
describe Geo::PackageFileRegistry, :geo, type: :model do
  let_it_be(:registry) do
    create(:package_file_registry)
  end

  specify 'factory is valid' do
    expect(registry).to be_valid
  end
end
# frozen_string_literal: true
require 'spec_helper'
# Specs for the package file replicator: publishing a "created" event while
# the current node is stubbed as the primary, and consuming one.
describe Geo::PackageFileReplicator do
  include EE::GeoHelpers

  let_it_be(:primary) { create(:geo_node, :primary) }
  let_it_be(:secondary) { create(:geo_node) }
  let_it_be(:model_record) { create(:package_file, :npm) }

  subject { described_class.new(model_record: model_record) }

  before do
    stub_current_geo_node(primary)
  end

  describe '#publish_created_event' do
    it "creates a Geo::Event" do
      expect { subject.publish_created_event }.to change { ::Geo::Event.count }.by(1)

      # Attributes the newly stored event is expected to carry.
      expected_attributes = {
        "replicable_name" => "package_file",
        "event_name" => "created",
        "payload" => { "model_record_id" => model_record.id }
      }

      expect(::Geo::Event.last.attributes).to include(expected_attributes)
    end
  end

  describe '#consume_created_event' do
    it 'invokes Geo::BlobDownloadService' do
      service = double(:service)

      expect(::Geo::BlobDownloadService).to receive(:new).with(replicator: subject).and_return(service)
      expect(service).to receive(:execute)

      subject.consume_created_event
    end
  end
end
# frozen_string_literal: true
require "spec_helper"
# Specs for Geo::BlobDownloadService#execute with the blob downloader replaced
# by a double, checking which registry record results from each outcome.
describe Geo::BlobDownloadService do
  include ::EE::GeoHelpers
  include ExclusiveLeaseHelpers

  let_it_be(:primary) { create(:geo_node, :primary) }
  let_it_be(:secondary) { create(:geo_node) }

  let(:model_record) { create(:package_file, :npm) }
  let(:replicator) { model_record.replicator }
  let(:registry_class) { replicator.registry_class }

  subject { described_class.new(replicator: replicator) }

  before do
    stub_current_geo_node(secondary)
  end

  describe "#execute" do
    let(:downloader) { double(:downloader) }

    # Every example must build exactly one downloader and run it once; the
    # outcome comes from the `result` double defined per context.
    before do
      expect(::Gitlab::Geo::Replication::BlobDownloader).to receive(:new).and_return(downloader)
      expect(downloader).to receive(:execute).and_return(result)
    end

    context "when it can obtain the exclusive lease" do
      context "when the registry record does not exist" do
        context "when the downloader returns success" do
          let(:result) do
            double(
              :result,
              success: true,
              primary_missing_file: false,
              bytes_downloaded: 123,
              reason: nil,
              extra_details: nil
            )
          end

          it "creates the registry" do
            expect { subject.execute }.to change { registry_class.count }.by(1)
          end

          it "sets sync state to synced" do
            subject.execute

            expect(registry_class.last).to be_synced
          end
        end

        context "when the downloader returns failure" do
          let(:result) do
            double(
              :result,
              success: false,
              primary_missing_file: false,
              bytes_downloaded: 123,
              reason: "foo",
              extra_details: nil
            )
          end

          it "creates the registry" do
            expect { subject.execute }.to change { registry_class.count }.by(1)
          end

          it "sets sync state to failed" do
            subject.execute

            expect(registry_class.last).to be_failed
          end
        end
      end
    end
  end
end
# frozen_string_literal: true
require "spec_helper"
# Verifies that the worker forwards its arguments unchanged to
# Geo::EventService and executes the service.
describe Geo::EventWorker, :geo do
  describe "#perform" do
    it "calls Geo::EventService" do
      service = double(:service)
      args = ["package_file", "created", { "model_record_id" => 1 }]

      expect(::Geo::EventService).to receive(:new).with(*args).and_return(service)
      expect(service).to receive(:execute)

      described_class.new.perform(*args)
    end
  end
end
...@@ -27,6 +27,7 @@ module Quality ...@@ -27,6 +27,7 @@ module Quality
policies policies
presenters presenters
rack_servers rack_servers
replicators
routing routing
rubocop rubocop
serializers serializers
......
...@@ -21,7 +21,7 @@ RSpec.describe Quality::TestLevel do ...@@ -21,7 +21,7 @@ RSpec.describe Quality::TestLevel do
context 'when level is unit' do context 'when level is unit' do
it 'returns a pattern' do it 'returns a pattern' do
expect(subject.pattern(:unit)) expect(subject.pattern(:unit))
.to eq("spec/{bin,config,db,dependencies,factories,finders,frontend,graphql,haml_lint,helpers,initializers,javascripts,lib,models,policies,presenters,rack_servers,routing,rubocop,serializers,services,sidekiq,tasks,uploaders,validators,views,workers,elastic_integration}{,/**/}*_spec.rb") .to eq("spec/{bin,config,db,dependencies,factories,finders,frontend,graphql,haml_lint,helpers,initializers,javascripts,lib,models,policies,presenters,rack_servers,replicators,routing,rubocop,serializers,services,sidekiq,tasks,uploaders,validators,views,workers,elastic_integration}{,/**/}*_spec.rb")
end end
end end
...@@ -82,7 +82,7 @@ RSpec.describe Quality::TestLevel do ...@@ -82,7 +82,7 @@ RSpec.describe Quality::TestLevel do
context 'when level is unit' do context 'when level is unit' do
it 'returns a regexp' do it 'returns a regexp' do
expect(subject.regexp(:unit)) expect(subject.regexp(:unit))
.to eq(%r{spec/(bin|config|db|dependencies|factories|finders|frontend|graphql|haml_lint|helpers|initializers|javascripts|lib|models|policies|presenters|rack_servers|routing|rubocop|serializers|services|sidekiq|tasks|uploaders|validators|views|workers|elastic_integration)}) .to eq(%r{spec/(bin|config|db|dependencies|factories|finders|frontend|graphql|haml_lint|helpers|initializers|javascripts|lib|models|policies|presenters|rack_servers|replicators|routing|rubocop|serializers|services|sidekiq|tasks|uploaders|validators|views|workers|elastic_integration)})
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment