[#230] metabase: Return SplitInfoError in Exist method

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2020-12-03 12:31:51 +03:00
parent e2de95e3f6
commit bf9e938a3b
5 changed files with 27 additions and 6 deletions

View file

@ -1,11 +1,16 @@
package meta package meta
import ( import (
"errors"
"fmt"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/core/object"
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
) )
var ErrLackSplitInfo = errors.New("no split info on parent object")
// Exists returns ErrAlreadyRemoved if addr was marked as removed. Otherwise it // Exists returns ErrAlreadyRemoved if addr was marked as removed. Otherwise it
// returns true if addr is in primary index or false if it is not. // returns true if addr is in primary index or false if it is not.
func (db *DB) Exists(addr *objectSDK.Address) (exists bool, err error) { func (db *DB) Exists(addr *objectSDK.Address) (exists bool, err error) {
@ -33,7 +38,19 @@ func (db *DB) exists(tx *bbolt.Tx, addr *objectSDK.Address) (exists bool, err er
// if primary bucket is empty, then check if object exists in parent bucket // if primary bucket is empty, then check if object exists in parent bucket
if inBucket(tx, parentBucketName(addr.ContainerID()), objKey) { if inBucket(tx, parentBucketName(addr.ContainerID()), objKey) {
return true, nil rawSplitInfo := getFromBucket(tx, rootBucketName(addr.ContainerID()), objKey)
if len(rawSplitInfo) == 0 {
return false, ErrLackSplitInfo
}
splitInfo := objectSDK.NewSplitInfo()
err := splitInfo.Unmarshal(rawSplitInfo)
if err != nil {
return false, fmt.Errorf("can't unmarshal split info from root index: %w", err)
}
return false, objectSDK.NewSplitInfoError(splitInfo)
} }
// if parent bucket is empty, then check if object exists in tombstone bucket // if parent bucket is empty, then check if object exists in tombstone bucket

View file

@ -1,6 +1,7 @@
package meta_test package meta_test
import ( import (
"errors"
"testing" "testing"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
@ -63,8 +64,9 @@ func TestDB_Exists(t *testing.T) {
err := db.Put(child.Object(), nil) err := db.Put(child.Object(), nil)
require.NoError(t, err) require.NoError(t, err)
exists, err := db.Exists(parent.Object().Address()) _, err = db.Exists(parent.Object().Address())
require.NoError(t, err)
require.True(t, exists) var expectedErr *objectSDK.SplitInfoError
require.True(t, errors.As(err, &expectedErr))
}) })
} }

View file

@ -37,7 +37,7 @@ func (db *DB) put(tx *bbolt.Tx, obj *object.Object, id *blobovnicza.ID, si *obje
isParent := si != nil isParent := si != nil
exists, err := db.exists(tx, obj.Address()) exists, err := db.exists(tx, obj.Address())
if err != nil { if err != nil && !errors.As(err, &splitInfoError) {
return err return err
} }

View file

@ -289,7 +289,7 @@ func (db *DB) selectObjectID(
} }
ok, err := db.exists(tx, addr) ok, err := db.exists(tx, addr)
if err == nil && ok { if (err == nil && ok) || errors.As(err, &splitInfoError) {
markAddressInCache(to, fNum, addrStr) markAddressInCache(to, fNum, addrStr)
} }
} }

View file

@ -30,6 +30,8 @@ var (
splitPostfix = "_splitid" splitPostfix = "_splitid"
userAttributePostfix = "_attr_" userAttributePostfix = "_attr_"
splitInfoError *object.SplitInfoError // for errors.As comparisons
) )
// primaryBucketName returns <CID>. // primaryBucketName returns <CID>.