From d8a00c365a5546d7e33e429e1ee1fa8e4ee89512 Mon Sep 17 00:00:00 2001
From: Evgenii Stratonikov <evgeniy@morphbits.ru>
Date: Tue, 30 Aug 2022 09:15:16 +0300
Subject: [PATCH] [#1691] blobovniczatree: Fix active blobovnicza caching

Maintain an invariant that any blobovnicza is present either
in `opened` or in `active` map. Otherwise, the logic becomes too
complicate because it is not obvious when we should close the blobovnicza.

Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
---
 .../blobstor/blobovniczatree/blobovnicza.go   | 21 ++++---
 .../blobovniczatree/blobovnicza_test.go       | 61 +++++++++++++++++++
 .../blobstor/blobovniczatree/control.go       | 41 +++++++------
 3 files changed, 97 insertions(+), 26 deletions(-)

diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go b/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go
index 665047631..d9d324c30 100644
--- a/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go
+++ b/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go
@@ -179,19 +179,26 @@ func (b *Blobovniczas) updateAndGet(p string, old *uint64) (blobovniczaWithIndex
 	defer b.activeMtx.Unlock()
 
 	// check 2nd time to find out if it blobovnicza was activated while thread was locked
-	if tryActive, ok := b.active[p]; ok && tryActive.blz == active.blz {
+	tryActive, ok := b.active[p]
+	if ok && tryActive.blz == active.blz {
 		return tryActive, nil
 	}
 
-	// remove from opened cache (active blobovnicza should always be opened)
-	b.lruMtx.Lock()
-	b.opened.Remove(p)
-	b.lruMtx.Unlock()
+	// Remove from opened cache (active blobovnicza should always be opened).
+	// Because `onEvict` callback is called in `Remove`, we need to update
+	// active map beforehand.
 	b.active[p] = active
 
+	activePath := filepath.Join(p, u64ToHexString(active.ind))
+	b.lruMtx.Lock()
+	b.opened.Remove(activePath)
+	if ok {
+		b.opened.Add(filepath.Join(p, u64ToHexString(tryActive.ind)), tryActive.blz)
+	}
+	b.lruMtx.Unlock()
+
 	b.log.Debug("blobovnicza successfully activated",
-		zap.String("path", filepath.Join(p, u64ToHexString(active.ind))),
-	)
+		zap.String("path", activePath))
 
 	return active, nil
 }
diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza_test.go b/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza_test.go
index e5d0dde1a..b9049260e 100644
--- a/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza_test.go
+++ b/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza_test.go
@@ -9,10 +9,71 @@ import (
 	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
 	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
 	"github.com/nspcc-dev/neofs-node/pkg/util/logger/test"
+	objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
 	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
 	"github.com/stretchr/testify/require"
 )
 
+func TestOpenedAndActive(t *testing.T) {
+	rand.Seed(1024)
+
+	l := test.NewLogger(true)
+	p, err := os.MkdirTemp("", "*")
+	require.NoError(t, err)
+
+	const (
+		width  = 2
+		depth  = 1
+		dbSize = 64 * 1024
+	)
+
+	b := NewBlobovniczaTree(
+		WithLogger(l),
+		WithObjectSizeLimit(2048),
+		WithBlobovniczaShallowWidth(width),
+		WithBlobovniczaShallowDepth(depth),
+		WithRootPath(p),
+		WithOpenedCacheSize(1),
+		WithBlobovniczaSize(dbSize))
+
+	defer os.RemoveAll(p)
+
+	require.NoError(t, b.Open(false))
+	require.NoError(t, b.Init())
+
+	type pair struct {
+		obj *objectSDK.Object
+		sid []byte
+	}
+
+	objects := make([]pair, 10)
+	for i := range objects {
+		var prm common.PutPrm
+		prm.Object = blobstortest.NewObject(1024)
+		prm.Address = object.AddressOf(prm.Object)
+		prm.RawData, err = prm.Object.Marshal()
+		require.NoError(t, err)
+
+		res, err := b.Put(prm)
+		require.NoError(t, err)
+
+		objects[i].obj = prm.Object
+		objects[i].sid = res.StorageID
+	}
+	for i := range objects {
+		var prm common.GetPrm
+		prm.Address = object.AddressOf(objects[i].obj)
+		// It is important to provide StorageID because
+		// we want to open a single blobovnicza, without other
+		// unpredictable cache effects.
+		prm.StorageID = objects[i].sid
+
+		_, err := b.Get(prm)
+		require.NoError(t, err)
+	}
+	require.NoError(t, b.Close())
+}
+
 func TestBlobovniczas(t *testing.T) {
 	rand.Seed(1024)
 
diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/control.go b/pkg/local_object_storage/blobstor/blobovniczatree/control.go
index ff7578a50..33b08fecb 100644
--- a/pkg/local_object_storage/blobstor/blobovniczatree/control.go
+++ b/pkg/local_object_storage/blobstor/blobovniczatree/control.go
@@ -26,7 +26,7 @@ func (b *Blobovniczas) Init() error {
 	}
 
 	return b.iterateLeaves(func(p string) (bool, error) {
-		blz, err := b.openBlobovniczaNoCache(p, false)
+		blz, err := b.openBlobovniczaNoCache(p)
 		if err != nil {
 			return true, err
 		}
@@ -89,36 +89,39 @@ func (b *Blobovniczas) openBlobovnicza(p string) (*blobovnicza.Blobovnicza, erro
 		return v.(*blobovnicza.Blobovnicza), nil
 	}
 
-	blz, err := b.openBlobovniczaNoCache(p, true)
+	lvlPath := filepath.Dir(p)
+	curIndex := u64FromHexString(filepath.Base(p))
+
+	b.activeMtx.RLock()
+	defer b.activeMtx.RUnlock()
+
+	active, ok := b.active[lvlPath]
+	if ok && active.ind == curIndex {
+		return active.blz, nil
+	}
+
+	b.lruMtx.Lock()
+	defer b.lruMtx.Unlock()
+
+	v, ok = b.opened.Get(p)
+	if ok {
+		return v.(*blobovnicza.Blobovnicza), nil
+	}
+
+	blz, err := b.openBlobovniczaNoCache(p)
 	if err != nil {
 		return nil, err
 	}
 
-	b.activeMtx.Lock()
-	b.lruMtx.Lock()
-
 	b.opened.Add(p, blz)
 
-	b.lruMtx.Unlock()
-	b.activeMtx.Unlock()
-
 	return blz, nil
 }
 
-func (b *Blobovniczas) openBlobovniczaNoCache(p string, tryCache bool) (*blobovnicza.Blobovnicza, error) {
+func (b *Blobovniczas) openBlobovniczaNoCache(p string) (*blobovnicza.Blobovnicza, error) {
 	b.openMtx.Lock()
 	defer b.openMtx.Unlock()
 
-	if tryCache {
-		b.lruMtx.Lock()
-		v, ok := b.opened.Get(p)
-		b.lruMtx.Unlock()
-		if ok {
-			// blobovnicza should be opened in cache
-			return v.(*blobovnicza.Blobovnicza), nil
-		}
-	}
-
 	blz := blobovnicza.New(append(b.blzOpts,
 		blobovnicza.WithReadOnly(b.readOnly),
 		blobovnicza.WithPath(filepath.Join(b.rootPath, p)),