Commit aa4fa178 authored by Alessio Caiazza's avatar Alessio Caiazza

Fix objectstore error shadowing

When we are uploading big objects, remote server may close the connection
while we are still writing.

This patch allows to fetch the real error instead of io.ErrClosedPipe
parent 5ee4b0a9
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
...@@ -73,7 +74,8 @@ func NewObject(ctx context.Context, putURL, deleteURL string, timeout time.Durat ...@@ -73,7 +74,8 @@ func NewObject(ctx context.Context, putURL, deleteURL string, timeout time.Durat
pr, pw := io.Pipe() pr, pw := io.Pipe()
o.writeCloser = pw o.writeCloser = pw
req, err := http.NewRequest(http.MethodPut, o.PutURL, pr) // we should prevent pr.Close() otherwise it may shadow error set with pr.CloseWithError(err)
req, err := http.NewRequest(http.MethodPut, o.PutURL, ioutil.NopCloser(pr))
if err != nil { if err != nil {
objectStorageUploadRequestsRequestFailed.Inc() objectStorageUploadRequestsRequestFailed.Inc()
return nil, fmt.Errorf("PUT %q: %v", helper.ScrubURLParams(o.PutURL), err) return nil, fmt.Errorf("PUT %q: %v", helper.ScrubURLParams(o.PutURL), err)
...@@ -103,7 +105,10 @@ func NewObject(ctx context.Context, putURL, deleteURL string, timeout time.Durat ...@@ -103,7 +105,10 @@ func NewObject(ctx context.Context, putURL, deleteURL string, timeout time.Durat
go func() { go func() {
defer cancelFn() defer cancelFn()
defer objectStorageUploadsOpen.Dec() defer objectStorageUploadsOpen.Dec()
defer pr.Close() defer func() {
// This will be returned as error to the next write operation on the pipe
pr.CloseWithError(o.uploadError)
}()
req = req.WithContext(o.ctx) req = req.WithContext(o.ctx)
......
...@@ -78,6 +78,7 @@ func TestObjectUpload(t *testing.T) { ...@@ -78,6 +78,7 @@ func TestObjectUpload(t *testing.T) {
func TestObjectUpload404(t *testing.T) { func TestObjectUpload404(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
require := require.New(t)
ts := httptest.NewServer(http.NotFoundHandler()) ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close() defer ts.Close()
...@@ -87,13 +88,56 @@ func TestObjectUpload404(t *testing.T) { ...@@ -87,13 +88,56 @@ func TestObjectUpload404(t *testing.T) {
objectURL := ts.URL + test.ObjectPath objectURL := ts.URL + test.ObjectPath
object, err := objectstore.NewObject(ctx, objectURL, "", testTimeout, test.ObjectSize) object, err := objectstore.NewObject(ctx, objectURL, "", testTimeout, test.ObjectSize)
require.NoError(t, err) require.NoError(err)
_, err = io.Copy(object, strings.NewReader(test.ObjectContent)) _, err = io.Copy(object, strings.NewReader(test.ObjectContent))
assert.NoError(err) assert.NoError(err)
err = object.Close() err = object.Close()
assert.Error(err) assert.Error(err)
_, isStatusCodeError := err.(objectstore.StatusCodeError) _, isStatusCodeError := err.(objectstore.StatusCodeError)
assert.True(isStatusCodeError, "Should fail with StatusCodeError") require.True(isStatusCodeError, "Should fail with StatusCodeError")
assert.Contains(err.Error(), "404") require.Contains(err.Error(), "404")
}
type endlessReader struct{}
func (e *endlessReader) Read(p []byte) (n int, err error) {
for i := 0; i < len(p); i++ {
p[i] = '*'
}
return len(p), nil
}
// TestObjectUploadBrokenConnection purpose is to ensure that errors caused by the upload destination get propagated back correctly.
// This is important for troubleshooting in production.
func TestObjectUploadBrokenConnection(t *testing.T) {
// This test server closes connection immediately
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
hj, ok := w.(http.Hijacker)
if !ok {
require.FailNow(t, "webserver doesn't support hijacking")
}
conn, _, err := hj.Hijack()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
conn.Close()
}))
defer ts.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
objectURL := ts.URL + test.ObjectPath
object, err := objectstore.NewObject(ctx, objectURL, "", testTimeout, -1)
require.NoError(t, err)
_, copyErr := io.Copy(object, &endlessReader{})
require.Error(t, copyErr)
require.NotEqual(t, io.ErrClosedPipe, copyErr, "We are shadowing the real error")
closeErr := object.Close()
require.Equal(t, copyErr, closeErr)
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment