144 lines
3.5 KiB
Go
144 lines
3.5 KiB
Go
package client
|
|
|
|
import (
|
|
"context"
|
|
|
|
buffPool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/util/pool"
|
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
func (c *Client) objectPutInitTransformer(prm PrmObjectPutInit) (*objectWriterTransformer, error) {
|
|
var w objectWriterTransformer
|
|
w.it = internalTarget{
|
|
client: c,
|
|
prm: prm,
|
|
}
|
|
key := &c.prm.Key
|
|
if prm.Key != nil {
|
|
key = prm.Key
|
|
}
|
|
w.ot = transformer.NewPayloadSizeLimiter(transformer.Params{
|
|
Key: key,
|
|
NextTargetInit: func() transformer.ObjectWriter { return &w.it },
|
|
MaxSize: prm.MaxSize,
|
|
WithoutHomomorphicHash: prm.WithoutHomomorphHash,
|
|
NetworkState: prm.EpochSource,
|
|
Pool: prm.Pool,
|
|
})
|
|
return &w, nil
|
|
}
|
|
|
|
type objectWriterTransformer struct {
|
|
ot transformer.ChunkedObjectWriter
|
|
it internalTarget
|
|
err error
|
|
}
|
|
|
|
func (x *objectWriterTransformer) WriteHeader(ctx context.Context, hdr object.Object) bool {
|
|
x.err = x.ot.WriteHeader(ctx, &hdr)
|
|
return x.err == nil
|
|
}
|
|
|
|
func (x *objectWriterTransformer) WritePayloadChunk(ctx context.Context, chunk []byte) bool {
|
|
_, x.err = x.ot.Write(ctx, chunk)
|
|
return x.err == nil
|
|
}
|
|
|
|
func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, error) {
|
|
if x.err != nil {
|
|
return nil, x.err
|
|
}
|
|
|
|
ai, err := x.ot.Close(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if ai != nil {
|
|
x.it.res.epoch = ai.Epoch
|
|
if ai.ParentID != nil {
|
|
x.it.res.obj = *ai.ParentID
|
|
}
|
|
}
|
|
return x.it.res, nil
|
|
}
|
|
|
|
type internalTarget struct {
|
|
client *Client
|
|
res *ResObjectPut
|
|
prm PrmObjectPutInit
|
|
useStream bool
|
|
}
|
|
|
|
func (it *internalTarget) WriteObject(ctx context.Context, o *object.Object) error {
|
|
putSingleImplemented, err := it.tryPutSingle(ctx, o)
|
|
if putSingleImplemented {
|
|
return err
|
|
}
|
|
it.useStream = true
|
|
return it.putAsStream(ctx, o)
|
|
}
|
|
|
|
func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) error {
|
|
wrt, err := it.client.objectPutInitRaw(ctx, it.prm)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if wrt.WriteHeader(ctx, *o) {
|
|
wrt.WritePayloadChunk(ctx, o.Payload())
|
|
}
|
|
it.res, err = wrt.Close(ctx)
|
|
if err == nil && it.client.prm.DisableFrostFSErrorResolution && !apistatus.IsSuccessful(it.res.st) {
|
|
err = apistatus.ErrFromStatus(it.res.st)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (bool, error) {
|
|
if it.useStream {
|
|
return false, nil
|
|
}
|
|
|
|
prm := PrmObjectPutSingle{
|
|
XHeaders: it.prm.XHeaders,
|
|
BearerToken: it.prm.BearerToken,
|
|
Session: it.prm.Session,
|
|
Local: it.prm.Local,
|
|
CopiesNumber: it.prm.CopiesNumber,
|
|
Object: o,
|
|
Key: it.prm.Key,
|
|
}
|
|
|
|
res, err := it.client.ObjectPutSingle(ctx, prm)
|
|
if err != nil && status.Code(err) == codes.Unimplemented {
|
|
return false, err
|
|
}
|
|
|
|
if err == nil {
|
|
it.returnBuffPool(o.Payload())
|
|
id, _ := o.ID()
|
|
it.res = &ResObjectPut{
|
|
statusRes: res.statusRes,
|
|
obj: id,
|
|
epoch: res.epoch,
|
|
}
|
|
if it.client.prm.DisableFrostFSErrorResolution && !apistatus.IsSuccessful(it.res.st) {
|
|
return true, apistatus.ErrFromStatus(it.res.st)
|
|
}
|
|
return true, nil
|
|
}
|
|
return true, err
|
|
}
|
|
|
|
func (it *internalTarget) returnBuffPool(playback []byte) {
|
|
if it.prm.Pool == nil {
|
|
return
|
|
}
|
|
var buffer buffPool.Buffer
|
|
buffer.Data = playback
|
|
it.prm.Pool.Put(&buffer)
|
|
}
|