gitlab-org--gitlab-foss/workhorse/internal/objectstore/uploader.go

116 lines
2.8 KiB
Go

package objectstore
import (
"context"
"crypto/md5"
"encoding/hex"
"fmt"
"hash"
"io"
"strings"
"time"
"gitlab.com/gitlab-org/labkit/log"
)
// uploader consumes an io.Reader and uploads it using a pluggable uploadStrategy.
type uploader struct {
strategy uploadStrategy
// In the case of S3 uploads, we have a multipart upload which
// instantiates uploads for the individual parts. We don't want to
// increment metrics for the individual parts, so that is why we have
// this boolean flag.
metrics bool
// With S3 we compare the MD5 of the data we sent with the ETag returned
// by the object storage server.
checkETag bool
}
func newUploader(strategy uploadStrategy) *uploader {
return &uploader{strategy: strategy, metrics: true}
}
func newETagCheckUploader(strategy uploadStrategy, metrics bool) *uploader {
return &uploader{strategy: strategy, metrics: metrics, checkETag: true}
}
func hexString(h hash.Hash) string { return hex.EncodeToString(h.Sum(nil)) }
// Consume reads the reader until it reaches EOF or an error. It spawns a
// goroutine that waits for outerCtx to be done, after which the remote
// file is deleted. The deadline applies to the upload performed inside
// Consume, not to outerCtx.
func (u *uploader) Consume(outerCtx context.Context, reader io.Reader, deadline time.Time) (_ int64, err error) {
if u.metrics {
objectStorageUploadsOpen.Inc()
defer func(started time.Time) {
objectStorageUploadsOpen.Dec()
objectStorageUploadTime.Observe(time.Since(started).Seconds())
if err != nil {
objectStorageUploadRequestsRequestFailed.Inc()
}
}(time.Now())
}
defer func() {
// We do this mainly to abort S3 multipart uploads: it is not enough to
// "delete" them.
if err != nil {
u.strategy.Abort()
}
}()
go func() {
// Once gitlab-rails is done handling the request, we are supposed to
// delete the upload from its temporary location.
<-outerCtx.Done()
u.strategy.Delete()
}()
uploadCtx, cancelFn := context.WithDeadline(outerCtx, deadline)
defer cancelFn()
var hasher hash.Hash
if u.checkETag {
hasher = md5.New()
reader = io.TeeReader(reader, hasher)
}
cr := &countReader{r: reader}
if err := u.strategy.Upload(uploadCtx, cr); err != nil {
return cr.n, err
}
if u.checkETag {
if err := compareMD5(hexString(hasher), u.strategy.ETag()); err != nil {
log.ContextLogger(uploadCtx).WithError(err).Error("error comparing MD5 checksum")
return cr.n, err
}
}
objectStorageUploadBytes.Add(float64(cr.n))
return cr.n, nil
}
func compareMD5(local, remote string) error {
if !strings.EqualFold(local, remote) {
return fmt.Errorf("ETag mismatch. expected %q got %q", local, remote)
}
return nil
}
type countReader struct {
r io.Reader
n int64
}
func (cr *countReader) Read(p []byte) (int, error) {
nRead, err := cr.r.Read(p)
cr.n += int64(nRead)
return nRead, err
}