diff --git a/cmd/frostfs-node/accounting.go b/cmd/frostfs-node/accounting.go index 26acc043..6a35f37d 100644 --- a/cmd/frostfs-node/accounting.go +++ b/cmd/frostfs-node/accounting.go @@ -1,6 +1,8 @@ package main import ( + "context" + accountingGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting/grpc" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance" accountingTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/accounting/grpc" @@ -8,9 +10,9 @@ import ( accounting "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/accounting/morph" ) -func initAccountingService(c *cfg) { +func initAccountingService(ctx context.Context, c *cfg) { if c.cfgMorph.client == nil { - initMorphComponents(c) + initMorphComponents(ctx, c) } balanceMorphWrapper, err := balance.NewFromMorph(c.cfgMorph.client, c.cfgAccounting.scriptHash, 0) diff --git a/cmd/frostfs-node/main.go b/cmd/frostfs-node/main.go index fdb00322..d5f711a5 100644 --- a/cmd/frostfs-node/main.go +++ b/cmd/frostfs-node/main.go @@ -95,8 +95,8 @@ func initApp(ctx context.Context, c *cfg) { }) initAndLog(c, "gRPC", initGRPC) - initAndLog(c, "netmap", initNetmapService) - initAndLog(c, "accounting", initAccountingService) + initAndLog(c, "netmap", func(c *cfg) { initNetmapService(ctx, c) }) + initAndLog(c, "accounting", func(c *cfg) { initAccountingService(ctx, c) }) initAndLog(c, "container", func(c *cfg) { initContainerService(ctx, c) }) initAndLog(c, "session", initSessionService) initAndLog(c, "reputation", initReputationService) diff --git a/cmd/frostfs-node/morph.go b/cmd/frostfs-node/morph.go index 2dfbe5c1..2db865ca 100644 --- a/cmd/frostfs-node/morph.go +++ b/cmd/frostfs-node/morph.go @@ -27,7 +27,7 @@ const ( notaryDepositRetriesAmount = 300 ) -func initMorphComponents(c *cfg) { +func initMorphComponents(ctx context.Context, c *cfg) { var err error addresses := morphconfig.RPCEndpoint(c.appCfg) @@ -38,7 +38,8 @@ func initMorphComponents(c *cfg) { addresses[i], addresses[j] = addresses[j], addresses[i] }) - cli, err := client.New(c.key, + cli, err := client.New(ctx, + c.key, client.WithDialTimeout(morphconfig.DialTimeout(c.appCfg)), client.WithLogger(c.log), client.WithEndpoints(addresses...), diff --git a/cmd/frostfs-node/netmap.go b/cmd/frostfs-node/netmap.go index 845649de..d9b1c920 100644 --- a/cmd/frostfs-node/netmap.go +++ b/cmd/frostfs-node/netmap.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "context" "errors" "fmt" @@ -135,7 +136,7 @@ func (c *cfg) addressNum() int { return 0 } -func initNetmapService(c *cfg) { +func initNetmapService(ctx context.Context, c *cfg) { network.WriteToNodeInfo(c.localAddr, &c.cfgNodeInfo.localInfo) c.cfgNodeInfo.localInfo.SetPublicKey(c.key.PublicKey().Bytes()) parseAttributes(c) @@ -144,7 +145,7 @@ func initNetmapService(c *cfg) { readSubnetCfg(c) if c.cfgMorph.client == nil { - initMorphComponents(c) + initMorphComponents(ctx, c) } initNetmapState(c) diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 3b42a585..a91d2fd0 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -464,8 +464,8 @@ func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*c } return client.New( + ctx, p.key, - client.WithContext(ctx), client.WithLogger(p.log), client.WithDialTimeout(p.cfg.GetDuration(p.name+".dial_timeout")), client.WithSigner(p.sgn), diff --git a/pkg/morph/client/audit/result_test.go b/pkg/morph/client/audit/result_test.go index a0137e3b..5ce1cc74 100644 --- a/pkg/morph/client/audit/result_test.go +++ b/pkg/morph/client/audit/result_test.go @@ -1,6 +1,7 @@ package audit import ( + "context" "testing" "time" @@ -26,7 +27,7 @@ func TestAuditResults(t *testing.T) { auditHash, err := util.Uint160DecodeStringLE(sAuditHash) require.NoError(t, err) - morphClient, err := client.New(key, client.WithEndpoints(client.Endpoint{Address: endpoint})) + morphClient, err := client.New(context.Background(), key, client.WithEndpoints(client.Endpoint{Address: endpoint})) require.NoError(t, err) auditClientWrapper, err := NewFromMorph(morphClient, auditHash, 0) diff --git a/pkg/morph/client/constructor.go b/pkg/morph/client/constructor.go index e4569ad0..9ed27502 100644 --- a/pkg/morph/client/constructor.go +++ b/pkg/morph/client/constructor.go @@ -29,10 +29,7 @@ type Option func(*cfg) type Callback func() // groups the configurations with default values. -// nolint: containedctx type cfg struct { - ctx context.Context // neo-go client context - dialTimeout time.Duration // client dial timeout logger *logger.Logger // logging component @@ -57,7 +54,6 @@ const ( func defaultConfig() *cfg { return &cfg{ - ctx: context.Background(), dialTimeout: defaultDialTimeout, logger: &logger.Logger{Logger: zap.L()}, waitInterval: defaultWaitInterval, @@ -84,7 +80,7 @@ func defaultConfig() *cfg { // If desired option satisfies the default value, it can be omitted. // If multiple options of the same config value are supplied, // the option with the highest index in the arguments will be used. -func New(key *keys.PrivateKey, opts ...Option) (*Client, error) { +func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, error) { if key == nil { panic("empty private key") } @@ -142,20 +138,20 @@ func New(key *keys.PrivateKey, opts ...Option) (*Client, error) { return nil, fmt.Errorf("could not create RPC actor: %w", err) } } else { - cli.client, act, err = cli.newCli(cli.endpoints.list[0].Address) + cli.client, act, err = cli.newCli(ctx, cli.endpoints.list[0].Address) if err != nil { return nil, fmt.Errorf("could not create RPC client: %w", err) } } cli.setActor(act) - go cli.notificationLoop() + go cli.notificationLoop(ctx) return cli, nil } -func (c *Client) newCli(endpoint string) (*rpcclient.WSClient, *actor.Actor, error) { - cli, err := rpcclient.NewWS(c.cfg.ctx, endpoint, rpcclient.Options{ +func (c *Client) newCli(ctx context.Context, endpoint string) (*rpcclient.WSClient, *actor.Actor, error) { + cli, err := rpcclient.NewWS(ctx, endpoint, rpcclient.Options{ DialTimeout: c.cfg.dialTimeout, }) if err != nil { @@ -201,21 +197,6 @@ func newClientCache() cache { } } -// WithContext returns a client constructor option that -// specifies the neo-go client context. -// -// Ignores nil value. Has no effect if WithSingleClient -// is provided. -// -// If option not provided, context.Background() is used. -func WithContext(ctx context.Context) Option { - return func(c *cfg) { - if ctx != nil { - c.ctx = ctx - } - } -} - // WithDialTimeout returns a client constructor option // that specifies neo-go client dial timeout duration. // diff --git a/pkg/morph/client/multi.go b/pkg/morph/client/multi.go index 54af56b2..5d736839 100644 --- a/pkg/morph/client/multi.go +++ b/pkg/morph/client/multi.go @@ -1,6 +1,7 @@ package client import ( + "context" "sort" "time" @@ -32,7 +33,7 @@ func (e *endpoints) init(ee []Endpoint) { e.list = ee } -func (c *Client) switchRPC() bool { +func (c *Client) switchRPC(ctx context.Context) bool { c.switchLock.Lock() defer c.switchLock.Unlock() @@ -41,7 +42,7 @@ func (c *Client) switchRPC() bool { // Iterate endpoints in the order of decreasing priority. for c.endpoints.curr = range c.endpoints.list { newEndpoint := c.endpoints.list[c.endpoints.curr].Address - cli, act, err := c.newCli(newEndpoint) + cli, act, err := c.newCli(ctx, newEndpoint) if err != nil { c.logger.Warn("could not establish connection to the switched RPC node", zap.String("endpoint", newEndpoint), @@ -56,7 +57,7 @@ func (c *Client) switchRPC() bool { c.logger.Info("connection to the new RPC node has been established", zap.String("endpoint", newEndpoint)) - subs, ok := c.restoreSubscriptions(cli, newEndpoint, false) + subs, ok := c.restoreSubscriptions(ctx, cli, newEndpoint, false) if !ok { // new WS client does not allow // restoring subscription, client @@ -74,7 +75,7 @@ func (c *Client) switchRPC() bool { if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() && c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority { c.switchIsActive.Store(true) - go c.switchToMostPrioritized() + go c.switchToMostPrioritized(ctx) } return true @@ -83,7 +84,7 @@ func (c *Client) switchRPC() bool { return false } -func (c *Client) notificationLoop() { +func (c *Client) notificationLoop(ctx context.Context) { var e any var ok bool @@ -95,7 +96,7 @@ func (c *Client) notificationLoop() { c.switchLock.RUnlock() select { - case <-c.cfg.ctx.Done(): + case <-ctx.Done(): _ = c.UnsubscribeAll() c.close() @@ -111,17 +112,17 @@ func (c *Client) notificationLoop() { } if ok { - c.routeEvent(e) + c.routeEvent(ctx, e) continue } - if !c.reconnect() { + if !c.reconnect(ctx) { return } } } -func (c *Client) routeEvent(e any) { +func (c *Client) routeEvent(ctx context.Context, e any) { typedNotification := rpcclient.Notification{Value: e} switch e.(type) { @@ -135,7 +136,7 @@ func (c *Client) routeEvent(e any) { select { case c.notifications <- typedNotification: - case <-c.cfg.ctx.Done(): + case <-ctx.Done(): _ = c.UnsubscribeAll() c.close() case <-c.closeChan: @@ -144,7 +145,7 @@ func (c *Client) routeEvent(e any) { } } -func (c *Client) reconnect() bool { +func (c *Client) reconnect(ctx context.Context) bool { if closeErr := c.client.GetError(); closeErr != nil { c.logger.Warn("switching to the next RPC node", zap.String("reason", closeErr.Error()), @@ -156,7 +157,7 @@ func (c *Client) reconnect() bool { return true } - if !c.switchRPC() { + if !c.switchRPC(ctx) { c.logger.Error("could not establish connection to any RPC node") // could not connect to all endpoints => @@ -173,7 +174,7 @@ func (c *Client) reconnect() bool { return true } -func (c *Client) switchToMostPrioritized() { +func (c *Client) switchToMostPrioritized(ctx context.Context) { t := time.NewTicker(c.cfg.switchInterval) defer t.Stop() defer c.switchIsActive.Store(false) @@ -181,7 +182,7 @@ func (c *Client) switchToMostPrioritized() { mainLoop: for { select { - case <-c.cfg.ctx.Done(): + case <-ctx.Done(): return case <-t.C: c.switchLock.RLock() @@ -207,7 +208,7 @@ mainLoop: tryE := e.Address - cli, act, err := c.newCli(tryE) + cli, act, err := c.newCli(ctx, tryE) if err != nil { c.logger.Warn("could not create client to the higher priority node", zap.String("endpoint", tryE), @@ -216,7 +217,7 @@ mainLoop: continue } - if subs, ok := c.restoreSubscriptions(cli, tryE, true); ok { + if subs, ok := c.restoreSubscriptions(ctx, cli, tryE, true); ok { c.switchLock.Lock() // higher priority node could have been diff --git a/pkg/morph/client/notifications.go b/pkg/morph/client/notifications.go index ed036870..300bab82 100644 --- a/pkg/morph/client/notifications.go +++ b/pkg/morph/client/notifications.go @@ -1,6 +1,8 @@ package client import ( + "context" + "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/neorpc" @@ -227,7 +229,7 @@ type subsInfo struct { // one contains subscription information applied to the passed cli // and receivers for the updated subscriptions. // Does not change Client instance. -func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string, background bool) (si subsInfo, ok bool) { +func (c *Client) restoreSubscriptions(ctx context.Context, cli *rpcclient.WSClient, endpoint string, background bool) (si subsInfo, ok bool) { var ( err error id string @@ -240,7 +242,7 @@ func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string, notificationRcv := make(chan *state.ContainedNotificationEvent) notaryReqRcv := make(chan *result.NotaryRequestEvent) - c.startListen(stopCh, blockRcv, notificationRcv, notaryReqRcv, background) + c.startListen(ctx, stopCh, blockRcv, notificationRcv, notaryReqRcv, background) if background { c.switchLock.RLock() @@ -304,7 +306,7 @@ func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string, return si, true } -func (c *Client) startListen(stopCh <-chan struct{}, blockRcv <-chan *block.Block, +func (c *Client) startListen(ctx context.Context, stopCh <-chan struct{}, blockRcv <-chan *block.Block, notificationRcv <-chan *state.ContainedNotificationEvent, notaryReqRcv <-chan *result.NotaryRequestEvent, background bool) { // neo-go WS client says to _always_ read notifications // from its channel. Subscribing to any notification @@ -335,7 +337,7 @@ func (c *Client) startListen(stopCh <-chan struct{}, blockRcv <-chan *block.Bloc continue } - c.routeEvent(e) + c.routeEvent(ctx, e) } }() }