diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 314c791c..f03568e9 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -24,6 +24,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache" "github.com/nspcc-dev/neofs-node/pkg/metrics" "github.com/nspcc-dev/neofs-node/pkg/morph/client" "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper" @@ -131,6 +132,11 @@ const ( cfgBlobStorSection = "blobstor" cfgWriteCacheSection = "writecache" + cfgWriteCacheMemSize = "mem_size" + cfgWriteCacheDBSize = "db_size" + cfgWriteCacheSmallSize = "small_size" + cfgWriteCacheMaxSize = "max_size" + cfgWriteCacheWrkCount = "workers_count" cfgBlobStorCompress = "compress" cfgBlobStorShallowDepth = "shallow_depth" cfgBlobStorTreePath = "path" @@ -560,6 +566,11 @@ func initShardOptions(c *cfg) { c.log.Warn("incorrect writeCache path, ignore shard") break } + writeCacheMemSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheMemSize)) + writeCacheDBSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheDBSize)) + writeCacheSmallSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheSmallSize)) + writeCacheMaxSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheMaxSize)) + writeCacheWrkCount := c.viper.GetInt(configPath(writeCachePrefix, cfgWriteCacheWrkCount)) blobPrefix := configPath(prefix, cfgBlobStorSection) @@ -589,6 +600,9 @@ func initShardOptions(c *cfg) { if smallSzLimit == 0 { smallSzLimit = 1 << 20 // 1MB } + if writeCacheMaxSize <= 0 { + writeCacheSmallSize = smallSzLimit + } blzPrefix := configPath(blobPrefix, cfgBlobStorBlzSection) @@ -657,9 +671,13 @@ func initShardOptions(c *cfg) { ), shard.WithWriteCache(useCache), 
shard.WithWriteCacheOptions( - blobstor.WithRootPath(writeCachePath), - blobstor.WithBlobovniczaShallowDepth(0), - blobstor.WithBlobovniczaShallowWidth(1), + writecache.WithPath(writeCachePath), + writecache.WithLogger(c.log), + writecache.WithMaxMemSize(writeCacheMemSize), + writecache.WithMaxObjectSize(writeCacheMaxSize), + writecache.WithSmallObjectSize(writeCacheSmallSize), + writecache.WithMaxDBSize(writeCacheDBSize), + writecache.WithFlushWorkersCount(writeCacheWrkCount), ), shard.WithRemoverBatchSize(rmBatchSize), shard.WithGCRemoverSleepInterval(rmSleepInterval), diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index bcdabb87..41da5e8f 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "os" "path" + "strings" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" ) @@ -44,6 +45,52 @@ func stringifyAddress(addr *objectSDK.Address) string { return hex.EncodeToString(h[:]) } +// Iterate iterates over all stored objects. 
+func (t *FSTree) Iterate(f func(addr *objectSDK.Address, data []byte) error) error { + return t.iterate(0, []string{t.RootPath}, f) +} + +func (t *FSTree) iterate(depth int, curPath []string, f func(*objectSDK.Address, []byte) error) error { + curName := strings.Join(curPath[1:], "") + des, err := ioutil.ReadDir(curName) + if err != nil { + return err + } + + isLast := depth >= t.Depth + l := len(curPath) + curPath = append(curPath, "") + + for i := range des { + curPath[l] = des[i].Name() + + if !isLast && des[i].IsDir() { + err := t.iterate(depth+1, curPath, f) + if err != nil { + return err + } + } + + addr := objectSDK.NewAddress() + err := addr.Parse(curName + des[i].Name()) + if err != nil { + continue + } + + curPath = append(curPath, des[i].Name()) + data, err := ioutil.ReadFile(path.Join(curPath...)) + if err != nil { + return err + } + + if err := f(addr, data); err != nil { + return err + } + } + + return nil +} + func (t *FSTree) treePath(addr *objectSDK.Address) string { sAddr := stringifyAddress(addr) diff --git a/pkg/local_object_storage/blobstor/info.go b/pkg/local_object_storage/blobstor/info.go index 3e8b0433..cb1a11c5 100644 --- a/pkg/local_object_storage/blobstor/info.go +++ b/pkg/local_object_storage/blobstor/info.go @@ -2,7 +2,7 @@ package blobstor import "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" -// DumpInfo returns information about the BlobStor. +// DumpInfo returns file-system tree for big object store.
func (b *BlobStor) DumpInfo() fstree.Info { return b.fsTree.Info } diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index 17484f37..caa43ac5 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -47,10 +47,9 @@ func (p *PutPrm) WithBlobovniczaID(id *blobovnicza.ID) *PutPrm { } var ( - ErrUnknownObjectType = errors.New("unknown object type") - ErrIncorrectBlobovniczaUpdate = errors.New("updating blobovnicza id on object without it") - ErrIncorrectSplitInfoUpdate = errors.New("updating split info on object without it") - ErrIncorrectRootObject = errors.New("invalid root object") + ErrUnknownObjectType = errors.New("unknown object type") + ErrIncorrectSplitInfoUpdate = errors.New("updating split info on object without it") + ErrIncorrectRootObject = errors.New("invalid root object") ) // Put saves the object in DB. @@ -379,20 +378,12 @@ func decodeList(data []byte) (lst [][]byte, err error) { // updateBlobovniczaID for existing objects if they were moved from from // one blobovnicza to another. 
func updateBlobovniczaID(tx *bbolt.Tx, addr *objectSDK.Address, id *blobovnicza.ID) error { - bkt := tx.Bucket(smallBucketName(addr.ContainerID())) - if bkt == nil { - // if object exists, don't have blobovniczaID and we want to update it - // then ignore, this should never happen - return ErrIncorrectBlobovniczaUpdate + bkt, err := tx.CreateBucketIfNotExists(smallBucketName(addr.ContainerID())) + if err != nil { + return err } - objectKey := objectKey(addr.ObjectID()) - - if len(bkt.Get(objectKey)) == 0 { - return ErrIncorrectBlobovniczaUpdate - } - - return bkt.Put(objectKey, *id) + return bkt.Put(objectKey(addr.ObjectID()), *id) } // updateSpliInfo for existing objects if storage filled with extra information diff --git a/pkg/local_object_storage/metabase/put_test.go b/pkg/local_object_storage/metabase/put_test.go index 8223a62c..c5a90001 100644 --- a/pkg/local_object_storage/metabase/put_test.go +++ b/pkg/local_object_storage/metabase/put_test.go @@ -42,8 +42,5 @@ func TestDB_PutBlobovnicaUpdate(t *testing.T) { fetchedBlobovniczaID, err := meta.IsSmall(db, raw2.Object().Address()) require.NoError(t, err) require.Nil(t, fetchedBlobovniczaID) - - err = meta.Put(db, raw2.Object(), &blobovniczaID) - require.Error(t, err) }) } diff --git a/pkg/local_object_storage/shard/delete.go b/pkg/local_object_storage/shard/delete.go index 02f43bfb..3a3c22fb 100644 --- a/pkg/local_object_storage/shard/delete.go +++ b/pkg/local_object_storage/shard/delete.go @@ -2,9 +2,11 @@ package shard import ( objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "github.com/pkg/errors" "go.uber.org/zap" ) @@ -37,13 +39,10 @@ func (s *Shard) Delete(prm *DeletePrm) (*DeleteRes, error) { smalls := 
make(map[*objectSDK.Address]*blobovnicza.ID, ln) for i := range prm.addr { - delSmallPrm.SetAddress(prm.addr[i]) - delBigPrm.SetAddress(prm.addr[i]) - if s.hasWriteCache() { - _, err := s.writeCache.DeleteSmall(delSmallPrm) - if err != nil { - _, _ = s.writeCache.DeleteBig(delBigPrm) + err := s.writeCache.Delete(prm.addr[i]) + if err != nil && !errors.Is(err, object.ErrNotFound) { + s.log.Error("can't delete object from write cache", zap.String("error", err.Error())) } } diff --git a/pkg/local_object_storage/shard/delete_test.go b/pkg/local_object_storage/shard/delete_test.go index cd40b50e..facacb42 100644 --- a/pkg/local_object_storage/shard/delete_test.go +++ b/pkg/local_object_storage/shard/delete_test.go @@ -9,24 +9,19 @@ import ( ) func TestShard_Delete(t *testing.T) { - sh := newShard(t, false) - shWC := newShard(t, true) - - defer func() { - releaseShard(sh, t) - releaseShard(shWC, t) - }() - t.Run("without write cache", func(t *testing.T) { - testShardDelete(t, sh) + testShardDelete(t, false) }) t.Run("with write cache", func(t *testing.T) { - testShardDelete(t, shWC) + testShardDelete(t, true) }) } -func testShardDelete(t *testing.T, sh *shard.Shard) { +func testShardDelete(t *testing.T, hasWriteCache bool) { + sh := newShard(t, hasWriteCache) + defer releaseShard(sh, t) + cid := generateCID() obj := generateRawObjectWithCID(t, cid) @@ -47,7 +42,7 @@ func testShardDelete(t *testing.T, sh *shard.Shard) { _, err := sh.Put(putPrm) require.NoError(t, err) - _, err = sh.Get(getPrm) + _, err = testGet(t, sh, getPrm, hasWriteCache) require.NoError(t, err) _, err = sh.Delete(delPrm) diff --git a/pkg/local_object_storage/shard/get.go b/pkg/local_object_storage/shard/get.go index eb98513f..ed9f5172 100644 --- a/pkg/local_object_storage/shard/get.go +++ b/pkg/local_object_storage/shard/get.go @@ -8,6 +8,8 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" meta 
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "github.com/pkg/errors" + "go.uber.org/zap" ) // storFetcher is a type to unify object fetching mechanism in `fetchObjectData` @@ -89,19 +91,16 @@ func (s *Shard) fetchObjectData(addr *objectSDK.Address, big, small storFetcher) ) if s.hasWriteCache() { - res, err = small(s.writeCache, nil) + res, err = s.writeCache.Get(addr) if err == nil { return res, nil } - s.log.Debug("miss in writeCache blobovnicza") - - res, err = big(s.writeCache, nil) - if err == nil { - return res, nil + if errors.Is(err, object.ErrNotFound) { + s.log.Debug("object is missing in write-cache") + } else { + s.log.Error("failed to fetch object from write-cache", zap.Error(err)) } - - s.log.Debug("miss in writeCache shallow dir") } exists, err := meta.Exists(s.metaBase, addr) diff --git a/pkg/local_object_storage/shard/get_test.go b/pkg/local_object_storage/shard/get_test.go index 122d2a80..4d661054 100644 --- a/pkg/local_object_storage/shard/get_test.go +++ b/pkg/local_object_storage/shard/get_test.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "testing" + "time" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/core/object" @@ -12,31 +13,25 @@ import ( ) func TestShard_Get(t *testing.T) { - sh := newShard(t, false) - shWC := newShard(t, true) - - defer func() { - releaseShard(sh, t) - releaseShard(shWC, t) - }() - t.Run("without write cache", func(t *testing.T) { - testShardGet(t, sh) + testShardGet(t, false) }) t.Run("with write cache", func(t *testing.T) { - testShardGet(t, shWC) + testShardGet(t, true) }) } -func testShardGet(t *testing.T, sh *shard.Shard) { - obj := generateRawObject(t) - addAttribute(obj, "foo", "bar") +func testShardGet(t *testing.T, hasWriteCache bool) { + sh := newShard(t, hasWriteCache) + defer releaseShard(sh, t) putPrm := new(shard.PutPrm) getPrm := new(shard.GetPrm) t.Run("small object", func(t *testing.T) { + obj := generateRawObject(t) + 
addAttribute(obj, "foo", "bar") addPayload(obj, 1<<5) putPrm.WithObject(obj.Object()) @@ -46,12 +41,14 @@ func testShardGet(t *testing.T, sh *shard.Shard) { getPrm.WithAddress(obj.Object().Address()) - res, err := sh.Get(getPrm) + res, err := testGet(t, sh, getPrm, hasWriteCache) require.NoError(t, err) require.Equal(t, obj.Object(), res.Object()) }) t.Run("big object", func(t *testing.T) { + obj := generateRawObject(t) + addAttribute(obj, "foo", "bar") obj.SetID(generateOID()) addPayload(obj, 1<<20) // big obj @@ -62,12 +59,14 @@ func testShardGet(t *testing.T, sh *shard.Shard) { getPrm.WithAddress(obj.Object().Address()) - res, err := sh.Get(getPrm) + res, err := testGet(t, sh, getPrm, hasWriteCache) require.NoError(t, err) require.Equal(t, obj.Object(), res.Object()) }) t.Run("parent object", func(t *testing.T) { + obj := generateRawObject(t) + addAttribute(obj, "foo", "bar") cid := generateCID() splitID := objectSDK.NewSplitID() @@ -87,13 +86,13 @@ func testShardGet(t *testing.T, sh *shard.Shard) { getPrm.WithAddress(child.Object().Address()) - res, err := sh.Get(getPrm) + res, err := testGet(t, sh, getPrm, hasWriteCache) require.NoError(t, err) require.True(t, binaryEqual(child.Object(), res.Object())) getPrm.WithAddress(parent.Object().Address()) - _, err = sh.Get(getPrm) + _, err = testGet(t, sh, getPrm, hasWriteCache) var expectedErr *objectSDK.SplitInfoError require.True(t, errors.As(err, &expectedErr)) @@ -106,6 +105,19 @@ func testShardGet(t *testing.T, sh *shard.Shard) { }) } +func testGet(t *testing.T, sh *shard.Shard, getPrm *shard.GetPrm, hasWriteCache bool) (*shard.GetRes, error) { + res, err := sh.Get(getPrm) + if hasWriteCache { + require.Eventually(t, func() bool { + if errors.Is(err, object.ErrNotFound) { + res, err = sh.Get(getPrm) + } + return !errors.Is(err, object.ErrNotFound) + }, time.Second, time.Millisecond*100) + } + return res, err +} + // binary equal is used when object contains empty lists in the structure and // requre.Equal fails 
on comparing and []{} lists. func binaryEqual(a, b *object.Object) bool { diff --git a/pkg/local_object_storage/shard/head_test.go b/pkg/local_object_storage/shard/head_test.go index 9af4f60a..8d0e6c39 100644 --- a/pkg/local_object_storage/shard/head_test.go +++ b/pkg/local_object_storage/shard/head_test.go @@ -3,31 +3,28 @@ package shard_test import ( "errors" "testing" + "time" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/stretchr/testify/require" ) func TestShard_Head(t *testing.T) { - sh := newShard(t, false) - shWC := newShard(t, true) - - defer func() { - releaseShard(sh, t) - releaseShard(shWC, t) - }() - t.Run("without write cache", func(t *testing.T) { - testShardHead(t, sh) + testShardHead(t, false) }) t.Run("with write cache", func(t *testing.T) { - testShardHead(t, shWC) + testShardHead(t, true) }) } -func testShardHead(t *testing.T, sh *shard.Shard) { +func testShardHead(t *testing.T, hasWriteCache bool) { + sh := newShard(t, hasWriteCache) + defer releaseShard(sh, t) + putPrm := new(shard.PutPrm) headPrm := new(shard.HeadPrm) @@ -42,7 +39,7 @@ func testShardHead(t *testing.T, sh *shard.Shard) { headPrm.WithAddress(obj.Object().Address()) - res, err := sh.Head(headPrm) + res, err := testHead(t, sh, headPrm, hasWriteCache) require.NoError(t, err) require.Equal(t, obj.Object(), res.Object()) }) @@ -69,7 +66,7 @@ func testShardHead(t *testing.T, sh *shard.Shard) { var siErr *objectSDK.SplitInfoError - _, err = sh.Head(headPrm) + _, err = testHead(t, sh, headPrm, hasWriteCache) require.True(t, errors.As(err, &siErr)) headPrm.WithAddress(parent.Object().Address()) @@ -80,3 +77,16 @@ func testShardHead(t *testing.T, sh *shard.Shard) { require.Equal(t, parent.Object(), head.Object()) }) } + +func testHead(t *testing.T, sh *shard.Shard, headPrm *shard.HeadPrm, hasWriteCache bool) (*shard.HeadRes, error) { + res, err := 
sh.Head(headPrm) + if hasWriteCache { + require.Eventually(t, func() bool { + if errors.Is(err, object.ErrNotFound) { + res, err = sh.Head(headPrm) + } + return !errors.Is(err, object.ErrNotFound) + }, time.Second, time.Millisecond*100) + } + return res, err +} diff --git a/pkg/local_object_storage/shard/inhume.go b/pkg/local_object_storage/shard/inhume.go index b81901eb..ae5b979a 100644 --- a/pkg/local_object_storage/shard/inhume.go +++ b/pkg/local_object_storage/shard/inhume.go @@ -44,6 +44,12 @@ func (p *InhumePrm) MarkAsGarbage(addr ...*objectSDK.Address) *InhumePrm { // Inhume calls metabase. Inhume method to mark object as removed. It won't be // removed physically from blobStor and metabase until `Delete` operation. func (s *Shard) Inhume(prm *InhumePrm) (*InhumeRes, error) { + if s.hasWriteCache() { + for i := range prm.target { + _ = s.writeCache.Delete(prm.target[i]) + } + } + metaPrm := new(meta.InhumePrm).WithAddresses(prm.target...) if prm.tombstone != nil { diff --git a/pkg/local_object_storage/shard/inhume_test.go b/pkg/local_object_storage/shard/inhume_test.go index 14cab27a..ad1e1700 100644 --- a/pkg/local_object_storage/shard/inhume_test.go +++ b/pkg/local_object_storage/shard/inhume_test.go @@ -9,24 +9,19 @@ import ( ) func TestShard_Inhume(t *testing.T) { - sh := newShard(t, false) - shWC := newShard(t, true) - - defer func() { - releaseShard(sh, t) - releaseShard(shWC, t) - }() - t.Run("without write cache", func(t *testing.T) { - testShardInhume(t, sh) + testShardInhume(t, false) }) t.Run("with write cache", func(t *testing.T) { - testShardInhume(t, shWC) + testShardInhume(t, true) }) } -func testShardInhume(t *testing.T, sh *shard.Shard) { +func testShardInhume(t *testing.T, hasWriteCache bool) { + sh := newShard(t, hasWriteCache) + defer releaseShard(sh, t) + cid := generateCID() obj := generateRawObjectWithCID(t, cid) @@ -46,7 +41,7 @@ func testShardInhume(t *testing.T, sh *shard.Shard) { _, err := sh.Put(putPrm) require.NoError(t, err) - 
_, err = sh.Get(getPrm) + _, err = testGet(t, sh, getPrm, hasWriteCache) require.NoError(t, err) _, err = sh.Inhume(inhPrm) diff --git a/pkg/local_object_storage/shard/put.go b/pkg/local_object_storage/shard/put.go index d7fd0e71..e3399f67 100644 --- a/pkg/local_object_storage/shard/put.go +++ b/pkg/local_object_storage/shard/put.go @@ -5,6 +5,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/pkg/errors" + "go.uber.org/zap" ) // PutPrm groups the parameters of Put operation. @@ -34,26 +35,24 @@ func (s *Shard) Put(prm *PutPrm) (*PutRes, error) { // exist check are not performed there, these checks should be executed // ahead of `Put` by storage engine + if s.hasWriteCache() { + err := s.writeCache.Put(prm.obj) + if err == nil { + return nil, nil + } + + s.log.Debug("can't put message to writeCache, trying to blobStor", + zap.String("err", err.Error())) + } var ( err error res *blobstor.PutRes ) - if s.hasWriteCache() { - res, err = s.writeCache.Put(putPrm) - if err != nil { - s.log.Debug("can't put message to writeCache, trying to blobStor") - - res = nil // just in case - } - } - // res == nil if there is no writeCache or writeCache.Put has been failed - if res == nil { - if res, err = s.blobStor.Put(putPrm); err != nil { - return nil, errors.Wrap(err, "could not put object to BLOB storage") - } + if res, err = s.blobStor.Put(putPrm); err != nil { + return nil, errors.Wrap(err, "could not put object to BLOB storage") } // put to metabase diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 039b117e..f32c2d01 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -7,6 +7,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" meta 
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache" "github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "go.uber.org/atomic" @@ -19,7 +20,7 @@ type Shard struct { mode *atomic.Uint32 - writeCache *blobstor.BlobStor + writeCache writecache.Cache blobStor *blobstor.BlobStor @@ -43,7 +44,7 @@ type cfg struct { metaOpts []meta.Option - writeCacheOpts []blobstor.Option + writeCacheOpts []writecache.Option log *logger.Logger @@ -68,19 +69,22 @@ func New(opts ...Option) *Shard { opts[i](c) } - var writeCache *blobstor.BlobStor + bs := blobstor.New(c.blobOpts...) + mb := meta.New(c.metaOpts...) + var writeCache writecache.Cache if c.useWriteCache { - writeCache = blobstor.New( - append(c.blobOpts, c.writeCacheOpts...)..., - ) + writeCache = writecache.New( + append(c.writeCacheOpts, + writecache.WithBlobstor(bs), + writecache.WithMetabase(mb))...) } return &Shard{ cfg: c, mode: atomic.NewUint32(0), // TODO: init with particular mode - blobStor: blobstor.New(c.blobOpts...), - metaBase: meta.New(c.metaOpts...), + blobStor: bs, + metaBase: mb, writeCache: writeCache, } } @@ -107,7 +111,7 @@ func WithMetaBaseOptions(opts ...meta.Option) Option { } // WithMetaBaseOptions returns option to set internal metabase options. 
-func WithWriteCacheOptions(opts ...blobstor.Option) Option { +func WithWriteCacheOptions(opts ...writecache.Option) Option { return func(c *cfg) { c.writeCacheOpts = opts } diff --git a/pkg/local_object_storage/shard/shard_test.go b/pkg/local_object_storage/shard/shard_test.go index 5e2465f8..2dcb5e8a 100644 --- a/pkg/local_object_storage/shard/shard_test.go +++ b/pkg/local_object_storage/shard/shard_test.go @@ -15,6 +15,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache" "github.com/nspcc-dev/neofs-node/pkg/util/test" "github.com/nspcc-dev/tzhash/tz" "github.com/stretchr/testify/require" @@ -41,9 +42,8 @@ func newShard(t testing.TB, enableWriteCache bool) *shard.Shard { ), shard.WithWriteCache(enableWriteCache), shard.WithWriteCacheOptions( - blobstor.WithRootPath(path.Join(rootPath, "wcache")), - blobstor.WithBlobovniczaShallowWidth(1), - blobstor.WithBlobovniczaShallowDepth(0), + writecache.WithMaxMemSize(0), // disable memory batches + writecache.WithPath(path.Join(rootPath, "wcache")), ), } diff --git a/pkg/local_object_storage/writecache/delete.go b/pkg/local_object_storage/writecache/delete.go new file mode 100644 index 00000000..c0d6533a --- /dev/null +++ b/pkg/local_object_storage/writecache/delete.go @@ -0,0 +1,49 @@ +package writecache + +import ( + "errors" + + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" + "go.etcd.io/bbolt" +) + +// Delete removes object from write-cache. +func (c *cache) Delete(addr *objectSDK.Address) error { + saddr := addr.String() + + // Check memory cache. 
+ c.mtx.Lock() + for i := range c.mem { + if saddr == c.mem[i].addr { + copy(c.mem[i:], c.mem[i+1:]) + c.mem = c.mem[:len(c.mem)-1] + c.mtx.Unlock() + return nil + } + } + c.mtx.Unlock() + + // Check disk cache. + has := false + _ = c.db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + has = b.Get([]byte(saddr)) != nil + return nil + }) + + if has { + return c.db.Update(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + return b.Delete([]byte(saddr)) + }) + } + + err := c.fsTree.Delete(addr) + if errors.Is(err, fstree.ErrFileNotFound) { + err = object.ErrNotFound + } + + return err +} diff --git a/pkg/local_object_storage/writecache/doc.go b/pkg/local_object_storage/writecache/doc.go new file mode 100644 index 00000000..90917500 --- /dev/null +++ b/pkg/local_object_storage/writecache/doc.go @@ -0,0 +1,20 @@ +// Package writecache implements write-cache for objects. +// +// It contains in-memory cache of fixed size and underlying database +// (usually on SSD) for storing small objects. +// There are 3 places where object can be: +// 1. In-memory cache. +// 2. On-disk cache DB. +// 3. Main storage (blobstor). +// +// There are 2 types of background jobs: +// 1. Persisting objects from in-memory cache to database. +// 2. Flushing objects from database to blobstor. +// On flushing object address is put in in-memory LRU cache. +// The actual deletion from the DB is done when object +// is evicted from this cache. +// +// Putting objects to the main storage is done by multiple workers. +// Some of them prioritize flushing items, others prioritize putting new objects. +// The current ratio is 50/50. This helps to make some progress even under load.
+package writecache diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go new file mode 100644 index 00000000..f42e01d9 --- /dev/null +++ b/pkg/local_object_storage/writecache/flush.go @@ -0,0 +1,196 @@ +package writecache + +import ( + "sync" + "time" + + "github.com/mr-tron/base58" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "go.etcd.io/bbolt" + "go.uber.org/zap" +) + +const ( + // flushBatchSize is amount of keys which will be read from cache to be flushed + // to the main storage. It is used to reduce contention between cache put + // and cache persist. + flushBatchSize = 512 + // flushWorkersCount is number of workers for putting objects in main storage. + flushWorkersCount = 20 + // defaultFlushInterval is default time interval between successive flushes. + defaultFlushInterval = time.Second +) + +// flushLoop periodically flushes changes from the database to the main storage. +func (c *cache) flushLoop() { + var wg sync.WaitGroup + + for i := 0; i < c.workersCount; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + c.flushWorker(i) + }(i) + } + + wg.Add(1) + go func() { + defer wg.Done() + c.flushBigObjects() + }() + + tick := time.NewTicker(defaultFlushInterval) + for { + select { + case <-tick.C: + c.flush() + case <-c.closeCh: + c.log.Debug("waiting for workers to quit") + wg.Wait() + return + } + } +} + +func (c *cache) flush() { + lastKey := []byte{} + var m []objectInfo + for { + m = m[:0] + sz := 0 + + // We put objects in batches of fixed size to not interfere with main put cycle a lot.
+ _ = c.db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + cs := b.Cursor() + for k, v := cs.Seek(lastKey); k != nil && len(m) < flushBatchSize; k, v = cs.Next() { + if _, ok := c.flushed.Peek(string(k)); ok { + continue + } + + sz += len(k) + len(v) + m = append(m, objectInfo{ + addr: string(k), + data: cloneBytes(v), + }) + } + return nil + }) + + for i := range m { + obj := object.New() + if err := obj.Unmarshal(m[i].data); err != nil { + continue + } + + select { + case c.flushCh <- obj: + case <-c.closeCh: + return + } + } + + c.evictObjects(len(m)) + for i := range m { + c.flushed.Add(m[i].addr, true) + } + + c.dbSize.Sub(uint64(sz)) + + c.log.Debug("flushed items from write-cache", + zap.Int("count", len(m)), + zap.String("start", base58.Encode(lastKey))) + + if len(m) > 0 { + lastKey = append([]byte(m[len(m)-1].addr), 0) + } else { + break + } + } +} + +func (c *cache) flushBigObjects() { + tick := time.NewTicker(defaultFlushInterval * 10) + for { + select { + case <-tick.C: + _ = c.fsTree.Iterate(func(addr *objectSDK.Address, data []byte) error { + if _, ok := c.store.flushed.Peek(addr.String()); ok { + return nil + } + + if _, err := c.blobstor.PutRaw(addr, data); err != nil { + c.log.Error("cant flush object to blobstor", zap.Error(err)) + } + return nil + }) + case <-c.closeCh: + } + } +} + +// flushWorker runs in a separate goroutine and write objects to the main storage. +// If flushFirst is true, flushing objects from cache database takes priority over +// putting new objects. +func (c *cache) flushWorker(num int) { + priorityCh := c.directCh + switch num % 3 { + case 0: + priorityCh = c.flushCh + case 1: + priorityCh = c.metaCh + } + + var obj *object.Object + for { + metaOnly := false + + // Give priority to direct put. 
+ // TODO(fyrchik): do this once in N iterations depending on load + select { + case obj = <-priorityCh: + default: + select { + case obj = <-c.directCh: + case obj = <-c.flushCh: + case obj = <-c.metaCh: + metaOnly = true + case <-c.closeCh: + return + } + } + + err := c.writeObject(obj, metaOnly) + if err != nil { + c.log.Error("can't flush object to the main storage", zap.Error(err)) + } + } +} + +// writeObject is used to write object directly to the main storage. +func (c *cache) writeObject(obj *object.Object, metaOnly bool) error { + var id *blobovnicza.ID + + if !metaOnly { + prm := new(blobstor.PutPrm) + prm.SetObject(obj) + res, err := c.blobstor.Put(prm) + if err != nil { + return err + } + + id = res.BlobovniczaID() + } + + return meta.Put(c.metabase, obj, id) +} + +func cloneBytes(a []byte) []byte { + b := make([]byte, len(a)) + copy(b, a) + return b +} diff --git a/pkg/local_object_storage/writecache/get.go b/pkg/local_object_storage/writecache/get.go new file mode 100644 index 00000000..1199d8ce --- /dev/null +++ b/pkg/local_object_storage/writecache/get.go @@ -0,0 +1,51 @@ +package writecache + +import ( + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "go.etcd.io/bbolt" +) + +// Get returns object from write-cache. 
+func (c *cache) Get(addr *objectSDK.Address) (*object.Object, error) { + saddr := addr.String() + + c.mtx.RLock() + for i := range c.mem { + if saddr == c.mem[i].addr { + obj := c.mem[i].obj + c.mtx.RUnlock() + return obj, nil + } + } + c.mtx.RUnlock() + + var value []byte + _ = c.db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + val := b.Get([]byte(saddr)) + if val != nil { + value = cloneBytes(val) + } + return nil + }) + + if value != nil { + obj := object.New() + c.flushed.Get(saddr) + return obj, obj.Unmarshal(value) + } + + data, err := c.fsTree.Get(addr) + if err != nil { + return nil, object.ErrNotFound + } + + obj := object.New() + if err := obj.Unmarshal(data); err != nil { + return nil, err + } + + c.flushed.Get(saddr) + return obj, nil +} diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go new file mode 100644 index 00000000..5d253e1e --- /dev/null +++ b/pkg/local_object_storage/writecache/options.go @@ -0,0 +1,100 @@ +package writecache + +import ( + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "go.uber.org/zap" +) + +// Option represents write-cache configuration option. +type Option func(*options) + +type options struct { + log *zap.Logger + // path is a path to a directory for write-cache. + path string + // blobstor is the main persistent storage. + blobstor *blobstor.BlobStor + // metabase is the metabase instance. + metabase *meta.DB + // maxMemSize is the maximum total size of all objects cached in memory. + // 1 GiB by default. + maxMemSize uint64 + // maxDBSize is the maximum size of database in bytes. + // Unrestricted by default. + maxDBSize uint64 + // maxObjectSize is the maximum size of the object stored in the write-cache. + maxObjectSize uint64 + // smallObjectSize is the maximum size of the object stored in the database. 
+ smallObjectSize uint64 + // workersCount is the number of workers flushing objects in parallel. + workersCount int +} + +// WithLogger sets logger. +func WithLogger(log *zap.Logger) Option { + return func(o *options) { + o.log = log + } +} + +// WithPath sets path to writecache db. +func WithPath(path string) Option { + return func(o *options) { + o.path = path + } +} + +// WithBlobstor sets main object storage. +func WithBlobstor(bs *blobstor.BlobStor) Option { + return func(o *options) { + o.blobstor = bs + } +} + +// WithMetabase sets metabase. +func WithMetabase(db *meta.DB) Option { + return func(o *options) { + o.metabase = db + } +} + +// WithMaxMemSize sets maximum size for in-memory DB. +func WithMaxMemSize(sz uint64) Option { + return func(o *options) { + o.maxMemSize = sz + } +} + +// WithMaxDBSize sets maximum size for on-disk DB. +func WithMaxDBSize(sz uint64) Option { + return func(o *options) { + o.maxDBSize = sz + } +} + +// WithMaxObjectSize sets maximum object size to be stored in write-cache. +func WithMaxObjectSize(sz uint64) Option { + return func(o *options) { + if sz > 0 { + o.maxObjectSize = sz + } + } +} + +// WithSmallObjectSize sets maximum object size to be stored in the database. +func WithSmallObjectSize(sz uint64) Option { + return func(o *options) { + if sz > 0 { + o.smallObjectSize = sz + } + } +} + +func WithFlushWorkersCount(c int) Option { + return func(o *options) { + if c > 0 { + o.workersCount = c + } + } +} diff --git a/pkg/local_object_storage/writecache/persist.go b/pkg/local_object_storage/writecache/persist.go new file mode 100644 index 00000000..1d793dec --- /dev/null +++ b/pkg/local_object_storage/writecache/persist.go @@ -0,0 +1,124 @@ +package writecache + +import ( + "sort" + "time" + + "go.etcd.io/bbolt" + "go.uber.org/zap" +) + +const defaultPersistInterval = time.Second + +// persistLoop persists objects accumulated in memory to the database.
+func (c *cache) persistLoop() {
+	tick := time.NewTicker(defaultPersistInterval)
+	defer tick.Stop()
+
+	for {
+		select {
+		case <-tick.C:
+			// Snapshot the in-memory batch. Sort a COPY: sorting the
+			// shared slice in place would race with readers that
+			// iterate c.mem while holding only RLock (e.g. Get).
+			c.mtx.RLock()
+			m := make([]objectInfo, len(c.mem))
+			copy(m, c.mem)
+			c.mtx.RUnlock()
+
+			sort.Slice(m, func(i, j int) bool { return m[i].addr < m[j].addr })
+
+			start := time.Now()
+			c.persistObjects(m)
+			c.log.Debug("persisted items to disk",
+				zap.Duration("took", time.Since(start)),
+				zap.Int("total", len(m)))
+
+			c.mtx.Lock()
+			// Drop the persisted prefix and recompute the memory usage
+			// counter from scratch. The counter MUST be reset first:
+			// previously it was only added to, so it grew without bound
+			// and eventually made Put always bypass the memory batch.
+			c.curMemSize = 0
+			n := copy(c.mem, c.mem[len(m):])
+			c.mem = c.mem[:n]
+			for i := range c.mem {
+				c.curMemSize += uint64(len(c.mem[i].data))
+			}
+			c.mtx.Unlock()
+
+			// Track the approximate database growth.
+			sz := 0
+			for i := range m {
+				sz += len(m[i].addr) + m[i].obj.ToV2().StableSize()
+			}
+			c.dbSize.Add(uint64(sz))
+
+		case <-c.closeCh:
+			return
+		}
+	}
+}
+
+// persistToCache writes objects to the write-cache: objects smaller than
+// smallObjectSize go to the database, the rest go to the FS tree unless
+// they exceed maxObjectSize. It returns the (ascending) indices of the
+// objects that could not be cached at all and must be written directly
+// to the main storage.
+func (c *cache) persistToCache(objs []objectInfo) []int {
+	var (
+		failMem []int // indices that do not fit the database
+		doneMem []int // indices persisted to the database
+	)
+
+	_ = c.db.Update(func(tx *bbolt.Tx) error {
+		b := tx.Bucket(defaultBucket)
+		for i := range objs {
+			if uint64(len(objs[i].data)) >= c.smallObjectSize {
+				failMem = append(failMem, i)
+				continue
+			}
+
+			err := b.Put([]byte(objs[i].addr), objs[i].data)
+			if err != nil {
+				return err
+			}
+
+			doneMem = append(doneMem, i)
+		}
+		return nil
+	})
+
+	if len(doneMem) > 0 {
+		// Make room in the flushed-keys LRU before registering new keys.
+		c.evictObjects(len(doneMem))
+		for _, i := range doneMem {
+			c.flushed.Add(objs[i].addr, true)
+		}
+	}
+
+	var failDisk []int
+
+	for _, i := range failMem {
+		if uint64(len(objs[i].data)) > c.maxObjectSize {
+			failDisk = append(failDisk, i)
+			continue
+		}
+
+		err := c.fsTree.Put(objs[i].obj.Address(), objs[i].data)
+		if err != nil {
+			failDisk = append(failDisk, i)
+		}
+	}
+
+	return failDisk
+}
+
+// persistObjects tries to write objects from memory to the persistent
+// storage and then hands each object to the flush pipeline: objects that
+// were cached go to metaCh (only metadata remains to be written), while
+// objects that could not be cached go to directCh (written directly to
+// the main storage).
+func (c *cache) persistObjects(objs []objectInfo) {
+	toDisk := c.persistToCache(objs)
+	j := 0
+
+	for i := range objs {
+		// toDisk is sorted ascending; advance j to the first entry >= i.
+		for j < len(toDisk) && toDisk[j] < i {
+			j++
+		}
+
+		ch := c.metaCh
+		if j < len(toDisk) && toDisk[j] == i {
+			ch = c.directCh
+		}
+
+		select {
+		case ch <- objs[i].obj: // fix: was objs[j].obj, which sent the wrong object
+		case <-c.closeCh:
+			return
+		}
+	}
+}
diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go
new file mode 100644
index 00000000..fb6ed6c0
--- /dev/null
+++ b/pkg/local_object_storage/writecache/put.go
@@ -0,0 +1,44 @@
+package writecache
+
+import (
+	"errors"
+
+	"github.com/nspcc-dev/neofs-node/pkg/core/object"
+)
+
+// ErrBigObject is returned when object is too big to be placed in cache.
+var ErrBigObject = errors.New("too big object")
+
+// Put puts object to write-cache.
+//
+// Returns ErrBigObject when the object exceeds the configured maximum
+// object size. Small objects are accumulated in memory (up to maxMemSize
+// total) and persisted later by persistLoop; anything that does not fit
+// the memory batch is persisted immediately.
+func (c *cache) Put(o *object.Object) error {
+	sz := uint64(o.ToV2().StableSize())
+	if sz > c.maxObjectSize {
+		return ErrBigObject
+	}
+
+	data, err := o.Marshal(nil)
+	if err != nil {
+		return err
+	}
+
+	oi := objectInfo{
+		addr: o.Address().String(),
+		obj:  o,
+		data: data,
+	}
+
+	c.mtx.Lock()
+
+	if sz < c.smallObjectSize && c.curMemSize+sz <= c.maxMemSize {
+		c.curMemSize += sz
+		c.mem = append(c.mem, oi)
+
+		c.mtx.Unlock()
+		return nil
+	}
+
+	c.mtx.Unlock()
+
+	// Object does not fit the in-memory batch: persist it right away.
+	c.persistObjects([]objectInfo{oi})
+	return nil
+}
diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go
new file mode 100644
index 00000000..2e92b490
--- /dev/null
+++ b/pkg/local_object_storage/writecache/storage.go
@@ -0,0 +1,130 @@
+package writecache
+
+import (
+	"errors"
+	"os"
+	"path"
+
+	lru "github.com/hashicorp/golang-lru"
+	"github.com/hashicorp/golang-lru/simplelru"
+	objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
+	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
+	"go.etcd.io/bbolt"
+	"go.uber.org/zap"
+)
+
+// store represents persistent storage with in-memory
LRU cache +// for flushed items on top of it. +type store struct { + flushed simplelru.LRUCache + db *bbolt.DB +} + +const lruKeysCount = 256 * 1024 * 8 + +const dbName = "small.bolt" + +func (c *cache) openStore() error { + if err := os.MkdirAll(c.path, os.ModePerm); err != nil { + return err + } + + db, err := bbolt.Open(path.Join(c.path, dbName), os.ModePerm, &bbolt.Options{ + NoFreelistSync: true, + NoSync: true, + }) + if err != nil { + return err + } + + c.fsTree = &fstree.FSTree{ + Info: fstree.Info{ + Permissions: os.ModePerm, + RootPath: c.path, + }, + Depth: 1, + DirNameLen: 1, + } + + _ = db.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(defaultBucket) + return err + }) + + c.db = db + c.flushed, _ = lru.New(lruKeysCount) + return nil +} + +func (s *store) removeFlushedKeys(n int) ([][]byte, [][]byte) { + var keysMem, keysDisk [][]byte + for i := 0; i < n; i++ { + k, v, ok := s.flushed.RemoveOldest() + if !ok { + break + } + + if v.(bool) { + keysMem = append(keysMem, []byte(k.(string))) + } else { + keysDisk = append(keysDisk, []byte(k.(string))) + } + } + + return keysMem, keysDisk +} + +func (c *cache) evictObjects(putCount int) { + sum := c.flushed.Len() + putCount + if sum <= lruKeysCount { + return + } + + keysMem, keysDisk := c.store.removeFlushedKeys(sum - lruKeysCount) + + if err := c.deleteFromDB(keysMem); err != nil { + c.log.Error("error while removing objects from write-cache (database)", zap.Error(err)) + } + + if err := c.deleteFromDisk(keysDisk); err != nil { + c.log.Error("error while removing objects from write-cache (disk)", zap.Error(err)) + } +} + +func (c *cache) deleteFromDB(keys [][]byte) error { + if len(keys) == 0 { + return nil + } + + return c.db.Update(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + for i := range keys { + if err := b.Delete(keys[i]); err != nil { + return err + } + } + return nil + }) +} + +func (c *cache) deleteFromDisk(keys [][]byte) error { + var lastErr error + + 
for i := range keys { + addr := objectSDK.NewAddress() + addrStr := string(keys[i]) + + if err := addr.Parse(addrStr); err != nil { + c.log.Error("can't parse address", zap.String("address", addrStr)) + continue + } + + if err := c.fsTree.Delete(addr); err != nil && !errors.Is(err, fstree.ErrFileNotFound) { + lastErr = err + c.log.Error("can't remove object from write-cache", zap.Error(err)) + continue + } + } + + return lastErr +} diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go new file mode 100644 index 00000000..01163e47 --- /dev/null +++ b/pkg/local_object_storage/writecache/writecache.go @@ -0,0 +1,109 @@ +package writecache + +import ( + "sync" + + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" + "go.uber.org/atomic" + "go.uber.org/zap" +) + +// Cache represents write-cache for objects. +type Cache interface { + Get(*objectSDK.Address) (*object.Object, error) + Delete(*objectSDK.Address) error + Put(*object.Object) error + + Init() error + Open() error + Close() error +} + +type cache struct { + options + + // mtx protects mem field, statistics and counters. + mtx sync.RWMutex + mem []objectInfo + + // curMemSize is the current size of all objects cached in memory. + curMemSize uint64 + + // flushCh is a channel with objects to flush. + flushCh chan *object.Object + // directCh is a channel with objects to put directly to the main storage. + // it is prioritized over flushCh. + directCh chan *object.Object + // metaCh is a channel with objects for which only metadata needs to be written. + metaCh chan *object.Object + // closeCh is close channel. + closeCh chan struct{} + evictCh chan []byte + // store contains underlying database. + store + // dbSize stores approximate database size. It is updated every flush/persist cycle. 
+ dbSize atomic.Uint64 + // fsTree contains big files stored directly on file-system. + fsTree *fstree.FSTree +} + +type objectInfo struct { + addr string + data []byte + obj *object.Object +} + +const ( + maxInMemorySizeBytes = 1024 * 1024 * 1024 // 1 GiB + maxObjectSize = 64 * 1024 * 1024 // 64 MiB + smallObjectSize = 32 * 1024 // 32 KiB +) + +var ( + defaultBucket = []byte{0} +) + +// New creates new writecache instance. +func New(opts ...Option) Cache { + c := &cache{ + flushCh: make(chan *object.Object), + directCh: make(chan *object.Object), + metaCh: make(chan *object.Object), + closeCh: make(chan struct{}), + evictCh: make(chan []byte), + + options: options{ + log: zap.NewNop(), + maxMemSize: maxInMemorySizeBytes, + maxObjectSize: maxObjectSize, + smallObjectSize: smallObjectSize, + workersCount: flushWorkersCount, + }, + } + + for i := range opts { + opts[i](&c.options) + } + + return c +} + +// Open opens and initializes database. +func (c *cache) Open() error { + return c.openStore() +} + +// Init runs necessary services. +func (c *cache) Init() error { + go c.persistLoop() + go c.flushLoop() + return nil +} + +// Close closes db connection and stops services. +func (c *cache) Close() error { + close(c.closeCh) + return c.db.Close() +}