forked from TrueCloudLab/frostfs-sdk-go
Evgenii Stratonikov
ecb1fef78c
Here is a scenario: 1. `resolveFrostFSErrors` is false. 2. The first object part was not written, which was signified in status. 3. The second part was written correctly. Client now thinks that the object is written even though it was not. In theory we could also return only status, but client-side splitting is not a single RPC, so it makes sense. Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
119 lines
3 KiB
Go
119 lines
3 KiB
Go
package client
|
|
|
|
import (
|
|
"context"
|
|
|
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
func (c *Client) objectPutInitTransformer(prm PrmObjectPutInit) (*objectWriterTransformer, error) {
|
|
var w objectWriterTransformer
|
|
w.it = internalTarget{
|
|
client: c,
|
|
prm: prm,
|
|
}
|
|
key := &c.prm.key
|
|
if prm.key != nil {
|
|
key = prm.key
|
|
}
|
|
w.ot = transformer.NewPayloadSizeLimiter(transformer.Params{
|
|
Key: key,
|
|
NextTargetInit: func() transformer.ObjectWriter { return &w.it },
|
|
MaxSize: prm.maxSize,
|
|
WithoutHomomorphicHash: prm.withoutHomomorphicHash,
|
|
NetworkState: prm.epochSource,
|
|
})
|
|
return &w, nil
|
|
}
|
|
|
|
type objectWriterTransformer struct {
|
|
ot transformer.ChunkedObjectWriter
|
|
it internalTarget
|
|
err error
|
|
}
|
|
|
|
func (x *objectWriterTransformer) WriteHeader(ctx context.Context, hdr object.Object) bool {
|
|
x.err = x.ot.WriteHeader(ctx, &hdr)
|
|
return x.err == nil
|
|
}
|
|
|
|
func (x *objectWriterTransformer) WritePayloadChunk(ctx context.Context, chunk []byte) bool {
|
|
_, x.err = x.ot.Write(ctx, chunk)
|
|
return x.err == nil
|
|
}
|
|
|
|
func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, error) {
|
|
ai, err := x.ot.Close(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if ai != nil && ai.ParentID != nil {
|
|
x.it.res.obj = *ai.ParentID
|
|
}
|
|
return x.it.res, nil
|
|
}
|
|
|
|
type internalTarget struct {
|
|
client *Client
|
|
res *ResObjectPut
|
|
prm PrmObjectPutInit
|
|
useStream bool
|
|
}
|
|
|
|
func (it *internalTarget) WriteObject(ctx context.Context, o *object.Object) error {
|
|
putSingleImplemented, err := it.tryPutSingle(ctx, o)
|
|
if putSingleImplemented {
|
|
return err
|
|
}
|
|
it.useStream = true
|
|
return it.putAsStream(ctx, o)
|
|
}
|
|
|
|
func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) error {
|
|
wrt, err := it.client.objectPutInitRaw(ctx, it.prm)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if wrt.WriteHeader(ctx, *o) {
|
|
wrt.WritePayloadChunk(ctx, o.Payload())
|
|
}
|
|
it.res, err = wrt.Close(ctx)
|
|
if err == nil && !it.client.prm.resolveFrostFSErrors && !apistatus.IsSuccessful(it.res.st) {
|
|
err = apistatus.ErrFromStatus(it.res.st)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (bool, error) {
|
|
if it.useStream {
|
|
return false, nil
|
|
}
|
|
var prm PrmObjectPutSingle
|
|
prm.SetCopiesNumber(it.prm.copyNum)
|
|
prm.SetObject(o.ToV2())
|
|
prm.UseKey(prm.key)
|
|
prm.meta = it.prm.meta
|
|
|
|
res, err := it.client.ObjectPutSingle(ctx, prm)
|
|
if err != nil && status.Code(err) == codes.Unimplemented {
|
|
return false, err
|
|
}
|
|
|
|
if err == nil {
|
|
id, _ := o.ID()
|
|
it.res = &ResObjectPut{
|
|
statusRes: res.statusRes,
|
|
obj: id,
|
|
}
|
|
if !it.client.prm.resolveFrostFSErrors && !apistatus.IsSuccessful(it.res.st) {
|
|
return true, apistatus.ErrFromStatus(it.res.st)
|
|
}
|
|
return true, nil
|
|
}
|
|
return true, err
|
|
}
|