Commit 7378c05b authored by Jacob Vosmaer (GitLab)'s avatar Jacob Vosmaer (GitLab)

Merge branch 'ci-api-long-polling' into 'master'

Add apiCiLongPolling option to use a new redis queueing mechanism

See merge request !128
parents ccbc989d 97764f10
...@@ -36,6 +36,8 @@ gitlab-workhorse'][brief-history-blog]. ...@@ -36,6 +36,8 @@ gitlab-workhorse'][brief-history-blog].
gitlab-workhorse [OPTIONS] gitlab-workhorse [OPTIONS]
Options: Options:
-apiCiLongPollingDuration duration
Long polling duration for job requesting for runners (default 0s - disabled)
-apiLimit uint -apiLimit uint
Number of API requests allowed at single time Number of API requests allowed at single time
-apiQueueDuration duration -apiQueueDuration duration
......
package main
import (
"io"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func startWorkhorseServerWithLongPolling(authBackend string, pollingDuration time.Duration) *httptest.Server {
uc := newUpstreamConfig(authBackend)
uc.APICILongPollingDuration = pollingDuration
return startWorkhorseServerWithConfig(uc)
}
func postBuildsRegister(url string, body io.Reader) (*http.Response, error) {
resource := `/ci/api/v1/builds/register.json`
return http.Post(url+resource, `application/json`, body)
}
func TestBuildsLongPullingEndpointDisabled(t *testing.T) {
ws := startWorkhorseServerWithLongPolling("http://localhost/", 0)
defer ws.Close()
resp, err := postBuildsRegister(ws.URL, nil)
assert.NoError(t, err)
defer resp.Body.Close()
assert.NotEqual(t, "yes", resp.Header.Get("Gitlab-Ci-Builds-Polling"))
}
func TestBuildsLongPullingEndpoint(t *testing.T) {
ws := startWorkhorseServerWithLongPolling("http://localhost/", time.Minute)
defer ws.Close()
resp, err := postBuildsRegister(ws.URL, nil)
assert.NoError(t, err)
defer resp.Body.Close()
assert.Equal(t, "yes", resp.Header.Get("Gitlab-Ci-Builds-Polling"))
}
package builds
import (
"encoding/json"
"errors"
"net/http"
"time"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/redis"
)
const (
maxRegisterBodySize = 32 * 1024
runnerBuildQueue = "runner:build_queue:"
runnerBuildQueueHeaderKey = "Gitlab-Ci-Builds-Polling"
runnerBuildQueueHeaderValue = "yes"
)
var (
registerHandlerRequests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "gitlab_workhorse_builds_register_handler_requests",
Help: "Describes how many requests in different states hit a register handler",
},
[]string{"status"},
)
registerHandlerOpen = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "gitlab_workhorse_builds_register_handler_open",
Help: "Describes how many requests is currently open in given state",
},
[]string{"state"},
)
registerHandlerOpenAtReading = registerHandlerOpen.WithLabelValues("reading")
registerHandlerOpenAtProxying = registerHandlerOpen.WithLabelValues("proxying")
registerHandlerOpenAtWatching = registerHandlerOpen.WithLabelValues("watching")
registerHandlerBodyReadErrors = registerHandlerRequests.WithLabelValues("body-read-error")
registerHandlerBodyParseErrors = registerHandlerRequests.WithLabelValues("body-parse-error")
registerHandlerMissingValues = registerHandlerRequests.WithLabelValues("missing-values")
registerHandlerWatchErrors = registerHandlerRequests.WithLabelValues("watch-error")
registerHandlerAlreadyChangedRequests = registerHandlerRequests.WithLabelValues("already-changed")
registerHandlerSeenChangeRequests = registerHandlerRequests.WithLabelValues("seen-change")
registerHandlerTimeoutRequests = registerHandlerRequests.WithLabelValues("timeout")
registerHandlerNoChangeRequests = registerHandlerRequests.WithLabelValues("no-change")
)
type largeBodyError struct{ error }
type watchError struct{ error }
type WatchKeyHandler func(key, value string, timeout time.Duration) (redis.WatchKeyStatus, error)
func init() {
prometheus.MustRegister(
registerHandlerRequests,
registerHandlerOpen,
)
}
type runnerRequest struct {
Token string `json:"token,omitempty"`
LastUpdate string `json:"last_update,omitempty"`
}
func readRunnerBody(w http.ResponseWriter, r *http.Request) ([]byte, error) {
registerHandlerOpenAtReading.Inc()
defer registerHandlerOpenAtReading.Dec()
return helper.ReadRequestBody(w, r, maxRegisterBodySize)
}
func readRunnerRequest(r *http.Request, body []byte) (*runnerRequest, error) {
if !helper.IsApplicationJson(r) {
return nil, errors.New("invalid content-type received")
}
var runnerRequest runnerRequest
err := json.Unmarshal(body, &runnerRequest)
if err != nil {
return nil, err
}
return &runnerRequest, nil
}
func proxyRegisterRequest(h http.Handler, w http.ResponseWriter, r *http.Request) {
registerHandlerOpenAtProxying.Inc()
defer registerHandlerOpenAtProxying.Dec()
h.ServeHTTP(w, r)
}
func watchForRunnerChange(watchHandler WatchKeyHandler, token, lastUpdate string, duration time.Duration) (redis.WatchKeyStatus, error) {
registerHandlerOpenAtWatching.Inc()
defer registerHandlerOpenAtWatching.Dec()
return watchHandler(runnerBuildQueue+token, lastUpdate, duration)
}
func RegisterHandler(h http.Handler, watchHandler WatchKeyHandler, pollingDuration time.Duration) http.Handler {
if pollingDuration == 0 {
return h
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set(runnerBuildQueueHeaderKey, runnerBuildQueueHeaderValue)
requestBody, err := readRunnerBody(w, r)
if err != nil {
registerHandlerBodyReadErrors.Inc()
helper.RequestEntityTooLarge(w, r, &largeBodyError{err})
return
}
newRequest := helper.CloneRequestWithNewBody(r, requestBody)
runnerRequest, err := readRunnerRequest(r, requestBody)
if err != nil {
registerHandlerBodyParseErrors.Inc()
proxyRegisterRequest(h, w, newRequest)
return
}
if runnerRequest.Token == "" || runnerRequest.LastUpdate == "" {
registerHandlerMissingValues.Inc()
proxyRegisterRequest(h, w, newRequest)
return
}
result, err := watchForRunnerChange(watchHandler, runnerRequest.Token,
runnerRequest.LastUpdate, pollingDuration)
if err != nil {
registerHandlerWatchErrors.Inc()
proxyRegisterRequest(h, w, newRequest)
return
}
switch result {
// It means that we detected a change before starting watching on change,
// We proxy request to Rails, to see whether we have a build to receive
case redis.WatchKeyStatusAlreadyChanged:
registerHandlerAlreadyChangedRequests.Inc()
proxyRegisterRequest(h, w, newRequest)
// It means that we detected a change after watching.
// We could potentially proxy request to Rails, but...
// We can end-up with unreliable responses,
// as don't really know whether ResponseWriter is still in a sane state,
// for example the connection is dead
case redis.WatchKeyStatusSeenChange:
registerHandlerSeenChangeRequests.Inc()
w.WriteHeader(http.StatusNoContent)
// When we receive one of these statuses, it means that we detected no change,
// so we return to runner 204, which means nothing got changed,
// and there's no new builds to process
case redis.WatchKeyStatusTimeout:
registerHandlerTimeoutRequests.Inc()
w.WriteHeader(http.StatusNoContent)
case redis.WatchKeyStatusNoChange:
registerHandlerNoChangeRequests.Inc()
w.WriteHeader(http.StatusNoContent)
}
})
}
package builds
import (
"bytes"
"errors"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/redis"
)
const upstreamResponseCode = 999
func echoRequest(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(upstreamResponseCode)
io.Copy(rw, req.Body)
}
var echoRequestFunc = http.HandlerFunc(echoRequest)
func expectHandlerWithWatcher(t *testing.T, watchHandler WatchKeyHandler, data string, contentType string, expectedHttpStatus int, msgAndArgs ...interface{}) {
h := RegisterHandler(echoRequestFunc, watchHandler, time.Second)
rw := httptest.NewRecorder()
req, _ := http.NewRequest("POST", "/", bytes.NewBufferString(data))
req.Header.Set("Content-Type", contentType)
h.ServeHTTP(rw, req)
assert.Equal(t, expectedHttpStatus, rw.Code, msgAndArgs...)
}
func expectHandler(t *testing.T, data string, contentType string, expectedHttpStatus int, msgAndArgs ...interface{}) {
expectHandlerWithWatcher(t, nil, data, contentType, expectedHttpStatus, msgAndArgs...)
}
func TestRegisterHandlerLargeBody(t *testing.T) {
data := strings.Repeat(".", maxRegisterBodySize+5)
expectHandler(t, data, "application/json", http.StatusRequestEntityTooLarge,
"rejects body with entity too large")
}
func TestRegisterHandlerInvalidRunnerRequest(t *testing.T) {
expectHandler(t, "invalid content", "text/plain", upstreamResponseCode,
"proxies request to upstream")
}
func TestRegisterHandlerInvalidJsonPayload(t *testing.T) {
expectHandler(t, `{[`, "application/json", upstreamResponseCode,
"fails on parsing body and proxies request to upstream")
}
func TestRegisterHandlerMissingData(t *testing.T) {
testCases := []string{
`{"token":"token"}`,
`{"last_update":"data"}`,
}
for _, testCase := range testCases {
expectHandler(t, testCase, "application/json", upstreamResponseCode,
"fails on argument validation and proxies request to upstream")
}
}
func expectWatcherToBeExecuted(t *testing.T, watchKeyStatus redis.WatchKeyStatus, watchKeyError error,
httpStatus int, msgAndArgs ...interface{}) {
executed := false
watchKeyHandler := func(key, value string, timeout time.Duration) (redis.WatchKeyStatus, error) {
executed = true
return watchKeyStatus, watchKeyError
}
parsableData := `{"token":"token","last_update":"last_update"}`
expectHandlerWithWatcher(t, watchKeyHandler, parsableData, "application/json", httpStatus, msgAndArgs...)
assert.True(t, executed, msgAndArgs...)
}
func TestRegisterHandlerWatcherError(t *testing.T) {
expectWatcherToBeExecuted(t, redis.WatchKeyStatusNoChange, errors.New("redis connection"),
upstreamResponseCode, "proxies data to upstream")
}
func TestRegisterHandlerWatcherAlreadyChanged(t *testing.T) {
expectWatcherToBeExecuted(t, redis.WatchKeyStatusAlreadyChanged, nil,
upstreamResponseCode, "proxies data to upstream")
}
func TestRegisterHandlerWatcherSeenChange(t *testing.T) {
expectWatcherToBeExecuted(t, redis.WatchKeyStatusSeenChange, nil,
http.StatusNoContent)
}
func TestRegisterHandlerWatcherTimeout(t *testing.T) {
expectWatcherToBeExecuted(t, redis.WatchKeyStatusTimeout, nil,
http.StatusNoContent)
}
func TestRegisterHandlerWatcherNoChange(t *testing.T) {
expectWatcherToBeExecuted(t, redis.WatchKeyStatusNoChange, nil,
http.StatusNoContent)
}
...@@ -28,16 +28,17 @@ type RedisConfig struct { ...@@ -28,16 +28,17 @@ type RedisConfig struct {
} }
type Config struct { type Config struct {
Redis *RedisConfig `toml:"redis"` Redis *RedisConfig `toml:"redis"`
Backend *url.URL `toml:"-"` Backend *url.URL `toml:"-"`
Version string `toml:"-"` Version string `toml:"-"`
DocumentRoot string `toml:"-"` DocumentRoot string `toml:"-"`
DevelopmentMode bool `toml:"-"` DevelopmentMode bool `toml:"-"`
Socket string `toml:"-"` Socket string `toml:"-"`
ProxyHeadersTimeout time.Duration `toml:"-"` ProxyHeadersTimeout time.Duration `toml:"-"`
APILimit uint `toml:"-"` APILimit uint `toml:"-"`
APIQueueLimit uint `toml:"-"` APIQueueLimit uint `toml:"-"`
APIQueueTimeout time.Duration `toml:"-"` APIQueueTimeout time.Duration `toml:"-"`
APICILongPollingDuration time.Duration `toml:"-"`
} }
// LoadConfig from a file // LoadConfig from a file
......
package helper package helper
import ( import (
"bytes"
"errors" "errors"
"io/ioutil"
"log" "log"
"mime" "mime"
"net" "net"
...@@ -38,6 +40,12 @@ func TooManyRequests(w http.ResponseWriter, r *http.Request, err error) { ...@@ -38,6 +40,12 @@ func TooManyRequests(w http.ResponseWriter, r *http.Request, err error) {
printError(r, err) printError(r, err)
} }
func RequestEntityTooLarge(w http.ResponseWriter, r *http.Request, err error) {
http.Error(w, "Request Entity Too Large", http.StatusRequestEntityTooLarge)
captureRavenError(r, err)
printError(r, err)
}
func printError(r *http.Request, err error) { func printError(r *http.Request, err error) {
if r != nil { if r != nil {
log.Printf("error: %s %q: %v", r.Method, r.RequestURI, err) log.Printf("error: %s %q: %v", r.Method, r.RequestURI, err)
...@@ -166,3 +174,23 @@ func IsContentType(expected, actual string) bool { ...@@ -166,3 +174,23 @@ func IsContentType(expected, actual string) bool {
parsed, _, err := mime.ParseMediaType(actual) parsed, _, err := mime.ParseMediaType(actual)
return err == nil && parsed == expected return err == nil && parsed == expected
} }
func IsApplicationJson(r *http.Request) bool {
contentType := r.Header.Get("Content-Type")
return IsContentType("application/json", contentType)
}
func ReadRequestBody(w http.ResponseWriter, r *http.Request, maxBodySize int64) ([]byte, error) {
limitedBody := http.MaxBytesReader(w, r.Body, maxBodySize)
defer limitedBody.Close()
return ioutil.ReadAll(limitedBody)
}
func CloneRequestWithNewBody(r *http.Request, body []byte) *http.Request {
newReq := *r
newReq.Body = ioutil.NopCloser(bytes.NewReader(body))
newReq.Header = HeaderClone(r.Header)
newReq.ContentLength = int64(len(body))
return &newReq
}
package helper package helper
import ( import (
"bytes"
"io"
"net/http" "net/http"
"net/http/httptest"
"testing" "testing"
"github.com/stretchr/testify/assert"
) )
func TestSetForwardedForGeneratesHeader(t *testing.T) { func TestSetForwardedForGeneratesHeader(t *testing.T) {
...@@ -47,3 +52,50 @@ func TestSetForwardedForGeneratesHeader(t *testing.T) { ...@@ -47,3 +52,50 @@ func TestSetForwardedForGeneratesHeader(t *testing.T) {
} }
} }
} }
func TestReadRequestBody(t *testing.T) {
data := []byte("123456")
rw := httptest.NewRecorder()
req, _ := http.NewRequest("POST", "/test", bytes.NewBuffer(data))
result, err := ReadRequestBody(rw, req, 1000)
assert.NoError(t, err)
assert.Equal(t, data, result)
}
func TestReadRequestBodyLimit(t *testing.T) {
data := []byte("123456")
rw := httptest.NewRecorder()
req, _ := http.NewRequest("POST", "/test", bytes.NewBuffer(data))
_, err := ReadRequestBody(rw, req, 2)
assert.Error(t, err)
}
func TestCloneRequestWithBody(t *testing.T) {
input := []byte("test")
newInput := []byte("new body")
req, _ := http.NewRequest("POST", "/test", bytes.NewBuffer(input))
newReq := CloneRequestWithNewBody(req, newInput)
assert.NotEqual(t, req, newReq)
assert.NotEqual(t, req.Body, newReq.Body)
assert.NotEqual(t, len(newInput), newReq.ContentLength)
var buffer bytes.Buffer
io.Copy(&buffer, newReq.Body)
assert.Equal(t, newInput, buffer.Bytes())
}
func TestApplicationJson(t *testing.T) {
req, _ := http.NewRequest("POST", "/test", nil)
req.Header.Set("Content-Type", "application/json")
assert.True(t, IsApplicationJson(req), "expected to match 'application/json' as 'application/json'")
req.Header.Set("Content-Type", "application/json; charset=utf-8")
assert.True(t, IsApplicationJson(req), "expected to match 'application/json; charset=utf-8' as 'application/json'")
req.Header.Set("Content-Type", "text/plain")
assert.False(t, IsApplicationJson(req), "expected not to match 'text/plain' as 'application/json'")
}
...@@ -9,11 +9,13 @@ import ( ...@@ -9,11 +9,13 @@ import (
apipkg "gitlab.com/gitlab-org/gitlab-workhorse/internal/api" apipkg "gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/artifacts" "gitlab.com/gitlab-org/gitlab-workhorse/internal/artifacts"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/builds"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/git" "gitlab.com/gitlab-org/gitlab-workhorse/internal/git"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/lfs" "gitlab.com/gitlab-org/gitlab-workhorse/internal/lfs"
proxypkg "gitlab.com/gitlab-org/gitlab-workhorse/internal/proxy" proxypkg "gitlab.com/gitlab-org/gitlab-workhorse/internal/proxy"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/queueing" "gitlab.com/gitlab-org/gitlab-workhorse/internal/queueing"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/redis"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/senddata" "gitlab.com/gitlab-org/gitlab-workhorse/internal/senddata"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/sendfile" "gitlab.com/gitlab-org/gitlab-workhorse/internal/sendfile"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/staticpages" "gitlab.com/gitlab-org/gitlab-workhorse/internal/staticpages"
...@@ -118,6 +120,7 @@ func (u *Upstream) configureRoutes() { ...@@ -118,6 +120,7 @@ func (u *Upstream) configureRoutes() {
uploadAccelerateProxy := upload.Accelerate(path.Join(u.DocumentRoot, "uploads/tmp"), proxy) uploadAccelerateProxy := upload.Accelerate(path.Join(u.DocumentRoot, "uploads/tmp"), proxy)
ciAPIProxyQueue := queueing.QueueRequests(uploadAccelerateProxy, u.APILimit, u.APIQueueLimit, u.APIQueueTimeout) ciAPIProxyQueue := queueing.QueueRequests(uploadAccelerateProxy, u.APILimit, u.APIQueueLimit, u.APIQueueTimeout)
ciAPILongPolling := builds.RegisterHandler(ciAPIProxyQueue, redis.WatchKey, u.APICILongPollingDuration)
u.Routes = []routeEntry{ u.Routes = []routeEntry{
// Git Clone // Git Clone
...@@ -132,8 +135,8 @@ func (u *Upstream) configureRoutes() { ...@@ -132,8 +135,8 @@ func (u *Upstream) configureRoutes() {
// Terminal websocket // Terminal websocket
wsRoute(projectPattern+`environments/[0-9]+/terminal.ws\z`, terminal.Handler(api)), wsRoute(projectPattern+`environments/[0-9]+/terminal.ws\z`, terminal.Handler(api)),
// Limit capacity given to builds/register.json // Long poll and limit capacity given to builds/register.json
route("", ciAPIPattern+`v1/builds/register.json\z`, ciAPIProxyQueue), route("", ciAPIPattern+`v1/builds/register.json\z`, ciAPILongPolling),
// Explicitly proxy API requests // Explicitly proxy API requests
route("", apiPattern, proxy), route("", apiPattern, proxy),
......
...@@ -51,6 +51,7 @@ var secretPath = flag.String("secretPath", "./.gitlab_workhorse_secret", "File w ...@@ -51,6 +51,7 @@ var secretPath = flag.String("secretPath", "./.gitlab_workhorse_secret", "File w
var apiLimit = flag.Uint("apiLimit", 0, "Number of API requests allowed at single time") var apiLimit = flag.Uint("apiLimit", 0, "Number of API requests allowed at single time")
var apiQueueLimit = flag.Uint("apiQueueLimit", 0, "Number of API requests allowed to be queued") var apiQueueLimit = flag.Uint("apiQueueLimit", 0, "Number of API requests allowed to be queued")
var apiQueueTimeout = flag.Duration("apiQueueDuration", queueing.DefaultTimeout, "Maximum queueing duration of requests") var apiQueueTimeout = flag.Duration("apiQueueDuration", queueing.DefaultTimeout, "Maximum queueing duration of requests")
var apiCiLongPollingDuration = flag.Duration("apiCiLongPollingDuration", 0, "Long polling duration for job requesting for runners (default 0s - disabled)")
var logFile = flag.String("logFile", "", "Log file to be used") var logFile = flag.String("logFile", "", "Log file to be used")
var prometheusListenAddr = flag.String("prometheusListenAddr", "", "Prometheus listening address, e.g. ':9100'") var prometheusListenAddr = flag.String("prometheusListenAddr", "", "Prometheus listening address, e.g. ':9100'")
...@@ -112,15 +113,16 @@ func main() { ...@@ -112,15 +113,16 @@ func main() {
secret.SetPath(*secretPath) secret.SetPath(*secretPath)
cfg := config.Config{ cfg := config.Config{
Backend: backendURL, Backend: backendURL,
Socket: *authSocket, Socket: *authSocket,
Version: Version, Version: Version,
DocumentRoot: *documentRoot, DocumentRoot: *documentRoot,
DevelopmentMode: *developmentMode, DevelopmentMode: *developmentMode,
ProxyHeadersTimeout: *proxyHeadersTimeout, ProxyHeadersTimeout: *proxyHeadersTimeout,
APILimit: *apiLimit, APILimit: *apiLimit,
APIQueueLimit: *apiQueueLimit, APIQueueLimit: *apiQueueLimit,
APIQueueTimeout: *apiQueueTimeout, APIQueueTimeout: *apiQueueTimeout,
APICILongPollingDuration: *apiCiLongPollingDuration,
} }
if *configFile != "" { if *configFile != "" {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment