package main

import (
	"sync"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
	putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
	utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	lru "github.com/hashicorp/golang-lru/v2"
	"github.com/hashicorp/golang-lru/v2/expirable"
)

type netValueReader[K any, V any] func(K) (V, error)

type valueWithError[V any] struct {
	v V
	// cached error in order to not repeat failed request for some time
	e error
}

// ttlNetCache is an entity that provides a TTL cache interface.
type ttlNetCache[K comparable, V any] struct {
	cache  *expirable.LRU[K, *valueWithError[V]]
	netRdr netValueReader[K, V]

	keyLocker *utilSync.KeyLocker[K]
}

// newNetworkTTLCache wraps netValueReader with a TTL caching mechanism.
func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr netValueReader[K, V]) *ttlNetCache[K, V] {
	cache := expirable.NewLRU[K, *valueWithError[V]](sz, nil, ttl)

	return &ttlNetCache[K, V]{
		cache:     cache,
		netRdr:    netRdr,
		keyLocker: utilSync.NewKeyLocker[K](),
	}
}

// get reads the value by key.
//
// Updates the value from the network on a cache miss or expiry.
//
// The returned value should not be modified.
func (c *ttlNetCache[K, V]) get(key K) (V, error) {
	val, ok := c.cache.Peek(key)
	if ok {
		return val.v, val.e
	}

	c.keyLocker.Lock(key)
	defer c.keyLocker.Unlock(key)

	val, ok = c.cache.Peek(key)
	if ok {
		return val.v, val.e
	}

	v, err := c.netRdr(key)

	c.cache.Add(key, &valueWithError[V]{
		v: v,
		e: err,
	})

	return v, err
}

func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
	c.keyLocker.Lock(k)
	defer c.keyLocker.Unlock(k)

	c.cache.Add(k, &valueWithError[V]{
		v: v,
		e: e,
	})
}

func (c *ttlNetCache[K, V]) remove(key K) {
	c.keyLocker.Lock(key)
	defer c.keyLocker.Unlock(key)

	c.cache.Remove(key)
}

// lruNetCache is an entity that provides an LRU cache interface.
type lruNetCache struct {
	cache *lru.Cache[uint64, *netmapSDK.NetMap]

	netRdr netValueReader[uint64, *netmapSDK.NetMap]
}

// newNetworkLRUCache returns a wrapper over netValueReader with an LRU cache.
func newNetworkLRUCache(sz int, netRdr netValueReader[uint64, *netmapSDK.NetMap]) *lruNetCache {
	cache, err := lru.New[uint64, *netmapSDK.NetMap](sz)
	fatalOnErr(err)

	return &lruNetCache{
		cache:  cache,
		netRdr: netRdr,
	}
}

// get reads the value by key.
//
// Updates the value from the network on a cache miss.
//
// The returned value should not be modified.
func (c *lruNetCache) get(key uint64) (*netmapSDK.NetMap, error) {
	val, ok := c.cache.Get(key)
	if ok {
		return val, nil
	}

	val, err := c.netRdr(key)
	if err != nil {
		return nil, err
	}

	c.cache.Add(key, val)

	return val, nil
}
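// Illustrative sketch, not part of the original file: negative caching in
// ttlNetCache. Both the value and the error of a read are stored, so a failed
// request is not retried until the entry expires; lruNetCache above, by
// contrast, never stores errors. The string-keyed reader below is a
// hypothetical stand-in for a network call, and it reuses an error type from
// the existing imports purely for illustration.
func exampleTTLNetCacheNegativeCaching() {
	var calls int
	c := newNetworkTTLCache(16, time.Minute, func(key string) (string, error) {
		calls++ // counts reader invocations; stays at 1 below
		return "", new(apistatus.ContainerNotFound)
	})

	_, err1 := c.get("cnr") // miss: the reader runs and its error is cached
	_, err2 := c.get("cnr") // hit: the cached error is returned without a network call
	_ = err1
	_ = err2
}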
// ttlContainerStorage is a wrapper over a TTL cache of values read from the
// network that implements the container storage interface.
type ttlContainerStorage struct {
	containerCache *ttlNetCache[cid.ID, *container.Container]
	delInfoCache   *ttlNetCache[cid.ID, *container.DelInfo]
}

func newCachedContainerStorage(v container.Source, ttl time.Duration) ttlContainerStorage {
	const containerCacheSize = 100

	lruCnrCache := newNetworkTTLCache(containerCacheSize, ttl, func(id cid.ID) (*container.Container, error) {
		return v.Get(id)
	})
	lruDelInfoCache := newNetworkTTLCache(containerCacheSize, ttl, func(id cid.ID) (*container.DelInfo, error) {
		return v.DeletionInfo(id)
	})

	return ttlContainerStorage{
		containerCache: lruCnrCache,
		delInfoCache:   lruDelInfoCache,
	}
}

func (s ttlContainerStorage) handleRemoval(cnr cid.ID) {
	s.containerCache.set(cnr, nil, new(apistatus.ContainerNotFound))

	// The removal invalidates a possibly cached error response.
	s.delInfoCache.remove(cnr)
}

// Get returns the container value from the cache. If the value is missing from
// the cache or expired, it reads the value from the side chain and updates the cache.
func (s ttlContainerStorage) Get(cnr cid.ID) (*container.Container, error) {
	return s.containerCache.get(cnr)
}

func (s ttlContainerStorage) DeletionInfo(cnr cid.ID) (*container.DelInfo, error) {
	return s.delInfoCache.get(cnr)
}

type ttlEACLStorage struct {
	*ttlNetCache[cid.ID, *container.EACL]
}

func newCachedEACLStorage(v container.EACLSource, ttl time.Duration) ttlEACLStorage {
	const eaclCacheSize = 100

	lruCnrCache := newNetworkTTLCache(eaclCacheSize, ttl, func(id cid.ID) (*container.EACL, error) {
		return v.GetEACL(id)
	})

	return ttlEACLStorage{lruCnrCache}
}

// GetEACL returns the eACL value from the cache. If the value is missing from
// the cache or expired, it reads the value from the side chain and updates the cache.
func (s ttlEACLStorage) GetEACL(cnr cid.ID) (*container.EACL, error) {
	return s.get(cnr)
}

// InvalidateEACL removes the cached eACL value.
func (s ttlEACLStorage) InvalidateEACL(cnr cid.ID) {
	s.remove(cnr)
}

type lruNetmapSource struct {
	netState netmap.State

	cache *lruNetCache
}

func newCachedNetmapStorage(s netmap.State, v netmap.Source) netmap.Source {
	const netmapCacheSize = 10

	lruNetmapCache := newNetworkLRUCache(netmapCacheSize, func(key uint64) (*netmapSDK.NetMap, error) {
		return v.GetNetMapByEpoch(key)
	})

	return &lruNetmapSource{
		netState: s,
		cache:    lruNetmapCache,
	}
}

func (s *lruNetmapSource) GetNetMap(diff uint64) (*netmapSDK.NetMap, error) {
	return s.getNetMapByEpoch(s.netState.CurrentEpoch() - diff)
}

func (s *lruNetmapSource) GetNetMapByEpoch(epoch uint64) (*netmapSDK.NetMap, error) {
	return s.getNetMapByEpoch(epoch)
}

func (s *lruNetmapSource) getNetMapByEpoch(epoch uint64) (*netmapSDK.NetMap, error) {
	val, err := s.cache.get(epoch)
	if err != nil {
		return nil, err
	}

	return val, nil
}

func (s *lruNetmapSource) Epoch() (uint64, error) {
	return s.netState.CurrentEpoch(), nil
}
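// Illustrative sketch, not part of the original file: the epoch-keyed LRU
// cache that backs lruNetmapSource, used in isolation. Because lruNetCache
// caches successful reads only, a failed epoch lookup is retried on the next
// call, unlike ttlNetCache, which also caches errors. The reader below is a
// hypothetical stand-in for a side chain request.
func exampleNetmapLRUCache() {
	c := newNetworkLRUCache(10, func(epoch uint64) (*netmapSDK.NetMap, error) {
		// A real reader would fetch the network map snapshot for the epoch.
		return new(netmapSDK.NetMap), nil
	})

	nm, err := c.get(42) // miss: invokes the reader and caches the result
	if err != nil {
		return
	}
	_ = nm

	nm, _ = c.get(42) // hit: served from the LRU without a network read
	_ = nm
}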
type cachedIRFetcher struct {
	*ttlNetCache[struct{}, [][]byte]
}

func newCachedIRFetcher(f interface{ InnerRingKeys() ([][]byte, error) }) cachedIRFetcher {
	const (
		irFetcherCacheSize = 1 // we intend to store only one value

		// Without the cache in the testnet we can see several hundred simultaneous
		// requests (frostfs-node #1278), so limiting the request rate solves the issue.
		//
		// The exact request rate doesn't really matter because Inner Ring list updates
		// happen extremely rarely, but there are no side chain events for them as of
		// now (frostfs-contract v0.15.0 notary disabled env) to monitor.
		irFetcherCacheTTL = 30 * time.Second
	)

	irFetcherCache := newNetworkTTLCache(irFetcherCacheSize, irFetcherCacheTTL,
		func(_ struct{}) ([][]byte, error) {
			return f.InnerRingKeys()
		},
	)

	return cachedIRFetcher{irFetcherCache}
}

// InnerRingKeys returns the cached list of Inner Ring keys. If the keys are
// missing from the cache or expired, it reads them from the side chain and
// updates the cache.
func (f cachedIRFetcher) InnerRingKeys() ([][]byte, error) {
	val, err := f.get(struct{}{})
	if err != nil {
		return nil, err
	}

	return val, nil
}

type ttlMaxObjectSizeCache struct {
	mtx         sync.RWMutex
	lastUpdated time.Time
	lastSize    uint64
	src         putsvc.MaxSizeSource
}

func newCachedMaxObjectSizeSource(src putsvc.MaxSizeSource) putsvc.MaxSizeSource {
	return &ttlMaxObjectSizeCache{
		src: src,
	}
}

func (c *ttlMaxObjectSizeCache) MaxObjectSize() uint64 {
	const ttl = time.Second * 30

	c.mtx.RLock()
	prevUpdated := c.lastUpdated
	size := c.lastSize
	c.mtx.RUnlock()

	if time.Since(prevUpdated) < ttl {
		return size
	}

	c.mtx.Lock()
	size = c.lastSize
	if !c.lastUpdated.After(prevUpdated) {
		size = c.src.MaxObjectSize()
		c.lastSize = size
		c.lastUpdated = time.Now()
	}
	c.mtx.Unlock()

	return size
}
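// Illustrative sketch, not part of the original file: the refresh pattern of
// ttlMaxObjectSizeCache in isolation. countingSizeSource is a hypothetical
// stub; it assumes putsvc.MaxSizeSource requires only MaxObjectSize(), and it
// counts invocations to show that readers within the TTL share one cached
// value instead of each querying the chain.
type countingSizeSource struct {
	mtx   sync.Mutex
	calls int
}

func (s *countingSizeSource) MaxObjectSize() uint64 {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	s.calls++
	return 64 << 20 // pretend the network reports a 64 MiB limit
}

func exampleMaxObjectSizeCache() {
	src := &countingSizeSource{}
	cached := newCachedMaxObjectSizeSource(src)

	_ = cached.MaxObjectSize() // first call queries the source
	_ = cached.MaxObjectSize() // within the 30 s TTL: served from the cache
	// src.calls is 1 here: the second call never reached the source.
}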