From dfc2e81717b7853ea5c8567d81368d71472fd5fa Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Thu, 29 Oct 2020 18:53:45 +0300 Subject: [PATCH] [#132] Add cleanup table in inner ring netmap processor Cleanup table is a cache for inner ring node to look for netmap snapshot. It updates access time of bootstrapped nodes and will be used to send `updateState` txs to clean netmap from unresponsive nodes. Signed-off-by: Alex Vanin --- .../processors/netmap/cleanup_table.go | 97 +++++++++++++++ .../processors/netmap/cleanup_table_test.go | 115 ++++++++++++++++++ 2 files changed, 212 insertions(+) create mode 100644 pkg/innerring/processors/netmap/cleanup_table.go create mode 100644 pkg/innerring/processors/netmap/cleanup_table_test.go diff --git a/pkg/innerring/processors/netmap/cleanup_table.go b/pkg/innerring/processors/netmap/cleanup_table.go new file mode 100644 index 000000000..203394654 --- /dev/null +++ b/pkg/innerring/processors/netmap/cleanup_table.go @@ -0,0 +1,97 @@ +package netmap + +import ( + "encoding/hex" + "sync" + + "github.com/nspcc-dev/neofs-api-go/v2/netmap/grpc" +) + +type ( + cleanupTable struct { + *sync.RWMutex + enabled bool + threshold uint64 + lastAccess map[string]epochStamp + } + + epochStamp struct { + epoch uint64 + removeFlag bool + } +) + +func newCleanupTable(enabled bool, threshold uint64) cleanupTable { + return cleanupTable{ + RWMutex: new(sync.RWMutex), + enabled: enabled, + threshold: threshold, + lastAccess: make(map[string]epochStamp), + } +} + +// Update cleanup table based on on-chain information about netmap. +func (c *cleanupTable) update(snapshot []netmap.NodeInfo, now uint64) { + c.Lock() + defer c.Unlock() + + // replacing map is less memory efficient but faster + newMap := make(map[string]epochStamp, len(snapshot)) + + for i := range snapshot { + keyString := hex.EncodeToString(snapshot[i].PublicKey) + if access, ok := c.lastAccess[keyString]; ok { + access.removeFlag = false // reset remove Flag on each Update + newMap[keyString] = access + } else { + newMap[keyString] = epochStamp{epoch: now} + } + } + + c.lastAccess = newMap +} + +func (c *cleanupTable) touch(keyString string, now uint64) bool { + c.Lock() + defer c.Unlock() + + access, ok := c.lastAccess[keyString] + result := !access.removeFlag && ok + + access.removeFlag = false // reset remove flag on each touch + if now > access.epoch { + access.epoch = now + } + + c.lastAccess[keyString] = access + + return result +} + +func (c *cleanupTable) flag(keyString string) { + c.Lock() + defer c.Unlock() + + if access, ok := c.lastAccess[keyString]; ok { + access.removeFlag = true + c.lastAccess[keyString] = access + } +} + +func (c *cleanupTable) forEachRemoveCandidate(epoch uint64, f func(string) error) error { + c.Lock() + defer c.Unlock() + + for keyString, access := range c.lastAccess { + if epoch-access.epoch > c.threshold { + access.removeFlag = true // set remove flag + c.lastAccess[keyString] = access + + if err := f(keyString); err != nil { + return err + } + } + } + + return nil +} diff --git a/pkg/innerring/processors/netmap/cleanup_table_test.go b/pkg/innerring/processors/netmap/cleanup_table_test.go new file mode 100644 index 000000000..7a9c3dea3 --- /dev/null +++ b/pkg/innerring/processors/netmap/cleanup_table_test.go @@ -0,0 +1,115 @@ +package netmap + +import ( + "encoding/hex" + "testing" + + "github.com/nspcc-dev/neofs-api-go/v2/netmap/grpc" + crypto "github.com/nspcc-dev/neofs-crypto" + "github.com/nspcc-dev/neofs-node/pkg/util/test" + "github.com/stretchr/testify/require" +) + +func TestCleanupTable(t *testing.T) { + var infos = []netmap.NodeInfo{ + {PublicKey: crypto.MarshalPublicKey(&test.DecodeKey(1).PublicKey)}, + {PublicKey: crypto.MarshalPublicKey(&test.DecodeKey(2).PublicKey)}, + {PublicKey: crypto.MarshalPublicKey(&test.DecodeKey(3).PublicKey)}, + } + + var mapInfos = map[string]struct{}{ + hex.EncodeToString(infos[0].PublicKey): {}, + hex.EncodeToString(infos[1].PublicKey): {}, + hex.EncodeToString(infos[2].PublicKey): {}, + } + + t.Run("update", func(t *testing.T) { + c := newCleanupTable(true, 1) + c.update(infos, 1) + require.Len(t, c.lastAccess, len(infos)) + + for k, v := range c.lastAccess { + require.EqualValues(t, 1, v.epoch) + require.False(t, v.removeFlag) + + _, ok := mapInfos[k] + require.True(t, ok) + } + + t.Run("update with flagged", func(t *testing.T) { + key := hex.EncodeToString(infos[0].PublicKey) + c.flag(key) + + c.update(infos, 2) + require.EqualValues(t, 1, c.lastAccess[key].epoch) + require.False(t, c.lastAccess[key].removeFlag) + }) + }) + + t.Run("touch", func(t *testing.T) { + c := newCleanupTable(true, 1) + c.update(infos, 1) + + key := hex.EncodeToString(infos[1].PublicKey) + require.True(t, c.touch(key, 11)) + require.EqualValues(t, 11, c.lastAccess[key].epoch) + + require.False(t, c.touch(key+"x", 12)) + require.EqualValues(t, 12, c.lastAccess[key+"x"].epoch) + }) + + t.Run("flag", func(t *testing.T) { + c := newCleanupTable(true, 1) + c.update(infos, 1) + + key := hex.EncodeToString(infos[1].PublicKey) + c.flag(key) + require.True(t, c.lastAccess[key].removeFlag) + + require.False(t, c.touch(key, 2)) + require.False(t, c.lastAccess[key].removeFlag) + }) + + t.Run("iterator", func(t *testing.T) { + c := newCleanupTable(true, 2) + c.update(infos, 1) + + t.Run("no nodes to remove", func(t *testing.T) { + cnt := 0 + require.NoError(t, + c.forEachRemoveCandidate(2, func(_ string) error { + cnt++ + return nil + })) + require.EqualValues(t, 0, cnt) + }) + + t.Run("all nodes to remove", func(t *testing.T) { + cnt := 0 + require.NoError(t, + c.forEachRemoveCandidate(4, func(s string) error { + cnt++ + _, ok := mapInfos[s] + require.True(t, ok) + return nil + })) + require.EqualValues(t, len(infos), cnt) + }) + + t.Run("some nodes to remove", func(t *testing.T) { + cnt := 0 + key := hex.EncodeToString(infos[1].PublicKey) + + require.False(t, c.touch(key, 4)) // one node was updated + + require.NoError(t, + c.forEachRemoveCandidate(4, func(s string) error { + cnt++ + require.NotEqual(t, s, key) + return nil + })) + require.EqualValues(t, len(infos)-1, cnt) + }) + + }) +}