forked from TrueCloudLab/frostfs-node
[#1462] writecache: Fill flush marks during startup
Some of the objects are already flushed, don't do it twice. Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
f2a7503964
commit
6ad87e7959
2 changed files with 77 additions and 0 deletions
75
pkg/local_object_storage/writecache/init.go
Normal file
75
pkg/local_object_storage/writecache/init.go
Normal file
|
@ -0,0 +1,75 @@
|
|||
package writecache
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
func (c *cache) initFlushMarks() {
|
||||
c.log.Info("filling flush marks for objects in FSTree")
|
||||
|
||||
var prm fstree.IterationPrm
|
||||
prm.WithLazyHandler(func(addr oid.Address, _ func() ([]byte, error)) error {
|
||||
if c.isFlushed(addr) {
|
||||
c.store.flushed.Add(addr.EncodeToString(), true)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
_ = c.fsTree.Iterate(prm)
|
||||
|
||||
c.log.Info("filling flush marks for objects in database")
|
||||
|
||||
var m []string
|
||||
var lastKey []byte
|
||||
var batchSize = flushBatchSize
|
||||
for {
|
||||
m = m[:0]
|
||||
|
||||
// We put objects in batches of fixed size to not interfere with main put cycle a lot.
|
||||
_ = c.db.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
cs := b.Cursor()
|
||||
for k, _ := cs.Seek(lastKey); k != nil && len(m) < batchSize; k, _ = cs.Next() {
|
||||
m = append(m, string(k))
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
var addr oid.Address
|
||||
for i := range m {
|
||||
if err := addr.DecodeString(m[i]); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if c.isFlushed(addr) {
|
||||
c.store.flushed.Add(addr.EncodeToString(), true)
|
||||
}
|
||||
}
|
||||
|
||||
if len(m) == 0 {
|
||||
break
|
||||
}
|
||||
lastKey = append([]byte(m[len(m)-1]), 0)
|
||||
}
|
||||
|
||||
c.log.Info("finished updating flush marks")
|
||||
}
|
||||
|
||||
func (c *cache) isFlushed(addr oid.Address) bool {
|
||||
var existsPrm meta.ExistsPrm
|
||||
existsPrm.WithAddress(addr)
|
||||
|
||||
mRes, err := c.metabase.Exists(existsPrm)
|
||||
if err != nil || !mRes.Exists() {
|
||||
return false
|
||||
}
|
||||
|
||||
var prm blobstor.ExistsPrm
|
||||
prm.SetAddress(addr)
|
||||
|
||||
res, err := c.blobstor.Exists(prm)
|
||||
return err == nil && res.Exists()
|
||||
}
|
|
@ -133,6 +133,8 @@ func (c *cache) Open() error {
|
|||
|
||||
// Init runs necessary services.
|
||||
func (c *cache) Init() error {
|
||||
c.initFlushMarks()
|
||||
|
||||
go c.persistLoop()
|
||||
go c.flushLoop()
|
||||
return nil
|
||||
|
|
Loading…
Reference in a new issue