diff --git a/pkg/local_object_storage/metabase/iterators.go b/pkg/local_object_storage/metabase/iterators.go index 7e1dc0e1..b8085048 100644 --- a/pkg/local_object_storage/metabase/iterators.go +++ b/pkg/local_object_storage/metabase/iterators.go @@ -109,3 +109,44 @@ func objectType(tx *bbolt.Tx, cid *container.ID, oidBytes []byte) object.Type { return object.TypeStorageGroup } } + +// IterateCoveredByTombstones iterates over all objects in DB which are covered +// by tombstone with string address from tss. +// +// If h returns ErrInterruptIterator, nil returns immediately. +// Returns other errors of h directly. +// +// Does not modify tss. +func (db *DB) IterateCoveredByTombstones(tss map[string]struct{}, h func(*object.Address) error) error { + return db.boltDB.View(func(tx *bbolt.Tx) error { + return db.iterateCoveredByTombstones(tx, tss, h) + }) +} + +func (db *DB) iterateCoveredByTombstones(tx *bbolt.Tx, tss map[string]struct{}, h func(*object.Address) error) error { + bktGraveyard := tx.Bucket(graveyardBucketName) + if bktGraveyard == nil { + return nil + } + + err := bktGraveyard.ForEach(func(k, v []byte) error { + if _, ok := tss[string(v)]; ok { + addr := object.NewAddress() + + err := addr.Parse(string(k)) + if err != nil { + return errors.Wrap(err, "could not parse address of the object under tombstone") + } + + return h(addr) + } + + return nil + }) + + if errors.Is(err, ErrInterruptIterator) { + err = nil + } + + return err +} diff --git a/pkg/local_object_storage/metabase/iterators_test.go b/pkg/local_object_storage/metabase/iterators_test.go index 16ea5075..f1abe01e 100644 --- a/pkg/local_object_storage/metabase/iterators_test.go +++ b/pkg/local_object_storage/metabase/iterators_test.go @@ -56,3 +56,44 @@ func putWithExpiration(t *testing.T, db *meta.DB, typ object.Type, expiresAt uin return obj.Address() } + +func TestDB_IterateCoveredByTombstones(t *testing.T) { + db := newDB(t) + defer releaseDB(db) + + ts := generateAddress() + protected1 := generateAddress() + protected2 := generateAddress() + garbage := generateAddress() + + prm := new(meta.InhumePrm) + + var err error + + _, err = db.Inhume(prm. + WithTombstoneAddress(ts). + WithAddresses(protected1, protected2), + ) + require.NoError(t, err) + + _, err = db.Inhume(prm. + WithAddresses(garbage). + WithGCMark(), + ) + + var handled []*object.Address + + tss := map[string]struct{}{ + ts.String(): {}, + } + + err = db.IterateCoveredByTombstones(tss, func(addr *object.Address) error { + handled = append(handled, addr) + return nil + }) + require.NoError(t, err) + + require.Len(t, handled, 2) + require.Contains(t, handled, protected1) + require.Contains(t, handled, protected2) +}