diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 7e8bf5926..5e1c4b0de 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -516,7 +516,7 @@ const ( StartedWritecacheSealAsync = "started writecache seal async" WritecacheSealCompletedAsync = "writecache seal completed successfully" FailedToSealWritecacheAsync = "failed to seal writecache async" - WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: not empty" + WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: database is not empty" BlobovniczatreeFailedToRemoveRebuildTempFile = "failed to remove rebuild temp file" FailedToUpdateMultinetConfiguration = "failed to update multinet configuration" ) diff --git a/pkg/local_object_storage/writecache/cache.go b/pkg/local_object_storage/writecache/cachebbolt.go similarity index 92% rename from pkg/local_object_storage/writecache/cache.go rename to pkg/local_object_storage/writecache/cachebbolt.go index 796fe155b..113ce54cd 100644 --- a/pkg/local_object_storage/writecache/cache.go +++ b/pkg/local_object_storage/writecache/cachebbolt.go @@ -2,7 +2,7 @@ package writecache import ( "context" - "fmt" + "os" "sync" "sync/atomic" @@ -27,6 +27,8 @@ type cache struct { cancel atomic.Value // wg is a wait group for flush workers. wg sync.WaitGroup + // store contains underlying database. + store // fsTree contains big files stored directly on file-system. fsTree *fstree.FSTree } @@ -65,6 +67,7 @@ func New(opts ...Option) Cache { maxCacheSize: defaultMaxCacheSize, maxBatchSize: bbolt.DefaultMaxBatchSize, maxBatchDelay: bbolt.DefaultMaxBatchDelay, + openFile: os.OpenFile, metrics: DefaultMetrics(), }, } @@ -99,16 +102,14 @@ func (c *cache) Open(_ context.Context, mod mode.Mode) error { if err != nil { return metaerr.Wrap(err) } + return metaerr.Wrap(c.initCounters()) } // Init runs necessary services. func (c *cache) Init(ctx context.Context) error { c.metrics.SetMode(mode.ConvertToComponentModeDegraded(c.mode)) - if err := c.flushAndDropBBoltDB(ctx); err != nil { - return fmt.Errorf("flush previous version write-cache database: %w", err) - } - ctx, cancel := context.WithCancel(context.WithoutCancel(ctx)) // canceling performed by cache + ctx, cancel := context.WithCancel(context.Background()) c.cancel.Store(cancel) c.runFlushLoop(ctx) return nil @@ -131,10 +132,10 @@ func (c *cache) Close(ctx context.Context) error { defer c.modeMtx.Unlock() var err error - if c.fsTree != nil { - err = c.fsTree.Close(ctx) + if c.db != nil { + err = c.db.Close(ctx) if err != nil { - c.fsTree = nil + c.db = nil } } c.metrics.Close() diff --git a/pkg/local_object_storage/writecache/delete.go b/pkg/local_object_storage/writecache/delete.go index 94a0a40db..ca3ab686c 100644 --- a/pkg/local_object_storage/writecache/delete.go +++ b/pkg/local_object_storage/writecache/delete.go @@ -2,6 +2,7 @@ package writecache import ( "context" + "math" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" @@ -9,6 +10,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -43,11 +45,46 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error { return ErrDegraded } + saddr := addr.EncodeToString() + + var dataSize int + _ = c.db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + dataSize = len(b.Get([]byte(saddr))) + return nil + }) + + if dataSize > 0 { + storageType = StorageTypeDB + var recordDeleted bool + err := c.db.Update(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + key := []byte(saddr) + recordDeleted = b.Get(key) != nil + err := b.Delete(key) + return err + }) + if err != nil { + return err + } + storagelog.Write(c.log, + storagelog.AddressField(saddr), + storagelog.StorageTypeField(wcStorageType), + storagelog.OpField("db DELETE"), + ) + if recordDeleted { + c.objCounters.cDB.Add(math.MaxUint64) + c.estimateCacheSize() + } + deleted = true + return nil + } + storageType = StorageTypeFSTree _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr}) if err == nil { storagelog.Write(ctx, c.log, - storagelog.AddressField(addr.EncodeToString()), + storagelog.AddressField(saddr), storagelog.StorageTypeField(wcStorageType), storagelog.OpField("fstree DELETE"), ) diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index 094c6486d..acfb61653 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -4,9 +4,6 @@ import ( "bytes" "context" "errors" - "fmt" - "os" - "path/filepath" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" @@ -19,6 +16,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/mr-tron/base58" "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -26,6 +24,10 @@ import ( ) const ( + // flushBatchSize is amount of keys which will be read from cache to be flushed + // to the main storage. It is used to reduce contention between cache put + // and cache persist. + flushBatchSize = 512 // defaultFlushWorkersCount is number of workers for putting objects in main storage. defaultFlushWorkersCount = 20 // defaultFlushInterval is default time interval between successive flushes. @@ -39,11 +41,112 @@ func (c *cache) runFlushLoop(ctx context.Context) { if c.disableBackgroundFlush { return } + for range c.workersCount { + c.wg.Add(1) + go c.workerFlushSmall(ctx) + } + c.wg.Add(1) go func() { c.workerFlushBig(ctx) c.wg.Done() }() + + c.wg.Add(1) + go func() { + defer c.wg.Done() + + tt := time.NewTimer(defaultFlushInterval) + defer tt.Stop() + + for { + select { + case <-tt.C: + c.flushSmallObjects(ctx) + tt.Reset(defaultFlushInterval) + c.estimateCacheSize() + case <-ctx.Done(): + return + } + } + }() +} + +func (c *cache) flushSmallObjects(ctx context.Context) { + var lastKey []byte + for { + select { + case <-ctx.Done(): + return + default: + } + + var m []objectInfo + + c.modeMtx.RLock() + if c.readOnly() { + c.modeMtx.RUnlock() + time.Sleep(time.Second) + continue + } + + // We put objects in batches of fixed size to not interfere with main put cycle a lot. + _ = c.db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + cs := b.Cursor() + + var k, v []byte + + if len(lastKey) == 0 { + k, v = cs.First() + } else { + k, v = cs.Seek(lastKey) + if bytes.Equal(k, lastKey) { + k, v = cs.Next() + } + } + + for ; k != nil && len(m) < flushBatchSize; k, v = cs.Next() { + if len(lastKey) == len(k) { + copy(lastKey, k) + } else { + lastKey = bytes.Clone(k) + } + + m = append(m, objectInfo{ + addr: string(k), + data: bytes.Clone(v), + }) + } + return nil + }) + + var count int + for i := range m { + obj := objectSDK.New() + if err := obj.Unmarshal(m[i].data); err != nil { + continue + } + m[i].obj = obj + + count++ + select { + case c.flushCh <- m[i]: + case <-ctx.Done(): + c.modeMtx.RUnlock() + return + } + } + + c.modeMtx.RUnlock() + if count == 0 { + break + } + + c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache, + zap.Int("count", count), + zap.String("start", base58.Encode(lastKey))) + } } func (c *cache) workerFlushBig(ctx context.Context) { @@ -94,6 +197,9 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { err = c.flushObject(ctx, &obj, e.ObjectData, StorageTypeFSTree) if err != nil { + if ignoreErrors { + return nil + } return err } @@ -105,6 +211,29 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { return err } +// workerFlushSmall writes small objects to the main storage. +func (c *cache) workerFlushSmall(ctx context.Context) { + defer c.wg.Done() + + var objInfo objectInfo + for { + // Give priority to direct put. + select { + case objInfo = <-c.flushCh: + case <-ctx.Done(): + return + } + + err := c.flushObject(ctx, objInfo.obj, objInfo.data, StorageTypeDB) + if err != nil { + // Error is handled in flushObject. + continue + } + + c.deleteFromDB(objInfo.addr, true) + } +} + // flushObject is used to write object directly to the main storage. func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []byte, st StorageType) error { var err error @@ -171,33 +300,13 @@ func (c *cache) Flush(ctx context.Context, ignoreErrors, seal bool) error { } func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { - return c.flushFSTree(ctx, ignoreErrors) -} - -type batchItem struct { - data []byte - address string -} - -func (c *cache) flushAndDropBBoltDB(ctx context.Context) error { - _, err := os.Stat(filepath.Join(c.path, dbName)) - if err != nil && os.IsNotExist(err) { - return nil + if err := c.flushFSTree(ctx, ignoreErrors); err != nil { + return err } - if err != nil { - return fmt.Errorf("could not check write-cache database existence: %w", err) - } - db, err := OpenDB(c.path, true, os.OpenFile, c.pageSize) - if err != nil { - return fmt.Errorf("could not open write-cache database: %w", err) - } - defer func() { - _ = db.Close() - }() var last string for { - batch, err := c.readNextDBBatch(db, last) + batch, err := c.readNextDBBatch(ignoreErrors, last) if err != nil { return err } @@ -207,27 +316,32 @@ func (c *cache) flushAndDropBBoltDB(ctx context.Context) error { for _, item := range batch { var obj objectSDK.Object if err := obj.Unmarshal(item.data); err != nil { - return fmt.Errorf("unmarshal object from database: %w", err) + c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, item.address, metaerr.Wrap(err)) + if ignoreErrors { + continue + } + return err } + if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil { - return fmt.Errorf("flush object from database: %w", err) + return err } + c.deleteFromDB(item.address, false) } last = batch[len(batch)-1].address } - if err := db.Close(); err != nil { - return fmt.Errorf("close write-cache database: %w", err) - } - if err := os.Remove(filepath.Join(c.path, dbName)); err != nil { - return fmt.Errorf("remove write-cache database: %w", err) - } return nil } -func (c *cache) readNextDBBatch(db *bbolt.DB, last string) ([]batchItem, error) { +type batchItem struct { + data []byte + address string +} + +func (c *cache) readNextDBBatch(ignoreErrors bool, last string) ([]batchItem, error) { const batchSize = 100 var batch []batchItem - err := db.View(func(tx *bbolt.Tx) error { + err := c.db.View(func(tx *bbolt.Tx) error { var addr oid.Address b := tx.Bucket(defaultBucket) @@ -238,7 +352,11 @@ func (c *cache) readNextDBBatch(db *bbolt.DB, last string) ([]batchItem, error) continue } if err := addr.DecodeString(sa); err != nil { - return fmt.Errorf("decode address from database: %w", err) + c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err)) + if ignoreErrors { + continue + } + return err } batch = append(batch, batchItem{data: bytes.Clone(data), address: sa}) diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go index e507d3b86..db3d79a86 100644 --- a/pkg/local_object_storage/writecache/flush_test.go +++ b/pkg/local_object_storage/writecache/flush_test.go @@ -19,6 +19,7 @@ import ( oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" "github.com/stretchr/testify/require" + "go.etcd.io/bbolt" "go.uber.org/zap" ) @@ -46,6 +47,31 @@ func TestFlush(t *testing.T) { } failures := []TestFailureInjector[Option]{ + { + Desc: "db, invalid address", + InjectFn: func(t *testing.T, wc Cache) { + c := wc.(*cache) + obj := testutil.GenerateObject() + data, err := obj.Marshal() + require.NoError(t, err) + require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + return b.Put([]byte{1, 2, 3}, data) + })) + }, + }, + { + Desc: "db, invalid object", + InjectFn: func(t *testing.T, wc Cache) { + c := wc.(*cache) + require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + k := []byte(oidtest.Address().EncodeToString()) + v := []byte{1, 2, 3} + return b.Put(k, v) + })) + }, + }, { Desc: "fs, read error", InjectFn: func(t *testing.T, wc Cache) { @@ -237,7 +263,7 @@ func check(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPai prm.StorageID = mRes.StorageID() res, err := bs.Get(context.Background(), prm) - require.NoError(t, err, objects[i].addr) + require.NoError(t, err) require.Equal(t, objects[i].obj, res.Object) } } diff --git a/pkg/local_object_storage/writecache/get.go b/pkg/local_object_storage/writecache/get.go index c0847a65f..bf26833bd 100644 --- a/pkg/local_object_storage/writecache/get.go +++ b/pkg/local_object_storage/writecache/get.go @@ -37,11 +37,11 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e return nil, ErrDegraded } - obj, err := c.getInternal(ctx, addr) + obj, err := c.getInternal(ctx, saddr, addr) return obj, metaerr.Wrap(err) } -func (c *cache) getInternal(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) { +func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) (*objectSDK.Object, error) { found := false storageType := StorageTypeUndefined startedAt := time.Now() @@ -49,6 +49,14 @@ func (c *cache) getInternal(ctx context.Context, addr oid.Address) (*objectSDK.O c.metrics.Get(time.Since(startedAt), found, storageType) }() + value, err := Get(c.db, []byte(saddr)) + if err == nil { + obj := objectSDK.New() + found = true + storageType = StorageTypeDB + return obj, obj.Unmarshal(value) + } + res, err := c.fsTree.Get(ctx, common.GetPrm{Address: addr}) if err != nil { return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) @@ -79,7 +87,7 @@ func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, return nil, ErrDegraded } - obj, err := c.getInternal(ctx, addr) + obj, err := c.getInternal(ctx, saddr, addr) if err != nil { return nil, metaerr.Wrap(err) } diff --git a/pkg/local_object_storage/writecache/mode.go b/pkg/local_object_storage/writecache/mode.go index 73d12fd33..9a0c09f41 100644 --- a/pkg/local_object_storage/writecache/mode.go +++ b/pkg/local_object_storage/writecache/mode.go @@ -5,12 +5,13 @@ import ( "errors" "fmt" "os" + "path/filepath" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -52,7 +53,7 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode, prm setModePrm) error } } - if err := c.closeStorage(ctx, prm.shrink); err != nil { + if err := c.closeDB(prm.shrink); err != nil { return err } @@ -77,37 +78,33 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode, prm setModePrm) error return nil } -func (c *cache) closeStorage(ctx context.Context, shrink bool) error { - if c.fsTree == nil { +func (c *cache) closeDB(shrink bool) error { + if c.db == nil { return nil } if !shrink { - if err := c.fsTree.Close(ctx); err != nil { - return fmt.Errorf("can't close write-cache storage: %w", err) + if err := c.db.Close(ctx); err != nil { + return fmt.Errorf("can't close write-cache database: %w", err) } return nil } - empty := true - _, err := c.fsTree.Iterate(ctx, common.IteratePrm{ - Handler: func(common.IterationElement) error { - return errIterationCompleted - }, + var empty bool + err := c.db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + empty = b == nil || b.Stats().KeyN == 0 + return nil }) - if err != nil { - if errors.Is(err, errIterationCompleted) { - empty = false - } else { - return fmt.Errorf("failed to check write-cache items: %w", err) - } + if err != nil && !errors.Is(err, bbolt.ErrDatabaseNotOpen) { + return fmt.Errorf("failed to check DB items: %w", err) } - if err := c.fsTree.Close(ctx); err != nil { - return fmt.Errorf("can't close write-cache storage: %w", err) + if err := c.db.Close(ctx); err != nil { + return fmt.Errorf("can't close write-cache database: %w", err) } if empty { - err := os.RemoveAll(c.path) + err := os.Remove(filepath.Join(c.path, dbName)) if err != nil && !os.IsNotExist(err) { - return fmt.Errorf("failed to remove write-cache files: %w", err) + return fmt.Errorf("failed to remove DB file: %w", err) } } else { c.log.Info(ctx, logs.WritecacheShrinkSkippedNotEmpty) diff --git a/pkg/local_object_storage/writecache/mode_test.go b/pkg/local_object_storage/writecache/mode_test.go index 4fbadbc64..9facb784a 100644 --- a/pkg/local_object_storage/writecache/mode_test.go +++ b/pkg/local_object_storage/writecache/mode_test.go @@ -17,14 +17,14 @@ func TestMode(t *testing.T) { WithPath(t.TempDir())) require.NoError(t, wc.Open(context.Background(), mode.DegradedReadOnly)) - require.Nil(t, wc.(*cache).fsTree) + require.Nil(t, wc.(*cache).db) require.NoError(t, wc.Init(context.Background())) - require.Nil(t, wc.(*cache).fsTree) + require.Nil(t, wc.(*cache).db) require.NoError(t, wc.Close(context.Background())) require.NoError(t, wc.Open(context.Background(), mode.Degraded)) - require.Nil(t, wc.(*cache).fsTree) + require.Nil(t, wc.(*cache).db) require.NoError(t, wc.Init(context.Background())) - require.Nil(t, wc.(*cache).fsTree) + require.Nil(t, wc.(*cache).db) require.NoError(t, wc.Close(context.Background())) } diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go index 280359d00..96a30ed53 100644 --- a/pkg/local_object_storage/writecache/options.go +++ b/pkg/local_object_storage/writecache/options.go @@ -2,6 +2,8 @@ package writecache import ( "context" + "io/fs" + "os" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" @@ -41,6 +43,8 @@ type options struct { noSync bool // reportError is the function called when encountering disk errors in background workers. reportError func(context.Context, string, error) + // openFile is the function called internally by bbolt to open database files. Useful for hermetic testing. + openFile func(string, int, fs.FileMode) (*os.File, error) // metrics is metrics implementation metrics Metrics // disableBackgroundFlush is for testing purposes only. @@ -152,6 +156,13 @@ func WithReportErrorFunc(f func(context.Context, string, error)) Option { } } +// WithOpenFile sets the OpenFile function to use internally by bolt. Useful for hermetic testing. +func WithOpenFile(f func(string, int, fs.FileMode) (*os.File, error)) Option { + return func(o *options) { + o.openFile = f + } +} + // WithMetrics sets metrics implementation. func WithMetrics(metrics Metrics) Option { return func(o *options) { diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go index 7da5c4d3a..c423c2163 100644 --- a/pkg/local_object_storage/writecache/put.go +++ b/pkg/local_object_storage/writecache/put.go @@ -8,6 +8,7 @@ import ( storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -49,16 +50,62 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro return common.PutRes{}, ErrBigObject } + oi := objectInfo{ + addr: prm.Address.EncodeToString(), + obj: prm.Object, + data: prm.RawData, + } + + if sz <= c.smallObjectSize { + storageType = StorageTypeDB + err := c.putSmall(oi) + if err == nil { + added = true + } + return common.PutRes{}, err + } + storageType = StorageTypeFSTree - err := c.putBig(ctx, prm) + err := c.putBig(ctx, oi.addr, prm) if err == nil { added = true } return common.PutRes{}, metaerr.Wrap(err) } +// putSmall persists small objects to the write-cache database and +// pushes the to the flush workers queue. +func (c *cache) putSmall(obj objectInfo) error { + if !c.hasEnoughSpaceDB() { + return ErrOutOfSpace + } + + var newRecord bool + err := c.db.Batch(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + key := []byte(obj.addr) + newRecord = b.Get(key) == nil + if newRecord { + return b.Put(key, obj.data) + } + return nil + }) + if err == nil { + storagelog.Write(c.log, + storagelog.AddressField(obj.addr), + storagelog.StorageTypeField(wcStorageType), + storagelog.OpField("db PUT"), + ) + if newRecord { + c.objCounters.cDB.Add(1) + c.estimateCacheSize() + } + } + return err +} + // putBig writes object to FSTree and pushes it to the flush workers queue. -func (c *cache) putBig(ctx context.Context, prm common.PutPrm) error { +func (c *cache) putBig(ctx context.Context, addr string, prm common.PutPrm) error { if !c.hasEnoughSpaceFS() { return ErrOutOfSpace } @@ -69,7 +116,7 @@ func (c *cache) putBig(ctx context.Context, prm common.PutPrm) error { } storagelog.Write(ctx, c.log, - storagelog.AddressField(prm.Address.EncodeToString()), + storagelog.AddressField(addr), storagelog.StorageTypeField(wcStorageType), storagelog.OpField("fstree PUT"), ) diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/state.go index e4e22f404..d03f4a63e 100644 --- a/pkg/local_object_storage/writecache/state.go +++ b/pkg/local_object_storage/writecache/state.go @@ -1,18 +1,29 @@ package writecache import ( + "fmt" "math" "sync/atomic" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" + "go.etcd.io/bbolt" ) func (c *cache) estimateCacheSize() (uint64, uint64) { + dbCount := c.objCounters.DB() fsCount := c.objCounters.FS() + if fsCount > 0 { + fsCount-- // db file + } + dbSize := dbCount * c.smallObjectSize fsSize := fsCount * c.maxObjectSize - c.metrics.SetEstimateSize(0, fsSize) - c.metrics.SetActualCounters(0, fsCount) - return fsCount, fsSize + c.metrics.SetEstimateSize(dbSize, fsSize) + c.metrics.SetActualCounters(dbCount, fsCount) + return dbCount + fsCount, dbSize + fsSize +} + +func (c *cache) hasEnoughSpaceDB() bool { + return c.hasEnoughSpace(c.smallObjectSize) } func (c *cache) hasEnoughSpaceFS() bool { @@ -30,7 +41,11 @@ func (c *cache) hasEnoughSpace(objectSize uint64) bool { var _ fstree.FileCounter = &counters{} type counters struct { - cFS atomic.Uint64 + cDB, cFS atomic.Uint64 +} + +func (x *counters) DB() uint64 { + return x.cDB.Load() } func (x *counters) FS() uint64 { @@ -53,6 +68,18 @@ func (x *counters) Dec() { } func (c *cache) initCounters() error { + var inDB uint64 + err := c.db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + if b != nil { + inDB = uint64(b.Stats().KeyN) + } + return nil + }) + if err != nil { + return fmt.Errorf("could not read write-cache DB counter: %w", err) + } + c.objCounters.cDB.Store(inDB) c.estimateCacheSize() return nil } diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index 8631437f4..411ec34b9 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -3,6 +3,7 @@ package writecache import ( "context" "fmt" + "math" "os" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" @@ -13,9 +14,16 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.etcd.io/bbolt" "go.uber.org/zap" ) +// store represents persistent storage with in-memory LRU cache +// for flushed items on top of it. +type store struct { + db *bbolt.DB +} + const dbName = "small.bolt" func (c *cache) openStore(mod mode.ComponentMode) error { @@ -24,6 +32,24 @@ func (c *cache) openStore(mod mode.ComponentMode) error { return err } + c.db, err = OpenDB(c.path, mod.ReadOnly(), c.openFile, c.pageSize) + if err != nil { + return fmt.Errorf("could not open database: %w", err) + } + + c.db.MaxBatchSize = c.maxBatchSize + c.db.MaxBatchDelay = c.maxBatchDelay + + if !mod.ReadOnly() { + err = c.db.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(defaultBucket) + return err + }) + if err != nil { + return fmt.Errorf("could not create default bucket: %w", err) + } + } + c.fsTree = fstree.New( fstree.WithPath(c.path), fstree.WithPerm(os.ModePerm), @@ -42,6 +68,41 @@ func (c *cache) openStore(mod mode.ComponentMode) error { return nil } +func (c *cache) deleteFromDB(key string, batched bool) { + var recordDeleted bool + var err error + if batched { + err = c.db.Batch(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + key := []byte(key) + recordDeleted = b.Get(key) != nil + return b.Delete(key) + }) + } else { + err = c.db.Update(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + key := []byte(key) + recordDeleted = b.Get(key) != nil + return b.Delete(key) + }) + } + + if err == nil { + c.metrics.Evict(StorageTypeDB) + storagelog.Write(c.log, + storagelog.AddressField(key), + storagelog.StorageTypeField(wcStorageType), + storagelog.OpField("db DELETE"), + ) + if recordDeleted { + c.objCounters.cDB.Add(math.MaxUint64) + c.estimateCacheSize() + } + } else { + c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err)) + } +} + func (c *cache) deleteFromDisk(ctx context.Context, addr oid.Address) { _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr}) if err != nil && !client.IsErrObjectNotFound(err) {