forked from TrueCloudLab/frostfs-sdk-go
[#83] pkg/client: Support status returns
Make all `Client` methods to return structured values and error. Parse v2 status messages in all RPC and provide status getter from all result structures. Returns status failures as status result instead of error. Interface changes: * all methods return `<method>Res` structure; * rename some methods to be more clear; * unify TZ and SHA256 objecy payload hashing in single method. Behavior changes: * client doesn't verify object header structure received via Object.Head. If the caller was tied to verification, now it must do it explicitly. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
9dcff95a29
commit
bf78cddf69
8 changed files with 835 additions and 372 deletions
495
client/object.go
495
client/object.go
|
@ -4,7 +4,6 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
@ -15,6 +14,7 @@ import (
|
|||
"github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
|
||||
v2session "github.com/nspcc-dev/neofs-api-go/v2/session"
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/signature"
|
||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
signer "github.com/nspcc-dev/neofs-sdk-go/util/signature"
|
||||
|
@ -23,28 +23,25 @@ import (
|
|||
// Object contains methods for working with objects.
|
||||
type Object interface {
|
||||
// PutObject puts new object to NeoFS.
|
||||
PutObject(context.Context, *PutObjectParams, ...CallOption) (*object.ID, error)
|
||||
PutObject(context.Context, *PutObjectParams, ...CallOption) (*ObjectPutRes, error)
|
||||
|
||||
// DeleteObject deletes object to NeoFS.
|
||||
DeleteObject(context.Context, *DeleteObjectParams, ...CallOption) error
|
||||
DeleteObject(context.Context, *DeleteObjectParams, ...CallOption) (*ObjectDeleteRes, error)
|
||||
|
||||
// GetObject returns object stored in NeoFS.
|
||||
GetObject(context.Context, *GetObjectParams, ...CallOption) (*object.Object, error)
|
||||
GetObject(context.Context, *GetObjectParams, ...CallOption) (*ObjectGetRes, error)
|
||||
|
||||
// GetObjectHeader returns object header.
|
||||
GetObjectHeader(context.Context, *ObjectHeaderParams, ...CallOption) (*object.Object, error)
|
||||
// HeadObject returns object header.
|
||||
HeadObject(context.Context, *ObjectHeaderParams, ...CallOption) (*ObjectHeadRes, error)
|
||||
|
||||
// ObjectPayloadRangeData returns range of object payload.
|
||||
ObjectPayloadRangeData(context.Context, *RangeDataParams, ...CallOption) ([]byte, error)
|
||||
ObjectPayloadRangeData(context.Context, *RangeDataParams, ...CallOption) (*ObjectRangeRes, error)
|
||||
|
||||
// ObjectPayloadRangeSHA256 returns sha-256 hashes of object sub-ranges from NeoFS.
|
||||
ObjectPayloadRangeSHA256(context.Context, *RangeChecksumParams, ...CallOption) ([][sha256.Size]byte, error)
|
||||
// HashObjectPayloadRanges returns hashes of the object payload ranges from NeoFS.
|
||||
HashObjectPayloadRanges(context.Context, *RangeChecksumParams, ...CallOption) (*ObjectRangeHashRes, error)
|
||||
|
||||
// ObjectPayloadRangeTZ returns homomorphic hashes of object sub-ranges from NeoFS.
|
||||
ObjectPayloadRangeTZ(context.Context, *RangeChecksumParams, ...CallOption) ([][TZSize]byte, error)
|
||||
|
||||
// SearchObject searches for objects in NeoFS using provided parameters.
|
||||
SearchObject(context.Context, *SearchObjectParams, ...CallOption) ([]*object.ID, error)
|
||||
// SearchObjects searches for objects in NeoFS using provided parameters.
|
||||
SearchObjects(context.Context, *SearchObjectParams, ...CallOption) (*ObjectSearchRes, error)
|
||||
}
|
||||
|
||||
type PutObjectParams struct {
|
||||
|
@ -59,10 +56,6 @@ type ObjectAddressWriter interface {
|
|||
SetAddress(*object.Address)
|
||||
}
|
||||
|
||||
type objectAddressWriter struct {
|
||||
addr *object.Address
|
||||
}
|
||||
|
||||
type DeleteObjectParams struct {
|
||||
addr *object.Address
|
||||
|
||||
|
@ -98,7 +91,7 @@ type RangeDataParams struct {
|
|||
}
|
||||
|
||||
type RangeChecksumParams struct {
|
||||
typ checksumType
|
||||
tz bool
|
||||
|
||||
addr *object.Address
|
||||
|
||||
|
@ -141,17 +134,11 @@ const TZSize = 64
|
|||
|
||||
const searchQueryVersion uint32 = 1
|
||||
|
||||
var errNilObjectPart = errors.New("received nil object part")
|
||||
|
||||
func (w *objectAddressWriter) SetAddress(addr *object.Address) {
|
||||
w.addr = addr
|
||||
}
|
||||
|
||||
func rangesToV2(rs []*object.Range) []*v2object.Range {
|
||||
r2 := make([]*v2object.Range, len(rs))
|
||||
r2 := make([]*v2object.Range, 0, len(rs))
|
||||
|
||||
for i := range rs {
|
||||
r2[i] = rs[i].ToV2()
|
||||
r2 = append(r2, rs[i].ToV2())
|
||||
}
|
||||
|
||||
return r2
|
||||
|
@ -220,7 +207,21 @@ func (p *PutObjectParams) PayloadReader() io.Reader {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *clientImpl) PutObject(ctx context.Context, p *PutObjectParams, opts ...CallOption) (*object.ID, error) {
|
||||
type ObjectPutRes struct {
|
||||
statusRes
|
||||
|
||||
id *object.ID
|
||||
}
|
||||
|
||||
func (x *ObjectPutRes) setID(id *object.ID) {
|
||||
x.id = id
|
||||
}
|
||||
|
||||
func (x ObjectPutRes) ID() *object.ID {
|
||||
return x.id
|
||||
}
|
||||
|
||||
func (c *clientImpl) PutObject(ctx context.Context, p *PutObjectParams, opts ...CallOption) (*ObjectPutRes, error) {
|
||||
callOpts := c.defaultCallOptions()
|
||||
|
||||
for i := range opts {
|
||||
|
@ -313,20 +314,32 @@ func (c *clientImpl) PutObject(ctx context.Context, p *PutObjectParams, opts ...
|
|||
return nil, fmt.Errorf("closing the stream failed: %w", err)
|
||||
}
|
||||
|
||||
// handle response meta info
|
||||
if err := c.handleResponseInfoV2(callOpts, resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var (
|
||||
res = new(ObjectPutRes)
|
||||
procPrm processResponseV2Prm
|
||||
procRes processResponseV2Res
|
||||
)
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||
return nil, fmt.Errorf("response verification failed: %w", err)
|
||||
procPrm.callOpts = callOpts
|
||||
procPrm.resp = resp
|
||||
|
||||
procRes.statusRes = res
|
||||
|
||||
// process response in general
|
||||
if c.processResponseV2(&procRes, procPrm) {
|
||||
if procRes.cliErr != nil {
|
||||
return nil, procRes.cliErr
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// convert object identifier
|
||||
id := object.NewIDFromV2(resp.GetBody().GetObjectID())
|
||||
|
||||
return id, nil
|
||||
res.setID(id)
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (p *DeleteObjectParams) WithAddress(v *object.Address) *DeleteObjectParams {
|
||||
|
@ -363,24 +376,24 @@ func (p *DeleteObjectParams) TombstoneAddressTarget() ObjectAddressWriter {
|
|||
return nil
|
||||
}
|
||||
|
||||
// DeleteObject is a wrapper over Client.DeleteObject method
|
||||
// that provides the ability to receive tombstone address
|
||||
// without setting a target in the parameters.
|
||||
func DeleteObject(ctx context.Context, c Client, p *DeleteObjectParams, opts ...CallOption) (*object.Address, error) {
|
||||
w := new(objectAddressWriter)
|
||||
type ObjectDeleteRes struct {
|
||||
statusRes
|
||||
|
||||
err := c.DeleteObject(ctx, p.WithTombstoneAddressTarget(w), opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tombAddr *object.Address
|
||||
}
|
||||
|
||||
return w.addr, nil
|
||||
func (x ObjectDeleteRes) TombstoneAddress() *object.Address {
|
||||
return x.tombAddr
|
||||
}
|
||||
|
||||
func (x *ObjectDeleteRes) setTombstoneAddress(addr *object.Address) {
|
||||
x.tombAddr = addr
|
||||
}
|
||||
|
||||
// DeleteObject removes object by address.
|
||||
//
|
||||
// If target of tombstone address is not set, the address is ignored.
|
||||
func (c *clientImpl) DeleteObject(ctx context.Context, p *DeleteObjectParams, opts ...CallOption) error {
|
||||
func (c *clientImpl) DeleteObject(ctx context.Context, p *DeleteObjectParams, opts ...CallOption) (*ObjectDeleteRes, error) {
|
||||
callOpts := c.defaultCallOptions()
|
||||
|
||||
for i := range opts {
|
||||
|
@ -403,7 +416,7 @@ func (c *clientImpl) DeleteObject(ctx context.Context, p *DeleteObjectParams, op
|
|||
addr: p.addr.ToV2(),
|
||||
verb: v2session.ObjectVerbDelete,
|
||||
}); err != nil {
|
||||
return fmt.Errorf("could not attach session token: %w", err)
|
||||
return nil, fmt.Errorf("could not attach session token: %w", err)
|
||||
}
|
||||
|
||||
req.SetMetaHeader(meta)
|
||||
|
@ -413,30 +426,40 @@ func (c *clientImpl) DeleteObject(ctx context.Context, p *DeleteObjectParams, op
|
|||
|
||||
// sign the request
|
||||
if err := signature.SignServiceMessage(callOpts.key, req); err != nil {
|
||||
return fmt.Errorf("signing the request failed: %w", err)
|
||||
return nil, fmt.Errorf("signing the request failed: %w", err)
|
||||
}
|
||||
|
||||
// send request
|
||||
resp, err := rpcapi.DeleteObject(c.Raw(), req, client.WithContext(ctx))
|
||||
if err != nil {
|
||||
return fmt.Errorf("sending the request failed: %w", err)
|
||||
return nil, fmt.Errorf("sending the request failed: %w", err)
|
||||
}
|
||||
|
||||
// handle response meta info
|
||||
if err := c.handleResponseInfoV2(callOpts, resp); err != nil {
|
||||
return err
|
||||
var (
|
||||
res = new(ObjectDeleteRes)
|
||||
procPrm processResponseV2Prm
|
||||
procRes processResponseV2Res
|
||||
)
|
||||
|
||||
procPrm.callOpts = callOpts
|
||||
procPrm.resp = resp
|
||||
|
||||
procRes.statusRes = res
|
||||
|
||||
// process response in general
|
||||
if c.processResponseV2(&procRes, procPrm) {
|
||||
if procRes.cliErr != nil {
|
||||
return nil, procRes.cliErr
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||
return fmt.Errorf("response verification failed: %w", err)
|
||||
}
|
||||
addrv2 := resp.GetBody().GetTombstone()
|
||||
|
||||
if p.tombTgt != nil {
|
||||
p.tombTgt.SetAddress(object.NewAddressFromV2(resp.GetBody().GetTombstone()))
|
||||
}
|
||||
res.setTombstoneAddress(object.NewAddressFromV2(addrv2))
|
||||
|
||||
return nil
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (p *GetObjectParams) WithAddress(v *object.Address) *GetObjectParams {
|
||||
|
@ -567,7 +590,32 @@ func (x *objectPayloadReader) Read(p []byte) (read int, err error) {
|
|||
|
||||
var errWrongMessageSeq = errors.New("incorrect message sequence")
|
||||
|
||||
func (c *clientImpl) GetObject(ctx context.Context, p *GetObjectParams, opts ...CallOption) (*object.Object, error) {
|
||||
type ObjectGetRes struct {
|
||||
statusRes
|
||||
objectRes
|
||||
}
|
||||
|
||||
type objectRes struct {
|
||||
obj *object.Object
|
||||
}
|
||||
|
||||
func (x *objectRes) setObject(obj *object.Object) {
|
||||
x.obj = obj
|
||||
}
|
||||
|
||||
func (x objectRes) Object() *object.Object {
|
||||
return x.obj
|
||||
}
|
||||
|
||||
func writeUnexpectedMessageTypeErr(res resCommon, val interface{}) {
|
||||
var st apistatus.ServerInternal // specific API status should be used
|
||||
|
||||
apistatus.WriteInternalServerErr(&st, fmt.Errorf("unexpected message type %T", val))
|
||||
|
||||
res.setStatus(st)
|
||||
}
|
||||
|
||||
func (c *clientImpl) GetObject(ctx context.Context, p *GetObjectParams, opts ...CallOption) (*ObjectGetRes, error) {
|
||||
callOpts := c.defaultCallOptions()
|
||||
|
||||
for i := range opts {
|
||||
|
@ -615,16 +663,27 @@ func (c *clientImpl) GetObject(ctx context.Context, p *GetObjectParams, opts ...
|
|||
payload []byte
|
||||
obj = new(v2object.Object)
|
||||
resp = new(v2object.GetResponse)
|
||||
|
||||
messageWas bool
|
||||
|
||||
res = new(ObjectGetRes)
|
||||
procPrm processResponseV2Prm
|
||||
procRes processResponseV2Res
|
||||
)
|
||||
|
||||
procPrm.callOpts = callOpts
|
||||
procPrm.resp = resp
|
||||
|
||||
procRes.statusRes = res
|
||||
|
||||
loop:
|
||||
for {
|
||||
// receive message from server stream
|
||||
err := stream.Read(resp)
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
if !headWas {
|
||||
return nil, io.ErrUnexpectedEOF
|
||||
if !messageWas {
|
||||
return nil, errWrongMessageSeq
|
||||
}
|
||||
|
||||
break
|
||||
|
@ -633,19 +692,20 @@ loop:
|
|||
return nil, fmt.Errorf("reading the response failed: %w", err)
|
||||
}
|
||||
|
||||
// handle response meta info
|
||||
if err := c.handleResponseInfoV2(callOpts, resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
messageWas = true
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||
return nil, fmt.Errorf("response verification failed: %w", err)
|
||||
// process response in general
|
||||
if c.processResponseV2(&procRes, procPrm) {
|
||||
if procRes.cliErr != nil {
|
||||
return nil, procRes.cliErr
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
switch v := resp.GetBody().GetObjectPart().(type) {
|
||||
default:
|
||||
return nil, fmt.Errorf("unexpected object part %T", v)
|
||||
return nil, errWrongMessageSeq
|
||||
case *v2object.GetObjectPartInit:
|
||||
if headWas {
|
||||
return nil, errWrongMessageSeq
|
||||
|
@ -683,6 +743,10 @@ loop:
|
|||
payload = append(payload, v.GetChunk()...)
|
||||
}
|
||||
case *v2object.SplitInfo:
|
||||
if headWas {
|
||||
return nil, errWrongMessageSeq
|
||||
}
|
||||
|
||||
si := object.NewSplitInfoFromV2(v)
|
||||
return nil, object.NewSplitInfoError(si)
|
||||
}
|
||||
|
@ -691,7 +755,9 @@ loop:
|
|||
obj.SetPayload(payload)
|
||||
|
||||
// convert the object
|
||||
return object.NewFromV2(obj), nil
|
||||
res.setObject(object.NewFromV2(obj))
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (p *ObjectHeaderParams) WithAddress(v *object.Address) *ObjectHeaderParams {
|
||||
|
@ -752,7 +818,12 @@ func (p *ObjectHeaderParams) RawFlag() bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (c *clientImpl) GetObjectHeader(ctx context.Context, p *ObjectHeaderParams, opts ...CallOption) (*object.Object, error) {
|
||||
type ObjectHeadRes struct {
|
||||
statusRes
|
||||
objectRes
|
||||
}
|
||||
|
||||
func (c *clientImpl) HeadObject(ctx context.Context, p *ObjectHeaderParams, opts ...CallOption) (*ObjectHeadRes, error) {
|
||||
callOpts := c.defaultCallOptions()
|
||||
|
||||
for i := range opts {
|
||||
|
@ -796,14 +867,24 @@ func (c *clientImpl) GetObjectHeader(ctx context.Context, p *ObjectHeaderParams,
|
|||
return nil, fmt.Errorf("sending the request failed: %w", err)
|
||||
}
|
||||
|
||||
// handle response meta info
|
||||
if err := c.handleResponseInfoV2(callOpts, resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var (
|
||||
res = new(ObjectHeadRes)
|
||||
procPrm processResponseV2Prm
|
||||
procRes processResponseV2Res
|
||||
)
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||
return nil, fmt.Errorf("response verification failed: %w", err)
|
||||
procPrm.callOpts = callOpts
|
||||
procPrm.resp = resp
|
||||
|
||||
procRes.statusRes = res
|
||||
|
||||
// process response in general
|
||||
if c.processResponseV2(&procRes, procPrm) {
|
||||
if procRes.cliErr != nil {
|
||||
return nil, procRes.cliErr
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -813,12 +894,12 @@ func (c *clientImpl) GetObjectHeader(ctx context.Context, p *ObjectHeaderParams,
|
|||
|
||||
switch v := resp.GetBody().GetHeaderPart().(type) {
|
||||
case nil:
|
||||
return nil, fmt.Errorf("unexpected header type %T", v)
|
||||
writeUnexpectedMessageTypeErr(res, v)
|
||||
return res, nil
|
||||
case *v2object.ShortHeader:
|
||||
if !p.short {
|
||||
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
|
||||
(*v2object.ShortHeader)(nil), (*v2object.HeaderWithSignature)(nil),
|
||||
)
|
||||
writeUnexpectedMessageTypeErr(res, v)
|
||||
return res, nil
|
||||
}
|
||||
|
||||
h := v
|
||||
|
@ -833,29 +914,12 @@ func (c *clientImpl) GetObjectHeader(ctx context.Context, p *ObjectHeaderParams,
|
|||
hdr.SetHomomorphicHash(h.GetHomomorphicHash())
|
||||
case *v2object.HeaderWithSignature:
|
||||
if p.short {
|
||||
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
|
||||
(*v2object.HeaderWithSignature)(nil), (*v2object.ShortHeader)(nil),
|
||||
)
|
||||
writeUnexpectedMessageTypeErr(res, v)
|
||||
return res, nil
|
||||
}
|
||||
|
||||
hdrWithSig := v
|
||||
if hdrWithSig == nil {
|
||||
return nil, errNilObjectPart
|
||||
}
|
||||
|
||||
hdr = hdrWithSig.GetHeader()
|
||||
idSig = hdrWithSig.GetSignature()
|
||||
|
||||
if err := signer.VerifyDataWithSource(
|
||||
signature.StableMarshalerWrapper{
|
||||
SM: p.addr.ObjectID().ToV2(),
|
||||
},
|
||||
func() (key, sig []byte) {
|
||||
return idSig.GetKey(), idSig.GetSign()
|
||||
},
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("incorrect object header signature: %w", err)
|
||||
}
|
||||
hdr = v.GetHeader()
|
||||
idSig = v.GetSignature()
|
||||
case *v2object.SplitInfo:
|
||||
si := object.NewSplitInfoFromV2(v)
|
||||
|
||||
|
@ -869,8 +933,9 @@ func (c *clientImpl) GetObjectHeader(ctx context.Context, p *ObjectHeaderParams,
|
|||
raw := object.NewRawFromV2(obj)
|
||||
raw.SetID(p.addr.ObjectID())
|
||||
|
||||
// convert the object
|
||||
return raw.Object(), nil
|
||||
res.setObject(raw.Object())
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (p *RangeDataParams) WithAddress(v *object.Address) *RangeDataParams {
|
||||
|
@ -937,7 +1002,21 @@ func (p *RangeDataParams) DataWriter() io.Writer {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *clientImpl) ObjectPayloadRangeData(ctx context.Context, p *RangeDataParams, opts ...CallOption) ([]byte, error) {
|
||||
type ObjectRangeRes struct {
|
||||
statusRes
|
||||
|
||||
data []byte
|
||||
}
|
||||
|
||||
func (x *ObjectRangeRes) setData(data []byte) {
|
||||
x.data = data
|
||||
}
|
||||
|
||||
func (x ObjectRangeRes) Data() []byte {
|
||||
return x.data
|
||||
}
|
||||
|
||||
func (c *clientImpl) ObjectPayloadRangeData(ctx context.Context, p *RangeDataParams, opts ...CallOption) (*ObjectRangeRes, error) {
|
||||
callOpts := c.defaultCallOptions()
|
||||
|
||||
for i := range opts {
|
||||
|
@ -986,33 +1065,54 @@ func (c *clientImpl) ObjectPayloadRangeData(ctx context.Context, p *RangeDataPar
|
|||
payload = make([]byte, 0, p.r.GetLength())
|
||||
}
|
||||
|
||||
resp := new(v2object.GetRangeResponse)
|
||||
var (
|
||||
resp = new(v2object.GetRangeResponse)
|
||||
|
||||
chunkWas, messageWas bool
|
||||
|
||||
res = new(ObjectRangeRes)
|
||||
procPrm processResponseV2Prm
|
||||
procRes processResponseV2Res
|
||||
)
|
||||
|
||||
procPrm.callOpts = callOpts
|
||||
procPrm.resp = resp
|
||||
|
||||
procRes.statusRes = res
|
||||
|
||||
for {
|
||||
// receive message from server stream
|
||||
err := stream.Read(resp)
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
if !messageWas {
|
||||
return nil, errWrongMessageSeq
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("reading the response failed: %w", err)
|
||||
}
|
||||
|
||||
// handle response meta info
|
||||
if err := c.handleResponseInfoV2(callOpts, resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
messageWas = true
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||
return nil, fmt.Errorf("could not verify %T: %w", resp, err)
|
||||
// process response in general
|
||||
if c.processResponseV2(&procRes, procPrm) {
|
||||
if procRes.cliErr != nil {
|
||||
return nil, procRes.cliErr
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
switch v := resp.GetBody().GetRangePart().(type) {
|
||||
case nil:
|
||||
return nil, fmt.Errorf("unexpected range type %T", v)
|
||||
writeUnexpectedMessageTypeErr(res, v)
|
||||
return res, nil
|
||||
case *v2object.GetRangePartChunk:
|
||||
chunkWas = true
|
||||
|
||||
if p.w != nil {
|
||||
if _, err = p.w.Write(v.GetChunk()); err != nil {
|
||||
return nil, fmt.Errorf("could not write payload chunk: %w", err)
|
||||
|
@ -1021,13 +1121,19 @@ func (c *clientImpl) ObjectPayloadRangeData(ctx context.Context, p *RangeDataPar
|
|||
payload = append(payload, v.GetChunk()...)
|
||||
}
|
||||
case *v2object.SplitInfo:
|
||||
if chunkWas {
|
||||
return nil, errWrongMessageSeq
|
||||
}
|
||||
|
||||
si := object.NewSplitInfoFromV2(v)
|
||||
|
||||
return nil, object.NewSplitInfoError(si)
|
||||
}
|
||||
}
|
||||
|
||||
return payload, nil
|
||||
res.setData(payload)
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (p *RangeChecksumParams) WithAddress(v *object.Address) *RangeChecksumParams {
|
||||
|
@ -1078,33 +1184,26 @@ func (p *RangeChecksumParams) Salt() []byte {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (p *RangeChecksumParams) withChecksumType(t checksumType) *RangeChecksumParams {
|
||||
if p != nil {
|
||||
p.typ = t
|
||||
}
|
||||
|
||||
func (p *RangeChecksumParams) TZ() *RangeChecksumParams {
|
||||
p.tz = true
|
||||
return p
|
||||
}
|
||||
|
||||
func (c *clientImpl) ObjectPayloadRangeSHA256(ctx context.Context, p *RangeChecksumParams, opts ...CallOption) ([][sha256.Size]byte, error) {
|
||||
res, err := c.objectPayloadRangeHash(ctx, p.withChecksumType(checksumSHA256), opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
type ObjectRangeHashRes struct {
|
||||
statusRes
|
||||
|
||||
return res.([][sha256.Size]byte), nil
|
||||
hashes [][]byte
|
||||
}
|
||||
|
||||
func (c *clientImpl) ObjectPayloadRangeTZ(ctx context.Context, p *RangeChecksumParams, opts ...CallOption) ([][TZSize]byte, error) {
|
||||
res, err := c.objectPayloadRangeHash(ctx, p.withChecksumType(checksumTZ), opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return res.([][TZSize]byte), nil
|
||||
func (x *ObjectRangeHashRes) setHashes(v [][]byte) {
|
||||
x.hashes = v
|
||||
}
|
||||
|
||||
func (c *clientImpl) objectPayloadRangeHash(ctx context.Context, p *RangeChecksumParams, opts ...CallOption) (interface{}, error) {
|
||||
func (x ObjectRangeHashRes) Hashes() [][]byte {
|
||||
return x.hashes
|
||||
}
|
||||
|
||||
func (c *clientImpl) HashObjectPayloadRanges(ctx context.Context, p *RangeChecksumParams, opts ...CallOption) (*ObjectRangeHashRes, error) {
|
||||
callOpts := c.defaultCallOptions()
|
||||
|
||||
for i := range opts {
|
||||
|
@ -1136,7 +1235,12 @@ func (c *clientImpl) objectPayloadRangeHash(ctx context.Context, p *RangeChecksu
|
|||
body.SetAddress(p.addr.ToV2())
|
||||
body.SetSalt(p.salt)
|
||||
|
||||
typV2 := p.typ.toV2()
|
||||
typ := checksumSHA256
|
||||
if p.tz {
|
||||
typ = checksumTZ
|
||||
}
|
||||
|
||||
typV2 := typ.toV2()
|
||||
body.SetType(typV2)
|
||||
|
||||
rsV2 := rangesToV2(p.rs)
|
||||
|
@ -1153,61 +1257,28 @@ func (c *clientImpl) objectPayloadRangeHash(ctx context.Context, p *RangeChecksu
|
|||
return nil, fmt.Errorf("sending the request failed: %w", err)
|
||||
}
|
||||
|
||||
// handle response meta info
|
||||
if err := c.handleResponseInfoV2(callOpts, resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var (
|
||||
res = new(ObjectRangeHashRes)
|
||||
procPrm processResponseV2Prm
|
||||
procRes processResponseV2Res
|
||||
)
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||
return nil, fmt.Errorf("response verification failed: %w", err)
|
||||
}
|
||||
procPrm.callOpts = callOpts
|
||||
procPrm.resp = resp
|
||||
|
||||
respBody := resp.GetBody()
|
||||
respType := respBody.GetType()
|
||||
respHashes := respBody.GetHashList()
|
||||
procRes.statusRes = res
|
||||
|
||||
if t := p.typ.toV2(); respType != t {
|
||||
return nil, fmt.Errorf("invalid checksum type: expected %v, received %v", t, respType)
|
||||
} else if reqLn, respLn := len(rsV2), len(respHashes); reqLn != respLn {
|
||||
return nil, fmt.Errorf("wrong checksum number: expected %d, received %d", reqLn, respLn)
|
||||
}
|
||||
|
||||
var res interface{}
|
||||
|
||||
switch p.typ {
|
||||
case checksumSHA256:
|
||||
r := make([][sha256.Size]byte, 0, len(respHashes))
|
||||
|
||||
for i := range respHashes {
|
||||
if ln := len(respHashes[i]); ln != sha256.Size {
|
||||
return nil, fmt.Errorf("invalid checksum length: expected %d, received %d", sha256.Size, ln)
|
||||
}
|
||||
|
||||
cs := [sha256.Size]byte{}
|
||||
copy(cs[:], respHashes[i])
|
||||
|
||||
r = append(r, cs)
|
||||
// process response in general
|
||||
if c.processResponseV2(&procRes, procPrm) {
|
||||
if procRes.cliErr != nil {
|
||||
return nil, procRes.cliErr
|
||||
}
|
||||
|
||||
res = r
|
||||
case checksumTZ:
|
||||
r := make([][TZSize]byte, 0, len(respHashes))
|
||||
|
||||
for i := range respHashes {
|
||||
if ln := len(respHashes[i]); ln != TZSize {
|
||||
return nil, fmt.Errorf("invalid checksum length: expected %d, received %d", TZSize, ln)
|
||||
}
|
||||
|
||||
cs := [TZSize]byte{}
|
||||
copy(cs[:], respHashes[i])
|
||||
|
||||
r = append(r, cs)
|
||||
}
|
||||
|
||||
res = r
|
||||
return res, nil
|
||||
}
|
||||
|
||||
res.setHashes(resp.GetBody().GetHashList())
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
|
@ -1243,7 +1314,21 @@ func (p *SearchObjectParams) SearchFilters() object.SearchFilters {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *clientImpl) SearchObject(ctx context.Context, p *SearchObjectParams, opts ...CallOption) ([]*object.ID, error) {
|
||||
type ObjectSearchRes struct {
|
||||
statusRes
|
||||
|
||||
ids []*object.ID
|
||||
}
|
||||
|
||||
func (x *ObjectSearchRes) setIDList(v []*object.ID) {
|
||||
x.ids = v
|
||||
}
|
||||
|
||||
func (x ObjectSearchRes) IDList() []*object.ID {
|
||||
return x.ids
|
||||
}
|
||||
|
||||
func (c *clientImpl) SearchObjects(ctx context.Context, p *SearchObjectParams, opts ...CallOption) (*ObjectSearchRes, error) {
|
||||
callOpts := c.defaultCallOptions()
|
||||
|
||||
for i := range opts {
|
||||
|
@ -1293,27 +1378,43 @@ func (c *clientImpl) SearchObject(ctx context.Context, p *SearchObjectParams, op
|
|||
var (
|
||||
searchResult []*object.ID
|
||||
resp = new(v2object.SearchResponse)
|
||||
|
||||
messageWas bool
|
||||
|
||||
res = new(ObjectSearchRes)
|
||||
procPrm processResponseV2Prm
|
||||
procRes processResponseV2Res
|
||||
)
|
||||
|
||||
procPrm.callOpts = callOpts
|
||||
procPrm.resp = resp
|
||||
|
||||
procRes.statusRes = res
|
||||
|
||||
for {
|
||||
// receive message from server stream
|
||||
err := stream.Read(resp)
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
if !messageWas {
|
||||
return nil, errWrongMessageSeq
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("reading the response failed: %w", err)
|
||||
}
|
||||
|
||||
// handle response meta info
|
||||
if err := c.handleResponseInfoV2(callOpts, resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
messageWas = true
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||
return nil, fmt.Errorf("could not verify %T: %w", resp, err)
|
||||
// process response in general
|
||||
if c.processResponseV2(&procRes, procPrm) {
|
||||
if procRes.cliErr != nil {
|
||||
return nil, procRes.cliErr
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
chunk := resp.GetBody().GetIDList()
|
||||
|
@ -1322,7 +1423,9 @@ func (c *clientImpl) SearchObject(ctx context.Context, p *SearchObjectParams, op
|
|||
}
|
||||
}
|
||||
|
||||
return searchResult, nil
|
||||
res.setIDList(searchResult)
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (c *clientImpl) attachV2SessionToken(opts *callOptions, hdr *v2session.RequestMetaHeader, info v2SessionReqInfo) error {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue