diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index c923d6b6..bedee924 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -72,6 +72,7 @@ func addressFromString(s string) (*oid.Address, error) { type IterationPrm struct { handler func(addr oid.Address, data []byte) error ignoreErrors bool + lazyHandler func(oid.Address, func() ([]byte, error)) error } // WithHandler sets a function to call on each object. @@ -79,6 +80,13 @@ func (p *IterationPrm) WithHandler(f func(addr oid.Address, data []byte) error) p.handler = f } +// WithLazyHandler sets a function to call on each object. +// Second callback parameter opens file and reads all data to a buffer. +// File is not opened at all unless this callback is invoked. +func (p *IterationPrm) WithLazyHandler(f func(oid.Address, func() ([]byte, error)) error) { + p.lazyHandler = f +} + // WithIgnoreErrors sets a flag indicating whether errors should be ignored. func (p *IterationPrm) WithIgnoreErrors(ignore bool) { p.ignoreErrors = ignore @@ -124,16 +132,23 @@ func (t *FSTree) iterate(depth int, curPath []string, prm IterationPrm) error { continue } - data, err := os.ReadFile(filepath.Join(curPath...)) - if err != nil { - if prm.ignoreErrors { - continue + if prm.lazyHandler != nil { + err = prm.lazyHandler(*addr, func() ([]byte, error) { + return os.ReadFile(filepath.Join(curPath...)) + }) + } else { + var data []byte + data, err = os.ReadFile(filepath.Join(curPath...)) + if err != nil { + if prm.ignoreErrors { + continue + } + return err } - return err + err = prm.handler(*addr, data) } - if err := prm.handler(*addr, data); err != nil { - // Error occurred in handler, outside of our scope, needs to be reported. + if err != nil { return err } } diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index 4beeb3d7..62a16d62 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -138,13 +138,19 @@ func (c *cache) flushBigObjects() { evictNum := 0 var prm fstree.IterationPrm - prm.WithHandler(func(addr oid.Address, data []byte) error { + prm.WithLazyHandler(func(addr oid.Address, f func() ([]byte, error)) error { sAddr := addr.EncodeToString() if _, ok := c.store.flushed.Peek(sAddr); ok { return nil } + data, err := f() + if err != nil { + c.log.Error("can't read a file", zap.Stringer("address", addr)) + return nil + } + c.mtx.Lock() _, compress := c.compressFlags[sAddr] c.mtx.Unlock() diff --git a/pkg/local_object_storage/writecache/iterate.go b/pkg/local_object_storage/writecache/iterate.go index c17cd77b..43ec0a38 100644 --- a/pkg/local_object_storage/writecache/iterate.go +++ b/pkg/local_object_storage/writecache/iterate.go @@ -53,10 +53,17 @@ func (c *cache) Iterate(prm IterationPrm) error { var fsPrm fstree.IterationPrm fsPrm.WithIgnoreErrors(prm.ignoreErrors) - fsPrm.WithHandler(func(addr oid.Address, data []byte) error { + fsPrm.WithLazyHandler(func(addr oid.Address, f func() ([]byte, error)) error { if _, ok := c.flushed.Peek(addr.EncodeToString()); ok { return nil } + data, err := f() + if err != nil { + if prm.ignoreErrors { + return nil + } + return err + } return prm.handler(data) })