From 36eebb593204efea4380af27f53f4299721c413b Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 20 Jan 2022 19:52:30 +0300 Subject: [PATCH] [#1085] writecache: allow to ignore errors during iteration Signed-off-by: Evgenii Stratonikov --- pkg/local_object_storage/shard/evacuate.go | 5 ++-- .../writecache/iterate.go | 26 ++++++++++++++++--- .../writecache/writecache.go | 2 +- 3 files changed, 26 insertions(+), 7 deletions(-) diff --git a/pkg/local_object_storage/shard/evacuate.go b/pkg/local_object_storage/shard/evacuate.go index 4dc425a4..0920eeec 100644 --- a/pkg/local_object_storage/shard/evacuate.go +++ b/pkg/local_object_storage/shard/evacuate.go @@ -7,6 +7,7 @@ import ( "os" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache" ) var dumpMagic = []byte("NEOF") @@ -72,7 +73,7 @@ func (s *Shard) Evacuate(prm *EvacuatePrm) (*EvacuateRes, error) { var count int if s.hasWriteCache() { - err := s.writeCache.Iterate(func(data []byte) error { + err := s.writeCache.Iterate(new(writecache.IterationPrm).WithHandler(func(data []byte) error { var size [4]byte binary.LittleEndian.PutUint32(size[:], uint32(len(data))) if _, err := w.Write(size[:]); err != nil { @@ -85,7 +86,7 @@ func (s *Shard) Evacuate(prm *EvacuatePrm) (*EvacuateRes, error) { count++ return nil - }) + })) if err != nil { return nil, err } diff --git a/pkg/local_object_storage/writecache/iterate.go b/pkg/local_object_storage/writecache/iterate.go index e5434ebb..53278cbc 100644 --- a/pkg/local_object_storage/writecache/iterate.go +++ b/pkg/local_object_storage/writecache/iterate.go @@ -12,10 +12,28 @@ import ( // ErrNoDefaultBucket is returned by IterateDB when default bucket for objects is missing. var ErrNoDefaultBucket = errors.New("no default bucket") +// IterationPrm contains iteration parameters. +type IterationPrm struct { + handler func([]byte) error + ignoreErrors bool +} + +// WithHandler sets a callback to be executed on every object. +func (p *IterationPrm) WithHandler(f func([]byte) error) *IterationPrm { + p.handler = f + return p +} + +// WithIgnoreErrors sets a flag indicating that errors should be ignored. +func (p *IterationPrm) WithIgnoreErrors(ignore bool) *IterationPrm { + p.ignoreErrors = ignore + return p +} + // Iterate iterates over all objects present in write cache. // This is very difficult to do correctly unless write-cache is put in read-only mode. // Thus we silently fail if shard is not in read-only mode to avoid reporting misleading results. -func (c *cache) Iterate(f func([]byte) error) error { +func (c *cache) Iterate(prm *IterationPrm) error { c.modeMtx.RLock() defer c.modeMtx.RUnlock() if c.mode != ModeReadOnly { @@ -28,7 +46,7 @@ func (c *cache) Iterate(f func([]byte) error) error { if _, ok := c.flushed.Peek(string(k)); ok { return nil } - return f(data) + return prm.handler(data) }) }) if err != nil { @@ -39,8 +57,8 @@ func (c *cache) Iterate(f func([]byte) error) error { if _, ok := c.flushed.Peek(addr.String()); ok { return nil } - return f(data) - })) + return prm.handler(data) + }).WithIgnoreErrors(prm.ignoreErrors)) } // IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return. diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index 5bdad7e3..858af4eb 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -20,7 +20,7 @@ type Cache interface { Get(*objectSDK.Address) (*object.Object, error) Head(*objectSDK.Address) (*object.Object, error) Delete(*objectSDK.Address) error - Iterate(func(data []byte) error) error + Iterate(*IterationPrm) error Put(*object.Object) error SetMode(Mode) DumpInfo() Info