Init write-cache asynchronously #35
9 changed files with 133 additions and 15 deletions
|
@ -23,6 +23,7 @@ Changelog for FrostFS Node
|
||||||
- Storage engine now can start even when some shard components are unavailable (#2238)
|
- Storage engine now can start even when some shard components are unavailable (#2238)
|
||||||
- `neofs-cli` buffer for object put increased from 4 KiB to 3 MiB (#2243)
|
- `neofs-cli` buffer for object put increased from 4 KiB to 3 MiB (#2243)
|
||||||
- Expired locked object is available for reading (#56)
|
- Expired locked object is available for reading (#56)
|
||||||
|
- Initialize write-cache asynchronously (#32)
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
- Increase payload size metric on shards' `put` operation (#1794)
|
- Increase payload size metric on shards' `put` operation (#1794)
|
||||||
|
|
|
@ -106,6 +106,7 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) {
|
||||||
require.NoError(t, s.Open(false))
|
require.NoError(t, s.Open(false))
|
||||||
require.NoError(t, s.Init())
|
require.NoError(t, s.Init())
|
||||||
require.NoError(t, s.SetMode(m))
|
require.NoError(t, s.SetMode(m))
|
||||||
|
require.NoError(t, s.Close())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -70,7 +70,7 @@ func (c *cache) flushDB() {
|
||||||
m = m[:0]
|
m = m[:0]
|
||||||
|
|
||||||
c.modeMtx.RLock()
|
c.modeMtx.RLock()
|
||||||
if c.readOnly() {
|
if c.readOnly() || !c.initialized.Load() {
|
||||||
c.modeMtx.RUnlock()
|
c.modeMtx.RUnlock()
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
continue
|
continue
|
||||||
|
@ -151,6 +151,9 @@ func (c *cache) flushBigObjects() {
|
||||||
if c.readOnly() {
|
if c.readOnly() {
|
||||||
c.modeMtx.RUnlock()
|
c.modeMtx.RUnlock()
|
||||||
break
|
break
|
||||||
|
} else if !c.initialized.Load() {
|
||||||
|
c.modeMtx.RUnlock()
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
_ = c.flushFSTree(true)
|
_ = c.flushFSTree(true)
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
objectCore "github.com/TrueCloudLab/frostfs-node/pkg/core/object"
|
objectCore "github.com/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||||
|
@ -64,7 +65,7 @@ func TestFlush(t *testing.T) {
|
||||||
WithBlobstor(bs),
|
WithBlobstor(bs),
|
||||||
}, opts...)...)
|
}, opts...)...)
|
||||||
require.NoError(t, wc.Open(false))
|
require.NoError(t, wc.Open(false))
|
||||||
require.NoError(t, wc.Init())
|
initWC(t, wc)
|
||||||
|
|
||||||
// First set mode for metabase and blobstor to prevent background flushes.
|
// First set mode for metabase and blobstor to prevent background flushes.
|
||||||
require.NoError(t, mb.SetMode(mode.ReadOnly))
|
require.NoError(t, mb.SetMode(mode.ReadOnly))
|
||||||
|
@ -262,7 +263,7 @@ func TestFlush(t *testing.T) {
|
||||||
|
|
||||||
// Open in read-only: no error, nothing is removed.
|
// Open in read-only: no error, nothing is removed.
|
||||||
require.NoError(t, wc.Open(true))
|
require.NoError(t, wc.Open(true))
|
||||||
require.NoError(t, wc.Init())
|
initWC(t, wc)
|
||||||
for i := range objects {
|
for i := range objects {
|
||||||
_, err := wc.Get(objects[i].addr)
|
_, err := wc.Get(objects[i].addr)
|
||||||
require.NoError(t, err, i)
|
require.NoError(t, err, i)
|
||||||
|
@ -271,7 +272,7 @@ func TestFlush(t *testing.T) {
|
||||||
|
|
||||||
// Open in read-write: no error, something is removed.
|
// Open in read-write: no error, something is removed.
|
||||||
require.NoError(t, wc.Open(false))
|
require.NoError(t, wc.Open(false))
|
||||||
require.NoError(t, wc.Init())
|
initWC(t, wc)
|
||||||
for i := range objects {
|
for i := range objects {
|
||||||
_, err := wc.Get(objects[i].addr)
|
_, err := wc.Get(objects[i].addr)
|
||||||
if i < 2 {
|
if i < 2 {
|
||||||
|
@ -316,6 +317,15 @@ func newObject(t *testing.T, size int) (*object.Object, []byte) {
|
||||||
return obj, data
|
return obj, data
|
||||||
it was not but because of another reason, fixed races it was not but because of another reason, fixed races
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
func initWC(t *testing.T, wc Cache) {
|
||||||
|
require.NoError(t, wc.Init())
|
||||||
|
|
||||||
|
require.Eventually(t, func() bool {
|
||||||
|
rawWc := wc.(*cache)
|
||||||
|
return rawWc.initialized.Load()
|
||||||
|
}, 100*time.Second, 1*time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
type dummyEpoch struct{}
|
type dummyEpoch struct{}
|
||||||
|
|
||||||
func (dummyEpoch) CurrentEpoch() uint64 {
|
func (dummyEpoch) CurrentEpoch() uint64 {
|
||||||
|
|
|
@ -2,6 +2,7 @@ package writecache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
storagelog "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
storagelog "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
||||||
|
@ -13,10 +14,57 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *cache) initFlushMarks() {
|
func (c *cache) initFlushMarks() {
|
||||||
|
var localWG sync.WaitGroup
|
||||||
|
|
||||||
|
localWG.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer localWG.Done()
|
||||||
|
|
||||||
|
c.fsTreeFlushMarkUpdate()
|
||||||
|
}()
|
||||||
|
|
||||||
|
localWG.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer localWG.Done()
|
||||||
|
|
||||||
|
c.dbFlushMarkUpdate()
|
||||||
|
}()
|
||||||
|
|
||||||
|
c.initWG.Add(1)
|
||||||
|
c.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer c.wg.Done()
|
||||||
|
defer c.initWG.Done()
|
||||||
|
|
||||||
|
localWG.Wait()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-c.stopInitCh:
|
||||||
I think this should be added in I think this should be added in `c.wg` too, no?
hw, yeah, would be more reliable, added hw, yeah, would be more reliable, added
|
|||||||
|
return
|
||||||
|
case <-c.closeCh:
|
||||||
If we use If we use `c.wg` here, do we need to use it in 2 previous goroutines?
|
|||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
c.initialized.Store(true)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
var errStopIter = errors.New("stop iteration")
|
||||||
|
|
||||||
|
func (c *cache) fsTreeFlushMarkUpdate() {
|
||||||
c.log.Info("filling flush marks for objects in FSTree")
|
c.log.Info("filling flush marks for objects in FSTree")
|
||||||
|
|
||||||
var prm common.IteratePrm
|
var prm common.IteratePrm
|
||||||
prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error {
|
prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error {
|
||||||
|
select {
|
||||||
|
case <-c.closeCh:
|
||||||
|
return errStopIter
|
||||||
|
case <-c.stopInitCh:
|
||||||
|
return errStopIter
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
flushed, needRemove := c.flushStatus(addr)
|
flushed, needRemove := c.flushStatus(addr)
|
||||||
if flushed {
|
if flushed {
|
||||||
It is enough for It is enough for `setMode` to work too?
was not, fixed was not, fixed
|
|||||||
c.store.flushed.Add(addr.EncodeToString(), true)
|
c.store.flushed.Add(addr.EncodeToString(), true)
|
||||||
|
@ -37,7 +85,10 @@ func (c *cache) initFlushMarks() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
_, _ = c.fsTree.Iterate(prm)
|
_, _ = c.fsTree.Iterate(prm)
|
||||||
|
c.log.Info("finished updating FSTree flush marks")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cache) dbFlushMarkUpdate() {
|
||||||
c.log.Info("filling flush marks for objects in database")
|
c.log.Info("filling flush marks for objects in database")
|
||||||
|
|
||||||
var m []string
|
var m []string
|
||||||
|
@ -45,6 +96,14 @@ func (c *cache) initFlushMarks() {
|
||||||
var lastKey []byte
|
var lastKey []byte
|
||||||
var batchSize = flushBatchSize
|
var batchSize = flushBatchSize
|
||||||
for {
|
for {
|
||||||
|
select {
|
||||||
|
case <-c.closeCh:
|
||||||
|
return
|
||||||
|
case <-c.stopInitCh:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
m = m[:0]
|
m = m[:0]
|
||||||
indices = indices[:0]
|
indices = indices[:0]
|
||||||
|
|
||||||
|
|
|
@ -11,6 +11,9 @@ import (
|
||||||
// ErrReadOnly is returned when Put/Write is performed in a read-only mode.
|
// ErrReadOnly is returned when Put/Write is performed in a read-only mode.
|
||||||
var ErrReadOnly = logicerr.New("write-cache is in read-only mode")
|
var ErrReadOnly = logicerr.New("write-cache is in read-only mode")
|
||||||
|
|
||||||
|
// ErrNotInitialized is returned when write-cache is initializing.
|
||||||
|
var ErrNotInitialized = logicerr.New("write-cache is not initialized yet")
|
||||||
|
|
||||||
// SetMode sets write-cache mode of operation.
|
// SetMode sets write-cache mode of operation.
|
||||||
// When shard is put in read-only mode all objects in memory are flushed to disk
|
// When shard is put in read-only mode all objects in memory are flushed to disk
|
||||||
// and all background jobs are suspended.
|
// and all background jobs are suspended.
|
||||||
|
@ -18,15 +21,36 @@ func (c *cache) SetMode(m mode.Mode) error {
|
||||||
c.modeMtx.Lock()
|
c.modeMtx.Lock()
|
||||||
defer c.modeMtx.Unlock()
|
defer c.modeMtx.Unlock()
|
||||||
|
|
||||||
if m.NoMetabase() && !c.mode.NoMetabase() {
|
return c.setMode(m)
|
||||||
err := c.flush(true)
|
}
|
||||||
|
|
||||||
|
// setMode applies new mode. Must be called with cache.modeMtx lock taken.
|
||||||
|
func (c *cache) setMode(m mode.Mode) error {
|
||||||
|
var err error
|
||||||
|
turnOffMeta := m.NoMetabase()
|
||||||
|
|
||||||
|
if turnOffMeta && !c.mode.NoMetabase() {
|
||||||
|
err = c.flush(true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !c.initialized.Load() {
|
||||||
|
close(c.stopInitCh)
|
||||||
|
|
||||||
|
c.initWG.Wait()
|
||||||
|
c.stopInitCh = make(chan struct{})
|
||||||
|
|
||||||
|
defer func() {
|
||||||
The order is really important, can you mention that The order is really important, can you mention that `stopInitCh` must be used only in `initWG` goroutines, to avoid races?
sure, added comments to that fields sure, added comments to that fields
|
|||||||
|
if err == nil && !turnOffMeta {
|
||||||
|
c.initFlushMarks()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
if c.db != nil {
|
if c.db != nil {
|
||||||
if err := c.db.Close(); err != nil {
|
if err = c.db.Close(); err != nil {
|
||||||
return fmt.Errorf("can't close write-cache database: %w", err)
|
return fmt.Errorf("can't close write-cache database: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -39,12 +63,12 @@ func (c *cache) SetMode(m mode.Mode) error {
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
}
|
}
|
||||||
|
|
||||||
if m.NoMetabase() {
|
if turnOffMeta {
|
||||||
c.mode = m
|
c.mode = m
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.openStore(m.ReadOnly()); err != nil {
|
if err = c.openStore(m.ReadOnly()); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,11 +16,18 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
// Put puts object to write-cache.
|
// Put puts object to write-cache.
|
||||||
|
//
|
||||||
|
// Returns ErrReadOnly if write-cache is in R/O mode.
|
||||||
|
// Returns ErrNotInitialized if write-cache has not been initialized yet.
|
||||||
|
// Returns ErrOutOfSpace if saving an object leads to WC's size overflow.
|
||||||
|
// Returns ErrBigObject if an objects exceeds maximum object size.
|
||||||
func (c *cache) Put(prm common.PutPrm) (common.PutRes, error) {
|
func (c *cache) Put(prm common.PutPrm) (common.PutRes, error) {
|
||||||
c.modeMtx.RLock()
|
c.modeMtx.RLock()
|
||||||
defer c.modeMtx.RUnlock()
|
defer c.modeMtx.RUnlock()
|
||||||
if c.readOnly() {
|
if c.readOnly() {
|
||||||
return common.PutRes{}, ErrReadOnly
|
return common.PutRes{}, ErrReadOnly
|
||||||
|
} else if !c.initialized.Load() {
|
||||||
|
return common.PutRes{}, ErrNotInitialized
|
||||||
}
|
}
|
||||||
|
|
||||||
sz := uint64(len(prm.RawData))
|
sz := uint64(len(prm.RawData))
|
||||||
|
|
|
@ -76,6 +76,9 @@ func (c *cache) openStore(readOnly bool) error {
|
||||||
if c.flushed == nil {
|
if c.flushed == nil {
|
||||||
c.flushed, _ = lru.NewWithEvict[string, bool](c.maxFlushedMarksCount, c.removeFlushed)
|
c.flushed, _ = lru.NewWithEvict[string, bool](c.maxFlushedMarksCount, c.removeFlushed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.initialized.Store(false)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"github.com/TrueCloudLab/frostfs-sdk-go/object"
|
"github.com/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
|
"go.uber.org/atomic"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -48,8 +49,11 @@ type cache struct {
|
||||||
// mtx protects statistics, counters and compressFlags.
|
// mtx protects statistics, counters and compressFlags.
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
|
|
||||||
mode mode.Mode
|
mode mode.Mode
|
||||||
modeMtx sync.RWMutex
|
initialized atomic.Bool
|
||||||
|
stopInitCh chan struct{} // used to sync initWG initialisation routines and _only_ them
|
||||||
|
initWG sync.WaitGroup // for initialisation routines only
|
||||||
|
modeMtx sync.RWMutex
|
||||||
|
|
||||||
// compressFlags maps address of a big object to boolean value indicating
|
// compressFlags maps address of a big object to boolean value indicating
|
||||||
// whether object should be compressed.
|
// whether object should be compressed.
|
||||||
|
@ -57,7 +61,7 @@ type cache struct {
|
||||||
|
|
||||||
// flushCh is a channel with objects to flush.
|
// flushCh is a channel with objects to flush.
|
||||||
flushCh chan *object.Object
|
flushCh chan *object.Object
|
||||||
// closeCh is close channel.
|
// closeCh is close channel, protected by modeMtx.
|
||||||
closeCh chan struct{}
|
closeCh chan struct{}
|
||||||
// wg is a wait group for flush workers.
|
// wg is a wait group for flush workers.
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
@ -89,8 +93,9 @@ var (
|
||||||
// New creates new writecache instance.
|
// New creates new writecache instance.
|
||||||
func New(opts ...Option) Cache {
|
func New(opts ...Option) Cache {
|
||||||
c := &cache{
|
c := &cache{
|
||||||
flushCh: make(chan *object.Object),
|
flushCh: make(chan *object.Object),
|
||||||
mode: mode.ReadWrite,
|
mode: mode.ReadWrite,
|
||||||
|
stopInitCh: make(chan struct{}),
|
||||||
|
|
||||||
compressFlags: make(map[string]struct{}),
|
compressFlags: make(map[string]struct{}),
|
||||||
options: options{
|
options: options{
|
||||||
|
@ -151,8 +156,11 @@ func (c *cache) Init() error {
|
||||||
|
|
||||||
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
|
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
|
||||||
func (c *cache) Close() error {
|
func (c *cache) Close() error {
|
||||||
|
c.modeMtx.Lock()
|
||||||
|
defer c.modeMtx.Unlock()
|
||||||
|
|
||||||
// Finish all in-progress operations.
|
// Finish all in-progress operations.
|
||||||
if err := c.SetMode(mode.ReadOnly); err != nil {
|
if err := c.setMode(mode.ReadOnly); err != nil {
|
||||||
return err
|
return err
|
||||||
Is this line necessary? Is this line necessary?
you mean the empty one? you mean the empty one?
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,6 +172,8 @@ func (c *cache) Close() error {
|
||||||
c.closeCh = nil
|
c.closeCh = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.initialized.Store(false)
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
if c.db != nil {
|
if c.db != nil {
|
||||||
err = c.db.Close()
|
err = c.db.Close()
|
||||||
|
|
Loading…
Reference in a new issue
Is it enough with
-race
flag?The tests are in the
writecache
package, can we just directly check forinitialized
flag?