frostfs-node/pkg/local_object_storage/shard/control_test.go
Airat Arifullin cb172e73a6
All checks were successful
ci/woodpecker/push/pre-commit Pipeline was successful
[#228] node: Use uber atomic package instead standard
Signed-off-by: Airat Arifullin a.arifullin@yadro.com
2023-04-07 15:37:27 +00:00

380 lines
9.5 KiB
Go

package shard
import (
"context"
"io/fs"
"math"
"os"
"path/filepath"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/test"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
"go.uber.org/atomic"
"go.uber.org/zap/zaptest"
)
// epochState is a stateless stub that the tests in this file hand to
// meta.WithEpochState.
type epochState struct{}

// CurrentEpoch always reports epoch zero.
func (epochState) CurrentEpoch() uint64 {
	const fixedEpoch uint64 = 0
	return fixedEpoch
}
// objAddr pairs a test object with its address. In this file it is filled
// with a generated object and the address computed from it via
// object.AddressOf, so checks can look the object up and compare the result.
type objAddr struct {
// obj is the object itself.
obj *objectSDK.Object
// addr is the address stored alongside obj.
addr oid.Address
}
func TestShardOpen(t *testing.T) {
dir := t.TempDir()
metaPath := filepath.Join(dir, "meta")
st := teststore.New(teststore.WithSubstorage(fstree.New(
fstree.WithDirNameLen(2),
fstree.WithPath(filepath.Join(dir, "blob")),
fstree.WithDepth(1)),
))
var allowedMode atomic.Int64
openFileMetabase := func(p string, f int, perm fs.FileMode) (*os.File, error) {
const modeMask = os.O_RDONLY | os.O_RDWR | os.O_WRONLY
if int64(f&modeMask) == allowedMode.Load() {
return os.OpenFile(p, f, perm)
}
return nil, fs.ErrPermission
}
newShard := func() *Shard {
return New(
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
WithBlobStorOptions(
blobstor.WithStorages([]blobstor.SubStorage{
{Storage: st},
})),
WithMetaBaseOptions(
meta.WithPath(metaPath),
meta.WithEpochState(epochState{}),
meta.WithBoltDBOptions(&bbolt.Options{OpenFile: openFileMetabase}),
),
WithPiloramaOptions(
pilorama.WithPath(filepath.Join(dir, "pilorama"))),
WithWriteCache(true),
WithWriteCacheOptions(
writecache.WithPath(filepath.Join(dir, "wc"))))
}
allowedMode.Store(int64(os.O_RDWR))
sh := newShard()
require.NoError(t, sh.Open())
require.NoError(t, sh.Init(context.Background()))
require.Equal(t, mode.ReadWrite, sh.GetMode())
require.NoError(t, sh.Close())
// Metabase can be opened in read-only => start in ReadOnly mode.
allowedMode.Store(int64(os.O_RDONLY))
sh = newShard()
require.NoError(t, sh.Open())
require.NoError(t, sh.Init(context.Background()))
require.Equal(t, mode.ReadOnly, sh.GetMode())
require.Error(t, sh.SetMode(mode.ReadWrite))
require.Equal(t, mode.ReadOnly, sh.GetMode())
require.NoError(t, sh.Close())
// Metabase is corrupted => start in DegradedReadOnly mode.
allowedMode.Store(math.MaxInt64)
sh = newShard()
require.NoError(t, sh.Open())
require.NoError(t, sh.Init(context.Background()))
require.Equal(t, mode.DegradedReadOnly, sh.GetMode())
require.NoError(t, sh.Close())
}
// TestRefillMetabaseCorrupted stores one object through a shard, overwrites
// its blob in the FSTree with bytes that are not an object, and then brings up
// a second shard with a new metabase path and WithRefillMetabase(true).
// Opening and initializing that shard must succeed, and Get for the damaged
// address must return an ObjectNotFound error.
func TestRefillMetabaseCorrupted(t *testing.T) {
	root := t.TempDir()
	piloramaPath := filepath.Join(root, "pilorama")

	tree := fstree.New(
		fstree.WithDirNameLen(2),
		fstree.WithPath(filepath.Join(root, "blob")),
		fstree.WithDepth(1))
	storageOpts := []blobstor.Option{
		blobstor.WithStorages([]blobstor.SubStorage{{Storage: tree}}),
	}

	// First shard: used only to store a valid object.
	shard := New(
		WithBlobStorOptions(storageOpts...),
		WithPiloramaOptions(pilorama.WithPath(piloramaPath)),
		WithMetaBaseOptions(
			meta.WithPath(filepath.Join(root, "meta")),
			meta.WithEpochState(epochState{})))
	require.NoError(t, shard.Open())
	require.NoError(t, shard.Init(context.Background()))

	stored := objecttest.Object()
	stored.SetType(objectSDK.TypeRegular)
	stored.SetPayload([]byte{0, 1, 2, 3, 4, 5})

	var put PutPrm
	put.SetObject(stored)
	_, err := shard.Put(put)
	require.NoError(t, err)
	require.NoError(t, shard.Close())

	// Replace the stored blob with garbage, bypassing the shard.
	target := object.AddressOf(stored)
	garbage := common.PutPrm{Address: target, RawData: []byte("not an object")}
	_, err = tree.Put(garbage)
	require.NoError(t, err)

	// Second shard: same blobstor, new metabase file, refill enabled.
	shard = New(
		WithBlobStorOptions(storageOpts...),
		WithPiloramaOptions(pilorama.WithPath(piloramaPath)),
		WithMetaBaseOptions(
			meta.WithPath(filepath.Join(root, "meta_new")),
			meta.WithEpochState(epochState{})),
		WithRefillMetabase(true))
	require.NoError(t, shard.Open())
	require.NoError(t, shard.Init(context.Background()))

	var get GetPrm
	get.SetAddress(target)
	_, err = shard.Get(get)

	var notFound apistatus.ObjectNotFound
	require.ErrorAs(t, err, &notFound)

	require.NoError(t, shard.Close())
}
func TestRefillMetabase(t *testing.T) {
p := t.Name()
defer os.RemoveAll(p)
blobOpts := []blobstor.Option{
blobstor.WithStorages([]blobstor.SubStorage{
{
Storage: fstree.New(
fstree.WithPath(filepath.Join(p, "blob")),
fstree.WithDepth(1)),
},
}),
}
sh := New(
WithBlobStorOptions(blobOpts...),
WithMetaBaseOptions(
meta.WithPath(filepath.Join(p, "meta")),
meta.WithEpochState(epochState{}),
),
WithPiloramaOptions(
pilorama.WithPath(filepath.Join(p, "pilorama"))),
)
// open Blobstor
require.NoError(t, sh.Open())
// initialize Blobstor
require.NoError(t, sh.Init(context.Background()))
const objNum = 5
mObjs := make(map[string]objAddr)
locked := make([]oid.ID, 1, 2)
locked[0] = oidtest.ID()
cnrLocked := cidtest.ID()
for i := uint64(0); i < objNum; i++ {
obj := objecttest.Object()
obj.SetType(objectSDK.TypeRegular)
if len(locked) < 2 {
obj.SetContainerID(cnrLocked)
id, _ := obj.ID()
locked = append(locked, id)
}
addr := object.AddressOf(obj)
mObjs[addr.EncodeToString()] = objAddr{
obj: obj,
addr: addr,
}
}
tombObj := objecttest.Object()
tombObj.SetType(objectSDK.TypeTombstone)
tombstone := objecttest.Tombstone()
tombData, err := tombstone.Marshal()
require.NoError(t, err)
tombObj.SetPayload(tombData)
tombMembers := make([]oid.Address, 0, len(tombstone.Members()))
members := tombstone.Members()
for i := range tombstone.Members() {
var a oid.Address
a.SetObject(members[i])
cnr, _ := tombObj.ContainerID()
a.SetContainer(cnr)
tombMembers = append(tombMembers, a)
}
var putPrm PutPrm
for _, v := range mObjs {
putPrm.SetObject(v.obj)
_, err := sh.Put(putPrm)
require.NoError(t, err)
}
putPrm.SetObject(tombObj)
_, err = sh.Put(putPrm)
require.NoError(t, err)
// LOCK object handling
var lock objectSDK.Lock
lock.WriteMembers(locked)
lockObj := objecttest.Object()
lockObj.SetContainerID(cnrLocked)
objectSDK.WriteLock(lockObj, lock)
putPrm.SetObject(lockObj)
_, err = sh.Put(putPrm)
require.NoError(t, err)
lockID, _ := lockObj.ID()
require.NoError(t, sh.Lock(cnrLocked, lockID, locked))
var inhumePrm InhumePrm
inhumePrm.SetTarget(object.AddressOf(tombObj), tombMembers...)
_, err = sh.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
var headPrm HeadPrm
checkObj := func(addr oid.Address, expObj *objectSDK.Object) {
headPrm.SetAddress(addr)
res, err := sh.Head(headPrm)
if expObj == nil {
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
return
}
require.NoError(t, err)
require.Equal(t, expObj.CutPayload(), res.Object())
}
checkAllObjs := func(exists bool) {
for _, v := range mObjs {
if exists {
checkObj(v.addr, v.obj)
} else {
checkObj(v.addr, nil)
}
}
}
checkTombMembers := func(exists bool) {
for _, member := range tombMembers {
headPrm.SetAddress(member)
_, err := sh.Head(headPrm)
if exists {
require.ErrorAs(t, err, new(apistatus.ObjectAlreadyRemoved))
} else {
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
}
}
}
checkLocked := func(t *testing.T, cnr cid.ID, locked []oid.ID) {
var addr oid.Address
addr.SetContainer(cnr)
for i := range locked {
addr.SetObject(locked[i])
var prm InhumePrm
prm.MarkAsGarbage(addr)
_, err := sh.Inhume(context.Background(), prm)
require.ErrorAs(t, err, new(apistatus.ObjectLocked),
"object %s should be locked", locked[i])
}
}
checkAllObjs(true)
checkObj(object.AddressOf(tombObj), tombObj)
checkTombMembers(true)
checkLocked(t, cnrLocked, locked)
c, err := sh.metaBase.ObjectCounters()
require.NoError(t, err)
phyBefore := c.Phy()
logicalBefore := c.Logic()
err = sh.Close()
require.NoError(t, err)
sh = New(
WithBlobStorOptions(blobOpts...),
WithMetaBaseOptions(
meta.WithPath(filepath.Join(p, "meta_restored")),
meta.WithEpochState(epochState{}),
),
WithPiloramaOptions(
pilorama.WithPath(filepath.Join(p, "pilorama_another"))),
)
// open Blobstor
require.NoError(t, sh.Open())
// initialize Blobstor
require.NoError(t, sh.Init(context.Background()))
defer sh.Close()
checkAllObjs(false)
checkObj(object.AddressOf(tombObj), nil)
checkTombMembers(false)
err = sh.refillMetabase()
require.NoError(t, err)
c, err = sh.metaBase.ObjectCounters()
require.NoError(t, err)
require.Equal(t, phyBefore, c.Phy())
require.Equal(t, logicalBefore, c.Logic())
checkAllObjs(true)
checkObj(object.AddressOf(tombObj), tombObj)
checkTombMembers(true)
checkLocked(t, cnrLocked, locked)
}