diff --git a/cmd/frostfs-node/cache.go b/cmd/frostfs-node/cache.go
index 0fe56d2b0..38cee5837 100644
--- a/cmd/frostfs-node/cache.go
+++ b/cmd/frostfs-node/cache.go
@@ -1,20 +1,27 @@
package main
import (
+ "bytes"
+ "cmp"
"context"
+ "slices"
"sync"
+ "sync/atomic"
"time"
+ "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
+ "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
- lru "github.com/hashicorp/golang-lru/v2"
"github.com/hashicorp/golang-lru/v2/expirable"
+ "github.com/hashicorp/golang-lru/v2/simplelru"
+ "go.uber.org/zap"
)
type netValueReader[K any, V any] func(ctx context.Context, cid K) (V, error)
@@ -110,55 +117,6 @@ func (c *ttlNetCache[K, V]) remove(key K) {
hit = c.cache.Remove(key)
}
-// entity that provides LRU cache interface.
-type lruNetCache struct {
- cache *lru.Cache[uint64, *netmapSDK.NetMap]
-
- netRdr netValueReader[uint64, *netmapSDK.NetMap]
-
- metrics cacheMetrics
-}
-
-// newNetworkLRUCache returns wrapper over netValueReader with LRU cache.
-func newNetworkLRUCache(sz int, netRdr netValueReader[uint64, *netmapSDK.NetMap], metrics cacheMetrics) *lruNetCache {
- cache, err := lru.New[uint64, *netmapSDK.NetMap](sz)
- fatalOnErr(err)
-
- return &lruNetCache{
- cache: cache,
- netRdr: netRdr,
- metrics: metrics,
- }
-}
-
-// reads value by the key.
-//
-// updates the value from the network on cache miss.
-//
-// returned value should not be modified.
-func (c *lruNetCache) get(ctx context.Context, key uint64) (*netmapSDK.NetMap, error) {
- hit := false
- startedAt := time.Now()
- defer func() {
- c.metrics.AddMethodDuration("Get", time.Since(startedAt), hit)
- }()
-
- val, ok := c.cache.Get(key)
- if ok {
- hit = true
- return val, nil
- }
-
- val, err := c.netRdr(ctx, key)
- if err != nil {
- return nil, err
- }
-
- c.cache.Add(key, val)
-
- return val, nil
-}
-
// wrapper over TTL cache of values read from the network
// that implements container storage.
type ttlContainerStorage struct {
@@ -200,20 +158,236 @@ func (s ttlContainerStorage) DeletionInfo(ctx context.Context, cnr cid.ID) (*con
type lruNetmapSource struct {
netState netmap.State
- cache *lruNetCache
+ client rawSource // network source of netmaps and of the candidates list
+ cache *simplelru.LRU[uint64, *atomic.Pointer[netmapSDK.NetMap]] // netmaps keyed by epoch; accessed under mtx
+ mtx sync.RWMutex // guards cache
+ metrics cacheMetrics // records duration and hit/miss of get
+ log *logger.Logger // used to report failed candidates polls
+ candidates atomic.Pointer[[]netmapSDK.NodeInfo] // list stored by the last successful poll; nil before the first one
}
-func newCachedNetmapStorage(s netmap.State, v netmap.Source) netmap.Source {
+type rawSource interface { // what lruNetmapSource needs from the underlying netmap client
+ GetCandidates(ctx context.Context) ([]netmapSDK.NodeInfo, error) // current list of netmap candidates
+ GetNetMapByEpoch(ctx context.Context, epoch uint64) (*netmapSDK.NetMap, error) // netmap of the given epoch
+}
+
+func newCachedNetmapStorage(ctx context.Context, log *logger.Logger,
+ netState netmap.State, client rawSource, wg *sync.WaitGroup, d time.Duration,
+) netmap.Source { // builds the LRU-cached netmap source and starts its background candidates poller
const netmapCacheSize = 10
- lruNetmapCache := newNetworkLRUCache(netmapCacheSize, func(ctx context.Context, key uint64) (*netmapSDK.NetMap, error) {
- return v.GetNetMapByEpoch(ctx, key)
- }, metrics.NewCacheMetrics("netmap"))
+ cache, err := simplelru.NewLRU[uint64, *atomic.Pointer[netmapSDK.NetMap]](netmapCacheSize, nil)
+ fatalOnErr(err)
- return &lruNetmapSource{
- netState: s,
- cache: lruNetmapCache,
+ src := &lruNetmapSource{
+ netState: netState,
+ client: client,
+ cache: cache,
+ log: log,
+ metrics: metrics.NewCacheMetrics("netmap"),
}
+
+ wg.Add(1) // the poller is registered in wg so the caller can wait for it to stop
+ go func() {
+ defer wg.Done()
+ src.updateCandidates(ctx, d) // polls every d and returns once ctx is done
+ }()
+
+ return src
+}
+
+// updateCandidates polls the candidates list every d until ctx is done and merges it into the cached netmaps whenever it changes.
+func (s *lruNetmapSource) updateCandidates(ctx context.Context, d time.Duration) {
+ timer := time.NewTimer(d)
+ defer timer.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-timer.C:
+ newCandidates, err := s.client.GetCandidates(ctx)
+ if err != nil {
+ s.log.Debug(ctx, logs.FailedToUpdateNetmapCandidates, zap.Error(err)) // previous candidates stay in place; retried on the next tick
+ timer.Reset(d)
+ break // leaves the select, not the for loop
+ }
+ if len(newCandidates) == 0 {
+ s.candidates.Store(&newCandidates) // remember the empty list; the cache is not touched
+ timer.Reset(d)
+ break
+ }
+ slices.SortFunc(newCandidates, func(n1 netmapSDK.NodeInfo, n2 netmapSDK.NodeInfo) int { // fixed order so two polls can be compared element by element
+ return cmp.Compare(n1.Hash(), n2.Hash())
+ })
+
+ // Merge on the first non-empty poll, or when the list differs from the stored one.
+ v := s.candidates.Load()
+ if v == nil {
+ s.candidates.Store(&newCandidates)
+ s.mergeCacheWithCandidates(newCandidates)
+ timer.Reset(d)
+ break
+ }
+ ret := slices.CompareFunc(*v, newCandidates, func(n1 netmapSDK.NodeInfo, n2 netmapSDK.NodeInfo) int {
+ if !bytes.Equal(n1.PublicKey(), n2.PublicKey()) ||
+ uint32(n1.Status()) != uint32(n2.Status()) ||
+ slices.Compare(n1.ExternalAddresses(), n2.ExternalAddresses()) != 0 {
+ return 1 // only zero vs non-zero is used below, so any non-zero value means "different"
+ }
+ var ne1 []string
+ n1.IterateNetworkEndpoints(func(s string) bool {
+ ne1 = append(ne1, s)
+ return false // presumably false continues the iteration — verify against the SDK
+ })
+ var ne2 []string
+ n2.IterateNetworkEndpoints(func(s string) bool {
+ ne2 = append(ne2, s)
+ return false
+ })
+ return slices.Compare(ne1, ne2)
+ })
+ if ret != 0 {
+ s.candidates.Store(&newCandidates)
+ s.mergeCacheWithCandidates(newCandidates)
+ }
+ timer.Reset(d)
+ }
+ }
+}
+
+// mergeCacheWithCandidates patches every cached netmap that differs from candidates, swapping in an updated clone.
+func (s *lruNetmapSource) mergeCacheWithCandidates(candidates []netmapSDK.NodeInfo) {
+	s.mtx.Lock()
+	slots := s.cache.Values()
+	s.mtx.Unlock()
+	for _, slot := range slots {
+		current := slot.Load()
+		if patch := getNetMapNodesToUpdate(current, candidates); len(patch) > 0 {
+			patched := current.Clone()
+			mergeNetmapWithCandidates(patch, patched)
+			slot.Store(patched)
+		}
+	}
+}
+
+// get returns the netmap for the given epoch, fetching it from the network
+// and caching it on a miss; known candidates are merged into a fetched netmap.
+//
+// The lock is held for the whole miss path, including the network call.
+// The returned value should not be modified.
+func (s *lruNetmapSource) get(ctx context.Context, key uint64) (*netmapSDK.NetMap, error) {
+	hit := false
+	startedAt := time.Now()
+	defer func() {
+		s.metrics.AddMethodDuration("Get", time.Since(startedAt), hit)
+	}()
+
+	s.mtx.Lock() // exclusive: simplelru's Get reorders the recency list, so a read lock would race
+	val, ok := s.cache.Get(key)
+	s.mtx.Unlock()
+	if ok {
+		hit = true
+		return val.Load(), nil
+	}
+
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+
+	val, ok = s.cache.Get(key) // re-check: another caller may have filled the entry in between
+	if ok {
+		hit = true
+		return val.Load(), nil
+	}
+
+	nm, err := s.client.GetNetMapByEpoch(ctx, key)
+	if err != nil {
+		return nil, err
+	}
+	v := s.candidates.Load()
+	if v != nil {
+		updates := getNetMapNodesToUpdate(nm, *v)
+		if len(updates) > 0 {
+			mergeNetmapWithCandidates(updates, nm)
+		}
+	}
+
+	p := atomic.Pointer[netmapSDK.NetMap]{}
+	p.Store(nm)
+	s.cache.Add(key, &p)
+
+	return nm, nil
+}
+
+// mergeNetmapWithCandidates applies updates to the nodes returned by nm.Nodes(); fields an update leaves unset are not touched.
+func mergeNetmapWithCandidates(updates []nodeToUpdate, nm *netmapSDK.NetMap) {
+ for _, v := range updates {
+ if v.status != netmapSDK.UnspecifiedState { // status is only filled in when it has to change
+ nm.Nodes()[v.netmapIndex].SetStatus(v.status)
+ }
+ if v.externalAddresses != nil { // nil means "keep the node's current external addresses"
+ nm.Nodes()[v.netmapIndex].SetExternalAddresses(v.externalAddresses...)
+ }
+ if v.endpoints != nil { // nil means "keep the node's current endpoints"
+ nm.Nodes()[v.netmapIndex].SetNetworkEndpoints(v.endpoints...)
+ }
+ }
+}
+
+type nodeToUpdate struct { // changes to apply to one netmap node, taken from its candidate entry
+ netmapIndex int // index of the node in nm.Nodes()
+ status netmapSDK.NodeState // new status; left at its zero value when the status does not change
+ externalAddresses []string // new external addresses; nil when they do not change
+ endpoints []string // new network endpoints; nil when they do not change
+}
+
+// getNetMapNodesToUpdate lists, for each netmap node that has a candidate with the same public key, the fields to take over from that candidate.
+func getNetMapNodesToUpdate(nm *netmapSDK.NetMap, candidates []netmapSDK.NodeInfo) []nodeToUpdate {
+ var res []nodeToUpdate
+ for i := range nm.Nodes() {
+ for _, cnd := range candidates {
+ if bytes.Equal(nm.Nodes()[i].PublicKey(), cnd.PublicKey()) { // candidates are matched to netmap nodes by public key
+ var tmp nodeToUpdate
+ var update bool
+
+ if cnd.Status() != nm.Nodes()[i].Status() &&
+ (cnd.Status() == netmapSDK.Online || cnd.Status() == netmapSDK.Maintenance) { // other candidate statuses are not taken over
+ update = true
+ tmp.status = cnd.Status()
+ }
+
+ externalAddresses := cnd.ExternalAddresses()
+ if externalAddresses != nil && // a candidate without external addresses never clears the node's
+ slices.Compare(externalAddresses, nm.Nodes()[i].ExternalAddresses()) != 0 {
+ update = true
+ tmp.externalAddresses = externalAddresses
+ }
+
+ nodeEndpoints := make([]string, 0, nm.Nodes()[i].NumberOfNetworkEndpoints())
+ nm.Nodes()[i].IterateNetworkEndpoints(func(s string) bool {
+ nodeEndpoints = append(nodeEndpoints, s)
+ return false
+ })
+ candidateEndpoints := make([]string, 0, cnd.NumberOfNetworkEndpoints())
+ cnd.IterateNetworkEndpoints(func(s string) bool {
+ candidateEndpoints = append(candidateEndpoints, s)
+ return false
+ })
+ if slices.Compare(nodeEndpoints, candidateEndpoints) != 0 { // order-sensitive comparison
+ update = true
+ tmp.endpoints = candidateEndpoints
+ }
+
+ if update {
+ tmp.netmapIndex = i
+ res = append(res, tmp)
+ }
+
+ break // the first candidate with this key decides; later duplicates are ignored
+ }
+ }
+ }
+ return res
}
func (s *lruNetmapSource) GetNetMap(ctx context.Context, diff uint64) (*netmapSDK.NetMap, error) {
@@ -225,7 +399,7 @@ func (s *lruNetmapSource) GetNetMapByEpoch(ctx context.Context, epoch uint64) (*
}
func (s *lruNetmapSource) getNetMapByEpoch(ctx context.Context, epoch uint64) (*netmapSDK.NetMap, error) {
- val, err := s.cache.get(ctx, epoch)
+ val, err := s.get(ctx, epoch)
if err != nil {
return nil, err
}
diff --git a/cmd/frostfs-node/cache_test.go b/cmd/frostfs-node/cache_test.go
index b1601aa67..24286826f 100644
--- a/cmd/frostfs-node/cache_test.go
+++ b/cmd/frostfs-node/cache_test.go
@@ -3,9 +3,11 @@ package main
import (
"context"
"errors"
+ "sync"
"testing"
"time"
+ netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"github.com/stretchr/testify/require"
)
@@ -59,3 +61,75 @@ func testNetValueReader(_ context.Context, key string) (time.Time, error) {
type noopCacheMetricts struct{}
func (m *noopCacheMetricts) AddMethodDuration(method string, d time.Duration, hit bool) {}
+
+type rawSrc struct{} // test stub for rawSource that returns fixed data
+
+func (r *rawSrc) GetCandidates(_ context.Context) ([]netmapSDK.NodeInfo, error) { // fixed candidates: Online, two addresses and two endpoints each
+ node0 := netmapSDK.NodeInfo{}
+ node0.SetPublicKey([]byte{byte(1)})
+ node0.SetStatus(netmapSDK.Online)
+ node0.SetExternalAddresses("1", "0")
+ node0.SetNetworkEndpoints("1", "0")
+
+ node1 := netmapSDK.NodeInfo{}
+ node1.SetPublicKey([]byte{byte(1)}) // NOTE(review): same key as node0 — presumably a distinct key was intended; confirm
+ node1.SetStatus(netmapSDK.Online)
+ node1.SetExternalAddresses("1", "0")
+ node1.SetNetworkEndpoints("1", "0")
+
+ return []netmapSDK.NodeInfo{node0, node1}, nil
+}
+
+func (r *rawSrc) GetNetMapByEpoch(ctx context.Context, epoch uint64) (*netmapSDK.NetMap, error) { // fixed netmap: Maintenance, one address and one endpoint per node
+ nm := netmapSDK.NetMap{}
+ nm.SetEpoch(1) // NOTE(review): the epoch argument is ignored; harmless while the test only asks for epoch 1
+
+ node0 := netmapSDK.NodeInfo{}
+ node0.SetPublicKey([]byte{byte(1)})
+ node0.SetStatus(netmapSDK.Maintenance)
+ node0.SetExternalAddresses("0")
+ node0.SetNetworkEndpoints("0")
+
+ node1 := netmapSDK.NodeInfo{}
+ node1.SetPublicKey([]byte{byte(1)}) // NOTE(review): same key as node0 — presumably a distinct key was intended; confirm
+ node1.SetStatus(netmapSDK.Maintenance)
+ node1.SetExternalAddresses("0")
+ node1.SetNetworkEndpoints("0")
+
+ nm.SetNodes([]netmapSDK.NodeInfo{node0, node1})
+
+ return &nm, nil
+}
+
+type st struct{} // test stub passed as the netmap state; always reports epoch 1
+
+func (s *st) CurrentEpoch() uint64 {
+ return 1 // same epoch the test requests from the cache
+}
+
+func TestNetmapStorage(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	wg := sync.WaitGroup{}
+	cache := newCachedNetmapStorage(ctx, nil, &st{}, &rawSrc{}, &wg, time.Millisecond*50)
+	nm, err := cache.GetNetMapByEpoch(ctx, 1) // expected to run before the first 50ms candidates poll
+	require.NoError(t, err)
+	require.True(t, nm.Nodes()[0].Status() == netmapSDK.Maintenance)
+	require.True(t, len(nm.Nodes()[0].ExternalAddresses()) == 1)
+	require.True(t, nm.Nodes()[0].NumberOfNetworkEndpoints() == 1)
+
+	require.Eventually(t, func() bool {
+		nm, err := cache.GetNetMapByEpoch(ctx, 1)
+		if err != nil { // no require here: FailNow must only be called from the test goroutine
+			return false
+		}
+		for _, node := range nm.Nodes() {
+			if !(node.Status() == netmapSDK.Online && len(node.ExternalAddresses()) == 2 &&
+				node.NumberOfNetworkEndpoints() == 2) {
+				return false
+			}
+		}
+		return true
+	}, time.Second*5, time.Millisecond*10)
+	cancel() // stop the poller, then wait so its goroutine does not outlive the test
+	wg.Wait()
+}
diff --git a/cmd/frostfs-node/config/morph/config.go b/cmd/frostfs-node/config/morph/config.go
index d089870ea..a9f774d18 100644
--- a/cmd/frostfs-node/config/morph/config.go
+++ b/cmd/frostfs-node/config/morph/config.go
@@ -33,6 +33,9 @@ const (
// ContainerCacheSizeDefault represents the default size for the container cache.
ContainerCacheSizeDefault = 100
+
+ // PollCandidatesTimeoutDefault is the default interval between polls of the netmap candidates list.
+ PollCandidatesTimeoutDefault = 20 * time.Second
)
var errNoMorphEndpoints = errors.New("no morph chain RPC endpoints, see `morph.rpc_endpoint` section")
@@ -154,3 +157,17 @@ func FrostfsIDCacheSize(c *config.Config) uint32 {
}
return config.Uint32Safe(c.Sub(subsection), "frostfsid_cache_size")
}
+
+// NetmapCandidatesPollInterval reads the "netmap.candidates.poll_interval"
+// duration from the "morph" section of the config.
+//
+// A non-positive value yields PollCandidatesTimeoutDefault.
+func NetmapCandidatesPollInterval(c *config.Config) time.Duration {
+	candidates := c.Sub(subsection).Sub("netmap").Sub("candidates")
+	interval := config.DurationSafe(candidates, "poll_interval")
+	if interval <= 0 {
+		return PollCandidatesTimeoutDefault
+	}
+
+	return interval
+}
diff --git a/cmd/frostfs-node/morph.go b/cmd/frostfs-node/morph.go
index 657e22389..d3c0f7b81 100644
--- a/cmd/frostfs-node/morph.go
+++ b/cmd/frostfs-node/morph.go
@@ -60,10 +60,11 @@ func (c *cfg) initMorphComponents(ctx context.Context) {
}
if c.cfgMorph.cacheTTL < 0 {
- netmapSource = wrap
+ netmapSource = newRawNetmapStorage(wrap)
} else {
// use RPC node as source of netmap (with caching)
- netmapSource = newCachedNetmapStorage(c.cfgNetmap.state, wrap)
+ netmapSource = newCachedNetmapStorage(ctx, c.log, c.cfgNetmap.state, wrap, &c.wg,
+ morphconfig.NetmapCandidatesPollInterval(c.appCfg))
}
c.netMapSource = netmapSource
diff --git a/cmd/frostfs-node/netmap_source.go b/cmd/frostfs-node/netmap_source.go
new file mode 100644
index 000000000..e6be9cdf5
--- /dev/null
+++ b/cmd/frostfs-node/netmap_source.go
@@ -0,0 +1,55 @@
+package main
+
+import (
+ "context"
+
+ "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
+ netmapClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
+ netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
+)
+
+type rawNetmapSource struct { // uncached netmap source that queries the morph netmap client on every call
+ client *netmapClient.Client
+}
+
+// newRawNetmapStorage returns an uncached netmap.Source backed by client; the
+// netmaps it returns have the current candidates list merged in.
+func newRawNetmapStorage(client *netmapClient.Client) netmap.Source {
+	return &rawNetmapSource{client: client}
+}
+
+// GetNetMap reads the netmap from the client (diff is passed through) and merges the current candidates into it.
+func (s *rawNetmapSource) GetNetMap(ctx context.Context, diff uint64) (*netmapSDK.NetMap, error) {
+	snapshot, err := s.client.GetNetMap(ctx, diff)
+	if err != nil {
+		return nil, err
+	}
+	pending, err := s.client.GetCandidates(ctx)
+	if err != nil {
+		return nil, err
+	}
+	if patch := getNetMapNodesToUpdate(snapshot, pending); len(patch) > 0 {
+		mergeNetmapWithCandidates(patch, snapshot)
+	}
+	return snapshot, nil
+}
+
+// GetNetMapByEpoch reads the netmap of the given epoch from the client and merges the current candidates into it.
+func (s *rawNetmapSource) GetNetMapByEpoch(ctx context.Context, epoch uint64) (*netmapSDK.NetMap, error) {
+	snapshot, err := s.client.GetNetMapByEpoch(ctx, epoch)
+	if err != nil {
+		return nil, err
+	}
+	pending, err := s.client.GetCandidates(ctx)
+	if err != nil {
+		return nil, err
+	}
+	if patch := getNetMapNodesToUpdate(snapshot, pending); len(patch) > 0 {
+		mergeNetmapWithCandidates(patch, snapshot)
+	}
+	return snapshot, nil
+}
+
+func (s *rawNetmapSource) Epoch(ctx context.Context) (uint64, error) { // passes the client's answer through unchanged
+ return s.client.Epoch(ctx)
+}
diff --git a/config/example/node.yaml b/config/example/node.yaml
index a448ba7ce..0b6c7b12c 100644
--- a/config/example/node.yaml
+++ b/config/example/node.yaml
@@ -95,6 +95,9 @@ morph:
- address: wss://rpc2.morph.frostfs.info:40341/ws
priority: 2
ape_chain_cache_size: 100000
+ netmap:
+ candidates:
+ poll_interval: 20s
apiclient:
dial_timeout: 15s # timeout for FrostFS API client connection
diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md
index b5c8aadfe..5fe011ece 100644
--- a/docs/storage-node-configuration.md
+++ b/docs/storage-node-configuration.md
@@ -148,15 +148,19 @@ morph:
- address: wss://rpc2.morph.frostfs.info:40341/ws
priority: 2
switch_interval: 2m
+ netmap:
+ candidates:
+ poll_interval: 20s
```
-| Parameter | Type | Default value | Description |
-| ---------------------- | --------------------------------------------------------- | ---------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| `dial_timeout` | `duration` | `5s` | Timeout for dialing connections to N3 RPCs. |
-| `cache_ttl` | `duration` | Morph block time | Sidechain cache TTL value (min interval between similar calls).
Negative value disables caching.
Cached entities: containers, container lists, eACL tables. |
-| `rpc_endpoint` | list of [endpoint descriptions](#rpc_endpoint-subsection) | | Array of endpoint descriptions. |
-| `switch_interval` | `duration` | `2m` | Time interval between the attempts to connect to the highest priority RPC node if the connection is not established yet. |
-| `ape_chain_cache_size` | `int` | `10000` | Size of the morph cache for APE chains. |
+| Parameter | Type | Default value | Description |
+|-----------------------------------|-----------------------------------------------------------|------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `dial_timeout` | `duration` | `5s` | Timeout for dialing connections to N3 RPCs. |
+| `cache_ttl` | `duration` | Morph block time | Sidechain cache TTL value (min interval between similar calls).
Negative value disables caching.
Cached entities: containers, container lists, eACL tables. |
+| `rpc_endpoint` | list of [endpoint descriptions](#rpc_endpoint-subsection) | | Array of endpoint descriptions. |
+| `switch_interval` | `duration` | `2m` | Time interval between the attempts to connect to the highest priority RPC node if the connection is not established yet. |
+| `ape_chain_cache_size` | `int` | `10000` | Size of the morph cache for APE chains. |
+| `netmap.candidates.poll_interval` | `duration` | `20s` | Interval between polls of the netmap candidates list; after each poll the candidates are merged into the netmaps held in the local cache. |
## `rpc_endpoint` subsection
| Parameter | Type | Default value | Description |
diff --git a/internal/logs/logs.go b/internal/logs/logs.go
index 5b42b25ba..3a3ceb150 100644
--- a/internal/logs/logs.go
+++ b/internal/logs/logs.go
@@ -515,4 +515,5 @@ const (
FailedToGetNetmapToAdjustIOTag = "failed to get netmap to adjust IO tag"
FailedToValidateIncomingIOTag = "failed to validate incoming IO tag, replaced with `client`"
WriteCacheFailedToAcquireRPSQuota = "writecache failed to acquire RPS quota to flush object"
+ FailedToUpdateNetmapCandidates = "update netmap candidates failed"
)