package writecache import ( "context" "os" "path/filepath" "sync/atomic" "testing" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" "github.com/stretchr/testify/require" "go.etcd.io/bbolt" "go.uber.org/zap" ) func TestFlush(t *testing.T) { testlogger := test.NewLogger(t) createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs MainStorage, opts ...Option) Cache { return New( append([]Option{ WithLogger(testlogger), WithPath(filepath.Join(t.TempDir(), "writecache")), WithSmallObjectSize(smallSize), WithMetabase(mb), WithBlobstor(bs), WithDisableBackgroundFlush(), }, opts...)...) } errCountOpt := func() (Option, *atomic.Uint32) { cnt := &atomic.Uint32{} return WithReportErrorFunc(func(msg string, err error) { cnt.Add(1) testlogger.Warn(msg, zap.Uint32("error_count", cnt.Load()), zap.Error(err)) }), cnt } failures := []TestFailureInjector[Option]{ { Desc: "db, invalid address", InjectFn: func(t *testing.T, wc Cache) { c := wc.(*cache) obj := testutil.GenerateObject() data, err := obj.Marshal() require.NoError(t, err) require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) return b.Put([]byte{1, 2, 3}, data) })) }, }, { Desc: "db, invalid object", InjectFn: func(t *testing.T, wc Cache) { c := wc.(*cache) require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) k := []byte(oidtest.Address().EncodeToString()) v := []byte{1, 2, 3} return b.Put(k, v) })) }, }, { Desc: "fs, read error", InjectFn: func(t *testing.T, wc Cache) { c := wc.(*cache) obj := testutil.GenerateObject() data, err := obj.Marshal() require.NoError(t, err) var prm common.PutPrm prm.Address = objectCore.AddressOf(obj) prm.RawData = data _, err = c.fsTree.Put(context.Background(), prm) require.NoError(t, err) p := prm.Address.Object().EncodeToString() + "." + prm.Address.Container().EncodeToString() p = filepath.Join(c.fsTree.RootPath, p[:1], p[1:]) _, err = os.Stat(p) // sanity check require.NoError(t, err) require.NoError(t, os.Truncate(p, 0)) // corrupt the file contents, so that it can't be unmarshalled }, }, { Desc: "fs, invalid object", InjectFn: func(t *testing.T, wc Cache) { c := wc.(*cache) var prm common.PutPrm prm.Address = oidtest.Address() prm.RawData = []byte{1, 2, 3} _, err := c.fsTree.Put(context.Background(), prm) require.NoError(t, err) }, }, } runFlushTest(t, createCacheFn, errCountOpt, failures...) } const ( objCount = 4 smallSize = 256 ) type CreateCacheFunc[Option any] func( t *testing.T, smallSize uint64, meta *meta.DB, bs MainStorage, opts ...Option, ) Cache type TestFailureInjector[Option any] struct { Desc string InjectFn func(*testing.T, Cache) } type objectPair struct { addr oid.Address obj *objectSDK.Object } func runFlushTest[Option any]( t *testing.T, createCacheFn CreateCacheFunc[Option], errCountOption func() (Option, *atomic.Uint32), failures ...TestFailureInjector[Option], ) { t.Run("no errors", func(t *testing.T) { wc, bs, mb := newCache(t, createCacheFn, smallSize) defer func() { require.NoError(t, wc.Close()) }() objects := putObjects(t, wc) require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(mode.ReadWrite)) require.NoError(t, wc.Flush(context.Background(), false, false)) check(t, mb, bs, objects) }) t.Run("flush on moving to degraded mode", func(t *testing.T) { wc, bs, mb := newCache(t, createCacheFn, smallSize) defer func() { require.NoError(t, wc.Close()) }() objects := putObjects(t, wc) // Blobstor is read-only, so we expect en error from `flush` here. require.Error(t, wc.SetMode(mode.Degraded)) require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(mode.ReadWrite)) require.NoError(t, wc.SetMode(mode.Degraded)) check(t, mb, bs, objects) }) t.Run("ignore errors", func(t *testing.T) { for _, f := range failures { f := f t.Run(f.Desc, func(t *testing.T) { errCountOpt, errCount := errCountOption() wc, bs, mb := newCache(t, createCacheFn, smallSize, errCountOpt) defer func() { require.NoError(t, wc.Close()) }() objects := putObjects(t, wc) f.InjectFn(t, wc) require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(mode.ReadWrite)) require.Equal(t, uint32(0), errCount.Load()) require.Error(t, wc.Flush(context.Background(), false, false)) require.Greater(t, errCount.Load(), uint32(0)) require.NoError(t, wc.Flush(context.Background(), true, false)) check(t, mb, bs, objects) }) } }) } func newCache[Option any]( t *testing.T, createCacheFn CreateCacheFunc[Option], smallSize uint64, opts ...Option, ) (Cache, *blobstor.BlobStor, *meta.DB) { dir := t.TempDir() mb := meta.New( meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(dummyEpoch{})) require.NoError(t, mb.Open(context.Background(), mode.ReadWrite)) require.NoError(t, mb.Init()) bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{ { Storage: fstree.New( fstree.WithPath(filepath.Join(dir, "blob")), fstree.WithDepth(0), fstree.WithDirNameLen(1)), }, })) require.NoError(t, bs.Open(context.Background(), mode.ReadWrite)) require.NoError(t, bs.Init()) wc := createCacheFn(t, smallSize, mb, bs, opts...) require.NoError(t, wc.Open(context.Background(), mode.ReadWrite)) require.NoError(t, wc.Init()) // First set mode for metabase and blobstor to prevent background flushes. require.NoError(t, mb.SetMode(mode.ReadOnly)) require.NoError(t, bs.SetMode(mode.ReadOnly)) return wc, bs, mb } func putObject(t *testing.T, c Cache, size int) objectPair { obj := testutil.GenerateObjectWithSize(size) data, err := obj.Marshal() require.NoError(t, err) var prm common.PutPrm prm.Address = objectCore.AddressOf(obj) prm.Object = obj prm.RawData = data _, err = c.Put(context.Background(), prm) require.NoError(t, err) return objectPair{prm.Address, prm.Object} } func putObjects(t *testing.T, c Cache) []objectPair { objects := make([]objectPair, objCount) for i := range objects { objects[i] = putObject(t, c, 1+(i%2)*smallSize) } return objects } func check(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPair) { for i := range objects { var mPrm meta.StorageIDPrm mPrm.SetAddress(objects[i].addr) mRes, err := mb.StorageID(context.Background(), mPrm) require.NoError(t, err) var prm common.GetPrm prm.Address = objects[i].addr prm.StorageID = mRes.StorageID() res, err := bs.Get(context.Background(), prm) require.NoError(t, err) require.Equal(t, objects[i].obj, res.Object) } } type dummyEpoch struct{} func (dummyEpoch) CurrentEpoch() uint64 { return 0 }