Commit baffc0a4 authored by Kamil Trzcinski

Add apiCiLongPolling option to use a new Redis queueing mechanism

parent c481d4a9
...@@ -36,6 +36,8 @@ gitlab-workhorse'][brief-history-blog].
gitlab-workhorse [OPTIONS]
Options:
-apiCiLongPolling duration
Long polling duration for job requesting for runners (default 0s - disabled)
-apiLimit uint
Number of API requests allowed at single time
-apiQueueDuration duration
...
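For example (the 50s value here is purely illustrative and not part of this commit), long polling is enabled by passing a non-zero duration to the new flag:

    gitlab-workhorse -apiCiLongPolling 50s [other options]

Leaving the flag at its 0s default keeps the current behaviour of forwarding every register request straight to the queued proxy.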
package builds

import (
	"bytes"
	"io"
	"io/ioutil"
	"net/http"
	"time"

	"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
	"gitlab.com/gitlab-org/gitlab-workhorse/internal/redis"
)

const MaxRegisterBodySize = 4 * 1024
// readRunnerQueueKey reads the runner token from the request body (without
// consuming it) and derives the Redis key used to watch for new builds for
// that runner.
func readRunnerQueueKey(w http.ResponseWriter, r *http.Request) (string, error) {
	limitedBody := http.MaxBytesReader(w, r.Body, MaxRegisterBodySize)
	defer limitedBody.Close()

	// Read the body into a buffer so it can be replayed to Rails later.
	var body bytes.Buffer
	_, err := io.Copy(&body, limitedBody)
	if err != nil {
		return "", err
	}
	r.Body = ioutil.NopCloser(&body)

	// Parse a copy of the request so r.Body itself stays untouched.
	tmpReq := *r
	tmpReq.Body = ioutil.NopCloser(bytes.NewReader(body.Bytes()))
	err = tmpReq.ParseForm()
	if err != nil {
		return "", err
	}

	token := tmpReq.FormValue("token")
	if token == "" {
		return "", nil
	}

	key := "runner:build_queue:" + token
	return key, nil
}
// RegisterHandler wraps the builds/register.json proxy with Redis-backed long
// polling. With a zero pollingDuration the handler is returned unchanged.
func RegisterHandler(h http.Handler, pollingDuration time.Duration) http.Handler {
	if pollingDuration == 0 {
		return h
	}

	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		lastUpdate := r.Header.Get("X-GitLab-Last-Update")
		if lastUpdate == "" {
			// We could have a fail-over implementation here for old runners that
			// proxies the request and, if the response is 204, delays the response
			// to the client by reading `X-GitLab-Last-Update` from the handler's
			// response and then watching the key.
			h.ServeHTTP(w, r)
			return
		}

		queueKey, err := readRunnerQueueKey(w, r)
		if err != nil {
			helper.Fail500(w, r, err)
			return
		}

		result, err := redis.WatchKey(queueKey, lastUpdate, pollingDuration)
		if err != nil {
			helper.Fail500(w, r, err)
			return
		}

		switch result {
		// A change was detected before we started watching,
		// so we proxy the request to Rails to see whether a build can be received.
		case redis.WatchKeyStatusAlreadyChanged:
			h.ServeHTTP(w, r)

		// A change was detected while watching.
		// We could proxy the request to Rails, but we might end up with an
		// unreliable response, as we don't really know whether the ResponseWriter
		// is still in a sane state and whether the connection is still alive.
		case redis.WatchKeyStatusSeenChange:
			w.WriteHeader(204)

		// Either of these statuses means no change was detected, so we return
		// 204 to the runner: nothing changed and there are no new builds to process.
		case redis.WatchKeyStatusTimeout, redis.WatchKeyStatusNoChange:
			w.WriteHeader(204)
		}
	})
}
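The handler above relies on a WatchKey helper in the internal/redis package, which is not part of this diff. The following is a minimal sketch of the contract it assumes, inferred from the call site and the statuses handled in the switch; the names match the code above, but the function body is a placeholder, not the real implementation:

// Assumed sketch of the internal/redis API used by RegisterHandler.
package redis

import "time"

type WatchKeyStatus int

const (
	// The key value already differed from lastUpdate before watching started;
	// the caller proxies to Rails immediately.
	WatchKeyStatusAlreadyChanged WatchKeyStatus = iota
	// The key value changed while watching; the caller replies 204 and lets
	// the runner retry with a fresh request.
	WatchKeyStatusSeenChange
	// The polling duration expired without any change.
	WatchKeyStatusTimeout
	// The watch ended without observing a change.
	WatchKeyStatusNoChange
)

// WatchKey blocks for up to timeout, waiting for the value stored under key
// to become different from value, and reports which outcome occurred.
func WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error) {
	// The real implementation would subscribe to Redis and compare values here.
	return WatchKeyStatusNoChange, nil
}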
...@@ -38,6 +38,7 @@ type Config struct {
	APILimit         uint          `toml:"-"`
	APIQueueLimit    uint          `toml:"-"`
	APIQueueTimeout  time.Duration `toml:"-"`
	APICILongPolling time.Duration `toml:"-"`
}

// LoadConfig from a file
...
...@@ -9,6 +9,7 @@ import (
	apipkg "gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
	"gitlab.com/gitlab-org/gitlab-workhorse/internal/artifacts"
	"gitlab.com/gitlab-org/gitlab-workhorse/internal/builds"
	"gitlab.com/gitlab-org/gitlab-workhorse/internal/git"
	"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
	"gitlab.com/gitlab-org/gitlab-workhorse/internal/lfs"
...@@ -118,6 +119,7 @@ func (u *Upstream) configureRoutes() {
	uploadAccelerateProxy := upload.Accelerate(path.Join(u.DocumentRoot, "uploads/tmp"), proxy)
	ciAPIProxyQueue := queueing.QueueRequests(uploadAccelerateProxy, u.APILimit, u.APIQueueLimit, u.APIQueueTimeout)
	ciAPILongPolling := builds.RegisterHandler(ciAPIProxyQueue, u.APICILongPolling)
	u.Routes = []routeEntry{
		// Git Clone
...@@ -132,8 +134,8 @@ func (u *Upstream) configureRoutes() {
		// Terminal websocket
		wsRoute(projectPattern+`environments/[0-9]+/terminal.ws\z`, terminal.Handler(api)),
-		// Limit capacity given to builds/register.json
-		route("", ciAPIPattern+`v1/builds/register.json\z`, ciAPIProxyQueue),
+		// Long poll and limit capacity given to builds/register.json
+		route("", ciAPIPattern+`v1/builds/register.json\z`, ciAPILongPolling),
		// Explicitly proxy API requests
		route("", apiPattern, proxy),
...
...@@ -51,6 +51,7 @@ var secretPath = flag.String("secretPath", "./.gitlab_workhorse_secret", "File w
var apiLimit = flag.Uint("apiLimit", 0, "Number of API requests allowed at single time")
var apiQueueLimit = flag.Uint("apiQueueLimit", 0, "Number of API requests allowed to be queued")
var apiQueueTimeout = flag.Duration("apiQueueDuration", queueing.DefaultTimeout, "Maximum queueing duration of requests")
var apiCiLongPolling = flag.Duration("apiCiLongPolling", 0, "Long polling duration for job requesting for runners (default 0s - disabled)")
var logFile = flag.String("logFile", "", "Log file to be used")
var prometheusListenAddr = flag.String("prometheusListenAddr", "", "Prometheus listening address, e.g. ':9100'")
...@@ -121,6 +122,7 @@ func main() {
		APILimit:         *apiLimit,
		APIQueueLimit:    *apiQueueLimit,
		APIQueueTimeout:  *apiQueueTimeout,
		APICILongPolling: *apiCiLongPolling,
	}

	if *configFile != "" {
...
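Putting the pieces together, a hypothetical test sketch (not included in this commit) can exercise the two cases in which the new handler simply falls through to the existing proxy chain: long polling disabled, and a runner that does not send X-GitLab-Last-Update:

package builds

import (
	"net/http"
	"net/http/httptest"
	"testing"
	"time"
)

func TestRegisterHandlerFallsThrough(t *testing.T) {
	// Stand-in for the Rails proxy chain that builds/register.json normally hits.
	upstream := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})

	// apiCiLongPolling is 0 (disabled): RegisterHandler returns the upstream handler unchanged.
	h := RegisterHandler(upstream, 0)
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest("POST", "/ci/api/v1/builds/register.json", nil))
	if rec.Code != http.StatusOK {
		t.Errorf("expected pass-through when disabled, got %d", rec.Code)
	}

	// Long polling enabled, but the runner did not send X-GitLab-Last-Update:
	// the request is proxied straight through as well.
	h = RegisterHandler(upstream, 30*time.Second)
	rec = httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest("POST", "/ci/api/v1/builds/register.json", nil))
	if rec.Code != http.StatusOK {
		t.Errorf("expected pass-through without header, got %d", rec.Code)
	}
}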