[#1818] writecache: Reuse FSTree flushing code between flushes

Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
remotes/fyrchik/fix-pilorama-processing
Evgenii Stratonikov 2022-10-18 17:57:50 +03:00 committed by fyrchik
parent a56927e3d4
commit b64b14eb54
1 changed files with 55 additions and 71 deletions

View File

@ -137,47 +137,7 @@ func (c *cache) flushBigObjects() {
break
}
var prm common.IteratePrm
prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
sAddr := addr.EncodeToString()
if _, ok := c.store.flushed.Peek(sAddr); ok {
return nil
}
data, err := f()
if err != nil {
c.log.Error("can't read a file", zap.Stringer("address", addr))
return nil
}
c.mtx.Lock()
_, compress := c.compressFlags[sAddr]
c.mtx.Unlock()
var prm common.PutPrm
prm.Address = addr
prm.RawData = data
prm.DontCompress = !compress
if _, err := c.blobstor.Put(prm); err != nil {
c.log.Error("cant flush object to blobstor", zap.Error(err))
return nil
}
if compress {
c.mtx.Lock()
delete(c.compressFlags, sAddr)
c.mtx.Unlock()
}
// mark object as flushed
c.flushed.Add(sAddr, false)
return nil
}
_, _ = c.fsTree.Iterate(prm)
_ = c.flushFSTree(true)
c.modeMtx.RUnlock()
case <-c.closeCh:
@ -186,6 +146,59 @@ func (c *cache) flushBigObjects() {
}
}
func (c *cache) flushFSTree(ignoreErrors bool) error {
var prm common.IteratePrm
prm.IgnoreErrors = ignoreErrors
prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
sAddr := addr.EncodeToString()
if _, ok := c.store.flushed.Peek(sAddr); ok {
return nil
}
data, err := f()
if err != nil {
if ignoreErrors {
c.log.Error("can't read a file", zap.Stringer("address", addr))
return nil
}
return err
}
var obj object.Object
err = obj.Unmarshal(data)
if err != nil {
if ignoreErrors {
c.log.Error("can't unmarshal an object", zap.Stringer("address", addr))
return nil
}
return err
}
var prm common.PutPrm
prm.Address = addr
prm.Object = &obj
prm.RawData = data
_, err = c.blobstor.Put(prm)
if err != nil {
if ignoreErrors {
c.log.Error("cant flush object to blobstor", zap.Error(err))
return nil
}
return err
}
// mark object as flushed
c.flushed.Add(sAddr, false)
return nil
}
_, err := c.fsTree.Iterate(prm)
return err
}
// flushWorker writes objects to the main storage.
func (c *cache) flushWorker(_ int) {
defer c.wg.Done()
@ -241,36 +254,7 @@ func (c *cache) Flush(ignoreErrors bool) error {
}
func (c *cache) flush(ignoreErrors bool) error {
var prm common.IteratePrm
prm.IgnoreErrors = ignoreErrors
prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
_, ok := c.flushed.Peek(addr.EncodeToString())
if ok {
return nil
}
data, err := f()
if err != nil {
if ignoreErrors {
return nil
}
return err
}
var obj object.Object
err = obj.Unmarshal(data)
if err != nil {
if ignoreErrors {
return nil
}
return err
}
return c.flushObject(&obj)
}
_, err := c.fsTree.Iterate(prm)
if err != nil {
if err := c.flushFSTree(ignoreErrors); err != nil {
return err
}