package writecachebadger import ( "encoding/binary" "sync" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" ) type cache struct { options mode mode.Mode modeMtx sync.RWMutex // flushCh is a channel with objects to flush. flushCh chan *objectSDK.Object // closeCh is close channel, protected by modeMtx. closeCh chan struct{} // wg is a wait group for flush workers. wg sync.WaitGroup // initCounterWG is necessary to wait for asynchronous initialization. initCounterWG sync.WaitGroup // store contains underlying database. store } // wcStorageType is used for write-cache operations logging. const wcStorageType = "write-cache" type objectInfo struct { addr oid.Address data []byte obj *objectSDK.Object } const ( defaultMaxObjectSize = 64 << 20 // 64 MiB defaultSmallObjectSize = 32 << 10 // 32 KiB defaultMaxCacheSize = 1 << 30 // 1 GiB ) // New creates new writecache instance. func New(opts ...Option) writecache.Cache { c := &cache{ flushCh: make(chan *objectSDK.Object), mode: mode.ReadWrite, options: options{ log: &logger.Logger{Logger: zap.NewNop()}, maxObjectSize: defaultMaxObjectSize, workersCount: defaultFlushWorkersCount, maxCacheSize: defaultMaxCacheSize, metrics: writecache.DefaultMetrics(), }, } for i := range opts { opts[i](&c.options) } return c } // SetLogger sets logger. It is used after the shard ID was generated to use it in logs. func (c *cache) SetLogger(l *logger.Logger) { c.log = l } func (c *cache) DumpInfo() writecache.Info { return writecache.Info{ Path: c.path, } } // Open opens and initializes database. Reads object counters from the ObjectCounters instance. func (c *cache) Open(readOnly bool) error { err := c.openStore(readOnly) if err != nil { return metaerr.Wrap(err) } // Opening after Close is done during maintenance mode, // thus we need to create a channel here. c.closeCh = make(chan struct{}) return metaerr.Wrap(c.initCounters()) } // Init runs necessary services. func (c *cache) Init() error { c.log.Info(logs.WritecacheBadgerInitExperimental) c.metrics.SetMode(c.mode) c.runFlushLoop() c.runGCLoop() return nil } // flushKeysCount writes the keys count to db if it should be flushed down. func (c *cache) flushKeysCount() { // Close may happen after resetting mode if c.db.IsClosed() { return } if c.objCounters.isReadyToFlush() { k := keyCountPrefix v := make([]byte, 8) binary.LittleEndian.PutUint64(v, c.objCounters.DB()) if err := c.putRaw(k[:], v); err != nil { c.log.Error(logs.WritecacheBadgerCouldNotFlushKeysCount, zap.Error(err)) } } } // Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op. func (c *cache) Close() error { c.cancelInitCountByCloseAndWait() c.flushKeysCount() // We cannot lock mutex for the whole operation duration // because it is taken by some background workers, so `wg.Wait()` is done without modeMtx. c.modeMtx.Lock() if c.closeCh != nil { close(c.closeCh) } c.mode = mode.DegradedReadOnly // prevent new operations from being processed c.modeMtx.Unlock() c.wg.Wait() c.modeMtx.Lock() defer c.modeMtx.Unlock() c.closeCh = nil var err error if c.db != nil { err = c.db.Close() if err != nil { c.db = nil } } c.metrics.Close() return nil } // cancelInitCountByCloseAndWait cancels the counting process if Close is invoked // and waits for correct finish of goroutine within initCount. func (c *cache) cancelInitCountByCloseAndWait() { c.objCounters.cDB.cancelByClose() c.initCounterWG.Wait() }