[#1248] placement: Decouple ContainerNodes() cache from the placement builder.
Also, write tests. Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
3a48b282b6
commit
286df198c9
3 changed files with 170 additions and 41 deletions
69
pkg/services/object_manager/placement/cache.go
Normal file
69
pkg/services/object_manager/placement/cache.go
Normal file
|
@ -0,0 +1,69 @@
|
||||||
|
package placement
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/sha256"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
"github.com/hashicorp/golang-lru/v2/simplelru"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ContainerNodesCache caches results of ContainerNodes() invocation between epochs.
|
||||||
|
type ContainerNodesCache struct {
|
||||||
|
// mtx protects lastEpoch and containerCache fields.
|
||||||
|
mtx sync.Mutex
|
||||||
|
// lastEpoch contains network map epoch for all values in the container cache.
|
||||||
|
lastEpoch uint64
|
||||||
|
// containerCache caches container nodes by ID. It is used to skip `GetContainerNodes` invocation if
|
||||||
|
// neither netmap nor container has changed.
|
||||||
|
containerCache simplelru.LRUCache[cid.ID, [][]netmapSDK.NodeInfo]
|
||||||
|
}
|
||||||
|
|
||||||
|
// defaultContainerCacheSize is the default size for the container cache.
|
||||||
|
const defaultContainerCacheSize = 10
|
||||||
|
|
||||||
|
// NewContainerNodesCache creates new cache which saves the result of the ContainerNodes() invocations.
|
||||||
|
// If size is <= 0, defaultContainerCacheSize (10) is used.
|
||||||
|
func NewContainerNodesCache(size int) *ContainerNodesCache {
|
||||||
|
if size <= 0 {
|
||||||
|
size = defaultContainerCacheSize
|
||||||
|
}
|
||||||
|
|
||||||
|
cache, _ := simplelru.NewLRU[cid.ID, [][]netmapSDK.NodeInfo](size, nil) // no error
|
||||||
|
return &ContainerNodesCache{
|
||||||
|
containerCache: cache,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ContainerNodes returns the result of nm.ContainerNodes(), possibly from the cache.
|
||||||
|
func (c *ContainerNodesCache) ContainerNodes(nm *netmapSDK.NetMap, cnr cid.ID, p netmapSDK.PlacementPolicy) ([][]netmapSDK.NodeInfo, error) {
|
||||||
|
c.mtx.Lock()
|
||||||
|
if nm.Epoch() == c.lastEpoch {
|
||||||
|
raw, ok := c.containerCache.Get(cnr)
|
||||||
|
c.mtx.Unlock()
|
||||||
|
if ok {
|
||||||
|
return raw, nil
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
c.lastEpoch = nm.Epoch()
|
||||||
|
c.containerCache.Purge()
|
||||||
|
c.mtx.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
binCnr := make([]byte, sha256.Size)
|
||||||
|
cnr.Encode(binCnr)
|
||||||
|
|
||||||
|
cn, err := nm.ContainerNodes(p, binCnr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not get container nodes: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.mtx.Lock()
|
||||||
|
if c.lastEpoch == nm.Epoch() {
|
||||||
|
c.containerCache.Add(cnr, cn)
|
||||||
|
}
|
||||||
|
c.mtx.Unlock()
|
||||||
|
return cn, nil
|
||||||
|
}
|
95
pkg/services/object_manager/placement/cache_test.go
Normal file
95
pkg/services/object_manager/placement/cache_test.go
Normal file
|
@ -0,0 +1,95 @@
|
||||||
|
package placement_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestContainerNodesCache(t *testing.T) {
|
||||||
|
const size = 3
|
||||||
|
|
||||||
|
nodes := [6]netmapSDK.NodeInfo{}
|
||||||
|
for i := range nodes {
|
||||||
|
nodes[i].SetAttribute("ATTR", strconv.Itoa(i))
|
||||||
|
}
|
||||||
|
|
||||||
|
nm := func(epoch uint64, nodes []netmapSDK.NodeInfo) *netmapSDK.NetMap {
|
||||||
|
var nm netmapSDK.NetMap
|
||||||
|
nm.SetEpoch(epoch)
|
||||||
|
nm.SetNodes(nodes)
|
||||||
|
return &nm
|
||||||
|
}
|
||||||
|
|
||||||
|
var pp netmapSDK.PlacementPolicy
|
||||||
|
require.NoError(t, pp.DecodeString("REP 1"))
|
||||||
|
|
||||||
|
t.Run("update netmap on the new epoch", func(t *testing.T) {
|
||||||
|
c := placement.NewContainerNodesCache(size)
|
||||||
|
|
||||||
|
cnr := cidtest.ID()
|
||||||
|
res, err := c.ContainerNodes(nm(1, nodes[0:1]), cnr, pp)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Use other nodes in the argument to ensure the result is taken from cache.
|
||||||
|
resCached, err := c.ContainerNodes(nm(1, nodes[1:2]), cnr, pp)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, res, resCached)
|
||||||
|
|
||||||
|
// Update epoch, netmap should be purged.
|
||||||
|
resCached, err = c.ContainerNodes(nm(2, nodes[2:3]), cnr, pp)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotEqual(t, res, resCached)
|
||||||
|
})
|
||||||
|
t.Run("cache uses container as a key", func(t *testing.T) {
|
||||||
|
c := placement.NewContainerNodesCache(size)
|
||||||
|
|
||||||
|
res1, err := c.ContainerNodes(nm(1, nodes[0:1]), cidtest.ID(), pp)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
res2, err := c.ContainerNodes(nm(1, nodes[1:2]), cidtest.ID(), pp)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.NotEqual(t, res1, res2)
|
||||||
|
})
|
||||||
|
t.Run("cache respects size parameter", func(t *testing.T) {
|
||||||
|
c := placement.NewContainerNodesCache(size)
|
||||||
|
|
||||||
|
nm1 := nm(1, nodes[0:1])
|
||||||
|
nm2 := nm(1, nodes[1:2])
|
||||||
|
cnr := [size * 2]cid.ID{}
|
||||||
|
res := [size * 2][][]netmapSDK.NodeInfo{}
|
||||||
|
for i := 0; i < size*2; i++ {
|
||||||
|
cnr[i] = cidtest.ID()
|
||||||
|
|
||||||
|
var err error
|
||||||
|
res[i], err = c.ContainerNodes(nm1, cnr[i], pp)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := size; i < size*2; i++ {
|
||||||
|
r, err := c.ContainerNodes(nm2, cnr[i], pp)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, res[i], r)
|
||||||
|
}
|
||||||
|
for i := 0; i < size; i++ {
|
||||||
|
r, err := c.ContainerNodes(nm2, cnr[i], pp)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotEqual(t, res[i], r)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
t.Run("the error is propagated", func(t *testing.T) {
|
||||||
|
var pp netmapSDK.PlacementPolicy
|
||||||
|
require.NoError(t, pp.DecodeString("REP 1 SELECT 1 FROM X FILTER ATTR EQ 42 AS X"))
|
||||||
|
|
||||||
|
c := placement.NewContainerNodesCache(size)
|
||||||
|
_, err := c.ContainerNodes(nm(1, nodes[0:1]), cidtest.ID(), pp)
|
||||||
|
require.Error(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
|
@ -3,24 +3,16 @@ package placement
|
||||||
import (
|
import (
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"github.com/hashicorp/golang-lru/v2/simplelru"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type netMapBuilder struct {
|
type netMapBuilder struct {
|
||||||
nmSrc netmap.Source
|
nmSrc netmap.Source
|
||||||
// mtx protects lastEpoch and containerCache fields.
|
containerCache *ContainerNodesCache
|
||||||
mtx sync.Mutex
|
|
||||||
// lastEpoch contains contains network map epoch for all values in the container cache.
|
|
||||||
lastEpoch uint64
|
|
||||||
// containerCache caches container nodes by ID. It is used to skip `GetContainerNodes` invocation if
|
|
||||||
// neither netmap nor container has changed.
|
|
||||||
containerCache simplelru.LRUCache[cid.ID, [][]netmapSDK.NodeInfo]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type netMapSrc struct {
|
type netMapSrc struct {
|
||||||
|
@ -29,22 +21,17 @@ type netMapSrc struct {
|
||||||
nm *netmapSDK.NetMap
|
nm *netmapSDK.NetMap
|
||||||
}
|
}
|
||||||
|
|
||||||
// defaultContainerCacheSize is the default size for the container cache.
|
|
||||||
const defaultContainerCacheSize = 10
|
|
||||||
|
|
||||||
func NewNetworkMapBuilder(nm *netmapSDK.NetMap) Builder {
|
func NewNetworkMapBuilder(nm *netmapSDK.NetMap) Builder {
|
||||||
cache, _ := simplelru.NewLRU[cid.ID, [][]netmapSDK.NodeInfo](defaultContainerCacheSize, nil) // no error
|
|
||||||
return &netMapBuilder{
|
return &netMapBuilder{
|
||||||
nmSrc: &netMapSrc{nm: nm},
|
nmSrc: &netMapSrc{nm: nm},
|
||||||
containerCache: cache,
|
containerCache: NewContainerNodesCache(0),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewNetworkMapSourceBuilder(nmSrc netmap.Source) Builder {
|
func NewNetworkMapSourceBuilder(nmSrc netmap.Source) Builder {
|
||||||
cache, _ := simplelru.NewLRU[cid.ID, [][]netmapSDK.NodeInfo](defaultContainerCacheSize, nil) // no error
|
|
||||||
return &netMapBuilder{
|
return &netMapBuilder{
|
||||||
nmSrc: nmSrc,
|
nmSrc: nmSrc,
|
||||||
containerCache: cache,
|
containerCache: NewContainerNodesCache(0),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -58,33 +45,11 @@ func (b *netMapBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, p netmapSDK.Plac
|
||||||
return nil, fmt.Errorf("could not get network map: %w", err)
|
return nil, fmt.Errorf("could not get network map: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
binCnr := make([]byte, sha256.Size)
|
cn, err := b.containerCache.ContainerNodes(nm, cnr, p)
|
||||||
cnr.Encode(binCnr)
|
|
||||||
|
|
||||||
b.mtx.Lock()
|
|
||||||
if nm.Epoch() == b.lastEpoch {
|
|
||||||
raw, ok := b.containerCache.Get(cnr)
|
|
||||||
b.mtx.Unlock()
|
|
||||||
if ok {
|
|
||||||
return BuildObjectPlacement(nm, raw, obj)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
b.lastEpoch = nm.Epoch()
|
|
||||||
b.containerCache.Purge()
|
|
||||||
b.mtx.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
cn, err := nm.ContainerNodes(p, binCnr)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not get container nodes: %w", err)
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
b.mtx.Lock()
|
|
||||||
if b.lastEpoch == nm.Epoch() {
|
|
||||||
b.containerCache.Add(cnr, cn)
|
|
||||||
}
|
|
||||||
b.mtx.Unlock()
|
|
||||||
|
|
||||||
return BuildObjectPlacement(nm, cn, obj)
|
return BuildObjectPlacement(nm, cn, obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue