Commit cb807306 authored by Stan Hu's avatar Stan Hu

Refactor uploaders to use different upload strategies

Previously it was particularly tricky to add a new object storage method
because you had to be aware of how to deal with different goroutines and
contexts to handle the Workhorse upload flow
(https://docs.gitlab.com/ee/development/uploads.html#direct-upload). In
addition, the execution engine to handle this was duplicated across
multiple files. The execution engine essentially did the following:

1. Set up an upload context with a deadline
2. Record upload metrics
3. Initialize cleanup functions
4. Initiate upload
5. Validate upload ETag
6. Do cleanup (e.g. delete the temporary file)

To reduce code duplication and to make it easier to add new object
stores, the common execution sequence is now encapsulated in the
`uploader` `Execute()` method. We also introduce an `UploadStrategy`
interface that handles the details of the uploads, and `Execute()` calls
methods on this interface.

Now adding a new object storage type is a matter of implementing the
`UploadStrategy` interface without needing to understand the details of
the execution engine.
parent 8b819cbd
---
title: Refactor uploaders to use different upload strategies
merge_request: 553
author:
type: other
...@@ -22,12 +22,16 @@ var ErrNotEnoughParts = errors.New("not enough Parts") ...@@ -22,12 +22,16 @@ var ErrNotEnoughParts = errors.New("not enough Parts")
// Multipart represents a MultipartUpload on a S3 compatible Object Store service. // Multipart represents a MultipartUpload on a S3 compatible Object Store service.
// It can be used as io.WriteCloser for uploading an object // It can be used as io.WriteCloser for uploading an object
type Multipart struct { type Multipart struct {
PartURLs []string
// CompleteURL is a presigned URL for CompleteMultipartUpload // CompleteURL is a presigned URL for CompleteMultipartUpload
CompleteURL string CompleteURL string
// AbortURL is a presigned URL for AbortMultipartUpload // AbortURL is a presigned URL for AbortMultipartUpload
AbortURL string AbortURL string
// DeleteURL is a presigned URL for RemoveObject // DeleteURL is a presigned URL for RemoveObject
DeleteURL string DeleteURL string
PutHeaders map[string]string
partSize int64
etag string
uploader uploader
} }
...@@ -36,35 +40,28 @@ type Multipart struct { ...@@ -36,35 +40,28 @@ type Multipart struct {
// then uploaded with S3 Upload Part. Once Multipart is Closed a final call to CompleteMultipartUpload will be sent. // then uploaded with S3 Upload Part. Once Multipart is Closed a final call to CompleteMultipartUpload will be sent.
// In case of any error a call to AbortMultipartUpload will be made to cleanup all the resources // In case of any error a call to AbortMultipartUpload will be made to cleanup all the resources
func NewMultipart(ctx context.Context, partURLs []string, completeURL, abortURL, deleteURL string, putHeaders map[string]string, deadline time.Time, partSize int64) (*Multipart, error) { func NewMultipart(ctx context.Context, partURLs []string, completeURL, abortURL, deleteURL string, putHeaders map[string]string, deadline time.Time, partSize int64) (*Multipart, error) {
pr, pw := io.Pipe()
uploadCtx, cancelFn := context.WithDeadline(ctx, deadline)
m := &Multipart{ m := &Multipart{
PartURLs: partURLs,
CompleteURL: completeURL, CompleteURL: completeURL,
AbortURL: abortURL, AbortURL: abortURL,
DeleteURL: deleteURL, DeleteURL: deleteURL,
uploader: newUploader(uploadCtx, pw), PutHeaders: putHeaders,
partSize: partSize,
} }
go m.trackUploadTime() m.uploader = newUploader(m)
go m.cleanup(ctx) m.Execute(ctx, deadline)
objectStorageUploadsOpen.Inc() return m, nil
}
go func() {
defer cancelFn()
defer objectStorageUploadsOpen.Dec()
defer func() {
// This will be returned as error to the next write operation on the pipe
pr.CloseWithError(m.uploadError)
}()
func (m *Multipart) Upload(ctx context.Context, r io.Reader) error {
cmu := &CompleteMultipartUpload{} cmu := &CompleteMultipartUpload{}
for i, partURL := range partURLs { for i, partURL := range m.PartURLs {
src := io.LimitReader(pr, partSize) src := io.LimitReader(r, m.partSize)
part, err := m.readAndUploadOnePart(partURL, putHeaders, src, i+1) part, err := m.readAndUploadOnePart(ctx, partURL, m.PutHeaders, src, i+1)
if err != nil { if err != nil {
m.uploadError = err return err
return
} }
if part == nil { if part == nil {
break break
...@@ -73,93 +70,33 @@ func NewMultipart(ctx context.Context, partURLs []string, completeURL, abortURL, ...@@ -73,93 +70,33 @@ func NewMultipart(ctx context.Context, partURLs []string, completeURL, abortURL,
} }
} }
n, err := io.Copy(ioutil.Discard, pr) n, err := io.Copy(ioutil.Discard, r)
if err != nil { if err != nil {
m.uploadError = fmt.Errorf("drain pipe: %v", err) return fmt.Errorf("drain pipe: %v", err)
return
} }
if n > 0 { if n > 0 {
m.uploadError = ErrNotEnoughParts return ErrNotEnoughParts
return
} }
if err := m.complete(cmu); err != nil { if err := m.complete(ctx, cmu); err != nil {
m.uploadError = err return err
return
} }
}()
return m, nil return nil
} }
func (m *Multipart) trackUploadTime() { func (m *Multipart) ETag() string {
started := time.Now() return m.etag
<-m.ctx.Done()
objectStorageUploadTime.Observe(time.Since(started).Seconds())
} }
func (m *Multipart) Abort() {
func (m *Multipart) cleanup(ctx context.Context) { deleteURL(m.AbortURL)
// wait for the upload to finish
<-m.ctx.Done()
if m.uploadError != nil {
objectStorageUploadRequestsRequestFailed.Inc()
m.abort()
return
}
// We have now successfully uploaded the file to object storage. Another
// goroutine will hand off the object to gitlab-rails.
<-ctx.Done()
// gitlab-rails is now done with the object so it's time to delete it.
m.delete()
} }
func (m *Multipart) complete(cmu *CompleteMultipartUpload) error { func (m *Multipart) Delete() {
body, err := xml.Marshal(cmu) deleteURL(m.DeleteURL)
if err != nil {
return fmt.Errorf("marshal CompleteMultipartUpload request: %v", err)
}
req, err := http.NewRequest("POST", m.CompleteURL, bytes.NewReader(body))
if err != nil {
return fmt.Errorf("create CompleteMultipartUpload request: %v", err)
}
req.ContentLength = int64(len(body))
req.Header.Set("Content-Type", "application/xml")
req = req.WithContext(m.ctx)
resp, err := httpClient.Do(req)
if err != nil {
return fmt.Errorf("CompleteMultipartUpload request %q: %v", mask.URL(m.CompleteURL), err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("CompleteMultipartUpload request %v returned: %s", mask.URL(m.CompleteURL), resp.Status)
}
result := &compoundCompleteMultipartUploadResult{}
decoder := xml.NewDecoder(resp.Body)
if err := decoder.Decode(&result); err != nil {
return fmt.Errorf("decode CompleteMultipartUpload answer: %v", err)
}
if result.isError() {
return result
}
if result.CompleteMultipartUploadResult == nil {
return fmt.Errorf("empty CompleteMultipartUploadResult")
}
m.extractETag(result.ETag)
return nil
} }
func (m *Multipart) readAndUploadOnePart(partURL string, putHeaders map[string]string, src io.Reader, partNumber int) (*completeMultipartUploadPart, error) { func (m *Multipart) readAndUploadOnePart(ctx context.Context, partURL string, putHeaders map[string]string, src io.Reader, partNumber int) (*completeMultipartUploadPart, error) {
file, err := ioutil.TempFile("", "part-buffer") file, err := ioutil.TempFile("", "part-buffer")
if err != nil { if err != nil {
return nil, fmt.Errorf("create temporary buffer file: %v", err) return nil, fmt.Errorf("create temporary buffer file: %v", err)
...@@ -182,20 +119,20 @@ func (m *Multipart) readAndUploadOnePart(partURL string, putHeaders map[string]s ...@@ -182,20 +119,20 @@ func (m *Multipart) readAndUploadOnePart(partURL string, putHeaders map[string]s
return nil, fmt.Errorf("rewind part %d temporary dump : %v", partNumber, err) return nil, fmt.Errorf("rewind part %d temporary dump : %v", partNumber, err)
} }
etag, err := m.uploadPart(partURL, putHeaders, file, n) etag, err := m.uploadPart(ctx, partURL, putHeaders, file, n)
if err != nil { if err != nil {
return nil, fmt.Errorf("upload part %d: %v", partNumber, err) return nil, fmt.Errorf("upload part %d: %v", partNumber, err)
} }
return &completeMultipartUploadPart{PartNumber: partNumber, ETag: etag}, nil return &completeMultipartUploadPart{PartNumber: partNumber, ETag: etag}, nil
} }
func (m *Multipart) uploadPart(url string, headers map[string]string, body io.Reader, size int64) (string, error) { func (m *Multipart) uploadPart(ctx context.Context, url string, headers map[string]string, body io.Reader, size int64) (string, error) {
deadline, ok := m.ctx.Deadline() deadline, ok := ctx.Deadline()
if !ok { if !ok {
return "", fmt.Errorf("missing deadline") return "", fmt.Errorf("missing deadline")
} }
part, err := newObject(m.ctx, url, "", headers, deadline, size, false) part, err := newObject(ctx, url, "", headers, deadline, size, false)
if err != nil { if err != nil {
return "", err return "", err
} }
...@@ -213,10 +150,45 @@ func (m *Multipart) uploadPart(url string, headers map[string]string, body io.Re ...@@ -213,10 +150,45 @@ func (m *Multipart) uploadPart(url string, headers map[string]string, body io.Re
return part.ETag(), nil return part.ETag(), nil
} }
func (m *Multipart) delete() { func (m *Multipart) complete(ctx context.Context, cmu *CompleteMultipartUpload) error {
m.syncAndDelete(m.DeleteURL) body, err := xml.Marshal(cmu)
} if err != nil {
return fmt.Errorf("marshal CompleteMultipartUpload request: %v", err)
}
req, err := http.NewRequest("POST", m.CompleteURL, bytes.NewReader(body))
if err != nil {
return fmt.Errorf("create CompleteMultipartUpload request: %v", err)
}
req.ContentLength = int64(len(body))
req.Header.Set("Content-Type", "application/xml")
req = req.WithContext(ctx)
func (m *Multipart) abort() { resp, err := httpClient.Do(req)
m.syncAndDelete(m.AbortURL) if err != nil {
return fmt.Errorf("CompleteMultipartUpload request %q: %v", mask.URL(m.CompleteURL), err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("CompleteMultipartUpload request %v returned: %s", mask.URL(m.CompleteURL), resp.Status)
}
result := &compoundCompleteMultipartUploadResult{}
decoder := xml.NewDecoder(resp.Body)
if err := decoder.Decode(&result); err != nil {
return fmt.Errorf("decode CompleteMultipartUpload answer: %v", err)
}
if result.isError() {
return result
}
if result.CompleteMultipartUploadResult == nil {
return fmt.Errorf("empty CompleteMultipartUploadResult")
}
m.etag = extractETag(result.ETag)
return nil
} }
...@@ -7,7 +7,6 @@ import ( ...@@ -7,7 +7,6 @@ import (
"io/ioutil" "io/ioutil"
"net" "net"
"net/http" "net/http"
"strings"
"time" "time"
"gitlab.com/gitlab-org/labkit/correlation" "gitlab.com/gitlab-org/labkit/correlation"
...@@ -36,109 +35,82 @@ var httpClient = &http.Client{ ...@@ -36,109 +35,82 @@ var httpClient = &http.Client{
Transport: httpTransport, Transport: httpTransport,
} }
type StatusCodeError error
// Object represents an object on a S3 compatible Object Store service. // Object represents an object on a S3 compatible Object Store service.
// It can be used as io.WriteCloser for uploading an object // It can be used as io.WriteCloser for uploading an object
type Object struct { type Object struct {
// PutURL is a presigned URL for PutObject // putURL is a presigned URL for PutObject
PutURL string putURL string
// DeleteURL is a presigned URL for RemoveObject // deleteURL is a presigned URL for RemoveObject
DeleteURL string deleteURL string
putHeaders map[string]string
size int64
etag string
metrics bool
uploader uploader
} }
type StatusCodeError error
// NewObject opens an HTTP connection to Object Store and returns an Object pointer that can be used for uploading. // NewObject opens an HTTP connection to Object Store and returns an Object pointer that can be used for uploading.
func NewObject(ctx context.Context, putURL, deleteURL string, putHeaders map[string]string, deadline time.Time, size int64) (*Object, error) { func NewObject(ctx context.Context, putURL, deleteURL string, putHeaders map[string]string, deadline time.Time, size int64) (*Object, error) {
return newObject(ctx, putURL, deleteURL, putHeaders, deadline, size, true) return newObject(ctx, putURL, deleteURL, putHeaders, deadline, size, true)
} }
func newObject(ctx context.Context, putURL, deleteURL string, putHeaders map[string]string, deadline time.Time, size int64, metrics bool) (*Object, error) { func newObject(ctx context.Context, putURL, deleteURL string, putHeaders map[string]string, deadline time.Time, size int64, metrics bool) (*Object, error) {
started := time.Now()
pr, pw := io.Pipe()
// we should prevent pr.Close() otherwise it may shadow error set with pr.CloseWithError(err)
req, err := http.NewRequest(http.MethodPut, putURL, ioutil.NopCloser(pr))
if err != nil {
if metrics {
objectStorageUploadRequestsRequestFailed.Inc()
}
return nil, fmt.Errorf("PUT %q: %v", mask.URL(putURL), err)
}
req.ContentLength = size
for k, v := range putHeaders {
req.Header.Set(k, v)
}
uploadCtx, cancelFn := context.WithDeadline(ctx, deadline)
o := &Object{ o := &Object{
PutURL: putURL, putURL: putURL,
DeleteURL: deleteURL, deleteURL: deleteURL,
uploader: newMD5Uploader(uploadCtx, pw), putHeaders: putHeaders,
size: size,
metrics: metrics,
} }
if metrics { o.uploader = newMD5Uploader(o, metrics)
objectStorageUploadsOpen.Inc() o.Execute(ctx, deadline)
}
go func() { return o, nil
// wait for the upload to finish }
<-o.ctx.Done()
if metrics {
objectStorageUploadTime.Observe(time.Since(started).Seconds())
}
// wait for provided context to finish before performing cleanup func (o *Object) Upload(ctx context.Context, r io.Reader) error {
<-ctx.Done() // we should prevent pr.Close() otherwise it may shadow error set with pr.CloseWithError(err)
o.delete() req, err := http.NewRequest(http.MethodPut, o.putURL, ioutil.NopCloser(r))
}()
go func() { if err != nil {
defer cancelFn() return fmt.Errorf("PUT %q: %v", mask.URL(o.putURL), err)
if metrics {
defer objectStorageUploadsOpen.Dec()
} }
defer func() { req.ContentLength = o.size
// This will be returned as error to the next write operation on the pipe
pr.CloseWithError(o.uploadError)
}()
req = req.WithContext(o.ctx) for k, v := range o.putHeaders {
req.Header.Set(k, v)
}
resp, err := httpClient.Do(req) resp, err := httpClient.Do(req)
if err != nil { if err != nil {
if metrics { return fmt.Errorf("PUT request %q: %v", mask.URL(o.putURL), err)
objectStorageUploadRequestsRequestFailed.Inc()
}
o.uploadError = fmt.Errorf("PUT request %q: %v", mask.URL(o.PutURL), err)
return
} }
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
if metrics { if o.metrics {
objectStorageUploadRequestsInvalidStatus.Inc() objectStorageUploadRequestsInvalidStatus.Inc()
} }
o.uploadError = StatusCodeError(fmt.Errorf("PUT request %v returned: %s", mask.URL(o.PutURL), resp.Status)) return StatusCodeError(fmt.Errorf("PUT request %v returned: %s", mask.URL(o.putURL), resp.Status))
return
} }
o.extractETag(resp.Header.Get("ETag")) o.etag = extractETag(resp.Header.Get("ETag"))
o.uploadError = compareMD5(o.md5Sum(), o.etag)
}()
return o, nil return nil
} }
func (o *Object) delete() { func (o *Object) ETag() string {
o.syncAndDelete(o.DeleteURL) return o.etag
} }
func compareMD5(local, remote string) error { func (o *Object) Abort() {
if !strings.EqualFold(local, remote) { o.Delete()
return fmt.Errorf("ETag mismatch. expected %q got %q", local, remote) }
}
return nil func (o *Object) Delete() {
deleteURL(o.deleteURL)
} }
...@@ -5,23 +5,36 @@ import ( ...@@ -5,23 +5,36 @@ import (
"io" "io"
"time" "time"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/aws/aws-sdk-go/service/s3/s3manager"
"gitlab.com/gitlab-org/labkit/log" "gitlab.com/gitlab-org/labkit/log"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
) )
type S3Object struct { type S3Object struct {
credentials config.S3Credentials credentials config.S3Credentials
config config.S3Config config config.S3Config
objectName string objectName string
uploaded bool
uploader uploader
} }
// NewS3Object returns an S3Object prepared to receive one upload of
// objectName using the given credentials and bucket configuration.
// It wires the object into the shared uploader execution engine:
// Execute starts the background goroutines that drive the
// Upload/ETag/Abort/Delete strategy methods under a context bounded
// by deadline. The returned error is currently always nil.
func NewS3Object(ctx context.Context, objectName string, s3Credentials config.S3Credentials, s3Config config.S3Config, deadline time.Time) (*S3Object, error) {
	o := &S3Object{
		credentials: s3Credentials,
		config:      s3Config,
		objectName:  objectName,
	}

	// The object is its own upload strategy; Execute runs it asynchronously.
	o.uploader = newUploader(o)
	o.Execute(ctx, deadline)

	return o, nil
}
func setEncryptionOptions(input *s3manager.UploadInput, s3Config config.S3Config) { func setEncryptionOptions(input *s3manager.UploadInput, s3Config config.S3Config) {
if s3Config.ServerSideEncryption != "" { if s3Config.ServerSideEncryption != "" {
input.ServerSideEncryption = aws.String(s3Config.ServerSideEncryption) input.ServerSideEncryption = aws.String(s3Config.ServerSideEncryption)
...@@ -32,88 +45,48 @@ func setEncryptionOptions(input *s3manager.UploadInput, s3Config config.S3Config ...@@ -32,88 +45,48 @@ func setEncryptionOptions(input *s3manager.UploadInput, s3Config config.S3Config
} }
} }
func NewS3Object(ctx context.Context, objectName string, s3Credentials config.S3Credentials, s3Config config.S3Config, deadline time.Time) (*S3Object, error) { func (s *S3Object) Upload(ctx context.Context, r io.Reader) error {
pr, pw := io.Pipe() sess, err := setupS3Session(s.credentials, s.config)
objectStorageUploadsOpen.Inc()
uploadCtx, cancelFn := context.WithDeadline(ctx, deadline)
o := &S3Object{
uploader: newUploader(uploadCtx, pw),
credentials: s3Credentials,
config: s3Config,
}
go o.trackUploadTime()
go o.cleanup(ctx)
go func() {
defer cancelFn()
defer objectStorageUploadsOpen.Dec()
defer func() {
// This will be returned as error to the next write operation on the pipe
pr.CloseWithError(o.uploadError)
}()
sess, err := setupS3Session(s3Credentials, s3Config)
if err != nil { if err != nil {
o.uploadError = err
log.WithError(err).Error("error creating S3 session") log.WithError(err).Error("error creating S3 session")
return return err
} }
o.objectName = objectName
uploader := s3manager.NewUploader(sess) uploader := s3manager.NewUploader(sess)
input := &s3manager.UploadInput{ input := &s3manager.UploadInput{
Bucket: aws.String(s3Config.Bucket), Bucket: aws.String(s.config.Bucket),
Key: aws.String(objectName), Key: aws.String(s.objectName),
Body: pr, Body: r,
} }
setEncryptionOptions(input, s3Config) setEncryptionOptions(input, s.config)
_, err = uploader.UploadWithContext(uploadCtx, input) _, err = uploader.UploadWithContext(ctx, input)
if err != nil { if err != nil {
o.uploadError = err
objectStorageUploadRequestsRequestFailed.Inc()
log.WithError(err).Error("error uploading S3 session") log.WithError(err).Error("error uploading S3 session")
return return err
} }
}()
return o, nil s.uploaded = true
}
func (o *S3Object) trackUploadTime() { return nil
started := time.Now()
<-o.ctx.Done()
objectStorageUploadTime.Observe(time.Since(started).Seconds())
} }
func (o *S3Object) cleanup(ctx context.Context) { func (s *S3Object) ETag() string {
// wait for the upload to finish return ""
<-o.ctx.Done() }
if o.uploadError != nil {
objectStorageUploadRequestsRequestFailed.Inc()
o.delete()
return
}
// We have now successfully uploaded the file to object storage. Another
// goroutine will hand off the object to gitlab-rails.
<-ctx.Done()
// gitlab-rails is now done with the object so it's time to delete it. func (s *S3Object) Abort() {
o.delete() s.Delete()
} }
func (o *S3Object) delete() { func (s *S3Object) Delete() {
if o.objectName == "" { if !s.uploaded {
return return
} }
session, err := setupS3Session(o.credentials, o.config) session, err := setupS3Session(s.credentials, s.config)
if err != nil { if err != nil {
log.WithError(err).Error("error setting up S3 session in delete") log.WithError(err).Error("error setting up S3 session in delete")
return return
...@@ -121,8 +94,8 @@ func (o *S3Object) delete() { ...@@ -121,8 +94,8 @@ func (o *S3Object) delete() {
svc := s3.New(session) svc := s3.New(session)
input := &s3.DeleteObjectInput{ input := &s3.DeleteObjectInput{
Bucket: aws.String(o.config.Bucket), Bucket: aws.String(s.config.Bucket),
Key: aws.String(o.objectName), Key: aws.String(s.objectName),
} }
// Note we can't use the request context because in a successful // Note we can't use the request context because in a successful
......
package objectstore
import (
"context"
"io"
"net/http"
"gitlab.com/gitlab-org/labkit/log"
"gitlab.com/gitlab-org/labkit/mask"
)
// uploadStrategy abstracts the storage-specific part of an upload so the
// shared uploader execution engine (Execute) can drive any backend —
// single-PUT Object, S3 Multipart, or the AWS SDK S3Object — through one
// common code path.
type uploadStrategy interface {
	// Upload consumes r until EOF, writing the object to the backing store.
	Upload(ctx context.Context, r io.Reader) error
	// ETag returns the checksum reported by the store after a successful
	// Upload; it may be empty when the backend does not expose one
	// (e.g. the AWS SDK strategy returns "").
	ETag() string
	// Abort cleans up the remote side after a failed upload.
	Abort()
	// Delete removes the stored object once the consumer (gitlab-rails)
	// is done with it.
	Delete()
}
func deleteURL(url string) {
if url == "" {
return
}
req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
log.WithError(err).WithField("object", mask.URL(url)).Warning("Delete failed")
return
}
// TODO: consider adding the context to the outgoing request for better instrumentation
// here we are not using u.ctx because we must perform cleanup regardless of parent context
resp, err := httpClient.Do(req)
if err != nil {
log.WithError(err).WithField("object", mask.URL(url)).Warning("Delete failed")
return
}
resp.Body.Close()
}
// extractETag normalizes an ETag header value by stripping the surrounding
// double quotes when present. S3-compatible servers return ETags wrapped in
// quotes (e.g. `"d41d8cd98f00b204e9800998ecf8427e"`), while callers compare
// the bare checksum. A value that is empty or not fully quoted is returned
// unchanged.
func extractETag(rawETag string) string {
	// Only strip when the value is wrapped in a matched pair of quotes.
	// Requiring len >= 2 also fixes a panic in the previous version, which
	// evaluated rawETag[1:0] for the single-character input `"`.
	if len(rawETag) >= 2 && rawETag[0] == '"' && rawETag[len(rawETag)-1] == '"' {
		return rawETag[1 : len(rawETag)-1]
	}
	return rawETag
}
...@@ -4,12 +4,11 @@ import ( ...@@ -4,12 +4,11 @@ import (
"context" "context"
"crypto/md5" "crypto/md5"
"encoding/hex" "encoding/hex"
"fmt"
"hash" "hash"
"io" "io"
"net/http" "strings"
"time"
"gitlab.com/gitlab-org/labkit/log"
"gitlab.com/gitlab-org/labkit/mask"
) )
// Upload represents an upload to an ObjectStorage provider // Upload represents an upload to an ObjectStorage provider
...@@ -33,16 +32,23 @@ type uploader struct { ...@@ -33,16 +32,23 @@ type uploader struct {
uploadError error uploadError error
// ctx is the internal context bound to the upload request // ctx is the internal context bound to the upload request
ctx context.Context ctx context.Context
pr *io.PipeReader
pw *io.PipeWriter
strategy uploadStrategy
metrics bool
} }
func newUploader(ctx context.Context, w io.WriteCloser) uploader { func newUploader(strategy uploadStrategy) uploader {
return uploader{w: w, c: w, ctx: ctx} pr, pw := io.Pipe()
return uploader{w: pw, c: pw, pr: pr, pw: pw, strategy: strategy, metrics: true}
} }
func newMD5Uploader(ctx context.Context, w io.WriteCloser) uploader { func newMD5Uploader(strategy uploadStrategy, metrics bool) uploader {
pr, pw := io.Pipe()
hasher := md5.New() hasher := md5.New()
mw := io.MultiWriter(w, hasher) mw := io.MultiWriter(pw, hasher)
return uploader{w: mw, c: w, md5: hasher, ctx: ctx} return uploader{w: mw, c: pw, pr: pr, pw: pw, md5: hasher, strategy: strategy, metrics: metrics}
} }
// Close implements the standard io.Closer interface: it closes the http client request. // Close implements the standard io.Closer interface: it closes the http client request.
...@@ -65,50 +71,100 @@ func (u *uploader) Write(p []byte) (int, error) { ...@@ -65,50 +71,100 @@ func (u *uploader) Write(p []byte) (int, error) {
return u.w.Write(p) return u.w.Write(p)
} }
// syncAndDelete wait for Context to be Done and then performs the requested HTTP call func (u *uploader) md5Sum() string {
func (u *uploader) syncAndDelete(url string) { if u.md5 == nil {
if url == "" { return ""
return
} }
checksum := u.md5.Sum(nil)
return hex.EncodeToString(checksum)
}
// ETag returns the checksum of the uploaded object returned by the ObjectStorage provider via ETag Header.
// This method will wait until upload context is done before returning.
func (u *uploader) ETag() string {
<-u.ctx.Done() <-u.ctx.Done()
req, err := http.NewRequest("DELETE", url, nil) return u.etag
}
func (u *uploader) Execute(ctx context.Context, deadline time.Time) {
if u.metrics {
objectStorageUploadsOpen.Inc()
}
uploadCtx, cancelFn := context.WithDeadline(ctx, deadline)
u.ctx = uploadCtx
if u.metrics {
go u.trackUploadTime()
}
go u.cleanup(ctx)
go func() {
defer cancelFn()
if u.metrics {
defer objectStorageUploadsOpen.Dec()
}
defer func() {
// This will be returned as error to the next write operation on the pipe
u.pr.CloseWithError(u.uploadError)
}()
err := u.strategy.Upload(uploadCtx, u.pr)
if err != nil { if err != nil {
log.WithError(err).WithField("object", mask.URL(url)).Warning("Delete failed") u.uploadError = err
if u.metrics {
objectStorageUploadRequestsRequestFailed.Inc()
}
return return
} }
// TODO: consider adding the context to the outgoing request for better instrumentation
// here we are not using u.ctx because we must perform cleanup regardless of parent context u.etag = u.strategy.ETag()
resp, err := httpClient.Do(req)
if u.md5 != nil {
err := compareMD5(u.md5Sum(), u.etag)
if err != nil { if err != nil {
log.WithError(err).WithField("object", mask.URL(url)).Warning("Delete failed") u.uploadError = err
return if u.metrics {
objectStorageUploadRequestsRequestFailed.Inc()
}
} }
resp.Body.Close() }
}()
} }
func (u *uploader) extractETag(rawETag string) { func (u *uploader) trackUploadTime() {
if rawETag != "" && rawETag[0] == '"' { started := time.Now()
rawETag = rawETag[1 : len(rawETag)-1] <-u.ctx.Done()
if u.metrics {
objectStorageUploadTime.Observe(time.Since(started).Seconds())
} }
u.etag = rawETag
} }
func (u *uploader) md5Sum() string { func (u *uploader) cleanup(ctx context.Context) {
if u.md5 == nil { // wait for the upload to finish
return "" <-u.ctx.Done()
if u.uploadError != nil {
if u.metrics {
objectStorageUploadRequestsRequestFailed.Inc()
}
u.strategy.Abort()
return
} }
checksum := u.md5.Sum(nil) // We have now successfully uploaded the file to object storage. Another
return hex.EncodeToString(checksum) // goroutine will hand off the object to gitlab-rails.
<-ctx.Done()
// gitlab-rails is now done with the object so it's time to delete it.
u.strategy.Delete()
} }
// ETag returns the checksum of the uploaded object returned by the ObjectStorage provider via ETag Header. func compareMD5(local, remote string) error {
// This method will wait until upload context is done before returning. if !strings.EqualFold(local, remote) {
func (u *uploader) ETag() string { return fmt.Errorf("ETag mismatch. expected %q got %q", local, remote)
<-u.ctx.Done() }
return u.etag return nil
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment