package main

import (
	"bytes"
	"cmp"
	"context"
	"slices"
	"sync"
	"sync/atomic"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
	objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
	utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	"github.com/hashicorp/golang-lru/v2/expirable"
	"github.com/hashicorp/golang-lru/v2/simplelru"
	"go.uber.org/zap"
)

type netValueReader[K any, V any] func(ctx context.Context, cid K) (V, error)

type valueWithError[V any] struct {
	v V
	// cached error in order to not repeat a failed request for some time
	e error
}

// entity that provides a TTL cache interface.
type ttlNetCache[K comparable, V any] struct {
	cache     *expirable.LRU[K, *valueWithError[V]]
	netRdr    netValueReader[K, V]
	keyLocker *utilSync.KeyLocker[K]
	metrics   cacheMetrics
}

// wraps netValueReader with a TTL caching mechanism.
func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr netValueReader[K, V], metrics cacheMetrics) *ttlNetCache[K, V] {
	cache := expirable.NewLRU[K, *valueWithError[V]](sz, nil, ttl)

	return &ttlNetCache[K, V]{
		cache:     cache,
		netRdr:    netRdr,
		metrics:   metrics,
		keyLocker: utilSync.NewKeyLocker[K](),
	}
}

// reads value by the key.
//
// updates the value from the network on cache miss or by TTL.
//
// returned value should not be modified.
func (c *ttlNetCache[K, V]) get(ctx context.Context, key K) (V, error) {
	hit := false
	startedAt := time.Now()
	defer func() {
		c.metrics.AddMethodDuration("Get", time.Since(startedAt), hit)
	}()

	val, ok := c.cache.Peek(key)
	if ok {
		hit = true
		return val.v, val.e
	}

	c.keyLocker.Lock(key)
	defer c.keyLocker.Unlock(key)

	val, ok = c.cache.Peek(key)
	if ok {
		hit = true
		return val.v, val.e
	}

	v, err := c.netRdr(ctx, key)

	c.cache.Add(key, &valueWithError[V]{
		v: v,
		e: err,
	})

	return v, err
}

func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
	startedAt := time.Now()
	defer func() {
		c.metrics.AddMethodDuration("Set", time.Since(startedAt), false)
	}()

	c.keyLocker.Lock(k)
	defer c.keyLocker.Unlock(k)

	c.cache.Add(k, &valueWithError[V]{
		v: v,
		e: e,
	})
}

func (c *ttlNetCache[K, V]) remove(key K) {
	hit := false
	startedAt := time.Now()
	defer func() {
		c.metrics.AddMethodDuration("Remove", time.Since(startedAt), hit)
	}()

	c.keyLocker.Lock(key)
	defer c.keyLocker.Unlock(key)

	hit = c.cache.Remove(key)
}
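// The sketch below is illustrative only and is not referenced by the node: it shows how
// the generic newNetworkTTLCache can be instantiated for an arbitrary key/value pair.
// The key type, value type, size, TTL and metrics label are assumptions chosen for the
// example, not production values.
func exampleKeyLengthTTLCache() *ttlNetCache[string, int] {
	return newNetworkTTLCache(16, time.Minute,
		func(_ context.Context, key string) (int, error) {
			// A stand-in for a real network read; here the "remote" value is just the key length.
			return len(key), nil
		},
		metrics.NewCacheMetrics("example"),
	)
}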
// wrapper over TTL cache of values read from the network
// that implements container storage.
type ttlContainerStorage struct {
	containerCache *ttlNetCache[cid.ID, *container.Container]
	delInfoCache   *ttlNetCache[cid.ID, *container.DelInfo]
}

// newCachedContainerStorage returns container storage that caches Get and DeletionInfo
// results for the given TTL.
func newCachedContainerStorage(v container.Source, ttl time.Duration, containerCacheSize uint32) ttlContainerStorage {
	lruCnrCache := newNetworkTTLCache(int(containerCacheSize), ttl, func(ctx context.Context, id cid.ID) (*container.Container, error) {
		return v.Get(ctx, id)
	}, metrics.NewCacheMetrics("container"))
	lruDelInfoCache := newNetworkTTLCache(int(containerCacheSize), ttl, func(ctx context.Context, id cid.ID) (*container.DelInfo, error) {
		return v.DeletionInfo(ctx, id)
	}, metrics.NewCacheMetrics("container_deletion_info"))

	return ttlContainerStorage{
		containerCache: lruCnrCache,
		delInfoCache:   lruDelInfoCache,
	}
}

// handleRemoval caches the fact that the container has been removed.
func (s ttlContainerStorage) handleRemoval(cnr cid.ID) {
	s.containerCache.set(cnr, nil, new(apistatus.ContainerNotFound))

	// The removal invalidates possibly stored error response.
	s.delInfoCache.remove(cnr)
}

// Get returns container value from the cache. If value is missing in the cache
// or expired, then it returns value from side chain and updates the cache.
func (s ttlContainerStorage) Get(ctx context.Context, cnr cid.ID) (*container.Container, error) {
	return s.containerCache.get(ctx, cnr)
}

// DeletionInfo returns deletion info from the cache. If the value is missing in the cache
// or expired, then it returns the value from the side chain and updates the cache.
func (s ttlContainerStorage) DeletionInfo(ctx context.Context, cnr cid.ID) (*container.DelInfo, error) {
	return s.delInfoCache.get(ctx, cnr)
}

type lruNetmapSource struct {
	netState netmap.State
	client   rawSource

	cache      *simplelru.LRU[uint64, *atomic.Pointer[netmapSDK.NetMap]]
	mtx        sync.RWMutex
	metrics    cacheMetrics
	log        *logger.Logger
	candidates atomic.Pointer[[]netmapSDK.NodeInfo]
}

type rawSource interface {
	GetCandidates(ctx context.Context) ([]netmapSDK.NodeInfo, error)
	GetNetMapByEpoch(ctx context.Context, epoch uint64) (*netmapSDK.NetMap, error)
}

// newCachedNetmapStorage returns a netmap.Source that caches netmaps by epoch and keeps
// the cached netmaps up to date with the candidates list.
func newCachedNetmapStorage(ctx context.Context, log *logger.Logger,
	netState netmap.State, client rawSource, wg *sync.WaitGroup,
	d time.Duration,
) netmap.Source {
	const netmapCacheSize = 10

	cache, err := simplelru.NewLRU[uint64, *atomic.Pointer[netmapSDK.NetMap]](netmapCacheSize, nil)
	fatalOnErr(err)

	src := &lruNetmapSource{
		netState: netState,
		client:   client,
		cache:    cache,
		log:      log,
		metrics:  metrics.NewCacheMetrics("netmap"),
	}

	wg.Add(1)
	go func() {
		defer wg.Done()
		src.updateCandidates(ctx, d)
	}()

	return src
}
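// staticRawSource is a minimal sketch of a rawSource implementation (e.g. for unit tests);
// the type and its behaviour are assumptions made for illustration, the node itself wires
// its morph netmap client here instead.
type staticRawSource struct {
	candidates []netmapSDK.NodeInfo
	netmaps    map[uint64]*netmapSDK.NetMap
}

func (s *staticRawSource) GetCandidates(context.Context) ([]netmapSDK.NodeInfo, error) {
	return s.candidates, nil
}

func (s *staticRawSource) GetNetMapByEpoch(_ context.Context, epoch uint64) (*netmapSDK.NetMap, error) {
	// A real source reports an error for unknown epochs; the sketch simply returns nil.
	return s.netmaps[epoch], nil
}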
// updateCandidates is a routine that periodically merges the cached netmaps with the candidates list.
func (s *lruNetmapSource) updateCandidates(ctx context.Context, d time.Duration) {
	timer := time.NewTimer(d)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			newCandidates, err := s.client.GetCandidates(ctx)
			if err != nil {
				s.log.Debug(ctx, logs.FailedToUpdateNetmapCandidates, zap.Error(err))
				timer.Reset(d)
				break
			}
			if len(newCandidates) == 0 {
				s.candidates.Store(&newCandidates)
				timer.Reset(d)
				break
			}
			slices.SortFunc(newCandidates, func(n1 netmapSDK.NodeInfo, n2 netmapSDK.NodeInfo) int {
				return cmp.Compare(n1.Hash(), n2.Hash())
			})

			// Check whether the stored candidates state has changed.
			v := s.candidates.Load()
			if v == nil {
				s.candidates.Store(&newCandidates)
				s.mergeCacheWithCandidates(newCandidates)
				timer.Reset(d)
				break
			}
			ret := slices.CompareFunc(*v, newCandidates, func(n1 netmapSDK.NodeInfo, n2 netmapSDK.NodeInfo) int {
				if !bytes.Equal(n1.PublicKey(), n2.PublicKey()) ||
					uint32(n1.Status()) != uint32(n2.Status()) ||
					slices.Compare(n1.ExternalAddresses(), n2.ExternalAddresses()) != 0 {
					return 1
				}
				var ne1 []string
				n1.IterateNetworkEndpoints(func(s string) bool {
					ne1 = append(ne1, s)
					return false
				})
				var ne2 []string
				n2.IterateNetworkEndpoints(func(s string) bool {
					ne2 = append(ne2, s)
					return false
				})
				return slices.Compare(ne1, ne2)
			})
			if ret != 0 {
				s.candidates.Store(&newCandidates)
				s.mergeCacheWithCandidates(newCandidates)
			}
			timer.Reset(d)
		}
	}
}

// mergeCacheWithCandidates applies the candidates' state to every netmap currently stored in the cache.
func (s *lruNetmapSource) mergeCacheWithCandidates(candidates []netmapSDK.NodeInfo) {
	s.mtx.Lock()
	tmp := s.cache.Values()
	s.mtx.Unlock()
	for _, pointer := range tmp {
		nm := pointer.Load()
		updates := getNetMapNodesToUpdate(nm, candidates)
		if len(updates) > 0 {
			nm = nm.Clone()
			mergeNetmapWithCandidates(updates, nm)
			pointer.Store(nm)
		}
	}
}

// reads value by the key.
//
// updates the value from the network on cache miss.
//
// returned value should not be modified.
func (s *lruNetmapSource) get(ctx context.Context, key uint64) (*netmapSDK.NetMap, error) {
	hit := false
	startedAt := time.Now()
	defer func() {
		s.metrics.AddMethodDuration("Get", time.Since(startedAt), hit)
	}()

	s.mtx.RLock()
	val, ok := s.cache.Get(key)
	s.mtx.RUnlock()
	if ok {
		hit = true
		return val.Load(), nil
	}

	s.mtx.Lock()
	defer s.mtx.Unlock()

	val, ok = s.cache.Get(key)
	if ok {
		hit = true
		return val.Load(), nil
	}

	nm, err := s.client.GetNetMapByEpoch(ctx, key)
	if err != nil {
		return nil, err
	}
	v := s.candidates.Load()
	if v != nil {
		updates := getNetMapNodesToUpdate(nm, *v)
		if len(updates) > 0 {
			mergeNetmapWithCandidates(updates, nm)
		}
	}

	p := atomic.Pointer[netmapSDK.NetMap]{}
	p.Store(nm)
	s.cache.Add(key, &p)

	return nm, nil
}

// mergeNetmapWithCandidates updates nodes state in the provided netmap with state in the list of candidates.
func mergeNetmapWithCandidates(updates []nodeToUpdate, nm *netmapSDK.NetMap) {
	for _, v := range updates {
		if v.status != netmapSDK.UnspecifiedState {
			nm.Nodes()[v.netmapIndex].SetStatus(v.status)
		}
		if v.externalAddresses != nil {
			nm.Nodes()[v.netmapIndex].SetExternalAddresses(v.externalAddresses...)
		}
		if v.endpoints != nil {
			nm.Nodes()[v.netmapIndex].SetNetworkEndpoints(v.endpoints...)
		}
	}
}

type nodeToUpdate struct {
	netmapIndex       int
	status            netmapSDK.NodeState
	externalAddresses []string
	endpoints         []string
}
// getNetMapNodesToUpdate checks for changes between the provided netmap and the list of candidates.
func getNetMapNodesToUpdate(nm *netmapSDK.NetMap, candidates []netmapSDK.NodeInfo) []nodeToUpdate {
	var res []nodeToUpdate
	for i := range nm.Nodes() {
		for _, cnd := range candidates {
			if bytes.Equal(nm.Nodes()[i].PublicKey(), cnd.PublicKey()) {
				var tmp nodeToUpdate
				var update bool

				if cnd.Status() != nm.Nodes()[i].Status() &&
					(cnd.Status() == netmapSDK.Online || cnd.Status() == netmapSDK.Maintenance) {
					update = true
					tmp.status = cnd.Status()
				}

				externalAddresses := cnd.ExternalAddresses()
				if externalAddresses != nil &&
					slices.Compare(externalAddresses, nm.Nodes()[i].ExternalAddresses()) != 0 {
					update = true
					tmp.externalAddresses = externalAddresses
				}

				nodeEndpoints := make([]string, 0, nm.Nodes()[i].NumberOfNetworkEndpoints())
				nm.Nodes()[i].IterateNetworkEndpoints(func(s string) bool {
					nodeEndpoints = append(nodeEndpoints, s)
					return false
				})
				candidateEndpoints := make([]string, 0, cnd.NumberOfNetworkEndpoints())
				cnd.IterateNetworkEndpoints(func(s string) bool {
					candidateEndpoints = append(candidateEndpoints, s)
					return false
				})
				if slices.Compare(nodeEndpoints, candidateEndpoints) != 0 {
					update = true
					tmp.endpoints = candidateEndpoints
				}

				if update {
					tmp.netmapIndex = i
					res = append(res, tmp)
				}

				break
			}
		}
	}
	return res
}

func (s *lruNetmapSource) GetNetMap(ctx context.Context, diff uint64) (*netmapSDK.NetMap, error) {
	return s.getNetMapByEpoch(ctx, s.netState.CurrentEpoch()-diff)
}

func (s *lruNetmapSource) GetNetMapByEpoch(ctx context.Context, epoch uint64) (*netmapSDK.NetMap, error) {
	return s.getNetMapByEpoch(ctx, epoch)
}

func (s *lruNetmapSource) getNetMapByEpoch(ctx context.Context, epoch uint64) (*netmapSDK.NetMap, error) {
	val, err := s.get(ctx, epoch)
	if err != nil {
		return nil, err
	}

	return val, nil
}

func (s *lruNetmapSource) Epoch(_ context.Context) (uint64, error) {
	return s.netState.CurrentEpoch(), nil
}

type cachedIRFetcher struct {
	*ttlNetCache[struct{}, [][]byte]
}

func newCachedIRFetcher(f interface {
	InnerRingKeys(ctx context.Context) ([][]byte, error)
},
) cachedIRFetcher {
	const (
		irFetcherCacheSize = 1 // we intend to store only one value

		// Without the cache in the testnet we can see several hundred simultaneous
		// requests (frostfs-node #1278), so limiting the request rate solves the issue.
		//
		// The exact request rate doesn't really matter because Inner Ring list updates
		// happen extremely rarely, but there are no side chain events for that for now
		// (frostfs-contract v0.15.0 notary disabled env) to monitor it.
		irFetcherCacheTTL = 30 * time.Second
	)

	irFetcherCache := newNetworkTTLCache(irFetcherCacheSize, irFetcherCacheTTL,
		func(ctx context.Context, _ struct{}) ([][]byte, error) {
			return f.InnerRingKeys(ctx)
		}, metrics.NewCacheMetrics("ir_keys"),
	)

	return cachedIRFetcher{irFetcherCache}
}

// InnerRingKeys returns cached list of Inner Ring keys. If keys are missing in
// the cache or expired, then it returns keys from side chain and updates
// the cache.
func (f cachedIRFetcher) InnerRingKeys(ctx context.Context) ([][]byte, error) {
	val, err := f.get(ctx, struct{}{})
	if err != nil {
		return nil, err
	}

	return val, nil
}

type ttlMaxObjectSizeCache struct {
	mtx         sync.RWMutex
	lastUpdated time.Time
	lastSize    uint64
	src         objectwriter.MaxSizeSource
	metrics     cacheMetrics
}

func newCachedMaxObjectSizeSource(src objectwriter.MaxSizeSource) objectwriter.MaxSizeSource {
	return &ttlMaxObjectSizeCache{
		src:     src,
		metrics: metrics.NewCacheMetrics("max_object_size"),
	}
}

// MaxObjectSize returns the cached maximum object size, refreshing it from the source
// if the cached value is older than 30 seconds.
func (c *ttlMaxObjectSizeCache) MaxObjectSize(ctx context.Context) uint64 {
	const ttl = time.Second * 30

	hit := false
	startedAt := time.Now()
	defer func() {
		c.metrics.AddMethodDuration("Get", time.Since(startedAt), hit)
	}()

	c.mtx.RLock()
	prevUpdated := c.lastUpdated
	size := c.lastSize
	c.mtx.RUnlock()

	if time.Since(prevUpdated) < ttl {
		hit = true
		return size
	}

	c.mtx.Lock()
	size = c.lastSize
	// Another goroutine may have refreshed the value while this one was waiting for the lock.
	if !c.lastUpdated.After(prevUpdated) {
		size = c.src.MaxObjectSize(ctx)
		c.lastSize = size
		c.lastUpdated = time.Now()
	}
	c.mtx.Unlock()

	return size
}

type cacheMetrics interface {
	AddMethodDuration(method string, d time.Duration, hit bool)
}
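// noopCacheMetrics is a minimal sketch of a cacheMetrics implementation; the node itself
// uses metrics.NewCacheMetrics, this stub only illustrates the interface contract and is
// an assumption, e.g. for constructing the caches above in tests.
type noopCacheMetrics struct{}

func (noopCacheMetrics) AddMethodDuration(string, time.Duration, bool) {}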