forked from TrueCloudLab/frostfs-node
Dmitrii Stepanov
b142b6f48e
FSTree file counter used by writecache. As writecache has now only one storage, so it is required to use real object size to get writecache size more accurate than `count * max_object_size`. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
149 lines
3.8 KiB
Go
149 lines
3.8 KiB
Go
package writecache
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
"go.etcd.io/bbolt"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type cache struct {
|
|
options
|
|
|
|
mode mode.Mode
|
|
modeMtx sync.RWMutex
|
|
|
|
// flushCh is a channel with objects to flush.
|
|
flushCh chan objectInfo
|
|
// cancel is cancel function, protected by modeMtx in Close.
|
|
cancel atomic.Value
|
|
// wg is a wait group for flush workers.
|
|
wg sync.WaitGroup
|
|
// fsTree contains big files stored directly on file-system.
|
|
fsTree *fstree.FSTree
|
|
// counter contains atomic counters for the number of objects stored in cache.
|
|
counter *fstree.SimpleCounter
|
|
}
|
|
|
|
// wcStorageType is used for write-cache operations logging.
|
|
const wcStorageType = "write-cache"
|
|
|
|
type objectInfo struct {
|
|
addr string
|
|
data []byte
|
|
obj *objectSDK.Object
|
|
}
|
|
|
|
const (
|
|
defaultMaxObjectSize = 64 * 1024 * 1024 // 64 MiB
|
|
defaultSmallObjectSize = 32 * 1024 // 32 KiB
|
|
defaultMaxCacheSize = 1 << 30 // 1 GiB
|
|
)
|
|
|
|
var (
|
|
defaultBucket = []byte{0}
|
|
dummyCanceler context.CancelFunc = func() {}
|
|
)
|
|
|
|
// New creates new writecache instance.
|
|
func New(opts ...Option) Cache {
|
|
c := &cache{
|
|
flushCh: make(chan objectInfo),
|
|
mode: mode.Disabled,
|
|
counter: fstree.NewSimpleCounter(),
|
|
|
|
options: options{
|
|
log: &logger.Logger{Logger: zap.NewNop()},
|
|
maxObjectSize: defaultMaxObjectSize,
|
|
smallObjectSize: defaultSmallObjectSize,
|
|
workersCount: defaultFlushWorkersCount,
|
|
maxCacheSize: defaultMaxCacheSize,
|
|
maxBatchSize: bbolt.DefaultMaxBatchSize,
|
|
maxBatchDelay: bbolt.DefaultMaxBatchDelay,
|
|
metrics: DefaultMetrics(),
|
|
},
|
|
}
|
|
|
|
for i := range opts {
|
|
opts[i](&c.options)
|
|
}
|
|
|
|
return c
|
|
}
|
|
|
|
// SetLogger sets logger. It is used after the shard ID was generated to use it in logs.
|
|
func (c *cache) SetLogger(l *logger.Logger) {
|
|
c.log = l
|
|
}
|
|
|
|
func (c *cache) DumpInfo() Info {
|
|
return Info{
|
|
Path: c.path,
|
|
}
|
|
}
|
|
|
|
// Open opens and initializes database. Reads object counters from the ObjectCounters instance.
|
|
func (c *cache) Open(_ context.Context, mod mode.Mode) error {
|
|
c.modeMtx.Lock()
|
|
defer c.modeMtx.Unlock()
|
|
c.mode = mod
|
|
if mod.NoMetabase() {
|
|
return nil
|
|
}
|
|
err := c.openStore(mode.ConvertToComponentModeDegraded(mod))
|
|
if err != nil {
|
|
return metaerr.Wrap(err)
|
|
}
|
|
return metaerr.Wrap(c.initCounters())
|
|
}
|
|
|
|
// Init runs necessary services.
|
|
func (c *cache) Init() error {
|
|
c.metrics.SetMode(mode.ConvertToComponentModeDegraded(c.mode))
|
|
if err := c.flushAndDropBBoltDB(context.Background()); err != nil {
|
|
return fmt.Errorf("flush previous version write-cache database: %w", err)
|
|
}
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
c.cancel.Store(cancel)
|
|
c.runFlushLoop(ctx)
|
|
return nil
|
|
}
|
|
|
|
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
|
|
func (c *cache) Close() error {
|
|
if cancelValue := c.cancel.Swap(dummyCanceler); cancelValue != nil {
|
|
cancelValue.(context.CancelFunc)()
|
|
}
|
|
// We cannot lock mutex for the whole operation duration
|
|
// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
|
|
c.modeMtx.Lock()
|
|
c.mode = mode.DegradedReadOnly // prevent new operations from being processed
|
|
c.modeMtx.Unlock()
|
|
|
|
c.wg.Wait()
|
|
|
|
c.modeMtx.Lock()
|
|
defer c.modeMtx.Unlock()
|
|
|
|
var err error
|
|
if c.fsTree != nil {
|
|
err = c.fsTree.Close()
|
|
if err != nil {
|
|
c.fsTree = nil
|
|
}
|
|
}
|
|
c.metrics.Close()
|
|
return nil
|
|
}
|
|
|
|
func (c *cache) GetMetrics() Metrics {
|
|
return c.metrics
|
|
}
|