frostfs-node/pkg/local_object_storage/writecache/cache.go

142 lines
3.6 KiB
Go
Raw Normal View History

package writecache
import (
"context"
"fmt"
"sync"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
type cache struct {
options
mode mode.Mode
modeMtx sync.RWMutex
// flushCh is a channel with objects to flush.
flushCh chan objectInfo
// cancel is cancel function, protected by modeMtx in Close.
cancel atomic.Value
// wg is a wait group for flush workers.
wg sync.WaitGroup
// fsTree contains big files stored directly on file-system.
fsTree *fstree.FSTree
// counter contains atomic counters for the number of objects stored in cache.
counter *fstree.SimpleCounter
}
// wcStorageType is used for write-cache operations logging.
const wcStorageType = "write-cache"
type objectInfo struct {
addr oid.Address
size uint64
}
const (
defaultMaxObjectSize = 64 * 1024 * 1024 // 64 MiB
defaultMaxCacheSize = 1 << 30 // 1 GiB
)
var dummyCanceler context.CancelFunc = func() {}
// New creates new writecache instance.
func New(opts ...Option) Cache {
c := &cache{
flushCh: make(chan objectInfo),
mode: mode.Disabled,
counter: fstree.NewSimpleCounter(),
options: options{
log: logger.NewLoggerWrapper(zap.NewNop()),
maxObjectSize: defaultMaxObjectSize,
workersCount: defaultFlushWorkersCount,
maxCacheSize: defaultMaxCacheSize,
metrics: DefaultMetrics(),
flushSizeLimit: defaultFlushWorkersCount * defaultMaxObjectSize,
},
}
for i := range opts {
opts[i](&c.options)
}
return c
}
// SetLogger sets logger. It is used after the shard ID was generated to use it in logs.
func (c *cache) SetLogger(l *logger.Logger) {
c.log = l
}
func (c *cache) DumpInfo() Info {
return Info{
Path: c.path,
}
}
// Open opens and initializes database. Reads object counters from the ObjectCounters instance.
func (c *cache) Open(_ context.Context, mod mode.Mode) error {
c.modeMtx.Lock()
defer c.modeMtx.Unlock()
c.mode = mod
if mod.NoMetabase() {
return nil
}
err := c.openStore(mode.ConvertToComponentModeDegraded(mod))
if err != nil {
return metaerr.Wrap(err)
}
return metaerr.Wrap(c.initCounters())
}
// Init runs necessary services.
func (c *cache) Init(ctx context.Context) error {
c.metrics.SetMode(mode.ConvertToComponentModeDegraded(c.mode))
if err := c.flushAndDropBBoltDB(ctx); err != nil {
return fmt.Errorf("flush previous version write-cache database: %w", err)
}
ctx, cancel := context.WithCancel(context.WithoutCancel(ctx)) // canceling performed by cache
c.cancel.Store(cancel)
c.runFlushLoop(ctx)
return nil
}
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
func (c *cache) Close(ctx context.Context) error {
if cancelValue := c.cancel.Swap(dummyCanceler); cancelValue != nil {
cancelValue.(context.CancelFunc)()
}
// We cannot lock mutex for the whole operation duration
// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
c.modeMtx.Lock()
c.mode = mode.DegradedReadOnly // prevent new operations from being processed
c.modeMtx.Unlock()
c.wg.Wait()
c.modeMtx.Lock()
defer c.modeMtx.Unlock()
var err error
if c.fsTree != nil {
err = c.fsTree.Close(ctx)
if err != nil {
c.fsTree = nil
}
}
c.metrics.Close()
return nil
}
func (c *cache) GetMetrics() Metrics {
return c.metrics
}