Commit 355080d5 authored by Roy Zwambag

Move all process related methods to the ProcessManagement module

These methods are not directly related to SidekiqCluster process
management. They are currently only used by sidekiq cluster,
but in the future they may be used in other places as well.
parent 747a7b6f
# frozen_string_literal: true
module Gitlab
  # Stateless helpers for trapping signals, sending signals to processes and
  # checking whether processes are still running.
  module ProcessManagement
    # The signals that should terminate both the master and workers.
    TERMINATE_SIGNALS = %i(INT TERM).freeze

    # The signals that should simply be forwarded to the workers.
    FORWARD_SIGNALS = %i(TTIN USR1 USR2 HUP).freeze

    # Traps the given signals and yields the block whenever these signals are
    # received.
    #
    # The block is passed the name of the signal.
    #
    # Example:
    #
    #     trap_signals(%i(HUP TERM)) do |signal|
    #       ...
    #     end
    def self.trap_signals(signals)
      signals.each do |signal|
        trap(signal) do
          yield signal
        end
      end
    end

    # Traps every signal listed in TERMINATE_SIGNALS, forwarding the block to
    # trap_signals.
    def self.trap_terminate(&block)
      trap_signals(TERMINATE_SIGNALS, &block)
    end

    # Traps every signal listed in FORWARD_SIGNALS, forwarding the block to
    # trap_signals.
    def self.trap_forward(&block)
      trap_signals(FORWARD_SIGNALS, &block)
    end

    # Sends `signal` to the process identified by `pid`.
    #
    # Returns true when Process.kill succeeded and false when it raised
    # Errno::ESRCH (no such process). Any other error raised by Process.kill
    # is not rescued here and propagates to the caller.
    def self.signal(pid, signal)
      Process.kill(signal, pid)
      true
    rescue Errno::ESRCH
      false
    end

    # Sends `signal` to every PID in `pids`. PIDs that no longer exist are
    # skipped silently because `signal` reports them by returning false.
    def self.signal_processes(pids, signal)
      pids.each { |pid| signal(pid, signal) }
    end

    # Waits for the given process to complete using a separate thread.
    #
    # Returns the Thread. Its value is the PID returned by Process.wait, or
    # nil when the process could not be waited on (Errno::ECHILD).
    def self.wait_async(pid)
      Thread.new do
        # A `rescue` modifier cannot filter by exception class: it would
        # swallow every StandardError and evaluate to the Errno::ECHILD class
        # itself. The block form rescues only ECHILD.
        begin
          Process.wait(pid)
        rescue Errno::ECHILD
          nil
        end
      end
    end

    # Returns true if all the processes are alive (and also for an empty list).
    def self.all_alive?(pids)
      pids.all? { |pid| process_alive?(pid) }
    end

    # Returns true if at least one of the processes is alive.
    def self.any_alive?(pids)
      pids_alive(pids).any?
    end

    # Returns the subset of `pids` whose processes are alive.
    def self.pids_alive(pids)
      pids.select { |pid| process_alive?(pid) }
    end

    # Returns true if the process identified by `pid` is alive.
    def self.process_alive?(pid)
      # Signal 0 tests whether the process exists and we have access to send signals
      # but is otherwise a noop (doesn't actually send a signal to the process)
      signal(pid, 0)
    end

    # Writes the PID of the current process to the file at `path`, truncating
    # any existing content.
    def self.write_pid(path)
      File.open(path, 'w') do |handle|
        handle.write(Process.pid.to_s)
      end
    end
  end
end
......@@ -11,6 +11,7 @@ require_relative '../lib/gitlab/utils'
require_relative '../lib/gitlab/sidekiq_config/cli_methods'
require_relative '../lib/gitlab/sidekiq_config/worker_matcher'
require_relative '../lib/gitlab/sidekiq_logging/json_formatter'
require_relative '../lib/gitlab/process_management'
require_relative 'sidekiq_cluster'
module Gitlab
......@@ -106,7 +107,7 @@ module Gitlab
end
def write_pid
SidekiqCluster.write_pid(@pid) if @pid
ProcessManagement.write_pid(@pid) if @pid
end
def soft_timeout_seconds
......@@ -123,11 +124,11 @@ module Gitlab
end
def continue_waiting?(deadline)
SidekiqCluster.any_alive?(@processes) && monotonic_time < deadline
ProcessManagement.any_alive?(@processes) && monotonic_time < deadline
end
def hard_stop_stuck_pids
SidekiqCluster.signal_processes(SidekiqCluster.pids_alive(@processes), "-KILL")
ProcessManagement.signal_processes(ProcessManagement.pids_alive(@processes), "-KILL")
end
def wait_for_termination
......@@ -138,14 +139,14 @@ module Gitlab
end
def trap_signals
SidekiqCluster.trap_terminate do |signal|
ProcessManagement.trap_terminate do |signal|
@alive = false
SidekiqCluster.signal_processes(@processes, signal)
ProcessManagement.signal_processes(@processes, signal)
wait_for_termination
end
SidekiqCluster.trap_forward do |signal|
SidekiqCluster.signal_processes(@processes, signal)
ProcessManagement.trap_forward do |signal|
ProcessManagement.signal_processes(@processes, signal)
end
end
......@@ -153,12 +154,12 @@ module Gitlab
while @alive
sleep(@interval)
unless SidekiqCluster.all_alive?(@processes)
unless ProcessManagement.all_alive?(@processes)
# If a child process died we'll just terminate the whole cluster. It's up to
# runit and such to then restart the cluster.
@logger.info('A worker terminated, shutting down the cluster')
SidekiqCluster.signal_processes(@processes, :TERM)
ProcessManagement.signal_processes(@processes, :TERM)
break
end
end
......
# frozen_string_literal: true
require_relative 'dependencies'
require_relative '../lib/gitlab/process_management'
module Gitlab
module SidekiqCluster
......@@ -17,49 +18,6 @@ module Gitlab
# After surpassing the soft timeout.
DEFAULT_HARD_TIMEOUT_SECONDS = 5
# The signals that should terminate both the master and workers.
TERMINATE_SIGNALS = %i(INT TERM).freeze
# The signals that should simply be forwarded to the workers.
FORWARD_SIGNALS = %i(TTIN USR1 USR2 HUP).freeze
# Traps the given signals and yields the block whenever these signals are
# received.
#
# The block is passed the name of the signal.
#
# Example:
#
# trap_signals(%i(HUP TERM)) do |signal|
# ...
# end
def self.trap_signals(signals)
signals.each do |signal|
trap(signal) do
yield signal
end
end
end
def self.trap_terminate(&block)
trap_signals(TERMINATE_SIGNALS, &block)
end
def self.trap_forward(&block)
trap_signals(FORWARD_SIGNALS, &block)
end
def self.signal(pid, signal)
Process.kill(signal, pid)
true
rescue Errno::ESRCH
false
end
def self.signal_processes(pids, signal)
pids.each { |pid| signal(pid, signal) }
end
# Starts Sidekiq workers for the pairs of processes.
#
# Example:
......@@ -118,7 +76,7 @@ module Gitlab
out: $stdout
)
wait_async(pid)
ProcessManagement.wait_async(pid)
pid
end
......@@ -144,41 +102,5 @@ module Gitlab
concurrency_from_queues.clamp(min, max)
end
# Waits for the given process to complete using a separate thread.
def self.wait_async(pid)
Thread.new do
Process.wait(pid) rescue Errno::ECHILD
end
end
# Returns true if all the processes are alive.
def self.all_alive?(pids)
pids.each do |pid|
return false unless process_alive?(pid)
end
true
end
def self.any_alive?(pids)
pids_alive(pids).any?
end
def self.pids_alive(pids)
pids.select { |pid| process_alive?(pid) }
end
def self.process_alive?(pid)
# Signal 0 tests whether the process exists and we have access to send signals
# but is otherwise a noop (doesn't actually send a signal to the process)
signal(pid, 0)
end
def self.write_pid(path)
File.open(path, 'w') do |handle|
handle.write(Process.pid.to_s)
end
end
end
end
......@@ -246,7 +246,7 @@ RSpec.describe Gitlab::SidekiqCluster::CLI do # rubocop:disable RSpec/FilePath
describe '#write_pid' do
context 'when a PID is specified' do
it 'writes the PID to a file' do
expect(Gitlab::SidekiqCluster).to receive(:write_pid).with('/dev/null')
expect(Gitlab::ProcessManagement).to receive(:write_pid).with('/dev/null')
cli.option_parser.parse!(%w(-P /dev/null))
cli.write_pid
......@@ -255,7 +255,7 @@ RSpec.describe Gitlab::SidekiqCluster::CLI do # rubocop:disable RSpec/FilePath
context 'when no PID is specified' do
it 'does not write a PID' do
expect(Gitlab::SidekiqCluster).not_to receive(:write_pid)
expect(Gitlab::ProcessManagement).not_to receive(:write_pid)
cli.write_pid
end
......@@ -264,13 +264,13 @@ RSpec.describe Gitlab::SidekiqCluster::CLI do # rubocop:disable RSpec/FilePath
describe '#wait_for_termination' do
it 'waits for termination of all sub-processes and succeeds after 3 checks' do
expect(Gitlab::SidekiqCluster).to receive(:any_alive?)
expect(Gitlab::ProcessManagement).to receive(:any_alive?)
.with(an_instance_of(Array)).and_return(true, true, true, false)
expect(Gitlab::SidekiqCluster).to receive(:pids_alive)
expect(Gitlab::ProcessManagement).to receive(:pids_alive)
.with([]).and_return([])
expect(Gitlab::SidekiqCluster).to receive(:signal_processes)
expect(Gitlab::ProcessManagement).to receive(:signal_processes)
.with([], "-KILL")
stub_const("Gitlab::SidekiqCluster::CHECK_TERMINATE_INTERVAL_SECONDS", 0.1)
......@@ -292,13 +292,13 @@ RSpec.describe Gitlab::SidekiqCluster::CLI do # rubocop:disable RSpec/FilePath
.with([['foo']], default_options)
.and_return(worker_pids)
expect(Gitlab::SidekiqCluster).to receive(:any_alive?)
expect(Gitlab::ProcessManagement).to receive(:any_alive?)
.with(worker_pids).and_return(true).at_least(10).times
expect(Gitlab::SidekiqCluster).to receive(:pids_alive)
expect(Gitlab::ProcessManagement).to receive(:pids_alive)
.with(worker_pids).and_return([102])
expect(Gitlab::SidekiqCluster).to receive(:signal_processes)
expect(Gitlab::ProcessManagement).to receive(:signal_processes)
.with([102], "-KILL")
cli.run(%w(foo))
......@@ -313,8 +313,8 @@ RSpec.describe Gitlab::SidekiqCluster::CLI do # rubocop:disable RSpec/FilePath
describe '#trap_signals' do
it 'traps the termination and forwarding signals' do
expect(Gitlab::SidekiqCluster).to receive(:trap_terminate)
expect(Gitlab::SidekiqCluster).to receive(:trap_forward)
expect(Gitlab::ProcessManagement).to receive(:trap_terminate)
expect(Gitlab::ProcessManagement).to receive(:trap_forward)
cli.trap_signals
end
......@@ -324,10 +324,10 @@ RSpec.describe Gitlab::SidekiqCluster::CLI do # rubocop:disable RSpec/FilePath
it 'runs until one of the processes has been terminated' do
allow(cli).to receive(:sleep).with(a_kind_of(Numeric))
expect(Gitlab::SidekiqCluster).to receive(:all_alive?)
expect(Gitlab::ProcessManagement).to receive(:all_alive?)
.with(an_instance_of(Array)).and_return(false)
expect(Gitlab::SidekiqCluster).to receive(:signal_processes)
expect(Gitlab::ProcessManagement).to receive(:signal_processes)
.with(an_instance_of(Array), :TERM)
cli.start_loop
......
# frozen_string_literal: true
require_relative '../../../lib/gitlab/process_management'
# Specs for the signal-trapping, signalling and liveness helpers exposed as
# module methods on Gitlab::ProcessManagement.
RSpec.describe Gitlab::ProcessManagement do
  describe '.trap_signals' do
    it 'traps the given signals' do
      expect(described_class).to receive(:trap).ordered.with(:INT)
      expect(described_class).to receive(:trap).ordered.with(:HUP)

      described_class.trap_signals(%i(INT HUP))
    end
  end

  describe '.trap_terminate' do
    it 'traps the termination signals' do
      expect(described_class).to receive(:trap_signals)
        .with(described_class::TERMINATE_SIGNALS)

      described_class.trap_terminate { }
    end
  end

  describe '.trap_forward' do
    it 'traps the signals to forward' do
      expect(described_class).to receive(:trap_signals)
        .with(described_class::FORWARD_SIGNALS)

      described_class.trap_forward { }
    end
  end

  describe '.signal_processes' do
    it 'sends a signal to every given process' do
      expect(described_class).to receive(:signal).with(1, :INT)

      described_class.signal_processes([1], :INT)
    end
  end

  describe '.signal' do
    it 'sends a signal to the given process' do
      allow(Process).to receive(:kill).with(:INT, 4)

      expect(described_class.signal(4, :INT)).to eq(true)
    end

    it 'returns false when the process does not exist' do
      allow(Process).to receive(:kill).with(:INT, 4).and_raise(Errno::ESRCH)

      expect(described_class.signal(4, :INT)).to eq(false)
    end
  end

  describe '.wait_async' do
    it 'waits for a process in a separate thread' do
      thread = described_class.wait_async(Process.spawn('true'))

      # Upon success Process.wait just returns the PID.
      expect(thread.value).to be_a_kind_of(Numeric)
    end
  end

  # In the X_alive? checks, we sometimes use negative PIDs as a simple way
  # to be sure the PIDs definitely refer to non-existent processes.
  # Note that -1 is special, and sends the signal to every process we have
  # permission for, so we use -2, -3 etc.
  describe '.all_alive?' do
    it 'returns true if all processes are alive' do
      processes = [Process.pid]

      expect(described_class.all_alive?(processes)).to eq(true)
    end

    it 'returns false when a process is not alive' do
      processes = [-2]

      expect(described_class.all_alive?(processes)).to eq(false)
    end
  end

  describe '.process_alive?' do
    it 'returns true if the process is alive' do
      process = Process.pid

      expect(described_class.process_alive?(process)).to eq(true)
    end

    it 'returns false when a process is not alive' do
      process = -2

      expect(described_class.process_alive?(process)).to eq(false)
    end
  end

  describe '.pids_alive' do
    it 'returns the pids that are alive, from a given array' do
      pids = [Process.pid, -2]

      expect(described_class.pids_alive(pids)).to match_array([Process.pid])
    end
  end

  describe '.any_alive?' do
    it 'returns true if at least one process is alive' do
      processes = [Process.pid, -2]

      expect(described_class.any_alive?(processes)).to eq(true)
    end

    it 'returns false when all processes are dead' do
      processes = [-2, -3]

      expect(described_class.any_alive?(processes)).to eq(false)
    end
  end

  describe '.write_pid' do
    it 'writes the PID of the current process to the given file' do
      handle = double(:handle)

      # File.open is stubbed so nothing is actually written to /dev/null.
      allow(File).to receive(:open).with('/dev/null', 'w').and_yield(handle)

      expect(handle).to receive(:write).with(Process.pid.to_s)

      described_class.write_pid('/dev/null')
    end
  end
end
......@@ -3,55 +3,9 @@
require 'rspec-parameterized'
require_relative '../../sidekiq_cluster/sidekiq_cluster'
require_relative '../../lib/gitlab/process_management'
RSpec.describe Gitlab::SidekiqCluster do # rubocop:disable RSpec/FilePath
describe '.trap_signals' do
it 'traps the given signals' do
expect(described_class).to receive(:trap).ordered.with(:INT)
expect(described_class).to receive(:trap).ordered.with(:HUP)
described_class.trap_signals(%i(INT HUP))
end
end
describe '.trap_terminate' do
it 'traps the termination signals' do
expect(described_class).to receive(:trap_signals)
.with(described_class::TERMINATE_SIGNALS)
described_class.trap_terminate { }
end
end
describe '.trap_forward' do
it 'traps the signals to forward' do
expect(described_class).to receive(:trap_signals)
.with(described_class::FORWARD_SIGNALS)
described_class.trap_forward { }
end
end
describe '.signal' do
it 'sends a signal to the given process' do
allow(Process).to receive(:kill).with(:INT, 4)
expect(described_class.signal(4, :INT)).to eq(true)
end
it 'returns false when the process does not exist' do
allow(Process).to receive(:kill).with(:INT, 4).and_raise(Errno::ESRCH)
expect(described_class.signal(4, :INT)).to eq(false)
end
end
describe '.signal_processes' do
it 'sends a signal to every given process' do
expect(described_class).to receive(:signal).with(1, :INT)
described_class.signal_processes([1], :INT)
end
end
describe '.start' do
it 'starts Sidekiq with the given queues, environment and options' do
expected_options = {
......@@ -99,7 +53,7 @@ RSpec.describe Gitlab::SidekiqCluster do # rubocop:disable RSpec/FilePath
it 'starts a Sidekiq process' do
allow(Process).to receive(:spawn).and_return(1)
expect(described_class).to receive(:wait_async).with(1)
expect(Gitlab::ProcessManagement).to receive(:wait_async).with(1)
expect(described_class.start_sidekiq(%w(foo), **options)).to eq(1)
end
......@@ -109,7 +63,7 @@ RSpec.describe Gitlab::SidekiqCluster do # rubocop:disable RSpec/FilePath
.with(env, *args, anything)
.and_return(1)
expect(described_class).to receive(:wait_async).with(1)
expect(Gitlab::ProcessManagement).to receive(:wait_async).with(1)
expect(described_class.start_sidekiq(%w(foo foo bar baz), **options)).to eq(1)
end
......@@ -119,7 +73,7 @@ RSpec.describe Gitlab::SidekiqCluster do # rubocop:disable RSpec/FilePath
.with(anything, *args, a_hash_including(pgroup: true))
.and_return(1)
allow(described_class).to receive(:wait_async)
allow(Gitlab::ProcessManagement).to receive(:wait_async)
expect(described_class.start_sidekiq(%w(foo bar baz), **options)).to eq(1)
end
end
......@@ -152,57 +106,4 @@ RSpec.describe Gitlab::SidekiqCluster do # rubocop:disable RSpec/FilePath
it { expect(described_class.concurrency(queues, min, max)).to eq(expected) }
end
end
describe '.wait_async' do
it 'waits for a process in a separate thread' do
thread = described_class.wait_async(Process.spawn('true'))
# Upon success Process.wait just returns the PID.
expect(thread.value).to be_a_kind_of(Numeric)
end
end
# In the X_alive? checks, we check negative PIDs sometimes as a simple way
# to be sure the pids are definitely for non-existent processes.
# Note that -1 is special, and sends the signal to every process we have permission
# for, so we use -2, -3 etc
describe '.all_alive?' do
it 'returns true if all processes are alive' do
processes = [Process.pid]
expect(described_class.all_alive?(processes)).to eq(true)
end
it 'returns false when a thread was not alive' do
processes = [-2]
expect(described_class.all_alive?(processes)).to eq(false)
end
end
describe '.any_alive?' do
it 'returns true if at least one process is alive' do
processes = [Process.pid, -2]
expect(described_class.any_alive?(processes)).to eq(true)
end
it 'returns false when all threads are dead' do
processes = [-2, -3]
expect(described_class.any_alive?(processes)).to eq(false)
end
end
describe '.write_pid' do
it 'writes the PID of the current process to the given file' do
handle = double(:handle)
allow(File).to receive(:open).with('/dev/null', 'w').and_yield(handle)
expect(handle).to receive(:write).with(Process.pid.to_s)
described_class.write_pid('/dev/null')
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment