Commit 73345bef authored by Kamil Trzcinski's avatar Kamil Trzcinski

Refactor source to look way nicer

parent 25d46f88
package builds package builds
import ( import (
"bytes"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/redis"
"io"
"io/ioutil"
"net/http" "net/http"
"time" "time"
)
const MaxRegisterBodySize = 4 * 1024 "github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/redis"
)
func readRunnerQueueKey(w http.ResponseWriter, r *http.Request) (string, error) { const (
limitedBody := http.MaxBytesReader(w, r.Body, MaxRegisterBodySize) maxRegisterBodySize = 4 * 1024
defer limitedBody.Close() runnerBuildQueue = "runner:build_queue:"
runnerBuildQueueKey = "token"
runnerBuildQueueValue = "X-GitLab-Last-Update"
)
// Read body var (
var body bytes.Buffer registerHandlerHits = prometheus.NewCounterVec(
_, err := io.Copy(&body, limitedBody) prometheus.CounterOpts{
if err != nil { Name: "gitlab_workhorse_builds_register_handler",
return "", err Help: "How many connections gitlab-workhorse has opened in total. Can be used to track Redis connection rate for this process",
} },
[]string{"status"},
)
)
r.Body = ioutil.NopCloser(&body) type largeBodyError struct{ error }
type watchError struct{ error }
tmpReq := *r func init() {
tmpReq.Body = ioutil.NopCloser(bytes.NewReader(body.Bytes())) prometheus.MustRegister(
registerHandlerHits,
)
}
err = tmpReq.ParseForm() func readRunnerToken(r *http.Request) (string, error) {
err := r.ParseForm()
if err != nil { if err != nil {
return "", err return "", err
} }
token := tmpReq.FormValue("token") token := r.FormValue(runnerBuildQueueKey)
if token == "" { return token, nil
return "", nil
}
key := "runner:build_queue:" + token
return key, nil
} }
func RegisterHandler(h http.Handler, pollingDuration time.Duration) http.Handler { func RegisterHandler(h http.Handler, pollingDuration time.Duration) http.Handler {
...@@ -48,25 +51,32 @@ func RegisterHandler(h http.Handler, pollingDuration time.Duration) http.Handler ...@@ -48,25 +51,32 @@ func RegisterHandler(h http.Handler, pollingDuration time.Duration) http.Handler
} }
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
lastUpdate := r.Header.Get("X-GitLab-Last-Update") lastUpdate := r.Header.Get(runnerBuildQueueValue)
if lastUpdate == "" { if lastUpdate == "" {
// We could have a fail-over implementation here, for old runners, that: // The client doesn't have update, fail
// Proxies the requests, if this is 204, we delay the response to client, registerHandlerHits.WithLabelValues("missing-value").Inc()
// By checking the response from handler, and reading `X-GitLab-Last-Update`,
// and then watching on a key
h.ServeHTTP(w, r) h.ServeHTTP(w, r)
return return
} }
queueKey, err := readRunnerQueueKey(w, r) requestBody, err := helper.ReadRequestBody(w, r, maxRegisterBodySize)
if err != nil { if err != nil {
helper.Fail500(w, r, err) registerHandlerHits.WithLabelValues("body-read-error").Inc()
helper.Fail500(w, r, &largeBodyError{err})
return
}
runnerToken, err := readRunnerToken(helper.CloneRequestWithNewBody(r, requestBody))
if runnerToken == "" || err != nil {
registerHandlerHits.WithLabelValues("body-parse-error").Inc()
h.ServeHTTP(w, r)
return return
} }
result, err := redis.WatchKey(queueKey, lastUpdate, pollingDuration) result, err := redis.WatchKey(runnerBuildQueue+runnerToken, lastUpdate, pollingDuration)
if err != nil { if err != nil {
helper.Fail500(w, r, err) registerHandlerHits.WithLabelValues("watch-error").Inc()
helper.Fail500(w, r, &watchError{err})
return return
} }
...@@ -74,7 +84,8 @@ func RegisterHandler(h http.Handler, pollingDuration time.Duration) http.Handler ...@@ -74,7 +84,8 @@ func RegisterHandler(h http.Handler, pollingDuration time.Duration) http.Handler
// It means that we detected a change before starting watching on change, // It means that we detected a change before starting watching on change,
// We proxy request to Rails, to see whether we can receive the build // We proxy request to Rails, to see whether we can receive the build
case redis.WatchKeyStatusAlreadyChanged: case redis.WatchKeyStatusAlreadyChanged:
h.ServeHTTP(w, r) registerHandlerHits.WithLabelValues("already-changed").Inc()
h.ServeHTTP(w, helper.CloneRequestWithNewBody(r, requestBody))
// It means that we detected a change after watching. // It means that we detected a change after watching.
// We could potentially proxy request to Rails, but... // We could potentially proxy request to Rails, but...
...@@ -82,12 +93,18 @@ func RegisterHandler(h http.Handler, pollingDuration time.Duration) http.Handler ...@@ -82,12 +93,18 @@ func RegisterHandler(h http.Handler, pollingDuration time.Duration) http.Handler
// as don't really know whether ResponseWriter is still in a sane state, // as don't really know whether ResponseWriter is still in a sane state,
// whether the connection is not dead // whether the connection is not dead
case redis.WatchKeyStatusSeenChange: case redis.WatchKeyStatusSeenChange:
registerHandlerHits.WithLabelValues("seen-change").Inc()
w.WriteHeader(204) w.WriteHeader(204)
// When we receive one of these statuses, it means that we detected no change, // When we receive one of these statuses, it means that we detected no change,
// so we return to runner 204, which means nothing got changed, // so we return to runner 204, which means nothing got changed,
// and there's no new builds to process // and there's no new builds to process
case redis.WatchKeyStatusTimeout, redis.WatchKeyStatusNoChange: case redis.WatchKeyStatusTimeout:
registerHandlerHits.WithLabelValues("timeout").Inc()
w.WriteHeader(204)
case redis.WatchKeyStatusNoChange:
registerHandlerHits.WithLabelValues("no-change").Inc()
w.WriteHeader(204) w.WriteHeader(204)
} }
}) })
......
package helper package helper
import ( import (
"bytes"
"errors" "errors"
"io"
"io/ioutil"
"log" "log"
"mime" "mime"
"net" "net"
...@@ -166,3 +169,22 @@ func IsContentType(expected, actual string) bool { ...@@ -166,3 +169,22 @@ func IsContentType(expected, actual string) bool {
parsed, _, err := mime.ParseMediaType(actual) parsed, _, err := mime.ParseMediaType(actual)
return err == nil && parsed == expected return err == nil && parsed == expected
} }
func ReadRequestBody(w http.ResponseWriter, r *http.Request, maxBodySize int64) ([]byte, error) {
limitedBody := http.MaxBytesReader(w, r.Body, maxBodySize)
defer limitedBody.Close()
var body bytes.Buffer
_, err := io.Copy(&body, limitedBody)
if err != nil {
return nil, err
}
return body.Bytes(), nil
}
func CloneRequestWithNewBody(r *http.Request, body []byte) *http.Request {
newReq := *r
newReq.Body = ioutil.NopCloser(bytes.NewReader(body))
return &newReq
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment