[#1143] blobstor: Implement existsSmall check

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-03-03 17:16:49 +03:00 committed by fyrchik
parent aa0cc1f824
commit 08e7914729
3 changed files with 141 additions and 19 deletions

View file

@ -33,13 +33,11 @@ func testAddress() *addressSDK.Address {
return addr return addr
} }
func testPutGet(t *testing.T, blz *Blobovnicza, sz uint64, assertErrPut, assertErrGet func(error) bool) *addressSDK.Address { func testPutGet(t *testing.T, blz *Blobovnicza, addr *addressSDK.Address, sz uint64, assertErrPut, assertErrGet func(error) bool) *addressSDK.Address {
// create binary object // create binary object
data := make([]byte, sz) data := make([]byte, sz)
rand.Read(data)
addr := testAddress()
// try to save object in Blobovnicza
pPut := new(PutPrm) pPut := new(PutPrm)
pPut.SetAddress(addr) pPut.SetAddress(addr)
pPut.SetMarshaledObject(data) pPut.SetMarshaledObject(data)
@ -106,7 +104,7 @@ func TestBlobovnicza(t *testing.T) {
filled := uint64(15 * 1 << 10) filled := uint64(15 * 1 << 10)
// test object 15KB // test object 15KB
addr := testPutGet(t, blz, filled, nil, nil) addr := testPutGet(t, blz, testAddress(), filled, nil, nil)
// remove the object // remove the object
dPrm := new(DeletePrm) dPrm := new(DeletePrm)
@ -120,11 +118,11 @@ func TestBlobovnicza(t *testing.T) {
// fill Blobovnicza fully // fill Blobovnicza fully
for ; filled < sizeLim; filled += objSizeLim { for ; filled < sizeLim; filled += objSizeLim {
testPutGet(t, blz, objSizeLim, nil, nil) testPutGet(t, blz, testAddress(), objSizeLim, nil, nil)
} }
// from now objects should not be saved // from now objects should not be saved
testPutGet(t, blz, 1024, func(err error) bool { testPutGet(t, blz, testAddress(), 1024, func(err error) bool {
return errors.Is(err, ErrFull) return errors.Is(err, ErrFull)
}, nil) }, nil)

View file

@ -2,9 +2,12 @@ package blobstor
import ( import (
"errors" "errors"
"path/filepath"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address" addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address"
"go.uber.org/zap"
) )
// ExistsPrm groups the parameters of Exists operation. // ExistsPrm groups the parameters of Exists operation.
@ -29,20 +32,35 @@ func (r ExistsRes) Exists() bool {
func (b *BlobStor) Exists(prm *ExistsPrm) (*ExistsRes, error) { func (b *BlobStor) Exists(prm *ExistsPrm) (*ExistsRes, error) {
// check presence in shallow dir first (cheaper) // check presence in shallow dir first (cheaper)
exists, err := b.existsBig(prm.addr) exists, err := b.existsBig(prm.addr)
if !exists {
// TODO: #1143 do smth if err != nil
// check presence in blobovnicza // If there was an error during existence check below,
exists, err = b.existsSmall(prm.addr) // it will be returned unless object was found in blobovnicza.
// Otherwise, it is logged and the latest error is returned.
// FSTree | Blobovnicza | Behaviour
// found | (not tried) | return true, nil
// not found | any result | return the result
// error | found | log the error, return true, nil
// error | not found | return the error
// error | error | log the first error, return the second
if !exists {
var smallErr error
exists, smallErr = b.existsSmall(prm.addr)
if err != nil && (smallErr != nil || exists) {
b.log.Warn("error occured during object existence checking",
zap.Stringer("address", prm.addr),
zap.String("error", err.Error()))
err = nil
}
if err == nil {
err = smallErr
}
} }
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &ExistsRes{exists: exists}, err
return &ExistsRes{
exists: exists,
}, err
} }
// checks if object is presented in shallow dir. // checks if object is presented in shallow dir.
@ -55,8 +73,36 @@ func (b *BlobStor) existsBig(addr *addressSDK.Address) (bool, error) {
return err == nil, err return err == nil, err
} }
// checks if object is presented in blobovnicza. // existsSmall checks if object is presented in blobovnicza.
func (b *BlobStor) existsSmall(_ *addressSDK.Address) (bool, error) { func (b *BlobStor) existsSmall(addr *addressSDK.Address) (bool, error) {
// TODO: #1143 implement return b.blobovniczas.existsSmall(addr)
return false, nil }
func (b *blobovniczas) existsSmall(addr *addressSDK.Address) (bool, error) {
activeCache := make(map[string]struct{})
prm := new(blobovnicza.GetPrm)
prm.SetAddress(addr)
var found bool
err := b.iterateSortedLeaves(addr, func(p string) (bool, error) {
dirPath := filepath.Dir(p)
_, ok := activeCache[dirPath]
_, err := b.getObjectFromLevel(prm, p, !ok)
if err != nil {
if !blobovnicza.IsErrNotFound(err) {
b.log.Debug("could not get object from level",
zap.String("level", p),
zap.String("error", err.Error()))
}
}
activeCache[dirPath] = struct{}{}
found = err == nil
return found, nil
})
return found, err
} }

View file

@ -0,0 +1,78 @@
package blobstor
import (
"os"
"path/filepath"
"testing"
objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
"github.com/stretchr/testify/require"
)
func TestExists(t *testing.T) {
dir, err := os.MkdirTemp("", "neofs*")
require.NoError(t, err)
t.Cleanup(func() { _ = os.RemoveAll(dir) })
const smallSizeLimit = 512
b := New(WithRootPath(dir),
WithSmallSizeLimit(smallSizeLimit),
WithBlobovniczaShallowWidth(1)) // default width is 16, slow init
require.NoError(t, b.Open())
require.NoError(t, b.Init())
objects := []*objectSDK.Object{
testObject(smallSizeLimit / 2),
testObject(smallSizeLimit + 1),
}
for i := range objects {
prm := new(PutPrm)
prm.SetObject(objects[i])
_, err = b.Put(prm)
require.NoError(t, err)
}
prm := new(ExistsPrm)
for i := range objects {
prm.SetAddress(objectCore.AddressOf(objects[i]))
res, err := b.Exists(prm)
require.NoError(t, err)
require.True(t, res.Exists())
}
prm.SetAddress(testAddress())
res, err := b.Exists(prm)
require.NoError(t, err)
require.False(t, res.Exists())
t.Run("corrupt direcrory", func(t *testing.T) {
var bigDir string
de, err := os.ReadDir(dir)
require.NoError(t, err)
for i := range de {
if de[i].Name() != blobovniczaDir {
bigDir = filepath.Join(dir, de[i].Name())
break
}
}
require.NotEmpty(t, bigDir)
require.NoError(t, os.Chmod(dir, 0))
t.Cleanup(func() { require.NoError(t, os.Chmod(dir, b.fsTree.Permissions)) })
// Object exists, first error is logged.
prm.SetAddress(objectCore.AddressOf(objects[0]))
res, err := b.Exists(prm)
require.NoError(t, err)
require.True(t, res.Exists())
// Object doesn't exist, first error is returned.
prm.SetAddress(objectCore.AddressOf(objects[1]))
_, err = b.Exists(prm)
require.Error(t, err)
})
}