Commit 2beb8c95 authored by Kirill Smelkov's avatar Kirill Smelkov

blob: Cache auth backend reply for 30s

In previous patch we added code to serve blob content via running `git cat-file
...` directly, but for every such request a request to slow RoR-based auth
backend is made, which is bad for performance.

Let's cache auth backend reply for small period of time, e.g. 30 seconds, which
will change the situation dramatically:

If we have a lot of requests to the same repository, we query auth backend only
for every Nth request, and with e.g. 100 raw blob requests/s N=3000, which means
that previous load to RoR code essentially goes away.

On the other hand as we query auth backend only once in a while and refresh the
cache we will not miss potential changes in project settings. I mean potential
e.g. 25 seconds delay for a project to become public, or vice versa to become
private does no real harm.

The cache is done with the idea to allow the read side codepath to execute in
parallel and to be not blocked by eventual cache updates.

Overall this improves performance a lot:

  (on a 8-CPU i7-3770S with 16GB of RAM)

  # request goes to gitlab-workhorse with the following added to nginx conf
  # location ~ ^/[\w\.-]+/[\w\.-]+/raw/ {
  #   error_page 418 = @gitlab-workhorse;
  #   return 418;
  # }
  # but without auth caching
  $ ./wrk -c40 -d10 -t1 --latency https://[2001:67c:1254:e:8b::c776]:7777/root/slapos/raw/master/software/wendelin/software.cfg
  Running 10s test @ https://[2001:67c:1254:e:8b::c776]:7777/root/slapos/raw/master/software/wendelin/software.cfg
    1 threads and 40 connections
    Thread Stats   Avg      Stdev     Max   +/- Stdev
      Latency   549.37ms  220.53ms   1.69s    84.74%
      Req/Sec    71.01     25.49   160.00     70.71%
    Latency Distribution
       50%  514.66ms
       75%  584.32ms
       90%  767.46ms
       99%    1.37s
    709 requests in 10.01s, 1.26MB read
  Requests/sec:     70.83
  Transfer/sec:    128.79KB

  # request goes to gitlab-workhorse with auth caching (this patch)
  $ ./wrk -c40 -d10 -t1 --latency https://[2001:67c:1254:e:8b::c776]:7777/root/slapos/raw/master/software/wendelin/software.cfg
  Running 10s test @ https://[2001:67c:1254:e:8b::c776]:7777/root/slapos/raw/master/software/wendelin/software.cfg
    1 threads and 40 connections
    Thread Stats   Avg      Stdev     Max   +/- Stdev
      Latency    35.18ms   20.78ms 291.34ms   72.79%
      Req/Sec     1.18k   135.34     1.34k    88.00%
    Latency Distribution
       50%   33.96ms
       75%   44.35ms
       90%   58.70ms
       99%  104.76ms
    11704 requests in 10.01s, 20.78MB read
  Requests/sec:   1169.75
  Transfer/sec:      2.08MB

i.e. it is ~ 17x improvement.
parent 1b274d0d
Pipeline #153 failed with stage
...@@ -2,6 +2,8 @@ ...@@ -2,6 +2,8 @@
// //
// Blobs are read via `git cat-file ...` with first querying authentication // Blobs are read via `git cat-file ...` with first querying authentication
// backend about download-access permission for containing repository. // backend about download-access permission for containing repository.
// Replies from authentication backend are cached for 30 seconds to keep
// access-to-blobs latency to minimum.
package main package main
...@@ -13,14 +15,17 @@ import ( ...@@ -13,14 +15,17 @@ import (
"log" "log"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url"
"regexp" "regexp"
"strings" "strings"
"sync"
"time"
) )
// Reply from auth backend for "download from repo" authorization request // Reply from auth backend for "download from repo" authorization request
type AuthReply struct { type AuthReply struct {
// raw reply from auth backend & preAuthorizeHandler(). // raw reply from auth backend & preAuthorizeHandler().
// recorded so we can replay it to client in full // recorded so we can replay it from auth cache to each client in full
// if e.g. access is rejected. // if e.g. access is rejected.
RawReply *httptest.ResponseRecorder RawReply *httptest.ResponseRecorder
...@@ -28,6 +33,176 @@ type AuthReply struct { ...@@ -28,6 +33,176 @@ type AuthReply struct {
authorizationResponse authorizationResponse
} }
// Entry in authorization reply cache.
type AuthCacheEntry struct {
	// FIXME we need to lock the entry only to "correctly" update Nhit on
	// read side, but we can tolerate some losses in Nhit and update it
	// without mutex or atomic. Only -race complains...
	// ( we could use atomic.Value for atomic whole cache entry updates from
	// refresher without need for locks on readers side, but the need to do
	// .Nhit++ on readers side ruins that )
	sync.Mutex

	AuthReply // cached reply from the auth backend

	Tauth int64 // time the reply was (re)fetched, in seconds since Unix epoch
	Nhit  int64 // how many times this entry was hit when querying auth cache during the last refresh period.

	ready chan struct{} // closed when entry is ready
}
// Entries are keyed by project + credentials.
type AuthCacheKey struct {
	project string // project path, e.g. "root/slapos"
	query   string // token-carrying request query, url-encoded, e.g. with passing in private_token=...
	header  string // token-carrying request headers, url-encoded, e.g. PRIVATE-TOKEN=...
}
// Authorization reply cache.
// Maps AuthCacheKey -> *AuthCacheEntry.
type AuthCache struct {
	u *upstream // for which upstream we cache auth

	mu     sync.RWMutex // guards .cached
	cached map[AuthCacheKey]*AuthCacheEntry
}
// NewAuthCache creates an empty authorization reply cache serving upstream u.
func NewAuthCache(u *upstream) *AuthCache {
	c := &AuthCache{
		u:      u,
		cached: map[AuthCacheKey]*AuthCacheEntry{},
	}
	return c
}
// VerifyDownloadAccess checks whether download access to a project is permitted.
// The cache is consulted first; only on a miss is the auth backend asked.
// Download is ok if AuthReply.RepoPath != "".
func (c *AuthCache) VerifyDownloadAccess(project string, query url.Values, header http.Header) AuthReply {
	// Only token-carrying parameters/headers participate in the cache key, to
	// keep the cache small and to avoid creating redundant entries because of
	// e.g. unrelated headers.
	tokq := make(url.Values)
	for name, vals := range query {
		if strings.HasSuffix(name, "_token") {
			tokq[name] = vals
		}
	}

	tokh := make(url.Values)
	for name, vals := range header {
		if strings.HasSuffix(strings.ToUpper(name), "-TOKEN") {
			tokh[name] = vals
		}
	}

	return c.verifyDownloadAccess(AuthCacheKey{
		project: project,
		query:   tokq.Encode(),
		header:  tokh.Encode(),
	})
}
// verifyDownloadAccess returns the (possibly cached) auth backend reply for key.
//
// Fast path: readers share an RLock and serve from .cached in parallel.
// Slow path: the first goroutine to miss creates a not-yet-ready entry, asks
// the auth backend, publishes the reply via close(ready) and starts a
// refresher; concurrent goroutines for the same key block on .ready instead
// of issuing duplicate backend requests.
func (c *AuthCache) verifyDownloadAccess(key AuthCacheKey) AuthReply {
	var authReply AuthReply

	// first try to read from cache in parallel with other readers
	c.mu.RLock()
	auth := c.cached[key]
	c.mu.RUnlock()

have_entry:
	// entry in cache - use it
	if auth != nil {
		<-auth.ready // wait until it is ready

		auth.Lock()
		auth.Nhit++
		//log.Printf("authReply for %v cached ago: %v (hits: %v)",
		//	key,
		//	time.Since(time.Unix(auth.Tauth, 0)),
		//	auth.Nhit)
		authReply = auth.AuthReply
		auth.Unlock()
	} else {
		// no entry - relock the cache in exclusive mode, create empty entry,
		// and start filling it
		c.mu.Lock()
		// another ex-reader could be trying to create this entry
		// simultaneously with us - recheck
		auth = c.cached[key]
		if auth != nil {
			c.mu.Unlock()
			goto have_entry
		}
		// new not-yet-ready entry; others who find it will block on .ready
		auth = &AuthCacheEntry{ready: make(chan struct{})}
		c.cached[key] = auth
		c.mu.Unlock()

		// this goroutine becomes responsible for querying auth backend
		auth.AuthReply = c.askAuthBackend(key)
		auth.Tauth = time.Now().Unix()
		auth.Nhit = 0
		authReply = auth.AuthReply

		// broadcast to other goroutines that this entry is ready
		close(auth.ready)

		// launch entry refresher
		go c.refreshEntry(auth, key)
	}

	return authReply
}
// Time period for refreshing / removing unused entries in authCache.
const authCacheRefresh = 30 * time.Second
// refreshEntry is the goroutine which periodically re-queries the auth
// backend for auth while the entry is in use.
// When the entry is detected to be unused it is removed from the cache and
// the refresher stops.
func (c *AuthCache) refreshEntry(auth *AuthCacheEntry, key AuthCacheKey) {
	for {
		time.Sleep(authCacheRefresh)

		auth.Lock()
		hits := auth.Nhit
		auth.Unlock()

		//log.Printf("AUTH refresh - %v #hit: %v", key, hits)
		if hits == 0 {
			// Entry saw no use during the whole refresh period - drop it
			// and stop refreshing.
			//log.Printf("AUTH - removing %v", key)
			// NOTE it is ok even if someone gets this auth in this time window
			// and use it for some time
			c.mu.Lock()
			delete(c.cached, key)
			c.mu.Unlock()
			return
		}

		// Entry is in use - refresh it from the auth backend.
		//log.Printf("AUTH - refreshing %v", key)
		reply := c.askAuthBackend(key)

		auth.Lock()
		auth.AuthReply = reply
		auth.Tauth = time.Now().Unix()
		auth.Nhit = 0
		auth.Unlock()
	}
}
// askAuthBackend reconstructs the request described by key and asks the auth
// backend about it.
func (c *AuthCache) askAuthBackend(key AuthCacheKey) AuthReply {
	// key.header -> url.Values -> http.Header
	hv, err := url.ParseQuery(key.header)
	if err != nil {
		// key.header was url-encoded by ourselves in
		// AuthCache.VerifyDownloadAccess() - decoding it back must not fail.
		panic(err)
	}

	// url.Values and http.Header are both map[string][]string, so the freshly
	// decoded values convert to a header directly.
	return askAuthBackend(c.u, key.project, key.query, http.Header(hv))
}
// Ask auth backend about whether download is ok for a project. // Ask auth backend about whether download is ok for a project.
// Authorization is approved if AuthReply.RepoPath != "" on return // Authorization is approved if AuthReply.RepoPath != "" on return
// Raw auth backend response is emitted to AuthReply.RawReply // Raw auth backend response is emitted to AuthReply.RawReply
...@@ -89,7 +264,7 @@ func handleGetBlobRaw(w http.ResponseWriter, r *gitRequest) { ...@@ -89,7 +264,7 @@ func handleGetBlobRaw(w http.ResponseWriter, r *gitRequest) {
refpath := u.Path[rawLoc[1]:] refpath := u.Path[rawLoc[1]:]
// Query download access auth for this project // Query download access auth for this project
authReply := askAuthBackend(r.u, project, u.RawQuery, r.Request.Header) authReply := r.u.authCache.VerifyDownloadAccess(project, u.Query(), r.Request.Header)
if authReply.RepoPath == "" { if authReply.RepoPath == "" {
// access denied - copy auth reply to client in full - // access denied - copy auth reply to client in full -
// there are HTTP code and other headers / body relevant for // there are HTTP code and other headers / body relevant for
...@@ -98,7 +273,9 @@ func handleGetBlobRaw(w http.ResponseWriter, r *gitRequest) { ...@@ -98,7 +273,9 @@ func handleGetBlobRaw(w http.ResponseWriter, r *gitRequest) {
w.Header()[k] = v w.Header()[k] = v
} }
w.WriteHeader(authReply.RawReply.Code) w.WriteHeader(authReply.RawReply.Code)
_, err := io.Copy(w, authReply.RawReply.Body) // NOTE do not consume authReply.RawReply.Body with io.Copy() -
// this way it will be read one time only and next reads will be empty.
_, err := w.Write(authReply.RawReply.Body.Bytes())
if err != nil { if err != nil {
logError(fmt.Errorf("writing authReply.RawReply.Body: %v", err)) logError(fmt.Errorf("writing authReply.RawReply.Body: %v", err))
} }
......
...@@ -20,6 +20,7 @@ type serviceHandleFunc func(w http.ResponseWriter, r *gitRequest) ...@@ -20,6 +20,7 @@ type serviceHandleFunc func(w http.ResponseWriter, r *gitRequest)
type upstream struct { type upstream struct {
httpClient *http.Client httpClient *http.Client
authBackend string authBackend string
authCache *AuthCache
} }
type gitService struct { type gitService struct {
...@@ -89,7 +90,9 @@ var gitServices = [...]gitService{ ...@@ -89,7 +90,9 @@ var gitServices = [...]gitService{
} }
func newUpstream(authBackend string, authTransport http.RoundTripper) *upstream { func newUpstream(authBackend string, authTransport http.RoundTripper) *upstream {
return &upstream{&http.Client{Transport: authTransport}, authBackend} u := &upstream{&http.Client{Transport: authTransport}, authBackend, nil}
u.authCache = NewAuthCache(u)
return u
} }
func (u *upstream) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (u *upstream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment