frostfs-node/pkg/local_object_storage/writecache/writecachebbolt/flush_test.go

108 lines
3.3 KiB
Go

package writecachebbolt
import (
"context"
"os"
"path/filepath"
"sync/atomic"
"testing"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachetest"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
func TestFlush(t *testing.T) {
testlogger := test.NewLogger(t, true)
createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs writecache.MainStorage, opts ...Option) writecache.Cache {
return New(
append([]Option{
WithLogger(testlogger),
WithPath(filepath.Join(t.TempDir(), "writecache")),
WithSmallObjectSize(smallSize),
WithMetabase(mb),
WithBlobstor(bs),
}, opts...)...)
}
errCountOpt := func() (Option, *atomic.Uint32) {
cnt := &atomic.Uint32{}
return WithReportErrorFunc(func(msg string, err error) {
cnt.Add(1)
testlogger.Warn(msg, zap.Uint32("error_count", cnt.Load()), zap.Error(err))
}), cnt
}
failures := []writecachetest.TestFailureInjector[Option]{
{
Desc: "db, invalid address",
InjectFn: func(t *testing.T, wc writecache.Cache) {
c := wc.(*cache)
obj := testutil.GenerateObject()
data, err := obj.Marshal()
require.NoError(t, err)
require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
return b.Put([]byte{1, 2, 3}, data)
}))
},
},
{
Desc: "db, invalid object",
InjectFn: func(t *testing.T, wc writecache.Cache) {
c := wc.(*cache)
require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
k := []byte(oidtest.Address().EncodeToString())
v := []byte{1, 2, 3}
return b.Put(k, v)
}))
},
},
{
Desc: "fs, read error",
InjectFn: func(t *testing.T, wc writecache.Cache) {
c := wc.(*cache)
obj := testutil.GenerateObject()
data, err := obj.Marshal()
require.NoError(t, err)
var prm common.PutPrm
prm.Address = objectCore.AddressOf(obj)
prm.RawData = data
_, err = c.fsTree.Put(context.Background(), prm)
require.NoError(t, err)
p := prm.Address.Object().EncodeToString() + "." + prm.Address.Container().EncodeToString()
p = filepath.Join(c.fsTree.RootPath, p[:1], p[1:])
_, err = os.Stat(p) // sanity check
require.NoError(t, err)
require.NoError(t, os.Truncate(p, 0)) // corrupt the file contents, so that it can't be unmarshalled
},
},
{
Desc: "fs, invalid object",
InjectFn: func(t *testing.T, wc writecache.Cache) {
c := wc.(*cache)
var prm common.PutPrm
prm.Address = oidtest.Address()
prm.RawData = []byte{1, 2, 3}
_, err := c.fsTree.Put(context.Background(), prm)
require.NoError(t, err)
},
},
}
writecachetest.TestFlush(t, createCacheFn, errCountOpt, failures...)
}