gitlab-org--gitlab-foss/workhorse/internal/objectstore/test/objectstore_stub.go

279 lines
6.6 KiB
Go

package test
import (
"crypto/md5"
"encoding/hex"
"encoding/xml"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"sync"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore"
)
type partsEtagMap map[int]string
// ObjectstoreStub is a testing implementation of ObjectStore.
// Instead of storing objects it will just save md5sum.
type ObjectstoreStub struct {
// bucket contains md5sum of uploaded objects
bucket map[string]string
// overwriteMD5 contains overwrites for md5sum that should be return instead of the regular hash
overwriteMD5 map[string]string
// multipart is a map of MultipartUploads
multipart map[string]partsEtagMap
// HTTP header sent along request
headers map[string]*http.Header
puts int
deletes int
m sync.Mutex
}
// StartObjectStore will start an ObjectStore stub
func StartObjectStore() (*ObjectstoreStub, *httptest.Server) {
return StartObjectStoreWithCustomMD5(make(map[string]string))
}
// StartObjectStoreWithCustomMD5 will start an ObjectStore stub: md5Hashes contains overwrites for md5sum that should be return on PutObject
func StartObjectStoreWithCustomMD5(md5Hashes map[string]string) (*ObjectstoreStub, *httptest.Server) {
os := &ObjectstoreStub{
bucket: make(map[string]string),
multipart: make(map[string]partsEtagMap),
overwriteMD5: make(map[string]string),
headers: make(map[string]*http.Header),
}
for k, v := range md5Hashes {
os.overwriteMD5[k] = v
}
return os, httptest.NewServer(os)
}
// PutsCnt counts PutObject invocations
func (o *ObjectstoreStub) PutsCnt() int {
o.m.Lock()
defer o.m.Unlock()
return o.puts
}
// DeletesCnt counts DeleteObject invocation of a valid object
func (o *ObjectstoreStub) DeletesCnt() int {
o.m.Lock()
defer o.m.Unlock()
return o.deletes
}
// GetObjectMD5 return the calculated MD5 of the object uploaded to path
// it will return an empty string if no object has been uploaded on such path
func (o *ObjectstoreStub) GetObjectMD5(path string) string {
o.m.Lock()
defer o.m.Unlock()
return o.bucket[path]
}
// GetHeader returns a given HTTP header of the object uploaded to the path
func (o *ObjectstoreStub) GetHeader(path, key string) string {
o.m.Lock()
defer o.m.Unlock()
if val, ok := o.headers[path]; ok {
return val.Get(key)
}
return ""
}
// InitiateMultipartUpload prepare the ObjectstoreStob to receive a MultipartUpload on path
// It will return an error if a MultipartUpload is already in progress on that path
// InitiateMultipartUpload is only used during test setup.
// Workhorse's production code does not know how to initiate a multipart upload.
//
// Real S3 multipart uploads are more complicated than what we do here,
// but this is enough to verify that workhorse's production code behaves as intended.
func (o *ObjectstoreStub) InitiateMultipartUpload(path string) error {
o.m.Lock()
defer o.m.Unlock()
if o.multipart[path] != nil {
return fmt.Errorf("MultipartUpload for %q already in progress", path)
}
o.multipart[path] = make(partsEtagMap)
return nil
}
// IsMultipartUpload check if the given path has a MultipartUpload in progress
func (o *ObjectstoreStub) IsMultipartUpload(path string) bool {
o.m.Lock()
defer o.m.Unlock()
return o.isMultipartUpload(path)
}
// isMultipartUpload is the lock free version of IsMultipartUpload
func (o *ObjectstoreStub) isMultipartUpload(path string) bool {
return o.multipart[path] != nil
}
func (o *ObjectstoreStub) removeObject(w http.ResponseWriter, r *http.Request) {
o.m.Lock()
defer o.m.Unlock()
objectPath := r.URL.Path
if o.isMultipartUpload(objectPath) {
o.deletes++
delete(o.multipart, objectPath)
w.WriteHeader(200)
} else if _, ok := o.bucket[objectPath]; ok {
o.deletes++
delete(o.bucket, objectPath)
w.WriteHeader(200)
} else {
w.WriteHeader(404)
}
}
func (o *ObjectstoreStub) putObject(w http.ResponseWriter, r *http.Request) {
o.m.Lock()
defer o.m.Unlock()
objectPath := r.URL.Path
etag, overwritten := o.overwriteMD5[objectPath]
if !overwritten {
hasher := md5.New()
io.Copy(hasher, r.Body)
checksum := hasher.Sum(nil)
etag = hex.EncodeToString(checksum)
}
o.headers[objectPath] = &r.Header
o.puts++
if o.isMultipartUpload(objectPath) {
pNumber := r.URL.Query().Get("partNumber")
idx, err := strconv.Atoi(pNumber)
if err != nil {
http.Error(w, fmt.Sprintf("malformed partNumber: %v", err), 400)
return
}
o.multipart[objectPath][idx] = etag
} else {
o.bucket[objectPath] = etag
}
w.Header().Set("ETag", etag)
w.WriteHeader(200)
}
func MultipartUploadInternalError() *objectstore.CompleteMultipartUploadError {
return &objectstore.CompleteMultipartUploadError{Code: "InternalError", Message: "malformed object path"}
}
func (o *ObjectstoreStub) completeMultipartUpload(w http.ResponseWriter, r *http.Request) {
o.m.Lock()
defer o.m.Unlock()
objectPath := r.URL.Path
multipart := o.multipart[objectPath]
if multipart == nil {
http.Error(w, "Unknown MultipartUpload", 404)
return
}
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
var msg objectstore.CompleteMultipartUpload
err = xml.Unmarshal(buf, &msg)
if err != nil {
http.Error(w, err.Error(), 400)
return
}
for _, part := range msg.Part {
etag := multipart[part.PartNumber]
if etag != part.ETag {
msg := fmt.Sprintf("ETag mismatch on part %d. Expected %q got %q", part.PartNumber, etag, part.ETag)
http.Error(w, msg, 400)
return
}
}
etag, overwritten := o.overwriteMD5[objectPath]
if !overwritten {
etag = "CompleteMultipartUploadETag"
}
o.bucket[objectPath] = etag
delete(o.multipart, objectPath)
w.Header().Set("ETag", etag)
split := strings.SplitN(objectPath[1:], "/", 2)
if len(split) < 2 {
encodeXMLAnswer(w, MultipartUploadInternalError())
return
}
bucket := split[0]
key := split[1]
answer := objectstore.CompleteMultipartUploadResult{
Location: r.URL.String(),
Bucket: bucket,
Key: key,
ETag: etag,
}
encodeXMLAnswer(w, answer)
}
func encodeXMLAnswer(w http.ResponseWriter, answer interface{}) {
w.Header().Set("Content-Type", "text/xml")
enc := xml.NewEncoder(w)
if err := enc.Encode(answer); err != nil {
http.Error(w, err.Error(), 500)
}
}
func (o *ObjectstoreStub) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Body != nil {
defer r.Body.Close()
}
fmt.Println("ObjectStore Stub:", r.Method, r.URL.String())
if r.URL.Path == "" {
http.Error(w, "No path provided", 404)
return
}
switch r.Method {
case "DELETE":
o.removeObject(w, r)
case "PUT":
o.putObject(w, r)
case "POST":
o.completeMultipartUpload(w, r)
default:
w.WriteHeader(404)
}
}