[#1085] blobstor: allow to ignore errors during iteration
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
c954f0e71b
commit
cd75638ce3
3 changed files with 134 additions and 5 deletions
|
@ -631,10 +631,13 @@ func (b *blobovniczas) iterateLeaves(f func(string) (bool, error)) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// iterator over all blobovniczas in unsorted order. Break on f's error return.
|
// iterator over all blobovniczas in unsorted order. Break on f's error return.
|
||||||
func (b *blobovniczas) iterateBlobovniczas(f func(string, *blobovnicza.Blobovnicza) error) error {
|
func (b *blobovniczas) iterateBlobovniczas(ignoreErrors bool, f func(string, *blobovnicza.Blobovnicza) error) error {
|
||||||
return b.iterateLeaves(func(p string) (bool, error) {
|
return b.iterateLeaves(func(p string) (bool, error) {
|
||||||
blz, err := b.openBlobovnicza(p)
|
blz, err := b.openBlobovnicza(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if ignoreErrors {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
return false, fmt.Errorf("could not open blobovnicza %s: %w", p, err)
|
return false, fmt.Errorf("could not open blobovnicza %s: %w", p, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -811,7 +814,7 @@ func (b *blobovniczas) init() error {
|
||||||
return zstdD(data)
|
return zstdD(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
return b.iterateBlobovniczas(func(p string, blz *blobovnicza.Blobovnicza) error {
|
return b.iterateBlobovniczas(false, func(p string, blz *blobovnicza.Blobovnicza) error {
|
||||||
if err := blz.Init(); err != nil {
|
if err := blz.Init(); err != nil {
|
||||||
return fmt.Errorf("could not initialize blobovnicza structure %s: %w", p, err)
|
return fmt.Errorf("could not initialize blobovnicza structure %s: %w", p, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,7 @@ type IterationHandler func(IterationElement) error
|
||||||
// IteratePrm groups the parameters of Iterate operation.
|
// IteratePrm groups the parameters of Iterate operation.
|
||||||
type IteratePrm struct {
|
type IteratePrm struct {
|
||||||
handler IterationHandler
|
handler IterationHandler
|
||||||
|
ignoreErrors bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// IterateRes groups resulting values of Iterate operation.
|
// IterateRes groups resulting values of Iterate operation.
|
||||||
|
@ -43,6 +44,11 @@ func (i *IteratePrm) SetIterationHandler(h IterationHandler) {
|
||||||
i.handler = h
|
i.handler = h
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IgnoreErrors sets the flag signifying whether errors should be ignored.
|
||||||
|
func (i *IteratePrm) IgnoreErrors() {
|
||||||
|
i.ignoreErrors = true
|
||||||
|
}
|
||||||
|
|
||||||
// Iterate traverses the storage over the stored objects and calls the handler
|
// Iterate traverses the storage over the stored objects and calls the handler
|
||||||
// on each element.
|
// on each element.
|
||||||
//
|
//
|
||||||
|
@ -53,13 +59,16 @@ func (i *IteratePrm) SetIterationHandler(h IterationHandler) {
|
||||||
func (b *BlobStor) Iterate(prm IteratePrm) (*IterateRes, error) {
|
func (b *BlobStor) Iterate(prm IteratePrm) (*IterateRes, error) {
|
||||||
var elem IterationElement
|
var elem IterationElement
|
||||||
|
|
||||||
err := b.blobovniczas.iterateBlobovniczas(func(p string, blz *blobovnicza.Blobovnicza) error {
|
err := b.blobovniczas.iterateBlobovniczas(prm.ignoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error {
|
||||||
err := blobovnicza.IterateObjects(blz, func(data []byte) error {
|
err := blobovnicza.IterateObjects(blz, func(data []byte) error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
// decompress the data
|
// decompress the data
|
||||||
elem.data, err = b.decompressor(data)
|
elem.data, err = b.decompressor(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if prm.ignoreErrors {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return fmt.Errorf("could not decompress object data: %w", err)
|
return fmt.Errorf("could not decompress object data: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,11 +92,15 @@ func (b *BlobStor) Iterate(prm IteratePrm) (*IterateRes, error) {
|
||||||
// decompress the data
|
// decompress the data
|
||||||
elem.data, err = b.decompressor(data)
|
elem.data, err = b.decompressor(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if prm.ignoreErrors {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return fmt.Errorf("could not decompress object data: %w", err)
|
return fmt.Errorf("could not decompress object data: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return prm.handler(elem)
|
return prm.handler(elem)
|
||||||
}))
|
}).WithIgnoreErrors(prm.ignoreErrors))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("fs tree iterator failure: %w", err)
|
return nil, fmt.Errorf("fs tree iterator failure: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,10 +1,15 @@
|
||||||
package blobstor
|
package blobstor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"errors"
|
||||||
"os"
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/klauspost/compress/zstd"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test"
|
objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test"
|
||||||
|
@ -89,3 +94,111 @@ func TestIterateObjects(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Empty(t, mObjs)
|
require.Empty(t, mObjs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestIterate_IgnoreErrors(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
|
||||||
|
const (
|
||||||
|
smallSize = 512
|
||||||
|
objCount = 5
|
||||||
|
)
|
||||||
|
bsOpts := []Option{
|
||||||
|
WithCompressObjects(true),
|
||||||
|
WithRootPath(dir),
|
||||||
|
WithSmallSizeLimit(smallSize * 2), // + header
|
||||||
|
WithBlobovniczaOpenedCacheSize(1),
|
||||||
|
WithBlobovniczaShallowWidth(1),
|
||||||
|
WithBlobovniczaShallowDepth(1)}
|
||||||
|
bs := New(bsOpts...)
|
||||||
|
require.NoError(t, bs.Open())
|
||||||
|
require.NoError(t, bs.Init())
|
||||||
|
|
||||||
|
addrs := make([]*object.Address, objCount)
|
||||||
|
for i := range addrs {
|
||||||
|
addrs[i] = objecttest.Address()
|
||||||
|
obj := object.NewRaw()
|
||||||
|
obj.SetContainerID(addrs[i].ContainerID())
|
||||||
|
obj.SetID(addrs[i].ObjectID())
|
||||||
|
obj.SetPayload(make([]byte, smallSize<<(i%2)))
|
||||||
|
|
||||||
|
objData, err := obj.Marshal()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = bs.PutRaw(addrs[i], objData, true)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Construct corrupted compressed object.
|
||||||
|
buf := bytes.NewBuffer(nil)
|
||||||
|
badObject := make([]byte, smallSize/2+1)
|
||||||
|
enc, err := zstd.NewWriter(buf)
|
||||||
|
require.NoError(t, err)
|
||||||
|
rawData := enc.EncodeAll(badObject, nil)
|
||||||
|
for i := 4; /* magic size */ i < len(rawData); i += 2 {
|
||||||
|
rawData[i] ^= 0xFF
|
||||||
|
}
|
||||||
|
// Will be put uncompressed but fetched as compressed because of magic.
|
||||||
|
_, err = bs.PutRaw(objecttest.Address(), rawData, false)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, bs.fsTree.Put(objecttest.Address(), rawData))
|
||||||
|
|
||||||
|
require.NoError(t, bs.Close())
|
||||||
|
|
||||||
|
// Increase width to have blobovnicza which is definitely empty.
|
||||||
|
b := New(append(bsOpts, WithBlobovniczaShallowWidth(2))...)
|
||||||
|
require.NoError(t, b.Open())
|
||||||
|
require.NoError(t, b.Init())
|
||||||
|
|
||||||
|
var p string
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
bp := filepath.Join(bs.blzRootPath, "1", strconv.FormatUint(uint64(i), 10))
|
||||||
|
if _, ok := bs.blobovniczas.opened.Get(bp); !ok {
|
||||||
|
p = bp
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
require.NotEqual(t, "", p, "expected to not have at least 1 blobovnicza in cache")
|
||||||
|
require.NoError(t, os.Chmod(p, 0))
|
||||||
|
|
||||||
|
var prm IteratePrm
|
||||||
|
prm.SetIterationHandler(func(e IterationElement) error {
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
_, err = bs.Iterate(prm)
|
||||||
|
require.Error(t, err)
|
||||||
|
|
||||||
|
prm.IgnoreErrors()
|
||||||
|
|
||||||
|
t.Run("skip invalid objects", func(t *testing.T) {
|
||||||
|
actual := make([]*object.Address, 0, len(addrs))
|
||||||
|
prm.SetIterationHandler(func(e IterationElement) error {
|
||||||
|
obj := object.New()
|
||||||
|
err := obj.Unmarshal(e.data)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
addr := object.NewAddress()
|
||||||
|
addr.SetContainerID(obj.ContainerID())
|
||||||
|
addr.SetObjectID(obj.ID())
|
||||||
|
actual = append(actual, addr)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
_, err := bs.Iterate(prm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.ElementsMatch(t, addrs, actual)
|
||||||
|
})
|
||||||
|
t.Run("return errors from handler", func(t *testing.T) {
|
||||||
|
n := 0
|
||||||
|
expectedErr := errors.New("expected error")
|
||||||
|
prm.SetIterationHandler(func(e IterationElement) error {
|
||||||
|
if n++; n == objCount/2 {
|
||||||
|
return expectedErr
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
_, err := bs.Iterate(prm)
|
||||||
|
require.True(t, errors.Is(err, expectedErr), "got: %v")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue