package writecachebadger

import (
	"bytes"
	"context"
	"encoding/hex"
	"errors"
	"fmt"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	"github.com/dgraph-io/badger/v4"
	"github.com/mr-tron/base58"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
)

const (
	// flushBatchSize is the maximum number of keys read from the cache in a
	// single batch to be flushed to the main storage. It is used to reduce
	// contention between cache put and cache persist.
	flushBatchSize = 512
	// defaultFlushWorkersCount is the default number of workers putting
	// objects in the main storage.
	defaultFlushWorkersCount = 20
	// defaultFlushInterval is the default time interval between successive
	// flushes.
	defaultFlushInterval = time.Second
)

// runFlushLoop starts background workers which periodically flush objects to the blobstor.
func (c *cache) runFlushLoop() { for i := 0; i < c.workersCount; i++ { c.wg.Add(1) go c.workerFlushSmall() } c.wg.Add(1) go func() { defer c.wg.Done() tt := time.NewTimer(defaultFlushInterval) defer tt.Stop() for { select { case <-tt.C: c.flushSmallObjects() tt.Reset(defaultFlushInterval) case <-c.closeCh: return } } }() } func (c *cache) flushSmallObjects() { var lastKey internalKey var m []objectInfo for { select { case <-c.closeCh: return default: } m = m[:0] c.modeMtx.RLock() if c.readOnly() { c.modeMtx.RUnlock() time.Sleep(time.Second) continue } _ = c.db.View(func(tx *badger.Txn) error { it := tx.NewIterator(badger.DefaultIteratorOptions) defer it.Close() if len(lastKey) == 0 { it.Rewind() } else { it.Seek(lastKey[:]) if it.Valid() && bytes.Equal(it.Item().Key(), lastKey[:]) { it.Next() } } for ; it.Valid() && len(m) < flushBatchSize; it.Next() { if got, want := int(it.Item().KeySize()), len(lastKey); got != want { return fmt.Errorf("invalid db key len: got %d, want %d", got, want) } it.Item().KeyCopy(lastKey[:]) value, err := it.Item().ValueCopy(nil) if err != nil { return err } m = append(m, objectInfo{ addr: lastKey.address(), data: value, }) } return nil }) var count int for i := range m { obj := objectSDK.New() if err := obj.Unmarshal(m[i].data); err != nil { continue } count++ select { case c.flushCh <- obj: case <-c.closeCh: c.modeMtx.RUnlock() return } } if count == 0 { c.modeMtx.RUnlock() break } c.modeMtx.RUnlock() c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache, zap.Int("count", count), zap.String("start", base58.Encode(lastKey[:]))) } } func (c *cache) reportFlushError(msg string, addr string, err error) { if c.reportError != nil { c.reportError(msg, err) } else { c.log.Error(msg, zap.String("address", addr), zap.Error(err)) } } // workerFlushSmall writes small objects to the main storage. func (c *cache) workerFlushSmall() { defer c.wg.Done() var obj *objectSDK.Object for { // Give priority to direct put. 
select { case obj = <-c.flushCh: case <-c.closeCh: return } err := c.flushObject(context.TODO(), obj, nil, writecache.StorageTypeDB) if err != nil { // Error is handled in flushObject. continue } c.deleteFromDB([]string{objectCore.AddressOf(obj).EncodeToString()}) } } // flushObject is used to write object directly to the main storage. func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []byte, st writecache.StorageType) error { var err error defer func() { c.metrics.Flush(err == nil, st) }() addr := objectCore.AddressOf(obj) var prm common.PutPrm prm.Object = obj prm.RawData = data res, err := c.blobstor.Put(ctx, prm) if err != nil { if !errors.Is(err, common.ErrNoSpace) && !errors.Is(err, common.ErrReadOnly) && !errors.Is(err, blobstor.ErrNoPlaceFound) { c.reportFlushError("can't flush an object to blobstor", addr.EncodeToString(), err) } return err } var updPrm meta.UpdateStorageIDPrm updPrm.SetAddress(addr) updPrm.SetStorageID(res.StorageID) _, err = c.metabase.UpdateStorageID(updPrm) if err != nil { c.reportFlushError("can't update object storage ID", addr.EncodeToString(), err) } return err } // Flush flushes all objects from the write-cache to the main storage. // Write-cache must be in readonly mode to ensure correctness of an operation and // to prevent interference with background flush workers. 
func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error { ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Flush", trace.WithAttributes( attribute.Bool("ignore_errors", ignoreErrors), )) defer span.End() c.modeMtx.RLock() defer c.modeMtx.RUnlock() return c.flush(ctx, ignoreErrors) } func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { return c.db.View(func(tx *badger.Txn) error { it := tx.NewIterator(badger.DefaultIteratorOptions) defer it.Close() var key internalKey for it.Rewind(); it.Valid(); it.Next() { if got, want := int(it.Item().KeySize()), len(key); got != want { err := fmt.Errorf("invalid db key len: got %d, want %d", got, want) c.reportFlushError("can't decode object address from the DB", hex.EncodeToString(it.Item().Key()), metaerr.Wrap(err)) if ignoreErrors { continue } return err } if err := it.Item().Value(func(data []byte) error { var obj objectSDK.Object if err := obj.Unmarshal(data); err != nil { copy(key[:], it.Item().Key()) c.reportFlushError("can't unmarshal an object from the DB", key.address().EncodeToString(), metaerr.Wrap(err)) if ignoreErrors { return nil } return err } return c.flushObject(ctx, &obj, data, writecache.StorageTypeDB) }); err != nil { return err } } return nil }) }