[#1248] node: Do not update cache twice

Do not request morph values on morph cache misses concurrently.

Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
This commit is contained in:
Pavel Karpy 2023-01-12 21:54:44 +03:00
parent d65a95a2c6
commit 0c0a433b84
2 changed files with 72 additions and 8 deletions

View file

@ -16,6 +16,7 @@ Changelog for FrostFS Node
- Fetching blobovnicza objects that not found in write-cache (#2206) - Fetching blobovnicza objects that not found in write-cache (#2206)
- Do not search for the small objects in FSTree (#2206) - Do not search for the small objects in FSTree (#2206)
- Correct status error for expired session token (#2207) - Correct status error for expired session token (#2207)
- Concurrent morph cache misses (#1248)
### Removed ### Removed
### Updated ### Updated

View file

@ -26,7 +26,9 @@ type valueWithTime[V any] struct {
// entity that provides TTL cache interface. // entity that provides TTL cache interface.
type ttlNetCache[K comparable, V any] struct { type ttlNetCache[K comparable, V any] struct {
ttl time.Duration m sync.RWMutex // protects progMap
progMap map[K]chan struct{} // contains fetch-in-progress keys
ttl time.Duration
sz int sz int
@ -41,13 +43,47 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
fatalOnErr(err) fatalOnErr(err)
return &ttlNetCache[K, V]{ return &ttlNetCache[K, V]{
ttl: ttl, ttl: ttl,
sz: sz, sz: sz,
cache: cache, cache: cache,
netRdr: netRdr, netRdr: netRdr,
progMap: make(map[K]chan struct{}),
} }
} }
func (c *ttlNetCache[K, V]) actualValue(val interface{}) (*valueWithTime[V], bool) {
valWithTime := val.(*valueWithTime[V])
if time.Since(valWithTime.t) < c.ttl {
return valWithTime, true
}
return nil, false
}
func (c *ttlNetCache[K, V]) waitForUpdate(key K, ch chan struct{}) (V, error) {
// wait for another routine to
// finish network fetching
<-ch
val, ok := c.cache.Peek(key)
if !ok {
// routine finished fetching
// but no value found; unexpected,
// repeat fetching
return c.get(key)
}
valWithTime, ok := c.actualValue(val)
if !ok {
// just updated value is already
// expired; unexpected, repeat
// fetching
return c.get(key)
}
return valWithTime.v, valWithTime.e
}
// reads value by the key. // reads value by the key.
// //
// updates the value from the network on cache miss or by TTL. // updates the value from the network on cache miss or by TTL.
@ -56,17 +92,44 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
func (c *ttlNetCache[K, V]) get(key K) (V, error) { func (c *ttlNetCache[K, V]) get(key K) (V, error) {
val, ok := c.cache.Peek(key) val, ok := c.cache.Peek(key)
if ok { if ok {
if time.Since(val.t) < c.ttl { valWithTime, ok := c.actualValue(val)
return val.v, val.e if ok {
return valWithTime.v, valWithTime.e
} }
c.cache.Remove(key) c.cache.Remove(key)
} }
v, err := c.netRdr(key) c.m.RLock()
ch, ok := c.progMap[key]
c.m.RUnlock()
if ok {
return c.waitForUpdate(key, ch)
}
c.m.Lock()
ch, ok = c.progMap[key]
if ok {
c.m.Unlock()
return c.waitForUpdate(key, ch)
}
ch = make(chan struct{})
c.progMap[key] = ch
c.m.Unlock()
v, err := c.netRdr(key)
c.set(key, v, err) c.set(key, v, err)
c.m.Lock()
close(ch)
delete(c.progMap, key)
c.m.Unlock()
return v, err return v, err
} }