forked from TrueCloudLab/frostfs-node
[#378] metabase: Implement iterator over the objects under tombstones
Implement `DB.IterateCoveredByTombstones` method that iterates over graves and handles all objects under one of the tombstones. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
b961570766
commit
3d5169c4c9
2 changed files with 82 additions and 0 deletions
|
@ -109,3 +109,44 @@ func objectType(tx *bbolt.Tx, cid *container.ID, oidBytes []byte) object.Type {
|
|||
return object.TypeStorageGroup
|
||||
}
|
||||
}
|
||||
|
||||
// IterateCoveredByTombstones iterates over all objects in DB which are covered
|
||||
// by tombstone with string address from tss.
|
||||
//
|
||||
// If h returns ErrInterruptIterator, nil returns immediately.
|
||||
// Returns other errors of h directly.
|
||||
//
|
||||
// Does not modify tss.
|
||||
func (db *DB) IterateCoveredByTombstones(tss map[string]struct{}, h func(*object.Address) error) error {
|
||||
return db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
return db.iterateCoveredByTombstones(tx, tss, h)
|
||||
})
|
||||
}
|
||||
|
||||
func (db *DB) iterateCoveredByTombstones(tx *bbolt.Tx, tss map[string]struct{}, h func(*object.Address) error) error {
|
||||
bktGraveyard := tx.Bucket(graveyardBucketName)
|
||||
if bktGraveyard == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := bktGraveyard.ForEach(func(k, v []byte) error {
|
||||
if _, ok := tss[string(v)]; ok {
|
||||
addr := object.NewAddress()
|
||||
|
||||
err := addr.Parse(string(k))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not parse address of the object under tombstone")
|
||||
}
|
||||
|
||||
return h(addr)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if errors.Is(err, ErrInterruptIterator) {
|
||||
err = nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -56,3 +56,44 @@ func putWithExpiration(t *testing.T, db *meta.DB, typ object.Type, expiresAt uin
|
|||
|
||||
return obj.Address()
|
||||
}
|
||||
|
||||
func TestDB_IterateCoveredByTombstones(t *testing.T) {
|
||||
db := newDB(t)
|
||||
defer releaseDB(db)
|
||||
|
||||
ts := generateAddress()
|
||||
protected1 := generateAddress()
|
||||
protected2 := generateAddress()
|
||||
garbage := generateAddress()
|
||||
|
||||
prm := new(meta.InhumePrm)
|
||||
|
||||
var err error
|
||||
|
||||
_, err = db.Inhume(prm.
|
||||
WithTombstoneAddress(ts).
|
||||
WithAddresses(protected1, protected2),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = db.Inhume(prm.
|
||||
WithAddresses(garbage).
|
||||
WithGCMark(),
|
||||
)
|
||||
|
||||
var handled []*object.Address
|
||||
|
||||
tss := map[string]struct{}{
|
||||
ts.String(): {},
|
||||
}
|
||||
|
||||
err = db.IterateCoveredByTombstones(tss, func(addr *object.Address) error {
|
||||
handled = append(handled, addr)
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, handled, 2)
|
||||
require.Contains(t, handled, protected1)
|
||||
require.Contains(t, handled, protected2)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue