From 9c60ab893cecb49b5e99eedadc50a877af5d4984 Mon Sep 17 00:00:00 2001
From: Evgenii Stratonikov <evgeniy@nspcc.ru>
Date: Thu, 20 Jan 2022 13:45:29 +0300
Subject: [PATCH] [#1085] shard: allow to ignore errors during restore

We could also ignore errors during evacuate, but this requires
unmarshaling objects first which slowers the process considerably.

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
---
 .../shard/evacuate_test.go                    | 12 +++++++-
 pkg/local_object_storage/shard/restore.go     | 28 +++++++++++++++----
 2 files changed, 34 insertions(+), 6 deletions(-)

diff --git a/pkg/local_object_storage/shard/evacuate_test.go b/pkg/local_object_storage/shard/evacuate_test.go
index 3f5f74a21..1c7658ffe 100644
--- a/pkg/local_object_storage/shard/evacuate_test.go
+++ b/pkg/local_object_storage/shard/evacuate_test.go
@@ -143,11 +143,21 @@ func testEvacuate(t *testing.T, objCount int, hasWriteCache bool) {
 			})
 			t.Run("invalid object", func(t *testing.T) {
 				out := out + ".wrongobj"
-				fileData := append(fileData, 1, 0, 0, 0, 0xFF)
+				fileData := append(fileData, 1, 0, 0, 0, 0xFF, 4, 0, 0, 0, 1, 2, 3, 4)
 				require.NoError(t, ioutil.WriteFile(out, fileData, os.ModePerm))
 
 				_, err := sh.Restore(new(shard.RestorePrm).WithPath(out))
 				require.Error(t, err)
+
+				t.Run("skip errors", func(t *testing.T) {
+					sh := newCustomShard(t, filepath.Join(t.TempDir(), "ignore"), false)
+					defer releaseShard(sh, t)
+
+					res, err := sh.Restore(new(shard.RestorePrm).WithPath(out).WithIgnoreErrors(true))
+					require.NoError(t, err)
+					require.Equal(t, objCount, res.Count())
+					require.Equal(t, 2, res.FailCount())
+				})
 			})
 		})
 
diff --git a/pkg/local_object_storage/shard/restore.go b/pkg/local_object_storage/shard/restore.go
index e3eac3a10..3cd2778c3 100644
--- a/pkg/local_object_storage/shard/restore.go
+++ b/pkg/local_object_storage/shard/restore.go
@@ -15,8 +15,9 @@ var ErrInvalidMagic = errors.New("invalid magic")
 
 // RestorePrm groups the parameters of Restore operation.
 type RestorePrm struct {
-	path   string
-	stream io.Reader
+	path         string
+	stream       io.Reader
+	ignoreErrors bool
 }
 
 // WithPath is a Restore option to set the destination path.
@@ -32,9 +33,17 @@ func (p *RestorePrm) WithStream(r io.Reader) *RestorePrm {
 	return p
 }
 
+// WithIgnoreErrors is a Restore option which allows to ignore errors encountered during restore.
+// Corrupted objects will not be processed.
+func (p *RestorePrm) WithIgnoreErrors(ignore bool) *RestorePrm {
+	p.ignoreErrors = ignore
+	return p
+}
+
 // RestoreRes groups the result fields of Restore operation.
 type RestoreRes struct {
-	count int
+	count  int
+	failed int
 }
 
 // Count return amount of object written.
@@ -42,6 +51,11 @@ func (r *RestoreRes) Count() int {
 	return r.count
 }
 
+// FailCount return amount of object skipped.
+func (r *RestoreRes) FailCount() int {
+	return r.failed
+}
+
 // Restore restores objects from the dump prepared by Evacuate.
 //
 // Returns any error encountered.
@@ -71,7 +85,7 @@ func (s *Shard) Restore(prm *RestorePrm) (*RestoreRes, error) {
 		return nil, ErrInvalidMagic
 	}
 
-	var count int
+	var count, failCount int
 	var data []byte
 	var size [4]byte
 	for {
@@ -100,6 +114,10 @@ func (s *Shard) Restore(prm *RestorePrm) (*RestoreRes, error) {
 		obj := object.New()
 		err = obj.Unmarshal(data)
 		if err != nil {
+			if prm.ignoreErrors {
+				failCount++
+				continue
+			}
 			return nil, err
 		}
 
@@ -111,5 +129,5 @@ func (s *Shard) Restore(prm *RestorePrm) (*RestoreRes, error) {
 		count++
 	}
 
-	return &RestoreRes{count: count}, nil
+	return &RestoreRes{count: count, failed: failCount}, nil
 }