feature/451-minio-multiparts-fixes #454
7 changed files with 184 additions and 16 deletions
api
handler
layer
middleware
cmd/s3-gw
|
@ -41,6 +41,7 @@ type (
|
||||||
RetryMaxAttempts() int
|
RetryMaxAttempts() int
|
||||||
RetryMaxBackoff() time.Duration
|
RetryMaxBackoff() time.Duration
|
||||||
RetryStrategy() RetryStrategy
|
RetryStrategy() RetryStrategy
|
||||||
|
Domains() []string
|
||||||
}
|
}
|
||||||
|
|
||||||
FrostFSID interface {
|
FrostFSID interface {
|
||||||
|
|
|
@ -72,6 +72,7 @@ type configMock struct {
|
||||||
defaultCopiesNumbers []uint32
|
defaultCopiesNumbers []uint32
|
||||||
bypassContentEncodingInChunks bool
|
bypassContentEncodingInChunks bool
|
||||||
md5Enabled bool
|
md5Enabled bool
|
||||||
|
domains []string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *configMock) DefaultPlacementPolicy(_ string) netmap.PlacementPolicy {
|
func (c *configMock) DefaultPlacementPolicy(_ string) netmap.PlacementPolicy {
|
||||||
|
@ -135,6 +136,10 @@ func (c *configMock) RetryStrategy() RetryStrategy {
|
||||||
return RetryStrategyConstant
|
return RetryStrategyConstant
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *configMock) Domains() []string {
|
||||||
|
return c.domains
|
||||||
|
}
|
||||||
|
|
||||||
func prepareHandlerContext(t *testing.T) *handlerContext {
|
func prepareHandlerContext(t *testing.T) *handlerContext {
|
||||||
return prepareHandlerContextBase(t, layer.DefaultCachesConfigs(zap.NewExample()))
|
return prepareHandlerContextBase(t, layer.DefaultCachesConfigs(zap.NewExample()))
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,9 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"path"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||||
|
@ -26,10 +28,11 @@ type (
|
||||||
}
|
}
|
||||||
|
|
||||||
CompleteMultipartUploadResponse struct {
|
CompleteMultipartUploadResponse struct {
|
||||||
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUploadResult" json:"-"`
|
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUploadResult" json:"-"`
|
||||||
Bucket string `xml:"Bucket"`
|
Bucket string `xml:"Bucket"`
|
||||||
Key string `xml:"Key"`
|
Key string `xml:"Key"`
|
||||||
ETag string `xml:"ETag"`
|
ETag string `xml:"ETag"`
|
||||||
|
Location string `xml:"Location"`
|
||||||
}
|
}
|
||||||
|
|
||||||
ListMultipartUploadsResponse struct {
|
ListMultipartUploadsResponse struct {
|
||||||
|
@ -54,11 +57,11 @@ type (
|
||||||
Initiator Initiator `xml:"Initiator"`
|
Initiator Initiator `xml:"Initiator"`
|
||||||
IsTruncated bool `xml:"IsTruncated"`
|
IsTruncated bool `xml:"IsTruncated"`
|
||||||
Key string `xml:"Key"`
|
Key string `xml:"Key"`
|
||||||
MaxParts int `xml:"MaxParts,omitempty"`
|
MaxParts int `xml:"MaxParts"`
|
||||||
NextPartNumberMarker int `xml:"NextPartNumberMarker,omitempty"`
|
NextPartNumberMarker int `xml:"NextPartNumberMarker"`
|
||||||
Owner Owner `xml:"Owner"`
|
Owner Owner `xml:"Owner"`
|
||||||
Parts []*layer.Part `xml:"Part"`
|
Parts []*layer.Part `xml:"Part"`
|
||||||
PartNumberMarker int `xml:"PartNumberMarker,omitempty"`
|
PartNumberMarker int `xml:"PartNumberMarker"`
|
||||||
StorageClass string `xml:"StorageClass"`
|
StorageClass string `xml:"StorageClass"`
|
||||||
UploadID string `xml:"UploadId"`
|
UploadID string `xml:"UploadId"`
|
||||||
}
|
}
|
||||||
|
@ -423,9 +426,10 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
|
||||||
}
|
}
|
||||||
|
|
||||||
response := CompleteMultipartUploadResponse{
|
response := CompleteMultipartUploadResponse{
|
||||||
Bucket: objInfo.Bucket,
|
Bucket: objInfo.Bucket,
|
||||||
Key: objInfo.Name,
|
Key: objInfo.Name,
|
||||||
ETag: data.Quote(objInfo.ETag(h.cfg.MD5Enabled())),
|
ETag: data.Quote(objInfo.ETag(h.cfg.MD5Enabled())),
|
||||||
|
Location: getObjectLocation(r, h.cfg.Domains(), reqInfo.BucketName, reqInfo.ObjectName),
|
||||||
}
|
}
|
||||||
|
|
||||||
if settings.VersioningEnabled() {
|
if settings.VersioningEnabled() {
|
||||||
|
@ -437,6 +441,35 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// returns "https" if the tls boolean is true, "http" otherwise.
|
||||||
|
func getURLScheme(r *http.Request) string {
|
||||||
|
if r.TLS != nil {
|
||||||
|
return "https"
|
||||||
|
}
|
||||||
|
return "http"
|
||||||
|
}
|
||||||
|
|
||||||
|
// getObjectLocation gets the fully qualified URL of an object.
|
||||||
|
func getObjectLocation(r *http.Request, domains []string, bucket, object string) string {
|
||||||
|
proto := middleware.GetSourceScheme(r)
|
||||||
|
if proto == "" {
|
||||||
|
proto = getURLScheme(r)
|
||||||
|
}
|
||||||
|
u := &url.URL{
|
||||||
|
Host: r.Host,
|
||||||
|
Path: path.Join("/", bucket, object),
|
||||||
|
Scheme: proto,
|
||||||
|
}
|
||||||
|
// If domain is set then we need to use bucket DNS style.
|
||||||
|
for _, domain := range domains {
|
||||||
|
if strings.HasPrefix(r.Host, bucket+"."+domain) {
|
||||||
|
u.Path = path.Join("/", object)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return u.String()
|
||||||
|
}
|
||||||
|
|
||||||
func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMultipartParams, bktInfo *data.BucketInfo) (*data.ObjectInfo, error) {
|
func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMultipartParams, bktInfo *data.BucketInfo) (*data.ObjectInfo, error) {
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
uploadData, extendedObjInfo, err := h.obj.CompleteMultipartUpload(ctx, c)
|
uploadData, extendedObjInfo, err := h.obj.CompleteMultipartUpload(ctx, c)
|
||||||
|
|
|
@ -292,13 +292,19 @@ func TestListParts(t *testing.T) {
|
||||||
require.Len(t, list.Parts, 2)
|
require.Len(t, list.Parts, 2)
|
||||||
require.Equal(t, etag1, list.Parts[0].ETag)
|
require.Equal(t, etag1, list.Parts[0].ETag)
|
||||||
require.Equal(t, etag2, list.Parts[1].ETag)
|
require.Equal(t, etag2, list.Parts[1].ETag)
|
||||||
|
require.Zero(t, list.PartNumberMarker)
|
||||||
|
require.Equal(t, 2, list.NextPartNumberMarker)
|
||||||
|
|
||||||
list = listParts(hc, bktName, objName, uploadInfo.UploadID, "1", http.StatusOK)
|
list = listParts(hc, bktName, objName, uploadInfo.UploadID, "1", http.StatusOK)
|
||||||
require.Len(t, list.Parts, 1)
|
require.Len(t, list.Parts, 1)
|
||||||
require.Equal(t, etag2, list.Parts[0].ETag)
|
require.Equal(t, etag2, list.Parts[0].ETag)
|
||||||
|
require.Equal(t, 1, list.PartNumberMarker)
|
||||||
|
require.Equal(t, 2, list.NextPartNumberMarker)
|
||||||
|
|
||||||
list = listParts(hc, bktName, objName, uploadInfo.UploadID, "2", http.StatusOK)
|
list = listParts(hc, bktName, objName, uploadInfo.UploadID, "2", http.StatusOK)
|
||||||
require.Len(t, list.Parts, 0)
|
require.Len(t, list.Parts, 0)
|
||||||
|
require.Equal(t, 2, list.PartNumberMarker)
|
||||||
|
require.Equal(t, 0, list.NextPartNumberMarker)
|
||||||
|
|
||||||
list = listParts(hc, bktName, objName, uploadInfo.UploadID, "7", http.StatusOK)
|
list = listParts(hc, bktName, objName, uploadInfo.UploadID, "7", http.StatusOK)
|
||||||
require.Len(t, list.Parts, 0)
|
require.Len(t, list.Parts, 0)
|
||||||
|
@ -435,6 +441,80 @@ func TestUploadPartCheckContentSHA256(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMultipartObjectLocation(t *testing.T) {
|
||||||
|
for _, tc := range []struct {
|
||||||
|
req *http.Request
|
||||||
|
bucket string
|
||||||
|
object string
|
||||||
|
domains []string
|
||||||
|
expected string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
req: &http.Request{
|
||||||
|
Host: "127.0.0.1:8084",
|
||||||
|
Header: map[string][]string{"X-Forwarded-Scheme": {"http"}},
|
||||||
|
},
|
||||||
|
bucket: "testbucket1",
|
||||||
|
object: "test/1.txt",
|
||||||
|
expected: "http://127.0.0.1:8084/testbucket1/test/1.txt",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
req: &http.Request{
|
||||||
|
Host: "localhost:8084",
|
||||||
|
Header: map[string][]string{"X-Forwarded-Scheme": {"https"}},
|
||||||
|
},
|
||||||
|
bucket: "testbucket1",
|
||||||
|
object: "test/1.txt",
|
||||||
|
expected: "https://localhost:8084/testbucket1/test/1.txt",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
req: &http.Request{
|
||||||
|
Host: "s3.mybucket.org",
|
||||||
|
Header: map[string][]string{"X-Forwarded-Scheme": {"http"}},
|
||||||
|
},
|
||||||
|
bucket: "mybucket",
|
||||||
|
object: "test/1.txt",
|
||||||
|
expected: "http://s3.mybucket.org/mybucket/test/1.txt",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
req: &http.Request{Host: "mys3.mybucket.org"},
|
||||||
|
bucket: "mybucket",
|
||||||
|
object: "test/1.txt",
|
||||||
|
expected: "http://mys3.mybucket.org/mybucket/test/1.txt",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
req: &http.Request{Host: "s3.bucket.org", TLS: &tls.ConnectionState{}},
|
||||||
|
bucket: "bucket",
|
||||||
|
object: "obj",
|
||||||
|
expected: "https://s3.bucket.org/bucket/obj",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
req: &http.Request{
|
||||||
|
Host: "mybucket.s3dev.frostfs.devenv",
|
||||||
|
},
|
||||||
|
domains: []string{"s3dev.frostfs.devenv"},
|
||||||
|
bucket: "mybucket",
|
||||||
|
object: "test/1.txt",
|
||||||
|
expected: "http://mybucket.s3dev.frostfs.devenv/test/1.txt",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
req: &http.Request{
|
||||||
|
Host: "mybucket.s3dev.frostfs.devenv",
|
||||||
|
Header: map[string][]string{"X-Forwarded-Scheme": {"https"}},
|
||||||
|
},
|
||||||
|
domains: []string{"s3dev.frostfs.devenv"},
|
||||||
|
bucket: "mybucket",
|
||||||
|
object: "test/1.txt",
|
||||||
|
expected: "https://mybucket.s3dev.frostfs.devenv/test/1.txt",
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run("", func(t *testing.T) {
|
||||||
|
location := getObjectLocation(tc.req, tc.domains, tc.bucket, tc.object)
|
||||||
|
require.Equal(t, tc.expected, location)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func uploadPartCopy(hc *handlerContext, bktName, objName, uploadID string, num int, srcObj string, start, end int) *UploadPartCopyResponse {
|
func uploadPartCopy(hc *handlerContext, bktName, objName, uploadID string, num int, srcObj string, start, end int) *UploadPartCopyResponse {
|
||||||
return uploadPartCopyBase(hc, bktName, objName, false, uploadID, num, srcObj, start, end)
|
return uploadPartCopyBase(hc, bktName, objName, false, uploadID, num, srcObj, start, end)
|
||||||
}
|
}
|
||||||
|
|
|
@ -603,10 +603,10 @@ func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn
|
||||||
|
|
||||||
if len(parts) > p.MaxParts {
|
if len(parts) > p.MaxParts {
|
||||||
res.IsTruncated = true
|
res.IsTruncated = true
|
||||||
res.NextPartNumberMarker = parts[p.MaxParts-1].PartNumber
|
|
||||||
parts = parts[:p.MaxParts]
|
parts = parts[:p.MaxParts]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
res.NextPartNumberMarker = parts[len(parts)-1].PartNumber
|
||||||
res.Parts = parts
|
res.Parts = parts
|
||||||
|
|
||||||
return &res, nil
|
return &res, nil
|
||||||
|
|
|
@ -69,8 +69,10 @@ var deploymentID = uuid.Must(uuid.NewRandom())
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// De-facto standard header keys.
|
// De-facto standard header keys.
|
||||||
xForwardedFor = http.CanonicalHeaderKey("X-Forwarded-For")
|
xForwardedFor = http.CanonicalHeaderKey("X-Forwarded-For")
|
||||||
xRealIP = http.CanonicalHeaderKey("X-Real-IP")
|
xRealIP = http.CanonicalHeaderKey("X-Real-IP")
|
||||||
|
xForwardedProto = http.CanonicalHeaderKey("X-Forwarded-Proto")
|
||||||
|
xForwardedScheme = http.CanonicalHeaderKey("X-Forwarded-Scheme")
|
||||||
|
|
||||||
// RFC7239 defines a new "Forwarded: " header designed to replace the
|
// RFC7239 defines a new "Forwarded: " header designed to replace the
|
||||||
// existing use of X-Forwarded-* headers.
|
// existing use of X-Forwarded-* headers.
|
||||||
|
@ -79,6 +81,9 @@ var (
|
||||||
// Allows for a sub-match of the first value after 'for=' to the next
|
// Allows for a sub-match of the first value after 'for=' to the next
|
||||||
// comma, semi-colon or space. The match is case-insensitive.
|
// comma, semi-colon or space. The match is case-insensitive.
|
||||||
forRegex = regexp.MustCompile(`(?i)(?:for=)([^(;|, )]+)(.*)`)
|
forRegex = regexp.MustCompile(`(?i)(?:for=)([^(;|, )]+)(.*)`)
|
||||||
|
// Allows for a sub-match for the first instance of scheme (http|https)
|
||||||
|
// prefixed by 'proto='. The match is case-insensitive.
|
||||||
|
protoRegex = regexp.MustCompile(`(?i)^(;|,| )+(?:proto=)(https|http)`)
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewReqInfo returns new ReqInfo based on parameters.
|
// NewReqInfo returns new ReqInfo based on parameters.
|
||||||
|
@ -291,3 +296,31 @@ func getSourceIP(r *http.Request) string {
|
||||||
}
|
}
|
||||||
return raddr
|
return raddr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetSourceScheme retrieves the scheme from the X-Forwarded-Proto and RFC7239
|
||||||
|
// Forwarded headers (in that order).
|
||||||
|
func GetSourceScheme(r *http.Request) string {
|
||||||
|
var scheme string
|
||||||
|
|
||||||
|
// Retrieve the scheme from X-Forwarded-Proto.
|
||||||
|
if proto := r.Header.Get(xForwardedProto); proto != "" {
|
||||||
|
scheme = strings.ToLower(proto)
|
||||||
|
} else if proto = r.Header.Get(xForwardedScheme); proto != "" {
|
||||||
|
scheme = strings.ToLower(proto)
|
||||||
|
} else if proto := r.Header.Get(forwarded); proto != "" {
|
||||||
|
// match should contain at least two elements if the protocol was
|
||||||
|
// specified in the Forwarded header. The first element will always be
|
||||||
|
// the 'for=', which we ignore, subsequently we proceed to look for
|
||||||
|
// 'proto=' which should precede right after `for=` if not
|
||||||
|
// we simply ignore the values and return empty. This is in line
|
||||||
|
// with the approach we took for returning first ip from multiple
|
||||||
|
// params.
|
||||||
|
if match := forRegex.FindStringSubmatch(proto); len(match) > 1 {
|
||||||
|
if match = protoRegex.FindStringSubmatch(match[2]); len(match) > 1 {
|
||||||
|
scheme = strings.ToLower(match[2])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return scheme
|
||||||
|
}
|
||||||
|
|
|
@ -107,6 +107,7 @@ type (
|
||||||
retryMaxAttempts int
|
retryMaxAttempts int
|
||||||
retryMaxBackoff time.Duration
|
retryMaxBackoff time.Duration
|
||||||
retryStrategy handler.RetryStrategy
|
retryStrategy handler.RetryStrategy
|
||||||
|
domains []string
|
||||||
}
|
}
|
||||||
|
|
||||||
maxClientsConfig struct {
|
maxClientsConfig struct {
|
||||||
|
@ -231,6 +232,7 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
|
||||||
s.setRetryMaxAttempts(fetchRetryMaxAttempts(v))
|
s.setRetryMaxAttempts(fetchRetryMaxAttempts(v))
|
||||||
s.setRetryMaxBackoff(fetchRetryMaxBackoff(v))
|
s.setRetryMaxBackoff(fetchRetryMaxBackoff(v))
|
||||||
s.setRetryStrategy(fetchRetryStrategy(v))
|
s.setRetryStrategy(fetchRetryStrategy(v))
|
||||||
|
s.setVHSSettings(v, log)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *appSettings) updateNamespacesSettings(v *viper.Viper, log *zap.Logger) {
|
func (s *appSettings) updateNamespacesSettings(v *viper.Viper, log *zap.Logger) {
|
||||||
|
@ -245,6 +247,15 @@ func (s *appSettings) updateNamespacesSettings(v *viper.Viper, log *zap.Logger)
|
||||||
s.namespaces = nsConfig.Namespaces
|
s.namespaces = nsConfig.Namespaces
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) setVHSSettings(v *viper.Viper, _ *zap.Logger) {
|
||||||
|
domains := v.GetStringSlice(cfgListenDomains)
|
||||||
|
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
s.domains = domains
|
||||||
|
}
|
||||||
|
|
||||||
func (s *appSettings) BypassContentEncodingInChunks() bool {
|
func (s *appSettings) BypassContentEncodingInChunks() bool {
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
defer s.mu.RUnlock()
|
defer s.mu.RUnlock()
|
||||||
|
@ -447,6 +458,12 @@ func (s *appSettings) RetryStrategy() handler.RetryStrategy {
|
||||||
return s.retryStrategy
|
return s.retryStrategy
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) Domains() []string {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
return s.domains
|
||||||
|
}
|
||||||
|
|
||||||
func (a *App) initAPI(ctx context.Context) {
|
func (a *App) initAPI(ctx context.Context) {
|
||||||
a.initLayer(ctx)
|
a.initLayer(ctx)
|
||||||
a.initHandler()
|
a.initHandler()
|
||||||
|
@ -684,8 +701,7 @@ func (a *App) setHealthStatus() {
|
||||||
// Serve runs HTTP server to handle S3 API requests.
|
// Serve runs HTTP server to handle S3 API requests.
|
||||||
func (a *App) Serve(ctx context.Context) {
|
func (a *App) Serve(ctx context.Context) {
|
||||||
// Attach S3 API:
|
// Attach S3 API:
|
||||||
domains := a.cfg.GetStringSlice(cfgListenDomains)
|
a.log.Info(logs.FetchDomainsPrepareToUseAPI, zap.Strings("domains", a.settings.Domains()))
|
||||||
a.log.Info(logs.FetchDomainsPrepareToUseAPI, zap.Strings("domains", domains))
|
|
||||||
|
|
||||||
cfg := api.Config{
|
cfg := api.Config{
|
||||||
Throttle: middleware.ThrottleOpts{
|
Throttle: middleware.ThrottleOpts{
|
||||||
|
@ -696,7 +712,7 @@ func (a *App) Serve(ctx context.Context) {
|
||||||
Center: a.ctr,
|
Center: a.ctr,
|
||||||
Log: a.log,
|
Log: a.log,
|
||||||
Metrics: a.metrics,
|
Metrics: a.metrics,
|
||||||
Domains: domains,
|
Domains: a.settings.Domains(),
|
||||||
|
|
||||||
MiddlewareSettings: a.settings,
|
MiddlewareSettings: a.settings,
|
||||||
PolicyChecker: a.policyStorage,
|
PolicyChecker: a.policyStorage,
|
||||||
|
|
Loading…
Reference in a new issue