Commit 30e944a4 authored by Jacob Vosmaer's avatar Jacob Vosmaer

Merge branch '351657-refactor-upload-destinations' into 'master'

Refactor upload destinations

See merge request gitlab-org/gitlab!80292
parents d9ff351f 95d9c5fd
......@@ -17,8 +17,8 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore/test"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
)
func createTestZipArchive(t *testing.T) (data []byte, md5Hash string) {
......
......@@ -19,10 +19,10 @@ import (
"gitlab.com/gitlab-org/labkit/log"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/proxy"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upstream/roundtripper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/zipartifacts"
......@@ -60,7 +60,7 @@ func testArtifactsUploadServer(t *testing.T, authResponse *api.Response, bodyPro
w.Write(data)
})
mux.HandleFunc(Path, func(w http.ResponseWriter, r *http.Request) {
opts, err := filestore.GetOpts(authResponse)
opts, err := destination.GetOpts(authResponse)
require.NoError(t, err)
if r.Method != "POST" {
......
......@@ -16,8 +16,8 @@ import (
"gitlab.com/gitlab-org/labkit/log"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/zipartifacts"
)
......@@ -35,7 +35,7 @@ var zipSubcommandsErrorsCounter = promauto.NewCounterVec(
}, []string{"error"})
type artifactsUploadProcessor struct {
opts *filestore.SaveFileOpts
opts *destination.UploadOpts
format string
SavedFileTracker
......@@ -57,11 +57,11 @@ func Artifacts(myAPI *api.API, h http.Handler, p Preparer) http.Handler {
}, "/authorize")
}
func (a *artifactsUploadProcessor) generateMetadataFromZip(ctx context.Context, file *filestore.FileHandler) (*filestore.FileHandler, error) {
func (a *artifactsUploadProcessor) generateMetadataFromZip(ctx context.Context, file *destination.FileHandler) (*destination.FileHandler, error) {
metaReader, metaWriter := io.Pipe()
defer metaWriter.Close()
metaOpts := &filestore.SaveFileOpts{
metaOpts := &destination.UploadOpts{
LocalTempPath: a.opts.LocalTempPath,
TempFilePrefix: "metadata.gz",
}
......@@ -86,12 +86,12 @@ func (a *artifactsUploadProcessor) generateMetadataFromZip(ctx context.Context,
type saveResult struct {
error
*filestore.FileHandler
*destination.FileHandler
}
done := make(chan saveResult)
go func() {
var result saveResult
result.FileHandler, result.error = filestore.SaveFileFromReader(ctx, metaReader, -1, metaOpts)
result.FileHandler, result.error = destination.Upload(ctx, metaReader, -1, metaOpts)
done <- result
}()
......@@ -119,7 +119,7 @@ func (a *artifactsUploadProcessor) generateMetadataFromZip(ctx context.Context,
return result.FileHandler, result.error
}
func (a *artifactsUploadProcessor) ProcessFile(ctx context.Context, formName string, file *filestore.FileHandler, writer *multipart.Writer) error {
func (a *artifactsUploadProcessor) ProcessFile(ctx context.Context, formName string, file *destination.FileHandler, writer *multipart.Writer) error {
// ProcessFile for artifacts requires file form-data field name to eq `file`
if formName != "file" {
......
......@@ -8,8 +8,8 @@ import (
"strings"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
)
// RequestBody is a request middleware. It will store the request body to
......@@ -23,7 +23,7 @@ func RequestBody(rails PreAuthorizer, h http.Handler, p Preparer) http.Handler {
return
}
fh, err := filestore.SaveFileFromReader(r.Context(), r.Body, r.ContentLength, opts)
fh, err := destination.Upload(r.Context(), r.Body, r.ContentLength, opts)
if err != nil {
helper.Fail500(w, r, fmt.Errorf("RequestBody: upload failed: %v", err))
return
......
......@@ -15,8 +15,8 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
)
const (
......@@ -169,8 +169,8 @@ type alwaysLocalPreparer struct {
prepareError error
}
func (a *alwaysLocalPreparer) Prepare(_ *api.Response) (*filestore.SaveFileOpts, Verifier, error) {
opts, err := filestore.GetOpts(&api.Response{TempPath: os.TempDir()})
func (a *alwaysLocalPreparer) Prepare(_ *api.Response) (*destination.UploadOpts, Verifier, error) {
opts, err := destination.GetOpts(&api.Response{TempPath: os.TempDir()})
if err != nil {
return nil, nil, err
}
......@@ -180,7 +180,7 @@ func (a *alwaysLocalPreparer) Prepare(_ *api.Response) (*filestore.SaveFileOpts,
type alwaysFailsVerifier struct{}
func (alwaysFailsVerifier) Verify(handler *filestore.FileHandler) error {
func (alwaysFailsVerifier) Verify(handler *destination.FileHandler) error {
return fmt.Errorf("Verification failed")
}
......@@ -188,7 +188,7 @@ type mockVerifier struct {
invoked bool
}
func (m *mockVerifier) Verify(handler *filestore.FileHandler) error {
func (m *mockVerifier) Verify(handler *destination.FileHandler) error {
m.invoked = true
return nil
......
package filestore
package destination
import (
"context"
......@@ -14,8 +14,9 @@ import (
"gitlab.com/gitlab-org/labkit/log"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/secret"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
)
type SizeError error
......@@ -107,9 +108,9 @@ type consumer interface {
Consume(context.Context, io.Reader, time.Time) (int64, error)
}
// SaveFileFromReader persists the provided reader content to all the location specified in opts. A cleanup will be performed once ctx is Done
// Upload persists the provided reader content to all the location specified in opts. A cleanup will be performed once ctx is Done
// Make sure the provided context will not expire before finalizing upload with GitLab Rails.
func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts *SaveFileOpts) (*FileHandler, error) {
func Upload(ctx context.Context, reader io.Reader, size int64, opts *UploadOpts) (*FileHandler, error) {
fh := &FileHandler{
Name: opts.TempFilePrefix,
RemoteID: opts.RemoteID,
......@@ -126,7 +127,7 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts
switch {
case opts.IsLocal():
clientMode = "local"
uploadDestination, err = fh.uploadLocalFile(ctx, opts)
uploadDestination, err = fh.newLocalFile(ctx, opts)
case opts.UseWorkhorseClientEnabled() && opts.ObjectStorageConfig.IsGoCloud():
clientMode = fmt.Sprintf("go_cloud:%s", opts.ObjectStorageConfig.Provider)
p := &objectstore.GoCloudObjectParams{
......@@ -210,16 +211,16 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts
return fh, nil
}
func (fh *FileHandler) uploadLocalFile(ctx context.Context, opts *SaveFileOpts) (consumer, error) {
func (fh *FileHandler) newLocalFile(ctx context.Context, opts *UploadOpts) (consumer, error) {
// make sure TempFolder exists
err := os.MkdirAll(opts.LocalTempPath, 0700)
if err != nil {
return nil, fmt.Errorf("uploadLocalFile: mkdir %q: %v", opts.LocalTempPath, err)
return nil, fmt.Errorf("newLocalFile: mkdir %q: %v", opts.LocalTempPath, err)
}
file, err := ioutil.TempFile(opts.LocalTempPath, opts.TempFilePrefix)
if err != nil {
return nil, fmt.Errorf("uploadLocalFile: create file: %v", err)
return nil, fmt.Errorf("newLocalFile: create file: %v", err)
}
go func() {
......@@ -228,32 +229,5 @@ func (fh *FileHandler) uploadLocalFile(ctx context.Context, opts *SaveFileOpts)
}()
fh.LocalPath = file.Name()
return &localUpload{file}, nil
}
type localUpload struct{ io.WriteCloser }
func (loc *localUpload) Consume(_ context.Context, r io.Reader, _ time.Time) (int64, error) {
n, err := io.Copy(loc.WriteCloser, r)
errClose := loc.Close()
if err == nil {
err = errClose
}
return n, err
}
// SaveFileFromDisk open the local file fileName and calls SaveFileFromReader
func SaveFileFromDisk(ctx context.Context, fileName string, opts *SaveFileOpts) (fh *FileHandler, err error) {
file, err := os.Open(fileName)
if err != nil {
return nil, err
}
defer file.Close()
fi, err := file.Stat()
if err != nil {
return nil, err
}
return SaveFileFromReader(ctx, file, fi.Size(), opts)
return &filestore.LocalFile{File: file}, nil
}
package filestore_test
package destination_test
import (
"context"
......@@ -17,13 +17,13 @@ import (
"gocloud.dev/blob"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore/test"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
)
func testDeadline() time.Time {
return time.Now().Add(filestore.DefaultObjectStoreTimeout)
return time.Now().Add(destination.DefaultObjectStoreTimeout)
}
func requireFileGetsRemovedAsync(t *testing.T, filePath string) {
......@@ -39,7 +39,7 @@ func requireObjectStoreDeletedAsync(t *testing.T, expectedDeletes int, osStub *t
require.Eventually(t, func() bool { return osStub.DeletesCnt() == expectedDeletes }, time.Second, time.Millisecond, "Object not deleted")
}
func TestSaveFileWrongSize(t *testing.T) {
func TestUploadWrongSize(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
......@@ -47,15 +47,15 @@ func TestSaveFileWrongSize(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(tmpFolder)
opts := &filestore.SaveFileOpts{LocalTempPath: tmpFolder, TempFilePrefix: "test-file"}
fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize+1, opts)
opts := &destination.UploadOpts{LocalTempPath: tmpFolder, TempFilePrefix: "test-file"}
fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize+1, opts)
require.Error(t, err)
_, isSizeError := err.(filestore.SizeError)
_, isSizeError := err.(destination.SizeError)
require.True(t, isSizeError, "Should fail with SizeError")
require.Nil(t, fh)
}
func TestSaveFileWithKnownSizeExceedLimit(t *testing.T) {
func TestUploadWithKnownSizeExceedLimit(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
......@@ -63,15 +63,15 @@ func TestSaveFileWithKnownSizeExceedLimit(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(tmpFolder)
opts := &filestore.SaveFileOpts{LocalTempPath: tmpFolder, TempFilePrefix: "test-file", MaximumSize: test.ObjectSize - 1}
fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, opts)
opts := &destination.UploadOpts{LocalTempPath: tmpFolder, TempFilePrefix: "test-file", MaximumSize: test.ObjectSize - 1}
fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, opts)
require.Error(t, err)
_, isSizeError := err.(filestore.SizeError)
_, isSizeError := err.(destination.SizeError)
require.True(t, isSizeError, "Should fail with SizeError")
require.Nil(t, fh)
}
func TestSaveFileWithUnknownSizeExceedLimit(t *testing.T) {
func TestUploadWithUnknownSizeExceedLimit(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
......@@ -79,22 +79,13 @@ func TestSaveFileWithUnknownSizeExceedLimit(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(tmpFolder)
opts := &filestore.SaveFileOpts{LocalTempPath: tmpFolder, TempFilePrefix: "test-file", MaximumSize: test.ObjectSize - 1}
fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), -1, opts)
require.Equal(t, err, filestore.ErrEntityTooLarge)
opts := &destination.UploadOpts{LocalTempPath: tmpFolder, TempFilePrefix: "test-file", MaximumSize: test.ObjectSize - 1}
fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), -1, opts)
require.Equal(t, err, destination.ErrEntityTooLarge)
require.Nil(t, fh)
}
func TestSaveFromDiskNotExistingFile(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fh, err := filestore.SaveFileFromDisk(ctx, "/I/do/not/exist", &filestore.SaveFileOpts{})
require.Error(t, err, "SaveFileFromDisk should fail")
require.True(t, os.IsNotExist(err), "Provided file should not exists")
require.Nil(t, fh, "On error FileHandler should be nil")
}
func TestSaveFileWrongETag(t *testing.T) {
func TestUploadWrongETag(t *testing.T) {
tests := []struct {
name string
multipart bool
......@@ -110,7 +101,7 @@ func TestSaveFileWrongETag(t *testing.T) {
objectURL := ts.URL + test.ObjectPath
opts := &filestore.SaveFileOpts{
opts := &destination.UploadOpts{
RemoteID: "test-file",
RemoteURL: objectURL,
PresignedPut: objectURL + "?Signature=ASignature",
......@@ -126,7 +117,7 @@ func TestSaveFileWrongETag(t *testing.T) {
osStub.InitiateMultipartUpload(test.ObjectPath)
}
ctx, cancel := context.WithCancel(context.Background())
fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, opts)
fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, opts)
require.Nil(t, fh)
require.Error(t, err)
require.Equal(t, 1, osStub.PutsCnt(), "File not uploaded")
......@@ -138,32 +129,7 @@ func TestSaveFileWrongETag(t *testing.T) {
}
}
func TestSaveFileFromDiskToLocalPath(t *testing.T) {
f, err := ioutil.TempFile("", "workhorse-test")
require.NoError(t, err)
defer os.Remove(f.Name())
_, err = fmt.Fprint(f, test.ObjectContent)
require.NoError(t, err)
tmpFolder, err := ioutil.TempDir("", "workhorse-test-tmp")
require.NoError(t, err)
defer os.RemoveAll(tmpFolder)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
opts := &filestore.SaveFileOpts{LocalTempPath: tmpFolder}
fh, err := filestore.SaveFileFromDisk(ctx, f.Name(), opts)
require.NoError(t, err)
require.NotNil(t, fh)
require.NotEmpty(t, fh.LocalPath, "File not persisted on disk")
_, err = os.Stat(fh.LocalPath)
require.NoError(t, err)
}
func TestSaveFile(t *testing.T) {
func TestUpload(t *testing.T) {
testhelper.ConfigureSecret()
type remote int
......@@ -189,7 +155,7 @@ func TestSaveFile(t *testing.T) {
for _, spec := range tests {
t.Run(spec.name, func(t *testing.T) {
var opts filestore.SaveFileOpts
var opts destination.UploadOpts
var expectedDeletes, expectedPuts int
osStub, ts := test.StartObjectStore()
......@@ -231,7 +197,7 @@ func TestSaveFile(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
require.NoError(t, err)
require.NotNil(t, fh)
......@@ -279,7 +245,7 @@ func TestSaveFile(t *testing.T) {
}
}
func TestSaveFileWithS3WorkhorseClient(t *testing.T) {
func TestUploadWithS3WorkhorseClient(t *testing.T) {
tests := []struct {
name string
objectSize int64
......@@ -298,7 +264,7 @@ func TestSaveFileWithS3WorkhorseClient(t *testing.T) {
name: "unknown object size with limit",
objectSize: -1,
maxSize: test.ObjectSize - 1,
expectedErr: filestore.ErrEntityTooLarge,
expectedErr: destination.ErrEntityTooLarge,
},
}
......@@ -312,12 +278,12 @@ func TestSaveFileWithS3WorkhorseClient(t *testing.T) {
defer cancel()
remoteObject := "tmp/test-file/1"
opts := filestore.SaveFileOpts{
opts := destination.UploadOpts{
RemoteID: "test-file",
Deadline: testDeadline(),
UseWorkhorseClient: true,
RemoteTempObjectID: remoteObject,
ObjectStorageConfig: filestore.ObjectStorageConfig{
ObjectStorageConfig: destination.ObjectStorageConfig{
Provider: "AWS",
S3Credentials: s3Creds,
S3Config: s3Config,
......@@ -325,7 +291,7 @@ func TestSaveFileWithS3WorkhorseClient(t *testing.T) {
MaximumSize: tc.maxSize,
}
_, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), tc.objectSize, &opts)
_, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), tc.objectSize, &opts)
if tc.expectedErr == nil {
require.NoError(t, err)
......@@ -338,7 +304,7 @@ func TestSaveFileWithS3WorkhorseClient(t *testing.T) {
}
}
func TestSaveFileWithAzureWorkhorseClient(t *testing.T) {
func TestUploadWithAzureWorkhorseClient(t *testing.T) {
mux, bucketDir, cleanup := test.SetupGoCloudFileBucket(t, "azblob")
defer cleanup()
......@@ -346,48 +312,48 @@ func TestSaveFileWithAzureWorkhorseClient(t *testing.T) {
defer cancel()
remoteObject := "tmp/test-file/1"
opts := filestore.SaveFileOpts{
opts := destination.UploadOpts{
RemoteID: "test-file",
Deadline: testDeadline(),
UseWorkhorseClient: true,
RemoteTempObjectID: remoteObject,
ObjectStorageConfig: filestore.ObjectStorageConfig{
ObjectStorageConfig: destination.ObjectStorageConfig{
Provider: "AzureRM",
URLMux: mux,
GoCloudConfig: config.GoCloudConfig{URL: "azblob://test-container"},
},
}
_, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
_, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
require.NoError(t, err)
test.GoCloudObjectExists(t, bucketDir, remoteObject)
}
func TestSaveFileWithUnknownGoCloudScheme(t *testing.T) {
func TestUploadWithUnknownGoCloudScheme(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mux := new(blob.URLMux)
remoteObject := "tmp/test-file/1"
opts := filestore.SaveFileOpts{
opts := destination.UploadOpts{
RemoteID: "test-file",
Deadline: testDeadline(),
UseWorkhorseClient: true,
RemoteTempObjectID: remoteObject,
ObjectStorageConfig: filestore.ObjectStorageConfig{
ObjectStorageConfig: destination.ObjectStorageConfig{
Provider: "SomeCloud",
URLMux: mux,
GoCloudConfig: config.GoCloudConfig{URL: "foo://test-container"},
},
}
_, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
_, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
require.Error(t, err)
}
func TestSaveMultipartInBodyFailure(t *testing.T) {
func TestUploadMultipartInBodyFailure(t *testing.T) {
osStub, ts := test.StartObjectStore()
defer ts.Close()
......@@ -395,7 +361,7 @@ func TestSaveMultipartInBodyFailure(t *testing.T) {
// this is the only way to get an in-body failure from our ObjectStoreStub
objectPath := "/bucket-but-no-object-key"
objectURL := ts.URL + objectPath
opts := filestore.SaveFileOpts{
opts := destination.UploadOpts{
RemoteID: "test-file",
RemoteURL: objectURL,
PartSize: test.ObjectSize,
......@@ -409,13 +375,13 @@ func TestSaveMultipartInBodyFailure(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
require.Nil(t, fh)
require.Error(t, err)
require.EqualError(t, err, test.MultipartUploadInternalError().Error())
}
func TestSaveRemoteFileWithLimit(t *testing.T) {
func TestUploadRemoteFileWithLimit(t *testing.T) {
testhelper.ConfigureSecret()
type remote int
......@@ -449,20 +415,20 @@ func TestSaveRemoteFileWithLimit(t *testing.T) {
testData: test.ObjectContent,
objectSize: -1,
maxSize: test.ObjectSize - 1,
expectedErr: filestore.ErrEntityTooLarge,
expectedErr: destination.ErrEntityTooLarge,
},
{
name: "large object with unknown size with limit",
testData: string(make([]byte, 20000)),
objectSize: -1,
maxSize: 19000,
expectedErr: filestore.ErrEntityTooLarge,
expectedErr: destination.ErrEntityTooLarge,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var opts filestore.SaveFileOpts
var opts destination.UploadOpts
for _, remoteType := range remoteTypes {
tmpFolder, err := ioutil.TempDir("", "workhorse-test-tmp")
......@@ -502,7 +468,7 @@ func TestSaveRemoteFileWithLimit(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(tc.testData), tc.objectSize, &opts)
fh, err := destination.Upload(ctx, strings.NewReader(tc.testData), tc.objectSize, &opts)
if tc.expectedErr == nil {
require.NoError(t, err)
......@@ -516,7 +482,7 @@ func TestSaveRemoteFileWithLimit(t *testing.T) {
}
}
func checkFileHandlerWithFields(t *testing.T, fh *filestore.FileHandler, fields map[string]string, prefix string) {
func checkFileHandlerWithFields(t *testing.T, fh *destination.FileHandler, fields map[string]string, prefix string) {
key := func(field string) string {
if prefix == "" {
return field
......
package filestore
import (
"context"
"io"
"time"
)
type LocalFile struct {
File io.WriteCloser
}
func (lf *LocalFile) Consume(_ context.Context, r io.Reader, _ time.Time) (int64, error) {
n, err := io.Copy(lf.File, r)
errClose := lf.File.Close()
if err == nil {
err = errClose
}
return n, err
}
package filestore
import (
"context"
"io/ioutil"
"os"
"strings"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestConsume(t *testing.T) {
f, err := ioutil.TempFile("", "filestore-local-file")
if f != nil {
defer os.Remove(f.Name())
}
require.NoError(t, err)
defer f.Close()
localFile := &LocalFile{File: f}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
content := "file content"
reader := strings.NewReader(content)
var deadline time.Time
n, err := localFile.Consume(ctx, reader, deadline)
require.NoError(t, err)
require.Equal(t, int64(len(content)), n)
consumedContent, err := ioutil.ReadFile(f.Name())
require.NoError(t, err)
require.Equal(t, content, string(consumedContent))
}
......@@ -9,9 +9,9 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore/test"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
)
func TestGoCloudObjectUpload(t *testing.T) {
......
......@@ -11,8 +11,8 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore/test"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
)
func TestMultipartUploadWithUpcaseETags(t *testing.T) {
......
......@@ -11,8 +11,8 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore/test"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
)
const testTimeout = 10 * time.Second
......
......@@ -18,9 +18,9 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore/test"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
)
type failedReader struct {
......
......@@ -13,7 +13,7 @@ import (
"strings"
"sync"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
)
type partsEtagMap map[int]string
......
package filestore
package destination
import (
"errors"
......@@ -27,8 +27,8 @@ type ObjectStorageConfig struct {
GoCloudConfig config.GoCloudConfig
}
// SaveFileOpts represents all the options available for saving a file to object store
type SaveFileOpts struct {
// UploadOpts represents all the options available for saving a file to object store
type UploadOpts struct {
// TempFilePrefix is the prefix used to create temporary local file
TempFilePrefix string
// LocalTempPath is the directory where to write a local copy of the file
......@@ -66,28 +66,28 @@ type SaveFileOpts struct {
}
// UseWorkhorseClientEnabled checks if the options require direct access to object storage
func (s *SaveFileOpts) UseWorkhorseClientEnabled() bool {
func (s *UploadOpts) UseWorkhorseClientEnabled() bool {
return s.UseWorkhorseClient && s.ObjectStorageConfig.IsValid() && s.RemoteTempObjectID != ""
}
// IsLocal checks if the options require the writing of the file on disk
func (s *SaveFileOpts) IsLocal() bool {
func (s *UploadOpts) IsLocal() bool {
return s.LocalTempPath != ""
}
// IsMultipart checks if the options requires a Multipart upload
func (s *SaveFileOpts) IsMultipart() bool {
func (s *UploadOpts) IsMultipart() bool {
return s.PartSize > 0
}
// GetOpts converts GitLab api.Response to a proper SaveFileOpts
func GetOpts(apiResponse *api.Response) (*SaveFileOpts, error) {
// GetOpts converts GitLab api.Response to a proper UploadOpts
func GetOpts(apiResponse *api.Response) (*UploadOpts, error) {
timeout := time.Duration(apiResponse.RemoteObject.Timeout) * time.Second
if timeout == 0 {
timeout = DefaultObjectStoreTimeout
}
opts := SaveFileOpts{
opts := UploadOpts{
LocalTempPath: apiResponse.TempPath,
RemoteID: apiResponse.RemoteObject.ID,
RemoteURL: apiResponse.RemoteObject.GetURL,
......
package filestore_test
package destination_test
import (
"testing"
......@@ -8,11 +8,11 @@ import (
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore/test"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
)
func TestSaveFileOptsLocalAndRemote(t *testing.T) {
func TestUploadOptsLocalAndRemote(t *testing.T) {
tests := []struct {
name string
localTempPath string
......@@ -43,7 +43,7 @@ func TestSaveFileOptsLocalAndRemote(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
opts := filestore.SaveFileOpts{
opts := destination.UploadOpts{
LocalTempPath: test.localTempPath,
PresignedPut: test.presignedPut,
PartSize: test.partSize,
......@@ -106,7 +106,7 @@ func TestGetOpts(t *testing.T) {
},
}
deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second)
opts, err := filestore.GetOpts(apiResponse)
opts, err := destination.GetOpts(apiResponse)
require.NoError(t, err)
require.Equal(t, apiResponse.TempPath, opts.LocalTempPath)
......@@ -155,22 +155,22 @@ func TestGetOptsFail(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
_, err := filestore.GetOpts(tc.in)
_, err := destination.GetOpts(tc.in)
require.Error(t, err, "expect input to be rejected")
})
}
}
func TestGetOptsDefaultTimeout(t *testing.T) {
deadline := time.Now().Add(filestore.DefaultObjectStoreTimeout)
opts, err := filestore.GetOpts(&api.Response{TempPath: "/foo/bar"})
deadline := time.Now().Add(destination.DefaultObjectStoreTimeout)
opts, err := destination.GetOpts(&api.Response{TempPath: "/foo/bar"})
require.NoError(t, err)
require.WithinDuration(t, deadline, opts.Deadline, time.Minute)
}
func TestUseWorkhorseClientEnabled(t *testing.T) {
cfg := filestore.ObjectStorageConfig{
cfg := destination.ObjectStorageConfig{
Provider: "AWS",
S3Config: config.S3Config{
Bucket: "test-bucket",
......@@ -195,7 +195,7 @@ func TestUseWorkhorseClientEnabled(t *testing.T) {
name string
UseWorkhorseClient bool
remoteTempObjectID string
objectStorageConfig filestore.ObjectStorageConfig
objectStorageConfig destination.ObjectStorageConfig
expected bool
}{
{
......@@ -243,7 +243,7 @@ func TestUseWorkhorseClientEnabled(t *testing.T) {
name: "missing S3 bucket",
UseWorkhorseClient: true,
remoteTempObjectID: "test-object",
objectStorageConfig: filestore.ObjectStorageConfig{
objectStorageConfig: destination.ObjectStorageConfig{
Provider: "AWS",
S3Config: config.S3Config{},
},
......@@ -269,7 +269,7 @@ func TestUseWorkhorseClientEnabled(t *testing.T) {
},
}
deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second)
opts, err := filestore.GetOpts(apiResponse)
opts, err := destination.GetOpts(apiResponse)
require.NoError(t, err)
opts.ObjectStorageConfig = test.objectStorageConfig
......@@ -323,7 +323,7 @@ func TestGoCloudConfig(t *testing.T) {
},
}
deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second)
opts, err := filestore.GetOpts(apiResponse)
opts, err := destination.GetOpts(apiResponse)
require.NoError(t, err)
opts.ObjectStorageConfig.URLMux = mux
......
......@@ -5,7 +5,7 @@ import (
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
)
type object struct {
......@@ -13,7 +13,7 @@ type object struct {
oid string
}
func (l *object) Verify(fh *filestore.FileHandler) error {
func (l *object) Verify(fh *destination.FileHandler) error {
if fh.Size != l.size {
return fmt.Errorf("LFSObject: expected size %d, wrote %d", l.size, fh.Size)
}
......@@ -35,7 +35,7 @@ func NewLfsPreparer(c config.Config, objectPreparer Preparer) Preparer {
return &uploadPreparer{objectPreparer: objectPreparer}
}
func (l *uploadPreparer) Prepare(a *api.Response) (*filestore.SaveFileOpts, Verifier, error) {
func (l *uploadPreparer) Prepare(a *api.Response) (*destination.UploadOpts, Verifier, error) {
opts, _, err := l.objectPreparer.Prepare(a)
if err != nil {
return nil, nil, err
......
......@@ -3,7 +3,7 @@ package upload
import (
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
)
type ObjectStoragePreparer struct {
......@@ -18,8 +18,8 @@ func NewObjectStoragePreparer(c config.Config) Preparer {
return &ObjectStoragePreparer{credentials: c.ObjectStorageCredentials, config: c.ObjectStorageConfig}
}
func (p *ObjectStoragePreparer) Prepare(a *api.Response) (*filestore.SaveFileOpts, Verifier, error) {
opts, err := filestore.GetOpts(a)
func (p *ObjectStoragePreparer) Prepare(a *api.Response) (*destination.UploadOpts, Verifier, error) {
opts, err := destination.GetOpts(a)
if err != nil {
return nil, nil, err
}
......
......@@ -2,7 +2,7 @@ package upload
import (
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
)
// Verifier is an optional pluggable behavior for upload paths. If
......@@ -12,22 +12,22 @@ import (
// checksum of the uploaded file.
type Verifier interface {
// Verify can abort the upload by returning an error
Verify(handler *filestore.FileHandler) error
Verify(handler *destination.FileHandler) error
}
// Preparer is a pluggable behavior that interprets a Rails API response
// and either tells Workhorse how to handle the upload, via the
// SaveFileOpts and Verifier, or it rejects the request by returning a
// UploadOpts and Verifier, or it rejects the request by returning a
// non-nil error. Its intended use is to make sure the upload gets stored
// in the right location: either a local directory, or one of several
// supported object storage backends.
type Preparer interface {
Prepare(a *api.Response) (*filestore.SaveFileOpts, Verifier, error)
Prepare(a *api.Response) (*destination.UploadOpts, Verifier, error)
}
type DefaultPreparer struct{}
func (s *DefaultPreparer) Prepare(a *api.Response) (*filestore.SaveFileOpts, Verifier, error) {
opts, err := filestore.GetOpts(a)
func (s *DefaultPreparer) Prepare(a *api.Response) (*destination.UploadOpts, Verifier, error) {
opts, err := destination.GetOpts(a)
return opts, nil, err
}
......@@ -21,8 +21,8 @@ import (
"golang.org/x/image/tiff"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/lsif_transformer/parser"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/exif"
)
......@@ -68,7 +68,7 @@ type rewriter struct {
finalizedFields map[string]bool
}
func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, preauth *api.Response, filter MultipartFormProcessor, opts *filestore.SaveFileOpts) error {
func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, preauth *api.Response, filter MultipartFormProcessor, opts *destination.UploadOpts) error {
// Create multipart reader
reader, err := r.MultipartReader()
if err != nil {
......@@ -128,7 +128,7 @@ func parseAndNormalizeContentDisposition(header textproto.MIMEHeader) (string, s
return params["name"], params["filename"]
}
func (rew *rewriter) handleFilePart(ctx context.Context, name string, p *multipart.Part, opts *filestore.SaveFileOpts) error {
func (rew *rewriter) handleFilePart(ctx context.Context, name string, p *multipart.Part, opts *destination.UploadOpts) error {
if rew.filter.Count() >= maxFilesAllowed {
return ErrTooManyFilesUploaded
}
......@@ -164,10 +164,10 @@ func (rew *rewriter) handleFilePart(ctx context.Context, name string, p *multipa
defer inputReader.Close()
fh, err := filestore.SaveFileFromReader(ctx, inputReader, -1, opts)
fh, err := destination.Upload(ctx, inputReader, -1, opts)
if err != nil {
switch err {
case filestore.ErrEntityTooLarge, exif.ErrRemovingExif:
case destination.ErrEntityTooLarge, exif.ErrRemovingExif:
return err
default:
return fmt.Errorf("persisting multipart file: %v", err)
......
......@@ -6,8 +6,8 @@ import (
"mime/multipart"
"net/http"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/secret"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
)
type SavedFileTracker struct {
......@@ -26,7 +26,7 @@ func (s *SavedFileTracker) Count() int {
return len(s.rewrittenFields)
}
func (s *SavedFileTracker) ProcessFile(_ context.Context, fieldName string, file *filestore.FileHandler, _ *multipart.Writer) error {
func (s *SavedFileTracker) ProcessFile(_ context.Context, fieldName string, file *destination.FileHandler, _ *multipart.Writer) error {
if _, ok := s.rewrittenFields[fieldName]; ok {
return fmt.Errorf("the %v field has already been processed", fieldName)
}
......
......@@ -10,8 +10,8 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
)
func TestSavedFileTracking(t *testing.T) {
......@@ -23,7 +23,7 @@ func TestSavedFileTracking(t *testing.T) {
tracker := SavedFileTracker{Request: r}
require.Equal(t, "accelerate", tracker.Name())
file := &filestore.FileHandler{}
file := &destination.FileHandler{}
ctx := context.Background()
tracker.ProcessFile(ctx, "test", file, nil)
require.Equal(t, 1, tracker.Count())
......@@ -40,7 +40,7 @@ func TestSavedFileTracking(t *testing.T) {
func TestDuplicatedFileProcessing(t *testing.T) {
tracker := SavedFileTracker{}
file := &filestore.FileHandler{}
file := &destination.FileHandler{}
require.NoError(t, tracker.ProcessFile(context.Background(), "file", file, nil))
......
......@@ -11,8 +11,8 @@ import (
"github.com/golang-jwt/jwt/v4"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/exif"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/zipartifacts"
)
......@@ -31,7 +31,7 @@ type MultipartClaims struct {
// MultipartFormProcessor abstracts away implementation differences
// between generic MIME multipart file uploads and CI artifact uploads.
type MultipartFormProcessor interface {
ProcessFile(ctx context.Context, formName string, file *filestore.FileHandler, writer *multipart.Writer) error
ProcessFile(ctx context.Context, formName string, file *destination.FileHandler, writer *multipart.Writer) error
ProcessField(ctx context.Context, formName string, writer *multipart.Writer) error
Finalize(ctx context.Context) error
Name() string
......@@ -40,7 +40,7 @@ type MultipartFormProcessor interface {
// interceptMultipartFiles is the core of the implementation of
// Multipart.
func interceptMultipartFiles(w http.ResponseWriter, r *http.Request, h http.Handler, preauth *api.Response, filter MultipartFormProcessor, opts *filestore.SaveFileOpts) {
func interceptMultipartFiles(w http.ResponseWriter, r *http.Request, h http.Handler, preauth *api.Response, filter MultipartFormProcessor, opts *destination.UploadOpts) {
var body bytes.Buffer
writer := multipart.NewWriter(&body)
defer writer.Close()
......@@ -55,7 +55,7 @@ func interceptMultipartFiles(w http.ResponseWriter, r *http.Request, h http.Hand
helper.CaptureAndFail(w, r, err, err.Error(), http.StatusBadRequest)
case http.ErrNotMultipart:
h.ServeHTTP(w, r)
case filestore.ErrEntityTooLarge:
case destination.ErrEntityTooLarge:
helper.RequestEntityTooLarge(w, r, err)
case zipartifacts.ErrBadMetadata:
helper.RequestEntityTooLarge(w, r, err)
......
......@@ -22,9 +22,9 @@ import (
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore/test"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/proxy"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upstream/roundtripper"
)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment