local_object_storage: Guarantee graves removal when handling expired tombstones #1481

Open
a-savchuk wants to merge 7 commits from a-savchuk/frostfs-node:remove-dangling-locks into master
32 changed files with 625 additions and 148 deletions

View file

@ -464,7 +464,7 @@ func (e engineWithoutNotifications) IsLocked(ctx context.Context, address oid.Ad
return e.engine.IsLocked(ctx, address)
}
func (e engineWithoutNotifications) Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID) error {
func (e engineWithoutNotifications) Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID, expEpoch uint64) error {
var prm engine.InhumePrm
addrs := make([]oid.Address, len(toDelete))
@ -473,7 +473,7 @@ func (e engineWithoutNotifications) Delete(ctx context.Context, tombstone oid.Ad
addrs[i].SetObject(toDelete[i])
}
prm.WithTarget(tombstone, addrs...)
prm.WithTarget(tombstone, expEpoch, addrs...)
return e.engine.Inhume(ctx, prm)
}

View file

@ -253,6 +253,8 @@ const (
ShardFailureToMarkLockersAsGarbage = "failure to mark lockers as garbage"
ShardFailureToGetExpiredUnlockedObjects = "failure to get expired unlocked objects"
ShardCouldNotMarkObjectToDeleteInMetabase = "could not mark object to delete in metabase"
ShardUnknownObjectTypeWhileIteratingExpiredObjects = "encountered unknown object type while iterating expired objects"
ShardFailedToRemoveExpiredGraves = "failed to remove expired graves"
WritecacheWaitingForChannelsToFlush = "waiting for channels to flush"
WritecacheCantRemoveObjectFromWritecache = "can't remove object from write-cache"
BlobovniczatreeCouldNotGetObjectFromLevel = "could not get object from level"

View file

@ -11,7 +11,7 @@ import (
type GCMetrics interface {
AddRunDuration(shardID string, d time.Duration, success bool)
AddDeletedCount(shardID string, deleted, failed uint64)
AddExpiredObjectCollectionDuration(shardID string, d time.Duration, success bool, objectType string)
AddExpiredObjectCollectionDuration(shardID string, d time.Duration, success bool)
AddInhumedObjectCount(shardID string, count uint64, objectType string)
}
@ -71,11 +71,10 @@ func (m *gcMetrics) AddDeletedCount(shardID string, deleted, failed uint64) {
}).Add(float64(failed))
}
func (m *gcMetrics) AddExpiredObjectCollectionDuration(shardID string, d time.Duration, success bool, objectType string) {
func (m *gcMetrics) AddExpiredObjectCollectionDuration(shardID string, d time.Duration, success bool) {
m.expCollectDuration.With(prometheus.Labels{
shardIDLabel: shardID,
successLabel: strconv.FormatBool(success),
objectTypeLabel: objectType,
shardIDLabel: shardID,
successLabel: strconv.FormatBool(success),
}).Add(d.Seconds())
}

View file

@ -1,6 +1,9 @@
package object
import (
"strconv"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
@ -21,3 +24,14 @@ func AddressOf(obj *objectSDK.Object) oid.Address {
return addr
}
// ExpirationEpoch returns the expiration epoch of the object.
func ExpirationEpoch(obj *objectSDK.Object) (epoch uint64, found bool) {
for _, attr := range obj.Attributes() {
if attr.Key() == objectV2.SysAttributeExpEpoch {
epoch, err := strconv.ParseUint(attr.Value(), 10, 64)
return epoch, err == nil
}
}
return
}

View file

@ -0,0 +1,49 @@
package object_test
import (
"strconv"
"testing"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/test"
"github.com/stretchr/testify/require"
)
func TestExpirationEpoch(t *testing.T) {
obj := objecttest.Object()
var expEpoch uint64 = 42
expAttr := objectSDK.NewAttribute()
expAttr.SetKey(objectV2.SysAttributeExpEpoch)
expAttr.SetValue(strconv.FormatUint(expEpoch, 10))
t.Run("no attributes set", func(t *testing.T) {
obj.SetAttributes()
_, found := objectCore.ExpirationEpoch(obj)
require.False(t, found)
})
t.Run("no expiration epoch attribute", func(t *testing.T) {
obj.SetAttributes(*objecttest.Attribute(), *objectSDK.NewAttribute())
_, found := objectCore.ExpirationEpoch(obj)
require.False(t, found)
})
t.Run("valid expiration epoch attribute", func(t *testing.T) {
obj.SetAttributes(*objecttest.Attribute(), *expAttr, *objectSDK.NewAttribute())
epoch, found := objectCore.ExpirationEpoch(obj)
require.True(t, found)
require.Equal(t, expEpoch, epoch)
})
t.Run("invalid expiration epoch value", func(t *testing.T) {
expAttr.SetValue("-42")
obj.SetAttributes(*objecttest.Attribute(), *expAttr, *objectSDK.NewAttribute())
_, found := objectCore.ExpirationEpoch(obj)
require.False(t, found)
expAttr.SetValue("qwerty")
obj.SetAttributes(*objecttest.Attribute(), *expAttr, *objectSDK.NewAttribute())
_, found = objectCore.ExpirationEpoch(obj)
require.False(t, found)
})
}

View file

@ -0,0 +1,111 @@
package engine
import (
"context"
"strconv"
"testing"
"time"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require"
)
func testPopulateWithObjects(t testing.TB, engine *StorageEngine, cnt cid.ID, objectCount int) (objects []*objectSDK.Object) {
require.Positive(t, objectCount)
var putPrm PutPrm
for range objectCount {
putPrm.Object = testutil.GenerateObjectWithCID(cnt)
require.NoError(t, engine.Put(context.Background(), putPrm))
objects = append(objects, putPrm.Object)
}
return objects
}
func testInhumeObjects(t testing.TB, engine *StorageEngine, objects []*objectSDK.Object, expEpoch uint64) (tombstone *objectSDK.Object) {
require.NotEmpty(t, objects)
cnt := objectCore.AddressOf(objects[0]).Container()
tombstone = testutil.GenerateObjectWithCID(cnt)
tombstone.SetType(objectSDK.TypeTombstone)
testutil.AddAttribute(tombstone, objectV2.SysAttributeExpEpoch, strconv.FormatUint(expEpoch, 10))
var putPrm PutPrm
putPrm.Object = tombstone
require.NoError(t, engine.Put(context.Background(), putPrm))
var addrs []oid.Address
for _, object := range objects {
addrs = append(addrs, objectCore.AddressOf(object))
}
tombstoneAddr := objectCore.AddressOf(tombstone)
pivot := len(objects) / 2
var inhumePrm InhumePrm
inhumePrm.WithTarget(tombstoneAddr, expEpoch, addrs[:pivot]...)
err := engine.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
inhumePrm.WithTarget(tombstoneAddr, meta.NoExpirationEpoch, addrs[pivot:]...)
err = engine.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
return
}
func TestGCHandleExpiredTombstones(t *testing.T) {
t.Parallel()
const (
numShards = 2
objectCount = 10 * numShards
expEpoch = 1
)
container := cidtest.ID()
engine := testNewEngine(t).
setShardsNumAdditionalOpts(t, numShards, func(_ int) []shard.Option {
return []shard.Option{
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
require.NoError(t, err)
return pool
}),
shard.WithTombstoneSource(tss{expEpoch}),
}
}).prepare(t).engine
defer func() { require.NoError(t, engine.Close(context.Background())) }()
objects := testPopulateWithObjects(t, engine, container, objectCount)
tombstone := testInhumeObjects(t, engine, objects, expEpoch)
engine.HandleNewEpoch(context.Background(), expEpoch+1)
var headPrm HeadPrm
require.Eventually(t, func() bool {
for _, object := range objects {
headPrm.WithAddress(objectCore.AddressOf(object))
_, err := engine.Head(context.Background(), headPrm)
if !client.IsErrObjectNotFound(err) {
return false
}
}
headPrm.WithAddress(objectCore.AddressOf(tombstone))
_, err := engine.Head(context.Background(), headPrm)
return client.IsErrObjectNotFound(err)
}, 10*time.Second, 1*time.Second)
}

View file

@ -24,6 +24,7 @@ type InhumePrm struct {
addrs []oid.Address
forceRemoval bool
expEpoch uint64
}
// WithTarget sets a list of objects that should be inhumed and tombstone address
@ -31,9 +32,10 @@ type InhumePrm struct {
//
// tombstone should not be nil, addr should not be empty.
// Should not be called along with MarkAsGarbage.
func (p *InhumePrm) WithTarget(tombstone oid.Address, addrs ...oid.Address) {
func (p *InhumePrm) WithTarget(tombstone oid.Address, expEpoch uint64, addrs ...oid.Address) {
p.addrs = addrs
p.tombstone = &tombstone
p.expEpoch = expEpoch
}
// MarkAsGarbage marks an object to be physically removed from local storage.
@ -88,7 +90,7 @@ func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) error {
for shardID, addrs := range addrsPerShard {
if prm.tombstone != nil {
shPrm.SetTarget(*prm.tombstone, addrs...)
shPrm.SetTarget(*prm.tombstone, prm.expEpoch, addrs...)
} else {
shPrm.MarkAsGarbage(addrs...)
}
@ -270,9 +272,9 @@ func (e *StorageEngine) GetLocks(ctx context.Context, addr oid.Address) ([]oid.I
return allLocks, outErr
}
func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) {
func (e *StorageEngine) processExpiredGraves(ctx context.Context, addrs []meta.TombstonedObject) {
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
sh.HandleExpiredTombstones(ctx, addrs)
sh.HandleExpiredGraves(ctx, addrs)
select {
case <-ctx.Done():
@ -283,9 +285,9 @@ func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []me
})
}
func (e *StorageEngine) processExpiredLocks(ctx context.Context, epoch uint64, lockers []oid.Address) {
func (e *StorageEngine) processExpiredLockObjects(ctx context.Context, epoch uint64, lockers []oid.Address) {
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
sh.HandleExpiredLocks(ctx, epoch, lockers)
sh.HandleExpiredLockObjects(ctx, epoch, lockers)
select {
case <-ctx.Done():

View file

@ -44,6 +44,8 @@ func TestStorageEngine_Inhume(t *testing.T) {
link.SetChildren(idChild)
link.SetSplitID(splitID)
const tombstoneExpEpoch = 1008
t.Run("delete small object", func(t *testing.T) {
t.Parallel()
e := testNewEngine(t).setShardsNum(t, 1).prepare(t).engine
@ -53,7 +55,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
require.NoError(t, err)
var inhumePrm InhumePrm
inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent))
inhumePrm.WithTarget(tombstoneID, tombstoneExpEpoch, object.AddressOf(parent))
err = e.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
@ -83,7 +85,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
require.NoError(t, err)
var inhumePrm InhumePrm
inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent))
inhumePrm.WithTarget(tombstoneID, tombstoneExpEpoch, object.AddressOf(parent))
err = e.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
@ -127,7 +129,9 @@ func TestStorageEngine_ECInhume(t *testing.T) {
require.NoError(t, Put(context.Background(), e, tombstoneObject, false))
var inhumePrm InhumePrm
inhumePrm.WithTarget(tombstoneObjectAddress, parentObjectAddress)
const tombstoneExpEpoch = 1008
inhumePrm.WithTarget(tombstoneObjectAddress, tombstoneExpEpoch, parentObjectAddress)
err = e.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
@ -143,6 +147,7 @@ func TestInhumeExpiredRegularObject(t *testing.T) {
const currEpoch = 42
const objectExpiresAfter = currEpoch - 1
const tombstoneExpiresAfter = currEpoch + 1000
engine := testNewEngine(t).setShardsNumAdditionalOpts(t, 1, func(_ int) []shard.Option {
return []shard.Option{
@ -172,7 +177,7 @@ func TestInhumeExpiredRegularObject(t *testing.T) {
ts.SetContainer(cnr)
var prm InhumePrm
prm.WithTarget(ts, object.AddressOf(obj))
prm.WithTarget(ts, tombstoneExpiresAfter, object.AddressOf(obj))
err := engine.Inhume(context.Background(), prm)
require.NoError(t, err)
})
@ -205,6 +210,8 @@ func BenchmarkInhumeMultipart(b *testing.B) {
func benchmarkInhumeMultipart(b *testing.B, numShards, numObjects int) {
b.StopTimer()
const tombstoneExpiresAfter = 1000 // doesn't matter, just big enough
engine := testNewEngine(b, WithShardPoolSize(uint32(numObjects))).
setShardsNum(b, numShards).prepare(b).engine
defer func() { require.NoError(b, engine.Close(context.Background())) }()
@ -234,7 +241,7 @@ func benchmarkInhumeMultipart(b *testing.B, numShards, numObjects int) {
ts.SetContainer(cnt)
prm := InhumePrm{}
prm.WithTarget(ts, addrs...)
prm.WithTarget(ts, tombstoneExpiresAfter, addrs...)
b.StartTimer()
err := engine.Inhume(context.Background(), prm)

View file

@ -40,6 +40,7 @@ func TestLockUserScenario(t *testing.T) {
// 5. waits for an epoch after the lock expiration one
// 6. tries to inhume the object and expects success
const lockerExpiresAfter = 13
const tombstoneExpiresAfter = 1000
cnr := cidtest.ID()
tombObj := testutil.GenerateObjectWithCID(cnr)
@ -111,13 +112,15 @@ func TestLockUserScenario(t *testing.T) {
// 3.
var inhumePrm InhumePrm
inhumePrm.WithTarget(tombAddr, objAddr)
inhumePrm.WithTarget(tombAddr, tombstoneExpiresAfter, objAddr)
var objLockedErr *apistatus.ObjectLocked
err = e.Inhume(context.Background(), inhumePrm)
require.ErrorAs(t, err, &objLockedErr)
// 4.
a.SetValue(strconv.Itoa(tombstoneExpiresAfter))
tombObj.SetType(objectSDK.TypeTombstone)
tombObj.SetID(tombForLockID)
tombObj.SetAttributes(a)
@ -125,7 +128,7 @@ func TestLockUserScenario(t *testing.T) {
err = Put(context.Background(), e, tombObj, false)
require.NoError(t, err)
inhumePrm.WithTarget(tombForLockAddr, lockerAddr)
inhumePrm.WithTarget(tombForLockAddr, tombstoneExpiresAfter, lockerAddr)
err = e.Inhume(context.Background(), inhumePrm)
require.ErrorIs(t, err, meta.ErrLockObjectRemoval)
@ -133,7 +136,7 @@ func TestLockUserScenario(t *testing.T) {
// 5.
e.HandleNewEpoch(context.Background(), lockerExpiresAfter+1)
inhumePrm.WithTarget(tombAddr, objAddr)
inhumePrm.WithTarget(tombAddr, tombstoneExpiresAfter, objAddr)
require.Eventually(t, func() bool {
err = e.Inhume(context.Background(), inhumePrm)
@ -166,6 +169,7 @@ func TestLockExpiration(t *testing.T) {
defer func() { require.NoError(t, e.Close(context.Background())) }()
const lockerExpiresAfter = 13
const tombstoneExpiresAfter = 1000
cnr := cidtest.ID()
var err error
@ -197,7 +201,7 @@ func TestLockExpiration(t *testing.T) {
var inhumePrm InhumePrm
tombAddr := oidtest.Address()
tombAddr.SetContainer(cnr)
inhumePrm.WithTarget(tombAddr, objectcore.AddressOf(obj))
inhumePrm.WithTarget(tombAddr, tombstoneExpiresAfter, objectcore.AddressOf(obj))
var objLockedErr *apistatus.ObjectLocked
err = e.Inhume(context.Background(), inhumePrm)
@ -209,7 +213,7 @@ func TestLockExpiration(t *testing.T) {
// 4.
tombAddr = oidtest.Address()
tombAddr.SetContainer(cnr)
inhumePrm.WithTarget(tombAddr, objectcore.AddressOf(obj))
inhumePrm.WithTarget(tombAddr, tombstoneExpiresAfter, objectcore.AddressOf(obj))
require.Eventually(t, func() bool {
err = e.Inhume(context.Background(), inhumePrm)
@ -273,7 +277,8 @@ func TestLockForceRemoval(t *testing.T) {
err = e.Inhume(context.Background(), inhumePrm)
require.ErrorAs(t, err, &objLockedErr)
inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj))
const tombstoneExpEpoch = 1008
inhumePrm.WithTarget(oidtest.Address(), tombstoneExpEpoch, objectcore.AddressOf(obj))
err = e.Inhume(context.Background(), inhumePrm)
require.ErrorAs(t, err, &objLockedErr)

View file

@ -39,8 +39,8 @@ func (m *gcMetrics) AddDeletedCount(deleted, failed uint64) {
m.storage.AddDeletedCount(m.shardID, deleted, failed)
}
func (m *gcMetrics) AddExpiredObjectCollectionDuration(d time.Duration, success bool, objectType string) {
m.storage.AddExpiredObjectCollectionDuration(m.shardID, d, success, objectType)
func (m *gcMetrics) AddExpiredObjectCollectionDuration(d time.Duration, success bool) {
m.storage.AddExpiredObjectCollectionDuration(m.shardID, d, success)
}
func (m *gcMetrics) AddInhumedObjectCount(count uint64, objectType string) {
@ -87,7 +87,7 @@ func (noopWriteCacheMetrics) SetMode(string, string)
func (noopWriteCacheMetrics) IncOperationCounter(string, string, string, string, metrics.NullBool) {}
func (noopWriteCacheMetrics) Close(string, string) {}
func (noopGCMetrics) AddRunDuration(string, time.Duration, bool) {}
func (noopGCMetrics) AddDeletedCount(string, uint64, uint64) {}
func (noopGCMetrics) AddExpiredObjectCollectionDuration(string, time.Duration, bool, string) {}
func (noopGCMetrics) AddInhumedObjectCount(string, uint64, string) {}
func (noopGCMetrics) AddRunDuration(string, time.Duration, bool) {}
func (noopGCMetrics) AddDeletedCount(string, uint64, uint64) {}
func (noopGCMetrics) AddExpiredObjectCollectionDuration(string, time.Duration, bool) {}
func (noopGCMetrics) AddInhumedObjectCount(string, uint64, string) {}

View file

@ -131,8 +131,8 @@ func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*
sh := shard.New(append(opts,
shard.WithID(id),
shard.WithExpiredTombstonesCallback(e.processExpiredTombstones),
shard.WithExpiredLocksCallback(e.processExpiredLocks),
shard.WithExpiredTombstonesCallback(e.processExpiredGraves),
shard.WithExpiredLocksCallback(e.processExpiredLockObjects),
shard.WithDeletedLockCallback(e.processDeletedLocks),
shard.WithReportErrorFunc(e.reportShardErrorByID),
shard.WithZeroSizeCallback(e.processZeroSizeContainers),

View file

@ -156,11 +156,13 @@ func TestCounters(t *testing.T) {
}
var prm meta.InhumePrm
const tombstoneExpEpoch = 1008
for _, o := range inhumedObjs {
tombAddr := oidtest.Address()
tombAddr.SetContainer(o.Container())
prm.SetTombstoneAddress(tombAddr)
prm.SetTombstoneExpEpoch(tombstoneExpEpoch)
prm.SetAddresses(o)
res, err := db.Inhume(context.Background(), prm)
@ -301,11 +303,13 @@ func TestCounters(t *testing.T) {
}
var prm meta.InhumePrm
const tombstoneExpEpoch = 1008
for _, o := range inhumedObjs {
tombAddr := oidtest.Address()
tombAddr.SetContainer(o.Container())
prm.SetTombstoneAddress(tombAddr)
prm.SetTombstoneExpEpoch(tombstoneExpEpoch)
prm.SetAddresses(o)
_, err := db.Inhume(context.Background(), prm)

View file

@ -23,10 +23,13 @@ import (
func TestDeleteECObject_WithoutSplit(t *testing.T) {
t.Parallel()
const currEpoch = 12
const tombstoneExpEpoch = currEpoch + 1
db := New(
WithPath(filepath.Join(t.TempDir(), "metabase")),
WithPermissions(0o600),
WithEpochState(epochState{uint64(12)}),
WithEpochState(epochState{uint64(currEpoch)}),
)
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
@ -82,6 +85,7 @@ func TestDeleteECObject_WithoutSplit(t *testing.T) {
tombAddress.SetObject(tombstoneID)
inhumePrm.SetAddresses(ecParentAddress)
inhumePrm.SetTombstoneAddress(tombAddress)
inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch)
_, err = db.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
@ -179,10 +183,13 @@ func TestDeleteECObject_WithSplit(t *testing.T) {
func testDeleteECObjectWithSplit(t *testing.T, chunksCount int, withLinking bool) {
t.Parallel()
const currEpoch = 12
const tombstoneExpEpoch = currEpoch + 1
db := New(
WithPath(filepath.Join(t.TempDir(), "metabase")),
WithPermissions(0o600),
WithEpochState(epochState{uint64(12)}),
WithEpochState(epochState{currEpoch}),
)
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
@ -289,6 +296,7 @@ func testDeleteECObjectWithSplit(t *testing.T, chunksCount int, withLinking bool
tombAddress.SetObject(tombstoneID)
inhumePrm.SetAddresses(inhumeAddresses...)
inhumePrm.SetTombstoneAddress(tombAddress)
inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch)
_, err = db.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)

View file

@ -91,8 +91,9 @@ func (db *DB) IterateOverGarbage(ctx context.Context, p GarbageIterationPrm) err
// TombstonedObject represents descriptor of the
// object that has been covered with tombstone.
type TombstonedObject struct {
addr oid.Address
tomb oid.Address
addr oid.Address
tomb oid.Address
expEpoch uint64
}
// Address returns tombstoned object address.
@ -106,6 +107,11 @@ func (g TombstonedObject) Tombstone() oid.Address {
return g.tomb
}
// ExpirationEpoch returns an expiration epoch of a tombstoned object.
func (g TombstonedObject) ExpirationEpoch() uint64 {
return g.expEpoch
}
// TombstonedHandler is a TombstonedObject handling function.
type TombstonedHandler func(object TombstonedObject) error
@ -249,7 +255,7 @@ func garbageFromKV(k []byte) (res GarbageObject, err error) {
func graveFromKV(k, v []byte) (res TombstonedObject, err error) {
if err = decodeAddressFromKey(&res.addr, k); err != nil {
err = fmt.Errorf("decode tombstone target from key: %w", err)
} else if err = decodeAddressFromKey(&res.tomb, v); err != nil {
} else if res.tomb, res.expEpoch, err = decodeTombstoneKeyWithExpEpoch(v); err != nil {
err = fmt.Errorf("decode tombstone address from value: %w", err)
}
@ -311,3 +317,42 @@ func (db *DB) InhumeTombstones(ctx context.Context, tss []TombstonedObject) (Inh
})
return res, err
}
// RemoveGraves removes graves from graveyard.
func (db *DB) RemoveGraves(ctx context.Context, graves []TombstonedObject) error {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("RemoveGraves", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.RemoveGraves")
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return ErrDegradedMode
} else if db.mode.ReadOnly() {
return ErrReadOnlyMode
}
key := make([]byte, addressKeySize)
err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
graveyard := tx.Bucket(graveyardBucketName)
if graveyard == nil {
return nil
}
for i := range graves {
if err := graveyard.Delete(addressKey(graves[i].Address(), key)); err != nil {
return fmt.Errorf("remove grave: %w", err)
}
}
return nil
})
return metaerr.Wrap(err)
}

View file

@ -2,6 +2,7 @@ package meta_test
import (
"context"
"math/rand"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
@ -13,6 +14,7 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
func TestDB_IterateDeletedObjects_EmptyDB(t *testing.T) {
@ -144,8 +146,10 @@ func TestDB_IterateDeletedObjects(t *testing.T) {
addrTombstone := oidtest.Address()
addrTombstone.SetContainer(cnr)
const tombstoneExpEpoch = 1008
inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2))
inhumePrm.SetTombstoneAddress(addrTombstone)
inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch)
_, err = db.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
@ -232,10 +236,13 @@ func TestDB_IterateOverGraveyard_Offset(t *testing.T) {
addrTombstone.SetContainer(cnr)
var inhumePrm meta.InhumePrm
const tombstoneExpEpoch = 1008
inhumePrm.SetAddresses(
object.AddressOf(obj1), object.AddressOf(obj2),
object.AddressOf(obj3), object.AddressOf(obj4))
inhumePrm.SetTombstoneAddress(addrTombstone)
inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch)
_, err = db.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
@ -428,8 +435,11 @@ func TestDB_InhumeTombstones(t *testing.T) {
addrTombstone := object.AddressOf(objTs)
var inhumePrm meta.InhumePrm
const tombstoneExpEpoch = 1008
inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2))
inhumePrm.SetTombstoneAddress(addrTombstone)
inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch)
_, err = db.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
@ -464,3 +474,68 @@ func TestDB_InhumeTombstones(t *testing.T) {
require.NoError(t, err)
require.Zero(t, counter)
}
func TestIterateOverGraveyardWithDifferentGraveFormats(t *testing.T) {
t.Parallel()
type grave struct {
addr, tomb oid.Address
expEpoch uint64
}
var (
numGraves = 20
expectedGraves []grave
actualGraves []grave
eg errgroup.Group
)
db := newDB(t)
defer func() { require.NoError(t, db.Close(context.Background())) }()
var expEpochs []uint64
for i := range numGraves / 2 {
expEpochs = append(expEpochs, uint64(i))
expEpochs = append(expEpochs, meta.NoExpirationEpoch)
}
rand.Shuffle(len(expEpochs), func(i, j int) {
expEpochs[i], expEpochs[j] = expEpochs[j], expEpochs[i]
})
for _, expEpoch := range expEpochs {
cnt := cidtest.ID()
addr := oidtest.Address()
addr.SetContainer(cnt)
tomb := oidtest.Address()
tomb.SetContainer(cnt)
expectedGraves = append(expectedGraves, grave{
addr: addr, tomb: tomb, expEpoch: expEpoch,
})
eg.Go(func() error {
var prm meta.InhumePrm
prm.SetAddresses(addr)
prm.SetTombstoneAddress(tomb)
prm.SetTombstoneExpEpoch(expEpoch)
_, err := db.Inhume(context.Background(), prm)
return err
})
}
require.NoError(t, eg.Wait())
var iterPrm meta.GraveyardIterationPrm
iterPrm.SetHandler(func(o meta.TombstonedObject) error {
actualGraves = append(actualGraves, grave{
o.Address(), o.Tombstone(), o.ExpirationEpoch(),
})
return nil
})
require.NoError(t, db.IterateOverGraveyard(context.Background(), iterPrm))
require.ElementsMatch(t, expectedGraves, actualGraves)
}

View file

@ -18,6 +18,8 @@ import (
"go.etcd.io/bbolt"
)
var errTombstoneExpEpochNotSet = errors.New("tombstone expiration epoch is not set")
// InhumePrm encapsulates parameters for Inhume operation.
type InhumePrm struct {
tomb *oid.Address
@ -27,6 +29,9 @@ type InhumePrm struct {
lockObjectHandling bool
forceRemoval bool
expEpoch uint64
expEpochSet bool
}
// DeletionInfo contains details on deleted object.
@ -123,6 +128,15 @@ func (p *InhumePrm) SetTombstoneAddress(addr oid.Address) {
p.tomb = &addr
}
// SetTombstoneExpEpoch sets the expiration epoch for a tombstone.
//
// Setting the expiration epoch is required when inhuming with a tombstone.
// [meta.NoExpirationEpoch] may be used allowed, but only for testing purposes.
func (p *InhumePrm) SetTombstoneExpEpoch(epoch uint64) {
p.expEpoch = epoch
p.expEpochSet = true
}
// SetGCMark marks the object to be physically removed.
//
// Should not be called along with SetTombstoneAddress.
@ -148,6 +162,9 @@ func (p *InhumePrm) validate() error {
return nil
}
if p.tomb != nil {
if !p.expEpochSet {
return errTombstoneExpEpochNotSet
}
for _, addr := range p.target {
if addr.Container() != p.tomb.Container() {
return fmt.Errorf("object %s and tombstone %s have different container ID", addr, p.tomb)
@ -365,7 +382,10 @@ func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) {
if prm.tomb != nil {
targetBucket = graveyardBKT
tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize))
tombKey := make([]byte, addressKeySize+epochSize)
if err = encodeTombstoneKeyWithExpEpoch(*prm.tomb, prm.expEpoch, tombKey); err != nil {
return nil, nil, fmt.Errorf("encode tombstone key with expiration epoch: %w", err)
}
// it is forbidden to have a tomb-on-tomb in FrostFS,
// so graveyard keys must not be addresses of tombstones
@ -376,6 +396,14 @@ func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Buck
return nil, nil, fmt.Errorf("remove grave with tombstone key: %w", err)
}
}
// it can be a tombstone key without an expiration epoch
data = targetBucket.Get(tombKey[:addressKeySize])
if data != nil {
err := targetBucket.Delete(tombKey[:addressKeySize])
if err != nil {
return nil, nil, fmt.Errorf("remove grave with tombstone key: %w", err)
}
}
value = tombKey
} else {
@ -418,6 +446,9 @@ func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Buc
func isTomb(graveyardBucket *bbolt.Bucket, addressKey []byte) bool {
targetIsTomb := false
// take only address because graveyard record may have expiration epoch suffix
addressKey = addressKey[:addressKeySize]
// iterate over graveyard and check if target address
// is the address of tombstone in graveyard.
// tombstone must have the same container ID as key.
@ -426,7 +457,7 @@ func isTomb(graveyardBucket *bbolt.Bucket, addressKey []byte) bool {
for k, v := c.Seek(containerPrefix); k != nil && bytes.HasPrefix(k, containerPrefix); k, v = c.Next() {
// check if graveyard has record with key corresponding
// to tombstone address (at least one)
targetIsTomb = bytes.Equal(v, addressKey)
targetIsTomb = bytes.HasPrefix(v, addressKey)
if targetIsTomb {
break
}

View file

@ -94,10 +94,12 @@ func TestInhumeECObject(t *testing.T) {
require.True(t, res.deletionDetails[0].Size == 5)
// inhume EC parent (like Delete does)
const tombstoneExpEpoch = 1008
tombAddress.SetContainer(cnr)
tombAddress.SetObject(tombstoneID)
inhumePrm.SetAddresses(ecParentAddress)
inhumePrm.SetTombstoneAddress(tombAddress)
inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch)
res, err = db.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
// Previously deleted chunk shouldn't be in the details, because it is marked as garbage

View file

@ -50,6 +50,8 @@ func TestInhumeTombOnTomb(t *testing.T) {
inhumePrm meta.InhumePrm
existsPrm meta.ExistsPrm
)
const tombstoneExpEpoch = 1008
inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch)
addr1.SetContainer(cnr)
addr2.SetContainer(cnr)
@ -124,12 +126,15 @@ func TestInhumeLocked(t *testing.T) {
}
func metaInhume(db *meta.DB, target oid.Address, tomb oid.ID) error {
const tombstoneExpEpoch = 1008
var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(target)
var tombAddr oid.Address
tombAddr.SetContainer(target.Container())
tombAddr.SetObject(tomb)
inhumePrm.SetTombstoneAddress(tombAddr)
inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch)
_, err := db.Inhume(context.Background(), inhumePrm)
return err

View file

@ -62,6 +62,8 @@ func TestDB_Lock(t *testing.T) {
objAddr := objectcore.AddressOf(objs[0])
lockAddr := objectcore.AddressOf(lockObj)
const tombstoneExpEpoch = 1008
var inhumePrm meta.InhumePrm
inhumePrm.SetGCMark()
@ -76,6 +78,7 @@ func TestDB_Lock(t *testing.T) {
tombAddr := oidtest.Address()
tombAddr.SetContainer(objAddr.Container())
inhumePrm.SetTombstoneAddress(tombAddr)
inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch)
_, err = db.Inhume(context.Background(), inhumePrm)
require.ErrorAs(t, err, &objLockedErr)
@ -94,6 +97,7 @@ func TestDB_Lock(t *testing.T) {
tombAddr = oidtest.Address()
tombAddr.SetContainer(objAddr.Container())
inhumePrm.SetTombstoneAddress(tombAddr)
inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch)
_, err = db.Inhume(context.Background(), inhumePrm)
require.ErrorAs(t, err, &objLockedErr)
})

View file

@ -309,3 +309,56 @@ func isLockObject(tx *bbolt.Tx, idCnr cid.ID, obj oid.ID) bool {
bucketNameLockers(idCnr, make([]byte, bucketKeySize)),
objectKey(obj, make([]byte, objectKeySize)))
}
const NoExpirationEpoch uint64 = 0
// encodeTombstoneKeyWithExpEpoch encodes a tombstone key of a tombstoned
// object if the following format: tombstone address + expiration epoch.
//
// Returns an error if the buffer length isn't 32.
//
// The expiration epoch shouldn't be [NoExpirationEpoch], as tombstone keys
// are intended to have a valid expiration epoch.
//
// The use of [NoExpirationEpoch] is allowed only for test purposes.
func encodeTombstoneKeyWithExpEpoch(addr oid.Address, expEpoch uint64, dst []byte) error {
if len(dst) != addressKeySize+epochSize {
return errInvalidLength
}
addr.Container().Encode(dst[:cidSize])
addr.Object().Encode(dst[cidSize:addressKeySize])
binary.LittleEndian.PutUint64(dst[addressKeySize:], expEpoch)
return nil
}
// decodeTombstoneKeyWithExpEpoch decodes a tombstone key of a tombstoned object.
// The tombstone key may have one of the following formats:
// - tombstone address
// - tombstone address + expiration epoch
//
// Expiration epoch is set to [NoExpirationEpoch] if the key doesn't have it.
func decodeTombstoneKeyWithExpEpoch(src []byte) (addr oid.Address, expEpoch uint64, err error) {
if len(src) != addressKeySize && len(src) != addressKeySize+epochSize {
return oid.Address{}, 0, errInvalidLength
}
var cnt cid.ID
if err := cnt.Decode(src[:cidSize]); err != nil {
return addr, 0, err
}
var obj oid.ID
if err := obj.Decode(src[cidSize:addressKeySize]); err != nil {
return addr, 0, err
}
addr.SetContainer(cnt)
addr.SetObject(obj)
if len(src) > addressKeySize {
expEpoch = binary.LittleEndian.Uint64(src[addressKeySize:])
} else {
expEpoch = NoExpirationEpoch
}
return
}

View file

@ -116,9 +116,8 @@ func (s *Shard) Init(ctx context.Context) error {
eventNewEpoch: {
cancelFunc: func() {},
handlers: []eventHandler{
s.collectExpiredLocks,
s.collectExpiredObjects,
s.collectExpiredTombstones,
s.collectExpiredGraves,
s.collectExpiredMetrics,
},
},
@ -334,6 +333,11 @@ func (s *Shard) refillLockObject(ctx context.Context, obj *objectSDK.Object) err
}
func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object) error {
expEpoch, ok := object.ExpirationEpoch(obj)
if !ok {
return fmt.Errorf("tombstone %s has no expiration epoch", object.AddressOf(obj))
}
tombstone := objectSDK.NewTombstone()
if err := tombstone.Unmarshal(obj.Payload()); err != nil {
@ -354,6 +358,7 @@ func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object
var inhumePrm meta.InhumePrm
inhumePrm.SetTombstoneAddress(tombAddr)
inhumePrm.SetTombstoneExpEpoch(expEpoch)
inhumePrm.SetAddresses(tombMembers...)
_, err := s.metaBase.Inhume(ctx, inhumePrm)

View file

@ -7,6 +7,7 @@ import (
"math"
"os"
"path/filepath"
"strconv"
"sync/atomic"
"testing"
@ -19,6 +20,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
@ -234,8 +236,15 @@ func TestRefillMetabase(t *testing.T) {
}
}
expirationEpoch := 1008
expirationAttr := *objectSDK.NewAttribute()
expirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
expirationAttr.SetValue(strconv.Itoa(expirationEpoch))
tombObj := objecttest.Object()
tombObj.SetType(objectSDK.TypeTombstone)
tombObj.SetAttributes(expirationAttr)
tombstone := objecttest.Tombstone()
@ -276,6 +285,7 @@ func TestRefillMetabase(t *testing.T) {
lockObj := objecttest.Object()
lockObj.SetContainerID(cnrLocked)
lockObj.SetAttributes(expirationAttr)
objectSDK.WriteLock(lockObj, lock)
putPrm.SetObject(lockObj)
@ -286,7 +296,7 @@ func TestRefillMetabase(t *testing.T) {
require.NoError(t, sh.Lock(context.Background(), cnrLocked, lockID, locked))
var inhumePrm InhumePrm
inhumePrm.SetTarget(object.AddressOf(tombObj), tombMembers...)
inhumePrm.SetTarget(object.AddressOf(tombObj), uint64(expirationEpoch), tombMembers...)
_, err = sh.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)

View file

@ -86,17 +86,17 @@ type GCMectrics interface {
SetShardID(string)
AddRunDuration(d time.Duration, success bool)
AddDeletedCount(deleted, failed uint64)
AddExpiredObjectCollectionDuration(d time.Duration, success bool, objectType string)
AddExpiredObjectCollectionDuration(d time.Duration, success bool)
AddInhumedObjectCount(count uint64, objectType string)
}
type noopGCMetrics struct{}
func (m *noopGCMetrics) SetShardID(string) {}
func (m *noopGCMetrics) AddRunDuration(time.Duration, bool) {}
func (m *noopGCMetrics) AddDeletedCount(uint64, uint64) {}
func (m *noopGCMetrics) AddExpiredObjectCollectionDuration(time.Duration, bool, string) {}
func (m *noopGCMetrics) AddInhumedObjectCount(uint64, string) {}
func (m *noopGCMetrics) SetShardID(string) {}
func (m *noopGCMetrics) AddRunDuration(time.Duration, bool) {}
func (m *noopGCMetrics) AddDeletedCount(uint64, uint64) {}
func (m *noopGCMetrics) AddExpiredObjectCollectionDuration(time.Duration, bool) {}
func (m *noopGCMetrics) AddInhumedObjectCount(uint64, string) {}
type gc struct {
*gcCfg
@ -355,43 +355,63 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
startedAt := time.Now()
defer func() {
s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeRegular)
s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil)
}()
s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsStarted, zap.Uint64("epoch", e.(newEpoch).epoch))
defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsCompleted, zap.Uint64("epoch", e.(newEpoch).epoch))
epoch := e.(newEpoch).epoch
s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsStarted, zap.Uint64("epoch", epoch))
defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsCompleted, zap.Uint64("epoch", epoch))
workersCount, batchSize := s.getExpiredObjectsParameters()
errGroup, egCtx := errgroup.WithContext(ctx)
errGroup.SetLimit(workersCount)
errGroup.Go(func() error {
batch := make([]oid.Address, 0, batchSize)
expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) {
if o.Type() != objectSDK.TypeTombstone && o.Type() != objectSDK.TypeLock {
batch = append(batch, o.Address())
handlers := map[objectSDK.Type]func(context.Context, []oid.Address){
objectSDK.TypeRegular: s.handleExpiredObjects,
objectSDK.TypeTombstone: s.handleExpiredTombstones,
objectSDK.TypeLock: func(ctx context.Context, batch []oid.Address) {
s.expiredLockObjectsCallback(ctx, epoch, batch)
},
}
batches := make(map[objectSDK.Type][]oid.Address)
for typ := range handlers {
batches[typ] = make([]oid.Address, 0, batchSize)
}
if len(batch) == batchSize {
expired := batch
errGroup.Go(func() error {
s.handleExpiredObjects(egCtx, expired)
return egCtx.Err()
})
batch = make([]oid.Address, 0, batchSize)
}
errGroup.Go(func() error {
expErr := s.getExpiredObjects(egCtx, epoch, func(o *meta.ExpiredObject) {
typ := o.Type()
if _, ok := handlers[typ]; !ok {
s.log.Warn(ctx, logs.ShardUnknownObjectTypeWhileIteratingExpiredObjects, zap.Stringer("type", typ))
return
}
batches[typ] = append(batches[typ], o.Address())
if len(batches[typ]) == batchSize {
expired := batches[typ]
errGroup.Go(func() error {
handlers[typ](egCtx, expired)
return egCtx.Err()
})
batches[typ] = make([]oid.Address, 0, batchSize)
}
})
if expErr != nil {
return expErr
}
if len(batch) > 0 {
expired := batch
errGroup.Go(func() error {
s.handleExpiredObjects(egCtx, expired)
return egCtx.Err()
})
for typ, batch := range batches {
if len(batch) > 0 {
expired := batch
errGroup.Go(func() error {
handlers[typ](egCtx, expired)
return egCtx.Err()
})
}
}
return nil
@ -402,6 +422,41 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
}
}
func (s *Shard) handleExpiredTombstones(ctx context.Context, expired []oid.Address) {
select {
case <-ctx.Done():
return
default:
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return
}
var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(expired...)
inhumePrm.SetGCMark()
res, err := s.metaBase.Inhume(ctx, inhumePrm)
if err != nil {
s.log.Warn(ctx, logs.ShardCouldNotInhumeTheObjects, zap.Error(err))
return
}
s.gc.metrics.AddInhumedObjectCount(res.LogicInhumed(), objectTypeTombstone)
s.decObjectCounterBy(logical, res.LogicInhumed())
s.decObjectCounterBy(user, res.UserInhumed())
s.decContainerObjectCounter(res.InhumedByCnrID())
for i := range res.GetDeletionInfoLength() {
delInfo := res.GetDeletionInfoByIndex(i)
s.addToContainerSize(delInfo.CID.EncodeToString(), -int64(delInfo.Size))
}
}
func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address) {
select {
case <-ctx.Done():
@ -464,12 +519,12 @@ func (s *Shard) getExpiredWithLinked(ctx context.Context, source []oid.Address)
return result, nil
}
func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
func (s *Shard) collectExpiredGraves(ctx context.Context, e Event) {
var err error
startedAt := time.Now()
defer func() {
s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeTombstone)
s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil)
}()
epoch := e.(newEpoch).epoch
@ -480,7 +535,8 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
const tssDeleteBatch = 50
tss := make([]meta.TombstonedObject, 0, tssDeleteBatch)
tssExp := make([]meta.TombstonedObject, 0, tssDeleteBatch)
expiredGravesWithExpEpoch := make([]meta.TombstonedObject, 0, tssDeleteBatch)
expiredGravesWithoutExpEpoch := make([]meta.TombstonedObject, 0, tssDeleteBatch)
var iterPrm meta.GraveyardIterationPrm
iterPrm.SetHandler(func(deletedObject meta.TombstonedObject) error {
@ -521,72 +577,45 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
}
for _, ts := range tss {
if !s.tsSource.IsTombstoneAvailable(ctx, ts.Tombstone(), epoch) {
tssExp = append(tssExp, ts)
if ts.ExpirationEpoch() != meta.NoExpirationEpoch {
expiredGravesWithExpEpoch = append(expiredGravesWithExpEpoch, ts)
} else if !s.tsSource.IsTombstoneAvailable(ctx, ts.Tombstone(), epoch) {
expiredGravesWithoutExpEpoch = append(expiredGravesWithoutExpEpoch, ts)
}
}
log.Debug(ctx, logs.ShardHandlingExpiredTombstonesBatch, zap.Int("number", len(tssExp)))
if len(tssExp) > 0 {
s.expiredTombstonesCallback(ctx, tssExp)
}
log.Debug(ctx, logs.ShardHandlingExpiredTombstonesBatch,
zap.Int("with expiration epoch", len(expiredGravesWithExpEpoch)),
zap.Int("without expiration epoch", len(expiredGravesWithoutExpEpoch)),
)
s.handleExpiredGraves(ctx, expiredGravesWithExpEpoch, expiredGravesWithoutExpEpoch)
iterPrm.SetOffset(tss[tssLen-1].Address())
tss = tss[:0]
tssExp = tssExp[:0]
expiredGravesWithExpEpoch = expiredGravesWithExpEpoch[:0]
expiredGravesWithoutExpEpoch = expiredGravesWithoutExpEpoch[:0]
}
}
func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
var err error
startedAt := time.Now()
func (s *Shard) handleExpiredGraves(ctx context.Context,
expiredWithExpEpoch []meta.TombstonedObject, expiredWithoutExpEpoch []meta.TombstonedObject,
) {
if len(expiredWithExpEpoch) > 0 {
s.removeExpiredGraves(ctx, expiredWithExpEpoch)
}
if len(expiredWithoutExpEpoch) > 0 {
s.expiredGravesCallback(ctx, expiredWithoutExpEpoch)
}
}
defer func() {
s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeLock)
}()
func (s *Shard) removeExpiredGraves(ctx context.Context, expired []meta.TombstonedObject) {
if s.info.Mode.NoMetabase() {
return
}
s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksStarted, zap.Uint64("epoch", e.(newEpoch).epoch))
defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksCompleted, zap.Uint64("epoch", e.(newEpoch).epoch))
workersCount, batchSize := s.getExpiredObjectsParameters()
errGroup, egCtx := errgroup.WithContext(ctx)
errGroup.SetLimit(workersCount)
errGroup.Go(func() error {
batch := make([]oid.Address, 0, batchSize)
expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) {
if o.Type() == objectSDK.TypeLock {
batch = append(batch, o.Address())
if len(batch) == batchSize {
expired := batch
errGroup.Go(func() error {
s.expiredLocksCallback(egCtx, e.(newEpoch).epoch, expired)
return egCtx.Err()
})
batch = make([]oid.Address, 0, batchSize)
}
}
})
if expErr != nil {
return expErr
}
if len(batch) > 0 {
expired := batch
errGroup.Go(func() error {
s.expiredLocksCallback(egCtx, e.(newEpoch).epoch, expired)
return egCtx.Err()
})
}
return nil
})
if err = errGroup.Wait(); err != nil {
s.log.Warn(ctx, logs.ShardIteratorOverExpiredLocksFailed, zap.Error(err))
if err := s.metaBase.RemoveGraves(ctx, expired); err != nil {
s.log.Warn(ctx, logs.ShardFailedToRemoveExpiredGraves, zap.Error(err))
}
}
@ -624,11 +653,11 @@ func (s *Shard) selectExpired(ctx context.Context, epoch uint64, addresses []oid
return s.metaBase.FilterExpired(ctx, epoch, addresses)
}
// HandleExpiredTombstones marks tombstones themselves as garbage
// HandleExpiredGraves marks tombstones themselves as garbage
// and clears up corresponding graveyard records.
//
// Does not modify tss.
func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.TombstonedObject) {
func (s *Shard) HandleExpiredGraves(ctx context.Context, tss []meta.TombstonedObject) {
s.m.RLock()
defer s.m.RUnlock()
@ -658,9 +687,9 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
}
}
// HandleExpiredLocks unlocks all objects which were locked by lockers.
// HandleExpiredLockObjects unlocks all objects which were locked by lockers.
// If successful, marks lockers themselves as garbage.
func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []oid.Address) {
func (s *Shard) HandleExpiredLockObjects(ctx context.Context, epoch uint64, lockers []oid.Address) {
if s.GetMode().NoMetabase() {
return
}

View file

@ -65,7 +65,7 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
sh.HandleDeletedLocks(ctx, addresses)
}),
WithExpiredLocksCallback(func(ctx context.Context, epoch uint64, a []oid.Address) {
sh.HandleExpiredLocks(ctx, epoch, a)
sh.HandleExpiredLockObjects(ctx, epoch, a)
}),
WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)

View file

@ -19,6 +19,7 @@ type InhumePrm struct {
target []oid.Address
tombstone *oid.Address
forceRemoval bool
expEpoch uint64
}
// InhumeRes encapsulates results of inhume operation.
@ -29,9 +30,10 @@ type InhumeRes struct{}
//
// tombstone should not be nil, addr should not be empty.
// Should not be called along with MarkAsGarbage.
func (p *InhumePrm) SetTarget(tombstone oid.Address, addrs ...oid.Address) {
func (p *InhumePrm) SetTarget(tombstone oid.Address, expEpoch uint64, addrs ...oid.Address) {
p.target = addrs
p.tombstone = &tombstone
p.expEpoch = expEpoch
}
// MarkAsGarbage marks object to be physically removed from shard.
@ -93,6 +95,7 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
if prm.tombstone != nil {
metaPrm.SetTombstoneAddress(*prm.tombstone)
metaPrm.SetTombstoneExpEpoch(prm.expEpoch)
} else {
metaPrm.SetGCMark()
}

View file

@ -40,7 +40,8 @@ func testShardInhume(t *testing.T, hasWriteCache bool) {
putPrm.SetObject(obj)
var inhPrm InhumePrm
inhPrm.SetTarget(object.AddressOf(ts), object.AddressOf(obj))
const tombstoneExpEpoch = 1008
inhPrm.SetTarget(object.AddressOf(ts), tombstoneExpEpoch, object.AddressOf(obj))
var getPrm GetPrm
getPrm.SetAddress(object.AddressOf(obj))

View file

@ -89,11 +89,13 @@ func TestShard_Lock(t *testing.T) {
_, err = sh.Put(context.Background(), putPrm)
require.NoError(t, err)
const tombstoneExpEpoch = 1008
t.Run("inhuming locked objects", func(t *testing.T) {
ts := testutil.GenerateObjectWithCID(cnr)
var inhumePrm InhumePrm
inhumePrm.SetTarget(objectcore.AddressOf(ts), objectcore.AddressOf(obj))
inhumePrm.SetTarget(objectcore.AddressOf(ts), tombstoneExpEpoch, objectcore.AddressOf(obj))
var objLockedErr *apistatus.ObjectLocked
@ -109,7 +111,7 @@ func TestShard_Lock(t *testing.T) {
ts := testutil.GenerateObjectWithCID(cnr)
var inhumePrm InhumePrm
inhumePrm.SetTarget(objectcore.AddressOf(ts), objectcore.AddressOf(lock))
inhumePrm.SetTarget(objectcore.AddressOf(ts), tombstoneExpEpoch, objectcore.AddressOf(lock))
_, err = sh.Inhume(context.Background(), inhumePrm)
require.Error(t, err)

View file

@ -314,11 +314,13 @@ func TestCounters(t *testing.T) {
logic := mm.getObjectCounter(logical)
custom := mm.getObjectCounter(user)
const tombstoneExpEpoch = 1008
inhumedNumber := int(phy / 4)
for _, o := range addrFromObjs(oo[:inhumedNumber]) {
ts := oidtest.Address()
ts.SetContainer(o.Container())
prm.SetTarget(ts, o)
prm.SetTarget(ts, tombstoneExpEpoch, o)
_, err := sh.Inhume(context.Background(), prm)
require.NoError(t, err)
}

View file

@ -46,8 +46,8 @@ type Shard struct {
// Option represents Shard's constructor option.
type Option func(*cfg)
// ExpiredTombstonesCallback is a callback handling list of expired tombstones.
type ExpiredTombstonesCallback func(context.Context, []meta.TombstonedObject)
// ExpiredGravesCallback is a callback handling list of expired graves.
type ExpiredGravesCallback func(context.Context, []meta.TombstonedObject)
// ExpiredObjectsCallback is a callback handling list of expired objects.
type ExpiredObjectsCallback func(context.Context, uint64, []oid.Address)
@ -82,9 +82,9 @@ type cfg struct {
gcCfg gcCfg
expiredTombstonesCallback ExpiredTombstonesCallback
expiredGravesCallback ExpiredGravesCallback
expiredLocksCallback ExpiredObjectsCallback
expiredLockObjectsCallback ExpiredObjectsCallback
deletedLockCallBack DeletedLockCallback
@ -248,9 +248,9 @@ func WithGCRemoverSleepInterval(dur time.Duration) Option {
// WithExpiredTombstonesCallback returns option to specify callback
// of the expired tombstones handler.
func WithExpiredTombstonesCallback(cb ExpiredTombstonesCallback) Option {
func WithExpiredTombstonesCallback(cb ExpiredGravesCallback) Option {
return func(c *cfg) {
c.expiredTombstonesCallback = cb
c.expiredGravesCallback = cb
}
}
@ -258,7 +258,7 @@ func WithExpiredTombstonesCallback(cb ExpiredTombstonesCallback) Option {
// of the expired LOCK objects handler.
func WithExpiredLocksCallback(cb ExpiredObjectsCallback) Option {
return func(c *cfg) {
c.expiredLocksCallback = cb
c.expiredLockObjectsCallback = cb
}
}

View file

@ -93,7 +93,7 @@ func newCustomShard(t testing.TB, enableWriteCache bool, o shardOptions) *Shard
sh.HandleDeletedLocks(ctx, addresses)
}),
WithExpiredLocksCallback(func(ctx context.Context, epoch uint64, a []oid.Address) {
sh.HandleExpiredLocks(ctx, epoch, a)
sh.HandleExpiredLockObjects(ctx, epoch, a)
}),
WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)

View file

@ -2,6 +2,7 @@ package writer
import (
"context"
"errors"
"fmt"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
@ -11,6 +12,8 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
var errObjectHasNoExpirationEpoch = errors.New("object has no expiration epoch")
// ObjectStorage is an object storage interface.
type ObjectStorage interface {
// Put must save passed object
@ -18,7 +21,7 @@ type ObjectStorage interface {
Put(context.Context, *objectSDK.Object, bool) error
// Delete must delete passed objects
// and return any appeared error.
Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID) error
Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID, expEpoch uint64) error
// Lock must lock passed objects
// and return any appeared error.
Lock(ctx context.Context, locker oid.Address, toLock []oid.ID) error
@ -38,7 +41,12 @@ func (t LocalTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, met
switch meta.Type() {
case objectSDK.TypeTombstone:
err := t.Storage.Delete(ctx, objectCore.AddressOf(obj), meta.Objects())
expEpoch, ok := objectCore.ExpirationEpoch(obj)
if !ok {
return errObjectHasNoExpirationEpoch
}
err := t.Storage.Delete(ctx, objectCore.AddressOf(obj), meta.Objects(), expEpoch)
if err != nil {
return fmt.Errorf("could not delete objects from tombstone locally: %w", err)
}

View file

@ -186,6 +186,7 @@ func PopulateGraveyard(
prm := meta.InhumePrm{}
prm.SetAddresses(addr)
prm.SetTombstoneAddress(tsAddr)
prm.SetTombstoneExpEpoch(rand.Uint64())
group.Go(func() error {
if _, err := db.Inhume(ctx, prm); err != nil {