forked from TrueCloudLab/frostfs-node
[#230] metabase: Return SplitInfoError in Exist method
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
e2de95e3f6
commit
bf9e938a3b
5 changed files with 27 additions and 6 deletions
|
@ -1,11 +1,16 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
var ErrLackSplitInfo = errors.New("no split info on parent object")
|
||||
|
||||
// Exists returns ErrAlreadyRemoved if addr was marked as removed. Otherwise it
|
||||
// returns true if addr is in primary index or false if it is not.
|
||||
func (db *DB) Exists(addr *objectSDK.Address) (exists bool, err error) {
|
||||
|
@ -33,7 +38,19 @@ func (db *DB) exists(tx *bbolt.Tx, addr *objectSDK.Address) (exists bool, err er
|
|||
|
||||
// if primary bucket is empty, then check if object exists in parent bucket
|
||||
if inBucket(tx, parentBucketName(addr.ContainerID()), objKey) {
|
||||
return true, nil
|
||||
rawSplitInfo := getFromBucket(tx, rootBucketName(addr.ContainerID()), objKey)
|
||||
if len(rawSplitInfo) == 0 {
|
||||
return false, ErrLackSplitInfo
|
||||
}
|
||||
|
||||
splitInfo := objectSDK.NewSplitInfo()
|
||||
|
||||
err := splitInfo.Unmarshal(rawSplitInfo)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("can't unmarshal split info from root index: %w", err)
|
||||
}
|
||||
|
||||
return false, objectSDK.NewSplitInfoError(splitInfo)
|
||||
}
|
||||
|
||||
// if parent bucket is empty, then check if object exists in tombstone bucket
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package meta_test
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
|
@ -63,8 +64,9 @@ func TestDB_Exists(t *testing.T) {
|
|||
err := db.Put(child.Object(), nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
exists, err := db.Exists(parent.Object().Address())
|
||||
require.NoError(t, err)
|
||||
require.True(t, exists)
|
||||
_, err = db.Exists(parent.Object().Address())
|
||||
|
||||
var expectedErr *objectSDK.SplitInfoError
|
||||
require.True(t, errors.As(err, &expectedErr))
|
||||
})
|
||||
}
|
||||
|
|
|
@ -37,7 +37,7 @@ func (db *DB) put(tx *bbolt.Tx, obj *object.Object, id *blobovnicza.ID, si *obje
|
|||
isParent := si != nil
|
||||
|
||||
exists, err := db.exists(tx, obj.Address())
|
||||
if err != nil {
|
||||
if err != nil && !errors.As(err, &splitInfoError) {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -289,7 +289,7 @@ func (db *DB) selectObjectID(
|
|||
}
|
||||
|
||||
ok, err := db.exists(tx, addr)
|
||||
if err == nil && ok {
|
||||
if (err == nil && ok) || errors.As(err, &splitInfoError) {
|
||||
markAddressInCache(to, fNum, addrStr)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,8 @@ var (
|
|||
splitPostfix = "_splitid"
|
||||
|
||||
userAttributePostfix = "_attr_"
|
||||
|
||||
splitInfoError *object.SplitInfoError // for errors.As comparisons
|
||||
)
|
||||
|
||||
// primaryBucketName returns <CID>.
|
||||
|
|
Loading…
Reference in a new issue