frostfs-node/pkg/local_object_storage/writecache/writecachetest/flush.go

186 lines
5.1 KiB
Go

package writecachetest
import (
"context"
"path/filepath"
"sync/atomic"
"testing"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
)
const (
objCount = 4
smallSize = 256
)
type CreateCacheFunc[Option any] func(
t *testing.T,
smallSize uint64,
meta *meta.DB,
bs writecache.MainStorage,
opts ...Option,
) writecache.Cache
type TestFailureInjector[Option any] struct {
Desc string
InjectFn func(*testing.T, writecache.Cache)
}
type objectPair struct {
addr oid.Address
obj *objectSDK.Object
}
func TestFlush[Option any](
t *testing.T,
createCacheFn CreateCacheFunc[Option],
errCountOption func() (Option, *atomic.Uint32),
failures ...TestFailureInjector[Option],
) {
t.Run("no errors", func(t *testing.T) {
wc, bs, mb := newCache(t, createCacheFn, smallSize)
objects := putObjects(t, wc)
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
require.NoError(t, wc.Flush(context.Background(), false))
check(t, mb, bs, objects)
})
t.Run("flush on moving to degraded mode", func(t *testing.T) {
wc, bs, mb := newCache(t, createCacheFn, smallSize)
objects := putObjects(t, wc)
// Blobstor is read-only, so we expect en error from `flush` here.
require.Error(t, wc.SetMode(mode.Degraded))
// First move to read-only mode to close background workers.
require.NoError(t, wc.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
require.NoError(t, wc.SetMode(mode.Degraded))
check(t, mb, bs, objects)
})
t.Run("ignore errors", func(t *testing.T) {
for _, f := range failures {
f := f
t.Run(f.Desc, func(t *testing.T) {
errCountOpt, errCount := errCountOption()
wc, bs, mb := newCache(t, createCacheFn, smallSize, errCountOpt)
objects := putObjects(t, wc)
f.InjectFn(t, wc)
require.NoError(t, wc.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
require.Equal(t, uint32(0), errCount.Load())
require.Error(t, wc.Flush(context.Background(), false))
require.True(t, errCount.Load() > 0)
require.NoError(t, wc.Flush(context.Background(), true))
check(t, mb, bs, objects)
})
}
})
}
func newCache[Option any](
t *testing.T,
createCacheFn CreateCacheFunc[Option],
smallSize uint64,
opts ...Option,
) (writecache.Cache, *blobstor.BlobStor, *meta.DB) {
dir := t.TempDir()
mb := meta.New(
meta.WithPath(filepath.Join(dir, "meta")),
meta.WithEpochState(dummyEpoch{}))
require.NoError(t, mb.Open(false))
require.NoError(t, mb.Init())
bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{
{
Storage: fstree.New(
fstree.WithPath(filepath.Join(dir, "blob")),
fstree.WithDepth(0),
fstree.WithDirNameLen(1)),
},
}))
require.NoError(t, bs.Open(false))
require.NoError(t, bs.Init())
wc := createCacheFn(t, smallSize, mb, bs, opts...)
t.Cleanup(func() { require.NoError(t, wc.Close()) })
require.NoError(t, wc.Open(false))
require.NoError(t, wc.Init())
// First set mode for metabase and blobstor to prevent background flushes.
require.NoError(t, mb.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadOnly))
return wc, bs, mb
}
func putObject(t *testing.T, c writecache.Cache, size int) objectPair {
obj := testutil.GenerateObjectWithSize(size)
data, err := obj.Marshal()
require.NoError(t, err)
var prm common.PutPrm
prm.Address = objectCore.AddressOf(obj)
prm.Object = obj
prm.RawData = data
_, err = c.Put(context.Background(), prm)
require.NoError(t, err)
return objectPair{prm.Address, prm.Object}
}
func putObjects(t *testing.T, c writecache.Cache) []objectPair {
objects := make([]objectPair, objCount)
for i := range objects {
objects[i] = putObject(t, c, 1+(i%2)*smallSize)
}
return objects
}
func check(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPair) {
for i := range objects {
var mPrm meta.StorageIDPrm
mPrm.SetAddress(objects[i].addr)
mRes, err := mb.StorageID(context.Background(), mPrm)
require.NoError(t, err)
var prm common.GetPrm
prm.Address = objects[i].addr
prm.StorageID = mRes.StorageID()
res, err := bs.Get(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, objects[i].obj, res.Object)
}
}
type dummyEpoch struct{}
func (dummyEpoch) CurrentEpoch() uint64 {
return 0
}