frostfs-node/pkg/local_object_storage/writecache/cache.go
Dmitrii Stepanov b142b6f48e [#1367] fstree: Add size to file counter
The FSTree file counter is used by the writecache. Since the writecache now
has only one storage, the real object size must be used so that the writecache
size is more accurate than `count * max_object_size`.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-09-12 15:06:33 +03:00

149 lines
3.8 KiB
Go

package writecache
import (
"context"
"fmt"
"sync"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
// cache is the write-cache implementation returned by New.
type cache struct {
// options is the embedded configuration; New fills it with defaults and
// then applies the caller-supplied Option values.
options
// mode is the current operation mode. It is written under modeMtx in
// Open and Close.
mode mode.Mode
modeMtx sync.RWMutex
// flushCh is a channel with objects to flush.
flushCh chan objectInfo
// cancel holds the context.CancelFunc of the flush-loop context. It is
// stored by Init and atomically swapped for dummyCanceler in Close.
cancel atomic.Value
// wg is a wait group for flush workers.
wg sync.WaitGroup
// fsTree is the file-system tree storage used by the cache; it is closed in Close.
// NOTE(review): earlier wording said it holds only "big files" — confirm
// whether all cached objects now live here.
fsTree *fstree.FSTree
// counter contains atomic counters for the objects stored in cache.
// NOTE(review): presumably tracks both count and size — confirm against
// fstree.SimpleCounter.
counter *fstree.SimpleCounter
}
// wcStorageType is the storage type label used when logging write-cache operations.
const wcStorageType = "write-cache"
// objectInfo describes a single object passed through cache.flushCh.
type objectInfo struct {
// addr identifies the object (presumably the string form of its address — TODO confirm).
addr string
// data is the object payload as bytes (presumably the marshaled object — TODO confirm).
data []byte
// obj is the SDK representation of the object.
obj *objectSDK.Object
}
// Default size limits that New assigns to the corresponding options
// (maxObjectSize, smallObjectSize, maxCacheSize) before applying
// caller-supplied Option values. All values are in bytes.
const (
defaultMaxObjectSize = 64 * 1024 * 1024 // 64 MiB
defaultSmallObjectSize = 32 * 1024 // 32 KiB
defaultMaxCacheSize = 1 << 30 // 1 GiB
)
var (
// defaultBucket is a single-byte bucket name.
// NOTE(review): presumably the bbolt bucket of the previous database
// format — confirm against its users elsewhere in the package.
defaultBucket = []byte{0}
// dummyCanceler is a no-op cancel function that Close swaps into
// cache.cancel in place of the real one.
dummyCanceler context.CancelFunc = func() {}
)
// New builds a write-cache in the Disabled mode, populated with the package
// defaults, and then lets every supplied Option adjust the configuration.
func New(opts ...Option) Cache {
	wc := new(cache)
	wc.flushCh = make(chan objectInfo)
	wc.mode = mode.Disabled
	wc.counter = fstree.NewSimpleCounter()

	// Defaults first; options passed by the caller override them below.
	wc.options = options{
		log:             &logger.Logger{Logger: zap.NewNop()},
		maxObjectSize:   defaultMaxObjectSize,
		smallObjectSize: defaultSmallObjectSize,
		workersCount:    defaultFlushWorkersCount,
		maxCacheSize:    defaultMaxCacheSize,
		maxBatchSize:    bbolt.DefaultMaxBatchSize,
		maxBatchDelay:   bbolt.DefaultMaxBatchDelay,
		metrics:         DefaultMetrics(),
	}

	for _, apply := range opts {
		apply(&wc.options)
	}

	return wc
}
// SetLogger replaces the logger kept in the cache options. It is used after
// the shard ID was generated so that the ID can appear in logs.
func (c *cache) SetLogger(l *logger.Logger) {
	c.options.log = l
}
// DumpInfo returns an Info value carrying the configured cache path.
func (c *cache) DumpInfo() Info {
	var info Info
	info.Path = c.options.path
	return info
}
// Open records the requested mode and, unless the mode reports NoMetabase,
// opens the underlying store and initializes the object counters.
// Errors from either step are returned wrapped with metaerr.Wrap.
func (c *cache) Open(_ context.Context, mod mode.Mode) error {
	c.modeMtx.Lock()
	defer c.modeMtx.Unlock()

	c.mode = mod
	if mod.NoMetabase() {
		// Nothing to open in this mode.
		return nil
	}

	if err := c.openStore(mode.ConvertToComponentModeDegraded(mod)); err != nil {
		return metaerr.Wrap(err)
	}

	countersErr := c.initCounters()
	return metaerr.Wrap(countersErr)
}
// Init reports the current mode to metrics, flushes and drops the database
// left by the previous write-cache version, and starts the flush loop with
// a cancellable context whose cancel function is kept for Close.
func (c *cache) Init() error {
	componentMode := mode.ConvertToComponentModeDegraded(c.mode)
	c.metrics.SetMode(componentMode)

	background := context.Background()
	if err := c.flushAndDropBBoltDB(background); err != nil {
		return fmt.Errorf("flush previous version write-cache database: %w", err)
	}

	flushCtx, stop := context.WithCancel(background)
	c.cancel.Store(stop)
	c.runFlushLoop(flushCtx)

	return nil
}
// Close stops the flush loop, waits for the background workers to finish,
// and then closes the FSTree storage and the metrics.
//
// It returns the error reported by closing the FSTree, if any. Previously
// that error was computed but discarded and Close always returned nil.
func (c *cache) Close() error {
	// Swap in a no-op canceler so that the real cancel function is taken
	// out of the atomic value and invoked here.
	if cancelValue := c.cancel.Swap(dummyCanceler); cancelValue != nil {
		cancelValue.(context.CancelFunc)()
	}

	// We cannot lock mutex for the whole operation duration
	// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
	c.modeMtx.Lock()
	c.mode = mode.DegradedReadOnly // prevent new operations from being processed
	c.modeMtx.Unlock()

	c.wg.Wait()

	c.modeMtx.Lock()
	defer c.modeMtx.Unlock()

	var err error
	if c.fsTree != nil {
		if err = c.fsTree.Close(); err != nil {
			err = fmt.Errorf("close write-cache FSTree: %w", err)
			// NOTE(review): the reference is dropped only when closing
			// failed; this is kept as in the original — confirm whether it
			// should also (or instead) be dropped on success.
			c.fsTree = nil
		}
	}

	// Metrics are closed regardless of the FSTree result.
	c.metrics.Close()

	return err
}
// GetMetrics returns the Metrics instance configured for the cache.
func (c *cache) GetMetrics() Metrics {
	return c.options.metrics
}