From 0a411908eeb3df3c9462caaedd45e651d82008ad Mon Sep 17 00:00:00 2001
From: Evgenii Stratonikov <evgeniy@morphbits.ru>
Date: Mon, 26 Sep 2022 11:54:21 +0300
Subject: [PATCH] [#1806] writecache: Allow to ignore read errors during flush

Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
---
 pkg/local_object_storage/engine/writecache.go |  13 +-
 pkg/local_object_storage/shard/writecache.go  |  14 +-
 pkg/local_object_storage/writecache/flush.go  |  15 +-
 .../writecache/flush_test.go                  | 244 ++++++++++++------
 .../writecache/writecache.go                  |   2 +-
 5 files changed, 204 insertions(+), 84 deletions(-)

diff --git a/pkg/local_object_storage/engine/writecache.go b/pkg/local_object_storage/engine/writecache.go
index 3dd3c67a4..84359cdae 100644
--- a/pkg/local_object_storage/engine/writecache.go
+++ b/pkg/local_object_storage/engine/writecache.go
@@ -6,7 +6,8 @@ import (
 
 // FlushWriteCachePrm groups the parameters of FlushWriteCache operation.
 type FlushWriteCachePrm struct {
-	shardID *shard.ID
+	shardID      *shard.ID
+	ignoreErrors bool
 }
 
 // SetShardID is an option to set shard ID.
@@ -16,6 +17,11 @@ func (p *FlushWriteCachePrm) SetShardID(id *shard.ID) {
 	p.shardID = id
 }
 
+// SetIgnoreErrors sets errors ignore flag..
+func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) {
+	p.ignoreErrors = ignore
+}
+
 // FlushWriteCacheRes groups the resulting values of FlushWriteCache operation.
 type FlushWriteCacheRes struct{}
 
@@ -29,5 +35,8 @@ func (e *StorageEngine) FlushWriteCache(p FlushWriteCachePrm) (FlushWriteCacheRe
 		return FlushWriteCacheRes{}, errShardNotFound
 	}
 
-	return FlushWriteCacheRes{}, sh.FlushWriteCache()
+	var prm shard.FlushWriteCachePrm
+	prm.SetIgnoreErrors(p.ignoreErrors)
+
+	return FlushWriteCacheRes{}, sh.FlushWriteCache(prm)
 }
diff --git a/pkg/local_object_storage/shard/writecache.go b/pkg/local_object_storage/shard/writecache.go
index 37de9d788..c217ab913 100644
--- a/pkg/local_object_storage/shard/writecache.go
+++ b/pkg/local_object_storage/shard/writecache.go
@@ -6,13 +6,23 @@ import (
 	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
 )
 
+// FlushWriteCachePrm represents parameters of a `FlushWriteCache` operation.
+type FlushWriteCachePrm struct {
+	ignoreErrors bool
+}
+
+// SetIgnoreErrors sets the flag to ignore read-errors during flush.
+func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) {
+	p.ignoreErrors = ignore
+}
+
 // errWriteCacheDisabled is returned when an operation on write-cache is performed,
 // but write-cache is disabled.
 var errWriteCacheDisabled = errors.New("write-cache is disabled")
 
 // FlushWriteCache moves writecache in read-only mode and flushes all data from it.
 // After the operation writecache will remain read-only mode.
-func (s *Shard) FlushWriteCache() error {
+func (s *Shard) FlushWriteCache(p FlushWriteCachePrm) error {
 	if !s.hasWriteCache() {
 		return errWriteCacheDisabled
 	}
@@ -32,5 +42,5 @@ func (s *Shard) FlushWriteCache() error {
 		return err
 	}
 
-	return s.writeCache.Flush()
+	return s.writeCache.Flush(p.ignoreErrors)
 }
diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go
index 12c1bb3c2..ef7cf8c5b 100644
--- a/pkg/local_object_storage/writecache/flush.go
+++ b/pkg/local_object_storage/writecache/flush.go
@@ -233,7 +233,7 @@ func (c *cache) flushObject(obj *object.Object) error {
 // Flush flushes all objects from the write-cache to the main storage.
 // Write-cache must be in readonly mode to ensure correctness of an operation and
 // to prevent interference with background flush workers.
-func (c *cache) Flush() error {
+func (c *cache) Flush(ignoreErrors bool) error {
 	c.modeMtx.RLock()
 	defer c.modeMtx.RUnlock()
 
@@ -242,6 +242,7 @@ func (c *cache) Flush() error {
 	}
 
 	var prm common.IteratePrm
+	prm.IgnoreErrors = ignoreErrors
 	prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
 		_, ok := c.flushed.Peek(addr.EncodeToString())
 		if ok {
@@ -250,12 +251,18 @@ func (c *cache) Flush() error {
 
 		data, err := f()
 		if err != nil {
+			if ignoreErrors {
+				return nil
+			}
 			return err
 		}
 
 		var obj object.Object
 		err = obj.Unmarshal(data)
 		if err != nil {
+			if ignoreErrors {
+				return nil
+			}
 			return err
 		}
 
@@ -279,11 +286,17 @@ func (c *cache) Flush() error {
 			}
 
 			if err := addr.DecodeString(sa); err != nil {
+				if ignoreErrors {
+					continue
+				}
 				return err
 			}
 
 			var obj object.Object
 			if err := obj.Unmarshal(data); err != nil {
+				if ignoreErrors {
+					continue
+				}
 				return err
 			}
 
diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go
index 7ada3cca5..2044b429e 100644
--- a/pkg/local_object_storage/writecache/flush_test.go
+++ b/pkg/local_object_storage/writecache/flush_test.go
@@ -1,6 +1,7 @@
 package writecache
 
 import (
+	"os"
 	"path/filepath"
 	"testing"
 
@@ -18,6 +19,7 @@ import (
 	usertest "github.com/nspcc-dev/neofs-sdk-go/user/test"
 	versionSDK "github.com/nspcc-dev/neofs-sdk-go/version"
 	"github.com/stretchr/testify/require"
+	"go.etcd.io/bbolt"
 	"go.uber.org/zap/zaptest"
 )
 
@@ -27,105 +29,191 @@ func TestFlush(t *testing.T) {
 		smallSize = 256
 	)
 
-	dir := t.TempDir()
-	mb := meta.New(
-		meta.WithPath(filepath.Join(dir, "meta")),
-		meta.WithEpochState(dummyEpoch{}))
-	require.NoError(t, mb.Open(false))
-	require.NoError(t, mb.Init())
-
-	fsTree := fstree.New(fstree.WithPath(filepath.Join(dir, "blob")))
-	bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{
-		{Storage: fsTree},
-	}))
-	require.NoError(t, bs.Open(false))
-	require.NoError(t, bs.Init())
-
-	wc := New(
-		WithLogger(zaptest.NewLogger(t)),
-		WithPath(filepath.Join(dir, "writecache")),
-		WithSmallObjectSize(smallSize),
-		WithMetabase(mb),
-		WithBlobstor(bs))
-	require.NoError(t, wc.Open(false))
-	require.NoError(t, wc.Init())
-
-	// First set mode for metabase and blobstor to prevent background flushes.
-	require.NoError(t, mb.SetMode(mode.ReadOnly))
-	require.NoError(t, bs.SetMode(mode.ReadOnly))
-
 	type objectPair struct {
 		addr oid.Address
 		obj  *object.Object
 	}
 
-	objects := make([]objectPair, objCount)
-	for i := range objects {
-		obj := object.New()
-		ver := versionSDK.Current()
+	newCache := func(t *testing.T) (Cache, *blobstor.BlobStor, *meta.DB) {
+		dir := t.TempDir()
+		mb := meta.New(
+			meta.WithPath(filepath.Join(dir, "meta")),
+			meta.WithEpochState(dummyEpoch{}))
+		require.NoError(t, mb.Open(false))
+		require.NoError(t, mb.Init())
 
-		obj.SetID(oidtest.ID())
-		obj.SetOwnerID(usertest.ID())
-		obj.SetContainerID(cidtest.ID())
-		obj.SetType(object.TypeRegular)
-		obj.SetVersion(&ver)
-		obj.SetPayloadChecksum(checksumtest.Checksum())
-		obj.SetPayloadHomomorphicHash(checksumtest.Checksum())
-		obj.SetPayload(make([]byte, 1+(i%2)*smallSize))
+		fsTree := fstree.New(
+			fstree.WithPath(filepath.Join(dir, "blob")),
+			fstree.WithDepth(0),
+			fstree.WithDirNameLen(1))
+		bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{
+			{Storage: fsTree},
+		}))
+		require.NoError(t, bs.Open(false))
+		require.NoError(t, bs.Init())
 
-		addr := objectCore.AddressOf(obj)
-		data, err := obj.Marshal()
-		require.NoError(t, err)
+		wc := New(
+			WithLogger(zaptest.NewLogger(t)),
+			WithPath(filepath.Join(dir, "writecache")),
+			WithSmallObjectSize(smallSize),
+			WithMetabase(mb),
+			WithBlobstor(bs))
+		require.NoError(t, wc.Open(false))
+		require.NoError(t, wc.Init())
 
-		var prm common.PutPrm
-		prm.Address = objectCore.AddressOf(obj)
-		prm.Object = obj
-		prm.RawData = data
+		// First set mode for metabase and blobstor to prevent background flushes.
+		require.NoError(t, mb.SetMode(mode.ReadOnly))
+		require.NoError(t, bs.SetMode(mode.ReadOnly))
 
-		_, err = wc.Put(prm)
-		require.NoError(t, err)
-
-		objects[i] = objectPair{addr: addr, obj: obj}
+		return wc, bs, mb
 	}
 
-	t.Run("must be read-only", func(t *testing.T) {
-		require.ErrorIs(t, wc.Flush(), errMustBeReadOnly)
+	putObjects := func(t *testing.T, c Cache) []objectPair {
+		objects := make([]objectPair, objCount)
+		for i := range objects {
+			obj, data := newObject(t, 1+(i%2)*smallSize)
+			addr := objectCore.AddressOf(obj)
+
+			var prm common.PutPrm
+			prm.Address = objectCore.AddressOf(obj)
+			prm.Object = obj
+			prm.RawData = data
+
+			_, err := c.Put(prm)
+			require.NoError(t, err)
+
+			objects[i] = objectPair{addr: addr, obj: obj}
+		}
+		return objects
+	}
+
+	check := func(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPair) {
+		for i := range objects {
+			var mPrm meta.StorageIDPrm
+			mPrm.SetAddress(objects[i].addr)
+
+			mRes, err := mb.StorageID(mPrm)
+			require.NoError(t, err)
+
+			var prm common.GetPrm
+			prm.Address = objects[i].addr
+			prm.StorageID = mRes.StorageID()
+
+			res, err := bs.Get(prm)
+			require.NoError(t, err)
+			require.Equal(t, objects[i].obj, res.Object)
+		}
+	}
+
+	t.Run("no errors", func(t *testing.T) {
+		wc, bs, mb := newCache(t)
+		objects := putObjects(t, wc)
+
+		t.Run("must be read-only", func(t *testing.T) {
+			require.ErrorIs(t, wc.Flush(false), errMustBeReadOnly)
+		})
+
+		require.NoError(t, wc.SetMode(mode.ReadOnly))
+		require.NoError(t, bs.SetMode(mode.ReadWrite))
+		require.NoError(t, mb.SetMode(mode.ReadWrite))
+
+		wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
+		wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
+
+		require.NoError(t, wc.Flush(false))
+
+		for i := 0; i < 2; i++ {
+			var mPrm meta.GetPrm
+			mPrm.SetAddress(objects[i].addr)
+			_, err := mb.Get(mPrm)
+			require.Error(t, err)
+
+			_, err = bs.Get(common.GetPrm{Address: objects[i].addr})
+			require.Error(t, err)
+		}
+
+		check(t, mb, bs, objects[2:])
 	})
 
-	require.NoError(t, wc.SetMode(mode.ReadOnly))
-	require.NoError(t, bs.SetMode(mode.ReadWrite))
-	require.NoError(t, mb.SetMode(mode.ReadWrite))
+	t.Run("ignore errors", func(t *testing.T) {
+		testIgnoreErrors := func(t *testing.T, f func(*cache)) {
+			wc, bs, mb := newCache(t)
+			objects := putObjects(t, wc)
+			f(wc.(*cache))
 
-	wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
-	wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
+			require.NoError(t, wc.SetMode(mode.ReadOnly))
+			require.NoError(t, bs.SetMode(mode.ReadWrite))
+			require.NoError(t, mb.SetMode(mode.ReadWrite))
 
-	require.NoError(t, wc.Flush())
+			require.Error(t, wc.Flush(false))
+			require.NoError(t, wc.Flush(true))
 
-	for i := 0; i < 2; i++ {
-		var mPrm meta.GetPrm
-		mPrm.SetAddress(objects[i].addr)
-		_, err := mb.Get(mPrm)
-		require.Error(t, err)
+			check(t, mb, bs, objects)
+		}
+		t.Run("db, invalid address", func(t *testing.T) {
+			testIgnoreErrors(t, func(c *cache) {
+				_, data := newObject(t, 1)
+				require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
+					b := tx.Bucket(defaultBucket)
+					return b.Put([]byte{1, 2, 3}, data)
+				}))
+			})
+		})
+		t.Run("db, invalid object", func(t *testing.T) {
+			testIgnoreErrors(t, func(c *cache) {
+				require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
+					b := tx.Bucket(defaultBucket)
+					return b.Put([]byte(oidtest.Address().EncodeToString()), []byte{1, 2, 3})
+				}))
+			})
+		})
+		t.Run("fs, read error", func(t *testing.T) {
+			testIgnoreErrors(t, func(c *cache) {
+				obj, data := newObject(t, 1)
 
-		_, err = bs.Get(common.GetPrm{Address: objects[i].addr})
-		require.Error(t, err)
-	}
+				var prm common.PutPrm
+				prm.Address = objectCore.AddressOf(obj)
+				prm.RawData = data
 
-	for i := 2; i < objCount; i++ {
-		var mPrm meta.StorageIDPrm
-		mPrm.SetAddress(objects[i].addr)
+				_, err := c.fsTree.Put(prm)
+				require.NoError(t, err)
 
-		mRes, err := mb.StorageID(mPrm)
-		require.NoError(t, err)
+				p := prm.Address.Object().EncodeToString() + "." + prm.Address.Container().EncodeToString()
+				p = filepath.Join(c.fsTree.RootPath, p[:1], p[1:])
 
-		var prm common.GetPrm
-		prm.Address = objects[i].addr
-		prm.StorageID = mRes.StorageID()
+				_, err = os.Stat(p) // sanity check
+				require.NoError(t, err)
+				require.NoError(t, os.Chmod(p, 0))
+			})
+		})
+		t.Run("fs, invalid object", func(t *testing.T) {
+			testIgnoreErrors(t, func(c *cache) {
+				var prm common.PutPrm
+				prm.Address = oidtest.Address()
+				prm.RawData = []byte{1, 2, 3}
+				_, err := c.fsTree.Put(prm)
+				require.NoError(t, err)
+			})
+		})
+	})
+}
 
-		res, err := bs.Get(prm)
-		require.NoError(t, err)
-		require.Equal(t, objects[i].obj, res.Object)
-	}
+func newObject(t *testing.T, size int) (*object.Object, []byte) {
+	obj := object.New()
+	ver := versionSDK.Current()
+
+	obj.SetID(oidtest.ID())
+	obj.SetOwnerID(usertest.ID())
+	obj.SetContainerID(cidtest.ID())
+	obj.SetType(object.TypeRegular)
+	obj.SetVersion(&ver)
+	obj.SetPayloadChecksum(checksumtest.Checksum())
+	obj.SetPayloadHomomorphicHash(checksumtest.Checksum())
+	obj.SetPayload(make([]byte, size))
+
+	data, err := obj.Marshal()
+	require.NoError(t, err)
+	return obj, data
 }
 
 type dummyEpoch struct{}
diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go
index e91666a3e..8d2f0d754 100644
--- a/pkg/local_object_storage/writecache/writecache.go
+++ b/pkg/local_object_storage/writecache/writecache.go
@@ -28,7 +28,7 @@ type Cache interface {
 	SetMode(mode.Mode) error
 	SetLogger(*zap.Logger)
 	DumpInfo() Info
-	Flush() error
+	Flush(bool) error
 
 	Init() error
 	Open(readOnly bool) error