Compare commits: master...feature/64 (2 commits: 0f4633d65d, 0b7fd293f1)

5 changed files with 49 additions and 65 deletions
@@ -26,12 +26,12 @@ type cache struct {
 	// helps to avoid multiple flushing of one object
 	scheduled4Flush    map[oid.Address]struct{}
 	scheduled4FlushMtx sync.RWMutex
-	// closeCh is close channel, protected by modeMtx.
-	closeCh chan struct{}
 	// wg is a wait group for flush workers.
 	wg sync.WaitGroup
 	// store contains underlying database.
 	store
+	// cancel is cancel function, protected by modeMtx in Close.
+	cancel func()
 }
 
 // wcStorageType is used for write-cache operations logging.
@@ -89,11 +89,6 @@ func (c *cache) Open(_ context.Context, readOnly bool) error {
 	if err != nil {
 		return metaerr.Wrap(err)
 	}
-
-	// Opening after Close is done during maintenance mode,
-	// thus we need to create a channel here.
-	c.closeCh = make(chan struct{})
-
 	return metaerr.Wrap(c.initCounters())
 }
 
@@ -101,8 +96,10 @@ func (c *cache) Open(_ context.Context, readOnly bool) error {
 func (c *cache) Init() error {
 	c.log.Info(logs.WritecacheBadgerInitExperimental)
 	c.metrics.SetMode(c.mode)
-	c.runFlushLoop()
-	c.runGCLoop()
+	ctx, cancel := context.WithCancel(context.Background())
+	c.cancel = cancel
+	c.runFlushLoop(ctx)
+	c.runGCLoop(ctx)
 	return nil
 }
 
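Note: the pattern introduced here replaces the shared `closeCh` close channel with a context owned by `Init`: the cancel function is stored on the struct and every background loop receives the derived context as a parameter. A minimal, self-contained sketch of that shape (the type and field names below are illustrative stand-ins, not the actual frostfs-node code):

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// cache is a pared-down stand-in for the write-cache in this diff.
type cache struct {
	flushCh chan string // stands in for `chan objectInfo`
	cancel  func()
	wg      sync.WaitGroup
}

func (c *cache) Init() {
	ctx, cancel := context.WithCancel(context.Background())
	c.cancel = cancel // remembered so Close can stop the workers
	c.wg.Add(1)
	go c.workerFlushSmall(ctx)
}

// workerFlushSmall mirrors the diff: the worker now receives ctx and
// selects on ctx.Done() where it previously selected on c.closeCh.
func (c *cache) workerFlushSmall(ctx context.Context) {
	defer c.wg.Done()
	for {
		select {
		case obj := <-c.flushCh:
			fmt.Println("flushing", obj)
		case <-ctx.Done(): // was: case <-c.closeCh:
			return
		}
	}
}

func main() {
	c := &cache{flushCh: make(chan string)}
	c.Init()
	c.flushCh <- "object-1"
	c.cancel() // what Close does in the real code
	c.wg.Wait()
}
```

Passing the context as a parameter rather than storing it on the struct keeps each worker's lifetime explicit at its call site; only Close needs the stored handle.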
@@ -111,8 +108,9 @@ func (c *cache) Close() error {
 	// We cannot lock mutex for the whole operation duration
 	// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
 	c.modeMtx.Lock()
-	if c.closeCh != nil {
-		close(c.closeCh)
+	if c.cancel != nil {
+		c.cancel()
+		c.cancel = nil
 	}
 	c.mode = mode.DegradedReadOnly // prevent new operations from being processed
 	c.modeMtx.Unlock()
@@ -122,7 +120,6 @@ func (c *cache) Close() error {
 	c.modeMtx.Lock()
 	defer c.modeMtx.Unlock()
 
-	c.closeCh = nil
 	var err error
 	if c.db != nil {
 		err = c.db.Close()
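Note: Close keeps the existing two-phase locking, with `c.cancel()` taking the place of `close(c.closeCh)`: signal shutdown under `modeMtx`, then `wg.Wait()` with the mutex released, because flush workers take `modeMtx` themselves. Resetting `c.cancel` to nil under the lock also makes a repeated Close harmless. A reduced sketch of that ordering, with illustrative names:

```go
package main

import (
	"context"
	"sync"
	"time"
)

type cache struct {
	modeMtx sync.RWMutex
	cancel  func()
	wg      sync.WaitGroup
}

func (c *cache) Close() {
	// Phase 1: signal shutdown under the mutex.
	c.modeMtx.Lock()
	if c.cancel != nil {
		c.cancel()     // wakes every select on ctx.Done()
		c.cancel = nil // a second Close skips the signal
	}
	c.modeMtx.Unlock()

	// Phase 2: wait with the mutex released; a worker blocked on
	// c.modeMtx.RLock() must be able to proceed and observe ctx.Done().
	c.wg.Wait()
}

func main() {
	c := &cache{}
	ctx, cancel := context.WithCancel(context.Background())
	c.cancel = cancel
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		for {
			c.modeMtx.RLock() // workers routinely take modeMtx
			c.modeMtx.RUnlock()
			select {
			case <-ctx.Done():
				return
			case <-time.After(time.Millisecond):
			}
		}
	}()
	c.Close()
	c.Close() // idempotent: cancel is nil on the second call
}
```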
@@ -39,18 +39,16 @@ type collector struct {
 	cache     *cache
 	scheduled int
 	processed int
-	cancel    func()
 }
 
-func (c *collector) Send(buf *z.Buffer) error {
+func (c *collector) send(ctx context.Context, cancel func(), buf *z.Buffer) error {
 	list, err := badger.BufferToKVList(buf)
 	if err != nil {
 		return err
 	}
 	for _, kv := range list.Kv {
 		select {
-		case <-c.cache.closeCh:
-			c.cancel()
+		case <-ctx.Done():
 			return nil
 		default:
 		}
@@ -58,7 +56,7 @@ func (c *collector) Send(buf *z.Buffer) error {
 			return nil
 		}
 		if c.scheduled >= flushBatchSize {
-			c.cancel()
+			cancel()
 			return nil
 		}
 		if got, want := len(kv.Key), len(internalKey{}); got != want {
@@ -90,8 +88,7 @@ func (c *collector) Send(buf *z.Buffer) error {
 			data: val,
 			obj:  obj,
 		}:
-		case <-c.cache.closeCh:
-			c.cancel()
+		case <-ctx.Done():
 			return nil
 		}
 	}
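Note: `collector` drops its stored cancel field; `send` now receives both the context and its cancel function, so the callback can observe shutdown with a non-blocking `ctx.Done()` check and can also terminate the stream that drives it once `flushBatchSize` items are scheduled. A badger-free sketch of that control flow (`batchSize` and all names here are illustrative):

```go
package main

import (
	"context"
	"fmt"
)

const batchSize = 3 // stands in for flushBatchSize

// collector mirrors the diff's shape: state stays on the struct, but
// ctx and cancel arrive as parameters of send.
type collector struct {
	scheduled int
}

func (c *collector) send(ctx context.Context, cancel func(), kvs []string) error {
	for _, kv := range kvs {
		// Non-blocking shutdown check: fall through when not canceled.
		select {
		case <-ctx.Done():
			return nil
		default:
		}
		if c.scheduled >= batchSize {
			cancel() // stop the producer that is driving this callback
			return nil
		}
		fmt.Println("scheduling", kv)
		c.scheduled++
	}
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	coll := &collector{}
	_ = coll.send(ctx, cancel, []string{"a", "b", "c", "d", "e"})
	fmt.Println("canceled by batch limit:", ctx.Err() != nil)
}
```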
@@ -99,10 +96,10 @@ func (c *collector) Send(buf *z.Buffer) error {
 }
 
 // runFlushLoop starts background workers which periodically flush objects to the blobstor.
-func (c *cache) runFlushLoop() {
+func (c *cache) runFlushLoop(ctx context.Context) {
 	for i := 0; i < c.workersCount; i++ {
 		c.wg.Add(1)
-		go c.workerFlushSmall()
+		go c.workerFlushSmall(ctx)
 	}
 
 	c.wg.Add(1)
@@ -115,19 +112,19 @@ func (c *cache) runFlushLoop() {
 		for {
 			select {
 			case <-tt.C:
-				c.flushSmallObjects()
+				c.flushSmallObjects(ctx)
 				tt.Reset(defaultFlushInterval)
-			case <-c.closeCh:
+			case <-ctx.Done():
 				return
 			}
 		}
 	}()
 }
 
-func (c *cache) flushSmallObjects() {
+func (c *cache) flushSmallObjects(ctx context.Context) {
 	for {
 		select {
-		case <-c.closeCh:
+		case <-ctx.Done():
 			return
 		default:
 		}
@@ -144,14 +141,15 @@ func (c *cache) flushSmallObjects() {
 			c.modeMtx.RUnlock()
 			return
 		}
-		ctx, cancel := context.WithCancel(context.TODO())
+		ctx, cancel := context.WithCancel(ctx)
 		coll := collector{
-			cache:  c,
-			cancel: cancel,
+			cache: c,
 		}
 		stream := c.db.NewStream()
 		// All calls to Send are done by a single goroutine
-		stream.Send = coll.Send
+		stream.Send = func(buf *z.Buffer) error {
+			return coll.send(ctx, cancel, buf)
+		}
 		if err := stream.Orchestrate(ctx); err != nil {
 			c.log.Debug(fmt.Sprintf(
 				"error during flushing object from wc: %s", err))
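Note: the stream context is now derived from the caller's context (`context.WithCancel(ctx)` instead of `context.TODO()`), so an external shutdown also stops the badger stream, and `Send` becomes a closure over ctx and cancel. A hedged sketch of the wiring against badger's stream API as used in this diff; the import paths below are assumptions and depend on the badger/ristretto versions pinned in go.mod:

```go
package main

import (
	"context"
	"fmt"

	badger "github.com/dgraph-io/badger/v4"
	"github.com/dgraph-io/ristretto/z" // z.Buffer; path varies by badger version
)

func main() {
	db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true))
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Seed a few keys so the stream has something to send.
	_ = db.Update(func(txn *badger.Txn) error {
		for _, k := range []string{"a", "b", "c"} {
			if err := txn.Set([]byte(k), []byte("v")); err != nil {
				return err
			}
		}
		return nil
	})

	// The parent stands in for the context flushSmallObjects receives.
	parent := context.Background()
	ctx, cancel := context.WithCancel(parent) // was context.TODO() before the change
	defer cancel()

	const batchLimit = 2 // stands in for flushBatchSize
	scheduled := 0

	stream := db.NewStream()
	// All calls to Send are made by a single goroutine (per badger docs);
	// the closure captures ctx and cancel, as in the diff.
	stream.Send = func(buf *z.Buffer) error {
		list, err := badger.BufferToKVList(buf)
		if err != nil {
			return err
		}
		for _, kv := range list.Kv {
			select {
			case <-ctx.Done(): // external shutdown or self-cancel below
				return nil
			default:
			}
			if scheduled >= batchLimit {
				cancel() // ends Orchestrate after this batch
				return nil
			}
			fmt.Printf("scheduled key %s\n", kv.Key)
			scheduled++
		}
		return nil
	}
	if err := stream.Orchestrate(ctx); err != nil {
		fmt.Println("stream stopped:", err) // typically context.Canceled here
	}
}
```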
@@ -176,7 +174,7 @@ func (c *cache) reportFlushError(msg string, addr string, err error) {
 }
 
 // workerFlushSmall writes small objects to the main storage.
-func (c *cache) workerFlushSmall() {
+func (c *cache) workerFlushSmall(ctx context.Context) {
 	defer c.wg.Done()
 
 	var objInfo objectInfo
@@ -184,11 +182,11 @@ func (c *cache) workerFlushSmall() {
 		// Give priority to direct put.
 		select {
 		case objInfo = <-c.flushCh:
-		case <-c.closeCh:
+		case <-ctx.Done():
 			return
 		}
 
-		err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB)
+		err := c.flushObject(ctx, objInfo.obj, objInfo.data, writecache.StorageTypeDB)
 		if err == nil {
 			c.deleteFromDB([]internalKey{addr2key(objInfo.addr)})
 		}
@@ -1,12 +1,13 @@
 package writecachebadger
 
 import (
+	"context"
 	"time"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
 )
 
-func (c *cache) runGCLoop() {
+func (c *cache) runGCLoop(ctx context.Context) {
 	c.wg.Add(1)
 
 	go func() {
@@ -17,7 +18,7 @@ func (c *cache) runGCLoop() {
 
 		for {
 			select {
-			case <-c.closeCh:
+			case <-ctx.Done():
 				return
 			case <-t.C:
 				// This serves to synchronize the c.db field when changing mode as well.
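Note: the GC loop changes the same way: `"context"` joins the imports and the ticker loop selects on `ctx.Done()` instead of the close channel. A self-contained sketch of that loop shape (the interval and the loop body are placeholders):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runGCLoop mirrors the diff's structure: register with the wait group,
// then tick until the context is canceled.
func runGCLoop(ctx context.Context, wg *sync.WaitGroup, interval time.Duration) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		t := time.NewTicker(interval)
		defer t.Stop()
		for {
			select {
			case <-ctx.Done(): // was: case <-c.closeCh:
				return
			case <-t.C:
				fmt.Println("gc pass") // placeholder for the real GC work
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	runGCLoop(ctx, &wg, 10*time.Millisecond)
	time.Sleep(35 * time.Millisecond)
	cancel()
	wg.Wait()
}
```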
@@ -30,8 +30,8 @@ type cache struct {
 
 	// flushCh is a channel with objects to flush.
 	flushCh chan objectInfo
-	// closeCh is close channel, protected by modeMtx.
-	closeCh chan struct{}
+	// cancel is cancel function, protected by modeMtx in Close.
+	cancel func()
 	// wg is a wait group for flush workers.
 	wg sync.WaitGroup
 	// store contains underlying database.
@@ -104,17 +104,15 @@ func (c *cache) Open(_ context.Context, readOnly bool) error {
 		return metaerr.Wrap(err)
 	}
 
-	// Opening after Close is done during maintenance mode,
-	// thus we need to create a channel here.
-	c.closeCh = make(chan struct{})
-
 	return metaerr.Wrap(c.initCounters())
 }
 
 // Init runs necessary services.
 func (c *cache) Init() error {
 	c.metrics.SetMode(c.mode)
-	c.runFlushLoop()
+	ctx, cancel := context.WithCancel(context.Background())
+	c.cancel = cancel
+	c.runFlushLoop(ctx)
 	return nil
 }
 
@@ -123,8 +121,9 @@ func (c *cache) Close() error {
 	// We cannot lock mutex for the whole operation duration
 	// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
 	c.modeMtx.Lock()
-	if c.closeCh != nil {
-		close(c.closeCh)
+	if c.cancel != nil {
+		c.cancel()
+		c.cancel = nil
 	}
 	c.mode = mode.DegradedReadOnly // prevent new operations from being processed
 	c.modeMtx.Unlock()
@@ -134,7 +133,6 @@ func (c *cache) Close() error {
 	c.modeMtx.Lock()
 	defer c.modeMtx.Unlock()
 
-	c.closeCh = nil
 	var err error
 	if c.db != nil {
 		err = c.db.Close()
@@ -36,20 +36,10 @@ const (
 )
 
 // runFlushLoop starts background workers which periodically flush objects to the blobstor.
-func (c *cache) runFlushLoop() {
-	ctx, cancel := context.WithCancel(context.Background())
-
-	ch := c.closeCh
-	c.wg.Add(1)
-	go func() {
-		<-ch
-		cancel()
-		c.wg.Done()
-	}()
-
+func (c *cache) runFlushLoop(ctx context.Context) {
 	for i := 0; i < c.workersCount; i++ {
 		c.wg.Add(1)
-		go c.workerFlushSmall()
+		go c.workerFlushSmall(ctx)
 	}
 
 	c.wg.Add(1)
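Note: the deleted block above is the adapter this whole change removes: a goroutine whose only job was to turn `close(closeCh)` into a context cancellation. With Init owning the context, the bridge goroutine and its wait-group slot disappear. For contrast, a reduced, illustrative sketch of the old adapter:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// bridge shows the deleted pattern: it exists only to translate a
// close channel into context cancellation, costing one goroutine.
func bridge(closeCh <-chan struct{}, wg *sync.WaitGroup) context.Context {
	ctx, cancel := context.WithCancel(context.Background())
	wg.Add(1)
	go func() {
		<-closeCh // wait for close(closeCh)
		cancel()  // translate it into ctx.Done()
		wg.Done()
	}()
	return ctx
}

func main() {
	var wg sync.WaitGroup
	closeCh := make(chan struct{})
	ctx := bridge(closeCh, &wg)

	close(closeCh) // the old shutdown signal
	<-ctx.Done()   // observed as context cancellation
	wg.Wait()
	fmt.Println("bridged:", ctx.Err())

	// After the change there is nothing to bridge: Init creates the
	// context and hands it to runFlushLoop directly.
}
```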
@@ -68,20 +58,20 @@ func (c *cache) runFlushLoop() {
 		for {
 			select {
 			case <-tt.C:
-				c.flushSmallObjects()
+				c.flushSmallObjects(ctx)
 				tt.Reset(defaultFlushInterval)
-			case <-c.closeCh:
+			case <-ctx.Done():
 				return
 			}
 		}
 	}()
 }
 
-func (c *cache) flushSmallObjects() {
+func (c *cache) flushSmallObjects(ctx context.Context) {
 	var lastKey []byte
 	for {
 		select {
-		case <-c.closeCh:
+		case <-ctx.Done():
 			return
 		default:
 		}
@@ -137,7 +127,7 @@ func (c *cache) flushSmallObjects() {
 			count++
 			select {
 			case c.flushCh <- m[i]:
-			case <-c.closeCh:
+			case <-ctx.Done():
 				c.modeMtx.RUnlock()
 				return
 			}
@@ -170,7 +160,7 @@ func (c *cache) workerFlushBig(ctx context.Context) {
 			_ = c.flushFSTree(ctx, true)
 
 			c.modeMtx.RUnlock()
-		case <-c.closeCh:
+		case <-ctx.Done():
 			return
 		}
 	}
@@ -228,7 +218,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
 }
 
 // workerFlushSmall writes small objects to the main storage.
-func (c *cache) workerFlushSmall() {
+func (c *cache) workerFlushSmall(ctx context.Context) {
 	defer c.wg.Done()
 
 	var objInfo objectInfo
@@ -236,11 +226,11 @@ func (c *cache) workerFlushSmall() {
 		// Give priority to direct put.
 		select {
 		case objInfo = <-c.flushCh:
-		case <-c.closeCh:
+		case <-ctx.Done():
 			return
 		}
 
-		err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB)
+		err := c.flushObject(ctx, objInfo.obj, objInfo.data, writecache.StorageTypeDB)
 		if err != nil {
 			// Error is handled in flushObject.
 			continue