forked from TrueCloudLab/frostfs-node
130 lines
3.1 KiB
Go
130 lines
3.1 KiB
Go
|
package writecachebadger
|
||
|
|
||
|
import (
|
||
|
"sync"
|
||
|
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||
|
"go.uber.org/zap"
|
||
|
)
|
||
|
|
||
|
// cache is the write-cache implementation returned by New.
type cache struct {
	// options holds the configuration; New fills in defaults and then applies
	// the caller-supplied Option functions to it.
	options

	// mode is the current operation mode. New sets it to mode.ReadWrite and
	// Close switches it to mode.DegradedReadOnly.
	mode mode.Mode
	// modeMtx is held by Close while it changes mode and closeCh.
	modeMtx sync.RWMutex

	// flushCh is a channel with objects to flush. New creates it unbuffered.
	flushCh chan *objectSDK.Object
	// closeCh is the close channel. Open creates it; Close closes it and
	// resets it to nil while holding modeMtx.
	closeCh chan struct{}
	// wg is a wait group for flush workers. Close waits on it before closing
	// the database.
	wg sync.WaitGroup
	// store contains underlying database.
	// NOTE(review): presumably this is where the db handle used by Close
	// comes from — confirm against the store definition.
	store
}
|
||
|
|
||
|
// wcStorageType is the storage type label used when logging write-cache
// operations.
const wcStorageType = "write-cache"
|
||
|
|
||
|
// objectInfo bundles an object with its address and a byte payload.
// NOTE(review): it is not used in the visible part of the file; the field
// descriptions below are assumptions to be confirmed against its users.
type objectInfo struct {
	// addr is the address of the object.
	addr oid.Address
	// data is presumably the binary-encoded form of obj — TODO confirm.
	data []byte
	// obj is the object itself.
	obj *objectSDK.Object
}
|
||
|
|
||
|
// Default size limits, in bytes. New applies defaultMaxObjectSize and
// defaultMaxCacheSize before caller-supplied options are processed.
const (
	// defaultMaxObjectSize is 64 MiB.
	defaultMaxObjectSize = 64 * 1024 * 1024
	// defaultSmallObjectSize is 32 KiB.
	defaultSmallObjectSize = 32 * 1024
	// defaultMaxCacheSize is 1 GiB.
	defaultMaxCacheSize = 1024 * 1024 * 1024
)
|
||
|
|
||
|
// New creates new writecache instance.
|
||
|
func New(opts ...Option) writecache.Cache {
|
||
|
c := &cache{
|
||
|
flushCh: make(chan *objectSDK.Object),
|
||
|
mode: mode.ReadWrite,
|
||
|
|
||
|
options: options{
|
||
|
log: &logger.Logger{Logger: zap.NewNop()},
|
||
|
maxObjectSize: defaultMaxObjectSize,
|
||
|
workersCount: defaultFlushWorkersCount,
|
||
|
maxCacheSize: defaultMaxCacheSize,
|
||
|
metrics: writecache.DefaultMetrics(),
|
||
|
},
|
||
|
}
|
||
|
|
||
|
for i := range opts {
|
||
|
opts[i](&c.options)
|
||
|
}
|
||
|
|
||
|
return c
|
||
|
}
|
||
|
|
||
|
// SetLogger sets logger. It is used after the shard ID was generated to use it in logs.
|
||
|
func (c *cache) SetLogger(l *logger.Logger) {
|
||
|
c.log = l
|
||
|
}
|
||
|
|
||
|
func (c *cache) DumpInfo() writecache.Info {
|
||
|
return writecache.Info{
|
||
|
Path: c.path,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Open opens and initializes database. Reads object counters from the ObjectCounters instance.
|
||
|
func (c *cache) Open(readOnly bool) error {
|
||
|
err := c.openStore(readOnly)
|
||
|
if err != nil {
|
||
|
return metaerr.Wrap(err)
|
||
|
}
|
||
|
|
||
|
// Opening after Close is done during maintenance mode,
|
||
|
// thus we need to create a channel here.
|
||
|
c.closeCh = make(chan struct{})
|
||
|
|
||
|
return metaerr.Wrap(c.initCounters())
|
||
|
}
|
||
|
|
||
|
// Init runs necessary services.
|
||
|
func (c *cache) Init() error {
|
||
|
c.log.Info(logs.WritecacheBadgerInitExperimental)
|
||
|
c.metrics.SetMode(c.mode)
|
||
|
c.runFlushLoop()
|
||
|
c.runGCLoop()
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
|
||
|
func (c *cache) Close() error {
|
||
|
// We cannot lock mutex for the whole operation duration
|
||
|
// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
|
||
|
c.modeMtx.Lock()
|
||
|
if c.closeCh != nil {
|
||
|
close(c.closeCh)
|
||
|
}
|
||
|
c.mode = mode.DegradedReadOnly // prevent new operations from being processed
|
||
|
c.modeMtx.Unlock()
|
||
|
|
||
|
c.wg.Wait()
|
||
|
|
||
|
c.modeMtx.Lock()
|
||
|
defer c.modeMtx.Unlock()
|
||
|
|
||
|
c.closeCh = nil
|
||
|
var err error
|
||
|
if c.db != nil {
|
||
|
err = c.db.Close()
|
||
|
if err != nil {
|
||
|
c.db = nil
|
||
|
}
|
||
|
}
|
||
|
c.metrics.Close()
|
||
|
return nil
|
||
|
}
|