Compare commits

...

4 commits

Author SHA1 Message Date
81df490709 [#1273] writecache: Flush writecache concurrently
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-07-24 14:17:52 +03:00
e61f9ac796 [#1273] writecache: Add count limit
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-07-24 14:17:47 +03:00
a7536afbf5 [#1273] writecache: Count real data size
The size of the data on disk is now used to determine the writecache
size, rather than the number of objects multiplied by the maximum allowed
object size.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-07-24 14:17:43 +03:00
c180795f9b [#1273] writecache: Make writecache as FSTree only
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-07-24 14:17:31 +03:00
39 changed files with 367 additions and 870 deletions

View file

@ -33,13 +33,11 @@ func inspectFunc(cmd *cobra.Command, _ []string) {
db := openMeta(cmd)
defer db.Close()
storageID := meta.StorageIDPrm{}
storageID.SetAddress(addr)
storageID := meta.StorageIDPrm{Address: addr}
resStorageID, err := db.StorageID(cmd.Context(), storageID)
common.ExitOnErr(cmd, common.Errf("could not check if the obj is small: %w", err))
if id := resStorageID.StorageID(); id != nil {
if id := resStorageID.StorageID; id != nil {
cmd.Printf("Object storageID: %s\n\n", blobovniczatree.NewIDFromBytes(id).Path())
} else {
cmd.Printf("Object does not contain storageID\n\n")

View file

@ -1,11 +1,10 @@
package writecache
import (
"os"
common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/spf13/cobra"
)
@ -23,18 +22,20 @@ func init() {
}
func inspectFunc(cmd *cobra.Command, _ []string) {
var data []byte
wc := writecache.New(
writecache.WithPath(vPath),
)
common.ExitOnErr(cmd, common.Errf("could not open write-cache: %w", wc.Open(cmd.Context(), mode.ReadOnly)))
defer wc.Close()
db, err := writecache.OpenDB(vPath, true, os.OpenFile)
common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err))
defer db.Close()
var addr oid.Address
common.ExitOnErr(cmd, common.Errf("could not decode address: %w", addr.DecodeString(vAddress)))
data, err = writecache.Get(db, []byte(vAddress))
obj, err := wc.Get(cmd.Context(), addr)
common.ExitOnErr(cmd, common.Errf("could not fetch object: %w", err))
var o objectSDK.Object
common.ExitOnErr(cmd, common.Errf("could not unmarshal object: %w", o.Unmarshal(data)))
common.PrintObjectHeader(cmd, o)
common.PrintObjectHeader(cmd, *obj)
data, err := obj.Marshal()
common.ExitOnErr(cmd, common.Errf("could not marshal object: %w", err))
common.WriteObjectToFile(cmd, vOut, data)
}
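With the BoltDB part of the write-cache removed, `frostfs-lens` no longer opens `small.bolt` via `writecache.OpenDB`; it opens the write-cache component itself in read-only mode and reads objects through its `Get` API. The new-side flow, reassembled as a sketch:

```go
// New inspectFunc flow (new-side lines only; imports as in the file above).
wc := writecache.New(
	writecache.WithPath(vPath),
)
common.ExitOnErr(cmd, common.Errf("could not open write-cache: %w", wc.Open(cmd.Context(), mode.ReadOnly)))
defer wc.Close()

var addr oid.Address
common.ExitOnErr(cmd, common.Errf("could not decode address: %w", addr.DecodeString(vAddress)))

// The object comes back decoded, so it is re-marshaled for file output.
obj, err := wc.Get(cmd.Context(), addr)
common.ExitOnErr(cmd, common.Errf("could not fetch object: %w", err))

common.PrintObjectHeader(cmd, *obj)
data, err := obj.Marshal()
common.ExitOnErr(cmd, common.Errf("could not marshal object: %w", err))
common.WriteObjectToFile(cmd, vOut, data)
```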

View file

@ -3,11 +3,11 @@ package writecache
import (
"fmt"
"io"
"os"
common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal"
blobstor "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/spf13/cobra"
)
@ -26,15 +26,18 @@ func listFunc(cmd *cobra.Command, _ []string) {
// other targets can be supported
w := cmd.OutOrStderr()
wAddr := func(addr oid.Address) error {
_, err := io.WriteString(w, fmt.Sprintf("%s\n", addr))
wc := writecache.New(
writecache.WithPath(vPath),
)
common.ExitOnErr(cmd, common.Errf("could not open write-cache: %w", wc.Open(cmd.Context(), mode.ReadOnly)))
defer wc.Close()
var prm blobstor.IteratePrm
prm.IgnoreErrors = true
prm.Handler = func(ie blobstor.IterationElement) error {
_, err := io.WriteString(w, fmt.Sprintf("%s\n", ie.Address))
return err
}
db, err := writecache.OpenDB(vPath, true, os.OpenFile)
common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err))
defer db.Close()
err = writecache.IterateDB(db, wAddr)
common.ExitOnErr(cmd, common.Errf("write-cache iterator failure: %w", err))
_, err := wc.Iterate(cmd.Context(), prm)
common.ExitOnErr(cmd, common.Errf("could not iterate write-cache: %w", err))
}
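Listing follows the same pattern: instead of `IterateDB` over a BoltDB cursor, the command opens the write-cache read-only and walks it with the blobstor-style iterator. A sketch of the new-side flow (`blobstor` is the import alias for `blobstor/common` shown above):

```go
// New listFunc flow (new-side lines only).
w := cmd.OutOrStderr()

wc := writecache.New(
	writecache.WithPath(vPath),
)
common.ExitOnErr(cmd, common.Errf("could not open write-cache: %w", wc.Open(cmd.Context(), mode.ReadOnly)))
defer wc.Close()

var prm blobstor.IteratePrm
prm.IgnoreErrors = true
prm.Handler = func(ie blobstor.IterationElement) error {
	_, err := io.WriteString(w, fmt.Sprintf("%s\n", ie.Address))
	return err
}

_, err := wc.Iterate(cmd.Context(), prm)
common.ExitOnErr(cmd, common.Errf("could not iterate write-cache: %w", err))
```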

View file

@ -146,12 +146,10 @@ type shardCfg struct {
writecacheCfg struct {
enabled bool
path string
maxBatchSize int
maxBatchDelay time.Duration
smallObjectSize uint64
maxObjSize uint64
flushWorkerCount int
sizeLimit uint64
countLimit uint64
noSync bool
}
@ -269,12 +267,10 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg,
wc.enabled = true
wc.path = writeCacheCfg.Path()
wc.maxBatchSize = writeCacheCfg.BoltDB().MaxBatchSize()
wc.maxBatchDelay = writeCacheCfg.BoltDB().MaxBatchDelay()
wc.maxObjSize = writeCacheCfg.MaxObjectSize()
wc.smallObjectSize = writeCacheCfg.SmallObjectSize()
wc.flushWorkerCount = writeCacheCfg.WorkerCount()
wc.sizeLimit = writeCacheCfg.SizeLimit()
wc.countLimit = writeCacheCfg.CountLimit()
wc.noSync = writeCacheCfg.NoSync()
}
}
@ -861,12 +857,10 @@ func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option {
if wcRead := shCfg.writecacheCfg; wcRead.enabled {
writeCacheOpts = append(writeCacheOpts,
writecache.WithPath(wcRead.path),
writecache.WithMaxBatchSize(wcRead.maxBatchSize),
writecache.WithMaxBatchDelay(wcRead.maxBatchDelay),
writecache.WithMaxObjectSize(wcRead.maxObjSize),
writecache.WithSmallObjectSize(wcRead.smallObjectSize),
writecache.WithFlushWorkersCount(wcRead.flushWorkerCount),
writecache.WithMaxCacheSize(wcRead.sizeLimit),
writecache.WithMaxCacheCount(wcRead.countLimit),
writecache.WithNoSync(wcRead.noSync),
writecache.WithLogger(c.log),
)
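With the BoltDB knobs (`max_batch_size`, `max_batch_delay`, `small_object_size`) gone, the per-shard option set shrinks to path, object-size limit, flush workers, the two capacity limits, sync mode, and logger. A sketch of constructing a cache with the surviving options (the path and values are illustrative, not taken from the diff):

```go
wc := writecache.New(
	writecache.WithPath("/srv/frostfs/shard0/cache"), // illustrative path
	writecache.WithMaxObjectSize(134217728),          // 128 MiB
	writecache.WithFlushWorkersCount(30),
	writecache.WithMaxCacheSize(3221225472),          // ~3 GiB
	writecache.WithMaxCacheCount(10000),              // 0 means "no count limit"
	writecache.WithNoSync(false),
)
```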

View file

@ -74,10 +74,10 @@ func TestEngineSection(t *testing.T) {
require.Equal(t, true, wc.NoSync())
require.Equal(t, "tmp/0/cache", wc.Path())
require.EqualValues(t, 16384, wc.SmallObjectSize())
require.EqualValues(t, 134217728, wc.MaxObjectSize())
require.EqualValues(t, 30, wc.WorkerCount())
require.EqualValues(t, 3221225472, wc.SizeLimit())
require.EqualValues(t, 0, wc.CountLimit())
require.Equal(t, "tmp/0/meta", meta.Path())
require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())
@ -129,10 +129,10 @@ func TestEngineSection(t *testing.T) {
require.Equal(t, false, wc.NoSync())
require.Equal(t, "tmp/1/cache", wc.Path())
require.EqualValues(t, 16384, wc.SmallObjectSize())
require.EqualValues(t, 134217728, wc.MaxObjectSize())
require.EqualValues(t, 30, wc.WorkerCount())
require.EqualValues(t, 4294967296, wc.SizeLimit())
require.EqualValues(t, 10000, wc.CountLimit())
require.Equal(t, "tmp/1/meta", meta.Path())
require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())

View file

@ -10,9 +10,6 @@ import (
type Config config.Config
const (
// SmallSizeDefault is a default size of small objects.
SmallSizeDefault = 32 << 10
// MaxSizeDefault is a default value of the object payload size limit.
MaxSizeDefault = 64 << 20
@ -51,22 +48,6 @@ func (x *Config) Path() string {
return p
}
// SmallObjectSize returns the value of "small_object_size" config parameter.
//
// Returns SmallSizeDefault if the value is not a positive number.
func (x *Config) SmallObjectSize() uint64 {
s := config.SizeInBytesSafe(
(*config.Config)(x),
"small_object_size",
)
if s > 0 {
return s
}
return SmallSizeDefault
}
// MaxObjectSize returns the value of "max_object_size" config parameter.
//
// Returns MaxSizeDefault if the value is not a positive number.
@ -99,11 +80,20 @@ func (x *Config) WorkerCount() int {
return WorkersNumberDefault
}
// SizeLimit returns the value of "capacity" config parameter.
// SizeLimit returns the value of "capacity_size" or "capacity" config parameter.
//
// Returns SizeLimitDefault if the value is not a positive number.
func (x *Config) SizeLimit() uint64 {
c := config.SizeInBytesSafe(
(*config.Config)(x),
"capacity_size",
)
if c > 0 {
return c
}
c = config.SizeInBytesSafe(
(*config.Config)(x),
"capacity",
)
@ -115,6 +105,16 @@ func (x *Config) SizeLimit() uint64 {
return SizeLimitDefault
}
// CountLimit returns the value of "capacity_count" config parameter.
//
// Returns 0 (meaning no limit) if the value is not a positive number.
func (x *Config) CountLimit() uint64 {
return config.SizeInBytesSafe(
(*config.Config)(x),
"capacity_count",
)
}
// NoSync returns the value of "no_sync" config parameter.
//
// Returns false if the value is not a boolean.
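Reassembled from the hunks above: the size limit now prefers the new `capacity_size` key, falls back to the legacy `capacity`, and finally to the default, while the count limit is a single optional key. The two getters in their new form, as a sketch:

```go
// SizeLimit returns the value of "capacity_size" or "capacity" config parameter.
//
// Returns SizeLimitDefault if the value is not a positive number.
func (x *Config) SizeLimit() uint64 {
	if c := config.SizeInBytesSafe((*config.Config)(x), "capacity_size"); c > 0 {
		return c
	}
	if c := config.SizeInBytesSafe((*config.Config)(x), "capacity"); c > 0 {
		return c
	}
	return SizeLimitDefault
}

// CountLimit returns the value of "capacity_count" config parameter.
//
// Returns 0 (meaning no limit) if the value is not a positive number.
func (x *Config) CountLimit() uint64 {
	return config.SizeInBytesSafe((*config.Config)(x), "capacity_count")
}
```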

View file

@ -101,7 +101,6 @@ FROSTFS_STORAGE_SHARD_0_MODE=read-only
FROSTFS_STORAGE_SHARD_0_WRITECACHE_ENABLED=false
FROSTFS_STORAGE_SHARD_0_WRITECACHE_NO_SYNC=true
FROSTFS_STORAGE_SHARD_0_WRITECACHE_PATH=tmp/0/cache
FROSTFS_STORAGE_SHARD_0_WRITECACHE_SMALL_OBJECT_SIZE=16384
FROSTFS_STORAGE_SHARD_0_WRITECACHE_MAX_OBJECT_SIZE=134217728
FROSTFS_STORAGE_SHARD_0_WRITECACHE_FLUSH_WORKER_COUNT=30
FROSTFS_STORAGE_SHARD_0_WRITECACHE_CAPACITY=3221225472
@ -155,10 +154,10 @@ FROSTFS_STORAGE_SHARD_1_MODE=read-write
### Write cache config
FROSTFS_STORAGE_SHARD_1_WRITECACHE_ENABLED=true
FROSTFS_STORAGE_SHARD_1_WRITECACHE_PATH=tmp/1/cache
FROSTFS_STORAGE_SHARD_1_WRITECACHE_SMALL_OBJECT_SIZE=16384
FROSTFS_STORAGE_SHARD_1_WRITECACHE_MAX_OBJECT_SIZE=134217728
FROSTFS_STORAGE_SHARD_1_WRITECACHE_FLUSH_WORKER_COUNT=30
FROSTFS_STORAGE_SHARD_1_WRITECACHE_CAPACITY=4294967296
FROSTFS_STORAGE_SHARD_1_WRITECACHE_CAPACITY_SIZE=4294967296
FROSTFS_STORAGE_SHARD_1_WRITECACHE_CAPACITY_COUNT=10000
### Metabase config
FROSTFS_STORAGE_SHARD_1_METABASE_PATH=tmp/1/meta
FROSTFS_STORAGE_SHARD_1_METABASE_PERM=0644

View file

@ -145,7 +145,6 @@
"enabled": false,
"no_sync": true,
"path": "tmp/0/cache",
"small_object_size": 16384,
"max_object_size": 134217728,
"flush_worker_count": 30,
"capacity": 3221225472
@ -203,10 +202,10 @@
"enabled": true,
"path": "tmp/1/cache",
"memcache_capacity": 2147483648,
"small_object_size": 16384,
"max_object_size": 134217728,
"flush_worker_count": 30,
"capacity": 4294967296
"capacity_size": 4294967296,
"capacity_count": 10000
},
"metabase": {
"path": "tmp/1/meta",

View file

@ -125,7 +125,6 @@ storage:
writecache:
enabled: true
small_object_size: 16k # size threshold for "small" objects which are cached in key-value DB, not in FS, bytes
max_object_size: 134217728 # size threshold for "big" objects which bypass write-cache and go to the storage directly, bytes
flush_worker_count: 30 # number of write-cache flusher threads
@ -208,7 +207,8 @@ storage:
1:
writecache:
path: tmp/1/cache # write-cache root directory
capacity: 4 G # approximate write-cache total size, bytes
capacity_size: 4 G # approximate write-cache total size, bytes
capacity_count: 10000
metabase:
path: tmp/1/meta # metabase path

View file

@ -286,21 +286,20 @@ metabase:
writecache:
enabled: true
path: /path/to/writecache
capacity: 4294967296
small_object_size: 16384
capacity_size: 4294967296
capacity_count: 100000
max_object_size: 134217728
flush_worker_count: 30
```
| Parameter | Type | Default value | Description |
|----------------------|------------|---------------|----------------------------------------------------------------------------------------------------------------------|
| `path` | `string` | | Path to the metabase file. |
| `capacity` | `size` | unrestricted | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly. |
| `small_object_size` | `size` | `32K` | Maximum object size for "small" objects. This objects are stored in a key-value database instead of a file-system. |
| `max_object_size` | `size` | `64M` | Maximum object size allowed to be stored in the writecache. |
| `flush_worker_count` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. |
| `max_batch_size` | `int` | `1000` | Maximum amount of small object `PUT` operations to perform in a single transaction. |
| `max_batch_delay` | `duration` | `10ms` | Maximum delay before a batch starts. |
| Parameter | Type | Default value | Description |
|----------------------|------------|---------------|----------------------------------------------------------------------------------------------------------------------------------|
| `path` | `string` | | Path to the write-cache directory. |
| `capacity_size` | `size` | `1G` | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly. |
| `capacity` | `size` | `1G` | Deprecated alias for `capacity_size`; use `capacity_size` instead. |
| `capacity_count` | `int` | unrestricted | Approximate maximum count of objects in the writecache. If the writecache is full, objects are written to the blobstor directly. |
| `max_object_size` | `size` | `64M` | Maximum object size allowed to be stored in the writecache. |
| `flush_worker_count` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. |
# `node` section

View file

@ -278,7 +278,6 @@ const (
ShardCouldNotMarkObjectToDeleteInMetabase = "could not mark object to delete in metabase"
WritecacheTriedToFlushItemsFromWritecache = "tried to flush items from write-cache"
WritecacheWaitingForChannelsToFlush = "waiting for channels to flush"
WritecacheCantRemoveObjectsFromTheDatabase = "can't remove objects from the database"
WritecacheCantRemoveObjectFromWritecache = "can't remove object from write-cache"
BlobovniczatreeCouldNotGetObjectFromLevel = "could not get object from level"
BlobovniczatreeCouldNotCloseBlobovnicza = "could not close Blobovnicza"
@ -469,6 +468,7 @@ const (
FSTreeCantUnmarshalObject = "can't unmarshal an object"
FSTreeCantFushObjectBlobstor = "can't flush an object to blobstor"
FSTreeCantUpdateID = "can't update object storage ID"
FSTreeCantGetID = "can't get object storage ID"
FSTreeCantDecodeDBObjectAddress = "can't decode object address from the DB"
PutSingleRedirectFailure = "failed to redirect PutSingle request"
StorageIDRetrievalFailure = "can't get storage ID from metabase"

View file

@ -1,22 +1,23 @@
package fstree
import (
"math"
"sync/atomic"
)
// FileCounter is used to count files in FSTree. The implementation must be thread-safe.
type FileCounter interface {
Set(v uint64)
Inc()
Dec()
Set(count, size int64)
Inc(size int64)
Dec(size int64)
Value() (int64, int64)
}
type noopCounter struct{}
func (c *noopCounter) Set(uint64) {}
func (c *noopCounter) Inc() {}
func (c *noopCounter) Dec() {}
func (c *noopCounter) Set(int64, int64) {}
func (c *noopCounter) Inc(int64) {}
func (c *noopCounter) Dec(int64) {}
func (c *noopCounter) Value() (int64, int64) { return 0, 0 }
func counterEnabled(c FileCounter) bool {
_, noop := c.(*noopCounter)
@ -24,14 +25,29 @@ func counterEnabled(c FileCounter) bool {
}
type SimpleCounter struct {
v atomic.Uint64
count atomic.Int64
size atomic.Int64
}
func NewSimpleCounter() *SimpleCounter {
return &SimpleCounter{}
}
func (c *SimpleCounter) Set(v uint64) { c.v.Store(v) }
func (c *SimpleCounter) Inc() { c.v.Add(1) }
func (c *SimpleCounter) Dec() { c.v.Add(math.MaxUint64) }
func (c *SimpleCounter) Value() uint64 { return c.v.Load() }
func (c *SimpleCounter) Set(count, size int64) {
c.count.Store(count)
c.size.Store(size)
}
func (c *SimpleCounter) Inc(size int64) {
c.count.Add(1)
c.size.Add(size)
}
func (c *SimpleCounter) Dec(size int64) {
c.count.Add(-1)
c.size.Add(-size)
}
func (c *SimpleCounter) Value() (int64, int64) {
return c.count.Load(), c.size.Load()
}
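`FileCounter` now tracks an object count and a byte size together, and `FSTree` accepts one through the option shown later in this change. A minimal usage sketch:

```go
// Plug a SimpleCounter into an FSTree and read both values back.
ctr := fstree.NewSimpleCounter()

t := fstree.New(
	fstree.WithPath("/tmp/wc"), // illustrative path
	fstree.WithFileCounter(ctr),
)
_ = t // Open/Init/Put omitted in this sketch

ctr.Inc(4096) // one 4 KiB object written
count, size := ctr.Value()
fmt.Println(count, size) // 1 4096
```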

View file

@ -435,32 +435,38 @@ func (t *FSTree) initFileCounter() error {
return nil
}
counter, err := t.countFiles()
count, size, err := t.countFiles()
if err != nil {
return err
}
t.fileCounter.Set(counter)
t.fileCounter.Set(count, size)
return nil
}
func (t *FSTree) countFiles() (uint64, error) {
var counter uint64
func (t *FSTree) countFiles() (int64, int64, error) {
var count int64
var size int64
// it is simpler to just consider every file
// that is not a directory as an object
err := filepath.WalkDir(t.RootPath,
func(_ string, d fs.DirEntry, _ error) error {
if !d.IsDir() {
counter++
count++
fi, err := d.Info()
if err != nil {
return err
}
size += fi.Size()
}
return nil
},
)
if err != nil {
return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err)
return 0, 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err)
}
return counter, nil
return count, size, nil
}
func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) {

View file

@ -47,8 +47,9 @@ func TestObjectCounter(t *testing.T) {
require.NoError(t, fst.Open(mode.ComponentReadWrite))
require.NoError(t, fst.Init())
counterValue := counter.Value()
require.Equal(t, uint64(0), counterValue)
counterValue, sizeValue := counter.Value()
require.Equal(t, int64(0), counterValue)
require.Equal(t, int64(0), sizeValue)
defer func() {
require.NoError(t, fst.Close())
@ -64,9 +65,6 @@ func TestObjectCounter(t *testing.T) {
putPrm.Address = addr
putPrm.RawData, _ = obj.Marshal()
var getPrm common.GetPrm
getPrm.Address = putPrm.Address
var delPrm common.DeletePrm
delPrm.Address = addr
@ -95,8 +93,9 @@ func TestObjectCounter(t *testing.T) {
require.NoError(t, eg.Wait())
counterValue = counter.Value()
realCount, err := fst.countFiles()
counterValue, sizeValue = counter.Value()
realCount, realSize, err := fst.countFiles()
require.NoError(t, err)
require.Equal(t, realCount, counterValue)
require.Equal(t, realSize, sizeValue)
}

View file

@ -78,14 +78,15 @@ func (w *genericWriter) writeAndRename(tmpPath, p string, data []byte) error {
}
if w.fileCounterEnabled {
w.fileCounter.Inc()
w.fileCounter.Inc(int64(len(data)))
var targetFileExists bool
if _, e := os.Stat(p); e == nil {
s, e := os.Stat(p)
if e == nil {
targetFileExists = true
}
err = os.Rename(tmpPath, p)
if err == nil && targetFileExists {
w.fileCounter.Dec()
w.fileCounter.Dec(int64(s.Size()))
}
} else {
err = os.Rename(tmpPath, p)
@ -108,20 +109,31 @@ func (w *genericWriter) writeFile(p string, data []byte) error {
}
func (w *genericWriter) removeFile(p string) error {
var err error
if w.fileCounterEnabled {
w.fileGuard.Lock(p)
err = os.Remove(p)
w.fileGuard.Unlock(p)
if err == nil {
w.fileCounter.Dec()
}
} else {
err = os.Remove(p)
return w.removeFileWithCounter(p)
}
err := os.Remove(p)
if err != nil && os.IsNotExist(err) {
err = logicerr.Wrap(new(apistatus.ObjectNotFound))
}
return err
}
func (w *genericWriter) removeFileWithCounter(p string) error {
w.fileGuard.Lock(p)
defer w.fileGuard.Unlock(p)
s, err := os.Stat(p)
if err != nil && os.IsNotExist(err) {
return logicerr.Wrap(new(apistatus.ObjectNotFound))
}
if err != nil {
return err
}
err = os.Remove(p)
if err == nil {
w.fileCounter.Dec(s.Size())
}
return err
}
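Note the ordering: `writeAndRename` increments the counter by the new payload size before the rename and, if the rename replaced an existing target, decrements it by the old file's size from the earlier `Stat`; `removeFileWithCounter` likewise stats under the per-path guard so it knows how many bytes to subtract. Modeled on `SimpleCounter`, an overwrite therefore accounts like this (a sketch, assuming those semantics):

```go
ctr := fstree.NewSimpleCounter()

ctr.Inc(100) // first write of an object:       count=1, size=100
ctr.Inc(140) // rewrite of the same path:       count=2, size=240 (transient)
ctr.Dec(100) // target existed, old entry gone: count=1, size=140

count, size := ctr.Value()
fmt.Println(count, size) // 1 140
```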

View file

@ -18,7 +18,8 @@ type linuxWriter struct {
perm uint32
flags int
counter FileCounter
counter FileCounter
counterEnabled bool
}
func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync bool) writer {
@ -34,10 +35,11 @@ func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync b
}
_ = unix.Close(fd) // Don't care about error.
w := &linuxWriter{
root: root,
perm: uint32(perm),
flags: flags,
counter: c,
root: root,
perm: uint32(perm),
flags: flags,
counter: c,
counterEnabled: counterEnabled(c),
}
return w
}
@ -61,7 +63,7 @@ func (w *linuxWriter) writeFile(p string, data []byte) error {
if n == len(data) {
err = unix.Linkat(unix.AT_FDCWD, tmpPath, unix.AT_FDCWD, p, unix.AT_SYMLINK_FOLLOW)
if err == nil {
w.counter.Inc()
w.counter.Inc(int64(len(data)))
}
if errors.Is(err, unix.EEXIST) {
err = nil
@ -78,12 +80,22 @@ func (w *linuxWriter) writeFile(p string, data []byte) error {
}
func (w *linuxWriter) removeFile(p string) error {
var s unix.Stat_t
if w.counterEnabled {
err := unix.Stat(p, &s)
if err != nil && err == unix.ENOENT {
return logicerr.Wrap(new(apistatus.ObjectNotFound))
}
if err != nil {
return err
}
}
err := unix.Unlink(p)
if err != nil && err == unix.ENOENT {
return logicerr.Wrap(new(apistatus.ObjectNotFound))
}
if err == nil {
w.counter.Dec()
w.counter.Dec(s.Size)
}
return err
}

View file

@ -166,8 +166,7 @@ func (m *writeCacheMetrics) Put(d time.Duration, success bool, st writecache.Sto
m.metrics.AddMethodDuration(m.shardID, m.path, st.String(), "Put", success, d)
}
func (m *writeCacheMetrics) SetEstimateSize(db, fstree uint64) {
m.metrics.SetEstimateSize(m.shardID, m.path, writecache.StorageTypeDB.String(), db)
func (m *writeCacheMetrics) SetEstimateSize(fstree uint64) {
m.metrics.SetEstimateSize(m.shardID, m.path, writecache.StorageTypeFSTree.String(), fstree)
}
@ -175,8 +174,7 @@ func (m *writeCacheMetrics) SetMode(mod mode.ComponentMode) {
m.metrics.SetMode(m.shardID, mod.String())
}
func (m *writeCacheMetrics) SetActualCounters(db, fstree uint64) {
m.metrics.SetActualCount(m.shardID, m.path, writecache.StorageTypeDB.String(), db)
func (m *writeCacheMetrics) SetActualCounters(fstree uint64) {
m.metrics.SetActualCount(m.shardID, m.path, writecache.StorageTypeFSTree.String(), fstree)
}

View file

@ -15,22 +15,12 @@ import (
// StorageIDPrm groups the parameters of StorageID operation.
type StorageIDPrm struct {
addr oid.Address
Address oid.Address
}
// StorageIDRes groups the resulting values of StorageID operation.
type StorageIDRes struct {
id []byte
}
// SetAddress is a StorageID option to set the object address to check.
func (p *StorageIDPrm) SetAddress(addr oid.Address) {
p.addr = addr
}
// StorageID returns storage ID.
func (r StorageIDRes) StorageID() []byte {
return r.id
StorageID []byte
}
// StorageID returns storage descriptor for objects from the blobstor.
@ -46,7 +36,7 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes
_, span := tracing.StartSpanFromContext(ctx, "metabase.StorageID",
trace.WithAttributes(
attribute.String("address", prm.addr.EncodeToString()),
attribute.String("address", prm.Address.EncodeToString()),
))
defer span.End()
@ -58,7 +48,7 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes
}
err = db.boltDB.View(func(tx *bbolt.Tx) error {
res.id, err = db.storageID(tx, prm.addr)
res.StorageID, err = db.storageID(tx, prm.Address)
return err
})
@ -83,23 +73,13 @@ func (db *DB) storageID(tx *bbolt.Tx, addr oid.Address) ([]byte, error) {
// UpdateStorageIDPrm groups the parameters of UpdateStorageID operation.
type UpdateStorageIDPrm struct {
addr oid.Address
id []byte
Address oid.Address
StorageID []byte
}
// UpdateStorageIDRes groups the resulting values of UpdateStorageID operation.
type UpdateStorageIDRes struct{}
// SetAddress is an UpdateStorageID option to set the object address to check.
func (p *UpdateStorageIDPrm) SetAddress(addr oid.Address) {
p.addr = addr
}
// SetStorageID is an UpdateStorageID option to set the storage ID.
func (p *UpdateStorageIDPrm) SetStorageID(id []byte) {
p.id = id
}
// UpdateStorageID updates storage descriptor for objects from the blobstor.
func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res UpdateStorageIDRes, err error) {
var (
@ -112,8 +92,8 @@ func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res
_, span := tracing.StartSpanFromContext(ctx, "metabase.UpdateStorageID",
trace.WithAttributes(
attribute.String("address", prm.addr.EncodeToString()),
attribute.String("storage_id", string(prm.id)),
attribute.String("address", prm.Address.EncodeToString()),
attribute.String("storage_id", string(prm.StorageID)),
))
defer span.End()
@ -127,7 +107,7 @@ func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res
}
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
return setStorageID(tx, prm.addr, prm.id, true)
return setStorageID(tx, prm.Address, prm.StorageID, true)
})
success = err == nil
return res, metaerr.Wrap(err)

View file

@ -102,18 +102,13 @@ func TestPutWritecacheDataRace(t *testing.T) {
}
func metaUpdateStorageID(db *meta.DB, addr oid.Address, id []byte) error {
var sidPrm meta.UpdateStorageIDPrm
sidPrm.SetAddress(addr)
sidPrm.SetStorageID(id)
sidPrm := meta.UpdateStorageIDPrm{Address: addr, StorageID: id}
_, err := db.UpdateStorageID(context.Background(), sidPrm)
return err
}
func metaStorageID(db *meta.DB, addr oid.Address) ([]byte, error) {
var sidPrm meta.StorageIDPrm
sidPrm.SetAddress(addr)
sidPrm := meta.StorageIDPrm{Address: addr}
r, err := db.StorageID(context.Background(), sidPrm)
return r.StorageID(), err
return r.StorageID, err
}

View file

@ -105,9 +105,7 @@ func (s *Shard) validateWritecacheDoesntContainObject(ctx context.Context, addr
}
func (s *Shard) deleteFromBlobstor(ctx context.Context, addr oid.Address) error {
var sPrm meta.StorageIDPrm
sPrm.SetAddress(addr)
sPrm := meta.StorageIDPrm{Address: addr}
res, err := s.metaBase.StorageID(ctx, sPrm)
if err != nil {
s.log.Debug(logs.StorageIDRetrievalFailure,
@ -116,7 +114,7 @@ func (s *Shard) deleteFromBlobstor(ctx context.Context, addr oid.Address) error
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
storageID := res.StorageID()
storageID := res.StorageID
if storageID == nil {
// if storageID is nil it means:
// 1. there is no such object

View file

@ -109,15 +109,14 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
require.True(t, client.IsErrObjectNotFound(err), "invalid error type")
// storageID
var metaStIDPrm meta.StorageIDPrm
metaStIDPrm.SetAddress(addr)
metaStIDPrm := meta.StorageIDPrm{Address: addr}
storageID, err := sh.metaBase.StorageID(context.Background(), metaStIDPrm)
require.NoError(t, err, "failed to get storage ID")
// check existence in blobstore
var bsExisted common.ExistsPrm
bsExisted.Address = addr
bsExisted.StorageID = storageID.StorageID()
bsExisted.StorageID = storageID.StorageID
exRes, err := sh.blobStor.Exists(context.Background(), bsExisted)
require.NoError(t, err, "failed to check blobstore existence")
require.True(t, exRes.Exists, "invalid blobstore existence result")
@ -125,7 +124,7 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
// drop from blobstor
var bsDeletePrm common.DeletePrm
bsDeletePrm.Address = addr
bsDeletePrm.StorageID = storageID.StorageID()
bsDeletePrm.StorageID = storageID.StorageID
_, err = sh.blobStor.Delete(context.Background(), bsDeletePrm)
require.NoError(t, err, "failed to delete from blobstore")

View file

@ -160,15 +160,14 @@ func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta
return res, false, err
}
var mPrm meta.StorageIDPrm
mPrm.SetAddress(addr)
mPrm := meta.StorageIDPrm{Address: addr}
mExRes, err := s.metaBase.StorageID(ctx, mPrm)
if err != nil {
return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err)
}
storageID := mExRes.StorageID()
storageID := mExRes.StorageID
if storageID == nil {
// `nil` storageID returned without any error
// means that object is big, `cb` expects an

View file

@ -90,9 +90,7 @@ func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Addres
return errMBIsNotAvailable
}
var prm meta.UpdateStorageIDPrm
prm.SetAddress(addr)
prm.SetStorageID(storageID)
prm := meta.UpdateStorageIDPrm{Address: addr, StorageID: storageID}
_, err := u.mb.UpdateStorageID(ctx, prm)
return err
}

View file

@ -2,6 +2,7 @@ package benchmark
import (
"context"
"sync"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@ -10,6 +11,7 @@ import (
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
)
@ -80,12 +82,30 @@ func benchmarkPutPrepare(b *testing.B, cache writecache.Cache) {
require.NoError(b, cache.Init(), "initializing")
}
type testMetabase struct{}
type testMetabase struct {
storageIDs map[oid.Address][]byte
guard *sync.RWMutex
}
func (testMetabase) UpdateStorageID(context.Context, meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error) {
func (t *testMetabase) UpdateStorageID(_ context.Context, prm meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error) {
t.guard.Lock()
defer t.guard.Unlock()
t.storageIDs[prm.Address] = prm.StorageID
return meta.UpdateStorageIDRes{}, nil
}
func (t *testMetabase) StorageID(_ context.Context, prm meta.StorageIDPrm) (meta.StorageIDRes, error) {
t.guard.RLock()
defer t.guard.RUnlock()
if id, found := t.storageIDs[prm.Address]; found {
return meta.StorageIDRes{
StorageID: id,
}, nil
}
return meta.StorageIDRes{}, nil
}
func newCache(b *testing.B) writecache.Cache {
bs := teststore.New(
teststore.WithPut(func(pp common.PutPrm) (common.PutRes, error) { return common.PutRes{}, nil }),
@ -93,8 +113,7 @@ func newCache(b *testing.B) writecache.Cache {
return writecache.New(
writecache.WithPath(b.TempDir()),
writecache.WithBlobstor(bs),
writecache.WithMetabase(testMetabase{}),
writecache.WithMetabase(&testMetabase{storageIDs: make(map[oid.Address][]byte), guard: &sync.RWMutex{}}),
writecache.WithMaxCacheSize(256<<30),
writecache.WithSmallObjectSize(128<<10),
)
}

View file

@ -10,8 +10,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.etcd.io/bbolt"
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
@ -28,14 +28,13 @@ type cache struct {
// whether object should be compressed.
compressFlags map[string]struct{}
// flushCh is a channel with objects to flush.
flushCh chan objectInfo
flushCh chan objectInfo
flushingGuard *utilSync.KeyLocker[oid.Address]
// cancel is cancel function, protected by modeMtx in Close.
cancel atomic.Value
// wg is a wait group for flush workers.
wg sync.WaitGroup
// store contains underlying database.
store
// fsTree contains big files stored directly on file-system.
fsTree *fstree.FSTree
}
@ -43,40 +42,28 @@ type cache struct {
// wcStorageType is used for write-cache operations logging.
const wcStorageType = "write-cache"
type objectInfo struct {
addr string
data []byte
obj *objectSDK.Object
}
const (
defaultMaxObjectSize = 64 * 1024 * 1024 // 64 MiB
defaultSmallObjectSize = 32 * 1024 // 32 KiB
defaultMaxCacheSize = 1 << 30 // 1 GiB
defaultMaxObjectSize = 64 * 1024 * 1024 // 64 MiB
defaultMaxCacheSize = 1 << 30 // 1 GiB
)
var (
defaultBucket = []byte{0}
dummyCanceler context.CancelFunc = func() {}
)
var dummyCanceler context.CancelFunc = func() {}
// New creates new writecache instance.
func New(opts ...Option) Cache {
c := &cache{
flushCh: make(chan objectInfo),
mode: mode.Disabled,
mode: mode.Disabled,
flushCh: make(chan objectInfo),
flushingGuard: utilSync.NewKeyLocker[oid.Address](),
compressFlags: make(map[string]struct{}),
options: options{
log: &logger.Logger{Logger: zap.NewNop()},
maxObjectSize: defaultMaxObjectSize,
smallObjectSize: defaultSmallObjectSize,
workersCount: defaultFlushWorkersCount,
maxCacheSize: defaultMaxCacheSize,
maxBatchSize: bbolt.DefaultMaxBatchSize,
maxBatchDelay: bbolt.DefaultMaxBatchDelay,
openFile: os.OpenFile,
metrics: DefaultMetrics(),
log: &logger.Logger{Logger: zap.NewNop()},
maxObjectSize: defaultMaxObjectSize,
workersCount: defaultFlushWorkersCount,
maxCacheSize: defaultMaxCacheSize,
openFile: os.OpenFile,
metrics: DefaultMetrics(),
counter: fstree.NewSimpleCounter(),
},
}
@ -111,7 +98,8 @@ func (c *cache) Open(_ context.Context, mod mode.Mode) error {
return metaerr.Wrap(err)
}
return metaerr.Wrap(c.initCounters())
c.estimateCacheSize()
return nil
}
// Init runs necessary services.
@ -138,14 +126,6 @@ func (c *cache) Close() error {
c.modeMtx.Lock()
defer c.modeMtx.Unlock()
var err error
if c.db != nil {
err = c.db.Close()
if err != nil {
c.db = nil
}
}
c.metrics.Close()
return nil
}
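The `store` embed (and with it the BoltDB handle previously closed here) is gone; the notable new field is `flushingGuard`, a per-address lock the flush path uses so concurrent workers cannot flush the same object twice. The pattern, sketched (assuming `utilSync.KeyLocker` serializes callers per key):

```go
guard := utilSync.NewKeyLocker[oid.Address]()

// Per-address critical section, as in flushObject later in this change:
guard.Lock(addr)
defer guard.Unlock(addr)
// 1. read the object's storage ID from the metabase; skip if already set,
// 2. otherwise put the object to the blobstor and update the storage ID.
```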

View file

@ -2,7 +2,6 @@ package writecache
import (
"context"
"math"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@ -10,7 +9,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
@ -47,39 +45,6 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
saddr := addr.EncodeToString()
var dataSize int
_ = c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
dataSize = len(b.Get([]byte(saddr)))
return nil
})
if dataSize > 0 {
storageType = StorageTypeDB
var recordDeleted bool
err := c.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
key := []byte(saddr)
recordDeleted = b.Get(key) != nil
err := b.Delete(key)
return err
})
if err != nil {
return err
}
storagelog.Write(c.log,
storagelog.AddressField(saddr),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db DELETE"),
)
if recordDeleted {
c.objCounters.cDB.Add(math.MaxUint64)
c.estimateCacheSize()
}
deleted = true
return nil
}
storageType = StorageTypeFSTree
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
if err == nil {

View file

@ -1,13 +1,11 @@
package writecache
import (
"bytes"
"context"
"errors"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
@ -16,159 +14,96 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/mr-tron/base58"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
const (
// flushBatchSize is amount of keys which will be read from cache to be flushed
// to the main storage. It is used to reduce contention between cache put
// and cache persist.
flushBatchSize = 512
// defaultFlushWorkersCount is number of workers for putting objects in main storage.
defaultFlushWorkersCount = 20
// defaultFlushInterval is default time interval between successive flushes.
defaultFlushInterval = time.Second
defaultFlushInterval = 10 * time.Second
)
var errIterationCompleted = errors.New("iteration completed")
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
func (c *cache) runFlushLoop(ctx context.Context) {
if c.disableBackgroundFlush {
return
}
for i := 0; i < c.workersCount; i++ {
c.wg.Add(1)
go c.workerFlushSmall(ctx)
go c.workerFlush(ctx)
}
c.wg.Add(1)
go func() {
c.workerFlushBig(ctx)
c.wg.Done()
}()
c.wg.Add(1)
go func() {
defer c.wg.Done()
tt := time.NewTimer(defaultFlushInterval)
defer tt.Stop()
for {
select {
case <-tt.C:
c.flushSmallObjects(ctx)
tt.Reset(defaultFlushInterval)
c.estimateCacheSize()
case <-ctx.Done():
return
}
}
}()
go c.workerSelect(ctx)
}
func (c *cache) flushSmallObjects(ctx context.Context) {
var lastKey []byte
for {
select {
case <-ctx.Done():
return
default:
}
var m []objectInfo
c.modeMtx.RLock()
if c.readOnly() {
c.modeMtx.RUnlock()
time.Sleep(time.Second)
continue
}
// We put objects in batches of fixed size to not interfere with main put cycle a lot.
_ = c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
cs := b.Cursor()
var k, v []byte
if len(lastKey) == 0 {
k, v = cs.First()
} else {
k, v = cs.Seek(lastKey)
if bytes.Equal(k, lastKey) {
k, v = cs.Next()
}
}
for ; k != nil && len(m) < flushBatchSize; k, v = cs.Next() {
if len(lastKey) == len(k) {
copy(lastKey, k)
} else {
lastKey = bytes.Clone(k)
}
m = append(m, objectInfo{
addr: string(k),
data: bytes.Clone(v),
})
}
return nil
})
var count int
for i := range m {
obj := objectSDK.New()
if err := obj.Unmarshal(m[i].data); err != nil {
continue
}
m[i].obj = obj
count++
select {
case c.flushCh <- m[i]:
case <-ctx.Done():
c.modeMtx.RUnlock()
return
}
}
c.modeMtx.RUnlock()
if count == 0 {
break
}
c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache,
zap.Int("count", count),
zap.String("start", base58.Encode(lastKey)))
}
}
func (c *cache) workerFlushBig(ctx context.Context) {
tick := time.NewTicker(defaultFlushInterval * 10)
func (c *cache) workerSelect(ctx context.Context) {
defer c.wg.Done()
tick := time.NewTicker(defaultFlushInterval)
for {
select {
case <-tick.C:
c.modeMtx.RLock()
if c.readOnly() || c.noMetabase() {
c.modeMtx.RUnlock()
break
var prm common.IteratePrm
prm.IgnoreErrors = true
prm.Handler = func(ie common.IterationElement) error {
c.modeMtx.RLock()
defer c.modeMtx.RUnlock()
if c.readOnly() {
return ErrReadOnly
}
if c.noMetabase() {
return ErrDegraded
}
select {
case <-ctx.Done():
return ctx.Err()
case c.flushCh <- objectInfo{
data: ie.ObjectData,
address: ie.Address,
}:
return nil
}
}
_ = c.flushFSTree(ctx, true)
c.modeMtx.RUnlock()
_, _ = c.fsTree.Iterate(ctx, prm)
case <-ctx.Done():
return
}
}
}
func (c *cache) workerFlush(ctx context.Context) {
defer c.wg.Done()
var objInfo objectInfo
for {
select {
case objInfo = <-c.flushCh:
case <-ctx.Done():
return
}
var obj objectSDK.Object
err := obj.Unmarshal(objInfo.data)
if err != nil {
c.reportFlushError(logs.FSTreeCantUnmarshalObject, objInfo.address.EncodeToString(), metaerr.Wrap(err))
continue
}
err = c.flushObject(ctx, objInfo.address, &obj, objInfo.data)
if err != nil {
// Error is handled in flushObject.
continue
}
c.deleteFromDisk(ctx, objInfo.address)
}
}
func (c *cache) reportFlushError(msg string, addr string, err error) {
if c.reportError != nil {
c.reportError(msg, err)
@ -195,7 +130,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
return err
}
err = c.flushObject(ctx, &obj, e.ObjectData, StorageTypeFSTree)
err = c.flushObject(ctx, e.Address, &obj, e.ObjectData)
if err != nil {
if ignoreErrors {
return nil
@ -211,39 +146,26 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
return err
}
// workerFlushSmall writes small objects to the main storage.
func (c *cache) workerFlushSmall(ctx context.Context) {
defer c.wg.Done()
var objInfo objectInfo
for {
// Give priority to direct put.
select {
case objInfo = <-c.flushCh:
case <-ctx.Done():
return
}
err := c.flushObject(ctx, objInfo.obj, objInfo.data, StorageTypeDB)
if err != nil {
// Error is handled in flushObject.
continue
}
c.deleteFromDB(objInfo.addr, true)
}
}
// flushObject is used to write object directly to the main storage.
func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []byte, st StorageType) error {
var err error
func (c *cache) flushObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object, data []byte) error {
c.flushingGuard.Lock(addr)
defer c.flushingGuard.Unlock(addr)
stPrm := meta.StorageIDPrm{Address: addr}
stRes, err := c.metabase.StorageID(ctx, stPrm)
if err != nil {
c.reportFlushError(logs.FSTreeCantGetID, addr.EncodeToString(), err)
return err
}
if stRes.StorageID != nil {
// already flushed
return nil
}
defer func() {
c.metrics.Flush(err == nil, st)
c.metrics.Flush(err == nil, StorageTypeFSTree)
}()
addr := objectCore.AddressOf(obj)
var prm common.PutPrm
prm.Object = obj
prm.RawData = data
@ -258,9 +180,7 @@ func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []b
return err
}
var updPrm meta.UpdateStorageIDPrm
updPrm.SetAddress(addr)
updPrm.SetStorageID(res.StorageID)
updPrm := meta.UpdateStorageIDPrm{Address: addr, StorageID: res.StorageID}
_, err = c.metabase.UpdateStorageID(ctx, updPrm)
if err != nil {
@ -300,74 +220,10 @@ func (c *cache) Flush(ctx context.Context, ignoreErrors, seal bool) error {
}
func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
if err := c.flushFSTree(ctx, ignoreErrors); err != nil {
return err
}
var last string
for {
batch, err := c.readNextDBBatch(ignoreErrors, last)
if err != nil {
return err
}
if len(batch) == 0 {
break
}
for _, item := range batch {
var obj objectSDK.Object
if err := obj.Unmarshal(item.data); err != nil {
c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, item.address, metaerr.Wrap(err))
if ignoreErrors {
continue
}
return err
}
if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil {
return err
}
c.deleteFromDB(item.address, false)
}
last = batch[len(batch)-1].address
}
return nil
return c.flushFSTree(ctx, ignoreErrors)
}
type batchItem struct {
type objectInfo struct {
data []byte
address string
}
func (c *cache) readNextDBBatch(ignoreErrors bool, last string) ([]batchItem, error) {
const batchSize = 100
var batch []batchItem
err := c.db.View(func(tx *bbolt.Tx) error {
var addr oid.Address
b := tx.Bucket(defaultBucket)
cs := b.Cursor()
for k, data := cs.Seek([]byte(last)); k != nil; k, data = cs.Next() {
sa := string(k)
if sa == last {
continue
}
if err := addr.DecodeString(sa); err != nil {
c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err))
if ignoreErrors {
continue
}
return err
}
batch = append(batch, batchItem{data: bytes.Clone(data), address: sa})
if len(batch) == batchSize {
return errIterationCompleted
}
}
return nil
})
if err == nil || errors.Is(err, errIterationCompleted) {
return batch, nil
}
return nil, err
address oid.Address
}
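Taken together, the new flush design is: a single `workerSelect` goroutine ticks every `defaultFlushInterval`, iterates the FSTree, and feeds each element into `flushCh`; `workersCount` `workerFlush` goroutines consume the channel; and `flushObject` first checks the metabase storage ID under the per-address `flushingGuard`, so an object that another worker (or a later iteration tick) already flushed is skipped rather than written twice. A condensed restatement of one worker, reassembled from the hunks above:

```go
func (c *cache) workerFlush(ctx context.Context) {
	defer c.wg.Done()
	for {
		var objInfo objectInfo
		select {
		case objInfo = <-c.flushCh:
		case <-ctx.Done():
			return
		}

		var obj objectSDK.Object
		if err := obj.Unmarshal(objInfo.data); err != nil {
			c.reportFlushError(logs.FSTreeCantUnmarshalObject, objInfo.address.EncodeToString(), metaerr.Wrap(err))
			continue
		}
		// flushObject deduplicates via the metabase storage ID and reports
		// its own errors, so a failure here just moves on to the next item.
		if err := c.flushObject(ctx, objInfo.address, &obj, objInfo.data); err != nil {
			continue
		}
		c.deleteFromDisk(ctx, objInfo.address)
	}
}
```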

View file

@ -19,7 +19,6 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
@ -31,7 +30,6 @@ func TestFlush(t *testing.T) {
append([]Option{
WithLogger(testlogger),
WithPath(filepath.Join(t.TempDir(), "writecache")),
WithSmallObjectSize(smallSize),
WithMetabase(mb),
WithBlobstor(bs),
WithDisableBackgroundFlush(),
@ -47,31 +45,6 @@ func TestFlush(t *testing.T) {
}
failures := []TestFailureInjector[Option]{
{
Desc: "db, invalid address",
InjectFn: func(t *testing.T, wc Cache) {
c := wc.(*cache)
obj := testutil.GenerateObject()
data, err := obj.Marshal()
require.NoError(t, err)
require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
return b.Put([]byte{1, 2, 3}, data)
}))
},
},
{
Desc: "db, invalid object",
InjectFn: func(t *testing.T, wc Cache) {
c := wc.(*cache)
require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
k := []byte(oidtest.Address().EncodeToString())
v := []byte{1, 2, 3}
return b.Put(k, v)
}))
},
},
{
Desc: "fs, read error",
InjectFn: func(t *testing.T, wc Cache) {
@ -253,15 +226,13 @@ func putObjects(t *testing.T, c Cache) []objectPair {
func check(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPair) {
for i := range objects {
var mPrm meta.StorageIDPrm
mPrm.SetAddress(objects[i].addr)
mPrm := meta.StorageIDPrm{Address: objects[i].addr}
mRes, err := mb.StorageID(context.Background(), mPrm)
require.NoError(t, err)
var prm common.GetPrm
prm.Address = objects[i].addr
prm.StorageID = mRes.StorageID()
prm.StorageID = mRes.StorageID
res, err := bs.Get(context.Background(), prm)
require.NoError(t, err)

View file

@ -1,7 +1,6 @@
package writecache
import (
"bytes"
"context"
"time"
@ -12,7 +11,6 @@ import (
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
@ -37,11 +35,11 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
return nil, ErrDegraded
}
obj, err := c.getInternal(ctx, saddr, addr)
obj, err := c.getInternal(ctx, addr)
return obj, metaerr.Wrap(err)
}
func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) (*objectSDK.Object, error) {
func (c *cache) getInternal(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
found := false
storageType := StorageTypeUndefined
startedAt := time.Now()
@ -49,14 +47,6 @@ func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address)
c.metrics.Get(time.Since(startedAt), found, storageType)
}()
value, err := Get(c.db, []byte(saddr))
if err == nil {
obj := objectSDK.New()
found = true
storageType = StorageTypeDB
return obj, obj.Unmarshal(value)
}
res, err := c.fsTree.Get(ctx, common.GetPrm{Address: addr})
if err != nil {
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
@ -87,34 +77,10 @@ func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object,
return nil, ErrDegraded
}
obj, err := c.getInternal(ctx, saddr, addr)
obj, err := c.getInternal(ctx, addr)
if err != nil {
return nil, metaerr.Wrap(err)
}
return obj.CutPayload(), nil
}
// Get fetches object from the underlying database.
// Key should be a stringified address.
//
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in db.
func Get(db *bbolt.DB, key []byte) ([]byte, error) {
if db == nil {
return nil, ErrNotInitialized
}
var value []byte
err := db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
if b == nil {
return ErrNoDefaultBucket
}
value = b.Get(key)
if value == nil {
return logicerr.Wrap(new(apistatus.ObjectNotFound))
}
value = bytes.Clone(value)
return nil
})
return value, metaerr.Wrap(err)
}

View file

@ -1,39 +1,28 @@
package writecache
import (
"errors"
"fmt"
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// ErrNoDefaultBucket is returned by IterateDB when default bucket for objects is missing.
var ErrNoDefaultBucket = errors.New("no default bucket")
func (c *cache) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Iterate",
trace.WithAttributes(
attribute.Bool("ignore_errors", prm.IgnoreErrors),
))
defer span.End()
// IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return.
// It is assumed that db is an underlying database of some WriteCache instance.
//
// Returns ErrNoDefaultBucket if there is no default bucket in db.
//
// DB must not be nil and should be opened.
func IterateDB(db *bbolt.DB, f func(oid.Address) error) error {
return metaerr.Wrap(db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
if b == nil {
return ErrNoDefaultBucket
}
if !c.modeMtx.TryRLock() {
return common.IterateRes{}, ErrNotInitialized
}
defer c.modeMtx.RUnlock()
if c.mode.NoMetabase() {
return common.IterateRes{}, ErrDegraded
}
var addr oid.Address
return b.ForEach(func(k, _ []byte) error {
err := addr.DecodeString(string(k))
if err != nil {
return fmt.Errorf("could not parse object address: %w", err)
}
return f(addr)
})
}))
return c.fsTree.Iterate(ctx, prm)
}

View file

@ -14,7 +14,6 @@ func (t StorageType) String() string {
const (
StorageTypeUndefined StorageType = "null"
StorageTypeDB StorageType = "db"
StorageTypeFSTree StorageType = "fstree"
)
@ -26,9 +25,9 @@ type Metrics interface {
Flush(success bool, st StorageType)
Evict(st StorageType)
SetEstimateSize(db, fstree uint64)
SetEstimateSize(uint64)
SetMode(m mode.ComponentMode)
SetActualCounters(db, fstree uint64)
SetActualCounters(uint64)
SetPath(path string)
Close()
}
@ -47,11 +46,11 @@ func (metricsStub) Delete(time.Duration, bool, StorageType) {}
func (metricsStub) Put(time.Duration, bool, StorageType) {}
func (metricsStub) SetEstimateSize(uint64, uint64) {}
func (metricsStub) SetEstimateSize(uint64) {}
func (metricsStub) SetMode(mode.ComponentMode) {}
func (metricsStub) SetActualCounters(uint64, uint64) {}
func (metricsStub) SetActualCounters(uint64) {}
func (metricsStub) Flush(bool, StorageType) {}

View file

@ -2,10 +2,7 @@ package writecache
import (
"context"
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute"
@ -25,7 +22,7 @@ func (c *cache) SetMode(m mode.Mode) error {
c.modeMtx.Lock()
defer c.modeMtx.Unlock()
err := c.setMode(ctx, m, true)
err := c.setMode(ctx, m, !m.NoMetabase())
if err == nil {
c.metrics.SetMode(mode.ConvertToComponentModeDegraded(m))
}
@ -44,20 +41,6 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode, ignoreErrors bool) err
}
}
if c.db != nil {
if err = c.db.Close(); err != nil {
return fmt.Errorf("can't close write-cache database: %w", err)
}
}
// Suspend producers to ensure there are channel send operations in fly.
// flushCh is populated by `flush` with `modeMtx` taken, thus waiting until it is empty
// guarantees that there are no in-fly operations.
for len(c.flushCh) != 0 {
c.log.Info(logs.WritecacheWaitingForChannelsToFlush)
time.Sleep(time.Second)
}
if turnOffMeta {
c.mode = m
return nil

View file

@ -1,30 +0,0 @@
package writecache
import (
"context"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
"github.com/stretchr/testify/require"
)
func TestMode(t *testing.T) {
t.Parallel()
wc := New(
WithLogger(test.NewLogger(t)),
WithFlushWorkersCount(2),
WithPath(t.TempDir()))
require.NoError(t, wc.Open(context.Background(), mode.DegradedReadOnly))
require.Nil(t, wc.(*cache).db)
require.NoError(t, wc.Init())
require.Nil(t, wc.(*cache).db)
require.NoError(t, wc.Close())
require.NoError(t, wc.Open(context.Background(), mode.Degraded))
require.Nil(t, wc.(*cache).db)
require.NoError(t, wc.Init())
require.Nil(t, wc.(*cache).db)
require.NoError(t, wc.Close())
}

View file

@ -3,8 +3,8 @@ package writecache
import (
"io/fs"
"os"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
@ -22,19 +22,15 @@ type options struct {
metabase Metabase
// maxObjectSize is the maximum size of the object stored in the write-cache.
maxObjectSize uint64
// smallObjectSize is the maximum size of the object stored in the database.
smallObjectSize uint64
// workersCount is the number of workers flushing objects in parallel.
workersCount int
// maxCacheSize is the maximum total size of all objects saved in cache (DB + FS).
// maxCacheSize is the maximum total size of all objects saved in cache.
// 1 GiB by default.
maxCacheSize uint64
// objCounters contains atomic counters for the number of objects stored in cache.
objCounters counters
// maxBatchSize is the maximum batch size for the small object database.
maxBatchSize int
// maxBatchDelay is the maximum batch wait time for the small object database.
maxBatchDelay time.Duration
// maxCacheCount is the maximum total count of all objects saved in cache.
maxCacheCount uint64
// counter contains atomic counters for the number of objects stored in cache.
counter *fstree.SimpleCounter
// noSync is true iff FSTree allows unsynchronized writes.
noSync bool
// reportError is the function called when encountering disk errors in background workers.
@ -84,15 +80,6 @@ func WithMaxObjectSize(sz uint64) Option {
}
}
// WithSmallObjectSize sets maximum object size to be stored in write-cache.
func WithSmallObjectSize(sz uint64) Option {
return func(o *options) {
if sz > 0 {
o.smallObjectSize = sz
}
}
}
func WithFlushWorkersCount(c int) Option {
return func(o *options) {
if c > 0 {
@ -108,21 +95,10 @@ func WithMaxCacheSize(sz uint64) Option {
}
}
// WithMaxBatchSize sets max batch size for the small object database.
func WithMaxBatchSize(sz int) Option {
// WithMaxCacheCount sets the maximum number of objects stored in the write-cache.
func WithMaxCacheCount(cnt uint64) Option {
return func(o *options) {
if sz > 0 {
o.maxBatchSize = sz
}
}
}
// WithMaxBatchDelay sets max batch delay for the small object database.
func WithMaxBatchDelay(d time.Duration) Option {
return func(o *options) {
if d > 0 {
o.maxBatchDelay = d
}
o.maxCacheCount = cnt
}
}

View file

@ -8,7 +8,6 @@ import (
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
@ -50,65 +49,22 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
return common.PutRes{}, ErrBigObject
}
oi := objectInfo{
addr: prm.Address.EncodeToString(),
obj: prm.Object,
data: prm.RawData,
}
if sz <= c.smallObjectSize {
storageType = StorageTypeDB
err := c.putSmall(oi)
if err == nil {
added = true
}
return common.PutRes{}, err
}
storageType = StorageTypeFSTree
err := c.putBig(ctx, oi.addr, prm)
err := c.putBig(ctx, prm)
if err == nil {
added = true
}
return common.PutRes{}, metaerr.Wrap(err)
}
// putSmall persists small objects to the write-cache database and
// pushes the to the flush workers queue.
func (c *cache) putSmall(obj objectInfo) error {
cacheSize := c.estimateCacheSize()
if c.maxCacheSize < c.incSizeDB(cacheSize) {
return ErrOutOfSpace
}
var newRecord bool
err := c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
key := []byte(obj.addr)
newRecord = b.Get(key) == nil
if newRecord {
return b.Put(key, obj.data)
}
return nil
})
if err == nil {
storagelog.Write(c.log,
storagelog.AddressField(obj.addr),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db PUT"),
)
if newRecord {
c.objCounters.cDB.Add(1)
c.estimateCacheSize()
}
}
return err
}
// putBig writes object to FSTree and pushes it to the flush workers queue.
func (c *cache) putBig(ctx context.Context, addr string, prm common.PutPrm) error {
cacheSz := c.estimateCacheSize()
if c.maxCacheSize < c.incSizeFS(cacheSz) {
func (c *cache) putBig(ctx context.Context, prm common.PutPrm) error {
addr := prm.Address.EncodeToString()
estimatedObjSize := uint64(len(prm.RawData))
if estimatedObjSize == 0 {
estimatedObjSize = prm.Object.PayloadSize()
}
if !c.hasFreeSpace(estimatedObjSize) {
return ErrOutOfSpace
}

View file

@ -1,77 +1,20 @@
package writecache
import (
"fmt"
"math"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"go.etcd.io/bbolt"
)
func (c *cache) estimateCacheSize() uint64 {
dbCount := c.objCounters.DB()
fsCount := c.objCounters.FS()
if fsCount > 0 {
fsCount-- // db file
func (c *cache) estimateCacheSize() {
count, size := c.counter.Value()
var ucount, usize uint64
if count > 0 {
ucount = uint64(count)
}
dbSize := dbCount * c.smallObjectSize
fsSize := fsCount * c.maxObjectSize
c.metrics.SetEstimateSize(dbSize, fsSize)
c.metrics.SetActualCounters(dbCount, fsCount)
return dbSize + fsSize
}
func (c *cache) incSizeDB(sz uint64) uint64 {
return sz + c.smallObjectSize
}
func (c *cache) incSizeFS(sz uint64) uint64 {
return sz + c.maxObjectSize
}
var _ fstree.FileCounter = &counters{}
type counters struct {
cDB, cFS atomic.Uint64
}
func (x *counters) DB() uint64 {
return x.cDB.Load()
}
func (x *counters) FS() uint64 {
return x.cFS.Load()
}
// Set implements fstree.ObjectCounter.
func (x *counters) Set(v uint64) {
x.cFS.Store(v)
}
// Inc implements fstree.ObjectCounter.
func (x *counters) Inc() {
x.cFS.Add(1)
}
// Dec implements fstree.ObjectCounter.
func (x *counters) Dec() {
x.cFS.Add(math.MaxUint64)
}
func (c *cache) initCounters() error {
var inDB uint64
err := c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
if b != nil {
inDB = uint64(b.Stats().KeyN)
}
return nil
})
if err != nil {
return fmt.Errorf("could not read write-cache DB counter: %w", err)
if size > 0 {
usize = uint64(size)
}
c.objCounters.cDB.Store(inDB)
c.estimateCacheSize()
return nil
c.metrics.SetEstimateSize(usize)
c.metrics.SetActualCounters(ucount)
}
func (c *cache) hasFreeSpace(sz uint64) bool {
count, size := c.counter.Value()
return (size+int64(sz) <= int64(c.maxCacheSize)) &&
(c.maxCacheCount == 0 || count+1 <= int64(c.maxCacheCount))
}
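Admission is now a two-part check against the real counters: the incoming object's estimated size (raw data length, or the payload size when raw data is absent, per `putBig` above) must fit under `maxCacheSize`, and when `maxCacheCount` is non-zero the object count must stay within it too. A standalone restatement with worked numbers (illustrative, not from the diff):

```go
// Same rule as cache.hasFreeSpace, lifted out for illustration.
func hasFreeSpace(count, size int64, maxSize, maxCount, objSize uint64) bool {
	return (size+int64(objSize) <= int64(maxSize)) &&
		(maxCount == 0 || count+1 <= int64(maxCount))
}

// hasFreeSpace(999,  1<<30-4096, 1<<30, 1000, 4096) == true  — fits both limits
// hasFreeSpace(999,  1<<30-4096, 1<<30, 1000, 8192) == false — size limit hit
// hasFreeSpace(1000, 1<<20,      1<<30, 1000, 4096) == false — count limit hit
```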

View file

@ -3,7 +3,6 @@ package writecache
import (
"context"
"fmt"
"math"
"os"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -14,49 +13,22 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
// store represents persistent storage with in-memory LRU cache
// for flushed items on top of it.
type store struct {
db *bbolt.DB
}
const dbName = "small.bolt"
func (c *cache) openStore(mod mode.ComponentMode) error {
err := util.MkdirAllX(c.path, os.ModePerm)
if err != nil {
return err
}
c.db, err = OpenDB(c.path, mod.ReadOnly(), c.openFile)
if err != nil {
return fmt.Errorf("could not open database: %w", err)
}
c.db.MaxBatchSize = c.maxBatchSize
c.db.MaxBatchDelay = c.maxBatchDelay
if !mod.ReadOnly() {
err = c.db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(defaultBucket)
return err
})
if err != nil {
return fmt.Errorf("could not create default bucket: %w", err)
}
}
c.fsTree = fstree.New(
fstree.WithPath(c.path),
fstree.WithPerm(os.ModePerm),
fstree.WithDepth(1),
fstree.WithDirNameLen(1),
fstree.WithNoSync(c.noSync),
fstree.WithFileCounter(&c.objCounters),
fstree.WithFileCounter(c.counter),
)
if err := c.fsTree.Open(mod); err != nil {
return fmt.Errorf("could not open FSTree: %w", err)
@ -68,41 +40,6 @@ func (c *cache) openStore(mod mode.ComponentMode) error {
return nil
}
func (c *cache) deleteFromDB(key string, batched bool) {
var recordDeleted bool
var err error
if batched {
err = c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
key := []byte(key)
recordDeleted = b.Get(key) != nil
return b.Delete(key)
})
} else {
err = c.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
key := []byte(key)
recordDeleted = b.Get(key) != nil
return b.Delete(key)
})
}
if err == nil {
c.metrics.Evict(StorageTypeDB)
storagelog.Write(c.log,
storagelog.AddressField(key),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db DELETE"),
)
if recordDeleted {
c.objCounters.cDB.Add(math.MaxUint64)
c.estimateCacheSize()
}
} else {
c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err))
}
}
func (c *cache) deleteFromDisk(ctx context.Context, addr oid.Address) {
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
if err != nil && !client.IsErrObjectNotFound(err) {

View file

@ -1,20 +0,0 @@
package writecache
import (
"io/fs"
"os"
"path/filepath"
"time"
"go.etcd.io/bbolt"
)
// OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true.
func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error)) (*bbolt.DB, error) {
return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{
NoFreelistSync: true,
ReadOnly: ro,
Timeout: 100 * time.Millisecond,
OpenFile: openFile,
})
}

View file

@ -37,6 +37,7 @@ type Cache interface {
DumpInfo() Info
Flush(context.Context, bool, bool) error
Seal(context.Context, bool) error
Iterate(context.Context, common.IteratePrm) (common.IterateRes, error)
Init() error
Open(ctx context.Context, mode mode.Mode) error
@ -54,6 +55,7 @@ type MainStorage interface {
// Metabase is the interface of the metabase used by Cache implementations.
type Metabase interface {
UpdateStorageID(context.Context, meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error)
StorageID(context.Context, meta.StorageIDPrm) (meta.StorageIDRes, error)
}
var (