Commit 87127664 authored by Nick Thomas's avatar Nick Thomas

Merge branch 'sh-fix-git-large-upload-pack' into 'master'

Fix stalled HTTP fetches with large payloads

Closes #92, gitlab-ce#25916, and gitlab-com/infrastructure#941

See merge request !110
parents 1b0af335 fc80c6a3
...@@ -5,7 +5,6 @@ In this file we handle the Git 'smart HTTP' protocol ...@@ -5,7 +5,6 @@ In this file we handle the Git 'smart HTTP' protocol
package git package git
import ( import (
"bytes"
"fmt" "fmt"
"io" "io"
"log" "log"
...@@ -17,23 +16,32 @@ import ( ...@@ -17,23 +16,32 @@ import (
"strings" "strings"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api" "gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/gitaly"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
) )
func GetInfoRefsHandler(a *api.API, cfg *config.Config) http.Handler { func ReceivePack(a *api.API) http.Handler {
return repoPreAuthorizeHandler(a, func(rw http.ResponseWriter, r *http.Request, apiResponse *api.Response) { return postRPCHandler(a, "handleReceivePack", handleReceivePack)
if apiResponse.GitalySocketPath == "" {
handleGetInfoRefs(rw, r, apiResponse)
} else {
handleGetInfoRefsWithGitaly(rw, r, apiResponse, gitaly.NewClient(apiResponse.GitalySocketPath, cfg))
}
})
} }
func PostRPC(a *api.API) http.Handler { func UploadPack(a *api.API) http.Handler {
return repoPreAuthorizeHandler(a, handlePostRPC) return postRPCHandler(a, "handleUploadPack", handleUploadPack)
}
func postRPCHandler(a *api.API, name string, handler func(*GitHttpResponseWriter, *http.Request, *api.Response) (int64, error)) http.Handler {
return repoPreAuthorizeHandler(a, func(rw http.ResponseWriter, r *http.Request, ar *api.Response) {
var writtenIn int64
var err error
w := NewGitHttpResponseWriter(rw)
defer func() {
w.Log(r, writtenIn)
}()
writtenIn, err = handler(w, r, ar)
if err != nil {
helper.LogError(r, fmt.Errorf("%s: %v", name, err))
}
})
} }
func looksLikeRepo(p string) bool { func looksLikeRepo(p string) bool {
...@@ -62,152 +70,46 @@ func repoPreAuthorizeHandler(myAPI *api.API, handleFunc api.HandleFunc) http.Han ...@@ -62,152 +70,46 @@ func repoPreAuthorizeHandler(myAPI *api.API, handleFunc api.HandleFunc) http.Han
}, "") }, "")
} }
func handleGetInfoRefsWithGitaly(rw http.ResponseWriter, r *http.Request, a *api.Response, gitalyClient *gitaly.Client) { func setupGitCommand(action string, a *api.Response, options ...string) (cmd *exec.Cmd, stdin io.WriteCloser, stdout io.ReadCloser, err error) {
req := *r // Make a copy of r // Don't leak pipes when we return early after an error
req.Header = helper.HeaderClone(r.Header)
req.Header.Add("Gitaly-Repo-Path", a.RepoPath)
req.Header.Add("Gitaly-GL-Id", a.GL_ID)
req.URL.Path = path.Join(a.GitalyResourcePath, subCommand(getService(r)))
req.URL.RawQuery = ""
gitalyClient.Proxy.ServeHTTP(rw, &req)
}
func handleGetInfoRefs(rw http.ResponseWriter, r *http.Request, a *api.Response) {
w := NewGitHttpResponseWriter(rw)
// Log 0 bytes in because we ignore the request body (and there usually is none anyway).
defer w.Log(r, 0)
rpc := getService(r)
if !(rpc == "git-upload-pack" || rpc == "git-receive-pack") {
// The 'dumb' Git HTTP protocol is not supported
http.Error(w, "Not Found", 404)
return
}
// Prepare our Git subprocess
cmd := gitCommand(a.GL_ID, "git", subCommand(rpc), "--stateless-rpc", "--advertise-refs", a.RepoPath)
stdout, err := cmd.StdoutPipe()
if err != nil {
helper.Fail500(w, r, fmt.Errorf("handleGetInfoRefs: stdout: %v", err))
return
}
defer stdout.Close()
if err := cmd.Start(); err != nil {
helper.Fail500(w, r, fmt.Errorf("handleGetInfoRefs: start %v: %v", cmd.Args, err))
return
}
defer helper.CleanUpProcessGroup(cmd) // Ensure brute force subprocess clean-up
// Start writing the response
w.Header().Set("Content-Type", fmt.Sprintf("application/x-%s-advertisement", rpc))
w.Header().Set("Cache-Control", "no-cache")
w.WriteHeader(200) // Don't bother with HTTP 500 from this point on, just return
if err := pktLine(w, fmt.Sprintf("# service=%s\n", rpc)); err != nil {
helper.LogError(r, fmt.Errorf("handleGetInfoRefs: pktLine: %v", err))
return
}
if err := pktFlush(w); err != nil {
helper.LogError(r, fmt.Errorf("handleGetInfoRefs: pktFlush: %v", err))
return
}
if _, err := io.Copy(w, stdout); err != nil {
helper.LogError(
r,
&copyError{fmt.Errorf("handleGetInfoRefs: copy output of %v: %v", cmd.Args, err)},
)
return
}
if err := cmd.Wait(); err != nil {
helper.LogError(r, fmt.Errorf("handleGetInfoRefs: wait for %v: %v", cmd.Args, err))
return
}
}
func handlePostRPC(rw http.ResponseWriter, r *http.Request, a *api.Response) {
var err error
var body io.Reader
var isShallowClone bool
var writtenIn int64
w := NewGitHttpResponseWriter(rw)
defer func() { defer func() {
w.Log(r, writtenIn) if err == nil {
}()
action := getService(r)
if !(action == "git-upload-pack" || action == "git-receive-pack") {
// The 'dumb' Git HTTP protocol is not supported
helper.Fail500(w, r, fmt.Errorf("handlePostRPC: unsupported action: %s", r.URL.Path))
return return
} }
if stdin != nil {
if action == "git-upload-pack" { stdin.Close()
buffer := &bytes.Buffer{}
// Only sniff on the first 4096 bytes: we assume that if we find no
// 'deepen' message in the first 4096 bytes there won't be one later
// either.
_, err = io.Copy(buffer, io.LimitReader(r.Body, 4096))
if err != nil {
helper.Fail500(w, r, &copyError{fmt.Errorf("handlePostRPC: buffer git-upload-pack body: %v", err)})
return
} }
if stdout != nil {
isShallowClone = scanDeepen(bytes.NewReader(buffer.Bytes())) stdout.Close()
body = io.MultiReader(buffer, r.Body)
} else {
body = r.Body
} }
}()
// Prepare our Git subprocess // Prepare our Git subprocess
cmd := gitCommand(a.GL_ID, "git", subCommand(action), "--stateless-rpc", a.RepoPath) args := []string{subCommand(action), "--stateless-rpc"}
stdout, err := cmd.StdoutPipe() args = append(args, options...)
args = append(args, a.RepoPath)
cmd = gitCommand(a.GL_ID, "git", args...)
stdout, err = cmd.StdoutPipe()
if err != nil { if err != nil {
helper.Fail500(w, r, fmt.Errorf("handlePostRPC: stdout: %v", err)) return nil, nil, nil, fmt.Errorf("stdout pipe: %v", err)
return
} }
defer stdout.Close()
stdin, err := cmd.StdinPipe() stdin, err = cmd.StdinPipe()
if err != nil { if err != nil {
helper.Fail500(w, r, fmt.Errorf("handlePostRPC: stdin: %v", err)) return nil, nil, nil, fmt.Errorf("stdin pipe: %v", err)
return
}
defer stdin.Close()
if err := cmd.Start(); err != nil {
helper.Fail500(w, r, fmt.Errorf("handlePostRPC: start %v: %v", cmd.Args, err))
return
} }
defer helper.CleanUpProcessGroup(cmd) // Ensure brute force subprocess clean-up
// Write the client request body to Git's standard input if err = cmd.Start(); err != nil {
if writtenIn, err = io.Copy(stdin, body); err != nil { return nil, nil, nil, fmt.Errorf("start %v: %v", cmd.Args, err)
helper.Fail500(w, r, fmt.Errorf("handlePostRPC: write to %v: %v", cmd.Args, err))
return
} }
// Signal to the Git subprocess that no more data is coming
stdin.Close()
// It may take a while before we return and the deferred closes happen return cmd, stdin, stdout, nil
// so let's free up some resources already. }
r.Body.Close()
// Start writing the response func writePostRPCHeader(w http.ResponseWriter, action string) {
w.Header().Set("Content-Type", fmt.Sprintf("application/x-%s-result", action)) w.Header().Set("Content-Type", fmt.Sprintf("application/x-%s-result", action))
w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Cache-Control", "no-cache")
w.WriteHeader(200) // Don't bother with HTTP 500 from this point on, just return w.WriteHeader(200) // Don't bother with HTTP 500 from this point on, just return
// This io.Copy may take a long time, both for Git push and pull.
if _, err := io.Copy(w, stdout); err != nil {
helper.LogError(
r,
&copyError{fmt.Errorf("handlePostRPC: copy output of %v: %v", cmd.Args, err)},
)
return
}
if err := cmd.Wait(); err != nil && !(isExitError(err) && isShallowClone) {
helper.LogError(r, fmt.Errorf("handlePostRPC: wait for %v: %v", cmd.Args, err))
return
}
} }
func getService(r *http.Request) string { func getService(r *http.Request) string {
......
...@@ -2,7 +2,9 @@ package git ...@@ -2,7 +2,9 @@ package git
import ( import (
"bytes" "bytes"
"fmt"
"io" "io"
"io/ioutil"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"os" "os"
...@@ -29,13 +31,22 @@ func createTestPayload() []byte { ...@@ -29,13 +31,22 @@ func createTestPayload() []byte {
return bytes.Repeat([]byte{'0'}, expectedBytes) return bytes.Repeat([]byte{'0'}, expectedBytes)
} }
func TestRunUploadPack(t *testing.T) { func TestHandleUploadPack(t *testing.T) {
testHandlePostRpc(t, "git-upload-pack", handleUploadPack)
}
func TestHandleReceivePack(t *testing.T) {
testHandlePostRpc(t, "git-receive-pack", handleReceivePack)
}
func testHandlePostRpc(t *testing.T, action string, handler func(*GitHttpResponseWriter, *http.Request, *api.Response) (int64, error)) {
execCommand = fakeExecCommand execCommand = fakeExecCommand
defer func() { execCommand = exec.Command }() defer func() { execCommand = exec.Command }()
testInput := createTestPayload() testInput := createTestPayload()
body := bytes.NewReader([]byte(testInput)) body := bytes.NewReader([]byte(testInput))
req, err := http.NewRequest("GET", "/gitlab/gitlab-ce.git/?service=git-upload-pack", body) url := fmt.Sprintf("/gitlab/gitlab-ce.git/?service=%s", action)
req, err := http.NewRequest("GET", url, body)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -44,7 +55,7 @@ func TestRunUploadPack(t *testing.T) { ...@@ -44,7 +55,7 @@ func TestRunUploadPack(t *testing.T) {
resp := &api.Response{GL_ID: GL_ID} resp := &api.Response{GL_ID: GL_ID}
rr := httptest.NewRecorder() rr := httptest.NewRecorder()
handlePostRPC(rr, req, resp) handler(NewGitHttpResponseWriter(rr), req, resp)
// Check HTTP status code // Check HTTP status code
if status := rr.Code; status != http.StatusOK { if status := rr.Code; status != http.StatusOK {
...@@ -52,11 +63,12 @@ func TestRunUploadPack(t *testing.T) { ...@@ -52,11 +63,12 @@ func TestRunUploadPack(t *testing.T) {
http.StatusOK, status) http.StatusOK, status)
} }
ct := fmt.Sprintf("application/x-%s-result", action)
headers := []struct { headers := []struct {
key string key string
value string value string
}{ }{
{"Content-Type", "application/x-git-upload-pack-result"}, {"Content-Type", ct},
{"Cache-Control", "no-cache"}, {"Cache-Control", "no-cache"},
} }
...@@ -69,11 +81,20 @@ func TestRunUploadPack(t *testing.T) { ...@@ -69,11 +81,20 @@ func TestRunUploadPack(t *testing.T) {
} }
if rr.Body.String() != string(testInput) { if rr.Body.String() != string(testInput) {
t.Errorf("handler did not echo back properly: got %d, expected %d bytes", t.Errorf("handler did not receive expected data: got %d, expected %d bytes",
len(rr.Body.String()), len(testInput)) len(rr.Body.String()), len(testInput))
} }
} }
func stringInSlice(a string, list []string) bool {
for _, b := range list {
if b == a {
return true
}
}
return false
}
func TestGitCommandProcess(t *testing.T) { func TestGitCommandProcess(t *testing.T) {
if os.Getenv("GL_ID") != GL_ID { if os.Getenv("GL_ID") != GL_ID {
return return
...@@ -81,6 +102,18 @@ func TestGitCommandProcess(t *testing.T) { ...@@ -81,6 +102,18 @@ func TestGitCommandProcess(t *testing.T) {
defer os.Exit(0) defer os.Exit(0)
// Echo back the input to test sender uploadPack := stringInSlice("upload-pack", os.Args)
if uploadPack {
// First, send a large payload to stdout so that this executable will be blocked
// until the reader consumes the data
testInput := createTestPayload()
body := bytes.NewReader([]byte(testInput))
io.Copy(os.Stdout, body)
// Now consume all the data to unblock the sender
ioutil.ReadAll(os.Stdin)
} else {
io.Copy(os.Stdout, os.Stdin) io.Copy(os.Stdout, os.Stdin)
}
} }
package git
import (
"fmt"
"io"
"net/http"
"path"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/gitaly"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
)
func GetInfoRefsHandler(a *api.API, cfg *config.Config) http.Handler {
return repoPreAuthorizeHandler(a, func(rw http.ResponseWriter, r *http.Request, apiResponse *api.Response) {
if apiResponse.GitalySocketPath == "" {
handleGetInfoRefs(rw, r, apiResponse)
} else {
handleGetInfoRefsWithGitaly(rw, r, apiResponse, gitaly.NewClient(apiResponse.GitalySocketPath, cfg))
}
})
}
func handleGetInfoRefsWithGitaly(rw http.ResponseWriter, r *http.Request, a *api.Response, gitalyClient *gitaly.Client) {
req := *r // Make a copy of r
req.Header = helper.HeaderClone(r.Header)
req.Header.Add("Gitaly-Repo-Path", a.RepoPath)
req.Header.Add("Gitaly-GL-Id", a.GL_ID)
req.URL.Path = path.Join(a.GitalyResourcePath, subCommand(getService(r)))
req.URL.RawQuery = ""
gitalyClient.Proxy.ServeHTTP(rw, &req)
}
func handleGetInfoRefs(rw http.ResponseWriter, r *http.Request, a *api.Response) {
w := NewGitHttpResponseWriter(rw)
// Log 0 bytes in because we ignore the request body (and there usually is none anyway).
defer w.Log(r, 0)
rpc := getService(r)
if !(rpc == "git-upload-pack" || rpc == "git-receive-pack") {
// The 'dumb' Git HTTP protocol is not supported
http.Error(w, "Not Found", 404)
return
}
cmd, stdin, stdout, err := setupGitCommand(rpc, a, "--advertise-refs")
if err != nil {
helper.Fail500(w, r, fmt.Errorf("handleGetInfoRefs: setupGitCommand: %v", err))
return
}
defer helper.CleanUpProcessGroup(cmd) // Ensure brute force subprocess clean-up
stdin.Close() // Not needed for this request
defer stdout.Close()
// Start writing the response
w.Header().Set("Content-Type", fmt.Sprintf("application/x-%s-advertisement", rpc))
w.Header().Set("Cache-Control", "no-cache")
w.WriteHeader(200) // Don't bother with HTTP 500 from this point on, just return
if err := pktLine(w, fmt.Sprintf("# service=%s\n", rpc)); err != nil {
helper.LogError(r, fmt.Errorf("handleGetInfoRefs: pktLine: %v", err))
return
}
if err := pktFlush(w); err != nil {
helper.LogError(r, fmt.Errorf("handleGetInfoRefs: pktFlush: %v", err))
return
}
if _, err := io.Copy(w, stdout); err != nil {
helper.LogError(
r,
&copyError{fmt.Errorf("handleGetInfoRefs: copy output of %v: %v", cmd.Args, err)},
)
return
}
if err := cmd.Wait(); err != nil {
helper.LogError(r, fmt.Errorf("handleGetInfoRefs: wait for %v: %v", cmd.Args, err))
return
}
}
package git
import (
"fmt"
"io"
"net/http"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
)
func handleReceivePack(w *GitHttpResponseWriter, r *http.Request, a *api.Response) (writtenIn int64, err error) {
body := r.Body
action := getService(r)
cmd, stdin, stdout, err := setupGitCommand(action, a)
if err != nil {
fail500(w)
return writtenIn, fmt.Errorf("setupGitCommand: %v", err)
}
defer stdout.Close()
defer stdin.Close()
defer helper.CleanUpProcessGroup(cmd) // Ensure brute force subprocess clean-up
// Write the client request body to Git's standard input
writtenIn, err = io.Copy(stdin, body)
if err != nil {
fail500(w)
return writtenIn, fmt.Errorf("write to %v: %v", cmd.Args, err)
}
// Signal to the Git subprocess that no more data is coming
stdin.Close()
// It may take a while before we return and the deferred closes happen
// so let's free up some resources already.
r.Body.Close()
writePostRPCHeader(w, action)
// This io.Copy may take a long time, both for Git push and pull.
_, err = io.Copy(w, stdout)
if err != nil {
return writtenIn, &copyError{fmt.Errorf("copy output of %v: %v", cmd.Args, err)}
}
err = cmd.Wait()
if err != nil {
return writtenIn, fmt.Errorf("wait for %v: %v", cmd.Args, err)
}
return writtenIn, nil
}
package git
import (
"fmt"
"io"
"net/http"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
)
func handleUploadPack(w *GitHttpResponseWriter, r *http.Request, a *api.Response) (writtenIn int64, err error) {
// The body will consist almost entirely of 'have XXX' and 'want XXX'
// lines; these are about 50 bytes long. With a limit of 10MB the client
// can send over 200,000 have/want lines.
buffer, err := helper.ReadAllTempfile(io.LimitReader(r.Body, 10*1024*1024))
if err != nil {
fail500(w)
return writtenIn, fmt.Errorf("ReadAllTempfile: %v", err)
}
defer buffer.Close()
r.Body.Close()
isShallowClone := scanDeepen(buffer)
if _, err := buffer.Seek(0, 0); err != nil {
fail500(w)
return writtenIn, fmt.Errorf("seek tempfile: %v", err)
}
action := getService(r)
cmd, stdin, stdout, err := setupGitCommand(action, a)
if err != nil {
fail500(w)
return writtenIn, fmt.Errorf("setupGitCommand: %v", err)
}
defer stdout.Close()
defer stdin.Close()
defer helper.CleanUpProcessGroup(cmd) // Ensure brute force subprocess clean-up
stdoutError := make(chan error, 1)
go func() {
writePostRPCHeader(w, action)
// Start reading from stdout already to avoid blocking while writing to
// stdin below.
_, err := io.Copy(w, stdout)
// This error may be lost if some other error prevents us from <-ing on this channel.
stdoutError <- err
}()
// Write the client request body to Git's standard input
if writtenIn, err = io.Copy(stdin, buffer); err != nil {
fail500(w)
return writtenIn, fmt.Errorf("write to %v: %v", cmd.Args, err)
}
// Signal to the Git subprocess that no more data is coming
stdin.Close()
if err := <-stdoutError; err != nil {
return writtenIn, &copyError{fmt.Errorf("copy output of %v: %v", cmd.Args, err)}
}
err = cmd.Wait()
if err != nil && !(isExitError(err) && isShallowClone) {
return writtenIn, fmt.Errorf("wait for %v: %v", cmd.Args, err)
}
return writtenIn, nil
}
func fail500(w http.ResponseWriter) {
helper.Fail500(w, nil, nil)
}
package helper
import (
"io"
"io/ioutil"
"os"
)
func ReadAllTempfile(r io.Reader) (tempfile *os.File, err error) {
tempfile, err = ioutil.TempFile("", "gitlab-workhorse-read-all-tempfile")
if err != nil {
return nil, err
}
defer func() {
// Avoid leaking an open file if the function returns with an error
if err != nil {
tempfile.Close()
}
}()
if err := os.Remove(tempfile.Name()); err != nil {
return nil, err
}
if _, err := io.Copy(tempfile, r); err != nil {
return nil, err
}
if _, err := tempfile.Seek(0, 0); err != nil {
return nil, err
}
return tempfile, nil
}
...@@ -122,8 +122,8 @@ func (u *Upstream) configureRoutes() { ...@@ -122,8 +122,8 @@ func (u *Upstream) configureRoutes() {
u.Routes = []routeEntry{ u.Routes = []routeEntry{
// Git Clone // Git Clone
route("GET", gitProjectPattern+`info/refs\z`, git.GetInfoRefsHandler(api, &u.Config)), route("GET", gitProjectPattern+`info/refs\z`, git.GetInfoRefsHandler(api, &u.Config)),
route("POST", gitProjectPattern+`git-upload-pack\z`, contentEncodingHandler(git.PostRPC(api)), isContentType("application/x-git-upload-pack-request")), route("POST", gitProjectPattern+`git-upload-pack\z`, contentEncodingHandler(git.UploadPack(api)), isContentType("application/x-git-upload-pack-request")),
route("POST", gitProjectPattern+`git-receive-pack\z`, contentEncodingHandler(git.PostRPC(api)), isContentType("application/x-git-receive-pack-request")), route("POST", gitProjectPattern+`git-receive-pack\z`, contentEncodingHandler(git.ReceivePack(api)), isContentType("application/x-git-receive-pack-request")),
route("PUT", gitProjectPattern+`gitlab-lfs/objects/([0-9a-f]{64})/([0-9]+)\z`, lfs.PutStore(api, proxy), isContentType("application/octet-stream")), route("PUT", gitProjectPattern+`gitlab-lfs/objects/([0-9a-f]{64})/([0-9]+)\z`, lfs.PutStore(api, proxy), isContentType("application/octet-stream")),
// CI Artifacts // CI Artifacts
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment