metabase: Delete EC gc marks and split info #1257
2 changed files with 534 additions and 0 deletions
|
@ -314,6 +314,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
|
|||
return deleteSingleResult{}, fmt.Errorf("could not remove object: %w", err)
|
||||
}
|
||||
|
||||
if err := deleteECRelatedInfo(tx, garbageBKT, obj, addr.Container(), refCounter); err != nil {
|
||||
return deleteSingleResult{}, err
|
||||
}
|
||||
|
||||
return deleteSingleResult{
|
||||
Phy: true,
|
||||
Logic: removeAvailableObject,
|
||||
|
@ -476,3 +480,74 @@ func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.Object, cnr cid.ID, refCounter referenceCounter) error {
|
||||
ech := obj.ECHeader()
|
||||
if ech == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
hasAnyChunks := hasAnyECChunks(tx, ech, cnr)
|
||||
// drop EC parent GC mark if current EC chunk is the last one
|
||||
if !hasAnyChunks && garbageBKT != nil {
|
||||
var ecParentAddress oid.Address
|
||||
ecParentAddress.SetContainer(cnr)
|
||||
ecParentAddress.SetObject(ech.Parent())
|
||||
addrKey := addressKey(ecParentAddress, make([]byte, addressKeySize))
|
||||
err := garbageBKT.Delete(addrKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not remove EC parent from garbage bucket: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// also drop EC parent root info if current EC chunk is the last one
|
||||
if !hasAnyChunks {
|
||||
delUniqueIndexItem(tx, namedBucketItem{
|
||||
name: rootBucketName(cnr, make([]byte, bucketKeySize)),
|
||||
key: objectKey(ech.Parent(), make([]byte, objectKeySize)),
|
||||
})
|
||||
}
|
||||
|
||||
if ech.ParentSplitParentID() == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var splitParentAddress oid.Address
|
||||
splitParentAddress.SetContainer(cnr)
|
||||
splitParentAddress.SetObject(*ech.ParentSplitParentID())
|
||||
|
||||
if ref, ok := refCounter[string(addressKey(splitParentAddress, make([]byte, addressKeySize)))]; ok {
|
||||
// linking object is already processing
|
||||
// so just inform that one more reference was deleted
|
||||
// split info and gc marks will be deleted after linking object delete
|
||||
ref.cur++
|
||||
return nil
|
||||
}
|
||||
|
||||
if parentLength(tx, splitParentAddress) > 0 {
|
||||
// linking object still exists, so leave split info and gc mark deletion for linking object processing
|
||||
return nil
|
||||
}
|
||||
|
||||
// drop split parent gc mark
|
||||
if garbageBKT != nil {
|
||||
addrKey := addressKey(splitParentAddress, make([]byte, addressKeySize))
|
||||
err := garbageBKT.Delete(addrKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not remove EC parent from garbage bucket: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// drop split info
|
||||
delUniqueIndexItem(tx, namedBucketItem{
|
||||
name: rootBucketName(cnr, make([]byte, bucketKeySize)),
|
||||
key: objectKey(*ech.ParentSplitParentID(), make([]byte, objectKeySize)),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func hasAnyECChunks(tx *bbolt.Tx, ech *objectSDK.ECHeader, cnr cid.ID) bool {
|
||||
data := getFromBucket(tx, ecInfoBucketName(cnr, make([]byte, bucketKeySize)),
|
||||
objectKey(ech.Parent(), make([]byte, objectKeySize)))
|
||||
return len(data) > 0
|
||||
}
|
||||
|
|
459
pkg/local_object_storage/metabase/delete_ec_test.go
Normal file
459
pkg/local_object_storage/metabase/delete_ec_test.go
Normal file
|
@ -0,0 +1,459 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"slices"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
func TestDeleteECObject_WithoutSplit(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
db := New(
|
||||
WithPath(filepath.Join(t.TempDir(), "metabase")),
|
||||
WithPermissions(0o600),
|
||||
WithEpochState(epochState{uint64(12)}),
|
||||
)
|
||||
|
||||
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||
require.NoError(t, db.Init())
|
||||
defer func() { require.NoError(t, db.Close()) }()
|
||||
|
||||
cnr := cidtest.ID()
|
||||
ecChunk := oidtest.ID()
|
||||
ecParent := oidtest.ID()
|
||||
tombstoneID := oidtest.ID()
|
||||
|
||||
chunkObj := testutil.GenerateObjectWithCID(cnr)
|
||||
chunkObj.SetContainerID(cnr)
|
||||
chunkObj.SetID(ecChunk)
|
||||
chunkObj.SetPayload([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
|
||||
chunkObj.SetPayloadSize(uint64(10))
|
||||
chunkObj.SetECHeader(objectSDK.NewECHeader(objectSDK.ECParentInfo{ID: ecParent}, 0, 3, []byte{}, 0))
|
||||
|
||||
// put object with EC
|
||||
|
||||
var prm PutPrm
|
||||
prm.SetObject(chunkObj)
|
||||
prm.SetStorageID([]byte("0/0"))
|
||||
_, err := db.Put(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
|
||||
var ecChunkAddress oid.Address
|
||||
ecChunkAddress.SetContainer(cnr)
|
||||
ecChunkAddress.SetObject(ecChunk)
|
||||
|
||||
var ecParentAddress oid.Address
|
||||
ecParentAddress.SetContainer(cnr)
|
||||
ecParentAddress.SetObject(ecParent)
|
||||
|
||||
var getPrm GetPrm
|
||||
|
||||
getPrm.SetAddress(ecChunkAddress)
|
||||
_, err = db.Get(context.Background(), getPrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
var ecInfoError *objectSDK.ECInfoError
|
||||
getPrm.SetAddress(ecParentAddress)
|
||||
_, err = db.Get(context.Background(), getPrm)
|
||||
require.ErrorAs(t, err, &ecInfoError)
|
||||
require.True(t, len(ecInfoError.ECInfo().Chunks) == 1 &&
|
||||
ecInfoError.ECInfo().Chunks[0].Index == 0 &&
|
||||
ecInfoError.ECInfo().Chunks[0].Total == 3)
|
||||
|
||||
// inhume EC parent (like Delete does)
|
||||
|
||||
var inhumePrm InhumePrm
|
||||
var tombAddress oid.Address
|
||||
tombAddress.SetContainer(cnr)
|
||||
tombAddress.SetObject(tombstoneID)
|
||||
inhumePrm.SetAddresses(ecParentAddress)
|
||||
inhumePrm.SetTombstoneAddress(tombAddress)
|
||||
_, err = db.Inhume(context.Background(), inhumePrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
getPrm.SetAddress(ecParentAddress)
|
||||
_, err = db.Get(context.Background(), getPrm)
|
||||
require.ErrorAs(t, err, new(*apistatus.ObjectAlreadyRemoved))
|
||||
|
||||
getPrm.SetAddress(ecChunkAddress)
|
||||
_, err = db.Get(context.Background(), getPrm)
|
||||
require.ErrorAs(t, err, new(*apistatus.ObjectAlreadyRemoved))
|
||||
|
||||
// GC finds and deletes split, EC parent and EC chunk
|
||||
|
||||
var garbageAddresses []oid.Address
|
||||
var itPrm GarbageIterationPrm
|
||||
itPrm.SetHandler(func(g GarbageObject) error {
|
||||
garbageAddresses = append(garbageAddresses, g.Address())
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
|
||||
require.Equal(t, 2, len(garbageAddresses))
|
||||
require.True(t, slices.Contains(garbageAddresses, ecParentAddress))
|
||||
require.True(t, slices.Contains(garbageAddresses, ecChunkAddress))
|
||||
|
||||
var deletePrm DeletePrm
|
||||
deletePrm.SetAddresses(garbageAddresses...)
|
||||
_, err = db.Delete(context.Background(), deletePrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
garbageAddresses = nil
|
||||
itPrm.SetHandler(func(g GarbageObject) error {
|
||||
garbageAddresses = append(garbageAddresses, g.Address())
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
|
||||
require.Equal(t, 0, len(garbageAddresses))
|
||||
|
||||
// after tombstone expired GC inhumes tombstone and drops graves
|
||||
|
||||
var tombstonedObjects []TombstonedObject
|
||||
var graveyardIterationPrm GraveyardIterationPrm
|
||||
graveyardIterationPrm.SetHandler(func(object TombstonedObject) error {
|
||||
tombstonedObjects = append(tombstonedObjects, object)
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm))
|
||||
require.Equal(t, 2, len(tombstonedObjects))
|
||||
|
||||
var tombstones []oid.Address
|
||||
for _, tss := range tombstonedObjects {
|
||||
tombstones = append(tombstones, tss.tomb)
|
||||
}
|
||||
inhumePrm.SetAddresses(tombstones...)
|
||||
inhumePrm.SetGCMark()
|
||||
_, err = db.Inhume(context.Background(), inhumePrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, db.DropGraves(context.Background(), tombstonedObjects))
|
||||
|
||||
// GC finds tombstone as garbage and deletes it
|
||||
|
||||
garbageAddresses = nil
|
||||
itPrm.SetHandler(func(g GarbageObject) error {
|
||||
garbageAddresses = append(garbageAddresses, g.Address())
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
|
||||
require.Equal(t, 1, len(garbageAddresses))
|
||||
require.Equal(t, tombstoneID, garbageAddresses[0].Object())
|
||||
|
||||
deletePrm.SetAddresses(garbageAddresses...)
|
||||
_, err = db.Delete(context.Background(), deletePrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
// no more objects should left as garbage
|
||||
|
||||
itPrm.SetHandler(func(g GarbageObject) error {
|
||||
require.FailNow(t, "no garbage objects should left")
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
|
||||
|
||||
require.NoError(t, db.boltDB.View(testVerifyNoObjectDataLeft))
|
||||
|
||||
require.NoError(t, testCountersAreZero(db, cnr))
|
||||
}
|
||||
|
||||
func TestDeleteECObject_WithSplit(t *testing.T) {
|
||||
t.Parallel()
|
||||
for _, c := range []int{1, 2, 3} {
|
||||
for _, l := range []bool{true, false} {
|
||||
test := fmt.Sprintf("%d EC chunks with split info without linking object", c)
|
||||
if l {
|
||||
test = fmt.Sprintf("%d EC chunks with split info with linking object", c)
|
||||
}
|
||||
t.Run(test, func(t *testing.T) {
|
||||
testDeleteECObjectWithSplit(t, c, l)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testDeleteECObjectWithSplit(t *testing.T, chunksCount int, withLinking bool) {
|
||||
t.Parallel()
|
||||
|
||||
db := New(
|
||||
WithPath(filepath.Join(t.TempDir(), "metabase")),
|
||||
WithPermissions(0o600),
|
||||
WithEpochState(epochState{uint64(12)}),
|
||||
)
|
||||
|
||||
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||
require.NoError(t, db.Init())
|
||||
defer func() { require.NoError(t, db.Close()) }()
|
||||
|
||||
cnr := cidtest.ID()
|
||||
ecChunks := make([]oid.ID, chunksCount)
|
||||
for idx := range ecChunks {
|
||||
ecChunks[idx] = oidtest.ID()
|
||||
}
|
||||
ecParentID := oidtest.ID()
|
||||
splitParentID := oidtest.ID()
|
||||
tombstoneID := oidtest.ID()
|
||||
splitID := objectSDK.NewSplitID()
|
||||
linkingID := oidtest.ID()
|
||||
|
||||
ecChunkObjects := make([]*objectSDK.Object, chunksCount)
|
||||
for idx := range ecChunkObjects {
|
||||
ecChunkObjects[idx] = testutil.GenerateObjectWithCID(cnr)
|
||||
ecChunkObjects[idx].SetContainerID(cnr)
|
||||
ecChunkObjects[idx].SetID(ecChunks[idx])
|
||||
ecChunkObjects[idx].SetPayload([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
|
||||
ecChunkObjects[idx].SetPayloadSize(uint64(10))
|
||||
ecChunkObjects[idx].SetECHeader(objectSDK.NewECHeader(
|
||||
objectSDK.ECParentInfo{
|
||||
ID: ecParentID,
|
||||
SplitParentID: &splitParentID, SplitID: splitID,
|
||||
}, uint32(idx), uint32(chunksCount+1), []byte{}, 0))
|
||||
}
|
||||
|
||||
splitParentObj := testutil.GenerateObjectWithCID(cnr)
|
||||
splitParentObj.SetID(splitParentID)
|
||||
|
||||
var linkingAddress oid.Address
|
||||
linkingAddress.SetContainer(cnr)
|
||||
linkingAddress.SetObject(linkingID)
|
||||
|
||||
linkingObj := testutil.GenerateObjectWithCID(cnr)
|
||||
linkingObj.SetID(linkingID)
|
||||
linkingObj.SetParent(splitParentObj)
|
||||
linkingObj.SetParentID(splitParentID)
|
||||
linkingObj.SetChildren(ecParentID, oidtest.ID(), oidtest.ID())
|
||||
linkingObj.SetSplitID(splitID)
|
||||
|
||||
// put object with EC and split info
|
||||
|
||||
var prm PutPrm
|
||||
prm.SetStorageID([]byte("0/0"))
|
||||
for _, obj := range ecChunkObjects {
|
||||
prm.SetObject(obj)
|
||||
_, err := db.Put(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
if withLinking {
|
||||
prm.SetObject(linkingObj)
|
||||
_, err := db.Put(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
var ecParentAddress oid.Address
|
||||
ecParentAddress.SetContainer(cnr)
|
||||
ecParentAddress.SetObject(ecParentID)
|
||||
|
||||
var getPrm GetPrm
|
||||
var ecInfoError *objectSDK.ECInfoError
|
||||
getPrm.SetAddress(ecParentAddress)
|
||||
_, err := db.Get(context.Background(), getPrm)
|
||||
require.ErrorAs(t, err, &ecInfoError)
|
||||
require.True(t, len(ecInfoError.ECInfo().Chunks) == chunksCount)
|
||||
|
||||
var splitParentAddress oid.Address
|
||||
splitParentAddress.SetContainer(cnr)
|
||||
splitParentAddress.SetObject(splitParentID)
|
||||
|
||||
var splitInfoError *objectSDK.SplitInfoError
|
||||
getPrm.SetAddress(splitParentAddress)
|
||||
getPrm.SetRaw(true)
|
||||
_, err = db.Get(context.Background(), getPrm)
|
||||
require.ErrorAs(t, err, &splitInfoError)
|
||||
require.True(t, splitInfoError.SplitInfo() != nil)
|
||||
require.Equal(t, splitID, splitInfoError.SplitInfo().SplitID())
|
||||
lastPart, set := splitInfoError.SplitInfo().LastPart()
|
||||
require.True(t, set)
|
||||
require.Equal(t, lastPart, ecParentID)
|
||||
if withLinking {
|
||||
l, ok := splitInfoError.SplitInfo().Link()
|
||||
require.True(t, ok)
|
||||
require.Equal(t, linkingID, l)
|
||||
}
|
||||
getPrm.SetRaw(false)
|
||||
|
||||
// inhume EC parent and split objects (like Delete does)
|
||||
|
||||
inhumeAddresses := []oid.Address{splitParentAddress, ecParentAddress}
|
||||
if withLinking {
|
||||
inhumeAddresses = append(inhumeAddresses, linkingAddress)
|
||||
}
|
||||
|
||||
var inhumePrm InhumePrm
|
||||
var tombAddress oid.Address
|
||||
tombAddress.SetContainer(cnr)
|
||||
tombAddress.SetObject(tombstoneID)
|
||||
inhumePrm.SetAddresses(inhumeAddresses...)
|
||||
inhumePrm.SetTombstoneAddress(tombAddress)
|
||||
_, err = db.Inhume(context.Background(), inhumePrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
getPrm.SetAddress(ecParentAddress)
|
||||
_, err = db.Get(context.Background(), getPrm)
|
||||
require.ErrorAs(t, err, new(*apistatus.ObjectAlreadyRemoved))
|
||||
|
||||
getPrm.SetAddress(splitParentAddress)
|
||||
_, err = db.Get(context.Background(), getPrm)
|
||||
require.ErrorAs(t, err, new(*apistatus.ObjectAlreadyRemoved))
|
||||
|
||||
if withLinking {
|
||||
getPrm.SetAddress(linkingAddress)
|
||||
_, err = db.Get(context.Background(), getPrm)
|
||||
require.ErrorAs(t, err, new(*apistatus.ObjectAlreadyRemoved))
|
||||
}
|
||||
|
||||
for _, id := range ecChunks {
|
||||
var ecChunkAddress oid.Address
|
||||
ecChunkAddress.SetContainer(cnr)
|
||||
ecChunkAddress.SetObject(id)
|
||||
getPrm.SetAddress(ecChunkAddress)
|
||||
_, err = db.Get(context.Background(), getPrm)
|
||||
require.ErrorAs(t, err, new(*apistatus.ObjectAlreadyRemoved))
|
||||
}
|
||||
|
||||
// GC finds and deletes split, EC parent and EC chunks
|
||||
|
||||
parentCount := 2 // split + ec
|
||||
if withLinking {
|
||||
parentCount = 3
|
||||
}
|
||||
|
||||
var garbageAddresses []oid.Address
|
||||
var itPrm GarbageIterationPrm
|
||||
itPrm.SetHandler(func(g GarbageObject) error {
|
||||
garbageAddresses = append(garbageAddresses, g.Address())
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
|
||||
require.Equal(t, parentCount+chunksCount, len(garbageAddresses))
|
||||
require.True(t, slices.Contains(garbageAddresses, splitParentAddress))
|
||||
require.True(t, slices.Contains(garbageAddresses, ecParentAddress))
|
||||
if withLinking {
|
||||
require.True(t, slices.Contains(garbageAddresses, linkingAddress))
|
||||
}
|
||||
for _, id := range ecChunks {
|
||||
var ecChunkAddress oid.Address
|
||||
ecChunkAddress.SetContainer(cnr)
|
||||
ecChunkAddress.SetObject(id)
|
||||
require.True(t, slices.Contains(garbageAddresses, ecChunkAddress))
|
||||
}
|
||||
|
||||
var deletePrm DeletePrm
|
||||
deletePrm.SetAddresses(garbageAddresses...)
|
||||
_, err = db.Delete(context.Background(), deletePrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
var garbageStub []oid.Address
|
||||
itPrm.SetHandler(func(g GarbageObject) error {
|
||||
garbageStub = append(garbageStub, g.Address())
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
|
||||
require.Equal(t, 0, len(garbageStub))
|
||||
|
||||
// after tombstone expired GC inhumes tombstone and drops graves
|
||||
|
||||
var tombstonedObjects []TombstonedObject
|
||||
var graveyardIterationPrm GraveyardIterationPrm
|
||||
graveyardIterationPrm.SetHandler(func(object TombstonedObject) error {
|
||||
tombstonedObjects = append(tombstonedObjects, object)
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm))
|
||||
require.True(t, len(tombstonedObjects) == parentCount+chunksCount)
|
||||
|
||||
var tombstones []oid.Address
|
||||
for _, tss := range tombstonedObjects {
|
||||
tombstones = append(tombstones, tss.tomb)
|
||||
}
|
||||
inhumePrm.SetAddresses(tombstones...)
|
||||
inhumePrm.SetGCMark()
|
||||
_, err = db.Inhume(context.Background(), inhumePrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, db.DropGraves(context.Background(), tombstonedObjects))
|
||||
|
||||
// GC finds tombstone as garbage and deletes it
|
||||
|
||||
garbageAddresses = nil
|
||||
itPrm.SetHandler(func(g GarbageObject) error {
|
||||
garbageAddresses = append(garbageAddresses, g.Address())
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
|
||||
require.Equal(t, 1, len(garbageAddresses))
|
||||
require.Equal(t, tombstoneID, garbageAddresses[0].Object())
|
||||
|
||||
deletePrm.SetAddresses(garbageAddresses...)
|
||||
_, err = db.Delete(context.Background(), deletePrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
// no more objects should left as garbage
|
||||
|
||||
itPrm.SetHandler(func(g GarbageObject) error {
|
||||
require.FailNow(t, "no garbage objects should left")
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
|
||||
|
||||
require.NoError(t, db.boltDB.View(testVerifyNoObjectDataLeft))
|
||||
|
||||
require.NoError(t, testCountersAreZero(db, cnr))
|
||||
}
|
||||
|
||||
func testVerifyNoObjectDataLeft(tx *bbolt.Tx) error {
|
||||
return tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
|
||||
if bytes.Equal(name, shardInfoBucket) ||
|
||||
bytes.Equal(name, containerCounterBucketName) ||
|
||||
bytes.Equal(name, containerVolumeBucketName) {
|
||||
return nil
|
||||
}
|
||||
return testBucketEmpty(name, b)
|
||||
})
|
||||
}
|
||||
|
||||
func testBucketEmpty(name []byte, b *bbolt.Bucket) error {
|
||||
err := b.ForEach(func(k, v []byte) error {
|
||||
if len(v) > 0 {
|
||||
return fmt.Errorf("bucket %v is not empty", name)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return b.ForEachBucket(func(k []byte) error {
|
||||
return testBucketEmpty(k, b.Bucket(k))
|
||||
})
|
||||
}
|
||||
|
||||
func testCountersAreZero(db *DB, cnr cid.ID) error {
|
||||
c, err := db.ContainerCount(context.Background(), cnr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !c.IsZero() {
|
||||
return fmt.Errorf("container %s has non zero counters", cnr.EncodeToString())
|
||||
}
|
||||
s, err := db.ContainerSize(cnr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if s != 0 {
|
||||
return fmt.Errorf("container %s has non zero size", cnr.EncodeToString())
|
||||
}
|
||||
return nil
|
||||
}
|
Loading…
Reference in a new issue