Commit fb57599d authored by Kirill Smelkov's avatar Kirill Smelkov

blob/auth: Cache auth backend reply for 30s

In previous patch we added code to serve blob content via running `git cat-file
...` directly, but for every such request a request to slow RoR-based auth
backend is made, which is bad for performance.

Let's cache auth backend reply for small period of time, e.g. 30 seconds, which
will change the situation dramatically:

If we have a lot of requests to the same repository, we query auth backend only
for every Nth request and with e.g. 100 raw blob request/s N=3000 which means
that previous load to RoR code essentially goes away.

On the other hand as we query auth backend only once in a while and refresh the
cache, we will not miss potential changes in project settings. I mean potential
e.g. 25 seconds delay for a project to become public, or vise versa to become
private does no real harm.

The cache is done with the idea to allow the read side codepath to execute in
parallel and to be not blocked by eventual cache updates.

Overall this improves performance a lot:

  (on a 8-CPU i7-3770S with 16GB of RAM, 2001:67c:1254:e:89::fa34 is on localhost)

  # request is handled by gitlab-workhorse, but without auth caching
  $ ./wrk -c40 -d10 -t1 --latency http://[2001:67c:1254:e:89::fa34]:7777/root/slapos/raw/master/software/wendelin/software.cfg
  Running 10s test @ http://[2001:67c:1254:e:89::fa34]:7777/root/slapos/raw/master/software/wendelin/software.cfg
    1 threads and 40 connections
    Thread Stats   Avg      Stdev     Max   +/- Stdev
      Latency   622.99ms  200.08ms   1.40s    77.03%
      Req/Sec    62.65     22.37   120.00     55.00%
    Latency Distribution
       50%  589.51ms
       75%  726.88ms
       90%  896.09ms
       99%    1.18s
    626 requests in 10.01s, 1.11MB read
  Requests/sec:     62.55
  Transfer/sec:    113.73KB

  # request goes to gitlab-workhorse with auth caching (this patch)
  $ ./wrk -c40 -d10 -t1 --latency http://[2001:67c:1254:e:89::fa34]:7777/root/slapos/raw/master/software/wendelin/software.cfg
  Running 10s test @ http://[2001:67c:1254:e:89::fa34]:7777/root/slapos/raw/master/software/wendelin/software.cfg
    1 threads and 40 connections
    Thread Stats   Avg      Stdev     Max   +/- Stdev
      Latency    36.62ms   25.39ms 351.14ms   72.02%
      Req/Sec     1.16k    93.73     1.36k    77.00%
    Latency Distribution
       50%   36.30ms
       75%   47.02ms
       90%   66.36ms
       99%  122.46ms
    11580 requests in 10.01s, 20.56MB read
  Requests/sec:   1156.85
  Transfer/sec:      2.05MB

