[#1445] local_object_storage: Append expiration epoch to tombstones
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
parent
c9c51bdb23
commit
17d2dc5341
21 changed files with 177 additions and 67 deletions
|
@ -465,7 +465,7 @@ func (e engineWithoutNotifications) IsLocked(ctx context.Context, address oid.Ad
|
||||||
return e.engine.IsLocked(ctx, address)
|
return e.engine.IsLocked(ctx, address)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e engineWithoutNotifications) Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID) error {
|
func (e engineWithoutNotifications) Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID, expEpoch uint64) error {
|
||||||
var prm engine.InhumePrm
|
var prm engine.InhumePrm
|
||||||
|
|
||||||
addrs := make([]oid.Address, len(toDelete))
|
addrs := make([]oid.Address, len(toDelete))
|
||||||
|
@ -474,7 +474,7 @@ func (e engineWithoutNotifications) Delete(ctx context.Context, tombstone oid.Ad
|
||||||
addrs[i].SetObject(toDelete[i])
|
addrs[i].SetObject(toDelete[i])
|
||||||
}
|
}
|
||||||
|
|
||||||
prm.WithTarget(tombstone, addrs...)
|
prm.WithTarget(tombstone, expEpoch, addrs...)
|
||||||
|
|
||||||
_, err := e.engine.Inhume(ctx, prm)
|
_, err := e.engine.Inhume(ctx, prm)
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -25,6 +25,7 @@ type InhumePrm struct {
|
||||||
addrs []oid.Address
|
addrs []oid.Address
|
||||||
|
|
||||||
forceRemoval bool
|
forceRemoval bool
|
||||||
|
expEpoch uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// InhumeRes encapsulates results of inhume operation.
|
// InhumeRes encapsulates results of inhume operation.
|
||||||
|
@ -35,9 +36,10 @@ type InhumeRes struct{}
|
||||||
//
|
//
|
||||||
// tombstone should not be nil, addr should not be empty.
|
// tombstone should not be nil, addr should not be empty.
|
||||||
// Should not be called along with MarkAsGarbage.
|
// Should not be called along with MarkAsGarbage.
|
||||||
func (p *InhumePrm) WithTarget(tombstone oid.Address, addrs ...oid.Address) {
|
func (p *InhumePrm) WithTarget(tombstone oid.Address, expEpoch uint64, addrs ...oid.Address) {
|
||||||
p.addrs = addrs
|
p.addrs = addrs
|
||||||
p.tombstone = &tombstone
|
p.tombstone = &tombstone
|
||||||
|
p.expEpoch = expEpoch
|
||||||
}
|
}
|
||||||
|
|
||||||
// MarkAsGarbage marks an object to be physically removed from local storage.
|
// MarkAsGarbage marks an object to be physically removed from local storage.
|
||||||
|
@ -95,7 +97,7 @@ func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, e
|
||||||
|
|
||||||
for shardID, addrs := range addrsPerShard {
|
for shardID, addrs := range addrsPerShard {
|
||||||
if prm.tombstone != nil {
|
if prm.tombstone != nil {
|
||||||
shPrm.SetTarget(*prm.tombstone, addrs...)
|
shPrm.SetTarget(*prm.tombstone, prm.expEpoch, addrs...)
|
||||||
} else {
|
} else {
|
||||||
shPrm.MarkAsGarbage(addrs...)
|
shPrm.MarkAsGarbage(addrs...)
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package engine
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
@ -53,7 +54,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
var inhumePrm InhumePrm
|
var inhumePrm InhumePrm
|
||||||
inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent))
|
inhumePrm.WithTarget(tombstoneID, rand.Uint64(), object.AddressOf(parent))
|
||||||
|
|
||||||
_, err = e.Inhume(context.Background(), inhumePrm)
|
_, err = e.Inhume(context.Background(), inhumePrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -83,7 +84,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
var inhumePrm InhumePrm
|
var inhumePrm InhumePrm
|
||||||
inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent))
|
inhumePrm.WithTarget(tombstoneID, rand.Uint64(), object.AddressOf(parent))
|
||||||
|
|
||||||
_, err = e.Inhume(context.Background(), inhumePrm)
|
_, err = e.Inhume(context.Background(), inhumePrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -127,7 +128,7 @@ func TestStorageEngine_ECInhume(t *testing.T) {
|
||||||
require.NoError(t, Put(context.Background(), e, tombstoneObject, false))
|
require.NoError(t, Put(context.Background(), e, tombstoneObject, false))
|
||||||
|
|
||||||
var inhumePrm InhumePrm
|
var inhumePrm InhumePrm
|
||||||
inhumePrm.WithTarget(tombstoneObjectAddress, parentObjectAddress)
|
inhumePrm.WithTarget(tombstoneObjectAddress, rand.Uint64(), parentObjectAddress)
|
||||||
_, err = e.Inhume(context.Background(), inhumePrm)
|
_, err = e.Inhume(context.Background(), inhumePrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -143,6 +144,7 @@ func TestInhumeExpiredRegularObject(t *testing.T) {
|
||||||
|
|
||||||
const currEpoch = 42
|
const currEpoch = 42
|
||||||
const objectExpiresAfter = currEpoch - 1
|
const objectExpiresAfter = currEpoch - 1
|
||||||
|
const tombstoneExpiresAfter = currEpoch + 1000
|
||||||
|
|
||||||
engine := testNewEngine(t).setShardsNumAdditionalOpts(t, 1, func(_ int) []shard.Option {
|
engine := testNewEngine(t).setShardsNumAdditionalOpts(t, 1, func(_ int) []shard.Option {
|
||||||
return []shard.Option{
|
return []shard.Option{
|
||||||
|
@ -172,7 +174,7 @@ func TestInhumeExpiredRegularObject(t *testing.T) {
|
||||||
ts.SetContainer(cnr)
|
ts.SetContainer(cnr)
|
||||||
|
|
||||||
var prm InhumePrm
|
var prm InhumePrm
|
||||||
prm.WithTarget(ts, object.AddressOf(obj))
|
prm.WithTarget(ts, tombstoneExpiresAfter, object.AddressOf(obj))
|
||||||
_, err := engine.Inhume(context.Background(), prm)
|
_, err := engine.Inhume(context.Background(), prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
})
|
})
|
||||||
|
@ -205,6 +207,8 @@ func BenchmarkInhumeMultipart(b *testing.B) {
|
||||||
func benchmarkInhumeMultipart(b *testing.B, numShards, numObjects int) {
|
func benchmarkInhumeMultipart(b *testing.B, numShards, numObjects int) {
|
||||||
b.StopTimer()
|
b.StopTimer()
|
||||||
|
|
||||||
|
const tombstoneExpiresAfter = 1000 // doesn't matter, just big enough
|
||||||
|
|
||||||
engine := testNewEngine(b, WithShardPoolSize(uint32(numObjects))).
|
engine := testNewEngine(b, WithShardPoolSize(uint32(numObjects))).
|
||||||
setShardsNum(b, numShards).prepare(b).engine
|
setShardsNum(b, numShards).prepare(b).engine
|
||||||
defer func() { require.NoError(b, engine.Close(context.Background())) }()
|
defer func() { require.NoError(b, engine.Close(context.Background())) }()
|
||||||
|
@ -234,7 +238,7 @@ func benchmarkInhumeMultipart(b *testing.B, numShards, numObjects int) {
|
||||||
ts.SetContainer(cnt)
|
ts.SetContainer(cnt)
|
||||||
|
|
||||||
prm := InhumePrm{}
|
prm := InhumePrm{}
|
||||||
prm.WithTarget(ts, addrs...)
|
prm.WithTarget(ts, tombstoneExpiresAfter, addrs...)
|
||||||
|
|
||||||
b.StartTimer()
|
b.StartTimer()
|
||||||
_, err := engine.Inhume(context.Background(), prm)
|
_, err := engine.Inhume(context.Background(), prm)
|
||||||
|
|
|
@ -2,6 +2,7 @@ package engine
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"math/rand"
|
||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -40,6 +41,7 @@ func TestLockUserScenario(t *testing.T) {
|
||||||
// 5. waits for an epoch after the lock expiration one
|
// 5. waits for an epoch after the lock expiration one
|
||||||
// 6. tries to inhume the object and expects success
|
// 6. tries to inhume the object and expects success
|
||||||
const lockerExpiresAfter = 13
|
const lockerExpiresAfter = 13
|
||||||
|
const tombstoneExpiresAfter = 1000
|
||||||
|
|
||||||
cnr := cidtest.ID()
|
cnr := cidtest.ID()
|
||||||
tombObj := testutil.GenerateObjectWithCID(cnr)
|
tombObj := testutil.GenerateObjectWithCID(cnr)
|
||||||
|
@ -111,7 +113,7 @@ func TestLockUserScenario(t *testing.T) {
|
||||||
|
|
||||||
// 3.
|
// 3.
|
||||||
var inhumePrm InhumePrm
|
var inhumePrm InhumePrm
|
||||||
inhumePrm.WithTarget(tombAddr, objAddr)
|
inhumePrm.WithTarget(tombAddr, tombstoneExpiresAfter, objAddr)
|
||||||
|
|
||||||
var objLockedErr *apistatus.ObjectLocked
|
var objLockedErr *apistatus.ObjectLocked
|
||||||
_, err = e.Inhume(context.Background(), inhumePrm)
|
_, err = e.Inhume(context.Background(), inhumePrm)
|
||||||
|
@ -125,7 +127,7 @@ func TestLockUserScenario(t *testing.T) {
|
||||||
err = Put(context.Background(), e, tombObj, false)
|
err = Put(context.Background(), e, tombObj, false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
inhumePrm.WithTarget(tombForLockAddr, lockerAddr)
|
inhumePrm.WithTarget(tombForLockAddr, tombstoneExpiresAfter, lockerAddr)
|
||||||
|
|
||||||
_, err = e.Inhume(context.Background(), inhumePrm)
|
_, err = e.Inhume(context.Background(), inhumePrm)
|
||||||
require.ErrorIs(t, err, meta.ErrLockObjectRemoval)
|
require.ErrorIs(t, err, meta.ErrLockObjectRemoval)
|
||||||
|
@ -133,7 +135,7 @@ func TestLockUserScenario(t *testing.T) {
|
||||||
// 5.
|
// 5.
|
||||||
e.HandleNewEpoch(context.Background(), lockerExpiresAfter+1)
|
e.HandleNewEpoch(context.Background(), lockerExpiresAfter+1)
|
||||||
|
|
||||||
inhumePrm.WithTarget(tombAddr, objAddr)
|
inhumePrm.WithTarget(tombAddr, tombstoneExpiresAfter, objAddr)
|
||||||
|
|
||||||
require.Eventually(t, func() bool {
|
require.Eventually(t, func() bool {
|
||||||
_, err = e.Inhume(context.Background(), inhumePrm)
|
_, err = e.Inhume(context.Background(), inhumePrm)
|
||||||
|
@ -166,6 +168,7 @@ func TestLockExpiration(t *testing.T) {
|
||||||
defer func() { require.NoError(t, e.Close(context.Background())) }()
|
defer func() { require.NoError(t, e.Close(context.Background())) }()
|
||||||
|
|
||||||
const lockerExpiresAfter = 13
|
const lockerExpiresAfter = 13
|
||||||
|
const tombstoneExpiresAfter = 1000
|
||||||
|
|
||||||
cnr := cidtest.ID()
|
cnr := cidtest.ID()
|
||||||
var err error
|
var err error
|
||||||
|
@ -197,7 +200,7 @@ func TestLockExpiration(t *testing.T) {
|
||||||
var inhumePrm InhumePrm
|
var inhumePrm InhumePrm
|
||||||
tombAddr := oidtest.Address()
|
tombAddr := oidtest.Address()
|
||||||
tombAddr.SetContainer(cnr)
|
tombAddr.SetContainer(cnr)
|
||||||
inhumePrm.WithTarget(tombAddr, objectcore.AddressOf(obj))
|
inhumePrm.WithTarget(tombAddr, tombstoneExpiresAfter, objectcore.AddressOf(obj))
|
||||||
|
|
||||||
var objLockedErr *apistatus.ObjectLocked
|
var objLockedErr *apistatus.ObjectLocked
|
||||||
_, err = e.Inhume(context.Background(), inhumePrm)
|
_, err = e.Inhume(context.Background(), inhumePrm)
|
||||||
|
@ -209,7 +212,7 @@ func TestLockExpiration(t *testing.T) {
|
||||||
// 4.
|
// 4.
|
||||||
tombAddr = oidtest.Address()
|
tombAddr = oidtest.Address()
|
||||||
tombAddr.SetContainer(cnr)
|
tombAddr.SetContainer(cnr)
|
||||||
inhumePrm.WithTarget(tombAddr, objectcore.AddressOf(obj))
|
inhumePrm.WithTarget(tombAddr, tombstoneExpiresAfter, objectcore.AddressOf(obj))
|
||||||
|
|
||||||
require.Eventually(t, func() bool {
|
require.Eventually(t, func() bool {
|
||||||
_, err = e.Inhume(context.Background(), inhumePrm)
|
_, err = e.Inhume(context.Background(), inhumePrm)
|
||||||
|
@ -273,7 +276,7 @@ func TestLockForceRemoval(t *testing.T) {
|
||||||
_, err = e.Inhume(context.Background(), inhumePrm)
|
_, err = e.Inhume(context.Background(), inhumePrm)
|
||||||
require.ErrorAs(t, err, &objLockedErr)
|
require.ErrorAs(t, err, &objLockedErr)
|
||||||
|
|
||||||
inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj))
|
inhumePrm.WithTarget(oidtest.Address(), rand.Uint64(), objectcore.AddressOf(obj))
|
||||||
|
|
||||||
_, err = e.Inhume(context.Background(), inhumePrm)
|
_, err = e.Inhume(context.Background(), inhumePrm)
|
||||||
require.ErrorAs(t, err, &objLockedErr)
|
require.ErrorAs(t, err, &objLockedErr)
|
||||||
|
|
|
@ -2,6 +2,7 @@ package meta_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"math/rand"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
@ -160,7 +161,7 @@ func TestCounters(t *testing.T) {
|
||||||
tombAddr := oidtest.Address()
|
tombAddr := oidtest.Address()
|
||||||
tombAddr.SetContainer(o.Container())
|
tombAddr.SetContainer(o.Container())
|
||||||
|
|
||||||
prm.SetTombstoneAddress(tombAddr)
|
prm.SetTombstoneAddress(tombAddr, rand.Uint64())
|
||||||
prm.SetAddresses(o)
|
prm.SetAddresses(o)
|
||||||
|
|
||||||
res, err := db.Inhume(context.Background(), prm)
|
res, err := db.Inhume(context.Background(), prm)
|
||||||
|
@ -305,7 +306,7 @@ func TestCounters(t *testing.T) {
|
||||||
tombAddr := oidtest.Address()
|
tombAddr := oidtest.Address()
|
||||||
tombAddr.SetContainer(o.Container())
|
tombAddr.SetContainer(o.Container())
|
||||||
|
|
||||||
prm.SetTombstoneAddress(tombAddr)
|
prm.SetTombstoneAddress(tombAddr, rand.Uint64())
|
||||||
prm.SetAddresses(o)
|
prm.SetAddresses(o)
|
||||||
|
|
||||||
_, err := db.Inhume(context.Background(), prm)
|
_, err := db.Inhume(context.Background(), prm)
|
||||||
|
|
|
@ -23,10 +23,13 @@ import (
|
||||||
func TestDeleteECObject_WithoutSplit(t *testing.T) {
|
func TestDeleteECObject_WithoutSplit(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
const currEpoch = 12
|
||||||
|
const tombstoneExpEpoch = currEpoch + 1
|
||||||
|
|
||||||
db := New(
|
db := New(
|
||||||
WithPath(filepath.Join(t.TempDir(), "metabase")),
|
WithPath(filepath.Join(t.TempDir(), "metabase")),
|
||||||
WithPermissions(0o600),
|
WithPermissions(0o600),
|
||||||
WithEpochState(epochState{uint64(12)}),
|
WithEpochState(epochState{uint64(currEpoch)}),
|
||||||
)
|
)
|
||||||
|
|
||||||
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||||
|
@ -81,7 +84,7 @@ func TestDeleteECObject_WithoutSplit(t *testing.T) {
|
||||||
tombAddress.SetContainer(cnr)
|
tombAddress.SetContainer(cnr)
|
||||||
tombAddress.SetObject(tombstoneID)
|
tombAddress.SetObject(tombstoneID)
|
||||||
inhumePrm.SetAddresses(ecParentAddress)
|
inhumePrm.SetAddresses(ecParentAddress)
|
||||||
inhumePrm.SetTombstoneAddress(tombAddress)
|
inhumePrm.SetTombstoneAddress(tombAddress, tombstoneExpEpoch)
|
||||||
_, err = db.Inhume(context.Background(), inhumePrm)
|
_, err = db.Inhume(context.Background(), inhumePrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -179,10 +182,13 @@ func TestDeleteECObject_WithSplit(t *testing.T) {
|
||||||
func testDeleteECObjectWithSplit(t *testing.T, chunksCount int, withLinking bool) {
|
func testDeleteECObjectWithSplit(t *testing.T, chunksCount int, withLinking bool) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
const currEpoch = 12
|
||||||
|
const tombstoneExpEpoch = currEpoch + 1
|
||||||
|
|
||||||
db := New(
|
db := New(
|
||||||
WithPath(filepath.Join(t.TempDir(), "metabase")),
|
WithPath(filepath.Join(t.TempDir(), "metabase")),
|
||||||
WithPermissions(0o600),
|
WithPermissions(0o600),
|
||||||
WithEpochState(epochState{uint64(12)}),
|
WithEpochState(epochState{currEpoch}),
|
||||||
)
|
)
|
||||||
|
|
||||||
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||||
|
@ -288,7 +294,7 @@ func testDeleteECObjectWithSplit(t *testing.T, chunksCount int, withLinking bool
|
||||||
tombAddress.SetContainer(cnr)
|
tombAddress.SetContainer(cnr)
|
||||||
tombAddress.SetObject(tombstoneID)
|
tombAddress.SetObject(tombstoneID)
|
||||||
inhumePrm.SetAddresses(inhumeAddresses...)
|
inhumePrm.SetAddresses(inhumeAddresses...)
|
||||||
inhumePrm.SetTombstoneAddress(tombAddress)
|
inhumePrm.SetTombstoneAddress(tombAddress, tombstoneExpEpoch)
|
||||||
_, err = db.Inhume(context.Background(), inhumePrm)
|
_, err = db.Inhume(context.Background(), inhumePrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
|
|
@ -93,6 +93,7 @@ func (db *DB) IterateOverGarbage(ctx context.Context, p GarbageIterationPrm) err
|
||||||
type TombstonedObject struct {
|
type TombstonedObject struct {
|
||||||
addr oid.Address
|
addr oid.Address
|
||||||
tomb oid.Address
|
tomb oid.Address
|
||||||
|
expEpoch uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// Address returns tombstoned object address.
|
// Address returns tombstoned object address.
|
||||||
|
@ -106,6 +107,11 @@ func (g TombstonedObject) Tombstone() oid.Address {
|
||||||
return g.tomb
|
return g.tomb
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ExpirationEpoch returns an expiration epoch of a tombstone.
|
||||||
|
func (g TombstonedObject) ExpirationEpoch() uint64 {
|
||||||
|
return g.expEpoch
|
||||||
|
}
|
||||||
|
|
||||||
// TombstonedHandler is a TombstonedObject handling function.
|
// TombstonedHandler is a TombstonedObject handling function.
|
||||||
type TombstonedHandler func(object TombstonedObject) error
|
type TombstonedHandler func(object TombstonedObject) error
|
||||||
|
|
||||||
|
@ -249,7 +255,7 @@ func garbageFromKV(k []byte) (res GarbageObject, err error) {
|
||||||
func graveFromKV(k, v []byte) (res TombstonedObject, err error) {
|
func graveFromKV(k, v []byte) (res TombstonedObject, err error) {
|
||||||
if err = decodeAddressFromKey(&res.addr, k); err != nil {
|
if err = decodeAddressFromKey(&res.addr, k); err != nil {
|
||||||
err = fmt.Errorf("decode tombstone target from key: %w", err)
|
err = fmt.Errorf("decode tombstone target from key: %w", err)
|
||||||
} else if err = decodeAddressFromKey(&res.tomb, v); err != nil {
|
} else if err = decodeTombstoneWithExpEpoch(&res.tomb, &res.expEpoch, v); err != nil {
|
||||||
err = fmt.Errorf("decode tombstone address from value: %w", err)
|
err = fmt.Errorf("decode tombstone address from value: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@ package meta_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"math/rand"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
@ -145,7 +146,7 @@ func TestDB_IterateDeletedObjects(t *testing.T) {
|
||||||
addrTombstone.SetContainer(cnr)
|
addrTombstone.SetContainer(cnr)
|
||||||
|
|
||||||
inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2))
|
inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2))
|
||||||
inhumePrm.SetTombstoneAddress(addrTombstone)
|
inhumePrm.SetTombstoneAddress(addrTombstone, rand.Uint64())
|
||||||
|
|
||||||
_, err = db.Inhume(context.Background(), inhumePrm)
|
_, err = db.Inhume(context.Background(), inhumePrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -235,7 +236,7 @@ func TestDB_IterateOverGraveyard_Offset(t *testing.T) {
|
||||||
inhumePrm.SetAddresses(
|
inhumePrm.SetAddresses(
|
||||||
object.AddressOf(obj1), object.AddressOf(obj2),
|
object.AddressOf(obj1), object.AddressOf(obj2),
|
||||||
object.AddressOf(obj3), object.AddressOf(obj4))
|
object.AddressOf(obj3), object.AddressOf(obj4))
|
||||||
inhumePrm.SetTombstoneAddress(addrTombstone)
|
inhumePrm.SetTombstoneAddress(addrTombstone, rand.Uint64())
|
||||||
|
|
||||||
_, err = db.Inhume(context.Background(), inhumePrm)
|
_, err = db.Inhume(context.Background(), inhumePrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -429,7 +430,7 @@ func TestDB_InhumeTombstones(t *testing.T) {
|
||||||
|
|
||||||
var inhumePrm meta.InhumePrm
|
var inhumePrm meta.InhumePrm
|
||||||
inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2))
|
inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2))
|
||||||
inhumePrm.SetTombstoneAddress(addrTombstone)
|
inhumePrm.SetTombstoneAddress(addrTombstone, rand.Uint64())
|
||||||
|
|
||||||
_, err = db.Inhume(context.Background(), inhumePrm)
|
_, err = db.Inhume(context.Background(), inhumePrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -27,6 +27,8 @@ type InhumePrm struct {
|
||||||
lockObjectHandling bool
|
lockObjectHandling bool
|
||||||
|
|
||||||
forceRemoval bool
|
forceRemoval bool
|
||||||
|
|
||||||
|
expEpoch uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeletionInfo contains details on deleted object.
|
// DeletionInfo contains details on deleted object.
|
||||||
|
@ -119,8 +121,9 @@ func (p *InhumePrm) SetAddresses(addrs ...oid.Address) {
|
||||||
//
|
//
|
||||||
// addr should not be nil.
|
// addr should not be nil.
|
||||||
// Should not be called along with SetGCMark.
|
// Should not be called along with SetGCMark.
|
||||||
func (p *InhumePrm) SetTombstoneAddress(addr oid.Address) {
|
func (p *InhumePrm) SetTombstoneAddress(addr oid.Address, expEpoch uint64) {
|
||||||
p.tomb = &addr
|
p.tomb = &addr
|
||||||
|
p.expEpoch = expEpoch
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetGCMark marks the object to be physically removed.
|
// SetGCMark marks the object to be physically removed.
|
||||||
|
@ -297,6 +300,8 @@ func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garb
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This condition should be checked in the beginning because
|
||||||
|
// of a possible race with GC.
|
||||||
if isLockObject(tx, cnr, id) {
|
if isLockObject(tx, cnr, id) {
|
||||||
res.deletedLockObj = append(res.deletedLockObj, addr)
|
res.deletedLockObj = append(res.deletedLockObj, addr)
|
||||||
}
|
}
|
||||||
|
@ -363,26 +368,26 @@ func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
|
||||||
// a Tombstone
|
// a Tombstone
|
||||||
// 2. zeroValue if Inhume was called with a GC mark
|
// 2. zeroValue if Inhume was called with a GC mark
|
||||||
func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) {
|
func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) {
|
||||||
if prm.tomb != nil {
|
if prm.tomb == nil {
|
||||||
targetBucket = graveyardBKT
|
return garbageBKT, zeroValue, nil
|
||||||
tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize))
|
}
|
||||||
|
|
||||||
|
tombKey := make([]byte, addressKeySize+epochSize)
|
||||||
|
if err = encodeTombstoneWithExpEpoch(*prm.tomb, prm.expEpoch, tombKey); err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("encode tombstone with expiration epoch: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// it is forbidden to have a tomb-on-tomb in FrostFS,
|
// it is forbidden to have a tomb-on-tomb in FrostFS,
|
||||||
// so graveyard keys must not be addresses of tombstones
|
// so graveyard keys must not be addresses of tombstones
|
||||||
data := targetBucket.Get(tombKey)
|
if err := graveyardBKT.Delete(tombKey); err != nil {
|
||||||
if data != nil {
|
|
||||||
err := targetBucket.Delete(tombKey)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("could not remove grave with tombstone key: %w", err)
|
return nil, nil, fmt.Errorf("could not remove grave with tombstone key: %w", err)
|
||||||
}
|
}
|
||||||
|
// because it can be a tombstone without expiration epoch (old tombstone format)
|
||||||
|
if err := graveyardBKT.Delete(tombKey[:addressKeySize]); err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("could not remove grave with tombstone key: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
value = tombKey
|
return graveyardBKT, tombKey, nil
|
||||||
} else {
|
|
||||||
targetBucket = garbageBKT
|
|
||||||
value = zeroValue
|
|
||||||
}
|
|
||||||
return targetBucket, value, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, addressKey []byte) (bool, error) {
|
func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, addressKey []byte) (bool, error) {
|
||||||
|
@ -418,6 +423,9 @@ func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Buc
|
||||||
func isTomb(graveyardBucket *bbolt.Bucket, addressKey []byte) bool {
|
func isTomb(graveyardBucket *bbolt.Bucket, addressKey []byte) bool {
|
||||||
targetIsTomb := false
|
targetIsTomb := false
|
||||||
|
|
||||||
|
// because it can contain an expiration epoch (new tombstone format)
|
||||||
|
addressKey = addressKey[:addressKeySize]
|
||||||
|
|
||||||
// iterate over graveyard and check if target address
|
// iterate over graveyard and check if target address
|
||||||
// is the address of tombstone in graveyard.
|
// is the address of tombstone in graveyard.
|
||||||
// tombstone must have the same container ID as key.
|
// tombstone must have the same container ID as key.
|
||||||
|
@ -426,7 +434,7 @@ func isTomb(graveyardBucket *bbolt.Bucket, addressKey []byte) bool {
|
||||||
for k, v := c.Seek(containerPrefix); k != nil && bytes.HasPrefix(k, containerPrefix); k, v = c.Next() {
|
for k, v := c.Seek(containerPrefix); k != nil && bytes.HasPrefix(k, containerPrefix); k, v = c.Next() {
|
||||||
// check if graveyard has record with key corresponding
|
// check if graveyard has record with key corresponding
|
||||||
// to tombstone address (at least one)
|
// to tombstone address (at least one)
|
||||||
targetIsTomb = bytes.Equal(v, addressKey)
|
targetIsTomb = bytes.Equal(v[:addressKeySize], addressKey)
|
||||||
if targetIsTomb {
|
if targetIsTomb {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"math/rand"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
@ -97,7 +98,7 @@ func TestInhumeECObject(t *testing.T) {
|
||||||
tombAddress.SetContainer(cnr)
|
tombAddress.SetContainer(cnr)
|
||||||
tombAddress.SetObject(tombstoneID)
|
tombAddress.SetObject(tombstoneID)
|
||||||
inhumePrm.SetAddresses(ecParentAddress)
|
inhumePrm.SetAddresses(ecParentAddress)
|
||||||
inhumePrm.SetTombstoneAddress(tombAddress)
|
inhumePrm.SetTombstoneAddress(tombAddress, rand.Uint64())
|
||||||
res, err = db.Inhume(context.Background(), inhumePrm)
|
res, err = db.Inhume(context.Background(), inhumePrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
// Previously deleted chunk shouldn't be in the details, because it is marked as garbage
|
// Previously deleted chunk shouldn't be in the details, because it is marked as garbage
|
||||||
|
|
|
@ -2,6 +2,7 @@ package meta_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"math/rand"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
@ -57,7 +58,7 @@ func TestInhumeTombOnTomb(t *testing.T) {
|
||||||
addr4.SetContainer(cnr)
|
addr4.SetContainer(cnr)
|
||||||
|
|
||||||
inhumePrm.SetAddresses(addr1)
|
inhumePrm.SetAddresses(addr1)
|
||||||
inhumePrm.SetTombstoneAddress(addr2)
|
inhumePrm.SetTombstoneAddress(addr2, rand.Uint64())
|
||||||
|
|
||||||
// inhume addr1 via addr2
|
// inhume addr1 via addr2
|
||||||
_, err = db.Inhume(context.Background(), inhumePrm)
|
_, err = db.Inhume(context.Background(), inhumePrm)
|
||||||
|
@ -70,7 +71,7 @@ func TestInhumeTombOnTomb(t *testing.T) {
|
||||||
require.True(t, client.IsErrObjectAlreadyRemoved(err))
|
require.True(t, client.IsErrObjectAlreadyRemoved(err))
|
||||||
|
|
||||||
inhumePrm.SetAddresses(addr3)
|
inhumePrm.SetAddresses(addr3)
|
||||||
inhumePrm.SetTombstoneAddress(addr1)
|
inhumePrm.SetTombstoneAddress(addr1, rand.Uint64())
|
||||||
|
|
||||||
// try to inhume addr3 via addr1
|
// try to inhume addr3 via addr1
|
||||||
_, err = db.Inhume(context.Background(), inhumePrm)
|
_, err = db.Inhume(context.Background(), inhumePrm)
|
||||||
|
@ -90,7 +91,7 @@ func TestInhumeTombOnTomb(t *testing.T) {
|
||||||
require.True(t, client.IsErrObjectAlreadyRemoved(err))
|
require.True(t, client.IsErrObjectAlreadyRemoved(err))
|
||||||
|
|
||||||
inhumePrm.SetAddresses(addr1)
|
inhumePrm.SetAddresses(addr1)
|
||||||
inhumePrm.SetTombstoneAddress(addr4)
|
inhumePrm.SetTombstoneAddress(addr4, rand.Uint64())
|
||||||
|
|
||||||
// try to inhume addr1 (which is already a tombstone in graveyard)
|
// try to inhume addr1 (which is already a tombstone in graveyard)
|
||||||
_, err = db.Inhume(context.Background(), inhumePrm)
|
_, err = db.Inhume(context.Background(), inhumePrm)
|
||||||
|
@ -129,7 +130,7 @@ func metaInhume(db *meta.DB, target oid.Address, tomb oid.ID) error {
|
||||||
var tombAddr oid.Address
|
var tombAddr oid.Address
|
||||||
tombAddr.SetContainer(target.Container())
|
tombAddr.SetContainer(target.Container())
|
||||||
tombAddr.SetObject(tomb)
|
tombAddr.SetObject(tomb)
|
||||||
inhumePrm.SetTombstoneAddress(tombAddr)
|
inhumePrm.SetTombstoneAddress(tombAddr, rand.Uint64())
|
||||||
|
|
||||||
_, err := db.Inhume(context.Background(), inhumePrm)
|
_, err := db.Inhume(context.Background(), inhumePrm)
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -2,6 +2,7 @@ package meta_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"math/rand"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
@ -75,7 +76,7 @@ func TestDB_Lock(t *testing.T) {
|
||||||
|
|
||||||
tombAddr := oidtest.Address()
|
tombAddr := oidtest.Address()
|
||||||
tombAddr.SetContainer(objAddr.Container())
|
tombAddr.SetContainer(objAddr.Container())
|
||||||
inhumePrm.SetTombstoneAddress(tombAddr)
|
inhumePrm.SetTombstoneAddress(tombAddr, rand.Uint64())
|
||||||
_, err = db.Inhume(context.Background(), inhumePrm)
|
_, err = db.Inhume(context.Background(), inhumePrm)
|
||||||
require.ErrorAs(t, err, &objLockedErr)
|
require.ErrorAs(t, err, &objLockedErr)
|
||||||
|
|
||||||
|
@ -93,7 +94,7 @@ func TestDB_Lock(t *testing.T) {
|
||||||
|
|
||||||
tombAddr = oidtest.Address()
|
tombAddr = oidtest.Address()
|
||||||
tombAddr.SetContainer(objAddr.Container())
|
tombAddr.SetContainer(objAddr.Container())
|
||||||
inhumePrm.SetTombstoneAddress(tombAddr)
|
inhumePrm.SetTombstoneAddress(tombAddr, rand.Uint64())
|
||||||
_, err = db.Inhume(context.Background(), inhumePrm)
|
_, err = db.Inhume(context.Background(), inhumePrm)
|
||||||
require.ErrorAs(t, err, &objLockedErr)
|
require.ErrorAs(t, err, &objLockedErr)
|
||||||
})
|
})
|
||||||
|
|
|
@ -309,3 +309,57 @@ func isLockObject(tx *bbolt.Tx, idCnr cid.ID, obj oid.ID) bool {
|
||||||
bucketNameLockers(idCnr, make([]byte, bucketKeySize)),
|
bucketNameLockers(idCnr, make([]byte, bucketKeySize)),
|
||||||
objectKey(obj, make([]byte, objectKeySize)))
|
objectKey(obj, make([]byte, objectKeySize)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const NoExpirationEpoch uint64 = 0
|
||||||
|
|
||||||
|
// encodeTombstoneWithExpEpoch encodes a tombstone label in the following
|
||||||
|
// format: tombstone_address + expiration_epoch.
|
||||||
|
//
|
||||||
|
// Returns an error if the buffer length isn't 32.
|
||||||
|
//
|
||||||
|
// The expiration epoch shouldn't be [NoExpirationEpoch], as tombstone labels
|
||||||
|
// are intended to have a valid expiration epoch.
|
||||||
|
//
|
||||||
|
// The use of [NoExpirationEpoch] is allowed only for test purposes.
|
||||||
|
func encodeTombstoneWithExpEpoch(addr oid.Address, expEpoch uint64, dst []byte) error {
|
||||||
|
if len(dst) != addressKeySize+epochSize {
|
||||||
|
return errInvalidLength
|
||||||
|
}
|
||||||
|
|
||||||
|
addr.Container().Encode(dst[:cidSize])
|
||||||
|
addr.Object().Encode(dst[cidSize:addressKeySize])
|
||||||
|
binary.LittleEndian.PutUint64(dst[addressKeySize:], expEpoch)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// decodeTombstoneWithExpEpoch decodes a tombstone label in the following
|
||||||
|
// formats: tombstone address or tombstone address + expiration epoch.
|
||||||
|
//
|
||||||
|
// Expiration epoch is set to [NoExpirationEpoch] if the label doesn't have one.
|
||||||
|
func decodeTombstoneWithExpEpoch(addr *oid.Address, expEpoch *uint64, src []byte) error {
|
||||||
|
if len(src) != addressKeySize && len(src) != addressKeySize+epochSize {
|
||||||
|
return errInvalidLength
|
||||||
|
}
|
||||||
|
|
||||||
|
var cnt cid.ID
|
||||||
|
if err := cnt.Decode(src[:cidSize]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var obj oid.ID
|
||||||
|
if err := obj.Decode(src[cidSize:addressKeySize]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
addr.SetContainer(cnt)
|
||||||
|
addr.SetObject(obj)
|
||||||
|
|
||||||
|
if len(src) > addressKeySize {
|
||||||
|
*expEpoch = binary.LittleEndian.Uint64(src[addressKeySize:])
|
||||||
|
} else {
|
||||||
|
*expEpoch = NoExpirationEpoch
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -334,6 +334,14 @@ func (s *Shard) refillLockObject(ctx context.Context, obj *objectSDK.Object) err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object) error {
|
func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||||
|
has, expEpoch, err := object.GetExpirationEpoch(obj)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("get tombstone %s expiration epoch: %w", object.AddressOf(obj), err)
|
||||||
|
}
|
||||||
|
if !has {
|
||||||
|
return fmt.Errorf("tombstone %s has no expiration epoch", object.AddressOf(obj))
|
||||||
|
}
|
||||||
|
|
||||||
tombstone := objectSDK.NewTombstone()
|
tombstone := objectSDK.NewTombstone()
|
||||||
|
|
||||||
if err := tombstone.Unmarshal(obj.Payload()); err != nil {
|
if err := tombstone.Unmarshal(obj.Payload()); err != nil {
|
||||||
|
@ -353,10 +361,10 @@ func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object
|
||||||
|
|
||||||
var inhumePrm meta.InhumePrm
|
var inhumePrm meta.InhumePrm
|
||||||
|
|
||||||
inhumePrm.SetTombstoneAddress(tombAddr)
|
inhumePrm.SetTombstoneAddress(tombAddr, expEpoch)
|
||||||
inhumePrm.SetAddresses(tombMembers...)
|
inhumePrm.SetAddresses(tombMembers...)
|
||||||
|
|
||||||
_, err := s.metaBase.Inhume(ctx, inhumePrm)
|
_, err = s.metaBase.Inhume(ctx, inhumePrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not inhume objects: %w", err)
|
return fmt.Errorf("could not inhume objects: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -297,7 +297,7 @@ func TestRefillMetabase(t *testing.T) {
|
||||||
require.NoError(t, sh.Lock(context.Background(), cnrLocked, lockID, locked))
|
require.NoError(t, sh.Lock(context.Background(), cnrLocked, lockID, locked))
|
||||||
|
|
||||||
var inhumePrm InhumePrm
|
var inhumePrm InhumePrm
|
||||||
inhumePrm.SetTarget(object.AddressOf(tombObj), tombMembers...)
|
inhumePrm.SetTarget(object.AddressOf(tombObj), uint64(expirationEpoch), tombMembers...)
|
||||||
|
|
||||||
_, err = sh.Inhume(context.Background(), inhumePrm)
|
_, err = sh.Inhume(context.Background(), inhumePrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -20,6 +20,7 @@ type InhumePrm struct {
|
||||||
target []oid.Address
|
target []oid.Address
|
||||||
tombstone *oid.Address
|
tombstone *oid.Address
|
||||||
forceRemoval bool
|
forceRemoval bool
|
||||||
|
expEpoch uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// InhumeRes encapsulates results of inhume operation.
|
// InhumeRes encapsulates results of inhume operation.
|
||||||
|
@ -30,9 +31,10 @@ type InhumeRes struct{}
|
||||||
//
|
//
|
||||||
// tombstone should not be nil, addr should not be empty.
|
// tombstone should not be nil, addr should not be empty.
|
||||||
// Should not be called along with MarkAsGarbage.
|
// Should not be called along with MarkAsGarbage.
|
||||||
func (p *InhumePrm) SetTarget(tombstone oid.Address, addrs ...oid.Address) {
|
func (p *InhumePrm) SetTarget(tombstone oid.Address, expEpoch uint64, addrs ...oid.Address) {
|
||||||
p.target = addrs
|
p.target = addrs
|
||||||
p.tombstone = &tombstone
|
p.tombstone = &tombstone
|
||||||
|
p.expEpoch = expEpoch
|
||||||
}
|
}
|
||||||
|
|
||||||
// MarkAsGarbage marks object to be physically removed from shard.
|
// MarkAsGarbage marks object to be physically removed from shard.
|
||||||
|
@ -93,7 +95,7 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
|
||||||
metaPrm.SetLockObjectHandling()
|
metaPrm.SetLockObjectHandling()
|
||||||
|
|
||||||
if prm.tombstone != nil {
|
if prm.tombstone != nil {
|
||||||
metaPrm.SetTombstoneAddress(*prm.tombstone)
|
metaPrm.SetTombstoneAddress(*prm.tombstone, prm.expEpoch)
|
||||||
} else {
|
} else {
|
||||||
metaPrm.SetGCMark()
|
metaPrm.SetGCMark()
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package shard
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"math/rand"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
@ -40,7 +41,7 @@ func testShardInhume(t *testing.T, hasWriteCache bool) {
|
||||||
putPrm.SetObject(obj)
|
putPrm.SetObject(obj)
|
||||||
|
|
||||||
var inhPrm InhumePrm
|
var inhPrm InhumePrm
|
||||||
inhPrm.SetTarget(object.AddressOf(ts), object.AddressOf(obj))
|
inhPrm.SetTarget(object.AddressOf(ts), rand.Uint64(), object.AddressOf(obj))
|
||||||
|
|
||||||
var getPrm GetPrm
|
var getPrm GetPrm
|
||||||
getPrm.SetAddress(object.AddressOf(obj))
|
getPrm.SetAddress(object.AddressOf(obj))
|
||||||
|
|
|
@ -2,6 +2,7 @@ package shard
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"math/rand"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
@ -93,7 +94,7 @@ func TestShard_Lock(t *testing.T) {
|
||||||
ts := testutil.GenerateObjectWithCID(cnr)
|
ts := testutil.GenerateObjectWithCID(cnr)
|
||||||
|
|
||||||
var inhumePrm InhumePrm
|
var inhumePrm InhumePrm
|
||||||
inhumePrm.SetTarget(objectcore.AddressOf(ts), objectcore.AddressOf(obj))
|
inhumePrm.SetTarget(objectcore.AddressOf(ts), rand.Uint64(), objectcore.AddressOf(obj))
|
||||||
|
|
||||||
var objLockedErr *apistatus.ObjectLocked
|
var objLockedErr *apistatus.ObjectLocked
|
||||||
|
|
||||||
|
@ -109,7 +110,7 @@ func TestShard_Lock(t *testing.T) {
|
||||||
ts := testutil.GenerateObjectWithCID(cnr)
|
ts := testutil.GenerateObjectWithCID(cnr)
|
||||||
|
|
||||||
var inhumePrm InhumePrm
|
var inhumePrm InhumePrm
|
||||||
inhumePrm.SetTarget(objectcore.AddressOf(ts), objectcore.AddressOf(lock))
|
inhumePrm.SetTarget(objectcore.AddressOf(ts), rand.Uint64(), objectcore.AddressOf(lock))
|
||||||
|
|
||||||
_, err = sh.Inhume(context.Background(), inhumePrm)
|
_, err = sh.Inhume(context.Background(), inhumePrm)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
|
|
|
@ -2,6 +2,7 @@ package shard
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"math/rand"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -318,7 +319,7 @@ func TestCounters(t *testing.T) {
|
||||||
for _, o := range addrFromObjs(oo[:inhumedNumber]) {
|
for _, o := range addrFromObjs(oo[:inhumedNumber]) {
|
||||||
ts := oidtest.Address()
|
ts := oidtest.Address()
|
||||||
ts.SetContainer(o.Container())
|
ts.SetContainer(o.Container())
|
||||||
prm.SetTarget(ts, o)
|
prm.SetTarget(ts, rand.Uint64(), o)
|
||||||
_, err := sh.Inhume(context.Background(), prm)
|
_, err := sh.Inhume(context.Background(), prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package writer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
|
@ -18,7 +19,7 @@ type ObjectStorage interface {
|
||||||
Put(context.Context, *objectSDK.Object, bool) error
|
Put(context.Context, *objectSDK.Object, bool) error
|
||||||
// Delete must delete passed objects
|
// Delete must delete passed objects
|
||||||
// and return any appeared error.
|
// and return any appeared error.
|
||||||
Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID) error
|
Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID, expEpoch uint64) error
|
||||||
// Lock must lock passed objects
|
// Lock must lock passed objects
|
||||||
// and return any appeared error.
|
// and return any appeared error.
|
||||||
Lock(ctx context.Context, locker oid.Address, toLock []oid.ID) error
|
Lock(ctx context.Context, locker oid.Address, toLock []oid.ID) error
|
||||||
|
@ -38,7 +39,15 @@ func (t LocalTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, met
|
||||||
|
|
||||||
switch meta.Type() {
|
switch meta.Type() {
|
||||||
case objectSDK.TypeTombstone:
|
case objectSDK.TypeTombstone:
|
||||||
err := t.Storage.Delete(ctx, objectCore.AddressOf(obj), meta.Objects())
|
has, expEpoch, err := objectCore.GetExpirationEpoch(obj)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("get expiration epoch: %w", err)
|
||||||
|
}
|
||||||
|
if !has {
|
||||||
|
return errors.New("tombstone has no expiration epoch")
|
||||||
|
}
|
||||||
|
|
||||||
|
err = t.Storage.Delete(ctx, objectCore.AddressOf(obj), meta.Objects(), expEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not delete objects from tombstone locally: %w", err)
|
return fmt.Errorf("could not delete objects from tombstone locally: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -185,7 +185,7 @@ func PopulateGraveyard(
|
||||||
for addr := range addrs {
|
for addr := range addrs {
|
||||||
prm := meta.InhumePrm{}
|
prm := meta.InhumePrm{}
|
||||||
prm.SetAddresses(addr)
|
prm.SetAddresses(addr)
|
||||||
prm.SetTombstoneAddress(tsAddr)
|
prm.SetTombstoneAddress(tsAddr, rand.Uint64())
|
||||||
|
|
||||||
group.Go(func() error {
|
group.Go(func() error {
|
||||||
if _, err := db.Inhume(ctx, prm); err != nil {
|
if _, err := db.Inhume(ctx, prm); err != nil {
|
||||||
|
|
Loading…
Reference in a new issue