metabase: Remove (*DB).IterateCoveredByTombstones #1526

Merged
dstepanov-yadro merged 1 commit from a-savchuk/frostfs-node:remove-unused-metabase-method into master 2024-11-29 13:19:32 +00:00
2 changed files with 0 additions and 128 deletions

View file

@ -4,7 +4,6 @@ import (
"bytes"
"context"
"errors"
"fmt"
"strconv"
"time"
@ -111,70 +110,6 @@ func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler)
return nil
}
// IterateCoveredByTombstones iterates over all objects in DB which are covered
// by tombstone with string address from tss. Locked objects are not included
// (do not confuse with objects of type LOCK).
//
// If h returns ErrInterruptIterator, nil returns immediately.
// Returns other errors of h directly.
//
// Does not modify tss.
func (db *DB) IterateCoveredByTombstones(ctx context.Context, tss map[string]oid.Address, h func(oid.Address) error) error {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("IterateCoveredByTombstones", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateCoveredByTombstones")
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return ErrDegradedMode
}
return db.boltDB.View(func(tx *bbolt.Tx) error {
return db.iterateCoveredByTombstones(tx, tss, h)
})
}
func (db *DB) iterateCoveredByTombstones(tx *bbolt.Tx, tss map[string]oid.Address, h func(oid.Address) error) error {
bktGraveyard := tx.Bucket(graveyardBucketName)
err := bktGraveyard.ForEach(func(k, v []byte) error {
var addr oid.Address
if err := decodeAddressFromKey(&addr, v); err != nil {
return err
}
if _, ok := tss[addr.EncodeToString()]; ok {
var addr oid.Address
err := decodeAddressFromKey(&addr, k)
if err != nil {
return fmt.Errorf("could not parse address of the object under tombstone: %w", err)
}
if objectLocked(tx, addr.Container(), addr.Object()) {
return nil
}
return h(addr)
}
return nil
})
if errors.Is(err, ErrInterruptIterator) {
err = nil
}
return err
}
func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID, *objectSDK.Object) error) error {
var cid cid.ID
var oid oid.ID

View file

@ -9,7 +9,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
@ -67,65 +66,3 @@ func putWithExpiration(t *testing.T, db *meta.DB, typ objectSDK.Type, expiresAt
return object2.AddressOf(obj)
}
func TestDB_IterateCoveredByTombstones(t *testing.T) {
db := newDB(t)
defer func() { require.NoError(t, db.Close(context.Background())) }()
cnr := cidtest.ID()
ts := oidtest.Address()
protected1 := oidtest.Address()
protected2 := oidtest.Address()
protectedLocked := oidtest.Address()
garbage := oidtest.Address()
ts.SetContainer(cnr)
protected1.SetContainer(cnr)
protected2.SetContainer(cnr)
protectedLocked.SetContainer(cnr)
var prm meta.InhumePrm
var err error
prm.SetAddresses(protected1, protected2, protectedLocked)
prm.SetTombstoneAddress(ts)
_, err = db.Inhume(context.Background(), prm)
require.NoError(t, err)
prm.SetAddresses(garbage)
prm.SetGCMark()
_, err = db.Inhume(context.Background(), prm)
require.NoError(t, err)
var handled []oid.Address
tss := map[string]oid.Address{
ts.EncodeToString(): ts,
}
err = db.IterateCoveredByTombstones(context.Background(), tss, func(addr oid.Address) error {
handled = append(handled, addr)
return nil
})
require.NoError(t, err)
require.Len(t, handled, 3)
require.Contains(t, handled, protected1)
require.Contains(t, handled, protected2)
require.Contains(t, handled, protectedLocked)
err = db.Lock(context.Background(), protectedLocked.Container(), oidtest.ID(), []oid.ID{protectedLocked.Object()})
require.NoError(t, err)
handled = handled[:0]
err = db.IterateCoveredByTombstones(context.Background(), tss, func(addr oid.Address) error {
handled = append(handled, addr)
return nil
})
require.NoError(t, err)
require.Len(t, handled, 2)
require.NotContains(t, handled, protectedLocked)
}