frostfs-node/pkg/local_object_storage/writecache/writecachebadger/storage.go
Dmitrii Stepanov d4b6ebe7e7
All checks were successful
DCO action / DCO (pull_request) Successful in 1m33s
Build / Build Components (1.21) (pull_request) Successful in 3m11s
Build / Build Components (1.20) (pull_request) Successful in 3m22s
Vulncheck / Vulncheck (pull_request) Successful in 3m34s
Tests and linters / Tests (1.20) (pull_request) Successful in 6m0s
Tests and linters / Tests (1.21) (pull_request) Successful in 6m1s
Tests and linters / Staticcheck (pull_request) Successful in 6m4s
Tests and linters / Tests with -race (pull_request) Successful in 3m58s
Tests and linters / Lint (pull_request) Successful in 11m43s
[#725] writecache: Fix metric values
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-10-27 12:22:29 +03:00

91 lines
2.1 KiB
Go

package writecachebadger
import (
"fmt"
"os"
"path/filepath"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/dgraph-io/badger/v4"
"go.uber.org/zap"
)
// store holds a Badger database handle.
//
// NOTE(review): the previous wording described an in-memory LRU cache for
// flushed items on top of the storage; no such field is declared here, so
// that description looks stale — confirm.
type store struct {
// db is the Badger database handle.
db *badger.DB
}
// internalKey is a fixed-size byte array sized to hold a container ID
// immediately followed by an object ID. addr2key builds it from an
// oid.Address and the address method converts it back.
type internalKey [len(cid.ID{}) + len(oid.ID{})]byte
// address rebuilds an object address from the key: the leading len(cid.ID)
// bytes are taken as the container ID and the remaining bytes as the
// object ID.
func (k internalKey) address() oid.Address {
	var (
		cnr cid.ID
		obj oid.ID
	)

	// The key layout is <container ID><object ID>, so the container ID length
	// is the split point.
	split := len(cnr)
	copy(cnr[:], k[:split])
	copy(obj[:], k[split:])

	var addr oid.Address
	addr.SetContainer(cnr)
	addr.SetObject(obj)

	return addr
}
// addr2key packs an object address into an internalKey by writing the
// container ID bytes first and the object ID bytes right after them.
func addr2key(addr oid.Address) internalKey {
	cnr := addr.Container()
	obj := addr.Object()

	var key internalKey
	split := len(cnr)
	copy(key[:split], cnr[:])
	copy(key[split:], obj[:])

	return key
}
// dbName is the path element joined to the cache path to locate the database.
const dbName = "small.badger"
// openStore makes sure the cache directory exists and opens the database
// located at <c.path>/<dbName>, storing the handle in c.db.
//
// readOnly is forwarded to OpenDB. A directory creation error is returned
// as is; an OpenDB error is wrapped with "could not open database".
func (c *cache) openStore(readOnly bool) error {
	if mkErr := util.MkdirAllX(c.path, os.ModePerm); mkErr != nil {
		return mkErr
	}

	dbPath := filepath.Join(c.path, dbName)

	var openErr error
	c.db, openErr = OpenDB(dbPath, readOnly, c.log)
	if openErr != nil {
		return fmt.Errorf("could not open database: %w", openErr)
	}

	return nil
}
// deleteFromDB queues a delete for every key in a single write batch,
// flushes the batch and returns the keys that were NOT queued.
//
// Keys are processed in order. On the first wb.Delete error processing
// stops; that key and all keys after it are moved to the front of the input
// slice (which is therefore modified in place) and returned. When every key
// is queued successfully an empty slice is returned.
//
// For each queued key c.decDB is called, an eviction is reported to the
// metrics and a "db DELETE" storage log entry is written. A Flush error is
// only logged.
func (c *cache) deleteFromDB(keys []internalKey) []internalKey {
	if len(keys) == 0 {
		return keys
	}

	wb := c.db.NewWriteBatch()

	// queued is the number of leading keys accepted by the batch. It defaults
	// to len(keys) so that a loop that finishes without an error accounts for
	// the last key as well; reusing the range index for this purpose would
	// leave it at len(keys)-1 and silently skip the final key.
	queued := len(keys)
	for i := range keys {
		if err := wb.Delete(keys[i][:]); err != nil {
			queued = i
			break
		}
	}

	// NOTE(review): the accounting below runs before Flush, so a failed Flush
	// leaves counters and metrics ahead of the database — confirm this is
	// the intended trade-off.
	for i := 0; i < queued; i++ {
		c.decDB()
		c.metrics.Evict(writecache.StorageTypeDB)
		storagelog.Write(c.log,
			storagelog.AddressField(keys[i]),
			storagelog.StorageTypeField(wcStorageType),
			storagelog.OpField("db DELETE"),
		)
	}

	if err := wb.Flush(); err != nil {
		c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err))
	}

	// Shift the keys that were not queued to the front and trim the slice.
	copy(keys, keys[queued:])
	return keys[:len(keys)-queued]
}