forked from TrueCloudLab/frostfs-node
91 lines
2.1 KiB
Go
91 lines
2.1 KiB
Go
package writecachebadger
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"github.com/dgraph-io/badger/v4"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// store represents persistent storage with in-memory LRU cache
|
|
// for flushed items on top of it.
|
|
type store struct {
|
|
db *badger.DB
|
|
}
|
|
|
|
type internalKey [len(cid.ID{}) + len(oid.ID{})]byte
|
|
|
|
func (k internalKey) address() oid.Address {
|
|
var addr oid.Address
|
|
var cnr cid.ID
|
|
var obj oid.ID
|
|
copy(cnr[:], k[:len(cnr)])
|
|
copy(obj[:], k[len(cnr):])
|
|
addr.SetContainer(cnr)
|
|
addr.SetObject(obj)
|
|
return addr
|
|
}
|
|
|
|
func addr2key(addr oid.Address) internalKey {
|
|
var key internalKey
|
|
cnr, obj := addr.Container(), addr.Object()
|
|
copy(key[:len(cnr)], cnr[:])
|
|
copy(key[len(cnr):], obj[:])
|
|
return key
|
|
}
|
|
|
|
const dbName = "small.badger"
|
|
|
|
func (c *cache) openStore(readOnly bool) error {
|
|
err := util.MkdirAllX(c.path, os.ModePerm)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
c.db, err = OpenDB(filepath.Join(c.path, dbName), readOnly, c.log)
|
|
if err != nil {
|
|
return fmt.Errorf("could not open database: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *cache) deleteFromDB(keys []string) []string {
|
|
if len(keys) == 0 {
|
|
return keys
|
|
}
|
|
|
|
wb := c.db.NewWriteBatch()
|
|
|
|
var errorIndex int
|
|
for errorIndex = range keys {
|
|
if err := wb.Delete([]byte(keys[errorIndex])); err != nil {
|
|
break
|
|
}
|
|
}
|
|
|
|
for i := 0; i < errorIndex; i++ {
|
|
c.objCounters.DecDB()
|
|
c.metrics.Evict(writecache.StorageTypeDB)
|
|
storagelog.Write(c.log,
|
|
storagelog.AddressField(keys[i]),
|
|
storagelog.StorageTypeField(wcStorageType),
|
|
storagelog.OpField("db DELETE"),
|
|
)
|
|
}
|
|
|
|
if err := wb.Flush(); err != nil {
|
|
c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err))
|
|
}
|
|
|
|
copy(keys, keys[errorIndex:])
|
|
return keys[:len(keys)-errorIndex]
|
|
}
|