From 77b1b80e73507027575542e8c370157b749872b5 Mon Sep 17 00:00:00 2001
From: Pavel Karpy
Date: Fri, 5 May 2023 18:36:12 +0300
Subject: [PATCH 1/4] [#314] wc: Drop WC interface
It is not used for testing, and its absence does not break the build. The
only implementation is placed in the same package.
Signed-off-by: Pavel Karpy
---
pkg/local_object_storage/shard/get.go | 4 +-
pkg/local_object_storage/shard/range.go | 2 +-
pkg/local_object_storage/shard/shard.go | 2 +-
pkg/local_object_storage/writecache/delete.go | 2 +-
pkg/local_object_storage/writecache/flush.go | 18 ++++-----
.../writecache/flush_test.go | 31 +++++++-------
pkg/local_object_storage/writecache/get.go | 4 +-
pkg/local_object_storage/writecache/init.go | 8 ++--
.../writecache/iterate.go | 2 +-
pkg/local_object_storage/writecache/mode.go | 6 +--
pkg/local_object_storage/writecache/put.go | 6 +--
pkg/local_object_storage/writecache/state.go | 8 ++--
.../writecache/storage.go | 8 ++--
.../writecache/writecache.go | 40 ++++---------------
14 files changed, 58 insertions(+), 83 deletions(-)
diff --git a/pkg/local_object_storage/shard/get.go b/pkg/local_object_storage/shard/get.go
index 5268ac790..0d400a7ff 100644
--- a/pkg/local_object_storage/shard/get.go
+++ b/pkg/local_object_storage/shard/get.go
@@ -91,7 +91,7 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
return res.Object, nil
}
- wc := func(c writecache.Cache) (*objectSDK.Object, error) {
+ wc := func(c *writecache.Cache) (*objectSDK.Object, error) {
return c.Get(ctx, prm.addr)
}
@@ -109,7 +109,7 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
var emptyStorageID = make([]byte, 0)
// fetchObjectData looks through writeCache and blobStor to find object.
-func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta bool, cb storFetcher, wc func(w writecache.Cache) (*objectSDK.Object, error)) (*objectSDK.Object, bool, error) {
+func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta bool, cb storFetcher, wc func(w *writecache.Cache) (*objectSDK.Object, error)) (*objectSDK.Object, bool, error) {
var (
mErr error
mRes meta.ExistsRes
diff --git a/pkg/local_object_storage/shard/range.go b/pkg/local_object_storage/shard/range.go
index 06aea2f8a..b80adcc93 100644
--- a/pkg/local_object_storage/shard/range.go
+++ b/pkg/local_object_storage/shard/range.go
@@ -104,7 +104,7 @@ func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) {
return obj, nil
}
- wc := func(c writecache.Cache) (*object.Object, error) {
+ wc := func(c *writecache.Cache) (*object.Object, error) {
res, err := c.Get(ctx, prm.addr)
if err != nil {
return nil, err
diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go
index 44ec54645..cd82b521c 100644
--- a/pkg/local_object_storage/shard/shard.go
+++ b/pkg/local_object_storage/shard/shard.go
@@ -23,7 +23,7 @@ type Shard struct {
gc *gc
- writeCache writecache.Cache
+ writeCache *writecache.Cache
blobStor *blobstor.BlobStor
diff --git a/pkg/local_object_storage/writecache/delete.go b/pkg/local_object_storage/writecache/delete.go
index c1aab9e5a..f0d3cfbf5 100644
--- a/pkg/local_object_storage/writecache/delete.go
+++ b/pkg/local_object_storage/writecache/delete.go
@@ -15,7 +15,7 @@ import (
// Delete removes object from write-cache.
//
// Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache.
-func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
+func (c *Cache) Delete(ctx context.Context, addr oid.Address) error {
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Delete",
trace.WithAttributes(
attribute.String("address", addr.EncodeToString()),
diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go
index 04fcccede..91052e9f0 100644
--- a/pkg/local_object_storage/writecache/flush.go
+++ b/pkg/local_object_storage/writecache/flush.go
@@ -34,7 +34,7 @@ const (
)
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
-func (c *cache) runFlushLoop() {
+func (c *Cache) runFlushLoop() {
for i := 0; i < c.workersCount; i++ {
c.wg.Add(1)
go c.flushWorker(i)
@@ -65,7 +65,7 @@ func (c *cache) runFlushLoop() {
}()
}
-func (c *cache) flushDB() {
+func (c *Cache) flushDB() {
var lastKey []byte
var m []objectInfo
for {
@@ -148,7 +148,7 @@ func (c *cache) flushDB() {
}
}
-func (c *cache) flushBigObjects(ctx context.Context) {
+func (c *Cache) flushBigObjects(ctx context.Context) {
tick := time.NewTicker(defaultFlushInterval * 10)
for {
select {
@@ -171,7 +171,7 @@ func (c *cache) flushBigObjects(ctx context.Context) {
}
}
-func (c *cache) reportFlushError(msg string, addr string, err error) {
+func (c *Cache) reportFlushError(msg string, addr string, err error) {
if c.reportError != nil {
c.reportError(msg, err)
} else {
@@ -181,7 +181,7 @@ func (c *cache) reportFlushError(msg string, addr string, err error) {
}
}
-func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
+func (c *Cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
var prm common.IteratePrm
prm.IgnoreErrors = ignoreErrors
prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
@@ -229,7 +229,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
}
// flushWorker writes objects to the main storage.
-func (c *cache) flushWorker(_ int) {
+func (c *Cache) flushWorker(_ int) {
defer c.wg.Done()
var obj *object.Object
@@ -249,7 +249,7 @@ func (c *cache) flushWorker(_ int) {
}
// flushObject is used to write object directly to the main storage.
-func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte) error {
+func (c *Cache) flushObject(ctx context.Context, obj *object.Object, data []byte) error {
addr := objectCore.AddressOf(obj)
var prm common.PutPrm
@@ -281,7 +281,7 @@ func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte
// Flush flushes all objects from the write-cache to the main storage.
// Write-cache must be in readonly mode to ensure correctness of an operation and
// to prevent interference with background flush workers.
-func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error {
+func (c *Cache) Flush(ctx context.Context, ignoreErrors bool) error {
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Flush",
trace.WithAttributes(
attribute.Bool("ignore_errors", ignoreErrors),
@@ -294,7 +294,7 @@ func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error {
return c.flush(ctx, ignoreErrors)
}
-func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
+func (c *Cache) flush(ctx context.Context, ignoreErrors bool) error {
if err := c.flushFSTree(ctx, ignoreErrors); err != nil {
return err
}
diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go
index 2cec07081..c42bda255 100644
--- a/pkg/local_object_storage/writecache/flush_test.go
+++ b/pkg/local_object_storage/writecache/flush_test.go
@@ -39,7 +39,7 @@ func TestFlush(t *testing.T) {
smallSize = 256
)
- newCache := func(t *testing.T, opts ...Option) (Cache, *blobstor.BlobStor, *meta.DB) {
+ newCache := func(t *testing.T, opts ...Option) (*Cache, *blobstor.BlobStor, *meta.DB) {
dir := t.TempDir()
mb := meta.New(
meta.WithPath(filepath.Join(dir, "meta")),
@@ -76,7 +76,7 @@ func TestFlush(t *testing.T) {
return wc, bs, mb
}
- putObjects := func(t *testing.T, c Cache) []objectPair {
+ putObjects := func(t *testing.T, c *Cache) []objectPair {
objects := make([]objectPair, objCount)
for i := range objects {
objects[i] = putObject(t, c, 1+(i%2)*smallSize)
@@ -109,8 +109,8 @@ func TestFlush(t *testing.T) {
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
- wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
- wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
+ wc.flushed.Add(objects[0].addr.EncodeToString(), true)
+ wc.flushed.Add(objects[1].addr.EncodeToString(), false)
require.NoError(t, wc.Flush(context.Background(), false))
@@ -139,8 +139,8 @@ func TestFlush(t *testing.T) {
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
- wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
- wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
+ wc.flushed.Add(objects[0].addr.EncodeToString(), true)
+ wc.flushed.Add(objects[1].addr.EncodeToString(), false)
require.NoError(t, wc.SetMode(mode.Degraded))
@@ -158,13 +158,13 @@ func TestFlush(t *testing.T) {
})
t.Run("ignore errors", func(t *testing.T) {
- testIgnoreErrors := func(t *testing.T, f func(*cache)) {
+ testIgnoreErrors := func(t *testing.T, f func(*Cache)) {
var errCount atomic.Uint32
wc, bs, mb := newCache(t, WithReportErrorFunc(func(message string, err error) {
errCount.Inc()
}))
objects := putObjects(t, wc)
- f(wc.(*cache))
+ f(wc)
require.NoError(t, wc.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadWrite))
@@ -178,7 +178,7 @@ func TestFlush(t *testing.T) {
check(t, mb, bs, objects)
}
t.Run("db, invalid address", func(t *testing.T) {
- testIgnoreErrors(t, func(c *cache) {
+ testIgnoreErrors(t, func(c *Cache) {
_, data := newObject(t, 1)
require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
@@ -187,7 +187,7 @@ func TestFlush(t *testing.T) {
})
})
t.Run("db, invalid object", func(t *testing.T) {
- testIgnoreErrors(t, func(c *cache) {
+ testIgnoreErrors(t, func(c *Cache) {
require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
return b.Put([]byte(oidtest.Address().EncodeToString()), []byte{1, 2, 3})
@@ -195,7 +195,7 @@ func TestFlush(t *testing.T) {
})
})
t.Run("fs, read error", func(t *testing.T) {
- testIgnoreErrors(t, func(c *cache) {
+ testIgnoreErrors(t, func(c *Cache) {
obj, data := newObject(t, 1)
var prm common.PutPrm
@@ -214,7 +214,7 @@ func TestFlush(t *testing.T) {
})
})
t.Run("fs, invalid object", func(t *testing.T) {
- testIgnoreErrors(t, func(c *cache) {
+ testIgnoreErrors(t, func(c *Cache) {
var prm common.PutPrm
prm.Address = oidtest.Address()
prm.RawData = []byte{1, 2, 3}
@@ -286,7 +286,7 @@ func TestFlush(t *testing.T) {
})
}
-func putObject(t *testing.T, c Cache, size int) objectPair {
+func putObject(t *testing.T, c *Cache, size int) objectPair {
obj, data := newObject(t, size)
var prm common.PutPrm
@@ -319,12 +319,11 @@ func newObject(t *testing.T, size int) (*object.Object, []byte) {
return obj, data
}
-func initWC(t *testing.T, wc Cache) {
+func initWC(t *testing.T, wc *Cache) {
require.NoError(t, wc.Init())
require.Eventually(t, func() bool {
- rawWc := wc.(*cache)
- return rawWc.initialized.Load()
+ return wc.initialized.Load()
}, 100*time.Second, 1*time.Millisecond)
}
diff --git a/pkg/local_object_storage/writecache/get.go b/pkg/local_object_storage/writecache/get.go
index 6af1bd181..e303b5d3d 100644
--- a/pkg/local_object_storage/writecache/get.go
+++ b/pkg/local_object_storage/writecache/get.go
@@ -18,7 +18,7 @@ import (
// Get returns object from write-cache.
//
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
-func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
+func (c *Cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
saddr := addr.EncodeToString()
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Get",
@@ -46,7 +46,7 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
// Head returns object header from write-cache.
//
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
-func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
+func (c *Cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Head",
trace.WithAttributes(
attribute.String("address", addr.EncodeToString()),
diff --git a/pkg/local_object_storage/writecache/init.go b/pkg/local_object_storage/writecache/init.go
index 2ca8cceef..28f1249f2 100644
--- a/pkg/local_object_storage/writecache/init.go
+++ b/pkg/local_object_storage/writecache/init.go
@@ -15,7 +15,7 @@ import (
"go.uber.org/zap"
)
-func (c *cache) initFlushMarks(ctx context.Context) {
+func (c *Cache) initFlushMarks(ctx context.Context) {
var localWG sync.WaitGroup
localWG.Add(1)
@@ -54,7 +54,7 @@ func (c *cache) initFlushMarks(ctx context.Context) {
var errStopIter = errors.New("stop iteration")
-func (c *cache) fsTreeFlushMarkUpdate(ctx context.Context) {
+func (c *Cache) fsTreeFlushMarkUpdate(ctx context.Context) {
c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInFSTree)
var prm common.IteratePrm
@@ -95,7 +95,7 @@ func (c *cache) fsTreeFlushMarkUpdate(ctx context.Context) {
c.log.Info(logs.WritecacheFinishedUpdatingFSTreeFlushMarks)
}
-func (c *cache) dbFlushMarkUpdate(ctx context.Context) {
+func (c *Cache) dbFlushMarkUpdate(ctx context.Context) {
c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInDatabase)
c.modeMtx.RLock()
@@ -173,7 +173,7 @@ func (c *cache) dbFlushMarkUpdate(ctx context.Context) {
// flushStatus returns info about the object state in the main storage.
// First return value is true iff object exists.
// Second return value is true iff object can be safely removed.
-func (c *cache) flushStatus(ctx context.Context, addr oid.Address) (bool, bool) {
+func (c *Cache) flushStatus(ctx context.Context, addr oid.Address) (bool, bool) {
var existsPrm meta.ExistsPrm
existsPrm.SetAddress(addr)
diff --git a/pkg/local_object_storage/writecache/iterate.go b/pkg/local_object_storage/writecache/iterate.go
index 228dd2597..be7c37eab 100644
--- a/pkg/local_object_storage/writecache/iterate.go
+++ b/pkg/local_object_storage/writecache/iterate.go
@@ -31,7 +31,7 @@ func (p *IterationPrm) WithIgnoreErrors(ignore bool) {
// Iterate iterates over all objects present in write cache.
// This is very difficult to do correctly unless write-cache is put in read-only mode.
// Thus we silently fail if shard is not in read-only mode to avoid reporting misleading results.
-func (c *cache) Iterate(prm IterationPrm) error {
+func (c *Cache) Iterate(prm IterationPrm) error {
c.modeMtx.RLock()
defer c.modeMtx.RUnlock()
if !c.readOnly() {
diff --git a/pkg/local_object_storage/writecache/mode.go b/pkg/local_object_storage/writecache/mode.go
index 14f8af49e..681cd84aa 100644
--- a/pkg/local_object_storage/writecache/mode.go
+++ b/pkg/local_object_storage/writecache/mode.go
@@ -22,7 +22,7 @@ var ErrNotInitialized = logicerr.New("write-cache is not initialized yet")
// SetMode sets write-cache mode of operation.
// When shard is put in read-only mode all objects in memory are flushed to disk
// and all background jobs are suspended.
-func (c *cache) SetMode(m mode.Mode) error {
+func (c *Cache) SetMode(m mode.Mode) error {
ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.SetMode",
trace.WithAttributes(
attribute.String("mode", m.String()),
@@ -33,7 +33,7 @@ func (c *cache) SetMode(m mode.Mode) error {
}
// setMode applies new mode. Must be called with cache.modeMtx lock taken.
-func (c *cache) setMode(ctx context.Context, m mode.Mode) error {
+func (c *Cache) setMode(ctx context.Context, m mode.Mode) error {
var err error
turnOffMeta := m.NoMetabase()
@@ -89,6 +89,6 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode) error {
// readOnly returns true if current mode is read-only.
// `c.modeMtx` must be taken.
-func (c *cache) readOnly() bool {
+func (c *Cache) readOnly() bool {
return c.mode.ReadOnly()
}
diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go
index e2535d9e2..d5525a5a9 100644
--- a/pkg/local_object_storage/writecache/put.go
+++ b/pkg/local_object_storage/writecache/put.go
@@ -25,7 +25,7 @@ var (
// Returns ErrNotInitialized if write-cache has not been initialized yet.
// Returns ErrOutOfSpace if saving an object leads to WC's size overflow.
// Returns ErrBigObject if an objects exceeds maximum object size.
-func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) {
+func (c *Cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Put",
trace.WithAttributes(
attribute.String("address", prm.Address.EncodeToString()),
@@ -60,7 +60,7 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
// putSmall persists small objects to the write-cache database and
// pushes the to the flush workers queue.
-func (c *cache) putSmall(obj objectInfo) error {
+func (c *Cache) putSmall(obj objectInfo) error {
cacheSize := c.estimateCacheSize()
if c.maxCacheSize < c.incSizeDB(cacheSize) {
return ErrOutOfSpace
@@ -82,7 +82,7 @@ func (c *cache) putSmall(obj objectInfo) error {
}
// putBig writes object to FSTree and pushes it to the flush workers queue.
-func (c *cache) putBig(ctx context.Context, addr string, prm common.PutPrm) error {
+func (c *Cache) putBig(ctx context.Context, addr string, prm common.PutPrm) error {
cacheSz := c.estimateCacheSize()
if c.maxCacheSize < c.incSizeFS(cacheSz) {
return ErrOutOfSpace
diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/state.go
index 1ba5a4bd3..99f496487 100644
--- a/pkg/local_object_storage/writecache/state.go
+++ b/pkg/local_object_storage/writecache/state.go
@@ -7,15 +7,15 @@ import (
"go.uber.org/atomic"
)
-func (c *cache) estimateCacheSize() uint64 {
+func (c *Cache) estimateCacheSize() uint64 {
return c.objCounters.DB()*c.smallObjectSize + c.objCounters.FS()*c.maxObjectSize
}
-func (c *cache) incSizeDB(sz uint64) uint64 {
+func (c *Cache) incSizeDB(sz uint64) uint64 {
return sz + c.smallObjectSize
}
-func (c *cache) incSizeFS(sz uint64) uint64 {
+func (c *Cache) incSizeFS(sz uint64) uint64 {
return sz + c.maxObjectSize
}
@@ -47,7 +47,7 @@ func (x *counters) FS() uint64 {
return x.cFS.Load()
}
-func (c *cache) initCounters() error {
+func (c *Cache) initCounters() error {
var inDB uint64
err := c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go
index aeae752e3..93f4cfd80 100644
--- a/pkg/local_object_storage/writecache/storage.go
+++ b/pkg/local_object_storage/writecache/storage.go
@@ -39,7 +39,7 @@ type store struct {
const dbName = "small.bolt"
-func (c *cache) openStore(readOnly bool) error {
+func (c *Cache) openStore(readOnly bool) error {
err := util.MkdirAllX(c.path, os.ModePerm)
if err != nil {
return err
@@ -88,7 +88,7 @@ func (c *cache) openStore(readOnly bool) error {
// To minimize interference with the client operations, the actual removal
// is done in batches.
// It is not thread-safe and is used only as an evict callback to LRU cache.
-func (c *cache) removeFlushed(key string, value bool) {
+func (c *Cache) removeFlushed(key string, value bool) {
fromDatabase := value
if fromDatabase {
c.dbKeysToRemove = append(c.dbKeysToRemove, key)
@@ -102,7 +102,7 @@ func (c *cache) removeFlushed(key string, value bool) {
}
}
-func (c *cache) deleteFromDB(keys []string) []string {
+func (c *Cache) deleteFromDB(keys []string) []string {
if len(keys) == 0 {
return keys
}
@@ -133,7 +133,7 @@ func (c *cache) deleteFromDB(keys []string) []string {
return keys[:len(keys)-errorIndex]
}
-func (c *cache) deleteFromDisk(keys []string) []string {
+func (c *Cache) deleteFromDisk(keys []string) []string {
if len(keys) == 0 {
return keys
}
diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go
index bdcc9bbf6..a58d2edbb 100644
--- a/pkg/local_object_storage/writecache/writecache.go
+++ b/pkg/local_object_storage/writecache/writecache.go
@@ -6,12 +6,10 @@ import (
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
- "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
- oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.uber.org/atomic"
"go.uber.org/zap"
@@ -24,29 +22,7 @@ type Info struct {
}
// Cache represents write-cache for objects.
-type Cache interface {
- Get(ctx context.Context, address oid.Address) (*object.Object, error)
- Head(context.Context, oid.Address) (*object.Object, error)
- // Delete removes object referenced by the given oid.Address from the
- // Cache. Returns any error encountered that prevented the object to be
- // removed.
- //
- // Returns apistatus.ObjectNotFound if object is missing in the Cache.
- // Returns ErrReadOnly if the Cache is currently in the read-only mode.
- Delete(context.Context, oid.Address) error
- Iterate(IterationPrm) error
- Put(context.Context, common.PutPrm) (common.PutRes, error)
- SetMode(mode.Mode) error
- SetLogger(*logger.Logger)
- DumpInfo() Info
- Flush(context.Context, bool) error
-
- Init() error
- Open(readOnly bool) error
- Close() error
-}
-
-type cache struct {
+type Cache struct {
options
// mtx protects statistics, counters and compressFlags.
@@ -94,8 +70,8 @@ var (
)
// New creates new writecache instance.
-func New(opts ...Option) Cache {
- c := &cache{
+func New(opts ...Option) *Cache {
+ c := &Cache{
flushCh: make(chan *object.Object),
mode: mode.ReadWrite,
stopInitCh: make(chan struct{}),
@@ -127,18 +103,18 @@ func New(opts ...Option) Cache {
}
// SetLogger sets logger. It is used after the shard ID was generated to use it in logs.
-func (c *cache) SetLogger(l *logger.Logger) {
+func (c *Cache) SetLogger(l *logger.Logger) {
c.log = l
}
-func (c *cache) DumpInfo() Info {
+func (c *Cache) DumpInfo() Info {
return Info{
Path: c.path,
}
}
// Open opens and initializes database. Reads object counters from the ObjectCounters instance.
-func (c *cache) Open(readOnly bool) error {
+func (c *Cache) Open(readOnly bool) error {
err := c.openStore(readOnly)
if err != nil {
return err
@@ -152,7 +128,7 @@ func (c *cache) Open(readOnly bool) error {
}
// Init runs necessary services.
-func (c *cache) Init() error {
+func (c *Cache) Init() error {
ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.Init")
defer span.End()
@@ -162,7 +138,7 @@ func (c *cache) Init() error {
}
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
-func (c *cache) Close() error {
+func (c *Cache) Close() error {
// Finish all in-progress operations.
if err := c.setMode(context.TODO(), mode.ReadOnly); err != nil {
return err
--
2.45.2
From 1e6ffd45ad658c0648e21a975c6de52298a87689 Mon Sep 17 00:00:00 2001
From: Pavel Karpy
Date: Fri, 5 May 2023 18:40:57 +0300
Subject: [PATCH 2/4] [#314] wc: Simplify background workers naming
Also, drop an unused argument.
Signed-off-by: Pavel Karpy
---
pkg/local_object_storage/writecache/flush.go | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go
index 91052e9f0..bdebad897 100644
--- a/pkg/local_object_storage/writecache/flush.go
+++ b/pkg/local_object_storage/writecache/flush.go
@@ -37,7 +37,7 @@ const (
func (c *Cache) runFlushLoop() {
for i := 0; i < c.workersCount; i++ {
c.wg.Add(1)
- go c.flushWorker(i)
+ go c.smallObjectsFlusher()
}
c.wg.Add(1)
@@ -56,7 +56,7 @@ func (c *Cache) runFlushLoop() {
for {
select {
case <-tt.C:
- c.flushDB()
+ c.flushSmallObjects()
tt.Reset(defaultFlushInterval)
case <-c.closeCh:
return
@@ -65,7 +65,7 @@ func (c *Cache) runFlushLoop() {
}()
}
-func (c *Cache) flushDB() {
+func (c *Cache) flushSmallObjects() {
var lastKey []byte
var m []objectInfo
for {
@@ -228,8 +228,8 @@ func (c *Cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
return err
}
-// flushWorker writes objects to the main storage.
-func (c *Cache) flushWorker(_ int) {
+// smallObjectsFlusher writes small objects to the main storage.
+func (c *Cache) smallObjectsFlusher() {
defer c.wg.Done()
var obj *object.Object
--
2.45.2
From 99f76599e8ec1cdbe2fdb2862a7bd5a5eaffd305 Mon Sep 17 00:00:00 2001
From: Pavel Karpy
Date: Fri, 5 May 2023 18:59:49 +0300
Subject: [PATCH 3/4] [#314] wc: Do not lose small objects on disk errors
Return an error if an object could not be stored on the WC's disk.
Signed-off-by: Pavel Karpy
---
pkg/local_object_storage/writecache/put.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go
index d5525a5a9..1b513854e 100644
--- a/pkg/local_object_storage/writecache/put.go
+++ b/pkg/local_object_storage/writecache/put.go
@@ -78,7 +78,7 @@ func (c *Cache) putSmall(obj objectInfo) error {
)
c.objCounters.IncDB()
}
- return nil
+ return err
}
// putBig writes object to FSTree and pushes it to the flush workers queue.
--
2.45.2
From 34544502dc0cfa5f9b26c34893929d67a1b3b7be Mon Sep 17 00:00:00 2001
From: Pavel Karpy
Date: Wed, 3 May 2023 12:11:36 +0300
Subject: [PATCH 4/4] [#314] wc: Simplify WC
Do not use the write-cache as a read cache: always remove objects from the
WC, not only when an object has not been used for some time (the LRU cache
is dropped). Use the object size (in bytes) as the metric of used space,
not the approximate (and too inaccurate) maximum number of stored objects.
Signed-off-by: Pavel Karpy
---
pkg/local_object_storage/shard/mode/mode.go | 4 +
pkg/local_object_storage/writecache/delete.go | 28 ++-
pkg/local_object_storage/writecache/flush.go | 193 ++++++++++++++----
.../writecache/flush_test.go | 83 ++++----
pkg/local_object_storage/writecache/get.go | 2 -
pkg/local_object_storage/writecache/init.go | 192 ++---------------
.../writecache/iterate.go | 6 -
pkg/local_object_storage/writecache/mode.go | 48 +++--
.../writecache/options.go | 2 -
pkg/local_object_storage/writecache/put.go | 55 +++--
pkg/local_object_storage/writecache/state.go | 136 ++++++++----
.../writecache/storage.go | 122 +----------
.../writecache/writecache.go | 86 +++-----
13 files changed, 436 insertions(+), 521 deletions(-)
diff --git a/pkg/local_object_storage/shard/mode/mode.go b/pkg/local_object_storage/shard/mode/mode.go
index 65b2b5c89..db11163c3 100644
--- a/pkg/local_object_storage/shard/mode/mode.go
+++ b/pkg/local_object_storage/shard/mode/mode.go
@@ -57,3 +57,7 @@ func (m Mode) NoMetabase() bool {
func (m Mode) ReadOnly() bool {
return m&ReadOnly != 0
}
+
+func (m Mode) ReadWrite() bool {
+ return m == 0
+}
diff --git a/pkg/local_object_storage/writecache/delete.go b/pkg/local_object_storage/writecache/delete.go
index f0d3cfbf5..5a4890075 100644
--- a/pkg/local_object_storage/writecache/delete.go
+++ b/pkg/local_object_storage/writecache/delete.go
@@ -2,10 +2,12 @@ package writecache
import (
"context"
+ "errors"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
+ apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
@@ -31,14 +33,14 @@ func (c *Cache) Delete(ctx context.Context, addr oid.Address) error {
saddr := addr.EncodeToString()
// Check disk cache.
- var has int
+ var valLen int
_ = c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
- has = len(b.Get([]byte(saddr)))
+ valLen = len(b.Get([]byte(saddr)))
return nil
})
- if 0 < has {
+ if valLen > 0 {
err := c.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
err := b.Delete([]byte(saddr))
@@ -52,18 +54,32 @@ func (c *Cache) Delete(ctx context.Context, addr oid.Address) error {
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db DELETE"),
)
- c.objCounters.DecDB()
+ c.objCounters.decDB(valLen)
return nil
}
- _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
+ // While fetching the object first may look like overhead, it lets
+ // us determine its size correctly without any additional memory/disk/CPU
+ // usage on the WC's side _for every object_. `Delete` is not
+ // expected to be called often right after an object is put to the
+ // Write-cache, and for non-existing objects (persisted
+ // to the main storage and dropped from the WC's storage) it
+ // is `os.Stat` vs `os.Remove` calls after all.
+ res, err := c.fsTree.Get(ctx, common.GetPrm{Address: addr})
+ if errors.As(err, new(apistatus.ObjectNotFound)) {
+ return nil
+ } else if err != nil {
+ return err
+ }
+
+ _, err = c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
if err == nil {
storagelog.Write(c.log,
storagelog.AddressField(saddr),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("fstree DELETE"),
)
- c.objCounters.DecFS()
+ c.objCounters.decFS(len(res.RawData))
}
return err
diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go
index bdebad897..9d12cf3f1 100644
--- a/pkg/local_object_storage/writecache/flush.go
+++ b/pkg/local_object_storage/writecache/flush.go
@@ -11,7 +11,9 @@ import (
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
+ storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
+ apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/mr-tron/base58"
@@ -33,6 +35,11 @@ const (
defaultFlushInterval = time.Second
)
+type objWithData struct {
+ obj *object.Object
+ data []byte
+}
+
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
func (c *Cache) runFlushLoop() {
for i := 0; i < c.workersCount; i++ {
@@ -58,7 +65,7 @@ func (c *Cache) runFlushLoop() {
case <-tt.C:
c.flushSmallObjects()
tt.Reset(defaultFlushInterval)
- case <-c.closeCh:
+ case <-c.workersChan:
return
}
}
@@ -70,20 +77,13 @@ func (c *Cache) flushSmallObjects() {
var m []objectInfo
for {
select {
- case <-c.closeCh:
+ case <-c.workersChan:
return
default:
}
m = m[:0]
- c.modeMtx.RLock()
- if c.readOnly() || !c.initialized.Load() {
- c.modeMtx.RUnlock()
- time.Sleep(time.Second)
- continue
- }
-
// We put objects in batches of fixed size to not interfere with main put cycle a lot.
_ = c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
@@ -117,31 +117,25 @@ func (c *Cache) flushSmallObjects() {
var count int
for i := range m {
- if c.flushed.Contains(m[i].addr) {
- continue
- }
-
obj := object.New()
- if err := obj.Unmarshal(m[i].data); err != nil {
+ data := m[i].data
+
+ if err := obj.Unmarshal(data); err != nil {
continue
}
count++
select {
- case c.flushCh <- obj:
- case <-c.closeCh:
- c.modeMtx.RUnlock()
+ case c.smallFlushCh <- objWithData{obj: obj, data: data}:
+ case <-c.workersChan:
return
}
}
if count == 0 {
- c.modeMtx.RUnlock()
break
}
- c.modeMtx.RUnlock()
-
c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache,
zap.Int("count", count),
zap.String("start", base58.Encode(lastKey)))
@@ -157,15 +151,12 @@ func (c *Cache) flushBigObjects(ctx context.Context) {
if c.readOnly() {
c.modeMtx.RUnlock()
break
- } else if !c.initialized.Load() {
- c.modeMtx.RUnlock()
- continue
}
_ = c.flushFSTree(ctx, true)
c.modeMtx.RUnlock()
- case <-c.closeCh:
+ case <-c.workersChan:
return
}
}
@@ -187,8 +178,12 @@ func (c *Cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
sAddr := addr.EncodeToString()
- if _, ok := c.store.flushed.Peek(sAddr); ok {
- return nil
+ select {
+ case <-c.workersChan:
+ return stopIter
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
}
data, err := f()
@@ -210,7 +205,7 @@ func (c *Cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
return err
}
- err = c.flushObject(ctx, &obj, data)
+ err = c.flushObject(ctx, objWithData{obj: &obj, data: data})
if err != nil {
if ignoreErrors {
return nil
@@ -218,13 +213,23 @@ func (c *Cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
return err
}
- // mark object as flushed
- c.flushed.Add(sAddr, false)
+ err = c.dropBigObject(ctx, addr, len(data))
+ if err != nil {
+ c.reportFlushError("can't drop an object from FSTree", sAddr, err)
+ if ignoreErrors {
+ return nil
+ }
+ return err
+ }
return nil
}
_, err := c.fsTree.Iterate(prm)
+ if errors.Is(err, stopIter) {
+ return nil
+ }
+
return err
}
@@ -232,24 +237,32 @@ func (c *Cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
func (c *Cache) smallObjectsFlusher() {
defer c.wg.Done()
- var obj *object.Object
+ var objAndData objWithData
for {
// Give priority to direct put.
select {
- case obj = <-c.flushCh:
- case <-c.closeCh:
+ case objAndData = <-c.smallFlushCh:
+ case <-c.workersChan:
return
}
- err := c.flushObject(context.TODO(), obj, nil)
+ err := c.flushObject(context.TODO(), objAndData)
if err == nil {
- c.flushed.Add(objectCore.AddressOf(obj).EncodeToString(), true)
+ addr := objectCore.AddressOf(objAndData.obj)
+
+ err = c.dropSmallObject(context.TODO(), addr)
+ if err != nil {
+ c.reportFlushError("can't drop object from write-cache",
+ addr.EncodeToString(), err)
+ }
}
}
}
// flushObject is used to write object directly to the main storage.
-func (c *Cache) flushObject(ctx context.Context, obj *object.Object, data []byte) error {
+func (c *Cache) flushObject(ctx context.Context, objAndData objWithData) error {
+ obj := objAndData.obj
+ data := objAndData.data
addr := objectCore.AddressOf(obj)
var prm common.PutPrm
@@ -272,6 +285,11 @@ func (c *Cache) flushObject(ctx context.Context, obj *object.Object, data []byte
_, err = c.metabase.UpdateStorageID(updPrm)
if err != nil {
+ if errors.As(err, new(apistatus.ObjectNotFound)) || errors.As(err, new(apistatus.ObjectAlreadyRemoved)) {
+ // object info is outdated in the WC
+ return nil
+ }
+
c.reportFlushError("can't update object storage ID",
addr.EncodeToString(), err)
}
@@ -299,16 +317,20 @@ func (c *Cache) flush(ctx context.Context, ignoreErrors bool) error {
return err
}
- return c.db.View(func(tx *bbolt.Tx) error {
+ var dbFunc func(func(*bbolt.Tx) error) error
+ if c.readOnly() {
+ dbFunc = c.db.View
+ } else {
+ dbFunc = c.db.Update
+ }
+
+ return dbFunc(func(tx *bbolt.Tx) error {
var addr oid.Address
b := tx.Bucket(defaultBucket)
cs := b.Cursor()
for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() {
sa := string(k)
- if _, ok := c.flushed.Peek(sa); ok {
- continue
- }
if err := addr.DecodeString(sa); err != nil {
c.reportFlushError("can't decode object address from the DB", sa, err)
@@ -327,10 +349,102 @@ func (c *Cache) flush(ctx context.Context, ignoreErrors bool) error {
return err
}
- if err := c.flushObject(ctx, &obj, data); err != nil {
+ err := c.flushObject(ctx, objWithData{obj: &obj, data: data})
+ if err != nil {
+ if ignoreErrors {
+ continue
+ }
+
+ return err
}
+
+ if c.readOnly() {
+ continue
+ }
+
+ removed, err := dropObject(tx, k)
+ if err != nil {
+ c.reportFlushError("can't drop an object from the DB", sa, err)
+ if ignoreErrors {
+ continue
+ }
+ return err
+ }
+
+ storagelog.Write(c.log,
+ storagelog.AddressField(addr),
+ storagelog.StorageTypeField(wcStorageType),
+ storagelog.OpField("db DELETE"),
+ )
+ c.objCounters.decDB(removed)
}
return nil
})
}
+
+func (c *Cache) dropSmallObject(ctx context.Context, addr oid.Address) error {
+ var removedBytes int
+ key := []byte(addr.EncodeToString())
+ var err error
+
+ err = c.db.Batch(func(tx *bbolt.Tx) error {
+ select {
+ case <-c.workersChan:
+ return nil
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+
+ removedBytes, err = dropObject(tx, key)
+
+ return err
+
+ })
+ if err != nil {
+ return err
+ }
+
+ storagelog.Write(c.log,
+ storagelog.AddressField(addr),
+ storagelog.StorageTypeField(wcStorageType),
+ storagelog.OpField("db DELETE"),
+ )
+
+ if removedBytes > 0 {
+ c.objCounters.decDB(removedBytes)
+ }
+
+ return nil
+}
+
+func dropObject(tx *bbolt.Tx, key []byte) (int, error) {
+ b := tx.Bucket(defaultBucket)
+
+ removedBytes := len(b.Get(key))
+ if removedBytes > 0 {
+ return removedBytes, b.Delete(key)
+ }
+
+ return 0, nil
+}
+
+func (c *Cache) dropBigObject(ctx context.Context, addr oid.Address, size int) error {
+ _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
+ if err != nil {
+ if errors.As(err, new(apistatus.ObjectNotFound)) {
+ return nil
+ }
+
+ return err
+ }
+
+ storagelog.Write(c.log,
+ storagelog.AddressField(addr),
+ storagelog.StorageTypeField(wcStorageType),
+ storagelog.OpField("fstree DELETE"),
+ )
+ c.objCounters.decFS(size)
+
+ return nil
+}
diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go
index c42bda255..87799b643 100644
--- a/pkg/local_object_storage/writecache/flush_test.go
+++ b/pkg/local_object_storage/writecache/flush_test.go
@@ -5,7 +5,6 @@ import (
"os"
"path/filepath"
"testing"
- "time"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@@ -106,14 +105,6 @@ func TestFlush(t *testing.T) {
wc, bs, mb := newCache(t)
objects := putObjects(t, wc)
- require.NoError(t, bs.SetMode(mode.ReadWrite))
- require.NoError(t, mb.SetMode(mode.ReadWrite))
-
- wc.flushed.Add(objects[0].addr.EncodeToString(), true)
- wc.flushed.Add(objects[1].addr.EncodeToString(), false)
-
- require.NoError(t, wc.Flush(context.Background(), false))
-
for i := 0; i < 2; i++ {
var mPrm meta.GetPrm
mPrm.SetAddress(objects[i].addr)
@@ -124,25 +115,10 @@ func TestFlush(t *testing.T) {
require.Error(t, err)
}
- check(t, mb, bs, objects[2:])
- })
-
- t.Run("flush on moving to degraded mode", func(t *testing.T) {
- wc, bs, mb := newCache(t)
- objects := putObjects(t, wc)
-
- // Blobstor is read-only, so we expect en error from `flush` here.
- require.Error(t, wc.SetMode(mode.Degraded))
-
- // First move to read-only mode to close background workers.
- require.NoError(t, wc.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
- wc.flushed.Add(objects[0].addr.EncodeToString(), true)
- wc.flushed.Add(objects[1].addr.EncodeToString(), false)
-
- require.NoError(t, wc.SetMode(mode.Degraded))
+ require.NoError(t, wc.Flush(context.Background(), false))
for i := 0; i < 2; i++ {
var mPrm meta.GetPrm
@@ -151,7 +127,40 @@ func TestFlush(t *testing.T) {
require.Error(t, err)
_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
+ require.NoError(t, err)
+ }
+
+ check(t, mb, bs, objects[2:])
+ })
+
+ t.Run("flush on moving to degraded mode", func(t *testing.T) {
+ wc, bs, mb := newCache(t)
+ objects := putObjects(t, wc)
+
+ // Moving to the degraded mode calls `flush` with `ignoreErrors`,
+ // so we do not expect an error from `flush` here.
+ require.NoError(t, wc.SetMode(mode.Degraded))
+
+ // bs is read-only, so it can't get the objects
+ for i := 0; i < 2; i++ {
+ var mPrm meta.GetPrm
+ mPrm.SetAddress(objects[i].addr)
+ _, err := mb.Get(context.Background(), mPrm)
require.Error(t, err)
+
+ _, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
+ require.Error(t, err)
+ }
+
+ require.NoError(t, wc.SetMode(mode.ReadWrite))
+ require.NoError(t, bs.SetMode(mode.ReadWrite))
+ require.NoError(t, mb.SetMode(mode.ReadWrite))
+
+ require.NoError(t, wc.SetMode(mode.Degraded))
+
+ for i := 0; i < 2; i++ {
+ _, err := bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
+ require.NoError(t, err)
}
check(t, mb, bs, objects[2:])
@@ -166,7 +175,6 @@ func TestFlush(t *testing.T) {
objects := putObjects(t, wc)
f(wc)
- require.NoError(t, wc.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
@@ -224,7 +232,7 @@ func TestFlush(t *testing.T) {
})
})
- t.Run("on init", func(t *testing.T) {
+ t.Run("flush", func(t *testing.T) {
wc, bs, mb := newCache(t)
objects := []objectPair{
// removed
@@ -260,9 +268,6 @@ func TestFlush(t *testing.T) {
_, err = mb.Delete(context.Background(), deletePrm)
require.NoError(t, err)
- require.NoError(t, bs.SetMode(mode.ReadOnly))
- require.NoError(t, mb.SetMode(mode.ReadOnly))
-
// Open in read-only: no error, nothing is removed.
require.NoError(t, wc.Open(true))
initWC(t, wc)
@@ -275,13 +280,17 @@ func TestFlush(t *testing.T) {
// Open in read-write: no error, something is removed.
require.NoError(t, wc.Open(false))
initWC(t, wc)
+
for i := range objects {
_, err := wc.Get(context.Background(), objects[i].addr)
- if i < 2 {
- require.ErrorAs(t, err, new(apistatus.ObjectNotFound), i)
- } else {
- require.NoError(t, err, i)
- }
+ require.NoError(t, err, i)
+ }
+
+ require.NoError(t, wc.Flush(context.Background(), true))
+
+ for i := range objects {
+ _, err := wc.Get(context.Background(), objects[i].addr)
+ require.ErrorAs(t, err, new(apistatus.ObjectNotFound), i)
}
})
}
@@ -321,10 +330,6 @@ func newObject(t *testing.T, size int) (*object.Object, []byte) {
func initWC(t *testing.T, wc *Cache) {
require.NoError(t, wc.Init())
-
- require.Eventually(t, func() bool {
- return wc.initialized.Load()
- }, 100*time.Second, 1*time.Millisecond)
}
type dummyEpoch struct{}
diff --git a/pkg/local_object_storage/writecache/get.go b/pkg/local_object_storage/writecache/get.go
index e303b5d3d..71d05538e 100644
--- a/pkg/local_object_storage/writecache/get.go
+++ b/pkg/local_object_storage/writecache/get.go
@@ -30,7 +30,6 @@ func (c *Cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
value, err := Get(c.db, []byte(saddr))
if err == nil {
obj := objectSDK.New()
- c.flushed.Get(saddr)
return obj, obj.Unmarshal(value)
}
@@ -39,7 +38,6 @@ func (c *Cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
}
- c.flushed.Get(saddr)
return res.Object, nil
}
diff --git a/pkg/local_object_storage/writecache/init.go b/pkg/local_object_storage/writecache/init.go
index 28f1249f2..89c1bf351 100644
--- a/pkg/local_object_storage/writecache/init.go
+++ b/pkg/local_object_storage/writecache/init.go
@@ -2,191 +2,33 @@ package writecache
import (
"context"
- "errors"
- "sync"
+ "fmt"
- "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
- "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
- storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
- meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
- apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
- oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
- "go.etcd.io/bbolt"
- "go.uber.org/zap"
+ "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
+ "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
)
-func (c *Cache) initFlushMarks(ctx context.Context) {
- var localWG sync.WaitGroup
+// Init runs necessary services.
+func (c *Cache) Init() error {
+ ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.Init")
+ defer span.End()
- localWG.Add(1)
- go func() {
- defer localWG.Done()
+ c.modeMtx.Lock()
+ defer c.modeMtx.Unlock()
- c.fsTreeFlushMarkUpdate(ctx)
- }()
-
- localWG.Add(1)
- go func() {
- defer localWG.Done()
-
- c.dbFlushMarkUpdate(ctx)
- }()
-
- c.initWG.Add(1)
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
- defer c.initWG.Done()
-
- localWG.Wait()
-
- select {
- case <-c.stopInitCh:
- return
- case <-c.closeCh:
- return
- default:
- }
-
- c.initialized.Store(true)
- }()
-}
-
-var errStopIter = errors.New("stop iteration")
-
-func (c *Cache) fsTreeFlushMarkUpdate(ctx context.Context) {
- c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInFSTree)
-
- var prm common.IteratePrm
- prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error {
- select {
- case <-c.closeCh:
- return errStopIter
- case <-c.stopInitCh:
- return errStopIter
- default:
- }
-
- flushed, needRemove := c.flushStatus(ctx, addr)
- if flushed {
- c.store.flushed.Add(addr.EncodeToString(), true)
- if needRemove {
- var prm common.DeletePrm
- prm.Address = addr
-
- _, err := c.fsTree.Delete(ctx, prm)
- if err == nil {
- storagelog.Write(c.log,
- storagelog.AddressField(addr),
- storagelog.StorageTypeField(wcStorageType),
- storagelog.OpField("fstree DELETE"),
- )
- }
- }
- }
+ if c.mode.NoMetabase() {
return nil
}
- c.modeMtx.RLock()
- defer c.modeMtx.RUnlock()
-
- _, _ = c.fsTree.Iterate(prm)
-
- c.log.Info(logs.WritecacheFinishedUpdatingFSTreeFlushMarks)
-}
-
-func (c *Cache) dbFlushMarkUpdate(ctx context.Context) {
- c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInDatabase)
-
- c.modeMtx.RLock()
- defer c.modeMtx.RUnlock()
-
- var m []string
- var indices []int
- var lastKey []byte
- var batchSize = flushBatchSize
- for {
- select {
- case <-c.closeCh:
- return
- case <-c.stopInitCh:
- return
- default:
- }
-
- m = m[:0]
- indices = indices[:0]
-
- // We put objects in batches of fixed size to not interfere with main put cycle a lot.
- _ = c.db.View(func(tx *bbolt.Tx) error {
- b := tx.Bucket(defaultBucket)
- cs := b.Cursor()
- for k, _ := cs.Seek(lastKey); k != nil && len(m) < batchSize; k, _ = cs.Next() {
- m = append(m, string(k))
- }
- return nil
- })
-
- var addr oid.Address
- for i := range m {
- if err := addr.DecodeString(m[i]); err != nil {
- continue
- }
-
- flushed, needRemove := c.flushStatus(ctx, addr)
- if flushed {
- c.store.flushed.Add(addr.EncodeToString(), true)
- if needRemove {
- indices = append(indices, i)
- }
- }
- }
-
- if len(m) == 0 {
- break
- }
-
- err := c.db.Batch(func(tx *bbolt.Tx) error {
- b := tx.Bucket(defaultBucket)
- for _, j := range indices {
- if err := b.Delete([]byte(m[j])); err != nil {
- return err
- }
- }
- return nil
- })
- if err == nil {
- for _, j := range indices {
- storagelog.Write(c.log,
- zap.String("address", m[j]),
- storagelog.StorageTypeField(wcStorageType),
- storagelog.OpField("db DELETE"),
- )
- }
- }
- lastKey = append([]byte(m[len(m)-1]), 0)
- }
-
- c.log.Info(logs.WritecacheFinishedUpdatingFlushMarks)
-}
-
-// flushStatus returns info about the object state in the main storage.
-// First return value is true iff object exists.
-// Second return value is true iff object can be safely removed.
-func (c *Cache) flushStatus(ctx context.Context, addr oid.Address) (bool, bool) {
- var existsPrm meta.ExistsPrm
- existsPrm.SetAddress(addr)
-
- _, err := c.metabase.Exists(ctx, existsPrm)
+ err := c.initCounters(ctx)
if err != nil {
- needRemove := errors.Is(err, meta.ErrObjectIsExpired) || errors.As(err, new(apistatus.ObjectAlreadyRemoved))
- return needRemove, needRemove
+ return fmt.Errorf("initializing write-cache size: %w", err)
}
- var prm meta.StorageIDPrm
- prm.SetAddress(addr)
+ if c.mode == mode.ReadWrite {
+ c.workersChan = make(chan struct{})
+ c.runFlushLoop()
+ }
- mRes, _ := c.metabase.StorageID(ctx, prm)
- res, err := c.blobstor.Exists(ctx, common.ExistsPrm{Address: addr, StorageID: mRes.StorageID()})
- return err == nil && res.Exists, false
+ return nil
}
diff --git a/pkg/local_object_storage/writecache/iterate.go b/pkg/local_object_storage/writecache/iterate.go
index be7c37eab..945fe08fc 100644
--- a/pkg/local_object_storage/writecache/iterate.go
+++ b/pkg/local_object_storage/writecache/iterate.go
@@ -41,9 +41,6 @@ func (c *Cache) Iterate(prm IterationPrm) error {
err := c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
return b.ForEach(func(k, data []byte) error {
- if _, ok := c.flushed.Peek(string(k)); ok {
- return nil
- }
return prm.handler(data)
})
})
@@ -54,9 +51,6 @@ func (c *Cache) Iterate(prm IterationPrm) error {
var fsPrm common.IteratePrm
fsPrm.IgnoreErrors = prm.ignoreErrors
fsPrm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
- if _, ok := c.flushed.Peek(addr.EncodeToString()); ok {
- return nil
- }
data, err := f()
if err != nil {
if prm.ignoreErrors {
diff --git a/pkg/local_object_storage/writecache/mode.go b/pkg/local_object_storage/writecache/mode.go
index 681cd84aa..b84f2942e 100644
--- a/pkg/local_object_storage/writecache/mode.go
+++ b/pkg/local_object_storage/writecache/mode.go
@@ -16,9 +16,6 @@ import (
// ErrReadOnly is returned when Put/Write is performed in a read-only mode.
var ErrReadOnly = logicerr.New("write-cache is in read-only mode")
-// ErrNotInitialized is returned when write-cache is initializing.
-var ErrNotInitialized = logicerr.New("write-cache is not initialized yet")
-
// SetMode sets write-cache mode of operation.
// When shard is put in read-only mode all objects in memory are flushed to disk
// and all background jobs are suspended.
@@ -35,29 +32,28 @@ func (c *Cache) SetMode(m mode.Mode) error {
// setMode applies new mode. Must be called with cache.modeMtx lock taken.
func (c *Cache) setMode(ctx context.Context, m mode.Mode) error {
var err error
- turnOffMeta := m.NoMetabase()
-
- if !c.initialized.Load() {
- close(c.stopInitCh)
-
- c.initWG.Wait()
- c.stopInitCh = make(chan struct{})
-
- defer func() {
- if err == nil && !turnOffMeta {
- c.initFlushMarks(ctx)
- }
- }()
- }
c.modeMtx.Lock()
defer c.modeMtx.Unlock()
- if turnOffMeta && !c.mode.NoMetabase() {
+ var workersActive bool
+ select {
+ case <-c.workersChan:
+ default:
+ workersActive = true
+ }
+
+ stopWorkers := m.NoMetabase() && !c.mode.NoMetabase() || c.mode.ReadWrite() && !m.ReadWrite()
+ if stopWorkers {
err = c.flush(ctx, true)
if err != nil {
return err
}
+
+ if workersActive {
+ close(c.workersChan)
+ c.wg.Wait()
+ }
}
if c.db != nil {
@@ -67,14 +63,14 @@ func (c *Cache) setMode(ctx context.Context, m mode.Mode) error {
}
// Suspend producers to ensure there are channel send operations in fly.
- // flushCh is populated by `flush` with `modeMtx` taken, thus waiting until it is empty
+ // smallFlushCh is populated by `flush` with `modeMtx` taken, thus waiting until it is empty
// guarantees that there are no in-fly operations.
- for len(c.flushCh) != 0 {
+ for len(c.smallFlushCh) != 0 {
c.log.Info(logs.WritecacheWaitingForChannelsToFlush)
time.Sleep(time.Second)
}
- if turnOffMeta {
+ if m.NoMetabase() {
c.mode = m
return nil
}
@@ -84,6 +80,16 @@ func (c *Cache) setMode(ctx context.Context, m mode.Mode) error {
}
c.mode = m
+
+ if m == mode.ReadWrite {
+ select {
+ case <-c.workersChan:
+ c.workersChan = make(chan struct{})
+ c.runFlushLoop()
+ default:
+ }
+ }
+
return nil
}
diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go
index 3434e9355..1d3f6bc5b 100644
--- a/pkg/local_object_storage/writecache/options.go
+++ b/pkg/local_object_storage/writecache/options.go
@@ -48,8 +48,6 @@ type options struct {
// maxCacheSize is the maximum total size of all objects saved in cache (DB + FS).
// 1 GiB by default.
maxCacheSize uint64
- // objCounters contains atomic counters for the number of objects stored in cache.
- objCounters counters
// maxBatchSize is the maximum batch size for the small object database.
maxBatchSize int
// maxBatchDelay is the maximum batch wait time for the small object database.
diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go
index 1b513854e..f1b762871 100644
--- a/pkg/local_object_storage/writecache/put.go
+++ b/pkg/local_object_storage/writecache/put.go
@@ -3,6 +3,7 @@ package writecache
import (
"context"
"errors"
+ "fmt"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@@ -37,8 +38,6 @@ func (c *Cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
defer c.modeMtx.RUnlock()
if c.readOnly() {
return common.PutRes{}, ErrReadOnly
- } else if !c.initialized.Load() {
- return common.PutRes{}, ErrNotInitialized
}
sz := uint64(len(prm.RawData))
@@ -52,23 +51,42 @@ func (c *Cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
data: prm.RawData,
}
- if sz <= c.smallObjectSize {
- return common.PutRes{}, c.putSmall(oi)
+ if c.maxCacheSize < c.sizeIfAdd(sz) {
+ return common.PutRes{}, ErrOutOfSpace
}
- return common.PutRes{}, c.putBig(ctx, oi.addr, prm)
+
+ if sz <= c.smallObjectSize {
+ err := c.putSmall(oi)
+ if err != nil {
+ err = fmt.Errorf("could not put small object to DB: %w", err)
+ }
+
+ return common.PutRes{}, err
+ }
+
+ err := c.putBig(ctx, oi.addr, prm)
+ if err != nil {
+ err = fmt.Errorf("could not put big object to FSTree: %w", err)
+ }
+
+ return common.PutRes{}, err
}
// putSmall persists small objects to the write-cache database and
-// pushes the to the flush workers queue.
+// pushes them to the flush workers queue.
func (c *Cache) putSmall(obj objectInfo) error {
- cacheSize := c.estimateCacheSize()
- if c.maxCacheSize < c.incSizeDB(cacheSize) {
- return ErrOutOfSpace
- }
+ var alreadyExists bool
err := c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
- return b.Put([]byte(obj.addr), obj.data)
+ addr := []byte(obj.addr)
+
+ alreadyExists = len(b.Get(addr)) != 0
+ if alreadyExists {
+ return nil
+ }
+
+ return b.Put(addr, obj.data)
})
if err == nil {
storagelog.Write(c.log,
@@ -76,29 +94,22 @@ func (c *Cache) putSmall(obj objectInfo) error {
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db PUT"),
)
- c.objCounters.IncDB()
+
+ if !alreadyExists {
+ c.objCounters.incDB(len(obj.data))
+ }
}
return err
}
// putBig writes object to FSTree and pushes it to the flush workers queue.
func (c *Cache) putBig(ctx context.Context, addr string, prm common.PutPrm) error {
- cacheSz := c.estimateCacheSize()
- if c.maxCacheSize < c.incSizeFS(cacheSz) {
- return ErrOutOfSpace
- }
-
_, err := c.fsTree.Put(ctx, prm)
if err != nil {
return err
}
- if c.blobstor.NeedsCompression(prm.Object) {
- c.mtx.Lock()
- c.compressFlags[addr] = struct{}{}
- c.mtx.Unlock()
- }
- c.objCounters.IncFS()
+ c.objCounters.incFS(len(prm.RawData))
storagelog.Write(c.log,
storagelog.AddressField(addr),
storagelog.StorageTypeField(wcStorageType),
diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/state.go
index 99f496487..c4fea0cb2 100644
--- a/pkg/local_object_storage/writecache/state.go
+++ b/pkg/local_object_storage/writecache/state.go
@@ -1,72 +1,134 @@
package writecache
import (
+ "context"
+ "errors"
"fmt"
+ "sync"
+ "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
+ oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.uber.org/atomic"
)
-func (c *Cache) estimateCacheSize() uint64 {
- return c.objCounters.DB()*c.smallObjectSize + c.objCounters.FS()*c.maxObjectSize
-}
-
-func (c *Cache) incSizeDB(sz uint64) uint64 {
- return sz + c.smallObjectSize
-}
-
-func (c *Cache) incSizeFS(sz uint64) uint64 {
- return sz + c.maxObjectSize
+func (c *Cache) sizeIfAdd(delta uint64) uint64 {
+ return delta + c.objCounters.fstreeSize.Load() + c.objCounters.dbSize.Load()
}
type counters struct {
- cDB, cFS atomic.Uint64
+ dbSize, fstreeSize atomic.Uint64
}
-func (x *counters) IncDB() {
- x.cDB.Inc()
+func (x *counters) incDB(delta int) {
+ x.dbSize.Add(uint64(delta))
}
-func (x *counters) DecDB() {
- x.cDB.Dec()
+func (x *counters) decDB(delta int) {
+ x.dbSize.Sub(uint64(delta))
}
-func (x *counters) DB() uint64 {
- return x.cDB.Load()
+func (x *counters) incFS(delta int) {
+ x.fstreeSize.Add(uint64(delta))
}
-func (x *counters) IncFS() {
- x.cFS.Inc()
+func (x *counters) decFS(delta int) {
+ x.fstreeSize.Sub(uint64(delta))
}
-func (x *counters) DecFS() {
- x.cFS.Dec()
+func (c *Cache) initCounters(ctx context.Context) error {
+ var wg sync.WaitGroup
+ var dbErr error
+ var fsErr error
+
+ wg.Add(1)
+ go func() {
+ dbErr = c.initDBSizeCounter(ctx)
+ wg.Done()
+ }()
+
+ wg.Add(1)
+ go func() {
+ fsErr = c.initFSSizeCounter(ctx)
+ wg.Done()
+ }()
+
+ wg.Wait()
+
+ switch {
+ case dbErr != nil:
+ return fmt.Errorf("database counter initialization: %w", dbErr)
+ case fsErr != nil:
+ return fmt.Errorf("FSTree counter initialization: %w", fsErr)
+ default:
+ return nil
+ }
}
-func (x *counters) FS() uint64 {
- return x.cFS.Load()
-}
+var stopIter = errors.New("stop")
-func (c *Cache) initCounters() error {
- var inDB uint64
+func (c *Cache) initDBSizeCounter(ctx context.Context) error {
+ var inDB int
err := c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
- if b != nil {
- inDB = uint64(b.Stats().KeyN)
+ if b == nil {
+ return nil
}
- return nil
+
+ return b.ForEach(func(_, v []byte) error {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-c.workersChan:
+ return stopIter
+ default:
+ }
+
+ inDB += len(v)
+ return nil
+ })
})
- if err != nil {
+ if err != nil && !errors.Is(err, stopIter) {
return fmt.Errorf("could not read write-cache DB counter: %w", err)
}
- inFS, err := c.fsTree.NumberOfObjects()
- if err != nil {
- return fmt.Errorf("could not read write-cache FS counter: %w", err)
- }
-
- c.objCounters.cDB.Store(inDB)
- c.objCounters.cFS.Store(inFS)
+ c.objCounters.dbSize.Store(uint64(inDB))
+
+ return nil
+}
+
+func (c *Cache) initFSSizeCounter(ctx context.Context) error {
+ var inFSTree int
+
+ var prm common.IteratePrm
+ prm.LazyHandler = func(address oid.Address, f func() ([]byte, error)) error {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-c.workersChan:
+ return stopIter
+ default:
+ }
+
+ data, err := f()
+ if err != nil {
+ return err
+ }
+
+ // write-cache is a temporary storage on a fast disk,
+ // so it is not expected to be configured with any
+ // compressor ever
+ inFSTree += len(data)
+
+ return nil
+ }
+
+ _, err := c.fsTree.Iterate(prm)
+ if err != nil && !errors.Is(err, stopIter) {
+ return fmt.Errorf("could not read write-cache FSTree counter: %w", err)
+ }
+
+ c.objCounters.fstreeSize.Store(uint64(inFSTree))
return nil
}
diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go
index 93f4cfd80..a44a9e956 100644
--- a/pkg/local_object_storage/writecache/storage.go
+++ b/pkg/local_object_storage/writecache/storage.go
@@ -1,40 +1,18 @@
package writecache
import (
- "context"
- "errors"
"fmt"
"os"
- "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
- "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
- storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
- apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
- oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
- lru "github.com/hashicorp/golang-lru/v2"
- "github.com/hashicorp/golang-lru/v2/simplelru"
"go.etcd.io/bbolt"
- "go.uber.org/zap"
)
-// store represents persistent storage with in-memory LRU cache
-// for flushed items on top of it.
+// smallStore represents the persistent storage (a bbolt database)
+// for the write-cache's small objects.
-type store struct {
- maxFlushedMarksCount int
- maxRemoveBatchSize int
-
- // flushed contains addresses of objects that were already flushed to the main storage.
- // We use LRU cache instead of map here to facilitate removing of unused object in favour of
- // frequently read ones.
- // MUST NOT be used inside bolt db transaction because it's eviction handler
- // removes untracked items from the database.
- flushed simplelru.LRUCache[string, bool]
- db *bbolt.DB
-
- dbKeysToRemove []string
- fsKeysToRemove []string
+type smallStore struct {
+ db *bbolt.DB
}
const dbName = "small.bolt"
@@ -69,101 +47,9 @@ func (c *Cache) openStore(readOnly bool) error {
fstree.WithDepth(1),
fstree.WithDirNameLen(1),
fstree.WithNoSync(c.noSync))
- if err := c.fsTree.Open(readOnly); err != nil {
+ if err = c.fsTree.Open(readOnly); err != nil {
return fmt.Errorf("could not open FSTree: %w", err)
}
- // Write-cache can be opened multiple times during `SetMode`.
- // flushed map must not be re-created in this case.
- if c.flushed == nil {
- c.flushed, _ = lru.NewWithEvict[string, bool](c.maxFlushedMarksCount, c.removeFlushed)
- }
-
- c.initialized.Store(false)
-
return nil
}
-
-// removeFlushed removes an object from the writecache.
-// To minimize interference with the client operations, the actual removal
-// is done in batches.
-// It is not thread-safe and is used only as an evict callback to LRU cache.
-func (c *Cache) removeFlushed(key string, value bool) {
- fromDatabase := value
- if fromDatabase {
- c.dbKeysToRemove = append(c.dbKeysToRemove, key)
- } else {
- c.fsKeysToRemove = append(c.fsKeysToRemove, key)
- }
-
- if len(c.dbKeysToRemove)+len(c.fsKeysToRemove) >= c.maxRemoveBatchSize {
- c.dbKeysToRemove = c.deleteFromDB(c.dbKeysToRemove)
- c.fsKeysToRemove = c.deleteFromDisk(c.fsKeysToRemove)
- }
-}
-
-func (c *Cache) deleteFromDB(keys []string) []string {
- if len(keys) == 0 {
- return keys
- }
-
- var errorIndex int
- err := c.db.Batch(func(tx *bbolt.Tx) error {
- b := tx.Bucket(defaultBucket)
- for errorIndex = range keys {
- if err := b.Delete([]byte(keys[errorIndex])); err != nil {
- return err
- }
- }
- return nil
- })
- for i := 0; i < errorIndex; i++ {
- c.objCounters.DecDB()
- storagelog.Write(c.log,
- storagelog.AddressField(keys[i]),
- storagelog.StorageTypeField(wcStorageType),
- storagelog.OpField("db DELETE"),
- )
- }
- if err != nil {
- c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err))
- }
-
- copy(keys, keys[errorIndex:])
- return keys[:len(keys)-errorIndex]
-}
-
-func (c *Cache) deleteFromDisk(keys []string) []string {
- if len(keys) == 0 {
- return keys
- }
-
- var copyIndex int
- var addr oid.Address
-
- for i := range keys {
- if err := addr.DecodeString(keys[i]); err != nil {
- c.log.Error(logs.WritecacheCantParseAddress, zap.String("address", keys[i]))
- continue
- }
-
- _, err := c.fsTree.Delete(context.TODO(), common.DeletePrm{Address: addr})
- if err != nil && !errors.As(err, new(apistatus.ObjectNotFound)) {
- c.log.Error(logs.WritecacheCantRemoveObjectFromWritecache, zap.Error(err))
-
- // Save the key for the next iteration.
- keys[copyIndex] = keys[i]
- copyIndex++
- continue
- } else if err == nil {
- storagelog.Write(c.log,
- storagelog.AddressField(keys[i]),
- storagelog.StorageTypeField(wcStorageType),
- storagelog.OpField("fstree DELETE"),
- )
- c.objCounters.DecFS()
- }
- }
-
- return keys[:copyIndex]
-}
diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go
index a58d2edbb..3f13b0b1c 100644
--- a/pkg/local_object_storage/writecache/writecache.go
+++ b/pkg/local_object_storage/writecache/writecache.go
@@ -5,13 +5,11 @@ import (
"os"
"sync"
- "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.etcd.io/bbolt"
- "go.uber.org/atomic"
"go.uber.org/zap"
)
@@ -25,27 +23,21 @@ type Info struct {
type Cache struct {
options
- // mtx protects statistics, counters and compressFlags.
- mtx sync.RWMutex
+ // objCounters contains atomic counters for the number of objects stored in cache.
+ objCounters counters
- mode mode.Mode
- initialized atomic.Bool
- stopInitCh chan struct{} // used to sync initWG initialisation routines and _only_ them
- initWG sync.WaitGroup // for initialisation routines only
- modeMtx sync.RWMutex
-
- // compressFlags maps address of a big object to boolean value indicating
- // whether object should be compressed.
- compressFlags map[string]struct{}
+ modeMtx sync.RWMutex
+ mode mode.Mode
// flushCh is a channel with objects to flush.
- flushCh chan *object.Object
- // closeCh is close channel, protected by modeMtx.
- closeCh chan struct{}
+ smallFlushCh chan objWithData
+	// workersChan is a close channel, protected by modeMtx.
+	// It indicates the status of the background workers.
+ workersChan chan struct{}
// wg is a wait group for flush workers.
wg sync.WaitGroup
// store contains underlying database.
- store
+ smallStore
// fsTree contains big files stored directly on file-system.
fsTree *fstree.FSTree
}
@@ -70,13 +62,16 @@ var (
)
// New creates new writecache instance.
+// The value must not be copied after creation.
func New(opts ...Option) *Cache {
- c := &Cache{
- flushCh: make(chan *object.Object),
- mode: mode.ReadWrite,
- stopInitCh: make(chan struct{}),
+ closeCh := make(chan struct{})
+ close(closeCh)
+
+ c := &Cache{
+ smallFlushCh: make(chan objWithData),
+ mode: mode.ReadWrite,
+ workersChan: closeCh,
- compressFlags: make(map[string]struct{}),
options: options{
log: &logger.Logger{Logger: zap.NewNop()},
maxObjectSize: defaultMaxObjectSize,
@@ -93,12 +88,6 @@ func New(opts ...Option) *Cache {
opts[i](&c.options)
}
- // Make the LRU cache contain which take approximately 3/4 of the maximum space.
- // Assume small and big objects are stored in 50-50 proportion.
- c.maxFlushedMarksCount = int(c.maxCacheSize/c.maxObjectSize+c.maxCacheSize/c.smallObjectSize) / 2 * 3 / 4
- // Trigger the removal when the cache is 7/8 full, so that new items can still arrive.
- c.maxRemoveBatchSize = c.maxFlushedMarksCount / 8
-
return c
}
@@ -120,40 +109,31 @@ func (c *Cache) Open(readOnly bool) error {
return err
}
- // Opening after Close is done during maintenance mode,
- // thus we need to create a channel here.
- c.closeCh = make(chan struct{})
+ c.modeMtx.Lock()
+ defer c.modeMtx.Unlock()
- return c.initCounters()
-}
+ if readOnly {
+ c.mode = mode.ReadOnly
+ } else {
+ c.mode = mode.ReadWrite
+ }
-// Init runs necessary services.
-func (c *Cache) Init() error {
- ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.Init")
- defer span.End()
-
- c.initFlushMarks(ctx)
- c.runFlushLoop()
return nil
}
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
func (c *Cache) Close() error {
- // Finish all in-progress operations.
- if err := c.setMode(context.TODO(), mode.ReadOnly); err != nil {
- return err
+	// Finish all in-progress operations, but only if the
+	// background workers are still running.
+ select {
+ case <-c.workersChan:
+ default:
+ err := c.setMode(context.TODO(), mode.ReadOnly)
+ if err != nil {
+ return err
+ }
}
- if c.closeCh != nil {
- close(c.closeCh)
- }
- c.wg.Wait()
- if c.closeCh != nil {
- c.closeCh = nil
- }
-
- c.initialized.Store(false)
-
var err error
if c.db != nil {
err = c.db.Close()
--
2.45.2