From 77b1b80e73507027575542e8c370157b749872b5 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Fri, 5 May 2023 18:36:12 +0300 Subject: [PATCH 1/4] [#314] wc: Drop WC interface It is not used for testing, its absence does not break build. The only one implementation is placed in the same package. Signed-off-by: Pavel Karpy --- pkg/local_object_storage/shard/get.go | 4 +- pkg/local_object_storage/shard/range.go | 2 +- pkg/local_object_storage/shard/shard.go | 2 +- pkg/local_object_storage/writecache/delete.go | 2 +- pkg/local_object_storage/writecache/flush.go | 18 ++++----- .../writecache/flush_test.go | 31 +++++++------- pkg/local_object_storage/writecache/get.go | 4 +- pkg/local_object_storage/writecache/init.go | 8 ++-- .../writecache/iterate.go | 2 +- pkg/local_object_storage/writecache/mode.go | 6 +-- pkg/local_object_storage/writecache/put.go | 6 +-- pkg/local_object_storage/writecache/state.go | 8 ++-- .../writecache/storage.go | 8 ++-- .../writecache/writecache.go | 40 ++++--------------- 14 files changed, 58 insertions(+), 83 deletions(-) diff --git a/pkg/local_object_storage/shard/get.go b/pkg/local_object_storage/shard/get.go index 5268ac790..0d400a7ff 100644 --- a/pkg/local_object_storage/shard/get.go +++ b/pkg/local_object_storage/shard/get.go @@ -91,7 +91,7 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) { return res.Object, nil } - wc := func(c writecache.Cache) (*objectSDK.Object, error) { + wc := func(c *writecache.Cache) (*objectSDK.Object, error) { return c.Get(ctx, prm.addr) } @@ -109,7 +109,7 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) { var emptyStorageID = make([]byte, 0) // fetchObjectData looks through writeCache and blobStor to find object. -func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta bool, cb storFetcher, wc func(w writecache.Cache) (*objectSDK.Object, error)) (*objectSDK.Object, bool, error) { +func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta bool, cb storFetcher, wc func(w *writecache.Cache) (*objectSDK.Object, error)) (*objectSDK.Object, bool, error) { var ( mErr error mRes meta.ExistsRes diff --git a/pkg/local_object_storage/shard/range.go b/pkg/local_object_storage/shard/range.go index 06aea2f8a..b80adcc93 100644 --- a/pkg/local_object_storage/shard/range.go +++ b/pkg/local_object_storage/shard/range.go @@ -104,7 +104,7 @@ func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) { return obj, nil } - wc := func(c writecache.Cache) (*object.Object, error) { + wc := func(c *writecache.Cache) (*object.Object, error) { res, err := c.Get(ctx, prm.addr) if err != nil { return nil, err diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 44ec54645..cd82b521c 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -23,7 +23,7 @@ type Shard struct { gc *gc - writeCache writecache.Cache + writeCache *writecache.Cache blobStor *blobstor.BlobStor diff --git a/pkg/local_object_storage/writecache/delete.go b/pkg/local_object_storage/writecache/delete.go index c1aab9e5a..f0d3cfbf5 100644 --- a/pkg/local_object_storage/writecache/delete.go +++ b/pkg/local_object_storage/writecache/delete.go @@ -15,7 +15,7 @@ import ( // Delete removes object from write-cache. // // Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache. -func (c *cache) Delete(ctx context.Context, addr oid.Address) error { +func (c *Cache) Delete(ctx context.Context, addr oid.Address) error { ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Delete", trace.WithAttributes( attribute.String("address", addr.EncodeToString()), diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index 04fcccede..91052e9f0 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -34,7 +34,7 @@ const ( ) // runFlushLoop starts background workers which periodically flush objects to the blobstor. -func (c *cache) runFlushLoop() { +func (c *Cache) runFlushLoop() { for i := 0; i < c.workersCount; i++ { c.wg.Add(1) go c.flushWorker(i) @@ -65,7 +65,7 @@ func (c *cache) runFlushLoop() { }() } -func (c *cache) flushDB() { +func (c *Cache) flushDB() { var lastKey []byte var m []objectInfo for { @@ -148,7 +148,7 @@ func (c *cache) flushDB() { } } -func (c *cache) flushBigObjects(ctx context.Context) { +func (c *Cache) flushBigObjects(ctx context.Context) { tick := time.NewTicker(defaultFlushInterval * 10) for { select { @@ -171,7 +171,7 @@ func (c *cache) flushBigObjects(ctx context.Context) { } } -func (c *cache) reportFlushError(msg string, addr string, err error) { +func (c *Cache) reportFlushError(msg string, addr string, err error) { if c.reportError != nil { c.reportError(msg, err) } else { @@ -181,7 +181,7 @@ func (c *cache) reportFlushError(msg string, addr string, err error) { } } -func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { +func (c *Cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { var prm common.IteratePrm prm.IgnoreErrors = ignoreErrors prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error { @@ -229,7 +229,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { } // flushWorker writes objects to the main storage. -func (c *cache) flushWorker(_ int) { +func (c *Cache) flushWorker(_ int) { defer c.wg.Done() var obj *object.Object @@ -249,7 +249,7 @@ func (c *cache) flushWorker(_ int) { } // flushObject is used to write object directly to the main storage. -func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte) error { +func (c *Cache) flushObject(ctx context.Context, obj *object.Object, data []byte) error { addr := objectCore.AddressOf(obj) var prm common.PutPrm @@ -281,7 +281,7 @@ func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte // Flush flushes all objects from the write-cache to the main storage. // Write-cache must be in readonly mode to ensure correctness of an operation and // to prevent interference with background flush workers. -func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error { +func (c *Cache) Flush(ctx context.Context, ignoreErrors bool) error { ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Flush", trace.WithAttributes( attribute.Bool("ignore_errors", ignoreErrors), @@ -294,7 +294,7 @@ func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error { return c.flush(ctx, ignoreErrors) } -func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { +func (c *Cache) flush(ctx context.Context, ignoreErrors bool) error { if err := c.flushFSTree(ctx, ignoreErrors); err != nil { return err } diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go index 2cec07081..c42bda255 100644 --- a/pkg/local_object_storage/writecache/flush_test.go +++ b/pkg/local_object_storage/writecache/flush_test.go @@ -39,7 +39,7 @@ func TestFlush(t *testing.T) { smallSize = 256 ) - newCache := func(t *testing.T, opts ...Option) (Cache, *blobstor.BlobStor, *meta.DB) { + newCache := func(t *testing.T, opts ...Option) (*Cache, *blobstor.BlobStor, *meta.DB) { dir := t.TempDir() mb := meta.New( meta.WithPath(filepath.Join(dir, "meta")), @@ -76,7 +76,7 @@ func TestFlush(t *testing.T) { return wc, bs, mb } - putObjects := func(t *testing.T, c Cache) []objectPair { + putObjects := func(t *testing.T, c *Cache) []objectPair { objects := make([]objectPair, objCount) for i := range objects { objects[i] = putObject(t, c, 1+(i%2)*smallSize) @@ -109,8 +109,8 @@ func TestFlush(t *testing.T) { require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(mode.ReadWrite)) - wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true) - wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false) + wc.flushed.Add(objects[0].addr.EncodeToString(), true) + wc.flushed.Add(objects[1].addr.EncodeToString(), false) require.NoError(t, wc.Flush(context.Background(), false)) @@ -139,8 +139,8 @@ func TestFlush(t *testing.T) { require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(mode.ReadWrite)) - wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true) - wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false) + wc.flushed.Add(objects[0].addr.EncodeToString(), true) + wc.flushed.Add(objects[1].addr.EncodeToString(), false) require.NoError(t, wc.SetMode(mode.Degraded)) @@ -158,13 +158,13 @@ func TestFlush(t *testing.T) { }) t.Run("ignore errors", func(t *testing.T) { - testIgnoreErrors := func(t *testing.T, f func(*cache)) { + testIgnoreErrors := func(t *testing.T, f func(*Cache)) { var errCount atomic.Uint32 wc, bs, mb := newCache(t, WithReportErrorFunc(func(message string, err error) { errCount.Inc() })) objects := putObjects(t, wc) - f(wc.(*cache)) + f(wc) require.NoError(t, wc.SetMode(mode.ReadOnly)) require.NoError(t, bs.SetMode(mode.ReadWrite)) @@ -178,7 +178,7 @@ func TestFlush(t *testing.T) { check(t, mb, bs, objects) } t.Run("db, invalid address", func(t *testing.T) { - testIgnoreErrors(t, func(c *cache) { + testIgnoreErrors(t, func(c *Cache) { _, data := newObject(t, 1) require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) @@ -187,7 +187,7 @@ func TestFlush(t *testing.T) { }) }) t.Run("db, invalid object", func(t *testing.T) { - testIgnoreErrors(t, func(c *cache) { + testIgnoreErrors(t, func(c *Cache) { require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) return b.Put([]byte(oidtest.Address().EncodeToString()), []byte{1, 2, 3}) @@ -195,7 +195,7 @@ func TestFlush(t *testing.T) { }) }) t.Run("fs, read error", func(t *testing.T) { - testIgnoreErrors(t, func(c *cache) { + testIgnoreErrors(t, func(c *Cache) { obj, data := newObject(t, 1) var prm common.PutPrm @@ -214,7 +214,7 @@ func TestFlush(t *testing.T) { }) }) t.Run("fs, invalid object", func(t *testing.T) { - testIgnoreErrors(t, func(c *cache) { + testIgnoreErrors(t, func(c *Cache) { var prm common.PutPrm prm.Address = oidtest.Address() prm.RawData = []byte{1, 2, 3} @@ -286,7 +286,7 @@ func TestFlush(t *testing.T) { }) } -func putObject(t *testing.T, c Cache, size int) objectPair { +func putObject(t *testing.T, c *Cache, size int) objectPair { obj, data := newObject(t, size) var prm common.PutPrm @@ -319,12 +319,11 @@ func newObject(t *testing.T, size int) (*object.Object, []byte) { return obj, data } -func initWC(t *testing.T, wc Cache) { +func initWC(t *testing.T, wc *Cache) { require.NoError(t, wc.Init()) require.Eventually(t, func() bool { - rawWc := wc.(*cache) - return rawWc.initialized.Load() + return wc.initialized.Load() }, 100*time.Second, 1*time.Millisecond) } diff --git a/pkg/local_object_storage/writecache/get.go b/pkg/local_object_storage/writecache/get.go index 6af1bd181..e303b5d3d 100644 --- a/pkg/local_object_storage/writecache/get.go +++ b/pkg/local_object_storage/writecache/get.go @@ -18,7 +18,7 @@ import ( // Get returns object from write-cache. // // Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache. -func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) { +func (c *Cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) { saddr := addr.EncodeToString() ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Get", @@ -46,7 +46,7 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e // Head returns object header from write-cache. // // Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache. -func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) { +func (c *Cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) { ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Head", trace.WithAttributes( attribute.String("address", addr.EncodeToString()), diff --git a/pkg/local_object_storage/writecache/init.go b/pkg/local_object_storage/writecache/init.go index 2ca8cceef..28f1249f2 100644 --- a/pkg/local_object_storage/writecache/init.go +++ b/pkg/local_object_storage/writecache/init.go @@ -15,7 +15,7 @@ import ( "go.uber.org/zap" ) -func (c *cache) initFlushMarks(ctx context.Context) { +func (c *Cache) initFlushMarks(ctx context.Context) { var localWG sync.WaitGroup localWG.Add(1) @@ -54,7 +54,7 @@ func (c *cache) initFlushMarks(ctx context.Context) { var errStopIter = errors.New("stop iteration") -func (c *cache) fsTreeFlushMarkUpdate(ctx context.Context) { +func (c *Cache) fsTreeFlushMarkUpdate(ctx context.Context) { c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInFSTree) var prm common.IteratePrm @@ -95,7 +95,7 @@ func (c *cache) fsTreeFlushMarkUpdate(ctx context.Context) { c.log.Info(logs.WritecacheFinishedUpdatingFSTreeFlushMarks) } -func (c *cache) dbFlushMarkUpdate(ctx context.Context) { +func (c *Cache) dbFlushMarkUpdate(ctx context.Context) { c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInDatabase) c.modeMtx.RLock() @@ -173,7 +173,7 @@ func (c *cache) dbFlushMarkUpdate(ctx context.Context) { // flushStatus returns info about the object state in the main storage. // First return value is true iff object exists. // Second return value is true iff object can be safely removed. -func (c *cache) flushStatus(ctx context.Context, addr oid.Address) (bool, bool) { +func (c *Cache) flushStatus(ctx context.Context, addr oid.Address) (bool, bool) { var existsPrm meta.ExistsPrm existsPrm.SetAddress(addr) diff --git a/pkg/local_object_storage/writecache/iterate.go b/pkg/local_object_storage/writecache/iterate.go index 228dd2597..be7c37eab 100644 --- a/pkg/local_object_storage/writecache/iterate.go +++ b/pkg/local_object_storage/writecache/iterate.go @@ -31,7 +31,7 @@ func (p *IterationPrm) WithIgnoreErrors(ignore bool) { // Iterate iterates over all objects present in write cache. // This is very difficult to do correctly unless write-cache is put in read-only mode. // Thus we silently fail if shard is not in read-only mode to avoid reporting misleading results. -func (c *cache) Iterate(prm IterationPrm) error { +func (c *Cache) Iterate(prm IterationPrm) error { c.modeMtx.RLock() defer c.modeMtx.RUnlock() if !c.readOnly() { diff --git a/pkg/local_object_storage/writecache/mode.go b/pkg/local_object_storage/writecache/mode.go index 14f8af49e..681cd84aa 100644 --- a/pkg/local_object_storage/writecache/mode.go +++ b/pkg/local_object_storage/writecache/mode.go @@ -22,7 +22,7 @@ var ErrNotInitialized = logicerr.New("write-cache is not initialized yet") // SetMode sets write-cache mode of operation. // When shard is put in read-only mode all objects in memory are flushed to disk // and all background jobs are suspended. -func (c *cache) SetMode(m mode.Mode) error { +func (c *Cache) SetMode(m mode.Mode) error { ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.SetMode", trace.WithAttributes( attribute.String("mode", m.String()), @@ -33,7 +33,7 @@ func (c *cache) SetMode(m mode.Mode) error { } // setMode applies new mode. Must be called with cache.modeMtx lock taken. -func (c *cache) setMode(ctx context.Context, m mode.Mode) error { +func (c *Cache) setMode(ctx context.Context, m mode.Mode) error { var err error turnOffMeta := m.NoMetabase() @@ -89,6 +89,6 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode) error { // readOnly returns true if current mode is read-only. // `c.modeMtx` must be taken. -func (c *cache) readOnly() bool { +func (c *Cache) readOnly() bool { return c.mode.ReadOnly() } diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go index e2535d9e2..d5525a5a9 100644 --- a/pkg/local_object_storage/writecache/put.go +++ b/pkg/local_object_storage/writecache/put.go @@ -25,7 +25,7 @@ var ( // Returns ErrNotInitialized if write-cache has not been initialized yet. // Returns ErrOutOfSpace if saving an object leads to WC's size overflow. // Returns ErrBigObject if an objects exceeds maximum object size. -func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) { +func (c *Cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) { ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Put", trace.WithAttributes( attribute.String("address", prm.Address.EncodeToString()), @@ -60,7 +60,7 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro // putSmall persists small objects to the write-cache database and // pushes the to the flush workers queue. -func (c *cache) putSmall(obj objectInfo) error { +func (c *Cache) putSmall(obj objectInfo) error { cacheSize := c.estimateCacheSize() if c.maxCacheSize < c.incSizeDB(cacheSize) { return ErrOutOfSpace @@ -82,7 +82,7 @@ func (c *cache) putSmall(obj objectInfo) error { } // putBig writes object to FSTree and pushes it to the flush workers queue. -func (c *cache) putBig(ctx context.Context, addr string, prm common.PutPrm) error { +func (c *Cache) putBig(ctx context.Context, addr string, prm common.PutPrm) error { cacheSz := c.estimateCacheSize() if c.maxCacheSize < c.incSizeFS(cacheSz) { return ErrOutOfSpace diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/state.go index 1ba5a4bd3..99f496487 100644 --- a/pkg/local_object_storage/writecache/state.go +++ b/pkg/local_object_storage/writecache/state.go @@ -7,15 +7,15 @@ import ( "go.uber.org/atomic" ) -func (c *cache) estimateCacheSize() uint64 { +func (c *Cache) estimateCacheSize() uint64 { return c.objCounters.DB()*c.smallObjectSize + c.objCounters.FS()*c.maxObjectSize } -func (c *cache) incSizeDB(sz uint64) uint64 { +func (c *Cache) incSizeDB(sz uint64) uint64 { return sz + c.smallObjectSize } -func (c *cache) incSizeFS(sz uint64) uint64 { +func (c *Cache) incSizeFS(sz uint64) uint64 { return sz + c.maxObjectSize } @@ -47,7 +47,7 @@ func (x *counters) FS() uint64 { return x.cFS.Load() } -func (c *cache) initCounters() error { +func (c *Cache) initCounters() error { var inDB uint64 err := c.db.View(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index aeae752e3..93f4cfd80 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -39,7 +39,7 @@ type store struct { const dbName = "small.bolt" -func (c *cache) openStore(readOnly bool) error { +func (c *Cache) openStore(readOnly bool) error { err := util.MkdirAllX(c.path, os.ModePerm) if err != nil { return err @@ -88,7 +88,7 @@ func (c *cache) openStore(readOnly bool) error { // To minimize interference with the client operations, the actual removal // is done in batches. // It is not thread-safe and is used only as an evict callback to LRU cache. -func (c *cache) removeFlushed(key string, value bool) { +func (c *Cache) removeFlushed(key string, value bool) { fromDatabase := value if fromDatabase { c.dbKeysToRemove = append(c.dbKeysToRemove, key) @@ -102,7 +102,7 @@ func (c *cache) removeFlushed(key string, value bool) { } } -func (c *cache) deleteFromDB(keys []string) []string { +func (c *Cache) deleteFromDB(keys []string) []string { if len(keys) == 0 { return keys } @@ -133,7 +133,7 @@ func (c *cache) deleteFromDB(keys []string) []string { return keys[:len(keys)-errorIndex] } -func (c *cache) deleteFromDisk(keys []string) []string { +func (c *Cache) deleteFromDisk(keys []string) []string { if len(keys) == 0 { return keys } diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index bdcc9bbf6..a58d2edbb 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -6,12 +6,10 @@ import ( "sync" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" "go.uber.org/atomic" "go.uber.org/zap" @@ -24,29 +22,7 @@ type Info struct { } // Cache represents write-cache for objects. -type Cache interface { - Get(ctx context.Context, address oid.Address) (*object.Object, error) - Head(context.Context, oid.Address) (*object.Object, error) - // Delete removes object referenced by the given oid.Address from the - // Cache. Returns any error encountered that prevented the object to be - // removed. - // - // Returns apistatus.ObjectNotFound if object is missing in the Cache. - // Returns ErrReadOnly if the Cache is currently in the read-only mode. - Delete(context.Context, oid.Address) error - Iterate(IterationPrm) error - Put(context.Context, common.PutPrm) (common.PutRes, error) - SetMode(mode.Mode) error - SetLogger(*logger.Logger) - DumpInfo() Info - Flush(context.Context, bool) error - - Init() error - Open(readOnly bool) error - Close() error -} - -type cache struct { +type Cache struct { options // mtx protects statistics, counters and compressFlags. @@ -94,8 +70,8 @@ var ( ) // New creates new writecache instance. -func New(opts ...Option) Cache { - c := &cache{ +func New(opts ...Option) *Cache { + c := &Cache{ flushCh: make(chan *object.Object), mode: mode.ReadWrite, stopInitCh: make(chan struct{}), @@ -127,18 +103,18 @@ func New(opts ...Option) Cache { } // SetLogger sets logger. It is used after the shard ID was generated to use it in logs. -func (c *cache) SetLogger(l *logger.Logger) { +func (c *Cache) SetLogger(l *logger.Logger) { c.log = l } -func (c *cache) DumpInfo() Info { +func (c *Cache) DumpInfo() Info { return Info{ Path: c.path, } } // Open opens and initializes database. Reads object counters from the ObjectCounters instance. -func (c *cache) Open(readOnly bool) error { +func (c *Cache) Open(readOnly bool) error { err := c.openStore(readOnly) if err != nil { return err @@ -152,7 +128,7 @@ func (c *cache) Open(readOnly bool) error { } // Init runs necessary services. -func (c *cache) Init() error { +func (c *Cache) Init() error { ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.Init") defer span.End() @@ -162,7 +138,7 @@ func (c *cache) Init() error { } // Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op. -func (c *cache) Close() error { +func (c *Cache) Close() error { // Finish all in-progress operations. if err := c.setMode(context.TODO(), mode.ReadOnly); err != nil { return err -- 2.45.2 From 1e6ffd45ad658c0648e21a975c6de52298a87689 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Fri, 5 May 2023 18:40:57 +0300 Subject: [PATCH 2/4] [#314] wc: Simplify background workers naming Also, drop not used arg. Signed-off-by: Pavel Karpy --- pkg/local_object_storage/writecache/flush.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index 91052e9f0..bdebad897 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -37,7 +37,7 @@ const ( func (c *Cache) runFlushLoop() { for i := 0; i < c.workersCount; i++ { c.wg.Add(1) - go c.flushWorker(i) + go c.smallObjectsFlusher() } c.wg.Add(1) @@ -56,7 +56,7 @@ func (c *Cache) runFlushLoop() { for { select { case <-tt.C: - c.flushDB() + c.flushSmallObjects() tt.Reset(defaultFlushInterval) case <-c.closeCh: return @@ -65,7 +65,7 @@ func (c *Cache) runFlushLoop() { }() } -func (c *Cache) flushDB() { +func (c *Cache) flushSmallObjects() { var lastKey []byte var m []objectInfo for { @@ -228,8 +228,8 @@ func (c *Cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { return err } -// flushWorker writes objects to the main storage. -func (c *Cache) flushWorker(_ int) { +// smallObjectsFlusher writes small objects to the main storage. +func (c *Cache) smallObjectsFlusher() { defer c.wg.Done() var obj *object.Object -- 2.45.2 From 99f76599e8ec1cdbe2fdb2862a7bd5a5eaffd305 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Fri, 5 May 2023 18:59:49 +0300 Subject: [PATCH 3/4] [#314] wc: Do not lose small objects on disk errors Do return error if an object could not been stored on WC's disk. Signed-off-by: Pavel Karpy --- pkg/local_object_storage/writecache/put.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go index d5525a5a9..1b513854e 100644 --- a/pkg/local_object_storage/writecache/put.go +++ b/pkg/local_object_storage/writecache/put.go @@ -78,7 +78,7 @@ func (c *Cache) putSmall(obj objectInfo) error { ) c.objCounters.IncDB() } - return nil + return err } // putBig writes object to FSTree and pushes it to the flush workers queue. -- 2.45.2 From 34544502dc0cfa5f9b26c34893929d67a1b3b7be Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Wed, 3 May 2023 12:11:36 +0300 Subject: [PATCH 4/4] [#314] wc: Simplify WC Do not use write-cache as a read cache: always remove objects from the WC, not only if an object hasn't been used for some time (LRU cache is dropped). Use object size (in bytes) as a metric of used space, not an approximate (and too inaccurate) maximum stored objects number. Signed-off-by: Pavel Karpy --- pkg/local_object_storage/shard/mode/mode.go | 4 + pkg/local_object_storage/writecache/delete.go | 28 ++- pkg/local_object_storage/writecache/flush.go | 193 ++++++++++++++---- .../writecache/flush_test.go | 83 ++++---- pkg/local_object_storage/writecache/get.go | 2 - pkg/local_object_storage/writecache/init.go | 192 ++--------------- .../writecache/iterate.go | 6 - pkg/local_object_storage/writecache/mode.go | 48 +++-- .../writecache/options.go | 2 - pkg/local_object_storage/writecache/put.go | 55 +++-- pkg/local_object_storage/writecache/state.go | 136 ++++++++---- .../writecache/storage.go | 122 +---------- .../writecache/writecache.go | 86 +++----- 13 files changed, 436 insertions(+), 521 deletions(-) diff --git a/pkg/local_object_storage/shard/mode/mode.go b/pkg/local_object_storage/shard/mode/mode.go index 65b2b5c89..db11163c3 100644 --- a/pkg/local_object_storage/shard/mode/mode.go +++ b/pkg/local_object_storage/shard/mode/mode.go @@ -57,3 +57,7 @@ func (m Mode) NoMetabase() bool { func (m Mode) ReadOnly() bool { return m&ReadOnly != 0 } + +func (m Mode) ReadWrite() bool { + return m == 0 +} diff --git a/pkg/local_object_storage/writecache/delete.go b/pkg/local_object_storage/writecache/delete.go index f0d3cfbf5..5a4890075 100644 --- a/pkg/local_object_storage/writecache/delete.go +++ b/pkg/local_object_storage/writecache/delete.go @@ -2,10 +2,12 @@ package writecache import ( "context" + "errors" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" @@ -31,14 +33,14 @@ func (c *Cache) Delete(ctx context.Context, addr oid.Address) error { saddr := addr.EncodeToString() // Check disk cache. - var has int + var valLen int _ = c.db.View(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) - has = len(b.Get([]byte(saddr))) + valLen = len(b.Get([]byte(saddr))) return nil }) - if 0 < has { + if valLen > 0 { err := c.db.Update(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) err := b.Delete([]byte(saddr)) @@ -52,18 +54,32 @@ func (c *Cache) Delete(ctx context.Context, addr oid.Address) error { storagelog.StorageTypeField(wcStorageType), storagelog.OpField("db DELETE"), ) - c.objCounters.DecDB() + c.objCounters.decDB(valLen) return nil } - _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr}) + // While getting an object looks overheadly, it allows to + // get its size correctly without any additional memory/disk/CPU + // usage on the WC's side _for every object_. `Delete` is not + // expected to be called right after an object is put to the + // Write-cache often, and for non-existing objects (persisted + // to the main storage and dropped from the WC's storage) it + // is `os.Stat` vs `os.Remove` calls after all. + res, err := c.fsTree.Get(ctx, common.GetPrm{Address: addr}) + if errors.As(err, new(apistatus.ObjectNotFound)) { + return nil + } else if err != nil { + return err + } + + _, err = c.fsTree.Delete(ctx, common.DeletePrm{Address: addr}) if err == nil { storagelog.Write(c.log, storagelog.AddressField(saddr), storagelog.StorageTypeField(wcStorageType), storagelog.OpField("fstree DELETE"), ) - c.objCounters.DecFS() + c.objCounters.decFS(len(res.RawData)) } return err diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index bdebad897..9d12cf3f1 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -11,7 +11,9 @@ import ( objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/mr-tron/base58" @@ -33,6 +35,11 @@ const ( defaultFlushInterval = time.Second ) +type objWithData struct { + obj *object.Object + data []byte +} + // runFlushLoop starts background workers which periodically flush objects to the blobstor. func (c *Cache) runFlushLoop() { for i := 0; i < c.workersCount; i++ { @@ -58,7 +65,7 @@ func (c *Cache) runFlushLoop() { case <-tt.C: c.flushSmallObjects() tt.Reset(defaultFlushInterval) - case <-c.closeCh: + case <-c.workersChan: return } } @@ -70,20 +77,13 @@ func (c *Cache) flushSmallObjects() { var m []objectInfo for { select { - case <-c.closeCh: + case <-c.workersChan: return default: } m = m[:0] - c.modeMtx.RLock() - if c.readOnly() || !c.initialized.Load() { - c.modeMtx.RUnlock() - time.Sleep(time.Second) - continue - } - // We put objects in batches of fixed size to not interfere with main put cycle a lot. _ = c.db.View(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) @@ -117,31 +117,25 @@ func (c *Cache) flushSmallObjects() { var count int for i := range m { - if c.flushed.Contains(m[i].addr) { - continue - } - obj := object.New() - if err := obj.Unmarshal(m[i].data); err != nil { + data := m[i].data + + if err := obj.Unmarshal(data); err != nil { continue } count++ select { - case c.flushCh <- obj: - case <-c.closeCh: - c.modeMtx.RUnlock() + case c.smallFlushCh <- objWithData{obj: obj, data: data}: + case <-c.workersChan: return } } if count == 0 { - c.modeMtx.RUnlock() break } - c.modeMtx.RUnlock() - c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache, zap.Int("count", count), zap.String("start", base58.Encode(lastKey))) @@ -157,15 +151,12 @@ func (c *Cache) flushBigObjects(ctx context.Context) { if c.readOnly() { c.modeMtx.RUnlock() break - } else if !c.initialized.Load() { - c.modeMtx.RUnlock() - continue } _ = c.flushFSTree(ctx, true) c.modeMtx.RUnlock() - case <-c.closeCh: + case <-c.workersChan: return } } @@ -187,8 +178,12 @@ func (c *Cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error { sAddr := addr.EncodeToString() - if _, ok := c.store.flushed.Peek(sAddr); ok { - return nil + select { + case <-c.workersChan: + return stopIter + case <-ctx.Done(): + return ctx.Err() + default: } data, err := f() @@ -210,7 +205,7 @@ func (c *Cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { return err } - err = c.flushObject(ctx, &obj, data) + err = c.flushObject(ctx, objWithData{obj: &obj, data: data}) if err != nil { if ignoreErrors { return nil @@ -218,13 +213,23 @@ func (c *Cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { return err } - // mark object as flushed - c.flushed.Add(sAddr, false) + err = c.dropBigObject(ctx, addr, len(data)) + if err != nil { + c.reportFlushError("can't drop an object from FSTree", sAddr, err) + if ignoreErrors { + return nil + } + return err + } return nil } _, err := c.fsTree.Iterate(prm) + if errors.Is(err, stopIter) { + return nil + } + return err } @@ -232,24 +237,32 @@ func (c *Cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { func (c *Cache) smallObjectsFlusher() { defer c.wg.Done() - var obj *object.Object + var objAndData objWithData for { // Give priority to direct put. select { - case obj = <-c.flushCh: - case <-c.closeCh: + case objAndData = <-c.smallFlushCh: + case <-c.workersChan: return } - err := c.flushObject(context.TODO(), obj, nil) + err := c.flushObject(context.TODO(), objAndData) if err == nil { - c.flushed.Add(objectCore.AddressOf(obj).EncodeToString(), true) + addr := objectCore.AddressOf(objAndData.obj) + + err = c.dropSmallObject(context.TODO(), addr) + if err != nil { + c.reportFlushError("can't drop object from write-cache", + addr.EncodeToString(), err) + } } } } // flushObject is used to write object directly to the main storage. -func (c *Cache) flushObject(ctx context.Context, obj *object.Object, data []byte) error { +func (c *Cache) flushObject(ctx context.Context, objAndData objWithData) error { + obj := objAndData.obj + data := objAndData.data addr := objectCore.AddressOf(obj) var prm common.PutPrm @@ -272,6 +285,11 @@ func (c *Cache) flushObject(ctx context.Context, obj *object.Object, data []byte _, err = c.metabase.UpdateStorageID(updPrm) if err != nil { + if errors.As(err, new(apistatus.ObjectNotFound)) || errors.As(err, new(apistatus.ObjectAlreadyRemoved)) { + // object info is outdated in the WC + return nil + } + c.reportFlushError("can't update object storage ID", addr.EncodeToString(), err) } @@ -299,16 +317,20 @@ func (c *Cache) flush(ctx context.Context, ignoreErrors bool) error { return err } - return c.db.View(func(tx *bbolt.Tx) error { + var dbFunc func(func(*bbolt.Tx) error) error + if c.readOnly() { + dbFunc = c.db.View + } else { + dbFunc = c.db.Update + } + + return dbFunc(func(tx *bbolt.Tx) error { var addr oid.Address b := tx.Bucket(defaultBucket) cs := b.Cursor() for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() { sa := string(k) - if _, ok := c.flushed.Peek(sa); ok { - continue - } if err := addr.DecodeString(sa); err != nil { c.reportFlushError("can't decode object address from the DB", sa, err) @@ -327,10 +349,101 @@ func (c *Cache) flush(ctx context.Context, ignoreErrors bool) error { return err } - if err := c.flushObject(ctx, &obj, data); err != nil { + err := c.flushObject(ctx, objWithData{obj: &obj, data: data}) + if err != nil { + if ignoreErrors { + continue + } + return err } + + if c.readOnly() { + continue + } + + removed, err := dropObject(tx, k) + if err != nil { + c.reportFlushError("can't drop an object from the DB", sa, err) + if ignoreErrors { + continue + } + } + + storagelog.Write(c.log, + storagelog.AddressField(addr), + storagelog.StorageTypeField(wcStorageType), + storagelog.OpField("db DELETE"), + ) + c.objCounters.decDB(removed) } return nil }) } + +func (c *Cache) dropSmallObject(ctx context.Context, addr oid.Address) error { + var removedBytes int + key := []byte(addr.EncodeToString()) + var err error + + err = c.db.Batch(func(tx *bbolt.Tx) error { + select { + case <-c.workersChan: + return nil + case <-ctx.Done(): + return ctx.Err() + default: + } + + removedBytes, err = dropObject(tx, key) + + return err + + }) + if err != nil { + return err + } + + storagelog.Write(c.log, + storagelog.AddressField(addr), + storagelog.StorageTypeField(wcStorageType), + storagelog.OpField("db DELETE"), + ) + + if removedBytes > 0 { + c.objCounters.decDB(removedBytes) + } + + return nil +} + +func dropObject(tx *bbolt.Tx, key []byte) (int, error) { + b := tx.Bucket(defaultBucket) + + removedBytes := len(b.Get(key)) + if removedBytes > 0 { + return removedBytes, b.Delete(key) + } + + return 0, nil +} + +func (c *Cache) dropBigObject(ctx context.Context, addr oid.Address, size int) error { + _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr}) + if err != nil { + if errors.As(err, new(apistatus.ObjectNotFound)) { + return nil + } + + return err + } + + storagelog.Write(c.log, + storagelog.AddressField(addr), + storagelog.StorageTypeField(wcStorageType), + storagelog.OpField("fstree DELETE"), + ) + c.objCounters.decFS(size) + + return nil +} diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go index c42bda255..87799b643 100644 --- a/pkg/local_object_storage/writecache/flush_test.go +++ b/pkg/local_object_storage/writecache/flush_test.go @@ -5,7 +5,6 @@ import ( "os" "path/filepath" "testing" - "time" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" @@ -106,14 +105,6 @@ func TestFlush(t *testing.T) { wc, bs, mb := newCache(t) objects := putObjects(t, wc) - require.NoError(t, bs.SetMode(mode.ReadWrite)) - require.NoError(t, mb.SetMode(mode.ReadWrite)) - - wc.flushed.Add(objects[0].addr.EncodeToString(), true) - wc.flushed.Add(objects[1].addr.EncodeToString(), false) - - require.NoError(t, wc.Flush(context.Background(), false)) - for i := 0; i < 2; i++ { var mPrm meta.GetPrm mPrm.SetAddress(objects[i].addr) @@ -124,25 +115,10 @@ func TestFlush(t *testing.T) { require.Error(t, err) } - check(t, mb, bs, objects[2:]) - }) - - t.Run("flush on moving to degraded mode", func(t *testing.T) { - wc, bs, mb := newCache(t) - objects := putObjects(t, wc) - - // Blobstor is read-only, so we expect en error from `flush` here. - require.Error(t, wc.SetMode(mode.Degraded)) - - // First move to read-only mode to close background workers. - require.NoError(t, wc.SetMode(mode.ReadOnly)) require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(mode.ReadWrite)) - wc.flushed.Add(objects[0].addr.EncodeToString(), true) - wc.flushed.Add(objects[1].addr.EncodeToString(), false) - - require.NoError(t, wc.SetMode(mode.Degraded)) + require.NoError(t, wc.Flush(context.Background(), false)) for i := 0; i < 2; i++ { var mPrm meta.GetPrm @@ -151,7 +127,40 @@ func TestFlush(t *testing.T) { require.Error(t, err) _, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr}) + require.NoError(t, err) + } + + check(t, mb, bs, objects[2:]) + }) + + t.Run("flush on moving to degraded mode", func(t *testing.T) { + wc, bs, mb := newCache(t) + objects := putObjects(t, wc) + + // Moving to the degraded mode is called with `ignoreErrors` so + // we do not expect an error from `flush` here. + require.NoError(t, wc.SetMode(mode.Degraded)) + + // bs is read-only; so is can't get the objects + for i := 0; i < 2; i++ { + var mPrm meta.GetPrm + mPrm.SetAddress(objects[i].addr) + _, err := mb.Get(context.Background(), mPrm) require.Error(t, err) + + _, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr}) + require.Error(t, err) + } + + require.NoError(t, wc.SetMode(mode.ReadWrite)) + require.NoError(t, bs.SetMode(mode.ReadWrite)) + require.NoError(t, mb.SetMode(mode.ReadWrite)) + + require.NoError(t, wc.SetMode(mode.Degraded)) + + for i := 0; i < 2; i++ { + _, err := bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr}) + require.NoError(t, err) } check(t, mb, bs, objects[2:]) @@ -166,7 +175,6 @@ func TestFlush(t *testing.T) { objects := putObjects(t, wc) f(wc) - require.NoError(t, wc.SetMode(mode.ReadOnly)) require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(mode.ReadWrite)) @@ -224,7 +232,7 @@ func TestFlush(t *testing.T) { }) }) - t.Run("on init", func(t *testing.T) { + t.Run("flush", func(t *testing.T) { wc, bs, mb := newCache(t) objects := []objectPair{ // removed @@ -260,9 +268,6 @@ func TestFlush(t *testing.T) { _, err = mb.Delete(context.Background(), deletePrm) require.NoError(t, err) - require.NoError(t, bs.SetMode(mode.ReadOnly)) - require.NoError(t, mb.SetMode(mode.ReadOnly)) - // Open in read-only: no error, nothing is removed. require.NoError(t, wc.Open(true)) initWC(t, wc) @@ -275,13 +280,17 @@ func TestFlush(t *testing.T) { // Open in read-write: no error, something is removed. require.NoError(t, wc.Open(false)) initWC(t, wc) + for i := range objects { _, err := wc.Get(context.Background(), objects[i].addr) - if i < 2 { - require.ErrorAs(t, err, new(apistatus.ObjectNotFound), i) - } else { - require.NoError(t, err, i) - } + require.NoError(t, err, i) + } + + require.NoError(t, wc.Flush(context.Background(), true)) + + for i := range objects { + _, err := wc.Get(context.Background(), objects[i].addr) + require.ErrorAs(t, err, new(apistatus.ObjectNotFound), i) } }) } @@ -321,10 +330,6 @@ func newObject(t *testing.T, size int) (*object.Object, []byte) { func initWC(t *testing.T, wc *Cache) { require.NoError(t, wc.Init()) - - require.Eventually(t, func() bool { - return wc.initialized.Load() - }, 100*time.Second, 1*time.Millisecond) } type dummyEpoch struct{} diff --git a/pkg/local_object_storage/writecache/get.go b/pkg/local_object_storage/writecache/get.go index e303b5d3d..71d05538e 100644 --- a/pkg/local_object_storage/writecache/get.go +++ b/pkg/local_object_storage/writecache/get.go @@ -30,7 +30,6 @@ func (c *Cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e value, err := Get(c.db, []byte(saddr)) if err == nil { obj := objectSDK.New() - c.flushed.Get(saddr) return obj, obj.Unmarshal(value) } @@ -39,7 +38,6 @@ func (c *Cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e return nil, logicerr.Wrap(apistatus.ObjectNotFound{}) } - c.flushed.Get(saddr) return res.Object, nil } diff --git a/pkg/local_object_storage/writecache/init.go b/pkg/local_object_storage/writecache/init.go index 28f1249f2..89c1bf351 100644 --- a/pkg/local_object_storage/writecache/init.go +++ b/pkg/local_object_storage/writecache/init.go @@ -2,191 +2,33 @@ package writecache import ( "context" - "errors" - "sync" + "fmt" - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" - storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" - meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" - apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" - "go.uber.org/zap" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" ) -func (c *Cache) initFlushMarks(ctx context.Context) { - var localWG sync.WaitGroup +// Init runs necessary services. +func (c *Cache) Init() error { + ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.Init") + defer span.End() - localWG.Add(1) - go func() { - defer localWG.Done() + c.modeMtx.Lock() + defer c.modeMtx.Unlock() - c.fsTreeFlushMarkUpdate(ctx) - }() - - localWG.Add(1) - go func() { - defer localWG.Done() - - c.dbFlushMarkUpdate(ctx) - }() - - c.initWG.Add(1) - c.wg.Add(1) - go func() { - defer c.wg.Done() - defer c.initWG.Done() - - localWG.Wait() - - select { - case <-c.stopInitCh: - return - case <-c.closeCh: - return - default: - } - - c.initialized.Store(true) - }() -} - -var errStopIter = errors.New("stop iteration") - -func (c *Cache) fsTreeFlushMarkUpdate(ctx context.Context) { - c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInFSTree) - - var prm common.IteratePrm - prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error { - select { - case <-c.closeCh: - return errStopIter - case <-c.stopInitCh: - return errStopIter - default: - } - - flushed, needRemove := c.flushStatus(ctx, addr) - if flushed { - c.store.flushed.Add(addr.EncodeToString(), true) - if needRemove { - var prm common.DeletePrm - prm.Address = addr - - _, err := c.fsTree.Delete(ctx, prm) - if err == nil { - storagelog.Write(c.log, - storagelog.AddressField(addr), - storagelog.StorageTypeField(wcStorageType), - storagelog.OpField("fstree DELETE"), - ) - } - } - } + if c.mode.NoMetabase() { return nil } - c.modeMtx.RLock() - defer c.modeMtx.RUnlock() - - _, _ = c.fsTree.Iterate(prm) - - c.log.Info(logs.WritecacheFinishedUpdatingFSTreeFlushMarks) -} - -func (c *Cache) dbFlushMarkUpdate(ctx context.Context) { - c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInDatabase) - - c.modeMtx.RLock() - defer c.modeMtx.RUnlock() - - var m []string - var indices []int - var lastKey []byte - var batchSize = flushBatchSize - for { - select { - case <-c.closeCh: - return - case <-c.stopInitCh: - return - default: - } - - m = m[:0] - indices = indices[:0] - - // We put objects in batches of fixed size to not interfere with main put cycle a lot. - _ = c.db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - cs := b.Cursor() - for k, _ := cs.Seek(lastKey); k != nil && len(m) < batchSize; k, _ = cs.Next() { - m = append(m, string(k)) - } - return nil - }) - - var addr oid.Address - for i := range m { - if err := addr.DecodeString(m[i]); err != nil { - continue - } - - flushed, needRemove := c.flushStatus(ctx, addr) - if flushed { - c.store.flushed.Add(addr.EncodeToString(), true) - if needRemove { - indices = append(indices, i) - } - } - } - - if len(m) == 0 { - break - } - - err := c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - for _, j := range indices { - if err := b.Delete([]byte(m[j])); err != nil { - return err - } - } - return nil - }) - if err == nil { - for _, j := range indices { - storagelog.Write(c.log, - zap.String("address", m[j]), - storagelog.StorageTypeField(wcStorageType), - storagelog.OpField("db DELETE"), - ) - } - } - lastKey = append([]byte(m[len(m)-1]), 0) - } - - c.log.Info(logs.WritecacheFinishedUpdatingFlushMarks) -} - -// flushStatus returns info about the object state in the main storage. -// First return value is true iff object exists. -// Second return value is true iff object can be safely removed. -func (c *Cache) flushStatus(ctx context.Context, addr oid.Address) (bool, bool) { - var existsPrm meta.ExistsPrm - existsPrm.SetAddress(addr) - - _, err := c.metabase.Exists(ctx, existsPrm) + err := c.initCounters(ctx) if err != nil { - needRemove := errors.Is(err, meta.ErrObjectIsExpired) || errors.As(err, new(apistatus.ObjectAlreadyRemoved)) - return needRemove, needRemove + return fmt.Errorf("initializing write-cache size: %w", err) } - var prm meta.StorageIDPrm - prm.SetAddress(addr) + if c.mode == mode.ReadWrite { + c.workersChan = make(chan struct{}) + c.runFlushLoop() + } - mRes, _ := c.metabase.StorageID(ctx, prm) - res, err := c.blobstor.Exists(ctx, common.ExistsPrm{Address: addr, StorageID: mRes.StorageID()}) - return err == nil && res.Exists, false + return nil } diff --git a/pkg/local_object_storage/writecache/iterate.go b/pkg/local_object_storage/writecache/iterate.go index be7c37eab..945fe08fc 100644 --- a/pkg/local_object_storage/writecache/iterate.go +++ b/pkg/local_object_storage/writecache/iterate.go @@ -41,9 +41,6 @@ func (c *Cache) Iterate(prm IterationPrm) error { err := c.db.View(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) return b.ForEach(func(k, data []byte) error { - if _, ok := c.flushed.Peek(string(k)); ok { - return nil - } return prm.handler(data) }) }) @@ -54,9 +51,6 @@ func (c *Cache) Iterate(prm IterationPrm) error { var fsPrm common.IteratePrm fsPrm.IgnoreErrors = prm.ignoreErrors fsPrm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error { - if _, ok := c.flushed.Peek(addr.EncodeToString()); ok { - return nil - } data, err := f() if err != nil { if prm.ignoreErrors { diff --git a/pkg/local_object_storage/writecache/mode.go b/pkg/local_object_storage/writecache/mode.go index 681cd84aa..b84f2942e 100644 --- a/pkg/local_object_storage/writecache/mode.go +++ b/pkg/local_object_storage/writecache/mode.go @@ -16,9 +16,6 @@ import ( // ErrReadOnly is returned when Put/Write is performed in a read-only mode. var ErrReadOnly = logicerr.New("write-cache is in read-only mode") -// ErrNotInitialized is returned when write-cache is initializing. -var ErrNotInitialized = logicerr.New("write-cache is not initialized yet") - // SetMode sets write-cache mode of operation. // When shard is put in read-only mode all objects in memory are flushed to disk // and all background jobs are suspended. @@ -35,29 +32,28 @@ func (c *Cache) SetMode(m mode.Mode) error { // setMode applies new mode. Must be called with cache.modeMtx lock taken. func (c *Cache) setMode(ctx context.Context, m mode.Mode) error { var err error - turnOffMeta := m.NoMetabase() - - if !c.initialized.Load() { - close(c.stopInitCh) - - c.initWG.Wait() - c.stopInitCh = make(chan struct{}) - - defer func() { - if err == nil && !turnOffMeta { - c.initFlushMarks(ctx) - } - }() - } c.modeMtx.Lock() defer c.modeMtx.Unlock() - if turnOffMeta && !c.mode.NoMetabase() { + var workersActive bool + select { + case <-c.workersChan: + default: + workersActive = true + } + + stopWorkers := m.NoMetabase() && !c.mode.NoMetabase() || c.mode.ReadWrite() && !m.ReadWrite() + if stopWorkers { err = c.flush(ctx, true) if err != nil { return err } + + if workersActive { + close(c.workersChan) + c.wg.Wait() + } } if c.db != nil { @@ -67,14 +63,14 @@ func (c *Cache) setMode(ctx context.Context, m mode.Mode) error { } // Suspend producers to ensure there are channel send operations in fly. - // flushCh is populated by `flush` with `modeMtx` taken, thus waiting until it is empty + // smallFlushCh is populated by `flush` with `modeMtx` taken, thus waiting until it is empty // guarantees that there are no in-fly operations. - for len(c.flushCh) != 0 { + for len(c.smallFlushCh) != 0 { c.log.Info(logs.WritecacheWaitingForChannelsToFlush) time.Sleep(time.Second) } - if turnOffMeta { + if m.NoMetabase() { c.mode = m return nil } @@ -84,6 +80,16 @@ func (c *Cache) setMode(ctx context.Context, m mode.Mode) error { } c.mode = m + + if m == mode.ReadWrite { + select { + case <-c.workersChan: + c.workersChan = make(chan struct{}) + c.runFlushLoop() + default: + } + } + return nil } diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go index 3434e9355..1d3f6bc5b 100644 --- a/pkg/local_object_storage/writecache/options.go +++ b/pkg/local_object_storage/writecache/options.go @@ -48,8 +48,6 @@ type options struct { // maxCacheSize is the maximum total size of all objects saved in cache (DB + FS). // 1 GiB by default. maxCacheSize uint64 - // objCounters contains atomic counters for the number of objects stored in cache. - objCounters counters // maxBatchSize is the maximum batch size for the small object database. maxBatchSize int // maxBatchDelay is the maximum batch wait time for the small object database. diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go index 1b513854e..f1b762871 100644 --- a/pkg/local_object_storage/writecache/put.go +++ b/pkg/local_object_storage/writecache/put.go @@ -3,6 +3,7 @@ package writecache import ( "context" "errors" + "fmt" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" @@ -37,8 +38,6 @@ func (c *Cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro defer c.modeMtx.RUnlock() if c.readOnly() { return common.PutRes{}, ErrReadOnly - } else if !c.initialized.Load() { - return common.PutRes{}, ErrNotInitialized } sz := uint64(len(prm.RawData)) @@ -52,23 +51,42 @@ func (c *Cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro data: prm.RawData, } - if sz <= c.smallObjectSize { - return common.PutRes{}, c.putSmall(oi) + if c.maxCacheSize < c.sizeIfAdd(sz) { + return common.PutRes{}, ErrOutOfSpace } - return common.PutRes{}, c.putBig(ctx, oi.addr, prm) + + if sz <= c.smallObjectSize { + err := c.putSmall(oi) + if err != nil { + err = fmt.Errorf("could not put small object to DB: %w", err) + } + + return common.PutRes{}, err + } + + err := c.putBig(ctx, oi.addr, prm) + if err != nil { + err = fmt.Errorf("could not put big object to FSTree: %w", err) + } + + return common.PutRes{}, err } // putSmall persists small objects to the write-cache database and // pushes the to the flush workers queue. func (c *Cache) putSmall(obj objectInfo) error { - cacheSize := c.estimateCacheSize() - if c.maxCacheSize < c.incSizeDB(cacheSize) { - return ErrOutOfSpace - } + var alreadyExists bool err := c.db.Batch(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) - return b.Put([]byte(obj.addr), obj.data) + addr := []byte(obj.addr) + + alreadyExists = len(b.Get(addr)) != 0 + if alreadyExists { + return nil + } + + return b.Put(addr, obj.data) }) if err == nil { storagelog.Write(c.log, @@ -76,29 +94,22 @@ func (c *Cache) putSmall(obj objectInfo) error { storagelog.StorageTypeField(wcStorageType), storagelog.OpField("db PUT"), ) - c.objCounters.IncDB() + + if !alreadyExists { + c.objCounters.incDB(len(obj.data)) + } } return err } // putBig writes object to FSTree and pushes it to the flush workers queue. func (c *Cache) putBig(ctx context.Context, addr string, prm common.PutPrm) error { - cacheSz := c.estimateCacheSize() - if c.maxCacheSize < c.incSizeFS(cacheSz) { - return ErrOutOfSpace - } - _, err := c.fsTree.Put(ctx, prm) if err != nil { return err } - if c.blobstor.NeedsCompression(prm.Object) { - c.mtx.Lock() - c.compressFlags[addr] = struct{}{} - c.mtx.Unlock() - } - c.objCounters.IncFS() + c.objCounters.incFS(len(prm.RawData)) storagelog.Write(c.log, storagelog.AddressField(addr), storagelog.StorageTypeField(wcStorageType), diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/state.go index 99f496487..c4fea0cb2 100644 --- a/pkg/local_object_storage/writecache/state.go +++ b/pkg/local_object_storage/writecache/state.go @@ -1,72 +1,134 @@ package writecache import ( + "context" + "errors" "fmt" + "sync" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" "go.uber.org/atomic" ) -func (c *Cache) estimateCacheSize() uint64 { - return c.objCounters.DB()*c.smallObjectSize + c.objCounters.FS()*c.maxObjectSize -} - -func (c *Cache) incSizeDB(sz uint64) uint64 { - return sz + c.smallObjectSize -} - -func (c *Cache) incSizeFS(sz uint64) uint64 { - return sz + c.maxObjectSize +func (c *Cache) sizeIfAdd(delta uint64) uint64 { + return delta + c.objCounters.fstreeSize.Load() + c.objCounters.dbSize.Load() } type counters struct { - cDB, cFS atomic.Uint64 + dbSize, fstreeSize atomic.Uint64 } -func (x *counters) IncDB() { - x.cDB.Inc() +func (x *counters) incDB(delta int) { + x.dbSize.Add(uint64(delta)) } -func (x *counters) DecDB() { - x.cDB.Dec() +func (x *counters) decDB(delta int) { + x.dbSize.Sub(uint64(delta)) } -func (x *counters) DB() uint64 { - return x.cDB.Load() +func (x *counters) incFS(delta int) { + x.fstreeSize.Add(uint64(delta)) } -func (x *counters) IncFS() { - x.cFS.Inc() +func (x *counters) decFS(delta int) { + x.fstreeSize.Sub(uint64(delta)) } -func (x *counters) DecFS() { - x.cFS.Dec() +func (c *Cache) initCounters(ctx context.Context) error { + var wg sync.WaitGroup + var dbErr error + var fsErr error + + wg.Add(1) + go func() { + dbErr = c.initDBSizeCounter(ctx) + wg.Done() + }() + + wg.Add(1) + go func() { + fsErr = c.initFSSizeCounter(ctx) + wg.Done() + }() + + wg.Wait() + + switch { + case dbErr != nil: + return fmt.Errorf("database counter initialization: %w", dbErr) + case fsErr != nil: + return fmt.Errorf("FSTree counter initialization: %w", fsErr) + default: + return nil + } } -func (x *counters) FS() uint64 { - return x.cFS.Load() -} +var stopIter = errors.New("stop") -func (c *Cache) initCounters() error { - var inDB uint64 +func (c *Cache) initDBSizeCounter(ctx context.Context) error { + var inDB int err := c.db.View(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) - if b != nil { - inDB = uint64(b.Stats().KeyN) + if b == nil { + return nil } - return nil + + return b.ForEach(func(_, v []byte) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-c.workersChan: + return stopIter + default: + } + + inDB += len(v) + return nil + }) }) - if err != nil { + if err != nil && !errors.Is(err, stopIter) { return fmt.Errorf("could not read write-cache DB counter: %w", err) } - inFS, err := c.fsTree.NumberOfObjects() - if err != nil { - return fmt.Errorf("could not read write-cache FS counter: %w", err) - } - - c.objCounters.cDB.Store(inDB) - c.objCounters.cFS.Store(inFS) + c.objCounters.dbSize.Store(uint64(inDB)) + + return nil +} + +func (c *Cache) initFSSizeCounter(ctx context.Context) error { + var inFSTree int + + var prm common.IteratePrm + prm.LazyHandler = func(address oid.Address, f func() ([]byte, error)) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-c.workersChan: + return stopIter + default: + } + + data, err := f() + if err != nil { + return err + } + + // write-cache is a temporary storage on a fast disk, + // so it is not expected to be configured with any + // compressor ever + inFSTree += len(data) + + return nil + } + + _, err := c.fsTree.Iterate(prm) + if err != nil && !errors.Is(err, stopIter) { + return fmt.Errorf("could not read write-cache FSTree counter: %w", err) + } + + c.objCounters.fstreeSize.Store(uint64(inFSTree)) return nil } diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index 93f4cfd80..a44a9e956 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -1,40 +1,18 @@ package writecache import ( - "context" - "errors" "fmt" "os" - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" - storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" - apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - lru "github.com/hashicorp/golang-lru/v2" - "github.com/hashicorp/golang-lru/v2/simplelru" "go.etcd.io/bbolt" - "go.uber.org/zap" ) -// store represents persistent storage with in-memory LRU cache +// smallStore represents persistent storage with in-memory LRU cache // for flushed items on top of it. -type store struct { - maxFlushedMarksCount int - maxRemoveBatchSize int - - // flushed contains addresses of objects that were already flushed to the main storage. - // We use LRU cache instead of map here to facilitate removing of unused object in favour of - // frequently read ones. - // MUST NOT be used inside bolt db transaction because it's eviction handler - // removes untracked items from the database. - flushed simplelru.LRUCache[string, bool] - db *bbolt.DB - - dbKeysToRemove []string - fsKeysToRemove []string +type smallStore struct { + db *bbolt.DB } const dbName = "small.bolt" @@ -69,101 +47,9 @@ func (c *Cache) openStore(readOnly bool) error { fstree.WithDepth(1), fstree.WithDirNameLen(1), fstree.WithNoSync(c.noSync)) - if err := c.fsTree.Open(readOnly); err != nil { + if err = c.fsTree.Open(readOnly); err != nil { return fmt.Errorf("could not open FSTree: %w", err) } - // Write-cache can be opened multiple times during `SetMode`. - // flushed map must not be re-created in this case. - if c.flushed == nil { - c.flushed, _ = lru.NewWithEvict[string, bool](c.maxFlushedMarksCount, c.removeFlushed) - } - - c.initialized.Store(false) - return nil } - -// removeFlushed removes an object from the writecache. -// To minimize interference with the client operations, the actual removal -// is done in batches. -// It is not thread-safe and is used only as an evict callback to LRU cache. -func (c *Cache) removeFlushed(key string, value bool) { - fromDatabase := value - if fromDatabase { - c.dbKeysToRemove = append(c.dbKeysToRemove, key) - } else { - c.fsKeysToRemove = append(c.fsKeysToRemove, key) - } - - if len(c.dbKeysToRemove)+len(c.fsKeysToRemove) >= c.maxRemoveBatchSize { - c.dbKeysToRemove = c.deleteFromDB(c.dbKeysToRemove) - c.fsKeysToRemove = c.deleteFromDisk(c.fsKeysToRemove) - } -} - -func (c *Cache) deleteFromDB(keys []string) []string { - if len(keys) == 0 { - return keys - } - - var errorIndex int - err := c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - for errorIndex = range keys { - if err := b.Delete([]byte(keys[errorIndex])); err != nil { - return err - } - } - return nil - }) - for i := 0; i < errorIndex; i++ { - c.objCounters.DecDB() - storagelog.Write(c.log, - storagelog.AddressField(keys[i]), - storagelog.StorageTypeField(wcStorageType), - storagelog.OpField("db DELETE"), - ) - } - if err != nil { - c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err)) - } - - copy(keys, keys[errorIndex:]) - return keys[:len(keys)-errorIndex] -} - -func (c *Cache) deleteFromDisk(keys []string) []string { - if len(keys) == 0 { - return keys - } - - var copyIndex int - var addr oid.Address - - for i := range keys { - if err := addr.DecodeString(keys[i]); err != nil { - c.log.Error(logs.WritecacheCantParseAddress, zap.String("address", keys[i])) - continue - } - - _, err := c.fsTree.Delete(context.TODO(), common.DeletePrm{Address: addr}) - if err != nil && !errors.As(err, new(apistatus.ObjectNotFound)) { - c.log.Error(logs.WritecacheCantRemoveObjectFromWritecache, zap.Error(err)) - - // Save the key for the next iteration. - keys[copyIndex] = keys[i] - copyIndex++ - continue - } else if err == nil { - storagelog.Write(c.log, - storagelog.AddressField(keys[i]), - storagelog.StorageTypeField(wcStorageType), - storagelog.OpField("fstree DELETE"), - ) - c.objCounters.DecFS() - } - } - - return keys[:copyIndex] -} diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index a58d2edbb..3f13b0b1c 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -5,13 +5,11 @@ import ( "os" "sync" - "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "go.etcd.io/bbolt" - "go.uber.org/atomic" "go.uber.org/zap" ) @@ -25,27 +23,21 @@ type Info struct { type Cache struct { options - // mtx protects statistics, counters and compressFlags. - mtx sync.RWMutex + // objCounters contains atomic counters for the number of objects stored in cache. + objCounters counters - mode mode.Mode - initialized atomic.Bool - stopInitCh chan struct{} // used to sync initWG initialisation routines and _only_ them - initWG sync.WaitGroup // for initialisation routines only - modeMtx sync.RWMutex - - // compressFlags maps address of a big object to boolean value indicating - // whether object should be compressed. - compressFlags map[string]struct{} + modeMtx sync.RWMutex + mode mode.Mode // flushCh is a channel with objects to flush. - flushCh chan *object.Object - // closeCh is close channel, protected by modeMtx. - closeCh chan struct{} + smallFlushCh chan objWithData + // workersChan is close channel, protected by modeMtx. + // It indicates status of the background workers. + workersChan chan struct{} // wg is a wait group for flush workers. wg sync.WaitGroup // store contains underlying database. - store + smallStore // fsTree contains big files stored directly on file-system. fsTree *fstree.FSTree } @@ -70,13 +62,16 @@ var ( ) // New creates new writecache instance. +// The value must not be copied after creation. func New(opts ...Option) *Cache { - c := &Cache{ - flushCh: make(chan *object.Object), - mode: mode.ReadWrite, - stopInitCh: make(chan struct{}), + closeCh := make(chan struct{}) + close(closeCh) + + c := &Cache{ + smallFlushCh: make(chan objWithData), + mode: mode.ReadWrite, + workersChan: closeCh, - compressFlags: make(map[string]struct{}), options: options{ log: &logger.Logger{Logger: zap.NewNop()}, maxObjectSize: defaultMaxObjectSize, @@ -93,12 +88,6 @@ func New(opts ...Option) *Cache { opts[i](&c.options) } - // Make the LRU cache contain which take approximately 3/4 of the maximum space. - // Assume small and big objects are stored in 50-50 proportion. - c.maxFlushedMarksCount = int(c.maxCacheSize/c.maxObjectSize+c.maxCacheSize/c.smallObjectSize) / 2 * 3 / 4 - // Trigger the removal when the cache is 7/8 full, so that new items can still arrive. - c.maxRemoveBatchSize = c.maxFlushedMarksCount / 8 - return c } @@ -120,40 +109,31 @@ func (c *Cache) Open(readOnly bool) error { return err } - // Opening after Close is done during maintenance mode, - // thus we need to create a channel here. - c.closeCh = make(chan struct{}) + c.modeMtx.Lock() + defer c.modeMtx.Unlock() - return c.initCounters() -} + if readOnly { + c.mode = mode.ReadOnly + } else { + c.mode = mode.ReadWrite + } -// Init runs necessary services. -func (c *Cache) Init() error { - ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.Init") - defer span.End() - - c.initFlushMarks(ctx) - c.runFlushLoop() return nil } // Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op. func (c *Cache) Close() error { - // Finish all in-progress operations. - if err := c.setMode(context.TODO(), mode.ReadOnly); err != nil { - return err + // Finish all in-progress operations if they are + // in progress. + select { + case <-c.workersChan: + default: + err := c.setMode(context.TODO(), mode.ReadOnly) + if err != nil { + return err + } } - if c.closeCh != nil { - close(c.closeCh) - } - c.wg.Wait() - if c.closeCh != nil { - c.closeCh = nil - } - - c.initialized.Store(false) - var err error if c.db != nil { err = c.db.Close() -- 2.45.2