package writecache

import (
	"context"
	"errors"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
)

const (
	// defaultFlushWorkersCount is the default number of workers that put
	// objects into the main storage.
	defaultFlushWorkersCount = 20
	// defaultFlushInterval is the default pause between two successive
	// background flush passes.
	defaultFlushInterval time.Duration = 10 * time.Second
)

// errIterationCompleted is a sentinel error value.
// NOTE(review): it is not referenced in the visible part of this file —
// confirm where it is used before removing it.
var errIterationCompleted = errors.New("iteration completed")

// runFlushLoop starts background workers which periodically flush objects to the blobstor.
func (c *cache) runFlushLoop(ctx context.Context) { if c.disableBackgroundFlush { return } fl := newFlushLimiter(c.flushSizeLimit) c.wg.Add(1) go func() { defer c.wg.Done() c.pushToFlushQueue(ctx, fl) }() for range c.workersCount { c.wg.Add(1) go c.workerFlush(ctx, fl) } } func (c *cache) pushToFlushQueue(ctx context.Context, fl *flushLimiter) { stopf := context.AfterFunc(ctx, func() { fl.close() }) defer stopf() tick := time.NewTicker(defaultFlushInterval) for { select { case <-tick.C: c.modeMtx.RLock() if c.readOnly() || c.noMetabase() { c.modeMtx.RUnlock() continue } err := c.fsTree.IterateInfo(ctx, func(oi fstree.ObjectInfo) error { if err := fl.acquire(oi.DataSize); err != nil { return err } select { case c.flushCh <- objectInfo{ addr: oi.Address, size: oi.DataSize, }: return nil case <-ctx.Done(): fl.release(oi.DataSize) return ctx.Err() } }) if err != nil { c.log.Warn(ctx, logs.BlobstorErrorOccurredDuringTheIteration, zap.Error(err)) } c.modeMtx.RUnlock() case <-ctx.Done(): return } } } func (c *cache) workerFlush(ctx context.Context, fl *flushLimiter) { defer c.wg.Done() var objInfo objectInfo for { select { case objInfo = <-c.flushCh: c.flushIfAnObjectExistsWorker(ctx, objInfo, fl) case <-ctx.Done(): return } } } func (c *cache) flushIfAnObjectExistsWorker(ctx context.Context, objInfo objectInfo, fl *flushLimiter) { defer fl.release(objInfo.size) res, err := c.fsTree.Get(ctx, common.GetPrm{ Address: objInfo.addr, }) if err != nil { if !client.IsErrObjectNotFound(err) { c.reportFlushError(logs.WritecacheCantGetObject, objInfo.addr.EncodeToString(), metaerr.Wrap(err)) } return } err = c.flushObject(ctx, res.Object, res.RawData, StorageTypeFSTree) if err != nil { // Error is handled in flushObject. 
return } c.deleteFromDisk(ctx, objInfo.addr, uint64(len(res.RawData))) } func (c *cache) reportFlushError(msg string, addr string, err error) { if c.reportError != nil { c.reportError(msg, err) } else { c.log.Error(context.Background(), msg, zap.String("address", addr), zap.Error(err)) } } func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { var prm common.IteratePrm prm.IgnoreErrors = ignoreErrors prm.Handler = func(e common.IterationElement) error { sAddr := e.Address.EncodeToString() var obj objectSDK.Object err := obj.Unmarshal(e.ObjectData) if err != nil { c.reportFlushError(logs.FSTreeCantUnmarshalObject, sAddr, metaerr.Wrap(err)) if ignoreErrors { return nil } return err } err = c.flushObject(ctx, &obj, e.ObjectData, StorageTypeFSTree) if err != nil { return err } c.deleteFromDisk(ctx, e.Address, uint64(len(e.ObjectData))) return nil } _, err := c.fsTree.Iterate(ctx, prm) return err } // flushObject is used to write object directly to the main storage. func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []byte, st StorageType) error { var err error defer func() { c.metrics.Flush(err == nil, st) }() addr := objectCore.AddressOf(obj) var prm common.PutPrm prm.Object = obj prm.RawData = data res, err := c.blobstor.Put(ctx, prm) if err != nil { if !errors.Is(err, common.ErrNoSpace) && !errors.Is(err, common.ErrReadOnly) && !errors.Is(err, blobstor.ErrNoPlaceFound) { c.reportFlushError(logs.FSTreeCantFushObjectBlobstor, addr.EncodeToString(), err) } return err } var updPrm meta.UpdateStorageIDPrm updPrm.SetAddress(addr) updPrm.SetStorageID(res.StorageID) _, err = c.metabase.UpdateStorageID(ctx, updPrm) if err != nil { c.reportFlushError(logs.FSTreeCantUpdateID, addr.EncodeToString(), err) } return err } // Flush flushes all objects from the write-cache to the main storage. 
func (c *cache) Flush(ctx context.Context, ignoreErrors, seal bool) error { ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Flush", trace.WithAttributes( attribute.Bool("ignore_errors", ignoreErrors), attribute.Bool("seal", seal), )) defer span.End() c.modeMtx.Lock() // exclusive lock to not to conflict with background flush defer c.modeMtx.Unlock() if c.noMetabase() { return ErrDegraded } if err := c.flush(ctx, ignoreErrors); err != nil { return err } if seal { m := c.mode | mode.ReadOnly if err := c.setMode(ctx, m, setModePrm{ignoreErrors: ignoreErrors}); err != nil { return err } c.metrics.SetMode(mode.ConvertToComponentModeDegraded(m)) } return nil } func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { return c.flushFSTree(ctx, ignoreErrors) }