[#32] node: Init write-cache asynchronously

Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
This commit is contained in:
Pavel Karpy 2023-02-15 17:53:42 +03:00 committed by Gitea
parent 381e363a8b
commit f1f3c80dbf
8 changed files with 132 additions and 15 deletions

View file

@ -23,6 +23,7 @@ Changelog for FrostFS Node
- Storage engine now can start even when some shard components are unavailable (#2238) - Storage engine now can start even when some shard components are unavailable (#2238)
- `neofs-cli` buffer for object put increased from 4 KiB to 3 MiB (#2243) - `neofs-cli` buffer for object put increased from 4 KiB to 3 MiB (#2243)
- Expired locked object is available for reading (#56) - Expired locked object is available for reading (#56)
- Initialize write-cache asynchronously (#32)
### Fixed ### Fixed
- Increase payload size metric on shards' `put` operation (#1794) - Increase payload size metric on shards' `put` operation (#1794)

View file

@ -70,7 +70,7 @@ func (c *cache) flushDB() {
m = m[:0] m = m[:0]
c.modeMtx.RLock() c.modeMtx.RLock()
if c.readOnly() { if c.readOnly() || !c.initialized.Load() {
c.modeMtx.RUnlock() c.modeMtx.RUnlock()
time.Sleep(time.Second) time.Sleep(time.Second)
continue continue
@ -151,6 +151,9 @@ func (c *cache) flushBigObjects() {
if c.readOnly() { if c.readOnly() {
c.modeMtx.RUnlock() c.modeMtx.RUnlock()
break break
} else if !c.initialized.Load() {
c.modeMtx.RUnlock()
continue
} }
_ = c.flushFSTree(true) _ = c.flushFSTree(true)

View file

@ -4,6 +4,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"testing" "testing"
"time"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@ -64,7 +65,7 @@ func TestFlush(t *testing.T) {
WithBlobstor(bs), WithBlobstor(bs),
}, opts...)...) }, opts...)...)
require.NoError(t, wc.Open(false)) require.NoError(t, wc.Open(false))
require.NoError(t, wc.Init()) initWC(t, wc)
// First set mode for metabase and blobstor to prevent background flushes. // First set mode for metabase and blobstor to prevent background flushes.
require.NoError(t, mb.SetMode(mode.ReadOnly)) require.NoError(t, mb.SetMode(mode.ReadOnly))
@ -262,7 +263,7 @@ func TestFlush(t *testing.T) {
// Open in read-only: no error, nothing is removed. // Open in read-only: no error, nothing is removed.
require.NoError(t, wc.Open(true)) require.NoError(t, wc.Open(true))
require.NoError(t, wc.Init()) initWC(t, wc)
for i := range objects { for i := range objects {
_, err := wc.Get(objects[i].addr) _, err := wc.Get(objects[i].addr)
require.NoError(t, err, i) require.NoError(t, err, i)
@ -271,7 +272,7 @@ func TestFlush(t *testing.T) {
// Open in read-write: no error, something is removed. // Open in read-write: no error, something is removed.
require.NoError(t, wc.Open(false)) require.NoError(t, wc.Open(false))
require.NoError(t, wc.Init()) initWC(t, wc)
for i := range objects { for i := range objects {
_, err := wc.Get(objects[i].addr) _, err := wc.Get(objects[i].addr)
if i < 2 { if i < 2 {
@ -316,6 +317,15 @@ func newObject(t *testing.T, size int) (*object.Object, []byte) {
return obj, data return obj, data
} }
func initWC(t *testing.T, wc Cache) {
require.NoError(t, wc.Init())
require.Eventually(t, func() bool {
rawWc := wc.(*cache)
return rawWc.initialized.Load()
}, 100*time.Second, 1*time.Millisecond)
}
type dummyEpoch struct{} type dummyEpoch struct{}
func (dummyEpoch) CurrentEpoch() uint64 { func (dummyEpoch) CurrentEpoch() uint64 {

View file

@ -2,6 +2,7 @@ package writecache
import ( import (
"errors" "errors"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
@ -13,10 +14,57 @@ import (
) )
func (c *cache) initFlushMarks() { func (c *cache) initFlushMarks() {
var localWG sync.WaitGroup
localWG.Add(1)
go func() {
defer localWG.Done()
c.fsTreeFlushMarkUpdate()
}()
localWG.Add(1)
go func() {
defer localWG.Done()
c.dbFlushMarkUpdate()
}()
c.initWG.Add(1)
c.wg.Add(1)
go func() {
defer c.wg.Done()
defer c.initWG.Done()
localWG.Wait()
select {
case <-c.stopInitCh:
return
case <-c.closeCh:
return
default:
}
c.initialized.Store(true)
}()
}
var errStopIter = errors.New("stop iteration")
func (c *cache) fsTreeFlushMarkUpdate() {
c.log.Info("filling flush marks for objects in FSTree") c.log.Info("filling flush marks for objects in FSTree")
var prm common.IteratePrm var prm common.IteratePrm
prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error { prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error {
select {
case <-c.closeCh:
return errStopIter
case <-c.stopInitCh:
return errStopIter
default:
}
flushed, needRemove := c.flushStatus(addr) flushed, needRemove := c.flushStatus(addr)
if flushed { if flushed {
c.store.flushed.Add(addr.EncodeToString(), true) c.store.flushed.Add(addr.EncodeToString(), true)
@ -37,7 +85,10 @@ func (c *cache) initFlushMarks() {
return nil return nil
} }
_, _ = c.fsTree.Iterate(prm) _, _ = c.fsTree.Iterate(prm)
c.log.Info("finished updating FSTree flush marks")
}
func (c *cache) dbFlushMarkUpdate() {
c.log.Info("filling flush marks for objects in database") c.log.Info("filling flush marks for objects in database")
var m []string var m []string
@ -45,6 +96,14 @@ func (c *cache) initFlushMarks() {
var lastKey []byte var lastKey []byte
var batchSize = flushBatchSize var batchSize = flushBatchSize
for { for {
select {
case <-c.closeCh:
return
case <-c.stopInitCh:
return
default:
}
m = m[:0] m = m[:0]
indices = indices[:0] indices = indices[:0]

View file

@ -11,6 +11,9 @@ import (
// ErrReadOnly is returned when Put/Write is performed in a read-only mode. // ErrReadOnly is returned when Put/Write is performed in a read-only mode.
var ErrReadOnly = logicerr.New("write-cache is in read-only mode") var ErrReadOnly = logicerr.New("write-cache is in read-only mode")
// ErrNotInitialized is returned when write-cache is initializing.
var ErrNotInitialized = logicerr.New("write-cache is not initialized yet")
// SetMode sets write-cache mode of operation. // SetMode sets write-cache mode of operation.
// When shard is put in read-only mode all objects in memory are flushed to disk // When shard is put in read-only mode all objects in memory are flushed to disk
// and all background jobs are suspended. // and all background jobs are suspended.
@ -18,15 +21,36 @@ func (c *cache) SetMode(m mode.Mode) error {
c.modeMtx.Lock() c.modeMtx.Lock()
defer c.modeMtx.Unlock() defer c.modeMtx.Unlock()
if m.NoMetabase() && !c.mode.NoMetabase() { return c.setMode(m)
err := c.flush(true) }
// setMode applies new mode. Must be called with cache.modeMtx lock taken.
func (c *cache) setMode(m mode.Mode) error {
var err error
turnOffMeta := m.NoMetabase()
if turnOffMeta && !c.mode.NoMetabase() {
err = c.flush(true)
if err != nil { if err != nil {
return err return err
} }
} }
if !c.initialized.Load() {
close(c.stopInitCh)
c.initWG.Wait()
c.stopInitCh = make(chan struct{})
defer func() {
if err == nil && !turnOffMeta {
c.initFlushMarks()
}
}()
}
if c.db != nil { if c.db != nil {
if err := c.db.Close(); err != nil { if err = c.db.Close(); err != nil {
return fmt.Errorf("can't close write-cache database: %w", err) return fmt.Errorf("can't close write-cache database: %w", err)
} }
} }
@ -39,12 +63,12 @@ func (c *cache) SetMode(m mode.Mode) error {
time.Sleep(time.Second) time.Sleep(time.Second)
} }
if m.NoMetabase() { if turnOffMeta {
c.mode = m c.mode = m
return nil return nil
} }
if err := c.openStore(m.ReadOnly()); err != nil { if err = c.openStore(m.ReadOnly()); err != nil {
return err return err
} }

View file

@ -16,11 +16,18 @@ var (
) )
// Put puts object to write-cache. // Put puts object to write-cache.
//
// Returns ErrReadOnly if write-cache is in R/O mode.
// Returns ErrNotInitialized if write-cache has not been initialized yet.
// Returns ErrOutOfSpace if saving an object leads to WC's size overflow.
// Returns ErrBigObject if an objects exceeds maximum object size.
func (c *cache) Put(prm common.PutPrm) (common.PutRes, error) { func (c *cache) Put(prm common.PutPrm) (common.PutRes, error) {
c.modeMtx.RLock() c.modeMtx.RLock()
defer c.modeMtx.RUnlock() defer c.modeMtx.RUnlock()
if c.readOnly() { if c.readOnly() {
return common.PutRes{}, ErrReadOnly return common.PutRes{}, ErrReadOnly
} else if !c.initialized.Load() {
return common.PutRes{}, ErrNotInitialized
} }
sz := uint64(len(prm.RawData)) sz := uint64(len(prm.RawData))

View file

@ -76,6 +76,9 @@ func (c *cache) openStore(readOnly bool) error {
if c.flushed == nil { if c.flushed == nil {
c.flushed, _ = lru.NewWithEvict[string, bool](c.maxFlushedMarksCount, c.removeFlushed) c.flushed, _ = lru.NewWithEvict[string, bool](c.maxFlushedMarksCount, c.removeFlushed)
} }
c.initialized.Store(false)
return nil return nil
} }

View file

@ -10,6 +10,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
"go.uber.org/atomic"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -48,8 +49,11 @@ type cache struct {
// mtx protects statistics, counters and compressFlags. // mtx protects statistics, counters and compressFlags.
mtx sync.RWMutex mtx sync.RWMutex
mode mode.Mode mode mode.Mode
modeMtx sync.RWMutex initialized atomic.Bool
stopInitCh chan struct{} // used to sync initWG initialisation routines and _only_ them
initWG sync.WaitGroup // for initialisation routines only
modeMtx sync.RWMutex
// compressFlags maps address of a big object to boolean value indicating // compressFlags maps address of a big object to boolean value indicating
// whether object should be compressed. // whether object should be compressed.
@ -57,7 +61,7 @@ type cache struct {
// flushCh is a channel with objects to flush. // flushCh is a channel with objects to flush.
flushCh chan *object.Object flushCh chan *object.Object
// closeCh is close channel. // closeCh is close channel, protected by modeMtx.
closeCh chan struct{} closeCh chan struct{}
// wg is a wait group for flush workers. // wg is a wait group for flush workers.
wg sync.WaitGroup wg sync.WaitGroup
@ -89,8 +93,9 @@ var (
// New creates new writecache instance. // New creates new writecache instance.
func New(opts ...Option) Cache { func New(opts ...Option) Cache {
c := &cache{ c := &cache{
flushCh: make(chan *object.Object), flushCh: make(chan *object.Object),
mode: mode.ReadWrite, mode: mode.ReadWrite,
stopInitCh: make(chan struct{}),
compressFlags: make(map[string]struct{}), compressFlags: make(map[string]struct{}),
options: options{ options: options{
@ -151,8 +156,11 @@ func (c *cache) Init() error {
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op. // Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
func (c *cache) Close() error { func (c *cache) Close() error {
c.modeMtx.Lock()
defer c.modeMtx.Unlock()
// Finish all in-progress operations. // Finish all in-progress operations.
if err := c.SetMode(mode.ReadOnly); err != nil { if err := c.setMode(mode.ReadOnly); err != nil {
return err return err
} }
@ -164,6 +172,8 @@ func (c *cache) Close() error {
c.closeCh = nil c.closeCh = nil
} }
c.initialized.Store(false)
var err error var err error
if c.db != nil { if c.db != nil {
err = c.db.Close() err = c.db.Close()