From 5e25c76f4050a0a01d39451794aa730c5ec8a7ac Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Fri, 21 Aug 2020 14:36:13 +0300 Subject: [PATCH] Add client for object service Signed-off-by: Alex Vanin --- v2/object/client.go | 408 +++++++++++++++++++++++++++++ v2/object/client_stream.go | 60 +++++ v2/object/grpc/client.go | 86 ++++++ v2/object/test/client_test.go | 474 ++++++++++++++++++++++++++++++++++ 4 files changed, 1028 insertions(+) create mode 100644 v2/object/client.go create mode 100644 v2/object/client_stream.go create mode 100644 v2/object/grpc/client.go create mode 100644 v2/object/test/client_test.go diff --git a/v2/object/client.go b/v2/object/client.go new file mode 100644 index 00000000..8af24731 --- /dev/null +++ b/v2/object/client.go @@ -0,0 +1,408 @@ +package object + +import ( + "context" + + "github.com/nspcc-dev/neofs-api-go/v2/client" + object "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" + "github.com/pkg/errors" + "google.golang.org/grpc" +) + +// Client represents universal object +// transport client. +type Client struct { + getClient *getObjectClient + + putClient *putObjectClient + + headClient *headObjectClient + + searchClient *searchObjectClient + + deleteClient *deleteObjectClient + + getRangeClient *getRangeObjectClient + + getRangeHashClient *getRangeHashObjectClient +} + +// Option represents Client option. +type Option func(*cfg) + +type cfg struct { + proto client.Protocol + + globalOpts []client.Option + + gRPC cfgGRPC +} + +type cfgGRPC struct { + serviceClient object.ObjectServiceClient + + grpcCallOpts []grpc.CallOption + + callOpts []object.Option + + client *object.Client +} + +// types of upper level sub-clients, accessed directly from object.Client +type ( + getObjectClient struct { + streamClientGetter func(context.Context, *GetRequest) (interface{}, error) + + streamerConstructor func(interface{}) (GetObjectStreamer, error) + } + + putObjectClient struct { + streamClientGetter func(context.Context) (interface{}, error) + + streamerConstructor func(interface{}) (PutObjectStreamer, error) + } + + headObjectClient struct { + requestConverter func(request *HeadRequest) interface{} + + caller func(context.Context, interface{}) (interface{}, error) + + responseConverter func(interface{}) *HeadResponse + } + + deleteObjectClient struct { + requestConverter func(request *DeleteRequest) interface{} + + caller func(context.Context, interface{}) (interface{}, error) + + responseConverter func(interface{}) *DeleteResponse + } + + searchObjectClient struct { + streamClientGetter func(context.Context, *SearchRequest) (interface{}, error) + + streamerConstructor func(interface{}) (SearchObjectStreamer, error) + } + + getRangeObjectClient struct { + streamClientGetter func(context.Context, *GetRangeRequest) (interface{}, error) + + streamerConstructor func(interface{}) (GetRangeObjectStreamer, error) + } + + getRangeHashObjectClient struct { + requestConverter func(request *GetRangeHashRequest) interface{} + + caller func(context.Context, interface{}) (interface{}, error) + + responseConverter func(interface{}) *GetRangeHashResponse + } +) + +func (c *Client) Get(ctx context.Context, req *GetRequest) (GetObjectStreamer, error) { + cli, err := c.getClient.streamClientGetter(ctx, req) + if err != nil { + return nil, errors.Wrap(err, "could not send get object request") + } + + return c.getClient.streamerConstructor(cli) +} + +func (c *Client) Put(ctx context.Context) (PutObjectStreamer, error) { + cli, err := c.putClient.streamClientGetter(ctx) + if err != nil { + return nil, errors.Wrap(err, "could not prepare put object streamer") + } + + return c.putClient.streamerConstructor(cli) +} + +func (c *Client) Head(ctx context.Context, req *HeadRequest) (*HeadResponse, error) { + resp, err := c.headClient.caller(ctx, c.headClient.requestConverter(req)) + if err != nil { + return nil, errors.Wrap(err, "could not send head object request") + } + + return c.headClient.responseConverter(resp), nil +} + +func (c *Client) Search(ctx context.Context, req *SearchRequest) (SearchObjectStreamer, error) { + cli, err := c.searchClient.streamClientGetter(ctx, req) + if err != nil { + return nil, err + } + + return c.searchClient.streamerConstructor(cli) +} + +func (c *Client) Delete(ctx context.Context, req *DeleteRequest) (*DeleteResponse, error) { + resp, err := c.deleteClient.caller(ctx, c.deleteClient.requestConverter(req)) + if err != nil { + return nil, errors.Wrap(err, "could not send delete object request") + } + + return c.deleteClient.responseConverter(resp), nil +} + +func (c *Client) GetRange(ctx context.Context, req *GetRangeRequest) (GetRangeObjectStreamer, error) { + cli, err := c.getRangeClient.streamClientGetter(ctx, req) + if err != nil { + return nil, errors.Wrap(err, "could not send get object range request") + } + + return c.getRangeClient.streamerConstructor(cli) +} + +func (c *Client) GetRangeHash(ctx context.Context, req *GetRangeHashRequest) (*GetRangeHashResponse, error) { + resp, err := c.getRangeHashClient.caller(ctx, c.getRangeHashClient.requestConverter(req)) + if err != nil { + return nil, errors.Wrap(err, "could not send get object range hash request") + } + + return c.getRangeHashClient.responseConverter(resp), nil +} + +func defaultCfg() *cfg { + return &cfg{ + proto: client.ProtoGRPC, + } +} + +func New(opts ...Option) (*Client, error) { + cfg := defaultCfg() + + for i := range opts { + opts[i](cfg) + } + + var err error + + switch cfg.proto { + case client.ProtoGRPC: + var c *object.Client + if c, err = newGRPCClient(cfg); err != nil { + break + } + + return &Client{ + getClient: newGRPCGetClient(c), + putClient: newGRPCPutClient(c), + headClient: newGRPCHeadClient(c), + searchClient: newGRPCSearchClient(c), + deleteClient: newGRPCDeleteClient(c), + getRangeClient: newGRPCGetRangeClient(c), + getRangeHashClient: newGRPCGetRangeHashClient(c), + }, nil + default: + err = client.ErrProtoUnsupported + } + + return nil, errors.Wrapf(err, "could not create %s object client", cfg.proto) +} + +func newGRPCClient(cfg *cfg) (*object.Client, error) { + var err error + + if cfg.gRPC.client == nil { + if cfg.gRPC.serviceClient == nil { + conn, err := client.NewGRPCClientConn(cfg.globalOpts...) + if err != nil { + return nil, errors.Wrap(err, "could not open gRPC getClient connection") + } + + cfg.gRPC.serviceClient = object.NewObjectServiceClient(conn) + } + + cfg.gRPC.client, err = object.NewClient( + cfg.gRPC.serviceClient, + append( + cfg.gRPC.callOpts, + object.WithCallOptions(cfg.gRPC.grpcCallOpts), + )..., + ) + } + + return cfg.gRPC.client, err +} + +func newGRPCGetClient(c *object.Client) *getObjectClient { + cli := &getObjectClient{ + streamClientGetter: func(ctx context.Context, request *GetRequest) (interface{}, error) { + return c.Get(ctx, GetRequestToGRPCMessage(request)) + }, + streamerConstructor: func(i interface{}) (GetObjectStreamer, error) { + cli, ok := i.(object.ObjectService_GetClient) + if !ok { + return nil, errors.New("can't convert interface to grpc get getClient") + } + return &getObjectStream{ + recv: func() (*GetResponse, error) { + resp, err := cli.Recv() + if err != nil { + return nil, err + } + + return GetResponseFromGRPCMessage(resp), nil + }, + }, nil + }, + } + + return cli +} + +func newGRPCPutClient(c *object.Client) *putObjectClient { + cli := &putObjectClient{ + streamClientGetter: func(ctx context.Context) (interface{}, error) { + return c.Put(ctx) + }, + streamerConstructor: func(i interface{}) (PutObjectStreamer, error) { + cli, ok := i.(object.ObjectService_PutClient) + if !ok { + return nil, errors.New("can't convert interface to grpc get getClient") + } + + return &putObjectStream{ + send: func(request *PutRequest) error { + return cli.Send(PutRequestToGRPCMessage(request)) + }, + closeAndRecv: func() (*PutResponse, error) { + resp, err := cli.CloseAndRecv() + if err != nil { + return nil, err + } + + return PutResponseFromGRPCMessage(resp), nil + }, + }, nil + }, + } + + return cli +} + +func newGRPCHeadClient(c *object.Client) *headObjectClient { + return &headObjectClient{ + requestConverter: func(req *HeadRequest) interface{} { + return HeadRequestToGRPCMessage(req) + }, + caller: func(ctx context.Context, req interface{}) (interface{}, error) { + return c.Head(ctx, req.(*object.HeadRequest)) + }, + responseConverter: func(resp interface{}) *HeadResponse { + return HeadResponseFromGRPCMessage(resp.(*object.HeadResponse)) + }, + } +} + +func newGRPCSearchClient(c *object.Client) *searchObjectClient { + cli := &searchObjectClient{ + streamClientGetter: func(ctx context.Context, request *SearchRequest) (interface{}, error) { + return c.Search(ctx, SearchRequestToGRPCMessage(request)) + }, + streamerConstructor: func(i interface{}) (SearchObjectStreamer, error) { + cli, ok := i.(object.ObjectService_SearchClient) + if !ok { + return nil, errors.New("can't convert interface to grpc get getClient") + } + return &searchObjectStream{ + recv: func() (*SearchResponse, error) { + resp, err := cli.Recv() + if err != nil { + return nil, err + } + + return SearchResponseFromGRPCMessage(resp), nil + }, + }, nil + }, + } + + return cli +} + +func newGRPCDeleteClient(c *object.Client) *deleteObjectClient { + return &deleteObjectClient{ + requestConverter: func(req *DeleteRequest) interface{} { + return DeleteRequestToGRPCMessage(req) + }, + caller: func(ctx context.Context, req interface{}) (interface{}, error) { + return c.Delete(ctx, req.(*object.DeleteRequest)) + }, + responseConverter: func(resp interface{}) *DeleteResponse { + return DeleteResponseFromGRPCMessage(resp.(*object.DeleteResponse)) + }, + } +} + +func newGRPCGetRangeClient(c *object.Client) *getRangeObjectClient { + cli := &getRangeObjectClient{ + streamClientGetter: func(ctx context.Context, request *GetRangeRequest) (interface{}, error) { + return c.GetRange(ctx, GetRangeRequestToGRPCMessage(request)) + }, + streamerConstructor: func(i interface{}) (GetRangeObjectStreamer, error) { + cli, ok := i.(object.ObjectService_GetRangeClient) + if !ok { + return nil, errors.New("can't convert interface to grpc get getClient") + } + return &getRangeObjectStream{ + recv: func() (*GetRangeResponse, error) { + resp, err := cli.Recv() + if err != nil { + return nil, err + } + + return GetRangeResponseFromGRPCMessage(resp), nil + }, + }, nil + }, + } + + return cli +} + +func newGRPCGetRangeHashClient(c *object.Client) *getRangeHashObjectClient { + return &getRangeHashObjectClient{ + requestConverter: func(req *GetRangeHashRequest) interface{} { + return GetRangeHashRequestToGRPCMessage(req) + }, + caller: func(ctx context.Context, req interface{}) (interface{}, error) { + return c.GetRangeHash(ctx, req.(*object.GetRangeHashRequest)) + }, + responseConverter: func(resp interface{}) *GetRangeHashResponse { + return GetRangeHashResponseFromGRPCMessage(resp.(*object.GetRangeHashResponse)) + }, + } +} + +func WithGlobalOpts(v ...client.Option) Option { + return func(c *cfg) { + if len(v) > 0 { + c.globalOpts = v + } + } +} + +func WithGRPCServiceClient(v object.ObjectServiceClient) Option { + return func(c *cfg) { + c.gRPC.serviceClient = v + } +} + +func WithGRPCCallOpts(v []grpc.CallOption) Option { + return func(c *cfg) { + c.gRPC.grpcCallOpts = v + } +} + +func WithGRPCClientOpts(v []object.Option) Option { + return func(c *cfg) { + c.gRPC.callOpts = v + } +} + +func WithGRPCClient(v *object.Client) Option { + return func(c *cfg) { + c.gRPC.client = v + } +} diff --git a/v2/object/client_stream.go b/v2/object/client_stream.go new file mode 100644 index 00000000..0e64860f --- /dev/null +++ b/v2/object/client_stream.go @@ -0,0 +1,60 @@ +package object + +type ( + GetObjectStreamer interface { + Recv() (*GetResponse, error) + } + + PutObjectStreamer interface { + Send(*PutRequest) error + CloseAndRecv() (*PutResponse, error) + } + + SearchObjectStreamer interface { + Recv() (*SearchResponse, error) + } + + GetRangeObjectStreamer interface { + Recv() (*GetRangeResponse, error) + } +) + +type ( + getObjectStream struct { + recv func() (*GetResponse, error) + } + + putObjectStream struct { + send func(*PutRequest) error + + closeAndRecv func() (*PutResponse, error) + } + + searchObjectStream struct { + recv func() (*SearchResponse, error) + } + + getRangeObjectStream struct { + recv func() (*GetRangeResponse, error) + } +) + +func (s *getObjectStream) Recv() (*GetResponse, error) { + return s.recv() +} + +func (p *putObjectStream) Send(request *PutRequest) error { + return p.send(request) +} + +func (p *putObjectStream) CloseAndRecv() (*PutResponse, error) { + return p.closeAndRecv() +} + +func (s *searchObjectStream) Recv() (*SearchResponse, error) { + return s.recv() +} + +func (r *getRangeObjectStream) Recv() (*GetRangeResponse, error) { + return r.recv() +} diff --git a/v2/object/grpc/client.go b/v2/object/grpc/client.go new file mode 100644 index 00000000..30ff3e0f --- /dev/null +++ b/v2/object/grpc/client.go @@ -0,0 +1,86 @@ +package object + +import ( + "context" + + "github.com/pkg/errors" + "google.golang.org/grpc" +) + +// Client wraps ObjectServiceClient +// with pre-defined configurations. +type Client struct { + *cfg + + client ObjectServiceClient +} + +// Option represents Client option. +type Option func(*cfg) + +type cfg struct { + callOpts []grpc.CallOption +} + +// ErrNilObjectServiceClient is returned by functions that expect +// a non-nil ObjectServiceClient, but received nil. +var ErrNilObjectServiceClient = errors.New("object gRPC client is nil") + +func defaultCfg() *cfg { + return new(cfg) +} + +// NewClient creates, initializes and returns a new Client instance. +// +// Options are applied one by one in order. +func NewClient(c ObjectServiceClient, opts ...Option) (*Client, error) { + if c == nil { + return nil, ErrNilObjectServiceClient + } + + cfg := defaultCfg() + for i := range opts { + opts[i](cfg) + } + + return &Client{ + cfg: cfg, + client: c, + }, nil +} + +func (c *Client) Get(ctx context.Context, req *GetRequest) (ObjectService_GetClient, error) { + return c.client.Get(ctx, req, c.callOpts...) +} + +func (c *Client) Put(ctx context.Context) (ObjectService_PutClient, error) { + return c.client.Put(ctx, c.callOpts...) +} + +func (c *Client) Head(ctx context.Context, req *HeadRequest) (*HeadResponse, error) { + return c.client.Head(ctx, req, c.callOpts...) +} + +func (c *Client) Search(ctx context.Context, req *SearchRequest) (ObjectService_SearchClient, error) { + return c.client.Search(ctx, req, c.callOpts...) +} + +func (c *Client) Delete(ctx context.Context, req *DeleteRequest) (*DeleteResponse, error) { + return c.client.Delete(ctx, req, c.callOpts...) +} + +func (c *Client) GetRange(ctx context.Context, req *GetRangeRequest) (ObjectService_GetRangeClient, error) { + return c.client.GetRange(ctx, req, c.callOpts...) +} + +func (c *Client) GetRangeHash(ctx context.Context, req *GetRangeHashRequest) (*GetRangeHashResponse, error) { + return c.client.GetRangeHash(ctx, req, c.callOpts...) +} + +// WithCallOptions returns Option that configures +// Client to attach call options to each rpc call. +func WithCallOptions(opts []grpc.CallOption) Option { + return func(c *cfg) { + c.callOpts = opts + } +} diff --git a/v2/object/test/client_test.go b/v2/object/test/client_test.go new file mode 100644 index 00000000..87eb9206 --- /dev/null +++ b/v2/object/test/client_test.go @@ -0,0 +1,474 @@ +package main + +import ( + "context" + "crypto/ecdsa" + "errors" + "testing" + + "github.com/nspcc-dev/neofs-api-go/v2/object" + objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" + "github.com/nspcc-dev/neofs-api-go/v2/refs" + "github.com/nspcc-dev/neofs-api-go/v2/service" + "github.com/nspcc-dev/neofs-api-go/v2/signature" + "github.com/nspcc-dev/neofs-crypto/test" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" +) + +type testGRPCClient struct { + server objectGRPC.ObjectServiceServer +} + +func (s *testGRPCClient) Get(ctx context.Context, in *objectGRPC.GetRequest, opts ...grpc.CallOption) (objectGRPC.ObjectService_GetClient, error) { + panic("implement me") +} + +func (s *testGRPCClient) Put(ctx context.Context, opts ...grpc.CallOption) (objectGRPC.ObjectService_PutClient, error) { + panic("implement me") +} + +func (s *testGRPCClient) Delete(ctx context.Context, in *objectGRPC.DeleteRequest, opts ...grpc.CallOption) (*objectGRPC.DeleteResponse, error) { + return s.server.Delete(ctx, in) +} + +func (s *testGRPCClient) Head(ctx context.Context, in *objectGRPC.HeadRequest, opts ...grpc.CallOption) (*objectGRPC.HeadResponse, error) { + return s.server.Head(ctx, in) +} + +func (s *testGRPCClient) Search(ctx context.Context, in *objectGRPC.SearchRequest, opts ...grpc.CallOption) (objectGRPC.ObjectService_SearchClient, error) { + panic("implement me") +} + +func (s *testGRPCClient) GetRange(ctx context.Context, in *objectGRPC.GetRangeRequest, opts ...grpc.CallOption) (objectGRPC.ObjectService_GetRangeClient, error) { + panic("implement me") +} + +func (s *testGRPCClient) GetRangeHash(ctx context.Context, in *objectGRPC.GetRangeHashRequest, opts ...grpc.CallOption) (*objectGRPC.GetRangeHashResponse, error) { + return s.server.GetRangeHash(ctx, in) +} + +type testGRPCServer struct { + key *ecdsa.PrivateKey + headResp *object.HeadResponse + delResp *object.DeleteResponse + getRangeHashResp *object.GetRangeHashResponse + err error +} + +func (s *testGRPCServer) Get(request *objectGRPC.GetRequest, server objectGRPC.ObjectService_GetServer) error { + panic("implement me") +} + +func (s *testGRPCServer) Put(server objectGRPC.ObjectService_PutServer) error { + panic("implement me") +} + +func (s *testGRPCServer) Delete(ctx context.Context, request *objectGRPC.DeleteRequest) (*objectGRPC.DeleteResponse, error) { + if s.err != nil { + return nil, s.err + } + + // verify request structure + if err := signature.VerifyServiceMessage( + object.DeleteRequestFromGRPCMessage(request), + ); err != nil { + return nil, err + } + + // sign response structure + if err := signature.SignServiceMessage(s.key, s.delResp); err != nil { + return nil, err + } + + return object.DeleteResponseToGRPCMessage(s.delResp), nil +} + +func (s *testGRPCServer) Head(ctx context.Context, request *objectGRPC.HeadRequest) (*objectGRPC.HeadResponse, error) { + if s.err != nil { + return nil, s.err + } + + // verify request structure + if err := signature.VerifyServiceMessage( + object.HeadRequestFromGRPCMessage(request), + ); err != nil { + return nil, err + } + + // sign response structure + if err := signature.SignServiceMessage(s.key, s.headResp); err != nil { + return nil, err + } + + return object.HeadResponseToGRPCMessage(s.headResp), nil +} + +func (s *testGRPCServer) Search(request *objectGRPC.SearchRequest, server objectGRPC.ObjectService_SearchServer) error { + panic("implement me") +} + +func (s *testGRPCServer) GetRange(request *objectGRPC.GetRangeRequest, server objectGRPC.ObjectService_GetRangeServer) error { + panic("implement me") +} + +func (s *testGRPCServer) GetRangeHash(ctx context.Context, request *objectGRPC.GetRangeHashRequest) (*objectGRPC.GetRangeHashResponse, error) { + if s.err != nil { + return nil, s.err + } + + // verify request structure + if err := signature.VerifyServiceMessage( + object.GetRangeHashRequestFromGRPCMessage(request), + ); err != nil { + return nil, err + } + + // sign response structure + if err := signature.SignServiceMessage(s.key, s.getRangeHashResp); err != nil { + return nil, err + } + + return object.GetRangeHashResponseToGRPCMessage(s.getRangeHashResp), nil +} + +func testHeadRequest() *object.HeadRequest { + cid := new(refs.ContainerID) + cid.SetValue([]byte{1, 2, 3}) + + oid := new(refs.ObjectID) + oid.SetValue([]byte{4, 5, 6}) + + addr := new(refs.Address) + addr.SetContainerID(cid) + addr.SetObjectID(oid) + + body := new(object.HeadRequestBody) + body.SetAddress(addr) + + meta := new(service.RequestMetaHeader) + meta.SetTTL(1) + meta.SetXHeaders([]*service.XHeader{}) + + req := new(object.HeadRequest) + req.SetBody(body) + req.SetMetaHeader(meta) + + return req +} + +func testHeadResponse() *object.HeadResponse { + shortHdr := new(object.ShortHeader) + shortHdr.SetCreationEpoch(100) + + hdrPart := new(object.GetHeaderPartShort) + hdrPart.SetShortHeader(shortHdr) + + body := new(object.HeadResponseBody) + body.SetHeaderPart(hdrPart) + + meta := new(service.ResponseMetaHeader) + meta.SetTTL(1) + meta.SetXHeaders([]*service.XHeader{}) + + resp := new(object.HeadResponse) + resp.SetBody(body) + resp.SetMetaHeader(meta) + + return resp +} + +func testDeleteRequest() *object.DeleteRequest { + cid := new(refs.ContainerID) + cid.SetValue([]byte{1, 2, 3}) + + oid := new(refs.ObjectID) + oid.SetValue([]byte{4, 5, 6}) + + addr := new(refs.Address) + addr.SetContainerID(cid) + addr.SetObjectID(oid) + + body := new(object.DeleteRequestBody) + body.SetAddress(addr) + + meta := new(service.RequestMetaHeader) + meta.SetTTL(1) + meta.SetXHeaders([]*service.XHeader{}) + + req := new(object.DeleteRequest) + req.SetBody(body) + req.SetMetaHeader(meta) + + return req +} + +func testDeleteResponse() *object.DeleteResponse { + body := new(object.DeleteResponseBody) + + meta := new(service.ResponseMetaHeader) + meta.SetTTL(1) + meta.SetXHeaders([]*service.XHeader{}) + + resp := new(object.DeleteResponse) + resp.SetBody(body) + resp.SetMetaHeader(meta) + + return resp +} + +func testGetRangeHashRequest() *object.GetRangeHashRequest { + cid := new(refs.ContainerID) + cid.SetValue([]byte{1, 2, 3}) + + oid := new(refs.ObjectID) + oid.SetValue([]byte{4, 5, 6}) + + addr := new(refs.Address) + addr.SetContainerID(cid) + addr.SetObjectID(oid) + + body := new(object.GetRangeHashRequestBody) + body.SetAddress(addr) + + meta := new(service.RequestMetaHeader) + meta.SetTTL(1) + meta.SetXHeaders([]*service.XHeader{}) + + req := new(object.GetRangeHashRequest) + req.SetBody(body) + req.SetMetaHeader(meta) + + return req +} + +func testGetRangeHashResponse() *object.GetRangeHashResponse { + body := new(object.GetRangeHashResponseBody) + body.SetHashList([][]byte{{7, 8, 9}}) + + meta := new(service.ResponseMetaHeader) + meta.SetTTL(1) + meta.SetXHeaders([]*service.XHeader{}) + + resp := new(object.GetRangeHashResponse) + resp.SetBody(body) + resp.SetMetaHeader(meta) + + return resp +} + +func TestGRPCClient_Head(t *testing.T) { + ctx := context.TODO() + + cliKey := test.DecodeKey(0) + srvKey := test.DecodeKey(1) + + t.Run("gRPC server error", func(t *testing.T) { + srvErr := errors.New("test server error") + + srv := &testGRPCServer{ + err: srvErr, + } + + cli := &testGRPCClient{ + server: srv, + } + + c, err := object.New(object.WithGRPCServiceClient(cli)) + require.NoError(t, err) + + resp, err := c.Head(ctx, new(object.HeadRequest)) + require.True(t, errors.Is(err, srvErr)) + require.Nil(t, resp) + }) + + t.Run("invalid request structure", func(t *testing.T) { + req := testHeadRequest() + + require.Error(t, signature.VerifyServiceMessage(req)) + + c, err := object.New( + object.WithGRPCServiceClient( + &testGRPCClient{ + server: new(testGRPCServer), + }, + ), + ) + require.NoError(t, err) + + resp, err := c.Head(ctx, req) + require.Error(t, err) + require.Nil(t, resp) + }) + + t.Run("correct response", func(t *testing.T) { + req := testHeadRequest() + + require.NoError(t, signature.SignServiceMessage(cliKey, req)) + + resp := testHeadResponse() + + c, err := object.New( + object.WithGRPCServiceClient( + &testGRPCClient{ + server: &testGRPCServer{ + key: srvKey, + headResp: resp, + }, + }, + ), + ) + require.NoError(t, err) + + r, err := c.Head(ctx, req) + require.NoError(t, err) + + require.NoError(t, signature.VerifyServiceMessage(r)) + require.Equal(t, resp.GetBody(), r.GetBody()) + require.Equal(t, resp.GetMetaHeader(), r.GetMetaHeader()) + }) +} + +func TestGRPCClient_Delete(t *testing.T) { + ctx := context.TODO() + + cliKey := test.DecodeKey(0) + srvKey := test.DecodeKey(1) + + t.Run("gRPC server error", func(t *testing.T) { + srvErr := errors.New("test server error") + + srv := &testGRPCServer{ + err: srvErr, + } + + cli := &testGRPCClient{ + server: srv, + } + + c, err := object.New(object.WithGRPCServiceClient(cli)) + require.NoError(t, err) + + resp, err := c.Delete(ctx, new(object.DeleteRequest)) + require.True(t, errors.Is(err, srvErr)) + require.Nil(t, resp) + }) + + t.Run("invalid request structure", func(t *testing.T) { + req := testDeleteRequest() + + require.Error(t, signature.VerifyServiceMessage(req)) + + c, err := object.New( + object.WithGRPCServiceClient( + &testGRPCClient{ + server: new(testGRPCServer), + }, + ), + ) + require.NoError(t, err) + + resp, err := c.Delete(ctx, req) + require.Error(t, err) + require.Nil(t, resp) + }) + + t.Run("correct response", func(t *testing.T) { + req := testDeleteRequest() + + require.NoError(t, signature.SignServiceMessage(cliKey, req)) + + resp := testDeleteResponse() + + c, err := object.New( + object.WithGRPCServiceClient( + &testGRPCClient{ + server: &testGRPCServer{ + key: srvKey, + delResp: resp, + }, + }, + ), + ) + require.NoError(t, err) + + r, err := c.Delete(ctx, req) + require.NoError(t, err) + + require.NoError(t, signature.VerifyServiceMessage(r)) + require.Equal(t, resp.GetBody(), r.GetBody()) + require.Equal(t, resp.GetMetaHeader(), r.GetMetaHeader()) + }) +} + +func TestGRPCClient_GetRangeHash(t *testing.T) { + ctx := context.TODO() + + cliKey := test.DecodeKey(0) + srvKey := test.DecodeKey(1) + + t.Run("gRPC server error", func(t *testing.T) { + srvErr := errors.New("test server error") + + srv := &testGRPCServer{ + err: srvErr, + } + + cli := &testGRPCClient{ + server: srv, + } + + c, err := object.New(object.WithGRPCServiceClient(cli)) + require.NoError(t, err) + + resp, err := c.GetRangeHash(ctx, new(object.GetRangeHashRequest)) + require.True(t, errors.Is(err, srvErr)) + require.Nil(t, resp) + }) + + t.Run("invalid request structure", func(t *testing.T) { + req := testGetRangeHashRequest() + + require.Error(t, signature.VerifyServiceMessage(req)) + + c, err := object.New( + object.WithGRPCServiceClient( + &testGRPCClient{ + server: new(testGRPCServer), + }, + ), + ) + require.NoError(t, err) + + resp, err := c.GetRangeHash(ctx, req) + require.Error(t, err) + require.Nil(t, resp) + }) + + t.Run("correct response", func(t *testing.T) { + req := testGetRangeHashRequest() + + require.NoError(t, signature.SignServiceMessage(cliKey, req)) + + resp := testGetRangeHashResponse() + + c, err := object.New( + object.WithGRPCServiceClient( + &testGRPCClient{ + server: &testGRPCServer{ + key: srvKey, + getRangeHashResp: resp, + }, + }, + ), + ) + require.NoError(t, err) + + r, err := c.GetRangeHash(ctx, req) + require.NoError(t, err) + + require.NoError(t, signature.VerifyServiceMessage(r)) + require.Equal(t, resp.GetBody(), r.GetBody()) + require.Equal(t, resp.GetMetaHeader(), r.GetMetaHeader()) + }) +}