forked from TrueCloudLab/frostfs-node
[#199] Make exist check on all primary buckets in metabase
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
e478c0d024
commit
f6387a9b94
2 changed files with 95 additions and 27 deletions
|
@ -9,27 +9,41 @@ import (
|
||||||
// returns true if addr is in primary index or false if it is not.
|
// returns true if addr is in primary index or false if it is not.
|
||||||
func (db *DB) Exists(addr *objectSDK.Address) (exists bool, err error) {
|
func (db *DB) Exists(addr *objectSDK.Address) (exists bool, err error) {
|
||||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
// check graveyard first
|
exists, err = db.exists(tx, addr)
|
||||||
if inGraveyard(tx, addr) {
|
|
||||||
return ErrAlreadyRemoved
|
|
||||||
}
|
|
||||||
|
|
||||||
// if graveyard is empty, then check if object exists in primary bucket
|
return err
|
||||||
primaryBucket := tx.Bucket(primaryBucketName(addr.ContainerID()))
|
|
||||||
if primaryBucket == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// using `get` as `exists`: https://github.com/boltdb/bolt/issues/321
|
|
||||||
val := primaryBucket.Get(objectKey(addr.ObjectID()))
|
|
||||||
exists = len(val) != 0
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
})
|
||||||
|
|
||||||
return exists, err
|
return exists, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *DB) exists(tx *bbolt.Tx, addr *objectSDK.Address) (exists bool, err error) {
|
||||||
|
// check graveyard first
|
||||||
|
if inGraveyard(tx, addr) {
|
||||||
|
return false, ErrAlreadyRemoved
|
||||||
|
}
|
||||||
|
|
||||||
|
objKey := objectKey(addr.ObjectID())
|
||||||
|
|
||||||
|
// if graveyard is empty, then check if object exists in primary bucket
|
||||||
|
if inBucket(tx, primaryBucketName(addr.ContainerID()), objKey) {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// if primary bucket is empty, then check if object exists in parent bucket
|
||||||
|
if inBucket(tx, parentBucketName(addr.ContainerID()), objKey) {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// if parent bucket is empty, then check if object exists in tombstone bucket
|
||||||
|
if inBucket(tx, tombstoneBucketName(addr.ContainerID()), objKey) {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// if parent bucket is empty, then check if object exists in storage group bucket
|
||||||
|
return inBucket(tx, storageGroupBucketName(addr.ContainerID()), objKey), nil
|
||||||
|
}
|
||||||
|
|
||||||
// inGraveyard returns true if object was marked as removed.
|
// inGraveyard returns true if object was marked as removed.
|
||||||
func inGraveyard(tx *bbolt.Tx, addr *objectSDK.Address) bool {
|
func inGraveyard(tx *bbolt.Tx, addr *objectSDK.Address) bool {
|
||||||
graveyard := tx.Bucket(graveyardBucketName)
|
graveyard := tx.Bucket(graveyardBucketName)
|
||||||
|
@ -41,3 +55,16 @@ func inGraveyard(tx *bbolt.Tx, addr *objectSDK.Address) bool {
|
||||||
|
|
||||||
return len(tombstone) != 0
|
return len(tombstone) != 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// inBucket checks if key <key> is present in bucket <name>.
|
||||||
|
func inBucket(tx *bbolt.Tx, name, key []byte) bool {
|
||||||
|
bkt := tx.Bucket(name)
|
||||||
|
if bkt == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// using `get` as `exists`: https://github.com/boltdb/bolt/issues/321
|
||||||
|
val := bkt.Get(key)
|
||||||
|
|
||||||
|
return len(val) != 0
|
||||||
|
}
|
||||||
|
|
|
@ -3,7 +3,7 @@ package meta_test
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -11,19 +11,60 @@ func TestDB_Exists(t *testing.T) {
|
||||||
db := newDB(t)
|
db := newDB(t)
|
||||||
defer releaseDB(db)
|
defer releaseDB(db)
|
||||||
|
|
||||||
raw := generateRawObject(t)
|
t.Run("no object", func(t *testing.T) {
|
||||||
addAttribute(raw, "foo", "bar")
|
nonExist := generateRawObject(t)
|
||||||
|
exists, err := db.Exists(nonExist.Object().Address())
|
||||||
obj := object.NewFromV2(raw.ToV2())
|
|
||||||
|
|
||||||
exists, err := db.Exists(obj.Address())
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.False(t, exists)
|
require.False(t, exists)
|
||||||
|
})
|
||||||
|
|
||||||
err = db.Put(obj, nil)
|
t.Run("regular object", func(t *testing.T) {
|
||||||
|
regular := generateRawObject(t)
|
||||||
|
err := db.Put(regular.Object(), nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
exists, err = db.Exists(obj.Address())
|
exists, err := db.Exists(regular.Object().Address())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.True(t, exists)
|
require.True(t, exists)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("tombstone object", func(t *testing.T) {
|
||||||
|
ts := generateRawObject(t)
|
||||||
|
ts.SetType(objectSDK.TypeTombstone)
|
||||||
|
|
||||||
|
err := db.Put(ts.Object(), nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
exists, err := db.Exists(ts.Object().Address())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, exists)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("storage group object", func(t *testing.T) {
|
||||||
|
sg := generateRawObject(t)
|
||||||
|
sg.SetType(objectSDK.TypeStorageGroup)
|
||||||
|
|
||||||
|
err := db.Put(sg.Object(), nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
exists, err := db.Exists(sg.Object().Address())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, exists)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("virtual object", func(t *testing.T) {
|
||||||
|
cid := testCID()
|
||||||
|
parent := generateRawObjectWithCID(t, cid)
|
||||||
|
|
||||||
|
child := generateRawObjectWithCID(t, cid)
|
||||||
|
child.SetParent(parent.Object().SDK())
|
||||||
|
child.SetParentID(parent.Object().Address().ObjectID())
|
||||||
|
|
||||||
|
err := db.Put(child.Object(), nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
exists, err := db.Exists(parent.Object().Address())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, exists)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue