# frozen_string_literal: true

module Gitlab
  module Database
    module LoadBalancing
      class SidekiqServerMiddleware
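        # Raised when a job needs an up-to-date replica but none has caught up
        # yet; raising lets Sidekiq's retry mechanism re-enqueue the job.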
        JobReplicaNotUpToDate = Class.new(StandardError)

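        # Minimum age (in seconds) a job should have before we attempt to read
        # from a replica, giving replication a chance to catch up.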
        MINIMUM_DELAY_INTERVAL = 1

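        # Picks a load balancing strategy for the job, records it on the job
        # hash, and either sticks the session to the primary or raises
        # JobReplicaNotUpToDate so the job is retried once replicas catch up.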
        def call(worker, job, _queue)
          worker_class = worker.class
          strategy = select_load_balancing_strategy(worker_class, job)

          job['load_balancing_strategy'] = strategy.to_s

          if use_primary?(strategy)
            ::Gitlab::Database::LoadBalancing::Session.current.use_primary!
          elsif strategy == :retry
            raise JobReplicaNotUpToDate, "Sidekiq job #{worker_class} JID-#{job['jid']} couldn't use the replica."\
              "  Replica was not up to date."
          else
            # We selected an up-to-date replica; nothing more to do in this case.
          end

          yield
        ensure
          clear
        end

        private

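        # Releases any hosts assigned to this thread and resets the session so
        # no load balancing state leaks between jobs.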
        def clear
          ::Gitlab::Database::LoadBalancing.release_hosts
          ::Gitlab::Database::LoadBalancing::Session.clear_session
        end

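        # Both :primary and :primary_no_wal force the job to use the primary
        # database.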
        def use_primary?(strategy)
          strategy.start_with?('primary')
        end

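        # Decides how the job should be routed:
        #   :primary / :primary_no_wal  - read from the primary
        #   :replica / :replica_retried - a replica has caught up, read from it
        #   :retry                      - replicas are lagging, retry the job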
        def select_load_balancing_strategy(worker_class, job)
          return :primary unless load_balancing_available?(worker_class)

          wal_locations = get_wal_locations(job)

          return :primary_no_wal if wal_locations.blank?

          sleep_if_needed(job)

          if databases_in_sync?(wal_locations)
            # Happy case: we can read from a replica.
            retried_before?(worker_class, job) ? :replica_retried : :replica
          elsif can_retry?(worker_class, job)
            # Optimistic case: The worker allows retries and we have retries left.
            :retry
          else
            # Sad case: we need to fall back to the primary.
            :primary
          end
        end

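        # When the job was enqueued less than MINIMUM_DELAY_INTERVAL seconds
        # ago, sleep for the remaining time so replicas have a chance to catch
        # up before we check them.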
        def sleep_if_needed(job)
          remaining_delay = MINIMUM_DELAY_INTERVAL - (Time.current.to_f - job['created_at'].to_f)
          sleep remaining_delay if remaining_delay > 0 && remaining_delay < MINIMUM_DELAY_INTERVAL
        end

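        # Prefers WAL locations stored under 'dedup_wal_locations' (written
        # when jobs are deduplicated), then 'wal_locations', and finally the
        # legacy single-location keys.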
        def get_wal_locations(job)
          job['dedup_wal_locations'] || job['wal_locations'] || legacy_wal_location(job)
        end

        # Jobs that were already scheduled may still contain the legacy database write location.
        # TODO: remove this in the next iteration
        # https://gitlab.com/gitlab-org/gitlab/-/issues/338213
        def legacy_wal_location(job)
          wal_location = job['database_write_location'] || job['database_replica_location']

          { ::Gitlab::Database::MAIN_DATABASE_NAME.to_sym => wal_location } if wal_location
        end

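        # A worker can use a replica only when it includes ApplicationWorker,
        # opts into load balancing via its data consistency setting, and has
        # the corresponding feature flag enabled.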
        def load_balancing_available?(worker_class)
          worker_class.include?(::ApplicationWorker) &&
            worker_class.utilizes_load_balancing_capabilities? &&
            worker_class.get_data_consistency_feature_flag_enabled?
        end

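        # Only workers with :delayed data consistency are retried when the
        # replicas are not yet in sync; other workers fall back to the primary.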
        def can_retry?(worker_class, job)
          worker_class.get_data_consistency == :delayed && not_yet_retried?(job)
        end

        def retried_before?(worker_class, job)
          worker_class.get_data_consistency == :delayed && !not_yet_retried?(job)
        end

        def not_yet_retried?(job)
          # A `nil` `retry_count` means this job has never been retried;
          # `0` means this is the first retry.
          job['retry_count'].nil?
        end

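        # Returns true when, for every configured load balancer, a replica has
        # caught up to the WAL location recorded for it in the job.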
        def databases_in_sync?(wal_locations)
          ::Gitlab::Database::LoadBalancing.each_load_balancer.all? do |lb|
            if (location = wal_locations[lb.name])
              lb.select_up_to_date_host(location)
            else
              # If there's no entry for a load balancer, it means the Sidekiq
              # job doesn't depend on it. In that case we treat the load
              # balancer as being in sync.
              true
            end
          end
        end
      end
    end
  end
end