frostfs-node/pkg/local_object_storage/writecache/writecachebbolt/storage.go
Dmitrii Stepanov d4b6ebe7e7 [#725] writecache: Fix metric values
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-10-27 12:22:29 +03:00

131 lines
3.3 KiB
Go

package writecachebbolt
import (
"context"
"fmt"
"math"
"os"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
// store represents persistent storage with in-memory LRU cache
// for flushed items on top of it.
type store struct {
db *bbolt.DB
}
const dbName = "small.bolt"
func (c *cache) openStore(readOnly bool) error {
err := util.MkdirAllX(c.path, os.ModePerm)
if err != nil {
return err
}
c.db, err = OpenDB(c.path, readOnly, c.openFile)
if err != nil {
return fmt.Errorf("could not open database: %w", err)
}
c.db.MaxBatchSize = c.maxBatchSize
c.db.MaxBatchDelay = c.maxBatchDelay
if !readOnly {
err = c.db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(defaultBucket)
return err
})
if err != nil {
return fmt.Errorf("could not create default bucket: %w", err)
}
}
c.fsTree = fstree.New(
fstree.WithPath(c.path),
fstree.WithPerm(os.ModePerm),
fstree.WithDepth(1),
fstree.WithDirNameLen(1),
fstree.WithNoSync(c.noSync),
fstree.WithFileCounter(&c.objCounters),
)
if err := c.fsTree.Open(readOnly); err != nil {
return fmt.Errorf("could not open FSTree: %w", err)
}
if err := c.fsTree.Init(); err != nil {
return fmt.Errorf("could not init FSTree: %w", err)
}
return nil
}
func (c *cache) deleteFromDB(key string) {
var recordDeleted bool
err := c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
key := []byte(key)
recordDeleted = b.Get(key) != nil
return b.Delete(key)
})
if err == nil {
c.metrics.Evict(writecache.StorageTypeDB)
storagelog.Write(c.log,
storagelog.AddressField(key),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db DELETE"),
)
if recordDeleted {
c.objCounters.cDB.Add(math.MaxUint64)
c.estimateCacheSize()
}
} else {
c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err))
}
}
func (c *cache) deleteFromDisk(ctx context.Context, keys []string) []string {
if len(keys) == 0 {
return keys
}
var copyIndex int
var addr oid.Address
for i := range keys {
if err := addr.DecodeString(keys[i]); err != nil {
c.log.Error(logs.WritecacheCantParseAddress, zap.String("address", keys[i]))
continue
}
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
if err != nil && !client.IsErrObjectNotFound(err) {
c.log.Error(logs.WritecacheCantRemoveObjectFromWritecache, zap.Error(err))
// Save the key for the next iteration.
keys[copyIndex] = keys[i]
copyIndex++
continue
} else if err == nil {
storagelog.Write(c.log,
storagelog.AddressField(keys[i]),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("fstree DELETE"),
)
c.metrics.Evict(writecache.StorageTypeFSTree)
// counter changed by fstree
c.estimateCacheSize()
}
}
return keys[:copyIndex]
}