Use threshold max size setting for buffered reader

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
This commit is contained in:
Alexey Vanin 2024-11-06 19:07:10 +03:00
parent 7f1089db3f
commit 0a9ed6d632
5 changed files with 43 additions and 1 deletions

View file

@ -164,6 +164,8 @@ type PrmObjectCreate struct {
// Sets max buffer size to read payload.
BufferMaxSize uint64
ThresholdMaxSize uint64
}
// CreateObjectResult is a result parameter of FrostFS.CreateObject operation.

View file

@ -42,6 +42,7 @@ type (
FeatureSettings interface {
ClientCut() bool
BufferMaxSizeForPut() uint64
ThresholdMaxSizeForPut() uint64
MD5Enabled() bool
FormContainerZone(ns string) string
}

View file

@ -522,6 +522,9 @@ func (n *Layer) objectPutAndHash(ctx context.Context, prm frostfs.PrmObjectCreat
hash.Write(buf)
md5Hash.Write(buf)
})
if threshold := n.features.ThresholdMaxSizeForPut(); threshold > 0 {
prm.Payload = wrapReaderThreshold(prm.Payload, int(threshold))
}
res, err := n.frostFS.CreateObject(ctx, prm)
if err != nil {
if _, errDiscard := io.Copy(io.Discard, prm.Payload); errDiscard != nil {
@ -574,3 +577,29 @@ func wrapReader(input io.Reader, bufSize int, f func(buf []byte)) io.Reader {
}()
return r
}
func wrapReaderThreshold(input io.Reader, threshold int) io.Reader {
if input == nil {
return nil
}
r, w := io.Pipe()
go func() {
var buf = make([]byte, threshold)
var cur int
for {
n, err := input.Read(buf[cur:])
cur += n
if err != nil {
_, _ = w.Write(buf[:cur])
_ = w.CloseWithError(err)
}
if cur < threshold {
continue
}
_, _ = w.Write(buf[:cur])
cur = 0
}
}()
return r
}

View file

@ -106,6 +106,7 @@ type (
bypassContentEncodingInChunks bool
clientCut bool
maxBufferSizeForPut uint64
maxThresholdSizeForPut uint64
md5Enabled bool
namespaceHeader string
defaultNamespaces []string
@ -260,6 +261,7 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
bypassContentEncodingInChunks := v.GetBool(cfgKludgeBypassContentEncodingCheckInChunks)
clientCut := v.GetBool(cfgClientCut)
maxBufferSizeForPut := v.GetUint64(cfgBufferMaxSizeForPut)
maxThresholdSizeForPut := v.GetUint64(cfgThresholdMaxSizeForPut)
md5Enabled := v.GetBool(cfgMD5Enabled)
policyDenyByDefault := v.GetBool(cfgPolicyDenyByDefault)
sourceIPHeader := v.GetString(cfgSourceIPHeader)
@ -293,6 +295,7 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
s.bypassContentEncodingInChunks = bypassContentEncodingInChunks
s.clientCut = clientCut
s.maxBufferSizeForPut = maxBufferSizeForPut
s.maxThresholdSizeForPut = maxThresholdSizeForPut
s.md5Enabled = md5Enabled
s.policyDenyByDefault = policyDenyByDefault
s.sourceIPHeader = sourceIPHeader
@ -367,6 +370,12 @@ func (s *appSettings) BufferMaxSizeForPut() uint64 {
return s.maxBufferSizeForPut
}
func (s *appSettings) ThresholdMaxSizeForPut() uint64 {
s.mu.RLock()
defer s.mu.RUnlock()
return s.maxThresholdSizeForPut
}
func (s *appSettings) DefaultPlacementPolicy(namespace string) netmap.PlacementPolicy {
s.mu.RLock()
defer s.mu.RUnlock()

View file

@ -236,7 +236,8 @@ const ( // Settings.
// Enabling client side object preparing for PUT operations.
cfgClientCut = "frostfs.client_cut"
// Sets max buffer size for read payload in put operations.
cfgBufferMaxSizeForPut = "frostfs.buffer_max_size_for_put"
cfgBufferMaxSizeForPut = "frostfs.buffer_max_size_for_put"
cfgThresholdMaxSizeForPut = "frostfs.threshold_max_size_for_put"
// Sets max attempt to make successful tree request.
cfgTreePoolMaxAttempts = "frostfs.tree_pool_max_attempts"