package writecache import ( "bytes" "context" "errors" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/mr-tron/base58" "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) const ( // flushBatchSize is amount of keys which will be read from cache to be flushed // to the main storage. It is used to reduce contention between cache put // and cache persist. flushBatchSize = 512 // defaultFlushWorkersCount is number of workers for putting objects in main storage. defaultFlushWorkersCount = 20 // defaultFlushInterval is default time interval between successive flushes. defaultFlushInterval = time.Second ) var errIterationCompleted = errors.New("iteration completed") // runFlushLoop starts background workers which periodically flush objects to the blobstor. func (c *cache) runFlushLoop(ctx context.Context) { if c.disableBackgroundFlush { return } for i := 0; i < c.workersCount; i++ { c.wg.Add(1) go c.workerFlushSmall(ctx) } c.wg.Add(1) go func() { c.workerFlushBig(ctx) c.wg.Done() }() c.wg.Add(1) go func() { defer c.wg.Done() tt := time.NewTimer(defaultFlushInterval) defer tt.Stop() for { select { case <-tt.C: c.flushSmallObjects(ctx) tt.Reset(defaultFlushInterval) c.estimateCacheSize() case <-ctx.Done(): return } } }() } func (c *cache) flushSmallObjects(ctx context.Context) { var lastKey []byte for { select { case <-ctx.Done(): return default: } var m []objectInfo c.modeMtx.RLock() if c.readOnly() { c.modeMtx.RUnlock() time.Sleep(time.Second) continue } // We put objects in batches of fixed size to not interfere with main put cycle a lot. _ = c.db.View(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) cs := b.Cursor() var k, v []byte if len(lastKey) == 0 { k, v = cs.First() } else { k, v = cs.Seek(lastKey) if bytes.Equal(k, lastKey) { k, v = cs.Next() } } for ; k != nil && len(m) < flushBatchSize; k, v = cs.Next() { if len(lastKey) == len(k) { copy(lastKey, k) } else { lastKey = bytes.Clone(k) } m = append(m, objectInfo{ addr: string(k), data: bytes.Clone(v), }) } return nil }) var count int for i := range m { obj := objectSDK.New() if err := obj.Unmarshal(m[i].data); err != nil { continue } m[i].obj = obj count++ select { case c.flushCh <- m[i]: case <-ctx.Done(): c.modeMtx.RUnlock() return } } c.modeMtx.RUnlock() if count == 0 { break } c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache, zap.Int("count", count), zap.String("start", base58.Encode(lastKey))) } } func (c *cache) workerFlushBig(ctx context.Context) { tick := time.NewTicker(defaultFlushInterval * 10) for { select { case <-tick.C: c.modeMtx.RLock() if c.readOnly() { c.modeMtx.RUnlock() break } _ = c.flushFSTree(ctx, true) c.modeMtx.RUnlock() case <-ctx.Done(): return } } } func (c *cache) reportFlushError(msg string, addr string, err error) { if c.reportError != nil { c.reportError(msg, err) } else { c.log.Error(msg, zap.String("address", addr), zap.Error(err)) } } func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { var prm common.IteratePrm prm.IgnoreErrors = ignoreErrors prm.Handler = func(e common.IterationElement) error { sAddr := e.Address.EncodeToString() var obj objectSDK.Object err := obj.Unmarshal(e.ObjectData) if err != nil { c.reportFlushError(logs.FSTreeCantUnmarshalObject, sAddr, metaerr.Wrap(err)) if ignoreErrors { return nil } return err } err = c.flushObject(ctx, &obj, e.ObjectData, StorageTypeFSTree) if err != nil { if ignoreErrors { return nil } return err } c.deleteFromDisk(ctx, e.Address) return nil } _, err := c.fsTree.Iterate(ctx, prm) return err } // workerFlushSmall writes small objects to the main storage. func (c *cache) workerFlushSmall(ctx context.Context) { defer c.wg.Done() var objInfo objectInfo for { // Give priority to direct put. select { case objInfo = <-c.flushCh: case <-ctx.Done(): return } err := c.flushObject(ctx, objInfo.obj, objInfo.data, StorageTypeDB) if err != nil { // Error is handled in flushObject. continue } c.deleteFromDB(objInfo.addr, true) } } // flushObject is used to write object directly to the main storage. func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []byte, st StorageType) error { var err error defer func() { c.metrics.Flush(err == nil, st) }() addr := objectCore.AddressOf(obj) var prm common.PutPrm prm.Object = obj prm.RawData = data res, err := c.blobstor.Put(ctx, prm) if err != nil { if !errors.Is(err, common.ErrNoSpace) && !errors.Is(err, common.ErrReadOnly) && !errors.Is(err, blobstor.ErrNoPlaceFound) { c.reportFlushError(logs.FSTreeCantFushObjectBlobstor, addr.EncodeToString(), err) } return err } var updPrm meta.UpdateStorageIDPrm updPrm.SetAddress(addr) updPrm.SetStorageID(res.StorageID) _, err = c.metabase.UpdateStorageID(ctx, updPrm) if err != nil { c.reportFlushError(logs.FSTreeCantUpdateID, addr.EncodeToString(), err) } return err } // Flush flushes all objects from the write-cache to the main storage. func (c *cache) Flush(ctx context.Context, ignoreErrors, seal bool) error { ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Flush", trace.WithAttributes( attribute.Bool("ignore_errors", ignoreErrors), attribute.Bool("seal", seal), )) defer span.End() c.modeMtx.Lock() // exclusive lock to not to conflict with background flush defer c.modeMtx.Unlock() if err := c.flush(ctx, ignoreErrors); err != nil { return err } if seal { m := c.mode | mode.ReadOnly if err := c.setMode(ctx, m, ignoreErrors); err != nil { return err } c.metrics.SetMode(m) } return nil } func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { if err := c.flushFSTree(ctx, ignoreErrors); err != nil { return err } var last string for { batch, err := c.readNextDBBatch(ignoreErrors, last) if err != nil { return err } if len(batch) == 0 { break } for _, item := range batch { var obj objectSDK.Object if err := obj.Unmarshal(item.data); err != nil { c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, item.address, metaerr.Wrap(err)) if ignoreErrors { continue } return err } if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil { return err } c.deleteFromDB(item.address, false) } last = batch[len(batch)-1].address } return nil } type batchItem struct { data []byte address string } func (c *cache) readNextDBBatch(ignoreErrors bool, last string) ([]batchItem, error) { const batchSize = 100 var batch []batchItem err := c.db.View(func(tx *bbolt.Tx) error { var addr oid.Address b := tx.Bucket(defaultBucket) cs := b.Cursor() for k, data := cs.Seek([]byte(last)); k != nil; k, data = cs.Next() { sa := string(k) if sa == last { continue } if err := addr.DecodeString(sa); err != nil { c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err)) if ignoreErrors { continue } return err } batch = append(batch, batchItem{data: bytes.Clone(data), address: sa}) if len(batch) == batchSize { return errIterationCompleted } } return nil }) if err == nil || errors.Is(err, errIterationCompleted) { return batch, nil } return nil, err }