From b6720d5f97b4108bca35290c6fb4bbb3f4eec66c Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Fri, 11 Mar 2022 18:24:11 +0300 Subject: [PATCH] [#1231] Update new SDK Client interface Signed-off-by: Alex Vanin --- cmd/neofs-cli/modules/control.go | 64 +++++++++++++++++++------- cmd/neofs-cli/modules/dump.go | 7 ++- cmd/neofs-cli/modules/restore.go | 7 ++- cmd/neofs-cli/modules/root.go | 21 +++++---- cmd/neofs-node/config.go | 9 ++-- go.mod | 2 +- go.sum | 5 +- pkg/core/client/client.go | 11 ++--- pkg/innerring/rpc.go | 2 +- pkg/network/cache/client.go | 23 +++++---- pkg/network/cache/multi.go | 47 ++++++++++++------- pkg/services/object/get/v2/util.go | 28 +++++++---- pkg/services/object/put/v2/streamer.go | 6 ++- pkg/services/object/search/v2/util.go | 8 +++- 14 files changed, 158 insertions(+), 82 deletions(-) diff --git a/cmd/neofs-cli/modules/control.go b/cmd/neofs-cli/modules/control.go index 3fac04b2c..9a0e0aa25 100644 --- a/cmd/neofs-cli/modules/control.go +++ b/cmd/neofs-cli/modules/control.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/mr-tron/base58" + rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" "github.com/nspcc-dev/neofs-node/pkg/services/control" ircontrol "github.com/nspcc-dev/neofs-node/pkg/services/control/ir" ircontrolsrv "github.com/nspcc-dev/neofs-node/pkg/services/control/ir/server" @@ -223,7 +224,11 @@ func healthCheck(cmd *cobra.Command, _ []string) { err = controlSvc.SignMessage(key, req) exitOnErr(cmd, errf("could not sign message: %w", err)) - resp, err := control.HealthCheck(cli.Raw(), req) + var resp *control.HealthCheckResponse + err = cli.ExecRaw(func(client *rawclient.Client) error { + resp, err = control.HealthCheck(client, req) + return err + }) exitOnErr(cmd, errf("rpc error: %w", err)) sign := resp.GetSignature() @@ -248,7 +253,11 @@ func healthCheckIR(cmd *cobra.Command, key *ecdsa.PrivateKey, c *client.Client) err := ircontrolsrv.SignMessage(key, req) exitOnErr(cmd, errf("could not sign request: %w", err)) - resp, err := ircontrol.HealthCheck(c.Raw(), req) + var resp *ircontrol.HealthCheckResponse + err = c.ExecRaw(func(client *rawclient.Client) error { + resp, err = ircontrol.HealthCheck(client, req) + return err + }) exitOnErr(cmd, errf("rpc error: %w", err)) sign := resp.GetSignature() @@ -294,7 +303,11 @@ func setNetmapStatus(cmd *cobra.Command, _ []string) { cli, err := getControlSDKClient(key) exitOnErr(cmd, err) - resp, err := control.SetNetmapStatus(cli.Raw(), req) + var resp *control.SetNetmapStatusResponse + err = cli.ExecRaw(func(client *rawclient.Client) error { + resp, err = control.SetNetmapStatus(client, req) + return err + }) exitOnErr(cmd, errf("rpc error: %w", err)) sign := resp.GetSignature() @@ -351,7 +364,11 @@ var dropObjectsCmd = &cobra.Command{ cli, err := getControlSDKClient(key) exitOnErr(cmd, err) - resp, err := control.DropObjects(cli.Raw(), req) + var resp *control.DropObjectsResponse + err = cli.ExecRaw(func(client *rawclient.Client) error { + resp, err = control.DropObjects(client, req) + return err + }) exitOnErr(cmd, errf("rpc error: %w", err)) sign := resp.GetSignature() @@ -385,7 +402,11 @@ var snapshotCmd = &cobra.Command{ cli, err := getControlSDKClient(key) exitOnErr(cmd, err) - resp, err := control.NetmapSnapshot(cli.Raw(), req) + var resp *control.NetmapSnapshotResponse + err = cli.ExecRaw(func(client *rawclient.Client) error { + resp, err = control.NetmapSnapshot(client, req) + return err + }) exitOnErr(cmd, errf("rpc error: %w", err)) sign := resp.GetSignature() @@ -415,7 +436,11 @@ func listShards(cmd *cobra.Command, _ []string) { cli, err := getControlSDKClient(key) exitOnErr(cmd, err) - resp, err := control.ListShards(cli.Raw(), req) + var resp *control.ListShardsResponse + err = cli.ExecRaw(func(client *rawclient.Client) error { + resp, err = control.ListShards(client, req) + return err + }) exitOnErr(cmd, errf("rpc error: %w", err)) sign := resp.GetSignature() @@ -433,26 +458,27 @@ func listShards(cmd *cobra.Command, _ []string) { // getControlSDKClient is the same getSDKClient but with // another RPC endpoint flag. func getControlSDKClient(key *ecdsa.PrivateKey) (*client.Client, error) { + var ( + c client.Client + prmInit client.PrmInit + prmDial client.PrmDial + ) + netAddr, err := getEndpointAddress(controlRPC) if err != nil { return nil, err } - options := []client.Option{ - client.WithAddress(netAddr.HostAddr()), - client.WithDefaultPrivateKey(key), - } + prmInit.SetDefaultPrivateKey(*key) + prmDial.SetServerURI(netAddr.HostAddr()) if netAddr.TLSEnabled() { - options = append(options, client.WithTLSConfig(&tls.Config{})) + prmDial.SetTLSConfig(&tls.Config{}) } - c, err := client.New(options...) - if err != nil { - return nil, fmt.Errorf("coult not init api client:%w", err) - } + c.Init(prmInit) - return c, err + return &c, c.Dial(prmDial) } func prettyPrintShards(cmd *cobra.Command, ii []*control.ShardInfo) { @@ -522,7 +548,11 @@ func setShardMode(cmd *cobra.Command, _ []string) { cli, err := getControlSDKClient(key) exitOnErr(cmd, err) - resp, err := control.SetShardMode(cli.Raw(), req) + var resp *control.SetShardModeResponse + err = cli.ExecRaw(func(client *rawclient.Client) error { + resp, err = control.SetShardMode(client, req) + return err + }) exitOnErr(cmd, errf("rpc error: %w", err)) sign := resp.GetSignature() diff --git a/cmd/neofs-cli/modules/dump.go b/cmd/neofs-cli/modules/dump.go index 9739c9b77..4c9ee704d 100644 --- a/cmd/neofs-cli/modules/dump.go +++ b/cmd/neofs-cli/modules/dump.go @@ -2,6 +2,7 @@ package cmd import ( "github.com/mr-tron/base58" + "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" "github.com/nspcc-dev/neofs-node/pkg/services/control" controlSvc "github.com/nspcc-dev/neofs-node/pkg/services/control/server" "github.com/nspcc-dev/neofs-sdk-go/util/signature" @@ -45,7 +46,11 @@ func dumpShard(cmd *cobra.Command, _ []string) { cli, err := getControlSDKClient(key) exitOnErr(cmd, err) - resp, err := control.DumpShard(cli.Raw(), req) + var resp *control.DumpShardResponse + err = cli.ExecRaw(func(client *client.Client) error { + resp, err = control.DumpShard(client, req) + return err + }) exitOnErr(cmd, errf("rpc error: %w", err)) sign := resp.GetSignature() diff --git a/cmd/neofs-cli/modules/restore.go b/cmd/neofs-cli/modules/restore.go index 992aa8ef7..f201c2921 100644 --- a/cmd/neofs-cli/modules/restore.go +++ b/cmd/neofs-cli/modules/restore.go @@ -2,6 +2,7 @@ package cmd import ( "github.com/mr-tron/base58" + "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" "github.com/nspcc-dev/neofs-node/pkg/services/control" controlSvc "github.com/nspcc-dev/neofs-node/pkg/services/control/server" "github.com/nspcc-dev/neofs-sdk-go/util/signature" @@ -45,7 +46,11 @@ func restoreShard(cmd *cobra.Command, _ []string) { cli, err := getControlSDKClient(key) exitOnErr(cmd, err) - resp, err := control.RestoreShard(cli.Raw(), req) + var resp *control.RestoreShardResponse + err = cli.ExecRaw(func(client *client.Client) error { + resp, err = control.RestoreShard(client, req) + return err + }) exitOnErr(cmd, errf("rpc error: %w", err)) sign := resp.GetSignature() diff --git a/cmd/neofs-cli/modules/root.go b/cmd/neofs-cli/modules/root.go index 801513815..0978ccf22 100644 --- a/cmd/neofs-cli/modules/root.go +++ b/cmd/neofs-cli/modules/root.go @@ -339,27 +339,32 @@ func prepareBearerPrm(cmd *cobra.Command, prm bearerPrm) { // getSDKClient returns default neofs-api-go sdk client. Consider using // opts... to provide TTL or other global configuration flags. func getSDKClient(key *ecdsa.PrivateKey) (*client.Client, error) { + var ( + c client.Client + prmInit client.PrmInit + prmDial client.PrmDial + ) + netAddr, err := getEndpointAddress(rpc) if err != nil { return nil, err } - options := []client.Option{ - client.WithAddress(netAddr.HostAddr()), - client.WithDefaultPrivateKey(key), - client.WithNeoFSErrorParsing(), - } + prmInit.SetDefaultPrivateKey(*key) + prmInit.ResolveNeoFSFailures() + prmDial.SetServerURI(netAddr.HostAddr()) if netAddr.TLSEnabled() { - options = append(options, client.WithTLSConfig(&tls.Config{})) + prmDial.SetTLSConfig(&tls.Config{}) } - c, err := client.New(options...) + c.Init(prmInit) + err = c.Dial(prmDial) if err != nil { return nil, fmt.Errorf("coult not init api client:%w", err) } - return c, err + return &c, err } func getTTL() uint32 { diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index de7618f39..30decf04b 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -43,7 +43,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "github.com/nspcc-dev/neofs-node/pkg/util/state" - apiclient "github.com/nspcc-dev/neofs-sdk-go/client" "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/owner" "github.com/nspcc-dev/neofs-sdk-go/version" @@ -302,10 +301,10 @@ func initCfg(path string) *cfg { scriptHash: contractsconfig.Reputation(appCfg), workerPool: reputationWorkerPool, }, - clientCache: cache.NewSDKClientCache( - apiclient.WithDialTimeout(apiclientconfig.DialTimeout(appCfg)), - apiclient.WithDefaultPrivateKey(&key.PrivateKey), - ), + clientCache: cache.NewSDKClientCache(cache.ClientCacheOpts{ + DialTimeout: apiclientconfig.DialTimeout(appCfg), + Key: &key.PrivateKey, + }), persistate: persistate, ownerIDFromKey: ownerIDFromKey, diff --git a/go.mod b/go.mod index 124854498..14d51c055 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/nspcc-dev/neo-go v0.98.0 github.com/nspcc-dev/neofs-api-go/v2 v2.12.0 github.com/nspcc-dev/neofs-contract v0.14.2 - github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.2 + github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.2.0.20220314100449-68b0440c626f github.com/nspcc-dev/tzhash v1.5.2 github.com/panjf2000/ants/v2 v2.4.0 github.com/paulmach/orb v0.2.2 diff --git a/go.sum b/go.sum index c3cccf6d2..65e0e5ff0 100644 --- a/go.sum +++ b/go.sum @@ -380,7 +380,6 @@ github.com/nspcc-dev/neo-go v0.73.1-pre.0.20200303142215-f5a1b928ce09/go.mod h1: github.com/nspcc-dev/neo-go v0.98.0 h1:yyW4sgY88/pLf0949qmgfkQXzRKC3CI/WyhqXNnwMd8= github.com/nspcc-dev/neo-go v0.98.0/go.mod h1:E3cc1x6RXSXrJb2nDWXTXjnXk3rIqVN8YdFyWv+FrqM= github.com/nspcc-dev/neofs-api-go/v2 v2.11.0-pre.0.20211201134523-3604d96f3fe1/go.mod h1:oS8dycEh8PPf2Jjp6+8dlwWyEv2Dy77h/XhhcdxYEFs= -github.com/nspcc-dev/neofs-api-go/v2 v2.11.2-0.20220302134950-d065453bd0a7/go.mod h1:oS8dycEh8PPf2Jjp6+8dlwWyEv2Dy77h/XhhcdxYEFs= github.com/nspcc-dev/neofs-api-go/v2 v2.12.0 h1:xWqXzorDk9WFMTtWP7cwwlyJDL1X6Z4HT1e5zqkq7xY= github.com/nspcc-dev/neofs-api-go/v2 v2.12.0/go.mod h1:oS8dycEh8PPf2Jjp6+8dlwWyEv2Dy77h/XhhcdxYEFs= github.com/nspcc-dev/neofs-contract v0.14.2 h1:m3Wx5LO9QMKt0w7iVVqrsU4SPs67RfTCS6QSyhqCkCA= @@ -390,8 +389,8 @@ github.com/nspcc-dev/neofs-crypto v0.2.3/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BE github.com/nspcc-dev/neofs-crypto v0.3.0 h1:zlr3pgoxuzrmGCxc5W8dGVfA9Rro8diFvVnBg0L4ifM= github.com/nspcc-dev/neofs-crypto v0.3.0/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw= github.com/nspcc-dev/neofs-sdk-go v0.0.0-20211201182451-a5b61c4f6477/go.mod h1:dfMtQWmBHYpl9Dez23TGtIUKiFvCIxUZq/CkSIhEpz4= -github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.2 h1:m+em1eyrYFIGUdzs2asDCJH0GVWH+9rYdjLTO42mxSY= -github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.2/go.mod h1:bkVH6yZXH5/F2Sut20SDOlQbveBlaVJ0vOX20tLGnZw= +github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.2.0.20220314100449-68b0440c626f h1:K6hUTDRS71wE7HpSVaTaMwHG5J6oZd842GERw/J54Ak= +github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.2.0.20220314100449-68b0440c626f/go.mod h1:/WV31AQHs6YLTjMgMjMZw8Z3/Q7b6kMjNgJVsRab5AU= github.com/nspcc-dev/rfc6979 v0.1.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso= github.com/nspcc-dev/rfc6979 v0.2.0 h1:3e1WNxrN60/6N0DW7+UYisLeZJyfqZTNOjeV/toYvOE= github.com/nspcc-dev/rfc6979 v0.2.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso= diff --git a/pkg/core/client/client.go b/pkg/core/client/client.go index 9843bfb37..4eafebfdc 100644 --- a/pkg/core/client/client.go +++ b/pkg/core/client/client.go @@ -2,7 +2,6 @@ package client import ( "context" - "io" rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" "github.com/nspcc-dev/neofs-node/pkg/network" @@ -13,7 +12,6 @@ import ( // node's client. type Client interface { ContainerAnnounceUsedSpace(context.Context, client.PrmAnnounceSpace) (*client.ResAnnounceSpace, error) - ObjectPutInit(context.Context, client.PrmObjectPutInit) (*client.ObjectWriter, error) ObjectDelete(context.Context, client.PrmObjectDelete) (*client.ResObjectDelete, error) ObjectGetInit(context.Context, client.PrmObjectGet) (*client.ObjectReader, error) @@ -21,13 +19,10 @@ type Client interface { ObjectSearchInit(context.Context, client.PrmObjectSearch) (*client.ObjectListReader, error) ObjectRangeInit(context.Context, client.PrmObjectRange) (*client.ObjectRangeReader, error) ObjectHash(context.Context, client.PrmObjectHash) (*client.ResObjectHash, error) - AnnounceLocalTrust(context.Context, client.PrmAnnounceLocalTrust) (*client.ResAnnounceLocalTrust, error) AnnounceIntermediateTrust(context.Context, client.PrmAnnounceIntermediateTrust) (*client.ResAnnounceIntermediateTrust, error) - - Raw() *rawclient.Client - - Conn() io.Closer + ExecRaw(f func(client *rawclient.Client) error) error + Close() error } // MultiAddressClient is an interface of the @@ -37,7 +32,7 @@ type MultiAddressClient interface { // RawForAddress must return rawclient.Client // for the passed network.Address. - RawForAddress(network.Address) *rawclient.Client + RawForAddress(network.Address, func(cli *rawclient.Client) error) error } // NodeInfo groups information about NeoFS storage node needed for Client construction. diff --git a/pkg/innerring/rpc.go b/pkg/innerring/rpc.go index c4332ddb7..b553ab089 100644 --- a/pkg/innerring/rpc.go +++ b/pkg/innerring/rpc.go @@ -44,7 +44,7 @@ type ( func newClientCache(p *clientCacheParams) *ClientCache { return &ClientCache{ log: p.Log, - cache: cache.NewSDKClientCache(), + cache: cache.NewSDKClientCache(cache.ClientCacheOpts{}), key: p.Key, sgTimeout: p.SGTimeout, headTimeout: p.HeadTimeout, diff --git a/pkg/network/cache/client.go b/pkg/network/cache/client.go index e39778ed5..67ede4be0 100644 --- a/pkg/network/cache/client.go +++ b/pkg/network/cache/client.go @@ -1,8 +1,10 @@ package cache import ( + "crypto/ecdsa" "encoding/hex" "sync" + "time" clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/network" @@ -15,13 +17,19 @@ type ( ClientCache struct { mu *sync.RWMutex clients map[string]clientcore.Client - opts []client.Option + opts ClientCacheOpts + } + + ClientCacheOpts struct { + DialTimeout time.Duration + Key *ecdsa.PrivateKey + ResponseCallback func(client.ResponseMetaInfo) error } ) // NewSDKClientCache creates instance of client cache. // `opts` are used for new client creation. -func NewSDKClientCache(opts ...client.Option) *ClientCache { +func NewSDKClientCache(opts ClientCacheOpts) *ClientCache { return &ClientCache{ mu: new(sync.RWMutex), clients: make(map[string]clientcore.Client), @@ -62,9 +70,9 @@ func (c *ClientCache) Get(info clientcore.NodeInfo) (clientcore.Client, error) { return cli, nil } - cli := newMultiClient(netAddr, append(c.opts, - client.WithResponseInfoHandler(clientcore.AssertKeyResponseCallback(info.PublicKey())), - )) + newClientOpts := c.opts + newClientOpts.ResponseCallback = clientcore.AssertKeyResponseCallback(info.PublicKey()) + cli := newMultiClient(netAddr, newClientOpts) c.clients[cacheKey] = cli @@ -79,10 +87,7 @@ func (c *ClientCache) CloseAll() { { for _, cl := range c.clients { - con := cl.Conn() - if con != nil { - _ = con.Close() - } + _ = cl.Close() } } diff --git a/pkg/network/cache/multi.go b/pkg/network/cache/multi.go index 294434b01..f08e246f3 100644 --- a/pkg/network/cache/multi.go +++ b/pkg/network/cache/multi.go @@ -4,7 +4,6 @@ import ( "context" "crypto/tls" "errors" - "io" "sync" rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" @@ -20,10 +19,10 @@ type multiClient struct { addr network.AddressGroup - opts []client.Option + opts ClientCacheOpts } -func newMultiClient(addr network.AddressGroup, opts []client.Option) *multiClient { +func newMultiClient(addr network.AddressGroup, opts ClientCacheOpts) *multiClient { return &multiClient{ clients: make(map[string]clientcore.Client), addr: addr, @@ -33,21 +32,39 @@ func newMultiClient(addr network.AddressGroup, opts []client.Option) *multiClien // note: must be wrapped into mutex lock. func (x *multiClient) createForAddress(addr network.Address) clientcore.Client { - opts := append(x.opts, client.WithAddress(addr.HostAddr())) + var ( + c client.Client + prmInit client.PrmInit + prmDial client.PrmDial + ) + prmDial.SetServerURI(addr.HostAddr()) if addr.TLSEnabled() { - opts = append(opts, client.WithTLSConfig(&tls.Config{})) + prmDial.SetTLSConfig(&tls.Config{}) } - c, err := client.New(opts...) + if x.opts.Key != nil { + prmInit.SetDefaultPrivateKey(*x.opts.Key) + } + + if x.opts.DialTimeout > 0 { + prmDial.SetTimeout(x.opts.DialTimeout) + } + + if x.opts.ResponseCallback != nil { + prmInit.SetResponseInfoCallback(x.opts.ResponseCallback) + } + + c.Init(prmInit) + err := c.Dial(prmDial) if err != nil { // client never returns an error panic(err) } - x.clients[addr.String()] = c + x.clients[addr.String()] = &c - return c + return &c } func (x *multiClient) iterateClients(ctx context.Context, f func(clientcore.Client) error) error { @@ -169,12 +186,8 @@ func (x *multiClient) AnnounceIntermediateTrust(ctx context.Context, prm client. return } -func (x *multiClient) Raw() *rawclient.Client { - panic("multiClient.Raw() must not be called") -} - -func (x *multiClient) Conn() io.Closer { - return x +func (x *multiClient) ExecRaw(f func(client *rawclient.Client) error) error { + panic("multiClient.ExecRaw() must not be called") } func (x *multiClient) Close() error { @@ -182,7 +195,7 @@ func (x *multiClient) Close() error { { for _, c := range x.clients { - _ = c.Conn().Close() + _ = c.Close() } } @@ -191,8 +204,8 @@ func (x *multiClient) Close() error { return nil } -func (x *multiClient) RawForAddress(addr network.Address) *rawclient.Client { - return x.client(addr).Raw() +func (x *multiClient) RawForAddress(addr network.Address, f func(client *rawclient.Client) error) error { + return x.client(addr).ExecRaw(f) } func (x *multiClient) client(addr network.Address) clientcore.Client { diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index 179e0cd93..36fa1e437 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -77,7 +77,11 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre // perhaps it is worth highlighting the utility function in neofs-api-go // open stream - stream, err := rpc.GetObject(c.RawForAddress(addr), req, rpcclient.WithContext(stream.Context())) + var getStream *rpc.GetResponseReader + err = c.RawForAddress(addr, func(cli *rpcclient.Client) error { + getStream, err = rpc.GetObject(cli, req, rpcclient.WithContext(stream.Context())) + return err + }) if err != nil { return nil, fmt.Errorf("stream opening failed: %w", err) } @@ -91,7 +95,7 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre for { // receive message from server stream - err := stream.Read(resp) + err := getStream.Read(resp) if err != nil { if errors.Is(err, io.EOF) { if !headWas { @@ -202,7 +206,11 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get // perhaps it is worth highlighting the utility function in neofs-api-go // open stream - stream, err := rpc.GetObjectRange(c.RawForAddress(addr), req, rpcclient.WithContext(stream.Context())) + var rangeStream *rpc.ObjectRangeResponseReader + err = c.RawForAddress(addr, func(cli *rpcclient.Client) error { + rangeStream, err = rpc.GetObjectRange(cli, req, rpcclient.WithContext(stream.Context())) + return err + }) if err != nil { return nil, fmt.Errorf("could not create Get payload range stream: %w", err) } @@ -213,7 +221,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get for { // receive message from server stream - err := stream.Read(resp) + err := rangeStream.Read(resp) if err != nil { if errors.Is(err, io.EOF) { break @@ -362,18 +370,22 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp // perhaps it is worth highlighting the utility function in neofs-api-go // send Head request - resp, err := rpc.HeadObject(c.RawForAddress(addr), req, rpcclient.WithContext(ctx)) + var headResp *objectV2.HeadResponse + err = c.RawForAddress(addr, func(cli *rpcclient.Client) error { + headResp, err = rpc.HeadObject(cli, req, rpcclient.WithContext(ctx)) + return err + }) if err != nil { return nil, fmt.Errorf("sending the request failed: %w", err) } // verify response key - if err = internal.VerifyResponseKeyV2(pubkey, resp); err != nil { + if err = internal.VerifyResponseKeyV2(pubkey, headResp); err != nil { return nil, err } // verify response structure - if err := signature.VerifyServiceMessage(resp); err != nil { + if err := signature.VerifyServiceMessage(headResp); err != nil { return nil, fmt.Errorf("response verification failed: %w", err) } @@ -382,7 +394,7 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp idSig *refs.Signature ) - switch v := resp.GetBody().GetHeaderPart().(type) { + switch v := headResp.GetBody().GetHeaderPart().(type) { case nil: return nil, fmt.Errorf("unexpected header type %T", v) case *objectV2.ShortHeader: diff --git a/pkg/services/object/put/v2/streamer.go b/pkg/services/object/put/v2/streamer.go index ec1d263dd..8e2e02c2d 100644 --- a/pkg/services/object/put/v2/streamer.go +++ b/pkg/services/object/put/v2/streamer.go @@ -5,6 +5,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/v2/object" "github.com/nspcc-dev/neofs-api-go/v2/rpc" + rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" sessionV2 "github.com/nspcc-dev/neofs-api-go/v2/session" "github.com/nspcc-dev/neofs-api-go/v2/signature" "github.com/nspcc-dev/neofs-node/pkg/core/client" @@ -140,7 +141,10 @@ func (s *streamer) relayRequest(info client.NodeInfo, c client.MultiAddressClien var stream *rpc.PutRequestWriter - stream, err = rpc.PutObject(c.RawForAddress(addr), resp) + err = c.RawForAddress(addr, func(cli *rawclient.Client) error { + stream, err = rpc.PutObject(cli, resp) + return err + }) if err != nil { err = fmt.Errorf("stream opening failed: %w", err) return diff --git a/pkg/services/object/search/v2/util.go b/pkg/services/object/search/v2/util.go index 2a1b4b978..f17672957 100644 --- a/pkg/services/object/search/v2/util.go +++ b/pkg/services/object/search/v2/util.go @@ -65,7 +65,11 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre return nil, err } - stream, err := rpc.SearchObjects(c.RawForAddress(addr), req, rpcclient.WithContext(stream.Context())) + var searchStream *rpc.SearchResponseReader + err = c.RawForAddress(addr, func(cli *rpcclient.Client) error { + searchStream, err = rpc.SearchObjects(cli, req, rpcclient.WithContext(stream.Context())) + return err + }) if err != nil { return nil, err } @@ -79,7 +83,7 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre for { // receive message from server stream - err := stream.Read(resp) + err := searchStream.Read(resp) if err != nil { if errors.Is(err, io.EOF) { break