Commit 619d33bc authored by Ahmad Sherif's avatar Ahmad Sherif

Forward {upload,receive}-pack requests to Gitaly

Closes gitlab-org/gitaly#125
parent c290616d
...@@ -2,9 +2,11 @@ package git ...@@ -2,9 +2,11 @@ package git
import ( import (
"fmt" "fmt"
"io"
"net/http" "net/http"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api" "gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/gitaly"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
) )
...@@ -16,7 +18,19 @@ func handleReceivePack(w *GitHttpResponseWriter, r *http.Request, a *api.Respons ...@@ -16,7 +18,19 @@ func handleReceivePack(w *GitHttpResponseWriter, r *http.Request, a *api.Respons
cr, cw := helper.NewWriteAfterReader(r.Body, w) cr, cw := helper.NewWriteAfterReader(r.Body, w)
defer cw.Flush() defer cw.Flush()
cmd, err := startGitCommand(a, cr, cw, action)
var err error
if a.GitalySocketPath == "" {
err = handleReceivePackLocally(a, r, cr, cw, action)
} else {
err = handleReceivePackWithGitaly(a, cr, cw)
}
return err
}
func handleReceivePackLocally(a *api.Response, r *http.Request, stdin io.Reader, stdout io.Writer, action string) error {
cmd, err := startGitCommand(a, stdin, stdout, action)
if err != nil { if err != nil {
return fmt.Errorf("startGitCommand: %v", err) return fmt.Errorf("startGitCommand: %v", err)
} }
...@@ -30,3 +44,16 @@ func handleReceivePack(w *GitHttpResponseWriter, r *http.Request, a *api.Respons ...@@ -30,3 +44,16 @@ func handleReceivePack(w *GitHttpResponseWriter, r *http.Request, a *api.Respons
return nil return nil
} }
func handleReceivePackWithGitaly(a *api.Response, clientRequest io.Reader, clientResponse io.Writer) error {
smarthttp, err := gitaly.NewSmartHTTPClient(a.GitalySocketPath)
if err != nil {
return fmt.Errorf("smarthttp.ReceivePack: %v", err)
}
if err := smarthttp.ReceivePack(a, clientRequest, clientResponse); err != nil {
return fmt.Errorf("smarthttp.ReceivePack: %v", err)
}
return nil
}
...@@ -4,8 +4,10 @@ import ( ...@@ -4,8 +4,10 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"os"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api" "gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/gitaly"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
) )
...@@ -22,15 +24,25 @@ func handleUploadPack(w *GitHttpResponseWriter, r *http.Request, a *api.Response ...@@ -22,15 +24,25 @@ func handleUploadPack(w *GitHttpResponseWriter, r *http.Request, a *api.Response
defer buffer.Close() defer buffer.Close()
r.Body.Close() r.Body.Close()
isShallowClone := scanDeepen(buffer)
if _, err := buffer.Seek(0, 0); err != nil {
return fmt.Errorf("seek tempfile: %v", err)
}
action := getService(r) action := getService(r)
writePostRPCHeader(w, action) writePostRPCHeader(w, action)
cmd, err := startGitCommand(a, buffer, w, action) if a.GitalySocketPath == "" {
err = handleUploadPackLocally(a, r, buffer, w, action)
} else {
err = handleUploadPackWithGitaly(a, buffer, w)
}
return err
}
func handleUploadPackLocally(a *api.Response, r *http.Request, stdin *os.File, stdout io.Writer, action string) error {
isShallowClone := scanDeepen(stdin)
if _, err := stdin.Seek(0, 0); err != nil {
return fmt.Errorf("seek tempfile: %v", err)
}
cmd, err := startGitCommand(a, stdin, stdout, action)
if err != nil { if err != nil {
return fmt.Errorf("startGitCommand: %v", err) return fmt.Errorf("startGitCommand: %v", err)
} }
...@@ -44,3 +56,16 @@ func handleUploadPack(w *GitHttpResponseWriter, r *http.Request, a *api.Response ...@@ -44,3 +56,16 @@ func handleUploadPack(w *GitHttpResponseWriter, r *http.Request, a *api.Response
return nil return nil
} }
func handleUploadPackWithGitaly(a *api.Response, clientRequest io.Reader, clientResponse io.Writer) error {
smarthttp, err := gitaly.NewSmartHTTPClient(a.GitalySocketPath)
if err != nil {
return fmt.Errorf("smarthttp.UploadPack: %v", err)
}
if err := smarthttp.UploadPack(a, clientRequest, clientResponse); err != nil {
return fmt.Errorf("smarthttp.UploadPack: %v", err)
}
return nil
}
...@@ -4,15 +4,29 @@ import ( ...@@ -4,15 +4,29 @@ import (
"fmt" "fmt"
"io" "io"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
pb "gitlab.com/gitlab-org/gitaly-proto/go" pb "gitlab.com/gitlab-org/gitaly-proto/go"
pbhelper "gitlab.com/gitlab-org/gitaly-proto/go/helper" pbhelper "gitlab.com/gitlab-org/gitaly-proto/go/helper"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc"
) )
type SmartHTTPClient struct { type SmartHTTPClient struct {
pb.SmartHTTPClient pb.SmartHTTPClient
} }
type uploadPackWriter struct {
pb.SmartHTTP_PostUploadPackClient
}
type receivePackWriter struct {
pb.SmartHTTP_PostReceivePackClient
}
const sendChunkSize = 16384
func (client *SmartHTTPClient) InfoRefsResponseWriterTo(repo *pb.Repository, rpc string) (io.WriterTo, error) { func (client *SmartHTTPClient) InfoRefsResponseWriterTo(repo *pb.Repository, rpc string) (io.WriterTo, error) {
rpcRequest := &pb.InfoRefsRequest{Repository: repo} rpcRequest := &pb.InfoRefsRequest{Repository: repo}
var c pbhelper.InfoRefsClient var c pbhelper.InfoRefsClient
...@@ -33,3 +47,112 @@ func (client *SmartHTTPClient) InfoRefsResponseWriterTo(repo *pb.Repository, rpc ...@@ -33,3 +47,112 @@ func (client *SmartHTTPClient) InfoRefsResponseWriterTo(repo *pb.Repository, rpc
return &pbhelper.InfoRefsClientWriterTo{c}, nil return &pbhelper.InfoRefsClientWriterTo{c}, nil
} }
func (client *SmartHTTPClient) ReceivePack(a *api.Response, clientRequest io.Reader, clientResponse io.Writer) error {
repo := &pb.Repository{Path: a.RepoPath}
stream, err := client.PostReceivePack(context.Background())
if err != nil {
return err
}
rpcRequest := &pb.PostReceivePackRequest{
Repository: repo,
GlId: a.GL_ID,
}
if err := stream.Send(rpcRequest); err != nil {
return fmt.Errorf("initial request: %v", err)
}
waitc := make(chan error, 1)
go receiveGitalyResponse(stream, waitc, clientResponse, func() ([]byte, error) {
response, err := stream.Recv()
return response.GetData(), err
})
_, sendErr := io.Copy(receivePackWriter{stream}, clientRequest)
stream.CloseSend()
if recvErr := <-waitc; recvErr != nil {
return recvErr
}
if sendErr != nil {
return fmt.Errorf("send: %v", sendErr)
}
return nil
}
func (client *SmartHTTPClient) UploadPack(a *api.Response, clientRequest io.Reader, clientResponse io.Writer) error {
repo := &pb.Repository{Path: a.RepoPath}
stream, err := client.PostUploadPack(context.Background())
if err != nil {
return err
}
rpcRequest := &pb.PostUploadPackRequest{
Repository: repo,
}
if err := stream.Send(rpcRequest); err != nil {
return fmt.Errorf("initial request: %v", err)
}
waitc := make(chan error, 1)
go receiveGitalyResponse(stream, waitc, clientResponse, func() ([]byte, error) {
response, err := stream.Recv()
return response.GetData(), err
})
_, sendErr := io.Copy(uploadPackWriter{stream}, clientRequest)
stream.CloseSend()
if recvErr := <-waitc; recvErr != nil {
return recvErr
}
if sendErr != nil {
return fmt.Errorf("send: %v", sendErr)
}
return nil
}
func receiveGitalyResponse(cs grpc.ClientStream, waitc chan error, clientResponse io.Writer, receiver func() ([]byte, error)) {
defer func() {
close(waitc)
cs.CloseSend()
}()
for {
data, err := receiver()
if err != nil {
if err != io.EOF {
waitc <- fmt.Errorf("receive: %v", err)
}
return
}
if _, err := clientResponse.Write(data); err != nil {
waitc <- fmt.Errorf("write: %v", err)
return
}
}
}
func (rw uploadPackWriter) Write(p []byte) (int, error) {
resp := &pb.PostUploadPackRequest{Data: p}
if err := rw.Send(resp); err != nil {
return 0, err
}
return len(p), nil
}
func (rw receivePackWriter) Write(p []byte) (int, error) {
resp := &pb.PostReceivePackRequest{Data: p}
if err := rw.Send(resp); err != nil {
return 0, err
}
return len(p), nil
}
package testhelper package testhelper
import ( import (
"io"
"io/ioutil"
"log"
"path"
pb "gitlab.com/gitlab-org/gitaly-proto/go" pb "gitlab.com/gitlab-org/gitaly-proto/go"
) )
...@@ -8,6 +13,19 @@ type GitalyTestServer struct{} ...@@ -8,6 +13,19 @@ type GitalyTestServer struct{}
const GitalyInfoRefsResponseMock = "Mock Gitaly InfoRefsResponse data" const GitalyInfoRefsResponseMock = "Mock Gitaly InfoRefsResponse data"
var GitalyReceivePackResponseMock []byte
var GitalyUploadPackResponseMock []byte
func init() {
var err error
if GitalyReceivePackResponseMock, err = ioutil.ReadFile(path.Join(RootDir(), "testdata/receive-pack-fixture.txt")); err != nil {
log.Fatal(err)
}
if GitalyUploadPackResponseMock, err = ioutil.ReadFile(path.Join(RootDir(), "testdata/upload-pack-fixture.txt")); err != nil {
log.Fatal(err)
}
}
func NewGitalyServer() *GitalyTestServer { func NewGitalyServer() *GitalyTestServer {
return &GitalyTestServer{} return &GitalyTestServer{}
} }
...@@ -26,6 +44,70 @@ func (s *GitalyTestServer) InfoRefsReceivePack(in *pb.InfoRefsRequest, stream pb ...@@ -26,6 +44,70 @@ func (s *GitalyTestServer) InfoRefsReceivePack(in *pb.InfoRefsRequest, stream pb
return stream.Send(response) return stream.Send(response)
} }
// TODO replace these empty implementations func (s *GitalyTestServer) PostReceivePack(stream pb.SmartHTTP_PostReceivePackServer) error {
func (*GitalyTestServer) PostUploadPack(pb.SmartHTTP_PostUploadPackServer) error { return nil } req, err := stream.Recv()
func (*GitalyTestServer) PostReceivePack(pb.SmartHTTP_PostReceivePackServer) error { return nil } if err != nil {
return err
}
response := &pb.PostReceivePackResponse{
Data: []byte(req.Repository.GetPath() + req.GlId),
}
if err := stream.Send(response); err != nil {
return err
}
// The body of the request starts in the second message
for {
req, err := stream.Recv()
if err != nil {
if err != io.EOF {
return err
}
break
}
response := &pb.PostReceivePackResponse{
Data: req.GetData(),
}
if err := stream.Send(response); err != nil {
return err
}
}
return nil
}
func (s *GitalyTestServer) PostUploadPack(stream pb.SmartHTTP_PostUploadPackServer) error {
req, err := stream.Recv()
if err != nil {
return err
}
response := &pb.PostUploadPackResponse{
Data: []byte(req.Repository.GetPath()),
}
if err := stream.Send(response); err != nil {
return err
}
// The body of the request starts in the second message
for {
req, err := stream.Recv()
if err != nil {
if err != io.EOF {
return err
}
break
}
response := &pb.PostUploadPackResponse{
Data: req.GetData(),
}
if err := stream.Send(response); err != nil {
return err
}
}
return nil
}
...@@ -28,6 +28,8 @@ import ( ...@@ -28,6 +28,8 @@ import (
"gitlab.com/gitlab-org/gitlab-workhorse/internal/upstream" "gitlab.com/gitlab-org/gitlab-workhorse/internal/upstream"
pb "gitlab.com/gitlab-org/gitaly-proto/go" pb "gitlab.com/gitlab-org/gitaly-proto/go"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
...@@ -645,6 +647,80 @@ func TestGetInfoRefsProxiedToGitalySuccessfully(t *testing.T) { ...@@ -645,6 +647,80 @@ func TestGetInfoRefsProxiedToGitalySuccessfully(t *testing.T) {
} }
} }
func TestPostReceivePackProxiedToGitalySuccessfully(t *testing.T) {
apiResponse := gitOkBody(t)
gitalyServer, socketPath := startGitalyServer(t)
defer gitalyServer.Stop()
apiResponse.GitalySocketPath = socketPath
ts := testAuthServer(nil, 200, apiResponse)
defer ts.Close()
ws := startWorkhorseServer(ts.URL)
defer ws.Close()
resource := "/gitlab-org/gitlab-test.git/git-receive-pack"
resp, err := http.Post(
ws.URL+resource,
"application/x-git-receive-pack-request",
bytes.NewReader(testhelper.GitalyReceivePackResponseMock),
)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
responseBody, err := ioutil.ReadAll(resp.Body)
assert.NoError(t, err)
testhelper.AssertResponseHeader(t, resp, "Content-Type", "application/x-git-receive-pack-result")
if resp.StatusCode != 200 {
t.Errorf("GET %q: expected 200, got %d", resource, resp.StatusCode)
}
if string(responseBody) != apiResponse.RepoPath+apiResponse.GL_ID+string(testhelper.GitalyReceivePackResponseMock) {
t.Errorf("GET %q: Unexpected response", resource)
}
}
func TestPostUploadPackProxiedToGitalySuccessfully(t *testing.T) {
apiResponse := gitOkBody(t)
gitalyServer, socketPath := startGitalyServer(t)
defer gitalyServer.Stop()
apiResponse.GitalySocketPath = socketPath
ts := testAuthServer(nil, 200, apiResponse)
defer ts.Close()
ws := startWorkhorseServer(ts.URL)
defer ws.Close()
resource := "/gitlab-org/gitlab-test.git/git-upload-pack"
resp, err := http.Post(
ws.URL+resource,
"application/x-git-upload-pack-request",
bytes.NewReader(testhelper.GitalyUploadPackResponseMock),
)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
responseBody, err := ioutil.ReadAll(resp.Body)
assert.NoError(t, err)
testhelper.AssertResponseHeader(t, resp, "Content-Type", "application/x-git-upload-pack-result")
if resp.StatusCode != 200 {
t.Errorf("GET %q: expected 200, got %d", resource, resp.StatusCode)
}
if string(responseBody) != apiResponse.RepoPath+string(testhelper.GitalyUploadPackResponseMock) {
t.Errorf("GET %q: Unexpected response", resource)
}
}
func TestGetInfoRefsHandledLocallyDueToEmptyGitalySocketPath(t *testing.T) { func TestGetInfoRefsHandledLocallyDueToEmptyGitalySocketPath(t *testing.T) {
gitalyServer, _ := startGitalyServer(t) gitalyServer, _ := startGitalyServer(t)
defer gitalyServer.Stop() defer gitalyServer.Stop()
...@@ -678,6 +754,68 @@ func TestGetInfoRefsHandledLocallyDueToEmptyGitalySocketPath(t *testing.T) { ...@@ -678,6 +754,68 @@ func TestGetInfoRefsHandledLocallyDueToEmptyGitalySocketPath(t *testing.T) {
} }
} }
func TestPostReceivePackHandledLocallyDueToEmptyGitalySocketPath(t *testing.T) {
gitalyServer, _ := startGitalyServer(t)
defer gitalyServer.Stop()
apiResponse := gitOkBody(t)
apiResponse.GitalySocketPath = ""
ts := testAuthServer(nil, 200, apiResponse)
defer ts.Close()
ws := startWorkhorseServer(ts.URL)
defer ws.Close()
resource := "/gitlab-org/gitlab-test.git/git-receive-pack"
payload := []byte("This payload should not reach Gitaly")
resp, err := http.Post(ws.URL+resource, "application/x-git-receive-pack-request", bytes.NewReader(payload))
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
responseBody, err := ioutil.ReadAll(resp.Body)
assert.NoError(t, err)
if resp.StatusCode != 200 {
t.Errorf("GET %q: expected 200, got %d", resource, resp.StatusCode)
}
if bytes.Contains(responseBody, payload) {
t.Errorf("GET %q: request should not have been proxied to Gitaly", resource)
}
}
func TestPostUploadPackHandledLocallyDueToEmptyGitalySocketPath(t *testing.T) {
gitalyServer, _ := startGitalyServer(t)
defer gitalyServer.Stop()
apiResponse := gitOkBody(t)
apiResponse.GitalySocketPath = ""
ts := testAuthServer(nil, 200, apiResponse)
defer ts.Close()
ws := startWorkhorseServer(ts.URL)
defer ws.Close()
resource := "/gitlab-org/gitlab-test.git/git-upload-pack"
payload := []byte("This payload should not reach Gitaly")
resp, err := http.Post(ws.URL+resource, "application/x-git-upload-pack-request", bytes.NewReader(payload))
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
responseBody, err := ioutil.ReadAll(resp.Body)
assert.NoError(t, err)
if resp.StatusCode != 200 {
t.Errorf("GET %q: expected 200, got %d", resource, resp.StatusCode)
}
if bytes.Contains(responseBody, payload) {
t.Errorf("GET %q: request should not have been proxied to Gitaly", resource)
}
}
func TestAPIFalsePositivesAreProxied(t *testing.T) { func TestAPIFalsePositivesAreProxied(t *testing.T) {
goodResponse := []byte(`<html></html>`) goodResponse := []byte(`<html></html>`)
ts := testhelper.TestServerWithHandler(regexp.MustCompile(`.`), func(w http.ResponseWriter, r *http.Request) { ts := testhelper.TestServerWithHandler(regexp.MustCompile(`.`), func(w http.ResponseWriter, r *http.Request) {
......
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment