frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_failover_test.go

196 lines
5.6 KiB
Go

package blobovniczatree
import (
"bytes"
"context"
"path/filepath"
"sync"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
)
func TestRebuildFailover(t *testing.T) {
t.Parallel()
t.Run("only move info saved", testRebuildFailoverOnlyMoveInfoSaved)
t.Run("object saved to target", testRebuildFailoverObjectSavedToTarget)
t.Run("object deleted from source", testRebuildFailoverObjectDeletedFromSource)
}
func testRebuildFailoverOnlyMoveInfoSaved(t *testing.T) {
t.Parallel()
dir := t.TempDir()
blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
obj := blobstortest.NewObject(1024)
data, err := obj.Marshal()
require.NoError(t, err)
var pPrm blobovnicza.PutPrm
pPrm.SetAddress(object.AddressOf(obj))
pPrm.SetMarshaledObject(data)
_, err = blz.Put(context.Background(), pPrm)
require.NoError(t, err)
require.NoError(t, blz.PutMoveInfo(context.Background(), blobovnicza.MoveInfo{
Address: object.AddressOf(obj),
TargetStorageID: []byte("0/0/0"),
}))
require.NoError(t, blz.Close())
testRebuildFailoverValidate(t, dir, obj, true)
}
func testRebuildFailoverObjectSavedToTarget(t *testing.T) {
t.Parallel()
dir := t.TempDir()
blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
obj := blobstortest.NewObject(1024)
data, err := obj.Marshal()
require.NoError(t, err)
var pPrm blobovnicza.PutPrm
pPrm.SetAddress(object.AddressOf(obj))
pPrm.SetMarshaledObject(data)
_, err = blz.Put(context.Background(), pPrm)
require.NoError(t, err)
require.NoError(t, blz.PutMoveInfo(context.Background(), blobovnicza.MoveInfo{
Address: object.AddressOf(obj),
TargetStorageID: []byte("0/0/0"),
}))
require.NoError(t, blz.Close())
blz = blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "0.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
_, err = blz.Put(context.Background(), pPrm)
require.NoError(t, err)
require.NoError(t, blz.Close())
testRebuildFailoverValidate(t, dir, obj, true)
}
func testRebuildFailoverObjectDeletedFromSource(t *testing.T) {
t.Parallel()
dir := t.TempDir()
blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
obj := blobstortest.NewObject(1024)
data, err := obj.Marshal()
require.NoError(t, err)
require.NoError(t, blz.PutMoveInfo(context.Background(), blobovnicza.MoveInfo{
Address: object.AddressOf(obj),
TargetStorageID: []byte("0/0/0"),
}))
require.NoError(t, blz.Close())
blz = blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "0.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
var pPrm blobovnicza.PutPrm
pPrm.SetAddress(object.AddressOf(obj))
pPrm.SetMarshaledObject(data)
_, err = blz.Put(context.Background(), pPrm)
require.NoError(t, err)
require.NoError(t, blz.Close())
testRebuildFailoverValidate(t, dir, obj, false)
}
func testRebuildFailoverValidate(t *testing.T, dir string, obj *objectSDK.Object, mustUpdateStorageID bool) {
b := NewBlobovniczaTree(
WithLogger(test.NewLogger(t)),
WithObjectSizeLimit(2048),
WithBlobovniczaShallowWidth(2),
WithBlobovniczaShallowDepth(2),
WithRootPath(dir),
WithBlobovniczaSize(100*1024*1024),
WithWaitBeforeDropDB(0),
WithOpenedCacheSize(1000))
require.NoError(t, b.Open(false))
require.NoError(t, b.Init())
var dPrm common.DeletePrm
dPrm.Address = object.AddressOf(obj)
dPrm.StorageID = []byte("0/0/1")
_, err := b.Delete(context.Background(), dPrm)
require.ErrorIs(t, err, errObjectIsDeleteProtected)
metaStub := &storageIDUpdateStub{
storageIDs: make(map[oid.Address][]byte),
guard: &sync.Mutex{},
}
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
MetaStorage: metaStub,
WorkerLimiter: &rebuildLimiterStub{},
})
require.NoError(t, err)
require.Equal(t, uint64(1), rRes.ObjectsMoved)
require.Equal(t, uint64(0), rRes.FilesRemoved)
require.NoError(t, b.Close())
blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
moveInfo, err := blz.ListMoveInfo(context.Background())
require.NoError(t, err)
require.Equal(t, 0, len(moveInfo))
var gPrm blobovnicza.GetPrm
gPrm.SetAddress(object.AddressOf(obj))
_, err = blz.Get(context.Background(), gPrm)
require.True(t, client.IsErrObjectNotFound(err))
require.NoError(t, blz.Close())
blz = blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "0.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
moveInfo, err = blz.ListMoveInfo(context.Background())
require.NoError(t, err)
require.Equal(t, 0, len(moveInfo))
gRes, err := blz.Get(context.Background(), gPrm)
require.NoError(t, err)
require.True(t, len(gRes.Object()) > 0)
if mustUpdateStorageID {
require.True(t, bytes.Equal([]byte("0/0/0"), metaStub.storageIDs[object.AddressOf(obj)]))
}
require.NoError(t, blz.Close())
}