Fix complex object deletion #348

Merged
fyrchik merged 2 commits from dstepanov-yadro/frostfs-node:fix/complex_object_lifetime into master 2023-05-16 12:44:58 +00:00
8 changed files with 204 additions and 60 deletions

View file

@ -16,6 +16,7 @@ Changelog for FrostFS Node
- Notary requests parsing according to `neo-go`'s updates (#268)
- Tree service panic in its internal client cache (#322)
- Iterate over endpoints when create ws client in morph's constructor (#304)
- Delete complex objects with GC (#332)
### Removed
### Updated

View file

@ -483,4 +483,5 @@ const (
ShardGCCollectingExpiredLocksCompleted = "collecting expired locks completed"
ShardGCRemoveGarbageStarted = "garbage remove started"
ShardGCRemoveGarbageCompleted = "garbage remove completed"
ShardGCFailedToGetExpiredWithLinked = "failed to get expired objects with linked"
)

View file

@ -0,0 +1,57 @@
package meta
import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
)
// GetChildren returns a parent -> children map for the given addresses.
//
// Every distinct address from addresses becomes a key of the returned map.
// If an object has no children, then map will contain addr -> empty slice value
// (an empty, non-nil slice). Child addresses are built from the IDs stored under
// the parent in the container's parent bucket and always carry the parent's
// container ID.
//
// Returns ErrDegradedMode if the metabase is not available in the current mode.
// Any decoding error aborts the lookup, in which case a nil map is returned
// together with the error.
func (db *DB) GetChildren(addresses []oid.Address) (map[oid.Address][]oid.Address, error) {
// Hold the mode lock for reading for the whole call.
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return nil, ErrDegradedMode
}
result := make(map[oid.Address][]oid.Address, len(addresses))
// Scratch slice handed to both parentBucketName and objectKey below.
// NOTE(review): presumably the helpers write the key into it to avoid
// per-iteration allocations; confirm against their definitions.
buffer := make([]byte, bucketKeySize)
// All lookups are performed inside a single bbolt View transaction.
err := db.boltDB.View(func(tx *bbolt.Tx) error {
for _, addr := range addresses {
// An address that is already a key was handled earlier in this loop
// (duplicate in the input), so skip it.
if _, found := result[addr]; found {
continue
}
// Register the address up front so it is present in the result even
// when nothing is found for it.
result[addr] = []oid.Address{}
bkt := tx.Bucket(parentBucketName(addr.Container(), buffer))
ale64bit marked this conversation as resolved Outdated

why the slicing ([:]) here? it's already a slice.

why the slicing (`[:]`) here? it's already a slice.

fixed

fixed
// No parent bucket for this container: keep the empty entry.
if bkt == nil {
continue
}
// The value stored under the object key is decoded as a list of
// binary object IDs.
binObjIDs, err := decodeList(bkt.Get(objectKey(addr.Object(), buffer)))
if err != nil {
return err
}
for _, binObjID := range binObjIDs {
ale64bit marked this conversation as resolved Outdated

why the check? the next loop won't do anything anyway.

why the check? the next loop won't do anything anyway.

done

done
var id oid.ID
if err = id.Decode(binObjID); err != nil {
return err
}
// The child address reuses the container of the parent address.
var resultAddress oid.Address
resultAddress.SetContainer(addr.Container())
resultAddress.SetObject(id)
result[addr] = append(result[addr], resultAddress)
}
}
return nil
})
if err != nil {
return nil, err
}
return result, nil
}

View file

@ -320,6 +320,12 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
return
}
expired, err := s.getExpiredWithLinked(expired)
if err != nil {
s.log.Warn(logs.ShardGCFailedToGetExpiredWithLinked, zap.Error(err))
return
}
var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(expired...)
@ -345,6 +351,20 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
}
}
// getExpiredWithLinked expands source with the children that the metabase
// reports for each of its addresses.
//
// The returned slice contains every key of the map returned by
// metaBase.GetChildren, each immediately followed by its children. Because the
// result is built by ranging over a map, the relative order of the parents is
// unspecified. If the metabase lookup fails, nil and the error are returned.
func (s *Shard) getExpiredWithLinked(source []oid.Address) ([]oid.Address, error) {
	childrenByParent, err := s.metaBase.GetChildren(source)
	if err != nil {
		return nil, err
	}

	// Start with room for the parents; the slice grows as children are added.
	expanded := make([]oid.Address, 0, len(source))
	for addr, linked := range childrenByParent {
		expanded = append(append(expanded, addr), linked...)
	}

	return expanded, nil
}
func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
epoch := e.(newEpoch).epoch
log := s.log.With(zap.Uint64("epoch", epoch))

View file

@ -2,79 +2,30 @@ package shard_test
import (
"context"
"path/filepath"
"errors"
"testing"
"time"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
func Test_GCDropsLockedExpiredObject(t *testing.T) {
func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
t.Parallel()
var sh *shard.Shard
epoch := &epochState{
Value: 100,
}
rootPath := t.TempDir()
opts := []shard.Option{
shard.WithID(shard.NewIDFromBytes([]byte{})),
shard.WithLogger(&logger.Logger{Logger: zap.NewNop()}),
shard.WithBlobStorOptions(
blobstor.WithStorages([]blobstor.SubStorage{
{
Storage: blobovniczatree.NewBlobovniczaTree(
blobovniczatree.WithRootPath(filepath.Join(rootPath, "blob", "blobovnicza")),
blobovniczatree.WithBlobovniczaShallowDepth(2),
blobovniczatree.WithBlobovniczaShallowWidth(2)),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return len(data) <= 1<<20
},
},
{
Storage: fstree.New(
fstree.WithPath(filepath.Join(rootPath, "blob"))),
},
}),
),
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(rootPath, "meta")),
meta.WithEpochState(epoch),
),
shard.WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) {
sh.HandleDeletedLocks(addresses)
}),
shard.WithExpiredLocksCallback(func(ctx context.Context, epoch uint64, a []oid.Address) {
sh.HandleExpiredLocks(ctx, epoch, a)
}),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
require.NoError(t, err)
return pool
}),
}
sh = shard.New(opts...)
require.NoError(t, sh.Open())
require.NoError(t, sh.Init(context.Background()))
sh := newCustomShard(t, t.TempDir(), false, nil, nil, []meta.Option{meta.WithEpochState(epoch)})
t.Cleanup(func() {
releaseShard(sh, t)
@ -122,3 +73,97 @@ func Test_GCDropsLockedExpiredObject(t *testing.T) {
return shard.IsErrNotFound(err)
}, 3*time.Second, 1*time.Second, "expired object must be deleted")
}
// Test_GCDropsLockedExpiredComplexObject checks that a complex (split) object
// whose parts are covered by a lock is removed by the shard once the epoch has
// moved past the expiration of both the object and the lock.
//
// The object expires at epoch 101 and the lock at epoch 103; the shard starts at
// epoch 100 and is then notified about epoch 105.
func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
t.Parallel()
ale64bit marked this conversation as resolved Outdated

this is unfortunate...isn't there another way? (same below)

this is unfortunate...isn't there another way? (same below)

Looks ugly, I agree. But there are two async processes: one for expired objects collecting, second for physically delete expired objects.

Looks ugly, I agree. But there are two async processes: one for expired objects collecting, second for physically delete expired objects.

I'm not sure what to do about it, but it would be nice to look at it some other time.

I'm not sure what to do about it, but it would be nice to look at it some other time.
// Epoch source shared with the metabase; the test bumps its Value later.
epoch := &epochState{
Value: 100,
}
cnr := cidtest.ID()
parentID := oidtest.ID()
splitID := objectSDK.NewSplitID()
// Expiration attribute for the object itself (epoch 101).
var objExpirationAttr objectSDK.Attribute
objExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
objExpirationAttr.SetValue("101")
// Expiration attribute for the lock object (epoch 103).
var lockExpirationAttr objectSDK.Attribute
lockExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
lockExpirationAttr.SetValue("103")
// The parent header: it has no payload and is never put into the shard on its own.
parent := testutil.GenerateObjectWithCID(cnr)
parent.SetID(parentID)
parent.SetPayload(nil)
parent.SetAttributes(objExpirationAttr)
const childCount = 10
children := make([]*objectSDK.Object, childCount)
childIDs := make([]oid.ID, childCount)
// Build the chain of children: each one references the previous child, and
// only the last one carries the parent header.
for i := range children {
children[i] = testutil.GenerateObjectWithCID(cnr)
if i != 0 {
children[i].SetPreviousID(childIDs[i-1])
}
if i == len(children)-1 {
children[i].SetParent(parent)
}
children[i].SetSplitID(splitID)
children[i].SetPayload([]byte{byte(i), byte(i + 1), byte(i + 2)})
childIDs[i], _ = children[i].ID()
}
// The link object refers to the parent and lists all child IDs.
link := testutil.GenerateObjectWithCID(cnr)
link.SetParent(parent)
link.SetParentID(parentID)
link.SetSplitID(splitID)
link.SetChildren(childIDs...)
linkID, _ := link.ID()
sh := newCustomShard(t, t.TempDir(), false, nil, nil, []meta.Option{meta.WithEpochState(epoch)})
t.Cleanup(func() {
releaseShard(sh, t)
})
lock := testutil.GenerateObjectWithCID(cnr)
lock.SetType(objectSDK.TypeLock)
lock.SetAttributes(lockExpirationAttr)
lockID, _ := lock.ID()
// Store all children and the link object.
var putPrm shard.PutPrm
for _, child := range children {
putPrm.SetObject(child)
_, err := sh.Put(context.Background(), putPrm)
require.NoError(t, err)
}
putPrm.SetObject(link)
_, err := sh.Put(context.Background(), putPrm)
require.NoError(t, err)
// Lock every child, the parent and the link under lockID, then store the
// lock object itself.
err = sh.Lock(context.Background(), cnr, lockID, append(childIDs, parentID, linkID))
require.NoError(t, err)
putPrm.SetObject(lock)
_, err = sh.Put(context.Background(), putPrm)
require.NoError(t, err)
// Before expiration, requesting the parent address must yield split info.
var getPrm shard.GetPrm
getPrm.SetAddress(objectCore.AddressOf(parent))
_, err = sh.Get(context.Background(), getPrm)
var splitInfoError *objectSDK.SplitInfoError
require.True(t, errors.As(err, &splitInfoError), "split info must be provided")
// Move past both expiration epochs and notify the shard about the new epoch.
epoch.Value = 105
sh.NotificationChannel() <- shard.EventNewEpoch(epoch.Value)
// Removal happens asynchronously, so poll every second for up to three
// seconds until the parent is reported as not found.
require.Eventually(t, func() bool {
_, err = sh.Get(context.Background(), getPrm)
return shard.IsErrNotFound(err)
}, 3*time.Second, 1*time.Second, "expired complex object must be deleted on epoch after lock expires")
}

View file

@ -87,7 +87,8 @@ func testShardGetRange(t *testing.T, hasWriteCache bool) {
Storage: fstree.New(
fstree.WithPath(filepath.Join(t.TempDir(), "blob"))),
},
})})
})},
nil)
defer releaseShard(sh, t)
for _, tc := range testCases {

View file

@ -4,6 +4,7 @@ import (
"context"
"path/filepath"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
@ -12,8 +13,11 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
@ -29,11 +33,13 @@ func (s epochState) CurrentEpoch() uint64 {
// newShard creates a test shard rooted in a fresh temporary directory.
// It delegates to newCustomShard, passing nil for the write-cache, blobstor
// and metabase option lists.
func newShard(t testing.TB, enableWriteCache bool) *shard.Shard {
	root := t.TempDir()
	return newCustomShard(t, root, enableWriteCache, nil, nil, nil)
}
func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts []writecache.Option, bsOpts []blobstor.Option) *shard.Shard {
func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts []writecache.Option, bsOpts []blobstor.Option, metaOptions []meta.Option) *shard.Shard {
fyrchik marked this conversation as resolved Outdated

We already have options for writecache and blobstor. What about also having options for a metabase instead?

We already have options for writecache and blobstor. What about also having options for a metabase instead?

fixed

fixed
var sh *shard.Shard
if enableWriteCache {
rootPath = filepath.Join(rootPath, "wc")
} else {
@ -67,8 +73,9 @@ func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts
shard.WithLogger(&logger.Logger{Logger: zap.L()}),
shard.WithBlobStorOptions(bsOpts...),
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(rootPath, "meta")),
meta.WithEpochState(epochState{}),
append([]meta.Option{
meta.WithPath(filepath.Join(rootPath, "meta")), meta.WithEpochState(epochState{})},
metaOptions...)...,
),
shard.WithPiloramaOptions(pilorama.WithPath(filepath.Join(rootPath, "pilorama"))),
shard.WithWriteCache(enableWriteCache),
@ -77,9 +84,21 @@ func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts
[]writecache.Option{writecache.WithPath(filepath.Join(rootPath, "wcache"))},
wcOpts...)...,
),
shard.WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) {
sh.HandleDeletedLocks(addresses)
}),
shard.WithExpiredLocksCallback(func(ctx context.Context, epoch uint64, a []oid.Address) {
sh.HandleExpiredLocks(ctx, epoch, a)
}),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
require.NoError(t, err)
return pool
}),
shard.WithGCRemoverSleepInterval(1 * time.Millisecond),
}
sh := shard.New(opts...)
sh = shard.New(opts...)
require.NoError(t, sh.Open())
require.NoError(t, sh.Init(context.Background()))

View file

@ -40,7 +40,7 @@ func TestWriteCacheObjectLoss(t *testing.T) {
writecache.WithSmallObjectSize(smallSize),
writecache.WithMaxObjectSize(smallSize * 2)}
sh := newCustomShard(t, dir, true, wcOpts, nil)
sh := newCustomShard(t, dir, true, wcOpts, nil, nil)
var errG errgroup.Group
for i := range objects {
@ -55,7 +55,7 @@ func TestWriteCacheObjectLoss(t *testing.T) {
require.NoError(t, errG.Wait())
require.NoError(t, sh.Close())
sh = newCustomShard(t, dir, true, wcOpts, nil)
sh = newCustomShard(t, dir, true, wcOpts, nil, nil)
defer releaseShard(sh, t)
var getPrm shard.GetPrm