From f1f3c80dbff8dfd7315f1693894f3410e35daab6 Mon Sep 17 00:00:00 2001
From: Pavel Karpy
Date: Wed, 15 Feb 2023 17:53:42 +0300
Subject: [PATCH] [#32] node: Init write-cache asynchronously
Signed-off-by: Pavel Karpy
---
CHANGELOG.md | 1 +
pkg/local_object_storage/writecache/flush.go | 5 +-
.../writecache/flush_test.go | 16 ++++-
pkg/local_object_storage/writecache/init.go | 59 +++++++++++++++++++
pkg/local_object_storage/writecache/mode.go | 34 +++++++++--
pkg/local_object_storage/writecache/put.go | 7 +++
.../writecache/storage.go | 3 +
.../writecache/writecache.go | 22 +++++--
8 files changed, 132 insertions(+), 15 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4a6324756c..6023e5c1be 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,6 +23,7 @@ Changelog for FrostFS Node
- Storage engine now can start even when some shard components are unavailable (#2238)
- `neofs-cli` buffer for object put increased from 4 KiB to 3 MiB (#2243)
- Expired locked object is available for reading (#56)
+- Initialize write-cache asynchronously (#32)
### Fixed
- Increase payload size metric on shards' `put` operation (#1794)
diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go
index ececf331c7..0437367e7e 100644
--- a/pkg/local_object_storage/writecache/flush.go
+++ b/pkg/local_object_storage/writecache/flush.go
@@ -70,7 +70,7 @@ func (c *cache) flushDB() {
m = m[:0]
c.modeMtx.RLock()
- if c.readOnly() {
+ if c.readOnly() || !c.initialized.Load() {
c.modeMtx.RUnlock()
time.Sleep(time.Second)
continue
@@ -151,6 +151,9 @@ func (c *cache) flushBigObjects() {
if c.readOnly() {
c.modeMtx.RUnlock()
break
+ } else if !c.initialized.Load() {
+ c.modeMtx.RUnlock()
+ continue
}
_ = c.flushFSTree(true)
diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go
index 6d0f9910b8..e6de8a0282 100644
--- a/pkg/local_object_storage/writecache/flush_test.go
+++ b/pkg/local_object_storage/writecache/flush_test.go
@@ -4,6 +4,7 @@ import (
"os"
"path/filepath"
"testing"
+ "time"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@@ -64,7 +65,7 @@ func TestFlush(t *testing.T) {
WithBlobstor(bs),
}, opts...)...)
require.NoError(t, wc.Open(false))
- require.NoError(t, wc.Init())
+ initWC(t, wc)
// First set mode for metabase and blobstor to prevent background flushes.
require.NoError(t, mb.SetMode(mode.ReadOnly))
@@ -262,7 +263,7 @@ func TestFlush(t *testing.T) {
// Open in read-only: no error, nothing is removed.
require.NoError(t, wc.Open(true))
- require.NoError(t, wc.Init())
+ initWC(t, wc)
for i := range objects {
_, err := wc.Get(objects[i].addr)
require.NoError(t, err, i)
@@ -271,7 +272,7 @@ func TestFlush(t *testing.T) {
// Open in read-write: no error, something is removed.
require.NoError(t, wc.Open(false))
- require.NoError(t, wc.Init())
+ initWC(t, wc)
for i := range objects {
_, err := wc.Get(objects[i].addr)
if i < 2 {
@@ -316,6 +317,15 @@ func newObject(t *testing.T, size int) (*object.Object, []byte) {
return obj, data
}
+func initWC(t *testing.T, wc Cache) {
+ require.NoError(t, wc.Init())
+
+ require.Eventually(t, func() bool {
+ rawWc := wc.(*cache)
+ return rawWc.initialized.Load()
+ }, 100*time.Second, 1*time.Millisecond)
+}
+
type dummyEpoch struct{}
func (dummyEpoch) CurrentEpoch() uint64 {
diff --git a/pkg/local_object_storage/writecache/init.go b/pkg/local_object_storage/writecache/init.go
index 382b90021b..56b27ec4b2 100644
--- a/pkg/local_object_storage/writecache/init.go
+++ b/pkg/local_object_storage/writecache/init.go
@@ -2,6 +2,7 @@ package writecache
import (
"errors"
+ "sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
@@ -13,10 +14,57 @@ import (
)
func (c *cache) initFlushMarks() {
+ var localWG sync.WaitGroup
+
+ localWG.Add(1)
+ go func() {
+ defer localWG.Done()
+
+ c.fsTreeFlushMarkUpdate()
+ }()
+
+ localWG.Add(1)
+ go func() {
+ defer localWG.Done()
+
+ c.dbFlushMarkUpdate()
+ }()
+
+ c.initWG.Add(1)
+ c.wg.Add(1)
+ go func() {
+ defer c.wg.Done()
+ defer c.initWG.Done()
+
+ localWG.Wait()
+
+ select {
+ case <-c.stopInitCh:
+ return
+ case <-c.closeCh:
+ return
+ default:
+ }
+
+ c.initialized.Store(true)
+ }()
+}
+
+var errStopIter = errors.New("stop iteration")
+
+func (c *cache) fsTreeFlushMarkUpdate() {
c.log.Info("filling flush marks for objects in FSTree")
var prm common.IteratePrm
prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error {
+ select {
+ case <-c.closeCh:
+ return errStopIter
+ case <-c.stopInitCh:
+ return errStopIter
+ default:
+ }
+
flushed, needRemove := c.flushStatus(addr)
if flushed {
c.store.flushed.Add(addr.EncodeToString(), true)
@@ -37,7 +85,10 @@ func (c *cache) initFlushMarks() {
return nil
}
_, _ = c.fsTree.Iterate(prm)
+ c.log.Info("finished updating FSTree flush marks")
+}
+func (c *cache) dbFlushMarkUpdate() {
c.log.Info("filling flush marks for objects in database")
var m []string
@@ -45,6 +96,14 @@ func (c *cache) initFlushMarks() {
var lastKey []byte
var batchSize = flushBatchSize
for {
+ select {
+ case <-c.closeCh:
+ return
+ case <-c.stopInitCh:
+ return
+ default:
+ }
+
m = m[:0]
indices = indices[:0]
diff --git a/pkg/local_object_storage/writecache/mode.go b/pkg/local_object_storage/writecache/mode.go
index f04b0bc472..997310d9ee 100644
--- a/pkg/local_object_storage/writecache/mode.go
+++ b/pkg/local_object_storage/writecache/mode.go
@@ -11,6 +11,9 @@ import (
// ErrReadOnly is returned when Put/Write is performed in a read-only mode.
var ErrReadOnly = logicerr.New("write-cache is in read-only mode")
+// ErrNotInitialized is returned when write-cache is initializing.
+var ErrNotInitialized = logicerr.New("write-cache is not initialized yet")
+
// SetMode sets write-cache mode of operation.
// When shard is put in read-only mode all objects in memory are flushed to disk
// and all background jobs are suspended.
@@ -18,15 +21,36 @@ func (c *cache) SetMode(m mode.Mode) error {
c.modeMtx.Lock()
defer c.modeMtx.Unlock()
- if m.NoMetabase() && !c.mode.NoMetabase() {
- err := c.flush(true)
+ return c.setMode(m)
+}
+
+// setMode applies new mode. Must be called with cache.modeMtx lock taken.
+func (c *cache) setMode(m mode.Mode) error {
+ var err error
+ turnOffMeta := m.NoMetabase()
+
+ if turnOffMeta && !c.mode.NoMetabase() {
+ err = c.flush(true)
if err != nil {
return err
}
}
+ if !c.initialized.Load() {
+ close(c.stopInitCh)
+
+ c.initWG.Wait()
+ c.stopInitCh = make(chan struct{})
+
+ defer func() {
+ if err == nil && !turnOffMeta {
+ c.initFlushMarks()
+ }
+ }()
+ }
+
if c.db != nil {
- if err := c.db.Close(); err != nil {
+ if err = c.db.Close(); err != nil {
return fmt.Errorf("can't close write-cache database: %w", err)
}
}
@@ -39,12 +63,12 @@ func (c *cache) SetMode(m mode.Mode) error {
time.Sleep(time.Second)
}
- if m.NoMetabase() {
+ if turnOffMeta {
c.mode = m
return nil
}
- if err := c.openStore(m.ReadOnly()); err != nil {
+ if err = c.openStore(m.ReadOnly()); err != nil {
return err
}
diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go
index b5fc0e7132..7791e93dc0 100644
--- a/pkg/local_object_storage/writecache/put.go
+++ b/pkg/local_object_storage/writecache/put.go
@@ -16,11 +16,18 @@ var (
)
// Put puts object to write-cache.
+//
+// Returns ErrReadOnly if write-cache is in R/O mode.
+// Returns ErrNotInitialized if write-cache has not been initialized yet.
+// Returns ErrOutOfSpace if saving an object leads to WC's size overflow.
+// Returns ErrBigObject if an objects exceeds maximum object size.
func (c *cache) Put(prm common.PutPrm) (common.PutRes, error) {
c.modeMtx.RLock()
defer c.modeMtx.RUnlock()
if c.readOnly() {
return common.PutRes{}, ErrReadOnly
+ } else if !c.initialized.Load() {
+ return common.PutRes{}, ErrNotInitialized
}
sz := uint64(len(prm.RawData))
diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go
index 51f58b1fab..02c79d380e 100644
--- a/pkg/local_object_storage/writecache/storage.go
+++ b/pkg/local_object_storage/writecache/storage.go
@@ -76,6 +76,9 @@ func (c *cache) openStore(readOnly bool) error {
if c.flushed == nil {
c.flushed, _ = lru.NewWithEvict[string, bool](c.maxFlushedMarksCount, c.removeFlushed)
}
+
+ c.initialized.Store(false)
+
return nil
}
diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go
index c3f95c5539..2fe7d44bc1 100644
--- a/pkg/local_object_storage/writecache/writecache.go
+++ b/pkg/local_object_storage/writecache/writecache.go
@@ -10,6 +10,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
+ "go.uber.org/atomic"
"go.uber.org/zap"
)
@@ -48,8 +49,11 @@ type cache struct {
// mtx protects statistics, counters and compressFlags.
mtx sync.RWMutex
- mode mode.Mode
- modeMtx sync.RWMutex
+ mode mode.Mode
+ initialized atomic.Bool
+ stopInitCh chan struct{} // used to sync initWG initialisation routines and _only_ them
+ initWG sync.WaitGroup // for initialisation routines only
+ modeMtx sync.RWMutex
// compressFlags maps address of a big object to boolean value indicating
// whether object should be compressed.
@@ -57,7 +61,7 @@ type cache struct {
// flushCh is a channel with objects to flush.
flushCh chan *object.Object
- // closeCh is close channel.
+ // closeCh is close channel, protected by modeMtx.
closeCh chan struct{}
// wg is a wait group for flush workers.
wg sync.WaitGroup
@@ -89,8 +93,9 @@ var (
// New creates new writecache instance.
func New(opts ...Option) Cache {
c := &cache{
- flushCh: make(chan *object.Object),
- mode: mode.ReadWrite,
+ flushCh: make(chan *object.Object),
+ mode: mode.ReadWrite,
+ stopInitCh: make(chan struct{}),
compressFlags: make(map[string]struct{}),
options: options{
@@ -151,8 +156,11 @@ func (c *cache) Init() error {
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
func (c *cache) Close() error {
+ c.modeMtx.Lock()
+ defer c.modeMtx.Unlock()
+
// Finish all in-progress operations.
- if err := c.SetMode(mode.ReadOnly); err != nil {
+ if err := c.setMode(mode.ReadOnly); err != nil {
return err
}
@@ -164,6 +172,8 @@ func (c *cache) Close() error {
c.closeCh = nil
}
+ c.initialized.Store(false)
+
var err error
if c.db != nil {
err = c.db.Close()