frostfs-node/cmd/frostfs-node/cache.go

351 lines
8.9 KiB
Go
Raw Normal View History

package main
import (
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/hashicorp/golang-lru/v2/expirable"
)
// netValueReader is a function that reads the value of type V that corresponds
// to the key of type K. The caches in this file call it to load a value that
// they do not hold.
type netValueReader[K any, V any] func(K) (V, error)
// valueWithError is a cache entry: a value together with the error that
// accompanied it, so that both are stored and returned as a pair.
type valueWithError[V any] struct {
// stored value; returned to the caller along with e
v V
// cached error in order to not repeat failed request for some time
e error
}
// ttlNetCache is an entity that provides TTL cache interface over
// a netValueReader.
type ttlNetCache[K comparable, V any] struct {
// expirable LRU that holds the value/error pairs
cache *expirable.LRU[K, *valueWithError[V]]
// reader that is called for a key missing in the cache
netRdr netValueReader[K, V]
// locked per key around loading, setting and removing an entry
keyLocker *utilSync.KeyLocker[K]
// receives the duration and the hit flag of each cache operation
metrics cacheMetrics
}
// newNetworkTTLCache wraps netRdr with a TTL caching mechanism.
//
// Entries are kept in an expirable LRU created with size sz and lifetime ttl;
// durations of cache operations are reported to metrics.
func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr netValueReader[K, V], metrics cacheMetrics) *ttlNetCache[K, V] {
	c := new(ttlNetCache[K, V])
	c.cache = expirable.NewLRU[K, *valueWithError[V]](sz, nil, ttl)
	c.netRdr = netRdr
	c.metrics = metrics
	c.keyLocker = utilSync.NewKeyLocker[K]()
	return c
}
// get returns the value stored under key.
//
// When the cache has no entry for key, the value is read with netRdr and the
// result, including a possible error, is put into the cache before being
// returned. The operation duration and the hit flag are reported as "Get".
//
// The returned value should not be modified.
func (c *ttlNetCache[K, V]) get(key K) (V, error) {
	var (
		begin  = time.Now()
		cached bool
	)
	defer func() {
		c.metrics.AddMethodDuration("Get", time.Since(begin), cached)
	}()

	// First attempt is made without taking the per-key lock.
	if entry, found := c.cache.Peek(key); found {
		cached = true
		return entry.v, entry.e
	}

	c.keyLocker.Lock(key)
	defer c.keyLocker.Unlock(key)

	// The entry may have been stored while this call was waiting for the lock.
	if entry, found := c.cache.Peek(key); found {
		cached = true
		return entry.v, entry.e
	}

	// The error is stored as well, so a failed read is not repeated until the
	// entry leaves the cache.
	loaded := &valueWithError[V]{}
	loaded.v, loaded.e = c.netRdr(key)
	c.cache.Add(key, loaded)

	return loaded.v, loaded.e
}
// set stores the pair (v, e) under key k while holding the per-key lock,
// replacing whatever the cache held for k. The operation duration is reported
// as "Set" and is never counted as a hit.
func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
	begin := time.Now()
	defer func() {
		c.metrics.AddMethodDuration("Set", time.Since(begin), false)
	}()

	entry := &valueWithError[V]{v: v, e: e}

	c.keyLocker.Lock(k)
	defer c.keyLocker.Unlock(k)

	c.cache.Add(k, entry)
}
// remove deletes the entry stored under key while holding the per-key lock.
// The operation duration is reported as "Remove"; the hit flag is the result
// returned by the underlying cache's Remove.
func (c *ttlNetCache[K, V]) remove(key K) {
	var removed bool
	begin := time.Now()
	defer func() {
		c.metrics.AddMethodDuration("Remove", time.Since(begin), removed)
	}()

	c.keyLocker.Lock(key)
	defer c.keyLocker.Unlock(key)

	removed = c.cache.Remove(key)
}
// lruNetCache is an entity that provides LRU cache interface for network maps
// keyed by uint64 (the only user in this file passes epoch numbers).
type lruNetCache struct {
// LRU storage of network maps
cache *lru.Cache[uint64, *netmapSDK.NetMap]
// reader that is called for a key missing in the cache
netRdr netValueReader[uint64, *netmapSDK.NetMap]
// receives the duration and the hit flag of each cache operation
metrics cacheMetrics
}
// newNetworkLRUCache returns a wrapper over netRdr that keeps read values in an
// LRU cache of size sz and reports operation durations to metrics.
//
// The error of the LRU constructor is handed to fatalOnErr.
func newNetworkLRUCache(sz int, netRdr netValueReader[uint64, *netmapSDK.NetMap], metrics cacheMetrics) *lruNetCache {
	store, err := lru.New[uint64, *netmapSDK.NetMap](sz)
	fatalOnErr(err)

	res := new(lruNetCache)
	res.cache = store
	res.netRdr = netRdr
	res.metrics = metrics
	return res
}
// get returns the network map stored under key.
//
// On cache miss the value is read with netRdr; a successfully read value is
// added to the cache, while a read error is returned as is and nothing is
// cached. The operation duration and the hit flag are reported as "Get".
//
// The returned value should not be modified.
func (c *lruNetCache) get(key uint64) (*netmapSDK.NetMap, error) {
	var (
		begin  = time.Now()
		cached bool
	)
	defer func() {
		c.metrics.AddMethodDuration("Get", time.Since(begin), cached)
	}()

	if nm, found := c.cache.Get(key); found {
		cached = true
		return nm, nil
	}

	nm, err := c.netRdr(key)
	if err != nil {
		return nil, err
	}

	c.cache.Add(key, nm)

	return nm, nil
}
// ttlContainerStorage is a wrapper over TTL caches of values read from the
// network that implements container storage.
type ttlContainerStorage struct {
// containers by container ID
containerCache *ttlNetCache[cid.ID, *container.Container]
// container deletion info by container ID
delInfoCache *ttlNetCache[cid.ID, *container.DelInfo]
}
// newCachedContainerStorage builds a container storage that reads containers
// and container deletion info from v through two TTL caches. Both caches share
// the same lifetime ttl and the same size containerCacheSize.
func newCachedContainerStorage(v container.Source, ttl time.Duration, containerCacheSize uint32) ttlContainerStorage {
	size := int(containerCacheSize)

	readContainer := func(id cid.ID) (*container.Container, error) {
		return v.Get(id)
	}
	readDelInfo := func(id cid.ID) (*container.DelInfo, error) {
		return v.DeletionInfo(id)
	}

	return ttlContainerStorage{
		containerCache: newNetworkTTLCache(size, ttl, readContainer, metrics.NewCacheMetrics("container")),
		delInfoCache:   newNetworkTTLCache(size, ttl, readDelInfo, metrics.NewCacheMetrics("container_deletion_info")),
	}
}
// handleRemoval updates the caches after the container cnr has been removed:
// the container cache gets a "container not found" entry for cnr, and the
// deletion info entry of cnr is dropped.
func (s ttlContainerStorage) handleRemoval(cnr cid.ID) {
	var notFound error = new(apistatus.ContainerNotFound)
	s.containerCache.set(cnr, nil, notFound)

	// Dropping the entry invalidates an error response that may have been
	// cached for the deletion info.
	s.delInfoCache.remove(cnr)
}
// Get returns the container cnr from the cache. When the cache holds no entry
// for cnr (it is missing or expired), the value is read from the underlying
// source and the cache is updated.
func (s ttlContainerStorage) Get(cnr cid.ID) (*container.Container, error) {
	cnrValue, err := s.containerCache.get(cnr)
	return cnrValue, err
}
// DeletionInfo returns the deletion info of the container cnr from the cache.
// When the cache holds no entry for cnr, the value is read from the underlying
// source and the cache is updated.
func (s ttlContainerStorage) DeletionInfo(cnr cid.ID) (*container.DelInfo, error) {
	info, err := s.delInfoCache.get(cnr)
	return info, err
}
// ttlEACLStorage is a TTL cache of eACL tables keyed by container ID.
type ttlEACLStorage struct {
// embedded cache; its get/remove methods back GetEACL/InvalidateEACL
*ttlNetCache[cid.ID, *container.EACL]
}
// newCachedEACLStorage builds an eACL storage that reads eACL tables from v
// through a TTL cache of 100 entries with lifetime ttl.
func newCachedEACLStorage(v container.EACLSource, ttl time.Duration) ttlEACLStorage {
	const eaclCacheSize = 100

	readEACL := func(id cid.ID) (*container.EACL, error) {
		return v.GetEACL(id)
	}

	return ttlEACLStorage{
		ttlNetCache: newNetworkTTLCache(eaclCacheSize, ttl, readEACL, metrics.NewCacheMetrics("eacl")),
	}
}
// GetEACL returns the eACL of the container cnr from the cache. When the cache
// holds no entry for cnr (it is missing or expired), the value is read from
// the underlying source and the cache is updated.
func (s ttlEACLStorage) GetEACL(cnr cid.ID) (*container.EACL, error) {
	return s.ttlNetCache.get(cnr)
}
// InvalidateEACL drops the cached eACL entry of the container cnr, if any.
func (s ttlEACLStorage) InvalidateEACL(cnr cid.ID) {
	s.ttlNetCache.remove(cnr)
}
// lruNetmapSource serves network maps by epoch through an LRU cache.
type lruNetmapSource struct {
// provides the current epoch number
netState netmap.State
// network maps keyed by epoch
cache *lruNetCache
}
// newCachedNetmapStorage builds a netmap source that reads network maps from v
// by epoch through an LRU cache of 10 entries and takes the current epoch
// from s.
func newCachedNetmapStorage(s netmap.State, v netmap.Source) netmap.Source {
	const netmapCacheSize = 10

	readByEpoch := func(epoch uint64) (*netmapSDK.NetMap, error) {
		return v.GetNetMapByEpoch(epoch)
	}

	src := new(lruNetmapSource)
	src.netState = s
	src.cache = newNetworkLRUCache(netmapCacheSize, readByEpoch, metrics.NewCacheMetrics("netmap"))
	return src
}
// netmapEpochDiffError is the error returned by lruNetmapSource.GetNetMap when
// the requested diff is greater than the current epoch.
type netmapEpochDiffError struct{}

