From 6ad87e7959be5f9f1d1de07c0a42fe80096d541b Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Wed, 1 Jun 2022 10:06:32 +0300 Subject: [PATCH] [#1462] writecache: Fill flush marks during startup Some of the objects are already flushed, don't do it twice. Signed-off-by: Evgenii Stratonikov --- pkg/local_object_storage/writecache/init.go | 75 +++++++++++++++++++ .../writecache/writecache.go | 2 + 2 files changed, 77 insertions(+) create mode 100644 pkg/local_object_storage/writecache/init.go diff --git a/pkg/local_object_storage/writecache/init.go b/pkg/local_object_storage/writecache/init.go new file mode 100644 index 000000000..452e93153 --- /dev/null +++ b/pkg/local_object_storage/writecache/init.go @@ -0,0 +1,75 @@ +package writecache + +import ( + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.etcd.io/bbolt" +) + +func (c *cache) initFlushMarks() { + c.log.Info("filling flush marks for objects in FSTree") + + var prm fstree.IterationPrm + prm.WithLazyHandler(func(addr oid.Address, _ func() ([]byte, error)) error { + if c.isFlushed(addr) { + c.store.flushed.Add(addr.EncodeToString(), true) + } + return nil + }) + _ = c.fsTree.Iterate(prm) + + c.log.Info("filling flush marks for objects in database") + + var m []string + var lastKey []byte + var batchSize = flushBatchSize + for { + m = m[:0] + + // We put objects in batches of fixed size to not interfere with main put cycle a lot. + _ = c.db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + cs := b.Cursor() + for k, _ := cs.Seek(lastKey); k != nil && len(m) < batchSize; k, _ = cs.Next() { + m = append(m, string(k)) + } + return nil + }) + + var addr oid.Address + for i := range m { + if err := addr.DecodeString(m[i]); err != nil { + continue + } + + if c.isFlushed(addr) { + c.store.flushed.Add(addr.EncodeToString(), true) + } + } + + if len(m) == 0 { + break + } + lastKey = append([]byte(m[len(m)-1]), 0) + } + + c.log.Info("finished updating flush marks") +} + +func (c *cache) isFlushed(addr oid.Address) bool { + var existsPrm meta.ExistsPrm + existsPrm.WithAddress(addr) + + mRes, err := c.metabase.Exists(existsPrm) + if err != nil || !mRes.Exists() { + return false + } + + var prm blobstor.ExistsPrm + prm.SetAddress(addr) + + res, err := c.blobstor.Exists(prm) + return err == nil && res.Exists() +} diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index b3335c1a4..763f58bfc 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -133,6 +133,8 @@ func (c *cache) Open() error { // Init runs necessary services. func (c *cache) Init() error { + c.initFlushMarks() + go c.persistLoop() go c.flushLoop() return nil