Commit 468b4fe7 authored by Nick Thomas's avatar Nick Thomas

Merge branch 'upload-pack-deadlock' into 'master'

Fix another upload-pack deadlock

See merge request !118
parents 2bd8da84 47edcb11
......@@ -14,6 +14,7 @@ import (
"path"
"path/filepath"
"strings"
"sync"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
......@@ -27,19 +28,18 @@ func UploadPack(a *api.API) http.Handler {
return postRPCHandler(a, "handleUploadPack", handleUploadPack)
}
func postRPCHandler(a *api.API, name string, handler func(*GitHttpResponseWriter, *http.Request, *api.Response) (int64, error)) http.Handler {
func postRPCHandler(a *api.API, name string, handler func(*GitHttpResponseWriter, *http.Request, *api.Response) error) http.Handler {
return repoPreAuthorizeHandler(a, func(rw http.ResponseWriter, r *http.Request, ar *api.Response) {
var writtenIn int64
var err error
cr := &countReadCloser{ReadCloser: r.Body}
r.Body = cr
w := NewGitHttpResponseWriter(rw)
defer func() {
w.Log(r, writtenIn)
w.Log(r, cr.Count())
}()
writtenIn, err = handler(w, r, ar)
if err != nil {
helper.LogError(r, fmt.Errorf("%s: %v", name, err))
if err := handler(w, r, ar); err != nil {
helper.Fail500(w, r, fmt.Errorf("%s: %v", name, err))
}
})
}
......@@ -70,46 +70,25 @@ func repoPreAuthorizeHandler(myAPI *api.API, handleFunc api.HandleFunc) http.Han
}, "")
}
func setupGitCommand(action string, a *api.Response, options ...string) (cmd *exec.Cmd, stdin io.WriteCloser, stdout io.ReadCloser, err error) {
// Don't leak pipes when we return early after an error
defer func() {
if err == nil {
return
}
if stdin != nil {
stdin.Close()
}
if stdout != nil {
stdout.Close()
}
}()
func startGitCommand(a *api.Response, stdin io.Reader, stdout io.Writer, action string, options ...string) (cmd *exec.Cmd, err error) {
// Prepare our Git subprocess
args := []string{subCommand(action), "--stateless-rpc"}
args = append(args, options...)
args = append(args, a.RepoPath)
cmd = gitCommand(a.GL_ID, "git", args...)
stdout, err = cmd.StdoutPipe()
if err != nil {
return nil, nil, nil, fmt.Errorf("stdout pipe: %v", err)
}
stdin, err = cmd.StdinPipe()
if err != nil {
return nil, nil, nil, fmt.Errorf("stdin pipe: %v", err)
}
cmd.Stdin = stdin
cmd.Stdout = stdout
if err = cmd.Start(); err != nil {
return nil, nil, nil, fmt.Errorf("start %v: %v", cmd.Args, err)
return nil, fmt.Errorf("start %v: %v", cmd.Args, err)
}
return cmd, stdin, stdout, nil
return cmd, nil
}
func writePostRPCHeader(w http.ResponseWriter, action string) {
w.Header().Set("Content-Type", fmt.Sprintf("application/x-%s-result", action))
w.Header().Set("Cache-Control", "no-cache")
w.WriteHeader(200) // Don't bother with HTTP 500 from this point on, just return
}
func getService(r *http.Request) string {
......@@ -127,3 +106,25 @@ func isExitError(err error) bool {
func subCommand(rpc string) string {
return strings.TrimPrefix(rpc, "git-")
}
type countReadCloser struct {
n int64
io.ReadCloser
sync.Mutex
}
func (c *countReadCloser) Read(p []byte) (n int, err error) {
n, err = c.ReadCloser.Read(p)
c.Lock()
defer c.Unlock()
c.n += int64(n)
return n, err
}
func (c *countReadCloser) Count() int64 {
c.Lock()
defer c.Unlock()
return c.n
}
......@@ -39,7 +39,7 @@ func TestHandleReceivePack(t *testing.T) {
testHandlePostRpc(t, "git-receive-pack", handleReceivePack)
}
func testHandlePostRpc(t *testing.T, action string, handler func(*GitHttpResponseWriter, *http.Request, *api.Response) (int64, error)) {
func testHandlePostRpc(t *testing.T, action string, handler func(*GitHttpResponseWriter, *http.Request, *api.Response) error) {
execCommand = fakeExecCommand
defer func() { execCommand = exec.Command }()
......
......@@ -2,7 +2,6 @@ package git
import (
"fmt"
"io"
"net/http"
"path"
......@@ -45,36 +44,31 @@ func handleGetInfoRefs(rw http.ResponseWriter, r *http.Request, a *api.Response)
return
}
cmd, stdin, stdout, err := setupGitCommand(rpc, a, "--advertise-refs")
if err != nil {
helper.Fail500(w, r, fmt.Errorf("handleGetInfoRefs: setupGitCommand: %v", err))
return
}
defer helper.CleanUpProcessGroup(cmd) // Ensure brute force subprocess clean-up
stdin.Close() // Not needed for this request
defer stdout.Close()
// Start writing the response
w.Header().Set("Content-Type", fmt.Sprintf("application/x-%s-advertisement", rpc))
w.Header().Set("Cache-Control", "no-cache")
w.WriteHeader(200) // Don't bother with HTTP 500 from this point on, just return
if err := writeBody(w, a, rpc); err != nil {
helper.LogError(r, fmt.Errorf("handleGetInfoRefs: %v", err))
}
}
func writeBody(w http.ResponseWriter, a *api.Response, rpc string) error {
if err := pktLine(w, fmt.Sprintf("# service=%s\n", rpc)); err != nil {
helper.LogError(r, fmt.Errorf("handleGetInfoRefs: pktLine: %v", err))
return
return fmt.Errorf("pktLine: %v", err)
}
if err := pktFlush(w); err != nil {
helper.LogError(r, fmt.Errorf("handleGetInfoRefs: pktFlush: %v", err))
return
return fmt.Errorf("pktFlush: %v", err)
}
if _, err := io.Copy(w, stdout); err != nil {
helper.LogError(
r,
&copyError{fmt.Errorf("handleGetInfoRefs: copy output of %v: %v", cmd.Args, err)},
)
return
cmd, err := startGitCommand(a, nil, w, rpc, "--advertise-refs")
if err != nil {
return fmt.Errorf("startGitCommand: %v", err)
}
defer helper.CleanUpProcessGroup(cmd) // Ensure brute force subprocess clean-up
if err := cmd.Wait(); err != nil {
helper.LogError(r, fmt.Errorf("handleGetInfoRefs: wait for %v: %v", cmd.Args, err))
return
return fmt.Errorf("wait for %v: %v", cmd.Args, err)
}
return nil
}
......@@ -2,53 +2,29 @@ package git
import (
"fmt"
"io"
"net/http"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
)
func handleReceivePack(w *GitHttpResponseWriter, r *http.Request, a *api.Response) (writtenIn int64, err error) {
// Will not return a non-nil error after the response body has been
// written to.
func handleReceivePack(w *GitHttpResponseWriter, r *http.Request, a *api.Response) error {
action := getService(r)
cmd, stdin, stdout, err := setupGitCommand(action, a)
if err != nil {
fail500(w)
return writtenIn, fmt.Errorf("setupGitCommand: %v", err)
}
defer stdout.Close()
defer stdin.Close()
defer helper.CleanUpProcessGroup(cmd) // Ensure brute force subprocess clean-up
writtenIn, err = io.Copy(stdin, r.Body)
if err != nil {
fail500(w)
return writtenIn, fmt.Errorf("write to %v: %v", cmd.Args, err)
}
// Signal to the Git subprocess that no more data is coming
stdin.Close()
// It may take a while before we return and the deferred closes happen
// so let's free up some resources already.
r.Body.Close()
writePostRPCHeader(w, action)
// This io.Copy may take a long time, both for Git push and pull.
_, err = io.Copy(w, stdout)
cmd, err := startGitCommand(a, r.Body, w, action)
if err != nil {
return writtenIn, &copyError{fmt.Errorf("copy output of %v: %v", cmd.Args, err)}
return fmt.Errorf("startGitCommand: %v", err)
}
defer helper.CleanUpProcessGroup(cmd)
err = cmd.Wait()
if err != nil {
return writtenIn, fmt.Errorf("wait for %v: %v", cmd.Args, err)
if err := cmd.Wait(); err != nil {
helper.LogError(r, fmt.Errorf("wait for %v: %v", cmd.Args, err))
// Return nil because the response body has been written to already.
return nil
}
return writtenIn, nil
return nil
}
......@@ -9,68 +9,38 @@ import (
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
)
func handleUploadPack(w *GitHttpResponseWriter, r *http.Request, a *api.Response) (writtenIn int64, err error) {
// Will not return a non-nil error after the response body has been
// written to.
func handleUploadPack(w *GitHttpResponseWriter, r *http.Request, a *api.Response) error {
// The body will consist almost entirely of 'have XXX' and 'want XXX'
// lines; these are about 50 bytes long. With a limit of 10MB the client
// can send over 200,000 have/want lines.
buffer, err := helper.ReadAllTempfile(io.LimitReader(r.Body, 10*1024*1024))
if err != nil {
fail500(w)
return writtenIn, fmt.Errorf("ReadAllTempfile: %v", err)
return fmt.Errorf("ReadAllTempfile: %v", err)
}
defer buffer.Close()
r.Body.Close()
isShallowClone := scanDeepen(buffer)
if _, err := buffer.Seek(0, 0); err != nil {
fail500(w)
return writtenIn, fmt.Errorf("seek tempfile: %v", err)
return fmt.Errorf("seek tempfile: %v", err)
}
action := getService(r)
cmd, stdin, stdout, err := setupGitCommand(action, a)
writePostRPCHeader(w, action)
cmd, err := startGitCommand(a, buffer, w, action)
if err != nil {
fail500(w)
return writtenIn, fmt.Errorf("setupGitCommand: %v", err)
return fmt.Errorf("startGitCommand: %v", err)
}
defer helper.CleanUpProcessGroup(cmd)
defer stdout.Close()
defer stdin.Close()
defer helper.CleanUpProcessGroup(cmd) // Ensure brute force subprocess clean-up
stdoutError := make(chan error, 1)
go func() {
writePostRPCHeader(w, action)
// Start reading from stdout already to avoid blocking while writing to
// stdin below.
_, err := io.Copy(w, stdout)
// This error may be lost if some other error prevents us from <-ing on this channel.
stdoutError <- err
}()
// Write the client request body to Git's standard input
if writtenIn, err = io.Copy(stdin, buffer); err != nil {
fail500(w)
return writtenIn, fmt.Errorf("write to %v: %v", cmd.Args, err)
}
// Signal to the Git subprocess that no more data is coming
stdin.Close()
if err := <-stdoutError; err != nil {
return writtenIn, &copyError{fmt.Errorf("copy output of %v: %v", cmd.Args, err)}
if err := cmd.Wait(); err != nil && !(isExitError(err) && isShallowClone) {
helper.LogError(r, fmt.Errorf("wait for %v: %v", cmd.Args, err))
// Return nil because the response body has been written to already.
return nil
}
err = cmd.Wait()
if err != nil && !(isExitError(err) && isShallowClone) {
return writtenIn, fmt.Errorf("wait for %v: %v", cmd.Args, err)
}
return writtenIn, nil
}
func fail500(w http.ResponseWriter) {
helper.Fail500(w, nil, nil)
return nil
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment