diff --git a/pkg/local_object_storage/engine/writecache.go b/pkg/local_object_storage/engine/writecache.go index 3dd3c67a46..84359cdae2 100644 --- a/pkg/local_object_storage/engine/writecache.go +++ b/pkg/local_object_storage/engine/writecache.go @@ -6,7 +6,8 @@ import ( // FlushWriteCachePrm groups the parameters of FlushWriteCache operation. type FlushWriteCachePrm struct { - shardID *shard.ID + shardID *shard.ID + ignoreErrors bool } // SetShardID is an option to set shard ID. @@ -16,6 +17,11 @@ func (p *FlushWriteCachePrm) SetShardID(id *shard.ID) { p.shardID = id } +// SetIgnoreErrors sets errors ignore flag.. +func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) { + p.ignoreErrors = ignore +} + // FlushWriteCacheRes groups the resulting values of FlushWriteCache operation. type FlushWriteCacheRes struct{} @@ -29,5 +35,8 @@ func (e *StorageEngine) FlushWriteCache(p FlushWriteCachePrm) (FlushWriteCacheRe return FlushWriteCacheRes{}, errShardNotFound } - return FlushWriteCacheRes{}, sh.FlushWriteCache() + var prm shard.FlushWriteCachePrm + prm.SetIgnoreErrors(p.ignoreErrors) + + return FlushWriteCacheRes{}, sh.FlushWriteCache(prm) } diff --git a/pkg/local_object_storage/shard/writecache.go b/pkg/local_object_storage/shard/writecache.go index 37de9d788a..c217ab913b 100644 --- a/pkg/local_object_storage/shard/writecache.go +++ b/pkg/local_object_storage/shard/writecache.go @@ -6,13 +6,23 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" ) +// FlushWriteCachePrm represents parameters of a `FlushWriteCache` operation. +type FlushWriteCachePrm struct { + ignoreErrors bool +} + +// SetIgnoreErrors sets the flag to ignore read-errors during flush. +func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) { + p.ignoreErrors = ignore +} + // errWriteCacheDisabled is returned when an operation on write-cache is performed, // but write-cache is disabled. var errWriteCacheDisabled = errors.New("write-cache is disabled") // FlushWriteCache moves writecache in read-only mode and flushes all data from it. // After the operation writecache will remain read-only mode. -func (s *Shard) FlushWriteCache() error { +func (s *Shard) FlushWriteCache(p FlushWriteCachePrm) error { if !s.hasWriteCache() { return errWriteCacheDisabled } @@ -32,5 +42,5 @@ func (s *Shard) FlushWriteCache() error { return err } - return s.writeCache.Flush() + return s.writeCache.Flush(p.ignoreErrors) } diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index 12c1bb3c2f..ef7cf8c5b0 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -233,7 +233,7 @@ func (c *cache) flushObject(obj *object.Object) error { // Flush flushes all objects from the write-cache to the main storage. // Write-cache must be in readonly mode to ensure correctness of an operation and // to prevent interference with background flush workers. -func (c *cache) Flush() error { +func (c *cache) Flush(ignoreErrors bool) error { c.modeMtx.RLock() defer c.modeMtx.RUnlock() @@ -242,6 +242,7 @@ func (c *cache) Flush() error { } var prm common.IteratePrm + prm.IgnoreErrors = ignoreErrors prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error { _, ok := c.flushed.Peek(addr.EncodeToString()) if ok { @@ -250,12 +251,18 @@ func (c *cache) Flush() error { data, err := f() if err != nil { + if ignoreErrors { + return nil + } return err } var obj object.Object err = obj.Unmarshal(data) if err != nil { + if ignoreErrors { + return nil + } return err } @@ -279,11 +286,17 @@ func (c *cache) Flush() error { } if err := addr.DecodeString(sa); err != nil { + if ignoreErrors { + continue + } return err } var obj object.Object if err := obj.Unmarshal(data); err != nil { + if ignoreErrors { + continue + } return err } diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go index 7ada3cca57..2044b429e9 100644 --- a/pkg/local_object_storage/writecache/flush_test.go +++ b/pkg/local_object_storage/writecache/flush_test.go @@ -1,6 +1,7 @@ package writecache import ( + "os" "path/filepath" "testing" @@ -18,6 +19,7 @@ import ( usertest "github.com/nspcc-dev/neofs-sdk-go/user/test" versionSDK "github.com/nspcc-dev/neofs-sdk-go/version" "github.com/stretchr/testify/require" + "go.etcd.io/bbolt" "go.uber.org/zap/zaptest" ) @@ -27,105 +29,191 @@ func TestFlush(t *testing.T) { smallSize = 256 ) - dir := t.TempDir() - mb := meta.New( - meta.WithPath(filepath.Join(dir, "meta")), - meta.WithEpochState(dummyEpoch{})) - require.NoError(t, mb.Open(false)) - require.NoError(t, mb.Init()) - - fsTree := fstree.New(fstree.WithPath(filepath.Join(dir, "blob"))) - bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{ - {Storage: fsTree}, - })) - require.NoError(t, bs.Open(false)) - require.NoError(t, bs.Init()) - - wc := New( - WithLogger(zaptest.NewLogger(t)), - WithPath(filepath.Join(dir, "writecache")), - WithSmallObjectSize(smallSize), - WithMetabase(mb), - WithBlobstor(bs)) - require.NoError(t, wc.Open(false)) - require.NoError(t, wc.Init()) - - // First set mode for metabase and blobstor to prevent background flushes. - require.NoError(t, mb.SetMode(mode.ReadOnly)) - require.NoError(t, bs.SetMode(mode.ReadOnly)) - type objectPair struct { addr oid.Address obj *object.Object } - objects := make([]objectPair, objCount) - for i := range objects { - obj := object.New() - ver := versionSDK.Current() + newCache := func(t *testing.T) (Cache, *blobstor.BlobStor, *meta.DB) { + dir := t.TempDir() + mb := meta.New( + meta.WithPath(filepath.Join(dir, "meta")), + meta.WithEpochState(dummyEpoch{})) + require.NoError(t, mb.Open(false)) + require.NoError(t, mb.Init()) - obj.SetID(oidtest.ID()) - obj.SetOwnerID(usertest.ID()) - obj.SetContainerID(cidtest.ID()) - obj.SetType(object.TypeRegular) - obj.SetVersion(&ver) - obj.SetPayloadChecksum(checksumtest.Checksum()) - obj.SetPayloadHomomorphicHash(checksumtest.Checksum()) - obj.SetPayload(make([]byte, 1+(i%2)*smallSize)) + fsTree := fstree.New( + fstree.WithPath(filepath.Join(dir, "blob")), + fstree.WithDepth(0), + fstree.WithDirNameLen(1)) + bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{ + {Storage: fsTree}, + })) + require.NoError(t, bs.Open(false)) + require.NoError(t, bs.Init()) - addr := objectCore.AddressOf(obj) - data, err := obj.Marshal() - require.NoError(t, err) + wc := New( + WithLogger(zaptest.NewLogger(t)), + WithPath(filepath.Join(dir, "writecache")), + WithSmallObjectSize(smallSize), + WithMetabase(mb), + WithBlobstor(bs)) + require.NoError(t, wc.Open(false)) + require.NoError(t, wc.Init()) - var prm common.PutPrm - prm.Address = objectCore.AddressOf(obj) - prm.Object = obj - prm.RawData = data + // First set mode for metabase and blobstor to prevent background flushes. + require.NoError(t, mb.SetMode(mode.ReadOnly)) + require.NoError(t, bs.SetMode(mode.ReadOnly)) - _, err = wc.Put(prm) - require.NoError(t, err) - - objects[i] = objectPair{addr: addr, obj: obj} + return wc, bs, mb } - t.Run("must be read-only", func(t *testing.T) { - require.ErrorIs(t, wc.Flush(), errMustBeReadOnly) + putObjects := func(t *testing.T, c Cache) []objectPair { + objects := make([]objectPair, objCount) + for i := range objects { + obj, data := newObject(t, 1+(i%2)*smallSize) + addr := objectCore.AddressOf(obj) + + var prm common.PutPrm + prm.Address = objectCore.AddressOf(obj) + prm.Object = obj + prm.RawData = data + + _, err := c.Put(prm) + require.NoError(t, err) + + objects[i] = objectPair{addr: addr, obj: obj} + } + return objects + } + + check := func(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPair) { + for i := range objects { + var mPrm meta.StorageIDPrm + mPrm.SetAddress(objects[i].addr) + + mRes, err := mb.StorageID(mPrm) + require.NoError(t, err) + + var prm common.GetPrm + prm.Address = objects[i].addr + prm.StorageID = mRes.StorageID() + + res, err := bs.Get(prm) + require.NoError(t, err) + require.Equal(t, objects[i].obj, res.Object) + } + } + + t.Run("no errors", func(t *testing.T) { + wc, bs, mb := newCache(t) + objects := putObjects(t, wc) + + t.Run("must be read-only", func(t *testing.T) { + require.ErrorIs(t, wc.Flush(false), errMustBeReadOnly) + }) + + require.NoError(t, wc.SetMode(mode.ReadOnly)) + require.NoError(t, bs.SetMode(mode.ReadWrite)) + require.NoError(t, mb.SetMode(mode.ReadWrite)) + + wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true) + wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false) + + require.NoError(t, wc.Flush(false)) + + for i := 0; i < 2; i++ { + var mPrm meta.GetPrm + mPrm.SetAddress(objects[i].addr) + _, err := mb.Get(mPrm) + require.Error(t, err) + + _, err = bs.Get(common.GetPrm{Address: objects[i].addr}) + require.Error(t, err) + } + + check(t, mb, bs, objects[2:]) }) - require.NoError(t, wc.SetMode(mode.ReadOnly)) - require.NoError(t, bs.SetMode(mode.ReadWrite)) - require.NoError(t, mb.SetMode(mode.ReadWrite)) + t.Run("ignore errors", func(t *testing.T) { + testIgnoreErrors := func(t *testing.T, f func(*cache)) { + wc, bs, mb := newCache(t) + objects := putObjects(t, wc) + f(wc.(*cache)) - wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true) - wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false) + require.NoError(t, wc.SetMode(mode.ReadOnly)) + require.NoError(t, bs.SetMode(mode.ReadWrite)) + require.NoError(t, mb.SetMode(mode.ReadWrite)) - require.NoError(t, wc.Flush()) + require.Error(t, wc.Flush(false)) + require.NoError(t, wc.Flush(true)) - for i := 0; i < 2; i++ { - var mPrm meta.GetPrm - mPrm.SetAddress(objects[i].addr) - _, err := mb.Get(mPrm) - require.Error(t, err) + check(t, mb, bs, objects) + } + t.Run("db, invalid address", func(t *testing.T) { + testIgnoreErrors(t, func(c *cache) { + _, data := newObject(t, 1) + require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + return b.Put([]byte{1, 2, 3}, data) + })) + }) + }) + t.Run("db, invalid object", func(t *testing.T) { + testIgnoreErrors(t, func(c *cache) { + require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + return b.Put([]byte(oidtest.Address().EncodeToString()), []byte{1, 2, 3}) + })) + }) + }) + t.Run("fs, read error", func(t *testing.T) { + testIgnoreErrors(t, func(c *cache) { + obj, data := newObject(t, 1) - _, err = bs.Get(common.GetPrm{Address: objects[i].addr}) - require.Error(t, err) - } + var prm common.PutPrm + prm.Address = objectCore.AddressOf(obj) + prm.RawData = data - for i := 2; i < objCount; i++ { - var mPrm meta.StorageIDPrm - mPrm.SetAddress(objects[i].addr) + _, err := c.fsTree.Put(prm) + require.NoError(t, err) - mRes, err := mb.StorageID(mPrm) - require.NoError(t, err) + p := prm.Address.Object().EncodeToString() + "." + prm.Address.Container().EncodeToString() + p = filepath.Join(c.fsTree.RootPath, p[:1], p[1:]) - var prm common.GetPrm - prm.Address = objects[i].addr - prm.StorageID = mRes.StorageID() + _, err = os.Stat(p) // sanity check + require.NoError(t, err) + require.NoError(t, os.Chmod(p, 0)) + }) + }) + t.Run("fs, invalid object", func(t *testing.T) { + testIgnoreErrors(t, func(c *cache) { + var prm common.PutPrm + prm.Address = oidtest.Address() + prm.RawData = []byte{1, 2, 3} + _, err := c.fsTree.Put(prm) + require.NoError(t, err) + }) + }) + }) +} - res, err := bs.Get(prm) - require.NoError(t, err) - require.Equal(t, objects[i].obj, res.Object) - } +func newObject(t *testing.T, size int) (*object.Object, []byte) { + obj := object.New() + ver := versionSDK.Current() + + obj.SetID(oidtest.ID()) + obj.SetOwnerID(usertest.ID()) + obj.SetContainerID(cidtest.ID()) + obj.SetType(object.TypeRegular) + obj.SetVersion(&ver) + obj.SetPayloadChecksum(checksumtest.Checksum()) + obj.SetPayloadHomomorphicHash(checksumtest.Checksum()) + obj.SetPayload(make([]byte, size)) + + data, err := obj.Marshal() + require.NoError(t, err) + return obj, data } type dummyEpoch struct{} diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index e91666a3ea..8d2f0d7547 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -28,7 +28,7 @@ type Cache interface { SetMode(mode.Mode) error SetLogger(*zap.Logger) DumpInfo() Info - Flush() error + Flush(bool) error Init() error Open(readOnly bool) error