Commit 73e46e40 authored by Alessio Caiazza, committed by Nick Thomas

Multipart Upload support

parent 26d2606f
......@@ -67,6 +67,17 @@ func NewAPI(myURL *url.URL, version string, roundTripper *badgateway.RoundTrippe
type HandleFunc func(http.ResponseWriter, *http.Request, *Response)
type MultipartUploadParams struct {
// PartSize is the exact size of each uploaded part. Only the last one can be smaller
PartSize int64
// PartURLs contains the presigned URLs for each part
PartURLs []string
// CompleteURL is a presigned URL for CompleteMultipartUpload
CompleteURL string
// AbortURL is a presigned URL for AbortMultipartUpload
AbortURL string
}
type RemoteObject struct {
// GetURL is an S3 GetObject URL
GetURL string
......@@ -78,6 +89,8 @@ type RemoteObject struct {
ID string
// Timeout is the upload timeout, in seconds, for sending data to StoreURL
Timeout int
// MultipartUpload contains presigned URLs for S3 MultipartUpload
MultipartUpload *MultipartUploadParams
}
type Response struct {
......
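For illustration, a preauthorized upload response switches Workhorse into multipart mode by populating these new fields. A minimal sketch, mirroring the test below (URLs and sizes are arbitrary):

preauth := api.Response{
RemoteObject: api.RemoteObject{
ID: "store-id",
MultipartUpload: &api.MultipartUploadParams{
PartSize: 5 * 1024 * 1024,
PartURLs: []string{objectURL + "?partNumber=1", objectURL + "?partNumber=2"},
CompleteURL: objectURL, // POST target for CompleteMultipartUpload
AbortURL: objectURL, // DELETE target for AbortMultipartUpload
},
},
}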
......@@ -18,6 +18,7 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore/test"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/testhelper"
)
......@@ -273,3 +274,45 @@ func TestUploadHandlerSendingToExternalStorageAndSupportRequestTimeout(t *testin
testhelper.AssertResponseCode(t, response, 500)
assert.Equal(t, 1, putCalledTimes, "upload should be called only once")
}
func TestUploadHandlerMultipartUploadSizeLimit(t *testing.T) {
os, server := test.StartObjectStore()
defer server.Close()
err := os.InitiateMultipartUpload(test.ObjectPath)
require.NoError(t, err)
objectURL := server.URL + test.ObjectPath
partSize := int64(1)
uploadSize := 10
preauth := api.Response{
RemoteObject: api.RemoteObject{
ID: "store-id",
MultipartUpload: &api.MultipartUploadParams{
PartSize: partSize,
PartURLs: []string{objectURL + "?partNumber=1"},
AbortURL: objectURL, // DELETE
CompleteURL: objectURL, // POST
},
},
}
responseProcessor := func(w http.ResponseWriter, r *http.Request) {
t.Fatal("it should not be called")
}
ts := testArtifactsUploadServer(t, preauth, responseProcessor)
defer ts.Close()
contentBuffer, contentType := createTestMultipartForm(t, make([]byte, uploadSize))
response := testUploadArtifacts(contentType, &contentBuffer, t, ts)
testhelper.AssertResponseCode(t, response, http.StatusRequestEntityTooLarge)
// Poll because AbortMultipartUpload is async
for i := 0; os.IsMultipartUpload(test.ObjectPath) && i < 100; i++ {
time.Sleep(10 * time.Millisecond)
}
assert.False(t, os.IsMultipartUpload(test.ObjectPath), "MultipartUpload should not be in progress anymore")
assert.Empty(t, os.GetObjectMD5(test.ObjectPath), "upload should have failed, so the object should not exist")
}
......@@ -12,9 +12,11 @@ import (
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
)
type MD5Error error
type SizeError error
// ErrEntityTooLarge means that the uploaded content is bigger than the maximum allowed size
var ErrEntityTooLarge = errors.New("Entity is too large")
// FileHandler represent a file that has been processed for upload
// it may be either uploaded to an ObjectStore and/or saved on local path.
type FileHandler struct {
......@@ -81,7 +83,7 @@ func (fh *FileHandler) GitLabFinalizeFields(prefix string) map[string]string {
// SaveFileFromReader persists the provided reader content to all the locations specified in opts. A cleanup will be performed once ctx is done.
// Make sure the provided context will not expire before finalizing the upload with GitLab Rails.
func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts *SaveFileOpts) (fh *FileHandler, err error) {
var object *objectstore.Object
var remoteWriter io.WriteCloser
fh = &FileHandler{
Name: opts.TempFilePrefix,
RemoteID: opts.RemoteID,
......@@ -97,13 +99,20 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts
}
}()
if opts.IsRemote() {
object, err = objectstore.NewObject(ctx, opts.PresignedPut, opts.PresignedDelete, opts.Timeout, size)
if opts.IsMultipart() {
remoteWriter, err = objectstore.NewMultipart(ctx, opts.PresignedParts, opts.PresignedCompleteMultipart, opts.PresignedAbortMultipart, opts.PresignedDelete, opts.Deadline, opts.PartSize)
if err != nil {
return nil, err
}
writers = append(writers, object)
writers = append(writers, remoteWriter)
} else if opts.IsRemote() {
remoteWriter, err = objectstore.NewObject(ctx, opts.PresignedPut, opts.PresignedDelete, opts.Deadline, size)
if err != nil {
return nil, err
}
writers = append(writers, remoteWriter)
}
if opts.IsLocal() {
......@@ -133,13 +142,12 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts
if opts.IsRemote() {
// we need to close the writer in order to get the ETag header
err = object.Close()
err = remoteWriter.Close()
if err != nil {
return nil, err
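// a multipart writer fails Close with ErrNotEnoughParts when more than
// PartSize * len(PresignedParts) bytes were written; map it to ErrEntityTooLarge
// so the client receives a 413 instead of a generic 500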
if err == objectstore.ErrNotEnoughParts {
return nil, ErrEntityTooLarge
}
if fh.MD5() != object.MD5() {
return nil, MD5Error(fmt.Errorf("expected md5 %s, got %s", fh.MD5(), object.MD5()))
return nil, err
}
}
......
......@@ -19,6 +19,10 @@ import (
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore/test"
)
func testDeadline() time.Time {
return time.Now().Add(filestore.DefaultObjectStoreTimeout)
}
func assertFileGetsRemovedAsync(t *testing.T, filePath string) {
var err error
......@@ -76,33 +80,6 @@ func TestSaveFromDiskNotExistingFile(t *testing.T) {
assert.Nil(fh, "On error FileHandler should be nil")
}
func TestSaveFileWrongMD5(t *testing.T) {
assert := assert.New(t)
osStub, ts := test.StartObjectStoreWithCustomMD5(map[string]string{test.ObjectPath: "brokenMD5"})
defer ts.Close()
objectURL := ts.URL + test.ObjectPath
opts := &filestore.SaveFileOpts{
RemoteID: "test-file",
RemoteURL: objectURL,
PresignedPut: objectURL + "?Signature=ASignature",
PresignedDelete: objectURL + "?Signature=AnotherSignature",
}
ctx, cancel := context.WithCancel(context.Background())
fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, opts)
assert.Nil(fh)
assert.Error(err)
_, isMD5Error := err.(filestore.MD5Error)
assert.True(isMD5Error, "Should fail with MD5Error")
assert.Equal(1, osStub.PutsCnt(), "File not uploaded")
cancel() // this will trigger an async cleanup
assertObjectStoreDeletedAsync(t, 1, osStub)
}
func TestSaveFileFromDiskToLocalPath(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
......@@ -132,6 +109,12 @@ func TestSaveFileFromDiskToLocalPath(t *testing.T) {
}
func TestSaveFile(t *testing.T) {
type remote int
const (
remoteSingle remote = iota
remoteMultipart
)
tmpFolder, err := ioutil.TempDir("", "workhorse-test-tmp")
require.NoError(t, err)
defer os.RemoveAll(tmpFolder)
......@@ -139,11 +122,13 @@ func TestSaveFile(t *testing.T) {
tests := []struct {
name string
local bool
remote bool
remote remote
}{
{name: "Local only", local: true},
{name: "Remote only", remote: true},
{name: "Both", local: true, remote: true},
{name: "Remote Single only", remote: remoteSingle},
{name: "Remote Single and Local", local: true, remote: remoteSingle},
{name: "Remote Multipart only", remote: remoteMultipart},
{name: "Remote Multipart and Local", local: true, remote: remoteMultipart},
}
for _, spec := range tests {
......@@ -156,16 +141,32 @@ func TestSaveFile(t *testing.T) {
osStub, ts := test.StartObjectStore()
defer ts.Close()
if spec.remote {
switch spec.remote {
case remoteSingle:
objectURL := ts.URL + test.ObjectPath
opts.RemoteID = "test-file"
opts.RemoteURL = objectURL
opts.PresignedPut = objectURL + "?Signature=ASignature"
opts.PresignedDelete = objectURL + "?Signature=AnotherSignature"
opts.Deadline = testDeadline()
expectedDeletes = 1
expectedPuts = 1
case remoteMultipart:
objectURL := ts.URL + test.ObjectPath
opts.RemoteID = "test-file"
opts.RemoteURL = objectURL
opts.PresignedDelete = objectURL + "?Signature=AnotherSignature"
opts.PartSize = int64(len(test.ObjectContent)/2) + 1
opts.PresignedParts = []string{objectURL + "?partNumber=1", objectURL + "?partNumber=2"}
opts.PresignedCompleteMultipart = objectURL + "?Signature=CompleteSignature"
opts.Deadline = testDeadline()
osStub.InitiateMultipartUpload(test.ObjectPath)
expectedDeletes = 1
expectedPuts = 2
}
if spec.local {
......@@ -223,3 +224,33 @@ func TestSaveFile(t *testing.T) {
})
}
}
func TestSaveMultipartInBodyFailure(t *testing.T) {
assert := assert.New(t)
osStub, ts := test.StartObjectStore()
defer ts.Close()
// This path is broken on purpose: it contains a bucket name but no object key.
// This is the only way to get an in-body failure from our ObjectStoreStub.
objectPath := "/bucket-but-no-object-key"
objectURL := ts.URL + objectPath
opts := filestore.SaveFileOpts{
RemoteID: "test-file",
RemoteURL: objectURL,
PartSize: test.ObjectSize,
PresignedParts: []string{objectURL + "?partNumber=1", objectURL + "?partNumber=2"},
PresignedCompleteMultipart: objectURL + "?Signature=CompleteSignature",
Deadline: testDeadline(),
}
osStub.InitiateMultipartUpload(objectPath)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
assert.Nil(fh)
require.Error(t, err)
assert.EqualError(err, test.MultipartUploadInternalError().Error())
}
package filestore
import (
"net/url"
"time"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
)
// DefaultObjectStoreTimeout is the timeout for ObjectStore upload operations
const DefaultObjectStoreTimeout = 4 * time.Hour
// SaveFileOpts represents all the options available for saving a file to object store
type SaveFileOpts struct {
// TempFilePrefix is the prefix used to create temporary local file
......@@ -22,8 +23,18 @@ type SaveFileOpts struct {
PresignedPut string
// PresignedDelete is a presigned S3 DeleteObject compatible URL.
PresignedDelete string
// Timeout is the S3 operation timeout. If 0, objectstore.DefaultObjectStoreTimeout will be used
Timeout time.Duration
// Deadline is the S3 operation deadline; the upload will be aborted if not completed in time
Deadline time.Time
// MultipartUpload parameters
// PartSize is the exact size of each uploaded part. Only the last one can be smaller
PartSize int64
// PresignedParts contains the presigned URLs for each part
PresignedParts []string
// PresignedCompleteMultipart is a presigned URL for CompleteMultipartUpload
PresignedCompleteMultipart string
// PresignedAbortMultipart is a presigned URL for AbortMultipartUpload
PresignedAbortMultipart string
}
// IsLocal checks if the options require the writing of the file on disk
......@@ -33,35 +44,36 @@ func (s *SaveFileOpts) IsLocal() bool {
// IsRemote checks if the options require a remote upload
func (s *SaveFileOpts) IsRemote() bool {
return s.PresignedPut != ""
return s.PresignedPut != "" || s.IsMultipart()
}
func (s *SaveFileOpts) isGoogleCloudStorage() bool {
if !s.IsRemote() {
return false
}
getURL, err := url.Parse(s.RemoteURL)
if err != nil {
return false
}
return objectstore.IsGoogleCloudStorage(getURL)
// IsMultipart checks if the options require a multipart upload
func (s *SaveFileOpts) IsMultipart() bool {
return s.PartSize > 0
}
// GetOpts converts GitLab api.Response to a proper SaveFileOpts
func GetOpts(apiResponse *api.Response) *SaveFileOpts {
timeout := time.Duration(apiResponse.RemoteObject.Timeout) * time.Second
if timeout == 0 {
timeout = objectstore.DefaultObjectStoreTimeout
timeout = DefaultObjectStoreTimeout
}
return &SaveFileOpts{
opts := SaveFileOpts{
LocalTempPath: apiResponse.TempPath,
RemoteID: apiResponse.RemoteObject.ID,
RemoteURL: apiResponse.RemoteObject.GetURL,
PresignedPut: apiResponse.RemoteObject.StoreURL,
PresignedDelete: apiResponse.RemoteObject.DeleteURL,
Timeout: timeout,
Deadline: time.Now().Add(timeout),
}
if multiParams := apiResponse.RemoteObject.MultipartUpload; multiParams != nil {
opts.PartSize = multiParams.PartSize
opts.PresignedCompleteMultipart = multiParams.CompleteURL
opts.PresignedAbortMultipart = multiParams.AbortURL
opts.PresignedParts = append([]string(nil), multiParams.PartURLs...)
}
return &opts
}
......@@ -8,7 +8,6 @@ import (
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
)
func TestSaveFileOptsLocalAndRemote(t *testing.T) {
......@@ -16,8 +15,10 @@ func TestSaveFileOptsLocalAndRemote(t *testing.T) {
name string
localTempPath string
presignedPut string
partSize int64
isLocal bool
isRemote bool
isMultipart bool
}{
{
name: "Only LocalTempPath",
......@@ -39,6 +40,20 @@ func TestSaveFileOptsLocalAndRemote(t *testing.T) {
presignedPut: "http://example.com",
isRemote: true,
},
{
name: "Multipart",
partSize: 10,
isRemote: true,
isMultipart: true,
},
{
name: "Multipart and Local",
partSize: 10,
localTempPath: "/tmp",
isRemote: true,
isMultipart: true,
isLocal: true,
},
}
for _, test := range tests {
......@@ -49,16 +64,37 @@ func TestSaveFileOptsLocalAndRemote(t *testing.T) {
opts := filestore.SaveFileOpts{
LocalTempPath: test.localTempPath,
PresignedPut: test.presignedPut,
PartSize: test.partSize,
}
assert.Equal(test.isLocal, opts.IsLocal(), "IsLocal() mismatch")
assert.Equal(test.isRemote, opts.IsRemote(), "IsRemote() mismatch")
assert.Equal(test.isMultipart, opts.IsMultipart(), "IsMultipart() mismatch")
})
}
}
func TestGetOpts(t *testing.T) {
tests := []struct {
name string
multipart *api.MultipartUploadParams
}{
{
name: "Single upload",
}, {
name: "Multipart upload",
multipart: &api.MultipartUploadParams{
PartSize: 10,
CompleteURL: "http://complete",
AbortURL: "http://abort",
PartURLs: []string{"http://part1", "http://part2"},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
assert := assert.New(t)
apiResponse := &api.Response{
TempPath: "/tmp",
......@@ -68,23 +104,40 @@ func TestGetOpts(t *testing.T) {
GetURL: "http://get",
StoreURL: "http://store",
DeleteURL: "http://delete",
MultipartUpload: test.multipart,
},
}
deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second)
opts := filestore.GetOpts(apiResponse)
assert.Equal(apiResponse.TempPath, opts.LocalTempPath)
assert.Equal(time.Duration(apiResponse.RemoteObject.Timeout)*time.Second, opts.Timeout)
assert.WithinDuration(deadline, opts.Deadline, time.Second)
assert.Equal(apiResponse.RemoteObject.ID, opts.RemoteID)
assert.Equal(apiResponse.RemoteObject.GetURL, opts.RemoteURL)
assert.Equal(apiResponse.RemoteObject.StoreURL, opts.PresignedPut)
assert.Equal(apiResponse.RemoteObject.DeleteURL, opts.PresignedDelete)
if test.multipart == nil {
assert.False(opts.IsMultipart())
assert.Empty(opts.PresignedCompleteMultipart)
assert.Empty(opts.PresignedAbortMultipart)
assert.Zero(opts.PartSize)
assert.Empty(opts.PresignedParts)
} else {
assert.True(opts.IsMultipart())
assert.Equal(test.multipart.CompleteURL, opts.PresignedCompleteMultipart)
assert.Equal(test.multipart.AbortURL, opts.PresignedAbortMultipart)
assert.Equal(test.multipart.PartSize, opts.PartSize)
assert.Equal(test.multipart.PartURLs, opts.PresignedParts)
}
})
}
}
func TestGetOptsDefaultTimeout(t *testing.T) {
assert := assert.New(t)
deadline := time.Now().Add(filestore.DefaultObjectStoreTimeout)
opts := filestore.GetOpts(&api.Response{})
assert.Equal(objectstore.DefaultObjectStoreTimeout, opts.Timeout)
assert.WithinDuration(deadline, opts.Deadline, time.Minute)
}
package objectstore
import (
"bytes"
"context"
"encoding/xml"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"time"
log "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
)
// ErrNotEnoughParts is returned when writing more than partSize * len(partURLs) bytes
var ErrNotEnoughParts = errors.New("Not enough Parts")
// Multipart represents a MultipartUpload on a S3 compatible Object Store service.
// It can be used as io.WriteCloser for uploading an object
type Multipart struct {
// CompleteURL is a presigned URL for CompleteMultipartUpload
CompleteURL string
// AbortURL is a presigned URL for AbortMultipartUpload
AbortURL string
// DeleteURL is a presigned URL for RemoveObject
DeleteURL string
uploader
}
// NewMultipart provides a Multipart pointer that can be used for uploading. Data written will be split into parts of up to partSize bytes,
// buffered on disk, and uploaded with S3 UploadPart. Once the Multipart is Closed, a final call to CompleteMultipartUpload will be sent.
// In case of any error a call to AbortMultipartUpload will be made to clean up all the resources
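//
// A minimal usage sketch (ctx, reader, and all URL/size arguments are assumed to be provided by the caller):
//
// m, err := NewMultipart(ctx, partURLs, completeURL, abortURL, deleteURL, deadline, partSize)
// if err != nil {
// return err
// }
// if _, err := io.Copy(m, reader); err != nil {
// return err
// }
// return m.Close() // waits for CompleteMultipartUpload (or abort on failure) and returns any upload error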
func NewMultipart(ctx context.Context, partURLs []string, completeURL, abortURL, deleteURL string, deadline time.Time, partSize int64) (*Multipart, error) {
pr, pw := io.Pipe()
uploadCtx, cancelFn := context.WithDeadline(ctx, deadline)
m := &Multipart{
CompleteURL: completeURL,
AbortURL: abortURL,
DeleteURL: deleteURL,
uploader: newUploader(uploadCtx, pw),
}
go m.trackUploadTime()
go m.cleanup(ctx)
objectStorageUploadsOpen.Inc()
go func() {
defer cancelFn()
defer objectStorageUploadsOpen.Dec()
defer func() {
// This will be returned as an error to the next write operation on the pipe
pr.CloseWithError(m.uploadError)
}()
cmu := &CompleteMultipartUpload{}
for i, partURL := range partURLs {
src := io.LimitReader(pr, partSize)
part, err := m.readAndUploadOnePart(partURL, src, i+1)
if err != nil {
m.uploadError = err
return
}
if part == nil {
break
} else {
cmu.Part = append(cmu.Part, part)
}
}
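// All the parts have been consumed: if the pipe still holds data, the upload
// is bigger than partSize * len(partURLs), so drain it and fail.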
n, err := io.Copy(ioutil.Discard, pr)
if err != nil {
m.uploadError = fmt.Errorf("Cannot drain pipe: %v", err)
return
}
if n > 0 {
m.uploadError = ErrNotEnoughParts
return
}
if err := m.complete(cmu); err != nil {
m.uploadError = err
return
}
}()
return m, nil
}
func (m *Multipart) trackUploadTime() {
started := time.Now()
<-m.ctx.Done()
objectStorageUploadTime.Observe(time.Since(started).Seconds())
}
func (m *Multipart) cleanup(ctx context.Context) {
// wait for the upload to finish
<-m.ctx.Done()
if m.uploadError != nil {
objectStorageUploadRequestsRequestFailed.Inc()
m.abort()
return
}
// We have now successfully uploaded the file to object storage. Another
// goroutine will hand off the object to gitlab-rails.
<-ctx.Done()
// gitlab-rails is now done with the object so it's time to delete it.
m.delete()
}
func (m *Multipart) complete(cmu *CompleteMultipartUpload) error {
body, err := xml.Marshal(cmu)
if err != nil {
return fmt.Errorf("Cannot marshal CompleteMultipartUpload request: %v", err)
}
req, err := http.NewRequest("POST", m.CompleteURL, bytes.NewReader(body))
if err != nil {
return fmt.Errorf("Cannot create CompleteMultipartUpload request: %v", err)
}
req.ContentLength = int64(len(body))
req.Header.Set("Content-Type", "application/xml")
req = req.WithContext(m.ctx)
resp, err := httpClient.Do(req)
if err != nil {
return fmt.Errorf("CompleteMultipartUpload request %q: %v", helper.ScrubURLParams(m.CompleteURL), err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("CompleteMultipartUpload request %v returned: %s", helper.ScrubURLParams(m.CompleteURL), resp.Status)
}
result := &compoundCompleteMultipartUploadResult{}
decoder := xml.NewDecoder(resp.Body)
if err := decoder.Decode(&result); err != nil {
return fmt.Errorf("Cannot decode CompleteMultipartUpload answer: %v", err)
}
if result.isError() {
return result
}
return nil
}
func (m *Multipart) readAndUploadOnePart(partURL string, src io.Reader, partNumber int) (*completeMultipartUploadPart, error) {
file, err := ioutil.TempFile("", "part-buffer")
if err != nil {
return nil, fmt.Errorf("Unable to create a temporary file for buffering: %v", err)
}
defer func(path string) {
if err := os.Remove(path); err != nil {
log.WithError(err).WithField("file", path).Warning("Unable to delete temporary file")
}
}(file.Name())
n, err := io.Copy(file, src)
if err != nil {
return nil, fmt.Errorf("Cannot write part %d to disk: %v", partNumber, err)
}
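// nothing was copied: the source is exhausted and there is no part to upload;
// returning a nil part tells the caller to stop iterating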
if n == 0 {
return nil, nil
}
if _, err = file.Seek(0, io.SeekStart); err != nil {
return nil, fmt.Errorf("Cannot rewind part %d temporary dump : %v", partNumber, err)
}
etag, err := m.uploadPart(partURL, file, n)
if err != nil {
return nil, fmt.Errorf("Cannot upload part %d: %v", partNumber, err)
}
return &completeMultipartUploadPart{PartNumber: partNumber, ETag: etag}, nil
}
func (m *Multipart) uploadPart(url string, body io.Reader, size int64) (string, error) {
deadline, ok := m.ctx.Deadline()
if !ok {
return "", fmt.Errorf("Missing deadline")
}
part, err := newObject(m.ctx, url, "", deadline, size, false)
if err != nil {
return "", err
}
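// the part is fully buffered on disk, so its exact size is known:
// send exactly size bytes as the PUT body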
_, err = io.CopyN(part, body, size)
if err != nil {
return "", err
}
err = part.Close()
if err != nil {
return "", err
}
return part.MD5(), nil
}
func (m *Multipart) delete() {
m.syncAndDelete(m.DeleteURL)
}
func (m *Multipart) abort() {
m.syncAndDelete(m.AbortURL)
}
......@@ -7,16 +7,11 @@ import (
"io/ioutil"
"net"
"net/http"
"net/url"
"strings"
"time"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
)
// DefaultObjectStoreTimeout is the timeout for ObjectStore PutObject api calls
const DefaultObjectStoreTimeout = 360 * time.Second
// httpTransport defines an http.Transport with values
// that are more restrictive than for http.DefaultTransport:
// they define a shorter TLS handshake timeout and more aggressive connection closing
......@@ -38,11 +33,6 @@ var httpClient = &http.Client{
Transport: httpTransport,
}
// IsGoogleCloudStorage checks if the provided URL is from Google Cloud Storage service
func IsGoogleCloudStorage(u *url.URL) bool {
return strings.ToLower(u.Host) == "storage.googleapis.com"
}
type StatusCodeError error
// Object represents an object on a S3 compatible Object Store service.
......@@ -55,47 +45,45 @@ type Object struct {
// md5 is the checksum provided by the Object Store
md5 string
// writeCloser is the writer bound to the PutObject body
writeCloser io.WriteCloser
// uploadError is the last error that occurred during upload
uploadError error
// ctx is the internal context bound to the upload request
ctx context.Context
uploader
}
// NewObject opens an HTTP connection to Object Store and returns an Object pointer that can be used for uploading.
func NewObject(ctx context.Context, putURL, deleteURL string, timeout time.Duration, size int64) (*Object, error) {
started := time.Now()
o := &Object{
PutURL: putURL,
DeleteURL: deleteURL,
}
func NewObject(ctx context.Context, putURL, deleteURL string, deadline time.Time, size int64) (*Object, error) {
return newObject(ctx, putURL, deleteURL, deadline, size, true)
}
func newObject(ctx context.Context, putURL, deleteURL string, deadline time.Time, size int64, metrics bool) (*Object, error) {
started := time.Now()
pr, pw := io.Pipe()
o.writeCloser = pw
// we must prevent pr.Close(), otherwise it may shadow an error set with pr.CloseWithError(err)
req, err := http.NewRequest(http.MethodPut, o.PutURL, ioutil.NopCloser(pr))
req, err := http.NewRequest(http.MethodPut, putURL, ioutil.NopCloser(pr))
if err != nil {
if metrics {
objectStorageUploadRequestsRequestFailed.Inc()
return nil, fmt.Errorf("PUT %q: %v", helper.ScrubURLParams(o.PutURL), err)
}
return nil, fmt.Errorf("PUT %q: %v", helper.ScrubURLParams(putURL), err)
}
req.ContentLength = size
req.Header.Set("Content-Type", "application/octet-stream")
if timeout == 0 {
timeout = DefaultObjectStoreTimeout
uploadCtx, cancelFn := context.WithDeadline(ctx, deadline)
o := &Object{
PutURL: putURL,
DeleteURL: deleteURL,
uploader: newUploader(uploadCtx, pw),
}
uploadCtx, cancelFn := context.WithTimeout(ctx, timeout)
o.ctx = uploadCtx
if metrics {
objectStorageUploadsOpen.Inc()
}
go func() {
// wait for the upload to finish
<-o.ctx.Done()
if metrics {
objectStorageUploadTime.Observe(time.Since(started).Seconds())
}
// wait for provided context to finish before performing cleanup
<-ctx.Done()
......@@ -104,7 +92,9 @@ func NewObject(ctx context.Context, putURL, deleteURL string, timeout time.Durat
go func() {
defer cancelFn()
if metrics {
defer objectStorageUploadsOpen.Dec()
}
defer func() {
// This will be returned as an error to the next write operation on the pipe
pr.CloseWithError(o.uploadError)
......@@ -114,14 +104,18 @@ func NewObject(ctx context.Context, putURL, deleteURL string, timeout time.Durat
resp, err := httpClient.Do(req)
if err != nil {
if metrics {
objectStorageUploadRequestsRequestFailed.Inc()
}
o.uploadError = fmt.Errorf("PUT request %q: %v", helper.ScrubURLParams(o.PutURL), err)
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
if metrics {
objectStorageUploadRequestsInvalidStatus.Inc()
}
o.uploadError = StatusCodeError(fmt.Errorf("PUT request %v returned: %s", helper.ScrubURLParams(o.PutURL), resp.Status))
return
}
......@@ -132,23 +126,6 @@ func NewObject(ctx context.Context, putURL, deleteURL string, timeout time.Durat
return o, nil
}
// Write implements the standard io.Writer interface: it writes data to the PutObject body.
func (o *Object) Write(p []byte) (int, error) {
return o.writeCloser.Write(p)
}
// Close implements the standard io.Closer interface: it closes the http client request.
// This method will also wait for the connection to terminate and return any error that occurred during the upload
func (o *Object) Close() error {
if err := o.writeCloser.Close(); err != nil {
return err
}
<-o.ctx.Done()
return o.uploadError
}
// MD5 returns the md5sum of the uploaded object, as returned by the Object Store provider via the ETag header.
// This method will wait until upload context is done before returning.
func (o *Object) MD5() string {
......@@ -166,22 +143,5 @@ func (o *Object) extractMD5(h http.Header) {
}
func (o *Object) delete() {
if o.DeleteURL == "" {
return
}
<-o.ctx.Done()
req, err := http.NewRequest(http.MethodDelete, o.DeleteURL, nil)
if err != nil {
objectStorageUploadRequestsRequestFailed.Inc()
return
}
resp, err := httpClient.Do(req)
if err != nil {
objectStorageUploadRequestsRequestFailed.Inc()
return
}
resp.Body.Close()
o.syncAndDelete(o.DeleteURL)
}
......@@ -33,7 +33,8 @@ func testObjectUploadNoErrors(t *testing.T, useDeleteURL bool) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
object, err := objectstore.NewObject(ctx, objectURL, deleteURL, testTimeout, test.ObjectSize)
deadline := time.Now().Add(testTimeout)
object, err := objectstore.NewObject(ctx, objectURL, deleteURL, deadline, test.ObjectSize)
require.NoError(t, err)
// copy data
......@@ -86,8 +87,9 @@ func TestObjectUpload404(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
deadline := time.Now().Add(testTimeout)
objectURL := ts.URL + test.ObjectPath
object, err := objectstore.NewObject(ctx, objectURL, "", testTimeout, test.ObjectSize)
object, err := objectstore.NewObject(ctx, objectURL, "", deadline, test.ObjectSize)
require.NoError(err)
_, err = io.Copy(object, strings.NewReader(test.ObjectContent))
......@@ -130,8 +132,9 @@ func TestObjectUploadBrokenConnection(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
deadline := time.Now().Add(testTimeout)
objectURL := ts.URL + test.ObjectPath
object, err := objectstore.NewObject(ctx, objectURL, "", testTimeout, -1)
object, err := objectstore.NewObject(ctx, objectURL, "", deadline, -1)
require.NoError(t, err)
_, copyErr := io.Copy(object, &endlessReader{})
......
package objectstore
import (
"encoding/xml"
"fmt"
)
// CompleteMultipartUpload is the S3 CompleteMultipartUpload body
type CompleteMultipartUpload struct {
Part []*completeMultipartUploadPart
}
type completeMultipartUploadPart struct {
PartNumber int
ETag string
}
// CompleteMultipartUploadResult is the S3 answer to CompleteMultipartUpload request
type CompleteMultipartUploadResult struct {
Location string
Bucket string
Key string
ETag string
}
// CompleteMultipartUploadError is the in-body error structure
// https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadComplete.html#mpUploadComplete-examples
// the answer contains other fields we are not using
type CompleteMultipartUploadError struct {
XMLName xml.Name `xml:"Error"`
Code string
Message string
}
func (c *CompleteMultipartUploadError) Error() string {
return fmt.Sprintf("CompleteMultipartUpload remote error %q: %s", c.Code, c.Message)
}
// compoundCompleteMultipartUploadResult holds both CompleteMultipartUploadResult and CompleteMultipartUploadError
// this allows us to deserialize the response body whether the root element is Error or CompleteMultipartUploadResult
type compoundCompleteMultipartUploadResult struct {
*CompleteMultipartUploadResult
*CompleteMultipartUploadError
// XMLName overrides the CompleteMultipartUploadError.XMLName tag
XMLName xml.Name
}
func (c *compoundCompleteMultipartUploadResult) isError() bool {
return c.CompleteMultipartUploadError != nil
}
......@@ -3,13 +3,21 @@ package test
import (
"crypto/md5"
"encoding/hex"
"encoding/xml"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"sync"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
)
type partsEtagMap map[int]string
// ObjectstoreStub is a testing implementation of ObjectStore.
// Instead of storing objects it will just save their md5sum.
type ObjectstoreStub struct {
......@@ -17,6 +25,8 @@ type ObjectstoreStub struct {
bucket map[string]string
// overwriteMD5 contains md5sum overwrites that should be returned instead of the regular hash
overwriteMD5 map[string]string
// multipart is a map of MultipartUploads
multipart map[string]partsEtagMap
puts int
deletes int
......@@ -33,6 +43,7 @@ func StartObjectStore() (*ObjectstoreStub, *httptest.Server) {
func StartObjectStoreWithCustomMD5(md5Hashes map[string]string) (*ObjectstoreStub, *httptest.Server) {
os := &ObjectstoreStub{
bucket: make(map[string]string),
multipart: make(map[string]partsEtagMap),
overwriteMD5: make(map[string]string),
}
......@@ -68,12 +79,49 @@ func (o *ObjectstoreStub) GetObjectMD5(path string) string {
return o.bucket[path]
}
// InitiateMultipartUpload prepares the ObjectstoreStub to receive a MultipartUpload on path.
// It will return an error if a MultipartUpload is already in progress on that path.
// InitiateMultipartUpload is only used during test setup.
// Workhorse's production code does not know how to initiate a multipart upload.
//
// Real S3 multipart uploads are more complicated than what we do here,
// but this is enough to verify that workhorse's production code behaves as intended.
func (o *ObjectstoreStub) InitiateMultipartUpload(path string) error {
o.m.Lock()
defer o.m.Unlock()
if o.multipart[path] != nil {
return fmt.Errorf("MultipartUpload for %q already in progress", path)
}
o.multipart[path] = make(partsEtagMap)
return nil
}
// IsMultipartUpload checks if the given path has a MultipartUpload in progress
func (o *ObjectstoreStub) IsMultipartUpload(path string) bool {
o.m.Lock()
defer o.m.Unlock()
return o.isMultipartUpload(path)
}
// isMultipartUpload is the lock-free version of IsMultipartUpload
func (o *ObjectstoreStub) isMultipartUpload(path string) bool {
return o.multipart[path] != nil
}
func (o *ObjectstoreStub) removeObject(w http.ResponseWriter, r *http.Request) {
o.m.Lock()
defer o.m.Unlock()
objectPath := r.URL.Path
if _, ok := o.bucket[objectPath]; ok {
if o.isMultipartUpload(objectPath) {
o.deletes++
delete(o.multipart, objectPath)
w.WriteHeader(200)
} else if _, ok := o.bucket[objectPath]; ok {
o.deletes++
delete(o.bucket, objectPath)
......@@ -99,24 +147,115 @@ func (o *ObjectstoreStub) putObject(w http.ResponseWriter, r *http.Request) {
}
o.puts++
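// a PUT on a path with a MultipartUpload in progress records the part's ETag
// under its partNumber instead of storing a new object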
if o.isMultipartUpload(objectPath) {
pNumber := r.URL.Query().Get("partNumber")
idx, err := strconv.Atoi(pNumber)
if err != nil {
http.Error(w, fmt.Sprintf("malformed partNumber: %v", err), 400)
return
}
o.multipart[objectPath][idx] = etag
} else {
o.bucket[objectPath] = etag
}
w.Header().Set("ETag", etag)
w.WriteHeader(200)
}
func MultipartUploadInternalError() *objectstore.CompleteMultipartUploadError {
return &objectstore.CompleteMultipartUploadError{Code: "InternalError", Message: "malformed object path"}
}
func (o *ObjectstoreStub) completeMultipartUpload(w http.ResponseWriter, r *http.Request) {
o.m.Lock()
defer o.m.Unlock()
objectPath := r.URL.Path
multipart := o.multipart[objectPath]
if multipart == nil {
http.Error(w, "Unknown MultipartUpload", 404)
return
}
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
var msg objectstore.CompleteMultipartUpload
err = xml.Unmarshal(buf, &msg)
if err != nil {
http.Error(w, err.Error(), 400)
return
}
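// every part listed in the complete request must have been uploaded
// with a matching ETag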
for _, part := range msg.Part {
etag := multipart[part.PartNumber]
if etag != part.ETag {
msg := fmt.Sprintf("ETag mismatch on part %d. Expected %q got %q", part.PartNumber, etag, part.ETag)
http.Error(w, msg, 400)
return
}
}
etag, overwritten := o.overwriteMD5[objectPath]
if !overwritten {
etag = "not an md5 hash"
}
o.bucket[objectPath] = etag
delete(o.multipart, objectPath)
w.Header().Set("ETag", etag)
split := strings.SplitN(objectPath[1:], "/", 2)
if len(split) < 2 {
encodeXMLAnswer(w, MultipartUploadInternalError())
return
}
bucket := split[0]
key := split[1]
answer := objectstore.CompleteMultipartUploadResult{
Location: r.URL.String(),
Bucket: bucket,
Key: key,
ETag: etag,
}
encodeXMLAnswer(w, answer)
}
func encodeXMLAnswer(w http.ResponseWriter, answer interface{}) {
w.Header().Set("Content-Type", "text/xml")
enc := xml.NewEncoder(w)
if err := enc.Encode(answer); err != nil {
http.Error(w, err.Error(), 500)
}
}
func (o *ObjectstoreStub) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Body != nil {
defer r.Body.Close()
}
fmt.Println("ObjectStore Stub:", r.Method, r.URL.Path)
fmt.Println("ObjectStore Stub:", r.Method, r.URL.String())
if r.URL.Path == "" {
http.Error(w, "No path provided", 404)
return
}
switch r.Method {
case "DELETE":
o.removeObject(w, r)
case "PUT":
o.putObject(w, r)
case "POST":
o.completeMultipartUpload(w, r)
default:
w.WriteHeader(404)
}
......
package test
import (
"fmt"
"io"
"net/http"
"strings"
"testing"
......@@ -9,6 +11,20 @@ import (
"github.com/stretchr/testify/require"
)
func doRequest(method, url string, body io.Reader) error {
req, err := http.NewRequest(method, url, body)
if err != nil {
return err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
return resp.Body.Close()
}
func TestObjectStoreStub(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
......@@ -21,21 +37,13 @@ func TestObjectStoreStub(t *testing.T) {
objectURL := ts.URL + ObjectPath
req, err := http.NewRequest(http.MethodPut, objectURL, strings.NewReader(ObjectContent))
require.NoError(err)
_, err = http.DefaultClient.Do(req)
require.NoError(err)
require.NoError(doRequest(http.MethodPut, objectURL, strings.NewReader(ObjectContent)))
assert.Equal(1, stub.PutsCnt())
assert.Equal(0, stub.DeletesCnt())
assert.Equal(ObjectMD5, stub.GetObjectMD5(ObjectPath))
req, err = http.NewRequest(http.MethodDelete, objectURL, nil)
require.NoError(err)
_, err = http.DefaultClient.Do(req)
require.NoError(err)
require.NoError(doRequest(http.MethodDelete, objectURL, nil))
assert.Equal(1, stub.PutsCnt())
assert.Equal(1, stub.DeletesCnt())
......@@ -55,7 +63,121 @@ func TestObjectStoreStubDelete404(t *testing.T) {
resp, err := http.DefaultClient.Do(req)
require.NoError(err)
defer resp.Body.Close()
assert.Equal(404, resp.StatusCode)
assert.Equal(0, stub.DeletesCnt())
}
func TestObjectStoreInitiateMultipartUpload(t *testing.T) {
require := require.New(t)
stub, ts := StartObjectStore()
defer ts.Close()
path := "/my-multipart"
err := stub.InitiateMultipartUpload(path)
require.NoError(err)
err = stub.InitiateMultipartUpload(path)
require.Error(err, "second attempt to open the same MultipartUpload")
}
func TestObjectStoreCompleteMultipartUpload(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
stub, ts := StartObjectStore()
defer ts.Close()
objectURL := ts.URL + ObjectPath
parts := []struct {
number int
content string
contentMD5 string
}{
{
number: 1,
content: "first part",
contentMD5: "550cf6b6e60f65a0e3104a26e70fea42",
}, {
number: 2,
content: "second part",
contentMD5: "920b914bca0a70780b40881b8f376135",
},
}
stub.InitiateMultipartUpload(ObjectPath)
require.True(stub.IsMultipartUpload(ObjectPath))
assert.Equal(0, stub.PutsCnt())
assert.Equal(0, stub.DeletesCnt())
// Workhorse knows nothing about S3 MultipartUpload: it receives some URLs
// from GitLab-rails and PUTs a chunk of data to each of them.
// Then it completes the upload with a final POST
partPutURLs := []string{
fmt.Sprintf("%s?partNumber=%d", objectURL, 1),
fmt.Sprintf("%s?partNumber=%d", objectURL, 2),
}
completePostURL := objectURL
for i, partPutURL := range partPutURLs {
part := parts[i]
require.NoError(doRequest(http.MethodPut, partPutURL, strings.NewReader(part.content)))
assert.Equal(i+1, stub.PutsCnt())
assert.Equal(0, stub.DeletesCnt())
assert.Equal(part.contentMD5, stub.multipart[ObjectPath][part.number], "Part %d was not uploaded into ObjectStorage", part.number)
assert.Empty(stub.GetObjectMD5(ObjectPath), "Part %d was mistakenly uploaded as a single object", part.number)
assert.True(stub.IsMultipartUpload(ObjectPath), "MultipartUpload completed or aborted")
}
completeBody := fmt.Sprintf(`<CompleteMultipartUpload>
<Part>
<PartNumber>1</PartNumber>
<ETag>%s</ETag>
</Part>
<Part>
<PartNumber>2</PartNumber>
<ETag>%s</ETag>
</Part>
</CompleteMultipartUpload>`, parts[0].contentMD5, parts[1].contentMD5)
require.NoError(doRequest(http.MethodPost, completePostURL, strings.NewReader(completeBody)))
assert.Equal(len(parts), stub.PutsCnt())
assert.Equal(0, stub.DeletesCnt())
assert.NotEmpty(stub.GetObjectMD5(ObjectPath), "MultipartUpload not completed")
assert.False(stub.IsMultipartUpload(ObjectPath), "MultipartUpload is still in progress")
}
func TestObjectStoreAbortMultipartUpload(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
stub, ts := StartObjectStore()
defer ts.Close()
stub.InitiateMultipartUpload(ObjectPath)
require.True(stub.IsMultipartUpload(ObjectPath))
assert.Equal(0, stub.PutsCnt())
assert.Equal(0, stub.DeletesCnt())
objectURL := ts.URL + ObjectPath
require.NoError(doRequest(http.MethodPut, fmt.Sprintf("%s?partNumber=%d", objectURL, 1), strings.NewReader(ObjectContent)))
assert.Equal(1, stub.PutsCnt())
assert.Equal(0, stub.DeletesCnt())
assert.Equal(ObjectMD5, stub.multipart[ObjectPath][1], "Part was not uploaded into ObjectStorage")
assert.Empty(stub.GetObjectMD5(ObjectPath), "Part was mistakenly uploaded as a single object")
assert.True(stub.IsMultipartUpload(ObjectPath), "MultipartUpload completed or aborted")
require.NoError(doRequest(http.MethodDelete, objectURL, nil))
assert.Equal(1, stub.PutsCnt())
assert.Equal(1, stub.DeletesCnt())
assert.Empty(stub.GetObjectMD5(ObjectPath), "MultipartUpload has been completed")
assert.False(stub.IsMultipartUpload(ObjectPath), "MultipartUpload is still in progress")
}
package objectstore
import (
"context"
"io"
"net/http"
log "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
)
// uploader is an io.WriteCloser that can be used as the write end of the upload pipe.
type uploader struct {
// writeCloser is the writer bound to the request body
io.WriteCloser
// uploadError is the last error that occurred during upload
uploadError error
// ctx is the internal context bound to the upload request
ctx context.Context
}
func newUploader(ctx context.Context, w io.WriteCloser) uploader {
return uploader{WriteCloser: w, ctx: ctx}
}
// Close implements the standard io.Closer interface: it closes the http client request.
// This method will also wait for the connection to terminate and return any error that occurred during the upload
func (u *uploader) Close() error {
if err := u.WriteCloser.Close(); err != nil {
return err
}
<-u.ctx.Done()
if err := u.ctx.Err(); err == context.DeadlineExceeded {
return err
}
return u.uploadError
}
// syncAndDelete waits for the upload context to be done, then issues a DELETE request to the given URL
func (u *uploader) syncAndDelete(url string) {
if url == "" {
return
}
<-u.ctx.Done()
req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
log.WithError(err).WithField("object", helper.ScrubURLParams(url)).Warning("Delete failed")
return
}
// we are not using u.ctx here because we must perform the cleanup regardless of the parent context's state
resp, err := httpClient.Do(req)
if err != nil {
log.WithError(err).WithField("object", helper.ScrubURLParams(url)).Warning("Delete failed")
return
}
resp.Body.Close()
}
......@@ -115,6 +115,9 @@ func (rew *rewriter) handleFilePart(ctx context.Context, name string, p *multipa
fh, err := filestore.SaveFileFromReader(ctx, p, -1, opts)
if err != nil {
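// ErrEntityTooLarge is passed through unwrapped so that HandleFileUploads can
// map it to a 413 Request Entity Too Large response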
if err == filestore.ErrEntityTooLarge {
return err
}
return fmt.Errorf("Persisting multipart file: %v", err)
}
......
......@@ -35,9 +35,12 @@ func HandleFileUploads(w http.ResponseWriter, r *http.Request, h http.Handler, p
// Rewrite multipart form data
err := rewriteFormFilesFromMultipart(r, writer, preauth, filter)
if err != nil {
if err == http.ErrNotMultipart {
switch err {
case http.ErrNotMultipart:
h.ServeHTTP(w, r)
} else {
case filestore.ErrEntityTooLarge:
helper.RequestEntityTooLarge(w, r, err)
default:
helper.Fail500(w, r, fmt.Errorf("handleFileUploads: extract files from multipart: %v", err))
}
return
......