WIP: Put a tombstone on a different shard if a target shard is read-only #1403
7 changed files with 154 additions and 50 deletions
|
@ -94,6 +94,10 @@ func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) {
|
|||
return GetRes{}, errNotFound
|
||||
}
|
||||
|
||||
if client.IsErrObjectAlreadyRemoved(it.OutError) {
|
||||
return GetRes{}, it.OutError
|
||||
}
|
||||
|
||||
if it.Object == nil {
|
||||
if !it.HasDegraded && it.ShardWithMeta.Shard == nil || !client.IsErrObjectNotFound(it.OutError) {
|
||||
return GetRes{}, it.OutError
|
||||
|
@ -146,7 +150,10 @@ func (i *getShardIterator) tryGetWithMeta(ctx context.Context) {
|
|||
res, err := sh.Get(ctx, i.ShardPrm)
|
||||
if err == nil {
|
||||
i.Object = res.Object()
|
||||
return true
|
||||
// Keep iterating over shards because the object's GC mark can be
|
||||
// stored on another shard. For more information, please refer to
|
||||
// https://git.frostfs.info/TrueCloudLab/frostfs-node/pulls/1403.
|
||||
return false
|
||||
}
|
||||
|
||||
if res.HasMeta() {
|
||||
|
|
|
@ -3,12 +3,17 @@ package engine
|
|||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/test"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -83,3 +88,116 @@ func TestStorageEngine_Inhume(t *testing.T) {
|
|||
require.Empty(t, addrs)
|
||||
})
|
||||
}
|
||||
|
||||
func TestObjectUnavailableIfTombstoneOnDifferentShard(t *testing.T) {
|
||||
t.Run("look first at the shard with the object", func(t *testing.T) {
|
||||
obj := objecttest.Object().Parent()
|
||||
|
||||
shard1 := testNewShard(t, shard.WithDisabledGC())
|
||||
shard2 := testNewShard(t, shard.WithDisabledGC())
|
||||
|
||||
engine := testNewEngine(t).setInitializedShards(t, shard1, shard2).engine
|
||||
shards := engine.sortShards(object.AddressOf(obj))
|
||||
|
||||
shard1 = shards[0].Shard
|
||||
shard2 = shards[1].Shard
|
||||
|
||||
testObjectUnavailableIfTombstoneOnDifferentShard(t, obj, shard1, shard2, engine)
|
||||
})
|
||||
|
||||
t.Run("look first at the shard with the tombstone", func(t *testing.T) {
|
||||
obj := objecttest.Object().Parent()
|
||||
|
||||
shard1 := testNewShard(t, shard.WithDisabledGC())
|
||||
shard2 := testNewShard(t, shard.WithDisabledGC())
|
||||
|
||||
engine := testNewEngine(t).setInitializedShards(t, shard1, shard2).engine
|
||||
shards := engine.sortShards(object.AddressOf(obj))
|
||||
|
||||
shard1 = shards[1].Shard
|
||||
shard2 = shards[0].Shard
|
||||
|
||||
testObjectUnavailableIfTombstoneOnDifferentShard(t, obj, shard1, shard2, engine)
|
||||
})
|
||||
}
|
||||
|
||||
func testObjectUnavailableIfTombstoneOnDifferentShard(
|
||||
t *testing.T,
|
||||
obj *objectSDK.Object,
|
||||
shard1, shard2 *shard.Shard,
|
||||
engine *StorageEngine,
|
||||
) {
|
||||
ctx := context.Background()
|
||||
|
||||
ts := oidtest.Address()
|
||||
cnr, set := obj.ContainerID()
|
||||
require.True(t, set)
|
||||
ts.SetContainer(cnr)
|
||||
|
||||
{
|
||||
prm := shard.PutPrm{}
|
||||
prm.SetObject(obj)
|
||||
|
||||
_, err := shard1.Put(ctx, prm)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
{
|
||||
prm := shard.InhumePrm{}
|
||||
prm.SetTarget(ts, object.AddressOf(obj))
|
||||
|
||||
_, err := shard2.Inhume(ctx, prm)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
{
|
||||
prm := GetPrm{}
|
||||
prm.WithAddress(object.AddressOf(obj))
|
||||
|
||||
_, err := engine.Get(ctx, prm)
|
||||
assert.Error(t, err)
|
||||
assert.True(t, client.IsErrObjectAlreadyRemoved(err))
|
||||
}
|
||||
}
|
||||
|
||||
func TestObjectDeletedIfTombstoneOnDifferentShard(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
shard1 := testNewShard(t, shard.WithGCRemoverSleepInterval(200*time.Millisecond))
|
||||
shard2 := testNewShard(t, shard.WithGCRemoverSleepInterval(200*time.Millisecond))
|
||||
|
||||
obj := objecttest.Object().Parent()
|
||||
|
||||
ts := oidtest.Address()
|
||||
cnr, set := obj.ContainerID()
|
||||
require.True(t, set)
|
||||
ts.SetContainer(cnr)
|
||||
|
||||
{
|
||||
prm := shard.PutPrm{}
|
||||
prm.SetObject(obj)
|
||||
|
||||
_, err := shard1.Put(ctx, prm)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
{
|
||||
prm := shard.InhumePrm{}
|
||||
prm.SetTarget(ts, object.AddressOf(obj))
|
||||
|
||||
_, err := shard2.Inhume(ctx, prm)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
<-time.After(1 * time.Second)
|
||||
|
||||
{
|
||||
prm := shard.ExistsPrm{
|
||||
Address: object.AddressOf(obj),
|
||||
}
|
||||
|
||||
res, err := shard1.Exists(ctx, prm)
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, res.Exists())
|
||||
|
||||
_, err = shard2.Exists(ctx, prm)
|
||||
require.True(t, client.IsErrObjectAlreadyRemoved(err))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -381,8 +381,6 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
|
|||
var addr oid.Address
|
||||
counters := make(map[cid.ID]ObjectCounters)
|
||||
|
||||
graveyardBKT := tx.Bucket(graveyardBucketName)
|
||||
garbageBKT := tx.Bucket(garbageBucketName)
|
||||
key := make([]byte, addressKeySize)
|
||||
var isAvailable bool
|
||||
|
||||
|
@ -402,7 +400,7 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
|
|||
|
||||
// check if an object is available: not with GCMark
|
||||
// and not covered with a tombstone
|
||||
if inGraveyardWithKey(addressKey(addr, key), graveyardBKT, garbageBKT) == 0 {
|
||||
if inGraveyardWithKey(tx, addressKey(addr, key)) == 0 {
|
||||
if v, ok := counters[cnr]; ok {
|
||||
v.Logic++
|
||||
counters[cnr] = v
|
||||
|
|
|
@ -247,9 +247,8 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
|
|||
key := make([]byte, addressKeySize)
|
||||
addrKey := addressKey(addr, key)
|
||||
garbageBKT := tx.Bucket(garbageBucketName)
|
||||
graveyardBKT := tx.Bucket(graveyardBucketName)
|
||||
|
||||
removeAvailableObject := inGraveyardWithKey(addrKey, graveyardBKT, garbageBKT) == 0
|
||||
removeAvailableObject := inGraveyardWithKey(tx, addrKey) == 0
|
||||
|
||||
// unmarshal object, work only with physically stored (raw == true) objects
|
||||
obj, err := db.get(tx, addr, key, false, true, currEpoch)
|
||||
|
|
|
@ -156,39 +156,26 @@ func objectStatus(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (uint8, erro
|
|||
return 3, nil
|
||||
}
|
||||
|
||||
graveyardBkt := tx.Bucket(graveyardBucketName)
|
||||
garbageBkt := tx.Bucket(garbageBucketName)
|
||||
addrKey := addressKey(addr, make([]byte, addressKeySize))
|
||||
return inGraveyardWithKey(addrKey, graveyardBkt, garbageBkt), nil
|
||||
return inGraveyardWithKey(tx, addrKey), nil
|
||||
}
|
||||
|
||||
func inGraveyardWithKey(addrKey []byte, graveyard, garbageBCK *bbolt.Bucket) uint8 {
|
||||
if graveyard == nil {
|
||||
// incorrect metabase state, does not make
|
||||
// sense to check garbage bucket
|
||||
func inGraveyardWithKey(tx *bbolt.Tx, addrKey []byte) uint8 {
|
||||
graveyard := tx.Bucket(graveyardBucketName)
|
||||
garbage := tx.Bucket(garbageBucketName)
|
||||
|
||||
switch {
|
||||
case graveyard != nil && graveyard.Get(addrKey) != nil:
|
||||
// the object in the graveyard
|
||||
return 2
|
||||
case garbage != nil && garbage.Get(addrKey) != nil:
|
||||
// the object has been marked with GC
|
||||
return 1
|
||||
default:
|
||||
// - neither the object is in the graveyard nor it has been marked with GC
|
||||
// - alternatively, the metabase is in an incorrect state
|
||||
return 0
|
||||
}
|
||||
|
||||
val := graveyard.Get(addrKey)
|
||||
if val == nil {
|
||||
if garbageBCK == nil {
|
||||
// incorrect node state
|
||||
return 0
|
||||
}
|
||||
|
||||
val = garbageBCK.Get(addrKey)
|
||||
if val != nil {
|
||||
// object has been marked with GC
|
||||
return 1
|
||||
}
|
||||
|
||||
// neither in the graveyard
|
||||
// nor was marked with GC mark
|
||||
return 0
|
||||
}
|
||||
|
||||
// object in the graveyard
|
||||
return 2
|
||||
}
|
||||
|
||||
// inBucket checks if key <key> is present in bucket <name>.
|
||||
|
|
|
@ -249,7 +249,7 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
|
|||
targetKey := addressKey(prm.target[i], buf)
|
||||
var ecErr *objectSDK.ECInfoError
|
||||
if err == nil {
|
||||
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
|
||||
err = db.updateDeleteInfo(tx, targetKey, cnr, obj, res)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -315,7 +315,7 @@ func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *I
|
|||
return err
|
||||
}
|
||||
chunkKey := addressKey(chunkAddr, chunkBuf)
|
||||
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, chunkKey, cnr, chunkObj, res)
|
||||
err = db.updateDeleteInfo(tx, chunkKey, cnr, chunkObj, res)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -390,9 +390,9 @@ func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, addressKey []byte
|
|||
return false, garbageBKT.Put(addressKey, zeroValue)
|
||||
}
|
||||
|
||||
func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error {
|
||||
func (db *DB) updateDeleteInfo(tx *bbolt.Tx, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error {
|
||||
containerID, _ := obj.ContainerID()
|
||||
if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 {
|
||||
if inGraveyardWithKey(tx, targetKey) == 0 {
|
||||
res.storeDeletionInfo(containerID, obj.PayloadSize(), IsUserObject(obj))
|
||||
}
|
||||
|
||||
|
|
|
@ -134,8 +134,6 @@ func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int,
|
|||
|
||||
var containerID cid.ID
|
||||
var offset []byte
|
||||
graveyardBkt := tx.Bucket(graveyardBucketName)
|
||||
garbageBkt := tx.Bucket(garbageBucketName)
|
||||
|
||||
rawAddr := make([]byte, cidSize, addressKeySize)
|
||||
|
||||
|
@ -162,7 +160,7 @@ loop:
|
|||
bkt := tx.Bucket(name)
|
||||
if bkt != nil {
|
||||
copy(rawAddr, cidRaw)
|
||||
result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
|
||||
result, offset, cursor, err = selectNFromBucket(tx, bkt, objType, rawAddr, containerID,
|
||||
result, count, cursor, threshold)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
|
@ -199,9 +197,10 @@ loop:
|
|||
|
||||
// selectNFromBucket similar to selectAllFromBucket but uses cursor to find
|
||||
// object to start selecting from. Ignores inhumed objects.
|
||||
func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
||||
func selectNFromBucket(
|
||||
tx *bbolt.Tx,
|
||||
bkt *bbolt.Bucket, // main bucket
|
||||
objType objectSDK.Type, // type of the objects stored in the main bucket
|
||||
graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
|
||||
cidRaw []byte, // container ID prefix, optimization
|
||||
cnt cid.ID, // container ID
|
||||
to []objectcore.Info, // listing result
|
||||
|
@ -235,7 +234,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
|||
}
|
||||
|
||||
offset = k
|
||||
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
|
||||
if inGraveyardWithKey(tx, append(cidRaw, k...)) > 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -375,8 +374,6 @@ func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, c
|
|||
if bkt == nil {
|
||||
return nil
|
||||
}
|
||||
graveyardBkt := tx.Bucket(graveyardBucketName)
|
||||
garbageBkt := tx.Bucket(garbageBucketName)
|
||||
c := bkt.Cursor()
|
||||
k, v := c.First()
|
||||
|
||||
|
@ -399,7 +396,7 @@ func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, c
|
|||
break
|
||||
}
|
||||
|
||||
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
|
||||
if inGraveyardWithKey(tx, append(cidRaw, k...)) > 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -463,12 +460,10 @@ func (db *DB) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjec
|
|||
if bkt == nil {
|
||||
return nil
|
||||
}
|
||||
graveyardBkt := tx.Bucket(graveyardBucketName)
|
||||
garbageBkt := tx.Bucket(garbageBucketName)
|
||||
c := bkt.Cursor()
|
||||
k, _ := c.First()
|
||||
for ; k != nil; k, _ = c.Next() {
|
||||
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
|
||||
if inGraveyardWithKey(tx, append(cidRaw, k...)) > 0 {
|
||||
continue
|
||||
}
|
||||
count++
|
||||
|
|
Loading…
Reference in a new issue