i.e. it is ~ 17x improvement.
parent 277d5067
Pipeline #280 failed with stage
...@@ -14,20 +14,23 @@ import ( ...@@ -14,20 +14,23 @@ import (
) )
type API struct { type API struct {
Client *http.Client Client *http.Client
URL *url.URL URL *url.URL
Version string Version string
authCache *AuthCache
} }
func NewAPI(myURL *url.URL, version string, roundTripper *badgateway.RoundTripper) *API { func NewAPI(myURL *url.URL, version string, roundTripper *badgateway.RoundTripper) *API {
if roundTripper == nil { if roundTripper == nil {
roundTripper = badgateway.NewRoundTripper("", 0) roundTripper = badgateway.NewRoundTripper("", 0)
} }
return &API{ a := &API{
Client: &http.Client{Transport: roundTripper}, Client: &http.Client{Transport: roundTripper},
URL: myURL, URL: myURL,
Version: version, Version: version,
} }
a.authCache = NewAuthCache(a)
return a
} }
type HandleFunc func(http.ResponseWriter, *http.Request, *Response) type HandleFunc func(http.ResponseWriter, *http.Request, *Response)
......
...@@ -6,12 +6,16 @@ import ( ...@@ -6,12 +6,16 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url"
"strings"
"sync"
"time"
) )
// Reply from auth backend, e.g. for "download from repo" authorization request // Reply from auth backend, e.g. for "download from repo" authorization request
type AuthReply struct { type AuthReply struct {
// raw reply from auth backend & PreAuthorizeHandler(). // raw reply from auth backend & PreAuthorizeHandler().
// recorded so we can replay it to client in full // recorded so we can replay it from auth cache to each client in full
// if e.g. access is rejected. // if e.g. access is rejected.
RawReply *httptest.ResponseRecorder RawReply *httptest.ResponseRecorder
...@@ -19,10 +23,188 @@ type AuthReply struct { ...@@ -19,10 +23,188 @@ type AuthReply struct {
Response Response
} }
// Entry in authorization reply cache
type AuthCacheEntry struct {
// FIXME we need to lock the entry only to "correctly" update Nhit on
// read side, but we can tolerate some looses in Nhit and update it
// without mutex or atomic. Only -race complains...
// ( we could use atomic.Value for atomic whole cache entry updates from
// refresher without need for locks on readers side, but the need to do
// .Nhit++ on readers side ruins that )
sync.Mutex
AuthReply
Tauth int64 // in seconds
Nhit int64 // how many times this entry was hit when querying auth cache during the last refresh period.
ready chan struct{} // closed when entry is ready
}
// Entries are keyed by project + credentials
type AuthCacheKey struct {
project string
query string // e.g. with passing in private_token=...
header string // request header url-encoded, e.g. PRIVATE-TOKEN=...
}
// Authorization reply cache
// {} AuthCacheKey -> *AuthCacheEntry
type AuthCache struct {
a *API // for which API we cache auth
mu sync.RWMutex // guards .cached
cached map[AuthCacheKey]*AuthCacheEntry
}
func NewAuthCache(a *API) *AuthCache {
return &AuthCache{a: a, cached: make(map[AuthCacheKey]*AuthCacheEntry)}
}
// Verify that download access is ok or not.
// first we try to use the cache; if information is not there -> ask auth backend
// download is ok if AuthReply.RepoPath != ""
func (c *AuthCache) VerifyDownloadAccess(project string, query string, header http.Header) AuthReply {
// Use only tokens from query/header to minimize cache and avoid
// creating redundant cache entries because of e.g. unrelated headers.
queryValues, _ := url.ParseQuery(query) // this is what URL.Query() does
q := url.Values{}
for k, v := range queryValues {
if strings.HasSuffix(k, "_token") {
q[k] = v
}
}
h := url.Values{}
for k, v := range header {
if strings.HasSuffix(strings.ToUpper(k), "-TOKEN") {
h[k] = v
}
}
key := AuthCacheKey{project, q.Encode(), h.Encode()}
return c.verifyDownloadAccess(key)
}
func (c *AuthCache) verifyDownloadAccess(key AuthCacheKey) AuthReply {
var authReply AuthReply
// first try to read from cache in parallel with other readers
c.mu.RLock()
auth := c.cached[key]
c.mu.RUnlock()
have_entry:
// entry in cache - use it
if auth != nil {
<-auth.ready // wait until it is ready
auth.Lock()
auth.Nhit++
//log.Printf("authReply for %v cached ago: %v (hits: %v)",
// key,
// time.Since(time.Unix(auth.Tauth, 0)),
// auth.Nhit)
authReply = auth.AuthReply
auth.Unlock()
} else {
// no entry - relock the cache in exclusive mode, create empty entry,
// and start filling it
c.mu.Lock()
// another ex-reader could be trying to create this entry
// simultaneously with us - recheck
auth = c.cached[key]
if auth != nil {
c.mu.Unlock()
goto have_entry
}
// new not-yet-ready entry
auth = &AuthCacheEntry{ready: make(chan struct{})}
c.cached[key] = auth
c.mu.Unlock()
// this goroutine becomes responsible for querying auth backend
auth.AuthReply = c.askAuthBackend(key)
auth.Tauth = time.Now().Unix()
auth.Nhit = 0
authReply = auth.AuthReply
// broadcast to other goroutines that this entry is ready
close(auth.ready)
// launch entry refresher
go c.refreshEntry(auth, key)
}
return authReply
}
// Time period for refreshing / removing unused entires in authCache
const authCacheRefresh = 30 * time.Second
// Goroutine to refresh auth cache entry periodically while it is used.
// if the entry is detected to be not used - remove it from cache and stop refreshing.
func (c *AuthCache) refreshEntry(auth *AuthCacheEntry, key AuthCacheKey) {
for {
time.Sleep(authCacheRefresh)
auth.Lock()
nhit := auth.Nhit
auth.Unlock()
// clear cache entry if it is not used
//log.Printf("AUTH refresh - %v #hit: %v", key, nhit)
if nhit == 0 { // not used - we can remove and stop refreshing
//log.Printf("AUTH - removing %v", key)
// NOTE it is ok even if someone gets this auth in this time window
// and use it for some time
c.mu.Lock()
delete(c.cached, key)
c.mu.Unlock()
break
}
//log.Printf("AUTH - refreshing %v", key)
authReply := c.askAuthBackend(key)
auth.Lock()
auth.AuthReply = authReply
auth.Tauth = time.Now().Unix()
auth.Nhit = 0
auth.Unlock()
}
}
// Ask auth backend about cache key
func (c *AuthCache) askAuthBackend(key AuthCacheKey) AuthReply {
// key.header -> url.Values -> http.Header
hv, err := url.ParseQuery(key.header)
if err != nil {
// we prepared key.header ourselves via url-encoding in AuthCache.VerifyDownloadAccess().
// It must be ok
panic(err)
}
header := make(http.Header)
for k, v := range hv {
header[k] = v
}
return c.a.verifyDownloadAccess(key.project, key.query, header)
}
// Ask auth backend about whether download is ok for a project. // Ask auth backend about whether download is ok for a project.
// Authorization is approved if AuthReply.RepoPath != "" on return // Authorization is approved if AuthReply.RepoPath != "" on return
// Raw auth backend response is emitted to AuthReply.RawReply // Raw auth backend response is emitted to AuthReply.RawReply
//
// Replies from authentication backend are cached for 30 seconds as each
// request to Rails code is heavy and slow.
func (a *API) VerifyDownloadAccess(project, query string, header http.Header) AuthReply { func (a *API) VerifyDownloadAccess(project, query string, header http.Header) AuthReply {
return a.authCache.VerifyDownloadAccess(project, query, header)
}
func (a *API) verifyDownloadAccess(project, query string, header http.Header) AuthReply {
authReply := AuthReply{ authReply := AuthReply{
RawReply: httptest.NewRecorder(), RawReply: httptest.NewRecorder(),
} }
......
...@@ -49,7 +49,9 @@ func handleGetBlobRaw(a *api.API, w http.ResponseWriter, r *http.Request) { ...@@ -49,7 +49,9 @@ func handleGetBlobRaw(a *api.API, w http.ResponseWriter, r *http.Request) {
w.Header()[k] = v w.Header()[k] = v
} }
w.WriteHeader(authReply.RawReply.Code) w.WriteHeader(authReply.RawReply.Code)
_, err := io.Copy(w, authReply.RawReply.Body) // NOTE do not consume authReply.RawReply.Body with io.Copy() -
// this way it will be read one time only and next reads will be empty.
_, err := w.Write(authReply.RawReply.Body.Bytes())
if err != nil { if err != nil {
helper.LogError(fmt.Errorf("writing authReply.RawReply.Body: %v", err)) helper.LogError(fmt.Errorf("writing authReply.RawReply.Body: %v", err))
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment