diff --git a/cmd/neofs-node/container.go b/cmd/neofs-node/container.go index 5d217f3c..82fb6763 100644 --- a/cmd/neofs-node/container.go +++ b/cmd/neofs-node/container.go @@ -217,24 +217,27 @@ func (r *remoteLoadAnnounceProvider) InitRemote(srv loadroute.ServerInfo) (loadc return nil, errors.Wrap(err, "could not convert address to IP format") } - c, err := r.clientCache.Get(r.key, ipAddr) + c, err := r.clientCache.Get(ipAddr) if err != nil { return nil, errors.Wrap(err, "could not initialize API client") } return &remoteLoadAnnounceWriterProvider{ client: c, + key: r.key, }, nil } type remoteLoadAnnounceWriterProvider struct { client *apiClient.Client + key *ecdsa.PrivateKey } func (p *remoteLoadAnnounceWriterProvider) InitWriter(ctx context.Context) (loadcontroller.Writer, error) { return &remoteLoadAnnounceWriter{ ctx: ctx, client: p.client, + key: p.key, }, nil } @@ -242,6 +245,7 @@ type remoteLoadAnnounceWriter struct { ctx context.Context client *apiClient.Client + key *ecdsa.PrivateKey buf []containerSDK.UsedSpaceAnnouncement } @@ -253,7 +257,7 @@ func (r *remoteLoadAnnounceWriter) Put(a containerSDK.UsedSpaceAnnouncement) err } func (r *remoteLoadAnnounceWriter) Close() error { - return r.client.AnnounceContainerUsedSpace(r.ctx, r.buf) + return r.client.AnnounceContainerUsedSpace(r.ctx, r.buf, apiClient.WithKey(r.key)) } type loadPlacementBuilder struct { diff --git a/go.mod b/go.mod index 2ea9ff42..e434dda0 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/multiformats/go-multihash v0.0.13 // indirect github.com/nspcc-dev/hrw v1.0.9 github.com/nspcc-dev/neo-go v0.94.0-pre.0.20210301112733-3227de8050f8 - github.com/nspcc-dev/neofs-api-go v1.24.0 + github.com/nspcc-dev/neofs-api-go v1.24.1-0.20210312074010-64505180b400 github.com/nspcc-dev/neofs-crypto v0.3.0 github.com/nspcc-dev/tzhash v1.4.0 github.com/panjf2000/ants/v2 v2.3.0 diff --git a/go.sum b/go.sum index b044e90f..66151a54 100644 Binary files a/go.sum and b/go.sum differ diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index d49598fc..4f2c0140 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -362,6 +362,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error MorphClient: server.morphClient, IRList: server, ClientCache: clientCache, + Key: server.key, RPCSearchTimeout: cfg.GetDuration("audit.timeout.search"), TaskManager: auditTaskManager, Reporter: server, diff --git a/pkg/innerring/processors/audit/process.go b/pkg/innerring/processors/audit/process.go index fa040ee6..625b9e1f 100644 --- a/pkg/innerring/processors/audit/process.go +++ b/pkg/innerring/processors/audit/process.go @@ -136,7 +136,7 @@ func (ap *Processor) findStorageGroups(cid *container.ID, shuffled netmap.Nodes) sgSearchParams.WithSearchFilters(sgFilter) ctx, cancel := context.WithTimeout(context.Background(), ap.searchTimeout) - result, err := cli.SearchObject(ctx, sgSearchParams) + result, err := cli.SearchObject(ctx, sgSearchParams, client.WithKey(ap.key)) cancel() if err != nil { diff --git a/pkg/innerring/processors/audit/processor.go b/pkg/innerring/processors/audit/processor.go index 370c5bff..83d6c27e 100644 --- a/pkg/innerring/processors/audit/processor.go +++ b/pkg/innerring/processors/audit/processor.go @@ -2,6 +2,7 @@ package audit import ( "context" + "crypto/ecdsa" "time" "github.com/nspcc-dev/neo-go/pkg/util" @@ -46,6 +47,7 @@ type ( morphClient *client.Client irList Indexer clientCache NeoFSClientCache + key *ecdsa.PrivateKey searchTimeout time.Duration containerClient *wrapContainer.Wrapper @@ -68,6 +70,7 @@ type ( RPCSearchTimeout time.Duration TaskManager TaskManager Reporter audit.Reporter + Key *ecdsa.PrivateKey } ) @@ -97,6 +100,8 @@ func New(p *Params) (*Processor, error) { return nil, errors.New("ir/audit: audit task manager is not set") case p.Reporter == nil: return nil, errors.New("ir/audit: audit result reporter is not set") + case p.Key == nil: + return nil, errors.New("ir/audit: signing key is not set") } pool, err := ants.NewPool(ProcessorPoolSize, ants.WithNonblocking(true)) @@ -124,6 +129,7 @@ func New(p *Params) (*Processor, error) { morphClient: p.MorphClient, irList: p.IRList, clientCache: p.ClientCache, + key: p.Key, searchTimeout: p.RPCSearchTimeout, containerClient: containerClient, netmapClient: netmapClient, diff --git a/pkg/innerring/rpc.go b/pkg/innerring/rpc.go index 756dc82b..ade8452f 100644 --- a/pkg/innerring/rpc.go +++ b/pkg/innerring/rpc.go @@ -47,7 +47,9 @@ func newClientCache(p *clientCacheParams) *ClientCache { } func (c *ClientCache) Get(address string, opts ...client.Option) (*client.Client, error) { - return c.cache.Get(c.key, address, opts...) + // Because cache is used by `ClientCache` exclusively, + // client will always have valid key. + return c.cache.Get(address, opts...) } // GetSG polls the container from audit task to get the object by id. @@ -89,7 +91,7 @@ func (c *ClientCache) getSG(ctx context.Context, addr *object.Address, nm *netma } cctx, cancel := context.WithTimeout(ctx, c.sgTimeout) - obj, err := cli.GetObject(cctx, getParams) + obj, err := cli.GetObject(cctx, getParams, client.WithKey(c.key)) cancel() @@ -143,7 +145,9 @@ func (c *ClientCache) GetHeader(task *audit.Task, node *netmap.Node, id *object. } cctx, cancel := context.WithTimeout(task.AuditContext(), c.headTimeout) - head, err := cli.GetObjectHeader(cctx, headParams, client.WithTTL(ttl)) + head, err := cli.GetObjectHeader(cctx, headParams, + client.WithTTL(ttl), + client.WithKey(c.key)) cancel() @@ -177,7 +181,9 @@ func (c *ClientCache) GetRangeHash(task *audit.Task, node *netmap.Node, id *obje } cctx, cancel := context.WithTimeout(task.AuditContext(), c.rangeTimeout) - result, err := cli.ObjectPayloadRangeTZ(cctx, rangeParams, client.WithTTL(1)) + result, err := cli.ObjectPayloadRangeTZ(cctx, rangeParams, + client.WithTTL(1), + client.WithKey(c.key)) cancel() diff --git a/pkg/network/cache/client.go b/pkg/network/cache/client.go index 007fec84..968faa41 100644 --- a/pkg/network/cache/client.go +++ b/pkg/network/cache/client.go @@ -1,13 +1,9 @@ package cache import ( - "crypto/ecdsa" - "crypto/sha256" - "encoding/hex" "sync" "github.com/nspcc-dev/neofs-api-go/pkg/client" - crypto "github.com/nspcc-dev/neofs-crypto" ) type ( @@ -28,11 +24,9 @@ func NewSDKClientCache() *ClientCache { } // Get function returns existing client or creates a new one. -func (c *ClientCache) Get(key *ecdsa.PrivateKey, address string, opts ...client.Option) (*client.Client, error) { - id := uniqueID(key, address) - +func (c *ClientCache) Get(address string, opts ...client.Option) (*client.Client, error) { c.mu.RLock() - if cli, ok := c.clients[id]; ok { + if cli, ok := c.clients[address]; ok { // todo: check underlying connection neofs-api-go#196 c.mu.RUnlock() @@ -46,22 +40,16 @@ func (c *ClientCache) Get(key *ecdsa.PrivateKey, address string, opts ...client. // check once again if client is missing in cache, concurrent routine could // create client while this routine was locked on `c.mu.Lock()`. - if cli, ok := c.clients[id]; ok { + if cli, ok := c.clients[address]; ok { return cli, nil } - cli, err := client.New(key, append(opts, client.WithAddress(address))...) + cli, err := client.New(nil, append(opts, client.WithAddress(address))...) if err != nil { return nil, err } - c.clients[id] = cli + c.clients[address] = cli return cli, nil } - -func uniqueID(key *ecdsa.PrivateKey, address string) string { - keyFingerprint := sha256.Sum256(crypto.MarshalPrivateKey(key)) - - return hex.EncodeToString(keyFingerprint[:]) + address -} diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index 900c0cdd..88222685 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -111,7 +111,7 @@ func (exec execCtx) key() *ecdsa.PrivateKey { func (exec execCtx) callOptions() []client.CallOption { return exec.prm.common.RemoteCallOptions( util.WithNetmapEpoch(exec.curProcEpoch), - ) + util.WithKey(exec.key())) } func (exec execCtx) remotePrm() *client.GetObjectParams { @@ -276,7 +276,7 @@ func (exec execCtx) remoteClient(node *network.Address) (getClient, bool) { log.Debug("could not calculate node IP address") case err == nil: - c, err := exec.svc.clientCache.get(exec.key(), ipAddr) + c, err := exec.svc.clientCache.get(ipAddr) switch { default: diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index f5df9cdf..f7d5fcd6 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -2,7 +2,6 @@ package getsvc import ( "context" - "crypto/ecdsa" "crypto/rand" "crypto/sha256" "fmt" @@ -81,7 +80,7 @@ func (p *testPlacementBuilder) BuildPlacement(addr *objectSDK.Address, _ *netmap return vs, nil } -func (c *testClientCache) get(_ *ecdsa.PrivateKey, addr string) (getClient, error) { +func (c *testClientCache) get(addr string) (getClient, error) { v, ok := c.clients[addr] if !ok { return nil, errors.New("could not construct client") diff --git a/pkg/services/object/get/service.go b/pkg/services/object/get/service.go index e0263985..d67ae0d1 100644 --- a/pkg/services/object/get/service.go +++ b/pkg/services/object/get/service.go @@ -1,8 +1,6 @@ package getsvc import ( - "crypto/ecdsa" - "github.com/nspcc-dev/neofs-api-go/pkg/client" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" @@ -37,7 +35,7 @@ type cfg struct { } clientCache interface { - get(*ecdsa.PrivateKey, string) (getClient, error) + get(string) (getClient, error) } traverserGenerator interface { diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index a3354980..db2c7022 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -1,7 +1,6 @@ package getsvc import ( - "crypto/ecdsa" "io" "github.com/nspcc-dev/neofs-api-go/pkg/client" @@ -75,8 +74,8 @@ func (s *SimpleObjectWriter) Object() *object.Object { return s.obj.Object() } -func (c *clientCacheWrapper) get(key *ecdsa.PrivateKey, addr string) (getClient, error) { - clt, err := c.cache.Get(key, addr, c.opts...) +func (c *clientCacheWrapper) get(addr string) (getClient, error) { + clt, err := c.cache.Get(addr, c.opts...) return &clientWrapper{ client: clt, diff --git a/pkg/services/object/head/remote.go b/pkg/services/object/head/remote.go index bd7a692b..a54e3786 100644 --- a/pkg/services/object/head/remote.go +++ b/pkg/services/object/head/remote.go @@ -70,7 +70,7 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*object.Ob return nil, err } - c, err := h.clientCache.Get(key, addr, h.clientOpts...) + c, err := h.clientCache.Get(addr, h.clientOpts...) if err != nil { return nil, errors.Wrapf(err, "(%T) could not create SDK client %s", h, addr) } @@ -87,6 +87,7 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*object.Ob client.WithTTL(1), // FIXME: use constant client.WithSession(prm.commonHeadPrm.common.SessionToken()), client.WithBearer(prm.commonHeadPrm.common.BearerToken()), + client.WithKey(key), ) if err != nil { return nil, errors.Wrapf(err, "(%T) could not head object in %s", h, addr) diff --git a/pkg/services/object/put/remote.go b/pkg/services/object/put/remote.go index f1c72526..30d4be04 100644 --- a/pkg/services/object/put/remote.go +++ b/pkg/services/object/put/remote.go @@ -64,7 +64,7 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) { return nil, err } - c, err := t.clientCache.Get(key, addr, t.clientOpts...) + c, err := t.clientCache.Get(addr, t.clientOpts...) if err != nil { return nil, errors.Wrapf(err, "(%T) could not create SDK client %s", t, addr) } @@ -76,6 +76,7 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) { append( t.commonPrm.RemoteCallOptions(), client.WithTTL(1), // FIXME: use constant + client.WithKey(key), )..., ) if err != nil { diff --git a/pkg/services/object/search/exec.go b/pkg/services/object/search/exec.go index 3c7194bb..4803770f 100644 --- a/pkg/services/object/search/exec.go +++ b/pkg/services/object/search/exec.go @@ -69,6 +69,7 @@ func (exec execCtx) key() *ecdsa.PrivateKey { func (exec execCtx) callOptions() []client.CallOption { return exec.prm.common.RemoteCallOptions( util.WithNetmapEpoch(exec.curProcEpoch), + util.WithKey(exec.key()), ) } @@ -146,7 +147,7 @@ func (exec execCtx) remoteClient(node *network.Address) (searchClient, bool) { log.Debug("could not calculate node IP address") case err == nil: - c, err := exec.svc.clientCache.get(exec.key(), ipAddr) + c, err := exec.svc.clientCache.get(ipAddr) switch { default: diff --git a/pkg/services/object/search/search_test.go b/pkg/services/object/search/search_test.go index 3b7a7969..4def0f93 100644 --- a/pkg/services/object/search/search_test.go +++ b/pkg/services/object/search/search_test.go @@ -2,7 +2,6 @@ package searchsvc import ( "context" - "crypto/ecdsa" "crypto/rand" "crypto/sha256" "fmt" @@ -83,7 +82,7 @@ func (p *testPlacementBuilder) BuildPlacement(addr *objectSDK.Address, _ *netmap return res, nil } -func (c *testClientCache) get(_ *ecdsa.PrivateKey, addr string) (searchClient, error) { +func (c *testClientCache) get(addr string) (searchClient, error) { v, ok := c.clients[addr] if !ok { return nil, errors.New("could not construct client") diff --git a/pkg/services/object/search/service.go b/pkg/services/object/search/service.go index ccaa4daf..046b7732 100644 --- a/pkg/services/object/search/service.go +++ b/pkg/services/object/search/service.go @@ -1,8 +1,6 @@ package searchsvc import ( - "crypto/ecdsa" - "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-api-go/pkg/object" @@ -36,7 +34,7 @@ type cfg struct { } clientCache interface { - get(*ecdsa.PrivateKey, string) (searchClient, error) + get(string) (searchClient, error) } traverserGenerator interface { diff --git a/pkg/services/object/search/util.go b/pkg/services/object/search/util.go index a59cfebf..3485d559 100644 --- a/pkg/services/object/search/util.go +++ b/pkg/services/object/search/util.go @@ -1,7 +1,6 @@ package searchsvc import ( - "crypto/ecdsa" "sync" "github.com/nspcc-dev/neofs-api-go/pkg/client" @@ -71,8 +70,8 @@ func (w *uniqueIDWriter) WriteIDs(list []*objectSDK.ID) error { return w.writer.WriteIDs(list) } -func (c *clientCacheWrapper) get(key *ecdsa.PrivateKey, addr string) (searchClient, error) { - clt, err := c.cache.Get(key, addr, c.opts...) +func (c *clientCacheWrapper) get(addr string) (searchClient, error) { + clt, err := c.cache.Get(addr, c.opts...) return &clientWrapper{ client: clt, diff --git a/pkg/services/object/util/prm.go b/pkg/services/object/util/prm.go index f8b4eb71..ea54574e 100644 --- a/pkg/services/object/util/prm.go +++ b/pkg/services/object/util/prm.go @@ -116,6 +116,13 @@ func WithNetmapEpoch(v uint64) DynamicCallOption { } } +// WithKey sets key to use for the request. +func WithKey(key *ecdsa.PrivateKey) DynamicCallOption { + return func(o *remoteCallOpts) { + o.opts = append(o.opts, client.WithKey(key)) + } +} + func (p *CommonPrm) SessionToken() *token.SessionToken { if p != nil { return p.token