diff --git a/pkg/local_object_storage/shard/dump_test.go b/pkg/local_object_storage/shard/dump_test.go index 5c7c1fdb..5f2ac158 100644 --- a/pkg/local_object_storage/shard/dump_test.go +++ b/pkg/local_object_storage/shard/dump_test.go @@ -21,6 +21,7 @@ import ( oid "github.com/nspcc-dev/neofs-sdk-go/object/id" objecttest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" ) func TestDump(t *testing.T) { @@ -49,8 +50,11 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) { []writecache.Option{ writecache.WithSmallObjectSize(wcSmallObjectSize), writecache.WithMaxObjectSize(wcBigObjectSize), + writecache.WithLogger(zaptest.NewLogger(t)), }, - nil) + []blobstor.Option{ + blobstor.WithLogger(zaptest.NewLogger(t)), + }) } defer releaseShard(sh, t) diff --git a/pkg/local_object_storage/shard/put.go b/pkg/local_object_storage/shard/put.go index cee7a9d0..839d735b 100644 --- a/pkg/local_object_storage/shard/put.go +++ b/pkg/local_object_storage/shard/put.go @@ -3,6 +3,7 @@ package shard import ( "fmt" + objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-sdk-go/object" @@ -34,28 +35,33 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) { return PutRes{}, ErrReadOnlyMode } + data, err := prm.obj.Marshal() + if err != nil { + return PutRes{}, fmt.Errorf("cannot marshal object: %w", err) + } + var putPrm common.PutPrm // form Put parameters putPrm.Object = prm.obj + putPrm.RawData = data + putPrm.Address = objectCore.AddressOf(prm.obj) + + var res common.PutRes // exist check are not performed there, these checks should be executed // ahead of `Put` by storage engine if s.hasWriteCache() { - err := s.writeCache.Put(prm.obj) - if err == nil { - return PutRes{}, nil + res, err = s.writeCache.Put(putPrm) + } + if err != nil || !s.hasWriteCache() { + if err != nil { + s.log.Debug("can't put object to the write-cache, trying blobstor", + zap.String("err", err.Error())) } - s.log.Debug("can't put message to writeCache, trying to blobStor", - zap.String("err", err.Error())) - } - - var ( - err error - res common.PutRes - ) - - if res, err = s.blobStor.Put(putPrm); err != nil { - return PutRes{}, fmt.Errorf("could not put object to BLOB storage: %w", err) + res, err = s.blobStor.Put(putPrm) + if err != nil { + return PutRes{}, fmt.Errorf("could not put object to BLOB storage: %w", err) + } } if !m.NoMetabase() { diff --git a/pkg/local_object_storage/writecache/delete.go b/pkg/local_object_storage/writecache/delete.go index 61973c6f..27f31cde 100644 --- a/pkg/local_object_storage/writecache/delete.go +++ b/pkg/local_object_storage/writecache/delete.go @@ -22,20 +22,6 @@ func (c *cache) Delete(addr oid.Address) error { saddr := addr.EncodeToString() - // Check memory cache. - c.mtx.Lock() - for i := range c.mem { - if saddr == c.mem[i].addr { - c.curMemSize -= uint64(len(c.mem[i].data)) - copy(c.mem[i:], c.mem[i+1:]) - c.mem = c.mem[:len(c.mem)-1] - c.mtx.Unlock() - storagelog.Write(c.log, storagelog.AddressField(saddr), storagelog.OpField("in-mem DELETE")) - return nil - } - } - c.mtx.Unlock() - // Check disk cache. var has int _ = c.db.View(func(tx *bbolt.Tx) error { diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index 926c0108..0427b484 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -1,7 +1,6 @@ package writecache import ( - "sync" "time" "github.com/mr-tron/base58" @@ -24,44 +23,45 @@ const ( defaultFlushInterval = time.Second ) -// flushLoop periodically flushes changes from the database to memory. -func (c *cache) flushLoop() { - var wg sync.WaitGroup - +// runFlushLoop starts background workers which periodically flush objects to the blobstor. +func (c *cache) runFlushLoop() { for i := 0; i < c.workersCount; i++ { - wg.Add(1) - go func(i int) { - defer wg.Done() - c.flushWorker(i) - }(i) + c.wg.Add(1) + go c.flushWorker(i) } - wg.Add(1) + c.wg.Add(1) + go c.flushBigObjects() + + c.wg.Add(1) go func() { - defer wg.Done() - c.flushBigObjects() - }() + defer c.wg.Done() - tt := time.NewTimer(defaultFlushInterval) - defer tt.Stop() + tt := time.NewTimer(defaultFlushInterval) + defer tt.Stop() - for { - select { - case <-tt.C: - c.flush() - tt.Reset(defaultFlushInterval) - case <-c.closeCh: - c.log.Debug("waiting for workers to quit") - wg.Wait() - return + for { + select { + case <-tt.C: + c.flush() + tt.Reset(defaultFlushInterval) + case <-c.closeCh: + return + } } - } + }() } func (c *cache) flush() { lastKey := []byte{} var m []objectInfo for { + select { + case <-c.closeCh: + return + default: + } + m = m[:0] sz := 0 @@ -122,6 +122,8 @@ func (c *cache) flush() { } func (c *cache) flushBigObjects() { + defer c.wg.Done() + tick := time.NewTicker(defaultFlushInterval * 10) for { select { @@ -181,6 +183,7 @@ func (c *cache) flushBigObjects() { c.evictObjects(evictNum) c.modeMtx.RUnlock() case <-c.closeCh: + return } } } @@ -188,63 +191,40 @@ func (c *cache) flushBigObjects() { // flushWorker runs in a separate goroutine and write objects to the main storage. // If flushFirst is true, flushing objects from cache database takes priority over // putting new objects. -func (c *cache) flushWorker(num int) { - priorityCh := c.directCh - switch num % 3 { - case 0: - priorityCh = c.flushCh - case 1: - priorityCh = c.metaCh - } +func (c *cache) flushWorker(_ int) { + defer c.wg.Done() var obj *object.Object for { - metaOnly := false - // Give priority to direct put. - // TODO(fyrchik): #1150 do this once in N iterations depending on load select { - case obj = <-priorityCh: - metaOnly = num%3 == 1 - default: - select { - case obj = <-c.directCh: - case obj = <-c.flushCh: - case obj = <-c.metaCh: - metaOnly = true - case <-c.closeCh: - return - } + case obj = <-c.flushCh: + case <-c.closeCh: + return } - err := c.writeObject(obj, metaOnly) + err := c.flushObject(obj) if err != nil { c.log.Error("can't flush object to the main storage", zap.Error(err)) } } } -// writeObject is used to write object directly to the main storage. -func (c *cache) writeObject(obj *object.Object, metaOnly bool) error { - var descriptor []byte +// flushObject is used to write object directly to the main storage. +func (c *cache) flushObject(obj *object.Object) error { + var prm common.PutPrm + prm.Object = obj - if !metaOnly { - var prm common.PutPrm - prm.Object = obj - - res, err := c.blobstor.Put(prm) - if err != nil { - return err - } - - descriptor = res.StorageID + res, err := c.blobstor.Put(prm) + if err != nil { + return err } var pPrm meta.PutPrm pPrm.SetObject(obj) - pPrm.SetStorageID(descriptor) + pPrm.SetStorageID(res.StorageID) - _, err := c.metabase.Put(pPrm) + _, err = c.metabase.Put(pPrm) return err } diff --git a/pkg/local_object_storage/writecache/get.go b/pkg/local_object_storage/writecache/get.go index 850e6a46..951daad7 100644 --- a/pkg/local_object_storage/writecache/get.go +++ b/pkg/local_object_storage/writecache/get.go @@ -14,20 +14,6 @@ import ( func (c *cache) Get(addr oid.Address) (*objectSDK.Object, error) { saddr := addr.EncodeToString() - c.mtx.RLock() - for i := range c.mem { - if saddr == c.mem[i].addr { - data := c.mem[i].data - c.mtx.RUnlock() - // We unmarshal object instead of using cached value to avoid possibility - // of unintentional object corruption by caller. - // It is safe to unmarshal without mutex, as storage under `c.mem[i].data` slices is not reused. - obj := objectSDK.New() - return obj, obj.Unmarshal(data) - } - } - c.mtx.RUnlock() - value, err := Get(c.db, []byte(saddr)) if err == nil { obj := objectSDK.New() diff --git a/pkg/local_object_storage/writecache/mode.go b/pkg/local_object_storage/writecache/mode.go index bf6685a4..9f6d8bb4 100644 --- a/pkg/local_object_storage/writecache/mode.go +++ b/pkg/local_object_storage/writecache/mode.go @@ -23,13 +23,6 @@ func (c *cache) SetMode(m mode.Mode) error { return nil } - if !c.readOnly() { - // Because modeMtx is taken no new objects will arrive an all other modifying - // operations are completed. - // 1. Persist objects already in memory on disk. - c.persistMemoryCache() - } - if c.db != nil { if err := c.db.Close(); err != nil { return fmt.Errorf("can't close write-cache database: %w", err) @@ -37,13 +30,10 @@ func (c *cache) SetMode(m mode.Mode) error { c.db = nil } - // 2. Suspend producers to ensure there are channel send operations in fly. - // metaCh and directCh can be populated either during Put or in background memory persist thread. - // Former possibility is eliminated by taking `modeMtx` mutex and - // latter by explicit persist in the previous step. - // flushCh is populated by `flush` with `modeMtx` is also taken. - // Thus all producers are shutdown and we only need to wait until all channels are empty. - for len(c.metaCh) != 0 || len(c.directCh) != 0 || len(c.flushCh) != 0 { + // Suspend producers to ensure there are channel send operations in fly. + // flushCh is populated by `flush` with `modeMtx` taken, thus waiting until it is empty + // guarantees that there are no in-fly operations. + for len(c.flushCh) != 0 { c.log.Info("waiting for channels to flush") time.Sleep(time.Second) } diff --git a/pkg/local_object_storage/writecache/persist.go b/pkg/local_object_storage/writecache/persist.go deleted file mode 100644 index f0692d28..00000000 --- a/pkg/local_object_storage/writecache/persist.go +++ /dev/null @@ -1,153 +0,0 @@ -package writecache - -import ( - "sort" - "time" - - "github.com/nspcc-dev/neofs-node/pkg/core/object" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" - storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log" - "go.etcd.io/bbolt" - "go.uber.org/zap" -) - -const defaultPersistInterval = time.Second - -// persistLoop persists object accumulated in memory to the database. -func (c *cache) persistLoop() { - tick := time.NewTicker(defaultPersistInterval) - defer tick.Stop() - - for { - select { - case <-tick.C: - c.modeMtx.RLock() - if c.readOnly() { - c.modeMtx.RUnlock() - continue - } - c.persistMemoryCache() - c.modeMtx.RUnlock() - case <-c.closeCh: - return - } - } -} - -func (c *cache) persistMemoryCache() { - c.mtx.RLock() - m := c.mem - c.mtx.RUnlock() - - if len(m) == 0 { - return - } - - sort.Slice(m, func(i, j int) bool { return m[i].addr < m[j].addr }) - - start := time.Now() - c.persistSmallObjects(m) - c.log.Debug("persisted items to disk", - zap.Duration("took", time.Since(start)), - zap.Int("total", len(m))) - - for i := range m { - storagelog.Write(c.log, - storagelog.AddressField(m[i].addr), - storagelog.OpField("in-mem DELETE persist"), - ) - } - - c.mtx.Lock() - c.curMemSize = 0 - n := copy(c.mem, c.mem[len(m):]) - c.mem = c.mem[:n] - for i := range c.mem { - c.curMemSize += uint64(len(c.mem[i].data)) - } - c.mtx.Unlock() -} - -// persistSmallObjects persists small objects to the write-cache database and -// pushes the to the flush workers queue. -func (c *cache) persistSmallObjects(objs []objectInfo) { - cacheSize := c.estimateCacheSize() - overflowIndex := len(objs) - for i := range objs { - newSize := c.incSizeDB(cacheSize) - if c.maxCacheSize < newSize { - overflowIndex = i - break - } - cacheSize = newSize - } - - err := c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - for i := 0; i < overflowIndex; i++ { - err := b.Put([]byte(objs[i].addr), objs[i].data) - if err != nil { - return err - } - } - return nil - }) - if err != nil { - overflowIndex = 0 - } else { - c.evictObjects(len(objs) - overflowIndex) - } - - for i := 0; i < overflowIndex; i++ { - storagelog.Write(c.log, storagelog.AddressField(objs[i].addr), storagelog.OpField("db PUT")) - c.objCounters.IncDB() - } - for i := overflowIndex; i < len(objs); i++ { - c.flushed.Add(objs[i].addr, true) - } - - c.addToFlushQueue(objs, overflowIndex) -} - -// persistBigObject writes object to FSTree and pushes it to the flush workers queue. -func (c *cache) persistBigObject(objInfo objectInfo) { - cacheSz := c.estimateCacheSize() - metaIndex := 0 - if c.incSizeFS(cacheSz) <= c.maxCacheSize { - var prm common.PutPrm - prm.Address = object.AddressOf(objInfo.obj) - prm.RawData = objInfo.data - - _, err := c.fsTree.Put(prm) - if err == nil { - metaIndex = 1 - if c.blobstor.NeedsCompression(objInfo.obj) { - c.mtx.Lock() - c.compressFlags[objInfo.addr] = struct{}{} - c.mtx.Unlock() - } - c.objCounters.IncFS() - storagelog.Write(c.log, storagelog.AddressField(objInfo.addr), storagelog.OpField("fstree PUT")) - } - } - c.addToFlushQueue([]objectInfo{objInfo}, metaIndex) -} - -// addToFlushQueue pushes objects to the flush workers queue. -// For objects below metaIndex only meta information will be flushed. -func (c *cache) addToFlushQueue(objs []objectInfo, metaIndex int) { - for i := 0; i < metaIndex; i++ { - select { - case c.metaCh <- objs[i].obj: - case <-c.closeCh: - return - } - } - for i := metaIndex; i < len(objs); i++ { - select { - case c.directCh <- objs[i].obj: - case <-c.closeCh: - return - } - } -} diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go index b8900275..cbf39c61 100644 --- a/pkg/local_object_storage/writecache/put.go +++ b/pkg/local_object_storage/writecache/put.go @@ -3,57 +3,80 @@ package writecache import ( "errors" - "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log" - objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + "go.etcd.io/bbolt" ) -// ErrBigObject is returned when object is too big to be placed in cache. -var ErrBigObject = errors.New("too big object") +var ( + // ErrBigObject is returned when object is too big to be placed in cache. + ErrBigObject = errors.New("too big object") + // ErrOutOfSpace is returned when there is no space left to put a new object. + ErrOutOfSpace = errors.New("no space left in the write cache") +) // Put puts object to write-cache. -func (c *cache) Put(o *objectSDK.Object) error { +func (c *cache) Put(prm common.PutPrm) (common.PutRes, error) { c.modeMtx.RLock() defer c.modeMtx.RUnlock() if c.readOnly() { - return ErrReadOnly + return common.PutRes{}, ErrReadOnly } - sz := uint64(o.ToV2().StableSize()) + sz := uint64(len(prm.RawData)) if sz > c.maxObjectSize { - return ErrBigObject + return common.PutRes{}, ErrBigObject } - data, err := o.Marshal() + oi := objectInfo{ + addr: prm.Address.EncodeToString(), + obj: prm.Object, + data: prm.RawData, + } + + if sz <= c.smallObjectSize { + return common.PutRes{}, c.putSmall(oi) + } + return common.PutRes{}, c.putBig(oi.addr, prm) +} + +// putSmall persists small objects to the write-cache database and +// pushes the to the flush workers queue. +func (c *cache) putSmall(obj objectInfo) error { + cacheSize := c.estimateCacheSize() + if c.maxCacheSize < c.incSizeDB(cacheSize) { + return ErrOutOfSpace + } + + err := c.db.Batch(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + return b.Put([]byte(obj.addr), obj.data) + }) + if err == nil { + storagelog.Write(c.log, storagelog.AddressField(obj.addr), storagelog.OpField("db PUT")) + c.objCounters.IncDB() + } + return nil +} + +// putBig writes object to FSTree and pushes it to the flush workers queue. +func (c *cache) putBig(addr string, prm common.PutPrm) error { + cacheSz := c.estimateCacheSize() + if c.maxCacheSize < c.incSizeFS(cacheSz) { + return ErrOutOfSpace + } + + _, err := c.fsTree.Put(prm) if err != nil { return err } - oi := objectInfo{ - addr: object.AddressOf(o).EncodeToString(), - obj: o, - data: data, - } - - c.mtx.Lock() - - if sz <= c.smallObjectSize && c.curMemSize+sz <= c.maxMemSize { - c.curMemSize += sz - c.mem = append(c.mem, oi) - + if c.blobstor.NeedsCompression(prm.Object) { + c.mtx.Lock() + c.compressFlags[addr] = struct{}{} c.mtx.Unlock() - - storagelog.Write(c.log, storagelog.AddressField(oi.addr), storagelog.OpField("in-mem PUT")) - - return nil - } - - c.mtx.Unlock() - - if sz <= c.smallObjectSize { - c.persistSmallObjects([]objectInfo{oi}) - } else { - c.persistBigObject(oi) } + c.objCounters.IncFS() + storagelog.Write(c.log, storagelog.AddressField(addr), storagelog.OpField("fstree PUT")) return nil } diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index 40902be0..cc00ca59 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -3,6 +3,7 @@ package writecache import ( "sync" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/nspcc-dev/neofs-sdk-go/object" @@ -23,7 +24,7 @@ type Cache interface { Head(oid.Address) (*object.Object, error) Delete(oid.Address) error Iterate(IterationPrm) error - Put(*object.Object) error + Put(common.PutPrm) (common.PutRes, error) SetMode(mode.Mode) error SetLogger(*zap.Logger) DumpInfo() Info @@ -36,9 +37,8 @@ type Cache interface { type cache struct { options - // mtx protects mem field, statistics, counters and compressFlags. + // mtx protects statistics, counters and compressFlags. mtx sync.RWMutex - mem []objectInfo mode mode.Mode modeMtx sync.RWMutex @@ -47,19 +47,12 @@ type cache struct { // whether object should be compressed. compressFlags map[string]struct{} - // curMemSize is the current size of all objects cached in memory. - curMemSize uint64 - // flushCh is a channel with objects to flush. flushCh chan *object.Object - // directCh is a channel with objects to put directly to the main storage. - // it is prioritized over flushCh. - directCh chan *object.Object - // metaCh is a channel with objects for which only metadata needs to be written. - metaCh chan *object.Object // closeCh is close channel. closeCh chan struct{} - evictCh chan []byte + // wg is a wait group for flush workers. + wg sync.WaitGroup // store contains underlying database. store // fsTree contains big files stored directly on file-system. @@ -86,12 +79,9 @@ var ( // New creates new writecache instance. func New(opts ...Option) Cache { c := &cache{ - flushCh: make(chan *object.Object), - directCh: make(chan *object.Object), - metaCh: make(chan *object.Object), - closeCh: make(chan struct{}), - evictCh: make(chan []byte), - mode: mode.ReadWrite, + flushCh: make(chan *object.Object), + closeCh: make(chan struct{}), + mode: mode.ReadWrite, compressFlags: make(map[string]struct{}), options: options{ @@ -144,9 +134,7 @@ func (c *cache) Open(readOnly bool) error { // Init runs necessary services. func (c *cache) Init() error { c.initFlushMarks() - - go c.persistLoop() - go c.flushLoop() + c.runFlushLoop() return nil } @@ -158,6 +146,8 @@ func (c *cache) Close() error { } close(c.closeCh) + c.wg.Wait() + if c.objCounters != nil { c.objCounters.FlushAndClose() }