frostfs-node/pkg/local_object_storage/writecache/flush.go
Evgenii Stratonikov 73f8bb3e5f [#1523] shard: Store generic storage ID in metabase
Allow extending blobstor with more storage sub-systems. Currently,
objects stored in the FSTree have an empty byte-slice descriptor, and
objects from the blobovnicza tree keep the same ID as before. Each such
change in how the identifier is formed should be accompanied by a
metabase version increase.

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
2022-08-22 13:14:19 +03:00


package writecache

import (
	"sync"
	"time"

	"github.com/mr-tron/base58"
	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
	meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
	"github.com/nspcc-dev/neofs-sdk-go/object"
	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
	"go.etcd.io/bbolt"
	"go.uber.org/zap"
)
const (
	// flushBatchSize is the number of keys read from the cache per iteration to be
	// flushed to the main storage. It is used to reduce contention between cache put
	// and cache persist.
	flushBatchSize = 512

	// defaultFlushWorkersCount is the number of workers that put objects in the main storage.
	defaultFlushWorkersCount = 20

	// defaultFlushInterval is the default time interval between successive flushes.
	defaultFlushInterval = time.Second
)
// flushLoop starts the flush workers and periodically flushes changes from the
// write-cache database to the main storage.
func (c *cache) flushLoop() {
	var wg sync.WaitGroup

	for i := 0; i < c.workersCount; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			c.flushWorker(i)
		}(i)
	}

	wg.Add(1)
	go func() {
		defer wg.Done()
		c.flushBigObjects()
	}()

	tt := time.NewTimer(defaultFlushInterval)
	defer tt.Stop()

	for {
		select {
		case <-tt.C:
			c.flush()
			tt.Reset(defaultFlushInterval)
		case <-c.closeCh:
			c.log.Debug("waiting for workers to quit")
			wg.Wait()
			return
		}
	}
}
func (c *cache) flush() {
	lastKey := []byte{}
	var m []objectInfo

	for {
		m = m[:0]
		sz := 0

		c.modeMtx.RLock()
		if c.readOnly() {
			c.modeMtx.RUnlock()
			time.Sleep(time.Second)
			continue
		}

		// Read objects in batches of fixed size so that the flush does not
		// interfere too much with the main put cycle.
		_ = c.db.View(func(tx *bbolt.Tx) error {
			b := tx.Bucket(defaultBucket)
			cs := b.Cursor()
			for k, v := cs.Seek(lastKey); k != nil && len(m) < flushBatchSize; k, v = cs.Next() {
				if _, ok := c.flushed.Peek(string(k)); ok {
					continue
				}

				sz += len(k) + len(v)
				m = append(m, objectInfo{
					addr: string(k),
					data: cloneBytes(v),
				})
			}
			return nil
		})

		for i := range m {
			obj := object.New()
			if err := obj.Unmarshal(m[i].data); err != nil {
				continue
			}

			select {
			case c.flushCh <- obj:
			case <-c.closeCh:
				c.modeMtx.RUnlock()
				return
			}
		}

		if len(m) == 0 {
			c.modeMtx.RUnlock()
			break
		}

		c.evictObjects(len(m))
		for i := range m {
			c.flushed.Add(m[i].addr, true)
		}
		c.modeMtx.RUnlock()

		c.log.Debug("flushed items from write-cache",
			zap.Int("count", len(m)),
			zap.String("start", base58.Encode(lastKey)))
	}
}
func (c *cache) flushBigObjects() {
	tick := time.NewTicker(defaultFlushInterval * 10)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			c.modeMtx.RLock()
			if c.readOnly() {
				c.modeMtx.RUnlock()
				break
			}

			evictNum := 0

			var prm fstree.IterationPrm
			prm.WithLazyHandler(func(addr oid.Address, f func() ([]byte, error)) error {
				sAddr := addr.EncodeToString()

				if _, ok := c.store.flushed.Peek(sAddr); ok {
					return nil
				}

				data, err := f()
				if err != nil {
					c.log.Error("can't read a file", zap.Stringer("address", addr))
					return nil
				}

				c.mtx.Lock()
				_, compress := c.compressFlags[sAddr]
				c.mtx.Unlock()

				var putPrm common.PutPrm
				putPrm.Address = addr
				putPrm.RawData = data

				if _, err := c.blobstor.PutRaw(putPrm, compress); err != nil {
					c.log.Error("can't flush object to blobstor", zap.Error(err))
					return nil
				}

				if compress {
					c.mtx.Lock()
					delete(c.compressFlags, sAddr)
					c.mtx.Unlock()
				}

				// mark the object as flushed
				c.store.flushed.Add(sAddr, false)
				evictNum++

				return nil
			})
			_ = c.fsTree.Iterate(prm)

			// evict objects which were successfully written to the blobstor
			c.evictObjects(evictNum)
			c.modeMtx.RUnlock()
		case <-c.closeCh:
			return
		}
	}
}
// flushWorker runs in a separate goroutine and writes objects to the main storage.
// Depending on the worker number, it gives priority to flushing objects from the
// cache database (num%3 == 0), to metabase-only updates (num%3 == 1), or to
// direct puts (num%3 == 2) over the remaining channels.
func (c *cache) flushWorker(num int) {
	priorityCh := c.directCh
	switch num % 3 {
	case 0:
		priorityCh = c.flushCh
	case 1:
		priorityCh = c.metaCh
	}

	var obj *object.Object
	for {
		metaOnly := false

		// Give priority to this worker's dedicated channel.
		// TODO(fyrchik): #1150 do this once in N iterations depending on load
		select {
		case obj = <-priorityCh:
			metaOnly = num%3 == 1
		default:
			select {
			case obj = <-c.directCh:
			case obj = <-c.flushCh:
			case obj = <-c.metaCh:
				metaOnly = true
			case <-c.closeCh:
				return
			}
		}

		err := c.writeObject(obj, metaOnly)
		if err != nil {
			c.log.Error("can't flush object to the main storage", zap.Error(err))
		}
	}
}
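
// The two-level select in flushWorker above gives one channel strict priority:
// the outer select with a default branch drains the priority channel first, and
// only when it is empty does the inner select wait on all channels. The function
// below is a minimal, hypothetical sketch of that pattern in isolation; it is
// not used by the write-cache itself.
func prioritySelectSketch(high, low <-chan int, done <-chan struct{}) {
	for {
		select {
		case v := <-high:
			_ = v // handle a high-priority item
		default:
			select {
			case v := <-high:
				_ = v // handle a high-priority item
			case v := <-low:
				_ = v // handle a low-priority item
			case <-done:
				return
			}
		}
	}
}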
// writeObject writes the object directly to the main storage.
func (c *cache) writeObject(obj *object.Object, metaOnly bool) error {
	var descriptor []byte

	if !metaOnly {
		var prm common.PutPrm
		prm.Object = obj

		res, err := c.blobstor.Put(prm)
		if err != nil {
			return err
		}

		descriptor = res.StorageID
	}

	var pPrm meta.PutPrm
	pPrm.SetObject(obj)
	pPrm.SetStorageID(descriptor)

	_, err := c.metabase.Put(pPrm)
	return err
}
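
// The commit above ([#1523]) notes that objects stored in the FSTree get an
// empty byte-slice descriptor, while objects from the blobovnicza tree keep
// their blobovnicza ID, and that any change to how the identifier is formed
// requires a metabase version increase. The helper below is a hypothetical
// illustration of that convention; it is not part of the write-cache or
// metabase API.
func storedInFSTree(storageID []byte) bool {
	// An empty (or nil) storage ID recorded in the metabase means the object
	// lives in the FSTree; a non-empty ID points to a blobovnicza.
	return len(storageID) == 0
}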

// cloneBytes returns a copy of a. Values returned by bbolt are only valid for
// the lifetime of the transaction, so they must be copied before being used
// outside of it.
func cloneBytes(a []byte) []byte {
	b := make([]byte, len(a))
	copy(b, a)
	return b
}