diff --git a/pkg/local_object_storage/blobstor/blobovnicza.go b/pkg/local_object_storage/blobstor/blobovnicza.go index e28a3f6107..04524d305d 100644 --- a/pkg/local_object_storage/blobstor/blobovnicza.go +++ b/pkg/local_object_storage/blobstor/blobovnicza.go @@ -629,6 +629,20 @@ func (b *blobovniczas) iterateLeaves(f func(string) (bool, error)) error { return b.iterateSortedLeaves(nil, f) } +// iterator over all blobovniczas in unsorted order. Break on f's error return. +func (b *blobovniczas) iterateBlobovniczas(f func(string, *blobovnicza.Blobovnicza) error) error { + return b.iterateLeaves(func(p string) (bool, error) { + blz, err := b.openBlobovnicza(p) + if err != nil { + return false, fmt.Errorf("could not open blobovnicza %s: %w", p, err) + } + + err = f(p, blz) + + return err != nil, err + }) +} + // iterator over the paths of blobovniczas sorted by weight. func (b *blobovniczas) iterateSortedLeaves(addr *objectSDK.Address, f func(string) (bool, error)) error { _, err := b.iterateSorted( @@ -767,19 +781,16 @@ func (b *blobovniczas) updateAndGet(p string, old *uint64) (blobovniczaWithIndex func (b *blobovniczas) init() error { b.log.Debug("initializing Blobovnicza's") - return b.iterateLeaves(func(p string) (bool, error) { - blz, err := b.openBlobovnicza(p) - if err != nil { - return false, fmt.Errorf("could not open blobovnicza %s: %w", p, err) - } else if err := blz.Init(); err != nil { - return false, fmt.Errorf("could not initialize blobovnicza structure %s: %w", p, err) + return b.iterateBlobovniczas(func(p string, blz *blobovnicza.Blobovnicza) error { + if err := blz.Init(); err != nil { + return fmt.Errorf("could not initialize blobovnicza structure %s: %w", p, err) } log := b.log.With(zap.String("id", p)) log.Debug("blobovnicza successfully initialized, closing...") - return false, nil + return nil }) } diff --git a/pkg/local_object_storage/blobstor/iterate.go b/pkg/local_object_storage/blobstor/iterate.go index a50ccd32f4..b35c9cf108 100644 --- a/pkg/local_object_storage/blobstor/iterate.go +++ b/pkg/local_object_storage/blobstor/iterate.go @@ -1,12 +1,33 @@ package blobstor import ( + "fmt" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza" ) -// IterationHandler represents the action to be performed on each iteration. -type IterationHandler func(*objectSDK.Address, *blobovnicza.ID) error +// IterationElement represents a unit of elements through which Iterate operation passes. +type IterationElement struct { + data []byte + + blzID *blobovnicza.ID +} + +// ObjectData returns stored object in a binary representation. +func (x IterationElement) ObjectData() []byte { + return x.data +} + +// BlobovniczaID returns identifier of Blobovnicza in which object is stored. +// Returns nil if object isn't in Blobovnicza. +func (x IterationElement) BlobovniczaID() *blobovnicza.ID { + return x.blzID +} + +// IterationHandler is a generic processor of IterationElement. +type IterationHandler func(IterationElement) error // IteratePrm groups the parameters of Iterate operation. type IteratePrm struct { @@ -27,7 +48,78 @@ func (i *IteratePrm) SetIterationHandler(h IterationHandler) { // Returns any error encountered that // did not allow to completely iterate over the storage. // -// If handler returns an error, method returns it immediately. -func (b *BlobStor) Iterate(prm *IteratePrm) (*IterateRes, error) { - panic("implement me") +// If handler returns an error, method wraps and returns it immediately. +func (b *BlobStor) Iterate(prm IteratePrm) (*IterateRes, error) { + var elem IterationElement + + err := b.blobovniczas.iterateBlobovniczas(func(p string, blz *blobovnicza.Blobovnicza) error { + err := blobovnicza.IterateObjects(blz, func(data []byte) error { + var err error + + // decompress the data + elem.data, err = b.decompressor(data) + if err != nil { + return fmt.Errorf("could not decompress object data: %w", err) + } + + elem.blzID = blobovnicza.NewIDFromBytes([]byte(p)) + + return prm.handler(elem) + }) + if err != nil { + return fmt.Errorf("blobovnicza iterator failure %s: %w", p, err) + } + + return nil + }) + if err != nil { + return nil, fmt.Errorf("blobovniczas iterator failure: %w", err) + } + + elem.blzID = nil + + err = b.fsTree.Iterate(func(_ *objectSDK.Address, data []byte) error { + // decompress the data + elem.data, err = b.decompressor(data) + if err != nil { + return fmt.Errorf("could not decompress object data: %w", err) + } + + return prm.handler(elem) + }) + if err != nil { + return nil, fmt.Errorf("fs tree iterator failure: %w", err) + } + + return new(IterateRes), nil +} + +// IterateBinaryObjects is a helper function which iterates over BlobStor and passes binary objects to f. +func IterateBinaryObjects(blz *BlobStor, f func(data []byte, blzID *blobovnicza.ID) error) error { + var prm IteratePrm + + prm.SetIterationHandler(func(elem IterationElement) error { + return f(elem.ObjectData(), elem.BlobovniczaID()) + }) + + _, err := blz.Iterate(prm) + + return err +} + +// IterateObjects is a helper function which iterates over BlobStor and passes decoded objects to f. +func IterateObjects(blz *BlobStor, f func(obj *object.Object, blzID *blobovnicza.ID) error) error { + var obj *object.Object + + return IterateBinaryObjects(blz, func(data []byte, blzID *blobovnicza.ID) error { + if obj == nil { + obj = object.New() + } + + if err := obj.Unmarshal(data); err != nil { + return fmt.Errorf("could not unmarshal the object: %w", err) + } + + return f(obj, blzID) + }) } diff --git a/pkg/local_object_storage/blobstor/iterate_test.go b/pkg/local_object_storage/blobstor/iterate_test.go new file mode 100644 index 0000000000..4f25518260 --- /dev/null +++ b/pkg/local_object_storage/blobstor/iterate_test.go @@ -0,0 +1,92 @@ +package blobstor + +import ( + "encoding/binary" + "os" + "testing" + + "github.com/nspcc-dev/neofs-api-go/pkg/object" + objecttest "github.com/nspcc-dev/neofs-api-go/pkg/object/test" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza" + "github.com/nspcc-dev/neofs-node/pkg/util/logger/test" + "github.com/stretchr/testify/require" +) + +func TestIterateObjects(t *testing.T) { + p := t.Name() + + const smalSz = 50 + + // create BlobStor instance + blobStor := New( + WithCompressObjects(true, test.NewLogger(false)), + WithRootPath(p), + WithSmallSizeLimit(smalSz), + WithBlobovniczaShallowWidth(1), + WithBlobovniczaShallowDepth(1), + ) + + defer os.RemoveAll(p) + + // open Blobstor + require.NoError(t, blobStor.Open()) + + // initialize Blobstor + require.NoError(t, blobStor.Init()) + + defer blobStor.Close() + + const objNum = 5 + + type addrData struct { + big bool + addr *object.Address + data []byte + } + + mObjs := make(map[string]addrData) + + for i := uint64(0); i < objNum; i++ { + sz := smalSz + + big := i < objNum/2 + if big { + sz++ + } + + data := make([]byte, sz) + binary.BigEndian.PutUint64(data, i) + + addr := objecttest.Address() + + mObjs[string(data)] = addrData{ + big: big, + addr: addr, + data: data, + } + } + + for _, v := range mObjs { + _, err := blobStor.PutRaw(v.addr, v.data) + require.NoError(t, err) + } + + err := IterateBinaryObjects(blobStor, func(data []byte, blzID *blobovnicza.ID) error { + v, ok := mObjs[string(data)] + require.True(t, ok) + + require.Equal(t, v.data, data) + + if v.big { + require.Nil(t, blzID) + } else { + require.NotNil(t, blzID) + } + + delete(mObjs, string(data)) + + return nil + }) + require.NoError(t, err) + require.Empty(t, mObjs) +}