diff --git a/pkg/local_object_storage/shard/evacuate.go b/pkg/local_object_storage/shard/evacuate.go index 0920eeec3b..75fca8aa63 100644 --- a/pkg/local_object_storage/shard/evacuate.go +++ b/pkg/local_object_storage/shard/evacuate.go @@ -14,8 +14,9 @@ var dumpMagic = []byte("NEOF") // EvacuatePrm groups the parameters of Evacuate operation. type EvacuatePrm struct { - path string - stream io.Writer + path string + stream io.Writer + ignoreErrors bool } // WithPath is an Evacuate option to set the destination path. @@ -31,6 +32,13 @@ func (p *EvacuatePrm) WithStream(r io.Writer) *EvacuatePrm { return p } +// WithIgnoreErrors is an Evacuate option to allow ignore all errors during iteration. +// This includes invalid blobovniczas as well as corrupted objects. +func (p *EvacuatePrm) WithIgnoreErrors(ignore bool) *EvacuatePrm { + p.ignoreErrors = ignore + return p +} + // EvacuateRes groups the result fields of Evacuate operation. type EvacuateRes struct { count int @@ -86,7 +94,7 @@ func (s *Shard) Evacuate(prm *EvacuatePrm) (*EvacuateRes, error) { count++ return nil - })) + }).WithIgnoreErrors(prm.ignoreErrors)) if err != nil { return nil, err } @@ -94,6 +102,9 @@ func (s *Shard) Evacuate(prm *EvacuatePrm) (*EvacuateRes, error) { var pi blobstor.IteratePrm + if prm.ignoreErrors { + pi.IgnoreErrors() + } pi.SetIterationHandler(func(elem blobstor.IterationElement) error { data := elem.ObjectData() diff --git a/pkg/local_object_storage/shard/evacuate_test.go b/pkg/local_object_storage/shard/evacuate_test.go index 1c7658ffe3..c15c4f0276 100644 --- a/pkg/local_object_storage/shard/evacuate_test.go +++ b/pkg/local_object_storage/shard/evacuate_test.go @@ -1,6 +1,7 @@ package shard_test import ( + "bytes" "errors" "io" "io/ioutil" @@ -10,10 +11,15 @@ import ( "testing" "time" + "github.com/klauspost/compress/zstd" "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test" "github.com/stretchr/testify/require" ) @@ -40,8 +46,11 @@ func testEvacuate(t *testing.T, objCount int, hasWriteCache bool) { sh = newShard(t, false) } else { sh = newCustomShard(t, t.TempDir(), true, - writecache.WithSmallObjectSize(wcSmallObjectSize), - writecache.WithMaxObjectSize(wcBigObjectSize)) + []writecache.Option{ + writecache.WithSmallObjectSize(wcSmallObjectSize), + writecache.WithMaxObjectSize(wcBigObjectSize), + }, + nil) } defer releaseShard(sh, t) @@ -150,7 +159,7 @@ func testEvacuate(t *testing.T, objCount int, hasWriteCache bool) { require.Error(t, err) t.Run("skip errors", func(t *testing.T) { - sh := newCustomShard(t, filepath.Join(t.TempDir(), "ignore"), false) + sh := newCustomShard(t, filepath.Join(t.TempDir(), "ignore"), false, nil, nil) defer releaseShard(sh, t) res, err := sh.Restore(new(shard.RestorePrm).WithPath(out).WithIgnoreErrors(true)) @@ -229,3 +238,115 @@ func checkRestore(t *testing.T, sh *shard.Shard, prm *shard.RestorePrm, objects require.Equal(t, objects[i], res.Object()) } } + +func TestEvacuateIgnoreErrors(t *testing.T) { + const ( + wcSmallObjectSize = 512 // goes to write-cache memory + wcBigObjectSize = wcSmallObjectSize << 1 // goes to write-cache FSTree + bsSmallObjectSize = wcSmallObjectSize << 2 // goes to blobovnicza DB + + objCount = 10 + headerSize = 400 + ) + + dir := t.TempDir() + bsPath := filepath.Join(dir, "blob") + bsOpts := []blobstor.Option{ + blobstor.WithSmallSizeLimit(bsSmallObjectSize), + blobstor.WithRootPath(bsPath), + blobstor.WithCompressObjects(true), + blobstor.WithShallowDepth(1), + blobstor.WithBlobovniczaShallowDepth(1), + blobstor.WithBlobovniczaShallowWidth(2), + blobstor.WithBlobovniczaOpenedCacheSize(1), + } + wcPath := filepath.Join(dir, "writecache") + wcOpts := []writecache.Option{ + writecache.WithPath(wcPath), + writecache.WithSmallObjectSize(wcSmallObjectSize), + writecache.WithMaxObjectSize(wcBigObjectSize), + } + sh := newCustomShard(t, dir, true, wcOpts, bsOpts) + + objects := make([]*object.Object, objCount) + for i := 0; i < objCount; i++ { + size := (wcSmallObjectSize << (i % 4)) - headerSize + obj := generateRawObjectWithPayload(cidtest.ID(), make([]byte, size)) + objects[i] = obj.Object() + + prm := new(shard.PutPrm).WithObject(objects[i]) + _, err := sh.Put(prm) + require.NoError(t, err) + } + + releaseShard(sh, t) + + b := bytes.NewBuffer(nil) + badObject := make([]byte, 1000) + enc, err := zstd.NewWriter(b) + require.NoError(t, err) + corruptedData := enc.EncodeAll(badObject, nil) + for i := 4; i < len(corruptedData); i++ { + corruptedData[i] ^= 0xFF + } + + // There are 3 different types of errors to consider. + // To setup envirionment we use implementation details so this test must be updated + // if any of them are changed. + { + // 1. Invalid object in fs tree. + // 1.1. Invalid compressed data. + addr := cidtest.ID().String() + "." + generateOID().String() + dirName := filepath.Join(bsPath, addr[:2]) + require.NoError(t, os.MkdirAll(dirName, os.ModePerm)) + require.NoError(t, ioutil.WriteFile(filepath.Join(dirName, addr[2:]), corruptedData, os.ModePerm)) + + // 1.2. Unreadable file. + addr = cidtest.ID().String() + "." + generateOID().String() + dirName = filepath.Join(bsPath, addr[:2]) + require.NoError(t, os.MkdirAll(dirName, os.ModePerm)) + + fname := filepath.Join(dirName, addr[2:]) + require.NoError(t, ioutil.WriteFile(fname, []byte{}, 0)) + + // 1.3. Unreadable dir. + require.NoError(t, os.MkdirAll(filepath.Join(bsPath, "ZZ"), 0)) + } + + bsOpts = append(bsOpts, blobstor.WithBlobovniczaShallowWidth(3)) + sh = newCustomShard(t, dir, true, wcOpts, bsOpts) + require.NoError(t, sh.SetMode(shard.ModeReadOnly)) + + { + // 2. Invalid object in blobovnicza. + // 2.1. Invalid blobovnicza. + bTree := filepath.Join(bsPath, "blobovnicza") + data := make([]byte, 1024) + rand.Read(data) + require.NoError(t, ioutil.WriteFile(filepath.Join(bTree, "0", "2"), data, 0)) + + // 2.2. Invalid object in valid blobovnicza. + prm := new(blobovnicza.PutPrm) + prm.SetAddress(objectSDK.NewAddress()) + prm.SetMarshaledObject(corruptedData) + b := blobovnicza.New(blobovnicza.WithPath(filepath.Join(bTree, "1", "2"))) + require.NoError(t, b.Open()) + _, err := b.Put(prm) + require.NoError(t, err) + require.NoError(t, b.Close()) + } + + { + // 3. Invalid object in write-cache. Note that because shard is read-only + // the object won't be flushed. + addr := cidtest.ID().String() + "." + objecttest.ID().String() + dir := filepath.Join(wcPath, addr[:1]) + require.NoError(t, os.MkdirAll(dir, os.ModePerm)) + require.NoError(t, ioutil.WriteFile(filepath.Join(dir, addr[1:]), nil, 0)) + } + + out := filepath.Join(t.TempDir(), "out.dump") + res, err := sh.Evacuate(new(shard.EvacuatePrm).WithPath(out).WithIgnoreErrors(true)) + require.NoError(t, err) + require.Equal(t, objCount, res.Count()) +} diff --git a/pkg/local_object_storage/shard/shard_test.go b/pkg/local_object_storage/shard/shard_test.go index 08905fbd6b..948a5f11d0 100644 --- a/pkg/local_object_storage/shard/shard_test.go +++ b/pkg/local_object_storage/shard/shard_test.go @@ -27,10 +27,12 @@ import ( ) func newShard(t testing.TB, enableWriteCache bool) *shard.Shard { - return newCustomShard(t, t.TempDir(), enableWriteCache, writecache.WithMaxMemSize(0)) + return newCustomShard(t, t.TempDir(), enableWriteCache, + []writecache.Option{writecache.WithMaxMemSize(0)}, + nil) } -func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts ...writecache.Option) *shard.Shard { +func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts []writecache.Option, bsOpts []blobstor.Option) *shard.Shard { if enableWriteCache { rootPath = path.Join(rootPath, "wc") } else { @@ -40,16 +42,20 @@ func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts opts := []shard.Option{ shard.WithLogger(zap.L()), shard.WithBlobStorOptions( - blobstor.WithRootPath(path.Join(rootPath, "blob")), - blobstor.WithBlobovniczaShallowWidth(2), - blobstor.WithBlobovniczaShallowDepth(2), + append([]blobstor.Option{ + blobstor.WithRootPath(path.Join(rootPath, "blob")), + blobstor.WithBlobovniczaShallowWidth(2), + blobstor.WithBlobovniczaShallowDepth(2), + }, bsOpts...)..., ), shard.WithMetaBaseOptions( meta.WithPath(path.Join(rootPath, "meta")), ), shard.WithWriteCache(enableWriteCache), shard.WithWriteCacheOptions( - append(wcOpts, writecache.WithPath(path.Join(rootPath, "wcache")))..., + append( + []writecache.Option{writecache.WithPath(path.Join(rootPath, "wcache"))}, + wcOpts...)..., ), } diff --git a/pkg/local_object_storage/shard/shutdown_test.go b/pkg/local_object_storage/shard/shutdown_test.go index 36d13030a5..097b12126e 100644 --- a/pkg/local_object_storage/shard/shutdown_test.go +++ b/pkg/local_object_storage/shard/shutdown_test.go @@ -34,7 +34,7 @@ func TestWriteCacheObjectLoss(t *testing.T) { writecache.WithSmallObjectSize(smallSize), writecache.WithMaxObjectSize(smallSize * 2)} - sh := newCustomShard(t, dir, true, wcOpts...) + sh := newCustomShard(t, dir, true, wcOpts, nil) for i := range objects { _, err := sh.Put(new(shard.PutPrm).WithObject(objects[i])) @@ -42,7 +42,7 @@ func TestWriteCacheObjectLoss(t *testing.T) { } require.NoError(t, sh.Close()) - sh = newCustomShard(t, dir, true, wcOpts...) + sh = newCustomShard(t, dir, true, wcOpts, nil) defer releaseShard(sh, t) for i := range objects {