diff --git a/pkg/local_object_storage/blobovnicza/blobovnicza_test.go b/pkg/local_object_storage/blobovnicza/blobovnicza_test.go index c4c36ee0..82a8e7bc 100644 --- a/pkg/local_object_storage/blobovnicza/blobovnicza_test.go +++ b/pkg/local_object_storage/blobovnicza/blobovnicza_test.go @@ -33,13 +33,11 @@ func testAddress() *addressSDK.Address { return addr } -func testPutGet(t *testing.T, blz *Blobovnicza, sz uint64, assertErrPut, assertErrGet func(error) bool) *addressSDK.Address { +func testPutGet(t *testing.T, blz *Blobovnicza, addr *addressSDK.Address, sz uint64, assertErrPut, assertErrGet func(error) bool) *addressSDK.Address { // create binary object data := make([]byte, sz) + rand.Read(data) - addr := testAddress() - - // try to save object in Blobovnicza pPut := new(PutPrm) pPut.SetAddress(addr) pPut.SetMarshaledObject(data) @@ -106,7 +104,7 @@ func TestBlobovnicza(t *testing.T) { filled := uint64(15 * 1 << 10) // test object 15KB - addr := testPutGet(t, blz, filled, nil, nil) + addr := testPutGet(t, blz, testAddress(), filled, nil, nil) // remove the object dPrm := new(DeletePrm) @@ -120,11 +118,11 @@ func TestBlobovnicza(t *testing.T) { // fill Blobovnicza fully for ; filled < sizeLim; filled += objSizeLim { - testPutGet(t, blz, objSizeLim, nil, nil) + testPutGet(t, blz, testAddress(), objSizeLim, nil, nil) } // from now objects should not be saved - testPutGet(t, blz, 1024, func(err error) bool { + testPutGet(t, blz, testAddress(), 1024, func(err error) bool { return errors.Is(err, ErrFull) }, nil) diff --git a/pkg/local_object_storage/blobstor/exists.go b/pkg/local_object_storage/blobstor/exists.go index 1454b4bd..79f95e6f 100644 --- a/pkg/local_object_storage/blobstor/exists.go +++ b/pkg/local_object_storage/blobstor/exists.go @@ -2,9 +2,12 @@ package blobstor import ( "errors" + "path/filepath" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address" + "go.uber.org/zap" ) // ExistsPrm groups the parameters of Exists operation. @@ -29,20 +32,35 @@ func (r ExistsRes) Exists() bool { func (b *BlobStor) Exists(prm *ExistsPrm) (*ExistsRes, error) { // check presence in shallow dir first (cheaper) exists, err := b.existsBig(prm.addr) - if !exists { - // TODO: #1143 do smth if err != nil - // check presence in blobovnicza - exists, err = b.existsSmall(prm.addr) + // If there was an error during existence check below, + // it will be returned unless object was found in blobovnicza. + // Otherwise, it is logged and the latest error is returned. + // FSTree | Blobovnicza | Behaviour + // found | (not tried) | return true, nil + // not found | any result | return the result + // error | found | log the error, return true, nil + // error | not found | return the error + // error | error | log the first error, return the second + if !exists { + var smallErr error + + exists, smallErr = b.existsSmall(prm.addr) + if err != nil && (smallErr != nil || exists) { + b.log.Warn("error occured during object existence checking", + zap.Stringer("address", prm.addr), + zap.String("error", err.Error())) + err = nil + } + if err == nil { + err = smallErr + } } if err != nil { return nil, err } - - return &ExistsRes{ - exists: exists, - }, err + return &ExistsRes{exists: exists}, err } // checks if object is presented in shallow dir. @@ -55,8 +73,36 @@ func (b *BlobStor) existsBig(addr *addressSDK.Address) (bool, error) { return err == nil, err } -// checks if object is presented in blobovnicza. -func (b *BlobStor) existsSmall(_ *addressSDK.Address) (bool, error) { - // TODO: #1143 implement - return false, nil +// existsSmall checks if object is presented in blobovnicza. +func (b *BlobStor) existsSmall(addr *addressSDK.Address) (bool, error) { + return b.blobovniczas.existsSmall(addr) +} + +func (b *blobovniczas) existsSmall(addr *addressSDK.Address) (bool, error) { + activeCache := make(map[string]struct{}) + + prm := new(blobovnicza.GetPrm) + prm.SetAddress(addr) + + var found bool + err := b.iterateSortedLeaves(addr, func(p string) (bool, error) { + dirPath := filepath.Dir(p) + + _, ok := activeCache[dirPath] + + _, err := b.getObjectFromLevel(prm, p, !ok) + if err != nil { + if !blobovnicza.IsErrNotFound(err) { + b.log.Debug("could not get object from level", + zap.String("level", p), + zap.String("error", err.Error())) + } + } + + activeCache[dirPath] = struct{}{} + found = err == nil + return found, nil + }) + + return found, err } diff --git a/pkg/local_object_storage/blobstor/exists_test.go b/pkg/local_object_storage/blobstor/exists_test.go new file mode 100644 index 00000000..187ae4e7 --- /dev/null +++ b/pkg/local_object_storage/blobstor/exists_test.go @@ -0,0 +1,78 @@ +package blobstor + +import ( + "os" + "path/filepath" + "testing" + + objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + "github.com/stretchr/testify/require" +) + +func TestExists(t *testing.T) { + dir, err := os.MkdirTemp("", "neofs*") + require.NoError(t, err) + t.Cleanup(func() { _ = os.RemoveAll(dir) }) + + const smallSizeLimit = 512 + + b := New(WithRootPath(dir), + WithSmallSizeLimit(smallSizeLimit), + WithBlobovniczaShallowWidth(1)) // default width is 16, slow init + require.NoError(t, b.Open()) + require.NoError(t, b.Init()) + + objects := []*objectSDK.Object{ + testObject(smallSizeLimit / 2), + testObject(smallSizeLimit + 1), + } + + for i := range objects { + prm := new(PutPrm) + prm.SetObject(objects[i]) + _, err = b.Put(prm) + require.NoError(t, err) + } + + prm := new(ExistsPrm) + for i := range objects { + prm.SetAddress(objectCore.AddressOf(objects[i])) + + res, err := b.Exists(prm) + require.NoError(t, err) + require.True(t, res.Exists()) + } + + prm.SetAddress(testAddress()) + res, err := b.Exists(prm) + require.NoError(t, err) + require.False(t, res.Exists()) + + t.Run("corrupt direcrory", func(t *testing.T) { + var bigDir string + de, err := os.ReadDir(dir) + require.NoError(t, err) + for i := range de { + if de[i].Name() != blobovniczaDir { + bigDir = filepath.Join(dir, de[i].Name()) + break + } + } + require.NotEmpty(t, bigDir) + + require.NoError(t, os.Chmod(dir, 0)) + t.Cleanup(func() { require.NoError(t, os.Chmod(dir, b.fsTree.Permissions)) }) + + // Object exists, first error is logged. + prm.SetAddress(objectCore.AddressOf(objects[0])) + res, err := b.Exists(prm) + require.NoError(t, err) + require.True(t, res.Exists()) + + // Object doesn't exist, first error is returned. + prm.SetAddress(objectCore.AddressOf(objects[1])) + _, err = b.Exists(prm) + require.Error(t, err) + }) +}