From c180795f9b2fcfc3153e76923452221eeb28e87f Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 23 Jul 2024 11:06:45 +0300 Subject: [PATCH 1/4] [#1273] writecache: Make writecache as FSTree only Signed-off-by: Dmitrii Stepanov --- .../internal/writecache/inspect.go | 25 ++- cmd/frostfs-lens/internal/writecache/list.go | 25 ++- cmd/frostfs-node/config.go | 9 - cmd/frostfs-node/config/engine/config_test.go | 2 - .../config/engine/shard/writecache/config.go | 19 -- config/example/node.env | 2 - config/example/node.json | 2 - config/example/node.yaml | 1 - docs/storage-node-configuration.md | 4 - internal/logs/logs.go | 1 - pkg/local_object_storage/engine/writecache.go | 6 +- .../writecache/benchmark/writecache_test.go | 1 - .../writecache/{cachebbolt.go => cache.go} | 51 +---- pkg/local_object_storage/writecache/delete.go | 35 --- pkg/local_object_storage/writecache/flush.go | 208 +----------------- .../writecache/flush_test.go | 27 --- pkg/local_object_storage/writecache/get.go | 40 +--- .../writecache/iterate.go | 49 ++--- .../writecache/metrics.go | 9 +- pkg/local_object_storage/writecache/mode.go | 19 +- .../writecache/mode_test.go | 30 --- .../writecache/options.go | 36 +-- pkg/local_object_storage/writecache/put.go | 53 +---- pkg/local_object_storage/writecache/state.go | 37 +--- .../writecache/storage.go | 63 ------ pkg/local_object_storage/writecache/util.go | 20 -- .../writecache/writecache.go | 1 + 27 files changed, 80 insertions(+), 695 deletions(-) rename pkg/local_object_storage/writecache/{cachebbolt.go => cache.go} (70%) delete mode 100644 pkg/local_object_storage/writecache/mode_test.go delete mode 100644 pkg/local_object_storage/writecache/util.go diff --git a/cmd/frostfs-lens/internal/writecache/inspect.go b/cmd/frostfs-lens/internal/writecache/inspect.go index afc986c8b..fbd760550 100644 --- a/cmd/frostfs-lens/internal/writecache/inspect.go +++ b/cmd/frostfs-lens/internal/writecache/inspect.go @@ -1,11 +1,10 @@ package writecache import ( - "os" - common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" - objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/spf13/cobra" ) @@ -23,18 +22,20 @@ func init() { } func inspectFunc(cmd *cobra.Command, _ []string) { - var data []byte + wc := writecache.New( + writecache.WithPath(vPath), + ) + common.ExitOnErr(cmd, common.Errf("could not open write-cache: %w", wc.Open(cmd.Context(), mode.ReadOnly))) + defer wc.Close() - db, err := writecache.OpenDB(vPath, true, os.OpenFile) - common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err)) - defer db.Close() + var addr oid.Address + common.ExitOnErr(cmd, common.Errf("could not decode address: %w", addr.DecodeString(vAddress))) - data, err = writecache.Get(db, []byte(vAddress)) + obj, err := wc.Get(cmd.Context(), addr) common.ExitOnErr(cmd, common.Errf("could not fetch object: %w", err)) - var o objectSDK.Object - common.ExitOnErr(cmd, common.Errf("could not unmarshal object: %w", o.Unmarshal(data))) - - common.PrintObjectHeader(cmd, o) + common.PrintObjectHeader(cmd, *obj) + data, err := obj.Marshal() + common.ExitOnErr(cmd, common.Errf("could not marshal object: %w", err)) common.WriteObjectToFile(cmd, vOut, data) } diff --git a/cmd/frostfs-lens/internal/writecache/list.go b/cmd/frostfs-lens/internal/writecache/list.go index bcbae0ec9..bb19cd108 100644 --- a/cmd/frostfs-lens/internal/writecache/list.go +++ b/cmd/frostfs-lens/internal/writecache/list.go @@ -3,11 +3,11 @@ package writecache import ( "fmt" "io" - "os" common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal" + blobstor "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/spf13/cobra" ) @@ -26,15 +26,18 @@ func listFunc(cmd *cobra.Command, _ []string) { // other targets can be supported w := cmd.OutOrStderr() - wAddr := func(addr oid.Address) error { - _, err := io.WriteString(w, fmt.Sprintf("%s\n", addr)) + wc := writecache.New( + writecache.WithPath(vPath), + ) + common.ExitOnErr(cmd, common.Errf("could not open write-cache: %w", wc.Open(cmd.Context(), mode.ReadOnly))) + defer wc.Close() + + var prm blobstor.IteratePrm + prm.IgnoreErrors = true + prm.Handler = func(ie blobstor.IterationElement) error { + _, err := io.WriteString(w, fmt.Sprintf("%s\n", ie.Address)) return err } - - db, err := writecache.OpenDB(vPath, true, os.OpenFile) - common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err)) - defer db.Close() - - err = writecache.IterateDB(db, wAddr) - common.ExitOnErr(cmd, common.Errf("write-cache iterator failure: %w", err)) + _, err := wc.Iterate(cmd.Context(), prm) + common.ExitOnErr(cmd, common.Errf("could not iterate write-cache: %w", err)) } diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 5b91e7819..03b67e53a 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -146,9 +146,6 @@ type shardCfg struct { writecacheCfg struct { enabled bool path string - maxBatchSize int - maxBatchDelay time.Duration - smallObjectSize uint64 maxObjSize uint64 flushWorkerCount int sizeLimit uint64 @@ -269,10 +266,7 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg, wc.enabled = true wc.path = writeCacheCfg.Path() - wc.maxBatchSize = writeCacheCfg.BoltDB().MaxBatchSize() - wc.maxBatchDelay = writeCacheCfg.BoltDB().MaxBatchDelay() wc.maxObjSize = writeCacheCfg.MaxObjectSize() - wc.smallObjectSize = writeCacheCfg.SmallObjectSize() wc.flushWorkerCount = writeCacheCfg.WorkerCount() wc.sizeLimit = writeCacheCfg.SizeLimit() wc.noSync = writeCacheCfg.NoSync() @@ -861,10 +855,7 @@ func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option { if wcRead := shCfg.writecacheCfg; wcRead.enabled { writeCacheOpts = append(writeCacheOpts, writecache.WithPath(wcRead.path), - writecache.WithMaxBatchSize(wcRead.maxBatchSize), - writecache.WithMaxBatchDelay(wcRead.maxBatchDelay), writecache.WithMaxObjectSize(wcRead.maxObjSize), - writecache.WithSmallObjectSize(wcRead.smallObjectSize), writecache.WithFlushWorkersCount(wcRead.flushWorkerCount), writecache.WithMaxCacheSize(wcRead.sizeLimit), writecache.WithNoSync(wcRead.noSync), diff --git a/cmd/frostfs-node/config/engine/config_test.go b/cmd/frostfs-node/config/engine/config_test.go index 7473afefb..a4a102229 100644 --- a/cmd/frostfs-node/config/engine/config_test.go +++ b/cmd/frostfs-node/config/engine/config_test.go @@ -74,7 +74,6 @@ func TestEngineSection(t *testing.T) { require.Equal(t, true, wc.NoSync()) require.Equal(t, "tmp/0/cache", wc.Path()) - require.EqualValues(t, 16384, wc.SmallObjectSize()) require.EqualValues(t, 134217728, wc.MaxObjectSize()) require.EqualValues(t, 30, wc.WorkerCount()) require.EqualValues(t, 3221225472, wc.SizeLimit()) @@ -129,7 +128,6 @@ func TestEngineSection(t *testing.T) { require.Equal(t, false, wc.NoSync()) require.Equal(t, "tmp/1/cache", wc.Path()) - require.EqualValues(t, 16384, wc.SmallObjectSize()) require.EqualValues(t, 134217728, wc.MaxObjectSize()) require.EqualValues(t, 30, wc.WorkerCount()) require.EqualValues(t, 4294967296, wc.SizeLimit()) diff --git a/cmd/frostfs-node/config/engine/shard/writecache/config.go b/cmd/frostfs-node/config/engine/shard/writecache/config.go index 5e31e04ad..1d4fbce9b 100644 --- a/cmd/frostfs-node/config/engine/shard/writecache/config.go +++ b/cmd/frostfs-node/config/engine/shard/writecache/config.go @@ -10,9 +10,6 @@ import ( type Config config.Config const ( - // SmallSizeDefault is a default size of small objects. - SmallSizeDefault = 32 << 10 - // MaxSizeDefault is a default value of the object payload size limit. MaxSizeDefault = 64 << 20 @@ -51,22 +48,6 @@ func (x *Config) Path() string { return p } -// SmallObjectSize returns the value of "small_object_size" config parameter. -// -// Returns SmallSizeDefault if the value is not a positive number. -func (x *Config) SmallObjectSize() uint64 { - s := config.SizeInBytesSafe( - (*config.Config)(x), - "small_object_size", - ) - - if s > 0 { - return s - } - - return SmallSizeDefault -} - // MaxObjectSize returns the value of "max_object_size" config parameter. // // Returns MaxSizeDefault if the value is not a positive number. diff --git a/config/example/node.env b/config/example/node.env index 00190eb39..68a0503b3 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -101,7 +101,6 @@ FROSTFS_STORAGE_SHARD_0_MODE=read-only FROSTFS_STORAGE_SHARD_0_WRITECACHE_ENABLED=false FROSTFS_STORAGE_SHARD_0_WRITECACHE_NO_SYNC=true FROSTFS_STORAGE_SHARD_0_WRITECACHE_PATH=tmp/0/cache -FROSTFS_STORAGE_SHARD_0_WRITECACHE_SMALL_OBJECT_SIZE=16384 FROSTFS_STORAGE_SHARD_0_WRITECACHE_MAX_OBJECT_SIZE=134217728 FROSTFS_STORAGE_SHARD_0_WRITECACHE_FLUSH_WORKER_COUNT=30 FROSTFS_STORAGE_SHARD_0_WRITECACHE_CAPACITY=3221225472 @@ -155,7 +154,6 @@ FROSTFS_STORAGE_SHARD_1_MODE=read-write ### Write cache config FROSTFS_STORAGE_SHARD_1_WRITECACHE_ENABLED=true FROSTFS_STORAGE_SHARD_1_WRITECACHE_PATH=tmp/1/cache -FROSTFS_STORAGE_SHARD_1_WRITECACHE_SMALL_OBJECT_SIZE=16384 FROSTFS_STORAGE_SHARD_1_WRITECACHE_MAX_OBJECT_SIZE=134217728 FROSTFS_STORAGE_SHARD_1_WRITECACHE_FLUSH_WORKER_COUNT=30 FROSTFS_STORAGE_SHARD_1_WRITECACHE_CAPACITY=4294967296 diff --git a/config/example/node.json b/config/example/node.json index 9051d2bb7..7d4b34c7b 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -145,7 +145,6 @@ "enabled": false, "no_sync": true, "path": "tmp/0/cache", - "small_object_size": 16384, "max_object_size": 134217728, "flush_worker_count": 30, "capacity": 3221225472 @@ -203,7 +202,6 @@ "enabled": true, "path": "tmp/1/cache", "memcache_capacity": 2147483648, - "small_object_size": 16384, "max_object_size": 134217728, "flush_worker_count": 30, "capacity": 4294967296 diff --git a/config/example/node.yaml b/config/example/node.yaml index bcc8552b3..cd13fa836 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -125,7 +125,6 @@ storage: writecache: enabled: true - small_object_size: 16k # size threshold for "small" objects which are cached in key-value DB, not in FS, bytes max_object_size: 134217728 # size threshold for "big" objects which bypass write-cache and go to the storage directly, bytes flush_worker_count: 30 # number of write-cache flusher threads diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index 4a6e5ba6d..f0216a25d 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -287,7 +287,6 @@ writecache: enabled: true path: /path/to/writecache capacity: 4294967296 - small_object_size: 16384 max_object_size: 134217728 flush_worker_count: 30 ``` @@ -296,11 +295,8 @@ writecache: |----------------------|------------|---------------|----------------------------------------------------------------------------------------------------------------------| | `path` | `string` | | Path to the metabase file. | | `capacity` | `size` | unrestricted | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly. | -| `small_object_size` | `size` | `32K` | Maximum object size for "small" objects. This objects are stored in a key-value database instead of a file-system. | | `max_object_size` | `size` | `64M` | Maximum object size allowed to be stored in the writecache. | | `flush_worker_count` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. | -| `max_batch_size` | `int` | `1000` | Maximum amount of small object `PUT` operations to perform in a single transaction. | -| `max_batch_delay` | `duration` | `10ms` | Maximum delay before a batch starts. | # `node` section diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 67f173f29..1a0bda00d 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -278,7 +278,6 @@ const ( ShardCouldNotMarkObjectToDeleteInMetabase = "could not mark object to delete in metabase" WritecacheTriedToFlushItemsFromWritecache = "tried to flush items from write-cache" WritecacheWaitingForChannelsToFlush = "waiting for channels to flush" - WritecacheCantRemoveObjectsFromTheDatabase = "can't remove objects from the database" WritecacheCantRemoveObjectFromWritecache = "can't remove object from write-cache" BlobovniczatreeCouldNotGetObjectFromLevel = "could not get object from level" BlobovniczatreeCouldNotCloseBlobovnicza = "could not close Blobovnicza" diff --git a/pkg/local_object_storage/engine/writecache.go b/pkg/local_object_storage/engine/writecache.go index da488260a..2edbd2d6b 100644 --- a/pkg/local_object_storage/engine/writecache.go +++ b/pkg/local_object_storage/engine/writecache.go @@ -166,8 +166,7 @@ func (m *writeCacheMetrics) Put(d time.Duration, success bool, st writecache.Sto m.metrics.AddMethodDuration(m.shardID, m.path, st.String(), "Put", success, d) } -func (m *writeCacheMetrics) SetEstimateSize(db, fstree uint64) { - m.metrics.SetEstimateSize(m.shardID, m.path, writecache.StorageTypeDB.String(), db) +func (m *writeCacheMetrics) SetEstimateSize(fstree uint64) { m.metrics.SetEstimateSize(m.shardID, m.path, writecache.StorageTypeFSTree.String(), fstree) } @@ -175,8 +174,7 @@ func (m *writeCacheMetrics) SetMode(mod mode.ComponentMode) { m.metrics.SetMode(m.shardID, mod.String()) } -func (m *writeCacheMetrics) SetActualCounters(db, fstree uint64) { - m.metrics.SetActualCount(m.shardID, m.path, writecache.StorageTypeDB.String(), db) +func (m *writeCacheMetrics) SetActualCounters(fstree uint64) { m.metrics.SetActualCount(m.shardID, m.path, writecache.StorageTypeFSTree.String(), fstree) } diff --git a/pkg/local_object_storage/writecache/benchmark/writecache_test.go b/pkg/local_object_storage/writecache/benchmark/writecache_test.go index c1c0e88b3..8d3ba4b74 100644 --- a/pkg/local_object_storage/writecache/benchmark/writecache_test.go +++ b/pkg/local_object_storage/writecache/benchmark/writecache_test.go @@ -95,6 +95,5 @@ func newCache(b *testing.B) writecache.Cache { writecache.WithBlobstor(bs), writecache.WithMetabase(testMetabase{}), writecache.WithMaxCacheSize(256<<30), - writecache.WithSmallObjectSize(128<<10), ) } diff --git a/pkg/local_object_storage/writecache/cachebbolt.go b/pkg/local_object_storage/writecache/cache.go similarity index 70% rename from pkg/local_object_storage/writecache/cachebbolt.go rename to pkg/local_object_storage/writecache/cache.go index cdd4ed442..3faef8838 100644 --- a/pkg/local_object_storage/writecache/cachebbolt.go +++ b/pkg/local_object_storage/writecache/cache.go @@ -10,8 +10,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - "go.etcd.io/bbolt" "go.uber.org/zap" ) @@ -28,14 +26,10 @@ type cache struct { // whether object should be compressed. compressFlags map[string]struct{} - // flushCh is a channel with objects to flush. - flushCh chan objectInfo // cancel is cancel function, protected by modeMtx in Close. cancel atomic.Value // wg is a wait group for flush workers. wg sync.WaitGroup - // store contains underlying database. - store // fsTree contains big files stored directly on file-system. fsTree *fstree.FSTree } @@ -43,40 +37,26 @@ type cache struct { // wcStorageType is used for write-cache operations logging. const wcStorageType = "write-cache" -type objectInfo struct { - addr string - data []byte - obj *objectSDK.Object -} - const ( - defaultMaxObjectSize = 64 * 1024 * 1024 // 64 MiB - defaultSmallObjectSize = 32 * 1024 // 32 KiB - defaultMaxCacheSize = 1 << 30 // 1 GiB + defaultMaxObjectSize = 64 * 1024 * 1024 // 64 MiB + defaultMaxCacheSize = 1 << 30 // 1 GiB ) -var ( - defaultBucket = []byte{0} - dummyCanceler context.CancelFunc = func() {} -) +var dummyCanceler context.CancelFunc = func() {} // New creates new writecache instance. func New(opts ...Option) Cache { c := &cache{ - flushCh: make(chan objectInfo), - mode: mode.Disabled, + mode: mode.Disabled, compressFlags: make(map[string]struct{}), options: options{ - log: &logger.Logger{Logger: zap.NewNop()}, - maxObjectSize: defaultMaxObjectSize, - smallObjectSize: defaultSmallObjectSize, - workersCount: defaultFlushWorkersCount, - maxCacheSize: defaultMaxCacheSize, - maxBatchSize: bbolt.DefaultMaxBatchSize, - maxBatchDelay: bbolt.DefaultMaxBatchDelay, - openFile: os.OpenFile, - metrics: DefaultMetrics(), + log: &logger.Logger{Logger: zap.NewNop()}, + maxObjectSize: defaultMaxObjectSize, + workersCount: defaultFlushWorkersCount, + maxCacheSize: defaultMaxCacheSize, + openFile: os.OpenFile, + metrics: DefaultMetrics(), }, } @@ -111,7 +91,8 @@ func (c *cache) Open(_ context.Context, mod mode.Mode) error { return metaerr.Wrap(err) } - return metaerr.Wrap(c.initCounters()) + _ = c.estimateCacheSize() + return nil } // Init runs necessary services. @@ -138,14 +119,6 @@ func (c *cache) Close() error { c.modeMtx.Lock() defer c.modeMtx.Unlock() - - var err error - if c.db != nil { - err = c.db.Close() - if err != nil { - c.db = nil - } - } c.metrics.Close() return nil } diff --git a/pkg/local_object_storage/writecache/delete.go b/pkg/local_object_storage/writecache/delete.go index b1a0511ee..e3034e1dc 100644 --- a/pkg/local_object_storage/writecache/delete.go +++ b/pkg/local_object_storage/writecache/delete.go @@ -2,7 +2,6 @@ package writecache import ( "context" - "math" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" @@ -10,7 +9,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -47,39 +45,6 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error { saddr := addr.EncodeToString() - var dataSize int - _ = c.db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - dataSize = len(b.Get([]byte(saddr))) - return nil - }) - - if dataSize > 0 { - storageType = StorageTypeDB - var recordDeleted bool - err := c.db.Update(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - key := []byte(saddr) - recordDeleted = b.Get(key) != nil - err := b.Delete(key) - return err - }) - if err != nil { - return err - } - storagelog.Write(c.log, - storagelog.AddressField(saddr), - storagelog.StorageTypeField(wcStorageType), - storagelog.OpField("db DELETE"), - ) - if recordDeleted { - c.objCounters.cDB.Add(math.MaxUint64) - c.estimateCacheSize() - } - deleted = true - return nil - } - storageType = StorageTypeFSTree _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr}) if err == nil { diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index da7feda9a..d595ddc6c 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -1,7 +1,6 @@ package writecache import ( - "bytes" "context" "errors" "time" @@ -15,142 +14,33 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "github.com/mr-tron/base58" - "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) const ( - // flushBatchSize is amount of keys which will be read from cache to be flushed - // to the main storage. It is used to reduce contention between cache put - // and cache persist. - flushBatchSize = 512 // defaultFlushWorkersCount is number of workers for putting objects in main storage. defaultFlushWorkersCount = 20 // defaultFlushInterval is default time interval between successive flushes. - defaultFlushInterval = time.Second + defaultFlushInterval = 10 * time.Second ) -var errIterationCompleted = errors.New("iteration completed") - // runFlushLoop starts background workers which periodically flush objects to the blobstor. func (c *cache) runFlushLoop(ctx context.Context) { if c.disableBackgroundFlush { return } - for i := 0; i < c.workersCount; i++ { - c.wg.Add(1) - go c.workerFlushSmall(ctx) - } c.wg.Add(1) go func() { c.workerFlushBig(ctx) c.wg.Done() }() - - c.wg.Add(1) - go func() { - defer c.wg.Done() - - tt := time.NewTimer(defaultFlushInterval) - defer tt.Stop() - - for { - select { - case <-tt.C: - c.flushSmallObjects(ctx) - tt.Reset(defaultFlushInterval) - c.estimateCacheSize() - case <-ctx.Done(): - return - } - } - }() -} - -func (c *cache) flushSmallObjects(ctx context.Context) { - var lastKey []byte - for { - select { - case <-ctx.Done(): - return - default: - } - - var m []objectInfo - - c.modeMtx.RLock() - if c.readOnly() { - c.modeMtx.RUnlock() - time.Sleep(time.Second) - continue - } - - // We put objects in batches of fixed size to not interfere with main put cycle a lot. - _ = c.db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - cs := b.Cursor() - - var k, v []byte - - if len(lastKey) == 0 { - k, v = cs.First() - } else { - k, v = cs.Seek(lastKey) - if bytes.Equal(k, lastKey) { - k, v = cs.Next() - } - } - - for ; k != nil && len(m) < flushBatchSize; k, v = cs.Next() { - if len(lastKey) == len(k) { - copy(lastKey, k) - } else { - lastKey = bytes.Clone(k) - } - - m = append(m, objectInfo{ - addr: string(k), - data: bytes.Clone(v), - }) - } - return nil - }) - - var count int - for i := range m { - obj := objectSDK.New() - if err := obj.Unmarshal(m[i].data); err != nil { - continue - } - m[i].obj = obj - - count++ - select { - case c.flushCh <- m[i]: - case <-ctx.Done(): - c.modeMtx.RUnlock() - return - } - } - - c.modeMtx.RUnlock() - if count == 0 { - break - } - - c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache, - zap.Int("count", count), - zap.String("start", base58.Encode(lastKey))) - } } func (c *cache) workerFlushBig(ctx context.Context) { - tick := time.NewTicker(defaultFlushInterval * 10) + tick := time.NewTicker(defaultFlushInterval) for { select { case <-tick.C: @@ -211,29 +101,6 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { return err } -// workerFlushSmall writes small objects to the main storage. -func (c *cache) workerFlushSmall(ctx context.Context) { - defer c.wg.Done() - - var objInfo objectInfo - for { - // Give priority to direct put. - select { - case objInfo = <-c.flushCh: - case <-ctx.Done(): - return - } - - err := c.flushObject(ctx, objInfo.obj, objInfo.data, StorageTypeDB) - if err != nil { - // Error is handled in flushObject. - continue - } - - c.deleteFromDB(objInfo.addr, true) - } -} - // flushObject is used to write object directly to the main storage. func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []byte, st StorageType) error { var err error @@ -300,74 +167,5 @@ func (c *cache) Flush(ctx context.Context, ignoreErrors, seal bool) error { } func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { - if err := c.flushFSTree(ctx, ignoreErrors); err != nil { - return err - } - - var last string - for { - batch, err := c.readNextDBBatch(ignoreErrors, last) - if err != nil { - return err - } - if len(batch) == 0 { - break - } - for _, item := range batch { - var obj objectSDK.Object - if err := obj.Unmarshal(item.data); err != nil { - c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, item.address, metaerr.Wrap(err)) - if ignoreErrors { - continue - } - return err - } - - if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil { - return err - } - c.deleteFromDB(item.address, false) - } - last = batch[len(batch)-1].address - } - return nil -} - -type batchItem struct { - data []byte - address string -} - -func (c *cache) readNextDBBatch(ignoreErrors bool, last string) ([]batchItem, error) { - const batchSize = 100 - var batch []batchItem - err := c.db.View(func(tx *bbolt.Tx) error { - var addr oid.Address - - b := tx.Bucket(defaultBucket) - cs := b.Cursor() - for k, data := cs.Seek([]byte(last)); k != nil; k, data = cs.Next() { - sa := string(k) - if sa == last { - continue - } - if err := addr.DecodeString(sa); err != nil { - c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err)) - if ignoreErrors { - continue - } - return err - } - - batch = append(batch, batchItem{data: bytes.Clone(data), address: sa}) - if len(batch) == batchSize { - return errIterationCompleted - } - } - return nil - }) - if err == nil || errors.Is(err, errIterationCompleted) { - return batch, nil - } - return nil, err + return c.flushFSTree(ctx, ignoreErrors) } diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go index 3c951bebe..861be6fd2 100644 --- a/pkg/local_object_storage/writecache/flush_test.go +++ b/pkg/local_object_storage/writecache/flush_test.go @@ -19,7 +19,6 @@ import ( oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" "github.com/stretchr/testify/require" - "go.etcd.io/bbolt" "go.uber.org/zap" ) @@ -31,7 +30,6 @@ func TestFlush(t *testing.T) { append([]Option{ WithLogger(testlogger), WithPath(filepath.Join(t.TempDir(), "writecache")), - WithSmallObjectSize(smallSize), WithMetabase(mb), WithBlobstor(bs), WithDisableBackgroundFlush(), @@ -47,31 +45,6 @@ func TestFlush(t *testing.T) { } failures := []TestFailureInjector[Option]{ - { - Desc: "db, invalid address", - InjectFn: func(t *testing.T, wc Cache) { - c := wc.(*cache) - obj := testutil.GenerateObject() - data, err := obj.Marshal() - require.NoError(t, err) - require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - return b.Put([]byte{1, 2, 3}, data) - })) - }, - }, - { - Desc: "db, invalid object", - InjectFn: func(t *testing.T, wc Cache) { - c := wc.(*cache) - require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - k := []byte(oidtest.Address().EncodeToString()) - v := []byte{1, 2, 3} - return b.Put(k, v) - })) - }, - }, { Desc: "fs, read error", InjectFn: func(t *testing.T, wc Cache) { diff --git a/pkg/local_object_storage/writecache/get.go b/pkg/local_object_storage/writecache/get.go index bf26833bd..67e360d21 100644 --- a/pkg/local_object_storage/writecache/get.go +++ b/pkg/local_object_storage/writecache/get.go @@ -1,7 +1,6 @@ package writecache import ( - "bytes" "context" "time" @@ -12,7 +11,6 @@ import ( apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -37,11 +35,11 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e return nil, ErrDegraded } - obj, err := c.getInternal(ctx, saddr, addr) + obj, err := c.getInternal(ctx, addr) return obj, metaerr.Wrap(err) } -func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) (*objectSDK.Object, error) { +func (c *cache) getInternal(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) { found := false storageType := StorageTypeUndefined startedAt := time.Now() @@ -49,14 +47,6 @@ func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) c.metrics.Get(time.Since(startedAt), found, storageType) }() - value, err := Get(c.db, []byte(saddr)) - if err == nil { - obj := objectSDK.New() - found = true - storageType = StorageTypeDB - return obj, obj.Unmarshal(value) - } - res, err := c.fsTree.Get(ctx, common.GetPrm{Address: addr}) if err != nil { return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) @@ -87,34 +77,10 @@ func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, return nil, ErrDegraded } - obj, err := c.getInternal(ctx, saddr, addr) + obj, err := c.getInternal(ctx, addr) if err != nil { return nil, metaerr.Wrap(err) } return obj.CutPayload(), nil } - -// Get fetches object from the underlying database. -// Key should be a stringified address. -// -// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in db. -func Get(db *bbolt.DB, key []byte) ([]byte, error) { - if db == nil { - return nil, ErrNotInitialized - } - var value []byte - err := db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - if b == nil { - return ErrNoDefaultBucket - } - value = b.Get(key) - if value == nil { - return logicerr.Wrap(new(apistatus.ObjectNotFound)) - } - value = bytes.Clone(value) - return nil - }) - return value, metaerr.Wrap(err) -} diff --git a/pkg/local_object_storage/writecache/iterate.go b/pkg/local_object_storage/writecache/iterate.go index 9ec039f91..a0268c09a 100644 --- a/pkg/local_object_storage/writecache/iterate.go +++ b/pkg/local_object_storage/writecache/iterate.go @@ -1,39 +1,28 @@ package writecache import ( - "errors" - "fmt" + "context" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) -// ErrNoDefaultBucket is returned by IterateDB when default bucket for objects is missing. -var ErrNoDefaultBucket = errors.New("no default bucket") +func (c *cache) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Iterate", + trace.WithAttributes( + attribute.Bool("ignore_errors", prm.IgnoreErrors), + )) + defer span.End() -// IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return. -// It is assumed that db is an underlying database of some WriteCache instance. -// -// Returns ErrNoDefaultBucket if there is no default bucket in db. -// -// DB must not be nil and should be opened. -func IterateDB(db *bbolt.DB, f func(oid.Address) error) error { - return metaerr.Wrap(db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - if b == nil { - return ErrNoDefaultBucket - } + if !c.modeMtx.TryRLock() { + return common.IterateRes{}, ErrNotInitialized + } + defer c.modeMtx.RUnlock() + if c.mode.NoMetabase() { + return common.IterateRes{}, ErrDegraded + } - var addr oid.Address - - return b.ForEach(func(k, _ []byte) error { - err := addr.DecodeString(string(k)) - if err != nil { - return fmt.Errorf("could not parse object address: %w", err) - } - - return f(addr) - }) - })) + return c.fsTree.Iterate(ctx, prm) } diff --git a/pkg/local_object_storage/writecache/metrics.go b/pkg/local_object_storage/writecache/metrics.go index e68b6d8be..7f9dc6f71 100644 --- a/pkg/local_object_storage/writecache/metrics.go +++ b/pkg/local_object_storage/writecache/metrics.go @@ -14,7 +14,6 @@ func (t StorageType) String() string { const ( StorageTypeUndefined StorageType = "null" - StorageTypeDB StorageType = "db" StorageTypeFSTree StorageType = "fstree" ) @@ -26,9 +25,9 @@ type Metrics interface { Flush(success bool, st StorageType) Evict(st StorageType) - SetEstimateSize(db, fstree uint64) + SetEstimateSize(uint64) SetMode(m mode.ComponentMode) - SetActualCounters(db, fstree uint64) + SetActualCounters(uint64) SetPath(path string) Close() } @@ -47,11 +46,11 @@ func (metricsStub) Delete(time.Duration, bool, StorageType) {} func (metricsStub) Put(time.Duration, bool, StorageType) {} -func (metricsStub) SetEstimateSize(uint64, uint64) {} +func (metricsStub) SetEstimateSize(uint64) {} func (metricsStub) SetMode(mode.ComponentMode) {} -func (metricsStub) SetActualCounters(uint64, uint64) {} +func (metricsStub) SetActualCounters(uint64) {} func (metricsStub) Flush(bool, StorageType) {} diff --git a/pkg/local_object_storage/writecache/mode.go b/pkg/local_object_storage/writecache/mode.go index 4172cfbc8..a55c7d967 100644 --- a/pkg/local_object_storage/writecache/mode.go +++ b/pkg/local_object_storage/writecache/mode.go @@ -2,10 +2,7 @@ package writecache import ( "context" - "fmt" - "time" - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "go.opentelemetry.io/otel/attribute" @@ -25,7 +22,7 @@ func (c *cache) SetMode(m mode.Mode) error { c.modeMtx.Lock() defer c.modeMtx.Unlock() - err := c.setMode(ctx, m, true) + err := c.setMode(ctx, m, !m.NoMetabase()) if err == nil { c.metrics.SetMode(mode.ConvertToComponentModeDegraded(m)) } @@ -44,20 +41,6 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode, ignoreErrors bool) err } } - if c.db != nil { - if err = c.db.Close(); err != nil { - return fmt.Errorf("can't close write-cache database: %w", err) - } - } - - // Suspend producers to ensure there are channel send operations in fly. - // flushCh is populated by `flush` with `modeMtx` taken, thus waiting until it is empty - // guarantees that there are no in-fly operations. - for len(c.flushCh) != 0 { - c.log.Info(logs.WritecacheWaitingForChannelsToFlush) - time.Sleep(time.Second) - } - if turnOffMeta { c.mode = m return nil diff --git a/pkg/local_object_storage/writecache/mode_test.go b/pkg/local_object_storage/writecache/mode_test.go deleted file mode 100644 index f684c15bc..000000000 --- a/pkg/local_object_storage/writecache/mode_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package writecache - -import ( - "context" - "testing" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" - "github.com/stretchr/testify/require" -) - -func TestMode(t *testing.T) { - t.Parallel() - wc := New( - WithLogger(test.NewLogger(t)), - WithFlushWorkersCount(2), - WithPath(t.TempDir())) - - require.NoError(t, wc.Open(context.Background(), mode.DegradedReadOnly)) - require.Nil(t, wc.(*cache).db) - require.NoError(t, wc.Init()) - require.Nil(t, wc.(*cache).db) - require.NoError(t, wc.Close()) - - require.NoError(t, wc.Open(context.Background(), mode.Degraded)) - require.Nil(t, wc.(*cache).db) - require.NoError(t, wc.Init()) - require.Nil(t, wc.(*cache).db) - require.NoError(t, wc.Close()) -} diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go index c8eb1bc45..9620aab94 100644 --- a/pkg/local_object_storage/writecache/options.go +++ b/pkg/local_object_storage/writecache/options.go @@ -3,7 +3,6 @@ package writecache import ( "io/fs" "os" - "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "go.uber.org/zap" @@ -22,19 +21,13 @@ type options struct { metabase Metabase // maxObjectSize is the maximum size of the object stored in the write-cache. maxObjectSize uint64 - // smallObjectSize is the maximum size of the object stored in the database. - smallObjectSize uint64 // workersCount is the number of workers flushing objects in parallel. workersCount int - // maxCacheSize is the maximum total size of all objects saved in cache (DB + FS). + // maxCacheSize is the maximum total size of all objects saved in cache. // 1 GiB by default. maxCacheSize uint64 // objCounters contains atomic counters for the number of objects stored in cache. objCounters counters - // maxBatchSize is the maximum batch size for the small object database. - maxBatchSize int - // maxBatchDelay is the maximum batch wait time for the small object database. - maxBatchDelay time.Duration // noSync is true iff FSTree allows unsynchronized writes. noSync bool // reportError is the function called when encountering disk errors in background workers. @@ -84,15 +77,6 @@ func WithMaxObjectSize(sz uint64) Option { } } -// WithSmallObjectSize sets maximum object size to be stored in write-cache. -func WithSmallObjectSize(sz uint64) Option { - return func(o *options) { - if sz > 0 { - o.smallObjectSize = sz - } - } -} - func WithFlushWorkersCount(c int) Option { return func(o *options) { if c > 0 { @@ -108,24 +92,6 @@ func WithMaxCacheSize(sz uint64) Option { } } -// WithMaxBatchSize sets max batch size for the small object database. -func WithMaxBatchSize(sz int) Option { - return func(o *options) { - if sz > 0 { - o.maxBatchSize = sz - } - } -} - -// WithMaxBatchDelay sets max batch delay for the small object database. -func WithMaxBatchDelay(d time.Duration) Option { - return func(o *options) { - if d > 0 { - o.maxBatchDelay = d - } - } -} - // WithNoSync sets an option to allow returning to caller on PUT before write is persisted. // Note, that we use this flag for FSTree only and DO NOT use it for a bolt DB because // we cannot yet properly handle the corrupted database during the startup. This SHOULD NOT diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go index 0e419f95b..6641d3b70 100644 --- a/pkg/local_object_storage/writecache/put.go +++ b/pkg/local_object_storage/writecache/put.go @@ -8,7 +8,6 @@ import ( storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" - "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -50,63 +49,17 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro return common.PutRes{}, ErrBigObject } - oi := objectInfo{ - addr: prm.Address.EncodeToString(), - obj: prm.Object, - data: prm.RawData, - } - - if sz <= c.smallObjectSize { - storageType = StorageTypeDB - err := c.putSmall(oi) - if err == nil { - added = true - } - return common.PutRes{}, err - } - storageType = StorageTypeFSTree - err := c.putBig(ctx, oi.addr, prm) + err := c.putBig(ctx, prm) if err == nil { added = true } return common.PutRes{}, metaerr.Wrap(err) } -// putSmall persists small objects to the write-cache database and -// pushes the to the flush workers queue. -func (c *cache) putSmall(obj objectInfo) error { - cacheSize := c.estimateCacheSize() - if c.maxCacheSize < c.incSizeDB(cacheSize) { - return ErrOutOfSpace - } - - var newRecord bool - err := c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - key := []byte(obj.addr) - newRecord = b.Get(key) == nil - if newRecord { - return b.Put(key, obj.data) - } - return nil - }) - if err == nil { - storagelog.Write(c.log, - storagelog.AddressField(obj.addr), - storagelog.StorageTypeField(wcStorageType), - storagelog.OpField("db PUT"), - ) - if newRecord { - c.objCounters.cDB.Add(1) - c.estimateCacheSize() - } - } - return err -} - // putBig writes object to FSTree and pushes it to the flush workers queue. -func (c *cache) putBig(ctx context.Context, addr string, prm common.PutPrm) error { +func (c *cache) putBig(ctx context.Context, prm common.PutPrm) error { + addr := prm.Address.EncodeToString() cacheSz := c.estimateCacheSize() if c.maxCacheSize < c.incSizeFS(cacheSz) { return ErrOutOfSpace diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/state.go index bc75aaf27..c2198cc09 100644 --- a/pkg/local_object_storage/writecache/state.go +++ b/pkg/local_object_storage/writecache/state.go @@ -1,29 +1,21 @@ package writecache import ( - "fmt" "math" "sync/atomic" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" - "go.etcd.io/bbolt" ) func (c *cache) estimateCacheSize() uint64 { - dbCount := c.objCounters.DB() fsCount := c.objCounters.FS() if fsCount > 0 { fsCount-- // db file } - dbSize := dbCount * c.smallObjectSize fsSize := fsCount * c.maxObjectSize - c.metrics.SetEstimateSize(dbSize, fsSize) - c.metrics.SetActualCounters(dbCount, fsCount) - return dbSize + fsSize -} - -func (c *cache) incSizeDB(sz uint64) uint64 { - return sz + c.smallObjectSize + c.metrics.SetEstimateSize(fsSize) + c.metrics.SetActualCounters(fsCount) + return fsSize } func (c *cache) incSizeFS(sz uint64) uint64 { @@ -33,11 +25,7 @@ func (c *cache) incSizeFS(sz uint64) uint64 { var _ fstree.FileCounter = &counters{} type counters struct { - cDB, cFS atomic.Uint64 -} - -func (x *counters) DB() uint64 { - return x.cDB.Load() + cFS atomic.Uint64 } func (x *counters) FS() uint64 { @@ -58,20 +46,3 @@ func (x *counters) Inc() { func (x *counters) Dec() { x.cFS.Add(math.MaxUint64) } - -func (c *cache) initCounters() error { - var inDB uint64 - err := c.db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - if b != nil { - inDB = uint64(b.Stats().KeyN) - } - return nil - }) - if err != nil { - return fmt.Errorf("could not read write-cache DB counter: %w", err) - } - c.objCounters.cDB.Store(inDB) - c.estimateCacheSize() - return nil -} diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index caf997af8..2dc922032 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -3,7 +3,6 @@ package writecache import ( "context" "fmt" - "math" "os" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" @@ -14,42 +13,15 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" "go.uber.org/zap" ) -// store represents persistent storage with in-memory LRU cache -// for flushed items on top of it. -type store struct { - db *bbolt.DB -} - -const dbName = "small.bolt" - func (c *cache) openStore(mod mode.ComponentMode) error { err := util.MkdirAllX(c.path, os.ModePerm) if err != nil { return err } - c.db, err = OpenDB(c.path, mod.ReadOnly(), c.openFile) - if err != nil { - return fmt.Errorf("could not open database: %w", err) - } - - c.db.MaxBatchSize = c.maxBatchSize - c.db.MaxBatchDelay = c.maxBatchDelay - - if !mod.ReadOnly() { - err = c.db.Update(func(tx *bbolt.Tx) error { - _, err := tx.CreateBucketIfNotExists(defaultBucket) - return err - }) - if err != nil { - return fmt.Errorf("could not create default bucket: %w", err) - } - } - c.fsTree = fstree.New( fstree.WithPath(c.path), fstree.WithPerm(os.ModePerm), @@ -68,41 +40,6 @@ func (c *cache) openStore(mod mode.ComponentMode) error { return nil } -func (c *cache) deleteFromDB(key string, batched bool) { - var recordDeleted bool - var err error - if batched { - err = c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - key := []byte(key) - recordDeleted = b.Get(key) != nil - return b.Delete(key) - }) - } else { - err = c.db.Update(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - key := []byte(key) - recordDeleted = b.Get(key) != nil - return b.Delete(key) - }) - } - - if err == nil { - c.metrics.Evict(StorageTypeDB) - storagelog.Write(c.log, - storagelog.AddressField(key), - storagelog.StorageTypeField(wcStorageType), - storagelog.OpField("db DELETE"), - ) - if recordDeleted { - c.objCounters.cDB.Add(math.MaxUint64) - c.estimateCacheSize() - } - } else { - c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err)) - } -} - func (c *cache) deleteFromDisk(ctx context.Context, addr oid.Address) { _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr}) if err != nil && !client.IsErrObjectNotFound(err) { diff --git a/pkg/local_object_storage/writecache/util.go b/pkg/local_object_storage/writecache/util.go deleted file mode 100644 index 0ed4a954e..000000000 --- a/pkg/local_object_storage/writecache/util.go +++ /dev/null @@ -1,20 +0,0 @@ -package writecache - -import ( - "io/fs" - "os" - "path/filepath" - "time" - - "go.etcd.io/bbolt" -) - -// OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true. -func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error)) (*bbolt.DB, error) { - return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{ - NoFreelistSync: true, - ReadOnly: ro, - Timeout: 100 * time.Millisecond, - OpenFile: openFile, - }) -} diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index 71dba61cf..76ea84eda 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -37,6 +37,7 @@ type Cache interface { DumpInfo() Info Flush(context.Context, bool, bool) error Seal(context.Context, bool) error + Iterate(context.Context, common.IteratePrm) (common.IterateRes, error) Init() error Open(ctx context.Context, mode mode.Mode) error -- 2.45.2 From a7536afbf5228118b62d0758120cf418e517e421 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 23 Jul 2024 11:29:05 +0300 Subject: [PATCH 2/4] [#1273] writecache: Count real data size The size of the data on the disk is used to determine the writeache size, but not the number of objects multiplied by the maximum allowed object size. Signed-off-by: Dmitrii Stepanov --- .../blobstor/fstree/counter.go | 40 ++++++++++---- .../blobstor/fstree/fstree.go | 20 ++++--- .../blobstor/fstree/fstree_test.go | 13 ++--- .../blobstor/fstree/fstree_write_generic.go | 36 ++++++++---- .../blobstor/fstree/fstree_write_linux.go | 26 ++++++--- pkg/local_object_storage/writecache/cache.go | 3 +- .../writecache/options.go | 5 +- pkg/local_object_storage/writecache/put.go | 7 ++- pkg/local_object_storage/writecache/state.go | 55 +++++-------------- .../writecache/storage.go | 2 +- 10 files changed, 114 insertions(+), 93 deletions(-) diff --git a/pkg/local_object_storage/blobstor/fstree/counter.go b/pkg/local_object_storage/blobstor/fstree/counter.go index 718104e2e..c1c6ff6fa 100644 --- a/pkg/local_object_storage/blobstor/fstree/counter.go +++ b/pkg/local_object_storage/blobstor/fstree/counter.go @@ -1,22 +1,23 @@ package fstree import ( - "math" "sync/atomic" ) // FileCounter used to count files in FSTree. The implementation must be thread-safe. type FileCounter interface { - Set(v uint64) - Inc() - Dec() + Set(count, size int64) + Inc(size int64) + Dec(size int64) + Value() (int64, int64) } type noopCounter struct{} -func (c *noopCounter) Set(uint64) {} -func (c *noopCounter) Inc() {} -func (c *noopCounter) Dec() {} +func (c *noopCounter) Set(int64, int64) {} +func (c *noopCounter) Inc(int64) {} +func (c *noopCounter) Dec(int64) {} +func (c *noopCounter) Value() (int64, int64) { return 0, 0 } func counterEnabled(c FileCounter) bool { _, noop := c.(*noopCounter) @@ -24,14 +25,29 @@ func counterEnabled(c FileCounter) bool { } type SimpleCounter struct { - v atomic.Uint64 + count atomic.Int64 + size atomic.Int64 } func NewSimpleCounter() *SimpleCounter { return &SimpleCounter{} } -func (c *SimpleCounter) Set(v uint64) { c.v.Store(v) } -func (c *SimpleCounter) Inc() { c.v.Add(1) } -func (c *SimpleCounter) Dec() { c.v.Add(math.MaxUint64) } -func (c *SimpleCounter) Value() uint64 { return c.v.Load() } +func (c *SimpleCounter) Set(count, size int64) { + c.count.Store(count) + c.size.Store(size) +} + +func (c *SimpleCounter) Inc(size int64) { + c.count.Add(1) + c.size.Add(size) +} + +func (c *SimpleCounter) Dec(size int64) { + c.count.Add(-1) + c.size.Add(-size) +} + +func (c *SimpleCounter) Value() (int64, int64) { + return c.count.Load(), c.size.Load() +} diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index 02580dbfa..cf0affdb6 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -435,32 +435,38 @@ func (t *FSTree) initFileCounter() error { return nil } - counter, err := t.countFiles() + count, size, err := t.countFiles() if err != nil { return err } - t.fileCounter.Set(counter) + t.fileCounter.Set(count, size) return nil } -func (t *FSTree) countFiles() (uint64, error) { - var counter uint64 +func (t *FSTree) countFiles() (int64, int64, error) { + var count int64 + var size int64 // it is simpler to just consider every file // that is not directory as an object err := filepath.WalkDir(t.RootPath, func(_ string, d fs.DirEntry, _ error) error { if !d.IsDir() { - counter++ + count++ + fi, err := d.Info() + if err != nil { + return err + } + size += fi.Size() } return nil }, ) if err != nil { - return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err) + return 0, 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err) } - return counter, nil + return count, size, nil } func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) { diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_test.go b/pkg/local_object_storage/blobstor/fstree/fstree_test.go index d633cbac3..f89df7405 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_test.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_test.go @@ -47,8 +47,9 @@ func TestObjectCounter(t *testing.T) { require.NoError(t, fst.Open(mode.ComponentReadWrite)) require.NoError(t, fst.Init()) - counterValue := counter.Value() - require.Equal(t, uint64(0), counterValue) + counterValue, sizeValue := counter.Value() + require.Equal(t, int64(0), counterValue) + require.Equal(t, int64(0), sizeValue) defer func() { require.NoError(t, fst.Close()) @@ -64,9 +65,6 @@ func TestObjectCounter(t *testing.T) { putPrm.Address = addr putPrm.RawData, _ = obj.Marshal() - var getPrm common.GetPrm - getPrm.Address = putPrm.Address - var delPrm common.DeletePrm delPrm.Address = addr @@ -95,8 +93,9 @@ func TestObjectCounter(t *testing.T) { require.NoError(t, eg.Wait()) - counterValue = counter.Value() - realCount, err := fst.countFiles() + counterValue, sizeValue = counter.Value() + realCount, realSize, err := fst.countFiles() require.NoError(t, err) require.Equal(t, realCount, counterValue) + require.Equal(t, realSize, sizeValue) } diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go b/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go index 8b2622885..cf9941ec8 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go @@ -78,14 +78,15 @@ func (w *genericWriter) writeAndRename(tmpPath, p string, data []byte) error { } if w.fileCounterEnabled { - w.fileCounter.Inc() + w.fileCounter.Inc(int64(len(data))) var targetFileExists bool - if _, e := os.Stat(p); e == nil { + s, e := os.Stat(p) + if e == nil { targetFileExists = true } err = os.Rename(tmpPath, p) if err == nil && targetFileExists { - w.fileCounter.Dec() + w.fileCounter.Dec(int64(s.Size())) } } else { err = os.Rename(tmpPath, p) @@ -108,20 +109,31 @@ func (w *genericWriter) writeFile(p string, data []byte) error { } func (w *genericWriter) removeFile(p string) error { - var err error if w.fileCounterEnabled { - w.fileGuard.Lock(p) - err = os.Remove(p) - w.fileGuard.Unlock(p) - if err == nil { - w.fileCounter.Dec() - } - } else { - err = os.Remove(p) + return w.removeFileWithCounter(p) } + err := os.Remove(p) if err != nil && os.IsNotExist(err) { err = logicerr.Wrap(new(apistatus.ObjectNotFound)) } return err } + +func (w *genericWriter) removeFileWithCounter(p string) error { + w.fileGuard.Lock(p) + defer w.fileGuard.Unlock(p) + + s, err := os.Stat(p) + if err != nil && os.IsNotExist(err) { + return logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + if err != nil { + return err + } + err = os.Remove(p) + if err == nil { + w.fileCounter.Dec(s.Size()) + } + return err +} diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go b/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go index efc5a3d3d..774e996f9 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go @@ -18,7 +18,8 @@ type linuxWriter struct { perm uint32 flags int - counter FileCounter + counter FileCounter + counterEnabled bool } func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync bool) writer { @@ -34,10 +35,11 @@ func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync b } _ = unix.Close(fd) // Don't care about error. w := &linuxWriter{ - root: root, - perm: uint32(perm), - flags: flags, - counter: c, + root: root, + perm: uint32(perm), + flags: flags, + counter: c, + counterEnabled: counterEnabled(c), } return w } @@ -61,7 +63,7 @@ func (w *linuxWriter) writeFile(p string, data []byte) error { if n == len(data) { err = unix.Linkat(unix.AT_FDCWD, tmpPath, unix.AT_FDCWD, p, unix.AT_SYMLINK_FOLLOW) if err == nil { - w.counter.Inc() + w.counter.Inc(int64(len(data))) } if errors.Is(err, unix.EEXIST) { err = nil @@ -78,12 +80,22 @@ func (w *linuxWriter) writeFile(p string, data []byte) error { } func (w *linuxWriter) removeFile(p string) error { + var s unix.Stat_t + if w.counterEnabled { + err := unix.Stat(p, &s) + if err != nil && err == unix.ENOENT { + return logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + if err != nil { + return err + } + } err := unix.Unlink(p) if err != nil && err == unix.ENOENT { return logicerr.Wrap(new(apistatus.ObjectNotFound)) } if err == nil { - w.counter.Dec() + w.counter.Dec(s.Size) } return err } diff --git a/pkg/local_object_storage/writecache/cache.go b/pkg/local_object_storage/writecache/cache.go index 3faef8838..e57f4fc5e 100644 --- a/pkg/local_object_storage/writecache/cache.go +++ b/pkg/local_object_storage/writecache/cache.go @@ -57,6 +57,7 @@ func New(opts ...Option) Cache { maxCacheSize: defaultMaxCacheSize, openFile: os.OpenFile, metrics: DefaultMetrics(), + counter: fstree.NewSimpleCounter(), }, } @@ -91,7 +92,7 @@ func (c *cache) Open(_ context.Context, mod mode.Mode) error { return metaerr.Wrap(err) } - _ = c.estimateCacheSize() + c.estimateCacheSize() return nil } diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go index 9620aab94..ada098fd4 100644 --- a/pkg/local_object_storage/writecache/options.go +++ b/pkg/local_object_storage/writecache/options.go @@ -4,6 +4,7 @@ import ( "io/fs" "os" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "go.uber.org/zap" ) @@ -26,8 +27,8 @@ type options struct { // maxCacheSize is the maximum total size of all objects saved in cache. // 1 GiB by default. maxCacheSize uint64 - // objCounters contains atomic counters for the number of objects stored in cache. - objCounters counters + // counter contains atomic counters for the number of objects stored in cache. + counter *fstree.SimpleCounter // noSync is true iff FSTree allows unsynchronized writes. noSync bool // reportError is the function called when encountering disk errors in background workers. diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go index 6641d3b70..9f60972d3 100644 --- a/pkg/local_object_storage/writecache/put.go +++ b/pkg/local_object_storage/writecache/put.go @@ -60,8 +60,11 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro // putBig writes object to FSTree and pushes it to the flush workers queue. func (c *cache) putBig(ctx context.Context, prm common.PutPrm) error { addr := prm.Address.EncodeToString() - cacheSz := c.estimateCacheSize() - if c.maxCacheSize < c.incSizeFS(cacheSz) { + estimatedObjSize := uint64(len(prm.RawData)) + if estimatedObjSize == 0 { + estimatedObjSize = prm.Object.PayloadSize() + } + if !c.hasFreeSpace(estimatedObjSize) { return ErrOutOfSpace } diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/state.go index c2198cc09..6d02df245 100644 --- a/pkg/local_object_storage/writecache/state.go +++ b/pkg/local_object_storage/writecache/state.go @@ -1,48 +1,19 @@ package writecache -import ( - "math" - "sync/atomic" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" -) - -func (c *cache) estimateCacheSize() uint64 { - fsCount := c.objCounters.FS() - if fsCount > 0 { - fsCount-- // db file +func (c *cache) estimateCacheSize() { + count, size := c.counter.Value() + var ucount, usize uint64 + if count > 0 { + ucount = uint64(count) } - fsSize := fsCount * c.maxObjectSize - c.metrics.SetEstimateSize(fsSize) - c.metrics.SetActualCounters(fsCount) - return fsSize + if size > 0 { + usize = uint64(size) + } + c.metrics.SetEstimateSize(ucount) + c.metrics.SetActualCounters(usize) } -func (c *cache) incSizeFS(sz uint64) uint64 { - return sz + c.maxObjectSize -} - -var _ fstree.FileCounter = &counters{} - -type counters struct { - cFS atomic.Uint64 -} - -func (x *counters) FS() uint64 { - return x.cFS.Load() -} - -// Set implements fstree.ObjectCounter. -func (x *counters) Set(v uint64) { - x.cFS.Store(v) -} - -// Inc implements fstree.ObjectCounter. -func (x *counters) Inc() { - x.cFS.Add(1) -} - -// Dec implements fstree.ObjectCounter. -func (x *counters) Dec() { - x.cFS.Add(math.MaxUint64) +func (c *cache) hasFreeSpace(sz uint64) bool { + _, size := c.counter.Value() + return size+int64(sz) <= int64(c.maxCacheSize) } diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index 2dc922032..6aface7a5 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -28,7 +28,7 @@ func (c *cache) openStore(mod mode.ComponentMode) error { fstree.WithDepth(1), fstree.WithDirNameLen(1), fstree.WithNoSync(c.noSync), - fstree.WithFileCounter(&c.objCounters), + fstree.WithFileCounter(c.counter), ) if err := c.fsTree.Open(mod); err != nil { return fmt.Errorf("could not open FSTree: %w", err) -- 2.45.2 From e61f9ac796c431754db1813d7411a4ea545b81d5 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 24 Jul 2024 11:36:46 +0300 Subject: [PATCH 3/4] [#1273] writecache: Add count limit Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/config.go | 3 +++ cmd/frostfs-node/config/engine/config_test.go | 2 ++ .../config/engine/shard/writecache/config.go | 21 ++++++++++++++++++- config/example/node.env | 3 ++- config/example/node.json | 3 ++- config/example/node.yaml | 3 ++- docs/storage-node-configuration.md | 17 ++++++++------- .../writecache/options.go | 9 ++++++++ pkg/local_object_storage/writecache/state.go | 5 +++-- 9 files changed, 53 insertions(+), 13 deletions(-) diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 03b67e53a..446d7bb3e 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -149,6 +149,7 @@ type shardCfg struct { maxObjSize uint64 flushWorkerCount int sizeLimit uint64 + countLimit uint64 noSync bool } @@ -269,6 +270,7 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg, wc.maxObjSize = writeCacheCfg.MaxObjectSize() wc.flushWorkerCount = writeCacheCfg.WorkerCount() wc.sizeLimit = writeCacheCfg.SizeLimit() + wc.countLimit = writeCacheCfg.CountLimit() wc.noSync = writeCacheCfg.NoSync() } } @@ -858,6 +860,7 @@ func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option { writecache.WithMaxObjectSize(wcRead.maxObjSize), writecache.WithFlushWorkersCount(wcRead.flushWorkerCount), writecache.WithMaxCacheSize(wcRead.sizeLimit), + writecache.WithMaxCacheCount(wcRead.countLimit), writecache.WithNoSync(wcRead.noSync), writecache.WithLogger(c.log), ) diff --git a/cmd/frostfs-node/config/engine/config_test.go b/cmd/frostfs-node/config/engine/config_test.go index a4a102229..0f191fb37 100644 --- a/cmd/frostfs-node/config/engine/config_test.go +++ b/cmd/frostfs-node/config/engine/config_test.go @@ -77,6 +77,7 @@ func TestEngineSection(t *testing.T) { require.EqualValues(t, 134217728, wc.MaxObjectSize()) require.EqualValues(t, 30, wc.WorkerCount()) require.EqualValues(t, 3221225472, wc.SizeLimit()) + require.EqualValues(t, 0, wc.CountLimit()) require.Equal(t, "tmp/0/meta", meta.Path()) require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm()) @@ -131,6 +132,7 @@ func TestEngineSection(t *testing.T) { require.EqualValues(t, 134217728, wc.MaxObjectSize()) require.EqualValues(t, 30, wc.WorkerCount()) require.EqualValues(t, 4294967296, wc.SizeLimit()) + require.EqualValues(t, 10000, wc.CountLimit()) require.Equal(t, "tmp/1/meta", meta.Path()) require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm()) diff --git a/cmd/frostfs-node/config/engine/shard/writecache/config.go b/cmd/frostfs-node/config/engine/shard/writecache/config.go index 1d4fbce9b..9293ad5fe 100644 --- a/cmd/frostfs-node/config/engine/shard/writecache/config.go +++ b/cmd/frostfs-node/config/engine/shard/writecache/config.go @@ -80,11 +80,20 @@ func (x *Config) WorkerCount() int { return WorkersNumberDefault } -// SizeLimit returns the value of "capacity" config parameter. +// SizeLimit returns the value of "capacity_size" or "capacity" config parameter. // // Returns SizeLimitDefault if the value is not a positive number. func (x *Config) SizeLimit() uint64 { c := config.SizeInBytesSafe( + (*config.Config)(x), + "capacity_size", + ) + + if c > 0 { + return c + } + + c = config.SizeInBytesSafe( (*config.Config)(x), "capacity", ) @@ -96,6 +105,16 @@ func (x *Config) SizeLimit() uint64 { return SizeLimitDefault } +// CountLimit returns the value of "capacity_count" config parameter. +// +// Returns 0 (means no limit) if the value is not a positive number. +func (x *Config) CountLimit() uint64 { + return config.SizeInBytesSafe( + (*config.Config)(x), + "capacity_count", + ) +} + // NoSync returns the value of "no_sync" config parameter. // // Returns false if the value is not a boolean. diff --git a/config/example/node.env b/config/example/node.env index 68a0503b3..e49821c2b 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -156,7 +156,8 @@ FROSTFS_STORAGE_SHARD_1_WRITECACHE_ENABLED=true FROSTFS_STORAGE_SHARD_1_WRITECACHE_PATH=tmp/1/cache FROSTFS_STORAGE_SHARD_1_WRITECACHE_MAX_OBJECT_SIZE=134217728 FROSTFS_STORAGE_SHARD_1_WRITECACHE_FLUSH_WORKER_COUNT=30 -FROSTFS_STORAGE_SHARD_1_WRITECACHE_CAPACITY=4294967296 +FROSTFS_STORAGE_SHARD_1_WRITECACHE_CAPACITY_SIZE=4294967296 +FROSTFS_STORAGE_SHARD_1_WRITECACHE_CAPACITY_COUNT=10000 ### Metabase config FROSTFS_STORAGE_SHARD_1_METABASE_PATH=tmp/1/meta FROSTFS_STORAGE_SHARD_1_METABASE_PERM=0644 diff --git a/config/example/node.json b/config/example/node.json index 7d4b34c7b..a2b699703 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -204,7 +204,8 @@ "memcache_capacity": 2147483648, "max_object_size": 134217728, "flush_worker_count": 30, - "capacity": 4294967296 + "capacity_size": 4294967296, + "capacity_count": 10000 }, "metabase": { "path": "tmp/1/meta", diff --git a/config/example/node.yaml b/config/example/node.yaml index cd13fa836..50e46630c 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -207,7 +207,8 @@ storage: 1: writecache: path: tmp/1/cache # write-cache root directory - capacity: 4 G # approximate write-cache total size, bytes + capacity_size: 4 G # approximate write-cache total size, bytes + capacity_count: 10000 metabase: path: tmp/1/meta # metabase path diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index f0216a25d..f6657980a 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -286,17 +286,20 @@ metabase: writecache: enabled: true path: /path/to/writecache - capacity: 4294967296 + capacity_size: 4294967296 + capacity_count: 100000 max_object_size: 134217728 flush_worker_count: 30 ``` -| Parameter | Type | Default value | Description | -|----------------------|------------|---------------|----------------------------------------------------------------------------------------------------------------------| -| `path` | `string` | | Path to the metabase file. | -| `capacity` | `size` | unrestricted | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly. | -| `max_object_size` | `size` | `64M` | Maximum object size allowed to be stored in the writecache. | -| `flush_worker_count` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. | +| Parameter | Type | Default value | Description | +|----------------------|------------|---------------|----------------------------------------------------------------------------------------------------------------------------------| +| `path` | `string` | | Path to the metabase file. | +| `capacity_size` | `size` | `1G` | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly. | +| `capacity` | `size` | `1G` | The same as for `capacity`. Deprecated, use `capacity_size`. | +| `capacity_count` | `int` | unrestricted | Approximate maximum count of objects in the writecache. If the writecache is full, objects are written to the blobstor directly. | +| `max_object_size` | `size` | `64M` | Maximum object size allowed to be stored in the writecache. | +| `flush_worker_count` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. | # `node` section diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go index ada098fd4..9b242afdf 100644 --- a/pkg/local_object_storage/writecache/options.go +++ b/pkg/local_object_storage/writecache/options.go @@ -27,6 +27,8 @@ type options struct { // maxCacheSize is the maximum total size of all objects saved in cache. // 1 GiB by default. maxCacheSize uint64 + // maxCacheCount is the maximum total count of all objects saved in cache. + maxCacheCount uint64 // counter contains atomic counters for the number of objects stored in cache. counter *fstree.SimpleCounter // noSync is true iff FSTree allows unsynchronized writes. @@ -93,6 +95,13 @@ func WithMaxCacheSize(sz uint64) Option { } } +// WithMaxCacheCount sets maximum write-cache count of objects. +func WithMaxCacheCount(cnt uint64) Option { + return func(o *options) { + o.maxCacheCount = cnt + } +} + // WithNoSync sets an option to allow returning to caller on PUT before write is persisted. // Note, that we use this flag for FSTree only and DO NOT use it for a bolt DB because // we cannot yet properly handle the corrupted database during the startup. This SHOULD NOT diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/state.go index 6d02df245..0d2e74302 100644 --- a/pkg/local_object_storage/writecache/state.go +++ b/pkg/local_object_storage/writecache/state.go @@ -14,6 +14,7 @@ func (c *cache) estimateCacheSize() { } func (c *cache) hasFreeSpace(sz uint64) bool { - _, size := c.counter.Value() - return size+int64(sz) <= int64(c.maxCacheSize) + count, size := c.counter.Value() + return (size+int64(sz) <= int64(c.maxCacheSize)) && + (c.maxCacheCount == 0 || count+1 <= int64(c.maxCacheCount)) } -- 2.45.2 From 81df490709c123596380ec61e02e35398626a630 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 24 Jul 2024 14:10:22 +0300 Subject: [PATCH 4/4] [#1273] writecache: Flush writecache concurrently Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-lens/internal/meta/inspect.go | 6 +- internal/logs/logs.go | 1 + .../metabase/storage_id.go | 38 ++----- .../metabase/storage_id_test.go | 11 +- pkg/local_object_storage/shard/delete.go | 6 +- .../shard/gc_internal_test.go | 7 +- pkg/local_object_storage/shard/get.go | 5 +- pkg/local_object_storage/shard/rebuilder.go | 4 +- .../writecache/benchmark/writecache_test.go | 26 ++++- pkg/local_object_storage/writecache/cache.go | 10 +- pkg/local_object_storage/writecache/flush.go | 104 ++++++++++++++---- .../writecache/flush_test.go | 6 +- .../writecache/writecache.go | 1 + 13 files changed, 138 insertions(+), 87 deletions(-) diff --git a/cmd/frostfs-lens/internal/meta/inspect.go b/cmd/frostfs-lens/internal/meta/inspect.go index 9eb60f966..205c71dc6 100644 --- a/cmd/frostfs-lens/internal/meta/inspect.go +++ b/cmd/frostfs-lens/internal/meta/inspect.go @@ -33,13 +33,11 @@ func inspectFunc(cmd *cobra.Command, _ []string) { db := openMeta(cmd) defer db.Close() - storageID := meta.StorageIDPrm{} - storageID.SetAddress(addr) - + storageID := meta.StorageIDPrm{Address: addr} resStorageID, err := db.StorageID(cmd.Context(), storageID) common.ExitOnErr(cmd, common.Errf("could not check if the obj is small: %w", err)) - if id := resStorageID.StorageID(); id != nil { + if id := resStorageID.StorageID; id != nil { cmd.Printf("Object storageID: %s\n\n", blobovniczatree.NewIDFromBytes(id).Path()) } else { cmd.Printf("Object does not contain storageID\n\n") diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 1a0bda00d..f73866861 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -468,6 +468,7 @@ const ( FSTreeCantUnmarshalObject = "can't unmarshal an object" FSTreeCantFushObjectBlobstor = "can't flush an object to blobstor" FSTreeCantUpdateID = "can't update object storage ID" + FSTreeCantGetID = "can't get object storage ID" FSTreeCantDecodeDBObjectAddress = "can't decode object address from the DB" PutSingleRedirectFailure = "failed to redirect PutSingle request" StorageIDRetrievalFailure = "can't get storage ID from metabase" diff --git a/pkg/local_object_storage/metabase/storage_id.go b/pkg/local_object_storage/metabase/storage_id.go index 6d620b41a..657334d17 100644 --- a/pkg/local_object_storage/metabase/storage_id.go +++ b/pkg/local_object_storage/metabase/storage_id.go @@ -15,22 +15,12 @@ import ( // StorageIDPrm groups the parameters of StorageID operation. type StorageIDPrm struct { - addr oid.Address + Address oid.Address } // StorageIDRes groups the resulting values of StorageID operation. type StorageIDRes struct { - id []byte -} - -// SetAddress is a StorageID option to set the object address to check. -func (p *StorageIDPrm) SetAddress(addr oid.Address) { - p.addr = addr -} - -// StorageID returns storage ID. -func (r StorageIDRes) StorageID() []byte { - return r.id + StorageID []byte } // StorageID returns storage descriptor for objects from the blobstor. @@ -46,7 +36,7 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes _, span := tracing.StartSpanFromContext(ctx, "metabase.StorageID", trace.WithAttributes( - attribute.String("address", prm.addr.EncodeToString()), + attribute.String("address", prm.Address.EncodeToString()), )) defer span.End() @@ -58,7 +48,7 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes } err = db.boltDB.View(func(tx *bbolt.Tx) error { - res.id, err = db.storageID(tx, prm.addr) + res.StorageID, err = db.storageID(tx, prm.Address) return err }) @@ -83,23 +73,13 @@ func (db *DB) storageID(tx *bbolt.Tx, addr oid.Address) ([]byte, error) { // UpdateStorageIDPrm groups the parameters of UpdateStorageID operation. type UpdateStorageIDPrm struct { - addr oid.Address - id []byte + Address oid.Address + StorageID []byte } // UpdateStorageIDRes groups the resulting values of UpdateStorageID operation. type UpdateStorageIDRes struct{} -// SetAddress is an UpdateStorageID option to set the object address to check. -func (p *UpdateStorageIDPrm) SetAddress(addr oid.Address) { - p.addr = addr -} - -// SetStorageID is an UpdateStorageID option to set the storage ID. -func (p *UpdateStorageIDPrm) SetStorageID(id []byte) { - p.id = id -} - // UpdateStorageID updates storage descriptor for objects from the blobstor. func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res UpdateStorageIDRes, err error) { var ( @@ -112,8 +92,8 @@ func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res _, span := tracing.StartSpanFromContext(ctx, "metabase.UpdateStorageID", trace.WithAttributes( - attribute.String("address", prm.addr.EncodeToString()), - attribute.String("storage_id", string(prm.id)), + attribute.String("address", prm.Address.EncodeToString()), + attribute.String("storage_id", string(prm.StorageID)), )) defer span.End() @@ -127,7 +107,7 @@ func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res } err = db.boltDB.Batch(func(tx *bbolt.Tx) error { - return setStorageID(tx, prm.addr, prm.id, true) + return setStorageID(tx, prm.Address, prm.StorageID, true) }) success = err == nil return res, metaerr.Wrap(err) diff --git a/pkg/local_object_storage/metabase/storage_id_test.go b/pkg/local_object_storage/metabase/storage_id_test.go index aaf6480ab..d6e4a2290 100644 --- a/pkg/local_object_storage/metabase/storage_id_test.go +++ b/pkg/local_object_storage/metabase/storage_id_test.go @@ -102,18 +102,13 @@ func TestPutWritecacheDataRace(t *testing.T) { } func metaUpdateStorageID(db *meta.DB, addr oid.Address, id []byte) error { - var sidPrm meta.UpdateStorageIDPrm - sidPrm.SetAddress(addr) - sidPrm.SetStorageID(id) - + sidPrm := meta.UpdateStorageIDPrm{Address: addr, StorageID: id} _, err := db.UpdateStorageID(context.Background(), sidPrm) return err } func metaStorageID(db *meta.DB, addr oid.Address) ([]byte, error) { - var sidPrm meta.StorageIDPrm - sidPrm.SetAddress(addr) - + sidPrm := meta.StorageIDPrm{Address: addr} r, err := db.StorageID(context.Background(), sidPrm) - return r.StorageID(), err + return r.StorageID, err } diff --git a/pkg/local_object_storage/shard/delete.go b/pkg/local_object_storage/shard/delete.go index c898fdf41..4b57f82b6 100644 --- a/pkg/local_object_storage/shard/delete.go +++ b/pkg/local_object_storage/shard/delete.go @@ -105,9 +105,7 @@ func (s *Shard) validateWritecacheDoesntContainObject(ctx context.Context, addr } func (s *Shard) deleteFromBlobstor(ctx context.Context, addr oid.Address) error { - var sPrm meta.StorageIDPrm - sPrm.SetAddress(addr) - + sPrm := meta.StorageIDPrm{Address: addr} res, err := s.metaBase.StorageID(ctx, sPrm) if err != nil { s.log.Debug(logs.StorageIDRetrievalFailure, @@ -116,7 +114,7 @@ func (s *Shard) deleteFromBlobstor(ctx context.Context, addr oid.Address) error zap.String("trace_id", tracingPkg.GetTraceID(ctx))) return err } - storageID := res.StorageID() + storageID := res.StorageID if storageID == nil { // if storageID is nil it means: // 1. there is no such object diff --git a/pkg/local_object_storage/shard/gc_internal_test.go b/pkg/local_object_storage/shard/gc_internal_test.go index 3993593ad..5e4b9b5a2 100644 --- a/pkg/local_object_storage/shard/gc_internal_test.go +++ b/pkg/local_object_storage/shard/gc_internal_test.go @@ -109,15 +109,14 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) { require.True(t, client.IsErrObjectNotFound(err), "invalid error type") // storageID - var metaStIDPrm meta.StorageIDPrm - metaStIDPrm.SetAddress(addr) + metaStIDPrm := meta.StorageIDPrm{Address: addr} storageID, err := sh.metaBase.StorageID(context.Background(), metaStIDPrm) require.NoError(t, err, "failed to get storage ID") // check existence in blobstore var bsExisted common.ExistsPrm bsExisted.Address = addr - bsExisted.StorageID = storageID.StorageID() + bsExisted.StorageID = storageID.StorageID exRes, err := sh.blobStor.Exists(context.Background(), bsExisted) require.NoError(t, err, "failed to check blobstore existence") require.True(t, exRes.Exists, "invalid blobstore existence result") @@ -125,7 +124,7 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) { // drop from blobstor var bsDeletePrm common.DeletePrm bsDeletePrm.Address = addr - bsDeletePrm.StorageID = storageID.StorageID() + bsDeletePrm.StorageID = storageID.StorageID _, err = sh.blobStor.Delete(context.Background(), bsDeletePrm) require.NoError(t, err, "failed to delete from blobstore") diff --git a/pkg/local_object_storage/shard/get.go b/pkg/local_object_storage/shard/get.go index 2e7c84bcd..8b43b4fc3 100644 --- a/pkg/local_object_storage/shard/get.go +++ b/pkg/local_object_storage/shard/get.go @@ -160,15 +160,14 @@ func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta return res, false, err } - var mPrm meta.StorageIDPrm - mPrm.SetAddress(addr) + mPrm := meta.StorageIDPrm{Address: addr} mExRes, err := s.metaBase.StorageID(ctx, mPrm) if err != nil { return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err) } - storageID := mExRes.StorageID() + storageID := mExRes.StorageID if storageID == nil { // `nil` storageID returned without any error // means that object is big, `cb` expects an diff --git a/pkg/local_object_storage/shard/rebuilder.go b/pkg/local_object_storage/shard/rebuilder.go index f18573c57..06a254319 100644 --- a/pkg/local_object_storage/shard/rebuilder.go +++ b/pkg/local_object_storage/shard/rebuilder.go @@ -90,9 +90,7 @@ func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Addres return errMBIsNotAvailable } - var prm meta.UpdateStorageIDPrm - prm.SetAddress(addr) - prm.SetStorageID(storageID) + prm := meta.UpdateStorageIDPrm{Address: addr, StorageID: storageID} _, err := u.mb.UpdateStorageID(ctx, prm) return err } diff --git a/pkg/local_object_storage/writecache/benchmark/writecache_test.go b/pkg/local_object_storage/writecache/benchmark/writecache_test.go index 8d3ba4b74..495f66b0d 100644 --- a/pkg/local_object_storage/writecache/benchmark/writecache_test.go +++ b/pkg/local_object_storage/writecache/benchmark/writecache_test.go @@ -2,6 +2,7 @@ package benchmark import ( "context" + "sync" "testing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" @@ -10,6 +11,7 @@ import ( meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/stretchr/testify/require" ) @@ -80,12 +82,30 @@ func benchmarkPutPrepare(b *testing.B, cache writecache.Cache) { require.NoError(b, cache.Init(), "initializing") } -type testMetabase struct{} +type testMetabase struct { + storageIDs map[oid.Address][]byte + guard *sync.RWMutex +} -func (testMetabase) UpdateStorageID(context.Context, meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error) { +func (t *testMetabase) UpdateStorageID(_ context.Context, prm meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error) { + t.guard.Lock() + defer t.guard.Unlock() + t.storageIDs[prm.Address] = prm.StorageID return meta.UpdateStorageIDRes{}, nil } +func (t *testMetabase) StorageID(_ context.Context, prm meta.StorageIDPrm) (meta.StorageIDRes, error) { + t.guard.RLock() + defer t.guard.RUnlock() + + if id, found := t.storageIDs[prm.Address]; found { + return meta.StorageIDRes{ + StorageID: id, + }, nil + } + return meta.StorageIDRes{}, nil +} + func newCache(b *testing.B) writecache.Cache { bs := teststore.New( teststore.WithPut(func(pp common.PutPrm) (common.PutRes, error) { return common.PutRes{}, nil }), @@ -93,7 +113,7 @@ func newCache(b *testing.B) writecache.Cache { return writecache.New( writecache.WithPath(b.TempDir()), writecache.WithBlobstor(bs), - writecache.WithMetabase(testMetabase{}), + writecache.WithMetabase(&testMetabase{storageIDs: make(map[oid.Address][]byte), guard: &sync.RWMutex{}}), writecache.WithMaxCacheSize(256<<30), ) } diff --git a/pkg/local_object_storage/writecache/cache.go b/pkg/local_object_storage/writecache/cache.go index e57f4fc5e..a6d055dba 100644 --- a/pkg/local_object_storage/writecache/cache.go +++ b/pkg/local_object_storage/writecache/cache.go @@ -10,6 +10,8 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" ) @@ -26,6 +28,9 @@ type cache struct { // whether object should be compressed. compressFlags map[string]struct{} + flushCh chan objectInfo + flushingGuard *utilSync.KeyLocker[oid.Address] + // cancel is cancel function, protected by modeMtx in Close. cancel atomic.Value // wg is a wait group for flush workers. @@ -47,8 +52,9 @@ var dummyCanceler context.CancelFunc = func() {} // New creates new writecache instance. func New(opts ...Option) Cache { c := &cache{ - mode: mode.Disabled, - + mode: mode.Disabled, + flushCh: make(chan objectInfo), + flushingGuard: utilSync.NewKeyLocker[oid.Address](), compressFlags: make(map[string]struct{}), options: options{ log: &logger.Logger{Logger: zap.NewNop()}, diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index d595ddc6c..105d7a189 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -6,7 +6,6 @@ import ( "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" @@ -14,6 +13,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -32,33 +32,78 @@ func (c *cache) runFlushLoop(ctx context.Context) { return } + for i := 0; i < c.workersCount; i++ { + c.wg.Add(1) + go c.workerFlush(ctx) + } + c.wg.Add(1) - go func() { - c.workerFlushBig(ctx) - c.wg.Done() - }() + go c.workerSelect(ctx) } -func (c *cache) workerFlushBig(ctx context.Context) { +func (c *cache) workerSelect(ctx context.Context) { + defer c.wg.Done() tick := time.NewTicker(defaultFlushInterval) for { select { case <-tick.C: - c.modeMtx.RLock() - if c.readOnly() || c.noMetabase() { - c.modeMtx.RUnlock() - break + var prm common.IteratePrm + prm.IgnoreErrors = true + prm.Handler = func(ie common.IterationElement) error { + c.modeMtx.RLock() + defer c.modeMtx.RUnlock() + if c.readOnly() { + return ErrReadOnly + } + if c.noMetabase() { + return ErrDegraded + } + + select { + case <-ctx.Done(): + return ctx.Err() + case c.flushCh <- objectInfo{ + data: ie.ObjectData, + address: ie.Address, + }: + return nil + } } - - _ = c.flushFSTree(ctx, true) - - c.modeMtx.RUnlock() + _, _ = c.fsTree.Iterate(ctx, prm) case <-ctx.Done(): return } } } +func (c *cache) workerFlush(ctx context.Context) { + defer c.wg.Done() + + var objInfo objectInfo + for { + select { + case objInfo = <-c.flushCh: + case <-ctx.Done(): + return + } + + var obj objectSDK.Object + err := obj.Unmarshal(objInfo.data) + if err != nil { + c.reportFlushError(logs.FSTreeCantUnmarshalObject, objInfo.address.EncodeToString(), metaerr.Wrap(err)) + continue + } + + err = c.flushObject(ctx, objInfo.address, &obj, objInfo.data) + if err != nil { + // Error is handled in flushObject. + continue + } + + c.deleteFromDisk(ctx, objInfo.address) + } +} + func (c *cache) reportFlushError(msg string, addr string, err error) { if c.reportError != nil { c.reportError(msg, err) @@ -85,7 +130,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { return err } - err = c.flushObject(ctx, &obj, e.ObjectData, StorageTypeFSTree) + err = c.flushObject(ctx, e.Address, &obj, e.ObjectData) if err != nil { if ignoreErrors { return nil @@ -102,15 +147,25 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { } // flushObject is used to write object directly to the main storage. -func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []byte, st StorageType) error { - var err error +func (c *cache) flushObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object, data []byte) error { + c.flushingGuard.Lock(addr) + defer c.flushingGuard.Unlock(addr) + + stPrm := meta.StorageIDPrm{Address: addr} + stRes, err := c.metabase.StorageID(ctx, stPrm) + if err != nil { + c.reportFlushError(logs.FSTreeCantGetID, addr.EncodeToString(), err) + return err + } + if stRes.StorageID != nil { + // already flushed + return nil + } defer func() { - c.metrics.Flush(err == nil, st) + c.metrics.Flush(err == nil, StorageTypeFSTree) }() - addr := objectCore.AddressOf(obj) - var prm common.PutPrm prm.Object = obj prm.RawData = data @@ -125,9 +180,7 @@ func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []b return err } - var updPrm meta.UpdateStorageIDPrm - updPrm.SetAddress(addr) - updPrm.SetStorageID(res.StorageID) + updPrm := meta.UpdateStorageIDPrm{Address: addr, StorageID: res.StorageID} _, err = c.metabase.UpdateStorageID(ctx, updPrm) if err != nil { @@ -169,3 +222,8 @@ func (c *cache) Flush(ctx context.Context, ignoreErrors, seal bool) error { func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { return c.flushFSTree(ctx, ignoreErrors) } + +type objectInfo struct { + data []byte + address oid.Address +} diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go index 861be6fd2..3038f6470 100644 --- a/pkg/local_object_storage/writecache/flush_test.go +++ b/pkg/local_object_storage/writecache/flush_test.go @@ -226,15 +226,13 @@ func putObjects(t *testing.T, c Cache) []objectPair { func check(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPair) { for i := range objects { - var mPrm meta.StorageIDPrm - mPrm.SetAddress(objects[i].addr) - + mPrm := meta.StorageIDPrm{Address: objects[i].addr} mRes, err := mb.StorageID(context.Background(), mPrm) require.NoError(t, err) var prm common.GetPrm prm.Address = objects[i].addr - prm.StorageID = mRes.StorageID() + prm.StorageID = mRes.StorageID res, err := bs.Get(context.Background(), prm) require.NoError(t, err) diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index 76ea84eda..960137dfb 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -55,6 +55,7 @@ type MainStorage interface { // Metabase is the interface of the metabase used by Cache implementations. type Metabase interface { UpdateStorageID(context.Context, meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error) + StorageID(context.Context, meta.StorageIDPrm) (meta.StorageIDRes, error) } var ( -- 2.45.2