forked from TrueCloudLab/frostfs-api-go
Add client for object service
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
f4b734b58e
commit
5e25c76f40
4 changed files with 1028 additions and 0 deletions
408
v2/object/client.go
Normal file
408
v2/object/client.go
Normal file
|
@ -0,0 +1,408 @@
|
||||||
|
package object
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/v2/client"
|
||||||
|
object "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Client represents universal object
|
||||||
|
// transport client.
|
||||||
|
type Client struct {
|
||||||
|
getClient *getObjectClient
|
||||||
|
|
||||||
|
putClient *putObjectClient
|
||||||
|
|
||||||
|
headClient *headObjectClient
|
||||||
|
|
||||||
|
searchClient *searchObjectClient
|
||||||
|
|
||||||
|
deleteClient *deleteObjectClient
|
||||||
|
|
||||||
|
getRangeClient *getRangeObjectClient
|
||||||
|
|
||||||
|
getRangeHashClient *getRangeHashObjectClient
|
||||||
|
}
|
||||||
|
|
||||||
|
// Option represents Client option.
|
||||||
|
type Option func(*cfg)
|
||||||
|
|
||||||
|
type cfg struct {
|
||||||
|
proto client.Protocol
|
||||||
|
|
||||||
|
globalOpts []client.Option
|
||||||
|
|
||||||
|
gRPC cfgGRPC
|
||||||
|
}
|
||||||
|
|
||||||
|
type cfgGRPC struct {
|
||||||
|
serviceClient object.ObjectServiceClient
|
||||||
|
|
||||||
|
grpcCallOpts []grpc.CallOption
|
||||||
|
|
||||||
|
callOpts []object.Option
|
||||||
|
|
||||||
|
client *object.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// types of upper level sub-clients, accessed directly from object.Client
|
||||||
|
type (
|
||||||
|
getObjectClient struct {
|
||||||
|
streamClientGetter func(context.Context, *GetRequest) (interface{}, error)
|
||||||
|
|
||||||
|
streamerConstructor func(interface{}) (GetObjectStreamer, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
putObjectClient struct {
|
||||||
|
streamClientGetter func(context.Context) (interface{}, error)
|
||||||
|
|
||||||
|
streamerConstructor func(interface{}) (PutObjectStreamer, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
headObjectClient struct {
|
||||||
|
requestConverter func(request *HeadRequest) interface{}
|
||||||
|
|
||||||
|
caller func(context.Context, interface{}) (interface{}, error)
|
||||||
|
|
||||||
|
responseConverter func(interface{}) *HeadResponse
|
||||||
|
}
|
||||||
|
|
||||||
|
deleteObjectClient struct {
|
||||||
|
requestConverter func(request *DeleteRequest) interface{}
|
||||||
|
|
||||||
|
caller func(context.Context, interface{}) (interface{}, error)
|
||||||
|
|
||||||
|
responseConverter func(interface{}) *DeleteResponse
|
||||||
|
}
|
||||||
|
|
||||||
|
searchObjectClient struct {
|
||||||
|
streamClientGetter func(context.Context, *SearchRequest) (interface{}, error)
|
||||||
|
|
||||||
|
streamerConstructor func(interface{}) (SearchObjectStreamer, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
getRangeObjectClient struct {
|
||||||
|
streamClientGetter func(context.Context, *GetRangeRequest) (interface{}, error)
|
||||||
|
|
||||||
|
streamerConstructor func(interface{}) (GetRangeObjectStreamer, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
getRangeHashObjectClient struct {
|
||||||
|
requestConverter func(request *GetRangeHashRequest) interface{}
|
||||||
|
|
||||||
|
caller func(context.Context, interface{}) (interface{}, error)
|
||||||
|
|
||||||
|
responseConverter func(interface{}) *GetRangeHashResponse
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func (c *Client) Get(ctx context.Context, req *GetRequest) (GetObjectStreamer, error) {
|
||||||
|
cli, err := c.getClient.streamClientGetter(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not send get object request")
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.getClient.streamerConstructor(cli)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) Put(ctx context.Context) (PutObjectStreamer, error) {
|
||||||
|
cli, err := c.putClient.streamClientGetter(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not prepare put object streamer")
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.putClient.streamerConstructor(cli)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) Head(ctx context.Context, req *HeadRequest) (*HeadResponse, error) {
|
||||||
|
resp, err := c.headClient.caller(ctx, c.headClient.requestConverter(req))
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not send head object request")
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.headClient.responseConverter(resp), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) Search(ctx context.Context, req *SearchRequest) (SearchObjectStreamer, error) {
|
||||||
|
cli, err := c.searchClient.streamClientGetter(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.searchClient.streamerConstructor(cli)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) Delete(ctx context.Context, req *DeleteRequest) (*DeleteResponse, error) {
|
||||||
|
resp, err := c.deleteClient.caller(ctx, c.deleteClient.requestConverter(req))
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not send delete object request")
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.deleteClient.responseConverter(resp), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) GetRange(ctx context.Context, req *GetRangeRequest) (GetRangeObjectStreamer, error) {
|
||||||
|
cli, err := c.getRangeClient.streamClientGetter(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not send get object range request")
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.getRangeClient.streamerConstructor(cli)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) GetRangeHash(ctx context.Context, req *GetRangeHashRequest) (*GetRangeHashResponse, error) {
|
||||||
|
resp, err := c.getRangeHashClient.caller(ctx, c.getRangeHashClient.requestConverter(req))
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not send get object range hash request")
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.getRangeHashClient.responseConverter(resp), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func defaultCfg() *cfg {
|
||||||
|
return &cfg{
|
||||||
|
proto: client.ProtoGRPC,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(opts ...Option) (*Client, error) {
|
||||||
|
cfg := defaultCfg()
|
||||||
|
|
||||||
|
for i := range opts {
|
||||||
|
opts[i](cfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
|
||||||
|
switch cfg.proto {
|
||||||
|
case client.ProtoGRPC:
|
||||||
|
var c *object.Client
|
||||||
|
if c, err = newGRPCClient(cfg); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Client{
|
||||||
|
getClient: newGRPCGetClient(c),
|
||||||
|
putClient: newGRPCPutClient(c),
|
||||||
|
headClient: newGRPCHeadClient(c),
|
||||||
|
searchClient: newGRPCSearchClient(c),
|
||||||
|
deleteClient: newGRPCDeleteClient(c),
|
||||||
|
getRangeClient: newGRPCGetRangeClient(c),
|
||||||
|
getRangeHashClient: newGRPCGetRangeHashClient(c),
|
||||||
|
}, nil
|
||||||
|
default:
|
||||||
|
err = client.ErrProtoUnsupported
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, errors.Wrapf(err, "could not create %s object client", cfg.proto)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newGRPCClient(cfg *cfg) (*object.Client, error) {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
if cfg.gRPC.client == nil {
|
||||||
|
if cfg.gRPC.serviceClient == nil {
|
||||||
|
conn, err := client.NewGRPCClientConn(cfg.globalOpts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not open gRPC getClient connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg.gRPC.serviceClient = object.NewObjectServiceClient(conn)
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg.gRPC.client, err = object.NewClient(
|
||||||
|
cfg.gRPC.serviceClient,
|
||||||
|
append(
|
||||||
|
cfg.gRPC.callOpts,
|
||||||
|
object.WithCallOptions(cfg.gRPC.grpcCallOpts),
|
||||||
|
)...,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
return cfg.gRPC.client, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func newGRPCGetClient(c *object.Client) *getObjectClient {
|
||||||
|
cli := &getObjectClient{
|
||||||
|
streamClientGetter: func(ctx context.Context, request *GetRequest) (interface{}, error) {
|
||||||
|
return c.Get(ctx, GetRequestToGRPCMessage(request))
|
||||||
|
},
|
||||||
|
streamerConstructor: func(i interface{}) (GetObjectStreamer, error) {
|
||||||
|
cli, ok := i.(object.ObjectService_GetClient)
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New("can't convert interface to grpc get getClient")
|
||||||
|
}
|
||||||
|
return &getObjectStream{
|
||||||
|
recv: func() (*GetResponse, error) {
|
||||||
|
resp, err := cli.Recv()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return GetResponseFromGRPCMessage(resp), nil
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
return cli
|
||||||
|
}
|
||||||
|
|
||||||
|
func newGRPCPutClient(c *object.Client) *putObjectClient {
|
||||||
|
cli := &putObjectClient{
|
||||||
|
streamClientGetter: func(ctx context.Context) (interface{}, error) {
|
||||||
|
return c.Put(ctx)
|
||||||
|
},
|
||||||
|
streamerConstructor: func(i interface{}) (PutObjectStreamer, error) {
|
||||||
|
cli, ok := i.(object.ObjectService_PutClient)
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New("can't convert interface to grpc get getClient")
|
||||||
|
}
|
||||||
|
|
||||||
|
return &putObjectStream{
|
||||||
|
send: func(request *PutRequest) error {
|
||||||
|
return cli.Send(PutRequestToGRPCMessage(request))
|
||||||
|
},
|
||||||
|
closeAndRecv: func() (*PutResponse, error) {
|
||||||
|
resp, err := cli.CloseAndRecv()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return PutResponseFromGRPCMessage(resp), nil
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
return cli
|
||||||
|
}
|
||||||
|
|
||||||
|
func newGRPCHeadClient(c *object.Client) *headObjectClient {
|
||||||
|
return &headObjectClient{
|
||||||
|
requestConverter: func(req *HeadRequest) interface{} {
|
||||||
|
return HeadRequestToGRPCMessage(req)
|
||||||
|
},
|
||||||
|
caller: func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||||
|
return c.Head(ctx, req.(*object.HeadRequest))
|
||||||
|
},
|
||||||
|
responseConverter: func(resp interface{}) *HeadResponse {
|
||||||
|
return HeadResponseFromGRPCMessage(resp.(*object.HeadResponse))
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newGRPCSearchClient(c *object.Client) *searchObjectClient {
|
||||||
|
cli := &searchObjectClient{
|
||||||
|
streamClientGetter: func(ctx context.Context, request *SearchRequest) (interface{}, error) {
|
||||||
|
return c.Search(ctx, SearchRequestToGRPCMessage(request))
|
||||||
|
},
|
||||||
|
streamerConstructor: func(i interface{}) (SearchObjectStreamer, error) {
|
||||||
|
cli, ok := i.(object.ObjectService_SearchClient)
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New("can't convert interface to grpc get getClient")
|
||||||
|
}
|
||||||
|
return &searchObjectStream{
|
||||||
|
recv: func() (*SearchResponse, error) {
|
||||||
|
resp, err := cli.Recv()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return SearchResponseFromGRPCMessage(resp), nil
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
return cli
|
||||||
|
}
|
||||||
|
|
||||||
|
func newGRPCDeleteClient(c *object.Client) *deleteObjectClient {
|
||||||
|
return &deleteObjectClient{
|
||||||
|
requestConverter: func(req *DeleteRequest) interface{} {
|
||||||
|
return DeleteRequestToGRPCMessage(req)
|
||||||
|
},
|
||||||
|
caller: func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||||
|
return c.Delete(ctx, req.(*object.DeleteRequest))
|
||||||
|
},
|
||||||
|
responseConverter: func(resp interface{}) *DeleteResponse {
|
||||||
|
return DeleteResponseFromGRPCMessage(resp.(*object.DeleteResponse))
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newGRPCGetRangeClient(c *object.Client) *getRangeObjectClient {
|
||||||
|
cli := &getRangeObjectClient{
|
||||||
|
streamClientGetter: func(ctx context.Context, request *GetRangeRequest) (interface{}, error) {
|
||||||
|
return c.GetRange(ctx, GetRangeRequestToGRPCMessage(request))
|
||||||
|
},
|
||||||
|
streamerConstructor: func(i interface{}) (GetRangeObjectStreamer, error) {
|
||||||
|
cli, ok := i.(object.ObjectService_GetRangeClient)
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New("can't convert interface to grpc get getClient")
|
||||||
|
}
|
||||||
|
return &getRangeObjectStream{
|
||||||
|
recv: func() (*GetRangeResponse, error) {
|
||||||
|
resp, err := cli.Recv()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return GetRangeResponseFromGRPCMessage(resp), nil
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
return cli
|
||||||
|
}
|
||||||
|
|
||||||
|
func newGRPCGetRangeHashClient(c *object.Client) *getRangeHashObjectClient {
|
||||||
|
return &getRangeHashObjectClient{
|
||||||
|
requestConverter: func(req *GetRangeHashRequest) interface{} {
|
||||||
|
return GetRangeHashRequestToGRPCMessage(req)
|
||||||
|
},
|
||||||
|
caller: func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||||
|
return c.GetRangeHash(ctx, req.(*object.GetRangeHashRequest))
|
||||||
|
},
|
||||||
|
responseConverter: func(resp interface{}) *GetRangeHashResponse {
|
||||||
|
return GetRangeHashResponseFromGRPCMessage(resp.(*object.GetRangeHashResponse))
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithGlobalOpts(v ...client.Option) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
if len(v) > 0 {
|
||||||
|
c.globalOpts = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithGRPCServiceClient(v object.ObjectServiceClient) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.gRPC.serviceClient = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithGRPCCallOpts(v []grpc.CallOption) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.gRPC.grpcCallOpts = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithGRPCClientOpts(v []object.Option) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.gRPC.callOpts = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithGRPCClient(v *object.Client) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.gRPC.client = v
|
||||||
|
}
|
||||||
|
}
|
60
v2/object/client_stream.go
Normal file
60
v2/object/client_stream.go
Normal file
|
@ -0,0 +1,60 @@
|
||||||
|
package object
|
||||||
|
|
||||||
|
type (
|
||||||
|
GetObjectStreamer interface {
|
||||||
|
Recv() (*GetResponse, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
PutObjectStreamer interface {
|
||||||
|
Send(*PutRequest) error
|
||||||
|
CloseAndRecv() (*PutResponse, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
SearchObjectStreamer interface {
|
||||||
|
Recv() (*SearchResponse, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
GetRangeObjectStreamer interface {
|
||||||
|
Recv() (*GetRangeResponse, error)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
getObjectStream struct {
|
||||||
|
recv func() (*GetResponse, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
putObjectStream struct {
|
||||||
|
send func(*PutRequest) error
|
||||||
|
|
||||||
|
closeAndRecv func() (*PutResponse, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
searchObjectStream struct {
|
||||||
|
recv func() (*SearchResponse, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
getRangeObjectStream struct {
|
||||||
|
recv func() (*GetRangeResponse, error)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func (s *getObjectStream) Recv() (*GetResponse, error) {
|
||||||
|
return s.recv()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *putObjectStream) Send(request *PutRequest) error {
|
||||||
|
return p.send(request)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *putObjectStream) CloseAndRecv() (*PutResponse, error) {
|
||||||
|
return p.closeAndRecv()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *searchObjectStream) Recv() (*SearchResponse, error) {
|
||||||
|
return s.recv()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *getRangeObjectStream) Recv() (*GetRangeResponse, error) {
|
||||||
|
return r.recv()
|
||||||
|
}
|
86
v2/object/grpc/client.go
Normal file
86
v2/object/grpc/client.go
Normal file
|
@ -0,0 +1,86 @@
|
||||||
|
package object
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Client wraps ObjectServiceClient
|
||||||
|
// with pre-defined configurations.
|
||||||
|
type Client struct {
|
||||||
|
*cfg
|
||||||
|
|
||||||
|
client ObjectServiceClient
|
||||||
|
}
|
||||||
|
|
||||||
|
// Option represents Client option.
|
||||||
|
type Option func(*cfg)
|
||||||
|
|
||||||
|
type cfg struct {
|
||||||
|
callOpts []grpc.CallOption
|
||||||
|
}
|
||||||
|
|
||||||
|
// ErrNilObjectServiceClient is returned by functions that expect
|
||||||
|
// a non-nil ObjectServiceClient, but received nil.
|
||||||
|
var ErrNilObjectServiceClient = errors.New("object gRPC client is nil")
|
||||||
|
|
||||||
|
func defaultCfg() *cfg {
|
||||||
|
return new(cfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewClient creates, initializes and returns a new Client instance.
|
||||||
|
//
|
||||||
|
// Options are applied one by one in order.
|
||||||
|
func NewClient(c ObjectServiceClient, opts ...Option) (*Client, error) {
|
||||||
|
if c == nil {
|
||||||
|
return nil, ErrNilObjectServiceClient
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg := defaultCfg()
|
||||||
|
for i := range opts {
|
||||||
|
opts[i](cfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Client{
|
||||||
|
cfg: cfg,
|
||||||
|
client: c,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) Get(ctx context.Context, req *GetRequest) (ObjectService_GetClient, error) {
|
||||||
|
return c.client.Get(ctx, req, c.callOpts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) Put(ctx context.Context) (ObjectService_PutClient, error) {
|
||||||
|
return c.client.Put(ctx, c.callOpts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) Head(ctx context.Context, req *HeadRequest) (*HeadResponse, error) {
|
||||||
|
return c.client.Head(ctx, req, c.callOpts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) Search(ctx context.Context, req *SearchRequest) (ObjectService_SearchClient, error) {
|
||||||
|
return c.client.Search(ctx, req, c.callOpts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) Delete(ctx context.Context, req *DeleteRequest) (*DeleteResponse, error) {
|
||||||
|
return c.client.Delete(ctx, req, c.callOpts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) GetRange(ctx context.Context, req *GetRangeRequest) (ObjectService_GetRangeClient, error) {
|
||||||
|
return c.client.GetRange(ctx, req, c.callOpts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) GetRangeHash(ctx context.Context, req *GetRangeHashRequest) (*GetRangeHashResponse, error) {
|
||||||
|
return c.client.GetRangeHash(ctx, req, c.callOpts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithCallOptions returns Option that configures
|
||||||
|
// Client to attach call options to each rpc call.
|
||||||
|
func WithCallOptions(opts []grpc.CallOption) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.callOpts = opts
|
||||||
|
}
|
||||||
|
}
|
474
v2/object/test/client_test.go
Normal file
474
v2/object/test/client_test.go
Normal file
|
@ -0,0 +1,474 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/ecdsa"
|
||||||
|
"errors"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||||
|
objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/v2/refs"
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/v2/service"
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/v2/signature"
|
||||||
|
"github.com/nspcc-dev/neofs-crypto/test"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
type testGRPCClient struct {
|
||||||
|
server objectGRPC.ObjectServiceServer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testGRPCClient) Get(ctx context.Context, in *objectGRPC.GetRequest, opts ...grpc.CallOption) (objectGRPC.ObjectService_GetClient, error) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testGRPCClient) Put(ctx context.Context, opts ...grpc.CallOption) (objectGRPC.ObjectService_PutClient, error) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testGRPCClient) Delete(ctx context.Context, in *objectGRPC.DeleteRequest, opts ...grpc.CallOption) (*objectGRPC.DeleteResponse, error) {
|
||||||
|
return s.server.Delete(ctx, in)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testGRPCClient) Head(ctx context.Context, in *objectGRPC.HeadRequest, opts ...grpc.CallOption) (*objectGRPC.HeadResponse, error) {
|
||||||
|
return s.server.Head(ctx, in)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testGRPCClient) Search(ctx context.Context, in *objectGRPC.SearchRequest, opts ...grpc.CallOption) (objectGRPC.ObjectService_SearchClient, error) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testGRPCClient) GetRange(ctx context.Context, in *objectGRPC.GetRangeRequest, opts ...grpc.CallOption) (objectGRPC.ObjectService_GetRangeClient, error) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testGRPCClient) GetRangeHash(ctx context.Context, in *objectGRPC.GetRangeHashRequest, opts ...grpc.CallOption) (*objectGRPC.GetRangeHashResponse, error) {
|
||||||
|
return s.server.GetRangeHash(ctx, in)
|
||||||
|
}
|
||||||
|
|
||||||
|
type testGRPCServer struct {
|
||||||
|
key *ecdsa.PrivateKey
|
||||||
|
headResp *object.HeadResponse
|
||||||
|
delResp *object.DeleteResponse
|
||||||
|
getRangeHashResp *object.GetRangeHashResponse
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testGRPCServer) Get(request *objectGRPC.GetRequest, server objectGRPC.ObjectService_GetServer) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testGRPCServer) Put(server objectGRPC.ObjectService_PutServer) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testGRPCServer) Delete(ctx context.Context, request *objectGRPC.DeleteRequest) (*objectGRPC.DeleteResponse, error) {
|
||||||
|
if s.err != nil {
|
||||||
|
return nil, s.err
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify request structure
|
||||||
|
if err := signature.VerifyServiceMessage(
|
||||||
|
object.DeleteRequestFromGRPCMessage(request),
|
||||||
|
); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// sign response structure
|
||||||
|
if err := signature.SignServiceMessage(s.key, s.delResp); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return object.DeleteResponseToGRPCMessage(s.delResp), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testGRPCServer) Head(ctx context.Context, request *objectGRPC.HeadRequest) (*objectGRPC.HeadResponse, error) {
|
||||||
|
if s.err != nil {
|
||||||
|
return nil, s.err
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify request structure
|
||||||
|
if err := signature.VerifyServiceMessage(
|
||||||
|
object.HeadRequestFromGRPCMessage(request),
|
||||||
|
); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// sign response structure
|
||||||
|
if err := signature.SignServiceMessage(s.key, s.headResp); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return object.HeadResponseToGRPCMessage(s.headResp), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testGRPCServer) Search(request *objectGRPC.SearchRequest, server objectGRPC.ObjectService_SearchServer) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testGRPCServer) GetRange(request *objectGRPC.GetRangeRequest, server objectGRPC.ObjectService_GetRangeServer) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testGRPCServer) GetRangeHash(ctx context.Context, request *objectGRPC.GetRangeHashRequest) (*objectGRPC.GetRangeHashResponse, error) {
|
||||||
|
if s.err != nil {
|
||||||
|
return nil, s.err
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify request structure
|
||||||
|
if err := signature.VerifyServiceMessage(
|
||||||
|
object.GetRangeHashRequestFromGRPCMessage(request),
|
||||||
|
); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// sign response structure
|
||||||
|
if err := signature.SignServiceMessage(s.key, s.getRangeHashResp); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return object.GetRangeHashResponseToGRPCMessage(s.getRangeHashResp), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func testHeadRequest() *object.HeadRequest {
|
||||||
|
cid := new(refs.ContainerID)
|
||||||
|
cid.SetValue([]byte{1, 2, 3})
|
||||||
|
|
||||||
|
oid := new(refs.ObjectID)
|
||||||
|
oid.SetValue([]byte{4, 5, 6})
|
||||||
|
|
||||||
|
addr := new(refs.Address)
|
||||||
|
addr.SetContainerID(cid)
|
||||||
|
addr.SetObjectID(oid)
|
||||||
|
|
||||||
|
body := new(object.HeadRequestBody)
|
||||||
|
body.SetAddress(addr)
|
||||||
|
|
||||||
|
meta := new(service.RequestMetaHeader)
|
||||||
|
meta.SetTTL(1)
|
||||||
|
meta.SetXHeaders([]*service.XHeader{})
|
||||||
|
|
||||||
|
req := new(object.HeadRequest)
|
||||||
|
req.SetBody(body)
|
||||||
|
req.SetMetaHeader(meta)
|
||||||
|
|
||||||
|
return req
|
||||||
|
}
|
||||||
|
|
||||||
|
func testHeadResponse() *object.HeadResponse {
|
||||||
|
shortHdr := new(object.ShortHeader)
|
||||||
|
shortHdr.SetCreationEpoch(100)
|
||||||
|
|
||||||
|
hdrPart := new(object.GetHeaderPartShort)
|
||||||
|
hdrPart.SetShortHeader(shortHdr)
|
||||||
|
|
||||||
|
body := new(object.HeadResponseBody)
|
||||||
|
body.SetHeaderPart(hdrPart)
|
||||||
|
|
||||||
|
meta := new(service.ResponseMetaHeader)
|
||||||
|
meta.SetTTL(1)
|
||||||
|
meta.SetXHeaders([]*service.XHeader{})
|
||||||
|
|
||||||
|
resp := new(object.HeadResponse)
|
||||||
|
resp.SetBody(body)
|
||||||
|
resp.SetMetaHeader(meta)
|
||||||
|
|
||||||
|
return resp
|
||||||
|
}
|
||||||
|
|
||||||
|
func testDeleteRequest() *object.DeleteRequest {
|
||||||
|
cid := new(refs.ContainerID)
|
||||||
|
cid.SetValue([]byte{1, 2, 3})
|
||||||
|
|
||||||
|
oid := new(refs.ObjectID)
|
||||||
|
oid.SetValue([]byte{4, 5, 6})
|
||||||
|
|
||||||
|
addr := new(refs.Address)
|
||||||
|
addr.SetContainerID(cid)
|
||||||
|
addr.SetObjectID(oid)
|
||||||
|
|
||||||
|
body := new(object.DeleteRequestBody)
|
||||||
|
body.SetAddress(addr)
|
||||||
|
|
||||||
|
meta := new(service.RequestMetaHeader)
|
||||||
|
meta.SetTTL(1)
|
||||||
|
meta.SetXHeaders([]*service.XHeader{})
|
||||||
|
|
||||||
|
req := new(object.DeleteRequest)
|
||||||
|
req.SetBody(body)
|
||||||
|
req.SetMetaHeader(meta)
|
||||||
|
|
||||||
|
return req
|
||||||
|
}
|
||||||
|
|
||||||
|
func testDeleteResponse() *object.DeleteResponse {
|
||||||
|
body := new(object.DeleteResponseBody)
|
||||||
|
|
||||||
|
meta := new(service.ResponseMetaHeader)
|
||||||
|
meta.SetTTL(1)
|
||||||
|
meta.SetXHeaders([]*service.XHeader{})
|
||||||
|
|
||||||
|
resp := new(object.DeleteResponse)
|
||||||
|
resp.SetBody(body)
|
||||||
|
resp.SetMetaHeader(meta)
|
||||||
|
|
||||||
|
return resp
|
||||||
|
}
|
||||||
|
|
||||||
|
func testGetRangeHashRequest() *object.GetRangeHashRequest {
|
||||||
|
cid := new(refs.ContainerID)
|
||||||
|
cid.SetValue([]byte{1, 2, 3})
|
||||||
|
|
||||||
|
oid := new(refs.ObjectID)
|
||||||
|
oid.SetValue([]byte{4, 5, 6})
|
||||||
|
|
||||||
|
addr := new(refs.Address)
|
||||||
|
addr.SetContainerID(cid)
|
||||||
|
addr.SetObjectID(oid)
|
||||||
|
|
||||||
|
body := new(object.GetRangeHashRequestBody)
|
||||||
|
body.SetAddress(addr)
|
||||||
|
|
||||||
|
meta := new(service.RequestMetaHeader)
|
||||||
|
meta.SetTTL(1)
|
||||||
|
meta.SetXHeaders([]*service.XHeader{})
|
||||||
|
|
||||||
|
req := new(object.GetRangeHashRequest)
|
||||||
|
req.SetBody(body)
|
||||||
|
req.SetMetaHeader(meta)
|
||||||
|
|
||||||
|
return req
|
||||||
|
}
|
||||||
|
|
||||||
|
func testGetRangeHashResponse() *object.GetRangeHashResponse {
|
||||||
|
body := new(object.GetRangeHashResponseBody)
|
||||||
|
body.SetHashList([][]byte{{7, 8, 9}})
|
||||||
|
|
||||||
|
meta := new(service.ResponseMetaHeader)
|
||||||
|
meta.SetTTL(1)
|
||||||
|
meta.SetXHeaders([]*service.XHeader{})
|
||||||
|
|
||||||
|
resp := new(object.GetRangeHashResponse)
|
||||||
|
resp.SetBody(body)
|
||||||
|
resp.SetMetaHeader(meta)
|
||||||
|
|
||||||
|
return resp
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGRPCClient_Head(t *testing.T) {
|
||||||
|
ctx := context.TODO()
|
||||||
|
|
||||||
|
cliKey := test.DecodeKey(0)
|
||||||
|
srvKey := test.DecodeKey(1)
|
||||||
|
|
||||||
|
t.Run("gRPC server error", func(t *testing.T) {
|
||||||
|
srvErr := errors.New("test server error")
|
||||||
|
|
||||||
|
srv := &testGRPCServer{
|
||||||
|
err: srvErr,
|
||||||
|
}
|
||||||
|
|
||||||
|
cli := &testGRPCClient{
|
||||||
|
server: srv,
|
||||||
|
}
|
||||||
|
|
||||||
|
c, err := object.New(object.WithGRPCServiceClient(cli))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
resp, err := c.Head(ctx, new(object.HeadRequest))
|
||||||
|
require.True(t, errors.Is(err, srvErr))
|
||||||
|
require.Nil(t, resp)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("invalid request structure", func(t *testing.T) {
|
||||||
|
req := testHeadRequest()
|
||||||
|
|
||||||
|
require.Error(t, signature.VerifyServiceMessage(req))
|
||||||
|
|
||||||
|
c, err := object.New(
|
||||||
|
object.WithGRPCServiceClient(
|
||||||
|
&testGRPCClient{
|
||||||
|
server: new(testGRPCServer),
|
||||||
|
},
|
||||||
|
),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
resp, err := c.Head(ctx, req)
|
||||||
|
require.Error(t, err)
|
||||||
|
require.Nil(t, resp)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("correct response", func(t *testing.T) {
|
||||||
|
req := testHeadRequest()
|
||||||
|
|
||||||
|
require.NoError(t, signature.SignServiceMessage(cliKey, req))
|
||||||
|
|
||||||
|
resp := testHeadResponse()
|
||||||
|
|
||||||
|
c, err := object.New(
|
||||||
|
object.WithGRPCServiceClient(
|
||||||
|
&testGRPCClient{
|
||||||
|
server: &testGRPCServer{
|
||||||
|
key: srvKey,
|
||||||
|
headResp: resp,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
r, err := c.Head(ctx, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.NoError(t, signature.VerifyServiceMessage(r))
|
||||||
|
require.Equal(t, resp.GetBody(), r.GetBody())
|
||||||
|
require.Equal(t, resp.GetMetaHeader(), r.GetMetaHeader())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGRPCClient_Delete(t *testing.T) {
|
||||||
|
ctx := context.TODO()
|
||||||
|
|
||||||
|
cliKey := test.DecodeKey(0)
|
||||||
|
srvKey := test.DecodeKey(1)
|
||||||
|
|
||||||
|
t.Run("gRPC server error", func(t *testing.T) {
|
||||||
|
srvErr := errors.New("test server error")
|
||||||
|
|
||||||
|
srv := &testGRPCServer{
|
||||||
|
err: srvErr,
|
||||||
|
}
|
||||||
|
|
||||||
|
cli := &testGRPCClient{
|
||||||
|
server: srv,
|
||||||
|
}
|
||||||
|
|
||||||
|
c, err := object.New(object.WithGRPCServiceClient(cli))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
resp, err := c.Delete(ctx, new(object.DeleteRequest))
|
||||||
|
require.True(t, errors.Is(err, srvErr))
|
||||||
|
require.Nil(t, resp)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("invalid request structure", func(t *testing.T) {
|
||||||
|
req := testDeleteRequest()
|
||||||
|
|
||||||
|
require.Error(t, signature.VerifyServiceMessage(req))
|
||||||
|
|
||||||
|
c, err := object.New(
|
||||||
|
object.WithGRPCServiceClient(
|
||||||
|
&testGRPCClient{
|
||||||
|
server: new(testGRPCServer),
|
||||||
|
},
|
||||||
|
),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
resp, err := c.Delete(ctx, req)
|
||||||
|
require.Error(t, err)
|
||||||
|
require.Nil(t, resp)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("correct response", func(t *testing.T) {
|
||||||
|
req := testDeleteRequest()
|
||||||
|
|
||||||
|
require.NoError(t, signature.SignServiceMessage(cliKey, req))
|
||||||
|
|
||||||
|
resp := testDeleteResponse()
|
||||||
|
|
||||||
|
c, err := object.New(
|
||||||
|
object.WithGRPCServiceClient(
|
||||||
|
&testGRPCClient{
|
||||||
|
server: &testGRPCServer{
|
||||||
|
key: srvKey,
|
||||||
|
delResp: resp,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
r, err := c.Delete(ctx, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.NoError(t, signature.VerifyServiceMessage(r))
|
||||||
|
require.Equal(t, resp.GetBody(), r.GetBody())
|
||||||
|
require.Equal(t, resp.GetMetaHeader(), r.GetMetaHeader())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGRPCClient_GetRangeHash(t *testing.T) {
|
||||||
|
ctx := context.TODO()
|
||||||
|
|
||||||
|
cliKey := test.DecodeKey(0)
|
||||||
|
srvKey := test.DecodeKey(1)
|
||||||
|
|
||||||
|
t.Run("gRPC server error", func(t *testing.T) {
|
||||||
|
srvErr := errors.New("test server error")
|
||||||
|
|
||||||
|
srv := &testGRPCServer{
|
||||||
|
err: srvErr,
|
||||||
|
}
|
||||||
|
|
||||||
|
cli := &testGRPCClient{
|
||||||
|
server: srv,
|
||||||
|
}
|
||||||
|
|
||||||
|
c, err := object.New(object.WithGRPCServiceClient(cli))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
resp, err := c.GetRangeHash(ctx, new(object.GetRangeHashRequest))
|
||||||
|
require.True(t, errors.Is(err, srvErr))
|
||||||
|
require.Nil(t, resp)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("invalid request structure", func(t *testing.T) {
|
||||||
|
req := testGetRangeHashRequest()
|
||||||
|
|
||||||
|
require.Error(t, signature.VerifyServiceMessage(req))
|
||||||
|
|
||||||
|
c, err := object.New(
|
||||||
|
object.WithGRPCServiceClient(
|
||||||
|
&testGRPCClient{
|
||||||
|
server: new(testGRPCServer),
|
||||||
|
},
|
||||||
|
),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
resp, err := c.GetRangeHash(ctx, req)
|
||||||
|
require.Error(t, err)
|
||||||
|
require.Nil(t, resp)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("correct response", func(t *testing.T) {
|
||||||
|
req := testGetRangeHashRequest()
|
||||||
|
|
||||||
|
require.NoError(t, signature.SignServiceMessage(cliKey, req))
|
||||||
|
|
||||||
|
resp := testGetRangeHashResponse()
|
||||||
|
|
||||||
|
c, err := object.New(
|
||||||
|
object.WithGRPCServiceClient(
|
||||||
|
&testGRPCClient{
|
||||||
|
server: &testGRPCServer{
|
||||||
|
key: srvKey,
|
||||||
|
getRangeHashResp: resp,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
r, err := c.GetRangeHash(ctx, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.NoError(t, signature.VerifyServiceMessage(r))
|
||||||
|
require.Equal(t, resp.GetBody(), r.GetBody())
|
||||||
|
require.Equal(t, resp.GetMetaHeader(), r.GetMetaHeader())
|
||||||
|
})
|
||||||
|
}
|
Loading…
Reference in a new issue