Commit 64452959 authored by Kamil Trzciński's avatar Kamil Trzciński

Allow to store BuildTraceChunks on Object Storage

parent 4f526a33
...@@ -10,45 +10,48 @@ module Ci ...@@ -10,45 +10,48 @@ module Ci
WriteError = Class.new(StandardError) WriteError = Class.new(StandardError)
CHUNK_SIZE = 128.kilobytes CHUNK_SIZE = 128.kilobytes
CHUNK_REDIS_TTL = 1.week
WRITE_LOCK_RETRY = 10 WRITE_LOCK_RETRY = 10
WRITE_LOCK_SLEEP = 0.01.seconds WRITE_LOCK_SLEEP = 0.01.seconds
WRITE_LOCK_TTL = 1.minute WRITE_LOCK_TTL = 1.minute
enum data_store: { enum data_store: {
redis: 1, redis: 1,
db: 2 database: 2,
fog: 3
} }
class << self class << self
def redis_data_key(build_id, chunk_index) def all_stores
"gitlab:ci:trace:#{build_id}:chunks:#{chunk_index}" @all_stores ||= self.data_stores.keys
end end
def redis_data_keys def persist_store
redis.pluck(:build_id, :chunk_index).map do |data| # get first available store from the back of the list
redis_data_key(data.first, data.second) all_stores.reverse.find { |store| get_store_class(store).available? }
end
end end
def redis_delete_data(keys) def get_store_class(store)
return if keys.empty? @stores ||= {}
@stores[store] ||= "Ci::BuildTraceChunks::#{store.capitalize}".constantize.new
Gitlab::Redis::SharedState.with do |redis|
redis.del(keys)
end
end end
## ##
# FastDestroyAll concerns # FastDestroyAll concerns
def begin_fast_destroy def begin_fast_destroy
redis_data_keys all_stores.each_with_object({}) do |result, store|
relation = public_send(store)
keys = get_store_class(store).keys(relation)
result[store] = keys if keys.present?
end
end end
## ##
# FastDestroyAll concerns # FastDestroyAll concerns
def finalize_fast_destroy(keys) def finalize_fast_destroy(keys)
redis_delete_data(keys) keys.each do |store, value|
get_store_class(store).delete_keys(value)
end
end end
end end
...@@ -69,7 +72,7 @@ module Ci ...@@ -69,7 +72,7 @@ module Ci
raise ArgumentError, 'Offset is out of range' if offset > size || offset < 0 raise ArgumentError, 'Offset is out of range' if offset > size || offset < 0
raise ArgumentError, 'Chunk size overflow' if CHUNK_SIZE < (offset + new_data.bytesize) raise ArgumentError, 'Chunk size overflow' if CHUNK_SIZE < (offset + new_data.bytesize)
set_data(data.byteslice(0, offset) + new_data) set_data!(data.byteslice(0, offset) + new_data)
end end
def size def size
...@@ -87,51 +90,53 @@ module Ci ...@@ -87,51 +90,53 @@ module Ci
def range def range
(start_offset...end_offset) (start_offset...end_offset)
end end
def persisted?
!redis?
end
def use_database! def persist!
in_lock do in_lock do
break if db? unsafe_move_to!(self.class.persist_store)
break unless size > 0
self.update!(raw_data: data, data_store: :db)
self.class.redis_delete_data([redis_data_key])
end end
end end
private private
def unsafe_move_to!(new_store)
return if data_store == new_store.to_s
return unless size > 0
old_store_class = self.class.get_store_class(data_store)
self.get_data.tap do |the_data|
self.raw_data = nil
self.data_store = new_store
self.set_data!(the_data)
end
old_store_class.delete_data(self)
end
def get_data def get_data
if redis? self.class.get_store_class(data_store).data(self)&.force_encoding(Encoding::BINARY) # Redis/Database return UTF-8 string as default
redis_data
elsif db?
raw_data
else
raise 'Unsupported data store'
end&.force_encoding(Encoding::BINARY) # Redis/Database return UTF-8 string as default
end end
def set_data(value) def set_data!(value)
raise ArgumentError, 'too much data' if value.bytesize > CHUNK_SIZE raise ArgumentError, 'too much data' if value.bytesize > CHUNK_SIZE
in_lock do in_lock do
if redis? self.class.get_store_class(data_store).set_data(self, value)
redis_set_data(value)
elsif db?
self.raw_data = value
else
raise 'Unsupported data store'
end
@data = value @data = value
save! if changed? save! if changed?
end end
schedule_to_db if full? schedule_to_persist if full?
end end
def schedule_to_db def schedule_to_persist
return if db? return if persisted?
Ci::BuildTraceChunkFlushWorker.perform_async(id) Ci::BuildTraceChunkFlushWorker.perform_async(id)
end end
...@@ -140,22 +145,6 @@ module Ci ...@@ -140,22 +145,6 @@ module Ci
size == CHUNK_SIZE size == CHUNK_SIZE
end end
def redis_data
Gitlab::Redis::SharedState.with do |redis|
redis.get(redis_data_key)
end
end
def redis_set_data(data)
Gitlab::Redis::SharedState.with do |redis|
redis.set(redis_data_key, data, ex: CHUNK_REDIS_TTL)
end
end
def redis_data_key
self.class.redis_data_key(build_id, chunk_index)
end
def in_lock def in_lock
write_lock_key = "trace_write:#{build_id}:chunks:#{chunk_index}" write_lock_key = "trace_write:#{build_id}:chunks:#{chunk_index}"
......
module Ci
module BuildTraceChunks
class Database
def available?
true
end
def keys(relation)
[]
end
def delete_keys(keys)
# no-op
end
def data(model)
model.raw_data
end
def set_data(model, data)
model.raw_data = data
end
def delete_data(model)
model.update_columns(raw_data: nil) unless model.raw_data.nil?
end
end
end
end
module Ci
module BuildTraceChunks
class Fog
def available?
object_store.enabled
end
def data(model)
connection.get_object(bucket_name, key(model)).body
end
def set_data(model, data)
connection.put_object(bucket_name, key(model), data)
end
def delete_data(model)
delete_keys([[model.build_id, model.chunk_index]])
end
def keys(relation)
return [] unless available?
relation.pluck(:build_id, :chunk_index)
end
def delete_keys(keys)
keys.each do |key|
connection.delete_object(bucket_name, key_raw(*key))
end
end
private
def key(model)
key_raw(model.build_id, model.chunk_index)
end
def key_raw(build_id, chunk_index)
"tmp/chunks/builds/#{build_id.to_i}/chunks/#{chunk_index.to_i}.log"
end
def bucket_name
return unless available?
object_store.remote_directory
end
def connection
return unless available?
@connection ||= ::Fog::Storage.new(object_store.connection.to_hash.deep_symbolize_keys)
end
def object_store
Gitlab.config.artifacts.object_store
end
end
end
end
module Ci
module BuildTraceChunks
class Redis
CHUNK_REDIS_TTL = 1.week
def available?
true
end
def data(model)
Gitlab::Redis::SharedState.with do |redis|
redis.get(key(model))
end
end
def set_data(model, data)
Gitlab::Redis::SharedState.with do |redis|
redis.set(key(model), data, ex: CHUNK_REDIS_TTL)
end
end
def delete_data(model)
delete_keys([[model.build_id, model.chunk_index]])
end
def keys(relation)
relation.pluck(:build_id, :chunk_index)
end
def delete_keys(keys)
return if keys.empty?
keys = keys.map { |key| key_raw(*key) }
Gitlab::Redis::SharedState.with do |redis|
redis.del(keys)
end
end
private
def key(model)
key_raw(model.build_id, model.chunk_index)
end
def key_raw(build_id, chunk_index)
"gitlab:ci:trace:#{build_id.to_i}:chunks:#{chunk_index.to_i}"
end
end
end
end
...@@ -5,7 +5,7 @@ module Ci ...@@ -5,7 +5,7 @@ module Ci
def perform(build_trace_chunk_id) def perform(build_trace_chunk_id)
::Ci::BuildTraceChunk.find_by(id: build_trace_chunk_id).try do |build_trace_chunk| ::Ci::BuildTraceChunk.find_by(id: build_trace_chunk_id).try do |build_trace_chunk|
build_trace_chunk.use_database! build_trace_chunk.persist!
end end
end end
end end
......
...@@ -294,8 +294,8 @@ describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do ...@@ -294,8 +294,8 @@ describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
end end
end end
describe '#use_database!' do describe '#persist!' do
subject { build_trace_chunk.use_database! } subject { build_trace_chunk.persist! }
context 'when data_store is redis' do context 'when data_store is redis' do
let(:data_store) { :redis } let(:data_store) { :redis }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment