forked from TrueCloudLab/frostfs-node
Compare commits
2 commits
master
...
carpawell/
Author | SHA1 | Date | |
---|---|---|---|
|
bec577b6e9 | ||
|
b52fada6da |
10 changed files with 137 additions and 22 deletions
|
@ -23,6 +23,7 @@ Changelog for FrostFS Node
|
|||
- Storage engine now can start even when some shard components are unavailable (#2238)
|
||||
- `neofs-cli` buffer for object put increased from 4 KiB to 3 MiB (#2243)
|
||||
- Expired locked object is available for reading (#56)
|
||||
- Initialize write-cache asynchronously (#32)
|
||||
|
||||
### Fixed
|
||||
- Increase payload size metric on shards' `put` operation (#1794)
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
containerApi "github.com/TrueCloudLab/frostfs-api-go/v2/container"
|
||||
internalclient "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
|
||||
"github.com/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
|
||||
"github.com/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
||||
|
@ -218,12 +217,10 @@ func parseAttributes(dst *container.Container, attributes []string) error {
|
|||
container.SetName(dst, containerName)
|
||||
}
|
||||
|
||||
if containerNnsName != "" {
|
||||
dst.SetAttribute(containerApi.SysAttributeName, containerNnsName)
|
||||
}
|
||||
if containerNnsZone != "" {
|
||||
dst.SetAttribute(containerApi.SysAttributeZone, containerNnsZone)
|
||||
}
|
||||
var domain container.Domain
|
||||
domain.SetName(containerNnsName)
|
||||
domain.SetZone(containerNnsZone)
|
||||
container.WriteDomain(dst, domain)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -106,6 +106,7 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) {
|
|||
require.NoError(t, s.Open(false))
|
||||
require.NoError(t, s.Init())
|
||||
require.NoError(t, s.SetMode(m))
|
||||
require.NoError(t, s.Close())
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -70,7 +70,7 @@ func (c *cache) flushDB() {
|
|||
m = m[:0]
|
||||
|
||||
c.modeMtx.RLock()
|
||||
if c.readOnly() {
|
||||
if c.readOnly() || !c.initialized.Load() {
|
||||
c.modeMtx.RUnlock()
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
|
@ -151,6 +151,9 @@ func (c *cache) flushBigObjects() {
|
|||
if c.readOnly() {
|
||||
c.modeMtx.RUnlock()
|
||||
break
|
||||
} else if !c.initialized.Load() {
|
||||
c.modeMtx.RUnlock()
|
||||
continue
|
||||
}
|
||||
|
||||
_ = c.flushFSTree(true)
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
objectCore "github.com/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
|
@ -64,7 +65,7 @@ func TestFlush(t *testing.T) {
|
|||
WithBlobstor(bs),
|
||||
}, opts...)...)
|
||||
require.NoError(t, wc.Open(false))
|
||||
require.NoError(t, wc.Init())
|
||||
initWC(t, wc)
|
||||
|
||||
// First set mode for metabase and blobstor to prevent background flushes.
|
||||
require.NoError(t, mb.SetMode(mode.ReadOnly))
|
||||
|
@ -262,7 +263,7 @@ func TestFlush(t *testing.T) {
|
|||
|
||||
// Open in read-only: no error, nothing is removed.
|
||||
require.NoError(t, wc.Open(true))
|
||||
require.NoError(t, wc.Init())
|
||||
initWC(t, wc)
|
||||
for i := range objects {
|
||||
_, err := wc.Get(objects[i].addr)
|
||||
require.NoError(t, err, i)
|
||||
|
@ -271,7 +272,7 @@ func TestFlush(t *testing.T) {
|
|||
|
||||
// Open in read-write: no error, something is removed.
|
||||
require.NoError(t, wc.Open(false))
|
||||
require.NoError(t, wc.Init())
|
||||
initWC(t, wc)
|
||||
for i := range objects {
|
||||
_, err := wc.Get(objects[i].addr)
|
||||
if i < 2 {
|
||||
|
@ -316,6 +317,15 @@ func newObject(t *testing.T, size int) (*object.Object, []byte) {
|
|||
return obj, data
|
||||
}
|
||||
|
||||
func initWC(t *testing.T, wc Cache) {
|
||||
require.NoError(t, wc.Init())
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
rawWc := wc.(*cache)
|
||||
return rawWc.initialized.Load()
|
||||
}, 100*time.Second, 1*time.Millisecond)
|
||||
}
|
||||
|
||||
type dummyEpoch struct{}
|
||||
|
||||
func (dummyEpoch) CurrentEpoch() uint64 {
|
||||
|
|
|
@ -2,6 +2,7 @@ package writecache
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
storagelog "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
||||
|
@ -13,10 +14,57 @@ import (
|
|||
)
|
||||
|
||||
func (c *cache) initFlushMarks() {
|
||||
var localWG sync.WaitGroup
|
||||
|
||||
localWG.Add(1)
|
||||
go func() {
|
||||
defer localWG.Done()
|
||||
|
||||
c.fsTreeFlushMarkUpdate()
|
||||
}()
|
||||
|
||||
localWG.Add(1)
|
||||
go func() {
|
||||
defer localWG.Done()
|
||||
|
||||
c.dbFlushMarkUpdate()
|
||||
}()
|
||||
|
||||
c.initWG.Add(1)
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
defer c.wg.Done()
|
||||
defer c.initWG.Done()
|
||||
|
||||
localWG.Wait()
|
||||
|
||||
select {
|
||||
case <-c.stopInitCh:
|
||||
return
|
||||
case <-c.closeCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
c.initialized.Store(true)
|
||||
}()
|
||||
}
|
||||
|
||||
var errStopIter = errors.New("stop iteration")
|
||||
|
||||
func (c *cache) fsTreeFlushMarkUpdate() {
|
||||
c.log.Info("filling flush marks for objects in FSTree")
|
||||
|
||||
var prm common.IteratePrm
|
||||
prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error {
|
||||
select {
|
||||
case <-c.closeCh:
|
||||
return errStopIter
|
||||
case <-c.stopInitCh:
|
||||
return errStopIter
|
||||
default:
|
||||
}
|
||||
|
||||
flushed, needRemove := c.flushStatus(addr)
|
||||
if flushed {
|
||||
c.store.flushed.Add(addr.EncodeToString(), true)
|
||||
|
@ -37,7 +85,10 @@ func (c *cache) initFlushMarks() {
|
|||
return nil
|
||||
}
|
||||
_, _ = c.fsTree.Iterate(prm)
|
||||
c.log.Info("finished updating FSTree flush marks")
|
||||
}
|
||||
|
||||
func (c *cache) dbFlushMarkUpdate() {
|
||||
c.log.Info("filling flush marks for objects in database")
|
||||
|
||||
var m []string
|
||||
|
@ -45,6 +96,14 @@ func (c *cache) initFlushMarks() {
|
|||
var lastKey []byte
|
||||
var batchSize = flushBatchSize
|
||||
for {
|
||||
select {
|
||||
case <-c.closeCh:
|
||||
return
|
||||
case <-c.stopInitCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
m = m[:0]
|
||||
indices = indices[:0]
|
||||
|
||||
|
|
|
@ -11,6 +11,9 @@ import (
|
|||
// ErrReadOnly is returned when Put/Write is performed in a read-only mode.
|
||||
var ErrReadOnly = logicerr.New("write-cache is in read-only mode")
|
||||
|
||||
// ErrNotInitialized is returned when write-cache is initializing.
|
||||
var ErrNotInitialized = logicerr.New("write-cache is not initialized yet")
|
||||
|
||||
// SetMode sets write-cache mode of operation.
|
||||
// When shard is put in read-only mode all objects in memory are flushed to disk
|
||||
// and all background jobs are suspended.
|
||||
|
@ -18,15 +21,36 @@ func (c *cache) SetMode(m mode.Mode) error {
|
|||
c.modeMtx.Lock()
|
||||
defer c.modeMtx.Unlock()
|
||||
|
||||
if m.NoMetabase() && !c.mode.NoMetabase() {
|
||||
err := c.flush(true)
|
||||
return c.setMode(m)
|
||||
}
|
||||
|
||||
// setMode applies new mode. Must be called with cache.modeMtx lock taken.
|
||||
func (c *cache) setMode(m mode.Mode) error {
|
||||
var err error
|
||||
turnOffMeta := m.NoMetabase()
|
||||
|
||||
if turnOffMeta && !c.mode.NoMetabase() {
|
||||
err = c.flush(true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if !c.initialized.Load() {
|
||||
close(c.stopInitCh)
|
||||
|
||||
c.initWG.Wait()
|
||||
c.stopInitCh = make(chan struct{})
|
||||
|
||||
defer func() {
|
||||
if err == nil && !turnOffMeta {
|
||||
c.initFlushMarks()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
if c.db != nil {
|
||||
if err := c.db.Close(); err != nil {
|
||||
if err = c.db.Close(); err != nil {
|
||||
return fmt.Errorf("can't close write-cache database: %w", err)
|
||||
}
|
||||
}
|
||||
|
@ -39,12 +63,12 @@ func (c *cache) SetMode(m mode.Mode) error {
|
|||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
if m.NoMetabase() {
|
||||
if turnOffMeta {
|
||||
c.mode = m
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := c.openStore(m.ReadOnly()); err != nil {
|
||||
if err = c.openStore(m.ReadOnly()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -16,11 +16,18 @@ var (
|
|||
)
|
||||
|
||||
// Put puts object to write-cache.
|
||||
//
|
||||
// Returns ErrReadOnly if write-cache is in R/O mode.
|
||||
// Returns ErrNotInitialized if write-cache has not been initialized yet.
|
||||
// Returns ErrOutOfSpace if saving an object leads to WC's size overflow.
|
||||
// Returns ErrBigObject if an objects exceeds maximum object size.
|
||||
func (c *cache) Put(prm common.PutPrm) (common.PutRes, error) {
|
||||
c.modeMtx.RLock()
|
||||
defer c.modeMtx.RUnlock()
|
||||
if c.readOnly() {
|
||||
return common.PutRes{}, ErrReadOnly
|
||||
} else if !c.initialized.Load() {
|
||||
return common.PutRes{}, ErrNotInitialized
|
||||
}
|
||||
|
||||
sz := uint64(len(prm.RawData))
|
||||
|
|
|
@ -76,6 +76,9 @@ func (c *cache) openStore(readOnly bool) error {
|
|||
if c.flushed == nil {
|
||||
c.flushed, _ = lru.NewWithEvict[string, bool](c.maxFlushedMarksCount, c.removeFlushed)
|
||||
}
|
||||
|
||||
c.initialized.Store(false)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -48,8 +49,11 @@ type cache struct {
|
|||
// mtx protects statistics, counters and compressFlags.
|
||||
mtx sync.RWMutex
|
||||
|
||||
mode mode.Mode
|
||||
modeMtx sync.RWMutex
|
||||
mode mode.Mode
|
||||
initialized atomic.Bool
|
||||
stopInitCh chan struct{} // used to sync initWG initialisation routines and _only_ them
|
||||
initWG sync.WaitGroup // for initialisation routines only
|
||||
modeMtx sync.RWMutex
|
||||
|
||||
// compressFlags maps address of a big object to boolean value indicating
|
||||
// whether object should be compressed.
|
||||
|
@ -57,7 +61,7 @@ type cache struct {
|
|||
|
||||
// flushCh is a channel with objects to flush.
|
||||
flushCh chan *object.Object
|
||||
// closeCh is close channel.
|
||||
// closeCh is close channel, protected by modeMtx.
|
||||
closeCh chan struct{}
|
||||
// wg is a wait group for flush workers.
|
||||
wg sync.WaitGroup
|
||||
|
@ -89,8 +93,9 @@ var (
|
|||
// New creates new writecache instance.
|
||||
func New(opts ...Option) Cache {
|
||||
c := &cache{
|
||||
flushCh: make(chan *object.Object),
|
||||
mode: mode.ReadWrite,
|
||||
flushCh: make(chan *object.Object),
|
||||
mode: mode.ReadWrite,
|
||||
stopInitCh: make(chan struct{}),
|
||||
|
||||
compressFlags: make(map[string]struct{}),
|
||||
options: options{
|
||||
|
@ -151,8 +156,11 @@ func (c *cache) Init() error {
|
|||
|
||||
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
|
||||
func (c *cache) Close() error {
|
||||
c.modeMtx.Lock()
|
||||
defer c.modeMtx.Unlock()
|
||||
|
||||
// Finish all in-progress operations.
|
||||
if err := c.SetMode(mode.ReadOnly); err != nil {
|
||||
if err := c.setMode(mode.ReadOnly); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -164,6 +172,8 @@ func (c *cache) Close() error {
|
|||
c.closeCh = nil
|
||||
}
|
||||
|
||||
c.initialized.Store(false)
|
||||
|
||||
var err error
|
||||
if c.db != nil {
|
||||
err = c.db.Close()
|
||||
|
|
Loading…
Reference in a new issue