Commit 082dee86 authored by Shinya Maeda's avatar Shinya Maeda

Fix dead lock by in_lock conflicts. Shared out in_lock logic. Changed...

Fix dead lock by in_lock conflicts.  Shared out in_lock logic.  Changed key_raw path of fog store. and some bug fixes to make it functional.
parent 03495dd0
...@@ -385,7 +385,7 @@ module Ci ...@@ -385,7 +385,7 @@ module Ci
end end
def erase_old_trace! def erase_old_trace!
update_column(:trace, nil) update_column(:trace, nil) if old_trace
end end
def needs_touch? def needs_touch?
......
module Ci module Ci
class BuildTraceChunk < ActiveRecord::Base class BuildTraceChunk < ActiveRecord::Base
include FastDestroyAll include FastDestroyAll
include ExclusiveLeaseLock
extend Gitlab::Ci::Model extend Gitlab::Ci::Model
belongs_to :build, class_name: "Ci::Build", foreign_key: :build_id belongs_to :build, class_name: "Ci::Build", foreign_key: :build_id
...@@ -14,6 +15,8 @@ module Ci ...@@ -14,6 +15,8 @@ module Ci
WRITE_LOCK_SLEEP = 0.01.seconds WRITE_LOCK_SLEEP = 0.01.seconds
WRITE_LOCK_TTL = 1.minute WRITE_LOCK_TTL = 1.minute
# Note: The ordering of this enum is related to the precedence of persist store.
# The bottom item takes the higest precedence, and the top item takes the lowest precedence.
enum data_store: { enum data_store: {
redis: 1, redis: 1,
database: 2, database: 2,
...@@ -38,8 +41,8 @@ module Ci ...@@ -38,8 +41,8 @@ module Ci
## ##
# FastDestroyAll concerns # FastDestroyAll concerns
def begin_fast_destroy def begin_fast_destroy
all_stores.each_with_object({}) do |result, store| all_stores.each_with_object({}) do |store, result|
relation = public_send(store) relation = public_send(store) # rubocop:disable GitlabSecurity/PublicSend
keys = get_store_class(store).keys(relation) keys = get_store_class(store).keys(relation)
result[store] = keys if keys.present? result[store] = keys if keys.present?
...@@ -72,7 +75,13 @@ module Ci ...@@ -72,7 +75,13 @@ module Ci
raise ArgumentError, 'Offset is out of range' if offset > size || offset < 0 raise ArgumentError, 'Offset is out of range' if offset > size || offset < 0
raise ArgumentError, 'Chunk size overflow' if CHUNK_SIZE < (offset + new_data.bytesize) raise ArgumentError, 'Chunk size overflow' if CHUNK_SIZE < (offset + new_data.bytesize)
set_data!(data.byteslice(0, offset) + new_data) in_lock(*lock_params) do
self.reload if self.persisted?
unsafe_set_data!(data.byteslice(0, offset) + new_data)
end
schedule_to_persist if full?
end end
def size def size
...@@ -96,7 +105,9 @@ module Ci ...@@ -96,7 +105,9 @@ module Ci
end end
def persist! def persist!
in_lock do in_lock(*lock_params) do
self.reload if self.persisted?
unsafe_move_to!(self.class.persist_store) unsafe_move_to!(self.class.persist_store)
end end
end end
...@@ -109,10 +120,10 @@ module Ci ...@@ -109,10 +120,10 @@ module Ci
old_store_class = self.class.get_store_class(data_store) old_store_class = self.class.get_store_class(data_store)
self.get_data.tap do |the_data| get_data.tap do |the_data|
self.raw_data = nil self.raw_data = nil
self.data_store = new_store self.data_store = new_store
self.set_data!(the_data) unsafe_set_data!(the_data)
end end
old_store_class.delete_data(self) old_store_class.delete_data(self)
...@@ -122,19 +133,15 @@ module Ci ...@@ -122,19 +133,15 @@ module Ci
self.class.get_store_class(data_store).data(self)&.force_encoding(Encoding::BINARY) # Redis/Database return UTF-8 string as default self.class.get_store_class(data_store).data(self)&.force_encoding(Encoding::BINARY) # Redis/Database return UTF-8 string as default
end end
def set_data!(value) def unsafe_set_data!(value)
raise ArgumentError, 'too much data' if value.bytesize > CHUNK_SIZE raise ArgumentError, 'too much data' if value.bytesize > CHUNK_SIZE
in_lock do
self.class.get_store_class(data_store).set_data(self, value) self.class.get_store_class(data_store).set_data(self, value)
@data = value @data = value
save! if changed? save! if changed?
end end
schedule_to_persist if full?
end
def schedule_to_persist def schedule_to_persist
return if persisted? return if persisted?
...@@ -145,25 +152,11 @@ module Ci ...@@ -145,25 +152,11 @@ module Ci
size == CHUNK_SIZE size == CHUNK_SIZE
end end
def in_lock def lock_params
write_lock_key = "trace_write:#{build_id}:chunks:#{chunk_index}" ["trace_write:#{build_id}:chunks:#{chunk_index}",
{ ttl: WRITE_LOCK_TTL,
lease = Gitlab::ExclusiveLease.new(write_lock_key, timeout: WRITE_LOCK_TTL) retry_max: WRITE_LOCK_RETRY,
retry_count = 0 sleep_sec: WRITE_LOCK_SLEEP }]
until uuid = lease.try_obtain
# Keep trying until we obtain the lease. To prevent hammering Redis too
# much we'll wait for a bit between retries.
sleep(WRITE_LOCK_SLEEP)
break if WRITE_LOCK_RETRY < (retry_count += 1)
end
raise WriteError, 'Failed to obtain write lock' unless uuid
self.reload if self.persisted?
return yield
ensure
Gitlab::ExclusiveLease.cancel(write_lock_key, uuid)
end end
end end
end end
...@@ -6,7 +6,7 @@ module Ci ...@@ -6,7 +6,7 @@ module Ci
end end
def data(model) def data(model)
connection.get_object(bucket_name, key(model)).body connection.get_object(bucket_name, key(model))[:body]
end end
def set_data(model, data) def set_data(model, data)
...@@ -36,7 +36,7 @@ module Ci ...@@ -36,7 +36,7 @@ module Ci
end end
def key_raw(build_id, chunk_index) def key_raw(build_id, chunk_index)
"tmp/chunks/builds/#{build_id.to_i}/chunks/#{chunk_index.to_i}.log" "tmp/builds/#{build_id.to_i}/chunks/#{chunk_index.to_i}.log"
end end
def bucket_name def bucket_name
......
module ExclusiveLeaseLock
extend ActiveSupport::Concern
def in_lock(key, ttl: 1.minute, retry_max: 10, sleep_sec: 0.01.seconds)
lease = Gitlab::ExclusiveLease.new(key, timeout: ttl)
retry_count = 0
until uuid = lease.try_obtain
# Keep trying until we obtain the lease. To prevent hammering Redis too
# much we'll wait for a bit.
sleep(sleep_sec)
break if retry_max < (retry_count += 1)
end
raise WriteError, 'Failed to obtain write lock' unless uuid
return yield
ensure
Gitlab::ExclusiveLease.cancel(key, uuid)
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment