[#115] pool: Try putSingle if possible

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2023-08-01 11:50:18 +03:00
parent faeeeab87a
commit 3cb3841073
2 changed files with 98 additions and 36 deletions

View file

@ -2,30 +2,39 @@ package pool
import ( import (
"context" "context"
"crypto/ecdsa"
sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
) )
type logger interface {
log(level zapcore.Level, msg string, fields ...zap.Field)
}
type PrmObjectPutClientCutInit struct { type PrmObjectPutClientCutInit struct {
sdkClient.PrmObjectPutInit PrmObjectPut
key *ecdsa.PrivateKey
maxSize uint64
epochSource transformer.EpochSource
withoutHomomorphicHash bool withoutHomomorphicHash bool
stoken *session.Object
} }
func (c *clientWrapper) objectPutInitTransformer(prm PrmObjectPutClientCutInit) (*objectWriterTransformer, error) { func (c *clientWrapper) objectPutInitTransformer(prm PrmObjectPutClientCutInit) (*objectWriterTransformer, error) {
cl, err := c.getClient()
if err != nil {
return nil, err
}
var w objectWriterTransformer var w objectWriterTransformer
w.it = internalTarget{ w.it = internalTarget{
client: c.client, client: cl,
prm: prm, prm: prm,
address: c.address(),
logger: &c.clientStatusMonitor,
} }
key := &c.prm.key key := &c.prm.key
if prm.key != nil { if prm.key != nil {
@ -35,9 +44,9 @@ func (c *clientWrapper) objectPutInitTransformer(prm PrmObjectPutClientCutInit)
w.ot = transformer.NewPayloadSizeLimiter(transformer.Params{ w.ot = transformer.NewPayloadSizeLimiter(transformer.Params{
Key: key, Key: key,
NextTargetInit: func() transformer.ObjectWriter { return &w.it }, NextTargetInit: func() transformer.ObjectWriter { return &w.it },
MaxSize: prm.maxSize, MaxSize: prm.networkInfo.MaxObjectSize(),
WithoutHomomorphicHash: prm.withoutHomomorphicHash, WithoutHomomorphicHash: prm.withoutHomomorphicHash,
NetworkState: prm.epochSource, NetworkState: prm.networkInfo,
SessionToken: prm.stoken, SessionToken: prm.stoken,
}) })
return &w, nil return &w, nil
@ -82,16 +91,37 @@ type internalTarget struct {
res ResObjectPut res ResObjectPut
prm PrmObjectPutClientCutInit prm PrmObjectPutClientCutInit
useStream bool useStream bool
address string
logger logger
resolveFrostFSErrors bool
} }
func (it *internalTarget) WriteObject(ctx context.Context, o *object.Object) error { func (it *internalTarget) WriteObject(ctx context.Context, o *object.Object) error {
// todo support PutSingle putSingleImplemented, err := it.tryPutSingle(ctx, o)
if putSingleImplemented {
return err
}
it.logger.log(zapcore.DebugLevel, "putSingle not implemented, trying put as stream", zap.String("address", it.address))
it.useStream = true it.useStream = true
return it.putAsStream(ctx, o) return it.putAsStream(ctx, o)
} }
func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) error { func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) error {
wrt, err := it.client.ObjectPutInit(ctx, it.prm.PrmObjectPutInit) var cliPrm sdkClient.PrmObjectPutInit
cliPrm.SetCopiesNumberByVectors(it.prm.copiesNumber)
if it.prm.stoken != nil {
cliPrm.WithinSession(*it.prm.stoken)
}
if it.prm.key != nil {
cliPrm.UseKey(*it.prm.key)
}
if it.prm.btoken != nil {
cliPrm.WithBearerToken(*it.prm.btoken)
}
wrt, err := it.client.ObjectPutInit(ctx, cliPrm)
if err != nil { if err != nil {
return err return err
} }
@ -105,3 +135,37 @@ func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) err
} }
return err return err
} }
func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (bool, error) {
if it.useStream {
return false, nil
}
var cliPrm sdkClient.PrmObjectPutSingle
cliPrm.SetCopiesNumber(it.prm.copiesNumber)
cliPrm.UseKey(it.prm.key)
if it.prm.stoken != nil {
cliPrm.WithinSession(*it.prm.stoken)
}
if it.prm.btoken != nil {
cliPrm.WithBearerToken(*it.prm.btoken)
}
cliPrm.SetObject(o.ToV2())
res, err := it.client.ObjectPutSingle(ctx, cliPrm)
if err != nil && status.Code(err) == codes.Unimplemented {
return false, err
}
if err == nil {
id, _ := o.ID()
it.res = ResObjectPut{
Status: res.Status(),
OID: id,
}
if !it.resolveFrostFSErrors && !apistatus.IsSuccessful(it.res.Status) {
return true, apistatus.ErrFromStatus(it.res.Status)
}
return true, nil
}
return true, err
}

View file

@ -252,6 +252,11 @@ func (x *wrapperPrm) setKey(key ecdsa.PrivateKey) {
x.key = key x.key = key
} }
// setLogger sets sdkClient.Client logger.
func (x *wrapperPrm) setLogger(logger *zap.Logger) {
x.logger = logger
}
// setDialTimeout sets the timeout for connection to be established. // setDialTimeout sets the timeout for connection to be established.
func (x *wrapperPrm) setDialTimeout(timeout time.Duration) { func (x *wrapperPrm) setDialTimeout(timeout time.Duration) {
x.dialTimeout = timeout x.dialTimeout = timeout
@ -719,24 +724,8 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut
} }
func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) { func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
var cliPrm sdkClient.PrmObjectPutInit
cliPrm.SetCopiesNumberByVectors(prm.copiesNumber)
if prm.stoken != nil {
cliPrm.WithinSession(*prm.stoken)
}
if prm.key != nil {
cliPrm.UseKey(*prm.key)
}
if prm.btoken != nil {
cliPrm.WithBearerToken(*prm.btoken)
}
putInitPrm := PrmObjectPutClientCutInit{ putInitPrm := PrmObjectPutClientCutInit{
PrmObjectPutInit: cliPrm, PrmObjectPut: prm,
key: prm.key,
maxSize: prm.networkInfo.MaxObjectSize(),
epochSource: prm.networkInfo,
stoken: prm.stoken,
} }
start := time.Now() start := time.Now()
@ -1054,12 +1043,20 @@ func (c *clientStatusMonitor) incErrorRate() {
} }
c.mu.Unlock() c.mu.Unlock()
if thresholdReached && c.logger != nil { if thresholdReached {
c.logger.Warn("error threshold reached", c.log(zapcore.WarnLevel, "error threshold reached",
zap.String("address", c.addr), zap.Uint32("threshold", c.errorThreshold)) zap.String("address", c.addr), zap.Uint32("threshold", c.errorThreshold))
} }
} }
func (c *clientStatusMonitor) log(level zapcore.Level, msg string, fields ...zap.Field) {
if c.logger == nil {
return
}
c.logger.Log(level, msg, fields...)
}
func (c *clientStatusMonitor) currentErrorRate() uint32 { func (c *clientStatusMonitor) currentErrorRate() uint32 {
c.mu.RLock() c.mu.RLock()
defer c.mu.RUnlock() defer c.mu.RUnlock()
@ -1911,6 +1908,7 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
var prm wrapperPrm var prm wrapperPrm
prm.setAddress(addr) prm.setAddress(addr)
prm.setKey(*params.key) prm.setKey(*params.key)
prm.setLogger(params.logger)
prm.setDialTimeout(params.nodeDialTimeout) prm.setDialTimeout(params.nodeDialTimeout)
prm.setStreamTimeout(params.nodeStreamTimeout) prm.setStreamTimeout(params.nodeStreamTimeout)
prm.setErrorThreshold(params.errorThreshold) prm.setErrorThreshold(params.errorThreshold)