[#2068] blobstor: Allow to provide storage ID in Exists

Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
This commit is contained in:
Evgenii Stratonikov 2022-11-17 09:29:35 +03:00 committed by fyrchik
parent 6ad2b5d5b8
commit 63f604e948
5 changed files with 101 additions and 4 deletions

View file

@ -10,6 +10,17 @@ import (
// Exists implements common.Storage.
func (b *Blobovniczas) Exists(prm common.ExistsPrm) (common.ExistsRes, error) {
if prm.StorageID != nil {
id := blobovnicza.NewIDFromBytes(prm.StorageID)
blz, err := b.openBlobovnicza(id.String())
if err != nil {
return common.ExistsRes{}, err
}
exists, err := blz.Exists(prm.Address)
return common.ExistsRes{Exists: exists}, err
}
activeCache := make(map[string]struct{})
var gPrm blobovnicza.GetPrm

View file

@ -0,0 +1,65 @@
package blobovniczatree
import (
"os"
"path/filepath"
"testing"
"github.com/nspcc-dev/neo-go/pkg/util/slice"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
func TestExistsInvalidStorageID(t *testing.T) {
dir := t.TempDir()
b := NewBlobovniczaTree(
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
WithObjectSizeLimit(1024),
WithBlobovniczaShallowWidth(2),
WithBlobovniczaShallowDepth(2),
WithRootPath(dir),
WithBlobovniczaSize(1<<20))
require.NoError(t, b.Open(false))
require.NoError(t, b.Init())
t.Cleanup(func() { _ = b.Close() })
obj := blobstortest.NewObject(1024)
addr := object.AddressOf(obj)
d, err := obj.Marshal()
require.NoError(t, err)
putRes, err := b.Put(common.PutPrm{Address: addr, RawData: d, DontCompress: true})
require.NoError(t, err)
t.Run("valid but wrong storage id", func(t *testing.T) {
// "0/X/Y" <-> "1/X/Y"
storageID := slice.Copy(putRes.StorageID)
if storageID[0] == '0' {
storageID[0]++
} else {
storageID[0]--
}
res, err := b.Exists(common.ExistsPrm{Address: addr, StorageID: storageID})
require.NoError(t, err)
require.False(t, res.Exists)
})
t.Run("invalid storage id", func(t *testing.T) {
// "0/X/Y" <-> "1/X/Y"
storageID := slice.Copy(putRes.StorageID)
storageID[0] = '9'
badDir := filepath.Join(dir, "9")
require.NoError(t, os.MkdirAll(badDir, os.ModePerm))
require.NoError(t, os.Chmod(badDir, 0))
t.Cleanup(func() { _ = os.Chmod(filepath.Join(dir, "9"), os.ModePerm) })
res, err := b.Exists(common.ExistsPrm{Address: addr, StorageID: storageID})
require.Error(t, err)
require.False(t, res.Exists)
})
}

View file

@ -5,6 +5,7 @@ import oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
// ExistsPrm groups the parameters of Exists operation.
type ExistsPrm struct {
Address oid.Address
StorageID []byte
}
// ExistsRes groups the resulting values of Exists operation.

View file

@ -13,6 +13,13 @@ func (b *BlobStor) Exists(prm common.ExistsPrm) (common.ExistsRes, error) {
b.modeMtx.RLock()
defer b.modeMtx.RUnlock()
if prm.StorageID != nil {
if len(prm.StorageID) == 0 {
return b.storage[len(b.storage)-1].Storage.Exists(prm)
}
return b.storage[0].Storage.Exists(prm)
}
// If there was an error during existence check below,
// it will be returned unless object was found in blobovnicza.
// Otherwise, it is logged and the latest error is returned.

View file

@ -25,7 +25,20 @@ func TestExists(t *testing.T, cons Constructor, min, max uint64) {
var prm common.ExistsPrm
prm.Address = objects[0].addr
t.Run("without storage ID", func(t *testing.T) {
prm.StorageID = nil
res, err := s.Exists(prm)
require.NoError(t, err)
require.True(t, res.Exists)
})
t.Run("with storage ID", func(t *testing.T) {
prm.StorageID = objects[0].storageID
res, err := s.Exists(prm)
require.NoError(t, err)
require.True(t, res.Exists)
})
}