From 0c0a433b8489987ed015c8106874335a1a8d2b51 Mon Sep 17 00:00:00 2001
From: Pavel Karpy
Date: Thu, 12 Jan 2023 21:54:44 +0300
Subject: [PATCH] [#1248] node: Do not update cache twice
Do not request morph values on morph cache misses concurrently.
Signed-off-by: Pavel Karpy
---
CHANGELOG.md | 1 +
cmd/frostfs-node/cache.go | 79 +++++++++++++++++++++++++++++++++++----
2 files changed, 72 insertions(+), 8 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2967f8b87..0c30d09aa 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@ Changelog for FrostFS Node
- Fetching blobovnicza objects that not found in write-cache (#2206)
- Do not search for the small objects in FSTree (#2206)
- Correct status error for expired session token (#2207)
+- Concurrent morph cache misses (#1248)
### Removed
### Updated
diff --git a/cmd/frostfs-node/cache.go b/cmd/frostfs-node/cache.go
index ea11cbec5..4da8c3204 100644
--- a/cmd/frostfs-node/cache.go
+++ b/cmd/frostfs-node/cache.go
@@ -26,7 +26,9 @@ type valueWithTime[V any] struct {
// entity that provides TTL cache interface.
type ttlNetCache[K comparable, V any] struct {
- ttl time.Duration
+ m sync.RWMutex // protects progMap
+ progMap map[K]chan struct{} // contains fetch-in-progress keys
+ ttl time.Duration
sz int
@@ -41,13 +43,47 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
fatalOnErr(err)
return &ttlNetCache[K, V]{
- ttl: ttl,
- sz: sz,
- cache: cache,
- netRdr: netRdr,
+ ttl: ttl,
+ sz: sz,
+ cache: cache,
+ netRdr: netRdr,
+ progMap: make(map[K]chan struct{}),
}
}
+func (c *ttlNetCache[K, V]) actualValue(val interface{}) (*valueWithTime[V], bool) {
+ valWithTime := val.(*valueWithTime[V])
+ if time.Since(valWithTime.t) < c.ttl {
+ return valWithTime, true
+ }
+
+ return nil, false
+}
+
+func (c *ttlNetCache[K, V]) waitForUpdate(key K, ch chan struct{}) (V, error) {
+ // wait for another routine to
+ // finish network fetching
+ <-ch
+
+ val, ok := c.cache.Peek(key)
+ if !ok {
+ // routine finished fetching
+ // but no value found; unexpected,
+ // repeat fetching
+ return c.get(key)
+ }
+
+ valWithTime, ok := c.actualValue(val)
+ if !ok {
+ // just updated value is already
+ // expired; unexpected, repeat
+ // fetching
+ return c.get(key)
+ }
+
+ return valWithTime.v, valWithTime.e
+}
+
// reads value by the key.
//
// updates the value from the network on cache miss or by TTL.
@@ -56,17 +92,44 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
func (c *ttlNetCache[K, V]) get(key K) (V, error) {
val, ok := c.cache.Peek(key)
if ok {
- if time.Since(val.t) < c.ttl {
- return val.v, val.e
+ valWithTime, ok := c.actualValue(val)
+ if ok {
+ return valWithTime.v, valWithTime.e
}
c.cache.Remove(key)
}
- v, err := c.netRdr(key)
+ c.m.RLock()
+ ch, ok := c.progMap[key]
+ c.m.RUnlock()
+ if ok {
+ return c.waitForUpdate(key, ch)
+ }
+
+ c.m.Lock()
+ ch, ok = c.progMap[key]
+ if ok {
+ c.m.Unlock()
+ return c.waitForUpdate(key, ch)
+ }
+
+ ch = make(chan struct{})
+ c.progMap[key] = ch
+
+ c.m.Unlock()
+
+ v, err := c.netRdr(key)
c.set(key, v, err)
+ c.m.Lock()
+
+ close(ch)
+ delete(c.progMap, key)
+
+ c.m.Unlock()
+
return v, err
}