[#1445] local_object_storage: Append expiration epoch to graves
In the near future the garbage collector will delete expired tombstones and graves separately. So, all graves should have expiration epochs. Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
parent
9b29e7392f
commit
efec26b8ef
23 changed files with 261 additions and 30 deletions
|
@ -464,7 +464,7 @@ func (e engineWithoutNotifications) IsLocked(ctx context.Context, address oid.Ad
|
|||
return e.engine.IsLocked(ctx, address)
|
||||
}
|
||||
|
||||
func (e engineWithoutNotifications) Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID) error {
|
||||
func (e engineWithoutNotifications) Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID, expEpoch uint64) error {
|
||||
var prm engine.InhumePrm
|
||||
|
||||
addrs := make([]oid.Address, len(toDelete))
|
||||
|
@ -473,7 +473,7 @@ func (e engineWithoutNotifications) Delete(ctx context.Context, tombstone oid.Ad
|
|||
addrs[i].SetObject(toDelete[i])
|
||||
}
|
||||
|
||||
prm.WithTarget(tombstone, addrs...)
|
||||
prm.WithTarget(tombstone, expEpoch, addrs...)
|
||||
|
||||
return e.engine.Inhume(ctx, prm)
|
||||
}
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
package object
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
@ -21,3 +24,14 @@ func AddressOf(obj *objectSDK.Object) oid.Address {
|
|||
|
||||
return addr
|
||||
}
|
||||
|
||||
// ExpirationEpoch returns the expiration epoch of the object.
|
||||
func ExpirationEpoch(obj *objectSDK.Object) (epoch uint64, found bool) {
|
||||
for _, attr := range obj.Attributes() {
|
||||
if attr.Key() == objectV2.SysAttributeExpEpoch {
|
||||
epoch, err := strconv.ParseUint(attr.Value(), 10, 64)
|
||||
return epoch, err == nil
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
49
pkg/core/object/object_test.go
Normal file
49
pkg/core/object/object_test.go
Normal file
|
@ -0,0 +1,49 @@
|
|||
package object_test
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestExpirationEpoch(t *testing.T) {
|
||||
obj := objecttest.Object()
|
||||
|
||||
var expEpoch uint64 = 42
|
||||
expAttr := objectSDK.NewAttribute()
|
||||
expAttr.SetKey(objectV2.SysAttributeExpEpoch)
|
||||
expAttr.SetValue(strconv.FormatUint(expEpoch, 10))
|
||||
|
||||
t.Run("no attributes set", func(t *testing.T) {
|
||||
obj.SetAttributes()
|
||||
_, found := objectCore.ExpirationEpoch(obj)
|
||||
require.False(t, found)
|
||||
})
|
||||
t.Run("no expiration epoch attribute", func(t *testing.T) {
|
||||
obj.SetAttributes(*objecttest.Attribute(), *objectSDK.NewAttribute())
|
||||
_, found := objectCore.ExpirationEpoch(obj)
|
||||
require.False(t, found)
|
||||
})
|
||||
t.Run("valid expiration epoch attribute", func(t *testing.T) {
|
||||
obj.SetAttributes(*objecttest.Attribute(), *expAttr, *objectSDK.NewAttribute())
|
||||
epoch, found := objectCore.ExpirationEpoch(obj)
|
||||
require.True(t, found)
|
||||
require.Equal(t, expEpoch, epoch)
|
||||
})
|
||||
t.Run("invalid expiration epoch value", func(t *testing.T) {
|
||||
expAttr.SetValue("-42")
|
||||
obj.SetAttributes(*objecttest.Attribute(), *expAttr, *objectSDK.NewAttribute())
|
||||
_, found := objectCore.ExpirationEpoch(obj)
|
||||
require.False(t, found)
|
||||
|
||||
expAttr.SetValue("qwerty")
|
||||
obj.SetAttributes(*objecttest.Attribute(), *expAttr, *objectSDK.NewAttribute())
|
||||
_, found = objectCore.ExpirationEpoch(obj)
|
||||
require.False(t, found)
|
||||
})
|
||||
}
|
|
@ -24,6 +24,7 @@ type InhumePrm struct {
|
|||
addrs []oid.Address
|
||||
|
||||
forceRemoval bool
|
||||
expEpoch uint64
|
||||
}
|
||||
|
||||
// WithTarget sets a list of objects that should be inhumed and tombstone address
|
||||
|
@ -31,9 +32,10 @@ type InhumePrm struct {
|
|||
//
|
||||
// tombstone should not be nil, addr should not be empty.
|
||||
// Should not be called along with MarkAsGarbage.
|
||||
func (p *InhumePrm) WithTarget(tombstone oid.Address, addrs ...oid.Address) {
|
||||
func (p *InhumePrm) WithTarget(tombstone oid.Address, expEpoch uint64, addrs ...oid.Address) {
|
||||
p.addrs = addrs
|
||||
p.tombstone = &tombstone
|
||||
p.expEpoch = expEpoch
|
||||
}
|
||||
|
||||
// MarkAsGarbage marks an object to be physically removed from local storage.
|
||||
|
@ -88,7 +90,7 @@ func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) error {
|
|||
|
||||
for shardID, addrs := range addrsPerShard {
|
||||
if prm.tombstone != nil {
|
||||
shPrm.SetTarget(*prm.tombstone, addrs...)
|
||||
shPrm.SetTarget(*prm.tombstone, prm.expEpoch, addrs...)
|
||||
} else {
|
||||
shPrm.MarkAsGarbage(addrs...)
|
||||
}
|
||||
|
|
|
@ -44,6 +44,8 @@ func TestStorageEngine_Inhume(t *testing.T) {
|
|||
link.SetChildren(idChild)
|
||||
link.SetSplitID(splitID)
|
||||
|
||||
const tombstoneExpEpoch = 1008
|
||||
|
||||
t.Run("delete small object", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
e := testNewEngine(t).setShardsNum(t, 1).prepare(t).engine
|
||||
|
@ -53,7 +55,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
var inhumePrm InhumePrm
|
||||
inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent))
|
||||
inhumePrm.WithTarget(tombstoneID, tombstoneExpEpoch, object.AddressOf(parent))
|
||||
|
||||
err = e.Inhume(context.Background(), inhumePrm)
|
||||
require.NoError(t, err)
|
||||
|
@ -83,7 +85,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
var inhumePrm InhumePrm
|
||||
inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent))
|
||||
inhumePrm.WithTarget(tombstoneID, tombstoneExpEpoch, object.AddressOf(parent))
|
||||
|
||||
err = e.Inhume(context.Background(), inhumePrm)
|
||||
require.NoError(t, err)
|
||||
|
@ -127,7 +129,9 @@ func TestStorageEngine_ECInhume(t *testing.T) {
|
|||
require.NoError(t, Put(context.Background(), e, tombstoneObject, false))
|
||||
|
||||
var inhumePrm InhumePrm
|
||||
inhumePrm.WithTarget(tombstoneObjectAddress, parentObjectAddress)
|
||||
|
||||
const tombstoneExpEpoch = 1008
|
||||
inhumePrm.WithTarget(tombstoneObjectAddress, tombstoneExpEpoch, parentObjectAddress)
|
||||
err = e.Inhume(context.Background(), inhumePrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -143,6 +147,7 @@ func TestInhumeExpiredRegularObject(t *testing.T) {
|
|||
|
||||
const currEpoch = 42
|
||||
const objectExpiresAfter = currEpoch - 1
|
||||
const tombstoneExpiresAfter = currEpoch + 1000
|
||||
|
||||
engine := testNewEngine(t).setShardsNumAdditionalOpts(t, 1, func(_ int) []shard.Option {
|
||||
return []shard.Option{
|
||||
|
@ -172,7 +177,7 @@ func TestInhumeExpiredRegularObject(t *testing.T) {
|
|||
ts.SetContainer(cnr)
|
||||
|
||||
var prm InhumePrm
|
||||
prm.WithTarget(ts, object.AddressOf(obj))
|
||||
prm.WithTarget(ts, tombstoneExpiresAfter, object.AddressOf(obj))
|
||||
err := engine.Inhume(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
@ -205,6 +210,8 @@ func BenchmarkInhumeMultipart(b *testing.B) {
|
|||
func benchmarkInhumeMultipart(b *testing.B, numShards, numObjects int) {
|
||||
b.StopTimer()
|
||||
|
||||
const tombstoneExpiresAfter = 1000 // doesn't matter, just big enough
|
||||
|
||||
engine := testNewEngine(b, WithShardPoolSize(uint32(numObjects))).
|
||||
setShardsNum(b, numShards).prepare(b).engine
|
||||
defer func() { require.NoError(b, engine.Close(context.Background())) }()
|
||||
|
@ -234,7 +241,7 @@ func benchmarkInhumeMultipart(b *testing.B, numShards, numObjects int) {
|
|||
ts.SetContainer(cnt)
|
||||
|
||||
prm := InhumePrm{}
|
||||
prm.WithTarget(ts, addrs...)
|
||||
prm.WithTarget(ts, tombstoneExpiresAfter, addrs...)
|
||||
|
||||
b.StartTimer()
|
||||
err := engine.Inhume(context.Background(), prm)
|
||||
|
|
|
@ -40,6 +40,7 @@ func TestLockUserScenario(t *testing.T) {
|
|||
// 5. waits for an epoch after the lock expiration one
|
||||
// 6. tries to inhume the object and expects success
|
||||
const lockerExpiresAfter = 13
|
||||
const tombstoneExpiresAfter = 1000
|
||||
|
||||
cnr := cidtest.ID()
|
||||
tombObj := testutil.GenerateObjectWithCID(cnr)
|
||||
|
@ -111,13 +112,15 @@ func TestLockUserScenario(t *testing.T) {
|
|||
|
||||
// 3.
|
||||
var inhumePrm InhumePrm
|
||||
inhumePrm.WithTarget(tombAddr, objAddr)
|
||||
inhumePrm.WithTarget(tombAddr, tombstoneExpiresAfter, objAddr)
|
||||
|
||||
var objLockedErr *apistatus.ObjectLocked
|
||||
err = e.Inhume(context.Background(), inhumePrm)
|
||||
require.ErrorAs(t, err, &objLockedErr)
|
||||
|
||||
// 4.
|
||||
a.SetValue(strconv.Itoa(tombstoneExpiresAfter))
|
||||
|
||||
tombObj.SetType(objectSDK.TypeTombstone)
|
||||
tombObj.SetID(tombForLockID)
|
||||
tombObj.SetAttributes(a)
|
||||
|
@ -125,7 +128,7 @@ func TestLockUserScenario(t *testing.T) {
|
|||
err = Put(context.Background(), e, tombObj, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
inhumePrm.WithTarget(tombForLockAddr, lockerAddr)
|
||||
inhumePrm.WithTarget(tombForLockAddr, tombstoneExpiresAfter, lockerAddr)
|
||||
|
||||
err = e.Inhume(context.Background(), inhumePrm)
|
||||
require.ErrorIs(t, err, meta.ErrLockObjectRemoval)
|
||||
|
@ -133,7 +136,7 @@ func TestLockUserScenario(t *testing.T) {
|
|||
// 5.
|
||||
e.HandleNewEpoch(context.Background(), lockerExpiresAfter+1)
|
||||
|
||||
inhumePrm.WithTarget(tombAddr, objAddr)
|
||||
inhumePrm.WithTarget(tombAddr, tombstoneExpiresAfter, objAddr)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
err = e.Inhume(context.Background(), inhumePrm)
|
||||
|
@ -166,6 +169,7 @@ func TestLockExpiration(t *testing.T) {
|
|||
defer func() { require.NoError(t, e.Close(context.Background())) }()
|
||||
|
||||
const lockerExpiresAfter = 13
|
||||
const tombstoneExpiresAfter = 1000
|
||||
|
||||
cnr := cidtest.ID()
|
||||
var err error
|
||||
|
@ -197,7 +201,7 @@ func TestLockExpiration(t *testing.T) {
|
|||
var inhumePrm InhumePrm
|
||||
tombAddr := oidtest.Address()
|
||||
tombAddr.SetContainer(cnr)
|
||||
inhumePrm.WithTarget(tombAddr, objectcore.AddressOf(obj))
|
||||
inhumePrm.WithTarget(tombAddr, tombstoneExpiresAfter, objectcore.AddressOf(obj))
|
||||
|
||||
var objLockedErr *apistatus.ObjectLocked
|
||||
err = e.Inhume(context.Background(), inhumePrm)
|
||||
|
@ -209,7 +213,7 @@ func TestLockExpiration(t *testing.T) {
|
|||
// 4.
|
||||
tombAddr = oidtest.Address()
|
||||
tombAddr.SetContainer(cnr)
|
||||
inhumePrm.WithTarget(tombAddr, objectcore.AddressOf(obj))
|
||||
inhumePrm.WithTarget(tombAddr, tombstoneExpiresAfter, objectcore.AddressOf(obj))
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
err = e.Inhume(context.Background(), inhumePrm)
|
||||
|
@ -273,7 +277,8 @@ func TestLockForceRemoval(t *testing.T) {
|
|||
err = e.Inhume(context.Background(), inhumePrm)
|
||||
require.ErrorAs(t, err, &objLockedErr)
|
||||
|
||||
inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj))
|
||||
const tombstoneExpEpoch = 1008
|
||||
inhumePrm.WithTarget(oidtest.Address(), tombstoneExpEpoch, objectcore.AddressOf(obj))
|
||||
|
||||
err = e.Inhume(context.Background(), inhumePrm)
|
||||
require.ErrorAs(t, err, &objLockedErr)
|
||||
|
|
|
@ -156,11 +156,13 @@ func TestCounters(t *testing.T) {
|
|||
}
|
||||
|
||||
var prm meta.InhumePrm
|
||||
const tombstoneExpEpoch = 1008
|
||||
for _, o := range inhumedObjs {
|
||||
tombAddr := oidtest.Address()
|
||||
tombAddr.SetContainer(o.Container())
|
||||
|
||||
prm.SetTombstoneAddress(tombAddr)
|
||||
prm.SetTombstoneExpEpoch(tombstoneExpEpoch)
|
||||
prm.SetAddresses(o)
|
||||
|
||||
res, err := db.Inhume(context.Background(), prm)
|
||||
|
@ -301,11 +303,13 @@ func TestCounters(t *testing.T) {
|
|||
}
|
||||
|
||||
var prm meta.InhumePrm
|
||||
const tombstoneExpEpoch = 1008
|
||||
for _, o := range inhumedObjs {
|
||||
tombAddr := oidtest.Address()
|
||||
tombAddr.SetContainer(o.Container())
|
||||
|
||||
prm.SetTombstoneAddress(tombAddr)
|
||||
prm.SetTombstoneExpEpoch(tombstoneExpEpoch)
|
||||
prm.SetAddresses(o)
|
||||
|
||||
_, err := db.Inhume(context.Background(), prm)
|
||||
|
|
|
@ -23,10 +23,13 @@ import (
|
|||
func TestDeleteECObject_WithoutSplit(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
const currEpoch = 12
|
||||
const tombstoneExpEpoch = currEpoch + 1
|
||||
|
||||
db := New(
|
||||
WithPath(filepath.Join(t.TempDir(), "metabase")),
|
||||
WithPermissions(0o600),
|
||||
WithEpochState(epochState{uint64(12)}),
|
||||
WithEpochState(epochState{uint64(currEpoch)}),
|
||||
)
|
||||
|
||||
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||
|
@ -82,6 +85,7 @@ func TestDeleteECObject_WithoutSplit(t *testing.T) {
|
|||
tombAddress.SetObject(tombstoneID)
|
||||
inhumePrm.SetAddresses(ecParentAddress)
|
||||
inhumePrm.SetTombstoneAddress(tombAddress)
|
||||
inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch)
|
||||
_, err = db.Inhume(context.Background(), inhumePrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -179,10 +183,13 @@ func TestDeleteECObject_WithSplit(t *testing.T) {
|
|||
func testDeleteECObjectWithSplit(t *testing.T, chunksCount int, withLinking bool) {
|
||||
t.Parallel()
|
||||
|
||||
const currEpoch = 12
|
||||
const tombstoneExpEpoch = currEpoch + 1
|
||||
|
||||
db := New(
|
||||
WithPath(filepath.Join(t.TempDir(), "metabase")),
|
||||
WithPermissions(0o600),
|
||||
WithEpochState(epochState{uint64(12)}),
|
||||
WithEpochState(epochState{currEpoch}),
|
||||
)
|
||||
|
||||
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||
|
@ -289,6 +296,7 @@ func testDeleteECObjectWithSplit(t *testing.T, chunksCount int, withLinking bool
|
|||
tombAddress.SetObject(tombstoneID)
|
||||
inhumePrm.SetAddresses(inhumeAddresses...)
|
||||
inhumePrm.SetTombstoneAddress(tombAddress)
|
||||
inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch)
|
||||
_, err = db.Inhume(context.Background(), inhumePrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
|
|
@ -91,8 +91,9 @@ func (db *DB) IterateOverGarbage(ctx context.Context, p GarbageIterationPrm) err
|
|||
// TombstonedObject represents descriptor of the
|
||||
// object that has been covered with tombstone.
|
||||
type TombstonedObject struct {
|
||||
addr oid.Address
|
||||
tomb oid.Address
|
||||
addr oid.Address
|
||||
tomb oid.Address
|
||||
expEpoch uint64
|
||||
}
|
||||
|
||||
// Address returns tombstoned object address.
|
||||
|
@ -106,6 +107,11 @@ func (g TombstonedObject) Tombstone() oid.Address {
|
|||
return g.tomb
|
||||
}
|
||||
|
||||
// ExpirationEpoch returns an expiration epoch of a tombstoned object.
|
||||
func (g TombstonedObject) ExpirationEpoch() uint64 {
|
||||
return g.expEpoch
|
||||
}
|
||||
|
||||
// TombstonedHandler is a TombstonedObject handling function.
|
||||
type TombstonedHandler func(object TombstonedObject) error
|
||||
|
||||
|
@ -249,7 +255,7 @@ func garbageFromKV(k []byte) (res GarbageObject, err error) {
|
|||
func graveFromKV(k, v []byte) (res TombstonedObject, err error) {
|
||||
if err = decodeAddressFromKey(&res.addr, k); err != nil {
|
||||
err = fmt.Errorf("decode tombstone target from key: %w", err)
|
||||
} else if err = decodeAddressFromKey(&res.tomb, v); err != nil {
|
||||
} else if res.tomb, res.expEpoch, err = decodeTombstoneKeyWithExpEpoch(v); err != nil {
|
||||
err = fmt.Errorf("decode tombstone address from value: %w", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -144,8 +144,10 @@ func TestDB_IterateDeletedObjects(t *testing.T) {
|
|||
addrTombstone := oidtest.Address()
|
||||
addrTombstone.SetContainer(cnr)
|
||||
|
||||
const tombstoneExpEpoch = 1008
|
||||
inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2))
|
||||
inhumePrm.SetTombstoneAddress(addrTombstone)
|
||||
inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch)
|
||||
|
||||
_, err = db.Inhume(context.Background(), inhumePrm)
|
||||
require.NoError(t, err)
|
||||
|
@ -232,10 +234,13 @@ func TestDB_IterateOverGraveyard_Offset(t *testing.T) {
|
|||
addrTombstone.SetContainer(cnr)
|
||||
|
||||
var inhumePrm meta.InhumePrm
|
||||
|
||||
const tombstoneExpEpoch = 1008
|
||||
inhumePrm.SetAddresses(
|
||||
object.AddressOf(obj1), object.AddressOf(obj2),
|
||||
object.AddressOf(obj3), object.AddressOf(obj4))
|
||||
inhumePrm.SetTombstoneAddress(addrTombstone)
|
||||
inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch)
|
||||
|
||||
_, err = db.Inhume(context.Background(), inhumePrm)
|
||||
require.NoError(t, err)
|
||||
|
@ -428,8 +433,11 @@ func TestDB_InhumeTombstones(t *testing.T) {
|
|||
addrTombstone := object.AddressOf(objTs)
|
||||
|
||||
var inhumePrm meta.InhumePrm
|
||||
|
||||
const tombstoneExpEpoch = 1008
|
||||
inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2))
|
||||
inhumePrm.SetTombstoneAddress(addrTombstone)
|
||||
inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch)
|
||||
|
||||
_, err = db.Inhume(context.Background(), inhumePrm)
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -18,6 +18,8 @@ import (
|
|||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
var errTombstoneExpEpochNotSet = errors.New("tombstone expiration epoch is not set")
|
||||
|
||||
// InhumePrm encapsulates parameters for Inhume operation.
|
||||
type InhumePrm struct {
|
||||
tomb *oid.Address
|
||||
|
@ -27,6 +29,9 @@ type InhumePrm struct {
|
|||
lockObjectHandling bool
|
||||
|
||||
forceRemoval bool
|
||||
|
||||
expEpoch uint64
|
||||
expEpochSet bool
|
||||
}
|
||||
|
||||
// DeletionInfo contains details on deleted object.
|
||||
|
@ -123,6 +128,15 @@ func (p *InhumePrm) SetTombstoneAddress(addr oid.Address) {
|
|||
p.tomb = &addr
|
||||
}
|
||||
|
||||
// SetTombstoneExpEpoch sets the expiration epoch for a tombstone.
|
||||
//
|
||||
// Setting the expiration epoch is required when inhuming with a tombstone.
|
||||
// [meta.NoExpirationEpoch] may be used allowed, but only for testing purposes.
|
||||
func (p *InhumePrm) SetTombstoneExpEpoch(epoch uint64) {
|
||||
p.expEpoch = epoch
|
||||
p.expEpochSet = true
|
||||
}
|
||||
|
||||
// SetGCMark marks the object to be physically removed.
|
||||
//
|
||||
// Should not be called along with SetTombstoneAddress.
|
||||
|
@ -148,6 +162,9 @@ func (p *InhumePrm) validate() error {
|
|||
return nil
|
||||
}
|
||||
if p.tomb != nil {
|
||||
if !p.expEpochSet {
|
||||
return errTombstoneExpEpochNotSet
|
||||
}
|
||||
for _, addr := range p.target {
|
||||
if addr.Container() != p.tomb.Container() {
|
||||
return fmt.Errorf("object %s and tombstone %s have different container ID", addr, p.tomb)
|
||||
|
@ -365,7 +382,10 @@ func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
|
|||
func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) {
|
||||
if prm.tomb != nil {
|
||||
targetBucket = graveyardBKT
|
||||
tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize))
|
||||
tombKey := make([]byte, addressKeySize+epochSize)
|
||||
if err = encodeTombstoneKeyWithExpEpoch(*prm.tomb, prm.expEpoch, tombKey); err != nil {
|
||||
return nil, nil, fmt.Errorf("encode tombstone key with expiration epoch: %w", err)
|
||||
}
|
||||
|
||||
// it is forbidden to have a tomb-on-tomb in FrostFS,
|
||||
// so graveyard keys must not be addresses of tombstones
|
||||
|
@ -376,6 +396,14 @@ func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Buck
|
|||
return nil, nil, fmt.Errorf("remove grave with tombstone key: %w", err)
|
||||
}
|
||||
}
|
||||
// it can be a tombstone key without an expiration epoch
|
||||
data = targetBucket.Get(tombKey[:addressKeySize])
|
||||
if data != nil {
|
||||
err := targetBucket.Delete(tombKey[:addressKeySize])
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("remove grave with tombstone key: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
value = tombKey
|
||||
} else {
|
||||
|
@ -418,6 +446,9 @@ func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Buc
|
|||
func isTomb(graveyardBucket *bbolt.Bucket, addressKey []byte) bool {
|
||||
targetIsTomb := false
|
||||
|
||||
// take only address because graveyard record may have expiration epoch suffix
|
||||
addressKey = addressKey[:addressKeySize]
|
||||
|
||||
// iterate over graveyard and check if target address
|
||||
// is the address of tombstone in graveyard.
|
||||
// tombstone must have the same container ID as key.
|
||||
|
@ -426,7 +457,7 @@ func isTomb(graveyardBucket *bbolt.Bucket, addressKey []byte) bool {
|
|||
for k, v := c.Seek(containerPrefix); k != nil && bytes.HasPrefix(k, containerPrefix); k, v = c.Next() {
|
||||
// check if graveyard has record with key corresponding
|
||||
// to tombstone address (at least one)
|
||||
targetIsTomb = bytes.Equal(v, addressKey)
|
||||
targetIsTomb = bytes.HasPrefix(v, addressKey)
|
||||
if targetIsTomb {
|
||||
break
|
||||
}
|
||||
|
|
|
@ -94,10 +94,12 @@ func TestInhumeECObject(t *testing.T) {
|
|||
require.True(t, res.deletionDetails[0].Size == 5)
|
||||
|
||||
// inhume EC parent (like Delete does)
|
||||
const tombstoneExpEpoch = 1008
|
||||
tombAddress.SetContainer(cnr)
|
||||
tombAddress.SetObject(tombstoneID)
|
||||
inhumePrm.SetAddresses(ecParentAddress)
|
||||
inhumePrm.SetTombstoneAddress(tombAddress)
|
||||
inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch)
|
||||
res, err = db.Inhume(context.Background(), inhumePrm)
|
||||
require.NoError(t, err)
|
||||
// Previously deleted chunk shouldn't be in the details, because it is marked as garbage
|
||||
|
|
|
@ -50,6 +50,8 @@ func TestInhumeTombOnTomb(t *testing.T) {
|
|||
inhumePrm meta.InhumePrm
|
||||
existsPrm meta.ExistsPrm
|
||||
)
|
||||
const tombstoneExpEpoch = 1008
|
||||
inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch)
|
||||
|
||||
addr1.SetContainer(cnr)
|
||||
addr2.SetContainer(cnr)
|
||||
|
@ -124,12 +126,15 @@ func TestInhumeLocked(t *testing.T) {
|
|||
}
|
||||
|
||||
func metaInhume(db *meta.DB, target oid.Address, tomb oid.ID) error {
|
||||
const tombstoneExpEpoch = 1008
|
||||
|
||||
var inhumePrm meta.InhumePrm
|
||||
inhumePrm.SetAddresses(target)
|
||||
var tombAddr oid.Address
|
||||
tombAddr.SetContainer(target.Container())
|
||||
tombAddr.SetObject(tomb)
|
||||
inhumePrm.SetTombstoneAddress(tombAddr)
|
||||
inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch)
|
||||
|
||||
_, err := db.Inhume(context.Background(), inhumePrm)
|
||||
return err
|
||||
|
|
|
@ -62,6 +62,8 @@ func TestDB_Lock(t *testing.T) {
|
|||
objAddr := objectcore.AddressOf(objs[0])
|
||||
lockAddr := objectcore.AddressOf(lockObj)
|
||||
|
||||
const tombstoneExpEpoch = 1008
|
||||
|
||||
var inhumePrm meta.InhumePrm
|
||||
inhumePrm.SetGCMark()
|
||||
|
||||
|
@ -76,6 +78,7 @@ func TestDB_Lock(t *testing.T) {
|
|||
tombAddr := oidtest.Address()
|
||||
tombAddr.SetContainer(objAddr.Container())
|
||||
inhumePrm.SetTombstoneAddress(tombAddr)
|
||||
inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch)
|
||||
_, err = db.Inhume(context.Background(), inhumePrm)
|
||||
require.ErrorAs(t, err, &objLockedErr)
|
||||
|
||||
|
@ -94,6 +97,7 @@ func TestDB_Lock(t *testing.T) {
|
|||
tombAddr = oidtest.Address()
|
||||
tombAddr.SetContainer(objAddr.Container())
|
||||
inhumePrm.SetTombstoneAddress(tombAddr)
|
||||
inhumePrm.SetTombstoneExpEpoch(tombstoneExpEpoch)
|
||||
_, err = db.Inhume(context.Background(), inhumePrm)
|
||||
require.ErrorAs(t, err, &objLockedErr)
|
||||
})
|
||||
|
|
|
@ -309,3 +309,56 @@ func isLockObject(tx *bbolt.Tx, idCnr cid.ID, obj oid.ID) bool {
|
|||
bucketNameLockers(idCnr, make([]byte, bucketKeySize)),
|
||||
objectKey(obj, make([]byte, objectKeySize)))
|
||||
}
|
||||
|
||||
const NoExpirationEpoch uint64 = 0
|
||||
|
||||
// encodeTombstoneKeyWithExpEpoch encodes a tombstone key of a tombstoned
|
||||
// object if the following format: tombstone address + expiration epoch.
|
||||
//
|
||||
// Returns an error if the buffer length isn't 32.
|
||||
//
|
||||
// The expiration epoch shouldn't be [NoExpirationEpoch], as tombstone keys
|
||||
// are intended to have a valid expiration epoch.
|
||||
//
|
||||
// The use of [NoExpirationEpoch] is allowed only for test purposes.
|
||||
func encodeTombstoneKeyWithExpEpoch(addr oid.Address, expEpoch uint64, dst []byte) error {
|
||||
if len(dst) != addressKeySize+epochSize {
|
||||
return errInvalidLength
|
||||
}
|
||||
|
||||
addr.Container().Encode(dst[:cidSize])
|
||||
addr.Object().Encode(dst[cidSize:addressKeySize])
|
||||
binary.LittleEndian.PutUint64(dst[addressKeySize:], expEpoch)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// decodeTombstoneKeyWithExpEpoch decodes a tombstone key of a tombstoned object.
|
||||
// The tombstone key may have one of the following formats:
|
||||
// - tombstone address
|
||||
// - tombstone address + expiration epoch
|
||||
//
|
||||
// Expiration epoch is set to [NoExpirationEpoch] if the key doesn't have it.
|
||||
func decodeTombstoneKeyWithExpEpoch(src []byte) (addr oid.Address, expEpoch uint64, err error) {
|
||||
if len(src) != addressKeySize && len(src) != addressKeySize+epochSize {
|
||||
return oid.Address{}, 0, errInvalidLength
|
||||
}
|
||||
|
||||
var cnt cid.ID
|
||||
if err := cnt.Decode(src[:cidSize]); err != nil {
|
||||
return addr, 0, err
|
||||
}
|
||||
var obj oid.ID
|
||||
if err := obj.Decode(src[cidSize:addressKeySize]); err != nil {
|
||||
return addr, 0, err
|
||||
}
|
||||
addr.SetContainer(cnt)
|
||||
addr.SetObject(obj)
|
||||
|
||||
if len(src) > addressKeySize {
|
||||
expEpoch = binary.LittleEndian.Uint64(src[addressKeySize:])
|
||||
} else {
|
||||
expEpoch = NoExpirationEpoch
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -334,6 +334,11 @@ func (s *Shard) refillLockObject(ctx context.Context, obj *objectSDK.Object) err
|
|||
}
|
||||
|
||||
func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||
expEpoch, ok := object.ExpirationEpoch(obj)
|
||||
if !ok {
|
||||
return fmt.Errorf("tombstone %s has no expiration epoch", object.AddressOf(obj))
|
||||
}
|
||||
|
||||
tombstone := objectSDK.NewTombstone()
|
||||
|
||||
if err := tombstone.Unmarshal(obj.Payload()); err != nil {
|
||||
|
@ -354,6 +359,7 @@ func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object
|
|||
var inhumePrm meta.InhumePrm
|
||||
|
||||
inhumePrm.SetTombstoneAddress(tombAddr)
|
||||
inhumePrm.SetTombstoneExpEpoch(expEpoch)
|
||||
inhumePrm.SetAddresses(tombMembers...)
|
||||
|
||||
_, err := s.metaBase.Inhume(ctx, inhumePrm)
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
|
@ -19,6 +20,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
|
@ -234,8 +236,15 @@ func TestRefillMetabase(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
expirationEpoch := 1008
|
||||
|
||||
expirationAttr := *objectSDK.NewAttribute()
|
||||
expirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
|
||||
expirationAttr.SetValue(strconv.Itoa(expirationEpoch))
|
||||
|
||||
tombObj := objecttest.Object()
|
||||
tombObj.SetType(objectSDK.TypeTombstone)
|
||||
tombObj.SetAttributes(expirationAttr)
|
||||
|
||||
tombstone := objecttest.Tombstone()
|
||||
|
||||
|
@ -276,6 +285,7 @@ func TestRefillMetabase(t *testing.T) {
|
|||
|
||||
lockObj := objecttest.Object()
|
||||
lockObj.SetContainerID(cnrLocked)
|
||||
lockObj.SetAttributes(expirationAttr)
|
||||
objectSDK.WriteLock(lockObj, lock)
|
||||
|
||||
putPrm.SetObject(lockObj)
|
||||
|
@ -286,7 +296,7 @@ func TestRefillMetabase(t *testing.T) {
|
|||
require.NoError(t, sh.Lock(context.Background(), cnrLocked, lockID, locked))
|
||||
|
||||
var inhumePrm InhumePrm
|
||||
inhumePrm.SetTarget(object.AddressOf(tombObj), tombMembers...)
|
||||
inhumePrm.SetTarget(object.AddressOf(tombObj), uint64(expirationEpoch), tombMembers...)
|
||||
|
||||
_, err = sh.Inhume(context.Background(), inhumePrm)
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -19,6 +19,7 @@ type InhumePrm struct {
|
|||
target []oid.Address
|
||||
tombstone *oid.Address
|
||||
forceRemoval bool
|
||||
expEpoch uint64
|
||||
}
|
||||
|
||||
// InhumeRes encapsulates results of inhume operation.
|
||||
|
@ -29,9 +30,10 @@ type InhumeRes struct{}
|
|||
//
|
||||
// tombstone should not be nil, addr should not be empty.
|
||||
// Should not be called along with MarkAsGarbage.
|
||||
func (p *InhumePrm) SetTarget(tombstone oid.Address, addrs ...oid.Address) {
|
||||
func (p *InhumePrm) SetTarget(tombstone oid.Address, expEpoch uint64, addrs ...oid.Address) {
|
||||
p.target = addrs
|
||||
p.tombstone = &tombstone
|
||||
p.expEpoch = expEpoch
|
||||
}
|
||||
|
||||
// MarkAsGarbage marks object to be physically removed from shard.
|
||||
|
@ -93,6 +95,7 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
|
|||
|
||||
if prm.tombstone != nil {
|
||||
metaPrm.SetTombstoneAddress(*prm.tombstone)
|
||||
metaPrm.SetTombstoneExpEpoch(prm.expEpoch)
|
||||
} else {
|
||||
metaPrm.SetGCMark()
|
||||
}
|
||||
|
|
|
@ -40,7 +40,8 @@ func testShardInhume(t *testing.T, hasWriteCache bool) {
|
|||
putPrm.SetObject(obj)
|
||||
|
||||
var inhPrm InhumePrm
|
||||
inhPrm.SetTarget(object.AddressOf(ts), object.AddressOf(obj))
|
||||
const tombstoneExpEpoch = 1008
|
||||
inhPrm.SetTarget(object.AddressOf(ts), tombstoneExpEpoch, object.AddressOf(obj))
|
||||
|
||||
var getPrm GetPrm
|
||||
getPrm.SetAddress(object.AddressOf(obj))
|
||||
|
|
|
@ -89,11 +89,13 @@ func TestShard_Lock(t *testing.T) {
|
|||
_, err = sh.Put(context.Background(), putPrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
const tombstoneExpEpoch = 1008
|
||||
|
||||
t.Run("inhuming locked objects", func(t *testing.T) {
|
||||
ts := testutil.GenerateObjectWithCID(cnr)
|
||||
|
||||
var inhumePrm InhumePrm
|
||||
inhumePrm.SetTarget(objectcore.AddressOf(ts), objectcore.AddressOf(obj))
|
||||
inhumePrm.SetTarget(objectcore.AddressOf(ts), tombstoneExpEpoch, objectcore.AddressOf(obj))
|
||||
|
||||
var objLockedErr *apistatus.ObjectLocked
|
||||
|
||||
|
@ -109,7 +111,7 @@ func TestShard_Lock(t *testing.T) {
|
|||
ts := testutil.GenerateObjectWithCID(cnr)
|
||||
|
||||
var inhumePrm InhumePrm
|
||||
inhumePrm.SetTarget(objectcore.AddressOf(ts), objectcore.AddressOf(lock))
|
||||
inhumePrm.SetTarget(objectcore.AddressOf(ts), tombstoneExpEpoch, objectcore.AddressOf(lock))
|
||||
|
||||
_, err = sh.Inhume(context.Background(), inhumePrm)
|
||||
require.Error(t, err)
|
||||
|
|
|
@ -314,11 +314,13 @@ func TestCounters(t *testing.T) {
|
|||
logic := mm.getObjectCounter(logical)
|
||||
custom := mm.getObjectCounter(user)
|
||||
|
||||
const tombstoneExpEpoch = 1008
|
||||
|
||||
inhumedNumber := int(phy / 4)
|
||||
for _, o := range addrFromObjs(oo[:inhumedNumber]) {
|
||||
ts := oidtest.Address()
|
||||
ts.SetContainer(o.Container())
|
||||
prm.SetTarget(ts, o)
|
||||
prm.SetTarget(ts, tombstoneExpEpoch, o)
|
||||
_, err := sh.Inhume(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package writer
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
|
@ -11,6 +12,8 @@ import (
|
|||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
var errObjectHasNoExpirationEpoch = errors.New("object has no expiration epoch")
|
||||
|
||||
// ObjectStorage is an object storage interface.
|
||||
type ObjectStorage interface {
|
||||
// Put must save passed object
|
||||
|
@ -18,7 +21,7 @@ type ObjectStorage interface {
|
|||
Put(context.Context, *objectSDK.Object, bool) error
|
||||
// Delete must delete passed objects
|
||||
// and return any appeared error.
|
||||
Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID) error
|
||||
Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID, expEpoch uint64) error
|
||||
// Lock must lock passed objects
|
||||
// and return any appeared error.
|
||||
Lock(ctx context.Context, locker oid.Address, toLock []oid.ID) error
|
||||
|
@ -38,7 +41,12 @@ func (t LocalTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, met
|
|||
|
||||
switch meta.Type() {
|
||||
case objectSDK.TypeTombstone:
|
||||
err := t.Storage.Delete(ctx, objectCore.AddressOf(obj), meta.Objects())
|
||||
expEpoch, ok := objectCore.ExpirationEpoch(obj)
|
||||
if !ok {
|
||||
return errObjectHasNoExpirationEpoch
|
||||
}
|
||||
|
||||
err := t.Storage.Delete(ctx, objectCore.AddressOf(obj), meta.Objects(), expEpoch)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not delete objects from tombstone locally: %w", err)
|
||||
}
|
||||
|
|
|
@ -186,6 +186,7 @@ func PopulateGraveyard(
|
|||
prm := meta.InhumePrm{}
|
||||
prm.SetAddresses(addr)
|
||||
prm.SetTombstoneAddress(tsAddr)
|
||||
prm.SetTombstoneExpEpoch(rand.Uint64())
|
||||
|
||||
group.Go(func() error {
|
||||
if _, err := db.Inhume(ctx, prm); err != nil {
|
||||
|
|
Loading…
Add table
Reference in a new issue