diff --git a/pool/object_put_transformer.go b/pool/object_put_transformer.go index 74c88c3..08d41dd 100644 --- a/pool/object_put_transformer.go +++ b/pool/object_put_transformer.go @@ -2,30 +2,39 @@ package pool import ( "context" - "crypto/ecdsa" sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) +type logger interface { + log(level zapcore.Level, msg string, fields ...zap.Field) +} + type PrmObjectPutClientCutInit struct { - sdkClient.PrmObjectPutInit - key *ecdsa.PrivateKey - maxSize uint64 - epochSource transformer.EpochSource + PrmObjectPut withoutHomomorphicHash bool - stoken *session.Object } func (c *clientWrapper) objectPutInitTransformer(prm PrmObjectPutClientCutInit) (*objectWriterTransformer, error) { + cl, err := c.getClient() + if err != nil { + return nil, err + } var w objectWriterTransformer + w.it = internalTarget{ - client: c.client, - prm: prm, + client: cl, + prm: prm, + address: c.address(), + logger: &c.clientStatusMonitor, } key := &c.prm.key if prm.key != nil { @@ -35,9 +44,9 @@ func (c *clientWrapper) objectPutInitTransformer(prm PrmObjectPutClientCutInit) w.ot = transformer.NewPayloadSizeLimiter(transformer.Params{ Key: key, NextTargetInit: func() transformer.ObjectWriter { return &w.it }, - MaxSize: prm.maxSize, + MaxSize: prm.networkInfo.MaxObjectSize(), WithoutHomomorphicHash: prm.withoutHomomorphicHash, - NetworkState: prm.epochSource, + NetworkState: prm.networkInfo, SessionToken: prm.stoken, }) return &w, nil @@ -78,20 +87,41 @@ func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, err } type internalTarget struct { - client *sdkClient.Client - res ResObjectPut - prm PrmObjectPutClientCutInit - useStream bool + client *sdkClient.Client + res ResObjectPut + prm PrmObjectPutClientCutInit + useStream bool + address string + logger logger + resolveFrostFSErrors bool } func (it *internalTarget) WriteObject(ctx context.Context, o *object.Object) error { - // todo support PutSingle + putSingleImplemented, err := it.tryPutSingle(ctx, o) + if putSingleImplemented { + return err + } + + it.logger.log(zapcore.DebugLevel, "putSingle not implemented, trying put as stream", zap.String("address", it.address)) + it.useStream = true return it.putAsStream(ctx, o) } func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) error { - wrt, err := it.client.ObjectPutInit(ctx, it.prm.PrmObjectPutInit) + var cliPrm sdkClient.PrmObjectPutInit + cliPrm.SetCopiesNumberByVectors(it.prm.copiesNumber) + if it.prm.stoken != nil { + cliPrm.WithinSession(*it.prm.stoken) + } + if it.prm.key != nil { + cliPrm.UseKey(*it.prm.key) + } + if it.prm.btoken != nil { + cliPrm.WithBearerToken(*it.prm.btoken) + } + + wrt, err := it.client.ObjectPutInit(ctx, cliPrm) if err != nil { return err } @@ -105,3 +135,37 @@ func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) err } return err } + +func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (bool, error) { + if it.useStream { + return false, nil + } + var cliPrm sdkClient.PrmObjectPutSingle + cliPrm.SetCopiesNumber(it.prm.copiesNumber) + cliPrm.UseKey(it.prm.key) + if it.prm.stoken != nil { + cliPrm.WithinSession(*it.prm.stoken) + } + if it.prm.btoken != nil { + cliPrm.WithBearerToken(*it.prm.btoken) + } + cliPrm.SetObject(o.ToV2()) + + res, err := it.client.ObjectPutSingle(ctx, cliPrm) + if err != nil && status.Code(err) == codes.Unimplemented { + return false, err + } + + if err == nil { + id, _ := o.ID() + it.res = ResObjectPut{ + Status: res.Status(), + OID: id, + } + if !it.resolveFrostFSErrors && !apistatus.IsSuccessful(it.res.Status) { + return true, apistatus.ErrFromStatus(it.res.Status) + } + return true, nil + } + return true, err +} diff --git a/pool/pool.go b/pool/pool.go index a3f3b45..94c0813 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -252,6 +252,11 @@ func (x *wrapperPrm) setKey(key ecdsa.PrivateKey) { x.key = key } +// setLogger sets sdkClient.Client logger. +func (x *wrapperPrm) setLogger(logger *zap.Logger) { + x.logger = logger +} + // setDialTimeout sets the timeout for connection to be established. func (x *wrapperPrm) setDialTimeout(timeout time.Duration) { x.dialTimeout = timeout @@ -719,24 +724,8 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut } func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) { - var cliPrm sdkClient.PrmObjectPutInit - cliPrm.SetCopiesNumberByVectors(prm.copiesNumber) - if prm.stoken != nil { - cliPrm.WithinSession(*prm.stoken) - } - if prm.key != nil { - cliPrm.UseKey(*prm.key) - } - if prm.btoken != nil { - cliPrm.WithBearerToken(*prm.btoken) - } - putInitPrm := PrmObjectPutClientCutInit{ - PrmObjectPutInit: cliPrm, - key: prm.key, - maxSize: prm.networkInfo.MaxObjectSize(), - epochSource: prm.networkInfo, - stoken: prm.stoken, + PrmObjectPut: prm, } start := time.Now() @@ -1054,12 +1043,20 @@ func (c *clientStatusMonitor) incErrorRate() { } c.mu.Unlock() - if thresholdReached && c.logger != nil { - c.logger.Warn("error threshold reached", + if thresholdReached { + c.log(zapcore.WarnLevel, "error threshold reached", zap.String("address", c.addr), zap.Uint32("threshold", c.errorThreshold)) } } +func (c *clientStatusMonitor) log(level zapcore.Level, msg string, fields ...zap.Field) { + if c.logger == nil { + return + } + + c.logger.Log(level, msg, fields...) +} + func (c *clientStatusMonitor) currentErrorRate() uint32 { c.mu.RLock() defer c.mu.RUnlock() @@ -1911,6 +1908,7 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) { var prm wrapperPrm prm.setAddress(addr) prm.setKey(*params.key) + prm.setLogger(params.logger) prm.setDialTimeout(params.nodeDialTimeout) prm.setStreamTimeout(params.nodeStreamTimeout) prm.setErrorThreshold(params.errorThreshold)