forked from TrueCloudLab/frostfs-node
[#1523] writecache: Simplify logic
1. Remove in-memory cache. It doesn't persist objects and if we want more speed, `NoSync` option can be used for the bolt DB. 2. Put to the metabase in a synchronous fashion. This considerably simplifies overall logic and plays nicely with the metabase bolt DB batch settings. Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
4176b2a1bc
commit
ddaed283e9
9 changed files with 140 additions and 328 deletions
|
@ -21,6 +21,7 @@ import (
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
objecttest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
|
objecttest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.uber.org/zap/zaptest"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestDump(t *testing.T) {
|
func TestDump(t *testing.T) {
|
||||||
|
@ -49,8 +50,11 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) {
|
||||||
[]writecache.Option{
|
[]writecache.Option{
|
||||||
writecache.WithSmallObjectSize(wcSmallObjectSize),
|
writecache.WithSmallObjectSize(wcSmallObjectSize),
|
||||||
writecache.WithMaxObjectSize(wcBigObjectSize),
|
writecache.WithMaxObjectSize(wcBigObjectSize),
|
||||||
|
writecache.WithLogger(zaptest.NewLogger(t)),
|
||||||
},
|
},
|
||||||
nil)
|
[]blobstor.Option{
|
||||||
|
blobstor.WithLogger(zaptest.NewLogger(t)),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
defer releaseShard(sh, t)
|
defer releaseShard(sh, t)
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,7 @@ package shard
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
||||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
|
@ -34,29 +35,34 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) {
|
||||||
return PutRes{}, ErrReadOnlyMode
|
return PutRes{}, ErrReadOnlyMode
|
||||||
}
|
}
|
||||||
|
|
||||||
|
data, err := prm.obj.Marshal()
|
||||||
|
if err != nil {
|
||||||
|
return PutRes{}, fmt.Errorf("cannot marshal object: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
var putPrm common.PutPrm // form Put parameters
|
var putPrm common.PutPrm // form Put parameters
|
||||||
putPrm.Object = prm.obj
|
putPrm.Object = prm.obj
|
||||||
|
putPrm.RawData = data
|
||||||
|
putPrm.Address = objectCore.AddressOf(prm.obj)
|
||||||
|
|
||||||
|
var res common.PutRes
|
||||||
|
|
||||||
// exist check are not performed there, these checks should be executed
|
// exist check are not performed there, these checks should be executed
|
||||||
// ahead of `Put` by storage engine
|
// ahead of `Put` by storage engine
|
||||||
if s.hasWriteCache() {
|
if s.hasWriteCache() {
|
||||||
err := s.writeCache.Put(prm.obj)
|
res, err = s.writeCache.Put(putPrm)
|
||||||
if err == nil {
|
|
||||||
return PutRes{}, nil
|
|
||||||
}
|
}
|
||||||
|
if err != nil || !s.hasWriteCache() {
|
||||||
s.log.Debug("can't put message to writeCache, trying to blobStor",
|
if err != nil {
|
||||||
|
s.log.Debug("can't put object to the write-cache, trying blobstor",
|
||||||
zap.String("err", err.Error()))
|
zap.String("err", err.Error()))
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
res, err = s.blobStor.Put(putPrm)
|
||||||
err error
|
if err != nil {
|
||||||
res common.PutRes
|
|
||||||
)
|
|
||||||
|
|
||||||
if res, err = s.blobStor.Put(putPrm); err != nil {
|
|
||||||
return PutRes{}, fmt.Errorf("could not put object to BLOB storage: %w", err)
|
return PutRes{}, fmt.Errorf("could not put object to BLOB storage: %w", err)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if !m.NoMetabase() {
|
if !m.NoMetabase() {
|
||||||
var pPrm meta.PutPrm
|
var pPrm meta.PutPrm
|
||||||
|
|
|
@ -22,20 +22,6 @@ func (c *cache) Delete(addr oid.Address) error {
|
||||||
|
|
||||||
saddr := addr.EncodeToString()
|
saddr := addr.EncodeToString()
|
||||||
|
|
||||||
// Check memory cache.
|
|
||||||
c.mtx.Lock()
|
|
||||||
for i := range c.mem {
|
|
||||||
if saddr == c.mem[i].addr {
|
|
||||||
c.curMemSize -= uint64(len(c.mem[i].data))
|
|
||||||
copy(c.mem[i:], c.mem[i+1:])
|
|
||||||
c.mem = c.mem[:len(c.mem)-1]
|
|
||||||
c.mtx.Unlock()
|
|
||||||
storagelog.Write(c.log, storagelog.AddressField(saddr), storagelog.OpField("in-mem DELETE"))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
c.mtx.Unlock()
|
|
||||||
|
|
||||||
// Check disk cache.
|
// Check disk cache.
|
||||||
var has int
|
var has int
|
||||||
_ = c.db.View(func(tx *bbolt.Tx) error {
|
_ = c.db.View(func(tx *bbolt.Tx) error {
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package writecache
|
package writecache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/mr-tron/base58"
|
"github.com/mr-tron/base58"
|
||||||
|
@ -24,23 +23,19 @@ const (
|
||||||
defaultFlushInterval = time.Second
|
defaultFlushInterval = time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// flushLoop periodically flushes changes from the database to memory.
|
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
|
||||||
func (c *cache) flushLoop() {
|
func (c *cache) runFlushLoop() {
|
||||||
var wg sync.WaitGroup
|
|
||||||
|
|
||||||
for i := 0; i < c.workersCount; i++ {
|
for i := 0; i < c.workersCount; i++ {
|
||||||
wg.Add(1)
|
c.wg.Add(1)
|
||||||
go func(i int) {
|
go c.flushWorker(i)
|
||||||
defer wg.Done()
|
|
||||||
c.flushWorker(i)
|
|
||||||
}(i)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Add(1)
|
c.wg.Add(1)
|
||||||
|
go c.flushBigObjects()
|
||||||
|
|
||||||
|
c.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer c.wg.Done()
|
||||||
c.flushBigObjects()
|
|
||||||
}()
|
|
||||||
|
|
||||||
tt := time.NewTimer(defaultFlushInterval)
|
tt := time.NewTimer(defaultFlushInterval)
|
||||||
defer tt.Stop()
|
defer tt.Stop()
|
||||||
|
@ -51,17 +46,22 @@ func (c *cache) flushLoop() {
|
||||||
c.flush()
|
c.flush()
|
||||||
tt.Reset(defaultFlushInterval)
|
tt.Reset(defaultFlushInterval)
|
||||||
case <-c.closeCh:
|
case <-c.closeCh:
|
||||||
c.log.Debug("waiting for workers to quit")
|
|
||||||
wg.Wait()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) flush() {
|
func (c *cache) flush() {
|
||||||
lastKey := []byte{}
|
lastKey := []byte{}
|
||||||
var m []objectInfo
|
var m []objectInfo
|
||||||
for {
|
for {
|
||||||
|
select {
|
||||||
|
case <-c.closeCh:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
m = m[:0]
|
m = m[:0]
|
||||||
sz := 0
|
sz := 0
|
||||||
|
|
||||||
|
@ -122,6 +122,8 @@ func (c *cache) flush() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) flushBigObjects() {
|
func (c *cache) flushBigObjects() {
|
||||||
|
defer c.wg.Done()
|
||||||
|
|
||||||
tick := time.NewTicker(defaultFlushInterval * 10)
|
tick := time.NewTicker(defaultFlushInterval * 10)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -181,6 +183,7 @@ func (c *cache) flushBigObjects() {
|
||||||
c.evictObjects(evictNum)
|
c.evictObjects(evictNum)
|
||||||
c.modeMtx.RUnlock()
|
c.modeMtx.RUnlock()
|
||||||
case <-c.closeCh:
|
case <-c.closeCh:
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -188,47 +191,27 @@ func (c *cache) flushBigObjects() {
|
||||||
// flushWorker runs in a separate goroutine and write objects to the main storage.
|
// flushWorker runs in a separate goroutine and write objects to the main storage.
|
||||||
// If flushFirst is true, flushing objects from cache database takes priority over
|
// If flushFirst is true, flushing objects from cache database takes priority over
|
||||||
// putting new objects.
|
// putting new objects.
|
||||||
func (c *cache) flushWorker(num int) {
|
func (c *cache) flushWorker(_ int) {
|
||||||
priorityCh := c.directCh
|
defer c.wg.Done()
|
||||||
switch num % 3 {
|
|
||||||
case 0:
|
|
||||||
priorityCh = c.flushCh
|
|
||||||
case 1:
|
|
||||||
priorityCh = c.metaCh
|
|
||||||
}
|
|
||||||
|
|
||||||
var obj *object.Object
|
var obj *object.Object
|
||||||
for {
|
for {
|
||||||
metaOnly := false
|
|
||||||
|
|
||||||
// Give priority to direct put.
|
// Give priority to direct put.
|
||||||
// TODO(fyrchik): #1150 do this once in N iterations depending on load
|
|
||||||
select {
|
select {
|
||||||
case obj = <-priorityCh:
|
|
||||||
metaOnly = num%3 == 1
|
|
||||||
default:
|
|
||||||
select {
|
|
||||||
case obj = <-c.directCh:
|
|
||||||
case obj = <-c.flushCh:
|
case obj = <-c.flushCh:
|
||||||
case obj = <-c.metaCh:
|
|
||||||
metaOnly = true
|
|
||||||
case <-c.closeCh:
|
case <-c.closeCh:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
err := c.writeObject(obj, metaOnly)
|
err := c.flushObject(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Error("can't flush object to the main storage", zap.Error(err))
|
c.log.Error("can't flush object to the main storage", zap.Error(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeObject is used to write object directly to the main storage.
|
// flushObject is used to write object directly to the main storage.
|
||||||
func (c *cache) writeObject(obj *object.Object, metaOnly bool) error {
|
func (c *cache) flushObject(obj *object.Object) error {
|
||||||
var descriptor []byte
|
|
||||||
|
|
||||||
if !metaOnly {
|
|
||||||
var prm common.PutPrm
|
var prm common.PutPrm
|
||||||
prm.Object = obj
|
prm.Object = obj
|
||||||
|
|
||||||
|
@ -237,14 +220,11 @@ func (c *cache) writeObject(obj *object.Object, metaOnly bool) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
descriptor = res.StorageID
|
|
||||||
}
|
|
||||||
|
|
||||||
var pPrm meta.PutPrm
|
var pPrm meta.PutPrm
|
||||||
pPrm.SetObject(obj)
|
pPrm.SetObject(obj)
|
||||||
pPrm.SetStorageID(descriptor)
|
pPrm.SetStorageID(res.StorageID)
|
||||||
|
|
||||||
_, err := c.metabase.Put(pPrm)
|
_, err = c.metabase.Put(pPrm)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -14,20 +14,6 @@ import (
|
||||||
func (c *cache) Get(addr oid.Address) (*objectSDK.Object, error) {
|
func (c *cache) Get(addr oid.Address) (*objectSDK.Object, error) {
|
||||||
saddr := addr.EncodeToString()
|
saddr := addr.EncodeToString()
|
||||||
|
|
||||||
c.mtx.RLock()
|
|
||||||
for i := range c.mem {
|
|
||||||
if saddr == c.mem[i].addr {
|
|
||||||
data := c.mem[i].data
|
|
||||||
c.mtx.RUnlock()
|
|
||||||
// We unmarshal object instead of using cached value to avoid possibility
|
|
||||||
// of unintentional object corruption by caller.
|
|
||||||
// It is safe to unmarshal without mutex, as storage under `c.mem[i].data` slices is not reused.
|
|
||||||
obj := objectSDK.New()
|
|
||||||
return obj, obj.Unmarshal(data)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
c.mtx.RUnlock()
|
|
||||||
|
|
||||||
value, err := Get(c.db, []byte(saddr))
|
value, err := Get(c.db, []byte(saddr))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
obj := objectSDK.New()
|
obj := objectSDK.New()
|
||||||
|
|
|
@ -23,13 +23,6 @@ func (c *cache) SetMode(m mode.Mode) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if !c.readOnly() {
|
|
||||||
// Because modeMtx is taken no new objects will arrive an all other modifying
|
|
||||||
// operations are completed.
|
|
||||||
// 1. Persist objects already in memory on disk.
|
|
||||||
c.persistMemoryCache()
|
|
||||||
}
|
|
||||||
|
|
||||||
if c.db != nil {
|
if c.db != nil {
|
||||||
if err := c.db.Close(); err != nil {
|
if err := c.db.Close(); err != nil {
|
||||||
return fmt.Errorf("can't close write-cache database: %w", err)
|
return fmt.Errorf("can't close write-cache database: %w", err)
|
||||||
|
@ -37,13 +30,10 @@ func (c *cache) SetMode(m mode.Mode) error {
|
||||||
c.db = nil
|
c.db = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. Suspend producers to ensure there are channel send operations in fly.
|
// Suspend producers to ensure there are channel send operations in fly.
|
||||||
// metaCh and directCh can be populated either during Put or in background memory persist thread.
|
// flushCh is populated by `flush` with `modeMtx` taken, thus waiting until it is empty
|
||||||
// Former possibility is eliminated by taking `modeMtx` mutex and
|
// guarantees that there are no in-fly operations.
|
||||||
// latter by explicit persist in the previous step.
|
for len(c.flushCh) != 0 {
|
||||||
// flushCh is populated by `flush` with `modeMtx` is also taken.
|
|
||||||
// Thus all producers are shutdown and we only need to wait until all channels are empty.
|
|
||||||
for len(c.metaCh) != 0 || len(c.directCh) != 0 || len(c.flushCh) != 0 {
|
|
||||||
c.log.Info("waiting for channels to flush")
|
c.log.Info("waiting for channels to flush")
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,153 +0,0 @@
|
||||||
package writecache
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sort"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
|
||||||
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
|
|
||||||
"go.etcd.io/bbolt"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
|
||||||
|
|
||||||
const defaultPersistInterval = time.Second
|
|
||||||
|
|
||||||
// persistLoop persists object accumulated in memory to the database.
|
|
||||||
func (c *cache) persistLoop() {
|
|
||||||
tick := time.NewTicker(defaultPersistInterval)
|
|
||||||
defer tick.Stop()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-tick.C:
|
|
||||||
c.modeMtx.RLock()
|
|
||||||
if c.readOnly() {
|
|
||||||
c.modeMtx.RUnlock()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
c.persistMemoryCache()
|
|
||||||
c.modeMtx.RUnlock()
|
|
||||||
case <-c.closeCh:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cache) persistMemoryCache() {
|
|
||||||
c.mtx.RLock()
|
|
||||||
m := c.mem
|
|
||||||
c.mtx.RUnlock()
|
|
||||||
|
|
||||||
if len(m) == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
sort.Slice(m, func(i, j int) bool { return m[i].addr < m[j].addr })
|
|
||||||
|
|
||||||
start := time.Now()
|
|
||||||
c.persistSmallObjects(m)
|
|
||||||
c.log.Debug("persisted items to disk",
|
|
||||||
zap.Duration("took", time.Since(start)),
|
|
||||||
zap.Int("total", len(m)))
|
|
||||||
|
|
||||||
for i := range m {
|
|
||||||
storagelog.Write(c.log,
|
|
||||||
storagelog.AddressField(m[i].addr),
|
|
||||||
storagelog.OpField("in-mem DELETE persist"),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.mtx.Lock()
|
|
||||||
c.curMemSize = 0
|
|
||||||
n := copy(c.mem, c.mem[len(m):])
|
|
||||||
c.mem = c.mem[:n]
|
|
||||||
for i := range c.mem {
|
|
||||||
c.curMemSize += uint64(len(c.mem[i].data))
|
|
||||||
}
|
|
||||||
c.mtx.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// persistSmallObjects persists small objects to the write-cache database and
|
|
||||||
// pushes the to the flush workers queue.
|
|
||||||
func (c *cache) persistSmallObjects(objs []objectInfo) {
|
|
||||||
cacheSize := c.estimateCacheSize()
|
|
||||||
overflowIndex := len(objs)
|
|
||||||
for i := range objs {
|
|
||||||
newSize := c.incSizeDB(cacheSize)
|
|
||||||
if c.maxCacheSize < newSize {
|
|
||||||
overflowIndex = i
|
|
||||||
break
|
|
||||||
}
|
|
||||||
cacheSize = newSize
|
|
||||||
}
|
|
||||||
|
|
||||||
err := c.db.Batch(func(tx *bbolt.Tx) error {
|
|
||||||
b := tx.Bucket(defaultBucket)
|
|
||||||
for i := 0; i < overflowIndex; i++ {
|
|
||||||
err := b.Put([]byte(objs[i].addr), objs[i].data)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
overflowIndex = 0
|
|
||||||
} else {
|
|
||||||
c.evictObjects(len(objs) - overflowIndex)
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < overflowIndex; i++ {
|
|
||||||
storagelog.Write(c.log, storagelog.AddressField(objs[i].addr), storagelog.OpField("db PUT"))
|
|
||||||
c.objCounters.IncDB()
|
|
||||||
}
|
|
||||||
for i := overflowIndex; i < len(objs); i++ {
|
|
||||||
c.flushed.Add(objs[i].addr, true)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.addToFlushQueue(objs, overflowIndex)
|
|
||||||
}
|
|
||||||
|
|
||||||
// persistBigObject writes object to FSTree and pushes it to the flush workers queue.
|
|
||||||
func (c *cache) persistBigObject(objInfo objectInfo) {
|
|
||||||
cacheSz := c.estimateCacheSize()
|
|
||||||
metaIndex := 0
|
|
||||||
if c.incSizeFS(cacheSz) <= c.maxCacheSize {
|
|
||||||
var prm common.PutPrm
|
|
||||||
prm.Address = object.AddressOf(objInfo.obj)
|
|
||||||
prm.RawData = objInfo.data
|
|
||||||
|
|
||||||
_, err := c.fsTree.Put(prm)
|
|
||||||
if err == nil {
|
|
||||||
metaIndex = 1
|
|
||||||
if c.blobstor.NeedsCompression(objInfo.obj) {
|
|
||||||
c.mtx.Lock()
|
|
||||||
c.compressFlags[objInfo.addr] = struct{}{}
|
|
||||||
c.mtx.Unlock()
|
|
||||||
}
|
|
||||||
c.objCounters.IncFS()
|
|
||||||
storagelog.Write(c.log, storagelog.AddressField(objInfo.addr), storagelog.OpField("fstree PUT"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
c.addToFlushQueue([]objectInfo{objInfo}, metaIndex)
|
|
||||||
}
|
|
||||||
|
|
||||||
// addToFlushQueue pushes objects to the flush workers queue.
|
|
||||||
// For objects below metaIndex only meta information will be flushed.
|
|
||||||
func (c *cache) addToFlushQueue(objs []objectInfo, metaIndex int) {
|
|
||||||
for i := 0; i < metaIndex; i++ {
|
|
||||||
select {
|
|
||||||
case c.metaCh <- objs[i].obj:
|
|
||||||
case <-c.closeCh:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for i := metaIndex; i < len(objs); i++ {
|
|
||||||
select {
|
|
||||||
case c.directCh <- objs[i].obj:
|
|
||||||
case <-c.closeCh:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -3,57 +3,80 @@ package writecache
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
||||||
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
|
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
|
||||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
"go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ErrBigObject is returned when object is too big to be placed in cache.
|
var (
|
||||||
var ErrBigObject = errors.New("too big object")
|
// ErrBigObject is returned when object is too big to be placed in cache.
|
||||||
|
ErrBigObject = errors.New("too big object")
|
||||||
|
// ErrOutOfSpace is returned when there is no space left to put a new object.
|
||||||
|
ErrOutOfSpace = errors.New("no space left in the write cache")
|
||||||
|
)
|
||||||
|
|
||||||
// Put puts object to write-cache.
|
// Put puts object to write-cache.
|
||||||
func (c *cache) Put(o *objectSDK.Object) error {
|
func (c *cache) Put(prm common.PutPrm) (common.PutRes, error) {
|
||||||
c.modeMtx.RLock()
|
c.modeMtx.RLock()
|
||||||
defer c.modeMtx.RUnlock()
|
defer c.modeMtx.RUnlock()
|
||||||
if c.readOnly() {
|
if c.readOnly() {
|
||||||
return ErrReadOnly
|
return common.PutRes{}, ErrReadOnly
|
||||||
}
|
}
|
||||||
|
|
||||||
sz := uint64(o.ToV2().StableSize())
|
sz := uint64(len(prm.RawData))
|
||||||
if sz > c.maxObjectSize {
|
if sz > c.maxObjectSize {
|
||||||
return ErrBigObject
|
return common.PutRes{}, ErrBigObject
|
||||||
}
|
}
|
||||||
|
|
||||||
data, err := o.Marshal()
|
oi := objectInfo{
|
||||||
|
addr: prm.Address.EncodeToString(),
|
||||||
|
obj: prm.Object,
|
||||||
|
data: prm.RawData,
|
||||||
|
}
|
||||||
|
|
||||||
|
if sz <= c.smallObjectSize {
|
||||||
|
return common.PutRes{}, c.putSmall(oi)
|
||||||
|
}
|
||||||
|
return common.PutRes{}, c.putBig(oi.addr, prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
// putSmall persists small objects to the write-cache database and
|
||||||
|
// pushes the to the flush workers queue.
|
||||||
|
func (c *cache) putSmall(obj objectInfo) error {
|
||||||
|
cacheSize := c.estimateCacheSize()
|
||||||
|
if c.maxCacheSize < c.incSizeDB(cacheSize) {
|
||||||
|
return ErrOutOfSpace
|
||||||
|
}
|
||||||
|
|
||||||
|
err := c.db.Batch(func(tx *bbolt.Tx) error {
|
||||||
|
b := tx.Bucket(defaultBucket)
|
||||||
|
return b.Put([]byte(obj.addr), obj.data)
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
storagelog.Write(c.log, storagelog.AddressField(obj.addr), storagelog.OpField("db PUT"))
|
||||||
|
c.objCounters.IncDB()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// putBig writes object to FSTree and pushes it to the flush workers queue.
|
||||||
|
func (c *cache) putBig(addr string, prm common.PutPrm) error {
|
||||||
|
cacheSz := c.estimateCacheSize()
|
||||||
|
if c.maxCacheSize < c.incSizeFS(cacheSz) {
|
||||||
|
return ErrOutOfSpace
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := c.fsTree.Put(prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
oi := objectInfo{
|
if c.blobstor.NeedsCompression(prm.Object) {
|
||||||
addr: object.AddressOf(o).EncodeToString(),
|
|
||||||
obj: o,
|
|
||||||
data: data,
|
|
||||||
}
|
|
||||||
|
|
||||||
c.mtx.Lock()
|
c.mtx.Lock()
|
||||||
|
c.compressFlags[addr] = struct{}{}
|
||||||
if sz <= c.smallObjectSize && c.curMemSize+sz <= c.maxMemSize {
|
|
||||||
c.curMemSize += sz
|
|
||||||
c.mem = append(c.mem, oi)
|
|
||||||
|
|
||||||
c.mtx.Unlock()
|
c.mtx.Unlock()
|
||||||
|
|
||||||
storagelog.Write(c.log, storagelog.AddressField(oi.addr), storagelog.OpField("in-mem PUT"))
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
c.mtx.Unlock()
|
|
||||||
|
|
||||||
if sz <= c.smallObjectSize {
|
|
||||||
c.persistSmallObjects([]objectInfo{oi})
|
|
||||||
} else {
|
|
||||||
c.persistBigObject(oi)
|
|
||||||
}
|
}
|
||||||
|
c.objCounters.IncFS()
|
||||||
|
storagelog.Write(c.log, storagelog.AddressField(addr), storagelog.OpField("fstree PUT"))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package writecache
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
|
@ -23,7 +24,7 @@ type Cache interface {
|
||||||
Head(oid.Address) (*object.Object, error)
|
Head(oid.Address) (*object.Object, error)
|
||||||
Delete(oid.Address) error
|
Delete(oid.Address) error
|
||||||
Iterate(IterationPrm) error
|
Iterate(IterationPrm) error
|
||||||
Put(*object.Object) error
|
Put(common.PutPrm) (common.PutRes, error)
|
||||||
SetMode(mode.Mode) error
|
SetMode(mode.Mode) error
|
||||||
SetLogger(*zap.Logger)
|
SetLogger(*zap.Logger)
|
||||||
DumpInfo() Info
|
DumpInfo() Info
|
||||||
|
@ -36,9 +37,8 @@ type Cache interface {
|
||||||
type cache struct {
|
type cache struct {
|
||||||
options
|
options
|
||||||
|
|
||||||
// mtx protects mem field, statistics, counters and compressFlags.
|
// mtx protects statistics, counters and compressFlags.
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
mem []objectInfo
|
|
||||||
|
|
||||||
mode mode.Mode
|
mode mode.Mode
|
||||||
modeMtx sync.RWMutex
|
modeMtx sync.RWMutex
|
||||||
|
@ -47,19 +47,12 @@ type cache struct {
|
||||||
// whether object should be compressed.
|
// whether object should be compressed.
|
||||||
compressFlags map[string]struct{}
|
compressFlags map[string]struct{}
|
||||||
|
|
||||||
// curMemSize is the current size of all objects cached in memory.
|
|
||||||
curMemSize uint64
|
|
||||||
|
|
||||||
// flushCh is a channel with objects to flush.
|
// flushCh is a channel with objects to flush.
|
||||||
flushCh chan *object.Object
|
flushCh chan *object.Object
|
||||||
// directCh is a channel with objects to put directly to the main storage.
|
|
||||||
// it is prioritized over flushCh.
|
|
||||||
directCh chan *object.Object
|
|
||||||
// metaCh is a channel with objects for which only metadata needs to be written.
|
|
||||||
metaCh chan *object.Object
|
|
||||||
// closeCh is close channel.
|
// closeCh is close channel.
|
||||||
closeCh chan struct{}
|
closeCh chan struct{}
|
||||||
evictCh chan []byte
|
// wg is a wait group for flush workers.
|
||||||
|
wg sync.WaitGroup
|
||||||
// store contains underlying database.
|
// store contains underlying database.
|
||||||
store
|
store
|
||||||
// fsTree contains big files stored directly on file-system.
|
// fsTree contains big files stored directly on file-system.
|
||||||
|
@ -87,10 +80,7 @@ var (
|
||||||
func New(opts ...Option) Cache {
|
func New(opts ...Option) Cache {
|
||||||
c := &cache{
|
c := &cache{
|
||||||
flushCh: make(chan *object.Object),
|
flushCh: make(chan *object.Object),
|
||||||
directCh: make(chan *object.Object),
|
|
||||||
metaCh: make(chan *object.Object),
|
|
||||||
closeCh: make(chan struct{}),
|
closeCh: make(chan struct{}),
|
||||||
evictCh: make(chan []byte),
|
|
||||||
mode: mode.ReadWrite,
|
mode: mode.ReadWrite,
|
||||||
|
|
||||||
compressFlags: make(map[string]struct{}),
|
compressFlags: make(map[string]struct{}),
|
||||||
|
@ -144,9 +134,7 @@ func (c *cache) Open(readOnly bool) error {
|
||||||
// Init runs necessary services.
|
// Init runs necessary services.
|
||||||
func (c *cache) Init() error {
|
func (c *cache) Init() error {
|
||||||
c.initFlushMarks()
|
c.initFlushMarks()
|
||||||
|
c.runFlushLoop()
|
||||||
go c.persistLoop()
|
|
||||||
go c.flushLoop()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -158,6 +146,8 @@ func (c *cache) Close() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
close(c.closeCh)
|
close(c.closeCh)
|
||||||
|
c.wg.Wait()
|
||||||
|
|
||||||
if c.objCounters != nil {
|
if c.objCounters != nil {
|
||||||
c.objCounters.FlushAndClose()
|
c.objCounters.FlushAndClose()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue