frostfs-api-go/v2/object/client.go

409 lines
10 KiB
Go
Raw Normal View History

package object
import (
"context"
"github.com/nspcc-dev/neofs-api-go/v2/client"
object "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
// Client represents a universal object transport client.
//
// Each field is the sub-client that backs the Client method of the
// matching name (Get, Put, Head, Search, Delete, GetRange, GetRangeHash).
// New fills in every field when it builds a gRPC-backed Client.
type Client struct {
// getClient backs Client.Get.
getClient *getObjectClient
// putClient backs Client.Put.
putClient *putObjectClient
// headClient backs Client.Head.
headClient *headObjectClient
// searchClient backs Client.Search.
searchClient *searchObjectClient
// deleteClient backs Client.Delete.
deleteClient *deleteObjectClient
// getRangeClient backs Client.GetRange.
getRangeClient *getRangeObjectClient
// getRangeHashClient backs Client.GetRangeHash.
getRangeHashClient *getRangeHashObjectClient
}
// Option represents a Client option: a function that modifies the cfg
// from which New builds a Client.
type Option func(*cfg)
// cfg collects the settings that New uses to build a Client.
type cfg struct {
// proto is the transport protocol; New switches on it to pick the implementation.
proto client.Protocol
// globalOpts are passed to client.NewGRPCClientConn when a gRPC connection has to be opened.
globalOpts []client.Option
// gRPC holds the gRPC-specific settings.
gRPC cfgGRPC
}
// cfgGRPC holds the gRPC-specific part of the configuration consumed by newGRPCClient.
type cfgGRPC struct {
// serviceClient is the service client handed to object.NewClient; when nil,
// newGRPCClient creates one from a freshly opened connection.
serviceClient object.ObjectServiceClient
// grpcCallOpts are forwarded to object.NewClient through object.WithCallOptions.
grpcCallOpts []grpc.CallOption
// callOpts are the options passed to object.NewClient.
callOpts []object.Option
// client, when non-nil, is used as is and nothing else is constructed.
client *object.Client
}
// Types of the upper level sub-clients, accessed directly from Client.
//
// Every sub-client is a set of function fields operating on interface{}
// values. Streaming sub-clients pair a getter that obtains a transport stream
// with a constructor that wraps it into the streamer type; single-call
// sub-clients pair request/response converters with the caller that performs
// the call.
type (
// getObjectClient backs Client.Get.
getObjectClient struct {
// streamClientGetter obtains the transport stream for a request.
streamClientGetter func(context.Context, *GetRequest) (interface{}, error)
// streamerConstructor wraps the value returned by streamClientGetter.
streamerConstructor func(interface{}) (GetObjectStreamer, error)
}
// putObjectClient backs Client.Put.
putObjectClient struct {
// streamClientGetter obtains the transport stream; it takes no request.
streamClientGetter func(context.Context) (interface{}, error)
// streamerConstructor wraps the value returned by streamClientGetter.
streamerConstructor func(interface{}) (PutObjectStreamer, error)
}
// headObjectClient backs Client.Head.
headObjectClient struct {
// requestConverter turns the request into the value passed to caller.
requestConverter func(request *HeadRequest) interface{}
// caller performs the call with the converted request.
caller func(context.Context, interface{}) (interface{}, error)
// responseConverter turns the value returned by caller into the response.
responseConverter func(interface{}) *HeadResponse
}
// deleteObjectClient backs Client.Delete.
deleteObjectClient struct {
// requestConverter turns the request into the value passed to caller.
requestConverter func(request *DeleteRequest) interface{}
// caller performs the call with the converted request.
caller func(context.Context, interface{}) (interface{}, error)
// responseConverter turns the value returned by caller into the response.
responseConverter func(interface{}) *DeleteResponse
}
// searchObjectClient backs Client.Search.
searchObjectClient struct {
// streamClientGetter obtains the transport stream for a request.
streamClientGetter func(context.Context, *SearchRequest) (interface{}, error)
// streamerConstructor wraps the value returned by streamClientGetter.
streamerConstructor func(interface{}) (SearchObjectStreamer, error)
}
// getRangeObjectClient backs Client.GetRange.
getRangeObjectClient struct {
// streamClientGetter obtains the transport stream for a request.
streamClientGetter func(context.Context, *GetRangeRequest) (interface{}, error)
// streamerConstructor wraps the value returned by streamClientGetter.
streamerConstructor func(interface{}) (GetRangeObjectStreamer, error)
}
// getRangeHashObjectClient backs Client.GetRangeHash.
getRangeHashObjectClient struct {
// requestConverter turns the request into the value passed to caller.
requestConverter func(request *GetRangeHashRequest) interface{}
// caller performs the call with the converted request.
caller func(context.Context, interface{}) (interface{}, error)
// responseConverter turns the value returned by caller into the response.
responseConverter func(interface{}) *GetRangeHashResponse
}
)
// Get obtains a transport stream for req through the get sub-client and wraps
// it into a GetObjectStreamer.
//
// A failure to obtain the stream is returned wrapped with context; a failure
// of the streamer constructor is returned as is.
func (c *Client) Get(ctx context.Context, req *GetRequest) (GetObjectStreamer, error) {
	sub := c.getClient

	stream, err := sub.streamClientGetter(ctx, req)
	if err != nil {
		return nil, errors.Wrap(err, "could not send get object request")
	}

	return sub.streamerConstructor(stream)
}
// Put obtains a transport stream through the put sub-client and wraps it into
// a PutObjectStreamer.
//
// A failure to obtain the stream is returned wrapped with context; a failure
// of the streamer constructor is returned as is.
func (c *Client) Put(ctx context.Context) (PutObjectStreamer, error) {
	sub := c.putClient

	stream, err := sub.streamClientGetter(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "could not prepare put object streamer")
	}

	return sub.streamerConstructor(stream)
}
// Head converts req with the head sub-client's request converter, performs the
// call and converts the result into a *HeadResponse.
//
// A call failure is returned wrapped with context and a nil response.
func (c *Client) Head(ctx context.Context, req *HeadRequest) (*HeadResponse, error) {
	sub := c.headClient

	msg := sub.requestConverter(req)

	raw, err := sub.caller(ctx, msg)
	if err != nil {
		return nil, errors.Wrap(err, "could not send head object request")
	}

	return sub.responseConverter(raw), nil
}
// Search obtains a transport stream for req through the search sub-client and
// wraps it into a SearchObjectStreamer.
//
// A failure to obtain the stream is returned wrapped with context, in line
// with the other Client methods; a failure of the streamer constructor is
// returned as is.
func (c *Client) Search(ctx context.Context, req *SearchRequest) (SearchObjectStreamer, error) {
	cli, err := c.searchClient.streamClientGetter(ctx, req)
	if err != nil {
		// Wrap keeps the original error as the cause, so callers that
		// inspect it with errors.Cause still see it.
		return nil, errors.Wrap(err, "could not send search object request")
	}

	return c.searchClient.streamerConstructor(cli)
}
// Delete converts req with the delete sub-client's request converter, performs
// the call and converts the result into a *DeleteResponse.
//
// A call failure is returned wrapped with context and a nil response.
func (c *Client) Delete(ctx context.Context, req *DeleteRequest) (*DeleteResponse, error) {
	sub := c.deleteClient

	msg := sub.requestConverter(req)

	raw, err := sub.caller(ctx, msg)
	if err != nil {
		return nil, errors.Wrap(err, "could not send delete object request")
	}

	return sub.responseConverter(raw), nil
}
// GetRange obtains a transport stream for req through the get-range sub-client
// and wraps it into a GetRangeObjectStreamer.
//
// A failure to obtain the stream is returned wrapped with context; a failure
// of the streamer constructor is returned as is.
func (c *Client) GetRange(ctx context.Context, req *GetRangeRequest) (GetRangeObjectStreamer, error) {
	sub := c.getRangeClient

	stream, err := sub.streamClientGetter(ctx, req)
	if err != nil {
		return nil, errors.Wrap(err, "could not send get object range request")
	}

	return sub.streamerConstructor(stream)
}
// GetRangeHash converts req with the get-range-hash sub-client's request
// converter, performs the call and converts the result into a
// *GetRangeHashResponse.
//
// A call failure is returned wrapped with context and a nil response.
func (c *Client) GetRangeHash(ctx context.Context, req *GetRangeHashRequest) (*GetRangeHashResponse, error) {
	sub := c.getRangeHashClient

	msg := sub.requestConverter(req)

	raw, err := sub.caller(ctx, msg)
	if err != nil {
		return nil, errors.Wrap(err, "could not send get object range hash request")
	}

	return sub.responseConverter(raw), nil
}
// defaultCfg returns a fresh configuration whose protocol is client.ProtoGRPC
// and whose remaining fields are zero values.
func defaultCfg() *cfg {
	conf := new(cfg)
	conf.proto = client.ProtoGRPC

	return conf
}
// New builds a Client from the default configuration with opts applied in
// order.
//
// For client.ProtoGRPC every sub-client is built on top of the gRPC object
// client returned by newGRPCClient. Any other protocol yields
// client.ErrProtoUnsupported. All errors are wrapped with a message naming
// the protocol, and the returned Client is nil.
func New(opts ...Option) (*Client, error) {
	const errFormat = "could not create %s object client"

	conf := defaultCfg()
	for _, apply := range opts {
		apply(conf)
	}

	switch conf.proto {
	case client.ProtoGRPC:
		grpcCli, err := newGRPCClient(conf)
		if err != nil {
			return nil, errors.Wrapf(err, errFormat, conf.proto)
		}

		res := new(Client)
		res.getClient = newGRPCGetClient(grpcCli)
		res.putClient = newGRPCPutClient(grpcCli)
		res.headClient = newGRPCHeadClient(grpcCli)
		res.searchClient = newGRPCSearchClient(grpcCli)
		res.deleteClient = newGRPCDeleteClient(grpcCli)
		res.getRangeClient = newGRPCGetRangeClient(grpcCli)
		res.getRangeHashClient = newGRPCGetRangeHashClient(grpcCli)

		return res, nil
	default:
		return nil, errors.Wrapf(client.ErrProtoUnsupported, errFormat, conf.proto)
	}
}
// newGRPCClient returns the gRPC object client described by cfg.
//
// A client already present in cfg.gRPC.client is returned as is. Otherwise one
// is created with object.NewClient from cfg.gRPC.serviceClient; if that is
// also nil, a connection is opened first with
// client.NewGRPCClientConn(cfg.globalOpts...) and the service client created
// from it is stored in cfg. A successfully created client is stored in
// cfg.gRPC.client.
//
// On error the returned client is nil and cfg.gRPC.client is left unset.
func newGRPCClient(cfg *cfg) (*object.Client, error) {
	if cfg.gRPC.client != nil {
		return cfg.gRPC.client, nil
	}

	if cfg.gRPC.serviceClient == nil {
		conn, err := client.NewGRPCClientConn(cfg.globalOpts...)
		if err != nil {
			return nil, errors.Wrap(err, "could not open gRPC getClient connection")
		}

		cfg.gRPC.serviceClient = object.NewObjectServiceClient(conn)
	}

	// Build the option list in a fresh slice: appending to cfg.gRPC.callOpts
	// directly could write into the backing array of the slice the caller
	// passed to WithGRPCClientOpts when it has spare capacity.
	opts := make([]object.Option, 0, len(cfg.gRPC.callOpts)+1)
	opts = append(opts, cfg.gRPC.callOpts...)
	opts = append(opts, object.WithCallOptions(cfg.gRPC.grpcCallOpts))

	cli, err := object.NewClient(cfg.gRPC.serviceClient, opts...)
	if err != nil {
		// Returned unwrapped, as before; New adds the context.
		return nil, err
	}

	cfg.gRPC.client = cli

	return cli, nil
}
// newGRPCGetClient builds the Get sub-client on top of the gRPC object client c.
//
// The stream getter converts the request with GetRequestToGRPCMessage and
// passes it to c.Get. The streamer constructor requires its argument to be an
// object.ObjectService_GetClient and returns an error otherwise; the streamer
// it produces converts every received message with GetResponseFromGRPCMessage
// and passes receive errors through unchanged.
func newGRPCGetClient(c *object.Client) *getObjectClient {
	openStream := func(ctx context.Context, req *GetRequest) (interface{}, error) {
		return c.Get(ctx, GetRequestToGRPCMessage(req))
	}

	wrapStream := func(raw interface{}) (GetObjectStreamer, error) {
		stream, ok := raw.(object.ObjectService_GetClient)
		if !ok {
			return nil, errors.New("can't convert interface to grpc get getClient")
		}

		recv := func() (*GetResponse, error) {
			msg, err := stream.Recv()
			if err != nil {
				return nil, err
			}

			return GetResponseFromGRPCMessage(msg), nil
		}

		return &getObjectGRPCStream{recv: recv}, nil
	}

	return &getObjectClient{
		streamClientGetter:  openStream,
		streamerConstructor: wrapStream,
	}
}
// newGRPCPutClient builds the Put sub-client on top of the gRPC object client c.
//
// The stream getter opens the stream with c.Put. The streamer constructor
// requires its argument to be an object.ObjectService_PutClient and returns an
// error otherwise. The streamer it produces converts each request with
// PutRequestToGRPCMessage before sending, and converts the response obtained
// from CloseAndRecv with PutResponseFromGRPCMessage; stream errors are passed
// through unchanged.
func newGRPCPutClient(c *object.Client) *putObjectClient {
	return &putObjectClient{
		streamClientGetter: func(ctx context.Context) (interface{}, error) {
			return c.Put(ctx)
		},
		streamerConstructor: func(i interface{}) (PutObjectStreamer, error) {
			stream, ok := i.(object.ObjectService_PutClient)
			if !ok {
				// Name the put stream: the previous text was copied from
				// the get sub-client and pointed at the wrong operation.
				return nil, errors.New("can't convert interface to grpc put client")
			}

			return &putObjectGRPCStream{
				send: func(request *PutRequest) error {
					return stream.Send(PutRequestToGRPCMessage(request))
				},
				closeAndRecv: func() (*PutResponse, error) {
					resp, err := stream.CloseAndRecv()
					if err != nil {
						return nil, err
					}

					return PutResponseFromGRPCMessage(resp), nil
				},
			}, nil
		},
	}
}
// newGRPCHeadClient builds the Head sub-client on top of the gRPC object client c.
//
// The request converter applies HeadRequestToGRPCMessage, the caller invokes
// c.Head and the response converter applies HeadResponseFromGRPCMessage. The
// caller and the response converter use unchecked type assertions
// (*object.HeadRequest and *object.HeadResponse), so they panic when handed a
// value of any other type.
func newGRPCHeadClient(c *object.Client) *headObjectClient {
	toGRPC := func(req *HeadRequest) interface{} {
		return HeadRequestToGRPCMessage(req)
	}

	call := func(ctx context.Context, msg interface{}) (interface{}, error) {
		return c.Head(ctx, msg.(*object.HeadRequest))
	}

	fromGRPC := func(msg interface{}) *HeadResponse {
		return HeadResponseFromGRPCMessage(msg.(*object.HeadResponse))
	}

	return &headObjectClient{
		requestConverter:  toGRPC,
		caller:            call,
		responseConverter: fromGRPC,
	}
}
// newGRPCSearchClient builds the Search sub-client on top of the gRPC object
// client c.
//
// The stream getter converts the request with SearchRequestToGRPCMessage and
// passes it to c.Search. The streamer constructor requires its argument to be
// an object.ObjectService_SearchClient and returns an error otherwise; the
// streamer it produces converts every received message with
// SearchResponseFromGRPCMessage and passes receive errors through unchanged.
func newGRPCSearchClient(c *object.Client) *searchObjectClient {
	return &searchObjectClient{
		streamClientGetter: func(ctx context.Context, request *SearchRequest) (interface{}, error) {
			return c.Search(ctx, SearchRequestToGRPCMessage(request))
		},
		streamerConstructor: func(i interface{}) (SearchObjectStreamer, error) {
			stream, ok := i.(object.ObjectService_SearchClient)
			if !ok {
				// Name the search stream: the previous text was copied from
				// the get sub-client and pointed at the wrong operation.
				return nil, errors.New("can't convert interface to grpc search client")
			}

			return &searchObjectGRPCStream{
				recv: func() (*SearchResponse, error) {
					resp, err := stream.Recv()
					if err != nil {
						return nil, err
					}

					return SearchResponseFromGRPCMessage(resp), nil
				},
			}, nil
		},
	}
}
// newGRPCDeleteClient builds the Delete sub-client on top of the gRPC object
// client c.
//
// The request converter applies DeleteRequestToGRPCMessage, the caller invokes
// c.Delete and the response converter applies DeleteResponseFromGRPCMessage.
// The caller and the response converter use unchecked type assertions
// (*object.DeleteRequest and *object.DeleteResponse), so they panic when
// handed a value of any other type.
func newGRPCDeleteClient(c *object.Client) *deleteObjectClient {
	toGRPC := func(req *DeleteRequest) interface{} {
		return DeleteRequestToGRPCMessage(req)
	}

	call := func(ctx context.Context, msg interface{}) (interface{}, error) {
		return c.Delete(ctx, msg.(*object.DeleteRequest))
	}

	fromGRPC := func(msg interface{}) *DeleteResponse {
		return DeleteResponseFromGRPCMessage(msg.(*object.DeleteResponse))
	}

	return &deleteObjectClient{
		requestConverter:  toGRPC,
		caller:            call,
		responseConverter: fromGRPC,
	}
}
// newGRPCGetRangeClient builds the GetRange sub-client on top of the gRPC
// object client c.
//
// The stream getter converts the request with GetRangeRequestToGRPCMessage and
// passes it to c.GetRange. The streamer constructor requires its argument to
// be an object.ObjectService_GetRangeClient and returns an error otherwise;
// the streamer it produces converts every received message with
// GetRangeResponseFromGRPCMessage and passes receive errors through unchanged.
func newGRPCGetRangeClient(c *object.Client) *getRangeObjectClient {
	return &getRangeObjectClient{
		streamClientGetter: func(ctx context.Context, request *GetRangeRequest) (interface{}, error) {
			return c.GetRange(ctx, GetRangeRequestToGRPCMessage(request))
		},
		streamerConstructor: func(i interface{}) (GetRangeObjectStreamer, error) {
			stream, ok := i.(object.ObjectService_GetRangeClient)
			if !ok {
				// Name the get-range stream: the previous text was copied
				// from the get sub-client and pointed at the wrong operation.
				return nil, errors.New("can't convert interface to grpc get range client")
			}

			return &getRangeObjectGRPCStream{
				recv: func() (*GetRangeResponse, error) {
					resp, err := stream.Recv()
					if err != nil {
						return nil, err
					}

					return GetRangeResponseFromGRPCMessage(resp), nil
				},
			}, nil
		},
	}
}
// newGRPCGetRangeHashClient builds the GetRangeHash sub-client on top of the
// gRPC object client c.
//
// The request converter applies GetRangeHashRequestToGRPCMessage, the caller
// invokes c.GetRangeHash and the response converter applies
// GetRangeHashResponseFromGRPCMessage. The caller and the response converter
// use unchecked type assertions (*object.GetRangeHashRequest and
// *object.GetRangeHashResponse), so they panic when handed a value of any
// other type.
func newGRPCGetRangeHashClient(c *object.Client) *getRangeHashObjectClient {
	toGRPC := func(req *GetRangeHashRequest) interface{} {
		return GetRangeHashRequestToGRPCMessage(req)
	}

	call := func(ctx context.Context, msg interface{}) (interface{}, error) {
		return c.GetRangeHash(ctx, msg.(*object.GetRangeHashRequest))
	}

	fromGRPC := func(msg interface{}) *GetRangeHashResponse {
		return GetRangeHashResponseFromGRPCMessage(msg.(*object.GetRangeHashResponse))
	}

	return &getRangeHashObjectClient{
		requestConverter:  toGRPC,
		caller:            call,
		responseConverter: fromGRPC,
	}
}
// WithGlobalOpts returns an Option that replaces the configuration's global
// client options with v. When called without arguments the returned Option
// leaves the configuration untouched.
func WithGlobalOpts(v ...client.Option) Option {
	return func(conf *cfg) {
		if len(v) == 0 {
			return
		}

		conf.globalOpts = v
	}
}
// WithGRPCServiceClient returns an Option that sets the gRPC service client
// of the configuration to v.
func WithGRPCServiceClient(v object.ObjectServiceClient) Option {
	setter := func(conf *cfg) {
		conf.gRPC.serviceClient = v
	}

	return setter
}
// WithGRPCCallOpts returns an Option that sets the gRPC call options of the
// configuration to v.
func WithGRPCCallOpts(v []grpc.CallOption) Option {
	setter := func(conf *cfg) {
		conf.gRPC.grpcCallOpts = v
	}

	return setter
}
// WithGRPCClientOpts returns an Option that sets the options passed to the
// gRPC object client constructor to v.
func WithGRPCClientOpts(v []object.Option) Option {
	setter := func(conf *cfg) {
		conf.gRPC.callOpts = v
	}

	return setter
}
// WithGRPCClient returns an Option that sets the ready-made gRPC object
// client of the configuration to v.
func WithGRPCClient(v *object.Client) Option {
	setter := func(conf *cfg) {
		conf.gRPC.client = v
	}

	return setter
}