package client import ( "context" buffPool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/util/pool" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) func (c *Client) objectPutInitTransformer(prm PrmObjectPutInit) (*objectWriterTransformer, error) { var w objectWriterTransformer w.it = internalTarget{ client: c, prm: prm, } key := &c.prm.Key if prm.Key != nil { key = prm.Key } w.ot = transformer.NewPayloadSizeLimiter(transformer.Params{ Key: key, NextTargetInit: func() transformer.ObjectWriter { return &w.it }, MaxSize: prm.MaxSize, WithoutHomomorphicHash: prm.WithoutHomomorphHash, NetworkState: prm.EpochSource, Pool: prm.Pool, }) return &w, nil } type objectWriterTransformer struct { ot transformer.ChunkedObjectWriter it internalTarget err error } func (x *objectWriterTransformer) WriteHeader(ctx context.Context, hdr object.Object) bool { x.err = x.ot.WriteHeader(ctx, &hdr) return x.err == nil } func (x *objectWriterTransformer) WritePayloadChunk(ctx context.Context, chunk []byte) bool { _, x.err = x.ot.Write(ctx, chunk) return x.err == nil } func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, error) { if x.err != nil { return nil, x.err } ai, err := x.ot.Close(ctx) if err != nil { return nil, err } if ai != nil { x.it.res.epoch = ai.Epoch if ai.ParentID != nil { x.it.res.obj = *ai.ParentID } } return x.it.res, nil } type internalTarget struct { client *Client res *ResObjectPut prm PrmObjectPutInit useStream bool } func (it *internalTarget) WriteObject(ctx context.Context, o *object.Object) error { putSingleImplemented, err := it.tryPutSingle(ctx, o) if putSingleImplemented { return err } it.useStream = true return it.putAsStream(ctx, o) } func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) error { wrt, err := it.client.objectPutInitRaw(ctx, it.prm) if err != nil { return err } if wrt.WriteHeader(ctx, *o) { wrt.WritePayloadChunk(ctx, o.Payload()) } it.res, err = wrt.Close(ctx) if err == nil && it.client.prm.DisableFrostFSErrorResolution && !apistatus.IsSuccessful(it.res.st) { err = apistatus.ErrFromStatus(it.res.st) } return err } func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (bool, error) { if it.useStream { return false, nil } prm := PrmObjectPutSingle{ XHeaders: it.prm.XHeaders, BearerToken: it.prm.BearerToken, Session: it.prm.Session, Local: it.prm.Local, CopiesNumber: it.prm.CopiesNumber, Object: o, Key: it.prm.Key, } res, err := it.client.ObjectPutSingle(ctx, prm) if err != nil && status.Code(err) == codes.Unimplemented { return false, err } if err == nil { it.returnBuffPool(o.Payload()) id, _ := o.ID() it.res = &ResObjectPut{ statusRes: res.statusRes, obj: id, epoch: res.epoch, } if it.client.prm.DisableFrostFSErrorResolution && !apistatus.IsSuccessful(it.res.st) { return true, apistatus.ErrFromStatus(it.res.st) } return true, nil } return true, err } func (it *internalTarget) returnBuffPool(playback []byte) { if it.prm.Pool == nil { return } var buffer buffPool.Buffer buffer.Data = playback it.prm.Pool.Put(&buffer) }