Commit ba75b82a authored by Valery Sizov's avatar Valery Sizov

Docker Regitry event handling

This is the first part of Docker Registry replication
for secondary Geo node.
parent 20af759e
...@@ -86,4 +86,9 @@ class ContainerRepository < ApplicationRecord ...@@ -86,4 +86,9 @@ class ContainerRepository < ApplicationRecord
def self.build_root_repository(project) def self.build_root_repository(project)
self.new(project: project, name: '') self.new(project: project, name: '')
end end
def self.find_by_path!(path)
self.find_by!(project: path.repository_project,
name: path.repository_name)
end
end end
...@@ -400,6 +400,15 @@ production: &base ...@@ -400,6 +400,15 @@ production: &base
# path: shared/registry # path: shared/registry
# issuer: gitlab-issuer # issuer: gitlab-issuer
# Add notification settings if you plan to use Geo Replication for the registry
# notifications:
# - name: geo_event
# url: https://example.com/api/v4/container_registry_event/events
# timeout: 2s
# threshold: 5
# backoff: 1s
# headers:
# Authorization: secret_phrase
## Error Reporting and Logging with Sentry ## Error Reporting and Logging with Sentry
sentry: sentry:
......
...@@ -259,6 +259,7 @@ Settings.registry['key'] ||= nil ...@@ -259,6 +259,7 @@ Settings.registry['key'] ||= nil
Settings.registry['issuer'] ||= nil Settings.registry['issuer'] ||= nil
Settings.registry['host_port'] ||= [Settings.registry['host'], Settings.registry['port']].compact.join(':') Settings.registry['host_port'] ||= [Settings.registry['host'], Settings.registry['port']].compact.join(':')
Settings.registry['path'] = Settings.absolute(Settings.registry['path'] || File.join(Settings.shared['path'], 'registry')) Settings.registry['path'] = Settings.absolute(Settings.registry['path'] || File.join(Settings.shared['path'], 'registry'))
Settings.registry['notifications'] ||= []
# #
# Error Reporting and Logging with Sentry # Error Reporting and Logging with Sentry
......
# frozen_string_literal: true
class AddGeoContainerRepositoryUpdatedEventsTable < ActiveRecord::Migration[5.1]
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
def change
create_table :geo_container_repository_updated_events, force: :cascade do |t|
t.integer :container_repository_id, null: false
t.index :container_repository_id, name: :idx_geo_con_rep_updated_events_on_container_repository_id, using: :btree
end
add_column :geo_event_log, :container_repository_updated_event_id, :bigint
end
end
# frozen_string_literal: true
class AddIndexToGeoEventLog < ActiveRecord::Migration[5.1]
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
disable_ddl_transaction!
def up
add_concurrent_index :geo_event_log, :container_repository_updated_event_id
end
def down
remove_concurrent_index(:geo_event_log, :container_repository_updated_event_id)
end
end
# frozen_string_literal: true
class AddForeignKeysForContainerRepository < ActiveRecord::Migration[5.1]
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
disable_ddl_transaction!
def up
add_concurrent_foreign_key(:geo_container_repository_updated_events, :container_repositories, column: :container_repository_id, on_delete: :cascade)
add_concurrent_foreign_key(:geo_event_log, :geo_container_repository_updated_events, column: :container_repository_updated_event_id, on_delete: :cascade)
end
def down
if foreign_key_exists?(:geo_container_repository_updated_events, :container_repositories)
remove_foreign_key(:geo_container_repository_updated_events, :container_repositories)
end
if foreign_key_exists?(:geo_event_log, :geo_container_repository_updated_events)
remove_foreign_key(:geo_event_log, :geo_container_repository_updated_events)
end
end
end
...@@ -1275,6 +1275,11 @@ ActiveRecord::Schema.define(version: 2019_07_15_114644) do ...@@ -1275,6 +1275,11 @@ ActiveRecord::Schema.define(version: 2019_07_15_114644) do
t.string "key", null: false t.string "key", null: false
end end
create_table "geo_container_repository_updated_events", force: :cascade do |t|
t.integer "container_repository_id", null: false
t.index ["container_repository_id"], name: "idx_geo_con_rep_updated_events_on_container_repository_id"
end
create_table "geo_event_log", force: :cascade do |t| create_table "geo_event_log", force: :cascade do |t|
t.datetime "created_at", null: false t.datetime "created_at", null: false
t.bigint "repository_updated_event_id" t.bigint "repository_updated_event_id"
...@@ -1289,7 +1294,9 @@ ActiveRecord::Schema.define(version: 2019_07_15_114644) do ...@@ -1289,7 +1294,9 @@ ActiveRecord::Schema.define(version: 2019_07_15_114644) do
t.bigint "job_artifact_deleted_event_id" t.bigint "job_artifact_deleted_event_id"
t.bigint "reset_checksum_event_id" t.bigint "reset_checksum_event_id"
t.bigint "cache_invalidation_event_id" t.bigint "cache_invalidation_event_id"
t.bigint "container_repository_updated_event_id"
t.index ["cache_invalidation_event_id"], name: "index_geo_event_log_on_cache_invalidation_event_id", where: "(cache_invalidation_event_id IS NOT NULL)" t.index ["cache_invalidation_event_id"], name: "index_geo_event_log_on_cache_invalidation_event_id", where: "(cache_invalidation_event_id IS NOT NULL)"
t.index ["container_repository_updated_event_id"], name: "index_geo_event_log_on_container_repository_updated_event_id"
t.index ["hashed_storage_attachments_event_id"], name: "index_geo_event_log_on_hashed_storage_attachments_event_id", where: "(hashed_storage_attachments_event_id IS NOT NULL)" t.index ["hashed_storage_attachments_event_id"], name: "index_geo_event_log_on_hashed_storage_attachments_event_id", where: "(hashed_storage_attachments_event_id IS NOT NULL)"
t.index ["hashed_storage_migrated_event_id"], name: "index_geo_event_log_on_hashed_storage_migrated_event_id", where: "(hashed_storage_migrated_event_id IS NOT NULL)" t.index ["hashed_storage_migrated_event_id"], name: "index_geo_event_log_on_hashed_storage_migrated_event_id", where: "(hashed_storage_migrated_event_id IS NOT NULL)"
t.index ["job_artifact_deleted_event_id"], name: "index_geo_event_log_on_job_artifact_deleted_event_id", where: "(job_artifact_deleted_event_id IS NOT NULL)" t.index ["job_artifact_deleted_event_id"], name: "index_geo_event_log_on_job_artifact_deleted_event_id", where: "(job_artifact_deleted_event_id IS NOT NULL)"
...@@ -3700,7 +3707,9 @@ ActiveRecord::Schema.define(version: 2019_07_15_114644) do ...@@ -3700,7 +3707,9 @@ ActiveRecord::Schema.define(version: 2019_07_15_114644) do
add_foreign_key "fork_network_members", "projects", on_delete: :cascade add_foreign_key "fork_network_members", "projects", on_delete: :cascade
add_foreign_key "fork_networks", "projects", column: "root_project_id", name: "fk_e7b436b2b5", on_delete: :nullify add_foreign_key "fork_networks", "projects", column: "root_project_id", name: "fk_e7b436b2b5", on_delete: :nullify
add_foreign_key "forked_project_links", "projects", column: "forked_to_project_id", name: "fk_434510edb0", on_delete: :cascade add_foreign_key "forked_project_links", "projects", column: "forked_to_project_id", name: "fk_434510edb0", on_delete: :cascade
add_foreign_key "geo_container_repository_updated_events", "container_repositories", name: "fk_212c89c706", on_delete: :cascade
add_foreign_key "geo_event_log", "geo_cache_invalidation_events", column: "cache_invalidation_event_id", name: "fk_42c3b54bed", on_delete: :cascade add_foreign_key "geo_event_log", "geo_cache_invalidation_events", column: "cache_invalidation_event_id", name: "fk_42c3b54bed", on_delete: :cascade
add_foreign_key "geo_event_log", "geo_container_repository_updated_events", column: "container_repository_updated_event_id", name: "fk_6ada82d42a", on_delete: :cascade
add_foreign_key "geo_event_log", "geo_hashed_storage_migrated_events", column: "hashed_storage_migrated_event_id", name: "fk_27548c6db3", on_delete: :cascade add_foreign_key "geo_event_log", "geo_hashed_storage_migrated_events", column: "hashed_storage_migrated_event_id", name: "fk_27548c6db3", on_delete: :cascade
add_foreign_key "geo_event_log", "geo_job_artifact_deleted_events", column: "job_artifact_deleted_event_id", name: "fk_176d3fbb5d", on_delete: :cascade add_foreign_key "geo_event_log", "geo_job_artifact_deleted_events", column: "job_artifact_deleted_event_id", name: "fk_176d3fbb5d", on_delete: :cascade
add_foreign_key "geo_event_log", "geo_lfs_object_deleted_events", column: "lfs_object_deleted_event_id", name: "fk_d5af95fcd9", on_delete: :cascade add_foreign_key "geo_event_log", "geo_lfs_object_deleted_events", column: "lfs_object_deleted_event_id", name: "fk_d5af95fcd9", on_delete: :cascade
......
# frozen_string_literal: true
module Geo
class ContainerRepositoryUpdatedEvent < ApplicationRecord
include Geo::Model
include Geo::Eventable
belongs_to :container_repository
validates :container_repository, presence: true
end
end
...@@ -16,7 +16,8 @@ module Geo ...@@ -16,7 +16,8 @@ module Geo
Geo::HashedStorageAttachmentsEvent Geo::HashedStorageAttachmentsEvent
Geo::LfsObjectDeletedEvent Geo::LfsObjectDeletedEvent
Geo::JobArtifactDeletedEvent Geo::JobArtifactDeletedEvent
Geo::UploadDeletedEvent].freeze Geo::UploadDeletedEvent
Geo::ContainerRepositoryUpdatedEvent].freeze
belongs_to :cache_invalidation_event, belongs_to :cache_invalidation_event,
class_name: 'Geo::CacheInvalidationEvent', class_name: 'Geo::CacheInvalidationEvent',
...@@ -66,6 +67,10 @@ module Geo ...@@ -66,6 +67,10 @@ module Geo
class_name: 'Geo::ResetChecksumEvent', class_name: 'Geo::ResetChecksumEvent',
foreign_key: :reset_checksum_event_id foreign_key: :reset_checksum_event_id
belongs_to :container_repository_updated_event,
class_name: 'Geo::ContainerRepositoryUpdatedEvent',
foreign_key: :container_repository_updated_event_id
def self.latest_event def self.latest_event
order(id: :desc).first order(id: :desc).first
end end
...@@ -97,7 +102,8 @@ module Geo ...@@ -97,7 +102,8 @@ module Geo
job_artifact_deleted_event || job_artifact_deleted_event ||
upload_deleted_event || upload_deleted_event ||
reset_checksum_event || reset_checksum_event ||
cache_invalidation_event cache_invalidation_event ||
container_repository_updated_event
end end
def project_id def project_id
......
# frozen_string_literal: true
module Geo
class ContainerRepositoryUpdatedEventStore < EventStore
self.event_type = :container_repository_updated_event
attr_reader :repository
def initialize(repository)
@repository = repository
end
private
def build_event
Geo::ContainerRepositoryUpdatedEvent.new(
container_repository: repository
)
end
# This is called by ProjectLogHelpers to build json log with context info
#
# @see ::Gitlab::Geo::ProjectLogHelpers
def base_log_data(message)
{
class: self.class.name,
container_repository_id: repository.try(:id),
message: message
}.compact
end
end
end
# frozen_string_literal: true
module API
class ContainerRegistryEvent < Grape::API
DOCKER_DISTRIBUTION_EVENTS_V1_JSON = 'application/vnd.docker.distribution.events.v1+json'
before { authenticate_registry_notification! }
resource :container_registry_event do
helpers do
def authenticate_registry_notification!
unauthorized! unless Feature.enabled?(:geo_registry_replication)
endpoint = Gitlab.config.registry.notifications.find { |e| e['name'] == 'geo_event'}
secret_token = endpoint['headers']['Authorization']
unauthorized! unless Devise.secure_compare(secret_token, headers['Authorization'])
end
end
# Docker Registry sends data in a body of the request as JSON string,
# by setting 'content_type' we make Grape to parse it automatically
content_type :json, DOCKER_DISTRIBUTION_EVENTS_V1_JSON
format :json
params do
requires :events, type: Array
end
# This endpoint is used by Docker Registry to push a set of event
# that took place recently.
post 'events' do
::ContainerRegistry::EventHandler.new(params['events']).execute
status :ok
end
end
end
end
# frozen_string_literal: true
module ContainerRegistry
class EventHandler
attr_reader :events
def initialize(events)
@events = events
end
def execute
events.each do |event|
handle_push_event(event) if event['action'] == 'push'
end
end
private
def handle_push_event(event)
return unless manifest_push?(event)
::Geo::ContainerRepositoryUpdatedEventStore.new(find_repository!(event)).create!
end
def manifest_push?(event)
event['target']['mediaType'] =~ /manifest/
end
def find_repository!(event)
repository_name = event['target']['repository']
path = ContainerRegistry::Path.new(repository_name)
ContainerRepository.find_by_path!(path)
end
end
end
...@@ -14,6 +14,7 @@ module EE ...@@ -14,6 +14,7 @@ module EE
mount ::API::EpicIssues mount ::API::EpicIssues
mount ::API::EpicLinks mount ::API::EpicLinks
mount ::API::Epics mount ::API::Epics
mount ::API::ContainerRegistryEvent
mount ::API::Geo mount ::API::Geo
mount ::API::GeoNodes mount ::API::GeoNodes
mount ::API::IssueLinks mount ::API::IssueLinks
......
# frozen_string_literal: true
module Gitlab
module Geo
module LogCursor
module Events
class ContainerRepositoryUpdatedEvent
include BaseEvent
def process
end
end
end
end
end
end
...@@ -154,4 +154,8 @@ FactoryBot.define do ...@@ -154,4 +154,8 @@ FactoryBot.define do
factory :geo_cache_invalidation_event, class: Geo::CacheInvalidationEvent do factory :geo_cache_invalidation_event, class: Geo::CacheInvalidationEvent do
sequence(:key) { |n| "cache-key-#{n}" } sequence(:key) { |n| "cache-key-#{n}" }
end end
factory :geo_container_repository_updated_event, class: Geo::ContainerRepositoryUpdatedEvent do
container_repository
end
end end
# frozen_string_literal: true
require 'spec_helper'
describe ContainerRegistry::EventHandler do
include ::EE::GeoHelpers
let(:container_repository) { create(:container_repository) }
let(:event_target) do
{ 'mediaType' => 'application/vnd.docker.distribution.manifest.v2+json', 'repository' => container_repository.path }
end
set(:primary_node) { create(:geo_node, :primary) }
set(:secondary_node) { create(:geo_node) }
before do
stub_current_geo_node(primary_node)
end
it 'creates event' do
push_event = { action: 'push', target: event_target }.with_indifferent_access
expect { described_class.new([push_event]).execute }
.to change { ::Geo::ContainerRepositoryUpdatedEvent.count }.by(1)
end
it 'ignores non-push events' do
pull_event = { action: 'pull', target: event_target }.with_indifferent_access
expect { described_class.new([pull_event]).execute }
.to change { ::Geo::ContainerRepositoryUpdatedEvent.count }.by(0)
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Geo::ContainerRepositoryUpdatedEvent, type: :model do
describe 'relationships' do
it { is_expected.to belong_to(:container_repository).class_name('ContainerRepository') }
end
describe 'validations' do
it { is_expected.to validate_presence_of(:container_repository) }
end
end
...@@ -13,6 +13,7 @@ RSpec.describe Geo::EventLog, type: :model do ...@@ -13,6 +13,7 @@ RSpec.describe Geo::EventLog, type: :model do
it { is_expected.to belong_to(:hashed_storage_attachments_event).class_name('Geo::HashedStorageAttachmentsEvent').with_foreign_key('hashed_storage_attachments_event_id') } it { is_expected.to belong_to(:hashed_storage_attachments_event).class_name('Geo::HashedStorageAttachmentsEvent').with_foreign_key('hashed_storage_attachments_event_id') }
it { is_expected.to belong_to(:lfs_object_deleted_event).class_name('Geo::LfsObjectDeletedEvent').with_foreign_key('lfs_object_deleted_event_id') } it { is_expected.to belong_to(:lfs_object_deleted_event).class_name('Geo::LfsObjectDeletedEvent').with_foreign_key('lfs_object_deleted_event_id') }
it { is_expected.to belong_to(:job_artifact_deleted_event).class_name('Geo::JobArtifactDeletedEvent').with_foreign_key('job_artifact_deleted_event_id') } it { is_expected.to belong_to(:job_artifact_deleted_event).class_name('Geo::JobArtifactDeletedEvent').with_foreign_key('job_artifact_deleted_event_id') }
it { is_expected.to belong_to(:container_repository_updated_event).class_name('Geo::ContainerRepositoryUpdatedEvent').with_foreign_key('container_repository_updated_event_id') }
end end
describe '.next_unprocessed_event' do describe '.next_unprocessed_event' do
......
# frozen_string_literal: true
require 'spec_helper'
describe API::ContainerRegistryEvent do
let(:secret_token) { 'secret_token' }
let(:events) { [{ action: 'push' }] }
let(:registry_headers) { { 'Content-Type' => ::API::ContainerRegistryEvent::DOCKER_DISTRIBUTION_EVENTS_V1_JSON } }
describe 'POST /container_registry_event/events' do
before do
stub_registry_endpoints_configuration([{
name: 'geo_event',
headers: { 'Authorization' => secret_token }
}.with_indifferent_access])
end
it 'returns 200 status and events are passed to event handler' do
handler = spy(:handle)
allow(::ContainerRegistry::EventHandler).to receive(:new).with(events).and_return(handler)
post api('/container_registry_event/events'),
params: { events: events }.to_json,
headers: registry_headers.merge('Authorization' => secret_token)
expect(handler).to have_received(:execute).once
expect(response.status).to eq 200
end
it 'returns 401 error status when token is invalid' do
post api('/container_registry_event/events'),
params: { events: events }.to_json,
headers: registry_headers.merge('Authorization' => 'invalid_token')
expect(response.status).to eq 401
end
it 'returns 401 error status when feature is disabled' do
allow(Feature).to receive(:enabled?).with(:geo_registry_replication).and_return(false)
expect(::ContainerRegistry::EventHandler).not_to receive(:new)
post api('/container_registry_event/events'),
params: { events: events }.to_json,
headers: registry_headers.merge('Authorization' => secret_token)
expect(response.status).to eq 401
end
def stub_registry_endpoints_configuration(configuration)
allow(Gitlab.config.registry).to receive(:notifications) { configuration }
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Geo::ContainerRepositoryUpdatedEventStore do
include EE::GeoHelpers
set(:secondary_node) { create(:geo_node) }
let(:container_repository) { create :container_repository }
subject { described_class.new(container_repository) }
describe '#create' do
it_behaves_like 'a Geo event store', Geo::ContainerRepositoryUpdatedEvent
context 'when running on a primary node' do
before do
stub_primary_node
end
it 'refers to a container repository' do
subject.create!
expect(Geo::ContainerRepositoryUpdatedEvent.last).to have_attributes(container_repository: container_repository)
end
it 'logs an error message when event creation fail' do
subject = described_class.new(nil)
expected_message = {
class: described_class.name,
message: 'Container repository updated event could not be created',
error: "Validation failed: Container repository can't be blank"
}
expect(Gitlab::Geo::Logger).to receive(:error).with(expected_message).and_call_original
subject.create!
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment