forked from TrueCloudLab/frostfs-node
[#1523] local_object_storage: Unify parameters for the Iterate
operation
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
73f8bb3e5f
commit
4176b2a1bc
10 changed files with 130 additions and 198 deletions
|
@ -650,8 +650,32 @@ func (b *Blobovniczas) iterateLeaves(f func(string) (bool, error)) error {
|
||||||
return b.iterateSortedLeaves(nil, f)
|
return b.iterateSortedLeaves(nil, f)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Iterate iterates over all objects in b.
|
||||||
|
func (b *Blobovniczas) Iterate(prm common.IteratePrm) (common.IterateRes, error) {
|
||||||
|
return common.IterateRes{}, b.iterateBlobovniczas(prm.IgnoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error {
|
||||||
|
return blobovnicza.IterateObjects(blz, func(addr oid.Address, data []byte) error {
|
||||||
|
data, err := b.Decompress(data)
|
||||||
|
if err != nil {
|
||||||
|
if prm.IgnoreErrors {
|
||||||
|
if prm.ErrorHandler != nil {
|
||||||
|
return prm.ErrorHandler(addr, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("could not decompress object data: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return prm.Handler(common.IterationElement{
|
||||||
|
Address: addr,
|
||||||
|
ObjectData: data,
|
||||||
|
StorageID: []byte(p),
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// iterator over all Blobovniczas in unsorted order. Break on f's error return.
|
// iterator over all Blobovniczas in unsorted order. Break on f's error return.
|
||||||
func (b *Blobovniczas) Iterate(ignoreErrors bool, f func(string, *blobovnicza.Blobovnicza) error) error {
|
func (b *Blobovniczas) iterateBlobovniczas(ignoreErrors bool, f func(string, *blobovnicza.Blobovnicza) error) error {
|
||||||
return b.iterateLeaves(func(p string) (bool, error) {
|
return b.iterateLeaves(func(p string) (bool, error) {
|
||||||
blz, err := b.openBlobovnicza(p)
|
blz, err := b.openBlobovnicza(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -34,7 +34,7 @@ func (b *Blobovniczas) Init() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return b.Iterate(false, func(p string, blz *blobovnicza.Blobovnicza) error {
|
return b.iterateBlobovniczas(false, func(p string, blz *blobovnicza.Blobovnicza) error {
|
||||||
if err := blz.Init(); err != nil {
|
if err := blz.Init(); err != nil {
|
||||||
return fmt.Errorf("could not initialize blobovnicza structure %s: %w", p, err)
|
return fmt.Errorf("could not initialize blobovnicza structure %s: %w", p, err)
|
||||||
}
|
}
|
||||||
|
|
24
pkg/local_object_storage/blobstor/common/iterate.go
Normal file
24
pkg/local_object_storage/blobstor/common/iterate.go
Normal file
|
@ -0,0 +1,24 @@
|
||||||
|
package common
|
||||||
|
|
||||||
|
import oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
|
|
||||||
|
// IterationElement represents a unit of elements through which Iterate operation passes.
|
||||||
|
type IterationElement struct {
|
||||||
|
ObjectData []byte
|
||||||
|
Address oid.Address
|
||||||
|
StorageID []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// IterationHandler is a generic processor of IterationElement.
|
||||||
|
type IterationHandler func(IterationElement) error
|
||||||
|
|
||||||
|
// IteratePrm groups the parameters of Iterate operation.
|
||||||
|
type IteratePrm struct {
|
||||||
|
Handler IterationHandler
|
||||||
|
LazyHandler func(oid.Address, func() ([]byte, error)) error
|
||||||
|
IgnoreErrors bool
|
||||||
|
ErrorHandler func(oid.Address, error) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// IterateRes groups the resulting values of Iterate operation.
|
||||||
|
type IterateRes struct{}
|
|
@ -69,46 +69,16 @@ func addressFromString(s string) (*oid.Address, error) {
|
||||||
return &addr, nil
|
return &addr, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// IterationPrm contains iteraction parameters.
|
|
||||||
type IterationPrm struct {
|
|
||||||
handler func(addr oid.Address, data []byte) error
|
|
||||||
ignoreErrors bool
|
|
||||||
errorHandler func(oid.Address, error) error
|
|
||||||
lazyHandler func(oid.Address, func() ([]byte, error)) error
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithHandler sets a function to call on each object.
|
|
||||||
func (p *IterationPrm) WithHandler(f func(addr oid.Address, data []byte) error) {
|
|
||||||
p.handler = f
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithLazyHandler sets a function to call on each object.
|
|
||||||
// Second callback parameter opens file and reads all data to a buffer.
|
|
||||||
// File is not opened at all unless this callback is invoked.
|
|
||||||
func (p *IterationPrm) WithLazyHandler(f func(oid.Address, func() ([]byte, error)) error) {
|
|
||||||
p.lazyHandler = f
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithIgnoreErrors sets a flag indicating whether errors should be ignored.
|
|
||||||
func (p *IterationPrm) WithIgnoreErrors(ignore bool) {
|
|
||||||
p.ignoreErrors = ignore
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithErrorHandler sets error handler for objects that cannot be read or unmarshaled.
|
|
||||||
func (p *IterationPrm) WithErrorHandler(f func(oid.Address, error) error) {
|
|
||||||
p.errorHandler = f
|
|
||||||
}
|
|
||||||
|
|
||||||
// Iterate iterates over all stored objects.
|
// Iterate iterates over all stored objects.
|
||||||
func (t *FSTree) Iterate(prm IterationPrm) error {
|
func (t *FSTree) Iterate(prm common.IteratePrm) (common.IterateRes, error) {
|
||||||
return t.iterate(0, []string{t.RootPath}, prm)
|
return common.IterateRes{}, t.iterate(0, []string{t.RootPath}, prm)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *FSTree) iterate(depth int, curPath []string, prm IterationPrm) error {
|
func (t *FSTree) iterate(depth int, curPath []string, prm common.IteratePrm) error {
|
||||||
curName := strings.Join(curPath[1:], "")
|
curName := strings.Join(curPath[1:], "")
|
||||||
des, err := os.ReadDir(filepath.Join(curPath...))
|
des, err := os.ReadDir(filepath.Join(curPath...))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if prm.ignoreErrors {
|
if prm.IgnoreErrors {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
@ -139,23 +109,28 @@ func (t *FSTree) iterate(depth int, curPath []string, prm IterationPrm) error {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if prm.lazyHandler != nil {
|
if prm.LazyHandler != nil {
|
||||||
err = prm.lazyHandler(*addr, func() ([]byte, error) {
|
err = prm.LazyHandler(*addr, func() ([]byte, error) {
|
||||||
return os.ReadFile(filepath.Join(curPath...))
|
return os.ReadFile(filepath.Join(curPath...))
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
var data []byte
|
var data []byte
|
||||||
data, err = os.ReadFile(filepath.Join(curPath...))
|
data, err = os.ReadFile(filepath.Join(curPath...))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if prm.ignoreErrors {
|
if prm.IgnoreErrors {
|
||||||
if prm.errorHandler != nil {
|
if prm.ErrorHandler != nil {
|
||||||
return prm.errorHandler(*addr, err)
|
return prm.ErrorHandler(*addr, err)
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = prm.handler(*addr, data)
|
|
||||||
|
err = prm.Handler(common.IterationElement{
|
||||||
|
Address: *addr,
|
||||||
|
ObjectData: data,
|
||||||
|
StorageID: []byte{},
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -76,16 +76,17 @@ func TestFSTree(t *testing.T) {
|
||||||
|
|
||||||
t.Run("iterate", func(t *testing.T) {
|
t.Run("iterate", func(t *testing.T) {
|
||||||
n := 0
|
n := 0
|
||||||
var iterationPrm IterationPrm
|
var iterationPrm common.IteratePrm
|
||||||
iterationPrm.WithHandler(func(addr oid.Address, data []byte) error {
|
iterationPrm.Handler = func(elem common.IterationElement) error {
|
||||||
n++
|
n++
|
||||||
expected, ok := store[addr.EncodeToString()]
|
addr := elem.Address.EncodeToString()
|
||||||
require.True(t, ok, "object %s was not found", addr.EncodeToString())
|
expected, ok := store[addr]
|
||||||
require.Equal(t, data, expected)
|
require.True(t, ok, "object %s was not found", addr)
|
||||||
|
require.Equal(t, elem.ObjectData, expected)
|
||||||
return nil
|
return nil
|
||||||
})
|
}
|
||||||
|
|
||||||
err := fs.Iterate(iterationPrm)
|
_, err := fs.Iterate(iterationPrm)
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, count, n)
|
require.Equal(t, count, n)
|
||||||
|
@ -94,14 +95,14 @@ func TestFSTree(t *testing.T) {
|
||||||
n := 0
|
n := 0
|
||||||
errStop := errors.New("stop")
|
errStop := errors.New("stop")
|
||||||
|
|
||||||
iterationPrm.WithHandler(func(addr oid.Address, data []byte) error {
|
iterationPrm.Handler = func(_ common.IterationElement) error {
|
||||||
if n++; n == count-1 {
|
if n++; n == count-1 {
|
||||||
return errStop
|
return errStop
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
}
|
||||||
|
|
||||||
err := fs.Iterate(iterationPrm)
|
_, err := fs.Iterate(iterationPrm)
|
||||||
|
|
||||||
require.ErrorIs(t, err, errStop)
|
require.ErrorIs(t, err, errStop)
|
||||||
require.Equal(t, count-1, n)
|
require.Equal(t, count-1, n)
|
||||||
|
@ -123,13 +124,13 @@ func TestFSTree(t *testing.T) {
|
||||||
require.NoError(t, util.MkdirAllX(filepath.Dir(p), fs.Permissions))
|
require.NoError(t, util.MkdirAllX(filepath.Dir(p), fs.Permissions))
|
||||||
require.NoError(t, os.WriteFile(p, []byte{1, 2, 3}, fs.Permissions))
|
require.NoError(t, os.WriteFile(p, []byte{1, 2, 3}, fs.Permissions))
|
||||||
|
|
||||||
iterationPrm.WithIgnoreErrors(true)
|
iterationPrm.IgnoreErrors = true
|
||||||
iterationPrm.WithHandler(func(addr oid.Address, data []byte) error {
|
iterationPrm.Handler = func(_ common.IterationElement) error {
|
||||||
n++
|
n++
|
||||||
return nil
|
return nil
|
||||||
})
|
}
|
||||||
|
|
||||||
err := fs.Iterate(iterationPrm)
|
_, err := fs.Iterate(iterationPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, count, n)
|
require.Equal(t, count, n)
|
||||||
|
|
||||||
|
@ -137,15 +138,15 @@ func TestFSTree(t *testing.T) {
|
||||||
expectedErr := errors.New("expected error")
|
expectedErr := errors.New("expected error")
|
||||||
n := 0
|
n := 0
|
||||||
|
|
||||||
iterationPrm.WithHandler(func(addr oid.Address, data []byte) error {
|
iterationPrm.Handler = func(_ common.IterationElement) error {
|
||||||
n++
|
n++
|
||||||
if n == count/2 { // process some iterations
|
if n == count/2 { // process some iterations
|
||||||
return expectedErr
|
return expectedErr
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
}
|
||||||
|
|
||||||
err := fs.Iterate(iterationPrm)
|
_, err := fs.Iterate(iterationPrm)
|
||||||
require.ErrorIs(t, err, expectedErr)
|
require.ErrorIs(t, err, expectedErr)
|
||||||
require.Equal(t, count/2, n)
|
require.Equal(t, count/2, n)
|
||||||
})
|
})
|
||||||
|
|
|
@ -3,64 +3,11 @@ package blobstor
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
// IterationElement represents a unit of elements through which Iterate operation passes.
|
|
||||||
type IterationElement struct {
|
|
||||||
data []byte
|
|
||||||
|
|
||||||
addr oid.Address
|
|
||||||
|
|
||||||
descriptor []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
// ObjectData returns the stored object in a binary representation.
|
|
||||||
func (x IterationElement) ObjectData() []byte {
|
|
||||||
return x.data
|
|
||||||
}
|
|
||||||
|
|
||||||
// Descriptor returns the identifier of storage part where x is stored.
|
|
||||||
func (x IterationElement) Descriptor() []byte {
|
|
||||||
return x.descriptor
|
|
||||||
}
|
|
||||||
|
|
||||||
// Address returns the object address.
|
|
||||||
func (x IterationElement) Address() oid.Address {
|
|
||||||
return x.addr
|
|
||||||
}
|
|
||||||
|
|
||||||
// IterationHandler is a generic processor of IterationElement.
|
|
||||||
type IterationHandler func(IterationElement) error
|
|
||||||
|
|
||||||
// IteratePrm groups the parameters of Iterate operation.
|
|
||||||
type IteratePrm struct {
|
|
||||||
handler IterationHandler
|
|
||||||
ignoreErrors bool
|
|
||||||
errorHandler func(oid.Address, error) error
|
|
||||||
}
|
|
||||||
|
|
||||||
// IterateRes groups the resulting values of Iterate operation.
|
|
||||||
type IterateRes struct{}
|
|
||||||
|
|
||||||
// SetIterationHandler sets the action to be performed on each iteration.
|
|
||||||
func (i *IteratePrm) SetIterationHandler(h IterationHandler) {
|
|
||||||
i.handler = h
|
|
||||||
}
|
|
||||||
|
|
||||||
// IgnoreErrors sets the flag signifying whether errors should be ignored.
|
|
||||||
func (i *IteratePrm) IgnoreErrors() {
|
|
||||||
i.ignoreErrors = true
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetErrorHandler sets error handler for objects that cannot be read or unmarshaled.
|
|
||||||
func (i *IteratePrm) SetErrorHandler(f func(oid.Address, error) error) {
|
|
||||||
i.errorHandler = f
|
|
||||||
}
|
|
||||||
|
|
||||||
// Iterate traverses the storage over the stored objects and calls the handler
|
// Iterate traverses the storage over the stored objects and calls the handler
|
||||||
// on each element.
|
// on each element.
|
||||||
//
|
//
|
||||||
|
@ -68,86 +15,51 @@ func (i *IteratePrm) SetErrorHandler(f func(oid.Address, error) error) {
|
||||||
// did not allow to completely iterate over the storage.
|
// did not allow to completely iterate over the storage.
|
||||||
//
|
//
|
||||||
// If handler returns an error, method wraps and returns it immediately.
|
// If handler returns an error, method wraps and returns it immediately.
|
||||||
func (b *BlobStor) Iterate(prm IteratePrm) (IterateRes, error) {
|
func (b *BlobStor) Iterate(prm common.IteratePrm) (common.IterateRes, error) {
|
||||||
var elem IterationElement
|
_, err := b.blobovniczas.Iterate(prm)
|
||||||
|
if err != nil && !prm.IgnoreErrors {
|
||||||
|
return common.IterateRes{}, fmt.Errorf("blobovnizas iterator failure: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
err := b.blobovniczas.Iterate(prm.ignoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error {
|
// FIXME decompress in the fstree
|
||||||
err := blobovnicza.IterateObjects(blz, func(addr oid.Address, data []byte) error {
|
iPrm := prm
|
||||||
var err error
|
iPrm.Handler = func(element common.IterationElement) error {
|
||||||
|
data, err := b.Decompress(element.ObjectData)
|
||||||
// decompress the data
|
|
||||||
elem.data, err = b.Decompress(data)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if prm.ignoreErrors {
|
if prm.IgnoreErrors {
|
||||||
if prm.errorHandler != nil {
|
if prm.ErrorHandler != nil {
|
||||||
return prm.errorHandler(addr, err)
|
return prm.ErrorHandler(element.Address, err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return fmt.Errorf("could not decompress object data: %w", err)
|
return fmt.Errorf("could not decompress object data: %w", err)
|
||||||
}
|
}
|
||||||
|
element.ObjectData = data
|
||||||
elem.addr = addr
|
return prm.Handler(element)
|
||||||
elem.descriptor = []byte(p)
|
|
||||||
|
|
||||||
return prm.handler(elem)
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("blobovnicza iterator failure %s: %w", p, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
_, err = b.fsTree.Iterate(iPrm)
|
||||||
})
|
if err != nil && !prm.IgnoreErrors {
|
||||||
if err != nil {
|
return common.IterateRes{}, fmt.Errorf("fs tree iterator failure: %w", err)
|
||||||
return IterateRes{}, fmt.Errorf("blobovniczas iterator failure: %w", err)
|
|
||||||
}
|
}
|
||||||
|
return common.IterateRes{}, nil
|
||||||
elem.descriptor = []byte{}
|
|
||||||
|
|
||||||
var fsPrm fstree.IterationPrm
|
|
||||||
fsPrm.WithIgnoreErrors(prm.ignoreErrors)
|
|
||||||
fsPrm.WithHandler(func(addr oid.Address, data []byte) error {
|
|
||||||
// decompress the data
|
|
||||||
elem.data, err = b.Decompress(data)
|
|
||||||
if err != nil {
|
|
||||||
if prm.ignoreErrors {
|
|
||||||
if prm.errorHandler != nil {
|
|
||||||
return prm.errorHandler(addr, err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return fmt.Errorf("could not decompress object data: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
elem.addr = addr
|
|
||||||
|
|
||||||
return prm.handler(elem)
|
|
||||||
})
|
|
||||||
|
|
||||||
err = b.fsTree.Iterate(fsPrm)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return IterateRes{}, fmt.Errorf("fs tree iterator failure: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return IterateRes{}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// IterateBinaryObjects is a helper function which iterates over BlobStor and passes binary objects to f.
|
// IterateBinaryObjects is a helper function which iterates over BlobStor and passes binary objects to f.
|
||||||
// Errors related to object reading and unmarshaling are logged and skipped.
|
// Errors related to object reading and unmarshaling are logged and skipped.
|
||||||
func IterateBinaryObjects(blz *BlobStor, f func(addr oid.Address, data []byte, descriptor []byte) error) error {
|
func IterateBinaryObjects(blz *BlobStor, f func(addr oid.Address, data []byte, descriptor []byte) error) error {
|
||||||
var prm IteratePrm
|
var prm common.IteratePrm
|
||||||
|
|
||||||
prm.SetIterationHandler(func(elem IterationElement) error {
|
prm.Handler = func(elem common.IterationElement) error {
|
||||||
return f(elem.Address(), elem.ObjectData(), elem.Descriptor())
|
return f(elem.Address, elem.ObjectData, elem.StorageID)
|
||||||
})
|
}
|
||||||
prm.IgnoreErrors()
|
prm.IgnoreErrors = true
|
||||||
prm.SetErrorHandler(func(addr oid.Address, err error) error {
|
prm.ErrorHandler = func(addr oid.Address, err error) error {
|
||||||
blz.log.Warn("error occurred during the iteration",
|
blz.log.Warn("error occurred during the iteration",
|
||||||
zap.Stringer("address", addr),
|
zap.Stringer("address", addr),
|
||||||
zap.String("err", err.Error()))
|
zap.String("err", err.Error()))
|
||||||
return nil
|
return nil
|
||||||
})
|
}
|
||||||
|
|
||||||
_, err := blz.Iterate(prm)
|
_, err := blz.Iterate(prm)
|
||||||
|
|
||||||
|
|
|
@ -6,7 +6,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -102,13 +102,10 @@ func (s *Shard) Dump(prm DumpPrm) (DumpRes, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var pi blobstor.IteratePrm
|
var pi common.IteratePrm
|
||||||
|
pi.IgnoreErrors = prm.ignoreErrors
|
||||||
if prm.ignoreErrors {
|
pi.Handler = func(elem common.IterationElement) error {
|
||||||
pi.IgnoreErrors()
|
data := elem.ObjectData
|
||||||
}
|
|
||||||
pi.SetIterationHandler(func(elem blobstor.IterationElement) error {
|
|
||||||
data := elem.ObjectData()
|
|
||||||
|
|
||||||
var size [4]byte
|
var size [4]byte
|
||||||
binary.LittleEndian.PutUint32(size[:], uint32(len(data)))
|
binary.LittleEndian.PutUint32(size[:], uint32(len(data)))
|
||||||
|
@ -122,7 +119,7 @@ func (s *Shard) Dump(prm DumpPrm) (DumpRes, error) {
|
||||||
|
|
||||||
count++
|
count++
|
||||||
return nil
|
return nil
|
||||||
})
|
}
|
||||||
|
|
||||||
if _, err := s.blobStor.Iterate(pi); err != nil {
|
if _, err := s.blobStor.Iterate(pi); err != nil {
|
||||||
return DumpRes{}, err
|
return DumpRes{}, err
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
|
|
||||||
"github.com/mr-tron/base58"
|
"github.com/mr-tron/base58"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
|
||||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
|
@ -135,8 +134,8 @@ func (c *cache) flushBigObjects() {
|
||||||
|
|
||||||
evictNum := 0
|
evictNum := 0
|
||||||
|
|
||||||
var prm fstree.IterationPrm
|
var prm common.IteratePrm
|
||||||
prm.WithLazyHandler(func(addr oid.Address, f func() ([]byte, error)) error {
|
prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
|
||||||
sAddr := addr.EncodeToString()
|
sAddr := addr.EncodeToString()
|
||||||
|
|
||||||
if _, ok := c.store.flushed.Peek(sAddr); ok {
|
if _, ok := c.store.flushed.Peek(sAddr); ok {
|
||||||
|
@ -174,9 +173,9 @@ func (c *cache) flushBigObjects() {
|
||||||
evictNum++
|
evictNum++
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
}
|
||||||
|
|
||||||
_ = c.fsTree.Iterate(prm)
|
_, _ = c.fsTree.Iterate(prm)
|
||||||
|
|
||||||
// evict objects which were successfully written to BlobStor
|
// evict objects which were successfully written to BlobStor
|
||||||
c.evictObjects(evictNum)
|
c.evictObjects(evictNum)
|
||||||
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
|
||||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
|
@ -15,14 +14,14 @@ import (
|
||||||
func (c *cache) initFlushMarks() {
|
func (c *cache) initFlushMarks() {
|
||||||
c.log.Info("filling flush marks for objects in FSTree")
|
c.log.Info("filling flush marks for objects in FSTree")
|
||||||
|
|
||||||
var prm fstree.IterationPrm
|
var prm common.IteratePrm
|
||||||
prm.WithLazyHandler(func(addr oid.Address, _ func() ([]byte, error)) error {
|
prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error {
|
||||||
if c.isFlushed(addr) {
|
if c.isFlushed(addr) {
|
||||||
c.store.flushed.Add(addr.EncodeToString(), true)
|
c.store.flushed.Add(addr.EncodeToString(), true)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
}
|
||||||
_ = c.fsTree.Iterate(prm)
|
_, _ = c.fsTree.Iterate(prm)
|
||||||
|
|
||||||
c.log.Info("filling flush marks for objects in database")
|
c.log.Info("filling flush marks for objects in database")
|
||||||
|
|
||||||
|
|
|
@ -4,7 +4,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
@ -51,9 +51,9 @@ func (c *cache) Iterate(prm IterationPrm) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var fsPrm fstree.IterationPrm
|
var fsPrm common.IteratePrm
|
||||||
fsPrm.WithIgnoreErrors(prm.ignoreErrors)
|
fsPrm.IgnoreErrors = prm.ignoreErrors
|
||||||
fsPrm.WithLazyHandler(func(addr oid.Address, f func() ([]byte, error)) error {
|
fsPrm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
|
||||||
if _, ok := c.flushed.Peek(addr.EncodeToString()); ok {
|
if _, ok := c.flushed.Peek(addr.EncodeToString()); ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -65,9 +65,10 @@ func (c *cache) Iterate(prm IterationPrm) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return prm.handler(data)
|
return prm.handler(data)
|
||||||
})
|
}
|
||||||
|
|
||||||
return c.fsTree.Iterate(fsPrm)
|
_, err = c.fsTree.Iterate(fsPrm)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return.
|
// IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return.
|
||||||
|
|
Loading…
Reference in a new issue