diff --git a/internal/logs/logs.go b/internal/logs/logs.go index d0bac4d11..7e8bf5926 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -518,6 +518,5 @@ const ( FailedToSealWritecacheAsync = "failed to seal writecache async" WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: not empty" BlobovniczatreeFailedToRemoveRebuildTempFile = "failed to remove rebuild temp file" - WritecacheCantGetObject = "can't get an object from fstree" FailedToUpdateMultinetConfiguration = "failed to update multinet configuration" ) diff --git a/pkg/local_object_storage/writecache/cache.go b/pkg/local_object_storage/writecache/cache.go index ea4e7b5d3..1e11d2ec3 100644 --- a/pkg/local_object_storage/writecache/cache.go +++ b/pkg/local_object_storage/writecache/cache.go @@ -10,7 +10,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "go.etcd.io/bbolt" "go.uber.org/zap" ) @@ -37,8 +37,9 @@ type cache struct { const wcStorageType = "write-cache" type objectInfo struct { - addr oid.Address - size uint64 + addr string + data []byte + obj *objectSDK.Object } const ( diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index 6f1300c06..094c6486d 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -13,12 +13,10 @@ import ( objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" - apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" @@ -31,7 +29,7 @@ const ( // defaultFlushWorkersCount is number of workers for putting objects in main storage. defaultFlushWorkersCount = 20 // defaultFlushInterval is default time interval between successive flushes. - defaultFlushInterval = 10 * time.Second + defaultFlushInterval = time.Second ) var errIterationCompleted = errors.New("iteration completed") @@ -43,41 +41,23 @@ func (c *cache) runFlushLoop(ctx context.Context) { } c.wg.Add(1) go func() { - defer c.wg.Done() - c.pushToFlushQueue(ctx) + c.workerFlushBig(ctx) + c.wg.Done() }() - - for range c.workersCount { - c.wg.Add(1) - go c.workerFlush(ctx) - } } -func (c *cache) pushToFlushQueue(ctx context.Context) { - tick := time.NewTicker(defaultFlushInterval) +func (c *cache) workerFlushBig(ctx context.Context) { + tick := time.NewTicker(defaultFlushInterval * 10) for { select { case <-tick.C: c.modeMtx.RLock() if c.readOnly() || c.noMetabase() { c.modeMtx.RUnlock() - continue + break } - err := c.fsTree.IterateInfo(ctx, func(oi fstree.ObjectInfo) error { - select { - case c.flushCh <- objectInfo{ - addr: oi.Address, - size: oi.DataSize, - }: - return nil - case <-ctx.Done(): - return ctx.Err() - } - }) - if err != nil { - c.log.Warn(ctx, logs.BlobstorErrorOccurredDuringTheIteration, zap.Error(err)) - } + _ = c.flushFSTree(ctx, true) c.modeMtx.RUnlock() case <-ctx.Done(): @@ -86,37 +66,6 @@ func (c *cache) pushToFlushQueue(ctx context.Context) { } } -func (c *cache) workerFlush(ctx context.Context) { - defer c.wg.Done() - - var objInfo objectInfo - for { - select { - case objInfo = <-c.flushCh: - case <-ctx.Done(): - return - } - - res, err := c.fsTree.Get(ctx, common.GetPrm{ - Address: objInfo.addr, - }) - if err != nil { - if !errors.As(err, new(*apistatus.ObjectNotFound)) { - c.reportFlushError(ctx, logs.WritecacheCantGetObject, objInfo.addr.EncodeToString(), metaerr.Wrap(err)) - } - continue - } - - err = c.flushObject(ctx, res.Object, res.RawData, StorageTypeFSTree) - if err != nil { - // Error is handled in flushObject. - continue - } - - c.deleteFromDisk(ctx, objInfo.addr) - } -} - func (c *cache) reportFlushError(ctx context.Context, msg string, addr string, err error) { if c.reportError != nil { c.reportError(ctx, msg, err)