metabase: optimize Inhume operation #1385

Open
dstepanov-yadro wants to merge 3 commits from dstepanov-yadro/frostfs-node:fix/metabase_inhume_hangs into master
20 changed files with 126 additions and 77 deletions

View file

@ -199,7 +199,9 @@ func TestLockExpiration(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
var inhumePrm InhumePrm var inhumePrm InhumePrm
inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj)) tombAddr := oidtest.Address()
tombAddr.SetContainer(cnr)
inhumePrm.WithTarget(tombAddr, objectcore.AddressOf(obj))
var objLockedErr *apistatus.ObjectLocked var objLockedErr *apistatus.ObjectLocked
_, err = e.Inhume(context.Background(), inhumePrm) _, err = e.Inhume(context.Background(), inhumePrm)
@ -209,7 +211,9 @@ func TestLockExpiration(t *testing.T) {
e.HandleNewEpoch(context.Background(), lockerExpiresAfter+1) e.HandleNewEpoch(context.Background(), lockerExpiresAfter+1)
// 4. // 4.
inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj)) tombAddr = oidtest.Address()
tombAddr.SetContainer(cnr)
inhumePrm.WithTarget(tombAddr, objectcore.AddressOf(obj))
require.Eventually(t, func() bool { require.Eventually(t, func() bool {
_, err = e.Inhume(context.Background(), inhumePrm) _, err = e.Inhume(context.Background(), inhumePrm)

View file

@ -67,7 +67,7 @@ func TestDB_Containers(t *testing.T) {
assertContains(cnrs, cnr) assertContains(cnrs, cnr)
require.NoError(t, metaInhume(db, object.AddressOf(obj), oidtest.Address())) require.NoError(t, metaInhume(db, object.AddressOf(obj), oidtest.ID()))
cnrs, err = db.Containers(context.Background()) cnrs, err = db.Containers(context.Background())
require.NoError(t, err) require.NoError(t, err)
@ -164,7 +164,7 @@ func TestDB_ContainerSize(t *testing.T) {
require.NoError(t, metaInhume( require.NoError(t, metaInhume(
db, db,
object.AddressOf(obj), object.AddressOf(obj),
oidtest.Address(), oidtest.ID(),
)) ))
volume -= int(obj.PayloadSize()) volume -= int(obj.PayloadSize())

View file

@ -41,7 +41,7 @@ func TestReset(t *testing.T) {
err = putBig(db, obj) err = putBig(db, obj)
require.NoError(t, err) require.NoError(t, err)
err = metaInhume(db, addrToInhume, oidtest.Address()) err = metaInhume(db, addrToInhume, oidtest.ID())
require.NoError(t, err) require.NoError(t, err)
assertExists(addr, true, nil) assertExists(addr, true, nil)

View file

@ -654,7 +654,7 @@ func (db *DB) DeleteContainerSize(ctx context.Context, id cid.ID) error {
return ErrReadOnlyMode return ErrReadOnlyMode
} }
err := db.boltDB.Update(func(tx *bbolt.Tx) error { err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(containerVolumeBucketName) b := tx.Bucket(containerVolumeBucketName)
key := make([]byte, cidSize) key := make([]byte, cidSize)
@ -737,7 +737,7 @@ func (db *DB) DeleteContainerCount(ctx context.Context, id cid.ID) error {
return ErrReadOnlyMode return ErrReadOnlyMode
} }
err := db.boltDB.Update(func(tx *bbolt.Tx) error { err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(containerCounterBucketName) b := tx.Bucket(containerCounterBucketName)
key := make([]byte, cidSize) key := make([]byte, cidSize)

View file

@ -156,13 +156,18 @@ func TestCounters(t *testing.T) {
} }
var prm meta.InhumePrm var prm meta.InhumePrm
prm.SetTombstoneAddress(oidtest.Address()) for _, o := range inhumedObjs {
prm.SetAddresses(inhumedObjs...) tombAddr := oidtest.Address()
tombAddr.SetContainer(o.Container())
prm.SetTombstoneAddress(tombAddr)
prm.SetAddresses(o)
res, err := db.Inhume(context.Background(), prm) res, err := db.Inhume(context.Background(), prm)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(len(inhumedObjs)), res.LogicInhumed()) require.Equal(t, uint64(1), res.LogicInhumed())
require.Equal(t, uint64(len(inhumedObjs)), res.UserInhumed()) require.Equal(t, uint64(1), res.UserInhumed())
}
c, err := db.ObjectCounters() c, err := db.ObjectCounters()
require.NoError(t, err) require.NoError(t, err)
@ -296,11 +301,16 @@ func TestCounters(t *testing.T) {
} }
var prm meta.InhumePrm var prm meta.InhumePrm
prm.SetTombstoneAddress(oidtest.Address()) for _, o := range inhumedObjs {
prm.SetAddresses(inhumedObjs...) tombAddr := oidtest.Address()
tombAddr.SetContainer(o.Container())
prm.SetTombstoneAddress(tombAddr)
prm.SetAddresses(o)
_, err := db.Inhume(context.Background(), prm) _, err := db.Inhume(context.Background(), prm)
require.NoError(t, err) require.NoError(t, err)
}
c, err := db.ObjectCounters() c, err := db.ObjectCounters()
require.NoError(t, err) require.NoError(t, err)

View file

@ -112,7 +112,7 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
var err error var err error
var res DeleteRes var res DeleteRes
err = db.boltDB.Update(func(tx *bbolt.Tx) error { err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
res, err = db.deleteGroup(tx, prm.addrs) res, err = db.deleteGroup(tx, prm.addrs)
return err return err
}) })

View file

@ -40,12 +40,12 @@ func TestDB_Delete(t *testing.T) {
// inhume parent and child so they will be on graveyard // inhume parent and child so they will be on graveyard
ts := testutil.GenerateObjectWithCID(cnr) ts := testutil.GenerateObjectWithCID(cnr)
err = metaInhume(db, object.AddressOf(child), object.AddressOf(ts)) err = metaInhume(db, object.AddressOf(child), object.AddressOf(ts).Object())
require.NoError(t, err) require.NoError(t, err)
ts = testutil.GenerateObjectWithCID(cnr) ts = testutil.GenerateObjectWithCID(cnr)
err = metaInhume(db, object.AddressOf(parent), object.AddressOf(ts)) err = metaInhume(db, object.AddressOf(parent), object.AddressOf(ts).Object())
require.NoError(t, err) require.NoError(t, err)
// delete object // delete object
@ -108,7 +108,7 @@ func TestGraveOnlyDelete(t *testing.T) {
addr := oidtest.Address() addr := oidtest.Address()
// inhume non-existent object by address // inhume non-existent object by address
require.NoError(t, metaInhume(db, addr, oidtest.Address())) require.NoError(t, metaInhume(db, addr, oidtest.ID()))
// delete the object data // delete the object data
require.NoError(t, metaDelete(db, addr)) require.NoError(t, metaDelete(db, addr))

View file

@ -37,7 +37,7 @@ func TestDB_Exists(t *testing.T) {
require.True(t, exists) require.True(t, exists)
t.Run("removed object", func(t *testing.T) { t.Run("removed object", func(t *testing.T) {
err := metaInhume(db, object.AddressOf(regular), oidtest.Address()) err := metaInhume(db, object.AddressOf(regular), oidtest.ID())
require.NoError(t, err) require.NoError(t, err)
exists, err := metaExists(db, object.AddressOf(regular)) exists, err := metaExists(db, object.AddressOf(regular))

View file

@ -150,9 +150,8 @@ func TestDB_Get(t *testing.T) {
t.Run("get removed object", func(t *testing.T) { t.Run("get removed object", func(t *testing.T) {
obj := oidtest.Address() obj := oidtest.Address()
ts := oidtest.Address()
require.NoError(t, metaInhume(db, obj, ts)) require.NoError(t, metaInhume(db, obj, oidtest.ID()))
_, err := metaGet(db, obj, false) _, err := metaGet(db, obj, false)
require.True(t, client.IsErrObjectAlreadyRemoved(err)) require.True(t, client.IsErrObjectAlreadyRemoved(err))

View file

@ -282,7 +282,7 @@ func (db *DB) DropGraves(ctx context.Context, tss []TombstonedObject) error {
buf := make([]byte, addressKeySize) buf := make([]byte, addressKeySize)
return db.boltDB.Update(func(tx *bbolt.Tx) error { return db.boltDB.Batch(func(tx *bbolt.Tx) error {
bkt := tx.Bucket(graveyardBucketName) bkt := tx.Bucket(graveyardBucketName)
if bkt == nil { if bkt == nil {
return nil return nil

View file

@ -7,6 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -114,11 +115,12 @@ func TestDB_IterateDeletedObjects(t *testing.T) {
db := newDB(t) db := newDB(t)
defer func() { require.NoError(t, db.Close()) }() defer func() { require.NoError(t, db.Close()) }()
cnr := cidtest.ID()
// generate and put 4 objects // generate and put 4 objects
obj1 := testutil.GenerateObject() obj1 := testutil.GenerateObjectWithCID(cnr)
obj2 := testutil.GenerateObject() obj2 := testutil.GenerateObjectWithCID(cnr)
obj3 := testutil.GenerateObject() obj3 := testutil.GenerateObjectWithCID(cnr)
obj4 := testutil.GenerateObject() obj4 := testutil.GenerateObjectWithCID(cnr)
var err error var err error
@ -138,6 +140,7 @@ func TestDB_IterateDeletedObjects(t *testing.T) {
// inhume with tombstone // inhume with tombstone
addrTombstone := oidtest.Address() addrTombstone := oidtest.Address()
addrTombstone.SetContainer(cnr)
inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2)) inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2))
inhumePrm.SetTombstoneAddress(addrTombstone) inhumePrm.SetTombstoneAddress(addrTombstone)
@ -201,11 +204,12 @@ func TestDB_IterateOverGraveyard_Offset(t *testing.T) {
db := newDB(t) db := newDB(t)
defer func() { require.NoError(t, db.Close()) }() defer func() { require.NoError(t, db.Close()) }()
cnr := cidtest.ID()
// generate and put 4 objects // generate and put 4 objects
obj1 := testutil.GenerateObject() obj1 := testutil.GenerateObjectWithCID(cnr)
obj2 := testutil.GenerateObject() obj2 := testutil.GenerateObjectWithCID(cnr)
obj3 := testutil.GenerateObject() obj3 := testutil.GenerateObjectWithCID(cnr)
obj4 := testutil.GenerateObject() obj4 := testutil.GenerateObjectWithCID(cnr)
var err error var err error
@ -223,6 +227,7 @@ func TestDB_IterateOverGraveyard_Offset(t *testing.T) {
// inhume with tombstone // inhume with tombstone
addrTombstone := oidtest.Address() addrTombstone := oidtest.Address()
addrTombstone.SetContainer(cnr)
var inhumePrm meta.InhumePrm var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses( inhumePrm.SetAddresses(
@ -392,9 +397,10 @@ func TestDB_DropGraves(t *testing.T) {
db := newDB(t) db := newDB(t)
defer func() { require.NoError(t, db.Close()) }() defer func() { require.NoError(t, db.Close()) }()
cnr := cidtest.ID()
// generate and put 2 objects // generate and put 2 objects
obj1 := testutil.GenerateObject() obj1 := testutil.GenerateObjectWithCID(cnr)
obj2 := testutil.GenerateObject() obj2 := testutil.GenerateObjectWithCID(cnr)
var err error var err error
@ -406,6 +412,7 @@ func TestDB_DropGraves(t *testing.T) {
// inhume with tombstone // inhume with tombstone
addrTombstone := oidtest.Address() addrTombstone := oidtest.Address()
addrTombstone.SetContainer(cnr)
var inhumePrm meta.InhumePrm var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2)) inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2))

View file

@ -143,6 +143,20 @@ func (p *InhumePrm) SetForceGCMark() {
p.forceRemoval = true p.forceRemoval = true
} }
func (p *InhumePrm) validate() error {
if p == nil {
return nil
}
if p.tomb != nil {
for _, addr := range p.target {
if addr.Container() != p.tomb.Container() {
return fmt.Errorf("object %s and tombstone %s have different container ID", addr, p.tomb)
}
}
}
return nil
}
var errBreakBucketForEach = errors.New("bucket ForEach break") var errBreakBucketForEach = errors.New("bucket ForEach break")
// ErrLockObjectRemoval is returned when inhume operation is being // ErrLockObjectRemoval is returned when inhume operation is being
@ -171,6 +185,10 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
db.modeMtx.RLock() db.modeMtx.RLock()
defer db.modeMtx.RUnlock() defer db.modeMtx.RUnlock()
if err := prm.validate(); err != nil {
return InhumeRes{}, err
}
if db.mode.NoMetabase() { if db.mode.NoMetabase() {
return InhumeRes{}, ErrDegradedMode return InhumeRes{}, ErrDegradedMode
} else if db.mode.ReadOnly() { } else if db.mode.ReadOnly() {
@ -181,7 +199,7 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
inhumedByCnrID: make(map[cid.ID]ObjectCounters), inhumedByCnrID: make(map[cid.ID]ObjectCounters),
} }
currEpoch := db.epochState.CurrentEpoch() currEpoch := db.epochState.CurrentEpoch()
err := db.boltDB.Update(func(tx *bbolt.Tx) error { err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
return db.inhumeTx(tx, currEpoch, prm, &res) return db.inhumeTx(tx, currEpoch, prm, &res)
}) })
success = err == nil success = err == nil
@ -359,11 +377,8 @@ func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Buck
return targetBucket, value, nil return targetBucket, value, nil
} }
func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool, error) { func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, addressKey []byte) (bool, error) {
targetIsTomb, err := isTomb(graveyardBKT, key) targetIsTomb := isTomb(graveyardBKT, addressKey)
if err != nil {
return false, err
}
// do not add grave if target is a tombstone // do not add grave if target is a tombstone
if targetIsTomb { if targetIsTomb {
@ -372,7 +387,7 @@ func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool
// if tombstone appears object must be // if tombstone appears object must be
// additionally marked with GC // additionally marked with GC
return false, garbageBKT.Put(key, zeroValue) return false, garbageBKT.Put(addressKey, zeroValue)
} }
func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error { func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error {
@ -392,25 +407,21 @@ func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Buc
return nil return nil
} }
func isTomb(graveyardBucket *bbolt.Bucket, key []byte) (bool, error) { func isTomb(graveyardBucket *bbolt.Bucket, addressKey []byte) bool {
targetIsTomb := false targetIsTomb := false
// iterate over graveyard and check if target address // iterate over graveyard and check if target address
// is the address of tombstone in graveyard. // is the address of tombstone in graveyard.
err := graveyardBucket.ForEach(func(_, v []byte) error { // tombstone must have the same container ID as key.
c := graveyardBucket.Cursor()
containerPrefix := addressKey[:cidSize]
for k, v := c.Seek(containerPrefix); k != nil && bytes.HasPrefix(k, containerPrefix); k, v = c.Next() {
// check if graveyard has record with key corresponding // check if graveyard has record with key corresponding
// to tombstone address (at least one) // to tombstone address (at least one)
targetIsTomb = bytes.Equal(v, key) targetIsTomb = bytes.Equal(v, addressKey)
if targetIsTomb { if targetIsTomb {
// break bucket iterator break
return errBreakBucketForEach
} }
return nil
})
if err != nil && !errors.Is(err, errBreakBucketForEach) {
return false, err
} }
return targetIsTomb, nil return targetIsTomb
} }

View file

@ -9,6 +9,7 @@ import (
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -21,12 +22,10 @@ func TestDB_Inhume(t *testing.T) {
raw := testutil.GenerateObject() raw := testutil.GenerateObject()
testutil.AddAttribute(raw, "foo", "bar") testutil.AddAttribute(raw, "foo", "bar")
tombstoneID := oidtest.Address()
err := putBig(db, raw) err := putBig(db, raw)
require.NoError(t, err) require.NoError(t, err)
err = metaInhume(db, object.AddressOf(raw), tombstoneID) err = metaInhume(db, object.AddressOf(raw), oidtest.ID())
require.NoError(t, err) require.NoError(t, err)
_, err = metaExists(db, object.AddressOf(raw)) _, err = metaExists(db, object.AddressOf(raw))
@ -43,13 +42,20 @@ func TestInhumeTombOnTomb(t *testing.T) {
var ( var (
err error err error
cnr = cidtest.ID()
addr1 = oidtest.Address() addr1 = oidtest.Address()
addr2 = oidtest.Address() addr2 = oidtest.Address()
addr3 = oidtest.Address() addr3 = oidtest.Address()
addr4 = oidtest.Address()
inhumePrm meta.InhumePrm inhumePrm meta.InhumePrm
existsPrm meta.ExistsPrm existsPrm meta.ExistsPrm
) )
addr1.SetContainer(cnr)
addr2.SetContainer(cnr)
addr3.SetContainer(cnr)
addr4.SetContainer(cnr)
inhumePrm.SetAddresses(addr1) inhumePrm.SetAddresses(addr1)
inhumePrm.SetTombstoneAddress(addr2) inhumePrm.SetTombstoneAddress(addr2)
@ -84,7 +90,7 @@ func TestInhumeTombOnTomb(t *testing.T) {
require.True(t, client.IsErrObjectAlreadyRemoved(err)) require.True(t, client.IsErrObjectAlreadyRemoved(err))
inhumePrm.SetAddresses(addr1) inhumePrm.SetAddresses(addr1)
inhumePrm.SetTombstoneAddress(oidtest.Address()) inhumePrm.SetTombstoneAddress(addr4)
// try to inhume addr1 (which is already a tombstone in graveyard) // try to inhume addr1 (which is already a tombstone in graveyard)
_, err = db.Inhume(context.Background(), inhumePrm) _, err = db.Inhume(context.Background(), inhumePrm)
@ -117,10 +123,13 @@ func TestInhumeLocked(t *testing.T) {
require.ErrorAs(t, err, &e) require.ErrorAs(t, err, &e)
} }
func metaInhume(db *meta.DB, target, tomb oid.Address) error { func metaInhume(db *meta.DB, target oid.Address, tomb oid.ID) error {
var inhumePrm meta.InhumePrm var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(target) inhumePrm.SetAddresses(target)
inhumePrm.SetTombstoneAddress(tomb) var tombAddr oid.Address
tombAddr.SetContainer(target.Container())
tombAddr.SetObject(tomb)
inhumePrm.SetTombstoneAddress(tombAddr)
_, err := db.Inhume(context.Background(), inhumePrm) _, err := db.Inhume(context.Background(), inhumePrm)
return err return err

View file

@ -9,6 +9,7 @@ import (
object2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" object2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
@ -71,11 +72,16 @@ func TestDB_IterateCoveredByTombstones(t *testing.T) {
db := newDB(t) db := newDB(t)
defer func() { require.NoError(t, db.Close()) }() defer func() { require.NoError(t, db.Close()) }()
cnr := cidtest.ID()
ts := oidtest.Address() ts := oidtest.Address()
protected1 := oidtest.Address() protected1 := oidtest.Address()
protected2 := oidtest.Address() protected2 := oidtest.Address()
protectedLocked := oidtest.Address() protectedLocked := oidtest.Address()
garbage := oidtest.Address() garbage := oidtest.Address()
ts.SetContainer(cnr)
protected1.SetContainer(cnr)
protected2.SetContainer(cnr)
protectedLocked.SetContainer(cnr)
var prm meta.InhumePrm var prm meta.InhumePrm
var err error var err error

View file

@ -110,7 +110,7 @@ func TestLisObjectsWithCursor(t *testing.T) {
err = putBig(db, obj) err = putBig(db, obj)
require.NoError(t, err) require.NoError(t, err)
ts := testutil.GenerateObjectWithCID(containerID) ts := testutil.GenerateObjectWithCID(containerID)
err = metaInhume(db, object.AddressOf(obj), object.AddressOf(ts)) err = metaInhume(db, object.AddressOf(obj), object.AddressOf(ts).Object())
require.NoError(t, err) require.NoError(t, err)
// add one child object (do not include parent into expected) // add one child object (do not include parent into expected)

View file

@ -78,7 +78,7 @@ func (db *DB) lockInternal(locked []oid.ID, cnr cid.ID, locker oid.ID) error {
} }
key := make([]byte, cidSize) key := make([]byte, cidSize)
return metaerr.Wrap(db.boltDB.Update(func(tx *bbolt.Tx) error { return metaerr.Wrap(db.boltDB.Batch(func(tx *bbolt.Tx) error {
if firstIrregularObjectType(tx, cnr, bucketKeysLocked...) != objectSDK.TypeRegular { if firstIrregularObjectType(tx, cnr, bucketKeysLocked...) != objectSDK.TypeRegular {
return logicerr.Wrap(new(apistatus.LockNonRegularObject)) return logicerr.Wrap(new(apistatus.LockNonRegularObject))
} }
@ -143,7 +143,7 @@ func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) {
var unlockedObjects []oid.Address var unlockedObjects []oid.Address
if err := db.boltDB.Update(func(tx *bbolt.Tx) error { if err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
for i := range lockers { for i := range lockers {
unlocked, err := freePotentialLocks(tx, lockers[i].Container(), lockers[i].Object()) unlocked, err := freePotentialLocks(tx, lockers[i].Container(), lockers[i].Object())
if err != nil { if err != nil {

View file

@ -73,7 +73,9 @@ func TestDB_Lock(t *testing.T) {
_, err := db.Inhume(context.Background(), inhumePrm) _, err := db.Inhume(context.Background(), inhumePrm)
require.ErrorAs(t, err, &objLockedErr) require.ErrorAs(t, err, &objLockedErr)
inhumePrm.SetTombstoneAddress(oidtest.Address()) tombAddr := oidtest.Address()
tombAddr.SetContainer(objAddr.Container())
inhumePrm.SetTombstoneAddress(tombAddr)
_, err = db.Inhume(context.Background(), inhumePrm) _, err = db.Inhume(context.Background(), inhumePrm)
require.ErrorAs(t, err, &objLockedErr) require.ErrorAs(t, err, &objLockedErr)
@ -89,7 +91,9 @@ func TestDB_Lock(t *testing.T) {
_, err = db.Inhume(context.Background(), inhumePrm) _, err = db.Inhume(context.Background(), inhumePrm)
require.ErrorAs(t, err, &objLockedErr) require.ErrorAs(t, err, &objLockedErr)
inhumePrm.SetTombstoneAddress(oidtest.Address()) tombAddr = oidtest.Address()
tombAddr.SetContainer(objAddr.Container())
inhumePrm.SetTombstoneAddress(tombAddr)
_, err = db.Inhume(context.Background(), inhumePrm) _, err = db.Inhume(context.Background(), inhumePrm)
require.ErrorAs(t, err, &objLockedErr) require.ErrorAs(t, err, &objLockedErr)
}) })
@ -103,7 +107,7 @@ func TestDB_Lock(t *testing.T) {
var objLockedErr *apistatus.ObjectLocked var objLockedErr *apistatus.ObjectLocked
// try to inhume locked object using tombstone // try to inhume locked object using tombstone
err := metaInhume(db, objAddr, lockAddr) err := metaInhume(db, objAddr, lockAddr.Object())
require.ErrorAs(t, err, &objLockedErr) require.ErrorAs(t, err, &objLockedErr)
// free locked object // free locked object

View file

@ -352,11 +352,7 @@ func TestDB_SelectInhume(t *testing.T) {
object.AddressOf(raw2), object.AddressOf(raw2),
) )
var tombstone oid.Address err = metaInhume(db, object.AddressOf(raw2), oidtest.ID())
tombstone.SetContainer(cnr)
tombstone.SetObject(oidtest.ID())
err = metaInhume(db, object.AddressOf(raw2), tombstone)
require.NoError(t, err) require.NoError(t, err)
fs = objectSDK.SearchFilters{} fs = objectSDK.SearchFilters{}

View file

@ -43,7 +43,7 @@ func TestDB_StorageID(t *testing.T) {
cnrID, ok := deleted.ContainerID() cnrID, ok := deleted.ContainerID()
require.True(t, ok) require.True(t, ok)
ts := testutil.GenerateObjectWithCID(cnrID) ts := testutil.GenerateObjectWithCID(cnrID)
require.NoError(t, metaInhume(db, object.AddressOf(deleted), object.AddressOf(ts))) require.NoError(t, metaInhume(db, object.AddressOf(deleted), object.AddressOf(ts).Object()))
// check StorageID for object without storageID // check StorageID for object without storageID
fetchedStorageID, err = metaStorageID(db, object.AddressOf(raw2)) fetchedStorageID, err = metaStorageID(db, object.AddressOf(raw2))

View file

@ -17,6 +17,7 @@ import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -308,17 +309,19 @@ func TestCounters(t *testing.T) {
t.Run("inhume_TS", func(t *testing.T) { t.Run("inhume_TS", func(t *testing.T) {
var prm InhumePrm var prm InhumePrm
ts := objectcore.AddressOf(testutil.GenerateObject())
phy := mm.getObjectCounter(physical) phy := mm.getObjectCounter(physical)
logic := mm.getObjectCounter(logical) logic := mm.getObjectCounter(logical)
custom := mm.getObjectCounter(user) custom := mm.getObjectCounter(user)
inhumedNumber := int(phy / 4) inhumedNumber := int(phy / 4)
prm.SetTarget(ts, addrFromObjs(oo[:inhumedNumber])...) for _, o := range addrFromObjs(oo[:inhumedNumber]) {
ts := oidtest.Address()
ts.SetContainer(o.Container())
prm.SetTarget(ts, o)
_, err := sh.Inhume(context.Background(), prm) _, err := sh.Inhume(context.Background(), prm)
require.NoError(t, err) require.NoError(t, err)
}
for i := range inhumedNumber { for i := range inhumedNumber {
cid, ok := oo[i].ContainerID() cid, ok := oo[i].ContainerID()