[#451] Support Location in CompleteMultipart response

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2024-08-06 15:43:45 +03:00
parent 69e77aecc9
commit 3dc989d7fe
6 changed files with 174 additions and 12 deletions

View file

@ -41,6 +41,7 @@ type (
RetryMaxAttempts() int RetryMaxAttempts() int
RetryMaxBackoff() time.Duration RetryMaxBackoff() time.Duration
RetryStrategy() RetryStrategy RetryStrategy() RetryStrategy
Domains() []string
} }
FrostFSID interface { FrostFSID interface {

View file

@ -72,6 +72,7 @@ type configMock struct {
defaultCopiesNumbers []uint32 defaultCopiesNumbers []uint32
bypassContentEncodingInChunks bool bypassContentEncodingInChunks bool
md5Enabled bool md5Enabled bool
domains []string
} }
func (c *configMock) DefaultPlacementPolicy(_ string) netmap.PlacementPolicy { func (c *configMock) DefaultPlacementPolicy(_ string) netmap.PlacementPolicy {
@ -135,6 +136,10 @@ func (c *configMock) RetryStrategy() RetryStrategy {
return RetryStrategyConstant return RetryStrategyConstant
} }
func (c *configMock) Domains() []string {
return c.domains
}
func prepareHandlerContext(t *testing.T) *handlerContext { func prepareHandlerContext(t *testing.T) *handlerContext {
return prepareHandlerContextBase(t, layer.DefaultCachesConfigs(zap.NewExample())) return prepareHandlerContextBase(t, layer.DefaultCachesConfigs(zap.NewExample()))
} }

View file

@ -5,7 +5,9 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"net/url" "net/url"
"path"
"strconv" "strconv"
"strings"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
@ -26,10 +28,11 @@ type (
} }
CompleteMultipartUploadResponse struct { CompleteMultipartUploadResponse struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUploadResult" json:"-"` XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUploadResult" json:"-"`
Bucket string `xml:"Bucket"` Bucket string `xml:"Bucket"`
Key string `xml:"Key"` Key string `xml:"Key"`
ETag string `xml:"ETag"` ETag string `xml:"ETag"`
Location string `xml:"Location"`
} }
ListMultipartUploadsResponse struct { ListMultipartUploadsResponse struct {
@ -423,9 +426,10 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
} }
response := CompleteMultipartUploadResponse{ response := CompleteMultipartUploadResponse{
Bucket: objInfo.Bucket, Bucket: objInfo.Bucket,
Key: objInfo.Name, Key: objInfo.Name,
ETag: data.Quote(objInfo.ETag(h.cfg.MD5Enabled())), ETag: data.Quote(objInfo.ETag(h.cfg.MD5Enabled())),
Location: getObjectLocation(r, h.cfg.Domains(), reqInfo.BucketName, reqInfo.ObjectName),
} }
if settings.VersioningEnabled() { if settings.VersioningEnabled() {
@ -437,6 +441,35 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
} }
} }
// returns "https" if the tls boolean is true, "http" otherwise.
func getURLScheme(r *http.Request) string {
if r.TLS != nil {
return "https"
}
return "http"
}
// getObjectLocation gets the fully qualified URL of an object.
func getObjectLocation(r *http.Request, domains []string, bucket, object string) string {
proto := middleware.GetSourceScheme(r)
if proto == "" {
proto = getURLScheme(r)
}
u := &url.URL{
Host: r.Host,
Path: path.Join("/", bucket, object),
Scheme: proto,
}
// If domain is set then we need to use bucket DNS style.
for _, domain := range domains {
if strings.HasPrefix(r.Host, bucket+"."+domain) {
u.Path = path.Join("/", object)
break
}
}
return u.String()
}
func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMultipartParams, bktInfo *data.BucketInfo) (*data.ObjectInfo, error) { func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMultipartParams, bktInfo *data.BucketInfo) (*data.ObjectInfo, error) {
ctx := r.Context() ctx := r.Context()
uploadData, extendedObjInfo, err := h.obj.CompleteMultipartUpload(ctx, c) uploadData, extendedObjInfo, err := h.obj.CompleteMultipartUpload(ctx, c)

View file

@ -441,6 +441,80 @@ func TestUploadPartCheckContentSHA256(t *testing.T) {
} }
} }
func TestMultipartObjectLocation(t *testing.T) {
for _, tc := range []struct {
req *http.Request
bucket string
object string
domains []string
expected string
}{
{
req: &http.Request{
Host: "127.0.0.1:8084",
Header: map[string][]string{"X-Forwarded-Scheme": {"http"}},
},
bucket: "testbucket1",
object: "test/1.txt",
expected: "http://127.0.0.1:8084/testbucket1/test/1.txt",
},
{
req: &http.Request{
Host: "localhost:8084",
Header: map[string][]string{"X-Forwarded-Scheme": {"https"}},
},
bucket: "testbucket1",
object: "test/1.txt",
expected: "https://localhost:8084/testbucket1/test/1.txt",
},
{
req: &http.Request{
Host: "s3.mybucket.org",
Header: map[string][]string{"X-Forwarded-Scheme": {"http"}},
},
bucket: "mybucket",
object: "test/1.txt",
expected: "http://s3.mybucket.org/mybucket/test/1.txt",
},
{
req: &http.Request{Host: "mys3.mybucket.org"},
bucket: "mybucket",
object: "test/1.txt",
expected: "http://mys3.mybucket.org/mybucket/test/1.txt",
},
{
req: &http.Request{Host: "s3.bucket.org", TLS: &tls.ConnectionState{}},
bucket: "bucket",
object: "obj",
expected: "https://s3.bucket.org/bucket/obj",
},
{
req: &http.Request{
Host: "mybucket.s3dev.frostfs.devenv",
},
domains: []string{"s3dev.frostfs.devenv"},
bucket: "mybucket",
object: "test/1.txt",
expected: "http://mybucket.s3dev.frostfs.devenv/test/1.txt",
},
{
req: &http.Request{
Host: "mybucket.s3dev.frostfs.devenv",
Header: map[string][]string{"X-Forwarded-Scheme": {"https"}},
},
domains: []string{"s3dev.frostfs.devenv"},
bucket: "mybucket",
object: "test/1.txt",
expected: "https://mybucket.s3dev.frostfs.devenv/test/1.txt",
},
} {
t.Run("", func(t *testing.T) {
location := getObjectLocation(tc.req, tc.domains, tc.bucket, tc.object)
require.Equal(t, tc.expected, location)
})
}
}
func uploadPartCopy(hc *handlerContext, bktName, objName, uploadID string, num int, srcObj string, start, end int) *UploadPartCopyResponse { func uploadPartCopy(hc *handlerContext, bktName, objName, uploadID string, num int, srcObj string, start, end int) *UploadPartCopyResponse {
return uploadPartCopyBase(hc, bktName, objName, false, uploadID, num, srcObj, start, end) return uploadPartCopyBase(hc, bktName, objName, false, uploadID, num, srcObj, start, end)
} }

View file

@ -69,8 +69,10 @@ var deploymentID = uuid.Must(uuid.NewRandom())
var ( var (
// De-facto standard header keys. // De-facto standard header keys.
xForwardedFor = http.CanonicalHeaderKey("X-Forwarded-For") xForwardedFor = http.CanonicalHeaderKey("X-Forwarded-For")
xRealIP = http.CanonicalHeaderKey("X-Real-IP") xRealIP = http.CanonicalHeaderKey("X-Real-IP")
xForwardedProto = http.CanonicalHeaderKey("X-Forwarded-Proto")
xForwardedScheme = http.CanonicalHeaderKey("X-Forwarded-Scheme")
// RFC7239 defines a new "Forwarded: " header designed to replace the // RFC7239 defines a new "Forwarded: " header designed to replace the
// existing use of X-Forwarded-* headers. // existing use of X-Forwarded-* headers.
@ -79,6 +81,9 @@ var (
// Allows for a sub-match of the first value after 'for=' to the next // Allows for a sub-match of the first value after 'for=' to the next
// comma, semi-colon or space. The match is case-insensitive. // comma, semi-colon or space. The match is case-insensitive.
forRegex = regexp.MustCompile(`(?i)(?:for=)([^(;|, )]+)(.*)`) forRegex = regexp.MustCompile(`(?i)(?:for=)([^(;|, )]+)(.*)`)
// Allows for a sub-match for the first instance of scheme (http|https)
// prefixed by 'proto='. The match is case-insensitive.
protoRegex = regexp.MustCompile(`(?i)^(;|,| )+(?:proto=)(https|http)`)
) )
// NewReqInfo returns new ReqInfo based on parameters. // NewReqInfo returns new ReqInfo based on parameters.
@ -291,3 +296,31 @@ func getSourceIP(r *http.Request) string {
} }
return raddr return raddr
} }
// GetSourceScheme retrieves the scheme from the X-Forwarded-Proto and RFC7239
// Forwarded headers (in that order).
func GetSourceScheme(r *http.Request) string {
var scheme string
// Retrieve the scheme from X-Forwarded-Proto.
if proto := r.Header.Get(xForwardedProto); proto != "" {
scheme = strings.ToLower(proto)
} else if proto = r.Header.Get(xForwardedScheme); proto != "" {
scheme = strings.ToLower(proto)
} else if proto := r.Header.Get(forwarded); proto != "" {
// match should contain at least two elements if the protocol was
// specified in the Forwarded header. The first element will always be
// the 'for=', which we ignore, subsequently we proceed to look for
// 'proto=' which should precede right after `for=` if not
// we simply ignore the values and return empty. This is in line
// with the approach we took for returning first ip from multiple
// params.
if match := forRegex.FindStringSubmatch(proto); len(match) > 1 {
if match = protoRegex.FindStringSubmatch(match[2]); len(match) > 1 {
scheme = strings.ToLower(match[2])
}
}
}
return scheme
}

View file

@ -107,6 +107,7 @@ type (
retryMaxAttempts int retryMaxAttempts int
retryMaxBackoff time.Duration retryMaxBackoff time.Duration
retryStrategy handler.RetryStrategy retryStrategy handler.RetryStrategy
domains []string
} }
maxClientsConfig struct { maxClientsConfig struct {
@ -231,6 +232,7 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
s.setRetryMaxAttempts(fetchRetryMaxAttempts(v)) s.setRetryMaxAttempts(fetchRetryMaxAttempts(v))
s.setRetryMaxBackoff(fetchRetryMaxBackoff(v)) s.setRetryMaxBackoff(fetchRetryMaxBackoff(v))
s.setRetryStrategy(fetchRetryStrategy(v)) s.setRetryStrategy(fetchRetryStrategy(v))
s.setVHSSettings(v, log)
} }
func (s *appSettings) updateNamespacesSettings(v *viper.Viper, log *zap.Logger) { func (s *appSettings) updateNamespacesSettings(v *viper.Viper, log *zap.Logger) {
@ -245,6 +247,15 @@ func (s *appSettings) updateNamespacesSettings(v *viper.Viper, log *zap.Logger)
s.namespaces = nsConfig.Namespaces s.namespaces = nsConfig.Namespaces
} }
func (s *appSettings) setVHSSettings(v *viper.Viper, _ *zap.Logger) {
domains := v.GetStringSlice(cfgListenDomains)
s.mu.Lock()
defer s.mu.Unlock()
s.domains = domains
}
func (s *appSettings) BypassContentEncodingInChunks() bool { func (s *appSettings) BypassContentEncodingInChunks() bool {
s.mu.RLock() s.mu.RLock()
defer s.mu.RUnlock() defer s.mu.RUnlock()
@ -447,6 +458,12 @@ func (s *appSettings) RetryStrategy() handler.RetryStrategy {
return s.retryStrategy return s.retryStrategy
} }
func (s *appSettings) Domains() []string {
s.mu.RLock()
defer s.mu.RUnlock()
return s.domains
}
func (a *App) initAPI(ctx context.Context) { func (a *App) initAPI(ctx context.Context) {
a.initLayer(ctx) a.initLayer(ctx)
a.initHandler() a.initHandler()
@ -684,8 +701,7 @@ func (a *App) setHealthStatus() {
// Serve runs HTTP server to handle S3 API requests. // Serve runs HTTP server to handle S3 API requests.
func (a *App) Serve(ctx context.Context) { func (a *App) Serve(ctx context.Context) {
// Attach S3 API: // Attach S3 API:
domains := a.cfg.GetStringSlice(cfgListenDomains) a.log.Info(logs.FetchDomainsPrepareToUseAPI, zap.Strings("domains", a.settings.Domains()))
a.log.Info(logs.FetchDomainsPrepareToUseAPI, zap.Strings("domains", domains))
cfg := api.Config{ cfg := api.Config{
Throttle: middleware.ThrottleOpts{ Throttle: middleware.ThrottleOpts{
@ -696,7 +712,7 @@ func (a *App) Serve(ctx context.Context) {
Center: a.ctr, Center: a.ctr,
Log: a.log, Log: a.log,
Metrics: a.metrics, Metrics: a.metrics,
Domains: domains, Domains: a.settings.Domains(),
MiddlewareSettings: a.settings, MiddlewareSettings: a.settings,
PolicyChecker: a.policyStorage, PolicyChecker: a.policyStorage,