[#1085] shard: allow to ignore errors during restore

We could also ignore errors during evacuate, but this requires
unmarshaling objects first which slowers the process considerably.

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-01-20 13:45:29 +03:00 committed by LeL
parent 0ef3d5ab03
commit 9c60ab893c
2 changed files with 34 additions and 6 deletions

View file

@ -143,11 +143,21 @@ func testEvacuate(t *testing.T, objCount int, hasWriteCache bool) {
}) })
t.Run("invalid object", func(t *testing.T) { t.Run("invalid object", func(t *testing.T) {
out := out + ".wrongobj" out := out + ".wrongobj"
fileData := append(fileData, 1, 0, 0, 0, 0xFF) fileData := append(fileData, 1, 0, 0, 0, 0xFF, 4, 0, 0, 0, 1, 2, 3, 4)
require.NoError(t, ioutil.WriteFile(out, fileData, os.ModePerm)) require.NoError(t, ioutil.WriteFile(out, fileData, os.ModePerm))
_, err := sh.Restore(new(shard.RestorePrm).WithPath(out)) _, err := sh.Restore(new(shard.RestorePrm).WithPath(out))
require.Error(t, err) require.Error(t, err)
t.Run("skip errors", func(t *testing.T) {
sh := newCustomShard(t, filepath.Join(t.TempDir(), "ignore"), false)
defer releaseShard(sh, t)
res, err := sh.Restore(new(shard.RestorePrm).WithPath(out).WithIgnoreErrors(true))
require.NoError(t, err)
require.Equal(t, objCount, res.Count())
require.Equal(t, 2, res.FailCount())
})
}) })
}) })

View file

@ -15,8 +15,9 @@ var ErrInvalidMagic = errors.New("invalid magic")
// RestorePrm groups the parameters of Restore operation. // RestorePrm groups the parameters of Restore operation.
type RestorePrm struct { type RestorePrm struct {
path string path string
stream io.Reader stream io.Reader
ignoreErrors bool
} }
// WithPath is a Restore option to set the destination path. // WithPath is a Restore option to set the destination path.
@ -32,9 +33,17 @@ func (p *RestorePrm) WithStream(r io.Reader) *RestorePrm {
return p return p
} }
// WithIgnoreErrors is a Restore option which allows to ignore errors encountered during restore.
// Corrupted objects will not be processed.
func (p *RestorePrm) WithIgnoreErrors(ignore bool) *RestorePrm {
p.ignoreErrors = ignore
return p
}
// RestoreRes groups the result fields of Restore operation. // RestoreRes groups the result fields of Restore operation.
type RestoreRes struct { type RestoreRes struct {
count int count int
failed int
} }
// Count return amount of object written. // Count return amount of object written.
@ -42,6 +51,11 @@ func (r *RestoreRes) Count() int {
return r.count return r.count
} }
// FailCount return amount of object skipped.
func (r *RestoreRes) FailCount() int {
return r.failed
}
// Restore restores objects from the dump prepared by Evacuate. // Restore restores objects from the dump prepared by Evacuate.
// //
// Returns any error encountered. // Returns any error encountered.
@ -71,7 +85,7 @@ func (s *Shard) Restore(prm *RestorePrm) (*RestoreRes, error) {
return nil, ErrInvalidMagic return nil, ErrInvalidMagic
} }
var count int var count, failCount int
var data []byte var data []byte
var size [4]byte var size [4]byte
for { for {
@ -100,6 +114,10 @@ func (s *Shard) Restore(prm *RestorePrm) (*RestoreRes, error) {
obj := object.New() obj := object.New()
err = obj.Unmarshal(data) err = obj.Unmarshal(data)
if err != nil { if err != nil {
if prm.ignoreErrors {
failCount++
continue
}
return nil, err return nil, err
} }
@ -111,5 +129,5 @@ func (s *Shard) Restore(prm *RestorePrm) (*RestoreRes, error) {
count++ count++
} }
return &RestoreRes{count: count}, nil return &RestoreRes{count: count, failed: failCount}, nil
} }