forked from TrueCloudLab/frostfs-node
[#1818] writecache: Reuse FSTree flushing code between flushes
Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
This commit is contained in:
parent
6e2f7e291d
commit
b580846630
1 changed files with 55 additions and 71 deletions
|
@ -137,47 +137,7 @@ func (c *cache) flushBigObjects() {
|
|||
break
|
||||
}
|
||||
|
||||
var prm common.IteratePrm
|
||||
prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
|
||||
sAddr := addr.EncodeToString()
|
||||
|
||||
if _, ok := c.store.flushed.Peek(sAddr); ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
data, err := f()
|
||||
if err != nil {
|
||||
c.log.Error("can't read a file", zap.Stringer("address", addr))
|
||||
return nil
|
||||
}
|
||||
|
||||
c.mtx.Lock()
|
||||
_, compress := c.compressFlags[sAddr]
|
||||
c.mtx.Unlock()
|
||||
|
||||
var prm common.PutPrm
|
||||
prm.Address = addr
|
||||
prm.RawData = data
|
||||
prm.DontCompress = !compress
|
||||
|
||||
if _, err := c.blobstor.Put(prm); err != nil {
|
||||
c.log.Error("cant flush object to blobstor", zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
|
||||
if compress {
|
||||
c.mtx.Lock()
|
||||
delete(c.compressFlags, sAddr)
|
||||
c.mtx.Unlock()
|
||||
}
|
||||
|
||||
// mark object as flushed
|
||||
c.flushed.Add(sAddr, false)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
_, _ = c.fsTree.Iterate(prm)
|
||||
_ = c.flushFSTree(true)
|
||||
|
||||
c.modeMtx.RUnlock()
|
||||
case <-c.closeCh:
|
||||
|
@ -186,6 +146,59 @@ func (c *cache) flushBigObjects() {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *cache) flushFSTree(ignoreErrors bool) error {
|
||||
var prm common.IteratePrm
|
||||
prm.IgnoreErrors = ignoreErrors
|
||||
prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
|
||||
sAddr := addr.EncodeToString()
|
||||
|
||||
if _, ok := c.store.flushed.Peek(sAddr); ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
data, err := f()
|
||||
if err != nil {
|
||||
if ignoreErrors {
|
||||
c.log.Error("can't read a file", zap.Stringer("address", addr))
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
var obj object.Object
|
||||
err = obj.Unmarshal(data)
|
||||
if err != nil {
|
||||
if ignoreErrors {
|
||||
c.log.Error("can't unmarshal an object", zap.Stringer("address", addr))
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
var prm common.PutPrm
|
||||
prm.Address = addr
|
||||
prm.Object = &obj
|
||||
prm.RawData = data
|
||||
|
||||
_, err = c.blobstor.Put(prm)
|
||||
if err != nil {
|
||||
if ignoreErrors {
|
||||
c.log.Error("cant flush object to blobstor", zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// mark object as flushed
|
||||
c.flushed.Add(sAddr, false)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err := c.fsTree.Iterate(prm)
|
||||
return err
|
||||
}
|
||||
|
||||
// flushWorker writes objects to the main storage.
|
||||
func (c *cache) flushWorker(_ int) {
|
||||
defer c.wg.Done()
|
||||
|
@ -241,36 +254,7 @@ func (c *cache) Flush(ignoreErrors bool) error {
|
|||
}
|
||||
|
||||
func (c *cache) flush(ignoreErrors bool) error {
|
||||
var prm common.IteratePrm
|
||||
prm.IgnoreErrors = ignoreErrors
|
||||
prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
|
||||
_, ok := c.flushed.Peek(addr.EncodeToString())
|
||||
if ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
data, err := f()
|
||||
if err != nil {
|
||||
if ignoreErrors {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
var obj object.Object
|
||||
err = obj.Unmarshal(data)
|
||||
if err != nil {
|
||||
if ignoreErrors {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
return c.flushObject(&obj)
|
||||
}
|
||||
|
||||
_, err := c.fsTree.Iterate(prm)
|
||||
if err != nil {
|
||||
if err := c.flushFSTree(ignoreErrors); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue