forked from TrueCloudLab/frostfs-node

[#451] frostfs-node: Add cache metrics

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>

parent 81ea91de52, commit df894fbac7
4 changed files with 99 additions and 12 deletions
@@ -4,6 +4,7 @@ import (
 	"sync"
 	"time"

+	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
 	putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
@@ -28,15 +29,17 @@ type ttlNetCache[K comparable, V any] struct {
 	cache     *expirable.LRU[K, *valueWithError[V]]
 	netRdr    netValueReader[K, V]
 	keyLocker *utilSync.KeyLocker[K]
+	metrics   cacheMetrics
 }

 // complicates netValueReader with TTL caching mechanism.
-func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr netValueReader[K, V]) *ttlNetCache[K, V] {
+func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr netValueReader[K, V], metrics cacheMetrics) *ttlNetCache[K, V] {
 	cache := expirable.NewLRU[K, *valueWithError[V]](sz, nil, ttl)

 	return &ttlNetCache[K, V]{
 		cache:     cache,
 		netRdr:    netRdr,
+		metrics:   metrics,
 		keyLocker: utilSync.NewKeyLocker[K](),
 	}
 }
@@ -47,8 +50,15 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
 //
 // returned value should not be modified.
 func (c *ttlNetCache[K, V]) get(key K) (V, error) {
+	hit := false
+	startedAt := time.Now()
+	defer func() {
+		c.metrics.AddMethodDuration("Get", time.Since(startedAt), hit)
+	}()
+
 	val, ok := c.cache.Peek(key)
 	if ok {
+		hit = true
 		return val.v, val.e
 	}

@@ -57,6 +67,7 @@ func (c *ttlNetCache[K, V]) get(key K) (V, error) {

 	val, ok = c.cache.Peek(key)
 	if ok {
+		hit = true
 		return val.v, val.e
 	}

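Aside on the instrumentation pattern above (illustrative, not part of the commit): the hit flag is declared before the deferred closure and only flipped on the cache-hit path, so the closure, which runs when get() returns, records the final value of the flag together with the total call duration. A minimal standalone Go sketch of the same idea, with hypothetical names (record, lookup):

package main

import (
	"fmt"
	"time"
)

// record stands in for cacheMetrics.AddMethodDuration.
func record(method string, d time.Duration, hit bool) {
	fmt.Printf("%s took %v, hit=%v\n", method, d, hit)
}

func lookup(cache map[string]int, key string) (int, bool) {
	hit := false
	startedAt := time.Now()
	// The deferred closure captures hit by reference, so it observes the
	// value assigned below, not the value at the time defer was set up.
	defer func() { record("Get", time.Since(startedAt), hit) }()

	if v, ok := cache[key]; ok {
		hit = true
		return v, true
	}
	return 0, false // miss: hit stays false
}

func main() {
	c := map[string]int{"a": 1}
	lookup(c, "a") // prints hit=true
	lookup(c, "b") // prints hit=false
}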
@@ -71,6 +82,11 @@ func (c *ttlNetCache[K, V]) get(key K) (V, error) {
 }

 func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
+	startedAt := time.Now()
+	defer func() {
+		c.metrics.AddMethodDuration("Set", time.Since(startedAt), false)
+	}()
+
 	c.keyLocker.Lock(k)
 	defer c.keyLocker.Unlock(k)

@@ -81,10 +97,16 @@ func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
 }

 func (c *ttlNetCache[K, V]) remove(key K) {
+	hit := false
+	startedAt := time.Now()
+	defer func() {
+		c.metrics.AddMethodDuration("Remove", time.Since(startedAt), hit)
+	}()
+
 	c.keyLocker.Lock(key)
 	defer c.keyLocker.Unlock(key)

-	c.cache.Remove(key)
+	hit = c.cache.Remove(key)
 }

 // entity that provides LRU cache interface.
@@ -92,16 +114,19 @@ type lruNetCache struct {
 	cache *lru.Cache[uint64, *netmapSDK.NetMap]

 	netRdr netValueReader[uint64, *netmapSDK.NetMap]
+
+	metrics cacheMetrics
 }

 // newNetworkLRUCache returns wrapper over netValueReader with LRU cache.
-func newNetworkLRUCache(sz int, netRdr netValueReader[uint64, *netmapSDK.NetMap]) *lruNetCache {
+func newNetworkLRUCache(sz int, netRdr netValueReader[uint64, *netmapSDK.NetMap], metrics cacheMetrics) *lruNetCache {
 	cache, err := lru.New[uint64, *netmapSDK.NetMap](sz)
 	fatalOnErr(err)

 	return &lruNetCache{
 		cache:   cache,
 		netRdr:  netRdr,
+		metrics: metrics,
 	}
 }

@@ -111,8 +136,15 @@ func newNetworkLRUCache(sz int, netRdr netValueReader[uint64, *netmapSDK.NetMap]
 //
 // returned value should not be modified.
 func (c *lruNetCache) get(key uint64) (*netmapSDK.NetMap, error) {
+	hit := false
+	startedAt := time.Now()
+	defer func() {
+		c.metrics.AddMethodDuration("Get", time.Since(startedAt), hit)
+	}()
+
 	val, ok := c.cache.Get(key)
 	if ok {
+		hit = true
 		return val, nil
 	}

@@ -138,10 +170,10 @@ func newCachedContainerStorage(v container.Source, ttl time.Duration) ttlContain

 	lruCnrCache := newNetworkTTLCache(containerCacheSize, ttl, func(id cid.ID) (*container.Container, error) {
 		return v.Get(id)
-	})
+	}, metrics.NewCacheMetrics("container"))
 	lruDelInfoCache := newNetworkTTLCache(containerCacheSize, ttl, func(id cid.ID) (*container.DelInfo, error) {
 		return v.DeletionInfo(id)
-	})
+	}, metrics.NewCacheMetrics("container_deletion_info"))

 	return ttlContainerStorage{
 		containerCache: lruCnrCache,
@@ -175,7 +207,7 @@ func newCachedEACLStorage(v container.EACLSource, ttl time.Duration) ttlEACLStor

 	lruCnrCache := newNetworkTTLCache(eaclCacheSize, ttl, func(id cid.ID) (*container.EACL, error) {
 		return v.GetEACL(id)
-	})
+	}, metrics.NewCacheMetrics("eacl"))

 	return ttlEACLStorage{lruCnrCache}
 }
@@ -202,7 +234,7 @@ func newCachedNetmapStorage(s netmap.State, v netmap.Source) netmap.Source {

 	lruNetmapCache := newNetworkLRUCache(netmapCacheSize, func(key uint64) (*netmapSDK.NetMap, error) {
 		return v.GetNetMapByEpoch(key)
-	})
+	}, metrics.NewCacheMetrics("netmap"))

 	return &lruNetmapSource{
 		netState: s,
@@ -251,7 +283,7 @@ func newCachedIRFetcher(f interface{ InnerRingKeys() ([][]byte, error) }) cached
 	irFetcherCache := newNetworkTTLCache(irFetcherCacheSize, irFetcherCacheTTL,
 		func(_ struct{}) ([][]byte, error) {
 			return f.InnerRingKeys()
-		},
+		}, metrics.NewCacheMetrics("ir_keys"),
 	)

 	return cachedIRFetcher{irFetcherCache}
@@ -274,23 +306,32 @@ type ttlMaxObjectSizeCache struct {
 	lastUpdated time.Time
 	lastSize    uint64
 	src         putsvc.MaxSizeSource
+	metrics     cacheMetrics
 }

 func newCachedMaxObjectSizeSource(src putsvc.MaxSizeSource) putsvc.MaxSizeSource {
 	return &ttlMaxObjectSizeCache{
 		src: src,
+		metrics: metrics.NewCacheMetrics("max_object_size"),
 	}
 }

 func (c *ttlMaxObjectSizeCache) MaxObjectSize() uint64 {
 	const ttl = time.Second * 30

+	hit := false
+	startedAt := time.Now()
+	defer func() {
+		c.metrics.AddMethodDuration("Get", time.Since(startedAt), hit)
+	}()
+
 	c.mtx.RLock()
 	prevUpdated := c.lastUpdated
 	size := c.lastSize
 	c.mtx.RUnlock()

 	if time.Since(prevUpdated) < ttl {
+		hit = true
 		return size
 	}

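For context (an illustrative aside, not the code from this file): ttlMaxObjectSizeCache follows a common TTL-guarded read-through pattern — read the cached value and its timestamp under a read lock, serve it if still fresh, otherwise refresh under the write lock. A generic, self-contained sketch of that pattern; the names ttlValue and fetch are hypothetical:

package main

import (
	"sync"
	"time"
)

// ttlValue caches a single uint64, refreshed at most once per ttl.
type ttlValue struct {
	mtx         sync.RWMutex
	lastUpdated time.Time
	value       uint64
	ttl         time.Duration
	fetch       func() uint64
}

func (c *ttlValue) Get() uint64 {
	c.mtx.RLock()
	updated, v := c.lastUpdated, c.value
	c.mtx.RUnlock()

	if time.Since(updated) < c.ttl {
		return v // fresh enough: cache hit
	}

	c.mtx.Lock()
	defer c.mtx.Unlock()
	// Re-check under the write lock: another goroutine may have refreshed
	// the value while we were waiting for the lock.
	if time.Since(c.lastUpdated) >= c.ttl {
		c.value = c.fetch()
		c.lastUpdated = time.Now()
	}
	return c.value
}

func main() {
	c := &ttlValue{ttl: time.Second, fetch: func() uint64 { return 42 }}
	_ = c.Get() // first call: zero lastUpdated is stale, triggers fetch
	_ = c.Get() // second call within ttl: served from cache
}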
@@ -305,3 +346,7 @@ func (c *ttlMaxObjectSizeCache) MaxObjectSize() uint64 {

 	return size
 }
+
+type cacheMetrics interface {
+	AddMethodDuration(method string, d time.Duration, hit bool)
+}
@@ -10,7 +10,7 @@ import (

 func TestTTLNetCache(t *testing.T) {
 	ttlDuration := time.Millisecond * 50
-	cache := newNetworkTTLCache(10, ttlDuration, testNetValueReader)
+	cache := newNetworkTTLCache(10, ttlDuration, testNetValueReader, &noopCacheMetricts{})

 	key := "key"

@@ -54,3 +54,7 @@ func testNetValueReader(key string) (time.Time, error) {
 	}
 	return time.Now(), nil
 }
+
+type noopCacheMetricts struct{}
+
+func (m *noopCacheMetricts) AddMethodDuration(method string, d time.Duration, hit bool) {}
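Because cacheMetrics is a small interface, tests are not limited to the no-op stub above; a counting stub could also verify hit/miss accounting. A possible sketch, not part of the commit (countingCacheMetrics is a hypothetical name, assumed to sit next to noopCacheMetricts in the same test file and reuse its imports):

// countingCacheMetrics tallies hits and misses; not safe for concurrent use.
type countingCacheMetrics struct {
	hits, misses int
}

func (m *countingCacheMetrics) AddMethodDuration(_ string, _ time.Duration, hit bool) {
	if hit {
		m.hits++
	} else {
		m.misses++
	}
}

A test could then pass &countingCacheMetrics{} to newNetworkTTLCache, drive a miss followed by a hit, and assert on the hits and misses counters.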
internal/metrics/cache.go (new file, 35 lines)
@@ -0,0 +1,35 @@
+package metrics
+
+import (
+	"strconv"
+	"time"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-observability/metrics"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+var cacheRequests = metrics.NewHistogramVec(prometheus.HistogramOpts{
+	Namespace: namespace,
+	Subsystem: commonCacheSubsystem,
+	Name:      "request_duration_seconds",
+	Help:      "Accumulated common cache request process duration",
+}, []string{hitLabel, methodLabel, cacheLabel})
+
+type CacheMetrics struct {
+	cache string
+}
+
+// NewCacheMetrics returns new CacheMetrics instance for cache specified.
+func NewCacheMetrics(cache string) *CacheMetrics {
+	return &CacheMetrics{
+		cache: cache,
+	}
+}
+
+func (m *CacheMetrics) AddMethodDuration(method string, d time.Duration, hit bool) {
+	cacheRequests.With(prometheus.Labels{
+		hitLabel:    strconv.FormatBool(hit),
+		methodLabel: method,
+		cacheLabel:  m.cache,
+	}).Observe(d.Seconds())
+}
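The metrics.NewHistogramVec call above comes from the frostfs-observability module, whose registration and bucket behaviour is not shown in this diff. For orientation only, a rough standalone approximation with the plain Prometheus client could look like the sketch below; the namespace string is a placeholder assumption, and "common_cache", "hit", and "cache" are taken from the consts hunk that follows:

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Standalone approximation of the cache request histogram.
var cacheRequests = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Namespace: "frostfs_node", // assumption: the real value comes from the node's metrics consts
	Subsystem: "common_cache",
	Name:      "request_duration_seconds",
	Help:      "Accumulated common cache request process duration",
}, []string{"hit", "method", "cache"})

func main() {
	prometheus.MustRegister(cacheRequests)

	// One observation, as AddMethodDuration would produce it.
	cacheRequests.With(prometheus.Labels{
		"hit":    "true",
		"method": "Get",
		"cache":  "container",
	}).Observe((2 * time.Millisecond).Seconds())
}

A per-cache hit ratio can then be derived from the histogram's _count series split by the hit label.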
@@ -21,6 +21,7 @@ const (
 	writeCacheSubsystem = "writecache"
 	grpcServerSubsystem = "grpc_server"
 	policerSubsystem = "policer"
+	commonCacheSubsystem = "common_cache"

 	successLabel = "success"
 	shardIDLabel = "shard_id"
@@ -38,6 +39,8 @@ const (
 	storageLabel = "storage"
 	operationLabel = "operation"
 	endpointLabel = "endpoint"
+	hitLabel = "hit"
+	cacheLabel = "cache"

 	readWriteMode = "READ_WRITE"
 	readOnlyMode = "READ_ONLY"