forked from TrueCloudLab/frostfs-node
[#1462] fstree: Allow to fetch file content lazily
If we should process address based on some condition, there is no need to read file content in memory. Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
54d4503701
commit
f2a7503964
3 changed files with 37 additions and 9 deletions
|
@ -72,6 +72,7 @@ func addressFromString(s string) (*oid.Address, error) {
|
|||
type IterationPrm struct {
|
||||
handler func(addr oid.Address, data []byte) error
|
||||
ignoreErrors bool
|
||||
lazyHandler func(oid.Address, func() ([]byte, error)) error
|
||||
}
|
||||
|
||||
// WithHandler sets a function to call on each object.
|
||||
|
@ -79,6 +80,13 @@ func (p *IterationPrm) WithHandler(f func(addr oid.Address, data []byte) error)
|
|||
p.handler = f
|
||||
}
|
||||
|
||||
// WithLazyHandler sets a function to call on each object.
|
||||
// Second callback parameter opens file and reads all data to a buffer.
|
||||
// File is not opened at all unless this callback is invoked.
|
||||
func (p *IterationPrm) WithLazyHandler(f func(oid.Address, func() ([]byte, error)) error) {
|
||||
p.lazyHandler = f
|
||||
}
|
||||
|
||||
// WithIgnoreErrors sets a flag indicating whether errors should be ignored.
|
||||
func (p *IterationPrm) WithIgnoreErrors(ignore bool) {
|
||||
p.ignoreErrors = ignore
|
||||
|
@ -124,16 +132,23 @@ func (t *FSTree) iterate(depth int, curPath []string, prm IterationPrm) error {
|
|||
continue
|
||||
}
|
||||
|
||||
data, err := os.ReadFile(filepath.Join(curPath...))
|
||||
if err != nil {
|
||||
if prm.ignoreErrors {
|
||||
continue
|
||||
if prm.lazyHandler != nil {
|
||||
err = prm.lazyHandler(*addr, func() ([]byte, error) {
|
||||
return os.ReadFile(filepath.Join(curPath...))
|
||||
})
|
||||
} else {
|
||||
var data []byte
|
||||
data, err = os.ReadFile(filepath.Join(curPath...))
|
||||
if err != nil {
|
||||
if prm.ignoreErrors {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
return err
|
||||
err = prm.handler(*addr, data)
|
||||
}
|
||||
|
||||
if err := prm.handler(*addr, data); err != nil {
|
||||
// Error occurred in handler, outside of our scope, needs to be reported.
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -138,13 +138,19 @@ func (c *cache) flushBigObjects() {
|
|||
evictNum := 0
|
||||
|
||||
var prm fstree.IterationPrm
|
||||
prm.WithHandler(func(addr oid.Address, data []byte) error {
|
||||
prm.WithLazyHandler(func(addr oid.Address, f func() ([]byte, error)) error {
|
||||
sAddr := addr.EncodeToString()
|
||||
|
||||
if _, ok := c.store.flushed.Peek(sAddr); ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
data, err := f()
|
||||
if err != nil {
|
||||
c.log.Error("can't read a file", zap.Stringer("address", addr))
|
||||
return nil
|
||||
}
|
||||
|
||||
c.mtx.Lock()
|
||||
_, compress := c.compressFlags[sAddr]
|
||||
c.mtx.Unlock()
|
||||
|
|
|
@ -53,10 +53,17 @@ func (c *cache) Iterate(prm IterationPrm) error {
|
|||
|
||||
var fsPrm fstree.IterationPrm
|
||||
fsPrm.WithIgnoreErrors(prm.ignoreErrors)
|
||||
fsPrm.WithHandler(func(addr oid.Address, data []byte) error {
|
||||
fsPrm.WithLazyHandler(func(addr oid.Address, f func() ([]byte, error)) error {
|
||||
if _, ok := c.flushed.Peek(addr.EncodeToString()); ok {
|
||||
return nil
|
||||
}
|
||||
data, err := f()
|
||||
if err != nil {
|
||||
if prm.ignoreErrors {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
return prm.handler(data)
|
||||
})
|
||||
|
||||
|
|
Loading…
Reference in a new issue