Commit 0671c96d authored by Stan Hu's avatar Stan Hu

Support Workhorse directly uploading files to S3

This adds the AWS client directly to Workhorse and a new configuration
section for specifying credentials. This makes it possible to use S3
buckets with KMS encryption and proper MD5 checksums.

This is disabled by default. For this to be used:

1. GitLab Rails needs to send the `UseWorkhorseClient` and
`RemoteTempObjectID` in the `/authorize`
endpoint. (https://gitlab.com/gitlab-org/gitlab/-/merge_requests/29389)

2. S3 configuration must be specified in `config.toml`, or Rails must be
configured to use IAM instance profiles (`use_iam_profile` in Fog
connection parameters).

S3 sessions are created lazily and cached for 10 minutes to avoid
unnecessary local I/O access. When IAM instance profiles are used, this
also cuts down the number of HTTP requests needed to request AWS
credentials.

Related issues:

1. https://gitlab.com/gitlab-org/gitlab-workhorse/issues/222
2. https://gitlab.com/gitlab-org/gitlab-workhorse/issues/185
3. https://gitlab.com/gitlab-org/gitlab-workhorse/-/issues/210
parent e1e6ed3d
#!/bin/sh #!/bin/sh
git grep 'context.\(Background\|TODO\)' | \ git grep 'context.\(Background\|TODO\)' | \
grep -v -e '^[^:]*_test\.go:' -e '^vendor/' -e '^_support/' -e '^cmd/[^:]*/main.go' | \ grep -v -e '^[^:]*_test\.go:' -v -e "lint:allow context.Background" -e '^vendor/' -e '^_support/' -e '^cmd/[^:]*/main.go' | \
grep -e '^[^:]*\.go' | \ grep -e '^[^:]*\.go' | \
awk '{ awk '{
print "Found disallowed use of context.Background or TODO" print "Found disallowed use of context.Background or TODO"
......
---
title: Support Workhorse directly uploading files to S3
merge_request: 466
author:
type: added
[redis]
URL = "unix:/home/git/gitlab/redis/redis.socket"
[object_storage]
enabled = false
provider = "AWS"
[object_storage.s3]
aws_access_key_id = "YOUR AWS ACCESS KEY"
aws_secret_access_key = "YOUR AWS SECRET ACCESS KEY"
...@@ -6,6 +6,7 @@ require ( ...@@ -6,6 +6,7 @@ require (
github.com/BurntSushi/toml v0.3.1 github.com/BurntSushi/toml v0.3.1
github.com/FZambia/sentinel v1.0.0 github.com/FZambia/sentinel v1.0.0
github.com/alecthomas/chroma v0.7.3 github.com/alecthomas/chroma v0.7.3
github.com/aws/aws-sdk-go v1.31.7
github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/getsentry/raven-go v0.1.2 github.com/getsentry/raven-go v0.1.2
github.com/golang/gddo v0.0.0-20190419222130-af0f2af80721 github.com/golang/gddo v0.0.0-20190419222130-af0f2af80721
...@@ -15,17 +16,19 @@ require ( ...@@ -15,17 +16,19 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/jfbus/httprs v0.0.0-20190827093123-b0af8319bb15 github.com/jfbus/httprs v0.0.0-20190827093123-b0af8319bb15
github.com/johannesboyne/gofakes3 v0.0.0-20200510090907-02d71f533bec
github.com/jpillora/backoff v0.0.0-20170918002102-8eab2debe79d github.com/jpillora/backoff v0.0.0-20170918002102-8eab2debe79d
github.com/prometheus/client_golang v1.0.0 github.com/prometheus/client_golang v1.0.0
github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1 github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1
github.com/sebest/xff v0.0.0-20160910043805-6c115e0ffa35 github.com/sebest/xff v0.0.0-20160910043805-6c115e0ffa35
github.com/shabbyrobe/gocovmerge v0.0.0-20190829150210-3e036491d500 // indirect
github.com/sirupsen/logrus v1.3.0 github.com/sirupsen/logrus v1.3.0
github.com/stretchr/testify v1.5.1 github.com/stretchr/testify v1.5.1
gitlab.com/gitlab-org/gitaly v1.74.0 gitlab.com/gitlab-org/gitaly v1.74.0
gitlab.com/gitlab-org/labkit v0.0.0-20200520155818-96e583c57891 gitlab.com/gitlab-org/labkit v0.0.0-20200520155818-96e583c57891
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa golang.org/x/net v0.0.0-20200226121028-0de0cce0169b
golang.org/x/tools v0.0.0-20200117161641-43d50277825c golang.org/x/tools v0.0.0-20200522201501-cb1345f3a375
google.golang.org/grpc v1.24.0 google.golang.org/grpc v1.24.0
gopkg.in/yaml.v2 v2.2.8 // indirect gopkg.in/yaml.v2 v2.2.8 // indirect
honnef.co/go/tools v0.0.1-2019.2.3 honnef.co/go/tools v0.0.1-2019.2.3
......
This diff is collapsed.
package config
import (
"io/ioutil"
"os"
"testing"
"github.com/stretchr/testify/require"
)
// TestLoadObjectStorageConfig verifies that the [object_storage] section of
// config.toml is parsed into cfg.ObjectStorageCredentials.
func TestLoadObjectStorageConfig(t *testing.T) {
	config := `
[object_storage]
enabled = true
provider = "AWS"
[object_storage.s3]
aws_access_key_id = "minio"
aws_secret_access_key = "gdk-minio"
`
	tmpFile, err := ioutil.TempFile(os.TempDir(), "test-")
	require.NoError(t, err)
	defer os.Remove(tmpFile.Name())

	_, err = tmpFile.Write([]byte(config))
	require.NoError(t, err)
	// Close the file so the write is flushed and the descriptor is not
	// leaked before LoadConfig re-opens the path.
	require.NoError(t, tmpFile.Close())

	cfg, err := LoadConfig(tmpFile.Name())
	require.NoError(t, err)

	require.NotNil(t, cfg.ObjectStorageCredentials, "Expected object storage credentials")

	expected := ObjectStorageCredentials{
		Provider: "AWS",
		S3Credentials: S3Credentials{
			AwsAccessKeyID:     "minio",
			AwsSecretAccessKey: "gdk-minio",
		},
	}

	require.Equal(t, expected, *cfg.ObjectStorageCredentials)
}
...@@ -11,6 +11,8 @@ import ( ...@@ -11,6 +11,8 @@ import (
"github.com/dgrijalva/jwt-go" "github.com/dgrijalva/jwt-go"
"gitlab.com/gitlab-org/labkit/log"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore" "gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/secret" "gitlab.com/gitlab-org/gitlab-workhorse/internal/secret"
) )
...@@ -115,7 +117,16 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts ...@@ -115,7 +117,16 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts
} }
}() }()
if opts.IsMultipart() { useS3Client := opts.UseWorkhorseClientEnabled() && opts.ObjectStorageConfig.IsAWS() && opts.ObjectStorageConfig.IsValid()
if useS3Client {
remoteWriter, err = objectstore.NewS3Object(ctx, opts.RemoteTempObjectID, opts.ObjectStorageConfig.S3Credentials, opts.ObjectStorageConfig.S3Config, opts.Deadline)
if err != nil {
return nil, err
}
writers = append(writers, remoteWriter)
} else if opts.IsMultipart() {
remoteWriter, err = objectstore.NewMultipart(ctx, opts.PresignedParts, opts.PresignedCompleteMultipart, opts.PresignedAbortMultipart, opts.PresignedDelete, opts.PutHeaders, opts.Deadline, opts.PartSize) remoteWriter, err = objectstore.NewMultipart(ctx, opts.PresignedParts, opts.PresignedCompleteMultipart, opts.PresignedAbortMultipart, opts.PresignedDelete, opts.PutHeaders, opts.Deadline, opts.PartSize)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -154,6 +165,24 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts ...@@ -154,6 +165,24 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts
return nil, SizeError(fmt.Errorf("expected %d bytes but got only %d", size, fh.Size)) return nil, SizeError(fmt.Errorf("expected %d bytes but got only %d", size, fh.Size))
} }
logger := log.WithContextFields(ctx, log.Fields{
"copied_bytes": fh.Size,
"is_local": opts.IsLocal(),
"is_multipart": opts.IsMultipart(),
"is_remote": opts.IsRemote(),
"remote_id": opts.RemoteID,
"temp_file_prefix": opts.TempFilePrefix,
"use_s3_client": useS3Client,
})
if opts.IsLocal() {
logger = logger.WithField("local_temp_path", opts.LocalTempPath)
} else if useS3Client {
logger = logger.WithField("remote_temp_object", opts.RemoteTempObjectID)
}
logger.Info("saved file")
fh.hashes = hashes.finish() fh.hashes = hashes.finish()
if opts.IsRemote() { if opts.IsRemote() {
......
...@@ -275,6 +275,32 @@ func TestSaveFile(t *testing.T) { ...@@ -275,6 +275,32 @@ func TestSaveFile(t *testing.T) {
} }
} }
// TestSaveFileWithWorkhorseClient exercises the direct-to-S3 upload path
// that bypasses pre-signed URLs.
func TestSaveFileWithWorkhorseClient(t *testing.T) {
	s3Creds, s3Config, sess, ts := test.SetupS3(t)
	defer ts.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	const remoteObject = "tmp/test-file/1"

	saveOpts := filestore.SaveFileOpts{
		RemoteID:           "test-file",
		Deadline:           testDeadline(),
		UseWorkhorseClient: true,
		RemoteTempObjectID: remoteObject,
		ObjectStorageConfig: filestore.ObjectStorageConfig{
			Provider:      "AWS",
			S3Credentials: s3Creds,
			S3Config:      s3Config,
		},
	}

	_, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &saveOpts)
	require.NoError(t, err)

	test.S3ObjectExists(t, sess, s3Config, remoteObject, test.ObjectContent)
}
func TestSaveMultipartInBodyFailure(t *testing.T) { func TestSaveMultipartInBodyFailure(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
......
package objectstore
import (
"context"
"io"
"time"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"gitlab.com/gitlab-org/labkit/log"
)
// S3Object streams data written to it (via the embedded uploader) into a
// single S3 object, and remembers the credentials/config needed to delete
// that object again once it is no longer wanted.
type S3Object struct {
	credentials config.S3Credentials
	config      config.S3Config
	// objectName is the S3 key of the temporary object; empty means
	// nothing was uploaded and there is nothing to delete.
	objectName string
	uploader
}
// NewS3Object starts a streaming upload of objectName to the bucket in
// s3Config. Data written to the returned object is piped to S3 until it
// is closed or the deadline expires. Background goroutines record the
// upload duration and delete the temporary object when the upload fails
// or when ctx is done.
func NewS3Object(ctx context.Context, objectName string, s3Credentials config.S3Credentials, s3Config config.S3Config, deadline time.Time) (*S3Object, error) {
	pr, pw := io.Pipe()
	objectStorageUploadsOpen.Inc()
	uploadCtx, cancelFn := context.WithDeadline(ctx, deadline)

	o := &S3Object{
		uploader:    newUploader(uploadCtx, pw),
		credentials: s3Credentials,
		config:      s3Config,
		// Set objectName before any goroutine starts: cleanup() can be
		// woken by parent-context cancellation and read this field, so
		// assigning it inside the upload goroutine (as before) was a
		// data race.
		objectName: objectName,
	}

	go o.trackUploadTime()
	go o.cleanup(ctx)

	go func() {
		defer cancelFn()
		defer objectStorageUploadsOpen.Dec()
		defer func() {
			// This will be returned as error to the next write operation on the pipe
			pr.CloseWithError(o.uploadError)
		}()

		sess, err := setupS3Session(s3Credentials, s3Config)
		if err != nil {
			o.uploadError = err
			log.WithError(err).Error("error creating S3 session")
			return
		}

		uploader := s3manager.NewUploader(sess)

		_, err = uploader.UploadWithContext(uploadCtx, &s3manager.UploadInput{
			Bucket: aws.String(s3Config.Bucket),
			Key:    aws.String(objectName),
			Body:   pr,
		})
		if err != nil {
			o.uploadError = err
			objectStorageUploadRequestsRequestFailed.Inc()
			// The upload failed, not the session setup; the previous
			// message ("error uploading S3 session") was misleading.
			log.WithError(err).Error("error uploading S3 object")
			return
		}
	}()

	return o, nil
}
// trackUploadTime observes how long the upload took, measured from now
// until the uploader's context is done.
func (o *S3Object) trackUploadTime() {
	begin := time.Now()
	defer func() {
		objectStorageUploadTime.Observe(time.Since(begin).Seconds())
	}()

	<-o.ctx.Done()
}
// cleanup deletes the temporary object when it is no longer needed. It
// first blocks until the uploader's context is done (the upload finished,
// failed, or was cancelled). On upload failure it deletes immediately;
// otherwise it waits for the request context ctx, which signals that
// gitlab-rails has finished consuming the object.
func (o *S3Object) cleanup(ctx context.Context) {
	// wait for the upload to finish
	<-o.ctx.Done()

	if o.uploadError != nil {
		objectStorageUploadRequestsRequestFailed.Inc()
		o.delete()
		return
	}

	// We have now successfully uploaded the file to object storage. Another
	// goroutine will hand off the object to gitlab-rails.
	<-ctx.Done()

	// gitlab-rails is now done with the object so it's time to delete it.
	o.delete()
}
// delete removes the temporary object from the bucket. It is a no-op when
// no object name was recorded (nothing was uploaded). Failures are logged
// but not returned: deletion is best-effort cleanup.
func (o *S3Object) delete() {
	if o.objectName == "" {
		return
	}

	session, err := setupS3Session(o.credentials, o.config)
	if err != nil {
		log.WithError(err).Error("error setting up S3 session in delete")
		return
	}

	svc := s3.New(session)
	input := &s3.DeleteObjectInput{
		Bucket: aws.String(o.config.Bucket),
		Key:    aws.String(o.objectName),
	}

	// Note we can't use the request context because in a successful
	// case, the original request has already completed.
	deleteCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // lint:allow context.Background
	defer cancel()

	_, err = svc.DeleteObjectWithContext(deleteCtx, input)
	if err != nil {
		// WithError already attaches err; passing it again as an extra
		// argument (as before) printed the error twice.
		log.WithError(err).Error("error deleting S3 object")
	}
}
package objectstore_test
import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore/test"
)
// TestS3ObjectUpload uploads a small object, verifies its content, then
// cancels the request context and verifies the object is deleted.
func TestS3ObjectUpload(t *testing.T) {
	creds, cfg, sess, ts := test.SetupS3(t)
	defer ts.Close()

	deadline := time.Now().Add(testTimeout)
	tmpDir, err := ioutil.TempDir("", "workhorse-test-")
	require.NoError(t, err)
	defer os.Remove(tmpDir)

	objectName := filepath.Join(tmpDir, "s3-test-data")
	ctx, cancel := context.WithCancel(context.Background())

	object, err := objectstore.NewS3Object(ctx, objectName, creds, cfg, deadline)
	require.NoError(t, err)

	// Stream the test content into the uploader.
	copied, err := io.Copy(object, strings.NewReader(test.ObjectContent))
	require.NoError(t, err)
	require.Equal(t, test.ObjectSize, copied, "Uploaded file mismatch")

	// Close the HTTP stream to finish the upload.
	require.NoError(t, object.Close())

	test.S3ObjectExists(t, sess, cfg, objectName, test.ObjectContent)

	// Cancelling the request context must trigger deletion of the
	// temporary object; deletion is asynchronous, so poll for it.
	cancel()

	deleted := false
	retry(3, time.Second, func() error {
		if !test.S3ObjectDoesNotExist(t, sess, cfg, objectName) {
			return fmt.Errorf("file is still present, retrying")
		}
		deleted = true
		return nil
	})
	require.True(t, deleted)
}
// TestConcurrentS3ObjectUpload uploads from four goroutines at once,
// alternating between two distinct fake S3 endpoints, and verifies that
// every object arrives intact.
func TestConcurrentS3ObjectUpload(t *testing.T) {
	creds, uploadsConfig, uploadsSession, uploadServer := test.SetupS3WithBucket(t, "uploads")
	defer uploadServer.Close()

	// This will return a separate S3 endpoint
	_, artifactsConfig, artifactsSession, artifactsServer := test.SetupS3WithBucket(t, "artifacts")
	defer artifactsServer.Close()

	deadline := time.Now().Add(testTimeout)
	tmpDir, err := ioutil.TempDir("", "workhorse-test-")
	require.NoError(t, err)
	defer os.Remove(tmpDir)

	var wg sync.WaitGroup

	for i := 0; i < 4; i++ {
		wg.Add(1)

		go func(index int) {
			// Done() must be deferred: a failed require aborts this
			// goroutine via runtime.Goexit, and the original trailing
			// wg.Done() would never run, deadlocking wg.Wait().
			defer wg.Done()

			var sess *session.Session
			var config config.S3Config

			if index%2 == 0 {
				sess = uploadsSession
				config = uploadsConfig
			} else {
				sess = artifactsSession
				config = artifactsConfig
			}

			name := fmt.Sprintf("s3-test-data-%d", index)
			objectName := filepath.Join(tmpDir, name)

			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()

			object, err := objectstore.NewS3Object(ctx, objectName, creds, config, deadline)
			require.NoError(t, err)

			// copy data
			n, err := io.Copy(object, strings.NewReader(test.ObjectContent))
			require.NoError(t, err)
			require.Equal(t, test.ObjectSize, n, "Uploaded file mismatch")

			// close HTTP stream
			require.NoError(t, object.Close())

			test.S3ObjectExists(t, sess, config, objectName, test.ObjectContent)
		}(i)
	}

	wg.Wait()
}
// TestS3ObjectUploadCancel checks that writes fail cleanly when the
// request context is cancelled before any data has been copied.
func TestS3ObjectUploadCancel(t *testing.T) {
	creds, config, _, ts := test.SetupS3(t)
	defer ts.Close()

	ctx, cancel := context.WithCancel(context.Background())
	// Release the context even if an assertion below aborts the test
	// before the explicit cancel() call; cancelling twice is harmless.
	defer cancel()

	deadline := time.Now().Add(testTimeout)
	tmpDir, err := ioutil.TempDir("", "workhorse-test-")
	require.NoError(t, err)
	defer os.Remove(tmpDir)

	objectName := filepath.Join(tmpDir, "s3-test-data")

	object, err := objectstore.NewS3Object(ctx, objectName, creds, config, deadline)
	require.NoError(t, err)

	// Cancel the transfer before the data has been copied to ensure
	// we handle this gracefully.
	cancel()

	_, err = io.Copy(object, strings.NewReader(test.ObjectContent))
	require.Error(t, err)
}
// retry runs fn up to attempts times, doubling the sleep between tries.
// A nil return from fn ends the retries successfully; an error wrapped in
// stop aborts immediately and returns the wrapped error.
func retry(attempts int, sleep time.Duration, fn func() error) error {
	for {
		err := fn()
		if err == nil {
			return nil
		}

		if s, ok := err.(stop); ok {
			// Return the original error for later checking
			return s.error
		}

		attempts--
		if attempts <= 0 {
			return err
		}

		time.Sleep(sleep)
		sleep *= 2
	}
}

// stop wraps an error to signal retry to give up immediately.
type stop struct {
	error
}
package objectstore
import (
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
)
// s3Session pairs a cached AWS session with the time after which it must
// be refreshed.
type s3Session struct {
	session *session.Session
	expiry  time.Time
}

// s3SessionCache caches AWS sessions keyed by their S3 configuration and
// guards them with its embedded mutex.
type s3SessionCache struct {
	// An S3 session is cached by its input configuration (e.g. region,
	// endpoint, path style, etc.), but the bucket is actually
	// determined by the type of object to be uploaded (e.g. CI
	// artifact, LFS, etc.) during runtime. In practice, we should only
	// need one session per Workhorse process if we only allow one
	// configuration for many different buckets. However, using a map
	// indexed by the config avoids potential pitfalls in case the
	// bucket configuration is supplied at startup or we need to support
	// multiple S3 endpoints.
	sessions map[config.S3Config]*s3Session
	sync.Mutex
}
// isExpired reports whether the cached session's expiry time has passed.
func (s *s3Session) isExpired() bool {
	return s.expiry.Before(time.Now())
}
// newS3SessionCache returns an empty session cache ready for use.
func newS3SessionCache() *s3SessionCache {
	return &s3SessionCache{sessions: map[config.S3Config]*s3Session{}}
}
var (
	// By default, it looks like IAM instance profiles may last 6 hours
	// (via curl http://169.254.169.254/latest/meta-data/iam/security-credentials/<role_name>),
	// but this may be configurable from anywhere for 15 minutes to 12
	// hours. To be safe, refresh AWS sessions every 10 minutes.
	// (10 * time.Minute is already a time.Duration; the previous
	// time.Duration(...) conversion was redundant.)
	sessionExpiration = 10 * time.Minute

	// sessionCache holds one lazily-created session per S3 configuration.
	sessionCache = newS3SessionCache()
)
// setupS3Session initializes a new AWS S3 session or refreshes an expired
// cached one. (The comment previously named the exported-looking
// "SetupS3Session"; the function is unexported.) As recommended in
// https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html,
// sessions should be cached when possible. Sessions are safe to use
// concurrently as long as the session isn't modified.
func setupS3Session(s3Credentials config.S3Credentials, s3Config config.S3Config) (*session.Session, error) {
	sessionCache.Lock()
	defer sessionCache.Unlock()

	s, ok := sessionCache.sessions[s3Config]
	if !ok {
		s = &s3Session{}
		sessionCache.sessions[s3Config] = s
	} else if s.session != nil && !s.isExpired() {
		// Cache hit: hand out a Copy, matching the fresh-session path
		// below, so the cached session itself is never mutated.
		return s.session.Copy(), nil
	}

	cfg := &aws.Config{
		Region:           aws.String(s3Config.Region),
		S3ForcePathStyle: aws.Bool(s3Config.PathStyle),
	}

	// In case IAM profiles aren't being used, use the static credentials
	if s3Credentials.AwsAccessKeyID != "" && s3Credentials.AwsSecretAccessKey != "" {
		cfg.Credentials = credentials.NewStaticCredentials(s3Credentials.AwsAccessKeyID, s3Credentials.AwsSecretAccessKey, "")
	}

	// A custom endpoint is used for non-AWS S3 providers and tests.
	if s3Config.Endpoint != "" {
		cfg.Endpoint = aws.String(s3Config.Endpoint)
	}

	sess, err := session.NewSession(cfg)
	if err != nil {
		return nil, err
	}

	s.expiry = time.Now().Add(sessionExpiration)
	s.session = sess

	return sess.Copy(), nil
}
// ResetS3Session drops the cached session for s3Config so the next
// setupS3Session call builds a fresh one.
func ResetS3Session(s3Config config.S3Config) {
	sessionCache.Lock()
	defer sessionCache.Unlock()

	if s, ok := sessionCache.sessions[s3Config]; ok {
		s.session = nil
	}
}
package objectstore
import (
"testing"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
)
// TestS3SessionSetup verifies that sessions honor the requested region and
// path-style settings and that identical configs share one cache entry.
func TestS3SessionSetup(t *testing.T) {
	creds := config.S3Credentials{}
	cfg := config.S3Config{Region: "us-west-1", PathStyle: true}

	sess, err := setupS3Session(creds, cfg)
	require.NoError(t, err)

	require.Equal(t, aws.StringValue(sess.Config.Region), "us-west-1")
	require.True(t, aws.BoolValue(sess.Config.S3ForcePathStyle))
	require.Equal(t, len(sessionCache.sessions), 1)

	// An identical config must reuse the cached entry, not add a second one.
	duplicateCfg := cfg
	_, err = setupS3Session(creds, duplicateCfg)
	require.NoError(t, err)
	require.Equal(t, len(sessionCache.sessions), 1)

	ResetS3Session(cfg)
}
// TestS3SessionExpiry verifies that a cached session is considered valid
// until its expiry passes, and that setupS3Session then replaces it with
// a fresh, unexpired entry.
func TestS3SessionExpiry(t *testing.T) {
	credentials := config.S3Credentials{}
	cfg := config.S3Config{Region: "us-west-1", PathStyle: true}

	sess, err := setupS3Session(credentials, cfg)
	require.NoError(t, err)

	require.Equal(t, aws.StringValue(sess.Config.Region), "us-west-1")
	require.True(t, aws.BoolValue(sess.Config.S3ForcePathStyle))

	firstSession, ok := sessionCache.sessions[cfg]
	require.True(t, ok)
	require.False(t, firstSession.isExpired())

	// Force expiry by backdating the cache entry.
	firstSession.expiry = time.Now().Add(-1 * time.Second)
	require.True(t, firstSession.isExpired())

	_, err = setupS3Session(credentials, cfg)
	require.NoError(t, err)

	// The refreshed cache entry must no longer be expired.
	nextSession, ok := sessionCache.sessions[cfg]
	require.True(t, ok)
	require.False(t, nextSession.isExpired())

	ResetS3Session(cfg)
}
package test
import (
"io/ioutil"
"net/http/httptest"
"os"
"strings"
"testing"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/johannesboyne/gofakes3"
"github.com/johannesboyne/gofakes3/backend/s3mem"
)
// SetupS3 starts a fake in-memory S3 server with a default "test-bucket"
// bucket and returns credentials, config, a client session, and the test
// server. The caller is responsible for closing the returned server.
func SetupS3(t *testing.T) (config.S3Credentials, config.S3Config, *session.Session, *httptest.Server) {
	return SetupS3WithBucket(t, "test-bucket")
}
// SetupS3WithBucket starts a fake in-memory S3 server, creates the given
// bucket in it, and returns credentials, config, a client session, and
// the test server. The caller is responsible for closing the server.
func SetupS3WithBucket(t *testing.T, bucket string) (config.S3Credentials, config.S3Config, *session.Session, *httptest.Server) {
	ts := httptest.NewServer(gofakes3.New(s3mem.New()).Server())

	creds := config.S3Credentials{
		AwsAccessKeyID:     "YOUR-ACCESSKEYID",
		AwsSecretAccessKey: "YOUR-SECRETACCESSKEY",
	}

	// Named cfg (not config) so the local does not shadow the config package.
	cfg := config.S3Config{
		Bucket:    bucket,
		Endpoint:  ts.URL,
		Region:    "eu-central-1",
		PathStyle: true,
	}

	sess, err := session.NewSession(&aws.Config{
		Credentials:      credentials.NewStaticCredentials(creds.AwsAccessKeyID, creds.AwsSecretAccessKey, ""),
		Endpoint:         aws.String(ts.URL),
		Region:           aws.String(cfg.Region),
		DisableSSL:       aws.Bool(true),
		S3ForcePathStyle: aws.Bool(true),
	})
	require.NoError(t, err)

	// Create the bucket the tests will upload into.
	_, err = s3.New(sess).CreateBucket(&s3.CreateBucketInput{
		Bucket: aws.String(bucket),
	})
	require.NoError(t, err)

	return creds, cfg, sess, ts
}
// S3ObjectExists fails the test unless objectName exists in the bucket
// and its downloaded content equals expectedBytes.
func S3ObjectExists(t *testing.T, sess *session.Session, config config.S3Config, objectName string, expectedBytes string) {
	downloadObject(t, sess, config, objectName, func(tmpfile *os.File, numBytes int64, err error) {
		require.NoError(t, err)
		require.Equal(t, int64(len(expectedBytes)), numBytes)

		actual, err := ioutil.ReadFile(tmpfile.Name())
		require.NoError(t, err)
		require.Equal(t, []byte(expectedBytes), actual)
	})
}
// S3ObjectDoesNotExist returns true if the object has been deleted,
// false otherwise. The return signature is different from
// S3ObjectExists because deletion may need to be retried: deferred
// clean up calls may cause the actual deletion to happen after the
// initial check.
func S3ObjectDoesNotExist(t *testing.T, sess *session.Session, config config.S3Config, objectName string) bool {
	deleted := false

	downloadObject(t, sess, config, objectName, func(tmpfile *os.File, numBytes int64, err error) {
		// A NoSuchKey error from the download means the object is gone.
		if err != nil && strings.Contains(err.Error(), "NoSuchKey") {
			deleted = true
		}
	})

	return deleted
}
// downloadObject fetches objectName from the configured bucket into a
// temporary file and invokes handler with that file, the number of bytes
// downloaded, and any download error. The temporary file and directory
// are removed when this function returns.
func downloadObject(t *testing.T, sess *session.Session, config config.S3Config, objectName string, handler func(tmpfile *os.File, numBytes int64, err error)) {
	tmpDir, err := ioutil.TempDir("", "workhorse-test-")
	require.NoError(t, err)
	defer os.Remove(tmpDir)

	tmpfile, err := ioutil.TempFile(tmpDir, "s3-output")
	require.NoError(t, err)
	defer os.Remove(tmpfile.Name())

	downloadSvc := s3manager.NewDownloader(sess)
	numBytes, err := downloadSvc.Download(tmpfile, &s3.GetObjectInput{
		Bucket: aws.String(config.Bucket),
		Key:    aws.String(objectName),
	})

	handler(tmpfile, numBytes, err)
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment