forked from TrueCloudLab/frostfs-node
[#132] Add cleanup table in inner ring netmap processor
Cleanup table is a cache for inner ring node to look for netmap snapshot. It updates access time of bootstrapped nodes and will be used to send `updateState` txs to clean netmap from unresponsive nodes. Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
c80bce9438
commit
dfc2e81717
2 changed files with 212 additions and 0 deletions
97
pkg/innerring/processors/netmap/cleanup_table.go
Normal file
97
pkg/innerring/processors/netmap/cleanup_table.go
Normal file
|
@ -0,0 +1,97 @@
|
|||
package netmap
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"sync"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/netmap/grpc"
|
||||
)
|
||||
|
||||
type (
|
||||
cleanupTable struct {
|
||||
*sync.RWMutex
|
||||
enabled bool
|
||||
threshold uint64
|
||||
lastAccess map[string]epochStamp
|
||||
}
|
||||
|
||||
epochStamp struct {
|
||||
epoch uint64
|
||||
removeFlag bool
|
||||
}
|
||||
)
|
||||
|
||||
func newCleanupTable(enabled bool, threshold uint64) cleanupTable {
|
||||
return cleanupTable{
|
||||
RWMutex: new(sync.RWMutex),
|
||||
enabled: enabled,
|
||||
threshold: threshold,
|
||||
lastAccess: make(map[string]epochStamp),
|
||||
}
|
||||
}
|
||||
|
||||
// Update cleanup table based on on-chain information about netmap.
|
||||
func (c *cleanupTable) update(snapshot []netmap.NodeInfo, now uint64) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
// replacing map is less memory efficient but faster
|
||||
newMap := make(map[string]epochStamp, len(snapshot))
|
||||
|
||||
for i := range snapshot {
|
||||
keyString := hex.EncodeToString(snapshot[i].PublicKey)
|
||||
if access, ok := c.lastAccess[keyString]; ok {
|
||||
access.removeFlag = false // reset remove Flag on each Update
|
||||
newMap[keyString] = access
|
||||
} else {
|
||||
newMap[keyString] = epochStamp{epoch: now}
|
||||
}
|
||||
}
|
||||
|
||||
c.lastAccess = newMap
|
||||
}
|
||||
|
||||
func (c *cleanupTable) touch(keyString string, now uint64) bool {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
access, ok := c.lastAccess[keyString]
|
||||
result := !access.removeFlag && ok
|
||||
|
||||
access.removeFlag = false // reset remove flag on each touch
|
||||
if now > access.epoch {
|
||||
access.epoch = now
|
||||
}
|
||||
|
||||
c.lastAccess[keyString] = access
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func (c *cleanupTable) flag(keyString string) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
if access, ok := c.lastAccess[keyString]; ok {
|
||||
access.removeFlag = true
|
||||
c.lastAccess[keyString] = access
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cleanupTable) forEachRemoveCandidate(epoch uint64, f func(string) error) error {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
for keyString, access := range c.lastAccess {
|
||||
if epoch-access.epoch > c.threshold {
|
||||
access.removeFlag = true // set remove flag
|
||||
c.lastAccess[keyString] = access
|
||||
|
||||
if err := f(keyString); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
115
pkg/innerring/processors/netmap/cleanup_table_test.go
Normal file
115
pkg/innerring/processors/netmap/cleanup_table_test.go
Normal file
|
@ -0,0 +1,115 @@
|
|||
package netmap
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/netmap/grpc"
|
||||
crypto "github.com/nspcc-dev/neofs-crypto"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestCleanupTable(t *testing.T) {
|
||||
var infos = []netmap.NodeInfo{
|
||||
{PublicKey: crypto.MarshalPublicKey(&test.DecodeKey(1).PublicKey)},
|
||||
{PublicKey: crypto.MarshalPublicKey(&test.DecodeKey(2).PublicKey)},
|
||||
{PublicKey: crypto.MarshalPublicKey(&test.DecodeKey(3).PublicKey)},
|
||||
}
|
||||
|
||||
var mapInfos = map[string]struct{}{
|
||||
hex.EncodeToString(infos[0].PublicKey): {},
|
||||
hex.EncodeToString(infos[1].PublicKey): {},
|
||||
hex.EncodeToString(infos[2].PublicKey): {},
|
||||
}
|
||||
|
||||
t.Run("update", func(t *testing.T) {
|
||||
c := newCleanupTable(true, 1)
|
||||
c.update(infos, 1)
|
||||
require.Len(t, c.lastAccess, len(infos))
|
||||
|
||||
for k, v := range c.lastAccess {
|
||||
require.EqualValues(t, 1, v.epoch)
|
||||
require.False(t, v.removeFlag)
|
||||
|
||||
_, ok := mapInfos[k]
|
||||
require.True(t, ok)
|
||||
}
|
||||
|
||||
t.Run("update with flagged", func(t *testing.T) {
|
||||
key := hex.EncodeToString(infos[0].PublicKey)
|
||||
c.flag(key)
|
||||
|
||||
c.update(infos, 2)
|
||||
require.EqualValues(t, 1, c.lastAccess[key].epoch)
|
||||
require.False(t, c.lastAccess[key].removeFlag)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("touch", func(t *testing.T) {
|
||||
c := newCleanupTable(true, 1)
|
||||
c.update(infos, 1)
|
||||
|
||||
key := hex.EncodeToString(infos[1].PublicKey)
|
||||
require.True(t, c.touch(key, 11))
|
||||
require.EqualValues(t, 11, c.lastAccess[key].epoch)
|
||||
|
||||
require.False(t, c.touch(key+"x", 12))
|
||||
require.EqualValues(t, 12, c.lastAccess[key+"x"].epoch)
|
||||
})
|
||||
|
||||
t.Run("flag", func(t *testing.T) {
|
||||
c := newCleanupTable(true, 1)
|
||||
c.update(infos, 1)
|
||||
|
||||
key := hex.EncodeToString(infos[1].PublicKey)
|
||||
c.flag(key)
|
||||
require.True(t, c.lastAccess[key].removeFlag)
|
||||
|
||||
require.False(t, c.touch(key, 2))
|
||||
require.False(t, c.lastAccess[key].removeFlag)
|
||||
})
|
||||
|
||||
t.Run("iterator", func(t *testing.T) {
|
||||
c := newCleanupTable(true, 2)
|
||||
c.update(infos, 1)
|
||||
|
||||
t.Run("no nodes to remove", func(t *testing.T) {
|
||||
cnt := 0
|
||||
require.NoError(t,
|
||||
c.forEachRemoveCandidate(2, func(_ string) error {
|
||||
cnt++
|
||||
return nil
|
||||
}))
|
||||
require.EqualValues(t, 0, cnt)
|
||||
})
|
||||
|
||||
t.Run("all nodes to remove", func(t *testing.T) {
|
||||
cnt := 0
|
||||
require.NoError(t,
|
||||
c.forEachRemoveCandidate(4, func(s string) error {
|
||||
cnt++
|
||||
_, ok := mapInfos[s]
|
||||
require.True(t, ok)
|
||||
return nil
|
||||
}))
|
||||
require.EqualValues(t, len(infos), cnt)
|
||||
})
|
||||
|
||||
t.Run("some nodes to remove", func(t *testing.T) {
|
||||
cnt := 0
|
||||
key := hex.EncodeToString(infos[1].PublicKey)
|
||||
|
||||
require.False(t, c.touch(key, 4)) // one node was updated
|
||||
|
||||
require.NoError(t,
|
||||
c.forEachRemoveCandidate(4, func(s string) error {
|
||||
cnt++
|
||||
require.NotEqual(t, s, key)
|
||||
return nil
|
||||
}))
|
||||
require.EqualValues(t, len(infos)-1, cnt)
|
||||
})
|
||||
|
||||
})
|
||||
}
|
Loading…
Reference in a new issue