feature/146-add_flag_to_check_content-encoding_in_chunked_payload #167

12 changed files with 123 additions and 23 deletions

View file

@ -20,7 +20,7 @@ This document outlines major changes between releases.
- Return bearer token in `s3-authmate obtain-secret` result (#132)
- Support multiple version credentials using GSet (#135)
- Implement chunk uploading (#106)
- Add new `kludge.bypass_content_encoding_check_in_chunks` config param (#146)
### Changed
- Update prometheus to v1.15.0 (#94)

View file

@ -37,6 +37,7 @@ type (
ResolveZoneList []string
IsResolveListAllow bool // True if ResolveZoneList contains allowed zones
CompleteMultipartKeepalive time.Duration
Kludge KludgeSettings
}
PlacementPolicy interface {
@ -49,6 +50,10 @@ type (
XMLDecoderProvider interface {
NewCompleteMultipartDecoder(io.Reader) *xml.Decoder
}
KludgeSettings interface {
BypassContentEncodingInChunks() bool
}
)
const (

View file

@ -105,6 +105,9 @@ func writeHeaders(h http.Header, requestHeader http.Header, extendedInfo *data.E
if expires := info.Headers[api.Expires]; expires != "" {
h.Set(api.Expires, expires)
}
if encodings := info.Headers[api.ContentEncoding]; encodings != "" {
h.Set(api.ContentEncoding, encodings)
}
for key, val := range info.Headers {
if layer.IsSystemHeader(key) {

View file

@ -36,6 +36,7 @@ type handlerContext struct {
tp *layer.TestFrostFS
tree *tree.Tree
context context.Context
kludge *kludgeSettingsMock
}
func (hc *handlerContext) Handler() *handler {
@ -83,6 +84,14 @@ func (p *xmlDecoderProviderMock) NewCompleteMultipartDecoder(r io.Reader) *xml.D
return xml.NewDecoder(r)
}
type kludgeSettingsMock struct {
bypassContentEncodingInChunks bool
}
func (k *kludgeSettingsMock) BypassContentEncodingInChunks() bool {
return k.bypassContentEncodingInChunks
}
func prepareHandlerContext(t *testing.T) *handlerContext {
return prepareHandlerContextBase(t, false)
}
@ -124,12 +133,15 @@ func prepareHandlerContextBase(t *testing.T, minCache bool) *handlerContext {
err = pp.DecodeString("REP 1")
require.NoError(t, err)
kludge := &kludgeSettingsMock{}
h := &handler{
log: l,
obj: layer.NewLayer(l, tp, layerCfg),
cfg: &Config{
Policy: &placementPolicyMock{defaultPolicy: pp},
XMLDecoder: &xmlDecoderProviderMock{},
Kludge: kludge,
},
}
@ -140,6 +152,7 @@ func prepareHandlerContextBase(t *testing.T, minCache bool) *handlerContext {
tp: tp,
tree: treeMock,
context: context.WithValue(context.Background(), middleware.BoxData, newTestAccessBox(t, key)),
kludge: kludge,
}
}

View file

@ -225,6 +225,9 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
h.logAndSendError(w, "failed to get body reader", reqInfo, err)
return
}
if encodings := r.Header.Get(api.ContentEncoding); len(encodings) > 0 {
metadata[api.ContentEncoding] = encodings
}
var size uint64
if r.ContentLength > 0 {
@ -329,6 +332,26 @@ func (h *handler) getBodyReader(r *http.Request) (io.ReadCloser, error) {
return r.Body, nil
}
encodings := r.Header.Values(api.ContentEncoding)
var chunkedEncoding bool
resultContentEncoding := make([]string, 0, len(encodings))
for _, enc := range encodings {
for _, e := range strings.Split(enc, ",") {
e = strings.TrimSpace(e)
if e == api.AwsChunked { // probably we should also check position of this header value
chunkedEncoding = true
} else {
resultContentEncoding = append(resultContentEncoding, e)
}
}
}
r.Header.Set(api.ContentEncoding, strings.Join(resultContentEncoding, ","))
if !chunkedEncoding && !h.cfg.Kludge.BypassContentEncodingInChunks() {
return nil, fmt.Errorf("%w: request is not chunk encoded, encodings '%s'",
errors.GetAPIError(errors.ErrInvalidEncodingMethod), strings.Join(encodings, ","))
}
decodeContentSize := r.Header.Get(api.AmzDecodedContentLength)
if len(decodeContentSize) == 0 {
return nil, errors.GetAPIError(errors.ErrMissingContentLength)

View file

@ -173,11 +173,52 @@ func TestPutObjectWithStreamBodyError(t *testing.T) {
}
func TestPutObjectWithStreamBodyAWSExample(t *testing.T) {
tc := prepareHandlerContext(t)
hc := prepareHandlerContext(t)
bktName, objName := "examplebucket", "chunkObject.txt"
createTestBucket(tc, bktName)
createTestBucket(hc, bktName)
w, req, chunk := getChunkedRequest(hc.context, t, bktName, objName)
hc.Handler().PutObjectHandler(w, req)
assertStatus(t, w, http.StatusOK)
data := getObjectRange(t, hc, bktName, objName, 0, 66824)
for i := range chunk {
require.Equal(t, chunk[i], data[i])
}
}
func TestPutChunkedTestContentEncoding(t *testing.T) {
hc := prepareHandlerContext(t)
bktName, objName := "examplebucket", "chunkObject.txt"
createTestBucket(hc, bktName)
w, req, _ := getChunkedRequest(hc.context, t, bktName, objName)
req.Header.Set(api.ContentEncoding, api.AwsChunked+",gzip")
hc.Handler().PutObjectHandler(w, req)
assertStatus(t, w, http.StatusOK)
resp := headObjectBase(hc, bktName, objName, emptyVersion)
require.Equal(t, "gzip", resp.Header().Get(api.ContentEncoding))
w, req, _ = getChunkedRequest(hc.context, t, bktName, objName)
req.Header.Set(api.ContentEncoding, "gzip")
hc.Handler().PutObjectHandler(w, req)
assertS3Error(t, w, s3errors.GetAPIError(s3errors.ErrInvalidEncodingMethod))
hc.kludge.bypassContentEncodingInChunks = true
w, req, _ = getChunkedRequest(hc.context, t, bktName, objName)
req.Header.Set(api.ContentEncoding, "gzip")
hc.Handler().PutObjectHandler(w, req)
assertStatus(t, w, http.StatusOK)
resp = headObjectBase(hc, bktName, objName, emptyVersion)
require.Equal(t, "gzip", resp.Header().Get(api.ContentEncoding))
}
func getChunkedRequest(ctx context.Context, t *testing.T, bktName, objName string) (*httptest.ResponseRecorder, *http.Request, []byte) {
chunk := make([]byte, 65*1024)
for i := range chunk {
chunk[i] = 'a'
@ -219,7 +260,7 @@ func TestPutObjectWithStreamBodyAWSExample(t *testing.T) {
w := httptest.NewRecorder()
reqInfo := middleware.NewReqInfo(w, req, middleware.ObjectRequest{Bucket: bktName, Object: objName})
req = req.WithContext(middleware.SetReqInfo(tc.Context(), reqInfo))
req = req.WithContext(middleware.SetReqInfo(ctx, reqInfo))
req = req.WithContext(context.WithValue(req.Context(), middleware.ClientTime, signTime))
req = req.WithContext(context.WithValue(req.Context(), middleware.AuthHeaders, &auth.AuthHeader{
AccessKeyID: AWSAccessKeyID,
@ -232,13 +273,8 @@ func TestPutObjectWithStreamBodyAWSExample(t *testing.T) {
AccessKey: AWSSecretAccessKey,
},
}))
tc.Handler().PutObjectHandler(w, req)
assertStatus(t, w, http.StatusOK)
data := getObjectRange(t, tc, bktName, objName, 0, 66824)
for i := range chunk {
require.Equal(t, chunk[i], data[i])
}
return w, req, chunk
}
func TestCreateBucket(t *testing.T) {

View file

@ -117,10 +117,6 @@ var SystemMetadata = map[string]struct{}{
}
func IsSignedStreamingV4(r *http.Request) bool {
// The Content-Encoding must have "aws-chunked" as part of its value.
// https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html
// Minio does not set this value, thus for compatibility reasons
// we do not check it.
return r.Header.Get(AmzContentSha256) == StreamingContentSHA256 &&
r.Method == http.MethodPut
}

View file

@ -10,6 +10,7 @@ import (
"os/signal"
"strconv"
"sync"
"sync/atomic"
"syscall"
"time"
@ -65,10 +66,11 @@ type (
}
appSettings struct {
logLevel zap.AtomicLevel
policies *placementPolicy
xmlDecoder *xml.DecoderProvider
maxClient maxClientsConfig
logLevel zap.AtomicLevel
policies *placementPolicy
xmlDecoder *xml.DecoderProvider
maxClient maxClientsConfig
bypassContentEncodingInChunks atomic.Bool
}
maxClientsConfig struct {
@ -162,12 +164,24 @@ func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
log.logger.Fatal("failed to create new policy mapping", zap.Error(err))
}
return &appSettings{
settings := &appSettings{
logLevel: log.lvl,
policies: policies,
xmlDecoder: xml.NewDecoderProvider(v.GetBool(cfgKludgeUseDefaultXMLNSForCompleteMultipartUpload)),
maxClient: newMaxClients(v),
}
settings.setBypassContentEncodingInChunks(v.GetBool(cfgKludgeBypassContentEncodingCheckInChunks))
return settings
}
func (s *appSettings) BypassContentEncodingInChunks() bool {
return s.bypassContentEncodingInChunks.Load()
}
func (s *appSettings) setBypassContentEncodingInChunks(bypass bool) {
s.bypassContentEncodingInChunks.Store(bypass)
}
func getDefaultPolicyValue(v *viper.Viper) string {
@ -595,6 +609,7 @@ func (a *App) updateSettings() {
a.settings.policies.update(a.log, a.cfg)
a.settings.xmlDecoder.UseDefaultNamespaceForCompleteMultipart(a.cfg.GetBool(cfgKludgeUseDefaultXMLNSForCompleteMultipartUpload))
a.settings.setBypassContentEncodingInChunks(a.cfg.GetBool(cfgKludgeBypassContentEncodingCheckInChunks))
}
func (a *App) startServices() {
@ -783,6 +798,7 @@ func (a *App) initHandler() {
}
cfg.CompleteMultipartKeepalive = a.cfg.GetDuration(cfgKludgeCompleteMultipartUploadKeepalive)
cfg.Kludge = a.settings
var err error
a.api, err = handler.New(a.log, a.obj, a.nc, cfg)

View file

@ -120,6 +120,7 @@ const ( // Settings.
// Kludge.
cfgKludgeUseDefaultXMLNSForCompleteMultipartUpload = "kludge.use_default_xmlns_for_complete_multipart"
cfgKludgeCompleteMultipartUploadKeepalive = "kludge.complete_multipart_keepalive"
cfgKludgeBypassContentEncodingCheckInChunks = "kludge.bypass_content_encoding_check_in_chunks"
// Command line args.
cmdHelp = "help"
@ -306,6 +307,7 @@ func newSettings() *viper.Viper {
// kludge
v.SetDefault(cfgKludgeUseDefaultXMLNSForCompleteMultipartUpload, false)
v.SetDefault(cfgKludgeCompleteMultipartUploadKeepalive, 10*time.Second)
v.SetDefault(cfgKludgeBypassContentEncodingCheckInChunks, false)
// Bind flags
if err := bindFlags(v, flags); err != nil {

View file

@ -138,6 +138,8 @@ S3_GW_RESOLVE_BUCKET_ALLOW=container
S3_GW_KLUDGE_USE_DEFAULT_XMLNS_FOR_COMPLETE_MULTIPART=false
# Set timeout between whitespace transmissions during CompleteMultipartUpload processing.
S3_GW_KLUDGE_COMPLETE_MULTIPART_KEEPALIVE=10s
# Use this flag to be able to use chunked upload approach without having `aws-chunked` value in `Content-Encoding` header.
S3_GW_BYPASS_CONTENT_ENCODING_CHECK_IN_CHUNKS=false
S3_GW_TRACING_ENABLED=false
S3_GW_TRACING_ENDPOINT="localhost:4318"

View file

@ -167,3 +167,5 @@ kludge:
use_default_xmlns_for_complete_multipart: false
# Set timeout between whitespace transmissions during CompleteMultipartUpload processing.
complete_multipart_keepalive: 10s
# Use this flag to be able to use chunked upload approach without having `aws-chunked` value in `Content-Encoding` header.
bypass_content_encoding_check_in_chunks: false

View file

@ -536,9 +536,11 @@ Workarounds for non-standard use cases.
kludge:
use_default_xmlns_for_complete_multipart: false
complete_multipart_keepalive: 10s
bypass_content_encoding_check_in_chunks: false
```
| Parameter | Type | SIGHUP reload | Default value | Description |
|--------------------------------------------|------------|---------------|---------------|-----------------------------------------------------------------------------------------------------------------------------|
| `use_default_xmlns_for_complete_multipart` | `bool` | yes | false | Enable using default xml namespace `http://s3.amazonaws.com/doc/2006-03-01/` when parse `CompleteMultipartUpload` xml body. |
| `complete_multipart_keepalive` | `duration` | no | 10s | Set timeout between whitespace transmissions during CompleteMultipartUpload processing. |
| Parameter | Type | SIGHUP reload | Default value | Description |
|--------------------------------------------|------------|---------------|---------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `use_default_xmlns_for_complete_multipart` | `bool` | yes | false | Enable using default xml namespace `http://s3.amazonaws.com/doc/2006-03-01/` when parse `CompleteMultipartUpload` xml body. |
| `complete_multipart_keepalive` | `duration` | no | 10s | Set timeout between whitespace transmissions during CompleteMultipartUpload processing. |
| `bypass_content_encoding_check_in_chunks` | `bool` | yes | false | Use this flag to be able to use [chunked upload approach](https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html) without having `aws-chunked` value in `Content-Encoding` header. |