package writecache

import (
	"sync"
	"time"

	"github.com/mr-tron/base58"
	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
	meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
	"github.com/nspcc-dev/neofs-sdk-go/object"
	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
	"go.etcd.io/bbolt"
	"go.uber.org/zap"
)

const (
	// flushBatchSize is the number of keys read from the cache on each iteration
	// to be flushed to the main storage. Batching reduces contention between
	// cache put and cache persist.
	flushBatchSize = 512
	// defaultFlushWorkersCount is the number of workers putting objects to the main storage.
	defaultFlushWorkersCount = 20
	// defaultFlushInterval is the default time interval between successive flushes.
	defaultFlushInterval = time.Second
)

// flushLoop starts the flush workers and periodically flushes changes
// from the write-cache database to the main storage.
func (c *cache) flushLoop() {
	var wg sync.WaitGroup

	for i := 0; i < c.workersCount; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			c.flushWorker(i)
		}(i)
	}

	wg.Add(1)
	go func() {
		defer wg.Done()
		c.flushBigObjects()
	}()

	tt := time.NewTimer(defaultFlushInterval)
	defer tt.Stop()

	for {
		select {
		case <-tt.C:
			c.flush()
			tt.Reset(defaultFlushInterval)
		case <-c.closeCh:
			c.log.Debug("waiting for workers to quit")
			wg.Wait()
			return
		}
	}
}

func (c *cache) flush() {
	lastKey := []byte{}
	var m []objectInfo
	for {
		m = m[:0]
		sz := 0

		c.modeMtx.RLock()
		if c.readOnly() {
			c.modeMtx.RUnlock()
			time.Sleep(time.Second)
			continue
		}

		// Read objects in batches of fixed size so that we do not interfere
		// with the main put cycle too much.
		_ = c.db.View(func(tx *bbolt.Tx) error {
			b := tx.Bucket(defaultBucket)
			cs := b.Cursor()
			for k, v := cs.Seek(lastKey); k != nil && len(m) < flushBatchSize; k, v = cs.Next() {
				if _, ok := c.flushed.Peek(string(k)); ok {
					continue
				}

				sz += len(k) + len(v)
				m = append(m, objectInfo{
					addr: string(k),
					data: cloneBytes(v),
				})
			}
			return nil
		})

		for i := range m {
			obj := object.New()
			if err := obj.Unmarshal(m[i].data); err != nil {
				continue
			}

			select {
			case c.flushCh <- obj:
			case <-c.closeCh:
				c.modeMtx.RUnlock()
				return
			}
		}

		if len(m) == 0 {
			c.modeMtx.RUnlock()
			break
		}

		c.evictObjects(len(m))
		for i := range m {
			c.flushed.Add(m[i].addr, true)
		}
		c.modeMtx.RUnlock()

		c.log.Debug("flushed items from write-cache",
			zap.Int("count", len(m)),
			zap.String("start", base58.Encode(lastKey)))
	}
}

func (c *cache) flushBigObjects() {
	tick := time.NewTicker(defaultFlushInterval * 10)
	defer tick.Stop()
	for {
		select {
		case <-tick.C:
			c.modeMtx.RLock()
			if c.readOnly() {
				c.modeMtx.RUnlock()
				break
			}

			evictNum := 0

			var prm fstree.IterationPrm
			prm.WithLazyHandler(func(addr oid.Address, f func() ([]byte, error)) error {
				sAddr := addr.EncodeToString()

				if _, ok := c.store.flushed.Peek(sAddr); ok {
					return nil
				}

				data, err := f()
				if err != nil {
					c.log.Error("can't read a file", zap.Stringer("address", addr))
					return nil
				}

				c.mtx.Lock()
				_, compress := c.compressFlags[sAddr]
				c.mtx.Unlock()

				if _, err := c.blobstor.PutRaw(addr, data, compress); err != nil {
					c.log.Error("can't flush object to blobstor", zap.Error(err))
					return nil
				}

				if compress {
					c.mtx.Lock()
					delete(c.compressFlags, sAddr)
					c.mtx.Unlock()
				}

				// mark object as flushed
				c.store.flushed.Add(sAddr, false)

				evictNum++

				return nil
			})

			_ = c.fsTree.Iterate(prm)

			// evict objects which were successfully written to BlobStor
			c.evictObjects(evictNum)
			c.modeMtx.RUnlock()
		case <-c.closeCh:
			return
		}
	}
}

// flushWorker runs in a separate goroutine and writes objects to the main storage.
// Depending on the worker number, priority is given to direct puts, to flushing
// objects from the cache database, or to metadata-only updates.
func (c *cache) flushWorker(num int) {
	priorityCh := c.directCh
	switch num % 3 {
	case 0:
		priorityCh = c.flushCh
	case 1:
		priorityCh = c.metaCh
	}

	var obj *object.Object
	for {
		metaOnly := false

		// Give priority to the worker's dedicated channel.
		// TODO(fyrchik): #1150 do this once in N iterations depending on load
		select {
		case obj = <-priorityCh:
			metaOnly = num%3 == 1
		default:
			select {
			case obj = <-c.directCh:
			case obj = <-c.flushCh:
			case obj = <-c.metaCh:
				metaOnly = true
			case <-c.closeCh:
				return
			}
		}

		err := c.writeObject(obj, metaOnly)
		if err != nil {
			c.log.Error("can't flush object to the main storage", zap.Error(err))
		}
	}
}

// writeObject writes the object directly to the main storage. If metaOnly is true,
// only the metabase record is updated and the payload is not written to BlobStor.
func (c *cache) writeObject(obj *object.Object, metaOnly bool) error {
	var id *blobovnicza.ID

	if !metaOnly {
		var prm blobstor.PutPrm
		prm.SetObject(obj)

		res, err := c.blobstor.Put(prm)
		if err != nil {
			return err
		}

		id = res.BlobovniczaID()
	}

	var pPrm meta.PutPrm
	pPrm.WithObject(obj)
	pPrm.WithBlobovniczaID(id)

	_, err := c.metabase.Put(pPrm)
	return err
}

// cloneBytes returns a copy of the given byte slice.
func cloneBytes(a []byte) []byte {
	b := make([]byte, len(a))
	copy(b, a)
	return b
}