......@@ -3,6 +3,8 @@ package queueing
import (
type errTooManyRequests struct{ error }
......@@ -11,58 +13,184 @@ type errQueueingTimedout struct{ error }
var ErrTooManyRequests = &errTooManyRequests{errors.New("too many requests queued")}
var ErrQueueingTimedout = &errQueueingTimedout{errors.New("queueing timedout")}
type queueMetrics struct {
queueingLimit prometheus.Gauge
queueingQueueLimit prometheus.Gauge
queueingQueueTimeout prometheus.Gauge
queueingBusy prometheus.Gauge
queueingWaiting prometheus.Gauge
queueingWaitingTime prometheus.Histogram
queueingErrors *prometheus.CounterVec
// newQueueMetrics prepares Prometheus metrics for queueing mechanism
// name specifies name of the queue, used to label metrics with ConstLabel `queue_name`
// Don't call newQueueMetrics twice with the same name argument!
// timeout specifies the timeout of storing a request in queue - queueMetrics
// uses it to calculate histogram buckets for gitlab_workhorse_queueing_waiting_time
// metric
func newQueueMetrics(name string, timeout time.Duration) *queueMetrics {
waitingTimeBuckets := []float64{
timeout.Seconds() * 0.01,
timeout.Seconds() * 0.05,
timeout.Seconds() * 0.10,
timeout.Seconds() * 0.25,
timeout.Seconds() * 0.50,
timeout.Seconds() * 0.75,
timeout.Seconds() * 0.90,
timeout.Seconds() * 0.95,
timeout.Seconds() * 0.99,
metrics := &queueMetrics{
queueingLimit: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "gitlab_workhorse_queueing_limit",
Help: "Current limit set for the queueing mechanism",
ConstLabels: prometheus.Labels{
"queue_name": name,
queueingQueueLimit: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "gitlab_workhorse_queueing_queue_limit",
Help: "Current queueLimit set for the queueing mechanism",
ConstLabels: prometheus.Labels{
"queue_name": name,
queueingQueueTimeout: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "gitlab_workhorse_queueing_queue_timeout",
Help: "Current queueTimeout set for the queueing mechanism",
ConstLabels: prometheus.Labels{
"queue_name": name,
queueingBusy: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "gitlab_workhorse_queueing_busy",
Help: "How many queued requests are now processed",
ConstLabels: prometheus.Labels{
"queue_name": name,
queueingWaiting: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "gitlab_workhorse_queueing_waiting",
Help: "How many requests are now queued",
ConstLabels: prometheus.Labels{
"queue_name": name,
queueingWaitingTime: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "gitlab_workhorse_queueing_waiting_time",
Help: "How many time a request spent in queue",
ConstLabels: prometheus.Labels{
"queue_name": name,
Buckets: waitingTimeBuckets,
queueingErrors: prometheus.NewCounterVec(
Name: "gitlab_workhorse_queueing_errors",
Help: "How many times the TooManyRequests or QueueintTimedout errors were returned while queueing, partitioned by error type",
ConstLabels: prometheus.Labels{
"queue_name": name,
return metrics
type Queue struct {
name string
busyCh chan struct{}
waitingCh chan struct{}
waitingCh chan time.Time
timeout time.Duration
// NewQueue creates a new queue
// newQueue creates a new queue
// name specifies name used to label queue metrics.
// Don't call newQueue twice with the same name argument!
// limit specifies number of requests run concurrently
// queueLimit specifies maximum number of requests that can be queued
// timeout specifies the time limit of storing the request in the queue
// if the number of requests is above the limit
func NewQueue(limit, queueLimit uint) *Queue {
return &Queue{
func newQueue(name string, limit, queueLimit uint, timeout time.Duration) *Queue {
queue := &Queue{
name: name,
busyCh: make(chan struct{}, limit),
waitingCh: make(chan struct{}, limit+queueLimit),
waitingCh: make(chan time.Time, limit+queueLimit),
timeout: timeout,
queue.queueMetrics = newQueueMetrics(name, timeout)
return queue
// Acquire takes one slot from the Queue
// and returns when a request should be processed
// it allows up to (limit) of requests running at a time
// it allows to queue up to (queue-limit) requests
func (s *Queue) Acquire(timeout time.Duration) (err error) {
func (s *Queue) Acquire() (err error) {
// push item to a queue to claim your own slot (non-blocking)
select {
case s.waitingCh <- struct{}{}:
case s.waitingCh <- time.Now():
return ErrTooManyRequests
defer func() {
if err != nil {
waitStarted := <-s.waitingCh
// fast path: push item to current processed items (non-blocking)
select {
case s.busyCh <- struct{}{}:
return nil
timer := time.NewTimer(timeout)
timer := time.NewTimer(s.timeout)
defer timer.Stop()
// push item to current processed items (blocking)
select {
case s.busyCh <- struct{}{}:
return nil
case <-timer.C:
return ErrQueueingTimedout
......@@ -71,6 +199,10 @@ func (s *Queue) Acquire(timeout time.Duration) (err error) {
// It triggers next request to be processed if it's in queue
func (s *Queue) Release() {
// dequeue from queue to allow next request to be processed
waitStarted := <-s.waitingCh
......@@ -6,46 +6,46 @@ import (
func TestNormalQueueing(t *testing.T) {
q := NewQueue(2, 1)
err1 := q.Acquire(time.Microsecond)
q := newQueue("queue 1", 2, 1, time.Microsecond)
err1 := q.Acquire()
if err1 != nil {
t.Fatal("we should acquire a new slot")
err2 := q.Acquire(time.Microsecond)
err2 := q.Acquire()
if err2 != nil {
t.Fatal("we should acquire a new slot")
err3 := q.Acquire(time.Microsecond)
err3 := q.Acquire()
if err3 != ErrQueueingTimedout {
t.Fatal("we should timeout")
err4 := q.Acquire(time.Microsecond)
err4 := q.Acquire()
if err4 != nil {
t.Fatal("we should acquire a new slot")
func TestQueueLimit(t *testing.T) {
q := NewQueue(1, 0)
err1 := q.Acquire(time.Microsecond)
q := newQueue("queue 2", 1, 0, time.Microsecond)
err1 := q.Acquire()
if err1 != nil {
t.Fatal("we should acquire a new slot")
err2 := q.Acquire(time.Microsecond)
err2 := q.Acquire()
if err2 != ErrTooManyRequests {
t.Fatal("we should fail because of not enough slots in queue")
func TestQueueProcessing(t *testing.T) {
q := NewQueue(1, 1)
err1 := q.Acquire(time.Microsecond)
q := newQueue("queue 3", 1, 1, time.Second)
err1 := q.Acquire()
if err1 != nil {
t.Fatal("we should acquire a new slot")
......@@ -55,7 +55,7 @@ func TestQueueProcessing(t *testing.T) {
err2 := q.Acquire(time.Second)
err2 := q.Acquire()
if err2 != nil {
t.Fatal("we should acquire slot after the previous one finished")
......@@ -9,7 +9,14 @@ import (
const DefaultTimeout = 30 * time.Second
func QueueRequests(h http.Handler, limit, queueLimit uint, queueTimeout time.Duration) http.Handler {
// QueueRequests creates a new request queue
// name specifies the name of queue, used to label Prometheus metrics
// Don't call QueueRequests twice with the same name argument!
// h specifies a http.Handler which will handle the queue requests
// limit specifies number of requests run concurrently
// queueLimit specifies maximum number of requests that can be queued
// queueTimeout specifies the time limit of storing the request in the queue
func QueueRequests(name string, h http.Handler, limit, queueLimit uint, queueTimeout time.Duration) http.Handler {
if limit == 0 {
return h
......@@ -17,10 +24,10 @@ func QueueRequests(h http.Handler, limit, queueLimit uint, queueTimeout time.Dur
queueTimeout = DefaultTimeout
queue := NewQueue(limit, queueLimit)
queue := newQueue(name, limit, queueLimit, queueTimeout)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
err := queue.Acquire(queueTimeout)
err := queue.Acquire()
switch err {
case nil:
......@@ -21,7 +21,7 @@ func pausedHttpHandler(pauseCh chan struct{}) http.Handler {
func TestNormalRequestProcessing(t *testing.T) {
w := httptest.NewRecorder()
h := QueueRequests(httpHandler, 1, 1, time.Second)
h := QueueRequests("Normal request processing", httpHandler, 1, 1, time.Second)
h.ServeHTTP(w, nil)
if w.Code != 200 {
t.Fatal("QueueRequests should process request")
......@@ -32,11 +32,11 @@ func TestNormalRequestProcessing(t *testing.T) {
// then it runs a number of requests that are going through queue,
// we return the response of first finished request,
// where status of request can be 200, 429 or 503
func testSlowRequestProcessing(count int, limit, queueLimit uint, queueTimeout time.Duration) *httptest.ResponseRecorder {
func testSlowRequestProcessing(name string, count int, limit, queueLimit uint, queueTimeout time.Duration) *httptest.ResponseRecorder {
pauseCh := make(chan struct{})
defer close(pauseCh)
handler := QueueRequests(pausedHttpHandler(pauseCh), limit, queueLimit, queueTimeout)
handler := QueueRequests("Slow request processing: "+name, pausedHttpHandler(pauseCh), limit, queueLimit, queueTimeout)
respCh := make(chan *httptest.ResponseRecorder, count)
......@@ -57,7 +57,7 @@ func testSlowRequestProcessing(count int, limit, queueLimit uint, queueTimeout t
// the queue limit and length is 1,
// the second request gets timed-out
func TestQueueingTimeout(t *testing.T) {
w := testSlowRequestProcessing(2, 1, 1, time.Microsecond)
w := testSlowRequestProcessing("timeout", 2, 1, 1, time.Microsecond)
if w.Code != 503 {
t.Fatal("QueueRequests should timeout queued request")
......@@ -68,7 +68,7 @@ func TestQueueingTimeout(t *testing.T) {
// the queue limit and length is 1,
// so the third request has to be rejected with 429
func TestQueueingTooManyRequests(t *testing.T) {
w := testSlowRequestProcessing(3, 1, 1, time.Minute)
w := testSlowRequestProcessing("too many requests", 3, 1, 1, time.Minute)
if w.Code != 429 {
t.Fatal("QueueRequests should return immediately and return too many requests")
......@@ -119,7 +119,7 @@ func (u *Upstream) configureRoutes() {
uploadAccelerateProxy := upload.Accelerate(path.Join(u.DocumentRoot, "uploads/tmp"), proxy)
ciAPIProxyQueue := queueing.QueueRequests(uploadAccelerateProxy, u.APILimit, u.APIQueueLimit, u.APIQueueTimeout)
ciAPIProxyQueue := queueing.QueueRequests("ci_api_job_requests", uploadAccelerateProxy, u.APILimit, u.APIQueueLimit, u.APIQueueTimeout)
ciAPILongPolling := builds.RegisterHandler(ciAPIProxyQueue, redis.WatchKey, u.APICILongPollingDuration)
u.Routes = []routeEntry{
