Commit aa5192b4 authored by Stan Hu's avatar Stan Hu

Add Azure Blob Storage support

This merge request introduces a client for Azure Blob Storage in
Workhorse. Currently customers wanting to use Azure Blob Storage have to
set up a Minio Gateway
(https://docs.gitlab.com/charts/advanced/external-object-storage/azure-minio-gateway.html),
which isn't ideal because it requires customers to maintain their own
proxy server for Azure. We have a number of customers who want native
support for Azure Blob Storage.

Unlike AWS and Google, Azure needs to use an Azure client inside
Workhorse to support direct uploads. Using standard HTTP transfers with
pre-signed URLs with the Azure Put Blob API
(https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob)
doesn't work because Azure doesn't support chunked transfer encoding.
However, Azure does support uploading files in segments via the Put
Block and Put Block List API
(https://docs.microsoft.com/en-us/rest/api/storageservices/put-block),
but this requires an Azure client that can speak this API.

Instead of embedding the Microsoft Azure client directly, we use the Go
Cloud Development Kit (https://godoc.org/gocloud.dev/blob) to make it
easier to add other object storage providers later. For example, GitLab
Rails might return this JSON payload in the
`/internal/uploads/authorize` call:

```json
{
   "UseWorkhorseClient":true,
   "ObjectStorage":{
      "Provider":"AzureRM",
      "GoCloudConfig":{
         "URL":"azblob://test-bucket"
      }
   }
}
```

The `azblob` scheme is managed by the Go Cloud `URLMux`
(https://godoc.org/gocloud.dev/blob#URLMux).

Converting our existing S3 client with Go Cloud should be done later
(https://gitlab.com/gitlab-org/gitlab-workhorse/-/issues/275).

This changes requires
https://gitlab.com/gitlab-org/gitlab/-/merge_requests/38882 to work.

Omnibus configuration changes are in
https://gitlab.com/gitlab-org/omnibus-gitlab/-/merge_requests/4505.

Part of https://gitlab.com/gitlab-org/gitlab/-/issues/25877
parent 144c928b
---
title: Add Azure blob store support
merge_request: 555
author:
type: added
...@@ -2,9 +2,12 @@ ...@@ -2,9 +2,12 @@
URL = "unix:/home/git/gitlab/redis/redis.socket" URL = "unix:/home/git/gitlab/redis/redis.socket"
[object_storage] [object_storage]
enabled = false provider = "AWS" # Allowed options: AWS, AzureRM
provider = "AWS"
[object_storage.s3] [object_storage.s3]
aws_access_key_id = "YOUR AWS ACCESS KEY" aws_access_key_id = "YOUR AWS ACCESS KEY"
aws_secret_access_key = "YOUR AWS SECRET ACCESS KEY" aws_secret_access_key = "YOUR AWS SECRET ACCESS KEY"
[object_store.azurerm]
azure_storage_account_name = "YOUR ACCOUNT NAME"
azure_storage_access_key = "YOUR ACCOUNT KEY"
...@@ -3,14 +3,15 @@ module gitlab.com/gitlab-org/gitlab-workhorse ...@@ -3,14 +3,15 @@ module gitlab.com/gitlab-org/gitlab-workhorse
go 1.13 go 1.13
require ( require (
github.com/Azure/azure-storage-blob-go v0.10.0
github.com/BurntSushi/toml v0.3.1 github.com/BurntSushi/toml v0.3.1
github.com/FZambia/sentinel v1.0.0 github.com/FZambia/sentinel v1.0.0
github.com/alecthomas/chroma v0.7.3 github.com/alecthomas/chroma v0.7.3
github.com/aws/aws-sdk-go v1.31.7 github.com/aws/aws-sdk-go v1.31.13
github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/getsentry/raven-go v0.1.2 github.com/getsentry/raven-go v0.1.2
github.com/golang/gddo v0.0.0-20190419222130-af0f2af80721 github.com/golang/gddo v0.0.0-20190419222130-af0f2af80721
github.com/golang/protobuf v1.3.2 github.com/golang/protobuf v1.4.2
github.com/gomodule/redigo v2.0.0+incompatible github.com/gomodule/redigo v2.0.0+incompatible
github.com/gorilla/websocket v1.4.0 github.com/gorilla/websocket v1.4.0
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
...@@ -27,10 +28,11 @@ require ( ...@@ -27,10 +28,11 @@ require (
github.com/stretchr/testify v1.5.1 github.com/stretchr/testify v1.5.1
gitlab.com/gitlab-org/gitaly v1.74.0 gitlab.com/gitlab-org/gitaly v1.74.0
gitlab.com/gitlab-org/labkit v0.0.0-20200520155818-96e583c57891 gitlab.com/gitlab-org/labkit v0.0.0-20200520155818-96e583c57891
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f gocloud.dev v0.20.0
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b golang.org/x/lint v0.0.0-20200302205851-738671d3881b
golang.org/x/tools v0.0.0-20200522201501-cb1345f3a375 golang.org/x/net v0.0.0-20200602114024-627f9648deb9
google.golang.org/grpc v1.24.0 golang.org/x/tools v0.0.0-20200608174601-1b747fd94509
google.golang.org/grpc v1.29.1
gopkg.in/yaml.v2 v2.2.8 // indirect gopkg.in/yaml.v2 v2.2.8 // indirect
honnef.co/go/tools v0.0.1-2020.1.5 honnef.co/go/tools v0.0.1-2020.1.5
) )
This diff is collapsed.
...@@ -76,8 +76,9 @@ type MultipartUploadParams struct { ...@@ -76,8 +76,9 @@ type MultipartUploadParams struct {
} }
type ObjectStorageParams struct { type ObjectStorageParams struct {
Provider string Provider string
S3Config config.S3Config S3Config config.S3Config
GoCloudConfig config.GoCloudConfig
} }
type RemoteObject struct { type RemoteObject struct {
......
...@@ -2,9 +2,14 @@ package config ...@@ -2,9 +2,14 @@ package config
import ( import (
"net/url" "net/url"
"strings"
"time" "time"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/BurntSushi/toml" "github.com/BurntSushi/toml"
"gitlab.com/gitlab-org/labkit/log"
"gocloud.dev/blob"
"gocloud.dev/blob/azureblob"
) )
type TomlURL struct { type TomlURL struct {
...@@ -30,7 +35,12 @@ func (d *TomlDuration) UnmarshalTest(text []byte) error { ...@@ -30,7 +35,12 @@ func (d *TomlDuration) UnmarshalTest(text []byte) error {
type ObjectStorageCredentials struct { type ObjectStorageCredentials struct {
Provider string Provider string
S3Credentials S3Credentials `toml:"s3"` S3Credentials S3Credentials `toml:"s3"`
AzureCredentials AzureCredentials `toml:"azurerm"`
}
type ObjectStorageConfig struct {
URLMux *blob.URLMux `toml:"-"`
} }
type S3Credentials struct { type S3Credentials struct {
...@@ -48,6 +58,15 @@ type S3Config struct { ...@@ -48,6 +58,15 @@ type S3Config struct {
SSEKMSKeyID string `toml:"-"` // Server-side encryption key-management service key ID (e.g. arn:aws:xxx) SSEKMSKeyID string `toml:"-"` // Server-side encryption key-management service key ID (e.g. arn:aws:xxx)
} }
type GoCloudConfig struct {
URL string `toml:"-"`
}
type AzureCredentials struct {
AccountName string `toml:"azure_storage_account_name"`
AccountKey string `toml:"azure_storage_access_key"`
}
type RedisConfig struct { type RedisConfig struct {
URL TomlURL URL TomlURL
Sentinel []TomlURL Sentinel []TomlURL
...@@ -75,6 +94,7 @@ type Config struct { ...@@ -75,6 +94,7 @@ type Config struct {
APIQueueLimit uint `toml:"-"` APIQueueLimit uint `toml:"-"`
APIQueueTimeout time.Duration `toml:"-"` APIQueueTimeout time.Duration `toml:"-"`
APICILongPollingDuration time.Duration `toml:"-"` APICILongPollingDuration time.Duration `toml:"-"`
ObjectStorageConfig ObjectStorageConfig `toml:"-"`
ObjectStorageCredentials *ObjectStorageCredentials `toml:"object_storage"` ObjectStorageCredentials *ObjectStorageCredentials `toml:"object_storage"`
PropagateCorrelationID bool `toml:"-"` PropagateCorrelationID bool `toml:"-"`
} }
...@@ -88,3 +108,31 @@ func LoadConfig(filename string) (*Config, error) { ...@@ -88,3 +108,31 @@ func LoadConfig(filename string) (*Config, error) {
return cfg, nil return cfg, nil
} }
func (c *Config) RegisterGoCloudURLOpeners() error {
c.ObjectStorageConfig.URLMux = new(blob.URLMux)
creds := c.ObjectStorageCredentials
if strings.EqualFold(creds.Provider, "AzureRM") && creds.AzureCredentials.AccountName != "" && creds.AzureCredentials.AccountKey != "" {
accountName := azureblob.AccountName(creds.AzureCredentials.AccountName)
accountKey := azureblob.AccountKey(creds.AzureCredentials.AccountKey)
credential, err := azureblob.NewCredential(accountName, accountKey)
if err != nil {
log.WithError(err).Error("error creating Azure credentials")
return err
}
pipeline := azureblob.NewPipeline(credential, azblob.PipelineOptions{})
azureURLOpener := &azureblob.URLOpener{
AccountName: accountName,
Pipeline: pipeline,
Options: azureblob.Options{Credential: credential},
}
c.ObjectStorageConfig.URLMux.RegisterBucket(azureblob.Scheme, azureURLOpener)
}
return nil
}
...@@ -11,24 +11,16 @@ import ( ...@@ -11,24 +11,16 @@ import (
func TestLoadObjectStorageConfig(t *testing.T) { func TestLoadObjectStorageConfig(t *testing.T) {
config := ` config := `
[object_storage] [object_storage]
enabled = true
provider = "AWS" provider = "AWS"
[object_storage.s3] [object_storage.s3]
aws_access_key_id = "minio" aws_access_key_id = "minio"
aws_secret_access_key = "gdk-minio" aws_secret_access_key = "gdk-minio"
` `
tmpFile, err := ioutil.TempFile(os.TempDir(), "test-")
require.NoError(t, err)
tmpFile, cfg := loadTempConfig(t, config)
defer os.Remove(tmpFile.Name()) defer os.Remove(tmpFile.Name())
_, err = tmpFile.Write([]byte(config))
require.NoError(t, err)
cfg, err := LoadConfig(tmpFile.Name())
require.NoError(t, err)
require.NotNil(t, cfg.ObjectStorageCredentials, "Expected object storage credentials") require.NotNil(t, cfg.ObjectStorageCredentials, "Expected object storage credentials")
expected := ObjectStorageCredentials{ expected := ObjectStorageCredentials{
...@@ -41,3 +33,49 @@ aws_secret_access_key = "gdk-minio" ...@@ -41,3 +33,49 @@ aws_secret_access_key = "gdk-minio"
require.Equal(t, expected, *cfg.ObjectStorageCredentials) require.Equal(t, expected, *cfg.ObjectStorageCredentials)
} }
func TestRegisterGoCloudURLOpeners(t *testing.T) {
config := `
[object_storage]
provider = "AzureRM"
[object_storage.azurerm]
azure_storage_account_name = "azuretester"
azure_storage_access_key = "deadbeef"
`
tmpFile, cfg := loadTempConfig(t, config)
defer os.Remove(tmpFile.Name())
require.NotNil(t, cfg.ObjectStorageCredentials, "Expected object storage credentials")
expected := ObjectStorageCredentials{
Provider: "AzureRM",
AzureCredentials: AzureCredentials{
AccountName: "azuretester",
AccountKey: "deadbeef",
},
}
require.Equal(t, expected, *cfg.ObjectStorageCredentials)
require.Nil(t, cfg.ObjectStorageConfig.URLMux)
err := cfg.RegisterGoCloudURLOpeners()
require.NoError(t, err)
require.NotNil(t, cfg.ObjectStorageConfig.URLMux)
require.True(t, cfg.ObjectStorageConfig.URLMux.ValidBucketScheme("azblob"))
require.Equal(t, []string{"azblob"}, cfg.ObjectStorageConfig.URLMux.BucketSchemes())
}
func loadTempConfig(t *testing.T, config string) (f *os.File, cfg *Config) {
tmpFile, err := ioutil.TempFile(os.TempDir(), "test-")
require.NoError(t, err)
_, err = tmpFile.Write([]byte(config))
require.NoError(t, err)
cfg, err = LoadConfig(tmpFile.Name())
require.NoError(t, err)
return tmpFile, cfg
}
...@@ -117,32 +117,41 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts ...@@ -117,32 +117,41 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts
} }
}() }()
useS3Client := opts.UseWorkhorseClientEnabled() && opts.ObjectStorageConfig.IsAWS() && opts.ObjectStorageConfig.IsValid() var clientMode string
if opts.IsRemote() {
if useS3Client { if opts.UseWorkhorseClientEnabled() && opts.ObjectStorageConfig.IsGoCloud() {
remoteWriter, err = objectstore.NewS3Object(ctx, opts.RemoteTempObjectID, opts.ObjectStorageConfig.S3Credentials, opts.ObjectStorageConfig.S3Config, opts.Deadline) clientMode = fmt.Sprintf("go_cloud:%s", opts.ObjectStorageConfig.Provider)
if err != nil { p := &objectstore.GoCloudObjectParams{
return nil, err Ctx: ctx,
} Mux: opts.ObjectStorageConfig.URLMux,
BucketURL: opts.ObjectStorageConfig.GoCloudConfig.URL,
writers = append(writers, remoteWriter) ObjectName: opts.RemoteTempObjectID,
} else if opts.IsMultipart() { Deadline: opts.Deadline,
remoteWriter, err = objectstore.NewMultipart(ctx, opts.PresignedParts, opts.PresignedCompleteMultipart, opts.PresignedAbortMultipart, opts.PresignedDelete, opts.PutHeaders, opts.Deadline, opts.PartSize) }
if err != nil { remoteWriter, err = objectstore.NewGoCloudObject(p)
return nil, err } else if opts.UseWorkhorseClientEnabled() && opts.ObjectStorageConfig.IsAWS() && opts.ObjectStorageConfig.IsValid() {
clientMode = "s3"
remoteWriter, err = objectstore.NewS3Object(ctx, opts.RemoteTempObjectID, opts.ObjectStorageConfig.S3Credentials, opts.ObjectStorageConfig.S3Config, opts.Deadline)
} else if opts.IsMultipart() {
clientMode = "multipart"
remoteWriter, err = objectstore.NewMultipart(ctx, opts.PresignedParts, opts.PresignedCompleteMultipart, opts.PresignedAbortMultipart, opts.PresignedDelete, opts.PutHeaders, opts.Deadline, opts.PartSize)
} else {
clientMode = "http"
remoteWriter, err = objectstore.NewObject(ctx, opts.PresignedPut, opts.PresignedDelete, opts.PutHeaders, opts.Deadline, size)
} }
writers = append(writers, remoteWriter)
} else if opts.IsRemote() {
remoteWriter, err = objectstore.NewObject(ctx, opts.PresignedPut, opts.PresignedDelete, opts.PutHeaders, opts.Deadline, size)
if err != nil { if err != nil {
return nil, err return nil, err
} }
writers = append(writers, remoteWriter) writers = append(writers, remoteWriter)
} }
if opts.IsLocal() { if opts.IsLocal() {
if clientMode == "" {
clientMode = "local"
} else {
clientMode += "+local"
}
fileWriter, err := fh.uploadLocalFile(ctx, opts) fileWriter, err := fh.uploadLocalFile(ctx, opts)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -172,12 +181,14 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts ...@@ -172,12 +181,14 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts
"is_remote": opts.IsRemote(), "is_remote": opts.IsRemote(),
"remote_id": opts.RemoteID, "remote_id": opts.RemoteID,
"temp_file_prefix": opts.TempFilePrefix, "temp_file_prefix": opts.TempFilePrefix,
"use_s3_client": useS3Client, "client_mode": clientMode,
}) })
if opts.IsLocal() { if opts.IsLocal() {
logger = logger.WithField("local_temp_path", opts.LocalTempPath) logger = logger.WithField("local_temp_path", opts.LocalTempPath)
} else if useS3Client { }
if opts.IsRemote() {
logger = logger.WithField("remote_temp_object", opts.RemoteTempObjectID) logger = logger.WithField("remote_temp_object", opts.RemoteTempObjectID)
} }
......
...@@ -14,7 +14,9 @@ import ( ...@@ -14,7 +14,9 @@ import (
"github.com/dgrijalva/jwt-go" "github.com/dgrijalva/jwt-go"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"gocloud.dev/blob"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/filestore" "gitlab.com/gitlab-org/gitlab-workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore/test" "gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore/test"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/testhelper" "gitlab.com/gitlab-org/gitlab-workhorse/internal/testhelper"
...@@ -184,9 +186,11 @@ func TestSaveFile(t *testing.T) { ...@@ -184,9 +186,11 @@ func TestSaveFile(t *testing.T) {
for _, spec := range tests { for _, spec := range tests {
t.Run(spec.name, func(t *testing.T) { t.Run(spec.name, func(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
logHook := testhelper.SetupLogger()
var opts filestore.SaveFileOpts var opts filestore.SaveFileOpts
var expectedDeletes, expectedPuts int var expectedDeletes, expectedPuts int
var expectedClientMode string
osStub, ts := test.StartObjectStore() osStub, ts := test.StartObjectStore()
defer ts.Close() defer ts.Close()
...@@ -203,6 +207,7 @@ func TestSaveFile(t *testing.T) { ...@@ -203,6 +207,7 @@ func TestSaveFile(t *testing.T) {
expectedDeletes = 1 expectedDeletes = 1
expectedPuts = 1 expectedPuts = 1
expectedClientMode = "http"
case remoteMultipart: case remoteMultipart:
objectURL := ts.URL + test.ObjectPath objectURL := ts.URL + test.ObjectPath
...@@ -217,11 +222,18 @@ func TestSaveFile(t *testing.T) { ...@@ -217,11 +222,18 @@ func TestSaveFile(t *testing.T) {
osStub.InitiateMultipartUpload(test.ObjectPath) osStub.InitiateMultipartUpload(test.ObjectPath)
expectedDeletes = 1 expectedDeletes = 1
expectedPuts = 2 expectedPuts = 2
expectedClientMode = "multipart"
} }
if spec.local { if spec.local {
opts.LocalTempPath = tmpFolder opts.LocalTempPath = tmpFolder
opts.TempFilePrefix = "test-file" opts.TempFilePrefix = "test-file"
if expectedClientMode != "" {
expectedClientMode += "+local"
} else {
expectedClientMode = "local"
}
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
...@@ -271,11 +283,20 @@ func TestSaveFile(t *testing.T) { ...@@ -271,11 +283,20 @@ func TestSaveFile(t *testing.T) {
uploadFields := token.Claims.(*testhelper.UploadClaims).Upload uploadFields := token.Claims.(*testhelper.UploadClaims).Upload
checkFileHandlerWithFields(t, fh, uploadFields, "", spec.remote == notRemote) checkFileHandlerWithFields(t, fh, uploadFields, "", spec.remote == notRemote)
require.True(t, testhelper.WaitForLogEvent(logHook))
entries := logHook.AllEntries()
require.Equal(t, 1, len(entries))
msg := entries[0].Message
require.Contains(t, msg, "saved file")
require.Contains(t, msg, fmt.Sprintf("client_mode=%s", expectedClientMode))
}) })
} }
} }
func TestSaveFileWithWorkhorseClient(t *testing.T) { func TestSaveFileWithS3WorkhorseClient(t *testing.T) {
logHook := testhelper.SetupLogger()
s3Creds, s3Config, sess, ts := test.SetupS3(t, "") s3Creds, s3Config, sess, ts := test.SetupS3(t, "")
defer ts.Close() defer ts.Close()
...@@ -299,6 +320,71 @@ func TestSaveFileWithWorkhorseClient(t *testing.T) { ...@@ -299,6 +320,71 @@ func TestSaveFileWithWorkhorseClient(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
test.S3ObjectExists(t, sess, s3Config, remoteObject, test.ObjectContent) test.S3ObjectExists(t, sess, s3Config, remoteObject, test.ObjectContent)
require.True(t, testhelper.WaitForLogEvent(logHook))
entries := logHook.AllEntries()
require.Equal(t, 1, len(entries))
msg := entries[0].Message
require.Contains(t, msg, "saved file")
require.Contains(t, msg, "client_mode=s3")
}
func TestSaveFileWithAzureWorkhorseClient(t *testing.T) {
logHook := testhelper.SetupLogger()
mux, bucketDir, cleanup := test.SetupGoCloudFileBucket(t, "azblob")
defer cleanup()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
remoteObject := "tmp/test-file/1"
opts := filestore.SaveFileOpts{
RemoteID: "test-file",
Deadline: testDeadline(),
UseWorkhorseClient: true,
RemoteTempObjectID: remoteObject,
ObjectStorageConfig: filestore.ObjectStorageConfig{
Provider: "AzureRM",
URLMux: mux,
GoCloudConfig: config.GoCloudConfig{URL: "azblob://test-container"},
},
}
_, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
require.NoError(t, err)
test.GoCloudObjectExists(t, bucketDir, remoteObject)
require.True(t, testhelper.WaitForLogEvent(logHook))
entries := logHook.AllEntries()
require.Equal(t, 1, len(entries))
msg := entries[0].Message
require.Contains(t, msg, "saved file")
require.Contains(t, msg, "client_mode=\"go_cloud:AzureRM\"")
}
func TestSaveFileWithUnknownGoCloudScheme(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mux := new(blob.URLMux)
remoteObject := "tmp/test-file/1"
opts := filestore.SaveFileOpts{
RemoteID: "test-file",
Deadline: testDeadline(),
UseWorkhorseClient: true,
RemoteTempObjectID: remoteObject,
ObjectStorageConfig: filestore.ObjectStorageConfig{
Provider: "SomeCloud",
URLMux: mux,
GoCloudConfig: config.GoCloudConfig{URL: "foo://test-container"},
},
}
_, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
require.Error(t, err)
} }
func TestSaveMultipartInBodyFailure(t *testing.T) { func TestSaveMultipartInBodyFailure(t *testing.T) {
......
...@@ -4,6 +4,8 @@ import ( ...@@ -4,6 +4,8 @@ import (
"strings" "strings"
"time" "time"
"gocloud.dev/blob"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api" "gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/config" "gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
) )
...@@ -16,6 +18,12 @@ type ObjectStorageConfig struct { ...@@ -16,6 +18,12 @@ type ObjectStorageConfig struct {
S3Credentials config.S3Credentials S3Credentials config.S3Credentials
S3Config config.S3Config S3Config config.S3Config
// GoCloud mux that maps azureblob:// and future URLs (e.g. s3://, gcs://, etc.) to a handler
URLMux *blob.URLMux
// Azure credentials are registered at startup in the GoCloud URLMux, so only the container name is needed
GoCloudConfig config.GoCloudConfig
} }
// SaveFileOpts represents all the options available for saving a file to object store // SaveFileOpts represents all the options available for saving a file to object store
...@@ -66,7 +74,7 @@ func (s *SaveFileOpts) IsLocal() bool { ...@@ -66,7 +74,7 @@ func (s *SaveFileOpts) IsLocal() bool {
// IsRemote checks if the options requires a remote upload // IsRemote checks if the options requires a remote upload
func (s *SaveFileOpts) IsRemote() bool { func (s *SaveFileOpts) IsRemote() bool {
return s.PresignedPut != "" || s.IsMultipart() return s.PresignedPut != "" || s.IsMultipart() || s.UseWorkhorseClient
} }
// IsMultipart checks if the options requires a Multipart upload // IsMultipart checks if the options requires a Multipart upload
...@@ -97,6 +105,7 @@ func GetOpts(apiResponse *api.Response) *SaveFileOpts { ...@@ -97,6 +105,7 @@ func GetOpts(apiResponse *api.Response) *SaveFileOpts {
if opts.UseWorkhorseClient && objectStorageParams != nil { if opts.UseWorkhorseClient && objectStorageParams != nil {
opts.ObjectStorageConfig.Provider = objectStorageParams.Provider opts.ObjectStorageConfig.Provider = objectStorageParams.Provider
opts.ObjectStorageConfig.S3Config = objectStorageParams.S3Config opts.ObjectStorageConfig.S3Config = objectStorageParams.S3Config
opts.ObjectStorageConfig.GoCloudConfig = objectStorageParams.GoCloudConfig
} }
// Backwards compatibility to ensure API servers that do not include the // Backwards compatibility to ensure API servers that do not include the
...@@ -120,11 +129,28 @@ func (c *ObjectStorageConfig) IsAWS() bool { ...@@ -120,11 +129,28 @@ func (c *ObjectStorageConfig) IsAWS() bool {
return strings.EqualFold(c.Provider, "AWS") || strings.EqualFold(c.Provider, "S3") return strings.EqualFold(c.Provider, "AWS") || strings.EqualFold(c.Provider, "S3")
} }
func (c *ObjectStorageConfig) IsAzure() bool {
return strings.EqualFold(c.Provider, "AzureRM")
}
func (c *ObjectStorageConfig) IsGoCloud() bool {
return c.GoCloudConfig.URL != ""
}
func (c *ObjectStorageConfig) IsValid() bool { func (c *ObjectStorageConfig) IsValid() bool {
return c.S3Config.Bucket != "" && c.S3Config.Region != "" && c.credentialsValid() if c.IsAWS() {
return c.S3Config.Bucket != "" && c.S3Config.Region != "" && c.s3CredentialsValid()
} else if c.IsGoCloud() {
// We could parse and validate the URL, but GoCloud providers
// such as AzureRM don't have a fallback to normal HTTP, so we
// always want to try the GoCloud path if there is a URL.
return true
}
return false
} }
func (c *ObjectStorageConfig) credentialsValid() bool { func (c *ObjectStorageConfig) s3CredentialsValid() bool {
// We need to be able to distinguish between two cases of AWS access: // We need to be able to distinguish between two cases of AWS access:
// 1. AWS access via key and secret, but credentials not configured in Workhorse // 1. AWS access via key and secret, but credentials not configured in Workhorse
// 2. IAM instance profiles used // 2. IAM instance profiles used
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api" "gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/config" "gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/filestore" "gitlab.com/gitlab-org/gitlab-workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore/test"
) )
func TestSaveFileOptsLocalAndRemote(t *testing.T) { func TestSaveFileOptsLocalAndRemote(t *testing.T) {
...@@ -269,6 +270,66 @@ func TestUseWorkhorseClientEnabled(t *testing.T) { ...@@ -269,6 +270,66 @@ func TestUseWorkhorseClientEnabled(t *testing.T) {
require.Equal(t, apiResponse.RemoteObject.ID, opts.RemoteID) require.Equal(t, apiResponse.RemoteObject.ID, opts.RemoteID)
require.Equal(t, apiResponse.RemoteObject.UseWorkhorseClient, opts.UseWorkhorseClient) require.Equal(t, apiResponse.RemoteObject.UseWorkhorseClient, opts.UseWorkhorseClient)
require.Equal(t, test.expected, opts.UseWorkhorseClientEnabled()) require.Equal(t, test.expected, opts.UseWorkhorseClientEnabled())
require.Equal(t, test.UseWorkhorseClient, opts.IsRemote())
})
}
}
func TestGoCloudConfig(t *testing.T) {
mux, _, cleanup := test.SetupGoCloudFileBucket(t, "azblob")
defer cleanup()
tests := []struct {
name string
provider string
url string
valid bool
}{
{
name: "valid AzureRM config",
provider: "AzureRM",
url: "azblob:://test-container",
valid: true,
},
{
name: "invalid GoCloud scheme",
provider: "AzureRM",
url: "unknown:://test-container",
valid: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
apiResponse := &api.Response{
TempPath: "/tmp",
RemoteObject: api.RemoteObject{
Timeout: 10,
ID: "id",
UseWorkhorseClient: true,
RemoteTempObjectID: "test-object",
ObjectStorage: &api.ObjectStorageParams{
Provider: test.provider,
GoCloudConfig: config.GoCloudConfig{
URL: test.url,
},
},
},
}
deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second)
opts := filestore.GetOpts(apiResponse)
opts.ObjectStorageConfig.URLMux = mux
require.Equal(t, apiResponse.TempPath, opts.LocalTempPath)
require.Equal(t, apiResponse.RemoteObject.RemoteTempObjectID, opts.RemoteTempObjectID)
require.WithinDuration(t, deadline, opts.Deadline, time.Second)
require.Equal(t, apiResponse.RemoteObject.ID, opts.RemoteID)
require.Equal(t, apiResponse.RemoteObject.UseWorkhorseClient, opts.UseWorkhorseClient)
require.Equal(t, test.provider, opts.ObjectStorageConfig.Provider)
require.Equal(t, apiResponse.RemoteObject.ObjectStorage.GoCloudConfig, opts.ObjectStorageConfig.GoCloudConfig)
require.True(t, opts.UseWorkhorseClientEnabled())
require.Equal(t, test.valid, opts.ObjectStorageConfig.IsValid())
require.True(t, opts.IsRemote())
}) })
} }
} }
package objectstore
import (
"context"
"io"
"time"
"gitlab.com/gitlab-org/labkit/log"
"gocloud.dev/blob"
)
type GoCloudObject struct {
bucket *blob.Bucket
mux *blob.URLMux
bucketURL string
objectName string
uploader
}
type GoCloudObjectParams struct {
Ctx context.Context
Mux *blob.URLMux
BucketURL string
ObjectName string
Deadline time.Time
}
func NewGoCloudObject(p *GoCloudObjectParams) (*GoCloudObject, error) {
bucket, err := p.Mux.OpenBucket(p.Ctx, p.BucketURL)
if err != nil {
return nil, err
}
o := &GoCloudObject{
bucket: bucket,
mux: p.Mux,
bucketURL: p.BucketURL,
objectName: p.ObjectName,
}
o.uploader = newUploader(o)
o.Execute(p.Ctx, p.Deadline)
return o, nil
}
func (o *GoCloudObject) Upload(ctx context.Context, r io.Reader) error {
defer o.bucket.Close()
writer, err := o.bucket.NewWriter(ctx, o.objectName, nil)
if err != nil {
log.ContextLogger(ctx).WithError(err).Error("error creating GoCloud bucket")
return err
}
if _, err = io.Copy(writer, r); err != nil {
log.ContextLogger(ctx).WithError(err).Error("error writing to GoCloud bucket")
writer.Close()
return err
}
if err := writer.Close(); err != nil {
log.ContextLogger(ctx).WithError(err).Error("error closing GoCloud bucket")
return err
}
return nil
}
func (o *GoCloudObject) ETag() string {
return ""
}
func (o *GoCloudObject) Abort() {
o.Delete()
}
// Delete will always attempt to delete the temporary file.
// According to https://github.com/google/go-cloud/blob/7818961b5c9a112f7e092d3a2d8479cbca80d187/blob/azureblob/azureblob.go#L881-L883,
// if the writer is closed before any Write is called, Close will create an empty file.
func (o *GoCloudObject) Delete() {
if o.bucketURL == "" || o.objectName == "" {
return
}
// Note we can't use the request context because in a successful
// case, the original request has already completed.
deleteCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // lint:allow context.Background
defer cancel()
bucket, err := o.mux.OpenBucket(deleteCtx, o.bucketURL)
if err != nil {
log.WithError(err).Error("error opening bucket for delete")
return
}
if err := bucket.Delete(deleteCtx, o.objectName); err != nil {
log.WithError(err).Error("error deleting object", err)
}
}
package objectstore_test
import (
"context"
"fmt"
"io"
"strings"
"testing"
"time"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore/test"
)
func TestGoCloudObjectUpload(t *testing.T) {
mux, _, cleanup := test.SetupGoCloudFileBucket(t, "azuretest")
defer cleanup()
ctx, cancel := context.WithCancel(context.Background())
deadline := time.Now().Add(testTimeout)
objectName := "test.png"
testURL := "azuretest://azure.example.com/test-container"
p := &objectstore.GoCloudObjectParams{Ctx: ctx, Mux: mux, BucketURL: testURL, ObjectName: objectName, Deadline: deadline}
object, err := objectstore.NewGoCloudObject(p)
require.NotNil(t, object)
require.NoError(t, err)
// copy data
n, err := io.Copy(object, strings.NewReader(test.ObjectContent))
require.NoError(t, err)
require.Equal(t, test.ObjectSize, n, "Uploaded file mismatch")
// close HTTP stream
err = object.Close()
require.NoError(t, err)
bucket, err := mux.OpenBucket(ctx, testURL)
require.NoError(t, err)
// Verify the data was copied correctly.
received, err := bucket.ReadAll(ctx, objectName)
require.NoError(t, err)
require.Equal(t, []byte(test.ObjectContent), received)
cancel()
deleted := false
retry(3, time.Second, func() error {
exists, err := bucket.Exists(ctx, objectName)
require.NoError(t, err)
if exists {
return fmt.Errorf("file %s is still present, retrying", objectName)
} else {
deleted = true
return nil
}
})
require.True(t, deleted)
}
package test
import (
"context"
"io/ioutil"
"net/url"
"os"
"testing"
"github.com/stretchr/testify/require"
"gocloud.dev/blob"
"gocloud.dev/blob/fileblob"
)
type dirOpener struct {
u *url.URL // last url passed to OpenBucketURL
tmpDir string
}
func (o *dirOpener) OpenBucketURL(ctx context.Context, u *url.URL) (*blob.Bucket, error) {
o.u = u
return fileblob.OpenBucket(o.tmpDir, nil)
}
func SetupGoCloudFileBucket(t *testing.T, scheme string) (m *blob.URLMux, bucketDir string, cleanup func()) {
tmpDir, err := ioutil.TempDir("", "")
require.NoError(t, err)
mux := new(blob.URLMux)
fake := &dirOpener{tmpDir: tmpDir}
mux.RegisterBucket(scheme, fake)
cleanup = func() {
os.RemoveAll(tmpDir)
}
return mux, tmpDir, cleanup
}
func GoCloudObjectExists(t *testing.T, bucketDir string, objectName string) {
bucket, err := fileblob.OpenBucket(bucketDir, nil)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background()) // lint:allow context.Background
defer cancel()
exists, err := bucket.Exists(ctx, objectName)
require.NoError(t, err)
require.True(t, exists)
}
...@@ -9,6 +9,8 @@ import ( ...@@ -9,6 +9,8 @@ import (
"io" "io"
"strings" "strings"
"time" "time"
"gitlab.com/gitlab-org/labkit/log"
) )
// Upload represents an upload to an ObjectStorage provider // Upload represents an upload to an ObjectStorage provider
...@@ -123,6 +125,8 @@ func (u *uploader) Execute(ctx context.Context, deadline time.Time) { ...@@ -123,6 +125,8 @@ func (u *uploader) Execute(ctx context.Context, deadline time.Time) {
if u.md5 != nil { if u.md5 != nil {
err := compareMD5(u.md5Sum(), u.etag) err := compareMD5(u.md5Sum(), u.etag)
if err != nil { if err != nil {
log.ContextLogger(ctx).WithError(err).Error("error comparing MD5 checksum")
u.uploadError = err u.uploadError = err
if u.metrics { if u.metrics {
objectStorageUploadRequestsRequestFailed.Inc() objectStorageUploadRequestsRequestFailed.Inc()
......
...@@ -15,8 +15,11 @@ import ( ...@@ -15,8 +15,11 @@ import (
"runtime" "runtime"
"strings" "strings"
"testing" "testing"
"time"
"github.com/dgrijalva/jwt-go" "github.com/dgrijalva/jwt-go"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/labkit/log" "gitlab.com/gitlab-org/labkit/log"
...@@ -207,3 +210,25 @@ type UploadClaims struct { ...@@ -207,3 +210,25 @@ type UploadClaims struct {
Upload map[string]string `json:"upload"` Upload map[string]string `json:"upload"`
jwt.StandardClaims jwt.StandardClaims
} }
func SetupLogger() *test.Hook {
logger, hook := test.NewNullLogger()
logrus.SetOutput(logger.Writer())
return hook
}
// logrus fires a Goroutine to write the output log, but there's no way to
// flush all outstanding hooks to fire. We just wait up to a second
// for an event to appear.
func WaitForLogEvent(hook *test.Hook) bool {
for i := 0; i < 10; i++ {
if entry := hook.LastEntry(); entry != nil {
return true
}
time.Sleep(100 * time.Millisecond)
}
return false
}
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
) )
type ObjectStoragePreparer struct { type ObjectStoragePreparer struct {
config config.ObjectStorageConfig
credentials config.ObjectStorageCredentials credentials config.ObjectStorageCredentials
} }
...@@ -17,11 +18,12 @@ func NewObjectStoragePreparer(c config.Config) Preparer { ...@@ -17,11 +18,12 @@ func NewObjectStoragePreparer(c config.Config) Preparer {
creds = &config.ObjectStorageCredentials{} creds = &config.ObjectStorageCredentials{}
} }
return &ObjectStoragePreparer{credentials: *creds} return &ObjectStoragePreparer{credentials: *creds, config: c.ObjectStorageConfig}
} }
func (p *ObjectStoragePreparer) Prepare(a *api.Response) (*filestore.SaveFileOpts, Verifier, error) { func (p *ObjectStoragePreparer) Prepare(a *api.Response) (*filestore.SaveFileOpts, Verifier, error) {
opts := filestore.GetOpts(a) opts := filestore.GetOpts(a)
opts.ObjectStorageConfig.URLMux = p.config.URLMux
opts.ObjectStorageConfig.S3Credentials = p.credentials.S3Credentials opts.ObjectStorageConfig.S3Credentials = p.credentials.S3Credentials
return opts, nil, nil return opts, nil, nil
......
...@@ -3,6 +3,8 @@ package upload_test ...@@ -3,6 +3,8 @@ package upload_test
import ( import (
"testing" "testing"
"gocloud.dev/blob"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api" "gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/config" "gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/upload" "gitlab.com/gitlab-org/gitlab-workhorse/internal/upload"
...@@ -21,6 +23,9 @@ func TestPrepareWithS3Config(t *testing.T) { ...@@ -21,6 +23,9 @@ func TestPrepareWithS3Config(t *testing.T) {
Provider: "AWS", Provider: "AWS",
S3Credentials: creds, S3Credentials: creds,
}, },
ObjectStorageConfig: config.ObjectStorageConfig{
URLMux: new(blob.URLMux),
},
} }
r := &api.Response{ r := &api.Response{
...@@ -39,6 +44,7 @@ func TestPrepareWithS3Config(t *testing.T) { ...@@ -39,6 +44,7 @@ func TestPrepareWithS3Config(t *testing.T) {
require.True(t, opts.ObjectStorageConfig.IsAWS()) require.True(t, opts.ObjectStorageConfig.IsAWS())
require.True(t, opts.UseWorkhorseClient) require.True(t, opts.UseWorkhorseClient)
require.Equal(t, creds, opts.ObjectStorageConfig.S3Credentials) require.Equal(t, creds, opts.ObjectStorageConfig.S3Credentials)
require.NotNil(t, opts.ObjectStorageConfig.URLMux)
require.Equal(t, nil, v) require.Equal(t, nil, v)
} }
...@@ -50,5 +56,6 @@ func TestPrepareWithNoConfig(t *testing.T) { ...@@ -50,5 +56,6 @@ func TestPrepareWithNoConfig(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.False(t, opts.UseWorkhorseClient) require.False(t, opts.UseWorkhorseClient)
require.Equal(t, nil, v) require.Nil(t, v)
require.Nil(t, opts.ObjectStorageConfig.URLMux)
} }
...@@ -172,6 +172,11 @@ func main() { ...@@ -172,6 +172,11 @@ func main() {
redis.Configure(cfg.Redis, redis.DefaultDialFunc) redis.Configure(cfg.Redis, redis.DefaultDialFunc)
go redis.Process() go redis.Process()
} }
err = cfg.RegisterGoCloudURLOpeners()
if err != nil {
log.WithError(err).Fatal("could not load cloud credentials")
}
} }
accessLogger, accessCloser, err := getAccessLogger(logConfig) accessLogger, accessCloser, err := getAccessLogger(logConfig)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment