diff --git a/pkg/services/object_manager/placement/cache.go b/pkg/services/object_manager/placement/cache.go new file mode 100644 index 000000000..217261877 --- /dev/null +++ b/pkg/services/object_manager/placement/cache.go @@ -0,0 +1,69 @@ +package placement + +import ( + "crypto/sha256" + "fmt" + "sync" + + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" + "github.com/hashicorp/golang-lru/v2/simplelru" +) + +// ContainerNodesCache caches results of ContainerNodes() invocation between epochs. +type ContainerNodesCache struct { + // mtx protects lastEpoch and containerCache fields. + mtx sync.Mutex + // lastEpoch contains network map epoch for all values in the container cache. + lastEpoch uint64 + // containerCache caches container nodes by ID. It is used to skip `GetContainerNodes` invocation if + // neither netmap nor container has changed. + containerCache simplelru.LRUCache[cid.ID, [][]netmapSDK.NodeInfo] +} + +// defaultContainerCacheSize is the default size for the container cache. +const defaultContainerCacheSize = 10 + +// NewContainerNodesCache creates new cache which saves the result of the ContainerNodes() invocations. +// If size is <= 0, defaultContainerCacheSize (10) is used. +func NewContainerNodesCache(size int) *ContainerNodesCache { + if size <= 0 { + size = defaultContainerCacheSize + } + + cache, _ := simplelru.NewLRU[cid.ID, [][]netmapSDK.NodeInfo](size, nil) // no error + return &ContainerNodesCache{ + containerCache: cache, + } +} + +// ContainerNodes returns the result of nm.ContainerNodes(), possibly from the cache. +func (c *ContainerNodesCache) ContainerNodes(nm *netmapSDK.NetMap, cnr cid.ID, p netmapSDK.PlacementPolicy) ([][]netmapSDK.NodeInfo, error) { + c.mtx.Lock() + if nm.Epoch() == c.lastEpoch { + raw, ok := c.containerCache.Get(cnr) + c.mtx.Unlock() + if ok { + return raw, nil + } + } else { + c.lastEpoch = nm.Epoch() + c.containerCache.Purge() + c.mtx.Unlock() + } + + binCnr := make([]byte, sha256.Size) + cnr.Encode(binCnr) + + cn, err := nm.ContainerNodes(p, binCnr) + if err != nil { + return nil, fmt.Errorf("could not get container nodes: %w", err) + } + + c.mtx.Lock() + if c.lastEpoch == nm.Epoch() { + c.containerCache.Add(cnr, cn) + } + c.mtx.Unlock() + return cn, nil +} diff --git a/pkg/services/object_manager/placement/cache_test.go b/pkg/services/object_manager/placement/cache_test.go new file mode 100644 index 000000000..5816d59fe --- /dev/null +++ b/pkg/services/object_manager/placement/cache_test.go @@ -0,0 +1,95 @@ +package placement_test + +import ( + "strconv" + "testing" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" + netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" + "github.com/stretchr/testify/require" +) + +func TestContainerNodesCache(t *testing.T) { + const size = 3 + + nodes := [6]netmapSDK.NodeInfo{} + for i := range nodes { + nodes[i].SetAttribute("ATTR", strconv.Itoa(i)) + } + + nm := func(epoch uint64, nodes []netmapSDK.NodeInfo) *netmapSDK.NetMap { + var nm netmapSDK.NetMap + nm.SetEpoch(epoch) + nm.SetNodes(nodes) + return &nm + } + + var pp netmapSDK.PlacementPolicy + require.NoError(t, pp.DecodeString("REP 1")) + + t.Run("update netmap on the new epoch", func(t *testing.T) { + c := placement.NewContainerNodesCache(size) + + cnr := cidtest.ID() + res, err := c.ContainerNodes(nm(1, nodes[0:1]), cnr, pp) + require.NoError(t, err) + + // Use other nodes in the argument to ensure the result is taken from cache. + resCached, err := c.ContainerNodes(nm(1, nodes[1:2]), cnr, pp) + require.NoError(t, err) + require.Equal(t, res, resCached) + + // Update epoch, netmap should be purged. + resCached, err = c.ContainerNodes(nm(2, nodes[2:3]), cnr, pp) + require.NoError(t, err) + require.NotEqual(t, res, resCached) + }) + t.Run("cache uses container as a key", func(t *testing.T) { + c := placement.NewContainerNodesCache(size) + + res1, err := c.ContainerNodes(nm(1, nodes[0:1]), cidtest.ID(), pp) + require.NoError(t, err) + + res2, err := c.ContainerNodes(nm(1, nodes[1:2]), cidtest.ID(), pp) + require.NoError(t, err) + + require.NotEqual(t, res1, res2) + }) + t.Run("cache respects size parameter", func(t *testing.T) { + c := placement.NewContainerNodesCache(size) + + nm1 := nm(1, nodes[0:1]) + nm2 := nm(1, nodes[1:2]) + cnr := [size * 2]cid.ID{} + res := [size * 2][][]netmapSDK.NodeInfo{} + for i := 0; i < size*2; i++ { + cnr[i] = cidtest.ID() + + var err error + res[i], err = c.ContainerNodes(nm1, cnr[i], pp) + require.NoError(t, err) + } + + for i := size; i < size*2; i++ { + r, err := c.ContainerNodes(nm2, cnr[i], pp) + require.NoError(t, err) + require.Equal(t, res[i], r) + } + for i := 0; i < size; i++ { + r, err := c.ContainerNodes(nm2, cnr[i], pp) + require.NoError(t, err) + require.NotEqual(t, res[i], r) + } + }) + t.Run("the error is propagated", func(t *testing.T) { + var pp netmapSDK.PlacementPolicy + require.NoError(t, pp.DecodeString("REP 1 SELECT 1 FROM X FILTER ATTR EQ 42 AS X")) + + c := placement.NewContainerNodesCache(size) + _, err := c.ContainerNodes(nm(1, nodes[0:1]), cidtest.ID(), pp) + require.Error(t, err) + }) + +} diff --git a/pkg/services/object_manager/placement/netmap.go b/pkg/services/object_manager/placement/netmap.go index 8163529ed..1782e27ea 100644 --- a/pkg/services/object_manager/placement/netmap.go +++ b/pkg/services/object_manager/placement/netmap.go @@ -3,24 +3,16 @@ package placement import ( "crypto/sha256" "fmt" - "sync" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "github.com/hashicorp/golang-lru/v2/simplelru" ) type netMapBuilder struct { - nmSrc netmap.Source - // mtx protects lastEpoch and containerCache fields. - mtx sync.Mutex - // lastEpoch contains contains network map epoch for all values in the container cache. - lastEpoch uint64 - // containerCache caches container nodes by ID. It is used to skip `GetContainerNodes` invocation if - // neither netmap nor container has changed. - containerCache simplelru.LRUCache[cid.ID, [][]netmapSDK.NodeInfo] + nmSrc netmap.Source + containerCache *ContainerNodesCache } type netMapSrc struct { @@ -29,22 +21,17 @@ type netMapSrc struct { nm *netmapSDK.NetMap } -// defaultContainerCacheSize is the default size for the container cache. -const defaultContainerCacheSize = 10 - func NewNetworkMapBuilder(nm *netmapSDK.NetMap) Builder { - cache, _ := simplelru.NewLRU[cid.ID, [][]netmapSDK.NodeInfo](defaultContainerCacheSize, nil) // no error return &netMapBuilder{ nmSrc: &netMapSrc{nm: nm}, - containerCache: cache, + containerCache: NewContainerNodesCache(0), } } func NewNetworkMapSourceBuilder(nmSrc netmap.Source) Builder { - cache, _ := simplelru.NewLRU[cid.ID, [][]netmapSDK.NodeInfo](defaultContainerCacheSize, nil) // no error return &netMapBuilder{ nmSrc: nmSrc, - containerCache: cache, + containerCache: NewContainerNodesCache(0), } } @@ -58,33 +45,11 @@ func (b *netMapBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, p netmapSDK.Plac return nil, fmt.Errorf("could not get network map: %w", err) } - binCnr := make([]byte, sha256.Size) - cnr.Encode(binCnr) - - b.mtx.Lock() - if nm.Epoch() == b.lastEpoch { - raw, ok := b.containerCache.Get(cnr) - b.mtx.Unlock() - if ok { - return BuildObjectPlacement(nm, raw, obj) - } - } else { - b.lastEpoch = nm.Epoch() - b.containerCache.Purge() - b.mtx.Unlock() - } - - cn, err := nm.ContainerNodes(p, binCnr) + cn, err := b.containerCache.ContainerNodes(nm, cnr, p) if err != nil { - return nil, fmt.Errorf("could not get container nodes: %w", err) + return nil, err } - b.mtx.Lock() - if b.lastEpoch == nm.Epoch() { - b.containerCache.Add(cnr, cn) - } - b.mtx.Unlock() - return BuildObjectPlacement(nm, cn, obj) }