From 00c8712dc6bf4e29fca92947b868c8d023ba0500 Mon Sep 17 00:00:00 2001 From: Alejandro Lopez Date: Tue, 20 Jun 2023 12:40:30 +0300 Subject: [PATCH] [#421] Create common interface for kv repositories Signed-off-by: Alejandro Lopez --- cmd/frostfs-lens/internal/writecache/list.go | 17 +-- cmd/frostfs-lens/internal/writecache/root.go | 16 ++- go.mod | 12 ++ go.sum | Bin 99899 -> 101690 bytes .../kvio/badger/badger.go | 58 ++++++++++ .../kvio/badger/cursor.go | 31 ++++++ pkg/local_object_storage/kvio/badger/tx.go | 45 ++++++++ pkg/local_object_storage/kvio/bbolt/bbolt.go | 105 ++++++++++++++++++ pkg/local_object_storage/kvio/bbolt/cursor.go | 19 ++++ pkg/local_object_storage/kvio/bbolt/tx.go | 21 ++++ pkg/local_object_storage/kvio/kvio.go | 88 +++++++++++++++ pkg/local_object_storage/kvio/util.go | 70 ++++++++++++ pkg/local_object_storage/writecache/delete.go | 18 ++- pkg/local_object_storage/writecache/flush.go | 85 +++++++------- .../writecache/flush_test.go | 12 +- pkg/local_object_storage/writecache/get.go | 29 ++--- .../writecache/iterate.go | 39 ------- pkg/local_object_storage/writecache/mode.go | 4 +- pkg/local_object_storage/writecache/put.go | 7 +- pkg/local_object_storage/writecache/state.go | 13 +-- .../writecache/storage.go | 48 ++++---- pkg/local_object_storage/writecache/util.go | 20 ---- .../writecache/writecache.go | 15 +-- 23 files changed, 569 insertions(+), 203 deletions(-) create mode 100644 pkg/local_object_storage/kvio/badger/badger.go create mode 100644 pkg/local_object_storage/kvio/badger/cursor.go create mode 100644 pkg/local_object_storage/kvio/badger/tx.go create mode 100644 pkg/local_object_storage/kvio/bbolt/bbolt.go create mode 100644 pkg/local_object_storage/kvio/bbolt/cursor.go create mode 100644 pkg/local_object_storage/kvio/bbolt/tx.go create mode 100644 pkg/local_object_storage/kvio/kvio.go create mode 100644 pkg/local_object_storage/kvio/util.go delete mode 100644 pkg/local_object_storage/writecache/iterate.go delete mode 100644 pkg/local_object_storage/writecache/util.go diff --git a/cmd/frostfs-lens/internal/writecache/list.go b/cmd/frostfs-lens/internal/writecache/list.go index f6d0cfff0..df9ff665a 100644 --- a/cmd/frostfs-lens/internal/writecache/list.go +++ b/cmd/frostfs-lens/internal/writecache/list.go @@ -5,7 +5,7 @@ import ( "io" common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/spf13/cobra" ) @@ -25,14 +25,17 @@ func listFunc(cmd *cobra.Command, _ []string) { // other targets can be supported w := cmd.OutOrStderr() - wAddr := func(addr oid.Address) error { - _, err := io.WriteString(w, fmt.Sprintf("%s\n", addr)) - return err - } - db := openWC(cmd) defer db.Close() - err := writecache.IterateDB(db, wAddr) + err := kvio.ForEachKey(db, func(k kvio.Key) error { + var addr oid.Address + if err := addr.DecodeString(string(k)); err != nil { + return fmt.Errorf("could not parse object address: %w", err) + } + _, err := io.WriteString(w, fmt.Sprintf("%s\n", addr)) + return err + }) + common.ExitOnErr(cmd, common.Errf("write-cache iterator failure: %w", err)) } diff --git a/cmd/frostfs-lens/internal/writecache/root.go b/cmd/frostfs-lens/internal/writecache/root.go index 4a1305848..1a1a92ef2 100644 --- a/cmd/frostfs-lens/internal/writecache/root.go +++ b/cmd/frostfs-lens/internal/writecache/root.go @@ -2,9 +2,11 @@ package writecache import ( "os" + "time" common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio" + bboltrepo "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio/bbolt" "github.com/spf13/cobra" "go.etcd.io/bbolt" ) @@ -25,9 +27,15 @@ func init() { Root.AddCommand(listCMD, inspectCMD) } -func openWC(cmd *cobra.Command) *bbolt.DB { - db, err := writecache.OpenDB(vPath, true, os.OpenFile) +func openWC(cmd *cobra.Command) kvio.Repository { + db, err := bboltrepo.Open(vPath, &bboltrepo.Options{ + Options: bbolt.Options{ + NoFreelistSync: true, + ReadOnly: true, + Timeout: 100 * time.Millisecond, + OpenFile: os.OpenFile, + }, + }) common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err)) - return db } diff --git a/go.mod b/go.mod index 0fdcb4409..ccc8a1eb7 100644 --- a/go.mod +++ b/go.mod @@ -41,6 +41,17 @@ require ( gopkg.in/yaml.v3 v3.0.1 ) +require ( + github.com/dgraph-io/ristretto v0.1.1 // indirect + github.com/dustin/go-humanize v1.0.0 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/glog v1.1.0 // indirect + github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect + github.com/google/flatbuffers v1.12.1 // indirect + github.com/pkg/errors v0.9.1 // indirect + go.opencensus.io v0.24.0 // indirect +) + require ( git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 // indirect git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect @@ -54,6 +65,7 @@ require ( github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect + github.com/dgraph-io/badger/v4 v4.1.0 github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect diff --git a/go.sum b/go.sum index 5da5a9a9ed7db66bf1919e0c23dc30541c154708..4905b72dc7ea07ed2f8a51e4049d788956116660 100644 GIT binary patch delta 1362 zcma)*Ppjip7{=)=M!A9r4i37yOEI)P=j6XIFll<*G`&rerfHhOERr-y(>Bfjq>adR z_HLw$3xW%mu2d?FIN!j946bG=_%YnrdKX^Dxgv+NJiqt%JiO=m=l6g5^8R<9Qde^? zF{4G@51W=b_nf3z5a$KafDLfIfFEDr%+dxfHsZ!cva?mqWiJOsGb89Jx5IFri*|+@ z6P!F$9-Vo9w#co99R|(2sZB3z1mXMymr0OcA!d%VzSSB9Juz4KrlihWLyw?@jZo6& z8mT&|j0Eovdg7;9;$&HPUVx)BJYgwbFUd@lyvXsmL=_@Q5c})7yBU!P}BE@a6u7MT;OQ9v_nhD zw@%=#Vxls_|EBJrp1q3+S&~^Ddj?sJ?DDkOjssg$TMXTc$qPjGH!9^042^Pzt3yP6 z_Db$K+A%R0e3E%_kX5*7=%_6e(<1`si|8l73Jc=^D$^ zM1|vN7YW4Dqk2KN+{fC3-fJ+o8iAZM$@}R-FNlI0g)=j&s9>!CajT64&@V`=k{q!w z@+NbLwU_s5`2OqZ 0 { + db.MaxBatchSize = opts.MaxBatchSize + } + if opts.MaxBatchDelay > 0 { + db.MaxBatchDelay = opts.MaxBatchDelay + } + + if !opts.Options.ReadOnly { + err = db.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(defaultBucket) + return err + }) + if err != nil { + return nil, fmt.Errorf("could not create default bucket: %w", err) + } + } + + return impl{db}, nil +} + +func (r impl) Read(f func(kvio.ReadOnlyTx) error) error { + return r.db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + if b == nil { + return ErrNoDefaultBucket + } + return f(txImpl{b, tx}) + }) +} + +func (r impl) Write(f func(kvio.WriteOnlyTx) error) error { + return r.db.Batch(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + if b == nil { + return ErrNoDefaultBucket + } + return f(txImpl{b, tx}) + }) +} + +func (r impl) ReadWrite(f func(kvio.ReadWriteTx) error) error { + return r.db.Update(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + if b == nil { + return ErrNoDefaultBucket + } + return f(txImpl{b, tx}) + }) +} + +func (r impl) Stats() (kvio.Stats, error) { + var keyCount uint64 + err := r.db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + if b != nil { + keyCount = uint64(b.Stats().KeyN) + } + return nil + }) + if err != nil { + return kvio.Stats{}, fmt.Errorf("could not read write-cache DB counter: %w", err) + } + + return kvio.Stats{ + KeyCount: keyCount, + }, nil +} + +func (r impl) Close() error { return r.db.Close() } diff --git a/pkg/local_object_storage/kvio/bbolt/cursor.go b/pkg/local_object_storage/kvio/bbolt/cursor.go new file mode 100644 index 000000000..33e3bf482 --- /dev/null +++ b/pkg/local_object_storage/kvio/bbolt/cursor.go @@ -0,0 +1,19 @@ +package bbolt + +import ( + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio" + "go.etcd.io/bbolt" +) + +type cursor struct { + cur *bbolt.Cursor + key []byte + val []byte +} + +func (c *cursor) Key() kvio.Key { return c.key } +func (c *cursor) Rewind() { c.key, c.val = c.cur.First() } +func (c *cursor) Seek(k kvio.Key) { c.key, c.val = c.cur.Seek(k) } +func (c *cursor) Next() { c.key, c.val = c.cur.Next() } +func (c *cursor) Close() {} +func (c *cursor) Value(f func(kvio.Value) error) error { return f(c.val) } diff --git a/pkg/local_object_storage/kvio/bbolt/tx.go b/pkg/local_object_storage/kvio/bbolt/tx.go new file mode 100644 index 000000000..83ca6734b --- /dev/null +++ b/pkg/local_object_storage/kvio/bbolt/tx.go @@ -0,0 +1,21 @@ +package bbolt + +import ( + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio" + "go.etcd.io/bbolt" +) + +type txImpl struct { + b *bbolt.Bucket + tx *bbolt.Tx +} + +func (tx txImpl) Read(k kvio.Key, f func(kvio.Value) error) error { return f(tx.b.Get(k)) } +func (tx txImpl) Write(k kvio.Key, v kvio.Value) error { return tx.b.Put(k, v) } +func (tx txImpl) Delete(k kvio.Key) error { return tx.b.Delete(k) } +func (tx txImpl) IterateKeys() kvio.KeyCursor { return tx.Iterate() } +func (tx txImpl) Iterate() kvio.Cursor { + cur := tx.b.Cursor() + k0, v0 := cur.First() + return &cursor{cur, k0, v0} +} diff --git a/pkg/local_object_storage/kvio/kvio.go b/pkg/local_object_storage/kvio/kvio.go new file mode 100644 index 000000000..3459ba396 --- /dev/null +++ b/pkg/local_object_storage/kvio/kvio.go @@ -0,0 +1,88 @@ +// Package kvio provides basic interfaces to key-value I/O primitives. +// It serves as a wrapper for exisiting implementations such as those +// provided by popular key-value stores (bbolt, badger, leveldb, etc.) +// and can serve to compose them as well. +package kvio + +type ( + Key []byte + Value []byte +) + +// Writer is the interface that wraps the Write method for a key-value entry. +type Writer interface { + Write(Key, Value) error +} + +// Reader is the interface that wraps the Read method for a key-value entry. +type Reader interface { + Read(Key, func(Value) error) error +} + +// Deleter is the interface that wraps the Delete method for a key-value entry. +type Deleter interface { + Delete(Key) error +} + +// KeyCursor is the interface of cursors that can iterate over a set of keys. +type KeyCursor interface { + // Key returns the key of the current entry, or nil if at the end of the cursor. + Key() Key + // Rewind rewinds the cursor to the first key in lexicographical order. + Rewind() + // Seek moves the cursor to the smallest key that is equal or greater than the + // provided key in lexicographical order. + Seek(Key) + // Next moves the cursor to the next key in lexicographical order. + Next() + // Close frees the resources allocated by the cursor and invalidates it. + Close() +} + +// Cursor is the interface of cursors that can iterate over a set of key-value entries. +type Cursor interface { + KeyCursor + // Value calls the given function with the value of the current entry, or nil if + // at the end of the cursor. + Value(func(Value) error) error +} + +// Iterator is the interface which provides cursors over sets of key-value entries. +type Iterator interface { + // IterateKeys provides a cursor to iterate the key set in lexicographical order. + IterateKeys() KeyCursor + // IterateKeys provides a cursor to iterate the key-value set in lexicographical order of keys. + Iterate() Cursor +} + +// ReadOnlyTx is the interface of read-only transactions for key-value repositories. +type ReadOnlyTx interface { + Reader + Iterator +} + +// WriteOnlyTx is the interface of write-only transactions for key-value repositories. +type WriteOnlyTx interface { + Writer + Deleter +} + +// ReadWriteTx is the interface of read-write transactions for key-value repositories. +type ReadWriteTx interface { + ReadOnlyTx + WriteOnlyTx +} + +// Stats are the common statistics for key-value repositories. +type Stats struct { + KeyCount uint64 +} + +// Repository is the interface of key-value repositories. +type Repository interface { + Read(func(ReadOnlyTx) error) error + Write(func(WriteOnlyTx) error) error + ReadWrite(func(ReadWriteTx) error) error + Stats() (Stats, error) + Close() error +} diff --git a/pkg/local_object_storage/kvio/util.go b/pkg/local_object_storage/kvio/util.go new file mode 100644 index 000000000..79f320bde --- /dev/null +++ b/pkg/local_object_storage/kvio/util.go @@ -0,0 +1,70 @@ +package kvio + +import "github.com/nspcc-dev/neo-go/pkg/util/slice" + +// ForEachKey calls the given function for each key in the given repository in lexicographical order. +func ForEachKey(repo Repository, f func(Key) error) error { + return repo.Read(func(tx ReadOnlyTx) error { + cur := tx.IterateKeys() + defer cur.Close() + for ; cur.Key() != nil; cur.Next() { + if err := f(cur.Key()); err != nil { + return err + } + } + return nil + }) +} + +// ForEach calls the given function for each key-value entry in the given repository +// in lexicographical order of the keys. +func ForEach(repo Repository, f func(Key, Value) error) error { + return repo.Read(func(tx ReadOnlyTx) error { + cur := tx.Iterate() + defer cur.Close() + for ; cur.Key() != nil; cur.Next() { + var val Value + if err := cur.Value(func(v Value) error { + val = v + return nil + }); err != nil { + return err + } + if err := f(cur.Key(), val); err != nil { + return err + } + } + return nil + }) +} + +// Write writes a single key-value entry to the given repository. +// This is a convenience method that takes care of creating the appropriate transaction. +func Write(repo Repository, k Key, v Value) error { + return repo.Write(func(tx WriteOnlyTx) error { + return tx.Write(k, v) + }) +} + +// Read reads a single key-value entry from the given repository. +// This is a convenience method that takes care of creating the appropriate transaction. +func Read(repo Repository, k Key) (Value, error) { + var ret Value + err := repo.Read(func(tx ReadOnlyTx) error { + return tx.Read(k, func(v Value) error { + if v != nil { + ret = slice.Copy(v) + } + return nil + }) + }) + return ret, err +} + +// Deletes deletes a single key-value entry from the given repository. +// This is a convenience method that takes care of creating the appropriate transaction. +func Delete(repo Repository, k Key) error { + return repo.Write(func(tx WriteOnlyTx) error { + return tx.Delete(k) + }) +} diff --git a/pkg/local_object_storage/writecache/delete.go b/pkg/local_object_storage/writecache/delete.go index aeab88b0b..52b3c20bb 100644 --- a/pkg/local_object_storage/writecache/delete.go +++ b/pkg/local_object_storage/writecache/delete.go @@ -7,9 +7,9 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -40,20 +40,16 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error { saddr := addr.EncodeToString() var dataSize int - _ = c.db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - dataSize = len(b.Get([]byte(saddr))) - return nil + _ = c.repo.Read(func(tx kvio.ReadOnlyTx) error { + return tx.Read([]byte(saddr), func(v kvio.Value) error { + dataSize = len(v) + return nil + }) }) if dataSize > 0 { storageType = StorageTypeDB - err := c.db.Update(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - err := b.Delete([]byte(saddr)) - return err - }) - if err != nil { + if err := kvio.Delete(c.repo, []byte(saddr)); err != nil { return err } storagelog.Write(c.log, diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index 21f968b04..ea65ef694 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -11,13 +11,13 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/mr-tron/base58" "github.com/nspcc-dev/neo-go/pkg/util/slice" - "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -97,32 +97,33 @@ func (c *cache) flushSmallObjects() { } // We put objects in batches of fixed size to not interfere with main put cycle a lot. - _ = c.db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - cs := b.Cursor() + _ = c.repo.Read(func(tx kvio.ReadOnlyTx) error { + cur := tx.Iterate() + defer cur.Close() - var k, v []byte - - if len(lastKey) == 0 { - k, v = cs.First() - } else { - k, v = cs.Seek(lastKey) - if bytes.Equal(k, lastKey) { - k, v = cs.Next() + if len(lastKey) != 0 { + cur.Seek(lastKey) + if bytes.Equal(cur.Key(), lastKey) { + cur.Next() } } - for ; k != nil && len(m) < flushBatchSize; k, v = cs.Next() { - if len(lastKey) == len(k) { - copy(lastKey, k) + for ; cur.Key() != nil && len(m) < flushBatchSize; cur.Next() { + if len(lastKey) == len(cur.Key()) { + copy(lastKey, cur.Key()) } else { - lastKey = slice.Copy(k) + lastKey = slice.Copy(cur.Key()) } - m = append(m, objectInfo{ - addr: string(k), - data: slice.Copy(v), - }) + if err := cur.Value(func(v kvio.Value) error { + m = append(m, objectInfo{ + addr: string(cur.Key()), + data: slice.Copy(v), + }) + return nil + }); err != nil { + return err + } } return nil }) @@ -307,34 +308,26 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { return err } - return c.db.View(func(tx *bbolt.Tx) error { + return kvio.ForEach(c.repo, func(k kvio.Key, v kvio.Value) error { var addr oid.Address - - b := tx.Bucket(defaultBucket) - cs := b.Cursor() - for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() { - sa := string(k) - if err := addr.DecodeString(sa); err != nil { - c.reportFlushError("can't decode object address from the DB", sa, metaerr.Wrap(err)) - if ignoreErrors { - continue - } - return err - } - - var obj object.Object - if err := obj.Unmarshal(data); err != nil { - c.reportFlushError("can't unmarshal an object from the DB", sa, metaerr.Wrap(err)) - if ignoreErrors { - continue - } - return err - } - - if err := c.flushObject(ctx, &obj, data, StorageTypeDB); err != nil { - return err + sa := string(k) + if err := addr.DecodeString(sa); err != nil { + c.reportFlushError("can't decode object address from the DB", sa, metaerr.Wrap(err)) + if ignoreErrors { + return nil } + return err } - return nil + + var obj object.Object + if err := obj.Unmarshal(v); err != nil { + c.reportFlushError("can't unmarshal an object from the DB", sa, metaerr.Wrap(err)) + if ignoreErrors { + return nil + } + return err + } + + return c.flushObject(ctx, &obj, v, StorageTypeDB) }) } diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go index e8224ce5d..9ee2a059d 100644 --- a/pkg/local_object_storage/writecache/flush_test.go +++ b/pkg/local_object_storage/writecache/flush_test.go @@ -11,6 +11,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" @@ -22,7 +23,6 @@ import ( usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test" versionSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version" "github.com/stretchr/testify/require" - "go.etcd.io/bbolt" "go.uber.org/zap/zaptest" ) @@ -151,17 +151,15 @@ func TestFlush(t *testing.T) { t.Run("db, invalid address", func(t *testing.T) { testIgnoreErrors(t, func(c *cache) { _, data := newObject(t, 1) - require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - return b.Put([]byte{1, 2, 3}, data) + require.NoError(t, c.repo.Write(func(tx kvio.WriteOnlyTx) error { + return tx.Write([]byte{1, 2, 3}, data) })) }) }) t.Run("db, invalid object", func(t *testing.T) { testIgnoreErrors(t, func(c *cache) { - require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - return b.Put([]byte(oidtest.Address().EncodeToString()), []byte{1, 2, 3}) + require.NoError(t, c.repo.Write(func(tx kvio.WriteOnlyTx) error { + return tx.Write([]byte(oidtest.Address().EncodeToString()), []byte{1, 2, 3}) })) }) }) diff --git a/pkg/local_object_storage/writecache/get.go b/pkg/local_object_storage/writecache/get.go index 2546bada9..0003c6ea3 100644 --- a/pkg/local_object_storage/writecache/get.go +++ b/pkg/local_object_storage/writecache/get.go @@ -6,13 +6,12 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "github.com/nspcc-dev/neo-go/pkg/util/slice" - "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -41,7 +40,7 @@ func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) c.metrics.Get(time.Since(startedAt), found, storageType) }() - value, err := Get(c.db, []byte(saddr)) + value, err := Get(c.repo, []byte(saddr)) if err == nil { obj := objectSDK.New() found = true @@ -83,19 +82,13 @@ func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, // Key should be a stringified address. // // Returns an error of type apistatus.ObjectNotFound if the requested object is missing in db. -func Get(db *bbolt.DB, key []byte) ([]byte, error) { - var value []byte - err := db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - if b == nil { - return ErrNoDefaultBucket - } - value = b.Get(key) - if value == nil { - return logicerr.Wrap(apistatus.ObjectNotFound{}) - } - value = slice.Copy(value) - return nil - }) - return value, metaerr.Wrap(err) +func Get(repo kvio.Repository, key []byte) ([]byte, error) { + value, err := kvio.Read(repo, key) + if err != nil { + return nil, metaerr.Wrap(err) + } + if value == nil { + return nil, metaerr.Wrap(logicerr.Wrap(apistatus.ObjectNotFound{})) + } + return value, nil } diff --git a/pkg/local_object_storage/writecache/iterate.go b/pkg/local_object_storage/writecache/iterate.go deleted file mode 100644 index 5349c069c..000000000 --- a/pkg/local_object_storage/writecache/iterate.go +++ /dev/null @@ -1,39 +0,0 @@ -package writecache - -import ( - "errors" - "fmt" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" -) - -// ErrNoDefaultBucket is returned by IterateDB when default bucket for objects is missing. -var ErrNoDefaultBucket = errors.New("no default bucket") - -// IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return. -// It is assumed that db is an underlying database of some WriteCache instance. -// -// Returns ErrNoDefaultBucket if there is no default bucket in db. -// -// DB must not be nil and should be opened. -func IterateDB(db *bbolt.DB, f func(oid.Address) error) error { - return metaerr.Wrap(db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - if b == nil { - return ErrNoDefaultBucket - } - - var addr oid.Address - - return b.ForEach(func(k, v []byte) error { - err := addr.DecodeString(string(k)) - if err != nil { - return fmt.Errorf("could not parse object address: %w", err) - } - - return f(addr) - }) - })) -} diff --git a/pkg/local_object_storage/writecache/mode.go b/pkg/local_object_storage/writecache/mode.go index bdbbec7c9..9c9c54cad 100644 --- a/pkg/local_object_storage/writecache/mode.go +++ b/pkg/local_object_storage/writecache/mode.go @@ -48,8 +48,8 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode) error { } } - if c.db != nil { - if err = c.db.Close(); err != nil { + if c.repo != nil { + if err = c.repo.Close(); err != nil { return fmt.Errorf("can't close write-cache database: %w", err) } } diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go index 619b2bd26..92babd5f2 100644 --- a/pkg/local_object_storage/writecache/put.go +++ b/pkg/local_object_storage/writecache/put.go @@ -8,8 +8,8 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" - "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -84,10 +84,7 @@ func (c *cache) putSmall(obj objectInfo) error { return ErrOutOfSpace } - err := c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - return b.Put([]byte(obj.addr), obj.data) - }) + err := kvio.Write(c.repo, []byte(obj.addr), obj.data) if err == nil { storagelog.Write(c.log, storagelog.AddressField(obj.addr), diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/state.go index 14103e626..5023ceabf 100644 --- a/pkg/local_object_storage/writecache/state.go +++ b/pkg/local_object_storage/writecache/state.go @@ -4,8 +4,6 @@ import ( "fmt" "math" "sync/atomic" - - "go.etcd.io/bbolt" ) func (c *cache) estimateCacheSize() uint64 { @@ -52,18 +50,13 @@ func (x *counters) FS() uint64 { } func (c *cache) initCounters() error { - var inDB uint64 - err := c.db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - if b != nil { - inDB = uint64(b.Stats().KeyN) - } - return nil - }) + stats, err := c.repo.Stats() if err != nil { return fmt.Errorf("could not read write-cache DB counter: %w", err) } + inDB := stats.KeyCount + inFS, err := c.fsTree.NumberOfObjects() if err != nil { return fmt.Errorf("could not read write-cache FS counter: %w", err) diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index 3bd3813d1..e6999e696 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -5,50 +5,49 @@ import ( "errors" "fmt" "os" + "path/filepath" + "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio" + bboltrepo "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio/bbolt" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.etcd.io/bbolt" "go.uber.org/zap" ) -// store represents persistent storage with in-memory LRU cache -// for flushed items on top of it. -type store struct { - db *bbolt.DB -} - -const dbName = "small.bolt" - func (c *cache) openStore(readOnly bool) error { err := util.MkdirAllX(c.path, os.ModePerm) if err != nil { return err } - c.db, err = OpenDB(c.path, readOnly, c.openFile) + // It's possible to use other Repository implementations here, such as the badger-backed one: + // + // opts := badger.DefaultOptions(filepath.Join(c.path, "small.badger")).WithReadOnly(readOnly) + // c.repo, err = badgerrepo.Open(opts) + + c.repo, err = bboltrepo.Open(filepath.Join(c.path, "small.bolt"), &bboltrepo.Options{ + Options: bbolt.Options{ + NoFreelistSync: true, + ReadOnly: readOnly, + Timeout: 100 * time.Millisecond, + OpenFile: c.openFile, + }, + MaxBatchSize: c.maxBatchSize, + MaxBatchDelay: c.maxBatchDelay, + }) + if err != nil { return fmt.Errorf("could not open database: %w", err) } - c.db.MaxBatchSize = c.maxBatchSize - c.db.MaxBatchDelay = c.maxBatchDelay - - if !readOnly { - err = c.db.Update(func(tx *bbolt.Tx) error { - _, err := tx.CreateBucketIfNotExists(defaultBucket) - return err - }) - if err != nil { - return fmt.Errorf("could not create default bucket: %w", err) - } - } - c.fsTree = fstree.New( fstree.WithPath(c.path), fstree.WithPerm(os.ModePerm), @@ -68,10 +67,9 @@ func (c *cache) deleteFromDB(keys []string) []string { } var errorIndex int - err := c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) + err := c.repo.Write(func(tx kvio.WriteOnlyTx) error { for errorIndex = range keys { - if err := b.Delete([]byte(keys[errorIndex])); err != nil { + if err := tx.Delete([]byte(keys[errorIndex])); err != nil { return err } } diff --git a/pkg/local_object_storage/writecache/util.go b/pkg/local_object_storage/writecache/util.go deleted file mode 100644 index 0ed4a954e..000000000 --- a/pkg/local_object_storage/writecache/util.go +++ /dev/null @@ -1,20 +0,0 @@ -package writecache - -import ( - "io/fs" - "os" - "path/filepath" - "time" - - "go.etcd.io/bbolt" -) - -// OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true. -func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error)) (*bbolt.DB, error) { - return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{ - NoFreelistSync: true, - ReadOnly: ro, - Timeout: 100 * time.Millisecond, - OpenFile: openFile, - }) -} diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index 962c9c39a..c101b79b2 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -8,6 +8,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -63,8 +64,8 @@ type cache struct { closeCh chan struct{} // wg is a wait group for flush workers. wg sync.WaitGroup - // store contains underlying database. - store + // repo is the underlying key/value repository. + repo kvio.Repository // fsTree contains big files stored directly on file-system. fsTree *fstree.FSTree } @@ -84,10 +85,6 @@ const ( defaultMaxCacheSize = 1 << 30 // 1 GiB ) -var ( - defaultBucket = []byte{0} -) - // New creates new writecache instance. func New(opts ...Option) Cache { c := &cache{ @@ -165,10 +162,10 @@ func (c *cache) Close() error { c.closeCh = nil var err error - if c.db != nil { - err = c.db.Close() + if c.repo != nil { + err = c.repo.Close() if err != nil { - c.db = nil + c.repo = nil } } c.metrics.Close()