Invalidate container cache on PutSuccess event #272

Merged
fyrchik merged 1 commit from dstepanov-yadro/frostfs-node:fix/container_not_found into master 2023-07-26 21:07:57 +00:00
2 changed files with 112 additions and 54 deletions

View file

@ -24,25 +24,72 @@ type valueWithTime[V any] struct {
e error e error
} }
// valueInProgress is a struct that contains type locker struct {
// values that are being fetched/updated. mtx *sync.Mutex
type valueInProgress[V any] struct { waiters int // not protected by mtx, must use an outer mutex to update concurrently
m sync.RWMutex }
v V
e error type keyLocker[K comparable] struct {
lockers map[K]*locker
lockersMtx *sync.Mutex
}
func newKeyLocker[K comparable]() *keyLocker[K] {
return &keyLocker[K]{
lockers: make(map[K]*locker),
fyrchik marked this conversation as resolved Outdated

Can we do this without locker mtx?

Can we do this without `locker` mtx?

l.mtx.Lock() protects

`l.mtx.Lock()` protects

Can we mention this in the comment to mtx or waiters field?

Can we mention this in the comment to `mtx` or `waiters` field?

hm, there are 8 lines of code only.

hm, there are 8 lines of code only.

comment added

comment added

It's for the kids!

It's for the kids!
lockersMtx: &sync.Mutex{},
}
}
func (l *keyLocker[K]) LockKey(key K) {
l.lockersMtx.Lock()
if locker, found := l.lockers[key]; found {
locker.waiters++
l.lockersMtx.Unlock()
locker.mtx.Lock()
return
}
locker := &locker{
mtx: &sync.Mutex{},
waiters: 1,
}
locker.mtx.Lock()

I find it difficult to read and verify the absence of deadlocks: we have multiple mutexes and we call some Lock in Unlock, so the order of defer + Unlock() could matter in some cases.

I find it difficult to read and verify the absence of deadlocks: we have multiple mutexes and we call some `Lock` in `Unlock`, so the order of `defer + Unlock()` could matter in some cases.

Sorry, I do not know how to make this code more understandable. It seems like there are 10 lines of code and it should be simple. Therefore, I will be glad if you suggest options.

Sorry, I do not know how to make this code more understandable. It seems like there are 10 lines of code and it should be simple. Therefore, I will be glad if you suggest options.
l.lockers[key] = locker
l.lockersMtx.Unlock()
}
func (l *keyLocker[K]) UnlockKey(key K) {
l.lockersMtx.Lock()
defer l.lockersMtx.Unlock()
locker, found := l.lockers[key]
if !found {
return
}
if locker.waiters == 1 {
delete(l.lockers, key)
}
locker.waiters--
fyrchik marked this conversation as resolved Outdated

Do this in the else branch? Or is it a deliberate decision? Don't mind either option.

Do this in the `else` branch? Or is it a deliberate decision? Don't mind either option.

It's a deliberate decision. else is redundant.

It's a deliberate decision. `else` is redundant.
locker.mtx.Unlock()
} }
// entity that provides TTL cache interface. // entity that provides TTL cache interface.
type ttlNetCache[K comparable, V any] struct { type ttlNetCache[K comparable, V any] struct {
m sync.RWMutex // protects progMap ttl time.Duration
progMap map[K]*valueInProgress[V] // contains fetch-in-progress keys
ttl time.Duration
sz int sz int
cache *lru.Cache[K, *valueWithTime[V]] cache *lru.Cache[K, *valueWithTime[V]]
netRdr netValueReader[K, V] netRdr netValueReader[K, V]
keyLocker *keyLocker[K]
} }
// complicates netValueReader with TTL caching mechanism. // complicates netValueReader with TTL caching mechanism.
@ -51,72 +98,48 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
fatalOnErr(err) fatalOnErr(err)
return &ttlNetCache[K, V]{ return &ttlNetCache[K, V]{
ttl: ttl, ttl: ttl,
sz: sz, sz: sz,
cache: cache, cache: cache,
netRdr: netRdr, netRdr: netRdr,
progMap: make(map[K]*valueInProgress[V]), keyLocker: newKeyLocker[K](),
} }
} }
func waitForUpdate[V any](vip *valueInProgress[V]) (V, error) {
vip.m.RLock()
defer vip.m.RUnlock()
return vip.v, vip.e
}
// reads value by the key. // reads value by the key.
// //
// updates the value from the network on cache miss or by TTL. // updates the value from the network on cache miss or by TTL.
// //
// returned value should not be modified. // returned value should not be modified.
func (c *ttlNetCache[K, V]) get(key K) (V, error) { func (c *ttlNetCache[K, V]) get(key K) (V, error) {
valWithTime, ok := c.cache.Peek(key) val, ok := c.cache.Peek(key)
if ok { if ok && time.Since(val.t) < c.ttl {
if time.Since(valWithTime.t) < c.ttl { return val.v, val.e
return valWithTime.v, valWithTime.e
}
c.cache.Remove(key)
} }
fyrchik marked this conversation as resolved Outdated

Do we need to block even if the valid item is in cache?

Do we need to block even if the valid item is in cache?

Fixed with two checks

Fixed with two checks
c.m.RLock() c.keyLocker.LockKey(key)
valInProg, ok := c.progMap[key] defer c.keyLocker.UnlockKey(key)
c.m.RUnlock()
if ok { val, ok = c.cache.Peek(key)
return waitForUpdate(valInProg) if ok && time.Since(val.t) < c.ttl {
return val.v, val.e
} }
c.m.Lock()
valInProg, ok = c.progMap[key]
if ok {
c.m.Unlock()
return waitForUpdate(valInProg)
}
valInProg = &valueInProgress[V]{}
valInProg.m.Lock()
c.progMap[key] = valInProg
c.m.Unlock()
v, err := c.netRdr(key) v, err := c.netRdr(key)
c.set(key, v, err)
valInProg.v = v c.cache.Add(key, &valueWithTime[V]{
valInProg.e = err v: v,
valInProg.m.Unlock() t: time.Now(),
e: err,
c.m.Lock() })
delete(c.progMap, key)
c.m.Unlock()
return v, err return v, err
} }
func (c *ttlNetCache[K, V]) set(k K, v V, e error) { func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
c.keyLocker.LockKey(k)
defer c.keyLocker.UnlockKey(k)
c.cache.Add(k, &valueWithTime[V]{ c.cache.Add(k, &valueWithTime[V]{
v: v, v: v,
t: time.Now(), t: time.Now(),
@ -125,6 +148,9 @@ func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
} }
func (c *ttlNetCache[K, V]) remove(key K) { func (c *ttlNetCache[K, V]) remove(key K) {
c.keyLocker.LockKey(key)
defer c.keyLocker.UnlockKey(key)
c.cache.Remove(key) c.cache.Remove(key)
} }

View file

@ -0,0 +1,32 @@
package main
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
// TestKeyLocker verifies that LockKey/UnlockKey provide mutual exclusion
// per key: of 100 goroutines contending for key 0, only one at a time may
// be inside the critical section. The shared flag `taken` is deliberately
// unguarded by any other synchronization — the key lock itself is what is
// under test (run with -race to catch violations).
func TestKeyLocker(t *testing.T) {
	taken := false
	eg, _ := errgroup.WithContext(context.Background())
	keyLocker := newKeyLocker[int]()
	for i := 0; i < 100; i++ {
		eg.Go(func() error {
			keyLocker.LockKey(0)
			defer keyLocker.UnlockKey(0)

			// Do not use require.* here: require calls t.FailNow, which
			// must only run on the goroutine executing the test function.
			// t.Errorf is documented as safe from other goroutines.
			if taken {
				t.Errorf("mutual exclusion violated: lock for key 0 held by two goroutines at once")
			}
			taken = true
			// Hold the lock long enough for the other goroutines to contend.
			time.Sleep(10 * time.Millisecond)
			taken = false
			return nil
		})
	}
	require.NoError(t, eg.Wait())
}