diff --git a/api/layer/frostfs/frostfs.go b/api/layer/frostfs/frostfs.go index 26340dc8..d9517a60 100644 --- a/api/layer/frostfs/frostfs.go +++ b/api/layer/frostfs/frostfs.go @@ -164,6 +164,8 @@ type PrmObjectCreate struct { // Sets max buffer size to read payload. BufferMaxSize uint64 + + ThresholdMaxSize uint64 } // CreateObjectResult is a result parameter of FrostFS.CreateObject operation. diff --git a/api/layer/layer.go b/api/layer/layer.go index 0768c83b..5f76eebb 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -42,6 +42,7 @@ type ( FeatureSettings interface { ClientCut() bool BufferMaxSizeForPut() uint64 + ThresholdMaxSizeForPut() uint64 MD5Enabled() bool FormContainerZone(ns string) string } diff --git a/api/layer/object.go b/api/layer/object.go index 8d07ff3f..b1a59a4d 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -522,6 +522,9 @@ func (n *Layer) objectPutAndHash(ctx context.Context, prm frostfs.PrmObjectCreat hash.Write(buf) md5Hash.Write(buf) }) + if threshold := n.features.ThresholdMaxSizeForPut(); threshold > 0 { + prm.Payload = wrapReaderThreshold(prm.Payload, int(threshold)) + } res, err := n.frostFS.CreateObject(ctx, prm) if err != nil { if _, errDiscard := io.Copy(io.Discard, prm.Payload); errDiscard != nil { @@ -574,3 +577,29 @@ func wrapReader(input io.Reader, bufSize int, f func(buf []byte)) io.Reader { }() return r } + +func wrapReaderThreshold(input io.Reader, threshold int) io.Reader { + if input == nil { + return nil + } + + r, w := io.Pipe() + go func() { + var buf = make([]byte, threshold) + var cur int + for { + n, err := input.Read(buf[cur:]) + cur += n + if err != nil { + _, _ = w.Write(buf[:cur]) + _ = w.CloseWithError(err) + } + if cur < threshold { + continue + } + _, _ = w.Write(buf[:cur]) + cur = 0 + } + }() + return r +} diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index 3e24acf8..a2fc650b 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -106,6 +106,7 @@ type ( bypassContentEncodingInChunks bool clientCut bool maxBufferSizeForPut uint64 + maxThresholdSizeForPut uint64 md5Enabled bool namespaceHeader string defaultNamespaces []string @@ -260,6 +261,7 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) { bypassContentEncodingInChunks := v.GetBool(cfgKludgeBypassContentEncodingCheckInChunks) clientCut := v.GetBool(cfgClientCut) maxBufferSizeForPut := v.GetUint64(cfgBufferMaxSizeForPut) + maxThresholdSizeForPut := v.GetUint64(cfgThresholdMaxSizeForPut) md5Enabled := v.GetBool(cfgMD5Enabled) policyDenyByDefault := v.GetBool(cfgPolicyDenyByDefault) sourceIPHeader := v.GetString(cfgSourceIPHeader) @@ -293,6 +295,7 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) { s.bypassContentEncodingInChunks = bypassContentEncodingInChunks s.clientCut = clientCut s.maxBufferSizeForPut = maxBufferSizeForPut + s.maxThresholdSizeForPut = maxThresholdSizeForPut s.md5Enabled = md5Enabled s.policyDenyByDefault = policyDenyByDefault s.sourceIPHeader = sourceIPHeader @@ -367,6 +370,12 @@ func (s *appSettings) BufferMaxSizeForPut() uint64 { return s.maxBufferSizeForPut } +func (s *appSettings) ThresholdMaxSizeForPut() uint64 { + s.mu.RLock() + defer s.mu.RUnlock() + return s.maxThresholdSizeForPut +} + func (s *appSettings) DefaultPlacementPolicy(namespace string) netmap.PlacementPolicy { s.mu.RLock() defer s.mu.RUnlock() diff --git a/cmd/s3-gw/app_settings.go b/cmd/s3-gw/app_settings.go index bd7c35cf..a24a5a8a 100644 --- a/cmd/s3-gw/app_settings.go +++ b/cmd/s3-gw/app_settings.go @@ -236,7 +236,8 @@ const ( // Settings. // Enabling client side object preparing for PUT operations. cfgClientCut = "frostfs.client_cut" // Sets max buffer size for read payload in put operations. - cfgBufferMaxSizeForPut = "frostfs.buffer_max_size_for_put" + cfgBufferMaxSizeForPut = "frostfs.buffer_max_size_for_put" + cfgThresholdMaxSizeForPut = "frostfs.threshold_max_size_for_put" // Sets max attempt to make successful tree request. cfgTreePoolMaxAttempts = "frostfs.tree_pool_max_attempts"