[#472] blobstor: implement write-cache

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2021-04-06 13:56:06 +03:00 committed by Alex Vanin
parent 96a8ee7c83
commit 59de521fd1
24 changed files with 1011 additions and 116 deletions

View file

@ -24,6 +24,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
"github.com/nspcc-dev/neofs-node/pkg/metrics"
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
"github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
@ -131,6 +132,11 @@ const (
cfgBlobStorSection = "blobstor"
cfgWriteCacheSection = "writecache"
cfgWriteCacheMemSize = "mem_size"
cfgWriteCacheDBSize = "db_size"
cfgWriteCacheSmallSize = "small_size"
cfgWriteCacheMaxSize = "max_size"
cfgWriteCacheWrkCount = "workers_count"
cfgBlobStorCompress = "compress"
cfgBlobStorShallowDepth = "shallow_depth"
cfgBlobStorTreePath = "path"
@ -560,6 +566,11 @@ func initShardOptions(c *cfg) {
c.log.Warn("incorrect writeCache path, ignore shard")
break
}
writeCacheMemSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheMemSize))
writeCacheDBSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheDBSize))
writeCacheSmallSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheSmallSize))
writeCacheMaxSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheMaxSize))
writeCacheWrkCount := c.viper.GetInt(configPath(writeCachePrefix, cfgWriteCacheWrkCount))
blobPrefix := configPath(prefix, cfgBlobStorSection)
@ -589,6 +600,9 @@ func initShardOptions(c *cfg) {
if smallSzLimit == 0 {
smallSzLimit = 1 << 20 // 1MB
}
if writeCacheMaxSize <= 0 {
writeCacheSmallSize = smallSzLimit
}
blzPrefix := configPath(blobPrefix, cfgBlobStorBlzSection)
@ -657,9 +671,13 @@ func initShardOptions(c *cfg) {
),
shard.WithWriteCache(useCache),
shard.WithWriteCacheOptions(
blobstor.WithRootPath(writeCachePath),
blobstor.WithBlobovniczaShallowDepth(0),
blobstor.WithBlobovniczaShallowWidth(1),
writecache.WithPath(writeCachePath),
writecache.WithLogger(c.log),
writecache.WithMaxMemSize(writeCacheMemSize),
writecache.WithMaxObjectSize(writeCacheMaxSize),
writecache.WithSmallObjectSize(writeCacheSmallSize),
writecache.WithMaxDBSize(writeCacheDBSize),
writecache.WithFlushWorkersCount(writeCacheWrkCount),
),
shard.WithRemoverBatchSize(rmBatchSize),
shard.WithGCRemoverSleepInterval(rmSleepInterval),

View file

@ -7,6 +7,7 @@ import (
"io/ioutil"
"os"
"path"
"strings"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
)
@ -44,6 +45,52 @@ func stringifyAddress(addr *objectSDK.Address) string {
return hex.EncodeToString(h[:])
}
// Iterate iterates over all stored objects.
func (t *FSTree) Iterate(f func(addr *objectSDK.Address, data []byte) error) error {
return t.iterate(0, []string{t.RootPath}, f)
}
func (t *FSTree) iterate(depth int, curPath []string, f func(*objectSDK.Address, []byte) error) error {
curName := strings.Join(curPath[1:], "")
des, err := ioutil.ReadDir(curName)
if err != nil {
return err
}
isLast := depth >= t.Depth
l := len(curPath)
curPath = append(curPath, "")
for i := range des {
curPath[l] = des[i].Name()
if !isLast && des[i].IsDir() {
err := t.iterate(depth+1, curPath, f)
if err != nil {
return err
}
}
addr := objectSDK.NewAddress()
err := addr.Parse(curName + des[i].Name())
if err != nil {
continue
}
curPath = append(curPath, des[i].Name())
data, err := ioutil.ReadFile(path.Join(curPath...))
if err != nil {
return err
}
if err := f(addr, data); err != nil {
return err
}
}
return nil
}
func (t *FSTree) treePath(addr *objectSDK.Address) string {
sAddr := stringifyAddress(addr)

View file

@ -2,7 +2,7 @@ package blobstor
import "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
// DumpInfo returns information about the BlobStor.
// FSTree returns file-system tree for big object store.
func (b *BlobStor) DumpInfo() fstree.Info {
return b.fsTree.Info
}

View file

@ -47,10 +47,9 @@ func (p *PutPrm) WithBlobovniczaID(id *blobovnicza.ID) *PutPrm {
}
var (
ErrUnknownObjectType = errors.New("unknown object type")
ErrIncorrectBlobovniczaUpdate = errors.New("updating blobovnicza id on object without it")
ErrIncorrectSplitInfoUpdate = errors.New("updating split info on object without it")
ErrIncorrectRootObject = errors.New("invalid root object")
ErrUnknownObjectType = errors.New("unknown object type")
ErrIncorrectSplitInfoUpdate = errors.New("updating split info on object without it")
ErrIncorrectRootObject = errors.New("invalid root object")
)
// Put saves the object in DB.
@ -379,20 +378,12 @@ func decodeList(data []byte) (lst [][]byte, err error) {
// updateBlobovniczaID for existing objects if they were moved from from
// one blobovnicza to another.
func updateBlobovniczaID(tx *bbolt.Tx, addr *objectSDK.Address, id *blobovnicza.ID) error {
bkt := tx.Bucket(smallBucketName(addr.ContainerID()))
if bkt == nil {
// if object exists, don't have blobovniczaID and we want to update it
// then ignore, this should never happen
return ErrIncorrectBlobovniczaUpdate
bkt, err := tx.CreateBucketIfNotExists(smallBucketName(addr.ContainerID()))
if err != nil {
return err
}
objectKey := objectKey(addr.ObjectID())
if len(bkt.Get(objectKey)) == 0 {
return ErrIncorrectBlobovniczaUpdate
}
return bkt.Put(objectKey, *id)
return bkt.Put(objectKey(addr.ObjectID()), *id)
}
// updateSpliInfo for existing objects if storage filled with extra information

View file

@ -42,8 +42,5 @@ func TestDB_PutBlobovnicaUpdate(t *testing.T) {
fetchedBlobovniczaID, err := meta.IsSmall(db, raw2.Object().Address())
require.NoError(t, err)
require.Nil(t, fetchedBlobovniczaID)
err = meta.Put(db, raw2.Object(), &blobovniczaID)
require.Error(t, err)
})
}

View file

@ -2,9 +2,11 @@ package shard
import (
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/pkg/errors"
"go.uber.org/zap"
)
@ -37,13 +39,10 @@ func (s *Shard) Delete(prm *DeletePrm) (*DeleteRes, error) {
smalls := make(map[*objectSDK.Address]*blobovnicza.ID, ln)
for i := range prm.addr {
delSmallPrm.SetAddress(prm.addr[i])
delBigPrm.SetAddress(prm.addr[i])
if s.hasWriteCache() {
_, err := s.writeCache.DeleteSmall(delSmallPrm)
if err != nil {
_, _ = s.writeCache.DeleteBig(delBigPrm)
err := s.writeCache.Delete(prm.addr[i])
if err != nil && !errors.Is(err, object.ErrNotFound) {
s.log.Error("can't delete object from write cache", zap.String("error", err.Error()))
}
}

View file

@ -9,24 +9,19 @@ import (
)
func TestShard_Delete(t *testing.T) {
sh := newShard(t, false)
shWC := newShard(t, true)
defer func() {
releaseShard(sh, t)
releaseShard(shWC, t)
}()
t.Run("without write cache", func(t *testing.T) {
testShardDelete(t, sh)
testShardDelete(t, false)
})
t.Run("with write cache", func(t *testing.T) {
testShardDelete(t, shWC)
testShardDelete(t, true)
})
}
func testShardDelete(t *testing.T, sh *shard.Shard) {
func testShardDelete(t *testing.T, hasWriteCache bool) {
sh := newShard(t, hasWriteCache)
defer releaseShard(sh, t)
cid := generateCID()
obj := generateRawObjectWithCID(t, cid)
@ -47,7 +42,7 @@ func testShardDelete(t *testing.T, sh *shard.Shard) {
_, err := sh.Put(putPrm)
require.NoError(t, err)
_, err = sh.Get(getPrm)
_, err = testGet(t, sh, getPrm, hasWriteCache)
require.NoError(t, err)
_, err = sh.Delete(delPrm)

View file

@ -8,6 +8,8 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/pkg/errors"
"go.uber.org/zap"
)
// storFetcher is a type to unify object fetching mechanism in `fetchObjectData`
@ -89,19 +91,16 @@ func (s *Shard) fetchObjectData(addr *objectSDK.Address, big, small storFetcher)
)
if s.hasWriteCache() {
res, err = small(s.writeCache, nil)
res, err = s.writeCache.Get(addr)
if err == nil {
return res, nil
}
s.log.Debug("miss in writeCache blobovnicza")
res, err = big(s.writeCache, nil)
if err == nil {
return res, nil
if errors.Is(err, object.ErrNotFound) {
s.log.Debug("object is missing in write-cache")
} else {
s.log.Error("failed to fetch object from write-cache", zap.Error(err))
}
s.log.Debug("miss in writeCache shallow dir")
}
exists, err := meta.Exists(s.metaBase, addr)

View file

@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"testing"
"time"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
@ -12,31 +13,25 @@ import (
)
func TestShard_Get(t *testing.T) {
sh := newShard(t, false)
shWC := newShard(t, true)
defer func() {
releaseShard(sh, t)
releaseShard(shWC, t)
}()
t.Run("without write cache", func(t *testing.T) {
testShardGet(t, sh)
testShardGet(t, false)
})
t.Run("with write cache", func(t *testing.T) {
testShardGet(t, shWC)
testShardGet(t, true)
})
}
func testShardGet(t *testing.T, sh *shard.Shard) {
obj := generateRawObject(t)
addAttribute(obj, "foo", "bar")
func testShardGet(t *testing.T, hasWriteCache bool) {
sh := newShard(t, hasWriteCache)
defer releaseShard(sh, t)
putPrm := new(shard.PutPrm)
getPrm := new(shard.GetPrm)
t.Run("small object", func(t *testing.T) {
obj := generateRawObject(t)
addAttribute(obj, "foo", "bar")
addPayload(obj, 1<<5)
putPrm.WithObject(obj.Object())
@ -46,12 +41,14 @@ func testShardGet(t *testing.T, sh *shard.Shard) {
getPrm.WithAddress(obj.Object().Address())
res, err := sh.Get(getPrm)
res, err := testGet(t, sh, getPrm, hasWriteCache)
require.NoError(t, err)
require.Equal(t, obj.Object(), res.Object())
})
t.Run("big object", func(t *testing.T) {
obj := generateRawObject(t)
addAttribute(obj, "foo", "bar")
obj.SetID(generateOID())
addPayload(obj, 1<<20) // big obj
@ -62,12 +59,14 @@ func testShardGet(t *testing.T, sh *shard.Shard) {
getPrm.WithAddress(obj.Object().Address())
res, err := sh.Get(getPrm)
res, err := testGet(t, sh, getPrm, hasWriteCache)
require.NoError(t, err)
require.Equal(t, obj.Object(), res.Object())
})
t.Run("parent object", func(t *testing.T) {
obj := generateRawObject(t)
addAttribute(obj, "foo", "bar")
cid := generateCID()
splitID := objectSDK.NewSplitID()
@ -87,13 +86,13 @@ func testShardGet(t *testing.T, sh *shard.Shard) {
getPrm.WithAddress(child.Object().Address())
res, err := sh.Get(getPrm)
res, err := testGet(t, sh, getPrm, hasWriteCache)
require.NoError(t, err)
require.True(t, binaryEqual(child.Object(), res.Object()))
getPrm.WithAddress(parent.Object().Address())
_, err = sh.Get(getPrm)
_, err = testGet(t, sh, getPrm, hasWriteCache)
var expectedErr *objectSDK.SplitInfoError
require.True(t, errors.As(err, &expectedErr))
@ -106,6 +105,19 @@ func testShardGet(t *testing.T, sh *shard.Shard) {
})
}
func testGet(t *testing.T, sh *shard.Shard, getPrm *shard.GetPrm, hasWriteCache bool) (*shard.GetRes, error) {
res, err := sh.Get(getPrm)
if hasWriteCache {
require.Eventually(t, func() bool {
if errors.Is(err, object.ErrNotFound) {
res, err = sh.Get(getPrm)
}
return !errors.Is(err, object.ErrNotFound)
}, time.Second, time.Millisecond*100)
}
return res, err
}
// binary equal is used when object contains empty lists in the structure and
// requre.Equal fails on comparing <nil> and []{} lists.
func binaryEqual(a, b *object.Object) bool {

View file

@ -3,31 +3,28 @@ package shard_test
import (
"errors"
"testing"
"time"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/stretchr/testify/require"
)
func TestShard_Head(t *testing.T) {
sh := newShard(t, false)
shWC := newShard(t, true)
defer func() {
releaseShard(sh, t)
releaseShard(shWC, t)
}()
t.Run("without write cache", func(t *testing.T) {
testShardHead(t, sh)
testShardHead(t, false)
})
t.Run("with write cache", func(t *testing.T) {
testShardHead(t, shWC)
testShardHead(t, true)
})
}
func testShardHead(t *testing.T, sh *shard.Shard) {
func testShardHead(t *testing.T, hasWriteCache bool) {
sh := newShard(t, hasWriteCache)
defer releaseShard(sh, t)
putPrm := new(shard.PutPrm)
headPrm := new(shard.HeadPrm)
@ -42,7 +39,7 @@ func testShardHead(t *testing.T, sh *shard.Shard) {
headPrm.WithAddress(obj.Object().Address())
res, err := sh.Head(headPrm)
res, err := testHead(t, sh, headPrm, hasWriteCache)
require.NoError(t, err)
require.Equal(t, obj.Object(), res.Object())
})
@ -69,7 +66,7 @@ func testShardHead(t *testing.T, sh *shard.Shard) {
var siErr *objectSDK.SplitInfoError
_, err = sh.Head(headPrm)
_, err = testHead(t, sh, headPrm, hasWriteCache)
require.True(t, errors.As(err, &siErr))
headPrm.WithAddress(parent.Object().Address())
@ -80,3 +77,16 @@ func testShardHead(t *testing.T, sh *shard.Shard) {
require.Equal(t, parent.Object(), head.Object())
})
}
func testHead(t *testing.T, sh *shard.Shard, headPrm *shard.HeadPrm, hasWriteCache bool) (*shard.HeadRes, error) {
res, err := sh.Head(headPrm)
if hasWriteCache {
require.Eventually(t, func() bool {
if errors.Is(err, object.ErrNotFound) {
res, err = sh.Head(headPrm)
}
return !errors.Is(err, object.ErrNotFound)
}, time.Second, time.Millisecond*100)
}
return res, err
}

View file

@ -44,6 +44,12 @@ func (p *InhumePrm) MarkAsGarbage(addr ...*objectSDK.Address) *InhumePrm {
// Inhume calls metabase. Inhume method to mark object as removed. It won't be
// removed physically from blobStor and metabase until `Delete` operation.
func (s *Shard) Inhume(prm *InhumePrm) (*InhumeRes, error) {
if s.hasWriteCache() {
for i := range prm.target {
_ = s.writeCache.Delete(prm.target[i])
}
}
metaPrm := new(meta.InhumePrm).WithAddresses(prm.target...)
if prm.tombstone != nil {

View file

@ -9,24 +9,19 @@ import (
)
func TestShard_Inhume(t *testing.T) {
sh := newShard(t, false)
shWC := newShard(t, true)
defer func() {
releaseShard(sh, t)
releaseShard(shWC, t)
}()
t.Run("without write cache", func(t *testing.T) {
testShardInhume(t, sh)
testShardInhume(t, false)
})
t.Run("with write cache", func(t *testing.T) {
testShardInhume(t, shWC)
testShardInhume(t, true)
})
}
func testShardInhume(t *testing.T, sh *shard.Shard) {
func testShardInhume(t *testing.T, hasWriteCache bool) {
sh := newShard(t, hasWriteCache)
defer releaseShard(sh, t)
cid := generateCID()
obj := generateRawObjectWithCID(t, cid)
@ -46,7 +41,7 @@ func testShardInhume(t *testing.T, sh *shard.Shard) {
_, err := sh.Put(putPrm)
require.NoError(t, err)
_, err = sh.Get(getPrm)
_, err = testGet(t, sh, getPrm, hasWriteCache)
require.NoError(t, err)
_, err = sh.Inhume(inhPrm)

View file

@ -5,6 +5,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/pkg/errors"
"go.uber.org/zap"
)
// PutPrm groups the parameters of Put operation.
@ -34,26 +35,24 @@ func (s *Shard) Put(prm *PutPrm) (*PutRes, error) {
// exist check are not performed there, these checks should be executed
// ahead of `Put` by storage engine
if s.hasWriteCache() {
err := s.writeCache.Put(prm.obj)
if err == nil {
return nil, nil
}
s.log.Debug("can't put message to writeCache, trying to blobStor",
zap.String("err", err.Error()))
}
var (
err error
res *blobstor.PutRes
)
if s.hasWriteCache() {
res, err = s.writeCache.Put(putPrm)
if err != nil {
s.log.Debug("can't put message to writeCache, trying to blobStor")
res = nil // just in case
}
}
// res == nil if there is no writeCache or writeCache.Put has been failed
if res == nil {
if res, err = s.blobStor.Put(putPrm); err != nil {
return nil, errors.Wrap(err, "could not put object to BLOB storage")
}
if res, err = s.blobStor.Put(putPrm); err != nil {
return nil, errors.Wrap(err, "could not put object to BLOB storage")
}
// put to metabase

View file

@ -7,6 +7,7 @@ import (
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
"github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"go.uber.org/atomic"
@ -19,7 +20,7 @@ type Shard struct {
mode *atomic.Uint32
writeCache *blobstor.BlobStor
writeCache writecache.Cache
blobStor *blobstor.BlobStor
@ -43,7 +44,7 @@ type cfg struct {
metaOpts []meta.Option
writeCacheOpts []blobstor.Option
writeCacheOpts []writecache.Option
log *logger.Logger
@ -68,19 +69,22 @@ func New(opts ...Option) *Shard {
opts[i](c)
}
var writeCache *blobstor.BlobStor
bs := blobstor.New(c.blobOpts...)
mb := meta.New(c.metaOpts...)
var writeCache writecache.Cache
if c.useWriteCache {
writeCache = blobstor.New(
append(c.blobOpts, c.writeCacheOpts...)...,
)
writeCache = writecache.New(
append(c.writeCacheOpts,
writecache.WithBlobstor(bs),
writecache.WithMetabase(mb))...)
}
return &Shard{
cfg: c,
mode: atomic.NewUint32(0), // TODO: init with particular mode
blobStor: blobstor.New(c.blobOpts...),
metaBase: meta.New(c.metaOpts...),
blobStor: bs,
metaBase: mb,
writeCache: writeCache,
}
}
@ -107,7 +111,7 @@ func WithMetaBaseOptions(opts ...meta.Option) Option {
}
// WithMetaBaseOptions returns option to set internal metabase options.
func WithWriteCacheOptions(opts ...blobstor.Option) Option {
func WithWriteCacheOptions(opts ...writecache.Option) Option {
return func(c *cfg) {
c.writeCacheOpts = opts
}

View file

@ -15,6 +15,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
"github.com/nspcc-dev/neofs-node/pkg/util/test"
"github.com/nspcc-dev/tzhash/tz"
"github.com/stretchr/testify/require"
@ -41,9 +42,8 @@ func newShard(t testing.TB, enableWriteCache bool) *shard.Shard {
),
shard.WithWriteCache(enableWriteCache),
shard.WithWriteCacheOptions(
blobstor.WithRootPath(path.Join(rootPath, "wcache")),
blobstor.WithBlobovniczaShallowWidth(1),
blobstor.WithBlobovniczaShallowDepth(0),
writecache.WithMaxMemSize(0), // disable memory batches
writecache.WithPath(path.Join(rootPath, "wcache")),
),
}

View file

@ -0,0 +1,49 @@
package writecache
import (
"errors"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
"go.etcd.io/bbolt"
)
// Delete removes object from write-cache.
func (c *cache) Delete(addr *objectSDK.Address) error {
saddr := addr.String()
// Check memory cache.
c.mtx.Lock()
for i := range c.mem {
if saddr == c.mem[i].addr {
copy(c.mem[i:], c.mem[i+1:])
c.mem = c.mem[:len(c.mem)-1]
c.mtx.Unlock()
return nil
}
}
c.mtx.Unlock()
// Check disk cache.
has := false
_ = c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
has = b.Get([]byte(saddr)) != nil
return nil
})
if has {
return c.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
return b.Delete([]byte(saddr))
})
}
err := c.fsTree.Delete(addr)
if errors.Is(err, fstree.ErrFileNotFound) {
err = object.ErrNotFound
}
return err
}

View file

@ -0,0 +1,20 @@
// Package writecache implements write-cache for objects.
//
// It contains in-memory cache of fixed size and underlying database
// (usually on SSD) for storing small objects.
// There are 3 places where object can be:
// 1. In-memory cache.
// 2. On-disk cache DB.
// 3. Main storage (blobstor).
//
// There are 2 types of background jobs:
// 1. Persisting objects from in-memory cache to database.
// 2. Flushing objects from database to blobstor.
// On flushing object address is put in in-memory LRU cache.
// The actual deletion from the DB is done when object
// is evicted from this cache.
//
// Putting objects to the main storage is done by multiple workers.
// Some of them prioritize flushing items, others prioritize putting new objects.
// The current ration is 50/50. This helps to make some progress even under load.
package writecache

View file

@ -0,0 +1,196 @@
package writecache
import (
"sync"
"time"
"github.com/mr-tron/base58"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
const (
// flushBatchSize is amount of keys which will be read from cache to be flushed
// to the main storage. It is used to reduce contention between cache put
// and cache persist.
flushBatchSize = 512
// flushWorkersCount is number of workers for putting objects in main storage.
flushWorkersCount = 20
// defaultFlushInterval is default time interval between successive flushes.
defaultFlushInterval = time.Second
)
// flushLoop periodically flushes changes from the database to memory.
func (c *cache) flushLoop() {
var wg sync.WaitGroup
for i := 0; i < c.workersCount; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
c.flushWorker(i)
}(i)
}
wg.Add(1)
go func() {
defer wg.Done()
c.flushBigObjects()
}()
tick := time.NewTicker(defaultFlushInterval)
for {
select {
case <-tick.C:
c.flush()
case <-c.closeCh:
c.log.Debug("waiting for workers to quit")
wg.Wait()
return
}
}
}
func (c *cache) flush() {
lastKey := []byte{}
var m []objectInfo
for {
m = m[:0]
sz := 0
// We put objects in batches of fixed size to not interfere with main put cycle a lot.
_ = c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
cs := b.Cursor()
for k, v := cs.Seek(lastKey); k != nil && len(m) < flushBatchSize; k, v = cs.Next() {
if _, ok := c.flushed.Peek(string(k)); ok {
continue
}
sz += len(k) + len(v)
m = append(m, objectInfo{
addr: string(k),
data: cloneBytes(v),
})
}
return nil
})
for i := range m {
obj := object.New()
if err := obj.Unmarshal(m[i].data); err != nil {
continue
}
select {
case c.flushCh <- obj:
case <-c.closeCh:
return
}
}
c.evictObjects(len(m))
for i := range m {
c.flushed.Add(m[i].addr, true)
}
c.dbSize.Sub(uint64(sz))
c.log.Debug("flushed items from write-cache",
zap.Int("count", len(m)),
zap.String("start", base58.Encode(lastKey)))
if len(m) > 0 {
lastKey = append([]byte(m[len(m)-1].addr), 0)
} else {
break
}
}
}
func (c *cache) flushBigObjects() {
tick := time.NewTicker(defaultFlushInterval * 10)
for {
select {
case <-tick.C:
_ = c.fsTree.Iterate(func(addr *objectSDK.Address, data []byte) error {
if _, ok := c.store.flushed.Peek(addr.String()); ok {
return nil
}
if _, err := c.blobstor.PutRaw(addr, data); err != nil {
c.log.Error("cant flush object to blobstor", zap.Error(err))
}
return nil
})
case <-c.closeCh:
}
}
}
// flushWorker runs in a separate goroutine and write objects to the main storage.
// If flushFirst is true, flushing objects from cache database takes priority over
// putting new objects.
func (c *cache) flushWorker(num int) {
priorityCh := c.directCh
switch num % 3 {
case 0:
priorityCh = c.flushCh
case 1:
priorityCh = c.metaCh
}
var obj *object.Object
for {
metaOnly := false
// Give priority to direct put.
// TODO(fyrchik): do this once in N iterations depending on load
select {
case obj = <-priorityCh:
default:
select {
case obj = <-c.directCh:
case obj = <-c.flushCh:
case obj = <-c.metaCh:
metaOnly = true
case <-c.closeCh:
return
}
}
err := c.writeObject(obj, metaOnly)
if err != nil {
c.log.Error("can't flush object to the main storage", zap.Error(err))
}
}
}
// writeObject is used to write object directly to the main storage.
func (c *cache) writeObject(obj *object.Object, metaOnly bool) error {
var id *blobovnicza.ID
if !metaOnly {
prm := new(blobstor.PutPrm)
prm.SetObject(obj)
res, err := c.blobstor.Put(prm)
if err != nil {
return err
}
id = res.BlobovniczaID()
}
return meta.Put(c.metabase, obj, id)
}
func cloneBytes(a []byte) []byte {
b := make([]byte, len(a))
copy(b, a)
return b
}

View file

@ -0,0 +1,51 @@
package writecache
import (
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"go.etcd.io/bbolt"
)
// Get returns object from write-cache.
func (c *cache) Get(addr *objectSDK.Address) (*object.Object, error) {
saddr := addr.String()
c.mtx.RLock()
for i := range c.mem {
if saddr == c.mem[i].addr {
obj := c.mem[i].obj
c.mtx.RUnlock()
return obj, nil
}
}
c.mtx.RUnlock()
var value []byte
_ = c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
val := b.Get([]byte(saddr))
if val != nil {
value = cloneBytes(val)
}
return nil
})
if value != nil {
obj := object.New()
c.flushed.Get(saddr)
return obj, obj.Unmarshal(value)
}
data, err := c.fsTree.Get(addr)
if err != nil {
return nil, object.ErrNotFound
}
obj := object.New()
if err := obj.Unmarshal(data); err != nil {
return nil, err
}
c.flushed.Get(saddr)
return obj, nil
}

View file

@ -0,0 +1,100 @@
package writecache
import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"go.uber.org/zap"
)
// Option represents write-cache configuration option.
type Option func(*options)
type options struct {
log *zap.Logger
// path is a path to a directory for write-cache.
path string
// blobstor is the main persistent storage.
blobstor *blobstor.BlobStor
// metabase is the metabase instance.
metabase *meta.DB
// maxMemSize is the maximum total size of all objects cached in memory.
// 1 GiB by default.
maxMemSize uint64
// maxDBSize is the maximum size of database in bytes.
// Unrestricted by default.
maxDBSize uint64
// maxObjectSize is the maximum size of the object stored in the write-cache.
maxObjectSize uint64
// smallObjectSize is the maximum size of the object stored in the database.
smallObjectSize uint64
// workersCount is the number of workers flushing objects in parallel.
workersCount int
}
// WithLogger sets logger.
func WithLogger(log *zap.Logger) Option {
return func(o *options) {
o.log = log
}
}
// WithPath sets path to writecache db.
func WithPath(path string) Option {
return func(o *options) {
o.path = path
}
}
// WithBlobstor sets main object storage.
func WithBlobstor(bs *blobstor.BlobStor) Option {
return func(o *options) {
o.blobstor = bs
}
}
// WithMetabase sets metabase.
func WithMetabase(db *meta.DB) Option {
return func(o *options) {
o.metabase = db
}
}
// WithMaxMemSize sets maximum size for in-memory DB.
func WithMaxMemSize(sz uint64) Option {
return func(o *options) {
o.maxMemSize = sz
}
}
// WithMaxDBSize sets maximum size for on-disk DB.
func WithMaxDBSize(sz uint64) Option {
return func(o *options) {
o.maxDBSize = sz
}
}
// WithMaxObjectSize sets maximum object size to be stored in write-cache.
func WithMaxObjectSize(sz uint64) Option {
return func(o *options) {
if sz > 0 {
o.maxObjectSize = sz
}
}
}
// WithSmallObjectSize sets maximum object size to be stored in write-cache.
func WithSmallObjectSize(sz uint64) Option {
return func(o *options) {
if sz > 0 {
o.smallObjectSize = sz
}
}
}
func WithFlushWorkersCount(c int) Option {
return func(o *options) {
if c > 0 {
o.workersCount = c
}
}
}

View file

@ -0,0 +1,124 @@
package writecache
import (
"sort"
"time"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
const defaultPersistInterval = time.Second
// persistLoop persists object accumulated in memory to the database.
func (c *cache) persistLoop() {
tick := time.NewTicker(defaultPersistInterval)
defer tick.Stop()
for {
select {
case <-tick.C:
c.mtx.RLock()
m := c.mem
c.mtx.RUnlock()
sort.Slice(m, func(i, j int) bool { return m[i].addr < m[j].addr })
start := time.Now()
c.persistObjects(m)
c.log.Debug("persisted items to disk",
zap.Duration("took", time.Since(start)),
zap.Int("total", len(m)))
c.mtx.Lock()
n := copy(c.mem, c.mem[len(m):])
c.mem = c.mem[:n]
for i := range c.mem {
c.curMemSize += uint64(len(c.mem[i].data))
}
c.mtx.Unlock()
sz := 0
for i := range m {
sz += len(m[i].addr) + m[i].obj.ToV2().StableSize()
}
c.dbSize.Add(uint64(sz))
case <-c.closeCh:
return
}
}
}
func (c *cache) persistToCache(objs []objectInfo) []int {
var (
failMem []int
doneMem []int
)
_ = c.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
for i := range objs {
if uint64(len(objs[i].data)) >= c.smallObjectSize {
failMem = append(failMem, i)
continue
}
err := b.Put([]byte(objs[i].addr), objs[i].data)
if err != nil {
return err
}
doneMem = append(doneMem, i)
}
return nil
})
if len(doneMem) > 0 {
c.evictObjects(len(doneMem))
for _, i := range doneMem {
c.flushed.Add(objs[i].addr, true)
}
}
var failDisk []int
for _, i := range failMem {
if uint64(len(objs[i].data)) > c.maxObjectSize {
failDisk = append(failDisk, i)
continue
}
err := c.fsTree.Put(objs[i].obj.Address(), objs[i].data)
if err != nil {
failDisk = append(failDisk, i)
}
}
return failDisk
}
// persistObjects tries to write objects from memory to the persistent storage.
// If tryCache is false, writing skips cache and is done directly to the main storage.
func (c *cache) persistObjects(objs []objectInfo) {
toDisk := c.persistToCache(objs)
j := 0
for i := range objs {
ch := c.metaCh
if j < len(toDisk) {
if i == toDisk[j] {
ch = c.directCh
} else {
for ; j < len(toDisk) && i > toDisk[j]; j++ {
}
}
}
select {
case ch <- objs[j].obj:
case <-c.closeCh:
return
}
}
}

View file

@ -0,0 +1,44 @@
package writecache
import (
"errors"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
)
// ErrBigObject is returned when object is too big to be placed in cache.
var ErrBigObject = errors.New("too big object")
// Put puts object to write-cache.
func (c *cache) Put(o *object.Object) error {
sz := uint64(o.ToV2().StableSize())
if sz > c.maxObjectSize {
return ErrBigObject
}
data, err := o.Marshal(nil)
if err != nil {
return err
}
oi := objectInfo{
addr: o.Address().String(),
obj: o,
data: data,
}
c.mtx.Lock()
if sz < c.smallObjectSize && c.curMemSize+sz <= c.maxMemSize {
c.curMemSize += sz
c.mem = append(c.mem, oi)
c.mtx.Unlock()
return nil
}
c.mtx.Unlock()
c.persistObjects([]objectInfo{oi})
return nil
}

View file

@ -0,0 +1,130 @@
package writecache
import (
"errors"
"os"
"path"
lru "github.com/hashicorp/golang-lru"
"github.com/hashicorp/golang-lru/simplelru"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
// store represents persistent storage with in-memory LRU cache
// for flushed items on top of it.
type store struct {
flushed simplelru.LRUCache
db *bbolt.DB
}
const lruKeysCount = 256 * 1024 * 8
const dbName = "small.bolt"
func (c *cache) openStore() error {
if err := os.MkdirAll(c.path, os.ModePerm); err != nil {
return err
}
db, err := bbolt.Open(path.Join(c.path, dbName), os.ModePerm, &bbolt.Options{
NoFreelistSync: true,
NoSync: true,
})
if err != nil {
return err
}
c.fsTree = &fstree.FSTree{
Info: fstree.Info{
Permissions: os.ModePerm,
RootPath: c.path,
},
Depth: 1,
DirNameLen: 1,
}
_ = db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(defaultBucket)
return err
})
c.db = db
c.flushed, _ = lru.New(lruKeysCount)
return nil
}
func (s *store) removeFlushedKeys(n int) ([][]byte, [][]byte) {
var keysMem, keysDisk [][]byte
for i := 0; i < n; i++ {
k, v, ok := s.flushed.RemoveOldest()
if !ok {
break
}
if v.(bool) {
keysMem = append(keysMem, []byte(k.(string)))
} else {
keysDisk = append(keysDisk, []byte(k.(string)))
}
}
return keysMem, keysDisk
}
func (c *cache) evictObjects(putCount int) {
sum := c.flushed.Len() + putCount
if sum <= lruKeysCount {
return
}
keysMem, keysDisk := c.store.removeFlushedKeys(sum - lruKeysCount)
if err := c.deleteFromDB(keysMem); err != nil {
c.log.Error("error while removing objects from write-cache (database)", zap.Error(err))
}
if err := c.deleteFromDisk(keysDisk); err != nil {
c.log.Error("error while removing objects from write-cache (disk)", zap.Error(err))
}
}
func (c *cache) deleteFromDB(keys [][]byte) error {
if len(keys) == 0 {
return nil
}
return c.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
for i := range keys {
if err := b.Delete(keys[i]); err != nil {
return err
}
}
return nil
})
}
func (c *cache) deleteFromDisk(keys [][]byte) error {
var lastErr error
for i := range keys {
addr := objectSDK.NewAddress()
addrStr := string(keys[i])
if err := addr.Parse(addrStr); err != nil {
c.log.Error("can't parse address", zap.String("address", addrStr))
continue
}
if err := c.fsTree.Delete(addr); err != nil && !errors.Is(err, fstree.ErrFileNotFound) {
lastErr = err
c.log.Error("can't remove object from write-cache", zap.Error(err))
continue
}
}
return lastErr
}

View file

@ -0,0 +1,109 @@
package writecache
import (
"sync"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
"go.uber.org/atomic"
"go.uber.org/zap"
)
// Cache represents write-cache for objects.
type Cache interface {
Get(*objectSDK.Address) (*object.Object, error)
Delete(*objectSDK.Address) error
Put(*object.Object) error
Init() error
Open() error
Close() error
}
type cache struct {
options
// mtx protects mem field, statistics and counters.
mtx sync.RWMutex
mem []objectInfo
// curMemSize is the current size of all objects cached in memory.
curMemSize uint64
// flushCh is a channel with objects to flush.
flushCh chan *object.Object
// directCh is a channel with objects to put directly to the main storage.
// it is prioritized over flushCh.
directCh chan *object.Object
// metaCh is a channel with objects for which only metadata needs to be written.
metaCh chan *object.Object
// closeCh is close channel.
closeCh chan struct{}
evictCh chan []byte
// store contains underlying database.
store
// dbSize stores approximate database size. It is updated every flush/persist cycle.
dbSize atomic.Uint64
// fsTree contains big files stored directly on file-system.
fsTree *fstree.FSTree
}
type objectInfo struct {
addr string
data []byte
obj *object.Object
}
const (
maxInMemorySizeBytes = 1024 * 1024 * 1024 // 1 GiB
maxObjectSize = 64 * 1024 * 1024 // 64 MiB
smallObjectSize = 32 * 1024 // 32 KiB
)
var (
defaultBucket = []byte{0}
)
// New creates new writecache instance.
func New(opts ...Option) Cache {
c := &cache{
flushCh: make(chan *object.Object),
directCh: make(chan *object.Object),
metaCh: make(chan *object.Object),
closeCh: make(chan struct{}),
evictCh: make(chan []byte),
options: options{
log: zap.NewNop(),
maxMemSize: maxInMemorySizeBytes,
maxObjectSize: maxObjectSize,
smallObjectSize: smallObjectSize,
workersCount: flushWorkersCount,
},
}
for i := range opts {
opts[i](&c.options)
}
return c
}
// Open opens and initializes database.
func (c *cache) Open() error {
return c.openStore()
}
// Init runs necessary services.
func (c *cache) Init() error {
go c.persistLoop()
go c.flushLoop()
return nil
}
// Close closes db connection and stops services.
func (c *cache) Close() error {
close(c.closeCh)
return c.db.Close()
}