Commit 99b8cb27 authored by Jacob Vosmaer's avatar Jacob Vosmaer

Merge branch 'mk/add-workhorse-function-to-call-geo-proxy-endpoint' into 'master'

Geo: Add function to call /api/v4/internal/geo_proxy in Workhorse

See merge request gitlab-org/gitlab!60769
parents 3e372137 376635a7
...@@ -3,6 +3,7 @@ package api ...@@ -3,6 +3,7 @@ package api
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
...@@ -29,6 +30,8 @@ const ( ...@@ -29,6 +30,8 @@ const (
ResponseContentType = "application/vnd.gitlab-workhorse+json" ResponseContentType = "application/vnd.gitlab-workhorse+json"
failureResponseLimit = 32768 failureResponseLimit = 32768
geoProxyEndpointPath = "/api/v4/geo/proxy"
) )
type API struct { type API struct {
...@@ -37,6 +40,8 @@ type API struct { ...@@ -37,6 +40,8 @@ type API struct {
Version string Version string
} }
var ErrNotGeoSecondary = errors.New("this is not a Geo secondary site")
var ( var (
requestsCounter = promauto.NewCounterVec( requestsCounter = promauto.NewCounterVec(
prometheus.CounterOpts{ prometheus.CounterOpts{
...@@ -61,6 +66,10 @@ func NewAPI(myURL *url.URL, version string, roundTripper http.RoundTripper) *API ...@@ -61,6 +66,10 @@ func NewAPI(myURL *url.URL, version string, roundTripper http.RoundTripper) *API
} }
} }
type GeoProxyEndpointResponse struct {
GeoProxyURL string `json:"geo_proxy_url"`
}
type HandleFunc func(http.ResponseWriter, *http.Request, *Response) type HandleFunc func(http.ResponseWriter, *http.Request, *Response)
type MultipartUploadParams struct { type MultipartUploadParams struct {
...@@ -389,3 +398,40 @@ func bufferResponse(r io.Reader) (*bytes.Buffer, error) { ...@@ -389,3 +398,40 @@ func bufferResponse(r io.Reader) (*bytes.Buffer, error) {
func validResponseContentType(resp *http.Response) bool { func validResponseContentType(resp *http.Response) bool {
return helper.IsContentType(ResponseContentType, resp.Header.Get("Content-Type")) return helper.IsContentType(ResponseContentType, resp.Header.Get("Content-Type"))
} }
// TODO: Cache the result of the API requests https://gitlab.com/gitlab-org/gitlab/-/issues/329671
func (api *API) GetGeoProxyURL() (*url.URL, error) {
geoProxyApiUrl := *api.URL
geoProxyApiUrl.Path, geoProxyApiUrl.RawPath = joinURLPath(api.URL, geoProxyEndpointPath)
geoProxyApiReq := &http.Request{
Method: "GET",
URL: &geoProxyApiUrl,
Header: make(http.Header),
}
httpResponse, err := api.doRequestWithoutRedirects(geoProxyApiReq)
if err != nil {
return nil, fmt.Errorf("GetGeoProxyURL: do request: %v", err)
}
defer httpResponse.Body.Close()
if httpResponse.StatusCode != http.StatusOK {
return nil, fmt.Errorf("GetGeoProxyURL: Received HTTP status code: %v", httpResponse.StatusCode)
}
response := &GeoProxyEndpointResponse{}
if err := json.NewDecoder(httpResponse.Body).Decode(response); err != nil {
return nil, fmt.Errorf("GetGeoProxyURL: decode response: %v", err)
}
if response.GeoProxyURL == "" {
return nil, ErrNotGeoSecondary
}
geoProxyURL, err := url.Parse(response.GeoProxyURL)
if err != nil {
return nil, fmt.Errorf("GetGeoProxyURL: Could not parse Geo proxy URL: %v, err: %v", response.GeoProxyURL, err)
}
return geoProxyURL, nil
}
package api
import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"regexp"
"testing"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/labkit/log"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/secret"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/testhelper"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/upstream/roundtripper"
)
func TestGetGeoProxyURLWhenGeoSecondary(t *testing.T) {
geoProxyURL, err := getGeoProxyURLGivenResponse(t, `{"geo_proxy_url":"http://primary"}`)
require.NoError(t, err)
require.NotNil(t, geoProxyURL)
require.Equal(t, "http://primary", geoProxyURL.String())
}
func TestGetGeoProxyURLWhenGeoPrimaryOrNonGeo(t *testing.T) {
geoProxyURL, err := getGeoProxyURLGivenResponse(t, "{}")
require.Error(t, err)
require.Equal(t, ErrNotGeoSecondary, err)
require.Nil(t, geoProxyURL)
}
func getGeoProxyURLGivenResponse(t *testing.T, givenInternalApiResponse string) (*url.URL, error) {
t.Helper()
ts := testRailsServer(regexp.MustCompile(`/api/v4/geo/proxy`), 200, givenInternalApiResponse)
defer ts.Close()
backend := helper.URLMustParse(ts.URL)
version := "123"
rt := roundtripper.NewTestBackendRoundTripper(backend)
testhelper.ConfigureSecret()
apiClient := NewAPI(backend, version, rt)
geoProxyURL, err := apiClient.GetGeoProxyURL()
return geoProxyURL, err
}
func testRailsServer(url *regexp.Regexp, code int, body string) *httptest.Server {
return testhelper.TestServerWithHandler(url, func(w http.ResponseWriter, r *http.Request) {
// return a 204 No Content response if we don't receive the JWT header
if r.Header.Get(secret.RequestHeader) == "" {
w.WriteHeader(204)
return
}
w.Header().Set("Content-Type", ResponseContentType)
logEntry := log.WithFields(log.Fields{
"method": r.Method,
"url": r.URL,
})
logEntryWithCode := logEntry.WithField("code", code)
// Write pure string
logEntryWithCode.Info("UPSTREAM")
w.WriteHeader(code)
fmt.Fprint(w, body)
})
}
...@@ -191,12 +191,7 @@ func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg conf ...@@ -191,12 +191,7 @@ func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg conf
// see upstream.ServeHTTP // see upstream.ServeHTTP
func configureRoutes(u *upstream) { func configureRoutes(u *upstream) {
api := apipkg.NewAPI( api := u.APIClient
u.Backend,
u.Version,
u.RoundTripper,
)
static := &staticpages.Static{DocumentRoot: u.DocumentRoot, Exclude: staticExclude} static := &staticpages.Static{DocumentRoot: u.DocumentRoot, Exclude: staticExclude}
proxy := buildProxy(u.Backend, u.Version, u.RoundTripper, u.Config) proxy := buildProxy(u.Backend, u.Version, u.RoundTripper, u.Config)
cableProxy := proxypkg.NewProxy(u.CableBackend, u.Version, u.CableRoundTripper) cableProxy := proxypkg.NewProxy(u.CableBackend, u.Version, u.CableRoundTripper)
......
...@@ -8,6 +8,7 @@ package upstream ...@@ -8,6 +8,7 @@ package upstream
import ( import (
"fmt" "fmt"
"os"
"net/http" "net/http"
"strings" "strings"
...@@ -15,8 +16,10 @@ import ( ...@@ -15,8 +16,10 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/labkit/correlation" "gitlab.com/gitlab-org/labkit/correlation"
apipkg "gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/config" "gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/log"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/rejectmethods" "gitlab.com/gitlab-org/gitlab-workhorse/internal/rejectmethods"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/upload" "gitlab.com/gitlab-org/gitlab-workhorse/internal/upload"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/upstream/roundtripper" "gitlab.com/gitlab-org/gitlab-workhorse/internal/upstream/roundtripper"
...@@ -36,7 +39,9 @@ type upstream struct { ...@@ -36,7 +39,9 @@ type upstream struct {
Routes []routeEntry Routes []routeEntry
RoundTripper http.RoundTripper RoundTripper http.RoundTripper
CableRoundTripper http.RoundTripper CableRoundTripper http.RoundTripper
APIClient *apipkg.API
accessLogger *logrus.Logger accessLogger *logrus.Logger
enableGeoProxyFeature bool
} }
func NewUpstream(cfg config.Config, accessLogger *logrus.Logger) http.Handler { func NewUpstream(cfg config.Config, accessLogger *logrus.Logger) http.Handler {
...@@ -60,6 +65,13 @@ func newUpstream(cfg config.Config, accessLogger *logrus.Logger, routesCallback ...@@ -60,6 +65,13 @@ func newUpstream(cfg config.Config, accessLogger *logrus.Logger, routesCallback
up.RoundTripper = roundtripper.NewBackendRoundTripper(up.Backend, up.Socket, up.ProxyHeadersTimeout, cfg.DevelopmentMode) up.RoundTripper = roundtripper.NewBackendRoundTripper(up.Backend, up.Socket, up.ProxyHeadersTimeout, cfg.DevelopmentMode)
up.CableRoundTripper = roundtripper.NewBackendRoundTripper(up.CableBackend, up.CableSocket, up.ProxyHeadersTimeout, cfg.DevelopmentMode) up.CableRoundTripper = roundtripper.NewBackendRoundTripper(up.CableBackend, up.CableSocket, up.ProxyHeadersTimeout, cfg.DevelopmentMode)
up.configureURLPrefix() up.configureURLPrefix()
up.APIClient = apipkg.NewAPI(
up.Backend,
up.Version,
up.RoundTripper,
)
// Kind of a feature flag. See https://gitlab.com/groups/gitlab-org/-/epics/5914#note_564974130
up.enableGeoProxyFeature = os.Getenv("GEO_SECONDARY_PROXY") == "1"
routesCallback(&up) routesCallback(&up)
var correlationOpts []correlation.InboundHandlerOption var correlationOpts []correlation.InboundHandlerOption
...@@ -108,6 +120,17 @@ func (u *upstream) ServeHTTP(w http.ResponseWriter, r *http.Request) { ...@@ -108,6 +120,17 @@ func (u *upstream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Look for a matching route // Look for a matching route
var route *routeEntry var route *routeEntry
if u.enableGeoProxyFeature {
geoProxyURL, err := u.APIClient.GetGeoProxyURL()
if err == nil {
log.WithRequest(r).WithFields(log.Fields{"geoProxyURL": geoProxyURL}).Info("Geo Proxy: Set route according to Geo Proxy logic")
} else if err != apipkg.ErrNotGeoSecondary {
log.WithRequest(r).WithError(err).Error("Geo Proxy: Unable to determine Geo Proxy URL. Falling back to normal routing")
}
}
for _, ro := range u.Routes { for _, ro := range u.Routes {
if ro.isMatch(prefix.Strip(URIPath), r) { if ro.isMatch(prefix.Strip(URIPath), r) {
route = &ro route = &ro
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment