From cfcefc48bcfd360d574f2182485fd9026cb7f04e Mon Sep 17 00:00:00 2001
From: Pavel Karpy
Date: Fri, 5 May 2023 18:40:57 +0300
Subject: [PATCH 1/3] [#314] writecache: Simplify background workers naming
Also, drop the unused worker index argument.
Signed-off-by: Pavel Karpy
---
pkg/local_object_storage/writecache/flush.go | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go
index 04fcccede..1e24a42ee 100644
--- a/pkg/local_object_storage/writecache/flush.go
+++ b/pkg/local_object_storage/writecache/flush.go
@@ -37,12 +37,12 @@ const (
func (c *cache) runFlushLoop() {
for i := 0; i < c.workersCount; i++ {
c.wg.Add(1)
- go c.flushWorker(i)
+ go c.workerFlushSmall()
}
c.wg.Add(1)
go func() {
- c.flushBigObjects(context.TODO())
+ c.workerFlushBig(context.TODO())
c.wg.Done()
}()
@@ -56,7 +56,7 @@ func (c *cache) runFlushLoop() {
for {
select {
case <-tt.C:
- c.flushDB()
+ c.flushSmallObjects()
tt.Reset(defaultFlushInterval)
case <-c.closeCh:
return
@@ -65,7 +65,7 @@ func (c *cache) runFlushLoop() {
}()
}
-func (c *cache) flushDB() {
+func (c *cache) flushSmallObjects() {
var lastKey []byte
var m []objectInfo
for {
@@ -148,7 +148,7 @@ func (c *cache) flushDB() {
}
}
-func (c *cache) flushBigObjects(ctx context.Context) {
+func (c *cache) workerFlushBig(ctx context.Context) {
tick := time.NewTicker(defaultFlushInterval * 10)
for {
select {
@@ -228,8 +228,8 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
return err
}
-// flushWorker writes objects to the main storage.
-func (c *cache) flushWorker(_ int) {
+// workerFlushSmall writes small objects to the main storage.
+func (c *cache) workerFlushSmall() {
defer c.wg.Done()
var obj *object.Object
--
2.45.2
From 389c3251b7733f716bc2f64f4ef06433f1426612 Mon Sep 17 00:00:00 2001
From: Pavel Karpy
Date: Fri, 5 May 2023 18:59:49 +0300
Subject: [PATCH 2/3] [#314] writecache: Do not lose small objects on disk
errors
Return an error if an object could not be stored on the write-cache disk.
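For context, a minimal sketch of the fixed putSmall flow; the bbolt call shape and bucket layout here are assumptions based on the surrounding code, not a verbatim copy:

    func (c *cache) putSmall(obj objectInfo) error {
        err := c.db.Batch(func(tx *bbolt.Tx) error {
            // store the serialized object under its address key (assumed layout)
            return tx.Bucket(defaultBucket).Put([]byte(obj.addr), obj.data)
        })
        if err == nil {
            c.objCounters.IncDB()
        }
        return err // was `return nil`, which silently swallowed write errors
    }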
Signed-off-by: Pavel Karpy
---
pkg/local_object_storage/writecache/put.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go
index e2535d9e2..223a73bef 100644
--- a/pkg/local_object_storage/writecache/put.go
+++ b/pkg/local_object_storage/writecache/put.go
@@ -78,7 +78,7 @@ func (c *cache) putSmall(obj objectInfo) error {
)
c.objCounters.IncDB()
}
- return nil
+ return err
}
// putBig writes object to FSTree and pushes it to the flush workers queue.
--
2.45.2
From d259dd8cd91e67e7912f603fb4095f7c1bd33015 Mon Sep 17 00:00:00 2001
From: Evgenii Stratonikov
Date: Wed, 10 May 2023 17:43:49 +0300
Subject: [PATCH 3/3] [#314] writecache: Remove objects right after they are
flushed
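Drop the flush-mark LRU and the background init scan: an object is now deleted
from the write-cache as soon as it has been flushed to the main storage. A
simplified sketch of the resulting small-object worker (channel plumbing as in
the diff below, error handling condensed):

    func (c *cache) workerFlushSmall() {
        defer c.wg.Done()
        for {
            var obj *object.Object
            // give priority to shutdown over the flush queue
            select {
            case obj = <-c.flushCh:
            case <-c.closeCh:
                return
            }
            if err := c.flushObject(context.TODO(), obj, nil); err != nil {
                continue // the error is reported inside flushObject
            }
            // remove the object from the DB right after a successful flush
            c.deleteFromDB([]string{objectCore.AddressOf(obj).EncodeToString()})
        }
    }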
Signed-off-by: Evgenii Stratonikov
---
pkg/local_object_storage/writecache/flush.go | 28 +--
.../writecache/flush_test.go | 99 +--------
pkg/local_object_storage/writecache/get.go | 2 -
pkg/local_object_storage/writecache/init.go | 192 ------------------
.../writecache/iterate.go | 6 -
pkg/local_object_storage/writecache/mode.go | 16 --
pkg/local_object_storage/writecache/put.go | 2 -
.../writecache/storage.go | 46 +----
.../writecache/writecache.go | 42 ++--
9 files changed, 25 insertions(+), 408 deletions(-)
delete mode 100644 pkg/local_object_storage/writecache/init.go
diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go
index 1e24a42ee..c6c8a9465 100644
--- a/pkg/local_object_storage/writecache/flush.go
+++ b/pkg/local_object_storage/writecache/flush.go
@@ -78,7 +78,7 @@ func (c *cache) flushSmallObjects() {
m = m[:0]
c.modeMtx.RLock()
- if c.readOnly() || !c.initialized.Load() {
+ if c.readOnly() {
c.modeMtx.RUnlock()
time.Sleep(time.Second)
continue
@@ -117,10 +117,6 @@ func (c *cache) flushSmallObjects() {
var count int
for i := range m {
- if c.flushed.Contains(m[i].addr) {
- continue
- }
-
obj := object.New()
if err := obj.Unmarshal(m[i].data); err != nil {
continue
@@ -157,9 +153,6 @@ func (c *cache) workerFlushBig(ctx context.Context) {
if c.readOnly() {
c.modeMtx.RUnlock()
break
- } else if !c.initialized.Load() {
- c.modeMtx.RUnlock()
- continue
}
_ = c.flushFSTree(ctx, true)
@@ -187,10 +180,6 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
sAddr := addr.EncodeToString()
- if _, ok := c.store.flushed.Peek(sAddr); ok {
- return nil
- }
-
data, err := f()
if err != nil {
c.reportFlushError("can't read a file", sAddr, err)
@@ -218,9 +207,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
return err
}
- // mark object as flushed
- c.flushed.Add(sAddr, false)
-
+ c.deleteFromDisk(ctx, []string{sAddr})
return nil
}
@@ -242,9 +229,12 @@ func (c *cache) workerFlushSmall() {
}
err := c.flushObject(context.TODO(), obj, nil)
- if err == nil {
- c.flushed.Add(objectCore.AddressOf(obj).EncodeToString(), true)
+ if err != nil {
+ // Error is handled in flushObject.
+ continue
}
+
+ c.deleteFromDB([]string{objectCore.AddressOf(obj).EncodeToString()})
}
}
@@ -306,10 +296,6 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
cs := b.Cursor()
for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() {
sa := string(k)
- if _, ok := c.flushed.Peek(sa); ok {
- continue
- }
-
if err := addr.DecodeString(sa); err != nil {
c.reportFlushError("can't decode object address from the DB", sa, err)
if ignoreErrors {
diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go
index 2cec07081..e5ca85735 100644
--- a/pkg/local_object_storage/writecache/flush_test.go
+++ b/pkg/local_object_storage/writecache/flush_test.go
@@ -5,7 +5,6 @@ import (
"os"
"path/filepath"
"testing"
- "time"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@@ -15,7 +14,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
checksumtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum/test"
- apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@@ -109,22 +107,9 @@ func TestFlush(t *testing.T) {
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
- wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
- wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
-
require.NoError(t, wc.Flush(context.Background(), false))
- for i := 0; i < 2; i++ {
- var mPrm meta.GetPrm
- mPrm.SetAddress(objects[i].addr)
- _, err := mb.Get(context.Background(), mPrm)
- require.Error(t, err)
-
- _, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
- require.Error(t, err)
- }
-
- check(t, mb, bs, objects[2:])
+ check(t, mb, bs, objects)
})
t.Run("flush on moving to degraded mode", func(t *testing.T) {
@@ -138,23 +123,9 @@ func TestFlush(t *testing.T) {
require.NoError(t, wc.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
-
- wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
- wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
-
require.NoError(t, wc.SetMode(mode.Degraded))
- for i := 0; i < 2; i++ {
- var mPrm meta.GetPrm
- mPrm.SetAddress(objects[i].addr)
- _, err := mb.Get(context.Background(), mPrm)
- require.Error(t, err)
-
- _, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
- require.Error(t, err)
- }
-
- check(t, mb, bs, objects[2:])
+ check(t, mb, bs, objects)
})
t.Run("ignore errors", func(t *testing.T) {
@@ -223,67 +194,6 @@ func TestFlush(t *testing.T) {
})
})
})
-
- t.Run("on init", func(t *testing.T) {
- wc, bs, mb := newCache(t)
- objects := []objectPair{
- // removed
- putObject(t, wc, 1),
- putObject(t, wc, smallSize+1),
- // not found
- putObject(t, wc, 1),
- putObject(t, wc, smallSize+1),
- // ok
- putObject(t, wc, 1),
- putObject(t, wc, smallSize+1),
- }
-
- require.NoError(t, wc.Close())
- require.NoError(t, bs.SetMode(mode.ReadWrite))
- require.NoError(t, mb.SetMode(mode.ReadWrite))
-
- for i := range objects {
- var prm meta.PutPrm
- prm.SetObject(objects[i].obj)
- _, err := mb.Put(context.Background(), prm)
- require.NoError(t, err)
- }
-
- var inhumePrm meta.InhumePrm
- inhumePrm.SetAddresses(objects[0].addr, objects[1].addr)
- inhumePrm.SetTombstoneAddress(oidtest.Address())
- _, err := mb.Inhume(context.Background(), inhumePrm)
- require.NoError(t, err)
-
- var deletePrm meta.DeletePrm
- deletePrm.SetAddresses(objects[2].addr, objects[3].addr)
- _, err = mb.Delete(context.Background(), deletePrm)
- require.NoError(t, err)
-
- require.NoError(t, bs.SetMode(mode.ReadOnly))
- require.NoError(t, mb.SetMode(mode.ReadOnly))
-
- // Open in read-only: no error, nothing is removed.
- require.NoError(t, wc.Open(true))
- initWC(t, wc)
- for i := range objects {
- _, err := wc.Get(context.Background(), objects[i].addr)
- require.NoError(t, err, i)
- }
- require.NoError(t, wc.Close())
-
- // Open in read-write: no error, something is removed.
- require.NoError(t, wc.Open(false))
- initWC(t, wc)
- for i := range objects {
- _, err := wc.Get(context.Background(), objects[i].addr)
- if i < 2 {
- require.ErrorAs(t, err, new(apistatus.ObjectNotFound), i)
- } else {
- require.NoError(t, err, i)
- }
- }
- })
}
func putObject(t *testing.T, c Cache, size int) objectPair {
@@ -321,11 +231,6 @@ func newObject(t *testing.T, size int) (*object.Object, []byte) {
func initWC(t *testing.T, wc Cache) {
require.NoError(t, wc.Init())
-
- require.Eventually(t, func() bool {
- rawWc := wc.(*cache)
- return rawWc.initialized.Load()
- }, 100*time.Second, 1*time.Millisecond)
}
type dummyEpoch struct{}
diff --git a/pkg/local_object_storage/writecache/get.go b/pkg/local_object_storage/writecache/get.go
index 6af1bd181..030f9b413 100644
--- a/pkg/local_object_storage/writecache/get.go
+++ b/pkg/local_object_storage/writecache/get.go
@@ -30,7 +30,6 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
value, err := Get(c.db, []byte(saddr))
if err == nil {
obj := objectSDK.New()
- c.flushed.Get(saddr)
return obj, obj.Unmarshal(value)
}
@@ -39,7 +38,6 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
}
- c.flushed.Get(saddr)
return res.Object, nil
}
diff --git a/pkg/local_object_storage/writecache/init.go b/pkg/local_object_storage/writecache/init.go
deleted file mode 100644
index 2ca8cceef..000000000
--- a/pkg/local_object_storage/writecache/init.go
+++ /dev/null
@@ -1,192 +0,0 @@
-package writecache
-
-import (
- "context"
- "errors"
- "sync"
-
- "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
- "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
- storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
- meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
- apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
- oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
- "go.etcd.io/bbolt"
- "go.uber.org/zap"
-)
-
-func (c *cache) initFlushMarks(ctx context.Context) {
- var localWG sync.WaitGroup
-
- localWG.Add(1)
- go func() {
- defer localWG.Done()
-
- c.fsTreeFlushMarkUpdate(ctx)
- }()
-
- localWG.Add(1)
- go func() {
- defer localWG.Done()
-
- c.dbFlushMarkUpdate(ctx)
- }()
-
- c.initWG.Add(1)
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
- defer c.initWG.Done()
-
- localWG.Wait()
-
- select {
- case <-c.stopInitCh:
- return
- case <-c.closeCh:
- return
- default:
- }
-
- c.initialized.Store(true)
- }()
-}
-
-var errStopIter = errors.New("stop iteration")
-
-func (c *cache) fsTreeFlushMarkUpdate(ctx context.Context) {
- c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInFSTree)
-
- var prm common.IteratePrm
- prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error {
- select {
- case <-c.closeCh:
- return errStopIter
- case <-c.stopInitCh:
- return errStopIter
- default:
- }
-
- flushed, needRemove := c.flushStatus(ctx, addr)
- if flushed {
- c.store.flushed.Add(addr.EncodeToString(), true)
- if needRemove {
- var prm common.DeletePrm
- prm.Address = addr
-
- _, err := c.fsTree.Delete(ctx, prm)
- if err == nil {
- storagelog.Write(c.log,
- storagelog.AddressField(addr),
- storagelog.StorageTypeField(wcStorageType),
- storagelog.OpField("fstree DELETE"),
- )
- }
- }
- }
- return nil
- }
-
- c.modeMtx.RLock()
- defer c.modeMtx.RUnlock()
-
- _, _ = c.fsTree.Iterate(prm)
-
- c.log.Info(logs.WritecacheFinishedUpdatingFSTreeFlushMarks)
-}
-
-func (c *cache) dbFlushMarkUpdate(ctx context.Context) {
- c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInDatabase)
-
- c.modeMtx.RLock()
- defer c.modeMtx.RUnlock()
-
- var m []string
- var indices []int
- var lastKey []byte
- var batchSize = flushBatchSize
- for {
- select {
- case <-c.closeCh:
- return
- case <-c.stopInitCh:
- return
- default:
- }
-
- m = m[:0]
- indices = indices[:0]
-
- // We put objects in batches of fixed size to not interfere with main put cycle a lot.
- _ = c.db.View(func(tx *bbolt.Tx) error {
- b := tx.Bucket(defaultBucket)
- cs := b.Cursor()
- for k, _ := cs.Seek(lastKey); k != nil && len(m) < batchSize; k, _ = cs.Next() {
- m = append(m, string(k))
- }
- return nil
- })
-
- var addr oid.Address
- for i := range m {
- if err := addr.DecodeString(m[i]); err != nil {
- continue
- }
-
- flushed, needRemove := c.flushStatus(ctx, addr)
- if flushed {
- c.store.flushed.Add(addr.EncodeToString(), true)
- if needRemove {
- indices = append(indices, i)
- }
- }
- }
-
- if len(m) == 0 {
- break
- }
-
- err := c.db.Batch(func(tx *bbolt.Tx) error {
- b := tx.Bucket(defaultBucket)
- for _, j := range indices {
- if err := b.Delete([]byte(m[j])); err != nil {
- return err
- }
- }
- return nil
- })
- if err == nil {
- for _, j := range indices {
- storagelog.Write(c.log,
- zap.String("address", m[j]),
- storagelog.StorageTypeField(wcStorageType),
- storagelog.OpField("db DELETE"),
- )
- }
- }
- lastKey = append([]byte(m[len(m)-1]), 0)
- }
-
- c.log.Info(logs.WritecacheFinishedUpdatingFlushMarks)
-}
-
-// flushStatus returns info about the object state in the main storage.
-// First return value is true iff object exists.
-// Second return value is true iff object can be safely removed.
-func (c *cache) flushStatus(ctx context.Context, addr oid.Address) (bool, bool) {
- var existsPrm meta.ExistsPrm
- existsPrm.SetAddress(addr)
-
- _, err := c.metabase.Exists(ctx, existsPrm)
- if err != nil {
- needRemove := errors.Is(err, meta.ErrObjectIsExpired) || errors.As(err, new(apistatus.ObjectAlreadyRemoved))
- return needRemove, needRemove
- }
-
- var prm meta.StorageIDPrm
- prm.SetAddress(addr)
-
- mRes, _ := c.metabase.StorageID(ctx, prm)
- res, err := c.blobstor.Exists(ctx, common.ExistsPrm{Address: addr, StorageID: mRes.StorageID()})
- return err == nil && res.Exists, false
-}
diff --git a/pkg/local_object_storage/writecache/iterate.go b/pkg/local_object_storage/writecache/iterate.go
index 228dd2597..ebe979520 100644
--- a/pkg/local_object_storage/writecache/iterate.go
+++ b/pkg/local_object_storage/writecache/iterate.go
@@ -41,9 +41,6 @@ func (c *cache) Iterate(prm IterationPrm) error {
err := c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
return b.ForEach(func(k, data []byte) error {
- if _, ok := c.flushed.Peek(string(k)); ok {
- return nil
- }
return prm.handler(data)
})
})
@@ -54,9 +51,6 @@ func (c *cache) Iterate(prm IterationPrm) error {
var fsPrm common.IteratePrm
fsPrm.IgnoreErrors = prm.ignoreErrors
fsPrm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
- if _, ok := c.flushed.Peek(addr.EncodeToString()); ok {
- return nil
- }
data, err := f()
if err != nil {
if prm.ignoreErrors {
diff --git a/pkg/local_object_storage/writecache/mode.go b/pkg/local_object_storage/writecache/mode.go
index 14f8af49e..ca6faff4c 100644
--- a/pkg/local_object_storage/writecache/mode.go
+++ b/pkg/local_object_storage/writecache/mode.go
@@ -37,22 +37,6 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode) error {
var err error
turnOffMeta := m.NoMetabase()
- if !c.initialized.Load() {
- close(c.stopInitCh)
-
- c.initWG.Wait()
- c.stopInitCh = make(chan struct{})
-
- defer func() {
- if err == nil && !turnOffMeta {
- c.initFlushMarks(ctx)
- }
- }()
- }
-
- c.modeMtx.Lock()
- defer c.modeMtx.Unlock()
-
if turnOffMeta && !c.mode.NoMetabase() {
err = c.flush(ctx, true)
if err != nil {
diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go
index 223a73bef..04d818b31 100644
--- a/pkg/local_object_storage/writecache/put.go
+++ b/pkg/local_object_storage/writecache/put.go
@@ -37,8 +37,6 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
defer c.modeMtx.RUnlock()
if c.readOnly() {
return common.PutRes{}, ErrReadOnly
- } else if !c.initialized.Load() {
- return common.PutRes{}, ErrNotInitialized
}
sz := uint64(len(prm.RawData))
diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go
index aeae752e3..c06d16c0b 100644
--- a/pkg/local_object_storage/writecache/storage.go
+++ b/pkg/local_object_storage/writecache/storage.go
@@ -13,8 +13,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
- lru "github.com/hashicorp/golang-lru/v2"
- "github.com/hashicorp/golang-lru/v2/simplelru"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
@@ -22,19 +20,7 @@ import (
// store represents persistent storage with in-memory LRU cache
// for flushed items on top of it.
type store struct {
- maxFlushedMarksCount int
- maxRemoveBatchSize int
-
- // flushed contains addresses of objects that were already flushed to the main storage.
- // We use LRU cache instead of map here to facilitate removing of unused object in favour of
- // frequently read ones.
- // MUST NOT be used inside bolt db transaction because it's eviction handler
- // removes untracked items from the database.
- flushed simplelru.LRUCache[string, bool]
- db *bbolt.DB
-
- dbKeysToRemove []string
- fsKeysToRemove []string
+ db *bbolt.DB
}
const dbName = "small.bolt"
@@ -73,35 +59,9 @@ func (c *cache) openStore(readOnly bool) error {
return fmt.Errorf("could not open FSTree: %w", err)
}
- // Write-cache can be opened multiple times during `SetMode`.
- // flushed map must not be re-created in this case.
- if c.flushed == nil {
- c.flushed, _ = lru.NewWithEvict[string, bool](c.maxFlushedMarksCount, c.removeFlushed)
- }
-
- c.initialized.Store(false)
-
return nil
}
-// removeFlushed removes an object from the writecache.
-// To minimize interference with the client operations, the actual removal
-// is done in batches.
-// It is not thread-safe and is used only as an evict callback to LRU cache.
-func (c *cache) removeFlushed(key string, value bool) {
- fromDatabase := value
- if fromDatabase {
- c.dbKeysToRemove = append(c.dbKeysToRemove, key)
- } else {
- c.fsKeysToRemove = append(c.fsKeysToRemove, key)
- }
-
- if len(c.dbKeysToRemove)+len(c.fsKeysToRemove) >= c.maxRemoveBatchSize {
- c.dbKeysToRemove = c.deleteFromDB(c.dbKeysToRemove)
- c.fsKeysToRemove = c.deleteFromDisk(c.fsKeysToRemove)
- }
-}
-
func (c *cache) deleteFromDB(keys []string) []string {
if len(keys) == 0 {
return keys
@@ -133,7 +93,7 @@ func (c *cache) deleteFromDB(keys []string) []string {
return keys[:len(keys)-errorIndex]
}
-func (c *cache) deleteFromDisk(keys []string) []string {
+func (c *cache) deleteFromDisk(ctx context.Context, keys []string) []string {
if len(keys) == 0 {
return keys
}
@@ -147,7 +107,7 @@ func (c *cache) deleteFromDisk(keys []string) []string {
continue
}
- _, err := c.fsTree.Delete(context.TODO(), common.DeletePrm{Address: addr})
+ _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
if err != nil && !errors.As(err, new(apistatus.ObjectNotFound)) {
c.log.Error(logs.WritecacheCantRemoveObjectFromWritecache, zap.Error(err))
diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go
index bdcc9bbf6..83ecf219c 100644
--- a/pkg/local_object_storage/writecache/writecache.go
+++ b/pkg/local_object_storage/writecache/writecache.go
@@ -5,7 +5,6 @@ import (
"os"
"sync"
- "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
@@ -13,7 +12,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
- "go.uber.org/atomic"
"go.uber.org/zap"
)
@@ -52,11 +50,8 @@ type cache struct {
// mtx protects statistics, counters and compressFlags.
mtx sync.RWMutex
- mode mode.Mode
- initialized atomic.Bool
- stopInitCh chan struct{} // used to sync initWG initialisation routines and _only_ them
- initWG sync.WaitGroup // for initialisation routines only
- modeMtx sync.RWMutex
+ mode mode.Mode
+ modeMtx sync.RWMutex
// compressFlags maps address of a big object to boolean value indicating
// whether object should be compressed.
@@ -96,9 +91,8 @@ var (
// New creates new writecache instance.
func New(opts ...Option) Cache {
c := &cache{
- flushCh: make(chan *object.Object),
- mode: mode.ReadWrite,
- stopInitCh: make(chan struct{}),
+ flushCh: make(chan *object.Object),
+ mode: mode.ReadWrite,
compressFlags: make(map[string]struct{}),
options: options{
@@ -117,12 +111,6 @@ func New(opts ...Option) Cache {
opts[i](&c.options)
}
- // Make the LRU cache contain which take approximately 3/4 of the maximum space.
- // Assume small and big objects are stored in 50-50 proportion.
- c.maxFlushedMarksCount = int(c.maxCacheSize/c.maxObjectSize+c.maxCacheSize/c.smallObjectSize) / 2 * 3 / 4
- // Trigger the removal when the cache is 7/8 full, so that new items can still arrive.
- c.maxRemoveBatchSize = c.maxFlushedMarksCount / 8
-
return c
}
@@ -153,31 +141,27 @@ func (c *cache) Open(readOnly bool) error {
// Init runs necessary services.
func (c *cache) Init() error {
- ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.Init")
- defer span.End()
-
- c.initFlushMarks(ctx)
c.runFlushLoop()
return nil
}
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
func (c *cache) Close() error {
- // Finish all in-progress operations.
- if err := c.setMode(context.TODO(), mode.ReadOnly); err != nil {
- return err
- }
-
+ // We cannot hold the mutex for the whole duration of the operation,
+ // because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
+ c.modeMtx.Lock()
if c.closeCh != nil {
close(c.closeCh)
}
+ c.mode = mode.DegradedReadOnly // prevent new operations from being processed
+ c.modeMtx.Unlock()
+
c.wg.Wait()
- if c.closeCh != nil {
- c.closeCh = nil
- }
- c.initialized.Store(false)
+ c.modeMtx.Lock()
+ defer c.modeMtx.Unlock()
+ c.closeCh = nil
var err error
if c.db != nil {
err = c.db.Close()
--
2.45.2