[#1523] writecache: Simplify logic

1. Remove in-memory cache. It doesn't persist objects and if we want
   more speed, `NoSync` option can be used for the bolt DB.
2. Put to the metabase in a synchronous fashion. This considerably
   simplifies overall logic and plays nicely with the metabase bolt DB
   batch settings.

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-07-07 15:52:40 +03:00 committed by fyrchik
parent 4176b2a1bc
commit ddaed283e9
9 changed files with 140 additions and 328 deletions

View file

@ -21,6 +21,7 @@ import (
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
objecttest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
func TestDump(t *testing.T) {
@ -49,8 +50,11 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) {
[]writecache.Option{
writecache.WithSmallObjectSize(wcSmallObjectSize),
writecache.WithMaxObjectSize(wcBigObjectSize),
writecache.WithLogger(zaptest.NewLogger(t)),
},
nil)
[]blobstor.Option{
blobstor.WithLogger(zaptest.NewLogger(t)),
})
}
defer releaseShard(sh, t)

View file

@ -3,6 +3,7 @@ package shard
import (
"fmt"
objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-sdk-go/object"
@ -34,28 +35,33 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) {
return PutRes{}, ErrReadOnlyMode
}
data, err := prm.obj.Marshal()
if err != nil {
return PutRes{}, fmt.Errorf("cannot marshal object: %w", err)
}
var putPrm common.PutPrm // form Put parameters
putPrm.Object = prm.obj
putPrm.RawData = data
putPrm.Address = objectCore.AddressOf(prm.obj)
var res common.PutRes
// exist check are not performed there, these checks should be executed
// ahead of `Put` by storage engine
if s.hasWriteCache() {
err := s.writeCache.Put(prm.obj)
if err == nil {
return PutRes{}, nil
res, err = s.writeCache.Put(putPrm)
}
if err != nil || !s.hasWriteCache() {
if err != nil {
s.log.Debug("can't put object to the write-cache, trying blobstor",
zap.String("err", err.Error()))
}
s.log.Debug("can't put message to writeCache, trying to blobStor",
zap.String("err", err.Error()))
}
var (
err error
res common.PutRes
)
if res, err = s.blobStor.Put(putPrm); err != nil {
return PutRes{}, fmt.Errorf("could not put object to BLOB storage: %w", err)
res, err = s.blobStor.Put(putPrm)
if err != nil {
return PutRes{}, fmt.Errorf("could not put object to BLOB storage: %w", err)
}
}
if !m.NoMetabase() {

View file

@ -22,20 +22,6 @@ func (c *cache) Delete(addr oid.Address) error {
saddr := addr.EncodeToString()
// Check memory cache.
c.mtx.Lock()
for i := range c.mem {
if saddr == c.mem[i].addr {
c.curMemSize -= uint64(len(c.mem[i].data))
copy(c.mem[i:], c.mem[i+1:])
c.mem = c.mem[:len(c.mem)-1]
c.mtx.Unlock()
storagelog.Write(c.log, storagelog.AddressField(saddr), storagelog.OpField("in-mem DELETE"))
return nil
}
}
c.mtx.Unlock()
// Check disk cache.
var has int
_ = c.db.View(func(tx *bbolt.Tx) error {

View file

@ -1,7 +1,6 @@
package writecache
import (
"sync"
"time"
"github.com/mr-tron/base58"
@ -24,44 +23,45 @@ const (
defaultFlushInterval = time.Second
)
// flushLoop periodically flushes changes from the database to memory.
func (c *cache) flushLoop() {
var wg sync.WaitGroup
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
func (c *cache) runFlushLoop() {
for i := 0; i < c.workersCount; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
c.flushWorker(i)
}(i)
c.wg.Add(1)
go c.flushWorker(i)
}
wg.Add(1)
c.wg.Add(1)
go c.flushBigObjects()
c.wg.Add(1)
go func() {
defer wg.Done()
c.flushBigObjects()
}()
defer c.wg.Done()
tt := time.NewTimer(defaultFlushInterval)
defer tt.Stop()
tt := time.NewTimer(defaultFlushInterval)
defer tt.Stop()
for {
select {
case <-tt.C:
c.flush()
tt.Reset(defaultFlushInterval)
case <-c.closeCh:
c.log.Debug("waiting for workers to quit")
wg.Wait()
return
for {
select {
case <-tt.C:
c.flush()
tt.Reset(defaultFlushInterval)
case <-c.closeCh:
return
}
}
}
}()
}
func (c *cache) flush() {
lastKey := []byte{}
var m []objectInfo
for {
select {
case <-c.closeCh:
return
default:
}
m = m[:0]
sz := 0
@ -122,6 +122,8 @@ func (c *cache) flush() {
}
func (c *cache) flushBigObjects() {
defer c.wg.Done()
tick := time.NewTicker(defaultFlushInterval * 10)
for {
select {
@ -181,6 +183,7 @@ func (c *cache) flushBigObjects() {
c.evictObjects(evictNum)
c.modeMtx.RUnlock()
case <-c.closeCh:
return
}
}
}
@ -188,63 +191,40 @@ func (c *cache) flushBigObjects() {
// flushWorker runs in a separate goroutine and write objects to the main storage.
// If flushFirst is true, flushing objects from cache database takes priority over
// putting new objects.
func (c *cache) flushWorker(num int) {
priorityCh := c.directCh
switch num % 3 {
case 0:
priorityCh = c.flushCh
case 1:
priorityCh = c.metaCh
}
func (c *cache) flushWorker(_ int) {
defer c.wg.Done()
var obj *object.Object
for {
metaOnly := false
// Give priority to direct put.
// TODO(fyrchik): #1150 do this once in N iterations depending on load
select {
case obj = <-priorityCh:
metaOnly = num%3 == 1
default:
select {
case obj = <-c.directCh:
case obj = <-c.flushCh:
case obj = <-c.metaCh:
metaOnly = true
case <-c.closeCh:
return
}
case obj = <-c.flushCh:
case <-c.closeCh:
return
}
err := c.writeObject(obj, metaOnly)
err := c.flushObject(obj)
if err != nil {
c.log.Error("can't flush object to the main storage", zap.Error(err))
}
}
}
// writeObject is used to write object directly to the main storage.
func (c *cache) writeObject(obj *object.Object, metaOnly bool) error {
var descriptor []byte
// flushObject is used to write object directly to the main storage.
func (c *cache) flushObject(obj *object.Object) error {
var prm common.PutPrm
prm.Object = obj
if !metaOnly {
var prm common.PutPrm
prm.Object = obj
res, err := c.blobstor.Put(prm)
if err != nil {
return err
}
descriptor = res.StorageID
res, err := c.blobstor.Put(prm)
if err != nil {
return err
}
var pPrm meta.PutPrm
pPrm.SetObject(obj)
pPrm.SetStorageID(descriptor)
pPrm.SetStorageID(res.StorageID)
_, err := c.metabase.Put(pPrm)
_, err = c.metabase.Put(pPrm)
return err
}

View file

@ -14,20 +14,6 @@ import (
func (c *cache) Get(addr oid.Address) (*objectSDK.Object, error) {
saddr := addr.EncodeToString()
c.mtx.RLock()
for i := range c.mem {
if saddr == c.mem[i].addr {
data := c.mem[i].data
c.mtx.RUnlock()
// We unmarshal object instead of using cached value to avoid possibility
// of unintentional object corruption by caller.
// It is safe to unmarshal without mutex, as storage under `c.mem[i].data` slices is not reused.
obj := objectSDK.New()
return obj, obj.Unmarshal(data)
}
}
c.mtx.RUnlock()
value, err := Get(c.db, []byte(saddr))
if err == nil {
obj := objectSDK.New()

View file

@ -23,13 +23,6 @@ func (c *cache) SetMode(m mode.Mode) error {
return nil
}
if !c.readOnly() {
// Because modeMtx is taken no new objects will arrive an all other modifying
// operations are completed.
// 1. Persist objects already in memory on disk.
c.persistMemoryCache()
}
if c.db != nil {
if err := c.db.Close(); err != nil {
return fmt.Errorf("can't close write-cache database: %w", err)
@ -37,13 +30,10 @@ func (c *cache) SetMode(m mode.Mode) error {
c.db = nil
}
// 2. Suspend producers to ensure there are channel send operations in fly.
// metaCh and directCh can be populated either during Put or in background memory persist thread.
// Former possibility is eliminated by taking `modeMtx` mutex and
// latter by explicit persist in the previous step.
// flushCh is populated by `flush` with `modeMtx` is also taken.
// Thus all producers are shutdown and we only need to wait until all channels are empty.
for len(c.metaCh) != 0 || len(c.directCh) != 0 || len(c.flushCh) != 0 {
// Suspend producers to ensure there are channel send operations in fly.
// flushCh is populated by `flush` with `modeMtx` taken, thus waiting until it is empty
// guarantees that there are no in-fly operations.
for len(c.flushCh) != 0 {
c.log.Info("waiting for channels to flush")
time.Sleep(time.Second)
}

View file

@ -1,153 +0,0 @@
package writecache
import (
"sort"
"time"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
const defaultPersistInterval = time.Second
// persistLoop persists object accumulated in memory to the database.
func (c *cache) persistLoop() {
tick := time.NewTicker(defaultPersistInterval)
defer tick.Stop()
for {
select {
case <-tick.C:
c.modeMtx.RLock()
if c.readOnly() {
c.modeMtx.RUnlock()
continue
}
c.persistMemoryCache()
c.modeMtx.RUnlock()
case <-c.closeCh:
return
}
}
}
func (c *cache) persistMemoryCache() {
c.mtx.RLock()
m := c.mem
c.mtx.RUnlock()
if len(m) == 0 {
return
}
sort.Slice(m, func(i, j int) bool { return m[i].addr < m[j].addr })
start := time.Now()
c.persistSmallObjects(m)
c.log.Debug("persisted items to disk",
zap.Duration("took", time.Since(start)),
zap.Int("total", len(m)))
for i := range m {
storagelog.Write(c.log,
storagelog.AddressField(m[i].addr),
storagelog.OpField("in-mem DELETE persist"),
)
}
c.mtx.Lock()
c.curMemSize = 0
n := copy(c.mem, c.mem[len(m):])
c.mem = c.mem[:n]
for i := range c.mem {
c.curMemSize += uint64(len(c.mem[i].data))
}
c.mtx.Unlock()
}
// persistSmallObjects persists small objects to the write-cache database and
// pushes the to the flush workers queue.
func (c *cache) persistSmallObjects(objs []objectInfo) {
cacheSize := c.estimateCacheSize()
overflowIndex := len(objs)
for i := range objs {
newSize := c.incSizeDB(cacheSize)
if c.maxCacheSize < newSize {
overflowIndex = i
break
}
cacheSize = newSize
}
err := c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
for i := 0; i < overflowIndex; i++ {
err := b.Put([]byte(objs[i].addr), objs[i].data)
if err != nil {
return err
}
}
return nil
})
if err != nil {
overflowIndex = 0
} else {
c.evictObjects(len(objs) - overflowIndex)
}
for i := 0; i < overflowIndex; i++ {
storagelog.Write(c.log, storagelog.AddressField(objs[i].addr), storagelog.OpField("db PUT"))
c.objCounters.IncDB()
}
for i := overflowIndex; i < len(objs); i++ {
c.flushed.Add(objs[i].addr, true)
}
c.addToFlushQueue(objs, overflowIndex)
}
// persistBigObject writes object to FSTree and pushes it to the flush workers queue.
func (c *cache) persistBigObject(objInfo objectInfo) {
cacheSz := c.estimateCacheSize()
metaIndex := 0
if c.incSizeFS(cacheSz) <= c.maxCacheSize {
var prm common.PutPrm
prm.Address = object.AddressOf(objInfo.obj)
prm.RawData = objInfo.data
_, err := c.fsTree.Put(prm)
if err == nil {
metaIndex = 1
if c.blobstor.NeedsCompression(objInfo.obj) {
c.mtx.Lock()
c.compressFlags[objInfo.addr] = struct{}{}
c.mtx.Unlock()
}
c.objCounters.IncFS()
storagelog.Write(c.log, storagelog.AddressField(objInfo.addr), storagelog.OpField("fstree PUT"))
}
}
c.addToFlushQueue([]objectInfo{objInfo}, metaIndex)
}
// addToFlushQueue pushes objects to the flush workers queue.
// For objects below metaIndex only meta information will be flushed.
func (c *cache) addToFlushQueue(objs []objectInfo, metaIndex int) {
for i := 0; i < metaIndex; i++ {
select {
case c.metaCh <- objs[i].obj:
case <-c.closeCh:
return
}
}
for i := metaIndex; i < len(objs); i++ {
select {
case c.directCh <- objs[i].obj:
case <-c.closeCh:
return
}
}
}

View file

@ -3,57 +3,80 @@ package writecache
import (
"errors"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
"go.etcd.io/bbolt"
)
// ErrBigObject is returned when object is too big to be placed in cache.
var ErrBigObject = errors.New("too big object")
var (
// ErrBigObject is returned when object is too big to be placed in cache.
ErrBigObject = errors.New("too big object")
// ErrOutOfSpace is returned when there is no space left to put a new object.
ErrOutOfSpace = errors.New("no space left in the write cache")
)
// Put puts object to write-cache.
func (c *cache) Put(o *objectSDK.Object) error {
func (c *cache) Put(prm common.PutPrm) (common.PutRes, error) {
c.modeMtx.RLock()
defer c.modeMtx.RUnlock()
if c.readOnly() {
return ErrReadOnly
return common.PutRes{}, ErrReadOnly
}
sz := uint64(o.ToV2().StableSize())
sz := uint64(len(prm.RawData))
if sz > c.maxObjectSize {
return ErrBigObject
return common.PutRes{}, ErrBigObject
}
data, err := o.Marshal()
oi := objectInfo{
addr: prm.Address.EncodeToString(),
obj: prm.Object,
data: prm.RawData,
}
if sz <= c.smallObjectSize {
return common.PutRes{}, c.putSmall(oi)
}
return common.PutRes{}, c.putBig(oi.addr, prm)
}
// putSmall persists small objects to the write-cache database and
// pushes the to the flush workers queue.
func (c *cache) putSmall(obj objectInfo) error {
cacheSize := c.estimateCacheSize()
if c.maxCacheSize < c.incSizeDB(cacheSize) {
return ErrOutOfSpace
}
err := c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
return b.Put([]byte(obj.addr), obj.data)
})
if err == nil {
storagelog.Write(c.log, storagelog.AddressField(obj.addr), storagelog.OpField("db PUT"))
c.objCounters.IncDB()
}
return nil
}
// putBig writes object to FSTree and pushes it to the flush workers queue.
func (c *cache) putBig(addr string, prm common.PutPrm) error {
cacheSz := c.estimateCacheSize()
if c.maxCacheSize < c.incSizeFS(cacheSz) {
return ErrOutOfSpace
}
_, err := c.fsTree.Put(prm)
if err != nil {
return err
}
oi := objectInfo{
addr: object.AddressOf(o).EncodeToString(),
obj: o,
data: data,
}
c.mtx.Lock()
if sz <= c.smallObjectSize && c.curMemSize+sz <= c.maxMemSize {
c.curMemSize += sz
c.mem = append(c.mem, oi)
if c.blobstor.NeedsCompression(prm.Object) {
c.mtx.Lock()
c.compressFlags[addr] = struct{}{}
c.mtx.Unlock()
storagelog.Write(c.log, storagelog.AddressField(oi.addr), storagelog.OpField("in-mem PUT"))
return nil
}
c.mtx.Unlock()
if sz <= c.smallObjectSize {
c.persistSmallObjects([]objectInfo{oi})
} else {
c.persistBigObject(oi)
}
c.objCounters.IncFS()
storagelog.Write(c.log, storagelog.AddressField(addr), storagelog.OpField("fstree PUT"))
return nil
}

View file

@ -3,6 +3,7 @@ package writecache
import (
"sync"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
"github.com/nspcc-dev/neofs-sdk-go/object"
@ -23,7 +24,7 @@ type Cache interface {
Head(oid.Address) (*object.Object, error)
Delete(oid.Address) error
Iterate(IterationPrm) error
Put(*object.Object) error
Put(common.PutPrm) (common.PutRes, error)
SetMode(mode.Mode) error
SetLogger(*zap.Logger)
DumpInfo() Info
@ -36,9 +37,8 @@ type Cache interface {
type cache struct {
options
// mtx protects mem field, statistics, counters and compressFlags.
// mtx protects statistics, counters and compressFlags.
mtx sync.RWMutex
mem []objectInfo
mode mode.Mode
modeMtx sync.RWMutex
@ -47,19 +47,12 @@ type cache struct {
// whether object should be compressed.
compressFlags map[string]struct{}
// curMemSize is the current size of all objects cached in memory.
curMemSize uint64
// flushCh is a channel with objects to flush.
flushCh chan *object.Object
// directCh is a channel with objects to put directly to the main storage.
// it is prioritized over flushCh.
directCh chan *object.Object
// metaCh is a channel with objects for which only metadata needs to be written.
metaCh chan *object.Object
// closeCh is close channel.
closeCh chan struct{}
evictCh chan []byte
// wg is a wait group for flush workers.
wg sync.WaitGroup
// store contains underlying database.
store
// fsTree contains big files stored directly on file-system.
@ -86,12 +79,9 @@ var (
// New creates new writecache instance.
func New(opts ...Option) Cache {
c := &cache{
flushCh: make(chan *object.Object),
directCh: make(chan *object.Object),
metaCh: make(chan *object.Object),
closeCh: make(chan struct{}),
evictCh: make(chan []byte),
mode: mode.ReadWrite,
flushCh: make(chan *object.Object),
closeCh: make(chan struct{}),
mode: mode.ReadWrite,
compressFlags: make(map[string]struct{}),
options: options{
@ -144,9 +134,7 @@ func (c *cache) Open(readOnly bool) error {
// Init runs necessary services.
func (c *cache) Init() error {
c.initFlushMarks()
go c.persistLoop()
go c.flushLoop()
c.runFlushLoop()
return nil
}
@ -158,6 +146,8 @@ func (c *cache) Close() error {
}
close(c.closeCh)
c.wg.Wait()
if c.objCounters != nil {
c.objCounters.FlushAndClose()
}