Commit d134ecd0 authored by Jacob Vosmaer (GitLab)

Merge branch 'object-etag' into 'master'

Objectstorage ETag checking

See merge request gitlab-org/gitlab-workhorse!263
parents a8ade9b0 3ad81e62
@@ -83,7 +83,7 @@ func (fh *FileHandler) GitLabFinalizeFields(prefix string) map[string]string {
// SaveFileFromReader persists the provided reader content to all the locations specified in opts. A cleanup will be performed once ctx is Done.
// Make sure the provided context will not expire before finalizing upload with GitLab Rails.
func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts *SaveFileOpts) (fh *FileHandler, err error) {
-var remoteWriter io.WriteCloser
+var remoteWriter objectstore.Upload
fh = &FileHandler{
Name: opts.TempFilePrefix,
RemoteID: opts.RemoteID,
@@ -149,6 +149,9 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts
}
return nil, err
}
+etag := remoteWriter.ETag()
+fh.hashes["etag"] = etag
}
return fh, err
......
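For orientation, here is a minimal caller-side sketch of how the new ETag surfaces through this API. The helper name uploadAndFinalize and the call-site shape are hypothetical; the opts fields and the "file.etag" finalize field come from the tests in this merge request, and ctx must outlive the Rails finalization because cancelling it triggers the asynchronous cleanup described above.

// Hypothetical usage sketch, not part of this change.
func uploadAndFinalize(ctx context.Context, r io.Reader, size int64, opts *filestore.SaveFileOpts) (map[string]string, error) {
	// Keep ctx alive until GitLab Rails has finalized the upload:
	// once ctx is Done the stored object is cleaned up asynchronously.
	fh, err := filestore.SaveFileFromReader(ctx, r, size, opts)
	if err != nil {
		return nil, err
	}
	// For remote destinations the fields now include "file.etag",
	// the checksum reported by the object storage provider.
	return fh.GitLabFinalizeFields("file"), nil
}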
@@ -80,6 +80,52 @@ func TestSaveFromDiskNotExistingFile(t *testing.T) {
assert.Nil(fh, "On error FileHandler should be nil")
}
func TestSaveFileWrongETag(t *testing.T) {
tests := []struct {
name string
multipart bool
}{
{name: "single part"},
{name: "multi part", multipart: true},
}
for _, spec := range tests {
t.Run(spec.name, func(t *testing.T) {
assert := assert.New(t)
osStub, ts := test.StartObjectStoreWithCustomMD5(map[string]string{test.ObjectPath: "brokenMD5"})
defer ts.Close()
objectURL := ts.URL + test.ObjectPath
opts := &filestore.SaveFileOpts{
RemoteID: "test-file",
RemoteURL: objectURL,
PresignedPut: objectURL + "?Signature=ASignature",
PresignedDelete: objectURL + "?Signature=AnotherSignature",
Deadline: testDeadline(),
}
if spec.multipart {
opts.PresignedParts = []string{objectURL + "?partNumber=1"}
opts.PresignedCompleteMultipart = objectURL + "?Signature=CompleteSig"
opts.PresignedAbortMultipart = objectURL + "?Signature=AbortSig"
opts.PartSize = test.ObjectSize
osStub.InitiateMultipartUpload(test.ObjectPath)
}
ctx, cancel := context.WithCancel(context.Background())
fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, opts)
assert.Nil(fh)
assert.Error(err)
assert.Equal(1, osStub.PutsCnt(), "File not uploaded")
cancel() // this will trigger an async cleanup
assertObjectStoreDeletedAsync(t, 1, osStub)
assert.False(spec.multipart && osStub.IsMultipartUpload(test.ObjectPath), "there must be no multipart upload in progress now")
})
}
}
func TestSaveFileFromDiskToLocalPath(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
@@ -111,7 +157,8 @@ func TestSaveFileFromDiskToLocalPath(t *testing.T) {
func TestSaveFile(t *testing.T) {
type remote int
const (
-remoteSingle remote = iota
+notRemote remote = iota
+remoteSingle
remoteMultipart
)
@@ -221,6 +268,11 @@ func TestSaveFile(t *testing.T) {
assert.Equal(test.ObjectSHA1, fields["file.sha1"])
assert.Equal(test.ObjectSHA256, fields["file.sha256"])
assert.Equal(test.ObjectSHA512, fields["file.sha512"])
if spec.remote == notRemote {
assert.NotContains(fields, "file.etag")
} else {
assert.Contains(fields, "file.etag")
}
})
}
}
......
@@ -151,6 +151,27 @@ func (m *Multipart) complete(cmu *CompleteMultipartUpload) error {
return result
}
if result.CompleteMultipartUploadResult == nil {
return fmt.Errorf("Cannot read CompleteMultipartUpload answer")
}
m.extractETag(result.ETag)
if err := m.verifyETag(cmu); err != nil {
return fmt.Errorf("ETag verification failure: %v", err)
}
return nil
}
func (m *Multipart) verifyETag(cmu *CompleteMultipartUpload) error {
expectedChecksum, err := cmu.BuildMultipartUploadETag()
if err != nil {
return err
}
if expectedChecksum != m.etag {
return fmt.Errorf("got %q expected %q", m.etag, expectedChecksum)
}
return nil
}
@@ -205,7 +226,7 @@ func (m *Multipart) uploadPart(url string, body io.Reader, size int64) (string,
return "", err
}
-return part.MD5(), nil
+return part.ETag(), nil
}
func (m *Multipart) delete() {
......
@@ -42,8 +42,6 @@ type Object struct {
PutURL string
// DeleteURL is a presigned URL for RemoveObject
DeleteURL string
-// md5 is the checksum provided by the Object Store
-md5 string
uploader
}
@@ -71,7 +69,7 @@ func newObject(ctx context.Context, putURL, deleteURL string, deadline time.Time
o := &Object{
PutURL: putURL,
DeleteURL: deleteURL,
-uploader: newUploader(uploadCtx, pw),
+uploader: newMD5Uploader(uploadCtx, pw),
}
if metrics {
@@ -120,28 +118,16 @@ func newObject(ctx context.Context, putURL, deleteURL string, deadline time.Time
return
}
-o.extractMD5(resp.Header)
+o.extractETag(resp.Header.Get("ETag"))
+if o.etag != o.md5Sum() {
+o.uploadError = fmt.Errorf("ETag mismatch. expected %q got %q", o.md5Sum(), o.etag)
+return
+}
}()
return o, nil
}
-// MD5 returns the md5sum of the uploaded returned by the Object Store provider via ETag Header.
-// This method will wait until upload context is done before returning.
-func (o *Object) MD5() string {
-<-o.ctx.Done()
-return o.md5
-}
-func (o *Object) extractMD5(h http.Header) {
-etag := h.Get("ETag")
-if etag != "" && etag[0] == '"' {
-etag = etag[1 : len(etag)-1]
-}
-o.md5 = etag
-}
func (o *Object) delete() {
o.syncAndDelete(o.DeleteURL)
}
@@ -47,7 +47,7 @@ func testObjectUploadNoErrors(t *testing.T, useDeleteURL bool) {
assert.NoError(err)
// Checking MD5 extraction
-assert.Equal(osStub.GetObjectMD5(test.ObjectPath), object.MD5())
+assert.Equal(osStub.GetObjectMD5(test.ObjectPath), object.ETag())
// Checking cleanup
cancel()
......
package objectstore
import (
+"crypto/md5"
+"encoding/hex"
"encoding/xml"
"fmt"
)
@@ -49,3 +51,25 @@ type compoundCompleteMultipartUploadResult struct {
func (c *compoundCompleteMultipartUploadResult) isError() bool {
return c.CompleteMultipartUploadError != nil
}
// BuildMultipartUploadETag creates an S3 compatible ETag for MultipartUpload
// Given the MD5 hash for each uploaded part of the file, concatenate
// the hashes into a single binary string and calculate the MD5 hash of that result,
// then append "-len(etags)".
// http://permalink.gmane.org/gmane.comp.file-systems.s3.s3tools/583
func (cmu *CompleteMultipartUpload) BuildMultipartUploadETag() (string, error) {
hasher := md5.New()
for _, part := range cmu.Part {
checksum, err := hex.DecodeString(part.ETag)
if err != nil {
return "", err
}
_, err = hasher.Write(checksum)
if err != nil {
return "", err
}
}
multipartChecksum := hasher.Sum(nil)
return fmt.Sprintf("%s-%d", hex.EncodeToString(multipartChecksum), len(cmu.Part)), nil
}
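As a standalone illustration of the rule described in the comment above (and exercised by the test that follows), this sketch derives the same combined ETag directly from raw part contents; the committed helper starts from the per-part ETags instead. It assumes only the standard library (crypto/md5, encoding/hex, fmt).

// Illustration only: S3-style multipart ETag computed from raw part contents.
func multipartETagFromParts(parts [][]byte) string {
	var concatenated []byte
	for _, part := range parts {
		sum := md5.Sum(part) // per-part MD5, the value S3 reports as that part's ETag
		concatenated = append(concatenated, sum[:]...) // concatenate the binary digests
	}
	final := md5.Sum(concatenated) // MD5 over the concatenated binary digests
	return fmt.Sprintf("%s-%d", hex.EncodeToString(final[:]), len(parts))
}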
package objectstore
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestMultipartUploadETag(t *testing.T) {
cmu := CompleteMultipartUpload{
Part: []*completeMultipartUploadPart{
{PartNumber: 1, ETag: "550cf6b6e60f65a0e3104a26e70fea42"},
{PartNumber: 2, ETag: "920b914bca0a70780b40881b8f376135"},
{PartNumber: 3, ETag: "175719e13d23c021058bc7376696f26f"},
},
}
expectedETag := "1dc6ab8f946f699770f14f46a8739671-3"
etag, err := cmu.BuildMultipartUploadETag()
require.NoError(t, err)
require.Equal(t, expectedETag, etag)
}
@@ -204,7 +204,11 @@ func (o *ObjectstoreStub) completeMultipartUpload(w http.ResponseWriter, r *http
etag, overwritten := o.overwriteMD5[objectPath]
if !overwritten {
-etag = "not an md5 hash"
+etag, err = msg.BuildMultipartUploadETag()
+if err != nil {
+http.Error(w, "cannot compute ETag", 400)
+return
+}
}
o.bucket[objectPath] = etag
......
@@ -106,6 +106,7 @@ func TestObjectStoreCompleteMultipartUpload(t *testing.T) {
contentMD5: "920b914bca0a70780b40881b8f376135",
},
}
expectedETag := "2f2f82eceacf5bd0ac5d7c3d3d388849-2"
stub.InitiateMultipartUpload(ObjectPath)
@@ -148,7 +149,7 @@ func TestObjectStoreCompleteMultipartUpload(t *testing.T) {
assert.Equal(len(parts), stub.PutsCnt())
assert.Equal(0, stub.DeletesCnt())
-assert.NotEmpty(stub.GetObjectMD5(ObjectPath), "MultipartUpload not completed")
+assert.Equal(expectedETag, stub.GetObjectMD5(ObjectPath))
assert.False(stub.IsMultipartUpload(ObjectPath), "MultipartUpload is still in progress")
}
......
@@ -2,6 +2,9 @@ package objectstore
import (
"context"
+"crypto/md5"
+"encoding/hex"
+"hash"
"io"
"net/http"
@@ -10,10 +13,22 @@ import (
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
)
// Upload represents an upload to an ObjectStorage provider
type Upload interface {
io.WriteCloser
ETag() string
}
// uploader is an io.WriteCloser that can be used as write end of the uploading pipe.
type uploader struct {
-// writeCloser is the writer bound to the request body
-io.WriteCloser
+// etag is the object storage provided checksum
+etag string
+// md5 is an optional hasher for calculating md5 on the fly
+md5 hash.Hash
+w io.Writer
+c io.Closer
// uploadError is the last error occurred during upload
uploadError error
@@ -22,13 +37,19 @@ type uploader struct {
}
func newUploader(ctx context.Context, w io.WriteCloser) uploader {
-return uploader{WriteCloser: w, ctx: ctx}
+return uploader{w: w, c: w, ctx: ctx}
}
+func newMD5Uploader(ctx context.Context, w io.WriteCloser) uploader {
+hasher := md5.New()
+mw := io.MultiWriter(w, hasher)
+return uploader{w: mw, c: w, md5: hasher, ctx: ctx}
+}
// Close implements the standard io.Closer interface: it closes the http client request.
// This method will also wait for the connection to terminate and return any error that occurred during the upload.
func (u *uploader) Close() error {
-if err := u.WriteCloser.Close(); err != nil {
+if err := u.c.Close(); err != nil {
return err
}
@@ -41,6 +62,10 @@ func (u *uploader) Close() error {
return u.uploadError
}
func (u *uploader) Write(p []byte) (int, error) {
return u.w.Write(p)
}
// syncAndDelete waits for Context to be Done and then performs the requested HTTP call.
func (u *uploader) syncAndDelete(url string) {
if url == "" {
@@ -63,3 +88,27 @@ func (u *uploader) syncAndDelete(url string) {
}
resp.Body.Close()
}
func (u *uploader) extractETag(rawETag string) {
if rawETag != "" && rawETag[0] == '"' {
rawETag = rawETag[1 : len(rawETag)-1]
}
u.etag = rawETag
}
func (u *uploader) md5Sum() string {
if u.md5 == nil {
return ""
}
checksum := u.md5.Sum(nil)
return hex.EncodeToString(checksum)
}
// ETag returns the checksum of the uploaded object returned by the ObjectStorage provider via ETag Header.
// This method will wait until upload context is done before returning.
func (u *uploader) ETag() string {
<-u.ctx.Done()
return u.etag
}
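To show how the new pieces fit together, here is a hedged sketch of a writer that satisfies the Upload interface using the same io.MultiWriter technique as newMD5Uploader. The hashingUpload type and its constructor are hypothetical; only the interface shape (io.WriteCloser plus ETag) comes from this change, and unlike the real uploader it does not wait on a context because it computes the checksum locally. It assumes only the standard library (crypto/md5, encoding/hex, hash, io).

// Hypothetical Upload implementation, for illustration only.
type hashingUpload struct {
	dst    io.WriteCloser // final destination, e.g. the upload pipe
	hasher hash.Hash      // md5 computed on the fly
	w      io.Writer      // MultiWriter feeding both dst and hasher
	etag   string
}

func newHashingUpload(dst io.WriteCloser) *hashingUpload {
	h := md5.New()
	return &hashingUpload{dst: dst, hasher: h, w: io.MultiWriter(dst, h)}
}

func (u *hashingUpload) Write(p []byte) (int, error) { return u.w.Write(p) }

func (u *hashingUpload) Close() error {
	if err := u.dst.Close(); err != nil {
		return err
	}
	u.etag = hex.EncodeToString(u.hasher.Sum(nil))
	return nil
}

// ETag returns the locally computed checksum; the real uploader instead blocks on its
// context and returns the value extracted from the provider's ETag response header.
func (u *hashingUpload) ETag() string { return u.etag }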