package writecachebadger import ( "bytes" "context" "encoding/hex" "errors" "fmt" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "github.com/dgraph-io/badger/v4" "github.com/dgraph-io/ristretto/z" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) const ( // flushBatchSize is amount of keys which will be read from cache to be flushed // to the main storage. It is used to reduce contention between cache put // and cache persist. flushBatchSize = 512 // defaultFlushWorkersCount is number of workers for putting objects in main storage. defaultFlushWorkersCount = 20 // defaultFlushInterval is default time interval between successive flushes. defaultFlushInterval = time.Second ) type collector struct { cache *cache scheduled int processed int cancel func() } func (c *collector) Send(buf *z.Buffer) error { list, err := badger.BufferToKVList(buf) if err != nil { return err } for _, kv := range list.Kv { select { case <-c.cache.closeCh: c.cancel() return nil default: } if kv.StreamDone { return nil } if c.scheduled >= flushBatchSize { c.cancel() return nil } if got, want := len(kv.Key), len(internalKey{}); got != want { c.cache.log.Debug( fmt.Sprintf("not expected db key len: got %d, want %d", got, want)) continue } c.processed++ obj := objectSDK.New() val := bytes.Clone(kv.Value) if err = obj.Unmarshal(val); err != nil { continue } addr := objectCore.AddressOf(obj) c.cache.scheduled4FlushMtx.RLock() _, ok := c.cache.scheduled4Flush[addr] c.cache.scheduled4FlushMtx.RUnlock() if ok { c.cache.log.Debug(logs.WritecacheBadgerObjAlreadyScheduled, zap.Stringer("obj", addr)) continue } c.cache.scheduled4FlushMtx.Lock() c.cache.scheduled4Flush[addr] = struct{}{} c.cache.scheduled4FlushMtx.Unlock() c.scheduled++ select { case c.cache.flushCh <- obj: case <-c.cache.closeCh: c.cancel() return nil } } return nil } // runFlushLoop starts background workers which periodically flush objects to the blobstor. func (c *cache) runFlushLoop() { for i := 0; i < c.workersCount; i++ { c.wg.Add(1) go c.workerFlushSmall() } c.wg.Add(1) go func() { defer c.wg.Done() tt := time.NewTimer(defaultFlushInterval) defer tt.Stop() for { select { case <-tt.C: c.flushSmallObjects() tt.Reset(defaultFlushInterval) case <-c.closeCh: return } } }() } func (c *cache) flushSmallObjects() { for { select { case <-c.closeCh: return default: } c.modeMtx.RLock() if c.readOnly() { c.modeMtx.RUnlock() time.Sleep(time.Second) continue } // Using the db after Close will panic and badger won't wait for outstanding txs, // so we need to check manually. if c.db.IsClosed() { c.modeMtx.RUnlock() return } ctx, cancel := context.WithCancel(context.TODO()) coll := collector{ cache: c, cancel: cancel, } stream := c.db.NewStream() // All calls to Send are done by a single goroutine stream.Send = coll.Send if err := stream.Orchestrate(ctx); err != nil { c.log.Debug(fmt.Sprintf( "error during flushing object from wc: %s", err)) } c.modeMtx.RUnlock() if coll.scheduled == 0 { break } c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache, zap.Int("scheduled", coll.scheduled), zap.Int("processed", coll.processed)) } } func (c *cache) reportFlushError(msg string, addr string, err error) { if c.reportError != nil { c.reportError(msg, err) } else { c.log.Error(msg, zap.String("address", addr), zap.Error(err)) } } // workerFlushSmall writes small objects to the main storage. func (c *cache) workerFlushSmall() { defer c.wg.Done() var obj *objectSDK.Object for { // Give priority to direct put. select { case obj = <-c.flushCh: case <-c.closeCh: return } addr := objectCore.AddressOf(obj) err := c.flushObject(context.TODO(), obj, nil, writecache.StorageTypeDB) if err == nil { c.deleteFromDB([]internalKey{addr2key(addr)}) } c.scheduled4FlushMtx.Lock() delete(c.scheduled4Flush, addr) c.scheduled4FlushMtx.Unlock() } } // flushObject is used to write object directly to the main storage. func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []byte, st writecache.StorageType) error { var err error defer func() { c.metrics.Flush(err == nil, st) }() addr := objectCore.AddressOf(obj) var prm common.PutPrm prm.Object = obj prm.RawData = data res, err := c.blobstor.Put(ctx, prm) if err != nil { if !errors.Is(err, common.ErrNoSpace) && !errors.Is(err, common.ErrReadOnly) && !errors.Is(err, blobstor.ErrNoPlaceFound) { c.reportFlushError(logs.FrostFSNodeCantFlushObjectToBlobstor, addr.EncodeToString(), err) } return err } var updPrm meta.UpdateStorageIDPrm updPrm.SetAddress(addr) updPrm.SetStorageID(res.StorageID) _, err = c.metabase.UpdateStorageID(updPrm) if err != nil { c.reportFlushError(logs.FrostFSNodeCantUpdateObjectStorageID, addr.EncodeToString(), err) } return err } // Flush flushes all objects from the write-cache to the main storage. // Write-cache must be in readonly mode to ensure correctness of an operation and // to prevent interference with background flush workers. func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error { ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Flush", trace.WithAttributes( attribute.Bool("ignore_errors", ignoreErrors), )) defer span.End() c.modeMtx.RLock() defer c.modeMtx.RUnlock() return c.flush(ctx, ignoreErrors) } func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { return c.db.View(func(tx *badger.Txn) error { it := tx.NewIterator(badger.DefaultIteratorOptions) defer it.Close() var key internalKey for it.Rewind(); it.Valid(); it.Next() { if got, want := int(it.Item().KeySize()), len(key); got != want { err := fmt.Errorf("invalid db key len: got %d, want %d", got, want) c.reportFlushError(logs.FrostFSNodeCantDecodeObjectAddressFromDB, hex.EncodeToString(it.Item().Key()), metaerr.Wrap(err)) if ignoreErrors { continue } return err } if err := it.Item().Value(func(data []byte) error { var obj objectSDK.Object if err := obj.Unmarshal(data); err != nil { copy(key[:], it.Item().Key()) c.reportFlushError(logs.FrostFSNodeCantUnmarshalObjectFromDB, key.address().EncodeToString(), metaerr.Wrap(err)) if ignoreErrors { return nil } return err } return c.flushObject(ctx, &obj, data, writecache.StorageTypeDB) }); err != nil { return err } } return nil }) }