// frostfs-node/pkg/local_object_storage/writecache/cachebbolt.go
//
// 153 lines
// 3.6 KiB
// Go

package writecache
import (
"context"
"os"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
// cache is the write-cache implementation returned by New.
// It embeds its configuration (options) and its database holder (store).
type cache struct {
// options holds the configuration: New fills in defaults and then applies
// the caller's Option values on top.
options
// mtx protects statistics, counters and compressFlags.
mtx sync.RWMutex
// mode is the current operating mode. New starts in ReadWrite; Open and
// Close overwrite it while holding modeMtx.
mode mode.Mode
// modeMtx guards mode, and cancel inside Close.
modeMtx sync.RWMutex
// compressFlags is a set of string keys: the values are empty structs, so
// only key presence carries information.
// NOTE(review): presumably the keys are addresses of big objects that
// should be compressed — confirm against the code that fills the map.
compressFlags map[string]struct{}
// flushCh is a channel with objects to flush. New creates it unbuffered.
flushCh chan objectInfo
// cancel is cancel function, protected by modeMtx in Close.
// Init assigns it; Close calls it once and resets it to nil.
cancel func()
// wg is a wait group for flush workers. Close waits on it.
wg sync.WaitGroup
// store contains underlying database.
store
// fsTree contains big files stored directly on file-system.
fsTree *fstree.FSTree
}
// wcStorageType is the storage type label used when logging write-cache
// operations.
const wcStorageType = "write-cache"
// objectInfo is the element type of cache.flushCh: one object handed over
// for flushing.
type objectInfo struct {
// addr identifies the object as a string.
// NOTE(review): presumably the encoded object address — confirm with the sender.
addr string
// data is a raw byte payload.
// NOTE(review): presumably the object's binary encoding — confirm with the sender.
data []byte
// obj is the SDK object.
obj *objectSDK.Object
}
// Default size limits. New installs them as maxObjectSize, smallObjectSize
// and maxCacheSize before caller-supplied options are applied.
const (
defaultMaxObjectSize = 64 * 1024 * 1024 // 64 MiB
defaultSmallObjectSize = 32 * 1024 // 32 KiB
defaultMaxCacheSize = 1 << 30 // 1 GiB
)
// defaultBucket is the single-byte key {0}.
// NOTE(review): presumably the name of the bbolt bucket used by the store —
// confirm against the database code.
var defaultBucket = []byte{0}
// New builds a write-cache in ReadWrite mode, pre-populated with default
// settings, and then applies opts in the order given so that callers can
// override any default.
//
// Defaults: a no-op zap logger, the package default size limits and flush
// worker count, bbolt's default batch size and delay, os.OpenFile as the file
// opener and DefaultMetrics() as the metrics sink.
//
// New does not call Open or Init on the returned cache.
func New(opts ...Option) Cache {
	wc := &cache{
		options: options{
			log:             &logger.Logger{Logger: zap.NewNop()},
			metrics:         DefaultMetrics(),
			openFile:        os.OpenFile,
			workersCount:    defaultFlushWorkersCount,
			maxObjectSize:   defaultMaxObjectSize,
			smallObjectSize: defaultSmallObjectSize,
			maxCacheSize:    defaultMaxCacheSize,
			maxBatchSize:    bbolt.DefaultMaxBatchSize,
			maxBatchDelay:   bbolt.DefaultMaxBatchDelay,
		},
		mode:          mode.ReadWrite,
		compressFlags: make(map[string]struct{}),
		flushCh:       make(chan objectInfo),
	}

	// Caller options run last so they win over the defaults above.
	for _, apply := range opts {
		apply(&wc.options)
	}

	return wc
}
// SetLogger replaces the logger stored in the cache options with l.
// It is used after the shard ID was generated to use it in logs.
func (c *cache) SetLogger(l *logger.Logger) {
	c.options.log = l
}
// DumpInfo returns an Info value whose Path is the cache's configured path.
// All other Info fields are left at their zero values.
func (c *cache) DumpInfo() Info {
	var info Info
	info.Path = c.path
	return info
}
// Open records the requested mode and, unless that mode reports NoMetabase,
// opens the underlying store and initializes the counters.
//
// The whole call runs under modeMtx. The store is opened read-only when
// mode.ReadOnly() is true. Errors from opening the store or from counter
// initialization are returned wrapped with metaerr.Wrap. The context argument
// is ignored.
//
// Note: the parameter named mode shadows the imported mode package inside
// this function.
func (c *cache) Open(_ context.Context, mode mode.Mode) error {
	c.modeMtx.Lock()
	defer c.modeMtx.Unlock()

	c.mode = mode

	// Nothing to open for modes that work without the metabase.
	if mode.NoMetabase() {
		return nil
	}

	if err := c.openStore(mode.ReadOnly()); err != nil {
		return metaerr.Wrap(err)
	}

	countersErr := c.initCounters()
	return metaerr.Wrap(countersErr)
}
// Init reports the current mode to the metrics and hands a cancellable
// context to runFlushLoop.
//
// The context is derived from context.Background; its cancel function is
// stored in c.cancel, which Close invokes. Init always returns nil.
func (c *cache) Init() error {
	flushCtx, stop := context.WithCancel(context.Background())
	c.cancel = stop

	c.metrics.SetMode(c.mode)
	c.runFlushLoop(flushCtx)

	return nil
}
// Close stops background work, then closes the database and the metrics.
//
// Sequence:
//  1. Under modeMtx: call the cancel function stored by Init (if any), clear
//     it, and switch the mode to DegradedReadOnly.
//  2. Without the lock: wait on wg.
//  3. Under modeMtx again: close the database if one is set, then close the
//     metrics.
//
// It returns the error reported by closing the database, or nil when the
// database closed cleanly or was never set.
func (c *cache) Close() error {
	// We cannot lock mutex for the whole operation duration
	// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
	c.modeMtx.Lock()
	if c.cancel != nil {
		c.cancel()
		c.cancel = nil
	}
	c.mode = mode.DegradedReadOnly // prevent new operations from being processed
	c.modeMtx.Unlock()

	c.wg.Wait()

	c.modeMtx.Lock()
	defer c.modeMtx.Unlock()

	var err error
	if c.db != nil {
		err = c.db.Close()
		// NOTE(review): the handle is dropped only when closing fails; confirm
		// whether it should also be cleared after a successful close.
		if err != nil {
			c.db = nil
		}
	}

	// Metrics are closed regardless of the database result, so a failed
	// database close does not skip the rest of the shutdown.
	c.metrics.Close()

	// Report the database close failure to the caller instead of dropping it.
	return err
}
// GetMetrics returns the metrics sink configured in the cache options:
// DefaultMetrics() unless an Option replaced it.
func (c *cache) GetMetrics() Metrics {
	return c.options.metrics
}