From cc7287d6f7eeaec8742c8c58d57f7851f0cd3f6e Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Sat, 13 Mar 2021 18:22:21 +0300 Subject: [PATCH] [#422] pkg/services: Cache clients by address only Signed-off-by: Evgenii Stratonikov --- cmd/neofs-node/container.go | 8 +++++-- go.mod | 2 +- go.sum | Bin 61613 -> 59867 bytes pkg/innerring/innerring.go | 1 + pkg/innerring/processors/audit/process.go | 2 +- pkg/innerring/processors/audit/processor.go | 6 ++++++ pkg/innerring/rpc.go | 14 +++++++++---- pkg/network/cache/client.go | 22 +++++--------------- pkg/services/object/get/exec.go | 4 ++-- pkg/services/object/get/get_test.go | 3 +-- pkg/services/object/get/service.go | 4 +--- pkg/services/object/get/util.go | 5 ++--- pkg/services/object/head/remote.go | 3 ++- pkg/services/object/put/remote.go | 3 ++- pkg/services/object/search/exec.go | 3 ++- pkg/services/object/search/search_test.go | 3 +-- pkg/services/object/search/service.go | 4 +--- pkg/services/object/search/util.go | 5 ++--- pkg/services/object/util/prm.go | 7 +++++++ 19 files changed, 53 insertions(+), 46 deletions(-) diff --git a/cmd/neofs-node/container.go b/cmd/neofs-node/container.go index 5d217f3c69..82fb6763a2 100644 --- a/cmd/neofs-node/container.go +++ b/cmd/neofs-node/container.go @@ -217,24 +217,27 @@ func (r *remoteLoadAnnounceProvider) InitRemote(srv loadroute.ServerInfo) (loadc return nil, errors.Wrap(err, "could not convert address to IP format") } - c, err := r.clientCache.Get(r.key, ipAddr) + c, err := r.clientCache.Get(ipAddr) if err != nil { return nil, errors.Wrap(err, "could not initialize API client") } return &remoteLoadAnnounceWriterProvider{ client: c, + key: r.key, }, nil } type remoteLoadAnnounceWriterProvider struct { client *apiClient.Client + key *ecdsa.PrivateKey } func (p *remoteLoadAnnounceWriterProvider) InitWriter(ctx context.Context) (loadcontroller.Writer, error) { return &remoteLoadAnnounceWriter{ ctx: ctx, client: p.client, + key: p.key, }, nil } @@ -242,6 +245,7 @@ type remoteLoadAnnounceWriter struct { ctx context.Context client *apiClient.Client + key *ecdsa.PrivateKey buf []containerSDK.UsedSpaceAnnouncement } @@ -253,7 +257,7 @@ func (r *remoteLoadAnnounceWriter) Put(a containerSDK.UsedSpaceAnnouncement) err } func (r *remoteLoadAnnounceWriter) Close() error { - return r.client.AnnounceContainerUsedSpace(r.ctx, r.buf) + return r.client.AnnounceContainerUsedSpace(r.ctx, r.buf, apiClient.WithKey(r.key)) } type loadPlacementBuilder struct { diff --git a/go.mod b/go.mod index 2ea9ff4232..e434dda004 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/multiformats/go-multihash v0.0.13 // indirect github.com/nspcc-dev/hrw v1.0.9 github.com/nspcc-dev/neo-go v0.94.0-pre.0.20210301112733-3227de8050f8 - github.com/nspcc-dev/neofs-api-go v1.24.0 + github.com/nspcc-dev/neofs-api-go v1.24.1-0.20210312074010-64505180b400 github.com/nspcc-dev/neofs-crypto v0.3.0 github.com/nspcc-dev/tzhash v1.4.0 github.com/panjf2000/ants/v2 v2.3.0 diff --git a/go.sum b/go.sum index b044e90fb010f7a497e37fda505894cbbd1d92b9..66151a54dc51f9f29ec8eda8a0aec90b50073e17 100644 GIT binary patch delta 241 zcmZ4ckoopy<_)||nWqmDQDT7z$wc<`MtoK&0a#ve3SKNb8kMbz|TFo7RWHv zv1gvVSAWaqBL)qOlNT7R+dR*B8`tI~cFByB52$l*E^wI0v^m0g1IOlgpGJkvzPX#3 z{S0*t^o$IQ3=ND8jSS393=9o)%}h)UObsmzl1vN?6fz90%uFK7O~TW?eezvXQ%bTz zeJq@O!hFNB3o6VV4JzCU^Ab~BOx$uKEWC>+Pb`$3>|4kyhu_%EQHAp)HhXokGHssU RlN_+Q;O=p@&0SBFc>wSbQBwc_ delta 1198 zcmXAoU8~y!9L8}+=JDms!On2=VjM+=61ODHTYG$gq)C&ed2f;?xiW3iBu$&NO_Q{_ z5%h2n^k(Ez@EZt95JB|L8@&<-${0g-?Tw#6bnOTD@q3%a=FJ#N{{bw zsJ0l)ysAPWy(n4|15>5uK=o%=-)!DG?!K@Gd3#Cp$Lh?TrWCNDNF)$F~t`Dy5jipmY9A`wn(_Q64ObZazHo|J{OQTHT(v&7yWQCwr3}rQZcG}GD+kJ`;KwoKKy5p%)8sjlOEG)+G*FAka-}FN(ro<+FCq6pZXns+wkakRmvAK;f zD2Y0|IQEP+tPFzHKu#kFp1P3e#r2Qx{=R2K1!Y3Q>5dV2KKJczFE1BR>Z72|x-o#w z;eu>QGSh|DN$P@@%) z)=%$0-pjxYs|Hd9#)ikG(ZK}$E+h9oACEL&~Rbwr(Mz6Ae#X%B5P(ljw* zq=oDZkC}ed)_lHF=Nz}1P9EEGK&`ioE<0V&3Kps&&S^_+B z5*RhvyIn8YYt0$^$=M!eII5h>JFm5X@Gjyd+i1)xLD^Sthh=^rLa*N{qp5T;DODLRRs$zg4^;^*btxqn_n#+sh!%`48 zaHnwEA&jy&KJGX?GW^s9;>}~QG1F<#~m>k7Bft5_B z@Wi3VEU*1P&NtG}JrcOKqo9{usx)%(wSqM{Aq(3FB{ffXr`5Gh?q iO4YD&pa*qWCaUmZww6}As&*gUtfeO>&38}kzVaV8HjEhn diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index d49598fcbf..4f2c0140aa 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -362,6 +362,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error MorphClient: server.morphClient, IRList: server, ClientCache: clientCache, + Key: server.key, RPCSearchTimeout: cfg.GetDuration("audit.timeout.search"), TaskManager: auditTaskManager, Reporter: server, diff --git a/pkg/innerring/processors/audit/process.go b/pkg/innerring/processors/audit/process.go index fa040ee6ee..625b9e1fc4 100644 --- a/pkg/innerring/processors/audit/process.go +++ b/pkg/innerring/processors/audit/process.go @@ -136,7 +136,7 @@ func (ap *Processor) findStorageGroups(cid *container.ID, shuffled netmap.Nodes) sgSearchParams.WithSearchFilters(sgFilter) ctx, cancel := context.WithTimeout(context.Background(), ap.searchTimeout) - result, err := cli.SearchObject(ctx, sgSearchParams) + result, err := cli.SearchObject(ctx, sgSearchParams, client.WithKey(ap.key)) cancel() if err != nil { diff --git a/pkg/innerring/processors/audit/processor.go b/pkg/innerring/processors/audit/processor.go index 370c5bff1f..83d6c27ed5 100644 --- a/pkg/innerring/processors/audit/processor.go +++ b/pkg/innerring/processors/audit/processor.go @@ -2,6 +2,7 @@ package audit import ( "context" + "crypto/ecdsa" "time" "github.com/nspcc-dev/neo-go/pkg/util" @@ -46,6 +47,7 @@ type ( morphClient *client.Client irList Indexer clientCache NeoFSClientCache + key *ecdsa.PrivateKey searchTimeout time.Duration containerClient *wrapContainer.Wrapper @@ -68,6 +70,7 @@ type ( RPCSearchTimeout time.Duration TaskManager TaskManager Reporter audit.Reporter + Key *ecdsa.PrivateKey } ) @@ -97,6 +100,8 @@ func New(p *Params) (*Processor, error) { return nil, errors.New("ir/audit: audit task manager is not set") case p.Reporter == nil: return nil, errors.New("ir/audit: audit result reporter is not set") + case p.Key == nil: + return nil, errors.New("ir/audit: signing key is not set") } pool, err := ants.NewPool(ProcessorPoolSize, ants.WithNonblocking(true)) @@ -124,6 +129,7 @@ func New(p *Params) (*Processor, error) { morphClient: p.MorphClient, irList: p.IRList, clientCache: p.ClientCache, + key: p.Key, searchTimeout: p.RPCSearchTimeout, containerClient: containerClient, netmapClient: netmapClient, diff --git a/pkg/innerring/rpc.go b/pkg/innerring/rpc.go index 756dc82b08..ade8452fef 100644 --- a/pkg/innerring/rpc.go +++ b/pkg/innerring/rpc.go @@ -47,7 +47,9 @@ func newClientCache(p *clientCacheParams) *ClientCache { } func (c *ClientCache) Get(address string, opts ...client.Option) (*client.Client, error) { - return c.cache.Get(c.key, address, opts...) + // Because cache is used by `ClientCache` exclusively, + // client will always have valid key. + return c.cache.Get(address, opts...) } // GetSG polls the container from audit task to get the object by id. @@ -89,7 +91,7 @@ func (c *ClientCache) getSG(ctx context.Context, addr *object.Address, nm *netma } cctx, cancel := context.WithTimeout(ctx, c.sgTimeout) - obj, err := cli.GetObject(cctx, getParams) + obj, err := cli.GetObject(cctx, getParams, client.WithKey(c.key)) cancel() @@ -143,7 +145,9 @@ func (c *ClientCache) GetHeader(task *audit.Task, node *netmap.Node, id *object. } cctx, cancel := context.WithTimeout(task.AuditContext(), c.headTimeout) - head, err := cli.GetObjectHeader(cctx, headParams, client.WithTTL(ttl)) + head, err := cli.GetObjectHeader(cctx, headParams, + client.WithTTL(ttl), + client.WithKey(c.key)) cancel() @@ -177,7 +181,9 @@ func (c *ClientCache) GetRangeHash(task *audit.Task, node *netmap.Node, id *obje } cctx, cancel := context.WithTimeout(task.AuditContext(), c.rangeTimeout) - result, err := cli.ObjectPayloadRangeTZ(cctx, rangeParams, client.WithTTL(1)) + result, err := cli.ObjectPayloadRangeTZ(cctx, rangeParams, + client.WithTTL(1), + client.WithKey(c.key)) cancel() diff --git a/pkg/network/cache/client.go b/pkg/network/cache/client.go index 007fec8473..968faa412a 100644 --- a/pkg/network/cache/client.go +++ b/pkg/network/cache/client.go @@ -1,13 +1,9 @@ package cache import ( - "crypto/ecdsa" - "crypto/sha256" - "encoding/hex" "sync" "github.com/nspcc-dev/neofs-api-go/pkg/client" - crypto "github.com/nspcc-dev/neofs-crypto" ) type ( @@ -28,11 +24,9 @@ func NewSDKClientCache() *ClientCache { } // Get function returns existing client or creates a new one. -func (c *ClientCache) Get(key *ecdsa.PrivateKey, address string, opts ...client.Option) (*client.Client, error) { - id := uniqueID(key, address) - +func (c *ClientCache) Get(address string, opts ...client.Option) (*client.Client, error) { c.mu.RLock() - if cli, ok := c.clients[id]; ok { + if cli, ok := c.clients[address]; ok { // todo: check underlying connection neofs-api-go#196 c.mu.RUnlock() @@ -46,22 +40,16 @@ func (c *ClientCache) Get(key *ecdsa.PrivateKey, address string, opts ...client. // check once again if client is missing in cache, concurrent routine could // create client while this routine was locked on `c.mu.Lock()`. - if cli, ok := c.clients[id]; ok { + if cli, ok := c.clients[address]; ok { return cli, nil } - cli, err := client.New(key, append(opts, client.WithAddress(address))...) + cli, err := client.New(nil, append(opts, client.WithAddress(address))...) if err != nil { return nil, err } - c.clients[id] = cli + c.clients[address] = cli return cli, nil } - -func uniqueID(key *ecdsa.PrivateKey, address string) string { - keyFingerprint := sha256.Sum256(crypto.MarshalPrivateKey(key)) - - return hex.EncodeToString(keyFingerprint[:]) + address -} diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index 900c0cdd58..882226855a 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -111,7 +111,7 @@ func (exec execCtx) key() *ecdsa.PrivateKey { func (exec execCtx) callOptions() []client.CallOption { return exec.prm.common.RemoteCallOptions( util.WithNetmapEpoch(exec.curProcEpoch), - ) + util.WithKey(exec.key())) } func (exec execCtx) remotePrm() *client.GetObjectParams { @@ -276,7 +276,7 @@ func (exec execCtx) remoteClient(node *network.Address) (getClient, bool) { log.Debug("could not calculate node IP address") case err == nil: - c, err := exec.svc.clientCache.get(exec.key(), ipAddr) + c, err := exec.svc.clientCache.get(ipAddr) switch { default: diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index f5df9cdf8e..f7d5fcd6c5 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -2,7 +2,6 @@ package getsvc import ( "context" - "crypto/ecdsa" "crypto/rand" "crypto/sha256" "fmt" @@ -81,7 +80,7 @@ func (p *testPlacementBuilder) BuildPlacement(addr *objectSDK.Address, _ *netmap return vs, nil } -func (c *testClientCache) get(_ *ecdsa.PrivateKey, addr string) (getClient, error) { +func (c *testClientCache) get(addr string) (getClient, error) { v, ok := c.clients[addr] if !ok { return nil, errors.New("could not construct client") diff --git a/pkg/services/object/get/service.go b/pkg/services/object/get/service.go index e0263985fb..d67ae0d194 100644 --- a/pkg/services/object/get/service.go +++ b/pkg/services/object/get/service.go @@ -1,8 +1,6 @@ package getsvc import ( - "crypto/ecdsa" - "github.com/nspcc-dev/neofs-api-go/pkg/client" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" @@ -37,7 +35,7 @@ type cfg struct { } clientCache interface { - get(*ecdsa.PrivateKey, string) (getClient, error) + get(string) (getClient, error) } traverserGenerator interface { diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index a3354980d1..db2c7022a6 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -1,7 +1,6 @@ package getsvc import ( - "crypto/ecdsa" "io" "github.com/nspcc-dev/neofs-api-go/pkg/client" @@ -75,8 +74,8 @@ func (s *SimpleObjectWriter) Object() *object.Object { return s.obj.Object() } -func (c *clientCacheWrapper) get(key *ecdsa.PrivateKey, addr string) (getClient, error) { - clt, err := c.cache.Get(key, addr, c.opts...) +func (c *clientCacheWrapper) get(addr string) (getClient, error) { + clt, err := c.cache.Get(addr, c.opts...) return &clientWrapper{ client: clt, diff --git a/pkg/services/object/head/remote.go b/pkg/services/object/head/remote.go index bd7a692b21..a54e378601 100644 --- a/pkg/services/object/head/remote.go +++ b/pkg/services/object/head/remote.go @@ -70,7 +70,7 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*object.Ob return nil, err } - c, err := h.clientCache.Get(key, addr, h.clientOpts...) + c, err := h.clientCache.Get(addr, h.clientOpts...) if err != nil { return nil, errors.Wrapf(err, "(%T) could not create SDK client %s", h, addr) } @@ -87,6 +87,7 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*object.Ob client.WithTTL(1), // FIXME: use constant client.WithSession(prm.commonHeadPrm.common.SessionToken()), client.WithBearer(prm.commonHeadPrm.common.BearerToken()), + client.WithKey(key), ) if err != nil { return nil, errors.Wrapf(err, "(%T) could not head object in %s", h, addr) diff --git a/pkg/services/object/put/remote.go b/pkg/services/object/put/remote.go index f1c72526e7..30d4be0461 100644 --- a/pkg/services/object/put/remote.go +++ b/pkg/services/object/put/remote.go @@ -64,7 +64,7 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) { return nil, err } - c, err := t.clientCache.Get(key, addr, t.clientOpts...) + c, err := t.clientCache.Get(addr, t.clientOpts...) if err != nil { return nil, errors.Wrapf(err, "(%T) could not create SDK client %s", t, addr) } @@ -76,6 +76,7 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) { append( t.commonPrm.RemoteCallOptions(), client.WithTTL(1), // FIXME: use constant + client.WithKey(key), )..., ) if err != nil { diff --git a/pkg/services/object/search/exec.go b/pkg/services/object/search/exec.go index 3c7194bbb9..4803770f29 100644 --- a/pkg/services/object/search/exec.go +++ b/pkg/services/object/search/exec.go @@ -69,6 +69,7 @@ func (exec execCtx) key() *ecdsa.PrivateKey { func (exec execCtx) callOptions() []client.CallOption { return exec.prm.common.RemoteCallOptions( util.WithNetmapEpoch(exec.curProcEpoch), + util.WithKey(exec.key()), ) } @@ -146,7 +147,7 @@ func (exec execCtx) remoteClient(node *network.Address) (searchClient, bool) { log.Debug("could not calculate node IP address") case err == nil: - c, err := exec.svc.clientCache.get(exec.key(), ipAddr) + c, err := exec.svc.clientCache.get(ipAddr) switch { default: diff --git a/pkg/services/object/search/search_test.go b/pkg/services/object/search/search_test.go index 3b7a79691f..4def0f9310 100644 --- a/pkg/services/object/search/search_test.go +++ b/pkg/services/object/search/search_test.go @@ -2,7 +2,6 @@ package searchsvc import ( "context" - "crypto/ecdsa" "crypto/rand" "crypto/sha256" "fmt" @@ -83,7 +82,7 @@ func (p *testPlacementBuilder) BuildPlacement(addr *objectSDK.Address, _ *netmap return res, nil } -func (c *testClientCache) get(_ *ecdsa.PrivateKey, addr string) (searchClient, error) { +func (c *testClientCache) get(addr string) (searchClient, error) { v, ok := c.clients[addr] if !ok { return nil, errors.New("could not construct client") diff --git a/pkg/services/object/search/service.go b/pkg/services/object/search/service.go index ccaa4daf88..046b7732e3 100644 --- a/pkg/services/object/search/service.go +++ b/pkg/services/object/search/service.go @@ -1,8 +1,6 @@ package searchsvc import ( - "crypto/ecdsa" - "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-api-go/pkg/object" @@ -36,7 +34,7 @@ type cfg struct { } clientCache interface { - get(*ecdsa.PrivateKey, string) (searchClient, error) + get(string) (searchClient, error) } traverserGenerator interface { diff --git a/pkg/services/object/search/util.go b/pkg/services/object/search/util.go index a59cfebf52..3485d559a0 100644 --- a/pkg/services/object/search/util.go +++ b/pkg/services/object/search/util.go @@ -1,7 +1,6 @@ package searchsvc import ( - "crypto/ecdsa" "sync" "github.com/nspcc-dev/neofs-api-go/pkg/client" @@ -71,8 +70,8 @@ func (w *uniqueIDWriter) WriteIDs(list []*objectSDK.ID) error { return w.writer.WriteIDs(list) } -func (c *clientCacheWrapper) get(key *ecdsa.PrivateKey, addr string) (searchClient, error) { - clt, err := c.cache.Get(key, addr, c.opts...) +func (c *clientCacheWrapper) get(addr string) (searchClient, error) { + clt, err := c.cache.Get(addr, c.opts...) return &clientWrapper{ client: clt, diff --git a/pkg/services/object/util/prm.go b/pkg/services/object/util/prm.go index f8b4eb7187..ea54574e26 100644 --- a/pkg/services/object/util/prm.go +++ b/pkg/services/object/util/prm.go @@ -116,6 +116,13 @@ func WithNetmapEpoch(v uint64) DynamicCallOption { } } +// WithKey sets key to use for the request. +func WithKey(key *ecdsa.PrivateKey) DynamicCallOption { + return func(o *remoteCallOpts) { + o.opts = append(o.opts, client.WithKey(key)) + } +} + func (p *CommonPrm) SessionToken() *token.SessionToken { if p != nil { return p.token