forked from TrueCloudLab/frostfs-node
[#1367] writecache: Drop bbolt DB
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
66e17f4b8e
commit
5f6c7cbdb1
12 changed files with 82 additions and 415 deletions
146
pkg/local_object_storage/writecache/cache.go
Normal file
146
pkg/local_object_storage/writecache/cache.go
Normal file
|
@ -0,0 +1,146 @@
|
|||
package writecache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type cache struct {
|
||||
options
|
||||
|
||||
mode mode.Mode
|
||||
modeMtx sync.RWMutex
|
||||
|
||||
// flushCh is a channel with objects to flush.
|
||||
flushCh chan objectInfo
|
||||
// cancel is cancel function, protected by modeMtx in Close.
|
||||
cancel atomic.Value
|
||||
// wg is a wait group for flush workers.
|
||||
wg sync.WaitGroup
|
||||
// fsTree contains big files stored directly on file-system.
|
||||
fsTree *fstree.FSTree
|
||||
}
|
||||
|
||||
// wcStorageType is used for write-cache operations logging.
|
||||
const wcStorageType = "write-cache"
|
||||
|
||||
type objectInfo struct {
|
||||
addr string
|
||||
data []byte
|
||||
obj *objectSDK.Object
|
||||
}
|
||||
|
||||
const (
|
||||
defaultMaxObjectSize = 64 * 1024 * 1024 // 64 MiB
|
||||
defaultSmallObjectSize = 32 * 1024 // 32 KiB
|
||||
defaultMaxCacheSize = 1 << 30 // 1 GiB
|
||||
)
|
||||
|
||||
var (
|
||||
defaultBucket = []byte{0}
|
||||
dummyCanceler context.CancelFunc = func() {}
|
||||
)
|
||||
|
||||
// New creates new writecache instance.
|
||||
func New(opts ...Option) Cache {
|
||||
c := &cache{
|
||||
flushCh: make(chan objectInfo),
|
||||
mode: mode.Disabled,
|
||||
|
||||
options: options{
|
||||
log: &logger.Logger{Logger: zap.NewNop()},
|
||||
maxObjectSize: defaultMaxObjectSize,
|
||||
smallObjectSize: defaultSmallObjectSize,
|
||||
workersCount: defaultFlushWorkersCount,
|
||||
maxCacheSize: defaultMaxCacheSize,
|
||||
maxBatchSize: bbolt.DefaultMaxBatchSize,
|
||||
maxBatchDelay: bbolt.DefaultMaxBatchDelay,
|
||||
metrics: DefaultMetrics(),
|
||||
},
|
||||
}
|
||||
|
||||
for i := range opts {
|
||||
opts[i](&c.options)
|
||||
}
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
// SetLogger sets logger. It is used after the shard ID was generated to use it in logs.
|
||||
func (c *cache) SetLogger(l *logger.Logger) {
|
||||
c.log = l
|
||||
}
|
||||
|
||||
func (c *cache) DumpInfo() Info {
|
||||
return Info{
|
||||
Path: c.path,
|
||||
}
|
||||
}
|
||||
|
||||
// Open opens and initializes database. Reads object counters from the ObjectCounters instance.
|
||||
func (c *cache) Open(_ context.Context, mod mode.Mode) error {
|
||||
c.modeMtx.Lock()
|
||||
defer c.modeMtx.Unlock()
|
||||
c.mode = mod
|
||||
if mod.NoMetabase() {
|
||||
return nil
|
||||
}
|
||||
err := c.openStore(mode.ConvertToComponentModeDegraded(mod))
|
||||
if err != nil {
|
||||
return metaerr.Wrap(err)
|
||||
}
|
||||
return metaerr.Wrap(c.initCounters())
|
||||
}
|
||||
|
||||
// Init runs necessary services.
|
||||
func (c *cache) Init() error {
|
||||
c.metrics.SetMode(mode.ConvertToComponentModeDegraded(c.mode))
|
||||
if err := c.flushAndDropBBoltDB(context.Background()); err != nil {
|
||||
return fmt.Errorf("flush previous version write-cache database: %w", err)
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
c.cancel.Store(cancel)
|
||||
c.runFlushLoop(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
|
||||
func (c *cache) Close() error {
|
||||
if cancelValue := c.cancel.Swap(dummyCanceler); cancelValue != nil {
|
||||
cancelValue.(context.CancelFunc)()
|
||||
}
|
||||
// We cannot lock mutex for the whole operation duration
|
||||
// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
|
||||
c.modeMtx.Lock()
|
||||
c.mode = mode.DegradedReadOnly // prevent new operations from being processed
|
||||
c.modeMtx.Unlock()
|
||||
|
||||
c.wg.Wait()
|
||||
|
||||
c.modeMtx.Lock()
|
||||
defer c.modeMtx.Unlock()
|
||||
|
||||
var err error
|
||||
if c.fsTree != nil {
|
||||
err = c.fsTree.Close()
|
||||
if err != nil {
|
||||
c.fsTree = nil
|
||||
}
|
||||
}
|
||||
c.metrics.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cache) GetMetrics() Metrics {
|
||||
return c.metrics
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue