diff --git a/cmd/frostfs-lens/internal/writecache/inspect.go b/cmd/frostfs-lens/internal/writecache/inspect.go index 63c669a35..762c8f9ef 100644 --- a/cmd/frostfs-lens/internal/writecache/inspect.go +++ b/cmd/frostfs-lens/internal/writecache/inspect.go @@ -1,8 +1,6 @@ package writecache import ( - "os" - common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -25,7 +23,7 @@ func init() { func inspectFunc(cmd *cobra.Command, _ []string) { var data []byte - db, err := writecache.OpenDB(vPath, true, os.OpenFile, 0) + db, err := writecache.OpenDB(vPath, true) common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err)) defer db.Close() diff --git a/cmd/frostfs-lens/internal/writecache/list.go b/cmd/frostfs-lens/internal/writecache/list.go index 9c8fa6138..f03b91fd5 100644 --- a/cmd/frostfs-lens/internal/writecache/list.go +++ b/cmd/frostfs-lens/internal/writecache/list.go @@ -3,7 +3,6 @@ package writecache import ( "fmt" "io" - "os" common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" @@ -31,7 +30,7 @@ func listFunc(cmd *cobra.Command, _ []string) { return err } - db, err := writecache.OpenDB(vPath, true, os.OpenFile, 0) + db, err := writecache.OpenDB(vPath, true) common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err)) defer db.Close() diff --git a/go.mod b/go.mod index 93eef5b8c..cc2be1804 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/VictoriaMetrics/easyproto v0.1.4 github.com/cheggaaa/pb v1.0.29 github.com/chzyer/readline v1.5.1 + github.com/cockroachdb/pebble v1.1.2 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568 github.com/gdamore/tcell/v2 v2.7.4 @@ -60,11 +61,17 @@ require ( require ( git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect + github.com/DataDog/zstd v1.4.5 // indirect github.com/antlr4-go/antlr/v4 v4.13.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.13.0 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/cockroachdb/errors v1.11.3 // indirect + github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce // indirect + github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect + github.com/cockroachdb/redact v1.1.5 // indirect + github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/consensys/bavard v0.1.13 // indirect github.com/consensys/gnark-crypto v0.12.2-0.20231222162921-eb75782795d2 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect @@ -72,9 +79,11 @@ require ( github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/gdamore/encoding v1.0.0 // indirect + github.com/getsentry/sentry-go v0.27.0 // indirect github.com/go-fed/httpsig v1.1.0 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/gorilla/websocket v1.5.1 // indirect github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0 // indirect @@ -88,6 +97,8 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/klauspost/cpuid/v2 v2.2.6 // indirect github.com/klauspost/reedsolomon v1.12.1 // indirect + github.com/kr/pretty v0.3.1 // indirect + github.com/kr/text v0.2.0 // indirect github.com/lucasb-eyer/go-colorful v1.2.0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect @@ -103,11 +114,13 @@ require ( github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240521091047-78685785716d // indirect github.com/nspcc-dev/rfc6979 v0.2.1 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/rivo/uniseg v0.4.7 // indirect + github.com/rogpeppe/go-internal v1.11.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/afero v1.11.0 // indirect diff --git a/go.sum b/go.sum index 102501484..bb5de556c 100644 Binary files a/go.sum and b/go.sum differ diff --git a/pkg/local_object_storage/writecache/cachebbolt.go b/pkg/local_object_storage/writecache/cache.go similarity index 91% rename from pkg/local_object_storage/writecache/cachebbolt.go rename to pkg/local_object_storage/writecache/cache.go index cdd4ed442..2fca680bb 100644 --- a/pkg/local_object_storage/writecache/cachebbolt.go +++ b/pkg/local_object_storage/writecache/cache.go @@ -2,7 +2,6 @@ package writecache import ( "context" - "os" "sync" "sync/atomic" @@ -10,7 +9,9 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "github.com/cockroachdb/pebble" "go.etcd.io/bbolt" "go.uber.org/zap" ) @@ -34,8 +35,9 @@ type cache struct { cancel atomic.Value // wg is a wait group for flush workers. wg sync.WaitGroup - // store contains underlying database. - store + // db contains underlying database. + db *pebble.DB + dbEditLocker *utilSync.KeyLocker[string] // fsTree contains big files stored directly on file-system. fsTree *fstree.FSTree } @@ -55,10 +57,7 @@ const ( defaultMaxCacheSize = 1 << 30 // 1 GiB ) -var ( - defaultBucket = []byte{0} - dummyCanceler context.CancelFunc = func() {} -) +var dummyCanceler context.CancelFunc = func() {} // New creates new writecache instance. func New(opts ...Option) Cache { @@ -75,9 +74,9 @@ func New(opts ...Option) Cache { maxCacheSize: defaultMaxCacheSize, maxBatchSize: bbolt.DefaultMaxBatchSize, maxBatchDelay: bbolt.DefaultMaxBatchDelay, - openFile: os.OpenFile, metrics: DefaultMetrics(), }, + dbEditLocker: utilSync.NewKeyLocker[string](), } for i := range opts { @@ -142,12 +141,12 @@ func (c *cache) Close() error { var err error if c.db != nil { err = c.db.Close() - if err != nil { + if err == nil { c.db = nil } } c.metrics.Close() - return nil + return err } func (c *cache) GetMetrics() Metrics { diff --git a/pkg/local_object_storage/writecache/delete.go b/pkg/local_object_storage/writecache/delete.go index b1a0511ee..4a7c05595 100644 --- a/pkg/local_object_storage/writecache/delete.go +++ b/pkg/local_object_storage/writecache/delete.go @@ -2,6 +2,7 @@ package writecache import ( "context" + "errors" "math" "time" @@ -10,7 +11,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" + "github.com/cockroachdb/pebble" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -46,24 +47,26 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error { } saddr := addr.EncodeToString() - + c.dbEditLocker.Lock(saddr) + defer func() { + c.dbEditLocker.Unlock(saddr) + }() + dbKey := []byte(saddr) var dataSize int - _ = c.db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - dataSize = len(b.Get([]byte(saddr))) - return nil - }) + data, closer, err := c.db.Get(dbKey) + if err == nil { + dataSize = len(data) + err = closer.Close() + } + if err != nil && !errors.Is(err, pebble.ErrNotFound) { + return err + } if dataSize > 0 { storageType = StorageTypeDB + var recordDeleted bool - err := c.db.Update(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - key := []byte(saddr) - recordDeleted = b.Get(key) != nil - err := b.Delete(key) - return err - }) + err := c.db.DeleteSized(dbKey, uint32(dataSize), pebble.Sync) if err != nil { return err } @@ -81,7 +84,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error { } storageType = StorageTypeFSTree - _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr}) + _, err = c.fsTree.Delete(ctx, common.DeletePrm{Address: addr}) if err == nil { storagelog.Write(c.log, storagelog.AddressField(saddr), diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index 930ac8431..03ea58883 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -17,7 +17,6 @@ import ( objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/mr-tron/base58" - "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -34,8 +33,6 @@ const ( defaultFlushInterval = time.Second ) -var errIterationCompleted = errors.New("iteration completed") - // runFlushLoop starts background workers which periodically flush objects to the blobstor. func (c *cache) runFlushLoop(ctx context.Context) { if c.disableBackgroundFlush { @@ -90,36 +87,31 @@ func (c *cache) flushSmallObjects(ctx context.Context) { continue } - // We put objects in batches of fixed size to not interfere with main put cycle a lot. - _ = c.db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - cs := b.Cursor() + iter, err := c.db.NewIterWithContext(ctx, nil) + if err != nil { + c.log.Warn(logs.WritecacheTriedToFlushItemsFromWritecache, zap.Error(err)) + continue + } - var k, v []byte - - if len(lastKey) == 0 { - k, v = cs.First() + for v := iter.SeekGE(lastKey); v && len(m) < flushBatchSize; v = iter.Next() { + if bytes.Equal(iter.Key(), lastKey) { + continue + } + if len(lastKey) == len(iter.Key()) { + copy(lastKey, iter.Key()) } else { - k, v = cs.Seek(lastKey) - if bytes.Equal(k, lastKey) { - k, v = cs.Next() - } + lastKey = bytes.Clone(iter.Key()) } - for ; k != nil && len(m) < flushBatchSize; k, v = cs.Next() { - if len(lastKey) == len(k) { - copy(lastKey, k) - } else { - lastKey = bytes.Clone(k) - } + m = append(m, objectInfo{ + addr: string(iter.Key()), + data: bytes.Clone(iter.Value()), + }) + } - m = append(m, objectInfo{ - addr: string(k), - data: bytes.Clone(v), - }) - } - return nil - }) + if err := iter.Close(); err != nil { + c.log.Warn(logs.WritecacheTriedToFlushItemsFromWritecache, zap.Error(err)) + } var count int for i := range m { @@ -230,7 +222,7 @@ func (c *cache) workerFlushSmall(ctx context.Context) { continue } - c.deleteFromDB(objInfo.addr, true) + c.deleteFromDB(objInfo.addr) } } @@ -306,7 +298,7 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { var last string for { - batch, err := c.readNextDBBatch(ignoreErrors, last) + batch, err := c.readNextDBBatch(ctx, ignoreErrors, last) if err != nil { return err } @@ -326,7 +318,7 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil { return err } - c.deleteFromDB(item.address, false) + c.deleteFromDB(item.address) } last = batch[len(batch)-1].address } @@ -338,36 +330,37 @@ type batchItem struct { address string } -func (c *cache) readNextDBBatch(ignoreErrors bool, last string) ([]batchItem, error) { +func (c *cache) readNextDBBatch(ctx context.Context, ignoreErrors bool, last string) ([]batchItem, error) { const batchSize = 100 var batch []batchItem - err := c.db.View(func(tx *bbolt.Tx) error { - var addr oid.Address - b := tx.Bucket(defaultBucket) - cs := b.Cursor() - for k, data := cs.Seek([]byte(last)); k != nil; k, data = cs.Next() { - sa := string(k) - if sa == last { + iter, err := c.db.NewIterWithContext(ctx, nil) + if err != nil { + return nil, err + } + var addr oid.Address + lastKey := []byte(last) + for v := iter.SeekGE(lastKey); v && len(batch) < flushBatchSize; v = iter.Next() { + if bytes.Equal(iter.Key(), lastKey) { + continue + } + sa := string(iter.Key()) + if err := addr.DecodeString(sa); err != nil { + c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err)) + if ignoreErrors { continue } - if err := addr.DecodeString(sa); err != nil { - c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err)) - if ignoreErrors { - continue - } - return err - } - - batch = append(batch, batchItem{data: bytes.Clone(data), address: sa}) - if len(batch) == batchSize { - return errIterationCompleted - } + _ = iter.Close() + return nil, err + } + + batch = append(batch, batchItem{data: bytes.Clone(iter.Value()), address: sa}) + if len(batch) == batchSize { + break } - return nil - }) - if err == nil || errors.Is(err, errIterationCompleted) { - return batch, nil } - return nil, err + if err := iter.Close(); err != nil { + return nil, err + } + return batch, nil } diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go index a637da45d..1570994f6 100644 --- a/pkg/local_object_storage/writecache/flush_test.go +++ b/pkg/local_object_storage/writecache/flush_test.go @@ -18,8 +18,8 @@ import ( objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" + "github.com/cockroachdb/pebble" "github.com/stretchr/testify/require" - "go.etcd.io/bbolt" "go.uber.org/zap" ) @@ -54,22 +54,16 @@ func TestFlush(t *testing.T) { obj := testutil.GenerateObject() data, err := obj.Marshal() require.NoError(t, err) - require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - return b.Put([]byte{1, 2, 3}, data) - })) + require.NoError(t, c.db.Set([]byte{1, 2, 3}, data, pebble.Sync)) }, }, { Desc: "db, invalid object", InjectFn: func(t *testing.T, wc Cache) { c := wc.(*cache) - require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - k := []byte(oidtest.Address().EncodeToString()) - v := []byte{1, 2, 3} - return b.Put(k, v) - })) + k := []byte(oidtest.Address().EncodeToString()) + v := []byte{1, 2, 3} + require.NoError(t, c.db.Set(k, v, pebble.Sync)) }, }, { diff --git a/pkg/local_object_storage/writecache/get.go b/pkg/local_object_storage/writecache/get.go index bf26833bd..da0baa343 100644 --- a/pkg/local_object_storage/writecache/get.go +++ b/pkg/local_object_storage/writecache/get.go @@ -3,6 +3,7 @@ package writecache import ( "bytes" "context" + "errors" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" @@ -12,7 +13,7 @@ import ( apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" + "github.com/cockroachdb/pebble" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -99,22 +100,18 @@ func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, // Key should be a stringified address. // // Returns an error of type apistatus.ObjectNotFound if the requested object is missing in db. -func Get(db *bbolt.DB, key []byte) ([]byte, error) { +func Get(db *pebble.DB, key []byte) ([]byte, error) { if db == nil { return nil, ErrNotInitialized } var value []byte - err := db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - if b == nil { - return ErrNoDefaultBucket + v, closer, err := db.Get(key) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return nil, metaerr.Wrap(logicerr.Wrap(new(apistatus.ObjectNotFound))) } - value = b.Get(key) - if value == nil { - return logicerr.Wrap(new(apistatus.ObjectNotFound)) - } - value = bytes.Clone(value) - return nil - }) - return value, metaerr.Wrap(err) + return nil, metaerr.Wrap(err) + } + value = bytes.Clone(v) + return value, metaerr.Wrap(closer.Close()) } diff --git a/pkg/local_object_storage/writecache/iterate.go b/pkg/local_object_storage/writecache/iterate.go index 9ec039f91..519f65689 100644 --- a/pkg/local_object_storage/writecache/iterate.go +++ b/pkg/local_object_storage/writecache/iterate.go @@ -6,7 +6,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" + "github.com/cockroachdb/pebble" ) // ErrNoDefaultBucket is returned by IterateDB when default bucket for objects is missing. @@ -18,22 +18,23 @@ var ErrNoDefaultBucket = errors.New("no default bucket") // Returns ErrNoDefaultBucket if there is no default bucket in db. // // DB must not be nil and should be opened. -func IterateDB(db *bbolt.DB, f func(oid.Address) error) error { - return metaerr.Wrap(db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - if b == nil { - return ErrNoDefaultBucket +func IterateDB(db *pebble.DB, f func(oid.Address) error) error { + it, err := db.NewIter(nil) + if err != nil { + return metaerr.Wrap(err) + } + for v := it.First(); v; v = it.Next() { + var addr oid.Address + err := addr.DecodeString(string(it.Key())) + if err != nil { + _ = it.Close() + return fmt.Errorf("could not parse object address: %w", err) } - var addr oid.Address - - return b.ForEach(func(k, _ []byte) error { - err := addr.DecodeString(string(k)) - if err != nil { - return fmt.Errorf("could not parse object address: %w", err) - } - - return f(addr) - }) - })) + if err := f(addr); err != nil { + _ = it.Close() + return err + } + } + return metaerr.Wrap(it.Close()) } diff --git a/pkg/local_object_storage/writecache/mode.go b/pkg/local_object_storage/writecache/mode.go index 44da9b36e..81fdee2ba 100644 --- a/pkg/local_object_storage/writecache/mode.go +++ b/pkg/local_object_storage/writecache/mode.go @@ -11,7 +11,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" - "go.etcd.io/bbolt" + "github.com/cockroachdb/pebble" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -86,23 +86,25 @@ func (c *cache) closeDB(shrink bool) error { if err := c.db.Close(); err != nil { return fmt.Errorf("can't close write-cache database: %w", err) } + c.db = nil return nil } var empty bool - err := c.db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - empty = b == nil || b.Stats().KeyN == 0 - return nil - }) - if err != nil && !errors.Is(err, bbolt.ErrDatabaseNotOpen) { + it, err := c.db.NewIter(nil) + if err == nil { + empty = it.First() + err = it.Close() + } + if err != nil && !errors.Is(err, pebble.ErrClosed) { return fmt.Errorf("failed to check DB items: %w", err) } if err := c.db.Close(); err != nil { return fmt.Errorf("can't close write-cache database: %w", err) } + c.db = nil if empty { - err := os.Remove(filepath.Join(c.path, dbName)) + err := os.RemoveAll(filepath.Join(c.path, dbName)) if err != nil && !os.IsNotExist(err) { return fmt.Errorf("failed to remove DB file: %w", err) } diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go index 980cf9303..7845c5da9 100644 --- a/pkg/local_object_storage/writecache/options.go +++ b/pkg/local_object_storage/writecache/options.go @@ -1,8 +1,6 @@ package writecache import ( - "io/fs" - "os" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" @@ -42,8 +40,6 @@ type options struct { noSync bool // reportError is the function called when encountering disk errors in background workers. reportError func(string, error) - // openFile is the function called internally by bbolt to open database files. Useful for hermetic testing. - openFile func(string, int, fs.FileMode) (*os.File, error) // metrics is metrics implementation metrics Metrics // disableBackgroundFlush is for testing purposes only. @@ -155,13 +151,6 @@ func WithReportErrorFunc(f func(string, error)) Option { } } -// WithOpenFile sets the OpenFile function to use internally by bolt. Useful for hermetic testing. -func WithOpenFile(f func(string, int, fs.FileMode) (*os.File, error)) Option { - return func(o *options) { - o.openFile = f - } -} - // WithMetrics sets metrics implementation. func WithMetrics(metrics Metrics) Option { return func(o *options) { diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go index 150399de8..03a94c761 100644 --- a/pkg/local_object_storage/writecache/put.go +++ b/pkg/local_object_storage/writecache/put.go @@ -2,13 +2,14 @@ package writecache import ( "context" + "errors" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" - "go.etcd.io/bbolt" + "github.com/cockroachdb/pebble" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -80,28 +81,36 @@ func (c *cache) putSmall(obj objectInfo) error { return ErrOutOfSpace } - var newRecord bool - err := c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - key := []byte(obj.addr) - newRecord = b.Get(key) == nil - if newRecord { - return b.Put(key, obj.data) - } - return nil - }) + c.dbEditLocker.Lock(obj.addr) + defer func() { + c.dbEditLocker.Unlock(obj.addr) + }() + newRecord := true + dbKey := []byte(obj.addr) + data, closer, err := c.db.Get(dbKey) if err == nil { - storagelog.Write(c.log, - storagelog.AddressField(obj.addr), - storagelog.StorageTypeField(wcStorageType), - storagelog.OpField("db PUT"), - ) - if newRecord { - c.objCounters.cDB.Add(1) - c.estimateCacheSize() + newRecord = len(data) == 0 + err = closer.Close() + } + if err != nil && !errors.Is(err, pebble.ErrNotFound) { + return err + } + if newRecord { + err = c.db.Set(dbKey, obj.data, pebble.Sync) + if err != nil { + return err } } - return err + storagelog.Write(c.log, + storagelog.AddressField(obj.addr), + storagelog.StorageTypeField(wcStorageType), + storagelog.OpField("db PUT"), + ) + if newRecord { + c.objCounters.cDB.Add(1) + c.estimateCacheSize() + } + return nil } // putBig writes object to FSTree and pushes it to the flush workers queue. diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/state.go index d03f4a63e..cfc6c7540 100644 --- a/pkg/local_object_storage/writecache/state.go +++ b/pkg/local_object_storage/writecache/state.go @@ -6,15 +6,11 @@ import ( "sync/atomic" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" - "go.etcd.io/bbolt" ) func (c *cache) estimateCacheSize() (uint64, uint64) { dbCount := c.objCounters.DB() fsCount := c.objCounters.FS() - if fsCount > 0 { - fsCount-- // db file - } dbSize := dbCount * c.smallObjectSize fsSize := fsCount * c.maxObjectSize c.metrics.SetEstimateSize(dbSize, fsSize) @@ -69,15 +65,15 @@ func (x *counters) Dec() { func (c *cache) initCounters() error { var inDB uint64 - err := c.db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - if b != nil { - inDB = uint64(b.Stats().KeyN) - } - return nil - }) + it, err := c.db.NewIter(nil) if err != nil { - return fmt.Errorf("could not read write-cache DB counter: %w", err) + return fmt.Errorf("can't create write-cache database iterator: %w", err) + } + for v := it.First(); v; v = it.Next() { + inDB++ + } + if err := it.Close(); err != nil { + return fmt.Errorf("can't close write-cache database iterator: %w", err) } c.objCounters.cDB.Store(inDB) c.estimateCacheSize() diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index 57021cc17..cd0e6980d 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -2,9 +2,11 @@ package writecache import ( "context" + "errors" "fmt" "math" "os" + "path/filepath" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" @@ -14,44 +16,25 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" + "github.com/cockroachdb/pebble" "go.uber.org/zap" ) -// store represents persistent storage with in-memory LRU cache -// for flushed items on top of it. -type store struct { - db *bbolt.DB -} - -const dbName = "small.bolt" +const dbName = "pebble" func (c *cache) openStore(mod mode.ComponentMode) error { - err := util.MkdirAllX(c.path, os.ModePerm) + err := util.MkdirAllX(filepath.Join(c.path, dbName), os.ModePerm) if err != nil { return err } - c.db, err = OpenDB(c.path, mod.ReadOnly(), c.openFile, c.pageSize) + c.db, err = OpenDB(filepath.Join(c.path, dbName), mod.ReadOnly()) if err != nil { return fmt.Errorf("could not open database: %w", err) } - c.db.MaxBatchSize = c.maxBatchSize - c.db.MaxBatchDelay = c.maxBatchDelay - - if !mod.ReadOnly() { - err = c.db.Update(func(tx *bbolt.Tx) error { - _, err := tx.CreateBucketIfNotExists(defaultBucket) - return err - }) - if err != nil { - return fmt.Errorf("could not create default bucket: %w", err) - } - } - c.fsTree = fstree.New( - fstree.WithPath(c.path), + fstree.WithPath(filepath.Join(c.path, "fstree")), fstree.WithPerm(os.ModePerm), fstree.WithDepth(1), fstree.WithDirNameLen(1), @@ -68,38 +51,36 @@ func (c *cache) openStore(mod mode.ComponentMode) error { return nil } -func (c *cache) deleteFromDB(key string, batched bool) { - var recordDeleted bool - var err error - if batched { - err = c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - key := []byte(key) - recordDeleted = b.Get(key) != nil - return b.Delete(key) - }) - } else { - err = c.db.Update(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - key := []byte(key) - recordDeleted = b.Get(key) != nil - return b.Delete(key) - }) +func (c *cache) deleteFromDB(key string) { + c.dbEditLocker.Lock(key) + defer func() { + c.dbEditLocker.Unlock(key) + }() + var dataSize uint32 + dbKey := []byte(key) + data, closer, err := c.db.Get(dbKey) + if err == nil { + dataSize = uint32(len(data)) + err = closer.Close() + } + if err != nil && !errors.Is(err, pebble.ErrNotFound) { + c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err)) + return + } + if err := c.db.DeleteSized(dbKey, dataSize, pebble.Sync); err != nil { + c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err)) + return } - if err == nil { - c.metrics.Evict(StorageTypeDB) - storagelog.Write(c.log, - storagelog.AddressField(key), - storagelog.StorageTypeField(wcStorageType), - storagelog.OpField("db DELETE"), - ) - if recordDeleted { - c.objCounters.cDB.Add(math.MaxUint64) - c.estimateCacheSize() - } - } else { - c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err)) + c.metrics.Evict(StorageTypeDB) + storagelog.Write(c.log, + storagelog.AddressField(key), + storagelog.StorageTypeField(wcStorageType), + storagelog.OpField("db DELETE"), + ) + if dataSize > 0 { + c.objCounters.cDB.Add(math.MaxUint64) + c.estimateCacheSize() } } diff --git a/pkg/local_object_storage/writecache/util.go b/pkg/local_object_storage/writecache/util.go index ad3b443f3..26bd70c67 100644 --- a/pkg/local_object_storage/writecache/util.go +++ b/pkg/local_object_storage/writecache/util.go @@ -1,21 +1,67 @@ package writecache import ( - "io/fs" - "os" - "path/filepath" - "time" + "log" - "go.etcd.io/bbolt" + "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/bloom" +) + +const ( + defaultPebbleCacheSize = 1 << 30 + defaultPebbleMemTableSize = 64 << 20 + defaultMemTableStopWritesThreshold = 20 + defaultPebbleBytesPerSync = 1 << 20 + defaultPebbleMaxConcurrentCompactions = 5 + defaultPebbleMaxOpenFiles = 1000 + defaultPebbleL0CompactionThreshold = 4 + defaultPebbleL0CompactionFileThreshold = 500 + defaultPebbleL0StopWritesThreshold = 12 + defaultPebbleLBaseMaxBytes = 128 << 20 + defaultPebbleLevels = 7 + defaultPebbleL0TargetFileSize = 4 << 20 + defaultPebbleBlockSize = 4 << 10 + defaultPebbleFilterPolicy bloom.FilterPolicy = 10 ) // OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true. -func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error), pageSize int) (*bbolt.DB, error) { - return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{ - NoFreelistSync: true, - ReadOnly: ro, - Timeout: 100 * time.Millisecond, - OpenFile: openFile, - PageSize: pageSize, - }) +func OpenDB(p string, ro bool) (*pebble.DB, error) { + opts := &pebble.Options{ + ReadOnly: ro, + FormatMajorVersion: pebble.FormatNewest, + } + opts.Logger = &noopPebbleLogger{} + opts.Cache = pebble.NewCache(defaultPebbleCacheSize) + opts.MemTableSize = defaultPebbleMemTableSize + opts.MemTableStopWritesThreshold = defaultMemTableStopWritesThreshold + opts.BytesPerSync = defaultPebbleBytesPerSync + opts.MaxConcurrentCompactions = func() int { return defaultPebbleMaxConcurrentCompactions } + opts.MaxOpenFiles = defaultPebbleMaxOpenFiles + opts.L0CompactionThreshold = defaultPebbleL0CompactionThreshold + opts.L0CompactionFileThreshold = defaultPebbleL0CompactionFileThreshold + opts.L0StopWritesThreshold = defaultPebbleL0StopWritesThreshold + opts.LBaseMaxBytes = defaultPebbleLBaseMaxBytes + opts.Levels = make([]pebble.LevelOptions, defaultPebbleLevels) + opts.Levels[0].TargetFileSize = defaultPebbleL0TargetFileSize + for i := 0; i < defaultPebbleLevels; i++ { + l := &opts.Levels[i] + l.BlockSize = defaultPebbleBlockSize + l.FilterPolicy = defaultPebbleFilterPolicy + l.FilterType = pebble.TableFilter + if i > 0 { + l.TargetFileSize = opts.Levels[i-1].TargetFileSize * 2 + } + l.EnsureDefaults() + } + return pebble.Open(p, opts) } + +var _ pebble.Logger = (*noopPebbleLogger)(nil) + +type noopPebbleLogger struct{} + +func (n *noopPebbleLogger) Fatalf(format string, args ...interface{}) { + log.Fatalf(format, args...) +} + +func (n *noopPebbleLogger) Infof(string, ...interface{}) {}