[#332] gc: Fix expired complex object deletion

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-05-17 12:56:59 +03:00 committed by Evgenii Stratonikov
parent ceb9deb7f1
commit 9bda6e0b8b
8 changed files with 211 additions and 65 deletions

View file

@ -67,6 +67,7 @@ Changelog for FrostFS Node
- Iterating over just removed files by FSTree (#98) - Iterating over just removed files by FSTree (#98)
- Parts of a locked object could not be removed anymore (#141) - Parts of a locked object could not be removed anymore (#141)
- Non-alphabet nodes do not try to handle alphabet events (#181) - Non-alphabet nodes do not try to handle alphabet events (#181)
- Delete complex objects with GC (#332)
### Removed ### Removed
### Updated ### Updated
@ -87,7 +88,7 @@ You need to change configuration environment variables to `FROSTFS_*` if you use
New config field `object.delete.tombstone_lifetime` allows to set tombstone lifetime New config field `object.delete.tombstone_lifetime` allows to set tombstone lifetime
more appropriate for a specific deployment. more appropriate for a specific deployment.
Use `__SYSTEM__` prefix for system attributes instead of `__NEOFS__` Use `__SYSTEM__` prefix for system attributes instead of `__NEOFS__`
(existed objects with old attributes will be treated as before, but for new objects new attributes will be used). (existed objects with old attributes will be treated as before, but for new objects new attributes will be used).
## Older versions ## Older versions

View file

@ -0,0 +1,57 @@
package meta
import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
)
// GetChildren returns parent -> children map.
// If an object has no children, then map will contain addr -> empty slice value.
func (db *DB) GetChildren(addresses []oid.Address) (map[oid.Address][]oid.Address, error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return nil, ErrDegradedMode
}
result := make(map[oid.Address][]oid.Address, len(addresses))
buffer := make([]byte, bucketKeySize)
err := db.boltDB.View(func(tx *bbolt.Tx) error {
for _, addr := range addresses {
if _, found := result[addr]; found {
continue
}
result[addr] = []oid.Address{}
bkt := tx.Bucket(parentBucketName(addr.Container(), buffer))
if bkt == nil {
continue
}
binObjIDs, err := decodeList(bkt.Get(objectKey(addr.Object(), buffer)))
if err != nil {
return err
}
for _, binObjID := range binObjIDs {
var id oid.ID
if err = id.Decode(binObjID); err != nil {
return err
}
var resultAddress oid.Address
resultAddress.SetContainer(addr.Container())
resultAddress.SetObject(id)
result[addr] = append(result[addr], resultAddress)
}
}
return nil
})
if err != nil {
return nil, err
}
return result, nil
}

View file

@ -57,6 +57,7 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) {
writecache.WithMaxObjectSize(wcBigObjectSize), writecache.WithMaxObjectSize(wcBigObjectSize),
writecache.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}), writecache.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
}, },
nil,
nil) nil)
} }
defer releaseShard(sh, t) defer releaseShard(sh, t)
@ -188,7 +189,7 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) {
require.Error(t, err) require.Error(t, err)
t.Run("skip errors", func(t *testing.T) { t.Run("skip errors", func(t *testing.T) {
sh := newCustomShard(t, filepath.Join(t.TempDir(), "ignore"), false, nil, nil) sh := newCustomShard(t, filepath.Join(t.TempDir(), "ignore"), false, nil, nil, nil)
t.Cleanup(func() { require.NoError(t, sh.Close()) }) t.Cleanup(func() { require.NoError(t, sh.Close()) })
var restorePrm shard.RestorePrm var restorePrm shard.RestorePrm
@ -219,10 +220,10 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) {
} }
func TestStream(t *testing.T) { func TestStream(t *testing.T) {
sh1 := newCustomShard(t, filepath.Join(t.TempDir(), "shard1"), false, nil, nil) sh1 := newCustomShard(t, filepath.Join(t.TempDir(), "shard1"), false, nil, nil, nil)
defer releaseShard(sh1, t) defer releaseShard(sh1, t)
sh2 := newCustomShard(t, filepath.Join(t.TempDir(), "shard2"), false, nil, nil) sh2 := newCustomShard(t, filepath.Join(t.TempDir(), "shard2"), false, nil, nil, nil)
defer releaseShard(sh2, t) defer releaseShard(sh2, t)
const objCount = 5 const objCount = 5
@ -323,7 +324,7 @@ func TestDumpIgnoreErrors(t *testing.T) {
writecache.WithSmallObjectSize(wcSmallObjectSize), writecache.WithSmallObjectSize(wcSmallObjectSize),
writecache.WithMaxObjectSize(wcBigObjectSize), writecache.WithMaxObjectSize(wcBigObjectSize),
} }
sh := newCustomShard(t, dir, true, wcOpts, bsOpts(2)) sh := newCustomShard(t, dir, true, wcOpts, bsOpts(2), nil)
objects := make([]*objectSDK.Object, objCount) objects := make([]*objectSDK.Object, objCount)
for i := 0; i < objCount; i++ { for i := 0; i < objCount; i++ {
@ -371,7 +372,7 @@ func TestDumpIgnoreErrors(t *testing.T) {
require.NoError(t, os.MkdirAll(filepath.Join(bsPath, "ZZ"), 0)) require.NoError(t, os.MkdirAll(filepath.Join(bsPath, "ZZ"), 0))
} }
sh = newCustomShard(t, dir, true, wcOpts, bsOpts(3)) sh = newCustomShard(t, dir, true, wcOpts, bsOpts(3), nil)
require.NoError(t, sh.SetMode(mode.ReadOnly)) require.NoError(t, sh.SetMode(mode.ReadOnly))
{ {

View file

@ -313,6 +313,12 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
return return
} }
expired, err := s.getExpiredWithLinked(expired)
if err != nil {
s.log.Warn("failed to get expired objects with linked", zap.Error(err))
return
}
var inhumePrm meta.InhumePrm var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(expired...) inhumePrm.SetAddresses(expired...)
@ -338,6 +344,20 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
} }
} }
func (s *Shard) getExpiredWithLinked(source []oid.Address) ([]oid.Address, error) {
result := make([]oid.Address, 0, len(source))
parentToChildren, err := s.metaBase.GetChildren(source)
if err != nil {
return nil, err
}
for parent, children := range parentToChildren {
result = append(result, parent)
result = append(result, children...)
}
return result, nil
}
func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
epoch := e.(newEpoch).epoch epoch := e.(newEpoch).epoch
log := s.log.With(zap.Uint64("epoch", epoch)) log := s.log.With(zap.Uint64("epoch", epoch))

View file

@ -2,77 +2,30 @@ package shard_test
import ( import (
"context" "context"
"path/filepath" "errors"
"testing" "testing"
"time" "time"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/panjf2000/ants/v2" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap"
) )
func Test_GCDropsLockedExpiredObject(t *testing.T) { func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
var sh *shard.Shard t.Parallel()
epoch := &epochState{ epoch := &epochState{
Value: 100, Value: 100,
} }
rootPath := t.TempDir() sh := newCustomShard(t, t.TempDir(), false, nil, nil, []meta.Option{meta.WithEpochState(epoch)})
opts := []shard.Option{
shard.WithID(shard.NewIDFromBytes([]byte{})),
shard.WithLogger(&logger.Logger{Logger: zap.NewNop()}),
shard.WithBlobStorOptions(
blobstor.WithStorages([]blobstor.SubStorage{
{
Storage: blobovniczatree.NewBlobovniczaTree(
blobovniczatree.WithRootPath(filepath.Join(rootPath, "blob", "blobovnicza")),
blobovniczatree.WithBlobovniczaShallowDepth(2),
blobovniczatree.WithBlobovniczaShallowWidth(2)),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return len(data) <= 1<<20
},
},
{
Storage: fstree.New(
fstree.WithPath(filepath.Join(rootPath, "blob"))),
},
}),
),
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(rootPath, "meta")),
meta.WithEpochState(epoch),
),
shard.WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) {
sh.HandleDeletedLocks(addresses)
}),
shard.WithExpiredLocksCallback(func(ctx context.Context, epoch uint64, a []oid.Address) {
sh.HandleExpiredLocks(ctx, epoch, a)
}),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
require.NoError(t, err)
return pool
}),
}
sh = shard.New(opts...)
require.NoError(t, sh.Open())
require.NoError(t, sh.Init(context.Background()))
t.Cleanup(func() { t.Cleanup(func() {
releaseShard(sh, t) releaseShard(sh, t)
@ -120,3 +73,97 @@ func Test_GCDropsLockedExpiredObject(t *testing.T) {
return shard.IsErrNotFound(err) return shard.IsErrNotFound(err)
}, 3*time.Second, 1*time.Second, "expired object must be deleted") }, 3*time.Second, 1*time.Second, "expired object must be deleted")
} }
func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
t.Parallel()
epoch := &epochState{
Value: 100,
}
cnr := cidtest.ID()
parentID := oidtest.ID()
splitID := objectSDK.NewSplitID()
var objExpirationAttr objectSDK.Attribute
objExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
objExpirationAttr.SetValue("101")
var lockExpirationAttr objectSDK.Attribute
lockExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
lockExpirationAttr.SetValue("103")
parent := testutil.GenerateObjectWithCID(cnr)
parent.SetID(parentID)
parent.SetPayload(nil)
parent.SetAttributes(objExpirationAttr)
const childCount = 10
children := make([]*objectSDK.Object, childCount)
childIDs := make([]oid.ID, childCount)
for i := range children {
children[i] = testutil.GenerateObjectWithCID(cnr)
if i != 0 {
children[i].SetPreviousID(childIDs[i-1])
}
if i == len(children)-1 {
children[i].SetParent(parent)
}
children[i].SetSplitID(splitID)
children[i].SetPayload([]byte{byte(i), byte(i + 1), byte(i + 2)})
childIDs[i], _ = children[i].ID()
}
link := testutil.GenerateObjectWithCID(cnr)
link.SetParent(parent)
link.SetParentID(parentID)
link.SetSplitID(splitID)
link.SetChildren(childIDs...)
linkID, _ := link.ID()
sh := newCustomShard(t, t.TempDir(), false, nil, nil, []meta.Option{meta.WithEpochState(epoch)})
t.Cleanup(func() {
releaseShard(sh, t)
})
lock := testutil.GenerateObjectWithCID(cnr)
lock.SetType(objectSDK.TypeLock)
lock.SetAttributes(lockExpirationAttr)
lockID, _ := lock.ID()
var putPrm shard.PutPrm
for _, child := range children {
putPrm.SetObject(child)
_, err := sh.Put(putPrm)
require.NoError(t, err)
}
putPrm.SetObject(link)
_, err := sh.Put(putPrm)
require.NoError(t, err)
err = sh.Lock(cnr, lockID, append(childIDs, parentID, linkID))
require.NoError(t, err)
putPrm.SetObject(lock)
_, err = sh.Put(putPrm)
require.NoError(t, err)
var getPrm shard.GetPrm
getPrm.SetAddress(objectCore.AddressOf(parent))
_, err = sh.Get(context.Background(), getPrm)
var splitInfoError *objectSDK.SplitInfoError
require.True(t, errors.As(err, &splitInfoError), "split info must be provided")
epoch.Value = 105
sh.NotificationChannel() <- shard.EventNewEpoch(epoch.Value)
require.Eventually(t, func() bool {
_, err = sh.Get(context.Background(), getPrm)
return shard.IsErrNotFound(err)
}, 3*time.Second, 1*time.Second, "expired complex object must be deleted on epoch after lock expires")
}

View file

@ -84,7 +84,8 @@ func testShardGetRange(t *testing.T, hasWriteCache bool) {
Storage: fstree.New( Storage: fstree.New(
fstree.WithPath(filepath.Join(t.TempDir(), "blob"))), fstree.WithPath(filepath.Join(t.TempDir(), "blob"))),
}, },
})}) })},
nil)
defer releaseShard(sh, t) defer releaseShard(sh, t)
for _, tc := range testCases { for _, tc := range testCases {

View file

@ -4,6 +4,7 @@ import (
"context" "context"
"path/filepath" "path/filepath"
"testing" "testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
@ -12,8 +13,11 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zaptest" "go.uber.org/zap/zaptest"
@ -29,11 +33,13 @@ func (s epochState) CurrentEpoch() uint64 {
func newShard(t testing.TB, enableWriteCache bool) *shard.Shard { func newShard(t testing.TB, enableWriteCache bool) *shard.Shard {
return newCustomShard(t, t.TempDir(), enableWriteCache, return newCustomShard(t, t.TempDir(), enableWriteCache,
nil,
nil, nil,
nil) nil)
} }
func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts []writecache.Option, bsOpts []blobstor.Option) *shard.Shard { func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts []writecache.Option, bsOpts []blobstor.Option, metaOptions []meta.Option) *shard.Shard {
var sh *shard.Shard
if enableWriteCache { if enableWriteCache {
rootPath = filepath.Join(rootPath, "wc") rootPath = filepath.Join(rootPath, "wc")
} else { } else {
@ -67,8 +73,9 @@ func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts
shard.WithLogger(&logger.Logger{Logger: zap.L()}), shard.WithLogger(&logger.Logger{Logger: zap.L()}),
shard.WithBlobStorOptions(bsOpts...), shard.WithBlobStorOptions(bsOpts...),
shard.WithMetaBaseOptions( shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(rootPath, "meta")), append([]meta.Option{
meta.WithEpochState(epochState{}), meta.WithPath(filepath.Join(rootPath, "meta")), meta.WithEpochState(epochState{})},
metaOptions...)...,
), ),
shard.WithPiloramaOptions(pilorama.WithPath(filepath.Join(rootPath, "pilorama"))), shard.WithPiloramaOptions(pilorama.WithPath(filepath.Join(rootPath, "pilorama"))),
shard.WithWriteCache(enableWriteCache), shard.WithWriteCache(enableWriteCache),
@ -77,9 +84,21 @@ func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts
[]writecache.Option{writecache.WithPath(filepath.Join(rootPath, "wcache"))}, []writecache.Option{writecache.WithPath(filepath.Join(rootPath, "wcache"))},
wcOpts...)..., wcOpts...)...,
), ),
shard.WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) {
sh.HandleDeletedLocks(addresses)
}),
shard.WithExpiredLocksCallback(func(ctx context.Context, epoch uint64, a []oid.Address) {
sh.HandleExpiredLocks(ctx, epoch, a)
}),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
require.NoError(t, err)
return pool
}),
shard.WithGCRemoverSleepInterval(1 * time.Millisecond),
} }
sh := shard.New(opts...) sh = shard.New(opts...)
require.NoError(t, sh.Open()) require.NoError(t, sh.Open())
require.NoError(t, sh.Init(context.Background())) require.NoError(t, sh.Init(context.Background()))

View file

@ -37,7 +37,7 @@ func TestWriteCacheObjectLoss(t *testing.T) {
writecache.WithSmallObjectSize(smallSize), writecache.WithSmallObjectSize(smallSize),
writecache.WithMaxObjectSize(smallSize * 2)} writecache.WithMaxObjectSize(smallSize * 2)}
sh := newCustomShard(t, dir, true, wcOpts, nil) sh := newCustomShard(t, dir, true, wcOpts, nil, nil)
var putPrm shard.PutPrm var putPrm shard.PutPrm
@ -48,7 +48,7 @@ func TestWriteCacheObjectLoss(t *testing.T) {
} }
require.NoError(t, sh.Close()) require.NoError(t, sh.Close())
sh = newCustomShard(t, dir, true, wcOpts, nil) sh = newCustomShard(t, dir, true, wcOpts, nil, nil)
defer releaseShard(sh, t) defer releaseShard(sh, t)
var getPrm shard.GetPrm var getPrm shard.GetPrm