diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index ef1a6543b5..e995f0163e 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -137,47 +137,7 @@ func (c *cache) flushBigObjects() { break } - var prm common.IteratePrm - prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error { - sAddr := addr.EncodeToString() - - if _, ok := c.store.flushed.Peek(sAddr); ok { - return nil - } - - data, err := f() - if err != nil { - c.log.Error("can't read a file", zap.Stringer("address", addr)) - return nil - } - - c.mtx.Lock() - _, compress := c.compressFlags[sAddr] - c.mtx.Unlock() - - var prm common.PutPrm - prm.Address = addr - prm.RawData = data - prm.DontCompress = !compress - - if _, err := c.blobstor.Put(prm); err != nil { - c.log.Error("cant flush object to blobstor", zap.Error(err)) - return nil - } - - if compress { - c.mtx.Lock() - delete(c.compressFlags, sAddr) - c.mtx.Unlock() - } - - // mark object as flushed - c.flushed.Add(sAddr, false) - - return nil - } - - _, _ = c.fsTree.Iterate(prm) + _ = c.flushFSTree(true) c.modeMtx.RUnlock() case <-c.closeCh: @@ -186,6 +146,59 @@ func (c *cache) flushBigObjects() { } } +func (c *cache) flushFSTree(ignoreErrors bool) error { + var prm common.IteratePrm + prm.IgnoreErrors = ignoreErrors + prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error { + sAddr := addr.EncodeToString() + + if _, ok := c.store.flushed.Peek(sAddr); ok { + return nil + } + + data, err := f() + if err != nil { + if ignoreErrors { + c.log.Error("can't read a file", zap.Stringer("address", addr)) + return nil + } + return err + } + + var obj object.Object + err = obj.Unmarshal(data) + if err != nil { + if ignoreErrors { + c.log.Error("can't unmarshal an object", zap.Stringer("address", addr)) + return nil + } + return err + } + + var prm common.PutPrm + prm.Address = addr + prm.Object = &obj + prm.RawData = data + + _, err = c.blobstor.Put(prm) + if err != nil { + if ignoreErrors { + c.log.Error("cant flush object to blobstor", zap.Error(err)) + return nil + } + return err + } + + // mark object as flushed + c.flushed.Add(sAddr, false) + + return nil + } + + _, err := c.fsTree.Iterate(prm) + return err +} + // flushWorker writes objects to the main storage. func (c *cache) flushWorker(_ int) { defer c.wg.Done() @@ -241,36 +254,7 @@ func (c *cache) Flush(ignoreErrors bool) error { } func (c *cache) flush(ignoreErrors bool) error { - var prm common.IteratePrm - prm.IgnoreErrors = ignoreErrors - prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error { - _, ok := c.flushed.Peek(addr.EncodeToString()) - if ok { - return nil - } - - data, err := f() - if err != nil { - if ignoreErrors { - return nil - } - return err - } - - var obj object.Object - err = obj.Unmarshal(data) - if err != nil { - if ignoreErrors { - return nil - } - return err - } - - return c.flushObject(&obj) - } - - _, err := c.fsTree.Iterate(prm) - if err != nil { + if err := c.flushFSTree(ignoreErrors); err != nil { return err }