blobstor/test: Cover iteration behaviour #1417
1 changed files with 59 additions and 113 deletions
|
@ -3,10 +3,13 @@ package blobstor
|
|||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/memstore"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
|
@ -90,117 +93,60 @@ func TestIterateObjects(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestIterate_IgnoreErrors(t *testing.T) {
|
||||
t.Skip()
|
||||
// dir := t.TempDir()
|
||||
//
|
||||
// const (
|
||||
// smallSize = 512
|
||||
// objCount = 5
|
||||
// )
|
||||
// bsOpts := []Option{
|
||||
// WithCompressObjects(true),
|
||||
// WithRootPath(dir),
|
||||
// WithSmallSizeLimit(smallSize * 2), // + header
|
||||
// WithBlobovniczaOpenedCacheSize(1),
|
||||
// WithBlobovniczaShallowWidth(1),
|
||||
// WithBlobovniczaShallowDepth(1)}
|
||||
// bs := New(bsOpts...)
|
||||
// require.NoError(t, bs.Open(false))
|
||||
// require.NoError(t, bs.Init())
|
||||
//
|
||||
// addrs := make([]oid.Address, objCount)
|
||||
// for i := range addrs {
|
||||
// addrs[i] = oidtest.Address()
|
||||
//
|
||||
// obj := object.New()
|
||||
// obj.SetContainerID(addrs[i].Container())
|
||||
// obj.SetID(addrs[i].Object())
|
||||
// obj.SetPayload(make([]byte, smallSize<<(i%2)))
|
||||
//
|
||||
// objData, err := obj.Marshal()
|
||||
// require.NoError(t, err)
|
||||
//
|
||||
// _, err = bs.PutRaw(addrs[i], objData, true)
|
||||
// require.NoError(t, err)
|
||||
// }
|
||||
//
|
||||
// // Construct corrupted compressed object.
|
||||
// buf := bytes.NewBuffer(nil)
|
||||
// badObject := make([]byte, smallSize/2+1)
|
||||
// enc, err := zstd.NewWriter(buf)
|
||||
// require.NoError(t, err)
|
||||
// rawData := enc.EncodeAll(badObject, nil)
|
||||
// for i := 4; /* magic size */ i < len(rawData); i += 2 {
|
||||
// rawData[i] ^= 0xFF
|
||||
// }
|
||||
// // Will be put uncompressed but fetched as compressed because of magic.
|
||||
// _, err = bs.PutRaw(oidtest.Address(), rawData, false)
|
||||
// require.NoError(t, err)
|
||||
// require.NoError(t, bs.fsTree.Put(oidtest.Address(), rawData))
|
||||
//
|
||||
// require.NoError(t, bs.Close())
|
||||
//
|
||||
// // Increase width to have blobovnicza which is definitely empty.
|
||||
// b := New(append(bsOpts, WithBlobovniczaShallowWidth(2))...)
|
||||
// require.NoError(t, b.Open(false))
|
||||
// require.NoError(t, b.Init())
|
||||
//
|
||||
// var p string
|
||||
// for i := 0; i < 2; i++ {
|
||||
// bp := filepath.Join(bs.rootPath, "1", strconv.FormatUint(uint64(i), 10))
|
||||
// if _, ok := bs.blobovniczas.opened.Get(bp); !ok {
|
||||
// p = bp
|
||||
// break
|
||||
// }
|
||||
// }
|
||||
// require.NotEqual(t, "", p, "expected to not have at least 1 blobovnicza in cache")
|
||||
// require.NoError(t, os.Chmod(p, 0))
|
||||
//
|
||||
// require.NoError(t, b.Close())
|
||||
// require.NoError(t, bs.Open(false))
|
||||
// require.NoError(t, bs.Init())
|
||||
//
|
||||
// var prm IteratePrm
|
||||
// prm.SetIterationHandler(func(e IterationElement) error {
|
||||
// return nil
|
||||
// })
|
||||
// _, err = bs.Iterate(prm)
|
||||
// require.Error(t, err)
|
||||
//
|
||||
// prm.IgnoreErrors()
|
||||
//
|
||||
// t.Run("skip invalid objects", func(t *testing.T) {
|
||||
// actual := make([]oid.Address, 0, len(addrs))
|
||||
// prm.SetIterationHandler(func(e IterationElement) error {
|
||||
// obj := object.New()
|
||||
// err := obj.Unmarshal(e.data)
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
//
|
||||
// var addr oid.Address
|
||||
// cnr, _ := obj.ContainerID()
|
||||
// addr.SetContainer(cnr)
|
||||
// id, _ := obj.ID()
|
||||
// addr.SetObject(id)
|
||||
// actual = append(actual, addr)
|
||||
// return nil
|
||||
// })
|
||||
//
|
||||
// _, err := bs.Iterate(prm)
|
||||
// require.NoError(t, err)
|
||||
// require.ElementsMatch(t, addrs, actual)
|
||||
// })
|
||||
// t.Run("return errors from handler", func(t *testing.T) {
|
||||
// n := 0
|
||||
// expectedErr := errors.New("expected error")
|
||||
// prm.SetIterationHandler(func(e IterationElement) error {
|
||||
// if n++; n == objCount/2 {
|
||||
// return expectedErr
|
||||
// }
|
||||
// return nil
|
||||
// })
|
||||
// _, err := bs.Iterate(prm)
|
||||
// require.ErrorIs(t, err, expectedErr)
|
||||
// })
|
||||
ctx := context.Background()
|
||||
|
||||
myErr := errors.New("unique error")
|
||||
nopIter := func(common.IteratePrm) (common.IterateRes, error) { return common.IterateRes{}, nil }
|
||||
panicIter := func(common.IteratePrm) (common.IterateRes, error) { panic("unreachable") }
|
||||
errIter := func(common.IteratePrm) (common.IterateRes, error) { return common.IterateRes{}, myErr }
|
||||
|
||||
var s1iter, s2iter func(common.IteratePrm) (common.IterateRes, error)
|
||||
st1 := teststore.New(
|
||||
teststore.WithSubstorage(memstore.New()),
|
||||
teststore.WithIterate(func(prm common.IteratePrm) (common.IterateRes, error) {
|
||||
return s1iter(prm)
|
||||
}))
|
||||
st2 := teststore.New(
|
||||
teststore.WithSubstorage(memstore.New()),
|
||||
teststore.WithIterate(func(prm common.IteratePrm) (common.IterateRes, error) {
|
||||
return s2iter(prm)
|
||||
}))
|
||||
|
||||
bsOpts := []Option{WithStorages([]SubStorage{
|
||||
{Storage: st1},
|
||||
{Storage: st2},
|
||||
})}
|
||||
bs := New(bsOpts...)
|
||||
require.NoError(t, bs.Open(ctx, mode.ReadWrite))
|
||||
require.NoError(t, bs.Init())
|
||||
|
||||
nopHandler := func(e common.IterationElement) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
t.Run("no errors", func(t *testing.T) {
|
||||
s1iter = nopIter
|
||||
s2iter = nopIter
|
||||
_, err := bs.Iterate(ctx, common.IteratePrm{Handler: nopHandler})
|
||||
require.NoError(t, err)
|
||||
})
|
||||
t.Run("error in the first sub storage, the second one is not iterated over", func(t *testing.T) {
|
||||
s1iter = errIter
|
||||
s2iter = panicIter
|
||||
_, err := bs.Iterate(ctx, common.IteratePrm{Handler: nopHandler})
|
||||
require.ErrorIs(t, err, myErr)
|
||||
})
|
||||
|
||||
t.Run("ignore errors, storage 1", func(t *testing.T) {
|
||||
s1iter = errIter
|
||||
s2iter = nopIter
|
||||
_, err := bs.Iterate(ctx, common.IteratePrm{IgnoreErrors: true, Handler: nopHandler})
|
||||
require.NoError(t, err)
|
||||
})
|
||||
t.Run("ignore errors, storage 2", func(t *testing.T) {
|
||||
s1iter = nopIter
|
||||
s2iter = errIter
|
||||
_, err := bs.Iterate(ctx, common.IteratePrm{IgnoreErrors: true, Handler: nopHandler})
|
||||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue