forked from TrueCloudLab/frostfs-node
[#1085] shard: allow to ignore errors in Evacuate
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
36eebb5932
commit
d06425c852
4 changed files with 152 additions and 14 deletions
|
@ -16,6 +16,7 @@ var dumpMagic = []byte("NEOF")
|
||||||
type EvacuatePrm struct {
|
type EvacuatePrm struct {
|
||||||
path string
|
path string
|
||||||
stream io.Writer
|
stream io.Writer
|
||||||
|
ignoreErrors bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithPath is an Evacuate option to set the destination path.
|
// WithPath is an Evacuate option to set the destination path.
|
||||||
|
@ -31,6 +32,13 @@ func (p *EvacuatePrm) WithStream(r io.Writer) *EvacuatePrm {
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithIgnoreErrors is an Evacuate option to allow ignore all errors during iteration.
|
||||||
|
// This includes invalid blobovniczas as well as corrupted objects.
|
||||||
|
func (p *EvacuatePrm) WithIgnoreErrors(ignore bool) *EvacuatePrm {
|
||||||
|
p.ignoreErrors = ignore
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
// EvacuateRes groups the result fields of Evacuate operation.
|
// EvacuateRes groups the result fields of Evacuate operation.
|
||||||
type EvacuateRes struct {
|
type EvacuateRes struct {
|
||||||
count int
|
count int
|
||||||
|
@ -86,7 +94,7 @@ func (s *Shard) Evacuate(prm *EvacuatePrm) (*EvacuateRes, error) {
|
||||||
|
|
||||||
count++
|
count++
|
||||||
return nil
|
return nil
|
||||||
}))
|
}).WithIgnoreErrors(prm.ignoreErrors))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -94,6 +102,9 @@ func (s *Shard) Evacuate(prm *EvacuatePrm) (*EvacuateRes, error) {
|
||||||
|
|
||||||
var pi blobstor.IteratePrm
|
var pi blobstor.IteratePrm
|
||||||
|
|
||||||
|
if prm.ignoreErrors {
|
||||||
|
pi.IgnoreErrors()
|
||||||
|
}
|
||||||
pi.SetIterationHandler(func(elem blobstor.IterationElement) error {
|
pi.SetIterationHandler(func(elem blobstor.IterationElement) error {
|
||||||
data := elem.ObjectData()
|
data := elem.ObjectData()
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package shard_test
|
package shard_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
@ -10,10 +11,15 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/klauspost/compress/zstd"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
|
||||||
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||||
|
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
|
objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -40,8 +46,11 @@ func testEvacuate(t *testing.T, objCount int, hasWriteCache bool) {
|
||||||
sh = newShard(t, false)
|
sh = newShard(t, false)
|
||||||
} else {
|
} else {
|
||||||
sh = newCustomShard(t, t.TempDir(), true,
|
sh = newCustomShard(t, t.TempDir(), true,
|
||||||
|
[]writecache.Option{
|
||||||
writecache.WithSmallObjectSize(wcSmallObjectSize),
|
writecache.WithSmallObjectSize(wcSmallObjectSize),
|
||||||
writecache.WithMaxObjectSize(wcBigObjectSize))
|
writecache.WithMaxObjectSize(wcBigObjectSize),
|
||||||
|
},
|
||||||
|
nil)
|
||||||
}
|
}
|
||||||
defer releaseShard(sh, t)
|
defer releaseShard(sh, t)
|
||||||
|
|
||||||
|
@ -150,7 +159,7 @@ func testEvacuate(t *testing.T, objCount int, hasWriteCache bool) {
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
|
|
||||||
t.Run("skip errors", func(t *testing.T) {
|
t.Run("skip errors", func(t *testing.T) {
|
||||||
sh := newCustomShard(t, filepath.Join(t.TempDir(), "ignore"), false)
|
sh := newCustomShard(t, filepath.Join(t.TempDir(), "ignore"), false, nil, nil)
|
||||||
defer releaseShard(sh, t)
|
defer releaseShard(sh, t)
|
||||||
|
|
||||||
res, err := sh.Restore(new(shard.RestorePrm).WithPath(out).WithIgnoreErrors(true))
|
res, err := sh.Restore(new(shard.RestorePrm).WithPath(out).WithIgnoreErrors(true))
|
||||||
|
@ -229,3 +238,115 @@ func checkRestore(t *testing.T, sh *shard.Shard, prm *shard.RestorePrm, objects
|
||||||
require.Equal(t, objects[i], res.Object())
|
require.Equal(t, objects[i], res.Object())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEvacuateIgnoreErrors(t *testing.T) {
|
||||||
|
const (
|
||||||
|
wcSmallObjectSize = 512 // goes to write-cache memory
|
||||||
|
wcBigObjectSize = wcSmallObjectSize << 1 // goes to write-cache FSTree
|
||||||
|
bsSmallObjectSize = wcSmallObjectSize << 2 // goes to blobovnicza DB
|
||||||
|
|
||||||
|
objCount = 10
|
||||||
|
headerSize = 400
|
||||||
|
)
|
||||||
|
|
||||||
|
dir := t.TempDir()
|
||||||
|
bsPath := filepath.Join(dir, "blob")
|
||||||
|
bsOpts := []blobstor.Option{
|
||||||
|
blobstor.WithSmallSizeLimit(bsSmallObjectSize),
|
||||||
|
blobstor.WithRootPath(bsPath),
|
||||||
|
blobstor.WithCompressObjects(true),
|
||||||
|
blobstor.WithShallowDepth(1),
|
||||||
|
blobstor.WithBlobovniczaShallowDepth(1),
|
||||||
|
blobstor.WithBlobovniczaShallowWidth(2),
|
||||||
|
blobstor.WithBlobovniczaOpenedCacheSize(1),
|
||||||
|
}
|
||||||
|
wcPath := filepath.Join(dir, "writecache")
|
||||||
|
wcOpts := []writecache.Option{
|
||||||
|
writecache.WithPath(wcPath),
|
||||||
|
writecache.WithSmallObjectSize(wcSmallObjectSize),
|
||||||
|
writecache.WithMaxObjectSize(wcBigObjectSize),
|
||||||
|
}
|
||||||
|
sh := newCustomShard(t, dir, true, wcOpts, bsOpts)
|
||||||
|
|
||||||
|
objects := make([]*object.Object, objCount)
|
||||||
|
for i := 0; i < objCount; i++ {
|
||||||
|
size := (wcSmallObjectSize << (i % 4)) - headerSize
|
||||||
|
obj := generateRawObjectWithPayload(cidtest.ID(), make([]byte, size))
|
||||||
|
objects[i] = obj.Object()
|
||||||
|
|
||||||
|
prm := new(shard.PutPrm).WithObject(objects[i])
|
||||||
|
_, err := sh.Put(prm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
releaseShard(sh, t)
|
||||||
|
|
||||||
|
b := bytes.NewBuffer(nil)
|
||||||
|
badObject := make([]byte, 1000)
|
||||||
|
enc, err := zstd.NewWriter(b)
|
||||||
|
require.NoError(t, err)
|
||||||
|
corruptedData := enc.EncodeAll(badObject, nil)
|
||||||
|
for i := 4; i < len(corruptedData); i++ {
|
||||||
|
corruptedData[i] ^= 0xFF
|
||||||
|
}
|
||||||
|
|
||||||
|
// There are 3 different types of errors to consider.
|
||||||
|
// To setup envirionment we use implementation details so this test must be updated
|
||||||
|
// if any of them are changed.
|
||||||
|
{
|
||||||
|
// 1. Invalid object in fs tree.
|
||||||
|
// 1.1. Invalid compressed data.
|
||||||
|
addr := cidtest.ID().String() + "." + generateOID().String()
|
||||||
|
dirName := filepath.Join(bsPath, addr[:2])
|
||||||
|
require.NoError(t, os.MkdirAll(dirName, os.ModePerm))
|
||||||
|
require.NoError(t, ioutil.WriteFile(filepath.Join(dirName, addr[2:]), corruptedData, os.ModePerm))
|
||||||
|
|
||||||
|
// 1.2. Unreadable file.
|
||||||
|
addr = cidtest.ID().String() + "." + generateOID().String()
|
||||||
|
dirName = filepath.Join(bsPath, addr[:2])
|
||||||
|
require.NoError(t, os.MkdirAll(dirName, os.ModePerm))
|
||||||
|
|
||||||
|
fname := filepath.Join(dirName, addr[2:])
|
||||||
|
require.NoError(t, ioutil.WriteFile(fname, []byte{}, 0))
|
||||||
|
|
||||||
|
// 1.3. Unreadable dir.
|
||||||
|
require.NoError(t, os.MkdirAll(filepath.Join(bsPath, "ZZ"), 0))
|
||||||
|
}
|
||||||
|
|
||||||
|
bsOpts = append(bsOpts, blobstor.WithBlobovniczaShallowWidth(3))
|
||||||
|
sh = newCustomShard(t, dir, true, wcOpts, bsOpts)
|
||||||
|
require.NoError(t, sh.SetMode(shard.ModeReadOnly))
|
||||||
|
|
||||||
|
{
|
||||||
|
// 2. Invalid object in blobovnicza.
|
||||||
|
// 2.1. Invalid blobovnicza.
|
||||||
|
bTree := filepath.Join(bsPath, "blobovnicza")
|
||||||
|
data := make([]byte, 1024)
|
||||||
|
rand.Read(data)
|
||||||
|
require.NoError(t, ioutil.WriteFile(filepath.Join(bTree, "0", "2"), data, 0))
|
||||||
|
|
||||||
|
// 2.2. Invalid object in valid blobovnicza.
|
||||||
|
prm := new(blobovnicza.PutPrm)
|
||||||
|
prm.SetAddress(objectSDK.NewAddress())
|
||||||
|
prm.SetMarshaledObject(corruptedData)
|
||||||
|
b := blobovnicza.New(blobovnicza.WithPath(filepath.Join(bTree, "1", "2")))
|
||||||
|
require.NoError(t, b.Open())
|
||||||
|
_, err := b.Put(prm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, b.Close())
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// 3. Invalid object in write-cache. Note that because shard is read-only
|
||||||
|
// the object won't be flushed.
|
||||||
|
addr := cidtest.ID().String() + "." + objecttest.ID().String()
|
||||||
|
dir := filepath.Join(wcPath, addr[:1])
|
||||||
|
require.NoError(t, os.MkdirAll(dir, os.ModePerm))
|
||||||
|
require.NoError(t, ioutil.WriteFile(filepath.Join(dir, addr[1:]), nil, 0))
|
||||||
|
}
|
||||||
|
|
||||||
|
out := filepath.Join(t.TempDir(), "out.dump")
|
||||||
|
res, err := sh.Evacuate(new(shard.EvacuatePrm).WithPath(out).WithIgnoreErrors(true))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, objCount, res.Count())
|
||||||
|
}
|
||||||
|
|
|
@ -27,10 +27,12 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func newShard(t testing.TB, enableWriteCache bool) *shard.Shard {
|
func newShard(t testing.TB, enableWriteCache bool) *shard.Shard {
|
||||||
return newCustomShard(t, t.TempDir(), enableWriteCache, writecache.WithMaxMemSize(0))
|
return newCustomShard(t, t.TempDir(), enableWriteCache,
|
||||||
|
[]writecache.Option{writecache.WithMaxMemSize(0)},
|
||||||
|
nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts ...writecache.Option) *shard.Shard {
|
func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts []writecache.Option, bsOpts []blobstor.Option) *shard.Shard {
|
||||||
if enableWriteCache {
|
if enableWriteCache {
|
||||||
rootPath = path.Join(rootPath, "wc")
|
rootPath = path.Join(rootPath, "wc")
|
||||||
} else {
|
} else {
|
||||||
|
@ -40,16 +42,20 @@ func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts
|
||||||
opts := []shard.Option{
|
opts := []shard.Option{
|
||||||
shard.WithLogger(zap.L()),
|
shard.WithLogger(zap.L()),
|
||||||
shard.WithBlobStorOptions(
|
shard.WithBlobStorOptions(
|
||||||
|
append([]blobstor.Option{
|
||||||
blobstor.WithRootPath(path.Join(rootPath, "blob")),
|
blobstor.WithRootPath(path.Join(rootPath, "blob")),
|
||||||
blobstor.WithBlobovniczaShallowWidth(2),
|
blobstor.WithBlobovniczaShallowWidth(2),
|
||||||
blobstor.WithBlobovniczaShallowDepth(2),
|
blobstor.WithBlobovniczaShallowDepth(2),
|
||||||
|
}, bsOpts...)...,
|
||||||
),
|
),
|
||||||
shard.WithMetaBaseOptions(
|
shard.WithMetaBaseOptions(
|
||||||
meta.WithPath(path.Join(rootPath, "meta")),
|
meta.WithPath(path.Join(rootPath, "meta")),
|
||||||
),
|
),
|
||||||
shard.WithWriteCache(enableWriteCache),
|
shard.WithWriteCache(enableWriteCache),
|
||||||
shard.WithWriteCacheOptions(
|
shard.WithWriteCacheOptions(
|
||||||
append(wcOpts, writecache.WithPath(path.Join(rootPath, "wcache")))...,
|
append(
|
||||||
|
[]writecache.Option{writecache.WithPath(path.Join(rootPath, "wcache"))},
|
||||||
|
wcOpts...)...,
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -34,7 +34,7 @@ func TestWriteCacheObjectLoss(t *testing.T) {
|
||||||
writecache.WithSmallObjectSize(smallSize),
|
writecache.WithSmallObjectSize(smallSize),
|
||||||
writecache.WithMaxObjectSize(smallSize * 2)}
|
writecache.WithMaxObjectSize(smallSize * 2)}
|
||||||
|
|
||||||
sh := newCustomShard(t, dir, true, wcOpts...)
|
sh := newCustomShard(t, dir, true, wcOpts, nil)
|
||||||
|
|
||||||
for i := range objects {
|
for i := range objects {
|
||||||
_, err := sh.Put(new(shard.PutPrm).WithObject(objects[i]))
|
_, err := sh.Put(new(shard.PutPrm).WithObject(objects[i]))
|
||||||
|
@ -42,7 +42,7 @@ func TestWriteCacheObjectLoss(t *testing.T) {
|
||||||
}
|
}
|
||||||
require.NoError(t, sh.Close())
|
require.NoError(t, sh.Close())
|
||||||
|
|
||||||
sh = newCustomShard(t, dir, true, wcOpts...)
|
sh = newCustomShard(t, dir, true, wcOpts, nil)
|
||||||
defer releaseShard(sh, t)
|
defer releaseShard(sh, t)
|
||||||
|
|
||||||
for i := range objects {
|
for i := range objects {
|
||||||
|
|
Loading…
Reference in a new issue