246 lines
7.1 KiB
Go
246 lines
7.1 KiB
Go
package writecache
|
|
|
|
import (
|
|
"context"
|
|
"os"
|
|
"path/filepath"
|
|
"sync/atomic"
|
|
"testing"
|
|
|
|
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
|
"github.com/stretchr/testify/require"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
func TestFlush(t *testing.T) {
|
|
testlogger := test.NewLogger(t)
|
|
|
|
createCacheFn := func(t *testing.T, mb *meta.DB, bs MainStorage, opts ...Option) Cache {
|
|
return New(
|
|
append([]Option{
|
|
WithLogger(testlogger),
|
|
WithPath(filepath.Join(t.TempDir(), "writecache")),
|
|
WithMetabase(mb),
|
|
WithBlobstor(bs),
|
|
WithDisableBackgroundFlush(),
|
|
}, opts...)...)
|
|
}
|
|
|
|
errCountOpt := func() (Option, *atomic.Uint32) {
|
|
cnt := &atomic.Uint32{}
|
|
return WithReportErrorFunc(func(ctx context.Context, msg string, err error) {
|
|
cnt.Add(1)
|
|
testlogger.Warn(ctx, msg, zap.Uint32("error_count", cnt.Load()), zap.Error(err))
|
|
}), cnt
|
|
}
|
|
|
|
failures := []TestFailureInjector[Option]{
|
|
{
|
|
Desc: "fs, read error",
|
|
InjectFn: func(t *testing.T, wc Cache) {
|
|
c := wc.(*cache)
|
|
obj := testutil.GenerateObject()
|
|
data, err := obj.Marshal()
|
|
require.NoError(t, err)
|
|
|
|
var prm common.PutPrm
|
|
prm.Address = objectCore.AddressOf(obj)
|
|
prm.RawData = data
|
|
|
|
_, err = c.fsTree.Put(context.Background(), prm)
|
|
require.NoError(t, err)
|
|
|
|
p := prm.Address.Object().EncodeToString() + "." + prm.Address.Container().EncodeToString()
|
|
p = filepath.Join(c.fsTree.RootPath, p[:1], p[1:])
|
|
|
|
_, err = os.Stat(p) // sanity check
|
|
require.NoError(t, err)
|
|
require.NoError(t, os.Truncate(p, 0)) // corrupt the file contents, so that it can't be unmarshalled
|
|
},
|
|
},
|
|
{
|
|
Desc: "fs, invalid object",
|
|
InjectFn: func(t *testing.T, wc Cache) {
|
|
c := wc.(*cache)
|
|
var prm common.PutPrm
|
|
prm.Address = oidtest.Address()
|
|
prm.RawData = []byte{1, 2, 3}
|
|
_, err := c.fsTree.Put(context.Background(), prm)
|
|
require.NoError(t, err)
|
|
},
|
|
},
|
|
}
|
|
|
|
runFlushTest(t, createCacheFn, errCountOpt, failures...)
|
|
}
|
|
|
|
const (
	// objCount is the number of objects putObjects stores in the cache.
	objCount = 4
	// smallSize is added to the base size of 1 for every odd-indexed object
	// generated by putObjects; even-indexed objects are generated with size 1.
	smallSize = 256
)
|
|
|
|
type CreateCacheFunc[Option any] func(
|
|
t *testing.T,
|
|
meta *meta.DB,
|
|
bs MainStorage,
|
|
opts ...Option,
|
|
) Cache
|
|
|
|
type TestFailureInjector[Option any] struct {
|
|
Desc string
|
|
InjectFn func(*testing.T, Cache)
|
|
}
|
|
|
|
type objectPair struct {
|
|
addr oid.Address
|
|
obj *objectSDK.Object
|
|
}
|
|
|
|
func runFlushTest[Option any](
|
|
t *testing.T,
|
|
createCacheFn CreateCacheFunc[Option],
|
|
errCountOption func() (Option, *atomic.Uint32),
|
|
failures ...TestFailureInjector[Option],
|
|
) {
|
|
t.Run("no errors", func(t *testing.T) {
|
|
wc, bs, mb := newCache(t, createCacheFn)
|
|
defer func() { require.NoError(t, wc.Close(context.Background())) }()
|
|
objects := putObjects(t, wc)
|
|
|
|
require.NoError(t, bs.SetMode(context.Background(), mode.ReadWrite))
|
|
require.NoError(t, mb.SetMode(context.Background(), mode.ReadWrite))
|
|
|
|
require.NoError(t, wc.Flush(context.Background(), false, false))
|
|
|
|
check(t, mb, bs, objects)
|
|
})
|
|
|
|
t.Run("flush on moving to degraded mode", func(t *testing.T) {
|
|
wc, bs, mb := newCache(t, createCacheFn)
|
|
defer func() { require.NoError(t, wc.Close(context.Background())) }()
|
|
objects := putObjects(t, wc)
|
|
|
|
// Blobstor is read-only, so we expect en error from `flush` here.
|
|
require.Error(t, wc.SetMode(context.Background(), mode.Degraded))
|
|
|
|
require.NoError(t, bs.SetMode(context.Background(), mode.ReadWrite))
|
|
require.NoError(t, mb.SetMode(context.Background(), mode.ReadWrite))
|
|
require.NoError(t, wc.SetMode(context.Background(), mode.Degraded))
|
|
|
|
check(t, mb, bs, objects)
|
|
})
|
|
|
|
t.Run("ignore errors", func(t *testing.T) {
|
|
for _, f := range failures {
|
|
t.Run(f.Desc, func(t *testing.T) {
|
|
errCountOpt, errCount := errCountOption()
|
|
wc, bs, mb := newCache(t, createCacheFn, errCountOpt)
|
|
defer func() { require.NoError(t, wc.Close(context.Background())) }()
|
|
objects := putObjects(t, wc)
|
|
f.InjectFn(t, wc)
|
|
|
|
require.NoError(t, bs.SetMode(context.Background(), mode.ReadWrite))
|
|
require.NoError(t, mb.SetMode(context.Background(), mode.ReadWrite))
|
|
|
|
require.Equal(t, uint32(0), errCount.Load())
|
|
require.Error(t, wc.Flush(context.Background(), false, false))
|
|
require.Greater(t, errCount.Load(), uint32(0))
|
|
require.NoError(t, wc.Flush(context.Background(), true, false))
|
|
|
|
check(t, mb, bs, objects)
|
|
})
|
|
}
|
|
})
|
|
}
|
|
|
|
func newCache[Option any](
|
|
t *testing.T,
|
|
createCacheFn CreateCacheFunc[Option],
|
|
opts ...Option,
|
|
) (Cache, *blobstor.BlobStor, *meta.DB) {
|
|
dir := t.TempDir()
|
|
mb := meta.New(
|
|
meta.WithPath(filepath.Join(dir, "meta")),
|
|
meta.WithEpochState(dummyEpoch{}))
|
|
require.NoError(t, mb.Open(context.Background(), mode.ReadWrite))
|
|
require.NoError(t, mb.Init(context.Background()))
|
|
|
|
bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{
|
|
{
|
|
Storage: fstree.New(
|
|
fstree.WithPath(filepath.Join(dir, "blob")),
|
|
fstree.WithDepth(0),
|
|
fstree.WithDirNameLen(1)),
|
|
},
|
|
}))
|
|
require.NoError(t, bs.Open(context.Background(), mode.ReadWrite))
|
|
require.NoError(t, bs.Init(context.Background()))
|
|
|
|
wc := createCacheFn(t, mb, bs, opts...)
|
|
require.NoError(t, wc.Open(context.Background(), mode.ReadWrite))
|
|
require.NoError(t, wc.Init(context.Background()))
|
|
|
|
// First set mode for metabase and blobstor to prevent background flushes.
|
|
require.NoError(t, mb.SetMode(context.Background(), mode.ReadOnly))
|
|
require.NoError(t, bs.SetMode(context.Background(), mode.ReadOnly))
|
|
|
|
return wc, bs, mb
|
|
}
|
|
|
|
func putObject(t *testing.T, c Cache, size int) objectPair {
|
|
obj := testutil.GenerateObjectWithSize(size)
|
|
data, err := obj.Marshal()
|
|
require.NoError(t, err)
|
|
|
|
var prm common.PutPrm
|
|
prm.Address = objectCore.AddressOf(obj)
|
|
prm.Object = obj
|
|
prm.RawData = data
|
|
|
|
_, err = c.Put(context.Background(), prm)
|
|
require.NoError(t, err)
|
|
|
|
return objectPair{prm.Address, prm.Object}
|
|
}
|
|
|
|
func putObjects(t *testing.T, c Cache) []objectPair {
|
|
objects := make([]objectPair, objCount)
|
|
for i := range objects {
|
|
objects[i] = putObject(t, c, 1+(i%2)*smallSize)
|
|
}
|
|
return objects
|
|
}
|
|
|
|
func check(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPair) {
|
|
for i := range objects {
|
|
var mPrm meta.StorageIDPrm
|
|
mPrm.SetAddress(objects[i].addr)
|
|
|
|
mRes, err := mb.StorageID(context.Background(), mPrm)
|
|
require.NoError(t, err)
|
|
|
|
var prm common.GetPrm
|
|
prm.Address = objects[i].addr
|
|
prm.StorageID = mRes.StorageID()
|
|
|
|
res, err := bs.Get(context.Background(), prm)
|
|
require.NoError(t, err, objects[i].addr)
|
|
require.Equal(t, objects[i].obj, res.Object)
|
|
}
|
|
}
|
|
|
|
// dummyEpoch is a stub epoch source; newCache passes it to the metabase via
// meta.WithEpochState.
type dummyEpoch struct{}

// CurrentEpoch always reports epoch 0.
func (dummyEpoch) CurrentEpoch() uint64 { return 0 }
|