697 lines
22 KiB
Go
697 lines
22 KiB
Go
|
package main
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"encoding/base64"
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
"io/ioutil"
|
||
|
"math/rand"
|
||
|
"net"
|
||
|
"net/http"
|
||
|
"os"
|
||
|
"os/exec"
|
||
|
"path"
|
||
|
"strings"
|
||
|
"sync"
|
||
|
"testing"
|
||
|
"time"
|
||
|
|
||
|
"github.com/golang/protobuf/jsonpb" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab-workhorse/-/issues/274
|
||
|
"github.com/golang/protobuf/proto" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab-workhorse/-/issues/274
|
||
|
"github.com/stretchr/testify/require"
|
||
|
"google.golang.org/grpc"
|
||
|
"google.golang.org/grpc/codes"
|
||
|
|
||
|
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
|
||
|
|
||
|
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
|
||
|
"gitlab.com/gitlab-org/gitlab-workhorse/internal/git"
|
||
|
"gitlab.com/gitlab-org/gitlab-workhorse/internal/gitaly"
|
||
|
"gitlab.com/gitlab-org/gitlab-workhorse/internal/testhelper"
|
||
|
)
|
||
|
|
||
|
func TestFailedCloneNoGitaly(t *testing.T) {
|
||
|
// Prepare clone directory
|
||
|
require.NoError(t, os.RemoveAll(scratchDir))
|
||
|
|
||
|
authBody := &api.Response{
|
||
|
GL_ID: "user-123",
|
||
|
GL_USERNAME: "username",
|
||
|
// This will create a failure to connect to Gitaly
|
||
|
GitalyServer: gitaly.Server{Address: "unix:/nonexistent"},
|
||
|
}
|
||
|
|
||
|
// Prepare test server and backend
|
||
|
ts := testAuthServer(t, nil, nil, 200, authBody)
|
||
|
defer ts.Close()
|
||
|
ws := startWorkhorseServer(ts.URL)
|
||
|
defer ws.Close()
|
||
|
|
||
|
// Do the git clone
|
||
|
cloneCmd := exec.Command("git", "clone", fmt.Sprintf("%s/%s", ws.URL, testRepo), checkoutDir)
|
||
|
out, err := cloneCmd.CombinedOutput()
|
||
|
t.Log(string(out))
|
||
|
require.Error(t, err, "git clone should have failed")
|
||
|
}
|
||
|
|
||
|
func TestGetInfoRefsProxiedToGitalySuccessfully(t *testing.T) {
|
||
|
gitalyServer, socketPath := startGitalyServer(t, codes.OK)
|
||
|
defer gitalyServer.GracefulStop()
|
||
|
|
||
|
gitalyAddress := "unix:" + socketPath
|
||
|
|
||
|
apiResponse := gitOkBody(t)
|
||
|
apiResponse.GitalyServer.Address = gitalyAddress
|
||
|
|
||
|
goodMetadata := map[string]string{
|
||
|
"gitaly-feature-foobar": "true",
|
||
|
"gitaly-feature-bazqux": "false",
|
||
|
}
|
||
|
badMetadata := map[string]string{
|
||
|
"bad-metadata": "is blocked",
|
||
|
}
|
||
|
|
||
|
features := make(map[string]string)
|
||
|
for k, v := range goodMetadata {
|
||
|
features[k] = v
|
||
|
}
|
||
|
for k, v := range badMetadata {
|
||
|
features[k] = v
|
||
|
}
|
||
|
apiResponse.GitalyServer.Features = features
|
||
|
|
||
|
testCases := []struct {
|
||
|
showAllRefs bool
|
||
|
gitRpc string
|
||
|
}{
|
||
|
{showAllRefs: false, gitRpc: "git-upload-pack"},
|
||
|
{showAllRefs: true, gitRpc: "git-upload-pack"},
|
||
|
{showAllRefs: false, gitRpc: "git-receive-pack"},
|
||
|
{showAllRefs: true, gitRpc: "git-receive-pack"},
|
||
|
}
|
||
|
|
||
|
for _, tc := range testCases {
|
||
|
t.Run(fmt.Sprintf("ShowAllRefs=%v,gitRpc=%v", tc.showAllRefs, tc.gitRpc), func(t *testing.T) {
|
||
|
apiResponse.ShowAllRefs = tc.showAllRefs
|
||
|
|
||
|
ts := testAuthServer(t, nil, nil, 200, apiResponse)
|
||
|
defer ts.Close()
|
||
|
|
||
|
ws := startWorkhorseServer(ts.URL)
|
||
|
defer ws.Close()
|
||
|
|
||
|
gitProtocol := "fake git protocol"
|
||
|
resource := "/gitlab-org/gitlab-test.git/info/refs?service=" + tc.gitRpc
|
||
|
resp, body := httpGet(t, ws.URL+resource, map[string]string{"Git-Protocol": gitProtocol})
|
||
|
|
||
|
require.Equal(t, 200, resp.StatusCode)
|
||
|
|
||
|
bodySplit := strings.SplitN(body, "\000", 3)
|
||
|
require.Len(t, bodySplit, 3)
|
||
|
|
||
|
gitalyRequest := &gitalypb.InfoRefsRequest{}
|
||
|
require.NoError(t, jsonpb.UnmarshalString(bodySplit[0], gitalyRequest))
|
||
|
|
||
|
require.Equal(t, gitProtocol, gitalyRequest.GitProtocol)
|
||
|
if tc.showAllRefs {
|
||
|
require.Equal(t, []string{git.GitConfigShowAllRefs}, gitalyRequest.GitConfigOptions)
|
||
|
} else {
|
||
|
require.Empty(t, gitalyRequest.GitConfigOptions)
|
||
|
}
|
||
|
|
||
|
require.Equal(t, tc.gitRpc, bodySplit[1])
|
||
|
|
||
|
require.Equal(t, string(testhelper.GitalyInfoRefsResponseMock), bodySplit[2], "GET %q: response body", resource)
|
||
|
|
||
|
md := gitalyServer.LastIncomingMetadata
|
||
|
for k, v := range goodMetadata {
|
||
|
actual := md[k]
|
||
|
require.Len(t, actual, 1, "number of metadata values for %v", k)
|
||
|
require.Equal(t, v, actual[0], "value for %v", k)
|
||
|
}
|
||
|
|
||
|
for k := range badMetadata {
|
||
|
actual := md[k]
|
||
|
require.Empty(t, actual, "metadata for bad key %v", k)
|
||
|
}
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestGetInfoRefsProxiedToGitalyInterruptedStream(t *testing.T) {
|
||
|
apiResponse := gitOkBody(t)
|
||
|
gitalyServer, socketPath := startGitalyServer(t, codes.OK)
|
||
|
defer gitalyServer.GracefulStop()
|
||
|
|
||
|
gitalyAddress := "unix:" + socketPath
|
||
|
apiResponse.GitalyServer.Address = gitalyAddress
|
||
|
|
||
|
ts := testAuthServer(t, nil, nil, 200, apiResponse)
|
||
|
defer ts.Close()
|
||
|
|
||
|
ws := startWorkhorseServer(ts.URL)
|
||
|
defer ws.Close()
|
||
|
|
||
|
resource := "/gitlab-org/gitlab-test.git/info/refs?service=git-upload-pack"
|
||
|
resp, err := http.Get(ws.URL + resource)
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
// This causes the server stream to be interrupted instead of consumed entirely.
|
||
|
resp.Body.Close()
|
||
|
|
||
|
done := make(chan struct{})
|
||
|
go func() {
|
||
|
gitalyServer.WaitGroup.Wait()
|
||
|
close(done)
|
||
|
}()
|
||
|
|
||
|
waitDone(t, done)
|
||
|
}
|
||
|
|
||
|
func waitDone(t *testing.T, done chan struct{}) {
|
||
|
t.Helper()
|
||
|
select {
|
||
|
case <-done:
|
||
|
return
|
||
|
case <-time.After(10 * time.Second):
|
||
|
t.Fatal("time out waiting for gitaly handler to return")
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestPostReceivePackProxiedToGitalySuccessfully(t *testing.T) {
|
||
|
apiResponse := gitOkBody(t)
|
||
|
|
||
|
gitalyServer, socketPath := startGitalyServer(t, codes.OK)
|
||
|
defer gitalyServer.GracefulStop()
|
||
|
|
||
|
apiResponse.GitalyServer.Address = "unix:" + socketPath
|
||
|
apiResponse.GitConfigOptions = []string{"git-config-hello=world"}
|
||
|
ts := testAuthServer(t, nil, nil, 200, apiResponse)
|
||
|
defer ts.Close()
|
||
|
|
||
|
ws := startWorkhorseServer(ts.URL)
|
||
|
defer ws.Close()
|
||
|
|
||
|
gitProtocol := "fake Git protocol"
|
||
|
resource := "/gitlab-org/gitlab-test.git/git-receive-pack"
|
||
|
resp := httpPost(
|
||
|
t,
|
||
|
ws.URL+resource,
|
||
|
map[string]string{
|
||
|
"Content-Type": "application/x-git-receive-pack-request",
|
||
|
"Git-Protocol": gitProtocol,
|
||
|
},
|
||
|
bytes.NewReader(testhelper.GitalyReceivePackResponseMock),
|
||
|
)
|
||
|
defer resp.Body.Close()
|
||
|
body := string(testhelper.ReadAll(t, resp.Body))
|
||
|
|
||
|
split := strings.SplitN(body, "\000", 2)
|
||
|
require.Len(t, split, 2)
|
||
|
|
||
|
gitalyRequest := &gitalypb.PostReceivePackRequest{}
|
||
|
require.NoError(t, jsonpb.UnmarshalString(split[0], gitalyRequest))
|
||
|
|
||
|
require.Equal(t, apiResponse.Repository.StorageName, gitalyRequest.Repository.StorageName)
|
||
|
require.Equal(t, apiResponse.Repository.RelativePath, gitalyRequest.Repository.RelativePath)
|
||
|
require.Equal(t, apiResponse.GL_ID, gitalyRequest.GlId)
|
||
|
require.Equal(t, apiResponse.GL_USERNAME, gitalyRequest.GlUsername)
|
||
|
require.Equal(t, apiResponse.GitConfigOptions, gitalyRequest.GitConfigOptions)
|
||
|
require.Equal(t, gitProtocol, gitalyRequest.GitProtocol)
|
||
|
|
||
|
require.Equal(t, 200, resp.StatusCode, "POST %q", resource)
|
||
|
require.Equal(t, string(testhelper.GitalyReceivePackResponseMock), split[1])
|
||
|
testhelper.RequireResponseHeader(t, resp, "Content-Type", "application/x-git-receive-pack-result")
|
||
|
}
|
||
|
|
||
|
func TestPostReceivePackProxiedToGitalyInterrupted(t *testing.T) {
|
||
|
apiResponse := gitOkBody(t)
|
||
|
|
||
|
gitalyServer, socketPath := startGitalyServer(t, codes.OK)
|
||
|
defer gitalyServer.GracefulStop()
|
||
|
|
||
|
apiResponse.GitalyServer.Address = "unix:" + socketPath
|
||
|
ts := testAuthServer(t, nil, nil, 200, apiResponse)
|
||
|
defer ts.Close()
|
||
|
|
||
|
ws := startWorkhorseServer(ts.URL)
|
||
|
defer ws.Close()
|
||
|
|
||
|
resource := "/gitlab-org/gitlab-test.git/git-receive-pack"
|
||
|
resp, err := http.Post(
|
||
|
ws.URL+resource,
|
||
|
"application/x-git-receive-pack-request",
|
||
|
bytes.NewReader(testhelper.GitalyReceivePackResponseMock),
|
||
|
)
|
||
|
require.NoError(t, err)
|
||
|
require.Equal(t, 200, resp.StatusCode, "POST %q", resource)
|
||
|
|
||
|
// This causes the server stream to be interrupted instead of consumed entirely.
|
||
|
resp.Body.Close()
|
||
|
|
||
|
done := make(chan struct{})
|
||
|
go func() {
|
||
|
gitalyServer.WaitGroup.Wait()
|
||
|
close(done)
|
||
|
}()
|
||
|
|
||
|
waitDone(t, done)
|
||
|
}
|
||
|
|
||
|
// ReaderFunc is an adapter to turn a conforming function into an io.Reader.
|
||
|
type ReaderFunc func(b []byte) (int, error)
|
||
|
|
||
|
func (r ReaderFunc) Read(b []byte) (int, error) { return r(b) }
|
||
|
|
||
|
func TestPostUploadPackProxiedToGitalySuccessfully(t *testing.T) {
|
||
|
for i, tc := range []struct {
|
||
|
showAllRefs bool
|
||
|
code codes.Code
|
||
|
}{
|
||
|
{true, codes.OK},
|
||
|
{true, codes.Unavailable},
|
||
|
{false, codes.OK},
|
||
|
{false, codes.Unavailable},
|
||
|
} {
|
||
|
t.Run(fmt.Sprintf("Case %d", i), func(t *testing.T) {
|
||
|
apiResponse := gitOkBody(t)
|
||
|
apiResponse.ShowAllRefs = tc.showAllRefs
|
||
|
|
||
|
gitalyServer, socketPath := startGitalyServer(t, tc.code)
|
||
|
defer gitalyServer.GracefulStop()
|
||
|
|
||
|
apiResponse.GitalyServer.Address = "unix:" + socketPath
|
||
|
ts := testAuthServer(t, nil, nil, 200, apiResponse)
|
||
|
defer ts.Close()
|
||
|
|
||
|
ws := startWorkhorseServer(ts.URL)
|
||
|
defer ws.Close()
|
||
|
|
||
|
gitProtocol := "fake git protocol"
|
||
|
resource := "/gitlab-org/gitlab-test.git/git-upload-pack"
|
||
|
|
||
|
requestReader := bytes.NewReader(testhelper.GitalyUploadPackResponseMock)
|
||
|
var m sync.Mutex
|
||
|
requestReadFinished := false
|
||
|
resp := httpPost(
|
||
|
t,
|
||
|
ws.URL+resource,
|
||
|
map[string]string{
|
||
|
"Content-Type": "application/x-git-upload-pack-request",
|
||
|
"Git-Protocol": gitProtocol,
|
||
|
},
|
||
|
ReaderFunc(func(b []byte) (int, error) {
|
||
|
n, err := requestReader.Read(b)
|
||
|
if err != nil {
|
||
|
m.Lock()
|
||
|
requestReadFinished = true
|
||
|
m.Unlock()
|
||
|
}
|
||
|
return n, err
|
||
|
}),
|
||
|
)
|
||
|
defer resp.Body.Close()
|
||
|
require.Equal(t, 200, resp.StatusCode, "POST %q", resource)
|
||
|
testhelper.RequireResponseHeader(t, resp, "Content-Type", "application/x-git-upload-pack-result")
|
||
|
|
||
|
m.Lock()
|
||
|
requestFinished := requestReadFinished
|
||
|
m.Unlock()
|
||
|
require.True(t, requestFinished, "response written before request was fully read")
|
||
|
|
||
|
body := string(testhelper.ReadAll(t, resp.Body))
|
||
|
bodySplit := strings.SplitN(body, "\000", 2)
|
||
|
require.Len(t, bodySplit, 2)
|
||
|
|
||
|
gitalyRequest := &gitalypb.PostUploadPackRequest{}
|
||
|
require.NoError(t, jsonpb.UnmarshalString(bodySplit[0], gitalyRequest))
|
||
|
|
||
|
require.Equal(t, apiResponse.Repository.StorageName, gitalyRequest.Repository.StorageName)
|
||
|
require.Equal(t, apiResponse.Repository.RelativePath, gitalyRequest.Repository.RelativePath)
|
||
|
require.Equal(t, gitProtocol, gitalyRequest.GitProtocol)
|
||
|
|
||
|
if tc.showAllRefs {
|
||
|
require.Equal(t, []string{git.GitConfigShowAllRefs}, gitalyRequest.GitConfigOptions)
|
||
|
} else {
|
||
|
require.Empty(t, gitalyRequest.GitConfigOptions)
|
||
|
}
|
||
|
|
||
|
require.Equal(t, string(testhelper.GitalyUploadPackResponseMock), bodySplit[1], "POST %q: response body", resource)
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestPostUploadPackProxiedToGitalyInterrupted(t *testing.T) {
|
||
|
apiResponse := gitOkBody(t)
|
||
|
|
||
|
gitalyServer, socketPath := startGitalyServer(t, codes.OK)
|
||
|
defer gitalyServer.GracefulStop()
|
||
|
|
||
|
apiResponse.GitalyServer.Address = "unix:" + socketPath
|
||
|
ts := testAuthServer(t, nil, nil, 200, apiResponse)
|
||
|
defer ts.Close()
|
||
|
|
||
|
ws := startWorkhorseServer(ts.URL)
|
||
|
defer ws.Close()
|
||
|
|
||
|
resource := "/gitlab-org/gitlab-test.git/git-upload-pack"
|
||
|
resp, err := http.Post(
|
||
|
ws.URL+resource,
|
||
|
"application/x-git-upload-pack-request",
|
||
|
bytes.NewReader(testhelper.GitalyUploadPackResponseMock),
|
||
|
)
|
||
|
require.NoError(t, err)
|
||
|
require.Equal(t, 200, resp.StatusCode, "POST %q", resource)
|
||
|
|
||
|
// This causes the server stream to be interrupted instead of consumed entirely.
|
||
|
resp.Body.Close()
|
||
|
|
||
|
done := make(chan struct{})
|
||
|
go func() {
|
||
|
gitalyServer.WaitGroup.Wait()
|
||
|
close(done)
|
||
|
}()
|
||
|
|
||
|
waitDone(t, done)
|
||
|
}
|
||
|
|
||
|
func TestGetDiffProxiedToGitalySuccessfully(t *testing.T) {
|
||
|
gitalyServer, socketPath := startGitalyServer(t, codes.OK)
|
||
|
defer gitalyServer.GracefulStop()
|
||
|
|
||
|
gitalyAddress := "unix:" + socketPath
|
||
|
repoStorage := "default"
|
||
|
rightCommit := "e395f646b1499e8e0279445fc99a0596a65fab7e"
|
||
|
leftCommit := "8a0f2ee90d940bfb0ba1e14e8214b0649056e4ab"
|
||
|
repoRelativePath := "foo/bar.git"
|
||
|
jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"RawDiffRequest":"{\"repository\":{\"storageName\":\"%s\",\"relativePath\":\"%s\"},\"rightCommitId\":\"%s\",\"leftCommitId\":\"%s\"}"}`,
|
||
|
gitalyAddress, repoStorage, repoRelativePath, leftCommit, rightCommit)
|
||
|
expectedBody := testhelper.GitalyGetDiffResponseMock
|
||
|
|
||
|
resp, body, err := doSendDataRequest("/something", "git-diff", jsonParams)
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
require.Equal(t, 200, resp.StatusCode, "GET %q: status code", resp.Request.URL)
|
||
|
require.Equal(t, expectedBody, string(body), "GET %q: response body", resp.Request.URL)
|
||
|
}
|
||
|
|
||
|
func TestGetPatchProxiedToGitalySuccessfully(t *testing.T) {
|
||
|
gitalyServer, socketPath := startGitalyServer(t, codes.OK)
|
||
|
defer gitalyServer.GracefulStop()
|
||
|
|
||
|
gitalyAddress := "unix:" + socketPath
|
||
|
repoStorage := "default"
|
||
|
rightCommit := "e395f646b1499e8e0279445fc99a0596a65fab7e"
|
||
|
leftCommit := "8a0f2ee90d940bfb0ba1e14e8214b0649056e4ab"
|
||
|
repoRelativePath := "foo/bar.git"
|
||
|
jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"RawPatchRequest":"{\"repository\":{\"storageName\":\"%s\",\"relativePath\":\"%s\"},\"rightCommitId\":\"%s\",\"leftCommitId\":\"%s\"}"}`,
|
||
|
gitalyAddress, repoStorage, repoRelativePath, leftCommit, rightCommit)
|
||
|
expectedBody := testhelper.GitalyGetPatchResponseMock
|
||
|
|
||
|
resp, body, err := doSendDataRequest("/something", "git-format-patch", jsonParams)
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
require.Equal(t, 200, resp.StatusCode, "GET %q: status code", resp.Request.URL)
|
||
|
require.Equal(t, expectedBody, string(body), "GET %q: response body", resp.Request.URL)
|
||
|
}
|
||
|
|
||
|
func TestGetBlobProxiedToGitalyInterruptedStream(t *testing.T) {
|
||
|
gitalyServer, socketPath := startGitalyServer(t, codes.OK)
|
||
|
defer gitalyServer.GracefulStop()
|
||
|
|
||
|
gitalyAddress := "unix:" + socketPath
|
||
|
repoStorage := "default"
|
||
|
oid := "54fcc214b94e78d7a41a9a8fe6d87a5e59500e51"
|
||
|
repoRelativePath := "foo/bar.git"
|
||
|
jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"GetBlobRequest":{"repository":{"storage_name":"%s","relative_path":"%s"},"oid":"%s","limit":-1}}`,
|
||
|
gitalyAddress, repoStorage, repoRelativePath, oid)
|
||
|
|
||
|
resp, _, err := doSendDataRequest("/something", "git-blob", jsonParams)
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
// This causes the server stream to be interrupted instead of consumed entirely.
|
||
|
resp.Body.Close()
|
||
|
|
||
|
done := make(chan struct{})
|
||
|
go func() {
|
||
|
gitalyServer.WaitGroup.Wait()
|
||
|
close(done)
|
||
|
}()
|
||
|
|
||
|
waitDone(t, done)
|
||
|
}
|
||
|
|
||
|
func TestGetArchiveProxiedToGitalySuccessfully(t *testing.T) {
|
||
|
gitalyServer, socketPath := startGitalyServer(t, codes.OK)
|
||
|
defer gitalyServer.GracefulStop()
|
||
|
|
||
|
gitalyAddress := "unix:" + socketPath
|
||
|
repoStorage := "default"
|
||
|
oid := "54fcc214b94e78d7a41a9a8fe6d87a5e59500e51"
|
||
|
repoRelativePath := "foo/bar.git"
|
||
|
archivePrefix := "repo-1"
|
||
|
expectedBody := testhelper.GitalyGetArchiveResponseMock
|
||
|
archiveLength := len(expectedBody)
|
||
|
|
||
|
testCases := []struct {
|
||
|
archivePath string
|
||
|
cacheDisabled bool
|
||
|
}{
|
||
|
{archivePath: path.Join(scratchDir, "my/path"), cacheDisabled: false},
|
||
|
{archivePath: "/var/empty/my/path", cacheDisabled: true},
|
||
|
}
|
||
|
|
||
|
for _, tc := range testCases {
|
||
|
jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"GitalyRepository":{"storage_name":"%s","relative_path":"%s"},"ArchivePath":"%s","ArchivePrefix":"%s","CommitId":"%s","DisableCache":%v}`,
|
||
|
gitalyAddress, repoStorage, repoRelativePath, tc.archivePath, archivePrefix, oid, tc.cacheDisabled)
|
||
|
resp, body, err := doSendDataRequest("/archive.tar.gz", "git-archive", jsonParams)
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
require.Equal(t, 200, resp.StatusCode, "GET %q: status code", resp.Request.URL)
|
||
|
require.Equal(t, expectedBody, string(body), "GET %q: response body", resp.Request.URL)
|
||
|
require.Equal(t, archiveLength, len(body), "GET %q: body size", resp.Request.URL)
|
||
|
|
||
|
if tc.cacheDisabled {
|
||
|
_, err := os.Stat(tc.archivePath)
|
||
|
require.True(t, os.IsNotExist(err), "expected 'does not exist', got: %v", err)
|
||
|
} else {
|
||
|
cachedArchive, err := ioutil.ReadFile(tc.archivePath)
|
||
|
require.NoError(t, err)
|
||
|
require.Equal(t, expectedBody, string(cachedArchive))
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestGetArchiveProxiedToGitalyInterruptedStream(t *testing.T) {
|
||
|
gitalyServer, socketPath := startGitalyServer(t, codes.OK)
|
||
|
defer gitalyServer.GracefulStop()
|
||
|
|
||
|
gitalyAddress := "unix:" + socketPath
|
||
|
repoStorage := "default"
|
||
|
oid := "54fcc214b94e78d7a41a9a8fe6d87a5e59500e51"
|
||
|
repoRelativePath := "foo/bar.git"
|
||
|
archivePath := "my/path"
|
||
|
archivePrefix := "repo-1"
|
||
|
jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"GitalyRepository":{"storage_name":"%s","relative_path":"%s"},"ArchivePath":"%s","ArchivePrefix":"%s","CommitId":"%s"}`,
|
||
|
gitalyAddress, repoStorage, repoRelativePath, path.Join(scratchDir, archivePath), archivePrefix, oid)
|
||
|
|
||
|
resp, _, err := doSendDataRequest("/archive.tar.gz", "git-archive", jsonParams)
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
// This causes the server stream to be interrupted instead of consumed entirely.
|
||
|
resp.Body.Close()
|
||
|
|
||
|
done := make(chan struct{})
|
||
|
go func() {
|
||
|
gitalyServer.WaitGroup.Wait()
|
||
|
close(done)
|
||
|
}()
|
||
|
|
||
|
waitDone(t, done)
|
||
|
}
|
||
|
|
||
|
func TestGetDiffProxiedToGitalyInterruptedStream(t *testing.T) {
|
||
|
gitalyServer, socketPath := startGitalyServer(t, codes.OK)
|
||
|
defer gitalyServer.GracefulStop()
|
||
|
|
||
|
gitalyAddress := "unix:" + socketPath
|
||
|
repoStorage := "default"
|
||
|
rightCommit := "e395f646b1499e8e0279445fc99a0596a65fab7e"
|
||
|
leftCommit := "8a0f2ee90d940bfb0ba1e14e8214b0649056e4ab"
|
||
|
repoRelativePath := "foo/bar.git"
|
||
|
jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"RawDiffRequest":"{\"repository\":{\"storageName\":\"%s\",\"relativePath\":\"%s\"},\"rightCommitId\":\"%s\",\"leftCommitId\":\"%s\"}"}`,
|
||
|
gitalyAddress, repoStorage, repoRelativePath, leftCommit, rightCommit)
|
||
|
|
||
|
resp, _, err := doSendDataRequest("/something", "git-diff", jsonParams)
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
// This causes the server stream to be interrupted instead of consumed entirely.
|
||
|
resp.Body.Close()
|
||
|
|
||
|
done := make(chan struct{})
|
||
|
go func() {
|
||
|
gitalyServer.WaitGroup.Wait()
|
||
|
close(done)
|
||
|
}()
|
||
|
|
||
|
waitDone(t, done)
|
||
|
}
|
||
|
|
||
|
func TestGetPatchProxiedToGitalyInterruptedStream(t *testing.T) {
|
||
|
gitalyServer, socketPath := startGitalyServer(t, codes.OK)
|
||
|
defer gitalyServer.GracefulStop()
|
||
|
|
||
|
gitalyAddress := "unix:" + socketPath
|
||
|
repoStorage := "default"
|
||
|
rightCommit := "e395f646b1499e8e0279445fc99a0596a65fab7e"
|
||
|
leftCommit := "8a0f2ee90d940bfb0ba1e14e8214b0649056e4ab"
|
||
|
repoRelativePath := "foo/bar.git"
|
||
|
jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"RawPatchRequest":"{\"repository\":{\"storageName\":\"%s\",\"relativePath\":\"%s\"},\"rightCommitId\":\"%s\",\"leftCommitId\":\"%s\"}"}`,
|
||
|
gitalyAddress, repoStorage, repoRelativePath, leftCommit, rightCommit)
|
||
|
|
||
|
resp, _, err := doSendDataRequest("/something", "git-format-patch", jsonParams)
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
// This causes the server stream to be interrupted instead of consumed entirely.
|
||
|
resp.Body.Close()
|
||
|
|
||
|
done := make(chan struct{})
|
||
|
go func() {
|
||
|
gitalyServer.WaitGroup.Wait()
|
||
|
close(done)
|
||
|
}()
|
||
|
|
||
|
waitDone(t, done)
|
||
|
}
|
||
|
|
||
|
func TestGetSnapshotProxiedToGitalySuccessfully(t *testing.T) {
|
||
|
gitalyServer, socketPath := startGitalyServer(t, codes.OK)
|
||
|
defer gitalyServer.GracefulStop()
|
||
|
|
||
|
gitalyAddress := "unix:" + socketPath
|
||
|
expectedBody := testhelper.GitalyGetSnapshotResponseMock
|
||
|
archiveLength := len(expectedBody)
|
||
|
|
||
|
params := buildGetSnapshotParams(gitalyAddress, buildPbRepo("default", "foo/bar.git"))
|
||
|
resp, body, err := doSendDataRequest("/api/v4/projects/:id/snapshot", "git-snapshot", params)
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
require.Equal(t, http.StatusOK, resp.StatusCode, "GET %q: status code", resp.Request.URL)
|
||
|
require.Equal(t, expectedBody, string(body), "GET %q: body", resp.Request.URL)
|
||
|
require.Equal(t, archiveLength, len(body), "GET %q: body size", resp.Request.URL)
|
||
|
|
||
|
testhelper.RequireResponseHeader(t, resp, "Content-Disposition", `attachment; filename="snapshot.tar"`)
|
||
|
testhelper.RequireResponseHeader(t, resp, "Content-Type", "application/x-tar")
|
||
|
testhelper.RequireResponseHeader(t, resp, "Content-Transfer-Encoding", "binary")
|
||
|
testhelper.RequireResponseHeader(t, resp, "Cache-Control", "private")
|
||
|
}
|
||
|
|
||
|
func TestGetSnapshotProxiedToGitalyInterruptedStream(t *testing.T) {
|
||
|
gitalyServer, socketPath := startGitalyServer(t, codes.OK)
|
||
|
defer gitalyServer.GracefulStop()
|
||
|
|
||
|
gitalyAddress := "unix:" + socketPath
|
||
|
|
||
|
params := buildGetSnapshotParams(gitalyAddress, buildPbRepo("default", "foo/bar.git"))
|
||
|
resp, _, err := doSendDataRequest("/api/v4/projects/:id/snapshot", "git-snapshot", params)
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
// This causes the server stream to be interrupted instead of consumed entirely.
|
||
|
resp.Body.Close()
|
||
|
|
||
|
done := make(chan struct{})
|
||
|
go func() {
|
||
|
gitalyServer.WaitGroup.Wait()
|
||
|
close(done)
|
||
|
}()
|
||
|
|
||
|
waitDone(t, done)
|
||
|
}
|
||
|
|
||
|
func buildGetSnapshotParams(gitalyAddress string, repo *gitalypb.Repository) string {
|
||
|
msg := serializedMessage("GetSnapshotRequest", &gitalypb.GetSnapshotRequest{Repository: repo})
|
||
|
return buildGitalyRPCParams(gitalyAddress, msg)
|
||
|
}
|
||
|
|
||
|
type rpcArg struct {
|
||
|
k string
|
||
|
v interface{}
|
||
|
}
|
||
|
|
||
|
// Gitlab asks workhorse to perform some long-running RPCs for it by sending
|
||
|
// the RPC arguments (which are protobuf messages) in HTTP response headers.
|
||
|
// The messages are encoded to JSON objects using pbjson, The strings are then
|
||
|
// re-encoded to JSON strings using json. We must replicate this behaviour here
|
||
|
func buildGitalyRPCParams(gitalyAddress string, rpcArgs ...rpcArg) string {
|
||
|
built := map[string]interface{}{
|
||
|
"GitalyServer": map[string]string{
|
||
|
"Address": gitalyAddress,
|
||
|
"Token": "",
|
||
|
},
|
||
|
}
|
||
|
|
||
|
for _, arg := range rpcArgs {
|
||
|
built[arg.k] = arg.v
|
||
|
}
|
||
|
|
||
|
b, err := json.Marshal(interface{}(built))
|
||
|
if err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
|
||
|
return string(b)
|
||
|
}
|
||
|
|
||
|
func buildPbRepo(storageName, relativePath string) *gitalypb.Repository {
|
||
|
return &gitalypb.Repository{
|
||
|
StorageName: storageName,
|
||
|
RelativePath: relativePath,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func serializedMessage(name string, arg proto.Message) rpcArg {
|
||
|
m := &jsonpb.Marshaler{}
|
||
|
str, err := m.MarshalToString(arg)
|
||
|
if err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
|
||
|
return rpcArg{name, str}
|
||
|
}
|
||
|
|
||
|
func serializedProtoMessage(name string, arg proto.Message) rpcArg {
|
||
|
msg, err := proto.Marshal(arg)
|
||
|
|
||
|
if err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
|
||
|
return rpcArg{name, base64.URLEncoding.EncodeToString(msg)}
|
||
|
}
|
||
|
|
||
|
type combinedServer struct {
|
||
|
*grpc.Server
|
||
|
*testhelper.GitalyTestServer
|
||
|
}
|
||
|
|
||
|
func startGitalyServer(t *testing.T, finalMessageCode codes.Code) (*combinedServer, string) {
|
||
|
socketPath := path.Join(scratchDir, fmt.Sprintf("gitaly-%d.sock", rand.Int()))
|
||
|
if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) {
|
||
|
t.Fatal(err)
|
||
|
}
|
||
|
server := grpc.NewServer()
|
||
|
listener, err := net.Listen("unix", socketPath)
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
gitalyServer := testhelper.NewGitalyServer(finalMessageCode)
|
||
|
gitalypb.RegisterSmartHTTPServiceServer(server, gitalyServer)
|
||
|
gitalypb.RegisterBlobServiceServer(server, gitalyServer)
|
||
|
gitalypb.RegisterRepositoryServiceServer(server, gitalyServer)
|
||
|
gitalypb.RegisterDiffServiceServer(server, gitalyServer)
|
||
|
|
||
|
go server.Serve(listener)
|
||
|
|
||
|
return &combinedServer{Server: server, GitalyTestServer: gitalyServer}, socketPath
|
||
|
}
|