feature/146-add_flag_to_check_content-encoding_in_chunked_payload #167

12 changed files with 123 additions and 23 deletions

View file

@ -20,7 +20,7 @@ This document outlines major changes between releases.
- Return bearer token in `s3-authmate obtain-secret` result (#132) - Return bearer token in `s3-authmate obtain-secret` result (#132)
- Support multiple version credentials using GSet (#135) - Support multiple version credentials using GSet (#135)
- Implement chunk uploading (#106) - Implement chunk uploading (#106)
- Add new `kludge.bypass_content_encoding_check_in_chunks` config param (#146)
### Changed ### Changed
- Update prometheus to v1.15.0 (#94) - Update prometheus to v1.15.0 (#94)

View file

@ -37,6 +37,7 @@ type (
ResolveZoneList []string ResolveZoneList []string
IsResolveListAllow bool // True if ResolveZoneList contains allowed zones IsResolveListAllow bool // True if ResolveZoneList contains allowed zones
CompleteMultipartKeepalive time.Duration CompleteMultipartKeepalive time.Duration
Kludge KludgeSettings
} }
PlacementPolicy interface { PlacementPolicy interface {
@ -49,6 +50,10 @@ type (
XMLDecoderProvider interface { XMLDecoderProvider interface {
NewCompleteMultipartDecoder(io.Reader) *xml.Decoder NewCompleteMultipartDecoder(io.Reader) *xml.Decoder
} }
KludgeSettings interface {
BypassContentEncodingInChunks() bool
}
) )
const ( const (

View file

@ -105,6 +105,9 @@ func writeHeaders(h http.Header, requestHeader http.Header, extendedInfo *data.E
if expires := info.Headers[api.Expires]; expires != "" { if expires := info.Headers[api.Expires]; expires != "" {
h.Set(api.Expires, expires) h.Set(api.Expires, expires)
} }
if encodings := info.Headers[api.ContentEncoding]; encodings != "" {
h.Set(api.ContentEncoding, encodings)
}
for key, val := range info.Headers { for key, val := range info.Headers {
if layer.IsSystemHeader(key) { if layer.IsSystemHeader(key) {

View file

@ -36,6 +36,7 @@ type handlerContext struct {
tp *layer.TestFrostFS tp *layer.TestFrostFS
tree *tree.Tree tree *tree.Tree
context context.Context context context.Context
kludge *kludgeSettingsMock
} }
func (hc *handlerContext) Handler() *handler { func (hc *handlerContext) Handler() *handler {
@ -83,6 +84,14 @@ func (p *xmlDecoderProviderMock) NewCompleteMultipartDecoder(r io.Reader) *xml.D
return xml.NewDecoder(r) return xml.NewDecoder(r)
} }
type kludgeSettingsMock struct {
bypassContentEncodingInChunks bool
}
func (k *kludgeSettingsMock) BypassContentEncodingInChunks() bool {
return k.bypassContentEncodingInChunks
}
func prepareHandlerContext(t *testing.T) *handlerContext { func prepareHandlerContext(t *testing.T) *handlerContext {
return prepareHandlerContextBase(t, false) return prepareHandlerContextBase(t, false)
} }
@ -124,12 +133,15 @@ func prepareHandlerContextBase(t *testing.T, minCache bool) *handlerContext {
err = pp.DecodeString("REP 1") err = pp.DecodeString("REP 1")
require.NoError(t, err) require.NoError(t, err)
kludge := &kludgeSettingsMock{}
h := &handler{ h := &handler{
log: l, log: l,
obj: layer.NewLayer(l, tp, layerCfg), obj: layer.NewLayer(l, tp, layerCfg),
cfg: &Config{ cfg: &Config{
Policy: &placementPolicyMock{defaultPolicy: pp}, Policy: &placementPolicyMock{defaultPolicy: pp},
XMLDecoder: &xmlDecoderProviderMock{}, XMLDecoder: &xmlDecoderProviderMock{},
Kludge: kludge,
}, },
} }
@ -140,6 +152,7 @@ func prepareHandlerContextBase(t *testing.T, minCache bool) *handlerContext {
tp: tp, tp: tp,
tree: treeMock, tree: treeMock,
context: context.WithValue(context.Background(), middleware.BoxData, newTestAccessBox(t, key)), context: context.WithValue(context.Background(), middleware.BoxData, newTestAccessBox(t, key)),
kludge: kludge,
} }
} }

View file

@ -225,6 +225,9 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
h.logAndSendError(w, "failed to get body reader", reqInfo, err) h.logAndSendError(w, "failed to get body reader", reqInfo, err)
return return
} }
if encodings := r.Header.Get(api.ContentEncoding); len(encodings) > 0 {
metadata[api.ContentEncoding] = encodings
}
var size uint64 var size uint64
if r.ContentLength > 0 { if r.ContentLength > 0 {
@ -329,6 +332,26 @@ func (h *handler) getBodyReader(r *http.Request) (io.ReadCloser, error) {
return r.Body, nil return r.Body, nil
} }
encodings := r.Header.Values(api.ContentEncoding)
var chunkedEncoding bool
resultContentEncoding := make([]string, 0, len(encodings))
for _, enc := range encodings {
for _, e := range strings.Split(enc, ",") {
e = strings.TrimSpace(e)
if e == api.AwsChunked { // probably we should also check position of this header value
chunkedEncoding = true
} else {
resultContentEncoding = append(resultContentEncoding, e)
}
}
}
r.Header.Set(api.ContentEncoding, strings.Join(resultContentEncoding, ","))
if !chunkedEncoding && !h.cfg.Kludge.BypassContentEncodingInChunks() {
return nil, fmt.Errorf("%w: request is not chunk encoded, encodings '%s'",
errors.GetAPIError(errors.ErrInvalidEncodingMethod), strings.Join(encodings, ","))
}
decodeContentSize := r.Header.Get(api.AmzDecodedContentLength) decodeContentSize := r.Header.Get(api.AmzDecodedContentLength)
if len(decodeContentSize) == 0 { if len(decodeContentSize) == 0 {
return nil, errors.GetAPIError(errors.ErrMissingContentLength) return nil, errors.GetAPIError(errors.ErrMissingContentLength)

View file

@ -173,11 +173,52 @@ func TestPutObjectWithStreamBodyError(t *testing.T) {
} }
func TestPutObjectWithStreamBodyAWSExample(t *testing.T) { func TestPutObjectWithStreamBodyAWSExample(t *testing.T) {
tc := prepareHandlerContext(t) hc := prepareHandlerContext(t)
bktName, objName := "examplebucket", "chunkObject.txt" bktName, objName := "examplebucket", "chunkObject.txt"
createTestBucket(tc, bktName) createTestBucket(hc, bktName)
w, req, chunk := getChunkedRequest(hc.context, t, bktName, objName)
hc.Handler().PutObjectHandler(w, req)
assertStatus(t, w, http.StatusOK)
data := getObjectRange(t, hc, bktName, objName, 0, 66824)
for i := range chunk {
require.Equal(t, chunk[i], data[i])
}
}
func TestPutChunkedTestContentEncoding(t *testing.T) {
hc := prepareHandlerContext(t)
bktName, objName := "examplebucket", "chunkObject.txt"
createTestBucket(hc, bktName)
w, req, _ := getChunkedRequest(hc.context, t, bktName, objName)
req.Header.Set(api.ContentEncoding, api.AwsChunked+",gzip")
hc.Handler().PutObjectHandler(w, req)
assertStatus(t, w, http.StatusOK)
resp := headObjectBase(hc, bktName, objName, emptyVersion)
require.Equal(t, "gzip", resp.Header().Get(api.ContentEncoding))
w, req, _ = getChunkedRequest(hc.context, t, bktName, objName)
req.Header.Set(api.ContentEncoding, "gzip")
hc.Handler().PutObjectHandler(w, req)
assertS3Error(t, w, s3errors.GetAPIError(s3errors.ErrInvalidEncodingMethod))
hc.kludge.bypassContentEncodingInChunks = true
w, req, _ = getChunkedRequest(hc.context, t, bktName, objName)
req.Header.Set(api.ContentEncoding, "gzip")
hc.Handler().PutObjectHandler(w, req)
assertStatus(t, w, http.StatusOK)
resp = headObjectBase(hc, bktName, objName, emptyVersion)
require.Equal(t, "gzip", resp.Header().Get(api.ContentEncoding))
}
func getChunkedRequest(ctx context.Context, t *testing.T, bktName, objName string) (*httptest.ResponseRecorder, *http.Request, []byte) {
chunk := make([]byte, 65*1024) chunk := make([]byte, 65*1024)
for i := range chunk { for i := range chunk {
chunk[i] = 'a' chunk[i] = 'a'
@ -219,7 +260,7 @@ func TestPutObjectWithStreamBodyAWSExample(t *testing.T) {
w := httptest.NewRecorder() w := httptest.NewRecorder()
reqInfo := middleware.NewReqInfo(w, req, middleware.ObjectRequest{Bucket: bktName, Object: objName}) reqInfo := middleware.NewReqInfo(w, req, middleware.ObjectRequest{Bucket: bktName, Object: objName})
req = req.WithContext(middleware.SetReqInfo(tc.Context(), reqInfo)) req = req.WithContext(middleware.SetReqInfo(ctx, reqInfo))
req = req.WithContext(context.WithValue(req.Context(), middleware.ClientTime, signTime)) req = req.WithContext(context.WithValue(req.Context(), middleware.ClientTime, signTime))
req = req.WithContext(context.WithValue(req.Context(), middleware.AuthHeaders, &auth.AuthHeader{ req = req.WithContext(context.WithValue(req.Context(), middleware.AuthHeaders, &auth.AuthHeader{
AccessKeyID: AWSAccessKeyID, AccessKeyID: AWSAccessKeyID,
@ -232,13 +273,8 @@ func TestPutObjectWithStreamBodyAWSExample(t *testing.T) {
AccessKey: AWSSecretAccessKey, AccessKey: AWSSecretAccessKey,
}, },
})) }))
tc.Handler().PutObjectHandler(w, req)
assertStatus(t, w, http.StatusOK)
data := getObjectRange(t, tc, bktName, objName, 0, 66824) return w, req, chunk
for i := range chunk {
require.Equal(t, chunk[i], data[i])
}
} }
func TestCreateBucket(t *testing.T) { func TestCreateBucket(t *testing.T) {

View file

@ -117,10 +117,6 @@ var SystemMetadata = map[string]struct{}{
} }
func IsSignedStreamingV4(r *http.Request) bool { func IsSignedStreamingV4(r *http.Request) bool {
// The Content-Encoding must have "aws-chunked" as part of its value.
// https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html
// Minio does not set this value, thus for compatibility reasons
// we do not check it.
return r.Header.Get(AmzContentSha256) == StreamingContentSHA256 && return r.Header.Get(AmzContentSha256) == StreamingContentSHA256 &&
r.Method == http.MethodPut r.Method == http.MethodPut
} }

View file

@ -10,6 +10,7 @@ import (
"os/signal" "os/signal"
"strconv" "strconv"
"sync" "sync"
"sync/atomic"
"syscall" "syscall"
"time" "time"
@ -65,10 +66,11 @@ type (
} }
appSettings struct { appSettings struct {
logLevel zap.AtomicLevel logLevel zap.AtomicLevel
policies *placementPolicy policies *placementPolicy
xmlDecoder *xml.DecoderProvider xmlDecoder *xml.DecoderProvider
maxClient maxClientsConfig maxClient maxClientsConfig
bypassContentEncodingInChunks atomic.Bool
} }
maxClientsConfig struct { maxClientsConfig struct {
@ -162,12 +164,24 @@ func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
log.logger.Fatal("failed to create new policy mapping", zap.Error(err)) log.logger.Fatal("failed to create new policy mapping", zap.Error(err))
} }
return &appSettings{ settings := &appSettings{
logLevel: log.lvl, logLevel: log.lvl,
policies: policies, policies: policies,
xmlDecoder: xml.NewDecoderProvider(v.GetBool(cfgKludgeUseDefaultXMLNSForCompleteMultipartUpload)), xmlDecoder: xml.NewDecoderProvider(v.GetBool(cfgKludgeUseDefaultXMLNSForCompleteMultipartUpload)),
maxClient: newMaxClients(v), maxClient: newMaxClients(v),
} }
settings.setBypassContentEncodingInChunks(v.GetBool(cfgKludgeBypassContentEncodingCheckInChunks))
return settings
}
func (s *appSettings) BypassContentEncodingInChunks() bool {
return s.bypassContentEncodingInChunks.Load()
}
func (s *appSettings) setBypassContentEncodingInChunks(bypass bool) {
s.bypassContentEncodingInChunks.Store(bypass)
} }
func getDefaultPolicyValue(v *viper.Viper) string { func getDefaultPolicyValue(v *viper.Viper) string {
@ -595,6 +609,7 @@ func (a *App) updateSettings() {
a.settings.policies.update(a.log, a.cfg) a.settings.policies.update(a.log, a.cfg)
a.settings.xmlDecoder.UseDefaultNamespaceForCompleteMultipart(a.cfg.GetBool(cfgKludgeUseDefaultXMLNSForCompleteMultipartUpload)) a.settings.xmlDecoder.UseDefaultNamespaceForCompleteMultipart(a.cfg.GetBool(cfgKludgeUseDefaultXMLNSForCompleteMultipartUpload))
a.settings.setBypassContentEncodingInChunks(a.cfg.GetBool(cfgKludgeBypassContentEncodingCheckInChunks))
} }
func (a *App) startServices() { func (a *App) startServices() {
@ -783,6 +798,7 @@ func (a *App) initHandler() {
} }
cfg.CompleteMultipartKeepalive = a.cfg.GetDuration(cfgKludgeCompleteMultipartUploadKeepalive) cfg.CompleteMultipartKeepalive = a.cfg.GetDuration(cfgKludgeCompleteMultipartUploadKeepalive)
cfg.Kludge = a.settings
var err error var err error
a.api, err = handler.New(a.log, a.obj, a.nc, cfg) a.api, err = handler.New(a.log, a.obj, a.nc, cfg)

View file

@ -120,6 +120,7 @@ const ( // Settings.
// Kludge. // Kludge.
cfgKludgeUseDefaultXMLNSForCompleteMultipartUpload = "kludge.use_default_xmlns_for_complete_multipart" cfgKludgeUseDefaultXMLNSForCompleteMultipartUpload = "kludge.use_default_xmlns_for_complete_multipart"
cfgKludgeCompleteMultipartUploadKeepalive = "kludge.complete_multipart_keepalive" cfgKludgeCompleteMultipartUploadKeepalive = "kludge.complete_multipart_keepalive"
cfgKludgeBypassContentEncodingCheckInChunks = "kludge.bypass_content_encoding_check_in_chunks"
// Command line args. // Command line args.
cmdHelp = "help" cmdHelp = "help"
@ -306,6 +307,7 @@ func newSettings() *viper.Viper {
// kludge // kludge
v.SetDefault(cfgKludgeUseDefaultXMLNSForCompleteMultipartUpload, false) v.SetDefault(cfgKludgeUseDefaultXMLNSForCompleteMultipartUpload, false)
v.SetDefault(cfgKludgeCompleteMultipartUploadKeepalive, 10*time.Second) v.SetDefault(cfgKludgeCompleteMultipartUploadKeepalive, 10*time.Second)
v.SetDefault(cfgKludgeBypassContentEncodingCheckInChunks, false)
// Bind flags // Bind flags
if err := bindFlags(v, flags); err != nil { if err := bindFlags(v, flags); err != nil {

View file

@ -138,6 +138,8 @@ S3_GW_RESOLVE_BUCKET_ALLOW=container
S3_GW_KLUDGE_USE_DEFAULT_XMLNS_FOR_COMPLETE_MULTIPART=false S3_GW_KLUDGE_USE_DEFAULT_XMLNS_FOR_COMPLETE_MULTIPART=false
# Set timeout between whitespace transmissions during CompleteMultipartUpload processing. # Set timeout between whitespace transmissions during CompleteMultipartUpload processing.
S3_GW_KLUDGE_COMPLETE_MULTIPART_KEEPALIVE=10s S3_GW_KLUDGE_COMPLETE_MULTIPART_KEEPALIVE=10s
# Use this flag to be able to use chunked upload approach without having `aws-chunked` value in `Content-Encoding` header.
S3_GW_BYPASS_CONTENT_ENCODING_CHECK_IN_CHUNKS=false
S3_GW_TRACING_ENABLED=false S3_GW_TRACING_ENABLED=false
S3_GW_TRACING_ENDPOINT="localhost:4318" S3_GW_TRACING_ENDPOINT="localhost:4318"

View file

@ -167,3 +167,5 @@ kludge:
use_default_xmlns_for_complete_multipart: false use_default_xmlns_for_complete_multipart: false
# Set timeout between whitespace transmissions during CompleteMultipartUpload processing. # Set timeout between whitespace transmissions during CompleteMultipartUpload processing.
complete_multipart_keepalive: 10s complete_multipart_keepalive: 10s
# Use this flag to be able to use chunked upload approach without having `aws-chunked` value in `Content-Encoding` header.
bypass_content_encoding_check_in_chunks: false

View file

@ -536,9 +536,11 @@ Workarounds for non-standard use cases.
kludge: kludge:
use_default_xmlns_for_complete_multipart: false use_default_xmlns_for_complete_multipart: false
complete_multipart_keepalive: 10s complete_multipart_keepalive: 10s
bypass_content_encoding_check_in_chunks: false
``` ```
| Parameter | Type | SIGHUP reload | Default value | Description | | Parameter | Type | SIGHUP reload | Default value | Description |
|--------------------------------------------|------------|---------------|---------------|-----------------------------------------------------------------------------------------------------------------------------| |--------------------------------------------|------------|---------------|---------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `use_default_xmlns_for_complete_multipart` | `bool` | yes | false | Enable using default xml namespace `http://s3.amazonaws.com/doc/2006-03-01/` when parse `CompleteMultipartUpload` xml body. | | `use_default_xmlns_for_complete_multipart` | `bool` | yes | false | Enable using default xml namespace `http://s3.amazonaws.com/doc/2006-03-01/` when parse `CompleteMultipartUpload` xml body. |
| `complete_multipart_keepalive` | `duration` | no | 10s | Set timeout between whitespace transmissions during CompleteMultipartUpload processing. | | `complete_multipart_keepalive` | `duration` | no | 10s | Set timeout between whitespace transmissions during CompleteMultipartUpload processing. |
| `bypass_content_encoding_check_in_chunks` | `bool` | yes | false | Use this flag to be able to use [chunked upload approach](https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html) without having `aws-chunked` value in `Content-Encoding` header. |