Commit 0bdfc8b1 authored by Nick Thomas's avatar Nick Thomas

Merge branch 'sh-refactor-upload-preparers' into 'master'

Refactor Preparer and SaveFileOpts handling [1/3]

See merge request gitlab-org/gitlab-workhorse!515
parents 15c9d4d5 7efb5f0a
---
title: Refactor Preparer and SaveFileOpts handling
merge_request: 515
author:
type: other
......@@ -141,10 +141,15 @@ func (a *artifactsUploadProcessor) Name() string {
return "artifacts"
}
func UploadArtifacts(myAPI *api.API, h http.Handler) http.Handler {
func UploadArtifacts(myAPI *api.API, h http.Handler, p upload.Preparer) http.Handler {
return myAPI.PreAuthorizeHandler(func(w http.ResponseWriter, r *http.Request, a *api.Response) {
mg := &artifactsUploadProcessor{opts: filestore.GetOpts(a), SavedFileTracker: upload.SavedFileTracker{Request: r}}
opts, _, err := p.Prepare(a)
if err != nil {
helper.Fail500(w, r, fmt.Errorf("UploadArtifacts: error preparing file storage options"))
return
}
upload.HandleFileUploads(w, r, h, a, mg)
mg := &artifactsUploadProcessor{opts: opts, SavedFileTracker: upload.SavedFileTracker{Request: r}}
upload.HandleFileUploads(w, r, h, a, mg, opts)
}, "/authorize")
}
......@@ -154,7 +154,7 @@ func testUploadArtifacts(t *testing.T, contentType, url string, body io.Reader)
testhelper.ConfigureSecret()
apiClient := api.NewAPI(parsedURL, "123", roundTripper)
proxyClient := proxy.NewProxy(parsedURL, "123", roundTripper)
UploadArtifacts(apiClient, proxyClient).ServeHTTP(response, httpRequest)
UploadArtifacts(apiClient, proxyClient, &upload.DefaultPreparer{}).ServeHTTP(response, httpRequest)
return response
}
......
......@@ -9,6 +9,7 @@ import (
"net/http"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/upload"
)
......@@ -32,6 +33,10 @@ func (l *object) Verify(fh *filestore.FileHandler) error {
type uploadPreparer struct{}
func NewLfsUploadPreparer(c config.Config) upload.Preparer {
return &uploadPreparer{}
}
func (l *uploadPreparer) Prepare(a *api.Response) (*filestore.SaveFileOpts, upload.Verifier, error) {
opts := filestore.GetOpts(a)
opts.TempFilePrefix = a.LfsOid
......@@ -39,6 +44,6 @@ func (l *uploadPreparer) Prepare(a *api.Response) (*filestore.SaveFileOpts, uplo
return opts, &object{oid: a.LfsOid, size: a.LfsSize}, nil
}
func PutStore(a *api.API, h http.Handler) http.Handler {
return upload.BodyUploader(a, h, &uploadPreparer{})
func PutStore(a *api.API, h http.Handler, p upload.Preparer) http.Handler {
return upload.BodyUploader(a, h, p)
}
package upload
import (
"fmt"
"net/http"
"github.com/dgrijalva/jwt-go"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
)
const RewrittenFieldsHeader = "Gitlab-Workhorse-Multipart-Fields"
......@@ -15,9 +17,16 @@ type MultipartClaims struct {
jwt.StandardClaims
}
func Accelerate(rails PreAuthorizer, h http.Handler) http.Handler {
func Accelerate(rails PreAuthorizer, h http.Handler, p Preparer) http.Handler {
return rails.PreAuthorizeHandler(func(w http.ResponseWriter, r *http.Request, a *api.Response) {
s := &SavedFileTracker{Request: r}
HandleFileUploads(w, r, h, a, s)
opts, _, err := p.Prepare(a)
if err != nil {
helper.Fail500(w, r, fmt.Errorf("Accelerate: error preparing file storage options"))
return
}
HandleFileUploads(w, r, h, a, s, opts)
}, "/authorize")
}
......@@ -29,9 +29,9 @@ type Preparer interface {
Prepare(a *api.Response) (*filestore.SaveFileOpts, Verifier, error)
}
type defaultPreparer struct{}
type DefaultPreparer struct{}
func (s *defaultPreparer) Prepare(a *api.Response) (*filestore.SaveFileOpts, Verifier, error) {
func (s *DefaultPreparer) Prepare(a *api.Response) (*filestore.SaveFileOpts, Verifier, error) {
return filestore.GetOpts(a), nil, nil
}
......@@ -39,10 +39,6 @@ func (s *defaultPreparer) Prepare(a *api.Response) (*filestore.SaveFileOpts, Ver
// uploading it.
// Providing an Preparer allows to customize the upload process
func BodyUploader(rails PreAuthorizer, h http.Handler, p Preparer) http.Handler {
if p == nil {
p = &defaultPreparer{}
}
return rails.PreAuthorizeHandler(func(w http.ResponseWriter, r *http.Request, a *api.Response) {
opts, verifier, err := p.Prepare(a)
if err != nil {
......
......@@ -29,7 +29,7 @@ func TestBodyUploader(t *testing.T) {
body := strings.NewReader(fileContent)
resp := testUpload(&rails{}, nil, echoProxy(t, fileLen), body)
resp := testUpload(&rails{}, &alwaysLocalPreparer{}, echoProxy(t, fileLen), body)
require.Equal(t, http.StatusOK, resp.StatusCode)
uploadEcho, err := ioutil.ReadAll(resp.Body)
......@@ -63,7 +63,7 @@ func TestBodyUploaderCustomVerifier(t *testing.T) {
}
func TestBodyUploaderAuthorizationFailure(t *testing.T) {
testNoProxyInvocation(t, http.StatusUnauthorized, &rails{unauthorized: true}, nil)
testNoProxyInvocation(t, http.StatusUnauthorized, &rails{unauthorized: true}, &alwaysLocalPreparer{})
}
func TestBodyUploaderErrors(t *testing.T) {
......
......@@ -61,7 +61,7 @@ func init() {
prometheus.MustRegister(multipartFiles)
}
func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, preauth *api.Response, filter MultipartFormProcessor) error {
func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, preauth *api.Response, filter MultipartFormProcessor, opts *filestore.SaveFileOpts) error {
// Create multipart reader
reader, err := r.MultipartReader()
if err != nil {
......@@ -100,7 +100,7 @@ func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, pr
}
if p.FileName() != "" {
err = rew.handleFilePart(r.Context(), name, p)
err = rew.handleFilePart(r.Context(), name, p, opts)
} else {
err = rew.copyPart(r.Context(), name, p)
}
......@@ -113,7 +113,7 @@ func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, pr
return nil
}
func (rew *rewriter) handleFilePart(ctx context.Context, name string, p *multipart.Part) error {
func (rew *rewriter) handleFilePart(ctx context.Context, name string, p *multipart.Part, opts *filestore.SaveFileOpts) error {
multipartFiles.WithLabelValues(rew.filter.Name()).Inc()
filename := p.FileName()
......@@ -122,7 +122,6 @@ func (rew *rewriter) handleFilePart(ctx context.Context, name string, p *multipa
return fmt.Errorf("illegal filename: %q", filename)
}
opts := filestore.GetOpts(rew.preauth)
opts.TempFilePrefix = filename
var inputReader io.Reader
......
......@@ -23,8 +23,7 @@ type MultipartFormProcessor interface {
Name() string
}
func HandleFileUploads(w http.ResponseWriter, r *http.Request, h http.Handler, preauth *api.Response, filter MultipartFormProcessor) {
opts := filestore.GetOpts(preauth)
func HandleFileUploads(w http.ResponseWriter, r *http.Request, h http.Handler, preauth *api.Response, filter MultipartFormProcessor, opts *filestore.SaveFileOpts) {
if !opts.IsLocal() && !opts.IsRemote() {
helper.Fail500(w, r, fmt.Errorf("handleFileUploads: missing destination storage"))
return
......@@ -35,7 +34,7 @@ func HandleFileUploads(w http.ResponseWriter, r *http.Request, h http.Handler, p
defer writer.Close()
// Rewrite multipart form data
err := rewriteFormFilesFromMultipart(r, writer, preauth, filter)
err := rewriteFormFilesFromMultipart(r, writer, preauth, filter, opts)
if err != nil {
switch err {
case ErrInjectedClientParam:
......
......@@ -56,7 +56,12 @@ func TestUploadTempPathRequirement(t *testing.T) {
if err != nil {
t.Fatal(err)
}
HandleFileUploads(response, request, nilHandler, &api.Response{}, nil)
apiResponse := &api.Response{}
preparer := &DefaultPreparer{}
opts, _, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
HandleFileUploads(response, request, nilHandler, apiResponse, &testFormProcessor{}, opts)
testhelper.AssertResponseCode(t, response, 500)
}
......@@ -91,7 +96,13 @@ func TestUploadHandlerForwardingRawData(t *testing.T) {
response := httptest.NewRecorder()
handler := newProxy(ts.URL)
HandleFileUploads(response, httpRequest, handler, &api.Response{TempPath: tempPath}, nil)
apiResponse := &api.Response{TempPath: tempPath}
preparer := &DefaultPreparer{}
opts, _, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
HandleFileUploads(response, httpRequest, handler, apiResponse, nil, opts)
testhelper.AssertResponseCode(t, response, 202)
if response.Body.String() != "RESPONSE" {
t.Fatal("Expected RESPONSE in response body")
......@@ -191,7 +202,13 @@ func TestUploadHandlerRewritingMultiPartData(t *testing.T) {
response := httptest.NewRecorder()
handler := newProxy(ts.URL)
HandleFileUploads(response, httpRequest, handler, &api.Response{TempPath: tempPath}, &testFormProcessor{})
apiResponse := &api.Response{TempPath: tempPath}
preparer := &DefaultPreparer{}
opts, _, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
HandleFileUploads(response, httpRequest, handler, apiResponse, &testFormProcessor{}, opts)
testhelper.AssertResponseCode(t, response, 202)
cancel() // this will trigger an async cleanup
......@@ -263,7 +280,12 @@ func TestUploadHandlerDetectingInjectedMultiPartData(t *testing.T) {
response := httptest.NewRecorder()
handler := newProxy(ts.URL)
HandleFileUploads(response, httpRequest, handler, &api.Response{TempPath: tempPath}, &testFormProcessor{})
apiResponse := &api.Response{TempPath: tempPath}
preparer := &DefaultPreparer{}
opts, _, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
HandleFileUploads(response, httpRequest, handler, apiResponse, &testFormProcessor{}, opts)
testhelper.AssertResponseCode(t, response, test.response)
cancel() // this will trigger an async cleanup
......@@ -292,7 +314,13 @@ func TestUploadProcessingField(t *testing.T) {
httpRequest.Header.Set("Content-Type", writer.FormDataContentType())
response := httptest.NewRecorder()
HandleFileUploads(response, httpRequest, nilHandler, &api.Response{TempPath: tempPath}, &testFormProcessor{})
apiResponse := &api.Response{TempPath: tempPath}
preparer := &DefaultPreparer{}
opts, _, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
HandleFileUploads(response, httpRequest, nilHandler, apiResponse, &testFormProcessor{}, opts)
testhelper.AssertResponseCode(t, response, 500)
}
......@@ -347,7 +375,13 @@ func TestUploadProcessingFile(t *testing.T) {
httpRequest.Header.Set("Content-Type", writer.FormDataContentType())
response := httptest.NewRecorder()
HandleFileUploads(response, httpRequest, nilHandler, &test.preauth, &testFormProcessor{})
apiResponse := &api.Response{TempPath: tempPath}
preparer := &DefaultPreparer{}
opts, _, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
HandleFileUploads(response, httpRequest, nilHandler, apiResponse, &testFormProcessor{}, opts)
testhelper.AssertResponseCode(t, response, 200)
})
}
......@@ -390,7 +424,12 @@ func TestInvalidFileNames(t *testing.T) {
httpRequest.Header.Set("Content-Type", writer.FormDataContentType())
response := httptest.NewRecorder()
HandleFileUploads(response, httpRequest, nilHandler, &api.Response{TempPath: tempPath}, &SavedFileTracker{Request: httpRequest})
apiResponse := &api.Response{TempPath: tempPath}
preparer := &DefaultPreparer{}
opts, _, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
HandleFileUploads(response, httpRequest, nilHandler, apiResponse, &SavedFileTracker{Request: httpRequest}, opts)
testhelper.AssertResponseCode(t, response, testCase.code)
}
}
......@@ -441,7 +480,12 @@ func TestUploadHandlerRemovingExif(t *testing.T) {
response := httptest.NewRecorder()
handler := newProxy(ts.URL)
HandleFileUploads(response, httpRequest, handler, &api.Response{TempPath: tempPath}, &testFormProcessor{})
apiResponse := &api.Response{TempPath: tempPath}
preparer := &DefaultPreparer{}
opts, _, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
HandleFileUploads(response, httpRequest, handler, apiResponse, &testFormProcessor{}, opts)
testhelper.AssertResponseCode(t, response, 200)
}
......@@ -478,7 +522,12 @@ func TestUploadHandlerRemovingInvalidExif(t *testing.T) {
response := httptest.NewRecorder()
handler := newProxy(ts.URL)
HandleFileUploads(response, httpRequest, handler, &api.Response{TempPath: tempPath}, &testFormProcessor{})
apiResponse := &api.Response{TempPath: tempPath}
preparer := &DefaultPreparer{}
opts, _, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
HandleFileUploads(response, httpRequest, handler, apiResponse, &testFormProcessor{}, opts)
testhelper.AssertResponseCode(t, response, 422)
}
......
......@@ -14,6 +14,7 @@ import (
"gitlab.com/gitlab-org/gitlab-workhorse/internal/artifacts"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/builds"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/channel"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/git"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/lfs"
......@@ -42,6 +43,13 @@ type routeOptions struct {
matchers []matcherFunc
}
type uploadPreparers struct {
artifacts upload.Preparer
lfs upload.Preparer
packages upload.Preparer
uploads upload.Preparer
}
const (
apiPattern = `^/api/`
ciAPIPattern = `^/ci/api/`
......@@ -166,8 +174,9 @@ func (u *upstream) configureRoutes() {
signingTripper := secret.NewRoundTripper(u.RoundTripper, u.Version)
signingProxy := buildProxy(u.Backend, u.Version, signingTripper)
preparers := createUploadPreparers(u.Config)
uploadPath := path.Join(u.DocumentRoot, "uploads/tmp")
uploadAccelerateProxy := upload.Accelerate(&upload.SkipRailsAuthorizer{TempPath: uploadPath}, proxy)
uploadAccelerateProxy := upload.Accelerate(&upload.SkipRailsAuthorizer{TempPath: uploadPath}, proxy, preparers.uploads)
ciAPIProxyQueue := queueing.QueueRequests("ci_api_job_requests", uploadAccelerateProxy, u.APILimit, u.APIQueueLimit, u.APIQueueTimeout)
ciAPILongPolling := builds.RegisterHandler(ciAPIProxyQueue, redis.WatchKey, u.APICILongPollingDuration)
......@@ -185,11 +194,11 @@ func (u *upstream) configureRoutes() {
route("GET", gitProjectPattern+`info/refs\z`, git.GetInfoRefsHandler(api)),
route("POST", gitProjectPattern+`git-upload-pack\z`, contentEncodingHandler(git.UploadPack(api)), withMatcher(isContentType("application/x-git-upload-pack-request"))),
route("POST", gitProjectPattern+`git-receive-pack\z`, contentEncodingHandler(git.ReceivePack(api)), withMatcher(isContentType("application/x-git-receive-pack-request"))),
route("PUT", gitProjectPattern+`gitlab-lfs/objects/([0-9a-f]{64})/([0-9]+)\z`, lfs.PutStore(api, signingProxy), withMatcher(isContentType("application/octet-stream"))),
route("PUT", gitProjectPattern+`gitlab-lfs/objects/([0-9a-f]{64})/([0-9]+)\z`, lfs.PutStore(api, signingProxy, preparers.lfs), withMatcher(isContentType("application/octet-stream"))),
// CI Artifacts
route("POST", apiPattern+`v4/jobs/[0-9]+/artifacts\z`, contentEncodingHandler(artifacts.UploadArtifacts(api, signingProxy))),
route("POST", ciAPIPattern+`v1/builds/[0-9]+/artifacts\z`, contentEncodingHandler(artifacts.UploadArtifacts(api, signingProxy))),
route("POST", apiPattern+`v4/jobs/[0-9]+/artifacts\z`, contentEncodingHandler(artifacts.UploadArtifacts(api, signingProxy, preparers.artifacts))),
route("POST", ciAPIPattern+`v1/builds/[0-9]+/artifacts\z`, contentEncodingHandler(artifacts.UploadArtifacts(api, signingProxy, preparers.artifacts))),
// ActionCable websocket
wsRoute(`^/-/cable\z`, cableProxy),
......@@ -206,29 +215,29 @@ func (u *upstream) configureRoutes() {
route("", ciAPIPattern+`v1/builds/register.json\z`, ciAPILongPolling),
// Maven Artifact Repository
route("PUT", apiPattern+`v4/projects/[0-9]+/packages/maven/`, upload.BodyUploader(api, signingProxy, nil)),
route("PUT", apiPattern+`v4/projects/[0-9]+/packages/maven/`, upload.BodyUploader(api, signingProxy, preparers.packages)),
// Conan Artifact Repository
route("PUT", apiPattern+`v4/packages/conan/`, upload.BodyUploader(api, signingProxy, nil)),
route("PUT", apiPattern+`v4/packages/conan/`, upload.BodyUploader(api, signingProxy, preparers.packages)),
// NuGet Artifact Repository
route("PUT", apiPattern+`v4/projects/[0-9]+/packages/nuget/`, upload.Accelerate(api, signingProxy)),
route("PUT", apiPattern+`v4/projects/[0-9]+/packages/nuget/`, upload.Accelerate(api, signingProxy, preparers.packages)),
// PyPI Artifact Repository
route("POST", apiPattern+`v4/projects/[0-9]+/packages/pypi`, upload.Accelerate(api, signingProxy)),
route("POST", apiPattern+`v4/projects/[0-9]+/packages/pypi`, upload.Accelerate(api, signingProxy, preparers.packages)),
// We are porting API to disk acceleration
// we need to declare each routes until we have fixed all the routes on the rails codebase.
// Overall status can be seen at https://gitlab.com/groups/gitlab-org/-/epics/1802#current-status
route("POST", apiPattern+`v4/projects/[0-9]+/wikis/attachments\z`, uploadAccelerateProxy),
route("POST", apiPattern+`graphql\z`, uploadAccelerateProxy),
route("POST", apiPattern+`v4/groups/import`, upload.Accelerate(api, signingProxy)),
route("POST", apiPattern+`v4/projects/import`, upload.Accelerate(api, signingProxy)),
route("POST", apiPattern+`v4/groups/import`, upload.Accelerate(api, signingProxy, preparers.uploads)),
route("POST", apiPattern+`v4/projects/import`, upload.Accelerate(api, signingProxy, preparers.uploads)),
// Project Import via UI upload acceleration
route("POST", importPattern+`gitlab_project`, upload.Accelerate(api, signingProxy)),
route("POST", importPattern+`gitlab_project`, upload.Accelerate(api, signingProxy, preparers.uploads)),
// Group Import via UI upload acceleration
route("POST", importPattern+`gitlab_group`, upload.Accelerate(api, signingProxy)),
route("POST", importPattern+`gitlab_group`, upload.Accelerate(api, signingProxy, preparers.uploads)),
// Explicitly proxy API requests
route("", apiPattern, proxy),
......@@ -246,9 +255,9 @@ func (u *upstream) configureRoutes() {
),
// Uploads
route("POST", projectPattern+`uploads\z`, upload.Accelerate(api, signingProxy)),
route("POST", snippetUploadPattern, upload.Accelerate(api, signingProxy)),
route("POST", userUploadPattern, upload.Accelerate(api, signingProxy)),
route("POST", projectPattern+`uploads\z`, upload.Accelerate(api, signingProxy, preparers.uploads)),
route("POST", snippetUploadPattern, upload.Accelerate(api, signingProxy, preparers.uploads)),
route("POST", userUploadPattern, upload.Accelerate(api, signingProxy, preparers.uploads)),
// For legacy reasons, user uploads are stored under the document root.
// To prevent anybody who knows/guesses the URL of a user-uploaded file
......@@ -269,6 +278,17 @@ func (u *upstream) configureRoutes() {
}
}
func createUploadPreparers(cfg config.Config) uploadPreparers {
defaultPreparer := &upload.DefaultPreparer{}
return uploadPreparers{
artifacts: defaultPreparer,
lfs: lfs.NewLfsUploadPreparer(cfg),
packages: defaultPreparer,
uploads: defaultPreparer,
}
}
func denyWebsocket(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if websocket.IsWebSocketUpgrade(r) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment