forked from TrueCloudLab/frostfs-api-go
[#263] pkg/client: Refactor the client to use raw protobuf client
Make `Client` to be the wrapper over raw protobuf client. Provide public method to get the underlying raw client. Change implementations of all methods with the new approach of the RPC execution. Additional changes: * key replaced from `New` argument to `WithDefaultPrivateKey` option; * `GetSelfBalance` is removed as non-viable; * `GetEACLWithSignature` is removed, `GetEACL` returns `EACLWithSignature`; * `AttachSessionToken` / `AttachBearerToken` are removed as non-viable; * redundant options are removed. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
5a9dd7ab3f
commit
c819909906
9 changed files with 380 additions and 1068 deletions
|
@ -10,10 +10,11 @@ import (
|
|||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-api-go/rpc/client"
|
||||
signer "github.com/nspcc-dev/neofs-api-go/util/signature"
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/client"
|
||||
v2object "github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||
v2refs "github.com/nspcc-dev/neofs-api-go/v2/refs"
|
||||
rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc"
|
||||
v2session "github.com/nspcc-dev/neofs-api-go/v2/session"
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/signature"
|
||||
"github.com/pkg/errors"
|
||||
|
@ -114,7 +115,7 @@ type putObjectV2Writer struct {
|
|||
|
||||
req *v2object.PutRequest
|
||||
|
||||
stream v2object.PutObjectStreamer
|
||||
stream *rpcapi.PutRequestWriter
|
||||
}
|
||||
|
||||
type checksumType int
|
||||
|
@ -171,7 +172,7 @@ func (w *putObjectV2Writer) Write(p []byte) (int, error) {
|
|||
return 0, errors.Wrap(err, "could not sign chunk request message")
|
||||
}
|
||||
|
||||
if err := w.stream.Send(w.req); err != nil {
|
||||
if err := w.stream.Write(w.req); err != nil {
|
||||
return 0, errors.Wrap(err, "could not send chunk request message")
|
||||
}
|
||||
|
||||
|
@ -211,32 +212,11 @@ func (p *PutObjectParams) PayloadReader() io.Reader {
|
|||
}
|
||||
|
||||
func (c *clientImpl) PutObject(ctx context.Context, p *PutObjectParams, opts ...CallOption) (*object.ID, error) {
|
||||
// check remote node version
|
||||
switch c.remoteNode.Version.Major() {
|
||||
case 2:
|
||||
return c.putObjectV2(ctx, p, opts...)
|
||||
default:
|
||||
return nil, errUnsupportedProtocol
|
||||
}
|
||||
}
|
||||
|
||||
func (c *clientImpl) putObjectV2(ctx context.Context, p *PutObjectParams, opts ...CallOption) (*object.ID, error) {
|
||||
// create V2 Object client
|
||||
cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not create Object V2 client")
|
||||
}
|
||||
|
||||
stream, err := cli.Put(ctx)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not open Put object stream")
|
||||
}
|
||||
|
||||
callOpts := c.defaultCallOptions()
|
||||
|
||||
for i := range opts {
|
||||
if opts[i] != nil {
|
||||
opts[i].apply(&callOpts)
|
||||
opts[i](callOpts)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -253,11 +233,12 @@ func (c *clientImpl) putObjectV2(ctx context.Context, p *PutObjectParams, opts .
|
|||
|
||||
// set meta header
|
||||
meta := v2MetaHeaderFromOpts(callOpts)
|
||||
if err = c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
|
||||
|
||||
if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
|
||||
addr: v2Addr,
|
||||
verb: v2session.ObjectVerbPut,
|
||||
}); err != nil {
|
||||
return nil, errors.Wrap(err, "could not sign session token")
|
||||
return nil, errors.Wrap(err, "could not attach session token")
|
||||
}
|
||||
|
||||
req.SetMetaHeader(meta)
|
||||
|
@ -275,12 +256,21 @@ func (c *clientImpl) putObjectV2(ctx context.Context, p *PutObjectParams, opts .
|
|||
|
||||
// sign the request
|
||||
if err := signature.SignServiceMessage(callOpts.key, req); err != nil {
|
||||
return nil, errors.Wrapf(err, "could not sign %T", req)
|
||||
return nil, errors.Wrapf(err, "signing the request failed")
|
||||
}
|
||||
|
||||
// open stream
|
||||
resp := new(v2object.PutResponse)
|
||||
|
||||
stream, err := rpcapi.PutObject(c.Raw(), resp, client.WithContext(ctx))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "stream opening failed")
|
||||
}
|
||||
|
||||
// send init part
|
||||
if err := stream.Send(req); err != nil {
|
||||
return nil, errors.Wrapf(err, "could not send %T", req)
|
||||
err = stream.Write(req)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "sending the initial message to stream failed")
|
||||
}
|
||||
|
||||
// create payload bytes reader
|
||||
|
@ -305,18 +295,18 @@ func (c *clientImpl) putObjectV2(ctx context.Context, p *PutObjectParams, opts .
|
|||
// copy payload from reader to stream writer
|
||||
_, err = io.CopyBuffer(w, r, make([]byte, chunkSize))
|
||||
if err != nil && !errors.Is(errors.Cause(err), io.EOF) {
|
||||
return nil, errors.Wrap(err, "could not send payload bytes to Put object stream")
|
||||
return nil, errors.Wrap(err, "payload streaming failed")
|
||||
}
|
||||
|
||||
// close object stream and receive response from remote node
|
||||
resp, err := stream.CloseAndRecv()
|
||||
err = stream.Close()
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "could not close %T", stream)
|
||||
return nil, errors.Wrap(err, "closing the stream failed")
|
||||
}
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||
return nil, errors.Wrapf(err, "could not verify %T", resp)
|
||||
return nil, errors.Wrap(err, "response verification failed")
|
||||
}
|
||||
|
||||
// convert object identifier
|
||||
|
@ -377,39 +367,11 @@ func DeleteObject(ctx context.Context, c Client, p *DeleteObjectParams, opts ...
|
|||
//
|
||||
// If target of tombstone address is not set, the address is ignored.
|
||||
func (c *clientImpl) DeleteObject(ctx context.Context, p *DeleteObjectParams, opts ...CallOption) error {
|
||||
// check remote node version
|
||||
switch c.remoteNode.Version.Major() {
|
||||
case 2:
|
||||
if p.tombTgt == nil {
|
||||
p.tombTgt = new(objectAddressWriter)
|
||||
}
|
||||
|
||||
resp, err := c.deleteObjectV2(ctx, p, opts...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
addrV2 := resp.GetBody().GetTombstone()
|
||||
p.tombTgt.SetAddress(object.NewAddressFromV2(addrV2))
|
||||
|
||||
return nil
|
||||
default:
|
||||
return errUnsupportedProtocol
|
||||
}
|
||||
}
|
||||
|
||||
func (c *clientImpl) deleteObjectV2(ctx context.Context, p *DeleteObjectParams, opts ...CallOption) (*v2object.DeleteResponse, error) {
|
||||
// create V2 Object client
|
||||
cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not create Object V2 client")
|
||||
}
|
||||
|
||||
callOpts := c.defaultCallOptions()
|
||||
|
||||
for i := range opts {
|
||||
if opts[i] != nil {
|
||||
opts[i].apply(&callOpts)
|
||||
opts[i](callOpts)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -422,11 +384,12 @@ func (c *clientImpl) deleteObjectV2(ctx context.Context, p *DeleteObjectParams,
|
|||
|
||||
// set meta header
|
||||
meta := v2MetaHeaderFromOpts(callOpts)
|
||||
if err = c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
|
||||
|
||||
if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
|
||||
addr: p.addr.ToV2(),
|
||||
verb: v2session.ObjectVerbDelete,
|
||||
}); err != nil {
|
||||
return nil, errors.Wrap(err, "could not sign session token")
|
||||
return errors.Wrap(err, "could not attach session token")
|
||||
}
|
||||
|
||||
req.SetMetaHeader(meta)
|
||||
|
@ -436,21 +399,25 @@ func (c *clientImpl) deleteObjectV2(ctx context.Context, p *DeleteObjectParams,
|
|||
|
||||
// sign the request
|
||||
if err := signature.SignServiceMessage(callOpts.key, req); err != nil {
|
||||
return nil, errors.Wrapf(err, "could not sign %T", req)
|
||||
return errors.Wrap(err, "signing the request failed")
|
||||
}
|
||||
|
||||
// send request
|
||||
resp, err := cli.Delete(ctx, req)
|
||||
resp, err := rpcapi.DeleteObject(c.Raw(), req, client.WithContext(ctx))
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "could not send %T", req)
|
||||
return errors.Wrap(err, "sending the request failed")
|
||||
}
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||
return nil, errors.Wrapf(err, "could not verify %T", resp)
|
||||
return errors.Wrap(err, "response verification failed")
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
if p.tombTgt != nil {
|
||||
p.tombTgt.SetAddress(object.NewAddressFromV2(resp.GetBody().GetTombstone()))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *GetObjectParams) WithAddress(v *object.Address) *GetObjectParams {
|
||||
|
@ -501,28 +468,14 @@ func (p *GetObjectParams) RawFlag() bool {
|
|||
return false
|
||||
}
|
||||
|
||||
var errWrongMessageSeq = errors.New("incorrect message sequence")
|
||||
|
||||
func (c *clientImpl) GetObject(ctx context.Context, p *GetObjectParams, opts ...CallOption) (*object.Object, error) {
|
||||
// check remote node version
|
||||
switch c.remoteNode.Version.Major() {
|
||||
case 2:
|
||||
return c.getObjectV2(ctx, p, opts...)
|
||||
default:
|
||||
return nil, errUnsupportedProtocol
|
||||
}
|
||||
}
|
||||
|
||||
func (c *clientImpl) getObjectV2(ctx context.Context, p *GetObjectParams, opts ...CallOption) (*object.Object, error) {
|
||||
// create V2 Object client
|
||||
cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not create Object V2 client")
|
||||
}
|
||||
|
||||
callOpts := c.defaultCallOptions()
|
||||
|
||||
for i := range opts {
|
||||
if opts[i] != nil {
|
||||
opts[i].apply(&callOpts)
|
||||
opts[i](callOpts)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -535,11 +488,12 @@ func (c *clientImpl) getObjectV2(ctx context.Context, p *GetObjectParams, opts .
|
|||
|
||||
// set meta header
|
||||
meta := v2MetaHeaderFromOpts(callOpts)
|
||||
if err = c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
|
||||
|
||||
if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
|
||||
addr: p.addr.ToV2(),
|
||||
verb: v2session.ObjectVerbGet,
|
||||
}); err != nil {
|
||||
return nil, errors.Wrap(err, "could not sign session token")
|
||||
return nil, errors.Wrap(err, "could not attach session token")
|
||||
}
|
||||
|
||||
req.SetMetaHeader(meta)
|
||||
|
@ -550,40 +504,52 @@ func (c *clientImpl) getObjectV2(ctx context.Context, p *GetObjectParams, opts .
|
|||
|
||||
// sign the request
|
||||
if err := signature.SignServiceMessage(callOpts.key, req); err != nil {
|
||||
return nil, errors.Wrapf(err, "could not sign %T", req)
|
||||
return nil, errors.Wrap(err, "signing the request failed")
|
||||
}
|
||||
|
||||
// create Get object stream
|
||||
stream, err := cli.Get(ctx, req)
|
||||
// open stream
|
||||
stream, err := rpcapi.GetObject(c.Raw(), req, client.WithContext(ctx))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not create Get object stream")
|
||||
return nil, errors.Wrap(err, "stream opening failed")
|
||||
}
|
||||
|
||||
var (
|
||||
headWas bool
|
||||
payload []byte
|
||||
obj = new(v2object.Object)
|
||||
resp = new(v2object.GetResponse)
|
||||
)
|
||||
|
||||
for {
|
||||
// receive message from server stream
|
||||
resp, err := stream.Recv()
|
||||
err := stream.Read(resp)
|
||||
if err != nil {
|
||||
if errors.Is(errors.Cause(err), io.EOF) {
|
||||
if !headWas {
|
||||
return nil, io.ErrUnexpectedEOF
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
return nil, errors.Wrap(err, "could not receive Get response")
|
||||
return nil, errors.Wrap(err, "reading the response failed")
|
||||
}
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||
return nil, errors.Wrapf(err, "could not verify %T", resp)
|
||||
return nil, errors.Wrap(err, "response verification failed")
|
||||
}
|
||||
|
||||
switch v := resp.GetBody().GetObjectPart().(type) {
|
||||
case nil:
|
||||
return nil, errNilObjectPart
|
||||
default:
|
||||
return nil, errors.Errorf("unexpected object part %T", v)
|
||||
case *v2object.GetObjectPartInit:
|
||||
if headWas {
|
||||
return nil, errWrongMessageSeq
|
||||
}
|
||||
|
||||
headWas = true
|
||||
|
||||
obj.SetObjectID(v.GetObjectID())
|
||||
obj.SetSignature(v.GetSignature())
|
||||
|
||||
|
@ -594,6 +560,10 @@ func (c *clientImpl) getObjectV2(ctx context.Context, p *GetObjectParams, opts .
|
|||
payload = make([]byte, 0, hdr.GetPayloadLength())
|
||||
}
|
||||
case *v2object.GetObjectPartChunk:
|
||||
if !headWas {
|
||||
return nil, errWrongMessageSeq
|
||||
}
|
||||
|
||||
if p.w != nil {
|
||||
if _, err := p.w.Write(v.GetChunk()); err != nil {
|
||||
return nil, errors.Wrap(err, "could not write payload chunk")
|
||||
|
@ -604,8 +574,6 @@ func (c *clientImpl) getObjectV2(ctx context.Context, p *GetObjectParams, opts .
|
|||
case *v2object.SplitInfo:
|
||||
si := object.NewSplitInfoFromV2(v)
|
||||
return nil, object.NewSplitInfoError(si)
|
||||
default:
|
||||
panic(fmt.Sprintf("unexpected Get object part type %T", v))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -674,27 +642,11 @@ func (p *ObjectHeaderParams) RawFlag() bool {
|
|||
}
|
||||
|
||||
func (c *clientImpl) GetObjectHeader(ctx context.Context, p *ObjectHeaderParams, opts ...CallOption) (*object.Object, error) {
|
||||
// check remote node version
|
||||
switch c.remoteNode.Version.Major() {
|
||||
case 2:
|
||||
return c.getObjectHeaderV2(ctx, p, opts...)
|
||||
default:
|
||||
return nil, errUnsupportedProtocol
|
||||
}
|
||||
}
|
||||
|
||||
func (c *clientImpl) getObjectHeaderV2(ctx context.Context, p *ObjectHeaderParams, opts ...CallOption) (*object.Object, error) {
|
||||
// create V2 Object client
|
||||
cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not create Object V2 client")
|
||||
}
|
||||
|
||||
callOpts := c.defaultCallOptions()
|
||||
|
||||
for i := range opts {
|
||||
if opts[i] != nil {
|
||||
opts[i].apply(&callOpts)
|
||||
opts[i](callOpts)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -707,11 +659,12 @@ func (c *clientImpl) getObjectHeaderV2(ctx context.Context, p *ObjectHeaderParam
|
|||
|
||||
// set meta header
|
||||
meta := v2MetaHeaderFromOpts(callOpts)
|
||||
if err = c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
|
||||
|
||||
if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
|
||||
addr: p.addr.ToV2(),
|
||||
verb: v2session.ObjectVerbHead,
|
||||
}); err != nil {
|
||||
return nil, errors.Wrap(err, "could not sign session token")
|
||||
return nil, errors.Wrap(err, "could not attach session token")
|
||||
}
|
||||
|
||||
req.SetMetaHeader(meta)
|
||||
|
@ -723,18 +676,18 @@ func (c *clientImpl) getObjectHeaderV2(ctx context.Context, p *ObjectHeaderParam
|
|||
|
||||
// sign the request
|
||||
if err := signature.SignServiceMessage(callOpts.key, req); err != nil {
|
||||
return nil, errors.Wrapf(err, "could not sign %T", req)
|
||||
return nil, errors.Wrap(err, "signing the request failed")
|
||||
}
|
||||
|
||||
// send Head request
|
||||
resp, err := cli.Head(ctx, req)
|
||||
resp, err := rpcapi.HeadObject(c.Raw(), req, client.WithContext(ctx))
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "could not send %T", req)
|
||||
return nil, errors.Wrap(err, "sending the request failed")
|
||||
}
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||
return nil, errors.Wrapf(err, "could not verify %T", resp)
|
||||
return nil, errors.Wrap(err, "response verification failed")
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -744,7 +697,7 @@ func (c *clientImpl) getObjectHeaderV2(ctx context.Context, p *ObjectHeaderParam
|
|||
|
||||
switch v := resp.GetBody().GetHeaderPart().(type) {
|
||||
case nil:
|
||||
return nil, errNilObjectPart
|
||||
return nil, errors.Errorf("unexpected header type %T", v)
|
||||
case *v2object.ShortHeader:
|
||||
if !p.short {
|
||||
return nil, errors.Errorf("wrong header part type: expected %T, received %T",
|
||||
|
@ -791,8 +744,6 @@ func (c *clientImpl) getObjectHeaderV2(ctx context.Context, p *ObjectHeaderParam
|
|||
si := object.NewSplitInfoFromV2(v)
|
||||
|
||||
return nil, object.NewSplitInfoError(si)
|
||||
default:
|
||||
panic(fmt.Sprintf("unexpected Head object type %T", v))
|
||||
}
|
||||
|
||||
obj := new(v2object.Object)
|
||||
|
@ -871,27 +822,11 @@ func (p *RangeDataParams) DataWriter() io.Writer {
|
|||
}
|
||||
|
||||
func (c *clientImpl) ObjectPayloadRangeData(ctx context.Context, p *RangeDataParams, opts ...CallOption) ([]byte, error) {
|
||||
// check remote node version
|
||||
switch c.remoteNode.Version.Major() {
|
||||
case 2:
|
||||
return c.objectPayloadRangeV2(ctx, p, opts...)
|
||||
default:
|
||||
return nil, errUnsupportedProtocol
|
||||
}
|
||||
}
|
||||
|
||||
func (c *clientImpl) objectPayloadRangeV2(ctx context.Context, p *RangeDataParams, opts ...CallOption) ([]byte, error) {
|
||||
// create V2 Object client
|
||||
cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not create Object V2 client")
|
||||
}
|
||||
|
||||
callOpts := c.defaultCallOptions()
|
||||
|
||||
for i := range opts {
|
||||
if opts[i] != nil {
|
||||
opts[i].apply(&callOpts)
|
||||
opts[i](callOpts)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -904,11 +839,12 @@ func (c *clientImpl) objectPayloadRangeV2(ctx context.Context, p *RangeDataParam
|
|||
|
||||
// set meta header
|
||||
meta := v2MetaHeaderFromOpts(callOpts)
|
||||
if err = c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
|
||||
|
||||
if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
|
||||
addr: p.addr.ToV2(),
|
||||
verb: v2session.ObjectVerbRange,
|
||||
}); err != nil {
|
||||
return nil, errors.Wrap(err, "could not sign session token")
|
||||
return nil, errors.Wrap(err, "could not attach session token")
|
||||
}
|
||||
|
||||
req.SetMetaHeader(meta)
|
||||
|
@ -920,11 +856,11 @@ func (c *clientImpl) objectPayloadRangeV2(ctx context.Context, p *RangeDataParam
|
|||
|
||||
// sign the request
|
||||
if err := signature.SignServiceMessage(callOpts.key, req); err != nil {
|
||||
return nil, errors.Wrapf(err, "could not sign %T", req)
|
||||
return nil, errors.Wrapf(err, "signing the request failed")
|
||||
}
|
||||
|
||||
// create Get payload range stream
|
||||
stream, err := cli.GetRange(ctx, req)
|
||||
// open stream
|
||||
stream, err := rpcapi.GetObjectRange(c.Raw(), req, client.WithContext(ctx))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not create Get payload range stream")
|
||||
}
|
||||
|
@ -934,15 +870,17 @@ func (c *clientImpl) objectPayloadRangeV2(ctx context.Context, p *RangeDataParam
|
|||
payload = make([]byte, p.r.GetLength())
|
||||
}
|
||||
|
||||
resp := new(v2object.GetRangeResponse)
|
||||
|
||||
for {
|
||||
// receive message from server stream
|
||||
resp, err := stream.Recv()
|
||||
err := stream.Read(resp)
|
||||
if err != nil {
|
||||
if errors.Is(errors.Cause(err), io.EOF) {
|
||||
break
|
||||
}
|
||||
|
||||
return nil, errors.Wrap(err, "could not receive Get payload range response")
|
||||
return nil, errors.Wrap(err, "reading the response failed")
|
||||
}
|
||||
|
||||
// verify response structure
|
||||
|
@ -952,7 +890,7 @@ func (c *clientImpl) objectPayloadRangeV2(ctx context.Context, p *RangeDataParam
|
|||
|
||||
switch v := resp.GetBody().GetRangePart().(type) {
|
||||
case nil:
|
||||
return nil, errNilObjectPart
|
||||
return nil, errors.Errorf("unexpected range type %T", v)
|
||||
case *v2object.GetRangePartChunk:
|
||||
if p.w != nil {
|
||||
if _, err = p.w.Write(v.GetChunk()); err != nil {
|
||||
|
@ -965,8 +903,6 @@ func (c *clientImpl) objectPayloadRangeV2(ctx context.Context, p *RangeDataParam
|
|||
si := object.NewSplitInfoFromV2(v)
|
||||
|
||||
return nil, object.NewSplitInfoError(si)
|
||||
default:
|
||||
panic(fmt.Sprintf("unexpected GetRange object type %T", v))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1048,27 +984,11 @@ func (c *clientImpl) ObjectPayloadRangeTZ(ctx context.Context, p *RangeChecksumP
|
|||
}
|
||||
|
||||
func (c *clientImpl) objectPayloadRangeHash(ctx context.Context, p *RangeChecksumParams, opts ...CallOption) (interface{}, error) {
|
||||
// check remote node version
|
||||
switch c.remoteNode.Version.Major() {
|
||||
case 2:
|
||||
return c.objectPayloadRangeHashV2(ctx, p, opts...)
|
||||
default:
|
||||
return nil, errUnsupportedProtocol
|
||||
}
|
||||
}
|
||||
|
||||
func (c *clientImpl) objectPayloadRangeHashV2(ctx context.Context, p *RangeChecksumParams, opts ...CallOption) (interface{}, error) {
|
||||
// create V2 Object client
|
||||
cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not create Object V2 client")
|
||||
}
|
||||
|
||||
callOpts := c.defaultCallOptions()
|
||||
|
||||
for i := range opts {
|
||||
if opts[i] != nil {
|
||||
opts[i].apply(&callOpts)
|
||||
opts[i](callOpts)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1081,11 +1001,12 @@ func (c *clientImpl) objectPayloadRangeHashV2(ctx context.Context, p *RangeCheck
|
|||
|
||||
// set meta header
|
||||
meta := v2MetaHeaderFromOpts(callOpts)
|
||||
if err = c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
|
||||
|
||||
if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
|
||||
addr: p.addr.ToV2(),
|
||||
verb: v2session.ObjectVerbRangeHash,
|
||||
}); err != nil {
|
||||
return nil, errors.Wrap(err, "could not sign session token")
|
||||
return nil, errors.Wrap(err, "could not attach session token")
|
||||
}
|
||||
|
||||
req.SetMetaHeader(meta)
|
||||
|
@ -1102,18 +1023,18 @@ func (c *clientImpl) objectPayloadRangeHashV2(ctx context.Context, p *RangeCheck
|
|||
|
||||
// sign the request
|
||||
if err := signature.SignServiceMessage(callOpts.key, req); err != nil {
|
||||
return nil, errors.Wrapf(err, "could not sign %T", req)
|
||||
return nil, errors.Wrapf(err, "signing the request failed")
|
||||
}
|
||||
|
||||
// send request
|
||||
resp, err := cli.GetRangeHash(ctx, req)
|
||||
resp, err := rpcapi.HashObjectRange(c.Raw(), req, client.WithContext(ctx))
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "could not send %T", req)
|
||||
return nil, errors.Wrap(err, "sending the request failed")
|
||||
}
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||
return nil, errors.Wrapf(err, "could not verify %T", resp)
|
||||
return nil, errors.Wrap(err, "response verification failed")
|
||||
}
|
||||
|
||||
respBody := resp.GetBody()
|
||||
|
@ -1197,27 +1118,11 @@ func (p *SearchObjectParams) SearchFilters() object.SearchFilters {
|
|||
}
|
||||
|
||||
func (c *clientImpl) SearchObject(ctx context.Context, p *SearchObjectParams, opts ...CallOption) ([]*object.ID, error) {
|
||||
// check remote node version
|
||||
switch c.remoteNode.Version.Major() {
|
||||
case 2:
|
||||
return c.searchObjectV2(ctx, p, opts...)
|
||||
default:
|
||||
return nil, errUnsupportedProtocol
|
||||
}
|
||||
}
|
||||
|
||||
func (c *clientImpl) searchObjectV2(ctx context.Context, p *SearchObjectParams, opts ...CallOption) ([]*object.ID, error) {
|
||||
// create V2 Object client
|
||||
cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not create Object V2 client")
|
||||
}
|
||||
|
||||
callOpts := c.defaultCallOptions()
|
||||
|
||||
for i := range opts {
|
||||
if opts[i] != nil {
|
||||
opts[i].apply(&callOpts)
|
||||
opts[i](callOpts)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1233,11 +1138,12 @@ func (c *clientImpl) searchObjectV2(ctx context.Context, p *SearchObjectParams,
|
|||
|
||||
// set meta header
|
||||
meta := v2MetaHeaderFromOpts(callOpts)
|
||||
if err = c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
|
||||
|
||||
if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
|
||||
addr: v2Addr,
|
||||
verb: v2session.ObjectVerbSearch,
|
||||
}); err != nil {
|
||||
return nil, errors.Wrap(err, "could not sign session token")
|
||||
return nil, errors.Wrap(err, "could not attach session token")
|
||||
}
|
||||
|
||||
req.SetMetaHeader(meta)
|
||||
|
@ -1249,26 +1155,29 @@ func (c *clientImpl) searchObjectV2(ctx context.Context, p *SearchObjectParams,
|
|||
|
||||
// sign the request
|
||||
if err := signature.SignServiceMessage(callOpts.key, req); err != nil {
|
||||
return nil, errors.Wrapf(err, "could not sign %T", req)
|
||||
return nil, errors.Wrapf(err, "signing the request failed")
|
||||
}
|
||||
|
||||
// create search stream
|
||||
stream, err := cli.Search(ctx, req)
|
||||
stream, err := rpcapi.SearchObjects(c.Raw(), req, client.WithContext(ctx))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not create search stream")
|
||||
return nil, errors.Wrap(err, "stream opening failed")
|
||||
}
|
||||
|
||||
var searchResult []*object.ID
|
||||
var (
|
||||
searchResult []*object.ID
|
||||
resp = new(v2object.SearchResponse)
|
||||
)
|
||||
|
||||
for {
|
||||
// receive message from server stream
|
||||
resp, err := stream.Recv()
|
||||
err := stream.Read(resp)
|
||||
if err != nil {
|
||||
if errors.Is(errors.Cause(err), io.EOF) {
|
||||
break
|
||||
}
|
||||
|
||||
return nil, errors.Wrap(err, "could not receive search response")
|
||||
return nil, errors.Wrap(err, "reading the response failed")
|
||||
}
|
||||
|
||||
// verify response structure
|
||||
|
@ -1285,39 +1194,7 @@ func (c *clientImpl) searchObjectV2(ctx context.Context, p *SearchObjectParams,
|
|||
return searchResult, nil
|
||||
}
|
||||
|
||||
func v2ObjectClient(proto TransportProtocol, opts *clientOptions) (*v2object.Client, error) {
|
||||
switch proto {
|
||||
case GRPC:
|
||||
var err error
|
||||
|
||||
if opts.grpcOpts.objectClientV2 == nil {
|
||||
var optsV2 []v2object.Option
|
||||
|
||||
if opts.grpcOpts.conn != nil {
|
||||
optsV2 = []v2object.Option{
|
||||
v2object.WithGlobalOpts(
|
||||
client.WithGRPCConn(opts.grpcOpts.conn),
|
||||
),
|
||||
}
|
||||
} else {
|
||||
optsV2 = []v2object.Option{
|
||||
v2object.WithGlobalOpts(
|
||||
client.WithNetworkAddress(opts.addr),
|
||||
client.WithDialTimeout(opts.dialTimeout),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
opts.grpcOpts.objectClientV2, err = v2object.NewClient(optsV2...)
|
||||
}
|
||||
|
||||
return opts.grpcOpts.objectClientV2, err
|
||||
default:
|
||||
return nil, errUnsupportedProtocol
|
||||
}
|
||||
}
|
||||
|
||||
func (c clientImpl) attachV2SessionToken(opts callOptions, hdr *v2session.RequestMetaHeader, info v2SessionReqInfo) error {
|
||||
func (c *clientImpl) attachV2SessionToken(opts *callOptions, hdr *v2session.RequestMetaHeader, info v2SessionReqInfo) error {
|
||||
if opts.session == nil {
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue