Compare commits
3 commits
master
...
fix/unsubc
Author | SHA1 | Date | |
---|---|---|---|
04ab939e4c | |||
368774be95 | |||
b9ef294b99 |
8 changed files with 103 additions and 9 deletions
|
@ -282,11 +282,62 @@ func initNetmapState(c *cfg) {
|
||||||
c.handleLocalNodeInfo(ni)
|
c.handleLocalNodeInfo(ni)
|
||||||
}
|
}
|
||||||
|
|
||||||
func sameNodeInfo(a, b *netmapSDK.NodeInfo) bool {
|
func needsUpdate(local, remote *netmapSDK.NodeInfo) bool {
|
||||||
// Suboptimal, but we do this once on the node startup.
|
return bytes.Equal(local.PublicKey(), remote.PublicKey()) && equalEndpoints(local, remote) && equalAttributes(local, remote)
|
||||||
rawA := a.Marshal()
|
}
|
||||||
rawB := b.Marshal()
|
|
||||||
return bytes.Equal(rawA, rawB)
|
func equalAttributes(local, remote *netmapSDK.NodeInfo) bool {
|
||||||
|
asA := make(map[string]string)
|
||||||
|
local.IterateAttributes(func(k, v string) {
|
||||||
|
asA[k] = v
|
||||||
|
})
|
||||||
|
|
||||||
|
allMatched := true
|
||||||
|
count := 0
|
||||||
|
remote.IterateAttributes(func(k, vb string) {
|
||||||
|
// IR adds new attributes derived from the locode, they should be skipped.
|
||||||
|
if isLocodeAttribute(k) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if va, ok := asA[k]; !ok || va != vb {
|
||||||
|
allMatched = false
|
||||||
|
return
|
||||||
|
}
|
||||||
|
count++
|
||||||
|
})
|
||||||
|
return allMatched && count == len(asA)
|
||||||
|
}
|
||||||
|
|
||||||
|
func isLocodeAttribute(k string) bool {
|
||||||
|
// See https://git.frostfs.info/TrueCloudLab/frostfs-api/src/branch/master/netmap/types.proto#L171
|
||||||
|
switch k {
|
||||||
|
case "Continent", "Country", "CountryCode", "Location", "SubDiv", "SubDivCode":
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func equalEndpoints(a, b *netmapSDK.NodeInfo) bool {
|
||||||
|
var esA, esB []string
|
||||||
|
a.IterateNetworkEndpoints(func(e string) bool {
|
||||||
|
esA = append(esA, e)
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
b.IterateNetworkEndpoints(func(e string) bool {
|
||||||
|
esB = append(esB, e)
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
|
||||||
|
if len(esA) != len(esB) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
for i := range esA {
|
||||||
|
if esA[i] != esB[i] {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func nodeState(ni *netmapSDK.NodeInfo) string {
|
func nodeState(ni *netmapSDK.NodeInfo) string {
|
||||||
|
@ -314,7 +365,7 @@ func (c *cfg) netmapInitLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, bool,
|
||||||
for i := range nmNodes {
|
for i := range nmNodes {
|
||||||
if bytes.Equal(nmNodes[i].PublicKey(), c.binPublicKey) {
|
if bytes.Equal(nmNodes[i].PublicKey(), c.binPublicKey) {
|
||||||
candidate = &nmNodes[i]
|
candidate = &nmNodes[i]
|
||||||
alreadyBootstraped = candidate.IsOnline() && sameNodeInfo(&c.cfgNodeInfo.localInfo, candidate)
|
alreadyBootstraped = candidate.IsOnline() && needsUpdate(&c.cfgNodeInfo.localInfo, candidate)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
3
go.mod
3
go.mod
|
@ -6,9 +6,10 @@ require (
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.0
|
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.0
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.18.0
|
git.frostfs.info/TrueCloudLab/frostfs-contract v0.18.0
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230911122224-ac8fc6d4400c
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230915114754-555ccc63b255
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
||||||
|
github.com/cenkalti/backoff v2.2.1+incompatible
|
||||||
github.com/cheggaaa/pb v1.0.29
|
github.com/cheggaaa/pb v1.0.29
|
||||||
github.com/chzyer/readline v1.5.1
|
github.com/chzyer/readline v1.5.1
|
||||||
github.com/dgraph-io/ristretto v0.1.1
|
github.com/dgraph-io/ristretto v0.1.1
|
||||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -195,6 +195,8 @@ const (
|
||||||
SubscriberCantCastBlockEventValueToBlock = "can't cast block event value to block"
|
SubscriberCantCastBlockEventValueToBlock = "can't cast block event value to block"
|
||||||
SubscriberCantCastNotifyEventValueToTheNotaryRequestStruct = "can't cast notify event value to the notary request struct"
|
SubscriberCantCastNotifyEventValueToTheNotaryRequestStruct = "can't cast notify event value to the notary request struct"
|
||||||
SubscriberUnsupportedNotificationFromTheChain = "unsupported notification from the chain"
|
SubscriberUnsupportedNotificationFromTheChain = "unsupported notification from the chain"
|
||||||
|
SubscriberCouldNotSwitchRPCDuringUnsubscriptionFromEvents = "could not switch rpc during the unsubscription from events"
|
||||||
|
SubscriberCouldNotUnsubscribeFromEventsOnBackoffPolicy = "could not unsubscribe from events on backoff policy"
|
||||||
BlobovniczaCreatingDirectoryForBoltDB = "creating directory for BoltDB"
|
BlobovniczaCreatingDirectoryForBoltDB = "creating directory for BoltDB"
|
||||||
BlobovniczaOpeningBoltDB = "opening BoltDB"
|
BlobovniczaOpeningBoltDB = "opening BoltDB"
|
||||||
BlobovniczaInitializing = "initializing..."
|
BlobovniczaInitializing = "initializing..."
|
||||||
|
|
|
@ -74,6 +74,9 @@ type Client struct {
|
||||||
// channel for internal stop
|
// channel for internal stop
|
||||||
closeChan chan struct{}
|
closeChan chan struct{}
|
||||||
|
|
||||||
|
// channel to indicate that close is done
|
||||||
|
closeDone chan struct{}
|
||||||
|
|
||||||
// indicates that Client is not able to
|
// indicates that Client is not able to
|
||||||
// establish connection to any of the
|
// establish connection to any of the
|
||||||
// provided RPC endpoints
|
// provided RPC endpoints
|
||||||
|
|
|
@ -120,6 +120,7 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er
|
||||||
accAddr: accAddr,
|
accAddr: accAddr,
|
||||||
cfg: *cfg,
|
cfg: *cfg,
|
||||||
closeChan: make(chan struct{}),
|
closeChan: make(chan struct{}),
|
||||||
|
closeDone: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
cli.endpoints.init(cfg.endpoints)
|
cli.endpoints.init(cfg.endpoints)
|
||||||
|
@ -169,6 +170,7 @@ func (c *Client) newCli(ctx context.Context, endpoint string) (*rpcclient.WSClie
|
||||||
Options: rpcclient.Options{
|
Options: rpcclient.Options{
|
||||||
DialTimeout: c.cfg.dialTimeout,
|
DialTimeout: c.cfg.dialTimeout,
|
||||||
},
|
},
|
||||||
|
CloseNotificationChannelIfFull: true,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("WS client creation: %w", err)
|
return nil, nil, fmt.Errorf("WS client creation: %w", err)
|
||||||
|
|
|
@ -79,8 +79,10 @@ func (c *Client) closeWaiter(ctx context.Context) {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
case <-c.closeChan:
|
case <-c.closeChan:
|
||||||
}
|
}
|
||||||
|
//nolint:contextcheck
|
||||||
_ = c.UnsubscribeAll()
|
_ = c.UnsubscribeAll()
|
||||||
c.close()
|
c.close()
|
||||||
|
close(c.closeDone)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) switchToMostPrioritized(ctx context.Context) {
|
func (c *Client) switchToMostPrioritized(ctx context.Context) {
|
||||||
|
|
|
@ -1,11 +1,18 @@
|
||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"github.com/cenkalti/backoff"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/neorpc"
|
"github.com/nspcc-dev/neo-go/pkg/neorpc"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
|
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Close closes connection to the remote side making
|
// Close closes connection to the remote side making
|
||||||
|
@ -17,6 +24,11 @@ func (c *Client) Close() {
|
||||||
// to prevent switching to another RPC node
|
// to prevent switching to another RPC node
|
||||||
// in the notification loop
|
// in the notification loop
|
||||||
close(c.closeChan)
|
close(c.closeChan)
|
||||||
|
|
||||||
|
// closeWaiter performs asynchronously and thus
|
||||||
|
// we may abrupt all process related to close process
|
||||||
|
// if we do not wait for closing process finish.
|
||||||
|
<-c.closeDone
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReceiveExecutionNotifications performs subscription for notifications
|
// ReceiveExecutionNotifications performs subscription for notifications
|
||||||
|
@ -100,6 +112,27 @@ func (c *Client) UnsubscribeAll() error {
|
||||||
return ErrConnectionLost
|
return ErrConnectionLost
|
||||||
}
|
}
|
||||||
|
|
||||||
err := c.client.UnsubscribeAll()
|
if err := c.client.UnsubscribeAll(); err != nil {
|
||||||
return err
|
// TODO (aarifullin): consider the situation when the morph client
|
||||||
|
// failed to subscribe for events because of websocket client problems
|
||||||
|
// "under hood". After failed subscription the client invokes Close()
|
||||||
|
// that invokes UnsubscribeAll(). This requires to push new request
|
||||||
|
// via websocket to neo-go but the websocket client may be down.
|
||||||
|
// Therefore, morph will never request neo-go to unsubscribe from events, but
|
||||||
|
// we can try to fix this by reconnecting to neo-go.
|
||||||
|
backoffSettings := backoff.NewExponentialBackOff()
|
||||||
|
backoffSettings.MaxElapsedTime = 30 * time.Second
|
||||||
|
return backoff.Retry(func() error {
|
||||||
|
if !c.SwitchRPC(context.TODO()) {
|
||||||
|
c.logger.Warn(logs.SubscriberCouldNotSwitchRPCDuringUnsubscriptionFromEvents)
|
||||||
|
return fmt.Errorf("could not switch rpc")
|
||||||
|
}
|
||||||
|
err := c.client.UnsubscribeAll()
|
||||||
|
if err != nil {
|
||||||
|
c.logger.Warn(logs.SubscriberCouldNotUnsubscribeFromEventsOnBackoffPolicy, zap.Error(err))
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}, backoffSettings)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue