Compare commits
3 commits
master
...
fix/subscr
Author | SHA1 | Date | |
---|---|---|---|
23bec300dd | |||
368774be95 | |||
b9ef294b99 |
4 changed files with 81 additions and 16 deletions
|
@ -282,11 +282,62 @@ func initNetmapState(c *cfg) {
|
|||
c.handleLocalNodeInfo(ni)
|
||||
}
|
||||
|
||||
func sameNodeInfo(a, b *netmapSDK.NodeInfo) bool {
|
||||
// Suboptimal, but we do this once on the node startup.
|
||||
rawA := a.Marshal()
|
||||
rawB := b.Marshal()
|
||||
return bytes.Equal(rawA, rawB)
|
||||
func needsUpdate(local, remote *netmapSDK.NodeInfo) bool {
|
||||
return bytes.Equal(local.PublicKey(), remote.PublicKey()) && equalEndpoints(local, remote) && equalAttributes(local, remote)
|
||||
}
|
||||
|
||||
func equalAttributes(local, remote *netmapSDK.NodeInfo) bool {
|
||||
asA := make(map[string]string)
|
||||
local.IterateAttributes(func(k, v string) {
|
||||
asA[k] = v
|
||||
})
|
||||
|
||||
allMatched := true
|
||||
count := 0
|
||||
remote.IterateAttributes(func(k, vb string) {
|
||||
// IR adds new attributes derived from the locode, they should be skipped.
|
||||
if isLocodeAttribute(k) {
|
||||
return
|
||||
}
|
||||
if va, ok := asA[k]; !ok || va != vb {
|
||||
allMatched = false
|
||||
return
|
||||
}
|
||||
count++
|
||||
})
|
||||
return allMatched && count == len(asA)
|
||||
}
|
||||
|
||||
func isLocodeAttribute(k string) bool {
|
||||
// See https://git.frostfs.info/TrueCloudLab/frostfs-api/src/branch/master/netmap/types.proto#L171
|
||||
switch k {
|
||||
case "Continent", "Country", "CountryCode", "Location", "SubDiv", "SubDivCode":
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func equalEndpoints(a, b *netmapSDK.NodeInfo) bool {
|
||||
var esA, esB []string
|
||||
a.IterateNetworkEndpoints(func(e string) bool {
|
||||
esA = append(esA, e)
|
||||
return false
|
||||
})
|
||||
b.IterateNetworkEndpoints(func(e string) bool {
|
||||
esB = append(esB, e)
|
||||
return false
|
||||
})
|
||||
|
||||
if len(esA) != len(esB) {
|
||||
return false
|
||||
}
|
||||
for i := range esA {
|
||||
if esA[i] != esB[i] {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func nodeState(ni *netmapSDK.NodeInfo) string {
|
||||
|
@ -314,7 +365,7 @@ func (c *cfg) netmapInitLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, bool,
|
|||
for i := range nmNodes {
|
||||
if bytes.Equal(nmNodes[i].PublicKey(), c.binPublicKey) {
|
||||
candidate = &nmNodes[i]
|
||||
alreadyBootstraped = candidate.IsOnline() && sameNodeInfo(&c.cfgNodeInfo.localInfo, candidate)
|
||||
alreadyBootstraped = candidate.IsOnline() && needsUpdate(&c.cfgNodeInfo.localInfo, candidate)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
|
2
go.mod
2
go.mod
|
@ -6,7 +6,7 @@ require (
|
|||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.0
|
||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.18.0
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230911122224-ac8fc6d4400c
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230915114754-555ccc63b255
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
||||
github.com/cheggaaa/pb v1.0.29
|
||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -60,9 +60,16 @@ type (
|
|||
|
||||
// Params is a group of Subscriber constructor parameters.
|
||||
Params struct {
|
||||
Log *logger.Logger
|
||||
StartFromBlock uint32
|
||||
Client *client.Client
|
||||
Log *logger.Logger
|
||||
StartFromBlock uint32
|
||||
Client *client.Client
|
||||
NotificationsConfig NotificationsConfig
|
||||
}
|
||||
|
||||
NotificationsConfig struct {
|
||||
NotificationChannelSize uint32
|
||||
BlockChannelSize uint32
|
||||
NotaryRequestChannelSize uint32
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -169,7 +176,7 @@ func New(ctx context.Context, p *Params) (Subscriber, error) {
|
|||
blockChan: make(chan *block.Block),
|
||||
notaryChan: make(chan *result.NotaryRequestEvent),
|
||||
|
||||
current: newSubChannels(),
|
||||
current: newSubChannels(p.NotificationsConfig),
|
||||
|
||||
subscribedEvents: make(map[util.Uint160]bool),
|
||||
subscribedNotaryEvents: make(map[util.Uint160]bool),
|
||||
|
@ -259,7 +266,14 @@ func (s *subscriber) switchEndpoint(ctx context.Context, finishCh chan<- bool) (
|
|||
cliCh := s.client.NotificationChannel()
|
||||
|
||||
s.Lock()
|
||||
chs := newSubChannels()
|
||||
|
||||
param := NotificationsConfig{
|
||||
NotificationChannelSize: uint32(cap(s.current.NotifyChan)),
|
||||
BlockChannelSize: uint32(cap(s.current.BlockChan)),
|
||||
NotaryRequestChannelSize: uint32(cap(s.current.NotaryChan)),
|
||||
}
|
||||
|
||||
chs := newSubChannels(param)
|
||||
go func() {
|
||||
finishCh <- s.restoreSubscriptions(chs.NotifyChan, chs.BlockChan, chs.NotaryChan)
|
||||
}()
|
||||
|
@ -270,11 +284,11 @@ func (s *subscriber) switchEndpoint(ctx context.Context, finishCh chan<- bool) (
|
|||
return true, cliCh
|
||||
}
|
||||
|
||||
func newSubChannels() subChannels {
|
||||
func newSubChannels(param NotificationsConfig) subChannels {
|
||||
return subChannels{
|
||||
NotifyChan: make(chan *state.ContainedNotificationEvent),
|
||||
BlockChan: make(chan *block.Block),
|
||||
NotaryChan: make(chan *result.NotaryRequestEvent),
|
||||
NotifyChan: make(chan *state.ContainedNotificationEvent, param.NotificationChannelSize),
|
||||
BlockChan: make(chan *block.Block, param.BlockChannelSize),
|
||||
NotaryChan: make(chan *result.NotaryRequestEvent, param.NotaryRequestChannelSize),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue