Compare commits

...

3 commits

Author SHA1 Message Date
04ab939e4c [#706] morph: Use backoff strategy for UnsubscribeAll()
All checks were successful
DCO action / DCO (pull_request) Successful in 2m42s
Build / Build Components (1.20) (pull_request) Successful in 4m26s
Tests and linters / Staticcheck (pull_request) Successful in 4m12s
Tests and linters / Lint (pull_request) Successful in 4m43s
Vulncheck / Vulncheck (pull_request) Successful in 4m58s
Tests and linters / Tests (1.20) (pull_request) Successful in 5m8s
Tests and linters / Tests (1.21) (pull_request) Successful in 5m23s
Tests and linters / Tests with -race (pull_request) Successful in 7m14s
Build / Build Components (1.21) (pull_request) Successful in 9m15s
* Sometimes the morph client cannot unsubscribe from events
  because websocket client may be got down and neo-go will
  never the request for unsubscription.

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-09-26 17:47:16 +03:00
368774be95 [#691] node: Compare node info during initial bootstrap properly
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-18 07:30:15 +00:00
b9ef294b99 [#692] go.mod: Update sdk-go
All checks were successful
DCO action / DCO (pull_request) Successful in 4m8s
Vulncheck / Vulncheck (pull_request) Successful in 4m50s
Build / Build Components (1.21) (pull_request) Successful in 6m29s
Build / Build Components (1.20) (pull_request) Successful in 6m44s
Tests and linters / Tests (1.21) (pull_request) Successful in 7m20s
Tests and linters / Staticcheck (pull_request) Successful in 7m12s
Tests and linters / Lint (pull_request) Successful in 7m37s
Tests and linters / Tests (1.20) (pull_request) Successful in 7m35s
Tests and linters / Tests with -race (pull_request) Successful in 8m37s
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-15 16:54:15 +03:00
8 changed files with 103 additions and 9 deletions

View file

@ -282,11 +282,62 @@ func initNetmapState(c *cfg) {
c.handleLocalNodeInfo(ni) c.handleLocalNodeInfo(ni)
} }
func sameNodeInfo(a, b *netmapSDK.NodeInfo) bool { func needsUpdate(local, remote *netmapSDK.NodeInfo) bool {
// Suboptimal, but we do this once on the node startup. return bytes.Equal(local.PublicKey(), remote.PublicKey()) && equalEndpoints(local, remote) && equalAttributes(local, remote)
rawA := a.Marshal() }
rawB := b.Marshal()
return bytes.Equal(rawA, rawB) func equalAttributes(local, remote *netmapSDK.NodeInfo) bool {
asA := make(map[string]string)
local.IterateAttributes(func(k, v string) {
asA[k] = v
})
allMatched := true
count := 0
remote.IterateAttributes(func(k, vb string) {
// IR adds new attributes derived from the locode, they should be skipped.
if isLocodeAttribute(k) {
return
}
if va, ok := asA[k]; !ok || va != vb {
allMatched = false
return
}
count++
})
return allMatched && count == len(asA)
}
func isLocodeAttribute(k string) bool {
// See https://git.frostfs.info/TrueCloudLab/frostfs-api/src/branch/master/netmap/types.proto#L171
switch k {
case "Continent", "Country", "CountryCode", "Location", "SubDiv", "SubDivCode":
return true
default:
return false
}
}
func equalEndpoints(a, b *netmapSDK.NodeInfo) bool {
var esA, esB []string
a.IterateNetworkEndpoints(func(e string) bool {
esA = append(esA, e)
return false
})
b.IterateNetworkEndpoints(func(e string) bool {
esB = append(esB, e)
return false
})
if len(esA) != len(esB) {
return false
}
for i := range esA {
if esA[i] != esB[i] {
return false
}
}
return true
} }
func nodeState(ni *netmapSDK.NodeInfo) string { func nodeState(ni *netmapSDK.NodeInfo) string {
@ -314,7 +365,7 @@ func (c *cfg) netmapInitLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, bool,
for i := range nmNodes { for i := range nmNodes {
if bytes.Equal(nmNodes[i].PublicKey(), c.binPublicKey) { if bytes.Equal(nmNodes[i].PublicKey(), c.binPublicKey) {
candidate = &nmNodes[i] candidate = &nmNodes[i]
alreadyBootstraped = candidate.IsOnline() && sameNodeInfo(&c.cfgNodeInfo.localInfo, candidate) alreadyBootstraped = candidate.IsOnline() && needsUpdate(&c.cfgNodeInfo.localInfo, candidate)
break break
} }
} }

3
go.mod
View file

@ -6,9 +6,10 @@ require (
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.0 git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.0
git.frostfs.info/TrueCloudLab/frostfs-contract v0.18.0 git.frostfs.info/TrueCloudLab/frostfs-contract v0.18.0
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230911122224-ac8fc6d4400c git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230915114754-555ccc63b255
git.frostfs.info/TrueCloudLab/hrw v1.2.1 git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/tzhash v1.8.0 git.frostfs.info/TrueCloudLab/tzhash v1.8.0
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cheggaaa/pb v1.0.29 github.com/cheggaaa/pb v1.0.29
github.com/chzyer/readline v1.5.1 github.com/chzyer/readline v1.5.1
github.com/dgraph-io/ristretto v0.1.1 github.com/dgraph-io/ristretto v0.1.1

BIN
go.sum

Binary file not shown.

View file

@ -195,6 +195,8 @@ const (
SubscriberCantCastBlockEventValueToBlock = "can't cast block event value to block" SubscriberCantCastBlockEventValueToBlock = "can't cast block event value to block"
SubscriberCantCastNotifyEventValueToTheNotaryRequestStruct = "can't cast notify event value to the notary request struct" SubscriberCantCastNotifyEventValueToTheNotaryRequestStruct = "can't cast notify event value to the notary request struct"
SubscriberUnsupportedNotificationFromTheChain = "unsupported notification from the chain" SubscriberUnsupportedNotificationFromTheChain = "unsupported notification from the chain"
SubscriberCouldNotSwitchRPCDuringUnsubscriptionFromEvents = "could not switch rpc during the unsubscription from events"
SubscriberCouldNotUnsubscribeFromEventsOnBackoffPolicy = "could not unsubscribe from events on backoff policy"
BlobovniczaCreatingDirectoryForBoltDB = "creating directory for BoltDB" BlobovniczaCreatingDirectoryForBoltDB = "creating directory for BoltDB"
BlobovniczaOpeningBoltDB = "opening BoltDB" BlobovniczaOpeningBoltDB = "opening BoltDB"
BlobovniczaInitializing = "initializing..." BlobovniczaInitializing = "initializing..."

View file

@ -74,6 +74,9 @@ type Client struct {
// channel for internal stop // channel for internal stop
closeChan chan struct{} closeChan chan struct{}
// channel to indicate that close is done
closeDone chan struct{}
// indicates that Client is not able to // indicates that Client is not able to
// establish connection to any of the // establish connection to any of the
// provided RPC endpoints // provided RPC endpoints

View file

@ -120,6 +120,7 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er
accAddr: accAddr, accAddr: accAddr,
cfg: *cfg, cfg: *cfg,
closeChan: make(chan struct{}), closeChan: make(chan struct{}),
closeDone: make(chan struct{}),
} }
cli.endpoints.init(cfg.endpoints) cli.endpoints.init(cfg.endpoints)
@ -169,6 +170,7 @@ func (c *Client) newCli(ctx context.Context, endpoint string) (*rpcclient.WSClie
Options: rpcclient.Options{ Options: rpcclient.Options{
DialTimeout: c.cfg.dialTimeout, DialTimeout: c.cfg.dialTimeout,
}, },
CloseNotificationChannelIfFull: true,
}) })
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("WS client creation: %w", err) return nil, nil, fmt.Errorf("WS client creation: %w", err)

View file

@ -79,8 +79,10 @@ func (c *Client) closeWaiter(ctx context.Context) {
case <-ctx.Done(): case <-ctx.Done():
case <-c.closeChan: case <-c.closeChan:
} }
//nolint:contextcheck
_ = c.UnsubscribeAll() _ = c.UnsubscribeAll()
c.close() c.close()
close(c.closeDone)
} }
func (c *Client) switchToMostPrioritized(ctx context.Context) { func (c *Client) switchToMostPrioritized(ctx context.Context) {

View file

@ -1,11 +1,18 @@
package client package client
import ( import (
"context"
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"github.com/cenkalti/backoff"
"github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc" "github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
) )
// Close closes connection to the remote side making // Close closes connection to the remote side making
@ -17,6 +24,11 @@ func (c *Client) Close() {
// to prevent switching to another RPC node // to prevent switching to another RPC node
// in the notification loop // in the notification loop
close(c.closeChan) close(c.closeChan)
// closeWaiter performs asynchronously and thus
// we may abrupt all process related to close process
// if we do not wait for closing process finish.
<-c.closeDone
} }
// ReceiveExecutionNotifications performs subscription for notifications // ReceiveExecutionNotifications performs subscription for notifications
@ -100,6 +112,27 @@ func (c *Client) UnsubscribeAll() error {
return ErrConnectionLost return ErrConnectionLost
} }
err := c.client.UnsubscribeAll() if err := c.client.UnsubscribeAll(); err != nil {
return err // TODO (aarifullin): consider the situation when the morph client
// failed to subscribe for events because of websocket client problems
// "under hood". After failed subscription the client invokes Close()
// that invokes UnsubscribeAll(). This requires to push new request
// via websocket to neo-go but the websocket client may be down.
// Therefore, morph will never request neo-go to unsubscribe from events, but
// we can try to fix this by reconnecting to neo-go.
backoffSettings := backoff.NewExponentialBackOff()
backoffSettings.MaxElapsedTime = 30 * time.Second
return backoff.Retry(func() error {
if !c.SwitchRPC(context.TODO()) {
c.logger.Warn(logs.SubscriberCouldNotSwitchRPCDuringUnsubscriptionFromEvents)
return fmt.Errorf("could not switch rpc")
}
err := c.client.UnsubscribeAll()
if err != nil {
c.logger.Warn(logs.SubscriberCouldNotUnsubscribeFromEventsOnBackoffPolicy, zap.Error(err))
}
return err
}, backoffSettings)
}
return nil
} }