Commit 8fcefd58 authored by Jacob Vosmaer's avatar Jacob Vosmaer Committed by Alessio Caiazza

Further simplify remote/local upload code

parent c8575e0b
---
title: Further simplify remote/local upload code
merge_request: 602
author:
type: changed
...@@ -98,10 +98,17 @@ func (fh *FileHandler) GitLabFinalizeFields(prefix string) (map[string]string, e ...@@ -98,10 +98,17 @@ func (fh *FileHandler) GitLabFinalizeFields(prefix string) (map[string]string, e
return data, nil return data, nil
} }
// Upload represents a destination where we store an upload
type uploadWriter interface {
io.WriteCloser
CloseWithError(error) error
ETag() string
}
// SaveFileFromReader persists the provided reader content to all the location specified in opts. A cleanup will be performed once ctx is Done // SaveFileFromReader persists the provided reader content to all the location specified in opts. A cleanup will be performed once ctx is Done
// Make sure the provided context will not expire before finalizing upload with GitLab Rails. // Make sure the provided context will not expire before finalizing upload with GitLab Rails.
func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts *SaveFileOpts) (fh *FileHandler, err error) { func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts *SaveFileOpts) (fh *FileHandler, err error) {
var remoteWriter objectstore.Upload var uploadWriter uploadWriter
fh = &FileHandler{ fh = &FileHandler{
Name: opts.TempFilePrefix, Name: opts.TempFilePrefix,
RemoteID: opts.RemoteID, RemoteID: opts.RemoteID,
...@@ -118,76 +125,65 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts ...@@ -118,76 +125,65 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts
}() }()
var clientMode string var clientMode string
if !opts.IsLocal() {
switch {
case opts.UseWorkhorseClientEnabled() && opts.ObjectStorageConfig.IsGoCloud():
clientMode = fmt.Sprintf("go_cloud:%s", opts.ObjectStorageConfig.Provider)
p := &objectstore.GoCloudObjectParams{
Ctx: ctx,
Mux: opts.ObjectStorageConfig.URLMux,
BucketURL: opts.ObjectStorageConfig.GoCloudConfig.URL,
ObjectName: opts.RemoteTempObjectID,
Deadline: opts.Deadline,
}
remoteWriter, err = objectstore.NewGoCloudObject(p)
case opts.UseWorkhorseClientEnabled() && opts.ObjectStorageConfig.IsAWS() && opts.ObjectStorageConfig.IsValid():
clientMode = "s3"
remoteWriter, err = objectstore.NewS3Object(
ctx,
opts.RemoteTempObjectID,
opts.ObjectStorageConfig.S3Credentials,
opts.ObjectStorageConfig.S3Config,
opts.Deadline,
)
case opts.IsMultipart():
clientMode = "multipart"
remoteWriter, err = objectstore.NewMultipart(
ctx,
opts.PresignedParts,
opts.PresignedCompleteMultipart,
opts.PresignedAbortMultipart,
opts.PresignedDelete,
opts.PutHeaders,
opts.Deadline,
opts.PartSize,
)
default:
clientMode = "http"
remoteWriter, err = objectstore.NewObject(
ctx,
opts.PresignedPut,
opts.PresignedDelete,
opts.PutHeaders,
opts.Deadline,
size,
)
}
if err != nil { switch {
return nil, err case opts.IsLocal():
clientMode = "local"
uploadWriter, err = fh.uploadLocalFile(ctx, opts)
case opts.UseWorkhorseClientEnabled() && opts.ObjectStorageConfig.IsGoCloud():
clientMode = fmt.Sprintf("go_cloud:%s", opts.ObjectStorageConfig.Provider)
p := &objectstore.GoCloudObjectParams{
Ctx: ctx,
Mux: opts.ObjectStorageConfig.URLMux,
BucketURL: opts.ObjectStorageConfig.GoCloudConfig.URL,
ObjectName: opts.RemoteTempObjectID,
Deadline: opts.Deadline,
} }
uploadWriter, err = objectstore.NewGoCloudObject(p)
case opts.UseWorkhorseClientEnabled() && opts.ObjectStorageConfig.IsAWS() && opts.ObjectStorageConfig.IsValid():
clientMode = "s3"
uploadWriter, err = objectstore.NewS3Object(
ctx,
opts.RemoteTempObjectID,
opts.ObjectStorageConfig.S3Credentials,
opts.ObjectStorageConfig.S3Config,
opts.Deadline,
)
case opts.IsMultipart():
clientMode = "multipart"
uploadWriter, err = objectstore.NewMultipart(
ctx,
opts.PresignedParts,
opts.PresignedCompleteMultipart,
opts.PresignedAbortMultipart,
opts.PresignedDelete,
opts.PutHeaders,
opts.Deadline,
opts.PartSize,
)
default:
clientMode = "http"
uploadWriter, err = objectstore.NewObject(
ctx,
opts.PresignedPut,
opts.PresignedDelete,
opts.PutHeaders,
opts.Deadline,
size,
)
}
writers = append(writers, remoteWriter) if err != nil {
return nil, err
}
defer func() { writers = append(writers, uploadWriter)
if err != nil {
remoteWriter.CloseWithError(err)
}
}()
} else {
clientMode = "local"
fileWriter, err := fh.uploadLocalFile(ctx, opts) defer func() {
if err != nil { if err != nil {
return nil, err uploadWriter.CloseWithError(err)
} }
}()
writers = append(writers, fileWriter)
}
if len(writers) == 1 {
return nil, errors.New("missing upload destination")
}
if opts.MaximumSize > 0 { if opts.MaximumSize > 0 {
if size > opts.MaximumSize { if size > opts.MaximumSize {
...@@ -233,24 +229,22 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts ...@@ -233,24 +229,22 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts
fh.hashes = hashes.finish() fh.hashes = hashes.finish()
if !opts.IsLocal() { // we need to close the writer in order to get ETag header
// we need to close the writer in order to get ETag header err = uploadWriter.Close()
err = remoteWriter.Close() if err != nil {
if err != nil { if err == objectstore.ErrNotEnoughParts {
if err == objectstore.ErrNotEnoughParts { return nil, ErrEntityTooLarge
return nil, ErrEntityTooLarge
}
return nil, err
} }
return nil, err
etag := remoteWriter.ETag()
fh.hashes["etag"] = etag
} }
etag := uploadWriter.ETag()
fh.hashes["etag"] = etag
return fh, err return fh, err
} }
func (fh *FileHandler) uploadLocalFile(ctx context.Context, opts *SaveFileOpts) (io.WriteCloser, error) { func (fh *FileHandler) uploadLocalFile(ctx context.Context, opts *SaveFileOpts) (uploadWriter, error) {
// make sure TempFolder exists // make sure TempFolder exists
err := os.MkdirAll(opts.LocalTempPath, 0700) err := os.MkdirAll(opts.LocalTempPath, 0700)
if err != nil { if err != nil {
...@@ -268,9 +262,14 @@ func (fh *FileHandler) uploadLocalFile(ctx context.Context, opts *SaveFileOpts) ...@@ -268,9 +262,14 @@ func (fh *FileHandler) uploadLocalFile(ctx context.Context, opts *SaveFileOpts)
}() }()
fh.LocalPath = file.Name() fh.LocalPath = file.Name()
return file, nil return &nopUpload{file}, nil
} }
type nopUpload struct{ io.WriteCloser }
func (nop *nopUpload) CloseWithError(error) error { return nop.Close() }
func (nop *nopUpload) ETag() string { return "" }
// SaveFileFromDisk open the local file fileName and calls SaveFileFromReader // SaveFileFromDisk open the local file fileName and calls SaveFileFromReader
func SaveFileFromDisk(ctx context.Context, fileName string, opts *SaveFileOpts) (fh *FileHandler, err error) { func SaveFileFromDisk(ctx context.Context, fileName string, opts *SaveFileOpts) (fh *FileHandler, err error) {
file, err := os.Open(fileName) file, err := os.Open(fileName)
......
...@@ -280,14 +280,14 @@ func TestSaveFile(t *testing.T) { ...@@ -280,14 +280,14 @@ func TestSaveFile(t *testing.T) {
fields, err := fh.GitLabFinalizeFields("file") fields, err := fh.GitLabFinalizeFields("file")
require.NoError(t, err) require.NoError(t, err)
checkFileHandlerWithFields(t, fh, fields, "file", spec.remote == notRemote) checkFileHandlerWithFields(t, fh, fields, "file")
token, jwtErr := jwt.ParseWithClaims(fields["file.gitlab-workhorse-upload"], &testhelper.UploadClaims{}, testhelper.ParseJWT) token, jwtErr := jwt.ParseWithClaims(fields["file.gitlab-workhorse-upload"], &testhelper.UploadClaims{}, testhelper.ParseJWT)
require.NoError(t, jwtErr) require.NoError(t, jwtErr)
uploadFields := token.Claims.(*testhelper.UploadClaims).Upload uploadFields := token.Claims.(*testhelper.UploadClaims).Upload
checkFileHandlerWithFields(t, fh, uploadFields, "", spec.remote == notRemote) checkFileHandlerWithFields(t, fh, uploadFields, "")
}) })
} }
} }
...@@ -395,7 +395,7 @@ func TestSaveMultipartInBodyFailure(t *testing.T) { ...@@ -395,7 +395,7 @@ func TestSaveMultipartInBodyFailure(t *testing.T) {
require.EqualError(t, err, test.MultipartUploadInternalError().Error()) require.EqualError(t, err, test.MultipartUploadInternalError().Error())
} }
func checkFileHandlerWithFields(t *testing.T, fh *filestore.FileHandler, fields map[string]string, prefix string, remote bool) { func checkFileHandlerWithFields(t *testing.T, fh *filestore.FileHandler, fields map[string]string, prefix string) {
key := func(field string) string { key := func(field string) string {
if prefix == "" { if prefix == "" {
return field return field
...@@ -413,9 +413,5 @@ func checkFileHandlerWithFields(t *testing.T, fh *filestore.FileHandler, fields ...@@ -413,9 +413,5 @@ func checkFileHandlerWithFields(t *testing.T, fh *filestore.FileHandler, fields
require.Equal(t, test.ObjectSHA1, fields[key("sha1")]) require.Equal(t, test.ObjectSHA1, fields[key("sha1")])
require.Equal(t, test.ObjectSHA256, fields[key("sha256")]) require.Equal(t, test.ObjectSHA256, fields[key("sha256")])
require.Equal(t, test.ObjectSHA512, fields[key("sha512")]) require.Equal(t, test.ObjectSHA512, fields[key("sha512")])
if remote { require.Contains(t, fields, key("etag"))
require.NotContains(t, fields, key("etag"))
} else {
require.Contains(t, fields, key("etag"))
}
} }
...@@ -14,13 +14,6 @@ import ( ...@@ -14,13 +14,6 @@ import (
"gitlab.com/gitlab-org/labkit/log" "gitlab.com/gitlab-org/labkit/log"
) )
// Upload represents an upload to an ObjectStorage provider
type Upload interface {
io.WriteCloser
CloseWithError(error) error
ETag() string
}
// uploader is an io.WriteCloser that can be used as write end of the uploading pipe. // uploader is an io.WriteCloser that can be used as write end of the uploading pipe.
type uploader struct { type uploader struct {
// etag is the object storage provided checksum // etag is the object storage provided checksum
......
...@@ -123,7 +123,7 @@ func TestUploadHandlerRewritingMultiPartData(t *testing.T) { ...@@ -123,7 +123,7 @@ func TestUploadHandlerRewritingMultiPartData(t *testing.T) {
require.Equal(t, hash, r.FormValue("file."+algo), "file hash %s", algo) require.Equal(t, hash, r.FormValue("file."+algo), "file hash %s", algo)
} }
require.Len(t, r.MultipartForm.Value, 11, "multipart form values") require.Len(t, r.MultipartForm.Value, 12, "multipart form values")
w.WriteHeader(202) w.WriteHeader(202)
fmt.Fprint(w, "RESPONSE") fmt.Fprint(w, "RESPONSE")
......
...@@ -79,7 +79,7 @@ func uploadTestServer(t *testing.T, extraTests func(r *http.Request)) *httptest. ...@@ -79,7 +79,7 @@ func uploadTestServer(t *testing.T, extraTests func(r *http.Request)) *httptest.
require.NoError(t, r.ParseMultipartForm(100000)) require.NoError(t, r.ParseMultipartForm(100000))
const nValues = 10 // file name, path, remote_url, remote_id, size, md5, sha1, sha256, sha512, gitlab-workhorse-upload for just the upload (no metadata because we are not POSTing a valid zip file) const nValues = 11 // file name, path, remote_url, remote_id, size, md5, sha1, sha256, sha512, gitlab-workhorse-upload, etag for just the upload (no metadata because we are not POSTing a valid zip file)
require.Len(t, r.MultipartForm.Value, nValues) require.Len(t, r.MultipartForm.Value, nValues)
require.Empty(t, r.MultipartForm.File, "multipart form files") require.Empty(t, r.MultipartForm.File, "multipart form files")
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment