From 4176b2a1bc468128fbeff598cf162399ef584794 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 7 Jul 2022 15:03:45 +0300 Subject: [PATCH] [#1523] local_object_storage: Unify parameters for the `Iterate` operation Signed-off-by: Evgenii Stratonikov --- .../blobstor/blobovniczatree/blobovnicza.go | 26 +++- .../blobstor/blobovniczatree/control.go | 2 +- .../blobstor/common/iterate.go | 24 +++ .../blobstor/fstree/fstree.go | 55 ++----- .../blobstor/fstree/fstree_test.go | 35 ++--- pkg/local_object_storage/blobstor/iterate.go | 140 ++++-------------- pkg/local_object_storage/shard/dump.go | 15 +- pkg/local_object_storage/writecache/flush.go | 9 +- pkg/local_object_storage/writecache/init.go | 9 +- .../writecache/iterate.go | 13 +- 10 files changed, 130 insertions(+), 198 deletions(-) create mode 100644 pkg/local_object_storage/blobstor/common/iterate.go diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go b/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go index 551178c47..c68b7ed77 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go @@ -650,8 +650,32 @@ func (b *Blobovniczas) iterateLeaves(f func(string) (bool, error)) error { return b.iterateSortedLeaves(nil, f) } +// Iterate iterates over all objects in b. 
+func (b *Blobovniczas) Iterate(prm common.IteratePrm) (common.IterateRes, error) { + return common.IterateRes{}, b.iterateBlobovniczas(prm.IgnoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error { + return blobovnicza.IterateObjects(blz, func(addr oid.Address, data []byte) error { + data, err := b.Decompress(data) + if err != nil { + if prm.IgnoreErrors { + if prm.ErrorHandler != nil { + return prm.ErrorHandler(addr, err) + } + return nil + } + return fmt.Errorf("could not decompress object data: %w", err) + } + + return prm.Handler(common.IterationElement{ + Address: addr, + ObjectData: data, + StorageID: []byte(p), + }) + }) + }) +} + // iterator over all Blobovniczas in unsorted order. Break on f's error return. -func (b *Blobovniczas) Iterate(ignoreErrors bool, f func(string, *blobovnicza.Blobovnicza) error) error { +func (b *Blobovniczas) iterateBlobovniczas(ignoreErrors bool, f func(string, *blobovnicza.Blobovnicza) error) error { return b.iterateLeaves(func(p string) (bool, error) { blz, err := b.openBlobovnicza(p) if err != nil { diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/control.go b/pkg/local_object_storage/blobstor/blobovniczatree/control.go index 9e92fc5f9..4db7df0e5 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/control.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/control.go @@ -34,7 +34,7 @@ func (b *Blobovniczas) Init() error { return nil } - return b.Iterate(false, func(p string, blz *blobovnicza.Blobovnicza) error { + return b.iterateBlobovniczas(false, func(p string, blz *blobovnicza.Blobovnicza) error { if err := blz.Init(); err != nil { return fmt.Errorf("could not initialize blobovnicza structure %s: %w", p, err) } diff --git a/pkg/local_object_storage/blobstor/common/iterate.go b/pkg/local_object_storage/blobstor/common/iterate.go new file mode 100644 index 000000000..7fd24bbdb --- /dev/null +++ b/pkg/local_object_storage/blobstor/common/iterate.go @@ -0,0 +1,24 @@ +package common + 
+import oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + +// IterationElement represents a unit of elements through which Iterate operation passes. +type IterationElement struct { + ObjectData []byte + Address oid.Address + StorageID []byte +} + +// IterationHandler is a generic processor of IterationElement. +type IterationHandler func(IterationElement) error + +// IteratePrm groups the parameters of Iterate operation. +type IteratePrm struct { + Handler IterationHandler + LazyHandler func(oid.Address, func() ([]byte, error)) error + IgnoreErrors bool + ErrorHandler func(oid.Address, error) error +} + +// IterateRes groups the resulting values of Iterate operation. +type IterateRes struct{} diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index 365b3e04c..6a08dc87c 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -69,46 +69,16 @@ func addressFromString(s string) (*oid.Address, error) { return &addr, nil } -// IterationPrm contains iteraction parameters. -type IterationPrm struct { - handler func(addr oid.Address, data []byte) error - ignoreErrors bool - errorHandler func(oid.Address, error) error - lazyHandler func(oid.Address, func() ([]byte, error)) error -} - -// WithHandler sets a function to call on each object. -func (p *IterationPrm) WithHandler(f func(addr oid.Address, data []byte) error) { - p.handler = f -} - -// WithLazyHandler sets a function to call on each object. -// Second callback parameter opens file and reads all data to a buffer. -// File is not opened at all unless this callback is invoked. -func (p *IterationPrm) WithLazyHandler(f func(oid.Address, func() ([]byte, error)) error) { - p.lazyHandler = f -} - -// WithIgnoreErrors sets a flag indicating whether errors should be ignored. 
-func (p *IterationPrm) WithIgnoreErrors(ignore bool) { - p.ignoreErrors = ignore -} - -// WithErrorHandler sets error handler for objects that cannot be read or unmarshaled. -func (p *IterationPrm) WithErrorHandler(f func(oid.Address, error) error) { - p.errorHandler = f -} - // Iterate iterates over all stored objects. -func (t *FSTree) Iterate(prm IterationPrm) error { - return t.iterate(0, []string{t.RootPath}, prm) +func (t *FSTree) Iterate(prm common.IteratePrm) (common.IterateRes, error) { + return common.IterateRes{}, t.iterate(0, []string{t.RootPath}, prm) } -func (t *FSTree) iterate(depth int, curPath []string, prm IterationPrm) error { +func (t *FSTree) iterate(depth int, curPath []string, prm common.IteratePrm) error { curName := strings.Join(curPath[1:], "") des, err := os.ReadDir(filepath.Join(curPath...)) if err != nil { - if prm.ignoreErrors { + if prm.IgnoreErrors { return nil } return err @@ -139,23 +109,28 @@ func (t *FSTree) iterate(depth int, curPath []string, prm IterationPrm) error { continue } - if prm.lazyHandler != nil { - err = prm.lazyHandler(*addr, func() ([]byte, error) { + if prm.LazyHandler != nil { + err = prm.LazyHandler(*addr, func() ([]byte, error) { return os.ReadFile(filepath.Join(curPath...)) }) } else { var data []byte data, err = os.ReadFile(filepath.Join(curPath...)) if err != nil { - if prm.ignoreErrors { - if prm.errorHandler != nil { - return prm.errorHandler(*addr, err) + if prm.IgnoreErrors { + if prm.ErrorHandler != nil { + return prm.ErrorHandler(*addr, err) } continue } return err } - err = prm.handler(*addr, data) + + err = prm.Handler(common.IterationElement{ + Address: *addr, + ObjectData: data, + StorageID: []byte{}, + }) } if err != nil { diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_test.go b/pkg/local_object_storage/blobstor/fstree/fstree_test.go index 76978b77a..9c54dffba 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_test.go +++ 
b/pkg/local_object_storage/blobstor/fstree/fstree_test.go @@ -76,16 +76,17 @@ func TestFSTree(t *testing.T) { t.Run("iterate", func(t *testing.T) { n := 0 - var iterationPrm IterationPrm - iterationPrm.WithHandler(func(addr oid.Address, data []byte) error { + var iterationPrm common.IteratePrm + iterationPrm.Handler = func(elem common.IterationElement) error { n++ - expected, ok := store[addr.EncodeToString()] - require.True(t, ok, "object %s was not found", addr.EncodeToString()) - require.Equal(t, data, expected) + addr := elem.Address.EncodeToString() + expected, ok := store[addr] + require.True(t, ok, "object %s was not found", addr) + require.Equal(t, elem.ObjectData, expected) return nil - }) + } - err := fs.Iterate(iterationPrm) + _, err := fs.Iterate(iterationPrm) require.NoError(t, err) require.Equal(t, count, n) @@ -94,14 +95,14 @@ func TestFSTree(t *testing.T) { n := 0 errStop := errors.New("stop") - iterationPrm.WithHandler(func(addr oid.Address, data []byte) error { + iterationPrm.Handler = func(_ common.IterationElement) error { if n++; n == count-1 { return errStop } return nil - }) + } - err := fs.Iterate(iterationPrm) + _, err := fs.Iterate(iterationPrm) require.ErrorIs(t, err, errStop) require.Equal(t, count-1, n) @@ -123,13 +124,13 @@ func TestFSTree(t *testing.T) { require.NoError(t, util.MkdirAllX(filepath.Dir(p), fs.Permissions)) require.NoError(t, os.WriteFile(p, []byte{1, 2, 3}, fs.Permissions)) - iterationPrm.WithIgnoreErrors(true) - iterationPrm.WithHandler(func(addr oid.Address, data []byte) error { + iterationPrm.IgnoreErrors = true + iterationPrm.Handler = func(_ common.IterationElement) error { n++ return nil - }) + } - err := fs.Iterate(iterationPrm) + _, err := fs.Iterate(iterationPrm) require.NoError(t, err) require.Equal(t, count, n) @@ -137,15 +138,15 @@ func TestFSTree(t *testing.T) { expectedErr := errors.New("expected error") n := 0 - iterationPrm.WithHandler(func(addr oid.Address, data []byte) error { + iterationPrm.Handler = 
func(_ common.IterationElement) error { n++ if n == count/2 { // process some iterations return expectedErr } return nil - }) + } - err := fs.Iterate(iterationPrm) + _, err := fs.Iterate(iterationPrm) require.ErrorIs(t, err, expectedErr) require.Equal(t, count/2, n) }) diff --git a/pkg/local_object_storage/blobstor/iterate.go b/pkg/local_object_storage/blobstor/iterate.go index 1e534ef50..bdf33d608 100644 --- a/pkg/local_object_storage/blobstor/iterate.go +++ b/pkg/local_object_storage/blobstor/iterate.go @@ -3,64 +3,11 @@ package blobstor import ( "fmt" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.uber.org/zap" ) -// IterationElement represents a unit of elements through which Iterate operation passes. -type IterationElement struct { - data []byte - - addr oid.Address - - descriptor []byte -} - -// ObjectData returns the stored object in a binary representation. -func (x IterationElement) ObjectData() []byte { - return x.data -} - -// Descriptor returns the identifier of storage part where x is stored. -func (x IterationElement) Descriptor() []byte { - return x.descriptor -} - -// Address returns the object address. -func (x IterationElement) Address() oid.Address { - return x.addr -} - -// IterationHandler is a generic processor of IterationElement. -type IterationHandler func(IterationElement) error - -// IteratePrm groups the parameters of Iterate operation. -type IteratePrm struct { - handler IterationHandler - ignoreErrors bool - errorHandler func(oid.Address, error) error -} - -// IterateRes groups the resulting values of Iterate operation. -type IterateRes struct{} - -// SetIterationHandler sets the action to be performed on each iteration. 
-func (i *IteratePrm) SetIterationHandler(h IterationHandler) { - i.handler = h -} - -// IgnoreErrors sets the flag signifying whether errors should be ignored. -func (i *IteratePrm) IgnoreErrors() { - i.ignoreErrors = true -} - -// SetErrorHandler sets error handler for objects that cannot be read or unmarshaled. -func (i *IteratePrm) SetErrorHandler(f func(oid.Address, error) error) { - i.errorHandler = f -} - // Iterate traverses the storage over the stored objects and calls the handler // on each element. // @@ -68,86 +15,51 @@ func (i *IteratePrm) SetErrorHandler(f func(oid.Address, error) error) { // did not allow to completely iterate over the storage. // // If handler returns an error, method wraps and returns it immediately. -func (b *BlobStor) Iterate(prm IteratePrm) (IterateRes, error) { - var elem IterationElement - - err := b.blobovniczas.Iterate(prm.ignoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error { - err := blobovnicza.IterateObjects(blz, func(addr oid.Address, data []byte) error { - var err error - - // decompress the data - elem.data, err = b.Decompress(data) - if err != nil { - if prm.ignoreErrors { - if prm.errorHandler != nil { - return prm.errorHandler(addr, err) - } - return nil - } - return fmt.Errorf("could not decompress object data: %w", err) - } - - elem.addr = addr - elem.descriptor = []byte(p) - - return prm.handler(elem) - }) - if err != nil { - return fmt.Errorf("blobovnicza iterator failure %s: %w", p, err) - } - - return nil - }) - if err != nil { - return IterateRes{}, fmt.Errorf("blobovniczas iterator failure: %w", err) +func (b *BlobStor) Iterate(prm common.IteratePrm) (common.IterateRes, error) { + _, err := b.blobovniczas.Iterate(prm) + if err != nil && !prm.IgnoreErrors { + return common.IterateRes{}, fmt.Errorf("blobovniczas iterator failure: %w", err) } - elem.descriptor = []byte{} - - var fsPrm fstree.IterationPrm - fsPrm.WithIgnoreErrors(prm.ignoreErrors) - fsPrm.WithHandler(func(addr oid.Address, data 
[]byte) error { - // decompress the data - elem.data, err = b.Decompress(data) + // FIXME decompress in the fstree + iPrm := prm + iPrm.Handler = func(element common.IterationElement) error { + data, err := b.Decompress(element.ObjectData) if err != nil { - if prm.ignoreErrors { - if prm.errorHandler != nil { - return prm.errorHandler(addr, err) + if prm.IgnoreErrors { + if prm.ErrorHandler != nil { + return prm.ErrorHandler(element.Address, err) } return nil } return fmt.Errorf("could not decompress object data: %w", err) } - - elem.addr = addr - - return prm.handler(elem) - }) - - err = b.fsTree.Iterate(fsPrm) - - if err != nil { - return IterateRes{}, fmt.Errorf("fs tree iterator failure: %w", err) + element.ObjectData = data + return prm.Handler(element) } - return IterateRes{}, nil + _, err = b.fsTree.Iterate(iPrm) + if err != nil && !prm.IgnoreErrors { + return common.IterateRes{}, fmt.Errorf("fs tree iterator failure: %w", err) + } + return common.IterateRes{}, nil } // IterateBinaryObjects is a helper function which iterates over BlobStor and passes binary objects to f. // Errors related to object reading and unmarshaling are logged and skipped. 
func IterateBinaryObjects(blz *BlobStor, f func(addr oid.Address, data []byte, descriptor []byte) error) error { - var prm IteratePrm + var prm common.IteratePrm - prm.SetIterationHandler(func(elem IterationElement) error { - return f(elem.Address(), elem.ObjectData(), elem.Descriptor()) - }) - prm.IgnoreErrors() - prm.SetErrorHandler(func(addr oid.Address, err error) error { + prm.Handler = func(elem common.IterationElement) error { + return f(elem.Address, elem.ObjectData, elem.StorageID) + } + prm.IgnoreErrors = true + prm.ErrorHandler = func(addr oid.Address, err error) error { blz.log.Warn("error occurred during the iteration", zap.Stringer("address", addr), zap.String("err", err.Error())) return nil - }) + } _, err := blz.Iterate(prm) diff --git a/pkg/local_object_storage/shard/dump.go b/pkg/local_object_storage/shard/dump.go index d62149dbb..522261441 100644 --- a/pkg/local_object_storage/shard/dump.go +++ b/pkg/local_object_storage/shard/dump.go @@ -6,7 +6,7 @@ import ( "io" "os" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache" ) @@ -102,13 +102,10 @@ func (s *Shard) Dump(prm DumpPrm) (DumpRes, error) { } } - var pi blobstor.IteratePrm - - if prm.ignoreErrors { - pi.IgnoreErrors() - } - pi.SetIterationHandler(func(elem blobstor.IterationElement) error { - data := elem.ObjectData() + var pi common.IteratePrm + pi.IgnoreErrors = prm.ignoreErrors + pi.Handler = func(elem common.IterationElement) error { + data := elem.ObjectData var size [4]byte binary.LittleEndian.PutUint32(size[:], uint32(len(data))) @@ -122,7 +119,7 @@ func (s *Shard) Dump(prm DumpPrm) (DumpRes, error) { count++ return nil - }) + } if _, err := s.blobStor.Iterate(pi); err != nil { return DumpRes{}, err diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index 
4951c2e03..926c01086 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -6,7 +6,6 @@ import ( "github.com/mr-tron/base58" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" @@ -135,8 +134,8 @@ func (c *cache) flushBigObjects() { evictNum := 0 - var prm fstree.IterationPrm - prm.WithLazyHandler(func(addr oid.Address, f func() ([]byte, error)) error { + var prm common.IteratePrm + prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error { sAddr := addr.EncodeToString() if _, ok := c.store.flushed.Peek(sAddr); ok { @@ -174,9 +173,9 @@ func (c *cache) flushBigObjects() { evictNum++ return nil - }) + } - _ = c.fsTree.Iterate(prm) + _, _ = c.fsTree.Iterate(prm) // evict objects which were successfully written to BlobStor c.evictObjects(evictNum) diff --git a/pkg/local_object_storage/writecache/init.go b/pkg/local_object_storage/writecache/init.go index 152d9eede..175463f50 100644 --- a/pkg/local_object_storage/writecache/init.go +++ b/pkg/local_object_storage/writecache/init.go @@ -5,7 +5,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" @@ -15,14 +14,14 @@ import ( func (c *cache) initFlushMarks() { c.log.Info("filling flush marks for objects in FSTree") - var prm fstree.IterationPrm - prm.WithLazyHandler(func(addr oid.Address, _ func() ([]byte, error)) error { + var prm common.IteratePrm + 
prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error { if c.isFlushed(addr) { c.store.flushed.Add(addr.EncodeToString(), true) } return nil - }) - _ = c.fsTree.Iterate(prm) + } + _, _ = c.fsTree.Iterate(prm) c.log.Info("filling flush marks for objects in database") diff --git a/pkg/local_object_storage/writecache/iterate.go b/pkg/local_object_storage/writecache/iterate.go index 43ec0a387..6bd279ea9 100644 --- a/pkg/local_object_storage/writecache/iterate.go +++ b/pkg/local_object_storage/writecache/iterate.go @@ -4,7 +4,7 @@ import ( "errors" "fmt" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.etcd.io/bbolt" ) @@ -51,9 +51,9 @@ func (c *cache) Iterate(prm IterationPrm) error { return err } - var fsPrm fstree.IterationPrm - fsPrm.WithIgnoreErrors(prm.ignoreErrors) - fsPrm.WithLazyHandler(func(addr oid.Address, f func() ([]byte, error)) error { + var fsPrm common.IteratePrm + fsPrm.IgnoreErrors = prm.ignoreErrors + fsPrm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error { if _, ok := c.flushed.Peek(addr.EncodeToString()); ok { return nil } @@ -65,9 +65,10 @@ func (c *cache) Iterate(prm IterationPrm) error { return err } return prm.handler(data) - }) + } - return c.fsTree.Iterate(fsPrm) + _, err = c.fsTree.Iterate(fsPrm) + return err } // IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return.