Simplify writecache #377
9 changed files with 33 additions and 416 deletions
|
@ -37,12 +37,12 @@ const (
|
|||
func (c *cache) runFlushLoop() {
|
||||
for i := 0; i < c.workersCount; i++ {
|
||||
c.wg.Add(1)
|
||||
go c.flushWorker(i)
|
||||
go c.workerFlushSmall()
|
||||
}
|
||||
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
c.flushBigObjects(context.TODO())
|
||||
c.workerFlushBig(context.TODO())
|
||||
c.wg.Done()
|
||||
}()
|
||||
|
||||
|
@ -56,7 +56,7 @@ func (c *cache) runFlushLoop() {
|
|||
for {
|
||||
select {
|
||||
case <-tt.C:
|
||||
c.flushDB()
|
||||
c.flushSmallObjects()
|
||||
tt.Reset(defaultFlushInterval)
|
||||
case <-c.closeCh:
|
||||
return
|
||||
|
@ -65,7 +65,7 @@ func (c *cache) runFlushLoop() {
|
|||
}()
|
||||
}
|
||||
|
||||
func (c *cache) flushDB() {
|
||||
func (c *cache) flushSmallObjects() {
|
||||
var lastKey []byte
|
||||
var m []objectInfo
|
||||
for {
|
||||
|
@ -78,7 +78,7 @@ func (c *cache) flushDB() {
|
|||
m = m[:0]
|
||||
|
||||
c.modeMtx.RLock()
|
||||
if c.readOnly() || !c.initialized.Load() {
|
||||
if c.readOnly() {
|
||||
c.modeMtx.RUnlock()
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
|
@ -117,10 +117,6 @@ func (c *cache) flushDB() {
|
|||
|
||||
var count int
|
||||
for i := range m {
|
||||
if c.flushed.Contains(m[i].addr) {
|
||||
continue
|
||||
}
|
||||
|
||||
obj := object.New()
|
||||
if err := obj.Unmarshal(m[i].data); err != nil {
|
||||
continue
|
||||
|
@ -148,7 +144,7 @@ func (c *cache) flushDB() {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *cache) flushBigObjects(ctx context.Context) {
|
||||
func (c *cache) workerFlushBig(ctx context.Context) {
|
||||
tick := time.NewTicker(defaultFlushInterval * 10)
|
||||
for {
|
||||
select {
|
||||
|
@ -157,9 +153,6 @@ func (c *cache) flushBigObjects(ctx context.Context) {
|
|||
if c.readOnly() {
|
||||
c.modeMtx.RUnlock()
|
||||
break
|
||||
} else if !c.initialized.Load() {
|
||||
c.modeMtx.RUnlock()
|
||||
continue
|
||||
}
|
||||
|
||||
_ = c.flushFSTree(ctx, true)
|
||||
|
@ -187,10 +180,6 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
|
|||
prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
|
||||
sAddr := addr.EncodeToString()
|
||||
|
||||
if _, ok := c.store.flushed.Peek(sAddr); ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
data, err := f()
|
||||
if err != nil {
|
||||
c.reportFlushError("can't read a file", sAddr, err)
|
||||
|
@ -218,9 +207,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// mark object as flushed
|
||||
c.flushed.Add(sAddr, false)
|
||||
|
||||
c.deleteFromDisk(ctx, []string{sAddr})
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -228,8 +215,8 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// flushWorker writes objects to the main storage.
|
||||
func (c *cache) flushWorker(_ int) {
|
||||
// workerFlushSmall writes small objects to the main storage.
|
||||
func (c *cache) workerFlushSmall() {
|
||||
defer c.wg.Done()
|
||||
|
||||
var obj *object.Object
|
||||
|
@ -242,9 +229,12 @@ func (c *cache) flushWorker(_ int) {
|
|||
}
|
||||
|
||||
err := c.flushObject(context.TODO(), obj, nil)
|
||||
if err == nil {
|
||||
c.flushed.Add(objectCore.AddressOf(obj).EncodeToString(), true)
|
||||
if err != nil {
|
||||
// Error is handled in flushObject.
|
||||
continue
|
||||
}
|
||||
|
||||
c.deleteFromDB([]string{objectCore.AddressOf(obj).EncodeToString()})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -306,10 +296,6 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
|
|||
cs := b.Cursor()
|
||||
for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() {
|
||||
sa := string(k)
|
||||
if _, ok := c.flushed.Peek(sa); ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := addr.DecodeString(sa); err != nil {
|
||||
c.reportFlushError("can't decode object address from the DB", sa, err)
|
||||
if ignoreErrors {
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
|
@ -15,7 +14,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
checksumtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum/test"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
|
@ -109,22 +107,9 @@ func TestFlush(t *testing.T) {
|
|||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
||||
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
||||
|
||||
wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
|
||||
wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
|
||||
|
||||
require.NoError(t, wc.Flush(context.Background(), false))
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
var mPrm meta.GetPrm
|
||||
mPrm.SetAddress(objects[i].addr)
|
||||
_, err := mb.Get(context.Background(), mPrm)
|
||||
require.Error(t, err)
|
||||
|
||||
_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
check(t, mb, bs, objects[2:])
|
||||
check(t, mb, bs, objects)
|
||||
})
|
||||
|
||||
t.Run("flush on moving to degraded mode", func(t *testing.T) {
|
||||
|
@ -138,23 +123,9 @@ func TestFlush(t *testing.T) {
|
|||
require.NoError(t, wc.SetMode(mode.ReadOnly))
|
||||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
||||
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
||||
|
||||
wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
|
||||
wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
|
||||
|
||||
require.NoError(t, wc.SetMode(mode.Degraded))
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
var mPrm meta.GetPrm
|
||||
mPrm.SetAddress(objects[i].addr)
|
||||
_, err := mb.Get(context.Background(), mPrm)
|
||||
require.Error(t, err)
|
||||
|
||||
_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
check(t, mb, bs, objects[2:])
|
||||
check(t, mb, bs, objects)
|
||||
})
|
||||
|
||||
t.Run("ignore errors", func(t *testing.T) {
|
||||
|
@ -223,67 +194,6 @@ func TestFlush(t *testing.T) {
|
|||
})
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("on init", func(t *testing.T) {
|
||||
wc, bs, mb := newCache(t)
|
||||
objects := []objectPair{
|
||||
// removed
|
||||
putObject(t, wc, 1),
|
||||
putObject(t, wc, smallSize+1),
|
||||
// not found
|
||||
putObject(t, wc, 1),
|
||||
putObject(t, wc, smallSize+1),
|
||||
// ok
|
||||
putObject(t, wc, 1),
|
||||
putObject(t, wc, smallSize+1),
|
||||
}
|
||||
|
||||
require.NoError(t, wc.Close())
|
||||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
||||
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
||||
|
||||
for i := range objects {
|
||||
var prm meta.PutPrm
|
||||
prm.SetObject(objects[i].obj)
|
||||
_, err := mb.Put(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
var inhumePrm meta.InhumePrm
|
||||
inhumePrm.SetAddresses(objects[0].addr, objects[1].addr)
|
||||
inhumePrm.SetTombstoneAddress(oidtest.Address())
|
||||
_, err := mb.Inhume(context.Background(), inhumePrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
var deletePrm meta.DeletePrm
|
||||
deletePrm.SetAddresses(objects[2].addr, objects[3].addr)
|
||||
_, err = mb.Delete(context.Background(), deletePrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, bs.SetMode(mode.ReadOnly))
|
||||
require.NoError(t, mb.SetMode(mode.ReadOnly))
|
||||
|
||||
// Open in read-only: no error, nothing is removed.
|
||||
require.NoError(t, wc.Open(true))
|
||||
initWC(t, wc)
|
||||
for i := range objects {
|
||||
_, err := wc.Get(context.Background(), objects[i].addr)
|
||||
require.NoError(t, err, i)
|
||||
}
|
||||
require.NoError(t, wc.Close())
|
||||
|
||||
// Open in read-write: no error, something is removed.
|
||||
require.NoError(t, wc.Open(false))
|
||||
initWC(t, wc)
|
||||
for i := range objects {
|
||||
_, err := wc.Get(context.Background(), objects[i].addr)
|
||||
if i < 2 {
|
||||
require.ErrorAs(t, err, new(apistatus.ObjectNotFound), i)
|
||||
} else {
|
||||
require.NoError(t, err, i)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func putObject(t *testing.T, c Cache, size int) objectPair {
|
||||
|
@ -321,11 +231,6 @@ func newObject(t *testing.T, size int) (*object.Object, []byte) {
|
|||
|
||||
func initWC(t *testing.T, wc Cache) {
|
||||
require.NoError(t, wc.Init())
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
rawWc := wc.(*cache)
|
||||
return rawWc.initialized.Load()
|
||||
}, 100*time.Second, 1*time.Millisecond)
|
||||
}
|
||||
|
||||
type dummyEpoch struct{}
|
||||
|
|
|
@ -30,7 +30,6 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
|
|||
value, err := Get(c.db, []byte(saddr))
|
||||
if err == nil {
|
||||
obj := objectSDK.New()
|
||||
c.flushed.Get(saddr)
|
||||
return obj, obj.Unmarshal(value)
|
||||
}
|
||||
|
||||
|
@ -39,7 +38,6 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
|
|||
return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
|
||||
}
|
||||
|
||||
c.flushed.Get(saddr)
|
||||
return res.Object, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -1,192 +0,0 @@
|
|||
package writecache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (c *cache) initFlushMarks(ctx context.Context) {
|
||||
var localWG sync.WaitGroup
|
||||
|
||||
localWG.Add(1)
|
||||
go func() {
|
||||
defer localWG.Done()
|
||||
|
||||
c.fsTreeFlushMarkUpdate(ctx)
|
||||
}()
|
||||
|
||||
localWG.Add(1)
|
||||
go func() {
|
||||
defer localWG.Done()
|
||||
|
||||
c.dbFlushMarkUpdate(ctx)
|
||||
}()
|
||||
|
||||
c.initWG.Add(1)
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
defer c.wg.Done()
|
||||
defer c.initWG.Done()
|
||||
|
||||
localWG.Wait()
|
||||
|
||||
select {
|
||||
case <-c.stopInitCh:
|
||||
return
|
||||
case <-c.closeCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
c.initialized.Store(true)
|
||||
}()
|
||||
}
|
||||
|
||||
var errStopIter = errors.New("stop iteration")
|
||||
|
||||
func (c *cache) fsTreeFlushMarkUpdate(ctx context.Context) {
|
||||
c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInFSTree)
|
||||
|
||||
var prm common.IteratePrm
|
||||
prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error {
|
||||
select {
|
||||
case <-c.closeCh:
|
||||
return errStopIter
|
||||
case <-c.stopInitCh:
|
||||
return errStopIter
|
||||
default:
|
||||
}
|
||||
|
||||
flushed, needRemove := c.flushStatus(ctx, addr)
|
||||
if flushed {
|
||||
c.store.flushed.Add(addr.EncodeToString(), true)
|
||||
if needRemove {
|
||||
var prm common.DeletePrm
|
||||
prm.Address = addr
|
||||
|
||||
_, err := c.fsTree.Delete(ctx, prm)
|
||||
if err == nil {
|
||||
storagelog.Write(c.log,
|
||||
storagelog.AddressField(addr),
|
||||
storagelog.StorageTypeField(wcStorageType),
|
||||
storagelog.OpField("fstree DELETE"),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
c.modeMtx.RLock()
|
||||
defer c.modeMtx.RUnlock()
|
||||
|
||||
_, _ = c.fsTree.Iterate(prm)
|
||||
|
||||
c.log.Info(logs.WritecacheFinishedUpdatingFSTreeFlushMarks)
|
||||
}
|
||||
|
||||
func (c *cache) dbFlushMarkUpdate(ctx context.Context) {
|
||||
c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInDatabase)
|
||||
|
||||
c.modeMtx.RLock()
|
||||
defer c.modeMtx.RUnlock()
|
||||
|
||||
var m []string
|
||||
var indices []int
|
||||
var lastKey []byte
|
||||
var batchSize = flushBatchSize
|
||||
for {
|
||||
select {
|
||||
case <-c.closeCh:
|
||||
return
|
||||
case <-c.stopInitCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
m = m[:0]
|
||||
indices = indices[:0]
|
||||
|
||||
// We put objects in batches of fixed size to not interfere with main put cycle a lot.
|
||||
_ = c.db.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
cs := b.Cursor()
|
||||
for k, _ := cs.Seek(lastKey); k != nil && len(m) < batchSize; k, _ = cs.Next() {
|
||||
m = append(m, string(k))
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
var addr oid.Address
|
||||
for i := range m {
|
||||
if err := addr.DecodeString(m[i]); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
flushed, needRemove := c.flushStatus(ctx, addr)
|
||||
if flushed {
|
||||
c.store.flushed.Add(addr.EncodeToString(), true)
|
||||
if needRemove {
|
||||
indices = append(indices, i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(m) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
err := c.db.Batch(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
for _, j := range indices {
|
||||
if err := b.Delete([]byte(m[j])); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err == nil {
|
||||
for _, j := range indices {
|
||||
storagelog.Write(c.log,
|
||||
zap.String("address", m[j]),
|
||||
storagelog.StorageTypeField(wcStorageType),
|
||||
storagelog.OpField("db DELETE"),
|
||||
)
|
||||
}
|
||||
}
|
||||
lastKey = append([]byte(m[len(m)-1]), 0)
|
||||
}
|
||||
|
||||
c.log.Info(logs.WritecacheFinishedUpdatingFlushMarks)
|
||||
}
|
||||
|
||||
// flushStatus returns info about the object state in the main storage.
|
||||
// First return value is true iff object exists.
|
||||
// Second return value is true iff object can be safely removed.
|
||||
func (c *cache) flushStatus(ctx context.Context, addr oid.Address) (bool, bool) {
|
||||
var existsPrm meta.ExistsPrm
|
||||
existsPrm.SetAddress(addr)
|
||||
|
||||
_, err := c.metabase.Exists(ctx, existsPrm)
|
||||
if err != nil {
|
||||
needRemove := errors.Is(err, meta.ErrObjectIsExpired) || errors.As(err, new(apistatus.ObjectAlreadyRemoved))
|
||||
return needRemove, needRemove
|
||||
}
|
||||
|
||||
var prm meta.StorageIDPrm
|
||||
prm.SetAddress(addr)
|
||||
|
||||
mRes, _ := c.metabase.StorageID(ctx, prm)
|
||||
res, err := c.blobstor.Exists(ctx, common.ExistsPrm{Address: addr, StorageID: mRes.StorageID()})
|
||||
return err == nil && res.Exists, false
|
||||
}
|
|
@ -41,9 +41,6 @@ func (c *cache) Iterate(prm IterationPrm) error {
|
|||
err := c.db.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
return b.ForEach(func(k, data []byte) error {
|
||||
if _, ok := c.flushed.Peek(string(k)); ok {
|
||||
return nil
|
||||
}
|
||||
return prm.handler(data)
|
||||
})
|
||||
})
|
||||
|
@ -54,9 +51,6 @@ func (c *cache) Iterate(prm IterationPrm) error {
|
|||
var fsPrm common.IteratePrm
|
||||
fsPrm.IgnoreErrors = prm.ignoreErrors
|
||||
fsPrm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
|
||||
if _, ok := c.flushed.Peek(addr.EncodeToString()); ok {
|
||||
return nil
|
||||
}
|
||||
data, err := f()
|
||||
if err != nil {
|
||||
if prm.ignoreErrors {
|
||||
|
|
|
@ -37,22 +37,6 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode) error {
|
|||
var err error
|
||||
turnOffMeta := m.NoMetabase()
|
||||
|
||||
if !c.initialized.Load() {
|
||||
close(c.stopInitCh)
|
||||
|
||||
c.initWG.Wait()
|
||||
c.stopInitCh = make(chan struct{})
|
||||
|
||||
defer func() {
|
||||
if err == nil && !turnOffMeta {
|
||||
c.initFlushMarks(ctx)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
c.modeMtx.Lock()
|
||||
defer c.modeMtx.Unlock()
|
||||
|
||||
if turnOffMeta && !c.mode.NoMetabase() {
|
||||
err = c.flush(ctx, true)
|
||||
if err != nil {
|
||||
|
|
|
@ -37,8 +37,6 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
|
|||
defer c.modeMtx.RUnlock()
|
||||
if c.readOnly() {
|
||||
return common.PutRes{}, ErrReadOnly
|
||||
} else if !c.initialized.Load() {
|
||||
return common.PutRes{}, ErrNotInitialized
|
||||
}
|
||||
|
||||
sz := uint64(len(prm.RawData))
|
||||
|
@ -78,7 +76,7 @@ func (c *cache) putSmall(obj objectInfo) error {
|
|||
)
|
||||
c.objCounters.IncDB()
|
||||
}
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
// putBig writes object to FSTree and pushes it to the flush workers queue.
|
||||
|
|
|
@ -13,8 +13,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"github.com/hashicorp/golang-lru/v2/simplelru"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -22,19 +20,7 @@ import (
|
|||
// store represents persistent storage with in-memory LRU cache
|
||||
// for flushed items on top of it.
|
||||
type store struct {
|
||||
maxFlushedMarksCount int
|
||||
maxRemoveBatchSize int
|
||||
|
||||
// flushed contains addresses of objects that were already flushed to the main storage.
|
||||
// We use LRU cache instead of map here to facilitate removing of unused object in favour of
|
||||
// frequently read ones.
|
||||
// MUST NOT be used inside bolt db transaction because it's eviction handler
|
||||
// removes untracked items from the database.
|
||||
flushed simplelru.LRUCache[string, bool]
|
||||
db *bbolt.DB
|
||||
|
||||
dbKeysToRemove []string
|
||||
fsKeysToRemove []string
|
||||
db *bbolt.DB
|
||||
}
|
||||
|
||||
const dbName = "small.bolt"
|
||||
|
@ -73,35 +59,9 @@ func (c *cache) openStore(readOnly bool) error {
|
|||
return fmt.Errorf("could not open FSTree: %w", err)
|
||||
}
|
||||
|
||||
// Write-cache can be opened multiple times during `SetMode`.
|
||||
// flushed map must not be re-created in this case.
|
||||
if c.flushed == nil {
|
||||
c.flushed, _ = lru.NewWithEvict[string, bool](c.maxFlushedMarksCount, c.removeFlushed)
|
||||
}
|
||||
|
||||
c.initialized.Store(false)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// removeFlushed removes an object from the writecache.
|
||||
// To minimize interference with the client operations, the actual removal
|
||||
// is done in batches.
|
||||
// It is not thread-safe and is used only as an evict callback to LRU cache.
|
||||
func (c *cache) removeFlushed(key string, value bool) {
|
||||
fromDatabase := value
|
||||
if fromDatabase {
|
||||
c.dbKeysToRemove = append(c.dbKeysToRemove, key)
|
||||
} else {
|
||||
c.fsKeysToRemove = append(c.fsKeysToRemove, key)
|
||||
}
|
||||
|
||||
if len(c.dbKeysToRemove)+len(c.fsKeysToRemove) >= c.maxRemoveBatchSize {
|
||||
c.dbKeysToRemove = c.deleteFromDB(c.dbKeysToRemove)
|
||||
c.fsKeysToRemove = c.deleteFromDisk(c.fsKeysToRemove)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cache) deleteFromDB(keys []string) []string {
|
||||
if len(keys) == 0 {
|
||||
return keys
|
||||
|
@ -133,7 +93,7 @@ func (c *cache) deleteFromDB(keys []string) []string {
|
|||
return keys[:len(keys)-errorIndex]
|
||||
}
|
||||
|
||||
func (c *cache) deleteFromDisk(keys []string) []string {
|
||||
func (c *cache) deleteFromDisk(ctx context.Context, keys []string) []string {
|
||||
if len(keys) == 0 {
|
||||
return keys
|
||||
}
|
||||
|
@ -147,7 +107,7 @@ func (c *cache) deleteFromDisk(keys []string) []string {
|
|||
continue
|
||||
}
|
||||
|
||||
_, err := c.fsTree.Delete(context.TODO(), common.DeletePrm{Address: addr})
|
||||
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
|
||||
if err != nil && !errors.As(err, new(apistatus.ObjectNotFound)) {
|
||||
c.log.Error(logs.WritecacheCantRemoveObjectFromWritecache, zap.Error(err))
|
||||
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"os"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
|
@ -13,7 +12,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -52,11 +50,8 @@ type cache struct {
|
|||
// mtx protects statistics, counters and compressFlags.
|
||||
mtx sync.RWMutex
|
||||
|
||||
mode mode.Mode
|
||||
initialized atomic.Bool
|
||||
stopInitCh chan struct{} // used to sync initWG initialisation routines and _only_ them
|
||||
initWG sync.WaitGroup // for initialisation routines only
|
||||
modeMtx sync.RWMutex
|
||||
mode mode.Mode
|
||||
modeMtx sync.RWMutex
|
||||
|
||||
// compressFlags maps address of a big object to boolean value indicating
|
||||
// whether object should be compressed.
|
||||
|
@ -96,9 +91,8 @@ var (
|
|||
// New creates new writecache instance.
|
||||
func New(opts ...Option) Cache {
|
||||
c := &cache{
|
||||
flushCh: make(chan *object.Object),
|
||||
mode: mode.ReadWrite,
|
||||
stopInitCh: make(chan struct{}),
|
||||
flushCh: make(chan *object.Object),
|
||||
mode: mode.ReadWrite,
|
||||
|
||||
compressFlags: make(map[string]struct{}),
|
||||
options: options{
|
||||
|
@ -117,12 +111,6 @@ func New(opts ...Option) Cache {
|
|||
opts[i](&c.options)
|
||||
}
|
||||
|
||||
// Make the LRU cache contain which take approximately 3/4 of the maximum space.
|
||||
// Assume small and big objects are stored in 50-50 proportion.
|
||||
c.maxFlushedMarksCount = int(c.maxCacheSize/c.maxObjectSize+c.maxCacheSize/c.smallObjectSize) / 2 * 3 / 4
|
||||
// Trigger the removal when the cache is 7/8 full, so that new items can still arrive.
|
||||
c.maxRemoveBatchSize = c.maxFlushedMarksCount / 8
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
|
@ -153,31 +141,27 @@ func (c *cache) Open(readOnly bool) error {
|
|||
|
||||
// Init runs necessary services.
|
||||
func (c *cache) Init() error {
|
||||
ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.Init")
|
||||
defer span.End()
|
||||
|
||||
c.initFlushMarks(ctx)
|
||||
c.runFlushLoop()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
|
||||
func (c *cache) Close() error {
|
||||
// Finish all in-progress operations.
|
||||
if err := c.setMode(context.TODO(), mode.ReadOnly); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// We cannot lock mutex for the whole operation duration
|
||||
// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
|
||||
c.modeMtx.Lock()
|
||||
if c.closeCh != nil {
|
||||
close(c.closeCh)
|
||||
}
|
||||
c.mode = mode.DegradedReadOnly // prevent new operations from being processed
|
||||
c.modeMtx.Unlock()
|
||||
|
||||
c.wg.Wait()
|
||||
if c.closeCh != nil {
|
||||
c.closeCh = nil
|
||||
}
|
||||
|
||||
c.initialized.Store(false)
|
||||
c.modeMtx.Lock()
|
||||
defer c.modeMtx.Unlock()
|
||||
|
||||
c.closeCh = nil
|
||||
var err error
|
||||
if c.db != nil {
|
||||
err = c.db.Close()
|
||||
|
|
Loading…
Reference in a new issue