// Error implements the error interface.
func (netmapEpochDiffError) Error() string {
	return "netmap diff exceeds current epoch"
}

// GetNetMap returns the network map of the epoch that is diff epochs behind
// the current one (diff == 0 is the current epoch), going through the cache.
//
// Epoch numbers are unsigned, so subtracting a diff greater than the current
// epoch would wrap around to a bogus, very large epoch. Such a request is
// rejected with an error instead of being passed to the cache and, on miss, to
// the network.
func (s *lruNetmapSource) GetNetMap(diff uint64) (*netmapSDK.NetMap, error) {
	current := s.netState.CurrentEpoch()
	if diff > current {
		return nil, netmapEpochDiffError{}
	}

	return s.getNetMapByEpoch(current - diff)
}
// GetNetMapByEpoch returns the network map of the given epoch, going through
// the cache.
func (s *lruNetmapSource) GetNetMapByEpoch(epoch uint64) (*netmapSDK.NetMap, error) {
	nm, err := s.getNetMapByEpoch(epoch)
	return nm, err
}
// getNetMapByEpoch fetches the network map of the given epoch from the cache.
// On failure the returned network map is nil.
func (s *lruNetmapSource) getNetMapByEpoch(epoch uint64) (*netmapSDK.NetMap, error) {
	nm, err := s.cache.get(epoch)
	if err == nil {
		return nm, nil
	}

	return nil, err
}
// Epoch returns the current epoch number reported by the network state.
// The returned error is always nil.
func (s *lruNetmapSource) Epoch() (uint64, error) {
	current := s.netState.CurrentEpoch()
	return current, nil
}
// cachedIRFetcher is a TTL cache of the Inner Ring key list.
type cachedIRFetcher struct {
// embedded cache; the key type is struct{} because only one value is stored
*ttlNetCache[struct{}, [][]byte]
}
// newCachedIRFetcher builds a fetcher that reads Inner Ring keys from f through
// a single-entry TTL cache with a lifetime of 30 seconds.
func newCachedIRFetcher(f interface{ InnerRingKeys() ([][]byte, error) }) cachedIRFetcher {
	const (
		// We intend to store only one value.
		irFetcherCacheSize = 1

		// Without the cache in the testnet we can see several hundred
		// simultaneous requests (frostfs-node #1278), so limiting the request
		// rate solves the issue.
		//
		// Exact request rate doesn't really matter because Inner Ring list
		// update happens extremely rarely, but there are no side chain events
		// for that as for now (frostfs-contract v0.15.0 notary disabled env)
		// to monitor it.
		irFetcherCacheTTL = 30 * time.Second
	)

	readKeys := func(struct{}) ([][]byte, error) {
		return f.InnerRingKeys()
	}

	return cachedIRFetcher{
		ttlNetCache: newNetworkTTLCache(irFetcherCacheSize, irFetcherCacheTTL, readKeys, metrics.NewCacheMetrics("ir_keys")),
	}
}
// InnerRingKeys returns the cached list of Inner Ring keys. When the keys are
// missing in the cache or expired, they are read from the underlying source
// and the cache is updated. On error the returned list is nil.
func (f cachedIRFetcher) InnerRingKeys() ([][]byte, error) {
	keys, err := f.ttlNetCache.get(struct{}{})
	if err == nil {
		return keys, nil
	}

	return nil, err
}
// ttlMaxObjectSizeCache caches the maximum object size read from src and
// refreshes it once the cached value is older than the TTL used by
// MaxObjectSize.
type ttlMaxObjectSizeCache struct {
// guards lastUpdated and lastSize
mtx sync.RWMutex
// time of the last refresh; zero until the first one
lastUpdated time.Time
// value obtained by the last refresh
lastSize uint64
// source that is asked for the value on refresh
src objectwriter.MaxSizeSource
// receives the duration and the hit flag of each read
metrics cacheMetrics
}
// newCachedMaxObjectSizeSource wraps src with a cache of the maximum object
// size. The cache starts empty, so the first read goes to src.
func newCachedMaxObjectSizeSource(src objectwriter.MaxSizeSource) objectwriter.MaxSizeSource {
	c := new(ttlMaxObjectSizeCache)
	c.src = src
	c.metrics = metrics.NewCacheMetrics("max_object_size")
	return c
}
// MaxObjectSize returns the maximum object size, served from the cache while
// the cached value is younger than 30 seconds and re-read from src otherwise.
//
// The operation duration is reported as "Get"; it counts as a hit only when
// the cached value was still fresh on the first check.
func (c *ttlMaxObjectSizeCache) MaxObjectSize() uint64 {
	const ttl = time.Second * 30

	var (
		begin = time.Now()
		fresh bool
	)
	defer func() {
		c.metrics.AddMethodDuration("Get", time.Since(begin), fresh)
	}()

	c.mtx.RLock()
	seenUpdate, cachedSize := c.lastUpdated, c.lastSize
	c.mtx.RUnlock()

	if time.Since(seenUpdate) < ttl {
		fresh = true
		return cachedSize
	}

	c.mtx.Lock()

	// A later update time than the one seen above means that another caller
	// has refreshed the value in the meantime, so its result is reused.
	if c.lastUpdated.After(seenUpdate) {
		refreshed := c.lastSize
		c.mtx.Unlock()
		return refreshed
	}

	refreshed := c.src.MaxObjectSize()
	c.lastSize = refreshed
	c.lastUpdated = time.Now()
	c.mtx.Unlock()

	return refreshed
}
// cacheMetrics is the metrics sink used by the caches in this file.
type cacheMetrics interface {
// AddMethodDuration records that the cache operation named method took d;
// hit is the operation's hit flag (e.g. whether the value was found cached).
AddMethodDuration(method string, d time.Duration, hit bool)
}