WIP: FSTree only writecache #1273

Closed
dstepanov-yadro wants to merge 4 commits from dstepanov-yadro/frostfs-node:feat/fstree_only_writecache into master
27 changed files with 80 additions and 695 deletions
Showing only changes of commit c180795f9b
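No description accompanies this WIP change, so for orientation: the commit removes the BoltDB-backed small-object store (`small.bolt`) from the write-cache, leaving FSTree as the only backing storage; the `small_object_size`, `max_batch_size`, and `max_batch_delay` settings disappear with it. A minimal sketch of constructing the trimmed-down cache with the options that survive this commit (the path and sizes are illustrative, and a real node also wires `WithBlobstor`/`WithMetabase` so flushing has a destination):

```go
package main

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
)

func main() {
	// Only FSTree-related options remain; WithSmallObjectSize,
	// WithMaxBatchSize and WithMaxBatchDelay are deleted by this commit.
	wc := writecache.New(
		writecache.WithPath("/path/to/writecache"), // illustrative
		writecache.WithMaxObjectSize(134217728),    // 128 MiB
		writecache.WithFlushWorkersCount(30),
		writecache.WithMaxCacheSize(4294967296), // 4 GiB
	)
	if err := wc.Open(context.Background(), mode.ReadWrite); err != nil {
		panic(err)
	}
	defer func() { _ = wc.Close() }()
	if err := wc.Init(); err != nil {
		panic(err)
	}
}
```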

View file

@@ -1,11 +1,10 @@
 package writecache

 import (
-    "os"
-
     common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal"
+    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
-    objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
+    oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
     "github.com/spf13/cobra"
 )
@@ -23,18 +22,20 @@ func init() {
 }

 func inspectFunc(cmd *cobra.Command, _ []string) {
-    var data []byte
+    wc := writecache.New(
+        writecache.WithPath(vPath),
+    )
+    common.ExitOnErr(cmd, common.Errf("could not open write-cache: %w", wc.Open(cmd.Context(), mode.ReadOnly)))
+    defer wc.Close()

-    db, err := writecache.OpenDB(vPath, true, os.OpenFile)
-    common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err))
-    defer db.Close()
+    var addr oid.Address
+    common.ExitOnErr(cmd, common.Errf("could not decode address: %w", addr.DecodeString(vAddress)))

-    data, err = writecache.Get(db, []byte(vAddress))
+    obj, err := wc.Get(cmd.Context(), addr)
     common.ExitOnErr(cmd, common.Errf("could not fetch object: %w", err))

-    var o objectSDK.Object
-    common.ExitOnErr(cmd, common.Errf("could not unmarshal object: %w", o.Unmarshal(data)))
-
-    common.PrintObjectHeader(cmd, o)
+    common.PrintObjectHeader(cmd, *obj)
+
+    data, err := obj.Marshal()
+    common.ExitOnErr(cmd, common.Errf("could not marshal object: %w", err))

     common.WriteObjectToFile(cmd, vOut, data)
 }

View file

@@ -3,11 +3,11 @@ package writecache
 import (
     "fmt"
     "io"
-    "os"

     common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal"
+    blobstor "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
+    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
-    oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
     "github.com/spf13/cobra"
 )
@@ -26,15 +26,18 @@ func listFunc(cmd *cobra.Command, _ []string) {
     // other targets can be supported
     w := cmd.OutOrStderr()

-    wAddr := func(addr oid.Address) error {
-        _, err := io.WriteString(w, fmt.Sprintf("%s\n", addr))
+    wc := writecache.New(
+        writecache.WithPath(vPath),
+    )
+    common.ExitOnErr(cmd, common.Errf("could not open write-cache: %w", wc.Open(cmd.Context(), mode.ReadOnly)))
+    defer wc.Close()
+
+    var prm blobstor.IteratePrm
+    prm.IgnoreErrors = true
+    prm.Handler = func(ie blobstor.IterationElement) error {
+        _, err := io.WriteString(w, fmt.Sprintf("%s\n", ie.Address))
         return err
     }

-    db, err := writecache.OpenDB(vPath, true, os.OpenFile)
-    common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err))
-    defer db.Close()
-
-    err = writecache.IterateDB(db, wAddr)
-    common.ExitOnErr(cmd, common.Errf("write-cache iterator failure: %w", err))
+    _, err := wc.Iterate(cmd.Context(), prm)
+    common.ExitOnErr(cmd, common.Errf("could not iterate write-cache: %w", err))
 }
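The list subcommand now drives the write-cache through the generic blobstor iteration API instead of walking a bbolt bucket; `Iterate` also joins the public `Cache` interface (last hunk of this commit), so other tooling can reuse the pattern. A sketch under that assumption, using only the `IteratePrm`/`IterationElement` fields visible in this diff (`wctool` is a hypothetical helper package):

```go
package wctool

import (
	"context"

	common "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
)

// CountCached counts objects in an opened write-cache via the Iterate
// method this commit adds to the Cache interface.
func CountCached(ctx context.Context, wc writecache.Cache) (uint64, error) {
	var n uint64
	var prm common.IteratePrm
	prm.IgnoreErrors = true
	prm.Handler = func(ie common.IterationElement) error {
		n++ // ie.Address identifies the object, as in the list command above
		return nil
	}
	_, err := wc.Iterate(ctx, prm)
	return n, err
}
```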

View file

@@ -146,9 +146,6 @@ type shardCfg struct {
     writecacheCfg struct {
         enabled          bool
         path             string
-        maxBatchSize     int
-        maxBatchDelay    time.Duration
-        smallObjectSize  uint64
         maxObjSize       uint64
         flushWorkerCount int
         sizeLimit        uint64
@@ -269,10 +266,7 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg,
     wc.enabled = true
     wc.path = writeCacheCfg.Path()
-    wc.maxBatchSize = writeCacheCfg.BoltDB().MaxBatchSize()
-    wc.maxBatchDelay = writeCacheCfg.BoltDB().MaxBatchDelay()
     wc.maxObjSize = writeCacheCfg.MaxObjectSize()
-    wc.smallObjectSize = writeCacheCfg.SmallObjectSize()
     wc.flushWorkerCount = writeCacheCfg.WorkerCount()
     wc.sizeLimit = writeCacheCfg.SizeLimit()
     wc.noSync = writeCacheCfg.NoSync()
@@ -861,10 +855,7 @@ func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option {
     if wcRead := shCfg.writecacheCfg; wcRead.enabled {
         writeCacheOpts = append(writeCacheOpts,
             writecache.WithPath(wcRead.path),
-            writecache.WithMaxBatchSize(wcRead.maxBatchSize),
-            writecache.WithMaxBatchDelay(wcRead.maxBatchDelay),
             writecache.WithMaxObjectSize(wcRead.maxObjSize),
-            writecache.WithSmallObjectSize(wcRead.smallObjectSize),
             writecache.WithFlushWorkersCount(wcRead.flushWorkerCount),
             writecache.WithMaxCacheSize(wcRead.sizeLimit),
             writecache.WithNoSync(wcRead.noSync),

View file

@@ -74,7 +74,6 @@ func TestEngineSection(t *testing.T) {
     require.Equal(t, true, wc.NoSync())
     require.Equal(t, "tmp/0/cache", wc.Path())
-    require.EqualValues(t, 16384, wc.SmallObjectSize())
     require.EqualValues(t, 134217728, wc.MaxObjectSize())
     require.EqualValues(t, 30, wc.WorkerCount())
     require.EqualValues(t, 3221225472, wc.SizeLimit())
@@ -129,7 +128,6 @@ func TestEngineSection(t *testing.T) {
     require.Equal(t, false, wc.NoSync())
     require.Equal(t, "tmp/1/cache", wc.Path())
-    require.EqualValues(t, 16384, wc.SmallObjectSize())
     require.EqualValues(t, 134217728, wc.MaxObjectSize())
     require.EqualValues(t, 30, wc.WorkerCount())
     require.EqualValues(t, 4294967296, wc.SizeLimit())

View file

@@ -10,9 +10,6 @@ import (
 type Config config.Config

 const (
-    // SmallSizeDefault is a default size of small objects.
-    SmallSizeDefault = 32 << 10
-
     // MaxSizeDefault is a default value of the object payload size limit.
     MaxSizeDefault = 64 << 20
@@ -51,22 +48,6 @@ func (x *Config) Path() string {
     return p
 }

-// SmallObjectSize returns the value of "small_object_size" config parameter.
-//
-// Returns SmallSizeDefault if the value is not a positive number.
-func (x *Config) SmallObjectSize() uint64 {
-    s := config.SizeInBytesSafe(
-        (*config.Config)(x),
-        "small_object_size",
-    )
-    if s > 0 {
-        return s
-    }
-
-    return SmallSizeDefault
-}
-
 // MaxObjectSize returns the value of "max_object_size" config parameter.
 //
 // Returns MaxSizeDefault if the value is not a positive number.

View file

@@ -101,7 +101,6 @@ FROSTFS_STORAGE_SHARD_0_MODE=read-only
 FROSTFS_STORAGE_SHARD_0_WRITECACHE_ENABLED=false
 FROSTFS_STORAGE_SHARD_0_WRITECACHE_NO_SYNC=true
 FROSTFS_STORAGE_SHARD_0_WRITECACHE_PATH=tmp/0/cache
-FROSTFS_STORAGE_SHARD_0_WRITECACHE_SMALL_OBJECT_SIZE=16384
 FROSTFS_STORAGE_SHARD_0_WRITECACHE_MAX_OBJECT_SIZE=134217728
 FROSTFS_STORAGE_SHARD_0_WRITECACHE_FLUSH_WORKER_COUNT=30
 FROSTFS_STORAGE_SHARD_0_WRITECACHE_CAPACITY=3221225472
@@ -155,7 +154,6 @@ FROSTFS_STORAGE_SHARD_1_MODE=read-write
 ### Write cache config
 FROSTFS_STORAGE_SHARD_1_WRITECACHE_ENABLED=true
 FROSTFS_STORAGE_SHARD_1_WRITECACHE_PATH=tmp/1/cache
-FROSTFS_STORAGE_SHARD_1_WRITECACHE_SMALL_OBJECT_SIZE=16384
 FROSTFS_STORAGE_SHARD_1_WRITECACHE_MAX_OBJECT_SIZE=134217728
 FROSTFS_STORAGE_SHARD_1_WRITECACHE_FLUSH_WORKER_COUNT=30
 FROSTFS_STORAGE_SHARD_1_WRITECACHE_CAPACITY=4294967296

View file

@@ -145,7 +145,6 @@
       "enabled": false,
       "no_sync": true,
       "path": "tmp/0/cache",
-      "small_object_size": 16384,
       "max_object_size": 134217728,
       "flush_worker_count": 30,
       "capacity": 3221225472
@@ -203,7 +202,6 @@
       "enabled": true,
       "path": "tmp/1/cache",
       "memcache_capacity": 2147483648,
-      "small_object_size": 16384,
       "max_object_size": 134217728,
       "flush_worker_count": 30,
       "capacity": 4294967296

View file

@@ -125,7 +125,6 @@ storage:
     writecache:
       enabled: true
-      small_object_size: 16k # size threshold for "small" objects which are cached in key-value DB, not in FS, bytes
       max_object_size: 134217728 # size threshold for "big" objects which bypass write-cache and go to the storage directly, bytes
       flush_worker_count: 30 # number of write-cache flusher threads

View file

@@ -287,7 +287,6 @@ writecache:
   enabled: true
   path: /path/to/writecache
   capacity: 4294967296
-  small_object_size: 16384
   max_object_size: 134217728
   flush_worker_count: 30
 ```
@@ -296,11 +295,8 @@ writecache:
 |----------------------|------------|---------------|------------------------------------------------------------------------------------------------------------------------|
 | `path`               | `string`   |               | Path to the metabase file.                                                                                               |
 | `capacity`           | `size`     | unrestricted  | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly.    |
-| `small_object_size`  | `size`     | `32K`         | Maximum object size for "small" objects. This objects are stored in a key-value database instead of a file-system.      |
 | `max_object_size`    | `size`     | `64M`         | Maximum object size allowed to be stored in the writecache.                                                              |
 | `flush_worker_count` | `int`      | `20`          | Amount of background workers that move data from the writecache to the blobstor.                                        |
-| `max_batch_size`     | `int`      | `1000`        | Maximum amount of small object `PUT` operations to perform in a single transaction.                                     |
-| `max_batch_delay`    | `duration` | `10ms`        | Maximum delay before a batch starts.                                                                                     |

 # `node` section
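For reference, the documented `writecache` section collapses to the FSTree-only knobs after this commit (values are the doc's own). Configurations that still carry `small_object_size`, `max_batch_size`, or `max_batch_delay` will need updating; whether stale keys are silently ignored or rejected depends on the config loader, which this commit does not touch:

```yaml
writecache:
  enabled: true
  path: /path/to/writecache
  capacity: 4294967296
  max_object_size: 134217728
  flush_worker_count: 30
```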

View file

@@ -278,7 +278,6 @@ const (
     ShardCouldNotMarkObjectToDeleteInMetabase = "could not mark object to delete in metabase"
     WritecacheTriedToFlushItemsFromWritecache = "tried to flush items from write-cache"
     WritecacheWaitingForChannelsToFlush = "waiting for channels to flush"
-    WritecacheCantRemoveObjectsFromTheDatabase = "can't remove objects from the database"
     WritecacheCantRemoveObjectFromWritecache = "can't remove object from write-cache"
     BlobovniczatreeCouldNotGetObjectFromLevel = "could not get object from level"
     BlobovniczatreeCouldNotCloseBlobovnicza = "could not close Blobovnicza"

View file

@@ -166,8 +166,7 @@ func (m *writeCacheMetrics) Put(d time.Duration, success bool, st writecache.Sto
     m.metrics.AddMethodDuration(m.shardID, m.path, st.String(), "Put", success, d)
 }

-func (m *writeCacheMetrics) SetEstimateSize(db, fstree uint64) {
-    m.metrics.SetEstimateSize(m.shardID, m.path, writecache.StorageTypeDB.String(), db)
+func (m *writeCacheMetrics) SetEstimateSize(fstree uint64) {
     m.metrics.SetEstimateSize(m.shardID, m.path, writecache.StorageTypeFSTree.String(), fstree)
 }

@@ -175,8 +174,7 @@ func (m *writeCacheMetrics) SetMode(mod mode.ComponentMode) {
     m.metrics.SetMode(m.shardID, mod.String())
 }

-func (m *writeCacheMetrics) SetActualCounters(db, fstree uint64) {
-    m.metrics.SetActualCount(m.shardID, m.path, writecache.StorageTypeDB.String(), db)
+func (m *writeCacheMetrics) SetActualCounters(fstree uint64) {
     m.metrics.SetActualCount(m.shardID, m.path, writecache.StorageTypeFSTree.String(), fstree)
 }

View file

@@ -95,6 +95,5 @@ func newCache(b *testing.B) writecache.Cache {
         writecache.WithBlobstor(bs),
         writecache.WithMetabase(testMetabase{}),
         writecache.WithMaxCacheSize(256<<30),
-        writecache.WithSmallObjectSize(128<<10),
     )
 }

View file

@@ -10,8 +10,6 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
-    objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
-    "go.etcd.io/bbolt"
     "go.uber.org/zap"
 )
@@ -28,14 +26,10 @@ type cache struct {
     // whether object should be compressed.
     compressFlags map[string]struct{}
-    // flushCh is a channel with objects to flush.
-    flushCh chan objectInfo
     // cancel is cancel function, protected by modeMtx in Close.
     cancel atomic.Value
     // wg is a wait group for flush workers.
     wg sync.WaitGroup
-    // store contains underlying database.
-    store
     // fsTree contains big files stored directly on file-system.
     fsTree *fstree.FSTree
 }
@@ -43,40 +37,26 @@ type cache struct {
 // wcStorageType is used for write-cache operations logging.
 const wcStorageType = "write-cache"

-type objectInfo struct {
-    addr string
-    data []byte
-    obj  *objectSDK.Object
-}
-
 const (
     defaultMaxObjectSize = 64 * 1024 * 1024 // 64 MiB
-    defaultSmallObjectSize = 32 * 1024 // 32 KiB
     defaultMaxCacheSize = 1 << 30 // 1 GiB
 )

-var (
-    defaultBucket = []byte{0}
-    dummyCanceler context.CancelFunc = func() {}
-)
+var dummyCanceler context.CancelFunc = func() {}

 // New creates new writecache instance.
 func New(opts ...Option) Cache {
     c := &cache{
-        flushCh:       make(chan objectInfo),
         mode:          mode.Disabled,
         compressFlags: make(map[string]struct{}),
         options: options{
             log:           &logger.Logger{Logger: zap.NewNop()},
             maxObjectSize: defaultMaxObjectSize,
-            smallObjectSize: defaultSmallObjectSize,
             workersCount:  defaultFlushWorkersCount,
             maxCacheSize:  defaultMaxCacheSize,
-            maxBatchSize:  bbolt.DefaultMaxBatchSize,
-            maxBatchDelay: bbolt.DefaultMaxBatchDelay,
             openFile:      os.OpenFile,
             metrics:       DefaultMetrics(),
         },
     }
@@ -111,7 +91,8 @@ func (c *cache) Open(_ context.Context, mod mode.Mode) error {
         return metaerr.Wrap(err)
     }

-    return metaerr.Wrap(c.initCounters())
+    _ = c.estimateCacheSize()
+    return nil
 }

 // Init runs necessary services.
@@ -138,14 +119,6 @@ func (c *cache) Close() error {
     c.modeMtx.Lock()
     defer c.modeMtx.Unlock()

-    var err error
-    if c.db != nil {
-        err = c.db.Close()
-        if err != nil {
-            c.db = nil
-        }
-    }
     c.metrics.Close()
     return nil
 }

View file

@@ -2,7 +2,6 @@ package writecache
 import (
     "context"
-    "math"
     "time"

     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@@ -10,7 +9,6 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
     "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
     oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
-    "go.etcd.io/bbolt"
     "go.opentelemetry.io/otel/attribute"
     "go.opentelemetry.io/otel/trace"
 )
@@ -47,39 +45,6 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
     saddr := addr.EncodeToString()

-    var dataSize int
-    _ = c.db.View(func(tx *bbolt.Tx) error {
-        b := tx.Bucket(defaultBucket)
-        dataSize = len(b.Get([]byte(saddr)))
-        return nil
-    })
-
-    if dataSize > 0 {
-        storageType = StorageTypeDB
-        var recordDeleted bool
-        err := c.db.Update(func(tx *bbolt.Tx) error {
-            b := tx.Bucket(defaultBucket)
-            key := []byte(saddr)
-            recordDeleted = b.Get(key) != nil
-            err := b.Delete(key)
-            return err
-        })
-        if err != nil {
-            return err
-        }
-        storagelog.Write(c.log,
-            storagelog.AddressField(saddr),
-            storagelog.StorageTypeField(wcStorageType),
-            storagelog.OpField("db DELETE"),
-        )
-        if recordDeleted {
-            c.objCounters.cDB.Add(math.MaxUint64)
-            c.estimateCacheSize()
-        }
-        deleted = true
-        return nil
-    }
-
     storageType = StorageTypeFSTree
     _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
     if err == nil {

View file

@@ -1,7 +1,6 @@
 package writecache

 import (
-    "bytes"
     "context"
     "errors"
     "time"
@@ -15,142 +14,33 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
     "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
     objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
-    oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
-    "github.com/mr-tron/base58"
-    "go.etcd.io/bbolt"
     "go.opentelemetry.io/otel/attribute"
     "go.opentelemetry.io/otel/trace"
     "go.uber.org/zap"
 )

 const (
-    // flushBatchSize is amount of keys which will be read from cache to be flushed
-    // to the main storage. It is used to reduce contention between cache put
-    // and cache persist.
-    flushBatchSize = 512
-
     // defaultFlushWorkersCount is number of workers for putting objects in main storage.
     defaultFlushWorkersCount = 20
     // defaultFlushInterval is default time interval between successive flushes.
-    defaultFlushInterval = time.Second
+    defaultFlushInterval = 10 * time.Second
 )

-var errIterationCompleted = errors.New("iteration completed")
-
 // runFlushLoop starts background workers which periodically flush objects to the blobstor.
 func (c *cache) runFlushLoop(ctx context.Context) {
     if c.disableBackgroundFlush {
         return
     }
-    for i := 0; i < c.workersCount; i++ {
-        c.wg.Add(1)
-        go c.workerFlushSmall(ctx)
-    }
     c.wg.Add(1)
     go func() {
         c.workerFlushBig(ctx)
         c.wg.Done()
     }()
-    c.wg.Add(1)
-    go func() {
-        defer c.wg.Done()
-
-        tt := time.NewTimer(defaultFlushInterval)
-        defer tt.Stop()
-
-        for {
-            select {
-            case <-tt.C:
-                c.flushSmallObjects(ctx)
-                tt.Reset(defaultFlushInterval)
-                c.estimateCacheSize()
-            case <-ctx.Done():
-                return
-            }
-        }
-    }()
 }

-func (c *cache) flushSmallObjects(ctx context.Context) {
-    var lastKey []byte
-    for {
-        select {
-        case <-ctx.Done():
-            return
-        default:
-        }
-
-        var m []objectInfo
-
-        c.modeMtx.RLock()
-        if c.readOnly() {
-            c.modeMtx.RUnlock()
-            time.Sleep(time.Second)
-            continue
-        }
-
-        // We put objects in batches of fixed size to not interfere with main put cycle a lot.
-        _ = c.db.View(func(tx *bbolt.Tx) error {
-            b := tx.Bucket(defaultBucket)
-            cs := b.Cursor()
-
-            var k, v []byte
-
-            if len(lastKey) == 0 {
-                k, v = cs.First()
-            } else {
-                k, v = cs.Seek(lastKey)
-                if bytes.Equal(k, lastKey) {
-                    k, v = cs.Next()
-                }
-            }
-
-            for ; k != nil && len(m) < flushBatchSize; k, v = cs.Next() {
-                if len(lastKey) == len(k) {
-                    copy(lastKey, k)
-                } else {
-                    lastKey = bytes.Clone(k)
-                }
-
-                m = append(m, objectInfo{
-                    addr: string(k),
-                    data: bytes.Clone(v),
-                })
-            }
-            return nil
-        })
-
-        var count int
-        for i := range m {
-            obj := objectSDK.New()
-            if err := obj.Unmarshal(m[i].data); err != nil {
-                continue
-            }
-            m[i].obj = obj
-
-            count++
-            select {
-            case c.flushCh <- m[i]:
-            case <-ctx.Done():
-                c.modeMtx.RUnlock()
-                return
-            }
-        }
-
-        c.modeMtx.RUnlock()
-        if count == 0 {
-            break
-        }
-
-        c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache,
-            zap.Int("count", count),
-            zap.String("start", base58.Encode(lastKey)))
-    }
-}
-
 func (c *cache) workerFlushBig(ctx context.Context) {
-    tick := time.NewTicker(defaultFlushInterval * 10)
+    tick := time.NewTicker(defaultFlushInterval)
     for {
         select {
         case <-tick.C:
@@ -211,29 +101,6 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
     return err
 }

-// workerFlushSmall writes small objects to the main storage.
-func (c *cache) workerFlushSmall(ctx context.Context) {
-    defer c.wg.Done()
-
-    var objInfo objectInfo
-    for {
-        // Give priority to direct put.
-        select {
-        case objInfo = <-c.flushCh:
-        case <-ctx.Done():
-            return
-        }
-
-        err := c.flushObject(ctx, objInfo.obj, objInfo.data, StorageTypeDB)
-        if err != nil {
-            // Error is handled in flushObject.
-            continue
-        }
-
-        c.deleteFromDB(objInfo.addr, true)
-    }
-}
-
 // flushObject is used to write object directly to the main storage.
 func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []byte, st StorageType) error {
     var err error
@@ -300,74 +167,5 @@ func (c *cache) Flush(ctx context.Context, ignoreErrors, seal bool) error {
 }

 func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
-    if err := c.flushFSTree(ctx, ignoreErrors); err != nil {
-        return err
-    }
-
-    var last string
-    for {
-        batch, err := c.readNextDBBatch(ignoreErrors, last)
-        if err != nil {
-            return err
-        }
-        if len(batch) == 0 {
-            break
-        }
-        for _, item := range batch {
-            var obj objectSDK.Object
-            if err := obj.Unmarshal(item.data); err != nil {
-                c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, item.address, metaerr.Wrap(err))
-                if ignoreErrors {
-                    continue
-                }
-                return err
-            }
-
-            if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil {
-                return err
-            }
-            c.deleteFromDB(item.address, false)
-        }
-        last = batch[len(batch)-1].address
-    }
-    return nil
-}
-
-type batchItem struct {
-    data    []byte
-    address string
-}
-
-func (c *cache) readNextDBBatch(ignoreErrors bool, last string) ([]batchItem, error) {
-    const batchSize = 100
-    var batch []batchItem
-    err := c.db.View(func(tx *bbolt.Tx) error {
-        var addr oid.Address
-
-        b := tx.Bucket(defaultBucket)
-        cs := b.Cursor()
-        for k, data := cs.Seek([]byte(last)); k != nil; k, data = cs.Next() {
-            sa := string(k)
-            if sa == last {
-                continue
-            }
-            if err := addr.DecodeString(sa); err != nil {
-                c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err))
-                if ignoreErrors {
-                    continue
-                }
-                return err
-            }
-
-            batch = append(batch, batchItem{data: bytes.Clone(data), address: sa})
-            if len(batch) == batchSize {
-                return errIterationCompleted
-            }
-        }
-        return nil
-    })
-    if err == nil || errors.Is(err, errIterationCompleted) {
-        return batch, nil
-    }
-    return nil, err
+    return c.flushFSTree(ctx, ignoreErrors)
 }
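A note on the interval bookkeeping above: previously the FSTree worker ticked at `defaultFlushInterval * 10` with the constant set to one second; now the constant itself is `10 * time.Second` and the ticker uses it directly, so the effective FSTree flush period is unchanged. The real change is that the small-object pipeline — `flushSmallObjects`, `workerFlushSmall`, `readNextDBBatch`, and the `flushCh` channel — is gone entirely, and `flush` reduces to `flushFSTree`.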

View file

@@ -19,7 +19,6 @@ import (
     oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
     oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
    "github.com/stretchr/testify/require"
-    "go.etcd.io/bbolt"
     "go.uber.org/zap"
 )
@@ -31,7 +30,6 @@ func TestFlush(t *testing.T) {
     append([]Option{
         WithLogger(testlogger),
         WithPath(filepath.Join(t.TempDir(), "writecache")),
-        WithSmallObjectSize(smallSize),
         WithMetabase(mb),
         WithBlobstor(bs),
         WithDisableBackgroundFlush(),
@@ -47,31 +45,6 @@ func TestFlush(t *testing.T) {
     }

     failures := []TestFailureInjector[Option]{
-        {
-            Desc: "db, invalid address",
-            InjectFn: func(t *testing.T, wc Cache) {
-                c := wc.(*cache)
-                obj := testutil.GenerateObject()
-                data, err := obj.Marshal()
-                require.NoError(t, err)
-                require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
-                    b := tx.Bucket(defaultBucket)
-                    return b.Put([]byte{1, 2, 3}, data)
-                }))
-            },
-        },
-        {
-            Desc: "db, invalid object",
-            InjectFn: func(t *testing.T, wc Cache) {
-                c := wc.(*cache)
-                require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
-                    b := tx.Bucket(defaultBucket)
-                    k := []byte(oidtest.Address().EncodeToString())
-                    v := []byte{1, 2, 3}
-                    return b.Put(k, v)
-                }))
-            },
-        },
         {
             Desc: "fs, read error",
             InjectFn: func(t *testing.T, wc Cache) {

View file

@@ -1,7 +1,6 @@
 package writecache

 import (
-    "bytes"
     "context"
     "time"
@@ -12,7 +11,6 @@ import (
     apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
     objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
     oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
-    "go.etcd.io/bbolt"
     "go.opentelemetry.io/otel/attribute"
     "go.opentelemetry.io/otel/trace"
 )
@@ -37,11 +35,11 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
         return nil, ErrDegraded
     }

-    obj, err := c.getInternal(ctx, saddr, addr)
+    obj, err := c.getInternal(ctx, addr)
     return obj, metaerr.Wrap(err)
 }

-func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) (*objectSDK.Object, error) {
+func (c *cache) getInternal(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
     found := false
     storageType := StorageTypeUndefined
     startedAt := time.Now()
@@ -49,14 +47,6 @@ func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address)
         c.metrics.Get(time.Since(startedAt), found, storageType)
     }()

-    value, err := Get(c.db, []byte(saddr))
-    if err == nil {
-        obj := objectSDK.New()
-        found = true
-        storageType = StorageTypeDB
-        return obj, obj.Unmarshal(value)
-    }
-
     res, err := c.fsTree.Get(ctx, common.GetPrm{Address: addr})
     if err != nil {
         return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
@@ -87,34 +77,10 @@ func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object,
         return nil, ErrDegraded
     }

-    obj, err := c.getInternal(ctx, saddr, addr)
+    obj, err := c.getInternal(ctx, addr)
     if err != nil {
         return nil, metaerr.Wrap(err)
     }

     return obj.CutPayload(), nil
 }
-
-// Get fetches object from the underlying database.
-// Key should be a stringified address.
-//
-// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in db.
-func Get(db *bbolt.DB, key []byte) ([]byte, error) {
-    if db == nil {
-        return nil, ErrNotInitialized
-    }
-    var value []byte
-    err := db.View(func(tx *bbolt.Tx) error {
-        b := tx.Bucket(defaultBucket)
-        if b == nil {
-            return ErrNoDefaultBucket
-        }
-        value = b.Get(key)
-        if value == nil {
-            return logicerr.Wrap(new(apistatus.ObjectNotFound))
-        }
-        value = bytes.Clone(value)
-        return nil
-    })
-    return value, metaerr.Wrap(err)
-}

View file

@@ -1,39 +1,28 @@
 package writecache

 import (
-    "errors"
-    "fmt"
+    "context"

-    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
-    oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
-    "go.etcd.io/bbolt"
+    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
+    "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
+    "go.opentelemetry.io/otel/attribute"
+    "go.opentelemetry.io/otel/trace"
 )

-// ErrNoDefaultBucket is returned by IterateDB when default bucket for objects is missing.
-var ErrNoDefaultBucket = errors.New("no default bucket")
-
-// IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return.
-// It is assumed that db is an underlying database of some WriteCache instance.
-//
-// Returns ErrNoDefaultBucket if there is no default bucket in db.
-//
-// DB must not be nil and should be opened.
-func IterateDB(db *bbolt.DB, f func(oid.Address) error) error {
-    return metaerr.Wrap(db.View(func(tx *bbolt.Tx) error {
-        b := tx.Bucket(defaultBucket)
-        if b == nil {
-            return ErrNoDefaultBucket
-        }
-
-        var addr oid.Address
-        return b.ForEach(func(k, _ []byte) error {
-            err := addr.DecodeString(string(k))
-            if err != nil {
-                return fmt.Errorf("could not parse object address: %w", err)
-            }
-
-            return f(addr)
-        })
-    }))
+func (c *cache) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) {
+    ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Iterate",
+        trace.WithAttributes(
+            attribute.Bool("ignore_errors", prm.IgnoreErrors),
+        ))
+    defer span.End()
+
+    if !c.modeMtx.TryRLock() {
+        return common.IterateRes{}, ErrNotInitialized
+    }
+    defer c.modeMtx.RUnlock()
+    if c.mode.NoMetabase() {
+        return common.IterateRes{}, ErrDegraded
+    }
+
+    return c.fsTree.Iterate(ctx, prm)
 }

View file

@@ -14,7 +14,6 @@ func (t StorageType) String() string {
 const (
     StorageTypeUndefined StorageType = "null"
-    StorageTypeDB        StorageType = "db"
     StorageTypeFSTree    StorageType = "fstree"
 )
@@ -26,9 +25,9 @@ type Metrics interface {
     Flush(success bool, st StorageType)
     Evict(st StorageType)

-    SetEstimateSize(db, fstree uint64)
+    SetEstimateSize(uint64)
     SetMode(m mode.ComponentMode)
-    SetActualCounters(db, fstree uint64)
+    SetActualCounters(uint64)
     SetPath(path string)
     Close()
 }
@@ -47,11 +46,11 @@ func (metricsStub) Delete(time.Duration, bool, StorageType) {}

 func (metricsStub) Put(time.Duration, bool, StorageType) {}

-func (metricsStub) SetEstimateSize(uint64, uint64) {}
+func (metricsStub) SetEstimateSize(uint64) {}

 func (metricsStub) SetMode(mode.ComponentMode) {}

-func (metricsStub) SetActualCounters(uint64, uint64) {}
+func (metricsStub) SetActualCounters(uint64) {}

 func (metricsStub) Flush(bool, StorageType) {}
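Dropping `StorageTypeDB` also narrows the metrics surface: the node-side implementation earlier in this commit labels series with `st.String()`, so any dashboards or alerts filtering on the `db` storage-type label will simply stop receiving data points. Presumably acceptable for a storage type that no longer exists, but worth flagging for operators.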

View file

@@ -2,10 +2,7 @@ package writecache
 import (
     "context"
-    "fmt"
-    "time"

-    "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
     "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
     "go.opentelemetry.io/otel/attribute"
@@ -25,7 +22,7 @@ func (c *cache) SetMode(m mode.Mode) error {
     c.modeMtx.Lock()
     defer c.modeMtx.Unlock()

-    err := c.setMode(ctx, m, true)
+    err := c.setMode(ctx, m, !m.NoMetabase())
     if err == nil {
         c.metrics.SetMode(mode.ConvertToComponentModeDegraded(m))
     }
@@ -44,20 +41,6 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode, ignoreErrors bool) err
         }
     }

-    if c.db != nil {
-        if err = c.db.Close(); err != nil {
-            return fmt.Errorf("can't close write-cache database: %w", err)
-        }
-    }
-
-    // Suspend producers to ensure there are channel send operations in fly.
-    // flushCh is populated by `flush` with `modeMtx` taken, thus waiting until it is empty
-    // guarantees that there are no in-fly operations.
-    for len(c.flushCh) != 0 {
-        c.log.Info(logs.WritecacheWaitingForChannelsToFlush)
-        time.Sleep(time.Second)
-    }
-
     if turnOffMeta {
         c.mode = m
         return nil

View file

@@ -1,30 +0,0 @@
-package writecache
-
-import (
-    "context"
-    "testing"
-
-    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
-    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
-    "github.com/stretchr/testify/require"
-)
-
-func TestMode(t *testing.T) {
-    t.Parallel()
-    wc := New(
-        WithLogger(test.NewLogger(t)),
-        WithFlushWorkersCount(2),
-        WithPath(t.TempDir()))
-    require.NoError(t, wc.Open(context.Background(), mode.DegradedReadOnly))
-    require.Nil(t, wc.(*cache).db)
-    require.NoError(t, wc.Init())
-    require.Nil(t, wc.(*cache).db)
-    require.NoError(t, wc.Close())
-
-    require.NoError(t, wc.Open(context.Background(), mode.Degraded))
-    require.Nil(t, wc.(*cache).db)
-    require.NoError(t, wc.Init())
-    require.Nil(t, wc.(*cache).db)
-    require.NoError(t, wc.Close())
-}

View file

@@ -3,7 +3,6 @@ package writecache
 import (
     "io/fs"
     "os"
-    "time"

     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
     "go.uber.org/zap"
@@ -22,19 +21,13 @@ type options struct {
     metabase Metabase
     // maxObjectSize is the maximum size of the object stored in the write-cache.
     maxObjectSize uint64
-    // smallObjectSize is the maximum size of the object stored in the database.
-    smallObjectSize uint64
     // workersCount is the number of workers flushing objects in parallel.
     workersCount int
-    // maxCacheSize is the maximum total size of all objects saved in cache (DB + FS).
+    // maxCacheSize is the maximum total size of all objects saved in cache.
     // 1 GiB by default.
     maxCacheSize uint64
     // objCounters contains atomic counters for the number of objects stored in cache.
     objCounters counters
-    // maxBatchSize is the maximum batch size for the small object database.
-    maxBatchSize int
-    // maxBatchDelay is the maximum batch wait time for the small object database.
-    maxBatchDelay time.Duration
     // noSync is true iff FSTree allows unsynchronized writes.
     noSync bool
     // reportError is the function called when encountering disk errors in background workers.
@@ -84,15 +77,6 @@ func WithMaxObjectSize(sz uint64) Option {
     }
 }

-// WithSmallObjectSize sets maximum object size to be stored in write-cache.
-func WithSmallObjectSize(sz uint64) Option {
-    return func(o *options) {
-        if sz > 0 {
-            o.smallObjectSize = sz
-        }
-    }
-}
-
 func WithFlushWorkersCount(c int) Option {
     return func(o *options) {
         if c > 0 {
@@ -108,24 +92,6 @@ func WithMaxCacheSize(sz uint64) Option {
     }
 }

-// WithMaxBatchSize sets max batch size for the small object database.
-func WithMaxBatchSize(sz int) Option {
-    return func(o *options) {
-        if sz > 0 {
-            o.maxBatchSize = sz
-        }
-    }
-}
-
-// WithMaxBatchDelay sets max batch delay for the small object database.
-func WithMaxBatchDelay(d time.Duration) Option {
-    return func(o *options) {
-        if d > 0 {
-            o.maxBatchDelay = d
-        }
-    }
-}
-
 // WithNoSync sets an option to allow returning to caller on PUT before write is persisted.
 // Note, that we use this flag for FSTree only and DO NOT use it for a bolt DB because
 // we cannot yet properly handle the corrupted database during the startup. This SHOULD NOT

View file

@@ -8,7 +8,6 @@ import (
     storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
     "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
-    "go.etcd.io/bbolt"
     "go.opentelemetry.io/otel/attribute"
     "go.opentelemetry.io/otel/trace"
 )
@@ -50,63 +49,17 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
         return common.PutRes{}, ErrBigObject
     }

-    oi := objectInfo{
-        addr: prm.Address.EncodeToString(),
-        obj:  prm.Object,
-        data: prm.RawData,
-    }
-
-    if sz <= c.smallObjectSize {
-        storageType = StorageTypeDB
-        err := c.putSmall(oi)
-        if err == nil {
-            added = true
-        }
-        return common.PutRes{}, err
-    }
-
     storageType = StorageTypeFSTree
-    err := c.putBig(ctx, oi.addr, prm)
+    err := c.putBig(ctx, prm)
     if err == nil {
         added = true
     }
     return common.PutRes{}, metaerr.Wrap(err)
 }

-// putSmall persists small objects to the write-cache database and
-// pushes the to the flush workers queue.
-func (c *cache) putSmall(obj objectInfo) error {
-    cacheSize := c.estimateCacheSize()
-    if c.maxCacheSize < c.incSizeDB(cacheSize) {
-        return ErrOutOfSpace
-    }
-
-    var newRecord bool
-    err := c.db.Batch(func(tx *bbolt.Tx) error {
-        b := tx.Bucket(defaultBucket)
-        key := []byte(obj.addr)
-        newRecord = b.Get(key) == nil
-        if newRecord {
-            return b.Put(key, obj.data)
-        }
-        return nil
-    })
-    if err == nil {
-        storagelog.Write(c.log,
-            storagelog.AddressField(obj.addr),
-            storagelog.StorageTypeField(wcStorageType),
-            storagelog.OpField("db PUT"),
-        )
-        if newRecord {
-            c.objCounters.cDB.Add(1)
-            c.estimateCacheSize()
-        }
-    }
-    return err
-}
-
 // putBig writes object to FSTree and pushes it to the flush workers queue.
-func (c *cache) putBig(ctx context.Context, addr string, prm common.PutPrm) error {
+func (c *cache) putBig(ctx context.Context, prm common.PutPrm) error {
+    addr := prm.Address.EncodeToString()
     cacheSz := c.estimateCacheSize()
     if c.maxCacheSize < c.incSizeFS(cacheSz) {
         return ErrOutOfSpace
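`Put` is now single-path: the `sz <= c.smallObjectSize` branch is gone, so every object up to `maxObjectSize` goes through `putBig` and the one remaining admission check, `c.maxCacheSize < c.incSizeFS(cacheSz)`. Judging by the removed `incSizeDB` (which added `smallObjectSize`), `incSizeFS` prices the incoming object at a full `maxObjectSize`, i.e. admission requires that a worst-case-sized object still fit under the capacity limit. The retained `putBig` name and its comment about the flush workers queue read as leftovers now that this is the only path and the queue is removed.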

View file

@@ -1,29 +1,21 @@
 package writecache

 import (
-    "fmt"
     "math"
     "sync/atomic"

     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
-    "go.etcd.io/bbolt"
 )

 func (c *cache) estimateCacheSize() uint64 {
-    dbCount := c.objCounters.DB()
     fsCount := c.objCounters.FS()
     if fsCount > 0 {
         fsCount-- // db file
     }
-    dbSize := dbCount * c.smallObjectSize
     fsSize := fsCount * c.maxObjectSize
-    c.metrics.SetEstimateSize(dbSize, fsSize)
-    c.metrics.SetActualCounters(dbCount, fsCount)
-    return dbSize + fsSize
-}
-
-func (c *cache) incSizeDB(sz uint64) uint64 {
-    return sz + c.smallObjectSize
+    c.metrics.SetEstimateSize(fsSize)
+    c.metrics.SetActualCounters(fsCount)
+    return fsSize
 }

 func (c *cache) incSizeFS(sz uint64) uint64 {
@@ -33,11 +25,7 @@ func (c *cache) incSizeFS(sz uint64) uint64 {
 var _ fstree.FileCounter = &counters{}

 type counters struct {
-    cDB, cFS atomic.Uint64
-}
-
-func (x *counters) DB() uint64 {
-    return x.cDB.Load()
+    cFS atomic.Uint64
 }

 func (x *counters) FS() uint64 {
@@ -58,20 +46,3 @@ func (x *counters) Inc() {
 func (x *counters) Dec() {
     x.cFS.Add(math.MaxUint64)
 }
-
-func (c *cache) initCounters() error {
-    var inDB uint64
-    err := c.db.View(func(tx *bbolt.Tx) error {
-        b := tx.Bucket(defaultBucket)
-        if b != nil {
-            inDB = uint64(b.Stats().KeyN)
-        }
-        return nil
-    })
-    if err != nil {
-        return fmt.Errorf("could not read write-cache DB counter: %w", err)
-    }
-
-    c.objCounters.cDB.Store(inDB)
-    c.estimateCacheSize()
-    return nil
-}
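Two details in this hunk deserve a comment. First, decrements still rely on unsigned wraparound: `atomic.Uint64` has `Add` but no subtraction, and adding `math.MaxUint64` is the same as subtracting one modulo 2^64. A self-contained illustration:

```go
package main

import (
	"fmt"
	"math"
	"sync/atomic"
)

func main() {
	var c atomic.Uint64
	c.Add(3)              // counter = 3
	c.Add(math.MaxUint64) // wraps modulo 2^64: counter = 2
	fmt.Println(c.Load()) // prints 2
}
```

Second, the size estimate stays a coarse upper bound — every cached file is priced at a full `maxObjectSize` — and the `fsCount--` adjustment with its `// db file` comment survives even though the db file it compensated for is removed by this very commit; that looks like a WIP leftover worth cleaning up.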

View file

@@ -3,7 +3,6 @@ package writecache
 import (
     "context"
     "fmt"
-    "math"
     "os"

     "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@@ -14,42 +13,15 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
     oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
-    "go.etcd.io/bbolt"
     "go.uber.org/zap"
 )

-// store represents persistent storage with in-memory LRU cache
-// for flushed items on top of it.
-type store struct {
-    db *bbolt.DB
-}
-
-const dbName = "small.bolt"
-
 func (c *cache) openStore(mod mode.ComponentMode) error {
     err := util.MkdirAllX(c.path, os.ModePerm)
     if err != nil {
         return err
     }

-    c.db, err = OpenDB(c.path, mod.ReadOnly(), c.openFile)
-    if err != nil {
-        return fmt.Errorf("could not open database: %w", err)
-    }
-
-    c.db.MaxBatchSize = c.maxBatchSize
-    c.db.MaxBatchDelay = c.maxBatchDelay
-
-    if !mod.ReadOnly() {
-        err = c.db.Update(func(tx *bbolt.Tx) error {
-            _, err := tx.CreateBucketIfNotExists(defaultBucket)
-            return err
-        })
-        if err != nil {
-            return fmt.Errorf("could not create default bucket: %w", err)
-        }
-    }
-
     c.fsTree = fstree.New(
         fstree.WithPath(c.path),
         fstree.WithPerm(os.ModePerm),
@@ -68,41 +40,6 @@ func (c *cache) openStore(mod mode.ComponentMode) error {
     return nil
 }

-func (c *cache) deleteFromDB(key string, batched bool) {
-    var recordDeleted bool
-    var err error
-    if batched {
-        err = c.db.Batch(func(tx *bbolt.Tx) error {
-            b := tx.Bucket(defaultBucket)
-            key := []byte(key)
-            recordDeleted = b.Get(key) != nil
-            return b.Delete(key)
-        })
-    } else {
-        err = c.db.Update(func(tx *bbolt.Tx) error {
-            b := tx.Bucket(defaultBucket)
-            key := []byte(key)
-            recordDeleted = b.Get(key) != nil
-            return b.Delete(key)
-        })
-    }
-
-    if err == nil {
-        c.metrics.Evict(StorageTypeDB)
-        storagelog.Write(c.log,
-            storagelog.AddressField(key),
-            storagelog.StorageTypeField(wcStorageType),
-            storagelog.OpField("db DELETE"),
-        )
-        if recordDeleted {
-            c.objCounters.cDB.Add(math.MaxUint64)
-            c.estimateCacheSize()
-        }
-    } else {
-        c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err))
-    }
-}
-
 func (c *cache) deleteFromDisk(ctx context.Context, addr oid.Address) {
     _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
     if err != nil && !client.IsErrObjectNotFound(err) {

View file

@@ -1,20 +0,0 @@
-package writecache
-
-import (
-    "io/fs"
-    "os"
-    "path/filepath"
-    "time"
-
-    "go.etcd.io/bbolt"
-)
-
-// OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true.
-func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error)) (*bbolt.DB, error) {
-    return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{
-        NoFreelistSync: true,
-        ReadOnly:       ro,
-        Timeout:        100 * time.Millisecond,
-        OpenFile:       openFile,
-    })
-}

View file

@@ -37,6 +37,7 @@ type Cache interface {
     DumpInfo() Info
     Flush(context.Context, bool, bool) error
     Seal(context.Context, bool) error
+    Iterate(context.Context, common.IteratePrm) (common.IterateRes, error)
     Init() error
     Open(ctx context.Context, mode mode.Mode) error