Commit 95d9c5fd authored by Patrick Bajao's avatar Patrick Bajao

Refactor upload destinations

In https://gitlab.com/gitlab-org/gitlab/-/issues/351657, the
goal is to document and refactor workhorse code related to
object storage upload. This is to improve readability and
help new contributors.

This include the following changes:
- Created `destination` package to consolidate upload destination
  related logic. Used this name so we're consistent with the
  language we use for upload destination.
- Moved `objectstore` and `filestore` packages under destination
  package since these are the store used as destination for object
  storage upload.
- Made `filestore` package specific for storing to disk. This is to
  clarify that `filestore` package is responsible for uploading to
  disk instead of its previous responsibilities.
- Renamed `SaveFileFromReader` to `Upload` to be consistent with the
  language of object storage upload. The previous name is a bit too
  tied to implementation details.
- Renamed `SaveFileOpts` to `UploadOpts` to be consistent with language.
- Removed `SaveFileFromDisk` as it's not being used at all.
parent ac28e3ae
...@@ -17,8 +17,8 @@ import ( ...@@ -17,8 +17,8 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore/test"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
) )
func createTestZipArchive(t *testing.T) (data []byte, md5Hash string) { func createTestZipArchive(t *testing.T) (data []byte, md5Hash string) {
......
...@@ -19,10 +19,10 @@ import ( ...@@ -19,10 +19,10 @@ import (
"gitlab.com/gitlab-org/labkit/log" "gitlab.com/gitlab-org/labkit/log"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/proxy" "gitlab.com/gitlab-org/gitlab/workhorse/internal/proxy"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upstream/roundtripper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upstream/roundtripper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/zipartifacts" "gitlab.com/gitlab-org/gitlab/workhorse/internal/zipartifacts"
...@@ -60,7 +60,7 @@ func testArtifactsUploadServer(t *testing.T, authResponse *api.Response, bodyPro ...@@ -60,7 +60,7 @@ func testArtifactsUploadServer(t *testing.T, authResponse *api.Response, bodyPro
w.Write(data) w.Write(data)
}) })
mux.HandleFunc(Path, func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc(Path, func(w http.ResponseWriter, r *http.Request) {
opts, err := filestore.GetOpts(authResponse) opts, err := destination.GetOpts(authResponse)
require.NoError(t, err) require.NoError(t, err)
if r.Method != "POST" { if r.Method != "POST" {
......
...@@ -16,8 +16,8 @@ import ( ...@@ -16,8 +16,8 @@ import (
"gitlab.com/gitlab-org/labkit/log" "gitlab.com/gitlab-org/labkit/log"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/zipartifacts" "gitlab.com/gitlab-org/gitlab/workhorse/internal/zipartifacts"
) )
...@@ -35,7 +35,7 @@ var zipSubcommandsErrorsCounter = promauto.NewCounterVec( ...@@ -35,7 +35,7 @@ var zipSubcommandsErrorsCounter = promauto.NewCounterVec(
}, []string{"error"}) }, []string{"error"})
type artifactsUploadProcessor struct { type artifactsUploadProcessor struct {
opts *filestore.SaveFileOpts opts *destination.UploadOpts
format string format string
SavedFileTracker SavedFileTracker
...@@ -57,11 +57,11 @@ func Artifacts(myAPI *api.API, h http.Handler, p Preparer) http.Handler { ...@@ -57,11 +57,11 @@ func Artifacts(myAPI *api.API, h http.Handler, p Preparer) http.Handler {
}, "/authorize") }, "/authorize")
} }
func (a *artifactsUploadProcessor) generateMetadataFromZip(ctx context.Context, file *filestore.FileHandler) (*filestore.FileHandler, error) { func (a *artifactsUploadProcessor) generateMetadataFromZip(ctx context.Context, file *destination.FileHandler) (*destination.FileHandler, error) {
metaReader, metaWriter := io.Pipe() metaReader, metaWriter := io.Pipe()
defer metaWriter.Close() defer metaWriter.Close()
metaOpts := &filestore.SaveFileOpts{ metaOpts := &destination.UploadOpts{
LocalTempPath: a.opts.LocalTempPath, LocalTempPath: a.opts.LocalTempPath,
TempFilePrefix: "metadata.gz", TempFilePrefix: "metadata.gz",
} }
...@@ -86,12 +86,12 @@ func (a *artifactsUploadProcessor) generateMetadataFromZip(ctx context.Context, ...@@ -86,12 +86,12 @@ func (a *artifactsUploadProcessor) generateMetadataFromZip(ctx context.Context,
type saveResult struct { type saveResult struct {
error error
*filestore.FileHandler *destination.FileHandler
} }
done := make(chan saveResult) done := make(chan saveResult)
go func() { go func() {
var result saveResult var result saveResult
result.FileHandler, result.error = filestore.SaveFileFromReader(ctx, metaReader, -1, metaOpts) result.FileHandler, result.error = destination.Upload(ctx, metaReader, -1, metaOpts)
done <- result done <- result
}() }()
...@@ -119,7 +119,7 @@ func (a *artifactsUploadProcessor) generateMetadataFromZip(ctx context.Context, ...@@ -119,7 +119,7 @@ func (a *artifactsUploadProcessor) generateMetadataFromZip(ctx context.Context,
return result.FileHandler, result.error return result.FileHandler, result.error
} }
func (a *artifactsUploadProcessor) ProcessFile(ctx context.Context, formName string, file *filestore.FileHandler, writer *multipart.Writer) error { func (a *artifactsUploadProcessor) ProcessFile(ctx context.Context, formName string, file *destination.FileHandler, writer *multipart.Writer) error {
// ProcessFile for artifacts requires file form-data field name to eq `file` // ProcessFile for artifacts requires file form-data field name to eq `file`
if formName != "file" { if formName != "file" {
......
...@@ -8,8 +8,8 @@ import ( ...@@ -8,8 +8,8 @@ import (
"strings" "strings"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
) )
// RequestBody is a request middleware. It will store the request body to // RequestBody is a request middleware. It will store the request body to
...@@ -23,7 +23,7 @@ func RequestBody(rails PreAuthorizer, h http.Handler, p Preparer) http.Handler { ...@@ -23,7 +23,7 @@ func RequestBody(rails PreAuthorizer, h http.Handler, p Preparer) http.Handler {
return return
} }
fh, err := filestore.SaveFileFromReader(r.Context(), r.Body, r.ContentLength, opts) fh, err := destination.Upload(r.Context(), r.Body, r.ContentLength, opts)
if err != nil { if err != nil {
helper.Fail500(w, r, fmt.Errorf("RequestBody: upload failed: %v", err)) helper.Fail500(w, r, fmt.Errorf("RequestBody: upload failed: %v", err))
return return
......
...@@ -15,8 +15,8 @@ import ( ...@@ -15,8 +15,8 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
) )
const ( const (
...@@ -169,8 +169,8 @@ type alwaysLocalPreparer struct { ...@@ -169,8 +169,8 @@ type alwaysLocalPreparer struct {
prepareError error prepareError error
} }
func (a *alwaysLocalPreparer) Prepare(_ *api.Response) (*filestore.SaveFileOpts, Verifier, error) { func (a *alwaysLocalPreparer) Prepare(_ *api.Response) (*destination.UploadOpts, Verifier, error) {
opts, err := filestore.GetOpts(&api.Response{TempPath: os.TempDir()}) opts, err := destination.GetOpts(&api.Response{TempPath: os.TempDir()})
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
...@@ -180,7 +180,7 @@ func (a *alwaysLocalPreparer) Prepare(_ *api.Response) (*filestore.SaveFileOpts, ...@@ -180,7 +180,7 @@ func (a *alwaysLocalPreparer) Prepare(_ *api.Response) (*filestore.SaveFileOpts,
type alwaysFailsVerifier struct{} type alwaysFailsVerifier struct{}
func (alwaysFailsVerifier) Verify(handler *filestore.FileHandler) error { func (alwaysFailsVerifier) Verify(handler *destination.FileHandler) error {
return fmt.Errorf("Verification failed") return fmt.Errorf("Verification failed")
} }
...@@ -188,7 +188,7 @@ type mockVerifier struct { ...@@ -188,7 +188,7 @@ type mockVerifier struct {
invoked bool invoked bool
} }
func (m *mockVerifier) Verify(handler *filestore.FileHandler) error { func (m *mockVerifier) Verify(handler *destination.FileHandler) error {
m.invoked = true m.invoked = true
return nil return nil
......
package filestore package destination
import ( import (
"context" "context"
...@@ -14,8 +14,9 @@ import ( ...@@ -14,8 +14,9 @@ import (
"gitlab.com/gitlab-org/labkit/log" "gitlab.com/gitlab-org/labkit/log"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/secret" "gitlab.com/gitlab-org/gitlab/workhorse/internal/secret"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
) )
type SizeError error type SizeError error
...@@ -107,9 +108,9 @@ type consumer interface { ...@@ -107,9 +108,9 @@ type consumer interface {
Consume(context.Context, io.Reader, time.Time) (int64, error) Consume(context.Context, io.Reader, time.Time) (int64, error)
} }
// SaveFileFromReader persists the provided reader content to all the location specified in opts. A cleanup will be performed once ctx is Done // Upload persists the provided reader content to all the location specified in opts. A cleanup will be performed once ctx is Done
// Make sure the provided context will not expire before finalizing upload with GitLab Rails. // Make sure the provided context will not expire before finalizing upload with GitLab Rails.
func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts *SaveFileOpts) (*FileHandler, error) { func Upload(ctx context.Context, reader io.Reader, size int64, opts *UploadOpts) (*FileHandler, error) {
fh := &FileHandler{ fh := &FileHandler{
Name: opts.TempFilePrefix, Name: opts.TempFilePrefix,
RemoteID: opts.RemoteID, RemoteID: opts.RemoteID,
...@@ -126,7 +127,7 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts ...@@ -126,7 +127,7 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts
switch { switch {
case opts.IsLocal(): case opts.IsLocal():
clientMode = "local" clientMode = "local"
uploadDestination, err = fh.uploadLocalFile(ctx, opts) uploadDestination, err = fh.newLocalFile(ctx, opts)
case opts.UseWorkhorseClientEnabled() && opts.ObjectStorageConfig.IsGoCloud(): case opts.UseWorkhorseClientEnabled() && opts.ObjectStorageConfig.IsGoCloud():
clientMode = fmt.Sprintf("go_cloud:%s", opts.ObjectStorageConfig.Provider) clientMode = fmt.Sprintf("go_cloud:%s", opts.ObjectStorageConfig.Provider)
p := &objectstore.GoCloudObjectParams{ p := &objectstore.GoCloudObjectParams{
...@@ -210,16 +211,16 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts ...@@ -210,16 +211,16 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts
return fh, nil return fh, nil
} }
func (fh *FileHandler) uploadLocalFile(ctx context.Context, opts *SaveFileOpts) (consumer, error) { func (fh *FileHandler) newLocalFile(ctx context.Context, opts *UploadOpts) (consumer, error) {
// make sure TempFolder exists // make sure TempFolder exists
err := os.MkdirAll(opts.LocalTempPath, 0700) err := os.MkdirAll(opts.LocalTempPath, 0700)
if err != nil { if err != nil {
return nil, fmt.Errorf("uploadLocalFile: mkdir %q: %v", opts.LocalTempPath, err) return nil, fmt.Errorf("newLocalFile: mkdir %q: %v", opts.LocalTempPath, err)
} }
file, err := ioutil.TempFile(opts.LocalTempPath, opts.TempFilePrefix) file, err := ioutil.TempFile(opts.LocalTempPath, opts.TempFilePrefix)
if err != nil { if err != nil {
return nil, fmt.Errorf("uploadLocalFile: create file: %v", err) return nil, fmt.Errorf("newLocalFile: create file: %v", err)
} }
go func() { go func() {
...@@ -228,32 +229,5 @@ func (fh *FileHandler) uploadLocalFile(ctx context.Context, opts *SaveFileOpts) ...@@ -228,32 +229,5 @@ func (fh *FileHandler) uploadLocalFile(ctx context.Context, opts *SaveFileOpts)
}() }()
fh.LocalPath = file.Name() fh.LocalPath = file.Name()
return &localUpload{file}, nil return &filestore.LocalFile{File: file}, nil
}
type localUpload struct{ io.WriteCloser }
func (loc *localUpload) Consume(_ context.Context, r io.Reader, _ time.Time) (int64, error) {
n, err := io.Copy(loc.WriteCloser, r)
errClose := loc.Close()
if err == nil {
err = errClose
}
return n, err
}
// SaveFileFromDisk open the local file fileName and calls SaveFileFromReader
func SaveFileFromDisk(ctx context.Context, fileName string, opts *SaveFileOpts) (fh *FileHandler, err error) {
file, err := os.Open(fileName)
if err != nil {
return nil, err
}
defer file.Close()
fi, err := file.Stat()
if err != nil {
return nil, err
}
return SaveFileFromReader(ctx, file, fi.Size(), opts)
} }
package filestore
import (
"context"
"io"
"time"
)
type LocalFile struct {
File io.WriteCloser
}
func (lf *LocalFile) Consume(_ context.Context, r io.Reader, _ time.Time) (int64, error) {
n, err := io.Copy(lf.File, r)
errClose := lf.File.Close()
if err == nil {
err = errClose
}
return n, err
}
package filestore
import (
"context"
"io/ioutil"
"os"
"strings"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestConsume(t *testing.T) {
f, err := ioutil.TempFile("", "filestore-local-file")
if f != nil {
defer os.Remove(f.Name())
}
require.NoError(t, err)
defer f.Close()
localFile := &LocalFile{File: f}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
content := "file content"
reader := strings.NewReader(content)
var deadline time.Time
n, err := localFile.Consume(ctx, reader, deadline)
require.NoError(t, err)
require.Equal(t, int64(len(content)), n)
consumedContent, err := ioutil.ReadFile(f.Name())
require.NoError(t, err)
require.Equal(t, content, string(consumedContent))
}
package filestore package destination
import ( import (
"crypto/md5" "crypto/md5"
......
...@@ -9,9 +9,9 @@ import ( ...@@ -9,9 +9,9 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore/test"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
) )
func TestGoCloudObjectUpload(t *testing.T) { func TestGoCloudObjectUpload(t *testing.T) {
......
...@@ -11,8 +11,8 @@ import ( ...@@ -11,8 +11,8 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore/test" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
) )
func TestMultipartUploadWithUpcaseETags(t *testing.T) { func TestMultipartUploadWithUpcaseETags(t *testing.T) {
......
...@@ -11,8 +11,8 @@ import ( ...@@ -11,8 +11,8 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore/test" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
) )
const testTimeout = 10 * time.Second const testTimeout = 10 * time.Second
......
...@@ -18,9 +18,9 @@ import ( ...@@ -18,9 +18,9 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/config" "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore/test"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
) )
type failedReader struct { type failedReader struct {
......
...@@ -13,7 +13,7 @@ import ( ...@@ -13,7 +13,7 @@ import (
"strings" "strings"
"sync" "sync"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
) )
type partsEtagMap map[int]string type partsEtagMap map[int]string
......
package filestore package destination
import ( import (
"errors" "errors"
...@@ -27,8 +27,8 @@ type ObjectStorageConfig struct { ...@@ -27,8 +27,8 @@ type ObjectStorageConfig struct {
GoCloudConfig config.GoCloudConfig GoCloudConfig config.GoCloudConfig
} }
// SaveFileOpts represents all the options available for saving a file to object store // UploadOpts represents all the options available for saving a file to object store
type SaveFileOpts struct { type UploadOpts struct {
// TempFilePrefix is the prefix used to create temporary local file // TempFilePrefix is the prefix used to create temporary local file
TempFilePrefix string TempFilePrefix string
// LocalTempPath is the directory where to write a local copy of the file // LocalTempPath is the directory where to write a local copy of the file
...@@ -66,28 +66,28 @@ type SaveFileOpts struct { ...@@ -66,28 +66,28 @@ type SaveFileOpts struct {
} }
// UseWorkhorseClientEnabled checks if the options require direct access to object storage // UseWorkhorseClientEnabled checks if the options require direct access to object storage
func (s *SaveFileOpts) UseWorkhorseClientEnabled() bool { func (s *UploadOpts) UseWorkhorseClientEnabled() bool {
return s.UseWorkhorseClient && s.ObjectStorageConfig.IsValid() && s.RemoteTempObjectID != "" return s.UseWorkhorseClient && s.ObjectStorageConfig.IsValid() && s.RemoteTempObjectID != ""
} }
// IsLocal checks if the options require the writing of the file on disk // IsLocal checks if the options require the writing of the file on disk
func (s *SaveFileOpts) IsLocal() bool { func (s *UploadOpts) IsLocal() bool {
return s.LocalTempPath != "" return s.LocalTempPath != ""
} }
// IsMultipart checks if the options requires a Multipart upload // IsMultipart checks if the options requires a Multipart upload
func (s *SaveFileOpts) IsMultipart() bool { func (s *UploadOpts) IsMultipart() bool {
return s.PartSize > 0 return s.PartSize > 0
} }
// GetOpts converts GitLab api.Response to a proper SaveFileOpts // GetOpts converts GitLab api.Response to a proper UploadOpts
func GetOpts(apiResponse *api.Response) (*SaveFileOpts, error) { func GetOpts(apiResponse *api.Response) (*UploadOpts, error) {
timeout := time.Duration(apiResponse.RemoteObject.Timeout) * time.Second timeout := time.Duration(apiResponse.RemoteObject.Timeout) * time.Second
if timeout == 0 { if timeout == 0 {
timeout = DefaultObjectStoreTimeout timeout = DefaultObjectStoreTimeout
} }
opts := SaveFileOpts{ opts := UploadOpts{
LocalTempPath: apiResponse.TempPath, LocalTempPath: apiResponse.TempPath,
RemoteID: apiResponse.RemoteObject.ID, RemoteID: apiResponse.RemoteObject.ID,
RemoteURL: apiResponse.RemoteObject.GetURL, RemoteURL: apiResponse.RemoteObject.GetURL,
......
package filestore_test package destination_test
import ( import (
"testing" "testing"
...@@ -8,11 +8,11 @@ import ( ...@@ -8,11 +8,11 @@ import (
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/config" "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore/test" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
) )
func TestSaveFileOptsLocalAndRemote(t *testing.T) { func TestUploadOptsLocalAndRemote(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
localTempPath string localTempPath string
...@@ -43,7 +43,7 @@ func TestSaveFileOptsLocalAndRemote(t *testing.T) { ...@@ -43,7 +43,7 @@ func TestSaveFileOptsLocalAndRemote(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
opts := filestore.SaveFileOpts{ opts := destination.UploadOpts{
LocalTempPath: test.localTempPath, LocalTempPath: test.localTempPath,
PresignedPut: test.presignedPut, PresignedPut: test.presignedPut,
PartSize: test.partSize, PartSize: test.partSize,
...@@ -106,7 +106,7 @@ func TestGetOpts(t *testing.T) { ...@@ -106,7 +106,7 @@ func TestGetOpts(t *testing.T) {
}, },
} }
deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second) deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second)
opts, err := filestore.GetOpts(apiResponse) opts, err := destination.GetOpts(apiResponse)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, apiResponse.TempPath, opts.LocalTempPath) require.Equal(t, apiResponse.TempPath, opts.LocalTempPath)
...@@ -155,22 +155,22 @@ func TestGetOptsFail(t *testing.T) { ...@@ -155,22 +155,22 @@ func TestGetOptsFail(t *testing.T) {
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) { t.Run(tc.desc, func(t *testing.T) {
_, err := filestore.GetOpts(tc.in) _, err := destination.GetOpts(tc.in)
require.Error(t, err, "expect input to be rejected") require.Error(t, err, "expect input to be rejected")
}) })
} }
} }
func TestGetOptsDefaultTimeout(t *testing.T) { func TestGetOptsDefaultTimeout(t *testing.T) {
deadline := time.Now().Add(filestore.DefaultObjectStoreTimeout) deadline := time.Now().Add(destination.DefaultObjectStoreTimeout)
opts, err := filestore.GetOpts(&api.Response{TempPath: "/foo/bar"}) opts, err := destination.GetOpts(&api.Response{TempPath: "/foo/bar"})
require.NoError(t, err) require.NoError(t, err)
require.WithinDuration(t, deadline, opts.Deadline, time.Minute) require.WithinDuration(t, deadline, opts.Deadline, time.Minute)
} }
func TestUseWorkhorseClientEnabled(t *testing.T) { func TestUseWorkhorseClientEnabled(t *testing.T) {
cfg := filestore.ObjectStorageConfig{ cfg := destination.ObjectStorageConfig{
Provider: "AWS", Provider: "AWS",
S3Config: config.S3Config{ S3Config: config.S3Config{
Bucket: "test-bucket", Bucket: "test-bucket",
...@@ -195,7 +195,7 @@ func TestUseWorkhorseClientEnabled(t *testing.T) { ...@@ -195,7 +195,7 @@ func TestUseWorkhorseClientEnabled(t *testing.T) {
name string name string
UseWorkhorseClient bool UseWorkhorseClient bool
remoteTempObjectID string remoteTempObjectID string
objectStorageConfig filestore.ObjectStorageConfig objectStorageConfig destination.ObjectStorageConfig
expected bool expected bool
}{ }{
{ {
...@@ -243,7 +243,7 @@ func TestUseWorkhorseClientEnabled(t *testing.T) { ...@@ -243,7 +243,7 @@ func TestUseWorkhorseClientEnabled(t *testing.T) {
name: "missing S3 bucket", name: "missing S3 bucket",
UseWorkhorseClient: true, UseWorkhorseClient: true,
remoteTempObjectID: "test-object", remoteTempObjectID: "test-object",
objectStorageConfig: filestore.ObjectStorageConfig{ objectStorageConfig: destination.ObjectStorageConfig{
Provider: "AWS", Provider: "AWS",
S3Config: config.S3Config{}, S3Config: config.S3Config{},
}, },
...@@ -269,7 +269,7 @@ func TestUseWorkhorseClientEnabled(t *testing.T) { ...@@ -269,7 +269,7 @@ func TestUseWorkhorseClientEnabled(t *testing.T) {
}, },
} }
deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second) deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second)
opts, err := filestore.GetOpts(apiResponse) opts, err := destination.GetOpts(apiResponse)
require.NoError(t, err) require.NoError(t, err)
opts.ObjectStorageConfig = test.objectStorageConfig opts.ObjectStorageConfig = test.objectStorageConfig
...@@ -323,7 +323,7 @@ func TestGoCloudConfig(t *testing.T) { ...@@ -323,7 +323,7 @@ func TestGoCloudConfig(t *testing.T) {
}, },
} }
deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second) deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second)
opts, err := filestore.GetOpts(apiResponse) opts, err := destination.GetOpts(apiResponse)
require.NoError(t, err) require.NoError(t, err)
opts.ObjectStorageConfig.URLMux = mux opts.ObjectStorageConfig.URLMux = mux
......
...@@ -5,7 +5,7 @@ import ( ...@@ -5,7 +5,7 @@ import (
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/config" "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
) )
type object struct { type object struct {
...@@ -13,7 +13,7 @@ type object struct { ...@@ -13,7 +13,7 @@ type object struct {
oid string oid string
} }
func (l *object) Verify(fh *filestore.FileHandler) error { func (l *object) Verify(fh *destination.FileHandler) error {
if fh.Size != l.size { if fh.Size != l.size {
return fmt.Errorf("LFSObject: expected size %d, wrote %d", l.size, fh.Size) return fmt.Errorf("LFSObject: expected size %d, wrote %d", l.size, fh.Size)
} }
...@@ -35,7 +35,7 @@ func NewLfsPreparer(c config.Config, objectPreparer Preparer) Preparer { ...@@ -35,7 +35,7 @@ func NewLfsPreparer(c config.Config, objectPreparer Preparer) Preparer {
return &uploadPreparer{objectPreparer: objectPreparer} return &uploadPreparer{objectPreparer: objectPreparer}
} }
func (l *uploadPreparer) Prepare(a *api.Response) (*filestore.SaveFileOpts, Verifier, error) { func (l *uploadPreparer) Prepare(a *api.Response) (*destination.UploadOpts, Verifier, error) {
opts, _, err := l.objectPreparer.Prepare(a) opts, _, err := l.objectPreparer.Prepare(a)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
......
...@@ -3,7 +3,7 @@ package upload ...@@ -3,7 +3,7 @@ package upload
import ( import (
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/config" "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
) )
type ObjectStoragePreparer struct { type ObjectStoragePreparer struct {
...@@ -18,8 +18,8 @@ func NewObjectStoragePreparer(c config.Config) Preparer { ...@@ -18,8 +18,8 @@ func NewObjectStoragePreparer(c config.Config) Preparer {
return &ObjectStoragePreparer{credentials: c.ObjectStorageCredentials, config: c.ObjectStorageConfig} return &ObjectStoragePreparer{credentials: c.ObjectStorageCredentials, config: c.ObjectStorageConfig}
} }
func (p *ObjectStoragePreparer) Prepare(a *api.Response) (*filestore.SaveFileOpts, Verifier, error) { func (p *ObjectStoragePreparer) Prepare(a *api.Response) (*destination.UploadOpts, Verifier, error) {
opts, err := filestore.GetOpts(a) opts, err := destination.GetOpts(a)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
......
...@@ -2,7 +2,7 @@ package upload ...@@ -2,7 +2,7 @@ package upload
import ( import (
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
) )
// Verifier is an optional pluggable behavior for upload paths. If // Verifier is an optional pluggable behavior for upload paths. If
...@@ -12,22 +12,22 @@ import ( ...@@ -12,22 +12,22 @@ import (
// checksum of the uploaded file. // checksum of the uploaded file.
type Verifier interface { type Verifier interface {
// Verify can abort the upload by returning an error // Verify can abort the upload by returning an error
Verify(handler *filestore.FileHandler) error Verify(handler *destination.FileHandler) error
} }
// Preparer is a pluggable behavior that interprets a Rails API response // Preparer is a pluggable behavior that interprets a Rails API response
// and either tells Workhorse how to handle the upload, via the // and either tells Workhorse how to handle the upload, via the
// SaveFileOpts and Verifier, or it rejects the request by returning a // UploadOpts and Verifier, or it rejects the request by returning a
// non-nil error. Its intended use is to make sure the upload gets stored // non-nil error. Its intended use is to make sure the upload gets stored
// in the right location: either a local directory, or one of several // in the right location: either a local directory, or one of several
// supported object storage backends. // supported object storage backends.
type Preparer interface { type Preparer interface {
Prepare(a *api.Response) (*filestore.SaveFileOpts, Verifier, error) Prepare(a *api.Response) (*destination.UploadOpts, Verifier, error)
} }
type DefaultPreparer struct{} type DefaultPreparer struct{}
func (s *DefaultPreparer) Prepare(a *api.Response) (*filestore.SaveFileOpts, Verifier, error) { func (s *DefaultPreparer) Prepare(a *api.Response) (*destination.UploadOpts, Verifier, error) {
opts, err := filestore.GetOpts(a) opts, err := destination.GetOpts(a)
return opts, nil, err return opts, nil, err
} }
...@@ -21,8 +21,8 @@ import ( ...@@ -21,8 +21,8 @@ import (
"golang.org/x/image/tiff" "golang.org/x/image/tiff"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/lsif_transformer/parser" "gitlab.com/gitlab-org/gitlab/workhorse/internal/lsif_transformer/parser"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/exif" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/exif"
) )
...@@ -68,7 +68,7 @@ type rewriter struct { ...@@ -68,7 +68,7 @@ type rewriter struct {
finalizedFields map[string]bool finalizedFields map[string]bool
} }
func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, preauth *api.Response, filter MultipartFormProcessor, opts *filestore.SaveFileOpts) error { func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, preauth *api.Response, filter MultipartFormProcessor, opts *destination.UploadOpts) error {
// Create multipart reader // Create multipart reader
reader, err := r.MultipartReader() reader, err := r.MultipartReader()
if err != nil { if err != nil {
...@@ -128,7 +128,7 @@ func parseAndNormalizeContentDisposition(header textproto.MIMEHeader) (string, s ...@@ -128,7 +128,7 @@ func parseAndNormalizeContentDisposition(header textproto.MIMEHeader) (string, s
return params["name"], params["filename"] return params["name"], params["filename"]
} }
func (rew *rewriter) handleFilePart(ctx context.Context, name string, p *multipart.Part, opts *filestore.SaveFileOpts) error { func (rew *rewriter) handleFilePart(ctx context.Context, name string, p *multipart.Part, opts *destination.UploadOpts) error {
if rew.filter.Count() >= maxFilesAllowed { if rew.filter.Count() >= maxFilesAllowed {
return ErrTooManyFilesUploaded return ErrTooManyFilesUploaded
} }
...@@ -164,10 +164,10 @@ func (rew *rewriter) handleFilePart(ctx context.Context, name string, p *multipa ...@@ -164,10 +164,10 @@ func (rew *rewriter) handleFilePart(ctx context.Context, name string, p *multipa
defer inputReader.Close() defer inputReader.Close()
fh, err := filestore.SaveFileFromReader(ctx, inputReader, -1, opts) fh, err := destination.Upload(ctx, inputReader, -1, opts)
if err != nil { if err != nil {
switch err { switch err {
case filestore.ErrEntityTooLarge, exif.ErrRemovingExif: case destination.ErrEntityTooLarge, exif.ErrRemovingExif:
return err return err
default: default:
return fmt.Errorf("persisting multipart file: %v", err) return fmt.Errorf("persisting multipart file: %v", err)
......
...@@ -6,8 +6,8 @@ import ( ...@@ -6,8 +6,8 @@ import (
"mime/multipart" "mime/multipart"
"net/http" "net/http"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/secret" "gitlab.com/gitlab-org/gitlab/workhorse/internal/secret"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
) )
type SavedFileTracker struct { type SavedFileTracker struct {
...@@ -26,7 +26,7 @@ func (s *SavedFileTracker) Count() int { ...@@ -26,7 +26,7 @@ func (s *SavedFileTracker) Count() int {
return len(s.rewrittenFields) return len(s.rewrittenFields)
} }
func (s *SavedFileTracker) ProcessFile(_ context.Context, fieldName string, file *filestore.FileHandler, _ *multipart.Writer) error { func (s *SavedFileTracker) ProcessFile(_ context.Context, fieldName string, file *destination.FileHandler, _ *multipart.Writer) error {
if _, ok := s.rewrittenFields[fieldName]; ok { if _, ok := s.rewrittenFields[fieldName]; ok {
return fmt.Errorf("the %v field has already been processed", fieldName) return fmt.Errorf("the %v field has already been processed", fieldName)
} }
......
...@@ -10,8 +10,8 @@ import ( ...@@ -10,8 +10,8 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
) )
func TestSavedFileTracking(t *testing.T) { func TestSavedFileTracking(t *testing.T) {
...@@ -23,7 +23,7 @@ func TestSavedFileTracking(t *testing.T) { ...@@ -23,7 +23,7 @@ func TestSavedFileTracking(t *testing.T) {
tracker := SavedFileTracker{Request: r} tracker := SavedFileTracker{Request: r}
require.Equal(t, "accelerate", tracker.Name()) require.Equal(t, "accelerate", tracker.Name())
file := &filestore.FileHandler{} file := &destination.FileHandler{}
ctx := context.Background() ctx := context.Background()
tracker.ProcessFile(ctx, "test", file, nil) tracker.ProcessFile(ctx, "test", file, nil)
require.Equal(t, 1, tracker.Count()) require.Equal(t, 1, tracker.Count())
...@@ -40,7 +40,7 @@ func TestSavedFileTracking(t *testing.T) { ...@@ -40,7 +40,7 @@ func TestSavedFileTracking(t *testing.T) {
func TestDuplicatedFileProcessing(t *testing.T) { func TestDuplicatedFileProcessing(t *testing.T) {
tracker := SavedFileTracker{} tracker := SavedFileTracker{}
file := &filestore.FileHandler{} file := &destination.FileHandler{}
require.NoError(t, tracker.ProcessFile(context.Background(), "file", file, nil)) require.NoError(t, tracker.ProcessFile(context.Background(), "file", file, nil))
......
...@@ -11,8 +11,8 @@ import ( ...@@ -11,8 +11,8 @@ import (
"github.com/golang-jwt/jwt/v4" "github.com/golang-jwt/jwt/v4"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/exif" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/exif"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/zipartifacts" "gitlab.com/gitlab-org/gitlab/workhorse/internal/zipartifacts"
) )
...@@ -31,7 +31,7 @@ type MultipartClaims struct { ...@@ -31,7 +31,7 @@ type MultipartClaims struct {
// MultipartFormProcessor abstracts away implementation differences // MultipartFormProcessor abstracts away implementation differences
// between generic MIME multipart file uploads and CI artifact uploads. // between generic MIME multipart file uploads and CI artifact uploads.
type MultipartFormProcessor interface { type MultipartFormProcessor interface {
ProcessFile(ctx context.Context, formName string, file *filestore.FileHandler, writer *multipart.Writer) error ProcessFile(ctx context.Context, formName string, file *destination.FileHandler, writer *multipart.Writer) error
ProcessField(ctx context.Context, formName string, writer *multipart.Writer) error ProcessField(ctx context.Context, formName string, writer *multipart.Writer) error
Finalize(ctx context.Context) error Finalize(ctx context.Context) error
Name() string Name() string
...@@ -40,7 +40,7 @@ type MultipartFormProcessor interface { ...@@ -40,7 +40,7 @@ type MultipartFormProcessor interface {
// interceptMultipartFiles is the core of the implementation of // interceptMultipartFiles is the core of the implementation of
// Multipart. // Multipart.
func interceptMultipartFiles(w http.ResponseWriter, r *http.Request, h http.Handler, preauth *api.Response, filter MultipartFormProcessor, opts *filestore.SaveFileOpts) { func interceptMultipartFiles(w http.ResponseWriter, r *http.Request, h http.Handler, preauth *api.Response, filter MultipartFormProcessor, opts *destination.UploadOpts) {
var body bytes.Buffer var body bytes.Buffer
writer := multipart.NewWriter(&body) writer := multipart.NewWriter(&body)
defer writer.Close() defer writer.Close()
...@@ -55,7 +55,7 @@ func interceptMultipartFiles(w http.ResponseWriter, r *http.Request, h http.Hand ...@@ -55,7 +55,7 @@ func interceptMultipartFiles(w http.ResponseWriter, r *http.Request, h http.Hand
helper.CaptureAndFail(w, r, err, err.Error(), http.StatusBadRequest) helper.CaptureAndFail(w, r, err, err.Error(), http.StatusBadRequest)
case http.ErrNotMultipart: case http.ErrNotMultipart:
h.ServeHTTP(w, r) h.ServeHTTP(w, r)
case filestore.ErrEntityTooLarge: case destination.ErrEntityTooLarge:
helper.RequestEntityTooLarge(w, r, err) helper.RequestEntityTooLarge(w, r, err)
case zipartifacts.ErrBadMetadata: case zipartifacts.ErrBadMetadata:
helper.RequestEntityTooLarge(w, r, err) helper.RequestEntityTooLarge(w, r, err)
......
...@@ -22,9 +22,9 @@ import ( ...@@ -22,9 +22,9 @@ import (
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore/test"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/proxy" "gitlab.com/gitlab-org/gitlab/workhorse/internal/proxy"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upstream/roundtripper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upstream/roundtripper"
) )
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment