forked from TrueCloudLab/frostfs-node
[#1085] writecache: allow to ignore errors during iteration
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
cd75638ce3
commit
36eebb5932
3 changed files with 26 additions and 7 deletions
|
@ -7,6 +7,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
|
||||||
)
|
)
|
||||||
|
|
||||||
var dumpMagic = []byte("NEOF")
|
var dumpMagic = []byte("NEOF")
|
||||||
|
@ -72,7 +73,7 @@ func (s *Shard) Evacuate(prm *EvacuatePrm) (*EvacuateRes, error) {
|
||||||
var count int
|
var count int
|
||||||
|
|
||||||
if s.hasWriteCache() {
|
if s.hasWriteCache() {
|
||||||
err := s.writeCache.Iterate(func(data []byte) error {
|
err := s.writeCache.Iterate(new(writecache.IterationPrm).WithHandler(func(data []byte) error {
|
||||||
var size [4]byte
|
var size [4]byte
|
||||||
binary.LittleEndian.PutUint32(size[:], uint32(len(data)))
|
binary.LittleEndian.PutUint32(size[:], uint32(len(data)))
|
||||||
if _, err := w.Write(size[:]); err != nil {
|
if _, err := w.Write(size[:]); err != nil {
|
||||||
|
@ -85,7 +86,7 @@ func (s *Shard) Evacuate(prm *EvacuatePrm) (*EvacuateRes, error) {
|
||||||
|
|
||||||
count++
|
count++
|
||||||
return nil
|
return nil
|
||||||
})
|
}))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,10 +12,28 @@ import (
|
||||||
// ErrNoDefaultBucket is returned by IterateDB when default bucket for objects is missing.
|
// ErrNoDefaultBucket is returned by IterateDB when default bucket for objects is missing.
|
||||||
var ErrNoDefaultBucket = errors.New("no default bucket")
|
var ErrNoDefaultBucket = errors.New("no default bucket")
|
||||||
|
|
||||||
|
// IterationPrm contains iteration parameters.
|
||||||
|
type IterationPrm struct {
|
||||||
|
handler func([]byte) error
|
||||||
|
ignoreErrors bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithHandler sets a callback to be executed on every object.
|
||||||
|
func (p *IterationPrm) WithHandler(f func([]byte) error) *IterationPrm {
|
||||||
|
p.handler = f
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithIgnoreErrors sets a flag indicating that errors should be ignored.
|
||||||
|
func (p *IterationPrm) WithIgnoreErrors(ignore bool) *IterationPrm {
|
||||||
|
p.ignoreErrors = ignore
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
// Iterate iterates over all objects present in write cache.
|
// Iterate iterates over all objects present in write cache.
|
||||||
// This is very difficult to do correctly unless write-cache is put in read-only mode.
|
// This is very difficult to do correctly unless write-cache is put in read-only mode.
|
||||||
// Thus we silently fail if shard is not in read-only mode to avoid reporting misleading results.
|
// Thus we silently fail if shard is not in read-only mode to avoid reporting misleading results.
|
||||||
func (c *cache) Iterate(f func([]byte) error) error {
|
func (c *cache) Iterate(prm *IterationPrm) error {
|
||||||
c.modeMtx.RLock()
|
c.modeMtx.RLock()
|
||||||
defer c.modeMtx.RUnlock()
|
defer c.modeMtx.RUnlock()
|
||||||
if c.mode != ModeReadOnly {
|
if c.mode != ModeReadOnly {
|
||||||
|
@ -28,7 +46,7 @@ func (c *cache) Iterate(f func([]byte) error) error {
|
||||||
if _, ok := c.flushed.Peek(string(k)); ok {
|
if _, ok := c.flushed.Peek(string(k)); ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return f(data)
|
return prm.handler(data)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -39,8 +57,8 @@ func (c *cache) Iterate(f func([]byte) error) error {
|
||||||
if _, ok := c.flushed.Peek(addr.String()); ok {
|
if _, ok := c.flushed.Peek(addr.String()); ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return f(data)
|
return prm.handler(data)
|
||||||
}))
|
}).WithIgnoreErrors(prm.ignoreErrors))
|
||||||
}
|
}
|
||||||
|
|
||||||
// IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return.
|
// IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return.
|
||||||
|
|
|
@ -20,7 +20,7 @@ type Cache interface {
|
||||||
Get(*objectSDK.Address) (*object.Object, error)
|
Get(*objectSDK.Address) (*object.Object, error)
|
||||||
Head(*objectSDK.Address) (*object.Object, error)
|
Head(*objectSDK.Address) (*object.Object, error)
|
||||||
Delete(*objectSDK.Address) error
|
Delete(*objectSDK.Address) error
|
||||||
Iterate(func(data []byte) error) error
|
Iterate(*IterationPrm) error
|
||||||
Put(*object.Object) error
|
Put(*object.Object) error
|
||||||
SetMode(Mode)
|
SetMode(Mode)
|
||||||
DumpInfo() Info
|
DumpInfo() Info
|
||||||
|
|
Loading…
Reference in a new issue