diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 40d3cc1cd..3d4f791ac 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -464,7 +464,7 @@ func (e engineWithoutNotifications) IsLocked(ctx context.Context, address oid.Ad return e.engine.IsLocked(ctx, address) } -func (e engineWithoutNotifications) Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID) error { +func (e engineWithoutNotifications) Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID, expEpoch uint64) error { var prm engine.InhumePrm addrs := make([]oid.Address, len(toDelete)) @@ -473,7 +473,7 @@ func (e engineWithoutNotifications) Delete(ctx context.Context, tombstone oid.Ad addrs[i].SetObject(toDelete[i]) } - prm.WithTarget(tombstone, addrs...) + prm.WithTarget(tombstone, expEpoch, addrs...) return e.engine.Inhume(ctx, prm) } diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 6a72644e5..19ccb0ab6 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -253,6 +253,8 @@ const ( ShardFailureToMarkLockersAsGarbage = "failure to mark lockers as garbage" ShardFailureToGetExpiredUnlockedObjects = "failure to get expired unlocked objects" ShardCouldNotMarkObjectToDeleteInMetabase = "could not mark object to delete in metabase" + ShardUnknownObjectTypeWhileIteratingExpiredObjects = "encountered unknown object type while iterating expired objects" + ShardFailedToRemoveExpiredGraves = "failed to remove expired graves" WritecacheWaitingForChannelsToFlush = "waiting for channels to flush" WritecacheCantRemoveObjectFromWritecache = "can't remove object from write-cache" BlobovniczatreeCouldNotGetObjectFromLevel = "could not get object from level" diff --git a/internal/metrics/gc.go b/internal/metrics/gc.go index 53bfef0e5..664944b05 100644 --- a/internal/metrics/gc.go +++ b/internal/metrics/gc.go @@ -11,7 +11,7 @@ import ( type GCMetrics interface { AddRunDuration(shardID string, d time.Duration, success bool) AddDeletedCount(shardID string, deleted, failed uint64) - AddExpiredObjectCollectionDuration(shardID string, d time.Duration, success bool, objectType string) + AddExpiredObjectCollectionDuration(shardID string, d time.Duration, success bool) AddInhumedObjectCount(shardID string, count uint64, objectType string) } @@ -71,11 +71,10 @@ func (m *gcMetrics) AddDeletedCount(shardID string, deleted, failed uint64) { }).Add(float64(failed)) } -func (m *gcMetrics) AddExpiredObjectCollectionDuration(shardID string, d time.Duration, success bool, objectType string) { +func (m *gcMetrics) AddExpiredObjectCollectionDuration(shardID string, d time.Duration, success bool) { m.expCollectDuration.With(prometheus.Labels{ - shardIDLabel: shardID, - successLabel: strconv.FormatBool(success), - objectTypeLabel: objectType, + shardIDLabel: shardID, + successLabel: strconv.FormatBool(success), }).Add(d.Seconds()) } diff --git a/pkg/core/object/object.go b/pkg/core/object/object.go index 9c450966c..fa3735a30 100644 --- a/pkg/core/object/object.go +++ b/pkg/core/object/object.go @@ -1,6 +1,9 @@ package object import ( + "strconv" + + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" ) @@ -21,3 +24,14 @@ func AddressOf(obj *objectSDK.Object) oid.Address { return addr } + +// ExpirationEpoch returns the expiration epoch of the object. +func ExpirationEpoch(obj *objectSDK.Object) (epoch uint64, found bool) { + for _, attr := range obj.Attributes() { + if attr.Key() == objectV2.SysAttributeExpEpoch { + epoch, err := strconv.ParseUint(attr.Value(), 10, 64) + return epoch, err == nil + } + } + return +} diff --git a/pkg/core/object/object_test.go b/pkg/core/object/object_test.go new file mode 100644 index 000000000..b990dd4b8 --- /dev/null +++ b/pkg/core/object/object_test.go @@ -0,0 +1,49 @@ +package object_test + +import ( + "strconv" + "testing" + + objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/test" + "github.com/stretchr/testify/require" +) + +func TestExpirationEpoch(t *testing.T) { + obj := objecttest.Object() + + var expEpoch uint64 = 42 + expAttr := objectSDK.NewAttribute() + expAttr.SetKey(objectV2.SysAttributeExpEpoch) + expAttr.SetValue(strconv.FormatUint(expEpoch, 10)) + + t.Run("no attributes set", func(t *testing.T) { + obj.SetAttributes() + _, found := objectCore.ExpirationEpoch(obj) + require.False(t, found) + }) + t.Run("no expiration epoch attribute", func(t *testing.T) { + obj.SetAttributes(*objecttest.Attribute(), *objectSDK.NewAttribute()) + _, found := objectCore.ExpirationEpoch(obj) + require.False(t, found) + }) + t.Run("valid expiration epoch attribute", func(t *testing.T) { + obj.SetAttributes(*objecttest.Attribute(), *expAttr, *objectSDK.NewAttribute()) + epoch, found := objectCore.ExpirationEpoch(obj) + require.True(t, found) + require.Equal(t, expEpoch, epoch) + }) + t.Run("invalid expiration epoch value", func(t *testing.T) { + expAttr.SetValue("-42") + obj.SetAttributes(*objecttest.Attribute(), *expAttr, *objectSDK.NewAttribute()) + _, found := objectCore.ExpirationEpoch(obj) + require.False(t, found) + + expAttr.SetValue("qwerty") + obj.SetAttributes(*objecttest.Attribute(), *expAttr, *objectSDK.NewAttribute()) + _, found = objectCore.ExpirationEpoch(obj) + require.False(t, found) + }) +} diff --git a/pkg/local_object_storage/engine/gc_test.go b/pkg/local_object_storage/engine/gc_test.go new file mode 100644 index 000000000..ad6041394 --- /dev/null +++ b/pkg/local_object_storage/engine/gc_test.go @@ -0,0 +1,111 @@ +package engine + +import ( + "context" + "strconv" + "testing" + "time" + + objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" + meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/panjf2000/ants/v2" + "github.com/stretchr/testify/require" +) + +func testPopulateWithObjects(t testing.TB, engine *StorageEngine, cnt cid.ID, objectCount int) (objects []*objectSDK.Object) { + require.Positive(t, objectCount) + + var putPrm PutPrm + for range objectCount { + putPrm.Object = testutil.GenerateObjectWithCID(cnt) + require.NoError(t, engine.Put(context.Background(), putPrm)) + objects = append(objects, putPrm.Object) + } + return objects +} + +func testInhumeObjects(t testing.TB, engine *StorageEngine, objects []*objectSDK.Object, expEpoch uint64) (tombstone *objectSDK.Object) { + require.NotEmpty(t, objects) + cnt := objectCore.AddressOf(objects[0]).Container() + + tombstone = testutil.GenerateObjectWithCID(cnt) + tombstone.SetType(objectSDK.TypeTombstone) + testutil.AddAttribute(tombstone, objectV2.SysAttributeExpEpoch, strconv.FormatUint(expEpoch, 10)) + + var putPrm PutPrm + putPrm.Object = tombstone + require.NoError(t, engine.Put(context.Background(), putPrm)) + + var addrs []oid.Address + for _, object := range objects { + addrs = append(addrs, objectCore.AddressOf(object)) + } + tombstoneAddr := objectCore.AddressOf(tombstone) + + pivot := len(objects) / 2 + + var inhumePrm InhumePrm + inhumePrm.WithTarget(tombstoneAddr, expEpoch, addrs[:pivot]...) + err := engine.Inhume(context.Background(), inhumePrm) + require.NoError(t, err) + + inhumePrm.WithTarget(tombstoneAddr, meta.NoExpirationEpoch, addrs[pivot:]...) + err = engine.Inhume(context.Background(), inhumePrm) + require.NoError(t, err) + + return +} + +func TestGCHandleExpiredTombstones(t *testing.T) { + t.Parallel() + + const ( + numShards = 2 + objectCount = 10 * numShards + expEpoch = 1 + ) + + container := cidtest.ID() + + engine := testNewEngine(t). + setShardsNumAdditionalOpts(t, numShards, func(_ int) []shard.Option { + return []shard.Option{ + shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { + pool, err := ants.NewPool(sz) + require.NoError(t, err) + return pool + }), + shard.WithTombstoneSource(tss{expEpoch}), + } + }).prepare(t).engine + defer func() { require.NoError(t, engine.Close(context.Background())) }() + + objects := testPopulateWithObjects(t, engine, container, objectCount) + tombstone := testInhumeObjects(t, engine, objects, expEpoch) + + engine.HandleNewEpoch(context.Background(), expEpoch+1) + + var headPrm HeadPrm + require.Eventually(t, func() bool { + for _, object := range objects { + headPrm.WithAddress(objectCore.AddressOf(object)) + _, err := engine.Head(context.Background(), headPrm) + if !client.IsErrObjectNotFound(err) { + return false + } + } + + headPrm.WithAddress(objectCore.AddressOf(tombstone)) + _, err := engine.Head(context.Background(), headPrm) + return client.IsErrObjectNotFound(err) + }, 10*time.Second, 1*time.Second) +} diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index fb802ef2a..15158d72c 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -24,6 +24,7 @@ type InhumePrm struct { addrs []oid.Address forceRemoval bool + expEpoch uint64 } // WithTarget sets a list of objects that should be inhumed and tombstone address @@ -31,9 +32,10 @@ type InhumePrm struct { // // tombstone should not be nil, addr should not be empty. // Should not be called along with MarkAsGarbage. -func (p *InhumePrm) WithTarget(tombstone oid.Address, addrs ...oid.Address) { +func (p *InhumePrm) WithTarget(tombstone oid.Address, expEpoch uint64, addrs ...oid.Address) { p.addrs = addrs p.tombstone = &tombstone + p.expEpoch = expEpoch } // MarkAsGarbage marks an object to be physically removed from local storage. @@ -88,7 +90,7 @@ func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) error { for shardID, addrs := range addrsPerShard { if prm.tombstone != nil { - shPrm.SetTarget(*prm.tombstone, addrs...) + shPrm.SetTarget(*prm.tombstone, prm.expEpoch, addrs...) } else { shPrm.MarkAsGarbage(addrs...) } @@ -270,9 +272,9 @@ func (e *StorageEngine) GetLocks(ctx context.Context, addr oid.Address) ([]oid.I return allLocks, outErr } -func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) { +func (e *StorageEngine) processExpiredGraves(ctx context.Context, addrs []meta.TombstonedObject) { e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { - sh.HandleExpiredTombstones(ctx, addrs) + sh.HandleExpiredGraves(ctx, addrs) select { case <-ctx.Done(): @@ -283,9 +285,9 @@ func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []me }) } -func (e *StorageEngine) processExpiredLocks(ctx context.Context, epoch uint64, lockers []oid.Address) { +func (e *StorageEngine) processExpiredLockObjects(ctx context.Context, epoch uint64, lockers []oid.Address) { e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { - sh.HandleExpiredLocks(ctx, epoch, lockers) + sh.HandleExpiredLockObjects(ctx, epoch, lockers) select { case <-ctx.Done(): diff --git a/pkg/local_object_storage/engine/inhume_test.go b/pkg/local_object_storage/engine/inhume_test.go index 8c5d28b15..cbf0d32ac 100644 --- a/pkg/local_object_storage/engine/inhume_test.go +++ b/pkg/local_object_storage/engine/inhume_test.go @@ -44,6 +44,8 @@ func TestStorageEngine_Inhume(t *testing.T) { link.SetChildren(idChild) link.SetSplitID(splitID) + const tombstoneExpEpoch = 1008 + t.Run("delete small object", func(t *testing.T) { t.Parallel() e := testNewEngine(t).setShardsNum(t, 1).prepare(t).engine @@ -53,7 +55,7 @@ func TestStorageEngine_Inhume(t *testing.T) { require.NoError(t, err) var inhumePrm InhumePrm - inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent)) + inhumePrm.WithTarget(tombstoneID, tombstoneExpEpoch, object.AddressOf(parent)) err = e.Inhume(context.Background(), inhumePrm) require.NoError(t, err) @@ -83,7 +85,7 @@ func TestStorageEngine_Inhume(t *testing.T) { require.NoError(t, err) var inhumePrm InhumePrm - inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent)) + inhumePrm.WithTarget(tombstoneID, tombstoneExpEpoch, object.AddressOf(parent)) err = e.Inhume(context.Background(), inhumePrm) require.NoError(t, err) @@ -127,7 +129,9 @@ func TestStorageEngine_ECInhume(t *testing.T) { require.NoError(t, Put(context.Background(), e, tombstoneObject, false)) var inhumePrm InhumePrm - inhumePrm.WithTarget(tombstoneObjectAddress, parentObjectAddress) + + const tombstoneExpEpoch = 1008 + inhumePrm.WithTarget(tombstoneObjectAddress, tombstoneExpEpoch, parentObjectAddress) err = e.Inhume(context.Background(), inhumePrm) require.NoError(t, err) @@ -143,6 +147,7 @@ func TestInhumeExpiredRegularObject(t *testing.T) { const currEpoch = 42 const objectExpiresAfter = currEpoch - 1 + const tombstoneExpiresAfter = currEpoch + 1000 engine := testNewEngine(t).setShardsNumAdditionalOpts(t, 1, func(_ int) []shard.Option { return []shard.Option{ @@ -172,7 +177,7 @@ func TestInhumeExpiredRegularObject(t *testing.T) { ts.SetContainer(cnr) var prm InhumePrm - prm.WithTarget(ts, object.AddressOf(obj)) + prm.WithTarget(ts, tombstoneExpiresAfter, object.AddressOf(obj)) err := engine.Inhume(context.Background(), prm) require.NoError(t, err) }) @@ -205,6 +210,8 @@ func BenchmarkInhumeMultipart(b *testing.B) { func benchmarkInhumeMultipart(b *testing.B, numShards, numObjects int) { b.StopTimer() + const tombstoneExpiresAfter = 1000 // doesn't matter, just big enough + engine := testNewEngine(b, WithShardPoolSize(uint32(numObjects))). setShardsNum(b, numShards).prepare(b).engine defer func() { require.NoError(b, engine.Close(context.Background())) }() @@ -234,7 +241,7 @@ func benchmarkInhumeMultipart(b *testing.B, numShards, numObjects int) { ts.SetContainer(cnt) prm := InhumePrm{} - prm.WithTarget(ts, addrs...) + prm.WithTarget(ts, tombstoneExpiresAfter, addrs...) b.StartTimer() err := engine.Inhume(context.Background(), prm) diff --git a/pkg/local_object_storage/engine/lock_test.go b/pkg/local_object_storage/engine/lock_test.go index b8c9d6b1d..c09e67631 100644 --- a/pkg/local_object_storage/engine/lock_test.go +++ b/pkg/local_object_storage/engine/lock_test.go @@ -40,6 +40,7 @@ func TestLockUserScenario(t *testing.T) { // 5. waits for an epoch after the lock expiration one // 6. tries to inhume the object and expects success const lockerExpiresAfter = 13 + const tombstoneExpiresAfter = 1000 cnr := cidtest.ID() tombObj := testutil.GenerateObjectWithCID(cnr) @@ -111,13 +112,15 @@ func TestLockUserScenario(t *testing.T) { // 3. var inhumePrm InhumePrm - inhumePrm.WithTarget(tombAddr, objAddr) + inhumePrm.WithTarget(tombAddr, tombstoneExpiresAfter, objAddr) var objLockedErr *apistatus.ObjectLocked err = e.Inhume(context.Background(), inhumePrm) require.ErrorAs(t, err, &objLockedErr) // 4. + a.SetValue(strconv.Itoa(tombstoneExpiresAfter)) + tombObj.SetType(objectSDK.TypeTombstone) tombObj.SetID(tombForLockID) tombObj.SetAttributes(a) @@ -125,7 +128,7 @@ func TestLockUserScenario(t *testing.T) { err = Put(context.Background(), e, tombObj, false) require.NoError(t, err) - inhumePrm.WithTarget(tombForLockAddr, lockerAddr) + inhumePrm.WithTarget(tombForLockAddr, tombstoneExpiresAfter, lockerAddr) err = e.Inhume(context.Background(), inhumePrm) require.ErrorIs(t, err, meta.ErrLockObjectRemoval) @@ -133,7 +136,7 @@ func TestLockUserScenario(t *testing.T) { // 5. e.HandleNewEpoch(context.Background(), lockerExpiresAfter+1) - inhumePrm.WithTarget(tombAddr, objAddr) + inhumePrm.WithTarget(tombAddr, tombstoneExpiresAfter, objAddr) require.Eventually(t, func() bool { err = e.Inhume(context.Background(), inhumePrm) @@ -166,6 +169,7 @@ func TestLockExpiration(t *testing.T) { defer func() { require.NoError(t, e.Close(context.Background())) }() const lockerExpiresAfter = 13 + const tombstoneExpiresAfter = 1000 cnr := cidtest.ID() var err error @@ -197,7 +201,7 @@ func TestLockExpiration(t *testing.T) { var inhumePrm InhumePrm tombAddr := oidtest.Address() tombAddr.SetContainer(cnr) - inhumePrm.WithTarget(tombAddr, objectcore.AddressOf(obj)) + inhumePrm.WithTarget(tombAddr, tombstoneExpiresAfter, objectcore.AddressOf(obj)) var objLockedErr *apistatus.ObjectLocked err = e.Inhume(context.Background(), inhumePrm) @@ -209,7 +213,7 @@ func TestLockExpiration(t *testing.T) { // 4. tombAddr = oidtest.Address() tombAddr.SetContainer(cnr) - inhumePrm.WithTarget(tombAddr, objectcore.AddressOf(obj)) + inhumePrm.WithTarget(tombAddr, tombstoneExpiresAfter, objectcore.AddressOf(obj)) require.Eventually(t, func() bool { err = e.Inhume(context.Background(), inhumePrm) @@ -273,7 +277,8 @@ func TestLockForceRemoval(t *testing.T) { err = e.Inhume(context.Background(), inhumePrm) require.ErrorAs(t, err, &objLockedErr) - inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj)) + const tombstoneExpEpoch = 1008 + inhumePrm.WithTarget(oidtest.Address(), tombstoneExpEpoch, objectcore.AddressOf(obj)) err = e.Inhume(context.Background(), inhumePrm) require.ErrorAs(t, err, &objLockedErr) diff --git a/pkg/local_object_storage/engine/metrics.go b/pkg/local_object_storage/engine/metrics.go index 963292d83..0e9c68909 100644 --- a/pkg/local_object_storage/engine/metrics.go +++ b/pkg/local_object_storage/engine/metrics.go @@ -39,8 +39,8 @@ func (m *gcMetrics) AddDeletedCount(deleted, failed uint64) { m.storage.AddDeletedCount(m.shardID, deleted, failed) } -func (m *gcMetrics) AddExpiredObjectCollectionDuration(d time.Duration, success bool, objectType string) { - m.storage.AddExpiredObjectCollectionDuration(m.shardID, d, success, objectType) +func (m *gcMetrics) AddExpiredObjectCollectionDuration(d time.Duration, success bool) { + m.storage.AddExpiredObjectCollectionDuration(m.shardID, d, success) } func (m *gcMetrics) AddInhumedObjectCount(count uint64, objectType string) { @@ -87,7 +87,7 @@ func (noopWriteCacheMetrics) SetMode(string, string) func (noopWriteCacheMetrics) IncOperationCounter(string, string, string, string, metrics.NullBool) {} func (noopWriteCacheMetrics) Close(string, string) {} -func (noopGCMetrics) AddRunDuration(string, time.Duration, bool) {} -func (noopGCMetrics) AddDeletedCount(string, uint64, uint64) {} -func (noopGCMetrics) AddExpiredObjectCollectionDuration(string, time.Duration, bool, string) {} -func (noopGCMetrics) AddInhumedObjectCount(string, uint64, string) {} +func (noopGCMetrics) AddRunDuration(string, time.Duration, bool) {} +func (noopGCMetrics) AddDeletedCount(string, uint64, uint64) {} +func (noopGCMetrics) AddExpiredObjectCollectionDuration(string, time.Duration, bool) {} +func (noopGCMetrics) AddInhumedObjectCount(string, uint64, string) {} diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 8e191f72c..ef58df83c 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -131,8 +131,8 @@ func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (* sh := shard.New(append(opts, shard.WithID(id), - shard.WithExpiredTombstonesCallback(e.processExpiredTombstones), - shard.WithExpiredLocksCallback(e.processExpiredLocks), + shard.WithExpiredTombstonesCallback(e.processExpiredGraves), + shard.WithExpiredLocksCallback(e.processExpiredLockObjects), shard.WithDeletedLockCallback(e.processDeletedLocks), shard.WithReportErrorFunc(e.reportShardErrorByID), shard.WithZeroSizeCallback(e.processZeroSizeContainers), diff --git a/pkg/local_object_storage/metabase/counter_test.go b/pkg/local_object_storage/metabase/counter_test.go index 950385a29..3523449a7 100644 --- a/pkg/local_object_storage/metabase/counter_test.go +++ b/pkg/local_object_storage/metabase/counter_test.go @@ -156,11 +156,13 @@ func TestCounters(t *testing.T) { } var prm meta.InhumePrm + const tombstoneExpEpoch = 1008 for _, o := range inhumedObjs { tombAddr := oidtest.Address() tombAddr.SetContainer(o.Container()) prm.SetTombstoneAddress(tombAddr) + prm.SetTombstoneExpEpoch(tombstoneExpEpoch) prm.SetAddresses(o) res, err := db.Inhume(context.Background(), prm) @@ -301,11 +303,13 @@ func TestCounters(t *testing.T) { } var prm meta.InhumePrm + const tombstoneExpEpoch = 1008 for _, o := range inhumedObjs { tombAddr := oidtest.Address() tombAddr.SetContainer(o.Container()) prm.SetTombstoneAddress(tombAddr) + prm.SetTombstoneExpEpoch(tombstoneExpEpoch) prm.SetAddresses(o) _, err := db.Inhume(context.Background(), prm) diff --git a/pkg/local_object_storage/metabase/delete_ec_test.go b/pkg/local_object_storage/metabase/delete_ec_test.go index 884da23ff..215b3e515 100644 --- a/pkg/local_object_storage/metabase/delete_ec_test.go +++ b/pkg/local_object_storage/metabase/delete_ec_test.go @@ -23,10 +23,13 @@ import ( func TestDeleteECObject_WithoutSplit(t *testing.T) { t.Parallel() + const currEpoch = 12 + const tombstoneExpEpoch = currEpoch + 1 + db := New( WithPath(filepath.Join(t.TempDir(), "metabase")), WithPermissions(0o600), - WithEpochState(epochState{uint64(12)}), + WithEpochState(epochState{uint64(currEpoch)}), ) require.NoError(t, db.Open(context.Background(), mode.ReadWrite)) @@ -82,6 +85,7 @@ func TestDeleteECObject_WithoutSplit(t *testing.T) { tombAddress.SetObject(tombstoneID) inhumePrm.SetAddresses(ecParentAddress) inhumePrm.SetTombstoneAddress(tombAddress) + inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch) _, err = db.Inhume(context.Background(), inhumePrm) require.NoError(t, err) @@ -179,10 +183,13 @@ func TestDeleteECObject_WithSplit(t *testing.T) { func testDeleteECObjectWithSplit(t *testing.T, chunksCount int, withLinking bool) { t.Parallel() + const currEpoch = 12 + const tombstoneExpEpoch = currEpoch + 1 + db := New( WithPath(filepath.Join(t.TempDir(), "metabase")), WithPermissions(0o600), - WithEpochState(epochState{uint64(12)}), + WithEpochState(epochState{currEpoch}), ) require.NoError(t, db.Open(context.Background(), mode.ReadWrite)) @@ -289,6 +296,7 @@ func testDeleteECObjectWithSplit(t *testing.T, chunksCount int, withLinking bool tombAddress.SetObject(tombstoneID) inhumePrm.SetAddresses(inhumeAddresses...) inhumePrm.SetTombstoneAddress(tombAddress) + inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch) _, err = db.Inhume(context.Background(), inhumePrm) require.NoError(t, err) diff --git a/pkg/local_object_storage/metabase/graveyard.go b/pkg/local_object_storage/metabase/graveyard.go index 2f23d424c..ead8d80ad 100644 --- a/pkg/local_object_storage/metabase/graveyard.go +++ b/pkg/local_object_storage/metabase/graveyard.go @@ -91,8 +91,9 @@ func (db *DB) IterateOverGarbage(ctx context.Context, p GarbageIterationPrm) err // TombstonedObject represents descriptor of the // object that has been covered with tombstone. type TombstonedObject struct { - addr oid.Address - tomb oid.Address + addr oid.Address + tomb oid.Address + expEpoch uint64 } // Address returns tombstoned object address. @@ -106,6 +107,11 @@ func (g TombstonedObject) Tombstone() oid.Address { return g.tomb } +// ExpirationEpoch returns an expiration epoch of a tombstoned object. +func (g TombstonedObject) ExpirationEpoch() uint64 { + return g.expEpoch +} + // TombstonedHandler is a TombstonedObject handling function. type TombstonedHandler func(object TombstonedObject) error @@ -249,7 +255,7 @@ func garbageFromKV(k []byte) (res GarbageObject, err error) { func graveFromKV(k, v []byte) (res TombstonedObject, err error) { if err = decodeAddressFromKey(&res.addr, k); err != nil { err = fmt.Errorf("decode tombstone target from key: %w", err) - } else if err = decodeAddressFromKey(&res.tomb, v); err != nil { + } else if res.tomb, res.expEpoch, err = decodeTombstoneKeyWithExpEpoch(v); err != nil { err = fmt.Errorf("decode tombstone address from value: %w", err) } @@ -311,3 +317,42 @@ func (db *DB) InhumeTombstones(ctx context.Context, tss []TombstonedObject) (Inh }) return res, err } + +// RemoveGraves removes graves from graveyard. +func (db *DB) RemoveGraves(ctx context.Context, graves []TombstonedObject) error { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("RemoveGraves", time.Since(startedAt), success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "metabase.RemoveGraves") + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return ErrDegradedMode + } else if db.mode.ReadOnly() { + return ErrReadOnlyMode + } + + key := make([]byte, addressKeySize) + + err := db.boltDB.Batch(func(tx *bbolt.Tx) error { + graveyard := tx.Bucket(graveyardBucketName) + if graveyard == nil { + return nil + } + for i := range graves { + if err := graveyard.Delete(addressKey(graves[i].Address(), key)); err != nil { + return fmt.Errorf("remove grave: %w", err) + } + } + return nil + }) + return metaerr.Wrap(err) +} diff --git a/pkg/local_object_storage/metabase/graveyard_test.go b/pkg/local_object_storage/metabase/graveyard_test.go index ebadecc04..76931198f 100644 --- a/pkg/local_object_storage/metabase/graveyard_test.go +++ b/pkg/local_object_storage/metabase/graveyard_test.go @@ -2,6 +2,7 @@ package meta_test import ( "context" + "math/rand" "testing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" @@ -13,6 +14,7 @@ import ( oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" ) func TestDB_IterateDeletedObjects_EmptyDB(t *testing.T) { @@ -144,8 +146,10 @@ func TestDB_IterateDeletedObjects(t *testing.T) { addrTombstone := oidtest.Address() addrTombstone.SetContainer(cnr) + const tombstoneExpEpoch = 1008 inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2)) inhumePrm.SetTombstoneAddress(addrTombstone) + inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch) _, err = db.Inhume(context.Background(), inhumePrm) require.NoError(t, err) @@ -232,10 +236,13 @@ func TestDB_IterateOverGraveyard_Offset(t *testing.T) { addrTombstone.SetContainer(cnr) var inhumePrm meta.InhumePrm + + const tombstoneExpEpoch = 1008 inhumePrm.SetAddresses( object.AddressOf(obj1), object.AddressOf(obj2), object.AddressOf(obj3), object.AddressOf(obj4)) inhumePrm.SetTombstoneAddress(addrTombstone) + inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch) _, err = db.Inhume(context.Background(), inhumePrm) require.NoError(t, err) @@ -428,8 +435,11 @@ func TestDB_InhumeTombstones(t *testing.T) { addrTombstone := object.AddressOf(objTs) var inhumePrm meta.InhumePrm + + const tombstoneExpEpoch = 1008 inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2)) inhumePrm.SetTombstoneAddress(addrTombstone) + inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch) _, err = db.Inhume(context.Background(), inhumePrm) require.NoError(t, err) @@ -464,3 +474,68 @@ func TestDB_InhumeTombstones(t *testing.T) { require.NoError(t, err) require.Zero(t, counter) } + +func TestIterateOverGraveyardWithDifferentGraveFormats(t *testing.T) { + t.Parallel() + + type grave struct { + addr, tomb oid.Address + expEpoch uint64 + } + + var ( + numGraves = 20 + + expectedGraves []grave + actualGraves []grave + eg errgroup.Group + ) + + db := newDB(t) + defer func() { require.NoError(t, db.Close(context.Background())) }() + + var expEpochs []uint64 + for i := range numGraves / 2 { + expEpochs = append(expEpochs, uint64(i)) + expEpochs = append(expEpochs, meta.NoExpirationEpoch) + } + rand.Shuffle(len(expEpochs), func(i, j int) { + expEpochs[i], expEpochs[j] = expEpochs[j], expEpochs[i] + }) + + for _, expEpoch := range expEpochs { + cnt := cidtest.ID() + + addr := oidtest.Address() + addr.SetContainer(cnt) + + tomb := oidtest.Address() + tomb.SetContainer(cnt) + + expectedGraves = append(expectedGraves, grave{ + addr: addr, tomb: tomb, expEpoch: expEpoch, + }) + + eg.Go(func() error { + var prm meta.InhumePrm + prm.SetAddresses(addr) + prm.SetTombstoneAddress(tomb) + prm.SetTombstoneExpEpoch(expEpoch) + + _, err := db.Inhume(context.Background(), prm) + return err + }) + } + require.NoError(t, eg.Wait()) + + var iterPrm meta.GraveyardIterationPrm + iterPrm.SetHandler(func(o meta.TombstonedObject) error { + actualGraves = append(actualGraves, grave{ + o.Address(), o.Tombstone(), o.ExpirationEpoch(), + }) + return nil + }) + require.NoError(t, db.IterateOverGraveyard(context.Background(), iterPrm)) + + require.ElementsMatch(t, expectedGraves, actualGraves) +} diff --git a/pkg/local_object_storage/metabase/inhume.go b/pkg/local_object_storage/metabase/inhume.go index 76018fb61..c9d3c9726 100644 --- a/pkg/local_object_storage/metabase/inhume.go +++ b/pkg/local_object_storage/metabase/inhume.go @@ -18,6 +18,8 @@ import ( "go.etcd.io/bbolt" ) +var errTombstoneExpEpochNotSet = errors.New("tombstone expiration epoch is not set") + // InhumePrm encapsulates parameters for Inhume operation. type InhumePrm struct { tomb *oid.Address @@ -27,6 +29,9 @@ type InhumePrm struct { lockObjectHandling bool forceRemoval bool + + expEpoch uint64 + expEpochSet bool } // DeletionInfo contains details on deleted object. @@ -123,6 +128,15 @@ func (p *InhumePrm) SetTombstoneAddress(addr oid.Address) { p.tomb = &addr } +// SetTombstoneExpEpoch sets the expiration epoch for a tombstone. +// +// Setting the expiration epoch is required when inhuming with a tombstone. +// [meta.NoExpirationEpoch] may be used allowed, but only for testing purposes. +func (p *InhumePrm) SetTombstoneExpEpoch(epoch uint64) { + p.expEpoch = epoch + p.expEpochSet = true +} + // SetGCMark marks the object to be physically removed. // // Should not be called along with SetTombstoneAddress. @@ -148,6 +162,9 @@ func (p *InhumePrm) validate() error { return nil } if p.tomb != nil { + if !p.expEpochSet { + return errTombstoneExpEpochNotSet + } for _, addr := range p.target { if addr.Container() != p.tomb.Container() { return fmt.Errorf("object %s and tombstone %s have different container ID", addr, p.tomb) @@ -365,7 +382,10 @@ func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error { func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) { if prm.tomb != nil { targetBucket = graveyardBKT - tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize)) + tombKey := make([]byte, addressKeySize+epochSize) + if err = encodeTombstoneKeyWithExpEpoch(*prm.tomb, prm.expEpoch, tombKey); err != nil { + return nil, nil, fmt.Errorf("encode tombstone key with expiration epoch: %w", err) + } // it is forbidden to have a tomb-on-tomb in FrostFS, // so graveyard keys must not be addresses of tombstones @@ -376,6 +396,14 @@ func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Buck return nil, nil, fmt.Errorf("remove grave with tombstone key: %w", err) } } + // it can be a tombstone key without an expiration epoch + data = targetBucket.Get(tombKey[:addressKeySize]) + if data != nil { + err := targetBucket.Delete(tombKey[:addressKeySize]) + if err != nil { + return nil, nil, fmt.Errorf("remove grave with tombstone key: %w", err) + } + } value = tombKey } else { @@ -418,6 +446,9 @@ func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Buc func isTomb(graveyardBucket *bbolt.Bucket, addressKey []byte) bool { targetIsTomb := false + // take only address because graveyard record may have expiration epoch suffix + addressKey = addressKey[:addressKeySize] + // iterate over graveyard and check if target address // is the address of tombstone in graveyard. // tombstone must have the same container ID as key. @@ -426,7 +457,7 @@ func isTomb(graveyardBucket *bbolt.Bucket, addressKey []byte) bool { for k, v := c.Seek(containerPrefix); k != nil && bytes.HasPrefix(k, containerPrefix); k, v = c.Next() { // check if graveyard has record with key corresponding // to tombstone address (at least one) - targetIsTomb = bytes.Equal(v, addressKey) + targetIsTomb = bytes.HasPrefix(v, addressKey) if targetIsTomb { break } diff --git a/pkg/local_object_storage/metabase/inhume_ec_test.go b/pkg/local_object_storage/metabase/inhume_ec_test.go index 180713287..e25420fd7 100644 --- a/pkg/local_object_storage/metabase/inhume_ec_test.go +++ b/pkg/local_object_storage/metabase/inhume_ec_test.go @@ -94,10 +94,12 @@ func TestInhumeECObject(t *testing.T) { require.True(t, res.deletionDetails[0].Size == 5) // inhume EC parent (like Delete does) + const tombstoneExpEpoch = 1008 tombAddress.SetContainer(cnr) tombAddress.SetObject(tombstoneID) inhumePrm.SetAddresses(ecParentAddress) inhumePrm.SetTombstoneAddress(tombAddress) + inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch) res, err = db.Inhume(context.Background(), inhumePrm) require.NoError(t, err) // Previously deleted chunk shouldn't be in the details, because it is marked as garbage diff --git a/pkg/local_object_storage/metabase/inhume_test.go b/pkg/local_object_storage/metabase/inhume_test.go index 786d10396..2fa5d0ab6 100644 --- a/pkg/local_object_storage/metabase/inhume_test.go +++ b/pkg/local_object_storage/metabase/inhume_test.go @@ -50,6 +50,8 @@ func TestInhumeTombOnTomb(t *testing.T) { inhumePrm meta.InhumePrm existsPrm meta.ExistsPrm ) + const tombstoneExpEpoch = 1008 + inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch) addr1.SetContainer(cnr) addr2.SetContainer(cnr) @@ -124,12 +126,15 @@ func TestInhumeLocked(t *testing.T) { } func metaInhume(db *meta.DB, target oid.Address, tomb oid.ID) error { + const tombstoneExpEpoch = 1008 + var inhumePrm meta.InhumePrm inhumePrm.SetAddresses(target) var tombAddr oid.Address tombAddr.SetContainer(target.Container()) tombAddr.SetObject(tomb) inhumePrm.SetTombstoneAddress(tombAddr) + inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch) _, err := db.Inhume(context.Background(), inhumePrm) return err diff --git a/pkg/local_object_storage/metabase/lock_test.go b/pkg/local_object_storage/metabase/lock_test.go index 341ff9ad1..a1228823c 100644 --- a/pkg/local_object_storage/metabase/lock_test.go +++ b/pkg/local_object_storage/metabase/lock_test.go @@ -62,6 +62,8 @@ func TestDB_Lock(t *testing.T) { objAddr := objectcore.AddressOf(objs[0]) lockAddr := objectcore.AddressOf(lockObj) + const tombstoneExpEpoch = 1008 + var inhumePrm meta.InhumePrm inhumePrm.SetGCMark() @@ -76,6 +78,7 @@ func TestDB_Lock(t *testing.T) { tombAddr := oidtest.Address() tombAddr.SetContainer(objAddr.Container()) inhumePrm.SetTombstoneAddress(tombAddr) + inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch) _, err = db.Inhume(context.Background(), inhumePrm) require.ErrorAs(t, err, &objLockedErr) @@ -94,6 +97,7 @@ func TestDB_Lock(t *testing.T) { tombAddr = oidtest.Address() tombAddr.SetContainer(objAddr.Container()) inhumePrm.SetTombstoneAddress(tombAddr) + inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch) _, err = db.Inhume(context.Background(), inhumePrm) require.ErrorAs(t, err, &objLockedErr) }) diff --git a/pkg/local_object_storage/metabase/util.go b/pkg/local_object_storage/metabase/util.go index 80851f1c4..ba68f6d91 100644 --- a/pkg/local_object_storage/metabase/util.go +++ b/pkg/local_object_storage/metabase/util.go @@ -309,3 +309,56 @@ func isLockObject(tx *bbolt.Tx, idCnr cid.ID, obj oid.ID) bool { bucketNameLockers(idCnr, make([]byte, bucketKeySize)), objectKey(obj, make([]byte, objectKeySize))) } + +const NoExpirationEpoch uint64 = 0 + +// encodeTombstoneKeyWithExpEpoch encodes a tombstone key of a tombstoned +// object if the following format: tombstone address + expiration epoch. +// +// Returns an error if the buffer length isn't 32. +// +// The expiration epoch shouldn't be [NoExpirationEpoch], as tombstone keys +// are intended to have a valid expiration epoch. +// +// The use of [NoExpirationEpoch] is allowed only for test purposes. +func encodeTombstoneKeyWithExpEpoch(addr oid.Address, expEpoch uint64, dst []byte) error { + if len(dst) != addressKeySize+epochSize { + return errInvalidLength + } + + addr.Container().Encode(dst[:cidSize]) + addr.Object().Encode(dst[cidSize:addressKeySize]) + binary.LittleEndian.PutUint64(dst[addressKeySize:], expEpoch) + + return nil +} + +// decodeTombstoneKeyWithExpEpoch decodes a tombstone key of a tombstoned object. +// The tombstone key may have one of the following formats: +// - tombstone address +// - tombstone address + expiration epoch +// +// Expiration epoch is set to [NoExpirationEpoch] if the key doesn't have it. +func decodeTombstoneKeyWithExpEpoch(src []byte) (addr oid.Address, expEpoch uint64, err error) { + if len(src) != addressKeySize && len(src) != addressKeySize+epochSize { + return oid.Address{}, 0, errInvalidLength + } + + var cnt cid.ID + if err := cnt.Decode(src[:cidSize]); err != nil { + return addr, 0, err + } + var obj oid.ID + if err := obj.Decode(src[cidSize:addressKeySize]); err != nil { + return addr, 0, err + } + addr.SetContainer(cnt) + addr.SetObject(obj) + + if len(src) > addressKeySize { + expEpoch = binary.LittleEndian.Uint64(src[addressKeySize:]) + } else { + expEpoch = NoExpirationEpoch + } + return +} diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index fedde2206..8ad35d794 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -116,9 +116,8 @@ func (s *Shard) Init(ctx context.Context) error { eventNewEpoch: { cancelFunc: func() {}, handlers: []eventHandler{ - s.collectExpiredLocks, s.collectExpiredObjects, - s.collectExpiredTombstones, + s.collectExpiredGraves, s.collectExpiredMetrics, }, }, @@ -334,6 +333,11 @@ func (s *Shard) refillLockObject(ctx context.Context, obj *objectSDK.Object) err } func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object) error { + expEpoch, ok := object.ExpirationEpoch(obj) + if !ok { + return fmt.Errorf("tombstone %s has no expiration epoch", object.AddressOf(obj)) + } + tombstone := objectSDK.NewTombstone() if err := tombstone.Unmarshal(obj.Payload()); err != nil { @@ -354,6 +358,7 @@ func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object var inhumePrm meta.InhumePrm inhumePrm.SetTombstoneAddress(tombAddr) + inhumePrm.SetTombstoneExpEpoch(expEpoch) inhumePrm.SetAddresses(tombMembers...) _, err := s.metaBase.Inhume(ctx, inhumePrm) diff --git a/pkg/local_object_storage/shard/control_test.go b/pkg/local_object_storage/shard/control_test.go index 6d2cd7137..1cf391330 100644 --- a/pkg/local_object_storage/shard/control_test.go +++ b/pkg/local_object_storage/shard/control_test.go @@ -7,6 +7,7 @@ import ( "math" "os" "path/filepath" + "strconv" "sync/atomic" "testing" @@ -19,6 +20,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" @@ -234,8 +236,15 @@ func TestRefillMetabase(t *testing.T) { } } + expirationEpoch := 1008 + + expirationAttr := *objectSDK.NewAttribute() + expirationAttr.SetKey(objectV2.SysAttributeExpEpoch) + expirationAttr.SetValue(strconv.Itoa(expirationEpoch)) + tombObj := objecttest.Object() tombObj.SetType(objectSDK.TypeTombstone) + tombObj.SetAttributes(expirationAttr) tombstone := objecttest.Tombstone() @@ -276,6 +285,7 @@ func TestRefillMetabase(t *testing.T) { lockObj := objecttest.Object() lockObj.SetContainerID(cnrLocked) + lockObj.SetAttributes(expirationAttr) objectSDK.WriteLock(lockObj, lock) putPrm.SetObject(lockObj) @@ -286,7 +296,7 @@ func TestRefillMetabase(t *testing.T) { require.NoError(t, sh.Lock(context.Background(), cnrLocked, lockID, locked)) var inhumePrm InhumePrm - inhumePrm.SetTarget(object.AddressOf(tombObj), tombMembers...) + inhumePrm.SetTarget(object.AddressOf(tombObj), uint64(expirationEpoch), tombMembers...) _, err = sh.Inhume(context.Background(), inhumePrm) require.NoError(t, err) diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index 4a5ec7a71..eba3df59b 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -86,17 +86,17 @@ type GCMectrics interface { SetShardID(string) AddRunDuration(d time.Duration, success bool) AddDeletedCount(deleted, failed uint64) - AddExpiredObjectCollectionDuration(d time.Duration, success bool, objectType string) + AddExpiredObjectCollectionDuration(d time.Duration, success bool) AddInhumedObjectCount(count uint64, objectType string) } type noopGCMetrics struct{} -func (m *noopGCMetrics) SetShardID(string) {} -func (m *noopGCMetrics) AddRunDuration(time.Duration, bool) {} -func (m *noopGCMetrics) AddDeletedCount(uint64, uint64) {} -func (m *noopGCMetrics) AddExpiredObjectCollectionDuration(time.Duration, bool, string) {} -func (m *noopGCMetrics) AddInhumedObjectCount(uint64, string) {} +func (m *noopGCMetrics) SetShardID(string) {} +func (m *noopGCMetrics) AddRunDuration(time.Duration, bool) {} +func (m *noopGCMetrics) AddDeletedCount(uint64, uint64) {} +func (m *noopGCMetrics) AddExpiredObjectCollectionDuration(time.Duration, bool) {} +func (m *noopGCMetrics) AddInhumedObjectCount(uint64, string) {} type gc struct { *gcCfg @@ -355,43 +355,63 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { startedAt := time.Now() defer func() { - s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeRegular) + s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil) }() - s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsStarted, zap.Uint64("epoch", e.(newEpoch).epoch)) - defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsCompleted, zap.Uint64("epoch", e.(newEpoch).epoch)) + epoch := e.(newEpoch).epoch + + s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsStarted, zap.Uint64("epoch", epoch)) + defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsCompleted, zap.Uint64("epoch", epoch)) workersCount, batchSize := s.getExpiredObjectsParameters() errGroup, egCtx := errgroup.WithContext(ctx) errGroup.SetLimit(workersCount) - errGroup.Go(func() error { - batch := make([]oid.Address, 0, batchSize) - expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) { - if o.Type() != objectSDK.TypeTombstone && o.Type() != objectSDK.TypeLock { - batch = append(batch, o.Address()) + handlers := map[objectSDK.Type]func(context.Context, []oid.Address){ + objectSDK.TypeRegular: s.handleExpiredObjects, + objectSDK.TypeTombstone: s.handleExpiredTombstones, + objectSDK.TypeLock: func(ctx context.Context, batch []oid.Address) { + s.expiredLockObjectsCallback(ctx, epoch, batch) + }, + } + batches := make(map[objectSDK.Type][]oid.Address) + for typ := range handlers { + batches[typ] = make([]oid.Address, 0, batchSize) + } - if len(batch) == batchSize { - expired := batch - errGroup.Go(func() error { - s.handleExpiredObjects(egCtx, expired) - return egCtx.Err() - }) - batch = make([]oid.Address, 0, batchSize) - } + errGroup.Go(func() error { + expErr := s.getExpiredObjects(egCtx, epoch, func(o *meta.ExpiredObject) { + typ := o.Type() + + if _, ok := handlers[typ]; !ok { + s.log.Warn(ctx, logs.ShardUnknownObjectTypeWhileIteratingExpiredObjects, zap.Stringer("type", typ)) + return + } + + batches[typ] = append(batches[typ], o.Address()) + + if len(batches[typ]) == batchSize { + expired := batches[typ] + errGroup.Go(func() error { + handlers[typ](egCtx, expired) + return egCtx.Err() + }) + batches[typ] = make([]oid.Address, 0, batchSize) } }) if expErr != nil { return expErr } - if len(batch) > 0 { - expired := batch - errGroup.Go(func() error { - s.handleExpiredObjects(egCtx, expired) - return egCtx.Err() - }) + for typ, batch := range batches { + if len(batch) > 0 { + expired := batch + errGroup.Go(func() error { + handlers[typ](egCtx, expired) + return egCtx.Err() + }) + } } return nil @@ -402,6 +422,41 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { } } +func (s *Shard) handleExpiredTombstones(ctx context.Context, expired []oid.Address) { + select { + case <-ctx.Done(): + return + default: + } + + s.m.RLock() + defer s.m.RUnlock() + + if s.info.Mode.NoMetabase() { + return + } + + var inhumePrm meta.InhumePrm + inhumePrm.SetAddresses(expired...) + inhumePrm.SetGCMark() + + res, err := s.metaBase.Inhume(ctx, inhumePrm) + if err != nil { + s.log.Warn(ctx, logs.ShardCouldNotInhumeTheObjects, zap.Error(err)) + return + } + + s.gc.metrics.AddInhumedObjectCount(res.LogicInhumed(), objectTypeTombstone) + s.decObjectCounterBy(logical, res.LogicInhumed()) + s.decObjectCounterBy(user, res.UserInhumed()) + s.decContainerObjectCounter(res.InhumedByCnrID()) + + for i := range res.GetDeletionInfoLength() { + delInfo := res.GetDeletionInfoByIndex(i) + s.addToContainerSize(delInfo.CID.EncodeToString(), -int64(delInfo.Size)) + } +} + func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address) { select { case <-ctx.Done(): @@ -464,12 +519,12 @@ func (s *Shard) getExpiredWithLinked(ctx context.Context, source []oid.Address) return result, nil } -func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { +func (s *Shard) collectExpiredGraves(ctx context.Context, e Event) { var err error startedAt := time.Now() defer func() { - s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeTombstone) + s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil) }() epoch := e.(newEpoch).epoch @@ -480,7 +535,8 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { const tssDeleteBatch = 50 tss := make([]meta.TombstonedObject, 0, tssDeleteBatch) - tssExp := make([]meta.TombstonedObject, 0, tssDeleteBatch) + expiredGravesWithExpEpoch := make([]meta.TombstonedObject, 0, tssDeleteBatch) + expiredGravesWithoutExpEpoch := make([]meta.TombstonedObject, 0, tssDeleteBatch) var iterPrm meta.GraveyardIterationPrm iterPrm.SetHandler(func(deletedObject meta.TombstonedObject) error { @@ -521,72 +577,45 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { } for _, ts := range tss { - if !s.tsSource.IsTombstoneAvailable(ctx, ts.Tombstone(), epoch) { - tssExp = append(tssExp, ts) + if ts.ExpirationEpoch() != meta.NoExpirationEpoch { + expiredGravesWithExpEpoch = append(expiredGravesWithExpEpoch, ts) + } else if !s.tsSource.IsTombstoneAvailable(ctx, ts.Tombstone(), epoch) { + expiredGravesWithoutExpEpoch = append(expiredGravesWithoutExpEpoch, ts) } } - log.Debug(ctx, logs.ShardHandlingExpiredTombstonesBatch, zap.Int("number", len(tssExp))) - if len(tssExp) > 0 { - s.expiredTombstonesCallback(ctx, tssExp) - } + log.Debug(ctx, logs.ShardHandlingExpiredTombstonesBatch, + zap.Int("with expiration epoch", len(expiredGravesWithExpEpoch)), + zap.Int("without expiration epoch", len(expiredGravesWithoutExpEpoch)), + ) + + s.handleExpiredGraves(ctx, expiredGravesWithExpEpoch, expiredGravesWithoutExpEpoch) iterPrm.SetOffset(tss[tssLen-1].Address()) tss = tss[:0] - tssExp = tssExp[:0] + expiredGravesWithExpEpoch = expiredGravesWithExpEpoch[:0] + expiredGravesWithoutExpEpoch = expiredGravesWithoutExpEpoch[:0] } } -func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) { - var err error - startedAt := time.Now() +func (s *Shard) handleExpiredGraves(ctx context.Context, + expiredWithExpEpoch []meta.TombstonedObject, expiredWithoutExpEpoch []meta.TombstonedObject, +) { + if len(expiredWithExpEpoch) > 0 { + s.removeExpiredGraves(ctx, expiredWithExpEpoch) + } + if len(expiredWithoutExpEpoch) > 0 { + s.expiredGravesCallback(ctx, expiredWithoutExpEpoch) + } +} - defer func() { - s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeLock) - }() +func (s *Shard) removeExpiredGraves(ctx context.Context, expired []meta.TombstonedObject) { + if s.info.Mode.NoMetabase() { + return + } - s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksStarted, zap.Uint64("epoch", e.(newEpoch).epoch)) - defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksCompleted, zap.Uint64("epoch", e.(newEpoch).epoch)) - - workersCount, batchSize := s.getExpiredObjectsParameters() - - errGroup, egCtx := errgroup.WithContext(ctx) - errGroup.SetLimit(workersCount) - - errGroup.Go(func() error { - batch := make([]oid.Address, 0, batchSize) - - expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) { - if o.Type() == objectSDK.TypeLock { - batch = append(batch, o.Address()) - - if len(batch) == batchSize { - expired := batch - errGroup.Go(func() error { - s.expiredLocksCallback(egCtx, e.(newEpoch).epoch, expired) - return egCtx.Err() - }) - batch = make([]oid.Address, 0, batchSize) - } - } - }) - if expErr != nil { - return expErr - } - - if len(batch) > 0 { - expired := batch - errGroup.Go(func() error { - s.expiredLocksCallback(egCtx, e.(newEpoch).epoch, expired) - return egCtx.Err() - }) - } - - return nil - }) - - if err = errGroup.Wait(); err != nil { - s.log.Warn(ctx, logs.ShardIteratorOverExpiredLocksFailed, zap.Error(err)) + if err := s.metaBase.RemoveGraves(ctx, expired); err != nil { + s.log.Warn(ctx, logs.ShardFailedToRemoveExpiredGraves, zap.Error(err)) } } @@ -624,11 +653,11 @@ func (s *Shard) selectExpired(ctx context.Context, epoch uint64, addresses []oid return s.metaBase.FilterExpired(ctx, epoch, addresses) } -// HandleExpiredTombstones marks tombstones themselves as garbage +// HandleExpiredGraves marks tombstones themselves as garbage // and clears up corresponding graveyard records. // // Does not modify tss. -func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.TombstonedObject) { +func (s *Shard) HandleExpiredGraves(ctx context.Context, tss []meta.TombstonedObject) { s.m.RLock() defer s.m.RUnlock() @@ -658,9 +687,9 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston } } -// HandleExpiredLocks unlocks all objects which were locked by lockers. +// HandleExpiredLockObjects unlocks all objects which were locked by lockers. // If successful, marks lockers themselves as garbage. -func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []oid.Address) { +func (s *Shard) HandleExpiredLockObjects(ctx context.Context, epoch uint64, lockers []oid.Address) { if s.GetMode().NoMetabase() { return } diff --git a/pkg/local_object_storage/shard/gc_internal_test.go b/pkg/local_object_storage/shard/gc_internal_test.go index 9998bbae2..681ad9ffa 100644 --- a/pkg/local_object_storage/shard/gc_internal_test.go +++ b/pkg/local_object_storage/shard/gc_internal_test.go @@ -65,7 +65,7 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) { sh.HandleDeletedLocks(ctx, addresses) }), WithExpiredLocksCallback(func(ctx context.Context, epoch uint64, a []oid.Address) { - sh.HandleExpiredLocks(ctx, epoch, a) + sh.HandleExpiredLockObjects(ctx, epoch, a) }), WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { pool, err := ants.NewPool(sz) diff --git a/pkg/local_object_storage/shard/inhume.go b/pkg/local_object_storage/shard/inhume.go index 9d5f66063..6ee4e8810 100644 --- a/pkg/local_object_storage/shard/inhume.go +++ b/pkg/local_object_storage/shard/inhume.go @@ -19,6 +19,7 @@ type InhumePrm struct { target []oid.Address tombstone *oid.Address forceRemoval bool + expEpoch uint64 } // InhumeRes encapsulates results of inhume operation. @@ -29,9 +30,10 @@ type InhumeRes struct{} // // tombstone should not be nil, addr should not be empty. // Should not be called along with MarkAsGarbage. -func (p *InhumePrm) SetTarget(tombstone oid.Address, addrs ...oid.Address) { +func (p *InhumePrm) SetTarget(tombstone oid.Address, expEpoch uint64, addrs ...oid.Address) { p.target = addrs p.tombstone = &tombstone + p.expEpoch = expEpoch } // MarkAsGarbage marks object to be physically removed from shard. @@ -93,6 +95,7 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) { if prm.tombstone != nil { metaPrm.SetTombstoneAddress(*prm.tombstone) + metaPrm.SetTombstoneExpEpoch(prm.expEpoch) } else { metaPrm.SetGCMark() } diff --git a/pkg/local_object_storage/shard/inhume_test.go b/pkg/local_object_storage/shard/inhume_test.go index 1421f0e18..cbad41039 100644 --- a/pkg/local_object_storage/shard/inhume_test.go +++ b/pkg/local_object_storage/shard/inhume_test.go @@ -40,7 +40,8 @@ func testShardInhume(t *testing.T, hasWriteCache bool) { putPrm.SetObject(obj) var inhPrm InhumePrm - inhPrm.SetTarget(object.AddressOf(ts), object.AddressOf(obj)) + const tombstoneExpEpoch = 1008 + inhPrm.SetTarget(object.AddressOf(ts), tombstoneExpEpoch, object.AddressOf(obj)) var getPrm GetPrm getPrm.SetAddress(object.AddressOf(obj)) diff --git a/pkg/local_object_storage/shard/lock_test.go b/pkg/local_object_storage/shard/lock_test.go index 5caf3641f..964890a62 100644 --- a/pkg/local_object_storage/shard/lock_test.go +++ b/pkg/local_object_storage/shard/lock_test.go @@ -89,11 +89,13 @@ func TestShard_Lock(t *testing.T) { _, err = sh.Put(context.Background(), putPrm) require.NoError(t, err) + const tombstoneExpEpoch = 1008 + t.Run("inhuming locked objects", func(t *testing.T) { ts := testutil.GenerateObjectWithCID(cnr) var inhumePrm InhumePrm - inhumePrm.SetTarget(objectcore.AddressOf(ts), objectcore.AddressOf(obj)) + inhumePrm.SetTarget(objectcore.AddressOf(ts), tombstoneExpEpoch, objectcore.AddressOf(obj)) var objLockedErr *apistatus.ObjectLocked @@ -109,7 +111,7 @@ func TestShard_Lock(t *testing.T) { ts := testutil.GenerateObjectWithCID(cnr) var inhumePrm InhumePrm - inhumePrm.SetTarget(objectcore.AddressOf(ts), objectcore.AddressOf(lock)) + inhumePrm.SetTarget(objectcore.AddressOf(ts), tombstoneExpEpoch, objectcore.AddressOf(lock)) _, err = sh.Inhume(context.Background(), inhumePrm) require.Error(t, err) diff --git a/pkg/local_object_storage/shard/metrics_test.go b/pkg/local_object_storage/shard/metrics_test.go index 5230dcad0..8f79dcf95 100644 --- a/pkg/local_object_storage/shard/metrics_test.go +++ b/pkg/local_object_storage/shard/metrics_test.go @@ -314,11 +314,13 @@ func TestCounters(t *testing.T) { logic := mm.getObjectCounter(logical) custom := mm.getObjectCounter(user) + const tombstoneExpEpoch = 1008 + inhumedNumber := int(phy / 4) for _, o := range addrFromObjs(oo[:inhumedNumber]) { ts := oidtest.Address() ts.SetContainer(o.Container()) - prm.SetTarget(ts, o) + prm.SetTarget(ts, tombstoneExpEpoch, o) _, err := sh.Inhume(context.Background(), prm) require.NoError(t, err) } diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 1eb7f14d0..da139457d 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -46,8 +46,8 @@ type Shard struct { // Option represents Shard's constructor option. type Option func(*cfg) -// ExpiredTombstonesCallback is a callback handling list of expired tombstones. -type ExpiredTombstonesCallback func(context.Context, []meta.TombstonedObject) +// ExpiredGravesCallback is a callback handling list of expired graves. +type ExpiredGravesCallback func(context.Context, []meta.TombstonedObject) // ExpiredObjectsCallback is a callback handling list of expired objects. type ExpiredObjectsCallback func(context.Context, uint64, []oid.Address) @@ -82,9 +82,9 @@ type cfg struct { gcCfg gcCfg - expiredTombstonesCallback ExpiredTombstonesCallback + expiredGravesCallback ExpiredGravesCallback - expiredLocksCallback ExpiredObjectsCallback + expiredLockObjectsCallback ExpiredObjectsCallback deletedLockCallBack DeletedLockCallback @@ -248,9 +248,9 @@ func WithGCRemoverSleepInterval(dur time.Duration) Option { // WithExpiredTombstonesCallback returns option to specify callback // of the expired tombstones handler. -func WithExpiredTombstonesCallback(cb ExpiredTombstonesCallback) Option { +func WithExpiredTombstonesCallback(cb ExpiredGravesCallback) Option { return func(c *cfg) { - c.expiredTombstonesCallback = cb + c.expiredGravesCallback = cb } } @@ -258,7 +258,7 @@ func WithExpiredTombstonesCallback(cb ExpiredTombstonesCallback) Option { // of the expired LOCK objects handler. func WithExpiredLocksCallback(cb ExpiredObjectsCallback) Option { return func(c *cfg) { - c.expiredLocksCallback = cb + c.expiredLockObjectsCallback = cb } } diff --git a/pkg/local_object_storage/shard/shard_test.go b/pkg/local_object_storage/shard/shard_test.go index f9ee34488..7515eef39 100644 --- a/pkg/local_object_storage/shard/shard_test.go +++ b/pkg/local_object_storage/shard/shard_test.go @@ -93,7 +93,7 @@ func newCustomShard(t testing.TB, enableWriteCache bool, o shardOptions) *Shard sh.HandleDeletedLocks(ctx, addresses) }), WithExpiredLocksCallback(func(ctx context.Context, epoch uint64, a []oid.Address) { - sh.HandleExpiredLocks(ctx, epoch, a) + sh.HandleExpiredLockObjects(ctx, epoch, a) }), WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { pool, err := ants.NewPool(sz) diff --git a/pkg/services/object/common/writer/local.go b/pkg/services/object/common/writer/local.go index cf3d03275..ae72e9a66 100644 --- a/pkg/services/object/common/writer/local.go +++ b/pkg/services/object/common/writer/local.go @@ -2,6 +2,7 @@ package writer import ( "context" + "errors" "fmt" containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" @@ -11,6 +12,8 @@ import ( oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" ) +var errObjectHasNoExpirationEpoch = errors.New("object has no expiration epoch") + // ObjectStorage is an object storage interface. type ObjectStorage interface { // Put must save passed object @@ -18,7 +21,7 @@ type ObjectStorage interface { Put(context.Context, *objectSDK.Object, bool) error // Delete must delete passed objects // and return any appeared error. - Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID) error + Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID, expEpoch uint64) error // Lock must lock passed objects // and return any appeared error. Lock(ctx context.Context, locker oid.Address, toLock []oid.ID) error @@ -38,7 +41,12 @@ func (t LocalTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, met switch meta.Type() { case objectSDK.TypeTombstone: - err := t.Storage.Delete(ctx, objectCore.AddressOf(obj), meta.Objects()) + expEpoch, ok := objectCore.ExpirationEpoch(obj) + if !ok { + return errObjectHasNoExpirationEpoch + } + + err := t.Storage.Delete(ctx, objectCore.AddressOf(obj), meta.Objects(), expEpoch) if err != nil { return fmt.Errorf("could not delete objects from tombstone locally: %w", err) } diff --git a/scripts/populate-metabase/internal/populate.go b/scripts/populate-metabase/internal/populate.go index 4da23a295..f0467a474 100644 --- a/scripts/populate-metabase/internal/populate.go +++ b/scripts/populate-metabase/internal/populate.go @@ -186,6 +186,7 @@ func PopulateGraveyard( prm := meta.InhumePrm{} prm.SetAddresses(addr) prm.SetTombstoneAddress(tsAddr) + prm.SetTombstoneExpEpoch(rand.Uint64()) group.Go(func() error { if _, err := db.Inhume(ctx, prm); err != nil {