Commit 2571f08e authored by Valery Sizov's avatar Valery Sizov Committed by Michael Kozono

Add service to checksum PackageFiles on primary

This adds the functionality in a context of new
self-service framework
parent 537bbde3
# frozen_string_literal: true
class AddVerificationColumnsToPackages < ActiveRecord::Migration[6.0]
DOWNTIME = false
def change
add_column :packages_package_files, :verification_retry_at, :datetime_with_timezone
add_column :packages_package_files, :verified_at, :datetime_with_timezone
add_column :packages_package_files, :verification_checksum, :string, limit: 255
add_column :packages_package_files, :verification_failure, :string, limit: 255
add_column :packages_package_files, :verification_retry_count, :integer
end
end
...@@ -4364,6 +4364,11 @@ CREATE TABLE public.packages_package_files ( ...@@ -4364,6 +4364,11 @@ CREATE TABLE public.packages_package_files (
file_sha1 bytea, file_sha1 bytea,
file_name character varying NOT NULL, file_name character varying NOT NULL,
file text NOT NULL, file text NOT NULL,
verification_retry_at timestamp with time zone,
verified_at timestamp with time zone,
verification_checksum character varying(255),
verification_failure character varying(255),
verification_retry_count integer,
file_sha256 bytea file_sha256 bytea
); );
...@@ -12720,6 +12725,7 @@ COPY "schema_migrations" (version) FROM STDIN; ...@@ -12720,6 +12725,7 @@ COPY "schema_migrations" (version) FROM STDIN;
20200212133945 20200212133945
20200212134201 20200212134201
20200213093702 20200213093702
20200213100530
20200213155311 20200213155311
20200213204737 20200213204737
20200213220159 20200213220159
......
...@@ -174,7 +174,7 @@ For example, to add support for files referenced by a `Widget` model with a ...@@ -174,7 +174,7 @@ For example, to add support for files referenced by a `Widget` model with a
def change def change
add_column :widgets, :verification_retry_at, :datetime_with_timezone add_column :widgets, :verification_retry_at, :datetime_with_timezone
add_column :widgets, :last_verification_ran_at, :datetime_with_timezone add_column :widgets, :verified_at, :datetime_with_timezone
add_column :widgets, :verification_checksum, :string add_column :widgets, :verification_checksum, :string
add_column :widgets, :verification_failure, :string add_column :widgets, :verification_failure, :string
add_column :widgets, :verification_retry_count, :integer add_column :widgets, :verification_retry_count, :integer
......
...@@ -4,6 +4,9 @@ module Geo ...@@ -4,6 +4,9 @@ module Geo
module BlobReplicatorStrategy module BlobReplicatorStrategy
extend ActiveSupport::Concern extend ActiveSupport::Concern
include Delay
include Gitlab::Geo::LogHelpers
included do included do
event :created event :created
end end
...@@ -13,6 +16,10 @@ module Geo ...@@ -13,6 +16,10 @@ module Geo
def handle_after_create_commit def handle_after_create_commit
publish(:created, **created_params) publish(:created, **created_params)
return unless Feature.enabled?(:geo_self_service_framework)
schedule_checksum_calculation if needs_checksum?
end end
# Called by Gitlab::Geo::Replicator#consume # Called by Gitlab::Geo::Replicator#consume
...@@ -24,14 +31,49 @@ module Geo ...@@ -24,14 +31,49 @@ module Geo
raise NotImplementedError raise NotImplementedError
end end
def calculate_checksum!
checksum = model_record.calculate_checksum!
update_verification_state!(checksum: checksum)
rescue => e
log_error('Error calculating the checksum', e)
update_verification_state!(failure: e.message)
end
private private
def update_verification_state!(checksum: nil, failure: nil)
retry_at, retry_count = calculate_next_retry_attempt if failure.present?
model_record.update!(
verification_checksum: checksum,
verified_at: Time.now,
verification_failure: failure,
verification_retry_at: retry_at,
verification_retry_count: retry_count
)
end
def calculate_next_retry_attempt
retry_count = model_record.verification_retry_count.to_i + 1
[next_retry_time(retry_count), retry_count]
end
def download def download
::Geo::BlobDownloadService.new(replicator: self).execute ::Geo::BlobDownloadService.new(replicator: self).execute
end end
def schedule_checksum_calculation
Geo::BlobVerificationPrimaryWorker.perform_async(replicable_name, model_record.id)
end
def created_params def created_params
{ model_record_id: model_record.id } { model_record_id: model_record.id }
end end
def needs_checksum?
return true unless model_record.respond_to?(:needs_checksum?)
model_record.needs_checksum?
end
end end
end end
...@@ -55,4 +55,8 @@ class Packages::PackageFile < ApplicationRecord ...@@ -55,4 +55,8 @@ class Packages::PackageFile < ApplicationRecord
def download_path def download_path
Gitlab::Routing.url_helpers.download_project_package_file_path(project, self) Gitlab::Routing.url_helpers.download_project_package_file_path(project, self)
end end
def local?
file_store == ::Packages::PackageFileUploader::Store::LOCAL
end
end end
...@@ -213,6 +213,13 @@ ...@@ -213,6 +213,13 @@
:resource_boundary: :unknown :resource_boundary: :unknown
:weight: 1 :weight: 1
:idempotent: :idempotent:
- :name: geo:geo_blob_verification_primary
:feature_category: :geo_replication
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
- :name: geo:geo_container_repository_sync - :name: geo:geo_container_repository_sync
:feature_category: :geo_replication :feature_category: :geo_replication
:has_external_dependencies: :has_external_dependencies:
......
# frozen_string_literal: true
module Geo
class BlobVerificationPrimaryWorker
include ApplicationWorker
include GeoQueue
include ::Gitlab::Geo::LogHelpers
sidekiq_options retry: 3, dead: false
idempotent!
def perform(replicable_name, replicable_id)
replicator_class = ::Gitlab::Geo::Replicator.for_replicable_name(replicable_name)
replicator = replicator_class.new(model_record_id: replicable_id)
replicator.calculate_checksum!
rescue ActiveRecord::RecordNotFound
log_error("Couldn't find the blob, skipping", replicable_name: replicable_name, replicable_id: replicable_id)
end
end
end
---
title: Add verification related fields to packages_package_files table
merge_request: 25411
author:
type: other
...@@ -4,6 +4,7 @@ module Gitlab ...@@ -4,6 +4,7 @@ module Gitlab
module Geo module Geo
module ReplicableModel module ReplicableModel
extend ActiveSupport::Concern extend ActiveSupport::Concern
include Checksummable
included do included do
# If this hook turns out not to apply to all Models, perhaps we should extract a `ReplicableBlobModel` # If this hook turns out not to apply to all Models, perhaps we should extract a `ReplicableBlobModel`
...@@ -28,6 +29,33 @@ module Gitlab ...@@ -28,6 +29,33 @@ module Gitlab
def replicator def replicator
raise NotImplementedError, 'There is no Replicator defined for this model' raise NotImplementedError, 'There is no Replicator defined for this model'
end end
def calculate_checksum!
self.verification_checksum = nil
return unless needs_checksum?
self.verification_checksum = self.class.hexdigest(file.path)
end
def needs_checksum?
verification_checksum.nil? && checksummable?
end
def checksummable?
local? && file_exist?
end
# This checks for existence of the file on storage
#
# @return [Boolean] whether the file exists on storage
def file_exist?
if local?
File.exist?(file.path)
else
file.exists?
end
end
end end
end end
end end
...@@ -51,6 +51,8 @@ module Gitlab ...@@ -51,6 +51,8 @@ module Gitlab
const_get(replicator_class_name, false) const_get(replicator_class_name, false)
end end
attr_reader :model_record_id
def initialize(model_record: nil, model_record_id: nil) def initialize(model_record: nil, model_record_id: nil)
@model_record = model_record @model_record = model_record
@model_record_id = model_record_id @model_record_id = model_record_id
......
...@@ -67,4 +67,35 @@ RSpec.describe Packages::PackageFile, type: :model do ...@@ -67,4 +67,35 @@ RSpec.describe Packages::PackageFile, type: :model do
expect(package_file.size).to eq 3513 expect(package_file.size).to eq 3513
end end
end end
describe '#calculate_checksum!' do
let(:package_file) { create(:conan_package_file, :conan_recipe_file) }
it 'sets `verification_checksum` to SHA256 sum of the file' do
expected = Digest::SHA256.file(package_file.file.path).hexdigest
expect { package_file.calculate_checksum! }
.to change { package_file.verification_checksum }.from(nil).to(expected)
end
it 'sets `checksum` to nil for a non-existent file' do
checksum = Digest::SHA256.file(package_file.file.path).hexdigest
package_file.verification_checksum = checksum
allow(package_file).to receive(:file_exist?).and_return(false)
expect { package_file.calculate_checksum! }
.to change { package_file.verification_checksum }.from(checksum).to(nil)
end
end
context 'new file' do
it 'calls checksum worker' do
allow(Geo::BlobVerificationPrimaryWorker).to receive(:perform_async)
package_file = create(:conan_package_file, :conan_recipe_file)
expect(Geo::BlobVerificationPrimaryWorker).to have_received(:perform_async).with('package_file', package_file.id)
end
end
end end
# frozen_string_literal: true
require 'spec_helper'
describe Geo::BlobVerificationPrimaryWorker, :geo do
let(:package_file) { create(:conan_package_file, :conan_recipe_file) }
describe '#perform' do
it 'calculates the checksum' do
expect { described_class.new.perform('package_file', package_file.id) }
.to change { package_file.reload.verification_checksum }.from(nil)
end
end
end
...@@ -27,6 +27,45 @@ RSpec.shared_examples 'a blob replicator' do ...@@ -27,6 +27,45 @@ RSpec.shared_examples 'a blob replicator' do
expect(::Geo::Event.last.attributes).to include( expect(::Geo::Event.last.attributes).to include(
"replicable_name" => replicator.replicable_name, "event_name" => "created", "payload" => { "model_record_id" => replicator.model_record.id }) "replicable_name" => replicator.replicable_name, "event_name" => "created", "payload" => { "model_record_id" => replicator.model_record.id })
end end
it 'schedules the checksum calculation if needed' do
expect(Geo::BlobVerificationPrimaryWorker).to receive(:perform_async)
expect(replicator).to receive(:needs_checksum?).and_return(true)
replicator.handle_after_create_commit
end
it 'does not schedule the checksum calculation if feature flag is disabled' do
stub_feature_flags(geo_self_service_framework: false)
expect(Geo::BlobVerificationPrimaryWorker).not_to receive(:perform_async)
allow(replicator).to receive(:needs_checksum?).and_return(true)
replicator.handle_after_create_commit
end
end
describe '#calculate_checksum!' do
it 'calculates the checksum' do
model_record.save!
replicator.calculate_checksum!
expect(model_record.reload.verification_checksum).not_to be_nil
end
it 'saves the error message and increments retry counter' do
model_record.save!
allow(model_record).to receive(:calculate_checksum!) do
raise StandardError.new('Failure to calculate checksum')
end
replicator.calculate_checksum!
expect(model_record.reload.verification_failure).to eq 'Failure to calculate checksum'
expect(model_record.verification_retry_count).to be 1
end
end end
describe '#consume_created_event' do describe '#consume_created_event' do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment