Commit 23b09948 authored by Nick Thomas's avatar Nick Thomas

Merge branch 'sh-object-storage-put-headers' into 'master'

Support adding PUT headers for object storage from Rails

See merge request gitlab-org/gitlab-workhorse!297
parents 78342712 7231f85e
......@@ -85,6 +85,10 @@ type RemoteObject struct {
DeleteURL string
// StoreURL is the temporary presigned S3 PutObject URL to which upload the first found file
StoreURL string
// Boolean to indicate whether to use headers included in PutHeaders
CustomPutHeaders bool
// PutHeaders are HTTP headers (e.g. Content-Type) to be sent with StoreURL
PutHeaders map[string]string
// ID is a unique identifier of object storage upload
ID string
// Timeout is a number that represents timeout in seconds for sending data to StoreURL
......
......@@ -100,14 +100,14 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts
}()
if opts.IsMultipart() {
remoteWriter, err = objectstore.NewMultipart(ctx, opts.PresignedParts, opts.PresignedCompleteMultipart, opts.PresignedAbortMultipart, opts.PresignedDelete, opts.Deadline, opts.PartSize)
remoteWriter, err = objectstore.NewMultipart(ctx, opts.PresignedParts, opts.PresignedCompleteMultipart, opts.PresignedAbortMultipart, opts.PresignedDelete, opts.PutHeaders, opts.Deadline, opts.PartSize)
if err != nil {
return nil, err
}
writers = append(writers, remoteWriter)
} else if opts.IsRemote() {
remoteWriter, err = objectstore.NewObject(ctx, opts.PresignedPut, opts.PresignedDelete, opts.Deadline, size)
remoteWriter, err = objectstore.NewObject(ctx, opts.PresignedPut, opts.PresignedDelete, opts.PutHeaders, opts.Deadline, size)
if err != nil {
return nil, err
}
......
......@@ -23,6 +23,9 @@ type SaveFileOpts struct {
PresignedPut string
// PresignedDelete is a presigned S3 DeleteObject compatible URL.
PresignedDelete string
// HTTP headers to be sent along with PUT request
PutHeaders map[string]string
// Deadline it the S3 operation deadline, the upload will be aborted if not completed in time
Deadline time.Time
......@@ -65,9 +68,17 @@ func GetOpts(apiResponse *api.Response) *SaveFileOpts {
RemoteURL: apiResponse.RemoteObject.GetURL,
PresignedPut: apiResponse.RemoteObject.StoreURL,
PresignedDelete: apiResponse.RemoteObject.DeleteURL,
PutHeaders: apiResponse.RemoteObject.PutHeaders,
Deadline: time.Now().Add(timeout),
}
// Backwards compatibility to ensure API servers that do not include the
// CustomPutHeaders flag will default to the original content type.
if !apiResponse.RemoteObject.CustomPutHeaders {
opts.PutHeaders = make(map[string]string)
opts.PutHeaders["Content-Type"] = "application/octet-stream"
}
if multiParams := apiResponse.RemoteObject.MultipartUpload; multiParams != nil {
opts.PartSize = multiParams.PartSize
opts.PresignedCompleteMultipart = multiParams.CompleteURL
......
......@@ -76,8 +76,10 @@ func TestSaveFileOptsLocalAndRemote(t *testing.T) {
func TestGetOpts(t *testing.T) {
tests := []struct {
name string
multipart *api.MultipartUploadParams
name string
multipart *api.MultipartUploadParams
customPutHeaders bool
putHeaders map[string]string
}{
{
name: "Single upload",
......@@ -90,6 +92,21 @@ func TestGetOpts(t *testing.T) {
PartURLs: []string{"http://part1", "http://part2"},
},
},
{
name: "Single upload with custom content type",
customPutHeaders: true,
putHeaders: map[string]string{"Content-Type": "image/jpeg"},
}, {
name: "Multipart upload with custom content type",
multipart: &api.MultipartUploadParams{
PartSize: 10,
CompleteURL: "http://complete",
AbortURL: "http://abort",
PartURLs: []string{"http://part1", "http://part2"},
},
customPutHeaders: true,
putHeaders: map[string]string{"Content-Type": "image/jpeg"},
},
}
for _, test := range tests {
......@@ -99,12 +116,14 @@ func TestGetOpts(t *testing.T) {
apiResponse := &api.Response{
TempPath: "/tmp",
RemoteObject: api.RemoteObject{
Timeout: 10,
ID: "id",
GetURL: "http://get",
StoreURL: "http://store",
DeleteURL: "http://delete",
MultipartUpload: test.multipart,
Timeout: 10,
ID: "id",
GetURL: "http://get",
StoreURL: "http://store",
DeleteURL: "http://delete",
MultipartUpload: test.multipart,
CustomPutHeaders: test.customPutHeaders,
PutHeaders: test.putHeaders,
},
}
deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second)
......@@ -116,6 +135,12 @@ func TestGetOpts(t *testing.T) {
assert.Equal(apiResponse.RemoteObject.GetURL, opts.RemoteURL)
assert.Equal(apiResponse.RemoteObject.StoreURL, opts.PresignedPut)
assert.Equal(apiResponse.RemoteObject.DeleteURL, opts.PresignedDelete)
if test.customPutHeaders {
assert.Equal(opts.PutHeaders, apiResponse.RemoteObject.PutHeaders)
} else {
assert.Equal(opts.PutHeaders, map[string]string{"Content-Type": "application/octet-stream"})
}
if test.multipart == nil {
assert.False(opts.IsMultipart())
assert.Empty(opts.PresignedCompleteMultipart)
......
......@@ -36,7 +36,7 @@ type Multipart struct {
// NewMultipart provides Multipart pointer that can be used for uploading. Data written will be split buffered on disk up to size bytes
// then uploaded with S3 Upload Part. Once Multipart is Closed a final call to CompleteMultipartUpload will be sent.
// In case of any error a call to AbortMultipartUpload will be made to cleanup all the resources
func NewMultipart(ctx context.Context, partURLs []string, completeURL, abortURL, deleteURL string, deadline time.Time, partSize int64) (*Multipart, error) {
func NewMultipart(ctx context.Context, partURLs []string, completeURL, abortURL, deleteURL string, putHeaders map[string]string, deadline time.Time, partSize int64) (*Multipart, error) {
pr, pw := io.Pipe()
uploadCtx, cancelFn := context.WithDeadline(ctx, deadline)
m := &Multipart{
......@@ -62,7 +62,7 @@ func NewMultipart(ctx context.Context, partURLs []string, completeURL, abortURL,
cmu := &CompleteMultipartUpload{}
for i, partURL := range partURLs {
src := io.LimitReader(pr, partSize)
part, err := m.readAndUploadOnePart(partURL, src, i+1)
part, err := m.readAndUploadOnePart(partURL, putHeaders, src, i+1)
if err != nil {
m.uploadError = err
return
......@@ -175,7 +175,7 @@ func (m *Multipart) verifyETag(cmu *CompleteMultipartUpload) error {
return nil
}
func (m *Multipart) readAndUploadOnePart(partURL string, src io.Reader, partNumber int) (*completeMultipartUploadPart, error) {
func (m *Multipart) readAndUploadOnePart(partURL string, putHeaders map[string]string, src io.Reader, partNumber int) (*completeMultipartUploadPart, error) {
file, err := ioutil.TempFile("", "part-buffer")
if err != nil {
return nil, fmt.Errorf("Unable to create a temporary file for buffering: %v", err)
......@@ -198,20 +198,20 @@ func (m *Multipart) readAndUploadOnePart(partURL string, src io.Reader, partNumb
return nil, fmt.Errorf("Cannot rewind part %d temporary dump : %v", partNumber, err)
}
etag, err := m.uploadPart(partURL, file, n)
etag, err := m.uploadPart(partURL, putHeaders, file, n)
if err != nil {
return nil, fmt.Errorf("Cannot upload part %d: %v", partNumber, err)
}
return &completeMultipartUploadPart{PartNumber: partNumber, ETag: etag}, nil
}
func (m *Multipart) uploadPart(url string, body io.Reader, size int64) (string, error) {
func (m *Multipart) uploadPart(url string, headers map[string]string, body io.Reader, size int64) (string, error) {
deadline, ok := m.ctx.Deadline()
if !ok {
return "", fmt.Errorf("Missing deadline")
}
part, err := newObject(m.ctx, url, "", deadline, size, false)
part, err := newObject(m.ctx, url, "", headers, deadline, size, false)
if err != nil {
return "", err
}
......
......@@ -47,11 +47,11 @@ type Object struct {
}
// NewObject opens an HTTP connection to Object Store and returns an Object pointer that can be used for uploading.
func NewObject(ctx context.Context, putURL, deleteURL string, deadline time.Time, size int64) (*Object, error) {
return newObject(ctx, putURL, deleteURL, deadline, size, true)
func NewObject(ctx context.Context, putURL, deleteURL string, putHeaders map[string]string, deadline time.Time, size int64) (*Object, error) {
return newObject(ctx, putURL, deleteURL, putHeaders, deadline, size, true)
}
func newObject(ctx context.Context, putURL, deleteURL string, deadline time.Time, size int64, metrics bool) (*Object, error) {
func newObject(ctx context.Context, putURL, deleteURL string, putHeaders map[string]string, deadline time.Time, size int64, metrics bool) (*Object, error) {
started := time.Now()
pr, pw := io.Pipe()
// we should prevent pr.Close() otherwise it may shadow error set with pr.CloseWithError(err)
......@@ -63,7 +63,10 @@ func newObject(ctx context.Context, putURL, deleteURL string, deadline time.Time
return nil, fmt.Errorf("PUT %q: %v", helper.ScrubURLParams(putURL), err)
}
req.ContentLength = size
req.Header.Set("Content-Type", "application/octet-stream")
for k, v := range putHeaders {
req.Header.Set(k, v)
}
uploadCtx, cancelFn := context.WithDeadline(ctx, deadline)
o := &Object{
......
......@@ -18,7 +18,7 @@ import (
const testTimeout = 10 * time.Second
func testObjectUploadNoErrors(t *testing.T, useDeleteURL bool) {
func testObjectUploadNoErrors(t *testing.T, useDeleteURL bool, contentType string) {
assert := assert.New(t)
osStub, ts := test.StartObjectStore()
......@@ -30,11 +30,13 @@ func testObjectUploadNoErrors(t *testing.T, useDeleteURL bool) {
deleteURL = objectURL
}
putHeaders := map[string]string{"Content-Type": contentType}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
deadline := time.Now().Add(testTimeout)
object, err := objectstore.NewObject(ctx, objectURL, deleteURL, deadline, test.ObjectSize)
object, err := objectstore.NewObject(ctx, objectURL, deleteURL, putHeaders, deadline, test.ObjectSize)
require.NoError(t, err)
// copy data
......@@ -46,6 +48,8 @@ func testObjectUploadNoErrors(t *testing.T, useDeleteURL bool) {
err = object.Close()
assert.NoError(err)
assert.Equal(contentType, osStub.GetHeader(test.ObjectPath, "Content-Type"))
// Checking MD5 extraction
assert.Equal(osStub.GetObjectMD5(test.ObjectPath), object.ETag())
......@@ -73,8 +77,9 @@ func testObjectUploadNoErrors(t *testing.T, useDeleteURL bool) {
}
func TestObjectUpload(t *testing.T) {
t.Run("with delete URL", func(t *testing.T) { testObjectUploadNoErrors(t, true) })
t.Run("without delete URL", func(t *testing.T) { testObjectUploadNoErrors(t, false) })
t.Run("with delete URL", func(t *testing.T) { testObjectUploadNoErrors(t, true, "application/octet-stream") })
t.Run("without delete URL", func(t *testing.T) { testObjectUploadNoErrors(t, false, "application/octet-stream") })
t.Run("with custom content type", func(t *testing.T) { testObjectUploadNoErrors(t, false, "image/jpeg") })
}
func TestObjectUpload404(t *testing.T) {
......@@ -89,7 +94,7 @@ func TestObjectUpload404(t *testing.T) {
deadline := time.Now().Add(testTimeout)
objectURL := ts.URL + test.ObjectPath
object, err := objectstore.NewObject(ctx, objectURL, "", deadline, test.ObjectSize)
object, err := objectstore.NewObject(ctx, objectURL, "", map[string]string{}, deadline, test.ObjectSize)
require.NoError(err)
_, err = io.Copy(object, strings.NewReader(test.ObjectContent))
......@@ -134,7 +139,7 @@ func TestObjectUploadBrokenConnection(t *testing.T) {
deadline := time.Now().Add(testTimeout)
objectURL := ts.URL + test.ObjectPath
object, err := objectstore.NewObject(ctx, objectURL, "", deadline, -1)
object, err := objectstore.NewObject(ctx, objectURL, "", map[string]string{}, deadline, -1)
require.NoError(t, err)
_, copyErr := io.Copy(object, &endlessReader{})
......
......@@ -27,6 +27,8 @@ type ObjectstoreStub struct {
overwriteMD5 map[string]string
// multipart is a map of MultipartUploads
multipart map[string]partsEtagMap
// HTTP header sent along request
headers map[string]*http.Header
puts int
deletes int
......@@ -45,6 +47,7 @@ func StartObjectStoreWithCustomMD5(md5Hashes map[string]string) (*ObjectstoreStu
bucket: make(map[string]string),
multipart: make(map[string]partsEtagMap),
overwriteMD5: make(map[string]string),
headers: make(map[string]*http.Header),
}
for k, v := range md5Hashes {
......@@ -79,6 +82,18 @@ func (o *ObjectstoreStub) GetObjectMD5(path string) string {
return o.bucket[path]
}
// GetHeader returns a given HTTP header of the object uploaded to the path
func (o *ObjectstoreStub) GetHeader(path, key string) string {
o.m.Lock()
defer o.m.Unlock()
if val, ok := o.headers[path]; ok {
return val.Get(key)
}
return ""
}
// InitiateMultipartUpload prepare the ObjectstoreStob to receive a MultipartUpload on path
// It will return an error if a MultipartUpload is already in progress on that path
// InitiateMultipartUpload is only used during test setup.
......@@ -146,6 +161,7 @@ func (o *ObjectstoreStub) putObject(w http.ResponseWriter, r *http.Request) {
etag = hex.EncodeToString(checksum)
}
o.headers[objectPath] = &r.Header
o.puts++
if o.isMultipartUpload(objectPath) {
pNumber := r.URL.Query().Get("partNumber")
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment