Commit 6df3ec4c authored by Heinrich Lee Yu's avatar Heinrich Lee Yu

Merge branch '345880-process-management-module' into 'master'

Move all process related methods to the ProcessManagement module

See merge request gitlab-org/gitlab!74693
parents 1bdfeab2 6f2ca4fa
# frozen_string_literal: true
module Gitlab
module ProcessManagement
# The signals that should terminate both the master and workers.
TERMINATE_SIGNALS = %i(INT TERM).freeze
# The signals that should simply be forwarded to the workers.
FORWARD_SIGNALS = %i(TTIN USR1 USR2 HUP).freeze
# Traps the given signals and yields the block whenever these signals are
# received.
#
# The block is passed the name of the signal.
#
# Example:
#
# trap_signals(%i(HUP TERM)) do |signal|
# ...
# end
def self.trap_signals(signals)
signals.each do |signal|
trap(signal) do
yield signal
end
end
end
def self.trap_terminate(&block)
trap_signals(TERMINATE_SIGNALS, &block)
end
def self.trap_forward(&block)
trap_signals(FORWARD_SIGNALS, &block)
end
def self.signal(pid, signal)
Process.kill(signal, pid)
true
rescue Errno::ESRCH
false
end
def self.signal_processes(pids, signal)
pids.each { |pid| signal(pid, signal) }
end
# Waits for the given process to complete using a separate thread.
def self.wait_async(pid)
Thread.new do
Process.wait(pid) rescue Errno::ECHILD
end
end
# Returns true if all the processes are alive.
def self.all_alive?(pids)
pids.each do |pid|
return false unless process_alive?(pid)
end
true
end
def self.any_alive?(pids)
pids_alive(pids).any?
end
def self.pids_alive(pids)
pids.select { |pid| process_alive?(pid) }
end
def self.process_alive?(pid)
# Signal 0 tests whether the process exists and we have access to send signals
# but is otherwise a noop (doesn't actually send a signal to the process)
signal(pid, 0)
end
def self.write_pid(path)
File.open(path, 'w') do |handle|
handle.write(Process.pid.to_s)
end
end
end
end
...@@ -11,6 +11,7 @@ require_relative '../lib/gitlab/utils' ...@@ -11,6 +11,7 @@ require_relative '../lib/gitlab/utils'
require_relative '../lib/gitlab/sidekiq_config/cli_methods' require_relative '../lib/gitlab/sidekiq_config/cli_methods'
require_relative '../lib/gitlab/sidekiq_config/worker_matcher' require_relative '../lib/gitlab/sidekiq_config/worker_matcher'
require_relative '../lib/gitlab/sidekiq_logging/json_formatter' require_relative '../lib/gitlab/sidekiq_logging/json_formatter'
require_relative '../lib/gitlab/process_management'
require_relative 'sidekiq_cluster' require_relative 'sidekiq_cluster'
module Gitlab module Gitlab
...@@ -106,7 +107,7 @@ module Gitlab ...@@ -106,7 +107,7 @@ module Gitlab
end end
def write_pid def write_pid
SidekiqCluster.write_pid(@pid) if @pid ProcessManagement.write_pid(@pid) if @pid
end end
def soft_timeout_seconds def soft_timeout_seconds
...@@ -123,11 +124,11 @@ module Gitlab ...@@ -123,11 +124,11 @@ module Gitlab
end end
def continue_waiting?(deadline) def continue_waiting?(deadline)
SidekiqCluster.any_alive?(@processes) && monotonic_time < deadline ProcessManagement.any_alive?(@processes) && monotonic_time < deadline
end end
def hard_stop_stuck_pids def hard_stop_stuck_pids
SidekiqCluster.signal_processes(SidekiqCluster.pids_alive(@processes), "-KILL") ProcessManagement.signal_processes(ProcessManagement.pids_alive(@processes), "-KILL")
end end
def wait_for_termination def wait_for_termination
...@@ -138,14 +139,14 @@ module Gitlab ...@@ -138,14 +139,14 @@ module Gitlab
end end
def trap_signals def trap_signals
SidekiqCluster.trap_terminate do |signal| ProcessManagement.trap_terminate do |signal|
@alive = false @alive = false
SidekiqCluster.signal_processes(@processes, signal) ProcessManagement.signal_processes(@processes, signal)
wait_for_termination wait_for_termination
end end
SidekiqCluster.trap_forward do |signal| ProcessManagement.trap_forward do |signal|
SidekiqCluster.signal_processes(@processes, signal) ProcessManagement.signal_processes(@processes, signal)
end end
end end
...@@ -153,12 +154,12 @@ module Gitlab ...@@ -153,12 +154,12 @@ module Gitlab
while @alive while @alive
sleep(@interval) sleep(@interval)
unless SidekiqCluster.all_alive?(@processes) unless ProcessManagement.all_alive?(@processes)
# If a child process died we'll just terminate the whole cluster. It's up to # If a child process died we'll just terminate the whole cluster. It's up to
# runit and such to then restart the cluster. # runit and such to then restart the cluster.
@logger.info('A worker terminated, shutting down the cluster') @logger.info('A worker terminated, shutting down the cluster')
SidekiqCluster.signal_processes(@processes, :TERM) ProcessManagement.signal_processes(@processes, :TERM)
break break
end end
end end
......
# frozen_string_literal: true # frozen_string_literal: true
require_relative 'dependencies' require_relative 'dependencies'
require_relative '../lib/gitlab/process_management'
module Gitlab module Gitlab
module SidekiqCluster module SidekiqCluster
...@@ -17,49 +18,6 @@ module Gitlab ...@@ -17,49 +18,6 @@ module Gitlab
# After surpassing the soft timeout. # After surpassing the soft timeout.
DEFAULT_HARD_TIMEOUT_SECONDS = 5 DEFAULT_HARD_TIMEOUT_SECONDS = 5
# The signals that should terminate both the master and workers.
TERMINATE_SIGNALS = %i(INT TERM).freeze
# The signals that should simply be forwarded to the workers.
FORWARD_SIGNALS = %i(TTIN USR1 USR2 HUP).freeze
# Traps the given signals and yields the block whenever these signals are
# received.
#
# The block is passed the name of the signal.
#
# Example:
#
# trap_signals(%i(HUP TERM)) do |signal|
# ...
# end
def self.trap_signals(signals)
signals.each do |signal|
trap(signal) do
yield signal
end
end
end
def self.trap_terminate(&block)
trap_signals(TERMINATE_SIGNALS, &block)
end
def self.trap_forward(&block)
trap_signals(FORWARD_SIGNALS, &block)
end
def self.signal(pid, signal)
Process.kill(signal, pid)
true
rescue Errno::ESRCH
false
end
def self.signal_processes(pids, signal)
pids.each { |pid| signal(pid, signal) }
end
# Starts Sidekiq workers for the pairs of processes. # Starts Sidekiq workers for the pairs of processes.
# #
# Example: # Example:
...@@ -118,7 +76,7 @@ module Gitlab ...@@ -118,7 +76,7 @@ module Gitlab
out: $stdout out: $stdout
) )
wait_async(pid) ProcessManagement.wait_async(pid)
pid pid
end end
...@@ -144,41 +102,5 @@ module Gitlab ...@@ -144,41 +102,5 @@ module Gitlab
concurrency_from_queues.clamp(min, max) concurrency_from_queues.clamp(min, max)
end end
# Waits for the given process to complete using a separate thread.
def self.wait_async(pid)
Thread.new do
Process.wait(pid) rescue Errno::ECHILD
end
end
# Returns true if all the processes are alive.
def self.all_alive?(pids)
pids.each do |pid|
return false unless process_alive?(pid)
end
true
end
def self.any_alive?(pids)
pids_alive(pids).any?
end
def self.pids_alive(pids)
pids.select { |pid| process_alive?(pid) }
end
def self.process_alive?(pid)
# Signal 0 tests whether the process exists and we have access to send signals
# but is otherwise a noop (doesn't actually send a signal to the process)
signal(pid, 0)
end
def self.write_pid(path)
File.open(path, 'w') do |handle|
handle.write(Process.pid.to_s)
end
end
end end
end end
...@@ -246,7 +246,7 @@ RSpec.describe Gitlab::SidekiqCluster::CLI do # rubocop:disable RSpec/FilePath ...@@ -246,7 +246,7 @@ RSpec.describe Gitlab::SidekiqCluster::CLI do # rubocop:disable RSpec/FilePath
describe '#write_pid' do describe '#write_pid' do
context 'when a PID is specified' do context 'when a PID is specified' do
it 'writes the PID to a file' do it 'writes the PID to a file' do
expect(Gitlab::SidekiqCluster).to receive(:write_pid).with('/dev/null') expect(Gitlab::ProcessManagement).to receive(:write_pid).with('/dev/null')
cli.option_parser.parse!(%w(-P /dev/null)) cli.option_parser.parse!(%w(-P /dev/null))
cli.write_pid cli.write_pid
...@@ -255,7 +255,7 @@ RSpec.describe Gitlab::SidekiqCluster::CLI do # rubocop:disable RSpec/FilePath ...@@ -255,7 +255,7 @@ RSpec.describe Gitlab::SidekiqCluster::CLI do # rubocop:disable RSpec/FilePath
context 'when no PID is specified' do context 'when no PID is specified' do
it 'does not write a PID' do it 'does not write a PID' do
expect(Gitlab::SidekiqCluster).not_to receive(:write_pid) expect(Gitlab::ProcessManagement).not_to receive(:write_pid)
cli.write_pid cli.write_pid
end end
...@@ -264,13 +264,13 @@ RSpec.describe Gitlab::SidekiqCluster::CLI do # rubocop:disable RSpec/FilePath ...@@ -264,13 +264,13 @@ RSpec.describe Gitlab::SidekiqCluster::CLI do # rubocop:disable RSpec/FilePath
describe '#wait_for_termination' do describe '#wait_for_termination' do
it 'waits for termination of all sub-processes and succeeds after 3 checks' do it 'waits for termination of all sub-processes and succeeds after 3 checks' do
expect(Gitlab::SidekiqCluster).to receive(:any_alive?) expect(Gitlab::ProcessManagement).to receive(:any_alive?)
.with(an_instance_of(Array)).and_return(true, true, true, false) .with(an_instance_of(Array)).and_return(true, true, true, false)
expect(Gitlab::SidekiqCluster).to receive(:pids_alive) expect(Gitlab::ProcessManagement).to receive(:pids_alive)
.with([]).and_return([]) .with([]).and_return([])
expect(Gitlab::SidekiqCluster).to receive(:signal_processes) expect(Gitlab::ProcessManagement).to receive(:signal_processes)
.with([], "-KILL") .with([], "-KILL")
stub_const("Gitlab::SidekiqCluster::CHECK_TERMINATE_INTERVAL_SECONDS", 0.1) stub_const("Gitlab::SidekiqCluster::CHECK_TERMINATE_INTERVAL_SECONDS", 0.1)
...@@ -292,13 +292,13 @@ RSpec.describe Gitlab::SidekiqCluster::CLI do # rubocop:disable RSpec/FilePath ...@@ -292,13 +292,13 @@ RSpec.describe Gitlab::SidekiqCluster::CLI do # rubocop:disable RSpec/FilePath
.with([['foo']], default_options) .with([['foo']], default_options)
.and_return(worker_pids) .and_return(worker_pids)
expect(Gitlab::SidekiqCluster).to receive(:any_alive?) expect(Gitlab::ProcessManagement).to receive(:any_alive?)
.with(worker_pids).and_return(true).at_least(10).times .with(worker_pids).and_return(true).at_least(10).times
expect(Gitlab::SidekiqCluster).to receive(:pids_alive) expect(Gitlab::ProcessManagement).to receive(:pids_alive)
.with(worker_pids).and_return([102]) .with(worker_pids).and_return([102])
expect(Gitlab::SidekiqCluster).to receive(:signal_processes) expect(Gitlab::ProcessManagement).to receive(:signal_processes)
.with([102], "-KILL") .with([102], "-KILL")
cli.run(%w(foo)) cli.run(%w(foo))
...@@ -313,8 +313,8 @@ RSpec.describe Gitlab::SidekiqCluster::CLI do # rubocop:disable RSpec/FilePath ...@@ -313,8 +313,8 @@ RSpec.describe Gitlab::SidekiqCluster::CLI do # rubocop:disable RSpec/FilePath
describe '#trap_signals' do describe '#trap_signals' do
it 'traps the termination and forwarding signals' do it 'traps the termination and forwarding signals' do
expect(Gitlab::SidekiqCluster).to receive(:trap_terminate) expect(Gitlab::ProcessManagement).to receive(:trap_terminate)
expect(Gitlab::SidekiqCluster).to receive(:trap_forward) expect(Gitlab::ProcessManagement).to receive(:trap_forward)
cli.trap_signals cli.trap_signals
end end
...@@ -324,10 +324,10 @@ RSpec.describe Gitlab::SidekiqCluster::CLI do # rubocop:disable RSpec/FilePath ...@@ -324,10 +324,10 @@ RSpec.describe Gitlab::SidekiqCluster::CLI do # rubocop:disable RSpec/FilePath
it 'runs until one of the processes has been terminated' do it 'runs until one of the processes has been terminated' do
allow(cli).to receive(:sleep).with(a_kind_of(Numeric)) allow(cli).to receive(:sleep).with(a_kind_of(Numeric))
expect(Gitlab::SidekiqCluster).to receive(:all_alive?) expect(Gitlab::ProcessManagement).to receive(:all_alive?)
.with(an_instance_of(Array)).and_return(false) .with(an_instance_of(Array)).and_return(false)
expect(Gitlab::SidekiqCluster).to receive(:signal_processes) expect(Gitlab::ProcessManagement).to receive(:signal_processes)
.with(an_instance_of(Array), :TERM) .with(an_instance_of(Array), :TERM)
cli.start_loop cli.start_loop
......
# frozen_string_literal: true
require_relative '../../../lib/gitlab/process_management'
RSpec.describe Gitlab::ProcessManagement do
describe '.trap_signals' do
it 'traps the given signals' do
expect(described_class).to receive(:trap).ordered.with(:INT)
expect(described_class).to receive(:trap).ordered.with(:HUP)
described_class.trap_signals(%i(INT HUP))
end
end
describe '.trap_terminate' do
it 'traps the termination signals' do
expect(described_class).to receive(:trap_signals)
.with(described_class::TERMINATE_SIGNALS)
described_class.trap_terminate { }
end
end
describe '.trap_forward' do
it 'traps the signals to forward' do
expect(described_class).to receive(:trap_signals)
.with(described_class::FORWARD_SIGNALS)
described_class.trap_forward { }
end
end
describe '.signal_processes' do
it 'sends a signal to every given process' do
expect(described_class).to receive(:signal).with(1, :INT)
described_class.signal_processes([1], :INT)
end
end
describe '.signal' do
it 'sends a signal to the given process' do
allow(Process).to receive(:kill).with(:INT, 4)
expect(described_class.signal(4, :INT)).to eq(true)
end
it 'returns false when the process does not exist' do
allow(Process).to receive(:kill).with(:INT, 4).and_raise(Errno::ESRCH)
expect(described_class.signal(4, :INT)).to eq(false)
end
end
describe '.wait_async' do
it 'waits for a process in a separate thread' do
thread = described_class.wait_async(Process.spawn('true'))
# Upon success Process.wait just returns the PID.
expect(thread.value).to be_a_kind_of(Numeric)
end
end
# In the X_alive? checks, we check negative PIDs sometimes as a simple way
# to be sure the pids are definitely for non-existent processes.
# Note that -1 is special, and sends the signal to every process we have permission
# for, so we use -2, -3 etc
describe '.all_alive?' do
it 'returns true if all processes are alive' do
processes = [Process.pid]
expect(described_class.all_alive?(processes)).to eq(true)
end
it 'returns false when a thread was not alive' do
processes = [-2]
expect(described_class.all_alive?(processes)).to eq(false)
end
end
describe '.process_alive?' do
it 'returns true if the proces is alive' do
process = Process.pid
expect(described_class.process_alive?(process)).to eq(true)
end
it 'returns false when a thread was not alive' do
process = -2
expect(described_class.process_alive?(process)).to eq(false)
end
end
describe '.pids_alive' do
it 'returns the pids that are alive, from a given array' do
pids = [Process.pid, -2]
expect(described_class.pids_alive(pids)).to match_array([Process.pid])
end
end
describe '.any_alive?' do
it 'returns true if at least one process is alive' do
processes = [Process.pid, -2]
expect(described_class.any_alive?(processes)).to eq(true)
end
it 'returns false when all threads are dead' do
processes = [-2, -3]
expect(described_class.any_alive?(processes)).to eq(false)
end
end
describe '.write_pid' do
it 'writes the PID of the current process to the given file' do
handle = double(:handle)
allow(File).to receive(:open).with('/dev/null', 'w').and_yield(handle)
expect(handle).to receive(:write).with(Process.pid.to_s)
described_class.write_pid('/dev/null')
end
end
end
...@@ -5,53 +5,6 @@ require 'rspec-parameterized' ...@@ -5,53 +5,6 @@ require 'rspec-parameterized'
require_relative '../../sidekiq_cluster/sidekiq_cluster' require_relative '../../sidekiq_cluster/sidekiq_cluster'
RSpec.describe Gitlab::SidekiqCluster do # rubocop:disable RSpec/FilePath RSpec.describe Gitlab::SidekiqCluster do # rubocop:disable RSpec/FilePath
describe '.trap_signals' do
it 'traps the given signals' do
expect(described_class).to receive(:trap).ordered.with(:INT)
expect(described_class).to receive(:trap).ordered.with(:HUP)
described_class.trap_signals(%i(INT HUP))
end
end
describe '.trap_terminate' do
it 'traps the termination signals' do
expect(described_class).to receive(:trap_signals)
.with(described_class::TERMINATE_SIGNALS)
described_class.trap_terminate { }
end
end
describe '.trap_forward' do
it 'traps the signals to forward' do
expect(described_class).to receive(:trap_signals)
.with(described_class::FORWARD_SIGNALS)
described_class.trap_forward { }
end
end
describe '.signal' do
it 'sends a signal to the given process' do
allow(Process).to receive(:kill).with(:INT, 4)
expect(described_class.signal(4, :INT)).to eq(true)
end
it 'returns false when the process does not exist' do
allow(Process).to receive(:kill).with(:INT, 4).and_raise(Errno::ESRCH)
expect(described_class.signal(4, :INT)).to eq(false)
end
end
describe '.signal_processes' do
it 'sends a signal to every given process' do
expect(described_class).to receive(:signal).with(1, :INT)
described_class.signal_processes([1], :INT)
end
end
describe '.start' do describe '.start' do
it 'starts Sidekiq with the given queues, environment and options' do it 'starts Sidekiq with the given queues, environment and options' do
expected_options = { expected_options = {
...@@ -99,7 +52,7 @@ RSpec.describe Gitlab::SidekiqCluster do # rubocop:disable RSpec/FilePath ...@@ -99,7 +52,7 @@ RSpec.describe Gitlab::SidekiqCluster do # rubocop:disable RSpec/FilePath
it 'starts a Sidekiq process' do it 'starts a Sidekiq process' do
allow(Process).to receive(:spawn).and_return(1) allow(Process).to receive(:spawn).and_return(1)
expect(described_class).to receive(:wait_async).with(1) expect(Gitlab::ProcessManagement).to receive(:wait_async).with(1)
expect(described_class.start_sidekiq(%w(foo), **options)).to eq(1) expect(described_class.start_sidekiq(%w(foo), **options)).to eq(1)
end end
...@@ -109,7 +62,7 @@ RSpec.describe Gitlab::SidekiqCluster do # rubocop:disable RSpec/FilePath ...@@ -109,7 +62,7 @@ RSpec.describe Gitlab::SidekiqCluster do # rubocop:disable RSpec/FilePath
.with(env, *args, anything) .with(env, *args, anything)
.and_return(1) .and_return(1)
expect(described_class).to receive(:wait_async).with(1) expect(Gitlab::ProcessManagement).to receive(:wait_async).with(1)
expect(described_class.start_sidekiq(%w(foo foo bar baz), **options)).to eq(1) expect(described_class.start_sidekiq(%w(foo foo bar baz), **options)).to eq(1)
end end
...@@ -119,7 +72,7 @@ RSpec.describe Gitlab::SidekiqCluster do # rubocop:disable RSpec/FilePath ...@@ -119,7 +72,7 @@ RSpec.describe Gitlab::SidekiqCluster do # rubocop:disable RSpec/FilePath
.with(anything, *args, a_hash_including(pgroup: true)) .with(anything, *args, a_hash_including(pgroup: true))
.and_return(1) .and_return(1)
allow(described_class).to receive(:wait_async) allow(Gitlab::ProcessManagement).to receive(:wait_async)
expect(described_class.start_sidekiq(%w(foo bar baz), **options)).to eq(1) expect(described_class.start_sidekiq(%w(foo bar baz), **options)).to eq(1)
end end
end end
...@@ -152,57 +105,4 @@ RSpec.describe Gitlab::SidekiqCluster do # rubocop:disable RSpec/FilePath ...@@ -152,57 +105,4 @@ RSpec.describe Gitlab::SidekiqCluster do # rubocop:disable RSpec/FilePath
it { expect(described_class.concurrency(queues, min, max)).to eq(expected) } it { expect(described_class.concurrency(queues, min, max)).to eq(expected) }
end end
end end
describe '.wait_async' do
it 'waits for a process in a separate thread' do
thread = described_class.wait_async(Process.spawn('true'))
# Upon success Process.wait just returns the PID.
expect(thread.value).to be_a_kind_of(Numeric)
end
end
# In the X_alive? checks, we check negative PIDs sometimes as a simple way
# to be sure the pids are definitely for non-existent processes.
# Note that -1 is special, and sends the signal to every process we have permission
# for, so we use -2, -3 etc
describe '.all_alive?' do
it 'returns true if all processes are alive' do
processes = [Process.pid]
expect(described_class.all_alive?(processes)).to eq(true)
end
it 'returns false when a thread was not alive' do
processes = [-2]
expect(described_class.all_alive?(processes)).to eq(false)
end
end
describe '.any_alive?' do
it 'returns true if at least one process is alive' do
processes = [Process.pid, -2]
expect(described_class.any_alive?(processes)).to eq(true)
end
it 'returns false when all threads are dead' do
processes = [-2, -3]
expect(described_class.any_alive?(processes)).to eq(false)
end
end
describe '.write_pid' do
it 'writes the PID of the current process to the given file' do
handle = double(:handle)
allow(File).to receive(:open).with('/dev/null', 'w').and_yield(handle)
expect(handle).to receive(:write).with(Process.pid.to_s)
described_class.write_pid('/dev/null')
end
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment