Commit 6f9fa8c7 authored by Tomasz Maczukin

Calculate histogram buckets from apiQueueDuration value

parent 304bc473
...@@ -13,113 +13,155 @@ type errQueueingTimedout struct{ error } ...@@ -13,113 +13,155 @@ type errQueueingTimedout struct{ error }
var ErrTooManyRequests = &errTooManyRequests{errors.New("too many requests queued")} var ErrTooManyRequests = &errTooManyRequests{errors.New("too many requests queued")}
var ErrQueueingTimedout = &errQueueingTimedout{errors.New("queueing timedout")} var ErrQueueingTimedout = &errQueueingTimedout{errors.New("queueing timedout")}
var ( type queueMetrics struct {
queueingLimit = prometheus.NewGauge(prometheus.GaugeOpts{ queueingLimit prometheus.Gauge
queueingQueueLimit prometheus.Gauge
queueingQueueTimeout prometheus.Gauge
queueingBusy prometheus.Gauge
queueingWaiting prometheus.Gauge
queueingWaitingTime prometheus.Histogram
queueingErrors *prometheus.CounterVec
}
func NewQueueMetrics(timeout time.Duration) *queueMetrics {
waitingTimeBuckets := []float64{
timeout.Seconds() * 0.01,
timeout.Seconds() * 0.05,
timeout.Seconds() * 0.10,
timeout.Seconds() * 0.25,
timeout.Seconds() * 0.50,
timeout.Seconds() * 0.75,
timeout.Seconds() * 0.90,
timeout.Seconds() * 0.95,
timeout.Seconds() * 0.99,
timeout.Seconds(),
timeout.Seconds() * 1.5,
timeout.Seconds() * 2,
}
metrics := &queueMetrics{
queueingLimit: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "gitlab_workhorse_queueing_limit", Name: "gitlab_workhorse_queueing_limit",
Help: "Current limit set for the queueing mechanism", Help: "Current limit set for the queueing mechanism",
}) }),
queueingQueueLimit = prometheus.NewGauge(prometheus.GaugeOpts{ queueingQueueLimit: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "gitlab_workhorse_queueing_queue_limit", Name: "gitlab_workhorse_queueing_queue_limit",
Help: "Current queueLimit set for the queueing mechanism", Help: "Current queueLimit set for the queueing mechanism",
}) }),
queueingBusy = prometheus.NewGauge(prometheus.GaugeOpts{ queueingQueueTimeout: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "gitlab_workhorse_queueing_queue_timeout",
Help: "Current queueTimeout set for the queueing mechanism",
}),
queueingBusy: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "gitlab_workhorse_queueing_busy", Name: "gitlab_workhorse_queueing_busy",
Help: "How many queued requests are now processed", Help: "How many queued requests are now processed",
}) }),
queueingWaiting = prometheus.NewGauge(prometheus.GaugeOpts{ queueingWaiting: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "gitlab_workhorse_queueing_waiting", Name: "gitlab_workhorse_queueing_waiting",
Help: "How many requests are now queued", Help: "How many requests are now queued",
}) }),
queueingWaitingTime = prometheus.NewHistogram(prometheus.HistogramOpts{ queueingWaitingTime: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "gitlab_workhorse_queueing_waiting_time", Name: "gitlab_workhorse_queueing_waiting_time",
Help: "How many time a request spent in queue", Help: "How many time a request spent in queue",
Buckets: []float64{0.01, 0.02, 0.05, 0.10, 0.20, 0.50, 1.00, 2.00, 5.00, 10.00, 30.00, 60.00}, Buckets: waitingTimeBuckets,
}) }),
queueingErrors = prometheus.NewCounterVec( queueingErrors: prometheus.NewCounterVec(
prometheus.CounterOpts{ prometheus.CounterOpts{
Name: "gitlab_workhorse_queueing_errors", Name: "gitlab_workhorse_queueing_errors",
Help: "How many times the TooManyRequests or QueueintTimedout errors were returned while queueing, partitioned by error type", Help: "How many times the TooManyRequests or QueueintTimedout errors were returned while queueing, partitioned by error type",
}, },
[]string{"type"}, []string{"type"},
) ),
) }
prometheus.MustRegister(metrics.queueingLimit)
prometheus.MustRegister(metrics.queueingQueueLimit)
prometheus.MustRegister(metrics.queueingQueueTimeout)
prometheus.MustRegister(metrics.queueingBusy)
prometheus.MustRegister(metrics.queueingWaiting)
prometheus.MustRegister(metrics.queueingWaitingTime)
prometheus.MustRegister(metrics.queueingErrors)
return metrics
}
type Queue struct { type Queue struct {
*queueMetrics
busyCh chan struct{} busyCh chan struct{}
waitingCh chan time.Time waitingCh chan time.Time
} timeout time.Duration
func init() {
prometheus.MustRegister(queueingErrors)
prometheus.MustRegister(queueingLimit)
prometheus.MustRegister(queueingBusy)
prometheus.MustRegister(queueingWaiting)
prometheus.MustRegister(queueingWaitingTime)
prometheus.MustRegister(queueingQueueLimit)
} }
// NewQueue creates a new queue // NewQueue creates a new queue
// limit specifies number of requests run concurrently // limit specifies number of requests run concurrently
// queueLimit specifies maximum number of requests that can be queued // queueLimit specifies maximum number of requests that can be queued
// timeout specifies the time limit of storing the request in the queue
// if the number of requests is above the limit // if the number of requests is above the limit
func NewQueue(limit, queueLimit uint) *Queue { func NewQueue(limit, queueLimit uint, timeout time.Duration) *Queue {
queueingLimit.Set(float64(limit)) queue := &Queue{
queueingQueueLimit.Set(float64(queueLimit))
return &Queue{
busyCh: make(chan struct{}, limit), busyCh: make(chan struct{}, limit),
waitingCh: make(chan time.Time, limit+queueLimit), waitingCh: make(chan time.Time, limit+queueLimit),
timeout: timeout,
} }
queue.queueMetrics = NewQueueMetrics(timeout)
queue.queueingLimit.Set(float64(limit))
queue.queueingQueueLimit.Set(float64(queueLimit))
queue.queueingQueueTimeout.Set(timeout.Seconds())
return queue
} }
// Acquire takes one slot from the Queue // Acquire takes one slot from the Queue
// and returns when a request should be processed // and returns when a request should be processed
// it allows up to (limit) of requests running at a time // it allows up to (limit) of requests running at a time
// it allows to queue up to (queue-limit) requests // it allows to queue up to (queue-limit) requests
func (s *Queue) Acquire(timeout time.Duration) (err error) { func (s *Queue) Acquire() (err error) {
// push item to a queue to claim your own slot (non-blocking) // push item to a queue to claim your own slot (non-blocking)
select { select {
case s.waitingCh <- time.Now(): case s.waitingCh <- time.Now():
queueingWaiting.Inc() s.queueingWaiting.Inc()
break break
default: default:
queueingErrors.WithLabelValues("too_many_requests").Inc() s.queueingErrors.WithLabelValues("too_many_requests").Inc()
return ErrTooManyRequests return ErrTooManyRequests
} }
defer func() { defer func() {
if err != nil { if err != nil {
waitStarted := <-s.waitingCh waitStarted := <-s.waitingCh
queueingWaiting.Dec() s.queueingWaiting.Dec()
queueingWaitingTime.Observe(float64(time.Since(waitStarted).Seconds())) s.queueingWaitingTime.Observe(float64(time.Since(waitStarted).Seconds()))
} }
}() }()
// fast path: push item to current processed items (non-blocking) // fast path: push item to current processed items (non-blocking)
select { select {
case s.busyCh <- struct{}{}: case s.busyCh <- struct{}{}:
queueingBusy.Inc() s.queueingBusy.Inc()
return nil return nil
default: default:
break break
} }
timer := time.NewTimer(timeout) timer := time.NewTimer(s.timeout)
defer timer.Stop() defer timer.Stop()
// push item to current processed items (blocking) // push item to current processed items (blocking)
select { select {
case s.busyCh <- struct{}{}: case s.busyCh <- struct{}{}:
queueingBusy.Inc() s.queueingBusy.Inc()
return nil return nil
case <-timer.C: case <-timer.C:
queueingErrors.WithLabelValues("queueing_timedout").Inc() s.queueingErrors.WithLabelValues("queueing_timedout").Inc()
return ErrQueueingTimedout return ErrQueueingTimedout
} }
} }
...@@ -129,9 +171,9 @@ func (s *Queue) Acquire(timeout time.Duration) (err error) { ...@@ -129,9 +171,9 @@ func (s *Queue) Acquire(timeout time.Duration) (err error) {
func (s *Queue) Release() { func (s *Queue) Release() {
// dequeue from queue to allow next request to be processed // dequeue from queue to allow next request to be processed
waitStarted := <-s.waitingCh waitStarted := <-s.waitingCh
queueingWaiting.Dec() s.queueingWaiting.Dec()
queueingWaitingTime.Observe(float64(time.Since(waitStarted).Seconds())) s.queueingWaitingTime.Observe(float64(time.Since(waitStarted).Seconds()))
<-s.busyCh <-s.busyCh
queueingBusy.Dec() s.queueingBusy.Dec()
} }
...@@ -6,46 +6,46 @@ import ( ...@@ -6,46 +6,46 @@ import (
) )
func TestNormalQueueing(t *testing.T) { func TestNormalQueueing(t *testing.T) {
q := NewQueue(2, 1) q := NewQueue(2, 1, time.Microsecond)
err1 := q.Acquire(time.Microsecond) err1 := q.Acquire()
if err1 != nil { if err1 != nil {
t.Fatal("we should acquire a new slot") t.Fatal("we should acquire a new slot")
} }
err2 := q.Acquire(time.Microsecond) err2 := q.Acquire()
if err2 != nil { if err2 != nil {
t.Fatal("we should acquire a new slot") t.Fatal("we should acquire a new slot")
} }
err3 := q.Acquire(time.Microsecond) err3 := q.Acquire()
if err3 != ErrQueueingTimedout { if err3 != ErrQueueingTimedout {
t.Fatal("we should timeout") t.Fatal("we should timeout")
} }
q.Release() q.Release()
err4 := q.Acquire(time.Microsecond) err4 := q.Acquire()
if err4 != nil { if err4 != nil {
t.Fatal("we should acquire a new slot") t.Fatal("we should acquire a new slot")
} }
} }
func TestQueueLimit(t *testing.T) { func TestQueueLimit(t *testing.T) {
q := NewQueue(1, 0) q := NewQueue(1, 0, time.Microsecond)
err1 := q.Acquire(time.Microsecond) err1 := q.Acquire()
if err1 != nil { if err1 != nil {
t.Fatal("we should acquire a new slot") t.Fatal("we should acquire a new slot")
} }
err2 := q.Acquire(time.Microsecond) err2 := q.Acquire()
if err2 != ErrTooManyRequests { if err2 != ErrTooManyRequests {
t.Fatal("we should fail because of not enough slots in queue") t.Fatal("we should fail because of not enough slots in queue")
} }
} }
func TestQueueProcessing(t *testing.T) { func TestQueueProcessing(t *testing.T) {
q := NewQueue(1, 1) q := NewQueue(1, 1, time.Second)
err1 := q.Acquire(time.Microsecond) err1 := q.Acquire()
if err1 != nil { if err1 != nil {
t.Fatal("we should acquire a new slot") t.Fatal("we should acquire a new slot")
} }
...@@ -55,7 +55,7 @@ func TestQueueProcessing(t *testing.T) { ...@@ -55,7 +55,7 @@ func TestQueueProcessing(t *testing.T) {
q.Release() q.Release()
}() }()
err2 := q.Acquire(time.Second) err2 := q.Acquire()
if err2 != nil { if err2 != nil {
t.Fatal("we should acquire slot after the previous one finished") t.Fatal("we should acquire slot after the previous one finished")
} }
......
...@@ -17,10 +17,10 @@ func QueueRequests(h http.Handler, limit, queueLimit uint, queueTimeout time.Dur ...@@ -17,10 +17,10 @@ func QueueRequests(h http.Handler, limit, queueLimit uint, queueTimeout time.Dur
queueTimeout = DefaultTimeout queueTimeout = DefaultTimeout
} }
queue := NewQueue(limit, queueLimit) queue := NewQueue(limit, queueLimit, queueTimeout)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
err := queue.Acquire(queueTimeout) err := queue.Acquire()
switch err { switch err {
case nil: case nil:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment