Commit a1481cb4 authored by Mike Kozono's avatar Mike Kozono

Implement async API calls

parent e24b98a9
...@@ -3,7 +3,6 @@ package api ...@@ -3,7 +3,6 @@ package api
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
...@@ -40,8 +39,6 @@ type API struct { ...@@ -40,8 +39,6 @@ type API struct {
Version string Version string
} }
var ErrNotGeoSecondary = errors.New("this is not a Geo secondary site")
var ( var (
requestsCounter = promauto.NewCounterVec( requestsCounter = promauto.NewCounterVec(
prometheus.CounterOpts{ prometheus.CounterOpts{
...@@ -399,7 +396,6 @@ func validResponseContentType(resp *http.Response) bool { ...@@ -399,7 +396,6 @@ func validResponseContentType(resp *http.Response) bool {
return helper.IsContentType(ResponseContentType, resp.Header.Get("Content-Type")) return helper.IsContentType(ResponseContentType, resp.Header.Get("Content-Type"))
} }
// TODO: Cache the result of the API requests https://gitlab.com/gitlab-org/gitlab/-/issues/329671
func (api *API) GetGeoProxyURL() (*url.URL, error) { func (api *API) GetGeoProxyURL() (*url.URL, error) {
geoProxyApiUrl := *api.URL geoProxyApiUrl := *api.URL
geoProxyApiUrl.Path, geoProxyApiUrl.RawPath = joinURLPath(api.URL, geoProxyEndpointPath) geoProxyApiUrl.Path, geoProxyApiUrl.RawPath = joinURLPath(api.URL, geoProxyEndpointPath)
...@@ -424,10 +420,6 @@ func (api *API) GetGeoProxyURL() (*url.URL, error) { ...@@ -424,10 +420,6 @@ func (api *API) GetGeoProxyURL() (*url.URL, error) {
return nil, fmt.Errorf("GetGeoProxyURL: decode response: %v", err) return nil, fmt.Errorf("GetGeoProxyURL: decode response: %v", err)
} }
if response.GeoProxyURL == "" {
return nil, ErrNotGeoSecondary
}
geoProxyURL, err := url.Parse(response.GeoProxyURL) geoProxyURL, err := url.Parse(response.GeoProxyURL)
if err != nil { if err != nil {
return nil, fmt.Errorf("GetGeoProxyURL: Could not parse Geo proxy URL: %v, err: %v", response.GeoProxyURL, err) return nil, fmt.Errorf("GetGeoProxyURL: Could not parse Geo proxy URL: %v, err: %v", response.GeoProxyURL, err)
......
...@@ -22,16 +22,14 @@ func TestGetGeoProxyURLWhenGeoSecondary(t *testing.T) { ...@@ -22,16 +22,14 @@ func TestGetGeoProxyURLWhenGeoSecondary(t *testing.T) {
geoProxyURL, err := getGeoProxyURLGivenResponse(t, `{"geo_proxy_url":"http://primary"}`) geoProxyURL, err := getGeoProxyURLGivenResponse(t, `{"geo_proxy_url":"http://primary"}`)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, geoProxyURL)
require.Equal(t, "http://primary", geoProxyURL.String()) require.Equal(t, "http://primary", geoProxyURL.String())
} }
func TestGetGeoProxyURLWhenGeoPrimaryOrNonGeo(t *testing.T) { func TestGetGeoProxyURLWhenGeoPrimaryOrNonGeo(t *testing.T) {
geoProxyURL, err := getGeoProxyURLGivenResponse(t, "{}") geoProxyURL, err := getGeoProxyURLGivenResponse(t, "{}")
require.Error(t, err) require.NoError(t, err)
require.Equal(t, ErrNotGeoSecondary, err) require.Equal(t, "", geoProxyURL.String())
require.Nil(t, geoProxyURL)
} }
func getGeoProxyURLGivenResponse(t *testing.T, givenInternalApiResponse string) (*url.URL, error) { func getGeoProxyURLGivenResponse(t *testing.T, givenInternalApiResponse string) (*url.URL, error) {
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"fmt" "fmt"
"os" "os"
"sync" "sync"
"time"
"net/http" "net/http"
"net/url" "net/url"
...@@ -35,6 +36,7 @@ var ( ...@@ -35,6 +36,7 @@ var (
requestHeaderBlacklist = []string{ requestHeaderBlacklist = []string{
upload.RewrittenFieldsHeader, upload.RewrittenFieldsHeader,
} }
geoProxyApiPollingInterval = 10 * time.Second
) )
type upstream struct { type upstream struct {
...@@ -48,6 +50,7 @@ type upstream struct { ...@@ -48,6 +50,7 @@ type upstream struct {
geoLocalRoutes []routeEntry geoLocalRoutes []routeEntry
geoProxyCableRoute routeEntry geoProxyCableRoute routeEntry
geoProxyRoute routeEntry geoProxyRoute routeEntry
geoProxyTestChannel chan struct{}
accessLogger *logrus.Logger accessLogger *logrus.Logger
enableGeoProxyFeature bool enableGeoProxyFeature bool
mu sync.RWMutex mu sync.RWMutex
...@@ -61,6 +64,9 @@ func newUpstream(cfg config.Config, accessLogger *logrus.Logger, routesCallback ...@@ -61,6 +64,9 @@ func newUpstream(cfg config.Config, accessLogger *logrus.Logger, routesCallback
up := upstream{ up := upstream{
Config: cfg, Config: cfg,
accessLogger: accessLogger, accessLogger: accessLogger,
// Kind of a feature flag. See https://gitlab.com/groups/gitlab-org/-/epics/5914#note_564974130
enableGeoProxyFeature: os.Getenv("GEO_SECONDARY_PROXY") == "1",
geoProxyBackend: &url.URL{},
} }
if up.Backend == nil { if up.Backend == nil {
up.Backend = DefaultBackend up.Backend = DefaultBackend
...@@ -79,10 +85,13 @@ func newUpstream(cfg config.Config, accessLogger *logrus.Logger, routesCallback ...@@ -79,10 +85,13 @@ func newUpstream(cfg config.Config, accessLogger *logrus.Logger, routesCallback
up.Version, up.Version,
up.RoundTripper, up.RoundTripper,
) )
// Kind of a feature flag. See https://gitlab.com/groups/gitlab-org/-/epics/5914#note_564974130
up.enableGeoProxyFeature = os.Getenv("GEO_SECONDARY_PROXY") == "1"
routesCallback(&up) routesCallback(&up)
if up.enableGeoProxyFeature {
go up.pollGeoProxyAPI()
}
var correlationOpts []correlation.InboundHandlerOption var correlationOpts []correlation.InboundHandlerOption
if cfg.PropagateCorrelationID { if cfg.PropagateCorrelationID {
correlationOpts = append(correlationOpts, correlation.WithPropagation()) correlationOpts = append(correlationOpts, correlation.WithPropagation())
...@@ -168,19 +177,14 @@ func (u *upstream) findRoute(cleanedPath string, r *http.Request) *routeEntry { ...@@ -168,19 +177,14 @@ func (u *upstream) findRoute(cleanedPath string, r *http.Request) *routeEntry {
} }
func (u *upstream) findGeoProxyRoute(cleanedPath string, r *http.Request) *routeEntry { func (u *upstream) findGeoProxyRoute(cleanedPath string, r *http.Request) *routeEntry {
geoProxyURL, err := u.APIClient.GetGeoProxyURL() u.mu.RLock()
defer u.mu.RUnlock()
if err == nil { if u.geoProxyBackend.String() == "" {
u.setGeoProxyRoutes(geoProxyURL) log.WithRequest(r).Debug("Geo Proxy: Not a Geo proxy")
return u.matchGeoProxyRoute(cleanedPath, r) return nil
} else if err != apipkg.ErrNotGeoSecondary {
log.WithRequest(r).WithError(err).Error("Geo Proxy: Unable to determine Geo Proxy URL. Falling back to normal routing")
} }
return nil
}
func (u *upstream) matchGeoProxyRoute(cleanedPath string, r *http.Request) *routeEntry {
// Some routes are safe to serve from this GitLab instance // Some routes are safe to serve from this GitLab instance
for _, ro := range u.geoLocalRoutes { for _, ro := range u.geoLocalRoutes {
if ro.isMatch(cleanedPath, r) { if ro.isMatch(cleanedPath, r) {
...@@ -191,8 +195,6 @@ func (u *upstream) matchGeoProxyRoute(cleanedPath string, r *http.Request) *rout ...@@ -191,8 +195,6 @@ func (u *upstream) matchGeoProxyRoute(cleanedPath string, r *http.Request) *rout
log.WithRequest(r).WithFields(log.Fields{"geoProxyBackend": u.geoProxyBackend}).Debug("Geo Proxy: Forward this request") log.WithRequest(r).WithFields(log.Fields{"geoProxyBackend": u.geoProxyBackend}).Debug("Geo Proxy: Forward this request")
u.mu.RLock()
defer u.mu.RUnlock()
if cleanedPath == "/-/cable" { if cleanedPath == "/-/cable" {
return &u.geoProxyCableRoute return &u.geoProxyCableRoute
} }
...@@ -200,15 +202,40 @@ func (u *upstream) matchGeoProxyRoute(cleanedPath string, r *http.Request) *rout ...@@ -200,15 +202,40 @@ func (u *upstream) matchGeoProxyRoute(cleanedPath string, r *http.Request) *rout
return &u.geoProxyRoute return &u.geoProxyRoute
} }
func (u *upstream) setGeoProxyRoutes(geoProxyURL *url.URL) { func (u *upstream) pollGeoProxyAPI() {
for {
u.callGeoProxyAPI()
// Notify tests when callGeoProxyAPI() finishes
if u.geoProxyTestChannel != nil {
u.geoProxyTestChannel <- struct{}{}
}
time.Sleep(geoProxyApiPollingInterval)
}
}
// Calls /api/v4/geo/proxy and sets up routes
func (u *upstream) callGeoProxyAPI() {
geoProxyURL, err := u.APIClient.GetGeoProxyURL()
if err != nil {
log.WithError(err).WithFields(log.Fields{"geoProxyBackend": u.geoProxyBackend}).Error("Geo Proxy: Unable to determine Geo Proxy URL. Fallback on cached value.")
return
}
if u.geoProxyBackend.String() != geoProxyURL.String() {
log.WithFields(log.Fields{"oldGeoProxyURL": u.geoProxyBackend, "newGeoProxyURL": geoProxyURL}).Info("Geo Proxy: URL changed")
u.updateGeoProxyFields(geoProxyURL)
}
}
func (u *upstream) updateGeoProxyFields(geoProxyURL *url.URL) {
u.mu.Lock() u.mu.Lock()
defer u.mu.Unlock() defer u.mu.Unlock()
if u.geoProxyBackend == nil || u.geoProxyBackend.String() != geoProxyURL.String() {
log.WithFields(log.Fields{"geoProxyURL": geoProxyURL}).Debug("Geo Proxy: Update GeoProxyRoute") u.geoProxyBackend = geoProxyURL
u.geoProxyBackend = geoProxyURL geoProxyRoundTripper := roundtripper.NewBackendRoundTripper(u.geoProxyBackend, "", u.ProxyHeadersTimeout, u.DevelopmentMode)
geoProxyRoundTripper := roundtripper.NewBackendRoundTripper(u.geoProxyBackend, "", u.ProxyHeadersTimeout, u.DevelopmentMode) geoProxyUpstream := proxypkg.NewProxy(u.geoProxyBackend, u.Version, geoProxyRoundTripper)
geoProxyUpstream := proxypkg.NewProxy(u.geoProxyBackend, u.Version, geoProxyRoundTripper) u.geoProxyCableRoute = u.wsRoute(`^/-/cable\z`, geoProxyUpstream)
u.geoProxyCableRoute = u.wsRoute(`^/-/cable\z`, geoProxyUpstream) u.geoProxyRoute = u.route("", "", geoProxyUpstream)
u.geoProxyRoute = u.route("", "", geoProxyUpstream)
}
} }
...@@ -141,7 +141,7 @@ func TestGeoProxyFeatureEnabledOnNonGeoSecondarySite(t *testing.T) { ...@@ -141,7 +141,7 @@ func TestGeoProxyFeatureEnabledOnNonGeoSecondarySite(t *testing.T) {
runTestCases(t, ws, testCases) runTestCases(t, ws, testCases)
} }
func TestGeoProxyWithAPIError(t *testing.T) { func TestGeoProxyFeatureEnabledButWithAPIError(t *testing.T) {
geoProxyEndpointResponseBody := "Invalid response" geoProxyEndpointResponseBody := "Invalid response"
railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody) railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody)
defer deferredClose() defer deferredClose()
...@@ -214,10 +214,15 @@ func startRailsServer(railsServerName string, geoProxyEndpointResponseBody strin ...@@ -214,10 +214,15 @@ func startRailsServer(railsServerName string, geoProxyEndpointResponseBody strin
} }
func startWorkhorseServer(railsServerURL string, enableGeoProxyFeature bool) (*httptest.Server, func()) { func startWorkhorseServer(railsServerURL string, enableGeoProxyFeature bool) (*httptest.Server, func()) {
geoProxyTestChannel := make(chan struct{})
myConfigureRoutes := func(u *upstream) { myConfigureRoutes := func(u *upstream) {
// Enable environment variable "feature flag" // Enable environment variable "feature flag"
u.enableGeoProxyFeature = enableGeoProxyFeature u.enableGeoProxyFeature = enableGeoProxyFeature
// An empty message will be sent to this channel after every callGeoProxyAPI()
u.geoProxyTestChannel = geoProxyTestChannel
// call original // call original
configureRoutes(u) configureRoutes(u)
} }
...@@ -226,5 +231,13 @@ func startWorkhorseServer(railsServerURL string, enableGeoProxyFeature bool) (*h ...@@ -226,5 +231,13 @@ func startWorkhorseServer(railsServerURL string, enableGeoProxyFeature bool) (*h
ws := httptest.NewServer(upstreamHandler) ws := httptest.NewServer(upstreamHandler)
testhelper.ConfigureSecret() testhelper.ConfigureSecret()
if enableGeoProxyFeature {
// Wait for an empty message from callGeoProxyAPI(). This should be done on
// all tests where enableGeoProxyFeature is true, including the ones where
// we expect geoProxyURL to be nil or error, to ensure the tests do not pass
// by coincidence.
<-geoProxyTestChannel
}
return ws, ws.Close return ws, ws.Close
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment