Fix container nodes cache #1248
3 changed files with 170 additions and 37 deletions
69
pkg/services/object_manager/placement/cache.go
Normal file
69
pkg/services/object_manager/placement/cache.go
Normal file
|
@ -0,0 +1,69 @@
|
|||
package placement
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"github.com/hashicorp/golang-lru/v2/simplelru"
|
||||
)
|
||||
|
||||
// ContainerNodesCache caches results of ContainerNodes() invocation between epochs.
|
||||
type ContainerNodesCache struct {
|
||||
// mtx protects lastEpoch and containerCache fields.
|
||||
mtx sync.Mutex
|
||||
// lastEpoch contains network map epoch for all values in the container cache.
|
||||
lastEpoch uint64
|
||||
// containerCache caches container nodes by ID. It is used to skip `GetContainerNodes` invocation if
|
||||
// neither netmap nor container has changed.
|
||||
containerCache simplelru.LRUCache[cid.ID, [][]netmapSDK.NodeInfo]
|
||||
}
|
||||
|
||||
// defaultContainerCacheSize is the default size for the container cache.
|
||||
const defaultContainerCacheSize = 10
|
||||
|
||||
// NewContainerNodesCache creates new cache which saves the result of the ContainerNodes() invocations.
|
||||
// If size is <= 0, defaultContainerCacheSize (10) is used.
|
||||
func NewContainerNodesCache(size int) *ContainerNodesCache {
|
||||
if size <= 0 {
|
||||
size = defaultContainerCacheSize
|
||||
}
|
||||
|
||||
cache, _ := simplelru.NewLRU[cid.ID, [][]netmapSDK.NodeInfo](size, nil) // no error
|
||||
return &ContainerNodesCache{
|
||||
containerCache: cache,
|
||||
}
|
||||
}
|
||||
|
||||
// ContainerNodes returns the result of nm.ContainerNodes(), possibly from the cache.
|
||||
func (c *ContainerNodesCache) ContainerNodes(nm *netmapSDK.NetMap, cnr cid.ID, p netmapSDK.PlacementPolicy) ([][]netmapSDK.NodeInfo, error) {
|
||||
c.mtx.Lock()
|
||||
if nm.Epoch() == c.lastEpoch {
|
||||
raw, ok := c.containerCache.Get(cnr)
|
||||
c.mtx.Unlock()
|
||||
if ok {
|
||||
return raw, nil
|
||||
}
|
||||
} else {
|
||||
c.lastEpoch = nm.Epoch()
|
||||
c.containerCache.Purge()
|
||||
c.mtx.Unlock()
|
||||
}
|
||||
|
||||
binCnr := make([]byte, sha256.Size)
|
||||
cnr.Encode(binCnr)
|
||||
|
||||
cn, err := nm.ContainerNodes(p, binCnr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get container nodes: %w", err)
|
||||
}
|
||||
|
||||
c.mtx.Lock()
|
||||
if c.lastEpoch == nm.Epoch() {
|
||||
c.containerCache.Add(cnr, cn)
|
||||
}
|
||||
c.mtx.Unlock()
|
||||
return cn, nil
|
||||
}
|
95
pkg/services/object_manager/placement/cache_test.go
Normal file
95
pkg/services/object_manager/placement/cache_test.go
Normal file
|
@ -0,0 +1,95 @@
|
|||
package placement_test
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestContainerNodesCache(t *testing.T) {
|
||||
const size = 3
|
||||
|
||||
nodes := [6]netmapSDK.NodeInfo{}
|
||||
for i := range nodes {
|
||||
nodes[i].SetAttribute("ATTR", strconv.Itoa(i))
|
||||
}
|
||||
|
||||
nm := func(epoch uint64, nodes []netmapSDK.NodeInfo) *netmapSDK.NetMap {
|
||||
var nm netmapSDK.NetMap
|
||||
nm.SetEpoch(epoch)
|
||||
nm.SetNodes(nodes)
|
||||
return &nm
|
||||
}
|
||||
|
||||
var pp netmapSDK.PlacementPolicy
|
||||
require.NoError(t, pp.DecodeString("REP 1"))
|
||||
|
||||
t.Run("update netmap on the new epoch", func(t *testing.T) {
|
||||
c := placement.NewContainerNodesCache(size)
|
||||
|
||||
cnr := cidtest.ID()
|
||||
res, err := c.ContainerNodes(nm(1, nodes[0:1]), cnr, pp)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Use other nodes in the argument to ensure the result is taken from cache.
|
||||
resCached, err := c.ContainerNodes(nm(1, nodes[1:2]), cnr, pp)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, res, resCached)
|
||||
|
||||
// Update epoch, netmap should be purged.
|
||||
resCached, err = c.ContainerNodes(nm(2, nodes[2:3]), cnr, pp)
|
||||
require.NoError(t, err)
|
||||
require.NotEqual(t, res, resCached)
|
||||
})
|
||||
t.Run("cache uses container as a key", func(t *testing.T) {
|
||||
c := placement.NewContainerNodesCache(size)
|
||||
|
||||
res1, err := c.ContainerNodes(nm(1, nodes[0:1]), cidtest.ID(), pp)
|
||||
require.NoError(t, err)
|
||||
|
||||
res2, err := c.ContainerNodes(nm(1, nodes[1:2]), cidtest.ID(), pp)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NotEqual(t, res1, res2)
|
||||
})
|
||||
t.Run("cache respects size parameter", func(t *testing.T) {
|
||||
c := placement.NewContainerNodesCache(size)
|
||||
|
||||
nm1 := nm(1, nodes[0:1])
|
||||
nm2 := nm(1, nodes[1:2])
|
||||
cnr := [size * 2]cid.ID{}
|
||||
res := [size * 2][][]netmapSDK.NodeInfo{}
|
||||
for i := 0; i < size*2; i++ {
|
||||
cnr[i] = cidtest.ID()
|
||||
|
||||
var err error
|
||||
res[i], err = c.ContainerNodes(nm1, cnr[i], pp)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
for i := size; i < size*2; i++ {
|
||||
r, err := c.ContainerNodes(nm2, cnr[i], pp)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, res[i], r)
|
||||
}
|
||||
for i := 0; i < size; i++ {
|
||||
r, err := c.ContainerNodes(nm2, cnr[i], pp)
|
||||
require.NoError(t, err)
|
||||
require.NotEqual(t, res[i], r)
|
||||
}
|
||||
})
|
||||
t.Run("the error is propagated", func(t *testing.T) {
|
||||
var pp netmapSDK.PlacementPolicy
|
||||
require.NoError(t, pp.DecodeString("REP 1 SELECT 1 FROM X FILTER ATTR EQ 42 AS X"))
|
||||
|
||||
c := placement.NewContainerNodesCache(size)
|
||||
_, err := c.ContainerNodes(nm(1, nodes[0:1]), cidtest.ID(), pp)
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
}
|
|
@ -3,23 +3,16 @@ package placement
|
|||
import (
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/hashicorp/golang-lru/v2/simplelru"
|
||||
)
|
||||
|
||||
type netMapBuilder struct {
|
||||
nmSrc netmap.Source
|
||||
// mtx protects lastNm and containerCache fields.
|
||||
mtx sync.Mutex
|
||||
lastNm *netmapSDK.NetMap
|
||||
// containerCache caches container nodes by ID. It is used to skip `GetContainerNodes` invocation if
|
||||
// neither netmap nor container has changed.
|
||||
containerCache simplelru.LRUCache[string, [][]netmapSDK.NodeInfo]
|
||||
nmSrc netmap.Source
|
||||
containerCache *ContainerNodesCache
|
||||
}
|
||||
|
||||
type netMapSrc struct {
|
||||
|
@ -28,22 +21,17 @@ type netMapSrc struct {
|
|||
nm *netmapSDK.NetMap
|
||||
}
|
||||
|
||||
// defaultContainerCacheSize is the default size for the container cache.
|
||||
const defaultContainerCacheSize = 10
|
||||
|
||||
func NewNetworkMapBuilder(nm *netmapSDK.NetMap) Builder {
|
||||
cache, _ := simplelru.NewLRU[string, [][]netmapSDK.NodeInfo](defaultContainerCacheSize, nil) // no error
|
||||
return &netMapBuilder{
|
||||
nmSrc: &netMapSrc{nm: nm},
|
||||
containerCache: cache,
|
||||
containerCache: NewContainerNodesCache(0),
|
||||
}
|
||||
}
|
||||
|
||||
func NewNetworkMapSourceBuilder(nmSrc netmap.Source) Builder {
|
||||
cache, _ := simplelru.NewLRU[string, [][]netmapSDK.NodeInfo](defaultContainerCacheSize, nil) // no error
|
||||
return &netMapBuilder{
|
||||
nmSrc: nmSrc,
|
||||
containerCache: cache,
|
||||
containerCache: NewContainerNodesCache(0),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -57,30 +45,11 @@ func (b *netMapBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, p netmapSDK.Plac
|
|||
return nil, fmt.Errorf("could not get network map: %w", err)
|
||||
}
|
||||
|
||||
binCnr := make([]byte, sha256.Size)
|
||||
cnr.Encode(binCnr)
|
||||
|
||||
b.mtx.Lock()
|
||||
if nm == b.lastNm {
|
||||
raw, ok := b.containerCache.Get(string(binCnr))
|
||||
b.mtx.Unlock()
|
||||
if ok {
|
||||
return BuildObjectPlacement(nm, raw, obj)
|
||||
}
|
||||
} else {
|
||||
b.containerCache.Purge()
|
||||
b.mtx.Unlock()
|
||||
}
|
||||
|
||||
cn, err := nm.ContainerNodes(p, binCnr)
|
||||
cn, err := b.containerCache.ContainerNodes(nm, cnr, p)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get container nodes: %w", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b.mtx.Lock()
|
||||
b.containerCache.Add(string(binCnr), cn)
|
||||
b.mtx.Unlock()
|
||||
|
||||
return BuildObjectPlacement(nm, cn, obj)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue