From 22dad0573d66a47b75cf3ed4e75fdfef866494f2 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Mon, 7 Mar 2022 14:39:49 +0300 Subject: [PATCH] [#164] client: Refactor and document package functionality Get rid of `Option` pattern. Define `Init`, `Dial` and `Close` methods for the corresponding stages of use. Signed-off-by: Leonard Lyubich --- client/accounting.go | 2 +- client/client.go | 177 +++++++++++++++++++++++++++++++++++----- client/common.go | 25 +++++- client/container.go | 20 ++--- client/doc.go | 85 +++++++++++++++++++ client/netmap.go | 4 +- client/object_delete.go | 2 +- client/object_get.go | 44 +--------- client/object_hash.go | 2 +- client/object_put.go | 2 +- client/object_search.go | 2 +- client/opts.go | 130 ----------------------------- client/raw.go | 17 ---- client/reputation.go | 4 +- client/response.go | 8 -- client/session.go | 4 +- pool/pool.go | 31 ++++--- pool/pool_test.go | 22 ++--- 18 files changed, 316 insertions(+), 265 deletions(-) create mode 100644 client/doc.go delete mode 100644 client/opts.go delete mode 100644 client/raw.go diff --git a/client/accounting.go b/client/accounting.go index a4cffc5..f29b711 100644 --- a/client/accounting.go +++ b/client/accounting.go @@ -88,7 +88,7 @@ func (c *Client) BalanceGet(ctx context.Context, prm PrmBalanceGet) (*ResBalance cc.req = &req cc.statusRes = &res cc.call = func() (responseV2, error) { - return rpcapi.Balance(c.Raw(), &req, client.WithContext(ctx)) + return rpcapi.Balance(&c.c, &req, client.WithContext(ctx)) } cc.result = func(r responseV2) { resp := r.(*v2accounting.BalanceResponse) diff --git a/client/client.go b/client/client.go index d4320d3..04d8104 100644 --- a/client/client.go +++ b/client/client.go @@ -1,19 +1,30 @@ package client import ( + "crypto/ecdsa" + "crypto/tls" + "time" + + v2accounting "github.com/nspcc-dev/neofs-api-go/v2/accounting" + "github.com/nspcc-dev/neofs-api-go/v2/rpc" "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" ) -// Client is a wrapper over raw NeoFS API client. +// Client represents virtual connection to the NeoFS network to communicate +// with NeoFS server using NeoFS API protocol. It is designed to provide +// an abstraction interface from the protocol details of data transfer over +// a network in NeoFS. // -// It is not allowed to override client's behaviour: -// the parameters for the all operations are write-only -// and the results of the all operations are read-only. -// -// Working client must be created via constructor New. -// Using the Client that has been created with new(Client) -// expression (or just declaring a Client variable) is unsafe -// and can lead to panic. +// Client can be created using simple Go variable declaration. Before starting +// work with the Client, it SHOULD BE correctly initialized (see Init method). +// Before executing the NeoFS operations using the Client, connection to the +// server MUST BE correctly established (see Dial method and pay attention +// to the mandatory parameters). Using the Client before connecting have +// been established can lead to a panic. After the work, the Client SHOULD BE +// closed (see Close method): it frees internal and system resources which were +// allocated for the period of work of the Client. Calling Init/Dial/Close method +// during the communication process step strongly discouraged as it leads to +// undefined behavior. // // Each method which produces a NeoFS API call may return a server response. // Status responses are returned in the result structure, and can be cast @@ -25,25 +36,147 @@ import ( // returned from all of them (pay attention to the presence of the pointer sign): // - *apistatus.ServerInternal on internal server error; // - *apistatus.SuccessDefaultV2 on default success. +// +// Client MUST NOT be copied by value: use pointer to Client instead. +// +// See client package overview to get some examples. type Client struct { - raw *client.Client + prm PrmInit - opts *clientOptions + c client.Client } -// New creates, initializes and returns the Client instance. +// Init brings the Client instance to its initial state. // -// If multiple options of the same config value are supplied, -// the option with the highest index in the arguments will be used. -func New(opts ...Option) (*Client, error) { - clientOptions := defaultClientOptions() +// One-time method call during application init stage (before Dial) is expected. +// Calling multiple times leads to undefined behavior. +// +// See docs of PrmInit methods for details. See also Dial / Close. +func (c *Client) Init(prm PrmInit) { + c.prm = prm +} - for i := range opts { - opts[i](clientOptions) +// Dial establishes a connection to the server from the NeoFS network. +// Returns an error describing failure reason. If failed, the Client +// SHOULD NOT be used. +// +// Panics if required parameters are set incorrectly, look carefully +// at the method documentation. +// +// One-time method call during application start-up stage (after Init ) is expected. +// Calling multiple times leads to undefined behavior. +// +// See also Init / Close. +func (c *Client) Dial(prm PrmDial) error { + if prm.endpoint == "" { + panic("server address is unset or empty") } - return &Client{ - opts: clientOptions, - raw: client.New(clientOptions.rawOpts...), - }, nil + if prm.timeoutDialSet { + if prm.timeoutDial <= 0 { + panic("non-positive timeout") + } + } else { + prm.timeoutDial = 5 * time.Second + } + + c.c = *client.New(append( + client.WithNetworkURIAddress(prm.endpoint, prm.tlsConfig), + client.WithDialTimeout(prm.timeoutDial), + )...) + + // TODO: (neofs-api-go#382) perform generic dial stage of the client.Client + _, err := rpc.Balance(&c.c, new(v2accounting.BalanceRequest)) + + return err +} + +// Close closes underlying connection to the NeoFS server. Implements io.Closer. +// MUST NOT be called before successful Dial. Can be called concurrently +// with server operations processing on running goroutines: in this case +// they are likely to fail due to a connection error. +// +// One-time method call during application shutdown stage (after Init and Dial) +// is expected. Calling multiple times leads to undefined behavior. +// +// See also Init / Dial. +func (c *Client) Close() error { + return c.c.Conn().Close() +} + +// PrmInit groups initialization parameters of Client instances. +// +// See also Init. +type PrmInit struct { + resolveNeoFSErrors bool + + key ecdsa.PrivateKey + + cbRespInfo func(ResponseMetaInfo) error + + netMagic uint64 +} + +// SetDefaultPrivateKey sets Client private key to be used for the protocol +// communication by default. +// +// Required for operations without custom key parametrization (see corresponding Prm* docs). +func (x *PrmInit) SetDefaultPrivateKey(key ecdsa.PrivateKey) { + x.key = key +} + +// ResolveNeoFSFailures makes the Client to resolve failure statuses of the +// NeoFS protocol into Go built-in errors. These errors are returned from +// each protocol operation. By default, statuses aren't resolved and written +// to the resulting structure (see corresponding Res* docs). +func (x *PrmInit) ResolveNeoFSFailures() { + x.resolveNeoFSErrors = true +} + +// SetResponseInfoCallback makes the Client to pass ResponseMetaInfo from each +// NeoFS server response to f. Nil (default) means ignore response meta info. +func (x *PrmInit) SetResponseInfoCallback(f func(ResponseMetaInfo) error) { + x.cbRespInfo = f +} + +// PrmDial groups connection parameters for the Client. +// +// See also Dial. +type PrmDial struct { + endpoint string + + tlsConfig *tls.Config + + timeoutDialSet bool + timeoutDial time.Duration +} + +// SetServerURI sets server URI in the NeoFS network. +// Required parameter. +// +// Format of the URI: +// [scheme://]host:port +// +// Supported schemes: +// grpc +// grpcs +// +// See also SetTLSConfig. +func (x *PrmDial) SetServerURI(endpoint string) { + x.endpoint = endpoint +} + +// SetTLSConfig sets tls.Config to open TLS client connection +// to the NeoFS server. Nil (default) means insecure connection. +// +// See also SetServerURI. +func (x *PrmDial) SetTLSConfig(tlsConfig *tls.Config) { + x.tlsConfig = tlsConfig +} + +// SetTimeout sets the timeout for connection to be established. +// MUST BE positive. If not called, 5s timeout will be used by default. +func (x *PrmDial) SetTimeout(timeout time.Duration) { + x.timeoutDialSet = true + x.timeoutDial = timeout } diff --git a/client/common.go b/client/common.go index f5d454f..56de0a3 100644 --- a/client/common.go +++ b/client/common.go @@ -4,6 +4,7 @@ import ( "crypto/ecdsa" "fmt" + "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" v2session "github.com/nspcc-dev/neofs-api-go/v2/session" "github.com/nspcc-dev/neofs-api-go/v2/signature" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" @@ -303,13 +304,29 @@ func (x *contextCall) processCall() bool { // initializes static cross-call parameters inherited from client. func (c *Client) initCallContext(ctx *contextCall) { - ctx.key = *c.opts.key + ctx.key = c.prm.key c.initCallContextWithoutKey(ctx) } // initializes static cross-call parameters inherited from client except private key. func (c *Client) initCallContextWithoutKey(ctx *contextCall) { - ctx.resolveAPIFailures = c.opts.parseNeoFSErrors - ctx.callbackResp = c.opts.cbRespInfo - ctx.netMagic = c.opts.netMagic + ctx.resolveAPIFailures = c.prm.resolveNeoFSErrors + ctx.callbackResp = c.prm.cbRespInfo + ctx.netMagic = c.prm.netMagic +} + +// ExecRaw executes f with underlying github.com/nspcc-dev/neofs-api-go/v2/rpc/client.Client +// instance. Communicate over the Protocol Buffers protocol in a more flexible way: +// most often used to transmit data over a fixed version of the NeoFS protocol, as well +// as to support custom services. +// +// The f must not manipulate the client connection passed into it. +// +// Like all other operations, must be called after connecting to the server and +// before closing the connection. +// +// See also Dial and Close. +// See also github.com/nspcc-dev/neofs-api-go/v2/rpc/client package docs. +func (c *Client) ExecRaw(f func(client *client.Client) error) error { + return f(&c.c) } diff --git a/client/container.go b/client/container.go index b64979f..2cea90b 100644 --- a/client/container.go +++ b/client/container.go @@ -88,7 +88,7 @@ func (c *Client) ContainerPut(ctx context.Context, prm PrmContainerPut) (*ResCon // sign container signWrapper := v2signature.StableMarshalerWrapper{SM: reqBody.GetContainer()} - sig, err := sigutil.SignData(c.opts.key, signWrapper, sigutil.SignWithRFC6979()) + sig, err := sigutil.SignData(&c.prm.key, signWrapper, sigutil.SignWithRFC6979()) if err != nil { return nil, err } @@ -117,7 +117,7 @@ func (c *Client) ContainerPut(ctx context.Context, prm PrmContainerPut) (*ResCon cc.req = &req cc.statusRes = &res cc.call = func() (responseV2, error) { - return rpcapi.PutContainer(c.Raw(), &req, client.WithContext(ctx)) + return rpcapi.PutContainer(&c.c, &req, client.WithContext(ctx)) } cc.result = func(r responseV2) { resp := r.(*v2container.PutResponse) @@ -208,7 +208,7 @@ func (c *Client) ContainerGet(ctx context.Context, prm PrmContainerGet) (*ResCon cc.req = &req cc.statusRes = &res cc.call = func() (responseV2, error) { - return rpcapi.GetContainer(c.Raw(), &req, client.WithContext(ctx)) + return rpcapi.GetContainer(&c.c, &req, client.WithContext(ctx)) } cc.result = func(r responseV2) { resp := r.(*v2container.GetResponse) @@ -314,7 +314,7 @@ func (c *Client) ContainerList(ctx context.Context, prm PrmContainerList) (*ResC cc.req = &req cc.statusRes = &res cc.call = func() (responseV2, error) { - return rpcapi.ListContainers(c.Raw(), &req, client.WithContext(ctx)) + return rpcapi.ListContainers(&c.c, &req, client.WithContext(ctx)) } cc.result = func(r responseV2) { resp := r.(*v2container.ListResponse) @@ -407,7 +407,7 @@ func (c *Client) ContainerDelete(ctx context.Context, prm PrmContainerDelete) (* signWrapper := delContainerSignWrapper{body: reqBody} // sign container - sig, err := sigutil.SignData(c.opts.key, signWrapper, sigutil.SignWithRFC6979()) + sig, err := sigutil.SignData(&c.prm.key, signWrapper, sigutil.SignWithRFC6979()) if err != nil { return nil, err } @@ -437,7 +437,7 @@ func (c *Client) ContainerDelete(ctx context.Context, prm PrmContainerDelete) (* cc.req = &req cc.statusRes = &res cc.call = func() (responseV2, error) { - return rpcapi.DeleteContainer(c.Raw(), &req, client.WithContext(ctx)) + return rpcapi.DeleteContainer(&c.c, &req, client.WithContext(ctx)) } // process call @@ -525,7 +525,7 @@ func (c *Client) ContainerEACL(ctx context.Context, prm PrmContainerEACL) (*ResC cc.req = &req cc.statusRes = &res cc.call = func() (responseV2, error) { - return rpcapi.GetEACL(c.Raw(), &req, client.WithContext(ctx)) + return rpcapi.GetEACL(&c.c, &req, client.WithContext(ctx)) } cc.result = func(r responseV2) { resp := r.(*v2container.GetExtendedACLResponse) @@ -607,7 +607,7 @@ func (c *Client) ContainerSetEACL(ctx context.Context, prm PrmContainerSetEACL) // sign the eACL table signWrapper := v2signature.StableMarshalerWrapper{SM: reqBody.GetEACL()} - sig, err := sigutil.SignData(c.opts.key, signWrapper, sigutil.SignWithRFC6979()) + sig, err := sigutil.SignData(&c.prm.key, signWrapper, sigutil.SignWithRFC6979()) if err != nil { return nil, err } @@ -636,7 +636,7 @@ func (c *Client) ContainerSetEACL(ctx context.Context, prm PrmContainerSetEACL) cc.req = &req cc.statusRes = &res cc.call = func() (responseV2, error) { - return rpcapi.SetEACL(c.Raw(), &req, client.WithContext(ctx)) + return rpcapi.SetEACL(&c.c, &req, client.WithContext(ctx)) } // process call @@ -721,7 +721,7 @@ func (c *Client) ContainerAnnounceUsedSpace(ctx context.Context, prm PrmAnnounce cc.req = &req cc.statusRes = &res cc.call = func() (responseV2, error) { - return rpcapi.AnnounceUsedSpace(c.Raw(), &req, client.WithContext(ctx)) + return rpcapi.AnnounceUsedSpace(&c.c, &req, client.WithContext(ctx)) } // process call diff --git a/client/doc.go b/client/doc.go new file mode 100644 index 0000000..b6e7ccc --- /dev/null +++ b/client/doc.go @@ -0,0 +1,85 @@ +/* +Package client provides NeoFS API client implementation. + +The main component is Client type. It is a virtual connection to the network +and provides methods for executing operations on the server. + +Create client instance: + var c client.Client + +Initialize client state: + var prm client.PrmInit + prm.SetDefaultPrivateKey(key) + // ... + + c.Init(prm) + +Connect to the NeoFS server: + var prm client.PrmDial + prm.SetServerURI("localhost:8080") + prm.SetDefaultPrivateKey(key) + // ... + + err := c.Dial(prm) + // ... + +Execute NeoFS operation on the server: + var prm client.PrmContainerPut + prm.SetContainer(cnr) + // ... + + res, err := c.ContainerPut(context.Background(), prm) + err := c.Dial(dialPrm) + if err == nil { + err = apistatus.ErrFromStatus(res.Status()) + } + // ... + +Consume custom service of the server: + syntax = "proto3"; + + service CustomService { + rpc CustomRPC(CustomRPCRequest) returns (CustomRPCResponse); + } + + import "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" + import "github.com/nspcc-dev/neofs-api-go/v2/rpc/common" + + req := new(CustomRPCRequest) + // ... + resp := new(CustomRPCResponse) + + err := c.ExecRaw(func(c *client.Client) error { + return client.SendUnary(c, common.CallMethodInfo{ + Service: "CustomService", + Name: "CustomRPC", + }, req, resp) + }) + // ... + +Close the connection: + err := c.Close() + // ... + +Note that it's not allowed to override Client behaviour directly: the parameters +for the all operations are write-only and the results of the all operations are +read-only. To be able to override client behavior (e.g. for tests), abstract it +with an interface: + import "github.com/nspcc-dev/neofs-sdk-go/client" + + type NeoFSClient interface { + // Operations according to the application needs + CreateContainer(context.Context, container.Container) error + // ... + } + + type client struct { + c *client.Client + } + + func (x *client) CreateContainer(context.Context, container.Container) error { + // ... + } + +*/ +package client diff --git a/client/netmap.go b/client/netmap.go index 58c784f..86384e5 100644 --- a/client/netmap.go +++ b/client/netmap.go @@ -84,7 +84,7 @@ func (c *Client) EndpointInfo(ctx context.Context, prm PrmEndpointInfo) (*ResEnd cc.req = &req cc.statusRes = &res cc.call = func() (responseV2, error) { - return rpcapi.LocalNodeInfo(c.Raw(), &req, client.WithContext(ctx)) + return rpcapi.LocalNodeInfo(&c.c, &req, client.WithContext(ctx)) } cc.result = func(r responseV2) { resp := r.(*v2netmap.LocalNodeInfoResponse) @@ -162,7 +162,7 @@ func (c *Client) NetworkInfo(ctx context.Context, prm PrmNetworkInfo) (*ResNetwo cc.req = &req cc.statusRes = &res cc.call = func() (responseV2, error) { - return rpcapi.NetworkInfo(c.Raw(), &req, client.WithContext(ctx)) + return rpcapi.NetworkInfo(&c.c, &req, client.WithContext(ctx)) } cc.result = func(r responseV2) { resp := r.(*v2netmap.NetworkInfoResponse) diff --git a/client/object_delete.go b/client/object_delete.go index d89d2e7..9b27895 100644 --- a/client/object_delete.go +++ b/client/object_delete.go @@ -152,7 +152,7 @@ func (c *Client) ObjectDelete(ctx context.Context, prm PrmObjectDelete) (*ResObj cc.req = &req cc.statusRes = &res cc.call = func() (responseV2, error) { - return rpcapi.DeleteObject(c.Raw(), &req, client.WithContext(ctx)) + return rpcapi.DeleteObject(&c.c, &req, client.WithContext(ctx)) } cc.result = func(r responseV2) { res.idTomb = r.(*v2object.DeleteResponse).GetBody().GetTombstone().GetObjectID() diff --git a/client/object_get.go b/client/object_get.go index e16f384..2fc0542 100644 --- a/client/object_get.go +++ b/client/object_get.go @@ -356,7 +356,7 @@ func (c *Client) ObjectGetInit(ctx context.Context, prm PrmObjectGet) (*ObjectRe r.ctxCall.wReq = func() error { var err error - stream, err = rpcapi.GetObject(c.Raw(), &req, client.WithContext(ctx)) + stream, err = rpcapi.GetObject(&c.c, &req, client.WithContext(ctx)) if err != nil { return fmt.Errorf("open stream: %w", err) } @@ -370,44 +370,6 @@ func (c *Client) ObjectGetInit(ctx context.Context, prm PrmObjectGet) (*ObjectRe return &r, nil } -// GetFullObject reads object header and full payload to the memory and returns -// the result. -// -// Function behavior equivalent to Client.ObjectGetInit, see documentation for details. -func GetFullObject(ctx context.Context, c *Client, idCnr cid.ID, idObj oid.ID) (*object.Object, error) { - var prm PrmObjectGet - - prm.FromContainer(idCnr) - prm.ByID(idObj) - - rdr, err := c.ObjectGetInit(ctx, prm) - if err != nil { - return nil, fmt.Errorf("init object reading: %w", err) - } - - var obj object.Object - - if rdr.ReadHeader(&obj) { - payload, err := io.ReadAll(rdr) - if err != nil { - return nil, fmt.Errorf("read payload: %w", err) - } - - obj.SetPayload(payload) - } - - res, err := rdr.Close() - if err == nil { - err = apistatus.ErrFromStatus(res.Status()) - } - - if err != nil { - return nil, fmt.Errorf("finish object reading: %w", err) - } - - return &obj, nil -} - // PrmObjectHead groups parameters of ObjectHead operation. type PrmObjectHead struct { prmObjectRead @@ -525,7 +487,7 @@ func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectH cc.req = &req cc.statusRes = &res cc.call = func() (responseV2, error) { - return rpcapi.HeadObject(c.Raw(), &req, client.WithContext(ctx)) + return rpcapi.HeadObject(&c.c, &req, client.WithContext(ctx)) } cc.result = func(r responseV2) { switch v := r.(*v2object.HeadResponse).GetBody().GetHeaderPart().(type) { @@ -799,7 +761,7 @@ func (c *Client) ObjectRangeInit(ctx context.Context, prm PrmObjectRange) (*Obje r.ctxCall.wReq = func() error { var err error - stream, err = rpcapi.GetObjectRange(c.Raw(), &req, client.WithContext(ctx)) + stream, err = rpcapi.GetObjectRange(&c.c, &req, client.WithContext(ctx)) if err != nil { return fmt.Errorf("open stream: %w", err) } diff --git a/client/object_hash.go b/client/object_hash.go index b764505..44b187c 100644 --- a/client/object_hash.go +++ b/client/object_hash.go @@ -179,7 +179,7 @@ func (c *Client) ObjectHash(ctx context.Context, prm PrmObjectHash) (*ResObjectH cc.req = &req cc.statusRes = &res cc.call = func() (responseV2, error) { - return rpcapi.HashObjectRange(c.Raw(), &req, client.WithContext(ctx)) + return rpcapi.HashObjectRange(&c.c, &req, client.WithContext(ctx)) } cc.result = func(r responseV2) { res.checksums = r.(*v2object.GetRangeHashResponse).GetBody().GetHashList() diff --git a/client/object_put.go b/client/object_put.go index 8daf8a5..3bdae0d 100644 --- a/client/object_put.go +++ b/client/object_put.go @@ -203,7 +203,7 @@ func (c *Client) ObjectPutInit(ctx context.Context, _ PrmObjectPutInit) (*Object ctx, w.cancelCtxStream = context.WithCancel(ctx) - stream, err := rpcapi.PutObject(c.Raw(), &res.resp, client.WithContext(ctx)) + stream, err := rpcapi.PutObject(&c.c, &res.resp, client.WithContext(ctx)) if err != nil { return nil, fmt.Errorf("open stream: %w", err) } diff --git a/client/object_search.go b/client/object_search.go index 42236c6..7e93ccf 100644 --- a/client/object_search.go +++ b/client/object_search.go @@ -291,7 +291,7 @@ func (c *Client) ObjectSearchInit(ctx context.Context, prm PrmObjectSearch) (*Ob r.ctxCall.wReq = func() error { var err error - stream, err = rpcapi.SearchObjects(c.Raw(), &req, client.WithContext(ctx)) + stream, err = rpcapi.SearchObjects(&c.c, &req, client.WithContext(ctx)) if err != nil { return fmt.Errorf("open stream: %w", err) } diff --git a/client/opts.go b/client/opts.go deleted file mode 100644 index 1da1c98..0000000 --- a/client/opts.go +++ /dev/null @@ -1,130 +0,0 @@ -package client - -import ( - "crypto/ecdsa" - "crypto/tls" - "time" - - "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" - "google.golang.org/grpc" -) - -type ( - Option func(*clientOptions) - - clientOptions struct { - key *ecdsa.PrivateKey - - rawOpts []client.Option - - cbRespInfo func(ResponseMetaInfo) error - - // defines if client parses erroneous NeoFS - // statuses and returns them as `error` - // - // default is false - parseNeoFSErrors bool - - netMagic uint64 - } -) - -func defaultClientOptions() *clientOptions { - return &clientOptions{ - rawOpts: make([]client.Option, 0, 4), - } -} - -// WithAddress returns option to specify -// network address of the remote server. -// -// Ignored if WithGRPCConnection is provided. -func WithAddress(addr string) Option { - return func(opts *clientOptions) { - opts.rawOpts = append(opts.rawOpts, client.WithNetworkAddress(addr)) - } -} - -// WithDialTimeout returns option to set connection timeout to the remote node. -// -// Ignored if WithGRPCConn is provided. -func WithDialTimeout(dur time.Duration) Option { - return func(opts *clientOptions) { - opts.rawOpts = append(opts.rawOpts, client.WithDialTimeout(dur)) - } -} - -// WithRWTimeout returns option to set timeout for single read and write -// operation on protobuf message. -func WithRWTimeout(dur time.Duration) Option { - return func(opts *clientOptions) { - opts.rawOpts = append(opts.rawOpts, client.WithRWTimeout(dur)) - } -} - -// WithTLSConfig returns option to set connection's TLS config to the remote node. -// -// Ignored if WithGRPCConnection is provided. -func WithTLSConfig(cfg *tls.Config) Option { - return func(opts *clientOptions) { - opts.rawOpts = append(opts.rawOpts, client.WithTLSCfg(cfg)) - } -} - -// WithDefaultPrivateKey returns option to set default private key -// used for the work. -func WithDefaultPrivateKey(key *ecdsa.PrivateKey) Option { - return func(opts *clientOptions) { - opts.key = key - } -} - -// WithURIAddress returns option to specify -// network address of a remote server and connection -// scheme for it. -// -// Format of the URI: -// -// [scheme://]host:port -// -// Supported schemes: -// - grpc; -// - grpcs. -// -// tls.Cfg second argument is optional and is taken into -// account only in case of `grpcs` scheme. -// -// Falls back to WithNetworkAddress if address is not a valid URI. -// -// Do not use along with WithAddress and WithTLSConfig. -// -// Ignored if WithGRPCConnection is provided. -func WithURIAddress(addr string, tlsCfg *tls.Config) Option { - return func(opts *clientOptions) { - opts.rawOpts = append(opts.rawOpts, client.WithNetworkURIAddress(addr, tlsCfg)...) - } -} - -// WithGRPCConnection returns option to set GRPC connection to -// the remote node. -func WithGRPCConnection(grpcConn *grpc.ClientConn) Option { - return func(opts *clientOptions) { - opts.rawOpts = append(opts.rawOpts, client.WithGRPCConn(grpcConn)) - } -} - -// WithNeoFSErrorParsing returns option that makes client parse -// erroneous NeoFS statuses and return them as `error` of the method -// call. -func WithNeoFSErrorParsing() Option { - return func(opts *clientOptions) { - opts.parseNeoFSErrors = true - } -} - -// WithNetworkMagic returns option to specify NeoFS network magic. -func WithNetworkMagic(magic uint64) Option { - return func(opts *clientOptions) { - opts.netMagic = magic - } -} diff --git a/client/raw.go b/client/raw.go deleted file mode 100644 index bc98065..0000000 --- a/client/raw.go +++ /dev/null @@ -1,17 +0,0 @@ -package client - -import ( - "io" - - "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" -) - -// Raw returns underlying raw protobuf client. -func (c *Client) Raw() *client.Client { - return c.raw -} - -// Conn implements Client.Conn method. -func (c *Client) Conn() io.Closer { - return c.raw.Conn() -} diff --git a/client/reputation.go b/client/reputation.go index d601362..a1b4b46 100644 --- a/client/reputation.go +++ b/client/reputation.go @@ -90,7 +90,7 @@ func (c *Client) AnnounceLocalTrust(ctx context.Context, prm PrmAnnounceLocalTru cc.req = &req cc.statusRes = &res cc.call = func() (responseV2, error) { - return rpcapi.AnnounceLocalTrust(c.Raw(), &req, client.WithContext(ctx)) + return rpcapi.AnnounceLocalTrust(&c.c, &req, client.WithContext(ctx)) } // process call @@ -185,7 +185,7 @@ func (c *Client) AnnounceIntermediateTrust(ctx context.Context, prm PrmAnnounceI cc.req = &req cc.statusRes = &res cc.call = func() (responseV2, error) { - return rpcapi.AnnounceIntermediateResult(c.Raw(), &req, client.WithContext(ctx)) + return rpcapi.AnnounceIntermediateResult(&c.c, &req, client.WithContext(ctx)) } // process call diff --git a/client/response.go b/client/response.go index ad1667c..7ec3594 100644 --- a/client/response.go +++ b/client/response.go @@ -18,11 +18,3 @@ type responseV2 interface { func (x ResponseMetaInfo) ResponderKey() []byte { return x.key } - -// WithResponseInfoHandler allows specifying handler of response meta information for the all Client operations. -// The handler is called right after the response is received. Client returns handler's error immediately. -func WithResponseInfoHandler(f func(ResponseMetaInfo) error) Option { - return func(opts *clientOptions) { - opts.cbRespInfo = f - } -} diff --git a/client/session.go b/client/session.go index 75f0b9d..ca490eb 100644 --- a/client/session.go +++ b/client/session.go @@ -71,7 +71,7 @@ func (c *Client) SessionCreate(ctx context.Context, prm PrmSessionCreate) (*ResS panic(panicMsgMissingContext) } - ownerID := owner.NewIDFromPublicKey(&c.opts.key.PublicKey) + ownerID := owner.NewIDFromPublicKey(&c.prm.key.PublicKey) // form request body reqBody := new(v2session.CreateRequestBody) @@ -95,7 +95,7 @@ func (c *Client) SessionCreate(ctx context.Context, prm PrmSessionCreate) (*ResS cc.req = &req cc.statusRes = &res cc.call = func() (responseV2, error) { - return rpcapi.CreateSession(c.Raw(), &req, client.WithContext(ctx)) + return rpcapi.CreateSession(&c.c, &req, client.WithContext(ctx)) } cc.result = func(r responseV2) { resp := r.(*v2session.CreateResponse) diff --git a/pool/pool.go b/pool/pool.go index 4a452e4..b283af6 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -60,7 +60,7 @@ type BuilderOptions struct { SessionTokenThreshold time.Duration SessionExpirationDuration uint64 nodesParams []*NodesParam - clientBuilder func(opts ...client.Option) (Client, error) + clientBuilder func(endpoint string) (Client, error) } type NodesParam struct { @@ -131,12 +131,6 @@ func (pb *Builder) Build(ctx context.Context, options *BuilderOptions) (Pool, er return options.nodesParams[i].priority < options.nodesParams[j].priority }) - if options.clientBuilder == nil { - options.clientBuilder = func(opts ...client.Option) (Client, error) { - return client.New(opts...) - } - } - return newPool(ctx, options) } @@ -279,13 +273,28 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) { inner := make([]*innerPool, len(options.nodesParams)) var atLeastOneHealthy bool + if options.clientBuilder == nil { + options.clientBuilder = func(addr string) (Client, error) { + var c client.Client + + var prmInit client.PrmInit + prmInit.ResolveNeoFSFailures() + prmInit.SetDefaultPrivateKey(*options.Key) + + c.Init(prmInit) + + var prmDial client.PrmDial + prmDial.SetServerURI(addr) + prmDial.SetTimeout(options.NodeConnectionTimeout) + + return &c, c.Dial(prmDial) + } + } + for i, params := range options.nodesParams { clientPacks := make([]*clientPack, len(params.weights)) for j, addr := range params.addresses { - c, err := options.clientBuilder(client.WithDefaultPrivateKey(options.Key), - client.WithURIAddress(addr, nil), - client.WithDialTimeout(options.NodeConnectionTimeout), - client.WithNeoFSErrorParsing()) + c, err := options.clientBuilder(addr) if err != nil { return nil, err } diff --git a/pool/pool_test.go b/pool/pool_test.go index d1fbd4c..94d0ef8 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -24,7 +24,7 @@ import ( ) func TestBuildPoolClientFailed(t *testing.T) { - clientBuilder := func(opts ...client.Option) (Client, error) { + clientBuilder := func(_ string) (Client, error) { return nil, fmt.Errorf("error") } @@ -46,7 +46,7 @@ func TestBuildPoolCreateSessionFailed(t *testing.T) { ni := &netmap.NodeInfo{} ni.SetAddresses("addr1", "addr2") - clientBuilder := func(opts ...client.Option) (Client, error) { + clientBuilder := func(_ string) (Client, error) { mockClient := NewMockClient(ctrl) mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("error session")).AnyTimes() mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(&client.ResEndpointInfo{}, nil).AnyTimes() @@ -83,7 +83,7 @@ func TestBuildPoolOneNodeFailed(t *testing.T) { var expectedToken *session.Token clientCount := -1 - clientBuilder := func(opts ...client.Option) (Client, error) { + clientBuilder := func(_ string) (Client, error) { clientCount++ mockClient := NewMockClient(ctrl) mockInvokes := 0 @@ -154,7 +154,7 @@ func TestOneNode(t *testing.T) { require.NoError(t, err) tok.SetID(uid) - clientBuilder := func(opts ...client.Option) (Client, error) { + clientBuilder := func(_ string) (Client, error) { mockClient := NewMockClient(ctrl) mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(tok, nil) mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(&client.ResEndpointInfo{}, nil).AnyTimes() @@ -185,7 +185,7 @@ func TestTwoNodes(t *testing.T) { ctrl := gomock.NewController(t) var tokens []*session.Token - clientBuilder := func(opts ...client.Option) (Client, error) { + clientBuilder := func(_ string) (Client, error) { mockClient := NewMockClient(ctrl) mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) { tok := session.NewToken() @@ -226,7 +226,7 @@ func TestOneOfTwoFailed(t *testing.T) { var tokens []*session.Token clientCount := -1 - clientBuilder := func(opts ...client.Option) (Client, error) { + clientBuilder := func(_ string) (Client, error) { clientCount++ mockClient := NewMockClient(ctrl) mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) { @@ -284,7 +284,7 @@ func TestTwoFailed(t *testing.T) { ctrl := gomock.NewController(t) - clientBuilder := func(opts ...client.Option) (Client, error) { + clientBuilder := func(_ string) (Client, error) { mockClient := NewMockClient(ctrl) mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("error")).AnyTimes() @@ -319,7 +319,7 @@ func TestSessionCache(t *testing.T) { ctrl := gomock.NewController(t) var tokens []*session.Token - clientBuilder := func(opts ...client.Option) (Client, error) { + clientBuilder := func(_ string) (Client, error) { mockClient := NewMockClient(ctrl) mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) { tok := session.NewToken() @@ -382,7 +382,7 @@ func TestPriority(t *testing.T) { var tokens []*session.Token clientCount := -1 - clientBuilder := func(opts ...client.Option) (Client, error) { + clientBuilder := func(_ string) (Client, error) { clientCount++ mockClient := NewMockClient(ctrl) mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) { @@ -447,7 +447,7 @@ func TestSessionCacheWithKey(t *testing.T) { ctrl := gomock.NewController(t) var tokens []*session.Token - clientBuilder := func(opts ...client.Option) (Client, error) { + clientBuilder := func(_ string) (Client, error) { mockClient := NewMockClient(ctrl) mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) { tok := session.NewToken() @@ -499,7 +499,7 @@ func newToken(t *testing.T) *session.Token { func TestSessionTokenOwner(t *testing.T) { ctrl := gomock.NewController(t) - clientBuilder := func(opts ...client.Option) (Client, error) { + clientBuilder := func(_ string) (Client, error) { mockClient := NewMockClient(ctrl) mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(&client.ResSessionCreate{}, nil).AnyTimes() mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(&client.ResEndpointInfo{}, nil).AnyTimes()