[#421] Try using badger for the write-cache
Signed-off-by: Alejandro Lopez <a.lopez@yadro.com>
This commit is contained in:
parent
65c72f3e0b
commit
1a0cb0f34a
56 changed files with 2234 additions and 747 deletions
132
pkg/local_object_storage/writecache/writecachebbolt/storage.go
Normal file
132
pkg/local_object_storage/writecache/writecachebbolt/storage.go
Normal file
|
@ -0,0 +1,132 @@
|
|||
package writecachebbolt
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// store represents persistent storage with in-memory LRU cache
|
||||
// for flushed items on top of it.
|
||||
type store struct {
|
||||
db *bbolt.DB
|
||||
}
|
||||
|
||||
const dbName = "small.bolt"
|
||||
|
||||
func (c *cache) openStore(readOnly bool) error {
|
||||
err := util.MkdirAllX(c.path, os.ModePerm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.db, err = OpenDB(c.path, readOnly, c.openFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not open database: %w", err)
|
||||
}
|
||||
|
||||
c.db.MaxBatchSize = c.maxBatchSize
|
||||
c.db.MaxBatchDelay = c.maxBatchDelay
|
||||
|
||||
if !readOnly {
|
||||
err = c.db.Update(func(tx *bbolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists(defaultBucket)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create default bucket: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
c.fsTree = fstree.New(
|
||||
fstree.WithPath(c.path),
|
||||
fstree.WithPerm(os.ModePerm),
|
||||
fstree.WithDepth(1),
|
||||
fstree.WithDirNameLen(1),
|
||||
fstree.WithNoSync(c.noSync))
|
||||
if err := c.fsTree.Open(readOnly); err != nil {
|
||||
return fmt.Errorf("could not open FSTree: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cache) deleteFromDB(keys []string) []string {
|
||||
if len(keys) == 0 {
|
||||
return keys
|
||||
}
|
||||
|
||||
var errorIndex int
|
||||
err := c.db.Batch(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
for errorIndex = range keys {
|
||||
if err := b.Delete([]byte(keys[errorIndex])); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
for i := 0; i < errorIndex; i++ {
|
||||
c.objCounters.DecDB()
|
||||
c.metrics.Evict(writecache.StorageTypeDB)
|
||||
storagelog.Write(c.log,
|
||||
storagelog.AddressField(keys[i]),
|
||||
storagelog.StorageTypeField(wcStorageType),
|
||||
storagelog.OpField("db DELETE"),
|
||||
)
|
||||
}
|
||||
if err != nil {
|
||||
c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err))
|
||||
}
|
||||
|
||||
copy(keys, keys[errorIndex:])
|
||||
return keys[:len(keys)-errorIndex]
|
||||
}
|
||||
|
||||
func (c *cache) deleteFromDisk(ctx context.Context, keys []string) []string {
|
||||
if len(keys) == 0 {
|
||||
return keys
|
||||
}
|
||||
|
||||
var copyIndex int
|
||||
var addr oid.Address
|
||||
|
||||
for i := range keys {
|
||||
if err := addr.DecodeString(keys[i]); err != nil {
|
||||
c.log.Error(logs.WritecacheCantParseAddress, zap.String("address", keys[i]))
|
||||
continue
|
||||
}
|
||||
|
||||
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
|
||||
if err != nil && !errors.As(err, new(apistatus.ObjectNotFound)) {
|
||||
c.log.Error(logs.WritecacheCantRemoveObjectFromWritecache, zap.Error(err))
|
||||
|
||||
// Save the key for the next iteration.
|
||||
keys[copyIndex] = keys[i]
|
||||
copyIndex++
|
||||
continue
|
||||
} else if err == nil {
|
||||
storagelog.Write(c.log,
|
||||
storagelog.AddressField(keys[i]),
|
||||
storagelog.StorageTypeField(wcStorageType),
|
||||
storagelog.OpField("fstree DELETE"),
|
||||
)
|
||||
c.metrics.Evict(writecache.StorageTypeFSTree)
|
||||
c.objCounters.DecFS()
|
||||
}
|
||||
}
|
||||
|
||||
return keys[:copyIndex]
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue