[#395] metabase: Do not add tombstone-on-tombstone records to graveyard
Creating tombstones for tombstones is prohibited in NeoFS system. Metabase graveyard contains records of the form {address: address}: key is an address of inhumed object, value is an address of the tombstone. To prevent creation tombstones for tombstones metabase must control incoming Inhume calls: * if Inhume target is a tombstone, then "grave" should not be added; * if {a1:a2} "grave" was created earlier and {a2: a3} "grave" came later, then first "grave" must be removed as tomb-on-tomb. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
bc5e04f502
commit
3ed0065455
2 changed files with 108 additions and 5 deletions
|
@ -1,7 +1,10 @@
|
||||||
package meta
|
package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
|
||||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
|
"github.com/pkg/errors"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -61,6 +64,8 @@ func Inhume(db *DB, target, tomb *objectSDK.Address) error {
|
||||||
|
|
||||||
const inhumeGCMarkValue = "GCMARK"
|
const inhumeGCMarkValue = "GCMARK"
|
||||||
|
|
||||||
|
var errBreakBucketForEach = errors.New("bucket ForEach break")
|
||||||
|
|
||||||
// Inhume marks objects as removed but not removes it from metabase.
|
// Inhume marks objects as removed but not removes it from metabase.
|
||||||
func (db *DB) Inhume(prm *InhumePrm) (res *InhumeRes, err error) {
|
func (db *DB) Inhume(prm *InhumePrm) (res *InhumeRes, err error) {
|
||||||
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
|
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||||
|
@ -69,6 +74,25 @@ func (db *DB) Inhume(prm *InhumePrm) (res *InhumeRes, err error) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var tombKey []byte
|
||||||
|
if prm.tomb != nil {
|
||||||
|
tombKey = addressKey(prm.tomb)
|
||||||
|
|
||||||
|
// it is forbidden to have a tomb-on-tomb in NeoFS,
|
||||||
|
// so graveyard keys must not be addresses of tombstones
|
||||||
|
|
||||||
|
// tombstones can be marked for GC in graveyard, so exclude this case
|
||||||
|
data := graveyard.Get(tombKey)
|
||||||
|
if data != nil && !bytes.Equal(data, []byte(inhumeGCMarkValue)) {
|
||||||
|
err := graveyard.Delete(tombKey)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "could not remove grave with tombstone key")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tombKey = []byte(inhumeGCMarkValue)
|
||||||
|
}
|
||||||
|
|
||||||
for i := range prm.target {
|
for i := range prm.target {
|
||||||
obj, err := db.get(tx, prm.target[i], false, true)
|
obj, err := db.get(tx, prm.target[i], false, true)
|
||||||
|
|
||||||
|
@ -86,15 +110,37 @@ func (db *DB) Inhume(prm *InhumePrm) (res *InhumeRes, err error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var val []byte
|
targetKey := addressKey(prm.target[i])
|
||||||
|
|
||||||
if prm.tomb != nil {
|
if prm.tomb != nil {
|
||||||
val = addressKey(prm.tomb)
|
targetIsTomb := false
|
||||||
} else {
|
|
||||||
val = []byte(inhumeGCMarkValue)
|
// iterate over graveyard and check if target address
|
||||||
|
// is the address of tombstone in graveyard.
|
||||||
|
err = graveyard.ForEach(func(k, v []byte) error {
|
||||||
|
// check if graveyard has record with key corresponding
|
||||||
|
// to tombstone address (at least one)
|
||||||
|
targetIsTomb = bytes.Equal(v, targetKey)
|
||||||
|
|
||||||
|
if targetIsTomb {
|
||||||
|
// break bucket iterator
|
||||||
|
return errBreakBucketForEach
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil && !errors.Is(err, errBreakBucketForEach) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// do not add grave if target is a tombstone
|
||||||
|
if targetIsTomb {
|
||||||
|
continue
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// consider checking if target is already in graveyard?
|
// consider checking if target is already in graveyard?
|
||||||
err = graveyard.Put(addressKey(prm.target[i]), val)
|
err = graveyard.Put(targetKey, tombKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||||
|
"github.com/pkg/errors"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -29,3 +30,59 @@ func TestDB_Inhume(t *testing.T) {
|
||||||
_, err = meta.Get(db, raw.Object().Address())
|
_, err = meta.Get(db, raw.Object().Address())
|
||||||
require.EqualError(t, err, object.ErrAlreadyRemoved.Error())
|
require.EqualError(t, err, object.ErrAlreadyRemoved.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestInhumeTombOnTomb(t *testing.T) {
|
||||||
|
db := newDB(t)
|
||||||
|
defer releaseDB(db)
|
||||||
|
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
|
||||||
|
addr1 = generateAddress()
|
||||||
|
addr2 = generateAddress()
|
||||||
|
addr3 = generateAddress()
|
||||||
|
inhumePrm = new(meta.InhumePrm)
|
||||||
|
existsPrm = new(meta.ExistsPrm)
|
||||||
|
)
|
||||||
|
|
||||||
|
// inhume addr1 via addr2
|
||||||
|
_, err = db.Inhume(inhumePrm.
|
||||||
|
WithAddresses(addr1).
|
||||||
|
WithTombstoneAddress(addr2),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// addr1 should become inhumed {addr1:addr2}
|
||||||
|
_, err = db.Exists(existsPrm.WithAddress(addr1))
|
||||||
|
require.True(t, errors.Is(err, object.ErrAlreadyRemoved))
|
||||||
|
|
||||||
|
// try to inhume addr3 via addr1
|
||||||
|
_, err = db.Inhume(inhumePrm.
|
||||||
|
WithAddresses(addr3).
|
||||||
|
WithTombstoneAddress(addr1),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// record with {addr1:addr2} should be removed from graveyard
|
||||||
|
// as a tomb-on-tomb
|
||||||
|
res, err := db.Exists(existsPrm.WithAddress(addr1))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.False(t, res.Exists())
|
||||||
|
|
||||||
|
// addr3 should be inhumed {addr3: addr1}
|
||||||
|
_, err = db.Exists(existsPrm.WithAddress(addr3))
|
||||||
|
require.True(t, errors.Is(err, object.ErrAlreadyRemoved))
|
||||||
|
|
||||||
|
// try to inhume addr1 (which is already a tombstone in graveyard)
|
||||||
|
_, err = db.Inhume(inhumePrm.
|
||||||
|
WithAddresses(addr1).
|
||||||
|
WithTombstoneAddress(generateAddress()),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// record with addr1 key should not appear in graveyard
|
||||||
|
// (tomb can not be inhumed)
|
||||||
|
res, err = db.Exists(existsPrm.WithAddress(addr1))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.False(t, res.Exists())
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue