From a8d5756faba449cf9b82df35c37dce3f5778d22f Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Mon, 9 Sep 2024 18:37:06 +0300 Subject: [PATCH] [#9999] writecache: Drop bbolt DB Signed-off-by: Dmitrii Stepanov --- internal/logs/logs.go | 2 +- .../writecache/{cachebbolt.go => cache.go} | 17 +- pkg/local_object_storage/writecache/delete.go | 39 +--- pkg/local_object_storage/writecache/flush.go | 188 ++++-------------- .../writecache/flush_test.go | 28 +-- pkg/local_object_storage/writecache/get.go | 14 +- pkg/local_object_storage/writecache/mode.go | 39 ++-- .../writecache/mode_test.go | 8 +- .../writecache/options.go | 11 - pkg/local_object_storage/writecache/put.go | 53 +---- pkg/local_object_storage/writecache/state.go | 35 +--- .../writecache/storage.go | 54 +---- 12 files changed, 89 insertions(+), 399 deletions(-) rename pkg/local_object_storage/writecache/{cachebbolt.go => cache.go} (92%) diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 97b189529..87e4e0b43 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -542,6 +542,6 @@ const ( StartedWritecacheSealAsync = "started writecache seal async" WritecacheSealCompletedAsync = "writecache seal completed successfully" FailedToSealWritecacheAsync = "failed to seal writecache async" - WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: database is not empty" + WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: not empty" BlobovniczatreeFailedToRemoveRebuildTempFile = "failed to remove rebuild temp file" ) diff --git a/pkg/local_object_storage/writecache/cachebbolt.go b/pkg/local_object_storage/writecache/cache.go similarity index 92% rename from pkg/local_object_storage/writecache/cachebbolt.go rename to pkg/local_object_storage/writecache/cache.go index f1e6a619a..237134a3e 100644 --- a/pkg/local_object_storage/writecache/cachebbolt.go +++ b/pkg/local_object_storage/writecache/cache.go @@ -2,7 +2,7 @@ package writecache import ( "context" - "os" + "fmt" "sync" "sync/atomic" @@ -27,8 +27,6 @@ type cache struct { cancel atomic.Value // wg is a wait group for flush workers. wg sync.WaitGroup - // store contains underlying database. - store // fsTree contains big files stored directly on file-system. fsTree *fstree.FSTree } @@ -67,7 +65,6 @@ func New(opts ...Option) Cache { maxCacheSize: defaultMaxCacheSize, maxBatchSize: bbolt.DefaultMaxBatchSize, maxBatchDelay: bbolt.DefaultMaxBatchDelay, - openFile: os.OpenFile, metrics: DefaultMetrics(), }, } @@ -91,7 +88,7 @@ func (c *cache) DumpInfo() Info { } // Open opens and initializes database. Reads object counters from the ObjectCounters instance. -func (c *cache) Open(_ context.Context, mod mode.Mode) error { +func (c *cache) Open(ctx context.Context, mod mode.Mode) error { c.modeMtx.Lock() defer c.modeMtx.Unlock() c.mode = mod @@ -102,13 +99,15 @@ func (c *cache) Open(_ context.Context, mod mode.Mode) error { if err != nil { return metaerr.Wrap(err) } - return metaerr.Wrap(c.initCounters()) } // Init runs necessary services. func (c *cache) Init() error { c.metrics.SetMode(mode.ConvertToComponentModeDegraded(c.mode)) + if err := c.flushAndDropBBoltDB(context.Background()); err != nil { + return fmt.Errorf("flush previous version write-cache database: %w", err) + } ctx, cancel := context.WithCancel(context.Background()) c.cancel.Store(cancel) c.runFlushLoop(ctx) @@ -132,10 +131,10 @@ func (c *cache) Close() error { defer c.modeMtx.Unlock() var err error - if c.db != nil { - err = c.db.Close() + if c.fsTree != nil { + err = c.fsTree.Close() if err != nil { - c.db = nil + c.fsTree = nil } } c.metrics.Close() diff --git a/pkg/local_object_storage/writecache/delete.go b/pkg/local_object_storage/writecache/delete.go index b1a0511ee..dda284439 100644 --- a/pkg/local_object_storage/writecache/delete.go +++ b/pkg/local_object_storage/writecache/delete.go @@ -2,7 +2,6 @@ package writecache import ( "context" - "math" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" @@ -10,7 +9,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -45,46 +43,11 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error { return ErrDegraded } - saddr := addr.EncodeToString() - - var dataSize int - _ = c.db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - dataSize = len(b.Get([]byte(saddr))) - return nil - }) - - if dataSize > 0 { - storageType = StorageTypeDB - var recordDeleted bool - err := c.db.Update(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - key := []byte(saddr) - recordDeleted = b.Get(key) != nil - err := b.Delete(key) - return err - }) - if err != nil { - return err - } - storagelog.Write(c.log, - storagelog.AddressField(saddr), - storagelog.StorageTypeField(wcStorageType), - storagelog.OpField("db DELETE"), - ) - if recordDeleted { - c.objCounters.cDB.Add(math.MaxUint64) - c.estimateCacheSize() - } - deleted = true - return nil - } - storageType = StorageTypeFSTree _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr}) if err == nil { storagelog.Write(c.log, - storagelog.AddressField(saddr), + storagelog.AddressField(addr.EncodeToString()), storagelog.StorageTypeField(wcStorageType), storagelog.OpField("fstree DELETE"), ) diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index 930ac8431..c2a916baa 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -4,6 +4,9 @@ import ( "bytes" "context" "errors" + "fmt" + "os" + "path/filepath" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" @@ -16,7 +19,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "github.com/mr-tron/base58" "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -41,112 +43,11 @@ func (c *cache) runFlushLoop(ctx context.Context) { if c.disableBackgroundFlush { return } - for range c.workersCount { - c.wg.Add(1) - go c.workerFlushSmall(ctx) - } - c.wg.Add(1) go func() { c.workerFlushBig(ctx) c.wg.Done() }() - - c.wg.Add(1) - go func() { - defer c.wg.Done() - - tt := time.NewTimer(defaultFlushInterval) - defer tt.Stop() - - for { - select { - case <-tt.C: - c.flushSmallObjects(ctx) - tt.Reset(defaultFlushInterval) - c.estimateCacheSize() - case <-ctx.Done(): - return - } - } - }() -} - -func (c *cache) flushSmallObjects(ctx context.Context) { - var lastKey []byte - for { - select { - case <-ctx.Done(): - return - default: - } - - var m []objectInfo - - c.modeMtx.RLock() - if c.readOnly() { - c.modeMtx.RUnlock() - time.Sleep(time.Second) - continue - } - - // We put objects in batches of fixed size to not interfere with main put cycle a lot. - _ = c.db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - cs := b.Cursor() - - var k, v []byte - - if len(lastKey) == 0 { - k, v = cs.First() - } else { - k, v = cs.Seek(lastKey) - if bytes.Equal(k, lastKey) { - k, v = cs.Next() - } - } - - for ; k != nil && len(m) < flushBatchSize; k, v = cs.Next() { - if len(lastKey) == len(k) { - copy(lastKey, k) - } else { - lastKey = bytes.Clone(k) - } - - m = append(m, objectInfo{ - addr: string(k), - data: bytes.Clone(v), - }) - } - return nil - }) - - var count int - for i := range m { - obj := objectSDK.New() - if err := obj.Unmarshal(m[i].data); err != nil { - continue - } - m[i].obj = obj - - count++ - select { - case c.flushCh <- m[i]: - case <-ctx.Done(): - c.modeMtx.RUnlock() - return - } - } - - c.modeMtx.RUnlock() - if count == 0 { - break - } - - c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache, - zap.Int("count", count), - zap.String("start", base58.Encode(lastKey))) - } } func (c *cache) workerFlushBig(ctx context.Context) { @@ -197,9 +98,6 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { err = c.flushObject(ctx, &obj, e.ObjectData, StorageTypeFSTree) if err != nil { - if ignoreErrors { - return nil - } return err } @@ -211,29 +109,6 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { return err } -// workerFlushSmall writes small objects to the main storage. -func (c *cache) workerFlushSmall(ctx context.Context) { - defer c.wg.Done() - - var objInfo objectInfo - for { - // Give priority to direct put. - select { - case objInfo = <-c.flushCh: - case <-ctx.Done(): - return - } - - err := c.flushObject(ctx, objInfo.obj, objInfo.data, StorageTypeDB) - if err != nil { - // Error is handled in flushObject. - continue - } - - c.deleteFromDB(objInfo.addr, true) - } -} - // flushObject is used to write object directly to the main storage. func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []byte, st StorageType) error { var err error @@ -300,13 +175,33 @@ func (c *cache) Flush(ctx context.Context, ignoreErrors, seal bool) error { } func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { - if err := c.flushFSTree(ctx, ignoreErrors); err != nil { - return err + return c.flushFSTree(ctx, ignoreErrors) +} + +type batchItem struct { + data []byte + address string +} + +func (c *cache) flushAndDropBBoltDB(ctx context.Context) error { + _, err := os.Stat(filepath.Join(c.path, dbName)) + if err != nil && os.IsNotExist(err) { + return nil } + if err != nil { + return fmt.Errorf("could not check write-cache database existence: %w", err) + } + db, err := OpenDB(c.path, true, os.OpenFile, c.pageSize) + if err != nil { + return fmt.Errorf("could not open write-cache database: %w", err) + } + defer func() { + _ = db.Close() + }() var last string for { - batch, err := c.readNextDBBatch(ignoreErrors, last) + batch, err := c.readNextDBBatch(db, last) if err != nil { return err } @@ -316,32 +211,27 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { for _, item := range batch { var obj objectSDK.Object if err := obj.Unmarshal(item.data); err != nil { - c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, item.address, metaerr.Wrap(err)) - if ignoreErrors { - continue - } - return err + return fmt.Errorf("unmarshal object from database: %w", err) } - if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil { - return err + return fmt.Errorf("flush object from database: %w", err) } - c.deleteFromDB(item.address, false) } last = batch[len(batch)-1].address } + if err := db.Close(); err != nil { + return fmt.Errorf("close write-cache database: %w", err) + } + if err := os.Remove(filepath.Join(c.path, dbName)); err != nil { + return fmt.Errorf("remove write-cache database: %w", err) + } return nil } -type batchItem struct { - data []byte - address string -} - -func (c *cache) readNextDBBatch(ignoreErrors bool, last string) ([]batchItem, error) { +func (c *cache) readNextDBBatch(db *bbolt.DB, last string) ([]batchItem, error) { const batchSize = 100 var batch []batchItem - err := c.db.View(func(tx *bbolt.Tx) error { + err := db.View(func(tx *bbolt.Tx) error { var addr oid.Address b := tx.Bucket(defaultBucket) @@ -352,11 +242,7 @@ func (c *cache) readNextDBBatch(ignoreErrors bool, last string) ([]batchItem, er continue } if err := addr.DecodeString(sa); err != nil { - c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err)) - if ignoreErrors { - continue - } - return err + return fmt.Errorf("decode address from database: %w", err) } batch = append(batch, batchItem{data: bytes.Clone(data), address: sa}) diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go index a637da45d..9c7e240e0 100644 --- a/pkg/local_object_storage/writecache/flush_test.go +++ b/pkg/local_object_storage/writecache/flush_test.go @@ -19,7 +19,6 @@ import ( oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" "github.com/stretchr/testify/require" - "go.etcd.io/bbolt" "go.uber.org/zap" ) @@ -47,31 +46,6 @@ func TestFlush(t *testing.T) { } failures := []TestFailureInjector[Option]{ - { - Desc: "db, invalid address", - InjectFn: func(t *testing.T, wc Cache) { - c := wc.(*cache) - obj := testutil.GenerateObject() - data, err := obj.Marshal() - require.NoError(t, err) - require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - return b.Put([]byte{1, 2, 3}, data) - })) - }, - }, - { - Desc: "db, invalid object", - InjectFn: func(t *testing.T, wc Cache) { - c := wc.(*cache) - require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - k := []byte(oidtest.Address().EncodeToString()) - v := []byte{1, 2, 3} - return b.Put(k, v) - })) - }, - }, { Desc: "fs, read error", InjectFn: func(t *testing.T, wc Cache) { @@ -263,7 +237,7 @@ func check(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPai prm.StorageID = mRes.StorageID() res, err := bs.Get(context.Background(), prm) - require.NoError(t, err) + require.NoError(t, err, objects[i].addr) require.Equal(t, objects[i].obj, res.Object) } } diff --git a/pkg/local_object_storage/writecache/get.go b/pkg/local_object_storage/writecache/get.go index bf26833bd..c0847a65f 100644 --- a/pkg/local_object_storage/writecache/get.go +++ b/pkg/local_object_storage/writecache/get.go @@ -37,11 +37,11 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e return nil, ErrDegraded } - obj, err := c.getInternal(ctx, saddr, addr) + obj, err := c.getInternal(ctx, addr) return obj, metaerr.Wrap(err) } -func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) (*objectSDK.Object, error) { +func (c *cache) getInternal(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) { found := false storageType := StorageTypeUndefined startedAt := time.Now() @@ -49,14 +49,6 @@ func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) c.metrics.Get(time.Since(startedAt), found, storageType) }() - value, err := Get(c.db, []byte(saddr)) - if err == nil { - obj := objectSDK.New() - found = true - storageType = StorageTypeDB - return obj, obj.Unmarshal(value) - } - res, err := c.fsTree.Get(ctx, common.GetPrm{Address: addr}) if err != nil { return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) @@ -87,7 +79,7 @@ func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, return nil, ErrDegraded } - obj, err := c.getInternal(ctx, saddr, addr) + obj, err := c.getInternal(ctx, addr) if err != nil { return nil, metaerr.Wrap(err) } diff --git a/pkg/local_object_storage/writecache/mode.go b/pkg/local_object_storage/writecache/mode.go index 44da9b36e..ff1d04774 100644 --- a/pkg/local_object_storage/writecache/mode.go +++ b/pkg/local_object_storage/writecache/mode.go @@ -5,13 +5,12 @@ import ( "errors" "fmt" "os" - "path/filepath" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" - "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -53,7 +52,7 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode, prm setModePrm) error } } - if err := c.closeDB(prm.shrink); err != nil { + if err := c.closeStorage(ctx, prm.shrink); err != nil { return err } @@ -78,33 +77,37 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode, prm setModePrm) error return nil } -func (c *cache) closeDB(shrink bool) error { - if c.db == nil { +func (c *cache) closeStorage(ctx context.Context, shrink bool) error { + if c.fsTree == nil { return nil } if !shrink { - if err := c.db.Close(); err != nil { - return fmt.Errorf("can't close write-cache database: %w", err) + if err := c.fsTree.Close(); err != nil { + return fmt.Errorf("can't close write-cache storage: %w", err) } return nil } - var empty bool - err := c.db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - empty = b == nil || b.Stats().KeyN == 0 - return nil + empty := true + _, err := c.fsTree.Iterate(ctx, common.IteratePrm{ + Handler: func(ie common.IterationElement) error { + return errIterationCompleted + }, }) - if err != nil && !errors.Is(err, bbolt.ErrDatabaseNotOpen) { - return fmt.Errorf("failed to check DB items: %w", err) + if err != nil { + if errors.Is(err, errIterationCompleted) { + empty = false + } else { + return fmt.Errorf("failed to check write-cache items: %w", err) + } } - if err := c.db.Close(); err != nil { - return fmt.Errorf("can't close write-cache database: %w", err) + if err := c.fsTree.Close(); err != nil { + return fmt.Errorf("can't close write-cache storage: %w", err) } if empty { - err := os.Remove(filepath.Join(c.path, dbName)) + err := os.RemoveAll(c.path) if err != nil && !os.IsNotExist(err) { - return fmt.Errorf("failed to remove DB file: %w", err) + return fmt.Errorf("failed to remove write-cache files: %w", err) } } else { c.log.Info(logs.WritecacheShrinkSkippedNotEmpty) diff --git a/pkg/local_object_storage/writecache/mode_test.go b/pkg/local_object_storage/writecache/mode_test.go index f684c15bc..70cfe8382 100644 --- a/pkg/local_object_storage/writecache/mode_test.go +++ b/pkg/local_object_storage/writecache/mode_test.go @@ -17,14 +17,14 @@ func TestMode(t *testing.T) { WithPath(t.TempDir())) require.NoError(t, wc.Open(context.Background(), mode.DegradedReadOnly)) - require.Nil(t, wc.(*cache).db) + require.Nil(t, wc.(*cache).fsTree) require.NoError(t, wc.Init()) - require.Nil(t, wc.(*cache).db) + require.Nil(t, wc.(*cache).fsTree) require.NoError(t, wc.Close()) require.NoError(t, wc.Open(context.Background(), mode.Degraded)) - require.Nil(t, wc.(*cache).db) + require.Nil(t, wc.(*cache).fsTree) require.NoError(t, wc.Init()) - require.Nil(t, wc.(*cache).db) + require.Nil(t, wc.(*cache).fsTree) require.NoError(t, wc.Close()) } diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go index 980cf9303..7845c5da9 100644 --- a/pkg/local_object_storage/writecache/options.go +++ b/pkg/local_object_storage/writecache/options.go @@ -1,8 +1,6 @@ package writecache import ( - "io/fs" - "os" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" @@ -42,8 +40,6 @@ type options struct { noSync bool // reportError is the function called when encountering disk errors in background workers. reportError func(string, error) - // openFile is the function called internally by bbolt to open database files. Useful for hermetic testing. - openFile func(string, int, fs.FileMode) (*os.File, error) // metrics is metrics implementation metrics Metrics // disableBackgroundFlush is for testing purposes only. @@ -155,13 +151,6 @@ func WithReportErrorFunc(f func(string, error)) Option { } } -// WithOpenFile sets the OpenFile function to use internally by bolt. Useful for hermetic testing. -func WithOpenFile(f func(string, int, fs.FileMode) (*os.File, error)) Option { - return func(o *options) { - o.openFile = f - } -} - // WithMetrics sets metrics implementation. func WithMetrics(metrics Metrics) Option { return func(o *options) { diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go index ae0e8b77a..c53067bea 100644 --- a/pkg/local_object_storage/writecache/put.go +++ b/pkg/local_object_storage/writecache/put.go @@ -8,7 +8,6 @@ import ( storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" - "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -50,62 +49,16 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro return common.PutRes{}, ErrBigObject } - oi := objectInfo{ - addr: prm.Address.EncodeToString(), - obj: prm.Object, - data: prm.RawData, - } - - if sz <= c.smallObjectSize { - storageType = StorageTypeDB - err := c.putSmall(oi) - if err == nil { - added = true - } - return common.PutRes{}, err - } - storageType = StorageTypeFSTree - err := c.putBig(ctx, oi.addr, prm) + err := c.putBig(ctx, prm) if err == nil { added = true } return common.PutRes{}, metaerr.Wrap(err) } -// putSmall persists small objects to the write-cache database and -// pushes the to the flush workers queue. -func (c *cache) putSmall(obj objectInfo) error { - if !c.hasEnoughSpaceDB() { - return ErrOutOfSpace - } - - var newRecord bool - err := c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - key := []byte(obj.addr) - newRecord = b.Get(key) == nil - if newRecord { - return b.Put(key, obj.data) - } - return nil - }) - if err == nil { - storagelog.Write(c.log, - storagelog.AddressField(obj.addr), - storagelog.StorageTypeField(wcStorageType), - storagelog.OpField("db PUT"), - ) - if newRecord { - c.objCounters.cDB.Add(1) - c.estimateCacheSize() - } - } - return err -} - // putBig writes object to FSTree and pushes it to the flush workers queue. -func (c *cache) putBig(ctx context.Context, addr string, prm common.PutPrm) error { +func (c *cache) putBig(ctx context.Context, prm common.PutPrm) error { if !c.hasEnoughSpaceFS() { return ErrOutOfSpace } @@ -116,7 +69,7 @@ func (c *cache) putBig(ctx context.Context, addr string, prm common.PutPrm) erro } storagelog.Write(c.log, - storagelog.AddressField(addr), + storagelog.AddressField(prm.Address.EncodeToString()), storagelog.StorageTypeField(wcStorageType), storagelog.OpField("fstree PUT"), ) diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/state.go index d03f4a63e..e4e22f404 100644 --- a/pkg/local_object_storage/writecache/state.go +++ b/pkg/local_object_storage/writecache/state.go @@ -1,29 +1,18 @@ package writecache import ( - "fmt" "math" "sync/atomic" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" - "go.etcd.io/bbolt" ) func (c *cache) estimateCacheSize() (uint64, uint64) { - dbCount := c.objCounters.DB() fsCount := c.objCounters.FS() - if fsCount > 0 { - fsCount-- // db file - } - dbSize := dbCount * c.smallObjectSize fsSize := fsCount * c.maxObjectSize - c.metrics.SetEstimateSize(dbSize, fsSize) - c.metrics.SetActualCounters(dbCount, fsCount) - return dbCount + fsCount, dbSize + fsSize -} - -func (c *cache) hasEnoughSpaceDB() bool { - return c.hasEnoughSpace(c.smallObjectSize) + c.metrics.SetEstimateSize(0, fsSize) + c.metrics.SetActualCounters(0, fsCount) + return fsCount, fsSize } func (c *cache) hasEnoughSpaceFS() bool { @@ -41,11 +30,7 @@ func (c *cache) hasEnoughSpace(objectSize uint64) bool { var _ fstree.FileCounter = &counters{} type counters struct { - cDB, cFS atomic.Uint64 -} - -func (x *counters) DB() uint64 { - return x.cDB.Load() + cFS atomic.Uint64 } func (x *counters) FS() uint64 { @@ -68,18 +53,6 @@ func (x *counters) Dec() { } func (c *cache) initCounters() error { - var inDB uint64 - err := c.db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - if b != nil { - inDB = uint64(b.Stats().KeyN) - } - return nil - }) - if err != nil { - return fmt.Errorf("could not read write-cache DB counter: %w", err) - } - c.objCounters.cDB.Store(inDB) c.estimateCacheSize() return nil } diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index 57021cc17..0347cee51 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -3,7 +3,6 @@ package writecache import ( "context" "fmt" - "math" "os" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" @@ -18,12 +17,6 @@ import ( "go.uber.org/zap" ) -// store represents persistent storage with in-memory LRU cache -// for flushed items on top of it. -type store struct { - db *bbolt.DB -} - const dbName = "small.bolt" func (c *cache) openStore(mod mode.ComponentMode) error { @@ -32,24 +25,6 @@ func (c *cache) openStore(mod mode.ComponentMode) error { return err } - c.db, err = OpenDB(c.path, mod.ReadOnly(), c.openFile, c.pageSize) - if err != nil { - return fmt.Errorf("could not open database: %w", err) - } - - c.db.MaxBatchSize = c.maxBatchSize - c.db.MaxBatchDelay = c.maxBatchDelay - - if !mod.ReadOnly() { - err = c.db.Update(func(tx *bbolt.Tx) error { - _, err := tx.CreateBucketIfNotExists(defaultBucket) - return err - }) - if err != nil { - return fmt.Errorf("could not create default bucket: %w", err) - } - } - c.fsTree = fstree.New( fstree.WithPath(c.path), fstree.WithPerm(os.ModePerm), @@ -68,36 +43,19 @@ func (c *cache) openStore(mod mode.ComponentMode) error { return nil } -func (c *cache) deleteFromDB(key string, batched bool) { - var recordDeleted bool - var err error - if batched { - err = c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - key := []byte(key) - recordDeleted = b.Get(key) != nil - return b.Delete(key) - }) - } else { - err = c.db.Update(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - key := []byte(key) - recordDeleted = b.Get(key) != nil - return b.Delete(key) - }) - } +func (c *cache) deleteFromDB(db *bbolt.DB, key string) { + err := db.Update(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + key := []byte(key) + return b.Delete(key) + }) if err == nil { - c.metrics.Evict(StorageTypeDB) storagelog.Write(c.log, storagelog.AddressField(key), storagelog.StorageTypeField(wcStorageType), storagelog.OpField("db DELETE"), ) - if recordDeleted { - c.objCounters.cDB.Add(math.MaxUint64) - c.estimateCacheSize() - } } else { c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err)) }