forked from TrueCloudLab/frostfs-node
[#789] blobstor: Implement iterator
There is a need to be able to process all objects saved in `BlobStor`. Implement `BlobStor.Iterate` method which iterates over all objects. Implement `IterateBinaryObjects` and `IterateObjects` helper functions to simplify the code. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
b618a44d69
commit
8d016d2529
3 changed files with 207 additions and 12 deletions
|
@ -629,6 +629,20 @@ func (b *blobovniczas) iterateLeaves(f func(string) (bool, error)) error {
|
||||||
return b.iterateSortedLeaves(nil, f)
|
return b.iterateSortedLeaves(nil, f)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// iterator over all blobovniczas in unsorted order. Break on f's error return.
|
||||||
|
func (b *blobovniczas) iterateBlobovniczas(f func(string, *blobovnicza.Blobovnicza) error) error {
|
||||||
|
return b.iterateLeaves(func(p string) (bool, error) {
|
||||||
|
blz, err := b.openBlobovnicza(p)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("could not open blobovnicza %s: %w", p, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = f(p, blz)
|
||||||
|
|
||||||
|
return err != nil, err
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// iterator over the paths of blobovniczas sorted by weight.
|
// iterator over the paths of blobovniczas sorted by weight.
|
||||||
func (b *blobovniczas) iterateSortedLeaves(addr *objectSDK.Address, f func(string) (bool, error)) error {
|
func (b *blobovniczas) iterateSortedLeaves(addr *objectSDK.Address, f func(string) (bool, error)) error {
|
||||||
_, err := b.iterateSorted(
|
_, err := b.iterateSorted(
|
||||||
|
@ -767,19 +781,16 @@ func (b *blobovniczas) updateAndGet(p string, old *uint64) (blobovniczaWithIndex
|
||||||
func (b *blobovniczas) init() error {
|
func (b *blobovniczas) init() error {
|
||||||
b.log.Debug("initializing Blobovnicza's")
|
b.log.Debug("initializing Blobovnicza's")
|
||||||
|
|
||||||
return b.iterateLeaves(func(p string) (bool, error) {
|
return b.iterateBlobovniczas(func(p string, blz *blobovnicza.Blobovnicza) error {
|
||||||
blz, err := b.openBlobovnicza(p)
|
if err := blz.Init(); err != nil {
|
||||||
if err != nil {
|
return fmt.Errorf("could not initialize blobovnicza structure %s: %w", p, err)
|
||||||
return false, fmt.Errorf("could not open blobovnicza %s: %w", p, err)
|
|
||||||
} else if err := blz.Init(); err != nil {
|
|
||||||
return false, fmt.Errorf("could not initialize blobovnicza structure %s: %w", p, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
log := b.log.With(zap.String("id", p))
|
log := b.log.With(zap.String("id", p))
|
||||||
|
|
||||||
log.Debug("blobovnicza successfully initialized, closing...")
|
log.Debug("blobovnicza successfully initialized, closing...")
|
||||||
|
|
||||||
return false, nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,12 +1,33 @@
|
||||||
package blobstor
|
package blobstor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
||||||
)
|
)
|
||||||
|
|
||||||
// IterationHandler represents the action to be performed on each iteration.
|
// IterationElement represents a unit of elements through which Iterate operation passes.
|
||||||
type IterationHandler func(*objectSDK.Address, *blobovnicza.ID) error
|
type IterationElement struct {
|
||||||
|
data []byte
|
||||||
|
|
||||||
|
blzID *blobovnicza.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
// ObjectData returns stored object in a binary representation.
|
||||||
|
func (x IterationElement) ObjectData() []byte {
|
||||||
|
return x.data
|
||||||
|
}
|
||||||
|
|
||||||
|
// BlobovniczaID returns identifier of Blobovnicza in which object is stored.
|
||||||
|
// Returns nil if object isn't in Blobovnicza.
|
||||||
|
func (x IterationElement) BlobovniczaID() *blobovnicza.ID {
|
||||||
|
return x.blzID
|
||||||
|
}
|
||||||
|
|
||||||
|
// IterationHandler is a generic processor of IterationElement.
|
||||||
|
type IterationHandler func(IterationElement) error
|
||||||
|
|
||||||
// IteratePrm groups the parameters of Iterate operation.
|
// IteratePrm groups the parameters of Iterate operation.
|
||||||
type IteratePrm struct {
|
type IteratePrm struct {
|
||||||
|
@ -27,7 +48,78 @@ func (i *IteratePrm) SetIterationHandler(h IterationHandler) {
|
||||||
// Returns any error encountered that
|
// Returns any error encountered that
|
||||||
// did not allow to completely iterate over the storage.
|
// did not allow to completely iterate over the storage.
|
||||||
//
|
//
|
||||||
// If handler returns an error, method returns it immediately.
|
// If handler returns an error, method wraps and returns it immediately.
|
||||||
func (b *BlobStor) Iterate(prm *IteratePrm) (*IterateRes, error) {
|
func (b *BlobStor) Iterate(prm IteratePrm) (*IterateRes, error) {
|
||||||
panic("implement me")
|
var elem IterationElement
|
||||||
|
|
||||||
|
err := b.blobovniczas.iterateBlobovniczas(func(p string, blz *blobovnicza.Blobovnicza) error {
|
||||||
|
err := blobovnicza.IterateObjects(blz, func(data []byte) error {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
// decompress the data
|
||||||
|
elem.data, err = b.decompressor(data)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not decompress object data: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
elem.blzID = blobovnicza.NewIDFromBytes([]byte(p))
|
||||||
|
|
||||||
|
return prm.handler(elem)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("blobovnicza iterator failure %s: %w", p, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("blobovniczas iterator failure: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
elem.blzID = nil
|
||||||
|
|
||||||
|
err = b.fsTree.Iterate(func(_ *objectSDK.Address, data []byte) error {
|
||||||
|
// decompress the data
|
||||||
|
elem.data, err = b.decompressor(data)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not decompress object data: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return prm.handler(elem)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("fs tree iterator failure: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return new(IterateRes), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// IterateBinaryObjects is a helper function which iterates over BlobStor and passes binary objects to f.
|
||||||
|
func IterateBinaryObjects(blz *BlobStor, f func(data []byte, blzID *blobovnicza.ID) error) error {
|
||||||
|
var prm IteratePrm
|
||||||
|
|
||||||
|
prm.SetIterationHandler(func(elem IterationElement) error {
|
||||||
|
return f(elem.ObjectData(), elem.BlobovniczaID())
|
||||||
|
})
|
||||||
|
|
||||||
|
_, err := blz.Iterate(prm)
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// IterateObjects is a helper function which iterates over BlobStor and passes decoded objects to f.
|
||||||
|
func IterateObjects(blz *BlobStor, f func(obj *object.Object, blzID *blobovnicza.ID) error) error {
|
||||||
|
var obj *object.Object
|
||||||
|
|
||||||
|
return IterateBinaryObjects(blz, func(data []byte, blzID *blobovnicza.ID) error {
|
||||||
|
if obj == nil {
|
||||||
|
obj = object.New()
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := obj.Unmarshal(data); err != nil {
|
||||||
|
return fmt.Errorf("could not unmarshal the object: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return f(obj, blzID)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
92
pkg/local_object_storage/blobstor/iterate_test.go
Normal file
92
pkg/local_object_storage/blobstor/iterate_test.go
Normal file
|
@ -0,0 +1,92 @@
|
||||||
|
package blobstor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/binary"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
|
objecttest "github.com/nspcc-dev/neofs-api-go/pkg/object/test"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/util/logger/test"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestIterateObjects(t *testing.T) {
|
||||||
|
p := t.Name()
|
||||||
|
|
||||||
|
const smalSz = 50
|
||||||
|
|
||||||
|
// create BlobStor instance
|
||||||
|
blobStor := New(
|
||||||
|
WithCompressObjects(true, test.NewLogger(false)),
|
||||||
|
WithRootPath(p),
|
||||||
|
WithSmallSizeLimit(smalSz),
|
||||||
|
WithBlobovniczaShallowWidth(1),
|
||||||
|
WithBlobovniczaShallowDepth(1),
|
||||||
|
)
|
||||||
|
|
||||||
|
defer os.RemoveAll(p)
|
||||||
|
|
||||||
|
// open Blobstor
|
||||||
|
require.NoError(t, blobStor.Open())
|
||||||
|
|
||||||
|
// initialize Blobstor
|
||||||
|
require.NoError(t, blobStor.Init())
|
||||||
|
|
||||||
|
defer blobStor.Close()
|
||||||
|
|
||||||
|
const objNum = 5
|
||||||
|
|
||||||
|
type addrData struct {
|
||||||
|
big bool
|
||||||
|
addr *object.Address
|
||||||
|
data []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
mObjs := make(map[string]addrData)
|
||||||
|
|
||||||
|
for i := uint64(0); i < objNum; i++ {
|
||||||
|
sz := smalSz
|
||||||
|
|
||||||
|
big := i < objNum/2
|
||||||
|
if big {
|
||||||
|
sz++
|
||||||
|
}
|
||||||
|
|
||||||
|
data := make([]byte, sz)
|
||||||
|
binary.BigEndian.PutUint64(data, i)
|
||||||
|
|
||||||
|
addr := objecttest.Address()
|
||||||
|
|
||||||
|
mObjs[string(data)] = addrData{
|
||||||
|
big: big,
|
||||||
|
addr: addr,
|
||||||
|
data: data,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, v := range mObjs {
|
||||||
|
_, err := blobStor.PutRaw(v.addr, v.data)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := IterateBinaryObjects(blobStor, func(data []byte, blzID *blobovnicza.ID) error {
|
||||||
|
v, ok := mObjs[string(data)]
|
||||||
|
require.True(t, ok)
|
||||||
|
|
||||||
|
require.Equal(t, v.data, data)
|
||||||
|
|
||||||
|
if v.big {
|
||||||
|
require.Nil(t, blzID)
|
||||||
|
} else {
|
||||||
|
require.NotNil(t, blzID)
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(mObjs, string(data))
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Empty(t, mObjs)
|
||||||
|
}
|
Loading…
Reference in a new issue