[#421] Create common interface for kv repositories
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful

Signed-off-by: Alejandro Lopez <a.lopez@yadro.com>
This commit is contained in:
Alejandro Lopez 2023-06-20 12:40:30 +03:00
parent 50caa388b0
commit 00c8712dc6
23 changed files with 569 additions and 203 deletions

View file

@ -5,7 +5,7 @@ import (
"io"
common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/spf13/cobra"
)
@ -25,14 +25,17 @@ func listFunc(cmd *cobra.Command, _ []string) {
// other targets can be supported
w := cmd.OutOrStderr()
wAddr := func(addr oid.Address) error {
_, err := io.WriteString(w, fmt.Sprintf("%s\n", addr))
return err
}
db := openWC(cmd)
defer db.Close()
err := writecache.IterateDB(db, wAddr)
err := kvio.ForEachKey(db, func(k kvio.Key) error {
var addr oid.Address
if err := addr.DecodeString(string(k)); err != nil {
return fmt.Errorf("could not parse object address: %w", err)
}
_, err := io.WriteString(w, fmt.Sprintf("%s\n", addr))
return err
})
common.ExitOnErr(cmd, common.Errf("write-cache iterator failure: %w", err))
}

View file

@ -2,9 +2,11 @@ package writecache
import (
"os"
"time"
common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio"
bboltrepo "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio/bbolt"
"github.com/spf13/cobra"
"go.etcd.io/bbolt"
)
@ -25,9 +27,15 @@ func init() {
Root.AddCommand(listCMD, inspectCMD)
}
func openWC(cmd *cobra.Command) *bbolt.DB {
db, err := writecache.OpenDB(vPath, true, os.OpenFile)
func openWC(cmd *cobra.Command) kvio.Repository {
db, err := bboltrepo.Open(vPath, &bboltrepo.Options{
Options: bbolt.Options{
NoFreelistSync: true,
ReadOnly: true,
Timeout: 100 * time.Millisecond,
OpenFile: os.OpenFile,
},
})
common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err))
return db
}

12
go.mod
View file

@ -41,6 +41,17 @@ require (
gopkg.in/yaml.v3 v3.0.1
)
require (
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.1.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/flatbuffers v1.12.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
go.opencensus.io v0.24.0 // indirect
)
require (
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 // indirect
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect
@ -54,6 +65,7 @@ require (
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/dgraph-io/badger/v4 v4.1.0
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect

BIN
go.sum

Binary file not shown.

View file

@ -0,0 +1,58 @@
// Package badger provides a kvio.Repository backed by badger.
package badger
import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio"
badger "github.com/dgraph-io/badger/v4"
)
type impl struct {
db *badger.DB
}
func Open(opts badger.Options) (kvio.Repository, error) {
db, err := badger.Open(opts)
if err != nil {
return nil, fmt.Errorf("could not open database: %w", err)
}
return impl{db}, nil
}
func (r impl) Read(f func(kvio.ReadOnlyTx) error) error {
return r.db.View(func(tx *badger.Txn) error {
return f(txImpl{tx})
})
}
func (r impl) Write(f func(kvio.WriteOnlyTx) error) error {
wb := r.db.NewWriteBatch()
defer wb.Cancel()
if err := f(writeBatchTx{wb}); err != nil {
return err
}
return wb.Flush()
}
func (r impl) ReadWrite(f func(kvio.ReadWriteTx) error) error {
return r.db.Update(func(tx *badger.Txn) error {
return f(txImpl{tx})
})
}
func (r impl) Stats() (kvio.Stats, error) {
var stats kvio.Stats
err := r.Read(func(tx kvio.ReadOnlyTx) error {
cur := tx.IterateKeys()
defer cur.Close()
for ; cur.Key() != nil; cur.Next() {
stats.KeyCount++
}
return nil
})
return stats, err
}
func (r impl) Close() error { return r.db.Close() }

View file

@ -0,0 +1,31 @@
package badger
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio"
badger "github.com/dgraph-io/badger/v4"
)
type cursor struct {
it *badger.Iterator
}
func (c cursor) Key() kvio.Key {
if c.it.Valid() {
return c.it.Item().Key()
}
return nil
}
func (c cursor) Rewind() { c.it.Rewind() }
func (c cursor) Seek(k kvio.Key) { c.it.Seek(k) }
func (c cursor) Next() { c.it.Next() }
func (c cursor) Close() { c.it.Close() }
func (c cursor) Value(f func(kvio.Value) error) error {
if c.it.Valid() {
return c.it.Item().Value(func(val []byte) error {
return f(val)
})
}
return f(nil)
}

View file

@ -0,0 +1,45 @@
package badger
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio"
badger "github.com/dgraph-io/badger/v4"
)
type txImpl struct {
tx *badger.Txn
}
func (tx txImpl) Read(k kvio.Key, f func(kvio.Value) error) error {
it, err := tx.tx.Get(k)
if err != nil {
if err == badger.ErrKeyNotFound {
return f(nil)
}
return err
}
return it.Value(func(val []byte) error { return f(val) })
}
func (tx txImpl) Write(k kvio.Key, v kvio.Value) error { return tx.tx.Set(k, v) }
func (tx txImpl) Delete(k kvio.Key) error { return tx.tx.Delete(k) }
func (tx txImpl) IterateKeys() kvio.KeyCursor {
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false
cur := tx.tx.NewIterator(opts)
cur.Rewind()
return cursor{cur}
}
func (tx txImpl) Iterate() kvio.Cursor {
cur := tx.tx.NewIterator(badger.DefaultIteratorOptions)
cur.Rewind()
return cursor{cur}
}
type writeBatchTx struct {
wb *badger.WriteBatch
}
func (tx writeBatchTx) Write(k kvio.Key, v kvio.Value) error { return tx.wb.Set(k, v) }
func (tx writeBatchTx) Delete(k kvio.Key) error { return tx.wb.Delete(k) }

View file

@ -0,0 +1,105 @@
// Package bbolt provides a kvio.Repository backed by bbolt.
package bbolt
import (
"errors"
"fmt"
"os"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio"
"go.etcd.io/bbolt"
)
type impl struct {
db *bbolt.DB
}
type Options struct {
bbolt.Options
MaxBatchSize int
MaxBatchDelay time.Duration
}
var (
defaultBucket = []byte{0}
// ErrNoDefaultBucket is returned when the default bucket for objects is missing.
ErrNoDefaultBucket = errors.New("no default bucket")
)
func Open(path string, opts *Options) (kvio.Repository, error) {
db, err := bbolt.Open(path, os.ModePerm, &opts.Options)
if err != nil {
return nil, fmt.Errorf("could not open database: %w", err)
}
if opts.MaxBatchSize > 0 {
db.MaxBatchSize = opts.MaxBatchSize
}
if opts.MaxBatchDelay > 0 {
db.MaxBatchDelay = opts.MaxBatchDelay
}
if !opts.Options.ReadOnly {
err = db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(defaultBucket)
return err
})
if err != nil {
return nil, fmt.Errorf("could not create default bucket: %w", err)
}
}
return impl{db}, nil
}
func (r impl) Read(f func(kvio.ReadOnlyTx) error) error {
return r.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
if b == nil {
return ErrNoDefaultBucket
}
return f(txImpl{b, tx})
})
}
func (r impl) Write(f func(kvio.WriteOnlyTx) error) error {
return r.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
if b == nil {
return ErrNoDefaultBucket
}
return f(txImpl{b, tx})
})
}
func (r impl) ReadWrite(f func(kvio.ReadWriteTx) error) error {
return r.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
if b == nil {
return ErrNoDefaultBucket
}
return f(txImpl{b, tx})
})
}
func (r impl) Stats() (kvio.Stats, error) {
var keyCount uint64
err := r.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
if b != nil {
keyCount = uint64(b.Stats().KeyN)
}
return nil
})
if err != nil {
return kvio.Stats{}, fmt.Errorf("could not read write-cache DB counter: %w", err)
}
return kvio.Stats{
KeyCount: keyCount,
}, nil
}
func (r impl) Close() error { return r.db.Close() }

View file

@ -0,0 +1,19 @@
package bbolt
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio"
"go.etcd.io/bbolt"
)
type cursor struct {
cur *bbolt.Cursor
key []byte
val []byte
}
func (c *cursor) Key() kvio.Key { return c.key }
func (c *cursor) Rewind() { c.key, c.val = c.cur.First() }
func (c *cursor) Seek(k kvio.Key) { c.key, c.val = c.cur.Seek(k) }
func (c *cursor) Next() { c.key, c.val = c.cur.Next() }
func (c *cursor) Close() {}
func (c *cursor) Value(f func(kvio.Value) error) error { return f(c.val) }

View file

@ -0,0 +1,21 @@
package bbolt
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio"
"go.etcd.io/bbolt"
)
type txImpl struct {
b *bbolt.Bucket
tx *bbolt.Tx
}
func (tx txImpl) Read(k kvio.Key, f func(kvio.Value) error) error { return f(tx.b.Get(k)) }
func (tx txImpl) Write(k kvio.Key, v kvio.Value) error { return tx.b.Put(k, v) }
func (tx txImpl) Delete(k kvio.Key) error { return tx.b.Delete(k) }
func (tx txImpl) IterateKeys() kvio.KeyCursor { return tx.Iterate() }
func (tx txImpl) Iterate() kvio.Cursor {
cur := tx.b.Cursor()
k0, v0 := cur.First()
return &cursor{cur, k0, v0}
}

View file

@ -0,0 +1,88 @@
// Package kvio provides basic interfaces to key-value I/O primitives.
// It serves as a wrapper for exisiting implementations such as those
// provided by popular key-value stores (bbolt, badger, leveldb, etc.)
// and can serve to compose them as well.
package kvio
type (
Key []byte
Value []byte
)
// Writer is the interface that wraps the Write method for a key-value entry.
type Writer interface {
Write(Key, Value) error
}
// Reader is the interface that wraps the Read method for a key-value entry.
type Reader interface {
Read(Key, func(Value) error) error
}
// Deleter is the interface that wraps the Delete method for a key-value entry.
type Deleter interface {
Delete(Key) error
}
// KeyCursor is the interface of cursors that can iterate over a set of keys.
type KeyCursor interface {
// Key returns the key of the current entry, or nil if at the end of the cursor.
Key() Key
// Rewind rewinds the cursor to the first key in lexicographical order.
Rewind()
// Seek moves the cursor to the smallest key that is equal or greater than the
// provided key in lexicographical order.
Seek(Key)
// Next moves the cursor to the next key in lexicographical order.
Next()
// Close frees the resources allocated by the cursor and invalidates it.
Close()
}
// Cursor is the interface of cursors that can iterate over a set of key-value entries.
type Cursor interface {
KeyCursor
// Value calls the given function with the value of the current entry, or nil if
// at the end of the cursor.
Value(func(Value) error) error
}
// Iterator is the interface which provides cursors over sets of key-value entries.
type Iterator interface {
// IterateKeys provides a cursor to iterate the key set in lexicographical order.
IterateKeys() KeyCursor
// IterateKeys provides a cursor to iterate the key-value set in lexicographical order of keys.
Iterate() Cursor
}
// ReadOnlyTx is the interface of read-only transactions for key-value repositories.
type ReadOnlyTx interface {
Reader
Iterator
}
// WriteOnlyTx is the interface of write-only transactions for key-value repositories.
type WriteOnlyTx interface {
Writer
Deleter
}
// ReadWriteTx is the interface of read-write transactions for key-value repositories.
type ReadWriteTx interface {
ReadOnlyTx
WriteOnlyTx
}
// Stats are the common statistics for key-value repositories.
type Stats struct {
KeyCount uint64
}
// Repository is the interface of key-value repositories.
type Repository interface {
Read(func(ReadOnlyTx) error) error
Write(func(WriteOnlyTx) error) error
ReadWrite(func(ReadWriteTx) error) error
Stats() (Stats, error)
Close() error
}

View file

@ -0,0 +1,70 @@
package kvio
import "github.com/nspcc-dev/neo-go/pkg/util/slice"
// ForEachKey calls the given function for each key in the given repository in lexicographical order.
func ForEachKey(repo Repository, f func(Key) error) error {
return repo.Read(func(tx ReadOnlyTx) error {
cur := tx.IterateKeys()
defer cur.Close()
for ; cur.Key() != nil; cur.Next() {
if err := f(cur.Key()); err != nil {
return err
}
}
return nil
})
}
// ForEach calls the given function for each key-value entry in the given repository
// in lexicographical order of the keys.
func ForEach(repo Repository, f func(Key, Value) error) error {
return repo.Read(func(tx ReadOnlyTx) error {
cur := tx.Iterate()
defer cur.Close()
for ; cur.Key() != nil; cur.Next() {
var val Value
if err := cur.Value(func(v Value) error {
val = v
return nil
}); err != nil {
return err
}
if err := f(cur.Key(), val); err != nil {
return err
}
}
return nil
})
}
// Write writes a single key-value entry to the given repository.
// This is a convenience method that takes care of creating the appropriate transaction.
func Write(repo Repository, k Key, v Value) error {
return repo.Write(func(tx WriteOnlyTx) error {
return tx.Write(k, v)
})
}
// Read reads a single key-value entry from the given repository.
// This is a convenience method that takes care of creating the appropriate transaction.
func Read(repo Repository, k Key) (Value, error) {
var ret Value
err := repo.Read(func(tx ReadOnlyTx) error {
return tx.Read(k, func(v Value) error {
if v != nil {
ret = slice.Copy(v)
}
return nil
})
})
return ret, err
}
// Deletes deletes a single key-value entry from the given repository.
// This is a convenience method that takes care of creating the appropriate transaction.
func Delete(repo Repository, k Key) error {
return repo.Write(func(tx WriteOnlyTx) error {
return tx.Delete(k)
})
}

View file

@ -7,9 +7,9 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
@ -40,20 +40,16 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
saddr := addr.EncodeToString()
var dataSize int
_ = c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
dataSize = len(b.Get([]byte(saddr)))
return nil
_ = c.repo.Read(func(tx kvio.ReadOnlyTx) error {
return tx.Read([]byte(saddr), func(v kvio.Value) error {
dataSize = len(v)
return nil
})
})
if dataSize > 0 {
storageType = StorageTypeDB
err := c.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
err := b.Delete([]byte(saddr))
return err
})
if err != nil {
if err := kvio.Delete(c.repo, []byte(saddr)); err != nil {
return err
}
storagelog.Write(c.log,

View file

@ -11,13 +11,13 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/mr-tron/base58"
"github.com/nspcc-dev/neo-go/pkg/util/slice"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
@ -97,32 +97,33 @@ func (c *cache) flushSmallObjects() {
}
// We put objects in batches of fixed size to not interfere with main put cycle a lot.
_ = c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
cs := b.Cursor()
_ = c.repo.Read(func(tx kvio.ReadOnlyTx) error {
cur := tx.Iterate()
defer cur.Close()
var k, v []byte
if len(lastKey) == 0 {
k, v = cs.First()
} else {
k, v = cs.Seek(lastKey)
if bytes.Equal(k, lastKey) {
k, v = cs.Next()
if len(lastKey) != 0 {
cur.Seek(lastKey)
if bytes.Equal(cur.Key(), lastKey) {
cur.Next()
}
}
for ; k != nil && len(m) < flushBatchSize; k, v = cs.Next() {
if len(lastKey) == len(k) {
copy(lastKey, k)
for ; cur.Key() != nil && len(m) < flushBatchSize; cur.Next() {
if len(lastKey) == len(cur.Key()) {
copy(lastKey, cur.Key())
} else {
lastKey = slice.Copy(k)
lastKey = slice.Copy(cur.Key())
}
m = append(m, objectInfo{
addr: string(k),
data: slice.Copy(v),
})
if err := cur.Value(func(v kvio.Value) error {
m = append(m, objectInfo{
addr: string(cur.Key()),
data: slice.Copy(v),
})
return nil
}); err != nil {
return err
}
}
return nil
})
@ -307,34 +308,26 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
return err
}
return c.db.View(func(tx *bbolt.Tx) error {
return kvio.ForEach(c.repo, func(k kvio.Key, v kvio.Value) error {
var addr oid.Address
b := tx.Bucket(defaultBucket)
cs := b.Cursor()
for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() {
sa := string(k)
if err := addr.DecodeString(sa); err != nil {
c.reportFlushError("can't decode object address from the DB", sa, metaerr.Wrap(err))
if ignoreErrors {
continue
}
return err
}
var obj object.Object
if err := obj.Unmarshal(data); err != nil {
c.reportFlushError("can't unmarshal an object from the DB", sa, metaerr.Wrap(err))
if ignoreErrors {
continue
}
return err
}
if err := c.flushObject(ctx, &obj, data, StorageTypeDB); err != nil {
return err
sa := string(k)
if err := addr.DecodeString(sa); err != nil {
c.reportFlushError("can't decode object address from the DB", sa, metaerr.Wrap(err))
if ignoreErrors {
return nil
}
return err
}
return nil
var obj object.Object
if err := obj.Unmarshal(v); err != nil {
c.reportFlushError("can't unmarshal an object from the DB", sa, metaerr.Wrap(err))
if ignoreErrors {
return nil
}
return err
}
return c.flushObject(ctx, &obj, v, StorageTypeDB)
})
}

View file

@ -11,6 +11,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
@ -22,7 +23,6 @@ import (
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
versionSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
"go.uber.org/zap/zaptest"
)
@ -151,17 +151,15 @@ func TestFlush(t *testing.T) {
t.Run("db, invalid address", func(t *testing.T) {
testIgnoreErrors(t, func(c *cache) {
_, data := newObject(t, 1)
require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
return b.Put([]byte{1, 2, 3}, data)
require.NoError(t, c.repo.Write(func(tx kvio.WriteOnlyTx) error {
return tx.Write([]byte{1, 2, 3}, data)
}))
})
})
t.Run("db, invalid object", func(t *testing.T) {
testIgnoreErrors(t, func(c *cache) {
require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
return b.Put([]byte(oidtest.Address().EncodeToString()), []byte{1, 2, 3})
require.NoError(t, c.repo.Write(func(tx kvio.WriteOnlyTx) error {
return tx.Write([]byte(oidtest.Address().EncodeToString()), []byte{1, 2, 3})
}))
})
})

View file

@ -6,13 +6,12 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/nspcc-dev/neo-go/pkg/util/slice"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
@ -41,7 +40,7 @@ func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address)
c.metrics.Get(time.Since(startedAt), found, storageType)
}()
value, err := Get(c.db, []byte(saddr))
value, err := Get(c.repo, []byte(saddr))
if err == nil {
obj := objectSDK.New()
found = true
@ -83,19 +82,13 @@ func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object,
// Key should be a stringified address.
//
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in db.
func Get(db *bbolt.DB, key []byte) ([]byte, error) {
var value []byte
err := db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
if b == nil {
return ErrNoDefaultBucket
}
value = b.Get(key)
if value == nil {
return logicerr.Wrap(apistatus.ObjectNotFound{})
}
value = slice.Copy(value)
return nil
})
return value, metaerr.Wrap(err)
func Get(repo kvio.Repository, key []byte) ([]byte, error) {
value, err := kvio.Read(repo, key)
if err != nil {
return nil, metaerr.Wrap(err)
}
if value == nil {
return nil, metaerr.Wrap(logicerr.Wrap(apistatus.ObjectNotFound{}))
}
return value, nil
}

View file

@ -1,39 +0,0 @@
package writecache
import (
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
)
// ErrNoDefaultBucket is returned by IterateDB when default bucket for objects is missing.
var ErrNoDefaultBucket = errors.New("no default bucket")
// IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return.
// It is assumed that db is an underlying database of some WriteCache instance.
//
// Returns ErrNoDefaultBucket if there is no default bucket in db.
//
// DB must not be nil and should be opened.
func IterateDB(db *bbolt.DB, f func(oid.Address) error) error {
return metaerr.Wrap(db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
if b == nil {
return ErrNoDefaultBucket
}
var addr oid.Address
return b.ForEach(func(k, v []byte) error {
err := addr.DecodeString(string(k))
if err != nil {
return fmt.Errorf("could not parse object address: %w", err)
}
return f(addr)
})
}))
}

View file

@ -48,8 +48,8 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode) error {
}
}
if c.db != nil {
if err = c.db.Close(); err != nil {
if c.repo != nil {
if err = c.repo.Close(); err != nil {
return fmt.Errorf("can't close write-cache database: %w", err)
}
}

View file

@ -8,8 +8,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
@ -84,10 +84,7 @@ func (c *cache) putSmall(obj objectInfo) error {
return ErrOutOfSpace
}
err := c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
return b.Put([]byte(obj.addr), obj.data)
})
err := kvio.Write(c.repo, []byte(obj.addr), obj.data)
if err == nil {
storagelog.Write(c.log,
storagelog.AddressField(obj.addr),

View file

@ -4,8 +4,6 @@ import (
"fmt"
"math"
"sync/atomic"
"go.etcd.io/bbolt"
)
func (c *cache) estimateCacheSize() uint64 {
@ -52,18 +50,13 @@ func (x *counters) FS() uint64 {
}
func (c *cache) initCounters() error {
var inDB uint64
err := c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
if b != nil {
inDB = uint64(b.Stats().KeyN)
}
return nil
})
stats, err := c.repo.Stats()
if err != nil {
return fmt.Errorf("could not read write-cache DB counter: %w", err)
}
inDB := stats.KeyCount
inFS, err := c.fsTree.NumberOfObjects()
if err != nil {
return fmt.Errorf("could not read write-cache FS counter: %w", err)

View file

@ -5,50 +5,49 @@ import (
"errors"
"fmt"
"os"
"path/filepath"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio"
bboltrepo "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio/bbolt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
// store represents persistent storage with in-memory LRU cache
// for flushed items on top of it.
type store struct {
db *bbolt.DB
}
const dbName = "small.bolt"
func (c *cache) openStore(readOnly bool) error {
err := util.MkdirAllX(c.path, os.ModePerm)
if err != nil {
return err
}
c.db, err = OpenDB(c.path, readOnly, c.openFile)
// It's possible to use other Repository implementations here, such as the badger-backed one:
//
// opts := badger.DefaultOptions(filepath.Join(c.path, "small.badger")).WithReadOnly(readOnly)
// c.repo, err = badgerrepo.Open(opts)
c.repo, err = bboltrepo.Open(filepath.Join(c.path, "small.bolt"), &bboltrepo.Options{
Options: bbolt.Options{
NoFreelistSync: true,
ReadOnly: readOnly,
Timeout: 100 * time.Millisecond,
OpenFile: c.openFile,
},
MaxBatchSize: c.maxBatchSize,
MaxBatchDelay: c.maxBatchDelay,
})
if err != nil {
return fmt.Errorf("could not open database: %w", err)
}
c.db.MaxBatchSize = c.maxBatchSize
c.db.MaxBatchDelay = c.maxBatchDelay
if !readOnly {
err = c.db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(defaultBucket)
return err
})
if err != nil {
return fmt.Errorf("could not create default bucket: %w", err)
}
}
c.fsTree = fstree.New(
fstree.WithPath(c.path),
fstree.WithPerm(os.ModePerm),
@ -68,10 +67,9 @@ func (c *cache) deleteFromDB(keys []string) []string {
}
var errorIndex int
err := c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
err := c.repo.Write(func(tx kvio.WriteOnlyTx) error {
for errorIndex = range keys {
if err := b.Delete([]byte(keys[errorIndex])); err != nil {
if err := tx.Delete([]byte(keys[errorIndex])); err != nil {
return err
}
}

View file

@ -1,20 +0,0 @@
package writecache
import (
"io/fs"
"os"
"path/filepath"
"time"
"go.etcd.io/bbolt"
)
// OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true.
func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error)) (*bbolt.DB, error) {
return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{
NoFreelistSync: true,
ReadOnly: ro,
Timeout: 100 * time.Millisecond,
OpenFile: openFile,
})
}

View file

@ -8,6 +8,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/kvio"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -63,8 +64,8 @@ type cache struct {
closeCh chan struct{}
// wg is a wait group for flush workers.
wg sync.WaitGroup
// store contains underlying database.
store
// repo is the underlying key/value repository.
repo kvio.Repository
// fsTree contains big files stored directly on file-system.
fsTree *fstree.FSTree
}
@ -84,10 +85,6 @@ const (
defaultMaxCacheSize = 1 << 30 // 1 GiB
)
var (
defaultBucket = []byte{0}
)
// New creates new writecache instance.
func New(opts ...Option) Cache {
c := &cache{
@ -165,10 +162,10 @@ func (c *cache) Close() error {
c.closeCh = nil
var err error
if c.db != nil {
err = c.db.Close()
if c.repo != nil {
err = c.repo.Close()
if err != nil {
c.db = nil
c.repo = nil
}
}
c.metrics.Close()