diff --git a/cmd/frostfs-lens/internal/meta/inspect.go b/cmd/frostfs-lens/internal/meta/inspect.go index 9eb60f966..205c71dc6 100644 --- a/cmd/frostfs-lens/internal/meta/inspect.go +++ b/cmd/frostfs-lens/internal/meta/inspect.go @@ -33,13 +33,11 @@ func inspectFunc(cmd *cobra.Command, _ []string) { db := openMeta(cmd) defer db.Close() - storageID := meta.StorageIDPrm{} - storageID.SetAddress(addr) - + storageID := meta.StorageIDPrm{Address: addr} resStorageID, err := db.StorageID(cmd.Context(), storageID) common.ExitOnErr(cmd, common.Errf("could not check if the obj is small: %w", err)) - if id := resStorageID.StorageID(); id != nil { + if id := resStorageID.StorageID; id != nil { cmd.Printf("Object storageID: %s\n\n", blobovniczatree.NewIDFromBytes(id).Path()) } else { cmd.Printf("Object does not contain storageID\n\n") diff --git a/cmd/frostfs-lens/internal/writecache/inspect.go b/cmd/frostfs-lens/internal/writecache/inspect.go index afc986c8b..fbd760550 100644 --- a/cmd/frostfs-lens/internal/writecache/inspect.go +++ b/cmd/frostfs-lens/internal/writecache/inspect.go @@ -1,11 +1,10 @@ package writecache import ( - "os" - common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" - objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/spf13/cobra" ) @@ -23,18 +22,20 @@ func init() { } func inspectFunc(cmd *cobra.Command, _ []string) { - var data []byte + wc := writecache.New( + writecache.WithPath(vPath), + ) + common.ExitOnErr(cmd, common.Errf("could not open write-cache: %w", wc.Open(cmd.Context(), mode.ReadOnly))) + defer wc.Close() - db, err := writecache.OpenDB(vPath, true, os.OpenFile) - common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err)) - defer db.Close() + var addr 
oid.Address + common.ExitOnErr(cmd, common.Errf("could not decode address: %w", addr.DecodeString(vAddress))) - data, err = writecache.Get(db, []byte(vAddress)) + obj, err := wc.Get(cmd.Context(), addr) common.ExitOnErr(cmd, common.Errf("could not fetch object: %w", err)) - var o objectSDK.Object - common.ExitOnErr(cmd, common.Errf("could not unmarshal object: %w", o.Unmarshal(data))) - - common.PrintObjectHeader(cmd, o) + common.PrintObjectHeader(cmd, *obj) + data, err := obj.Marshal() + common.ExitOnErr(cmd, common.Errf("could not marshal object: %w", err)) common.WriteObjectToFile(cmd, vOut, data) } diff --git a/cmd/frostfs-lens/internal/writecache/list.go b/cmd/frostfs-lens/internal/writecache/list.go index bcbae0ec9..bb19cd108 100644 --- a/cmd/frostfs-lens/internal/writecache/list.go +++ b/cmd/frostfs-lens/internal/writecache/list.go @@ -3,11 +3,11 @@ package writecache import ( "fmt" "io" - "os" common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal" + blobstor "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/spf13/cobra" ) @@ -26,15 +26,18 @@ func listFunc(cmd *cobra.Command, _ []string) { // other targets can be supported w := cmd.OutOrStderr() - wAddr := func(addr oid.Address) error { - _, err := io.WriteString(w, fmt.Sprintf("%s\n", addr)) + wc := writecache.New( + writecache.WithPath(vPath), + ) + common.ExitOnErr(cmd, common.Errf("could not open write-cache: %w", wc.Open(cmd.Context(), mode.ReadOnly))) + defer wc.Close() + + var prm blobstor.IteratePrm + prm.IgnoreErrors = true + prm.Handler = func(ie blobstor.IterationElement) error { + _, err := io.WriteString(w, fmt.Sprintf("%s\n", ie.Address)) return err } - - db, err := writecache.OpenDB(vPath, true, 
os.OpenFile) - common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err)) - defer db.Close() - - err = writecache.IterateDB(db, wAddr) - common.ExitOnErr(cmd, common.Errf("write-cache iterator failure: %w", err)) + _, err := wc.Iterate(cmd.Context(), prm) + common.ExitOnErr(cmd, common.Errf("could not iterate write-cache: %w", err)) } diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 5b91e7819..446d7bb3e 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -146,12 +146,10 @@ type shardCfg struct { writecacheCfg struct { enabled bool path string - maxBatchSize int - maxBatchDelay time.Duration - smallObjectSize uint64 maxObjSize uint64 flushWorkerCount int sizeLimit uint64 + countLimit uint64 noSync bool } @@ -269,12 +267,10 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg, wc.enabled = true wc.path = writeCacheCfg.Path() - wc.maxBatchSize = writeCacheCfg.BoltDB().MaxBatchSize() - wc.maxBatchDelay = writeCacheCfg.BoltDB().MaxBatchDelay() wc.maxObjSize = writeCacheCfg.MaxObjectSize() - wc.smallObjectSize = writeCacheCfg.SmallObjectSize() wc.flushWorkerCount = writeCacheCfg.WorkerCount() wc.sizeLimit = writeCacheCfg.SizeLimit() + wc.countLimit = writeCacheCfg.CountLimit() wc.noSync = writeCacheCfg.NoSync() } } @@ -861,12 +857,10 @@ func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option { if wcRead := shCfg.writecacheCfg; wcRead.enabled { writeCacheOpts = append(writeCacheOpts, writecache.WithPath(wcRead.path), - writecache.WithMaxBatchSize(wcRead.maxBatchSize), - writecache.WithMaxBatchDelay(wcRead.maxBatchDelay), writecache.WithMaxObjectSize(wcRead.maxObjSize), - writecache.WithSmallObjectSize(wcRead.smallObjectSize), writecache.WithFlushWorkersCount(wcRead.flushWorkerCount), writecache.WithMaxCacheSize(wcRead.sizeLimit), + writecache.WithMaxCacheCount(wcRead.countLimit), writecache.WithNoSync(wcRead.noSync), writecache.WithLogger(c.log), ) diff --git 
a/cmd/frostfs-node/config/engine/config_test.go b/cmd/frostfs-node/config/engine/config_test.go index 7473afefb..0f191fb37 100644 --- a/cmd/frostfs-node/config/engine/config_test.go +++ b/cmd/frostfs-node/config/engine/config_test.go @@ -74,10 +74,10 @@ func TestEngineSection(t *testing.T) { require.Equal(t, true, wc.NoSync()) require.Equal(t, "tmp/0/cache", wc.Path()) - require.EqualValues(t, 16384, wc.SmallObjectSize()) require.EqualValues(t, 134217728, wc.MaxObjectSize()) require.EqualValues(t, 30, wc.WorkerCount()) require.EqualValues(t, 3221225472, wc.SizeLimit()) + require.EqualValues(t, 0, wc.CountLimit()) require.Equal(t, "tmp/0/meta", meta.Path()) require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm()) @@ -129,10 +129,10 @@ func TestEngineSection(t *testing.T) { require.Equal(t, false, wc.NoSync()) require.Equal(t, "tmp/1/cache", wc.Path()) - require.EqualValues(t, 16384, wc.SmallObjectSize()) require.EqualValues(t, 134217728, wc.MaxObjectSize()) require.EqualValues(t, 30, wc.WorkerCount()) require.EqualValues(t, 4294967296, wc.SizeLimit()) + require.EqualValues(t, 10000, wc.CountLimit()) require.Equal(t, "tmp/1/meta", meta.Path()) require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm()) diff --git a/cmd/frostfs-node/config/engine/shard/writecache/config.go b/cmd/frostfs-node/config/engine/shard/writecache/config.go index 5e31e04ad..9293ad5fe 100644 --- a/cmd/frostfs-node/config/engine/shard/writecache/config.go +++ b/cmd/frostfs-node/config/engine/shard/writecache/config.go @@ -10,9 +10,6 @@ import ( type Config config.Config const ( - // SmallSizeDefault is a default size of small objects. - SmallSizeDefault = 32 << 10 - // MaxSizeDefault is a default value of the object payload size limit. MaxSizeDefault = 64 << 20 @@ -51,22 +48,6 @@ func (x *Config) Path() string { return p } -// SmallObjectSize returns the value of "small_object_size" config parameter. -// -// Returns SmallSizeDefault if the value is not a positive number. 
-func (x *Config) SmallObjectSize() uint64 { - s := config.SizeInBytesSafe( - (*config.Config)(x), - "small_object_size", - ) - - if s > 0 { - return s - } - - return SmallSizeDefault -} - // MaxObjectSize returns the value of "max_object_size" config parameter. // // Returns MaxSizeDefault if the value is not a positive number. @@ -99,11 +80,20 @@ func (x *Config) WorkerCount() int { return WorkersNumberDefault } -// SizeLimit returns the value of "capacity" config parameter. +// SizeLimit returns the value of "capacity_size" or "capacity" config parameter. // // Returns SizeLimitDefault if the value is not a positive number. func (x *Config) SizeLimit() uint64 { c := config.SizeInBytesSafe( + (*config.Config)(x), + "capacity_size", + ) + + if c > 0 { + return c + } + + c = config.SizeInBytesSafe( (*config.Config)(x), "capacity", ) @@ -115,6 +105,16 @@ func (x *Config) SizeLimit() uint64 { return SizeLimitDefault } +// CountLimit returns the value of "capacity_count" config parameter. +// +// Returns 0 (means no limit) if the value is not a positive number. +func (x *Config) CountLimit() uint64 { + return config.SizeInBytesSafe( + (*config.Config)(x), + "capacity_count", + ) +} + // NoSync returns the value of "no_sync" config parameter. // // Returns false if the value is not a boolean. 
diff --git a/config/example/node.env b/config/example/node.env index 00190eb39..e49821c2b 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -101,7 +101,6 @@ FROSTFS_STORAGE_SHARD_0_MODE=read-only FROSTFS_STORAGE_SHARD_0_WRITECACHE_ENABLED=false FROSTFS_STORAGE_SHARD_0_WRITECACHE_NO_SYNC=true FROSTFS_STORAGE_SHARD_0_WRITECACHE_PATH=tmp/0/cache -FROSTFS_STORAGE_SHARD_0_WRITECACHE_SMALL_OBJECT_SIZE=16384 FROSTFS_STORAGE_SHARD_0_WRITECACHE_MAX_OBJECT_SIZE=134217728 FROSTFS_STORAGE_SHARD_0_WRITECACHE_FLUSH_WORKER_COUNT=30 FROSTFS_STORAGE_SHARD_0_WRITECACHE_CAPACITY=3221225472 @@ -155,10 +154,10 @@ FROSTFS_STORAGE_SHARD_1_MODE=read-write ### Write cache config FROSTFS_STORAGE_SHARD_1_WRITECACHE_ENABLED=true FROSTFS_STORAGE_SHARD_1_WRITECACHE_PATH=tmp/1/cache -FROSTFS_STORAGE_SHARD_1_WRITECACHE_SMALL_OBJECT_SIZE=16384 FROSTFS_STORAGE_SHARD_1_WRITECACHE_MAX_OBJECT_SIZE=134217728 FROSTFS_STORAGE_SHARD_1_WRITECACHE_FLUSH_WORKER_COUNT=30 -FROSTFS_STORAGE_SHARD_1_WRITECACHE_CAPACITY=4294967296 +FROSTFS_STORAGE_SHARD_1_WRITECACHE_CAPACITY_SIZE=4294967296 +FROSTFS_STORAGE_SHARD_1_WRITECACHE_CAPACITY_COUNT=10000 ### Metabase config FROSTFS_STORAGE_SHARD_1_METABASE_PATH=tmp/1/meta FROSTFS_STORAGE_SHARD_1_METABASE_PERM=0644 diff --git a/config/example/node.json b/config/example/node.json index 9051d2bb7..a2b699703 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -145,7 +145,6 @@ "enabled": false, "no_sync": true, "path": "tmp/0/cache", - "small_object_size": 16384, "max_object_size": 134217728, "flush_worker_count": 30, "capacity": 3221225472 @@ -203,10 +202,10 @@ "enabled": true, "path": "tmp/1/cache", "memcache_capacity": 2147483648, - "small_object_size": 16384, "max_object_size": 134217728, "flush_worker_count": 30, - "capacity": 4294967296 + "capacity_size": 4294967296, + "capacity_count": 10000 }, "metabase": { "path": "tmp/1/meta", diff --git a/config/example/node.yaml b/config/example/node.yaml index bcc8552b3..50e46630c 100644 --- 
a/config/example/node.yaml +++ b/config/example/node.yaml @@ -125,7 +125,6 @@ storage: writecache: enabled: true - small_object_size: 16k # size threshold for "small" objects which are cached in key-value DB, not in FS, bytes max_object_size: 134217728 # size threshold for "big" objects which bypass write-cache and go to the storage directly, bytes flush_worker_count: 30 # number of write-cache flusher threads @@ -208,7 +207,8 @@ storage: 1: writecache: path: tmp/1/cache # write-cache root directory - capacity: 4 G # approximate write-cache total size, bytes + capacity_size: 4 G # approximate write-cache total size, bytes + capacity_count: 10000 metabase: path: tmp/1/meta # metabase path diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index 4a6e5ba6d..f6657980a 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -286,21 +286,20 @@ metabase: writecache: enabled: true path: /path/to/writecache - capacity: 4294967296 - small_object_size: 16384 + capacity_size: 4294967296 + capacity_count: 100000 max_object_size: 134217728 flush_worker_count: 30 ``` -| Parameter | Type | Default value | Description | -|----------------------|------------|---------------|----------------------------------------------------------------------------------------------------------------------| -| `path` | `string` | | Path to the metabase file. | -| `capacity` | `size` | unrestricted | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly. | -| `small_object_size` | `size` | `32K` | Maximum object size for "small" objects. This objects are stored in a key-value database instead of a file-system. | -| `max_object_size` | `size` | `64M` | Maximum object size allowed to be stored in the writecache. | -| `flush_worker_count` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. 
| -| `max_batch_size` | `int` | `1000` | Maximum amount of small object `PUT` operations to perform in a single transaction. | -| `max_batch_delay` | `duration` | `10ms` | Maximum delay before a batch starts. | +| Parameter | Type | Default value | Description | +|----------------------|------------|---------------|----------------------------------------------------------------------------------------------------------------------------------| +| `path` | `string` | | Path to the write-cache directory. | +| `capacity_size` | `size` | `1G` | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly. | +| `capacity` | `size` | `1G` | The same as `capacity_size`. Deprecated; use `capacity_size` instead. | +| `capacity_count` | `int` | unrestricted | Approximate maximum count of objects in the writecache. If the writecache is full, objects are written to the blobstor directly. | +| `max_object_size` | `size` | `64M` | Maximum object size allowed to be stored in the writecache. | +| `flush_worker_count` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. 
| # `node` section diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 67f173f29..f73866861 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -278,7 +278,6 @@ const ( ShardCouldNotMarkObjectToDeleteInMetabase = "could not mark object to delete in metabase" WritecacheTriedToFlushItemsFromWritecache = "tried to flush items from write-cache" WritecacheWaitingForChannelsToFlush = "waiting for channels to flush" - WritecacheCantRemoveObjectsFromTheDatabase = "can't remove objects from the database" WritecacheCantRemoveObjectFromWritecache = "can't remove object from write-cache" BlobovniczatreeCouldNotGetObjectFromLevel = "could not get object from level" BlobovniczatreeCouldNotCloseBlobovnicza = "could not close Blobovnicza" @@ -469,6 +468,7 @@ const ( FSTreeCantUnmarshalObject = "can't unmarshal an object" FSTreeCantFushObjectBlobstor = "can't flush an object to blobstor" FSTreeCantUpdateID = "can't update object storage ID" + FSTreeCantGetID = "can't get object storage ID" FSTreeCantDecodeDBObjectAddress = "can't decode object address from the DB" PutSingleRedirectFailure = "failed to redirect PutSingle request" StorageIDRetrievalFailure = "can't get storage ID from metabase" diff --git a/pkg/local_object_storage/blobstor/fstree/counter.go b/pkg/local_object_storage/blobstor/fstree/counter.go index 718104e2e..c1c6ff6fa 100644 --- a/pkg/local_object_storage/blobstor/fstree/counter.go +++ b/pkg/local_object_storage/blobstor/fstree/counter.go @@ -1,22 +1,23 @@ package fstree import ( - "math" "sync/atomic" ) // FileCounter used to count files in FSTree. The implementation must be thread-safe. 
type FileCounter interface { - Set(v uint64) - Inc() - Dec() + Set(count, size int64) + Inc(size int64) + Dec(size int64) + Value() (int64, int64) } type noopCounter struct{} -func (c *noopCounter) Set(uint64) {} -func (c *noopCounter) Inc() {} -func (c *noopCounter) Dec() {} +func (c *noopCounter) Set(int64, int64) {} +func (c *noopCounter) Inc(int64) {} +func (c *noopCounter) Dec(int64) {} +func (c *noopCounter) Value() (int64, int64) { return 0, 0 } func counterEnabled(c FileCounter) bool { _, noop := c.(*noopCounter) @@ -24,14 +25,29 @@ func counterEnabled(c FileCounter) bool { } type SimpleCounter struct { - v atomic.Uint64 + count atomic.Int64 + size atomic.Int64 } func NewSimpleCounter() *SimpleCounter { return &SimpleCounter{} } -func (c *SimpleCounter) Set(v uint64) { c.v.Store(v) } -func (c *SimpleCounter) Inc() { c.v.Add(1) } -func (c *SimpleCounter) Dec() { c.v.Add(math.MaxUint64) } -func (c *SimpleCounter) Value() uint64 { return c.v.Load() } +func (c *SimpleCounter) Set(count, size int64) { + c.count.Store(count) + c.size.Store(size) +} + +func (c *SimpleCounter) Inc(size int64) { + c.count.Add(1) + c.size.Add(size) +} + +func (c *SimpleCounter) Dec(size int64) { + c.count.Add(-1) + c.size.Add(-size) +} + +func (c *SimpleCounter) Value() (int64, int64) { + return c.count.Load(), c.size.Load() +} diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index 02580dbfa..cf0affdb6 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -435,32 +435,38 @@ func (t *FSTree) initFileCounter() error { return nil } - counter, err := t.countFiles() + count, size, err := t.countFiles() if err != nil { return err } - t.fileCounter.Set(counter) + t.fileCounter.Set(count, size) return nil } -func (t *FSTree) countFiles() (uint64, error) { - var counter uint64 +func (t *FSTree) countFiles() (int64, int64, error) { + var count int64 + var 
size int64 // it is simpler to just consider every file // that is not directory as an object err := filepath.WalkDir(t.RootPath, func(_ string, d fs.DirEntry, _ error) error { if !d.IsDir() { - counter++ + count++ + fi, err := d.Info() + if err != nil { + return err + } + size += fi.Size() } return nil }, ) if err != nil { - return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err) + return 0, 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err) } - return counter, nil + return count, size, nil } func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) { diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_test.go b/pkg/local_object_storage/blobstor/fstree/fstree_test.go index d633cbac3..f89df7405 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_test.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_test.go @@ -47,8 +47,9 @@ func TestObjectCounter(t *testing.T) { require.NoError(t, fst.Open(mode.ComponentReadWrite)) require.NoError(t, fst.Init()) - counterValue := counter.Value() - require.Equal(t, uint64(0), counterValue) + counterValue, sizeValue := counter.Value() + require.Equal(t, int64(0), counterValue) + require.Equal(t, int64(0), sizeValue) defer func() { require.NoError(t, fst.Close()) @@ -64,9 +65,6 @@ func TestObjectCounter(t *testing.T) { putPrm.Address = addr putPrm.RawData, _ = obj.Marshal() - var getPrm common.GetPrm - getPrm.Address = putPrm.Address - var delPrm common.DeletePrm delPrm.Address = addr @@ -95,8 +93,9 @@ func TestObjectCounter(t *testing.T) { require.NoError(t, eg.Wait()) - counterValue = counter.Value() - realCount, err := fst.countFiles() + counterValue, sizeValue = counter.Value() + realCount, realSize, err := fst.countFiles() require.NoError(t, err) require.Equal(t, realCount, counterValue) + require.Equal(t, realSize, sizeValue) } diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go 
b/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go index 8b2622885..cf9941ec8 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go @@ -78,14 +78,15 @@ func (w *genericWriter) writeAndRename(tmpPath, p string, data []byte) error { } if w.fileCounterEnabled { - w.fileCounter.Inc() + w.fileCounter.Inc(int64(len(data))) var targetFileExists bool - if _, e := os.Stat(p); e == nil { + s, e := os.Stat(p) + if e == nil { targetFileExists = true } err = os.Rename(tmpPath, p) if err == nil && targetFileExists { - w.fileCounter.Dec() + w.fileCounter.Dec(int64(s.Size())) } } else { err = os.Rename(tmpPath, p) @@ -108,20 +109,31 @@ func (w *genericWriter) writeFile(p string, data []byte) error { } func (w *genericWriter) removeFile(p string) error { - var err error if w.fileCounterEnabled { - w.fileGuard.Lock(p) - err = os.Remove(p) - w.fileGuard.Unlock(p) - if err == nil { - w.fileCounter.Dec() - } - } else { - err = os.Remove(p) + return w.removeFileWithCounter(p) } + err := os.Remove(p) if err != nil && os.IsNotExist(err) { err = logicerr.Wrap(new(apistatus.ObjectNotFound)) } return err } + +func (w *genericWriter) removeFileWithCounter(p string) error { + w.fileGuard.Lock(p) + defer w.fileGuard.Unlock(p) + + s, err := os.Stat(p) + if err != nil && os.IsNotExist(err) { + return logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + if err != nil { + return err + } + err = os.Remove(p) + if err == nil { + w.fileCounter.Dec(s.Size()) + } + return err +} diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go b/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go index efc5a3d3d..774e996f9 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go @@ -18,7 +18,8 @@ type linuxWriter struct { perm uint32 flags int - counter FileCounter + counter FileCounter + 
counterEnabled bool } func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync bool) writer { @@ -34,10 +35,11 @@ func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync b } _ = unix.Close(fd) // Don't care about error. w := &linuxWriter{ - root: root, - perm: uint32(perm), - flags: flags, - counter: c, + root: root, + perm: uint32(perm), + flags: flags, + counter: c, + counterEnabled: counterEnabled(c), } return w } @@ -61,7 +63,7 @@ func (w *linuxWriter) writeFile(p string, data []byte) error { if n == len(data) { err = unix.Linkat(unix.AT_FDCWD, tmpPath, unix.AT_FDCWD, p, unix.AT_SYMLINK_FOLLOW) if err == nil { - w.counter.Inc() + w.counter.Inc(int64(len(data))) } if errors.Is(err, unix.EEXIST) { err = nil @@ -78,12 +80,22 @@ func (w *linuxWriter) writeFile(p string, data []byte) error { } func (w *linuxWriter) removeFile(p string) error { + var s unix.Stat_t + if w.counterEnabled { + err := unix.Stat(p, &s) + if err != nil && err == unix.ENOENT { + return logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + if err != nil { + return err + } + } err := unix.Unlink(p) if err != nil && err == unix.ENOENT { return logicerr.Wrap(new(apistatus.ObjectNotFound)) } if err == nil { - w.counter.Dec() + w.counter.Dec(s.Size) } return err } diff --git a/pkg/local_object_storage/engine/writecache.go b/pkg/local_object_storage/engine/writecache.go index da488260a..2edbd2d6b 100644 --- a/pkg/local_object_storage/engine/writecache.go +++ b/pkg/local_object_storage/engine/writecache.go @@ -166,8 +166,7 @@ func (m *writeCacheMetrics) Put(d time.Duration, success bool, st writecache.Sto m.metrics.AddMethodDuration(m.shardID, m.path, st.String(), "Put", success, d) } -func (m *writeCacheMetrics) SetEstimateSize(db, fstree uint64) { - m.metrics.SetEstimateSize(m.shardID, m.path, writecache.StorageTypeDB.String(), db) +func (m *writeCacheMetrics) SetEstimateSize(fstree uint64) { m.metrics.SetEstimateSize(m.shardID, m.path, 
writecache.StorageTypeFSTree.String(), fstree) } @@ -175,8 +174,7 @@ func (m *writeCacheMetrics) SetMode(mod mode.ComponentMode) { m.metrics.SetMode(m.shardID, mod.String()) } -func (m *writeCacheMetrics) SetActualCounters(db, fstree uint64) { - m.metrics.SetActualCount(m.shardID, m.path, writecache.StorageTypeDB.String(), db) +func (m *writeCacheMetrics) SetActualCounters(fstree uint64) { m.metrics.SetActualCount(m.shardID, m.path, writecache.StorageTypeFSTree.String(), fstree) } diff --git a/pkg/local_object_storage/metabase/storage_id.go b/pkg/local_object_storage/metabase/storage_id.go index 6d620b41a..657334d17 100644 --- a/pkg/local_object_storage/metabase/storage_id.go +++ b/pkg/local_object_storage/metabase/storage_id.go @@ -15,22 +15,12 @@ import ( // StorageIDPrm groups the parameters of StorageID operation. type StorageIDPrm struct { - addr oid.Address + Address oid.Address } // StorageIDRes groups the resulting values of StorageID operation. type StorageIDRes struct { - id []byte -} - -// SetAddress is a StorageID option to set the object address to check. -func (p *StorageIDPrm) SetAddress(addr oid.Address) { - p.addr = addr -} - -// StorageID returns storage ID. -func (r StorageIDRes) StorageID() []byte { - return r.id + StorageID []byte } // StorageID returns storage descriptor for objects from the blobstor. 
@@ -46,7 +36,7 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes _, span := tracing.StartSpanFromContext(ctx, "metabase.StorageID", trace.WithAttributes( - attribute.String("address", prm.addr.EncodeToString()), + attribute.String("address", prm.Address.EncodeToString()), )) defer span.End() @@ -58,7 +48,7 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes } err = db.boltDB.View(func(tx *bbolt.Tx) error { - res.id, err = db.storageID(tx, prm.addr) + res.StorageID, err = db.storageID(tx, prm.Address) return err }) @@ -83,23 +73,13 @@ func (db *DB) storageID(tx *bbolt.Tx, addr oid.Address) ([]byte, error) { // UpdateStorageIDPrm groups the parameters of UpdateStorageID operation. type UpdateStorageIDPrm struct { - addr oid.Address - id []byte + Address oid.Address + StorageID []byte } // UpdateStorageIDRes groups the resulting values of UpdateStorageID operation. type UpdateStorageIDRes struct{} -// SetAddress is an UpdateStorageID option to set the object address to check. -func (p *UpdateStorageIDPrm) SetAddress(addr oid.Address) { - p.addr = addr -} - -// SetStorageID is an UpdateStorageID option to set the storage ID. -func (p *UpdateStorageIDPrm) SetStorageID(id []byte) { - p.id = id -} - // UpdateStorageID updates storage descriptor for objects from the blobstor. 
func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res UpdateStorageIDRes, err error) { var ( @@ -112,8 +92,8 @@ func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res _, span := tracing.StartSpanFromContext(ctx, "metabase.UpdateStorageID", trace.WithAttributes( - attribute.String("address", prm.addr.EncodeToString()), - attribute.String("storage_id", string(prm.id)), + attribute.String("address", prm.Address.EncodeToString()), + attribute.String("storage_id", string(prm.StorageID)), )) defer span.End() @@ -127,7 +107,7 @@ func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res } err = db.boltDB.Batch(func(tx *bbolt.Tx) error { - return setStorageID(tx, prm.addr, prm.id, true) + return setStorageID(tx, prm.Address, prm.StorageID, true) }) success = err == nil return res, metaerr.Wrap(err) diff --git a/pkg/local_object_storage/metabase/storage_id_test.go b/pkg/local_object_storage/metabase/storage_id_test.go index aaf6480ab..d6e4a2290 100644 --- a/pkg/local_object_storage/metabase/storage_id_test.go +++ b/pkg/local_object_storage/metabase/storage_id_test.go @@ -102,18 +102,13 @@ func TestPutWritecacheDataRace(t *testing.T) { } func metaUpdateStorageID(db *meta.DB, addr oid.Address, id []byte) error { - var sidPrm meta.UpdateStorageIDPrm - sidPrm.SetAddress(addr) - sidPrm.SetStorageID(id) - + sidPrm := meta.UpdateStorageIDPrm{Address: addr, StorageID: id} _, err := db.UpdateStorageID(context.Background(), sidPrm) return err } func metaStorageID(db *meta.DB, addr oid.Address) ([]byte, error) { - var sidPrm meta.StorageIDPrm - sidPrm.SetAddress(addr) - + sidPrm := meta.StorageIDPrm{Address: addr} r, err := db.StorageID(context.Background(), sidPrm) - return r.StorageID(), err + return r.StorageID, err } diff --git a/pkg/local_object_storage/shard/delete.go b/pkg/local_object_storage/shard/delete.go index c898fdf41..4b57f82b6 100644 --- a/pkg/local_object_storage/shard/delete.go +++ 
b/pkg/local_object_storage/shard/delete.go @@ -105,9 +105,7 @@ func (s *Shard) validateWritecacheDoesntContainObject(ctx context.Context, addr } func (s *Shard) deleteFromBlobstor(ctx context.Context, addr oid.Address) error { - var sPrm meta.StorageIDPrm - sPrm.SetAddress(addr) - + sPrm := meta.StorageIDPrm{Address: addr} res, err := s.metaBase.StorageID(ctx, sPrm) if err != nil { s.log.Debug(logs.StorageIDRetrievalFailure, @@ -116,7 +114,7 @@ func (s *Shard) deleteFromBlobstor(ctx context.Context, addr oid.Address) error zap.String("trace_id", tracingPkg.GetTraceID(ctx))) return err } - storageID := res.StorageID() + storageID := res.StorageID if storageID == nil { // if storageID is nil it means: // 1. there is no such object diff --git a/pkg/local_object_storage/shard/gc_internal_test.go b/pkg/local_object_storage/shard/gc_internal_test.go index 3993593ad..5e4b9b5a2 100644 --- a/pkg/local_object_storage/shard/gc_internal_test.go +++ b/pkg/local_object_storage/shard/gc_internal_test.go @@ -109,15 +109,14 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) { require.True(t, client.IsErrObjectNotFound(err), "invalid error type") // storageID - var metaStIDPrm meta.StorageIDPrm - metaStIDPrm.SetAddress(addr) + metaStIDPrm := meta.StorageIDPrm{Address: addr} storageID, err := sh.metaBase.StorageID(context.Background(), metaStIDPrm) require.NoError(t, err, "failed to get storage ID") // check existence in blobstore var bsExisted common.ExistsPrm bsExisted.Address = addr - bsExisted.StorageID = storageID.StorageID() + bsExisted.StorageID = storageID.StorageID exRes, err := sh.blobStor.Exists(context.Background(), bsExisted) require.NoError(t, err, "failed to check blobstore existence") require.True(t, exRes.Exists, "invalid blobstore existence result") @@ -125,7 +124,7 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) { // drop from blobstor var bsDeletePrm common.DeletePrm bsDeletePrm.Address = addr - bsDeletePrm.StorageID = 
storageID.StorageID() + bsDeletePrm.StorageID = storageID.StorageID _, err = sh.blobStor.Delete(context.Background(), bsDeletePrm) require.NoError(t, err, "failed to delete from blobstore") diff --git a/pkg/local_object_storage/shard/get.go b/pkg/local_object_storage/shard/get.go index 2e7c84bcd..8b43b4fc3 100644 --- a/pkg/local_object_storage/shard/get.go +++ b/pkg/local_object_storage/shard/get.go @@ -160,15 +160,14 @@ func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta return res, false, err } - var mPrm meta.StorageIDPrm - mPrm.SetAddress(addr) + mPrm := meta.StorageIDPrm{Address: addr} mExRes, err := s.metaBase.StorageID(ctx, mPrm) if err != nil { return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err) } - storageID := mExRes.StorageID() + storageID := mExRes.StorageID if storageID == nil { // `nil` storageID returned without any error // means that object is big, `cb` expects an diff --git a/pkg/local_object_storage/shard/rebuilder.go b/pkg/local_object_storage/shard/rebuilder.go index f18573c57..06a254319 100644 --- a/pkg/local_object_storage/shard/rebuilder.go +++ b/pkg/local_object_storage/shard/rebuilder.go @@ -90,9 +90,7 @@ func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Addres return errMBIsNotAvailable } - var prm meta.UpdateStorageIDPrm - prm.SetAddress(addr) - prm.SetStorageID(storageID) + prm := meta.UpdateStorageIDPrm{Address: addr, StorageID: storageID} _, err := u.mb.UpdateStorageID(ctx, prm) return err } diff --git a/pkg/local_object_storage/writecache/benchmark/writecache_test.go b/pkg/local_object_storage/writecache/benchmark/writecache_test.go index c1c0e88b3..495f66b0d 100644 --- a/pkg/local_object_storage/writecache/benchmark/writecache_test.go +++ b/pkg/local_object_storage/writecache/benchmark/writecache_test.go @@ -2,6 +2,7 @@ package benchmark import ( "context" + "sync" "testing" 
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" @@ -10,6 +11,7 @@ import ( meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/stretchr/testify/require" ) @@ -80,12 +82,30 @@ func benchmarkPutPrepare(b *testing.B, cache writecache.Cache) { require.NoError(b, cache.Init(), "initializing") } -type testMetabase struct{} +type testMetabase struct { + storageIDs map[oid.Address][]byte + guard *sync.RWMutex +} -func (testMetabase) UpdateStorageID(context.Context, meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error) { +func (t *testMetabase) UpdateStorageID(_ context.Context, prm meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error) { + t.guard.Lock() + defer t.guard.Unlock() + t.storageIDs[prm.Address] = prm.StorageID return meta.UpdateStorageIDRes{}, nil } +func (t *testMetabase) StorageID(_ context.Context, prm meta.StorageIDPrm) (meta.StorageIDRes, error) { + t.guard.RLock() + defer t.guard.RUnlock() + + if id, found := t.storageIDs[prm.Address]; found { + return meta.StorageIDRes{ + StorageID: id, + }, nil + } + return meta.StorageIDRes{}, nil +} + func newCache(b *testing.B) writecache.Cache { bs := teststore.New( teststore.WithPut(func(pp common.PutPrm) (common.PutRes, error) { return common.PutRes{}, nil }), @@ -93,8 +113,7 @@ func newCache(b *testing.B) writecache.Cache { return writecache.New( writecache.WithPath(b.TempDir()), writecache.WithBlobstor(bs), - writecache.WithMetabase(testMetabase{}), + writecache.WithMetabase(&testMetabase{storageIDs: make(map[oid.Address][]byte), guard: &sync.RWMutex{}}), writecache.WithMaxCacheSize(256<<30), - writecache.WithSmallObjectSize(128<<10), ) } diff --git a/pkg/local_object_storage/writecache/cachebbolt.go 
b/pkg/local_object_storage/writecache/cache.go similarity index 70% rename from pkg/local_object_storage/writecache/cachebbolt.go rename to pkg/local_object_storage/writecache/cache.go index cdd4ed442..a6d055dba 100644 --- a/pkg/local_object_storage/writecache/cachebbolt.go +++ b/pkg/local_object_storage/writecache/cache.go @@ -10,8 +10,8 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - "go.etcd.io/bbolt" + utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" ) @@ -28,14 +28,13 @@ type cache struct { // whether object should be compressed. compressFlags map[string]struct{} - // flushCh is a channel with objects to flush. - flushCh chan objectInfo + flushCh chan objectInfo + flushingGuard *utilSync.KeyLocker[oid.Address] + // cancel is cancel function, protected by modeMtx in Close. cancel atomic.Value // wg is a wait group for flush workers. wg sync.WaitGroup - // store contains underlying database. - store // fsTree contains big files stored directly on file-system. fsTree *fstree.FSTree } @@ -43,40 +42,28 @@ type cache struct { // wcStorageType is used for write-cache operations logging. const wcStorageType = "write-cache" -type objectInfo struct { - addr string - data []byte - obj *objectSDK.Object -} - const ( - defaultMaxObjectSize = 64 * 1024 * 1024 // 64 MiB - defaultSmallObjectSize = 32 * 1024 // 32 KiB - defaultMaxCacheSize = 1 << 30 // 1 GiB + defaultMaxObjectSize = 64 * 1024 * 1024 // 64 MiB + defaultMaxCacheSize = 1 << 30 // 1 GiB ) -var ( - defaultBucket = []byte{0} - dummyCanceler context.CancelFunc = func() {} -) +var dummyCanceler context.CancelFunc = func() {} // New creates new writecache instance. 
func New(opts ...Option) Cache { c := &cache{ - flushCh: make(chan objectInfo), - mode: mode.Disabled, - + mode: mode.Disabled, + flushCh: make(chan objectInfo), + flushingGuard: utilSync.NewKeyLocker[oid.Address](), compressFlags: make(map[string]struct{}), options: options{ - log: &logger.Logger{Logger: zap.NewNop()}, - maxObjectSize: defaultMaxObjectSize, - smallObjectSize: defaultSmallObjectSize, - workersCount: defaultFlushWorkersCount, - maxCacheSize: defaultMaxCacheSize, - maxBatchSize: bbolt.DefaultMaxBatchSize, - maxBatchDelay: bbolt.DefaultMaxBatchDelay, - openFile: os.OpenFile, - metrics: DefaultMetrics(), + log: &logger.Logger{Logger: zap.NewNop()}, + maxObjectSize: defaultMaxObjectSize, + workersCount: defaultFlushWorkersCount, + maxCacheSize: defaultMaxCacheSize, + openFile: os.OpenFile, + metrics: DefaultMetrics(), + counter: fstree.NewSimpleCounter(), }, } @@ -111,7 +98,8 @@ func (c *cache) Open(_ context.Context, mod mode.Mode) error { return metaerr.Wrap(err) } - return metaerr.Wrap(c.initCounters()) + c.estimateCacheSize() + return nil } // Init runs necessary services. 
@@ -138,14 +126,6 @@ func (c *cache) Close() error { c.modeMtx.Lock() defer c.modeMtx.Unlock() - - var err error - if c.db != nil { - err = c.db.Close() - if err != nil { - c.db = nil - } - } c.metrics.Close() return nil } diff --git a/pkg/local_object_storage/writecache/delete.go b/pkg/local_object_storage/writecache/delete.go index b1a0511ee..e3034e1dc 100644 --- a/pkg/local_object_storage/writecache/delete.go +++ b/pkg/local_object_storage/writecache/delete.go @@ -2,7 +2,6 @@ package writecache import ( "context" - "math" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" @@ -10,7 +9,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -47,39 +45,6 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error { saddr := addr.EncodeToString() - var dataSize int - _ = c.db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - dataSize = len(b.Get([]byte(saddr))) - return nil - }) - - if dataSize > 0 { - storageType = StorageTypeDB - var recordDeleted bool - err := c.db.Update(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - key := []byte(saddr) - recordDeleted = b.Get(key) != nil - err := b.Delete(key) - return err - }) - if err != nil { - return err - } - storagelog.Write(c.log, - storagelog.AddressField(saddr), - storagelog.StorageTypeField(wcStorageType), - storagelog.OpField("db DELETE"), - ) - if recordDeleted { - c.objCounters.cDB.Add(math.MaxUint64) - c.estimateCacheSize() - } - deleted = true - return nil - } - storageType = StorageTypeFSTree _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr}) if err == nil { diff --git a/pkg/local_object_storage/writecache/flush.go 
b/pkg/local_object_storage/writecache/flush.go index da7feda9a..105d7a189 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -1,13 +1,11 @@ package writecache import ( - "bytes" "context" "errors" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" @@ -16,159 +14,96 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "github.com/mr-tron/base58" - "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) const ( - // flushBatchSize is amount of keys which will be read from cache to be flushed - // to the main storage. It is used to reduce contention between cache put - // and cache persist. - flushBatchSize = 512 // defaultFlushWorkersCount is number of workers for putting objects in main storage. defaultFlushWorkersCount = 20 // defaultFlushInterval is default time interval between successive flushes. - defaultFlushInterval = time.Second + defaultFlushInterval = 10 * time.Second ) -var errIterationCompleted = errors.New("iteration completed") - // runFlushLoop starts background workers which periodically flush objects to the blobstor. 
func (c *cache) runFlushLoop(ctx context.Context) { if c.disableBackgroundFlush { return } + for i := 0; i < c.workersCount; i++ { c.wg.Add(1) - go c.workerFlushSmall(ctx) + go c.workerFlush(ctx) } c.wg.Add(1) - go func() { - c.workerFlushBig(ctx) - c.wg.Done() - }() - - c.wg.Add(1) - go func() { - defer c.wg.Done() - - tt := time.NewTimer(defaultFlushInterval) - defer tt.Stop() - - for { - select { - case <-tt.C: - c.flushSmallObjects(ctx) - tt.Reset(defaultFlushInterval) - c.estimateCacheSize() - case <-ctx.Done(): - return - } - } - }() + go c.workerSelect(ctx) } -func (c *cache) flushSmallObjects(ctx context.Context) { - var lastKey []byte - for { - select { - case <-ctx.Done(): - return - default: - } - - var m []objectInfo - - c.modeMtx.RLock() - if c.readOnly() { - c.modeMtx.RUnlock() - time.Sleep(time.Second) - continue - } - - // We put objects in batches of fixed size to not interfere with main put cycle a lot. - _ = c.db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - cs := b.Cursor() - - var k, v []byte - - if len(lastKey) == 0 { - k, v = cs.First() - } else { - k, v = cs.Seek(lastKey) - if bytes.Equal(k, lastKey) { - k, v = cs.Next() - } - } - - for ; k != nil && len(m) < flushBatchSize; k, v = cs.Next() { - if len(lastKey) == len(k) { - copy(lastKey, k) - } else { - lastKey = bytes.Clone(k) - } - - m = append(m, objectInfo{ - addr: string(k), - data: bytes.Clone(v), - }) - } - return nil - }) - - var count int - for i := range m { - obj := objectSDK.New() - if err := obj.Unmarshal(m[i].data); err != nil { - continue - } - m[i].obj = obj - - count++ - select { - case c.flushCh <- m[i]: - case <-ctx.Done(): - c.modeMtx.RUnlock() - return - } - } - - c.modeMtx.RUnlock() - if count == 0 { - break - } - - c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache, - zap.Int("count", count), - zap.String("start", base58.Encode(lastKey))) - } -} - -func (c *cache) workerFlushBig(ctx context.Context) { - tick := 
time.NewTicker(defaultFlushInterval * 10) +func (c *cache) workerSelect(ctx context.Context) { + defer c.wg.Done() + tick := time.NewTicker(defaultFlushInterval) for { select { case <-tick.C: - c.modeMtx.RLock() - if c.readOnly() || c.noMetabase() { - c.modeMtx.RUnlock() - break + var prm common.IteratePrm + prm.IgnoreErrors = true + prm.Handler = func(ie common.IterationElement) error { + c.modeMtx.RLock() + defer c.modeMtx.RUnlock() + if c.readOnly() { + return ErrReadOnly + } + if c.noMetabase() { + return ErrDegraded + } + + select { + case <-ctx.Done(): + return ctx.Err() + case c.flushCh <- objectInfo{ + data: ie.ObjectData, + address: ie.Address, + }: + return nil + } } - - _ = c.flushFSTree(ctx, true) - - c.modeMtx.RUnlock() + _, _ = c.fsTree.Iterate(ctx, prm) case <-ctx.Done(): return } } } +func (c *cache) workerFlush(ctx context.Context) { + defer c.wg.Done() + + var objInfo objectInfo + for { + select { + case objInfo = <-c.flushCh: + case <-ctx.Done(): + return + } + + var obj objectSDK.Object + err := obj.Unmarshal(objInfo.data) + if err != nil { + c.reportFlushError(logs.FSTreeCantUnmarshalObject, objInfo.address.EncodeToString(), metaerr.Wrap(err)) + continue + } + + err = c.flushObject(ctx, objInfo.address, &obj, objInfo.data) + if err != nil { + // Error is handled in flushObject. + continue + } + + c.deleteFromDisk(ctx, objInfo.address) + } +} + func (c *cache) reportFlushError(msg string, addr string, err error) { if c.reportError != nil { c.reportError(msg, err) @@ -195,7 +130,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { return err } - err = c.flushObject(ctx, &obj, e.ObjectData, StorageTypeFSTree) + err = c.flushObject(ctx, e.Address, &obj, e.ObjectData) if err != nil { if ignoreErrors { return nil @@ -211,39 +146,26 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { return err } -// workerFlushSmall writes small objects to the main storage. 
-func (c *cache) workerFlushSmall(ctx context.Context) { - defer c.wg.Done() - - var objInfo objectInfo - for { - // Give priority to direct put. - select { - case objInfo = <-c.flushCh: - case <-ctx.Done(): - return - } - - err := c.flushObject(ctx, objInfo.obj, objInfo.data, StorageTypeDB) - if err != nil { - // Error is handled in flushObject. - continue - } - - c.deleteFromDB(objInfo.addr, true) - } -} - // flushObject is used to write object directly to the main storage. -func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []byte, st StorageType) error { - var err error +func (c *cache) flushObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object, data []byte) error { + c.flushingGuard.Lock(addr) + defer c.flushingGuard.Unlock(addr) + + stPrm := meta.StorageIDPrm{Address: addr} + stRes, err := c.metabase.StorageID(ctx, stPrm) + if err != nil { + c.reportFlushError(logs.FSTreeCantGetID, addr.EncodeToString(), err) + return err + } + if stRes.StorageID != nil { + // already flushed + return nil + } defer func() { - c.metrics.Flush(err == nil, st) + c.metrics.Flush(err == nil, StorageTypeFSTree) }() - addr := objectCore.AddressOf(obj) - var prm common.PutPrm prm.Object = obj prm.RawData = data @@ -258,9 +180,7 @@ func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []b return err } - var updPrm meta.UpdateStorageIDPrm - updPrm.SetAddress(addr) - updPrm.SetStorageID(res.StorageID) + updPrm := meta.UpdateStorageIDPrm{Address: addr, StorageID: res.StorageID} _, err = c.metabase.UpdateStorageID(ctx, updPrm) if err != nil { @@ -300,74 +220,10 @@ func (c *cache) Flush(ctx context.Context, ignoreErrors, seal bool) error { } func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { - if err := c.flushFSTree(ctx, ignoreErrors); err != nil { - return err - } - - var last string - for { - batch, err := c.readNextDBBatch(ignoreErrors, last) - if err != nil { - return err - } - if len(batch) == 0 { - 
break - } - for _, item := range batch { - var obj objectSDK.Object - if err := obj.Unmarshal(item.data); err != nil { - c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, item.address, metaerr.Wrap(err)) - if ignoreErrors { - continue - } - return err - } - - if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil { - return err - } - c.deleteFromDB(item.address, false) - } - last = batch[len(batch)-1].address - } - return nil + return c.flushFSTree(ctx, ignoreErrors) } -type batchItem struct { +type objectInfo struct { data []byte - address string -} - -func (c *cache) readNextDBBatch(ignoreErrors bool, last string) ([]batchItem, error) { - const batchSize = 100 - var batch []batchItem - err := c.db.View(func(tx *bbolt.Tx) error { - var addr oid.Address - - b := tx.Bucket(defaultBucket) - cs := b.Cursor() - for k, data := cs.Seek([]byte(last)); k != nil; k, data = cs.Next() { - sa := string(k) - if sa == last { - continue - } - if err := addr.DecodeString(sa); err != nil { - c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err)) - if ignoreErrors { - continue - } - return err - } - - batch = append(batch, batchItem{data: bytes.Clone(data), address: sa}) - if len(batch) == batchSize { - return errIterationCompleted - } - } - return nil - }) - if err == nil || errors.Is(err, errIterationCompleted) { - return batch, nil - } - return nil, err + address oid.Address } diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go index 3c951bebe..3038f6470 100644 --- a/pkg/local_object_storage/writecache/flush_test.go +++ b/pkg/local_object_storage/writecache/flush_test.go @@ -19,7 +19,6 @@ import ( oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" "github.com/stretchr/testify/require" - "go.etcd.io/bbolt" "go.uber.org/zap" ) @@ -31,7 +30,6 @@ func TestFlush(t *testing.T) { append([]Option{ 
WithLogger(testlogger), WithPath(filepath.Join(t.TempDir(), "writecache")), - WithSmallObjectSize(smallSize), WithMetabase(mb), WithBlobstor(bs), WithDisableBackgroundFlush(), @@ -47,31 +45,6 @@ func TestFlush(t *testing.T) { } failures := []TestFailureInjector[Option]{ - { - Desc: "db, invalid address", - InjectFn: func(t *testing.T, wc Cache) { - c := wc.(*cache) - obj := testutil.GenerateObject() - data, err := obj.Marshal() - require.NoError(t, err) - require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - return b.Put([]byte{1, 2, 3}, data) - })) - }, - }, - { - Desc: "db, invalid object", - InjectFn: func(t *testing.T, wc Cache) { - c := wc.(*cache) - require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - k := []byte(oidtest.Address().EncodeToString()) - v := []byte{1, 2, 3} - return b.Put(k, v) - })) - }, - }, { Desc: "fs, read error", InjectFn: func(t *testing.T, wc Cache) { @@ -253,15 +226,13 @@ func putObjects(t *testing.T, c Cache) []objectPair { func check(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPair) { for i := range objects { - var mPrm meta.StorageIDPrm - mPrm.SetAddress(objects[i].addr) - + mPrm := meta.StorageIDPrm{Address: objects[i].addr} mRes, err := mb.StorageID(context.Background(), mPrm) require.NoError(t, err) var prm common.GetPrm prm.Address = objects[i].addr - prm.StorageID = mRes.StorageID() + prm.StorageID = mRes.StorageID res, err := bs.Get(context.Background(), prm) require.NoError(t, err) diff --git a/pkg/local_object_storage/writecache/get.go b/pkg/local_object_storage/writecache/get.go index bf26833bd..67e360d21 100644 --- a/pkg/local_object_storage/writecache/get.go +++ b/pkg/local_object_storage/writecache/get.go @@ -1,7 +1,6 @@ package writecache import ( - "bytes" "context" "time" @@ -12,7 +11,6 @@ import ( apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK 
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -37,11 +35,11 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e return nil, ErrDegraded } - obj, err := c.getInternal(ctx, saddr, addr) + obj, err := c.getInternal(ctx, addr) return obj, metaerr.Wrap(err) } -func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) (*objectSDK.Object, error) { +func (c *cache) getInternal(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) { found := false storageType := StorageTypeUndefined startedAt := time.Now() @@ -49,14 +47,6 @@ func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) c.metrics.Get(time.Since(startedAt), found, storageType) }() - value, err := Get(c.db, []byte(saddr)) - if err == nil { - obj := objectSDK.New() - found = true - storageType = StorageTypeDB - return obj, obj.Unmarshal(value) - } - res, err := c.fsTree.Get(ctx, common.GetPrm{Address: addr}) if err != nil { return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) @@ -87,34 +77,10 @@ func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, return nil, ErrDegraded } - obj, err := c.getInternal(ctx, saddr, addr) + obj, err := c.getInternal(ctx, addr) if err != nil { return nil, metaerr.Wrap(err) } return obj.CutPayload(), nil } - -// Get fetches object from the underlying database. -// Key should be a stringified address. -// -// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in db. 
-func Get(db *bbolt.DB, key []byte) ([]byte, error) { - if db == nil { - return nil, ErrNotInitialized - } - var value []byte - err := db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - if b == nil { - return ErrNoDefaultBucket - } - value = b.Get(key) - if value == nil { - return logicerr.Wrap(new(apistatus.ObjectNotFound)) - } - value = bytes.Clone(value) - return nil - }) - return value, metaerr.Wrap(err) -} diff --git a/pkg/local_object_storage/writecache/iterate.go b/pkg/local_object_storage/writecache/iterate.go index 9ec039f91..a0268c09a 100644 --- a/pkg/local_object_storage/writecache/iterate.go +++ b/pkg/local_object_storage/writecache/iterate.go @@ -1,39 +1,28 @@ package writecache import ( - "errors" - "fmt" + "context" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) -// ErrNoDefaultBucket is returned by IterateDB when default bucket for objects is missing. -var ErrNoDefaultBucket = errors.New("no default bucket") +func (c *cache) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Iterate", + trace.WithAttributes( + attribute.Bool("ignore_errors", prm.IgnoreErrors), + )) + defer span.End() -// IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return. -// It is assumed that db is an underlying database of some WriteCache instance. -// -// Returns ErrNoDefaultBucket if there is no default bucket in db. -// -// DB must not be nil and should be opened. 
-func IterateDB(db *bbolt.DB, f func(oid.Address) error) error { - return metaerr.Wrap(db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - if b == nil { - return ErrNoDefaultBucket - } + if !c.modeMtx.TryRLock() { + return common.IterateRes{}, ErrNotInitialized + } + defer c.modeMtx.RUnlock() + if c.mode.NoMetabase() { + return common.IterateRes{}, ErrDegraded + } - var addr oid.Address - - return b.ForEach(func(k, _ []byte) error { - err := addr.DecodeString(string(k)) - if err != nil { - return fmt.Errorf("could not parse object address: %w", err) - } - - return f(addr) - }) - })) + return c.fsTree.Iterate(ctx, prm) } diff --git a/pkg/local_object_storage/writecache/metrics.go b/pkg/local_object_storage/writecache/metrics.go index e68b6d8be..7f9dc6f71 100644 --- a/pkg/local_object_storage/writecache/metrics.go +++ b/pkg/local_object_storage/writecache/metrics.go @@ -14,7 +14,6 @@ func (t StorageType) String() string { const ( StorageTypeUndefined StorageType = "null" - StorageTypeDB StorageType = "db" StorageTypeFSTree StorageType = "fstree" ) @@ -26,9 +25,9 @@ type Metrics interface { Flush(success bool, st StorageType) Evict(st StorageType) - SetEstimateSize(db, fstree uint64) + SetEstimateSize(uint64) SetMode(m mode.ComponentMode) - SetActualCounters(db, fstree uint64) + SetActualCounters(uint64) SetPath(path string) Close() } @@ -47,11 +46,11 @@ func (metricsStub) Delete(time.Duration, bool, StorageType) {} func (metricsStub) Put(time.Duration, bool, StorageType) {} -func (metricsStub) SetEstimateSize(uint64, uint64) {} +func (metricsStub) SetEstimateSize(uint64) {} func (metricsStub) SetMode(mode.ComponentMode) {} -func (metricsStub) SetActualCounters(uint64, uint64) {} +func (metricsStub) SetActualCounters(uint64) {} func (metricsStub) Flush(bool, StorageType) {} diff --git a/pkg/local_object_storage/writecache/mode.go b/pkg/local_object_storage/writecache/mode.go index 4172cfbc8..a55c7d967 100644 --- 
a/pkg/local_object_storage/writecache/mode.go +++ b/pkg/local_object_storage/writecache/mode.go @@ -2,10 +2,7 @@ package writecache import ( "context" - "fmt" - "time" - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "go.opentelemetry.io/otel/attribute" @@ -25,7 +22,7 @@ func (c *cache) SetMode(m mode.Mode) error { c.modeMtx.Lock() defer c.modeMtx.Unlock() - err := c.setMode(ctx, m, true) + err := c.setMode(ctx, m, !m.NoMetabase()) if err == nil { c.metrics.SetMode(mode.ConvertToComponentModeDegraded(m)) } @@ -44,20 +41,6 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode, ignoreErrors bool) err } } - if c.db != nil { - if err = c.db.Close(); err != nil { - return fmt.Errorf("can't close write-cache database: %w", err) - } - } - - // Suspend producers to ensure there are channel send operations in fly. - // flushCh is populated by `flush` with `modeMtx` taken, thus waiting until it is empty - // guarantees that there are no in-fly operations. 
- for len(c.flushCh) != 0 { - c.log.Info(logs.WritecacheWaitingForChannelsToFlush) - time.Sleep(time.Second) - } - if turnOffMeta { c.mode = m return nil diff --git a/pkg/local_object_storage/writecache/mode_test.go b/pkg/local_object_storage/writecache/mode_test.go deleted file mode 100644 index f684c15bc..000000000 --- a/pkg/local_object_storage/writecache/mode_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package writecache - -import ( - "context" - "testing" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" - "github.com/stretchr/testify/require" -) - -func TestMode(t *testing.T) { - t.Parallel() - wc := New( - WithLogger(test.NewLogger(t)), - WithFlushWorkersCount(2), - WithPath(t.TempDir())) - - require.NoError(t, wc.Open(context.Background(), mode.DegradedReadOnly)) - require.Nil(t, wc.(*cache).db) - require.NoError(t, wc.Init()) - require.Nil(t, wc.(*cache).db) - require.NoError(t, wc.Close()) - - require.NoError(t, wc.Open(context.Background(), mode.Degraded)) - require.Nil(t, wc.(*cache).db) - require.NoError(t, wc.Init()) - require.Nil(t, wc.(*cache).db) - require.NoError(t, wc.Close()) -} diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go index c8eb1bc45..9b242afdf 100644 --- a/pkg/local_object_storage/writecache/options.go +++ b/pkg/local_object_storage/writecache/options.go @@ -3,8 +3,8 @@ package writecache import ( "io/fs" "os" - "time" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "go.uber.org/zap" ) @@ -22,19 +22,15 @@ type options struct { metabase Metabase // maxObjectSize is the maximum size of the object stored in the write-cache. maxObjectSize uint64 - // smallObjectSize is the maximum size of the object stored in the database. 
- smallObjectSize uint64 // workersCount is the number of workers flushing objects in parallel. workersCount int - // maxCacheSize is the maximum total size of all objects saved in cache (DB + FS). + // maxCacheSize is the maximum total size of all objects saved in cache. // 1 GiB by default. maxCacheSize uint64 - // objCounters contains atomic counters for the number of objects stored in cache. - objCounters counters - // maxBatchSize is the maximum batch size for the small object database. - maxBatchSize int - // maxBatchDelay is the maximum batch wait time for the small object database. - maxBatchDelay time.Duration + // maxCacheCount is the maximum total count of all objects saved in cache. + maxCacheCount uint64 + // counter contains atomic counters for the number of objects stored in cache. + counter *fstree.SimpleCounter // noSync is true iff FSTree allows unsynchronized writes. noSync bool // reportError is the function called when encountering disk errors in background workers. @@ -84,15 +80,6 @@ func WithMaxObjectSize(sz uint64) Option { } } -// WithSmallObjectSize sets maximum object size to be stored in write-cache. -func WithSmallObjectSize(sz uint64) Option { - return func(o *options) { - if sz > 0 { - o.smallObjectSize = sz - } - } -} - func WithFlushWorkersCount(c int) Option { return func(o *options) { if c > 0 { @@ -108,21 +95,10 @@ func WithMaxCacheSize(sz uint64) Option { } } -// WithMaxBatchSize sets max batch size for the small object database. -func WithMaxBatchSize(sz int) Option { +// WithMaxCacheCount sets maximum write-cache count of objects. +func WithMaxCacheCount(cnt uint64) Option { return func(o *options) { - if sz > 0 { - o.maxBatchSize = sz - } - } -} - -// WithMaxBatchDelay sets max batch delay for the small object database. 
-func WithMaxBatchDelay(d time.Duration) Option { - return func(o *options) { - if d > 0 { - o.maxBatchDelay = d - } + o.maxCacheCount = cnt } } diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go index 0e419f95b..9f60972d3 100644 --- a/pkg/local_object_storage/writecache/put.go +++ b/pkg/local_object_storage/writecache/put.go @@ -8,7 +8,6 @@ import ( storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" - "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -50,65 +49,22 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro return common.PutRes{}, ErrBigObject } - oi := objectInfo{ - addr: prm.Address.EncodeToString(), - obj: prm.Object, - data: prm.RawData, - } - - if sz <= c.smallObjectSize { - storageType = StorageTypeDB - err := c.putSmall(oi) - if err == nil { - added = true - } - return common.PutRes{}, err - } - storageType = StorageTypeFSTree - err := c.putBig(ctx, oi.addr, prm) + err := c.putBig(ctx, prm) if err == nil { added = true } return common.PutRes{}, metaerr.Wrap(err) } -// putSmall persists small objects to the write-cache database and -// pushes the to the flush workers queue. 
-func (c *cache) putSmall(obj objectInfo) error { - cacheSize := c.estimateCacheSize() - if c.maxCacheSize < c.incSizeDB(cacheSize) { - return ErrOutOfSpace - } - - var newRecord bool - err := c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - key := []byte(obj.addr) - newRecord = b.Get(key) == nil - if newRecord { - return b.Put(key, obj.data) - } - return nil - }) - if err == nil { - storagelog.Write(c.log, - storagelog.AddressField(obj.addr), - storagelog.StorageTypeField(wcStorageType), - storagelog.OpField("db PUT"), - ) - if newRecord { - c.objCounters.cDB.Add(1) - c.estimateCacheSize() - } - } - return err -} - // putBig writes object to FSTree and pushes it to the flush workers queue. -func (c *cache) putBig(ctx context.Context, addr string, prm common.PutPrm) error { - cacheSz := c.estimateCacheSize() - if c.maxCacheSize < c.incSizeFS(cacheSz) { +func (c *cache) putBig(ctx context.Context, prm common.PutPrm) error { + addr := prm.Address.EncodeToString() + estimatedObjSize := uint64(len(prm.RawData)) + if estimatedObjSize == 0 { + estimatedObjSize = prm.Object.PayloadSize() + } + if !c.hasFreeSpace(estimatedObjSize) { return ErrOutOfSpace } diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/state.go index bc75aaf27..0d2e74302 100644 --- a/pkg/local_object_storage/writecache/state.go +++ b/pkg/local_object_storage/writecache/state.go @@ -1,77 +1,20 @@ package writecache -import ( - "fmt" - "math" - "sync/atomic" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" - "go.etcd.io/bbolt" -) - -func (c *cache) estimateCacheSize() uint64 { - dbCount := c.objCounters.DB() - fsCount := c.objCounters.FS() - if fsCount > 0 { - fsCount-- // db file +func (c *cache) estimateCacheSize() { + count, size := c.counter.Value() + var ucount, usize uint64 + if count > 0 { + ucount = uint64(count) } - dbSize := dbCount * c.smallObjectSize - fsSize := fsCount * 
c.maxObjectSize - c.metrics.SetEstimateSize(dbSize, fsSize) - c.metrics.SetActualCounters(dbCount, fsCount) - return dbSize + fsSize -} - -func (c *cache) incSizeDB(sz uint64) uint64 { - return sz + c.smallObjectSize -} - -func (c *cache) incSizeFS(sz uint64) uint64 { - return sz + c.maxObjectSize -} - -var _ fstree.FileCounter = &counters{} - -type counters struct { - cDB, cFS atomic.Uint64 -} - -func (x *counters) DB() uint64 { - return x.cDB.Load() -} - -func (x *counters) FS() uint64 { - return x.cFS.Load() -} - -// Set implements fstree.ObjectCounter. -func (x *counters) Set(v uint64) { - x.cFS.Store(v) -} - -// Inc implements fstree.ObjectCounter. -func (x *counters) Inc() { - x.cFS.Add(1) -} - -// Dec implements fstree.ObjectCounter. -func (x *counters) Dec() { - x.cFS.Add(math.MaxUint64) -} - -func (c *cache) initCounters() error { - var inDB uint64 - err := c.db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - if b != nil { - inDB = uint64(b.Stats().KeyN) - } - return nil - }) - if err != nil { - return fmt.Errorf("could not read write-cache DB counter: %w", err) + if size > 0 { + usize = uint64(size) } - c.objCounters.cDB.Store(inDB) - c.estimateCacheSize() - return nil + c.metrics.SetEstimateSize(usize) + c.metrics.SetActualCounters(ucount) +} + +func (c *cache) hasFreeSpace(sz uint64) bool { + count, size := c.counter.Value() + return (size+int64(sz) <= int64(c.maxCacheSize)) && + (c.maxCacheCount == 0 || count+1 <= int64(c.maxCacheCount)) } diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index caf997af8..6aface7a5 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -3,7 +3,6 @@ package writecache import ( "context" "fmt" - "math" "os" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" @@ -14,49 +13,22 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" "go.uber.org/zap" ) -// store represents persistent storage with in-memory LRU cache -// for flushed items on top of it. -type store struct { - db *bbolt.DB -} - -const dbName = "small.bolt" - func (c *cache) openStore(mod mode.ComponentMode) error { err := util.MkdirAllX(c.path, os.ModePerm) if err != nil { return err } - c.db, err = OpenDB(c.path, mod.ReadOnly(), c.openFile) - if err != nil { - return fmt.Errorf("could not open database: %w", err) - } - - c.db.MaxBatchSize = c.maxBatchSize - c.db.MaxBatchDelay = c.maxBatchDelay - - if !mod.ReadOnly() { - err = c.db.Update(func(tx *bbolt.Tx) error { - _, err := tx.CreateBucketIfNotExists(defaultBucket) - return err - }) - if err != nil { - return fmt.Errorf("could not create default bucket: %w", err) - } - } - c.fsTree = fstree.New( fstree.WithPath(c.path), fstree.WithPerm(os.ModePerm), fstree.WithDepth(1), fstree.WithDirNameLen(1), fstree.WithNoSync(c.noSync), - fstree.WithFileCounter(&c.objCounters), + fstree.WithFileCounter(c.counter), ) if err := c.fsTree.Open(mod); err != nil { return fmt.Errorf("could not open FSTree: %w", err) @@ -68,41 +40,6 @@ func (c *cache) openStore(mod mode.ComponentMode) error { return nil } -func (c *cache) deleteFromDB(key string, batched bool) { - var recordDeleted bool - var err error - if batched { - err = c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - key := []byte(key) - recordDeleted = b.Get(key) != nil - return b.Delete(key) - }) - } else { - err = c.db.Update(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - key := []byte(key) - recordDeleted = b.Get(key) != nil - return b.Delete(key) - }) - } - - if err == nil { - c.metrics.Evict(StorageTypeDB) - storagelog.Write(c.log, - storagelog.AddressField(key), - storagelog.StorageTypeField(wcStorageType), - storagelog.OpField("db DELETE"), - ) - if recordDeleted 
{ - c.objCounters.cDB.Add(math.MaxUint64) - c.estimateCacheSize() - } - } else { - c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err)) - } -} - func (c *cache) deleteFromDisk(ctx context.Context, addr oid.Address) { _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr}) if err != nil && !client.IsErrObjectNotFound(err) { diff --git a/pkg/local_object_storage/writecache/util.go b/pkg/local_object_storage/writecache/util.go deleted file mode 100644 index 0ed4a954e..000000000 --- a/pkg/local_object_storage/writecache/util.go +++ /dev/null @@ -1,20 +0,0 @@ -package writecache - -import ( - "io/fs" - "os" - "path/filepath" - "time" - - "go.etcd.io/bbolt" -) - -// OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true. -func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error)) (*bbolt.DB, error) { - return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{ - NoFreelistSync: true, - ReadOnly: ro, - Timeout: 100 * time.Millisecond, - OpenFile: openFile, - }) -} diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index 71dba61cf..960137dfb 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -37,6 +37,7 @@ type Cache interface { DumpInfo() Info Flush(context.Context, bool, bool) error Seal(context.Context, bool) error + Iterate(context.Context, common.IteratePrm) (common.IterateRes, error) Init() error Open(ctx context.Context, mode mode.Mode) error @@ -54,6 +55,7 @@ type MainStorage interface { // Metabase is the interface of the metabase used by Cache implementations. type Metabase interface { UpdateStorageID(context.Context, meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error) + StorageID(context.Context, meta.StorageIDPrm) (meta.StorageIDRes, error) } var (