Commit 1390af67 authored by Nick Thomas's avatar Nick Thomas

Merge branch 'add-inline-upload-to-remote-storage' into 'master'

Add internal upload to external storage

See merge request !148
parents eacd5b7a 91d15f77
......@@ -66,6 +66,15 @@ func NewAPI(myURL *url.URL, version string, roundTripper *badgateway.RoundTrippe
type HandleFunc func(http.ResponseWriter, *http.Request, *Response)
type RemoteObjectStore struct {
// StoreURL is the temporary URL to which upload the first found file
StoreURL string
// ObjectID is a unique identifier of object storage upload
ObjectID string
// Timeout is a number that represents timeout in seconds for sending data to StoreURL
Timeout int
}
type Response struct {
// GL_ID is an environment variable used by gitlab-shell hooks during 'git
// push' and 'git pull'
......@@ -83,6 +92,9 @@ type Response struct {
// TmpPath is the path where we should store temporary files
// This is set by authorization middleware
TempPath string
// ObjectStore is provided by the GitLab Rails application
// and defines a way to store object on remote storage
ObjectStore RemoteObjectStore
// Archive is the path where the artifacts archive is stored
Archive string `json:"archive"`
// Entry is a filename inside the archive point to file that needs to be extracted
......
package artifacts
import (
"fmt"
"mime/multipart"
"net/http"
"os"
"time"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
"github.com/prometheus/client_golang/prometheus"
)
var (
DefaultObjectStoreTimeoutSeconds = 360
)
var (
objectStorageUploadRequests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "gitlab_workhorse_object_storage_upload_requests",
Help: "How many object storage requests have been processed",
},
[]string{"status"},
)
objectStorageUploadsOpen = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "gitlab_workhorse_object_storage_upload_open",
Help: "Describes many object storage requests are open now",
},
)
objectStorageUploadBytes = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "gitlab_workhorse_object_storage_upload_bytes",
Help: "How many bytes were sent to object storage",
},
)
objectStorageUploadTime = prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: "gitlab_workhorse_object_storage_upload_time",
Help: "How long it took to upload objects",
Buckets: objectStorageUploadTimeBuckets,
})
objectStorageUploadRequestsFileFailed = objectStorageUploadRequests.WithLabelValues("file-failed")
objectStorageUploadRequestsRequestFailed = objectStorageUploadRequests.WithLabelValues("request-failed")
objectStorageUploadRequestsInvalidStatus = objectStorageUploadRequests.WithLabelValues("invalid-status")
objectStorageUploadRequestsSucceeded = objectStorageUploadRequests.WithLabelValues("succeeded")
objectStorageUploadRequestsMultipleUploads = objectStorageUploadRequests.WithLabelValues("multiple-uploads")
objectStorageUploadTimeBuckets = []float64{.1, .25, .5, 1, 2.5, 5, 10, 25, 50, 100}
)
func init() {
prometheus.MustRegister(
objectStorageUploadRequests,
objectStorageUploadsOpen,
objectStorageUploadBytes)
}
func (a *artifactsUploadProcessor) storeFile(formName, fileName string, writer *multipart.Writer) error {
if a.ObjectStore.StoreURL == "" {
return nil
}
if a.stored {
objectStorageUploadRequestsMultipleUploads.Inc()
return nil
}
started := time.Now()
defer func() {
objectStorageUploadTime.Observe(time.Since(started).Seconds())
}()
file, err := os.Open(fileName)
if err != nil {
objectStorageUploadRequestsFileFailed.Inc()
return err
}
defer file.Close()
fi, err := file.Stat()
if err != nil {
objectStorageUploadRequestsFileFailed.Inc()
return err
}
req, err := http.NewRequest("PUT", a.ObjectStore.StoreURL, file)
if err != nil {
objectStorageUploadRequestsRequestFailed.Inc()
return fmt.Errorf("PUT %q: %v", a.ObjectStore.StoreURL, err)
}
req.Header.Set("Content-Type", "application/octet-stream")
req.ContentLength = fi.Size()
objectStorageUploadsOpen.Inc()
defer objectStorageUploadsOpen.Dec()
timeout := DefaultObjectStoreTimeoutSeconds
if a.ObjectStore.Timeout != 0 {
timeout = a.ObjectStore.Timeout
}
ctx, cancelFn := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
defer cancelFn()
resp, err := ctxhttp.Do(ctx, http.DefaultClient, req)
if err != nil {
objectStorageUploadRequestsRequestFailed.Inc()
return fmt.Errorf("PUT request %q: %v", a.ObjectStore.StoreURL, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
objectStorageUploadRequestsInvalidStatus.Inc()
return fmt.Errorf("PUT request %v returned: %d %s", a.ObjectStore.StoreURL, resp.StatusCode, resp.Status)
}
writer.WriteField(formName+".store_url", a.ObjectStore.StoreURL)
writer.WriteField(formName+".object_id", a.ObjectStore.ObjectID)
objectStorageUploadRequestsSucceeded.Inc()
objectStorageUploadBytes.Add(float64(fi.Size()))
// Allow to upload only once using given credentials
a.stored = true
return nil
}
package artifacts
import (
"archive/zip"
"bytes"
"fmt"
"io/ioutil"
"mime/multipart"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/testhelper"
)
func createTestZipArchive(t *testing.T) []byte {
var buffer bytes.Buffer
archive := zip.NewWriter(&buffer)
fileInArchive, err := archive.Create("test.file")
require.NoError(t, err)
fmt.Fprint(fileInArchive, "test")
archive.Close()
return buffer.Bytes()
}
func createTestMultipartForm(t *testing.T, data []byte) (bytes.Buffer, string) {
var buffer bytes.Buffer
writer := multipart.NewWriter(&buffer)
file, err := writer.CreateFormFile("file", "my.file")
require.NoError(t, err)
file.Write(data)
writer.Close()
return buffer, writer.FormDataContentType()
}
func TestUploadHandlerSendingToExternalStorage(t *testing.T) {
tempPath, err := ioutil.TempDir("", "uploads")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tempPath)
archiveData := createTestZipArchive(t)
contentBuffer, contentType := createTestMultipartForm(t, archiveData)
storeServerCalled := 0
storeServerMux := http.NewServeMux()
storeServerMux.HandleFunc("/url/put", func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "PUT", r.Method)
receivedData, err := ioutil.ReadAll(r.Body)
require.NoError(t, err)
require.Equal(t, archiveData, receivedData)
storeServerCalled++
w.WriteHeader(200)
})
responseProcessorCalled := 0
responseProcessor := func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "store-id", r.FormValue("file.object_id"))
assert.NotEmpty(t, r.FormValue("file.store_url"))
w.WriteHeader(200)
responseProcessorCalled++
}
storeServer := httptest.NewServer(storeServerMux)
defer storeServer.Close()
authResponse := api.Response{
TempPath: tempPath,
ObjectStore: api.RemoteObjectStore{
StoreURL: storeServer.URL + "/url/put",
ObjectID: "store-id",
},
}
ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
defer ts.Close()
response := testUploadArtifacts(contentType, &contentBuffer, t, ts)
testhelper.AssertResponseCode(t, response, 200)
assert.Equal(t, 1, storeServerCalled, "store should be called only once")
assert.Equal(t, 1, responseProcessorCalled, "response processor should be called only once")
}
func TestUploadHandlerSendingToExternalStorageAndStorageServerUnreachable(t *testing.T) {
tempPath, err := ioutil.TempDir("", "uploads")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tempPath)
responseProcessor := func(w http.ResponseWriter, r *http.Request) {
t.Fatal("it should not be called")
}
authResponse := api.Response{
TempPath: tempPath,
ObjectStore: api.RemoteObjectStore{
StoreURL: "http://localhost:12323/invalid/url",
ObjectID: "store-id",
},
}
ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
defer ts.Close()
archiveData := createTestZipArchive(t)
contentBuffer, contentType := createTestMultipartForm(t, archiveData)
response := testUploadArtifacts(contentType, &contentBuffer, t, ts)
testhelper.AssertResponseCode(t, response, 500)
}
func TestUploadHandlerSendingToExternalStorageAndInvalidURLIsUsed(t *testing.T) {
tempPath, err := ioutil.TempDir("", "uploads")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tempPath)
responseProcessor := func(w http.ResponseWriter, r *http.Request) {
t.Fatal("it should not be called")
}
authResponse := api.Response{
TempPath: tempPath,
ObjectStore: api.RemoteObjectStore{
StoreURL: "htt:////invalid-url",
ObjectID: "store-id",
},
}
ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
defer ts.Close()
archiveData := createTestZipArchive(t)
contentBuffer, contentType := createTestMultipartForm(t, archiveData)
response := testUploadArtifacts(contentType, &contentBuffer, t, ts)
testhelper.AssertResponseCode(t, response, 500)
}
func TestUploadHandlerSendingToExternalStorageAndItReturnsAnError(t *testing.T) {
tempPath, err := ioutil.TempDir("", "uploads")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tempPath)
putCalledTimes := 0
storeServerMux := http.NewServeMux()
storeServerMux.HandleFunc("/url/put", func(w http.ResponseWriter, r *http.Request) {
putCalledTimes++
assert.Equal(t, "PUT", r.Method)
w.WriteHeader(510)
})
responseProcessor := func(w http.ResponseWriter, r *http.Request) {
t.Fatal("it should not be called")
}
storeServer := httptest.NewServer(storeServerMux)
defer storeServer.Close()
authResponse := api.Response{
TempPath: tempPath,
ObjectStore: api.RemoteObjectStore{
StoreURL: storeServer.URL + "/url/put",
ObjectID: "store-id",
},
}
ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
defer ts.Close()
archiveData := createTestZipArchive(t)
contentBuffer, contentType := createTestMultipartForm(t, archiveData)
response := testUploadArtifacts(contentType, &contentBuffer, t, ts)
testhelper.AssertResponseCode(t, response, 500)
assert.Equal(t, 1, putCalledTimes, "upload should be called only once")
}
func TestUploadHandlerSendingToExternalStorageAndSupportRequestTimeout(t *testing.T) {
tempPath, err := ioutil.TempDir("", "uploads")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tempPath)
putCalledTimes := 0
storeServerMux := http.NewServeMux()
storeServerMux.HandleFunc("/url/put", func(w http.ResponseWriter, r *http.Request) {
putCalledTimes++
assert.Equal(t, "PUT", r.Method)
time.Sleep(10 * time.Second)
w.WriteHeader(510)
})
responseProcessor := func(w http.ResponseWriter, r *http.Request) {
t.Fatal("it should not be called")
}
storeServer := httptest.NewServer(storeServerMux)
defer storeServer.Close()
authResponse := api.Response{
TempPath: tempPath,
ObjectStore: api.RemoteObjectStore{
StoreURL: storeServer.URL + "/url/put",
ObjectID: "store-id",
Timeout: 1,
},
}
ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
defer ts.Close()
archiveData := createTestZipArchive(t)
contentBuffer, contentType := createTestMultipartForm(t, archiveData)
response := testUploadArtifacts(contentType, &contentBuffer, t, ts)
testhelper.AssertResponseCode(t, response, 500)
assert.Equal(t, 1, putCalledTimes, "upload should be called only once")
}
......@@ -2,6 +2,7 @@ package artifacts
import (
"fmt"
"io"
"io/ioutil"
"mime/multipart"
"net/http"
......@@ -17,7 +18,30 @@ import (
type artifactsUploadProcessor struct {
TempPath string
ObjectStore api.RemoteObjectStore
metadataFile string
stored bool
}
func (a *artifactsUploadProcessor) generateMetadataFromZip(fileName string, metadataFile io.Writer) (bool, error) {
// Generate metadata and save to file
zipMd := exec.Command("gitlab-zip-metadata", fileName)
zipMd.Stderr = os.Stderr
zipMd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
zipMd.Stdout = metadataFile
if err := zipMd.Start(); err != nil {
return false, err
}
defer helper.CleanUpProcessGroup(zipMd)
if err := zipMd.Wait(); err != nil {
if st, ok := helper.ExitStatus(err); ok && st == zipartifacts.StatusNotZip {
return false, nil
}
return false, err
}
return true, nil
}
func (a *artifactsUploadProcessor) ProcessFile(formName, fileName string, writer *multipart.Writer) error {
......@@ -36,28 +60,23 @@ func (a *artifactsUploadProcessor) ProcessFile(formName, fileName string, writer
return err
}
defer tempFile.Close()
a.metadataFile = tempFile.Name()
// Generate metadata and save to file
zipMd := exec.Command("gitlab-zip-metadata", fileName)
zipMd.Stderr = os.Stderr
zipMd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
zipMd.Stdout = tempFile
a.metadataFile = tempFile.Name()
if err := zipMd.Start(); err != nil {
return err
generatedMetadata, err := a.generateMetadataFromZip(fileName, tempFile)
if err != nil {
return fmt.Errorf("generateMetadataFromZip: %v", err)
}
defer helper.CleanUpProcessGroup(zipMd)
if err := zipMd.Wait(); err != nil {
if st, ok := helper.ExitStatus(err); ok && st == zipartifacts.StatusNotZip {
return nil
}
return err
if generatedMetadata {
// Pass metadata file path to Rails
writer.WriteField("metadata.path", a.metadataFile)
writer.WriteField("metadata.name", "metadata.gz")
}
// Pass metadata file path to Rails
writer.WriteField("metadata.path", a.metadataFile)
writer.WriteField("metadata.name", "metadata.gz")
if err := a.storeFile(formName, fileName, writer); err != nil {
return fmt.Errorf("storeFile: %v", err)
}
return nil
}
......@@ -86,7 +105,10 @@ func UploadArtifacts(myAPI *api.API, h http.Handler) http.Handler {
return
}
mg := &artifactsUploadProcessor{TempPath: a.TempPath}
mg := &artifactsUploadProcessor{
TempPath: a.TempPath,
ObjectStore: a.ObjectStore,
}
defer mg.Cleanup()
upload.HandleFileUploads(w, r, h, a.TempPath, mg)
......
......@@ -22,7 +22,7 @@ import (
"gitlab.com/gitlab-org/gitlab-workhorse/internal/zipartifacts"
)
func testArtifactsUploadServer(t *testing.T, tempPath string) *httptest.Server {
func testArtifactsUploadServer(t *testing.T, authResponse api.Response, bodyProcessor func(w http.ResponseWriter, r *http.Request)) *httptest.Server {
mux := http.NewServeMux()
mux.HandleFunc("/url/path/authorize", func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
......@@ -31,9 +31,7 @@ func testArtifactsUploadServer(t *testing.T, tempPath string) *httptest.Server {
w.Header().Set("Content-Type", api.ResponseContentType)
data, err := json.Marshal(&api.Response{
TempPath: tempPath,
})
data, err := json.Marshal(&authResponse)
if err != nil {
t.Fatal("Expected to marshal")
}
......@@ -79,7 +77,11 @@ func testArtifactsUploadServer(t *testing.T, tempPath string) *httptest.Server {
return
}
w.WriteHeader(200)
if bodyProcessor != nil {
bodyProcessor(w, r)
} else {
w.WriteHeader(200)
}
})
return testhelper.TestServerWithHandler(nil, mux.ServeHTTP)
}
......@@ -107,7 +109,7 @@ func TestUploadHandlerAddingMetadata(t *testing.T) {
}
defer os.RemoveAll(tempPath)
ts := testArtifactsUploadServer(t, tempPath)
ts := testArtifactsUploadServer(t, api.Response{TempPath: tempPath}, nil)
defer ts.Close()
var buffer bytes.Buffer
......@@ -138,7 +140,7 @@ func TestUploadHandlerForUnsupportedArchive(t *testing.T) {
}
defer os.RemoveAll(tempPath)
ts := testArtifactsUploadServer(t, tempPath)
ts := testArtifactsUploadServer(t, api.Response{TempPath: tempPath}, nil)
defer ts.Close()
var buffer bytes.Buffer
......@@ -162,7 +164,7 @@ func TestUploadFormProcessing(t *testing.T) {
}
defer os.RemoveAll(tempPath)
ts := testArtifactsUploadServer(t, tempPath)
ts := testArtifactsUploadServer(t, api.Response{TempPath: tempPath}, nil)
defer ts.Close()
var buffer bytes.Buffer
......
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build go1.7
// Package ctxhttp provides helper functions for performing context-aware HTTP requests.
package ctxhttp // import "golang.org/x/net/context/ctxhttp"
import (
"io"
"net/http"
"net/url"
"strings"
"golang.org/x/net/context"
)
// Do sends an HTTP request with the provided http.Client and returns
// an HTTP response.
//
// If the client is nil, http.DefaultClient is used.
//
// The provided ctx must be non-nil. If it is canceled or times out,
// ctx.Err() will be returned.
func Do(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) {
if client == nil {
client = http.DefaultClient
}
resp, err := client.Do(req.WithContext(ctx))
// If we got an error, and the context has been canceled,
// the context's error is probably more useful.
if err != nil {
select {
case <-ctx.Done():
err = ctx.Err()
default:
}
}
return resp, err
}
// Get issues a GET request via the Do function.
func Get(ctx context.Context, client *http.Client, url string) (*http.Response, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
return Do(ctx, client, req)
}
// Head issues a HEAD request via the Do function.
func Head(ctx context.Context, client *http.Client, url string) (*http.Response, error) {
req, err := http.NewRequest("HEAD", url, nil)
if err != nil {
return nil, err
}
return Do(ctx, client, req)
}
// Post issues a POST request via the Do function.
func Post(ctx context.Context, client *http.Client, url string, bodyType string, body io.Reader) (*http.Response, error) {
req, err := http.NewRequest("POST", url, body)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", bodyType)
return Do(ctx, client, req)
}
// PostForm issues a POST request via the Do function.
func PostForm(ctx context.Context, client *http.Client, url string, data url.Values) (*http.Response, error) {
return Post(ctx, client, url, "application/x-www-form-urlencoded", strings.NewReader(data.Encode()))
}
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !go1.7
package ctxhttp // import "golang.org/x/net/context/ctxhttp"
import (
"io"
"net/http"
"net/url"
"strings"
"golang.org/x/net/context"
)
func nop() {}
var (
testHookContextDoneBeforeHeaders = nop
testHookDoReturned = nop
testHookDidBodyClose = nop
)
// Do sends an HTTP request with the provided http.Client and returns an HTTP response.
// If the client is nil, http.DefaultClient is used.
// If the context is canceled or times out, ctx.Err() will be returned.
func Do(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) {
if client == nil {
client = http.DefaultClient
}
// TODO(djd): Respect any existing value of req.Cancel.
cancel := make(chan struct{})
req.Cancel = cancel
type responseAndError struct {
resp *http.Response
err error
}
result := make(chan responseAndError, 1)
// Make local copies of test hooks closed over by goroutines below.
// Prevents data races in tests.
testHookDoReturned := testHookDoReturned
testHookDidBodyClose := testHookDidBodyClose
go func() {
resp, err := client.Do(req)
testHookDoReturned()
result <- responseAndError{resp, err}
}()
var resp *http.Response
select {
case <-ctx.Done():
testHookContextDoneBeforeHeaders()
close(cancel)
// Clean up after the goroutine calling client.Do:
go func() {
if r := <-result; r.resp != nil {
testHookDidBodyClose()
r.resp.Body.Close()
}
}()
return nil, ctx.Err()
case r := <-result:
var err error
resp, err = r.resp, r.err
if err != nil {
return resp, err
}
}
c := make(chan struct{})
go func() {
select {
case <-ctx.Done():
close(cancel)
case <-c:
// The response's Body is closed.
}
}()
resp.Body = &notifyingReader{resp.Body, c}
return resp, nil
}
// Get issues a GET request via the Do function.
func Get(ctx context.Context, client *http.Client, url string) (*http.Response, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
return Do(ctx, client, req)
}
// Head issues a HEAD request via the Do function.
func Head(ctx context.Context, client *http.Client, url string) (*http.Response, error) {
req, err := http.NewRequest("HEAD", url, nil)
if err != nil {
return nil, err
}
return Do(ctx, client, req)
}
// Post issues a POST request via the Do function.
func Post(ctx context.Context, client *http.Client, url string, bodyType string, body io.Reader) (*http.Response, error) {
req, err := http.NewRequest("POST", url, body)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", bodyType)
return Do(ctx, client, req)
}
// PostForm issues a POST request via the Do function.
func PostForm(ctx context.Context, client *http.Client, url string, data url.Values) (*http.Response, error) {
return Post(ctx, client, url, "application/x-www-form-urlencoded", strings.NewReader(data.Encode()))
}
// notifyingReader is an io.ReadCloser that closes the notify channel after
// Close is called or a Read fails on the underlying ReadCloser.
type notifyingReader struct {
io.ReadCloser
notify chan<- struct{}
}
func (r *notifyingReader) Read(p []byte) (int, error) {
n, err := r.ReadCloser.Read(p)
if err != nil && r.notify != nil {
close(r.notify)
r.notify = nil
}
return n, err
}
func (r *notifyingReader) Close() error {
err := r.ReadCloser.Close()
if r.notify != nil {
close(r.notify)
r.notify = nil
}
return err
}
......@@ -148,6 +148,12 @@
"path": "golang.org/x/net/context",
"revision": "f2499483f923065a842d38eb4c7f1927e6fc6e6d"
},
{
"checksumSHA1": "WHc3uByvGaMcnSoI21fhzYgbOgg=",
"path": "golang.org/x/net/context/ctxhttp",
"revision": "a6577fac2d73be281a500b310739095313165611",
"revisionTime": "2017-03-08T20:54:49Z"
},
{
"checksumSHA1": "N1akwAdrHVfPPrsFOhG2ouP21VA=",
"path": "golang.org/x/net/http2",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment