WIP: Put a tombstone on a different shard if a target shard is read-only #1403

Closed
a-savchuk wants to merge 3 commits from a-savchuk/frostfs-node:put-tombstone-on-different-shard into master
7 changed files with 154 additions and 50 deletions

View file

@ -94,6 +94,10 @@ func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) {
return GetRes{}, errNotFound
}
if client.IsErrObjectAlreadyRemoved(it.OutError) {
return GetRes{}, it.OutError
}
if it.Object == nil {
if !it.HasDegraded && it.ShardWithMeta.Shard == nil || !client.IsErrObjectNotFound(it.OutError) {
return GetRes{}, it.OutError
@ -146,7 +150,10 @@ func (i *getShardIterator) tryGetWithMeta(ctx context.Context) {
res, err := sh.Get(ctx, i.ShardPrm)
if err == nil {
i.Object = res.Object()
return true
// Keep iterating over shards because the object's GC mark can be
// stored on another shard. For more information, please refer to
// https://git.frostfs.info/TrueCloudLab/frostfs-node/pulls/1403.
return false
}
if res.HasMeta() {

View file

@ -3,12 +3,17 @@ package engine
import (
"context"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -83,3 +88,116 @@ func TestStorageEngine_Inhume(t *testing.T) {
require.Empty(t, addrs)
})
}
func TestObjectUnavailableIfTombstoneOnDifferentShard(t *testing.T) {
t.Run("look first at the shard with the object", func(t *testing.T) {
obj := objecttest.Object().Parent()
shard1 := testNewShard(t, shard.WithDisabledGC())
shard2 := testNewShard(t, shard.WithDisabledGC())
engine := testNewEngine(t).setInitializedShards(t, shard1, shard2).engine
shards := engine.sortShards(object.AddressOf(obj))
shard1 = shards[0].Shard
shard2 = shards[1].Shard
testObjectUnavailableIfTombstoneOnDifferentShard(t, obj, shard1, shard2, engine)
})
t.Run("look first at the shard with the tombstone", func(t *testing.T) {
obj := objecttest.Object().Parent()
shard1 := testNewShard(t, shard.WithDisabledGC())
shard2 := testNewShard(t, shard.WithDisabledGC())
engine := testNewEngine(t).setInitializedShards(t, shard1, shard2).engine
shards := engine.sortShards(object.AddressOf(obj))
shard1 = shards[1].Shard
shard2 = shards[0].Shard
testObjectUnavailableIfTombstoneOnDifferentShard(t, obj, shard1, shard2, engine)
})
}
func testObjectUnavailableIfTombstoneOnDifferentShard(
t *testing.T,
obj *objectSDK.Object,
shard1, shard2 *shard.Shard,
engine *StorageEngine,
) {
ctx := context.Background()
ts := oidtest.Address()
cnr, set := obj.ContainerID()
require.True(t, set)
ts.SetContainer(cnr)
{
prm := shard.PutPrm{}
prm.SetObject(obj)
_, err := shard1.Put(ctx, prm)
require.NoError(t, err)
}
{
prm := shard.InhumePrm{}
prm.SetTarget(ts, object.AddressOf(obj))
_, err := shard2.Inhume(ctx, prm)
require.NoError(t, err)
}
{
prm := GetPrm{}
prm.WithAddress(object.AddressOf(obj))
_, err := engine.Get(ctx, prm)
assert.Error(t, err)
assert.True(t, client.IsErrObjectAlreadyRemoved(err))
}
}
func TestObjectDeletedIfTombstoneOnDifferentShard(t *testing.T) {
ctx := context.Background()
shard1 := testNewShard(t, shard.WithGCRemoverSleepInterval(200*time.Millisecond))
shard2 := testNewShard(t, shard.WithGCRemoverSleepInterval(200*time.Millisecond))
obj := objecttest.Object().Parent()
ts := oidtest.Address()
cnr, set := obj.ContainerID()
require.True(t, set)
ts.SetContainer(cnr)
{
prm := shard.PutPrm{}
prm.SetObject(obj)
_, err := shard1.Put(ctx, prm)
require.NoError(t, err)
}
{
prm := shard.InhumePrm{}
prm.SetTarget(ts, object.AddressOf(obj))
_, err := shard2.Inhume(ctx, prm)
require.NoError(t, err)
}
<-time.After(1 * time.Second)
{
prm := shard.ExistsPrm{
Address: object.AddressOf(obj),
}
res, err := shard1.Exists(ctx, prm)
assert.NoError(t, err)
assert.False(t, res.Exists())
_, err = shard2.Exists(ctx, prm)
require.True(t, client.IsErrObjectAlreadyRemoved(err))
}
}

View file

@ -381,8 +381,6 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
var addr oid.Address
counters := make(map[cid.ID]ObjectCounters)
graveyardBKT := tx.Bucket(graveyardBucketName)
garbageBKT := tx.Bucket(garbageBucketName)
key := make([]byte, addressKeySize)
var isAvailable bool
@ -402,7 +400,7 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
// check if an object is available: not with GCMark
// and not covered with a tombstone
if inGraveyardWithKey(addressKey(addr, key), graveyardBKT, garbageBKT) == 0 {
if inGraveyardWithKey(tx, addressKey(addr, key)) == 0 {
if v, ok := counters[cnr]; ok {
v.Logic++
counters[cnr] = v

View file

@ -247,9 +247,8 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
key := make([]byte, addressKeySize)
addrKey := addressKey(addr, key)
garbageBKT := tx.Bucket(garbageBucketName)
graveyardBKT := tx.Bucket(graveyardBucketName)
removeAvailableObject := inGraveyardWithKey(addrKey, graveyardBKT, garbageBKT) == 0
removeAvailableObject := inGraveyardWithKey(tx, addrKey) == 0
// unmarshal object, work only with physically stored (raw == true) objects
obj, err := db.get(tx, addr, key, false, true, currEpoch)

View file

@ -156,39 +156,26 @@ func objectStatus(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (uint8, erro
return 3, nil
}
graveyardBkt := tx.Bucket(graveyardBucketName)
garbageBkt := tx.Bucket(garbageBucketName)
addrKey := addressKey(addr, make([]byte, addressKeySize))
return inGraveyardWithKey(addrKey, graveyardBkt, garbageBkt), nil
return inGraveyardWithKey(tx, addrKey), nil
}
func inGraveyardWithKey(addrKey []byte, graveyard, garbageBCK *bbolt.Bucket) uint8 {
if graveyard == nil {
// incorrect metabase state, does not make
// sense to check garbage bucket
func inGraveyardWithKey(tx *bbolt.Tx, addrKey []byte) uint8 {
graveyard := tx.Bucket(graveyardBucketName)
garbage := tx.Bucket(garbageBucketName)
switch {
case graveyard != nil && graveyard.Get(addrKey) != nil:
// the object in the graveyard
return 2
case garbage != nil && garbage.Get(addrKey) != nil:
// the object has been marked with GC
return 1
default:
// - neither the object is in the graveyard nor it has been marked with GC
// - alternatively, the metabase is in an incorrect state
return 0
}
val := graveyard.Get(addrKey)
if val == nil {
if garbageBCK == nil {
// incorrect node state
return 0
}
val = garbageBCK.Get(addrKey)
if val != nil {
// object has been marked with GC
return 1
}
// neither in the graveyard
// nor was marked with GC mark
return 0
}
// object in the graveyard
return 2
}
// inBucket checks if key <key> is present in bucket <name>.

View file

@ -249,7 +249,7 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
targetKey := addressKey(prm.target[i], buf)
var ecErr *objectSDK.ECInfoError
if err == nil {
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
err = db.updateDeleteInfo(tx, targetKey, cnr, obj, res)
if err != nil {
return err
}
@ -315,7 +315,7 @@ func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *I
return err
}
chunkKey := addressKey(chunkAddr, chunkBuf)
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, chunkKey, cnr, chunkObj, res)
err = db.updateDeleteInfo(tx, chunkKey, cnr, chunkObj, res)
if err != nil {
return err
}
@ -390,9 +390,9 @@ func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, addressKey []byte
return false, garbageBKT.Put(addressKey, zeroValue)
}
func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error {
func (db *DB) updateDeleteInfo(tx *bbolt.Tx, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error {
containerID, _ := obj.ContainerID()
if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 {
if inGraveyardWithKey(tx, targetKey) == 0 {
res.storeDeletionInfo(containerID, obj.PayloadSize(), IsUserObject(obj))
}

View file

@ -134,8 +134,6 @@ func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int,
var containerID cid.ID
var offset []byte
graveyardBkt := tx.Bucket(graveyardBucketName)
garbageBkt := tx.Bucket(garbageBucketName)
rawAddr := make([]byte, cidSize, addressKeySize)
@ -162,7 +160,7 @@ loop:
bkt := tx.Bucket(name)
if bkt != nil {
copy(rawAddr, cidRaw)
result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
result, offset, cursor, err = selectNFromBucket(tx, bkt, objType, rawAddr, containerID,
result, count, cursor, threshold)
if err != nil {
return nil, nil, err
@ -199,9 +197,10 @@ loop:
// selectNFromBucket similar to selectAllFromBucket but uses cursor to find
// object to start selecting from. Ignores inhumed objects.
func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
func selectNFromBucket(
tx *bbolt.Tx,
bkt *bbolt.Bucket, // main bucket
objType objectSDK.Type, // type of the objects stored in the main bucket
graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
cidRaw []byte, // container ID prefix, optimization
cnt cid.ID, // container ID
to []objectcore.Info, // listing result
@ -235,7 +234,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
}
offset = k
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
if inGraveyardWithKey(tx, append(cidRaw, k...)) > 0 {
continue
}
@ -375,8 +374,6 @@ func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, c
if bkt == nil {
return nil
}
graveyardBkt := tx.Bucket(graveyardBucketName)
garbageBkt := tx.Bucket(garbageBucketName)
c := bkt.Cursor()
k, v := c.First()
@ -399,7 +396,7 @@ func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, c
break
}
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
if inGraveyardWithKey(tx, append(cidRaw, k...)) > 0 {
continue
}
@ -463,12 +460,10 @@ func (db *DB) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjec
if bkt == nil {
return nil
}
graveyardBkt := tx.Bucket(graveyardBucketName)
garbageBkt := tx.Bucket(garbageBucketName)
c := bkt.Cursor()
k, _ := c.First()
for ; k != nil; k, _ = c.Next() {
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
if inGraveyardWithKey(tx, append(cidRaw, k...)) > 0 {
continue
}
count++