[#1335] writecache: Change DB engine to Pebble
Some checks failed
Tests and linters / Tests with -race (pull_request) Failing after 12s
Tests and linters / Run gofumpt (pull_request) Successful in 1m14s
DCO action / DCO (pull_request) Successful in 1m26s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m51s
Vulncheck / Vulncheck (pull_request) Successful in 1m48s
Build / Build Components (pull_request) Successful in 2m2s
Tests and linters / Tests (pull_request) Successful in 3m7s
Tests and linters / Staticcheck (pull_request) Successful in 3m9s
Tests and linters / Lint (pull_request) Successful in 3m45s
Tests and linters / gopls check (pull_request) Successful in 3m56s
Some checks failed
Tests and linters / Tests with -race (pull_request) Failing after 12s
Tests and linters / Run gofumpt (pull_request) Successful in 1m14s
DCO action / DCO (pull_request) Successful in 1m26s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m51s
Vulncheck / Vulncheck (pull_request) Successful in 1m48s
Build / Build Components (pull_request) Successful in 2m2s
Tests and linters / Tests (pull_request) Successful in 3m7s
Tests and linters / Staticcheck (pull_request) Successful in 3m9s
Tests and linters / Lint (pull_request) Successful in 3m45s
Tests and linters / gopls check (pull_request) Successful in 3m56s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
7768a482b5
commit
5b9928536d
16 changed files with 266 additions and 246 deletions
|
@ -1,8 +1,6 @@
|
|||
package writecache
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
|
@ -25,7 +23,7 @@ func init() {
|
|||
func inspectFunc(cmd *cobra.Command, _ []string) {
|
||||
var data []byte
|
||||
|
||||
db, err := writecache.OpenDB(vPath, true, os.OpenFile, 0)
|
||||
db, err := writecache.OpenDB(vPath, true)
|
||||
common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err))
|
||||
defer db.Close()
|
||||
|
||||
|
|
|
@ -3,7 +3,6 @@ package writecache
|
|||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||
|
@ -31,7 +30,7 @@ func listFunc(cmd *cobra.Command, _ []string) {
|
|||
return err
|
||||
}
|
||||
|
||||
db, err := writecache.OpenDB(vPath, true, os.OpenFile, 0)
|
||||
db, err := writecache.OpenDB(vPath, true)
|
||||
common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err))
|
||||
defer db.Close()
|
||||
|
||||
|
|
13
go.mod
13
go.mod
|
@ -17,6 +17,7 @@ require (
|
|||
github.com/VictoriaMetrics/easyproto v0.1.4
|
||||
github.com/cheggaaa/pb v1.0.29
|
||||
github.com/chzyer/readline v1.5.1
|
||||
github.com/cockroachdb/pebble v1.1.2
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
|
||||
github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568
|
||||
github.com/gdamore/tcell/v2 v2.7.4
|
||||
|
@ -60,11 +61,17 @@ require (
|
|||
|
||||
require (
|
||||
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect
|
||||
github.com/DataDog/zstd v1.4.5 // indirect
|
||||
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/bits-and-blooms/bitset v1.13.0 // indirect
|
||||
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/cockroachdb/errors v1.11.3 // indirect
|
||||
github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce // indirect
|
||||
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
|
||||
github.com/cockroachdb/redact v1.1.5 // indirect
|
||||
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
|
||||
github.com/consensys/bavard v0.1.13 // indirect
|
||||
github.com/consensys/gnark-crypto v0.12.2-0.20231222162921-eb75782795d2 // indirect
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
|
||||
|
@ -72,9 +79,11 @@ require (
|
|||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
|
||||
github.com/fsnotify/fsnotify v1.7.0 // indirect
|
||||
github.com/gdamore/encoding v1.0.0 // indirect
|
||||
github.com/getsentry/sentry-go v0.27.0 // indirect
|
||||
github.com/go-fed/httpsig v1.1.0 // indirect
|
||||
github.com/go-logr/logr v1.4.1 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/golang/snappy v0.0.4 // indirect
|
||||
github.com/gorilla/websocket v1.5.1 // indirect
|
||||
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0 // indirect
|
||||
|
@ -88,6 +97,8 @@ require (
|
|||
github.com/josharian/intern v1.0.0 // indirect
|
||||
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
|
||||
github.com/klauspost/reedsolomon v1.12.1 // indirect
|
||||
github.com/kr/pretty v0.3.1 // indirect
|
||||
github.com/kr/text v0.2.0 // indirect
|
||||
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
|
||||
github.com/magiconair/properties v1.8.7 // indirect
|
||||
github.com/mattn/go-runewidth v0.0.15 // indirect
|
||||
|
@ -103,11 +114,13 @@ require (
|
|||
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240521091047-78685785716d // indirect
|
||||
github.com/nspcc-dev/rfc6979 v0.2.1 // indirect
|
||||
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
||||
github.com/prometheus/client_model v0.5.0 // indirect
|
||||
github.com/prometheus/common v0.48.0 // indirect
|
||||
github.com/prometheus/procfs v0.12.0 // indirect
|
||||
github.com/rivo/uniseg v0.4.7 // indirect
|
||||
github.com/rogpeppe/go-internal v1.11.0 // indirect
|
||||
github.com/russross/blackfriday/v2 v2.1.0 // indirect
|
||||
github.com/spaolacci/murmur3 v1.1.0 // indirect
|
||||
github.com/spf13/afero v1.11.0 // indirect
|
||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -2,7 +2,6 @@ package writecache
|
|||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
|
@ -10,7 +9,9 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"github.com/cockroachdb/pebble"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -34,8 +35,9 @@ type cache struct {
|
|||
cancel atomic.Value
|
||||
// wg is a wait group for flush workers.
|
||||
wg sync.WaitGroup
|
||||
// store contains underlying database.
|
||||
store
|
||||
// db contains underlying database.
|
||||
db *pebble.DB
|
||||
dbEditLocker *utilSync.KeyLocker[string]
|
||||
// fsTree contains big files stored directly on file-system.
|
||||
fsTree *fstree.FSTree
|
||||
}
|
||||
|
@ -55,10 +57,7 @@ const (
|
|||
defaultMaxCacheSize = 1 << 30 // 1 GiB
|
||||
)
|
||||
|
||||
var (
|
||||
defaultBucket = []byte{0}
|
||||
dummyCanceler context.CancelFunc = func() {}
|
||||
)
|
||||
var dummyCanceler context.CancelFunc = func() {}
|
||||
|
||||
// New creates new writecache instance.
|
||||
func New(opts ...Option) Cache {
|
||||
|
@ -75,9 +74,9 @@ func New(opts ...Option) Cache {
|
|||
maxCacheSize: defaultMaxCacheSize,
|
||||
maxBatchSize: bbolt.DefaultMaxBatchSize,
|
||||
maxBatchDelay: bbolt.DefaultMaxBatchDelay,
|
||||
openFile: os.OpenFile,
|
||||
metrics: DefaultMetrics(),
|
||||
},
|
||||
dbEditLocker: utilSync.NewKeyLocker[string](),
|
||||
}
|
||||
|
||||
for i := range opts {
|
||||
|
@ -142,12 +141,12 @@ func (c *cache) Close() error {
|
|||
var err error
|
||||
if c.db != nil {
|
||||
err = c.db.Close()
|
||||
if err != nil {
|
||||
if err == nil {
|
||||
c.db = nil
|
||||
}
|
||||
}
|
||||
c.metrics.Close()
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *cache) GetMetrics() Metrics {
|
|
@ -2,6 +2,7 @@ package writecache
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
|
@ -10,7 +11,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
"github.com/cockroachdb/pebble"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
@ -46,24 +47,26 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
|||
}
|
||||
|
||||
saddr := addr.EncodeToString()
|
||||
|
||||
c.dbEditLocker.Lock(saddr)
|
||||
defer func() {
|
||||
c.dbEditLocker.Unlock(saddr)
|
||||
}()
|
||||
dbKey := []byte(saddr)
|
||||
var dataSize int
|
||||
_ = c.db.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
dataSize = len(b.Get([]byte(saddr)))
|
||||
return nil
|
||||
})
|
||||
data, closer, err := c.db.Get(dbKey)
|
||||
if err == nil {
|
||||
dataSize = len(data)
|
||||
err = closer.Close()
|
||||
}
|
||||
if err != nil && !errors.Is(err, pebble.ErrNotFound) {
|
||||
return err
|
||||
}
|
||||
|
||||
if dataSize > 0 {
|
||||
storageType = StorageTypeDB
|
||||
|
||||
var recordDeleted bool
|
||||
err := c.db.Update(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
key := []byte(saddr)
|
||||
recordDeleted = b.Get(key) != nil
|
||||
err := b.Delete(key)
|
||||
return err
|
||||
})
|
||||
err := c.db.DeleteSized(dbKey, uint32(dataSize), pebble.Sync)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -81,7 +84,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
|||
}
|
||||
|
||||
storageType = StorageTypeFSTree
|
||||
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
|
||||
_, err = c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
|
||||
if err == nil {
|
||||
storagelog.Write(c.log,
|
||||
storagelog.AddressField(saddr),
|
||||
|
|
|
@ -17,7 +17,6 @@ import (
|
|||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/mr-tron/base58"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
|
@ -34,8 +33,6 @@ const (
|
|||
defaultFlushInterval = time.Second
|
||||
)
|
||||
|
||||
var errIterationCompleted = errors.New("iteration completed")
|
||||
|
||||
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
|
||||
func (c *cache) runFlushLoop(ctx context.Context) {
|
||||
if c.disableBackgroundFlush {
|
||||
|
@ -90,36 +87,31 @@ func (c *cache) flushSmallObjects(ctx context.Context) {
|
|||
continue
|
||||
}
|
||||
|
||||
// We put objects in batches of fixed size to not interfere with main put cycle a lot.
|
||||
_ = c.db.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
cs := b.Cursor()
|
||||
|
||||
var k, v []byte
|
||||
|
||||
if len(lastKey) == 0 {
|
||||
k, v = cs.First()
|
||||
} else {
|
||||
k, v = cs.Seek(lastKey)
|
||||
if bytes.Equal(k, lastKey) {
|
||||
k, v = cs.Next()
|
||||
}
|
||||
iter, err := c.db.NewIterWithContext(ctx, nil)
|
||||
if err != nil {
|
||||
c.log.Warn(logs.WritecacheTriedToFlushItemsFromWritecache, zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
for ; k != nil && len(m) < flushBatchSize; k, v = cs.Next() {
|
||||
if len(lastKey) == len(k) {
|
||||
copy(lastKey, k)
|
||||
for v := iter.SeekGE(lastKey); v && len(m) < flushBatchSize; v = iter.Next() {
|
||||
if bytes.Equal(iter.Key(), lastKey) {
|
||||
continue
|
||||
}
|
||||
if len(lastKey) == len(iter.Key()) {
|
||||
copy(lastKey, iter.Key())
|
||||
} else {
|
||||
lastKey = bytes.Clone(k)
|
||||
lastKey = bytes.Clone(iter.Key())
|
||||
}
|
||||
|
||||
m = append(m, objectInfo{
|
||||
addr: string(k),
|
||||
data: bytes.Clone(v),
|
||||
addr: string(iter.Key()),
|
||||
data: bytes.Clone(iter.Value()),
|
||||
})
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err := iter.Close(); err != nil {
|
||||
c.log.Warn(logs.WritecacheTriedToFlushItemsFromWritecache, zap.Error(err))
|
||||
}
|
||||
|
||||
var count int
|
||||
for i := range m {
|
||||
|
@ -230,7 +222,7 @@ func (c *cache) workerFlushSmall(ctx context.Context) {
|
|||
continue
|
||||
}
|
||||
|
||||
c.deleteFromDB(objInfo.addr, true)
|
||||
c.deleteFromDB(objInfo.addr)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -306,7 +298,7 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
|
|||
|
||||
var last string
|
||||
for {
|
||||
batch, err := c.readNextDBBatch(ignoreErrors, last)
|
||||
batch, err := c.readNextDBBatch(ctx, ignoreErrors, last)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -326,7 +318,7 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
|
|||
if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil {
|
||||
return err
|
||||
}
|
||||
c.deleteFromDB(item.address, false)
|
||||
c.deleteFromDB(item.address)
|
||||
}
|
||||
last = batch[len(batch)-1].address
|
||||
}
|
||||
|
@ -338,36 +330,37 @@ type batchItem struct {
|
|||
address string
|
||||
}
|
||||
|
||||
func (c *cache) readNextDBBatch(ignoreErrors bool, last string) ([]batchItem, error) {
|
||||
func (c *cache) readNextDBBatch(ctx context.Context, ignoreErrors bool, last string) ([]batchItem, error) {
|
||||
const batchSize = 100
|
||||
var batch []batchItem
|
||||
err := c.db.View(func(tx *bbolt.Tx) error {
|
||||
var addr oid.Address
|
||||
|
||||
b := tx.Bucket(defaultBucket)
|
||||
cs := b.Cursor()
|
||||
for k, data := cs.Seek([]byte(last)); k != nil; k, data = cs.Next() {
|
||||
sa := string(k)
|
||||
if sa == last {
|
||||
iter, err := c.db.NewIterWithContext(ctx, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var addr oid.Address
|
||||
lastKey := []byte(last)
|
||||
for v := iter.SeekGE(lastKey); v && len(batch) < flushBatchSize; v = iter.Next() {
|
||||
if bytes.Equal(iter.Key(), lastKey) {
|
||||
continue
|
||||
}
|
||||
sa := string(iter.Key())
|
||||
if err := addr.DecodeString(sa); err != nil {
|
||||
c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err))
|
||||
if ignoreErrors {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
batch = append(batch, batchItem{data: bytes.Clone(data), address: sa})
|
||||
if len(batch) == batchSize {
|
||||
return errIterationCompleted
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err == nil || errors.Is(err, errIterationCompleted) {
|
||||
return batch, nil
|
||||
}
|
||||
_ = iter.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
batch = append(batch, batchItem{data: bytes.Clone(iter.Value()), address: sa})
|
||||
if len(batch) == batchSize {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err := iter.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return batch, nil
|
||||
}
|
||||
|
|
|
@ -18,8 +18,8 @@ import (
|
|||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
"github.com/cockroachdb/pebble"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -54,22 +54,16 @@ func TestFlush(t *testing.T) {
|
|||
obj := testutil.GenerateObject()
|
||||
data, err := obj.Marshal()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
return b.Put([]byte{1, 2, 3}, data)
|
||||
}))
|
||||
require.NoError(t, c.db.Set([]byte{1, 2, 3}, data, pebble.Sync))
|
||||
},
|
||||
},
|
||||
{
|
||||
Desc: "db, invalid object",
|
||||
InjectFn: func(t *testing.T, wc Cache) {
|
||||
c := wc.(*cache)
|
||||
require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
k := []byte(oidtest.Address().EncodeToString())
|
||||
v := []byte{1, 2, 3}
|
||||
return b.Put(k, v)
|
||||
}))
|
||||
require.NoError(t, c.db.Set(k, v, pebble.Sync))
|
||||
},
|
||||
},
|
||||
{
|
||||
|
|
|
@ -3,6 +3,7 @@ package writecache
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
|
@ -12,7 +13,7 @@ import (
|
|||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
"github.com/cockroachdb/pebble"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
@ -99,22 +100,18 @@ func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object,
|
|||
// Key should be a stringified address.
|
||||
//
|
||||
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in db.
|
||||
func Get(db *bbolt.DB, key []byte) ([]byte, error) {
|
||||
func Get(db *pebble.DB, key []byte) ([]byte, error) {
|
||||
if db == nil {
|
||||
return nil, ErrNotInitialized
|
||||
}
|
||||
var value []byte
|
||||
err := db.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
if b == nil {
|
||||
return ErrNoDefaultBucket
|
||||
v, closer, err := db.Get(key)
|
||||
if err != nil {
|
||||
if errors.Is(err, pebble.ErrNotFound) {
|
||||
return nil, metaerr.Wrap(logicerr.Wrap(new(apistatus.ObjectNotFound)))
|
||||
}
|
||||
value = b.Get(key)
|
||||
if value == nil {
|
||||
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
return nil, metaerr.Wrap(err)
|
||||
}
|
||||
value = bytes.Clone(value)
|
||||
return nil
|
||||
})
|
||||
return value, metaerr.Wrap(err)
|
||||
value = bytes.Clone(v)
|
||||
return value, metaerr.Wrap(closer.Close())
|
||||
}
|
||||
|
|
|
@ -6,7 +6,7 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
"github.com/cockroachdb/pebble"
|
||||
)
|
||||
|
||||
// ErrNoDefaultBucket is returned by IterateDB when default bucket for objects is missing.
|
||||
|
@ -18,22 +18,23 @@ var ErrNoDefaultBucket = errors.New("no default bucket")
|
|||
// Returns ErrNoDefaultBucket if there is no default bucket in db.
|
||||
//
|
||||
// DB must not be nil and should be opened.
|
||||
func IterateDB(db *bbolt.DB, f func(oid.Address) error) error {
|
||||
return metaerr.Wrap(db.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
if b == nil {
|
||||
return ErrNoDefaultBucket
|
||||
}
|
||||
|
||||
var addr oid.Address
|
||||
|
||||
return b.ForEach(func(k, _ []byte) error {
|
||||
err := addr.DecodeString(string(k))
|
||||
func IterateDB(db *pebble.DB, f func(oid.Address) error) error {
|
||||
it, err := db.NewIter(nil)
|
||||
if err != nil {
|
||||
return metaerr.Wrap(err)
|
||||
}
|
||||
for v := it.First(); v; v = it.Next() {
|
||||
var addr oid.Address
|
||||
err := addr.DecodeString(string(it.Key()))
|
||||
if err != nil {
|
||||
_ = it.Close()
|
||||
return fmt.Errorf("could not parse object address: %w", err)
|
||||
}
|
||||
|
||||
return f(addr)
|
||||
})
|
||||
}))
|
||||
if err := f(addr); err != nil {
|
||||
_ = it.Close()
|
||||
return err
|
||||
}
|
||||
}
|
||||
return metaerr.Wrap(it.Close())
|
||||
}
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"go.etcd.io/bbolt"
|
||||
"github.com/cockroachdb/pebble"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
@ -86,23 +86,25 @@ func (c *cache) closeDB(shrink bool) error {
|
|||
if err := c.db.Close(); err != nil {
|
||||
return fmt.Errorf("can't close write-cache database: %w", err)
|
||||
}
|
||||
c.db = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
var empty bool
|
||||
err := c.db.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
empty = b == nil || b.Stats().KeyN == 0
|
||||
return nil
|
||||
})
|
||||
if err != nil && !errors.Is(err, bbolt.ErrDatabaseNotOpen) {
|
||||
it, err := c.db.NewIter(nil)
|
||||
if err == nil {
|
||||
empty = it.First()
|
||||
err = it.Close()
|
||||
}
|
||||
if err != nil && !errors.Is(err, pebble.ErrClosed) {
|
||||
return fmt.Errorf("failed to check DB items: %w", err)
|
||||
}
|
||||
if err := c.db.Close(); err != nil {
|
||||
return fmt.Errorf("can't close write-cache database: %w", err)
|
||||
}
|
||||
c.db = nil
|
||||
if empty {
|
||||
err := os.Remove(filepath.Join(c.path, dbName))
|
||||
err := os.RemoveAll(filepath.Join(c.path, dbName))
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return fmt.Errorf("failed to remove DB file: %w", err)
|
||||
}
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
package writecache
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
|
@ -42,8 +40,6 @@ type options struct {
|
|||
noSync bool
|
||||
// reportError is the function called when encountering disk errors in background workers.
|
||||
reportError func(string, error)
|
||||
// openFile is the function called internally by bbolt to open database files. Useful for hermetic testing.
|
||||
openFile func(string, int, fs.FileMode) (*os.File, error)
|
||||
// metrics is metrics implementation
|
||||
metrics Metrics
|
||||
// disableBackgroundFlush is for testing purposes only.
|
||||
|
@ -155,13 +151,6 @@ func WithReportErrorFunc(f func(string, error)) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// WithOpenFile sets the OpenFile function to use internally by bolt. Useful for hermetic testing.
|
||||
func WithOpenFile(f func(string, int, fs.FileMode) (*os.File, error)) Option {
|
||||
return func(o *options) {
|
||||
o.openFile = f
|
||||
}
|
||||
}
|
||||
|
||||
// WithMetrics sets metrics implementation.
|
||||
func WithMetrics(metrics Metrics) Option {
|
||||
return func(o *options) {
|
||||
|
|
|
@ -2,13 +2,14 @@ package writecache
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"go.etcd.io/bbolt"
|
||||
"github.com/cockroachdb/pebble"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
@ -80,17 +81,26 @@ func (c *cache) putSmall(obj objectInfo) error {
|
|||
return ErrOutOfSpace
|
||||
}
|
||||
|
||||
var newRecord bool
|
||||
err := c.db.Batch(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
key := []byte(obj.addr)
|
||||
newRecord = b.Get(key) == nil
|
||||
if newRecord {
|
||||
return b.Put(key, obj.data)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
c.dbEditLocker.Lock(obj.addr)
|
||||
defer func() {
|
||||
c.dbEditLocker.Unlock(obj.addr)
|
||||
}()
|
||||
newRecord := true
|
||||
dbKey := []byte(obj.addr)
|
||||
data, closer, err := c.db.Get(dbKey)
|
||||
if err == nil {
|
||||
newRecord = len(data) == 0
|
||||
err = closer.Close()
|
||||
}
|
||||
if err != nil && !errors.Is(err, pebble.ErrNotFound) {
|
||||
return err
|
||||
}
|
||||
if newRecord {
|
||||
err = c.db.Set(dbKey, obj.data, pebble.Sync)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
storagelog.Write(c.log,
|
||||
storagelog.AddressField(obj.addr),
|
||||
storagelog.StorageTypeField(wcStorageType),
|
||||
|
@ -100,8 +110,7 @@ func (c *cache) putSmall(obj objectInfo) error {
|
|||
c.objCounters.cDB.Add(1)
|
||||
c.estimateCacheSize()
|
||||
}
|
||||
}
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
// putBig writes object to FSTree and pushes it to the flush workers queue.
|
||||
|
|
|
@ -6,15 +6,11 @@ import (
|
|||
"sync/atomic"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
func (c *cache) estimateCacheSize() (uint64, uint64) {
|
||||
dbCount := c.objCounters.DB()
|
||||
fsCount := c.objCounters.FS()
|
||||
if fsCount > 0 {
|
||||
fsCount-- // db file
|
||||
}
|
||||
dbSize := dbCount * c.smallObjectSize
|
||||
fsSize := fsCount * c.maxObjectSize
|
||||
c.metrics.SetEstimateSize(dbSize, fsSize)
|
||||
|
@ -69,15 +65,15 @@ func (x *counters) Dec() {
|
|||
|
||||
func (c *cache) initCounters() error {
|
||||
var inDB uint64
|
||||
err := c.db.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
if b != nil {
|
||||
inDB = uint64(b.Stats().KeyN)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
it, err := c.db.NewIter(nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not read write-cache DB counter: %w", err)
|
||||
return fmt.Errorf("can't create write-cache database iterator: %w", err)
|
||||
}
|
||||
for v := it.First(); v; v = it.Next() {
|
||||
inDB++
|
||||
}
|
||||
if err := it.Close(); err != nil {
|
||||
return fmt.Errorf("can't close write-cache database iterator: %w", err)
|
||||
}
|
||||
c.objCounters.cDB.Store(inDB)
|
||||
c.estimateCacheSize()
|
||||
|
|
|
@ -2,9 +2,11 @@ package writecache
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
|
@ -14,44 +16,25 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
"github.com/cockroachdb/pebble"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// store represents persistent storage with in-memory LRU cache
|
||||
// for flushed items on top of it.
|
||||
type store struct {
|
||||
db *bbolt.DB
|
||||
}
|
||||
|
||||
const dbName = "small.bolt"
|
||||
const dbName = "pebble"
|
||||
|
||||
func (c *cache) openStore(mod mode.ComponentMode) error {
|
||||
err := util.MkdirAllX(c.path, os.ModePerm)
|
||||
err := util.MkdirAllX(filepath.Join(c.path, dbName), os.ModePerm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.db, err = OpenDB(c.path, mod.ReadOnly(), c.openFile, c.pageSize)
|
||||
c.db, err = OpenDB(filepath.Join(c.path, dbName), mod.ReadOnly())
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not open database: %w", err)
|
||||
}
|
||||
|
||||
c.db.MaxBatchSize = c.maxBatchSize
|
||||
c.db.MaxBatchDelay = c.maxBatchDelay
|
||||
|
||||
if !mod.ReadOnly() {
|
||||
err = c.db.Update(func(tx *bbolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists(defaultBucket)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create default bucket: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
c.fsTree = fstree.New(
|
||||
fstree.WithPath(c.path),
|
||||
fstree.WithPath(filepath.Join(c.path, "fstree")),
|
||||
fstree.WithPerm(os.ModePerm),
|
||||
fstree.WithDepth(1),
|
||||
fstree.WithDirNameLen(1),
|
||||
|
@ -68,39 +51,37 @@ func (c *cache) openStore(mod mode.ComponentMode) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *cache) deleteFromDB(key string, batched bool) {
|
||||
var recordDeleted bool
|
||||
var err error
|
||||
if batched {
|
||||
err = c.db.Batch(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
key := []byte(key)
|
||||
recordDeleted = b.Get(key) != nil
|
||||
return b.Delete(key)
|
||||
})
|
||||
} else {
|
||||
err = c.db.Update(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
key := []byte(key)
|
||||
recordDeleted = b.Get(key) != nil
|
||||
return b.Delete(key)
|
||||
})
|
||||
func (c *cache) deleteFromDB(key string) {
|
||||
c.dbEditLocker.Lock(key)
|
||||
defer func() {
|
||||
c.dbEditLocker.Unlock(key)
|
||||
}()
|
||||
var dataSize uint32
|
||||
dbKey := []byte(key)
|
||||
data, closer, err := c.db.Get(dbKey)
|
||||
if err == nil {
|
||||
dataSize = uint32(len(data))
|
||||
err = closer.Close()
|
||||
}
|
||||
if err != nil && !errors.Is(err, pebble.ErrNotFound) {
|
||||
c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err))
|
||||
return
|
||||
}
|
||||
if err := c.db.DeleteSized(dbKey, dataSize, pebble.Sync); err != nil {
|
||||
c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
c.metrics.Evict(StorageTypeDB)
|
||||
storagelog.Write(c.log,
|
||||
storagelog.AddressField(key),
|
||||
storagelog.StorageTypeField(wcStorageType),
|
||||
storagelog.OpField("db DELETE"),
|
||||
)
|
||||
if recordDeleted {
|
||||
if dataSize > 0 {
|
||||
c.objCounters.cDB.Add(math.MaxUint64)
|
||||
c.estimateCacheSize()
|
||||
}
|
||||
} else {
|
||||
c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cache) deleteFromDisk(ctx context.Context, addr oid.Address) {
|
||||
|
|
|
@ -1,21 +1,67 @@
|
|||
package writecache
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
"log"
|
||||
|
||||
"go.etcd.io/bbolt"
|
||||
"github.com/cockroachdb/pebble"
|
||||
"github.com/cockroachdb/pebble/bloom"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultPebbleCacheSize = 1 << 30
|
||||
defaultPebbleMemTableSize = 64 << 20
|
||||
defaultMemTableStopWritesThreshold = 20
|
||||
defaultPebbleBytesPerSync = 1 << 20
|
||||
defaultPebbleMaxConcurrentCompactions = 5
|
||||
defaultPebbleMaxOpenFiles = 1000
|
||||
defaultPebbleL0CompactionThreshold = 4
|
||||
defaultPebbleL0CompactionFileThreshold = 500
|
||||
defaultPebbleL0StopWritesThreshold = 12
|
||||
defaultPebbleLBaseMaxBytes = 128 << 20
|
||||
defaultPebbleLevels = 7
|
||||
defaultPebbleL0TargetFileSize = 4 << 20
|
||||
defaultPebbleBlockSize = 4 << 10
|
||||
defaultPebbleFilterPolicy bloom.FilterPolicy = 10
|
||||
)
|
||||
|
||||
// OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true.
|
||||
func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error), pageSize int) (*bbolt.DB, error) {
|
||||
return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{
|
||||
NoFreelistSync: true,
|
||||
func OpenDB(p string, ro bool) (*pebble.DB, error) {
|
||||
opts := &pebble.Options{
|
||||
ReadOnly: ro,
|
||||
Timeout: 100 * time.Millisecond,
|
||||
OpenFile: openFile,
|
||||
PageSize: pageSize,
|
||||
})
|
||||
FormatMajorVersion: pebble.FormatNewest,
|
||||
}
|
||||
opts.Logger = &noopPebbleLogger{}
|
||||
opts.Cache = pebble.NewCache(defaultPebbleCacheSize)
|
||||
opts.MemTableSize = defaultPebbleMemTableSize
|
||||
opts.MemTableStopWritesThreshold = defaultMemTableStopWritesThreshold
|
||||
opts.BytesPerSync = defaultPebbleBytesPerSync
|
||||
opts.MaxConcurrentCompactions = func() int { return defaultPebbleMaxConcurrentCompactions }
|
||||
opts.MaxOpenFiles = defaultPebbleMaxOpenFiles
|
||||
opts.L0CompactionThreshold = defaultPebbleL0CompactionThreshold
|
||||
opts.L0CompactionFileThreshold = defaultPebbleL0CompactionFileThreshold
|
||||
opts.L0StopWritesThreshold = defaultPebbleL0StopWritesThreshold
|
||||
opts.LBaseMaxBytes = defaultPebbleLBaseMaxBytes
|
||||
opts.Levels = make([]pebble.LevelOptions, defaultPebbleLevels)
|
||||
opts.Levels[0].TargetFileSize = defaultPebbleL0TargetFileSize
|
||||
for i := 0; i < defaultPebbleLevels; i++ {
|
||||
l := &opts.Levels[i]
|
||||
l.BlockSize = defaultPebbleBlockSize
|
||||
l.FilterPolicy = defaultPebbleFilterPolicy
|
||||
l.FilterType = pebble.TableFilter
|
||||
if i > 0 {
|
||||
l.TargetFileSize = opts.Levels[i-1].TargetFileSize * 2
|
||||
}
|
||||
l.EnsureDefaults()
|
||||
}
|
||||
return pebble.Open(p, opts)
|
||||
}
|
||||
|
||||
var _ pebble.Logger = (*noopPebbleLogger)(nil)
|
||||
|
||||
type noopPebbleLogger struct{}
|
||||
|
||||
func (n *noopPebbleLogger) Fatalf(format string, args ...interface{}) {
|
||||
log.Fatalf(format, args...)
|
||||
}
|
||||
|
||||
func (n *noopPebbleLogger) Infof(string, ...interface{}) {}
|
||||
|
|
Loading…
Reference in a new issue