metabase: Remove (*DB).IterateCoveredByTombstones
#1526
2 changed files with 0 additions and 128 deletions
|
@ -4,7 +4,6 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
|
@ -111,70 +110,6 @@ func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler)
|
|||
return nil
|
||||
}
|
||||
|
||||
// IterateCoveredByTombstones iterates over all objects in DB which are covered
|
||||
// by tombstone with string address from tss. Locked objects are not included
|
||||
// (do not confuse with objects of type LOCK).
|
||||
//
|
||||
// If h returns ErrInterruptIterator, nil returns immediately.
|
||||
// Returns other errors of h directly.
|
||||
//
|
||||
// Does not modify tss.
|
||||
func (db *DB) IterateCoveredByTombstones(ctx context.Context, tss map[string]oid.Address, h func(oid.Address) error) error {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
)
|
||||
defer func() {
|
||||
db.metrics.AddMethodDuration("IterateCoveredByTombstones", time.Since(startedAt), success)
|
||||
}()
|
||||
_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateCoveredByTombstones")
|
||||
defer span.End()
|
||||
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
return db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
return db.iterateCoveredByTombstones(tx, tss, h)
|
||||
})
|
||||
}
|
||||
|
||||
func (db *DB) iterateCoveredByTombstones(tx *bbolt.Tx, tss map[string]oid.Address, h func(oid.Address) error) error {
|
||||
bktGraveyard := tx.Bucket(graveyardBucketName)
|
||||
|
||||
err := bktGraveyard.ForEach(func(k, v []byte) error {
|
||||
var addr oid.Address
|
||||
if err := decodeAddressFromKey(&addr, v); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, ok := tss[addr.EncodeToString()]; ok {
|
||||
var addr oid.Address
|
||||
|
||||
err := decodeAddressFromKey(&addr, k)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not parse address of the object under tombstone: %w", err)
|
||||
}
|
||||
|
||||
if objectLocked(tx, addr.Container(), addr.Object()) {
|
||||
return nil
|
||||
}
|
||||
|
||||
return h(addr)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if errors.Is(err, ErrInterruptIterator) {
|
||||
err = nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID, *objectSDK.Object) error) error {
|
||||
var cid cid.ID
|
||||
var oid oid.ID
|
||||
|
|
|
@ -9,7 +9,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
|
@ -67,65 +66,3 @@ func putWithExpiration(t *testing.T, db *meta.DB, typ objectSDK.Type, expiresAt
|
|||
|
||||
return object2.AddressOf(obj)
|
||||
}
|
||||
|
||||
func TestDB_IterateCoveredByTombstones(t *testing.T) {
|
||||
db := newDB(t)
|
||||
defer func() { require.NoError(t, db.Close(context.Background())) }()
|
||||
|
||||
cnr := cidtest.ID()
|
||||
ts := oidtest.Address()
|
||||
protected1 := oidtest.Address()
|
||||
protected2 := oidtest.Address()
|
||||
protectedLocked := oidtest.Address()
|
||||
garbage := oidtest.Address()
|
||||
ts.SetContainer(cnr)
|
||||
protected1.SetContainer(cnr)
|
||||
protected2.SetContainer(cnr)
|
||||
protectedLocked.SetContainer(cnr)
|
||||
|
||||
var prm meta.InhumePrm
|
||||
var err error
|
||||
|
||||
prm.SetAddresses(protected1, protected2, protectedLocked)
|
||||
prm.SetTombstoneAddress(ts)
|
||||
|
||||
_, err = db.Inhume(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
|
||||
prm.SetAddresses(garbage)
|
||||
prm.SetGCMark()
|
||||
|
||||
_, err = db.Inhume(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
|
||||
var handled []oid.Address
|
||||
|
||||
tss := map[string]oid.Address{
|
||||
ts.EncodeToString(): ts,
|
||||
}
|
||||
|
||||
err = db.IterateCoveredByTombstones(context.Background(), tss, func(addr oid.Address) error {
|
||||
handled = append(handled, addr)
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, handled, 3)
|
||||
require.Contains(t, handled, protected1)
|
||||
require.Contains(t, handled, protected2)
|
||||
require.Contains(t, handled, protectedLocked)
|
||||
|
||||
err = db.Lock(context.Background(), protectedLocked.Container(), oidtest.ID(), []oid.ID{protectedLocked.Object()})
|
||||
require.NoError(t, err)
|
||||
|
||||
handled = handled[:0]
|
||||
|
||||
err = db.IterateCoveredByTombstones(context.Background(), tss, func(addr oid.Address) error {
|
||||
handled = append(handled, addr)
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, handled, 2)
|
||||
require.NotContains(t, handled, protectedLocked)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue