blobstor/test: Cover iteration behaviour #1417

Merged
fyrchik merged 1 commit from fyrchik/frostfs-node:blobstor-iter into master 2024-10-07 12:44:28 +00:00

View file

@ -3,10 +3,13 @@ package blobstor
import ( import (
"context" "context"
"encoding/binary" "encoding/binary"
"errors"
"os" "os"
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/memstore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
@ -90,117 +93,60 @@ func TestIterateObjects(t *testing.T) {
} }
func TestIterate_IgnoreErrors(t *testing.T) { func TestIterate_IgnoreErrors(t *testing.T) {
t.Skip() ctx := context.Background()
// dir := t.TempDir()
// myErr := errors.New("unique error")
// const ( nopIter := func(common.IteratePrm) (common.IterateRes, error) { return common.IterateRes{}, nil }
// smallSize = 512 panicIter := func(common.IteratePrm) (common.IterateRes, error) { panic("unreachable") }
// objCount = 5 errIter := func(common.IteratePrm) (common.IterateRes, error) { return common.IterateRes{}, myErr }
// )
// bsOpts := []Option{ var s1iter, s2iter func(common.IteratePrm) (common.IterateRes, error)
// WithCompressObjects(true), st1 := teststore.New(
// WithRootPath(dir), teststore.WithSubstorage(memstore.New()),
// WithSmallSizeLimit(smallSize * 2), // + header teststore.WithIterate(func(prm common.IteratePrm) (common.IterateRes, error) {
// WithBlobovniczaOpenedCacheSize(1), return s1iter(prm)
// WithBlobovniczaShallowWidth(1), }))
// WithBlobovniczaShallowDepth(1)} st2 := teststore.New(
// bs := New(bsOpts...) teststore.WithSubstorage(memstore.New()),
// require.NoError(t, bs.Open(false)) teststore.WithIterate(func(prm common.IteratePrm) (common.IterateRes, error) {
// require.NoError(t, bs.Init()) return s2iter(prm)
// }))
// addrs := make([]oid.Address, objCount)
// for i := range addrs { bsOpts := []Option{WithStorages([]SubStorage{
// addrs[i] = oidtest.Address() {Storage: st1},
// {Storage: st2},
// obj := object.New() })}
// obj.SetContainerID(addrs[i].Container()) bs := New(bsOpts...)
// obj.SetID(addrs[i].Object()) require.NoError(t, bs.Open(ctx, mode.ReadWrite))
// obj.SetPayload(make([]byte, smallSize<<(i%2))) require.NoError(t, bs.Init())
//
// objData, err := obj.Marshal() nopHandler := func(e common.IterationElement) error {
// require.NoError(t, err) return nil
// }
// _, err = bs.PutRaw(addrs[i], objData, true)
// require.NoError(t, err) t.Run("no errors", func(t *testing.T) {
// } s1iter = nopIter
// s2iter = nopIter
// // Construct corrupted compressed object. _, err := bs.Iterate(ctx, common.IteratePrm{Handler: nopHandler})
// buf := bytes.NewBuffer(nil) require.NoError(t, err)
// badObject := make([]byte, smallSize/2+1) })
// enc, err := zstd.NewWriter(buf) t.Run("error in the first sub storage, the second one is not iterated over", func(t *testing.T) {
// require.NoError(t, err) s1iter = errIter
// rawData := enc.EncodeAll(badObject, nil) s2iter = panicIter
// for i := 4; /* magic size */ i < len(rawData); i += 2 { _, err := bs.Iterate(ctx, common.IteratePrm{Handler: nopHandler})
// rawData[i] ^= 0xFF require.ErrorIs(t, err, myErr)
// } })
// // Will be put uncompressed but fetched as compressed because of magic.
// _, err = bs.PutRaw(oidtest.Address(), rawData, false) t.Run("ignore errors, storage 1", func(t *testing.T) {
// require.NoError(t, err) s1iter = errIter
// require.NoError(t, bs.fsTree.Put(oidtest.Address(), rawData)) s2iter = nopIter
// _, err := bs.Iterate(ctx, common.IteratePrm{IgnoreErrors: true, Handler: nopHandler})
// require.NoError(t, bs.Close()) require.NoError(t, err)
// })
// // Increase width to have blobovnicza which is definitely empty. t.Run("ignore errors, storage 2", func(t *testing.T) {
// b := New(append(bsOpts, WithBlobovniczaShallowWidth(2))...) s1iter = nopIter
// require.NoError(t, b.Open(false)) s2iter = errIter
// require.NoError(t, b.Init()) _, err := bs.Iterate(ctx, common.IteratePrm{IgnoreErrors: true, Handler: nopHandler})
// require.NoError(t, err)
// var p string })
// for i := 0; i < 2; i++ {
// bp := filepath.Join(bs.rootPath, "1", strconv.FormatUint(uint64(i), 10))
// if _, ok := bs.blobovniczas.opened.Get(bp); !ok {
// p = bp
// break
// }
// }
// require.NotEqual(t, "", p, "expected to not have at least 1 blobovnicza in cache")
// require.NoError(t, os.Chmod(p, 0))
//
// require.NoError(t, b.Close())
// require.NoError(t, bs.Open(false))
// require.NoError(t, bs.Init())
//
// var prm IteratePrm
// prm.SetIterationHandler(func(e IterationElement) error {
// return nil
// })
// _, err = bs.Iterate(prm)
// require.Error(t, err)
//
// prm.IgnoreErrors()
//
// t.Run("skip invalid objects", func(t *testing.T) {
// actual := make([]oid.Address, 0, len(addrs))
// prm.SetIterationHandler(func(e IterationElement) error {
// obj := object.New()
// err := obj.Unmarshal(e.data)
// if err != nil {
// return err
// }
//
// var addr oid.Address
// cnr, _ := obj.ContainerID()
// addr.SetContainer(cnr)
// id, _ := obj.ID()
// addr.SetObject(id)
// actual = append(actual, addr)
// return nil
// })
//
// _, err := bs.Iterate(prm)
// require.NoError(t, err)
// require.ElementsMatch(t, addrs, actual)
// })
// t.Run("return errors from handler", func(t *testing.T) {
// n := 0
// expectedErr := errors.New("expected error")
// prm.SetIterationHandler(func(e IterationElement) error {
// if n++; n == objCount/2 {
// return expectedErr
// }
// return nil
// })
// _, err := bs.Iterate(prm)
// require.ErrorIs(t, err, expectedErr)
// })
} }