Commit caaf8192 authored by Kirill Smelkov's avatar Kirill Smelkov

X first cut on scalable parallel auth cache.

parent 74604cde
Pipeline #134 failed with stage
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"net/http/httptest" "net/http/httptest"
"regexp" "regexp"
"strings" "strings"
"sync"
"time" "time"
) )
...@@ -23,7 +24,7 @@ import ( ...@@ -23,7 +24,7 @@ import (
type AuthReply struct { type AuthReply struct {
// raw reply from auth backend & preAuthorizeHandler(). // raw reply from auth backend & preAuthorizeHandler().
// recorded so we can replay it from auth cache to each client in full // recorded so we can replay it from auth cache to each client in full
// if access is rejected. XXX for accepted too? (see WWW-Authenticate in preAuthorizeHandler) // if e.g. access is rejected.
RawReply *httptest.ResponseRecorder RawReply *httptest.ResponseRecorder
// decoded auth reply // decoded auth reply
...@@ -32,46 +33,106 @@ type AuthReply struct { ...@@ -32,46 +33,106 @@ type AuthReply struct {
// Entry in authorization reply cache // Entry in authorization reply cache
type AuthCacheEntry struct { type AuthCacheEntry struct {
AuthReply // FIXME we need to lock the entry only to "correctly" update Nhit on
// read side but we can tolerate some looses in Nhit and update it
// without mutex or atomic. Only -race complains...
sync.Mutex
Tauth int64 // in seconds XXX needed? AuthReply
// how many times this entry was hit when querying auth cache during Tauth int64 // in seconds XXX needed?
// the last refresh period. Nhit int64 // how many times this entry was hit when querying
Nhit int64 // auth cache during the last refresh period.
ready chan struct{} // closed when entry is ready
} }
// Authorization reply cache // Authorization reply cache
// {} project -> AuthCacheEntry // {} project -> AuthCacheEntry
// //
// XXX should be not only project (privateToken etc...) // XXX should be not only project (privateToken etc...)
var authCache = make(map[string]*AuthCacheEntry) type AuthCache struct {
mu sync.RWMutex // guards .cached
cached map[string]*AuthCacheEntry
}
var authCache = AuthCache{} // XXX make map ?
// XXX u should not be in args?
func (c *AuthCache) VerifyDownloadAccess(u *upstream, project string) AuthReply {
// first try to read from cache in parallel with other readers
c.mu.RLock()
auth := c.cached[project]
c.mu.RUnlock()
have_entry:
// entry in cache - use it
if auth != nil {
//have_entry:
<-auth.ready // make sure it is ready
auth.Lock()
auth.Nhit++
//log.Printf("authReply for %v cached ago: %v (hits: %v)",
// project,
// time.Since(time.Unix(auth.Tauth, 0)),
// auth.Nhit)
auth.Unlock()
// no entry - relock the cache in exclusive mode, create empty entry,
// and start filling it
} else {
c.mu.Lock()
// another ex-reader could be trying to create this entry
// simultaneously with us - recheck
auth = c.cached[project]
if auth != nil {
c.mu.Unlock()
goto have_entry
}
// new non-ready entry
auth = &AuthCacheEntry{ready: make(chan struct{})}
c.cached[project] = auth
c.mu.Unlock()
// this goroutine becomes responsible for quering auth backend
auth.AuthReply = askAuthBackend(u, project)
auth.Tauth = time.Now().Unix()
auth.Nhit = 0
// broadcast to other goroutines this entry is ready
close(auth.ready)
// launch entry refresher
go c.refreshEntry(auth, u, project)
}
return auth.AuthReply
}
// Time period for refreshing / removing unused entires in authCache // Time period for refreshing / removing unused entires in authCache
const authCacheRefresh = 10 * time.Second // XXX 30 const authCacheRefresh = 10 * time.Second // XXX 30
// Goroutine to refresh auth cache entry periodically while it is used. // Goroutine to refresh auth cache entry periodically while it is used.
// if the entry is detected to be not used - remove it from cache and stop refreshing. // if the entry is detected to be not used - remove it from cache and stop refreshing.
func authRefreshEntry(u *upstream, project string) { func (c *AuthCache) refreshEntry(auth *AuthCacheEntry, u *upstream, project string) {
// XXX auth := authCache[project]
// and then use auth without authCache lookup ?
for { for {
time.Sleep(authCacheRefresh) time.Sleep(authCacheRefresh)
// XXX lock auth.Lock()
auth, ok := authCache[project] nhit := auth.Nhit
if !ok { // someone removed the entry from cache - no auth.Unlock()
log.Printf("AUTH refresh - %v entry removed", project)
break // no need to further refresh
}
// clear cache entry if it is not used // clear cache entry if it is not used
log.Printf("AUTH refresh - %v #hit: %v", project, auth.Nhit) log.Printf("AUTH refresh - %v #hit: %v", project, nhit)
if auth.Nhit == 0 { // not used - we can remove and stop refreshing if nhit == 0 { // not used - we can remove and stop refreshing
log.Printf("AUTH - removing %v", project) log.Printf("AUTH - removing %v", project)
// XXX lock // NOTE it is ok even if someone gets this auth in this time window
delete(authCache, project) // and use it for some time
c.mu.Lock()
delete(c.cached, project)
c.mu.Unlock()
break break
} }
...@@ -79,10 +140,11 @@ func authRefreshEntry(u *upstream, project string) { ...@@ -79,10 +140,11 @@ func authRefreshEntry(u *upstream, project string) {
// XXX what if it stucks? // XXX what if it stucks?
authReply := askAuthBackend(u, project) authReply := askAuthBackend(u, project)
// XXX lock auth.Lock()
auth.AuthReply = authReply auth.AuthReply = authReply
auth.Tauth = time.Now().Unix() auth.Tauth = time.Now().Unix()
auth.Nhit = 0 auth.Nhit = 0
auth.Unlock()
} }
} }
...@@ -122,11 +184,12 @@ func askAuthBackend(u *upstream, project string) AuthReply { ...@@ -122,11 +184,12 @@ func askAuthBackend(u *upstream, project string) AuthReply {
return authReply return authReply
} }
/*
// Verify that download access is ok or not. // Verify that download access is ok or not.
// first we try to see authCache; if information is not there -> ask auth backend // first we try to see authCache; if information is not there -> ask auth backend
// download is ok if AuthReply.RepoPath != "" // download is ok if AuthReply.RepoPath != ""
// XXX return -> *AuthReply ? // XXX return -> *AuthReply ?
func verifyDownloadAccess(w http.ResponseWriter, u *upstream, project string) AuthReply { func verifyDownloadAccess(u *upstream, project string) AuthReply {
// XXX lock authCache // XXX lock authCache
auth, ok := authCache[project] auth, ok := authCache[project]
if ok { if ok {
...@@ -147,6 +210,10 @@ func verifyDownloadAccess(w http.ResponseWriter, u *upstream, project string) Au ...@@ -147,6 +210,10 @@ func verifyDownloadAccess(w http.ResponseWriter, u *upstream, project string) Au
return authReply return authReply
} }
*/
func verifyDownloadAccess(u *upstream, project string) AuthReply {
return authCache.VerifyDownloadAccess(u, project)
}
// HTTP handler for `.../raw/<ref>/path` // HTTP handler for `.../raw/<ref>/path`
var projectRe = regexp.MustCompile(`^/[\w\.-]+/[\w\.-]+/`) var projectRe = regexp.MustCompile(`^/[\w\.-]+/[\w\.-]+/`)
...@@ -168,7 +235,7 @@ func handleGetBlobRaw(w http.ResponseWriter, r *gitRequest) { ...@@ -168,7 +235,7 @@ func handleGetBlobRaw(w http.ResponseWriter, r *gitRequest) {
refpath = refpath[4:] // strip 'raw/...' refpath = refpath[4:] // strip 'raw/...'
// Query download access auth for this project // Query download access auth for this project
authReply := verifyDownloadAccess(w, r.u, project) authReply := verifyDownloadAccess(r.u, project)
if authReply.RepoPath == "" { if authReply.RepoPath == "" {
// access denied - copy auth reply to client in full - // access denied - copy auth reply to client in full -
// there are HTTP code and other headers / body relevant for // there are HTTP code and other headers / body relevant for
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment