package pool import ( "context" sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" "go.uber.org/zap" "go.uber.org/zap/zapcore" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) type logger interface { log(level zapcore.Level, msg string, fields ...zap.Field) } type PrmObjectPutClientCutInit struct { PrmObjectPut } func (c *clientWrapper) objectPutInitTransformer(prm PrmObjectPutClientCutInit) (*objectWriterTransformer, error) { cl, err := c.getClient() if err != nil { return nil, err } var w objectWriterTransformer w.it = internalTarget{ client: cl, prm: prm, address: c.address(), logger: &c.clientStatusMonitor, } key := &c.prm.key if prm.key != nil { key = prm.key } w.ot = transformer.NewPayloadSizeLimiter(transformer.Params{ Key: key, NextTargetInit: func() transformer.ObjectWriter { return &w.it }, MaxSize: prm.networkInfo.MaxObjectSize(), WithoutHomomorphicHash: prm.withoutHomomorphicHash, NetworkState: prm.networkInfo, SessionToken: prm.stoken, }) return &w, nil } type objectWriterTransformer struct { ot transformer.ChunkedObjectWriter it internalTarget err error } func (x *objectWriterTransformer) WriteHeader(ctx context.Context, hdr object.Object) bool { x.err = x.ot.WriteHeader(ctx, &hdr) return x.err == nil } func (x *objectWriterTransformer) WritePayloadChunk(ctx context.Context, chunk []byte) bool { _, x.err = x.ot.Write(ctx, chunk) return x.err == nil } // ResObjectPut groups the final result values of ObjectPutInit operation. type ResObjectPut struct { Status apistatus.Status OID oid.ID } // Close return non nil result in any case. If error occurred, the result contains only buffer for further reusing. func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, error) { ai, err := x.ot.Close(ctx) if err != nil { return nil, err } if ai != nil && ai.ParentID != nil { x.it.res.OID = *ai.ParentID } return &x.it.res, nil } type internalTarget struct { client *sdkClient.Client res ResObjectPut prm PrmObjectPutClientCutInit useStream bool address string logger logger resolveFrostFSErrors bool } func (it *internalTarget) WriteObject(ctx context.Context, o *object.Object) error { putSingleImplemented, err := it.tryPutSingle(ctx, o) if putSingleImplemented { return err } it.logger.log(zapcore.DebugLevel, "putSingle not implemented, trying put as stream", zap.String("address", it.address)) it.useStream = true return it.putAsStream(ctx, o) } func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) error { var cliPrm sdkClient.PrmObjectPutInit cliPrm.SetCopiesNumberByVectors(it.prm.copiesNumber) if it.prm.stoken != nil { cliPrm.WithinSession(*it.prm.stoken) } if it.prm.key != nil { cliPrm.UseKey(*it.prm.key) } if it.prm.btoken != nil { cliPrm.WithBearerToken(*it.prm.btoken) } wrt, err := it.client.ObjectPutInit(ctx, cliPrm) if err != nil { return err } if wrt.WriteHeader(ctx, *o) { wrt.WritePayloadChunk(ctx, o.Payload()) } res, err := wrt.Close(ctx) if res != nil { it.res.Status = res.Status() it.res.OID = res.StoredObjectID() } return err } func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (bool, error) { if it.useStream { return false, nil } var cliPrm sdkClient.PrmObjectPutSingle cliPrm.SetCopiesNumber(it.prm.copiesNumber) cliPrm.UseKey(it.prm.key) if it.prm.stoken != nil { cliPrm.WithinSession(*it.prm.stoken) } if it.prm.btoken != nil { cliPrm.WithBearerToken(*it.prm.btoken) } cliPrm.SetObject(o.ToV2()) res, err := it.client.ObjectPutSingle(ctx, cliPrm) if err != nil && status.Code(err) == codes.Unimplemented { return false, err } if err == nil { id, _ := o.ID() it.res = ResObjectPut{ Status: res.Status(), OID: id, } if !it.resolveFrostFSErrors && !apistatus.IsSuccessful(it.res.Status) { return true, apistatus.ErrFromStatus(it.res.Status) } return true, nil } return true, err }