forked from TrueCloudLab/frostfs-node
[#128] metabase: Implement Delete method
Implement Delete method on DB structure that adds deleted addresses to tombstone index. Do not attach addresses from tombstone index to Select result. Return error from Get method if address is presented in tombstone index. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
2d319aa29c
commit
20ed7c0d61
4 changed files with 105 additions and 9 deletions
|
@ -26,16 +26,32 @@ func testSelect(t *testing.T, db *DB, fs objectSDK.SearchFilters, exp ...*object
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func testCID() *container.ID {
|
||||||
|
cs := [sha256.Size]byte{}
|
||||||
|
rand.Read(cs[:])
|
||||||
|
|
||||||
|
id := container.NewID()
|
||||||
|
id.SetSHA256(cs)
|
||||||
|
|
||||||
|
return id
|
||||||
|
}
|
||||||
|
|
||||||
|
func testOID() *objectSDK.ID {
|
||||||
|
cs := [sha256.Size]byte{}
|
||||||
|
rand.Read(cs[:])
|
||||||
|
|
||||||
|
id := objectSDK.NewID()
|
||||||
|
id.SetSHA256(cs)
|
||||||
|
|
||||||
|
return id
|
||||||
|
}
|
||||||
|
|
||||||
func TestDB(t *testing.T) {
|
func TestDB(t *testing.T) {
|
||||||
version := pkg.NewVersion()
|
version := pkg.NewVersion()
|
||||||
version.SetMajor(2)
|
version.SetMajor(2)
|
||||||
version.SetMinor(1)
|
version.SetMinor(1)
|
||||||
|
|
||||||
cs := [sha256.Size]byte{}
|
cid := testCID()
|
||||||
rand.Read(cs[:])
|
|
||||||
|
|
||||||
cid := container.NewID()
|
|
||||||
cid.SetSHA256(cs)
|
|
||||||
|
|
||||||
w, err := owner.NEO3WalletFromPublicKey(&test.DecodeKey(-1).PublicKey)
|
w, err := owner.NEO3WalletFromPublicKey(&test.DecodeKey(-1).PublicKey)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -43,9 +59,7 @@ func TestDB(t *testing.T) {
|
||||||
ownerID := owner.NewID()
|
ownerID := owner.NewID()
|
||||||
ownerID.SetNeo3Wallet(w)
|
ownerID.SetNeo3Wallet(w)
|
||||||
|
|
||||||
rand.Read(cs[:])
|
oid := testOID()
|
||||||
oid := objectSDK.NewID()
|
|
||||||
oid.SetSHA256(cs)
|
|
||||||
|
|
||||||
obj := object.NewRaw()
|
obj := object.NewRaw()
|
||||||
obj.SetID(oid)
|
obj.SetID(oid)
|
||||||
|
@ -100,3 +114,42 @@ func TestDB(t *testing.T) {
|
||||||
fs.AddFilter(k, v+"1", objectSDK.MatchStringEqual)
|
fs.AddFilter(k, v+"1", objectSDK.MatchStringEqual)
|
||||||
testSelect(t, db, fs)
|
testSelect(t, db, fs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDB_Delete(t *testing.T) {
|
||||||
|
path := "test.db"
|
||||||
|
|
||||||
|
bdb, err := bbolt.Open(path, 0600, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
bdb.Close()
|
||||||
|
os.Remove(path)
|
||||||
|
}()
|
||||||
|
|
||||||
|
db := NewDB(bdb)
|
||||||
|
|
||||||
|
obj := object.NewRaw()
|
||||||
|
obj.SetContainerID(testCID())
|
||||||
|
obj.SetID(testOID())
|
||||||
|
|
||||||
|
o := obj.Object()
|
||||||
|
|
||||||
|
require.NoError(t, db.Put(o))
|
||||||
|
|
||||||
|
addr := o.Address()
|
||||||
|
|
||||||
|
_, err = db.Get(addr)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fs := objectSDK.SearchFilters{}
|
||||||
|
fs.AddObjectContainerIDFilter(objectSDK.MatchStringEqual, o.GetContainerID())
|
||||||
|
|
||||||
|
testSelect(t, db, fs, o.Address())
|
||||||
|
|
||||||
|
require.NoError(t, db.Delete(addr))
|
||||||
|
|
||||||
|
_, err = db.Get(addr)
|
||||||
|
require.Error(t, err)
|
||||||
|
|
||||||
|
testSelect(t, db, fs)
|
||||||
|
}
|
||||||
|
|
31
pkg/local_object_storage/metabase/delete.go
Normal file
31
pkg/local_object_storage/metabase/delete.go
Normal file
|
@ -0,0 +1,31 @@
|
||||||
|
package meta
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"go.etcd.io/bbolt"
|
||||||
|
)
|
||||||
|
|
||||||
|
var tombstoneBucket = []byte("tombstones")
|
||||||
|
|
||||||
|
// Delete marks object as deleted.
|
||||||
|
func (db *DB) Delete(addr *object.Address) error {
|
||||||
|
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||||
|
bucket, err := tx.CreateBucketIfNotExists(tombstoneBucket)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "(%T) could not create tombstone bucket", db)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := bucket.Put(addressKey(addr), nil); err != nil {
|
||||||
|
return errors.Wrapf(err, "(%T) could not put to tombstone bucket", db)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func objectRemoved(tx *bbolt.Tx, addr []byte) bool {
|
||||||
|
tombstoneBucket := tx.Bucket(tombstoneBucket)
|
||||||
|
|
||||||
|
return tombstoneBucket != nil && tombstoneBucket.Get(addr) != nil
|
||||||
|
}
|
|
@ -15,12 +15,19 @@ func (db *DB) Get(addr *objectSDK.Address) (*object.Object, error) {
|
||||||
var obj *object.Object
|
var obj *object.Object
|
||||||
|
|
||||||
if err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
if err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
|
addrKey := addressKey(addr)
|
||||||
|
|
||||||
|
// check if object marked as deleted
|
||||||
|
if objectRemoved(tx, addrKey) {
|
||||||
|
return errNotFound
|
||||||
|
}
|
||||||
|
|
||||||
primaryBucket := tx.Bucket(primaryBucket)
|
primaryBucket := tx.Bucket(primaryBucket)
|
||||||
if primaryBucket == nil {
|
if primaryBucket == nil {
|
||||||
return errNotFound
|
return errNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
data := primaryBucket.Get(addressKey(addr))
|
data := primaryBucket.Get(addrKey)
|
||||||
if data == nil {
|
if data == nil {
|
||||||
return errNotFound
|
return errNotFound
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,6 +67,11 @@ func (db *DB) Select(fs object.SearchFilters) ([]*object.Address, error) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check if object marked as deleted
|
||||||
|
if objectRemoved(tx, k) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
addr := object.NewAddress()
|
addr := object.NewAddress()
|
||||||
if err := addr.Parse(string(k)); err != nil {
|
if err := addr.Parse(string(k)); err != nil {
|
||||||
// TODO: storage was broken, so we need to handle it
|
// TODO: storage was broken, so we need to handle it
|
||||||
|
|
Loading…
Reference in a new issue