[#230] metabase: Return SplitInfoError in Exist method

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
support/v0.27
Alex Vanin 2020-12-03 12:31:51 +03:00
parent e2de95e3f6
commit bf9e938a3b
5 changed files with 27 additions and 6 deletions

View File

@ -1,11 +1,16 @@
package meta
import (
"errors"
"fmt"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"go.etcd.io/bbolt"
)
var ErrLackSplitInfo = errors.New("no split info on parent object")
// Exists returns ErrAlreadyRemoved if addr was marked as removed. Otherwise it
// returns true if addr is in primary index or false if it is not.
func (db *DB) Exists(addr *objectSDK.Address) (exists bool, err error) {
@ -33,7 +38,19 @@ func (db *DB) exists(tx *bbolt.Tx, addr *objectSDK.Address) (exists bool, err er
// if primary bucket is empty, then check if object exists in parent bucket
if inBucket(tx, parentBucketName(addr.ContainerID()), objKey) {
return true, nil
rawSplitInfo := getFromBucket(tx, rootBucketName(addr.ContainerID()), objKey)
if len(rawSplitInfo) == 0 {
return false, ErrLackSplitInfo
}
splitInfo := objectSDK.NewSplitInfo()
err := splitInfo.Unmarshal(rawSplitInfo)
if err != nil {
return false, fmt.Errorf("can't unmarshal split info from root index: %w", err)
}
return false, objectSDK.NewSplitInfoError(splitInfo)
}
// if parent bucket is empty, then check if object exists in tombstone bucket

View File

@ -1,6 +1,7 @@
package meta_test
import (
"errors"
"testing"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
@ -63,8 +64,9 @@ func TestDB_Exists(t *testing.T) {
err := db.Put(child.Object(), nil)
require.NoError(t, err)
exists, err := db.Exists(parent.Object().Address())
require.NoError(t, err)
require.True(t, exists)
_, err = db.Exists(parent.Object().Address())
var expectedErr *objectSDK.SplitInfoError
require.True(t, errors.As(err, &expectedErr))
})
}

View File

@ -37,7 +37,7 @@ func (db *DB) put(tx *bbolt.Tx, obj *object.Object, id *blobovnicza.ID, si *obje
isParent := si != nil
exists, err := db.exists(tx, obj.Address())
if err != nil {
if err != nil && !errors.As(err, &splitInfoError) {
return err
}

View File

@ -289,7 +289,7 @@ func (db *DB) selectObjectID(
}
ok, err := db.exists(tx, addr)
if err == nil && ok {
if (err == nil && ok) || errors.As(err, &splitInfoError) {
markAddressInCache(to, fNum, addrStr)
}
}

View File

@ -30,6 +30,8 @@ var (
splitPostfix = "_splitid"
userAttributePostfix = "_attr_"
splitInfoError *object.SplitInfoError // for errors.As comparisons
)
// primaryBucketName returns <CID>.