WIP: FSTree only writecache #1273

Closed
dstepanov-yadro wants to merge 4 commits from dstepanov-yadro/frostfs-node:feat/fstree_only_writecache into master
39 changed files with 367 additions and 870 deletions

View file

@@ -33,13 +33,11 @@ func inspectFunc(cmd *cobra.Command, _ []string) {
 	db := openMeta(cmd)
 	defer db.Close()

-	storageID := meta.StorageIDPrm{}
-	storageID.SetAddress(addr)
+	storageID := meta.StorageIDPrm{Address: addr}
 	resStorageID, err := db.StorageID(cmd.Context(), storageID)
 	common.ExitOnErr(cmd, common.Errf("could not check if the obj is small: %w", err))

-	if id := resStorageID.StorageID(); id != nil {
+	if id := resStorageID.StorageID; id != nil {
 		cmd.Printf("Object storageID: %s\n\n", blobovniczatree.NewIDFromBytes(id).Path())
 	} else {
 		cmd.Printf("Object does not contain storageID\n\n")

View file

@@ -1,11 +1,10 @@
 package writecache

 import (
-	"os"
-
 	common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
-	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
+	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"github.com/spf13/cobra"
 )
@@ -23,18 +22,20 @@ func init() {
 }

 func inspectFunc(cmd *cobra.Command, _ []string) {
-	var data []byte
+	wc := writecache.New(
+		writecache.WithPath(vPath),
+	)
+	common.ExitOnErr(cmd, common.Errf("could not open write-cache: %w", wc.Open(cmd.Context(), mode.ReadOnly)))
+	defer wc.Close()

-	db, err := writecache.OpenDB(vPath, true, os.OpenFile)
-	common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err))
-	defer db.Close()
+	var addr oid.Address
+	common.ExitOnErr(cmd, common.Errf("could not decode address: %w", addr.DecodeString(vAddress)))

-	data, err = writecache.Get(db, []byte(vAddress))
+	obj, err := wc.Get(cmd.Context(), addr)
 	common.ExitOnErr(cmd, common.Errf("could not fetch object: %w", err))

-	var o objectSDK.Object
-	common.ExitOnErr(cmd, common.Errf("could not unmarshal object: %w", o.Unmarshal(data)))
-
-	common.PrintObjectHeader(cmd, o)
+	common.PrintObjectHeader(cmd, *obj)
+	data, err := obj.Marshal()
+	common.ExitOnErr(cmd, common.Errf("could not marshal object: %w", err))
 	common.WriteObjectToFile(cmd, vOut, data)
 }

View file

@@ -3,11 +3,11 @@ package writecache
 import (
 	"fmt"
 	"io"
-	"os"

 	common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal"
+	blobstor "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
-	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"github.com/spf13/cobra"
 )
@@ -26,15 +26,18 @@ func listFunc(cmd *cobra.Command, _ []string) {
 	// other targets can be supported
 	w := cmd.OutOrStderr()

-	wAddr := func(addr oid.Address) error {
-		_, err := io.WriteString(w, fmt.Sprintf("%s\n", addr))
+	wc := writecache.New(
+		writecache.WithPath(vPath),
+	)
+	common.ExitOnErr(cmd, common.Errf("could not open write-cache: %w", wc.Open(cmd.Context(), mode.ReadOnly)))
+	defer wc.Close()
+
+	var prm blobstor.IteratePrm
+	prm.IgnoreErrors = true
+	prm.Handler = func(ie blobstor.IterationElement) error {
+		_, err := io.WriteString(w, fmt.Sprintf("%s\n", ie.Address))
 		return err
 	}
-
-	db, err := writecache.OpenDB(vPath, true, os.OpenFile)
-	common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err))
-	defer db.Close()
-
-	err = writecache.IterateDB(db, wAddr)
-	common.ExitOnErr(cmd, common.Errf("write-cache iterator failure: %w", err))
+	_, err := wc.Iterate(cmd.Context(), prm)
+	common.ExitOnErr(cmd, common.Errf("could not iterate write-cache: %w", err))
 }

View file

@@ -146,12 +146,10 @@ type shardCfg struct {
 		writecacheCfg struct {
 			enabled          bool
 			path             string
-			maxBatchSize     int
-			maxBatchDelay    time.Duration
-			smallObjectSize  uint64
 			maxObjSize       uint64
 			flushWorkerCount int
 			sizeLimit        uint64
+			countLimit       uint64
 			noSync           bool
 		}
@@ -269,12 +267,10 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg,
 		wc.enabled = true
 		wc.path = writeCacheCfg.Path()
-		wc.maxBatchSize = writeCacheCfg.BoltDB().MaxBatchSize()
-		wc.maxBatchDelay = writeCacheCfg.BoltDB().MaxBatchDelay()
 		wc.maxObjSize = writeCacheCfg.MaxObjectSize()
-		wc.smallObjectSize = writeCacheCfg.SmallObjectSize()
 		wc.flushWorkerCount = writeCacheCfg.WorkerCount()
 		wc.sizeLimit = writeCacheCfg.SizeLimit()
+		wc.countLimit = writeCacheCfg.CountLimit()
 		wc.noSync = writeCacheCfg.NoSync()
 	}
 }
@@ -861,12 +857,10 @@ func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option {
 	if wcRead := shCfg.writecacheCfg; wcRead.enabled {
 		writeCacheOpts = append(writeCacheOpts,
 			writecache.WithPath(wcRead.path),
-			writecache.WithMaxBatchSize(wcRead.maxBatchSize),
-			writecache.WithMaxBatchDelay(wcRead.maxBatchDelay),
 			writecache.WithMaxObjectSize(wcRead.maxObjSize),
-			writecache.WithSmallObjectSize(wcRead.smallObjectSize),
 			writecache.WithFlushWorkersCount(wcRead.flushWorkerCount),
 			writecache.WithMaxCacheSize(wcRead.sizeLimit),
+			writecache.WithMaxCacheCount(wcRead.countLimit),
 			writecache.WithNoSync(wcRead.noSync),
 			writecache.WithLogger(c.log),
 		)

View file

@@ -74,10 +74,10 @@ func TestEngineSection(t *testing.T) {
 				require.Equal(t, true, wc.NoSync())

 				require.Equal(t, "tmp/0/cache", wc.Path())
-				require.EqualValues(t, 16384, wc.SmallObjectSize())
 				require.EqualValues(t, 134217728, wc.MaxObjectSize())
 				require.EqualValues(t, 30, wc.WorkerCount())
 				require.EqualValues(t, 3221225472, wc.SizeLimit())
+				require.EqualValues(t, 0, wc.CountLimit())

 				require.Equal(t, "tmp/0/meta", meta.Path())
 				require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())
@@ -129,10 +129,10 @@ func TestEngineSection(t *testing.T) {
 				require.Equal(t, false, wc.NoSync())

 				require.Equal(t, "tmp/1/cache", wc.Path())
-				require.EqualValues(t, 16384, wc.SmallObjectSize())
 				require.EqualValues(t, 134217728, wc.MaxObjectSize())
 				require.EqualValues(t, 30, wc.WorkerCount())
 				require.EqualValues(t, 4294967296, wc.SizeLimit())
+				require.EqualValues(t, 10000, wc.CountLimit())

 				require.Equal(t, "tmp/1/meta", meta.Path())
 				require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())

View file

@@ -10,9 +10,6 @@ import (
 type Config config.Config

 const (
-	// SmallSizeDefault is a default size of small objects.
-	SmallSizeDefault = 32 << 10
-
 	// MaxSizeDefault is a default value of the object payload size limit.
 	MaxSizeDefault = 64 << 20
@@ -51,22 +48,6 @@ func (x *Config) Path() string {
 	return p
 }

-// SmallObjectSize returns the value of "small_object_size" config parameter.
-//
-// Returns SmallSizeDefault if the value is not a positive number.
-func (x *Config) SmallObjectSize() uint64 {
-	s := config.SizeInBytesSafe(
-		(*config.Config)(x),
-		"small_object_size",
-	)
-
-	if s > 0 {
-		return s
-	}
-
-	return SmallSizeDefault
-}
-
 // MaxObjectSize returns the value of "max_object_size" config parameter.
 //
 // Returns MaxSizeDefault if the value is not a positive number.
@@ -99,11 +80,20 @@ func (x *Config) WorkerCount() int {
 	return WorkersNumberDefault
 }

-// SizeLimit returns the value of "capacity" config parameter.
+// SizeLimit returns the value of "capacity_size" or "capacity" config parameter.
 //
 // Returns SizeLimitDefault if the value is not a positive number.
 func (x *Config) SizeLimit() uint64 {
 	c := config.SizeInBytesSafe(
+		(*config.Config)(x),
+		"capacity_size",
+	)
+	if c > 0 {
+		return c
+	}
+
+	c = config.SizeInBytesSafe(
 		(*config.Config)(x),
 		"capacity",
 	)
@@ -115,6 +105,16 @@ func (x *Config) SizeLimit() uint64 {
 	return SizeLimitDefault
 }

+// CountLimit returns the value of "capacity_count" config parameter.
+//
+// Returns 0 (means no limit) if the value is not a positive number.
+func (x *Config) CountLimit() uint64 {
+	return config.SizeInBytesSafe(
+		(*config.Config)(x),
+		"capacity_count",
+	)
+}
+
 // NoSync returns the value of "no_sync" config parameter.
 //
 // Returns false if the value is not a boolean.
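
To make the new lookup order concrete, a hypothetical fragment (not from this change), assuming `wc` is a `*Config` from this package parsed from a section that sets only the deprecated `capacity` key and `capacity_count`:

	// SizeLimit() checks "capacity_size" first; it is absent here,
	// so the value of the deprecated "capacity" key is returned.
	fmt.Println(wc.SizeLimit()) // e.g. 3221225472, read from "capacity"

	// CountLimit() reads "capacity_count" and returns 0 (no limit) when the key is unset.
	fmt.Println(wc.CountLimit()) // e.g. 10000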

View file

@@ -101,7 +101,6 @@ FROSTFS_STORAGE_SHARD_0_MODE=read-only
 FROSTFS_STORAGE_SHARD_0_WRITECACHE_ENABLED=false
 FROSTFS_STORAGE_SHARD_0_WRITECACHE_NO_SYNC=true
 FROSTFS_STORAGE_SHARD_0_WRITECACHE_PATH=tmp/0/cache
-FROSTFS_STORAGE_SHARD_0_WRITECACHE_SMALL_OBJECT_SIZE=16384
 FROSTFS_STORAGE_SHARD_0_WRITECACHE_MAX_OBJECT_SIZE=134217728
 FROSTFS_STORAGE_SHARD_0_WRITECACHE_FLUSH_WORKER_COUNT=30
 FROSTFS_STORAGE_SHARD_0_WRITECACHE_CAPACITY=3221225472
@@ -155,10 +154,10 @@ FROSTFS_STORAGE_SHARD_1_MODE=read-write
 ### Write cache config
 FROSTFS_STORAGE_SHARD_1_WRITECACHE_ENABLED=true
 FROSTFS_STORAGE_SHARD_1_WRITECACHE_PATH=tmp/1/cache
-FROSTFS_STORAGE_SHARD_1_WRITECACHE_SMALL_OBJECT_SIZE=16384
 FROSTFS_STORAGE_SHARD_1_WRITECACHE_MAX_OBJECT_SIZE=134217728
 FROSTFS_STORAGE_SHARD_1_WRITECACHE_FLUSH_WORKER_COUNT=30
-FROSTFS_STORAGE_SHARD_1_WRITECACHE_CAPACITY=4294967296
+FROSTFS_STORAGE_SHARD_1_WRITECACHE_CAPACITY_SIZE=4294967296
+FROSTFS_STORAGE_SHARD_1_WRITECACHE_CAPACITY_COUNT=10000
 ### Metabase config
 FROSTFS_STORAGE_SHARD_1_METABASE_PATH=tmp/1/meta
 FROSTFS_STORAGE_SHARD_1_METABASE_PERM=0644

View file

@@ -145,7 +145,6 @@
         "enabled": false,
         "no_sync": true,
         "path": "tmp/0/cache",
-        "small_object_size": 16384,
         "max_object_size": 134217728,
         "flush_worker_count": 30,
         "capacity": 3221225472
@@ -203,10 +202,10 @@
         "enabled": true,
         "path": "tmp/1/cache",
         "memcache_capacity": 2147483648,
-        "small_object_size": 16384,
         "max_object_size": 134217728,
         "flush_worker_count": 30,
-        "capacity": 4294967296
+        "capacity_size": 4294967296,
+        "capacity_count": 10000
       },
       "metabase": {
         "path": "tmp/1/meta",

View file

@@ -125,7 +125,6 @@ storage:
     writecache:
       enabled: true
-      small_object_size: 16k # size threshold for "small" objects which are cached in key-value DB, not in FS, bytes
      max_object_size: 134217728 # size threshold for "big" objects which bypass write-cache and go to the storage directly, bytes
      flush_worker_count: 30 # number of write-cache flusher threads
@@ -208,7 +207,8 @@ storage:
     1:
       writecache:
         path: tmp/1/cache # write-cache root directory
-        capacity: 4 G # approximate write-cache total size, bytes
+        capacity_size: 4 G # approximate write-cache total size, bytes
+        capacity_count: 10000
       metabase:
         path: tmp/1/meta # metabase path

View file

@@ -286,21 +286,20 @@ metabase:
 writecache:
   enabled: true
   path: /path/to/writecache
-  capacity: 4294967296
-  small_object_size: 16384
+  capacity_size: 4294967296
+  capacity_count: 100000
   max_object_size: 134217728
   flush_worker_count: 30
 ```
 | Parameter            | Type       | Default value | Description                                                                                                                       |
 |----------------------|------------|---------------|-----------------------------------------------------------------------------------------------------------------------------------|
 | `path`               | `string`   |               | Path to the write-cache directory.                                                                                                |
-| `capacity`           | `size`     | unrestricted  | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly.             |
-| `small_object_size`  | `size`     | `32K`         | Maximum object size for "small" objects. These objects are stored in a key-value database instead of a file-system.              |
+| `capacity_size`      | `size`     | `1G`          | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly.             |
+| `capacity`           | `size`     | `1G`          | The same as `capacity_size`. Deprecated, use `capacity_size` instead.                                                             |
+| `capacity_count`     | `int`      | unrestricted  | Approximate maximum count of objects in the writecache. If the writecache is full, objects are written to the blobstor directly. |
 | `max_object_size`    | `size`     | `64M`         | Maximum object size allowed to be stored in the writecache.                                                                       |
 | `flush_worker_count` | `int`      | `20`          | Amount of background workers that move data from the writecache to the blobstor.                                                 |
-| `max_batch_size`     | `int`      | `1000`        | Maximum amount of small object `PUT` operations to perform in a single transaction.                                              |
-| `max_batch_delay`    | `duration` | `10ms`        | Maximum delay before a batch starts.                                                                                              |
# `node` section # `node` section

View file

@@ -278,7 +278,6 @@ const (
 	ShardCouldNotMarkObjectToDeleteInMetabase  = "could not mark object to delete in metabase"
 	WritecacheTriedToFlushItemsFromWritecache  = "tried to flush items from write-cache"
 	WritecacheWaitingForChannelsToFlush        = "waiting for channels to flush"
-	WritecacheCantRemoveObjectsFromTheDatabase = "can't remove objects from the database"
 	WritecacheCantRemoveObjectFromWritecache   = "can't remove object from write-cache"
 	BlobovniczatreeCouldNotGetObjectFromLevel  = "could not get object from level"
 	BlobovniczatreeCouldNotCloseBlobovnicza    = "could not close Blobovnicza"
@@ -469,6 +468,7 @@ const (
 	FSTreeCantUnmarshalObject       = "can't unmarshal an object"
 	FSTreeCantFushObjectBlobstor    = "can't flush an object to blobstor"
 	FSTreeCantUpdateID              = "can't update object storage ID"
+	FSTreeCantGetID                 = "can't get object storage ID"
 	FSTreeCantDecodeDBObjectAddress = "can't decode object address from the DB"
 	PutSingleRedirectFailure        = "failed to redirect PutSingle request"
 	StorageIDRetrievalFailure       = "can't get storage ID from metabase"

View file

@@ -1,22 +1,23 @@
 package fstree

 import (
-	"math"
 	"sync/atomic"
 )

 // FileCounter used to count files in FSTree. The implementation must be thread-safe.
 type FileCounter interface {
-	Set(v uint64)
-	Inc()
-	Dec()
+	Set(count, size int64)
+	Inc(size int64)
+	Dec(size int64)
+	Value() (int64, int64)
 }

 type noopCounter struct{}

-func (c *noopCounter) Set(uint64) {}
-func (c *noopCounter) Inc()       {}
-func (c *noopCounter) Dec()       {}
+func (c *noopCounter) Set(int64, int64)      {}
+func (c *noopCounter) Inc(int64)             {}
+func (c *noopCounter) Dec(int64)             {}
+func (c *noopCounter) Value() (int64, int64) { return 0, 0 }

 func counterEnabled(c FileCounter) bool {
 	_, noop := c.(*noopCounter)
@@ -24,14 +25,29 @@ func counterEnabled(c FileCounter) bool {
 }

 type SimpleCounter struct {
-	v atomic.Uint64
+	count atomic.Int64
+	size  atomic.Int64
 }

 func NewSimpleCounter() *SimpleCounter {
 	return &SimpleCounter{}
 }

-func (c *SimpleCounter) Set(v uint64) { c.v.Store(v) }
-func (c *SimpleCounter) Inc()         { c.v.Add(1) }
-func (c *SimpleCounter) Dec()         { c.v.Add(math.MaxUint64) }
-func (c *SimpleCounter) Value() uint64 { return c.v.Load() }
+func (c *SimpleCounter) Set(count, size int64) {
+	c.count.Store(count)
+	c.size.Store(size)
+}
+
+func (c *SimpleCounter) Inc(size int64) {
+	c.count.Add(1)
+	c.size.Add(size)
+}
+
+func (c *SimpleCounter) Dec(size int64) {
+	c.count.Add(-1)
+	c.size.Add(-size)
+}
+
+func (c *SimpleCounter) Value() (int64, int64) {
+	return c.count.Load(), c.size.Load()
+}
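
A minimal sketch of the reworked counter in isolation (the import path is an assumption based on this repository's layout): every `Inc`/`Dec` now carries the file size, so the counter tracks the object count and the total byte size together.

	package main

	import (
		"fmt"

		"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
	)

	func main() {
		c := fstree.NewSimpleCounter()
		c.Set(0, 0) // reset both dimensions to a known state
		c.Inc(512)  // a 512-byte file was written
		c.Inc(2048) // a 2 KiB file was written
		c.Dec(512)  // the 512-byte file was removed

		count, size := c.Value()
		fmt.Println(count, size) // 1 2048
	}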

View file

@@ -435,32 +435,38 @@ func (t *FSTree) initFileCounter() error {
 		return nil
 	}

-	counter, err := t.countFiles()
+	count, size, err := t.countFiles()
 	if err != nil {
 		return err
 	}
-	t.fileCounter.Set(counter)
+	t.fileCounter.Set(count, size)
 	return nil
 }

-func (t *FSTree) countFiles() (uint64, error) {
-	var counter uint64
+func (t *FSTree) countFiles() (int64, int64, error) {
+	var count int64
+	var size int64
 	// it is simpler to just consider every file
 	// that is not directory as an object
 	err := filepath.WalkDir(t.RootPath,
 		func(_ string, d fs.DirEntry, _ error) error {
 			if !d.IsDir() {
-				counter++
+				count++
+				fi, err := d.Info()
+				if err != nil {
+					return err
+				}
+				size += fi.Size()
 			}
 			return nil
 		},
 	)
 	if err != nil {
-		return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err)
+		return 0, 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err)
 	}

-	return counter, nil
+	return count, size, nil
 }

 func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) {
func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) { func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) {

View file

@@ -47,8 +47,9 @@ func TestObjectCounter(t *testing.T) {
 	require.NoError(t, fst.Open(mode.ComponentReadWrite))
 	require.NoError(t, fst.Init())

-	counterValue := counter.Value()
-	require.Equal(t, uint64(0), counterValue)
+	counterValue, sizeValue := counter.Value()
+	require.Equal(t, int64(0), counterValue)
+	require.Equal(t, int64(0), sizeValue)

 	defer func() {
 		require.NoError(t, fst.Close())
@@ -64,9 +65,6 @@ func TestObjectCounter(t *testing.T) {
 	putPrm.Address = addr
 	putPrm.RawData, _ = obj.Marshal()

-	var getPrm common.GetPrm
-	getPrm.Address = putPrm.Address
-
 	var delPrm common.DeletePrm
 	delPrm.Address = addr
@@ -95,8 +93,9 @@ func TestObjectCounter(t *testing.T) {
 	require.NoError(t, eg.Wait())

-	counterValue = counter.Value()
-	realCount, err := fst.countFiles()
+	counterValue, sizeValue = counter.Value()
+	realCount, realSize, err := fst.countFiles()
 	require.NoError(t, err)
 	require.Equal(t, realCount, counterValue)
+	require.Equal(t, realSize, sizeValue)
 }

View file

@@ -78,14 +78,15 @@ func (w *genericWriter) writeAndRename(tmpPath, p string, data []byte) error {
 	}

 	if w.fileCounterEnabled {
-		w.fileCounter.Inc()
+		w.fileCounter.Inc(int64(len(data)))
 		var targetFileExists bool
-		if _, e := os.Stat(p); e == nil {
+		s, e := os.Stat(p)
+		if e == nil {
 			targetFileExists = true
 		}
 		err = os.Rename(tmpPath, p)
 		if err == nil && targetFileExists {
-			w.fileCounter.Dec()
+			w.fileCounter.Dec(int64(s.Size()))
 		}
 	} else {
 		err = os.Rename(tmpPath, p)
@@ -108,20 +109,31 @@ func (w *genericWriter) writeFile(p string, data []byte) error {
 }

 func (w *genericWriter) removeFile(p string) error {
-	var err error
 	if w.fileCounterEnabled {
-		w.fileGuard.Lock(p)
-		err = os.Remove(p)
-		w.fileGuard.Unlock(p)
-		if err == nil {
-			w.fileCounter.Dec()
-		}
-	} else {
-		err = os.Remove(p)
+		return w.removeFileWithCounter(p)
 	}

+	err := os.Remove(p)
 	if err != nil && os.IsNotExist(err) {
 		err = logicerr.Wrap(new(apistatus.ObjectNotFound))
 	}
 	return err
 }
+
+func (w *genericWriter) removeFileWithCounter(p string) error {
+	w.fileGuard.Lock(p)
+	defer w.fileGuard.Unlock(p)
+
+	s, err := os.Stat(p)
+	if err != nil && os.IsNotExist(err) {
+		return logicerr.Wrap(new(apistatus.ObjectNotFound))
+	}
+	if err != nil {
+		return err
+	}
+
+	err = os.Remove(p)
+	if err == nil {
+		w.fileCounter.Dec(s.Size())
+	}
+	return err
+}

View file

@@ -18,7 +18,8 @@ type linuxWriter struct {
 	perm  uint32
 	flags int

 	counter        FileCounter
+	counterEnabled bool
 }

 func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync bool) writer {
@@ -34,10 +35,11 @@ func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync b
 	}
 	_ = unix.Close(fd) // Don't care about error.
 	w := &linuxWriter{
-		root:    root,
-		perm:    uint32(perm),
-		flags:   flags,
-		counter: c,
+		root:           root,
+		perm:           uint32(perm),
+		flags:          flags,
+		counter:        c,
+		counterEnabled: counterEnabled(c),
 	}
 	return w
 }
@@ -61,7 +63,7 @@ func (w *linuxWriter) writeFile(p string, data []byte) error {
 	if n == len(data) {
 		err = unix.Linkat(unix.AT_FDCWD, tmpPath, unix.AT_FDCWD, p, unix.AT_SYMLINK_FOLLOW)
 		if err == nil {
-			w.counter.Inc()
+			w.counter.Inc(int64(len(data)))
 		}
 		if errors.Is(err, unix.EEXIST) {
 			err = nil
@@ -78,12 +80,22 @@ func (w *linuxWriter) writeFile(p string, data []byte) error {
 }

 func (w *linuxWriter) removeFile(p string) error {
+	var s unix.Stat_t
+	if w.counterEnabled {
+		err := unix.Stat(p, &s)
+		if err != nil && err == unix.ENOENT {
+			return logicerr.Wrap(new(apistatus.ObjectNotFound))
+		}
+		if err != nil {
+			return err
+		}
+	}
 	err := unix.Unlink(p)
 	if err != nil && err == unix.ENOENT {
 		return logicerr.Wrap(new(apistatus.ObjectNotFound))
 	}
 	if err == nil {
-		w.counter.Dec()
+		w.counter.Dec(s.Size)
 	}
 	return err
 }

View file

@@ -166,8 +166,7 @@ func (m *writeCacheMetrics) Put(d time.Duration, success bool, st writecache.Sto
 	m.metrics.AddMethodDuration(m.shardID, m.path, st.String(), "Put", success, d)
 }

-func (m *writeCacheMetrics) SetEstimateSize(db, fstree uint64) {
-	m.metrics.SetEstimateSize(m.shardID, m.path, writecache.StorageTypeDB.String(), db)
+func (m *writeCacheMetrics) SetEstimateSize(fstree uint64) {
 	m.metrics.SetEstimateSize(m.shardID, m.path, writecache.StorageTypeFSTree.String(), fstree)
 }
@@ -175,8 +174,7 @@ func (m *writeCacheMetrics) SetMode(mod mode.ComponentMode) {
 	m.metrics.SetMode(m.shardID, mod.String())
 }

-func (m *writeCacheMetrics) SetActualCounters(db, fstree uint64) {
-	m.metrics.SetActualCount(m.shardID, m.path, writecache.StorageTypeDB.String(), db)
+func (m *writeCacheMetrics) SetActualCounters(fstree uint64) {
 	m.metrics.SetActualCount(m.shardID, m.path, writecache.StorageTypeFSTree.String(), fstree)
 }

View file

@@ -15,22 +15,12 @@ import (
 // StorageIDPrm groups the parameters of StorageID operation.
 type StorageIDPrm struct {
-	addr oid.Address
+	Address oid.Address
 }

 // StorageIDRes groups the resulting values of StorageID operation.
 type StorageIDRes struct {
-	id []byte
-}
-
-// SetAddress is a StorageID option to set the object address to check.
-func (p *StorageIDPrm) SetAddress(addr oid.Address) {
-	p.addr = addr
-}
-
-// StorageID returns storage ID.
-func (r StorageIDRes) StorageID() []byte {
-	return r.id
+	StorageID []byte
 }

 // StorageID returns storage descriptor for objects from the blobstor.
@@ -46,7 +36,7 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes
 	_, span := tracing.StartSpanFromContext(ctx, "metabase.StorageID",
 		trace.WithAttributes(
-			attribute.String("address", prm.addr.EncodeToString()),
+			attribute.String("address", prm.Address.EncodeToString()),
 		))
 	defer span.End()
@@ -58,7 +48,7 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes
 	}

 	err = db.boltDB.View(func(tx *bbolt.Tx) error {
-		res.id, err = db.storageID(tx, prm.addr)
+		res.StorageID, err = db.storageID(tx, prm.Address)
 		return err
 	})
@@ -83,23 +73,13 @@ func (db *DB) storageID(tx *bbolt.Tx, addr oid.Address) ([]byte, error) {
 // UpdateStorageIDPrm groups the parameters of UpdateStorageID operation.
 type UpdateStorageIDPrm struct {
-	addr oid.Address
-	id   []byte
+	Address   oid.Address
+	StorageID []byte
 }

 // UpdateStorageIDRes groups the resulting values of UpdateStorageID operation.
 type UpdateStorageIDRes struct{}

-// SetAddress is an UpdateStorageID option to set the object address to check.
-func (p *UpdateStorageIDPrm) SetAddress(addr oid.Address) {
-	p.addr = addr
-}
-
-// SetStorageID is an UpdateStorageID option to set the storage ID.
-func (p *UpdateStorageIDPrm) SetStorageID(id []byte) {
-	p.id = id
-}
-
 // UpdateStorageID updates storage descriptor for objects from the blobstor.
 func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res UpdateStorageIDRes, err error) {
 	var (
@@ -112,8 +92,8 @@ func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res
 	_, span := tracing.StartSpanFromContext(ctx, "metabase.UpdateStorageID",
 		trace.WithAttributes(
-			attribute.String("address", prm.addr.EncodeToString()),
-			attribute.String("storage_id", string(prm.id)),
+			attribute.String("address", prm.Address.EncodeToString()),
+			attribute.String("storage_id", string(prm.StorageID)),
 		))
 	defer span.End()
@@ -127,7 +107,7 @@ func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res
 	}

 	err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
-		return setStorageID(tx, prm.addr, prm.id, true)
+		return setStorageID(tx, prm.Address, prm.StorageID, true)
 	})

 	success = err == nil
 	return res, metaerr.Wrap(err)
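
With the setters gone, both operations reduce to struct literals; a minimal sketch (hypothetical helper, assuming an open `db *meta.DB`):

	// copyStorageID reads an object's storage ID from the metabase and
	// writes it back, using the new exported-field parameter structs.
	func copyStorageID(ctx context.Context, db *meta.DB, addr oid.Address) error {
		res, err := db.StorageID(ctx, meta.StorageIDPrm{Address: addr})
		if err != nil {
			return err
		}
		_, err = db.UpdateStorageID(ctx, meta.UpdateStorageIDPrm{
			Address:   addr,
			StorageID: res.StorageID,
		})
		return err
	}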

View file

@@ -102,18 +102,13 @@ func TestPutWritecacheDataRace(t *testing.T) {
 }

 func metaUpdateStorageID(db *meta.DB, addr oid.Address, id []byte) error {
-	var sidPrm meta.UpdateStorageIDPrm
-	sidPrm.SetAddress(addr)
-	sidPrm.SetStorageID(id)
+	sidPrm := meta.UpdateStorageIDPrm{Address: addr, StorageID: id}

 	_, err := db.UpdateStorageID(context.Background(), sidPrm)
 	return err
 }

 func metaStorageID(db *meta.DB, addr oid.Address) ([]byte, error) {
-	var sidPrm meta.StorageIDPrm
-	sidPrm.SetAddress(addr)
+	sidPrm := meta.StorageIDPrm{Address: addr}

 	r, err := db.StorageID(context.Background(), sidPrm)
-	return r.StorageID(), err
+	return r.StorageID, err
 }

View file

@@ -105,9 +105,7 @@ func (s *Shard) validateWritecacheDoesntContainObject(ctx context.Context, addr
 }

 func (s *Shard) deleteFromBlobstor(ctx context.Context, addr oid.Address) error {
-	var sPrm meta.StorageIDPrm
-	sPrm.SetAddress(addr)
+	sPrm := meta.StorageIDPrm{Address: addr}

 	res, err := s.metaBase.StorageID(ctx, sPrm)
 	if err != nil {
 		s.log.Debug(logs.StorageIDRetrievalFailure,
@@ -116,7 +114,7 @@ func (s *Shard) deleteFromBlobstor(ctx context.Context, addr oid.Address) error
 			zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
 		return err
 	}
-	storageID := res.StorageID()
+	storageID := res.StorageID
 	if storageID == nil {
 		// if storageID is nil it means:
 		// 1. there is no such object

View file

@@ -109,15 +109,14 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
 	require.True(t, client.IsErrObjectNotFound(err), "invalid error type")

 	// storageID
-	var metaStIDPrm meta.StorageIDPrm
-	metaStIDPrm.SetAddress(addr)
+	metaStIDPrm := meta.StorageIDPrm{Address: addr}
 	storageID, err := sh.metaBase.StorageID(context.Background(), metaStIDPrm)
 	require.NoError(t, err, "failed to get storage ID")

 	// check existence in blobstore
 	var bsExisted common.ExistsPrm
 	bsExisted.Address = addr
-	bsExisted.StorageID = storageID.StorageID()
+	bsExisted.StorageID = storageID.StorageID
 	exRes, err := sh.blobStor.Exists(context.Background(), bsExisted)
 	require.NoError(t, err, "failed to check blobstore existence")
 	require.True(t, exRes.Exists, "invalid blobstore existence result")
@@ -125,7 +124,7 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
 	// drop from blobstor
 	var bsDeletePrm common.DeletePrm
 	bsDeletePrm.Address = addr
-	bsDeletePrm.StorageID = storageID.StorageID()
+	bsDeletePrm.StorageID = storageID.StorageID
 	_, err = sh.blobStor.Delete(context.Background(), bsDeletePrm)
 	require.NoError(t, err, "failed to delete from blobstore")

View file

@@ -160,15 +160,14 @@ func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta
 		return res, false, err
 	}

-	var mPrm meta.StorageIDPrm
-	mPrm.SetAddress(addr)
+	mPrm := meta.StorageIDPrm{Address: addr}

 	mExRes, err := s.metaBase.StorageID(ctx, mPrm)
 	if err != nil {
 		return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err)
 	}

-	storageID := mExRes.StorageID()
+	storageID := mExRes.StorageID
 	if storageID == nil {
 		// `nil` storageID returned without any error
 		// means that object is big, `cb` expects an

View file

@@ -90,9 +90,7 @@ func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Addres
 		return errMBIsNotAvailable
 	}

-	var prm meta.UpdateStorageIDPrm
-	prm.SetAddress(addr)
-	prm.SetStorageID(storageID)
+	prm := meta.UpdateStorageIDPrm{Address: addr, StorageID: storageID}

 	_, err := u.mb.UpdateStorageID(ctx, prm)
 	return err
 }

View file

@@ -2,6 +2,7 @@ package benchmark
 import (
 	"context"
+	"sync"
 	"testing"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@@ -10,6 +11,7 @@ import (
 	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
+	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"github.com/stretchr/testify/require"
 )
@@ -80,12 +82,30 @@ func benchmarkPutPrepare(b *testing.B, cache writecache.Cache) {
 	require.NoError(b, cache.Init(), "initializing")
 }

-type testMetabase struct{}
+type testMetabase struct {
+	storageIDs map[oid.Address][]byte
+	guard      *sync.RWMutex
+}

-func (testMetabase) UpdateStorageID(context.Context, meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error) {
+func (t *testMetabase) UpdateStorageID(_ context.Context, prm meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error) {
+	t.guard.Lock()
+	defer t.guard.Unlock()
+	t.storageIDs[prm.Address] = prm.StorageID
 	return meta.UpdateStorageIDRes{}, nil
 }

+func (t *testMetabase) StorageID(_ context.Context, prm meta.StorageIDPrm) (meta.StorageIDRes, error) {
+	t.guard.RLock()
+	defer t.guard.RUnlock()
+	if id, found := t.storageIDs[prm.Address]; found {
+		return meta.StorageIDRes{
+			StorageID: id,
+		}, nil
+	}
+	return meta.StorageIDRes{}, nil
+}
+
 func newCache(b *testing.B) writecache.Cache {
 	bs := teststore.New(
 		teststore.WithPut(func(pp common.PutPrm) (common.PutRes, error) { return common.PutRes{}, nil }),
@@ -93,8 +113,7 @@ func newCache(b *testing.B) writecache.Cache {
 	return writecache.New(
 		writecache.WithPath(b.TempDir()),
 		writecache.WithBlobstor(bs),
-		writecache.WithMetabase(testMetabase{}),
+		writecache.WithMetabase(&testMetabase{storageIDs: make(map[oid.Address][]byte), guard: &sync.RWMutex{}}),
 		writecache.WithMaxCacheSize(256<<30),
-		writecache.WithSmallObjectSize(128<<10),
 	)
 }

View file

@@ -10,8 +10,8 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
-	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
-	"go.etcd.io/bbolt"
+	utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
+	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"go.uber.org/zap"
 )
@@ -28,14 +28,13 @@ type cache struct {
 	// whether object should be compressed.
 	compressFlags map[string]struct{}

-	// flushCh is a channel with objects to flush.
-	flushCh chan objectInfo
+	flushCh       chan objectInfo
+	flushingGuard *utilSync.KeyLocker[oid.Address]
 	// cancel is cancel function, protected by modeMtx in Close.
 	cancel atomic.Value
 	// wg is a wait group for flush workers.
 	wg sync.WaitGroup
-	// store contains underlying database.
-	store
 	// fsTree contains big files stored directly on file-system.
 	fsTree *fstree.FSTree
 }
@@ -43,40 +42,28 @@ type cache struct {
 // wcStorageType is used for write-cache operations logging.
 const wcStorageType = "write-cache"

-type objectInfo struct {
-	addr string
-	data []byte
-	obj  *objectSDK.Object
-}
-
 const (
 	defaultMaxObjectSize = 64 * 1024 * 1024 // 64 MiB
-	defaultSmallObjectSize = 32 * 1024      // 32 KiB
-	defaultMaxCacheSize = 1 << 30           // 1 GiB
+	defaultMaxCacheSize  = 1 << 30          // 1 GiB
 )

-var (
-	defaultBucket = []byte{0}
-	dummyCanceler context.CancelFunc = func() {}
-)
+var dummyCanceler context.CancelFunc = func() {}

 // New creates new writecache instance.
 func New(opts ...Option) Cache {
 	c := &cache{
-		flushCh: make(chan objectInfo),
-		mode:    mode.Disabled,
+		mode:          mode.Disabled,
+		flushCh:       make(chan objectInfo),
+		flushingGuard: utilSync.NewKeyLocker[oid.Address](),

 		compressFlags: make(map[string]struct{}),
 		options: options{
-			log:             &logger.Logger{Logger: zap.NewNop()},
-			maxObjectSize:   defaultMaxObjectSize,
-			smallObjectSize: defaultSmallObjectSize,
-			workersCount:    defaultFlushWorkersCount,
-			maxCacheSize:    defaultMaxCacheSize,
-			maxBatchSize:    bbolt.DefaultMaxBatchSize,
-			maxBatchDelay:   bbolt.DefaultMaxBatchDelay,
-			openFile:        os.OpenFile,
-			metrics:         DefaultMetrics(),
+			log:           &logger.Logger{Logger: zap.NewNop()},
+			maxObjectSize: defaultMaxObjectSize,
+			workersCount:  defaultFlushWorkersCount,
+			maxCacheSize:  defaultMaxCacheSize,
+			openFile:      os.OpenFile,
+			metrics:       DefaultMetrics(),
+			counter:       fstree.NewSimpleCounter(),
 		},
 	}
@@ -111,7 +98,8 @@ func (c *cache) Open(_ context.Context, mod mode.Mode) error {
 		return metaerr.Wrap(err)
 	}

-	return metaerr.Wrap(c.initCounters())
+	c.estimateCacheSize()
+	return nil
 }

 // Init runs necessary services.
@@ -138,14 +126,6 @@ func (c *cache) Close() error {
 	c.modeMtx.Lock()
 	defer c.modeMtx.Unlock()

-	var err error
-	if c.db != nil {
-		err = c.db.Close()
-		if err != nil {
-			c.db = nil
-		}
-	}
 	c.metrics.Close()
 	return nil
 }

View file

@@ -2,7 +2,6 @@ package writecache
 import (
 	"context"
-	"math"
 	"time"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@@ -10,7 +9,6 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
-	"go.etcd.io/bbolt"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
 )
@@ -47,39 +45,6 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
 	saddr := addr.EncodeToString()

-	var dataSize int
-	_ = c.db.View(func(tx *bbolt.Tx) error {
-		b := tx.Bucket(defaultBucket)
-		dataSize = len(b.Get([]byte(saddr)))
-		return nil
-	})
-
-	if dataSize > 0 {
-		storageType = StorageTypeDB
-		var recordDeleted bool
-		err := c.db.Update(func(tx *bbolt.Tx) error {
-			b := tx.Bucket(defaultBucket)
-			key := []byte(saddr)
-			recordDeleted = b.Get(key) != nil
-			err := b.Delete(key)
-			return err
-		})
-		if err != nil {
-			return err
-		}
-		storagelog.Write(c.log,
-			storagelog.AddressField(saddr),
-			storagelog.StorageTypeField(wcStorageType),
-			storagelog.OpField("db DELETE"),
-		)
-		if recordDeleted {
-			c.objCounters.cDB.Add(math.MaxUint64)
-			c.estimateCacheSize()
-		}
-		deleted = true
-		return nil
-	}
-
 	storageType = StorageTypeFSTree
 	_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
 	if err == nil {

View file

@@ -1,13 +1,11 @@
 package writecache
 import (
-	"bytes"
 	"context"
 	"errors"
 	"time"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
-	objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
@@ -16,159 +14,96 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
-	"github.com/mr-tron/base58"
-	"go.etcd.io/bbolt"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/zap"
 )

 const (
-	// flushBatchSize is amount of keys which will be read from cache to be flushed
-	// to the main storage. It is used to reduce contention between cache put
-	// and cache persist.
-	flushBatchSize = 512
-
 	// defaultFlushWorkersCount is number of workers for putting objects in main storage.
 	defaultFlushWorkersCount = 20
 	// defaultFlushInterval is default time interval between successive flushes.
-	defaultFlushInterval = time.Second
+	defaultFlushInterval = 10 * time.Second
 )

-var errIterationCompleted = errors.New("iteration completed")
-
 // runFlushLoop starts background workers which periodically flush objects to the blobstor.
 func (c *cache) runFlushLoop(ctx context.Context) {
 	if c.disableBackgroundFlush {
 		return
 	}
 	for i := 0; i < c.workersCount; i++ {
 		c.wg.Add(1)
-		go c.workerFlushSmall(ctx)
+		go c.workerFlush(ctx)
 	}

 	c.wg.Add(1)
-	go func() {
-		c.workerFlushBig(ctx)
-		c.wg.Done()
-	}()
-
-	c.wg.Add(1)
-	go func() {
-		defer c.wg.Done()
-
-		tt := time.NewTimer(defaultFlushInterval)
-		defer tt.Stop()
-
-		for {
-			select {
-			case <-tt.C:
-				c.flushSmallObjects(ctx)
-				tt.Reset(defaultFlushInterval)
-				c.estimateCacheSize()
-			case <-ctx.Done():
-				return
-			}
-		}
-	}()
+	go c.workerSelect(ctx)
 }

-func (c *cache) flushSmallObjects(ctx context.Context) {
-	var lastKey []byte
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		default:
-		}
-
-		var m []objectInfo
-
-		c.modeMtx.RLock()
-		if c.readOnly() {
-			c.modeMtx.RUnlock()
-			time.Sleep(time.Second)
-			continue
-		}
-
-		// We put objects in batches of fixed size to not interfere with main put cycle a lot.
-		_ = c.db.View(func(tx *bbolt.Tx) error {
-			b := tx.Bucket(defaultBucket)
-			cs := b.Cursor()
-
-			var k, v []byte
-
-			if len(lastKey) == 0 {
-				k, v = cs.First()
-			} else {
-				k, v = cs.Seek(lastKey)
-				if bytes.Equal(k, lastKey) {
-					k, v = cs.Next()
-				}
-			}
-
-			for ; k != nil && len(m) < flushBatchSize; k, v = cs.Next() {
-				if len(lastKey) == len(k) {
-					copy(lastKey, k)
-				} else {
-					lastKey = bytes.Clone(k)
-				}
-
-				m = append(m, objectInfo{
-					addr: string(k),
-					data: bytes.Clone(v),
-				})
-			}
-			return nil
-		})
-
-		var count int
-		for i := range m {
-			obj := objectSDK.New()
-			if err := obj.Unmarshal(m[i].data); err != nil {
-				continue
-			}
-			m[i].obj = obj
-
-			count++
-			select {
-			case c.flushCh <- m[i]:
-			case <-ctx.Done():
-				c.modeMtx.RUnlock()
-				return
-			}
-		}
-
-		c.modeMtx.RUnlock()
-		if count == 0 {
-			break
-		}
-
-		c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache,
-			zap.Int("count", count),
-			zap.String("start", base58.Encode(lastKey)))
-	}
-}
-
-func (c *cache) workerFlushBig(ctx context.Context) {
-	tick := time.NewTicker(defaultFlushInterval * 10)
+func (c *cache) workerSelect(ctx context.Context) {
+	defer c.wg.Done()
+	tick := time.NewTicker(defaultFlushInterval)
 	for {
 		select {
 		case <-tick.C:
-			c.modeMtx.RLock()
-			if c.readOnly() || c.noMetabase() {
-				c.modeMtx.RUnlock()
-				break
+			var prm common.IteratePrm
+			prm.IgnoreErrors = true
+			prm.Handler = func(ie common.IterationElement) error {
+				c.modeMtx.RLock()
+				defer c.modeMtx.RUnlock()
+				if c.readOnly() {
+					return ErrReadOnly
+				}
+				if c.noMetabase() {
+					return ErrDegraded
+				}
+				select {
+				case <-ctx.Done():
+					return ctx.Err()
+				case c.flushCh <- objectInfo{
+					data:    ie.ObjectData,
+					address: ie.Address,
+				}:
+					return nil
+				}
 			}
-
-			_ = c.flushFSTree(ctx, true)
-
-			c.modeMtx.RUnlock()
+			_, _ = c.fsTree.Iterate(ctx, prm)
 		case <-ctx.Done():
 			return
 		}
 	}
 }

+func (c *cache) workerFlush(ctx context.Context) {
+	defer c.wg.Done()
+
+	var objInfo objectInfo
+	for {
+		select {
+		case objInfo = <-c.flushCh:
+		case <-ctx.Done():
+			return
+		}
+
+		var obj objectSDK.Object
+		err := obj.Unmarshal(objInfo.data)
+		if err != nil {
+			c.reportFlushError(logs.FSTreeCantUnmarshalObject, objInfo.address.EncodeToString(), metaerr.Wrap(err))
+			continue
+		}
+
+		err = c.flushObject(ctx, objInfo.address, &obj, objInfo.data)
+		if err != nil {
+			// Error is handled in flushObject.
+			continue
+		}
+
+		c.deleteFromDisk(ctx, objInfo.address)
+	}
+}
+
 func (c *cache) reportFlushError(msg string, addr string, err error) {
 	if c.reportError != nil {
 		c.reportError(msg, err)
@@ -195,7 +130,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
 			return err
 		}

-		err = c.flushObject(ctx, &obj, e.ObjectData, StorageTypeFSTree)
+		err = c.flushObject(ctx, e.Address, &obj, e.ObjectData)
 		if err != nil {
 			if ignoreErrors {
 				return nil
@@ -211,39 +146,26 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
 	return err
 }

-// workerFlushSmall writes small objects to the main storage.
-func (c *cache) workerFlushSmall(ctx context.Context) {
-	defer c.wg.Done()
-
-	var objInfo objectInfo
-	for {
-		// Give priority to direct put.
-		select {
-		case objInfo = <-c.flushCh:
-		case <-ctx.Done():
-			return
-		}
-
-		err := c.flushObject(ctx, objInfo.obj, objInfo.data, StorageTypeDB)
-		if err != nil {
-			// Error is handled in flushObject.
-			continue
-		}
-
-		c.deleteFromDB(objInfo.addr, true)
-	}
-}
-
 // flushObject is used to write object directly to the main storage.
-func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []byte, st StorageType) error {
-	var err error
+func (c *cache) flushObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object, data []byte) error {
+	c.flushingGuard.Lock(addr)
+	defer c.flushingGuard.Unlock(addr)
+
+	stPrm := meta.StorageIDPrm{Address: addr}
+	stRes, err := c.metabase.StorageID(ctx, stPrm)
+	if err != nil {
+		c.reportFlushError(logs.FSTreeCantGetID, addr.EncodeToString(), err)
+		return err
+	}
+	if stRes.StorageID != nil {
+		// already flushed
+		return nil
+	}

 	defer func() {
-		c.metrics.Flush(err == nil, st)
+		c.metrics.Flush(err == nil, StorageTypeFSTree)
 	}()

-	addr := objectCore.AddressOf(obj)
-
 	var prm common.PutPrm
 	prm.Object = obj
 	prm.RawData = data
@@ -258,9 +180,7 @@ func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []b
 		return err
 	}

-	var updPrm meta.UpdateStorageIDPrm
-	updPrm.SetAddress(addr)
-	updPrm.SetStorageID(res.StorageID)
+	updPrm := meta.UpdateStorageIDPrm{Address: addr, StorageID: res.StorageID}

 	_, err = c.metabase.UpdateStorageID(ctx, updPrm)
 	if err != nil {
@@ -300,74 +220,10 @@ func (c *cache) Flush(ctx context.Context, ignoreErrors, seal bool) error {
 }

 func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
-	if err := c.flushFSTree(ctx, ignoreErrors); err != nil {
-		return err
-	}
-
-	var last string
-	for {
-		batch, err := c.readNextDBBatch(ignoreErrors, last)
-		if err != nil {
-			return err
-		}
-		if len(batch) == 0 {
-			break
-		}
-		for _, item := range batch {
-			var obj objectSDK.Object
-			if err := obj.Unmarshal(item.data); err != nil {
-				c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, item.address, metaerr.Wrap(err))
-				if ignoreErrors {
-					continue
-				}
-				return err
-			}
-
-			if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil {
-				return err
-			}
-			c.deleteFromDB(item.address, false)
-		}
-		last = batch[len(batch)-1].address
-	}
-	return nil
+	return c.flushFSTree(ctx, ignoreErrors)
 }

-type batchItem struct {
-	data    []byte
-	address string
-}
-
-func (c *cache) readNextDBBatch(ignoreErrors bool, last string) ([]batchItem, error) {
-	const batchSize = 100
-	var batch []batchItem
-	err := c.db.View(func(tx *bbolt.Tx) error {
-		var addr oid.Address
-
-		b := tx.Bucket(defaultBucket)
-		cs := b.Cursor()
-		for k, data := cs.Seek([]byte(last)); k != nil; k, data = cs.Next() {
-			sa := string(k)
-			if sa == last {
-				continue
-			}
-
-			if err := addr.DecodeString(sa); err != nil {
-				c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err))
-				if ignoreErrors {
-					continue
-				}
-				return err
-			}
-
-			batch = append(batch, batchItem{data: bytes.Clone(data), address: sa})
-			if len(batch) == batchSize {
-				return errIterationCompleted
-			}
-		}
-		return nil
-	})
-	if err == nil || errors.Is(err, errIterationCompleted) {
-		return batch, nil
-	}
-	return nil, err
-}
+type objectInfo struct {
+	data    []byte
+	address oid.Address
+}

View file

@ -19,7 +19,6 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -31,7 +30,6 @@ func TestFlush(t *testing.T) {
append([]Option{ append([]Option{
WithLogger(testlogger), WithLogger(testlogger),
WithPath(filepath.Join(t.TempDir(), "writecache")), WithPath(filepath.Join(t.TempDir(), "writecache")),
WithSmallObjectSize(smallSize),
WithMetabase(mb), WithMetabase(mb),
WithBlobstor(bs), WithBlobstor(bs),
WithDisableBackgroundFlush(), WithDisableBackgroundFlush(),
@ -47,31 +45,6 @@ func TestFlush(t *testing.T) {
} }
failures := []TestFailureInjector[Option]{ failures := []TestFailureInjector[Option]{
{
Desc: "db, invalid address",
InjectFn: func(t *testing.T, wc Cache) {
c := wc.(*cache)
obj := testutil.GenerateObject()
data, err := obj.Marshal()
require.NoError(t, err)
require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
return b.Put([]byte{1, 2, 3}, data)
}))
},
},
{
Desc: "db, invalid object",
InjectFn: func(t *testing.T, wc Cache) {
c := wc.(*cache)
require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
k := []byte(oidtest.Address().EncodeToString())
v := []byte{1, 2, 3}
return b.Put(k, v)
}))
},
},
{ {
Desc: "fs, read error", Desc: "fs, read error",
InjectFn: func(t *testing.T, wc Cache) { InjectFn: func(t *testing.T, wc Cache) {
@@ -253,15 +226,13 @@ func putObjects(t *testing.T, c Cache) []objectPair {
func check(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPair) { func check(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPair) {
for i := range objects { for i := range objects {
var mPrm meta.StorageIDPrm mPrm := meta.StorageIDPrm{Address: objects[i].addr}
mPrm.SetAddress(objects[i].addr)
mRes, err := mb.StorageID(context.Background(), mPrm) mRes, err := mb.StorageID(context.Background(), mPrm)
require.NoError(t, err) require.NoError(t, err)
var prm common.GetPrm var prm common.GetPrm
prm.Address = objects[i].addr prm.Address = objects[i].addr
prm.StorageID = mRes.StorageID() prm.StorageID = mRes.StorageID
res, err := bs.Get(context.Background(), prm) res, err := bs.Get(context.Background(), prm)
require.NoError(t, err) require.NoError(t, err)
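As a side effect of the metabase changes, StorageIDPrm and StorageIDRes move from setter and getter methods to exported fields. A before/after sketch, with names taken from this hunk (addr and mb stand in for an address and a *meta.DB):

// before: opaque parameter and result types
var mPrm meta.StorageIDPrm
mPrm.SetAddress(addr)
mRes, err := mb.StorageID(ctx, mPrm)
id := mRes.StorageID()

// after: plain struct literals and field access
mRes, err = mb.StorageID(ctx, meta.StorageIDPrm{Address: addr})
id = mRes.StorageID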


@@ -1,7 +1,6 @@
package writecache package writecache
import ( import (
"bytes"
"context" "context"
"time" "time"
@@ -12,7 +11,6 @@ import (
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
) )
@@ -37,11 +35,11 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
return nil, ErrDegraded return nil, ErrDegraded
} }
obj, err := c.getInternal(ctx, saddr, addr) obj, err := c.getInternal(ctx, addr)
return obj, metaerr.Wrap(err) return obj, metaerr.Wrap(err)
} }
func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) (*objectSDK.Object, error) { func (c *cache) getInternal(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
found := false found := false
storageType := StorageTypeUndefined storageType := StorageTypeUndefined
startedAt := time.Now() startedAt := time.Now()
@@ -49,14 +47,6 @@ func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address)
c.metrics.Get(time.Since(startedAt), found, storageType) c.metrics.Get(time.Since(startedAt), found, storageType)
}() }()
value, err := Get(c.db, []byte(saddr))
if err == nil {
obj := objectSDK.New()
found = true
storageType = StorageTypeDB
return obj, obj.Unmarshal(value)
}
res, err := c.fsTree.Get(ctx, common.GetPrm{Address: addr}) res, err := c.fsTree.Get(ctx, common.GetPrm{Address: addr})
if err != nil { if err != nil {
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
@@ -87,34 +77,10 @@ func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object,
return nil, ErrDegraded return nil, ErrDegraded
} }
obj, err := c.getInternal(ctx, saddr, addr) obj, err := c.getInternal(ctx, addr)
if err != nil { if err != nil {
return nil, metaerr.Wrap(err) return nil, metaerr.Wrap(err)
} }
return obj.CutPayload(), nil return obj.CutPayload(), nil
} }
// Get fetches object from the underlying database.
// Key should be a stringified address.
//
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in db.
func Get(db *bbolt.DB, key []byte) ([]byte, error) {
if db == nil {
return nil, ErrNotInitialized
}
var value []byte
err := db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
if b == nil {
return ErrNoDefaultBucket
}
value = b.Get(key)
if value == nil {
return logicerr.Wrap(new(apistatus.ObjectNotFound))
}
value = bytes.Clone(value)
return nil
})
return value, metaerr.Wrap(err)
}
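The read path is now a single branch: getInternal asks FSTree directly and the exported bbolt-backed Get helper above is deleted outright. The hunk truncates getInternal after its error branch; a plausible reconstruction of the whole function, assuming FSTree's common.GetRes carries the decoded object:

func (c *cache) getInternal(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
	found := false
	storageType := StorageTypeUndefined
	startedAt := time.Now()
	defer func() {
		c.metrics.Get(time.Since(startedAt), found, storageType)
	}()

	res, err := c.fsTree.Get(ctx, common.GetPrm{Address: addr})
	if err != nil {
		return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
	}

	found = true
	storageType = StorageTypeFSTree
	return res.Object, nil // assumption: GetRes exposes the unmarshaled object
}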


@@ -1,39 +1,28 @@
package writecache package writecache
import ( import (
"errors" "context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
) )
// ErrNoDefaultBucket is returned by IterateDB when default bucket for objects is missing. func (c *cache) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) {
var ErrNoDefaultBucket = errors.New("no default bucket") ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Iterate",
trace.WithAttributes(
attribute.Bool("ignore_errors", prm.IgnoreErrors),
))
defer span.End()
// IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return. if !c.modeMtx.TryRLock() {
// It is assumed that db is an underlying database of some WriteCache instance. return common.IterateRes{}, ErrNotInitialized
// }
// Returns ErrNoDefaultBucket if there is no default bucket in db. defer c.modeMtx.RUnlock()
// if c.mode.NoMetabase() {
// DB must not be nil and should be opened. return common.IterateRes{}, ErrDegraded
func IterateDB(db *bbolt.DB, f func(oid.Address) error) error { }
return metaerr.Wrap(db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
if b == nil {
return ErrNoDefaultBucket
}
var addr oid.Address return c.fsTree.Iterate(ctx, prm)
return b.ForEach(func(k, _ []byte) error {
err := addr.DecodeString(string(k))
if err != nil {
return fmt.Errorf("could not parse object address: %w", err)
}
return f(addr)
})
}))
} }
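Callers that previously walked the cache with the package-level IterateDB helper now go through the Cache interface instead. A hedged usage sketch; the Handler field and IterationElement shape are assumptions carried over from blobstor's common package, not spelled out in this diff:

_, err := wc.Iterate(ctx, common.IteratePrm{
	IgnoreErrors: true, // mirrored into the span attribute above
	Handler: func(el common.IterationElement) error {
		fmt.Println(el.Address.EncodeToString()) // assumption: the element carries the address
		return nil
	},
})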


@@ -14,7 +14,6 @@ func (t StorageType) String() string {
const ( const (
StorageTypeUndefined StorageType = "null" StorageTypeUndefined StorageType = "null"
StorageTypeDB StorageType = "db"
StorageTypeFSTree StorageType = "fstree" StorageTypeFSTree StorageType = "fstree"
) )
@@ -26,9 +25,9 @@ type Metrics interface {
Flush(success bool, st StorageType) Flush(success bool, st StorageType)
Evict(st StorageType) Evict(st StorageType)
SetEstimateSize(db, fstree uint64) SetEstimateSize(uint64)
SetMode(m mode.ComponentMode) SetMode(m mode.ComponentMode)
SetActualCounters(db, fstree uint64) SetActualCounters(uint64)
SetPath(path string) SetPath(path string)
Close() Close()
} }
@@ -47,11 +46,11 @@ func (metricsStub) Delete(time.Duration, bool, StorageType) {}
func (metricsStub) Put(time.Duration, bool, StorageType) {} func (metricsStub) Put(time.Duration, bool, StorageType) {}
func (metricsStub) SetEstimateSize(uint64, uint64) {} func (metricsStub) SetEstimateSize(uint64) {}
func (metricsStub) SetMode(mode.ComponentMode) {} func (metricsStub) SetMode(mode.ComponentMode) {}
func (metricsStub) SetActualCounters(uint64, uint64) {} func (metricsStub) SetActualCounters(uint64) {}
func (metricsStub) Flush(bool, StorageType) {} func (metricsStub) Flush(bool, StorageType) {}
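With a single backing store there is nothing left to split, so the size and count gauges shrink to one value each. A sketch of the two reshaped methods on a custom Metrics implementation; the remaining methods keep their old signatures and are omitted here:

// gaugeMetrics is illustrative, not part of the PR.
type gaugeMetrics struct {
	estimatedSize atomic.Uint64 // fed by SetEstimateSize: total cached size estimate
	actualCount   atomic.Uint64 // fed by SetActualCounters: cached object count
}

func (g *gaugeMetrics) SetEstimateSize(sz uint64)    { g.estimatedSize.Store(sz) }
func (g *gaugeMetrics) SetActualCounters(cnt uint64) { g.actualCount.Store(cnt) }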


@@ -2,10 +2,7 @@ package writecache
import ( import (
"context" "context"
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
@@ -25,7 +22,7 @@ func (c *cache) SetMode(m mode.Mode) error {
c.modeMtx.Lock() c.modeMtx.Lock()
defer c.modeMtx.Unlock() defer c.modeMtx.Unlock()
err := c.setMode(ctx, m, true) err := c.setMode(ctx, m, !m.NoMetabase())
if err == nil { if err == nil {
c.metrics.SetMode(mode.ConvertToComponentModeDegraded(m)) c.metrics.SetMode(mode.ConvertToComponentModeDegraded(m))
} }
@@ -44,20 +41,6 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode, ignoreErrors bool) err
} }
} }
if c.db != nil {
if err = c.db.Close(); err != nil {
return fmt.Errorf("can't close write-cache database: %w", err)
}
}
// Suspend producers to ensure there are no in-flight channel send operations.
// flushCh is populated by `flush` with `modeMtx` taken, thus waiting until it is empty
// guarantees that there are no in-flight operations.
for len(c.flushCh) != 0 {
c.log.Info(logs.WritecacheWaitingForChannelsToFlush)
time.Sleep(time.Second)
}
if turnOffMeta { if turnOffMeta {
c.mode = m c.mode = m
return nil return nil


@@ -1,30 +0,0 @@
package writecache
import (
"context"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
"github.com/stretchr/testify/require"
)
func TestMode(t *testing.T) {
t.Parallel()
wc := New(
WithLogger(test.NewLogger(t)),
WithFlushWorkersCount(2),
WithPath(t.TempDir()))
require.NoError(t, wc.Open(context.Background(), mode.DegradedReadOnly))
require.Nil(t, wc.(*cache).db)
require.NoError(t, wc.Init())
require.Nil(t, wc.(*cache).db)
require.NoError(t, wc.Close())
require.NoError(t, wc.Open(context.Background(), mode.Degraded))
require.Nil(t, wc.(*cache).db)
require.NoError(t, wc.Init())
require.Nil(t, wc.(*cache).db)
require.NoError(t, wc.Close())
}


@@ -3,8 +3,8 @@ package writecache
import ( import (
"io/fs" "io/fs"
"os" "os"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap" "go.uber.org/zap"
) )
@@ -22,19 +22,15 @@ type options struct {
metabase Metabase metabase Metabase
// maxObjectSize is the maximum size of the object stored in the write-cache. // maxObjectSize is the maximum size of the object stored in the write-cache.
maxObjectSize uint64 maxObjectSize uint64
// smallObjectSize is the maximum size of the object stored in the database.
smallObjectSize uint64
// workersCount is the number of workers flushing objects in parallel. // workersCount is the number of workers flushing objects in parallel.
workersCount int workersCount int
// maxCacheSize is the maximum total size of all objects saved in cache (DB + FS). // maxCacheSize is the maximum total size of all objects saved in cache.
// 1 GiB by default. // 1 GiB by default.
maxCacheSize uint64 maxCacheSize uint64
// objCounters contains atomic counters for the number of objects stored in cache. // maxCacheCount is the maximum total count of all objects saved in cache.
objCounters counters maxCacheCount uint64
// maxBatchSize is the maximum batch size for the small object database. // counter tracks the number and total size of objects stored in cache.
maxBatchSize int counter *fstree.SimpleCounter
// maxBatchDelay is the maximum batch wait time for the small object database.
maxBatchDelay time.Duration
// noSync is true iff FSTree allows unsynchronized writes. // noSync is true iff FSTree allows unsynchronized writes.
noSync bool noSync bool
// reportError is the function called when encountering disk errors in background workers. // reportError is the function called when encountering disk errors in background workers.
@@ -84,15 +80,6 @@ func WithMaxObjectSize(sz uint64) Option {
} }
} }
// WithSmallObjectSize sets maximum object size to be stored in write-cache.
func WithSmallObjectSize(sz uint64) Option {
return func(o *options) {
if sz > 0 {
o.smallObjectSize = sz
}
}
}
func WithFlushWorkersCount(c int) Option { func WithFlushWorkersCount(c int) Option {
return func(o *options) { return func(o *options) {
if c > 0 { if c > 0 {
@@ -108,21 +95,10 @@ func WithMaxCacheSize(sz uint64) Option {
} }
} }
// WithMaxBatchSize sets max batch size for the small object database. // WithMaxCacheCount sets the maximum number of objects stored in the write-cache.
func WithMaxBatchSize(sz int) Option { func WithMaxCacheCount(cnt uint64) Option {
return func(o *options) { return func(o *options) {
if sz > 0 { o.maxCacheCount = cnt
o.maxBatchSize = sz
}
}
}
// WithMaxBatchDelay sets max batch delay for the small object database.
func WithMaxBatchDelay(d time.Duration) Option {
return func(o *options) {
if d > 0 {
o.maxBatchDelay = d
}
} }
} }
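All batching knobs die with the database, and WithMaxCacheCount joins WithMaxCacheSize as a second admission limit. A construction sketch using only options visible in this diff; the path and limit values are illustrative:

wc := writecache.New(
	writecache.WithPath("/srv/frostfs/shard0/writecache"), // illustrative path
	writecache.WithMaxObjectSize(64<<20),                  // reject objects above 64 MiB
	writecache.WithMaxCacheSize(1<<30),                    // total size cap (1 GiB is the default)
	writecache.WithMaxCacheCount(100_000),                 // new count cap; 0 leaves it unlimited
)

Note that WithMaxCacheCount accepts zero, unlike the other options here, which ignore non-positive values; zero simply disables the count check.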


@@ -8,7 +8,6 @@ import (
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
) )
@@ -50,65 +49,22 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
return common.PutRes{}, ErrBigObject return common.PutRes{}, ErrBigObject
} }
oi := objectInfo{
addr: prm.Address.EncodeToString(),
obj: prm.Object,
data: prm.RawData,
}
if sz <= c.smallObjectSize {
storageType = StorageTypeDB
err := c.putSmall(oi)
if err == nil {
added = true
}
return common.PutRes{}, err
}
storageType = StorageTypeFSTree storageType = StorageTypeFSTree
err := c.putBig(ctx, oi.addr, prm) err := c.putBig(ctx, prm)
if err == nil { if err == nil {
added = true added = true
} }
return common.PutRes{}, metaerr.Wrap(err) return common.PutRes{}, metaerr.Wrap(err)
} }
// putSmall persists small objects to the write-cache database and
// pushes them to the flush workers queue.
func (c *cache) putSmall(obj objectInfo) error {
cacheSize := c.estimateCacheSize()
if c.maxCacheSize < c.incSizeDB(cacheSize) {
return ErrOutOfSpace
}
var newRecord bool
err := c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
key := []byte(obj.addr)
newRecord = b.Get(key) == nil
if newRecord {
return b.Put(key, obj.data)
}
return nil
})
if err == nil {
storagelog.Write(c.log,
storagelog.AddressField(obj.addr),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db PUT"),
)
if newRecord {
c.objCounters.cDB.Add(1)
c.estimateCacheSize()
}
}
return err
}
// putBig writes object to FSTree and pushes it to the flush workers queue. // putBig writes object to FSTree and pushes it to the flush workers queue.
func (c *cache) putBig(ctx context.Context, addr string, prm common.PutPrm) error { func (c *cache) putBig(ctx context.Context, prm common.PutPrm) error {
cacheSz := c.estimateCacheSize() addr := prm.Address.EncodeToString()
if c.maxCacheSize < c.incSizeFS(cacheSz) { estimatedObjSize := uint64(len(prm.RawData))
if estimatedObjSize == 0 {
estimatedObjSize = prm.Object.PayloadSize()
}
if !c.hasFreeSpace(estimatedObjSize) {
return ErrOutOfSpace return ErrOutOfSpace
} }
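Admission now happens before anything touches the disk: the object's size is estimated from RawData when the caller supplies marshaled bytes, falling back to PayloadSize() otherwise. A caller-side sketch with PutPrm field names as used in this hunk:

data, err := obj.Marshal()
if err != nil {
	return err
}
_, err = wc.Put(ctx, common.PutPrm{
	Address: addr,
	Object:  obj,
	RawData: data, // when empty, putBig falls back to obj.PayloadSize()
})
if errors.Is(err, writecache.ErrOutOfSpace) {
	// the cache is full by size or count; write to the main storage instead
}

The two estimates are not interchangeable: len(RawData) measures the whole marshaled object while PayloadSize() counts only the payload, so the check is a heuristic rather than exact accounting.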


@@ -1,77 +1,20 @@
package writecache package writecache
import ( func (c *cache) estimateCacheSize() {
"fmt" count, size := c.counter.Value()
"math" var ucount, usize uint64
"sync/atomic" if count > 0 {
ucount = uint64(count)
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"go.etcd.io/bbolt"
)
func (c *cache) estimateCacheSize() uint64 {
dbCount := c.objCounters.DB()
fsCount := c.objCounters.FS()
if fsCount > 0 {
fsCount-- // db file
} }
dbSize := dbCount * c.smallObjectSize if size > 0 {
fsSize := fsCount * c.maxObjectSize usize = uint64(size)
c.metrics.SetEstimateSize(dbSize, fsSize)
c.metrics.SetActualCounters(dbCount, fsCount)
return dbSize + fsSize
}
func (c *cache) incSizeDB(sz uint64) uint64 {
return sz + c.smallObjectSize
}
func (c *cache) incSizeFS(sz uint64) uint64 {
return sz + c.maxObjectSize
}
var _ fstree.FileCounter = &counters{}
type counters struct {
cDB, cFS atomic.Uint64
}
func (x *counters) DB() uint64 {
return x.cDB.Load()
}
func (x *counters) FS() uint64 {
return x.cFS.Load()
}
// Set implements fstree.ObjectCounter.
func (x *counters) Set(v uint64) {
x.cFS.Store(v)
}
// Inc implements fstree.ObjectCounter.
func (x *counters) Inc() {
x.cFS.Add(1)
}
// Dec implements fstree.ObjectCounter.
func (x *counters) Dec() {
x.cFS.Add(math.MaxUint64)
}
func (c *cache) initCounters() error {
var inDB uint64
err := c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
if b != nil {
inDB = uint64(b.Stats().KeyN)
}
return nil
})
if err != nil {
return fmt.Errorf("could not read write-cache DB counter: %w", err)
} }
c.objCounters.cDB.Store(inDB) c.metrics.SetEstimateSize(usize)
c.estimateCacheSize() c.metrics.SetActualCounters(ucount)
return nil }
func (c *cache) hasFreeSpace(sz uint64) bool {
count, size := c.counter.Value()
return (size+int64(sz) <= int64(c.maxCacheSize)) &&
(c.maxCacheCount == 0 || count+1 <= int64(c.maxCacheCount))
} }
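Instead of multiplying object counts by the size ceilings, the cache now reads one shared count-and-size pair that FSTree maintains as files come and go. A standalone sketch of that accounting, mirroring the Value() (count, size) shape used above; the mutator names are assumptions:

package main

import (
	"fmt"
	"sync/atomic"
)

// simpleCounter mirrors the count+size pair kept by fstree's file counter.
type simpleCounter struct {
	count, size atomic.Int64
}

func (c *simpleCounter) Inc(sz int64) { c.count.Add(1); c.size.Add(sz) }
func (c *simpleCounter) Dec(sz int64) { c.count.Add(-1); c.size.Add(-sz) }
func (c *simpleCounter) Value() (count, size int64) {
	return c.count.Load(), c.size.Load()
}

// hasFreeSpace reproduces the admission rule from the hunk above:
// the object must fit under the size cap, and under the count cap
// unless the count cap is zero (disabled).
func hasFreeSpace(c *simpleCounter, sz, maxSize, maxCount uint64) bool {
	count, size := c.Value()
	return size+int64(sz) <= int64(maxSize) &&
		(maxCount == 0 || count+1 <= int64(maxCount))
}

func main() {
	var c simpleCounter
	c.Inc(512)
	fmt.Println(hasFreeSpace(&c, 1024, 2048, 2)) // true: 1536 <= 2048 and 2 <= 2
	c.Inc(1024)
	fmt.Println(hasFreeSpace(&c, 1024, 2048, 0)) // false: 2560 > 2048
}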


@@ -3,7 +3,6 @@ package writecache
import ( import (
"context" "context"
"fmt" "fmt"
"math"
"os" "os"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@@ -14,49 +13,22 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.uber.org/zap" "go.uber.org/zap"
) )
// store represents persistent storage with in-memory LRU cache
// for flushed items on top of it.
type store struct {
db *bbolt.DB
}
const dbName = "small.bolt"
func (c *cache) openStore(mod mode.ComponentMode) error { func (c *cache) openStore(mod mode.ComponentMode) error {
err := util.MkdirAllX(c.path, os.ModePerm) err := util.MkdirAllX(c.path, os.ModePerm)
if err != nil { if err != nil {
return err return err
} }
c.db, err = OpenDB(c.path, mod.ReadOnly(), c.openFile)
if err != nil {
return fmt.Errorf("could not open database: %w", err)
}
c.db.MaxBatchSize = c.maxBatchSize
c.db.MaxBatchDelay = c.maxBatchDelay
if !mod.ReadOnly() {
err = c.db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(defaultBucket)
return err
})
if err != nil {
return fmt.Errorf("could not create default bucket: %w", err)
}
}
c.fsTree = fstree.New( c.fsTree = fstree.New(
fstree.WithPath(c.path), fstree.WithPath(c.path),
fstree.WithPerm(os.ModePerm), fstree.WithPerm(os.ModePerm),
fstree.WithDepth(1), fstree.WithDepth(1),
fstree.WithDirNameLen(1), fstree.WithDirNameLen(1),
fstree.WithNoSync(c.noSync), fstree.WithNoSync(c.noSync),
fstree.WithFileCounter(&c.objCounters), fstree.WithFileCounter(c.counter),
) )
if err := c.fsTree.Open(mod); err != nil { if err := c.fsTree.Open(mod); err != nil {
return fmt.Errorf("could not open FSTree: %w", err) return fmt.Errorf("could not open FSTree: %w", err)
@@ -68,41 +40,6 @@ func (c *cache) openStore(mod mode.ComponentMode) error {
return nil return nil
} }
func (c *cache) deleteFromDB(key string, batched bool) {
var recordDeleted bool
var err error
if batched {
err = c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
key := []byte(key)
recordDeleted = b.Get(key) != nil
return b.Delete(key)
})
} else {
err = c.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
key := []byte(key)
recordDeleted = b.Get(key) != nil
return b.Delete(key)
})
}
if err == nil {
c.metrics.Evict(StorageTypeDB)
storagelog.Write(c.log,
storagelog.AddressField(key),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db DELETE"),
)
if recordDeleted {
c.objCounters.cDB.Add(math.MaxUint64)
c.estimateCacheSize()
}
} else {
c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err))
}
}
func (c *cache) deleteFromDisk(ctx context.Context, addr oid.Address) { func (c *cache) deleteFromDisk(ctx context.Context, addr oid.Address) {
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr}) _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
if err != nil && !client.IsErrObjectNotFound(err) { if err != nil && !client.IsErrObjectNotFound(err) {
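The openStore hunk above now hands FSTree the shared counter, so FSTree itself keeps the count and size current on every put and delete. A wiring sketch with only fstree options that appear in this file; NewSimpleCounter is an assumption about the constructor's name:

counter := fstree.NewSimpleCounter() // assumption: exported constructor in the fstree package
tree := fstree.New(
	fstree.WithPath(cachePath),
	fstree.WithPerm(os.ModePerm),
	fstree.WithDepth(1),      // shallow layout: one directory level...
	fstree.WithDirNameLen(1), // ...with single-character directory names
	fstree.WithNoSync(false),
	fstree.WithFileCounter(counter), // FSTree updates count+size as files change
)
if err := tree.Open(mod); err != nil {
	return fmt.Errorf("could not open FSTree: %w", err)
}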


@@ -1,20 +0,0 @@
package writecache
import (
"io/fs"
"os"
"path/filepath"
"time"
"go.etcd.io/bbolt"
)
// OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true.
func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error)) (*bbolt.DB, error) {
return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{
NoFreelistSync: true,
ReadOnly: ro,
Timeout: 100 * time.Millisecond,
OpenFile: openFile,
})
}


@@ -37,6 +37,7 @@ type Cache interface {
DumpInfo() Info DumpInfo() Info
Flush(context.Context, bool, bool) error Flush(context.Context, bool, bool) error
Seal(context.Context, bool) error Seal(context.Context, bool) error
Iterate(context.Context, common.IteratePrm) (common.IterateRes, error)
Init() error Init() error
Open(ctx context.Context, mode mode.Mode) error Open(ctx context.Context, mode mode.Mode) error
@@ -54,6 +55,7 @@ type MainStorage interface {
// Metabase is the interface of the metabase used by Cache implementations. // Metabase is the interface of the metabase used by Cache implementations.
type Metabase interface { type Metabase interface {
UpdateStorageID(context.Context, meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error) UpdateStorageID(context.Context, meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error)
StorageID(context.Context, meta.StorageIDPrm) (meta.StorageIDRes, error)
} }
var ( var (
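The Metabase dependency gains a read method alongside UpdateStorageID, which keeps it easy to fake in tests. A test-double sketch; only StorageIDPrm.Address and StorageIDRes.StorageID are taken from this diff, the rest is illustrative:

type fakeMetabase struct {
	ids map[oid.Address][]byte
}

func (f *fakeMetabase) UpdateStorageID(_ context.Context, _ meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error) {
	return meta.UpdateStorageIDRes{}, nil // UpdateStorageIDPrm fields are not shown in this diff
}

func (f *fakeMetabase) StorageID(_ context.Context, prm meta.StorageIDPrm) (meta.StorageIDRes, error) {
	return meta.StorageIDRes{StorageID: f.ids[prm.Address]}, nil
}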