Init write-cache asynchronously #35

Merged
carpawell merged 2 commits from carpawell/faster-wc-init into master 2023-03-09 11:07:34 +00:00
9 changed files with 133 additions and 15 deletions

View file

@ -23,6 +23,7 @@ Changelog for FrostFS Node
- Storage engine now can start even when some shard components are unavailable (#2238)
- `neofs-cli` buffer for object put increased from 4 KiB to 3 MiB (#2243)
- Expired locked object is available for reading (#56)
- Initialize write-cache asynchronously (#32)
### Fixed
- Increase payload size metric on shards' `put` operation (#1794)

View file

@ -106,6 +106,7 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) {
require.NoError(t, s.Open(false))
require.NoError(t, s.Init())
require.NoError(t, s.SetMode(m))
require.NoError(t, s.Close())
})
}

View file

@ -70,7 +70,7 @@ func (c *cache) flushDB() {
m = m[:0]
c.modeMtx.RLock()
if c.readOnly() {
if c.readOnly() || !c.initialized.Load() {
c.modeMtx.RUnlock()
time.Sleep(time.Second)
continue
@ -151,6 +151,9 @@ func (c *cache) flushBigObjects() {
if c.readOnly() {
c.modeMtx.RUnlock()
break
} else if !c.initialized.Load() {
c.modeMtx.RUnlock()
continue
}
_ = c.flushFSTree(true)

View file

@ -4,6 +4,7 @@ import (
"os"
"path/filepath"
"testing"
"time"
objectCore "github.com/TrueCloudLab/frostfs-node/pkg/core/object"
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@ -64,7 +65,7 @@ func TestFlush(t *testing.T) {
WithBlobstor(bs),
}, opts...)...)
require.NoError(t, wc.Open(false))
require.NoError(t, wc.Init())
initWC(t, wc)
// First set mode for metabase and blobstor to prevent background flushes.
require.NoError(t, mb.SetMode(mode.ReadOnly))
@ -262,7 +263,7 @@ func TestFlush(t *testing.T) {
// Open in read-only: no error, nothing is removed.
require.NoError(t, wc.Open(true))
require.NoError(t, wc.Init())
initWC(t, wc)
for i := range objects {
_, err := wc.Get(objects[i].addr)
require.NoError(t, err, i)
@ -271,7 +272,7 @@ func TestFlush(t *testing.T) {
// Open in read-write: no error, something is removed.
require.NoError(t, wc.Open(false))
require.NoError(t, wc.Init())
initWC(t, wc)
for i := range objects {
_, err := wc.Get(objects[i].addr)
if i < 2 {
@ -316,6 +317,15 @@ func newObject(t *testing.T, size int) (*object.Object, []byte) {
return obj, data
fyrchik commented 2023-02-16 06:08:51 +00:00 (Migrated from github.com)
Review

Is it enough with -race flag?

Is it enough with `-race` flag?
fyrchik commented 2023-02-16 06:14:47 +00:00 (Migrated from github.com)
Review

The tests are in the writecache package, can we just directly check for initialized flag?

The tests are in the `writecache` package, can we just directly check for `initialized` flag?
carpawell commented 2023-02-17 14:00:18 +00:00 (Migrated from github.com)
Review

it was not but because of another reason, fixed races

it was not but because of another reason, fixed races
}
func initWC(t *testing.T, wc Cache) {
require.NoError(t, wc.Init())
require.Eventually(t, func() bool {
rawWc := wc.(*cache)
return rawWc.initialized.Load()
}, 100*time.Second, 1*time.Millisecond)
}
type dummyEpoch struct{}
func (dummyEpoch) CurrentEpoch() uint64 {

View file

@ -2,6 +2,7 @@ package writecache
import (
"errors"
"sync"
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
storagelog "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
@ -13,10 +14,57 @@ import (
)
func (c *cache) initFlushMarks() {
var localWG sync.WaitGroup
localWG.Add(1)
go func() {
defer localWG.Done()
c.fsTreeFlushMarkUpdate()
}()
localWG.Add(1)
go func() {
defer localWG.Done()
c.dbFlushMarkUpdate()
}()
c.initWG.Add(1)
c.wg.Add(1)
go func() {
defer c.wg.Done()
defer c.initWG.Done()
localWG.Wait()
select {
case <-c.stopInitCh:
fyrchik commented 2023-01-31 08:45:12 +00:00 (Migrated from github.com)
Review

I think this should be added in c.wg too, no?

I think this should be added in `c.wg` too, no?
carpawell commented 2023-01-31 12:20:03 +00:00 (Migrated from github.com)
Review

hw, yeah, would be more reliable, added

hw, yeah, would be more reliable, added
return
case <-c.closeCh:
fyrchik commented 2023-02-16 06:10:10 +00:00 (Migrated from github.com)
Review

If we use c.wg here, do we need to use it in 2 previous goroutines?

If we use `c.wg` here, do we need to use it in 2 previous goroutines?
return
default:
}
c.initialized.Store(true)
}()
}
var errStopIter = errors.New("stop iteration")
func (c *cache) fsTreeFlushMarkUpdate() {
c.log.Info("filling flush marks for objects in FSTree")
var prm common.IteratePrm
prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error {
select {
case <-c.closeCh:
return errStopIter
case <-c.stopInitCh:
return errStopIter
default:
}
flushed, needRemove := c.flushStatus(addr)
if flushed {
fyrchik commented 2023-02-16 06:11:02 +00:00 (Migrated from github.com)
Review

It is enough for setMode to work too?

It is enough for `setMode` to work too?
carpawell commented 2023-02-17 14:00:15 +00:00 (Migrated from github.com)
Review

was not, fixed

was not, fixed
c.store.flushed.Add(addr.EncodeToString(), true)
@ -37,7 +85,10 @@ func (c *cache) initFlushMarks() {
return nil
}
_, _ = c.fsTree.Iterate(prm)
c.log.Info("finished updating FSTree flush marks")
}
func (c *cache) dbFlushMarkUpdate() {
c.log.Info("filling flush marks for objects in database")
var m []string
@ -45,6 +96,14 @@ func (c *cache) initFlushMarks() {
var lastKey []byte
var batchSize = flushBatchSize
for {
select {
case <-c.closeCh:
return
case <-c.stopInitCh:
return
default:
}
m = m[:0]
indices = indices[:0]

View file

@ -11,6 +11,9 @@ import (
// ErrReadOnly is returned when Put/Write is performed in a read-only mode.
var ErrReadOnly = logicerr.New("write-cache is in read-only mode")
// ErrNotInitialized is returned when write-cache is initializing.
var ErrNotInitialized = logicerr.New("write-cache is not initialized yet")
// SetMode sets write-cache mode of operation.
// When shard is put in read-only mode all objects in memory are flushed to disk
// and all background jobs are suspended.
@ -18,15 +21,36 @@ func (c *cache) SetMode(m mode.Mode) error {
c.modeMtx.Lock()
defer c.modeMtx.Unlock()
if m.NoMetabase() && !c.mode.NoMetabase() {
err := c.flush(true)
return c.setMode(m)
}
// setMode applies new mode. Must be called with cache.modeMtx lock taken.
func (c *cache) setMode(m mode.Mode) error {
var err error
turnOffMeta := m.NoMetabase()
if turnOffMeta && !c.mode.NoMetabase() {
err = c.flush(true)
if err != nil {
return err
}
}
if !c.initialized.Load() {
close(c.stopInitCh)
c.initWG.Wait()
c.stopInitCh = make(chan struct{})
defer func() {
fyrchik commented 2023-02-20 11:36:10 +00:00 (Migrated from github.com)
Review

The order is really important, can you mention that stopInitCh must be used only in initWG goroutines, to avoid races?

The order is really important, can you mention that `stopInitCh` must be used only in `initWG` goroutines, to avoid races?
carpawell commented 2023-02-28 20:05:11 +00:00 (Migrated from github.com)
Review

sure, added comments to that fields

sure, added comments to that fields
if err == nil && !turnOffMeta {
c.initFlushMarks()
}
}()
}
if c.db != nil {
if err := c.db.Close(); err != nil {
if err = c.db.Close(); err != nil {
return fmt.Errorf("can't close write-cache database: %w", err)
}
}
@ -39,12 +63,12 @@ func (c *cache) SetMode(m mode.Mode) error {
time.Sleep(time.Second)
}
if m.NoMetabase() {
if turnOffMeta {
c.mode = m
return nil
}
if err := c.openStore(m.ReadOnly()); err != nil {
if err = c.openStore(m.ReadOnly()); err != nil {
return err
}

View file

@ -16,11 +16,18 @@ var (
)
// Put puts object to write-cache.
//
// Returns ErrReadOnly if write-cache is in R/O mode.
// Returns ErrNotInitialized if write-cache has not been initialized yet.
// Returns ErrOutOfSpace if saving an object leads to WC's size overflow.
// Returns ErrBigObject if an objects exceeds maximum object size.
func (c *cache) Put(prm common.PutPrm) (common.PutRes, error) {
c.modeMtx.RLock()
defer c.modeMtx.RUnlock()
if c.readOnly() {
return common.PutRes{}, ErrReadOnly
} else if !c.initialized.Load() {
return common.PutRes{}, ErrNotInitialized
}
sz := uint64(len(prm.RawData))

View file

@ -76,6 +76,9 @@ func (c *cache) openStore(readOnly bool) error {
if c.flushed == nil {
c.flushed, _ = lru.NewWithEvict[string, bool](c.maxFlushedMarksCount, c.removeFlushed)
}
c.initialized.Store(false)
return nil
}

View file

@ -10,6 +10,7 @@ import (
"github.com/TrueCloudLab/frostfs-sdk-go/object"
oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.uber.org/atomic"
"go.uber.org/zap"
)
@ -48,8 +49,11 @@ type cache struct {
// mtx protects statistics, counters and compressFlags.
mtx sync.RWMutex
mode mode.Mode
modeMtx sync.RWMutex
mode mode.Mode
initialized atomic.Bool
stopInitCh chan struct{} // used to sync initWG initialisation routines and _only_ them
initWG sync.WaitGroup // for initialisation routines only
modeMtx sync.RWMutex
// compressFlags maps address of a big object to boolean value indicating
// whether object should be compressed.
@ -57,7 +61,7 @@ type cache struct {
// flushCh is a channel with objects to flush.
flushCh chan *object.Object
// closeCh is close channel.
// closeCh is close channel, protected by modeMtx.
closeCh chan struct{}
// wg is a wait group for flush workers.
wg sync.WaitGroup
@ -89,8 +93,9 @@ var (
// New creates new writecache instance.
func New(opts ...Option) Cache {
c := &cache{
flushCh: make(chan *object.Object),
mode: mode.ReadWrite,
flushCh: make(chan *object.Object),
mode: mode.ReadWrite,
stopInitCh: make(chan struct{}),
compressFlags: make(map[string]struct{}),
options: options{
@ -151,8 +156,11 @@ func (c *cache) Init() error {
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
func (c *cache) Close() error {
c.modeMtx.Lock()
defer c.modeMtx.Unlock()
// Finish all in-progress operations.
if err := c.SetMode(mode.ReadOnly); err != nil {
if err := c.setMode(mode.ReadOnly); err != nil {
return err
fyrchik commented 2023-02-16 06:13:19 +00:00 (Migrated from github.com)
Review

Is this line necessary?

Is this line necessary?
carpawell commented 2023-02-17 14:00:13 +00:00 (Migrated from github.com)
Review

you mean the empty one?

you mean the empty one?
}
@ -164,6 +172,8 @@ func (c *cache) Close() error {
c.closeCh = nil
}
c.initialized.Store(false)
var err error
if c.db != nil {
err = c.db.Close()